diff options
author | Amit Kapila <akapila@postgresql.org> | 2021-01-04 08:34:50 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2021-01-04 08:34:50 +0530 |
commit | a271a1b50e9bec07e2ef3a05e38e7285113e4ce6 (patch) | |
tree | a3cd4b3e22169f548a6c92615f8e713f7001e30f /src | |
parent | ca3b37487be333a1d241dab1bbdd17a211a88f43 (diff) | |
download | postgresql-a271a1b50e9bec07e2ef3a05e38e7285113e4ce6.tar.gz postgresql-a271a1b50e9bec07e2ef3a05e38e7285113e4ce6.zip |
Allow decoding at prepare time in ReorderBuffer.
This patch allows PREPARE-time decoding of two-phase transactions (if the
output plugin supports this capability), in which case the transactions
are replayed at PREPARE and then committed later when COMMIT PREPARED
arrives.
Now that we decode the changes before the commit, concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).
We detect such failures with a special sqlerrcode
ERRCODE_TRANSACTION_ROLLBACK introduced by commit 7259736a6e and stop
decoding the remaining changes. Then we roll back the changes when ROLLBACK
PREPARED is encountered.
Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich
Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, Arseny Sher, and Dilip Kumar
Tested-by: Takamichi Osumi
Discussion:
https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 286 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 9 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 432 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 7 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 33 |
5 files changed, 653 insertions, 114 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1887ba79440..23ab3cf6052 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -67,13 +67,24 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_commit *parsed, TransactionId xid); + xl_xact_parsed_commit *parsed, TransactionId xid, + bool two_phase); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid); + xl_xact_parsed_abort *parsed, TransactionId xid, + bool two_phase); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed); + /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); +/* helper functions for decoding transactions */ +static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid); +static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, + XLogRecordBuffer *buf, Oid dbId, + RepOriginId origin_id); + /* * Take every XLogReadRecord()ed record and perform the actions required to * decode it using the output plugin already setup in the logical decoding @@ -244,6 +255,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_commit *xlrec; xl_xact_parsed_commit parsed; TransactionId xid; + bool two_phase = false; xlrec = (xl_xact_commit *) XLogRecGetData(r); ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); @@ -253,7 +265,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else xid = parsed.twophase_xid; - DecodeCommit(ctx, buf, &parsed, xid); + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin 
supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (info == XLOG_XACT_COMMIT_PREPARED) + two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); + + DecodeCommit(ctx, buf, &parsed, xid, two_phase); break; } case XLOG_XACT_ABORT: @@ -262,6 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_abort *xlrec; xl_xact_parsed_abort parsed; TransactionId xid; + bool two_phase = false; xlrec = (xl_xact_abort *) XLogRecGetData(r); ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); @@ -271,7 +292,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else xid = parsed.twophase_xid; - DecodeAbort(ctx, buf, &parsed, xid); + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (info == XLOG_XACT_ABORT_PREPARED) + two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); + + DecodeAbort(ctx, buf, &parsed, xid, two_phase); break; } case XLOG_XACT_ASSIGNMENT: @@ -312,17 +341,30 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } break; case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; + xl_xact_prepare *xlrec; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. 
- */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + /* ok, parse it */ + xlrec = (xl_xact_prepare *) XLogRecGetData(r); + ParsePrepareRecord(XLogRecGetInfo(buf->record), + xlrec, &parsed); + + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (FilterPrepare(ctx, parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -520,6 +562,32 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +/* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +static inline bool +FilterPrepare(LogicalDecodingContext *ctx, const char *gid) +{ + /* + * Skip if decoding of two-phase transactions at PREPARE time is not + * enabled. In that case, all two-phase transactions are considered + * filtered out and will be applied as regular transactions at COMMIT + * PREPARED. + */ + if (!ctx->twophase) + return true; + + /* + * The filter_prepare callback is optional. When not supplied, all + * prepared transactions should go through. + */ + if (ctx->callbacks.filter_prepare_cb == NULL) + return false; + + return filter_prepare_cb_wrapper(ctx, gid); +} + static inline bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) { @@ -582,10 +650,15 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Consolidated commit record handling between the different form of commit * records. + * + * 'two_phase' indicates that caller wants to process the transaction in two + * phases, first process prepare if not already done and then process + * commit_prepared. 
*/ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_commit *parsed, TransactionId xid) + xl_xact_parsed_commit *parsed, TransactionId xid, + bool two_phase) { XLogRecPtr origin_lsn = InvalidXLogRecPtr; TimestampTz commit_time = parsed->xact_time; @@ -606,15 +679,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * the reorderbuffer to forget the content of the (sub-)transactions * if not. * - * There can be several reasons we might not be interested in this - * transaction: - * 1) We might not be interested in decoding transactions up to this - * LSN. This can happen because we previously decoded it and now just - * are restarting or if we haven't assembled a consistent snapshot yet. - * 2) The transaction happened in another database. - * 3) The output plugin is not interested in the origin. - * 4) We are doing fast-forwarding - * * We can't just use ReorderBufferAbort() here, because we need to execute * the transaction's invalidations. This currently won't be needed if * we're just skipping over the transaction because currently we only do @@ -627,9 +691,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * relevant syscaches. * --- */ - if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || - ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) { @@ -647,34 +709,163 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + /* + * Send the final commit record if the transaction data is already + * decoded, otherwise, process the entire transaction. 
+ */ + if (two_phase) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, true); + } + else + { + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } + + /* + * Update the decoding stats at transaction prepare/commit/abort. It is + * not clear that sending more or less frequently than this would be + * better. + */ + UpdateDecodingStats(ctx); +} + +/* + * Decode PREPARE record. Similar logic as in DecodeCommit. + * + * Note that we don't skip prepare even if have detected concurrent abort + * because it is quite possible that we had already sent some changes before we + * detect abort in which case we need to abort those changes in the subscriber. + * To abort such changes, we do send the prepare and then the rollback prepared + * which is what happened on the publisher-side as well. Now, we can invent a + * new abort API wherein in such cases we send abort and skip sending prepared + * and rollback prepared but then it is not that straightforward because we + * might have streamed this transaction by that time in which case it is + * handled when the rollback is encountered. It is not impossible to optimize + * the concurrent abort case but it can introduce design complexity w.r.t + * handling different cases so leaving it for now as it doesn't seem worth it. + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz prepare_time = parsed->xact_time; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + if (parsed->origin_timestamp != 0) + prepare_time = parsed->origin_timestamp; + + /* + * Remember the prepare info for a txn so that it can be used later in + * commit prepared if required. 
See ReorderBufferFinishPrepared. + */ + if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr, + buf->endptr, prepare_time, origin_id, + origin_lsn)) + return; + + /* We can't start streaming unless a consistent state is reached. */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT) + { + ReorderBufferSkipPrepare(ctx->reorder, xid); + return; + } + + /* + * Check whether we need to process this transaction. See + * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the + * transaction. + * + * We can't call ReorderBufferForget as we did in DecodeCommit as the txn + * hasn't yet been committed, removing this txn before a commit might + * result in the computation of an incorrect restart_lsn. See + * SnapBuildProcessRunningXacts. But we need to process cache + * invalidations if there are any for the reasons mentioned in + * DecodeCommit. + */ + if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id)) + { + ReorderBufferSkipPrepare(ctx->reorder, xid); + ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr); + return; + } + + /* Tell the reorderbuffer about the surviving subtransactions. */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid); /* - * Update the decoding stats at transaction commit/abort. It is not clear - * that sending more or less frequently than this would be better. + * Update the decoding stats at transaction prepare/commit/abort. It is + * not clear that sending more or less frequently than this would be + * better. 
*/ UpdateDecodingStats(ctx); } + /* * Get the data from the various forms of abort records and pass it on to - * snapbuild.c and reorderbuffer.c + * snapbuild.c and reorderbuffer.c. + * + * 'two_phase' indicates to finish prepared transaction. */ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid) + xl_xact_parsed_abort *parsed, TransactionId xid, + bool two_phase) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz abort_time = parsed->xact_time; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + bool skip_xact; - for (i = 0; i < parsed->nsubxacts; i++) + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + abort_time = parsed->origin_timestamp; + } + + /* + * Check whether we need to process this transaction. See + * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the + * transaction. + */ + skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id); + + /* + * Send the final rollback record for a prepared transaction unless we + * need to skip it. For non-two-phase xacts, simply forget the xact. 
+ */ + if (two_phase && !skip_xact) { - ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], - buf->record->EndRecPtr); + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + abort_time, origin_id, origin_lsn, + parsed->twophase_gid, false); } + else + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], + buf->record->EndRecPtr); + } - ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + } /* update the decoding stats */ UpdateDecodingStats(ctx); @@ -1080,3 +1271,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) header->t_infomask2 = xlhdr.t_infomask2; header->t_hoff = xlhdr.t_hoff; } + +/* + * Check whether we are interested in this specific transaction. + * + * There can be several reasons we might not be interested in this + * transaction: + * 1) We might not be interested in decoding transactions up to this + * LSN. This can happen because we previously decoded it and now just + * are restarting or if we haven't assembled a consistent snapshot yet. + * 2) The transaction happened in another database. + * 3) The output plugin is not interested in the origin. 
+ * 4) We are doing fast-forwarding + */ +static bool +DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + Oid txn_dbid, RepOriginId origin_id) +{ + return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)); +} diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index e49b5115175..605ec0986ca 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1083,15 +1083,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid) Assert(!ctx->fast_forward); - /* - * Skip if decoding of two-phase transactions at PREPARE time is not - * enabled. In that case, all two-phase transactions are considered - * filtered out and will be applied as regular transactions at COMMIT - * PREPARED. - */ - if (!ctx->twophase) - return true; - /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "filter_prepare"; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index df63b90a67a..315bfe7cae2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); -static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool txn_prepared); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, 
XLogSegNo segno); @@ -422,6 +423,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* free data that's contained */ + if (txn->gid != NULL) + { + pfree(txn->gid); + txn->gid = NULL; + } + if (txn->tuplecid_hash != NULL) { hash_destroy(txn->tuplecid_hash); @@ -1516,12 +1523,18 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Discard changes from a transaction (and subtransactions), after streaming - * them. Keep the remaining info - transactions, tuplecids, invalidations and - * snapshots. + * Discard changes from a transaction (and subtransactions), either after + * streaming or decoding them at PREPARE. Keep the remaining info - + * transactions, tuplecids, invalidations and snapshots. + * + * We additionaly remove tuplecids after decoding the transaction at prepare + * time as we only need to perform invalidation at rollback or commit prepared. + * + * 'txn_prepared' indicates that we have decoded the transaction at prepare + * time. */ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) { dlist_mutable_iter iter; @@ -1540,7 +1553,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); } /* cleanup changes in the txn */ @@ -1574,9 +1587,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ - if ((!txn->toptxn) || (txn->nentries_mem != 0)) + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; + if (txn_prepared) + { + /* + * If this is a prepared txn, cleanup the tuplecids we stored for + * decoding catalog snapshot access. 
They are always stored in the + * toplevel transaction. + */ + dlist_foreach_modify(iter, &txn->tuplecids) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + /* Remove the change from its containing list. */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change, true); + } + } + /* * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any * memory. We could also keep the hash table and update it with new ctid @@ -1756,9 +1793,10 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) } /* - * If the transaction was (partially) streamed, we need to commit it in a - * 'streamed' way. That is, we first stream the remaining part of the - * transaction, and then invoke stream_commit message. + * If the transaction was (partially) streamed, we need to prepare or commit + * it in a 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_prepare or stream_commit message as per + * the case. */ static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) @@ -1768,29 +1806,49 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamTXN(rb, txn); - rb->stream_commit(rb, txn, txn->final_lsn); + if (rbtxn_prepared(txn)) + { + /* + * Note, we send stream prepare even if a concurrent abort is + * detected. See DecodePrepare for more information. + */ + rb->stream_prepare(rb, txn, txn->final_lsn); - ReorderBufferCleanupTXN(rb, txn); + /* + * This is a PREPARED transaction, part of a two-phase commit. The + * full cleanup will happen as part of the COMMIT PREPAREDs, so now + * just truncate txn by removing changes and tuple_cids. 
+ */ + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + { + rb->stream_commit(rb, txn, txn->final_lsn); + ReorderBufferCleanupTXN(rb, txn); + } } /* * Set xid to detect concurrent aborts. * - * While streaming an in-progress transaction there is a possibility that the - * (sub)transaction might get aborted concurrently. In such case if the - * (sub)transaction has catalog update then we might decode the tuple using - * wrong catalog version. For example, suppose there is one catalog tuple with - * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple - * and after that we will have two tuples (xmin: 500, xmax: 501) and - * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction - * say 502 updates the same catalog tuple then the first tuple will be changed - * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode - * the tuple inserted/updated in 501 after the catalog update, we will see the - * catalog tuple with (xmin: 500, xmax: 502) as visible because it will - * consider that the tuple is deleted by xid 502 which is not visible to our - * snapshot. And when we will try to decode with that catalog tuple, it can - * lead to a wrong result or a crash. So, it is necessary to detect - * concurrent aborts to allow streaming of in-progress transactions. + * While streaming an in-progress transaction or decoding a prepared + * transaction there is a possibility that the (sub)transaction might get + * aborted concurrently. In such case if the (sub)transaction has catalog + * update then we might decode the tuple using wrong catalog version. For + * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now, + * the transaction 501 updates the catalog tuple and after that we will have + * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). 
Now, if 501 is + * aborted and some other transaction say 502 updates the same catalog tuple + * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the + * problem is that when we try to decode the tuple inserted/updated in 501 + * after the catalog update, we will see the catalog tuple with (xmin: 500, + * xmax: 502) as visible because it will consider that the tuple is deleted by + * xid 502 which is not visible to our snapshot. And when we will try to + * decode with that catalog tuple, it can lead to a wrong result or a crash. + * So, it is necessary to detect concurrent aborts to allow streaming of + * in-progress transactions or decoding of prepared transactions. * * For detecting the concurrent abort we set CheckXidAlive to the current * (sub)transaction's xid for which this change belongs to. And, during @@ -1799,7 +1857,10 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) * and discard the already streamed changes on such an error. We might have * already streamed some of the changes for the aborted (sub)transaction, but * that is fine because when we decode the abort we will stream abort message - * to truncate the changes in the subscriber. + * to truncate the changes in the subscriber. Similarly, for prepared + * transactions, we stop decoding if concurrent abort is detected and then + * rollback the changes when rollback prepared is encountered. See + * DecodePreare. */ static inline void SetupCheckXidLive(TransactionId xid) @@ -1901,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -1913,15 +1974,19 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, specinsert = NULL; } - /* Stop the stream. 
*/ - rb->stream_stop(rb, txn, last_lsn); - - /* Remember the command ID and snapshot for the streaming run */ - ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); + /* + * For the streaming case, stop the stream and remember the command ID and + * snapshot for the streaming run. + */ + if (rbtxn_is_streamed(txn)) + { + rb->stream_stop(rb, txn, last_lsn); + ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); + } } /* - * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN. + * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN. * * Send data of a transaction (and its subtransactions) to the * output plugin. We iterate over the top and subtransactions (using a k-way @@ -1974,9 +2039,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, else StartTransactionCommand(); - /* We only need to send begin/commit for non-streamed transactions. */ + /* + * We only need to send begin/begin-prepare for non-streamed + * transactions. + */ if (!streaming) - rb->begin(rb, txn); + { + if (rbtxn_prepared(txn)) + rb->begin_prepare(rb, txn); + else + rb->begin(rb, txn); + } ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) @@ -2007,8 +2080,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, prev_lsn = change->lsn; - /* Set the current xid to detect concurrent aborts. */ - if (streaming) + /* + * Set the current xid to detect concurrent aborts. This is + * required for the cases when we decode the changes before the + * COMMIT record is processed. + */ + if (streaming || rbtxn_prepared(change->txn)) { curtxn = change->txn; SetupCheckXidLive(curtxn->xid); @@ -2299,7 +2376,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } } else - rb->commit(rb, txn, commit_lsn); + { + /* + * Call either PREPARE (for two-phase transactions) or COMMIT (for + * regular ones). 
+ */ + if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -2333,15 +2419,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, RollbackAndReleaseCurrentSubTransaction(); /* - * If we are streaming the in-progress transaction then discard the - * changes that we just streamed, and mark the transactions as - * streamed (if they contained changes). Otherwise, remove all the - * changes and deallocate the ReorderBufferTXN. + * We are here due to one of the four reasons: 1. Decoding an + * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a + * prepared txn that was (partially) streamed. 4. Decoding a committed + * txn. + * + * For 1, we allow truncation of txn data by removing the changes + * already streamed but still keeping other things like invalidations, + * snapshot, and tuplecids. For 2 and 3, we indicate + * ReorderBufferTruncateTXN to do more elaborate truncation of txn + * data as the entire transaction has been decoded except for commit. + * For 4, as the entire txn has been decoded, we can fully clean up + * the TXN reorder buffer. */ - if (streaming) + if (streaming || rbtxn_prepared(txn)) { - ReorderBufferTruncateTXN(rb, txn); - + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2374,17 +2467,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent - * abort of the (sub)transaction we are streaming. We need to do the - * cleanup and return gracefully on this error, see SetupCheckXidLive. + * abort of the (sub)transaction we are streaming or preparing. We + * need to do the cleanup and return gracefully on this error, see + * SetupCheckXidLive. 
*/ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) { /* - * This error can only occur when we are sending the data in - * streaming mode and the streaming is not finished yet. + * This error can occur either when we are sending the data in + * streaming mode and the streaming is not finished yet or when we + * are sending the data out on a PREPARE during a two-phase + * commit. */ - Assert(streaming); - Assert(stream_started); + Assert(streaming || rbtxn_prepared(txn)); + Assert(stream_started || rbtxn_prepared(txn)); /* Cleanup the temporary error state. */ FlushErrorState(); @@ -2414,26 +2510,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * ReorderBufferCommitChild(), even if previously assigned to the toplevel * transaction with ReorderBufferAssignChild. * - * This interface is called once a toplevel commit is read for both streamed - * as well as non-streamed transactions. + * This interface is called once a prepare or toplevel commit is read for both + * streamed as well as non-streamed transactions. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, +static void +ReorderBufferReplay(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; Snapshot snapshot_now; CommandId command_id = FirstCommandId; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -2463,7 +2552,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (txn->base_snapshot == NULL) { Assert(txn->ninvalidations == 0); - ReorderBufferCleanupTXN(rb, txn); + + /* + * Removing this txn before a commit might result in the computation + * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts. 
+ */ + if (!rbtxn_prepared(txn)) + ReorderBufferCleanupTXN(rb, txn); return; } @@ -2475,6 +2570,178 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* + * Commit a transaction. + * + * See comments for ReorderBufferReplay(). + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time, + origin_id, origin_lsn); +} + +/* + * Record the prepare information for a transaction. + */ +bool +ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, + TimestampTz prepare_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return false; + + /* + * Remember the prepare information to be later used by commit prepared in + * case we skip doing prepare. + */ + txn->final_lsn = prepare_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = prepare_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + return true; +} + +/* Remember that we have skipped prepare */ +void +ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_SKIPPED_PREPARE; +} + +/* + * Prepare a two-phase transaction. + * + * See comments for ReorderBufferReplay(). 
+ */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = pstrdup(gid); + + /* The prepare info must have been updated in txn by now. */ + Assert(txn->final_lsn != InvalidXLogRecPtr); + + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, + txn->commit_time, txn->origin_id, txn->origin_lsn); +} + +/* + * This is used to handle COMMIT/ROLLBACK PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, RepOriginId origin_id, + XLogRecPtr origin_lsn, char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + XLogRecPtr prepare_end_lsn; + TimestampTz prepare_time; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return; + + /* + * By this time the txn has the prepare record information, remember it to + * be later used for rollback. + */ + prepare_end_lsn = txn->end_lsn; + prepare_time = txn->commit_time; + + /* add the gid in the txn */ + txn->gid = pstrdup(gid); + + /* + * It is possible that this transaction is not decoded at prepare time + * either because by that time we didn't have a consistent snapshot or it + * was decoded earlier but we have restarted. We can't distinguish between + * those two cases so we send the prepare in both the cases and let + * downstream decide whether to process or skip it. We don't need to + * decode the xact for aborts if it is not done already. + */ + if (!rbtxn_prepared(txn) && is_commit) + { + txn->txn_flags |= RBTXN_PREPARE; + + /* + * The prepare info must have been updated in txn even if we skip + * prepare. 
+ */ + Assert(txn->final_lsn != InvalidXLogRecPtr); + + /* + * By this time the txn has the prepare record information and it is + * important to use that so that downstream gets the accurate + * information. If instead, we have passed commit information here + * then downstream can behave as it has already replayed commit + * prepared after the restart. + */ + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, + txn->commit_time, txn->origin_id, txn->origin_lsn); + } + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + if (is_commit) + rb->commit_prepared(rb, txn, commit_lsn); + else + rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time); + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); + ReorderBufferCleanupTXN(rb, txn); +} + +/* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. * @@ -2606,6 +2873,39 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) } /* + * Invalidate cache for those transactions that need to be skipped just in case + * catalogs were manipulated as part of the transaction. + * + * Note that this is a special-purpose function for prepared transactions where + * we don't want to clean up the TXN even when we decide to skip it. See + * DecodePrepare. + */ +void +ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown, nothing to do */ + if (txn == NULL) + return; + + /* + * Process cache invalidation messages if there are any. Even if we're not + * interested in the transaction's contents, it could have manipulated the + * catalog and we need to update the caches according to that. 
+ */ + if (txn->base_snapshot != NULL && txn->ninvalidations > 0) + ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, + txn->invalidations); + else + Assert(txn->ninvalidations == 0); +} + + +/* * Execute invalidations happening outside the context of a decoded * transaction. That currently happens either for xid-less commits * (cf. RecordTransactionCommit()) or for invalidations in uninteresting diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 6afc25e8d3d..15b07a54c11 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -834,6 +834,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) continue; + /* + * We don't need to add snapshot to prepared transactions as they + * should not see the new catalog contents. + */ + if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn)) + continue; + elog(DEBUG2, "adding a new snapshot to %u at %X/%X", txn->xid, (uint32) (lsn >> 32), (uint32) lsn); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 9f982137d93..bab31bf7af7 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -174,6 +174,8 @@ typedef struct ReorderBufferChange #define RBTXN_IS_STREAMED 0x0010 #define RBTXN_HAS_TOAST_INSERT 0x0020 #define RBTXN_HAS_SPEC_INSERT 0x0040 +#define RBTXN_PREPARE 0x0080 +#define RBTXN_SKIPPED_PREPARE 0x0100 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -233,6 +235,18 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ ) +/* Has this transaction been prepared? */ +#define rbtxn_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ +) + +/* prepare for this transaction skipped? 
*/ +#define rbtxn_skip_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ @@ -258,10 +272,11 @@ typedef struct ReorderBufferTXN XLogRecPtr first_lsn; /* ---- - * LSN of the record that lead to this xact to be committed or + * LSN of the record that lead to this xact to be prepared or committed or * aborted. This can be a * * plain commit record * * plain commit record, of a parent transaction + * * prepared transaction * * prepared transaction commit * * plain abort record * * prepared transaction abort @@ -293,7 +308,8 @@ typedef struct ReorderBufferTXN XLogRecPtr origin_lsn; /* - * Commit time, only known when we read the actual commit record. + * Commit or Prepare time, only known when we read the actual commit or + * prepare record. */ TimestampTz commit_time; @@ -625,12 +641,18 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid); void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn); +void ReorderBufferInvalidate(ReorderBuffer *, TransactionId, XLogRecPtr lsn); void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); void 
ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); @@ -644,10 +666,17 @@ void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr l void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations); void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); + void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, + TimestampTz prepare_time, + RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); |