diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2004-05-05 04:48:48 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2004-05-05 04:48:48 +0000 |
commit | 077db40fa1f3ef93a60d995cc5b2666474f81302 (patch) | |
tree | 911e14b79a368b10ad9fc267fa69f299e86b15f9 /src/backend/commands | |
parent | 3e3cb0a14a608bb058407b6349c3c9e2d09e13b8 (diff) | |
download | postgresql-077db40fa1f3ef93a60d995cc5b2666474f81302.tar.gz postgresql-077db40fa1f3ef93a60d995cc5b2666474f81302.zip |
ALTER TABLE rewrite. New cool stuff:
* ALTER ... ADD COLUMN with defaults and NOT NULL constraints works per SQL
spec. A default is implemented by rewriting the table with the new value
stored in each row.
* ALTER COLUMN TYPE. You can change a column's datatype to anything you
want, so long as you can specify how to convert the old value. Rewrites
the table. (Possible future improvement: optimize no-op conversions such
as varchar(N) to varchar(N+1).)
* Multiple ALTER actions in a single ALTER TABLE command. You can perform
any number of column additions, type changes, and constraint additions with
only one pass over the table contents.
Basic documentation provided in ALTER TABLE ref page, but some more docs
work is needed.
Original patch from Rod Taylor, additional work from Tom Lane.
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/cluster.c | 81 | ||||
-rw-r--r-- | src/backend/commands/indexcmds.c | 246 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 3071 |
3 files changed, 2329 insertions, 1069 deletions
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index a10bbbd746c..b4bf08006eb 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.120 2004/03/23 19:35:16 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.121 2004/05/05 04:48:45 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -64,11 +64,7 @@ typedef struct static void cluster_rel(RelToCluster *rv, bool recheck); -static Oid make_new_heap(Oid OIDOldHeap, const char *NewName); static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); -static List *get_indexattr_list(Relation OldHeap, Oid OldIndex); -static void rebuild_indexes(Oid OIDOldHeap, List *indexes); -static void swap_relfilenodes(Oid r1, Oid r2); static List *get_tables_to_cluster(MemoryContext cluster_context); @@ -479,7 +475,7 @@ rebuild_relation(Relation OldHeap, Oid indexOid) /* * Create the new table that we will fill with correctly-ordered data. */ -static Oid +Oid make_new_heap(Oid OIDOldHeap, const char *NewName) { TupleDesc OldHeapDesc, @@ -578,7 +574,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) * Get the necessary info about the indexes of the relation and * return a list of IndexAttrs structures. */ -static List * +List * get_indexattr_list(Relation OldHeap, Oid OldIndex) { List *indexes = NIL; @@ -621,7 +617,7 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) * Create new indexes and swap the filenodes with old indexes. Then drop * the new index (carrying the old index filenode along). */ -static void +void rebuild_indexes(Oid OIDOldHeap, List *indexes) { List *elem; @@ -646,10 +642,15 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes) * matter: after the filenode swap the index will keep the * constraint status of the old index. */ - newIndexOID = index_create(OIDOldHeap, newIndexName, - attrs->indexInfo, attrs->accessMethodOID, - attrs->classOID, false, - false, allowSystemTableMods); + newIndexOID = index_create(OIDOldHeap, + newIndexName, + attrs->indexInfo, + attrs->accessMethodOID, + attrs->classOID, + false, + false, + allowSystemTableMods, + false); CommandCounterIncrement(); /* Swap the filenodes. */ @@ -698,7 +699,7 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes) * Also swap any TOAST links, so that the toast data moves along with * the main-table data. */ -static void +void swap_relfilenodes(Oid r1, Oid r2) { Relation relRelation, @@ -789,9 +790,9 @@ swap_relfilenodes(Oid r1, Oid r2) * their new owning relations. Otherwise the wrong one will get * dropped ... * - * NOTE: for now, we can assume the new table will have a TOAST table if - * and only if the old one does. This logic might need work if we get - * smarter about dropped columns. + * NOTE: it is possible that only one table has a toast table; this + * can happen in CLUSTER if there were dropped columns in the old table, + * and in ALTER TABLE when adding or changing type of columns. * * NOTE: at present, a TOAST table's only dependency is the one on its * owning table. If more are ever created, we'd need to use something @@ -804,35 +805,43 @@ swap_relfilenodes(Oid r1, Oid r2) toastobject; long count; - if (!(relform1->reltoastrelid && relform2->reltoastrelid)) - elog(ERROR, "expected both swapped tables to have TOAST tables"); - /* Delete old dependencies */ - count = deleteDependencyRecordsFor(RelOid_pg_class, - relform1->reltoastrelid); - if (count != 1) - elog(ERROR, "expected one dependency record for TOAST table, found %ld", - count); - count = deleteDependencyRecordsFor(RelOid_pg_class, - relform2->reltoastrelid); - if (count != 1) - elog(ERROR, "expected one dependency record for TOAST table, found %ld", - count); + if (relform1->reltoastrelid) + { + count = deleteDependencyRecordsFor(RelOid_pg_class, + relform1->reltoastrelid); + if (count != 1) + elog(ERROR, "expected one dependency record for TOAST table, found %ld", + count); + } + if (relform2->reltoastrelid) + { + count = deleteDependencyRecordsFor(RelOid_pg_class, + relform2->reltoastrelid); + if (count != 1) + elog(ERROR, "expected one dependency record for TOAST table, found %ld", + count); + } /* Register new dependencies */ baseobject.classId = RelOid_pg_class; - baseobject.objectId = r1; baseobject.objectSubId = 0; toastobject.classId = RelOid_pg_class; - toastobject.objectId = relform1->reltoastrelid; toastobject.objectSubId = 0; - recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL); - - baseobject.objectId = r2; - toastobject.objectId = relform2->reltoastrelid; + if (relform1->reltoastrelid) + { + baseobject.objectId = r1; + toastobject.objectId = relform1->reltoastrelid; + recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL); + } - recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL); + if (relform2->reltoastrelid) + { + baseobject.objectId = r2; + toastobject.objectId = relform2->reltoastrelid; + recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL); + } } /* diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 507bcf50d5f..efcc70c6856 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.117 2003/12/28 21:57:36 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.118 2004/05/05 04:48:45 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #include "miscadmin.h" #include "optimizer/clauses.h" #include "optimizer/prep.h" +#include "parser/analyze.h" #include "parser/parsetree.h" #include "parser/parse_coerce.h" #include "parser/parse_expr.h" @@ -38,38 +39,62 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/relcache.h" #include "utils/syscache.h" /* non-export function prototypes */ static void CheckPredicate(Expr *predicate); static void ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP, - List *attList, - Oid relId, - char *accessMethodName, Oid accessMethodId); + List *attList, + Oid relId, + char *accessMethodName, Oid accessMethodId, + bool isconstraint); static Oid GetIndexOpClass(List *opclass, Oid attrType, char *accessMethodName, Oid accessMethodId); static Oid GetDefaultOpClass(Oid attrType, Oid accessMethodId); +static char *CreateIndexName(const char *table_name, const char *column_name, + const char *label, Oid inamespace); +static bool relationHasPrimaryKey(Relation rel); + /* * DefineIndex * Creates a new index. * - * 'attributeList' is a list of IndexElem specifying columns and expressions + * 'heapRelation': the relation the index will apply to. + * 'indexRelationName': the name for the new index, or NULL to indicate + * that a nonconflicting default name should be picked. + * 'accessMethodName': name of the AM to use. + * 'attributeList': a list of IndexElem specifying columns and expressions * to index on. - * 'predicate' is the qual specified in the where clause. - * 'rangetable' is needed to interpret the predicate. + * 'predicate': the partial-index condition, or NULL if none. + * 'rangetable': needed to interpret the predicate. + * 'unique': make the index enforce uniqueness. + * 'primary': mark the index as a primary key in the catalogs. + * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint, + * so build a pg_constraint entry for it. + * 'is_alter_table': this is due to an ALTER rather than a CREATE operation. + * 'check_rights': check for CREATE rights in the namespace. (This should + * be true except when ALTER is deleting/recreating an index.) + * 'skip_build': make the catalog entries but leave the index file empty; + * it will be filled later. + * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints. */ void DefineIndex(RangeVar *heapRelation, char *indexRelationName, char *accessMethodName, List *attributeList, + Expr *predicate, + List *rangetable, bool unique, bool primary, bool isconstraint, - Expr *predicate, - List *rangetable) + bool is_alter_table, + bool check_rights, + bool skip_build, + bool quiet) { Oid *classObjectId; Oid accessMethodId; @@ -111,15 +136,13 @@ DefineIndex(RangeVar *heapRelation, relationId = RelationGetRelid(rel); namespaceId = RelationGetNamespace(rel); - heap_close(rel, NoLock); - /* * Verify we (still) have CREATE rights in the rel's namespace. * (Presumably we did when the rel was created, but maybe not - * anymore.) Skip check if bootstrapping, since permissions machinery - * may not be working yet. + * anymore.) Skip check if caller doesn't want it. Also skip check + * if bootstrapping, since permissions machinery may not be working yet. */ - if (!IsBootstrapProcessingMode()) + if (check_rights && !IsBootstrapProcessingMode()) { AclResult aclresult; @@ -131,6 +154,27 @@ DefineIndex(RangeVar *heapRelation, } /* + * Select name for index if caller didn't specify + */ + if (indexRelationName == NULL) + { + if (primary) + indexRelationName = CreateIndexName(RelationGetRelationName(rel), + NULL, + "pkey", + namespaceId); + else + { + IndexElem *iparam = (IndexElem *) lfirst(attributeList); + + indexRelationName = CreateIndexName(RelationGetRelationName(rel), + iparam->name, + "key", + namespaceId); + } + } + + /* * look up the access method, verify it can handle the requested * features */ @@ -177,13 +221,33 @@ DefineIndex(RangeVar *heapRelation, CheckPredicate(predicate); /* - * Check that all of the attributes in a primary key are marked as not - * null, otherwise attempt to ALTER TABLE .. SET NOT NULL + * Extra checks when creating a PRIMARY KEY index. */ if (primary) { + List *cmds; List *keys; + /* + * If ALTER TABLE, check that there isn't already a PRIMARY KEY. + * In CREATE TABLE, we have faith that the parser rejected multiple + * pkey clauses; and CREATE INDEX doesn't have a way to say + * PRIMARY KEY, so it's no problem either. + */ + if (is_alter_table && + relationHasPrimaryKey(rel)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("multiple primary keys for table \"%s\" are not allowed", + RelationGetRelationName(rel)))); + } + + /* + * Check that all of the attributes in a primary key are marked as not + * null, otherwise attempt to ALTER TABLE .. SET NOT NULL + */ + cmds = NIL; foreach(keys, attributeList) { IndexElem *key = (IndexElem *) lfirst(keys); @@ -203,29 +267,43 @@ DefineIndex(RangeVar *heapRelation, { if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull) { - /* - * Try to make it NOT NULL. - * - * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade - * to child tables? Currently, since the PRIMARY KEY - * itself doesn't cascade, we don't cascade the - * notnull constraint either; but this is pretty - * debatable. - */ - AlterTableAlterColumnSetNotNull(relationId, false, - key->name); + /* Add a subcommand to make this one NOT NULL */ + AlterTableCmd *cmd = makeNode(AlterTableCmd); + + cmd->subtype = AT_SetNotNull; + cmd->name = key->name; + + cmds = lappend(cmds, cmd); } ReleaseSysCache(atttuple); } else { - /* This shouldn't happen if parser did its job ... */ + /* + * This shouldn't happen during CREATE TABLE, but can + * happen during ALTER TABLE. Keep message in sync with + * transformIndexConstraints() in parser/analyze.c. + */ ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" named in key does not exist", key->name))); } } + + /* + * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade + * to child tables? Currently, since the PRIMARY KEY + * itself doesn't cascade, we don't cascade the + * notnull constraint(s) either; but this is pretty debatable. + * + * XXX: possible future improvement: when being called from + * ALTER TABLE, it would be more efficient to merge this with + * the outer ALTER TABLE, so as to avoid two scans. But that + * seems to complicate DefineIndex's API unduly. + */ + if (cmds) + AlterTableInternal(relationId, cmds, false); } /* @@ -242,11 +320,26 @@ DefineIndex(RangeVar *heapRelation, classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid)); ComputeIndexAttrs(indexInfo, classObjectId, attributeList, - relationId, accessMethodName, accessMethodId); + relationId, accessMethodName, accessMethodId, + isconstraint); + + heap_close(rel, NoLock); + + /* + * Report index creation if appropriate (delay this till after most + * of the error checks) + */ + if (isconstraint && !quiet) + ereport(NOTICE, + (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"", + is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /", + primary ? "PRIMARY KEY" : "UNIQUE", + indexRelationName, RelationGetRelationName(rel)))); index_create(relationId, indexRelationName, indexInfo, accessMethodId, classObjectId, - primary, isconstraint, allowSystemTableMods); + primary, isconstraint, + allowSystemTableMods, skip_build); /* * We update the relation's pg_class tuple even if it already has @@ -303,7 +396,8 @@ ComputeIndexAttrs(IndexInfo *indexInfo, List *attList, /* list of IndexElem's */ Oid relId, char *accessMethodName, - Oid accessMethodId) + Oid accessMethodId, + bool isconstraint) { List *rest; int attn = 0; @@ -325,10 +419,19 @@ ComputeIndexAttrs(IndexInfo *indexInfo, Assert(attribute->expr == NULL); atttuple = SearchSysCacheAttName(relId, attribute->name); if (!HeapTupleIsValid(atttuple)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" does not exist", - attribute->name))); + { + /* difference in error message spellings is historical */ + if (isconstraint) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" named in key does not exist", + attribute->name))); + else + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" does not exist", + attribute->name))); + } attform = (Form_pg_attribute) GETSTRUCT(atttuple); indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum; atttype = attform->atttypid; @@ -554,6 +657,79 @@ GetDefaultOpClass(Oid attrType, Oid accessMethodId) } /* + * Select a nonconflicting name for an index. + */ +static char * +CreateIndexName(const char *table_name, const char *column_name, + const char *label, Oid inamespace) +{ + int pass = 0; + char *iname = NULL; + char typename[NAMEDATALEN]; + + /* + * The type name for makeObjectName is label, or labelN if that's + * necessary to prevent collision with existing indexes. + */ + strncpy(typename, label, sizeof(typename)); + + for (;;) + { + iname = makeObjectName(table_name, column_name, typename); + + if (!OidIsValid(get_relname_relid(iname, inamespace))) + break; + + /* found a conflict, so try a new name component */ + pfree(iname); + snprintf(typename, sizeof(typename), "%s%d", label, ++pass); + } + + return iname; +} + +/* + * relationHasPrimaryKey - + * + * See whether an existing relation has a primary key. + */ +static bool +relationHasPrimaryKey(Relation rel) +{ + bool result = false; + List *indexoidlist, + *indexoidscan; + + /* + * Get the list of index OIDs for the table from the relcache, and + * look up each one in the pg_index syscache until we find one marked + * primary key (hopefully there isn't more than one such). + */ + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirsto(indexoidscan); + HeapTuple indexTuple; + + indexTuple = SearchSysCache(INDEXRELID, + ObjectIdGetDatum(indexoid), + 0, 0, 0); + if (!HeapTupleIsValid(indexTuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", indexoid); + result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary; + ReleaseSysCache(indexTuple); + if (result) + break; + } + + freeList(indexoidlist); + + return result; +} + + +/* * RemoveIndex * Deletes an index. */ diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index a6e3a93d349..a9c307b80c3 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.102 2004/04/01 21:28:44 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.103 2004/05/05 04:48:45 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -24,27 +24,33 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_constraint.h" +#include "catalog/pg_depend.h" #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" #include "commands/cluster.h" +#include "commands/defrem.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "lib/stringinfo.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/clauses.h" #include "optimizer/plancat.h" #include "optimizer/prep.h" +#include "parser/analyze.h" #include "parser/gramparse.h" +#include "parser/parser.h" #include "parser/parse_clause.h" #include "parser/parse_coerce.h" #include "parser/parse_expr.h" #include "parser/parse_oper.h" #include "parser/parse_relation.h" #include "parser/parse_type.h" +#include "rewrite/rewriteHandler.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -76,6 +82,83 @@ typedef struct OnCommitItem static List *on_commits = NIL; +/* + * State information for ALTER TABLE + * + * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo + * structs, one for each table modified by the operation (the named table + * plus any child tables that are affected). We save lists of subcommands + * to apply to this table (possibly modified by parse transformation steps); + * these lists will be executed in Phase 2. If a Phase 3 step is needed, + * necessary information is stored in the constraints and newvals lists. + * + * Phase 2 is divided into multiple passes; subcommands are executed in + * a pass determined by subcommand type. + */ + +#define AT_PASS_DROP 0 /* DROP (all flavors) */ +#define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */ +#define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */ +#define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */ +#define AT_PASS_COL_ATTRS 4 /* set other column attributes */ +/* We could support a RENAME COLUMN pass here, but not currently used */ +#define AT_PASS_ADD_COL 5 /* ADD COLUMN */ +#define AT_PASS_ADD_INDEX 6 /* ADD indexes */ +#define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */ +#define AT_PASS_MISC 8 /* other stuff */ +#define AT_NUM_PASSES 9 + +typedef struct AlteredTableInfo +{ + /* Information saved before any work commences: */ + Oid relid; /* Relation to work on */ + TupleDesc oldDesc; /* Pre-modification tuple descriptor */ + /* Information saved by Phase 1 for Phase 2: */ + List *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */ + /* Information saved by Phases 1/2 for Phase 3: */ + List *constraints; /* List of NewConstraint */ + List *newvals; /* List of NewColumnValue */ + /* Objects to rebuild after completing ALTER TYPE operations */ + List *changedConstraintOids; /* OIDs of constraints to rebuild */ + List *changedConstraintDefs; /* string definitions of same */ + List *changedIndexOids; /* OIDs of indexes to rebuild */ + List *changedIndexDefs; /* string definitions of same */ + /* Workspace for ATExecAddConstraint */ + int constr_name_ctr; +} AlteredTableInfo; + +/* Struct describing one new constraint to check in Phase 3 scan */ +typedef struct NewConstraint +{ + char *name; /* Constraint name, or NULL if none */ + ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */ + AttrNumber attnum; /* only relevant for NOT_NULL */ + Oid refrelid; /* PK rel, if FOREIGN */ + Node *qual; /* Check expr or FkConstraint struct */ + List *qualstate; /* Execution state for CHECK */ +} NewConstraint; + +/* + * Struct describing one new column value that needs to be computed during + * Phase 3 copy (this could be either a new column with a non-null default, or + * a column that we're changing the type of). Columns without such an entry + * are just copied from the old table during ATRewriteTable. Note that the + * expr is an expression over *old* table values. + */ +typedef struct NewColumnValue +{ + AttrNumber attnum; /* which column */ + Expr *expr; /* expression to compute */ + ExprState *exprstate; /* execution state */ +} NewColumnValue; + + +/* Used by attribute and relation renaming routines: */ +#define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */ +#define RI_TRIGGER_FK 2 /* is a trigger on the FK relation */ +#define RI_TRIGGER_NONE 0 /* is not an RI trigger function */ + + static List *MergeAttributes(List *schema, List *supers, bool istemp, List **supOids, List **supconstr, int *supOidCount); static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); @@ -83,9 +166,6 @@ static void StoreCatalogInheritance(Oid relationId, List *supers); static int findAttrByName(const char *attributeName, List *schema); static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass); static bool needs_toast_table(Relation rel); -static void AlterTableAddCheckConstraint(Relation rel, Constraint *constr); -static void AlterTableAddForeignKeyConstraint(Relation rel, - FkConstraint *fkconstraint); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, @@ -100,13 +180,58 @@ static void validateForeignKeyConstraint(FkConstraint *fkconstraint, static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, Oid constrOid); static char *fkMatchTypeToString(char match_type); - -/* Used by attribute and relation renaming routines: */ - -#define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */ -#define RI_TRIGGER_FK 2 /* is a trigger on the FK relation */ -#define RI_TRIGGER_NONE 0 /* is not an RI trigger function */ - +static void ATController(Relation rel, List *cmds, bool recurse); +static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, + bool recurse, bool recursing); +static void ATRewriteCatalogs(List **wqueue); +static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd); +static void ATRewriteTables(List **wqueue); +static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap); +static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel); +static void ATSimplePermissions(Relation rel, bool allowView); +static void ATSimpleRecursion(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse); +static void ATOneLevelRecursion(List **wqueue, Relation rel, + AlterTableCmd *cmd); +static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, + AlterTableCmd *cmd); +static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel, + ColumnDef *colDef); +static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); +static void ATExecDropNotNull(Relation rel, const char *colName); +static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName); +static void ATExecColumnDefault(Relation rel, const char *colName, + Node *newDefault); +static void ATPrepSetStatistics(Relation rel, const char *colName, + Node *flagValue); +static void ATExecSetStatistics(Relation rel, const char *colName, + Node *newValue); +static void ATExecSetStorage(Relation rel, const char *colName, + Node *newValue); +static void ATExecDropColumn(Relation rel, const char *colName, + DropBehavior behavior, + bool recurse, bool recursing); +static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel, + IndexStmt *stmt, bool is_rebuild); +static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, + Node *newConstraint); +static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, + FkConstraint *fkconstraint); +static void ATPrepDropConstraint(List **wqueue, Relation rel, + bool recurse, AlterTableCmd *cmd); +static void ATExecDropConstraint(Relation rel, const char *constrName, + DropBehavior behavior, bool quiet); +static void ATPrepAlterColumnType(List **wqueue, + AlteredTableInfo *tab, Relation rel, + bool recurse, bool recursing, + AlterTableCmd *cmd); +static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, + const char *colName, TypeName *typename); +static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab); +static void ATPostAlterTypeParse(char *cmd, List **wqueue); +static void ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId); +static void ATExecClusterOn(Relation rel, const char *indexName); static int ri_trigger_type(Oid tgfoid); static void update_ri_trigger_args(Oid relid, const char *oldname, @@ -1100,7 +1225,7 @@ renameatt(Oid myrelid, if (childrelid == myrelid) continue; - /* note we need not recurse again! */ + /* note we need not recurse again */ renameatt(childrelid, oldattname, newattname, false, true); } } @@ -1129,7 +1254,7 @@ renameatt(Oid myrelid, attform = (Form_pg_attribute) GETSTRUCT(atttup); attnum = attform->attnum; - if (attnum < 0) + if (attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot rename system column \"%s\"", @@ -1240,8 +1365,7 @@ renameatt(Oid myrelid, true, false); } - relation_close(targetrelation, NoLock); /* close rel but keep - * lock! */ + relation_close(targetrelation, NoLock); /* close rel but keep lock */ } /* @@ -1559,55 +1683,803 @@ update_ri_trigger_args(Oid relid, CommandCounterIncrement(); } +/* + * AlterTable + * Execute ALTER TABLE, which can be a list of subcommands + * + * ALTER TABLE is performed in three phases: + * 1. Examine subcommands and perform pre-transformation checking. + * 2. Update system catalogs. + * 3. Scan table(s) to check new constraints, and optionally recopy + * the data into new table(s). + * Phase 3 is not performed unless one or more of the subcommands requires + * it. The intention of this design is to allow multiple independent + * updates of the table schema to be performed with only one pass over the + * data. + * + * ATPrepCmd performs phase 1. A "work queue" entry is created for + * each table to be affected (there may be multiple affected tables if the + * commands traverse a table inheritance hierarchy). Also we do preliminary + * validation of the subcommands, including parse transformation of those + * expressions that need to be evaluated with respect to the old table + * schema. + * + * ATRewriteCatalogs performs phase 2 for each affected table (note that + * phases 2 and 3 do no explicit recursion, since phase 1 already did it). + * Certain subcommands need to be performed before others to avoid + * unnecessary conflicts; for example, DROP COLUMN should come before + * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple + * lists, one for each logical "pass" of phase 2. + * + * ATRewriteTables performs phase 3 for those tables that need it. + * + * Thanks to the magic of MVCC, an error anywhere along the way rolls back + * the whole operation; we don't have to do anything special to clean up. + */ +void +AlterTable(AlterTableStmt *stmt) +{ + ATController(relation_openrv(stmt->relation, AccessExclusiveLock), + stmt->cmds, + interpretInhOption(stmt->relation->inhOpt)); +} -/* ---------------- - * AlterTableAddColumn - * (formerly known as PerformAddAttribute) +/* + * AlterTableInternal * - * adds an additional attribute to a relation - * ---------------- + * ALTER TABLE with target specified by OID */ void -AlterTableAddColumn(Oid myrelid, - bool recurse, - ColumnDef *colDef) +AlterTableInternal(Oid relid, List *cmds, bool recurse) { - Relation rel, - pgclass, - attrdesc; - HeapTuple reltup; - HeapTuple newreltup; - HeapTuple attributeTuple; - Form_pg_attribute attribute; - FormData_pg_attribute attributeD; + ATController(relation_open(relid, AccessExclusiveLock), + cmds, + recurse); +} + +static void +ATController(Relation rel, List *cmds, bool recurse) +{ + List *wqueue = NIL; + List *lcmd; + + /* Phase 1: preliminary examination of commands, create work queue */ + foreach(lcmd, cmds) + { + AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd); + + ATPrepCmd(&wqueue, rel, cmd, recurse, false); + } + + /* Close the relation, but keep lock until commit */ + relation_close(rel, NoLock); + + /* Phase 2: update system catalogs */ + ATRewriteCatalogs(&wqueue); + + /* Phase 3: scan/rewrite tables as needed */ + ATRewriteTables(&wqueue); +} + +/* + * ATPrepCmd + * + * Traffic cop for ALTER TABLE Phase 1 operations, including simple + * recursion and permission checks. + * + * Caller must have acquired AccessExclusiveLock on relation already. + * This lock should be held until commit. + */ +static void +ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, + bool recurse, bool recursing) +{ + AlteredTableInfo *tab; + int pass; + + /* Find or create work queue entry for this table */ + tab = ATGetQueueEntry(wqueue, rel); + + /* + * Copy the original subcommand for each table. This avoids conflicts + * when different child tables need to make different parse + * transformations (for example, the same column may have different + * column numbers in different children). + */ + cmd = copyObject(cmd); + + /* + * Do permissions checking, recursion to child tables if needed, + * and any additional phase-1 processing needed. + */ + switch (cmd->subtype) + { + case AT_AddColumn: /* ADD COLUMN */ + ATSimplePermissions(rel, false); + /* Performs own recursion */ + ATPrepAddColumn(wqueue, rel, recurse, cmd); + pass = AT_PASS_ADD_COL; + break; + case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ + /* + * We allow defaults on views so that INSERT into a view can have + * default-ish behavior. This works because the rewriter + * substitutes default values into INSERTs before it expands + * rules. + */ + ATSimplePermissions(rel, true); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_ADD_CONSTR; + break; + case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ + ATSimplePermissions(rel, false); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_DROP; + break; + case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ + ATSimplePermissions(rel, false); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_ADD_CONSTR; + break; + case AT_SetStatistics: /* ALTER COLUMN STATISTICS */ + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* Performs own permission checks */ + ATPrepSetStatistics(rel, cmd->name, cmd->def); + pass = AT_PASS_COL_ATTRS; + break; + case AT_SetStorage: /* ALTER COLUMN STORAGE */ + ATSimplePermissions(rel, false); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_COL_ATTRS; + break; + case AT_DropColumn: /* DROP COLUMN */ + ATSimplePermissions(rel, false); + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ + if (recurse) + cmd->subtype = AT_DropColumnRecurse; + pass = AT_PASS_DROP; + break; + case AT_AddIndex: /* ADD INDEX */ + ATSimplePermissions(rel, false); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_ADD_INDEX; + break; + case AT_AddConstraint: /* ADD CONSTRAINT */ + ATSimplePermissions(rel, false); + /* + * Currently we recurse only for CHECK constraints, never for + * foreign-key constraints. UNIQUE/PKEY constraints won't be + * seen here. + */ + if (IsA(cmd->def, Constraint)) + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_ADD_CONSTR; + break; + case AT_DropConstraint: /* DROP CONSTRAINT */ + ATSimplePermissions(rel, false); + /* Performs own recursion */ + ATPrepDropConstraint(wqueue, rel, recurse, cmd); + pass = AT_PASS_DROP; + break; + case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */ + ATSimplePermissions(rel, false); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_DROP; + break; + case AT_AlterColumnType: /* ALTER COLUMN TYPE */ + ATSimplePermissions(rel, false); + /* Performs own recursion */ + ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd); + pass = AT_PASS_ALTER_TYPE; + break; + case AT_ToastTable: /* CREATE TOAST TABLE */ + ATSimplePermissions(rel, false); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; + case AT_ChangeOwner: /* ALTER OWNER */ + /* check that we are the superuser */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to alter owner"))); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; + case AT_ClusterOn: /* CLUSTER ON */ + ATSimplePermissions(rel, false); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; + case AT_DropOids: /* SET WITHOUT OIDS */ + ATSimplePermissions(rel, false); + /* Performs own recursion */ + if (rel->rd_rel->relhasoids) + { + AlterTableCmd *dropCmd = makeNode(AlterTableCmd); + + dropCmd->subtype = AT_DropColumn; + dropCmd->name = pstrdup("oid"); + dropCmd->behavior = cmd->behavior; + ATPrepCmd(wqueue, rel, dropCmd, recurse, false); + } + pass = AT_PASS_DROP; + break; + default: /* oops */ + elog(ERROR, "unrecognized alter table type: %d", + (int) cmd->subtype); + pass = 0; /* keep compiler quiet */ + break; + } + + /* Add the subcommand to the appropriate list for phase 2 */ + tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd); +} + +/* + * ATRewriteCatalogs + * + * Traffic cop for ALTER TABLE Phase 2 operations. Subcommands are + * dispatched in a "safe" execution order (designed to avoid unnecessary + * conflicts). + */ +static void +ATRewriteCatalogs(List **wqueue) +{ + int pass; + List *ltab; + + /* + * We process all the tables "in parallel", one pass at a time. This + * is needed because we may have to propagate work from one table + * to another (specifically, ALTER TYPE on a foreign key's PK has to + * dispatch the re-adding of the foreign key constraint to the other + * table). Work can only be propagated into later passes, however. + */ + for (pass = 0; pass < AT_NUM_PASSES; pass++) + { + /* Go through each table that needs to be processed */ + foreach(ltab, *wqueue) + { + AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); + List *subcmds = tab->subcmds[pass]; + Relation rel; + List *lcmd; + + if (subcmds == NIL) + continue; + + /* Exclusive lock was obtained by phase 1, needn't get it again */ + rel = relation_open(tab->relid, NoLock); + + foreach(lcmd, subcmds) + { + ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd)); + } + + /* + * After the ALTER TYPE pass, do cleanup work (this is not done in + * ATExecAlterColumnType since it should be done only once if + * multiple columns of a table are altered). + */ + if (pass == AT_PASS_ALTER_TYPE) + ATPostAlterTypeCleanup(wqueue, tab); + + relation_close(rel, NoLock); + } + } + + /* + * Do an implicit CREATE TOAST TABLE if we executed any subcommands + * that might have added a column or changed column storage. + */ + foreach(ltab, *wqueue) + { + AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); + + if (tab->subcmds[AT_PASS_ADD_COL] || + tab->subcmds[AT_PASS_ALTER_TYPE] || + tab->subcmds[AT_PASS_COL_ATTRS]) + { + AlterTableCreateToastTable(tab->relid, true); + } + } +} + +/* + * ATExecCmd: dispatch a subcommand to appropriate execution routine + */ +static void +ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd) +{ + switch (cmd->subtype) + { + case AT_AddColumn: /* ADD COLUMN */ + ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def); + break; + case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ + ATExecColumnDefault(rel, cmd->name, cmd->def); + break; + case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ + ATExecDropNotNull(rel, cmd->name); + break; + case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ + ATExecSetNotNull(tab, rel, cmd->name); + break; + case AT_SetStatistics: /* ALTER COLUMN STATISTICS */ + ATExecSetStatistics(rel, cmd->name, cmd->def); + break; + case AT_SetStorage: /* ALTER COLUMN STORAGE */ + ATExecSetStorage(rel, cmd->name, cmd->def); + break; + case AT_DropColumn: /* DROP COLUMN */ + ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false); + break; + case AT_DropColumnRecurse: /* DROP COLUMN with recursion */ + ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false); + break; + case AT_AddIndex: /* ADD INDEX */ + ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false); + break; + case AT_ReAddIndex: /* ADD INDEX */ + ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true); + break; + case AT_AddConstraint: /* ADD CONSTRAINT */ + ATExecAddConstraint(tab, rel, cmd->def); + break; + case AT_DropConstraint: /* DROP CONSTRAINT */ + ATExecDropConstraint(rel, cmd->name, cmd->behavior, false); + break; + case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */ + ATExecDropConstraint(rel, cmd->name, cmd->behavior, true); + break; + case AT_AlterColumnType: /* ALTER COLUMN TYPE */ + ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def); + break; + case AT_ToastTable: /* CREATE TOAST TABLE */ + AlterTableCreateToastTable(RelationGetRelid(rel), false); + break; + case AT_ChangeOwner: /* ALTER OWNER */ + /* get_usesysid raises an error if no such user */ + ATExecChangeOwner(RelationGetRelid(rel), get_usesysid(cmd->name)); + break; + case AT_ClusterOn: /* CLUSTER ON */ + ATExecClusterOn(rel, cmd->name); + break; + case AT_DropOids: /* SET WITHOUT OIDS */ + /* + * Nothing to do here; we'll have generated a DropColumn subcommand + * to do the real work + */ + break; + default: /* oops */ + elog(ERROR, "unrecognized alter table type: %d", + (int) cmd->subtype); + break; + } + + /* + * Bump the command counter to ensure the next subcommand in the sequence + * can see the changes so far + */ + CommandCounterIncrement(); +} + +/* + * ATRewriteTables: ALTER TABLE phase 3 + */ +static void +ATRewriteTables(List **wqueue) +{ + List *ltab; + + /* Go through each table that needs to be checked or rewritten */ + foreach(ltab, *wqueue) + { + AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); + + /* + * We only need to rewrite the table if at least one column needs + * to be recomputed. + */ + if (tab->newvals != NIL) + { + /* Build a temporary relation and copy data */ + Oid OIDNewHeap; + char NewHeapName[NAMEDATALEN]; + List *indexes; + Relation OldHeap; + ObjectAddress object; + + /* Save the information about all indexes on the relation. */ + OldHeap = heap_open(tab->relid, NoLock); + indexes = get_indexattr_list(OldHeap, InvalidOid); + heap_close(OldHeap, NoLock); + + /* + * Create the new heap, using a temporary name in the same + * namespace as the existing table. NOTE: there is some risk of + * collision with user relnames. Working around this seems more + * trouble than it's worth; in particular, we can't create the new + * heap in a different namespace from the old, or we will have + * problems with the TEMP status of temp tables. + */ + snprintf(NewHeapName, sizeof(NewHeapName), + "pg_temp_%u", tab->relid); + + OIDNewHeap = make_new_heap(tab->relid, NewHeapName); + + /* + * Copy the heap data into the new table with the desired + * modifications, and test the current data within the table + * against new constraints generated by ALTER TABLE commands. + */ + ATRewriteTable(tab, OIDNewHeap); + + /* Swap the relfilenodes of the old and new heaps. */ + swap_relfilenodes(tab->relid, OIDNewHeap); + + CommandCounterIncrement(); + + /* Destroy new heap with old filenode */ + object.classId = RelOid_pg_class; + object.objectId = OIDNewHeap; + object.objectSubId = 0; + + /* + * The new relation is local to our transaction and we know nothing + * depends on it, so DROP_RESTRICT should be OK. + */ + performDeletion(&object, DROP_RESTRICT); + /* performDeletion does CommandCounterIncrement at end */ + + /* + * Rebuild each index on the relation. We do not need + * CommandCounterIncrement() because rebuild_indexes does it. + */ + rebuild_indexes(tab->relid, indexes); + } + else + { + /* + * Test the current data within the table against new constraints + * generated by ALTER TABLE commands, but don't rebuild data. + */ + ATRewriteTable(tab, InvalidOid); + } + } + + /* + * Foreign key constraints are checked in a final pass, since + * (a) it's generally best to examine each one separately, and + * (b) it's at least theoretically possible that we have changed + * both relations of the foreign key, and we'd better have finished + * both rewrites before we try to read the tables. + */ + foreach(ltab, *wqueue) + { + AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); + Relation rel = NULL; + List *lcon; + + foreach(lcon, tab->constraints) + { + NewConstraint *con = lfirst(lcon); + + if (con->contype == CONSTR_FOREIGN) + { + FkConstraint *fkconstraint = (FkConstraint *) con->qual; + Relation refrel; + + if (rel == NULL) + { + /* Long since locked, no need for another */ + rel = heap_open(tab->relid, NoLock); + } + + refrel = heap_open(con->refrelid, RowShareLock); + + validateForeignKeyConstraint(fkconstraint, rel, refrel); + + heap_close(refrel, NoLock); + } + } + + if (rel) + heap_close(rel, NoLock); + } +} + +/* + * ATRewriteTable: scan or rewrite one table + * + * OIDNewHeap is InvalidOid if we don't need to rewrite + */ +static void +ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) +{ + Relation oldrel; + Relation newrel; + TupleDesc oldTupDesc; + TupleDesc newTupDesc; + bool needscan = false; int i; - int minattnum, - maxatts; - HeapTuple typeTuple; - Form_pg_type tform; - int attndims; - ObjectAddress myself, - referenced; + List *l; + EState *estate; /* - * Grab an exclusive lock on the target table, which we will NOT - * release until end of transaction. + * Open the relation(s). We have surely already locked the existing + * table. */ - rel = heap_open(myrelid, AccessExclusiveLock); + oldrel = heap_open(tab->relid, NoLock); + oldTupDesc = tab->oldDesc; + newTupDesc = RelationGetDescr(oldrel); /* includes all mods */ - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); + if (OidIsValid(OIDNewHeap)) + newrel = heap_open(OIDNewHeap, AccessExclusiveLock); + else + newrel = NULL; /* - * permissions checking. this would normally be done in utility.c, - * but this particular routine is recursive. - * - * normally, only the owner of a class can change its schema. + * Generate the constraint and default execution states */ - if (!pg_class_ownercheck(myrelid, GetUserId())) + + estate = CreateExecutorState(); + + /* Build the needed expression execution states */ + foreach(l, tab->constraints) + { + NewConstraint *con = lfirst(l); + + switch (con->contype) + { + case CONSTR_CHECK: + needscan = true; + con->qualstate = (List *) + ExecPrepareExpr((Expr *) con->qual, estate); + break; + case CONSTR_FOREIGN: + /* Nothing to do here */ + break; + case CONSTR_NOTNULL: + needscan = true; + break; + default: + elog(ERROR, "unrecognized constraint type: %d", + (int) con->contype); + } + } + + foreach(l, tab->newvals) + { + NewColumnValue *ex = lfirst(l); + + needscan = true; + + ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate); + } + + if (needscan) + { + ExprContext *econtext; + Datum *values; + char *nulls; + TupleTableSlot *oldslot; + TupleTableSlot *newslot; + HeapScanDesc scan; + HeapTuple tuple; + + econtext = GetPerTupleExprContext(estate); + + /* + * Make tuple slots for old and new tuples. Note that even when + * the tuples are the same, the tupDescs might not be (consider + * ADD COLUMN without a default). + */ + oldslot = MakeTupleTableSlot(); + ExecSetSlotDescriptor(oldslot, oldTupDesc, false); + newslot = MakeTupleTableSlot(); + ExecSetSlotDescriptor(newslot, newTupDesc, false); + + /* Preallocate values/nulls arrays (+1 in case natts==0) */ + i = Max(newTupDesc->natts, oldTupDesc->natts); + values = (Datum *) palloc(i * sizeof(Datum) + 1); + nulls = (char *) palloc(i * sizeof(char) + 1); + memset(values, 0, i * sizeof(Datum)); + memset(nulls, 'n', i * sizeof(char)); + + /* + * Scan through the rows, generating a new row if needed and then + * checking all the constraints. + */ + scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + if (newrel) + { + /* + * Extract data from old tuple. We can force to null any + * columns that are deleted according to the new tuple. + */ + int natts = oldTupDesc->natts; + bool isNull; + + for (i = 0; i < natts; i++) + { + if (newTupDesc->attrs[i]->attisdropped) + nulls[i] = 'n'; + else + { + values[i] = heap_getattr(tuple, + i + 1, + oldTupDesc, + &isNull); + if (isNull) + nulls[i] = 'n'; + else + nulls[i] = ' '; + } + } + + /* + * Process supplied expressions to replace selected columns. + * Expression inputs come from the old tuple. + */ + ExecStoreTuple(tuple, oldslot, InvalidBuffer, false); + econtext->ecxt_scantuple = oldslot; + + foreach(l, tab->newvals) + { + NewColumnValue *ex = lfirst(l); + + values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate, + econtext, + &isNull, + NULL); + if (isNull) + nulls[ex->attnum - 1] = 'n'; + else + nulls[ex->attnum - 1] = ' '; + } + + tuple = heap_formtuple(newTupDesc, values, nulls); + } + + /* Now check any constraints on the possibly-changed tuple */ + ExecStoreTuple(tuple, newslot, InvalidBuffer, false); + econtext->ecxt_scantuple = newslot; + + foreach(l, tab->constraints) + { + NewConstraint *con = lfirst(l); + + switch (con->contype) + { + case CONSTR_CHECK: + if (!ExecQual(con->qualstate, econtext, true)) + ereport(ERROR, + (errcode(ERRCODE_CHECK_VIOLATION), + errmsg("check constraint \"%s\" is violated by some row", + con->name))); + break; + case CONSTR_NOTNULL: + { + Datum d; + bool isnull; + + d = heap_getattr(tuple, con->attnum, newTupDesc, + &isnull); + if (isnull) + ereport(ERROR, + (errcode(ERRCODE_NOT_NULL_VIOLATION), + errmsg("column \"%s\" contains null values", + get_attname(tab->relid, + con->attnum)))); + } + break; + case CONSTR_FOREIGN: + /* Nothing to do here */ + break; + default: + elog(ERROR, "unrecognized constraint type: %d", + (int) con->contype); + } + } + + /* Write the tuple out to the new relation */ + if (newrel) + { + simple_heap_insert(newrel, tuple); + + heap_freetuple(tuple); + } + + ResetExprContext(econtext); + + CHECK_FOR_INTERRUPTS(); + } + + heap_endscan(scan); + } + + FreeExecutorState(estate); + + heap_close(oldrel, NoLock); + if (newrel) + heap_close(newrel, NoLock); +} + +/* + * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue + */ +static AlteredTableInfo * +ATGetQueueEntry(List **wqueue, Relation rel) +{ + Oid relid = RelationGetRelid(rel); + AlteredTableInfo *tab; + List *ltab; + + foreach(ltab, *wqueue) + { + tab = (AlteredTableInfo *) lfirst(ltab); + if (tab->relid == relid) + return tab; + } + + /* + * Not there, so add it. Note that we make a copy of the relation's + * existing descriptor before anything interesting can happen to it. + */ + tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo)); + tab->relid = relid; + tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel)); + + *wqueue = lappend(*wqueue, tab); + + return tab; +} + +/* + * ATSimplePermissions + * + * - Ensure that it is a relation (or possibly a view) + * - Ensure this user is the owner + * - Ensure that it is not a system table + */ +static void +ATSimplePermissions(Relation rel, bool allowView) +{ + if (rel->rd_rel->relkind != RELKIND_RELATION) + { + if (allowView) + { + if (rel->rd_rel->relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table or view", + RelationGetRelationName(rel)))); + } + else + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table", + RelationGetRelationName(rel)))); + } + + /* Permissions checks */ + if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, RelationGetRelationName(rel)); @@ -1616,87 +2488,111 @@ AlterTableAddColumn(Oid myrelid, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied: \"%s\" is a system catalog", RelationGetRelationName(rel)))); +} +/* + * ATSimpleRecursion + * + * Simple table recursion sufficient for most ALTER TABLE operations. + * All direct and indirect children are processed in an unspecified order. + * Note that if a child inherits from the original table via multiple + * inheritance paths, it will be visited just once. + */ +static void +ATSimpleRecursion(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse) +{ /* - * Recurse to add the column to child classes, if requested. - * - * any permissions or problems with duplicate attributes will cause the - * whole transaction to abort, which is what we want -- all or - * nothing. + * Propagate to children if desired. Non-table relations never have + * children, so no need to search in that case. */ - if (recurse) + if (recurse && rel->rd_rel->relkind == RELKIND_RELATION) { + Oid relid = RelationGetRelid(rel); List *child, *children; - ColumnDef *colDefChild = copyObject(colDef); - /* Child should see column as singly inherited */ - colDefChild->inhcount = 1; - colDefChild->is_local = false; - - /* We only want direct inheritors */ - children = find_inheritance_children(myrelid); + /* this routine is actually in the planner */ + children = find_all_inheritors(relid); + /* + * find_all_inheritors does the recursive search of the + * inheritance hierarchy, so all we have to do is process all of + * the relids in the list that it returns. + */ foreach(child, children) { Oid childrelid = lfirsto(child); - HeapTuple tuple; - Form_pg_attribute childatt; Relation childrel; - if (childrelid == myrelid) + if (childrelid == relid) continue; + childrel = relation_open(childrelid, AccessExclusiveLock); + ATPrepCmd(wqueue, childrel, cmd, false, true); + relation_close(childrel, NoLock); + } + } +} - childrel = heap_open(childrelid, AccessExclusiveLock); +/* + * ATOneLevelRecursion + * + * Here, we visit only direct inheritance children. It is expected that + * the command's prep routine will recurse again to find indirect children. + * When using this technique, a multiply-inheriting child will be visited + * multiple times. + */ +static void +ATOneLevelRecursion(List **wqueue, Relation rel, + AlterTableCmd *cmd) +{ + Oid relid = RelationGetRelid(rel); + List *child, + *children; - /* Does child already have a column by this name? */ - attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock); - tuple = SearchSysCacheCopyAttName(childrelid, colDef->colname); - if (!HeapTupleIsValid(tuple)) - { - /* No, recurse to add it normally */ - heap_close(attrdesc, RowExclusiveLock); - heap_close(childrel, NoLock); - AlterTableAddColumn(childrelid, true, colDefChild); - continue; - } - childatt = (Form_pg_attribute) GETSTRUCT(tuple); + /* this routine is actually in the planner */ + children = find_inheritance_children(relid); - /* Okay if child matches by type */ - if (typenameTypeId(colDef->typename) != childatt->atttypid || - colDef->typename->typmod != childatt->atttypmod) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("child table \"%s\" has different type for column \"%s\"", - get_rel_name(childrelid), colDef->colname))); + foreach(child, children) + { + Oid childrelid = lfirsto(child); + Relation childrel; - /* - * XXX if we supported NOT NULL or defaults, would need to do - * more work here to verify child matches - */ - ereport(NOTICE, - (errmsg("merging definition of column \"%s\" for child \"%s\"", - colDef->colname, get_rel_name(childrelid)))); + childrel = relation_open(childrelid, AccessExclusiveLock); + ATPrepCmd(wqueue, childrel, cmd, true, true); + relation_close(childrel, NoLock); + } +} - /* Bump the existing child att's inhcount */ - childatt->attinhcount++; - simple_heap_update(attrdesc, &tuple->t_self, tuple); - CatalogUpdateIndexes(attrdesc, tuple); +/* + * ALTER TABLE ADD COLUMN + * + * Adds an additional attribute to a relation making the assumption that + * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the + * AT_AddColumn AlterTableCmd by analyze.c and added as independent + * AlterTableCmd's. + */ +static void +ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, + AlterTableCmd *cmd) +{ + /* + * Recurse to add the column to child classes, if requested. + * + * We must recurse one level at a time, so that multiply-inheriting + * children are visited the right number of times and end up with the + * right attinhcount. + */ + if (recurse) + { + AlterTableCmd *childCmd = copyObject(cmd); + ColumnDef *colDefChild = (ColumnDef *) childCmd->def; - /* - * Propagate any new CHECK constraints into the child table - * and its descendants - */ - if (colDef->constraints != NIL) - { - CommandCounterIncrement(); - AlterTableAddConstraint(childrelid, true, colDef->constraints); - } + /* Child should see column as singly inherited */ + colDefChild->inhcount = 1; + colDefChild->is_local = false; - heap_freetuple(tuple); - heap_close(attrdesc, RowExclusiveLock); - heap_close(childrel, NoLock); - } + ATOneLevelRecursion(wqueue, rel, childCmd); } else { @@ -1704,42 +2600,77 @@ AlterTableAddColumn(Oid myrelid, * If we are told not to recurse, there had better not be any * child tables; else the addition would put them out of step. */ - if (find_inheritance_children(myrelid) != NIL) + if (find_inheritance_children(RelationGetRelid(rel)) != NIL) ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("column must be added to child tables too"))); } +} + +static void +ATExecAddColumn(AlteredTableInfo *tab, Relation rel, + ColumnDef *colDef) +{ + Oid myrelid = RelationGetRelid(rel); + Relation pgclass, + attrdesc; + HeapTuple reltup; + HeapTuple attributeTuple; + Form_pg_attribute attribute; + FormData_pg_attribute attributeD; + int i; + int minattnum, + maxatts; + HeapTuple typeTuple; + Form_pg_type tform; + Expr *defval; + + attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock); /* - * OK, get on with it... - * - * Implementation restrictions: because we don't touch the table rows, - * the new column values will initially appear to be NULLs. (This - * happens because the heap tuple access routines always check for - * attnum > # of attributes in tuple, and return NULL if so.) - * Therefore we can't support a DEFAULT value in SQL92-compliant - * fashion, and we also can't allow a NOT NULL constraint. - * - * We do allow CHECK constraints, even though these theoretically could - * fail for NULL rows (eg, CHECK (newcol IS NOT NULL)). + * Are we adding the column to a recursion child? If so, check whether + * to merge with an existing definition for the column. */ - if (colDef->raw_default || colDef->cooked_default) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("adding columns with defaults is not implemented"), - errhint("Add the column, then use ALTER TABLE SET DEFAULT."))); + if (colDef->inhcount > 0) + { + HeapTuple tuple; - if (colDef->is_not_null) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("adding NOT NULL columns is not implemented"), - errhint("Add the column, then use ALTER TABLE SET NOT NULL."))); + /* Does child already have a column by this name? */ + tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname); + if (HeapTupleIsValid(tuple)) + { + Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple); + + /* Okay if child matches by type */ + if (typenameTypeId(colDef->typename) != childatt->atttypid || + colDef->typename->typmod != childatt->atttypmod) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("child table \"%s\" has different type for column \"%s\"", + RelationGetRelationName(rel), colDef->colname))); + + /* Bump the existing child att's inhcount */ + childatt->attinhcount++; + simple_heap_update(attrdesc, &tuple->t_self, tuple); + CatalogUpdateIndexes(attrdesc, tuple); + + heap_freetuple(tuple); + + /* Inform the user about the merge */ + ereport(NOTICE, + (errmsg("merging definition of column \"%s\" for child \"%s\"", + colDef->colname, RelationGetRelationName(rel)))); + + heap_close(attrdesc, RowExclusiveLock); + return; + } + } pgclass = heap_openr(RelationRelationName, RowExclusiveLock); - reltup = SearchSysCache(RELOID, - ObjectIdGetDatum(myrelid), - 0, 0, 0); + reltup = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(myrelid), + 0, 0, 0); if (!HeapTupleIsValid(reltup)) elog(ERROR, "cache lookup failed for relation %u", myrelid); @@ -1766,13 +2697,6 @@ AlterTableAddColumn(Oid myrelid, MaxHeapAttributeNumber))); i = minattnum + 1; - attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock); - - if (colDef->typename->arrayBounds) - attndims = length(colDef->typename->arrayBounds); - else - attndims = 0; - typeTuple = typenameType(colDef->typename); tform = (Form_pg_type) GETSTRUCT(typeTuple); @@ -1795,12 +2719,11 @@ AlterTableAddColumn(Oid myrelid, attribute->atttypmod = colDef->typename->typmod; attribute->attnum = i; attribute->attbyval = tform->typbyval; - attribute->attndims = attndims; + attribute->attndims = length(colDef->typename->arrayBounds); attribute->attstorage = tform->typstorage; attribute->attalign = tform->typalign; attribute->attnotnull = colDef->is_not_null; - attribute->atthasdef = (colDef->raw_default != NULL || - colDef->cooked_default != NULL); + attribute->atthasdef = false; attribute->attisdropped = false; attribute->attislocal = colDef->is_local; attribute->attinhcount = colDef->inhcount; @@ -1817,132 +2740,120 @@ AlterTableAddColumn(Oid myrelid, /* * Update number of attributes in pg_class tuple */ - newreltup = heap_copytuple(reltup); - - ((Form_pg_class) GETSTRUCT(newreltup))->relnatts = maxatts; + ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts; - simple_heap_update(pgclass, &newreltup->t_self, newreltup); + simple_heap_update(pgclass, &reltup->t_self, reltup); /* keep catalog indexes current */ - CatalogUpdateIndexes(pgclass, newreltup); + CatalogUpdateIndexes(pgclass, reltup); - heap_freetuple(newreltup); - ReleaseSysCache(reltup); + heap_freetuple(reltup); heap_close(pgclass, RowExclusiveLock); - heap_close(rel, NoLock); /* close rel but keep lock! */ + /* Make the attribute's catalog entry visible */ + CommandCounterIncrement(); /* - * Add datatype dependency for the new column. + * Store the DEFAULT, if any, in the catalogs */ - myself.classId = RelOid_pg_class; - myself.objectId = myrelid; - myself.objectSubId = i; - referenced.classId = RelOid_pg_type; - referenced.objectId = attribute->atttypid; - referenced.objectSubId = 0; - recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + if (colDef->raw_default) + { + RawColumnDefault *rawEnt; - /* - * Make our catalog updates visible for subsequent steps. - */ - CommandCounterIncrement(); + rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault)); + rawEnt->attnum = attribute->attnum; + rawEnt->raw_default = copyObject(colDef->raw_default); + + /* + * This function is intended for CREATE TABLE, so it processes a + * _list_ of defaults, but we just do one. + */ + AddRelationRawConstraints(rel, makeList1(rawEnt), NIL); + + /* Make the additional catalog changes visible */ + CommandCounterIncrement(); + } /* - * Add any CHECK constraints attached to the new column. + * Tell Phase 3 to fill in the default expression, if there is one. + * + * If there is no default, Phase 3 doesn't have to do anything, + * because that effectively means that the default is NULL. The + * heap tuple access routines always check for attnum > # of attributes + * in tuple, and return NULL if so, so without any modification of + * the tuple data we will get the effect of NULL values in the new + * column. * - * To do this we must re-open the rel so that its new attr list gets - * loaded into the relcache. + * Note: we use build_column_default, and not just the cooked default + * returned by AddRelationRawConstraints, so that the right thing happens + * when a datatype's default applies. */ - if (colDef->constraints != NIL) + defval = (Expr *) build_column_default(rel, attribute->attnum); + if (defval) { - rel = heap_open(myrelid, AccessExclusiveLock); - AddRelationRawConstraints(rel, NIL, colDef->constraints); - heap_close(rel, NoLock); + NewColumnValue *newval; + + newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); + newval->attnum = attribute->attnum; + newval->expr = defval; + + tab->newvals = lappend(tab->newvals, newval); } /* - * Automatically create the secondary relation for TOAST if it - * formerly had no such but now has toastable attributes. + * Add datatype dependency for the new column. */ - AlterTableCreateToastTable(myrelid, true); + add_column_datatype_dependency(myrelid, i, attribute->atttypid); +} + +/* + * Install a column's dependency on its datatype. + */ +static void +add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid) +{ + ObjectAddress myself, + referenced; + + myself.classId = RelOid_pg_class; + myself.objectId = relid; + myself.objectSubId = attnum; + referenced.classId = RelOid_pg_type; + referenced.objectId = typid; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } /* * ALTER TABLE ALTER COLUMN DROP NOT NULL */ -void -AlterTableAlterColumnDropNotNull(Oid myrelid, bool recurse, - const char *colName) +static void +ATExecDropNotNull(Relation rel, const char *colName) { - Relation rel; HeapTuple tuple; AttrNumber attnum; Relation attr_rel; List *indexoidlist; List *indexoidscan; - rel = heap_open(myrelid, AccessExclusiveLock); - - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); - /* - * Propagate to children if desired + * lookup the attribute */ - if (recurse) - { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) - { - Oid childrelid = lfirsto(child); - - if (childrelid == myrelid) - continue; - AlterTableAlterColumnDropNotNull(childrelid, - false, colName); - } - } + attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); - /* now do the thing on this relation */ + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); - /* - * get the number of the attribute - */ - attnum = get_attnum(myrelid, colName); - if (attnum == InvalidAttrNumber) + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - colName, RelationGetRelationName(rel)))); + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + + attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum; /* Prevent them from altering a system attribute */ - if (attnum < 0) + if (attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot alter system column \"%s\"", @@ -1992,221 +2903,92 @@ AlterTableAlterColumnDropNotNull(Oid myrelid, bool recurse, freeList(indexoidlist); /* - * Okay, actually perform the catalog change + * Okay, actually perform the catalog change ... if needed */ - attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); - - tuple = SearchSysCacheCopyAttName(myrelid, colName); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u", - colName, myrelid); - - ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE; + if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) + { + ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE; - simple_heap_update(attr_rel, &tuple->t_self, tuple); + simple_heap_update(attr_rel, &tuple->t_self, tuple); - /* keep the system catalog indexes current */ - CatalogUpdateIndexes(attr_rel, tuple); + /* keep the system catalog indexes current */ + CatalogUpdateIndexes(attr_rel, tuple); + } heap_close(attr_rel, RowExclusiveLock); - - heap_close(rel, NoLock); } /* * ALTER TABLE ALTER COLUMN SET NOT NULL */ -void -AlterTableAlterColumnSetNotNull(Oid myrelid, bool recurse, - const char *colName) +static void +ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName) { - Relation rel; HeapTuple tuple; AttrNumber attnum; Relation attr_rel; - HeapScanDesc scan; - TupleDesc tupdesc; - - rel = heap_open(myrelid, AccessExclusiveLock); - - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); + NewConstraint *newcon; /* - * Propagate to children if desired + * lookup the attribute */ - if (recurse) - { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) - { - Oid childrelid = lfirsto(child); - - if (childrelid == myrelid) - continue; - AlterTableAlterColumnSetNotNull(childrelid, - false, colName); - } - } + attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); - /* now do the thing on this relation */ + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); - /* - * get the number of the attribute - */ - attnum = get_attnum(myrelid, colName); - if (attnum == InvalidAttrNumber) + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - colName, RelationGetRelationName(rel)))); + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + + attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum; /* Prevent them from altering a system attribute */ - if (attnum < 0) + if (attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot alter system column \"%s\"", colName))); /* - * Perform a scan to ensure that there are no NULL values already in - * the relation + * Okay, actually perform the catalog change ... if needed */ - tupdesc = RelationGetDescr(rel); - - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + if (! ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) { - Datum d; - bool isnull; - - d = heap_getattr(tuple, attnum, tupdesc, &isnull); - - if (isnull) - ereport(ERROR, - (errcode(ERRCODE_NOT_NULL_VIOLATION), - errmsg("column \"%s\" contains null values", - colName))); - } + ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE; - heap_endscan(scan); + simple_heap_update(attr_rel, &tuple->t_self, tuple); - /* - * Okay, actually perform the catalog change - */ - attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); + /* keep the system catalog indexes current */ + CatalogUpdateIndexes(attr_rel, tuple); - tuple = SearchSysCacheCopyAttName(myrelid, colName); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u", - colName, myrelid); + /* Tell Phase 3 to test the constraint */ + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->contype = CONSTR_NOTNULL; + newcon->attnum = attnum; + newcon->name = "NOT NULL"; - ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE; - - simple_heap_update(attr_rel, &tuple->t_self, tuple); - - /* keep the system catalog indexes current */ - CatalogUpdateIndexes(attr_rel, tuple); + tab->constraints = lappend(tab->constraints, newcon); + } heap_close(attr_rel, RowExclusiveLock); - - heap_close(rel, NoLock); } /* * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT */ -void -AlterTableAlterColumnDefault(Oid myrelid, bool recurse, - const char *colName, - Node *newDefault) +static void +ATExecColumnDefault(Relation rel, const char *colName, + Node *newDefault) { - Relation rel; AttrNumber attnum; - rel = heap_open(myrelid, AccessExclusiveLock); - - /* - * We allow defaults on views so that INSERT into a view can have - * default-ish behavior. This works because the rewriter substitutes - * default values into INSERTs before it expands rules. - */ - if (rel->rd_rel->relkind != RELKIND_RELATION && - rel->rd_rel->relkind != RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table or view", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); - - /* - * Propagate to children if desired - */ - if (recurse) - { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) - { - Oid childrelid = lfirsto(child); - - if (childrelid == myrelid) - continue; - AlterTableAlterColumnDefault(childrelid, - false, colName, newDefault); - } - } - - /* now do the thing on this relation */ - /* * get the number of the attribute */ - attnum = get_attnum(myrelid, colName); + attnum = get_attnum(RelationGetRelid(rel), colName); if (attnum == InvalidAttrNumber) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), @@ -2214,7 +2996,7 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse, colName, RelationGetRelationName(rel)))); /* Prevent them from altering a system attribute */ - if (attnum < 0) + if (attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot alter system column \"%s\"", @@ -2225,7 +3007,7 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse, * safety, but at present we do not expect anything to depend on the * default. */ - RemoveAttrDefault(myrelid, attnum, DROP_RESTRICT, false); + RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false); if (newDefault) { @@ -2242,141 +3024,67 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse, */ AddRelationRawConstraints(rel, makeList1(rawEnt), NIL); } - - heap_close(rel, NoLock); } /* - * ALTER TABLE ALTER COLUMN SET STATISTICS / STORAGE + * ALTER TABLE ALTER COLUMN SET STATISTICS */ -void -AlterTableAlterColumnFlags(Oid myrelid, bool recurse, - const char *colName, - Node *flagValue, const char *flagType) +static void +ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue) { - Relation rel; - int newtarget = 1; - char newstorage = 'p'; - Relation attrelation; - HeapTuple tuple; - Form_pg_attribute attrtuple; - - rel = relation_open(myrelid, AccessExclusiveLock); - /* - * Allow index for statistics case only + * We do our own permission checking because (a) we want to allow + * SET STATISTICS on indexes (for expressional index columns), and + * (b) we want to allow SET STATISTICS on system catalogs without + * requiring allowSystemTableMods to be turned on. */ - if (rel->rd_rel->relkind != RELKIND_RELATION) - { - if (rel->rd_rel->relkind != RELKIND_INDEX || *flagType != 'S') - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - } + if (rel->rd_rel->relkind != RELKIND_RELATION && + rel->rd_rel->relkind != RELKIND_INDEX) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table or index", + RelationGetRelationName(rel)))); /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) + if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, RelationGetRelationName(rel)); +} - /* - * we allow statistics case for system tables - */ - if (*flagType != 'S' && !allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); +static void +ATExecSetStatistics(Relation rel, const char *colName, Node *newValue) +{ + int newtarget; + Relation attrelation; + HeapTuple tuple; + Form_pg_attribute attrtuple; + + Assert(IsA(newValue, Integer)); + newtarget = intVal(newValue); /* - * Check the supplied parameters before anything else + * Limit target to a sane range */ - if (*flagType == 'S') - { - /* STATISTICS */ - Assert(IsA(flagValue, Integer)); - newtarget = intVal(flagValue); - - /* - * Limit target to a sane range - */ - if (newtarget < -1) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("statistics target %d is too low", - newtarget))); - } - else if (newtarget > 1000) - { - newtarget = 1000; - ereport(WARNING, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("lowering statistics target to %d", - newtarget))); - } - } - else if (*flagType == 'M') - { - /* STORAGE */ - char *storagemode; - - Assert(IsA(flagValue, String)); - storagemode = strVal(flagValue); - - if (strcasecmp(storagemode, "plain") == 0) - newstorage = 'p'; - else if (strcasecmp(storagemode, "external") == 0) - newstorage = 'e'; - else if (strcasecmp(storagemode, "extended") == 0) - newstorage = 'x'; - else if (strcasecmp(storagemode, "main") == 0) - newstorage = 'm'; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid storage type \"%s\"", - storagemode))); - } - else + if (newtarget < -1) { - elog(ERROR, "unrecognized alter-column type flag: %c", - (int) *flagType); + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("statistics target %d is too low", + newtarget))); } - - /* - * Propagate to children if desired - */ - if (recurse && rel->rd_rel->relkind == RELKIND_RELATION) + else if (newtarget > 1000) { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) - { - Oid childrelid = lfirsto(child); - - if (childrelid == myrelid) - continue; - AlterTableAlterColumnFlags(childrelid, - false, colName, flagValue, flagType); - } + newtarget = 1000; + ereport(WARNING, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("lowering statistics target to %d", + newtarget))); } - /* now do the thing on this relation */ - attrelation = heap_openr(AttributeRelationName, RowExclusiveLock); - tuple = SearchSysCacheCopyAttName(myrelid, colName); + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), @@ -2384,31 +3092,13 @@ AlterTableAlterColumnFlags(Oid myrelid, bool recurse, colName, RelationGetRelationName(rel)))); attrtuple = (Form_pg_attribute) GETSTRUCT(tuple); - if (attrtuple->attnum < 0) + if (attrtuple->attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot alter system column \"%s\"", colName))); - /* - * Now change the appropriate field - */ - if (*flagType == 'S') - attrtuple->attstattarget = newtarget; - else if (*flagType == 'M') - { - /* - * safety check: do not allow toasted storage modes unless column - * datatype is TOAST-aware. - */ - if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid)) - attrtuple->attstorage = newstorage; - else - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("column data type %s can only have storage PLAIN", - format_type_be(attrtuple->atttypid)))); - } + attrtuple->attstattarget = newtarget; simple_heap_update(attrelation, &tuple->t_self, tuple); @@ -2418,85 +3108,110 @@ AlterTableAlterColumnFlags(Oid myrelid, bool recurse, heap_freetuple(tuple); heap_close(attrelation, RowExclusiveLock); - - heap_close(rel, NoLock); /* close rel, but keep lock! */ } /* - * ALTER TABLE SET WITH/WITHOUT OIDS + * ALTER TABLE ALTER COLUMN SET STORAGE */ -void -AlterTableAlterOids(Oid myrelid, bool setOid, bool recurse, - DropBehavior behavior) +static void +ATExecSetStorage(Relation rel, const char *colName, Node *newValue) { - Relation rel; - - rel = heap_open(myrelid, AccessExclusiveLock); + char *storagemode; + char newstorage; + Relation attrelation; + HeapTuple tuple; + Form_pg_attribute attrtuple; - /* - * check to see if we actually need to change anything - */ - if (rel->rd_rel->relhasoids == setOid) + Assert(IsA(newValue, String)); + storagemode = strVal(newValue); + + if (strcasecmp(storagemode, "plain") == 0) + newstorage = 'p'; + else if (strcasecmp(storagemode, "external") == 0) + newstorage = 'e'; + else if (strcasecmp(storagemode, "extended") == 0) + newstorage = 'x'; + else if (strcasecmp(storagemode, "main") == 0) + newstorage = 'm'; + else { - heap_close(rel, NoLock); /* close rel, but keep lock! */ - return; + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid storage type \"%s\"", + storagemode))); + newstorage = 0; /* keep compiler quiet */ } - if (setOid) - { - /* - * TODO: Generate the now required OID pg_attribute entry, and - * modify physical rows to have OIDs. - */ + attrelation = heap_openr(AttributeRelationName, RowExclusiveLock); + + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); + + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + attrtuple = (Form_pg_attribute) GETSTRUCT(tuple); + + if (attrtuple->attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("ALTER TABLE WITH OIDS is not yet implemented"))); - } + errmsg("cannot alter system column \"%s\"", + colName))); + + /* + * safety check: do not allow toasted storage modes unless column + * datatype is TOAST-aware. + */ + if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid)) + attrtuple->attstorage = newstorage; else - { - heap_close(rel, NoLock); /* close rel, but keep lock! */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("column data type %s can only have storage PLAIN", + format_type_be(attrtuple->atttypid)))); - AlterTableDropColumn(myrelid, recurse, false, "oid", behavior); - } + simple_heap_update(attrelation, &tuple->t_self, tuple); + + /* keep system catalog indexes current */ + CatalogUpdateIndexes(attrelation, tuple); + + heap_freetuple(tuple); + + heap_close(attrelation, RowExclusiveLock); } + /* * ALTER TABLE DROP COLUMN + * + * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism, + * because we have to decide at runtime whether to recurse or not depending + * on whether attinhcount goes to zero or not. (We can't check this in a + * static pre-pass because it won't handle multiple inheritance situations + * correctly.) Since DROP COLUMN doesn't need to create any work queue + * entries for Phase 3, it's okay to recurse internally in this routine + * without considering the work queue. */ -void -AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, - const char *colName, - DropBehavior behavior) +static void +ATExecDropColumn(Relation rel, const char *colName, + DropBehavior behavior, + bool recurse, bool recursing) { - Relation rel; - AttrNumber attnum; HeapTuple tuple; Form_pg_attribute targetatt; + AttrNumber attnum; + List *children; ObjectAddress object; - rel = heap_open(myrelid, AccessExclusiveLock); - - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(rel, false); /* * get the number of the attribute */ - tuple = SearchSysCacheAttName(myrelid, colName); + tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), @@ -2523,65 +3238,16 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, ReleaseSysCache(tuple); /* - * If we are asked to drop ONLY in this table (no recursion), we need - * to mark the inheritors' attribute as locally defined rather than - * inherited. - */ - if (!recurse && !recursing) - { - Relation attr_rel; - List *child, - *children; - - /* We only want direct inheritors in this case */ - children = find_inheritance_children(myrelid); - - attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); - foreach(child, children) - { - Oid childrelid = lfirsto(child); - Relation childrel; - Form_pg_attribute childatt; - - childrel = heap_open(childrelid, AccessExclusiveLock); - - tuple = SearchSysCacheCopyAttName(childrelid, colName); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u", - colName, childrelid); - childatt = (Form_pg_attribute) GETSTRUCT(tuple); - - if (childatt->attinhcount <= 0) /* shouldn't happen */ - elog(ERROR, "relation %u has non-inherited attribute \"%s\"", - childrelid, colName); - childatt->attinhcount--; - childatt->attislocal = true; - - simple_heap_update(attr_rel, &tuple->t_self, tuple); - - /* keep the system catalog indexes current */ - CatalogUpdateIndexes(attr_rel, tuple); - - heap_freetuple(tuple); - - heap_close(childrel, NoLock); - } - heap_close(attr_rel, RowExclusiveLock); - } - - /* - * Propagate to children if desired. Unlike most other ALTER + * Propagate to children as appropriate. Unlike most other ALTER * routines, we have to do this one level of recursion at a time; we * can't use find_all_inheritors to do it in one pass. */ - if (recurse) + children = find_inheritance_children(RelationGetRelid(rel)); + + if (children) { Relation attr_rel; - List *child, - *children; - - /* We only want direct inheritors in this case */ - children = find_inheritance_children(myrelid); + List *child; attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); foreach(child, children) @@ -2590,9 +3256,6 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, Relation childrel; Form_pg_attribute childatt; - if (childrelid == myrelid) - continue; - childrel = heap_open(childrelid, AccessExclusiveLock); tuple = SearchSysCacheCopyAttName(childrelid, colName); @@ -2605,20 +3268,49 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, elog(ERROR, "relation %u has non-inherited attribute \"%s\"", childrelid, colName); - if (childatt->attinhcount == 1 && !childatt->attislocal) + if (recurse) { - /* Time to delete this child column, too */ - AlterTableDropColumn(childrelid, true, true, colName, behavior); + /* + * If the child column has other definition sources, just + * decrement its inheritance count; if not, recurse to delete + * it. + */ + if (childatt->attinhcount == 1 && !childatt->attislocal) + { + /* Time to delete this child column, too */ + ATExecDropColumn(childrel, colName, behavior, true, true); + } + else + { + /* Child column must survive my deletion */ + childatt->attinhcount--; + + simple_heap_update(attr_rel, &tuple->t_self, tuple); + + /* keep the system catalog indexes current */ + CatalogUpdateIndexes(attr_rel, tuple); + + /* Make update visible */ + CommandCounterIncrement(); + } } else { - /* Child column must survive my deletion */ + /* + * If we were told to drop ONLY in this table (no recursion), + * we need to mark the inheritors' attribute as locally + * defined rather than inherited. + */ childatt->attinhcount--; + childatt->attislocal = true; simple_heap_update(attr_rel, &tuple->t_self, tuple); /* keep the system catalog indexes current */ CatalogUpdateIndexes(attr_rel, tuple); + + /* Make update visible */ + CommandCounterIncrement(); } heap_freetuple(tuple); @@ -2632,7 +3324,7 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, * Perform the actual column deletion */ object.classId = RelOid_pg_class; - object.objectId = myrelid; + object.objectId = RelationGetRelid(rel); object.objectSubId = attnum; performDeletion(&object, behavior); @@ -2648,10 +3340,11 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, class_rel = heap_openr(RelationRelationName, RowExclusiveLock); tuple = SearchSysCacheCopy(RELOID, - ObjectIdGetDatum(myrelid), + ObjectIdGetDatum(RelationGetRelid(rel)), 0, 0, 0); if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for relation %u", myrelid); + elog(ERROR, "cache lookup failed for relation %u", + RelationGetRelid(rel)); tuple_class = (Form_pg_class) GETSTRUCT(tuple); tuple_class->relhasoids = false; @@ -2662,298 +3355,149 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, heap_close(class_rel, RowExclusiveLock); } - - heap_close(rel, NoLock); /* close rel, but keep lock! */ } +/* + * ALTER TABLE ADD INDEX + * + * There is no such command in the grammar, but the parser converts UNIQUE + * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us + * schedule creation of the index at the appropriate time during ALTER. + */ +static void +ATExecAddIndex(AlteredTableInfo *tab, Relation rel, + IndexStmt *stmt, bool is_rebuild) +{ + bool check_rights; + bool skip_build; + bool quiet; + + Assert(IsA(stmt, IndexStmt)); + + /* suppress schema rights check when rebuilding existing index */ + check_rights = !is_rebuild; + /* skip index build if phase 3 will have to rewrite table anyway */ + skip_build = (tab->newvals != NIL); + /* suppress notices when rebuilding existing index */ + quiet = is_rebuild; + + DefineIndex(stmt->relation, /* relation */ + stmt->idxname, /* index name */ + stmt->accessMethod, /* am name */ + stmt->indexParams, /* parameters */ + (Expr *) stmt->whereClause, + stmt->rangetable, + stmt->unique, + stmt->primary, + stmt->isconstraint, + true, /* is_alter_table */ + check_rights, + skip_build, + quiet); +} /* * ALTER TABLE ADD CONSTRAINT */ -void -AlterTableAddConstraint(Oid myrelid, bool recurse, - List *newConstraints) +static void +ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) { - Relation rel; - List *listptr; - int counter = 0; - - /* - * Grab an exclusive lock on the target table, which we will NOT - * release until end of transaction. - */ - rel = heap_open(myrelid, AccessExclusiveLock); - - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); - - if (recurse) + switch (nodeTag(newConstraint)) { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) + case T_Constraint: { - Oid childrelid = lfirsto(child); + Constraint *constr = (Constraint *) newConstraint; - if (childrelid == myrelid) - continue; - AlterTableAddConstraint(childrelid, false, newConstraints); - } - } - - foreach(listptr, newConstraints) - { - /* - * copy is because we may destructively alter the node below by - * inserting a generated name; this name is not necessarily - * correct for children or parents. - */ - Node *newConstraint = copyObject(lfirst(listptr)); - - switch (nodeTag(newConstraint)) - { - case T_Constraint: - { - Constraint *constr = (Constraint *) newConstraint; - - /* - * Assign or validate constraint name - */ - if (constr->name) - { - if (ConstraintNameIsUsed(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - constr->name)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("constraint \"%s\" for relation \"%s\" already exists", - constr->name, - RelationGetRelationName(rel)))); - } - else - constr->name = GenerateConstraintName(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - &counter); - - /* - * Currently, we only expect to see CONSTR_CHECK nodes - * arriving here (see the preprocessing done in - * parser/analyze.c). Use a switch anyway to make it - * easier to add more code later. - */ - switch (constr->contype) - { - case CONSTR_CHECK: - AlterTableAddCheckConstraint(rel, constr); - break; - default: - elog(ERROR, "unrecognized constraint type: %d", - (int) constr->contype); - } - break; - } - case T_FkConstraint: + /* + * Currently, we only expect to see CONSTR_CHECK nodes + * arriving here (see the preprocessing done in + * parser/analyze.c). Use a switch anyway to make it + * easier to add more code later. + */ + switch (constr->contype) + { + case CONSTR_CHECK: { - FkConstraint *fkconstraint = (FkConstraint *) newConstraint; + List *newcons; + List *lcon; /* - * Assign or validate constraint name + * Call AddRelationRawConstraints to do the work. + * It returns a list of cooked constraints. */ - if (fkconstraint->constr_name) - { - if (ConstraintNameIsUsed(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - fkconstraint->constr_name)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("constraint \"%s\" for relation \"%s\" already exists", - fkconstraint->constr_name, - RelationGetRelationName(rel)))); + newcons = AddRelationRawConstraints(rel, NIL, + makeList1(constr)); + /* Add each constraint to Phase 3's queue */ + foreach(lcon, newcons) + { + CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); + NewConstraint *newcon; + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = ccon->name; + newcon->contype = ccon->contype; + newcon->attnum = ccon->attnum; + /* ExecQual wants implicit-AND format */ + newcon->qual = (Node *) + make_ands_implicit((Expr *) ccon->expr); + + tab->constraints = lappend(tab->constraints, + newcon); } - else - fkconstraint->constr_name = GenerateConstraintName(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - &counter); - - AlterTableAddForeignKeyConstraint(rel, fkconstraint); - break; } - default: - elog(ERROR, "unrecognized node type: %d", - (int) nodeTag(newConstraint)); + default: + elog(ERROR, "unrecognized constraint type: %d", + (int) constr->contype); + } + break; } + case T_FkConstraint: + { + FkConstraint *fkconstraint = (FkConstraint *) newConstraint; - /* If we have multiple constraints to make, bump CC between 'em */ - if (lnext(listptr)) - CommandCounterIncrement(); - } - - /* Close rel, but keep lock till commit */ - heap_close(rel, NoLock); -} - -/* - * Add a check constraint to a single table - * - * Subroutine for AlterTableAddConstraint. Must already hold exclusive - * lock on the rel, and have done appropriate validity/permissions checks - * for it. - */ -static void -AlterTableAddCheckConstraint(Relation rel, Constraint *constr) -{ - ParseState *pstate; - bool successful = true; - HeapScanDesc scan; - EState *estate; - ExprContext *econtext; - TupleTableSlot *slot; - HeapTuple tuple; - RangeTblEntry *rte; - List *qual; - List *qualstate; - Node *expr; - - /* - * We need to make a parse state and range table to allow us to do - * transformExpr() - */ - pstate = make_parsestate(NULL); - rte = addRangeTableEntryForRelation(pstate, - RelationGetRelid(rel), - makeAlias(RelationGetRelationName(rel), NIL), - false, - true); - addRTEtoQuery(pstate, rte, true, true); - - /* - * Convert the A_EXPR in raw_expr into an EXPR - */ - expr = transformExpr(pstate, constr->raw_expr); - - /* - * Make sure it yields a boolean result. - */ - expr = coerce_to_boolean(pstate, expr, "CHECK"); - - /* - * Make sure no outside relations are referred to. - */ - if (length(pstate->p_rtable) != 1) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("check constraint may only reference relation \"%s\"", - RelationGetRelationName(rel)))); - - /* - * No subplans or aggregates, either... - */ - if (pstate->p_hasSubLinks) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot use subquery in check constraint"))); - if (pstate->p_hasAggs) - ereport(ERROR, - (errcode(ERRCODE_GROUPING_ERROR), - errmsg("cannot use aggregate function in check constraint"))); - - /* - * Might as well try to reduce any constant expressions, so as to - * minimize overhead while testing the constraint at each row. - * - * Note that the stored form of the constraint will NOT be const-folded. - */ - expr = eval_const_expressions(expr); - - /* Needs to be in implicit-ANDs form for ExecQual */ - qual = make_ands_implicit((Expr *) expr); - - /* Need an EState to run ExecQual */ - estate = CreateExecutorState(); - econtext = GetPerTupleExprContext(estate); - - /* build execution state for qual */ - qualstate = (List *) ExecPrepareExpr((Expr *) qual, estate); - - /* Make tuple slot to hold tuples */ - slot = MakeTupleTableSlot(); - ExecSetSlotDescriptor(slot, RelationGetDescr(rel), false); - - /* Arrange for econtext's scan tuple to be the tuple under test */ - econtext->ecxt_scantuple = slot; + /* + * Assign or validate constraint name + */ + if (fkconstraint->constr_name) + { + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(rel), + RelationGetNamespace(rel), + fkconstraint->constr_name)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("constraint \"%s\" for relation \"%s\" already exists", + fkconstraint->constr_name, + RelationGetRelationName(rel)))); + } + else + fkconstraint->constr_name = + GenerateConstraintName(CONSTRAINT_RELATION, + RelationGetRelid(rel), + RelationGetNamespace(rel), + &tab->constr_name_ctr); - /* - * Scan through the rows now, checking the expression at each row. - */ - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + ATAddForeignKeyConstraint(tab, rel, fkconstraint); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - if (!ExecQual(qualstate, econtext, true)) - { - successful = false; break; } - ResetExprContext(econtext); + default: + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(newConstraint)); } - - heap_endscan(scan); - - pfree(slot); - FreeExecutorState(estate); - - if (!successful) - ereport(ERROR, - (errcode(ERRCODE_CHECK_VIOLATION), - errmsg("check constraint \"%s\" is violated by some row", - constr->name))); - - /* - * Call AddRelationRawConstraints to do the real adding -- It - * duplicates some of the above, but does not check the validity of - * the constraint against tuples already in the table. - */ - AddRelationRawConstraints(rel, NIL, makeList1(constr)); } /* * Add a foreign-key constraint to a single table * - * Subroutine for AlterTableAddConstraint. Must already hold exclusive + * Subroutine for ATExecAddConstraint. Must already hold exclusive * lock on the rel, and have done appropriate validity/permissions checks * for it. */ static void -AlterTableAddForeignKeyConstraint(Relation rel, FkConstraint *fkconstraint) +ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, + FkConstraint *fkconstraint) { Relation pkrel; AclResult aclresult; @@ -3124,11 +3668,21 @@ AlterTableAddForeignKeyConstraint(Relation rel, FkConstraint *fkconstraint) } /* - * Check that the constraint is satisfied by existing rows (we can - * skip this during table creation). + * Tell Phase 3 to check that the constraint is satisfied by existing rows + * (we can skip this during table creation). */ if (!fkconstraint->skip_validation) - validateForeignKeyConstraint(fkconstraint, rel, pkrel); + { + NewConstraint *newcon; + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = fkconstraint->constr_name; + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = RelationGetRelid(pkrel); + newcon->qual = (Node *) fkconstraint; + + tab->constraints = lappend(tab->constraints, newcon); + } /* * Record the FK constraint in pg_constraint. @@ -3728,94 +4282,626 @@ fkMatchTypeToString(char match_type) /* * ALTER TABLE DROP CONSTRAINT */ -void -AlterTableDropConstraint(Oid myrelid, bool recurse, - const char *constrName, - DropBehavior behavior) +static void +ATPrepDropConstraint(List **wqueue, Relation rel, + bool recurse, AlterTableCmd *cmd) { - Relation rel; - int deleted = 0; - /* - * Acquire an exclusive lock on the target relation for the duration - * of the operation. + * We don't want errors or noise from child tables, so we have to pass + * down a modified command. */ - rel = heap_open(myrelid, AccessExclusiveLock); + if (recurse) + { + AlterTableCmd *childCmd = copyObject(cmd); - /* Disallow DROP CONSTRAINT on views, indexes, sequences, etc */ - if (rel->rd_rel->relkind != RELKIND_RELATION) + childCmd->subtype = AT_DropConstraintQuietly; + ATSimpleRecursion(wqueue, rel, childCmd, recurse); + } +} + +static void +ATExecDropConstraint(Relation rel, const char *constrName, + DropBehavior behavior, bool quiet) +{ + int deleted; + + deleted = RemoveRelConstraints(rel, constrName, behavior); + + if (!quiet) + { + /* If zero constraints deleted, complain */ + if (deleted == 0) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" does not exist", + constrName))); + /* Otherwise if more than one constraint deleted, notify */ + else if (deleted > 1) + ereport(NOTICE, + (errmsg("multiple constraints named \"%s\" were dropped", + constrName))); + } +} + +/* + * ALTER COLUMN TYPE + */ +static void +ATPrepAlterColumnType(List **wqueue, + AlteredTableInfo *tab, Relation rel, + bool recurse, bool recursing, + AlterTableCmd *cmd) +{ + char *colName = cmd->name; + TypeName *typename = (TypeName *) cmd->def; + HeapTuple tuple; + Form_pg_attribute attTup; + AttrNumber attnum; + Oid targettype; + Node *transform; + NewColumnValue *newval; + ParseState *pstate = make_parsestate(NULL); + + /* lookup the attribute so we can check inheritance status */ + tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); + if (!HeapTupleIsValid(tuple)) ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + attTup = (Form_pg_attribute) GETSTRUCT(tuple); + attnum = attTup->attnum; - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); + /* Can't alter a system attribute */ + if (attnum <= 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter system column \"%s\"", + colName))); - if (!allowSystemTableMods && IsSystemRelation(rel)) + /* Don't alter inherited columns */ + if (attTup->attinhcount > 0 && !recursing) ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot alter inherited column \"%s\"", + colName))); + + /* Look up the target type */ + targettype = LookupTypeName(typename); + if (!OidIsValid(targettype)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("type \"%s\" does not exist", + TypeNameToString(typename)))); + + /* make sure datatype is legal for a column */ + CheckAttributeType(colName, targettype); /* - * Process child tables if requested. + * Set up an expression to transform the old data value to the new type. + * If a USING option was given, transform and use that expression, else + * just take the old value and try to coerce it. We do this first so + * that type incompatibility can be detected before we waste effort, + * and because we need the expression to be parsed against the original + * table rowtype. + */ + if (cmd->transform) + { + RangeTblEntry *rte; + + /* Expression must be able to access vars of old table */ + rte = addRangeTableEntryForRelation(pstate, + RelationGetRelid(rel), + makeAlias(RelationGetRelationName(rel), NIL), + false, + true); + addRTEtoQuery(pstate, rte, false, true); + + transform = transformExpr(pstate, cmd->transform); + + /* It can't return a set */ + if (expression_returns_set(transform)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("transform expression must not return a set"))); + + /* No subplans or aggregates, either... */ + if (pstate->p_hasSubLinks) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use subquery in transform expression"))); + if (pstate->p_hasAggs) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("cannot use aggregate function in transform expression"))); + } + else + { + transform = (Node *) makeVar(1, attnum, + attTup->atttypid, attTup->atttypmod, + 0); + } + + transform = coerce_to_target_type(pstate, + transform, exprType(transform), + targettype, typename->typmod, + COERCION_ASSIGNMENT, + COERCE_IMPLICIT_CAST); + if (transform == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" cannot be cast to type \"%s\"", + colName, TypeNameToString(typename)))); + + /* + * Add a work queue item to make ATRewriteTable update the column + * contents. + */ + newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); + newval->attnum = attnum; + newval->expr = (Expr *) transform; + + tab->newvals = lappend(tab->newvals, newval); + + ReleaseSysCache(tuple); + + /* + * The recursion case is handled by ATSimpleRecursion. However, + * if we are told not to recurse, there had better not be any + * child tables; else the alter would put them out of step. */ if (recurse) + ATSimpleRecursion(wqueue, rel, cmd, recurse); + else if (!recursing && + find_inheritance_children(RelationGetRelid(rel)) != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("type of inherited column \"%s\" must be changed in child tables too", + colName))); +} + +static void +ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, + const char *colName, TypeName *typename) +{ + HeapTuple heapTup; + Form_pg_attribute attTup; + AttrNumber attnum; + HeapTuple typeTuple; + Form_pg_type tform; + Oid targettype; + Node *defaultexpr; + Relation attrelation; + Relation depRel; + ScanKeyData key[3]; + SysScanDesc scan; + HeapTuple depTup; + + attrelation = heap_openr(AttributeRelationName, RowExclusiveLock); + + /* Look up the target column */ + heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); + if (!HeapTupleIsValid(heapTup)) /* shouldn't happen */ + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + attTup = (Form_pg_attribute) GETSTRUCT(heapTup); + attnum = attTup->attnum; + + /* Check for multiple ALTER TYPE on same column --- can't cope */ + if (attTup->atttypid != tab->oldDesc->attrs[attnum-1]->atttypid || + attTup->atttypmod != tab->oldDesc->attrs[attnum-1]->atttypmod) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter type of column \"%s\" twice", + colName))); + + /* Look up the target type (should not fail, since prep found it) */ + typeTuple = typenameType(typename); + tform = (Form_pg_type) GETSTRUCT(typeTuple); + targettype = HeapTupleGetOid(typeTuple); + + /* + * If there is a default expression for the column, get it and ensure + * we can coerce it to the new datatype. (We must do this before + * changing the column type, because build_column_default itself will + * try to coerce, and will not issue the error message we want if it + * fails.) + */ + if (attTup->atthasdef) { - List *child, - *children; + defaultexpr = build_column_default(rel, attnum); + Assert(defaultexpr); + defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */ + defaultexpr, exprType(defaultexpr), + targettype, typename->typmod, + COERCION_ASSIGNMENT, + COERCE_IMPLICIT_CAST); + if (defaultexpr == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("default for column \"%s\" cannot be cast to type \"%s\"", + colName, TypeNameToString(typename)))); + } + else + defaultexpr = NULL; - /* This routine is actually in the planner */ - children = find_all_inheritors(myrelid); + /* + * Find everything that depends on the column (constraints, indexes, etc), + * and record enough information to let us recreate the objects. + * + * The actual recreation does not happen here, but only after we have + * performed all the individual ALTER TYPE operations. We have to save + * the info before executing ALTER TYPE, though, else the deparser will + * get confused. + * + * There could be multiple entries for the same object, so we must check + * to ensure we process each one only once. Note: we assume that an index + * that implements a constraint will not show a direct dependency on the + * column. + */ + depRel = heap_openr(DependRelationName, RowExclusiveLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_refclassid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelOid_pg_class)); + ScanKeyInit(&key[1], + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + ScanKeyInit(&key[2], + Anum_pg_depend_refobjsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum((int32) attnum)); + + scan = systable_beginscan(depRel, DependReferenceIndex, true, + SnapshotNow, 3, key); + + while (HeapTupleIsValid(depTup = systable_getnext(scan))) + { + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup); + ObjectAddress foundObject; - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) + /* We don't expect any PIN dependencies on columns */ + if (foundDep->deptype == DEPENDENCY_PIN) + elog(ERROR, "cannot alter type of a pinned column"); + + foundObject.classId = foundDep->classid; + foundObject.objectId = foundDep->objid; + foundObject.objectSubId = foundDep->objsubid; + + switch (getObjectClass(&foundObject)) { - Oid childrelid = lfirsto(child); - Relation inhrel; + case OCLASS_CLASS: + { + char relKind = get_rel_relkind(foundObject.objectId); - if (childrelid == myrelid) - continue; - inhrel = heap_open(childrelid, AccessExclusiveLock); - /* do NOT count child constraints in deleted. */ - RemoveRelConstraints(inhrel, constrName, behavior); - heap_close(inhrel, NoLock); + if (relKind == RELKIND_INDEX) + { + Assert(foundObject.objectSubId == 0); + if (!oidMember(foundObject.objectId, tab->changedIndexOids)) + { + tab->changedIndexOids = lappendo(tab->changedIndexOids, + foundObject.objectId); + tab->changedIndexDefs = lappend(tab->changedIndexDefs, + pg_get_indexdef_string(foundObject.objectId)); + } + } + else if (relKind == RELKIND_SEQUENCE) + { + /* + * This must be a SERIAL column's sequence. We need not + * do anything to it. + */ + Assert(foundObject.objectSubId == 0); + } + else + { + /* Not expecting any other direct dependencies... */ + elog(ERROR, "unexpected object depending on column: %s", + getObjectDescription(&foundObject)); + } + break; + } + + case OCLASS_CONSTRAINT: + Assert(foundObject.objectSubId == 0); + if (!oidMember(foundObject.objectId, tab->changedConstraintOids)) + { + tab->changedConstraintOids = lappendo(tab->changedConstraintOids, + foundObject.objectId); + tab->changedConstraintDefs = lappend(tab->changedConstraintDefs, + pg_get_constraintdef_string(foundObject.objectId)); + } + break; + + case OCLASS_REWRITE: + /* XXX someday see if we can cope with revising views */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter type of a column used by a view or rule"), + errdetail("%s depends on column \"%s\"", + getObjectDescription(&foundObject), + colName))); + break; + + case OCLASS_DEFAULT: + /* + * Ignore the column's default expression, since we will fix + * it below. + */ + Assert(defaultexpr); + break; + + case OCLASS_PROC: + case OCLASS_TYPE: + case OCLASS_CAST: + case OCLASS_CONVERSION: + case OCLASS_LANGUAGE: + case OCLASS_OPERATOR: + case OCLASS_OPCLASS: + case OCLASS_TRIGGER: + case OCLASS_SCHEMA: + /* + * We don't expect any of these sorts of objects to depend + * on a column. + */ + elog(ERROR, "unexpected object depending on column: %s", + getObjectDescription(&foundObject)); + break; + + default: + elog(ERROR, "unrecognized object class: %u", + foundObject.classId); } } + systable_endscan(scan); + /* - * Now do the thing on this relation. + * Now scan for dependencies of this column on other things. The only + * thing we should find is the dependency on the column datatype, + * which we want to remove. */ - deleted += RemoveRelConstraints(rel, constrName, behavior); + ScanKeyInit(&key[0], + Anum_pg_depend_classid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelOid_pg_class)); + ScanKeyInit(&key[1], + Anum_pg_depend_objid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + ScanKeyInit(&key[2], + Anum_pg_depend_objsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum((int32) attnum)); + + scan = systable_beginscan(depRel, DependDependerIndex, true, + SnapshotNow, 3, key); + + while (HeapTupleIsValid(depTup = systable_getnext(scan))) + { + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup); - /* Close the target relation */ - heap_close(rel, NoLock); + if (foundDep->deptype != DEPENDENCY_NORMAL) + elog(ERROR, "found unexpected dependency type '%c'", + foundDep->deptype); + if (foundDep->classid != RelOid_pg_type || + foundDep->objid != attTup->atttypid) + elog(ERROR, "found unexpected dependency for column"); - /* If zero constraints deleted, complain */ - if (deleted == 0) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" does not exist", - constrName))); - /* Otherwise if more than one constraint deleted, notify */ - else if (deleted > 1) - ereport(NOTICE, - (errmsg("multiple constraints named \"%s\" were dropped", - constrName))); + simple_heap_delete(depRel, &depTup->t_self); + } + + systable_endscan(scan); + + heap_close(depRel, RowExclusiveLock); + + /* + * Here we go --- change the recorded column type. (Note heapTup is + * a copy of the syscache entry, so okay to scribble on.) + */ + attTup->atttypid = targettype; + attTup->atttypmod = typename->typmod; + attTup->attndims = length(typename->arrayBounds); + attTup->attlen = tform->typlen; + attTup->attbyval = tform->typbyval; + attTup->attalign = tform->typalign; + attTup->attstorage = tform->typstorage; + + ReleaseSysCache(typeTuple); + + simple_heap_update(attrelation, &heapTup->t_self, heapTup); + + /* keep system catalog indexes current */ + CatalogUpdateIndexes(attrelation, heapTup); + + heap_close(attrelation, RowExclusiveLock); + + /* Install dependency on new datatype */ + add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype); + + /* Drop any pg_statistic entry for the column, since it's now wrong type */ + RemoveStatistics(rel, attnum); + + /* + * Update the default, if present, by brute force --- remove and re-add + * the default. Probably unsafe to take shortcuts, since the new version + * may well have additional dependencies. (It's okay to do this now, + * rather than after other ALTER TYPE commands, since the default won't + * depend on other column types.) + */ + if (defaultexpr) + { + /* Must make new row visible since it will be updated again */ + CommandCounterIncrement(); + + /* + * We use RESTRICT here for safety, but at present we do not expect + * anything to depend on the default. + */ + RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true); + + StoreAttrDefault(rel, attnum, nodeToString(defaultexpr)); + } + + /* Cleanup */ + heap_freetuple(heapTup); } /* + * Cleanup after we've finished all the ALTER TYPE operations for a + * particular relation. We have to drop and recreate all the indexes + * and constraints that depend on the altered columns. + */ +static void +ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) +{ + ObjectAddress obj; + List *l; + + /* + * Re-parse the index and constraint definitions, and attach them to + * the appropriate work queue entries. We do this before dropping + * because in the case of a FOREIGN KEY constraint, we might not yet + * have exclusive lock on the table the constraint is attached to, + * and we need to get that before dropping. It's safe because the + * parser won't actually look at the catalogs to detect the existing + * entry. + */ + foreach(l, tab->changedIndexDefs) + { + ATPostAlterTypeParse((char *) lfirst(l), wqueue); + } + foreach(l, tab->changedConstraintDefs) + { + ATPostAlterTypeParse((char *) lfirst(l), wqueue); + } + + /* + * Now we can drop the existing constraints and indexes --- constraints + * first, since some of them might depend on the indexes. It should be + * okay to use DROP_RESTRICT here, since nothing else should be depending + * on these objects. + */ + if (tab->changedConstraintOids) + obj.classId = get_system_catalog_relid(ConstraintRelationName); + foreach(l, tab->changedConstraintOids) + { + obj.objectId = lfirsto(l); + obj.objectSubId = 0; + performDeletion(&obj, DROP_RESTRICT); + } + + obj.classId = RelOid_pg_class; + foreach(l, tab->changedIndexOids) + { + obj.objectId = lfirsto(l); + obj.objectSubId = 0; + performDeletion(&obj, DROP_RESTRICT); + } + + /* + * The objects will get recreated during subsequent passes over the + * work queue. + */ +} + +static void +ATPostAlterTypeParse(char *cmd, List **wqueue) +{ + List *raw_parsetree_list; + List *querytree_list; + List *list_item; + + /* + * We expect that we only have to do raw parsing and parse analysis, not + * any rule rewriting, since these will all be utility statements. + */ + raw_parsetree_list = raw_parser(cmd); + querytree_list = NIL; + foreach(list_item, raw_parsetree_list) + { + Node *parsetree = (Node *) lfirst(list_item); + + querytree_list = nconc(querytree_list, + parse_analyze(parsetree, NULL, 0)); + } + + /* + * Attach each generated command to the proper place in the work queue. + * Note this could result in creation of entirely new work-queue entries. + */ + foreach(list_item, querytree_list) + { + Query *query = (Query *) lfirst(list_item); + Relation rel; + AlteredTableInfo *tab; + + Assert(IsA(query, Query)); + Assert(query->commandType == CMD_UTILITY); + switch (nodeTag(query->utilityStmt)) + { + case T_IndexStmt: + { + IndexStmt *stmt = (IndexStmt *) query->utilityStmt; + AlterTableCmd *newcmd; + + rel = relation_openrv(stmt->relation, AccessExclusiveLock); + tab = ATGetQueueEntry(wqueue, rel); + newcmd = makeNode(AlterTableCmd); + newcmd->subtype = AT_ReAddIndex; + newcmd->def = (Node *) stmt; + tab->subcmds[AT_PASS_OLD_INDEX] = + lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd); + relation_close(rel, NoLock); + break; + } + case T_AlterTableStmt: + { + AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt; + List *lcmd; + + rel = relation_openrv(stmt->relation, AccessExclusiveLock); + tab = ATGetQueueEntry(wqueue, rel); + foreach(lcmd, stmt->cmds) + { + AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd); + + switch (cmd->subtype) + { + case AT_AddIndex: + cmd->subtype = AT_ReAddIndex; + tab->subcmds[AT_PASS_OLD_INDEX] = + lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd); + break; + case AT_AddConstraint: + tab->subcmds[AT_PASS_OLD_CONSTR] = + lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd); + break; + default: + elog(ERROR, "unexpected statement type: %d", + (int) cmd->subtype); + } + } + relation_close(rel, NoLock); + break; + } + default: + elog(ERROR, "unexpected statement type: %d", + (int) nodeTag(query->utilityStmt)); + } + } +} + + +/* * ALTER TABLE OWNER */ -void -AlterTableOwner(Oid relationOid, int32 newOwnerSysId) +static void +ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId) { Relation target_rel; Relation class_rel; @@ -3879,7 +4965,7 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId) /* For each index, recursively change its ownership */ foreach(i, index_oid_list) - AlterTableOwner(lfirsto(i), newOwnerSysId); + ATExecChangeOwner(lfirsto(i), newOwnerSysId); freeList(index_oid_list); } @@ -3888,7 +4974,7 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId) { /* If it has a toast table, recurse to change its ownership */ if (tuple_class->reltoastrelid != InvalidOid) - AlterTableOwner(tuple_class->reltoastrelid, newOwnerSysId); + ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerSysId); } heap_freetuple(tuple); @@ -3901,25 +4987,22 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId) * * The only thing we have to do is to change the indisclustered bits. */ -void -AlterTableClusterOn(Oid relOid, const char *indexName) +static void +ATExecClusterOn(Relation rel, const char *indexName) { - Relation rel, - pg_index; + Relation pg_index; List *index; Oid indexOid; HeapTuple indexTuple; Form_pg_index indexForm; - rel = heap_open(relOid, AccessExclusiveLock); - indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace); if (!OidIsValid(indexOid)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("index \"%s\" for table \"%s\" does not exist", - indexName, NameStr(rel->rd_rel->relname)))); + indexName, RelationGetRelationName(rel)))); indexTuple = SearchSysCache(INDEXRELID, ObjectIdGetDatum(indexOid), @@ -3935,10 +5018,11 @@ AlterTableClusterOn(Oid relOid, const char *indexName) if (indexForm->indisclustered) { ReleaseSysCache(indexTuple); - heap_close(rel, NoLock); return; } + ReleaseSysCache(indexTuple); + pg_index = heap_openr(IndexRelationName, RowExclusiveLock); /* @@ -3977,14 +5061,15 @@ AlterTableClusterOn(Oid relOid, const char *indexName) } heap_close(pg_index, RowExclusiveLock); - - ReleaseSysCache(indexTuple); - - heap_close(rel, NoLock); /* close rel, but keep lock till commit */ } /* * ALTER TABLE CREATE TOAST TABLE + * + * Note: this is also invoked from outside this module; in such cases we + * expect the caller to have verified that the relation is a table and we + * have all the right permissions. Callers expect this function + * to end with CommandCounterIncrement if it makes any changes. */ void AlterTableCreateToastTable(Oid relOid, bool silent) @@ -4005,21 +5090,11 @@ AlterTableCreateToastTable(Oid relOid, bool silent) /* * Grab an exclusive lock on the target table, which we will NOT - * release until end of transaction. + * release until end of transaction. (This is probably redundant + * in all present uses...) */ rel = heap_open(relOid, AccessExclusiveLock); - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(relOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - /* * Toast table is shared if and only if its parent is. * @@ -4149,7 +5224,7 @@ AlterTableCreateToastTable(Oid relOid, bool silent) toast_idxid = index_create(toast_relid, toast_idxname, indexInfo, BTREE_AM_OID, classObjectId, - true, false, true); + true, false, true, false); /* * Update toast rel's pg_class entry to show that it has an index. The |