0
点赞
收藏
分享

微信扫一扫

逻辑复制之乾坤大挪移

笑望叔叔 2022-11-03 阅读 151

前言

PostgreSQL自从V10开始支持逻辑复制这一重量级特性,在V10以前的版本需要借助pglogical这个插件来实现逻辑复制。不同于流复制的物理复制方式,逻辑复制的好处很多,比如:

  1. 充当汇集库,从多个数据库采集报表数据
  2. 根据业务需求,仅仅同步一个库中的部分表到另外一个库中
  3. 跨大版本升级
  4. 支持下游节点可写

等等。但是一直有一个疑难杂症未得到根除,复制槽不会被同步到备机上,因此主机一旦发生故障,备机提升为主后,原来的复制槽将不能继续使用。另外备库也不支持逻辑解码,因此客户端/订阅者也无法在备库创建逻辑复制插槽。

postgres=# select pg_is_in_recovery();
pg_is_in_recovery
-------------------
t
(1 row)

postgres=# create publication pub1 for table test;
ERROR: cannot execute CREATE PUBLICATION in a read-only transaction
postgres=# select pg_create_logical_replication_slot('myslot1','wal2json');
ERROR: logical decoding cannot be used while in recovery

数据丢失的隐患

如前文所述,正常使用pg_basebackup做的从库,是不会同步复制槽的信息的,这里为了演示,主库创建一个物理复制槽,在pg_replslot子目录下可以看到相关信息:

postgres=# select pg_create_physical_replication_slot('myslot');
pg_create_physical_replication_slot
-------------------------------------
(myslot,)
(1 row)

postgres=# select pg_is_in_recovery();
pg_is_in_recovery
-------------------
f
(1 row)

postgres=# \q
[postgres@xiongcc ~]$ hexdump -c pgdata/pg_replslot/myslot/state
0000000 241 034 005 001 361 222 331 021 002 \0 \0 \0 250 \0 \0 \0
0000010 m y s l o t \0 \0 \0 \0 \0 \0 \0 \0 \0 \0
0000020 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0 \0
*
00000b0 \0 \0 \0 \0 \0 \0 \0 \0
00000b8

使用pg_basebackup搭建一个从库,pg_replslot目录下是空的

[postgres@xiongcc ~]$ pg_basebackup -Fp -Xs -U postgres -D backupdata/ -R -v -P
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 6/34000028 on timeline 1
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_17058"
521494/521494 kB (100%), 1/1 tablespace
pg_basebackup: write-ahead log end point: 6/34000100
pg_basebackup: waiting for background process to finish streaming ...
pg_basebackup: syncing data to disk ...
pg_basebackup: renaming backup_manifest.tmp to backup_manifest
pg_basebackup: base backup completed
[postgres@xiongcc ~]$ ll backupdata/pg_replslot/
total 0

那么问题就来了,假如在逻辑复制的场景下,主库不幸over了,逻辑复制槽并不会自动转移到备库,虽然我们可以在新的主数据库上创建一个新的复制槽然后开始一个新的发布订阅来“解决”,但这真的解决了吗?

设想这么一个场景,因为我们知道,逻辑复制的场景下备库可写,那么就无法避免锁冲突带来的延迟复制问题,又比如要订阅的表含有large object或者binary这种大对象类型,此时再碰到一个长事务,延迟也是无法避免的,那么重新创建一个复制槽,势必会导致这一段时间内的数据丢失。

下面看一个锁导致的例子

role

host

port

主库

localhost

5432

从库

localhost

5432

订阅端

localhost

5439

先搭建一个正常的主从流复制,然后主库创建发布,再启一个实例进行订阅。

主库,发布端,创建单表发布:

postgres=# create publication pub1 for table test;
CREATE PUBLICATION

订阅端创建订阅:

postgres=# create subscription mysub connection 'host=localhost port=5432 dbname=postgres' publication pub1;
NOTICE: created replication slot "mysub" on publisher
CREATE SUBSCRIPTION

主库插入几条数据:

postgres=# insert into test select n,'test',clock_timestamp() from generate_series(1,3) as n;
INSERT 0 3

订阅端可以正常消费:

postgres=# select * from test;
id | info | t_time
----+------+----------------------------
1 | test | 2021-04-12 23:48:33.825533
2 | test | 2021-04-12 23:48:33.825756
3 | test | 2021-04-12 23:48:33.825769
(3 rows)

因为订阅端可写,那么在订阅端模拟锁冲突,更新一条数据但不提交(对于delete或update的这些操作,相应表上需要有Replica Identity,以标识如何设别Old tuple,可以是primary key、unique index或者full):

postgres=# begin;
BEGIN
postgres=*# update test set id = 99 where id = 1;
UPDATE 1

