aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-01-04 08:34:50 +0530
committerAmit Kapila <akapila@postgresql.org>2021-01-04 08:34:50 +0530
commita271a1b50e9bec07e2ef3a05e38e7285113e4ce6 (patch)
treea3cd4b3e22169f548a6c92615f8e713f7001e30f /src/backend/replication/logical/reorderbuffer.c
parentca3b37487be333a1d241dab1bbdd17a211a88f43 (diff)
downloadpostgresql-a271a1b50e9bec07e2ef3a05e38e7285113e4ce6.tar.gz
postgresql-a271a1b50e9bec07e2ef3a05e38e7285113e4ce6.zip
Allow decoding at prepare time in ReorderBuffer.
This patch allows PREPARE-time decoding of two-phase transactions (if the output plugin supports this capability), in which case the transactions are replayed at PREPARE and then committed later when COMMIT PREPARED arrives. Now that we decode the changes before the commit, the concurrent aborts may cause failures when the output plugin consults catalogs (both system and user-defined). We detect such failures with a special sqlerrcode ERRCODE_TRANSACTION_ROLLBACK introduced by commit 7259736a6e and stop decoding the remaining changes. Then we rollback the changes when rollback prepared is encountered. Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, Arseny Sher, and Dilip Kumar Tested-by: Takamichi Osumi Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c432
1 files changed, 366 insertions, 66 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index df63b90a67a..315bfe7cae2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ bool txn_prepared);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@@ -422,6 +423,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* free data that's contained */
+ if (txn->gid != NULL)
+ {
+ pfree(txn->gid);
+ txn->gid = NULL;
+ }
+
if (txn->tuplecid_hash != NULL)
{
hash_destroy(txn->tuplecid_hash);
@@ -1516,12 +1523,18 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Discard changes from a transaction (and subtransactions), after streaming
- * them. Keep the remaining info - transactions, tuplecids, invalidations and
- * snapshots.
+ * Discard changes from a transaction (and subtransactions), either after
+ * streaming or decoding them at PREPARE. Keep the remaining info -
+ * transactions, tuplecids, invalidations and snapshots.
+ *
+ * We additionaly remove tuplecids after decoding the transaction at prepare
+ * time as we only need to perform invalidation at rollback or commit prepared.
+ *
+ * 'txn_prepared' indicates that we have decoded the transaction at prepare
+ * time.
*/
static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
dlist_mutable_iter iter;
@@ -1540,7 +1553,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
- ReorderBufferTruncateTXN(rb, subtxn);
+ ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
}
/* cleanup changes in the txn */
@@ -1574,9 +1587,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
* about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts.
*/
- if ((!txn->toptxn) || (txn->nentries_mem != 0))
+ if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED;
+ if (txn_prepared)
+ {
+ /*
+ * If this is a prepared txn, cleanup the tuplecids we stored for
+ * decoding catalog snapshot access. They are always stored in the
+ * toplevel transaction.
+ */
+ dlist_foreach_modify(iter, &txn->tuplecids)
+ {
+ ReorderBufferChange *change;
+
+ change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ /* Check we're not mixing changes from different transactions. */
+ Assert(change->txn == txn);
+ Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
+ /* Remove the change from its containing list. */
+ dlist_delete(&change->node);
+
+ ReorderBufferReturnChange(rb, change, true);
+ }
+ }
+
/*
* Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
* memory. We could also keep the hash table and update it with new ctid
@@ -1756,9 +1793,10 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
}
/*
- * If the transaction was (partially) streamed, we need to commit it in a
- * 'streamed' way. That is, we first stream the remaining part of the
- * transaction, and then invoke stream_commit message.
+ * If the transaction was (partially) streamed, we need to prepare or commit
+ * it in a 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_prepare or stream_commit message as per
+ * the case.
*/
static void
ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
@@ -1768,29 +1806,49 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferStreamTXN(rb, txn);
- rb->stream_commit(rb, txn, txn->final_lsn);
+ if (rbtxn_prepared(txn))
+ {
+ /*
+ * Note, we send stream prepare even if a concurrent abort is
+ * detected. See DecodePrepare for more information.
+ */
+ rb->stream_prepare(rb, txn, txn->final_lsn);
- ReorderBufferCleanupTXN(rb, txn);
+ /*
+ * This is a PREPARED transaction, part of a two-phase commit. The
+ * full cleanup will happen as part of the COMMIT PREPAREDs, so now
+ * just truncate txn by removing changes and tuple_cids.
+ */
+ ReorderBufferTruncateTXN(rb, txn, true);
+ /* Reset the CheckXidAlive */
+ CheckXidAlive = InvalidTransactionId;
+ }
+ else
+ {
+ rb->stream_commit(rb, txn, txn->final_lsn);
+ ReorderBufferCleanupTXN(rb, txn);
+ }
}
/*
* Set xid to detect concurrent aborts.
*
- * While streaming an in-progress transaction there is a possibility that the
- * (sub)transaction might get aborted concurrently. In such case if the
- * (sub)transaction has catalog update then we might decode the tuple using
- * wrong catalog version. For example, suppose there is one catalog tuple with
- * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple
- * and after that we will have two tuples (xmin: 500, xmax: 501) and
- * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction
- * say 502 updates the same catalog tuple then the first tuple will be changed
- * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode
- * the tuple inserted/updated in 501 after the catalog update, we will see the
- * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
- * consider that the tuple is deleted by xid 502 which is not visible to our
- * snapshot. And when we will try to decode with that catalog tuple, it can
- * lead to a wrong result or a crash. So, it is necessary to detect
- * concurrent aborts to allow streaming of in-progress transactions.
+ * While streaming an in-progress transaction or decoding a prepared
+ * transaction there is a possibility that the (sub)transaction might get
+ * aborted concurrently. In such case if the (sub)transaction has catalog
+ * update then we might decode the tuple using wrong catalog version. For
+ * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
+ * the transaction 501 updates the catalog tuple and after that we will have
+ * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
+ * aborted and some other transaction say 502 updates the same catalog tuple
+ * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
+ * problem is that when we try to decode the tuple inserted/updated in 501
+ * after the catalog update, we will see the catalog tuple with (xmin: 500,
+ * xmax: 502) as visible because it will consider that the tuple is deleted by
+ * xid 502 which is not visible to our snapshot. And when we will try to
+ * decode with that catalog tuple, it can lead to a wrong result or a crash.
+ * So, it is necessary to detect concurrent aborts to allow streaming of
+ * in-progress transactions or decoding of prepared transactions.
*
* For detecting the concurrent abort we set CheckXidAlive to the current
* (sub)transaction's xid for which this change belongs to. And, during
@@ -1799,7 +1857,10 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* and discard the already streamed changes on such an error. We might have
* already streamed some of the changes for the aborted (sub)transaction, but
* that is fine because when we decode the abort we will stream abort message
- * to truncate the changes in the subscriber.
+ * to truncate the changes in the subscriber. Similarly, for prepared
+ * transactions, we stop decoding if concurrent abort is detected and then
+ * rollback the changes when rollback prepared is encountered. See
+ * DecodePreare.
*/
static inline void
SetupCheckXidLive(TransactionId xid)
@@ -1901,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *specinsert)
{
/* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn);
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn);
@@ -1913,15 +1974,19 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
specinsert = NULL;
}
- /* Stop the stream. */
- rb->stream_stop(rb, txn, last_lsn);
-
- /* Remember the command ID and snapshot for the streaming run */
- ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ /*
+ * For the streaming case, stop the stream and remember the command ID and
+ * snapshot for the streaming run.
+ */
+ if (rbtxn_is_streamed(txn))
+ {
+ rb->stream_stop(rb, txn, last_lsn);
+ ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ }
}
/*
- * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
+ * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
*
* Send data of a transaction (and its subtransactions) to the
* output plugin. We iterate over the top and subtransactions (using a k-way
@@ -1974,9 +2039,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
else
StartTransactionCommand();
- /* We only need to send begin/commit for non-streamed transactions. */
+ /*
+ * We only need to send begin/begin-prepare for non-streamed
+ * transactions.
+ */
if (!streaming)
- rb->begin(rb, txn);
+ {
+ if (rbtxn_prepared(txn))
+ rb->begin_prepare(rb, txn);
+ else
+ rb->begin(rb, txn);
+ }
ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
@@ -2007,8 +2080,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
prev_lsn = change->lsn;
- /* Set the current xid to detect concurrent aborts. */
- if (streaming)
+ /*
+ * Set the current xid to detect concurrent aborts. This is
+ * required for the cases when we decode the changes before the
+ * COMMIT record is processed.
+ */
+ if (streaming || rbtxn_prepared(change->txn))
{
curtxn = change->txn;
SetupCheckXidLive(curtxn->xid);
@@ -2299,7 +2376,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
}
else
- rb->commit(rb, txn, commit_lsn);
+ {
+ /*
+ * Call either PREPARE (for two-phase transactions) or COMMIT (for
+ * regular ones).
+ */
+ if (rbtxn_prepared(txn))
+ rb->prepare(rb, txn, commit_lsn);
+ else
+ rb->commit(rb, txn, commit_lsn);
+ }
/* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -2333,15 +2419,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
RollbackAndReleaseCurrentSubTransaction();
/*
- * If we are streaming the in-progress transaction then discard the
- * changes that we just streamed, and mark the transactions as
- * streamed (if they contained changes). Otherwise, remove all the
- * changes and deallocate the ReorderBufferTXN.
+ * We are here due to one of the four reasons: 1. Decoding an
+ * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
+ * prepared txn that was (partially) streamed. 4. Decoding a committed
+ * txn.
+ *
+ * For 1, we allow truncation of txn data by removing the changes
+ * already streamed but still keeping other things like invalidations,
+ * snapshot, and tuplecids. For 2 and 3, we indicate
+ * ReorderBufferTruncateTXN to do more elaborate truncation of txn
+ * data as the entire transaction has been decoded except for commit.
+ * For 4, as the entire txn has been decoded, we can fully clean up
+ * the TXN reorder buffer.
*/
- if (streaming)
+ if (streaming || rbtxn_prepared(txn))
{
- ReorderBufferTruncateTXN(rb, txn);
-
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2374,17 +2467,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
/*
* The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
- * abort of the (sub)transaction we are streaming. We need to do the
- * cleanup and return gracefully on this error, see SetupCheckXidLive.
+ * abort of the (sub)transaction we are streaming or preparing. We
+ * need to do the cleanup and return gracefully on this error, see
+ * SetupCheckXidLive.
*/
if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
{
/*
- * This error can only occur when we are sending the data in
- * streaming mode and the streaming is not finished yet.
+ * This error can occur either when we are sending the data in
+ * streaming mode and the streaming is not finished yet or when we
+ * are sending the data out on a PREPARE during a two-phase
+ * commit.
*/
- Assert(streaming);
- Assert(stream_started);
+ Assert(streaming || rbtxn_prepared(txn));
+ Assert(stream_started || rbtxn_prepared(txn));
/* Cleanup the temporary error state. */
FlushErrorState();
@@ -2414,26 +2510,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* ReorderBufferCommitChild(), even if previously assigned to the toplevel
* transaction with ReorderBufferAssignChild.
*
- * This interface is called once a toplevel commit is read for both streamed
- * as well as non-streamed transactions.
+ * This interface is called once a prepare or toplevel commit is read for both
+ * streamed as well as non-streamed transactions.
*/
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferReplay(ReorderBufferTXN *txn,
+ ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn)
{
- ReorderBufferTXN *txn;
Snapshot snapshot_now;
CommandId command_id = FirstCommandId;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
-
- /* unknown transaction, nothing to replay */
- if (txn == NULL)
- return;
-
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
@@ -2463,7 +2552,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (txn->base_snapshot == NULL)
{
Assert(txn->ninvalidations == 0);
- ReorderBufferCleanupTXN(rb, txn);
+
+ /*
+ * Removing this txn before a commit might result in the computation
+ * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
+ */
+ if (!rbtxn_prepared(txn))
+ ReorderBufferCleanupTXN(rb, txn);
return;
}
@@ -2475,6 +2570,178 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
}
/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferReplay().
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
+ origin_id, origin_lsn);
+}
+
+/*
+ * Record the prepare information for a transaction.
+ */
+bool
+ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
+ TimestampTz prepare_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return false;
+
+ /*
+ * Remember the prepare information to be later used by commit prepared in
+ * case we skip doing prepare.
+ */
+ txn->final_lsn = prepare_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = prepare_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ return true;
+}
+
+/* Remember that we have skipped prepare */
+void
+ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
+}
+
+/*
+ * Prepare a two-phase transaction.
+ *
+ * See comments for ReorderBufferReplay().
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+ char *gid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= RBTXN_PREPARE;
+ txn->gid = pstrdup(gid);
+
+ /* The prepare info must have been updated in txn by now. */
+ Assert(txn->final_lsn != InvalidXLogRecPtr);
+
+ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
+ txn->commit_time, txn->origin_id, txn->origin_lsn);
+}
+
+/*
+ * This is used to handle COMMIT/ROLLBACK PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time, RepOriginId origin_id,
+ XLogRecPtr origin_lsn, char *gid, bool is_commit)
+{
+ ReorderBufferTXN *txn;
+ XLogRecPtr prepare_end_lsn;
+ TimestampTz prepare_time;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return;
+
+ /*
+ * By this time the txn has the prepare record information, remember it to
+ * be later used for rollback.
+ */
+ prepare_end_lsn = txn->end_lsn;
+ prepare_time = txn->commit_time;
+
+ /* add the gid in the txn */
+ txn->gid = pstrdup(gid);
+
+ /*
+ * It is possible that this transaction is not decoded at prepare time
+ * either because by that time we didn't have a consistent snapshot or it
+ * was decoded earlier but we have restarted. We can't distinguish between
+ * those two cases so we send the prepare in both the cases and let
+ * downstream decide whether to process or skip it. We don't need to
+ * decode the xact for aborts if it is not done already.
+ */
+ if (!rbtxn_prepared(txn) && is_commit)
+ {
+ txn->txn_flags |= RBTXN_PREPARE;
+
+ /*
+ * The prepare info must have been updated in txn even if we skip
+ * prepare.
+ */
+ Assert(txn->final_lsn != InvalidXLogRecPtr);
+
+ /*
+ * By this time the txn has the prepare record information and it is
+ * important to use that so that downstream gets the accurate
+ * information. If instead, we have passed commit information here
+ * then downstream can behave as it has already replayed commit
+ * prepared after the restart.
+ */
+ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
+ txn->commit_time, txn->origin_id, txn->origin_lsn);
+ }
+
+ txn->final_lsn = commit_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ if (is_commit)
+ rb->commit_prepared(rb, txn, commit_lsn);
+ else
+ rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
+
+ /* cleanup: make sure there's no cache pollution */
+ ReorderBufferExecuteInvalidations(txn->ninvalidations,
+ txn->invalidations);
+ ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
* Abort a transaction that possibly has previous changes. Needs to be first
* called for subtransactions and then for the toplevel xid.
*
@@ -2606,6 +2873,39 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
}
/*
+ * Invalidate cache for those transactions that need to be skipped just in case
+ * catalogs were manipulated as part of the transaction.
+ *
+ * Note that this is a special-purpose function for prepared transactions where
+ * we don't want to clean up the TXN even when we decide to skip it. See
+ * DecodePrepare.
+ */
+void
+ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown, nothing to do */
+ if (txn == NULL)
+ return;
+
+ /*
+ * Process cache invalidation messages if there are any. Even if we're not
+ * interested in the transaction's contents, it could have manipulated the
+ * catalog and we need to update the caches according to that.
+ */
+ if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
+ else
+ Assert(txn->ninvalidations == 0);
+}
+
+
+/*
* Execute invalidations happening outside the context of a decoded
* transaction. That currently happens either for xid-less commits
* (cf. RecordTransactionCommit()) or for invalidations in uninteresting