aboutsummaryrefslogtreecommitdiff
path: root/src/test/subscription/t/030_origin.pl
blob: 5b82848e5e6d3daf924644750e09912a4c0613de (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# Copyright (c) 2021-2025, PostgreSQL Global Development Group

# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with
# 'copy_data' parameter.
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Subscription names used throughout the test.  For these variables the
# suffix "_XY" means the subscription is created on node_X and connects to
# node_Y.
my ($subname_AB, $subname_AB2, $subname_BA, $subname_BC) =
  qw(tap_sub_A_B tap_sub_A_B_2 tap_sub_B_A tap_sub_B_C);

# Scratch variables for query results and for the (exit code, stdout, stderr)
# triple returned by psql().
my ($result, $stdout, $stderr);

###############################################################################
# Setup a bidirectional logical replication between node_A & node_B
###############################################################################

# Bring up node_A with logical replication enabled.
my $node_A = PostgreSQL::Test::Cluster->new('node_A');
$node_A->init(allows_streaming => 'logical');
$node_A->start;

# node_B is set up the same way, except that track_commit_timestamp is
# switched on before the first start.  It is required to detect the conflict
# when attempting to update a row that was previously modified by a different
# origin.
my $node_B = PostgreSQL::Test::Cluster->new('node_B');
$node_B->init(allows_streaming => 'logical');
$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = on');
$node_B->start;

# The replicated table is created identically on node_A, then on node_B.
foreach my $node ($node_A, $node_B)
{
	$node->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
}

# Direction 1: node_A (pub) -> node_B (sub)
my $node_A_connstr = $node_A->connstr;
$node_A_connstr .= ' dbname=postgres';
$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
$node_B->safe_psql(
	'postgres', "
	CREATE SUBSCRIPTION $subname_BA
	CONNECTION '$node_A_connstr application_name=$subname_BA'
	PUBLICATION tap_pub_A
	WITH (origin = none)");

# Direction 2: node_B (pub) -> node_A (sub); this one is created with
# copy_data = off.
my $node_B_connstr = $node_B->connstr;
$node_B_connstr .= ' dbname=postgres';
$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
$node_A->safe_psql(
	'postgres', "
	CREATE SUBSCRIPTION $subname_AB
	CONNECTION '$node_B_connstr application_name=$subname_AB'
	PUBLICATION tap_pub_B
	WITH (origin = none, copy_data = off)");

# Both subscriptions must finish their initial table sync before the actual
# checks start.
$node_A->wait_for_subscription_sync($node_B, $subname_AB);
$node_B->wait_for_subscription_sync($node_A, $subname_BA);

# Reaching this point without dying means the setup worked.
pass('Bidirectional replication setup is complete');

###############################################################################
# Check that bidirectional logical replication setup does not cause infinite
# recursive insertion.
###############################################################################

# Insert one locally-originated row on each node.
$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");

# Wait for both directions to catch up.
$node_A->wait_for_catchup($subname_BA);
$node_B->wait_for_catchup($subname_AB);

# Each node (node_A first, then node_B) must hold exactly the two rows: its
# own and the one replicated from its peer.
foreach my $node ($node_A, $node_B)
{
	$result = $node->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
	is($result, "11\n21",
		'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
	);
}

# Empty the table again, starting from node_A, and wait for both directions
# to catch up before the next section.
$node_A->safe_psql('postgres', "DELETE FROM tab;");

$node_A->wait_for_catchup($subname_BA);
$node_B->wait_for_catchup($subname_AB);

###############################################################################
# Check that remote data of node_B (that originated from node_C) is not
# published to node_A.
###############################################################################

# Precondition: the table is empty on both node_A and node_B, so any row seen
# later in this section must come from node_C.
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(), 'Check existing data');

$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(), 'Check existing data');

# Initialize node_C
my $node_C = PostgreSQL::Test::Cluster->new('node_C');
$node_C->init(allows_streaming => 'logical');
$node_C->start;

$node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");

# Setup logical replication
# node_C (pub) -> node_B (sub)
my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
$node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
$node_B->safe_psql(
	'postgres', "
	CREATE SUBSCRIPTION $subname_BC
	CONNECTION '$node_C_connstr application_name=$subname_BC'
	PUBLICATION tap_pub_C
	WITH (origin = none)");
$node_B->wait_for_subscription_sync($node_C, $subname_BC);

# insert a record
$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");

# Wait along the whole chain (C -> B, then B -> A and A -> B) so that a row
# wrongly forwarded to node_A would already be visible when it is checked
# below.
$node_C->wait_for_catchup($subname_BC);
$node_B->wait_for_catchup($subname_AB);
$node_A->wait_for_catchup($subname_BA);

$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(32), 'The node_C data replicated to node_B');

# check that the data published from node_C to node_B is not sent to node_A
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(),
	'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
);

