diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/test/subscription/t/016_stream_subxact.pl | 81 | ||||
-rw-r--r-- | src/test/subscription/t/017_stream_ddl.pl | 110 | ||||
-rw-r--r-- | src/test/subscription/t/018_stream_subxact_abort.pl | 117 | ||||
-rw-r--r-- | src/test/subscription/t/019_stream_subxact_ddl_abort.pl | 76 |
4 files changed, 384 insertions, 0 deletions
diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl new file mode 100644 index 00000000000..b6a2d10e91e --- /dev/null +++ b/src/test/subscription/t/016_stream_subxact.pl @@ -0,0 +1,81 @@ +# Test streaming of large transaction containing large subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s4; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(1667|1667|1667), 'check data was copied to subscriber in streaming mode and extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl new file mode 100644 index 00000000000..be7d7d74e3f --- /dev/null +++ b/src/test/subscription/t/017_stream_ddl.pl @@ -0,0 +1,110 @@ +# Test streaming of large transaction with DDL and subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT, f INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|0|0), 'check initial data was copied to subscriber'); + +# a small (non-streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab VALUES (3, md5(3::text)); +ALTER TABLE test_tab ADD COLUMN c INT; +SAVEPOINT s1; +INSERT INTO test_tab VALUES (4, md5(4::text), -4); +COMMIT; +}); + +# large (streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i); +ALTER TABLE test_tab ADD COLUMN d INT; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i); +COMMIT; +}); + +# a small (non-streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001); +ALTER TABLE test_tab ADD COLUMN e INT; +SAVEPOINT s1; +INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002); +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e) FROM test_tab"); +is($result, qq(2002|1999|1002|1), 'check data was copied to subscriber in streaming mode and extra columns contain local defaults'); + +# A large (streamed) transaction with DDL and DML. One of the DDL is performed +# after DML to ensure that we invalidate the schema sent for test_tab so that +# the next transaction has to send the schema again. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i); +ALTER TABLE test_tab ADD COLUMN f INT; +COMMIT; +}); + +# A small transaction that won't get streamed. This is just to ensure that we +# send the schema again to reflect the last column added in the previous test. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i); +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab"); +is($result, qq(5005|5002|4005|3004|5), 'check data was copied to subscriber for both streaming and non-streaming transactions'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl new file mode 100644 index 00000000000..ddf0621558a --- /dev/null +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -0,0 +1,117 @@ +# Test streaming of large transaction containing multiple subtransactions and rollbacks +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2|0), 'check initial data was copied to subscriber'); + +# large (streamed) transaction with DDL, DML and ROLLBACKs +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i); +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i); +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i); +ROLLBACK TO s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i); +ROLLBACK TO s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i); +SAVEPOINT s4; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i); +SAVEPOINT s5; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i); +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2000|0), 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'); + +# large (streamed) transaction with subscriber receiving out of order +# subtransaction ROLLBACKs +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i); +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i); +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i); +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i); +RELEASE s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i); +ROLLBACK TO s1; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2500|0), 'check rollback to savepoint was reflected on subscriber'); + +# large (streamed) transaction with subscriber receiving rollback +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i); +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i); +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i); +ROLLBACK; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2500|0), 'check rollback was reflected on subscriber'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl new file mode 100644 index 00000000000..33e42edfef2 --- /dev/null +++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl @@ -0,0 +1,76 @@ +# Test streaming of large transaction with subtransactions, DDLs, DMLs, and +# rollbacks +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2|0), 'check initial data was copied to subscriber'); + +# large (streamed) transaction with DDL, DML and ROLLBACKs +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); +ALTER TABLE test_tab ADD COLUMN c INT; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i); +ALTER TABLE test_tab ADD COLUMN d INT; +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i); +ALTER TABLE test_tab ADD COLUMN e INT; +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i); +ALTER TABLE test_tab DROP COLUMN c; +ROLLBACK TO s1; +INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i); +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(1000|500), 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; |