然后主库也更新同一条,再进行其他写入:

postgres=# update test set id = 99 where id = 1;
UPDATE 1
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | t | 25306 | | 10487101 | 6/7F008C68 | 6/7F008CA0 | reserved |
(1 row)

postgres=# insert into test select n,'test',clock_timestamp() from generate_series(100,1000) as n;
INSERT 0 901
postgres=# insert into test select n,'test',clock_timestamp() from generate_series(1001,10000) as n;
INSERT 0 9000

订阅端再开一个session查看数据,可以看到,一直停留在三条,因为wal的decoding是“顺序”的,后面的“消息”会全部堆积:

postgres=# select * from test;
id | info | t_time
----+------+----------------------------
1 | test | 2021-04-12 23:48:33.825533
2 | test | 2021-04-12 23:48:33.825756
3 | test | 2021-04-12 23:48:33.825769
(3 rows)

订阅端的日志上有所体现,logical worker进程正在waiting:

2021-04-13 00:00:19.696 CST,"postgres","postgres",25551,"[local]",60746e86.63cf,5,"BEGIN",2021-04-13 00:00:06 CST,5/9,0,LOG,00000,"duration: 0.150 ms",,,,,,,,,"psql","client backend"
2021-04-13 00:00:32.118 CST,"postgres","postgres",25551,"[local]",60746e86.63cf,6,"UPDATE",2021-04-13 00:00:06 CST,5/9,10487112,LOG,00000,"duration: 3.472 ms",,,,,,,,,"psql","client backend"
2021-04-13 00:00:39.938 CST,,,25541,,60746e70.63c5,2,,2021-04-12 23:59:44 CST,4/9,0,LOG,00000,"process 25541 still waiting for ShareLock on transaction 10487112 after 1000.086 ms","Process holding the lock: 25551. Wait queue: 25541.",,,,,,,,"","logical replication worker"

[postgres@xiongcc pg_log]$ ps -ef | grep 25541 | grep -v 'grep'
postgres 25541 25524 0 Apr12 ? 00:00:00 postgres: logical replication worker for subscription 41848 waiting

主库上查看pg_replication_slots,其中restart_lsn指此复制槽的使用者可能仍然需要最老的WAL的地址(LSN),confirmed_flush_lsn是使用者确切接收到的lsn

postgres=# select * from pg_replication_slots ;
-[ RECORD 1 ]-------+-----------
slot_name | mysub
plugin | pgoutput
slot_type | logical
datoid | 13578
database | postgres
temporary | f
active | f
active_pid |
xmin |
catalog_xmin | 10487109
restart_lsn | 6/83008E88
confirmed_flush_lsn | 6/83008EC0
wal_status | reserved
safe_wal_size |

这个时候模拟主库宕机了,同时将之前模拟的锁冲突回滚rollback

[postgres@xiongcc ~]$ pg_ctl -D pgdata/ stop
waiting for server to shut down.... done
server stopped

订阅端再次查看数据,还是3条,丢失了接近1W条数据:

postgres=# select * from test;
id | info | t_time
----+------+----------------------------
1 | test | 2021-04-12 23:59:51.525884
2 | test | 2021-04-12 23:59:51.52609
3 | test | 2021-04-12 23:59:51.526109
(3 rows)

postgres=# select count(*) from test;
count
-------
3
(1 row)

所以这个时候,新搭建一个复制槽以继续复制,是无用的,中间丢失的数据已找不回来。

拷贝文件是否可行?

定时将主库的复制槽信息拷贝到备库,将主库的pg_replslot下的文件拷贝到备库,这样备库也就有了一份复制槽的信息,这样是否可行?遗憾的是,备库并不会主动监测到复制槽,还需要重启

主库创建一个逻辑复制槽

postgres=# create publication pub1 for table test;
CREATE PUBLICATION
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | t | 26231 | | 10487136 | 6/8F0093D8 | 6/8F009410 | reserved |
(1 row)

备库是看不到复制槽信息的,拷贝过去需要重启之后才能发现

[postgres@xiongcc ~]$ cp -r pgdata/pg_replslot/mysub/ backupdata/pg_replslot/
[postgres@xiongcc ~]$ psql -p 5433
psql (13.2)
Type "help" for help.

postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
(0 rows)

postgres=# \q
[postgres@xiongcc ~]$ pg_ctl -D backupdata/ restart
waiting for server to shut down.... done
server stopped
waiting for server to start....2021-04-13 00:50:00.667 CSTLOG: redirecting log output to logging collector process
2021-04-13 00:50:00.667 CSTHINT: Future log output will appear in directory "pg_log".
done
server started
[postgres@xiongcc ~]$ psql -p 5433
psql (13.2)
Type "help" for help.

postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | f | | | 10487136 | 6/8F0093D8 | 6/8F0093D8 | reserved |
(1 row)

但是这样就会有一个很大的风险,备库复制槽的xmin和lsn是不会推进的,这就会导致备库的WAL会一直保留

主库进行写入,然后模拟宕机

postgres=# insert into test select n,'test',clock_timestamp() from generate_series(100,1000) as n;
INSERT 0 901
postgres=# insert into test select n,'test',clock_timestamp() from generate_series(1001,2000) as n;
INSERT 0 1000
postgres=# \q
[postgres@xiongcc ~]$ pg_ctl -D pgdata/ stop
waiting for server to shut down....
done
server stopped

备机promote,然后订阅端刷新一下订阅alter subscription mysub connection 'host=localhost port=5433 dbname=postgres';

[postgres@xiongcc ~]$ pg_ctl -D backupdata/ promote
waiting for server to promote.... done
server promoted
[postgres@xiongcc ~]$ psql -p 5433
psql (13.2)
Type "help" for help.

postgres=# select * from pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
-------+---------+----------+--------------+-----------+-----------+-----------+-------------+------------
33691 | pub1 | 10 | f | t | t | t | t | f
(1 row)

postgres=# select count(*) from test;
count
-------
1901
(1 row)

postgres=# insert into test values(10000,'xiongcc',now());
INSERT 0 1

postgres=# select count(*) from test;
count
-------
1902
(1 row)

postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | t | 26288 | | 10487140 | 6/90001328 | 6/90001360 | reserved |
(1 row)

订阅端日志中可以看到

2021-04-13 01:02:40.907 CST,,,26373,,60747c80.6705,4,,2021-04-13 00:59:44 CST,4/10,0,LOG,00000,"logical replication apply worker for subscription ""mysub"" will restart because the connection information was changed",,,,,,,,,"","logical replication worker"
2021-04-13 01:02:40.910 CST,,,26437,,60747d30.6745,1,,2021-04-13 01:02:40 CST,4/12,0,LOG,00000,"logical replication apply worker for subscription ""mysub"" has started",,,,,,,,,"","logical replication worker"

查看数据

postgres=# select count(*) from test;
count
-------
1901
(1 row)

postgres=# select count(*) from test;
count
-------
1902
(1 row)

可以看到,拷贝目录的方式也行,但是会有很多风险,因为备库复制槽一旦发现了,xmin和lsn是不会推进的,会导致表膨胀和wal膨胀,有没有更方便快捷一点的办法?

复制槽之乾坤大挪移

所谓你有张良计我有过桥梯,​​https://github.com/x4m/pg_tm_aux​​,介绍很简短

Extension to create a logical replication slot in the past. It is useful to implement continuous logical streaming from the highly available cluster on physical replication. When primary node of a cluster is failovered, we need to start logical streaming from new node.
We cannot start logical replication from LSN different from LSN of a slot. And cannot create a slot on LSN in the past, particularly before or right after promotion.
This leads to massive waste of network bandwidth in our installations, due to necessity of initial table sync.
This extension implements Yandex Data Transfer auxiliary functions to create slot in the past.

理解为支持故障转移的逻辑复制槽即可。编译的时候要改下Makefile的topdir,该插件核心就一个函数pg_create_logical_replication_slot_lsn

postgres=# \df pg_create_logical_replication_slot_lsn
List of functions
Schema | Name | Result data type | Argument data types | Type
--------+----------------------------------------+------------------+-------------------------------------------------------------------------------------------------------------------------------------------+------
public | pg_create_logical_replication_slot_lsn | record | slot_name name, plugin name, temporary boolean DEFAULT false, restart_lsn pg_lsn DEFAULT NULL::pg_lsn, OUT slot_name name, OUT lsn pg_lsn | func
(1 row)

恢复一下环境,主库步骤和之前一样

postgres=# create publication pub1 for table test;
CREATE PUBLICATION
postgres=# insert into test select n,'test',clock_timestamp() from generate_series(1,3) as n;
INSERT 0 3
postgres=# update test set id = 99 where id = 1;
UPDATE 1
postgres=# insert into test select n,'test',clock_timestamp() from generate_series(100,1000) as n;
INSERT 0 901
postgres=# insert into test select n,'test',clock_timestamp() from generate_series(1001,10000) as n;
INSERT 0 9000

订阅端也一样,锁冲突后,保持在3条

postgres=# select * from test;
id | info | t_time
----+------+----------------------------
1 | test | 2021-04-13 00:16:16.505276
2 | test | 2021-04-13 00:16:16.50548
3 | test | 2021-04-13 00:16:16.505492
(3 rows)

此时主库宕机,但要先记录一下主库的restart_lsn和slot_name,分别是6/87011B20和mysub

postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | f | | | 10487123 | 6/87011B20 | 6/87011B58 | reserved |
(1 row)

备库promote为新主,同时创建转移后的复制槽,可以看到此时active还是false

postgres=# select * from pg_create_logical_replication_slot_lsn('mysub','pgoutput',false,pg_lsn('6/87011B20'));
slot_name | lsn
-----------+------------
mysub | 6/87011B20
(1 row)

postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | f | | | 10487127 | 6/87011B20 | 6/87011B20 | reserved |
(1 row)

订阅端刷新一下订阅

postgres=# alter subscription mysub connection 'host=localhost port=5433 dbname=postgres';
ALTER SUBSCRIPTION

然后模拟锁冲突的事务进行回滚,这是日志开始打印,注意关注,rollback之后,逻辑复制开始进行restart:logical replication apply worker for subscription ""mysub"" will restart because the connection information was changed,logical replication apply worker for subscription ""mysub"" has started

2021-04-13 00:19:52.472 CST,"postgres","postgres",25831,"[local]",60747209.64e7,17,"ALTER SUBSCRIPTION",2021-04-13 00:15:05 CST,3/0,0,LOG,00000,"duration: 1.000 ms",,,,,,,,,"psql","client backend"
2021-04-13 00:20:00.147 CST,"postgres","postgres",25862,"[local]",60747265.6506,5,"ROLLBACK",2021-04-13 00:16:37 CST,5/0,0,LOG,00000,"duration: 0.227 ms",,,,,,,,,"psql","client backend"
2021-04-13 00:20:00.147 CST,,,25856,,6074724c.6500,3,,2021-04-13 00:16:12 CST,4/39,0,LOG,00000,"process 25856 acquired ShareLock on transaction 10487123 after 187508.095 ms",,,,,,,,,"","logical replication worker"
2021-04-13 00:20:00.147 CST,,,25856,,6074724c.6500,4,,2021-04-13 00:16:12 CST,4/40,0,LOG,00000,"logical replication apply worker for subscription ""mysub"" will restart because the connection information was changed",,,,,,,,,"","logical replication worker"
2021-04-13 00:20:00.150 CST,,,25929,,60747330.6549,1,,2021-04-13 00:20:00 CST,4/42,0,LOG,00000,"logical replication apply worker for subscription ""mysub"" has started",,,,,,,,,"","logical replication worker"
2021-04-13 00:20:15.946 CST,"postgres","postgres",25831,"[local]",60747209.64e7,18,"SELECT",2021-04-13 00:15:05 CST,3/0,0,LOG,00000,"duration: 7.743 ms",,,,,,,,,"psql","client backend"

查询一下总数,和新主保持一致,并且可以继续和新主保持订阅消费

postgres=# alter subscription mysub connection 'host=localhost port=5433 dbname=postgres';
ALTER SUBSCRIPTION
postgres=# select count(*) from test;
count
-------
9904
(1 row)

主库

postgres=# insert into test values(99999,'xiongcc',now());
INSERT 0 1

订阅端

postgres=# select * from test where id = 99999;
id | info | t_time
-------+---------+----------------------------
99999 | xiongcc | 2021-04-13 00:27:22.321033
(1 row)

逻辑复制HA思考

可以看到,该pg_tm_aux插件,可以实现逻辑复制槽的故障转移,对于目前主流的PG HA软件,稍微集成与适配一下,如前文实验那样,主库宕机了,HA切换之前,可以定时获取如每隔5s获取一次主库复制槽的restart_lsn,或者主库挂掉恢复之后,不加入现有集群,然后获取一下旧主宕机前的restart_lsn(因为复制槽的信息是持久化到存储上的,不会丢失)然后根据restart_lsn在新的主库上创建新的复制槽、刷新订阅,以此种种,即可让PG的逻辑复制能力进一步得到提升,当然,拷贝复制槽文件也可以是另一种办法,但是备库并不会主动发现,并且一旦发现,由于lsn和xmin不会推进,会带来wal膨胀和表膨胀的风险。需要注意的是,上面两种方案,都要确保logical decoding的wal存在,所以会要求wal_keep_segments较大(PG13里是max_slot_wal_keep_size),不然会提示xxx has been removed!另外,该插件还支持其他logical decoding plugin,如wal2json,笔者也试过,也能达到类似效果,在此不再赘述,感兴趣的自行实验

小结

pg_tm_aux插件让PG的逻辑复制能力上了一个阶梯,但是需要适配与集成,也算是一个小遗憾。拷贝数据文件比较繁琐并且很不人性化,风险也大。以上是个人所感,可能有不足与纰漏之处,望谅解。



举报

相关推荐

0 条评论