aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/relation.c115
-rw-r--r--src/backend/replication/logical/worker.c27
-rw-r--r--src/test/subscription/t/013_partition.pl16
3 files changed, 102 insertions, 56 deletions
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 5c7e9d11ac8..1fc34b18a44 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -250,6 +250,67 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
}
/*
+ * Check if replica identity matches and mark the updatable flag.
+ *
+ * We allow for stricter replica identity (fewer columns) on subscriber as
+ * that will not stop us from finding unique tuple. IE, if publisher has
+ * identity (id,timestamp) and subscriber just (id) this will not be a
+ * problem, but in the opposite scenario it will.
+ *
+ * We just mark the relation entry as not updatable here if the local
+ * replica identity is found to be insufficient for applying
+ * updates/deletes (inserts don't care!) and leave it to
+ * check_relation_updatable() to throw the actual error if needed.
+ */
+static void
+logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
+{
+ Bitmapset *idkey;
+ LogicalRepRelation *remoterel = &entry->remoterel;
+ int i;
+
+ entry->updatable = true;
+
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+ /* fallback to PK if no replica identity */
+ if (idkey == NULL)
+ {
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+ /*
+ * If no replica identity index and no PK, the published table must
+ * have replica identity FULL.
+ */
+ if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+ entry->updatable = false;
+ }
+
+ i = -1;
+ while ((i = bms_next_member(idkey, i)) >= 0)
+ {
+ int attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+ if (!AttrNumberIsForUserDefinedAttr(attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" uses "
+ "system columns in REPLICA IDENTITY index",
+ remoterel->nspname, remoterel->relname)));
+
+ attnum = AttrNumberGetAttrOffset(attnum);
+
+ if (entry->attrmap->attnums[attnum] < 0 ||
+ !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
+ {
+ entry->updatable = false;
+ break;
+ }
+ }
+}
+
+/*
* Open the local relation associated with the remote one.
*
* Rebuilds the Relcache mapping if it was invalidated by local DDL.
@@ -307,7 +368,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
if (!entry->localrelvalid)
{
Oid relid;
- Bitmapset *idkey;
TupleDesc desc;
MemoryContext oldctx;
int i;
@@ -366,54 +426,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
bms_free(missingatts);
/*
- * Check that replica identity matches. We allow for stricter replica
- * identity (fewer columns) on subscriber as that will not stop us
- * from finding unique tuple. IE, if publisher has identity
- * (id,timestamp) and subscriber just (id) this will not be a problem,
- * but in the opposite scenario it will.
- *
- * Don't throw any error here just mark the relation entry as not
- * updatable, as replica identity is only for updates and deletes but
- * inserts can be replicated even without it.
+ * Set if the table's replica identity is enough to apply
+ * update/delete.
*/
- entry->updatable = true;
- idkey = RelationGetIndexAttrBitmap(entry->localrel,
- INDEX_ATTR_BITMAP_IDENTITY_KEY);
- /* fallback to PK if no replica identity */
- if (idkey == NULL)
- {
- idkey = RelationGetIndexAttrBitmap(entry->localrel,
- INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
- /*
- * If no replica identity index and no PK, the published table
- * must have replica identity FULL.
- */
- if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
- entry->updatable = false;
- }
-
- i = -1;
- while ((i = bms_next_member(idkey, i)) >= 0)
- {
- int attnum = i + FirstLowInvalidHeapAttributeNumber;
-
- if (!AttrNumberIsForUserDefinedAttr(attnum))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication target relation \"%s.%s\" uses "
- "system columns in REPLICA IDENTITY index",
- remoterel->nspname, remoterel->relname)));
-
- attnum = AttrNumberGetAttrOffset(attnum);
-
- if (entry->attrmap->attnums[attnum] < 0 ||
- !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
- {
- entry->updatable = false;
- break;
- }
- }
+ logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
}
@@ -651,7 +667,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
attrmap->maplen * sizeof(AttrNumber));
}
- entry->updatable = root->updatable;
+ /* Set if the table's replica identity is enough to apply update/delete. */
+ logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bf97fa44ba2..8c9a4b50383 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1323,6 +1323,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
static void
check_relation_updatable(LogicalRepRelMapEntry *rel)
{
+ /*
+ * For partitioned tables, we only need to care if the target partition is
+ * updatable (aka has PK or RI defined for it).
+ */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ return;
+
/* Updatable, no error. */
if (rel->updatable)
return;
@@ -1676,6 +1683,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot_part;
TupleConversionMap *map;
MemoryContext oldctx;
+ LogicalRepRelMapEntry *part_entry = NULL;
+ AttrMap *attrmap = NULL;
/* ModifyTableState is needed for ExecFindPartition(). */
edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -1707,8 +1716,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
map = partrelinfo->ri_RootToPartitionMap;
if (map != NULL)
- remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+ {
+ attrmap = map->attrMap;
+ remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
remoteslot_part);
+ }
else
{
remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
@@ -1716,6 +1728,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
}
MemoryContextSwitchTo(oldctx);
+ /* Check if we can do the update or delete on the leaf partition. */
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ part_entry = logicalrep_partition_open(relmapentry, partrel,
+ attrmap);
+ check_relation_updatable(part_entry);
+ }
+
switch (operation)
{
case CMD_INSERT:
@@ -1737,15 +1757,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
* suitable partition.
*/
{
- AttrMap *attrmap = map ? map->attrMap : NULL;
- LogicalRepRelMapEntry *part_entry;
TupleTableSlot *localslot;
ResultRelInfo *partrelinfo_new;
bool found;
- part_entry = logicalrep_partition_open(relmapentry, partrel,
- attrmap);
-
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(estate, partrel,
&part_entry->remoterel,
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 568e4d104e0..dfe2cb6deae 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -6,7 +6,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 70;
+use Test::More tests => 71;
# setup
@@ -856,3 +856,17 @@ $node_publisher->wait_for_catchup('sub2');
$result = $node_subscriber2->safe_psql('postgres',
"SELECT a, b, c FROM tab5 ORDER BY 1");
is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
+
+# Test that replication works correctly as long as the leaf partition
+# has the necessary REPLICA IDENTITY, even though the actual target
+# partitioned table does not.
+$node_subscriber2->safe_psql('postgres',
+ "ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b, c FROM tab5_1 ORDER BY 1");
+is($result, qq(4||1), 'updates of tab5 replicated correctly');