diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 14 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 10 |
2 files changed, 22 insertions, 2 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 19cd0bf76ac..c1d0470209b 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -517,7 +517,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; RepOriginId origin_id = XLogRecGetOrigin(r); - Snapshot snapshot; + Snapshot snapshot = NULL; xl_logical_message *message; if (info != XLOG_LOGICAL_MESSAGE) @@ -547,7 +547,17 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuildXactNeedsSkip(builder, buf->origptr))) return; - snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + /* + * If this is a non-transactional change, get the snapshot we're expected + * to use. We only get here when the snapshot is consistent, and the + * change is not meant to be skipped. + * + * For transactional changes we don't need a snapshot, we'll use the + * regular snapshot maintained by ReorderBuffer. We just leave it NULL. + */ + if (!message->transactional) + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, message->transactional, message->message, /* first part of message is diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 76e6cf5134f..d8345a75b68 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -622,6 +622,13 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Assert(xid != InvalidTransactionId); + /* + * We don't expect snapshots for transactional changes - we'll use the + * snapshot derived later during apply (unless the change gets + * skipped). + */ + Assert(!snapshot); + oldcontext = MemoryContextSwitchTo(rb->context); change = ReorderBufferGetChange(rb); @@ -640,6 +647,9 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, ReorderBufferTXN *txn = NULL; volatile Snapshot snapshot_now = snapshot; + /* Non-transactional changes require a valid snapshot. */ + Assert(snapshot_now); + if (xid != InvalidTransactionId) txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |