aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/decode.c14
-rw-r--r--src/backend/replication/logical/reorderbuffer.c10
2 files changed, 22 insertions, 2 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 87cbd08e858..297eb11b5a8 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -514,7 +514,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
RepOriginId origin_id = XLogRecGetOrigin(r);
- Snapshot snapshot;
+ Snapshot snapshot = NULL;
xl_logical_message *message;
if (info != XLOG_LOGICAL_MESSAGE)
@@ -544,7 +544,17 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuildXactNeedsSkip(builder, buf->origptr)))
return;
- snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ /*
+ * If this is a non-transactional change, get the snapshot we're expected
+ * to use. We only get here when the snapshot is consistent, and the
+ * change is not meant to be skipped.
+ *
+ * For transactional changes we don't need a snapshot, we'll use the
+ * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
+ */
+ if (!message->transactional)
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+
ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
message->transactional,
message->message, /* first part of message is
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4b6adfb92b5..fb323a80ec9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -672,6 +672,13 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
Assert(xid != InvalidTransactionId);
+ /*
+ * We don't expect snapshots for transactional changes - we'll use the
+ * snapshot derived later during apply (unless the change gets
+ * skipped).
+ */
+ Assert(!snapshot);
+
oldcontext = MemoryContextSwitchTo(rb->context);
change = ReorderBufferGetChange(rb);
@@ -690,6 +697,9 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
ReorderBufferTXN *txn = NULL;
volatile Snapshot snapshot_now = snapshot;
+ /* Non-transactional changes require a valid snapshot. */
+ Assert(snapshot_now);
+
if (xid != InvalidTransactionId)
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);