aboutsummaryrefslogtreecommitdiff
path: root/src/test/recovery/t/040_standby_failover_slots_sync.pl
blob: e24009610ad071b7cf5e17195804d64f7addddbc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# Copyright (c) 2024, PostgreSQL Global Development Group

use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

##################################################
# Test that when a subscription with failover enabled is created, it will alter
# the failover property of the corresponding slot on the publisher.
##################################################

# Create publisher
my $publisher = PostgreSQL::Test::Cluster->new('publisher');
$publisher->init(allows_streaming => 'logical');
# Disable autovacuum to avoid generating xid during stats update as otherwise
# the new XID could then be replicated to standby at some random point making
# slots at primary lag behind standby during slot sync.
$publisher->append_conf('postgresql.conf', 'autovacuum = off');
$publisher->start;

$publisher->safe_psql('postgres',
	"CREATE PUBLICATION regress_mypub FOR ALL TABLES;");

my $publisher_connstr = $publisher->connstr . ' dbname=postgres';

# Create a subscriber node, wait for sync to complete
my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
$subscriber1->init;
$subscriber1->start;

# Create a slot on the publisher with failover disabled
$publisher->safe_psql('postgres',
	"SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
);

# Confirm that the failover flag on the slot is turned off
is( $publisher->safe_psql(
		'postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"f",
	'logical slot has failover false on the publisher');

# Create a subscription (using the same slot created above) that enables
# failover.
$subscriber1->safe_psql('postgres',
	"CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, copy_data=false, failover = true, create_slot = false, enabled = false);"
);

# Confirm that the failover flag on the slot has now been turned on
is( $publisher->safe_psql(
		'postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'logical slot has failover true on the publisher');

##################################################
# Test that changing the failover property of a subscription updates the
# corresponding failover property of the slot.
##################################################

# Disable failover
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 SET (failover = false)");

# Confirm that the failover flag on the slot has now been turned off
is( $publisher->safe_psql(
		'postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"f",
	'logical slot has failover false on the publisher');

# Enable failover
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 SET (failover = true)");

# Confirm that the failover flag on the slot has now been turned on
is( $publisher->safe_psql(
		'postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'logical slot has failover true on the publisher');

##################################################
# Test that the failover option cannot be changed for enabled subscriptions.
##################################################

# Enable subscription
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 ENABLE");

# Disable failover for enabled subscription
my ($result, $stdout, $stderr) = $subscriber1->psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 SET (failover = false)");
ok( $stderr =~ /ERROR:  cannot set failover for enabled subscription/,
	"altering failover is not allowed for enabled subscription");

##################################################
# Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
##################################################

($result, $stdout, $stderr) =
  $publisher->psql('postgres', "SELECT pg_sync_replication_slots();");
ok( $stderr =~
	  /ERROR:  replication slots can only be synchronized to a standby server/,
	"cannot sync slots on a non-standby server");

##################################################
# Test logical failover slots on the standby
# Configure standby1 to replicate and synchronize logical slots configured
# for failover on the primary
#
#              failover slot lsub1_slot ->| ----> subscriber1 (connected via logical replication)
#              failover slot lsub2_slot   |       inactive
# primary --->                            |
#              physical slot sb1_slot --->| ----> standby1 (connected via streaming replication)
#                                         |                 lsub1_slot, lsub2_slot (synced_slot)
##################################################

my $primary = $publisher;
my $backup_name = 'backup';
$primary->backup($backup_name);

# Create a standby
my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
$standby1->init_from_backup(
	$primary, $backup_name,
	has_streaming => 1,
	has_restoring => 1);

# Increase the log_min_messages setting to DEBUG2 on both the standby and
# primary to debug test failures, if any.
my $connstr_1 = $primary->connstr;
$standby1->append_conf(
	'postgresql.conf', qq(
hot_standby_feedback = on
primary_slot_name = 'sb1_slot'
primary_conninfo = '$connstr_1 dbname=postgres'
log_min_messages = 'debug2'
));

$primary->append_conf('postgresql.conf', "log_min_messages = 'debug2'");
$primary->reload;

$primary->psql('postgres',
	q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
);

$primary->psql('postgres',
	q{SELECT pg_create_physical_replication_slot('sb1_slot');});

# Start the standby so that slot syncing can begin
$standby1->start;

$primary->wait_for_catchup('regress_mysub1');

# Do not allow any further advancement of the restart_lsn for the lsub1_slot.
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");

# Wait for the replication slot to become inactive on the publisher
$primary->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
	1);

# Wait for the standby to catch up so that the standby is not lagging behind
# the subscriber.
$primary->wait_for_replay_catchup($standby1);

# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Confirm that the logical failover slots are created on the standby and are
# flagged as 'synced'
is( $standby1->safe_psql(
		'postgres',
		q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot') AND synced AND NOT temporary;}
	),
	"t",
	'logical slots have synced as true on standby');

##################################################
# Test that the synchronized slot will be dropped if the corresponding remote
# slot on the primary server has been dropped.
##################################################

$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");

$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

is( $standby1->safe_psql(
		'postgres',
		q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
	),
	"t",
	'synchronized slot has been dropped');

##################################################
# Test that if the synchronized slot is invalidated while the remote slot is
# still valid, the slot will be dropped and re-created on the standby by
# executing pg_sync_replication_slots() again.
##################################################

# Configure the max_slot_wal_keep_size so that the synced slot can be
# invalidated due to wal removal.
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = 64kB');
$standby1->reload;

# Generate some activity and switch WAL file on the primary
$primary->advance_wal(1);
$primary->psql('postgres', "CHECKPOINT");
$primary->wait_for_replay_catchup($standby1);

# Request a checkpoint on the standby to trigger the WAL file(s) removal
$standby1->safe_psql('postgres', "CHECKPOINT");

# Check if the synced slot is invalidated
is( $standby1->safe_psql(
		'postgres',
		q{SELECT conflict_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'synchronized slot has been invalidated');

# Reset max_slot_wal_keep_size to avoid further wal removal
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
$standby1->reload;

# To ensure that restart_lsn has moved to a recent WAL position, we re-create
# the subscription and the logical slot.
$subscriber1->safe_psql(
	'postgres', qq[
	DROP SUBSCRIPTION regress_mysub1;
	CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, copy_data = false, failover = true);
]);

$primary->wait_for_catchup('regress_mysub1');

# Do not allow any further advancement of the restart_lsn for the lsub1_slot.
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");

# Wait for the replication slot to become inactive on the publisher
$primary->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
	1);

# Wait for the standby to catch up so that the standby is not lagging behind
# the subscriber.
$primary->wait_for_replay_catchup($standby1);

my $log_offset = -s $standby1->logfile;

# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Confirm that the invalidated slot has been dropped.
$standby1->wait_for_log(qr/dropped replication slot "lsub1_slot" of dbid [0-9]+/,
	$log_offset);

# Confirm that the logical slot has been re-created on the standby and is
# flagged as 'synced'
is( $standby1->safe_psql(
		'postgres',
		q{SELECT conflict_reason IS NULL AND synced AND NOT temporary FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'logical slot is re-synced');

# Reset the log_min_messages to the default value.
$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
$primary->reload;

$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
$standby1->reload;

##################################################
# Test that a synchronized slot can not be decoded, altered or dropped by the
# user
##################################################

# Attempting to perform logical decoding on a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql('postgres',
	"select * from pg_logical_slot_get_changes('lsub1_slot', NULL, NULL);");
ok( $stderr =~
	  /ERROR:  cannot use replication slot "lsub1_slot" for logical decoding/,
	"logical decoding is not allowed on synced slot");

# Attempting to alter a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql(
	'postgres',
	qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);],
	replication => 'database');
ok($stderr =~ /ERROR:  cannot alter replication slot "lsub1_slot"/,
	"synced slot on standby cannot be altered");

# Attempting to drop a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql('postgres',
	"SELECT pg_drop_replication_slot('lsub1_slot');");
ok($stderr =~ /ERROR:  cannot drop replication slot "lsub1_slot"/,
	"synced slot on standby cannot be dropped");

##################################################
# Test that we cannot synchronize slots if dbname is not specified in the
# primary_conninfo.
##################################################

$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1'");
$standby1->reload;

($result, $stdout, $stderr) =
  $standby1->psql('postgres', "SELECT pg_sync_replication_slots();");
ok( $stderr =~
	  /ERROR:  slot synchronization requires dbname to be specified in primary_conninfo/,
	"cannot sync slots if dbname is not specified in primary_conninfo");

# Add the dbname back to the primary_conninfo for further tests
$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1 dbname=postgres'");
$standby1->reload;

##################################################
# Test that we cannot synchronize slots to a cascading standby server.
##################################################

# Create a cascading standby
$backup_name = 'backup2';
$standby1->backup($backup_name);

my $cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
$cascading_standby->init_from_backup(
	$standby1, $backup_name,
	has_streaming => 1,
	has_restoring => 1);

my $cascading_connstr = $standby1->connstr;
$cascading_standby->append_conf(
	'postgresql.conf', qq(
hot_standby_feedback = on
primary_slot_name = 'cascading_sb_slot'
primary_conninfo = '$cascading_connstr dbname=postgres'
));

$standby1->psql('postgres',
	q{SELECT pg_create_physical_replication_slot('cascading_sb_slot');});

$cascading_standby->start;

($result, $stdout, $stderr) =
  $cascading_standby->psql('postgres', "SELECT pg_sync_replication_slots();");
ok( $stderr =~
	  /ERROR:  cannot synchronize replication slots from a standby server/,
	"cannot sync slots to a cascading standby server");

$cascading_standby->stop;

##################################################
# Test to confirm that the slot sync worker exits on invalid GUC(s) and
# get started again on valid GUC(s).
##################################################

$log_offset = -s $standby1->logfile;

# Enable slot sync worker.
$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = on));
$standby1->reload;

