diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 137 |
1 files changed, 88 insertions, 49 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 541c75ab962..1edf6c75533 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -66,6 +66,7 @@ #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/smgr.h" +#include "tcop/utility.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -249,8 +250,8 @@ static Oid transformFkeyCheckAttrs(Relation pkrel, static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts); static void validateForeignKeyConstraint(FkConstraint *fkconstraint, Relation rel, Relation pkrel, Oid constraintOid); -static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, - Oid constraintOid); +static void createForeignKeyTriggers(Relation rel, Oid refRelOid, + FkConstraint *fkconstraint, Oid constraintOid); static void ATController(Relation rel, List *cmds, bool recurse); static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, bool recurse, bool recursing); @@ -308,7 +309,8 @@ static void ATPrepAlterColumnType(List **wqueue, static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, const char *colName, TypeName *typename); static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab); -static void ATPostAlterTypeParse(char *cmd, List **wqueue); +static void ATPostAlterTypeParse(Oid oldRelId, Oid refRelId, char *cmd, + List **wqueue); static void change_owner_fix_column_acls(Oid relationOid, Oid oldOwnerId, Oid newOwnerId); static void change_owner_recurse_to_sequences(Oid relationOid, @@ -2159,9 +2161,12 @@ CheckTableNotInUse(Relation rel, const char *stmt) * the whole operation; we don't have to do anything special to clean up. */ void -AlterTable(AlterTableStmt *stmt) +AlterTable(Oid relid, AlterTableStmt *stmt) { - Relation rel = relation_openrv(stmt->relation, AccessExclusiveLock); + Relation rel; + + /* Caller is required to provide an adequate lock. */ + rel = relation_open(relid, NoLock); CheckTableNotInUse(rel, "ALTER TABLE"); @@ -4291,7 +4296,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, /* The IndexStmt has already been through transformIndexStmt */ - DefineIndex(stmt->relation, /* relation */ + DefineIndex(RelationGetRelid(rel), /* relation */ stmt->idxname, /* index name */ InvalidOid, /* no predefined OID */ stmt->accessMethod, /* am name */ @@ -4540,7 +4545,10 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, * table; trying to start with a lesser lock will just create a risk of * deadlock.) */ - pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); + if (OidIsValid(fkconstraint->old_pktable_oid)) + pkrel = heap_open(fkconstraint->old_pktable_oid, AccessExclusiveLock); + else + pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); /* * Validity checks (permission checks wait till we have the column @@ -4776,7 +4784,8 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * Create the triggers that will enforce the constraint. */ - createForeignKeyTriggers(rel, fkconstraint, constrOid); + createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, + constrOid); /* * Tell Phase 3 to check that the constraint is satisfied by existing rows @@ -5148,14 +5157,14 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint, } static void -CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, +CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, FkConstraint *fkconstraint, Oid constraintOid, bool on_insert) { CreateTrigStmt *fk_trigger; fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = fkconstraint->constr_name; - fk_trigger->relation = myRel; + fk_trigger->relation = NULL; fk_trigger->before = false; fk_trigger->row = true; @@ -5174,10 +5183,11 @@ CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, fk_trigger->isconstraint = true; fk_trigger->deferrable = fkconstraint->deferrable; fk_trigger->initdeferred = fkconstraint->initdeferred; - fk_trigger->constrrel = fkconstraint->pktable; + fk_trigger->constrrel = NULL; fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, constraintOid, false); + (void) CreateTrigger(fk_trigger, myRelOid, refRelOid, + constraintOid, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5187,18 +5197,13 @@ CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, * Create the triggers that implement an FK constraint. */ static void -createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, - Oid constraintOid) +createForeignKeyTriggers(Relation rel, Oid refRelOid, + FkConstraint *fkconstraint, Oid constraintOid) { - RangeVar *myRel; + Oid myRelOid; CreateTrigStmt *fk_trigger; - /* - * Reconstruct a RangeVar for my relation (not passed in, unfortunately). - */ - myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), - pstrdup(RelationGetRelationName(rel)), - -1); + myRelOid = RelationGetRelid(rel); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5209,12 +5214,12 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = fkconstraint->constr_name; - fk_trigger->relation = fkconstraint->pktable; + fk_trigger->relation = NULL; fk_trigger->before = false; fk_trigger->row = true; fk_trigger->events = TRIGGER_TYPE_DELETE; fk_trigger->isconstraint = true; - fk_trigger->constrrel = myRel; + fk_trigger->constrrel = NULL; switch (fkconstraint->fk_del_action) { case FKCONSTR_ACTION_NOACTION: @@ -5249,7 +5254,8 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, constraintOid, false); + (void) CreateTrigger(fk_trigger, refRelOid, myRelOid, + constraintOid, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5260,12 +5266,12 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = fkconstraint->constr_name; - fk_trigger->relation = fkconstraint->pktable; + fk_trigger->relation = NULL; fk_trigger->before = false; fk_trigger->row = true; fk_trigger->events = TRIGGER_TYPE_UPDATE; fk_trigger->isconstraint = true; - fk_trigger->constrrel = myRel; + fk_trigger->constrrel = NULL; switch (fkconstraint->fk_upd_action) { case FKCONSTR_ACTION_NOACTION: @@ -5300,7 +5306,8 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, constraintOid, false); + (void) CreateTrigger(fk_trigger, refRelOid, myRelOid, + constraintOid, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5320,8 +5327,10 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, * and the use of self-referential FKs is rare enough, that we live with * it for now. There will be a real fix in PG 9.2. */ - CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, true); - CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, false); + CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, + constraintOid, true); + CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, + constraintOid, false); } /* @@ -6000,7 +6009,8 @@ static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) { ObjectAddress obj; - ListCell *l; + ListCell *def_item; + ListCell *oid_item; /* * Re-parse the index and constraint definitions, and attach them to the @@ -6009,11 +6019,36 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) * lock on the table the constraint is attached to, and we need to get * that before dropping. It's safe because the parser won't actually look * at the catalogs to detect the existing entry. - */ - foreach(l, tab->changedIndexDefs) - ATPostAlterTypeParse((char *) lfirst(l), wqueue); - foreach(l, tab->changedConstraintDefs) - ATPostAlterTypeParse((char *) lfirst(l), wqueue); + * + * We can't rely on the output of deparsing to tell us which relation + * to operate on, because concurrent activity might have made the name + * resolve differently. Instead, we've got to use the OID of the + * constraint or index we're processing to figure out which relation + * to operate on. + */ + forboth(oid_item, tab->changedConstraintOids, + def_item, tab->changedConstraintDefs) + { + Oid oldId = lfirst_oid(oid_item); + Oid relid; + Oid confrelid; + + get_constraint_relation_oids(oldId, &relid, &confrelid); + ATPostAlterTypeParse(relid, confrelid, + (char *) lfirst(def_item), + wqueue); + } + forboth(oid_item, tab->changedIndexOids, + def_item, tab->changedIndexDefs) + { + Oid oldId = lfirst_oid(oid_item); + Oid relid; + + relid = IndexGetRelation(oldId); + ATPostAlterTypeParse(relid, InvalidOid, + (char *) lfirst(def_item), + wqueue); + } /* * Now we can drop the existing constraints and indexes --- constraints @@ -6023,18 +6058,18 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) * should be okay to use DROP_RESTRICT here, since nothing else should be * depending on these objects. */ - foreach(l, tab->changedConstraintOids) + foreach(oid_item, tab->changedConstraintOids) { obj.classId = ConstraintRelationId; - obj.objectId = lfirst_oid(l); + obj.objectId = lfirst_oid(oid_item); obj.objectSubId = 0; performDeletion(&obj, DROP_RESTRICT); } - foreach(l, tab->changedIndexOids) + foreach(oid_item, tab->changedIndexOids) { obj.classId = RelationRelationId; - obj.objectId = lfirst_oid(l); + obj.objectId = lfirst_oid(oid_item); obj.objectSubId = 0; performDeletion(&obj, DROP_RESTRICT); } @@ -6046,11 +6081,12 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) } static void -ATPostAlterTypeParse(char *cmd, List **wqueue) +ATPostAlterTypeParse(Oid oldRelId, Oid refRelId, char *cmd, List **wqueue) { List *raw_parsetree_list; List *querytree_list; ListCell *list_item; + Relation rel; /* * We expect that we will get only ALTER TABLE and CREATE INDEX @@ -6066,16 +6102,21 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) if (IsA(stmt, IndexStmt)) querytree_list = lappend(querytree_list, - transformIndexStmt((IndexStmt *) stmt, + transformIndexStmt(oldRelId, + (IndexStmt *) stmt, cmd)); else if (IsA(stmt, AlterTableStmt)) querytree_list = list_concat(querytree_list, - transformAlterTableStmt((AlterTableStmt *) stmt, + transformAlterTableStmt(oldRelId, + (AlterTableStmt *) stmt, cmd)); else querytree_list = lappend(querytree_list, stmt); } + /* Caller should already have acquired whatever lock we need. */ + rel = relation_open(oldRelId, NoLock); + /* * Attach each generated command to the proper place in the work queue. * Note this could result in creation of entirely new work-queue entries. @@ -6087,7 +6128,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) foreach(list_item, querytree_list) { Node *stm = (Node *) lfirst(list_item); - Relation rel; AlteredTableInfo *tab; switch (nodeTag(stm)) @@ -6097,14 +6137,12 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) IndexStmt *stmt = (IndexStmt *) stm; AlterTableCmd *newcmd; - rel = relation_openrv(stmt->relation, AccessExclusiveLock); tab = ATGetQueueEntry(wqueue, rel); newcmd = makeNode(AlterTableCmd); newcmd->subtype = AT_ReAddIndex; newcmd->def = (Node *) stmt; tab->subcmds[AT_PASS_OLD_INDEX] = lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd); - relation_close(rel, NoLock); break; } case T_AlterTableStmt: @@ -6112,7 +6150,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) AlterTableStmt *stmt = (AlterTableStmt *) stm; ListCell *lcmd; - rel = relation_openrv(stmt->relation, AccessExclusiveLock); tab = ATGetQueueEntry(wqueue, rel); foreach(lcmd, stmt->cmds) { @@ -6135,7 +6172,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) (int) cmd->subtype); } } - relation_close(rel, NoLock); break; } default: @@ -6143,8 +6179,9 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) (int) nodeTag(stm)); } } -} + relation_close(rel, NoLock); +} /* * ALTER TABLE OWNER @@ -7542,7 +7579,8 @@ ATExecDropInherit(Relation rel, RangeVar *parent) /* * Execute ALTER TABLE SET SCHEMA * - * Note: caller must have checked ownership of the relation already + * WARNING WARNING WARNING: In previous *minor* releases the caller was + * responsible for checking ownership of the relation, but now we do it here. */ void AlterTableNamespace(RangeVar *relation, const char *newschema, @@ -7557,6 +7595,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema, rel = relation_openrv(relation, AccessExclusiveLock); relid = RelationGetRelid(rel); + CheckRelationOwnership(relid, true); oldNspOid = RelationGetNamespace(rel); /* Check relation type against type specified in the ALTER command */ |