aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c305
1 files changed, 305 insertions, 0 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 71b2e3f1340..b1b7fe11ac0 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -412,6 +412,8 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
Relation rel, Constraint *fkconstraint, Oid parentConstr,
bool recurse, bool recursing,
LOCKMODE lockmode);
+static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
+ Relation partRel, List *clone, List **cloned);
static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
@@ -7785,6 +7787,309 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
}
/*
+ * CloneForeignKeyConstraints
+ * Clone foreign keys from a partitioned table to a newly acquired
+ * partition.
+ *
+ * relationId is a partition of parentId, so we can be certain that it has the
+ * same columns with the same datatypes. The columns may be in different
+ * order, though.
+ *
+ * The *cloned list is appended ClonedConstraint elements describing what was
+ * created, for the purposes of validating the constraint in ALTER TABLE's
+ * Phase 3.
+ */
+void
+CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+{
+ Relation pg_constraint;
+ Relation parentRel;
+ Relation rel;
+ ScanKeyData key;
+ SysScanDesc scan;
+ HeapTuple tuple;
+ List *clone = NIL;
+
+ parentRel = heap_open(parentId, NoLock); /* already got lock */
+ /* see ATAddForeignKeyConstraint about lock level */
+ rel = heap_open(relationId, AccessExclusiveLock);
+ pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
+
+ /* Obtain the list of constraints to clone or attach */
+ ScanKeyInit(&key,
+ Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
+ F_OIDEQ, ObjectIdGetDatum(parentId));
+ scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+ NULL, 1, &key);
+ while ((tuple = systable_getnext(scan)) != NULL)
+ {
+ Oid oid = HeapTupleGetOid(tuple);
+
+ clone = lappend_oid(clone, oid);
+ }
+ systable_endscan(scan);
+
+ /* Do the actual work, recursing to partitions as needed */
+ CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
+
+ /* We're done. Clean up */
+ heap_close(parentRel, NoLock);
+ heap_close(rel, NoLock); /* keep lock till commit */
+ heap_close(pg_constraint, RowShareLock);
+}
+
+/*
+ * CloneFkReferencing
+ * Recursive subroutine for CloneForeignKeyConstraints, referencing side
+ *
+ * Clone the given list of FK constraints when a partition is attached on the
+ * referencing side of those constraints.
+ *
+ * When cloning foreign keys to a partition, it may happen that equivalent
+ * constraints already exist in the partition for some of them. We can skip
+ * creating a clone in that case, and instead just attach the existing
+ * constraint to the one in the parent.
+ *
+ * This function recurses to partitions, if the new partition is partitioned;
+ * of course, only do this for FKs that were actually cloned.
+ */
+static void
+CloneFkReferencing(Relation pg_constraint, Relation parentRel,
+ Relation partRel, List *clone, List **cloned)
+{
+ AttrNumber *attmap;
+ List *partFKs;
+ List *subclone = NIL;
+ ListCell *cell;
+
+ /*
+ * The constraint key may differ, if the columns in the partition are
+ * different. This map is used to convert them.
+ */
+ attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
+ RelationGetDescr(parentRel),
+ gettext_noop("could not convert row type"));
+
+ partFKs = copyObject(RelationGetFKeyList(partRel));
+
+ foreach(cell, clone)
+ {
+ Oid parentConstrOid = lfirst_oid(cell);
+ Form_pg_constraint constrForm;
+ HeapTuple tuple;
+ int numfks;
+ AttrNumber conkey[INDEX_MAX_KEYS];
+ AttrNumber mapped_conkey[INDEX_MAX_KEYS];
+ AttrNumber confkey[INDEX_MAX_KEYS];
+ Oid conpfeqop[INDEX_MAX_KEYS];
+ Oid conppeqop[INDEX_MAX_KEYS];
+ Oid conffeqop[INDEX_MAX_KEYS];
+ Constraint *fkconstraint;
+ bool attach_it;
+ Oid constrOid;
+ ObjectAddress parentAddr,
+ childAddr;
+ ListCell *cell;
+ int i;
+
+ tuple = SearchSysCache1(CONSTROID, parentConstrOid);
+ if (!tuple)
+ elog(ERROR, "cache lookup failed for constraint %u",
+ parentConstrOid);
+ constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
+ /* only foreign keys */
+ if (constrForm->contype != CONSTRAINT_FOREIGN)
+ {
+ ReleaseSysCache(tuple);
+ continue;
+ }
+
+ ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
+
+ DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
+ conpfeqop, conppeqop, conffeqop);
+ for (i = 0; i < numfks; i++)
+ mapped_conkey[i] = attmap[conkey[i] - 1];
+
+ /*
+ * Before creating a new constraint, see whether any existing FKs are
+ * fit for the purpose. If one is, attach the parent constraint to it,
+ * and don't clone anything. This way we avoid the expensive
+ * verification step and don't end up with a duplicate FK. This also
+ * means we don't consider this constraint when recursing to
+ * partitions.
+ */
+ attach_it = false;
+ foreach(cell, partFKs)
+ {
+ ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
+ Form_pg_constraint partConstr;
+ HeapTuple partcontup;
+
+ attach_it = true;
+
+ /*
+ * Do some quick & easy initial checks. If any of these fail, we
+ * cannot use this constraint, but keep looking.
+ */
+ if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
+ {
+ attach_it = false;
+ continue;
+ }
+ for (i = 0; i < numfks; i++)
+ {
+ if (fk->conkey[i] != mapped_conkey[i] ||
+ fk->confkey[i] != confkey[i] ||
+ fk->conpfeqop[i] != conpfeqop[i])
+ {
+ attach_it = false;
+ break;
+ }
+ }
+ if (!attach_it)
+ continue;
+
+ /*
+ * Looks good so far; do some more extensive checks. Presumably
+ * the check for 'convalidated' could be dropped, since we don't
+ * really care about that, but let's be careful for now.
+ */
+ partcontup = SearchSysCache1(CONSTROID,
+ ObjectIdGetDatum(fk->conoid));
+ if (!partcontup)
+ elog(ERROR, "cache lookup failed for constraint %u",
+ fk->conoid);
+ partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+ if (OidIsValid(partConstr->conparentid) ||
+ !partConstr->convalidated ||
+ partConstr->condeferrable != constrForm->condeferrable ||
+ partConstr->condeferred != constrForm->condeferred ||
+ partConstr->confupdtype != constrForm->confupdtype ||
+ partConstr->confdeltype != constrForm->confdeltype ||
+ partConstr->confmatchtype != constrForm->confmatchtype)
+ {
+ ReleaseSysCache(partcontup);
+ attach_it = false;
+ continue;
+ }
+
+ ReleaseSysCache(partcontup);
+
+ /* looks good! Attach this constraint */
+ ConstraintSetParentConstraint(fk->conoid, parentConstrOid);
+ CommandCounterIncrement();
+ attach_it = true;
+ break;
+ }
+
+ /*
+ * If we attached to an existing constraint, there is no need to
+ * create a new one. In fact, there's no need to recurse for this
+ * constraint to partitions, either.
+ */
+ if (attach_it)
+ {
+ ReleaseSysCache(tuple);
+ continue;
+ }
+
+ constrOid =
+ CreateConstraintEntry(NameStr(constrForm->conname),
+ constrForm->connamespace,
+ CONSTRAINT_FOREIGN,
+ constrForm->condeferrable,
+ constrForm->condeferred,
+ constrForm->convalidated,
+ parentConstrOid,
+ RelationGetRelid(partRel),
+ mapped_conkey,
+ numfks,
+ numfks,
+ InvalidOid, /* not a domain constraint */
+ constrForm->conindid, /* same index */
+ constrForm->confrelid, /* same foreign rel */
+ confkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop,
+ numfks,
+ constrForm->confupdtype,
+ constrForm->confdeltype,
+ constrForm->confmatchtype,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ false,
+ 1, false, true);
+ subclone = lappend_oid(subclone, constrOid);
+
+ ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
+ recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
+
+ fkconstraint = makeNode(Constraint);
+ /* for now this is all we need */
+ fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
+ fkconstraint->fk_upd_action = constrForm->confupdtype;
+ fkconstraint->fk_del_action = constrForm->confdeltype;
+ fkconstraint->deferrable = constrForm->condeferrable;
+ fkconstraint->initdeferred = constrForm->condeferred;
+
+ createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
+ constrOid, constrForm->conindid, false);
+
+ if (cloned)
+ {
+ ClonedConstraint *newc;
+
+ /*
+ * Feed back caller about the constraints we created, so that they
+ * can set up constraint verification.
+ */
+ newc = palloc(sizeof(ClonedConstraint));
+ newc->relid = RelationGetRelid(partRel);
+ newc->refrelid = constrForm->confrelid;
+ newc->conindid = constrForm->conindid;
+ newc->conid = constrOid;
+ newc->constraint = fkconstraint;
+
+ *cloned = lappend(*cloned, newc);
+ }
+
+ ReleaseSysCache(tuple);
+ }
+
+ pfree(attmap);
+ list_free_deep(partFKs);
+
+ /*
+ * If the partition is partitioned, recurse to handle any constraints that
+ * were cloned.
+ */
+ if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+ subclone != NIL)
+ {
+ PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
+ int i;
+
+ for (i = 0; i < partdesc->nparts; i++)
+ {
+ Relation childRel;
+
+ childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
+ CloneFkReferencing(pg_constraint,
+ partRel,
+ childRel,
+ subclone,
+ cloned);
+ heap_close(childRel, NoLock); /* keep lock till commit */
+ }
+ }
+}
+
+/*
* ALTER TABLE ALTER CONSTRAINT
*
* Update the attributes of a constraint.