aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeGather.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r--src/backend/executor/nodeGather.c64
1 files changed, 22 insertions, 42 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index d93fbacdf9e..022d75b4b85 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -130,7 +130,6 @@ ExecGather(PlanState *pstate)
{
GatherState *node = castNode(GatherState, pstate);
TupleTableSlot *fslot = node->funnel_slot;
- int i;
TupleTableSlot *slot;
ExprContext *econtext;
@@ -173,33 +172,30 @@ ExecGather(PlanState *pstate)
LaunchParallelWorkers(pcxt);
/* We save # workers launched for the benefit of EXPLAIN */
node->nworkers_launched = pcxt->nworkers_launched;
- node->nreaders = 0;
- node->nextreader = 0;
/* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0)
{
- node->reader = palloc(pcxt->nworkers_launched *
- sizeof(TupleQueueReader *));
-
- for (i = 0; i < pcxt->nworkers_launched; ++i)
- {
- shm_mq_set_handle(node->pei->tqueue[i],
- pcxt->worker[i].bgwhandle);
- node->reader[node->nreaders++] =
- CreateTupleQueueReader(node->pei->tqueue[i],
- fslot->tts_tupleDescriptor);
- }
+ ExecParallelCreateReaders(node->pei,
+ fslot->tts_tupleDescriptor);
+ /* Make a working array showing the active readers */
+ node->nreaders = pcxt->nworkers_launched;
+ node->reader = (TupleQueueReader **)
+ palloc(node->nreaders * sizeof(TupleQueueReader *));
+ memcpy(node->reader, node->pei->reader,
+ node->nreaders * sizeof(TupleQueueReader *));
}
else
{
/* No workers? Then never mind. */
- ExecShutdownGatherWorkers(node);
+ node->nreaders = 0;
+ node->reader = NULL;
}
+ node->nextreader = 0;
}
/* Run plan locally if no workers or not single-copy. */
- node->need_to_scan_locally = (node->reader == NULL)
+ node->need_to_scan_locally = (node->nreaders == 0)
|| !gather->single_copy;
node->initialized = true;
}
@@ -258,11 +254,11 @@ gather_getnext(GatherState *gatherstate)
MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
HeapTuple tup;
- while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
+ while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
{
CHECK_FOR_INTERRUPTS();
- if (gatherstate->reader != NULL)
+ if (gatherstate->nreaders > 0)
{
MemoryContext oldContext;
@@ -319,19 +315,15 @@ gather_readnext(GatherState *gatherstate)
tup = TupleQueueReaderNext(reader, true, &readerdone);
/*
- * If this reader is done, remove it, and collapse the array. If all
- * readers are done, clean up remaining worker state.
+ * If this reader is done, remove it from our working array of active
+ * readers. If all readers are done, we're outta here.
*/
if (readerdone)
{
Assert(!tup);
- DestroyTupleQueueReader(reader);
--gatherstate->nreaders;
if (gatherstate->nreaders == 0)
- {
- ExecShutdownGatherWorkers(gatherstate);
return NULL;
- }
memmove(&gatherstate->reader[gatherstate->nextreader],
&gatherstate->reader[gatherstate->nextreader + 1],
sizeof(TupleQueueReader *)
@@ -378,37 +370,25 @@ gather_readnext(GatherState *gatherstate)
/* ----------------------------------------------------------------
* ExecShutdownGatherWorkers
*
- * Destroy the parallel workers. Collect all the stats after
- * workers are stopped, else some work done by workers won't be
- * accounted.
+ * Stop all the parallel workers.
* ----------------------------------------------------------------
*/
static void
ExecShutdownGatherWorkers(GatherState *node)
{
- /* Shut down tuple queue readers before shutting down workers. */
- if (node->reader != NULL)
- {
- int i;
-
- for (i = 0; i < node->nreaders; ++i)
- DestroyTupleQueueReader(node->reader[i]);
-
- pfree(node->reader);
- node->reader = NULL;
- }
-
- /* Now shut down the workers. */
if (node->pei != NULL)
ExecParallelFinish(node->pei);
+
+ /* Flush local copy of reader array */
+ if (node->reader)
+ pfree(node->reader);
+ node->reader = NULL;
}
/* ----------------------------------------------------------------
* ExecShutdownGather
*
* Destroy the setup for parallel workers including parallel context.
- * Collect all the stats after workers are stopped, else some work
- * done by workers won't be accounted.
* ----------------------------------------------------------------
*/
void