aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/test/subscription/t/016_stream_subxact.pl81
-rw-r--r--src/test/subscription/t/017_stream_ddl.pl110
-rw-r--r--src/test/subscription/t/018_stream_subxact_abort.pl117
-rw-r--r--src/test/subscription/t/019_stream_subxact_ddl_abort.pl76
4 files changed, 384 insertions, 0 deletions
diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl
new file mode 100644
index 00000000000..b6a2d10e91e
--- /dev/null
+++ b/src/test/subscription/t/016_stream_subxact.pl
@@ -0,0 +1,81 @@
+# Test streaming of large transaction containing large subtransactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Insert, update and delete enough rows to exceed 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s4;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(1667|1667|1667), 'check data was copied to subscriber in streaming mode and extra columns contain local defaults');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl
new file mode 100644
index 00000000000..be7d7d74e3f
--- /dev/null
+++ b/src/test/subscription/t/017_stream_ddl.pl
@@ -0,0 +1,110 @@
+# Test streaming of large transaction with DDL and subtransactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT, f INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|0|0), 'check initial data was copied to subscriber');
+
+# a small (non-streamed) transaction with DDL and DML
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab VALUES (3, md5(3::text));
+ALTER TABLE test_tab ADD COLUMN c INT;
+SAVEPOINT s1;
+INSERT INTO test_tab VALUES (4, md5(4::text), -4);
+COMMIT;
+});
+
+# large (streamed) transaction with DDL and DML
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i);
+ALTER TABLE test_tab ADD COLUMN d INT;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i);
+COMMIT;
+});
+
+# a small (non-streamed) transaction with DDL and DML
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001);
+ALTER TABLE test_tab ADD COLUMN e INT;
+SAVEPOINT s1;
+INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002);
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e) FROM test_tab");
+is($result, qq(2002|1999|1002|1), 'check data was copied to subscriber in streaming mode and extra columns contain local defaults');
+
+# A large (streamed) transaction with DDL and DML. One of the DDL is performed
+# after DML to ensure that we invalidate the schema sent for test_tab so that
+# the next transaction has to send the schema again.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i);
+ALTER TABLE test_tab ADD COLUMN f INT;
+COMMIT;
+});
+
+# A small transaction that won't get streamed. This is just to ensure that we
+# send the schema again to reflect the last column added in the previous test.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i);
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab");
+is($result, qq(5005|5002|4005|3004|5), 'check data was copied to subscriber for both streaming and non-streaming transactions');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
new file mode 100644
index 00000000000..ddf0621558a
--- /dev/null
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -0,0 +1,117 @@
+# Test streaming of large transaction containing multiple subtransactions and rollbacks
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2|0), 'check initial data was copied to subscriber');
+
+# large (streamed) transaction with DDL, DML and ROLLBACKs
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
+ROLLBACK TO s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
+ROLLBACK TO s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
+SAVEPOINT s4;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
+SAVEPOINT s5;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2000|0), 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults');
+
+# large (streamed) transaction with subscriber receiving out of order
+# subtransaction ROLLBACKs
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
+RELEASE s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
+ROLLBACK TO s1;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2500|0), 'check rollback to savepoint was reflected on subscriber');
+
+# large (streamed) transaction with subscriber receiving rollback
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
+ROLLBACK;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2500|0), 'check rollback was reflected on subscriber');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
new file mode 100644
index 00000000000..33e42edfef2
--- /dev/null
+++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
@@ -0,0 +1,76 @@
+# Test streaming of large transaction with subtransactions, DDLs, DMLs, and
+# rollbacks
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2|0), 'check initial data was copied to subscriber');
+
+# large (streamed) transaction with DDL, DML and ROLLBACKs
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+ALTER TABLE test_tab ADD COLUMN c INT;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
+ALTER TABLE test_tab ADD COLUMN d INT;
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
+ALTER TABLE test_tab ADD COLUMN e INT;
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
+ALTER TABLE test_tab DROP COLUMN c;
+ROLLBACK TO s1;
+INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(1000|500), 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults');
+
+$node_subscriber->stop;
+$node_publisher->stop;