diff options
author | Tomas Vondra <tomas.vondra@postgresql.org> | 2022-03-24 18:20:21 +0100 |
---|---|---|
committer | Tomas Vondra <tomas.vondra@postgresql.org> | 2022-03-24 18:49:27 +0100 |
commit | 75b1521dae1ff1fde17fda2e30e591f2e5d64b6a (patch) | |
tree | 924b88a40d67da7b5b441f2f37a7a5d6daa79f49 /src/backend/replication/logical/worker.c | |
parent | 0adb3dc68bfb9a347ff2c7fe63200419bb649265 (diff) | |
download | postgresql-75b1521dae1ff1fde17fda2e30e591f2e5d64b6a.tar.gz postgresql-75b1521dae1ff1fde17fda2e30e591f2e5d64b6a.zip |
Add decoding of sequences to built-in replication
This commit adds support for decoding of sequences to the built-in
replication (the infrastructure was added by commit 0da92dc530).
The syntax and behavior mostly mimics handling of tables, i.e. a
publication may be defined as FOR ALL SEQUENCES (replicating all
sequences in a database), FOR ALL SEQUENCES IN SCHEMA (replicating
all sequences in a particular schema) or individual sequences.
To publish sequence modifications, the publication has to include
'sequence' action. The protocol is extended with a new message,
describing sequence increments.
A new system view pg_publication_sequences lists all the sequences
added to a publication, both directly and indirectly. Various psql
commands (\d and \dRp) are improved to also display publications
including a given sequence, or sequences included in a publication.
Author: Tomas Vondra, Cary Huang
Reviewed-by: Peter Eisentraut, Amit Kapila, Hannu Krosing, Andres
Freund, Petr Jelinek
Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com
Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 82dcffc2db8..f3868b3e1f8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -143,6 +143,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -1144,6 +1145,57 @@ apply_handle_origin(StringInfo s) } /* + * Handle SEQUENCE message. + */ +static void +apply_handle_sequence(StringInfo s) +{ + LogicalRepSequence seq; + Oid relid; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s)) + return; + + logicalrep_read_sequence(s, &seq); + + /* + * Non-transactional sequence updates should not be part of a remote + * transaction. There should not be any running transaction. + */ + Assert((!seq.transactional) || in_remote_transaction); + Assert(!(!seq.transactional && in_remote_transaction)); + Assert(!(!seq.transactional && IsTransactionState())); + + /* + * Make sure we're in a transaction (needed by SetSequence). For + * non-transactional updates we're guaranteed to start a new one, + * and we'll commit it at the end. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + maybe_reread_subscription(); + } + + relid = RangeVarGetRelid(makeRangeVar(seq.nspname, + seq.seqname, -1), + RowExclusiveLock, false); + + /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */ + LockRelationOid(relid, AccessExclusiveLock); + + /* apply the sequence change */ + SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called); + + /* + * Commit the per-stream transaction (we only do this when not in + * remote transaction, i.e. for non-transactional sequence updates. + */ + if (!in_remote_transaction) + CommitTransactionCommand(); +} + +/* * Handle STREAM START message. */ static void @@ -2511,6 +2563,10 @@ apply_dispatch(StringInfo s) */ break; + case LOGICAL_REP_MSG_SEQUENCE: + apply_handle_sequence(s); + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; |