aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/relation.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/relation.c')
-rw-r--r--src/backend/replication/logical/relation.c115
1 files changed, 66 insertions, 49 deletions
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 5c7e9d11ac8..1fc34b18a44 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -250,6 +250,67 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
}
/*
+ * Check if replica identity matches and mark the updatable flag.
+ *
+ * We allow for stricter replica identity (fewer columns) on subscriber as
+ * that will not stop us from finding unique tuple. IE, if publisher has
+ * identity (id,timestamp) and subscriber just (id) this will not be a
+ * problem, but in the opposite scenario it will.
+ *
+ * We just mark the relation entry as not updatable here if the local
+ * replica identity is found to be insufficient for applying
+ * updates/deletes (inserts don't care!) and leave it to
+ * check_relation_updatable() to throw the actual error if needed.
+ */
+static void
+logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
+{
+ Bitmapset *idkey;
+ LogicalRepRelation *remoterel = &entry->remoterel;
+ int i;
+
+ entry->updatable = true;
+
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+ /* fallback to PK if no replica identity */
+ if (idkey == NULL)
+ {
+ idkey = RelationGetIndexAttrBitmap(entry->localrel,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+ /*
+ * If no replica identity index and no PK, the published table must
+ * have replica identity FULL.
+ */
+ if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
+ entry->updatable = false;
+ }
+
+ i = -1;
+ while ((i = bms_next_member(idkey, i)) >= 0)
+ {
+ int attnum = i + FirstLowInvalidHeapAttributeNumber;
+
+ if (!AttrNumberIsForUserDefinedAttr(attnum))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication target relation \"%s.%s\" uses "
+ "system columns in REPLICA IDENTITY index",
+ remoterel->nspname, remoterel->relname)));
+
+ attnum = AttrNumberGetAttrOffset(attnum);
+
+ if (entry->attrmap->attnums[attnum] < 0 ||
+ !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
+ {
+ entry->updatable = false;
+ break;
+ }
+ }
+}
+
+/*
* Open the local relation associated with the remote one.
*
* Rebuilds the Relcache mapping if it was invalidated by local DDL.
@@ -307,7 +368,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
if (!entry->localrelvalid)
{
Oid relid;
- Bitmapset *idkey;
TupleDesc desc;
MemoryContext oldctx;
int i;
@@ -366,54 +426,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
bms_free(missingatts);
/*
- * Check that replica identity matches. We allow for stricter replica
- * identity (fewer columns) on subscriber as that will not stop us
- * from finding unique tuple. IE, if publisher has identity
- * (id,timestamp) and subscriber just (id) this will not be a problem,
- * but in the opposite scenario it will.
- *
- * Don't throw any error here just mark the relation entry as not
- * updatable, as replica identity is only for updates and deletes but
- * inserts can be replicated even without it.
+ * Set if the table's replica identity is enough to apply
+ * update/delete.
*/
- entry->updatable = true;
- idkey = RelationGetIndexAttrBitmap(entry->localrel,
- INDEX_ATTR_BITMAP_IDENTITY_KEY);
- /* fallback to PK if no replica identity */
- if (idkey == NULL)
- {
- idkey = RelationGetIndexAttrBitmap(entry->localrel,
- INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
- /*
- * If no replica identity index and no PK, the published table
- * must have replica identity FULL.
- */
- if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
- entry->updatable = false;
- }
-
- i = -1;
- while ((i = bms_next_member(idkey, i)) >= 0)
- {
- int attnum = i + FirstLowInvalidHeapAttributeNumber;
-
- if (!AttrNumberIsForUserDefinedAttr(attnum))
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("logical replication target relation \"%s.%s\" uses "
- "system columns in REPLICA IDENTITY index",
- remoterel->nspname, remoterel->relname)));
-
- attnum = AttrNumberGetAttrOffset(attnum);
-
- if (entry->attrmap->attnums[attnum] < 0 ||
- !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
- {
- entry->updatable = false;
- break;
- }
- }
+ logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
}
@@ -651,7 +667,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
attrmap->maplen * sizeof(AttrNumber));
}
- entry->updatable = root->updatable;
+ /* Set if the table's replica identity is enough to apply update/delete. */
+ logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;