diff options
-rw-r--r-- | src/backend/replication/logical/decode.c | 14 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 10 |
2 files changed, 22 insertions, 2 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 87cbd08e858..297eb11b5a8 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -514,7 +514,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; RepOriginId origin_id = XLogRecGetOrigin(r); - Snapshot snapshot; + Snapshot snapshot = NULL; xl_logical_message *message; if (info != XLOG_LOGICAL_MESSAGE) @@ -544,7 +544,17 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuildXactNeedsSkip(builder, buf->origptr))) return; - snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + /* + * If this is a non-transactional change, get the snapshot we're expected + * to use. We only get here when the snapshot is consistent, and the + * change is not meant to be skipped. + * + * For transactional changes we don't need a snapshot, we'll use the + * regular snapshot maintained by ReorderBuffer. We just leave it NULL. + */ + if (!message->transactional) + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, message->transactional, message->message, /* first part of message is diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4b6adfb92b5..fb323a80ec9 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -672,6 +672,13 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Assert(xid != InvalidTransactionId); + /* + * We don't expect snapshots for transactional changes - we'll use the + * snapshot derived later during apply (unless the change gets + * skipped). + */ + Assert(!snapshot); + oldcontext = MemoryContextSwitchTo(rb->context); change = ReorderBufferGetChange(rb); @@ -690,6 +697,9 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, ReorderBufferTXN *txn = NULL; volatile Snapshot snapshot_now = snapshot; + /* Non-transactional changes require a valid snapshot. */ + Assert(snapshot_now); + if (xid != InvalidTransactionId) txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |