aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c71
1 files changed, 66 insertions, 5 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 88a37fde722..1c21a1d14b6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -349,6 +349,8 @@ ReorderBufferAllocate(void)
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
+ buffer->catchange_ntxns = 0;
+
buffer->outbuf = NULL;
buffer->outbufsize = 0;
buffer->size = 0;
@@ -366,6 +368,7 @@ ReorderBufferAllocate(void)
dlist_init(&buffer->toplevel_by_lsn);
dlist_init(&buffer->txns_by_base_snapshot_lsn);
+ dlist_init(&buffer->catchange_txns);
/*
* Ensure there's no stale data from prior uses of this slot, in case some
@@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Remove TXN from its containing list.
+ * Remove TXN from its containing lists.
*
* Note: if txn is known as subxact, we are deleting the TXN from its
* parent's list of known subxacts; this leaves the parent's nsubxacts
* count too high, but we don't care. Otherwise, we are deleting the TXN
- * from the LSN-ordered list of toplevel TXNs.
+ * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
+ * list of catalog modifying transactions as well.
*/
dlist_delete(&txn->node);
+ if (rbtxn_has_catalog_changes(txn))
+ {
+ dlist_delete(&txn->catchange_node);
+ rb->catchange_ntxns--;
+
+ Assert(rb->catchange_ntxns >= 0);
+ }
/* now remove reference from buffer */
hash_search(rb->by_txn,
@@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn)
{
ReorderBufferTXN *txn;
+ ReorderBufferTXN *toptxn;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ if (!rbtxn_has_catalog_changes(txn))
+ {
+ txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ dlist_push_tail(&rb->catchange_txns, &txn->catchange_node);
+ rb->catchange_ntxns++;
+ }
/*
* Mark top-level transaction as having catalog changes too if one of its
@@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
* conveniently check just top-level transaction and decide whether to
* build the hash table or not.
*/
- if (txn->toptxn != NULL)
- txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ toptxn = txn->toptxn;
+ if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
+ {
+ toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+ rb->catchange_ntxns++;
+ }
+}
+
+/*
+ * Return palloc'ed array of the transactions that have changed catalogs.
+ * The returned array is sorted in xidComparator order.
+ *
+ * The caller must free the returned array when done with it.
+ */
+TransactionId *
+ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
+{
+ dlist_iter iter;
+ TransactionId *xids = NULL;
+ size_t xcnt = 0;
+
+ /* Quick return if the list is empty */
+ if (rb->catchange_ntxns == 0)
+ {
+ Assert(dlist_is_empty(&rb->catchange_txns));
+ return NULL;
+ }
+
+ /* Initialize XID array */
+ xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns);
+ dlist_foreach(iter, &rb->catchange_txns)
+ {
+ ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN,
+ catchange_node,
+ iter.cur);
+
+ Assert(rbtxn_has_catalog_changes(txn));
+
+ xids[xcnt++] = txn->xid;
+ }
+
+ qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
+
+ Assert(xcnt == rb->catchange_ntxns);
+ return xids;
}
/*