diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2000-11-14 18:37:49 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2000-11-14 18:37:49 +0000 |
commit | 2cf48ca04bf59985117e04dd71644a507be90dbb (patch) | |
tree | 6b1033da07f1805a79bcfb67345aba778559d74e /src/backend/commands/dbcommands.c | |
parent | 8a9315ca92804bd32b3ee864bf83d98840e1a947 (diff) | |
download | postgresql-2cf48ca04bf59985117e04dd71644a507be90dbb.tar.gz postgresql-2cf48ca04bf59985117e04dd71644a507be90dbb.zip |
Extend CREATE DATABASE to allow selection of a template database to be
cloned, rather than always cloning template1. Modify initdb to generate
two identical databases rather than one, template0 and template1.
Connections to template0 are disallowed, so that it will always remain
in its virgin as-initdb'd state. pg_dumpall now dumps databases with
restore commands that say CREATE DATABASE foo WITH TEMPLATE = template0.
This allows proper behavior when there is user-added data in template1.
initdb forced!
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r-- | src/backend/commands/dbcommands.c | 351 |
1 files changed, 221 insertions, 130 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 2442679250f..70fd952db63 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -8,12 +8,11 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.66 2000/11/12 20:51:50 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.67 2000/11/14 18:37:41 tgl Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" -#include "commands/dbcommands.h" #include <errno.h> #include <fcntl.h> @@ -27,6 +26,7 @@ #include "catalog/pg_database.h" #include "catalog/pg_shadow.h" #include "commands/comment.h" +#include "commands/dbcommands.h" #include "miscadmin.h" #include "storage/sinval.h" /* for DatabaseHasActiveBackends */ #include "utils/builtins.h" @@ -35,29 +35,40 @@ /* non-export function prototypes */ +static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP, + int *encodingP, bool *dbIsTemplateP, + Oid *dbLastSysOidP, char *dbpath); static bool get_user_info(Oid use_sysid, bool *use_super, bool *use_createdb); -static bool get_db_info(const char *name, char *dbpath, Oid *dbIdP, int4 *ownerIdP); -static char * resolve_alt_dbpath(const char * dbpath, Oid dboid); -static bool remove_dbdirs(const char * real_loc, const char * altloc); +static char *resolve_alt_dbpath(const char *dbpath, Oid dboid); +static bool remove_dbdirs(const char *real_loc, const char *altloc); /* * CREATE DATABASE */ void -createdb(const char *dbname, const char *dbpath, int encoding) +createdb(const char *dbname, const char *dbpath, + const char *dbtemplate, int encoding) { + char *nominal_loc; + char *alt_loc; + char *target_dir; + char src_loc[MAXPGPATH]; char buf[2 * MAXPGPATH + 100]; - char *altloc; - char *real_loc; int ret; bool use_super, use_createdb; + Oid src_dboid; + int4 src_owner; + int src_encoding; + bool src_istemplate; + Oid src_lastsysoid; + char src_dbpath[MAXPGPATH]; Relation pg_database_rel; HeapTuple tuple; TupleDesc pg_database_dsc; Datum new_record[Natts_pg_database]; - char new_record_nulls[Natts_pg_database] = {' ', ' ', ' ', ' ', ' '}; + char new_record_nulls[Natts_pg_database]; Oid dboid; if (!get_user_info(GetUserId(), &use_super, &use_createdb)) @@ -66,122 +77,195 @@ createdb(const char *dbname, const char *dbpath, int encoding) if (!use_createdb && !use_super) elog(ERROR, "CREATE DATABASE: permission denied"); - if (get_db_info(dbname, NULL, NULL, NULL)) - elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname); - /* don't call this in a transaction block */ if (IsTransactionBlock()) elog(ERROR, "CREATE DATABASE: may not be called in a transaction block"); /* - * Insert a new tuple into pg_database + * Check for db name conflict. There is a race condition here, since + * another backend could create the same DB name before we commit. + * However, holding an exclusive lock on pg_database for the whole time + * we are copying the source database doesn't seem like a good idea, + * so accept possibility of race to create. We will check again after + * we grab the exclusive lock. */ - pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock); - pg_database_dsc = RelationGetDescr(pg_database_rel); + if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL)) + elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname); - /* - * Preassign OID for pg_database tuple, so that we know current - * OID counter value + /* + * Lookup database (template) to be cloned. */ - dboid = newoid(); + if (!dbtemplate) + dbtemplate = "template1"; /* Default template database name */ - /* Form tuple */ - new_record[Anum_pg_database_datname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(dbname)); - new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(GetUserId()); - new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); - /* Save current OID val */ - new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(dboid); - /* no nulls here, GetRawDatabaseInfo doesn't like them */ - new_record[Anum_pg_database_datpath - 1] = - DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : "")); + if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding, + &src_istemplate, &src_lastsysoid, src_dbpath)) + elog(ERROR, "CREATE DATABASE: template \"%s\" does not exist", + dbtemplate); + /* + * Permission check: to copy a DB that's not marked datistemplate, + * you must be superuser or the owner thereof. + */ + if (!src_istemplate) + { + if (!use_super && GetUserId() != src_owner) + elog(ERROR, "CREATE DATABASE: permission to copy \"%s\" denied", + dbtemplate); + } + /* + * Determine physical path of source database + */ + alt_loc = resolve_alt_dbpath(src_dbpath, src_dboid); + if (!alt_loc) + alt_loc = GetDatabasePath(src_dboid); + strcpy(src_loc, alt_loc); - tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls); + /* + * The source DB can't have any active backends, except this one + * (exception is to allow CREATE DB while connected to template1). + * Otherwise we might copy inconsistent data. This check is not + * bulletproof, since someone might connect while we are copying... + */ + if (DatabaseHasActiveBackends(src_dboid, true)) + elog(ERROR, "CREATE DATABASE: source database \"%s\" is being accessed by other users", dbtemplate); - tuple->t_data->t_oid = dboid; /* override heap_insert */ + /* If encoding is defaulted, use source's encoding */ + if (encoding < 0) + encoding = src_encoding; + /* + * Preassign OID for pg_database tuple, so that we can compute db path. + */ + dboid = newoid(); /* - * Update table + * Compute nominal location (where we will try to access the database), + * and resolve alternate physical location if one is specified. */ - heap_insert(pg_database_rel, tuple); - - real_loc = GetDatabasePath(tuple->t_data->t_oid); - altloc = resolve_alt_dbpath(dbpath, tuple->t_data->t_oid); + nominal_loc = GetDatabasePath(dboid); + alt_loc = resolve_alt_dbpath(dbpath, dboid); - if (strchr(real_loc, '\'') && strchr(altloc, '\'')) + if (strchr(nominal_loc, '\'')) + elog(ERROR, "database path may not contain single quotes"); + if (alt_loc && strchr(alt_loc, '\'')) + elog(ERROR, "database path may not contain single quotes"); + if (strchr(src_loc, '\'')) elog(ERROR, "database path may not contain single quotes"); /* ... otherwise we'd be open to shell exploits below */ - /* - * Update indexes (there aren't any currently) - */ -#ifdef Num_pg_database_indices - if (RelationGetForm(pg_database_rel)->relhasindex) - { - Relation idescs[Num_pg_database_indices]; - - CatalogOpenIndices(Num_pg_database_indices, - Name_pg_database_indices, idescs); - CatalogIndexInsert(idescs, Num_pg_database_indices, pg_database_rel, - tuple); - CatalogCloseIndices(Num_pg_database_indices, idescs); - } +#ifdef XLOG + /* Try to force any dirty buffers out to disk */ + BufferSync(); #endif - heap_close(pg_database_rel, NoLock); - /* * Close virtual file descriptors so the kernel has more available for * the mkdir() and system() calls below. */ closeAllVfds(); - /* Copy the template database to the new location */ + /* + * Check we can create the target directory --- but then remove it + * because we rely on cp(1) to create it for real. + */ + target_dir = alt_loc ? alt_loc : nominal_loc; - if (mkdir((altloc ? altloc : real_loc), S_IRWXU) != 0) - elog(ERROR, "CREATE DATABASE: unable to create database directory '%s': %s", - (altloc ? altloc : real_loc), strerror(errno)); + if (mkdir(target_dir, S_IRWXU) != 0) + elog(ERROR, "CREATE DATABASE: unable to create database directory '%s': %m", + target_dir); + rmdir(target_dir); - if (altloc) + /* Make the symlink, if needed */ + if (alt_loc) { - if (symlink(altloc, real_loc) != 0) - elog(ERROR, "CREATE DATABASE: could not link %s to %s: %s", - real_loc, altloc, strerror(errno)); + if (symlink(alt_loc, nominal_loc) != 0) + elog(ERROR, "CREATE DATABASE: could not link '%s' to '%s': %m", + nominal_loc, alt_loc); } - snprintf(buf, sizeof(buf), "cp '%s'/* '%s'", - GetDatabasePath(TemplateDbOid), real_loc); + /* Copy the template database to the new location */ + snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir); ret = system(buf); /* Some versions of SunOS seem to return ECHILD after a system() call */ if (ret != 0 && errno != ECHILD) { - if (remove_dbdirs(real_loc, altloc)) + if (remove_dbdirs(nominal_loc, alt_loc)) elog(ERROR, "CREATE DATABASE: could not initialize database directory"); else elog(ERROR, "CREATE DATABASE: could not initialize database directory; delete failed as well"); } -#ifdef XLOG - BufferSync(); + /* + * Now OK to grab exclusive lock on pg_database. + */ + pg_database_rel = heap_openr(DatabaseRelationName, AccessExclusiveLock); + + /* Check to see if someone else created same DB name meanwhile. */ + if (get_db_info(dbname, NULL, NULL, NULL, NULL, NULL, NULL)) + { + remove_dbdirs(nominal_loc, alt_loc); + elog(ERROR, "CREATE DATABASE: database \"%s\" already exists", dbname); + } + + /* + * Insert a new tuple into pg_database + */ + pg_database_dsc = RelationGetDescr(pg_database_rel); + + /* Form tuple */ + new_record[Anum_pg_database_datname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(dbname)); + new_record[Anum_pg_database_datdba - 1] = Int32GetDatum(GetUserId()); + new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding); + new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(false); + new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(true); + new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); + /* no nulls here, GetRawDatabaseInfo doesn't like them */ + new_record[Anum_pg_database_datpath - 1] = + DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : "")); + + memset(new_record_nulls, ' ', sizeof(new_record_nulls)); + + tuple = heap_formtuple(pg_database_dsc, new_record, new_record_nulls); + + tuple->t_data->t_oid = dboid; /* override heap_insert's OID selection */ + + heap_insert(pg_database_rel, tuple); + + /* + * Update indexes (there aren't any currently) + */ +#ifdef Num_pg_database_indices + if (RelationGetForm(pg_database_rel)->relhasindex) + { + Relation idescs[Num_pg_database_indices]; + + CatalogOpenIndices(Num_pg_database_indices, + Name_pg_database_indices, idescs); + CatalogIndexInsert(idescs, Num_pg_database_indices, pg_database_rel, + tuple); + CatalogCloseIndices(Num_pg_database_indices, idescs); + } #endif -} + /* Close pg_database, but keep lock till commit */ + heap_close(pg_database_rel, NoLock); +} /* * DROP DATABASE */ - void dropdb(const char *dbname) { int4 db_owner; + bool db_istemplate; bool use_super; Oid db_id; - char *altloc; - char *real_loc; + char *alt_loc; + char *nominal_loc; char dbpath[MAXPGPATH]; Relation pgdbrel; HeapScanDesc pgdbscan; @@ -190,9 +274,6 @@ dropdb(const char *dbname) AssertArg(dbname); - if (strcmp(dbname, "template1") == 0) - elog(ERROR, "DROP DATABASE: may not be executed on the template1 database"); - if (strcmp(dbname, DatabaseName) == 0) elog(ERROR, "DROP DATABASE: cannot be executed on the currently open database"); @@ -202,15 +283,6 @@ dropdb(const char *dbname) if (!get_user_info(GetUserId(), &use_super, NULL)) elog(ERROR, "current user name is invalid"); - if (!get_db_info(dbname, dbpath, &db_id, &db_owner)) - elog(ERROR, "DROP DATABASE: database \"%s\" does not exist", dbname); - - if (GetUserId() != db_owner && !use_super) - elog(ERROR, "DROP DATABASE: permission denied"); - - real_loc = GetDatabasePath(db_id); - altloc = resolve_alt_dbpath(dbpath, db_id); - /* * Obtain exclusive lock on pg_database. We need this to ensure that * no new backend starts up in the target database while we are @@ -222,14 +294,29 @@ dropdb(const char *dbname) */ pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock); + if (!get_db_info(dbname, &db_id, &db_owner, NULL, + &db_istemplate, NULL, dbpath)) + elog(ERROR, "DROP DATABASE: database \"%s\" does not exist", dbname); + + if (!use_super && GetUserId() != db_owner) + elog(ERROR, "DROP DATABASE: permission denied"); + + /* + * Disallow dropping a DB that is marked istemplate. This is just + * to prevent people from accidentally dropping template0 or template1; + * they can do so if they're really determined ... + */ + if (db_istemplate) + elog(ERROR, "DROP DATABASE: database is marked as a template"); + + nominal_loc = GetDatabasePath(db_id); + alt_loc = resolve_alt_dbpath(dbpath, db_id); + /* * Check for active backends in the target database. */ if (DatabaseHasActiveBackends(db_id, false)) - { - heap_close(pgdbrel, AccessExclusiveLock); elog(ERROR, "DROP DATABASE: database \"%s\" is being accessed by other users", dbname); - } /* * Find the database's tuple by OID (should be unique, we trust). @@ -242,8 +329,6 @@ dropdb(const char *dbname) tup = heap_getnext(pgdbscan, 0); if (!HeapTupleIsValid(tup)) { - heap_close(pgdbrel, AccessExclusiveLock); - /* * This error should never come up since the existence of the * database is checked earlier @@ -252,9 +337,6 @@ dropdb(const char *dbname) dbname); } - /* Delete any comments associated with the database */ - DeleteComments(db_id); - /* Remove the database's tuple from pg_database */ heap_delete(pgdbrel, &tup->t_self, NULL); @@ -266,6 +348,9 @@ dropdb(const char *dbname) */ heap_close(pgdbrel, NoLock); + /* Delete any comments associated with the database */ + DeleteComments(db_id); + /* * Drop pages for this database that are in the shared buffer cache. * This is important to ensure that no remaining backend tries to @@ -274,15 +359,9 @@ dropdb(const char *dbname) DropBuffers(db_id); /* - * Close virtual file descriptors so the kernel has more available for - * the system() call below. - */ - closeAllVfds(); - - /* * Remove the database's subdirectory and everything in it. */ - remove_dbdirs(real_loc, altloc); + remove_dbdirs(nominal_loc, alt_loc); } @@ -292,28 +371,32 @@ dropdb(const char *dbname) */ static bool -get_db_info(const char *name, char *dbpath, Oid *dbIdP, int4 *ownerIdP) +get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP, + int *encodingP, bool *dbIsTemplateP, + Oid *dbLastSysOidP, char *dbpath) { Relation relation; - HeapTuple tuple; ScanKeyData scanKey; HeapScanDesc scan; + HeapTuple tuple; AssertArg(name); - relation = heap_openr(DatabaseRelationName, AccessExclusiveLock /* ??? */ ); + /* Caller may wish to grab a better lock on pg_database beforehand... */ + relation = heap_openr(DatabaseRelationName, AccessShareLock); ScanKeyEntryInitialize(&scanKey, 0, Anum_pg_database_datname, F_NAMEEQ, NameGetDatum(name)); scan = heap_beginscan(relation, 0, SnapshotNow, 1, &scanKey); if (!HeapScanIsValid(scan)) - elog(ERROR, "Cannot begin scan of %s.", DatabaseRelationName); + elog(ERROR, "Cannot begin scan of %s", DatabaseRelationName); tuple = heap_getnext(scan, 0); if (HeapTupleIsValid(tuple)) { + Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple); text *tmptext; bool isnull; @@ -322,22 +405,23 @@ get_db_info(const char *name, char *dbpath, Oid *dbIdP, int4 *ownerIdP) *dbIdP = tuple->t_data->t_oid; /* uid of the owner */ if (ownerIdP) - { - *ownerIdP = (int4) heap_getattr(tuple, - Anum_pg_database_datdba, - RelationGetDescr(relation), - &isnull); - if (isnull) - *ownerIdP = -1; /* hopefully no one has that id already ;) */ - } + *ownerIdP = dbform->datdba; + /* multibyte encoding */ + if (encodingP) + *encodingP = dbform->encoding; + /* allowed as template? */ + if (dbIsTemplateP) + *dbIsTemplateP = dbform->datistemplate; + /* last system OID used in database */ + if (dbLastSysOidP) + *dbLastSysOidP = dbform->datlastsysoid; /* database path (as registered in pg_database) */ if (dbpath) { - tmptext = (text *) heap_getattr(tuple, - Anum_pg_database_datpath, - RelationGetDescr(relation), - &isnull); - + tmptext = DatumGetTextP(heap_getattr(tuple, + Anum_pg_database_datpath, + RelationGetDescr(relation), + &isnull)); if (!isnull) { Assert(VARSIZE(tmptext) - VARHDRSZ < MAXPGPATH); @@ -349,16 +433,9 @@ get_db_info(const char *name, char *dbpath, Oid *dbIdP, int4 *ownerIdP) strcpy(dbpath, ""); } } - else - { - if (dbIdP) - *dbIdP = InvalidOid; - } heap_endscan(scan); - - /* We will keep the lock on the relation until end of transaction. */ - heap_close(relation, NoLock); + heap_close(relation, AccessShareLock); return HeapTupleIsValid(tuple); } @@ -396,6 +473,8 @@ resolve_alt_dbpath(const char * dbpath, Oid dboid) if (strchr(dbpath, '/')) { + if (dbpath[0] != '/') + elog(ERROR, "Relative paths are not allowed as database locations"); #ifndef ALLOW_ABSOLUTE_DBPATHS elog(ERROR, "Absolute paths are not allowed as database locations"); #endif @@ -406,9 +485,9 @@ resolve_alt_dbpath(const char * dbpath, Oid dboid) /* must be environment variable */ char * var = getenv(dbpath); if (!var) - elog(ERROR, "environment variable %s not set", dbpath); + elog(ERROR, "Postmaster environment variable '%s' not set", dbpath); if (var[0] != '/') - elog(ERROR, "environment variable %s must be absolute path", dbpath); + elog(ERROR, "Postmaster environment variable '%s' must be absolute path", dbpath); prefix = var; } @@ -421,24 +500,36 @@ resolve_alt_dbpath(const char * dbpath, Oid dboid) static bool -remove_dbdirs(const char * real_loc, const char * altloc) +remove_dbdirs(const char * nominal_loc, const char * alt_loc) { + const char *target_dir; char buf[MAXPGPATH + 100]; bool success = true; - if (altloc) + target_dir = alt_loc ? alt_loc : nominal_loc; + + /* + * Close virtual file descriptors so the kernel has more available for + * the system() call below. + */ + closeAllVfds(); + + if (alt_loc) + { /* remove symlink */ - if (unlink(real_loc) != 0) + if (unlink(nominal_loc) != 0) { - elog(NOTICE, "could not remove '%s': %s", real_loc, strerror(errno)); + elog(NOTICE, "could not remove '%s': %m", nominal_loc); success = false; } + } + + snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir); - snprintf(buf, sizeof(buf), "rm -rf '%s'", altloc ? altloc : real_loc); if (system(buf) != 0 && errno != ECHILD) { elog(NOTICE, "database directory '%s' could not be removed", - altloc ? altloc : real_loc); + target_dir); success = false; } |