aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/dbcommands.c
diff options
context:
space:
mode:
author Robert Haas <rhaas@postgresql.org> 2022-03-29 11:31:43 -0400
committer Robert Haas <rhaas@postgresql.org> 2022-03-29 11:48:36 -0400
commit 9c08aea6a3090a396be334cc58c511edab05776a (patch)
tree c15e6e9fa45a18173a5bbd67ff4a4c889e616cde /src/backend/commands/dbcommands.c
parent bf902c13930c268388644100663f2998868b6e85 (diff)
download postgresql-9c08aea6a3090a396be334cc58c511edab05776a.tar.gz
postgresql-9c08aea6a3090a396be334cc58c511edab05776a.zip
Add new block-by-block strategy for CREATE DATABASE.
Because this strategy logs changes on a block-by-block basis, it avoids the need to checkpoint before and after the operation. However, because it logs each changed block individually, it might generate a lot of extra write-ahead logging if the template database is large. Therefore, the older strategy remains available via a new STRATEGY parameter to CREATE DATABASE, and a corresponding --strategy option to createdb. Somewhat controversially, this patch assembles the list of relations to be copied to the new database by reading the pg_class relation of the template database. Cross-database access like this isn't normally possible, but it can be made to work here because there can't be any connections to the database being copied, nor can it contain any in-doubt transactions. Even so, we have to use lower-level interfaces than normal, since the table scan and relcache interfaces will not work for a database to which we're not connected. The advantage of this approach is that we do not need to rely on the filesystem to determine what ought to be copied, but instead on PostgreSQL's own knowledge of the database structure. This avoids, for example, copying stray files that happen to be located in the source database directory. Dilip Kumar, with a fairly large number of cosmetic changes by me. Reviewed and tested by Ashutosh Sharma, Andres Freund, John Naylor, Greg Nancarrow, Neha Sharma. Additional feedback from Bruce Momjian, Heikki Linnakangas, Julien Rouhaud, Adam Brusselback, Kyotaro Horiguchi, Tomas Vondra, Andrew Dunstan, Álvaro Herrera, and others. Discussion: http://postgr.es/m/CA+TgmoYtcdxBjLh31DLxUXHxFVMPGzrU5_T=CYCvRyFHywSBUQ@mail.gmail.com
Diffstat (limited to 'src/backend/commands/dbcommands.c')
-rw-r--r-- src/backend/commands/dbcommands.c 769
1 files changed, 659 insertions, 110 deletions
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 623e5ec7789..df16533901e 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -63,13 +63,31 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/pg_locale.h"
+#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
+/*
+ * Create database strategy.
+ *
+ * CREATEDB_WAL_LOG will copy the database at the block level and WAL log each
+ * copied block.
+ *
+ * CREATEDB_FILE_COPY will simply perform a file system level copy of the
+ * database and log a single record for each tablespace copied. To make this
+ * safe, it also triggers checkpoints before and after the operation.
+ *
+ * createdb() uses CREATEDB_WAL_LOG unless a STRATEGY option says otherwise.
+ */
+typedef enum CreateDBStrategy
+{
+ CREATEDB_WAL_LOG, /* block-by-block copy, each block WAL-logged */
+ CREATEDB_FILE_COPY /* file system copy bracketed by checkpoints */
+} CreateDBStrategy;
+
typedef struct
{
Oid src_dboid; /* source (template) DB */
Oid dest_dboid; /* DB we are trying to create */
+ CreateDBStrategy strategy; /* create db strategy */
} createdb_failure_params;
typedef struct
@@ -78,6 +96,17 @@ typedef struct
Oid dest_tsoid; /* tablespace we are trying to move to */
} movedb_failure_params;
+/*
+ * Information about a relation to be copied when creating a database.
+ *
+ * One of these is built by ScanSourceDatabasePgClassTuple() for every
+ * pg_class entry that has to be copied, and the resulting list is consumed
+ * by CreateDatabaseUsingWalLog().
+ */
+typedef struct CreateDBRelInfo
+{
+ RelFileNode rnode; /* physical relation identifier in the source DB */
+ Oid reloid; /* pg_class OID; also used to lock the relation */
+ bool permanent; /* true if permanent, false if unlogged */
+} CreateDBRelInfo;
+
+
/* non-export function prototypes */
static void createdb_failure_callback(int code, Datum arg);
static void movedb(const char *dbname, const char *tblspcname);
@@ -93,7 +122,546 @@ static bool have_createdb_privilege(void);
static void remove_dbtablespaces(Oid db_id);
static bool check_db_file_conflict(Oid db_id);
static int errdetail_busy_db(int notherbackends, int npreparedxacts);
+static void CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
+                                      Oid src_tsid, Oid dst_tsid);
+static List *ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath);
+static List *ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid,
+                                           Oid dbid, char *srcpath,
+                                           List *rnodelist, Snapshot snapshot);
+static CreateDBRelInfo *ScanSourceDatabasePgClassTuple(HeapTupleData *tuple,
+                                                       Oid tbid, Oid dbid,
+                                                       char *srcpath);
+static void CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid,
+                                    bool isRedo);
+static void CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid,
+                                        Oid src_tsid, Oid dst_tsid);
+
+/*
+ * Create a new database using the WAL_LOG strategy.
+ *
+ * Each copied block is separately written to the write-ahead log.
+ *
+ * src_dboid and dst_dboid identify the template database and the database
+ * being created; src_tsid and dst_tsid are their default tablespaces.
+ */
+static void
+CreateDatabaseUsingWalLog(Oid src_dboid, Oid dst_dboid,
+                          Oid src_tsid, Oid dst_tsid)
+{
+    char       *srcpath;
+    char       *dstpath;
+    List       *rnodelist = NIL;
+    ListCell   *cell;
+    LockRelId   srcrelid;
+    LockRelId   dstrelid;
+    RelFileNode srcrnode;
+    RelFileNode dstrnode;
+    CreateDBRelInfo *relinfo;
+
+    /* Get source and destination database paths. */
+    srcpath = GetDatabasePath(src_dboid, src_tsid);
+    dstpath = GetDatabasePath(dst_dboid, dst_tsid);
+
+    /* Create database directory and write PG_VERSION file. */
+    CreateDirAndVersionFile(dstpath, dst_dboid, dst_tsid, false);
+
+    /* Copy relmap file from source database to the destination database. */
+    RelationMapCopy(dst_dboid, dst_tsid, srcpath, dstpath);
+
+    /* Get list of relfilenodes to copy from the source database. */
+    rnodelist = ScanSourceDatabasePgClass(src_tsid, src_dboid, srcpath);
+    Assert(rnodelist != NIL);
+
+    /*
+     * Database IDs will be the same for all relations so set them before
+     * entering the loop.
+     */
+    srcrelid.dbId = src_dboid;
+    dstrelid.dbId = dst_dboid;
+
+    /* Loop over our list of relfilenodes and copy each one. */
+    foreach(cell, rnodelist)
+    {
+        relinfo = lfirst(cell);
+        srcrnode = relinfo->rnode;
+
+        /*
+         * If the relation is from the source db's default tablespace then we
+         * need to create it in the destination db's default tablespace.
+         * Otherwise, we need to create it in the same tablespace as it is in
+         * the source database.
+         */
+        if (srcrnode.spcNode == src_tsid)
+            dstrnode.spcNode = dst_tsid;
+        else
+            dstrnode.spcNode = srcrnode.spcNode;
+
+        dstrnode.dbNode = dst_dboid;
+        dstrnode.relNode = srcrnode.relNode;
+
+        /*
+         * Acquire locks on source and target relations before copying.
+         *
+         * We typically do not read relation data into shared_buffers without
+         * holding a relation lock. It's unclear what could go wrong if we
+         * skipped it in this case, because nobody can be modifying either
+         * the source or destination database at this point, and we have locks
+         * on both databases, too, but let's take the conservative route.
+         */
+        dstrelid.relId = srcrelid.relId = relinfo->reloid;
+        LockRelationId(&srcrelid, AccessShareLock);
+        LockRelationId(&dstrelid, AccessShareLock);
+
+        /* Copy relation storage from source to the destination. */
+        CreateAndCopyRelationData(srcrnode, dstrnode, relinfo->permanent);
+
+        /* Release the relation locks. */
+        UnlockRelationId(&srcrelid, AccessShareLock);
+        UnlockRelationId(&dstrelid, AccessShareLock);
+    }
+
+    /*
+     * Clean up. The path strings come from GetDatabasePath() and are ours to
+     * free, just like the list and the CreateDBRelInfo entries it holds.
+     */
+    pfree(srcpath);
+    pfree(dstpath);
+    list_free_deep(rnodelist);
+}
+
+/*
+ * Scan the pg_class table in the source database to identify the relations
+ * that need to be copied to the destination database.
+ *
+ * This is an exception to the usual rule that cross-database access is
+ * not possible. We can make it work here because we know that there are no
+ * connections to the source database and (since there can't be prepared
+ * transactions touching that database) no in-doubt tuples either. This
+ * means that we don't need to worry about pruning removing anything from
+ * under us, and we don't need to be too picky about our snapshot either.
+ * As long as it sees all previously-committed XIDs as committed and all
+ * aborted XIDs as aborted, we should be fine: nothing else is possible
+ * here.
+ *
+ * We can't rely on the relcache for anything here, because that only knows
+ * about the database to which we are connected, and can't handle access to
+ * other databases. That also means we can't rely on the heap scan
+ * infrastructure, which would be a bad idea anyway since it might try
+ * to do things like HOT pruning which we definitely can't do safely in
+ * a database to which we're not even connected.
+ *
+ * tbid and dbid are the default tablespace and the OID of the source
+ * database; srcpath is the source database path, which is handed to
+ * RelationMapOidToFilenodeForDatabase() to resolve mapped relations.
+ *
+ * Returns a List of CreateDBRelInfo pointers, one per relation to copy
+ * (NIL if no tuple qualified).
+ */
+static List *
+ScanSourceDatabasePgClass(Oid tbid, Oid dbid, char *srcpath)
+{
+ RelFileNode rnode;
+ BlockNumber nblocks;
+ BlockNumber blkno;
+ Buffer buf;
+ Oid relfilenode;
+ Page page;
+ List *rnodelist = NIL;
+ LockRelId relid;
+ Relation rel;
+ Snapshot snapshot;
+ BufferAccessStrategy bstrategy;
+
+ /* Get pg_class relfilenode. */
+ relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+ RelationRelationId);
+
+ /* Don't read data into shared_buffers without holding a relation lock. */
+ relid.dbId = dbid;
+ relid.relId = RelationRelationId;
+ LockRelationId(&relid, AccessShareLock);
+
+ /* Prepare a RelFileNode for the pg_class relation. */
+ rnode.spcNode = tbid;
+ rnode.dbNode = dbid;
+ rnode.relNode = relfilenode;
+
+ /*
+ * We can't use a real relcache entry for a relation in some other
+ * database, but since we're only going to access the fields related
+ * to physical storage, a fake one is good enough. If we didn't do this
+ * and used the smgr layer directly, we would have to worry about
+ * invalidations.
+ *
+ * The fake entry is only needed to learn the number of blocks in the
+ * main fork, so it is freed again right away.
+ */
+ rel = CreateFakeRelcacheEntry(rnode);
+ nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
+ FreeFakeRelcacheEntry(rel);
+
+ /* Use a buffer access strategy since this is a bulk read operation. */
+ bstrategy = GetAccessStrategy(BAS_BULKREAD);
+
+ /*
+ * As explained in the function header comments, we need a snapshot that
+ * will see all committed transactions as committed, and our transaction
+ * snapshot - or the active snapshot - might not be new enough for that,
+ * but the return value of GetLatestSnapshot() should work fine.
+ */
+ snapshot = GetLatestSnapshot();
+
+ /* Process the relation block by block. */
+ for (blkno = 0; blkno < nblocks; blkno++)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ buf = ReadBufferWithoutRelcache(rnode, MAIN_FORKNUM, blkno,
+ RBM_NORMAL, bstrategy, false);
+
+ /* Hold a share lock on the buffer while its tuples are examined. */
+ LockBuffer(buf, BUFFER_LOCK_SHARE);
+ page = BufferGetPage(buf);
+ if (PageIsNew(page) || PageIsEmpty(page))
+ {
+ UnlockReleaseBuffer(buf);
+ continue;
+ }
+
+ /* Append relevant pg_class tuples for current page to rnodelist. */
+ rnodelist = ScanSourceDatabasePgClassPage(page, buf, tbid, dbid,
+ srcpath, rnodelist,
+ snapshot);
+
+ UnlockReleaseBuffer(buf);
+ }
+
+ /* Release relation lock. */
+ UnlockRelationId(&relid, AccessShareLock);
+
+ return rnodelist;
+}
+
+/*
+ * Scan one page of the source database's pg_class relation and add relevant
+ * entries to rnodelist. The return value is the updated list.
+ *
+ * page is the content of buf, which ScanSourceDatabasePgClass() passes in
+ * share-locked. Only tuples visible to snapshot are considered. tbid, dbid
+ * and srcpath are passed through unchanged to
+ * ScanSourceDatabasePgClassTuple().
+ */
+static List *
+ScanSourceDatabasePgClassPage(Page page, Buffer buf, Oid tbid, Oid dbid,
+ char *srcpath, List *rnodelist,
+ Snapshot snapshot)
+{
+ BlockNumber blkno = BufferGetBlockNumber(buf);
+ OffsetNumber offnum;
+ OffsetNumber maxoff;
+ HeapTupleData tuple;
+
+ maxoff = PageGetMaxOffsetNumber(page);
+
+ /* Loop over offsets. */
+ for (offnum = FirstOffsetNumber;
+ offnum <= maxoff;
+ offnum = OffsetNumberNext(offnum))
+ {
+ ItemId itemid;
+
+ itemid = PageGetItemId(page, offnum);
+
+ /* Nothing to do if slot is empty, already dead, or a redirect. */
+ if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid) ||
+ ItemIdIsRedirected(itemid))
+ continue;
+
+ Assert(ItemIdIsNormal(itemid));
+ ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+ /* Initialize a HeapTupleData structure. */
+ tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+ tuple.t_len = ItemIdGetLength(itemid);
+ tuple.t_tableOid = RelationRelationId;
+
+ /* Skip tuples that are not visible to this snapshot. */
+ if (HeapTupleSatisfiesVisibility(&tuple, snapshot, buf))
+ {
+ CreateDBRelInfo *relinfo;
+
+ /*
+ * ScanSourceDatabasePgClassTuple is in charge of constructing
+ * a CreateDBRelInfo object for this tuple, but can also decide
+ * that this tuple isn't something we need to copy. If we do need
+ * to copy the relation, add it to the list.
+ */
+ relinfo = ScanSourceDatabasePgClassTuple(&tuple, tbid, dbid,
+ srcpath);
+ if (relinfo != NULL)
+ rnodelist = lappend(rnodelist, relinfo);
+ }
+ }
+ return rnodelist;
+}
+
+/*
+ * Decide whether a certain pg_class tuple represents something that
+ * needs to be copied from the source database to the destination database,
+ * and if so, construct a CreateDBRelInfo for it.
+ *
+ * Visibility checks are handled by the caller, so our job here is just
+ * to assess the data stored in the tuple.
+ *
+ * tbid and dbid are the source database's default tablespace and OID;
+ * srcpath is the source database path, used to look up relations whose
+ * pg_class.relfilenode is not set.
+ *
+ * Returns a palloc'd CreateDBRelInfo, or NULL if the relation does not need
+ * to be copied. Adding the result to a list is the caller's job.
+ */
+static CreateDBRelInfo *
+ScanSourceDatabasePgClassTuple(HeapTupleData *tuple, Oid tbid, Oid dbid,
+                               char *srcpath)
+{
+    CreateDBRelInfo *relinfo;
+    Form_pg_class classForm;
+    Oid         relfilenode;
+
+    classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+    /*
+     * Return NULL if this object does not need to be copied.
+     *
+     * Shared objects don't need to be copied, because they are shared.
+     * Objects without storage can't be copied, because there's nothing to
+     * copy. Temporary relations don't need to be copied either, because
+     * they are inaccessible outside of the session that created them,
+     * which must be gone already, and couldn't connect to a different database
+     * if it still existed. autovacuum will eventually remove the pg_class
+     * entries as well.
+     */
+    if (classForm->reltablespace == GLOBALTABLESPACE_OID ||
+        !RELKIND_HAS_STORAGE(classForm->relkind) ||
+        classForm->relpersistence == RELPERSISTENCE_TEMP)
+        return NULL;
+
+    /*
+     * If relfilenode is valid then directly use it. Otherwise, consult the
+     * relmap.
+     */
+    if (OidIsValid(classForm->relfilenode))
+        relfilenode = classForm->relfilenode;
+    else
+        relfilenode = RelationMapOidToFilenodeForDatabase(srcpath,
+                                                          classForm->oid);
+
+    /* We must have a valid relfilenode oid. */
+    if (!OidIsValid(relfilenode))
+        elog(ERROR, "relation with OID %u does not have a valid relfilenode",
+             classForm->oid);
+
+    /*
+     * Prepare a rel info element for the caller. A relation without an
+     * explicit tablespace lives in the database's default tablespace.
+     */
+    relinfo = (CreateDBRelInfo *) palloc(sizeof(CreateDBRelInfo));
+    if (OidIsValid(classForm->reltablespace))
+        relinfo->rnode.spcNode = classForm->reltablespace;
+    else
+        relinfo->rnode.spcNode = tbid;
+
+    relinfo->rnode.dbNode = dbid;
+    relinfo->rnode.relNode = relfilenode;
+    relinfo->reloid = classForm->oid;
+
+    /* Temporary relations were rejected above. */
+    Assert(classForm->relpersistence != RELPERSISTENCE_TEMP);
+    relinfo->permanent =
+        (classForm->relpersistence == RELPERSISTENCE_PERMANENT);
+
+    return relinfo;
+}
+
+/*
+ * Create database directory and write out the PG_VERSION file in the database
+ * path. If isRedo is true, it's okay for the database directory to exist
+ * already.
+ *
+ * dbpath is the directory to create; dbid and tsid are recorded in the
+ * XLOG_DBASE_CREATE_WAL_LOG record that is written (and flushed) first when
+ * we are not in WAL replay. In that case the directory and file creation
+ * run inside a critical section.
+ */
+static void
+CreateDirAndVersionFile(char *dbpath, Oid dbid, Oid tsid, bool isRedo)
+{
+    int         fd;
+    int         nbytes;
+    char        versionfile[MAXPGPATH];
+    char        buf[16];
+
+    /*
+     * Prepare version data before starting a critical section.
+     *
+     * Note that we don't have to copy this from the source database; there's
+     * only one legal value.
+     *
+     * Use snprintf so that the write into the fixed-size buffer is bounded.
+     */
+    snprintf(buf, sizeof(buf), "%s\n", PG_MAJORVERSION);
+    nbytes = strlen(PG_MAJORVERSION) + 1;
+
+    /* If we are not in WAL replay then write the WAL. */
+    if (!isRedo)
+    {
+        xl_dbase_create_wal_log_rec xlrec;
+        XLogRecPtr  lsn;
+
+        START_CRIT_SECTION();
+
+        xlrec.db_id = dbid;
+        xlrec.tablespace_id = tsid;
+
+        XLogBeginInsert();
+        XLogRegisterData((char *) (&xlrec),
+                         sizeof(xl_dbase_create_wal_log_rec));
+
+        lsn = XLogInsert(RM_DBASE_ID, XLOG_DBASE_CREATE_WAL_LOG);
+
+        /* As always, WAL must hit the disk before the data update does. */
+        XLogFlush(lsn);
+    }
+
+    /* Create database directory. */
+    if (MakePGDirectory(dbpath) < 0)
+    {
+        /* Failure other than already exists or not in WAL replay? */
+        if (errno != EEXIST || !isRedo)
+            ereport(ERROR,
+                    (errcode_for_file_access(),
+                     errmsg("could not create directory \"%s\": %m", dbpath)));
+    }
+
+    /*
+     * Create PG_VERSION file in the database path. If the file already
+     * exists and we are in WAL replay then try again to open it in write
+     * mode.
+     */
+    snprintf(versionfile, sizeof(versionfile), "%s/%s", dbpath, "PG_VERSION");
+
+    fd = OpenTransientFile(versionfile, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
+    if (fd < 0 && errno == EEXIST && isRedo)
+        fd = OpenTransientFile(versionfile, O_WRONLY | O_TRUNC | PG_BINARY);
+
+    if (fd < 0)
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not create file \"%s\": %m", versionfile)));
+
+    /* Write PG_MAJORVERSION in the PG_VERSION file. */
+    pgstat_report_wait_start(WAIT_EVENT_VERSION_FILE_WRITE);
+    errno = 0;
+    if ((int) write(fd, buf, nbytes) != nbytes)
+    {
+        /* If write didn't set errno, assume problem is no disk space. */
+        if (errno == 0)
+            errno = ENOSPC;
+        ereport(ERROR,
+                (errcode_for_file_access(),
+                 errmsg("could not write to file \"%s\": %m", versionfile)));
+    }
+    pgstat_report_wait_end();
+
+    /* Close the version file. */
+    CloseTransientFile(fd);
+
+    /* Critical section done. */
+    if (!isRedo)
+        END_CRIT_SECTION();
+}
+
+/*
+ * Create a new database using the FILE_COPY strategy.
+ *
+ * Copy each tablespace at the filesystem level, and log a single WAL record
+ * for each tablespace copied. This requires a checkpoint before and after the
+ * copy, which may be expensive, but it does greatly reduce WAL generation
+ * if the copied database is large.
+ *
+ * src_dboid and dst_dboid identify the template database and the database
+ * being created; src_tsid and dst_tsid are their default tablespaces.
+ */
+static void
+CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid,
+                            Oid dst_tsid)
+{
+    TableScanDesc scan;
+    Relation    rel;
+    HeapTuple   tuple;
+
+    /*
+     * Force a checkpoint before starting the copy. This will force all dirty
+     * buffers, including those of unlogged tables, out to disk, to ensure
+     * source database is up-to-date on disk for the copy.
+     * FlushDatabaseBuffers() would suffice for that, but we also want to
+     * process any pending unlink requests. Otherwise, if a checkpoint
+     * happened while we're copying files, a file might be deleted just when
+     * we're about to copy it, causing the lstat() call in copydir() to fail
+     * with ENOENT.
+     */
+    RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE |
+                      CHECKPOINT_WAIT | CHECKPOINT_FLUSH_ALL);
+
+    /*
+     * Iterate through all tablespaces of the template database, and copy each
+     * one to the new database.
+     */
+    rel = table_open(TableSpaceRelationId, AccessShareLock);
+    scan = table_beginscan_catalog(rel, 0, NULL);
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
+        Oid         srctablespace = spaceform->oid;
+        Oid         dsttablespace;
+        char       *srcpath;
+        char       *dstpath;
+        struct stat st;
+
+        /* No need to copy global tablespace */
+        if (srctablespace == GLOBALTABLESPACE_OID)
+            continue;
+
+        srcpath = GetDatabasePath(src_dboid, srctablespace);
+
+        if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
+            directory_is_empty(srcpath))
+        {
+            /* Assume we can ignore it */
+            pfree(srcpath);
+            continue;
+        }
+
+        /*
+         * Data in the source's default tablespace goes to the destination's
+         * default tablespace; everything else stays where it is.
+         */
+        if (srctablespace == src_tsid)
+            dsttablespace = dst_tsid;
+        else
+            dsttablespace = srctablespace;
+
+        dstpath = GetDatabasePath(dst_dboid, dsttablespace);
+
+        /*
+         * Copy this subdirectory to the new location
+         *
+         * We don't need to copy subdirectories
+         */
+        copydir(srcpath, dstpath, false);
+
+        /* Record the filesystem change in XLOG */
+        {
+            xl_dbase_create_file_copy_rec xlrec;
+
+            xlrec.db_id = dst_dboid;
+            xlrec.tablespace_id = dsttablespace;
+            xlrec.src_db_id = src_dboid;
+            xlrec.src_tablespace_id = srctablespace;
+
+            XLogBeginInsert();
+            XLogRegisterData((char *) &xlrec,
+                             sizeof(xl_dbase_create_file_copy_rec));
+
+            (void) XLogInsert(RM_DBASE_ID,
+                              XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
+        }
+
+        /* Don't accumulate one pair of path strings per tablespace. */
+        pfree(srcpath);
+        pfree(dstpath);
+    }
+    table_endscan(scan);
+    table_close(rel, AccessShareLock);
+
+    /*
+     * We force a checkpoint before committing. This effectively means that
+     * committed XLOG_DBASE_CREATE_FILE_COPY operations will never need to be
+     * replayed (at least not in ordinary crash recovery; we still have to
+     * make the XLOG entry for the benefit of PITR operations). This avoids
+     * two nasty scenarios:
+     *
+     * #1: When PITR is off, we don't XLOG the contents of newly created
+     * indexes; therefore the drop-and-recreate-whole-directory behavior of
+     * DBASE_CREATE replay would lose such indexes.
+     *
+     * #2: Since we have to recopy the source database during DBASE_CREATE
+     * replay, we run the risk of copying changes in it that were committed
+     * after the original CREATE DATABASE command but before the system crash
+     * that led to the replay. This is at least unexpected and at worst could
+     * lead to inconsistencies, eg duplicate table names.
+     *
+     * (Both of these were real bugs in releases 8.0 through 8.0.3.)
+     *
+     * In PITR replay, the first of these isn't an issue, and the second is
+     * only a risk if the CREATE DATABASE and subsequent template database
+     * change both occur while a base backup is being taken. There doesn't
+     * seem to be much we can do about that except document it as a
+     * limitation.
+     *
+     * See CreateDatabaseUsingWalLog() for a less cheesy CREATE DATABASE
+     * strategy that avoids these problems.
+     */
+    RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+}
/*
* CREATE DATABASE
@@ -101,8 +669,6 @@ static int errdetail_busy_db(int notherbackends, int npreparedxacts);
Oid
createdb(ParseState *pstate, const CreatedbStmt *stmt)
{
- TableScanDesc scan;
- Relation rel;
Oid src_dboid;
Oid src_owner;
int src_encoding = -1;
@@ -137,6 +703,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
DefElem *dallowconnections = NULL;
DefElem *dconnlimit = NULL;
DefElem *dcollversion = NULL;
+ DefElem *dstrategy = NULL;
char *dbname = stmt->dbname;
char *dbowner = NULL;
const char *dbtemplate = NULL;
@@ -152,6 +719,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
char *dbcollversion = NULL;
int notherbackends;
int npreparedxacts;
+ CreateDBStrategy dbstrategy = CREATEDB_WAL_LOG;
createdb_failure_params fparms;
/* Extract options from the statement node tree */
@@ -269,6 +837,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE)),
errmsg("OIDs less than %u are reserved for system objects", FirstNormalObjectId));
}
+ else if (strcmp(defel->defname, "strategy") == 0)
+ {
+ if (dstrategy)
+ errorConflictingDefElem(defel, pstate);
+ dstrategy = defel;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -413,6 +987,23 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
dbtemplate)));
}
+ /* Validate the database creation strategy. */
+ if (dstrategy && dstrategy->arg)
+ {
+ char *strategy;
+
+ strategy = defGetString(dstrategy);
+ if (strcmp(strategy, "wal_log") == 0)
+ dbstrategy = CREATEDB_WAL_LOG;
+ else if (strcmp(strategy, "file_copy") == 0)
+ dbstrategy = CREATEDB_FILE_COPY;
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid create database strategy %s", strategy),
+ errhint("Valid strategies are \"wal_log\", and \"file_copy\".")));
+ }
+
/* If encoding or locales are defaulted, use source's setting */
if (encoding < 0)
encoding = src_encoding;
@@ -753,17 +1344,18 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
/*
- * Force a checkpoint before starting the copy. This will force all dirty
- * buffers, including those of unlogged tables, out to disk, to ensure
- * source database is up-to-date on disk for the copy.
- * FlushDatabaseBuffers() would suffice for that, but we also want to
- * process any pending unlink requests. Otherwise, if a checkpoint
- * happened while we're copying files, a file might be deleted just when
- * we're about to copy it, causing the lstat() call in copydir() to fail
- * with ENOENT.
+ * If we're going to be reading data for the to-be-created database
+ * into shared_buffers, take a lock on it. Nobody should know that this
+ * database exists yet, but it's good to maintain the invariant that an
+ * AccessExclusiveLock on the database is sufficient to drop all
+ * of its buffers without worrying about more being read later.
+ *
+ * Note that we need to do this before entering the PG_ENSURE_ERROR_CLEANUP
+ * block below, because createdb_failure_callback expects this lock to
+ * be held already.
*/
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
- | CHECKPOINT_FLUSH_ALL);
+ if (dbstrategy == CREATEDB_WAL_LOG)
+ LockSharedObject(DatabaseRelationId, dboid, 0, AccessShareLock);
/*
* Once we start copying subdirectories, we need to be able to clean 'em
@@ -774,101 +1366,24 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
*/
fparms.src_dboid = src_dboid;
fparms.dest_dboid = dboid;
+ fparms.strategy = dbstrategy;
+
PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
PointerGetDatum(&fparms));
{
/*
- * Iterate through all tablespaces of the template database, and copy
- * each one to the new database.
- */
- rel = table_open(TableSpaceRelationId, AccessShareLock);
- scan = table_beginscan_catalog(rel, 0, NULL);
- while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
- {
- Form_pg_tablespace spaceform = (Form_pg_tablespace) GETSTRUCT(tuple);
- Oid srctablespace = spaceform->oid;
- Oid dsttablespace;
- char *srcpath;
- char *dstpath;
- struct stat st;
-
- /* No need to copy global tablespace */
- if (srctablespace == GLOBALTABLESPACE_OID)
- continue;
-
- srcpath = GetDatabasePath(src_dboid, srctablespace);
-
- if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
- directory_is_empty(srcpath))
- {
- /* Assume we can ignore it */
- pfree(srcpath);
- continue;
- }
-
- if (srctablespace == src_deftablespace)
- dsttablespace = dst_deftablespace;
- else
- dsttablespace = srctablespace;
-
- dstpath = GetDatabasePath(dboid, dsttablespace);
-
- /*
- * Copy this subdirectory to the new location
- *
- * We don't need to copy subdirectories
- */
- copydir(srcpath, dstpath, false);
-
- /* Record the filesystem change in XLOG */
- {
- xl_dbase_create_rec xlrec;
-
- xlrec.db_id = dboid;
- xlrec.tablespace_id = dsttablespace;
- xlrec.src_db_id = src_dboid;
- xlrec.src_tablespace_id = srctablespace;
-
- XLogBeginInsert();
- XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
-
- (void) XLogInsert(RM_DBASE_ID,
- XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
- }
- }
- table_endscan(scan);
- table_close(rel, AccessShareLock);
-
- /*
- * We force a checkpoint before committing. This effectively means
- * that committed XLOG_DBASE_CREATE operations will never need to be
- * replayed (at least not in ordinary crash recovery; we still have to
- * make the XLOG entry for the benefit of PITR operations). This
- * avoids two nasty scenarios:
- *
- * #1: When PITR is off, we don't XLOG the contents of newly created
- * indexes; therefore the drop-and-recreate-whole-directory behavior
- * of DBASE_CREATE replay would lose such indexes.
- *
- * #2: Since we have to recopy the source database during DBASE_CREATE
- * replay, we run the risk of copying changes in it that were
- * committed after the original CREATE DATABASE command but before the
- * system crash that led to the replay. This is at least unexpected
- * and at worst could lead to inconsistencies, eg duplicate table
- * names.
- *
- * (Both of these were real bugs in releases 8.0 through 8.0.3.)
- *
- * In PITR replay, the first of these isn't an issue, and the second
- * is only a risk if the CREATE DATABASE and subsequent template
- * database change both occur while a base backup is being taken.
- * There doesn't seem to be much we can do about that except document
- * it as a limitation.
- *
- * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
- * we can avoid this.
+ * If the user has asked to create a database with WAL_LOG strategy
+ * then call CreateDatabaseUsingWalLog, which will copy the database
+ * at the block level and it will WAL log each copied block.
+ * Otherwise, call CreateDatabaseUsingFileCopy that will copy the
+ * database file by file.
*/
- RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
+ if (dbstrategy == CREATEDB_WAL_LOG)
+ CreateDatabaseUsingWalLog(src_dboid, dboid, src_deftablespace,
+ dst_deftablespace);
+ else
+ CreateDatabaseUsingFileCopy(src_dboid, dboid, src_deftablespace,
+ dst_deftablespace);
/*
* Close pg_database, but keep lock till commit.
@@ -955,6 +1470,25 @@ createdb_failure_callback(int code, Datum arg)
createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
/*
+ * If we were copying the database at the block level, drop any pages for
+ * the destination database that are in the shared buffer cache, and tell
+ * the checkpointer to forget any pending fsync and unlink requests for
+ * files in that database. The reasoning is the same as explained in
+ * dropdb(). Unlike dropdb(), we don't need to call pgstat_drop_database(),
+ * because this database has not been created yet, so there should not be
+ * any stats for it.
+ */
+ if (fparms->strategy == CREATEDB_WAL_LOG)
+ {
+ DropDatabaseBuffers(fparms->dest_dboid);
+ ForgetDatabaseSyncRequests(fparms->dest_dboid);
+
+ /* Release lock on the target database. */
+ UnlockSharedObject(DatabaseRelationId, fparms->dest_dboid, 0,
+ AccessShareLock);
+ }
+
+ /*
* Release lock on source database before doing recursive remove. This is
* not essential but it seems desirable to release the lock as soon as
* possible.
@@ -1478,7 +2012,7 @@ movedb(const char *dbname, const char *tblspcname)
* Record the filesystem change in XLOG
*/
{
- xl_dbase_create_rec xlrec;
+ xl_dbase_create_file_copy_rec xlrec;
xlrec.db_id = db_id;
xlrec.tablespace_id = dst_tblspcoid;
@@ -1486,10 +2020,11 @@ movedb(const char *dbname, const char *tblspcname)
xlrec.src_tablespace_id = src_tblspcoid;
XLogBeginInsert();
- XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
+ XLogRegisterData((char *) &xlrec,
+ sizeof(xl_dbase_create_file_copy_rec));
(void) XLogInsert(RM_DBASE_ID,
- XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
+ XLOG_DBASE_CREATE_FILE_COPY | XLR_SPECIAL_REL_UPDATE);
}
/*
@@ -1525,9 +2060,10 @@ movedb(const char *dbname, const char *tblspcname)
/*
* Force another checkpoint here. As in CREATE DATABASE, this is to
- * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
- * operation, which would cause us to lose any unlogged operations
- * done in the new DB tablespace before the next checkpoint.
+ * ensure that we don't have to replay a committed
+ * XLOG_DBASE_CREATE_FILE_COPY operation, which would cause us to lose
+ * any unlogged operations done in the new DB tablespace before the
+ * next checkpoint.
*/
RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
@@ -2478,9 +3014,10 @@ dbase_redo(XLogReaderState *record)
/* Backup blocks are not used in dbase records */
Assert(!XLogRecHasAnyBlockRefs(record));
- if (info == XLOG_DBASE_CREATE)
+ if (info == XLOG_DBASE_CREATE_FILE_COPY)
{
- xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
+ xl_dbase_create_file_copy_rec *xlrec =
+ (xl_dbase_create_file_copy_rec *) XLogRecGetData(record);
char *src_path;
char *dst_path;
struct stat st;
@@ -2515,6 +3052,18 @@ dbase_redo(XLogReaderState *record)
*/
copydir(src_path, dst_path, false);
}
+ else if (info == XLOG_DBASE_CREATE_WAL_LOG)
+ {
+ xl_dbase_create_wal_log_rec *xlrec =
+ (xl_dbase_create_wal_log_rec *) XLogRecGetData(record);
+ char *dbpath;
+
+ dbpath = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
+
+ /* Create the database directory with the version file. */
+ CreateDirAndVersionFile(dbpath, xlrec->db_id, xlrec->tablespace_id,
+ true);
+ }
else if (info == XLOG_DBASE_DROP)
{
xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);