diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 3071 |
1 files changed, 2073 insertions, 998 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index a6e3a93d349..a9c307b80c3 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.102 2004/04/01 21:28:44 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.103 2004/05/05 04:48:45 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -24,27 +24,33 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_constraint.h" +#include "catalog/pg_depend.h" #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" #include "commands/cluster.h" +#include "commands/defrem.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "lib/stringinfo.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "optimizer/clauses.h" #include "optimizer/plancat.h" #include "optimizer/prep.h" +#include "parser/analyze.h" #include "parser/gramparse.h" +#include "parser/parser.h" #include "parser/parse_clause.h" #include "parser/parse_coerce.h" #include "parser/parse_expr.h" #include "parser/parse_oper.h" #include "parser/parse_relation.h" #include "parser/parse_type.h" +#include "rewrite/rewriteHandler.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -76,6 +82,83 @@ typedef struct OnCommitItem static List *on_commits = NIL; +/* + * State information for ALTER TABLE + * + * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo + * structs, one for each table modified by the operation (the named table + * plus any child tables that are affected). We save lists of subcommands + * to apply to this table (possibly modified by parse transformation steps); + * these lists will be executed in Phase 2. If a Phase 3 step is needed, + * necessary information is stored in the constraints and newvals lists. + * + * Phase 2 is divided into multiple passes; subcommands are executed in + * a pass determined by subcommand type. + */ + +#define AT_PASS_DROP 0 /* DROP (all flavors) */ +#define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */ +#define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */ +#define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */ +#define AT_PASS_COL_ATTRS 4 /* set other column attributes */ +/* We could support a RENAME COLUMN pass here, but not currently used */ +#define AT_PASS_ADD_COL 5 /* ADD COLUMN */ +#define AT_PASS_ADD_INDEX 6 /* ADD indexes */ +#define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */ +#define AT_PASS_MISC 8 /* other stuff */ +#define AT_NUM_PASSES 9 + +typedef struct AlteredTableInfo +{ + /* Information saved before any work commences: */ + Oid relid; /* Relation to work on */ + TupleDesc oldDesc; /* Pre-modification tuple descriptor */ + /* Information saved by Phase 1 for Phase 2: */ + List *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */ + /* Information saved by Phases 1/2 for Phase 3: */ + List *constraints; /* List of NewConstraint */ + List *newvals; /* List of NewColumnValue */ + /* Objects to rebuild after completing ALTER TYPE operations */ + List *changedConstraintOids; /* OIDs of constraints to rebuild */ + List *changedConstraintDefs; /* string definitions of same */ + List *changedIndexOids; /* OIDs of indexes to rebuild */ + List *changedIndexDefs; /* string definitions of same */ + /* Workspace for ATExecAddConstraint */ + int constr_name_ctr; +} AlteredTableInfo; + +/* Struct describing one new constraint to check in Phase 3 scan */ +typedef struct NewConstraint +{ + char *name; /* Constraint name, or NULL if none */ + ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */ + AttrNumber attnum; /* only relevant for NOT_NULL */ + Oid refrelid; /* PK rel, if FOREIGN */ + Node *qual; /* Check expr or FkConstraint struct */ + List *qualstate; /* Execution state for CHECK */ +} NewConstraint; + +/* + * Struct describing one new column value that needs to be computed during + * Phase 3 copy (this could be either a new column with a non-null default, or + * a column that we're changing the type of). Columns without such an entry + * are just copied from the old table during ATRewriteTable. Note that the + * expr is an expression over *old* table values. + */ +typedef struct NewColumnValue +{ + AttrNumber attnum; /* which column */ + Expr *expr; /* expression to compute */ + ExprState *exprstate; /* execution state */ +} NewColumnValue; + + +/* Used by attribute and relation renaming routines: */ +#define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */ +#define RI_TRIGGER_FK 2 /* is a trigger on the FK relation */ +#define RI_TRIGGER_NONE 0 /* is not an RI trigger function */ + + static List *MergeAttributes(List *schema, List *supers, bool istemp, List **supOids, List **supconstr, int *supOidCount); static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno); @@ -83,9 +166,6 @@ static void StoreCatalogInheritance(Oid relationId, List *supers); static int findAttrByName(const char *attributeName, List *schema); static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass); static bool needs_toast_table(Relation rel); -static void AlterTableAddCheckConstraint(Relation rel, Constraint *constr); -static void AlterTableAddForeignKeyConstraint(Relation rel, - FkConstraint *fkconstraint); static int transformColumnNameList(Oid relId, List *colList, int16 *attnums, Oid *atttypids); static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, @@ -100,13 +180,58 @@ static void validateForeignKeyConstraint(FkConstraint *fkconstraint, static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, Oid constrOid); static char *fkMatchTypeToString(char match_type); - -/* Used by attribute and relation renaming routines: */ - -#define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */ -#define RI_TRIGGER_FK 2 /* is a trigger on the FK relation */ -#define RI_TRIGGER_NONE 0 /* is not an RI trigger function */ - +static void ATController(Relation rel, List *cmds, bool recurse); +static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, + bool recurse, bool recursing); +static void ATRewriteCatalogs(List **wqueue); +static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd); +static void ATRewriteTables(List **wqueue); +static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap); +static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel); +static void ATSimplePermissions(Relation rel, bool allowView); +static void ATSimpleRecursion(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse); +static void ATOneLevelRecursion(List **wqueue, Relation rel, + AlterTableCmd *cmd); +static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, + AlterTableCmd *cmd); +static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel, + ColumnDef *colDef); +static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); +static void ATExecDropNotNull(Relation rel, const char *colName); +static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName); +static void ATExecColumnDefault(Relation rel, const char *colName, + Node *newDefault); +static void ATPrepSetStatistics(Relation rel, const char *colName, + Node *flagValue); +static void ATExecSetStatistics(Relation rel, const char *colName, + Node *newValue); +static void ATExecSetStorage(Relation rel, const char *colName, + Node *newValue); +static void ATExecDropColumn(Relation rel, const char *colName, + DropBehavior behavior, + bool recurse, bool recursing); +static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel, + IndexStmt *stmt, bool is_rebuild); +static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, + Node *newConstraint); +static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, + FkConstraint *fkconstraint); +static void ATPrepDropConstraint(List **wqueue, Relation rel, + bool recurse, AlterTableCmd *cmd); +static void ATExecDropConstraint(Relation rel, const char *constrName, + DropBehavior behavior, bool quiet); +static void ATPrepAlterColumnType(List **wqueue, + AlteredTableInfo *tab, Relation rel, + bool recurse, bool recursing, + AlterTableCmd *cmd); +static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, + const char *colName, TypeName *typename); +static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab); +static void ATPostAlterTypeParse(char *cmd, List **wqueue); +static void ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId); +static void ATExecClusterOn(Relation rel, const char *indexName); static int ri_trigger_type(Oid tgfoid); static void update_ri_trigger_args(Oid relid, const char *oldname, @@ -1100,7 +1225,7 @@ renameatt(Oid myrelid, if (childrelid == myrelid) continue; - /* note we need not recurse again! */ + /* note we need not recurse again */ renameatt(childrelid, oldattname, newattname, false, true); } } @@ -1129,7 +1254,7 @@ renameatt(Oid myrelid, attform = (Form_pg_attribute) GETSTRUCT(atttup); attnum = attform->attnum; - if (attnum < 0) + if (attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot rename system column \"%s\"", @@ -1240,8 +1365,7 @@ renameatt(Oid myrelid, true, false); } - relation_close(targetrelation, NoLock); /* close rel but keep - * lock! */ + relation_close(targetrelation, NoLock); /* close rel but keep lock */ } /* @@ -1559,55 +1683,803 @@ update_ri_trigger_args(Oid relid, CommandCounterIncrement(); } +/* + * AlterTable + * Execute ALTER TABLE, which can be a list of subcommands + * + * ALTER TABLE is performed in three phases: + * 1. Examine subcommands and perform pre-transformation checking. + * 2. Update system catalogs. + * 3. Scan table(s) to check new constraints, and optionally recopy + * the data into new table(s). + * Phase 3 is not performed unless one or more of the subcommands requires + * it. The intention of this design is to allow multiple independent + * updates of the table schema to be performed with only one pass over the + * data. + * + * ATPrepCmd performs phase 1. A "work queue" entry is created for + * each table to be affected (there may be multiple affected tables if the + * commands traverse a table inheritance hierarchy). Also we do preliminary + * validation of the subcommands, including parse transformation of those + * expressions that need to be evaluated with respect to the old table + * schema. + * + * ATRewriteCatalogs performs phase 2 for each affected table (note that + * phases 2 and 3 do no explicit recursion, since phase 1 already did it). + * Certain subcommands need to be performed before others to avoid + * unnecessary conflicts; for example, DROP COLUMN should come before + * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple + * lists, one for each logical "pass" of phase 2. + * + * ATRewriteTables performs phase 3 for those tables that need it. + * + * Thanks to the magic of MVCC, an error anywhere along the way rolls back + * the whole operation; we don't have to do anything special to clean up. + */ +void +AlterTable(AlterTableStmt *stmt) +{ + ATController(relation_openrv(stmt->relation, AccessExclusiveLock), + stmt->cmds, + interpretInhOption(stmt->relation->inhOpt)); +} -/* ---------------- - * AlterTableAddColumn - * (formerly known as PerformAddAttribute) +/* + * AlterTableInternal * - * adds an additional attribute to a relation - * ---------------- + * ALTER TABLE with target specified by OID */ void -AlterTableAddColumn(Oid myrelid, - bool recurse, - ColumnDef *colDef) +AlterTableInternal(Oid relid, List *cmds, bool recurse) { - Relation rel, - pgclass, - attrdesc; - HeapTuple reltup; - HeapTuple newreltup; - HeapTuple attributeTuple; - Form_pg_attribute attribute; - FormData_pg_attribute attributeD; + ATController(relation_open(relid, AccessExclusiveLock), + cmds, + recurse); +} + +static void +ATController(Relation rel, List *cmds, bool recurse) +{ + List *wqueue = NIL; + List *lcmd; + + /* Phase 1: preliminary examination of commands, create work queue */ + foreach(lcmd, cmds) + { + AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd); + + ATPrepCmd(&wqueue, rel, cmd, recurse, false); + } + + /* Close the relation, but keep lock until commit */ + relation_close(rel, NoLock); + + /* Phase 2: update system catalogs */ + ATRewriteCatalogs(&wqueue); + + /* Phase 3: scan/rewrite tables as needed */ + ATRewriteTables(&wqueue); +} + +/* + * ATPrepCmd + * + * Traffic cop for ALTER TABLE Phase 1 operations, including simple + * recursion and permission checks. + * + * Caller must have acquired AccessExclusiveLock on relation already. + * This lock should be held until commit. + */ +static void +ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, + bool recurse, bool recursing) +{ + AlteredTableInfo *tab; + int pass; + + /* Find or create work queue entry for this table */ + tab = ATGetQueueEntry(wqueue, rel); + + /* + * Copy the original subcommand for each table. This avoids conflicts + * when different child tables need to make different parse + * transformations (for example, the same column may have different + * column numbers in different children). + */ + cmd = copyObject(cmd); + + /* + * Do permissions checking, recursion to child tables if needed, + * and any additional phase-1 processing needed. + */ + switch (cmd->subtype) + { + case AT_AddColumn: /* ADD COLUMN */ + ATSimplePermissions(rel, false); + /* Performs own recursion */ + ATPrepAddColumn(wqueue, rel, recurse, cmd); + pass = AT_PASS_ADD_COL; + break; + case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ + /* + * We allow defaults on views so that INSERT into a view can have + * default-ish behavior. This works because the rewriter + * substitutes default values into INSERTs before it expands + * rules. + */ + ATSimplePermissions(rel, true); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_ADD_CONSTR; + break; + case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ + ATSimplePermissions(rel, false); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_DROP; + break; + case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ + ATSimplePermissions(rel, false); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_ADD_CONSTR; + break; + case AT_SetStatistics: /* ALTER COLUMN STATISTICS */ + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* Performs own permission checks */ + ATPrepSetStatistics(rel, cmd->name, cmd->def); + pass = AT_PASS_COL_ATTRS; + break; + case AT_SetStorage: /* ALTER COLUMN STORAGE */ + ATSimplePermissions(rel, false); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_COL_ATTRS; + break; + case AT_DropColumn: /* DROP COLUMN */ + ATSimplePermissions(rel, false); + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ + if (recurse) + cmd->subtype = AT_DropColumnRecurse; + pass = AT_PASS_DROP; + break; + case AT_AddIndex: /* ADD INDEX */ + ATSimplePermissions(rel, false); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_ADD_INDEX; + break; + case AT_AddConstraint: /* ADD CONSTRAINT */ + ATSimplePermissions(rel, false); + /* + * Currently we recurse only for CHECK constraints, never for + * foreign-key constraints. UNIQUE/PKEY constraints won't be + * seen here. + */ + if (IsA(cmd->def, Constraint)) + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_ADD_CONSTR; + break; + case AT_DropConstraint: /* DROP CONSTRAINT */ + ATSimplePermissions(rel, false); + /* Performs own recursion */ + ATPrepDropConstraint(wqueue, rel, recurse, cmd); + pass = AT_PASS_DROP; + break; + case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */ + ATSimplePermissions(rel, false); + ATSimpleRecursion(wqueue, rel, cmd, recurse); + /* No command-specific prep needed */ + pass = AT_PASS_DROP; + break; + case AT_AlterColumnType: /* ALTER COLUMN TYPE */ + ATSimplePermissions(rel, false); + /* Performs own recursion */ + ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd); + pass = AT_PASS_ALTER_TYPE; + break; + case AT_ToastTable: /* CREATE TOAST TABLE */ + ATSimplePermissions(rel, false); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; + case AT_ChangeOwner: /* ALTER OWNER */ + /* check that we are the superuser */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to alter owner"))); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; + case AT_ClusterOn: /* CLUSTER ON */ + ATSimplePermissions(rel, false); + /* This command never recurses */ + /* No command-specific prep needed */ + pass = AT_PASS_MISC; + break; + case AT_DropOids: /* SET WITHOUT OIDS */ + ATSimplePermissions(rel, false); + /* Performs own recursion */ + if (rel->rd_rel->relhasoids) + { + AlterTableCmd *dropCmd = makeNode(AlterTableCmd); + + dropCmd->subtype = AT_DropColumn; + dropCmd->name = pstrdup("oid"); + dropCmd->behavior = cmd->behavior; + ATPrepCmd(wqueue, rel, dropCmd, recurse, false); + } + pass = AT_PASS_DROP; + break; + default: /* oops */ + elog(ERROR, "unrecognized alter table type: %d", + (int) cmd->subtype); + pass = 0; /* keep compiler quiet */ + break; + } + + /* Add the subcommand to the appropriate list for phase 2 */ + tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd); +} + +/* + * ATRewriteCatalogs + * + * Traffic cop for ALTER TABLE Phase 2 operations. Subcommands are + * dispatched in a "safe" execution order (designed to avoid unnecessary + * conflicts). + */ +static void +ATRewriteCatalogs(List **wqueue) +{ + int pass; + List *ltab; + + /* + * We process all the tables "in parallel", one pass at a time. This + * is needed because we may have to propagate work from one table + * to another (specifically, ALTER TYPE on a foreign key's PK has to + * dispatch the re-adding of the foreign key constraint to the other + * table). Work can only be propagated into later passes, however. + */ + for (pass = 0; pass < AT_NUM_PASSES; pass++) + { + /* Go through each table that needs to be processed */ + foreach(ltab, *wqueue) + { + AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); + List *subcmds = tab->subcmds[pass]; + Relation rel; + List *lcmd; + + if (subcmds == NIL) + continue; + + /* Exclusive lock was obtained by phase 1, needn't get it again */ + rel = relation_open(tab->relid, NoLock); + + foreach(lcmd, subcmds) + { + ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd)); + } + + /* + * After the ALTER TYPE pass, do cleanup work (this is not done in + * ATExecAlterColumnType since it should be done only once if + * multiple columns of a table are altered). + */ + if (pass == AT_PASS_ALTER_TYPE) + ATPostAlterTypeCleanup(wqueue, tab); + + relation_close(rel, NoLock); + } + } + + /* + * Do an implicit CREATE TOAST TABLE if we executed any subcommands + * that might have added a column or changed column storage. + */ + foreach(ltab, *wqueue) + { + AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); + + if (tab->subcmds[AT_PASS_ADD_COL] || + tab->subcmds[AT_PASS_ALTER_TYPE] || + tab->subcmds[AT_PASS_COL_ATTRS]) + { + AlterTableCreateToastTable(tab->relid, true); + } + } +} + +/* + * ATExecCmd: dispatch a subcommand to appropriate execution routine + */ +static void +ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd) +{ + switch (cmd->subtype) + { + case AT_AddColumn: /* ADD COLUMN */ + ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def); + break; + case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ + ATExecColumnDefault(rel, cmd->name, cmd->def); + break; + case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ + ATExecDropNotNull(rel, cmd->name); + break; + case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ + ATExecSetNotNull(tab, rel, cmd->name); + break; + case AT_SetStatistics: /* ALTER COLUMN STATISTICS */ + ATExecSetStatistics(rel, cmd->name, cmd->def); + break; + case AT_SetStorage: /* ALTER COLUMN STORAGE */ + ATExecSetStorage(rel, cmd->name, cmd->def); + break; + case AT_DropColumn: /* DROP COLUMN */ + ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false); + break; + case AT_DropColumnRecurse: /* DROP COLUMN with recursion */ + ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false); + break; + case AT_AddIndex: /* ADD INDEX */ + ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false); + break; + case AT_ReAddIndex: /* ADD INDEX */ + ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true); + break; + case AT_AddConstraint: /* ADD CONSTRAINT */ + ATExecAddConstraint(tab, rel, cmd->def); + break; + case AT_DropConstraint: /* DROP CONSTRAINT */ + ATExecDropConstraint(rel, cmd->name, cmd->behavior, false); + break; + case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */ + ATExecDropConstraint(rel, cmd->name, cmd->behavior, true); + break; + case AT_AlterColumnType: /* ALTER COLUMN TYPE */ + ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def); + break; + case AT_ToastTable: /* CREATE TOAST TABLE */ + AlterTableCreateToastTable(RelationGetRelid(rel), false); + break; + case AT_ChangeOwner: /* ALTER OWNER */ + /* get_usesysid raises an error if no such user */ + ATExecChangeOwner(RelationGetRelid(rel), get_usesysid(cmd->name)); + break; + case AT_ClusterOn: /* CLUSTER ON */ + ATExecClusterOn(rel, cmd->name); + break; + case AT_DropOids: /* SET WITHOUT OIDS */ + /* + * Nothing to do here; we'll have generated a DropColumn subcommand + * to do the real work + */ + break; + default: /* oops */ + elog(ERROR, "unrecognized alter table type: %d", + (int) cmd->subtype); + break; + } + + /* + * Bump the command counter to ensure the next subcommand in the sequence + * can see the changes so far + */ + CommandCounterIncrement(); +} + +/* + * ATRewriteTables: ALTER TABLE phase 3 + */ +static void +ATRewriteTables(List **wqueue) +{ + List *ltab; + + /* Go through each table that needs to be checked or rewritten */ + foreach(ltab, *wqueue) + { + AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); + + /* + * We only need to rewrite the table if at least one column needs + * to be recomputed. + */ + if (tab->newvals != NIL) + { + /* Build a temporary relation and copy data */ + Oid OIDNewHeap; + char NewHeapName[NAMEDATALEN]; + List *indexes; + Relation OldHeap; + ObjectAddress object; + + /* Save the information about all indexes on the relation. */ + OldHeap = heap_open(tab->relid, NoLock); + indexes = get_indexattr_list(OldHeap, InvalidOid); + heap_close(OldHeap, NoLock); + + /* + * Create the new heap, using a temporary name in the same + * namespace as the existing table. NOTE: there is some risk of + * collision with user relnames. Working around this seems more + * trouble than it's worth; in particular, we can't create the new + * heap in a different namespace from the old, or we will have + * problems with the TEMP status of temp tables. + */ + snprintf(NewHeapName, sizeof(NewHeapName), + "pg_temp_%u", tab->relid); + + OIDNewHeap = make_new_heap(tab->relid, NewHeapName); + + /* + * Copy the heap data into the new table with the desired + * modifications, and test the current data within the table + * against new constraints generated by ALTER TABLE commands. + */ + ATRewriteTable(tab, OIDNewHeap); + + /* Swap the relfilenodes of the old and new heaps. */ + swap_relfilenodes(tab->relid, OIDNewHeap); + + CommandCounterIncrement(); + + /* Destroy new heap with old filenode */ + object.classId = RelOid_pg_class; + object.objectId = OIDNewHeap; + object.objectSubId = 0; + + /* + * The new relation is local to our transaction and we know nothing + * depends on it, so DROP_RESTRICT should be OK. + */ + performDeletion(&object, DROP_RESTRICT); + /* performDeletion does CommandCounterIncrement at end */ + + /* + * Rebuild each index on the relation. We do not need + * CommandCounterIncrement() because rebuild_indexes does it. + */ + rebuild_indexes(tab->relid, indexes); + } + else + { + /* + * Test the current data within the table against new constraints + * generated by ALTER TABLE commands, but don't rebuild data. + */ + ATRewriteTable(tab, InvalidOid); + } + } + + /* + * Foreign key constraints are checked in a final pass, since + * (a) it's generally best to examine each one separately, and + * (b) it's at least theoretically possible that we have changed + * both relations of the foreign key, and we'd better have finished + * both rewrites before we try to read the tables. + */ + foreach(ltab, *wqueue) + { + AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab); + Relation rel = NULL; + List *lcon; + + foreach(lcon, tab->constraints) + { + NewConstraint *con = lfirst(lcon); + + if (con->contype == CONSTR_FOREIGN) + { + FkConstraint *fkconstraint = (FkConstraint *) con->qual; + Relation refrel; + + if (rel == NULL) + { + /* Long since locked, no need for another */ + rel = heap_open(tab->relid, NoLock); + } + + refrel = heap_open(con->refrelid, RowShareLock); + + validateForeignKeyConstraint(fkconstraint, rel, refrel); + + heap_close(refrel, NoLock); + } + } + + if (rel) + heap_close(rel, NoLock); + } +} + +/* + * ATRewriteTable: scan or rewrite one table + * + * OIDNewHeap is InvalidOid if we don't need to rewrite + */ +static void +ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) +{ + Relation oldrel; + Relation newrel; + TupleDesc oldTupDesc; + TupleDesc newTupDesc; + bool needscan = false; int i; - int minattnum, - maxatts; - HeapTuple typeTuple; - Form_pg_type tform; - int attndims; - ObjectAddress myself, - referenced; + List *l; + EState *estate; /* - * Grab an exclusive lock on the target table, which we will NOT - * release until end of transaction. + * Open the relation(s). We have surely already locked the existing + * table. */ - rel = heap_open(myrelid, AccessExclusiveLock); + oldrel = heap_open(tab->relid, NoLock); + oldTupDesc = tab->oldDesc; + newTupDesc = RelationGetDescr(oldrel); /* includes all mods */ - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); + if (OidIsValid(OIDNewHeap)) + newrel = heap_open(OIDNewHeap, AccessExclusiveLock); + else + newrel = NULL; /* - * permissions checking. this would normally be done in utility.c, - * but this particular routine is recursive. - * - * normally, only the owner of a class can change its schema. + * Generate the constraint and default execution states */ - if (!pg_class_ownercheck(myrelid, GetUserId())) + + estate = CreateExecutorState(); + + /* Build the needed expression execution states */ + foreach(l, tab->constraints) + { + NewConstraint *con = lfirst(l); + + switch (con->contype) + { + case CONSTR_CHECK: + needscan = true; + con->qualstate = (List *) + ExecPrepareExpr((Expr *) con->qual, estate); + break; + case CONSTR_FOREIGN: + /* Nothing to do here */ + break; + case CONSTR_NOTNULL: + needscan = true; + break; + default: + elog(ERROR, "unrecognized constraint type: %d", + (int) con->contype); + } + } + + foreach(l, tab->newvals) + { + NewColumnValue *ex = lfirst(l); + + needscan = true; + + ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate); + } + + if (needscan) + { + ExprContext *econtext; + Datum *values; + char *nulls; + TupleTableSlot *oldslot; + TupleTableSlot *newslot; + HeapScanDesc scan; + HeapTuple tuple; + + econtext = GetPerTupleExprContext(estate); + + /* + * Make tuple slots for old and new tuples. Note that even when + * the tuples are the same, the tupDescs might not be (consider + * ADD COLUMN without a default). + */ + oldslot = MakeTupleTableSlot(); + ExecSetSlotDescriptor(oldslot, oldTupDesc, false); + newslot = MakeTupleTableSlot(); + ExecSetSlotDescriptor(newslot, newTupDesc, false); + + /* Preallocate values/nulls arrays (+1 in case natts==0) */ + i = Max(newTupDesc->natts, oldTupDesc->natts); + values = (Datum *) palloc(i * sizeof(Datum) + 1); + nulls = (char *) palloc(i * sizeof(char) + 1); + memset(values, 0, i * sizeof(Datum)); + memset(nulls, 'n', i * sizeof(char)); + + /* + * Scan through the rows, generating a new row if needed and then + * checking all the constraints. + */ + scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + if (newrel) + { + /* + * Extract data from old tuple. We can force to null any + * columns that are deleted according to the new tuple. + */ + int natts = oldTupDesc->natts; + bool isNull; + + for (i = 0; i < natts; i++) + { + if (newTupDesc->attrs[i]->attisdropped) + nulls[i] = 'n'; + else + { + values[i] = heap_getattr(tuple, + i + 1, + oldTupDesc, + &isNull); + if (isNull) + nulls[i] = 'n'; + else + nulls[i] = ' '; + } + } + + /* + * Process supplied expressions to replace selected columns. + * Expression inputs come from the old tuple. + */ + ExecStoreTuple(tuple, oldslot, InvalidBuffer, false); + econtext->ecxt_scantuple = oldslot; + + foreach(l, tab->newvals) + { + NewColumnValue *ex = lfirst(l); + + values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate, + econtext, + &isNull, + NULL); + if (isNull) + nulls[ex->attnum - 1] = 'n'; + else + nulls[ex->attnum - 1] = ' '; + } + + tuple = heap_formtuple(newTupDesc, values, nulls); + } + + /* Now check any constraints on the possibly-changed tuple */ + ExecStoreTuple(tuple, newslot, InvalidBuffer, false); + econtext->ecxt_scantuple = newslot; + + foreach(l, tab->constraints) + { + NewConstraint *con = lfirst(l); + + switch (con->contype) + { + case CONSTR_CHECK: + if (!ExecQual(con->qualstate, econtext, true)) + ereport(ERROR, + (errcode(ERRCODE_CHECK_VIOLATION), + errmsg("check constraint \"%s\" is violated by some row", + con->name))); + break; + case CONSTR_NOTNULL: + { + Datum d; + bool isnull; + + d = heap_getattr(tuple, con->attnum, newTupDesc, + &isnull); + if (isnull) + ereport(ERROR, + (errcode(ERRCODE_NOT_NULL_VIOLATION), + errmsg("column \"%s\" contains null values", + get_attname(tab->relid, + con->attnum)))); + } + break; + case CONSTR_FOREIGN: + /* Nothing to do here */ + break; + default: + elog(ERROR, "unrecognized constraint type: %d", + (int) con->contype); + } + } + + /* Write the tuple out to the new relation */ + if (newrel) + { + simple_heap_insert(newrel, tuple); + + heap_freetuple(tuple); + } + + ResetExprContext(econtext); + + CHECK_FOR_INTERRUPTS(); + } + + heap_endscan(scan); + } + + FreeExecutorState(estate); + + heap_close(oldrel, NoLock); + if (newrel) + heap_close(newrel, NoLock); +} + +/* + * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue + */ +static AlteredTableInfo * +ATGetQueueEntry(List **wqueue, Relation rel) +{ + Oid relid = RelationGetRelid(rel); + AlteredTableInfo *tab; + List *ltab; + + foreach(ltab, *wqueue) + { + tab = (AlteredTableInfo *) lfirst(ltab); + if (tab->relid == relid) + return tab; + } + + /* + * Not there, so add it. Note that we make a copy of the relation's + * existing descriptor before anything interesting can happen to it. + */ + tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo)); + tab->relid = relid; + tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel)); + + *wqueue = lappend(*wqueue, tab); + + return tab; +} + +/* + * ATSimplePermissions + * + * - Ensure that it is a relation (or possibly a view) + * - Ensure this user is the owner + * - Ensure that it is not a system table + */ +static void +ATSimplePermissions(Relation rel, bool allowView) +{ + if (rel->rd_rel->relkind != RELKIND_RELATION) + { + if (allowView) + { + if (rel->rd_rel->relkind != RELKIND_VIEW) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table or view", + RelationGetRelationName(rel)))); + } + else + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table", + RelationGetRelationName(rel)))); + } + + /* Permissions checks */ + if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, RelationGetRelationName(rel)); @@ -1616,87 +2488,111 @@ AlterTableAddColumn(Oid myrelid, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied: \"%s\" is a system catalog", RelationGetRelationName(rel)))); +} +/* + * ATSimpleRecursion + * + * Simple table recursion sufficient for most ALTER TABLE operations. + * All direct and indirect children are processed in an unspecified order. + * Note that if a child inherits from the original table via multiple + * inheritance paths, it will be visited just once. + */ +static void +ATSimpleRecursion(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse) +{ /* - * Recurse to add the column to child classes, if requested. - * - * any permissions or problems with duplicate attributes will cause the - * whole transaction to abort, which is what we want -- all or - * nothing. + * Propagate to children if desired. Non-table relations never have + * children, so no need to search in that case. */ - if (recurse) + if (recurse && rel->rd_rel->relkind == RELKIND_RELATION) { + Oid relid = RelationGetRelid(rel); List *child, *children; - ColumnDef *colDefChild = copyObject(colDef); - /* Child should see column as singly inherited */ - colDefChild->inhcount = 1; - colDefChild->is_local = false; - - /* We only want direct inheritors */ - children = find_inheritance_children(myrelid); + /* this routine is actually in the planner */ + children = find_all_inheritors(relid); + /* + * find_all_inheritors does the recursive search of the + * inheritance hierarchy, so all we have to do is process all of + * the relids in the list that it returns. + */ foreach(child, children) { Oid childrelid = lfirsto(child); - HeapTuple tuple; - Form_pg_attribute childatt; Relation childrel; - if (childrelid == myrelid) + if (childrelid == relid) continue; + childrel = relation_open(childrelid, AccessExclusiveLock); + ATPrepCmd(wqueue, childrel, cmd, false, true); + relation_close(childrel, NoLock); + } + } +} - childrel = heap_open(childrelid, AccessExclusiveLock); +/* + * ATOneLevelRecursion + * + * Here, we visit only direct inheritance children. It is expected that + * the command's prep routine will recurse again to find indirect children. + * When using this technique, a multiply-inheriting child will be visited + * multiple times. + */ +static void +ATOneLevelRecursion(List **wqueue, Relation rel, + AlterTableCmd *cmd) +{ + Oid relid = RelationGetRelid(rel); + List *child, + *children; - /* Does child already have a column by this name? */ - attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock); - tuple = SearchSysCacheCopyAttName(childrelid, colDef->colname); - if (!HeapTupleIsValid(tuple)) - { - /* No, recurse to add it normally */ - heap_close(attrdesc, RowExclusiveLock); - heap_close(childrel, NoLock); - AlterTableAddColumn(childrelid, true, colDefChild); - continue; - } - childatt = (Form_pg_attribute) GETSTRUCT(tuple); + /* this routine is actually in the planner */ + children = find_inheritance_children(relid); - /* Okay if child matches by type */ - if (typenameTypeId(colDef->typename) != childatt->atttypid || - colDef->typename->typmod != childatt->atttypmod) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("child table \"%s\" has different type for column \"%s\"", - get_rel_name(childrelid), colDef->colname))); + foreach(child, children) + { + Oid childrelid = lfirsto(child); + Relation childrel; - /* - * XXX if we supported NOT NULL or defaults, would need to do - * more work here to verify child matches - */ - ereport(NOTICE, - (errmsg("merging definition of column \"%s\" for child \"%s\"", - colDef->colname, get_rel_name(childrelid)))); + childrel = relation_open(childrelid, AccessExclusiveLock); + ATPrepCmd(wqueue, childrel, cmd, true, true); + relation_close(childrel, NoLock); + } +} - /* Bump the existing child att's inhcount */ - childatt->attinhcount++; - simple_heap_update(attrdesc, &tuple->t_self, tuple); - CatalogUpdateIndexes(attrdesc, tuple); +/* + * ALTER TABLE ADD COLUMN + * + * Adds an additional attribute to a relation making the assumption that + * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the + * AT_AddColumn AlterTableCmd by analyze.c and added as independent + * AlterTableCmd's. + */ +static void +ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, + AlterTableCmd *cmd) +{ + /* + * Recurse to add the column to child classes, if requested. + * + * We must recurse one level at a time, so that multiply-inheriting + * children are visited the right number of times and end up with the + * right attinhcount. + */ + if (recurse) + { + AlterTableCmd *childCmd = copyObject(cmd); + ColumnDef *colDefChild = (ColumnDef *) childCmd->def; - /* - * Propagate any new CHECK constraints into the child table - * and its descendants - */ - if (colDef->constraints != NIL) - { - CommandCounterIncrement(); - AlterTableAddConstraint(childrelid, true, colDef->constraints); - } + /* Child should see column as singly inherited */ + colDefChild->inhcount = 1; + colDefChild->is_local = false; - heap_freetuple(tuple); - heap_close(attrdesc, RowExclusiveLock); - heap_close(childrel, NoLock); - } + ATOneLevelRecursion(wqueue, rel, childCmd); } else { @@ -1704,42 +2600,77 @@ AlterTableAddColumn(Oid myrelid, * If we are told not to recurse, there had better not be any * child tables; else the addition would put them out of step. */ - if (find_inheritance_children(myrelid) != NIL) + if (find_inheritance_children(RelationGetRelid(rel)) != NIL) ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("column must be added to child tables too"))); } +} + +static void +ATExecAddColumn(AlteredTableInfo *tab, Relation rel, + ColumnDef *colDef) +{ + Oid myrelid = RelationGetRelid(rel); + Relation pgclass, + attrdesc; + HeapTuple reltup; + HeapTuple attributeTuple; + Form_pg_attribute attribute; + FormData_pg_attribute attributeD; + int i; + int minattnum, + maxatts; + HeapTuple typeTuple; + Form_pg_type tform; + Expr *defval; + + attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock); /* - * OK, get on with it... - * - * Implementation restrictions: because we don't touch the table rows, - * the new column values will initially appear to be NULLs. (This - * happens because the heap tuple access routines always check for - * attnum > # of attributes in tuple, and return NULL if so.) - * Therefore we can't support a DEFAULT value in SQL92-compliant - * fashion, and we also can't allow a NOT NULL constraint. - * - * We do allow CHECK constraints, even though these theoretically could - * fail for NULL rows (eg, CHECK (newcol IS NOT NULL)). + * Are we adding the column to a recursion child? If so, check whether + * to merge with an existing definition for the column. */ - if (colDef->raw_default || colDef->cooked_default) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("adding columns with defaults is not implemented"), - errhint("Add the column, then use ALTER TABLE SET DEFAULT."))); + if (colDef->inhcount > 0) + { + HeapTuple tuple; - if (colDef->is_not_null) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("adding NOT NULL columns is not implemented"), - errhint("Add the column, then use ALTER TABLE SET NOT NULL."))); + /* Does child already have a column by this name? */ + tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname); + if (HeapTupleIsValid(tuple)) + { + Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple); + + /* Okay if child matches by type */ + if (typenameTypeId(colDef->typename) != childatt->atttypid || + colDef->typename->typmod != childatt->atttypmod) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("child table \"%s\" has different type for column \"%s\"", + RelationGetRelationName(rel), colDef->colname))); + + /* Bump the existing child att's inhcount */ + childatt->attinhcount++; + simple_heap_update(attrdesc, &tuple->t_self, tuple); + CatalogUpdateIndexes(attrdesc, tuple); + + heap_freetuple(tuple); + + /* Inform the user about the merge */ + ereport(NOTICE, + (errmsg("merging definition of column \"%s\" for child \"%s\"", + colDef->colname, RelationGetRelationName(rel)))); + + heap_close(attrdesc, RowExclusiveLock); + return; + } + } pgclass = heap_openr(RelationRelationName, RowExclusiveLock); - reltup = SearchSysCache(RELOID, - ObjectIdGetDatum(myrelid), - 0, 0, 0); + reltup = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(myrelid), + 0, 0, 0); if (!HeapTupleIsValid(reltup)) elog(ERROR, "cache lookup failed for relation %u", myrelid); @@ -1766,13 +2697,6 @@ AlterTableAddColumn(Oid myrelid, MaxHeapAttributeNumber))); i = minattnum + 1; - attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock); - - if (colDef->typename->arrayBounds) - attndims = length(colDef->typename->arrayBounds); - else - attndims = 0; - typeTuple = typenameType(colDef->typename); tform = (Form_pg_type) GETSTRUCT(typeTuple); @@ -1795,12 +2719,11 @@ AlterTableAddColumn(Oid myrelid, attribute->atttypmod = colDef->typename->typmod; attribute->attnum = i; attribute->attbyval = tform->typbyval; - attribute->attndims = attndims; + attribute->attndims = length(colDef->typename->arrayBounds); attribute->attstorage = tform->typstorage; attribute->attalign = tform->typalign; attribute->attnotnull = colDef->is_not_null; - attribute->atthasdef = (colDef->raw_default != NULL || - colDef->cooked_default != NULL); + attribute->atthasdef = false; attribute->attisdropped = false; attribute->attislocal = colDef->is_local; attribute->attinhcount = colDef->inhcount; @@ -1817,132 +2740,120 @@ AlterTableAddColumn(Oid myrelid, /* * Update number of attributes in pg_class tuple */ - newreltup = heap_copytuple(reltup); - - ((Form_pg_class) GETSTRUCT(newreltup))->relnatts = maxatts; + ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts; - simple_heap_update(pgclass, &newreltup->t_self, newreltup); + simple_heap_update(pgclass, &reltup->t_self, reltup); /* keep catalog indexes current */ - CatalogUpdateIndexes(pgclass, newreltup); + CatalogUpdateIndexes(pgclass, reltup); - heap_freetuple(newreltup); - ReleaseSysCache(reltup); + heap_freetuple(reltup); heap_close(pgclass, RowExclusiveLock); - heap_close(rel, NoLock); /* close rel but keep lock! */ + /* Make the attribute's catalog entry visible */ + CommandCounterIncrement(); /* - * Add datatype dependency for the new column. + * Store the DEFAULT, if any, in the catalogs */ - myself.classId = RelOid_pg_class; - myself.objectId = myrelid; - myself.objectSubId = i; - referenced.classId = RelOid_pg_type; - referenced.objectId = attribute->atttypid; - referenced.objectSubId = 0; - recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + if (colDef->raw_default) + { + RawColumnDefault *rawEnt; - /* - * Make our catalog updates visible for subsequent steps. - */ - CommandCounterIncrement(); + rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault)); + rawEnt->attnum = attribute->attnum; + rawEnt->raw_default = copyObject(colDef->raw_default); + + /* + * This function is intended for CREATE TABLE, so it processes a + * _list_ of defaults, but we just do one. + */ + AddRelationRawConstraints(rel, makeList1(rawEnt), NIL); + + /* Make the additional catalog changes visible */ + CommandCounterIncrement(); + } /* - * Add any CHECK constraints attached to the new column. + * Tell Phase 3 to fill in the default expression, if there is one. + * + * If there is no default, Phase 3 doesn't have to do anything, + * because that effectively means that the default is NULL. The + * heap tuple access routines always check for attnum > # of attributes + * in tuple, and return NULL if so, so without any modification of + * the tuple data we will get the effect of NULL values in the new + * column. * - * To do this we must re-open the rel so that its new attr list gets - * loaded into the relcache. + * Note: we use build_column_default, and not just the cooked default + * returned by AddRelationRawConstraints, so that the right thing happens + * when a datatype's default applies. */ - if (colDef->constraints != NIL) + defval = (Expr *) build_column_default(rel, attribute->attnum); + if (defval) { - rel = heap_open(myrelid, AccessExclusiveLock); - AddRelationRawConstraints(rel, NIL, colDef->constraints); - heap_close(rel, NoLock); + NewColumnValue *newval; + + newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); + newval->attnum = attribute->attnum; + newval->expr = defval; + + tab->newvals = lappend(tab->newvals, newval); } /* - * Automatically create the secondary relation for TOAST if it - * formerly had no such but now has toastable attributes. + * Add datatype dependency for the new column. */ - AlterTableCreateToastTable(myrelid, true); + add_column_datatype_dependency(myrelid, i, attribute->atttypid); +} + +/* + * Install a column's dependency on its datatype. + */ +static void +add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid) +{ + ObjectAddress myself, + referenced; + + myself.classId = RelOid_pg_class; + myself.objectId = relid; + myself.objectSubId = attnum; + referenced.classId = RelOid_pg_type; + referenced.objectId = typid; + referenced.objectSubId = 0; + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } /* * ALTER TABLE ALTER COLUMN DROP NOT NULL */ -void -AlterTableAlterColumnDropNotNull(Oid myrelid, bool recurse, - const char *colName) +static void +ATExecDropNotNull(Relation rel, const char *colName) { - Relation rel; HeapTuple tuple; AttrNumber attnum; Relation attr_rel; List *indexoidlist; List *indexoidscan; - rel = heap_open(myrelid, AccessExclusiveLock); - - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); - /* - * Propagate to children if desired + * lookup the attribute */ - if (recurse) - { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) - { - Oid childrelid = lfirsto(child); - - if (childrelid == myrelid) - continue; - AlterTableAlterColumnDropNotNull(childrelid, - false, colName); - } - } + attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); - /* now do the thing on this relation */ + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); - /* - * get the number of the attribute - */ - attnum = get_attnum(myrelid, colName); - if (attnum == InvalidAttrNumber) + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - colName, RelationGetRelationName(rel)))); + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + + attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum; /* Prevent them from altering a system attribute */ - if (attnum < 0) + if (attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot alter system column \"%s\"", @@ -1992,221 +2903,92 @@ AlterTableAlterColumnDropNotNull(Oid myrelid, bool recurse, freeList(indexoidlist); /* - * Okay, actually perform the catalog change + * Okay, actually perform the catalog change ... if needed */ - attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); - - tuple = SearchSysCacheCopyAttName(myrelid, colName); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u", - colName, myrelid); - - ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE; + if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) + { + ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE; - simple_heap_update(attr_rel, &tuple->t_self, tuple); + simple_heap_update(attr_rel, &tuple->t_self, tuple); - /* keep the system catalog indexes current */ - CatalogUpdateIndexes(attr_rel, tuple); + /* keep the system catalog indexes current */ + CatalogUpdateIndexes(attr_rel, tuple); + } heap_close(attr_rel, RowExclusiveLock); - - heap_close(rel, NoLock); } /* * ALTER TABLE ALTER COLUMN SET NOT NULL */ -void -AlterTableAlterColumnSetNotNull(Oid myrelid, bool recurse, - const char *colName) +static void +ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName) { - Relation rel; HeapTuple tuple; AttrNumber attnum; Relation attr_rel; - HeapScanDesc scan; - TupleDesc tupdesc; - - rel = heap_open(myrelid, AccessExclusiveLock); - - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); + NewConstraint *newcon; /* - * Propagate to children if desired + * lookup the attribute */ - if (recurse) - { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) - { - Oid childrelid = lfirsto(child); - - if (childrelid == myrelid) - continue; - AlterTableAlterColumnSetNotNull(childrelid, - false, colName); - } - } + attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); - /* now do the thing on this relation */ + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); - /* - * get the number of the attribute - */ - attnum = get_attnum(myrelid, colName); - if (attnum == InvalidAttrNumber) + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), - errmsg("column \"%s\" of relation \"%s\" does not exist", - colName, RelationGetRelationName(rel)))); + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + + attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum; /* Prevent them from altering a system attribute */ - if (attnum < 0) + if (attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot alter system column \"%s\"", colName))); /* - * Perform a scan to ensure that there are no NULL values already in - * the relation + * Okay, actually perform the catalog change ... if needed */ - tupdesc = RelationGetDescr(rel); - - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); - - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + if (! ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) { - Datum d; - bool isnull; - - d = heap_getattr(tuple, attnum, tupdesc, &isnull); - - if (isnull) - ereport(ERROR, - (errcode(ERRCODE_NOT_NULL_VIOLATION), - errmsg("column \"%s\" contains null values", - colName))); - } + ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE; - heap_endscan(scan); + simple_heap_update(attr_rel, &tuple->t_self, tuple); - /* - * Okay, actually perform the catalog change - */ - attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); + /* keep the system catalog indexes current */ + CatalogUpdateIndexes(attr_rel, tuple); - tuple = SearchSysCacheCopyAttName(myrelid, colName); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u", - colName, myrelid); + /* Tell Phase 3 to test the constraint */ + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->contype = CONSTR_NOTNULL; + newcon->attnum = attnum; + newcon->name = "NOT NULL"; - ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE; - - simple_heap_update(attr_rel, &tuple->t_self, tuple); - - /* keep the system catalog indexes current */ - CatalogUpdateIndexes(attr_rel, tuple); + tab->constraints = lappend(tab->constraints, newcon); + } heap_close(attr_rel, RowExclusiveLock); - - heap_close(rel, NoLock); } /* * ALTER TABLE ALTER COLUMN SET/DROP DEFAULT */ -void -AlterTableAlterColumnDefault(Oid myrelid, bool recurse, - const char *colName, - Node *newDefault) +static void +ATExecColumnDefault(Relation rel, const char *colName, + Node *newDefault) { - Relation rel; AttrNumber attnum; - rel = heap_open(myrelid, AccessExclusiveLock); - - /* - * We allow defaults on views so that INSERT into a view can have - * default-ish behavior. This works because the rewriter substitutes - * default values into INSERTs before it expands rules. - */ - if (rel->rd_rel->relkind != RELKIND_RELATION && - rel->rd_rel->relkind != RELKIND_VIEW) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table or view", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); - - /* - * Propagate to children if desired - */ - if (recurse) - { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) - { - Oid childrelid = lfirsto(child); - - if (childrelid == myrelid) - continue; - AlterTableAlterColumnDefault(childrelid, - false, colName, newDefault); - } - } - - /* now do the thing on this relation */ - /* * get the number of the attribute */ - attnum = get_attnum(myrelid, colName); + attnum = get_attnum(RelationGetRelid(rel), colName); if (attnum == InvalidAttrNumber) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), @@ -2214,7 +2996,7 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse, colName, RelationGetRelationName(rel)))); /* Prevent them from altering a system attribute */ - if (attnum < 0) + if (attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot alter system column \"%s\"", @@ -2225,7 +3007,7 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse, * safety, but at present we do not expect anything to depend on the * default. */ - RemoveAttrDefault(myrelid, attnum, DROP_RESTRICT, false); + RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false); if (newDefault) { @@ -2242,141 +3024,67 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse, */ AddRelationRawConstraints(rel, makeList1(rawEnt), NIL); } - - heap_close(rel, NoLock); } /* - * ALTER TABLE ALTER COLUMN SET STATISTICS / STORAGE + * ALTER TABLE ALTER COLUMN SET STATISTICS */ -void -AlterTableAlterColumnFlags(Oid myrelid, bool recurse, - const char *colName, - Node *flagValue, const char *flagType) +static void +ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue) { - Relation rel; - int newtarget = 1; - char newstorage = 'p'; - Relation attrelation; - HeapTuple tuple; - Form_pg_attribute attrtuple; - - rel = relation_open(myrelid, AccessExclusiveLock); - /* - * Allow index for statistics case only + * We do our own permission checking because (a) we want to allow + * SET STATISTICS on indexes (for expressional index columns), and + * (b) we want to allow SET STATISTICS on system catalogs without + * requiring allowSystemTableMods to be turned on. */ - if (rel->rd_rel->relkind != RELKIND_RELATION) - { - if (rel->rd_rel->relkind != RELKIND_INDEX || *flagType != 'S') - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - } + if (rel->rd_rel->relkind != RELKIND_RELATION && + rel->rd_rel->relkind != RELKIND_INDEX) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table or index", + RelationGetRelationName(rel)))); /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) + if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, RelationGetRelationName(rel)); +} - /* - * we allow statistics case for system tables - */ - if (*flagType != 'S' && !allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); +static void +ATExecSetStatistics(Relation rel, const char *colName, Node *newValue) +{ + int newtarget; + Relation attrelation; + HeapTuple tuple; + Form_pg_attribute attrtuple; + + Assert(IsA(newValue, Integer)); + newtarget = intVal(newValue); /* - * Check the supplied parameters before anything else + * Limit target to a sane range */ - if (*flagType == 'S') - { - /* STATISTICS */ - Assert(IsA(flagValue, Integer)); - newtarget = intVal(flagValue); - - /* - * Limit target to a sane range - */ - if (newtarget < -1) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("statistics target %d is too low", - newtarget))); - } - else if (newtarget > 1000) - { - newtarget = 1000; - ereport(WARNING, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("lowering statistics target to %d", - newtarget))); - } - } - else if (*flagType == 'M') - { - /* STORAGE */ - char *storagemode; - - Assert(IsA(flagValue, String)); - storagemode = strVal(flagValue); - - if (strcasecmp(storagemode, "plain") == 0) - newstorage = 'p'; - else if (strcasecmp(storagemode, "external") == 0) - newstorage = 'e'; - else if (strcasecmp(storagemode, "extended") == 0) - newstorage = 'x'; - else if (strcasecmp(storagemode, "main") == 0) - newstorage = 'm'; - else - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid storage type \"%s\"", - storagemode))); - } - else + if (newtarget < -1) { - elog(ERROR, "unrecognized alter-column type flag: %c", - (int) *flagType); + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("statistics target %d is too low", + newtarget))); } - - /* - * Propagate to children if desired - */ - if (recurse && rel->rd_rel->relkind == RELKIND_RELATION) + else if (newtarget > 1000) { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) - { - Oid childrelid = lfirsto(child); - - if (childrelid == myrelid) - continue; - AlterTableAlterColumnFlags(childrelid, - false, colName, flagValue, flagType); - } + newtarget = 1000; + ereport(WARNING, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("lowering statistics target to %d", + newtarget))); } - /* now do the thing on this relation */ - attrelation = heap_openr(AttributeRelationName, RowExclusiveLock); - tuple = SearchSysCacheCopyAttName(myrelid, colName); + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), @@ -2384,31 +3092,13 @@ AlterTableAlterColumnFlags(Oid myrelid, bool recurse, colName, RelationGetRelationName(rel)))); attrtuple = (Form_pg_attribute) GETSTRUCT(tuple); - if (attrtuple->attnum < 0) + if (attrtuple->attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot alter system column \"%s\"", colName))); - /* - * Now change the appropriate field - */ - if (*flagType == 'S') - attrtuple->attstattarget = newtarget; - else if (*flagType == 'M') - { - /* - * safety check: do not allow toasted storage modes unless column - * datatype is TOAST-aware. - */ - if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid)) - attrtuple->attstorage = newstorage; - else - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("column data type %s can only have storage PLAIN", - format_type_be(attrtuple->atttypid)))); - } + attrtuple->attstattarget = newtarget; simple_heap_update(attrelation, &tuple->t_self, tuple); @@ -2418,85 +3108,110 @@ AlterTableAlterColumnFlags(Oid myrelid, bool recurse, heap_freetuple(tuple); heap_close(attrelation, RowExclusiveLock); - - heap_close(rel, NoLock); /* close rel, but keep lock! */ } /* - * ALTER TABLE SET WITH/WITHOUT OIDS + * ALTER TABLE ALTER COLUMN SET STORAGE */ -void -AlterTableAlterOids(Oid myrelid, bool setOid, bool recurse, - DropBehavior behavior) +static void +ATExecSetStorage(Relation rel, const char *colName, Node *newValue) { - Relation rel; - - rel = heap_open(myrelid, AccessExclusiveLock); + char *storagemode; + char newstorage; + Relation attrelation; + HeapTuple tuple; + Form_pg_attribute attrtuple; - /* - * check to see if we actually need to change anything - */ - if (rel->rd_rel->relhasoids == setOid) + Assert(IsA(newValue, String)); + storagemode = strVal(newValue); + + if (strcasecmp(storagemode, "plain") == 0) + newstorage = 'p'; + else if (strcasecmp(storagemode, "external") == 0) + newstorage = 'e'; + else if (strcasecmp(storagemode, "extended") == 0) + newstorage = 'x'; + else if (strcasecmp(storagemode, "main") == 0) + newstorage = 'm'; + else { - heap_close(rel, NoLock); /* close rel, but keep lock! */ - return; + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid storage type \"%s\"", + storagemode))); + newstorage = 0; /* keep compiler quiet */ } - if (setOid) - { - /* - * TODO: Generate the now required OID pg_attribute entry, and - * modify physical rows to have OIDs. - */ + attrelation = heap_openr(AttributeRelationName, RowExclusiveLock); + + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); + + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + attrtuple = (Form_pg_attribute) GETSTRUCT(tuple); + + if (attrtuple->attnum <= 0) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("ALTER TABLE WITH OIDS is not yet implemented"))); - } + errmsg("cannot alter system column \"%s\"", + colName))); + + /* + * safety check: do not allow toasted storage modes unless column + * datatype is TOAST-aware. + */ + if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid)) + attrtuple->attstorage = newstorage; else - { - heap_close(rel, NoLock); /* close rel, but keep lock! */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("column data type %s can only have storage PLAIN", + format_type_be(attrtuple->atttypid)))); - AlterTableDropColumn(myrelid, recurse, false, "oid", behavior); - } + simple_heap_update(attrelation, &tuple->t_self, tuple); + + /* keep system catalog indexes current */ + CatalogUpdateIndexes(attrelation, tuple); + + heap_freetuple(tuple); + + heap_close(attrelation, RowExclusiveLock); } + /* * ALTER TABLE DROP COLUMN + * + * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism, + * because we have to decide at runtime whether to recurse or not depending + * on whether attinhcount goes to zero or not. (We can't check this in a + * static pre-pass because it won't handle multiple inheritance situations + * correctly.) Since DROP COLUMN doesn't need to create any work queue + * entries for Phase 3, it's okay to recurse internally in this routine + * without considering the work queue. */ -void -AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, - const char *colName, - DropBehavior behavior) +static void +ATExecDropColumn(Relation rel, const char *colName, + DropBehavior behavior, + bool recurse, bool recursing) { - Relation rel; - AttrNumber attnum; HeapTuple tuple; Form_pg_attribute targetatt; + AttrNumber attnum; + List *children; ObjectAddress object; - rel = heap_open(myrelid, AccessExclusiveLock); - - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(rel, false); /* * get the number of the attribute */ - tuple = SearchSysCacheAttName(myrelid, colName); + tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), @@ -2523,65 +3238,16 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, ReleaseSysCache(tuple); /* - * If we are asked to drop ONLY in this table (no recursion), we need - * to mark the inheritors' attribute as locally defined rather than - * inherited. - */ - if (!recurse && !recursing) - { - Relation attr_rel; - List *child, - *children; - - /* We only want direct inheritors in this case */ - children = find_inheritance_children(myrelid); - - attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); - foreach(child, children) - { - Oid childrelid = lfirsto(child); - Relation childrel; - Form_pg_attribute childatt; - - childrel = heap_open(childrelid, AccessExclusiveLock); - - tuple = SearchSysCacheCopyAttName(childrelid, colName); - if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ - elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u", - colName, childrelid); - childatt = (Form_pg_attribute) GETSTRUCT(tuple); - - if (childatt->attinhcount <= 0) /* shouldn't happen */ - elog(ERROR, "relation %u has non-inherited attribute \"%s\"", - childrelid, colName); - childatt->attinhcount--; - childatt->attislocal = true; - - simple_heap_update(attr_rel, &tuple->t_self, tuple); - - /* keep the system catalog indexes current */ - CatalogUpdateIndexes(attr_rel, tuple); - - heap_freetuple(tuple); - - heap_close(childrel, NoLock); - } - heap_close(attr_rel, RowExclusiveLock); - } - - /* - * Propagate to children if desired. Unlike most other ALTER + * Propagate to children as appropriate. Unlike most other ALTER * routines, we have to do this one level of recursion at a time; we * can't use find_all_inheritors to do it in one pass. */ - if (recurse) + children = find_inheritance_children(RelationGetRelid(rel)); + + if (children) { Relation attr_rel; - List *child, - *children; - - /* We only want direct inheritors in this case */ - children = find_inheritance_children(myrelid); + List *child; attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock); foreach(child, children) @@ -2590,9 +3256,6 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, Relation childrel; Form_pg_attribute childatt; - if (childrelid == myrelid) - continue; - childrel = heap_open(childrelid, AccessExclusiveLock); tuple = SearchSysCacheCopyAttName(childrelid, colName); @@ -2605,20 +3268,49 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, elog(ERROR, "relation %u has non-inherited attribute \"%s\"", childrelid, colName); - if (childatt->attinhcount == 1 && !childatt->attislocal) + if (recurse) { - /* Time to delete this child column, too */ - AlterTableDropColumn(childrelid, true, true, colName, behavior); + /* + * If the child column has other definition sources, just + * decrement its inheritance count; if not, recurse to delete + * it. + */ + if (childatt->attinhcount == 1 && !childatt->attislocal) + { + /* Time to delete this child column, too */ + ATExecDropColumn(childrel, colName, behavior, true, true); + } + else + { + /* Child column must survive my deletion */ + childatt->attinhcount--; + + simple_heap_update(attr_rel, &tuple->t_self, tuple); + + /* keep the system catalog indexes current */ + CatalogUpdateIndexes(attr_rel, tuple); + + /* Make update visible */ + CommandCounterIncrement(); + } } else { - /* Child column must survive my deletion */ + /* + * If we were told to drop ONLY in this table (no recursion), + * we need to mark the inheritors' attribute as locally + * defined rather than inherited. + */ childatt->attinhcount--; + childatt->attislocal = true; simple_heap_update(attr_rel, &tuple->t_self, tuple); /* keep the system catalog indexes current */ CatalogUpdateIndexes(attr_rel, tuple); + + /* Make update visible */ + CommandCounterIncrement(); } heap_freetuple(tuple); @@ -2632,7 +3324,7 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, * Perform the actual column deletion */ object.classId = RelOid_pg_class; - object.objectId = myrelid; + object.objectId = RelationGetRelid(rel); object.objectSubId = attnum; performDeletion(&object, behavior); @@ -2648,10 +3340,11 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, class_rel = heap_openr(RelationRelationName, RowExclusiveLock); tuple = SearchSysCacheCopy(RELOID, - ObjectIdGetDatum(myrelid), + ObjectIdGetDatum(RelationGetRelid(rel)), 0, 0, 0); if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for relation %u", myrelid); + elog(ERROR, "cache lookup failed for relation %u", + RelationGetRelid(rel)); tuple_class = (Form_pg_class) GETSTRUCT(tuple); tuple_class->relhasoids = false; @@ -2662,298 +3355,149 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing, heap_close(class_rel, RowExclusiveLock); } - - heap_close(rel, NoLock); /* close rel, but keep lock! */ } +/* + * ALTER TABLE ADD INDEX + * + * There is no such command in the grammar, but the parser converts UNIQUE + * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us + * schedule creation of the index at the appropriate time during ALTER. + */ +static void +ATExecAddIndex(AlteredTableInfo *tab, Relation rel, + IndexStmt *stmt, bool is_rebuild) +{ + bool check_rights; + bool skip_build; + bool quiet; + + Assert(IsA(stmt, IndexStmt)); + + /* suppress schema rights check when rebuilding existing index */ + check_rights = !is_rebuild; + /* skip index build if phase 3 will have to rewrite table anyway */ + skip_build = (tab->newvals != NIL); + /* suppress notices when rebuilding existing index */ + quiet = is_rebuild; + + DefineIndex(stmt->relation, /* relation */ + stmt->idxname, /* index name */ + stmt->accessMethod, /* am name */ + stmt->indexParams, /* parameters */ + (Expr *) stmt->whereClause, + stmt->rangetable, + stmt->unique, + stmt->primary, + stmt->isconstraint, + true, /* is_alter_table */ + check_rights, + skip_build, + quiet); +} /* * ALTER TABLE ADD CONSTRAINT */ -void -AlterTableAddConstraint(Oid myrelid, bool recurse, - List *newConstraints) +static void +ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) { - Relation rel; - List *listptr; - int counter = 0; - - /* - * Grab an exclusive lock on the target table, which we will NOT - * release until end of transaction. - */ - rel = heap_open(myrelid, AccessExclusiveLock); - - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - - if (!allowSystemTableMods && IsSystemRelation(rel)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); - - if (recurse) + switch (nodeTag(newConstraint)) { - List *child, - *children; - - /* this routine is actually in the planner */ - children = find_all_inheritors(myrelid); - - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) + case T_Constraint: { - Oid childrelid = lfirsto(child); + Constraint *constr = (Constraint *) newConstraint; - if (childrelid == myrelid) - continue; - AlterTableAddConstraint(childrelid, false, newConstraints); - } - } - - foreach(listptr, newConstraints) - { - /* - * copy is because we may destructively alter the node below by - * inserting a generated name; this name is not necessarily - * correct for children or parents. - */ - Node *newConstraint = copyObject(lfirst(listptr)); - - switch (nodeTag(newConstraint)) - { - case T_Constraint: - { - Constraint *constr = (Constraint *) newConstraint; - - /* - * Assign or validate constraint name - */ - if (constr->name) - { - if (ConstraintNameIsUsed(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - constr->name)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("constraint \"%s\" for relation \"%s\" already exists", - constr->name, - RelationGetRelationName(rel)))); - } - else - constr->name = GenerateConstraintName(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - &counter); - - /* - * Currently, we only expect to see CONSTR_CHECK nodes - * arriving here (see the preprocessing done in - * parser/analyze.c). Use a switch anyway to make it - * easier to add more code later. - */ - switch (constr->contype) - { - case CONSTR_CHECK: - AlterTableAddCheckConstraint(rel, constr); - break; - default: - elog(ERROR, "unrecognized constraint type: %d", - (int) constr->contype); - } - break; - } - case T_FkConstraint: + /* + * Currently, we only expect to see CONSTR_CHECK nodes + * arriving here (see the preprocessing done in + * parser/analyze.c). Use a switch anyway to make it + * easier to add more code later. + */ + switch (constr->contype) + { + case CONSTR_CHECK: { - FkConstraint *fkconstraint = (FkConstraint *) newConstraint; + List *newcons; + List *lcon; /* - * Assign or validate constraint name + * Call AddRelationRawConstraints to do the work. + * It returns a list of cooked constraints. */ - if (fkconstraint->constr_name) - { - if (ConstraintNameIsUsed(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - fkconstraint->constr_name)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("constraint \"%s\" for relation \"%s\" already exists", - fkconstraint->constr_name, - RelationGetRelationName(rel)))); + newcons = AddRelationRawConstraints(rel, NIL, + makeList1(constr)); + /* Add each constraint to Phase 3's queue */ + foreach(lcon, newcons) + { + CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); + NewConstraint *newcon; + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = ccon->name; + newcon->contype = ccon->contype; + newcon->attnum = ccon->attnum; + /* ExecQual wants implicit-AND format */ + newcon->qual = (Node *) + make_ands_implicit((Expr *) ccon->expr); + + tab->constraints = lappend(tab->constraints, + newcon); } - else - fkconstraint->constr_name = GenerateConstraintName(CONSTRAINT_RELATION, - RelationGetRelid(rel), - RelationGetNamespace(rel), - &counter); - - AlterTableAddForeignKeyConstraint(rel, fkconstraint); - break; } - default: - elog(ERROR, "unrecognized node type: %d", - (int) nodeTag(newConstraint)); + default: + elog(ERROR, "unrecognized constraint type: %d", + (int) constr->contype); + } + break; } + case T_FkConstraint: + { + FkConstraint *fkconstraint = (FkConstraint *) newConstraint; - /* If we have multiple constraints to make, bump CC between 'em */ - if (lnext(listptr)) - CommandCounterIncrement(); - } - - /* Close rel, but keep lock till commit */ - heap_close(rel, NoLock); -} - -/* - * Add a check constraint to a single table - * - * Subroutine for AlterTableAddConstraint. Must already hold exclusive - * lock on the rel, and have done appropriate validity/permissions checks - * for it. - */ -static void -AlterTableAddCheckConstraint(Relation rel, Constraint *constr) -{ - ParseState *pstate; - bool successful = true; - HeapScanDesc scan; - EState *estate; - ExprContext *econtext; - TupleTableSlot *slot; - HeapTuple tuple; - RangeTblEntry *rte; - List *qual; - List *qualstate; - Node *expr; - - /* - * We need to make a parse state and range table to allow us to do - * transformExpr() - */ - pstate = make_parsestate(NULL); - rte = addRangeTableEntryForRelation(pstate, - RelationGetRelid(rel), - makeAlias(RelationGetRelationName(rel), NIL), - false, - true); - addRTEtoQuery(pstate, rte, true, true); - - /* - * Convert the A_EXPR in raw_expr into an EXPR - */ - expr = transformExpr(pstate, constr->raw_expr); - - /* - * Make sure it yields a boolean result. - */ - expr = coerce_to_boolean(pstate, expr, "CHECK"); - - /* - * Make sure no outside relations are referred to. - */ - if (length(pstate->p_rtable) != 1) - ereport(ERROR, - (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), - errmsg("check constraint may only reference relation \"%s\"", - RelationGetRelationName(rel)))); - - /* - * No subplans or aggregates, either... - */ - if (pstate->p_hasSubLinks) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot use subquery in check constraint"))); - if (pstate->p_hasAggs) - ereport(ERROR, - (errcode(ERRCODE_GROUPING_ERROR), - errmsg("cannot use aggregate function in check constraint"))); - - /* - * Might as well try to reduce any constant expressions, so as to - * minimize overhead while testing the constraint at each row. - * - * Note that the stored form of the constraint will NOT be const-folded. - */ - expr = eval_const_expressions(expr); - - /* Needs to be in implicit-ANDs form for ExecQual */ - qual = make_ands_implicit((Expr *) expr); - - /* Need an EState to run ExecQual */ - estate = CreateExecutorState(); - econtext = GetPerTupleExprContext(estate); - - /* build execution state for qual */ - qualstate = (List *) ExecPrepareExpr((Expr *) qual, estate); - - /* Make tuple slot to hold tuples */ - slot = MakeTupleTableSlot(); - ExecSetSlotDescriptor(slot, RelationGetDescr(rel), false); - - /* Arrange for econtext's scan tuple to be the tuple under test */ - econtext->ecxt_scantuple = slot; + /* + * Assign or validate constraint name + */ + if (fkconstraint->constr_name) + { + if (ConstraintNameIsUsed(CONSTRAINT_RELATION, + RelationGetRelid(rel), + RelationGetNamespace(rel), + fkconstraint->constr_name)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("constraint \"%s\" for relation \"%s\" already exists", + fkconstraint->constr_name, + RelationGetRelationName(rel)))); + } + else + fkconstraint->constr_name = + GenerateConstraintName(CONSTRAINT_RELATION, + RelationGetRelid(rel), + RelationGetNamespace(rel), + &tab->constr_name_ctr); - /* - * Scan through the rows now, checking the expression at each row. - */ - scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + ATAddForeignKeyConstraint(tab, rel, fkconstraint); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - if (!ExecQual(qualstate, econtext, true)) - { - successful = false; break; } - ResetExprContext(econtext); + default: + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(newConstraint)); } - - heap_endscan(scan); - - pfree(slot); - FreeExecutorState(estate); - - if (!successful) - ereport(ERROR, - (errcode(ERRCODE_CHECK_VIOLATION), - errmsg("check constraint \"%s\" is violated by some row", - constr->name))); - - /* - * Call AddRelationRawConstraints to do the real adding -- It - * duplicates some of the above, but does not check the validity of - * the constraint against tuples already in the table. - */ - AddRelationRawConstraints(rel, NIL, makeList1(constr)); } /* * Add a foreign-key constraint to a single table * - * Subroutine for AlterTableAddConstraint. Must already hold exclusive + * Subroutine for ATExecAddConstraint. Must already hold exclusive * lock on the rel, and have done appropriate validity/permissions checks * for it. */ static void -AlterTableAddForeignKeyConstraint(Relation rel, FkConstraint *fkconstraint) +ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, + FkConstraint *fkconstraint) { Relation pkrel; AclResult aclresult; @@ -3124,11 +3668,21 @@ AlterTableAddForeignKeyConstraint(Relation rel, FkConstraint *fkconstraint) } /* - * Check that the constraint is satisfied by existing rows (we can - * skip this during table creation). + * Tell Phase 3 to check that the constraint is satisfied by existing rows + * (we can skip this during table creation). */ if (!fkconstraint->skip_validation) - validateForeignKeyConstraint(fkconstraint, rel, pkrel); + { + NewConstraint *newcon; + + newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); + newcon->name = fkconstraint->constr_name; + newcon->contype = CONSTR_FOREIGN; + newcon->refrelid = RelationGetRelid(pkrel); + newcon->qual = (Node *) fkconstraint; + + tab->constraints = lappend(tab->constraints, newcon); + } /* * Record the FK constraint in pg_constraint. @@ -3728,94 +4282,626 @@ fkMatchTypeToString(char match_type) /* * ALTER TABLE DROP CONSTRAINT */ -void -AlterTableDropConstraint(Oid myrelid, bool recurse, - const char *constrName, - DropBehavior behavior) +static void +ATPrepDropConstraint(List **wqueue, Relation rel, + bool recurse, AlterTableCmd *cmd) { - Relation rel; - int deleted = 0; - /* - * Acquire an exclusive lock on the target relation for the duration - * of the operation. + * We don't want errors or noise from child tables, so we have to pass + * down a modified command. */ - rel = heap_open(myrelid, AccessExclusiveLock); + if (recurse) + { + AlterTableCmd *childCmd = copyObject(cmd); - /* Disallow DROP CONSTRAINT on views, indexes, sequences, etc */ - if (rel->rd_rel->relkind != RELKIND_RELATION) + childCmd->subtype = AT_DropConstraintQuietly; + ATSimpleRecursion(wqueue, rel, childCmd, recurse); + } +} + +static void +ATExecDropConstraint(Relation rel, const char *constrName, + DropBehavior behavior, bool quiet) +{ + int deleted; + + deleted = RemoveRelConstraints(rel, constrName, behavior); + + if (!quiet) + { + /* If zero constraints deleted, complain */ + if (deleted == 0) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" does not exist", + constrName))); + /* Otherwise if more than one constraint deleted, notify */ + else if (deleted > 1) + ereport(NOTICE, + (errmsg("multiple constraints named \"%s\" were dropped", + constrName))); + } +} + +/* + * ALTER COLUMN TYPE + */ +static void +ATPrepAlterColumnType(List **wqueue, + AlteredTableInfo *tab, Relation rel, + bool recurse, bool recursing, + AlterTableCmd *cmd) +{ + char *colName = cmd->name; + TypeName *typename = (TypeName *) cmd->def; + HeapTuple tuple; + Form_pg_attribute attTup; + AttrNumber attnum; + Oid targettype; + Node *transform; + NewColumnValue *newval; + ParseState *pstate = make_parsestate(NULL); + + /* lookup the attribute so we can check inheritance status */ + tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); + if (!HeapTupleIsValid(tuple)) ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + attTup = (Form_pg_attribute) GETSTRUCT(tuple); + attnum = attTup->attnum; - /* Permissions checks */ - if (!pg_class_ownercheck(myrelid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); + /* Can't alter a system attribute */ + if (attnum <= 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter system column \"%s\"", + colName))); - if (!allowSystemTableMods && IsSystemRelation(rel)) + /* Don't alter inherited columns */ + if (attTup->attinhcount > 0 && !recursing) ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - RelationGetRelationName(rel)))); + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot alter inherited column \"%s\"", + colName))); + + /* Look up the target type */ + targettype = LookupTypeName(typename); + if (!OidIsValid(targettype)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("type \"%s\" does not exist", + TypeNameToString(typename)))); + + /* make sure datatype is legal for a column */ + CheckAttributeType(colName, targettype); /* - * Process child tables if requested. + * Set up an expression to transform the old data value to the new type. + * If a USING option was given, transform and use that expression, else + * just take the old value and try to coerce it. We do this first so + * that type incompatibility can be detected before we waste effort, + * and because we need the expression to be parsed against the original + * table rowtype. + */ + if (cmd->transform) + { + RangeTblEntry *rte; + + /* Expression must be able to access vars of old table */ + rte = addRangeTableEntryForRelation(pstate, + RelationGetRelid(rel), + makeAlias(RelationGetRelationName(rel), NIL), + false, + true); + addRTEtoQuery(pstate, rte, false, true); + + transform = transformExpr(pstate, cmd->transform); + + /* It can't return a set */ + if (expression_returns_set(transform)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("transform expression must not return a set"))); + + /* No subplans or aggregates, either... */ + if (pstate->p_hasSubLinks) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use subquery in transform expression"))); + if (pstate->p_hasAggs) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("cannot use aggregate function in transform expression"))); + } + else + { + transform = (Node *) makeVar(1, attnum, + attTup->atttypid, attTup->atttypmod, + 0); + } + + transform = coerce_to_target_type(pstate, + transform, exprType(transform), + targettype, typename->typmod, + COERCION_ASSIGNMENT, + COERCE_IMPLICIT_CAST); + if (transform == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" cannot be cast to type \"%s\"", + colName, TypeNameToString(typename)))); + + /* + * Add a work queue item to make ATRewriteTable update the column + * contents. + */ + newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); + newval->attnum = attnum; + newval->expr = (Expr *) transform; + + tab->newvals = lappend(tab->newvals, newval); + + ReleaseSysCache(tuple); + + /* + * The recursion case is handled by ATSimpleRecursion. However, + * if we are told not to recurse, there had better not be any + * child tables; else the alter would put them out of step. */ if (recurse) + ATSimpleRecursion(wqueue, rel, cmd, recurse); + else if (!recursing && + find_inheritance_children(RelationGetRelid(rel)) != NIL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("type of inherited column \"%s\" must be changed in child tables too", + colName))); +} + +static void +ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, + const char *colName, TypeName *typename) +{ + HeapTuple heapTup; + Form_pg_attribute attTup; + AttrNumber attnum; + HeapTuple typeTuple; + Form_pg_type tform; + Oid targettype; + Node *defaultexpr; + Relation attrelation; + Relation depRel; + ScanKeyData key[3]; + SysScanDesc scan; + HeapTuple depTup; + + attrelation = heap_openr(AttributeRelationName, RowExclusiveLock); + + /* Look up the target column */ + heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); + if (!HeapTupleIsValid(heapTup)) /* shouldn't happen */ + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colName, RelationGetRelationName(rel)))); + attTup = (Form_pg_attribute) GETSTRUCT(heapTup); + attnum = attTup->attnum; + + /* Check for multiple ALTER TYPE on same column --- can't cope */ + if (attTup->atttypid != tab->oldDesc->attrs[attnum-1]->atttypid || + attTup->atttypmod != tab->oldDesc->attrs[attnum-1]->atttypmod) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter type of column \"%s\" twice", + colName))); + + /* Look up the target type (should not fail, since prep found it) */ + typeTuple = typenameType(typename); + tform = (Form_pg_type) GETSTRUCT(typeTuple); + targettype = HeapTupleGetOid(typeTuple); + + /* + * If there is a default expression for the column, get it and ensure + * we can coerce it to the new datatype. (We must do this before + * changing the column type, because build_column_default itself will + * try to coerce, and will not issue the error message we want if it + * fails.) + */ + if (attTup->atthasdef) { - List *child, - *children; + defaultexpr = build_column_default(rel, attnum); + Assert(defaultexpr); + defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */ + defaultexpr, exprType(defaultexpr), + targettype, typename->typmod, + COERCION_ASSIGNMENT, + COERCE_IMPLICIT_CAST); + if (defaultexpr == NULL) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("default for column \"%s\" cannot be cast to type \"%s\"", + colName, TypeNameToString(typename)))); + } + else + defaultexpr = NULL; - /* This routine is actually in the planner */ - children = find_all_inheritors(myrelid); + /* + * Find everything that depends on the column (constraints, indexes, etc), + * and record enough information to let us recreate the objects. + * + * The actual recreation does not happen here, but only after we have + * performed all the individual ALTER TYPE operations. We have to save + * the info before executing ALTER TYPE, though, else the deparser will + * get confused. + * + * There could be multiple entries for the same object, so we must check + * to ensure we process each one only once. Note: we assume that an index + * that implements a constraint will not show a direct dependency on the + * column. + */ + depRel = heap_openr(DependRelationName, RowExclusiveLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_refclassid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelOid_pg_class)); + ScanKeyInit(&key[1], + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + ScanKeyInit(&key[2], + Anum_pg_depend_refobjsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum((int32) attnum)); + + scan = systable_beginscan(depRel, DependReferenceIndex, true, + SnapshotNow, 3, key); + + while (HeapTupleIsValid(depTup = systable_getnext(scan))) + { + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup); + ObjectAddress foundObject; - /* - * find_all_inheritors does the recursive search of the - * inheritance hierarchy, so all we have to do is process all of - * the relids in the list that it returns. - */ - foreach(child, children) + /* We don't expect any PIN dependencies on columns */ + if (foundDep->deptype == DEPENDENCY_PIN) + elog(ERROR, "cannot alter type of a pinned column"); + + foundObject.classId = foundDep->classid; + foundObject.objectId = foundDep->objid; + foundObject.objectSubId = foundDep->objsubid; + + switch (getObjectClass(&foundObject)) { - Oid childrelid = lfirsto(child); - Relation inhrel; + case OCLASS_CLASS: + { + char relKind = get_rel_relkind(foundObject.objectId); - if (childrelid == myrelid) - continue; - inhrel = heap_open(childrelid, AccessExclusiveLock); - /* do NOT count child constraints in deleted. */ - RemoveRelConstraints(inhrel, constrName, behavior); - heap_close(inhrel, NoLock); + if (relKind == RELKIND_INDEX) + { + Assert(foundObject.objectSubId == 0); + if (!oidMember(foundObject.objectId, tab->changedIndexOids)) + { + tab->changedIndexOids = lappendo(tab->changedIndexOids, + foundObject.objectId); + tab->changedIndexDefs = lappend(tab->changedIndexDefs, + pg_get_indexdef_string(foundObject.objectId)); + } + } + else if (relKind == RELKIND_SEQUENCE) + { + /* + * This must be a SERIAL column's sequence. We need not + * do anything to it. + */ + Assert(foundObject.objectSubId == 0); + } + else + { + /* Not expecting any other direct dependencies... */ + elog(ERROR, "unexpected object depending on column: %s", + getObjectDescription(&foundObject)); + } + break; + } + + case OCLASS_CONSTRAINT: + Assert(foundObject.objectSubId == 0); + if (!oidMember(foundObject.objectId, tab->changedConstraintOids)) + { + tab->changedConstraintOids = lappendo(tab->changedConstraintOids, + foundObject.objectId); + tab->changedConstraintDefs = lappend(tab->changedConstraintDefs, + pg_get_constraintdef_string(foundObject.objectId)); + } + break; + + case OCLASS_REWRITE: + /* XXX someday see if we can cope with revising views */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter type of a column used by a view or rule"), + errdetail("%s depends on column \"%s\"", + getObjectDescription(&foundObject), + colName))); + break; + + case OCLASS_DEFAULT: + /* + * Ignore the column's default expression, since we will fix + * it below. + */ + Assert(defaultexpr); + break; + + case OCLASS_PROC: + case OCLASS_TYPE: + case OCLASS_CAST: + case OCLASS_CONVERSION: + case OCLASS_LANGUAGE: + case OCLASS_OPERATOR: + case OCLASS_OPCLASS: + case OCLASS_TRIGGER: + case OCLASS_SCHEMA: + /* + * We don't expect any of these sorts of objects to depend + * on a column. + */ + elog(ERROR, "unexpected object depending on column: %s", + getObjectDescription(&foundObject)); + break; + + default: + elog(ERROR, "unrecognized object class: %u", + foundObject.classId); } } + systable_endscan(scan); + /* - * Now do the thing on this relation. + * Now scan for dependencies of this column on other things. The only + * thing we should find is the dependency on the column datatype, + * which we want to remove. */ - deleted += RemoveRelConstraints(rel, constrName, behavior); + ScanKeyInit(&key[0], + Anum_pg_depend_classid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelOid_pg_class)); + ScanKeyInit(&key[1], + Anum_pg_depend_objid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + ScanKeyInit(&key[2], + Anum_pg_depend_objsubid, + BTEqualStrategyNumber, F_INT4EQ, + Int32GetDatum((int32) attnum)); + + scan = systable_beginscan(depRel, DependDependerIndex, true, + SnapshotNow, 3, key); + + while (HeapTupleIsValid(depTup = systable_getnext(scan))) + { + Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup); - /* Close the target relation */ - heap_close(rel, NoLock); + if (foundDep->deptype != DEPENDENCY_NORMAL) + elog(ERROR, "found unexpected dependency type '%c'", + foundDep->deptype); + if (foundDep->classid != RelOid_pg_type || + foundDep->objid != attTup->atttypid) + elog(ERROR, "found unexpected dependency for column"); - /* If zero constraints deleted, complain */ - if (deleted == 0) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" does not exist", - constrName))); - /* Otherwise if more than one constraint deleted, notify */ - else if (deleted > 1) - ereport(NOTICE, - (errmsg("multiple constraints named \"%s\" were dropped", - constrName))); + simple_heap_delete(depRel, &depTup->t_self); + } + + systable_endscan(scan); + + heap_close(depRel, RowExclusiveLock); + + /* + * Here we go --- change the recorded column type. (Note heapTup is + * a copy of the syscache entry, so okay to scribble on.) + */ + attTup->atttypid = targettype; + attTup->atttypmod = typename->typmod; + attTup->attndims = length(typename->arrayBounds); + attTup->attlen = tform->typlen; + attTup->attbyval = tform->typbyval; + attTup->attalign = tform->typalign; + attTup->attstorage = tform->typstorage; + + ReleaseSysCache(typeTuple); + + simple_heap_update(attrelation, &heapTup->t_self, heapTup); + + /* keep system catalog indexes current */ + CatalogUpdateIndexes(attrelation, heapTup); + + heap_close(attrelation, RowExclusiveLock); + + /* Install dependency on new datatype */ + add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype); + + /* Drop any pg_statistic entry for the column, since it's now wrong type */ + RemoveStatistics(rel, attnum); + + /* + * Update the default, if present, by brute force --- remove and re-add + * the default. Probably unsafe to take shortcuts, since the new version + * may well have additional dependencies. (It's okay to do this now, + * rather than after other ALTER TYPE commands, since the default won't + * depend on other column types.) + */ + if (defaultexpr) + { + /* Must make new row visible since it will be updated again */ + CommandCounterIncrement(); + + /* + * We use RESTRICT here for safety, but at present we do not expect + * anything to depend on the default. + */ + RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true); + + StoreAttrDefault(rel, attnum, nodeToString(defaultexpr)); + } + + /* Cleanup */ + heap_freetuple(heapTup); } /* + * Cleanup after we've finished all the ALTER TYPE operations for a + * particular relation. We have to drop and recreate all the indexes + * and constraints that depend on the altered columns. + */ +static void +ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) +{ + ObjectAddress obj; + List *l; + + /* + * Re-parse the index and constraint definitions, and attach them to + * the appropriate work queue entries. We do this before dropping + * because in the case of a FOREIGN KEY constraint, we might not yet + * have exclusive lock on the table the constraint is attached to, + * and we need to get that before dropping. It's safe because the + * parser won't actually look at the catalogs to detect the existing + * entry. + */ + foreach(l, tab->changedIndexDefs) + { + ATPostAlterTypeParse((char *) lfirst(l), wqueue); + } + foreach(l, tab->changedConstraintDefs) + { + ATPostAlterTypeParse((char *) lfirst(l), wqueue); + } + + /* + * Now we can drop the existing constraints and indexes --- constraints + * first, since some of them might depend on the indexes. It should be + * okay to use DROP_RESTRICT here, since nothing else should be depending + * on these objects. + */ + if (tab->changedConstraintOids) + obj.classId = get_system_catalog_relid(ConstraintRelationName); + foreach(l, tab->changedConstraintOids) + { + obj.objectId = lfirsto(l); + obj.objectSubId = 0; + performDeletion(&obj, DROP_RESTRICT); + } + + obj.classId = RelOid_pg_class; + foreach(l, tab->changedIndexOids) + { + obj.objectId = lfirsto(l); + obj.objectSubId = 0; + performDeletion(&obj, DROP_RESTRICT); + } + + /* + * The objects will get recreated during subsequent passes over the + * work queue. + */ +} + +static void +ATPostAlterTypeParse(char *cmd, List **wqueue) +{ + List *raw_parsetree_list; + List *querytree_list; + List *list_item; + + /* + * We expect that we only have to do raw parsing and parse analysis, not + * any rule rewriting, since these will all be utility statements. + */ + raw_parsetree_list = raw_parser(cmd); + querytree_list = NIL; + foreach(list_item, raw_parsetree_list) + { + Node *parsetree = (Node *) lfirst(list_item); + + querytree_list = nconc(querytree_list, + parse_analyze(parsetree, NULL, 0)); + } + + /* + * Attach each generated command to the proper place in the work queue. + * Note this could result in creation of entirely new work-queue entries. + */ + foreach(list_item, querytree_list) + { + Query *query = (Query *) lfirst(list_item); + Relation rel; + AlteredTableInfo *tab; + + Assert(IsA(query, Query)); + Assert(query->commandType == CMD_UTILITY); + switch (nodeTag(query->utilityStmt)) + { + case T_IndexStmt: + { + IndexStmt *stmt = (IndexStmt *) query->utilityStmt; + AlterTableCmd *newcmd; + + rel = relation_openrv(stmt->relation, AccessExclusiveLock); + tab = ATGetQueueEntry(wqueue, rel); + newcmd = makeNode(AlterTableCmd); + newcmd->subtype = AT_ReAddIndex; + newcmd->def = (Node *) stmt; + tab->subcmds[AT_PASS_OLD_INDEX] = + lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd); + relation_close(rel, NoLock); + break; + } + case T_AlterTableStmt: + { + AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt; + List *lcmd; + + rel = relation_openrv(stmt->relation, AccessExclusiveLock); + tab = ATGetQueueEntry(wqueue, rel); + foreach(lcmd, stmt->cmds) + { + AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd); + + switch (cmd->subtype) + { + case AT_AddIndex: + cmd->subtype = AT_ReAddIndex; + tab->subcmds[AT_PASS_OLD_INDEX] = + lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd); + break; + case AT_AddConstraint: + tab->subcmds[AT_PASS_OLD_CONSTR] = + lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd); + break; + default: + elog(ERROR, "unexpected statement type: %d", + (int) cmd->subtype); + } + } + relation_close(rel, NoLock); + break; + } + default: + elog(ERROR, "unexpected statement type: %d", + (int) nodeTag(query->utilityStmt)); + } + } +} + + +/* * ALTER TABLE OWNER */ -void -AlterTableOwner(Oid relationOid, int32 newOwnerSysId) +static void +ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId) { Relation target_rel; Relation class_rel; @@ -3879,7 +4965,7 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId) /* For each index, recursively change its ownership */ foreach(i, index_oid_list) - AlterTableOwner(lfirsto(i), newOwnerSysId); + ATExecChangeOwner(lfirsto(i), newOwnerSysId); freeList(index_oid_list); } @@ -3888,7 +4974,7 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId) { /* If it has a toast table, recurse to change its ownership */ if (tuple_class->reltoastrelid != InvalidOid) - AlterTableOwner(tuple_class->reltoastrelid, newOwnerSysId); + ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerSysId); } heap_freetuple(tuple); @@ -3901,25 +4987,22 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId) * * The only thing we have to do is to change the indisclustered bits. */ -void -AlterTableClusterOn(Oid relOid, const char *indexName) +static void +ATExecClusterOn(Relation rel, const char *indexName) { - Relation rel, - pg_index; + Relation pg_index; List *index; Oid indexOid; HeapTuple indexTuple; Form_pg_index indexForm; - rel = heap_open(relOid, AccessExclusiveLock); - indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace); if (!OidIsValid(indexOid)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("index \"%s\" for table \"%s\" does not exist", - indexName, NameStr(rel->rd_rel->relname)))); + indexName, RelationGetRelationName(rel)))); indexTuple = SearchSysCache(INDEXRELID, ObjectIdGetDatum(indexOid), @@ -3935,10 +5018,11 @@ AlterTableClusterOn(Oid relOid, const char *indexName) if (indexForm->indisclustered) { ReleaseSysCache(indexTuple); - heap_close(rel, NoLock); return; } + ReleaseSysCache(indexTuple); + pg_index = heap_openr(IndexRelationName, RowExclusiveLock); /* @@ -3977,14 +5061,15 @@ AlterTableClusterOn(Oid relOid, const char *indexName) } heap_close(pg_index, RowExclusiveLock); - - ReleaseSysCache(indexTuple); - - heap_close(rel, NoLock); /* close rel, but keep lock till commit */ } /* * ALTER TABLE CREATE TOAST TABLE + * + * Note: this is also invoked from outside this module; in such cases we + * expect the caller to have verified that the relation is a table and we + * have all the right permissions. Callers expect this function + * to end with CommandCounterIncrement if it makes any changes. */ void AlterTableCreateToastTable(Oid relOid, bool silent) @@ -4005,21 +5090,11 @@ AlterTableCreateToastTable(Oid relOid, bool silent) /* * Grab an exclusive lock on the target table, which we will NOT - * release until end of transaction. + * release until end of transaction. (This is probably redundant + * in all present uses...) */ rel = heap_open(relOid, AccessExclusiveLock); - if (rel->rd_rel->relkind != RELKIND_RELATION) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("\"%s\" is not a table", - RelationGetRelationName(rel)))); - - /* Permissions checks */ - if (!pg_class_ownercheck(relOid, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - RelationGetRelationName(rel)); - /* * Toast table is shared if and only if its parent is. * @@ -4149,7 +5224,7 @@ AlterTableCreateToastTable(Oid relOid, bool silent) toast_idxid = index_create(toast_relid, toast_idxname, indexInfo, BTREE_AM_OID, classObjectId, - true, false, true); + true, false, true, false); /* * Update toast rel's pg_class entry to show that it has an index. The |