diff options
author | Amit Kapila <akapila@postgresql.org> | 2025-04-28 11:22:07 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2025-04-28 11:22:07 +0530 |
commit | 36148b22ee09f504e049e7da2495d32ca1c265bd (patch) | |
tree | e18de6af16d4bf3ba1494017211a17e79b7bd5fd /src | |
parent | d96206f259d6799de066c6fa88a4b51c494f7167 (diff) | |
download | postgresql-36148b22ee09f504e049e7da2495d32ca1c265bd.tar.gz postgresql-36148b22ee09f504e049e7da2495d32ca1c265bd.zip |
Fix xmin advancement during fast_forward decoding.
During logical decoding, we advance catalog_xmin of logical too early in
fast_forward mode, resulting in required catalog data being removed by
vacuum. This mode is normally used to advance the slot without processing
the changes, but we still can't let the slot's xmin to advance to an
incorrect value.
Commit f49a80c481 fixed a similar issue where the logical slot's
catalog_xmin was getting advanced prematurely during non-fast-forward
mode. During xl_running_xacts processing, instead of directly advancing
the slot's xmin to the oldest running xid in the record, it allowed the
xmin to be held back for snapshots that can be used for
not-yet-replayed transactions, as those might consider older txns as
running too. However, it missed the fact that the same problem can happen
during fast_forward mode decoding, as we won't build a base snapshot in
that mode, and the future call to get_changes from the same slot can miss
seeing the required catalog changes leading to incorrect reslts.
This commit allows building the base snapshot even in fast_forward mode to
prevent the early advancement of xmin.
Reported-by: Amit Kapila <amit.kapila16@gmail.com>
Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Backpatch-through: 13
Discussion: https://postgr.es/m/CAA4eK1LqWncUOqKijiafe+Ypt1gQAQRjctKLMY953J79xDBgAg@mail.gmail.com
Discussion: https://postgr.es/m/OS0PR01MB57163087F86621D44D9A72BF94BB2@OS0PR01MB5716.jpnprd01.prod.outlook.com
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 37 |
1 files changed, 25 insertions, 12 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8ec5adfd909..4911e98ec2d 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -411,19 +411,24 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding changes. + * point in decoding data changes. However, it's crucial to build the base + * snapshot during fast-forward mode (as is done in + * SnapBuildProcessChange()) because we require the snapshot's xmin when + * determining the candidate catalog_xmin for the replication slot. See + * SnapBuildProcessRunningXacts(). */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) { case XLOG_HEAP2_MULTI_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeMultiInsert(ctx, buf); break; case XLOG_HEAP2_NEW_CID: + if (!ctx->fast_forward) { xl_heap_new_cid *xlrec; @@ -470,16 +475,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding data changes. + * point in decoding data changes. However, it's crucial to build the base + * snapshot during fast-forward mode (as is done in + * SnapBuildProcessChange()) because we require the snapshot's xmin when + * determining the candidate catalog_xmin for the replication slot. See + * SnapBuildProcessRunningXacts(). */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) { case XLOG_HEAP_INSERT: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeInsert(ctx, buf); break; @@ -490,17 +499,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ case XLOG_HEAP_HOT_UPDATE: case XLOG_HEAP_UPDATE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeUpdate(ctx, buf); break; case XLOG_HEAP_DELETE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeDelete(ctx, buf); break; case XLOG_HEAP_TRUNCATE: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeTruncate(ctx, buf); break; @@ -528,7 +540,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_HEAP_CONFIRM: - if (SnapBuildProcessChange(builder, xid, buf->origptr)) + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) DecodeSpecConfirm(ctx, buf); break; |