aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c286
-rw-r--r--src/backend/replication/logical/logical.c9
-rw-r--r--src/backend/replication/logical/reorderbuffer.c432
-rw-r--r--src/backend/replication/logical/snapbuild.c7
-rw-r--r--src/include/replication/reorderbuffer.h33
5 files changed, 653 insertions, 114 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1887ba79440..23ab3cf6052 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -67,13 +67,24 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_commit *parsed, TransactionId xid);
+ xl_xact_parsed_commit *parsed, TransactionId xid,
+ bool two_phase);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_abort *parsed, TransactionId xid);
+ xl_xact_parsed_abort *parsed, TransactionId xid,
+ bool two_phase);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed);
+
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+/* helper functions for decoding transactions */
+static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
+ XLogRecordBuffer *buf, Oid dbId,
+ RepOriginId origin_id);
+
/*
* Take every XLogReadRecord()ed record and perform the actions required to
* decode it using the output plugin already setup in the logical decoding
@@ -244,6 +255,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed;
TransactionId xid;
+ bool two_phase = false;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
@@ -253,7 +265,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
xid = parsed.twophase_xid;
- DecodeCommit(ctx, buf, &parsed, xid);
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (info == XLOG_XACT_COMMIT_PREPARED)
+ two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+
+ DecodeCommit(ctx, buf, &parsed, xid, two_phase);
break;
}
case XLOG_XACT_ABORT:
@@ -262,6 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_abort *xlrec;
xl_xact_parsed_abort parsed;
TransactionId xid;
+ bool two_phase = false;
xlrec = (xl_xact_abort *) XLogRecGetData(r);
ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
@@ -271,7 +292,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
xid = parsed.twophase_xid;
- DecodeAbort(ctx, buf, &parsed, xid);
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (info == XLOG_XACT_ABORT_PREPARED)
+ two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+
+ DecodeAbort(ctx, buf, &parsed, xid, two_phase);
break;
}
case XLOG_XACT_ASSIGNMENT:
@@ -312,17 +341,30 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
break;
case XLOG_XACT_PREPARE:
+ {
+ xl_xact_parsed_prepare parsed;
+ xl_xact_prepare *xlrec;
- /*
- * Currently decoding ignores PREPARE TRANSACTION and will just
- * decode the transaction when the COMMIT PREPARED is sent or
- * throw away the transaction's contents when a ROLLBACK PREPARED
- * is received. In the future we could add code to expose prepared
- * transactions in the changestream allowing for a kind of
- * distributed 2PC.
- */
- ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
- break;
+ /* ok, parse it */
+ xlrec = (xl_xact_prepare *) XLogRecGetData(r);
+ ParsePrepareRecord(XLogRecGetInfo(buf->record),
+ xlrec, &parsed);
+
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (FilterPrepare(ctx, parsed.twophase_gid))
+ {
+ ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+ buf->origptr);
+ break;
+ }
+
+ DecodePrepare(ctx, buf, &parsed);
+ break;
+ }
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
}
@@ -520,6 +562,32 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+/*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as a regular commit later.
+ */
+static inline bool
+FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+{
+ /*
+ * Skip if decoding of two-phase transactions at PREPARE time is not
+ * enabled. In that case, all two-phase transactions are considered
+ * filtered out and will be applied as regular transactions at COMMIT
+ * PREPARED.
+ */
+ if (!ctx->twophase)
+ return true;
+
+ /*
+ * The filter_prepare callback is optional. When not supplied, all
+ * prepared transactions should go through.
+ */
+ if (ctx->callbacks.filter_prepare_cb == NULL)
+ return false;
+
+ return filter_prepare_cb_wrapper(ctx, gid);
+}
+
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
@@ -582,10 +650,15 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Consolidated commit record handling between the different form of commit
* records.
+ *
+ * 'two_phase' indicates that caller wants to process the transaction in two
+ * phases, first process prepare if not already done and then process
+ * commit_prepared.
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_commit *parsed, TransactionId xid)
+ xl_xact_parsed_commit *parsed, TransactionId xid,
+ bool two_phase)
{
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
TimestampTz commit_time = parsed->xact_time;
@@ -606,15 +679,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* the reorderbuffer to forget the content of the (sub-)transactions
* if not.
*
- * There can be several reasons we might not be interested in this
- * transaction:
- * 1) We might not be interested in decoding transactions up to this
- * LSN. This can happen because we previously decoded it and now just
- * are restarting or if we haven't assembled a consistent snapshot yet.
- * 2) The transaction happened in another database.
- * 3) The output plugin is not interested in the origin.
- * 4) We are doing fast-forwarding
- *
* We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if
* we're just skipping over the transaction because currently we only do
@@ -627,9 +691,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* relevant syscaches.
* ---
*/
- if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
- ctx->fast_forward || FilterByOrigin(ctx, origin_id))
+ if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
{
for (i = 0; i < parsed->nsubxacts; i++)
{
@@ -647,34 +709,163 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
buf->origptr, buf->endptr);
}
+ /*
+ * Send the final commit record if the transaction data is already
+ * decoded, otherwise, process the entire transaction.
+ */
+ if (two_phase)
+ {
+ ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn,
+ parsed->twophase_gid, true);
+ }
+ else
+ {
+ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn);
+ }
+
+ /*
+ * Update the decoding stats at transaction prepare/commit/abort. It is
+ * not clear that sending more or less frequently than this would be
+ * better.
+ */
+ UpdateDecodingStats(ctx);
+}
+
+/*
+ * Decode PREPARE record. Similar logic as in DecodeCommit.
+ *
+ * Note that we don't skip prepare even if we have detected a concurrent abort
+ * because it is quite possible that we had already sent some changes before we
+ * detect abort in which case we need to abort those changes in the subscriber.
+ * To abort such changes, we do send the prepare and then the rollback prepared
+ * which is what happened on the publisher-side as well. Now, we can invent a
+ * new abort API wherein in such cases we send abort and skip sending prepared
+ * and rollback prepared but then it is not that straightforward because we
+ * might have streamed this transaction by that time in which case it is
+ * handled when the rollback is encountered. It is not impossible to optimize
+ * the concurrent abort case but it can introduce design complexity w.r.t
+ * handling different cases so leaving it for now as it doesn't seem worth it.
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogRecPtr origin_lsn = parsed->origin_lsn;
+ TimestampTz prepare_time = parsed->xact_time;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
+ int i;
+ TransactionId xid = parsed->twophase_xid;
+
+ if (parsed->origin_timestamp != 0)
+ prepare_time = parsed->origin_timestamp;
+
+ /*
+ * Remember the prepare info for a txn so that it can be used later in
+ * commit prepared if required. See ReorderBufferFinishPrepared.
+ */
+ if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
+ buf->endptr, prepare_time, origin_id,
+ origin_lsn))
+ return;
+
+ /* We can't start streaming unless a consistent state is reached. */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
+ {
+ ReorderBufferSkipPrepare(ctx->reorder, xid);
+ return;
+ }
+
+ /*
+ * Check whether we need to process this transaction. See
+ * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
+ * transaction.
+ *
+ * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
+ * hasn't yet been committed, removing this txn before a commit might
+ * result in the computation of an incorrect restart_lsn. See
+ * SnapBuildProcessRunningXacts. But we need to process cache
+ * invalidations if there are any for the reasons mentioned in
+ * DecodeCommit.
+ */
+ if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
+ {
+ ReorderBufferSkipPrepare(ctx->reorder, xid);
+ ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
+ return;
+ }
+
+ /* Tell the reorderbuffer about the surviving subtransactions. */
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+ buf->origptr, buf->endptr);
+ }
+
/* replay actions of all transaction + subtransactions in order */
- ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time, origin_id, origin_lsn);
+ ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
/*
- * Update the decoding stats at transaction commit/abort. It is not clear
- * that sending more or less frequently than this would be better.
+ * Update the decoding stats at transaction prepare/commit/abort. It is
+ * not clear that sending more or less frequently than this would be
+ * better.
*/
UpdateDecodingStats(ctx);
}
+
/*
* Get the data from the various forms of abort records and pass it on to
- * snapbuild.c and reorderbuffer.c
+ * snapbuild.c and reorderbuffer.c.
+ *
+ * 'two_phase' indicates that we are finishing a prepared transaction.
*/
static void
DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_abort *parsed, TransactionId xid)
+ xl_xact_parsed_abort *parsed, TransactionId xid,
+ bool two_phase)
{
int i;
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ TimestampTz abort_time = parsed->xact_time;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
+ bool skip_xact;
- for (i = 0; i < parsed->nsubxacts; i++)
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ origin_lsn = parsed->origin_lsn;
+ abort_time = parsed->origin_timestamp;
+ }
+
+ /*
+ * Check whether we need to process this transaction. See
+ * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
+ * transaction.
+ */
+ skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
+
+ /*
+ * Send the final rollback record for a prepared transaction unless we
+ * need to skip it. For non-two-phase xacts, simply forget the xact.
+ */
+ if (two_phase && !skip_xact)
{
- ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
- buf->record->EndRecPtr);
+ ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ abort_time, origin_id, origin_lsn,
+ parsed->twophase_gid, false);
}
+ else
+ {
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
+ buf->record->EndRecPtr);
+ }
- ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+ }
/* update the decoding stats */
UpdateDecodingStats(ctx);
@@ -1080,3 +1271,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
header->t_infomask2 = xlhdr.t_infomask2;
header->t_hoff = xlhdr.t_hoff;
}
+
+/*
+ * Check whether we are interested in this specific transaction.
+ *
+ * There can be several reasons we might not be interested in this
+ * transaction:
+ * 1) We might not be interested in decoding transactions up to this
+ * LSN. This can happen because we previously decoded it and now just
+ * are restarting or if we haven't assembled a consistent snapshot yet.
+ * 2) The transaction happened in another database.
+ * 3) The output plugin is not interested in the origin.
+ * 4) We are doing fast-forwarding
+ */
+static bool
+DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ Oid txn_dbid, RepOriginId origin_id)
+{
+ return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+ ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e49b5115175..605ec0986ca 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,15 +1083,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
Assert(!ctx->fast_forward);
- /*
- * Skip if decoding of two-phase transactions at PREPARE time is not
- * enabled. In that case, all two-phase transactions are considered
- * filtered out and will be applied as regular transactions at COMMIT
- * PREPARED.
- */
- if (!ctx->twophase)
- return true;
-
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "filter_prepare";
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index df63b90a67a..315bfe7cae2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ bool txn_prepared);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
@@ -422,6 +423,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* free data that's contained */
+ if (txn->gid != NULL)
+ {
+ pfree(txn->gid);
+ txn->gid = NULL;
+ }
+
if (txn->tuplecid_hash != NULL)
{
hash_destroy(txn->tuplecid_hash);
@@ -1516,12 +1523,18 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Discard changes from a transaction (and subtransactions), after streaming
- * them. Keep the remaining info - transactions, tuplecids, invalidations and
- * snapshots.
+ * Discard changes from a transaction (and subtransactions), either after
+ * streaming or decoding them at PREPARE. Keep the remaining info -
+ * transactions, tuplecids, invalidations and snapshots.
+ *
+ * We additionally remove tuplecids after decoding the transaction at prepare
+ * time as we only need to perform invalidation at rollback or commit prepared.
+ *
+ * 'txn_prepared' indicates that we have decoded the transaction at prepare
+ * time.
*/
static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
dlist_mutable_iter iter;
@@ -1540,7 +1553,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
- ReorderBufferTruncateTXN(rb, subtxn);
+ ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
}
/* cleanup changes in the txn */
@@ -1574,9 +1587,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
* about the toplevel xact (we send the XID in all messages), but we never
* stream XIDs of empty subxacts.
*/
- if ((!txn->toptxn) || (txn->nentries_mem != 0))
+ if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
txn->txn_flags |= RBTXN_IS_STREAMED;
+ if (txn_prepared)
+ {
+ /*
+ * If this is a prepared txn, cleanup the tuplecids we stored for
+ * decoding catalog snapshot access. They are always stored in the
+ * toplevel transaction.
+ */
+ dlist_foreach_modify(iter, &txn->tuplecids)
+ {
+ ReorderBufferChange *change;
+
+ change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+ /* Check we're not mixing changes from different transactions. */
+ Assert(change->txn == txn);
+ Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
+ /* Remove the change from its containing list. */
+ dlist_delete(&change->node);
+
+ ReorderBufferReturnChange(rb, change, true);
+ }
+ }
+
/*
* Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
* memory. We could also keep the hash table and update it with new ctid
@@ -1756,9 +1793,10 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
}
/*
- * If the transaction was (partially) streamed, we need to commit it in a
- * 'streamed' way. That is, we first stream the remaining part of the
- * transaction, and then invoke stream_commit message.
+ * If the transaction was (partially) streamed, we need to prepare or commit
+ * it in a 'streamed' way. That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_prepare or stream_commit message as per
+ * the case.
*/
static void
ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
@@ -1768,29 +1806,49 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferStreamTXN(rb, txn);
- rb->stream_commit(rb, txn, txn->final_lsn);
+ if (rbtxn_prepared(txn))
+ {
+ /*
+ * Note, we send stream prepare even if a concurrent abort is
+ * detected. See DecodePrepare for more information.
+ */
+ rb->stream_prepare(rb, txn, txn->final_lsn);
- ReorderBufferCleanupTXN(rb, txn);
+ /*
+ * This is a PREPARED transaction, part of a two-phase commit. The
+ * full cleanup will happen as part of the COMMIT PREPAREDs, so now
+ * just truncate txn by removing changes and tuple_cids.
+ */
+ ReorderBufferTruncateTXN(rb, txn, true);
+ /* Reset the CheckXidAlive */
+ CheckXidAlive = InvalidTransactionId;
+ }
+ else
+ {
+ rb->stream_commit(rb, txn, txn->final_lsn);
+ ReorderBufferCleanupTXN(rb, txn);
+ }
}
/*
* Set xid to detect concurrent aborts.
*
- * While streaming an in-progress transaction there is a possibility that the
- * (sub)transaction might get aborted concurrently. In such case if the
- * (sub)transaction has catalog update then we might decode the tuple using
- * wrong catalog version. For example, suppose there is one catalog tuple with
- * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple
- * and after that we will have two tuples (xmin: 500, xmax: 501) and
- * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction
- * say 502 updates the same catalog tuple then the first tuple will be changed
- * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode
- * the tuple inserted/updated in 501 after the catalog update, we will see the
- * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
- * consider that the tuple is deleted by xid 502 which is not visible to our
- * snapshot. And when we will try to decode with that catalog tuple, it can
- * lead to a wrong result or a crash. So, it is necessary to detect
- * concurrent aborts to allow streaming of in-progress transactions.
+ * While streaming an in-progress transaction or decoding a prepared
+ * transaction there is a possibility that the (sub)transaction might get
+ * aborted concurrently. In such case if the (sub)transaction has catalog
+ * update then we might decode the tuple using wrong catalog version. For
+ * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
+ * the transaction 501 updates the catalog tuple and after that we will have
+ * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
+ * aborted and some other transaction say 502 updates the same catalog tuple
+ * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
+ * problem is that when we try to decode the tuple inserted/updated in 501
+ * after the catalog update, we will see the catalog tuple with (xmin: 500,
+ * xmax: 502) as visible because it will consider that the tuple is deleted by
+ * xid 502 which is not visible to our snapshot. And when we will try to
+ * decode with that catalog tuple, it can lead to a wrong result or a crash.
+ * So, it is necessary to detect concurrent aborts to allow streaming of
+ * in-progress transactions or decoding of prepared transactions.
*
* For detecting the concurrent abort we set CheckXidAlive to the current
* (sub)transaction's xid for which this change belongs to. And, during
@@ -1799,7 +1857,10 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
* and discard the already streamed changes on such an error. We might have
* already streamed some of the changes for the aborted (sub)transaction, but
* that is fine because when we decode the abort we will stream abort message
- * to truncate the changes in the subscriber.
+ * to truncate the changes in the subscriber. Similarly, for prepared
+ * transactions, we stop decoding if concurrent abort is detected and then
+ * rollback the changes when rollback prepared is encountered. See
+ * DecodePrepare.
*/
static inline void
SetupCheckXidLive(TransactionId xid)
@@ -1901,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *specinsert)
{
/* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn);
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn);
@@ -1913,15 +1974,19 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
specinsert = NULL;
}
- /* Stop the stream. */
- rb->stream_stop(rb, txn, last_lsn);
-
- /* Remember the command ID and snapshot for the streaming run */
- ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ /*
+ * For the streaming case, stop the stream and remember the command ID and
+ * snapshot for the streaming run.
+ */
+ if (rbtxn_is_streamed(txn))
+ {
+ rb->stream_stop(rb, txn, last_lsn);
+ ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+ }
}
/*
- * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
+ * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
*
* Send data of a transaction (and its subtransactions) to the
* output plugin. We iterate over the top and subtransactions (using a k-way
@@ -1974,9 +2039,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
else
StartTransactionCommand();
- /* We only need to send begin/commit for non-streamed transactions. */
+ /*
+ * We only need to send begin/begin-prepare for non-streamed
+ * transactions.
+ */
if (!streaming)
- rb->begin(rb, txn);
+ {
+ if (rbtxn_prepared(txn))
+ rb->begin_prepare(rb, txn);
+ else
+ rb->begin(rb, txn);
+ }
ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
@@ -2007,8 +2080,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
prev_lsn = change->lsn;
- /* Set the current xid to detect concurrent aborts. */
- if (streaming)
+ /*
+ * Set the current xid to detect concurrent aborts. This is
+ * required for the cases when we decode the changes before the
+ * COMMIT record is processed.
+ */
+ if (streaming || rbtxn_prepared(change->txn))
{
curtxn = change->txn;
SetupCheckXidLive(curtxn->xid);
@@ -2299,7 +2376,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
}
else
- rb->commit(rb, txn, commit_lsn);
+ {
+ /*
+ * Call either PREPARE (for two-phase transactions) or COMMIT (for
+ * regular ones).
+ */
+ if (rbtxn_prepared(txn))
+ rb->prepare(rb, txn, commit_lsn);
+ else
+ rb->commit(rb, txn, commit_lsn);
+ }
/* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -2333,15 +2419,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
RollbackAndReleaseCurrentSubTransaction();
/*
- * If we are streaming the in-progress transaction then discard the
- * changes that we just streamed, and mark the transactions as
- * streamed (if they contained changes). Otherwise, remove all the
- * changes and deallocate the ReorderBufferTXN.
+ * We are here due to one of the four reasons: 1. Decoding an
+ * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
+ * prepared txn that was (partially) streamed. 4. Decoding a committed
+ * txn.
+ *
+ * For 1, we allow truncation of txn data by removing the changes
+ * already streamed but still keeping other things like invalidations,
+ * snapshot, and tuplecids. For 2 and 3, we indicate
+ * ReorderBufferTruncateTXN is told to do a more elaborate truncation of txn
+ * data as the entire transaction has been decoded except for commit.
+ * For 4, as the entire txn has been decoded, we can fully clean up
+ * the TXN reorder buffer.
*/
- if (streaming)
+ if (streaming || rbtxn_prepared(txn))
{
- ReorderBufferTruncateTXN(rb, txn);
-
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2374,17 +2467,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
/*
* The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
- * abort of the (sub)transaction we are streaming. We need to do the
- * cleanup and return gracefully on this error, see SetupCheckXidLive.
+ * abort of the (sub)transaction we are streaming or preparing. We
+ * need to do the cleanup and return gracefully on this error, see
+ * SetupCheckXidLive.
*/
if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
{
/*
- * This error can only occur when we are sending the data in
- * streaming mode and the streaming is not finished yet.
+ * This error can occur either when we are sending the data in
+ * streaming mode and the streaming is not finished yet or when we
+ * are sending the data out on a PREPARE during a two-phase
+ * commit.
*/
- Assert(streaming);
- Assert(stream_started);
+ Assert(streaming || rbtxn_prepared(txn));
+ Assert(stream_started || rbtxn_prepared(txn));
/* Cleanup the temporary error state. */
FlushErrorState();
@@ -2414,26 +2510,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* ReorderBufferCommitChild(), even if previously assigned to the toplevel
* transaction with ReorderBufferAssignChild.
*
- * This interface is called once a toplevel commit is read for both streamed
- * as well as non-streamed transactions.
+ * This interface is called once a prepare or toplevel commit is read for both
+ * streamed as well as non-streamed transactions.
*/
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferReplay(ReorderBufferTXN *txn,
+ ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn)
{
- ReorderBufferTXN *txn;
Snapshot snapshot_now;
CommandId command_id = FirstCommandId;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
-
- /* unknown transaction, nothing to replay */
- if (txn == NULL)
- return;
-
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
@@ -2463,7 +2552,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
if (txn->base_snapshot == NULL)
{
Assert(txn->ninvalidations == 0);
- ReorderBufferCleanupTXN(rb, txn);
+
+ /*
+ * Removing this txn before a commit might result in the computation
+ * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
+ */
+ if (!rbtxn_prepared(txn))
+ ReorderBufferCleanupTXN(rb, txn);
return;
}
@@ -2475,6 +2570,178 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
}
/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferReplay().
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
+ origin_id, origin_lsn);
+}
+
+/*
+ * Record the prepare information for a transaction.
+ */
+bool
+ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
+ TimestampTz prepare_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return false;
+
+ /*
+ * Remember the prepare information to be later used by commit prepared in
+ * case we skip doing prepare.
+ */
+ txn->final_lsn = prepare_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = prepare_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ return true;
+}
+
+/* Remember that we have skipped prepare */
+void
+ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
+}
+
+/*
+ * Prepare a two-phase transaction.
+ *
+ * See comments for ReorderBufferReplay().
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+ char *gid)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown transaction, nothing to replay */
+ if (txn == NULL)
+ return;
+
+ txn->txn_flags |= RBTXN_PREPARE;
+ txn->gid = pstrdup(gid);
+
+ /* The prepare info must have been updated in txn by now. */
+ Assert(txn->final_lsn != InvalidXLogRecPtr);
+
+ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
+ txn->commit_time, txn->origin_id, txn->origin_lsn);
+}
+
+/*
+ * This is used to handle COMMIT/ROLLBACK PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time, RepOriginId origin_id,
+ XLogRecPtr origin_lsn, char *gid, bool is_commit)
+{
+ ReorderBufferTXN *txn;
+ XLogRecPtr prepare_end_lsn;
+ TimestampTz prepare_time;
+
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, false);
+
+ /* unknown transaction, nothing to do */
+ if (txn == NULL)
+ return;
+
+ /*
+ * By this time the txn has the prepare record information, remember it to
+ * be later used for rollback.
+ */
+ prepare_end_lsn = txn->end_lsn;
+ prepare_time = txn->commit_time;
+
+ /* add the gid in the txn */
+ txn->gid = pstrdup(gid);
+
+ /*
+ * It is possible that this transaction is not decoded at prepare time
+ * either because by that time we didn't have a consistent snapshot or it
+ * was decoded earlier but we have restarted. We can't distinguish between
+ * those two cases so we send the prepare in both the cases and let
+ * downstream decide whether to process or skip it. We don't need to
+ * decode the xact for aborts if it is not done already.
+ */
+ if (!rbtxn_prepared(txn) && is_commit)
+ {
+ txn->txn_flags |= RBTXN_PREPARE;
+
+ /*
+ * The prepare info must have been updated in txn even if we skip
+ * prepare.
+ */
+ Assert(txn->final_lsn != InvalidXLogRecPtr);
+
+ /*
+ * By this time the txn has the prepare record information and it is
+ * important to use that so that downstream gets accurate information.
+ * If we passed the commit information here instead, downstream could
+ * behave as if it had already replayed the commit prepared after a
+ * restart.
+ */
+ ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
+ txn->commit_time, txn->origin_id, txn->origin_lsn);
+ }
+
+ txn->final_lsn = commit_lsn;
+ txn->end_lsn = end_lsn;
+ txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
+
+ if (is_commit)
+ rb->commit_prepared(rb, txn, commit_lsn);
+ else
+ rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
+
+ /* cleanup: make sure there's no cache pollution */
+ ReorderBufferExecuteInvalidations(txn->ninvalidations,
+ txn->invalidations);
+ ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
* Abort a transaction that possibly has previous changes. Needs to be first
* called for subtransactions and then for the toplevel xid.
*
@@ -2606,6 +2873,39 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
}
/*
+ * Invalidate cache for those transactions that need to be skipped just in case
+ * catalogs were manipulated as part of the transaction.
+ *
+ * Note that this is a special-purpose function for prepared transactions where
+ * we don't want to clean up the TXN even when we decide to skip it. See
+ * DecodePrepare.
+ *
+ * If no transaction is found for xid, this function does nothing. The lsn
+ * argument is not referenced by the function body.
+ */
+void
+ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
+{
+ ReorderBufferTXN *txn;
+
+ txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+ false);
+
+ /* unknown, nothing to do */
+ if (txn == NULL)
+ return;
+
+ /*
+ * Process cache invalidation messages if there are any. Even if we're not
+ * interested in the transaction's contents, it could have manipulated the
+ * catalog and we need to update the caches according to that.
+ *
+ * Invalidations are only executed when the transaction has a base
+ * snapshot; otherwise we just cross-check that none were accumulated.
+ */
+ if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
+ else
+ Assert(txn->ninvalidations == 0);
+}
+
+
+/*
* Execute invalidations happening outside the context of a decoded
* transaction. That currently happens either for xid-less commits
* (cf. RecordTransactionCommit()) or for invalidations in uninteresting
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 6afc25e8d3d..15b07a54c11 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -834,6 +834,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
continue;
+ /*
+ * We don't need to add snapshot to prepared transactions as they
+ * should not see the new catalog contents.
+ */
+ if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
+ continue;
+
elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9f982137d93..bab31bf7af7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -174,6 +174,8 @@ typedef struct ReorderBufferChange
#define RBTXN_IS_STREAMED 0x0010
#define RBTXN_HAS_TOAST_INSERT 0x0020
#define RBTXN_HAS_SPEC_INSERT 0x0040
+#define RBTXN_PREPARE 0x0080
+#define RBTXN_SKIPPED_PREPARE 0x0100
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
@@ -233,6 +235,18 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
)
+/* Has this transaction been prepared? */
+#define rbtxn_prepared(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
+)
+
+/* Has the prepare for this transaction been skipped? */
+#define rbtxn_skip_prepared(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
+)
+
typedef struct ReorderBufferTXN
{
/* See above */
@@ -258,10 +272,11 @@ typedef struct ReorderBufferTXN
XLogRecPtr first_lsn;
/* ----
- * LSN of the record that lead to this xact to be committed or
+ * LSN of the record that led to this xact to be prepared or committed or
* aborted. This can be a
* * plain commit record
* * plain commit record, of a parent transaction
+ * * prepared transaction
* * prepared transaction commit
* * plain abort record
* * prepared transaction abort
@@ -293,7 +308,8 @@ typedef struct ReorderBufferTXN
XLogRecPtr origin_lsn;
/*
- * Commit time, only known when we read the actual commit record.
+ * Commit or Prepare time, only known when we read the actual commit or
+ * prepare record.
*/
TimestampTz commit_time;
@@ -625,12 +641,18 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn,
+ char *gid, bool is_commit);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid);
void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
+void ReorderBufferInvalidate(ReorderBuffer *, TransactionId, XLogRecPtr lsn);
void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap);
@@ -644,10 +666,17 @@ void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr l
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
SharedInvalidationMessage *invalidations);
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
+
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
+bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
+ XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
+ TimestampTz prepare_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
+void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);