diff options
Diffstat (limited to 'src/test/subscription/t/035_conflicts.pl')
-rw-r--r-- | src/test/subscription/t/035_conflicts.pl | 208 |
1 files changed, 205 insertions, 3 deletions
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index 2a7a8239a29..976d53a870e 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -1,6 +1,6 @@ # Copyright (c) 2025, PostgreSQL Global Development Group -# Test the conflict detection of conflict type 'multiple_unique_conflicts'. +# Test conflicts in logical replication use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; @@ -18,7 +18,7 @@ $node_publisher->start; # Create a subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); -$node_subscriber->init; +$node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->start; # Create a table on publisher @@ -26,7 +26,8 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE conf_tab (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);"); $node_publisher->safe_psql('postgres', - "CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);"); + "CREATE TABLE conf_tab_2 (a int PRIMARY KEY, b int UNIQUE, c int UNIQUE);" +); # Create same table on subscriber $node_subscriber->safe_psql('postgres', @@ -145,4 +146,205 @@ $node_subscriber->wait_for_log( pass('multiple_unique_conflicts detected on a leaf partition during insert'); +############################################################################### +# Setup a bidirectional logical replication between node_A & node_B +############################################################################### + +# Initialize nodes. + +# node_A. Increase the log_min_messages setting to DEBUG2 to debug test +# failures. Disable autovacuum to avoid generating xid that could affect the +# replication slot's xmin value. +my $node_A = $node_publisher; +$node_A->append_conf( + 'postgresql.conf', + qq{autovacuum = off + log_min_messages = 'debug2'}); +$node_A->restart; + +# node_B +my $node_B = $node_subscriber; +$node_B->append_conf('postgresql.conf', "track_commit_timestamp = on"); +$node_B->restart; + +# Create table on node_A +$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); + +# Create the same table on node_B +$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); + +my $subname_AB = 'tap_sub_a_b'; +my $subname_BA = 'tap_sub_b_a'; + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_BA + CONNECTION '$node_A_connstr application_name=$subname_BA' + PUBLICATION tap_pub_A + WITH (origin = none, retain_dead_tuples = true)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_AB + CONNECTION '$node_B_connstr application_name=$subname_AB' + PUBLICATION tap_pub_B + WITH (origin = none, copy_data = off)"); + +# Wait for initial table sync to finish +$node_A->wait_for_subscription_sync($node_B, $subname_AB); +$node_B->wait_for_subscription_sync($node_A, $subname_BA); + +is(1, 1, 'Bidirectional replication setup is complete'); + +# Confirm that the conflict detection slot is created on Node B and the xmin +# value is valid. +ok( $node_B->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node B"); + +################################################## +# Check that the retain_dead_tuples option can be enabled only for disabled +# subscriptions. Validate the NOTICE message during the subscription DDL, and +# ensure the conflict detection slot is created upon enabling the +# retain_dead_tuples option. +################################################## + +# Alter retain_dead_tuples for enabled subscription +my ($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true)"); +ok( $stderr =~ + /ERROR: cannot set option \"retain_dead_tuples\" for enabled subscription/, + "altering retain_dead_tuples is not allowed for enabled subscription"); + +# Disable the subscription +$node_A->psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;"); + +# Wait for the apply worker to stop +$node_A->poll_query_until('postgres', + "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'" +); + +# Enable retain_dead_tuples for disabled subscription +($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true);"); +ok( $stderr =~ + /NOTICE: deleted rows to detect conflicts would not be removed until the subscription is enabled/, + "altering retain_dead_tuples is allowed for disabled subscription"); + +# Re-enable the subscription +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); + +# Confirm that the conflict detection slot is created on Node A and the xmin +# value is valid. +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); + +################################################## +# Check the WARNING when changing the origin to ANY, if retain_dead_tuples is +# enabled. This warns of the possibility of receiving changes from origins +# other than the publisher. +################################################## + +($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (origin = any);"); +ok( $stderr =~ + /WARNING: subscription "tap_sub_a_b" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins/, + "warn of the possibility of receiving changes from origins other than the publisher"); + +# Reset the origin to none +$node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (origin = none);"); + +############################################################################### +# Check that dead tuples on node A cannot be cleaned by VACUUM until the +# concurrent transactions on Node B have been applied and flushed on Node A. +############################################################################### + +# Insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);"); +$node_A->wait_for_catchup($subname_BA); + +my $result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); +is($result, qq(1|1 +2|2), 'check replicated insert on node B'); + +# Disable the logical replication from node B to node A +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE"); + +# Wait for the apply worker to stop +$node_A->poll_query_until('postgres', + "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'" +); + +$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;"); +$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;"); + +($cmdret, $stdout, $stderr) = $node_A->psql( + 'postgres', qq(VACUUM (verbose) public.tab;) +); + +ok( $stderr =~ + qr/1 are dead but not yet removable/, + 'the deleted column is non-removable'); + +$node_A->safe_psql( + 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); +$node_B->wait_for_catchup($subname_AB); + +# Remember the next transaction ID to be assigned +my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); + +# Confirm that the xmin value is advanced to the latest nextXid. If no +# transactions are running, the apply worker selects nextXid as the candidate +# for the non-removable xid. See GetOldestActiveTransactionId(). +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is updated on Node A"); + +# Confirm that the dead tuple can be removed now +($cmdret, $stdout, $stderr) = $node_A->psql( + 'postgres', qq(VACUUM (verbose) public.tab;) +); + +ok( $stderr =~ + qr/1 removed, 1 remain, 0 are dead but not yet removable/, + 'the deleted column is removed'); + +############################################################################### +# Check that the replication slot pg_conflict_detection is dropped after +# removing all the subscriptions. +############################################################################### + +$node_B->safe_psql( + 'postgres', "DROP SUBSCRIPTION $subname_BA"); + +ok( $node_B->poll_query_until( + 'postgres', + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the slot 'pg_conflict_detection' has been dropped on Node B"); + +$node_A->safe_psql( + 'postgres', "DROP SUBSCRIPTION $subname_AB"); + +ok( $node_A->poll_query_until( + 'postgres', + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the slot 'pg_conflict_detection' has been dropped on Node A"); + done_testing(); |