diff options
Diffstat (limited to 'src/bin/pg_dump/pg_backup_custom.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_custom.c | 88 |
1 files changed, 87 insertions, 1 deletions
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 7081598baaa..c2e94ca084a 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -26,6 +26,7 @@ #include "compress_io.h" #include "dumputils.h" +#include "parallel.h" /*-------- * Routines in the format interface @@ -59,6 +60,10 @@ static void _LoadBlobs(ArchiveHandle *AH, bool drop); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); +static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act); +static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act); +char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te); + typedef struct { CompressorState *cs; @@ -127,6 +132,13 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->ClonePtr = _Clone; AH->DeClonePtr = _DeClone; + AH->MasterStartParallelItemPtr = _MasterStartParallelItem; + AH->MasterEndParallelItemPtr = _MasterEndParallelItem; + + /* no parallel dump in the custom archive, only parallel restore */ + AH->WorkerJobDumpPtr = NULL; + AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom; + /* Set up a private area. */ ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); AH->formatData = (void *) ctx; @@ -698,7 +710,7 @@ _CloseArchive(ArchiveHandle *AH) tpos = ftello(AH->FH); WriteToc(AH); ctx->dataStart = _getFilePos(AH, ctx); - WriteDataChunks(AH); + WriteDataChunks(AH, NULL); /* * If possible, re-write the TOC in order to update the data offset @@ -796,6 +808,80 @@ _DeClone(ArchiveHandle *AH) free(ctx); } +/* + * This function is executed in the child of a parallel backup for the + * custom format archive and dumps the actual data. + */ +char * +_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) +{ + /* + * short fixed-size string + some ID so far, this needs to be malloc'ed + * instead of static because we work with threads on windows + */ + const int buflen = 64; + char *buf = (char *) pg_malloc(buflen); + ParallelArgs pargs; + int status; + + pargs.AH = AH; + pargs.te = te; + + status = parallel_restore(&pargs); + + snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status, + status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); + + return buf; +} + +/* + * This function is executed in the parent process. Depending on the desired + * action (dump or restore) it creates a string that is understood by the + * _WorkerJobDump /_WorkerJobRestore functions of the dump format. + */ +static char * +_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act) +{ + /* + * A static char is okay here, even on Windows because we call this + * function only from one process (the master). + */ + static char buf[64]; /* short fixed-size string + number */ + + /* no parallel dump in the custom archive format */ + Assert(act == ACT_RESTORE); + + snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId); + + return buf; +} + +/* + * This function is executed in the parent process. It analyzes the response of + * the _WorkerJobDump / _WorkerJobRestore functions of the dump format. + */ +static int +_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act) +{ + DumpId dumpId; + int nBytes, + status, + n_errors; + + /* no parallel dump in the custom archive */ + Assert(act == ACT_RESTORE); + + sscanf(str, "%u %u %u%n", &dumpId, &status, &n_errors, &nBytes); + + Assert(nBytes == strlen(str)); + Assert(dumpId == te->dumpId); + + AH->public.n_errors += n_errors; + + return status; +} + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- |