aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c141
1 files changed, 85 insertions, 56 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c4622c04125..1c3fe6a9630 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -235,6 +235,12 @@ static const struct dropmsgstrings dropmsgstringarray[] = {
{'\0', 0, NULL, NULL, NULL, NULL}
};
+struct DropRelationCallbackState
+{
+ char relkind;
+ Oid heapOid;
+};
+
/* Alter table target-type flags for ATSimplePermissions */
#define ATT_TABLE 0x0001
#define ATT_VIEW 0x0002
@@ -375,6 +381,9 @@ static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
ForkNumber forkNum, char relpersistence);
static const char *storage_name(char c);
+static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid,
+ Oid oldRelOid, void *arg);
+
/* ----------------------------------------------------------------
* DefineRelation
@@ -751,9 +760,8 @@ RemoveRelations(DropStmt *drop)
{
RangeVar *rel = makeRangeVarFromNameList((List *) lfirst(cell));
Oid relOid;
- HeapTuple tuple;
- Form_pg_class classform;
ObjectAddress obj;
+ struct DropRelationCallbackState state;
/*
* These next few steps are a great deal like relation_openrv, but we
@@ -767,15 +775,13 @@ RemoveRelations(DropStmt *drop)
*/
AcceptInvalidationMessages();
- /*
- * Look up the appropriate relation using namespace search.
- *
- * XXX: Doing this without a lock is unsafe in the presence of
- * concurrent DDL, but acquiring a lock here might violate the rule
- * that a table must be locked before its corresponding index.
- * So, for now, we ignore the hazard.
- */
- relOid = RangeVarGetRelid(rel, NoLock, true, false);
+ /* Look up the appropriate relation using namespace search. */
+ state.relkind = relkind;
+ state.heapOid = InvalidOid;
+ relOid = RangeVarGetRelidExtended(rel, AccessExclusiveLock, true,
+ false,
+ RangeVarCallbackForDropRelation,
+ (void *) &state);
/* Not there? */
if (!OidIsValid(relOid))
@@ -784,57 +790,12 @@ RemoveRelations(DropStmt *drop)
continue;
}
- /*
- * In DROP INDEX, attempt to acquire lock on the parent table before
- * locking the index. index_drop() will need this anyway, and since
- * regular queries lock tables before their indexes, we risk deadlock
- * if we do it the other way around. No error if we don't find a
- * pg_index entry, though --- that most likely means it isn't an
- * index, and we'll fail below.
- */
- if (relkind == RELKIND_INDEX)
- {
- tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relOid));
- if (HeapTupleIsValid(tuple))
- {
- Form_pg_index index = (Form_pg_index) GETSTRUCT(tuple);
-
- LockRelationOid(index->indrelid, AccessExclusiveLock);
- ReleaseSysCache(tuple);
- }
- }
-
- /* Get the lock before trying to fetch the syscache entry */
- LockRelationOid(relOid, AccessExclusiveLock);
-
- tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
- if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for relation %u", relOid);
- classform = (Form_pg_class) GETSTRUCT(tuple);
-
- if (classform->relkind != relkind)
- DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
-
- /* Allow DROP to either table owner or schema owner */
- if (!pg_class_ownercheck(relOid, GetUserId()) &&
- !pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
- aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
- rel->relname);
-
- if (!allowSystemTableMods && IsSystemClass(classform))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("permission denied: \"%s\" is a system catalog",
- rel->relname)));
-
/* OK, we're ready to delete this one */
obj.classId = RelationRelationId;
obj.objectId = relOid;
obj.objectSubId = 0;
add_exact_object_address(&obj, objects);
-
- ReleaseSysCache(tuple);
}
performMultipleDeletions(objects, drop->behavior);
@@ -843,6 +804,74 @@ RemoveRelations(DropStmt *drop)
}
/*
+ * Before acquiring a table lock, check whether we have sufficient rights.
+ * In the case of DROP INDEX, also try to lock the table before the index.
+ */
+static void
+RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid, Oid oldRelOid,
+ void *arg)
+{
+ HeapTuple tuple;
+ struct DropRelationCallbackState *state;
+ char relkind;
+ Form_pg_class classform;
+
+ state = (struct DropRelationCallbackState *) arg;
+ relkind = state->relkind;
+
+ /*
+ * If we previously locked some other index's heap, and the name we're
+ * looking up no longer refers to that relation, release the now-useless
+ * lock.
+ */
+ if (relOid != oldRelOid && OidIsValid(state->heapOid))
+ {
+ UnlockRelationOid(state->heapOid, AccessExclusiveLock);
+ state->heapOid = InvalidOid;
+ }
+
+ /* Didn't find a relation, so need for locking or permission checks. */
+ if (!OidIsValid(relOid))
+ return;
+
+ tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
+ if (!HeapTupleIsValid(tuple))
+ return; /* concurrently dropped, so nothing to do */
+ classform = (Form_pg_class) GETSTRUCT(tuple);
+
+ if (classform->relkind != relkind)
+ DropErrorMsgWrongType(rel->relname, classform->relkind, relkind);
+
+ /* Allow DROP to either table owner or schema owner */
+ if (!pg_class_ownercheck(relOid, GetUserId()) &&
+ !pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ rel->relname);
+
+ if (!allowSystemTableMods && IsSystemClass(classform))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: \"%s\" is a system catalog",
+ rel->relname)));
+
+ ReleaseSysCache(tuple);
+
+ /*
+ * In DROP INDEX, attempt to acquire lock on the parent table before
+ * locking the index. index_drop() will need this anyway, and since
+ * regular queries lock tables before their indexes, we risk deadlock
+ * if we do it the other way around. No error if we don't find a
+ * pg_index entry, though --- the relation may have been droppd.
+ */
+ if (relkind == RELKIND_INDEX && relOid != oldRelOid)
+ {
+ state->heapOid = IndexGetRelation(relOid, true);
+ if (OidIsValid(state->heapOid))
+ LockRelationOid(state->heapOid, AccessExclusiveLock);
+ }
+}
+
+/*
* ExecuteTruncate
* Executes a TRUNCATE command.
*