aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/pg_publication.c52
-rw-r--r--src/test/regress/expected/publication.out9
-rw-r--r--src/test/regress/sql/publication.sql4
-rw-r--r--src/test/subscription/t/013_partition.pl18
4 files changed, 79 insertions, 4 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index b144a3be7da..36609118d49 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -106,6 +106,45 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
}
/*
+ * Filter out the partitions whose parent tables were also specified in
+ * the publication.
+ */
+static List *
+filter_partitions(List *relids)
+{
+ List *result = NIL;
+ ListCell *lc;
+ ListCell *lc2;
+
+ foreach(lc, relids)
+ {
+ bool skip = false;
+ List *ancestors = NIL;
+ Oid relid = lfirst_oid(lc);
+
+ if (get_rel_relispartition(relid))
+ ancestors = get_partition_ancestors(relid);
+
+ foreach(lc2, ancestors)
+ {
+ Oid ancestor = lfirst_oid(lc2);
+
+ /* Check if the parent table exists in the published table list. */
+ if (list_member_oid(relids, ancestor))
+ {
+ skip = true;
+ break;
+ }
+ }
+
+ if (!skip)
+ result = lappend_oid(result, relid);
+ }
+
+ return result;
+}
+
+/*
* Another variant of this, taking a Relation.
*/
bool
@@ -557,10 +596,23 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
if (publication->alltables)
tables = GetAllTablesPublicationRelations(publication->pubviaroot);
else
+ {
tables = GetPublicationRelations(publication->oid,
publication->pubviaroot ?
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);
+
+ /*
+ * If the publication publishes partition changes via their
+ * respective root partitioned tables, we must exclude partitions
+ * in favor of including the root partitioned tables. Otherwise,
+ * the function could return both the child and parent tables
+ * which could cause data of the child table to be
+ * double-published on the subscriber side.
+ */
+ if (publication->pubviaroot)
+ tables = filter_partitions(tables);
+ }
funcctx->user_fctx = (void *) tables;
MemoryContextSwitchTo(oldcontext);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index f27859373d1..a2aca234ef8 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -163,6 +163,15 @@ HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
-- works again, because update is no longer replicated
UPDATE testpub_parted2 SET a = 2;
+-- publication includes both the parent table and the child table
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted, testpub_parted2;
+-- only parent is listed as being in publication, not the partition
+SELECT * FROM pg_publication_tables;
+ pubname | schemaname | tablename
+-------------------+------------+----------------
+ testpub_forparted | public | testpub_parted
+(1 row)
+
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
-- Test cache invalidation FOR ALL TABLES publication
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index e5745d575b0..4f2445ad117 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -97,6 +97,10 @@ UPDATE testpub_parted2 SET a = 2;
ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
-- works again, because update is no longer replicated
UPDATE testpub_parted2 SET a = 2;
+-- publication includes both the parent table and the child table
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted, testpub_parted2;
+-- only parent is listed as being in publication, not the partition
+SELECT * FROM pg_publication_tables;
DROP TABLE testpub_parted1, testpub_parted2;
DROP PUBLICATION testpub_forparted, testpub_forparted1;
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 3478e4db8fd..e2a58cb080c 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -6,7 +6,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 62;
+use Test::More tests => 63;
# setup
@@ -412,11 +412,16 @@ $node_publisher->safe_psql('postgres',
$node_publisher->safe_psql('postgres',
"ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
# Note: tab3_1's parent is not in the publication, in which case its
-# changes are published using own identity.
+# changes are published using own identity. For tab2, even though both parent
+# and child tables are present but changes will be replicated via the parent's
+# identity and only once.
$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)"
+ "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
);
+# prepare data for the initial sync
+$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)");
+
# subscriber 1
$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
$node_subscriber1->safe_psql('postgres',
@@ -468,12 +473,17 @@ $node_subscriber1->poll_query_until('postgres', $synced_query)
$node_subscriber2->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
+# check that data is synced correctly
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2");
+is( $result, qq(sub1_tab2|1), 'initial data synced for pub_viaroot');
+
# insert
$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)");
$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)");
$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)");
$node_publisher->safe_psql('postgres',
- "INSERT INTO tab2 VALUES (1), (0), (3), (5)");
+ "INSERT INTO tab2 VALUES (0), (3), (5)");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab3 VALUES (1), (0), (3), (5)");