diff options
author | Andrew Dunstan <andrew@dunslane.net> | 2013-03-24 11:27:20 -0400 |
---|---|---|
committer | Andrew Dunstan <andrew@dunslane.net> | 2013-03-24 11:27:20 -0400 |
commit | 9e257a181cc1dc5e19eb5d770ce09cc98f470f5f (patch) | |
tree | a2b5c7a40cfe004d4838cd3be32e0177096fafbf /src/bin/pg_dump/pg_backup_custom.c | |
parent | 3b91fe185a71c05ac4528f93a39ba27232acc9e0 (diff) | |
download | postgresql-9e257a181cc1dc5e19eb5d770ce09cc98f470f5f.tar.gz postgresql-9e257a181cc1dc5e19eb5d770ce09cc98f470f5f.zip |
Add parallel pg_dump option.
New infrastructure is added which creates a set number of workers
(threads on Windows, forked processes on Unix). Jobs are then
handed out to these workers by the master process as needed.
pg_restore is adjusted to use this new infrastructure in place of the
old setup which created a new worker for each step on the fly. Parallel
dumps acquire a snapshot clone in order to stay consistent, if
available.
The parallel option is selected by the -j / --jobs command line
parameter of pg_dump.
Joachim Wieland, lightly editorialized by Andrew Dunstan.
Diffstat (limited to 'src/bin/pg_dump/pg_backup_custom.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_custom.c | 88 |
1 files changed, 87 insertions, 1 deletions
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 7081598baaa..c2e94ca084a 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -26,6 +26,7 @@ #include "compress_io.h" #include "dumputils.h" +#include "parallel.h" /*-------- * Routines in the format interface @@ -59,6 +60,10 @@ static void _LoadBlobs(ArchiveHandle *AH, bool drop); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); +static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act); +static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act); +char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te); + typedef struct { CompressorState *cs; @@ -127,6 +132,13 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->ClonePtr = _Clone; AH->DeClonePtr = _DeClone; + AH->MasterStartParallelItemPtr = _MasterStartParallelItem; + AH->MasterEndParallelItemPtr = _MasterEndParallelItem; + + /* no parallel dump in the custom archive, only parallel restore */ + AH->WorkerJobDumpPtr = NULL; + AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom; + /* Set up a private area. */ ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); AH->formatData = (void *) ctx; @@ -698,7 +710,7 @@ _CloseArchive(ArchiveHandle *AH) tpos = ftello(AH->FH); WriteToc(AH); ctx->dataStart = _getFilePos(AH, ctx); - WriteDataChunks(AH); + WriteDataChunks(AH, NULL); /* * If possible, re-write the TOC in order to update the data offset @@ -796,6 +808,80 @@ _DeClone(ArchiveHandle *AH) free(ctx); } +/* + * This function is executed in the child of a parallel backup for the + * custom format archive and dumps the actual data. + */ +char * +_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) +{ + /* + * short fixed-size string + some ID so far, this needs to be malloc'ed + * instead of static because we work with threads on windows + */ + const int buflen = 64; + char *buf = (char *) pg_malloc(buflen); + ParallelArgs pargs; + int status; + + pargs.AH = AH; + pargs.te = te; + + status = parallel_restore(&pargs); + + snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status, + status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); + + return buf; +} + +/* + * This function is executed in the parent process. Depending on the desired + * action (dump or restore) it creates a string that is understood by the + * _WorkerJobDump /_WorkerJobRestore functions of the dump format. + */ +static char * +_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act) +{ + /* + * A static char is okay here, even on Windows because we call this + * function only from one process (the master). + */ + static char buf[64]; /* short fixed-size string + number */ + + /* no parallel dump in the custom archive format */ + Assert(act == ACT_RESTORE); + + snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId); + + return buf; +} + +/* + * This function is executed in the parent process. It analyzes the response of + * the _WorkerJobDump / _WorkerJobRestore functions of the dump format. + */ +static int +_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act) +{ + DumpId dumpId; + int nBytes, + status, + n_errors; + + /* no parallel dump in the custom archive */ + Assert(act == ACT_RESTORE); + + sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes); + + Assert(nBytes == strlen(str)); + Assert(dumpId == te->dumpId); + + AH->public.n_errors += n_errors; + + return status; +} + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- |