aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/worker.c1
-rw-r--r--src/backend/replication/walsender.c3
-rw-r--r--src/test/subscription/t/100_bugs.pl55
3 files changed, 56 insertions, 3 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4f32dc74c86..640409b757f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3071,7 +3071,6 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(wrconn, &startpointTLI);
-
}
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7c9d1b67dfb..df27e847617 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1656,7 +1656,8 @@ exec_replication_command(const char *cmd_string)
else
StartLogicalReplication(cmd);
- /* callees already sent their own completion message */
+ /* dupe, but necessary per libpqrcv_endstreaming */
+ EndReplicationCommand(cmdtag);
Assert(xlogreader != NULL);
break;
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 366a7a94350..7dc8983d6ae 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -3,7 +3,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 3;
+use Test::More tests => 5;
# Bug #15114
@@ -100,3 +100,56 @@ is( $node_publisher->psql(
);
$node_publisher->stop('fast');
+
+# Bug #16643 - https://postgr.es/m/16643-eaadeb2a1a58d28c@postgresql.org
+#
+# Initial sync doesn't complete; the protocol was not being followed per
+# expectations after commit 07082b08cc5d.
+my $node_twoways = get_new_node('twoways');
+$node_twoways->init(allows_streaming => 'logical');
+$node_twoways->start;
+for my $db (qw(d1 d2))
+{
+ $node_twoways->safe_psql('postgres', "CREATE DATABASE $db");
+ $node_twoways->safe_psql($db, "CREATE TABLE t (f int)");
+ $node_twoways->safe_psql($db, "CREATE TABLE t2 (f int)");
+}
+
+my $rows = 3000;
+$node_twoways->safe_psql(
+ 'd1', qq{
+ INSERT INTO t SELECT * FROM generate_series(1, $rows);
+ INSERT INTO t2 SELECT * FROM generate_series(1, $rows);
+ CREATE PUBLICATION testpub FOR TABLE t;
+ SELECT pg_create_logical_replication_slot('testslot', 'pgoutput');
+ });
+
+$node_twoways->safe_psql('d2',
+ "CREATE SUBSCRIPTION testsub CONNECTION \$\$"
+ . $node_twoways->connstr('d1')
+ . "\$\$ PUBLICATION testpub WITH (create_slot=false, "
+ . "slot_name='testslot')");
+$node_twoways->safe_psql(
+ 'd1', qq{
+ INSERT INTO t SELECT * FROM generate_series(1, $rows);
+ INSERT INTO t2 SELECT * FROM generate_series(1, $rows);
+ });
+$node_twoways->safe_psql(
+ 'd1', 'ALTER PUBLICATION testpub ADD TABLE t2');
+$node_twoways->safe_psql(
+ 'd2', 'ALTER SUBSCRIPTION testsub REFRESH PUBLICATION');
+
+# We cannot rely solely on wait_for_catchup() here; it isn't sufficient
+# when tablesync workers might still be running. So in addition to that,
+# we verify that no tablesync workers appear for the subscription.
+# XXX maybe this should be integrated in wait_for_catchup() itself.
+$node_twoways->wait_for_catchup('testsub');
+$node_twoways->poll_query_until(
+ 'd2',
+ "SELECT count(*) FROM pg_stat_subscription WHERE subname = 'testsub' AND relid <> 0",
+ "0");
+
+is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
+ $rows * 2, "2x$rows rows in t");
+is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
+ $rows * 2, "2x$rows rows in t2");