Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- | src/backend/access/transam/parallel.c | 175
1 file changed, 92 insertions, 83 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 35a873de6ba..79cc9880bbb 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 
 /*
  * Establish a new parallel context.  This should be done after entering
@@ -384,6 +385,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
 }
 
 /*
+ * Reinitialize the dynamic shared memory segment for a parallel context so
+ * that we can launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+	FixedParallelState *fps;
+	char	   *error_queue_space;
+	int			i;
+
+	if (pcxt->nworkers_launched == 0)
+		return;
+
+	WaitForParallelWorkersToFinish(pcxt);
+	WaitForParallelWorkersToExit(pcxt);
+
+	/* Reset a few bits of fixed parallel state to a clean state. */
+	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+	fps->workers_attached = 0;
+	fps->last_xlog_end = 0;
+
+	/* Recreate error queues. */
+	error_queue_space =
+		shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		char	   *start;
+		shm_mq	   *mq;
+
+		start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+		mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+		shm_mq_set_receiver(mq, MyProc);
+		pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+	}
+
+	/* Reset number of workers launched. */
+	pcxt->nworkers_launched = 0;
+}
+
+/*
  * Launch parallel workers.
  */
 void
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 	/* We might be running in a short-lived memory context. */
 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
-	/*
-	 * This function can be called for a parallel context for which it has
-	 * already been called previously, but only if all of the old workers
-	 * have already exited.  When this case arises, we need to do some extra
-	 * reinitialization.
-	 */
-	if (pcxt->nworkers_launched > 0)
-	{
-		FixedParallelState *fps;
-		char	   *error_queue_space;
-
-		/* Clean out old worker handles. */
-		for (i = 0; i < pcxt->nworkers; ++i)
-		{
-			if (pcxt->worker[i].error_mqh != NULL)
-				elog(ERROR, "previously launched worker still alive");
-			if (pcxt->worker[i].bgwhandle != NULL)
-			{
-				pfree(pcxt->worker[i].bgwhandle);
-				pcxt->worker[i].bgwhandle = NULL;
-			}
-		}
-
-		/* Reset a few bits of fixed parallel state to a clean state. */
-		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
-		fps->workers_attached = 0;
-		fps->last_xlog_end = 0;
-
-		/* Recreate error queues. */
-		error_queue_space =
-			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
-		for (i = 0; i < pcxt->nworkers; ++i)
-		{
-			char	   *start;
-			shm_mq	   *mq;
-
-			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
-			shm_mq_set_receiver(mq, MyProc);
-			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
-		}
-
-		/* Reset number of workers launched. */
-		pcxt->nworkers_launched = 0;
-	}
-
 	/* Configure a worker. */
 	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
 			 MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 }
 
 /*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
  * important to call this function afterwards.  We must not miss any errors
@@ -553,6 +548,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 }
 
 /*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that the workers have completely shut down.  The
+ * difference from WaitForParallelWorkersToFinish is that the former only
+ * ensures that the last message sent by a worker backend has been received
+ * by the master backend, whereas this function ensures a complete shutdown.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+	int			i;
+
+	/* Wait until the workers actually die. */
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		BgwHandleStatus status;
+
+		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+			continue;
+
+		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+		/*
+		 * If the postmaster kicked the bucket, we have no chance of cleaning
+		 * up safely -- we won't be able to tell when our workers are actually
+		 * dead.  This doesn't necessitate a PANIC since they will all abort
+		 * eventually, but we can't safely continue this session.
+		 */
+		if (status == BGWH_POSTMASTER_DIED)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("postmaster exited during a parallel transaction")));
+
+		/* Release memory. */
+		pfree(pcxt->worker[i].bgwhandle);
+		pcxt->worker[i].bgwhandle = NULL;
+	}
+}
+
+/*
  * Destroy a parallel context.
  *
  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
 	{
 		for (i = 0; i < pcxt->nworkers; ++i)
 		{
-			if (pcxt->worker[i].bgwhandle != NULL)
-				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
 			if (pcxt->worker[i].error_mqh != NULL)
 			{
+				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
 				pfree(pcxt->worker[i].error_mqh);
 				pcxt->worker[i].error_mqh = NULL;
 			}
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
 		pcxt->private_memory = NULL;
 	}
 
-	/* Wait until the workers actually die. */
-	for (i = 0; i < pcxt->nworkers; ++i)
-	{
-		BgwHandleStatus status;
-
-		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
-			continue;
-
-		/*
-		 * We can't finish transaction commit or abort until all of the
-		 * workers are dead.  This means, in particular, that we can't respond
-		 * to interrupts at this stage.
-		 */
-		HOLD_INTERRUPTS();
-		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
-		RESUME_INTERRUPTS();
-
-		/*
-		 * If the postmaster kicked the bucket, we have no chance of cleaning
-		 * up safely -- we won't be able to tell when our workers are actually
-		 * dead.  This doesn't necessitate a PANIC since they will all abort
-		 * eventually, but we can't safely continue this session.
-		 */
-		if (status == BGWH_POSTMASTER_DIED)
-			ereport(FATAL,
-					(errcode(ERRCODE_ADMIN_SHUTDOWN),
-					 errmsg("postmaster exited during a parallel transaction")));
-
-		/* Release memory. */
-		pfree(pcxt->worker[i].bgwhandle);
-		pcxt->worker[i].bgwhandle = NULL;
-	}
+	/*
+	 * We can't finish transaction commit or abort until all of the workers
+	 * have exited.  This means, in particular, that we can't respond to
+	 * interrupts at this stage.
+	 */
+	HOLD_INTERRUPTS();
+	WaitForParallelWorkersToExit(pcxt);
+	RESUME_INTERRUPTS();
 
 	/* Free the worker array itself. */
 	if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 
 		case 'X':				/* Terminate, indicating clean exit */
 			{
-				pfree(pcxt->worker[i].bgwhandle);
 				pfree(pcxt->worker[i].error_mqh);
-				pcxt->worker[i].bgwhandle = NULL;
 				pcxt->worker[i].error_mqh = NULL;
 				break;
 			}
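For illustration, here is a minimal sketch of the relaunch cycle this patch enables, as a 9.5-era caller might drive it: my_worker_main(), run_batches(), and the worker count are hypothetical stand-ins, while the ParallelContext calls are the functions this diff touches. The point of ReinitializeParallelDSM() is that it resets workers_attached and last_xlog_end and recreates the error queues, so the same DSM segment can be reused for each batch instead of being torn down and rebuilt.

#include "postgres.h"

#include "access/parallel.h"
#include "access/xact.h"

/* Hypothetical worker entrypoint; a real one would read shared state from the TOC. */
static void
my_worker_main(dsm_segment *seg, shm_toc *toc)
{
	/* ... perform one batch of work ... */
}

/* Hypothetical driver that runs several batches through one parallel context. */
static void
run_batches(int nbatches)
{
	ParallelContext *pcxt;
	int			batch;

	EnterParallelMode();
	pcxt = CreateParallelContext(my_worker_main, 4);
	InitializeParallelDSM(pcxt);

	for (batch = 0; batch < nbatches; ++batch)
	{
		/* Between batches, reset shared state so workers can be relaunched. */
		if (batch > 0)
			ReinitializeParallelDSM(pcxt);

		LaunchParallelWorkers(pcxt);

		/* Returns once every worker's final message arrives; worker errors surface here. */
		WaitForParallelWorkersToFinish(pcxt);
	}

	/* Terminates any remaining workers and waits for them to actually exit. */
	DestroyParallelContext(pcxt);
	ExitParallelMode();
}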