1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
-- predictability
SET synchronous_commit = on;
-- fail because we're creating a slot while in an xact with xid
BEGIN;
SELECT pg_current_xact_id() = '0';
?column?
----------
f
(1 row)
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
ERROR: cannot create logical replication slot in transaction that has performed writes
ROLLBACK;
-- fail because we're creating a slot while in a subxact whose topxact has an xid
BEGIN;
SELECT pg_current_xact_id() = '0';
?column?
----------
f
(1 row)
SAVEPOINT barf;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
ERROR: cannot create logical replication slot in transaction that has performed writes
ROLLBACK TO SAVEPOINT barf;
ROLLBACK;
-- succeed, outside tx.
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)
-- succeed, in tx without xid.
BEGIN;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
COMMIT;
CREATE TABLE nobarf(id serial primary key, data text);
INSERT INTO nobarf(data) VALUES('1');
-- decoding works in transaction with xid
BEGIN;
SELECT pg_current_xact_id() = '0';
?column?
----------
f
(1 row)
-- don't show yet, haven't committed
INSERT INTO nobarf(data) VALUES('2');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------------------------------
BEGIN
table public.nobarf: INSERT: id[integer]:1 data[text]:'1'
COMMIT
(3 rows)
COMMIT;
INSERT INTO nobarf(data) VALUES('3');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------------------------------
BEGIN
table public.nobarf: INSERT: id[integer]:2 data[text]:'2'
COMMIT
BEGIN
table public.nobarf: INSERT: id[integer]:3 data[text]:'3'
COMMIT
(6 rows)
-- Decoding works in transaction that issues DDL
--
-- We had issues handling relcache invalidations with these, see
-- https://www.postgresql.org/message-id/e56be7d9-14b1-664d-0bfc-00ce9772721c@gmail.com
CREATE TABLE tbl_created_outside_xact(id SERIAL PRIMARY KEY);
BEGIN;
-- TRUNCATE changes the relfilenode and sends relcache invalidation
TRUNCATE tbl_created_outside_xact;
INSERT INTO tbl_created_outside_xact(id) VALUES('1');
-- don't show yet, haven't committed
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------
BEGIN
table public.tbl_created_outside_xact: TRUNCATE: (no-flags)
table public.tbl_created_outside_xact: INSERT: id[integer]:1
COMMIT
(4 rows)
SET debug_logical_replication_streaming = immediate;
BEGIN;
CREATE TABLE tbl_created_in_xact(id SERIAL PRIMARY KEY);
INSERT INTO tbl_created_in_xact VALUES (1);
CHECKPOINT; -- Force WAL flush, so that the above changes will be streamed
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
------------------------------------------
opening a streamed block for transaction
streaming change for transaction
closing a streamed block for transaction
(3 rows)
COMMIT;
RESET debug_logical_replication_streaming;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------
BEGIN
table public.tbl_created_in_xact: INSERT: id[integer]:1
COMMIT
(3 rows)
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)
|