aboutsummaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/test')
-rw-r--r--src/test/subscription/t/015_stream.pl28
-rw-r--r--src/test/subscription/t/018_stream_subxact_abort.pl61
-rw-r--r--src/test/subscription/t/023_twophase_stream.pl46
3 files changed, 133 insertions, 2 deletions
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 91e8aa8c0a5..0e0f27f14df 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -312,6 +312,34 @@ $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(10000), 'data replicated to subscriber after dropping index');
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(15000), 'parallel apply worker replayed all changes from file');
+
$node_subscriber->stop;
$node_publisher->stop;
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
index 814daf4d2f9..2b67ae1e0ac 100644
--- a/src/test/subscription/t/018_stream_subxact_abort.pl
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -143,15 +143,17 @@ $node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
my $appname = 'tap_sub';
@@ -198,6 +200,63 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+my $offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ ROLLBACK;
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is aborted on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(0), 'check rollback was reflected on subscriber');
+
+# Serialize the ABORT sub-transaction.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ SAVEPOINT sp;
+ INSERT INTO test_tab_2 values(1);
+ ROLLBACK TO sp;
+ COMMIT;
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that only sub-transaction is aborted on subscriber.
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(1), 'check rollback to savepoint was reflected on subscriber');
+
$node_subscriber->stop;
$node_publisher->stop;
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index 497245a209c..1cc871fddbd 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -319,16 +319,18 @@ $node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup logical replication (streaming = on)
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
my $appname = 'tap_sub';
@@ -384,6 +386,48 @@ $node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
+# Test serializing changes to files and notify the parallel apply worker to
+# apply them at the end of the transaction.
+$node_subscriber->append_conf('postgresql.conf',
+ 'logical_replication_mode = immediate');
+# Reset the log_min_messages to default.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = warning");
+$node_subscriber->reload;
+
+# Run a query to make sure that the reload has taken effect.
+$node_subscriber->safe_psql('postgres', q{SELECT 1});
+
+my $offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab_2 values(1);
+ PREPARE TRANSACTION 'xact';
+ });
+
+# Ensure that the changes are serialized.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication apply worker will serialize the remaining changes of remote transaction \d+ to a file/,
+ $offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(1), 'transaction is committed on subscriber');
+
###############################
# check all the cleanup
###############################