diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 82dcffc2db8..f3868b3e1f8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -143,6 +143,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -1144,6 +1145,57 @@ apply_handle_origin(StringInfo s) } /* + * Handle SEQUENCE message. + */ +static void +apply_handle_sequence(StringInfo s) +{ + LogicalRepSequence seq; + Oid relid; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s)) + return; + + logicalrep_read_sequence(s, &seq); + + /* + * Non-transactional sequence updates should not be part of a remote + * transaction. There should not be any running transaction. + */ + Assert((!seq.transactional) || in_remote_transaction); + Assert(!(!seq.transactional && in_remote_transaction)); + Assert(!(!seq.transactional && IsTransactionState())); + + /* + * Make sure we're in a transaction (needed by SetSequence). For + * non-transactional updates we're guaranteed to start a new one, + * and we'll commit it at the end. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + maybe_reread_subscription(); + } + + relid = RangeVarGetRelid(makeRangeVar(seq.nspname, + seq.seqname, -1), + RowExclusiveLock, false); + + /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */ + LockRelationOid(relid, AccessExclusiveLock); + + /* apply the sequence change */ + SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called); + + /* + * Commit the per-stream transaction (we only do this when not in + * remote transaction, i.e. for non-transactional sequence updates. + */ + if (!in_remote_transaction) + CommitTransactionCommand(); +} + +/* * Handle STREAM START message. */ static void @@ -2511,6 +2563,10 @@ apply_dispatch(StringInfo s) */ break; + case LOGICAL_REP_MSG_SEQUENCE: + apply_handle_sequence(s); + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; |