aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/decode.c14
-rw-r--r--src/backend/replication/logical/reorderbuffer.c10
2 files changed, 22 insertions, 2 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 390963d6b54..bc7cbb25fc4 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -564,7 +564,7 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
RepOriginId origin_id = XLogRecGetOrigin(r);
- Snapshot snapshot;
+ Snapshot snapshot = NULL;
xl_logical_message *message;
if (info != XLOG_LOGICAL_MESSAGE)
@@ -594,7 +594,17 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuildXactNeedsSkip(builder, buf->origptr)))
return;
- snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ /*
+ * If this is a non-transactional change, get the snapshot we're expected
+ * to use. We only get here when the snapshot is consistent, and the
+ * change is not meant to be skipped.
+ *
+ * For transactional changes we don't need a snapshot, we'll use the
+ * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
+ */
+ if (!message->transactional)
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+
ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
message->transactional,
message->message, /* first part of message is
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e3b67e4447a..b1882ae5ecf 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -828,6 +828,13 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
Assert(xid != InvalidTransactionId);
+ /*
+ * We don't expect snapshots for transactional changes - we'll use the
+ * snapshot derived later during apply (unless the change gets
+ * skipped).
+ */
+ Assert(!snapshot);
+
oldcontext = MemoryContextSwitchTo(rb->context);
change = ReorderBufferGetChange(rb);
@@ -846,6 +853,9 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
ReorderBufferTXN *txn = NULL;
volatile Snapshot snapshot_now = snapshot;
+ /* Non-transactional changes require a valid snapshot. */
+ Assert(snapshot_now);
+
if (xid != InvalidTransactionId)
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);