aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2025-04-28 11:22:07 +0530
committerAmit Kapila <akapila@postgresql.org>2025-04-28 11:22:07 +0530
commit36148b22ee09f504e049e7da2495d32ca1c265bd (patch)
treee18de6af16d4bf3ba1494017211a17e79b7bd5fd /src
parentd96206f259d6799de066c6fa88a4b51c494f7167 (diff)
downloadpostgresql-36148b22ee09f504e049e7da2495d32ca1c265bd.tar.gz
postgresql-36148b22ee09f504e049e7da2495d32ca1c265bd.zip
Fix xmin advancement during fast_forward decoding.
During logical decoding, we advance catalog_xmin of logical too early in fast_forward mode, resulting in required catalog data being removed by vacuum. This mode is normally used to advance the slot without processing the changes, but we still can't let the slot's xmin to advance to an incorrect value. Commit f49a80c481 fixed a similar issue where the logical slot's catalog_xmin was getting advanced prematurely during non-fast-forward mode. During xl_running_xacts processing, instead of directly advancing the slot's xmin to the oldest running xid in the record, it allowed the xmin to be held back for snapshots that can be used for not-yet-replayed transactions, as those might consider older txns as running too. However, it missed the fact that the same problem can happen during fast_forward mode decoding, as we won't build a base snapshot in that mode, and the future call to get_changes from the same slot can miss seeing the required catalog changes leading to incorrect reslts. This commit allows building the base snapshot even in fast_forward mode to prevent the early advancement of xmin. Reported-by: Amit Kapila <amit.kapila16@gmail.com> Author: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Backpatch-through: 13 Discussion: https://postgr.es/m/CAA4eK1LqWncUOqKijiafe+Ypt1gQAQRjctKLMY953J79xDBgAg@mail.gmail.com Discussion: https://postgr.es/m/OS0PR01MB57163087F86621D44D9A72BF94BB2@OS0PR01MB5716.jpnprd01.prod.outlook.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c37
1 files changed, 25 insertions, 12 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8ec5adfd909..4911e98ec2d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -411,19 +411,24 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding changes.
+ * point in decoding data changes. However, it's crucial to build the base
+ * snapshot during fast-forward mode (as is done in
+ * SnapBuildProcessChange()) because we require the snapshot's xmin when
+ * determining the candidate catalog_xmin for the replication slot. See
+ * SnapBuildProcessRunningXacts().
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
{
case XLOG_HEAP2_MULTI_INSERT:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeMultiInsert(ctx, buf);
break;
case XLOG_HEAP2_NEW_CID:
+ if (!ctx->fast_forward)
{
xl_heap_new_cid *xlrec;
@@ -470,16 +475,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding data changes.
+ * point in decoding data changes. However, it's crucial to build the base
+ * snapshot during fast-forward mode (as is done in
+ * SnapBuildProcessChange()) because we require the snapshot's xmin when
+ * determining the candidate catalog_xmin for the replication slot. See
+ * SnapBuildProcessRunningXacts().
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
{
case XLOG_HEAP_INSERT:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeInsert(ctx, buf);
break;
@@ -490,17 +499,20 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
*/
case XLOG_HEAP_HOT_UPDATE:
case XLOG_HEAP_UPDATE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeUpdate(ctx, buf);
break;
case XLOG_HEAP_DELETE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeDelete(ctx, buf);
break;
case XLOG_HEAP_TRUNCATE:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeTruncate(ctx, buf);
break;
@@ -528,7 +540,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_HEAP_CONFIRM:
- if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+ !ctx->fast_forward)
DecodeSpecConfirm(ctx, buf);
break;