diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 28 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 21 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 6 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 2 |
4 files changed, 42 insertions, 15 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 61c3ed740bb..c530fe9ba09 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -78,6 +78,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * Take every XLogReadRecord()ed record and perform the actions required to * decode it using the output plugin already setup in the logical decoding * context. + * + * NB: Note that every record's xid needs to be processed by reorderbuffer + * (xids contained in the content of records are not relevant for this rule). + * That means that for records which'd otherwise not go through the + * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to + * call ReorderBufferProcessXid for each record type by default, because + * e.g. empty xacts can be handled more efficiently if there's no previous + * state for them. */ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) @@ -135,6 +143,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_BRIN_ID: case RM_COMMIT_TS_ID: case RM_REPLORIGIN_ID: + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); @@ -150,6 +161,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), + buf->origptr); + switch (info) { /* this is also used in END_OF_RECOVERY checkpoints */ @@ -191,7 +205,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; - /* no point in doing anything yet, data could not be decoded anyway */ + /* + * No point in doing anything yet, data could not be decoded anyway. It's + * ok not to call ReorderBufferProcessXid() in that case, except in the + * assignment case there'll not be any later records with the same xid; + * and in the assignment case we'll not decode those xacts. + */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -260,6 +279,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * transactions in the changestream allowing for a kind of * distributed 2PC. */ + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); @@ -276,6 +296,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + switch (info) { case XLOG_RUNNING_XACTS: @@ -313,6 +335,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -366,6 +390,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 478c3e874af..b3276c74c7c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1741,16 +1741,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* - * Check whether a transaction is already known in this module.xs + * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at + * least once for every xid in XLogRecord->xl_xid (other places in records + * may, but do not have to be passed through here). + * + * Reorderbuffer keeps some datastructures about transactions in LSN order, + * for efficiency. To do that it has to know about when transactions are seen + * first in the WAL. As many types of records are not actually interesting for + * logical decoding, they do not necessarily pass though here. */ -bool -ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid) +void +ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { - ReorderBufferTXN *txn; - - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - return txn != NULL; + /* many records won't have an xid assigned, centralize check here */ + if (xid != InvalidTransactionId) + ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); } /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 16100538663..eb571045fea 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -635,8 +635,6 @@ SnapBuildClearExportedSnapshot() bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) { - bool is_old_tx; - /* * We can't handle data in transactions if we haven't built a snapshot * yet, so don't store them. @@ -657,9 +655,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) * If the reorderbuffer doesn't yet have a snapshot, add one now, it will * be needed to decode the change we're currently processing. */ - is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid); - - if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) + if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) { /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bf0d790053b..cf47a117200 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -369,7 +369,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn CommandId cmin, CommandId cmax, CommandId combocid); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); -bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid); +void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); |