diff options
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- | src/backend/executor/execParallel.c | 33 |
1 files changed, 26 insertions, 7 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index efcbaef416c..99a9de3cdc3 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node, ExecParallelEstimateContext *e); static bool ExecParallelInitializeDSM(PlanState *node, ExecParallelInitializeDSMContext *d); -static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt); +static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, + bool reinitialize); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); @@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate, * to the main backend and start the workers. */ static shm_mq_handle ** -ExecParallelSetupTupleQueues(ParallelContext *pcxt) +ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) { shm_mq_handle **responseq; char *tqueuespace; @@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt) responseq = (shm_mq_handle **) palloc(pcxt->nworkers * sizeof(shm_mq_handle *)); - /* Allocate space from the DSM for the queues themselves. */ - tqueuespace = shm_toc_allocate(pcxt->toc, - PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); + /* + * If not reinitializing, allocate space from the DSM for the queues; + * otherwise, find the already allocated space. + */ + if (!reinitialize) + tqueuespace = + shm_toc_allocate(pcxt->toc, + PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); + else + tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE); /* Create the queues, and become the receiver for each. */ for (i = 0; i < pcxt->nworkers; ++i) @@ -248,13 +256,24 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt) } /* Add array of queues to shm_toc, so others can find it. */ - shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace); + if (!reinitialize) + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace); /* Return array of handles. */ return responseq; } /* + * Re-initialize the response queues for backend workers to return tuples + * to the main backend and start the workers. + */ +shm_mq_handle ** +ExecParallelReinitializeTupleQueues(ParallelContext *pcxt) +{ + return ExecParallelSetupTupleQueues(pcxt, true); +} + +/* * Sets up the required infrastructure for backend workers to perform * execution and return results to the main backend. */ @@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) pei->buffer_usage = bufusage_space; /* Set up tuple queues. */ - pei->tqueue = ExecParallelSetupTupleQueues(pcxt); + pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); /* * If instrumentation options were supplied, allocate space for the |