diff options
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r-- | src/backend/commands/dbcommands.c | 769 |
1 files changed, 659 insertions, 110 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 623e5ec7789..df16533901e 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -63,13 +63,31 @@ #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/pg_locale.h" +#include "utils/relmapper.h" #include "utils/snapmgr.h" #include "utils/syscache.h" +/* + * Create database strategy. + * + * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each + * copied block. + * + * CREATEDB_FILE_COPY will simply perform a file system level copy of the + * database and log a single record for each tablespace copied. To make this + * safe, it also triggers checkpoints before and after the operation. + */ +typedef enum CreateDBStrategy +{ + CREATEDB_WAL_LOG, + CREATEDB_FILE_COPY +} CreateDBStrategy; + typedef struct { Oid src_dboid; /* source (template) DB */ Oid dest_dboid; /* DB we are trying to create */ + CreateDBStrategy strategy; /* create db strategy */ } createdb_failure_params; typedef struct @@ -78,6 +96,17 @@ typedef struct Oid dest_tsoid; /* tablespace we are trying to move to */ } movedb_failure_params; +/* + * Information about a relation to be copied when creating a database. + */ +typedef struct CreateDBRelInfo +{ + RelFileNode rnode; /* physical relation identifier */ + Oid reloid; /* relation oid */ + bool permanent; /* relation is permanent or unlogged */ +} CreateDBRelInfo; + + /* non-export function prototypes */ static void createdb_failure_callback(int code, Datum arg); static void movedb(const char *dbname, const char *tblspcname); @@ -93,7 +122,546 @@ static bool have_createdb_privilege(void); static void remove_dbtablespaces(Oid db_id); static bool check_db_file_conflict(Oid db_id); static int errdetail_busy_db(int notherbackends, int npreparedxacts); +static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dboid, Oid src_tsid, + Oid dst_tsid); +static List *ScanSourceDatabasePgClass(Oid srctbid, Oid srcdbid, char *srcpath); +static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, + Oid dbid, char *srcpath, + List *rnodelist, Snapshot snapshot); +static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, + Oid tbid, Oid dbid, + char *srcpath); +static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, + bool isRedo); +static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dboid, Oid src_tsid, + Oid dst_tsid); + +/* + * Create a new database using the WAL_LOG strategy. + * + * Each copied block is separately written to the write-ahead log. + */ +static void +CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid, + Oid src_tsid, Oid dst_tsid) +{ + char *srcpath; + char *dstpath; + List *rnodelist = NULL; + ListCell *cell; + LockRelId srcrelid; + LockRelId dstrelid; + RelFileNode srcrnode; + RelFileNode dstrnode; + CreateDBRelInfo *relinfo; + + /* Get source and destination database paths. */ + srcpath = GetDatabasePath(src_dboid, src_tsid); + dstpath = GetDatabasePath(dst_dboid, dst_tsid); + + /* Create database directory and write PG_VERSION file. */ + CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false); + + /* Copy relmap file from source database to the destination database. */ + RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath); + + /* Get list of relfilenodes to copy from the source database. */ + rnodelist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath); + Assert(rnodelist != NIL); + + /* + * Database IDs will be the same for all relations so set them before + * entering the loop. + */ + srcrelid.dbId = src_dboid; + dstrelid.dbId = dst_dboid; + + /* Loop over our list of relfilenodes and copy each one. */ + foreach(cell, rnodelist) + { + relinfo = lfirst(cell); + srcrnode = relinfo->rnode; + + /* + * If the relation is from the source db's default tablespace then we + * need to create it in the destinations db's default tablespace. + * Otherwise, we need to create in the same tablespace as it is in the + * source database. + */ + if (srcrnode.spcNode == src_tsid) + dstrnode.spcNode = dst_tsid; + else + dstrnode.spcNode = srcrnode.spcNode; + + dstrnode.dbNode = dst_dboid; + dstrnode.relNode = srcrnode.relNode; + + /* + * Acquire locks on source and target relations before copying. + * + * We typically do not read relation data into shared_buffers without + * holding a relation lock. It's unclear what could go wrong if we + * skipped it in this case, because nobody can be modifying either + * the source or destination database at this point, and we have locks + * on both databases, too, but let's take the conservative route. + */ + dstrelid.relId = srcrelid.relId = relinfo->reloid; + LockRelationId(&srcrelid, AccessShareLock); + LockRelationId(&dstrelid, AccessShareLock); + + /* Copy relation storage from source to the destination. */ + CreateAndCopyRelationData(srcrnode, dstrnode, relinfo->permanent); + + /* Release the relation locks. */ + UnlockRelationId(&srcrelid, AccessShareLock); + UnlockRelationId(&dstrelid, AccessShareLock); + } + + list_free_deep(rnodelist); +} + +/* + * Scan the pg_class table in the source database to identify the relations + * that need to be copied to the destination database. + * + * This is an exception to the usual rule that cross-database access is + * not possible. We can make it work here because we know that there are no + * connections to the source database and (since there can't be prepared + * transactions touching that database) no in-doubt tuples either. This + * means that we don't need to worry about pruning removing anything from + * under us, and we don't need to be too picky about our snapshot either. + * As long as it sees all previously-committed XIDs as committed and all + * aborted XIDs as aborted, we should be fine: nothing else is possible + * here. + * + * We can't rely on the relcache for anything here, because that only knows + * about the database to which we are connected, and can't handle access to + * other databases. That also means we can't rely on the heap scan + * infrastructure, which would be a bad idea anyway since it might try + * to do things like HOT pruning which we definitely can't do safely in + * a database to which we're not even connected. + */ +static List * +ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath) +{ + RelFileNode rnode; + BlockNumber nblocks; + BlockNumber blkno; + Buffer buf; + Oid relfilenode; + Page page; + List *rnodelist = NIL; + LockRelId relid; + Relation rel; + Snapshot snapshot; + BufferAccessStrategy bstrategy; + + /* Get pg_class relfilenode. */ + relfilenode = RelationMapOidToFilenodeForDatabase(srcpath, + RelationRelationId); + + /* Don't read data into shared_buffers without holding a relation lock. */ + relid.dbId = dbid; + relid.relId = RelationRelationId; + LockRelationId(&relid, AccessShareLock); + + /* Prepare a RelFileNode for the pg_class relation. */ + rnode.spcNode = tbid; + rnode.dbNode = dbid; + rnode.relNode = relfilenode; + + /* + * We can't use a real relcache entry for a relation in some other + * database, but since we're only going to access the fields related + * to physical storage, a fake one is good enough. If we didn't do this + * and used the smgr layer directly, we would have to worry about + * invalidations. + */ + rel = CreateFakeRelcacheEntry(rnode); + nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); + FreeFakeRelcacheEntry(rel); + + /* Use a buffer access strategy since this is a bulk read operation. */ + bstrategy = GetAccessStrategy(BAS_BULKREAD); + + /* + * As explained in the function header comments, we need a snapshot that + * will see all committed transactions as committed, and our transaction + * snapshot - or the active snapshot - might not be new enough for that, + * but the return value of GetLatestSnapshot() should work fine. + */ + snapshot = GetLatestSnapshot(); + + /* Process the relation block by block. */ + for (blkno = 0; blkno < nblocks; blkno++) + { + CHECK_FOR_INTERRUPTS(); + + buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno, + RBM_NORMAL, bstrategy, false); + + LockBuffer(buf, BUFFER_LOCK_SHARE); + page = BufferGetPage(buf); + if (PageIsNew(page) || PageIsEmpty(page)) + { + UnlockReleaseBuffer(buf); + continue; + } + + /* Append relevant pg_class tuples for current page to rnodelist. */ + rnodelist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid, + srcpath, rnodelist, + snapshot); + + UnlockReleaseBuffer(buf); + } + + /* Release relation lock. */ + UnlockRelationId(&relid, AccessShareLock); + + return rnodelist; +} + +/* + * Scan one page of the source database's pg_class relation and add relevant + * entries to rnodelist. The return value is the updated list. + */ +static List * +ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid, + char *srcpath, List *rnodelist, + Snapshot snapshot) +{ + BlockNumber blkno = BufferGetBlockNumber(buf); + OffsetNumber offnum; + OffsetNumber maxoff; + HeapTupleData tuple; + + maxoff = PageGetMaxOffsetNumber(page); + + /* Loop over offsets. */ + for (offnum = FirstOffsetNumber; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + ItemId itemid; + + itemid = PageGetItemId(page, offnum); + + /* Nothing to do if slot is empty or already dead. */ + if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) || + ItemIdIsRedirected(itemid)) + continue; + + Assert(ItemIdIsNormal(itemid)); + ItemPointerSet(&(tuple.t_self), blkno, offnum); + + /* Initialize a HeapTupleData structure. */ + tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); + tuple.t_len = ItemIdGetLength(itemid); + tuple.t_tableOid = RelationRelationId; + + /* Skip tuples that are not visible to this snapshot. */ + if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf)) + { + CreateDBRelInfo *relinfo; + + /* + * ScanSourceDatabasePgClassTuple is in charge of constructing + * a CreateDBRelInfo object for this tuple, but can also decide + * that this tuple isn't something we need to copy. If we do need + * to copy the relation, add it to the list. + */ + relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid, + srcpath); + if (relinfo != NULL) + rnodelist = lappend(rnodelist, relinfo); + } + } + return rnodelist; +} + +/* + * Decide whether a certain pg_class tuple represents something that + * needs to be copied from the source database to the destination database, + * and if so, construct a CreateDBRelInfo for it. + * + * Visbility checks are handled by the caller, so our job here is just + * to assess the data stored in the tuple. + */ +CreateDBRelInfo * +ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid, + char *srcpath) +{ + CreateDBRelInfo *relinfo; + Form_pg_class classForm; + Oid relfilenode = InvalidOid; + + classForm = (Form_pg_class) GETSTRUCT(tuple); + + /* + * Return NULL if this object does not need to be copied. + * + * Shared objects don't need to be copied, because they are shared. + * Objects without storage can't be copied, because there's nothing to + * copy. Temporary relations don't need to be copied either, because + * they are inaccessible outside of the session that created them, + * which must be gone already, and couldn't connect to a different database + * if it still existed. autovacuum will eventually remove the pg_class + * entries as well. + */ + if (classForm->reltablespace == GLOBALTABLESPACE_OID || + !RELKIND_HAS_STORAGE(classForm->relkind) || + classForm->relpersistence == RELPERSISTENCE_TEMP) + return NULL; + + /* + * If relfilenode is valid then directly use it. Otherwise, consult the + * relmap. + */ + if (OidIsValid(classForm->relfilenode)) + relfilenode = classForm->relfilenode; + else + relfilenode = RelationMapOidToFilenodeForDatabase(srcpath, + classForm->oid); + + /* We must have a valid relfilenode oid. */ + if (!OidIsValid(relfilenode)) + elog(ERROR, "relation with OID %u does not have a valid relfilenode", + classForm->oid); + + /* Prepare a rel info element and add it to the list. */ + relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo)); + if (OidIsValid(classForm->reltablespace)) + relinfo->rnode.spcNode = classForm->reltablespace; + else + relinfo->rnode.spcNode = tbid; + + relinfo->rnode.dbNode = dbid; + relinfo->rnode.relNode = relfilenode; + relinfo->reloid = classForm->oid; + + /* Temporary relations were rejected above. */ + Assert(classForm->relpersistence != RELPERSISTENCE_TEMP); + relinfo->permanent = + (classForm->relpersistence == RELPERSISTENCE_PERMANENT) ? true : false; + + return relinfo; +} + +/* + * Create database directory and write out the PG_VERSION file in the database + * path. If isRedo is true, it's okay for the database directory to exist + * already. + */ +static void +CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo) +{ + int fd; + int nbytes; + char versionfile[MAXPGPATH]; + char buf[16]; + + /* + * Prepare version data before starting a critical section. + * + * Note that we don't have to copy this from the source database; there's + * only one legal value. + */ + sprintf(buf, "%s\n", PG_MAJORVERSION); + nbytes = strlen(PG_MAJORVERSION) + 1; + + /* If we are not in WAL replay then write the WAL. */ + if (!isRedo) + { + xl_dbase_create_wal_log_rec xlrec; + XLogRecPtr lsn; + + START_CRIT_SECTION(); + + xlrec.db_id = dbid; + xlrec.tablespace_id = tsid; + + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), + sizeof(xl_dbase_create_wal_log_rec)); + + lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG); + + /* As always, WAL must hit the disk before the data update does. */ + XLogFlush(lsn); + } + + /* Create database directory. */ + if (MakePGDirectory(dbpath) < 0) + { + /* Failure other than already exists or not in WAL replay? */ + if (errno != EEXIST || !isRedo) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", dbpath))); + } + + /* + * Create PG_VERSION file in the database path. If the file already + * exists and we are in WAL replay then try again to open it in write + * mode. + */ + snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION"); + + fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY); + if (fd < 0 && errno == EEXIST && isRedo) + fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY); + + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", versionfile))); + + /* Write PG_MAJORVERSION in the PG_VERSION file. */ + pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE); + errno = 0; + if ((int) write(fd, buf, nbytes) != nbytes) + { + /* If write didn't set errno, assume problem is no disk space. */ + if (errno == 0) + errno = ENOSPC; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", versionfile))); + } + pgstat_report_wait_end(); + + /* Close the version file. */ + CloseTransientFile(fd); + + /* Critical section done. */ + if (!isRedo) + END_CRIT_SECTION(); +} + +/* + * Create a new database using the FILE_COPY strategy. + * + * Copy each tablespace at the filesystem level, and log a single WAL record + * for each tablespace copied. This requires a checkpoint before and after the + * copy, which may be expensive, but it does greatly reduce WAL generation + * if the copied database is large. + */ +static void +CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid, + Oid dst_tsid) +{ + TableScanDesc scan; + Relation rel; + HeapTuple tuple; + + /* + * Force a checkpoint before starting the copy. This will force all dirty + * buffers, including those of unlogged tables, out to disk, to ensure + * source database is up-to-date on disk for the copy. + * FlushDatabaseBuffers() would suffice for that, but we also want to + * process any pending unlink requests. Otherwise, if a checkpoint + * happened while we're copying files, a file might be deleted just when + * we're about to copy it, causing the lstat() call in copydir() to fail + * with ENOENT. + */ + RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | + CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL); + + /* + * Iterate through all tablespaces of the template database, and copy each + * one to the new database. + */ + rel = table_open(TableSpaceRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple); + Oid srctablespace = spaceform->oid; + Oid dsttablespace; + char *srcpath; + char *dstpath; + struct stat st; + + /* No need to copy global tablespace */ + if (srctablespace == GLOBALTABLESPACE_OID) + continue; + + srcpath = GetDatabasePath(src_dboid, srctablespace); + + if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) || + directory_is_empty(srcpath)) + { + /* Assume we can ignore it */ + pfree(srcpath); + continue; + } + + if (srctablespace == src_tsid) + dsttablespace = dst_tsid; + else + dsttablespace = srctablespace; + + dstpath = GetDatabasePath(dst_dboid, dsttablespace); + + /* + * Copy this subdirectory to the new location + * + * We don't need to copy subdirectories + */ + copydir(srcpath, dstpath, false); + + /* Record the filesystem change in XLOG */ + { + xl_dbase_create_file_copy_rec xlrec; + + xlrec.db_id = dst_dboid; + xlrec.tablespace_id = dsttablespace; + xlrec.src_db_id = src_dboid; + xlrec.src_tablespace_id = srctablespace; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, + sizeof(xl_dbase_create_file_copy_rec)); + + (void) XLogInsert(RM_DBASE_ID, + XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE); + } + } + table_endscan(scan); + table_close(rel, AccessShareLock); + + /* + * We force a checkpoint before committing. This effectively means that + * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be + * replayed (at least not in ordinary crash recovery; we still have to + * make the XLOG entry for the benefit of PITR operations). This avoids + * two nasty scenarios: + * + * #1: When PITR is off, we don't XLOG the contents of newly created + * indexes; therefore the drop-and-recreate-whole-directory behavior of + * DBASE_CREATE replay would lose such indexes. + * + * #2: Since we have to recopy the source database during DBASE_CREATE + * replay, we run the risk of copying changes in it that were committed + * after the original CREATE DATABASE command but before the system crash + * that led to the replay. This is at least unexpected and at worst could + * lead to inconsistencies, eg duplicate table names. + * + * (Both of these were real bugs in releases 8.0 through 8.0.3.) + * + * In PITR replay, the first of these isn't an issue, and the second is + * only a risk if the CREATE DATABASE and subsequent template database + * change both occur while a base backup is being taken. There doesn't + * seem to be much we can do about that except document it as a + * limitation. + * + * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE + * strategy that avoids these problems. + */ + RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); +} /* * CREATE DATABASE @@ -101,8 +669,6 @@ static int errdetail_busy_db(int notherbackends, int npreparedxacts); Oid createdb(ParseState *pstate, const CreatedbStmt *stmt) { - TableScanDesc scan; - Relation rel; Oid src_dboid; Oid src_owner; int src_encoding = -1; @@ -137,6 +703,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) DefElem *dallowconnections = NULL; DefElem *dconnlimit = NULL; DefElem *dcollversion = NULL; + DefElem *dstrategy = NULL; char *dbname = stmt->dbname; char *dbowner = NULL; const char *dbtemplate = NULL; @@ -152,6 +719,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) char *dbcollversion = NULL; int notherbackends; int npreparedxacts; + CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG; createdb_failure_params fparms; /* Extract options from the statement node tree */ @@ -269,6 +837,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) (errcode(ERRCODE_INVALID_PARAMETER_VALUE)), errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId)); } + else if (strcmp(defel->defname, "strategy") == 0) + { + if (dstrategy) + errorConflictingDefElem(defel, pstate); + dstrategy = defel; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -413,6 +987,23 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) dbtemplate))); } + /* Validate the database creation strategy. */ + if (dstrategy && dstrategy->arg) + { + char *strategy; + + strategy = defGetString(dstrategy); + if (strcmp(strategy, "wal_log") == 0) + dbstrategy = CREATEDB_WAL_LOG; + else if (strcmp(strategy, "file_copy") == 0) + dbstrategy = CREATEDB_FILE_COPY; + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid create database strategy %s", strategy), + errhint("Valid strategies are \"wal_log\", and \"file_copy\"."))); + } + /* If encoding or locales are defaulted, use source's setting */ if (encoding < 0) encoding = src_encoding; @@ -753,17 +1344,18 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0); /* - * Force a checkpoint before starting the copy. This will force all dirty - * buffers, including those of unlogged tables, out to disk, to ensure - * source database is up-to-date on disk for the copy. - * FlushDatabaseBuffers() would suffice for that, but we also want to - * process any pending unlink requests. Otherwise, if a checkpoint - * happened while we're copying files, a file might be deleted just when - * we're about to copy it, causing the lstat() call in copydir() to fail - * with ENOENT. + * If we're going to be reading data for the to-be-created database + * into shared_buffers, take a lock on it. Nobody should know that this + * database exists yet, but it's good to maintain the invariant that a + * lock an AccessExclusiveLock on the database is sufficient to drop all + * of its buffers without worrying about more being read later. + * + * Note that we need to do this before entering the PG_ENSURE_ERROR_CLEANUP + * block below, because createdb_failure_callback expects this lock to + * be held already. */ - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT - | CHECKPOINT_FLUSH_ALL); + if (dbstrategy == CREATEDB_WAL_LOG) + LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock); /* * Once we start copying subdirectories, we need to be able to clean 'em @@ -774,101 +1366,24 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt) */ fparms.src_dboid = src_dboid; fparms.dest_dboid = dboid; + fparms.strategy = dbstrategy; + PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback, PointerGetDatum(&fparms)); { /* - * Iterate through all tablespaces of the template database, and copy - * each one to the new database. - */ - rel = table_open(TableSpaceRelationId, AccessShareLock); - scan = table_beginscan_catalog(rel, 0, NULL); - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple); - Oid srctablespace = spaceform->oid; - Oid dsttablespace; - char *srcpath; - char *dstpath; - struct stat st; - - /* No need to copy global tablespace */ - if (srctablespace == GLOBALTABLESPACE_OID) - continue; - - srcpath = GetDatabasePath(src_dboid, srctablespace); - - if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) || - directory_is_empty(srcpath)) - { - /* Assume we can ignore it */ - pfree(srcpath); - continue; - } - - if (srctablespace == src_deftablespace) - dsttablespace = dst_deftablespace; - else - dsttablespace = srctablespace; - - dstpath = GetDatabasePath(dboid, dsttablespace); - - /* - * Copy this subdirectory to the new location - * - * We don't need to copy subdirectories - */ - copydir(srcpath, dstpath, false); - - /* Record the filesystem change in XLOG */ - { - xl_dbase_create_rec xlrec; - - xlrec.db_id = dboid; - xlrec.tablespace_id = dsttablespace; - xlrec.src_db_id = src_dboid; - xlrec.src_tablespace_id = srctablespace; - - XLogBeginInsert(); - XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec)); - - (void) XLogInsert(RM_DBASE_ID, - XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); - } - } - table_endscan(scan); - table_close(rel, AccessShareLock); - - /* - * We force a checkpoint before committing. This effectively means - * that committed XLOG_DBASE_CREATE operations will never need to be - * replayed (at least not in ordinary crash recovery; we still have to - * make the XLOG entry for the benefit of PITR operations). This - * avoids two nasty scenarios: - * - * #1: When PITR is off, we don't XLOG the contents of newly created - * indexes; therefore the drop-and-recreate-whole-directory behavior - * of DBASE_CREATE replay would lose such indexes. - * - * #2: Since we have to recopy the source database during DBASE_CREATE - * replay, we run the risk of copying changes in it that were - * committed after the original CREATE DATABASE command but before the - * system crash that led to the replay. This is at least unexpected - * and at worst could lead to inconsistencies, eg duplicate table - * names. - * - * (Both of these were real bugs in releases 8.0 through 8.0.3.) - * - * In PITR replay, the first of these isn't an issue, and the second - * is only a risk if the CREATE DATABASE and subsequent template - * database change both occur while a base backup is being taken. - * There doesn't seem to be much we can do about that except document - * it as a limitation. - * - * Perhaps if we ever implement CREATE DATABASE in a less cheesy way, - * we can avoid this. + * If the user has asked to create a database with WAL_LOG strategy + * then call CreateDatabaseUsingWalLog, which will copy the database + * at the block level and it will WAL log each copied block. + * Otherwise, call CreateDatabaseUsingFileCopy that will copy the + * database file by file. */ - RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); + if (dbstrategy == CREATEDB_WAL_LOG) + CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace, + dst_deftablespace); + else + CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace, + dst_deftablespace); /* * Close pg_database, but keep lock till commit. @@ -955,6 +1470,25 @@ createdb_failure_callback(int code, Datum arg) createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg); /* + * If we were copying database at block levels then drop pages for the + * destination database that are in the shared buffer cache. And tell + * checkpointer to forget any pending fsync and unlink requests for files + * in the database. The reasoning behind doing this is same as explained + * in dropdb function. But unlike dropdb we don't need to call + * pgstat_drop_database because this database is still not created so + * there should not be any stat for this. + */ + if (fparms->strategy == CREATEDB_WAL_LOG) + { + DropDatabaseBuffers(fparms->dest_dboid); + ForgetDatabaseSyncRequests(fparms->dest_dboid); + + /* Release lock on the target database. */ + UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0, + AccessShareLock); + } + + /* * Release lock on source database before doing recursive remove. This is * not essential but it seems desirable to release the lock as soon as * possible. @@ -1478,7 +2012,7 @@ movedb(const char *dbname, const char *tblspcname) * Record the filesystem change in XLOG */ { - xl_dbase_create_rec xlrec; + xl_dbase_create_file_copy_rec xlrec; xlrec.db_id = db_id; xlrec.tablespace_id = dst_tblspcoid; @@ -1486,10 +2020,11 @@ movedb(const char *dbname, const char *tblspcname) xlrec.src_tablespace_id = src_tblspcoid; XLogBeginInsert(); - XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec)); + XLogRegisterData((char *) &xlrec, + sizeof(xl_dbase_create_file_copy_rec)); (void) XLogInsert(RM_DBASE_ID, - XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE); + XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE); } /* @@ -1525,9 +2060,10 @@ movedb(const char *dbname, const char *tblspcname) /* * Force another checkpoint here. As in CREATE DATABASE, this is to - * ensure that we don't have to replay a committed XLOG_DBASE_CREATE - * operation, which would cause us to lose any unlogged operations - * done in the new DB tablespace before the next checkpoint. + * ensure that we don't have to replay a committed + * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose + * any unlogged operations done in the new DB tablespace before the + * next checkpoint. */ RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT); @@ -2478,9 +3014,10 @@ dbase_redo(XLogReaderState *record) /* Backup blocks are not used in dbase records */ Assert(!XLogRecHasAnyBlockRefs(record)); - if (info == XLOG_DBASE_CREATE) + if (info == XLOG_DBASE_CREATE_FILE_COPY) { - xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record); + xl_dbase_create_file_copy_rec *xlrec = + (xl_dbase_create_file_copy_rec *) XLogRecGetData(record); char *src_path; char *dst_path; struct stat st; @@ -2515,6 +3052,18 @@ dbase_redo(XLogReaderState *record) */ copydir(src_path, dst_path, false); } + else if (info == XLOG_DBASE_CREATE_WAL_LOG) + { + xl_dbase_create_wal_log_rec *xlrec = + (xl_dbase_create_wal_log_rec *) XLogRecGetData(record); + char *dbpath; + + dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id); + + /* Create the database directory with the version file. */ + CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id, + true); + } else if (info == XLOG_DBASE_DROP) { xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record); |