aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/pg_backup_custom.c
diff options
context:
space:
mode:
author Andrew Dunstan <andrew@dunslane.net> 2013-03-24 11:27:20 -0400
committer Andrew Dunstan <andrew@dunslane.net> 2013-03-24 11:27:20 -0400
commit 9e257a181cc1dc5e19eb5d770ce09cc98f470f5f (patch)
tree a2b5c7a40cfe004d4838cd3be32e0177096fafbf /src/bin/pg_dump/pg_backup_custom.c
parent 3b91fe185a71c05ac4528f93a39ba27232acc9e0 (diff)
downloadpostgresql-9e257a181cc1dc5e19eb5d770ce09cc98f470f5f.tar.gz
postgresql-9e257a181cc1dc5e19eb5d770ce09cc98f470f5f.zip
Add parallel pg_dump option.
New infrastructure is added which creates a set number of workers (threads on Windows, forked processes on Unix). Jobs are then handed out to these workers by the master process as needed. pg_restore is adjusted to use this new infrastructure in place of the old setup which created a new worker for each step on the fly. Parallel dumps acquire a snapshot clone in order to stay consistent, if available. The parallel option is selected by the -j / --jobs command line parameter of pg_dump. Joachim Wieland, lightly editorialized by Andrew Dunstan.
Diffstat (limited to 'src/bin/pg_dump/pg_backup_custom.c')
-rw-r--r--src/bin/pg_dump/pg_backup_custom.c88
1 files changed, 87 insertions, 1 deletions
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 7081598baaa..c2e94ca084a 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -26,6 +26,7 @@
#include "compress_io.h"
#include "dumputils.h"
+#include "parallel.h"
/*--------
* Routines in the format interface
@@ -59,6 +60,10 @@ static void _LoadBlobs(ArchiveHandle *AH, bool drop);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
+static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act);
+static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act);
+char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
+
typedef struct
{
CompressorState *cs;
@@ -127,6 +132,13 @@ InitArchiveFmt_Custom(ArchiveHandle *AH)
AH->ClonePtr = _Clone;
AH->DeClonePtr = _DeClone;
+ AH->MasterStartParallelItemPtr = _MasterStartParallelItem;
+ AH->MasterEndParallelItemPtr = _MasterEndParallelItem;
+
+ /* no parallel dump in the custom archive, only parallel restore */
+ AH->WorkerJobDumpPtr = NULL;
+ AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
+
/* Set up a private area. */
ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
AH->formatData = (void *) ctx;
@@ -698,7 +710,7 @@ _CloseArchive(ArchiveHandle *AH)
tpos = ftello(AH->FH);
WriteToc(AH);
ctx->dataStart = _getFilePos(AH, ctx);
- WriteDataChunks(AH);
+ WriteDataChunks(AH, NULL);
/*
* If possible, re-write the TOC in order to update the data offset
@@ -796,6 +808,80 @@ _DeClone(ArchiveHandle *AH)
free(ctx);
}
+/*
+ * Worker-side entry point for a parallel restore from a custom-format
+ * archive: restores the single TOC entry "te" through parallel_restore()
+ * and returns the reply string for the master.
+ *
+ * The reply has the form "OK RESTORE <dumpId> <status> <n_errors>", where
+ * <n_errors> is AH->public.n_errors when the status is
+ * WORKER_IGNORED_ERRORS and 0 otherwise.  The string is allocated with
+ * pg_malloc().
+ */
+char *
+_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
+{
+	/*
+	 * The reply is a short fixed-size string plus a few integers.  It is
+	 * heap-allocated rather than static because the workers are threads on
+	 * Windows.
+	 */
+	const int	replylen = 64;
+	char	   *reply = (char *) pg_malloc(replylen);
+	ParallelArgs args;
+	int			rc;
+	int			ignored_errors = 0;
+
+	args.AH = AH;
+	args.te = te;
+
+	rc = parallel_restore(&args);
+
+	/* the error count is only reported when errors were ignored */
+	if (rc == WORKER_IGNORED_ERRORS)
+		ignored_errors = AH->public.n_errors;
+
+	snprintf(reply, replylen, "OK RESTORE %d %d %d",
+			 te->dumpId, rc, ignored_errors);
+
+	return reply;
+}
+
+/*
+ * Master-side callback: build the command string that tells a worker which
+ * TOC entry to process.  The custom format has no parallel dump, so the
+ * only command produced here is "RESTORE <dumpId>".
+ *
+ * The result points into a static buffer that the next call overwrites.
+ */
+static char *
+_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act)
+{
+	/*
+	 * Only the master calls this function, so one static buffer is fine
+	 * even on Windows.
+	 */
+	static char command[64];	/* short fixed-size string + number */
+
+	/* the custom archive format has no parallel dump */
+	Assert(act == ACT_RESTORE);
+
+	snprintf(command, sizeof(command), "RESTORE %d", te->dumpId);
+
+	return command;
+}
+
+/*
+ * Master-side callback: parse the status string a worker sent back for the
+ * TOC entry "te" and fold its error count into the archive handle.
+ *
+ * "str" is expected to hold exactly three whitespace-separated integers,
+ * "<dumpId> <status> <n_errors>", i.e. the numeric part of the string that
+ * _WorkerJobRestoreCustom formats with "%d %d %d".
+ *
+ * Adds <n_errors> to AH->public.n_errors and returns <status>.
+ */
+static int
+_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act)
+{
+	/*
+	 * Everything sscanf() may leave untouched is zero-initialized, so a
+	 * malformed reply can never make us read indeterminate values or add
+	 * garbage to the error counter.
+	 */
+	DumpId		dumpId = 0;
+	int			nBytes = 0,
+				status = 0,
+				n_errors = 0;
+	int			nFields;
+
+	/* no parallel dump in the custom archive */
+	Assert(act == ACT_RESTORE);
+
+	/*
+	 * The values are written with %d and stored into signed objects here, so
+	 * they must be read with %d as well; %u requires unsigned targets.
+	 */
+	nFields = sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
+
+	/* all three fields must convert, and nothing may follow them */
+	Assert(nFields == 3);
+	Assert((size_t) nBytes == strlen(str));
+	Assert(dumpId == te->dumpId);
+	(void) nFields;				/* only inspected when assertions are on */
+
+	AH->public.n_errors += n_errors;
+
+	return status;
+}
+
/*--------------------------------------------------
* END OF FORMAT CALLBACKS
*--------------------------------------------------