diff options
Diffstat (limited to 'src/backend/catalog/heap.c')
-rw-r--r-- | src/backend/catalog/heap.c | 372 |
1 files changed, 234 insertions, 138 deletions
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 567280e1529..d12e63445a5 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.332 2008/03/27 03:57:33 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.333 2008/05/09 23:32:04 tgl Exp $ * * * INTERFACE ROUTINES @@ -77,9 +77,15 @@ static Oid AddNewRelationType(const char *typeName, char new_rel_kind, Oid new_array_type); static void RelationRemoveInheritance(Oid relid); -static void StoreRelCheck(Relation rel, char *ccname, char *ccbin); -static void StoreConstraints(Relation rel, TupleDesc tupdesc); +static void StoreRelCheck(Relation rel, char *ccname, Node *expr, + bool is_local, int inhcount); +static void StoreConstraints(Relation rel, List *cooked_constraints); +static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr, + bool allow_merge, bool is_local); static void SetRelationNumChecks(Relation rel, int numchecks); +static Node *cookConstraint(ParseState *pstate, + Node *raw_constraint, + char *relname); static List *insert_ordered_unique_oid(List *list, Oid datum); @@ -788,6 +794,7 @@ heap_create_with_catalog(const char *relname, Oid relid, Oid ownerid, TupleDesc tupdesc, + List *cooked_constraints, char relkind, bool shared_relation, bool oidislocal, @@ -1004,13 +1011,13 @@ heap_create_with_catalog(const char *relname, } /* - * store constraints and defaults passed in the tupdesc, if any. + * Store any supplied constraints and defaults. * * NB: this may do a CommandCounterIncrement and rebuild the relcache * entry, so the relation must be valid and self-consistent at this point. * In particular, there are not yet constraints and defaults anywhere. */ - StoreConstraints(new_rel_desc, tupdesc); + StoreConstraints(new_rel_desc, cooked_constraints); /* * If there's a special on-commit action, remember it @@ -1426,12 +1433,11 @@ heap_drop_with_catalog(Oid relid) /* * Store a default expression for column attnum of relation rel. - * The expression must be presented as a nodeToString() string. */ void -StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin) +StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr) { - Node *expr; + char *adbin; char *adsrc; Relation adrel; HeapTuple tuple; @@ -1445,12 +1451,12 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin) defobject; /* - * Need to construct source equivalent of given node-string. + * Flatten expression to string form for storage. */ - expr = stringToNode(adbin); + adbin = nodeToString(expr); /* - * deparse it + * Also deparse it to form the mostly-obsolete adsrc field. */ adsrc = deparse_expression(expr, deparse_context_for(RelationGetRelationName(rel), @@ -1482,6 +1488,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin) pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1])); pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1])); heap_freetuple(tuple); + pfree(adbin); pfree(adsrc); /* @@ -1525,27 +1532,27 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, char *adbin) /* * Store a check-constraint expression for the given relation. - * The expression must be presented as a nodeToString() string. * * Caller is responsible for updating the count of constraints * in the pg_class entry for the relation. */ static void -StoreRelCheck(Relation rel, char *ccname, char *ccbin) +StoreRelCheck(Relation rel, char *ccname, Node *expr, + bool is_local, int inhcount) { - Node *expr; + char *ccbin; char *ccsrc; List *varList; int keycount; int16 *attNos; /* - * Convert condition to an expression tree. + * Flatten expression to string form for storage. */ - expr = stringToNode(ccbin); + ccbin = nodeToString(expr); /* - * deparse it + * Also deparse it to form the mostly-obsolete consrc field. */ ccsrc = deparse_expression(expr, deparse_context_for(RelationGetRelationName(rel), @@ -1553,7 +1560,7 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin) false, false); /* - * Find columns of rel that are used in ccbin + * Find columns of rel that are used in expr * * NB: pull_var_clause is okay here only because we don't allow subselects * in check constraints; it would fail to examine the contents of @@ -1608,26 +1615,29 @@ StoreRelCheck(Relation rel, char *ccname, char *ccbin) InvalidOid, /* no associated index */ expr, /* Tree form check constraint */ ccbin, /* Binary form check constraint */ - ccsrc); /* Source form check constraint */ + ccsrc, /* Source form check constraint */ + is_local, /* conislocal */ + inhcount); /* coninhcount */ + pfree(ccbin); pfree(ccsrc); } /* - * Store defaults and constraints passed in via the tuple constraint struct. + * Store defaults and constraints (passed as a list of CookedConstraint). * * NOTE: only pre-cooked expressions will be passed this way, which is to * say expressions inherited from an existing relation. Newly parsed * expressions can be added later, by direct calls to StoreAttrDefault - * and StoreRelCheck (see AddRelationRawConstraints()). + * and StoreRelCheck (see AddRelationNewConstraints()). */ static void -StoreConstraints(Relation rel, TupleDesc tupdesc) +StoreConstraints(Relation rel, List *cooked_constraints) { - TupleConstr *constr = tupdesc->constr; - int i; + int numchecks = 0; + ListCell *lc; - if (!constr) + if (!cooked_constraints) return; /* nothing to do */ /* @@ -1637,33 +1647,46 @@ StoreConstraints(Relation rel, TupleDesc tupdesc) */ CommandCounterIncrement(); - for (i = 0; i < constr->num_defval; i++) - StoreAttrDefault(rel, constr->defval[i].adnum, - constr->defval[i].adbin); + foreach(lc, cooked_constraints) + { + CookedConstraint *con = (CookedConstraint *) lfirst(lc); - for (i = 0; i < constr->num_check; i++) - StoreRelCheck(rel, constr->check[i].ccname, - constr->check[i].ccbin); + switch (con->contype) + { + case CONSTR_DEFAULT: + StoreAttrDefault(rel, con->attnum, con->expr); + break; + case CONSTR_CHECK: + StoreRelCheck(rel, con->name, con->expr, + con->is_local, con->inhcount); + numchecks++; + break; + default: + elog(ERROR, "unrecognized constraint type: %d", + (int) con->contype); + } + } - if (constr->num_check > 0) - SetRelationNumChecks(rel, constr->num_check); + if (numchecks > 0) + SetRelationNumChecks(rel, numchecks); } /* - * AddRelationRawConstraints + * AddRelationNewConstraints * - * Add raw (not-yet-transformed) column default expressions and/or constraint - * check expressions to an existing relation. This is defined to do both - * for efficiency in DefineRelation, but of course you can do just one or - * the other by passing empty lists. + * Add new column default expressions and/or constraint check expressions + * to an existing relation. This is defined to do both for efficiency in + * DefineRelation, but of course you can do just one or the other by passing + * empty lists. * * rel: relation to be modified - * rawColDefaults: list of RawColumnDefault structures - * rawConstraints: list of Constraint nodes + * newColDefaults: list of RawColumnDefault structures + * newConstraints: list of Constraint nodes + * allow_merge: TRUE if check constraints may be merged with existing ones + * is_local: TRUE if definition is local, FALSE if it's inherited * - * All entries in rawColDefaults will be processed. Entries in rawConstraints - * will be processed only if they are CONSTR_CHECK type and contain a "raw" - * expression. + * All entries in newColDefaults will be processed. Entries in newConstraints + * will be processed only if they are CONSTR_CHECK type. * * Returns a list of CookedConstraint nodes that shows the cooked form of * the default and constraint expressions added to the relation. @@ -1674,9 +1697,11 @@ StoreConstraints(Relation rel, TupleDesc tupdesc) * tuples visible. */ List * -AddRelationRawConstraints(Relation rel, - List *rawColDefaults, - List *rawConstraints) +AddRelationNewConstraints(Relation rel, + List *newColDefaults, + List *newConstraints, + bool allow_merge, + bool is_local) { List *cookedConstraints = NIL; TupleDesc tupleDesc; @@ -1715,7 +1740,7 @@ AddRelationRawConstraints(Relation rel, /* * Process column default expressions. */ - foreach(cell, rawColDefaults) + foreach(cell, newColDefaults) { RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell); Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1]; @@ -1739,13 +1764,15 @@ AddRelationRawConstraints(Relation rel, (IsA(expr, Const) &&((Const *) expr)->constisnull)) continue; - StoreAttrDefault(rel, colDef->attnum, nodeToString(expr)); + StoreAttrDefault(rel, colDef->attnum, expr); cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint)); cooked->contype = CONSTR_DEFAULT; cooked->name = NULL; cooked->attnum = colDef->attnum; cooked->expr = expr; + cooked->is_local = is_local; + cooked->inhcount = is_local ? 0 : 1; cookedConstraints = lappend(cookedConstraints, cooked); } @@ -1754,45 +1781,35 @@ AddRelationRawConstraints(Relation rel, */ numchecks = numoldchecks; checknames = NIL; - foreach(cell, rawConstraints) + foreach(cell, newConstraints) { Constraint *cdef = (Constraint *) lfirst(cell); char *ccname; - if (cdef->contype != CONSTR_CHECK || cdef->raw_expr == NULL) + if (cdef->contype != CONSTR_CHECK) continue; - Assert(cdef->cooked_expr == NULL); - /* - * Transform raw parsetree to executable expression. - */ - expr = transformExpr(pstate, cdef->raw_expr); - - /* - * Make sure it yields a boolean result. - */ - expr = coerce_to_boolean(pstate, expr, "CHECK"); + if (cdef->raw_expr != NULL) + { + Assert(cdef->cooked_expr == NULL); - /* - * Make sure no outside relations are referred to. - */ - if (list_length(pstate->p_rtable) != 1) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("only table \"%s\" can be referenced in check constraint", - RelationGetRelationName(rel)))); + /* + * Transform raw parsetree to executable expression, and verify + * it's valid as a CHECK constraint. + */ + expr = cookConstraint(pstate, cdef->raw_expr, + RelationGetRelationName(rel)); + } + else + { + Assert(cdef->cooked_expr != NULL); - /* - * No subplans or aggregates, either... - */ - if (pstate->p_hasSubLinks) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot use subquery in check constraint"))); - if (pstate->p_hasAggs) - ereport(ERROR, - (errcode(ERRCODE_GROUPING_ERROR), - errmsg("cannot use aggregate function in check constraint"))); + /* + * Here, we assume the parser will only pass us valid CHECK + * expressions, so we do no particular checking. + */ + expr = stringToNode(cdef->cooked_expr); + } /* * Check name uniqueness, or generate a name if none was given. @@ -1802,15 +1819,6 @@ AddRelationRawConstraints(Relation rel, ListCell *cell2; ccname = cdef->name; - /* Check against pre-existing constraints */ - if (ConstraintNameIsUsed(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - ccname)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("constraint \"%s\" for relation \"%s\" already exists", - ccname, RelationGetRelationName(rel)))); /* Check against other new constraints */ /* Needed because we don't do CommandCounterIncrement in loop */ foreach(cell2, checknames) @@ -1821,6 +1829,19 @@ AddRelationRawConstraints(Relation rel, errmsg("check constraint \"%s\" already exists", ccname))); } + + /* save name for future checks */ + checknames = lappend(checknames, ccname); + + /* + * Check against pre-existing constraints. If we are allowed + * to merge with an existing constraint, there's no more to + * do here. (We omit the duplicate constraint from the result, + * which is what ATAddCheckConstraint wants.) + */ + if (MergeWithExistingConstraint(rel, ccname, expr, + allow_merge, is_local)) + continue; } else { @@ -1855,15 +1876,15 @@ AddRelationRawConstraints(Relation rel, "check", RelationGetNamespace(rel), checknames); - } - /* save name for future checks */ - checknames = lappend(checknames, ccname); + /* save name for future checks */ + checknames = lappend(checknames, ccname); + } /* * OK, store it. */ - StoreRelCheck(rel, ccname, nodeToString(expr)); + StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1); numchecks++; @@ -1872,6 +1893,8 @@ AddRelationRawConstraints(Relation rel, cooked->name = ccname; cooked->attnum = 0; cooked->expr = expr; + cooked->is_local = is_local; + cooked->inhcount = is_local ? 0 : 1; cookedConstraints = lappend(cookedConstraints, cooked); } @@ -1888,6 +1911,90 @@ AddRelationRawConstraints(Relation rel, } /* + * Check for a pre-existing check constraint that conflicts with a proposed + * new one, and either adjust its conislocal/coninhcount settings or throw + * error as needed. + * + * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's + * got a so-far-unique name, or throws error if conflict. + */ +static bool +MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr, + bool allow_merge, bool is_local) +{ + bool found; + Relation conDesc; + SysScanDesc conscan; + ScanKeyData skey[2]; + HeapTuple tup; + + /* Search for a pg_constraint entry with same name and relation */ + conDesc = heap_open(ConstraintRelationId, RowExclusiveLock); + + found = false; + + ScanKeyInit(&skey[0], + Anum_pg_constraint_conname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(ccname)); + + ScanKeyInit(&skey[1], + Anum_pg_constraint_connamespace, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetNamespace(rel))); + + conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true, + SnapshotNow, 2, skey); + + while (HeapTupleIsValid(tup = systable_getnext(conscan))) + { + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup); + + if (con->conrelid == RelationGetRelid(rel)) + { + /* Found it. Conflicts if not identical check constraint */ + if (con->contype == CONSTRAINT_CHECK) + { + Datum val; + bool isnull; + + val = fastgetattr(tup, + Anum_pg_constraint_conbin, + conDesc->rd_att, &isnull); + if (isnull) + elog(ERROR, "null conbin for rel %s", + RelationGetRelationName(rel)); + if (equal(expr, stringToNode(TextDatumGetCString(val)))) + found = true; + } + if (!found || !allow_merge) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("constraint \"%s\" for relation \"%s\" already exists", + ccname, RelationGetRelationName(rel)))); + /* OK to update the tuple */ + ereport(NOTICE, + (errmsg("merging constraint \"%s\" with inherited definition", + ccname))); + tup = heap_copytuple(tup); + con = (Form_pg_constraint) GETSTRUCT(tup); + if (is_local) + con->conislocal = true; + else + con->coninhcount++; + simple_heap_update(conDesc, &tup->t_self, tup); + CatalogUpdateIndexes(conDesc, tup); + break; + } + } + + systable_endscan(conscan); + heap_close(conDesc, RowExclusiveLock); + + return found; +} + +/* * Update the count of constraints in the relation's pg_class tuple. * * Caller had better hold exclusive lock on the relation. @@ -2015,63 +2122,52 @@ cookDefault(ParseState *pstate, return expr; } - /* - * Removes all constraints on a relation that match the given name. - * - * It is the responsibility of the calling function to acquire a suitable - * lock on the relation. + * Take a raw CHECK constraint expression and convert it to a cooked format + * ready for storage. * - * Returns: The number of constraints removed. + * Parse state must be set up to recognize any vars that might appear + * in the expression. */ -int -RemoveRelConstraints(Relation rel, const char *constrName, - DropBehavior behavior) +static Node * +cookConstraint(ParseState *pstate, + Node *raw_constraint, + char *relname) { - int ndeleted = 0; - Relation conrel; - SysScanDesc conscan; - ScanKeyData key[1]; - HeapTuple contup; - - /* Grab an appropriate lock on the pg_constraint relation */ - conrel = heap_open(ConstraintRelationId, RowExclusiveLock); - - /* Use the index to scan only constraints of the target relation */ - ScanKeyInit(&key[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(rel))); - - conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true, - SnapshotNow, 1, key); + Node *expr; /* - * Scan over the result set, removing any matching entries. + * Transform raw parsetree to executable expression. */ - while ((contup = systable_getnext(conscan)) != NULL) - { - Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(contup); + expr = transformExpr(pstate, raw_constraint); - if (strcmp(NameStr(con->conname), constrName) == 0) - { - ObjectAddress conobj; - - conobj.classId = ConstraintRelationId; - conobj.objectId = HeapTupleGetOid(contup); - conobj.objectSubId = 0; - - performDeletion(&conobj, behavior); + /* + * Make sure it yields a boolean result. + */ + expr = coerce_to_boolean(pstate, expr, "CHECK"); - ndeleted++; - } - } + /* + * Make sure no outside relations are referred to. + */ + if (list_length(pstate->p_rtable) != 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("only table \"%s\" can be referenced in check constraint", + relname))); - /* Clean up after the scan */ - systable_endscan(conscan); - heap_close(conrel, RowExclusiveLock); + /* + * No subplans or aggregates, either... + */ + if (pstate->p_hasSubLinks) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use subquery in check constraint"))); + if (pstate->p_hasAggs) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("cannot use aggregate function in check constraint"))); - return ndeleted; + return expr; } |