aboutsummaryrefslogtreecommitdiff
path: root/src/test/subscription/t/018_stream_subxact_abort.pl
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/subscription/t/018_stream_subxact_abort.pl')
-rw-r--r--src/test/subscription/t/018_stream_subxact_abort.pl220
1 files changed, 147 insertions, 73 deletions
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
index 170ee10c1db..dce14b150af 100644
--- a/src/test/subscription/t/018_stream_subxact_abort.pl
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -8,6 +8,124 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
+# Check that the parallel apply worker has finished applying the streaming
+# transaction.
+sub check_parallel_log
+{
+ my ($node_subscriber, $offset, $is_parallel, $type) = @_;
+
+ if ($is_parallel)
+ {
+ $node_subscriber->wait_for_log(
+ qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
+ $offset);
+ }
+}
+
+# Common test steps for both the streaming=on and streaming=parallel cases.
+sub test_streaming
+{
+ my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
+
+ my $offset = 0;
+
+ # Check the subscriber log from now on.
+ $offset = -s $node_subscriber->logfile;
+
+ # streamed transaction with DDL, DML and ROLLBACKs
+ $node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab VALUES (3, md5(3::text));
+ SAVEPOINT s1;
+ INSERT INTO test_tab VALUES (4, md5(4::text));
+ SAVEPOINT s2;
+ INSERT INTO test_tab VALUES (5, md5(5::text));
+ SAVEPOINT s3;
+ INSERT INTO test_tab VALUES (6, md5(6::text));
+ ROLLBACK TO s2;
+ INSERT INTO test_tab VALUES (7, md5(7::text));
+ ROLLBACK TO s1;
+ INSERT INTO test_tab VALUES (8, md5(8::text));
+ SAVEPOINT s4;
+ INSERT INTO test_tab VALUES (9, md5(9::text));
+ SAVEPOINT s5;
+ INSERT INTO test_tab VALUES (10, md5(10::text));
+ COMMIT;
+ });
+
+ $node_publisher->wait_for_catchup($appname);
+
+ check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
+
+ my $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+ is($result, qq(6|0),
+ 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
+ );
+
+ # Check the subscriber log from now on.
+ $offset = -s $node_subscriber->logfile;
+
+ # streamed transaction with subscriber receiving out of order
+ # subtransaction ROLLBACKs
+ $node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab VALUES (11, md5(11::text));
+ SAVEPOINT s1;
+ INSERT INTO test_tab VALUES (12, md5(12::text));
+ SAVEPOINT s2;
+ INSERT INTO test_tab VALUES (13, md5(13::text));
+ SAVEPOINT s3;
+ INSERT INTO test_tab VALUES (14, md5(14::text));
+ RELEASE s2;
+ INSERT INTO test_tab VALUES (15, md5(15::text));
+ ROLLBACK TO s1;
+ COMMIT;
+ });
+
+ $node_publisher->wait_for_catchup($appname);
+
+ check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
+
+ $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+ is($result, qq(7|0),
+ 'check rollback to savepoint was reflected on subscriber');
+
+ # Check the subscriber log from now on.
+ $offset = -s $node_subscriber->logfile;
+
+ # streamed transaction with subscriber receiving rollback
+ $node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab VALUES (16, md5(16::text));
+ SAVEPOINT s1;
+ INSERT INTO test_tab VALUES (17, md5(17::text));
+ SAVEPOINT s2;
+ INSERT INTO test_tab VALUES (18, md5(18::text));
+ ROLLBACK;
+ });
+
+ $node_publisher->wait_for_catchup($appname);
+
+ check_parallel_log($node_subscriber, $offset, $is_parallel, 'ABORT');
+
+ $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+ is($result, qq(7|0), 'check rollback was reflected on subscriber');
+
+ # Cleanup the test data
+ $node_publisher->safe_psql('postgres',
+ "DELETE FROM test_tab WHERE (a > 2)");
+ $node_publisher->wait_for_catchup($appname);
+}
+
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
@@ -36,6 +154,10 @@ $node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
+
+################################
+# Test using streaming mode 'on'
+################################
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
@@ -48,81 +170,33 @@ my $result =
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(2|0), 'check initial data was copied to subscriber');
-# streamed transaction with DDL, DML and ROLLBACKs
-$node_publisher->safe_psql(
- 'postgres', q{
-BEGIN;
-INSERT INTO test_tab VALUES (3, md5(3::text));
-SAVEPOINT s1;
-INSERT INTO test_tab VALUES (4, md5(4::text));
-SAVEPOINT s2;
-INSERT INTO test_tab VALUES (5, md5(5::text));
-SAVEPOINT s3;
-INSERT INTO test_tab VALUES (6, md5(6::text));
-ROLLBACK TO s2;
-INSERT INTO test_tab VALUES (7, md5(7::text));
-ROLLBACK TO s1;
-INSERT INTO test_tab VALUES (8, md5(8::text));
-SAVEPOINT s4;
-INSERT INTO test_tab VALUES (9, md5(9::text));
-SAVEPOINT s5;
-INSERT INTO test_tab VALUES (10, md5(10::text));
-COMMIT;
-});
-
-$node_publisher->wait_for_catchup($appname);
-
-$result =
- $node_subscriber->safe_psql('postgres',
- "SELECT count(*), count(c) FROM test_tab");
-is($result, qq(6|0),
- 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
+test_streaming($node_publisher, $node_subscriber, $appname, 0);
+
+######################################
+# Test using streaming mode 'parallel'
+######################################
+my $oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
-# streamed transaction with subscriber receiving out of order subtransaction
-# ROLLBACKs
-$node_publisher->safe_psql(
- 'postgres', q{
-BEGIN;
-INSERT INTO test_tab VALUES (11, md5(11::text));
-SAVEPOINT s1;
-INSERT INTO test_tab VALUES (12, md5(12::text));
-SAVEPOINT s2;
-INSERT INTO test_tab VALUES (13, md5(13::text));
-SAVEPOINT s3;
-INSERT INTO test_tab VALUES (14, md5(14::text));
-RELEASE s2;
-INSERT INTO test_tab VALUES (15, md5(15::text));
-ROLLBACK TO s1;
-COMMIT;
-});
-
-$node_publisher->wait_for_catchup($appname);
-
-$result =
- $node_subscriber->safe_psql('postgres',
- "SELECT count(*), count(c) FROM test_tab");
-is($result, qq(7|0),
- 'check rollback to savepoint was reflected on subscriber');
-
-# streamed transaction with subscriber receiving rollback
-$node_publisher->safe_psql(
- 'postgres', q{
-BEGIN;
-INSERT INTO test_tab VALUES (16, md5(16::text));
-SAVEPOINT s1;
-INSERT INTO test_tab VALUES (17, md5(17::text));
-SAVEPOINT s2;
-INSERT INTO test_tab VALUES (18, md5(18::text));
-ROLLBACK;
-});
-
-$node_publisher->wait_for_catchup($appname);
-
-$result =
- $node_subscriber->safe_psql('postgres',
- "SELECT count(*), count(c) FROM test_tab");
-is($result, qq(7|0), 'check rollback was reflected on subscriber');
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)");
+
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
+ )
+ or die
+ "Timed out while waiting for apply to restart after changing SUBSCRIPTION";
+
+# We need to check DEBUG logs to ensure that the parallel apply worker has
+# applied the transaction. So, bump up the log verbosity.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+test_streaming($node_publisher, $node_subscriber, $appname, 1);
$node_subscriber->stop;
$node_publisher->stop;