aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/Makefile3
-rw-r--r--contrib/test_decoding/expected/skip_snapshot_restore.out45
-rw-r--r--contrib/test_decoding/meson.build1
-rw-r--r--contrib/test_decoding/specs/skip_snapshot_restore.spec46
-rw-r--r--src/backend/replication/logical/logical.c7
-rw-r--r--src/backend/replication/logical/snapbuild.c19
-rw-r--r--src/include/replication/logical.h6
7 files changed, 119 insertions, 8 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index c7ce6037064..a4ba1a509ae 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
spill slot truncate stream stats twophase twophase_stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
- twophase_snapshot slot_creation_error catalog_change_snapshot
+ twophase_snapshot slot_creation_error catalog_change_snapshot \
+ skip_snapshot_restore
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/skip_snapshot_restore.out b/contrib/test_decoding/expected/skip_snapshot_restore.out
new file mode 100644
index 00000000000..c64dbd9c4e0
--- /dev/null
+++ b/contrib/test_decoding/expected/skip_snapshot_restore.out
@@ -0,0 +1,45 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s0_init s0_begin s0_insert1 s1_init s2_checkpoint s2_get_changes_slot0 s0_insert2 s0_commit s1_get_changes_slot0 s1_get_changes_slot1
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_insert1: INSERT INTO tbl VALUES (1);
+step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); <waiting ...>
+step s2_checkpoint: CHECKPOINT;
+step s2_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_insert2: INSERT INTO tbl VALUES (2);
+step s0_commit: COMMIT;
+step s1_init: <... completed>
+?column?
+--------
+init
+(1 row)
+
+step s1_get_changes_slot0: SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-----------------------------------------
+BEGIN
+table public.tbl: INSERT: val1[integer]:1
+table public.tbl: INSERT: val1[integer]:2
+COMMIT
+(4 rows)
+
+step s1_get_changes_slot1: SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 7b05cc25a36..2dd3ede41bf 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -62,6 +62,7 @@ tests += {
'concurrent_stream',
'twophase_snapshot',
'slot_creation_error',
+ 'skip_snapshot_restore',
],
'regress_args': [
'--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/skip_snapshot_restore.spec b/contrib/test_decoding/specs/skip_snapshot_restore.spec
new file mode 100644
index 00000000000..3f1fb6f02c7
--- /dev/null
+++ b/contrib/test_decoding/specs/skip_snapshot_restore.spec
@@ -0,0 +1,46 @@
+# Test that a slot creation skips to restore serialized snapshot to reach
+# the consistent state.
+
+setup
+{
+ DROP TABLE IF EXISTS tbl;
+ CREATE TABLE tbl (val1 integer);
+}
+
+teardown
+{
+ DROP TABLE tbl;
+ SELECT 'stop' FROM pg_drop_replication_slot('slot0');
+ SELECT 'stop' FROM pg_drop_replication_slot('slot1');
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot0', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_insert1" { INSERT INTO tbl VALUES (1); }
+step "s0_insert2" { INSERT INTO tbl VALUES (2); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('slot1', 'test_decoding'); }
+step "s1_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+step "s1_get_changes_slot1" { SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+session "s2"
+setup { SET synchronous_commit = on ;}
+step "s2_checkpoint" { CHECKPOINT; }
+step "s2_get_changes_slot0" { SELECT data FROM pg_logical_slot_get_changes('slot0', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+
+# While 'slot1' creation by "s1_init" waits for s0-transaction to commit, the
+# RUNNING_XACTS record is written by "s2_checkpoint" and "s2_get_changes_slot1"
+# serializes consistent snapshots to the disk at LSNs where are before
+# s0-transaction's commit. After s0-transaction commits, "s1_init" resumes but
+# must not restore any serialized snapshots and will reach the consistent state
+# when decoding a RUNNING_XACT record generated after s0-transaction's commit.
+# We check if the get_changes on 'slot1' will not return any s0-transaction's
+# changes as its confirmed_flush_lsn will be after the s0-transaction's commit
+# record.
+permutation "s0_init" "s0_begin" "s0_insert1" "s1_init" "s2_checkpoint" "s2_get_changes_slot0" "s0_insert2" "s0_commit" "s1_get_changes_slot0" "s1_get_changes_slot1"
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 41243d0187a..6894d3acbc4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -151,6 +151,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
+ bool in_create,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
@@ -296,6 +297,8 @@ StartupDecodingContext(List *output_plugin_options,
ctx->fast_forward = fast_forward;
+ ctx->in_create = in_create;
+
MemoryContextSwitchTo(old_context);
return ctx;
@@ -437,7 +440,7 @@ CreateInitDecodingContext(const char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
- need_full_snapshot, false,
+ need_full_snapshot, false, true,
xl_routine, prepare_write, do_write,
update_progress);
@@ -573,7 +576,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, xl_routine, prepare_write,
+ fast_forward, false, xl_routine, prepare_write,
do_write, update_progress);
/* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7a7aba33e16..3ed2f79dd06 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1314,6 +1314,8 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
static bool
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
+ LogicalDecodingContext *ctx = (LogicalDecodingContext *) builder->reorder->private_data;
+
/* ---
* Build catalog decoding snapshot incrementally using information about
* the currently running transactions. There are several ways to do that:
@@ -1323,10 +1325,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* state while waiting on c)'s sub-states.
*
* b) This (in a previous run) or another decoding slot serialized a
- * snapshot to disk that we can use. Can't use this method for the
- * initial snapshot when slot is being created and needs full snapshot
- * for export or direct use, as that snapshot will only contain catalog
- * modifying transactions.
+ * snapshot to disk that we can use. Can't use this method while finding
+ * the start point for decoding changes as the restart LSN would be an
+ * arbitrary LSN but we need to find the start point to extract changes
+ * where we won't see the data for partial transactions. Also, we cannot
+ * use this method when a slot needs a full snapshot for export or direct
+ * use, as that snapshot will only contain catalog modifying transactions.
*
* c) First incrementally build a snapshot for catalog tuples
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
@@ -1391,8 +1395,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
return false;
}
- /* b) valid on disk state and not building full snapshot */
+
+ /*
+ * b) valid on disk state and while neither building full snapshot nor
+ * creating a slot.
+ */
else if (!builder->building_full_snapshot &&
+ !ctx->in_create &&
SnapBuildRestore(builder, lsn))
{
/* there won't be any state to cleanup */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5f49554ea05..68d1c37230a 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -109,6 +109,12 @@ typedef struct LogicalDecodingContext
TransactionId write_xid;
/* Are we processing the end LSN of a transaction? */
bool end_xact;
+
+ /*
+ * True if the logical decoding context being used for the creation
+ * of a logical replication slot.
+ */
+ bool in_create;
} LogicalDecodingContext;