diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 37 |
1 files changed, 25 insertions, 12 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8ec5adfd909..4911e98ec2d 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -411,19 +411,24 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding changes. + * point in decoding data changes. However, it's crucial to build the base + * snapshot during fast-forward mode (as is done in + * SnapBuildProcessChange()) because we require the snapshot's xmin when + * determining the candidate catalog_xmin for the replication slot. See + * SnapBuildProcessRunningXacts(). */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) { case XLOG_HEAP2_MULTI_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeMultiInsert(ctx, buf); break; case XLOG_HEAP2_NEW_CID: + if (!ctx->fast_forward) { xl_heap_new_cid *xlrec; @@ -470,16 +475,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding data changes. + * point in decoding data changes. However, it's crucial to build the base + * snapshot during fast-forward mode (as is done in + * SnapBuildProcessChange()) because we require the snapshot's xmin when + * determining the candidate catalog_xmin for the replication slot. See + * SnapBuildProcessRunningXacts(). */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) { case XLOG_HEAP_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeInsert(ctx, buf); break; @@ -490,17 +499,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ case XLOG_HEAP_HOT_UPDATE: case XLOG_HEAP_UPDATE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeUpdate(ctx, buf); break; case XLOG_HEAP_DELETE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeDelete(ctx, buf); break; case XLOG_HEAP_TRUNCATE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeTruncate(ctx, buf); break; @@ -528,7 +540,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_HEAP_CONFIRM: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeSpecConfirm(ctx, buf); break; |