aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorTomas Vondra <tomas.vondra@postgresql.org>2022-03-24 18:20:21 +0100
committerTomas Vondra <tomas.vondra@postgresql.org>2022-03-24 18:49:27 +0100
commit75b1521dae1ff1fde17fda2e30e591f2e5d64b6a (patch)
tree924b88a40d67da7b5b441f2f37a7a5d6daa79f49 /src/backend/replication/logical/worker.c
parent0adb3dc68bfb9a347ff2c7fe63200419bb649265 (diff)
downloadpostgresql-75b1521dae1ff1fde17fda2e30e591f2e5d64b6a.tar.gz
postgresql-75b1521dae1ff1fde17fda2e30e591f2e5d64b6a.zip
Add decoding of sequences to built-in replication
This commit adds support for decoding of sequences to the built-in replication (the infrastructure was added by commit 0da92dc530). The syntax and behavior mostly mimics handling of tables, i.e. a publication may be defined as FOR ALL SEQUENCES (replicating all sequences in a database), FOR ALL SEQUENCES IN SCHEMA (replicating all sequences in a particular schema) or individual sequences. To publish sequence modifications, the publication has to include 'sequence' action. The protocol is extended with a new message, describing sequence increments. A new system view pg_publication_sequences lists all the sequences added to a publication, both directly and indirectly. Various psql commands (\d and \dRp) are improved to also display publications including a given sequence, or sequences included in a publication. Author: Tomas Vondra, Cary Huang Reviewed-by: Peter Eisentraut, Amit Kapila, Hannu Krosing, Andres Freund, Petr Jelinek Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c56
1 files changed, 56 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 82dcffc2db8..f3868b3e1f8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -143,6 +143,7 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
+#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
@@ -1144,6 +1145,57 @@ apply_handle_origin(StringInfo s)
}
/*
+ * Handle SEQUENCE message.
+ */
+static void
+apply_handle_sequence(StringInfo s)
+{
+ LogicalRepSequence seq;
+ Oid relid;
+
+ if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
+ return;
+
+ logicalrep_read_sequence(s, &seq);
+
+ /*
+ * Non-transactional sequence updates should not be part of a remote
+ * transaction. There should not be any running transaction.
+ */
+ Assert((!seq.transactional) || in_remote_transaction);
+ Assert(!(!seq.transactional && in_remote_transaction));
+ Assert(!(!seq.transactional && IsTransactionState()));
+
+ /*
+ * Make sure we're in a transaction (needed by SetSequence). For
+ * non-transactional updates we're guaranteed to start a new one,
+ * and we'll commit it at the end.
+ */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ maybe_reread_subscription();
+ }
+
+ relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
+ seq.seqname, -1),
+ RowExclusiveLock, false);
+
+ /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */
+ LockRelationOid(relid, AccessExclusiveLock);
+
+ /* apply the sequence change */
+ SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
+
+ /*
+ * Commit the per-stream transaction (we only do this when not in
+ * remote transaction, i.e. for non-transactional sequence updates.
+ */
+ if (!in_remote_transaction)
+ CommitTransactionCommand();
+}
+
+/*
* Handle STREAM START message.
*/
static void
@@ -2511,6 +2563,10 @@ apply_dispatch(StringInfo s)
*/
break;
+ case LOGICAL_REP_MSG_SEQUENCE:
+ apply_handle_sequence(s);
+ return;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
break;