# Confirm that the slot sync worker is able to start.
$standby1->wait_for_log(qr/LOG:  slot sync worker started/,
	$log_offset);

$log_offset = -s $standby1->logfile;

# Disable another GUC required for slot sync.
$standby1->append_conf(	'postgresql.conf', qq(hot_standby_feedback = off));
$standby1->reload;

# Confirm that slot sync worker acknowledge the GUC change and logs the msg
# about wrong configuration.
$standby1->wait_for_log(qr/LOG:  slot sync worker will restart because of a parameter change/,
	$log_offset);
$standby1->wait_for_log(qr/LOG:  slot synchronization requires hot_standby_feedback to be enabled/,
	$log_offset);

$log_offset = -s $standby1->logfile;

# Re-enable the required GUC
$standby1->append_conf('postgresql.conf', "hot_standby_feedback = on");
$standby1->reload;

# Confirm that the slot sync worker is able to start now.
$standby1->wait_for_log(qr/LOG:  slot sync worker started/,
	$log_offset);

##################################################
# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot
# on the primary is synced to the standby via the slot sync worker.
##################################################

# Insert data on the primary
$primary->safe_psql(
	'postgres', qq[
	CREATE TABLE tab_int (a int PRIMARY KEY);
	INSERT INTO tab_int SELECT generate_series(1, 10);
]);

