From 474774918b4b55e774d2fcc1d7e94c8c632fadef Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Tue, 17 Jul 2007 05:02:03 +0000 Subject: Implement CREATE TABLE LIKE ... INCLUDING INDEXES. Patch from NikhilS, based in part on an earlier patch from Trevor Hardcastle, and reviewed by myself. --- src/backend/parser/parse_utilcmd.c | 624 +++++++++++++++++++++++++++---------- 1 file changed, 457 insertions(+), 167 deletions(-) (limited to 'src/backend/parser/parse_utilcmd.c') diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 37822251943..f17ad480212 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -19,20 +19,23 @@ * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.1 2007/06/23 22:12:51 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.2 2007/07/17 05:02:02 neilc Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include "access/genam.h" #include "access/heapam.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/namespace.h" +#include "catalog/pg_opclass.h" #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/clauses.h" @@ -47,6 +50,7 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/relcache.h" #include "utils/syscache.h" @@ -63,6 +67,7 @@ typedef struct List *ckconstraints; /* CHECK constraints */ List *fkconstraints; /* FOREIGN KEY constraints */ List *ixconstraints; /* index-creating constraints */ + List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */ List *blist; /* "before list" of things to do before * creating the table */ List *alist; /* "after list" of things to do after creating @@ -93,8 +98,13 @@ static void transformTableConstraint(ParseState *pstate, Constraint *constraint); static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, InhRelation *inhrelation); +static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt, + Relation parent_index, AttrNumber *attmap); +static List *get_opclass(Oid opclass, Oid actual_datatype); static void transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt); +static IndexStmt *transformIndexConstraint(Constraint *constraint, + CreateStmtContext *cxt); static void transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt, bool skipValidation, @@ -146,6 +156,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; + cxt.inh_indexes = NIL; cxt.blist = NIL; cxt.alist = NIL; cxt.pkey = NULL; @@ -555,11 +566,6 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } } - if (including_indexes) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("LIKE INCLUDING INDEXES is not implemented"))); - /* * Insert the copied attributes into the cxt for the new table * definition. @@ -657,6 +663,35 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } } + if (including_indexes && relation->rd_rel->relhasindex) + { + AttrNumber *attmap; + List *parent_indexes; + ListCell *l; + + attmap = varattnos_map_schema(tupleDesc, cxt->columns); + parent_indexes = RelationGetIndexList(relation); + + foreach(l, parent_indexes) + { + Oid parent_index_oid = lfirst_oid(l); + Relation parent_index; + IndexStmt *index_stmt; + + parent_index = index_open(parent_index_oid, AccessShareLock); + + /* Build CREATE INDEX statement to recreate the parent_index */ + index_stmt = generateClonedIndexStmt(cxt, parent_index, + attmap); + + /* Add the new IndexStmt to the create context */ + cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt); + + /* Keep our lock on the index till xact commit */ + index_close(parent_index, NoLock); + } + } + /* * Close the parent rel, but keep our AccessShareLock on it until xact * commit. That will prevent someone else from deleting or ALTERing the @@ -666,188 +701,254 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt, } /* - * transformIndexConstraints - * Handle UNIQUE and PRIMARY KEY constraints, which create indexes + * Generate an IndexStmt entry using information from an already + * existing index "source_idx". + * + * Note: Much of this functionality is cribbed from pg_get_indexdef. */ -static void -transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) +static IndexStmt * +generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx, + AttrNumber *attmap) { - IndexStmt *index; - List *indexlist = NIL; - ListCell *listptr; - ListCell *l; + HeapTuple ht_idx; + HeapTuple ht_idxrel; + HeapTuple ht_am; + Form_pg_index idxrec; + Form_pg_class idxrelrec; + Form_pg_am amrec; + List *indexprs = NIL; + ListCell *indexpr_item; + Oid indrelid; + Oid source_relid; + int keyno; + Oid keycoltype; + Datum indclassDatum; + Datum indoptionDatum; + bool isnull; + oidvector *indclass; + int2vector *indoption; + IndexStmt *index; + Datum reloptions; + + source_relid = RelationGetRelid(source_idx); + + /* Fetch pg_index tuple for source index */ + ht_idx = SearchSysCache(INDEXRELID, + ObjectIdGetDatum(source_relid), + 0, 0, 0); + if (!HeapTupleIsValid(ht_idx)) + elog(ERROR, "cache lookup failed for index %u", source_relid); + idxrec = (Form_pg_index) GETSTRUCT(ht_idx); + + Assert(source_relid == idxrec->indexrelid); + indrelid = idxrec->indrelid; + + index = makeNode(IndexStmt); + index->unique = idxrec->indisunique; + index->concurrent = false; + index->primary = idxrec->indisprimary; + index->relation = cxt->relation; + index->isconstraint = false; /* - * Run through the constraints that need to generate an index. For PRIMARY - * KEY, mark each column as NOT NULL and create an index. For UNIQUE, - * create an index as for PRIMARY KEY, but do not insist on NOT NULL. + * We don't try to preserve the name of the source index; instead, just + * let DefineIndex() choose a reasonable name. + */ + index->idxname = NULL; + + /* Must get indclass and indoption the hard way */ + indclassDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indclass, &isnull); + Assert(!isnull); + indclass = (oidvector *) DatumGetPointer(indclassDatum); + indoptionDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indoption, &isnull); + Assert(!isnull); + indoption = (int2vector *) DatumGetPointer(indoptionDatum); + + /* Fetch pg_class tuple of source index */ + ht_idxrel = SearchSysCache(RELOID, + ObjectIdGetDatum(source_relid), + 0, 0, 0); + if (!HeapTupleIsValid(ht_idxrel)) + elog(ERROR, "cache lookup failed for relation %u", source_relid); + + /* + * Store the reloptions for later use by this new index */ - foreach(listptr, cxt->ixconstraints) + reloptions = SysCacheGetAttr(RELOID, ht_idxrel, + Anum_pg_class_reloptions, &isnull); + if (!isnull) + index->src_options = flatten_reloptions(source_relid); + + idxrelrec = (Form_pg_class) GETSTRUCT(ht_idxrel); + + /* Fetch pg_am tuple for the index's access method */ + ht_am = SearchSysCache(AMOID, + ObjectIdGetDatum(idxrelrec->relam), + 0, 0, 0); + if (!HeapTupleIsValid(ht_am)) + elog(ERROR, "cache lookup failed for access method %u", + idxrelrec->relam); + amrec = (Form_pg_am) GETSTRUCT(ht_am); + index->accessMethod = pstrdup(NameStr(amrec->amname)); + + /* Get the index expressions, if any */ + if (!heap_attisnull(ht_idx, Anum_pg_index_indexprs)) { - Constraint *constraint = lfirst(listptr); - ListCell *keys; - IndexElem *iparam; + Datum exprsDatum; + bool isnull; + char *exprsString; + + exprsDatum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indexprs, &isnull); + exprsString = DatumGetCString(DirectFunctionCall1(textout, + exprsDatum)); + Assert(!isnull); + indexprs = (List *) stringToNode(exprsString); + } - Assert(IsA(constraint, Constraint)); - Assert((constraint->contype == CONSTR_PRIMARY) - || (constraint->contype == CONSTR_UNIQUE)); + indexpr_item = list_head(indexprs); - index = makeNode(IndexStmt); + for (keyno = 0; keyno < idxrec->indnatts; keyno++) + { + IndexElem *iparam; + AttrNumber attnum = idxrec->indkey.values[keyno]; + int16 opt = indoption->values[keyno]; - index->unique = true; - index->primary = (constraint->contype == CONSTR_PRIMARY); - if (index->primary) + iparam = makeNode(IndexElem); + + if (AttributeNumberIsValid(attnum)) { - if (cxt->pkey != NULL) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("multiple primary keys for table \"%s\" are not allowed", - cxt->relation->relname))); - cxt->pkey = index; + /* Simple index column */ + char *attname; - /* - * In ALTER TABLE case, a primary index might already exist, but - * DefineIndex will check for it. - */ - } - index->isconstraint = true; + attname = get_relid_attribute_name(indrelid, attnum); + keycoltype = get_atttype(indrelid, attnum); - if (constraint->name != NULL) - index->idxname = pstrdup(constraint->name); + iparam->name = attname; + iparam->expr = NULL; + } else - index->idxname = NULL; /* DefineIndex will choose name */ + { + /* Expressional index */ + Node *indexkey; + + if (indexpr_item == NULL) + elog(ERROR, "too few entries in indexprs list"); + indexkey = (Node *) lfirst(indexpr_item); + change_varattnos_of_a_node(indexkey, attmap); + iparam->name = NULL; + iparam->expr = indexkey; + + indexpr_item = lnext(indexpr_item); + keycoltype = exprType(indexkey); + } - index->relation = cxt->relation; - index->accessMethod = DEFAULT_INDEX_TYPE; - index->options = constraint->options; - index->tableSpace = constraint->indexspace; - index->indexParams = NIL; - index->whereClause = NULL; - index->concurrent = false; + /* Add the operator class name, if non-default */ + iparam->opclass = get_opclass(indclass->values[keyno], keycoltype); - /* - * Make sure referenced keys exist. If we are making a PRIMARY KEY - * index, also make sure they are NOT NULL, if possible. (Although we - * could leave it to DefineIndex to mark the columns NOT NULL, it's - * more efficient to get it right the first time.) - */ - foreach(keys, constraint->keys) + iparam->ordering = SORTBY_DEFAULT; + iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; + + /* Adjust options if necessary */ + if (amrec->amcanorder) { - char *key = strVal(lfirst(keys)); - bool found = false; - ColumnDef *column = NULL; - ListCell *columns; + /* If it supports sort ordering, report DESC and NULLS opts */ + if (opt & INDOPTION_DESC) + iparam->ordering = SORTBY_DESC; + if (opt & INDOPTION_NULLS_FIRST) + iparam->nulls_ordering = SORTBY_NULLS_FIRST; + } - foreach(columns, cxt->columns) - { - column = (ColumnDef *) lfirst(columns); - Assert(IsA(column, ColumnDef)); - if (strcmp(column->colname, key) == 0) - { - found = true; - break; - } - } - if (found) - { - /* found column in the new table; force it to be NOT NULL */ - if (constraint->contype == CONSTR_PRIMARY) - column->is_not_null = TRUE; - } - else if (SystemAttributeByName(key, cxt->hasoids) != NULL) - { - /* - * column will be a system column in the new table, so accept - * it. System columns can't ever be null, so no need to worry - * about PRIMARY/NOT NULL constraint. - */ - found = true; - } - else if (cxt->inhRelations) - { - /* try inherited tables */ - ListCell *inher; + index->indexParams = lappend(index->indexParams, iparam); + } - foreach(inher, cxt->inhRelations) - { - RangeVar *inh = (RangeVar *) lfirst(inher); - Relation rel; - int count; + /* Use the same tablespace as the source index */ + index->tableSpace = get_tablespace_name(source_idx->rd_node.spcNode); - Assert(IsA(inh, RangeVar)); - rel = heap_openrv(inh, AccessShareLock); - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("inherited relation \"%s\" is not a table", - inh->relname))); - for (count = 0; count < rel->rd_att->natts; count++) - { - Form_pg_attribute inhattr = rel->rd_att->attrs[count]; - char *inhname = NameStr(inhattr->attname); - - if (inhattr->attisdropped) - continue; - if (strcmp(key, inhname) == 0) - { - found = true; - - /* - * We currently have no easy way to force an - * inherited column to be NOT NULL at creation, if - * its parent wasn't so already. We leave it to - * DefineIndex to fix things up in this case. - */ - break; - } - } - heap_close(rel, NoLock); - if (found) - break; - } - } + /* If it's a partial index, decompile and append the predicate */ + if (!heap_attisnull(ht_idx, Anum_pg_index_indpred)) + { + Datum pred_datum; + bool isnull; + char *pred_str; + + /* Convert text string to node tree */ + pred_datum = SysCacheGetAttr(INDEXRELID, ht_idx, + Anum_pg_index_indpred, &isnull); + Assert(!isnull); + pred_str = DatumGetCString(DirectFunctionCall1(textout, + pred_datum)); + index->whereClause = (Node *) stringToNode(pred_str); + change_varattnos_of_a_node(index->whereClause, attmap); + } - /* - * In the ALTER TABLE case, don't complain about index keys not - * created in the command; they may well exist already. - * DefineIndex will complain about them if not, and will also take - * care of marking them NOT NULL. - */ - if (!found && !cxt->isalter) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" named in key does not exist", - key))); + /* Clean up */ + ReleaseSysCache(ht_idx); + ReleaseSysCache(ht_idxrel); + ReleaseSysCache(ht_am); - /* Check for PRIMARY KEY(foo, foo) */ - foreach(columns, index->indexParams) - { - iparam = (IndexElem *) lfirst(columns); - if (iparam->name && strcmp(key, iparam->name) == 0) - { - if (index->primary) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_COLUMN), - errmsg("column \"%s\" appears twice in primary key constraint", - key))); - else - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_COLUMN), - errmsg("column \"%s\" appears twice in unique constraint", - key))); - } - } + return index; +} - /* OK, add it to the index definition */ - iparam = makeNode(IndexElem); - iparam->name = pstrdup(key); - iparam->expr = NULL; - iparam->opclass = NIL; - iparam->ordering = SORTBY_DEFAULT; - iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; - index->indexParams = lappend(index->indexParams, iparam); - } +/* + * get_opclass - fetch name of an index operator class + * + * If the opclass is the default for the given actual_datatype, then + * the return value is NIL. + */ +static List * +get_opclass(Oid opclass, Oid actual_datatype) +{ + HeapTuple ht_opc; + Form_pg_opclass opc_rec; + List *result = NIL; + + ht_opc = SearchSysCache(CLAOID, + ObjectIdGetDatum(opclass), + 0, 0, 0); + if (!HeapTupleIsValid(ht_opc)) + elog(ERROR, "cache lookup failed for opclass %u", opclass); + opc_rec = (Form_pg_opclass) GETSTRUCT(ht_opc); + + if (!OidIsValid(actual_datatype) || + GetDefaultOpClass(actual_datatype, opc_rec->opcmethod) != opclass) + { + char *nsp_name = get_namespace_name(opc_rec->opcnamespace); + char *opc_name = NameStr(opc_rec->opcname); + + result = list_make2(makeString(nsp_name), makeString(opc_name)); + } + ReleaseSysCache(ht_opc); + return result; +} + + +/* + * transformIndexConstraints + * Handle UNIQUE and PRIMARY KEY constraints, which create + * indexes. We also merge index definitions arising from + * LIKE ... INCLUDING INDEXES. + */ +static void +transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) +{ + IndexStmt *index; + List *indexlist = NIL; + ListCell *lc; + + /* + * Run through the constraints that need to generate an index. For PRIMARY + * KEY, mark each column as NOT NULL and create an index. For UNIQUE, + * create an index as for PRIMARY KEY, but do not insist on NOT NULL. + */ + foreach(lc, cxt->ixconstraints) + { + Constraint *constraint = (Constraint *) lfirst(lc); + + index = transformIndexConstraint(constraint, cxt); indexlist = lappend(indexlist, index); } @@ -867,12 +968,12 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) cxt->alist = list_make1(cxt->pkey); } - foreach(l, indexlist) + foreach(lc, indexlist) { bool keep = true; ListCell *k; - index = lfirst(l); + index = lfirst(lc); /* if it's pkey, it's already in cxt->alist */ if (index == cxt->pkey) @@ -900,6 +1001,194 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) if (keep) cxt->alist = lappend(cxt->alist, index); } + + /* Copy indexes defined by LIKE ... INCLUDING INDEXES */ + foreach(lc, cxt->inh_indexes) + { + index = (IndexStmt *) lfirst(lc); + + if (index->primary) + { + if (cxt->pkey) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + cxt->relation->relname))); + + cxt->pkey = index; + } + + cxt->alist = lappend(cxt->alist, index); + } +} + +static IndexStmt * +transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt) +{ + IndexStmt *index; + ListCell *keys; + IndexElem *iparam; + + Assert(constraint->contype == CONSTR_PRIMARY || + constraint->contype == CONSTR_UNIQUE); + + index = makeNode(IndexStmt); + index->unique = true; + index->primary = (constraint->contype == CONSTR_PRIMARY); + + if (index->primary) + { + if (cxt->pkey != NULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + cxt->relation->relname))); + cxt->pkey = index; + + /* + * In ALTER TABLE case, a primary index might already exist, but + * DefineIndex will check for it. + */ + } + index->isconstraint = true; + + if (constraint->name != NULL) + index->idxname = pstrdup(constraint->name); + else + index->idxname = NULL; /* DefineIndex will choose name */ + + index->relation = cxt->relation; + index->accessMethod = DEFAULT_INDEX_TYPE; + index->options = constraint->options; + index->tableSpace = constraint->indexspace; + index->indexParams = NIL; + index->whereClause = NULL; + index->concurrent = false; + + /* + * Make sure referenced keys exist. If we are making a PRIMARY KEY + * index, also make sure they are NOT NULL, if possible. (Although we + * could leave it to DefineIndex to mark the columns NOT NULL, it's + * more efficient to get it right the first time.) + */ + foreach(keys, constraint->keys) + { + char *key = strVal(lfirst(keys)); + bool found = false; + ColumnDef *column = NULL; + ListCell *columns; + + foreach(columns, cxt->columns) + { + column = (ColumnDef *) lfirst(columns); + Assert(IsA(column, ColumnDef)); + if (strcmp(column->colname, key) == 0) + { + found = true; + break; + } + } + if (found) + { + /* found column in the new table; force it to be NOT NULL */ + if (constraint->contype == CONSTR_PRIMARY) + column->is_not_null = TRUE; + } + else if (SystemAttributeByName(key, cxt->hasoids) != NULL) + { + /* + * column will be a system column in the new table, so accept + * it. System columns can't ever be null, so no need to worry + * about PRIMARY/NOT NULL constraint. + */ + found = true; + } + else if (cxt->inhRelations) + { + /* try inherited tables */ + ListCell *inher; + + foreach(inher, cxt->inhRelations) + { + RangeVar *inh = (RangeVar *) lfirst(inher); + Relation rel; + int count; + + Assert(IsA(inh, RangeVar)); + rel = heap_openrv(inh, AccessShareLock); + if (rel->rd_rel->relkind != RELKIND_RELATION) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("inherited relation \"%s\" is not a table", + inh->relname))); + for (count = 0; count < rel->rd_att->natts; count++) + { + Form_pg_attribute inhattr = rel->rd_att->attrs[count]; + char *inhname = NameStr(inhattr->attname); + + if (inhattr->attisdropped) + continue; + if (strcmp(key, inhname) == 0) + { + found = true; + + /* + * We currently have no easy way to force an + * inherited column to be NOT NULL at creation, if + * its parent wasn't so already. We leave it to + * DefineIndex to fix things up in this case. + */ + break; + } + } + heap_close(rel, NoLock); + if (found) + break; + } + } + + /* + * In the ALTER TABLE case, don't complain about index keys not + * created in the command; they may well exist already. + * DefineIndex will complain about them if not, and will also take + * care of marking them NOT NULL. + */ + if (!found && !cxt->isalter) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" named in key does not exist", + key))); + + /* Check for PRIMARY KEY(foo, foo) */ + foreach(columns, index->indexParams) + { + iparam = (IndexElem *) lfirst(columns); + if (iparam->name && strcmp(key, iparam->name) == 0) + { + if (index->primary) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_COLUMN), + errmsg("column \"%s\" appears twice in primary key constraint", + key))); + else + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_COLUMN), + errmsg("column \"%s\" appears twice in unique constraint", + key))); + } + } + + /* OK, add it to the index definition */ + iparam = makeNode(IndexElem); + iparam->name = pstrdup(key); + iparam->expr = NULL; + iparam->opclass = NIL; + iparam->ordering = SORTBY_DEFAULT; + iparam->nulls_ordering = SORTBY_NULLS_DEFAULT; + index->indexParams = lappend(index->indexParams, iparam); + } + + return index; } /* @@ -1376,6 +1665,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; + cxt.inh_indexes = NIL; cxt.blist = NIL; cxt.alist = NIL; cxt.pkey = NULL; -- cgit v1.2.3