diff options
Diffstat (limited to 'src/backend/catalog/namespace.c')
-rw-r--r-- | src/backend/catalog/namespace.c | 149 |
1 files changed, 120 insertions, 29 deletions
diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index ce795a61c5b..8d426951ba5 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -44,6 +44,8 @@ #include "parser/parse_func.h" #include "storage/backendid.h" #include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/sinval.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -215,14 +217,20 @@ Datum pg_is_other_temp_schema(PG_FUNCTION_ARGS); * Given a RangeVar describing an existing relation, * select the proper namespace and look up the relation OID. * - * If the relation is not found, return InvalidOid if failOK = true, + * If the relation is not found, return InvalidOid if missing_ok = true, * otherwise raise an error. + * + * If nowait = true, throw an error if we'd have to wait for a lock. */ Oid -RangeVarGetRelid(const RangeVar *relation, bool failOK) +RangeVarGetRelid(const RangeVar *relation, LOCKMODE lockmode, bool missing_ok, + bool nowait) { + uint64 inval_count; Oid namespaceId; Oid relId; + Oid oldRelId = InvalidOid; + bool retry = false; /* * We check the catalog name and then ignore it. @@ -238,37 +246,120 @@ RangeVarGetRelid(const RangeVar *relation, bool failOK) } /* - * Some non-default relpersistence value may have been specified. The - * parser never generates such a RangeVar in simple DML, but it can happen - * in contexts such as "CREATE TEMP TABLE foo (f1 int PRIMARY KEY)". Such - * a command will generate an added CREATE INDEX operation, which must be - * careful to find the temp table, even when pg_temp is not first in the - * search path. + * DDL operations can change the results of a name lookup. Since all + * such operations will generate invalidation messages, we keep track + * of whether any such messages show up while we're performing the + * operation, and retry until either (1) no more invalidation messages + * show up or (2) the answer doesn't change. + * + * But if lockmode = NoLock, then we assume that either the caller is OK + * with the answer changing under them, or that they already hold some + * appropriate lock, and therefore return the first answer we get without + * checking for invalidation messages. Also, if the requested lock is + * already held, no LockRelationOid will not AcceptInvalidationMessages, + * so we may fail to notice a change. We could protect against that case + * by calling AcceptInvalidationMessages() before beginning this loop, + * but that would add a significant amount overhead, so for now we don't. */ - if (relation->relpersistence == RELPERSISTENCE_TEMP) + for (;;) { - if (relation->schemaname) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("temporary tables cannot specify a schema name"))); - if (OidIsValid(myTempNamespace)) - relId = get_relname_relid(relation->relname, myTempNamespace); - else /* this probably can't happen? */ - relId = InvalidOid; - } - else if (relation->schemaname) - { - /* use exact schema given */ - namespaceId = LookupExplicitNamespace(relation->schemaname); - relId = get_relname_relid(relation->relname, namespaceId); - } - else - { - /* search the namespace path */ - relId = RelnameGetRelid(relation->relname); + /* + * Remember this value, so that, after looking up the relation name + * and locking its OID, we can check whether any invalidation messages + * have been processed that might require a do-over. + */ + inval_count = SharedInvalidMessageCounter; + + /* + * Some non-default relpersistence value may have been specified. The + * parser never generates such a RangeVar in simple DML, but it can + * happen in contexts such as "CREATE TEMP TABLE foo (f1 int PRIMARY + * KEY)". Such a command will generate an added CREATE INDEX + * operation, which must be careful to find the temp table, even when + * pg_temp is not first in the search path. + */ + if (relation->relpersistence == RELPERSISTENCE_TEMP) + { + if (relation->schemaname) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("temporary tables cannot specify a schema name"))); + if (OidIsValid(myTempNamespace)) + relId = get_relname_relid(relation->relname, myTempNamespace); + else /* this probably can't happen? */ + relId = InvalidOid; + } + else if (relation->schemaname) + { + /* use exact schema given */ + namespaceId = LookupExplicitNamespace(relation->schemaname); + relId = get_relname_relid(relation->relname, namespaceId); + } + else + { + /* search the namespace path */ + relId = RelnameGetRelid(relation->relname); + } + + /* + * If no lock requested, we assume the caller knows what they're + * doing. They should have already acquired a heavyweight lock on + * this relation earlier in the processing of this same statement, + * so it wouldn't be appropriate to AcceptInvalidationMessages() + * here, as that might pull the rug out from under them. + */ + if (lockmode == NoLock) + break; + + /* + * If, upon retry, we get back the same OID we did last time, then + * the invalidation messages we processed did not change the final + * answer. So we're done. + */ + if (retry && relId == oldRelId) + break; + + /* + * Lock relation. This will also accept any pending invalidation + * messages. If we got back InvalidOid, indicating not found, then + * there's nothing to lock, but we accept invalidation messages + * anyway, to flush any negative catcache entries that may be + * lingering. + */ + if (!OidIsValid(relId)) + AcceptInvalidationMessages(); + else if (!nowait) + LockRelationOid(relId, lockmode); + else if (!ConditionalLockRelationOid(relId, lockmode)) + { + if (relation->schemaname) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s.%s\"", + relation->schemaname, relation->relname))); + else + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on relation \"%s\"", + relation->relname))); + } + + /* + * If no invalidation message were processed, we're done! + */ + if (inval_count == SharedInvalidMessageCounter) + break; + + /* + * Something may have changed. Let's repeat the name lookup, to + * make sure this name still references the same relation it did + * previously. + */ + retry = true; + oldRelId = relId; } - if (!OidIsValid(relId) && !failOK) + if (!OidIsValid(relId) && !missing_ok) { if (relation->schemaname) ereport(ERROR, |