diff options
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r-- | src/backend/commands/dbcommands.c | 100 |
1 files changed, 85 insertions, 15 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index d4ab736c690..974c5777bc3 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -140,7 +140,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) int encoding = -1; bool dbistemplate = false; bool dballowconnections = true; - int dbconnlimit = -1; + int dbconnlimit = DATCONNLIMIT_UNLIMITED; int notherbackends; int npreparedxacts; createdb_failure_params fparms; @@ -309,7 +309,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) if (dconnlimit && dconnlimit->arg) { dbconnlimit = defGetInt32(dconnlimit); - if (dbconnlimit < -1) + if (dbconnlimit < DATCONNLIMIT_UNLIMITED) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid connection limit: %d", dbconnlimit))); @@ -358,6 +358,16 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) dbtemplate))); /* + * If the source database was in the process of being dropped, we can't + * use it as a template. + */ + if (database_is_invalid_oid(src_dboid)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use invalid database \"%s\" as template", dbtemplate), + errhint("Use DROP DATABASE to drop invalid databases.")); + + /* * Permission check: to copy a DB that's not marked datistemplate, you * must be superuser or the owner thereof. */ @@ -817,6 +827,7 @@ dropdb(const char *dbname, bool missing_ok, bool force) bool db_istemplate; Relation pgdbrel; HeapTuple tup; + Form_pg_database datform; int notherbackends; int npreparedxacts; int nslots, @@ -933,17 +944,6 @@ dropdb(const char *dbname, bool missing_ok, bool force) errdetail_busy_db(notherbackends, npreparedxacts))); /* - * Remove the database's tuple from pg_database. - */ - tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id)); - if (!HeapTupleIsValid(tup)) - elog(ERROR, "cache lookup failed for database %u", db_id); - - CatalogTupleDelete(pgdbrel, &tup->t_self); - - ReleaseSysCache(tup); - - /* * Delete any comments or security labels associated with the database. */ DeleteSharedComments(db_id, DatabaseRelationId); @@ -959,6 +959,32 @@ dropdb(const char *dbname, bool missing_ok, bool force) */ dropDatabaseDependencies(db_id); + tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for database %u", db_id); + datform = (Form_pg_database) GETSTRUCT(tup); + + /* + * Except for the deletion of the catalog row, subsequent actions are not + * transactional (consider DropDatabaseBuffers() discarding modified + * buffers). But we might crash or get interrupted below. To prevent + * accesses to a database with invalid contents, mark the database as + * invalid using an in-place update. + * + * We need to flush the WAL before continuing, to guarantee the + * modification is durable before performing irreversible filesystem + * operations. + */ + datform->datconnlimit = DATCONNLIMIT_INVALID_DB; + heap_inplace_update(pgdbrel, tup); + XLogFlush(XactLastRecEnd); + + /* + * Also delete the tuple - transactionally. If this transaction commits, + * the row will be gone, but if we fail, dropdb() can be invoked again. + */ + CatalogTupleDelete(pgdbrel, &tup->t_self); + /* * Drop db-specific replication slots. */ @@ -1481,7 +1507,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel) ListCell *option; bool dbistemplate = false; bool dballowconnections = true; - int dbconnlimit = -1; + int dbconnlimit = DATCONNLIMIT_UNLIMITED; DefElem *distemplate = NULL; DefElem *dallowconnections = NULL; DefElem *dconnlimit = NULL; @@ -1564,7 +1590,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel) if (dconnlimit && dconnlimit->arg) { dbconnlimit = defGetInt32(dconnlimit); - if (dbconnlimit < -1) + if (dbconnlimit < DATCONNLIMIT_UNLIMITED) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid connection limit: %d", dbconnlimit))); @@ -1591,6 +1617,14 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel) datform = (Form_pg_database) GETSTRUCT(tuple); dboid = datform->oid; + if (database_is_invalid_form(datform)) + { + ereport(FATAL, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter invalid database \"%s\"", stmt->dbname), + errhint("Use DROP DATABASE to drop invalid databases.")); + } + if (!pg_database_ownercheck(dboid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE, stmt->dbname); @@ -2170,6 +2204,42 @@ get_database_name(Oid dbid) return result; } + +/* + * While dropping a database the pg_database row is marked invalid, but the + * catalog contents still exist. Connections to such a database are not + * allowed. + */ +bool +database_is_invalid_form(Form_pg_database datform) +{ + return datform->datconnlimit == DATCONNLIMIT_INVALID_DB; +} + + +/* + * Convenience wrapper around database_is_invalid_form() + */ +bool +database_is_invalid_oid(Oid dboid) +{ + HeapTuple dbtup; + Form_pg_database dbform; + bool invalid; + + dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid)); + if (!HeapTupleIsValid(dbtup)) + elog(ERROR, "cache lookup failed for database %u", dboid); + dbform = (Form_pg_database) GETSTRUCT(dbtup); + + invalid = database_is_invalid_form(dbform); + + ReleaseSysCache(dbtup); + + return invalid; +} + + /* * recovery_create_dbdir() * |