diff options
Diffstat (limited to 'src/backend/commands/indexcmds.c')
-rw-r--r-- | src/backend/commands/indexcmds.c | 138 |
1 files changed, 131 insertions, 7 deletions
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 96272ab998d..00786442b3f 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.187 2009/07/29 20:56:18 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.188 2009/12/07 05:22:21 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -25,6 +25,7 @@ #include "catalog/index.h" #include "catalog/indexing.h" #include "catalog/pg_opclass.h" +#include "catalog/pg_opfamily.h" #include "catalog/pg_tablespace.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -36,6 +37,7 @@ #include "optimizer/clauses.h" #include "parser/parse_coerce.h" #include "parser/parse_func.h" +#include "parser/parse_oper.h" #include "parser/parsetree.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -58,6 +60,7 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP, int16 *colOptionP, List *attList, + List *exclusionOpNames, Oid relId, char *accessMethodName, Oid accessMethodId, bool amcanorder, @@ -83,6 +86,8 @@ static bool relationHasPrimaryKey(Relation rel); * to index on. * 'predicate': the partial-index condition, or NULL if none. * 'options': reloptions from WITH (in list-of-DefElem form). + * 'exclusionOpNames': list of names of exclusion-constraint operators, + * or NIL if not an exclusion constraint. * 'unique': make the index enforce uniqueness. * 'primary': mark the index as a primary key in the catalogs. * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint, @@ -106,6 +111,7 @@ DefineIndex(RangeVar *heapRelation, List *attributeList, Expr *predicate, List *options, + List *exclusionOpNames, bool unique, bool primary, bool isconstraint, @@ -247,10 +253,21 @@ DefineIndex(RangeVar *heapRelation, if (indexRelationName == NULL) { if (primary) + { indexRelationName = ChooseRelationName(RelationGetRelationName(rel), NULL, "pkey", namespaceId); + } + else if (exclusionOpNames != NIL) + { + IndexElem *iparam = (IndexElem *) linitial(attributeList); + + indexRelationName = ChooseRelationName(RelationGetRelationName(rel), + iparam->name, + "exclusion", + namespaceId); + } else { IndexElem *iparam = (IndexElem *) linitial(attributeList); @@ -303,6 +320,11 @@ DefineIndex(RangeVar *heapRelation, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("access method \"%s\" does not support multicolumn indexes", accessMethodName))); + if (exclusionOpNames != NIL && !OidIsValid(accessMethodForm->amgettuple)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("access method \"%s\" does not support exclusion constraints", + accessMethodName))); amcanorder = accessMethodForm->amcanorder; amoptions = accessMethodForm->amoptions; @@ -418,6 +440,9 @@ DefineIndex(RangeVar *heapRelation, indexInfo->ii_ExpressionsState = NIL; indexInfo->ii_Predicate = make_ands_implicit(predicate); indexInfo->ii_PredicateState = NIL; + indexInfo->ii_ExclusionOps = NULL; + indexInfo->ii_ExclusionProcs = NULL; + indexInfo->ii_ExclusionStrats = NULL; indexInfo->ii_Unique = unique; /* In a concurrent build, mark it not-ready-for-inserts */ indexInfo->ii_ReadyForInserts = !concurrent; @@ -427,7 +452,8 @@ DefineIndex(RangeVar *heapRelation, classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid)); coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16)); ComputeIndexAttrs(indexInfo, classObjectId, coloptions, attributeList, - relationId, accessMethodName, accessMethodId, + exclusionOpNames, relationId, + accessMethodName, accessMethodId, amcanorder, isconstraint); /* @@ -435,11 +461,27 @@ DefineIndex(RangeVar *heapRelation, * error checks) */ if (isconstraint && !quiet) + { + const char *constraint_type; + + if (primary) + constraint_type = "PRIMARY KEY"; + else if (unique) + constraint_type = "UNIQUE"; + else if (exclusionOpNames != NIL) + constraint_type = "EXCLUDE"; + else + { + elog(ERROR, "unknown constraint type"); + constraint_type = NULL; /* keep compiler quiet */ + } + ereport(NOTICE, (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"", is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /", - primary ? "PRIMARY KEY" : "UNIQUE", + constraint_type, indexRelationName, RelationGetRelationName(rel)))); + } /* save lockrelid and locktag for below, then close rel */ heaprelid = rel->rd_lockInfo.lockRelId; @@ -799,21 +841,38 @@ ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP, int16 *colOptionP, List *attList, /* list of IndexElem's */ + List *exclusionOpNames, Oid relId, char *accessMethodName, Oid accessMethodId, bool amcanorder, bool isconstraint) { - ListCell *rest; - int attn = 0; + ListCell *nextExclOp; + ListCell *lc; + int attn; + + /* Allocate space for exclusion operator info, if needed */ + if (exclusionOpNames) + { + int ncols = list_length(attList); + + Assert(list_length(exclusionOpNames) == ncols); + indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * ncols); + indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * ncols); + indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * ncols); + nextExclOp = list_head(exclusionOpNames); + } + else + nextExclOp = NULL; /* * process attributeList */ - foreach(rest, attList) + attn = 0; + foreach(lc, attList) { - IndexElem *attribute = (IndexElem *) lfirst(rest); + IndexElem *attribute = (IndexElem *) lfirst(lc); Oid atttype; /* @@ -898,6 +957,71 @@ ComputeIndexAttrs(IndexInfo *indexInfo, accessMethodId); /* + * Identify the exclusion operator, if any. + */ + if (nextExclOp) + { + List *opname = (List *) lfirst(nextExclOp); + Oid opid; + Oid opfamily; + int strat; + + /* + * Find the operator --- it must accept the column datatype + * without runtime coercion (but binary compatibility is OK) + */ + opid = compatible_oper_opid(opname, atttype, atttype, false); + + /* + * Only allow commutative operators to be used in exclusion + * constraints. If X conflicts with Y, but Y does not conflict + * with X, bad things will happen. + */ + if (get_commutator(opid) != opid) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("operator %s is not commutative", + format_operator(opid)), + errdetail("Only commutative operators can be used in exclusion constraints."))); + + /* + * Operator must be a member of the right opfamily, too + */ + opfamily = get_opclass_family(classOidP[attn]); + strat = get_op_opfamily_strategy(opid, opfamily); + if (strat == 0) + { + HeapTuple opftuple; + Form_pg_opfamily opfform; + + /* + * attribute->opclass might not explicitly name the opfamily, + * so fetch the name of the selected opfamily for use in the + * error message. + */ + opftuple = SearchSysCache(OPFAMILYOID, + ObjectIdGetDatum(opfamily), + 0, 0, 0); + if (!HeapTupleIsValid(opftuple)) + elog(ERROR, "cache lookup failed for opfamily %u", + opfamily); + opfform = (Form_pg_opfamily) GETSTRUCT(opftuple); + + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("operator %s is not a member of operator family \"%s\"", + format_operator(opid), + NameStr(opfform->opfname)), + errdetail("The exclusion operator must be related to the index operator class for the constraint."))); + } + + indexInfo->ii_ExclusionOps[attn] = opid; + indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid); + indexInfo->ii_ExclusionStrats[attn] = strat; + nextExclOp = lnext(nextExclOp); + } + + /* * Set up the per-column options (indoption field). For now, this is * zero for any un-ordered index, while ordered indexes have DESC and * NULLS FIRST/LAST options. |