aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/catalog_change_snapshot.out45
-rw-r--r--contrib/test_decoding/specs/catalog_change_snapshot.spec16
-rw-r--r--src/backend/replication/logical/reorderbuffer.c14
-rw-r--r--src/backend/replication/logical/snapbuild.c3
4 files changed, 78 insertions, 0 deletions
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index dc4f9b7018f..1d75cf5af02 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -42,3 +42,48 @@ COMMIT
stop
(1 row)
+
+starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_insert2 s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_insert2: INSERT INTO user_cat VALUES (1);
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+table public.user_cat: INSERT: val1[integer]:1
+COMMIT
+(4 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT
+(3 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index 2971ddc69cb..2ad1edeaa87 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -4,11 +4,13 @@ setup
{
DROP TABLE IF EXISTS tbl1;
CREATE TABLE tbl1 (val1 integer, val2 integer);
+ CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true);
}
teardown
{
DROP TABLE tbl1;
+ DROP TABLE user_cat;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
@@ -19,6 +21,7 @@ step "s0_begin" { BEGIN; }
step "s0_savepoint" { SAVEPOINT sp1; }
step "s0_truncate" { TRUNCATE tbl1; }
step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_insert2" { INSERT INTO user_cat VALUES (1); }
step "s0_commit" { COMMIT; }
session "s1"
@@ -37,3 +40,16 @@ step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_
# record written by bgwriter. One might think we can either stop the bgwriter or
# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can handle the case where there is no association between top-level
+# transaction and its subtransactions. The last decoding restarts from the first
+# checkpoint, decodes NEW_CID generated by "s0_insert2", and marks the subtransaction
+# as containing catalog changes while adding tuple cids to its top-level transaction.
+# During that, both transaction entries are created in ReorderBuffer as top-level
+# transactions and have the same LSN. We check if the assertion check for the order
+# of transaction LSNs in AssertTXNLsnOrder() is skipped since we are still before the
+# LSN at which we start replaying the contents of transactions. Besides, when decoding
+# the commit record of the top-level transaction, we must force the top-level
+# transaction to do timetravel since one of its subtransactions has been marked as
+# containing catalog changes.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 368f60159da..967e5227423 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -722,10 +722,24 @@ static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
+ LogicalDecodingContext *ctx = rb->private_data;
dlist_iter iter;
XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
+ /*
+ * Skip the verification if we don't reach the LSN at which we start
+ * decoding the contents of transactions yet because until we reach the
+ * LSN, we could have transactions that don't have the association between
+ * the top-level transaction and subtransaction yet and consequently have
+ * the same LSN. We don't guarantee this association until we try to
+ * decode the actual contents of transaction. The ordering of the records
+ * prior to the start_decoding_at LSN should have been checked before the
+ * restart.
+ */
+ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
+ return;
+
dlist_foreach(iter, &rb->toplevel_by_lsn)
{
ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index d407fb3440e..1e7c918bdeb 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1103,6 +1103,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
else if (sub_needs_timetravel)
{
/* track toplevel txn as well, subxact alone isn't meaningful */
+ elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
+ xid);
+ needs_timetravel = true;
SnapBuildAddCommittedTxn(builder, xid);
}
else if (needs_timetravel)