diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 257 |
1 files changed, 173 insertions, 84 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 82143682d58..a5d7af60420 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -263,12 +263,13 @@ static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, static void ATRewriteTables(List **wqueue, LOCKMODE lockmode); static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode); static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel); -static void ATSimplePermissions(Relation rel, bool allowView); +static void ATSimplePermissions(Relation rel, bool allowView, bool allowType); static void ATSimplePermissionsRelationOrIndex(Relation rel); static void ATSimpleRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode); static void ATOneLevelRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd, LOCKMODE lockmode); +static void find_typed_table_dependencies(Oid typeOid, const char *typeName); static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd, LOCKMODE lockmode); static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel, @@ -1978,6 +1979,10 @@ renameatt(Oid myrelid, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot rename column of typed table"))); + if (targetrelation->rd_rel->relkind == RELKIND_COMPOSITE_TYPE) + find_typed_table_dependencies(targetrelation->rd_rel->reltype, + RelationGetRelationName(targetrelation)); + /* * Renaming the columns of sequences or toast tables doesn't actually * break anything from the system's point of view, since internal @@ -2368,8 +2373,13 @@ AlterTable(AlterTableStmt *stmt) /* * For mostly-historical reasons, we allow ALTER TABLE to apply to - * all relation types. + * almost all relation types. */ + if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table", + RelationGetRelationName(rel)))); break; case OBJECT_INDEX: @@ -2388,6 +2398,14 @@ AlterTable(AlterTableStmt *stmt) RelationGetRelationName(rel)))); break; + case OBJECT_TYPE: + if (rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a composite type", + RelationGetRelationName(rel)))); + break; + case OBJECT_VIEW: if (rel->rd_rel->relkind != RELKIND_VIEW) ereport(ERROR, @@ -2639,14 +2657,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, switch (cmd->subtype) { case AT_AddColumn: /* ADD COLUMN */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, true); /* Performs own recursion */ ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode); pass = AT_PASS_ADD_COL; break; case AT_AddColumnToView: /* add column via CREATE OR REPLACE * VIEW */ - ATSimplePermissions(rel, true); + ATSimplePermissions(rel, true, false); /* Performs own recursion */ ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode); pass = AT_PASS_ADD_COL; @@ -2659,19 +2677,19 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, * substitutes default values into INSERTs before it expands * rules. */ - ATSimplePermissions(rel, true); + ATSimplePermissions(rel, true, false); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode); /* No command-specific prep needed */ pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP; break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode); /* No command-specific prep needed */ pass = AT_PASS_DROP; break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode); /* No command-specific prep needed */ pass = AT_PASS_ADD_CONSTR; @@ -2689,25 +2707,25 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, pass = AT_PASS_MISC; break; case AT_SetStorage: /* ALTER COLUMN SET STORAGE */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode); /* No command-specific prep needed */ pass = AT_PASS_MISC; break; case AT_DropColumn: /* DROP COLUMN */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, true); ATPrepDropColumn(rel, recurse, cmd); /* Recursion occurs during execution phase */ pass = AT_PASS_DROP; break; case AT_AddIndex: /* ADD INDEX */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* This command never recurses */ /* No command-specific prep needed */ pass = AT_PASS_ADD_INDEX; break; case AT_AddConstraint: /* ADD CONSTRAINT */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* Recursion occurs during execution phase */ /* No command-specific prep needed except saving recurse flag */ if (recurse) @@ -2715,7 +2733,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, pass = AT_PASS_ADD_CONSTR; break; case AT_DropConstraint: /* DROP CONSTRAINT */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* Recursion occurs during execution phase */ /* No command-specific prep needed except saving recurse flag */ if (recurse) @@ -2723,7 +2741,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, pass = AT_PASS_DROP; break; case AT_AlterColumnType: /* ALTER COLUMN TYPE */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, true); /* Performs own recursion */ ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd, lockmode); pass = AT_PASS_ALTER_TYPE; @@ -2735,20 +2753,20 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_ClusterOn: /* CLUSTER ON */ case AT_DropCluster: /* SET WITHOUT CLUSTER */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* These commands never recurse */ /* No command-specific prep needed */ pass = AT_PASS_MISC; break; case AT_AddOids: /* SET WITH OIDS */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* Performs own recursion */ if (!rel->rd_rel->relhasoids || recursing) ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode); pass = AT_PASS_ADD_COL; break; case AT_DropOids: /* SET WITHOUT OIDS */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* Performs own recursion */ if (rel->rd_rel->relhasoids) { @@ -2775,7 +2793,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, pass = AT_PASS_MISC; break; case AT_AddInherit: /* INHERIT */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* This command never recurses */ ATPrepAddInherit(rel); pass = AT_PASS_MISC; @@ -2793,7 +2811,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, case AT_EnableReplicaRule: case AT_DisableRule: case AT_DropInherit: /* NO INHERIT */ - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* These commands never recurse */ /* No command-specific prep needed */ pass = AT_PASS_MISC; @@ -3519,7 +3537,7 @@ ATGetQueueEntry(List **wqueue, Relation rel) * - Ensure that it is not a system table */ static void -ATSimplePermissions(Relation rel, bool allowView) +ATSimplePermissions(Relation rel, bool allowView, bool allowType) { if (rel->rd_rel->relkind != RELKIND_RELATION) { @@ -3531,6 +3549,14 @@ ATSimplePermissions(Relation rel, bool allowView) errmsg("\"%s\" is not a table or view", RelationGetRelationName(rel)))); } + else if (allowType) + { + if (rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a table or composite type", + RelationGetRelationName(rel)))); + } else ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -3759,6 +3785,44 @@ find_composite_type_dependencies(Oid typeOid, /* + * find_typed_table_dependencies + * + * Check to see if a composite type is being used as the type of a + * typed table. Eventually, we'd like to propagate the alter + * operation into such tables, but for now, just error out if we find + * any. + */ +static void +find_typed_table_dependencies(Oid typeOid, const char *typeName) +{ + Relation classRel; + ScanKeyData key[1]; + HeapScanDesc scan; + HeapTuple tuple; + + classRel = heap_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_class_reloftype, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(typeOid)); + + scan = heap_beginscan(classRel, SnapshotNow, 1, key); + + if (HeapTupleIsValid(tuple = heap_getnext(scan, ForwardScanDirection))) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter type \"%s\" because it is the type of a typed table", + typeName))); + } + + heap_endscan(scan); + heap_close(classRel, AccessShareLock); +} + + +/* * ALTER TABLE ADD COLUMN * * Adds an additional attribute to a relation making the assumption that @@ -3804,6 +3868,10 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("column must be added to child tables too"))); } + + if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE) + find_typed_table_dependencies(rel->rd_rel->reltype, + RelationGetRelationName(rel)); } static void @@ -4007,7 +4075,7 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, * defaults, not even for domain-typed columns. And in any case we * mustn't invoke Phase 3 on a view, since it has no storage. */ - if (relkind != RELKIND_VIEW && attribute.attnum > 0) + if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE && attribute.attnum > 0) { defval = (Expr *) build_column_default(rel, attribute.attnum); @@ -4535,6 +4603,10 @@ ATPrepDropColumn(Relation rel, bool recurse, AlterTableCmd *cmd) (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot drop column from typed table"))); + if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE) + find_typed_table_dependencies(rel->rd_rel->reltype, + RelationGetRelationName(rel)); + /* No command-specific prep needed except saving recurse flag */ if (recurse) cmd->subtype = AT_DropColumnRecurse; @@ -4554,7 +4626,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, true); /* * get the number of the attribute @@ -4858,7 +4930,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); /* * Call AddRelationNewConstraints to do the work, making sure it works on @@ -5801,7 +5873,7 @@ ATExecDropConstraint(Relation rel, const char *constrName, /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) - ATSimplePermissions(rel, false); + ATSimplePermissions(rel, false, false); conrel = heap_open(ConstraintRelationId, RowExclusiveLock); @@ -6033,76 +6105,93 @@ ATPrepAlterColumnType(List **wqueue, /* make sure datatype is legal for a column */ CheckAttributeType(colName, targettype, false); - /* - * Set up an expression to transform the old data value to the new type. - * If a USING option was given, transform and use that expression, else - * just take the old value and try to coerce it. We do this first so that - * type incompatibility can be detected before we waste effort, and - * because we need the expression to be parsed against the original table - * rowtype. - */ - if (cmd->transform) + if (tab->relkind == RELKIND_RELATION) { - RangeTblEntry *rte; + /* + * Set up an expression to transform the old data value to the new type. + * If a USING option was given, transform and use that expression, else + * just take the old value and try to coerce it. We do this first so that + * type incompatibility can be detected before we waste effort, and + * because we need the expression to be parsed against the original table + * rowtype. + */ + if (cmd->transform) + { + RangeTblEntry *rte; - /* Expression must be able to access vars of old table */ - rte = addRangeTableEntryForRelation(pstate, - rel, - NULL, - false, - true); - addRTEtoQuery(pstate, rte, false, true, true); + /* Expression must be able to access vars of old table */ + rte = addRangeTableEntryForRelation(pstate, + rel, + NULL, + false, + true); + addRTEtoQuery(pstate, rte, false, true, true); - transform = transformExpr(pstate, cmd->transform); + transform = transformExpr(pstate, cmd->transform); - /* It can't return a set */ - if (expression_returns_set(transform)) + /* It can't return a set */ + if (expression_returns_set(transform)) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("transform expression must not return a set"))); + + /* No subplans or aggregates, either... */ + if (pstate->p_hasSubLinks) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use subquery in transform expression"))); + if (pstate->p_hasAggs) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("cannot use aggregate function in transform expression"))); + if (pstate->p_hasWindowFuncs) + ereport(ERROR, + (errcode(ERRCODE_WINDOWING_ERROR), + errmsg("cannot use window function in transform expression"))); + } + else + { + transform = (Node *) makeVar(1, attnum, + attTup->atttypid, attTup->atttypmod, + 0); + } + + transform = coerce_to_target_type(pstate, + transform, exprType(transform), + targettype, targettypmod, + COERCION_ASSIGNMENT, + COERCE_IMPLICIT_CAST, + -1); + if (transform == NULL) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("transform expression must not return a set"))); + errmsg("column \"%s\" cannot be cast to type %s", + colName, format_type_be(targettype)))); - /* No subplans or aggregates, either... */ - if (pstate->p_hasSubLinks) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot use subquery in transform expression"))); - if (pstate->p_hasAggs) - ereport(ERROR, - (errcode(ERRCODE_GROUPING_ERROR), - errmsg("cannot use aggregate function in transform expression"))); - if (pstate->p_hasWindowFuncs) - ereport(ERROR, - (errcode(ERRCODE_WINDOWING_ERROR), - errmsg("cannot use window function in transform expression"))); - } - else - { - transform = (Node *) makeVar(1, attnum, - attTup->atttypid, attTup->atttypmod, - 0); - } + /* + * Add a work queue item to make ATRewriteTable update the column + * contents. + */ + newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); + newval->attnum = attnum; + newval->expr = (Expr *) transform; - transform = coerce_to_target_type(pstate, - transform, exprType(transform), - targettype, targettypmod, - COERCION_ASSIGNMENT, - COERCE_IMPLICIT_CAST, - -1); - if (transform == NULL) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("column \"%s\" cannot be cast to type %s", - colName, format_type_be(targettype)))); + tab->newvals = lappend(tab->newvals, newval); + } - /* - * Add a work queue item to make ATRewriteTable update the column - * contents. - */ - newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); - newval->attnum = attnum; - newval->expr = (Expr *) transform; + if (tab->relkind == RELKIND_COMPOSITE_TYPE) + { + /* + * For composite types, do this check now. Tables will check + * it later when the table is being rewritten. + */ + find_composite_type_dependencies(rel->rd_rel->reltype, + NULL, + RelationGetRelationName(rel)); - tab->newvals = lappend(tab->newvals, newval); + find_typed_table_dependencies(rel->rd_rel->reltype, + RelationGetRelationName(rel)); + } ReleaseSysCache(tuple); @@ -7367,7 +7456,7 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode) * Must be owner of both parent and child -- child was checked by * ATSimplePermissions call in ATPrepCmd */ - ATSimplePermissions(parent_rel, false); + ATSimplePermissions(parent_rel, false, false); /* Permanent rels cannot inherit from temporary ones */ if (parent_rel->rd_istemp && !child_rel->rd_istemp) |