diff options
Diffstat (limited to 'src/backend/catalog/namespace.c')
-rw-r--r-- | src/backend/catalog/namespace.c | 122 |
1 files changed, 111 insertions, 11 deletions
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index a9a64fe5f21..80d6fc7d0c2 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -480,31 +480,131 @@ RangeVarGetCreationNamespace(const RangeVar *newRelation) /* * RangeVarGetAndCheckCreationNamespace - * As RangeVarGetCreationNamespace, but with a permissions check. + * + * This function returns the OID of the namespace in which a new relation + * with a given name should be created. If the user does not have CREATE + * permission on the target namespace, this function will instead signal + * an ERROR. + * + * If non-NULL, *existing_oid is set to the OID of any existing relation with + * the same name which already exists in that namespace, or to InvalidOid if + * no such relation exists. + * + * If lockmode != NoLock, the specified lock mode is acquire on the existing + * relation, if any, provided that the current user owns the target relation. + * However, if lockmode != NoLock and the user does not own the target + * relation, we throw an ERROR, as we must not try to lock relations the + * user does not have permissions on. + * + * As a side effect, this function acquires AccessShareLock on the target + * namespace. Without this, the namespace could be dropped before our + * transaction commits, leaving behind relations with relnamespace pointing + * to a no-longer-exstant namespace. + * + * As a further side-effect, if the select namespace is a temporary namespace, + * we mark the RangeVar as RELPERSISTENCE_TEMP. */ Oid -RangeVarGetAndCheckCreationNamespace(const RangeVar *newRelation) +RangeVarGetAndCheckCreationNamespace(RangeVar *relation, + LOCKMODE lockmode, + Oid *existing_relation_id) { - Oid namespaceId; + uint64 inval_count; + Oid relid; + Oid oldrelid = InvalidOid; + Oid nspid; + Oid oldnspid = InvalidOid; + bool retry = false; - namespaceId = RangeVarGetCreationNamespace(newRelation); + /* + * We check the catalog name and then ignore it. + */ + if (relation->catalogname) + { + if (strcmp(relation->catalogname, get_database_name(MyDatabaseId)) != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cross-database references are not implemented: \"%s.%s.%s\"", + relation->catalogname, relation->schemaname, + relation->relname))); + } /* - * Check we have permission to create there. Skip check if bootstrapping, - * since permissions machinery may not be working yet. + * As in RangeVarGetRelidExtended(), we guard against concurrent DDL + * operations by tracking whether any invalidation messages are processed + * while we're doing the name lookups and acquiring locks. See comments + * in that function for a more detailed explanation of this logic. */ - if (!IsBootstrapProcessingMode()) + for (;;) { AclResult aclresult; - aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(), - ACL_CREATE); + inval_count = SharedInvalidMessageCounter; + + /* Look up creation namespace and check for existing relation. */ + nspid = RangeVarGetCreationNamespace(relation); + Assert(OidIsValid(nspid)); + if (existing_relation_id != NULL) + relid = get_relname_relid(relation->relname, nspid); + else + relid = InvalidOid; + + /* + * In bootstrap processing mode, we don't bother with permissions + * or locking. Permissions might not be working yet, and locking is + * unnecessary. + */ + if (IsBootstrapProcessingMode()) + break; + + /* Check namespace permissions. */ + aclresult = pg_namespace_aclcheck(nspid, GetUserId(), ACL_CREATE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_NAMESPACE, - get_namespace_name(namespaceId)); + get_namespace_name(nspid)); + + if (retry) + { + /* If nothing changed, we're done. */ + if (relid == oldrelid && nspid == oldnspid) + break; + /* If creation namespace has changed, give up old lock. */ + if (nspid != oldnspid) + UnlockDatabaseObject(NamespaceRelationId, oldnspid, 0, + AccessShareLock); + /* If name points to something different, give up old lock. */ + if (relid != oldrelid && OidIsValid(oldrelid) && lockmode != NoLock) + UnlockRelationOid(oldrelid, lockmode); + } + + /* Lock namespace. */ + if (nspid != oldnspid) + LockDatabaseObject(NamespaceRelationId, nspid, 0, AccessShareLock); + + /* Lock relation, if required if and we have permission. */ + if (lockmode != NoLock && OidIsValid(relid)) + { + if (!pg_class_ownercheck(relid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, + relation->relname); + if (relid != oldrelid) + LockRelationOid(relid, lockmode); + } + + /* If no invalidation message were processed, we're done! */ + if (inval_count == SharedInvalidMessageCounter) + break; + + /* Something may have changed, so recheck our work. */ + retry = true; + oldrelid = relid; + oldnspid = nspid; } - return namespaceId; + RangeVarAdjustRelationPersistence(relation, nspid); + if (existing_relation_id != NULL) + *existing_relation_id = relid; + return nspid; } /* |