diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2007-12-01 23:44:44 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2007-12-01 23:44:44 +0000 |
commit | 265f904d8f25db2f0272921e25a1397f548d235d (patch) | |
tree | bc1bd0992294e3bc13bf7c4f8409301143a1e40e /src/backend/parser/parse_utilcmd.c | |
parent | ba9da684b5dba9dc5daf05abd442f9b4a5a23979 (diff) | |
download | postgresql-265f904d8f25db2f0272921e25a1397f548d235d.tar.gz postgresql-265f904d8f25db2f0272921e25a1397f548d235d.zip |
Code review for LIKE ... INCLUDING INDEXES patch. Fix failure to propagate
constraint status of copied indexes (bug #3774), as well as various other
small bugs such as failure to pstrdup when needed. Allow INCLUDING INDEXES
indexes to be merged with identical declared indexes (perhaps not real useful,
but the code is there and having it not apply to LIKE indexes seems pretty
unorthogonal). Avoid useless work in generateClonedIndexStmt(). Undo some
poorly chosen API changes, and put a couple of routines in modules that seem
to be better places for them.
Diffstat (limited to 'src/backend/parser/parse_utilcmd.c')
-rw-r--r-- | src/backend/parser/parse_utilcmd.c | 257 |
1 files changed, 132 insertions, 125 deletions
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 2ff6f9274d7..a53f01c32ab 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -19,7 +19,7 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.6 2007/11/15 21:14:37 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.7 2007/12/01 23:44:44 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -28,6 +28,8 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/reloptions.h" +#include "catalog/dependency.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/namespace.h" @@ -675,13 +677,15 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } } + /* + * Likewise, copy indexes if requested + */ if (including_indexes && relation->rd_rel->relhasindex) { - AttrNumber *attmap; + AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns); List *parent_indexes; ListCell *l; - attmap = varattnos_map_schema(tupleDesc, cxt->columns); parent_indexes = RelationGetIndexList(relation); foreach(l, parent_indexes) @@ -693,14 +697,12 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ - index_stmt = generateClonedIndexStmt(cxt, parent_index, - attmap); + index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap); - /* Add the new IndexStmt to the create context */ + /* Save it in the inh_indexes list for the time being */ cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt); - /* Keep our lock on the index till xact commit */ - index_close(parent_index, NoLock); + index_close(parent_index, AccessShareLock); } } @@ -713,54 +715,62 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } /* - * Generate an IndexStmt entry using information from an already - * existing index "source_idx". - * - * Note: Much of this functionality is cribbed from pg_get_indexdef. + * Generate an IndexStmt node using information from an already existing index + * "source_idx". Attribute numbers should be adjusted according to attmap. */ static IndexStmt * generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, AttrNumber *attmap) { - HeapTuple ht_idx; + Oid source_relid = RelationGetRelid(source_idx); HeapTuple ht_idxrel; - HeapTuple ht_am; - Form_pg_index idxrec; + HeapTuple ht_idx; Form_pg_class idxrelrec; + Form_pg_index idxrec; Form_pg_am amrec; - List *indexprs = NIL; + oidvector *indclass; + IndexStmt *index; + List *indexprs; ListCell *indexpr_item; Oid indrelid; - Oid source_relid; int keyno; Oid keycoltype; - Datum indclassDatum; - Datum indoptionDatum; + Datum datum; bool isnull; - oidvector *indclass; - int2vector *indoption; - IndexStmt *index; - Datum reloptions; - source_relid = RelationGetRelid(source_idx); + /* + * Fetch pg_class tuple of source index. We can't use the copy in the + * relcache entry because it doesn't include optional fields. + */ + ht_idxrel = SearchSysCache(RELOID, + ObjectIdGetDatum(source_relid), + 0, 0, 0); + if (!HeapTupleIsValid(ht_idxrel)) + elog(ERROR, "cache lookup failed for relation %u", source_relid); + idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel); - /* Fetch pg_index tuple for source index */ - ht_idx = SearchSysCache(INDEXRELID, - ObjectIdGetDatum(source_relid), - 0, 0, 0); - if (!HeapTupleIsValid(ht_idx)) - elog(ERROR, "cache lookup failed for index %u", source_relid); + /* Fetch pg_index tuple for source index from relcache entry */ + ht_idx = source_idx->rd_indextuple; idxrec = (Form_pg_index) GETSTRUCT(ht_idx); - - Assert(source_relid == idxrec->indexrelid); indrelid = idxrec->indrelid; + /* Fetch pg_am tuple for source index from relcache entry */ + amrec = source_idx->rd_am; + + /* Must get indclass the hard way, since it's not stored in relcache */ + datum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(datum); + + /* Begin building the IndexStmt */ index = makeNode(IndexStmt); + index->relation = cxt->relation; + index->accessMethod = pstrdup(NameStr(amrec->amname)); + index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode); index->unique = idxrec->indisunique; - index->concurrent = false; index->primary = idxrec->indisprimary; - index->relation = cxt->relation; - index->isconstraint = false; + index->concurrent = false; /* * We don't try to preserve the name of the source index; instead, just @@ -768,65 +778,40 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, */ index->idxname = NULL; - /* Must get indclass and indoption the hard way */ - indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx, - Anum_pg_index_indclass, &isnull); - Assert(!isnull); - indclass = (oidvector *) DatumGetPointer(indclassDatum); - indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx, - Anum_pg_index_indoption, &isnull); - Assert(!isnull); - indoption = (int2vector *) DatumGetPointer(indoptionDatum); - - /* Fetch pg_class tuple of source index */ - ht_idxrel = SearchSysCache(RELOID, - ObjectIdGetDatum(source_relid), - 0, 0, 0); - if (!HeapTupleIsValid(ht_idxrel)) - elog(ERROR, "cache lookup failed for relation %u", source_relid); - /* - * Store the reloptions for later use by this new index + * If the index is marked PRIMARY, it's certainly from a constraint; + * else, if it's not marked UNIQUE, it certainly isn't; else, we have + * to search pg_depend to see if there's an associated unique constraint. */ - reloptions = SysCacheGetAttr(RELOID, ht_idxrel, - Anum_pg_class_reloptions, &isnull); - if (!isnull) - index->src_options = flatten_reloptions(source_relid); - - idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel); - - /* Fetch pg_am tuple for the index's access method */ - ht_am = SearchSysCache(AMOID, - ObjectIdGetDatum(idxrelrec->relam), - 0, 0, 0); - if (!HeapTupleIsValid(ht_am)) - elog(ERROR, "cache lookup failed for access method %u", - idxrelrec->relam); - amrec = (Form_pg_am) GETSTRUCT(ht_am); - index->accessMethod = pstrdup(NameStr(amrec->amname)); + if (index->primary) + index->isconstraint = true; + else if (!index->unique) + index->isconstraint = false; + else + index->isconstraint = OidIsValid(get_index_constraint(source_relid)); /* Get the index expressions, if any */ - if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs)) + datum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indexprs, &isnull); + if (!isnull) { - Datum exprsDatum; - bool isnull; char *exprsString; - exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx, - Anum_pg_index_indexprs, &isnull); - exprsString = DatumGetCString(DirectFunctionCall1(textout, - exprsDatum)); - Assert(!isnull); + exprsString = DatumGetCString(DirectFunctionCall1(textout, datum)); indexprs = (List *) stringToNode(exprsString); } + else + indexprs = NIL; - indexpr_item = list_head(indexprs); + /* Build the list of IndexElem */ + index->indexParams = NIL; + indexpr_item = list_head(indexprs); for (keyno = 0; keyno < idxrec->indnatts; keyno++) { IndexElem *iparam; AttrNumber attnum = idxrec->indkey.values[keyno]; - int16 opt = indoption->values[keyno]; + int16 opt = source_idx->rd_indoption[keyno]; iparam = makeNode(IndexElem); @@ -849,11 +834,14 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, if (indexpr_item == NULL) elog(ERROR, "too few entries in indexprs list"); indexkey = (Node *) lfirst(indexpr_item); + indexpr_item = lnext(indexpr_item); + + /* OK to modify indexkey since we are working on a private copy */ change_varattnos_of_a_node(indexkey, attmap); + iparam->name = NULL; iparam->expr = indexkey; - indexpr_item = lnext(indexpr_item); keycoltype = exprType(indexkey); } @@ -866,40 +854,50 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, /* Adjust options if necessary */ if (amrec->amcanorder) { - /* If it supports sort ordering, report DESC and NULLS opts */ + /* + * If it supports sort ordering, copy DESC and NULLS opts. + * Don't set non-default settings unnecessarily, though, + * so as to improve the chance of recognizing equivalence + * to constraint indexes. + */ if (opt & INDOPTION_DESC) + { iparam->ordering = SORTBY_DESC; - if (opt & INDOPTION_NULLS_FIRST) - iparam->nulls_ordering = SORTBY_NULLS_FIRST; + if ((opt & INDOPTION_NULLS_FIRST) == 0) + iparam->nulls_ordering = SORTBY_NULLS_LAST; + } + else + { + if (opt & INDOPTION_NULLS_FIRST) + iparam->nulls_ordering = SORTBY_NULLS_FIRST; + } } index->indexParams = lappend(index->indexParams, iparam); } - /* Use the same tablespace as the source index */ - index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode); + /* Copy reloptions if any */ + datum = SysCacheGetAttr(RELOID, ht_idxrel, + Anum_pg_class_reloptions, &isnull); + if (!isnull) + index->options = untransformRelOptions(datum); /* If it's a partial index, decompile and append the predicate */ - if (!heap_attisnull(ht_idx, Anum_pg_index_indpred)) + datum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indpred, &isnull); + if (!isnull) { - Datum pred_datum; - bool isnull; char *pred_str; /* Convert text string to node tree */ - pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx, - Anum_pg_index_indpred, &isnull); - Assert(!isnull); - pred_str = DatumGetCString(DirectFunctionCall1(textout, - pred_datum)); + pred_str = DatumGetCString(DirectFunctionCall1(textout, datum)); index->whereClause = (Node *) stringToNode(pred_str); + /* Adjust attribute numbers */ change_varattnos_of_a_node(index->whereClause, attmap); } /* Clean up */ - ReleaseSysCache(ht_idx); ReleaseSysCache(ht_idxrel); - ReleaseSysCache(ht_am); return index; } @@ -924,11 +922,11 @@ get_opclass(Oid opclass, Oid actual_datatype) elog(ERROR, "cache lookup failed for opclass %u", opclass); opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc); - if (!OidIsValid(actual_datatype) || - GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass) + if (GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass) { + /* For simplicity, we always schema-qualify the name */ char *nsp_name = get_namespace_name(opc_rec->opcnamespace); - char *opc_name = NameStr(opc_rec->opcname); + char *opc_name = pstrdup(NameStr(opc_rec->opcname)); result = list_make2(makeString(nsp_name), makeString(opc_name)); } @@ -940,8 +938,8 @@ get_opclass(Oid opclass, Oid actual_datatype) /* * transformIndexConstraints - * Handle UNIQUE and PRIMARY KEY constraints, which create - * indexes. We also merge index definitions arising from + * Handle UNIQUE and PRIMARY KEY constraints, which create indexes. + * We also merge in any index definitions arising from * LIKE ... INCLUDING INDEXES. */ static void @@ -960,7 +958,30 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) { Constraint *constraint = (Constraint *) lfirst(lc); + Assert(IsA(constraint, Constraint)); + Assert(constraint->contype == CONSTR_PRIMARY || + constraint->contype == CONSTR_UNIQUE); + index = transformIndexConstraint(constraint, cxt); + + indexlist = lappend(indexlist, index); + } + + /* Add in any indexes defined by LIKE ... INCLUDING INDEXES */ + foreach(lc, cxt->inh_indexes) + { + index = (IndexStmt *) lfirst(lc); + + if (index->primary) + { + if (cxt->pkey != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + cxt->relation->relname))); + cxt->pkey = index; + } + indexlist = lappend(indexlist, index); } @@ -995,8 +1016,11 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) { IndexStmt *priorindex = lfirst(k); - if (equal(index->indexParams, priorindex->indexParams)) + if (equal(index->indexParams, priorindex->indexParams) && + equal(index->whereClause, priorindex->whereClause) && + strcmp(index->accessMethod, priorindex->accessMethod) == 0) { + priorindex->unique |= index->unique; /* * If the prior index is as yet unnamed, and this one is * named, then transfer the name to the prior index. This @@ -1013,27 +1037,13 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) if (keep) cxt->alist = lappend(cxt->alist, index); } - - /* Copy indexes defined by LIKE ... INCLUDING INDEXES */ - foreach(lc, cxt->inh_indexes) - { - index = (IndexStmt *) lfirst(lc); - - if (index->primary) - { - if (cxt->pkey) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("multiple primary keys for table \"%s\" are not allowed", - cxt->relation->relname))); - - cxt->pkey = index; - } - - cxt->alist = lappend(cxt->alist, index); - } } +/* + * transformIndexConstraint + * Transform one UNIQUE or PRIMARY KEY constraint for + * transformIndexConstraints. + */ static IndexStmt * transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) { @@ -1041,13 +1051,10 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) ListCell *keys; IndexElem *iparam; - Assert(constraint->contype == CONSTR_PRIMARY || - constraint->contype == CONSTR_UNIQUE); - index = makeNode(IndexStmt); + index->unique = true; index->primary = (constraint->contype == CONSTR_PRIMARY); - if (index->primary) { if (cxt->pkey != NULL) @@ -1771,7 +1778,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) /* * transformIndexConstraints wants cxt.alist to contain only index - * statements, so transfer anything we already have into save_alist. + * statements, so transfer anything we already have into save_alist * immediately. */ save_alist = cxt.alist; |