aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c37
1 files changed, 25 insertions, 12 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8ec5adfd909..4911e98ec2d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -411,19 +411,24 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding changes.
+ * point in decoding data changes. However, it's crucial to build the base
+ * snapshot during fast-forward mode (as is done in
+ * SnapBuildProcessChange()) because we require the snapshot's xmin when
+ * determining the candidate catalog_xmin for the replication slot. See
+ * SnapBuildProcessRunningXacts().
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
{
case XLOG_HEAP2_MULTI_INSERT:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeMultiInsert(ctx, buf);
break;
case XLOG_HEAP2_NEW_CID:
+ if (!ctx->fast_forward)
{
xl_heap_new_cid *xlrec;
@@ -470,16 +475,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding data changes.
+ * point in decoding data changes. However, it's crucial to build the base
+ * snapshot during fast-forward mode (as is done in
+ * SnapBuildProcessChange()) because we require the snapshot's xmin when
+ * determining the candidate catalog_xmin for the replication slot. See
+ * SnapBuildProcessRunningXacts().
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
{
case XLOG_HEAP_INSERT:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeInsert(ctx, buf);
break;
@@ -490,17 +499,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
case XLOG_HEAP_HOT_UPDATE:
case XLOG_HEAP_UPDATE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeUpdate(ctx, buf);
break;
case XLOG_HEAP_DELETE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeDelete(ctx, buf);
break;
case XLOG_HEAP_TRUNCATE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeTruncate(ctx, buf);
break;
@@ -528,7 +540,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_HEAP_CONFIRM:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeSpecConfirm(ctx, buf);
break;