aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c432
1 files changed, 366 insertions, 66 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index df63b90a67a..315bfe7cae2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ bool txn_prepared);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@@ -422,6 +423,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* free data that's contained */
+ if (txn->gid != NULL)
+ {
+ pfree(txn->gid);
+ txn->gid = NULL;
+ }
+
if (txn->tuplecid_hash != NULL)
{
hash_destroy(txn->tuplecid_hash);
@@ -1516,12 +1523,18 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Discard changes from a transaction (and subtransactions), after streaming
- * them. Keep the remaining info - transactions, tuplecids, invalidations and
- * snapshots.
+ * Discard changes from a transaction (and subtransactions), either after
+ * streaming or decoding them at PREPARE. Keep the remaining info -
+ * transactions, tuplecids, invalidations and snapshots.
+ *
+ * We additionally remove tuplecids after decoding the transaction at prepare
+ * time as we only need to perform invalidation at rollback or commit prepared.
+ *
+ * 'txn_prepared' indicates that we have decoded the transaction at prepare
+ * time.
*/
static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
dlist_mutable_iter iter;
@@ -1540,7 +1553,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
- ReorderBufferTruncateTXN(rb, subtxn);
+ ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
}
/* cleanup changes in the txn */
@@ -1574,9 +1587,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
* about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts.
*/
- if ((!txn->toptxn) || (txn->nentries_mem != 0))
+ if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED;
+ if (txn_prepared)
+ {
+ /*
+ * If this is a prepared txn, cleanup the tuplecids we stored for
+ * decoding catalog snapshot access. They are always stored in the
+ * toplevel transaction.
+ */
+ dlist_foreach_modify(iter, &txn->tuplecids)
+ {
+ ReorderBufferChange *change;
+
+ change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ /* Check we're not mixing changes from different transactions. */
+ Assert(change->txn == txn);
+ Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
+ /* Remove the change from its containing list. */
+ dlist_delete(&change->node);
+
+ ReorderBufferReturnChange(rb, change, true);
+ }
+ }
+
/*
* Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
* memory. We could also keep the hash table and update it with new ctid
@@ -1756,9 +1793,10 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
}
/*
- * If the transaction was (partially) streamed, we need to commit it in a
- * 'streamed' way. That is, we first stream the remaining part of the
- * transaction, and then invoke stream_commit message.
+ * If the transaction was (partially) streamed, we need to prepare or commit
+ * it in a 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_prepare or stream_commit message as per
+ * the case.
*/
static void
ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
@@ -1768,29 +1806,49 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferStreamTXN(rb, txn);
- rb->stream_commit(rb, txn, txn->final_lsn);
+ if (rbtxn_prepared(txn))
+ {
+ /*
+ * Note, we send stream prepare even if a concurrent abort is
+ * detected. See DecodePrepare for more information.
+ */
+ rb->stream_prepare(rb, txn, txn->final_lsn);
- ReorderBufferCleanupTXN(rb, txn);
+ /*
+ * This is a PREPARED transaction, part of a two-phase commit. The
+ * full cleanup will happen as part of the COMMIT PREPAREDs, so now
+ * just truncate txn by removing changes and tuple_cids.
+ */
+ ReorderBufferTruncateTXN(rb, txn, true);
+ /* Reset the CheckXidAlive */
+ CheckXidAlive = InvalidTransactionId;
+ }
+ else
+ {
+ rb->stream_commit(rb, txn, txn->final_lsn);
+ ReorderBufferCleanupTXN(rb, txn);
+ }
}
/*
* Set xid to detect concurrent aborts.
*
- * While streaming an in-progress transaction there is a possibility that the
- * (sub)transaction might get aborted concurrently. In such case if the
- * (sub)transaction has catalog update then we might decode the tuple using
- * wrong catalog version. For example, suppose there is one catalog tuple with
- * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple
- * and after that we will have two tuples (xmin: 500, xmax: 501) and
- * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction
- * say 502 updates the same catalog tuple then the first tuple will be changed
- * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode
- * the tuple inserted/updated in 501 after the catalog update, we will see the
- * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
- * consider that the tuple is deleted by xid 502 which is not visible to our
- * snapshot. And when we will try to decode with that catalog tuple, it can
- * lead to a wrong result or a crash. So, it is necessary to detect
- * concurrent aborts to allow streaming of in-progress transactions.
+ * While streaming an in-progress transaction or decoding a prepared
+ * transaction there is a possibility that the (sub)transaction might get
+ * aborted concurrently. In such case if the (sub)transaction has catalog
+ * update then we might decode the tuple using wrong catalog version. For
+ * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
+ * the transaction 501 updates the catalog tuple and after that we will have
+ * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
+ * aborted and some other transaction say 502 updates the same catalog tuple
+ * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
+ * problem is that when we try to decode the tuple inserted/updated in 501
+ * after the catalog update, we will see the catalog tuple with (xmin: 500,
+ * xmax: 502) as visible because it will consider that the tuple is deleted by
+ * xid 502 which is not visible to our snapshot. And when we will try to
+ * decode with that catalog tuple, it can lead to a wrong result or a crash.
+ * So, it is necessary to detect concurrent aborts to allow streaming of
+ * in-progress transactions or decoding of prepared transactions.
*
* For detecting the concurrent abort we set CheckXidAlive to the current
* (sub)transaction's xid for which this change belongs to. And, during
@@ -1799,7 +1857,10 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* and discard the already streamed changes on such an error. We might have
* already streamed some of the changes for the aborted (sub)transaction, but
* that is fine because when we decode the abort we will stream abort message
- * to truncate the changes in the subscriber.
+ * to truncate the changes in the subscriber. Similarly, for prepared
+ * transactions, we stop decoding if concurrent abort is detected and then
+ * rollback the changes when rollback prepared is encountered. See
+ * DecodePrepare.
*/
static inline void
SetupCheckXidLive(TransactionId xid)
@@ -1901,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *specinsert)
{
/* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn);
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn);
@@ -1913,15 +1974,19 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
specinsert = NULL;
}
- /* Stop the stream. */
- rb->stream_stop(rb, txn, last_lsn);
-
- /* Remember the command ID and snapshot for the streaming run */
- ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ /*
+ * For the streaming case, stop the stream and remember the command ID and
+ * snapshot for the streaming run.
+ */
+ if (rbtxn_is_streamed(txn))
+ {
+ rb->stream_stop(rb, txn, last_lsn);
+ ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ }
}
/*
- * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
+ * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
*
* Send data of a transaction (and its subtransactions) to the
* output plugin. We iterate over the top and subtransactions (using a k-way
@@ -1974,9 +2039,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
else
StartTransactionCommand();
- /* We only need to send begin/commit for non-streamed transactions. */
+ /*
+ * We only need to send begin/begin-prepare for non-streamed
+ * transactions.
+ */
if (!streaming)
- rb->begin(rb, txn);
+ {
+ if (rbtxn_prepared(txn))
+ rb->begin_prepare(rb, txn);
+ else
+ rb->begin(rb, txn);
+ }
ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
@@ -2007,8 +2080,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
prev_lsn = change->lsn;
- /* Set the current xid to detect concurrent aborts. */
- if (streaming)
+ /*
+ * Set the current xid to detect concurrent aborts. This is
+ * required for the cases when we decode the changes before the
+ * COMMIT record is processed.
+ */
+ if (streaming || rbtxn_prepared(change->txn))
{
curtxn = change->txn;
SetupCheckXidLive(curtxn->xid);
@@ -2299,7 +2376,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
}
else
- rb->commit(rb, txn, commit_lsn);
+ {
+ /*
+ * Call either PREPARE (for two-phase transactions) or COMMIT (for
+ * regular ones).
+ */
+ if (rbtxn_prepared(txn))
+ rb->prepare(rb, txn, commit_lsn);
+ else
+ rb->commit(rb, txn, commit_lsn);
+ }
/* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -2333,15 +2419,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
RollbackAndReleaseCurrentSubTransaction();
/*
- * If we are streaming the in-progress transaction then discard the
- * changes that we just streamed, and mark the transactions as
- * streamed (if they contained changes). Otherwise, remove all the
- * changes and deallocate the ReorderBufferTXN.
+ * We are here due to one of the four reasons: 1. Decoding an
+ * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
+ * prepared txn that was (partially) streamed. 4. Decoding a committed
+ * txn.
+ *
+ * For 1, we allow truncation of txn data by removing the changes
+ * already streamed but still keeping other things like invalidations,
+ * snapshot, and tuplecids. For 2 and 3, we indicate
+ * ReorderBufferTruncateTXN to do more elaborate truncation of txn
+ * data as the entire transaction has been decoded except for commit.
+ * For 4, as the entire txn has been decoded, we can fully clean up
+ * the TXN reorder buffer.
*/
- if (streaming)
+ if (streaming || rbtxn_prepared(txn))
{
- ReorderBufferTruncateTXN(rb, txn);
-
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2374,17 +2467,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
/*
* The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
- * abort of the (sub)transaction we are streaming. We need to do the
- * cleanup and return gracefully on this error, see SetupCheckXidLive.
+ * abort of the (sub)transaction we are streaming or preparing. We
+ * need to do the cleanup and return gracefully on this error, see
+ * SetupCheckXidLive.
*/
if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
{
/*
- * This error can only occur when we are sending the data in
- * streaming mode and the streaming is not finished yet.
+ * This error can occur either when we are sending the data in
+ * streaming mode and the streaming is not finished yet or when we
+ * are sending the data out on a PREPARE during a two-phase
+ * commit.
*/
- Assert(streaming);
- Assert(stream_started);
+ Assert(streaming || rbtxn_prepared(txn));
+ Assert(stream_started || rbtxn_prepared(txn));
/* Cleanup the temporary error state. */
FlushErrorState();
@@ -2414,26 +2510,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* ReorderBufferCommitChild(), even if previously assigned to the toplevel
* transaction with ReorderBufferAssignChild.
*
- * This interface is called once a toplevel commit is read for both streamed
- * as well as non-streamed transactions.
+ * This interface is called once a prepare or toplevel commit is read for both
+ * streamed as well as non-streamed transactions.
*/
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferReplay(ReorderBufferTXN *txn,
+ ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn)
{
- ReorderBufferTXN *txn;
Snapshot snapshot_now;
CommandId command_id = FirstCommandId;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
-
- /* unknown transaction, nothing to replay */
- if (txn == NULL)
- return;
-
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
@@ -2463,7 +2552,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (txn->base_snapshot == NULL)
{
Assert(txn->ninvalidations == 0);
- ReorderBufferCleanupTXN(rb, txn);
+
+ /*
+ * Removing this txn before a commit might result in the computation
+ * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
+ */
+ if (!rbtxn_prepared(txn))
+ ReorderBufferCleanupTXN(rb, txn);
return;
}
@@ -2475,6 +2570,178 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
}
/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferReplay().
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
+ origin_id, origin_lsn);
+}
+
+/*
+ * Record the prepare information for a transaction.
+ */
+bool
+ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
+ TimestampTz prepare_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return false;
+
+ /*
+ * Remember the prepare information to be later used by commit prepared in
+ * case we skip doing prepare.
+ */
+ txn->final_lsn = prepare_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = prepare_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ return true;
+}
+
+/* Remember that we have skipped prepare */
+void
+ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
+}
+
+/*
+ * Prepare a two-phase transaction.
+ *
+ * See comments for ReorderBufferReplay().
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+ char *gid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= RBTXN_PREPARE;
+ txn->gid = pstrdup(gid);
+
+ /* The prepare info must have been updated in txn by now. */
+ Assert(txn->final_lsn != InvalidXLogRecPtr);
+
+ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
+ txn->commit_time, txn->origin_id, txn->origin_lsn);
+}
+
+/*
+ * This is used to handle COMMIT/ROLLBACK PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time, RepOriginId origin_id,
+ XLogRecPtr origin_lsn, char *gid, bool is_commit)
+{
+ ReorderBufferTXN *txn;
+ XLogRecPtr prepare_end_lsn;
+ TimestampTz prepare_time;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return;
+
+ /*
+ * By this time the txn has the prepare record information, remember it to
+ * be later used for rollback.
+ */
+ prepare_end_lsn = txn->end_lsn;
+ prepare_time = txn->commit_time;
+
+ /* add the gid in the txn */
+ txn->gid = pstrdup(gid);
+
+ /*
+ * It is possible that this transaction is not decoded at prepare time
+ * either because by that time we didn't have a consistent snapshot or it
+ * was decoded earlier but we have restarted. We can't distinguish between
+ * those two cases so we send the prepare in both the cases and let
+ * downstream decide whether to process or skip it. We don't need to
+ * decode the xact for aborts if it is not done already.
+ */
+ if (!rbtxn_prepared(txn) && is_commit)
+ {
+ txn->txn_flags |= RBTXN_PREPARE;
+
+ /*
+ * The prepare info must have been updated in txn even if we skip
+ * prepare.
+ */
+ Assert(txn->final_lsn != InvalidXLogRecPtr);
+
+ /*
+ * By this time the txn has the prepare record information and it is
+ * important to use that so that downstream gets the accurate
+ * information. If we instead passed the commit information here,
+ * then downstream could behave as if it had already replayed commit
+ * prepared after the restart.
+ */
+ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
+ txn->commit_time, txn->origin_id, txn->origin_lsn);
+ }
+
+ txn->final_lsn = commit_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ if (is_commit)
+ rb->commit_prepared(rb, txn, commit_lsn);
+ else
+ rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
+
+ /* cleanup: make sure there's no cache pollution */
+ ReorderBufferExecuteInvalidations(txn->ninvalidations,
+ txn->invalidations);
+ ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
* Abort a transaction that possibly has previous changes. Needs to be first
* called for subtransactions and then for the toplevel xid.
*
@@ -2606,6 +2873,39 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
}
/*
+ * Invalidate cache for those transactions that need to be skipped just in case
+ * catalogs were manipulated as part of the transaction.
+ *
+ * Note that this is a special-purpose function for prepared transactions where
+ * we don't want to clean up the TXN even when we decide to skip it. See
+ * DecodePrepare.
+ */
+void
+ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown, nothing to do */
+ if (txn == NULL)
+ return;
+
+ /*
+ * Process cache invalidation messages if there are any. Even if we're not
+ * interested in the transaction's contents, it could have manipulated the
+ * catalog and we need to update the caches according to that.
+ */
+ if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
+ else
+ Assert(txn->ninvalidations == 0);
+}
+
+
+/*
* Execute invalidations happening outside the context of a decoded
* transaction. That currently happens either for xid-less commits
* (cf. RecordTransactionCommit()) or for invalidations in uninteresting