aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_upgrade/t/004_subscription.pl
blob: 63c0a9837624b3d796a9608e81e23fb2aa3a979e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# Copyright (c) 2023-2024, PostgreSQL Global Development Group

# Test for pg_upgrade of logical subscription
use strict;
use warnings FATAL => 'all';

use File::Find qw(find);

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# pg_upgrade transfer mode.  Defaults to --copy; set PG_TEST_PG_UPGRADE_MODE
# in the environment to test the other modes.
my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';

# Initialize and start the publisher node (initialized with
# allows_streaming => 'logical').
my $publisher = PostgreSQL::Test::Cluster->new('publisher');
$publisher->init(allows_streaming => 'logical');
$publisher->start;

# Initialize and start the old subscriber node, and remember the bindir it
# reports; it is passed to pg_upgrade as the old cluster's bindir.
my $old_sub = PostgreSQL::Test::Cluster->new('old_sub');
$old_sub->init;
$old_sub->start;
my $oldbindir = $old_sub->config_data('--bindir');

# Initialize the new subscriber.  It is not started here.
my $new_sub = PostgreSQL::Test::Cluster->new('new_sub');
$new_sub->init;
my $newbindir = $new_sub->config_data('--bindir');

# In a VPATH build, we'll be started in the source directory, but we want
# to run pg_upgrade in the build directory so that any files generated finish
# in it, like delete_old_cluster.{sh,bat}.  Stop right away if the directory
# cannot be entered, rather than letting pg_upgrade run from the wrong place.
chdir ${PostgreSQL::Test::Utils::tmp_check}
  or die
  "could not change directory to \"${PostgreSQL::Test::Utils::tmp_check}\": $!";

# Initial setup: create the same two tables on the publisher and on the old
# subscriber.
foreach my $node ($publisher, $old_sub)
{
	$node->safe_psql(
		'postgres', qq[
		CREATE TABLE tab_upgraded1(id int);
		CREATE TABLE tab_upgraded2(id int);
]);
}

# Connection string used by every subscription created in this test.
my $connstr = $publisher->connstr . ' dbname=postgres';

# regress_sub1 is an enabled subscription created with failover = true; it is
# used to verify that the running status and the failover option are retained
# after the upgrade.
$publisher->safe_psql('postgres', q{CREATE PUBLICATION regress_pub1});
$old_sub->safe_psql('postgres',
	qq{CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1 WITH (failover = true)}
);
$old_sub->wait_for_subscription_sync($publisher, 'regress_sub1');

# regress_sub2 is used to verify that the upgrade succeeds with tables in
# 'ready'/'init' state, and that the replication origin's remote lsn and the
# subscription's running status are retained.
$publisher->safe_psql('postgres',
	q{CREATE PUBLICATION regress_pub2 FOR TABLE tab_upgraded1});
$old_sub->safe_psql('postgres',
	qq{CREATE SUBSCRIPTION regress_sub2 CONNECTION '$connstr' PUBLICATION regress_pub2}
);

# Wait till the table tab_upgraded1 reaches 'ready' state
my $ready_state_query =
  q{SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'r'};
die "Timed out while waiting for the table to reach ready state"
  unless $old_sub->poll_query_until('postgres', $ready_state_query);

# Replicate some rows into tab_upgraded1 before the upgrade.
$publisher->safe_psql('postgres',
	q{INSERT INTO tab_upgraded1 VALUES (generate_series(1,50))});
$publisher->wait_for_catchup('regress_sub2');

# Change configuration to prepare a subscription table in init state
$old_sub->append_conf('postgresql.conf',
	q{max_logical_replication_workers = 0});
$old_sub->restart;

$publisher->safe_psql('postgres',
	q{ALTER PUBLICATION regress_pub2 ADD TABLE tab_upgraded2});
$old_sub->safe_psql('postgres',
	q{ALTER SUBSCRIPTION regress_sub2 REFRESH PUBLICATION});

# The table tab_upgraded2 will be in init state as the subscriber
# configuration for max_logical_replication_workers is set to 0.
my $result = $old_sub->safe_psql('postgres',
	q{SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'i'});
is($result, 't', "Check that the table is in init state");

