aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c3071
1 files changed, 2073 insertions, 998 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index a6e3a93d349..a9c307b80c3 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.102 2004/04/01 21:28:44 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.103 2004/05/05 04:48:45 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -24,27 +24,33 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
+#include "catalog/pg_depend.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "commands/cluster.h"
+#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
+#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/plancat.h"
#include "optimizer/prep.h"
+#include "parser/analyze.h"
#include "parser/gramparse.h"
+#include "parser/parser.h"
#include "parser/parse_clause.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_oper.h"
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
+#include "rewrite/rewriteHandler.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -76,6 +82,83 @@ typedef struct OnCommitItem
static List *on_commits = NIL;
+/*
+ * State information for ALTER TABLE
+ *
+ * The pending-work queue for an ALTER TABLE is a List of AlteredTableInfo
+ * structs, one for each table modified by the operation (the named table
+ * plus any child tables that are affected). We save lists of subcommands
+ * to apply to this table (possibly modified by parse transformation steps);
+ * these lists will be executed in Phase 2. If a Phase 3 step is needed,
+ * necessary information is stored in the constraints and newvals lists.
+ *
+ * Phase 2 is divided into multiple passes; subcommands are executed in
+ * a pass determined by subcommand type.
+ */
+
+#define AT_PASS_DROP 0 /* DROP (all flavors) */
+#define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */
+#define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */
+#define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */
+#define AT_PASS_COL_ATTRS 4 /* set other column attributes */
+/* We could support a RENAME COLUMN pass here, but not currently used */
+#define AT_PASS_ADD_COL 5 /* ADD COLUMN */
+#define AT_PASS_ADD_INDEX 6 /* ADD indexes */
+#define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */
+#define AT_PASS_MISC 8 /* other stuff */
+#define AT_NUM_PASSES 9
+
+typedef struct AlteredTableInfo
+{
+ /* Information saved before any work commences: */
+ Oid relid; /* Relation to work on */
+ TupleDesc oldDesc; /* Pre-modification tuple descriptor */
+ /* Information saved by Phase 1 for Phase 2: */
+ List *subcmds[AT_NUM_PASSES]; /* Lists of AlterTableCmd */
+ /* Information saved by Phases 1/2 for Phase 3: */
+ List *constraints; /* List of NewConstraint */
+ List *newvals; /* List of NewColumnValue */
+ /* Objects to rebuild after completing ALTER TYPE operations */
+ List *changedConstraintOids; /* OIDs of constraints to rebuild */
+ List *changedConstraintDefs; /* string definitions of same */
+ List *changedIndexOids; /* OIDs of indexes to rebuild */
+ List *changedIndexDefs; /* string definitions of same */
+ /* Workspace for ATExecAddConstraint */
+ int constr_name_ctr;
+} AlteredTableInfo;
+
+/* Struct describing one new constraint to check in Phase 3 scan */
+typedef struct NewConstraint
+{
+ char *name; /* Constraint name, or NULL if none */
+ ConstrType contype; /* CHECK, NOT_NULL, or FOREIGN */
+ AttrNumber attnum; /* only relevant for NOT_NULL */
+ Oid refrelid; /* PK rel, if FOREIGN */
+ Node *qual; /* Check expr or FkConstraint struct */
+ List *qualstate; /* Execution state for CHECK */
+} NewConstraint;
+
+/*
+ * Struct describing one new column value that needs to be computed during
+ * Phase 3 copy (this could be either a new column with a non-null default, or
+ * a column that we're changing the type of). Columns without such an entry
+ * are just copied from the old table during ATRewriteTable. Note that the
+ * expr is an expression over *old* table values.
+ */
+typedef struct NewColumnValue
+{
+ AttrNumber attnum; /* which column */
+ Expr *expr; /* expression to compute */
+ ExprState *exprstate; /* execution state */
+} NewColumnValue;
+
+
+/* Used by attribute and relation renaming routines: */
+#define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */
+#define RI_TRIGGER_FK 2 /* is a trigger on the FK relation */
+#define RI_TRIGGER_NONE 0 /* is not an RI trigger function */
+
+
static List *MergeAttributes(List *schema, List *supers, bool istemp,
List **supOids, List **supconstr, int *supOidCount);
static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
@@ -83,9 +166,6 @@ static void StoreCatalogInheritance(Oid relationId, List *supers);
static int findAttrByName(const char *attributeName, List *schema);
static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
static bool needs_toast_table(Relation rel);
-static void AlterTableAddCheckConstraint(Relation rel, Constraint *constr);
-static void AlterTableAddForeignKeyConstraint(Relation rel,
- FkConstraint *fkconstraint);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
@@ -100,13 +180,58 @@ static void validateForeignKeyConstraint(FkConstraint *fkconstraint,
static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
Oid constrOid);
static char *fkMatchTypeToString(char match_type);
-
-/* Used by attribute and relation renaming routines: */
-
-#define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */
-#define RI_TRIGGER_FK 2 /* is a trigger on the FK relation */
-#define RI_TRIGGER_NONE 0 /* is not an RI trigger function */
-
+static void ATController(Relation rel, List *cmds, bool recurse);
+static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
+ bool recurse, bool recursing);
+static void ATRewriteCatalogs(List **wqueue);
+static void ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd);
+static void ATRewriteTables(List **wqueue);
+static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
+static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
+static void ATSimplePermissions(Relation rel, bool allowView);
+static void ATSimpleRecursion(List **wqueue, Relation rel,
+ AlterTableCmd *cmd, bool recurse);
+static void ATOneLevelRecursion(List **wqueue, Relation rel,
+ AlterTableCmd *cmd);
+static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
+ AlterTableCmd *cmd);
+static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
+ ColumnDef *colDef);
+static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
+static void ATExecDropNotNull(Relation rel, const char *colName);
+static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
+ const char *colName);
+static void ATExecColumnDefault(Relation rel, const char *colName,
+ Node *newDefault);
+static void ATPrepSetStatistics(Relation rel, const char *colName,
+ Node *flagValue);
+static void ATExecSetStatistics(Relation rel, const char *colName,
+ Node *newValue);
+static void ATExecSetStorage(Relation rel, const char *colName,
+ Node *newValue);
+static void ATExecDropColumn(Relation rel, const char *colName,
+ DropBehavior behavior,
+ bool recurse, bool recursing);
+static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
+ IndexStmt *stmt, bool is_rebuild);
+static void ATExecAddConstraint(AlteredTableInfo *tab, Relation rel,
+ Node *newConstraint);
+static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
+ FkConstraint *fkconstraint);
+static void ATPrepDropConstraint(List **wqueue, Relation rel,
+ bool recurse, AlterTableCmd *cmd);
+static void ATExecDropConstraint(Relation rel, const char *constrName,
+ DropBehavior behavior, bool quiet);
+static void ATPrepAlterColumnType(List **wqueue,
+ AlteredTableInfo *tab, Relation rel,
+ bool recurse, bool recursing,
+ AlterTableCmd *cmd);
+static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
+ const char *colName, TypeName *typename);
+static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab);
+static void ATPostAlterTypeParse(char *cmd, List **wqueue);
+static void ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId);
+static void ATExecClusterOn(Relation rel, const char *indexName);
static int ri_trigger_type(Oid tgfoid);
static void update_ri_trigger_args(Oid relid,
const char *oldname,
@@ -1100,7 +1225,7 @@ renameatt(Oid myrelid,
if (childrelid == myrelid)
continue;
- /* note we need not recurse again! */
+ /* note we need not recurse again */
renameatt(childrelid, oldattname, newattname, false, true);
}
}
@@ -1129,7 +1254,7 @@ renameatt(Oid myrelid,
attform = (Form_pg_attribute) GETSTRUCT(atttup);
attnum = attform->attnum;
- if (attnum < 0)
+ if (attnum <= 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot rename system column \"%s\"",
@@ -1240,8 +1365,7 @@ renameatt(Oid myrelid,
true, false);
}
- relation_close(targetrelation, NoLock); /* close rel but keep
- * lock! */
+ relation_close(targetrelation, NoLock); /* close rel but keep lock */
}
/*
@@ -1559,55 +1683,803 @@ update_ri_trigger_args(Oid relid,
CommandCounterIncrement();
}
+/*
+ * AlterTable
+ * Execute ALTER TABLE, which can be a list of subcommands
+ *
+ * ALTER TABLE is performed in three phases:
+ * 1. Examine subcommands and perform pre-transformation checking.
+ * 2. Update system catalogs.
+ * 3. Scan table(s) to check new constraints, and optionally recopy
+ * the data into new table(s).
+ * Phase 3 is not performed unless one or more of the subcommands requires
+ * it. The intention of this design is to allow multiple independent
+ * updates of the table schema to be performed with only one pass over the
+ * data.
+ *
+ * ATPrepCmd performs phase 1. A "work queue" entry is created for
+ * each table to be affected (there may be multiple affected tables if the
+ * commands traverse a table inheritance hierarchy). Also we do preliminary
+ * validation of the subcommands, including parse transformation of those
+ * expressions that need to be evaluated with respect to the old table
+ * schema.
+ *
+ * ATRewriteCatalogs performs phase 2 for each affected table (note that
+ * phases 2 and 3 do no explicit recursion, since phase 1 already did it).
+ * Certain subcommands need to be performed before others to avoid
+ * unnecessary conflicts; for example, DROP COLUMN should come before
+ * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
+ * lists, one for each logical "pass" of phase 2.
+ *
+ * ATRewriteTables performs phase 3 for those tables that need it.
+ *
+ * Thanks to the magic of MVCC, an error anywhere along the way rolls back
+ * the whole operation; we don't have to do anything special to clean up.
+ */
+void
+AlterTable(AlterTableStmt *stmt)
+{
+ ATController(relation_openrv(stmt->relation, AccessExclusiveLock),
+ stmt->cmds,
+ interpretInhOption(stmt->relation->inhOpt));
+}
-/* ----------------
- * AlterTableAddColumn
- * (formerly known as PerformAddAttribute)
+/*
+ * AlterTableInternal
*
- * adds an additional attribute to a relation
- * ----------------
+ * ALTER TABLE with target specified by OID
*/
void
-AlterTableAddColumn(Oid myrelid,
- bool recurse,
- ColumnDef *colDef)
+AlterTableInternal(Oid relid, List *cmds, bool recurse)
{
- Relation rel,
- pgclass,
- attrdesc;
- HeapTuple reltup;
- HeapTuple newreltup;
- HeapTuple attributeTuple;
- Form_pg_attribute attribute;
- FormData_pg_attribute attributeD;
+ ATController(relation_open(relid, AccessExclusiveLock),
+ cmds,
+ recurse);
+}
+
+static void
+ATController(Relation rel, List *cmds, bool recurse)
+{
+ List *wqueue = NIL;
+ List *lcmd;
+
+ /* Phase 1: preliminary examination of commands, create work queue */
+ foreach(lcmd, cmds)
+ {
+ AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
+
+ ATPrepCmd(&wqueue, rel, cmd, recurse, false);
+ }
+
+ /* Close the relation, but keep lock until commit */
+ relation_close(rel, NoLock);
+
+ /* Phase 2: update system catalogs */
+ ATRewriteCatalogs(&wqueue);
+
+ /* Phase 3: scan/rewrite tables as needed */
+ ATRewriteTables(&wqueue);
+}
+
+/*
+ * ATPrepCmd
+ *
+ * Traffic cop for ALTER TABLE Phase 1 operations, including simple
+ * recursion and permission checks.
+ *
+ * Caller must have acquired AccessExclusiveLock on relation already.
+ * This lock should be held until commit.
+ */
+static void
+ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
+ bool recurse, bool recursing)
+{
+ AlteredTableInfo *tab;
+ int pass;
+
+ /* Find or create work queue entry for this table */
+ tab = ATGetQueueEntry(wqueue, rel);
+
+ /*
+ * Copy the original subcommand for each table. This avoids conflicts
+ * when different child tables need to make different parse
+ * transformations (for example, the same column may have different
+ * column numbers in different children).
+ */
+ cmd = copyObject(cmd);
+
+ /*
+ * Do permissions checking, recursion to child tables if needed,
+ * and any additional phase-1 processing needed.
+ */
+ switch (cmd->subtype)
+ {
+ case AT_AddColumn: /* ADD COLUMN */
+ ATSimplePermissions(rel, false);
+ /* Performs own recursion */
+ ATPrepAddColumn(wqueue, rel, recurse, cmd);
+ pass = AT_PASS_ADD_COL;
+ break;
+ case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
+ /*
+ * We allow defaults on views so that INSERT into a view can have
+ * default-ish behavior. This works because the rewriter
+ * substitutes default values into INSERTs before it expands
+ * rules.
+ */
+ ATSimplePermissions(rel, true);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ /* No command-specific prep needed */
+ pass = AT_PASS_ADD_CONSTR;
+ break;
+ case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
+ ATSimplePermissions(rel, false);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ /* No command-specific prep needed */
+ pass = AT_PASS_DROP;
+ break;
+ case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
+ ATSimplePermissions(rel, false);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ /* No command-specific prep needed */
+ pass = AT_PASS_ADD_CONSTR;
+ break;
+ case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
+ ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ /* Performs own permission checks */
+ ATPrepSetStatistics(rel, cmd->name, cmd->def);
+ pass = AT_PASS_COL_ATTRS;
+ break;
+ case AT_SetStorage: /* ALTER COLUMN STORAGE */
+ ATSimplePermissions(rel, false);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ /* No command-specific prep needed */
+ pass = AT_PASS_COL_ATTRS;
+ break;
+ case AT_DropColumn: /* DROP COLUMN */
+ ATSimplePermissions(rel, false);
+ /* Recursion occurs during execution phase */
+ /* No command-specific prep needed except saving recurse flag */
+ if (recurse)
+ cmd->subtype = AT_DropColumnRecurse;
+ pass = AT_PASS_DROP;
+ break;
+ case AT_AddIndex: /* ADD INDEX */
+ ATSimplePermissions(rel, false);
+ /* This command never recurses */
+ /* No command-specific prep needed */
+ pass = AT_PASS_ADD_INDEX;
+ break;
+ case AT_AddConstraint: /* ADD CONSTRAINT */
+ ATSimplePermissions(rel, false);
+ /*
+ * Currently we recurse only for CHECK constraints, never for
+ * foreign-key constraints. UNIQUE/PKEY constraints won't be
+ * seen here.
+ */
+ if (IsA(cmd->def, Constraint))
+ ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ /* No command-specific prep needed */
+ pass = AT_PASS_ADD_CONSTR;
+ break;
+ case AT_DropConstraint: /* DROP CONSTRAINT */
+ ATSimplePermissions(rel, false);
+ /* Performs own recursion */
+ ATPrepDropConstraint(wqueue, rel, recurse, cmd);
+ pass = AT_PASS_DROP;
+ break;
+ case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
+ ATSimplePermissions(rel, false);
+ ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ /* No command-specific prep needed */
+ pass = AT_PASS_DROP;
+ break;
+ case AT_AlterColumnType: /* ALTER COLUMN TYPE */
+ ATSimplePermissions(rel, false);
+ /* Performs own recursion */
+ ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd);
+ pass = AT_PASS_ALTER_TYPE;
+ break;
+ case AT_ToastTable: /* CREATE TOAST TABLE */
+ ATSimplePermissions(rel, false);
+ /* This command never recurses */
+ /* No command-specific prep needed */
+ pass = AT_PASS_MISC;
+ break;
+ case AT_ChangeOwner: /* ALTER OWNER */
+ /* check that we are the superuser */
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("must be superuser to alter owner")));
+ /* This command never recurses */
+ /* No command-specific prep needed */
+ pass = AT_PASS_MISC;
+ break;
+ case AT_ClusterOn: /* CLUSTER ON */
+ ATSimplePermissions(rel, false);
+ /* This command never recurses */
+ /* No command-specific prep needed */
+ pass = AT_PASS_MISC;
+ break;
+ case AT_DropOids: /* SET WITHOUT OIDS */
+ ATSimplePermissions(rel, false);
+ /* Performs own recursion */
+ if (rel->rd_rel->relhasoids)
+ {
+ AlterTableCmd *dropCmd = makeNode(AlterTableCmd);
+
+ dropCmd->subtype = AT_DropColumn;
+ dropCmd->name = pstrdup("oid");
+ dropCmd->behavior = cmd->behavior;
+ ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
+ }
+ pass = AT_PASS_DROP;
+ break;
+ default: /* oops */
+ elog(ERROR, "unrecognized alter table type: %d",
+ (int) cmd->subtype);
+ pass = 0; /* keep compiler quiet */
+ break;
+ }
+
+ /* Add the subcommand to the appropriate list for phase 2 */
+ tab->subcmds[pass] = lappend(tab->subcmds[pass], cmd);
+}
+
+/*
+ * ATRewriteCatalogs
+ *
+ * Traffic cop for ALTER TABLE Phase 2 operations. Subcommands are
+ * dispatched in a "safe" execution order (designed to avoid unnecessary
+ * conflicts).
+ */
+static void
+ATRewriteCatalogs(List **wqueue)
+{
+ int pass;
+ List *ltab;
+
+ /*
+ * We process all the tables "in parallel", one pass at a time. This
+ * is needed because we may have to propagate work from one table
+ * to another (specifically, ALTER TYPE on a foreign key's PK has to
+ * dispatch the re-adding of the foreign key constraint to the other
+ * table). Work can only be propagated into later passes, however.
+ */
+ for (pass = 0; pass < AT_NUM_PASSES; pass++)
+ {
+ /* Go through each table that needs to be processed */
+ foreach(ltab, *wqueue)
+ {
+ AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
+ List *subcmds = tab->subcmds[pass];
+ Relation rel;
+ List *lcmd;
+
+ if (subcmds == NIL)
+ continue;
+
+ /* Exclusive lock was obtained by phase 1, needn't get it again */
+ rel = relation_open(tab->relid, NoLock);
+
+ foreach(lcmd, subcmds)
+ {
+ ATExecCmd(tab, rel, (AlterTableCmd *) lfirst(lcmd));
+ }
+
+ /*
+ * After the ALTER TYPE pass, do cleanup work (this is not done in
+ * ATExecAlterColumnType since it should be done only once if
+ * multiple columns of a table are altered).
+ */
+ if (pass == AT_PASS_ALTER_TYPE)
+ ATPostAlterTypeCleanup(wqueue, tab);
+
+ relation_close(rel, NoLock);
+ }
+ }
+
+ /*
+ * Do an implicit CREATE TOAST TABLE if we executed any subcommands
+ * that might have added a column or changed column storage.
+ */
+ foreach(ltab, *wqueue)
+ {
+ AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
+
+ if (tab->subcmds[AT_PASS_ADD_COL] ||
+ tab->subcmds[AT_PASS_ALTER_TYPE] ||
+ tab->subcmds[AT_PASS_COL_ATTRS])
+ {
+ AlterTableCreateToastTable(tab->relid, true);
+ }
+ }
+}
+
+/*
+ * ATExecCmd: dispatch a subcommand to appropriate execution routine
+ */
+static void
+ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
+{
+ switch (cmd->subtype)
+ {
+ case AT_AddColumn: /* ADD COLUMN */
+ ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def);
+ break;
+ case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
+ ATExecColumnDefault(rel, cmd->name, cmd->def);
+ break;
+ case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
+ ATExecDropNotNull(rel, cmd->name);
+ break;
+ case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
+ ATExecSetNotNull(tab, rel, cmd->name);
+ break;
+ case AT_SetStatistics: /* ALTER COLUMN STATISTICS */
+ ATExecSetStatistics(rel, cmd->name, cmd->def);
+ break;
+ case AT_SetStorage: /* ALTER COLUMN STORAGE */
+ ATExecSetStorage(rel, cmd->name, cmd->def);
+ break;
+ case AT_DropColumn: /* DROP COLUMN */
+ ATExecDropColumn(rel, cmd->name, cmd->behavior, false, false);
+ break;
+ case AT_DropColumnRecurse: /* DROP COLUMN with recursion */
+ ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
+ break;
+ case AT_AddIndex: /* ADD INDEX */
+ ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
+ break;
+ case AT_ReAddIndex: /* ADD INDEX */
+ ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
+ break;
+ case AT_AddConstraint: /* ADD CONSTRAINT */
+ ATExecAddConstraint(tab, rel, cmd->def);
+ break;
+ case AT_DropConstraint: /* DROP CONSTRAINT */
+ ATExecDropConstraint(rel, cmd->name, cmd->behavior, false);
+ break;
+ case AT_DropConstraintQuietly: /* DROP CONSTRAINT for child */
+ ATExecDropConstraint(rel, cmd->name, cmd->behavior, true);
+ break;
+ case AT_AlterColumnType: /* ALTER COLUMN TYPE */
+ ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def);
+ break;
+ case AT_ToastTable: /* CREATE TOAST TABLE */
+ AlterTableCreateToastTable(RelationGetRelid(rel), false);
+ break;
+ case AT_ChangeOwner: /* ALTER OWNER */
+ /* get_usesysid raises an error if no such user */
+ ATExecChangeOwner(RelationGetRelid(rel), get_usesysid(cmd->name));
+ break;
+ case AT_ClusterOn: /* CLUSTER ON */
+ ATExecClusterOn(rel, cmd->name);
+ break;
+ case AT_DropOids: /* SET WITHOUT OIDS */
+ /*
+ * Nothing to do here; we'll have generated a DropColumn subcommand
+ * to do the real work
+ */
+ break;
+ default: /* oops */
+ elog(ERROR, "unrecognized alter table type: %d",
+ (int) cmd->subtype);
+ break;
+ }
+
+ /*
+ * Bump the command counter to ensure the next subcommand in the sequence
+ * can see the changes so far
+ */
+ CommandCounterIncrement();
+}
+
+/*
+ * ATRewriteTables: ALTER TABLE phase 3
+ */
+static void
+ATRewriteTables(List **wqueue)
+{
+ List *ltab;
+
+ /* Go through each table that needs to be checked or rewritten */
+ foreach(ltab, *wqueue)
+ {
+ AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
+
+ /*
+ * We only need to rewrite the table if at least one column needs
+ * to be recomputed.
+ */
+ if (tab->newvals != NIL)
+ {
+ /* Build a temporary relation and copy data */
+ Oid OIDNewHeap;
+ char NewHeapName[NAMEDATALEN];
+ List *indexes;
+ Relation OldHeap;
+ ObjectAddress object;
+
+ /* Save the information about all indexes on the relation. */
+ OldHeap = heap_open(tab->relid, NoLock);
+ indexes = get_indexattr_list(OldHeap, InvalidOid);
+ heap_close(OldHeap, NoLock);
+
+ /*
+ * Create the new heap, using a temporary name in the same
+ * namespace as the existing table. NOTE: there is some risk of
+ * collision with user relnames. Working around this seems more
+ * trouble than it's worth; in particular, we can't create the new
+ * heap in a different namespace from the old, or we will have
+ * problems with the TEMP status of temp tables.
+ */
+ snprintf(NewHeapName, sizeof(NewHeapName),
+ "pg_temp_%u", tab->relid);
+
+ OIDNewHeap = make_new_heap(tab->relid, NewHeapName);
+
+ /*
+ * Copy the heap data into the new table with the desired
+ * modifications, and test the current data within the table
+ * against new constraints generated by ALTER TABLE commands.
+ */
+ ATRewriteTable(tab, OIDNewHeap);
+
+ /* Swap the relfilenodes of the old and new heaps. */
+ swap_relfilenodes(tab->relid, OIDNewHeap);
+
+ CommandCounterIncrement();
+
+ /* Destroy new heap with old filenode */
+ object.classId = RelOid_pg_class;
+ object.objectId = OIDNewHeap;
+ object.objectSubId = 0;
+
+ /*
+ * The new relation is local to our transaction and we know nothing
+ * depends on it, so DROP_RESTRICT should be OK.
+ */
+ performDeletion(&object, DROP_RESTRICT);
+ /* performDeletion does CommandCounterIncrement at end */
+
+ /*
+ * Rebuild each index on the relation. We do not need
+ * CommandCounterIncrement() because rebuild_indexes does it.
+ */
+ rebuild_indexes(tab->relid, indexes);
+ }
+ else
+ {
+ /*
+ * Test the current data within the table against new constraints
+ * generated by ALTER TABLE commands, but don't rebuild data.
+ */
+ ATRewriteTable(tab, InvalidOid);
+ }
+ }
+
+ /*
+ * Foreign key constraints are checked in a final pass, since
+ * (a) it's generally best to examine each one separately, and
+ * (b) it's at least theoretically possible that we have changed
+ * both relations of the foreign key, and we'd better have finished
+ * both rewrites before we try to read the tables.
+ */
+ foreach(ltab, *wqueue)
+ {
+ AlteredTableInfo *tab = (AlteredTableInfo *) lfirst(ltab);
+ Relation rel = NULL;
+ List *lcon;
+
+ foreach(lcon, tab->constraints)
+ {
+ NewConstraint *con = lfirst(lcon);
+
+ if (con->contype == CONSTR_FOREIGN)
+ {
+ FkConstraint *fkconstraint = (FkConstraint *) con->qual;
+ Relation refrel;
+
+ if (rel == NULL)
+ {
+ /* Long since locked, no need for another */
+ rel = heap_open(tab->relid, NoLock);
+ }
+
+ refrel = heap_open(con->refrelid, RowShareLock);
+
+ validateForeignKeyConstraint(fkconstraint, rel, refrel);
+
+ heap_close(refrel, NoLock);
+ }
+ }
+
+ if (rel)
+ heap_close(rel, NoLock);
+ }
+}
+
+/*
+ * ATRewriteTable: scan or rewrite one table
+ *
+ * OIDNewHeap is InvalidOid if we don't need to rewrite
+ */
+static void
+ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
+{
+ Relation oldrel;
+ Relation newrel;
+ TupleDesc oldTupDesc;
+ TupleDesc newTupDesc;
+ bool needscan = false;
int i;
- int minattnum,
- maxatts;
- HeapTuple typeTuple;
- Form_pg_type tform;
- int attndims;
- ObjectAddress myself,
- referenced;
+ List *l;
+ EState *estate;
/*
- * Grab an exclusive lock on the target table, which we will NOT
- * release until end of transaction.
+ * Open the relation(s). We have surely already locked the existing
+ * table.
*/
- rel = heap_open(myrelid, AccessExclusiveLock);
+ oldrel = heap_open(tab->relid, NoLock);
+ oldTupDesc = tab->oldDesc;
+ newTupDesc = RelationGetDescr(oldrel); /* includes all mods */
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
+ if (OidIsValid(OIDNewHeap))
+ newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
+ else
+ newrel = NULL;
/*
- * permissions checking. this would normally be done in utility.c,
- * but this particular routine is recursive.
- *
- * normally, only the owner of a class can change its schema.
+ * Generate the constraint and default execution states
*/
- if (!pg_class_ownercheck(myrelid, GetUserId()))
+
+ estate = CreateExecutorState();
+
+ /* Build the needed expression execution states */
+ foreach(l, tab->constraints)
+ {
+ NewConstraint *con = lfirst(l);
+
+ switch (con->contype)
+ {
+ case CONSTR_CHECK:
+ needscan = true;
+ con->qualstate = (List *)
+ ExecPrepareExpr((Expr *) con->qual, estate);
+ break;
+ case CONSTR_FOREIGN:
+ /* Nothing to do here */
+ break;
+ case CONSTR_NOTNULL:
+ needscan = true;
+ break;
+ default:
+ elog(ERROR, "unrecognized constraint type: %d",
+ (int) con->contype);
+ }
+ }
+
+ foreach(l, tab->newvals)
+ {
+ NewColumnValue *ex = lfirst(l);
+
+ needscan = true;
+
+ ex->exprstate = ExecPrepareExpr((Expr *) ex->expr, estate);
+ }
+
+ if (needscan)
+ {
+ ExprContext *econtext;
+ Datum *values;
+ char *nulls;
+ TupleTableSlot *oldslot;
+ TupleTableSlot *newslot;
+ HeapScanDesc scan;
+ HeapTuple tuple;
+
+ econtext = GetPerTupleExprContext(estate);
+
+ /*
+ * Make tuple slots for old and new tuples. Note that even when
+ * the tuples are the same, the tupDescs might not be (consider
+ * ADD COLUMN without a default).
+ */
+ oldslot = MakeTupleTableSlot();
+ ExecSetSlotDescriptor(oldslot, oldTupDesc, false);
+ newslot = MakeTupleTableSlot();
+ ExecSetSlotDescriptor(newslot, newTupDesc, false);
+
+ /* Preallocate values/nulls arrays (+1 in case natts==0) */
+ i = Max(newTupDesc->natts, oldTupDesc->natts);
+ values = (Datum *) palloc(i * sizeof(Datum) + 1);
+ nulls = (char *) palloc(i * sizeof(char) + 1);
+ memset(values, 0, i * sizeof(Datum));
+ memset(nulls, 'n', i * sizeof(char));
+
+ /*
+ * Scan through the rows, generating a new row if needed and then
+ * checking all the constraints.
+ */
+ scan = heap_beginscan(oldrel, SnapshotNow, 0, NULL);
+
+ while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ if (newrel)
+ {
+ /*
+ * Extract data from old tuple. We can force to null any
+ * columns that are deleted according to the new tuple.
+ */
+ int natts = oldTupDesc->natts;
+ bool isNull;
+
+ for (i = 0; i < natts; i++)
+ {
+ if (newTupDesc->attrs[i]->attisdropped)
+ nulls[i] = 'n';
+ else
+ {
+ values[i] = heap_getattr(tuple,
+ i + 1,
+ oldTupDesc,
+ &isNull);
+ if (isNull)
+ nulls[i] = 'n';
+ else
+ nulls[i] = ' ';
+ }
+ }
+
+ /*
+ * Process supplied expressions to replace selected columns.
+ * Expression inputs come from the old tuple.
+ */
+ ExecStoreTuple(tuple, oldslot, InvalidBuffer, false);
+ econtext->ecxt_scantuple = oldslot;
+
+ foreach(l, tab->newvals)
+ {
+ NewColumnValue *ex = lfirst(l);
+
+ values[ex->attnum - 1] = ExecEvalExpr(ex->exprstate,
+ econtext,
+ &isNull,
+ NULL);
+ if (isNull)
+ nulls[ex->attnum - 1] = 'n';
+ else
+ nulls[ex->attnum - 1] = ' ';
+ }
+
+ tuple = heap_formtuple(newTupDesc, values, nulls);
+ }
+
+ /* Now check any constraints on the possibly-changed tuple */
+ ExecStoreTuple(tuple, newslot, InvalidBuffer, false);
+ econtext->ecxt_scantuple = newslot;
+
+ foreach(l, tab->constraints)
+ {
+ NewConstraint *con = lfirst(l);
+
+ switch (con->contype)
+ {
+ case CONSTR_CHECK:
+ if (!ExecQual(con->qualstate, econtext, true))
+ ereport(ERROR,
+ (errcode(ERRCODE_CHECK_VIOLATION),
+ errmsg("check constraint \"%s\" is violated by some row",
+ con->name)));
+ break;
+ case CONSTR_NOTNULL:
+ {
+ Datum d;
+ bool isnull;
+
+ d = heap_getattr(tuple, con->attnum, newTupDesc,
+ &isnull);
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_NOT_NULL_VIOLATION),
+ errmsg("column \"%s\" contains null values",
+ get_attname(tab->relid,
+ con->attnum))));
+ }
+ break;
+ case CONSTR_FOREIGN:
+ /* Nothing to do here */
+ break;
+ default:
+ elog(ERROR, "unrecognized constraint type: %d",
+ (int) con->contype);
+ }
+ }
+
+ /* Write the tuple out to the new relation */
+ if (newrel)
+ {
+ simple_heap_insert(newrel, tuple);
+
+ heap_freetuple(tuple);
+ }
+
+ ResetExprContext(econtext);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ heap_endscan(scan);
+ }
+
+ FreeExecutorState(estate);
+
+ heap_close(oldrel, NoLock);
+ if (newrel)
+ heap_close(newrel, NoLock);
+}
+
+/*
+ * ATGetQueueEntry: find or create an entry in the ALTER TABLE work queue
+ */
+static AlteredTableInfo *
+ATGetQueueEntry(List **wqueue, Relation rel)
+{
+ Oid relid = RelationGetRelid(rel);
+ AlteredTableInfo *tab;
+ List *ltab;
+
+ foreach(ltab, *wqueue)
+ {
+ tab = (AlteredTableInfo *) lfirst(ltab);
+ if (tab->relid == relid)
+ return tab;
+ }
+
+ /*
+ * Not there, so add it. Note that we make a copy of the relation's
+ * existing descriptor before anything interesting can happen to it.
+ */
+ tab = (AlteredTableInfo *) palloc0(sizeof(AlteredTableInfo));
+ tab->relid = relid;
+ tab->oldDesc = CreateTupleDescCopy(RelationGetDescr(rel));
+
+ *wqueue = lappend(*wqueue, tab);
+
+ return tab;
+}
+
+/*
+ * ATSimplePermissions
+ *
+ * - Ensure that it is a relation (or possibly a view)
+ * - Ensure this user is the owner
+ * - Ensure that it is not a system table
+ */
+static void
+ATSimplePermissions(Relation rel, bool allowView)
+{
+ if (rel->rd_rel->relkind != RELKIND_RELATION)
+ {
+ if (allowView)
+ {
+ if (rel->rd_rel->relkind != RELKIND_VIEW)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table or view",
+ RelationGetRelationName(rel))));
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table",
+ RelationGetRelationName(rel))));
+ }
+
+ /* Permissions checks */
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(rel));
@@ -1616,87 +2488,111 @@ AlterTableAddColumn(Oid myrelid,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
RelationGetRelationName(rel))));
+}
+/*
+ * ATSimpleRecursion
+ *
+ * Simple table recursion sufficient for most ALTER TABLE operations.
+ * All direct and indirect children are processed in an unspecified order.
+ * Note that if a child inherits from the original table via multiple
+ * inheritance paths, it will be visited just once.
+ */
+static void
+ATSimpleRecursion(List **wqueue, Relation rel,
+ AlterTableCmd *cmd, bool recurse)
+{
/*
- * Recurse to add the column to child classes, if requested.
- *
- * any permissions or problems with duplicate attributes will cause the
- * whole transaction to abort, which is what we want -- all or
- * nothing.
+ * Propagate to children if desired. Non-table relations never have
+ * children, so no need to search in that case.
*/
- if (recurse)
+ if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
{
+ Oid relid = RelationGetRelid(rel);
List *child,
*children;
- ColumnDef *colDefChild = copyObject(colDef);
- /* Child should see column as singly inherited */
- colDefChild->inhcount = 1;
- colDefChild->is_local = false;
-
- /* We only want direct inheritors */
- children = find_inheritance_children(myrelid);
+ /* this routine is actually in the planner */
+ children = find_all_inheritors(relid);
+ /*
+ * find_all_inheritors does the recursive search of the
+ * inheritance hierarchy, so all we have to do is process all of
+ * the relids in the list that it returns.
+ */
foreach(child, children)
{
Oid childrelid = lfirsto(child);
- HeapTuple tuple;
- Form_pg_attribute childatt;
Relation childrel;
- if (childrelid == myrelid)
+ if (childrelid == relid)
continue;
+ childrel = relation_open(childrelid, AccessExclusiveLock);
+ ATPrepCmd(wqueue, childrel, cmd, false, true);
+ relation_close(childrel, NoLock);
+ }
+ }
+}
- childrel = heap_open(childrelid, AccessExclusiveLock);
+/*
+ * ATOneLevelRecursion
+ *
+ * Here, we visit only direct inheritance children. It is expected that
+ * the command's prep routine will recurse again to find indirect children.
+ * When using this technique, a multiply-inheriting child will be visited
+ * multiple times.
+ */
+static void
+ATOneLevelRecursion(List **wqueue, Relation rel,
+ AlterTableCmd *cmd)
+{
+ Oid relid = RelationGetRelid(rel);
+ List *child,
+ *children;
- /* Does child already have a column by this name? */
- attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock);
- tuple = SearchSysCacheCopyAttName(childrelid, colDef->colname);
- if (!HeapTupleIsValid(tuple))
- {
- /* No, recurse to add it normally */
- heap_close(attrdesc, RowExclusiveLock);
- heap_close(childrel, NoLock);
- AlterTableAddColumn(childrelid, true, colDefChild);
- continue;
- }
- childatt = (Form_pg_attribute) GETSTRUCT(tuple);
+ /* this routine is actually in the planner */
+ children = find_inheritance_children(relid);
- /* Okay if child matches by type */
- if (typenameTypeId(colDef->typename) != childatt->atttypid ||
- colDef->typename->typmod != childatt->atttypmod)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("child table \"%s\" has different type for column \"%s\"",
- get_rel_name(childrelid), colDef->colname)));
+ foreach(child, children)
+ {
+ Oid childrelid = lfirsto(child);
+ Relation childrel;
- /*
- * XXX if we supported NOT NULL or defaults, would need to do
- * more work here to verify child matches
- */
- ereport(NOTICE,
- (errmsg("merging definition of column \"%s\" for child \"%s\"",
- colDef->colname, get_rel_name(childrelid))));
+ childrel = relation_open(childrelid, AccessExclusiveLock);
+ ATPrepCmd(wqueue, childrel, cmd, true, true);
+ relation_close(childrel, NoLock);
+ }
+}
- /* Bump the existing child att's inhcount */
- childatt->attinhcount++;
- simple_heap_update(attrdesc, &tuple->t_self, tuple);
- CatalogUpdateIndexes(attrdesc, tuple);
+/*
+ * ALTER TABLE ADD COLUMN
+ *
+ * Adds an additional attribute to a relation making the assumption that
+ * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
+ * AT_AddColumn AlterTableCmd by analyze.c and added as independent
+ * AlterTableCmd's.
+ */
+static void
+ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
+ AlterTableCmd *cmd)
+{
+ /*
+ * Recurse to add the column to child classes, if requested.
+ *
+ * We must recurse one level at a time, so that multiply-inheriting
+ * children are visited the right number of times and end up with the
+ * right attinhcount.
+ */
+ if (recurse)
+ {
+ AlterTableCmd *childCmd = copyObject(cmd);
+ ColumnDef *colDefChild = (ColumnDef *) childCmd->def;
- /*
- * Propagate any new CHECK constraints into the child table
- * and its descendants
- */
- if (colDef->constraints != NIL)
- {
- CommandCounterIncrement();
- AlterTableAddConstraint(childrelid, true, colDef->constraints);
- }
+ /* Child should see column as singly inherited */
+ colDefChild->inhcount = 1;
+ colDefChild->is_local = false;
- heap_freetuple(tuple);
- heap_close(attrdesc, RowExclusiveLock);
- heap_close(childrel, NoLock);
- }
+ ATOneLevelRecursion(wqueue, rel, childCmd);
}
else
{
@@ -1704,42 +2600,77 @@ AlterTableAddColumn(Oid myrelid,
* If we are told not to recurse, there had better not be any
* child tables; else the addition would put them out of step.
*/
- if (find_inheritance_children(myrelid) != NIL)
+ if (find_inheritance_children(RelationGetRelid(rel)) != NIL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("column must be added to child tables too")));
}
+}
+
+static void
+ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
+ ColumnDef *colDef)
+{
+ Oid myrelid = RelationGetRelid(rel);
+ Relation pgclass,
+ attrdesc;
+ HeapTuple reltup;
+ HeapTuple attributeTuple;
+ Form_pg_attribute attribute;
+ FormData_pg_attribute attributeD;
+ int i;
+ int minattnum,
+ maxatts;
+ HeapTuple typeTuple;
+ Form_pg_type tform;
+ Expr *defval;
+
+ attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock);
/*
- * OK, get on with it...
- *
- * Implementation restrictions: because we don't touch the table rows,
- * the new column values will initially appear to be NULLs. (This
- * happens because the heap tuple access routines always check for
- * attnum > # of attributes in tuple, and return NULL if so.)
- * Therefore we can't support a DEFAULT value in SQL92-compliant
- * fashion, and we also can't allow a NOT NULL constraint.
- *
- * We do allow CHECK constraints, even though these theoretically could
- * fail for NULL rows (eg, CHECK (newcol IS NOT NULL)).
+ * Are we adding the column to a recursion child? If so, check whether
+ * to merge with an existing definition for the column.
*/
- if (colDef->raw_default || colDef->cooked_default)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("adding columns with defaults is not implemented"),
- errhint("Add the column, then use ALTER TABLE SET DEFAULT.")));
+ if (colDef->inhcount > 0)
+ {
+ HeapTuple tuple;
- if (colDef->is_not_null)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("adding NOT NULL columns is not implemented"),
- errhint("Add the column, then use ALTER TABLE SET NOT NULL.")));
+ /* Does child already have a column by this name? */
+ tuple = SearchSysCacheCopyAttName(myrelid, colDef->colname);
+ if (HeapTupleIsValid(tuple))
+ {
+ Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
+
+ /* Okay if child matches by type */
+ if (typenameTypeId(colDef->typename) != childatt->atttypid ||
+ colDef->typename->typmod != childatt->atttypmod)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("child table \"%s\" has different type for column \"%s\"",
+ RelationGetRelationName(rel), colDef->colname)));
+
+ /* Bump the existing child att's inhcount */
+ childatt->attinhcount++;
+ simple_heap_update(attrdesc, &tuple->t_self, tuple);
+ CatalogUpdateIndexes(attrdesc, tuple);
+
+ heap_freetuple(tuple);
+
+ /* Inform the user about the merge */
+ ereport(NOTICE,
+ (errmsg("merging definition of column \"%s\" for child \"%s\"",
+ colDef->colname, RelationGetRelationName(rel))));
+
+ heap_close(attrdesc, RowExclusiveLock);
+ return;
+ }
+ }
pgclass = heap_openr(RelationRelationName, RowExclusiveLock);
- reltup = SearchSysCache(RELOID,
- ObjectIdGetDatum(myrelid),
- 0, 0, 0);
+ reltup = SearchSysCacheCopy(RELOID,
+ ObjectIdGetDatum(myrelid),
+ 0, 0, 0);
if (!HeapTupleIsValid(reltup))
elog(ERROR, "cache lookup failed for relation %u", myrelid);
@@ -1766,13 +2697,6 @@ AlterTableAddColumn(Oid myrelid,
MaxHeapAttributeNumber)));
i = minattnum + 1;
- attrdesc = heap_openr(AttributeRelationName, RowExclusiveLock);
-
- if (colDef->typename->arrayBounds)
- attndims = length(colDef->typename->arrayBounds);
- else
- attndims = 0;
-
typeTuple = typenameType(colDef->typename);
tform = (Form_pg_type) GETSTRUCT(typeTuple);
@@ -1795,12 +2719,11 @@ AlterTableAddColumn(Oid myrelid,
attribute->atttypmod = colDef->typename->typmod;
attribute->attnum = i;
attribute->attbyval = tform->typbyval;
- attribute->attndims = attndims;
+ attribute->attndims = length(colDef->typename->arrayBounds);
attribute->attstorage = tform->typstorage;
attribute->attalign = tform->typalign;
attribute->attnotnull = colDef->is_not_null;
- attribute->atthasdef = (colDef->raw_default != NULL ||
- colDef->cooked_default != NULL);
+ attribute->atthasdef = false;
attribute->attisdropped = false;
attribute->attislocal = colDef->is_local;
attribute->attinhcount = colDef->inhcount;
@@ -1817,132 +2740,120 @@ AlterTableAddColumn(Oid myrelid,
/*
* Update number of attributes in pg_class tuple
*/
- newreltup = heap_copytuple(reltup);
-
- ((Form_pg_class) GETSTRUCT(newreltup))->relnatts = maxatts;
+ ((Form_pg_class) GETSTRUCT(reltup))->relnatts = maxatts;
- simple_heap_update(pgclass, &newreltup->t_self, newreltup);
+ simple_heap_update(pgclass, &reltup->t_self, reltup);
/* keep catalog indexes current */
- CatalogUpdateIndexes(pgclass, newreltup);
+ CatalogUpdateIndexes(pgclass, reltup);
- heap_freetuple(newreltup);
- ReleaseSysCache(reltup);
+ heap_freetuple(reltup);
heap_close(pgclass, RowExclusiveLock);
- heap_close(rel, NoLock); /* close rel but keep lock! */
+ /* Make the attribute's catalog entry visible */
+ CommandCounterIncrement();
/*
- * Add datatype dependency for the new column.
+ * Store the DEFAULT, if any, in the catalogs
*/
- myself.classId = RelOid_pg_class;
- myself.objectId = myrelid;
- myself.objectSubId = i;
- referenced.classId = RelOid_pg_type;
- referenced.objectId = attribute->atttypid;
- referenced.objectSubId = 0;
- recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+ if (colDef->raw_default)
+ {
+ RawColumnDefault *rawEnt;
- /*
- * Make our catalog updates visible for subsequent steps.
- */
- CommandCounterIncrement();
+ rawEnt = (RawColumnDefault *) palloc(sizeof(RawColumnDefault));
+ rawEnt->attnum = attribute->attnum;
+ rawEnt->raw_default = copyObject(colDef->raw_default);
+
+ /*
+ * This function is intended for CREATE TABLE, so it processes a
+ * _list_ of defaults, but we just do one.
+ */
+ AddRelationRawConstraints(rel, makeList1(rawEnt), NIL);
+
+ /* Make the additional catalog changes visible */
+ CommandCounterIncrement();
+ }
/*
- * Add any CHECK constraints attached to the new column.
+ * Tell Phase 3 to fill in the default expression, if there is one.
+ *
+ * If there is no default, Phase 3 doesn't have to do anything,
+ * because that effectively means that the default is NULL. The
+ * heap tuple access routines always check for attnum > # of attributes
+ * in tuple, and return NULL if so, so without any modification of
+ * the tuple data we will get the effect of NULL values in the new
+ * column.
*
- * To do this we must re-open the rel so that its new attr list gets
- * loaded into the relcache.
+ * Note: we use build_column_default, and not just the cooked default
+ * returned by AddRelationRawConstraints, so that the right thing happens
+ * when a datatype's default applies.
*/
- if (colDef->constraints != NIL)
+ defval = (Expr *) build_column_default(rel, attribute->attnum);
+ if (defval)
{
- rel = heap_open(myrelid, AccessExclusiveLock);
- AddRelationRawConstraints(rel, NIL, colDef->constraints);
- heap_close(rel, NoLock);
+ NewColumnValue *newval;
+
+ newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
+ newval->attnum = attribute->attnum;
+ newval->expr = defval;
+
+ tab->newvals = lappend(tab->newvals, newval);
}
/*
- * Automatically create the secondary relation for TOAST if it
- * formerly had no such but now has toastable attributes.
+ * Add datatype dependency for the new column.
*/
- AlterTableCreateToastTable(myrelid, true);
+ add_column_datatype_dependency(myrelid, i, attribute->atttypid);
+}
+
+/*
+ * Install a column's dependency on its datatype.
+ */
+static void
+add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
+{
+ ObjectAddress myself,
+ referenced;
+
+ myself.classId = RelOid_pg_class;
+ myself.objectId = relid;
+ myself.objectSubId = attnum;
+ referenced.classId = RelOid_pg_type;
+ referenced.objectId = typid;
+ referenced.objectSubId = 0;
+ recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/*
* ALTER TABLE ALTER COLUMN DROP NOT NULL
*/
-void
-AlterTableAlterColumnDropNotNull(Oid myrelid, bool recurse,
- const char *colName)
+static void
+ATExecDropNotNull(Relation rel, const char *colName)
{
- Relation rel;
HeapTuple tuple;
AttrNumber attnum;
Relation attr_rel;
List *indexoidlist;
List *indexoidscan;
- rel = heap_open(myrelid, AccessExclusiveLock);
-
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
-
- /* Permissions checks */
- if (!pg_class_ownercheck(myrelid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
-
- if (!allowSystemTableMods && IsSystemRelation(rel))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
-
/*
- * Propagate to children if desired
+ * lookup the attribute
*/
- if (recurse)
- {
- List *child,
- *children;
-
- /* this routine is actually in the planner */
- children = find_all_inheritors(myrelid);
-
- /*
- * find_all_inheritors does the recursive search of the
- * inheritance hierarchy, so all we have to do is process all of
- * the relids in the list that it returns.
- */
- foreach(child, children)
- {
- Oid childrelid = lfirsto(child);
-
- if (childrelid == myrelid)
- continue;
- AlterTableAlterColumnDropNotNull(childrelid,
- false, colName);
- }
- }
+ attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
- /* now do the thing on this relation */
+ tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
- /*
- * get the number of the attribute
- */
- attnum = get_attnum(myrelid, colName);
- if (attnum == InvalidAttrNumber)
+ if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
- errmsg("column \"%s\" of relation \"%s\" does not exist",
- colName, RelationGetRelationName(rel))));
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colName, RelationGetRelationName(rel))));
+
+ attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
/* Prevent them from altering a system attribute */
- if (attnum < 0)
+ if (attnum <= 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter system column \"%s\"",
@@ -1992,221 +2903,92 @@ AlterTableAlterColumnDropNotNull(Oid myrelid, bool recurse,
freeList(indexoidlist);
/*
- * Okay, actually perform the catalog change
+ * Okay, actually perform the catalog change ... if needed
*/
- attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
-
- tuple = SearchSysCacheCopyAttName(myrelid, colName);
- if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
- elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
- colName, myrelid);
-
- ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
+ if (((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
+ {
+ ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = FALSE;
- simple_heap_update(attr_rel, &tuple->t_self, tuple);
+ simple_heap_update(attr_rel, &tuple->t_self, tuple);
- /* keep the system catalog indexes current */
- CatalogUpdateIndexes(attr_rel, tuple);
+ /* keep the system catalog indexes current */
+ CatalogUpdateIndexes(attr_rel, tuple);
+ }
heap_close(attr_rel, RowExclusiveLock);
-
- heap_close(rel, NoLock);
}
/*
* ALTER TABLE ALTER COLUMN SET NOT NULL
*/
-void
-AlterTableAlterColumnSetNotNull(Oid myrelid, bool recurse,
- const char *colName)
+static void
+ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
+ const char *colName)
{
- Relation rel;
HeapTuple tuple;
AttrNumber attnum;
Relation attr_rel;
- HeapScanDesc scan;
- TupleDesc tupdesc;
-
- rel = heap_open(myrelid, AccessExclusiveLock);
-
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
-
- /* Permissions checks */
- if (!pg_class_ownercheck(myrelid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
-
- if (!allowSystemTableMods && IsSystemRelation(rel))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
+ NewConstraint *newcon;
/*
- * Propagate to children if desired
+ * lookup the attribute
*/
- if (recurse)
- {
- List *child,
- *children;
-
- /* this routine is actually in the planner */
- children = find_all_inheritors(myrelid);
-
- /*
- * find_all_inheritors does the recursive search of the
- * inheritance hierarchy, so all we have to do is process all of
- * the relids in the list that it returns.
- */
- foreach(child, children)
- {
- Oid childrelid = lfirsto(child);
-
- if (childrelid == myrelid)
- continue;
- AlterTableAlterColumnSetNotNull(childrelid,
- false, colName);
- }
- }
+ attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
- /* now do the thing on this relation */
+ tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
- /*
- * get the number of the attribute
- */
- attnum = get_attnum(myrelid, colName);
- if (attnum == InvalidAttrNumber)
+ if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
- errmsg("column \"%s\" of relation \"%s\" does not exist",
- colName, RelationGetRelationName(rel))));
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colName, RelationGetRelationName(rel))));
+
+ attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum;
/* Prevent them from altering a system attribute */
- if (attnum < 0)
+ if (attnum <= 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter system column \"%s\"",
colName)));
/*
- * Perform a scan to ensure that there are no NULL values already in
- * the relation
+ * Okay, actually perform the catalog change ... if needed
*/
- tupdesc = RelationGetDescr(rel);
-
- scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
-
- while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ if (! ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull)
{
- Datum d;
- bool isnull;
-
- d = heap_getattr(tuple, attnum, tupdesc, &isnull);
-
- if (isnull)
- ereport(ERROR,
- (errcode(ERRCODE_NOT_NULL_VIOLATION),
- errmsg("column \"%s\" contains null values",
- colName)));
- }
+ ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
- heap_endscan(scan);
+ simple_heap_update(attr_rel, &tuple->t_self, tuple);
- /*
- * Okay, actually perform the catalog change
- */
- attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
+ /* keep the system catalog indexes current */
+ CatalogUpdateIndexes(attr_rel, tuple);
- tuple = SearchSysCacheCopyAttName(myrelid, colName);
- if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
- elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
- colName, myrelid);
+ /* Tell Phase 3 to test the constraint */
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->contype = CONSTR_NOTNULL;
+ newcon->attnum = attnum;
+ newcon->name = "NOT NULL";
- ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = TRUE;
-
- simple_heap_update(attr_rel, &tuple->t_self, tuple);
-
- /* keep the system catalog indexes current */
- CatalogUpdateIndexes(attr_rel, tuple);
+ tab->constraints = lappend(tab->constraints, newcon);
+ }
heap_close(attr_rel, RowExclusiveLock);
-
- heap_close(rel, NoLock);
}
/*
* ALTER TABLE ALTER COLUMN SET/DROP DEFAULT
*/
-void
-AlterTableAlterColumnDefault(Oid myrelid, bool recurse,
- const char *colName,
- Node *newDefault)
+static void
+ATExecColumnDefault(Relation rel, const char *colName,
+ Node *newDefault)
{
- Relation rel;
AttrNumber attnum;
- rel = heap_open(myrelid, AccessExclusiveLock);
-
- /*
- * We allow defaults on views so that INSERT into a view can have
- * default-ish behavior. This works because the rewriter substitutes
- * default values into INSERTs before it expands rules.
- */
- if (rel->rd_rel->relkind != RELKIND_RELATION &&
- rel->rd_rel->relkind != RELKIND_VIEW)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table or view",
- RelationGetRelationName(rel))));
-
- /* Permissions checks */
- if (!pg_class_ownercheck(myrelid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
-
- if (!allowSystemTableMods && IsSystemRelation(rel))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
-
- /*
- * Propagate to children if desired
- */
- if (recurse)
- {
- List *child,
- *children;
-
- /* this routine is actually in the planner */
- children = find_all_inheritors(myrelid);
-
- /*
- * find_all_inheritors does the recursive search of the
- * inheritance hierarchy, so all we have to do is process all of
- * the relids in the list that it returns.
- */
- foreach(child, children)
- {
- Oid childrelid = lfirsto(child);
-
- if (childrelid == myrelid)
- continue;
- AlterTableAlterColumnDefault(childrelid,
- false, colName, newDefault);
- }
- }
-
- /* now do the thing on this relation */
-
/*
* get the number of the attribute
*/
- attnum = get_attnum(myrelid, colName);
+ attnum = get_attnum(RelationGetRelid(rel), colName);
if (attnum == InvalidAttrNumber)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
@@ -2214,7 +2996,7 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse,
colName, RelationGetRelationName(rel))));
/* Prevent them from altering a system attribute */
- if (attnum < 0)
+ if (attnum <= 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter system column \"%s\"",
@@ -2225,7 +3007,7 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse,
* safety, but at present we do not expect anything to depend on the
* default.
*/
- RemoveAttrDefault(myrelid, attnum, DROP_RESTRICT, false);
+ RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, false);
if (newDefault)
{
@@ -2242,141 +3024,67 @@ AlterTableAlterColumnDefault(Oid myrelid, bool recurse,
*/
AddRelationRawConstraints(rel, makeList1(rawEnt), NIL);
}
-
- heap_close(rel, NoLock);
}
/*
- * ALTER TABLE ALTER COLUMN SET STATISTICS / STORAGE
+ * ALTER TABLE ALTER COLUMN SET STATISTICS
*/
-void
-AlterTableAlterColumnFlags(Oid myrelid, bool recurse,
- const char *colName,
- Node *flagValue, const char *flagType)
+static void
+ATPrepSetStatistics(Relation rel, const char *colName, Node *flagValue)
{
- Relation rel;
- int newtarget = 1;
- char newstorage = 'p';
- Relation attrelation;
- HeapTuple tuple;
- Form_pg_attribute attrtuple;
-
- rel = relation_open(myrelid, AccessExclusiveLock);
-
/*
- * Allow index for statistics case only
+ * We do our own permission checking because (a) we want to allow
+ * SET STATISTICS on indexes (for expressional index columns), and
+ * (b) we want to allow SET STATISTICS on system catalogs without
+ * requiring allowSystemTableMods to be turned on.
*/
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- {
- if (rel->rd_rel->relkind != RELKIND_INDEX || *flagType != 'S')
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
- }
+ if (rel->rd_rel->relkind != RELKIND_RELATION &&
+ rel->rd_rel->relkind != RELKIND_INDEX)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not a table or index",
+ RelationGetRelationName(rel))));
/* Permissions checks */
- if (!pg_class_ownercheck(myrelid, GetUserId()))
+ if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(rel));
+}
- /*
- * we allow statistics case for system tables
- */
- if (*flagType != 'S' && !allowSystemTableMods && IsSystemRelation(rel))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
+static void
+ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
+{
+ int newtarget;
+ Relation attrelation;
+ HeapTuple tuple;
+ Form_pg_attribute attrtuple;
+
+ Assert(IsA(newValue, Integer));
+ newtarget = intVal(newValue);
/*
- * Check the supplied parameters before anything else
+ * Limit target to a sane range
*/
- if (*flagType == 'S')
- {
- /* STATISTICS */
- Assert(IsA(flagValue, Integer));
- newtarget = intVal(flagValue);
-
- /*
- * Limit target to a sane range
- */
- if (newtarget < -1)
- {
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("statistics target %d is too low",
- newtarget)));
- }
- else if (newtarget > 1000)
- {
- newtarget = 1000;
- ereport(WARNING,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("lowering statistics target to %d",
- newtarget)));
- }
- }
- else if (*flagType == 'M')
- {
- /* STORAGE */
- char *storagemode;
-
- Assert(IsA(flagValue, String));
- storagemode = strVal(flagValue);
-
- if (strcasecmp(storagemode, "plain") == 0)
- newstorage = 'p';
- else if (strcasecmp(storagemode, "external") == 0)
- newstorage = 'e';
- else if (strcasecmp(storagemode, "extended") == 0)
- newstorage = 'x';
- else if (strcasecmp(storagemode, "main") == 0)
- newstorage = 'm';
- else
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid storage type \"%s\"",
- storagemode)));
- }
- else
+ if (newtarget < -1)
{
- elog(ERROR, "unrecognized alter-column type flag: %c",
- (int) *flagType);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("statistics target %d is too low",
+ newtarget)));
}
-
- /*
- * Propagate to children if desired
- */
- if (recurse && rel->rd_rel->relkind == RELKIND_RELATION)
+ else if (newtarget > 1000)
{
- List *child,
- *children;
-
- /* this routine is actually in the planner */
- children = find_all_inheritors(myrelid);
-
- /*
- * find_all_inheritors does the recursive search of the
- * inheritance hierarchy, so all we have to do is process all of
- * the relids in the list that it returns.
- */
- foreach(child, children)
- {
- Oid childrelid = lfirsto(child);
-
- if (childrelid == myrelid)
- continue;
- AlterTableAlterColumnFlags(childrelid,
- false, colName, flagValue, flagType);
- }
+ newtarget = 1000;
+ ereport(WARNING,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("lowering statistics target to %d",
+ newtarget)));
}
- /* now do the thing on this relation */
-
attrelation = heap_openr(AttributeRelationName, RowExclusiveLock);
- tuple = SearchSysCacheCopyAttName(myrelid, colName);
+ tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
+
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
@@ -2384,31 +3092,13 @@ AlterTableAlterColumnFlags(Oid myrelid, bool recurse,
colName, RelationGetRelationName(rel))));
attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
- if (attrtuple->attnum < 0)
+ if (attrtuple->attnum <= 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter system column \"%s\"",
colName)));
- /*
- * Now change the appropriate field
- */
- if (*flagType == 'S')
- attrtuple->attstattarget = newtarget;
- else if (*flagType == 'M')
- {
- /*
- * safety check: do not allow toasted storage modes unless column
- * datatype is TOAST-aware.
- */
- if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
- attrtuple->attstorage = newstorage;
- else
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("column data type %s can only have storage PLAIN",
- format_type_be(attrtuple->atttypid))));
- }
+ attrtuple->attstattarget = newtarget;
simple_heap_update(attrelation, &tuple->t_self, tuple);
@@ -2418,85 +3108,110 @@ AlterTableAlterColumnFlags(Oid myrelid, bool recurse,
heap_freetuple(tuple);
heap_close(attrelation, RowExclusiveLock);
-
- heap_close(rel, NoLock); /* close rel, but keep lock! */
}
/*
- * ALTER TABLE SET WITH/WITHOUT OIDS
+ * ALTER TABLE ALTER COLUMN SET STORAGE
*/
-void
-AlterTableAlterOids(Oid myrelid, bool setOid, bool recurse,
- DropBehavior behavior)
+static void
+ATExecSetStorage(Relation rel, const char *colName, Node *newValue)
{
- Relation rel;
-
- rel = heap_open(myrelid, AccessExclusiveLock);
+ char *storagemode;
+ char newstorage;
+ Relation attrelation;
+ HeapTuple tuple;
+ Form_pg_attribute attrtuple;
- /*
- * check to see if we actually need to change anything
- */
- if (rel->rd_rel->relhasoids == setOid)
+ Assert(IsA(newValue, String));
+ storagemode = strVal(newValue);
+
+ if (strcasecmp(storagemode, "plain") == 0)
+ newstorage = 'p';
+ else if (strcasecmp(storagemode, "external") == 0)
+ newstorage = 'e';
+ else if (strcasecmp(storagemode, "extended") == 0)
+ newstorage = 'x';
+ else if (strcasecmp(storagemode, "main") == 0)
+ newstorage = 'm';
+ else
{
- heap_close(rel, NoLock); /* close rel, but keep lock! */
- return;
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid storage type \"%s\"",
+ storagemode)));
+ newstorage = 0; /* keep compiler quiet */
}
- if (setOid)
- {
- /*
- * TODO: Generate the now required OID pg_attribute entry, and
- * modify physical rows to have OIDs.
- */
+ attrelation = heap_openr(AttributeRelationName, RowExclusiveLock);
+
+ tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
+
+ if (!HeapTupleIsValid(tuple))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colName, RelationGetRelationName(rel))));
+ attrtuple = (Form_pg_attribute) GETSTRUCT(tuple);
+
+ if (attrtuple->attnum <= 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("ALTER TABLE WITH OIDS is not yet implemented")));
- }
+ errmsg("cannot alter system column \"%s\"",
+ colName)));
+
+ /*
+ * safety check: do not allow toasted storage modes unless column
+ * datatype is TOAST-aware.
+ */
+ if (newstorage == 'p' || TypeIsToastable(attrtuple->atttypid))
+ attrtuple->attstorage = newstorage;
else
- {
- heap_close(rel, NoLock); /* close rel, but keep lock! */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("column data type %s can only have storage PLAIN",
+ format_type_be(attrtuple->atttypid))));
- AlterTableDropColumn(myrelid, recurse, false, "oid", behavior);
- }
+ simple_heap_update(attrelation, &tuple->t_self, tuple);
+
+ /* keep system catalog indexes current */
+ CatalogUpdateIndexes(attrelation, tuple);
+
+ heap_freetuple(tuple);
+
+ heap_close(attrelation, RowExclusiveLock);
}
+
/*
* ALTER TABLE DROP COLUMN
+ *
+ * DROP COLUMN cannot use the normal ALTER TABLE recursion mechanism,
+ * because we have to decide at runtime whether to recurse or not depending
+ * on whether attinhcount goes to zero or not. (We can't check this in a
+ * static pre-pass because it won't handle multiple inheritance situations
+ * correctly.) Since DROP COLUMN doesn't need to create any work queue
+ * entries for Phase 3, it's okay to recurse internally in this routine
+ * without considering the work queue.
*/
-void
-AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing,
- const char *colName,
- DropBehavior behavior)
+static void
+ATExecDropColumn(Relation rel, const char *colName,
+ DropBehavior behavior,
+ bool recurse, bool recursing)
{
- Relation rel;
- AttrNumber attnum;
HeapTuple tuple;
Form_pg_attribute targetatt;
+ AttrNumber attnum;
+ List *children;
ObjectAddress object;
- rel = heap_open(myrelid, AccessExclusiveLock);
-
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
-
- /* Permissions checks */
- if (!pg_class_ownercheck(myrelid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
-
- if (!allowSystemTableMods && IsSystemRelation(rel))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
+ /* At top level, permission check was done in ATPrepCmd, else do it */
+ if (recursing)
+ ATSimplePermissions(rel, false);
/*
* get the number of the attribute
*/
- tuple = SearchSysCacheAttName(myrelid, colName);
+ tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
@@ -2523,65 +3238,16 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing,
ReleaseSysCache(tuple);
/*
- * If we are asked to drop ONLY in this table (no recursion), we need
- * to mark the inheritors' attribute as locally defined rather than
- * inherited.
- */
- if (!recurse && !recursing)
- {
- Relation attr_rel;
- List *child,
- *children;
-
- /* We only want direct inheritors in this case */
- children = find_inheritance_children(myrelid);
-
- attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
- foreach(child, children)
- {
- Oid childrelid = lfirsto(child);
- Relation childrel;
- Form_pg_attribute childatt;
-
- childrel = heap_open(childrelid, AccessExclusiveLock);
-
- tuple = SearchSysCacheCopyAttName(childrelid, colName);
- if (!HeapTupleIsValid(tuple)) /* shouldn't happen */
- elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u",
- colName, childrelid);
- childatt = (Form_pg_attribute) GETSTRUCT(tuple);
-
- if (childatt->attinhcount <= 0) /* shouldn't happen */
- elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
- childrelid, colName);
- childatt->attinhcount--;
- childatt->attislocal = true;
-
- simple_heap_update(attr_rel, &tuple->t_self, tuple);
-
- /* keep the system catalog indexes current */
- CatalogUpdateIndexes(attr_rel, tuple);
-
- heap_freetuple(tuple);
-
- heap_close(childrel, NoLock);
- }
- heap_close(attr_rel, RowExclusiveLock);
- }
-
- /*
- * Propagate to children if desired. Unlike most other ALTER
+ * Propagate to children as appropriate. Unlike most other ALTER
* routines, we have to do this one level of recursion at a time; we
* can't use find_all_inheritors to do it in one pass.
*/
- if (recurse)
+ children = find_inheritance_children(RelationGetRelid(rel));
+
+ if (children)
{
Relation attr_rel;
- List *child,
- *children;
-
- /* We only want direct inheritors in this case */
- children = find_inheritance_children(myrelid);
+ List *child;
attr_rel = heap_openr(AttributeRelationName, RowExclusiveLock);
foreach(child, children)
@@ -2590,9 +3256,6 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing,
Relation childrel;
Form_pg_attribute childatt;
- if (childrelid == myrelid)
- continue;
-
childrel = heap_open(childrelid, AccessExclusiveLock);
tuple = SearchSysCacheCopyAttName(childrelid, colName);
@@ -2605,20 +3268,49 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing,
elog(ERROR, "relation %u has non-inherited attribute \"%s\"",
childrelid, colName);
- if (childatt->attinhcount == 1 && !childatt->attislocal)
+ if (recurse)
{
- /* Time to delete this child column, too */
- AlterTableDropColumn(childrelid, true, true, colName, behavior);
+ /*
+ * If the child column has other definition sources, just
+ * decrement its inheritance count; if not, recurse to delete
+ * it.
+ */
+ if (childatt->attinhcount == 1 && !childatt->attislocal)
+ {
+ /* Time to delete this child column, too */
+ ATExecDropColumn(childrel, colName, behavior, true, true);
+ }
+ else
+ {
+ /* Child column must survive my deletion */
+ childatt->attinhcount--;
+
+ simple_heap_update(attr_rel, &tuple->t_self, tuple);
+
+ /* keep the system catalog indexes current */
+ CatalogUpdateIndexes(attr_rel, tuple);
+
+ /* Make update visible */
+ CommandCounterIncrement();
+ }
}
else
{
- /* Child column must survive my deletion */
+ /*
+ * If we were told to drop ONLY in this table (no recursion),
+ * we need to mark the inheritors' attribute as locally
+ * defined rather than inherited.
+ */
childatt->attinhcount--;
+ childatt->attislocal = true;
simple_heap_update(attr_rel, &tuple->t_self, tuple);
/* keep the system catalog indexes current */
CatalogUpdateIndexes(attr_rel, tuple);
+
+ /* Make update visible */
+ CommandCounterIncrement();
}
heap_freetuple(tuple);
@@ -2632,7 +3324,7 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing,
* Perform the actual column deletion
*/
object.classId = RelOid_pg_class;
- object.objectId = myrelid;
+ object.objectId = RelationGetRelid(rel);
object.objectSubId = attnum;
performDeletion(&object, behavior);
@@ -2648,10 +3340,11 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing,
class_rel = heap_openr(RelationRelationName, RowExclusiveLock);
tuple = SearchSysCacheCopy(RELOID,
- ObjectIdGetDatum(myrelid),
+ ObjectIdGetDatum(RelationGetRelid(rel)),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for relation %u", myrelid);
+ elog(ERROR, "cache lookup failed for relation %u",
+ RelationGetRelid(rel));
tuple_class = (Form_pg_class) GETSTRUCT(tuple);
tuple_class->relhasoids = false;
@@ -2662,298 +3355,149 @@ AlterTableDropColumn(Oid myrelid, bool recurse, bool recursing,
heap_close(class_rel, RowExclusiveLock);
}
-
- heap_close(rel, NoLock); /* close rel, but keep lock! */
}
+/*
+ * ALTER TABLE ADD INDEX
+ *
+ * There is no such command in the grammar, but the parser converts UNIQUE
+ * and PRIMARY KEY constraints into AT_AddIndex subcommands. This lets us
+ * schedule creation of the index at the appropriate time during ALTER.
+ */
+static void
+ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
+ IndexStmt *stmt, bool is_rebuild)
+{
+ bool check_rights;
+ bool skip_build;
+ bool quiet;
+
+ Assert(IsA(stmt, IndexStmt));
+
+ /* suppress schema rights check when rebuilding existing index */
+ check_rights = !is_rebuild;
+ /* skip index build if phase 3 will have to rewrite table anyway */
+ skip_build = (tab->newvals != NIL);
+ /* suppress notices when rebuilding existing index */
+ quiet = is_rebuild;
+
+ DefineIndex(stmt->relation, /* relation */
+ stmt->idxname, /* index name */
+ stmt->accessMethod, /* am name */
+ stmt->indexParams, /* parameters */
+ (Expr *) stmt->whereClause,
+ stmt->rangetable,
+ stmt->unique,
+ stmt->primary,
+ stmt->isconstraint,
+ true, /* is_alter_table */
+ check_rights,
+ skip_build,
+ quiet);
+}
/*
* ALTER TABLE ADD CONSTRAINT
*/
-void
-AlterTableAddConstraint(Oid myrelid, bool recurse,
- List *newConstraints)
+static void
+ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint)
{
- Relation rel;
- List *listptr;
- int counter = 0;
-
- /*
- * Grab an exclusive lock on the target table, which we will NOT
- * release until end of transaction.
- */
- rel = heap_open(myrelid, AccessExclusiveLock);
-
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
-
- /* Permissions checks */
- if (!pg_class_ownercheck(myrelid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
-
- if (!allowSystemTableMods && IsSystemRelation(rel))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
-
- if (recurse)
+ switch (nodeTag(newConstraint))
{
- List *child,
- *children;
-
- /* this routine is actually in the planner */
- children = find_all_inheritors(myrelid);
-
- /*
- * find_all_inheritors does the recursive search of the
- * inheritance hierarchy, so all we have to do is process all of
- * the relids in the list that it returns.
- */
- foreach(child, children)
+ case T_Constraint:
{
- Oid childrelid = lfirsto(child);
+ Constraint *constr = (Constraint *) newConstraint;
- if (childrelid == myrelid)
- continue;
- AlterTableAddConstraint(childrelid, false, newConstraints);
- }
- }
-
- foreach(listptr, newConstraints)
- {
- /*
- * copy is because we may destructively alter the node below by
- * inserting a generated name; this name is not necessarily
- * correct for children or parents.
- */
- Node *newConstraint = copyObject(lfirst(listptr));
-
- switch (nodeTag(newConstraint))
- {
- case T_Constraint:
- {
- Constraint *constr = (Constraint *) newConstraint;
-
- /*
- * Assign or validate constraint name
- */
- if (constr->name)
- {
- if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
- RelationGetRelid(rel),
- RelationGetNamespace(rel),
- constr->name))
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("constraint \"%s\" for relation \"%s\" already exists",
- constr->name,
- RelationGetRelationName(rel))));
- }
- else
- constr->name = GenerateConstraintName(CONSTRAINT_RELATION,
- RelationGetRelid(rel),
- RelationGetNamespace(rel),
- &counter);
-
- /*
- * Currently, we only expect to see CONSTR_CHECK nodes
- * arriving here (see the preprocessing done in
- * parser/analyze.c). Use a switch anyway to make it
- * easier to add more code later.
- */
- switch (constr->contype)
- {
- case CONSTR_CHECK:
- AlterTableAddCheckConstraint(rel, constr);
- break;
- default:
- elog(ERROR, "unrecognized constraint type: %d",
- (int) constr->contype);
- }
- break;
- }
- case T_FkConstraint:
+ /*
+ * Currently, we only expect to see CONSTR_CHECK nodes
+ * arriving here (see the preprocessing done in
+ * parser/analyze.c). Use a switch anyway to make it
+ * easier to add more code later.
+ */
+ switch (constr->contype)
+ {
+ case CONSTR_CHECK:
{
- FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
+ List *newcons;
+ List *lcon;
/*
- * Assign or validate constraint name
+ * Call AddRelationRawConstraints to do the work.
+ * It returns a list of cooked constraints.
*/
- if (fkconstraint->constr_name)
- {
- if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
- RelationGetRelid(rel),
- RelationGetNamespace(rel),
- fkconstraint->constr_name))
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_OBJECT),
- errmsg("constraint \"%s\" for relation \"%s\" already exists",
- fkconstraint->constr_name,
- RelationGetRelationName(rel))));
+ newcons = AddRelationRawConstraints(rel, NIL,
+ makeList1(constr));
+ /* Add each constraint to Phase 3's queue */
+ foreach(lcon, newcons)
+ {
+ CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
+ NewConstraint *newcon;
+
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = ccon->name;
+ newcon->contype = ccon->contype;
+ newcon->attnum = ccon->attnum;
+ /* ExecQual wants implicit-AND format */
+ newcon->qual = (Node *)
+ make_ands_implicit((Expr *) ccon->expr);
+
+ tab->constraints = lappend(tab->constraints,
+ newcon);
}
- else
- fkconstraint->constr_name = GenerateConstraintName(CONSTRAINT_RELATION,
- RelationGetRelid(rel),
- RelationGetNamespace(rel),
- &counter);
-
- AlterTableAddForeignKeyConstraint(rel, fkconstraint);
-
break;
}
- default:
- elog(ERROR, "unrecognized node type: %d",
- (int) nodeTag(newConstraint));
+ default:
+ elog(ERROR, "unrecognized constraint type: %d",
+ (int) constr->contype);
+ }
+ break;
}
+ case T_FkConstraint:
+ {
+ FkConstraint *fkconstraint = (FkConstraint *) newConstraint;
- /* If we have multiple constraints to make, bump CC between 'em */
- if (lnext(listptr))
- CommandCounterIncrement();
- }
-
- /* Close rel, but keep lock till commit */
- heap_close(rel, NoLock);
-}
-
-/*
- * Add a check constraint to a single table
- *
- * Subroutine for AlterTableAddConstraint. Must already hold exclusive
- * lock on the rel, and have done appropriate validity/permissions checks
- * for it.
- */
-static void
-AlterTableAddCheckConstraint(Relation rel, Constraint *constr)
-{
- ParseState *pstate;
- bool successful = true;
- HeapScanDesc scan;
- EState *estate;
- ExprContext *econtext;
- TupleTableSlot *slot;
- HeapTuple tuple;
- RangeTblEntry *rte;
- List *qual;
- List *qualstate;
- Node *expr;
-
- /*
- * We need to make a parse state and range table to allow us to do
- * transformExpr()
- */
- pstate = make_parsestate(NULL);
- rte = addRangeTableEntryForRelation(pstate,
- RelationGetRelid(rel),
- makeAlias(RelationGetRelationName(rel), NIL),
- false,
- true);
- addRTEtoQuery(pstate, rte, true, true);
-
- /*
- * Convert the A_EXPR in raw_expr into an EXPR
- */
- expr = transformExpr(pstate, constr->raw_expr);
-
- /*
- * Make sure it yields a boolean result.
- */
- expr = coerce_to_boolean(pstate, expr, "CHECK");
-
- /*
- * Make sure no outside relations are referred to.
- */
- if (length(pstate->p_rtable) != 1)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
- errmsg("check constraint may only reference relation \"%s\"",
- RelationGetRelationName(rel))));
-
- /*
- * No subplans or aggregates, either...
- */
- if (pstate->p_hasSubLinks)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot use subquery in check constraint")));
- if (pstate->p_hasAggs)
- ereport(ERROR,
- (errcode(ERRCODE_GROUPING_ERROR),
- errmsg("cannot use aggregate function in check constraint")));
-
- /*
- * Might as well try to reduce any constant expressions, so as to
- * minimize overhead while testing the constraint at each row.
- *
- * Note that the stored form of the constraint will NOT be const-folded.
- */
- expr = eval_const_expressions(expr);
-
- /* Needs to be in implicit-ANDs form for ExecQual */
- qual = make_ands_implicit((Expr *) expr);
-
- /* Need an EState to run ExecQual */
- estate = CreateExecutorState();
- econtext = GetPerTupleExprContext(estate);
-
- /* build execution state for qual */
- qualstate = (List *) ExecPrepareExpr((Expr *) qual, estate);
-
- /* Make tuple slot to hold tuples */
- slot = MakeTupleTableSlot();
- ExecSetSlotDescriptor(slot, RelationGetDescr(rel), false);
-
- /* Arrange for econtext's scan tuple to be the tuple under test */
- econtext->ecxt_scantuple = slot;
+ /*
+ * Assign or validate constraint name
+ */
+ if (fkconstraint->constr_name)
+ {
+ if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
+ RelationGetRelid(rel),
+ RelationGetNamespace(rel),
+ fkconstraint->constr_name))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
+ errmsg("constraint \"%s\" for relation \"%s\" already exists",
+ fkconstraint->constr_name,
+ RelationGetRelationName(rel))));
+ }
+ else
+ fkconstraint->constr_name =
+ GenerateConstraintName(CONSTRAINT_RELATION,
+ RelationGetRelid(rel),
+ RelationGetNamespace(rel),
+ &tab->constr_name_ctr);
- /*
- * Scan through the rows now, checking the expression at each row.
- */
- scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
+ ATAddForeignKeyConstraint(tab, rel, fkconstraint);
- while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
- {
- ExecStoreTuple(tuple, slot, InvalidBuffer, false);
- if (!ExecQual(qualstate, econtext, true))
- {
- successful = false;
break;
}
- ResetExprContext(econtext);
+ default:
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(newConstraint));
}
-
- heap_endscan(scan);
-
- pfree(slot);
- FreeExecutorState(estate);
-
- if (!successful)
- ereport(ERROR,
- (errcode(ERRCODE_CHECK_VIOLATION),
- errmsg("check constraint \"%s\" is violated by some row",
- constr->name)));
-
- /*
- * Call AddRelationRawConstraints to do the real adding -- It
- * duplicates some of the above, but does not check the validity of
- * the constraint against tuples already in the table.
- */
- AddRelationRawConstraints(rel, NIL, makeList1(constr));
}
/*
* Add a foreign-key constraint to a single table
*
- * Subroutine for AlterTableAddConstraint. Must already hold exclusive
+ * Subroutine for ATExecAddConstraint. Must already hold exclusive
* lock on the rel, and have done appropriate validity/permissions checks
* for it.
*/
static void
-AlterTableAddForeignKeyConstraint(Relation rel, FkConstraint *fkconstraint)
+ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
+ FkConstraint *fkconstraint)
{
Relation pkrel;
AclResult aclresult;
@@ -3124,11 +3668,21 @@ AlterTableAddForeignKeyConstraint(Relation rel, FkConstraint *fkconstraint)
}
/*
- * Check that the constraint is satisfied by existing rows (we can
- * skip this during table creation).
+ * Tell Phase 3 to check that the constraint is satisfied by existing rows
+ * (we can skip this during table creation).
*/
if (!fkconstraint->skip_validation)
- validateForeignKeyConstraint(fkconstraint, rel, pkrel);
+ {
+ NewConstraint *newcon;
+
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = fkconstraint->constr_name;
+ newcon->contype = CONSTR_FOREIGN;
+ newcon->refrelid = RelationGetRelid(pkrel);
+ newcon->qual = (Node *) fkconstraint;
+
+ tab->constraints = lappend(tab->constraints, newcon);
+ }
/*
* Record the FK constraint in pg_constraint.
@@ -3728,94 +4282,626 @@ fkMatchTypeToString(char match_type)
/*
* ALTER TABLE DROP CONSTRAINT
*/
-void
-AlterTableDropConstraint(Oid myrelid, bool recurse,
- const char *constrName,
- DropBehavior behavior)
+static void
+ATPrepDropConstraint(List **wqueue, Relation rel,
+ bool recurse, AlterTableCmd *cmd)
{
- Relation rel;
- int deleted = 0;
-
/*
- * Acquire an exclusive lock on the target relation for the duration
- * of the operation.
+ * We don't want errors or noise from child tables, so we have to pass
+ * down a modified command.
*/
- rel = heap_open(myrelid, AccessExclusiveLock);
+ if (recurse)
+ {
+ AlterTableCmd *childCmd = copyObject(cmd);
- /* Disallow DROP CONSTRAINT on views, indexes, sequences, etc */
- if (rel->rd_rel->relkind != RELKIND_RELATION)
+ childCmd->subtype = AT_DropConstraintQuietly;
+ ATSimpleRecursion(wqueue, rel, childCmd, recurse);
+ }
+}
+
+static void
+ATExecDropConstraint(Relation rel, const char *constrName,
+ DropBehavior behavior, bool quiet)
+{
+ int deleted;
+
+ deleted = RemoveRelConstraints(rel, constrName, behavior);
+
+ if (!quiet)
+ {
+ /* If zero constraints deleted, complain */
+ if (deleted == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("constraint \"%s\" does not exist",
+ constrName)));
+ /* Otherwise if more than one constraint deleted, notify */
+ else if (deleted > 1)
+ ereport(NOTICE,
+ (errmsg("multiple constraints named \"%s\" were dropped",
+ constrName)));
+ }
+}
+
+/*
+ * ALTER COLUMN TYPE
+ */
+static void
+ATPrepAlterColumnType(List **wqueue,
+ AlteredTableInfo *tab, Relation rel,
+ bool recurse, bool recursing,
+ AlterTableCmd *cmd)
+{
+ char *colName = cmd->name;
+ TypeName *typename = (TypeName *) cmd->def;
+ HeapTuple tuple;
+ Form_pg_attribute attTup;
+ AttrNumber attnum;
+ Oid targettype;
+ Node *transform;
+ NewColumnValue *newval;
+ ParseState *pstate = make_parsestate(NULL);
+
+ /* lookup the attribute so we can check inheritance status */
+ tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
+ if (!HeapTupleIsValid(tuple))
ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colName, RelationGetRelationName(rel))));
+ attTup = (Form_pg_attribute) GETSTRUCT(tuple);
+ attnum = attTup->attnum;
- /* Permissions checks */
- if (!pg_class_ownercheck(myrelid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
+ /* Can't alter a system attribute */
+ if (attnum <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot alter system column \"%s\"",
+ colName)));
- if (!allowSystemTableMods && IsSystemRelation(rel))
+ /* Don't alter inherited columns */
+ if (attTup->attinhcount > 0 && !recursing)
ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- RelationGetRelationName(rel))));
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("cannot alter inherited column \"%s\"",
+ colName)));
+
+ /* Look up the target type */
+ targettype = LookupTypeName(typename);
+ if (!OidIsValid(targettype))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("type \"%s\" does not exist",
+ TypeNameToString(typename))));
+
+ /* make sure datatype is legal for a column */
+ CheckAttributeType(colName, targettype);
/*
- * Process child tables if requested.
+ * Set up an expression to transform the old data value to the new type.
+ * If a USING option was given, transform and use that expression, else
+ * just take the old value and try to coerce it. We do this first so
+ * that type incompatibility can be detected before we waste effort,
+ * and because we need the expression to be parsed against the original
+ * table rowtype.
+ */
+ if (cmd->transform)
+ {
+ RangeTblEntry *rte;
+
+ /* Expression must be able to access vars of old table */
+ rte = addRangeTableEntryForRelation(pstate,
+ RelationGetRelid(rel),
+ makeAlias(RelationGetRelationName(rel), NIL),
+ false,
+ true);
+ addRTEtoQuery(pstate, rte, false, true);
+
+ transform = transformExpr(pstate, cmd->transform);
+
+ /* It can't return a set */
+ if (expression_returns_set(transform))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("transform expression must not return a set")));
+
+ /* No subplans or aggregates, either... */
+ if (pstate->p_hasSubLinks)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use subquery in transform expression")));
+ if (pstate->p_hasAggs)
+ ereport(ERROR,
+ (errcode(ERRCODE_GROUPING_ERROR),
+ errmsg("cannot use aggregate function in transform expression")));
+ }
+ else
+ {
+ transform = (Node *) makeVar(1, attnum,
+ attTup->atttypid, attTup->atttypmod,
+ 0);
+ }
+
+ transform = coerce_to_target_type(pstate,
+ transform, exprType(transform),
+ targettype, typename->typmod,
+ COERCION_ASSIGNMENT,
+ COERCE_IMPLICIT_CAST);
+ if (transform == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("column \"%s\" cannot be cast to type \"%s\"",
+ colName, TypeNameToString(typename))));
+
+ /*
+ * Add a work queue item to make ATRewriteTable update the column
+ * contents.
+ */
+ newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue));
+ newval->attnum = attnum;
+ newval->expr = (Expr *) transform;
+
+ tab->newvals = lappend(tab->newvals, newval);
+
+ ReleaseSysCache(tuple);
+
+ /*
+ * The recursion case is handled by ATSimpleRecursion. However,
+ * if we are told not to recurse, there had better not be any
+ * child tables; else the alter would put them out of step.
*/
if (recurse)
+ ATSimpleRecursion(wqueue, rel, cmd, recurse);
+ else if (!recursing &&
+ find_inheritance_children(RelationGetRelid(rel)) != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg("type of inherited column \"%s\" must be changed in child tables too",
+ colName)));
+}
+
+static void
+ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
+ const char *colName, TypeName *typename)
+{
+ HeapTuple heapTup;
+ Form_pg_attribute attTup;
+ AttrNumber attnum;
+ HeapTuple typeTuple;
+ Form_pg_type tform;
+ Oid targettype;
+ Node *defaultexpr;
+ Relation attrelation;
+ Relation depRel;
+ ScanKeyData key[3];
+ SysScanDesc scan;
+ HeapTuple depTup;
+
+ attrelation = heap_openr(AttributeRelationName, RowExclusiveLock);
+
+ /* Look up the target column */
+ heapTup = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName);
+ if (!HeapTupleIsValid(heapTup)) /* shouldn't happen */
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" of relation \"%s\" does not exist",
+ colName, RelationGetRelationName(rel))));
+ attTup = (Form_pg_attribute) GETSTRUCT(heapTup);
+ attnum = attTup->attnum;
+
+ /* Check for multiple ALTER TYPE on same column --- can't cope */
+ if (attTup->atttypid != tab->oldDesc->attrs[attnum-1]->atttypid ||
+ attTup->atttypmod != tab->oldDesc->attrs[attnum-1]->atttypmod)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot alter type of column \"%s\" twice",
+ colName)));
+
+ /* Look up the target type (should not fail, since prep found it) */
+ typeTuple = typenameType(typename);
+ tform = (Form_pg_type) GETSTRUCT(typeTuple);
+ targettype = HeapTupleGetOid(typeTuple);
+
+ /*
+ * If there is a default expression for the column, get it and ensure
+ * we can coerce it to the new datatype. (We must do this before
+ * changing the column type, because build_column_default itself will
+ * try to coerce, and will not issue the error message we want if it
+ * fails.)
+ */
+ if (attTup->atthasdef)
{
- List *child,
- *children;
+ defaultexpr = build_column_default(rel, attnum);
+ Assert(defaultexpr);
+ defaultexpr = coerce_to_target_type(NULL, /* no UNKNOWN params */
+ defaultexpr, exprType(defaultexpr),
+ targettype, typename->typmod,
+ COERCION_ASSIGNMENT,
+ COERCE_IMPLICIT_CAST);
+ if (defaultexpr == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("default for column \"%s\" cannot be cast to type \"%s\"",
+ colName, TypeNameToString(typename))));
+ }
+ else
+ defaultexpr = NULL;
- /* This routine is actually in the planner */
- children = find_all_inheritors(myrelid);
+ /*
+ * Find everything that depends on the column (constraints, indexes, etc),
+ * and record enough information to let us recreate the objects.
+ *
+ * The actual recreation does not happen here, but only after we have
+ * performed all the individual ALTER TYPE operations. We have to save
+ * the info before executing ALTER TYPE, though, else the deparser will
+ * get confused.
+ *
+ * There could be multiple entries for the same object, so we must check
+ * to ensure we process each one only once. Note: we assume that an index
+ * that implements a constraint will not show a direct dependency on the
+ * column.
+ */
+ depRel = heap_openr(DependRelationName, RowExclusiveLock);
+
+ ScanKeyInit(&key[0],
+ Anum_pg_depend_refclassid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelOid_pg_class));
+ ScanKeyInit(&key[1],
+ Anum_pg_depend_refobjid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ ScanKeyInit(&key[2],
+ Anum_pg_depend_refobjsubid,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum((int32) attnum));
+
+ scan = systable_beginscan(depRel, DependReferenceIndex, true,
+ SnapshotNow, 3, key);
+
+ while (HeapTupleIsValid(depTup = systable_getnext(scan)))
+ {
+ Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
+ ObjectAddress foundObject;
- /*
- * find_all_inheritors does the recursive search of the
- * inheritance hierarchy, so all we have to do is process all of
- * the relids in the list that it returns.
- */
- foreach(child, children)
+ /* We don't expect any PIN dependencies on columns */
+ if (foundDep->deptype == DEPENDENCY_PIN)
+ elog(ERROR, "cannot alter type of a pinned column");
+
+ foundObject.classId = foundDep->classid;
+ foundObject.objectId = foundDep->objid;
+ foundObject.objectSubId = foundDep->objsubid;
+
+ switch (getObjectClass(&foundObject))
{
- Oid childrelid = lfirsto(child);
- Relation inhrel;
+ case OCLASS_CLASS:
+ {
+ char relKind = get_rel_relkind(foundObject.objectId);
- if (childrelid == myrelid)
- continue;
- inhrel = heap_open(childrelid, AccessExclusiveLock);
- /* do NOT count child constraints in deleted. */
- RemoveRelConstraints(inhrel, constrName, behavior);
- heap_close(inhrel, NoLock);
+ if (relKind == RELKIND_INDEX)
+ {
+ Assert(foundObject.objectSubId == 0);
+ if (!oidMember(foundObject.objectId, tab->changedIndexOids))
+ {
+ tab->changedIndexOids = lappendo(tab->changedIndexOids,
+ foundObject.objectId);
+ tab->changedIndexDefs = lappend(tab->changedIndexDefs,
+ pg_get_indexdef_string(foundObject.objectId));
+ }
+ }
+ else if (relKind == RELKIND_SEQUENCE)
+ {
+ /*
+ * This must be a SERIAL column's sequence. We need not
+ * do anything to it.
+ */
+ Assert(foundObject.objectSubId == 0);
+ }
+ else
+ {
+ /* Not expecting any other direct dependencies... */
+ elog(ERROR, "unexpected object depending on column: %s",
+ getObjectDescription(&foundObject));
+ }
+ break;
+ }
+
+ case OCLASS_CONSTRAINT:
+ Assert(foundObject.objectSubId == 0);
+ if (!oidMember(foundObject.objectId, tab->changedConstraintOids))
+ {
+ tab->changedConstraintOids = lappendo(tab->changedConstraintOids,
+ foundObject.objectId);
+ tab->changedConstraintDefs = lappend(tab->changedConstraintDefs,
+ pg_get_constraintdef_string(foundObject.objectId));
+ }
+ break;
+
+ case OCLASS_REWRITE:
+ /* XXX someday see if we can cope with revising views */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot alter type of a column used by a view or rule"),
+ errdetail("%s depends on column \"%s\"",
+ getObjectDescription(&foundObject),
+ colName)));
+ break;
+
+ case OCLASS_DEFAULT:
+ /*
+ * Ignore the column's default expression, since we will fix
+ * it below.
+ */
+ Assert(defaultexpr);
+ break;
+
+ case OCLASS_PROC:
+ case OCLASS_TYPE:
+ case OCLASS_CAST:
+ case OCLASS_CONVERSION:
+ case OCLASS_LANGUAGE:
+ case OCLASS_OPERATOR:
+ case OCLASS_OPCLASS:
+ case OCLASS_TRIGGER:
+ case OCLASS_SCHEMA:
+ /*
+ * We don't expect any of these sorts of objects to depend
+ * on a column.
+ */
+ elog(ERROR, "unexpected object depending on column: %s",
+ getObjectDescription(&foundObject));
+ break;
+
+ default:
+ elog(ERROR, "unrecognized object class: %u",
+ foundObject.classId);
}
}
+ systable_endscan(scan);
+
/*
- * Now do the thing on this relation.
+ * Now scan for dependencies of this column on other things. The only
+ * thing we should find is the dependency on the column datatype,
+ * which we want to remove.
*/
- deleted += RemoveRelConstraints(rel, constrName, behavior);
+ ScanKeyInit(&key[0],
+ Anum_pg_depend_classid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelOid_pg_class));
+ ScanKeyInit(&key[1],
+ Anum_pg_depend_objid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ ScanKeyInit(&key[2],
+ Anum_pg_depend_objsubid,
+ BTEqualStrategyNumber, F_INT4EQ,
+ Int32GetDatum((int32) attnum));
+
+ scan = systable_beginscan(depRel, DependDependerIndex, true,
+ SnapshotNow, 3, key);
+
+ while (HeapTupleIsValid(depTup = systable_getnext(scan)))
+ {
+ Form_pg_depend foundDep = (Form_pg_depend) GETSTRUCT(depTup);
- /* Close the target relation */
- heap_close(rel, NoLock);
+ if (foundDep->deptype != DEPENDENCY_NORMAL)
+ elog(ERROR, "found unexpected dependency type '%c'",
+ foundDep->deptype);
+ if (foundDep->classid != RelOid_pg_type ||
+ foundDep->objid != attTup->atttypid)
+ elog(ERROR, "found unexpected dependency for column");
- /* If zero constraints deleted, complain */
- if (deleted == 0)
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("constraint \"%s\" does not exist",
- constrName)));
- /* Otherwise if more than one constraint deleted, notify */
- else if (deleted > 1)
- ereport(NOTICE,
- (errmsg("multiple constraints named \"%s\" were dropped",
- constrName)));
+ simple_heap_delete(depRel, &depTup->t_self);
+ }
+
+ systable_endscan(scan);
+
+ heap_close(depRel, RowExclusiveLock);
+
+ /*
+ * Here we go --- change the recorded column type. (Note heapTup is
+ * a copy of the syscache entry, so okay to scribble on.)
+ */
+ attTup->atttypid = targettype;
+ attTup->atttypmod = typename->typmod;
+ attTup->attndims = length(typename->arrayBounds);
+ attTup->attlen = tform->typlen;
+ attTup->attbyval = tform->typbyval;
+ attTup->attalign = tform->typalign;
+ attTup->attstorage = tform->typstorage;
+
+ ReleaseSysCache(typeTuple);
+
+ simple_heap_update(attrelation, &heapTup->t_self, heapTup);
+
+ /* keep system catalog indexes current */
+ CatalogUpdateIndexes(attrelation, heapTup);
+
+ heap_close(attrelation, RowExclusiveLock);
+
+ /* Install dependency on new datatype */
+ add_column_datatype_dependency(RelationGetRelid(rel), attnum, targettype);
+
+ /* Drop any pg_statistic entry for the column, since it's now wrong type */
+ RemoveStatistics(rel, attnum);
+
+ /*
+ * Update the default, if present, by brute force --- remove and re-add
+ * the default. Probably unsafe to take shortcuts, since the new version
+ * may well have additional dependencies. (It's okay to do this now,
+ * rather than after other ALTER TYPE commands, since the default won't
+ * depend on other column types.)
+ */
+ if (defaultexpr)
+ {
+ /* Must make new row visible since it will be updated again */
+ CommandCounterIncrement();
+
+ /*
+ * We use RESTRICT here for safety, but at present we do not expect
+ * anything to depend on the default.
+ */
+ RemoveAttrDefault(RelationGetRelid(rel), attnum, DROP_RESTRICT, true);
+
+ StoreAttrDefault(rel, attnum, nodeToString(defaultexpr));
+ }
+
+ /* Cleanup */
+ heap_freetuple(heapTup);
}
/*
+ * Cleanup after we've finished all the ALTER TYPE operations for a
+ * particular relation. We have to drop and recreate all the indexes
+ * and constraints that depend on the altered columns.
+ */
+static void
+ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
+{
+ ObjectAddress obj;
+ List *l;
+
+ /*
+ * Re-parse the index and constraint definitions, and attach them to
+ * the appropriate work queue entries. We do this before dropping
+ * because in the case of a FOREIGN KEY constraint, we might not yet
+ * have exclusive lock on the table the constraint is attached to,
+ * and we need to get that before dropping. It's safe because the
+ * parser won't actually look at the catalogs to detect the existing
+ * entry.
+ */
+ foreach(l, tab->changedIndexDefs)
+ {
+ ATPostAlterTypeParse((char *) lfirst(l), wqueue);
+ }
+ foreach(l, tab->changedConstraintDefs)
+ {
+ ATPostAlterTypeParse((char *) lfirst(l), wqueue);
+ }
+
+ /*
+ * Now we can drop the existing constraints and indexes --- constraints
+ * first, since some of them might depend on the indexes. It should be
+ * okay to use DROP_RESTRICT here, since nothing else should be depending
+ * on these objects.
+ */
+ if (tab->changedConstraintOids)
+ obj.classId = get_system_catalog_relid(ConstraintRelationName);
+ foreach(l, tab->changedConstraintOids)
+ {
+ obj.objectId = lfirsto(l);
+ obj.objectSubId = 0;
+ performDeletion(&obj, DROP_RESTRICT);
+ }
+
+ obj.classId = RelOid_pg_class;
+ foreach(l, tab->changedIndexOids)
+ {
+ obj.objectId = lfirsto(l);
+ obj.objectSubId = 0;
+ performDeletion(&obj, DROP_RESTRICT);
+ }
+
+ /*
+ * The objects will get recreated during subsequent passes over the
+ * work queue.
+ */
+}
+
+static void
+ATPostAlterTypeParse(char *cmd, List **wqueue)
+{
+ List *raw_parsetree_list;
+ List *querytree_list;
+ List *list_item;
+
+ /*
+ * We expect that we only have to do raw parsing and parse analysis, not
+ * any rule rewriting, since these will all be utility statements.
+ */
+ raw_parsetree_list = raw_parser(cmd);
+ querytree_list = NIL;
+ foreach(list_item, raw_parsetree_list)
+ {
+ Node *parsetree = (Node *) lfirst(list_item);
+
+ querytree_list = nconc(querytree_list,
+ parse_analyze(parsetree, NULL, 0));
+ }
+
+ /*
+ * Attach each generated command to the proper place in the work queue.
+ * Note this could result in creation of entirely new work-queue entries.
+ */
+ foreach(list_item, querytree_list)
+ {
+ Query *query = (Query *) lfirst(list_item);
+ Relation rel;
+ AlteredTableInfo *tab;
+
+ Assert(IsA(query, Query));
+ Assert(query->commandType == CMD_UTILITY);
+ switch (nodeTag(query->utilityStmt))
+ {
+ case T_IndexStmt:
+ {
+ IndexStmt *stmt = (IndexStmt *) query->utilityStmt;
+ AlterTableCmd *newcmd;
+
+ rel = relation_openrv(stmt->relation, AccessExclusiveLock);
+ tab = ATGetQueueEntry(wqueue, rel);
+ newcmd = makeNode(AlterTableCmd);
+ newcmd->subtype = AT_ReAddIndex;
+ newcmd->def = (Node *) stmt;
+ tab->subcmds[AT_PASS_OLD_INDEX] =
+ lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
+ relation_close(rel, NoLock);
+ break;
+ }
+ case T_AlterTableStmt:
+ {
+ AlterTableStmt *stmt = (AlterTableStmt *) query->utilityStmt;
+ List *lcmd;
+
+ rel = relation_openrv(stmt->relation, AccessExclusiveLock);
+ tab = ATGetQueueEntry(wqueue, rel);
+ foreach(lcmd, stmt->cmds)
+ {
+ AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
+
+ switch (cmd->subtype)
+ {
+ case AT_AddIndex:
+ cmd->subtype = AT_ReAddIndex;
+ tab->subcmds[AT_PASS_OLD_INDEX] =
+ lappend(tab->subcmds[AT_PASS_OLD_INDEX], cmd);
+ break;
+ case AT_AddConstraint:
+ tab->subcmds[AT_PASS_OLD_CONSTR] =
+ lappend(tab->subcmds[AT_PASS_OLD_CONSTR], cmd);
+ break;
+ default:
+ elog(ERROR, "unexpected statement type: %d",
+ (int) cmd->subtype);
+ }
+ }
+ relation_close(rel, NoLock);
+ break;
+ }
+ default:
+ elog(ERROR, "unexpected statement type: %d",
+ (int) nodeTag(query->utilityStmt));
+ }
+ }
+}
+
+
+/*
* ALTER TABLE OWNER
*/
-void
-AlterTableOwner(Oid relationOid, int32 newOwnerSysId)
+static void
+ATExecChangeOwner(Oid relationOid, int32 newOwnerSysId)
{
Relation target_rel;
Relation class_rel;
@@ -3879,7 +4965,7 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId)
/* For each index, recursively change its ownership */
foreach(i, index_oid_list)
- AlterTableOwner(lfirsto(i), newOwnerSysId);
+ ATExecChangeOwner(lfirsto(i), newOwnerSysId);
freeList(index_oid_list);
}
@@ -3888,7 +4974,7 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId)
{
/* If it has a toast table, recurse to change its ownership */
if (tuple_class->reltoastrelid != InvalidOid)
- AlterTableOwner(tuple_class->reltoastrelid, newOwnerSysId);
+ ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerSysId);
}
heap_freetuple(tuple);
@@ -3901,25 +4987,22 @@ AlterTableOwner(Oid relationOid, int32 newOwnerSysId)
*
* The only thing we have to do is to change the indisclustered bits.
*/
-void
-AlterTableClusterOn(Oid relOid, const char *indexName)
+static void
+ATExecClusterOn(Relation rel, const char *indexName)
{
- Relation rel,
- pg_index;
+ Relation pg_index;
List *index;
Oid indexOid;
HeapTuple indexTuple;
Form_pg_index indexForm;
- rel = heap_open(relOid, AccessExclusiveLock);
-
indexOid = get_relname_relid(indexName, rel->rd_rel->relnamespace);
if (!OidIsValid(indexOid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("index \"%s\" for table \"%s\" does not exist",
- indexName, NameStr(rel->rd_rel->relname))));
+ indexName, RelationGetRelationName(rel))));
indexTuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(indexOid),
@@ -3935,10 +5018,11 @@ AlterTableClusterOn(Oid relOid, const char *indexName)
if (indexForm->indisclustered)
{
ReleaseSysCache(indexTuple);
- heap_close(rel, NoLock);
return;
}
+ ReleaseSysCache(indexTuple);
+
pg_index = heap_openr(IndexRelationName, RowExclusiveLock);
/*
@@ -3977,14 +5061,15 @@ AlterTableClusterOn(Oid relOid, const char *indexName)
}
heap_close(pg_index, RowExclusiveLock);
-
- ReleaseSysCache(indexTuple);
-
- heap_close(rel, NoLock); /* close rel, but keep lock till commit */
}
/*
* ALTER TABLE CREATE TOAST TABLE
+ *
+ * Note: this is also invoked from outside this module; in such cases we
+ * expect the caller to have verified that the relation is a table and we
+ * have all the right permissions. Callers expect this function
+ * to end with CommandCounterIncrement if it makes any changes.
*/
void
AlterTableCreateToastTable(Oid relOid, bool silent)
@@ -4005,21 +5090,11 @@ AlterTableCreateToastTable(Oid relOid, bool silent)
/*
* Grab an exclusive lock on the target table, which we will NOT
- * release until end of transaction.
+ * release until end of transaction. (This is probably redundant
+ * in all present uses...)
*/
rel = heap_open(relOid, AccessExclusiveLock);
- if (rel->rd_rel->relkind != RELKIND_RELATION)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not a table",
- RelationGetRelationName(rel))));
-
- /* Permissions checks */
- if (!pg_class_ownercheck(relOid, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- RelationGetRelationName(rel));
-
/*
* Toast table is shared if and only if its parent is.
*
@@ -4149,7 +5224,7 @@ AlterTableCreateToastTable(Oid relOid, bool silent)
toast_idxid = index_create(toast_relid, toast_idxname, indexInfo,
BTREE_AM_OID, classObjectId,
- true, false, true);
+ true, false, true, false);
/*
* Update toast rel's pg_class entry to show that it has an index. The