aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/catalog_change_snapshot.out44
-rw-r--r--contrib/test_decoding/specs/catalog_change_snapshot.spec15
-rw-r--r--src/backend/replication/logical/snapbuild.c10
3 files changed, 65 insertions, 4 deletions
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index b33e49c0b1c..551dc2204a9 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -132,3 +132,47 @@ COMMIT
stop
(1 row)
+
+starting permutation: s0_init s0_begin s0_savepoint s0_create_part1 s0_savepoint_release s1_checkpoint s0_create_part2 s0_commit s0_begin s0_truncate s1_checkpoint s1_get_changes s0_insert_part s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_create_part1: CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10);
+step s0_savepoint_release: RELEASE SAVEPOINT sp1;
+step s1_checkpoint: CHECKPOINT;
+step s0_create_part2: CREATE TABLE tbl1_part_p2 PARTITION OF tbl1_part FOR VALUES FROM (10) TO (20);
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_insert_part: INSERT INTO tbl1_part VALUES (1);
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+--------------------------------------------------
+BEGIN
+table public.tbl1: TRUNCATE: (no-flags)
+table public.tbl1_part_p1: INSERT: val1[integer]:1
+COMMIT
+(4 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index 770dbd642d6..d8b9df97ed5 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -3,13 +3,16 @@
setup
{
DROP TABLE IF EXISTS tbl1;
+ DROP TABLE IF EXISTS tbl1_part;
CREATE TABLE tbl1 (val1 integer, val2 integer);
+ CREATE TABLE tbl1_part (val1 integer) PARTITION BY RANGE (val1);
CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true);
}
teardown
{
DROP TABLE tbl1;
+ DROP TABLE tbl1_part;
DROP TABLE user_cat;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
@@ -19,9 +22,13 @@ setup { SET synchronous_commit=on; }
step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
step "s0_begin" { BEGIN; }
step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_savepoint_release" { RELEASE SAVEPOINT sp1; }
step "s0_truncate" { TRUNCATE tbl1; }
step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
step "s0_insert2" { INSERT INTO user_cat VALUES (1); }
+step "s0_insert_part" { INSERT INTO tbl1_part VALUES (1); }
+step "s0_create_part1" { CREATE TABLE tbl1_part_p1 PARTITION OF tbl1_part FOR VALUES FROM (0) TO (10); }
+step "s0_create_part2" { CREATE TABLE tbl1_part_p2 PARTITION OF tbl1_part FOR VALUES FROM (10) TO (20); }
step "s0_commit" { COMMIT; }
session "s1"
@@ -60,3 +67,11 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_
# to skip this xact but ensure that corresponding invalidation messages
# get processed.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# The last decoding restarts from the first checkpoint and doesn't decode
+# any WAL records generated by the subtransaction that performed s0_create_part1.
+# While processing the commit record for the corresponding top-level transaction
+# which will be marked as containing catalog change even before commit, we ensure
+# that the corresponding substransaction is also marked as containing a catalog
+# modifying change.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_create_part1" "s0_savepoint_release" "s1_checkpoint" "s0_create_part2" "s0_commit" "s0_begin" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_insert_part" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 3eaaaac6b3a..39819947c13 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2126,11 +2126,13 @@ SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt
TransactionId *subxacts, XLogRecPtr lsn)
{
/*
- * Skip if there is no initial running xacts information or the
- * transaction is already marked as containing catalog changes.
+ * Skip if there is no initial running xacts information.
+ *
+ * Even if the transaction has been marked as containing catalog
+ * changes, it cannot be skipped because its subtransactions that
+ * modified the catalog may not be marked.
*/
- if (NInitialRunningXacts == 0 ||
- ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+ if (NInitialRunningXacts == 0)
return;
if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts,