aboutsummaryrefslogtreecommitdiff
path: root/src/test/recovery/t/040_standby_failover_slots_sync.pl
blob: 2c61c51e914df74b8facd0532d52dabf79183997 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
# Copyright (c) 2024-2025, PostgreSQL Global Development Group

use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

##################################################
# Test that when a subscription with failover enabled is created, it will alter
# the failover property of the corresponding slot on the publisher.
##################################################

# Create publisher
my $publisher = PostgreSQL::Test::Cluster->new('publisher');
# Make sure pg_hba.conf is set up to allow connections from repl_role.
# This is only needed on Windows machines that don't use UNIX sockets.
# repl_role is the user named in primary_conninfo by the later test that
# checks slot synchronization against a malicious search_path.
$publisher->init(
	allows_streaming => 'logical',
	auth_extra => [ '--create-role' => 'repl_role' ]);
# Disable autovacuum to avoid generating xid during stats update as otherwise
# the new XID could then be replicated to standby at some random point making
# slots at primary lag behind standby during slot sync.
# max_prepared_transactions is set to 1 in the same configuration snippet.
$publisher->append_conf(
	'postgresql.conf', qq{
autovacuum = off
max_prepared_transactions = 1
});
$publisher->start;

# Publish all tables, so tables created later in this test are covered too.
$publisher->safe_psql('postgres',
	"CREATE PUBLICATION regress_mypub FOR ALL TABLES;");

# Connection string used by every CREATE SUBSCRIPTION in this file.
my $publisher_connstr = $publisher->connstr . ' dbname=postgres';

# Create a subscriber node. No data is copied at this stage (the subscription
# below uses copy_data = false), so there is no table sync to wait for.
my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
$subscriber1->init;
$subscriber1->append_conf('postgresql.conf', 'max_prepared_transactions = 1');
$subscriber1->start;

# Capture the time before the logical failover slot is created on the
# primary. We later call this publisher as primary anyway.
# The value is passed to validate_slot_inactive_since() further down.
my $slot_creation_time_on_primary = $publisher->safe_psql(
	'postgres', qq[
    SELECT current_timestamp;
]);

# Create a subscription that enables failover. It is created disabled
# (enabled = false); the ALTER SUBSCRIPTION ... SET (failover = ...) steps
# that follow run before the subscription is enabled.
$subscriber1->safe_psql('postgres',
	"CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, copy_data = false, failover = true, enabled = false);"
);

# Confirm that the failover flag on the slot is turned on
is( $publisher->safe_psql(
		'postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'logical slot has failover true on the publisher');

##################################################
# Test that changing the failover property of a subscription updates the
# corresponding failover property of the slot.
##################################################

# Switch the subscription's failover option off and then back on. After each
# change the slot on the publisher must report the matching failover flag.
# Each entry is [ value given to ALTER SUBSCRIPTION, expected slot flag ].
foreach my $toggle ([ 'false', 'f' ], [ 'true', 't' ])
{
	my ($failover_setting, $expected_flag) = @$toggle;

	$subscriber1->safe_psql('postgres',
		"ALTER SUBSCRIPTION regress_mysub1 SET (failover = $failover_setting)"
	);

	# Read the flag back from the slot on the publisher.
	my $slot_failover = $publisher->safe_psql('postgres',
		q{SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	);

	is($slot_failover, $expected_flag,
		"logical slot has failover $failover_setting on the publisher");
}

##################################################
# Test that the failover option cannot be changed for enabled subscriptions.
##################################################

# Enable subscription
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 ENABLE");

# Disable failover for enabled subscription. The statement is expected to
# fail, so psql() is used to capture stderr instead of dying on error.
# $result, $stdout and $stderr are reused by later sections of this test.
my ($result, $stdout, $stderr) = $subscriber1->psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 SET (failover = false)");
# like() prints the actual stderr on a mismatch, unlike ok() with a regex.
like(
	$stderr,
	qr/ERROR:  cannot set option "failover" for enabled subscription/,
	"altering failover is not allowed for enabled subscription");

##################################################
# Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
##################################################

# The call is expected to fail on the publisher; capture stderr to check the
# error text.
($result, $stdout, $stderr) =
  $publisher->psql('postgres', "SELECT pg_sync_replication_slots();");
# like() prints the actual stderr on a mismatch, unlike ok() with a regex.
like(
	$stderr,
	qr/ERROR:  replication slots can only be synchronized to a standby server/,
	"cannot sync slots on a non-standby server");

##################################################
# Test logical failover slots corresponding to different plugins can be
# synced to the standby.
#
# Configure standby1 to replicate and synchronize logical slots configured
# for failover on the primary
#
#              failover slot lsub1_slot   |       output_plugin: pgoutput
#              failover slot lsub2_slot   |       output_plugin: test_decoding
# primary --->                            |
#              physical slot sb1_slot --->| ----> standby1 (connected via streaming replication)
#                                         |                 lsub1_slot, lsub2_slot (synced_slot)
##################################################

# From here on the publisher node is also referred to as the primary; both
# variables point at the same node object.
my $primary = $publisher;
my $backup_name = 'backup';
$primary->backup($backup_name);

# Create a standby
my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
$standby1->init_from_backup(
	$primary, $backup_name,
	has_streaming => 1,
	has_restoring => 1);

# Increase the log_min_messages setting to DEBUG2 on both the standby and
# primary to debug test failures, if any.
# $connstr_1 is kept without a dbname so that later tests can build
# primary_conninfo values with and without one.
my $connstr_1 = $primary->connstr;
$standby1->append_conf(
	'postgresql.conf', qq(
hot_standby_feedback = on
primary_slot_name = 'sb1_slot'
primary_conninfo = '$connstr_1 dbname=postgres'
log_min_messages = 'debug2'
));

$primary->append_conf('postgresql.conf', "log_min_messages = 'debug2'");
$primary->reload;

# Drop the subscription to prevent further advancement of the restart_lsn for
# the lsub1_slot.
$subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub1;");

# To ensure that restart_lsn has moved to a recent WAL position, we re-create
# the lsub1_slot.
# NOTE(review): this and the next two slot creations use psql() rather than
# safe_psql(); presumably a failure here would not abort the test - confirm
# that this is intentional.
$primary->psql('postgres',
	q{SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);}
);

# Second failover slot from the diagram above, using the test_decoding plugin.
$primary->psql('postgres',
	q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
);

# Physical slot named by standby1's primary_slot_name setting.
$primary->psql('postgres',
	q{SELECT pg_create_physical_replication_slot('sb1_slot');});

# Start the standby so that slot syncing can begin
$standby1->start;

# Capture the inactive_since of the slot from the primary. Note that the slot
# will be inactive since the corresponding subscription was dropped.
my $inactive_since_on_primary =
  $primary->validate_slot_inactive_since('lsub1_slot',
	$slot_creation_time_on_primary);

# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slots.
$primary->wait_for_replay_catchup($standby1);

# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Confirm that the logical failover slots are created on the standby and are
# flagged as 'synced'
is( $standby1->safe_psql(
		'postgres',
		q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot') AND synced AND NOT temporary;}
	),
	"t",
	'logical slots have synced as true on standby');

# Capture the inactive_since of the synced slot on the standby
my $inactive_since_on_standby =
  $standby1->validate_slot_inactive_since('lsub1_slot',
	$slot_creation_time_on_primary);

# Synced slot on the standby must get its own inactive_since, i.e. a value
# later than the one captured from the primary.
is( $standby1->safe_psql(
		'postgres',
		"SELECT '$inactive_since_on_primary'::timestamptz < '$inactive_since_on_standby'::timestamptz;"
	),
	"t",
	'synchronized slot has got its own inactive_since');

##################################################
# Test that the synchronized slot will be dropped if the corresponding remote
# slot on the primary server has been dropped.
##################################################

# Remove the remote slot on the primary, then run another sync cycle on the
# standby so that it notices the slot is gone.
$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# The standby must no longer list lsub2_slot.
{
	my $lsub2_slot_is_gone = $standby1->safe_psql('postgres',
		q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
	);

	is($lsub2_slot_is_gone, "t", 'synchronized slot has been dropped');
}

##################################################
# Test that if the synchronized slot is invalidated while the remote slot is
# still valid, the slot will be dropped and re-created on the standby by
# executing pg_sync_replication_slots() again.
##################################################

# Configure the max_slot_wal_keep_size so that the synced slot can be
# invalidated due to wal removal.
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = 64kB');
$standby1->reload;

# Generate some activity and switch WAL file on the primary
$primary->advance_wal(1);
$primary->psql('postgres', "CHECKPOINT");
$primary->wait_for_replay_catchup($standby1);

# Request a checkpoint on the standby to trigger the WAL file(s) removal
$standby1->safe_psql('postgres', "CHECKPOINT");

# Check if the synced slot is invalidated
is( $standby1->safe_psql(
		'postgres',
		q{SELECT invalidation_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'synchronized slot has been invalidated');

# Reset max_slot_wal_keep_size to avoid further wal removal
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
$standby1->reload;

# Capture the time before the logical failover slot is created on the primary.
$slot_creation_time_on_primary = $publisher->safe_psql(
	'postgres', qq[
    SELECT current_timestamp;
]);

# To ensure that restart_lsn has moved to a recent WAL position, we re-create
# the lsub1_slot.
$primary->safe_psql(
	'postgres', qq[
	SELECT pg_drop_replication_slot('lsub1_slot');
	SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
]);

# Capture the inactive_since of the slot from the primary. Note that the slot
# will be inactive since the corresponding subscription was dropped.
$inactive_since_on_primary =
  $primary->validate_slot_inactive_since('lsub1_slot',
	$slot_creation_time_on_primary);

# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slots.
$primary->wait_for_replay_catchup($standby1);

# Remember the current size of the standby's log file, so that the log check
# below only looks at messages written after this point.
my $log_offset = -s $standby1->logfile;

# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Confirm that the invalidated slot has been dropped.
$standby1->wait_for_log(
	qr/dropped replication slot "lsub1_slot" of database with OID [0-9]+/,
	$log_offset);

# Confirm that the logical slot has been re-created on the standby and is
# flagged as 'synced'
is( $standby1->safe_psql(
		'postgres',
		q{SELECT invalidation_reason IS NULL AND synced AND NOT temporary FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
	),
	"t",
	'logical slot is re-synced');

# Reset the log_min_messages to the default value. This undoes the 'debug2'
# setting applied to both nodes when standby1 was configured.
$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
$primary->reload;

$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
$standby1->reload;

##################################################
# Test that a synchronized slot can not be decoded, altered or dropped by the
# user
##################################################

# Each statement below is expected to fail, so psql() is used to capture
# stderr. like() is used for the checks because it prints the actual stderr
# on a mismatch, unlike ok() with a regex.

# Attempting to perform logical decoding on a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql('postgres',
	"select * from pg_logical_slot_get_changes('lsub1_slot', NULL, NULL);");
like(
	$stderr,
	qr/ERROR:  cannot use replication slot "lsub1_slot" for logical decoding/,
	"logical decoding is not allowed on synced slot");

# Attempting to alter a synced slot should result in an error.
# ALTER_REPLICATION_SLOT is sent over a connection opened with
# replication => 'database'.
($result, $stdout, $stderr) = $standby1->psql(
	'postgres',
	qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);],
	replication => 'database');
like(
	$stderr,
	qr/ERROR:  cannot alter replication slot "lsub1_slot"/,
	"synced slot on standby cannot be altered");

# Attempting to drop a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql('postgres',
	"SELECT pg_drop_replication_slot('lsub1_slot');");
like(
	$stderr,
	qr/ERROR:  cannot drop replication slot "lsub1_slot"/,
	"synced slot on standby cannot be dropped");

##################################################
# Test that we cannot synchronize slots if dbname is not specified in the
# primary_conninfo.
##################################################

# Override primary_conninfo with a connection string that has no dbname.
$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1'");
$standby1->reload;

# The sync attempt is expected to fail; capture stderr to check the message.
($result, $stdout, $stderr) =
  $standby1->psql('postgres', "SELECT pg_sync_replication_slots();");
# like() prints the actual stderr on a mismatch, unlike ok() with a regex.
like(
	$stderr,
	qr/ERROR:  replication slot synchronization requires "dbname" to be specified in "primary_conninfo"/,
	"cannot sync slots if dbname is not specified in primary_conninfo");

# Add the dbname back to the primary_conninfo for further tests
$standby1->append_conf('postgresql.conf',
	"primary_conninfo = '$connstr_1 dbname=postgres'");
$standby1->reload;

##################################################
# Test that we cannot synchronize slots to a cascading standby server.
##################################################

# Create a cascading standby from a backup of standby1
$backup_name = 'backup2';
$standby1->backup($backup_name);

my $cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
$cascading_standby->init_from_backup(
	$standby1, $backup_name,
	has_streaming => 1,
	has_restoring => 1);

# Point the cascading standby at standby1 (not at the primary), with the
# same kind of slot-sync settings that standby1 itself was given.
my $cascading_connstr = $standby1->connstr;
$cascading_standby->append_conf(
	'postgresql.conf', qq(
hot_standby_feedback = on
primary_slot_name = 'cascading_sb_slot'
primary_conninfo = '$cascading_connstr dbname=postgres'
));

# Physical slot on standby1 named by the cascading standby's
# primary_slot_name.
$standby1->psql('postgres',
	q{SELECT pg_create_physical_replication_slot('cascading_sb_slot');});

$cascading_standby->start;

# The sync attempt is expected to fail because the upstream server is itself
# a standby; capture stderr to check the message.
($result, $stdout, $stderr) =
  $cascading_standby->psql('postgres', "SELECT pg_sync_replication_slots();");
# like() prints the actual stderr on a mismatch, unlike ok() with a regex.
like(
	$stderr,
	qr/ERROR:  cannot synchronize replication slots from a standby server/,
	"cannot sync slots to a cascading standby server");

# The cascading standby is not used by the steps that follow in this part of
# the test.
$cascading_standby->stop;

##################################################
# Create a failover slot and advance the restart_lsn to a position where a
# running transaction exists. This setup is for testing that the synced slots
# can achieve the consistent snapshot state starting from the restart_lsn
# after promotion without losing any data that otherwise would have been
# received from the primary.
##################################################

$primary->safe_psql('postgres',
	"SELECT pg_create_logical_replication_slot('snap_test_slot', 'test_decoding', false, false, true);"
);

# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slots.
$primary->wait_for_replay_catchup($standby1);

# First sync: creates snap_test_slot on the standby before the slot is
# advanced on the primary.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Two xl_running_xacts logs are generated here. When decoding the first log, it
# only serializes the snapshot, without advancing the restart_lsn to the latest
# position. This is because if a transaction is running, the restart_lsn can
# only move to a position before that transaction. Hence, the second
# xl_running_xacts log is needed, the decoding for which allows the restart_lsn
# to advance to the last serialized snapshot's position (the first log).
$primary->safe_psql(
	'postgres', qq(
		BEGIN;
		SELECT txid_current();
		SELECT pg_log_standby_snapshot();
		COMMIT;
		BEGIN;
		SELECT txid_current();
		SELECT pg_log_standby_snapshot();
		COMMIT;
));

# Advance the restart_lsn to the position of the first xl_running_xacts log
# generated above. Note that there might be concurrent xl_running_xacts logs
# written by the bgwriter, which could cause the position to be advanced to an
# unexpected point, but that would be a rare scenario and doesn't affect the
# test results.
$primary->safe_psql('postgres',
	"SELECT pg_replication_slot_advance('snap_test_slot', pg_current_wal_lsn());"
);

# Wait for the standby to catch up so that the standby is not lagging behind
# the failover slots.
$primary->wait_for_replay_catchup($standby1);

# Log a message that will be consumed on the standby after promotion using the
# synced slot. See the test where we promote standby (Promote the standby1 to
# primary.)
$primary->safe_psql('postgres',
	"SELECT pg_logical_emit_message(false, 'test', 'test');");

# Get the confirmed_flush_lsn for the logical slot snap_test_slot on the primary
my $confirmed_flush_lsn = $primary->safe_psql('postgres',
	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'snap_test_slot';"
);

# Second sync: picks up the advanced position of snap_test_slot.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");

# Verify that confirmed_flush_lsn of snap_test_slot slot is synced to the standby
ok( $standby1->poll_query_until(
		'postgres',
		"SELECT '$confirmed_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'snap_test_slot' AND synced AND NOT temporary;"
	),
	'confirmed_flush_lsn of slot snap_test_slot synced to standby');

##################################################
# Test to confirm that the slot synchronization is protected from malicious
# users.
##################################################

# Create a separate database for this test and make sure the standby has
# replayed its creation before the standby is stopped.
$primary->psql('postgres', "CREATE DATABASE slotsync_test_db");
$primary->wait_for_replay_catchup($standby1);

# The standby is restarted below with a different primary_conninfo.
$standby1->stop;

# On the primary server, create '=' operator in another schema mapped to
# inequality function and redirect the queries to use new operator by setting
# search_path. The new '=' operator is created with leftarg as 'bigint' and
# right arg as 'int' to redirect 'count(*) = 1' in slot sync's query to use
# new '=' operator.
$primary->safe_psql(
	'slotsync_test_db', q{

CREATE ROLE repl_role REPLICATION LOGIN;
CREATE SCHEMA myschema;

CREATE FUNCTION myschema.myintne(bigint, int) RETURNS bool as $$
		BEGIN
		  RETURN $1 <> $2;
		END;
	  $$ LANGUAGE plpgsql immutable;

CREATE OPERATOR myschema.= (
	  leftarg    = bigint,
	  rightarg   = int,
	  procedure  = myschema.myintne);

ALTER DATABASE slotsync_test_db SET SEARCH_PATH TO myschema,pg_catalog;
GRANT USAGE on SCHEMA myschema TO repl_role;
});

# Start the standby with changed primary_conninfo, so that slot sync connects
# to slotsync_test_db as repl_role.
$standby1->append_conf('postgresql.conf',
	"primary_conninfo = '$connstr_1 dbname=slotsync_test_db user=repl_role'");
$standby1->start;

# Run the synchronization function. If the sync flow was not prepared
# to handle such attacks, it would have failed during the validation
# of the primary_slot_name itself resulting in
# ERROR:  slot synchronization requires valid primary_slot_name
# safe_psql() is used here, so the call is expected to succeed.
$standby1->safe_psql('slotsync_test_db',
	"SELECT pg_sync_replication_slots();");

# Reset the dbname and user in primary_conninfo to the earlier values.
$standby1->append_conf('postgresql.conf',
	"primary_conninfo = '$connstr_1 dbname=postgres'");
$standby1->reload;

# Drop the newly created database.
$primary->psql('postgres', q{DROP DATABASE slotsync_test_db;});

##################################################
# Test to confirm that the slot sync worker exits on invalid GUC(s) and
# gets started again on valid GUC(s).
##################################################

# Record the current end of the standby's log so that each wait_for_log()
# below only matches messages emitted after the preceding configuration
# change.
$log_offset = -s $standby1->logfile;

# Enable slot sync worker.
$standby1->append_conf('postgresql.conf', qq(sync_replication_slots = on));
$standby1->reload;

# Confirm that the slot sync worker is able to start.
$standby1->wait_for_log(qr/slot sync worker started/, $log_offset);

$log_offset = -s $standby1->logfile;

# Disable another GUC required for slot sync.
$standby1->append_conf('postgresql.conf', qq(hot_standby_feedback = off));
$standby1->reload;

# Confirm that the slot sync worker acknowledges the GUC change and logs the
# message about the wrong configuration.
$standby1->wait_for_log(
	qr/slot synchronization worker will restart because of a parameter change/,
	$log_offset);
$standby1->wait_for_log(
	qr/slot synchronization requires "hot_standby_feedback" to be enabled/,
	$log_offset);

$log_offset = -s $standby1->logfile;

# Re-enable the required GUC
$standby1->append_conf('postgresql.conf', "hot_standby_feedback = on");
$standby1->reload;

# Confirm that the slot sync worker is able to start now.
$standby1->wait_for_log(qr/slot sync worker started/, $log_offset);

##################################################
# Test to confirm that confirmed_flush_lsn of the logical slot on the primary
# is synced to the standby via the slot sync worker.
##################################################

# Insert data on the primary
$primary->safe_psql(
	'postgres', qq[
	CREATE TABLE tab_int (a int PRIMARY KEY);
	INSERT INTO tab_int SELECT generate_series(1, 10);
]);

# Subscribe to the new table data and wait for it to arrive. The subscription
# reuses the existing lsub1_slot (create_slot = false).
$subscriber1->safe_psql(
	'postgres', qq[
	CREATE TABLE tab_int (a int PRIMARY KEY);
	CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, create_slot = false);
]);

$subscriber1->wait_for_subscription_sync;

# Do not allow any further advancement of the confirmed_flush_lsn for the
# lsub1_slot.
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");

# Wait for the replication slot to become inactive on the publisher. Stop the
# test if that never happens: otherwise the confirmed_flush_lsn read below
# could still be moving and the final check would fail for a misleading
# reason.
$primary->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
	1)
  or die
  "Timed out while waiting for slot lsub1_slot to become inactive on the publisher";

# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
my $primary_flush_lsn = $primary->safe_psql('postgres',
	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"
);

# Confirm that confirmed_flush_lsn of lsub1_slot slot is synced to the standby.
# No explicit pg_sync_replication_slots() call is made here; the standby has
# sync_replication_slots = on, so the slot sync worker does the work.
ok( $standby1->poll_query_until(
		'postgres',
		"SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"
	),
	'confirmed_flush_lsn of slot lsub1_slot synced to standby');

##################################################
# Test that logical failover replication slots wait for the specified
# physical replication slots to receive the changes first. It uses the
# following set up:
#
#				(physical standbys)
#				| ----> standby1 (primary_slot_name = sb1_slot)
#				| ----> standby2 (primary_slot_name = sb2_slot)
# primary -----	|
#				(logical replication)
#				| ----> subscriber1 (failover = true, slot_name = lsub1_slot)
#				| ----> subscriber2 (failover = false, slot_name = lsub2_slot)
#
# synchronized_standby_slots = 'sb1_slot'
#
# The setup is configured in such a way that the logical slot of subscriber1 is
# enabled for failover, and thus the subscriber1 will wait for the physical
# slot of standby1(sb1_slot) to catch up before receiving the decoded changes.
##################################################

$backup_name = 'backup3';

# Physical slot for standby2; it is created before the backup is taken.
$primary->psql('postgres',
	q{SELECT pg_create_physical_replication_slot('sb2_slot');});

$primary->backup($backup_name);

# Create another standby
my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
$standby2->init_from_backup(
	$primary, $backup_name,
	has_streaming => 1,
	has_restoring => 1);
$standby2->append_conf(
	'postgresql.conf', qq(
primary_slot_name = 'sb2_slot'
));
$standby2->start;
$primary->wait_for_replay_catchup($standby2);

# Configure primary to disallow any logical slots that have enabled failover
# from getting ahead of the specified physical replication slot (sb1_slot).
$primary->append_conf(
	'postgresql.conf', qq(
synchronized_standby_slots = 'sb1_slot'
));
$primary->reload;

# Create another subscriber node without enabling failover, wait for sync to
# complete
my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
$subscriber2->init;
$subscriber2->start;
$subscriber2->safe_psql(
	'postgres', qq[
	CREATE TABLE tab_int (a int PRIMARY KEY);
	CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);
]);

$subscriber2->wait_for_subscription_sync;

# regress_mysub1 was disabled by the previous test; enable it again.
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 ENABLE");

# Remember the current size of the primary's log file, so that the
# wait_for_log() below only matches messages written after this point.
my $offset = -s $primary->logfile;

# Stop the standby associated with the specified physical replication slot
# (sb1_slot) so that the logical replication slot (lsub1_slot) won't receive
# changes until the standby comes up.
$standby1->stop;

# Create some data on the primary. Rows 1..10 already exist, so this brings
# the table to $primary_row_count rows in total.
my $primary_row_count = 20;
$primary->safe_psql('postgres',
	"INSERT INTO tab_int SELECT generate_series(11, $primary_row_count);");

# Wait until the standby2 that's still running gets the data from the primary
$primary->wait_for_replay_catchup($standby2);
$result = $standby2->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't', "standby2 gets data from primary");

# Wait for regress_mysub2 to get the data from the primary. This subscription
# was not enabled for failover so it gets the data without waiting for any
# standbys.
$primary->wait_for_catchup('regress_mysub2');
$result = $subscriber2->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't', "subscriber2 gets data from primary");

# Wait until the primary server logs a warning indicating that it is waiting
# for the sb1_slot to catch up.
$primary->wait_for_log(
	qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/,
	$offset);

# The regress_mysub1 was enabled for failover so it doesn't get the data from
# primary and keeps waiting for the standby specified in synchronized_standby_slots
# (sb1_slot aka standby1).
$result =
  $subscriber1->safe_psql('postgres',
	"SELECT count(*) <> $primary_row_count FROM tab_int;");
is($result, 't',
	"subscriber1 doesn't get data from primary until standby1 acknowledges changes"
);

# Start the standby specified in synchronized_standby_slots (sb1_slot aka standby1) and
# wait for it to catch up with the primary.
$standby1->start;
$primary->wait_for_replay_catchup($standby1);
$result = $standby1->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't', "standby1 gets data from primary");

# Now that the standby specified in synchronized_standby_slots is up and running, the
# primary can send the decoded changes to the subscription enabled for failover
# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
# receive any data from the primary. i.e. the primary didn't allow it to go
# ahead of standby.
$primary->wait_for_catchup('regress_mysub1');
$result = $subscriber1->safe_psql('postgres',
	"SELECT count(*) = $primary_row_count FROM tab_int;");
is($result, 't',
	"subscriber1 gets data from primary after standby1 acknowledges changes");

##################################################
# Verify that when using pg_logical_slot_get_changes to consume changes from a
# logical failover slot, it will also wait for the slots specified in
# synchronized_standby_slots to catch up.
##################################################

# Stop the standby associated with the specified physical replication slot so
# that the logical replication slot won't receive changes until the standby
# slot's restart_lsn is advanced or the slot is removed from the
# synchronized_standby_slots list.
$primary->safe_psql('postgres', "TRUNCATE tab_int;");
$primary->wait_for_catchup('regress_mysub1');
$standby1->stop;

# Disable the regress_mysub1 to prevent the logical walsender from generating
# more warnings.
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");

# Wait for the replication slot to become inactive on the publisher
$primary->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
	1);

# Create a logical 'test_decoding' replication slot with failover enabled
$primary->safe_psql('postgres',
	"SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);"
);