###############################################################################
# Check that the conflict can be detected when attempting to update or
# delete a row that was previously modified by a different source.
###############################################################################

# Remove the row that node_B received from node_C, then have node_A insert
# the same key so that the copy on node_B arrives via node_A instead.
# node_C's own table still contains (32).
$node_B->safe_psql('postgres', "DELETE FROM tab;");

$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (32);");

$node_A->wait_for_catchup($subname_BA);
$node_B->wait_for_catchup($subname_AB);

$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(32), 'The node_A data replicated to node_B');

# The update should update the row on node B that was inserted by node A.
# It is issued on node_C and reaches node_B through the node_C -> node_B
# subscription.
$node_C->safe_psql('postgres', "UPDATE tab SET a = 33 WHERE a = 32;");

# The conflict is only reported in node_B's server log, so wait for the
# matching log entry there.
$node_B->wait_for_log(
	qr/conflict detected on relation "public.tab": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*\n.*Existing local tuple \(32\); remote tuple \(33\); replica identity \(a\)=\(32\)/
);

# Same preparation for the delete case: node_B is emptied and node_A inserts
# (33), the key node_C holds after its UPDATE above.
$node_B->safe_psql('postgres', "DELETE FROM tab;");
$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (33);");

$node_A->wait_for_catchup($subname_BA);
$node_B->wait_for_catchup($subname_AB);

$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(33), 'The node_A data replicated to node_B');

# The delete should remove the row on node B that was inserted by node A.
$node_C->safe_psql('postgres', "DELETE FROM tab WHERE a = 33;");

$node_B->wait_for_log(
	qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*\n.*Existing local tuple \(33\); replica identity \(a\)=\(33\)/
);

# The remaining tests no longer test conflict detection.
# The setting is appended to postgresql.conf and node_B is restarted with it.
$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = off');
$node_B->restart;

###############################################################################
# Specifying origin = NONE indicates that the publisher should only replicate the
# changes that are generated locally from node_B, but in this case since the
# node_B is also subscribing data from node_A, node_B can have remotely
# originated data from node_A. We log a warning, in this case, to draw
# attention to there being possible remote data.
###############################################################################

