diff options
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- | src/backend/access/transam/parallel.c | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 17f9a5ae6e4..0085987f324 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -404,6 +404,52 @@ LaunchParallelWorkers(ParallelContext *pcxt) /* We might be running in a short-lived memory context. */ oldcontext = MemoryContextSwitchTo(TopTransactionContext); + /* + * This function can be called for a parallel context for which it has + * already been called previously, but only if all of the old workers + * have already exited. When this case arises, we need to do some extra + * reinitialization. + */ + if (pcxt->nworkers_launched > 0) + { + FixedParallelState *fps; + char *error_queue_space; + + /* Clean out old worker handles. */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].error_mqh != NULL) + elog(ERROR, "previously launched worker still alive"); + if (pcxt->worker[i].bgwhandle != NULL) + { + pfree(pcxt->worker[i].bgwhandle); + pcxt->worker[i].bgwhandle = NULL; + } + } + + /* Reset a few bits of fixed parallel state to a clean state. */ + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + fps->workers_attached = 0; + fps->last_xlog_end = 0; + + /* Recreate error queues. */ + error_queue_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE); + for (i = 0; i < pcxt->nworkers; ++i) + { + char *start; + shm_mq *mq; + + start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE; + mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_receiver(mq, MyProc); + pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); + } + + /* Reset number of workers launched. */ + pcxt->nworkers_launched = 0; + } + /* Configure a worker. */ snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", MyProcPid); @@ -428,8 +474,11 @@ LaunchParallelWorkers(ParallelContext *pcxt) if (!any_registrations_failed && RegisterDynamicBackgroundWorker(&worker, &pcxt->worker[i].bgwhandle)) + { shm_mq_set_handle(pcxt->worker[i].error_mqh, pcxt->worker[i].bgwhandle); + pcxt->nworkers_launched++; + } else { /* |