diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 71 |
1 files changed, 66 insertions, 5 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 88a37fde722..1c21a1d14b6 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -349,6 +349,8 @@ ReorderBufferAllocate(void) buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; + buffer->catchange_ntxns = 0; + buffer->outbuf = NULL; buffer->outbufsize = 0; buffer->size = 0; @@ -366,6 +368,7 @@ ReorderBufferAllocate(void) dlist_init(&buffer->toplevel_by_lsn); dlist_init(&buffer->txns_by_base_snapshot_lsn); + dlist_init(&buffer->catchange_txns); /* * Ensure there's no stale data from prior uses of this slot, in case some @@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Remove TXN from its containing list. + * Remove TXN from its containing lists. * * Note: if txn is known as subxact, we are deleting the TXN from its * parent's list of known subxacts; this leaves the parent's nsubxacts * count too high, but we don't care. Otherwise, we are deleting the TXN - * from the LSN-ordered list of toplevel TXNs. + * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the + * list of catalog modifying transactions as well. */ dlist_delete(&txn->node); + if (rbtxn_has_catalog_changes(txn)) + { + dlist_delete(&txn->catchange_node); + rb->catchange_ntxns--; + + Assert(rb->catchange_ntxns >= 0); + } /* now remove reference from buffer */ hash_search(rb->by_txn, @@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + if (!rbtxn_has_catalog_changes(txn)) + { + txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + dlist_push_tail(&rb->catchange_txns, &txn->catchange_node); + rb->catchange_ntxns++; + } /* * Mark top-level transaction as having catalog changes too if one of its @@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, * conveniently check just top-level transaction and decide whether to * build the hash table or not. */ - if (txn->toptxn != NULL) - txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + toptxn = txn->toptxn; + if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn)) + { + toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node); + rb->catchange_ntxns++; + } +} + +/* + * Return palloc'ed array of the transactions that have changed catalogs. + * The returned array is sorted in xidComparator order. + * + * The caller must free the returned array when done with it. + */ +TransactionId * +ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb) +{ + dlist_iter iter; + TransactionId *xids = NULL; + size_t xcnt = 0; + + /* Quick return if the list is empty */ + if (rb->catchange_ntxns == 0) + { + Assert(dlist_is_empty(&rb->catchange_txns)); + return NULL; + } + + /* Initialize XID array */ + xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns); + dlist_foreach(iter, &rb->catchange_txns) + { + ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, + catchange_node, + iter.cur); + + Assert(rbtxn_has_catalog_changes(txn)); + + xids[xcnt++] = txn->xid; + } + + qsort(xids, xcnt, sizeof(TransactionId), xidComparator); + + Assert(xcnt == rb->catchange_ntxns); + return xids; } /* |