diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 277 |
1 files changed, 260 insertions, 17 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 1f709a49773..bb77c5311a9 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -81,6 +81,7 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/tqual.h" +#include "utils/typcache.h" /* @@ -357,6 +358,9 @@ static void ATExecEnableDisableRule(Relation rel, char *rulename, static void ATPrepAddInherit(Relation child_rel); static void ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode); static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode); +static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid); +static void ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode); +static void ATExecDropOf(Relation rel, LOCKMODE lockmode); static void ATExecGenericOptions(Relation rel, List *options); static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, @@ -2684,6 +2688,16 @@ AlterTableGetLockLevel(List *cmds) break; /* + * These subcommands affect implicit row type conversion. They + * have affects similar to CREATE/DROP CAST on queries. We + * don't provide for invalidating parse trees as a result of + * such changes. Do avoid concurrent pg_class updates, though. + */ + case AT_AddOf: + case AT_DropOf: + cmd_lockmode = ShareUpdateExclusiveLock; + + /* * These subcommands affect general strategies for performance * and maintenance, though don't change the semantic results * from normal data reads and writes. Delaying an ALTER TABLE @@ -2942,13 +2956,11 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, case AT_EnableAlwaysRule: case AT_EnableReplicaRule: case AT_DisableRule: - ATSimplePermissions(rel, ATT_TABLE); - /* These commands never recurse */ - /* No command-specific prep needed */ - pass = AT_PASS_MISC; - break; case AT_DropInherit: /* NO INHERIT */ + case AT_AddOf: /* OF */ + case AT_DropOf: /* NOT OF */ ATSimplePermissions(rel, ATT_TABLE); + /* These commands never recurse */ /* No command-specific prep needed */ pass = AT_PASS_MISC; break; @@ -3211,6 +3223,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, case AT_DropInherit: ATExecDropInherit(rel, (RangeVar *) cmd->def, lockmode); break; + case AT_AddOf: + ATExecAddOf(rel, (TypeName *) cmd->def, lockmode); + break; + case AT_DropOf: + ATExecDropOf(rel, lockmode); + break; case AT_GenericOptions: ATExecGenericOptions(rel, (List *) cmd->def); break; @@ -4046,6 +4064,42 @@ find_typed_table_dependencies(Oid typeOid, const char *typeName, DropBehavior be /* + * check_of_type + * + * Check whether a type is suitable for CREATE TABLE OF/ALTER TABLE OF. If it + * isn't suitable, throw an error. Currently, we require that the type + * originated with CREATE TABLE AS. We could support any row type, but doing so + * would require handling a number of extra corner cases in the DDL commands. + */ +void +check_of_type(HeapTuple typetuple) +{ + Form_pg_type typ = (Form_pg_type) GETSTRUCT(typetuple); + bool typeOk = false; + + if (typ->typtype == TYPTYPE_COMPOSITE) + { + Relation typeRelation; + + Assert(OidIsValid(typ->typrelid)); + typeRelation = relation_open(typ->typrelid, AccessShareLock); + typeOk = (typeRelation->rd_rel->relkind == RELKIND_COMPOSITE_TYPE); + /* + * Close the parent rel, but keep our AccessShareLock on it until xact + * commit. That will prevent someone else from deleting or ALTERing + * the type before the typed table creation/conversion commits. + */ + relation_close(typeRelation, NoLock); + } + if (!typeOk) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("type %s is not a composite type", + format_type_be(HeapTupleGetOid(typetuple))))); +} + + +/* * ALTER TABLE ADD COLUMN * * Adds an additional attribute to a relation making the assumption that @@ -8355,8 +8409,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode) ScanKeyData key[3]; HeapTuple inheritsTuple, attributeTuple, - constraintTuple, - depTuple; + constraintTuple; List *connames; bool found = false; @@ -8522,11 +8575,29 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode) systable_endscan(scan); heap_close(catalogRelation, RowExclusiveLock); - /* - * Drop the dependency - * - * There's no convenient way to do this, so go trawling through pg_depend - */ + drop_parent_dependency(RelationGetRelid(rel), + RelationRelationId, + RelationGetRelid(parent_rel)); + + /* keep our lock on the parent relation until commit */ + heap_close(parent_rel, NoLock); +} + +/* + * Drop the dependency created by StoreCatalogInheritance1 (CREATE TABLE + * INHERITS/ALTER TABLE INHERIT -- refclassid will be RelationRelationId) or + * heap_create_with_catalog (CREATE TABLE OF/ALTER TABLE OF -- refclassid will + * be TypeRelationId). There's no convenient way to do this, so go trawling + * through pg_depend. + */ +static void +drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid) +{ + Relation catalogRelation; + SysScanDesc scan; + ScanKeyData key[3]; + HeapTuple depTuple; + catalogRelation = heap_open(DependRelationId, RowExclusiveLock); ScanKeyInit(&key[0], @@ -8536,7 +8607,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode) ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(rel))); + ObjectIdGetDatum(relid)); ScanKeyInit(&key[2], Anum_pg_depend_objsubid, BTEqualStrategyNumber, F_INT4EQ, @@ -8549,8 +8620,8 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode) { Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple); - if (dep->refclassid == RelationRelationId && - dep->refobjid == RelationGetRelid(parent_rel) && + if (dep->refclassid == refclassid && + dep->refobjid == refobjid && dep->refobjsubid == 0 && dep->deptype == DEPENDENCY_NORMAL) simple_heap_delete(catalogRelation, &depTuple->t_self); @@ -8558,9 +8629,181 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode) systable_endscan(scan); heap_close(catalogRelation, RowExclusiveLock); +} - /* keep our lock on the parent relation until commit */ - heap_close(parent_rel, NoLock); +/* + * ALTER TABLE OF + * + * Attach a table to a composite type, as though it had been created with CREATE + * TABLE OF. All attname, atttypid, atttypmod and attcollation must match. The + * subject table must not have inheritance parents. These restrictions ensure + * that you cannot create a configuration impossible with CREATE TABLE OF alone. + */ +static void +ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode) +{ + Oid relid = RelationGetRelid(rel); + Type typetuple; + Form_pg_type typ; + Oid typeid; + Relation inheritsRelation, + relationRelation; + SysScanDesc scan; + ScanKeyData key; + AttrNumber table_attno, + type_attno; + TupleDesc typeTupleDesc, + tableTupleDesc; + ObjectAddress tableobj, + typeobj; + HeapTuple classtuple; + + /* Validate the type. */ + typetuple = typenameType(NULL, ofTypename, NULL); + check_of_type(typetuple); + typ = (Form_pg_type) GETSTRUCT(typetuple); + typeid = HeapTupleGetOid(typetuple); + + /* Fail if the table has any inheritance parents. */ + inheritsRelation = heap_open(InheritsRelationId, AccessShareLock); + ScanKeyInit(&key, + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(relid)); + scan = systable_beginscan(inheritsRelation, InheritsRelidSeqnoIndexId, + true, SnapshotNow, 1, &key); + if (HeapTupleIsValid(systable_getnext(scan))) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("typed tables cannot inherit"))); + systable_endscan(scan); + heap_close(inheritsRelation, AccessShareLock); + + /* + * Check the tuple descriptors for compatibility. Unlike inheritance, we + * require that the order also match. However, attnotnull need not match. + * Also unlike inheritance, we do not require matching relhasoids. + */ + typeTupleDesc = lookup_rowtype_tupdesc(typeid, -1); + tableTupleDesc = RelationGetDescr(rel); + table_attno = 1; + for (type_attno = 1; type_attno <= typeTupleDesc->natts; type_attno++) + { + Form_pg_attribute type_attr, + table_attr; + const char *type_attname, + *table_attname; + + /* Get the next non-dropped type attribute. */ + type_attr = typeTupleDesc->attrs[type_attno - 1]; + if (type_attr->attisdropped) + continue; + type_attname = NameStr(type_attr->attname); + + /* Get the next non-dropped table attribute. */ + do + { + if (table_attno > tableTupleDesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("table is missing column \"%s\"", + type_attname))); + table_attr = tableTupleDesc->attrs[table_attno++ - 1]; + } while (table_attr->attisdropped); + table_attname = NameStr(table_attr->attname); + + /* Compare name. */ + if (strncmp(table_attname, type_attname, NAMEDATALEN) != 0) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("table has column \"%s\" where type requires \"%s\"", + table_attname, type_attname))); + + /* Compare type. */ + if (table_attr->atttypid != type_attr->atttypid || + table_attr->atttypmod != type_attr->atttypmod || + table_attr->attcollation != type_attr->attcollation) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("table \"%s\" has different type for column \"%s\"", + RelationGetRelationName(rel), type_attname))); + } + DecrTupleDescRefCount(typeTupleDesc); + + /* Any remaining columns at the end of the table had better be dropped. */ + for (; table_attno <= tableTupleDesc->natts; table_attno++) + { + Form_pg_attribute table_attr = tableTupleDesc->attrs[table_attno - 1]; + if (!table_attr->attisdropped) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("table has extra column \"%s\"", + NameStr(table_attr->attname)))); + } + + /* If the table was already typed, drop the existing dependency. */ + if (rel->rd_rel->reloftype) + drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype); + + /* Record a dependency on the new type. */ + tableobj.classId = RelationRelationId; + tableobj.objectId = relid; + tableobj.objectSubId = 0; + typeobj.classId = TypeRelationId; + typeobj.objectId = typeid; + typeobj.objectSubId = 0; + recordDependencyOn(&tableobj, &typeobj, DEPENDENCY_NORMAL); + + /* Update pg_class.reloftype */ + relationRelation = heap_open(RelationRelationId, RowExclusiveLock); + classtuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid)); + if (!HeapTupleIsValid(classtuple)) + elog(ERROR, "cache lookup failed for relation %u", relid); + ((Form_pg_class) GETSTRUCT(classtuple))->reloftype = typeid; + simple_heap_update(relationRelation, &classtuple->t_self, classtuple); + CatalogUpdateIndexes(relationRelation, classtuple); + heap_freetuple(classtuple); + heap_close(relationRelation, RowExclusiveLock); + + ReleaseSysCache(typetuple); +} + +/* + * ALTER TABLE NOT OF + * + * Detach a typed table from its originating type. Just clear reloftype and + * remove the dependency. + */ +static void +ATExecDropOf(Relation rel, LOCKMODE lockmode) +{ + Oid relid = RelationGetRelid(rel); + Relation relationRelation; + HeapTuple tuple; + + if (!OidIsValid(rel->rd_rel->reloftype)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" is not a typed table", + RelationGetRelationName(rel)))); + + /* + * We don't bother to check ownership of the type --- ownership of the table + * is presumed enough rights. No lock required on the type, either. + */ + + drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype); + + /* Clear pg_class.reloftype */ + relationRelation = heap_open(RelationRelationId, RowExclusiveLock); + tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relid)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for relation %u", relid); + ((Form_pg_class) GETSTRUCT(tuple))->reloftype = InvalidOid; + simple_heap_update(relationRelation, &tuple->t_self, tuple); + CatalogUpdateIndexes(relationRelation, tuple); + heap_freetuple(tuple); + heap_close(relationRelation, RowExclusiveLock); } /* |