# Open a separate background psql session on the primary, with on_error_stop
# turned off and bounded by the default test timeout.
my $back_q = $primary->background_psql(
	'postgres',
	on_error_stop => 0,
	timeout => $PostgreSQL::Test::Utils::timeout_default);

# pg_logical_slot_get_changes will be blocked until the standby catches up,
# hence it needs to be executed in a background session.
#
# Record the current size of the primary's log file first; it is passed to
# wait_for_log below as the offset for the search.  The \echo emits a marker
# ahead of the SELECT, and that marker is the pattern given to query_until.
$offset = -s $primary->logfile;
$back_q->query_until(
	qr/logical_slot_get_changes/, q(
   \echo logical_slot_get_changes
   SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);
));

# Wait until the primary server logs a warning indicating that it is waiting
# for the sb1_slot to catch up.
$primary->wait_for_log(
	qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/,
	$offset);

# Remove the standby from the synchronized_standby_slots list and reload the
# configuration.
$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
$primary->reload;

# Since there are no slots in synchronized_standby_slots, the function
# pg_logical_slot_get_changes should now return, and the session can be
# stopped.
$back_q->quit;

# The temporary test slot is no longer needed; drop it.
$primary->safe_psql('postgres',
	"SELECT pg_drop_replication_slot('test_slot');");

# Put sb1_slot back into synchronized_standby_slots and reload, so that the
# tests that follow run with the physical slot listed again.
$primary->adjust_conf(
	'postgresql.conf',
	'synchronized_standby_slots',
	q{'sb1_slot'});
$primary->reload();

# Turn regress_mysub1 back on for the tests that follow.
$subscriber1->safe_psql(
	'postgres',
	q{ALTER SUBSCRIPTION regress_mysub1 ENABLE});

##################################################
# Check that logical replication is held back by a user-created physical slot
# that is inactive, for as long as that slot remains listed in
# synchronized_standby_slots.
##################################################

# Note the current size of the primary's log file; it is handed to
# wait_for_log below.
$offset = -s $primary->logfile;

# Generate some changes on the primary.
$primary_row_count = 10;
$primary->safe_psql(
	'postgres',
	qq{INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);}
);

# The primary is expected to warn that sb1_slot, although listed in
# synchronized_standby_slots, has no active_pid.  Wait for that message.
$primary->wait_for_log(
	qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/,
	$offset
);

# With sb1_slot inactive, regress_mysub1 must not have received any of the
# rows inserted above.
$result = $subscriber1->safe_psql(
	'postgres',
	q{SELECT count(*) = 0 FROM tab_int;}
);
is( $result,
	't',
	q{subscriber1 doesn't get data as the sb1_slot doesn't catch up}
);

# Empty the synchronized_standby_slots list and reload the configuration.
$primary->adjust_conf(
	'postgresql.conf',
	'synchronized_standby_slots',
	q{''});
$primary->reload();

# No slot is listed in synchronized_standby_slots any more, so the primary
# should now send the decoded changes to the subscription.
$primary->wait_for_catchup('regress_mysub1');
$result = $subscriber1->safe_psql(
	'postgres',
	qq{SELECT count(*) = $primary_row_count FROM tab_int;}
);
is( $result,
	't',
	'subscriber1 gets data from primary after standby1 is removed from the synchronized_standby_slots list'
);

# Put sb1_slot back into synchronized_standby_slots for the tests that follow.
$primary->adjust_conf(
	'postgresql.conf',
	'synchronized_standby_slots',
	q{'sb1_slot'});
$primary->reload();

##################################################
# Test the synchronization of the two_phase setting for a subscription with the
# standby. Additionally, prepare a transaction before enabling the two_phase
# option; subsequent tests will verify if it can be correctly replicated to the
# subscriber after committing it on the promoted standby.
##################################################

$standby1->start;

# Prepare a transaction
$primary->safe_psql(
	'postgres', qq[
	BEGIN;
	INSERT INTO tab_int values(0);
	PREPARE TRANSACTION 'test_twophase_slotsync';
]);

# Let both the standby and the subscription catch up with the primary before
# the subscription is altered.
$primary->wait_for_replay_catchup($standby1);
$primary->wait_for_catchup('regress_mysub1');

# Disable the subscription to allow changing the two_phase option.
$subscriber1->safe_psql('postgres',
	"ALTER SUBSCRIPTION regress_mysub1 DISABLE");

# Wait for the replication slot to become inactive on the publisher.  Bail out
# right away on timeout rather than attempting the ALTER SUBSCRIPTION below
# while the slot is still active.
$primary->poll_query_until(
	'postgres',
	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
	1)
  or die
  "Timed out while waiting for slot lsub1_slot to become inactive on the primary";

