Logical replication with psycopg2 and publishers

Time:11-28

There is a logical replication script written in Python with psycopg2: https://www.psycopg.org/docs/advanced.html#logical-replication-quick-start

It works. However, I don't want to receive every change made in the database, only the changes covered by a publication.

To do this:

  1. I created a publication named "PUB" containing one table in the source database (ver. 13.3). To check that the publication works, I also deployed a second database (ver. 14.1) and created a subscription to this publication in it. That works.

  2. In the psycopg2 documentation (https://www.psycopg.org/docs/extras.html#replication-support-objects) and the PostgreSQL documentation (https://postgrespro.ru/docs/postgresql/13/protocol-logical-replication?lang=en) I found a way to set publication_names in start_replication:

cur.start_replication(slot_name='pytest', decode=True, options={'publication_names': 'PUB'})

But it didn't work. I got this error:

psycopg2.errors.InvalidParameterValue: option "publication_names" = "PUB," is unknown

So, the actual question: can logical replication in psycopg2 be tied to a publication? And if so, how?

P.S. There is an alternative (Trigger/Notify -> Listen/psycopg2), but I would like to solve this problem if it is possible.

CodePudding user response:

The test_decoding plugin doesn't support the publication_names option.

The plugin used by the built-in PUBLICATION is called 'pgoutput', not 'test_decoding'. But as far as I can tell, psycopg2 doesn't support use of that plugin.

So what you want is not supported. I don't know if anyone is working to add support for it or not.

CodePudding user response:

The answer to your question is probably "yes and no".

As jjanes states in their answer, you must use the pgoutput output plugin to make use of publications. You can configure psycopg2 to use it like this:

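import psycopg2
import psycopg2.extras

# Open a connection that speaks the logical replication protocol.
conn = psycopg2.connect(
    'dbname=test',
    connection_factory=psycopg2.extras.LogicalReplicationConnection,
)

# pgoutput needs both options; 'pub' is the name of the publication.
options = {'publication_names': 'pub', 'proto_version': '1'}
cur = conn.cursor()
try:
    # pgoutput sends binary data, so don't ask psycopg2 to decode it as text.
    cur.start_replication(slot_name='pytest', decode=False, options=options)
except psycopg2.ProgrammingError:
    # The slot doesn't exist yet: create it with the pgoutput plugin and retry.
    cur.create_replication_slot(
        'pytest',
        output_plugin='pgoutput',
    )
    cur.start_replication(slot_name='pytest', decode=False, options=options)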
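To actually see what arrives on the stream, the cursor needs a consumer callback. The following is a minimal sketch, assuming the cursor set up above; the class name PayloadPrinter and its seen list are hypothetical helpers made up for this illustration, while payload, data_start, cursor and send_feedback are the attributes psycopg2 documents for replication messages and cursors.

```python
class PayloadPrinter:
    """Callable for cur.consume_stream(); prints each raw message payload.

    Hypothetical helper for illustration, not part of psycopg2.
    """

    def __init__(self):
        # Kept only so the received payloads can be inspected afterwards.
        self.seen = []

    def __call__(self, msg):
        # msg is expected to be a psycopg2.extras.ReplicationMessage.
        self.seen.append(msg.payload)
        print(msg.payload)
        # Acknowledge the message so the server can discard old WAL.
        msg.cursor.send_feedback(flush_lsn=msg.data_start)


# Usage with the cursor from the snippet above (needs a live server):
# cur.consume_stream(PayloadPrinter())
```

consume_stream blocks and calls the consumer once per message, so in practice it would run in its own process or thread.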

However pgoutput uses a binary protocol, so the output for these statements

insert into table1 (col) values ('hello hello');
update table1 set col = 'hello goodbye' where id = 6;

is

b'B\x00\x00\x00\x00r!<x\x00\x02t\xc65\xbb\xb5\xd3\x00\x00jd'
b'R\x00\x00F:public\x00table1\x00d\x00\x02\x01id\x00\x00\x00\x00\x17\xff\xff\xff\xff\x00col\x00\x00\x00\x04\x13\x00\x00\x00\x14'
b'I\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\x0bhello hello'
b'C\x00\x00\x00\x00\x00r!<x\x00\x00\x00\x00r!<\xa8\x00\x02t\xc65\xbb\xb5\xd3'
b'B\x00\x00\x00\x00r!=8\x00\x02t\xc67`\xb5\xae\x00\x00je'
b'U\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\rhello goodbye'
b'C\x00\x00\x00\x00\x00r!=8\x00\x00\x00\x00r!=h\x00\x02t\xc67`\xb5\xae'

because psycopg2 doesn't know how to decode the messages. You could decode them yourself, using the struct module from Python's standard library and the published message formats, or you could consider using wal2json instead of pgoutput. wal2json does not currently support publications, but it does provide the means to emulate them through its table-filtering options such as add-tables.
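As a rough illustration of decoding by hand, here is a minimal sketch that parses the five message types shown above with struct. It assumes proto_version 1 and values sent in text format; the function names, the UNCHANGED sentinel and the returned dict layout are my own choices for this example, not part of psycopg2 or PostgreSQL. Other message types (Delete, Truncate, Type, Origin) are not handled.

```python
import struct

UNCHANGED = object()  # hypothetical sentinel for unchanged TOASTed values


def read_cstring(buf, pos):
    """Read a NUL-terminated string; return (text, next position)."""
    end = buf.index(b'\x00', pos)
    return buf[pos:end].decode('utf-8'), end + 1


def read_tuple(buf, pos):
    """Read a TupleData block; return (column values, next position)."""
    (ncols,) = struct.unpack_from('>h', buf, pos)
    pos += 2
    values = []
    for _ in range(ncols):
        kind = buf[pos:pos + 1]
        pos += 1
        if kind == b't':  # text value: Int32 length, then the bytes
            (length,) = struct.unpack_from('>i', buf, pos)
            pos += 4
            values.append(buf[pos:pos + length].decode('utf-8'))
            pos += length
        elif kind == b'n':  # NULL
            values.append(None)
        elif kind == b'u':  # unchanged TOASTed value, not transmitted
            values.append(UNCHANGED)
        else:
            raise ValueError('unsupported column kind %r' % kind)
    return values, pos


def decode_message(payload):
    """Decode one pgoutput (proto_version 1) message into a dict."""
    tag = payload[:1]
    if tag == b'B':  # Begin: final LSN, commit timestamp, xid
        final_lsn, _ts, xid = struct.unpack_from('>QqI', payload, 1)
        return {'type': 'begin', 'final_lsn': final_lsn, 'xid': xid}
    if tag == b'C':  # Commit: flags, commit LSN, end LSN, timestamp
        _flags, commit_lsn, end_lsn, _ts = struct.unpack_from('>bQQq', payload, 1)
        return {'type': 'commit', 'commit_lsn': commit_lsn, 'end_lsn': end_lsn}
    if tag == b'R':  # Relation: OID, schema, table, identity, columns
        (oid,) = struct.unpack_from('>I', payload, 1)
        schema, pos = read_cstring(payload, 5)
        table, pos = read_cstring(payload, pos)
        _identity, ncols = struct.unpack_from('>bh', payload, pos)
        pos += 3
        columns = []
        for _ in range(ncols):
            pos += 1  # skip the per-column flags byte
            name, pos = read_cstring(payload, pos)
            pos += 8  # skip type OID and type modifier
            columns.append(name)
        return {'type': 'relation', 'relation_oid': oid,
                'schema': schema, 'table': table, 'columns': columns}
    if tag == b'I':  # Insert: OID, b'N', new tuple
        (oid,) = struct.unpack_from('>I', payload, 1)
        new, _ = read_tuple(payload, 6)
        return {'type': 'insert', 'relation_oid': oid, 'new': new}
    if tag == b'U':  # Update: OID, optional old tuple (K/O), b'N', new tuple
        (oid,) = struct.unpack_from('>I', payload, 1)
        pos, old = 5, None
        if payload[pos:pos + 1] in (b'K', b'O'):
            old, pos = read_tuple(payload, pos + 1)
        new, _ = read_tuple(payload, pos + 1)
        return {'type': 'update', 'relation_oid': oid, 'old': old, 'new': new}
    return {'type': 'unknown', 'tag': tag}


insert = b'I\x00\x00F:N\x00\x02t\x00\x00\x00\x016t\x00\x00\x00\x0bhello hello'
print(decode_message(insert))
```

Insert and Update messages carry only the relation OID, so a real consumer would keep the most recent Relation message per OID to map the values back to column names.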
