diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2016-09-27 13:22:39 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2016-09-27 13:22:39 -0400 |
commit | b7b8cc0cfcf1c956b752f3e25894f9ad607583b7 (patch) | |
tree | 417d64b3faa85c5285f10b110273781298a57ec8 /src/bin/pg_dump/parallel.c | |
parent | f31a931fade868d788ef4480c59753a2d5059246 (diff) | |
download | postgresql-b7b8cc0cfcf1c956b752f3e25894f9ad607583b7.tar.gz postgresql-b7b8cc0cfcf1c956b752f3e25894f9ad607583b7.zip |
Redesign parallel dump/restore's wait-for-workers logic.
The ListenToWorkers/ReapWorkerStatus APIs were messy and hard to use.
Instead, make DispatchJobForTocEntry register a callback function that
will take care of state cleanup, doing whatever had been done by the caller
of ReapWorkerStatus in the old design. (This callback is essentially just
the old mark_work_done function in the restore case, and a trivial test for
worker failure in the dump case.) Then we can have ListenToWorkers call
the callback immediately on receipt of a status message, and return the
worker to WRKR_IDLE state; so the WRKR_FINISHED state goes away.
This allows us to design a unified wait-for-worker-messages loop:
WaitForWorkers replaces EnsureIdleWorker and EnsureWorkersFinished as well
as the mess in restore_toc_entries_parallel. Also, we no longer need the
fragile API spec that the caller of DispatchJobForTocEntry is responsible
for ensuring there's an idle worker, since DispatchJobForTocEntry can just
wait until there is one.
In passing, I got rid of the ParallelArgs struct, which was a net negative
in terms of notational verbosity, and didn't seem to be providing any
noticeable amount of abstraction either.
Tom Lane, reviewed by Kevin Grittner
Discussion: <1188.1464544443@sss.pgh.pa.us>
Diffstat (limited to 'src/bin/pg_dump/parallel.c')
-rw-r--r-- | src/bin/pg_dump/parallel.c | 228 |
1 files changed, 107 insertions, 121 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index bfd023f3e1f..634b4444f9f 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -35,9 +35,11 @@ * the required action (dump or restore) and returns a malloc'd status string. * The status string is passed back to the master where it is interpreted by * AH->MasterEndParallelItemPtr, another format-specific routine. That - * function can update state or catalog information on the master's side, + * function can update format-specific information on the master's side, * depending on the reply from the worker process. In the end it returns a - * status code, which is 0 for successful execution. + * status code, which we pass to the ParallelCompletionPtr callback function + * that was passed to DispatchJobForTocEntry(). The callback function does + * state updating for the master control logic in pg_backup_archiver.c. * * Remember that we have forked off the workers only after we have read in * the catalog. That's why our worker processes can also access the catalog @@ -48,13 +50,8 @@ * In the master process, the workerStatus field for each worker has one of * the following values: * WRKR_IDLE: it's waiting for a command - * WRKR_WORKING: it's been sent a command - * WRKR_FINISHED: it's returned a result + * WRKR_WORKING: it's working on a command * WRKR_TERMINATED: process ended - * The FINISHED state indicates that the worker is idle, but we've not yet - * dealt with the status code it returned from the prior command. - * ReapWorkerStatus() extracts the unhandled command status value and sets - * the workerStatus back to WRKR_IDLE. 
*/ #include "postgres_fe.h" @@ -79,6 +76,8 @@ #define PIPE_READ 0 #define PIPE_WRITE 1 +#define NO_SLOT (-1) /* Failure result for GetIdleWorker() */ + #ifdef WIN32 /* @@ -175,9 +174,12 @@ static void setup_cancel_handler(void); static void set_cancel_pstate(ParallelState *pstate); static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH); static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot); +static int GetIdleWorker(ParallelState *pstate); static bool HasEveryWorkerTerminated(ParallelState *pstate); static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te); static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); +static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, + bool do_wait); static char *getMessageFromMaster(int pipefd[2]); static void sendMessageToMaster(int pipefd[2], const char *str); static int select_loop(int maxFd, fd_set *workerset); @@ -349,8 +351,8 @@ archive_close_connection(int code, void *arg) * fail to detect it because there would be no EOF condition on * the other end of the pipe.) 
*/ - if (slot->args->AH) - DisconnectDatabase(&(slot->args->AH->public)); + if (slot->AH) + DisconnectDatabase(&(slot->AH->public)); #ifdef WIN32 closesocket(slot->pipeRevRead); @@ -407,7 +409,7 @@ ShutdownWorkersHard(ParallelState *pstate) EnterCriticalSection(&signal_info_lock); for (i = 0; i < pstate->numWorkers; i++) { - ArchiveHandle *AH = pstate->parallelSlot[i].args->AH; + ArchiveHandle *AH = pstate->parallelSlot[i].AH; char errbuf[1]; if (AH != NULL && AH->connCancel != NULL) @@ -634,7 +636,7 @@ consoleHandler(DWORD dwCtrlType) for (i = 0; i < signal_info.pstate->numWorkers; i++) { ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]); - ArchiveHandle *AH = slot->args->AH; + ArchiveHandle *AH = slot->AH; HANDLE hThread = (HANDLE) slot->hThread; /* @@ -789,7 +791,7 @@ set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH) EnterCriticalSection(&signal_info_lock); #endif - slot->args->AH = AH; + slot->AH = AH; #ifdef WIN32 LeaveCriticalSection(&signal_info_lock); @@ -935,9 +937,10 @@ ParallelBackupStart(ArchiveHandle *AH) strerror(errno)); slot->workerStatus = WRKR_IDLE; - slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs)); - slot->args->AH = NULL; - slot->args->te = NULL; + slot->AH = NULL; + slot->te = NULL; + slot->callback = NULL; + slot->callback_data = NULL; /* master's ends of the pipes */ slot->pipeRead = pipeWM[PIPE_READ]; @@ -1071,20 +1074,28 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) } /* - * Dispatch a job to some free worker (caller must ensure there is one!) + * Dispatch a job to some free worker. * * te is the TocEntry to be processed, act is the action to be taken on it. + * callback is the function to call on completion of the job. + * + * If no worker is currently available, this will block, and previously + * registered callback functions may be called. 
*/ void -DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, - T_Action act) +DispatchJobForTocEntry(ArchiveHandle *AH, + ParallelState *pstate, + TocEntry *te, + T_Action act, + ParallelCompletionPtr callback, + void *callback_data) { int worker; char *arg; - /* our caller makes sure that at least one worker is idle */ - worker = GetIdleWorker(pstate); - Assert(worker != NO_SLOT); + /* Get a worker, waiting if none are idle */ + while ((worker = GetIdleWorker(pstate)) == NO_SLOT) + WaitForWorkers(AH, pstate, WFW_ONE_IDLE); /* Construct and send command string */ arg = (AH->MasterStartParallelItemPtr) (AH, te, act); @@ -1095,14 +1106,16 @@ DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, /* Remember worker is busy, and which TocEntry it's working on */ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; - pstate->parallelSlot[worker].args->te = te; + pstate->parallelSlot[worker].te = te; + pstate->parallelSlot[worker].callback = callback; + pstate->parallelSlot[worker].callback_data = callback_data; } /* * Find an idle worker and return its slot number. * Return NO_SLOT if none are idle. */ -int +static int GetIdleWorker(ParallelState *pstate) { int i; @@ -1274,17 +1287,16 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) * immediately if there is none available. * * When we get a status message, we let MasterEndParallelItemPtr process it, - * then save the resulting status code and switch the worker's state to - * WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify - * that the status was "OK" and push the worker back to IDLE state. + * then pass the resulting status code to the callback function that was + * specified to DispatchJobForTocEntry, then reset the worker status to IDLE. * - * XXX Rube Goldberg would be proud of this API, but no one else should be. + * Returns true if we collected a status message, else false. 
* * XXX is it worth checking for more than one status message per call? * It seems somewhat unlikely that multiple workers would finish at exactly * the same time. */ -void +static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) { int worker; @@ -1298,34 +1310,39 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) /* If do_wait is true, we must have detected EOF on some socket */ if (do_wait) exit_horribly(modulename, "a worker process died unexpectedly\n"); - return; + return false; } /* Process it and update our idea of the worker's status */ if (messageStartsWith(msg, "OK ")) { - TocEntry *te = pstate->parallelSlot[worker].args->te; + ParallelSlot *slot = &pstate->parallelSlot[worker]; + TocEntry *te = slot->te; char *statusString; + int status; if (messageStartsWith(msg, "OK RESTORE ")) { statusString = msg + strlen("OK RESTORE "); - pstate->parallelSlot[worker].status = + status = (AH->MasterEndParallelItemPtr) (AH, te, statusString, ACT_RESTORE); + slot->callback(AH, te, status, slot->callback_data); } else if (messageStartsWith(msg, "OK DUMP ")) { statusString = msg + strlen("OK DUMP "); - pstate->parallelSlot[worker].status = + status = (AH->MasterEndParallelItemPtr) (AH, te, statusString, ACT_DUMP); + slot->callback(AH, te, status, slot->callback_data); } else exit_horribly(modulename, "invalid message received from worker: \"%s\"\n", msg); - pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED; + slot->workerStatus = WRKR_IDLE; + slot->te = NULL; } else exit_horribly(modulename, @@ -1334,110 +1351,79 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) /* Free the string returned from getMessageFromWorker */ free(msg); -} - -/* - * Check to see if any worker is in WRKR_FINISHED state. If so, - * return its command status code into *status, reset it to IDLE state, - * and return its slot number. Otherwise return NO_SLOT. - * - * This function is executed in the master process. 
- */ -int -ReapWorkerStatus(ParallelState *pstate, int *status) -{ - int i; - for (i = 0; i < pstate->numWorkers; i++) - { - if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED) - { - *status = pstate->parallelSlot[i].status; - pstate->parallelSlot[i].status = 0; - pstate->parallelSlot[i].workerStatus = WRKR_IDLE; - return i; - } - } - return NO_SLOT; + return true; } /* - * Wait, if necessary, until we have at least one idle worker. - * Reap worker status as necessary to move FINISHED workers to IDLE state. + * Check for status results from workers, waiting if necessary. * - * We assume that no extra processing is required when reaping a finished - * command, except for checking that the status was OK (zero). - * Caution: that assumption means that this function can only be used in - * parallel dump, not parallel restore, because the latter has a more - * complex set of rules about handling status. + * Available wait modes are: + * WFW_NO_WAIT: reap any available status, but don't block + * WFW_GOT_STATUS: wait for at least one more worker to finish + * WFW_ONE_IDLE: wait for at least one worker to be idle + * WFW_ALL_IDLE: wait for all workers to be idle + * + * Any received results are passed to MasterEndParallelItemPtr and then + * to the callback specified to DispatchJobForTocEntry. * * This function is executed in the master process. */ void -EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate) +WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode) { - int ret_worker; - int work_status; + bool do_wait = false; - for (;;) + /* + * In GOT_STATUS mode, always block waiting for a message, since we can't + * return till we get something. In other modes, we don't block the first + * time through the loop. 
+ */ + if (mode == WFW_GOT_STATUS) { - int nTerm = 0; - - while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT) - { - if (work_status != 0) - exit_horribly(modulename, "error processing a parallel work item\n"); - - nTerm++; - } - - /* - * We need to make sure that we have an idle worker before dispatching - * the next item. If nTerm > 0 we already have that (quick check). - */ - if (nTerm > 0) - return; - - /* explicit check for an idle worker */ - if (GetIdleWorker(pstate) != NO_SLOT) - return; + /* Assert that caller knows what it's doing */ + Assert(!IsEveryWorkerIdle(pstate)); + do_wait = true; + } + for (;;) + { /* - * If we have no idle worker, read the result of one or more workers - * and loop the loop to call ReapWorkerStatus() on them + * Check for status messages, even if we don't need to block. We do + * not try very hard to reap all available messages, though, since + * there's unlikely to be more than one. */ - ListenToWorkers(AH, pstate, true); - } -} - -/* - * Wait for all workers to be idle. - * Reap worker status as necessary to move FINISHED workers to IDLE state. - * - * We assume that no extra processing is required when reaping a finished - * command, except for checking that the status was OK (zero). - * Caution: that assumption means that this function can only be used in - * parallel dump, not parallel restore, because the latter has a more - * complex set of rules about handling status. - * - * This function is executed in the master process. - */ -void -EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate) -{ - int work_status; + if (ListenToWorkers(AH, pstate, do_wait)) + { + /* + * If we got a message, we are done by definition for GOT_STATUS + * mode, and we can also be certain that there's at least one idle + * worker. So we're done in all but ALL_IDLE mode. 
+ */ + if (mode != WFW_ALL_IDLE) + return; + } - if (!pstate || pstate->numWorkers == 1) - return; + /* Check whether we must wait for new status messages */ + switch (mode) + { + case WFW_NO_WAIT: + return; /* never wait */ + case WFW_GOT_STATUS: + Assert(false); /* can't get here, because we waited */ + break; + case WFW_ONE_IDLE: + if (GetIdleWorker(pstate) != NO_SLOT) + return; + break; + case WFW_ALL_IDLE: + if (IsEveryWorkerIdle(pstate)) + return; + break; + } - /* Waiting for the remaining worker processes to finish */ - while (!IsEveryWorkerIdle(pstate)) - { - if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT) - ListenToWorkers(AH, pstate, true); - else if (work_status != 0) - exit_horribly(modulename, - "error processing a parallel work item\n"); + /* Loop back, and this time wait for something to happen */ + do_wait = true; } } |