# psql() (rather than safe_psql()) is used so that stderr can be inspected;
# in list context it returns (exit code, stdout, stderr).
($result, $stdout, $stderr) = $node_A->psql(
	'postgres', "
        CREATE SUBSCRIPTION $subname_AB2
        CONNECTION '$node_B_connstr application_name=$subname_AB2'
        PUBLICATION tap_pub_B
        WITH (origin = none, copy_data = on)");

# Only a WARNING is expected: the command itself must still succeed.  Without
# this check a CREATE SUBSCRIPTION that warned and then failed would satisfy
# the stderr match below.
is($result, 0,
	"Create subscription with origin = none and copy_data succeeds when the publisher has subscribed same table"
);
like(
	$stderr,
	qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/,
	"Create subscription with origin = none and copy_data when the publisher has subscribed same table"
);

# Wait for the new subscription's initial table sync to finish.
$node_A->wait_for_subscription_sync($node_B, $subname_AB2);

# Alter subscription ... refresh publication should be successful when no new
# table is added
$node_A->safe_psql(
	'postgres', "
        ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");

# Check Alter subscription ... refresh publication when there is a new
# table that is subscribing data from a different publication
$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");

# add a new table to node_A's publication and let node_B subscribe to it, so
# that tab_new on node_B is a table receiving data from node_A
$node_A->safe_psql('postgres',
	"ALTER PUBLICATION tap_pub_A ADD TABLE tab_new");
$node_B->safe_psql(
	'postgres', "
        ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION");

$node_B->wait_for_subscription_sync($node_A, $subname_BA);

# add the same new table to node_B's publication
$node_B->safe_psql('postgres',
	"ALTER PUBLICATION tap_pub_B ADD TABLE tab_new");

# Alter subscription ... refresh publication should log a warning when a new
# table on the publisher is subscribing data from a different publication
($result, $stdout, $stderr) = $node_A->psql(
	'postgres', "
        ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");

# psql() does not die on failure, so check its exit code explicitly: the
# refresh must succeed and merely emit the WARNING matched below.
is($result, 0,
	"Refresh publication succeeds when the publisher has subscribed for the new table"
);
like(
	$stderr,
	qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/,
	"Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin = none"
);

# Ensure that relation has reached 'ready' state before we try to drop it
# The query has no subscription filter: it becomes true only once no row of
# pg_subscription_rel on node_A is in a state other than 'r'.
my $synced_query =
  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
$node_A->poll_query_until('postgres', $synced_query)
  or die "Timed out while waiting for subscriber to synchronize data";

$node_B->wait_for_catchup($subname_AB2);

# clear the operations done by this test
# On node_A: the new table, both subscriptions to node_B and the publication.
$node_A->safe_psql(
	'postgres', qq(
DROP TABLE tab_new;
DROP SUBSCRIPTION $subname_AB2;
DROP SUBSCRIPTION $subname_AB;
DROP PUBLICATION tap_pub_A;
));
# On node_B: the new table, the subscription to node_A and the publication.
$node_B->safe_psql(
	'postgres', qq(
DROP TABLE tab_new;
DROP SUBSCRIPTION $subname_BA;
DROP PUBLICATION tap_pub_B;
));

###############################################################################
# Specifying origin = NONE and copy_data = on must raise WARNING if we subscribe
# to a partitioned table and this table contains any remotely originated data.
#
# Two scenarios are exercised.  In both, the partition tab_part2 on node_B
# subscribes to tab_part2 on node_A; they differ in which node_B table node_C
# subscribes to: the root tab_main (first diagram) or the leaf tab_part2_1
# (second diagram).
#
#           node_B
#  __________________________
# |       tab_main           | --------------> node_C (tab_main)
# |__________________________|
# | tab_part1  | tab_part2   | <-------------- node_A (tab_part2)
# |____________|_____________|
#              | tab_part2_1 |
#              |_____________|
#
#           node_B
#  __________________________
# |       tab_main           |
# |__________________________|
# | tab_part1  | tab_part2   | <-------------- node_A (tab_part2)
# |____________|_____________|
#              | tab_part2_1 | --------------> node_C (tab_part2_1)
#              |_____________|
###############################################################################

# create a table on node A which will act as a source for a partition on node B
$node_A->safe_psql(
	'postgres', qq(
CREATE TABLE tab_part2(a int);
CREATE PUBLICATION tap_pub_A FOR TABLE tab_part2;
));

# create a partitioned table on node B
# tab_part2 is itself partitioned (with the single partition tab_part2_1) and
# is attached to tab_main afterwards.
# NOTE(review): the subscription is named tap_sub_A_B although it is created
# on node_B and connects to node_A, i.e. the reverse of how that name was
# used earlier in this file.
$node_B->safe_psql(
	'postgres', qq(
CREATE TABLE tab_main(a int) PARTITION BY RANGE(a);
CREATE TABLE tab_part1 PARTITION OF tab_main FOR VALUES FROM (0) TO (5);
CREATE TABLE tab_part2(a int) PARTITION BY RANGE(a);
CREATE TABLE tab_part2_1 PARTITION OF tab_part2 FOR VALUES FROM (5) TO (10);
ALTER TABLE tab_main ATTACH PARTITION tab_part2 FOR VALUES FROM (5) to (10);
CREATE SUBSCRIPTION tap_sub_A_B CONNECTION '$node_A_connstr' PUBLICATION tap_pub_A;
));

# create the plain (non-partitioned) target tables on node C, one for each
# scenario
$node_C->safe_psql(
	'postgres', qq(
CREATE TABLE tab_main(a int);
CREATE TABLE tab_part2_1(a int);
));

# create a logical replication setup between node B and node C with
# subscription on node C having origin = NONE and copy_data = on
# tap_pub_B publishes the root table tab_main with the
# publish_via_partition_root option; tap_pub_B_2 publishes the leaf partition
# tab_part2_1 directly.
$node_B->safe_psql(
	'postgres', qq(
CREATE PUBLICATION tap_pub_B FOR TABLE tab_main WITH (publish_via_partition_root);
CREATE PUBLICATION tap_pub_B_2 FOR TABLE tab_part2_1;
));

# Scenario 1: node_C subscribes to the root table through tap_pub_B.  psql()
# is used so that stderr can be inspected; in list context it returns
# (exit code, stdout, stderr).
($result, $stdout, $stderr) = $node_C->psql(
	'postgres', "
	CREATE SUBSCRIPTION tap_sub_B_C CONNECTION '$node_B_connstr' PUBLICATION tap_pub_B WITH (origin = none, copy_data = on);
");

# Only a WARNING is expected; the command itself must succeed.
is($result, 0,
	"Create subscription with origin = none and copy_data succeeds when the publisher's partition is subscribing from different origin"
);

# A warning must be logged as a partition 'tab_part2' in node B is subscribed to
# node A so partition 'tab_part2' can have remotely originated data
like(
	$stderr,
	qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_b_c" requested copy_data with origin = NONE but might copy data that had a different origin/,
	"Create subscription with origin = none and copy_data when the publisher's partition is subscribing from different origin"
);
$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B_C");

# Scenario 2: node_C subscribes to the leaf partition through tap_pub_B_2,
# reusing the subscription name dropped just above.
($result, $stdout, $stderr) = $node_C->psql(
	'postgres', "
	CREATE SUBSCRIPTION tap_sub_B_C CONNECTION '$node_B_connstr' PUBLICATION tap_pub_B_2 WITH (origin = none, copy_data = on);
");

# Again, only a WARNING is expected; the command itself must succeed.
is($result, 0,
	"Create subscription with origin = none and copy_data succeeds when the publisher's ancestor is subscribing from different origin"
);

# A warning must be logged as ancestor of table 'tab_part2_1' in node B is
# subscribed to node A so table 'tab_part2_1' can have remotely originated
# data
like(
	$stderr,
	qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_b_c" requested copy_data with origin = NONE but might copy data that had a different origin/,
	"Create subscription with origin = none and copy_data when the publisher's ancestor is subscribing from different origin"
);

# clear the operations done by this test
# On node_C: the subscription to node_B and both target tables.
$node_C->safe_psql(
	'postgres', qq(
DROP SUBSCRIPTION tap_sub_B_C;
DROP TABLE tab_main;
DROP TABLE tab_part2_1;
));
# On node_B: the subscription to node_A, both publications and the
# partitioned table tab_main.
$node_B->safe_psql(
	'postgres', qq(
DROP SUBSCRIPTION tap_sub_A_B;
DROP PUBLICATION tap_pub_B;
DROP PUBLICATION tap_pub_B_2;
DROP TABLE tab_main;
));
# On node_A: the publication and its source table.
$node_A->safe_psql(
	'postgres', qq(
DROP PUBLICATION tap_pub_A;
DROP TABLE tab_part2;
));

# Stop every node in 'fast' mode, in the order node_B, node_A, node_C, then
# finish the test run.
foreach my $node ($node_B, $node_A, $node_C)
{
	$node->stop('fast');
}

done_testing();