diff options
Diffstat (limited to 'src/bin/pg_dump/parallel.c')
-rw-r--r-- | src/bin/pg_dump/parallel.c | 232 |
1 files changed, 145 insertions, 87 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 634b4444f9f..0e2bfa106a7 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -20,32 +20,25 @@ * the desired number of worker processes, which each enter WaitForCommands(). * * The master process dispatches an individual work item to one of the worker - * processes in DispatchJobForTocEntry(). That calls - * AH->MasterStartParallelItemPtr, a routine of the output format. This - * function's arguments are the parents archive handle AH (containing the full - * catalog information), the TocEntry that the worker should work on and a - * T_Action value indicating whether this is a backup or a restore task. The - * function simply converts the TocEntry assignment into a command string that - * is then sent over to the worker process. In the simplest case that would be - * something like "DUMP 1234", with 1234 being the TocEntry id. - * + * processes in DispatchJobForTocEntry(). We send a command string such as + * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID. * The worker process receives and decodes the command and passes it to the * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr, * which are routines of the current archive format. That routine performs - * the required action (dump or restore) and returns a malloc'd status string. - * The status string is passed back to the master where it is interpreted by - * AH->MasterEndParallelItemPtr, another format-specific routine. That - * function can update format-specific information on the master's side, - * depending on the reply from the worker process. In the end it returns a - * status code, which we pass to the ParallelCompletionPtr callback function - * that was passed to DispatchJobForTocEntry(). The callback function does - * state updating for the master control logic in pg_backup_archiver.c. + * the required action (dump or restore) and returns an integer status code. + * This is passed back to the master where we pass it to the + * ParallelCompletionPtr callback function that was passed to + * DispatchJobForTocEntry(). The callback function does state updating + * for the master control logic in pg_backup_archiver.c. * - * Remember that we have forked off the workers only after we have read in - * the catalog. That's why our worker processes can also access the catalog - * information. (In the Windows case, the workers are threads in the same - * process. To avoid problems, they work with cloned copies of the Archive - * data structure; see RunWorker().) + * In principle additional archive-format-specific information might be needed + * in commands or worker status responses, but so far that hasn't proved + * necessary, since workers have full copies of the ArchiveHandle/TocEntry + * data structures. Remember that we have forked off the workers only after + * we have read in the catalog. That's why our worker processes can also + * access the catalog information. (In the Windows case, the workers are + * threads in the same process. To avoid problems, they work with cloned + * copies of the Archive data structure; see RunWorker().) * * In the master process, the workerStatus field for each worker has one of * the following values: @@ -1074,6 +1067,110 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) } /* + * These next four functions handle construction and parsing of the command + * strings and response strings for parallel workers. + * + * Currently, these can be the same regardless of which archive format we are + * processing. In future, we might want to let format modules override these + * functions to add format-specific data to a command or response. + */ + +/* + * buildWorkerCommand: format a command string to send to a worker. + * + * The string is built in the caller-supplied buffer of size buflen. + */ +static void +buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, + char *buf, int buflen) +{ + if (act == ACT_DUMP) + snprintf(buf, buflen, "DUMP %d", te->dumpId); + else if (act == ACT_RESTORE) + snprintf(buf, buflen, "RESTORE %d", te->dumpId); + else + Assert(false); +} + +/* + * parseWorkerCommand: interpret a command string in a worker. + */ +static void +parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, + const char *msg) +{ + DumpId dumpId; + int nBytes; + + if (messageStartsWith(msg, "DUMP ")) + { + *act = ACT_DUMP; + sscanf(msg, "DUMP %d%n", &dumpId, &nBytes); + Assert(nBytes == strlen(msg)); + *te = getTocEntryByDumpId(AH, dumpId); + Assert(*te != NULL); + } + else if (messageStartsWith(msg, "RESTORE ")) + { + *act = ACT_RESTORE; + sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes); + Assert(nBytes == strlen(msg)); + *te = getTocEntryByDumpId(AH, dumpId); + Assert(*te != NULL); + } + else + exit_horribly(modulename, + "unrecognized command received from master: \"%s\"\n", + msg); +} + +/* + * buildWorkerResponse: format a response string to send to the master. + * + * The string is built in the caller-supplied buffer of size buflen. + */ +static void +buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, + char *buf, int buflen) +{ + snprintf(buf, buflen, "OK %d %d %d", + te->dumpId, + status, + status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); +} + +/* + * parseWorkerResponse: parse the status message returned by a worker. + * + * Returns the integer status code, and may update fields of AH and/or te. + */ +static int +parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, + const char *msg) +{ + DumpId dumpId; + int nBytes, + n_errors; + int status = 0; + + if (messageStartsWith(msg, "OK ")) + { + sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes); + + Assert(dumpId == te->dumpId); + Assert(nBytes == strlen(msg)); + + AH->public.n_errors += n_errors; + } + else + exit_horribly(modulename, + "invalid message received from worker: \"%s\"\n", + msg); + + return status; +} + +/* * Dispatch a job to some free worker. * * te is the TocEntry to be processed, act is the action to be taken on it. @@ -1091,18 +1188,16 @@ DispatchJobForTocEntry(ArchiveHandle *AH, void *callback_data) { int worker; - char *arg; + char buf[256]; /* Get a worker, waiting if none are idle */ while ((worker = GetIdleWorker(pstate)) == NO_SLOT) WaitForWorkers(AH, pstate, WFW_ONE_IDLE); /* Construct and send command string */ - arg = (AH->MasterStartParallelItemPtr) (AH, te, act); - - sendMessageToWorker(pstate, worker, arg); + buildWorkerCommand(AH, te, act, buf, sizeof(buf)); - /* XXX aren't we leaking string here? (no, because it's static. Ick.) */ + sendMessageToWorker(pstate, worker, buf); /* Remember worker is busy, and which TocEntry it's working on */ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; @@ -1220,10 +1315,10 @@ static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]) { char *command; - DumpId dumpId; - int nBytes; - char *str; TocEntry *te; + T_Action act; + int status = 0; + char buf[256]; for (;;) { @@ -1233,47 +1328,29 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) return; } - if (messageStartsWith(command, "DUMP ")) - { - /* Decode the command */ - sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes); - Assert(nBytes == strlen(command) - strlen("DUMP ")); - te = getTocEntryByDumpId(AH, dumpId); - Assert(te != NULL); + /* Decode the command */ + parseWorkerCommand(AH, &te, &act, command); + if (act == ACT_DUMP) + { /* Acquire lock on this table within the worker's session */ lockTableForWorker(AH, te); /* Perform the dump command */ - str = (AH->WorkerJobDumpPtr) (AH, te); - - /* Return status to master */ - sendMessageToMaster(pipefd, str); - - /* we are responsible for freeing the status string */ - free(str); + status = (AH->WorkerJobDumpPtr) (AH, te); } - else if (messageStartsWith(command, "RESTORE ")) + else if (act == ACT_RESTORE) { - /* Decode the command */ - sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes); - Assert(nBytes == strlen(command) - strlen("RESTORE ")); - te = getTocEntryByDumpId(AH, dumpId); - Assert(te != NULL); - /* Perform the restore command */ - str = (AH->WorkerJobRestorePtr) (AH, te); - - /* Return status to master */ - sendMessageToMaster(pipefd, str); - - /* we are responsible for freeing the status string */ - free(str); + status = (AH->WorkerJobRestorePtr) (AH, te); } else - exit_horribly(modulename, - "unrecognized command received from master: \"%s\"\n", - command); + Assert(false); + + /* Return status to master */ + buildWorkerResponse(AH, te, act, status, buf, sizeof(buf)); + + sendMessageToMaster(pipefd, buf); /* command was pg_malloc'd and we are responsible for free()ing it. */ free(command); @@ -1286,9 +1363,9 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) * If do_wait is true, wait to get a status message; otherwise, just return * immediately if there is none available. * - * When we get a status message, we let MasterEndParallelItemPtr process it, - * then pass the resulting status code to the callback function that was - * specified to DispatchJobForTocEntry, then reset the worker status to IDLE. + * When we get a status message, we pass the status code to the callback + * function that was specified to DispatchJobForTocEntry, then reset the + * worker status to IDLE. * * Returns true if we collected a status message, else false. * @@ -1318,29 +1395,10 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) { ParallelSlot *slot = &pstate->parallelSlot[worker]; TocEntry *te = slot->te; - char *statusString; int status; - if (messageStartsWith(msg, "OK RESTORE ")) - { - statusString = msg + strlen("OK RESTORE "); - status = - (AH->MasterEndParallelItemPtr) - (AH, te, statusString, ACT_RESTORE); - slot->callback(AH, te, status, slot->callback_data); - } - else if (messageStartsWith(msg, "OK DUMP ")) - { - statusString = msg + strlen("OK DUMP "); - status = - (AH->MasterEndParallelItemPtr) - (AH, te, statusString, ACT_DUMP); - slot->callback(AH, te, status, slot->callback_data); - } - else - exit_horribly(modulename, - "invalid message received from worker: \"%s\"\n", - msg); + status = parseWorkerResponse(AH, te, msg); + slot->callback(AH, te, status, slot->callback_data); slot->workerStatus = WRKR_IDLE; slot->te = NULL; } @@ -1364,8 +1422,8 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) * WFW_ONE_IDLE: wait for at least one worker to be idle * WFW_ALL_IDLE: wait for all workers to be idle * - * Any received results are passed to MasterEndParallelItemPtr and then - * to the callback specified to DispatchJobForTocEntry. + * Any received results are passed to the callback specified to + * DispatchJobForTocEntry. * * This function is executed in the master process. */ |