aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/sequence.c
diff options
context:
space:
mode:
authorTomas Vondra <tomas.vondra@postgresql.org>2022-03-24 18:20:21 +0100
committerTomas Vondra <tomas.vondra@postgresql.org>2022-03-24 18:49:27 +0100
commit75b1521dae1ff1fde17fda2e30e591f2e5d64b6a (patch)
tree924b88a40d67da7b5b441f2f37a7a5d6daa79f49 /src/backend/commands/sequence.c
parent0adb3dc68bfb9a347ff2c7fe63200419bb649265 (diff)
downloadpostgresql-75b1521dae1ff1fde17fda2e30e591f2e5d64b6a.tar.gz
postgresql-75b1521dae1ff1fde17fda2e30e591f2e5d64b6a.zip
Add decoding of sequences to built-in replication
This commit adds support for decoding of sequences to the built-in replication (the infrastructure was added by commit 0da92dc530). The syntax and behavior mostly mimics handling of tables, i.e. a publication may be defined as FOR ALL SEQUENCES (replicating all sequences in a database), FOR ALL SEQUENCES IN SCHEMA (replicating all sequences in a particular schema) or individual sequences. To publish sequence modifications, the publication has to include 'sequence' action. The protocol is extended with a new message, describing sequence increments. A new system view pg_publication_sequences lists all the sequences added to a publication, both directly and indirectly. Various psql commands (\d and \dRp) are improved to also display publications including a given sequence, or sequences included in a publication. Author: Tomas Vondra, Cary Huang Reviewed-by: Peter Eisentraut, Amit Kapila, Hannu Krosing, Andres Freund, Petr Jelinek Discussion: https://postgr.es/m/d045f3c2-6cfb-06d3-5540-e63c320df8bc@enterprisedb.com Discussion: https://postgr.es/m/1710ed7e13b.cd7177461430746.3372264562543607781@highgo.ca
Diffstat (limited to 'src/backend/commands/sequence.c')
-rw-r--r--src/backend/commands/sequence.c154
1 files changed, 154 insertions, 0 deletions
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index c13cada3bf1..717bb0b2aa9 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -337,6 +337,160 @@ ResetSequence(Oid seq_relid)
}
/*
+ * Update the sequence state by modifying the existing sequence data row.
+ *
+ * This keeps the same relfilenode, so the behavior is non-transactional.
+ */
+static void
+SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)
+{
+ SeqTable elm;
+ Relation seqrel;
+ Buffer buf;
+ HeapTupleData seqdatatuple;
+ Form_pg_sequence_data seq;
+
+ /* open and lock sequence */
+ init_sequence(seqrelid, &elm, &seqrel);
+
+ /* lock page' buffer and read tuple */
+ seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+
+ /* check the comment above nextval_internal()'s equivalent call. */
+ if (RelationNeedsWAL(seqrel))
+ {
+ GetTopTransactionId();
+
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+ }
+
+ /* ready to change the on-disk (or really, in-buffer) tuple */
+ START_CRIT_SECTION();
+
+ seq->last_value = last_value;
+ seq->is_called = is_called;
+ seq->log_cnt = log_cnt;
+
+ MarkBufferDirty(buf);
+
+ /* XLOG stuff */
+ if (RelationNeedsWAL(seqrel))
+ {
+ xl_seq_rec xlrec;
+ XLogRecPtr recptr;
+ Page page = BufferGetPage(buf);
+
+ XLogBeginInsert();
+ XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
+
+ xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
+
+ XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
+ XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
+
+ recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
+
+ PageSetLSN(page, recptr);
+ }
+
+ END_CRIT_SECTION();
+
+ UnlockReleaseBuffer(buf);
+
+ /* Clear local cache so that we don't think we have cached numbers */
+ /* Note that we do not change the currval() state */
+ elm->cached = elm->last;
+
+ relation_close(seqrel, NoLock);
+}
+
+/*
+ * Update the sequence state by creating a new relfilenode.
+ *
+ * This creates a new relfilenode, to allow transactional behavior.
+ */
+static void
+SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)
+{
+ SeqTable elm;
+ Relation seqrel;
+ Buffer buf;
+ HeapTupleData seqdatatuple;
+ Form_pg_sequence_data seq;
+ HeapTuple tuple;
+
+ /* open and lock sequence */
+ init_sequence(seq_relid, &elm, &seqrel);
+
+ /* lock page' buffer and read tuple */
+ seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
+
+ /* Copy the existing sequence tuple. */
+ tuple = heap_copytuple(&seqdatatuple);
+
+ /* Now we're done with the old page */
+ UnlockReleaseBuffer(buf);
+
+ /*
+ * Modify the copied tuple to update the sequence state (similar to what
+ * ResetSequence does).
+ */
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+ seq->last_value = last_value;
+ seq->is_called = is_called;
+ seq->log_cnt = log_cnt;
+
+ /*
+ * Create a new storage file for the sequence - this is needed for the
+ * transactional behavior.
+ */
+ RelationSetNewRelfilenode(seqrel, seqrel->rd_rel->relpersistence);
+
+ /*
+ * Ensure sequence's relfrozenxid is at 0, since it won't contain any
+ * unfrozen XIDs. Same with relminmxid, since a sequence will never
+ * contain multixacts.
+ */
+ Assert(seqrel->rd_rel->relfrozenxid == InvalidTransactionId);
+ Assert(seqrel->rd_rel->relminmxid == InvalidMultiXactId);
+
+ /*
+ * Insert the modified tuple into the new storage file. This does all the
+ * necessary WAL-logging etc.
+ */
+ fill_seq_with_data(seqrel, tuple);
+
+ /* Clear local cache so that we don't think we have cached numbers */
+ /* Note that we do not change the currval() state */
+ elm->cached = elm->last;
+
+ relation_close(seqrel, NoLock);
+}
+
+/*
+ * Set a sequence to a specified internal state.
+ *
+ * The change is made transactionally, so that on failure of the current
+ * transaction, the sequence will be restored to its previous state.
+ * We do that by creating a whole new relfilenode for the sequence; so this
+ * works much like the rewriting forms of ALTER TABLE.
+ *
+ * Caller is assumed to have acquired AccessExclusiveLock on the sequence,
+ * which must not be released until end of transaction. Caller is also
+ * responsible for permissions checking.
+ */
+void
+SetSequence(Oid seq_relid, bool transactional, int64 last_value, int64 log_cnt, bool is_called)
+{
+ if (transactional)
+ SetSequence_transactional(seq_relid, last_value, log_cnt, is_called);
+ else
+ SetSequence_non_transactional(seq_relid, last_value, log_cnt, is_called);
+}
+
+/*
* Initialize a sequence's relation with the specified tuple as content
*/
static void