aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c286
1 files changed, 249 insertions, 37 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1887ba79440..23ab3cf6052 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -67,13 +67,24 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_commit *parsed, TransactionId xid);
+ xl_xact_parsed_commit *parsed, TransactionId xid,
+ bool two_phase);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_abort *parsed, TransactionId xid);
+ xl_xact_parsed_abort *parsed, TransactionId xid,
+ bool two_phase);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed);
+
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+/* helper functions for decoding transactions */
+static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
+ XLogRecordBuffer *buf, Oid dbId,
+ RepOriginId origin_id);
+
/*
* Take every XLogReadRecord()ed record and perform the actions required to
* decode it using the output plugin already setup in the logical decoding
@@ -244,6 +255,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_commit *xlrec;
xl_xact_parsed_commit parsed;
TransactionId xid;
+ bool two_phase = false;
xlrec = (xl_xact_commit *) XLogRecGetData(r);
ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
@@ -253,7 +265,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
xid = parsed.twophase_xid;
- DecodeCommit(ctx, buf, &parsed, xid);
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (info == XLOG_XACT_COMMIT_PREPARED)
+ two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+
+ DecodeCommit(ctx, buf, &parsed, xid, two_phase);
break;
}
case XLOG_XACT_ABORT:
@@ -262,6 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_abort *xlrec;
xl_xact_parsed_abort parsed;
TransactionId xid;
+ bool two_phase = false;
xlrec = (xl_xact_abort *) XLogRecGetData(r);
ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
@@ -271,7 +292,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
xid = parsed.twophase_xid;
- DecodeAbort(ctx, buf, &parsed, xid);
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (info == XLOG_XACT_ABORT_PREPARED)
+ two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+
+ DecodeAbort(ctx, buf, &parsed, xid, two_phase);
break;
}
case XLOG_XACT_ASSIGNMENT:
@@ -312,17 +341,30 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
break;
case XLOG_XACT_PREPARE:
+ {
+ xl_xact_parsed_prepare parsed;
+ xl_xact_prepare *xlrec;
- /*
- * Currently decoding ignores PREPARE TRANSACTION and will just
- * decode the transaction when the COMMIT PREPARED is sent or
- * throw away the transaction's contents when a ROLLBACK PREPARED
- * is received. In the future we could add code to expose prepared
- * transactions in the changestream allowing for a kind of
- * distributed 2PC.
- */
- ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
- break;
+ /* ok, parse it */
+ xlrec = (xl_xact_prepare *) XLogRecGetData(r);
+ ParsePrepareRecord(XLogRecGetInfo(buf->record),
+ xlrec, &parsed);
+
+ /*
+ * We would like to process the transaction in a two-phase
+ * manner iff output plugin supports two-phase commits and
+ * doesn't filter the transaction at prepare time.
+ */
+ if (FilterPrepare(ctx, parsed.twophase_gid))
+ {
+ ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+ buf->origptr);
+ break;
+ }
+
+ DecodePrepare(ctx, buf, &parsed);
+ break;
+ }
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
}
@@ -520,6 +562,32 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+/*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as a regular commit later.
+ */
+static inline bool
+FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+{
+ /*
+ * Skip if decoding of two-phase transactions at PREPARE time is not
+ * enabled. In that case, all two-phase transactions are considered
+ * filtered out and will be applied as regular transactions at COMMIT
+ * PREPARED.
+ */
+ if (!ctx->twophase)
+ return true;
+
+ /*
+ * The filter_prepare callback is optional. When not supplied, all
+ * prepared transactions should go through.
+ */
+ if (ctx->callbacks.filter_prepare_cb == NULL)
+ return false;
+
+ return filter_prepare_cb_wrapper(ctx, gid);
+}
+
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
@@ -582,10 +650,15 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Consolidated commit record handling between the different form of commit
* records.
+ *
+ * 'two_phase' indicates that caller wants to process the transaction in two
+ * phases, first process prepare if not already done and then process
+ * commit_prepared.
*/
static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_commit *parsed, TransactionId xid)
+ xl_xact_parsed_commit *parsed, TransactionId xid,
+ bool two_phase)
{
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
TimestampTz commit_time = parsed->xact_time;
@@ -606,15 +679,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* the reorderbuffer to forget the content of the (sub-)transactions
* if not.
*
- * There can be several reasons we might not be interested in this
- * transaction:
- * 1) We might not be interested in decoding transactions up to this
- * LSN. This can happen because we previously decoded it and now just
- * are restarting or if we haven't assembled a consistent snapshot yet.
- * 2) The transaction happened in another database.
- * 3) The output plugin is not interested in the origin.
- * 4) We are doing fast-forwarding
- *
* We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if
* we're just skipping over the transaction because currently we only do
@@ -627,9 +691,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* relevant syscaches.
* ---
*/
- if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
- ctx->fast_forward || FilterByOrigin(ctx, origin_id))
+ if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
{
for (i = 0; i < parsed->nsubxacts; i++)
{
@@ -647,34 +709,163 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
buf->origptr, buf->endptr);
}
+ /*
+ * Send the final commit record if the transaction data is already
+ * decoded, otherwise, process the entire transaction.
+ */
+ if (two_phase)
+ {
+ ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn,
+ parsed->twophase_gid, true);
+ }
+ else
+ {
+ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+ commit_time, origin_id, origin_lsn);
+ }
+
+ /*
+ * Update the decoding stats at transaction prepare/commit/abort. It is
+ * not clear that sending more or less frequently than this would be
+ * better.
+ */
+ UpdateDecodingStats(ctx);
+}
+
+/*
+ * Decode PREPARE record. Similar logic as in DecodeCommit.
+ *
+ * Note that we don't skip prepare even if have detected concurrent abort
+ * because it is quite possible that we had already sent some changes before we
+ * detect abort in which case we need to abort those changes in the subscriber.
+ * To abort such changes, we do send the prepare and then the rollback prepared
+ * which is what happened on the publisher-side as well. Now, we can invent a
+ * new abort API wherein in such cases we send abort and skip sending prepared
+ * and rollback prepared but then it is not that straightforward because we
+ * might have streamed this transaction by that time in which case it is
+ * handled when the rollback is encountered. It is not impossible to optimize
+ * the concurrent abort case but it can introduce design complexity w.r.t
+ * handling different cases so leaving it for now as it doesn't seem worth it.
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ xl_xact_parsed_prepare *parsed)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogRecPtr origin_lsn = parsed->origin_lsn;
+ TimestampTz prepare_time = parsed->xact_time;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
+ int i;
+ TransactionId xid = parsed->twophase_xid;
+
+ if (parsed->origin_timestamp != 0)
+ prepare_time = parsed->origin_timestamp;
+
+ /*
+ * Remember the prepare info for a txn so that it can be used later in
+ * commit prepared if required. See ReorderBufferFinishPrepared.
+ */
+ if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
+ buf->endptr, prepare_time, origin_id,
+ origin_lsn))
+ return;
+
+ /* We can't start streaming unless a consistent state is reached. */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
+ {
+ ReorderBufferSkipPrepare(ctx->reorder, xid);
+ return;
+ }
+
+ /*
+ * Check whether we need to process this transaction. See
+ * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
+ * transaction.
+ *
+ * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
+ * hasn't yet been committed, removing this txn before a commit might
+ * result in the computation of an incorrect restart_lsn. See
+ * SnapBuildProcessRunningXacts. But we need to process cache
+ * invalidations if there are any for the reasons mentioned in
+ * DecodeCommit.
+ */
+ if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
+ {
+ ReorderBufferSkipPrepare(ctx->reorder, xid);
+ ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
+ return;
+ }
+
+ /* Tell the reorderbuffer about the surviving subtransactions. */
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+ buf->origptr, buf->endptr);
+ }
+
/* replay actions of all transaction + subtransactions in order */
- ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time, origin_id, origin_lsn);
+ ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
/*
- * Update the decoding stats at transaction commit/abort. It is not clear
- * that sending more or less frequently than this would be better.
+ * Update the decoding stats at transaction prepare/commit/abort. It is
+ * not clear that sending more or less frequently than this would be
+ * better.
*/
UpdateDecodingStats(ctx);
}
+
/*
* Get the data from the various forms of abort records and pass it on to
- * snapbuild.c and reorderbuffer.c
+ * snapbuild.c and reorderbuffer.c.
+ *
+ * 'two_phase' indicates to finish prepared transaction.
*/
static void
DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- xl_xact_parsed_abort *parsed, TransactionId xid)
+ xl_xact_parsed_abort *parsed, TransactionId xid,
+ bool two_phase)
{
int i;
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ TimestampTz abort_time = parsed->xact_time;
+ XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
+ bool skip_xact;
- for (i = 0; i < parsed->nsubxacts; i++)
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ origin_lsn = parsed->origin_lsn;
+ abort_time = parsed->origin_timestamp;
+ }
+
+ /*
+ * Check whether we need to process this transaction. See
+ * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
+ * transaction.
+ */
+ skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
+
+ /*
+ * Send the final rollback record for a prepared transaction unless we
+ * need to skip it. For non-two-phase xacts, simply forget the xact.
+ */
+ if (two_phase && !skip_xact)
{
- ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
- buf->record->EndRecPtr);
+ ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ abort_time, origin_id, origin_lsn,
+ parsed->twophase_gid, false);
}
+ else
+ {
+ for (i = 0; i < parsed->nsubxacts; i++)
+ {
+ ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
+ buf->record->EndRecPtr);
+ }
- ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+ ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+ }
/* update the decoding stats */
UpdateDecodingStats(ctx);
@@ -1080,3 +1271,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
header->t_infomask2 = xlhdr.t_infomask2;
header->t_hoff = xlhdr.t_hoff;
}
+
+/*
+ * Check whether we are interested in this specific transaction.
+ *
+ * There can be several reasons we might not be interested in this
+ * transaction:
+ * 1) We might not be interested in decoding transactions up to this
+ * LSN. This can happen because we previously decoded it and now just
+ * are restarting or if we haven't assembled a consistent snapshot yet.
+ * 2) The transaction happened in another database.
+ * 3) The output plugin is not interested in the origin.
+ * 4) We are doing fast-forwarding
+ */
+static bool
+DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+ Oid txn_dbid, RepOriginId origin_id)
+{
+ return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+ (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
+ ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+}