diff options
Diffstat (limited to 'src/backend/parser/parse_utilcmd.c')
-rw-r--r-- | src/backend/parser/parse_utilcmd.c | 240 |
1 files changed, 157 insertions, 83 deletions
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 46d2803aa84..5c8a64d52e1 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -85,7 +85,6 @@ typedef struct List *ckconstraints; /* CHECK constraints */ List *fkconstraints; /* FOREIGN KEY constraints */ List *ixconstraints; /* index-creating constraints */ - List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */ List *extstats; /* cloned extended statistics */ List *blist; /* "before list" of things to do before * creating the table */ @@ -150,6 +149,9 @@ static Const *transformPartitionBoundValue(ParseState *pstate, A_Const *con, * Returns a List of utility commands to be done in sequence. One of these * will be the transformed CreateStmt, but there may be additional actions * to be done before and after the actual DefineRelation() call. + * In addition to normal utility commands such as AlterTableStmt and + * IndexStmt, the result list may contain TableLikeClause(s), representing + * the need to perform additional parse analysis after DefineRelation(). * * SQL allows constraints to be scattered all over, so thumb through * the columns and collect all constraints into one place. @@ -238,7 +240,6 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; - cxt.inh_indexes = NIL; cxt.extstats = NIL; cxt.blist = NIL; cxt.alist = NIL; @@ -888,8 +889,11 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint) * transformTableLikeClause * * Change the LIKE <srctable> portion of a CREATE TABLE statement into - * column definitions which recreate the user defined column portions of - * <srctable>. + * column definitions that recreate the user defined column portions of + * <srctable>. Also, if there are any LIKE options that we can't fully + * process at this point, add the TableLikeClause to cxt->alist, which + * will cause utility.c to call expandTableLikeClause() after the new + * table has been created. */ static void transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_clause) @@ -898,7 +902,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla Relation relation; TupleDesc tupleDesc; TupleConstr *constr; - AttrNumber *attmap; AclResult aclresult; char *comment; ParseCallbackState pcbstate; @@ -912,6 +915,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("LIKE is not supported for creating foreign tables"))); + /* Open the relation referenced by the LIKE clause */ relation = relation_openrv(table_like_clause->relation, AccessShareLock); if (relation->rd_rel->relkind != RELKIND_RELATION && @@ -951,14 +955,9 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla constr = tupleDesc->constr; /* - * Initialize column number map for map_variable_attnos(). We need this - * since dropped columns in the source table aren't copied, so the new - * table can have different column numbers. - */ - attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * tupleDesc->natts); - - /* * Insert the copied attributes into the cxt for the new table definition. + * We must do this now so that they appear in the table in the relative + * position where the LIKE clause is, as required by SQL99. */ for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) @@ -969,7 +968,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla ColumnDef *def; /* - * Ignore dropped columns in the parent. attmap entry is left zero. + * Ignore dropped columns in the parent. */ if (attribute->attisdropped) continue; @@ -1001,8 +1000,6 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla */ cxt->columns = lappend(cxt->columns, def); - attmap[parent_attno - 1] = list_length(cxt->columns); - /* * Copy default, if present and the default has been requested */ @@ -1083,21 +1080,125 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla cxt->hasoids |= relation->rd_rel->relhasoids; /* + * We cannot yet deal with CHECK constraints or indexes, since we don't + * yet know what column numbers the copied columns will have in the + * finished table. If any of those options are specified, add the LIKE + * clause to cxt->alist so that expandTableLikeClause will be called after + * we do know that. + */ + if (table_like_clause->options & + (CREATE_TABLE_LIKE_CONSTRAINTS | + CREATE_TABLE_LIKE_INDEXES)) + cxt->alist = lappend(cxt->alist, table_like_clause); + + /* + * We may copy extended statistics if requested, since the representation + * of CreateStatsStmt doesn't depend on column numbers. + */ + if (table_like_clause->options & CREATE_TABLE_LIKE_STATISTICS) + { + List *parent_extstats; + ListCell *l; + + parent_extstats = RelationGetStatExtList(relation); + + foreach(l, parent_extstats) + { + Oid parent_stat_oid = lfirst_oid(l); + CreateStatsStmt *stats_stmt; + + stats_stmt = generateClonedExtStatsStmt(cxt->relation, + RelationGetRelid(relation), + parent_stat_oid); + + /* Copy comment on statistics object, if requested */ + if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) + { + comment = GetComment(parent_stat_oid, StatisticExtRelationId, 0); + + /* + * We make use of CreateStatsStmt's stxcomment option, so as + * not to need to know now what name the statistics will have. + */ + stats_stmt->stxcomment = comment; + } + + cxt->extstats = lappend(cxt->extstats, stats_stmt); + } + + list_free(parent_extstats); + } + + /* + * Close the parent rel, but keep our AccessShareLock on it until xact + * commit. That will prevent someone else from deleting or ALTERing the + * parent before we can run expandTableLikeClause. + */ + heap_close(relation, NoLock); +} + +/* + * expandTableLikeClause + * + * Process LIKE options that require knowing the final column numbers + * assigned to the new table's columns. This executes after we have + * run DefineRelation for the new table. It returns a list of utility + * commands that should be run to generate indexes etc. + */ +List * +expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause) +{ + List *result = NIL; + List *atsubcmds = NIL; + Relation relation; + Relation childrel; + TupleDesc tupleDesc; + TupleConstr *constr; + AttrNumber *attmap; + char *comment; + + /* + * Open the relation referenced by the LIKE clause. We should still have + * the table lock obtained by transformTableLikeClause (and this'll throw + * an assertion failure if not). Hence, no need to recheck privileges + * etc. + */ + relation = relation_openrv(table_like_clause->relation, NoLock); + + tupleDesc = RelationGetDescr(relation); + constr = tupleDesc->constr; + + /* + * Open the newly-created child relation; we have lock on that too. + */ + childrel = relation_openrv(heapRel, NoLock); + + /* + * Construct a map from the LIKE relation's attnos to the child rel's. + * This re-checks type match etc, although it shouldn't be possible to + * have a failure since both tables are locked. + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(childrel), + tupleDesc, + gettext_noop("could not convert row type")); + + /* * Copy CHECK constraints if requested, being careful to adjust attribute * numbers so they match the child. */ if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) && - tupleDesc->constr) + constr != NULL) { int ccnum; - for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++) + for (ccnum = 0; ccnum < constr->num_check; ccnum++) { - char *ccname = tupleDesc->constr->check[ccnum].ccname; - char *ccbin = tupleDesc->constr->check[ccnum].ccbin; - Constraint *n = makeNode(Constraint); + char *ccname = constr->check[ccnum].ccname; + char *ccbin = constr->check[ccnum].ccbin; Node *ccbin_node; bool found_whole_row; + Constraint *n; + AlterTableCmd *atsubcmd; ccbin_node = map_variable_attnos(stringToNode(ccbin), 1, 0, @@ -1118,12 +1219,21 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla ccname, RelationGetRelationName(relation)))); + n = makeNode(Constraint); n->contype = CONSTR_CHECK; n->location = -1; n->conname = pstrdup(ccname); n->raw_expr = NULL; n->cooked_expr = nodeToString(ccbin_node); - cxt->ckconstraints = lappend(cxt->ckconstraints, n); + + /* We can skip validation, since the new table should be empty. */ + n->skip_validation = true; + n->initially_valid = true; + + atsubcmd = makeNode(AlterTableCmd); + atsubcmd->subtype = AT_AddConstraint; + atsubcmd->def = (Node *) n; + atsubcmds = lappend(atsubcmds, atsubcmd); /* Copy comment on constraint */ if ((table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) && @@ -1135,18 +1245,34 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla CommentStmt *stmt = makeNode(CommentStmt); stmt->objtype = OBJECT_TABCONSTRAINT; - stmt->object = (Node *) list_make3(makeString(cxt->relation->schemaname), - makeString(cxt->relation->relname), + stmt->object = (Node *) list_make3(makeString(heapRel->schemaname), + makeString(heapRel->relname), makeString(n->conname)); stmt->comment = comment; - cxt->alist = lappend(cxt->alist, stmt); + result = lappend(result, stmt); } } } /* - * Likewise, copy indexes if requested + * If we generated any ALTER TABLE actions above, wrap them into a single + * ALTER TABLE command. Stick it at the front of the result, so it runs + * before any CommentStmts we made above. + */ + if (atsubcmds) + { + AlterTableStmt *atcmd = makeNode(AlterTableStmt); + + atcmd->relation = copyObject(heapRel); + atcmd->cmds = atsubcmds; + atcmd->relkind = OBJECT_TABLE; + atcmd->missing_ok = false; + result = lcons(atcmd, result); + } + + /* + * Process indexes if required. */ if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) && relation->rd_rel->relhasindex) @@ -1165,7 +1291,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla parent_index = index_open(parent_index_oid, AccessShareLock); /* Build CREATE INDEX statement to recreate the parent_index */ - index_stmt = generateClonedIndexStmt(cxt->relation, InvalidOid, + index_stmt = generateClonedIndexStmt(heapRel, InvalidOid, parent_index, attmap, tupleDesc->natts, NULL); @@ -1181,49 +1307,14 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla index_stmt->idxcomment = comment; } - /* Save it in the inh_indexes list for the time being */ - cxt->inh_indexes = lappend(cxt->inh_indexes, index_stmt); + result = lappend(result, index_stmt); index_close(parent_index, AccessShareLock); } } - /* - * Likewise, copy extended statistics if requested - */ - if (table_like_clause->options & CREATE_TABLE_LIKE_STATISTICS) - { - List *parent_extstats; - ListCell *l; - - parent_extstats = RelationGetStatExtList(relation); - - foreach(l, parent_extstats) - { - Oid parent_stat_oid = lfirst_oid(l); - CreateStatsStmt *stats_stmt; - - stats_stmt = generateClonedExtStatsStmt(cxt->relation, - RelationGetRelid(relation), - parent_stat_oid); - - /* Copy comment on statistics object, if requested */ - if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS) - { - comment = GetComment(parent_stat_oid, StatisticExtRelationId, 0); - - /* - * We make use of CreateStatsStmt's stxcomment option, so as - * not to need to know now what name the statistics will have. - */ - stats_stmt->stxcomment = comment; - } - - cxt->extstats = lappend(cxt->extstats, stats_stmt); - } - - list_free(parent_extstats); - } + /* Done with child rel */ + heap_close(childrel, NoLock); /* * Close the parent rel, but keep our AccessShareLock on it until xact @@ -1231,6 +1322,8 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla * parent before the child is committed. */ heap_close(relation, NoLock); + + return result; } static void @@ -1810,24 +1903,6 @@ transformIndexConstraints(CreateStmtContext *cxt) indexlist = lappend(indexlist, index); } - /* Add in any indexes defined by LIKE ... INCLUDING INDEXES */ - foreach(lc, cxt->inh_indexes) - { - index = (IndexStmt *) lfirst(lc); - - if (index->primary) - { - if (cxt->pkey != NULL) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("multiple primary keys for table \"%s\" are not allowed", - cxt->relation->relname))); - cxt->pkey = index; - } - - indexlist = lappend(indexlist, index); - } - /* * Scan the index list and remove any redundant index specifications. This * can happen if, for instance, the user writes UNIQUE PRIMARY KEY. A @@ -2962,7 +3037,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; - cxt.inh_indexes = NIL; cxt.extstats = NIL; cxt.blist = NIL; cxt.alist = NIL; |