diff options
-rw-r--r-- | src/backend/bootstrap/bootparse.y | 15 | ||||
-rw-r--r-- | src/backend/catalog/index.c | 3 | ||||
-rw-r--r-- | src/backend/catalog/pg_constraint.c | 19 | ||||
-rw-r--r-- | src/backend/commands/alter.c | 4 | ||||
-rw-r--r-- | src/backend/commands/indexcmds.c | 23 | ||||
-rw-r--r-- | src/backend/commands/tablecmds.c | 137 | ||||
-rw-r--r-- | src/backend/commands/trigger.c | 26 | ||||
-rw-r--r-- | src/backend/nodes/copyfuncs.c | 1 | ||||
-rw-r--r-- | src/backend/nodes/equalfuncs.c | 1 | ||||
-rw-r--r-- | src/backend/nodes/outfuncs.c | 1 | ||||
-rw-r--r-- | src/backend/parser/parse_utilcmd.c | 47 | ||||
-rw-r--r-- | src/backend/tcop/utility.c | 50 | ||||
-rw-r--r-- | src/include/catalog/index.h | 2 | ||||
-rw-r--r-- | src/include/catalog/pg_constraint.h | 1 | ||||
-rw-r--r-- | src/include/commands/defrem.h | 2 | ||||
-rw-r--r-- | src/include/commands/tablecmds.h | 2 | ||||
-rw-r--r-- | src/include/commands/trigger.h | 4 | ||||
-rw-r--r-- | src/include/nodes/parsenodes.h | 1 | ||||
-rw-r--r-- | src/include/parser/parse_utilcmd.h | 5 | ||||
-rw-r--r-- | src/include/tcop/utility.h | 2 |
20 files changed, 232 insertions, 114 deletions
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index 1510614d5ad..953b7a49822 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -27,6 +27,7 @@ #include "bootstrap/bootstrap.h" #include "catalog/catalog.h" #include "catalog/heap.h" +#include "catalog/namespace.h" #include "catalog/pg_am.h" #include "catalog/pg_attribute.h" #include "catalog/pg_authid.h" @@ -257,9 +258,14 @@ Boot_InsertStmt: Boot_DeclareIndexStmt: XDECLARE INDEX boot_ident oidspec ON boot_ident USING boot_ident LPAREN boot_index_params RPAREN { + Oid relationId; + do_start(); - DefineIndex(makeRangeVar(NULL, LexIDStr($6), -1), + relationId = RangeVarGetRelid(makeRangeVar(NULL, LexIDStr($6), -1), + false); + + DefineIndex(relationId, LexIDStr($3), $4, LexIDStr($8), @@ -275,9 +281,14 @@ Boot_DeclareIndexStmt: Boot_DeclareUniqueIndexStmt: XDECLARE UNIQUE INDEX boot_ident oidspec ON boot_ident USING boot_ident LPAREN boot_index_params RPAREN { + Oid relationId; + do_start(); - DefineIndex(makeRangeVar(NULL, LexIDStr($7), -1), + relationId = RangeVarGetRelid(makeRangeVar(NULL, LexIDStr($7), -1), + false); + + DefineIndex(relationId, LexIDStr($4), $5, LexIDStr($9), diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 108d859c0dc..4d7918d5a03 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -96,7 +96,6 @@ static void validate_index_heapscan(Relation heapRelation, IndexInfo *indexInfo, Snapshot snapshot, v_i_state *state); -static Oid IndexGetRelation(Oid indexId); /* @@ -2319,7 +2318,7 @@ index_set_state_flags(Oid indexId, IndexStateFlagsAction action) * IndexGetRelation: given an index's relation OID, get the OID of the * relation it is an index on. Uses the system cache. */ -static Oid +Oid IndexGetRelation(Oid indexId) { HeapTuple tuple; diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index 9e13872fa83..73b9519b31a 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -699,3 +699,22 @@ AlterConstraintNamespaces(Oid ownerId, Oid oldNspId, heap_close(conRel, RowExclusiveLock); } + +/* + * get_constraint_relation_oids + * Find the IDs of the relations to which a constraint refers. + */ +void +get_constraint_relation_oids(Oid constraint_oid, Oid *conrelid, Oid *confrelid) +{ + HeapTuple tup; + Form_pg_constraint con; + + tup = SearchSysCache(CONSTROID, ObjectIdGetDatum(constraint_oid), 0, 0, 0); + if (!HeapTupleIsValid(tup)) /* should not happen */ + elog(ERROR, "cache lookup failed for constraint %u", constraint_oid); + con = (Form_pg_constraint) GETSTRUCT(tup); + *conrelid = con->conrelid; + *confrelid = con->confrelid; + ReleaseSysCache(tup); +} diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index b91f2205d1c..ccd5c4a7029 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -91,9 +91,8 @@ ExecRenameStmt(RenameStmt *stmt) { Oid relid; - CheckRelationOwnership(stmt->relation, true); - relid = RangeVarGetRelid(stmt->relation, false); + CheckRelationOwnership(relid, true); switch (stmt->renameType) { @@ -186,7 +185,6 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt) case OBJECT_SEQUENCE: case OBJECT_TABLE: case OBJECT_VIEW: - CheckRelationOwnership(stmt->relation, true); AlterTableNamespace(stmt->relation, stmt->newschema, stmt->objectType); break; diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 7bd0ccbdc27..b781d40fcec 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -72,7 +72,8 @@ static bool relationHasPrimaryKey(Relation rel); * DefineIndex * Creates a new index. * - * 'heapRelation': the relation the index will apply to. + * 'relationId': the OID of the heap relation on which the index is to be + * created * 'indexRelationName': the name for the new index, or NULL to indicate * that a nonconflicting default name should be picked. * 'indexRelationId': normally InvalidOid, but during bootstrap can be @@ -97,7 +98,7 @@ static bool relationHasPrimaryKey(Relation rel); * 'concurrent': avoid blocking writers to the table while building. */ void -DefineIndex(RangeVar *heapRelation, +DefineIndex(Oid relationId, char *indexRelationName, Oid indexRelationId, char *accessMethodName, @@ -116,7 +117,6 @@ DefineIndex(RangeVar *heapRelation, { Oid *classObjectId; Oid accessMethodId; - Oid relationId; Oid namespaceId; Oid tablespaceId; Relation rel; @@ -135,6 +135,7 @@ DefineIndex(RangeVar *heapRelation, int n_old_snapshots; LockRelId heaprelid; LOCKTAG heaplocktag; + LOCKMODE lockmode; Snapshot snapshot; int i; @@ -153,14 +154,18 @@ DefineIndex(RangeVar *heapRelation, INDEX_MAX_KEYS))); /* - * Open heap relation, acquire a suitable lock on it, remember its OID - * * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE * (but not VACUUM). + * + * NB: Caller is responsible for making sure that relationId refers + * to the relation on which the index should be built; except in bootstrap + * mode, this will typically require the caller to have already locked + * the relation. To avoid lock upgrade hazards, that lock should be at + * least as strong as the one we take here. */ - rel = heap_openrv(heapRelation, - (concurrent ? ShareUpdateExclusiveLock : ShareLock)); + lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock; + rel = heap_open(relationId, lockmode); relationId = RelationGetRelid(rel); namespaceId = RelationGetNamespace(rel); @@ -171,7 +176,7 @@ DefineIndex(RangeVar *heapRelation, ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a table", - heapRelation->relname))); + RelationGetRelationName(rel)))); /* * Don't try to CREATE INDEX on temp tables of other backends. @@ -537,7 +542,7 @@ DefineIndex(RangeVar *heapRelation, */ /* Open and lock the parent heap relation */ - rel = heap_openrv(heapRelation, ShareUpdateExclusiveLock); + rel = heap_open(relationId, ShareUpdateExclusiveLock); /* And the target index relation */ indexRelation = index_open(indexRelationId, RowExclusiveLock); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 541c75ab962..1edf6c75533 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -66,6 +66,7 @@ #include "storage/bufmgr.h" #include "storage/lmgr.h" #include "storage/smgr.h" +#include "tcop/utility.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -249,8 +250,8 @@ static Oid transformFkeyCheckAttrs(Relation pkrel, static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts); static void validateForeignKeyConstraint(FkConstraint *fkconstraint, Relation rel, Relation pkrel, Oid constraintOid); -static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, - Oid constraintOid); +static void createForeignKeyTriggers(Relation rel, Oid refRelOid, + FkConstraint *fkconstraint, Oid constraintOid); static void ATController(Relation rel, List *cmds, bool recurse); static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, bool recurse, bool recursing); @@ -308,7 +309,8 @@ static void ATPrepAlterColumnType(List **wqueue, static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, const char *colName, TypeName *typename); static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab); -static void ATPostAlterTypeParse(char *cmd, List **wqueue); +static void ATPostAlterTypeParse(Oid oldRelId, Oid refRelId, char *cmd, + List **wqueue); static void change_owner_fix_column_acls(Oid relationOid, Oid oldOwnerId, Oid newOwnerId); static void change_owner_recurse_to_sequences(Oid relationOid, @@ -2159,9 +2161,12 @@ CheckTableNotInUse(Relation rel, const char *stmt) * the whole operation; we don't have to do anything special to clean up. */ void -AlterTable(AlterTableStmt *stmt) +AlterTable(Oid relid, AlterTableStmt *stmt) { - Relation rel = relation_openrv(stmt->relation, AccessExclusiveLock); + Relation rel; + + /* Caller is required to provide an adequate lock. */ + rel = relation_open(relid, NoLock); CheckTableNotInUse(rel, "ALTER TABLE"); @@ -4291,7 +4296,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, /* The IndexStmt has already been through transformIndexStmt */ - DefineIndex(stmt->relation, /* relation */ + DefineIndex(RelationGetRelid(rel), /* relation */ stmt->idxname, /* index name */ InvalidOid, /* no predefined OID */ stmt->accessMethod, /* am name */ @@ -4540,7 +4545,10 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, * table; trying to start with a lesser lock will just create a risk of * deadlock.) */ - pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); + if (OidIsValid(fkconstraint->old_pktable_oid)) + pkrel = heap_open(fkconstraint->old_pktable_oid, AccessExclusiveLock); + else + pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock); /* * Validity checks (permission checks wait till we have the column @@ -4776,7 +4784,8 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, /* * Create the triggers that will enforce the constraint. */ - createForeignKeyTriggers(rel, fkconstraint, constrOid); + createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint, + constrOid); /* * Tell Phase 3 to check that the constraint is satisfied by existing rows @@ -5148,14 +5157,14 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint, } static void -CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, +CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, FkConstraint *fkconstraint, Oid constraintOid, bool on_insert) { CreateTrigStmt *fk_trigger; fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = fkconstraint->constr_name; - fk_trigger->relation = myRel; + fk_trigger->relation = NULL; fk_trigger->before = false; fk_trigger->row = true; @@ -5174,10 +5183,11 @@ CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, fk_trigger->isconstraint = true; fk_trigger->deferrable = fkconstraint->deferrable; fk_trigger->initdeferred = fkconstraint->initdeferred; - fk_trigger->constrrel = fkconstraint->pktable; + fk_trigger->constrrel = NULL; fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, constraintOid, false); + (void) CreateTrigger(fk_trigger, myRelOid, refRelOid, + constraintOid, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5187,18 +5197,13 @@ CreateFKCheckTrigger(RangeVar *myRel, FkConstraint *fkconstraint, * Create the triggers that implement an FK constraint. */ static void -createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, - Oid constraintOid) +createForeignKeyTriggers(Relation rel, Oid refRelOid, + FkConstraint *fkconstraint, Oid constraintOid) { - RangeVar *myRel; + Oid myRelOid; CreateTrigStmt *fk_trigger; - /* - * Reconstruct a RangeVar for my relation (not passed in, unfortunately). - */ - myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)), - pstrdup(RelationGetRelationName(rel)), - -1); + myRelOid = RelationGetRelid(rel); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5209,12 +5214,12 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = fkconstraint->constr_name; - fk_trigger->relation = fkconstraint->pktable; + fk_trigger->relation = NULL; fk_trigger->before = false; fk_trigger->row = true; fk_trigger->events = TRIGGER_TYPE_DELETE; fk_trigger->isconstraint = true; - fk_trigger->constrrel = myRel; + fk_trigger->constrrel = NULL; switch (fkconstraint->fk_del_action) { case FKCONSTR_ACTION_NOACTION: @@ -5249,7 +5254,8 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, constraintOid, false); + (void) CreateTrigger(fk_trigger, refRelOid, myRelOid, + constraintOid, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5260,12 +5266,12 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, */ fk_trigger = makeNode(CreateTrigStmt); fk_trigger->trigname = fkconstraint->constr_name; - fk_trigger->relation = fkconstraint->pktable; + fk_trigger->relation = NULL; fk_trigger->before = false; fk_trigger->row = true; fk_trigger->events = TRIGGER_TYPE_UPDATE; fk_trigger->isconstraint = true; - fk_trigger->constrrel = myRel; + fk_trigger->constrrel = NULL; switch (fkconstraint->fk_upd_action) { case FKCONSTR_ACTION_NOACTION: @@ -5300,7 +5306,8 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, } fk_trigger->args = NIL; - (void) CreateTrigger(fk_trigger, constraintOid, false); + (void) CreateTrigger(fk_trigger, refRelOid, myRelOid, + constraintOid, false); /* Make changes-so-far visible */ CommandCounterIncrement(); @@ -5320,8 +5327,10 @@ createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint, * and the use of self-referential FKs is rare enough, that we live with * it for now. There will be a real fix in PG 9.2. */ - CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, true); - CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, false); + CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, + constraintOid, true); + CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, + constraintOid, false); } /* @@ -6000,7 +6009,8 @@ static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) { ObjectAddress obj; - ListCell *l; + ListCell *def_item; + ListCell *oid_item; /* * Re-parse the index and constraint definitions, and attach them to the @@ -6009,11 +6019,36 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) * lock on the table the constraint is attached to, and we need to get * that before dropping. It's safe because the parser won't actually look * at the catalogs to detect the existing entry. - */ - foreach(l, tab->changedIndexDefs) - ATPostAlterTypeParse((char *) lfirst(l), wqueue); - foreach(l, tab->changedConstraintDefs) - ATPostAlterTypeParse((char *) lfirst(l), wqueue); + * + * We can't rely on the output of deparsing to tell us which relation + * to operate on, because concurrent activity might have made the name + * resolve differently. Instead, we've got to use the OID of the + * constraint or index we're processing to figure out which relation + * to operate on. + */ + forboth(oid_item, tab->changedConstraintOids, + def_item, tab->changedConstraintDefs) + { + Oid oldId = lfirst_oid(oid_item); + Oid relid; + Oid confrelid; + + get_constraint_relation_oids(oldId, &relid, &confrelid); + ATPostAlterTypeParse(relid, confrelid, + (char *) lfirst(def_item), + wqueue); + } + forboth(oid_item, tab->changedIndexOids, + def_item, tab->changedIndexDefs) + { + Oid oldId = lfirst_oid(oid_item); + Oid relid; + + relid = IndexGetRelation(oldId); + ATPostAlterTypeParse(relid, InvalidOid, + (char *) lfirst(def_item), + wqueue); + } /* * Now we can drop the existing constraints and indexes --- constraints @@ -6023,18 +6058,18 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) * should be okay to use DROP_RESTRICT here, since nothing else should be * depending on these objects. */ - foreach(l, tab->changedConstraintOids) + foreach(oid_item, tab->changedConstraintOids) { obj.classId = ConstraintRelationId; - obj.objectId = lfirst_oid(l); + obj.objectId = lfirst_oid(oid_item); obj.objectSubId = 0; performDeletion(&obj, DROP_RESTRICT); } - foreach(l, tab->changedIndexOids) + foreach(oid_item, tab->changedIndexOids) { obj.classId = RelationRelationId; - obj.objectId = lfirst_oid(l); + obj.objectId = lfirst_oid(oid_item); obj.objectSubId = 0; performDeletion(&obj, DROP_RESTRICT); } @@ -6046,11 +6081,12 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) } static void -ATPostAlterTypeParse(char *cmd, List **wqueue) +ATPostAlterTypeParse(Oid oldRelId, Oid refRelId, char *cmd, List **wqueue) { List *raw_parsetree_list; List *querytree_list; ListCell *list_item; + Relation rel; /* * We expect that we will get only ALTER TABLE and CREATE INDEX @@ -6066,16 +6102,21 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) if (IsA(stmt, IndexStmt)) querytree_list = lappend(querytree_list, - transformIndexStmt((IndexStmt *) stmt, + transformIndexStmt(oldRelId, + (IndexStmt *) stmt, cmd)); else if (IsA(stmt, AlterTableStmt)) querytree_list = list_concat(querytree_list, - transformAlterTableStmt((AlterTableStmt *) stmt, + transformAlterTableStmt(oldRelId, + (AlterTableStmt *) stmt, cmd)); else querytree_list = lappend(querytree_list, stmt); } + /* Caller should already have acquired whatever lock we need. */ + rel = relation_open(oldRelId, NoLock); + /* * Attach each generated command to the proper place in the work queue. * Note this could result in creation of entirely new work-queue entries. @@ -6087,7 +6128,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) foreach(list_item, querytree_list) { Node *stm = (Node *) lfirst(list_item); - Relation rel; AlteredTableInfo *tab; switch (nodeTag(stm)) @@ -6097,14 +6137,12 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) IndexStmt *stmt = (IndexStmt *) stm; AlterTableCmd *newcmd; - rel = relation_openrv(stmt->relation, AccessExclusiveLock); tab = ATGetQueueEntry(wqueue, rel); newcmd = makeNode(AlterTableCmd); newcmd->subtype = AT_ReAddIndex; newcmd->def = (Node *) stmt; tab->subcmds[AT_PASS_OLD_INDEX] = lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd); - relation_close(rel, NoLock); break; } case T_AlterTableStmt: @@ -6112,7 +6150,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) AlterTableStmt *stmt = (AlterTableStmt *) stm; ListCell *lcmd; - rel = relation_openrv(stmt->relation, AccessExclusiveLock); tab = ATGetQueueEntry(wqueue, rel); foreach(lcmd, stmt->cmds) { @@ -6135,7 +6172,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) (int) cmd->subtype); } } - relation_close(rel, NoLock); break; } default: @@ -6143,8 +6179,9 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) (int) nodeTag(stm)); } } -} + relation_close(rel, NoLock); +} /* * ALTER TABLE OWNER @@ -7542,7 +7579,8 @@ ATExecDropInherit(Relation rel, RangeVar *parent) /* * Execute ALTER TABLE SET SCHEMA * - * Note: caller must have checked ownership of the relation already + * WARNING WARNING WARNING: In previous *minor* releases the caller was + * responsible for checking ownership of the relation, but now we do it here. */ void AlterTableNamespace(RangeVar *relation, const char *newschema, @@ -7557,6 +7595,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema, rel = relation_openrv(relation, AccessExclusiveLock); relid = RelationGetRelid(rel); + CheckRelationOwnership(relid, true); oldNspOid = RelationGetNamespace(rel); /* Check relation type against type specified in the ALTER command */ diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 89ff89a6ba8..a042588a1af 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -69,6 +69,13 @@ static void AfterTriggerSaveEvent(ResultRelInfo *relinfo, int event, /* * Create a trigger. Returns the OID of the created trigger. * + * relOid, if nonzero, is the relation on which the trigger should be + * created. If zero, the name provided in the statement will be looked up. + * + * refRelOid, if nonzero, is the relation to which the constraint trigger + * refers. If zero, the constraint relation name provided in the statement + * will be looked up as needed. + * * constraintOid, if nonzero, says that this trigger is being created * internally to implement that constraint. A suitable pg_depend entry will * be made to link the trigger to that constraint. constraintOid is zero when @@ -83,7 +90,8 @@ static void AfterTriggerSaveEvent(ResultRelInfo *relinfo, int event, * but a foreign-key constraint. This is a kluge for backwards compatibility. */ Oid -CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, bool checkPermissions) +CreateTrigger(CreateTrigStmt *stmt, Oid relOid, Oid refRelOid, + Oid constraintOid, bool checkPermissions) { int16 tgtype; int2vector *tgattr; @@ -107,7 +115,10 @@ CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, bool checkPermissions) ObjectAddress myself, referenced; - rel = heap_openrv(stmt->relation, AccessExclusiveLock); + if (OidIsValid(relOid)) + rel = heap_open(relOid, AccessExclusiveLock); + else + rel = heap_openrv(stmt->relation, AccessExclusiveLock); if (rel->rd_rel->relkind != RELKIND_RELATION) ereport(ERROR, @@ -121,8 +132,13 @@ CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, bool checkPermissions) errmsg("permission denied: \"%s\" is a system catalog", RelationGetRelationName(rel)))); - if (stmt->isconstraint && stmt->constrrel != NULL) - constrrelid = RangeVarGetRelid(stmt->constrrel, false); + if (stmt->isconstraint) + { + if (OidIsValid(refRelOid)) + constrrelid = refRelOid; + else if (stmt->constrrel != NULL) + constrrelid = RangeVarGetRelid(stmt->constrrel, false); + } /* permission checks */ if (checkPermissions) @@ -261,7 +277,7 @@ CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, bool checkPermissions) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("trigger \"%s\" for relation \"%s\" already exists", - trigname, stmt->relation->relname))); + trigname, RelationGetRelationName(rel)))); } systable_endscan(tgscan); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index d999a25f865..fdb30f77e95 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -1756,6 +1756,7 @@ _copyFkConstraint(FkConstraint *from) COPY_SCALAR_FIELD(deferrable); COPY_SCALAR_FIELD(initdeferred); COPY_SCALAR_FIELD(skip_validation); + COPY_SCALAR_FIELD(old_pktable_oid); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 5d1a19f7a61..445a59f731d 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2199,6 +2199,7 @@ _equalFkConstraint(FkConstraint *a, FkConstraint *b) COMPARE_SCALAR_FIELD(deferrable); COMPARE_SCALAR_FIELD(initdeferred); COMPARE_SCALAR_FIELD(skip_validation); + COMPARE_SCALAR_FIELD(old_pktable_oid); return true; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index f3a8e76e476..c8c4409fb95 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -2345,6 +2345,7 @@ _outFkConstraint(StringInfo str, FkConstraint *node) WRITE_BOOL_FIELD(deferrable); WRITE_BOOL_FIELD(initdeferred); WRITE_BOOL_FIELD(skip_validation); + WRITE_OID_FIELD(old_pktable_oid); } diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index 0dda5bc27cd..71281f0e2fd 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -1356,14 +1356,18 @@ transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt, * a predicate expression. There are several code paths that create indexes * without bothering to call this, because they know they don't have any * such expressions to deal with. + * + * To avoid race conditions, it's important that this function rely only on + * the passed-in relid (and not on stmt->relation) to determine the target + * relation. */ IndexStmt * -transformIndexStmt(IndexStmt *stmt, const char *queryString) +transformIndexStmt(Oid relid, IndexStmt *stmt, const char *queryString) { - Relation rel; ParseState *pstate; RangeTblEntry *rte; ListCell *l; + Relation rel; /* * We must not scribble on the passed-in IndexStmt, so copy it. (This is @@ -1371,25 +1375,17 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString) */ stmt = (IndexStmt *) copyObject(stmt); - /* - * Open the parent table with appropriate locking. We must do this - * because addRangeTableEntry() would acquire only AccessShareLock, - * leaving DefineIndex() needing to do a lock upgrade with consequent risk - * of deadlock. Make sure this stays in sync with the type of lock - * DefineIndex() wants. - */ - rel = heap_openrv(stmt->relation, - (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock)); - /* Set up pstate */ pstate = make_parsestate(NULL); pstate->p_sourcetext = queryString; /* * Put the parent table into the rtable so that the expressions can refer - * to its fields without qualification. + * to its fields without qualification. Caller is responsible for locking + * relation, but we still need to open it. */ - rte = addRangeTableEntry(pstate, stmt->relation, NULL, false, true); + rel = relation_open(relid, NoLock); + rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, true); /* no to join list, yes to namespaces */ addRTEtoQuery(pstate, rte, false, true, true); @@ -1431,7 +1427,7 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString) free_parsestate(pstate); - /* Close relation, but keep the lock */ + /* Close relation */ heap_close(rel, NoLock); return stmt; @@ -1722,9 +1718,14 @@ transformRuleStmt(RuleStmt *stmt, const char *queryString, * Returns a List of utility commands to be done in sequence. One of these * will be the transformed AlterTableStmt, but there may be additional actions * to be done before and after the actual AlterTable() call. + * + * To avoid race conditions, it's important that this function rely only on + * the passed-in relid (and not on stmt->relation) to determine the target + * relation. */ List * -transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) +transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, + const char *queryString) { Relation rel; ParseState *pstate; @@ -1743,14 +1744,8 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) */ stmt = (AlterTableStmt *) copyObject(stmt); - /* - * Acquire exclusive lock on the target relation, which will be held until - * end of transaction. This ensures any decisions we make here based on - * the state of the relation will still be good at execution. We must get - * exclusive lock now because execution will; taking a lower grade lock - * now and trying to upgrade later risks deadlock. - */ - rel = relation_openrv(stmt->relation, AccessExclusiveLock); + /* Caller is responsible for locking the relation */ + rel = relation_open(relid, NoLock); /* Set up pstate */ pstate = make_parsestate(NULL); @@ -1866,7 +1861,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) Assert(IsA(idxstmt, IndexStmt)); newcmd = makeNode(AlterTableCmd); newcmd->subtype = AT_AddIndex; - newcmd->def = (Node *) transformIndexStmt((IndexStmt *) idxstmt, + newcmd->def = (Node *) transformIndexStmt(relid, (IndexStmt *) idxstmt, queryString); newcmds = lappend(newcmds, newcmd); } @@ -1888,7 +1883,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) newcmds = lappend(newcmds, newcmd); } - /* Close rel but keep lock */ + /* Close rel */ relation_close(rel, NoLock); /* diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index bd91162cec3..ef1792a30ac 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -51,11 +51,13 @@ #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteRemove.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/acl.h" #include "utils/guc.h" #include "utils/syscache.h" +#include "utils/lsyscache.h" /* @@ -65,12 +67,10 @@ * except when allowSystemTableMods is true. */ void -CheckRelationOwnership(RangeVar *rel, bool noCatalogs) +CheckRelationOwnership(Oid relOid, bool noCatalogs) { - Oid relOid; HeapTuple tuple; - relOid = RangeVarGetRelid(rel, false); tuple = SearchSysCache(RELOID, ObjectIdGetDatum(relOid), 0, 0, 0); @@ -79,7 +79,7 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs) if (!pg_class_ownercheck(relOid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - rel->relname); + get_rel_name(relOid)); if (noCatalogs) { @@ -88,7 +88,7 @@ CheckRelationOwnership(RangeVar *rel, bool noCatalogs) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied: \"%s\" is a system catalog", - rel->relname))); + get_rel_name(relOid)))); } ReleaseSysCache(tuple); @@ -638,9 +638,21 @@ ProcessUtility(Node *parsetree, { List *stmts; ListCell *l; + AlterTableStmt *atstmt = (AlterTableStmt *) parsetree; + Oid relid; + + /* + * Look up the relation OID just once, right here at the + * beginning, so that we don't end up repeating the name + * lookup later and latching onto a different relation + * partway through. + */ + relid = RangeVarGetRelid(atstmt->relation, false); + LockRelationOid(relid, AccessExclusiveLock); /* Run parse analysis ... */ - stmts = transformAlterTableStmt((AlterTableStmt *) parsetree, + stmts = transformAlterTableStmt(relid, + atstmt, queryString); /* ... and do it */ @@ -651,7 +663,7 @@ ProcessUtility(Node *parsetree, if (IsA(stmt, AlterTableStmt)) { /* Do the table alteration proper */ - AlterTable((AlterTableStmt *) stmt); + AlterTable(relid, (AlterTableStmt *) stmt); } else { @@ -795,18 +807,33 @@ ProcessUtility(Node *parsetree, case T_IndexStmt: /* CREATE INDEX */ { IndexStmt *stmt = (IndexStmt *) parsetree; + Oid relid; + LOCKMODE lockmode; if (stmt->concurrent) PreventTransactionChain(isTopLevel, "CREATE INDEX CONCURRENTLY"); - CheckRelationOwnership(stmt->relation, true); + /* + * Look up the relation OID just once, right here at the + * beginning, so that we don't end up repeating the name + * lookup later and latching onto a different relation + * partway through. To avoid lock upgrade hazards, it's + * important that we take the strongest lock that will + * eventually be needed here, so the lockmode calculation + * needs to match what DefineIndex() does. + */ + lockmode = stmt->concurrent ? ShareUpdateExclusiveLock + : ShareLock; + relid = RangeVarGetRelid(stmt->relation, false); + LockRelationOid(relid, lockmode); + CheckRelationOwnership(relid, true); /* Run parse analysis ... */ - stmt = transformIndexStmt(stmt, queryString); + stmt = transformIndexStmt(relid, stmt, queryString); /* ... and do it */ - DefineIndex(stmt->relation, /* relation */ + DefineIndex(relid, /* relation */ stmt->idxname, /* index name */ InvalidOid, /* no predefined OID */ stmt->accessMethod, /* am name */ @@ -954,7 +981,8 @@ ProcessUtility(Node *parsetree, break; case T_CreateTrigStmt: - CreateTrigger((CreateTrigStmt *) parsetree, InvalidOid, true); + CreateTrigger((CreateTrigStmt *) parsetree, InvalidOid, InvalidOid, + InvalidOid, true); break; case T_DropPropertyStmt: diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index a2c3fc04b32..f441352ece1 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -82,4 +82,6 @@ extern void index_set_state_flags(Oid indexId, IndexStateFlagsAction action); extern void reindex_index(Oid indexId); extern bool reindex_relation(Oid relid, bool toast_too); +extern Oid IndexGetRelation(Oid indexId); + #endif /* INDEX_H */ diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h index edfa864dc2b..69979cef21b 100644 --- a/src/include/catalog/pg_constraint.h +++ b/src/include/catalog/pg_constraint.h @@ -215,5 +215,6 @@ extern char *ChooseConstraintName(const char *name1, const char *name2, extern void AlterConstraintNamespaces(Oid ownerId, Oid oldNspId, Oid newNspId, bool isType); +extern void get_constraint_relation_oids(Oid constraint_oid, Oid *conrelid, Oid *confrelid); #endif /* PG_CONSTRAINT_H */ diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h index b326dc2a407..9b7936d1d90 100644 --- a/src/include/commands/defrem.h +++ b/src/include/commands/defrem.h @@ -18,7 +18,7 @@ /* commands/indexcmds.c */ -extern void DefineIndex(RangeVar *heapRelation, +extern void DefineIndex(Oid relationId, char *indexRelationName, Oid indexRelationId, char *accessMethodName, diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index 706947e097a..2d934db2c9f 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -22,7 +22,7 @@ extern Oid DefineRelation(CreateStmt *stmt, char relkind); extern void RemoveRelations(DropStmt *drop); -extern void AlterTable(AlterTableStmt *stmt); +extern void AlterTable(Oid relid, AlterTableStmt *stmt); extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing); diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index d24ba792b00..5c68565b4ff 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -104,8 +104,8 @@ extern PGDLLIMPORT int SessionReplicationRole; #define TRIGGER_FIRES_ON_REPLICA 'R' #define TRIGGER_DISABLED 'D' -extern Oid CreateTrigger(CreateTrigStmt *stmt, Oid constraintOid, - bool checkPermissions); +extern Oid CreateTrigger(CreateTrigStmt *stmt, Oid relOid, Oid refRelOid, + Oid constraintOid, bool checkPermissions); extern void DropTrigger(Oid relid, const char *trigname, DropBehavior behavior, bool missing_ok); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index f23c9c54ae5..9d31bdbb27d 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1430,6 +1430,7 @@ typedef struct FkConstraint bool deferrable; /* DEFERRABLE */ bool initdeferred; /* INITIALLY DEFERRED */ bool skip_validation; /* skip validation of existing rows? */ + Oid old_pktable_oid; /* pg_constraint.confrelid of my former self */ } FkConstraint; diff --git a/src/include/parser/parse_utilcmd.h b/src/include/parser/parse_utilcmd.h index 089c907c0e6..ec9e6d5623c 100644 --- a/src/include/parser/parse_utilcmd.h +++ b/src/include/parser/parse_utilcmd.h @@ -18,9 +18,10 @@ extern List *transformCreateStmt(CreateStmt *stmt, const char *queryString); -extern List *transformAlterTableStmt(AlterTableStmt *stmt, +extern List *transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, const char *queryString); -extern IndexStmt *transformIndexStmt(IndexStmt *stmt, const char *queryString); +extern IndexStmt *transformIndexStmt(Oid relid, IndexStmt *stmt, + const char *queryString); extern void transformRuleStmt(RuleStmt *stmt, const char *queryString, List **actions, Node **whereClause); extern List *transformCreateSchemaStmt(CreateSchemaStmt *stmt); diff --git a/src/include/tcop/utility.h b/src/include/tcop/utility.h index d609b0ac4e9..7a12dd8084b 100644 --- a/src/include/tcop/utility.h +++ b/src/include/tcop/utility.h @@ -31,6 +31,6 @@ extern LogStmtLevel GetCommandLogLevel(Node *parsetree); extern bool CommandIsReadOnly(Node *parsetree); -extern void CheckRelationOwnership(RangeVar *rel, bool noCatalogs); +extern void CheckRelationOwnership(Oid relOid, bool noCatalogs); #endif /* UTILITY_H */ |