diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 137 |
1 files changed, 100 insertions, 37 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c0cbd970e3b..c9d7a8761e9 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -293,7 +293,8 @@ static void validateCheckConstraint(Relation rel, HeapTuple constrtup); static void validateForeignKeyConstraint(char *conname, Relation rel, Relation pkrel, Oid pkindOid, Oid constraintOid); -static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, +static void createForeignKeyTriggers(Relation rel, Oid refRelOid, + Constraint *fkconstraint, Oid constraintOid, Oid indexOid); static void ATController(Relation rel, List *cmds, bool recurse, LOCKMODE lockmode); static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, @@ -370,8 +371,9 @@ static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, static void ATExecAlterColumnGenericOptions(Relation rel, const char *colName, List *options, LOCKMODE lockmode); static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode); -static void ATPostAlterTypeParse(Oid oldId, char *cmd, - List **wqueue, LOCKMODE lockmode, bool rewrite); +static void ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, + char *cmd, List **wqueue, LOCKMODE lockmode, + bool rewrite); static void TryReuseIndex(Oid oldId, IndexStmt *stmt); static void TryReuseForeignKey(Oid oldId, Constraint *con); static void change_owner_fix_column_acls(Oid relationOid, @@ -5480,7 +5482,8 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, /* The IndexStmt has already been through transformIndexStmt */ - new_index = DefineIndex(stmt, + new_index = DefineIndex(RelationGetRelid(rel), + stmt, InvalidOid, /* no predefined OID */ true, /* is_alter_table */ check_rights, @@ -5804,7 +5807,10 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, * table; trying to start with a lesser lock will just create a risk of * deadlock.) */ - pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); + if (OidIsValid(fkconstraint->old_pktable_oid)) + pkrel = heap_open(fkconstraint->old_pktable_oid, AccessExclusiveLock); + else + pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); /* * Validity checks (permission checks wait till we have the column @@ -6143,7 +6149,8 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * Create the triggers that will enforce the constraint. */ - createForeignKeyTriggers(rel, fkconstraint, constrOid, indexOid); + createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, + constrOid, indexOid); /* * Tell Phase 3 to check that the constraint is satisfied by existing @@ -6818,7 +6825,7 @@ validateForeignKeyConstraint(char *conname, } static void -CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint, +CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint, Oid constraintOid, Oid indexOid, bool on_insert) { CreateTrigStmt *fk_trigger; @@ -6834,7 +6841,7 @@ CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = "RI_ConstraintTrigger_c"; - fk_trigger->relation = myRel; + fk_trigger->relation = NULL; fk_trigger->row = true; fk_trigger->timing = TRIGGER_TYPE_AFTER; @@ -6855,10 +6862,11 @@ CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint, fk_trigger->isconstraint = true; fk_trigger->deferrable = fkconstraint->deferrable; fk_trigger->initdeferred = fkconstraint->initdeferred; - fk_trigger->constrrel = fkconstraint->pktable; + fk_trigger->constrrel = NULL; fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true); + (void) CreateTrigger(fk_trigger, NULL, myRelOid, refRelOid, constraintOid, + indexOid, true); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -6868,18 +6876,13 @@ CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint, * Create the triggers that implement an FK constraint. */ static void -createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, +createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint, Oid constraintOid, Oid indexOid) { - RangeVar *myRel; + Oid myRelOid; CreateTrigStmt *fk_trigger; - /* - * Reconstruct a RangeVar for my relation (not passed in, unfortunately). - */ - myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), - pstrdup(RelationGetRelationName(rel)), - -1); + myRelOid = RelationGetRelid(rel); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -6890,14 +6893,14 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = "RI_ConstraintTrigger_a"; - fk_trigger->relation = fkconstraint->pktable; + fk_trigger->relation = NULL; fk_trigger->row = true; fk_trigger->timing = TRIGGER_TYPE_AFTER; fk_trigger->events = TRIGGER_TYPE_DELETE; fk_trigger->columns = NIL; fk_trigger->whenClause = NULL; fk_trigger->isconstraint = true; - fk_trigger->constrrel = myRel; + fk_trigger->constrrel = NULL; switch (fkconstraint->fk_del_action) { case FKCONSTR_ACTION_NOACTION: @@ -6932,7 +6935,8 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true); + (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid, + indexOid, true); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -6943,14 +6947,14 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = "RI_ConstraintTrigger_a"; - fk_trigger->relation = fkconstraint->pktable; + fk_trigger->relation = NULL; fk_trigger->row = true; fk_trigger->timing = TRIGGER_TYPE_AFTER; fk_trigger->events = TRIGGER_TYPE_UPDATE; fk_trigger->columns = NIL; fk_trigger->whenClause = NULL; fk_trigger->isconstraint = true; - fk_trigger->constrrel = myRel; + fk_trigger->constrrel = NULL; switch (fkconstraint->fk_upd_action) { case FKCONSTR_ACTION_NOACTION: @@ -6985,7 +6989,8 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true); + (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid, + indexOid, true); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -6994,8 +6999,10 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, * Build and execute CREATE CONSTRAINT TRIGGER statements for the CHECK * action for both INSERTs and UPDATEs on the referencing table. */ - CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, indexOid, true); - CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, indexOid, false); + CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid, + indexOid, true); + CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid, + indexOid, false); } /* @@ -7899,15 +7906,36 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) * lock on the table the constraint is attached to, and we need to get * that before dropping. It's safe because the parser won't actually look * at the catalogs to detect the existing entry. + * + * We can't rely on the output of deparsing to tell us which relation + * to operate on, because concurrent activity might have made the name + * resolve differently. Instead, we've got to use the OID of the + * constraint or index we're processing to figure out which relation + * to operate on. */ forboth(oid_item, tab->changedConstraintOids, def_item, tab->changedConstraintDefs) - ATPostAlterTypeParse(lfirst_oid(oid_item), (char *) lfirst(def_item), + { + Oid oldId = lfirst_oid(oid_item); + Oid relid; + Oid confrelid; + + get_constraint_relation_oids(oldId, &relid, &confrelid); + ATPostAlterTypeParse(oldId, relid, confrelid, + (char *) lfirst(def_item), wqueue, lockmode, tab->rewrite); + } forboth(oid_item, tab->changedIndexOids, def_item, tab->changedIndexDefs) - ATPostAlterTypeParse(lfirst_oid(oid_item), (char *) lfirst(def_item), + { + Oid oldId = lfirst_oid(oid_item); + Oid relid; + + relid = IndexGetRelation(oldId, false); + ATPostAlterTypeParse(oldId, relid, InvalidOid, + (char *) lfirst(def_item), wqueue, lockmode, tab->rewrite); + } /* * Now we can drop the existing constraints and indexes --- constraints @@ -7940,12 +7968,13 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) } static void -ATPostAlterTypeParse(Oid oldId, char *cmd, +ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd, List **wqueue, LOCKMODE lockmode, bool rewrite) { List *raw_parsetree_list; List *querytree_list; ListCell *list_item; + Relation rel; /* * We expect that we will get only ALTER TABLE and CREATE INDEX @@ -7961,16 +7990,21 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, if (IsA(stmt, IndexStmt)) querytree_list = lappend(querytree_list, - transformIndexStmt((IndexStmt *) stmt, + transformIndexStmt(oldRelId, + (IndexStmt *) stmt, cmd)); else if (IsA(stmt, AlterTableStmt)) querytree_list = list_concat(querytree_list, - transformAlterTableStmt((AlterTableStmt *) stmt, + transformAlterTableStmt(oldRelId, + (AlterTableStmt *) stmt, cmd)); else querytree_list = lappend(querytree_list, stmt); } + /* Caller should already have acquired whatever lock we need. */ + rel = relation_open(oldRelId, NoLock); + /* * Attach each generated command to the proper place in the work queue. * Note this could result in creation of entirely new work-queue entries. @@ -7982,7 +8016,6 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, foreach(list_item, querytree_list) { Node *stm = (Node *) lfirst(list_item); - Relation rel; AlteredTableInfo *tab; switch (nodeTag(stm)) @@ -7995,14 +8028,12 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, if (!rewrite) TryReuseIndex(oldId, stmt); - rel = relation_openrv(stmt->relation, lockmode); tab = ATGetQueueEntry(wqueue, rel); newcmd = makeNode(AlterTableCmd); newcmd->subtype = AT_ReAddIndex; newcmd->def = (Node *) stmt; tab->subcmds[AT_PASS_OLD_INDEX] = lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd); - relation_close(rel, NoLock); break; } case T_AlterTableStmt: @@ -8010,7 +8041,6 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, AlterTableStmt *stmt = (AlterTableStmt *) stm; ListCell *lcmd; - rel = relation_openrv(stmt->relation, lockmode); tab = ATGetQueueEntry(wqueue, rel); foreach(lcmd, stmt->cmds) { @@ -8031,6 +8061,7 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, case AT_AddConstraint: Assert(IsA(cmd->def, Constraint)); con = (Constraint *) cmd->def; + con->old_pktable_oid = refRelId; /* rewriting neither side of a FK */ if (con->contype == CONSTR_FOREIGN && !rewrite && !tab->rewrite) @@ -8044,7 +8075,6 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, (int) cmd->subtype); } } - relation_close(rel, NoLock); break; } default: @@ -8052,6 +8082,8 @@ ATPostAlterTypeParse(Oid oldId, char *cmd, (int) nodeTag(stm)); } } + + relation_close(rel, NoLock); } /* @@ -8062,7 +8094,6 @@ static void TryReuseIndex(Oid oldId, IndexStmt *stmt) { if (CheckIndexCompatible(oldId, - stmt->relation, stmt->accessMethod, stmt->indexParams, stmt->excludeOpNames)) @@ -10424,6 +10455,38 @@ RangeVarCallbackOwnsTable(const RangeVar *relation, } /* + * Callback to RangeVarGetRelidExtended(), similar to + * RangeVarCallbackOwnsTable() but without checks on the type of the relation. + */ +void +RangeVarCallbackOwnsRelation(const RangeVar *relation, + Oid relId, Oid oldRelId, void *arg) +{ + HeapTuple tuple; + + /* Nothing to do if the relation was not found. */ + if (!OidIsValid(relId)) + return; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId)); + if (!HeapTupleIsValid(tuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for relation %u", relId); + + if (!pg_class_ownercheck(relId, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + relation->relname); + + if (!allowSystemTableMods && + IsSystemClass((Form_pg_class) GETSTRUCT(tuple))) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + relation->relname))); + + ReleaseSysCache(tuple); +} + +/* * Common RangeVarGetRelid callback for rename, set schema, and alter table * processing. */ |