diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 305 |
1 files changed, 305 insertions, 0 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 71b2e3f1340..b1b7fe11ac0 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -412,6 +412,8 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo * Relation rel, Constraint *fkconstraint, Oid parentConstr, bool recurse, bool recursing, LOCKMODE lockmode); +static void CloneFkReferencing(Relation pg_constraint, Relation parentRel, + Relation partRel, List *clone, List **cloned); static void ATExecDropConstraint(Relation rel, const char *constrName, DropBehavior behavior, bool recurse, bool recursing, @@ -7785,6 +7787,309 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, } /* + * CloneForeignKeyConstraints + * Clone foreign keys from a partitioned table to a newly acquired + * partition. + * + * relationId is a partition of parentId, so we can be certain that it has the + * same columns with the same datatypes. The columns may be in different + * order, though. + * + * The *cloned list is appended ClonedConstraint elements describing what was + * created, for the purposes of validating the constraint in ALTER TABLE's + * Phase 3. + */ +void +CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned) +{ + Relation pg_constraint; + Relation parentRel; + Relation rel; + ScanKeyData key; + SysScanDesc scan; + HeapTuple tuple; + List *clone = NIL; + + parentRel = heap_open(parentId, NoLock); /* already got lock */ + /* see ATAddForeignKeyConstraint about lock level */ + rel = heap_open(relationId, AccessExclusiveLock); + pg_constraint = heap_open(ConstraintRelationId, RowShareLock); + + /* Obtain the list of constraints to clone or attach */ + ScanKeyInit(&key, + Anum_pg_constraint_conrelid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(parentId)); + scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true, + NULL, 1, &key); + while ((tuple = systable_getnext(scan)) != NULL) + { + Oid oid = HeapTupleGetOid(tuple); + + clone = lappend_oid(clone, oid); + } + systable_endscan(scan); + + /* Do the actual work, recursing to partitions as needed */ + CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned); + + /* We're done. Clean up */ + heap_close(parentRel, NoLock); + heap_close(rel, NoLock); /* keep lock till commit */ + heap_close(pg_constraint, RowShareLock); +} + +/* + * CloneFkReferencing + * Recursive subroutine for CloneForeignKeyConstraints, referencing side + * + * Clone the given list of FK constraints when a partition is attached on the + * referencing side of those constraints. + * + * When cloning foreign keys to a partition, it may happen that equivalent + * constraints already exist in the partition for some of them. We can skip + * creating a clone in that case, and instead just attach the existing + * constraint to the one in the parent. + * + * This function recurses to partitions, if the new partition is partitioned; + * of course, only do this for FKs that were actually cloned. + */ +static void +CloneFkReferencing(Relation pg_constraint, Relation parentRel, + Relation partRel, List *clone, List **cloned) +{ + AttrNumber *attmap; + List *partFKs; + List *subclone = NIL; + ListCell *cell; + + /* + * The constraint key may differ, if the columns in the partition are + * different. This map is used to convert them. + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + + partFKs = copyObject(RelationGetFKeyList(partRel)); + + foreach(cell, clone) + { + Oid parentConstrOid = lfirst_oid(cell); + Form_pg_constraint constrForm; + HeapTuple tuple; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_conkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + Constraint *fkconstraint; + bool attach_it; + Oid constrOid; + ObjectAddress parentAddr, + childAddr; + ListCell *cell; + int i; + + tuple = SearchSysCache1(CONSTROID, parentConstrOid); + if (!tuple) + elog(ERROR, "cache lookup failed for constraint %u", + parentConstrOid); + constrForm = (Form_pg_constraint) GETSTRUCT(tuple); + + /* only foreign keys */ + if (constrForm->contype != CONSTRAINT_FOREIGN) + { + ReleaseSysCache(tuple); + continue; + } + + ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid); + + DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey, + conpfeqop, conppeqop, conffeqop); + for (i = 0; i < numfks; i++) + mapped_conkey[i] = attmap[conkey[i] - 1]; + + /* + * Before creating a new constraint, see whether any existing FKs are + * fit for the purpose. If one is, attach the parent constraint to it, + * and don't clone anything. This way we avoid the expensive + * verification step and don't end up with a duplicate FK. This also + * means we don't consider this constraint when recursing to + * partitions. + */ + attach_it = false; + foreach(cell, partFKs) + { + ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); + Form_pg_constraint partConstr; + HeapTuple partcontup; + + attach_it = true; + + /* + * Do some quick & easy initial checks. If any of these fail, we + * cannot use this constraint, but keep looking. + */ + if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks) + { + attach_it = false; + continue; + } + for (i = 0; i < numfks; i++) + { + if (fk->conkey[i] != mapped_conkey[i] || + fk->confkey[i] != confkey[i] || + fk->conpfeqop[i] != conpfeqop[i]) + { + attach_it = false; + break; + } + } + if (!attach_it) + continue; + + /* + * Looks good so far; do some more extensive checks. Presumably + * the check for 'convalidated' could be dropped, since we don't + * really care about that, but let's be careful for now. + */ + partcontup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(fk->conoid)); + if (!partcontup) + elog(ERROR, "cache lookup failed for constraint %u", + fk->conoid); + partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); + if (OidIsValid(partConstr->conparentid) || + !partConstr->convalidated || + partConstr->condeferrable != constrForm->condeferrable || + partConstr->condeferred != constrForm->condeferred || + partConstr->confupdtype != constrForm->confupdtype || + partConstr->confdeltype != constrForm->confdeltype || + partConstr->confmatchtype != constrForm->confmatchtype) + { + ReleaseSysCache(partcontup); + attach_it = false; + continue; + } + + ReleaseSysCache(partcontup); + + /* looks good! Attach this constraint */ + ConstraintSetParentConstraint(fk->conoid, parentConstrOid); + CommandCounterIncrement(); + attach_it = true; + break; + } + + /* + * If we attached to an existing constraint, there is no need to + * create a new one. In fact, there's no need to recurse for this + * constraint to partitions, either. + */ + if (attach_it) + { + ReleaseSysCache(tuple); + continue; + } + + constrOid = + CreateConstraintEntry(NameStr(constrForm->conname), + constrForm->connamespace, + CONSTRAINT_FOREIGN, + constrForm->condeferrable, + constrForm->condeferred, + constrForm->convalidated, + parentConstrOid, + RelationGetRelid(partRel), + mapped_conkey, + numfks, + numfks, + InvalidOid, /* not a domain constraint */ + constrForm->conindid, /* same index */ + constrForm->confrelid, /* same foreign rel */ + confkey, + conpfeqop, + conppeqop, + conffeqop, + numfks, + constrForm->confupdtype, + constrForm->confdeltype, + constrForm->confmatchtype, + NULL, + NULL, + NULL, + NULL, + false, + 1, false, true); + subclone = lappend_oid(subclone, constrOid); + + ObjectAddressSet(childAddr, ConstraintRelationId, constrOid); + recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO); + + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = pstrdup(NameStr(constrForm->conname)); + fkconstraint->fk_upd_action = constrForm->confupdtype; + fkconstraint->fk_del_action = constrForm->confdeltype; + fkconstraint->deferrable = constrForm->condeferrable; + fkconstraint->initdeferred = constrForm->condeferred; + + createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint, + constrOid, constrForm->conindid, false); + + if (cloned) + { + ClonedConstraint *newc; + + /* + * Feed back caller about the constraints we created, so that they + * can set up constraint verification. + */ + newc = palloc(sizeof(ClonedConstraint)); + newc->relid = RelationGetRelid(partRel); + newc->refrelid = constrForm->confrelid; + newc->conindid = constrForm->conindid; + newc->conid = constrOid; + newc->constraint = fkconstraint; + + *cloned = lappend(*cloned, newc); + } + + ReleaseSysCache(tuple); + } + + pfree(attmap); + list_free_deep(partFKs); + + /* + * If the partition is partitioned, recurse to handle any constraints that + * were cloned. + */ + if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + subclone != NIL) + { + PartitionDesc partdesc = RelationGetPartitionDesc(partRel); + int i; + + for (i = 0; i < partdesc->nparts; i++) + { + Relation childRel; + + childRel = heap_open(partdesc->oids[i], AccessExclusiveLock); + CloneFkReferencing(pg_constraint, + partRel, + childRel, + subclone, + cloned); + heap_close(childRel, NoLock); /* keep lock till commit */ + } + } +} + +/* * ALTER TABLE ALTER CONSTRAINT * * Update the attributes of a constraint. |