# Set two_phase to true and enable the subscription
$subscriber1->safe_psql(
	'postgres', qq[
	ALTER SUBSCRIPTION regress_mysub1 SET (two_phase = true);
	ALTER SUBSCRIPTION regress_mysub1 ENABLE;
]);

$primary->wait_for_catchup('regress_mysub1');

# Fetch two_phase_at of lsub1_slot on the primary; the synced copy on the
# standby is compared against this value below.
my $two_phase_at = $primary->safe_psql('postgres',
	"SELECT two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot';"
);

# Confirm that two_phase setting of lsub1_slot slot is synced to the standby
ok( $standby1->poll_query_until(
		'postgres',
		"SELECT two_phase AND '$two_phase_at' = two_phase_at from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"
	),
	'two_phase setting of slot lsub1_slot synced to standby');

# Confirm that the prepared transaction is not yet replicated to the
# subscriber.
$result = $subscriber1->safe_psql('postgres',
	"SELECT count(*) = 0 FROM pg_prepared_xacts;");
is($result, 't',
	"the prepared transaction is not replicated to the subscriber");

##################################################
# Promote standby1 to primary, then confirm that:
# a) the slots 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
# b) logical replication for regress_mysub1 resumes successfully after failover
# c) the transaction prepared as 'test_twophase_slotsync' reaches the
#    subscriber once it is committed on the new primary
# d) changes can be consumed from the synced slot 'snap_test_slot'
##################################################
$primary->wait_for_replay_catchup($standby1);

# Take a timestamp on standby1 just before promoting it.
my $promotion_time_on_primary = $standby1->safe_psql(
	'postgres', qq{
    SELECT current_timestamp;
});

$standby1->promote();

# Fetch inactive_since of the synced slot once the promotion is done.  The
# slot is expected to receive its inactive_since as part of the promotion.
# This has to happen before the slot is enabled on the new primary further
# down; once the slot becomes active its inactive_since is reset to NULL.
my $inactive_since_on_new_primary = $standby1->validate_slot_inactive_since(
	'lsub1_slot', $promotion_time_on_primary);

# The value seen on the new primary must be later than the one recorded on the
# old primary.
is( $standby1->safe_psql(
		'postgres',
		qq{SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz}
	),
	't',
	'synchronized slot has got its own inactive_since on the new primary after promotion'
);

# Point the subscription at the new primary (the promoted standby1).
$subscriber1->safe_psql(
	'postgres',
	"ALTER SUBSCRIPTION regress_mysub1 CONNECTION '"
	  . $standby1->connstr()
	  . " dbname=postgres';");

# Both synced slots, 'lsub1_slot' and 'snap_test_slot', must still be present
# on the new primary as synced, non-temporary slots.
$result = $standby1->safe_psql(
	'postgres',
	q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'snap_test_slot') AND synced AND NOT temporary;}
);
is($result, 't', 'synced slot retained on the new primary');

# Finish the transaction that was prepared on the old primary.
$standby1->safe_psql(
	'postgres',
	q{COMMIT PREPARED 'test_twophase_slotsync';});
$standby1->wait_for_catchup('regress_mysub1');

# The subscriber should now hold the 10 rows replicated earlier plus the row
# from the prepared transaction.
$result = $subscriber1->safe_psql(
	'postgres',
	q{SELECT count(*) FROM tab_int;});
is($result, '11', 'prepared data replicated from the new primary');

# Add ten more rows on the new primary.
$standby1->safe_psql(
	'postgres',
	q{INSERT INTO tab_int SELECT generate_series(11, 20);});
$standby1->wait_for_catchup('regress_mysub1');

# Those rows must have reached tab_int on the subscriber as well.
$result = $subscriber1->safe_psql(
	'postgres',
	q{SELECT count(*) FROM tab_int;});
is($result, '21', 'data replicated from the new primary');

# Consume the data from the snap_test_slot. The synced slot should reach a
# consistent point by restoring the snapshot at the restart_lsn serialized
# during slot synchronization.
#
# The filter on "data" is a POSIX regular expression, not a glob: a trailing
# '*' would only make the final 'e' optional/repeatable, so match the literal
# word "message" instead.
$result = $standby1->safe_psql('postgres',
	"SELECT count(*) FROM pg_logical_slot_get_changes('snap_test_slot', NULL, NULL) WHERE data ~ 'message';"
);

is($result, '1', "data can be consumed using snap_test_slot");

done_testing();