diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 141 |
1 files changed, 85 insertions, 56 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index c4622c04125..1c3fe6a9630 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -235,6 +235,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = { {'\0', 0, NULL, NULL, NULL, NULL} }; +struct DropRelationCallbackState +{ + char relkind; + Oid heapOid; +}; + /* Alter table target-type flags for ATSimplePermissions */ #define ATT_TABLE 0x0001 #define ATT_VIEW 0x0002 @@ -375,6 +381,9 @@ static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, ForkNumber forkNum, char relpersistence); static const char *storage_name(char c); +static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, + Oid oldRelOid, void *arg); + /* ---------------------------------------------------------------- * DefineRelation @@ -751,9 +760,8 @@ RemoveRelations(DropStmt *drop) { RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell)); Oid relOid; - HeapTuple tuple; - Form_pg_class classform; ObjectAddress obj; + struct DropRelationCallbackState state; /* * These next few steps are a great deal like relation_openrv, but we @@ -767,15 +775,13 @@ RemoveRelations(DropStmt *drop) */ AcceptInvalidationMessages(); - /* - * Look up the appropriate relation using namespace search. - * - * XXX: Doing this without a lock is unsafe in the presence of - * concurrent DDL, but acquiring a lock here might violate the rule - * that a table must be locked before its corresponding index. - * So, for now, we ignore the hazard. - */ - relOid = RangeVarGetRelid(rel, NoLock, true, false); + /* Look up the appropriate relation using namespace search. */ + state.relkind = relkind; + state.heapOid = InvalidOid; + relOid = RangeVarGetRelidExtended(rel, AccessExclusiveLock, true, + false, + RangeVarCallbackForDropRelation, + (void *) &state); /* Not there? */ if (!OidIsValid(relOid)) @@ -784,57 +790,12 @@ RemoveRelations(DropStmt *drop) continue; } - /* - * In DROP INDEX, attempt to acquire lock on the parent table before - * locking the index. index_drop() will need this anyway, and since - * regular queries lock tables before their indexes, we risk deadlock - * if we do it the other way around. No error if we don't find a - * pg_index entry, though --- that most likely means it isn't an - * index, and we'll fail below. - */ - if (relkind == RELKIND_INDEX) - { - tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relOid)); - if (HeapTupleIsValid(tuple)) - { - Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple); - - LockRelationOid(index->indrelid, AccessExclusiveLock); - ReleaseSysCache(tuple); - } - } - - /* Get the lock before trying to fetch the syscache entry */ - LockRelationOid(relOid, AccessExclusiveLock); - - tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for relation %u", relOid); - classform = (Form_pg_class) GETSTRUCT(tuple); - - if (classform->relkind != relkind) - DropErrorMsgWrongType(rel->relname, classform->relkind, relkind); - - /* Allow DROP to either table owner or schema owner */ - if (!pg_class_ownercheck(relOid, GetUserId()) && - !pg_namespace_ownercheck(classform->relnamespace, GetUserId())) - aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, - rel->relname); - - if (!allowSystemTableMods && IsSystemClass(classform)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied: \"%s\" is a system catalog", - rel->relname))); - /* OK, we're ready to delete this one */ obj.classId = RelationRelationId; obj.objectId = relOid; obj.objectSubId = 0; add_exact_object_address(&obj, objects); - - ReleaseSysCache(tuple); } performMultipleDeletions(objects, drop->behavior); @@ -843,6 +804,74 @@ RemoveRelations(DropStmt *drop) } /* + * Before acquiring a table lock, check whether we have sufficient rights. + * In the case of DROP INDEX, also try to lock the table before the index. + */ +static void +RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid, + void *arg) +{ + HeapTuple tuple; + struct DropRelationCallbackState *state; + char relkind; + Form_pg_class classform; + + state = (struct DropRelationCallbackState *) arg; + relkind = state->relkind; + + /* + * If we previously locked some other index's heap, and the name we're + * looking up no longer refers to that relation, release the now-useless + * lock. + */ + if (relOid != oldRelOid && OidIsValid(state->heapOid)) + { + UnlockRelationOid(state->heapOid, AccessExclusiveLock); + state->heapOid = InvalidOid; + } + + /* Didn't find a relation, so need for locking or permission checks. */ + if (!OidIsValid(relOid)) + return; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid)); + if (!HeapTupleIsValid(tuple)) + return; /* concurrently dropped, so nothing to do */ + classform = (Form_pg_class) GETSTRUCT(tuple); + + if (classform->relkind != relkind) + DropErrorMsgWrongType(rel->relname, classform->relkind, relkind); + + /* Allow DROP to either table owner or schema owner */ + if (!pg_class_ownercheck(relOid, GetUserId()) && + !pg_namespace_ownercheck(classform->relnamespace, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + rel->relname); + + if (!allowSystemTableMods && IsSystemClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied: \"%s\" is a system catalog", + rel->relname))); + + ReleaseSysCache(tuple); + + /* + * In DROP INDEX, attempt to acquire lock on the parent table before + * locking the index. index_drop() will need this anyway, and since + * regular queries lock tables before their indexes, we risk deadlock + * if we do it the other way around. No error if we don't find a + * pg_index entry, though --- the relation may have been droppd. + */ + if (relkind == RELKIND_INDEX && relOid != oldRelOid) + { + state->heapOid = IndexGetRelation(relOid, true); + if (OidIsValid(state->heapOid)) + LockRelationOid(state->heapOid, AccessExclusiveLock); + } +} + +/* * ExecuteTruncate * Executes a TRUNCATE command. * |