aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMasahiko Sawada <msawada@postgresql.org>2024-07-11 22:48:23 +0900
committerMasahiko Sawada <msawada@postgresql.org>2024-07-11 22:48:23 +0900
commitbb19b70081e2248f242cd00227abff5b1e105eb6 (patch)
tree9f3eb7f5c0cf592eee3eee43e22db3efd4b39bf1 /src
parentc194de0713ebe71aaeeb5ebed4af2390cc1b521c (diff)
downloadpostgresql-bb19b70081e2248f242cd00227abff5b1e105eb6.tar.gz
postgresql-bb19b70081e2248f242cd00227abff5b1e105eb6.zip
Fix possibility of logical decoding partial transaction changes.
When creating and initializing a logical slot, the restart_lsn is set to the latest WAL insertion point (or the latest replay point on standbys). Subsequently, WAL records are decoded from that point to find the start point for extracting changes in the DecodingContextFindStartpoint() function. Since the initial restart_lsn could be in the middle of a transaction, the start point must be a consistent point where we won't see the data for partial transactions. Previously, when not building a full snapshot, serialized snapshots were restored, and the SnapBuild jumps to the consistent state even while finding the start point. Consequently, the slot's restart_lsn and confirmed_flush could be set to the middle of a transaction. This could lead to various unexpected consequences. Specifically, there were reports of logical decoding decoding partial transactions, and assertion failures occurred because only subtransactions were decoded without decoding their top-level transaction until decoding the commit record. To resolve this issue, the changes prevent restoring the serialized snapshot and jumping to the consistent state while finding the start point. On v17 and HEAD, a flag indicating whether snapshot restores should be skipped has been added to the SnapBuild struct, and SNAPBUILD_VERSION has been bumpded. On backbranches, the flag is stored in the LogicalDecodingContext instead, preserving on-disk compatibility. Backpatch to all supported versions. Reported-by: Drew Callahan Reviewed-by: Amit Kapila, Hayato Kuroda Discussion: https://postgr.es/m/2444AA15-D21B-4CCE-8052-52C7C2DAFE5C%40amazon.com Backpatch-through: 12
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logical.c7
-rw-r--r--src/backend/replication/logical/snapbuild.c29
-rw-r--r--src/include/replication/snapbuild.h1
3 files changed, 28 insertions, 9 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 99f31849bb1..f8ef5d56d26 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -152,6 +152,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
+ bool in_create,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
@@ -212,7 +213,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot, slot->data.two_phase_at);
+ need_full_snapshot, in_create, slot->data.two_phase_at);
ctx->reorder->private_data = ctx;
@@ -438,7 +439,7 @@ CreateInitDecodingContext(const char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
- need_full_snapshot, false,
+ need_full_snapshot, false, true,
xl_routine, prepare_write, do_write,
update_progress);
@@ -592,7 +593,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, xl_routine, prepare_write,
+ fast_forward, false, xl_routine, prepare_write,
do_write, update_progress);
/* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e37e22f4417..ae676145e60 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -190,6 +190,14 @@ struct SnapBuild
bool building_full_snapshot;
/*
+ * Indicates if we are using the snapshot builder for the creation of a
+ * logical replication slot. If it's true, the start point for decoding
+ * changes is not determined yet. So we skip snapshot restores to properly
+ * find the start point. See SnapBuildFindSnapshot() for details.
+ */
+ bool in_slot_creation;
+
+ /*
* Snapshot that's valid to see the catalog state seen at this moment.
*/
Snapshot snapshot;
@@ -317,6 +325,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
bool need_full_snapshot,
+ bool in_slot_creation,
XLogRecPtr two_phase_at)
{
MemoryContext context;
@@ -347,6 +356,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
+ builder->in_slot_creation = in_slot_creation;
builder->building_full_snapshot = need_full_snapshot;
builder->two_phase_at = two_phase_at;
@@ -1327,10 +1337,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* state while waiting on c)'s sub-states.
*
* b) This (in a previous run) or another decoding slot serialized a
- * snapshot to disk that we can use. Can't use this method for the
- * initial snapshot when slot is being created and needs full snapshot
- * for export or direct use, as that snapshot will only contain catalog
- * modifying transactions.
+ * snapshot to disk that we can use. Can't use this method while finding
+ * the start point for decoding changes as the restart LSN would be an
+ * arbitrary LSN but we need to find the start point to extract changes
+ * where we won't see the data for partial transactions. Also, we cannot
+ * use this method when a slot needs a full snapshot for export or direct
+ * use, as that snapshot will only contain catalog modifying transactions.
*
* c) First incrementally build a snapshot for catalog tuples
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
@@ -1395,8 +1407,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
return false;
}
- /* b) valid on disk state and not building full snapshot */
+
+ /*
+ * b) valid on disk state and while neither building full snapshot nor
+ * creating a slot.
+ */
else if (!builder->building_full_snapshot &&
+ !builder->in_slot_creation &&
SnapBuildRestore(builder, lsn))
{
/* there won't be any state to cleanup */
@@ -1580,7 +1597,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 5
+#define SNAPBUILD_VERSION 6
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index a3360a1c5ea..caa5113ff81 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -62,6 +62,7 @@ extern void CheckPointSnapBuild(void);
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
TransactionId xmin_horizon, XLogRecPtr start_lsn,
bool need_full_snapshot,
+ bool in_slot_creation,
XLogRecPtr two_phase_at);
extern void FreeSnapshotBuilder(SnapBuild *builder);