aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c16
-rw-r--r--src/test/subscription/t/024_add_drop_pub.pl44
2 files changed, 57 insertions, 3 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ed806c54300..8357bf8b4c0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1764,6 +1764,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx)
/*
* Load publications from the list of publication names.
+ *
+ * Here, we skip the publications that don't exist yet. This will allow us
+ * to silently continue the replication in the absence of a missing publication.
+ * This is required because we allow the users to create publications after they
+ * have specified the required publications at the time of replication start.
*/
static List *
LoadPublications(List *pubnames)
@@ -1774,9 +1779,16 @@ LoadPublications(List *pubnames)
foreach(lc, pubnames)
{
char *pubname = (char *) lfirst(lc);
- Publication *pub = GetPublicationByName(pubname, false);
+ Publication *pub = GetPublicationByName(pubname, true);
- result = lappend(result, pub);
+ if (pub)
+ result = lappend(result, pub);
+ else
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipped loading publication: %s", pubname),
+ errdetail("The publication does not exist at this point in the WAL."),
+ errhint("Create the publication if it does not exist."));
}
return result;
diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl
index 4428a3413db..b594941c7cb 100644
--- a/src/test/subscription/t/024_add_drop_pub.pl
+++ b/src/test/subscription/t/024_add_drop_pub.pl
@@ -1,7 +1,9 @@
# Copyright (c) 2021-2025, PostgreSQL Global Development Group
-# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
+# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and
+# ensures that creating a publication associated with a subscription at a later
+# point of time does not break logical replication.
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
@@ -80,6 +82,46 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_1");
is($result, qq(20|1|10), 'check initial data is copied to subscriber');
+# Ensure that setting a missing publication to the subscription does not
+# disrupt existing logical replication. Instead, it should log a warning
+# while allowing replication to continue. Additionally, verify that replication
+# resumes after the missing publication is created for the publication table.
+
+# Create table on publisher and subscriber
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)");
+
+# Set the subscription with a missing publication
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_3");
+
+my $offset = -s $node_publisher->logfile;
+
+$node_publisher->safe_psql('postgres',"INSERT INTO tab_3 values(1)");
+
+# Verify that a warning is logged.
+$node_publisher->wait_for_log(
+ qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, $offset);
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3");
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Verify that the insert operation gets replicated to subscriber after
+# publication is created.
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab_3");
+is($result, qq(1
+2), 'check that the incremental data is replicated after the publication is created');
+
# shutdown
$node_subscriber->stop('fast');
$node_publisher->stop('fast');