diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 286 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 9 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 432 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 7 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 33 |
5 files changed, 653 insertions, 114 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1887ba79440..23ab3cf6052 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -67,13 +67,24 @@ static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_commit *parsed, TransactionId xid); + xl_xact_parsed_commit *parsed, TransactionId xid, + bool two_phase); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid); + xl_xact_parsed_abort *parsed, TransactionId xid, + bool two_phase); +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed); + /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); +/* helper functions for decoding transactions */ +static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid); +static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx, + XLogRecordBuffer *buf, Oid dbId, + RepOriginId origin_id); + /* * Take every XLogReadRecord()ed record and perform the actions required to * decode it using the output plugin already setup in the logical decoding @@ -244,6 +255,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_commit *xlrec; xl_xact_parsed_commit parsed; TransactionId xid; + bool two_phase = false; xlrec = (xl_xact_commit *) XLogRecGetData(r); ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); @@ -253,7 +265,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else xid = parsed.twophase_xid; - DecodeCommit(ctx, buf, &parsed, xid); + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (info == XLOG_XACT_COMMIT_PREPARED) + two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); + + DecodeCommit(ctx, buf, &parsed, xid, two_phase); break; } case XLOG_XACT_ABORT: @@ -262,6 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_abort *xlrec; xl_xact_parsed_abort parsed; TransactionId xid; + bool two_phase = false; xlrec = (xl_xact_abort *) XLogRecGetData(r); ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed); @@ -271,7 +292,15 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else xid = parsed.twophase_xid; - DecodeAbort(ctx, buf, &parsed, xid); + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (info == XLOG_XACT_ABORT_PREPARED) + two_phase = !(FilterPrepare(ctx, parsed.twophase_gid)); + + DecodeAbort(ctx, buf, &parsed, xid, two_phase); break; } case XLOG_XACT_ASSIGNMENT: @@ -312,17 +341,30 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } break; case XLOG_XACT_PREPARE: + { + xl_xact_parsed_prepare parsed; + xl_xact_prepare *xlrec; - /* - * Currently decoding ignores PREPARE TRANSACTION and will just - * decode the transaction when the COMMIT PREPARED is sent or - * throw away the transaction's contents when a ROLLBACK PREPARED - * is received. In the future we could add code to expose prepared - * transactions in the changestream allowing for a kind of - * distributed 2PC. - */ - ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); - break; + /* ok, parse it */ + xlrec = (xl_xact_prepare *) XLogRecGetData(r); + ParsePrepareRecord(XLogRecGetInfo(buf->record), + xlrec, &parsed); + + /* + * We would like to process the transaction in a two-phase + * manner iff output plugin supports two-phase commits and + * doesn't filter the transaction at prepare time. + */ + if (FilterPrepare(ctx, parsed.twophase_gid)) + { + ReorderBufferProcessXid(reorder, parsed.twophase_xid, + buf->origptr); + break; + } + + DecodePrepare(ctx, buf, &parsed); + break; + } default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); } @@ -520,6 +562,32 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +/* + * Ask output plugin whether we want to skip this PREPARE and send + * this transaction as a regular commit later. + */ +static inline bool +FilterPrepare(LogicalDecodingContext *ctx, const char *gid) +{ + /* + * Skip if decoding of two-phase transactions at PREPARE time is not + * enabled. In that case, all two-phase transactions are considered + * filtered out and will be applied as regular transactions at COMMIT + * PREPARED. + */ + if (!ctx->twophase) + return true; + + /* + * The filter_prepare callback is optional. When not supplied, all + * prepared transactions should go through. + */ + if (ctx->callbacks.filter_prepare_cb == NULL) + return false; + + return filter_prepare_cb_wrapper(ctx, gid); +} + static inline bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) { @@ -582,10 +650,15 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Consolidated commit record handling between the different form of commit * records. + * + * 'two_phase' indicates that caller wants to process the transaction in two + * phases, first process prepare if not already done and then process + * commit_prepared. */ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_commit *parsed, TransactionId xid) + xl_xact_parsed_commit *parsed, TransactionId xid, + bool two_phase) { XLogRecPtr origin_lsn = InvalidXLogRecPtr; TimestampTz commit_time = parsed->xact_time; @@ -606,15 +679,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * the reorderbuffer to forget the content of the (sub-)transactions * if not. * - * There can be several reasons we might not be interested in this - * transaction: - * 1) We might not be interested in decoding transactions up to this - * LSN. This can happen because we previously decoded it and now just - * are restarting or if we haven't assembled a consistent snapshot yet. - * 2) The transaction happened in another database. - * 3) The output plugin is not interested in the origin. - * 4) We are doing fast-forwarding - * * We can't just use ReorderBufferAbort() here, because we need to execute * the transaction's invalidations. This currently won't be needed if * we're just skipping over the transaction because currently we only do @@ -627,9 +691,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * relevant syscaches. * --- */ - if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || - ctx->fast_forward || FilterByOrigin(ctx, origin_id)) + if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) { @@ -647,34 +709,163 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, buf->origptr, buf->endptr); } + /* + * Send the final commit record if the transaction data is already + * decoded, otherwise, process the entire transaction. + */ + if (two_phase) + { + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn, + parsed->twophase_gid, true); + } + else + { + ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, + commit_time, origin_id, origin_lsn); + } + + /* + * Update the decoding stats at transaction prepare/commit/abort. It is + * not clear that sending more or less frequently than this would be + * better. + */ + UpdateDecodingStats(ctx); +} + +/* + * Decode PREPARE record. Similar logic as in DecodeCommit. + * + * Note that we don't skip prepare even if have detected concurrent abort + * because it is quite possible that we had already sent some changes before we + * detect abort in which case we need to abort those changes in the subscriber. + * To abort such changes, we do send the prepare and then the rollback prepared + * which is what happened on the publisher-side as well. Now, we can invent a + * new abort API wherein in such cases we send abort and skip sending prepared + * and rollback prepared but then it is not that straightforward because we + * might have streamed this transaction by that time in which case it is + * handled when the rollback is encountered. It is not impossible to optimize + * the concurrent abort case but it can introduce design complexity w.r.t + * handling different cases so leaving it for now as it doesn't seem worth it. + */ +static void +DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + xl_xact_parsed_prepare *parsed) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogRecPtr origin_lsn = parsed->origin_lsn; + TimestampTz prepare_time = parsed->xact_time; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + int i; + TransactionId xid = parsed->twophase_xid; + + if (parsed->origin_timestamp != 0) + prepare_time = parsed->origin_timestamp; + + /* + * Remember the prepare info for a txn so that it can be used later in + * commit prepared if required. See ReorderBufferFinishPrepared. + */ + if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr, + buf->endptr, prepare_time, origin_id, + origin_lsn)) + return; + + /* We can't start streaming unless a consistent state is reached. */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT) + { + ReorderBufferSkipPrepare(ctx->reorder, xid); + return; + } + + /* + * Check whether we need to process this transaction. See + * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the + * transaction. + * + * We can't call ReorderBufferForget as we did in DecodeCommit as the txn + * hasn't yet been committed, removing this txn before a commit might + * result in the computation of an incorrect restart_lsn. See + * SnapBuildProcessRunningXacts. But we need to process cache + * invalidations if there are any for the reasons mentioned in + * DecodeCommit. + */ + if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id)) + { + ReorderBufferSkipPrepare(ctx->reorder, xid); + ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr); + return; + } + + /* Tell the reorderbuffer about the surviving subtransactions. */ + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i], + buf->origptr, buf->endptr); + } + /* replay actions of all transaction + subtransactions in order */ - ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - commit_time, origin_id, origin_lsn); + ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid); /* - * Update the decoding stats at transaction commit/abort. It is not clear - * that sending more or less frequently than this would be better. + * Update the decoding stats at transaction prepare/commit/abort. It is + * not clear that sending more or less frequently than this would be + * better. */ UpdateDecodingStats(ctx); } + /* * Get the data from the various forms of abort records and pass it on to - * snapbuild.c and reorderbuffer.c + * snapbuild.c and reorderbuffer.c. + * + * 'two_phase' indicates to finish prepared transaction. */ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - xl_xact_parsed_abort *parsed, TransactionId xid) + xl_xact_parsed_abort *parsed, TransactionId xid, + bool two_phase) { int i; + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + TimestampTz abort_time = parsed->xact_time; + XLogRecPtr origin_id = XLogRecGetOrigin(buf->record); + bool skip_xact; - for (i = 0; i < parsed->nsubxacts; i++) + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + abort_time = parsed->origin_timestamp; + } + + /* + * Check whether we need to process this transaction. See + * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the + * transaction. + */ + skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id); + + /* + * Send the final rollback record for a prepared transaction unless we + * need to skip it. For non-two-phase xacts, simply forget the xact. + */ + if (two_phase && !skip_xact) { - ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], - buf->record->EndRecPtr); + ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + abort_time, origin_id, origin_lsn, + parsed->twophase_gid, false); } + else + { + for (i = 0; i < parsed->nsubxacts; i++) + { + ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], + buf->record->EndRecPtr); + } - ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr); + } /* update the decoding stats */ UpdateDecodingStats(ctx); @@ -1080,3 +1271,24 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) header->t_infomask2 = xlhdr.t_infomask2; header->t_hoff = xlhdr.t_hoff; } + +/* + * Check whether we are interested in this specific transaction. + * + * There can be several reasons we might not be interested in this + * transaction: + * 1) We might not be interested in decoding transactions up to this + * LSN. This can happen because we previously decoded it and now just + * are restarting or if we haven't assembled a consistent snapshot yet. + * 2) The transaction happened in another database. + * 3) The output plugin is not interested in the origin. + * 4) We are doing fast-forwarding + */ +static bool +DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, + Oid txn_dbid, RepOriginId origin_id) +{ + return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || + ctx->fast_forward || FilterByOrigin(ctx, origin_id)); +} diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index e49b5115175..605ec0986ca 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1083,15 +1083,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid) Assert(!ctx->fast_forward); - /* - * Skip if decoding of two-phase transactions at PREPARE time is not - * enabled. In that case, all two-phase transactions are considered - * filtered out and will be applied as regular transactions at COMMIT - * PREPARED. - */ - if (!ctx->twophase) - return true; - /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "filter_prepare"; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index df63b90a67a..315bfe7cae2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); -static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool txn_prepared); static void ReorderBufferCleanupSerializedTXNs(const char *slotname); static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno); @@ -422,6 +423,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* free data that's contained */ + if (txn->gid != NULL) + { + pfree(txn->gid); + txn->gid = NULL; + } + if (txn->tuplecid_hash != NULL) { hash_destroy(txn->tuplecid_hash); @@ -1516,12 +1523,18 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Discard changes from a transaction (and subtransactions), after streaming - * them. Keep the remaining info - transactions, tuplecids, invalidations and - * snapshots. + * Discard changes from a transaction (and subtransactions), either after + * streaming or decoding them at PREPARE. Keep the remaining info - + * transactions, tuplecids, invalidations and snapshots. + * + * We additionaly remove tuplecids after decoding the transaction at prepare + * time as we only need to perform invalidation at rollback or commit prepared. + * + * 'txn_prepared' indicates that we have decoded the transaction at prepare + * time. */ static void -ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) { dlist_mutable_iter iter; @@ -1540,7 +1553,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) Assert(rbtxn_is_known_subxact(subtxn)); Assert(subtxn->nsubtxns == 0); - ReorderBufferTruncateTXN(rb, subtxn); + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); } /* cleanup changes in the txn */ @@ -1574,9 +1587,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * about the toplevel xact (we send the XID in all messages), but we never * stream XIDs of empty subxacts. */ - if ((!txn->toptxn) || (txn->nentries_mem != 0)) + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) txn->txn_flags |= RBTXN_IS_STREAMED; + if (txn_prepared) + { + /* + * If this is a prepared txn, cleanup the tuplecids we stored for + * decoding catalog snapshot access. They are always stored in the + * toplevel transaction. + */ + dlist_foreach_modify(iter, &txn->tuplecids) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + /* Remove the change from its containing list. */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change, true); + } + } + /* * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any * memory. We could also keep the hash table and update it with new ctid @@ -1756,9 +1793,10 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) } /* - * If the transaction was (partially) streamed, we need to commit it in a - * 'streamed' way. That is, we first stream the remaining part of the - * transaction, and then invoke stream_commit message. + * If the transaction was (partially) streamed, we need to prepare or commit + * it in a 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_prepare or stream_commit message as per + * the case. */ static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) @@ -1768,29 +1806,49 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamTXN(rb, txn); - rb->stream_commit(rb, txn, txn->final_lsn); + if (rbtxn_prepared(txn)) + { + /* + * Note, we send stream prepare even if a concurrent abort is + * detected. See DecodePrepare for more information. + */ + rb->stream_prepare(rb, txn, txn->final_lsn); - ReorderBufferCleanupTXN(rb, txn); + /* + * This is a PREPARED transaction, part of a two-phase commit. The + * full cleanup will happen as part of the COMMIT PREPAREDs, so now + * just truncate txn by removing changes and tuple_cids. + */ + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + { + rb->stream_commit(rb, txn, txn->final_lsn); + ReorderBufferCleanupTXN(rb, txn); + } } /* * Set xid to detect concurrent aborts. * - * While streaming an in-progress transaction there is a possibility that the - * (sub)transaction might get aborted concurrently. In such case if the - * (sub)transaction has catalog update then we might decode the tuple using - * wrong catalog version. For example, suppose there is one catalog tuple with - * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple - * and after that we will have two tuples (xmin: 500, xmax: 501) and - * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction - * say 502 updates the same catalog tuple then the first tuple will be changed - * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode - * the tuple inserted/updated in 501 after the catalog update, we will see the - * catalog tuple with (xmin: 500, xmax: 502) as visible because it will - * consider that the tuple is deleted by xid 502 which is not visible to our - * snapshot. And when we will try to decode with that catalog tuple, it can - * lead to a wrong result or a crash. So, it is necessary to detect - * concurrent aborts to allow streaming of in-progress transactions. + * While streaming an in-progress transaction or decoding a prepared + * transaction there is a possibility that the (sub)transaction might get + * aborted concurrently. In such case if the (sub)transaction has catalog + * update then we might decode the tuple using wrong catalog version. For + * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now, + * the transaction 501 updates the catalog tuple and after that we will have + * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is + * aborted and some other transaction say 502 updates the same catalog tuple + * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the + * problem is that when we try to decode the tuple inserted/updated in 501 + * after the catalog update, we will see the catalog tuple with (xmin: 500, + * xmax: 502) as visible because it will consider that the tuple is deleted by + * xid 502 which is not visible to our snapshot. And when we will try to + * decode with that catalog tuple, it can lead to a wrong result or a crash. + * So, it is necessary to detect concurrent aborts to allow streaming of + * in-progress transactions or decoding of prepared transactions. * * For detecting the concurrent abort we set CheckXidAlive to the current * (sub)transaction's xid for which this change belongs to. And, during @@ -1799,7 +1857,10 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) * and discard the already streamed changes on such an error. We might have * already streamed some of the changes for the aborted (sub)transaction, but * that is fine because when we decode the abort we will stream abort message - * to truncate the changes in the subscriber. + * to truncate the changes in the subscriber. Similarly, for prepared + * transactions, we stop decoding if concurrent abort is detected and then + * rollback the changes when rollback prepared is encountered. See + * DecodePreare. */ static inline void SetupCheckXidLive(TransactionId xid) @@ -1901,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *specinsert) { /* Discard the changes that we just streamed */ - ReorderBufferTruncateTXN(rb, txn); + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); /* Free all resources allocated for toast reconstruction */ ReorderBufferToastReset(rb, txn); @@ -1913,15 +1974,19 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, specinsert = NULL; } - /* Stop the stream. */ - rb->stream_stop(rb, txn, last_lsn); - - /* Remember the command ID and snapshot for the streaming run */ - ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); + /* + * For the streaming case, stop the stream and remember the command ID and + * snapshot for the streaming run. + */ + if (rbtxn_is_streamed(txn)) + { + rb->stream_stop(rb, txn, last_lsn); + ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); + } } /* - * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN. + * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN. * * Send data of a transaction (and its subtransactions) to the * output plugin. We iterate over the top and subtransactions (using a k-way @@ -1974,9 +2039,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, else StartTransactionCommand(); - /* We only need to send begin/commit for non-streamed transactions. */ + /* + * We only need to send begin/begin-prepare for non-streamed + * transactions. + */ if (!streaming) - rb->begin(rb, txn); + { + if (rbtxn_prepared(txn)) + rb->begin_prepare(rb, txn); + else + rb->begin(rb, txn); + } ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) @@ -2007,8 +2080,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, prev_lsn = change->lsn; - /* Set the current xid to detect concurrent aborts. */ - if (streaming) + /* + * Set the current xid to detect concurrent aborts. This is + * required for the cases when we decode the changes before the + * COMMIT record is processed. + */ + if (streaming || rbtxn_prepared(change->txn)) { curtxn = change->txn; SetupCheckXidLive(curtxn->xid); @@ -2299,7 +2376,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } } else - rb->commit(rb, txn, commit_lsn); + { + /* + * Call either PREPARE (for two-phase transactions) or COMMIT (for + * regular ones). + */ + if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) @@ -2333,15 +2419,22 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, RollbackAndReleaseCurrentSubTransaction(); /* - * If we are streaming the in-progress transaction then discard the - * changes that we just streamed, and mark the transactions as - * streamed (if they contained changes). Otherwise, remove all the - * changes and deallocate the ReorderBufferTXN. + * We are here due to one of the four reasons: 1. Decoding an + * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a + * prepared txn that was (partially) streamed. 4. Decoding a committed + * txn. + * + * For 1, we allow truncation of txn data by removing the changes + * already streamed but still keeping other things like invalidations, + * snapshot, and tuplecids. For 2 and 3, we indicate + * ReorderBufferTruncateTXN to do more elaborate truncation of txn + * data as the entire transaction has been decoded except for commit. + * For 4, as the entire txn has been decoded, we can fully clean up + * the TXN reorder buffer. */ - if (streaming) + if (streaming || rbtxn_prepared(txn)) { - ReorderBufferTruncateTXN(rb, txn); - + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); /* Reset the CheckXidAlive */ CheckXidAlive = InvalidTransactionId; } @@ -2374,17 +2467,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent - * abort of the (sub)transaction we are streaming. We need to do the - * cleanup and return gracefully on this error, see SetupCheckXidLive. + * abort of the (sub)transaction we are streaming or preparing. We + * need to do the cleanup and return gracefully on this error, see + * SetupCheckXidLive. */ if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK) { /* - * This error can only occur when we are sending the data in - * streaming mode and the streaming is not finished yet. + * This error can occur either when we are sending the data in + * streaming mode and the streaming is not finished yet or when we + * are sending the data out on a PREPARE during a two-phase + * commit. */ - Assert(streaming); - Assert(stream_started); + Assert(streaming || rbtxn_prepared(txn)); + Assert(stream_started || rbtxn_prepared(txn)); /* Cleanup the temporary error state. */ FlushErrorState(); @@ -2414,26 +2510,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * ReorderBufferCommitChild(), even if previously assigned to the toplevel * transaction with ReorderBufferAssignChild. * - * This interface is called once a toplevel commit is read for both streamed - * as well as non-streamed transactions. + * This interface is called once a prepare or toplevel commit is read for both + * streamed as well as non-streamed transactions. */ -void -ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, +static void +ReorderBufferReplay(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn) { - ReorderBufferTXN *txn; Snapshot snapshot_now; CommandId command_id = FirstCommandId; - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - - /* unknown transaction, nothing to replay */ - if (txn == NULL) - return; - txn->final_lsn = commit_lsn; txn->end_lsn = end_lsn; txn->commit_time = commit_time; @@ -2463,7 +2552,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, if (txn->base_snapshot == NULL) { Assert(txn->ninvalidations == 0); - ReorderBufferCleanupTXN(rb, txn); + + /* + * Removing this txn before a commit might result in the computation + * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts. + */ + if (!rbtxn_prepared(txn)) + ReorderBufferCleanupTXN(rb, txn); return; } @@ -2475,6 +2570,178 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, } /* + * Commit a transaction. + * + * See comments for ReorderBufferReplay(). + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time, + origin_id, origin_lsn); +} + +/* + * Record the prepare information for a transaction. + */ +bool +ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, + TimestampTz prepare_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return false; + + /* + * Remember the prepare information to be later used by commit prepared in + * case we skip doing prepare. + */ + txn->final_lsn = prepare_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = prepare_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + return true; +} + +/* Remember that we have skipped prepare */ +void +ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_SKIPPED_PREPARE; +} + +/* + * Prepare a two-phase transaction. + * + * See comments for ReorderBufferReplay(). + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = pstrdup(gid); + + /* The prepare info must have been updated in txn by now. */ + Assert(txn->final_lsn != InvalidXLogRecPtr); + + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, + txn->commit_time, txn->origin_id, txn->origin_lsn); +} + +/* + * This is used to handle COMMIT/ROLLBACK PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, RepOriginId origin_id, + XLogRecPtr origin_lsn, char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + XLogRecPtr prepare_end_lsn; + TimestampTz prepare_time; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return; + + /* + * By this time the txn has the prepare record information, remember it to + * be later used for rollback. + */ + prepare_end_lsn = txn->end_lsn; + prepare_time = txn->commit_time; + + /* add the gid in the txn */ + txn->gid = pstrdup(gid); + + /* + * It is possible that this transaction is not decoded at prepare time + * either because by that time we didn't have a consistent snapshot or it + * was decoded earlier but we have restarted. We can't distinguish between + * those two cases so we send the prepare in both the cases and let + * downstream decide whether to process or skip it. We don't need to + * decode the xact for aborts if it is not done already. + */ + if (!rbtxn_prepared(txn) && is_commit) + { + txn->txn_flags |= RBTXN_PREPARE; + + /* + * The prepare info must have been updated in txn even if we skip + * prepare. + */ + Assert(txn->final_lsn != InvalidXLogRecPtr); + + /* + * By this time the txn has the prepare record information and it is + * important to use that so that downstream gets the accurate + * information. If instead, we have passed commit information here + * then downstream can behave as it has already replayed commit + * prepared after the restart. + */ + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, + txn->commit_time, txn->origin_id, txn->origin_lsn); + } + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + if (is_commit) + rb->commit_prepared(rb, txn, commit_lsn); + else + rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time); + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); + ReorderBufferCleanupTXN(rb, txn); +} + +/* * Abort a transaction that possibly has previous changes. Needs to be first * called for subtransactions and then for the toplevel xid. * @@ -2606,6 +2873,39 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) } /* + * Invalidate cache for those transactions that need to be skipped just in case + * catalogs were manipulated as part of the transaction. + * + * Note that this is a special-purpose function for prepared transactions where + * we don't want to clean up the TXN even when we decide to skip it. See + * DecodePrepare. + */ +void +ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown, nothing to do */ + if (txn == NULL) + return; + + /* + * Process cache invalidation messages if there are any. Even if we're not + * interested in the transaction's contents, it could have manipulated the + * catalog and we need to update the caches according to that. + */ + if (txn->base_snapshot != NULL && txn->ninvalidations > 0) + ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, + txn->invalidations); + else + Assert(txn->ninvalidations == 0); +} + + +/* * Execute invalidations happening outside the context of a decoded * transaction. That currently happens either for xid-less commits * (cf. RecordTransactionCommit()) or for invalidations in uninteresting diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 6afc25e8d3d..15b07a54c11 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -834,6 +834,13 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) continue; + /* + * We don't need to add snapshot to prepared transactions as they + * should not see the new catalog contents. + */ + if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn)) + continue; + elog(DEBUG2, "adding a new snapshot to %u at %X/%X", txn->xid, (uint32) (lsn >> 32), (uint32) lsn); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 9f982137d93..bab31bf7af7 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -174,6 +174,8 @@ typedef struct ReorderBufferChange #define RBTXN_IS_STREAMED 0x0010 #define RBTXN_HAS_TOAST_INSERT 0x0020 #define RBTXN_HAS_SPEC_INSERT 0x0040 +#define RBTXN_PREPARE 0x0080 +#define RBTXN_SKIPPED_PREPARE 0x0100 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -233,6 +235,18 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \ ) +/* Has this transaction been prepared? */ +#define rbtxn_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_PREPARE) != 0 \ +) + +/* prepare for this transaction skipped? */ +#define rbtxn_skip_prepared(txn) \ +( \ + ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \ +) + typedef struct ReorderBufferTXN { /* See above */ @@ -258,10 +272,11 @@ typedef struct ReorderBufferTXN XLogRecPtr first_lsn; /* ---- - * LSN of the record that lead to this xact to be committed or + * LSN of the record that lead to this xact to be prepared or committed or * aborted. This can be a * * plain commit record * * plain commit record, of a parent transaction + * * prepared tansaction * * prepared transaction commit * * plain abort record * * prepared transaction abort @@ -293,7 +308,8 @@ typedef struct ReorderBufferTXN XLogRecPtr origin_lsn; /* - * Commit time, only known when we read the actual commit record. + * Commit or Prepare time, only known when we read the actual commit or + * prepare record. */ TimestampTz commit_time; @@ -625,12 +641,18 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn, + char *gid, bool is_commit); void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn); void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn); void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn); void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid); void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn); +void ReorderBufferInvalidate(ReorderBuffer *, TransactionId, XLogRecPtr lsn); void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap); @@ -644,10 +666,17 @@ void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr l void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations); void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); + void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, + TimestampTz prepare_time, + RepOriginId origin_id, XLogRecPtr origin_lsn); +void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid); +void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid); ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *); TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb); |