aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/subxact_without_top.out39
-rw-r--r--contrib/test_decoding/specs/subxact_without_top.spec63
-rw-r--r--src/backend/replication/logical/reorderbuffer.c3
4 files changed, 103 insertions, 4 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 4afb1d963e5..f439c582a5f 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
spill slot truncate
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
- oldest_xmin snapshot_transfer
+ oldest_xmin snapshot_transfer subxact_without_top
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/subxact_without_top.out b/contrib/test_decoding/expected/subxact_without_top.out
new file mode 100644
index 00000000000..99ce9988225
--- /dev/null
+++ b/contrib/test_decoding/expected/subxact_without_top.out
@@ -0,0 +1,39 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s0_begin s0_first_subxact s2_checkpoint s1_begin s1_dml s0_many_subxacts s0_commit s2_checkpoint s2_get_changes_suppress_output s2_get_changes_suppress_output s1_commit s2_get_changes
+step s0_begin: BEGIN;
+step s0_first_subxact:
+ DO LANGUAGE plpgsql $$
+ BEGIN
+ BEGIN
+ INSERT INTO harvest VALUES (41);
+ EXCEPTION WHEN OTHERS THEN RAISE;
+ END;
+ END $$;
+
+step s2_checkpoint: CHECKPOINT;
+step s1_begin: BEGIN;
+step s1_dml: INSERT INTO harvest VALUES (43);
+step s0_many_subxacts: select subxacts();
+subxacts
+
+
+step s0_commit: COMMIT;
+step s2_checkpoint: CHECKPOINT;
+step s2_get_changes_suppress_output: SELECT null n FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY n;
+n
+
+
+step s2_get_changes_suppress_output: SELECT null n FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY n;
+n
+
+step s1_commit: COMMIT;
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data
+
+BEGIN
+table public.harvest: INSERT: apples[integer]:43
+COMMIT
+?column?
+
+stop
diff --git a/contrib/test_decoding/specs/subxact_without_top.spec b/contrib/test_decoding/specs/subxact_without_top.spec
new file mode 100644
index 00000000000..76688c75e0e
--- /dev/null
+++ b/contrib/test_decoding/specs/subxact_without_top.spec
@@ -0,0 +1,63 @@
+# Test decoding of subtransactions whose top-transaction is before restart
+# point. Such transactions won't be streamed as we stream only complete
+# transactions, but it is good to test that they don't cause any problem.
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+ CREATE TABLE harvest(apples integer);
+ CREATE OR REPLACE FUNCTION subxacts() returns void as $$
+ BEGIN
+ FOR i in 1 .. 128 LOOP
+ BEGIN
+ INSERT INTO harvest VALUES (42);
+ EXCEPTION
+ WHEN OTHERS THEN
+ RAISE;
+ END;
+ END LOOP;
+ END; $$LANGUAGE 'plpgsql';
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS harvest;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_first_subxact" {
+ DO LANGUAGE plpgsql $$
+ BEGIN
+ BEGIN
+ INSERT INTO harvest VALUES (41);
+ EXCEPTION WHEN OTHERS THEN RAISE;
+ END;
+ END $$;
+}
+step "s0_many_subxacts" { select subxacts(); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_begin" { BEGIN; }
+step "s1_dml" { INSERT INTO harvest VALUES (43); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_checkpoint" { CHECKPOINT; }
+step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+step "s2_get_changes_suppress_output" { SELECT null n FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY n; }
+
+# The first checkpoint establishes the potential restart point (aka
+# restart_lsn) for the slot after the initial subxact. The second checkpoint
+# followed by get_changes will ensure that the potential restart point will
+# become the actual restart point. We do get_changes twice because if one
+# more xl_running_xacts record had slipped before our s0_commit, then the
+# potential restart point won't become actual restart point. The s1's open
+# transaction till get_changes holds the potential restart point to our first
+# checkpoint location.
+permutation "s0_begin" "s0_first_subxact" "s2_checkpoint" "s1_begin" "s1_dml" "s0_many_subxacts" "s0_commit" "s2_checkpoint" "s2_get_changes_suppress_output" "s2_get_changes_suppress_output" "s1_commit" "s2_get_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index a74fd705b4d..941ca9afe49 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -787,9 +787,6 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
- if (new_top && !new_sub)
- elog(ERROR, "subtransaction logged without previous top-level txn record");
-
if (!new_sub)
{
if (subtxn->is_known_as_subxact)