前言
上一期分享了pg_tm_aux这个插件,除了实现逻辑复制槽的故障转移之外,配合wal2json等plugin可以实现另外一些骚操作,突然发现了一个新大陆。
实操
首先创建一个发布,然后进行一些写入操作,主要是需要这些 lsn。
postgres=# create publication pub1 for table test;
CREATE PUBLICATION
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | t | 7976 | | 11309900 | 7/A400AD70 | 7/A400ADA8 | reserved |
(1 row)
postgres=# insert into test select n,'test',now() from generate_series(0,10) n;
INSERT 0 11
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | t | 7976 | | 11309900 | 7/A400B798 | 7/A400B7D0 | reserved |
(1 row)
postgres=# insert into test select n,'test',now() from generate_series(11,100) n;
INSERT 0 90
postgres=# select * from pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | safe_wal_size
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+---------------
mysub | pgoutput | logical | 13578 | postgres | f | t | 7976 | | 11309900 | 7/A400B798 | 7/A4010168 | reserved |
(1 row)
然后使用pg_tm_aux插件提供的函数,创建两个指定lsn的slot,第一个slot是从刚开始插入数据开始的,也就是说,表里还没有数据的时候;而第二个slot是从插入了11条数据之后的。
postgres=# select pg_create_logical_replication_slot_lsn('myslot','wal2json',false,pg_lsn('7/A400AD70'));
pg_create_logical_replication_slot_lsn
----------------------------------------
(myslot,7/A400AD70)
(1 row)
postgres=# select pg_create_logical_replication_slot_lsn('myslot2','wal2json',false,pg_lsn('7/A400B798'));
pg_create_logical_replication_slot_lsn
----------------------------------------
(myslot2,7/A400B798)
(1 row)
这里还需要用到wal2json这个逻辑解码工具,其他的逻辑解码工具如wal2mongo,pglogical就不介绍了,需要在共享库里添加。
然后配合原生的逻辑解码工具pg_recvlogical,看一下效果:
[postgres@xiongcc ~]$ pg_recvlogical --start -S myslot -d postgres -P wal2json -f -
{"change":[{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[0,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[1,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[2,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[3,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[4,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[5,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[6,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[7,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[8,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[9,"test","2021-04-15 17:00:02.421606"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[10,"test","2021-04-15 17:00:02.421606"]}]}
{"change":[{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[11,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[12,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[13,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[14,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[15,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[16,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[17,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[18,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[19,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[20,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[21,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[22,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[23,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[24,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id
......
可以看到,成功的将lsn 7/A400AD70之后的数据消费了出来,看看第二个slot,也如预期,从11这条数据开始消费
[postgres@xiongcc ~]$ pg_recvlogical --start -S myslot2 -d postgres -P wal2json -f -
{"change":[{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[11,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[12,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[13,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[14,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[15,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[16,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[17,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[18,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[19,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[20,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[21,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[22,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[23,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[24,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[25,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[26,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[27,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[28,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[29,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[30,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[31,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[32,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[33,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[34,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_time"],"columntypes":["integer","text","timestamp without time zone"],"columnvalues":[35,"test","2021-04-15 17:00:17.698731"]},{"kind":"insert","schema":"public","table":"test","columnnames":["id","info","t_t
......
小结
可以看到,使用pg_tm_aux这个插件,可以实现类似于pgwalminer、binlog2sql的这种效果,消费端再自行组装一下,也可以实现往异构数据库的同步。不过还是要注意那个老问题,xlog要确保还在。
真是个牛逼的东西啊!