# Remember the remote_lsn of regress_sub2's replication origin on the old
# subscriber; it is compared against the upgraded cluster later.
my $remote_lsn = $old_sub->safe_psql('postgres',
	q{SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub2'}
);

# Have the subscription in disabled state before upgrade
$old_sub->safe_psql('postgres', q{ALTER SUBSCRIPTION regress_sub2 DISABLE});

# Remember the OIDs of both tables on the old subscriber; they are compared
# against pg_subscription_rel on the upgraded cluster later.
my $tab_upgraded1_oid = $old_sub->safe_psql('postgres',
	q{SELECT oid FROM pg_class WHERE relname = 'tab_upgraded1'});
my $tab_upgraded2_oid = $old_sub->safe_psql('postgres',
	q{SELECT oid FROM pg_class WHERE relname = 'tab_upgraded2'});

$old_sub->stop;

# ------------------------------------------------------
# pg_upgrade must succeed when every subscription table is either in ready
# state (tab_upgraded1) or in init state (tab_upgraded2), and it must retain
# the replication origin's remote lsn, the subscription's running status and
# the failover option.
# ------------------------------------------------------
my @successful_upgrade_cmd = (
	'pg_upgrade', '--no-sync',
	'-d', $old_sub->data_dir,
	'-D', $new_sub->data_dir,
	'-b', $oldbindir,
	'-B', $newbindir,
	'-s', $new_sub->host,
	'-p', $old_sub->port,
	'-P', $new_sub->port,
	$mode);
command_ok(\@successful_upgrade_cmd,
	'run of pg_upgrade for old instance when the subscription tables are in init/ready state'
);

# The output directory must be gone after the successful run.
my $successful_upgrade_output_dir =
  $new_sub->data_dir . "/pg_upgrade_output.d";
ok(!-d $successful_upgrade_output_dir,
	"pg_upgrade_output.d/ removed after successful pg_upgrade");

# ------------------------------------------------------
# Data inserted on the publisher while the new subscriber is down must be
# replicated once the subscriber is started.  The old subscription states and
# relation origins must all have been preserved as well.
# ------------------------------------------------------
$publisher->safe_psql(
	'postgres', qq[
		INSERT INTO tab_upgraded1 VALUES(51);
		INSERT INTO tab_upgraded2 VALUES(1);
]);

$new_sub->start;

# The running status and the failover option of each subscription should be
# preserved in the upgraded instance: regress_sub1 still has subenabled and
# subfailover set to true, regress_sub2 has both set to false.
$result = $new_sub->safe_psql('postgres',
	q{SELECT subname, subenabled, subfailover FROM pg_subscription ORDER BY subname}
);
is( $result,
	"regress_sub1|t|t\nregress_sub2|f|f",
	"check that the subscription's running status and failover are preserved");

# OID of regress_sub2 on the upgraded cluster, used by the checks below.
my $sub_oid = $new_sub->safe_psql('postgres',
	q{SELECT oid FROM pg_subscription WHERE subname = 'regress_sub2'});

# Subscription relations should be preserved
$result = $new_sub->safe_psql('postgres',
	qq{SELECT srrelid, srsubstate FROM pg_subscription_rel WHERE srsubid = $sub_oid ORDER BY srrelid}
);
is( $result,
	join("\n", "$tab_upgraded1_oid|r", "$tab_upgraded2_oid|i"),
	"there should be 2 rows in pg_subscription_rel(representing tab_upgraded1 and tab_upgraded2)"
);

# The replication origin's remote_lsn should be preserved
$result = $new_sub->safe_psql('postgres',
	qq{SELECT remote_lsn FROM pg_replication_origin_status WHERE external_id = 'pg_' || $sub_oid}
);
is($result, "$remote_lsn", "remote_lsn should have been preserved");

# Enable the subscription, then wait until all tables of subscription
# 'regress_sub2' are synchronized.
$new_sub->safe_psql('postgres', q{ALTER SUBSCRIPTION regress_sub2 ENABLE});
$new_sub->wait_for_subscription_sync($publisher, 'regress_sub2');

# Rows on tab_upgraded1 and tab_upgraded2 should have been replicated
$result =
  $new_sub->safe_psql('postgres', q{SELECT count(*) FROM tab_upgraded1});
is($result, '51', "check replicated inserts on new subscriber");

$result =
  $new_sub->safe_psql('postgres', q{SELECT count(*) FROM tab_upgraded2});
is($result, '1',
	"check the data is synced after enabling the subscription for the table that was in init state"
);

# Cleanup: stop the upgraded subscriber, then bring the old subscriber back up
# with max_logical_replication_workers = 4 and drop regress_sub1 from it
# (after disabling it and setting its slot_name to none).
$new_sub->stop;
$old_sub->append_conf('postgresql.conf',
	q{max_logical_replication_workers = 4});
$old_sub->start;
$old_sub->safe_psql(
	'postgres', qq[
		ALTER SUBSCRIPTION regress_sub1 DISABLE;
		ALTER SUBSCRIPTION regress_sub1 SET (slot_name = none);
		DROP SUBSCRIPTION regress_sub1;
]);
$old_sub->stop;

# ------------------------------------------------------
# pg_upgrade must fail when max_replication_slots configured in the new
# cluster is less than the number of subscriptions in the old cluster.
# ------------------------------------------------------
my $new_sub1 = PostgreSQL::Test::Cluster->new('new_sub1');
$new_sub1->init;
$new_sub1->append_conf('postgresql.conf', q{max_replication_slots = 0});

# The --check run is expected to exit with status 1 and to report the
# insufficient max_replication_slots setting on its standard output.
my @insufficient_slots_cmd = (
	'pg_upgrade', '--no-sync',
	'-d', $old_sub->data_dir,
	'-D', $new_sub1->data_dir,
	'-b', $oldbindir,
	'-B', $newbindir,
	'-s', $new_sub1->host,
	'-p', $old_sub->port,
	'-P', $new_sub1->port,
	$mode, '--check');
my @insufficient_slots_stdout = (
	qr/max_replication_slots \(0\) must be greater than or equal to the number of subscriptions \(1\) on the old cluster/
);
command_checks_all(
	\@insufficient_slots_cmd,
	1,
	\@insufficient_slots_stdout,
	[qr//],
	'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
);

# Reset max_replication_slots
$new_sub1->append_conf('postgresql.conf', q{max_replication_slots = 10});

# Start the old subscriber again and drop the remaining subscription.
$old_sub->start;
$old_sub->safe_psql('postgres', q{DROP SUBSCRIPTION regress_sub2});

# ------------------------------------------------------
# pg_upgrade must refuse to run if:
# a) there's a subscription with tables in a state other than 'r' (ready) or
#    'i' (init) and/or
# b) the subscription has no replication origin.
# ------------------------------------------------------
$publisher->safe_psql(
	'postgres', qq[
		CREATE TABLE tab_primary_key(id serial PRIMARY KEY);
		INSERT INTO tab_primary_key values(1);
		CREATE PUBLICATION regress_pub3 FOR TABLE tab_primary_key;
]);

# Put the value that already exists on the publisher into the subscriber's
# primary key column as well, so that the table sync will fail.
$old_sub->safe_psql(
	'postgres', qq[
		CREATE TABLE tab_primary_key(id serial PRIMARY KEY);
		INSERT INTO tab_primary_key values(1);
		CREATE SUBSCRIPTION regress_sub3 CONNECTION '$connstr' PUBLICATION regress_pub3;
]);

# Table will be in 'd' (data is being copied) state as table sync will fail
# because of primary key constraint error.
my $datasync_state_query =
  q{SELECT count(1) = 1 FROM pg_subscription_rel WHERE srsubstate = 'd'};
die "Timed out while waiting for the table state to become 'd' (datasync)"
  unless $old_sub->poll_query_until('postgres', $datasync_state_query);

# Create another subscription (disabled) and drop its replication origin,
# whose name is 'pg_' followed by the subscription's OID.
$old_sub->safe_psql('postgres',
	qq{CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub3 WITH (enabled = false)}
);
$sub_oid = $old_sub->safe_psql('postgres',
	q{SELECT oid FROM pg_subscription WHERE subname = 'regress_sub4'});
my $dropped_origin_name = "pg_$sub_oid";
$old_sub->safe_psql('postgres',
	qq{SELECT pg_replication_origin_drop('$dropped_origin_name')});

$old_sub->stop;

# With both problems present, pg_upgrade --check is expected to fail.
my @invalid_state_cmd = (
	'pg_upgrade', '--no-sync',
	'-d', $old_sub->data_dir,
	'-D', $new_sub1->data_dir,
	'-b', $oldbindir,
	'-B', $newbindir,
	'-s', $new_sub1->host,
	'-p', $old_sub->port,
	'-P', $new_sub1->port,
	$mode, '--check');
command_fails(\@invalid_state_cmd,
	q{run of pg_upgrade --check for old instance with relation in 'd' datasync(invalid) state and missing replication origin}
);

# Verify the reason why the subscriber cannot be upgraded
my $sub_relstate_filename;

# Find a txt file that contains a list of tables that cannot be upgraded. We
# cannot predict the file's path because the output directory contains a
# milliseconds timestamp. File::Find::find must be used.
my $sub_relstate_search_dir = $new_sub1->data_dir . "/pg_upgrade_output.d";
find(
	sub {
		if ($File::Find::name =~ m/subs_invalid\.txt/)
		{
			$sub_relstate_filename = $File::Find::name;
		}
	},
	$sub_relstate_search_dir);

# Without the report there is nothing to verify.  Stop with an explicit
# message instead of handing an undefined file name to slurp_file().
defined $sub_relstate_filename
  or die
  "could not find subs_invalid.txt under \"$sub_relstate_search_dir\"";

# Read the report once; both checks below look at the same content.
my $sub_relstate_content = slurp_file($sub_relstate_filename);

# Check the file content which should have tab_primary_key table in invalid
# state.
like(
	$sub_relstate_content,
	qr/The table sync state \"d\" is not allowed for database:\"postgres\" subscription:\"regress_sub3\" schema:\"public\" relation:\"tab_primary_key\"/m,
	'the previous test failed due to subscription table in invalid state');

# Check the file content which should have regress_sub4 subscription.
like(
	$sub_relstate_content,
	qr/The replication origin is missing for database:\"postgres\" subscription:\"regress_sub4\"/m,
	'the previous test failed due to missing replication origin');

done_testing();