diff options
-rw-r--r-- | src/backend/commands/tablecmds.c | 271 |
1 files changed, 165 insertions, 106 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c42a740ccef..d2420a9558c 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -403,6 +403,11 @@ static void ATExecAlterChildConstr(Constraint *cmdcon, Relation conrel, Relation static ObjectAddress ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName, bool recurse, bool recursing, LOCKMODE lockmode); +static void QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel, + HeapTuple contuple, LOCKMODE lockmode); +static void QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel, + char *constrName, HeapTuple contuple, + bool recurse, bool recursing, LOCKMODE lockmode); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids, Oid *attcollids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, @@ -12079,136 +12084,190 @@ ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName, if (!con->convalidated) { - AlteredTableInfo *tab; - HeapTuple copyTuple; - Form_pg_constraint copy_con; - if (con->contype == CONSTRAINT_FOREIGN) { - NewConstraint *newcon; - Constraint *fkconstraint; + QueueFKConstraintValidation(wqueue, conrel, rel, tuple, lockmode); + } + else if (con->contype == CONSTRAINT_CHECK) + { + QueueCheckConstraintValidation(wqueue, conrel, rel, constrName, + tuple, recurse, recursing, lockmode); + } - /* Queue validation for phase 3 */ - fkconstraint = makeNode(Constraint); - /* for now this is all we need */ - fkconstraint->conname = constrName; + ObjectAddressSet(address, ConstraintRelationId, con->oid); + } + else + address = InvalidObjectAddress; /* already validated */ - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = constrName; - newcon->contype = CONSTR_FOREIGN; - newcon->refrelid = con->confrelid; - newcon->refindid = con->conindid; - newcon->conid = con->oid; - newcon->qual = (Node *) fkconstraint; + systable_endscan(scan); - /* Find or create work queue entry for this table */ - tab = ATGetQueueEntry(wqueue, rel); - tab->constraints = lappend(tab->constraints, newcon); + table_close(conrel, RowExclusiveLock); - /* - * We disallow creating invalid foreign keys to or from - * partitioned tables, so ignoring the recursion bit is okay. - */ - } - else if (con->contype == CONSTRAINT_CHECK) - { - List *children = NIL; - ListCell *child; - NewConstraint *newcon; - Datum val; - char *conbin; + return address; +} - /* - * If we're recursing, the parent has already done this, so skip - * it. Also, if the constraint is a NO INHERIT constraint, we - * shouldn't try to look for it in the children. - */ - if (!recursing && !con->connoinherit) - children = find_all_inheritors(RelationGetRelid(rel), - lockmode, NULL); +/* + * QueueFKConstraintValidation + * + * Add an entry to the wqueue to validate the given foreign key constraint in + * Phase 3 and update the convalidated field in the pg_constraint catalog + * for the specified relation. + */ +static void +QueueFKConstraintValidation(List **wqueue, Relation conrel, Relation rel, + HeapTuple contuple, LOCKMODE lockmode) +{ + Form_pg_constraint con; + AlteredTableInfo *tab; + HeapTuple copyTuple; + Form_pg_constraint copy_con; - /* - * For CHECK constraints, we must ensure that we only mark the - * constraint as validated on the parent if it's already validated - * on the children. - * - * We recurse before validating on the parent, to reduce risk of - * deadlocks. - */ - foreach(child, children) - { - Oid childoid = lfirst_oid(child); - Relation childrel; + con = (Form_pg_constraint) GETSTRUCT(contuple); + Assert(con->contype == CONSTRAINT_FOREIGN); - if (childoid == RelationGetRelid(rel)) - continue; + if (rel->rd_rel->relkind == RELKIND_RELATION) + { + NewConstraint *newcon; + Constraint *fkconstraint; - /* - * If we are told not to recurse, there had better not be any - * child tables, because we can't mark the constraint on the - * parent valid unless it is valid for all child tables. - */ - if (!recurse) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("constraint must be validated on child tables too"))); + /* Queue validation for phase 3 */ + fkconstraint = makeNode(Constraint); + /* for now this is all we need */ + fkconstraint->conname = pstrdup(NameStr(con->conname)); - /* find_all_inheritors already got lock */ - childrel = table_open(childoid, NoLock); + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = fkconstraint->conname; + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = con->confrelid; + newcon->refindid = con->conindid; + newcon->conid = con->oid; + newcon->qual = (Node *) fkconstraint; - ATExecValidateConstraint(wqueue, childrel, constrName, false, - true, lockmode); - table_close(childrel, NoLock); - } + /* Find or create work queue entry for this table */ + tab = ATGetQueueEntry(wqueue, rel); + tab->constraints = lappend(tab->constraints, newcon); + } - /* Queue validation for phase 3 */ - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = constrName; - newcon->contype = CONSTR_CHECK; - newcon->refrelid = InvalidOid; - newcon->refindid = InvalidOid; - newcon->conid = con->oid; - - val = SysCacheGetAttrNotNull(CONSTROID, tuple, - Anum_pg_constraint_conbin); - conbin = TextDatumGetCString(val); - newcon->qual = (Node *) stringToNode(conbin); - - /* Find or create work queue entry for this table */ - tab = ATGetQueueEntry(wqueue, rel); - tab->constraints = lappend(tab->constraints, newcon); + /* + * We disallow creating invalid foreign keys to or from partitioned + * tables, so ignoring the recursion bit is okay. + */ - /* - * Invalidate relcache so that others see the new validated - * constraint. - */ - CacheInvalidateRelcache(rel); - } + /* + * Now update the catalog, while we have the door open. + */ + copyTuple = heap_copytuple(contuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); + copy_con->convalidated = true; + CatalogTupleUpdate(conrel, ©Tuple->t_self, copyTuple); + + InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0); + + heap_freetuple(copyTuple); +} + +/* + * QueueCheckConstraintValidation + * + * Add an entry to the wqueue to validate the given check constraint in Phase 3 + * and update the convalidated field in the pg_constraint catalog for the + * specified relation and all its inheriting children. + */ +static void +QueueCheckConstraintValidation(List **wqueue, Relation conrel, Relation rel, + char *constrName, HeapTuple contuple, + bool recurse, bool recursing, LOCKMODE lockmode) +{ + Form_pg_constraint con; + AlteredTableInfo *tab; + HeapTuple copyTuple; + Form_pg_constraint copy_con; + + List *children = NIL; + ListCell *child; + NewConstraint *newcon; + Datum val; + char *conbin; + + con = (Form_pg_constraint) GETSTRUCT(contuple); + Assert(con->contype == CONSTRAINT_CHECK); + + /* + * If we're recursing, the parent has already done this, so skip it. Also, + * if the constraint is a NO INHERIT constraint, we shouldn't try to look + * for it in the children. + */ + if (!recursing && !con->connoinherit) + children = find_all_inheritors(RelationGetRelid(rel), + lockmode, NULL); + + /* + * For CHECK constraints, we must ensure that we only mark the constraint + * as validated on the parent if it's already validated on the children. + * + * We recurse before validating on the parent, to reduce risk of + * deadlocks. + */ + foreach(child, children) + { + Oid childoid = lfirst_oid(child); + Relation childrel; + + if (childoid == RelationGetRelid(rel)) + continue; /* - * Now update the catalog, while we have the door open. + * If we are told not to recurse, there had better not be any child + * tables, because we can't mark the constraint on the parent valid + * unless it is valid for all child tables. */ - copyTuple = heap_copytuple(tuple); - copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); - copy_con->convalidated = true; - CatalogTupleUpdate(conrel, ©Tuple->t_self, copyTuple); - - InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0); + if (!recurse) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be validated on child tables too"))); - heap_freetuple(copyTuple); + /* find_all_inheritors already got lock */ + childrel = table_open(childoid, NoLock); - ObjectAddressSet(address, ConstraintRelationId, con->oid); + ATExecValidateConstraint(wqueue, childrel, constrName, false, + true, lockmode); + table_close(childrel, NoLock); } - else - address = InvalidObjectAddress; /* already validated */ - systable_endscan(scan); + /* Queue validation for phase 3 */ + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = constrName; + newcon->contype = CONSTR_CHECK; + newcon->refrelid = InvalidOid; + newcon->refindid = InvalidOid; + newcon->conid = con->oid; - table_close(conrel, RowExclusiveLock); + val = SysCacheGetAttrNotNull(CONSTROID, contuple, + Anum_pg_constraint_conbin); + conbin = TextDatumGetCString(val); + newcon->qual = (Node *) stringToNode(conbin); - return address; -} + /* Find or create work queue entry for this table */ + tab = ATGetQueueEntry(wqueue, rel); + tab->constraints = lappend(tab->constraints, newcon); + /* + * Invalidate relcache so that others see the new validated constraint. + */ + CacheInvalidateRelcache(rel); + + /* + * Now update the catalog, while we have the door open. + */ + copyTuple = heap_copytuple(contuple); + copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); + copy_con->convalidated = true; + CatalogTupleUpdate(conrel, ©Tuple->t_self, copyTuple); + + InvokeObjectPostAlterHook(ConstraintRelationId, con->oid, 0); + + heap_freetuple(copyTuple); +} /* * transformColumnNameList - transform list of column names |