diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 286 |
1 files changed, 249 insertions, 37 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1887ba79440..23ab3cf6052 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -67,13 +67,24 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_commit *parsed, TransactionId xid); + xl_xact_parsed_commit *parsed, TransactionId xid, + bool two_phase); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid); + xl_xact_parsed_abort *parsed, TransactionId xid, + bool two_phase); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed); + /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); +/* helper functions for decoding transactions */ +static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid); +static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, + XLogRecordBuffer *buf, Oid dbId, + RepOriginId origin_id); + /* * Take every XLogReadRecord()ed record and perform the actions required to * decode it using the output plugin already setup in the logical decoding @@ -244,6 +255,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_commit *xlrec; xl_xact_parsed_commit parsed; TransactionId xid; + bool two_phase = false; xlrec = (xl_xact_commit *) XLogRecGetData(r); ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); @@ -253,7 +265,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else xid = parsed.twophase_xid; - DecodeCommit(ctx, buf, &parsed, xid); + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (info == XLOG_XACT_COMMIT_PREPARED) + two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); + + DecodeCommit(ctx, buf, &parsed, xid, two_phase); break; } case XLOG_XACT_ABORT: @@ -262,6 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_abort *xlrec; xl_xact_parsed_abort parsed; TransactionId xid; + bool two_phase = false; xlrec = (xl_xact_abort *) XLogRecGetData(r); ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); @@ -271,7 +292,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else xid = parsed.twophase_xid; - DecodeAbort(ctx, buf, &parsed, xid); + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (info == XLOG_XACT_ABORT_PREPARED) + two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); + + DecodeAbort(ctx, buf, &parsed, xid, two_phase); break; } case XLOG_XACT_ASSIGNMENT: @@ -312,17 +341,30 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } break; case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; + xl_xact_prepare *xlrec; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + /* ok, parse it */ + xlrec = (xl_xact_prepare *) XLogRecGetData(r); + ParsePrepareRecord(XLogRecGetInfo(buf->record), + xlrec, &parsed); + + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (FilterPrepare(ctx, parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -520,6 +562,32 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +/* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +static inline bool +FilterPrepare(LogicalDecodingContext *ctx, const char *gid) +{ + /* + * Skip if decoding of two-phase transactions at PREPARE time is not + * enabled. In that case, all two-phase transactions are considered + * filtered out and will be applied as regular transactions at COMMIT + * PREPARED. + */ + if (!ctx->twophase) + return true; + + /* + * The filter_prepare callback is optional. When not supplied, all + * prepared transactions should go through. + */ + if (ctx->callbacks.filter_prepare_cb == NULL) + return false; + + return filter_prepare_cb_wrapper(ctx, gid); +} + static inline bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) { @@ -582,10 +650,15 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Consolidated commit record handling between the different form of commit * records. + * + * 'two_phase' indicates that caller wants to process the transaction in two + * phases, first process prepare if not already done and then process + * commit_prepared. */ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_commit *parsed, TransactionId xid) + xl_xact_parsed_commit *parsed, TransactionId xid, + bool two_phase) { XLogRecPtr origin_lsn = InvalidXLogRecPtr; TimestampTz commit_time = parsed->xact_time; @@ -606,15 +679,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * the reorderbuffer to forget the content of the (sub-)transactions * if not. * - * There can be several reasons we might not be interested in this - * transaction: - * 1) We might not be interested in decoding transactions up to this - * LSN. This can happen because we previously decoded it and now just - * are restarting or if we haven't assembled a consistent snapshot yet. - * 2) The transaction happened in another database. - * 3) The output plugin is not interested in the origin. - * 4) We are doing fast-forwarding - * * We can't just use ReorderBufferAbort() here, because we need to execute * the transaction's invalidations. This currently won't be needed if * we're just skipping over the transaction because currently we only do @@ -627,9 +691,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * relevant syscaches. * --- */ - if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || - ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) { @@ -647,34 +709,163 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + /* + * Send the final commit record if the transaction data is already + * decoded, otherwise, process the entire transaction. + */ + if (two_phase) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, true); + } + else + { + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } + + /* + * Update the decoding stats at transaction prepare/commit/abort. It is + * not clear that sending more or less frequently than this would be + * better. + */ + UpdateDecodingStats(ctx); +} + +/* + * Decode PREPARE record. Similar logic as in DecodeCommit. + * + * Note that we don't skip prepare even if have detected concurrent abort + * because it is quite possible that we had already sent some changes before we + * detect abort in which case we need to abort those changes in the subscriber. + * To abort such changes, we do send the prepare and then the rollback prepared + * which is what happened on the publisher-side as well. Now, we can invent a + * new abort API wherein in such cases we send abort and skip sending prepared + * and rollback prepared but then it is not that straightforward because we + * might have streamed this transaction by that time in which case it is + * handled when the rollback is encountered. It is not impossible to optimize + * the concurrent abort case but it can introduce design complexity w.r.t + * handling different cases so leaving it for now as it doesn't seem worth it. + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz prepare_time = parsed->xact_time; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + if (parsed->origin_timestamp != 0) + prepare_time = parsed->origin_timestamp; + + /* + * Remember the prepare info for a txn so that it can be used later in + * commit prepared if required. See ReorderBufferFinishPrepared. + */ + if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr, + buf->endptr, prepare_time, origin_id, + origin_lsn)) + return; + + /* We can't start streaming unless a consistent state is reached. */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT) + { + ReorderBufferSkipPrepare(ctx->reorder, xid); + return; + } + + /* + * Check whether we need to process this transaction. See + * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the + * transaction. + * + * We can't call ReorderBufferForget as we did in DecodeCommit as the txn + * hasn't yet been committed, removing this txn before a commit might + * result in the computation of an incorrect restart_lsn. See + * SnapBuildProcessRunningXacts. But we need to process cache + * invalidations if there are any for the reasons mentioned in + * DecodeCommit. + */ + if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id)) + { + ReorderBufferSkipPrepare(ctx->reorder, xid); + ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr); + return; + } + + /* Tell the reorderbuffer about the surviving subtransactions. */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid); /* - * Update the decoding stats at transaction commit/abort. It is not clear - * that sending more or less frequently than this would be better. + * Update the decoding stats at transaction prepare/commit/abort. It is + * not clear that sending more or less frequently than this would be + * better. */ UpdateDecodingStats(ctx); } + /* * Get the data from the various forms of abort records and pass it on to - * snapbuild.c and reorderbuffer.c + * snapbuild.c and reorderbuffer.c. + * + * 'two_phase' indicates to finish prepared transaction. */ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid) + xl_xact_parsed_abort *parsed, TransactionId xid, + bool two_phase) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz abort_time = parsed->xact_time; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + bool skip_xact; - for (i = 0; i < parsed->nsubxacts; i++) + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + abort_time = parsed->origin_timestamp; + } + + /* + * Check whether we need to process this transaction. See + * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the + * transaction. + */ + skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id); + + /* + * Send the final rollback record for a prepared transaction unless we + * need to skip it. For non-two-phase xacts, simply forget the xact. + */ + if (two_phase && !skip_xact) { - ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], - buf->record->EndRecPtr); + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + abort_time, origin_id, origin_lsn, + parsed->twophase_gid, false); } + else + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], + buf->record->EndRecPtr); + } - ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + } /* update the decoding stats */ UpdateDecodingStats(ctx); @@ -1080,3 +1271,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) header->t_infomask2 = xlhdr.t_infomask2; header->t_hoff = xlhdr.t_hoff; } + +/* + * Check whether we are interested in this specific transaction. + * + * There can be several reasons we might not be interested in this + * transaction: + * 1) We might not be interested in decoding transactions up to this + * LSN. This can happen because we previously decoded it and now just + * are restarting or if we haven't assembled a consistent snapshot yet. + * 2) The transaction happened in another database. + * 3) The output plugin is not interested in the origin. + * 4) We are doing fast-forwarding + */ +static bool +DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + Oid txn_dbid, RepOriginId origin_id) +{ + return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)); +} |