aboutsummaryrefslogtreecommitdiff
path: root/src/test/subscription/t/030_origin.pl
blob: b297a51f7c7288496fdd2806a41b700da3164914 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Copyright (c) 2021-2022, PostgreSQL Global Development Group

# Test the CREATE SUBSCRIPTION 'origin' parameter.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

###############################################################################
# Setup a bidirectional logical replication between node_A & node_B
###############################################################################

# Initialize nodes
# node_A
my $node_A = PostgreSQL::Test::Cluster->new('node_A');
$node_A->init(allows_streaming => 'logical');
$node_A->start;
# node_B
my $node_B = PostgreSQL::Test::Cluster->new('node_B');
$node_B->init(allows_streaming => 'logical');
$node_B->start;

# Create table on node_A
$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");

# Create the same table on node_B
$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");

# Setup logical replication
# node_A (pub) -> node_B (sub)
my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
my $appname_B1 = 'tap_sub_B1';
$node_B->safe_psql(
	'postgres', "
	CREATE SUBSCRIPTION tap_sub_B1
	CONNECTION '$node_A_connstr application_name=$appname_B1'
	PUBLICATION tap_pub_A
	WITH (origin = none)");

# node_B (pub) -> node_A (sub)
my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
my $appname_A = 'tap_sub_A';
$node_A->safe_psql(
	'postgres', "
	CREATE SUBSCRIPTION tap_sub_A
	CONNECTION '$node_B_connstr application_name=$appname_A'
	PUBLICATION tap_pub_B
	WITH (origin = none, copy_data = off)");

# Wait for initial table sync to finish
$node_A->wait_for_subscription_sync($node_B, $appname_A);
$node_B->wait_for_subscription_sync($node_A, $appname_B1);

is(1, 1, 'Bidirectional replication setup is complete');

my $result;

###############################################################################
# Check that bidirectional logical replication setup does not cause infinite
# recursive insertion.
###############################################################################

# insert a record
$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");

$node_A->wait_for_catchup($appname_B1);
$node_B->wait_for_catchup($appname_A);

# check that transaction was committed on subscriber(s)
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is( $result, qq(11
21),
	'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
);
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is( $result, qq(11
21),
	'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
);

$node_A->safe_psql('postgres', "DELETE FROM tab;");

$node_A->wait_for_catchup($appname_B1);
$node_B->wait_for_catchup($appname_A);

###############################################################################
# Check that remote data of node_B (that originated from node_C) is not
# published to node_A.
###############################################################################
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(), 'Check existing data');

$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(), 'Check existing data');

# Initialize node node_C
my $node_C = PostgreSQL::Test::Cluster->new('node_C');
$node_C->init(allows_streaming => 'logical');
$node_C->start;

$node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");

# Setup logical replication
# node_C (pub) -> node_B (sub)
my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
$node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");

my $appname_B2 = 'tap_sub_B2';
$node_B->safe_psql(
	'postgres', "
	CREATE SUBSCRIPTION tap_sub_B2
	CONNECTION '$node_C_connstr application_name=$appname_B2'
	PUBLICATION tap_pub_C
	WITH (origin = none)");

$node_B->wait_for_subscription_sync($node_C, $appname_B2);

# insert a record
$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");

$node_C->wait_for_catchup($appname_B2);
$node_B->wait_for_catchup($appname_A);
$node_A->wait_for_catchup($appname_B1);

$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(32), 'The node_C data replicated to node_B');

# check that the data published from node_C to node_B is not sent to node_A
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(),
	'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
);

# shutdown
$node_B->stop('fast');
$node_A->stop('fast');
$node_C->stop('fast');

done_testing();