diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2006-05-04 16:07:29 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2006-05-04 16:07:29 +0000 |
commit | 52667d56a3b489e5645f069522631824b7ffc520 (patch) | |
tree | fc53caf8cff281d944b0bcb1dd2b10f9e9e5b0c4 /src/backend/commands | |
parent | cb98e6fb8fd4f1ca955a85d5c0088e42e77a04f0 (diff) | |
download | postgresql-52667d56a3b489e5645f069522631824b7ffc520.tar.gz postgresql-52667d56a3b489e5645f069522631824b7ffc520.zip |
Rethink the locking mechanisms used for CREATE/DROP/RENAME DATABASE.
The former approach used ExclusiveLock on pg_database, which being a
cluster-wide lock meant only one of these operations could proceed at
a time; worse, it also blocked all incoming connections in ReverifyMyDatabase.
Now that we have LockSharedObject(), we can use locks of different types
applied to databases considered as objects. This allows much more
flexible management of the interlocking: two CREATE DATABASEs need not
block each other, and need not block connections except to the template
database being used. Similarly DROP DATABASE doesn't block unrelated
operations. The locking used in flatfiles.c is also much narrower in
scope than before. Per recent proposal.
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/dbcommands.c | 451 | ||||
-rw-r--r-- | src/backend/commands/user.c | 68 |
2 files changed, 271 insertions, 248 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index b21d750394d..518fed81968 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -3,19 +3,17 @@ * dbcommands.c * Database management commands (create/drop database). * - * Note: database creation/destruction commands take ExclusiveLock on - * pg_database to ensure that no two proceed in parallel. We must use - * at least this level of locking to ensure that no two backends try to - * write the flat-file copy of pg_database at once. We avoid using - * AccessExclusiveLock since there's no need to lock out ordinary readers - * of pg_database. + * Note: database creation/destruction commands use exclusive locks on + * the database objects (as expressed by LockSharedObject()) to avoid + * stepping on each others' toes. Formerly we used table-level locks + * on pg_database, but that's too coarse-grained. * * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.180 2006/05/03 22:45:26 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.181 2006/05/04 16:07:29 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -53,7 +51,8 @@ /* non-export function prototypes */ -static bool get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP, +static bool get_db_info(const char *name, LOCKMODE lockmode, + Oid *dbIdP, Oid *ownerIdP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, Oid *dbLastSysOidP, TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, @@ -80,13 +79,12 @@ createdb(const CreatedbStmt *stmt) TransactionId src_frozenxid; Oid src_deftablespace; volatile Oid dst_deftablespace; - volatile Relation pg_database_rel; + Relation pg_database_rel; HeapTuple tuple; - TupleDesc pg_database_dsc; Datum new_record[Natts_pg_database]; char new_record_nulls[Natts_pg_database]; Oid dboid; - volatile Oid datdba; + Oid datdba; ListCell *option; DefElem *dtablespacename = NULL; DefElem *downer = NULL; @@ -96,8 +94,8 @@ createdb(const CreatedbStmt *stmt) char *dbname = stmt->dbname; char *dbowner = NULL; const char *dbtemplate = NULL; - volatile int encoding = -1; - volatile int dbconnlimit = -1; + int encoding = -1; + int dbconnlimit = -1; /* don't call this in a transaction block */ PreventTransactionChain((void *) stmt, "CREATE DATABASE"); @@ -216,31 +214,25 @@ createdb(const CreatedbStmt *stmt) check_is_member_of_role(GetUserId(), datdba); /* - * Check for db name conflict. There is a race condition here, since - * another backend could create the same DB name before we commit. - * However, holding an exclusive lock on pg_database for the whole time we - * are copying the source database doesn't seem like a good idea, so - * accept possibility of race to create. We will check again after we - * grab the exclusive lock. - */ - if (get_db_info(dbname, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_DATABASE), - errmsg("database \"%s\" already exists", dbname))); - - /* - * Lookup database (template) to be cloned. + * Lookup database (template) to be cloned, and obtain share lock on it. + * ShareLock allows two CREATE DATABASEs to work from the same template + * concurrently, while ensuring no one is busy dropping it in parallel + * (which would be Very Bad since we'd likely get an incomplete copy + * without knowing it). This also prevents any new connections from being + * made to the source until we finish copying it, so we can be sure it + * won't change underneath us. */ if (!dbtemplate) dbtemplate = "template1"; /* Default template database name */ - if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding, + if (!get_db_info(dbtemplate, ShareLock, + &src_dboid, &src_owner, &src_encoding, &src_istemplate, &src_allowconn, &src_lastsysoid, &src_vacuumxid, &src_frozenxid, &src_deftablespace)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), - errmsg("template database \"%s\" does not exist", dbtemplate))); + errmsg("template database \"%s\" does not exist", + dbtemplate))); /* * Permission check: to copy a DB that's not marked datistemplate, you @@ -258,8 +250,7 @@ createdb(const CreatedbStmt *stmt) /* * The source DB can't have any active backends, except this one * (exception is to allow CREATE DB while connected to template1). - * Otherwise we might copy inconsistent data. This check is not - * bulletproof, since someone might connect while we are copying... + * Otherwise we might copy inconsistent data. */ if (DatabaseHasActiveBackends(src_dboid, true)) ereport(ERROR, @@ -346,14 +337,65 @@ createdb(const CreatedbStmt *stmt) src_vacuumxid = src_frozenxid = GetCurrentTransactionId(); /* - * Preassign OID for pg_database tuple, so that we can compute db path. We - * have to open pg_database to do this, but we don't want to take - * ExclusiveLock yet, so just do it and close again. + * Check for db name conflict. This is just to give a more friendly + * error message than "unique index violation". There's a race condition + * but we're willing to accept the less friendly message in that case. */ - pg_database_rel = heap_open(DatabaseRelationId, AccessShareLock); - dboid = GetNewOid(pg_database_rel); - heap_close(pg_database_rel, AccessShareLock); - pg_database_rel = NULL; + if (OidIsValid(get_database_oid(dbname))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_DATABASE), + errmsg("database \"%s\" already exists", dbname))); + + /* + * Insert a new tuple into pg_database. This establishes our ownership + * of the new database name (anyone else trying to insert the same name + * will block on the unique index, and fail after we commit). It also + * assigns the OID that the new database will have. + */ + pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock); + + /* Form tuple */ + MemSet(new_record, 0, sizeof(new_record)); + MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); + + new_record[Anum_pg_database_datname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(dbname)); + new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba); + new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); + new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); + new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); + new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); + new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); + new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid); + new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); + new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); + + /* + * We deliberately set datconfig and datacl to defaults (NULL), rather + * than copying them from the template database. Copying datacl would + * be a bad idea when the owner is not the same as the template's + * owner. It's more debatable whether datconfig should be copied. + */ + new_record_nulls[Anum_pg_database_datconfig - 1] = 'n'; + new_record_nulls[Anum_pg_database_datacl - 1] = 'n'; + + tuple = heap_formtuple(RelationGetDescr(pg_database_rel), + new_record, new_record_nulls); + + dboid = simple_heap_insert(pg_database_rel, tuple); + + /* Update indexes */ + CatalogUpdateIndexes(pg_database_rel, tuple); + + /* + * Now generate additional catalog entries associated with the new DB + */ + + /* Register owner dependency */ + recordDependencyOnOwner(DatabaseRelationId, dboid, datdba); + + /* Create pg_shdepend entries for objects within database */ + copyTemplateDependencies(src_dboid, dboid); /* * Force dirty buffers out to disk, to ensure source database is @@ -435,64 +477,6 @@ createdb(const CreatedbStmt *stmt) heap_close(rel, AccessShareLock); /* - * Now OK to grab exclusive lock on pg_database. - */ - pg_database_rel = heap_open(DatabaseRelationId, ExclusiveLock); - - /* Check to see if someone else created same DB name meanwhile. */ - if (get_db_info(dbname, NULL, NULL, NULL, - NULL, NULL, NULL, NULL, NULL, NULL)) - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_DATABASE), - errmsg("database \"%s\" already exists", dbname))); - - /* - * Insert a new tuple into pg_database - */ - pg_database_dsc = RelationGetDescr(pg_database_rel); - - /* Form tuple */ - MemSet(new_record, 0, sizeof(new_record)); - MemSet(new_record_nulls, ' ', sizeof(new_record_nulls)); - - new_record[Anum_pg_database_datname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(dbname)); - new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba); - new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); - new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); - new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); - new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit); - new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); - new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid); - new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); - new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); - - /* - * We deliberately set datconfig and datacl to defaults (NULL), rather - * than copying them from the template database. Copying datacl would - * be a bad idea when the owner is not the same as the template's - * owner. It's more debatable whether datconfig should be copied. - */ - new_record_nulls[Anum_pg_database_datconfig - 1] = 'n'; - new_record_nulls[Anum_pg_database_datacl - 1] = 'n'; - - tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls); - - HeapTupleSetOid(tuple, dboid); /* override heap_insert's OID - * selection */ - - simple_heap_insert(pg_database_rel, tuple); - - /* Update indexes */ - CatalogUpdateIndexes(pg_database_rel, tuple); - - /* Register owner dependency */ - recordDependencyOnOwner(DatabaseRelationId, dboid, datdba); - - /* Create pg_shdepend entries for objects within database */ - copyTemplateDependencies(src_dboid, dboid); - - /* * We force a checkpoint before committing. This effectively means * that committed XLOG_DBASE_CREATE operations will never need to be * replayed (at least not in ordinary crash recovery; we still have to @@ -524,15 +508,21 @@ createdb(const CreatedbStmt *stmt) RequestCheckpoint(true, false); /* + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ + heap_close(pg_database_rel, NoLock); + + /* * Set flag to update flat database file at commit. */ database_file_update_needed(); } PG_CATCH(); { - /* Don't hold pg_database lock while doing recursive remove */ - if (pg_database_rel != NULL) - heap_close(pg_database_rel, ExclusiveLock); + /* Release lock on source database before doing recursive remove */ + UnlockSharedObject(DatabaseRelationId, src_dboid, 0, + ShareLock); /* Throw away any successfully copied subdirectories */ remove_dbtablespaces(dboid); @@ -540,10 +530,6 @@ createdb(const CreatedbStmt *stmt) PG_RE_THROW(); } PG_END_TRY(); - - /* Close pg_database, but keep exclusive lock till commit */ - /* This has to be outside the PG_TRY */ - heap_close(pg_database_rel, NoLock); } @@ -568,20 +554,15 @@ dropdb(const char *dbname, bool missing_ok) errmsg("cannot drop the currently open database"))); /* - * Obtain exclusive lock on pg_database. We need this to ensure that no - * new backend starts up in the target database while we are deleting it. - * (Actually, a new backend might still manage to start up, because it - * isn't able to lock pg_database while starting. But it will detect its - * error in ReverifyMyDatabase and shut down before any serious damage is - * done. See postinit.c.) - * - * An ExclusiveLock, rather than AccessExclusiveLock, is sufficient since - * ReverifyMyDatabase takes RowShareLock. This allows ordinary readers of - * pg_database to proceed in parallel. + * Look up the target database's OID, and get exclusive lock on it. + * We need this to ensure that no new backend starts up in the target + * database while we are deleting it (see postinit.c), and that no one is + * using it as a CREATE DATABASE template or trying to delete it for + * themselves. */ - pgdbrel = heap_open(DatabaseRelationId, ExclusiveLock); + pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock); - if (!get_db_info(dbname, &db_id, NULL, NULL, + if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL, &db_istemplate, NULL, NULL, NULL, NULL, NULL)) { if (!missing_ok) @@ -592,17 +573,18 @@ dropdb(const char *dbname, bool missing_ok) } else { - /* Close pg_database, release the lock, since we changed nothing */ - heap_close(pgdbrel, ExclusiveLock); + heap_close(pgdbrel, RowExclusiveLock); ereport(NOTICE, (errmsg("database \"%s\" does not exist, skipping", dbname))); - return; } } + /* + * Permission checks + */ if (!pg_database_ownercheck(db_id, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, dbname); @@ -618,7 +600,8 @@ dropdb(const char *dbname, bool missing_ok) errmsg("cannot drop a template database"))); /* - * Check for active backends in the target database. + * Check for active backends in the target database. (Because we hold + * the database lock, no new ones can start after this.) */ if (DatabaseHasActiveBackends(db_id, false)) ereport(ERROR, @@ -640,8 +623,7 @@ dropdb(const char *dbname, bool missing_ok) ReleaseSysCache(tup); /* - * Delete any comments associated with the database - * + * Delete any comments associated with the database. */ DeleteSharedComments(db_id, DatabaseRelationId); @@ -675,7 +657,10 @@ dropdb(const char *dbname, bool missing_ok) */ remove_dbtablespaces(db_id); - /* Close pg_database, but keep exclusive lock till commit */ + /* + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ heap_close(pgdbrel, NoLock); /* @@ -691,29 +676,18 @@ dropdb(const char *dbname, bool missing_ok) void RenameDatabase(const char *oldname, const char *newname) { - HeapTuple tup, - newtup; + Oid db_id; + HeapTuple newtup; Relation rel; - SysScanDesc scan, - scan2; - ScanKeyData key, - key2; /* - * Obtain ExclusiveLock so that no new session gets started while the - * rename is in progress. + * Look up the target database's OID, and get exclusive lock on it. + * We need this for the same reasons as DROP DATABASE. */ - rel = heap_open(DatabaseRelationId, ExclusiveLock); - - ScanKeyInit(&key, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(oldname)); - scan = systable_beginscan(rel, DatabaseNameIndexId, true, - SnapshotNow, 1, &key); + rel = heap_open(DatabaseRelationId, RowExclusiveLock); - tup = systable_getnext(scan); - if (!HeapTupleIsValid(tup)) + if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL, + NULL, NULL, NULL, NULL, NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("database \"%s\" does not exist", oldname))); @@ -724,36 +698,29 @@ RenameDatabase(const char *oldname, const char *newname) * be an actual problem besides a little confusion, so think about this * and decide. */ - if (HeapTupleGetOid(tup) == MyDatabaseId) + if (db_id == MyDatabaseId) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("current database may not be renamed"))); /* - * Make sure the database does not have active sessions. Might not be - * necessary, but it's consistent with other database operations. + * Make sure the database does not have active sessions. This is the + * same concern as above, but applied to other sessions. */ - if (DatabaseHasActiveBackends(HeapTupleGetOid(tup), false)) + if (DatabaseHasActiveBackends(db_id, false)) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("database \"%s\" is being accessed by other users", oldname))); /* make sure the new name doesn't exist */ - ScanKeyInit(&key2, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(newname)); - scan2 = systable_beginscan(rel, DatabaseNameIndexId, true, - SnapshotNow, 1, &key2); - if (HeapTupleIsValid(systable_getnext(scan2))) + if (OidIsValid(get_database_oid(newname))) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_DATABASE), errmsg("database \"%s\" already exists", newname))); - systable_endscan(scan2); /* must be owner */ - if (!pg_database_ownercheck(HeapTupleGetOid(tup), GetUserId())) + if (!pg_database_ownercheck(db_id, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE, oldname); @@ -764,14 +731,19 @@ RenameDatabase(const char *oldname, const char *newname) errmsg("permission denied to rename database"))); /* rename */ - newtup = heap_copytuple(tup); + newtup = SearchSysCacheCopy(DATABASEOID, + ObjectIdGetDatum(db_id), + 0, 0, 0); + if (!HeapTupleIsValid(newtup)) + elog(ERROR, "cache lookup failed for database %u", db_id); namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname); simple_heap_update(rel, &newtup->t_self, newtup); CatalogUpdateIndexes(rel, newtup); - systable_endscan(scan); - - /* Close pg_database, but keep exclusive lock till commit */ + /* + * Close pg_database, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ heap_close(rel, NoLock); /* @@ -821,7 +793,9 @@ AlterDatabase(AlterDatabaseStmt *stmt) connlimit = intVal(dconnlimit->arg); /* - * We don't need ExclusiveLock since we aren't updating the flat file. + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. */ rel = heap_open(DatabaseRelationId, RowExclusiveLock); ScanKeyInit(&scankey, @@ -891,7 +865,9 @@ AlterDatabaseSet(AlterDatabaseSetStmt *stmt) valuestr = flatten_set_variable_args(stmt->variable, stmt->value); /* - * We don't need ExclusiveLock since we aren't updating the flat file. + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. */ rel = heap_open(DatabaseRelationId, RowExclusiveLock); ScanKeyInit(&scankey, @@ -974,7 +950,9 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId) Form_pg_database datForm; /* - * We don't need ExclusiveLock since we aren't updating the flat file. + * Get the old tuple. We don't need a lock on the database per se, + * because we're not going to do anything that would mess up incoming + * connections. */ rel = heap_open(DatabaseRelationId, RowExclusiveLock); ScanKeyInit(&scankey, @@ -1077,72 +1055,127 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId) * Helper functions */ +/* + * Look up info about the database named "name". If the database exists, + * obtain the specified lock type on it, fill in any of the remaining + * parameters that aren't NULL, and return TRUE. If no such database, + * return FALSE. + */ static bool -get_db_info(const char *name, Oid *dbIdP, Oid *ownerIdP, +get_db_info(const char *name, LOCKMODE lockmode, + Oid *dbIdP, Oid *ownerIdP, int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP, Oid *dbLastSysOidP, TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, Oid *dbTablespace) { + bool result = false; Relation relation; - ScanKeyData scanKey; - SysScanDesc scan; - HeapTuple tuple; - bool gottuple; AssertArg(name); /* Caller may wish to grab a better lock on pg_database beforehand... */ relation = heap_open(DatabaseRelationId, AccessShareLock); - ScanKeyInit(&scanKey, - Anum_pg_database_datname, - BTEqualStrategyNumber, F_NAMEEQ, - NameGetDatum(name)); + /* + * Loop covers the rare case where the database is renamed before we + * can lock it. We try again just in case we can find a new one of + * the same name. + */ + for (;;) + { + ScanKeyData scanKey; + SysScanDesc scan; + HeapTuple tuple; + Oid dbOid; - scan = systable_beginscan(relation, DatabaseNameIndexId, true, - SnapshotNow, 1, &scanKey); + /* + * there's no syscache for database-indexed-by-name, + * so must do it the hard way + */ + ScanKeyInit(&scanKey, + Anum_pg_database_datname, + BTEqualStrategyNumber, F_NAMEEQ, + NameGetDatum(name)); - tuple = systable_getnext(scan); + scan = systable_beginscan(relation, DatabaseNameIndexId, true, + SnapshotNow, 1, &scanKey); - gottuple = HeapTupleIsValid(tuple); - if (gottuple) - { - Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); - - /* oid of the database */ - if (dbIdP) - *dbIdP = HeapTupleGetOid(tuple); - /* oid of the owner */ - if (ownerIdP) - *ownerIdP = dbform->datdba; - /* character encoding */ - if (encodingP) - *encodingP = dbform->encoding; - /* allowed as template? */ - if (dbIsTemplateP) - *dbIsTemplateP = dbform->datistemplate; - /* allowing connections? */ - if (dbAllowConnP) - *dbAllowConnP = dbform->datallowconn; - /* last system OID used in database */ - if (dbLastSysOidP) - *dbLastSysOidP = dbform->datlastsysoid; - /* limit of vacuumed XIDs */ - if (dbVacuumXidP) - *dbVacuumXidP = dbform->datvacuumxid; - /* limit of frozen XIDs */ - if (dbFrozenXidP) - *dbFrozenXidP = dbform->datfrozenxid; - /* default tablespace for this database */ - if (dbTablespace) - *dbTablespace = dbform->dattablespace; + tuple = systable_getnext(scan); + + if (!HeapTupleIsValid(tuple)) + { + /* definitely no database of that name */ + systable_endscan(scan); + break; + } + + dbOid = HeapTupleGetOid(tuple); + + systable_endscan(scan); + + /* + * Now that we have a database OID, we can try to lock the DB. + */ + if (lockmode != NoLock) + LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode); + + /* + * And now, re-fetch the tuple by OID. If it's still there and + * still the same name, we win; else, drop the lock and loop + * back to try again. + */ + tuple = SearchSysCache(DATABASEOID, + ObjectIdGetDatum(dbOid), + 0, 0, 0); + if (HeapTupleIsValid(tuple)) + { + Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); + + if (strcmp(name, NameStr(dbform->datname)) == 0) + { + /* oid of the database */ + if (dbIdP) + *dbIdP = dbOid; + /* oid of the owner */ + if (ownerIdP) + *ownerIdP = dbform->datdba; + /* character encoding */ + if (encodingP) + *encodingP = dbform->encoding; + /* allowed as template? */ + if (dbIsTemplateP) + *dbIsTemplateP = dbform->datistemplate; + /* allowing connections? */ + if (dbAllowConnP) + *dbAllowConnP = dbform->datallowconn; + /* last system OID used in database */ + if (dbLastSysOidP) + *dbLastSysOidP = dbform->datlastsysoid; + /* limit of vacuumed XIDs */ + if (dbVacuumXidP) + *dbVacuumXidP = dbform->datvacuumxid; + /* limit of frozen XIDs */ + if (dbFrozenXidP) + *dbFrozenXidP = dbform->datfrozenxid; + /* default tablespace for this database */ + if (dbTablespace) + *dbTablespace = dbform->dattablespace; + ReleaseSysCache(tuple); + result = true; + break; + } + /* can only get here if it was just renamed */ + ReleaseSysCache(tuple); + } + + if (lockmode != NoLock) + UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode); } - systable_endscan(scan); heap_close(relation, AccessShareLock); - return gottuple; + return result; } /* Check if current user has createdb privileges */ @@ -1234,8 +1267,6 @@ remove_dbtablespaces(Oid db_id) * get_database_oid - given a database name, look up the OID * * Returns InvalidOid if database name not found. - * - * This is not actually used in this file, but is exported for use elsewhere. */ Oid get_database_oid(const char *dbname) @@ -1277,8 +1308,6 @@ get_database_oid(const char *dbname) * get_database_name - given a database OID, look up the name * * Returns a palloc'd string, or NULL if no such database. - * - * This is not actually used in this file, but is exported for use elsewhere. */ char * get_database_name(Oid dbid) diff --git a/src/backend/commands/user.c b/src/backend/commands/user.c index 5fddd80dfdd..61956b27706 100644 --- a/src/backend/commands/user.c +++ b/src/backend/commands/user.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/commands/user.c,v 1.170 2006/03/05 15:58:25 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/user.c,v 1.171 2006/05/04 16:07:29 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -274,10 +274,9 @@ CreateRole(CreateRoleStmt *stmt) /* * Check the pg_authid relation to be certain the role doesn't already - * exist. Note we secure exclusive lock because we need to protect our - * eventual update of the flat auth file. + * exist. */ - pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock); + pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock); pg_authid_dsc = RelationGetDescr(pg_authid_rel); tuple = SearchSysCache(AUTHNAME, @@ -377,8 +376,8 @@ CreateRole(CreateRoleStmt *stmt) GetUserId(), false); /* - * Now we can clean up; but keep lock until commit (to avoid possible - * deadlock when commit code tries to acquire lock). + * Close pg_authid, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) */ heap_close(pg_authid_rel, NoLock); @@ -538,10 +537,9 @@ AlterRole(AlterRoleStmt *stmt) validUntil = strVal(dvalidUntil->arg); /* - * Scan the pg_authid relation to be certain the user exists. Note we - * secure exclusive lock to protect our update of the flat auth file. + * Scan the pg_authid relation to be certain the user exists. */ - pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock); + pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock); pg_authid_dsc = RelationGetDescr(pg_authid_rel); tuple = SearchSysCache(AUTHNAME, @@ -697,8 +695,8 @@ AlterRole(AlterRoleStmt *stmt) false); /* - * Now we can clean up; but keep lock until commit (to avoid possible - * deadlock when commit code tries to acquire lock). + * Close pg_authid, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) */ heap_close(pg_authid_rel, NoLock); @@ -726,10 +724,6 @@ AlterRoleSet(AlterRoleSetStmt *stmt) valuestr = flatten_set_variable_args(stmt->variable, stmt->value); - /* - * RowExclusiveLock is sufficient, because we don't need to update the - * flat auth file. - */ rel = heap_open(AuthIdRelationId, RowExclusiveLock); oldtuple = SearchSysCache(AUTHNAME, PointerGetDatum(stmt->role), @@ -799,6 +793,7 @@ AlterRoleSet(AlterRoleSetStmt *stmt) CatalogUpdateIndexes(rel, newtuple); ReleaseSysCache(oldtuple); + /* needn't keep lock since we won't be updating the flat file */ heap_close(rel, RowExclusiveLock); } @@ -820,11 +815,9 @@ DropRole(DropRoleStmt *stmt) /* * Scan the pg_authid relation to find the Oid of the role(s) to be - * deleted. Note we secure exclusive lock on pg_authid, because we need - * to protect our update of the flat auth file. A regular writer's lock - * on pg_auth_members is sufficient though. + * deleted. */ - pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock); + pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock); pg_auth_members_rel = heap_open(AuthMemRelationId, RowExclusiveLock); foreach(item, stmt->roles) @@ -960,7 +953,7 @@ DropRole(DropRoleStmt *stmt) /* * Now we can clean up; but keep locks until commit (to avoid possible - * deadlock when commit code tries to acquire lock). + * deadlock failure while updating flat file) */ heap_close(pg_auth_members_rel, NoLock); heap_close(pg_authid_rel, NoLock); @@ -989,8 +982,7 @@ RenameRole(const char *oldname, const char *newname) int i; Oid roleid; - /* ExclusiveLock because we need to update the flat auth file */ - rel = heap_open(AuthIdRelationId, ExclusiveLock); + rel = heap_open(AuthIdRelationId, RowExclusiveLock); dsc = RelationGetDescr(rel); oldtuple = SearchSysCache(AUTHNAME, @@ -1080,6 +1072,11 @@ RenameRole(const char *oldname, const char *newname) CatalogUpdateIndexes(rel, newtuple); ReleaseSysCache(oldtuple); + + /* + * Close pg_authid, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ heap_close(rel, NoLock); /* @@ -1108,11 +1105,8 @@ GrantRole(GrantRoleStmt *stmt) grantee_ids = roleNamesToIds(stmt->grantee_roles); - /* - * Even though this operation doesn't change pg_authid, we must secure - * exclusive lock on it to protect our update of the flat auth file. - */ - pg_authid_rel = heap_open(AuthIdRelationId, ExclusiveLock); + /* AccessShareLock is enough since we aren't modifying pg_authid */ + pg_authid_rel = heap_open(AuthIdRelationId, AccessShareLock); /* * Step through all of the granted roles and add/remove entries for the @@ -1136,6 +1130,10 @@ GrantRole(GrantRoleStmt *stmt) stmt->admin_opt); } + /* + * Close pg_authid, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) + */ heap_close(pg_authid_rel, NoLock); /* @@ -1237,8 +1235,7 @@ roleNamesToIds(List *memberNames) * grantorId: who is granting the membership * admin_opt: granting admin option? * - * Note: caller is responsible for holding ExclusiveLock on pg_authid, - * and for calling auth_file_update_needed(). + * Note: caller is responsible for calling auth_file_update_needed(). */ static void AddRoleMems(const char *rolename, Oid roleid, @@ -1283,7 +1280,6 @@ AddRoleMems(const char *rolename, Oid roleid, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to set grantor"))); - /* We need only regular writer's lock on pg_auth_members */ pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock); pg_authmem_dsc = RelationGetDescr(pg_authmem_rel); @@ -1363,8 +1359,8 @@ AddRoleMems(const char *rolename, Oid roleid, } /* - * Now we can clean up; but keep lock until commit (to avoid possible - * deadlock when commit code tries to acquire lock). + * Close pg_authmem, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) */ heap_close(pg_authmem_rel, NoLock); } @@ -1378,8 +1374,7 @@ AddRoleMems(const char *rolename, Oid roleid, * memberIds: OIDs of roles to del * admin_opt: remove admin option only? * - * Note: caller is responsible for holding ExclusiveLock on pg_authid, - * and for calling auth_file_update_needed(). + * Note: caller is responsible for calling auth_file_update_needed(). */ static void DelRoleMems(const char *rolename, Oid roleid, @@ -1418,7 +1413,6 @@ DelRoleMems(const char *rolename, Oid roleid, rolename))); } - /* We need only regular writer's lock on pg_auth_members */ pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock); pg_authmem_dsc = RelationGetDescr(pg_authmem_rel); @@ -1478,8 +1472,8 @@ DelRoleMems(const char *rolename, Oid roleid, } /* - * Now we can clean up; but keep lock until commit (to avoid possible - * deadlock when commit code tries to acquire lock). + * Close pg_authmem, but keep lock till commit (this is important + * to prevent any risk of deadlock failure while updating flat file) */ heap_close(pg_authmem_rel, NoLock); } |