# Subscribe to the new table data and wait for it to arrive
$subscriber1->safe_psql(
	'postgres', qq[
	CREATE TABLE tab_int (a int PRIMARY KEY);
	ALTER SUBSCRIPTION regress_mysub1 ENABLE;
	ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION;
]);

$subscriber1->wait_for_subscription_sync;

# Do not allow any further advancement of the restart_lsn and
# confirmed_flush_lsn for the lsub1_slot.
$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");

# Wait for the replication slot to become inactive on the publisher
$primary->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
	1);

# Get the restart_lsn for the logical slot lsub1_slot on the primary
my $primary_restart_lsn = $primary->safe_psql('postgres',
	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");

# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
my $primary_flush_lsn = $primary->safe_psql('postgres',
	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");

# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced
# to the standby
ok( $standby1->poll_query_until(
		'postgres',
		"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
	'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');

##################################################
# Promote the standby1 to primary. Confirm that:
# a) the slot 'lsub1_slot' is retained on the new primary
# b) logical replication for regress_mysub1 is resumed successfully after failover
##################################################
$standby1->promote;

# Update subscription with the new primary's connection info
my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';
	 ALTER SUBSCRIPTION regress_mysub1 ENABLE; ");

# Confirm the synced slot 'lsub1_slot' is retained on the new primary
is($standby1->safe_psql('postgres',
	q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;}),
	'lsub1_slot',
	'synced slot retained on the new primary');

# Insert data on the new primary
$standby1->safe_psql('postgres',
	"INSERT INTO tab_int SELECT generate_series(11, 20);");
$standby1->wait_for_catchup('regress_mysub1');

# Confirm that data in tab_int replicated on the subscriber
is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
	"20",
	'data replicated from the new primary');

done_testing();