diff options
author | Amit Kapila <akapila@postgresql.org> | 2020-07-23 08:19:07 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2020-07-23 08:34:48 +0530 |
commit | c55040ccd017962b7b8d3fbcdc184aa90c722a21 (patch) | |
tree | 8d15aa2f834cf0d0ac59391ee6f202577eeaf058 /src/backend/replication/logical/reorderbuffer.c | |
parent | 38f60f174e3279069b5547b5f4015eb4a8492037 (diff) | |
download | postgresql-c55040ccd017962b7b8d3fbcdc184aa90c722a21.tar.gz postgresql-c55040ccd017962b7b8d3fbcdc184aa90c722a21.zip |
WAL Log invalidations at command end with wal_level=logical.
When wal_level=logical, write invalidations at command end into WAL so
that decoding can use this information.
This patch is required to allow the streaming of in-progress transactions
in logical decoding. The actual work to allow streaming will be committed
as a separate patch.
We still add the invalidations to the cache and write them to WAL at
commit time in RecordTransactionCommit(). This uses the existing
XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource
manager (see LogStandbyInvalidations for details).
So existing code relying on those invalidations (e.g. redo) does not need
to be changed.
The invalidations written at command end uses a new xlog record type
XLOG_XACT_INVALIDATIONS, from RM_XACT_ID resource manager. See
LogLogicalInvalidations for details.
These new xlog records are ignored by existing redo procedures, which
still rely on the invalidations written to commit records.
The invalidations are decoded and accumulated in top-transaction, and then
executed during replay. This obviates the need to decode the
invalidations as part of a commit record.
Bump XLOG_PAGE_MAGIC, since this introduces XLOG_XACT_INVALIDATIONS.
Author: Dilip Kumar, Tomas Vondra, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 52 |
1 files changed, 43 insertions, 9 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 449327a147f..ce6e62152f0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -856,6 +856,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, subtxn->toplevel_xid = xid; Assert(subtxn->nsubtxns == 0); + /* set the reference to top-level transaction */ + subtxn->toptxn = txn; + /* add to subtransaction list */ dlist_push_tail(&txn->subtxns, &subtxn->node); txn->nsubtxns++; @@ -2201,7 +2204,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, /* * Setup the invalidation of the toplevel transaction. * - * This needs to be done before ReorderBufferCommit is called! + * This needs to be called for each XLOG_XACT_INVALIDATIONS message and + * accumulates all the invalidation messages in the toplevel transaction. + * This is required because in some cases where we skip processing the + * transaction (see ReorderBufferForget), we need to execute all the + * invalidations together. */ void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, @@ -2212,17 +2219,35 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - if (txn->ninvalidations != 0) - elog(ERROR, "only ever add one set of invalidations"); + /* + * We collect all the invalidations under the top transaction so that we + * can execute them all together. + */ + if (txn->toptxn) + txn = txn->toptxn; Assert(nmsgs > 0); - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - MemoryContextAlloc(rb->context, - sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); + /* Accumulate invalidations. */ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + MemoryContextAlloc(rb->context, + sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * + (txn->ninvalidations + nmsgs)); + + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } } /* @@ -2250,6 +2275,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + + /* + * Mark top-level transaction as having catalog changes too if one of its + * children has so that the ReorderBufferBuildTupleCidHash can + * conveniently check just top-level transaction and decide whether to + * build the hash table or not. + */ + if (txn->toptxn != NULL) + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; } /* |