diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2008-05-09 23:32:05 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2008-05-09 23:32:05 +0000 |
commit | cd902b331dc4b0c170e800441a98f9213d98b46b (patch) | |
tree | bef3eacf7ff474dd0fb96b368e80137f73658d52 /src/backend/commands/tablecmds.c | |
parent | f8df836ae396be28a6c9e4f79a6adf3e5c0187b5 (diff) | |
download | postgresql-cd902b331dc4b0c170e800441a98f9213d98b46b.tar.gz postgresql-cd902b331dc4b0c170e800441a98f9213d98b46b.zip |
Change the rules for inherited CHECK constraints to be essentially the same
as those for inherited columns; that is, it's no longer allowed for a child
table to not have a check constraint matching one that exists on a parent.
This satisfies the principle of least surprise (rows selected from the parent
will always appear to meet its check constraints) and eliminates some
longstanding bogosity in pg_dump, which formerly had to guess about whether
check constraints were really inherited or not.
The implementation involves adding conislocal and coninhcount columns to
pg_constraint (paralleling attislocal and attinhcount in pg_attribute)
and refactoring various ALTER TABLE actions to be more like those for
columns.
Alex Hunsaker, Nikhil Sontakke, Tom Lane
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 847 |
1 files changed, 587 insertions, 260 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 9d6939bbc11..1aaa0bd86fe 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.251 2008/04/24 20:17:50 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.252 2008/05/09 23:32:04 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -169,11 +169,10 @@ typedef struct NewColumnValue static void truncate_check_rel(Relation rel); static List *MergeAttributes(List *schema, List *supers, bool istemp, List **supOids, List **supconstr, int *supOidCount); -static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); -static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); -static void add_nonduplicate_constraint(Constraint *cdef, - ConstrCheck *check, int *ncheck); +static bool MergeCheckConstraint(List *constraints, char *name, Node *expr); static bool change_varattnos_walker(Node *node, const AttrNumber *newattno); +static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel); +static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel); static void StoreCatalogInheritance(Oid relationId, List *supers); static void StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation inhRelation); @@ -201,7 +200,8 @@ static void ATController(Relation rel, List *cmds, bool recurse); static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, bool recurse, bool recursing); static void ATRewriteCatalogs(List **wqueue); -static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd); +static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, + AlterTableCmd *cmd); static void ATRewriteTables(List **wqueue); static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap); static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel); @@ -232,14 +232,18 @@ static void ATExecDropColumn(Relation rel, const char *colName, bool recurse, bool recursing); static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel, IndexStmt *stmt, bool is_rebuild); -static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, - Node *newConstraint); +static void ATExecAddConstraint(List **wqueue, + AlteredTableInfo *tab, Relation rel, + Node *newConstraint, bool recurse); +static void ATAddCheckConstraint(List **wqueue, + AlteredTableInfo *tab, Relation rel, + Constraint *constr, + bool recurse, bool recursing); static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, FkConstraint *fkconstraint); -static void ATPrepDropConstraint(List **wqueue, Relation rel, - bool recurse, AlterTableCmd *cmd); static void ATExecDropConstraint(Relation rel, const char *constrName, - DropBehavior behavior, bool quiet); + DropBehavior behavior, + bool recurse, bool recursing); static void ATPrepAlterColumnType(List **wqueue, AlteredTableInfo *tab, Relation rel, bool recurse, bool recursing, @@ -287,6 +291,7 @@ DefineRelation(CreateStmt *stmt, char relkind) bool localHasOids; int parentOidCount; List *rawDefaults; + List *cookedDefaults; Datum reloptions; ListCell *listptr; AttrNumber attnum; @@ -370,67 +375,78 @@ DefineRelation(CreateStmt *stmt, char relkind) &inheritOids, &old_constraints, &parentOidCount); /* - * Create a relation descriptor from the relation schema and create the - * relation. Note that in this stage only inherited (pre-cooked) defaults - * and constraints will be included into the new relation. - * (BuildDescForRelation takes care of the inherited defaults, but we have - * to copy inherited constraints here.) + * Create a tuple descriptor from the relation schema. Note that this + * deals with column names, types, and NOT NULL constraints, but not + * default values or CHECK constraints; we handle those below. */ descriptor = BuildDescForRelation(schema); localHasOids = interpretOidsOption(stmt->options); descriptor->tdhasoid = (localHasOids || parentOidCount > 0); - if (old_constraints || stmt->constraints) + /* + * Find columns with default values and prepare for insertion of the + * defaults. Pre-cooked (that is, inherited) defaults go into a list of + * CookedConstraint structs that we'll pass to heap_create_with_catalog, + * while raw defaults go into a list of RawColumnDefault structs that + * will be processed by AddRelationNewConstraints. (We can't deal with + * raw expressions until we can do transformExpr.) + * + * We can set the atthasdef flags now in the tuple descriptor; this just + * saves StoreAttrDefault from having to do an immediate update of the + * pg_attribute rows. + */ + rawDefaults = NIL; + cookedDefaults = NIL; + attnum = 0; + + foreach(listptr, schema) { - ConstrCheck *check; - int ncheck = 0; - - /* make array that's certainly big enough */ - check = (ConstrCheck *) - palloc((list_length(old_constraints) + - list_length(stmt->constraints)) * sizeof(ConstrCheck)); - /* deal with constraints from MergeAttributes */ - foreach(listptr, old_constraints) - { - Constraint *cdef = (Constraint *) lfirst(listptr); + ColumnDef *colDef = lfirst(listptr); - if (cdef->contype == CONSTR_CHECK) - add_nonduplicate_constraint(cdef, check, &ncheck); - } + attnum++; - /* - * parse_utilcmd.c might have passed some precooked constraints too, - * due to LIKE tab INCLUDING CONSTRAINTS - */ - foreach(listptr, stmt->constraints) + if (colDef->raw_default != NULL) { - Constraint *cdef = (Constraint *) lfirst(listptr); + RawColumnDefault *rawEnt; - if (cdef->contype == CONSTR_CHECK && cdef->cooked_expr != NULL) - add_nonduplicate_constraint(cdef, check, &ncheck); + Assert(colDef->cooked_default == NULL); + + rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault)); + rawEnt->attnum = attnum; + rawEnt->raw_default = colDef->raw_default; + rawDefaults = lappend(rawDefaults, rawEnt); + descriptor->attrs[attnum - 1]->atthasdef = true; } - /* if we found any, insert 'em into the descriptor */ - if (ncheck > 0) + else if (colDef->cooked_default != NULL) { - if (descriptor->constr == NULL) - { - descriptor->constr = (TupleConstr *) palloc(sizeof(TupleConstr)); - descriptor->constr->defval = NULL; - descriptor->constr->num_defval = 0; - descriptor->constr->has_not_null = false; - } - descriptor->constr->num_check = ncheck; - descriptor->constr->check = check; + CookedConstraint *cooked; + + cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); + cooked->contype = CONSTR_DEFAULT; + cooked->name = NULL; + cooked->attnum = attnum; + cooked->expr = stringToNode(colDef->cooked_default); + cooked->is_local = true; /* not used for defaults */ + cooked->inhcount = 0; /* ditto */ + cookedDefaults = lappend(cookedDefaults, cooked); + descriptor->attrs[attnum - 1]->atthasdef = true; } } + /* + * Create the relation. Inherited defaults and constraints are passed + * in for immediate handling --- since they don't need parsing, they + * can be stored immediately. + */ relationId = heap_create_with_catalog(relname, namespaceId, tablespaceId, InvalidOid, GetUserId(), descriptor, + list_concat(cookedDefaults, + old_constraints), relkind, false, localHasOids, @@ -463,36 +479,10 @@ DefineRelation(CreateStmt *stmt, char relkind) * apply the parser's transformExpr routine, but transformExpr doesn't * work unless we have a pre-existing relation. So, the transformation has * to be postponed to this final step of CREATE TABLE. - * - * First, scan schema to find new column defaults. - */ - rawDefaults = NIL; - attnum = 0; - - foreach(listptr, schema) - { - ColumnDef *colDef = lfirst(listptr); - - attnum++; - - if (colDef->raw_default != NULL) - { - RawColumnDefault *rawEnt; - - Assert(colDef->cooked_default == NULL); - - rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault)); - rawEnt->attnum = attnum; - rawEnt->raw_default = colDef->raw_default; - rawDefaults = lappend(rawDefaults, rawEnt); - } - } - - /* - * Parse and add the defaults/constraints, if any. */ if (rawDefaults || stmt->constraints) - AddRelationRawConstraints(rel, rawDefaults, stmt->constraints); + AddRelationNewConstraints(rel, rawDefaults, stmt->constraints, + true, true); /* * Clean up. We keep lock on new relation (although it shouldn't be @@ -1046,8 +1036,9 @@ MergeAttributes(List *schema, List *supers, bool istemp, } /* - * Now copy the constraints of this parent, adjusting attnos using the - * completed newattno[] map + * Now copy the CHECK constraints of this parent, adjusting attnos + * using the completed newattno[] map. Identically named constraints + * are merged if possible, else we throw error. */ if (constr && constr->num_check > 0) { @@ -1056,17 +1047,28 @@ MergeAttributes(List *schema, List *supers, bool istemp, for (i = 0; i < constr->num_check; i++) { - Constraint *cdef = makeNode(Constraint); + char *name = check[i].ccname; Node *expr; - cdef->contype = CONSTR_CHECK; - cdef->name = pstrdup(check[i].ccname); - cdef->raw_expr = NULL; /* adjust varattnos of ccbin here */ expr = stringToNode(check[i].ccbin); change_varattnos_of_a_node(expr, newattno); - cdef->cooked_expr = nodeToString(expr); - constraints = lappend(constraints, cdef); + + /* check for duplicate */ + if (!MergeCheckConstraint(constraints, name, expr)) + { + /* nope, this is a new one */ + CookedConstraint *cooked; + + cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); + cooked->contype = CONSTR_CHECK; + cooked->name = pstrdup(name); + cooked->attnum = 0; /* not used for constraints */ + cooked->expr = expr; + cooked->is_local = false; + cooked->inhcount = 1; + constraints = lappend(constraints, cooked); + } } } @@ -1183,38 +1185,46 @@ MergeAttributes(List *schema, List *supers, bool istemp, /* - * In multiple-inheritance situations, it's possible to inherit - * the same grandparent constraint through multiple parents. - * Hence, we want to discard inherited constraints that match as to - * both name and expression. Otherwise, gripe if there are conflicting - * names. Nonconflicting constraints are added to the array check[] - * of length *ncheck ... caller must ensure there is room! + * MergeCheckConstraint + * Try to merge an inherited CHECK constraint with previous ones + * + * If we inherit identically-named constraints from multiple parents, we must + * merge them, or throw an error if they don't have identical definitions. + * + * constraints is a list of CookedConstraint structs for previous constraints. + * + * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's + * got a so-far-unique name, or throws error if conflict. */ -static void -add_nonduplicate_constraint(Constraint *cdef, ConstrCheck *check, int *ncheck) +static bool +MergeCheckConstraint(List *constraints, char *name, Node *expr) { - int i; - - /* Should only see precooked constraints here */ - Assert(cdef->contype == CONSTR_CHECK); - Assert(cdef->name != NULL); - Assert(cdef->raw_expr == NULL && cdef->cooked_expr != NULL); + ListCell *lc; - for (i = 0; i < *ncheck; i++) + foreach(lc, constraints) { - if (strcmp(check[i].ccname, cdef->name) != 0) + CookedConstraint *ccon = (CookedConstraint *) lfirst(lc); + + Assert(ccon->contype == CONSTR_CHECK); + + /* Non-matching names never conflict */ + if (strcmp(ccon->name, name) != 0) continue; - if (strcmp(check[i].ccbin, cdef->cooked_expr) == 0) - return; /* duplicate constraint, so ignore it */ + + if (equal(expr, ccon->expr)) + { + /* OK to merge */ + ccon->inhcount++; + return true; + } + ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("check constraint name \"%s\" appears multiple times but with different expressions", - cdef->name))); + name))); } - /* No match on name, so add it to array */ - check[*ncheck].ccname = cdef->name; - check[*ncheck].ccbin = pstrdup(cdef->cooked_expr); - (*ncheck)++; + + return false; } @@ -1252,7 +1262,7 @@ change_varattnos_walker(Node *node, const AttrNumber *newattno) * currently. */ Assert(newattno[var->varattno - 1] > 0); - var->varattno = newattno[var->varattno - 1]; + var->varattno = var->varoattno = newattno[var->varattno - 1]; } return false; } @@ -1887,8 +1897,9 @@ CheckTableNotInUse(Relation rel, const char *stmt) * expressions that need to be evaluated with respect to the old table * schema. * - * ATRewriteCatalogs performs phase 2 for each affected table (note that - * phases 2 and 3 do no explicit recursion, since phase 1 already did it). + * ATRewriteCatalogs performs phase 2 for each affected table. (Note that + * phases 2 and 3 normally do no explicit recursion, since phase 1 already + * did it --- although some subcommands have to recurse in phase 2 instead.) * Certain subcommands need to be performed before others to avoid * unnecessary conflicts; for example, DROP COLUMN should come before * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple @@ -2044,27 +2055,18 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_AddConstraint: /* ADD CONSTRAINT */ ATSimplePermissions(rel, false); - - /* - * Currently we recurse only for CHECK constraints, never for - * foreign-key constraints. UNIQUE/PKEY constraints won't be seen - * here. - */ - if (IsA(cmd->def, Constraint)) - ATSimpleRecursion(wqueue, rel, cmd, recurse); - /* No command-specific prep needed */ + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ + if (recurse) + cmd->subtype = AT_AddConstraintRecurse; pass = AT_PASS_ADD_CONSTR; break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATSimplePermissions(rel, false); - /* Performs own recursion */ - ATPrepDropConstraint(wqueue, rel, recurse, cmd); - pass = AT_PASS_DROP; - break; - case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */ - ATSimplePermissions(rel, false); - ATSimpleRecursion(wqueue, rel, cmd, recurse); - /* No command-specific prep needed */ + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ + if (recurse) + cmd->subtype = AT_DropConstraintRecurse; pass = AT_PASS_DROP; break; case AT_AlterColumnType: /* ALTER COLUMN TYPE */ @@ -2181,7 +2183,7 @@ ATRewriteCatalogs(List **wqueue) rel = relation_open(tab->relid, NoLock); foreach(lcmd, subcmds) - ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd)); + ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd)); /* * After the ALTER TYPE pass, do cleanup work (this is not done in @@ -2215,7 +2217,8 @@ ATRewriteCatalogs(List **wqueue) * ATExecCmd: dispatch a subcommand to appropriate execution routine */ static void -ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd) +ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, + AlterTableCmd *cmd) { switch (cmd->subtype) { @@ -2250,13 +2253,16 @@ ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd) ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true); break; case AT_AddConstraint: /* ADD CONSTRAINT */ - ATExecAddConstraint(tab, rel, cmd->def); + ATExecAddConstraint(wqueue, tab, rel, cmd->def, false); + break; + case AT_AddConstraintRecurse: /* ADD CONSTRAINT with recursion */ + ATExecAddConstraint(wqueue, tab, rel, cmd->def, true); break; case AT_DropConstraint: /* DROP CONSTRAINT */ - ATExecDropConstraint(rel, cmd->name, cmd->behavior, false); + ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false); break; - case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */ - ATExecDropConstraint(rel, cmd->name, cmd->behavior, true); + case AT_DropConstraintRecurse: /* DROP CONSTRAINT with recursion */ + ATExecDropConstraint(rel, cmd->name, cmd->behavior, true, false); break; case AT_AlterColumnType: /* ALTER COLUMN TYPE */ ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def); @@ -3272,7 +3278,7 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, * This function is intended for CREATE TABLE, so it processes a * _list_ of defaults, but we just do one. */ - AddRelationRawConstraints(rel, list_make1(rawEnt), NIL); + AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true); /* Make the additional catalog changes visible */ CommandCounterIncrement(); @@ -3297,7 +3303,7 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, * the constraints more directly.) * * Note: we use build_column_default, and not just the cooked default - * returned by AddRelationRawConstraints, so that the right thing happens + * returned by AddRelationNewConstraints, so that the right thing happens * when a datatype's default applies. */ defval = (Expr *) build_column_default(rel, attribute->attnum); @@ -3552,7 +3558,7 @@ ATExecColumnDefault(Relation rel, const char *colName, * This function is intended for CREATE TABLE, so it processes a * _list_ of defaults, but we just do one. */ - AddRelationRawConstraints(rel, list_make1(rawEnt), NIL); + AddRelationNewConstraints(rel, list_make1(rawEnt), NIL, false, true); } } @@ -3829,7 +3835,7 @@ ATExecDropColumn(Relation rel, const char *colName, { /* * If we were told to drop ONLY in this table (no recursion), - * we need to mark the inheritors' attribute as locally + * we need to mark the inheritors' attributes as locally * defined rather than inherited. */ childatt->attinhcount--; @@ -3936,7 +3942,8 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, * ALTER TABLE ADD CONSTRAINT */ static void -ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) +ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, + Node *newConstraint, bool recurse) { switch (nodeTag(newConstraint)) { @@ -3953,34 +3960,9 @@ ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) switch (constr->contype) { case CONSTR_CHECK: - { - List *newcons; - ListCell *lcon; - - /* - * Call AddRelationRawConstraints to do the work. - * It returns a list of cooked constraints. - */ - newcons = AddRelationRawConstraints(rel, NIL, - list_make1(constr)); - /* Add each constraint to Phase 3's queue */ - foreach(lcon, newcons) - { - CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - NewConstraint *newcon; - - newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); - newcon->name = ccon->name; - newcon->contype = ccon->contype; - /* ExecQual wants implicit-AND format */ - newcon->qual = (Node *) - make_ands_implicit((Expr *) ccon->expr); - - tab->constraints = lappend(tab->constraints, - newcon); - } - break; - } + ATAddCheckConstraint(wqueue, tab, rel, + constr, recurse, false); + break; default: elog(ERROR, "unrecognized constraint type: %d", (int) constr->contype); @@ -3992,6 +3974,9 @@ ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) FkConstraint *fkconstraint = (FkConstraint *) newConstraint; /* + * Note that we currently never recurse for FK constraints, + * so the "recurse" flag is silently ignored. + * * Assign or validate constraint name */ if (fkconstraint->constr_name) @@ -4025,6 +4010,107 @@ ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) } /* + * Add a check constraint to a single table and its children + * + * Subroutine for ATExecAddConstraint. + * + * We must recurse to child tables during execution, rather than using + * ALTER TABLE's normal prep-time recursion. The reason is that all the + * constraints *must* be given the same name, else they won't be seen as + * related later. If the user didn't explicitly specify a name, then + * AddRelationNewConstraints would normally assign different names to the + * child constraints. To fix that, we must capture the name assigned at + * the parent table and pass that down. + */ +static void +ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, + Constraint *constr, bool recurse, bool recursing) +{ + List *newcons; + ListCell *lcon; + List *children; + ListCell *child; + + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(rel, false); + + /* + * Call AddRelationNewConstraints to do the work, making sure it works on + * a copy of the Constraint so transformExpr can't modify the original. + * It returns a list of cooked constraints. + * + * If the constraint ends up getting merged with a pre-existing one, it's + * omitted from the returned list, which is what we want: we do not need + * to do any validation work. That can only happen at child tables, + * though, since we disallow merging at the top level. + */ + newcons = AddRelationNewConstraints(rel, NIL, + list_make1(copyObject(constr)), + recursing, !recursing); + + /* Add each constraint to Phase 3's queue */ + foreach(lcon, newcons) + { + CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); + NewConstraint *newcon; + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = ccon->name; + newcon->contype = ccon->contype; + /* ExecQual wants implicit-AND format */ + newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr); + + tab->constraints = lappend(tab->constraints, newcon); + + /* Save the actually assigned name if it was defaulted */ + if (constr->name == NULL) + constr->name = ccon->name; + } + + /* At this point we must have a locked-down name to use */ + Assert(constr->name != NULL); + + /* Advance command counter in case same table is visited multiple times */ + CommandCounterIncrement(); + + /* + * Propagate to children as appropriate. Unlike most other ALTER + * routines, we have to do this one level of recursion at a time; we can't + * use find_all_inheritors to do it in one pass. + */ + children = find_inheritance_children(RelationGetRelid(rel)); + + /* + * If we are told not to recurse, there had better not be any child + * tables; else the addition would put them out of step. + */ + if (children && !recurse) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be added to child tables too"))); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + AlteredTableInfo *childtab; + + childrel = heap_open(childrelid, AccessExclusiveLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); + + /* Find or create work queue entry for this table */ + childtab = ATGetQueueEntry(wqueue, childrel); + + /* Recurse to child */ + ATAddCheckConstraint(wqueue, childtab, childrel, + constr, recurse, true); + + heap_close(childrel, NoLock); + } +} + +/* * Add a foreign-key constraint to a single table * * Subroutine for ATExecAddConstraint. Must already hold exclusive @@ -4295,7 +4381,9 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, indexOid, NULL, /* no check constraint */ NULL, - NULL); + NULL, + true, /* islocal */ + 0); /* inhcount */ /* * Create the triggers that will enforce the constraint. @@ -4813,46 +4901,186 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, /* * ALTER TABLE DROP CONSTRAINT + * + * Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism. */ static void -ATPrepDropConstraint(List **wqueue, Relation rel, - bool recurse, AlterTableCmd *cmd) +ATExecDropConstraint(Relation rel, const char *constrName, + DropBehavior behavior, + bool recurse, bool recursing) { + List *children; + ListCell *child; + Relation conrel; + Form_pg_constraint con; + SysScanDesc scan; + ScanKeyData key; + HeapTuple tuple; + bool found = false; + bool is_check_constraint = false; + + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(rel, false); + + conrel = heap_open(ConstraintRelationId, RowExclusiveLock); + /* - * We don't want errors or noise from child tables, so we have to pass - * down a modified command. + * Find and drop the target constraint */ - if (recurse) + ScanKeyInit(&key, + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(conrel, ConstraintRelidIndexId, + true, SnapshotNow, 1, &key); + + while (HeapTupleIsValid(tuple = systable_getnext(scan))) { - AlterTableCmd *childCmd = copyObject(cmd); + ObjectAddress conobj; + + con = (Form_pg_constraint) GETSTRUCT(tuple); + + if (strcmp(NameStr(con->conname), constrName) != 0) + continue; + + /* Don't drop inherited constraints */ + if (con->coninhcount > 0 && !recursing) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", + constrName, RelationGetRelationName(rel)))); + + /* Right now only CHECK constraints can be inherited */ + if (con->contype == CONSTRAINT_CHECK) + is_check_constraint = true; + + /* + * Perform the actual constraint deletion + */ + conobj.classId = ConstraintRelationId; + conobj.objectId = HeapTupleGetOid(tuple); + conobj.objectSubId = 0; + + performDeletion(&conobj, behavior); - childCmd->subtype = AT_DropConstraintQuietly; - ATSimpleRecursion(wqueue, rel, childCmd, recurse); + found = true; } -} -static void -ATExecDropConstraint(Relation rel, const char *constrName, - DropBehavior behavior, bool quiet) -{ - int deleted; + systable_endscan(scan); - deleted = RemoveRelConstraints(rel, constrName, behavior); + if (!found) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel)))); - if (!quiet) + /* + * Propagate to children as appropriate. Unlike most other ALTER + * routines, we have to do this one level of recursion at a time; we can't + * use find_all_inheritors to do it in one pass. + */ + if (is_check_constraint) + children = find_inheritance_children(RelationGetRelid(rel)); + else + children = NIL; + + foreach(child, children) { - /* If zero constraints deleted, complain */ - if (deleted == 0) + Oid childrelid = lfirst_oid(child); + Relation childrel; + + childrel = heap_open(childrelid, AccessExclusiveLock); + CheckTableNotInUse(childrel, "ALTER TABLE"); + + ScanKeyInit(&key, + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(childrelid)); + scan = systable_beginscan(conrel, ConstraintRelidIndexId, + true, SnapshotNow, 1, &key); + + found = false; + + while (HeapTupleIsValid(tuple = systable_getnext(scan))) + { + HeapTuple copy_tuple; + + con = (Form_pg_constraint) GETSTRUCT(tuple); + + /* Right now only CHECK constraints can be inherited */ + if (con->contype != CONSTRAINT_CHECK) + continue; + + if (strcmp(NameStr(con->conname), constrName) != 0) + continue; + + found = true; + + if (con->coninhcount <= 0) /* shouldn't happen */ + elog(ERROR, "relation %u has non-inherited constraint \"%s\"", + childrelid, constrName); + + copy_tuple = heap_copytuple(tuple); + con = (Form_pg_constraint) GETSTRUCT(copy_tuple); + + if (recurse) + { + /* + * If the child constraint has other definition sources, + * just decrement its inheritance count; if not, recurse + * to delete it. + */ + if (con->coninhcount == 1 && !con->conislocal) + { + /* Time to delete this child constraint, too */ + ATExecDropConstraint(childrel, constrName, behavior, + true, true); + } + else + { + /* Child constraint must survive my deletion */ + con->coninhcount--; + simple_heap_update(conrel, ©_tuple->t_self, copy_tuple); + CatalogUpdateIndexes(conrel, copy_tuple); + + /* Make update visible */ + CommandCounterIncrement(); + } + } + else + { + /* + * If we were told to drop ONLY in this table (no + * recursion), we need to mark the inheritors' constraints + * as locally defined rather than inherited. + */ + con->coninhcount--; + con->conislocal = true; + + simple_heap_update(conrel, ©_tuple->t_self, copy_tuple); + CatalogUpdateIndexes(conrel, copy_tuple); + + /* Make update visible */ + CommandCounterIncrement(); + } + + heap_freetuple(copy_tuple); + } + + systable_endscan(scan); + + if (!found) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" does not exist", - constrName))); - /* Otherwise if more than one constraint deleted, notify */ - else if (deleted > 1) - ereport(NOTICE, - (errmsg("multiple constraints named \"%s\" were dropped", - constrName))); + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, + RelationGetRelationName(childrel)))); + + heap_close(childrel, NoLock); } + + heap_close(conrel, RowExclusiveLock); } /* @@ -5314,7 +5542,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, */ RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true); - StoreAttrDefault(rel, attnum, nodeToString(defaultexpr)); + StoreAttrDefault(rel, attnum, defaultexpr); } /* Cleanup */ @@ -6216,10 +6444,10 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent) RelationGetRelationName(child_rel), RelationGetRelationName(parent_rel)))); - /* Match up the columns and bump attinhcount and attislocal */ + /* Match up the columns and bump attinhcount as needed */ MergeAttributesIntoExisting(child_rel, parent_rel); - /* Match up the constraints and make sure they're present in child */ + /* Match up the constraints and bump coninhcount as needed */ MergeConstraintsIntoExisting(child_rel, parent_rel); /* @@ -6260,6 +6488,28 @@ decompile_conbin(HeapTuple contup, TupleDesc tupdesc) } /* + * Determine whether two check constraints are functionally equivalent + * + * The test we apply is to see whether they reverse-compile to the same + * source string. This insulates us from issues like whether attributes + * have the same physical column numbers in parent and child relations. + */ +static bool +constraints_equivalent(HeapTuple a, HeapTuple b, TupleDesc tupleDesc) +{ + Form_pg_constraint acon = (Form_pg_constraint) GETSTRUCT(a); + Form_pg_constraint bcon = (Form_pg_constraint) GETSTRUCT(b); + + if (acon->condeferrable != bcon->condeferrable || + acon->condeferred != bcon->condeferred || + strcmp(decompile_conbin(a, tupleDesc), + decompile_conbin(b, tupleDesc)) != 0) + return false; + else + return true; +} + +/* * Check columns in child table match up with columns in parent, and increment * their attinhcount. * @@ -6342,7 +6592,8 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel) } /* - * Check constraints in child table match up with constraints in parent + * Check constraints in child table match up with constraints in parent, + * and increment their coninhcount. * * Called by ATExecAddInherit * @@ -6360,91 +6611,87 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel) static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) { - Relation catalogRelation; - TupleDesc tupleDesc; - SysScanDesc scan; - ScanKeyData key; - HeapTuple constraintTuple; - ListCell *elem; - List *constraints; + Relation catalog_relation; + TupleDesc tuple_desc; + SysScanDesc parent_scan; + ScanKeyData parent_key; + HeapTuple parent_tuple; - /* First gather up the child's constraint definitions */ - catalogRelation = heap_open(ConstraintRelationId, AccessShareLock); - tupleDesc = RelationGetDescr(catalogRelation); + catalog_relation = heap_open(ConstraintRelationId, RowExclusiveLock); + tuple_desc = RelationGetDescr(catalog_relation); - ScanKeyInit(&key, + /* Outer loop scans through the parent's constraint definitions */ + ScanKeyInit(&parent_key, Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(child_rel))); - scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, - true, SnapshotNow, 1, &key); + ObjectIdGetDatum(RelationGetRelid(parent_rel))); + parent_scan = systable_beginscan(catalog_relation, ConstraintRelidIndexId, + true, SnapshotNow, 1, &parent_key); - constraints = NIL; - while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) + while (HeapTupleIsValid(parent_tuple = systable_getnext(parent_scan))) { - Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); + Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(parent_tuple); + SysScanDesc child_scan; + ScanKeyData child_key; + HeapTuple child_tuple; + bool found = false; - if (con->contype != CONSTRAINT_CHECK) + if (parent_con->contype != CONSTRAINT_CHECK) continue; - constraints = lappend(constraints, heap_copytuple(constraintTuple)); - } + /* Search for a child constraint matching this one */ + ScanKeyInit(&child_key, + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(child_rel))); + child_scan = systable_beginscan(catalog_relation, ConstraintRelidIndexId, + true, SnapshotNow, 1, &child_key); - systable_endscan(scan); + while (HeapTupleIsValid(child_tuple = systable_getnext(child_scan))) + { + Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple); + HeapTuple child_copy; - /* Then scan through the parent's constraints looking for matches */ - ScanKeyInit(&key, - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(parent_rel))); - scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, true, - SnapshotNow, 1, &key); + if (child_con->contype != CONSTRAINT_CHECK) + continue; - while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) - { - Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(constraintTuple); - bool found = false; - Form_pg_constraint child_con = NULL; - HeapTuple child_contuple = NULL; + if (strcmp(NameStr(parent_con->conname), + NameStr(child_con->conname)) != 0) + continue; - if (parent_con->contype != CONSTRAINT_CHECK) - continue; + if (!constraints_equivalent(parent_tuple, child_tuple, tuple_desc)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("child table \"%s\" has different definition for check constraint \"%s\"", + RelationGetRelationName(child_rel), + NameStr(parent_con->conname)))); - foreach(elem, constraints) - { - child_contuple = (HeapTuple) lfirst(elem); - child_con = (Form_pg_constraint) GETSTRUCT(child_contuple); - if (strcmp(NameStr(parent_con->conname), - NameStr(child_con->conname)) == 0) - { - found = true; - break; - } + /* + * OK, bump the child constraint's inheritance count. (If we fail + * later on, this change will just roll back.) + */ + child_copy = heap_copytuple(child_tuple); + child_con = (Form_pg_constraint) GETSTRUCT(child_copy); + child_con->coninhcount++; + simple_heap_update(catalog_relation, &child_copy->t_self, child_copy); + CatalogUpdateIndexes(catalog_relation, child_copy); + heap_freetuple(child_copy); + + found = true; + break; } + systable_endscan(child_scan); + if (!found) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table is missing constraint \"%s\"", NameStr(parent_con->conname)))); - - if (parent_con->condeferrable != child_con->condeferrable || - parent_con->condeferred != child_con->condeferred || - strcmp(decompile_conbin(constraintTuple, tupleDesc), - decompile_conbin(child_contuple, tupleDesc)) != 0) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("constraint definition for check constraint \"%s\" does not match", - NameStr(parent_con->conname)))); - - /* - * TODO: add conislocal,coninhcount to constraints. This is where we - * would have to bump them just like attributes - */ } - systable_endscan(scan); - heap_close(catalogRelation, AccessShareLock); + systable_endscan(parent_scan); + heap_close(catalog_relation, RowExclusiveLock); } /* @@ -6459,6 +6706,9 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) * parent then its columns will never be automatically dropped which may * surprise. But at least we'll never surprise by dropping columns someone * isn't expecting to be dropped which would actually mean data loss. + * + * coninhcount and conislocal for inherited constraints are adjusted in + * exactly the same way. */ static void ATExecDropInherit(Relation rel, RangeVar *parent) @@ -6469,7 +6719,9 @@ ATExecDropInherit(Relation rel, RangeVar *parent) ScanKeyData key[3]; HeapTuple inheritsTuple, attributeTuple, + constraintTuple, depTuple; + List *connames; bool found = false; /* @@ -6559,6 +6811,81 @@ ATExecDropInherit(Relation rel, RangeVar *parent) heap_close(catalogRelation, RowExclusiveLock); /* + * Likewise, find inherited check constraints and disinherit them. + * To do this, we first need a list of the names of the parent's check + * constraints. (We cheat a bit by only checking for name matches, + * assuming that the expressions will match.) + */ + catalogRelation = heap_open(ConstraintRelationId, RowExclusiveLock); + ScanKeyInit(&key[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(parent_rel))); + scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, + true, SnapshotNow, 1, key); + + connames = NIL; + + while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) + { + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); + + if (con->contype == CONSTRAINT_CHECK) + connames = lappend(connames, pstrdup(NameStr(con->conname))); + } + + systable_endscan(scan); + + /* Now scan the child's constraints */ + ScanKeyInit(&key[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, + true, SnapshotNow, 1, key); + + while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) + { + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); + bool match; + ListCell *lc; + + if (con->contype != CONSTRAINT_CHECK) + continue; + + match = false; + foreach (lc, connames) + { + if (strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) + { + match = true; + break; + } + } + + if (match) + { + /* Decrement inhcount and possibly set islocal to true */ + HeapTuple copyTuple = heap_copytuple(constraintTuple); + Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple); + if (copy_con->coninhcount <= 0) /* shouldn't happen */ + elog(ERROR, "relation %u has non-inherited constraint \"%s\"", + RelationGetRelid(rel), NameStr(copy_con->conname)); + + copy_con->coninhcount--; + if (copy_con->coninhcount == 0) + copy_con->conislocal = true; + + simple_heap_update(catalogRelation, ©Tuple->t_self, copyTuple); + CatalogUpdateIndexes(catalogRelation, copyTuple); + heap_freetuple(copyTuple); + } + } + + systable_endscan(scan); + heap_close(catalogRelation, RowExclusiveLock); + + /* * Drop the dependency * * There's no convenient way to do this, so go trawling through pg_depend |