Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r-- | src/backend/executor/nodeGather.c | 64
1 file changed, 22 insertions, 42 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index d93fbacdf9e..022d75b4b85 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -130,7 +130,6 @@ ExecGather(PlanState *pstate)
 {
 	GatherState *node = castNode(GatherState, pstate);
 	TupleTableSlot *fslot = node->funnel_slot;
-	int			i;
 	TupleTableSlot *slot;
 	ExprContext *econtext;
 
@@ -173,33 +172,30 @@ ExecGather(PlanState *pstate)
 			LaunchParallelWorkers(pcxt);
 			/* We save # workers launched for the benefit of EXPLAIN */
 			node->nworkers_launched = pcxt->nworkers_launched;
-			node->nreaders = 0;
-			node->nextreader = 0;
 
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
 			{
-				node->reader = palloc(pcxt->nworkers_launched *
-									  sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers_launched; ++i)
-				{
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   fslot->tts_tupleDescriptor);
-				}
+				ExecParallelCreateReaders(node->pei,
+										  fslot->tts_tupleDescriptor);
+				/* Make a working array showing the active readers */
+				node->nreaders = pcxt->nworkers_launched;
+				node->reader = (TupleQueueReader **)
+					palloc(node->nreaders * sizeof(TupleQueueReader *));
+				memcpy(node->reader, node->pei->reader,
+					   node->nreaders * sizeof(TupleQueueReader *));
 			}
 			else
 			{
 				/* No workers?	Then never mind. */
-				ExecShutdownGatherWorkers(node);
+				node->nreaders = 0;
+				node->reader = NULL;
 			}
+			node->nextreader = 0;
 		}
 
 		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->reader == NULL)
+		node->need_to_scan_locally = (node->nreaders == 0)
 			|| !gather->single_copy;
 		node->initialized = true;
 	}
@@ -258,11 +254,11 @@ gather_getnext(GatherState *gatherstate)
 	MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
 	HeapTuple	tup;
 
-	while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
+	while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
 	{
 		CHECK_FOR_INTERRUPTS();
 
-		if (gatherstate->reader != NULL)
+		if (gatherstate->nreaders > 0)
 		{
 			MemoryContext oldContext;
 
@@ -319,19 +315,15 @@ gather_readnext(GatherState *gatherstate)
 		tup = TupleQueueReaderNext(reader, true, &readerdone);
 
 		/*
-		 * If this reader is done, remove it, and collapse the array. If all
-		 * readers are done, clean up remaining worker state.
+		 * If this reader is done, remove it from our working array of active
+		 * readers.  If all readers are done, we're outta here.
 		 */
 		if (readerdone)
 		{
 			Assert(!tup);
-			DestroyTupleQueueReader(reader);
 			--gatherstate->nreaders;
 			if (gatherstate->nreaders == 0)
-			{
-				ExecShutdownGatherWorkers(gatherstate);
 				return NULL;
-			}
 			memmove(&gatherstate->reader[gatherstate->nextreader],
 					&gatherstate->reader[gatherstate->nextreader + 1],
 					sizeof(TupleQueueReader *)
@@ -378,37 +370,25 @@ gather_readnext(GatherState *gatherstate)
 /* ----------------------------------------------------------------
  *		ExecShutdownGatherWorkers
  *
- *		Destroy the parallel workers.  Collect all the stats after
- *		workers are stopped, else some work done by workers won't be
- *		accounted.
+ *		Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherWorkers(GatherState *node)
 {
-	/* Shut down tuple queue readers before shutting down workers. */
-	if (node->reader != NULL)
-	{
-		int			i;
-
-		for (i = 0; i < node->nreaders; ++i)
-			DestroyTupleQueueReader(node->reader[i]);
-
-		pfree(node->reader);
-		node->reader = NULL;
-	}
-
-	/* Now shut down the workers. */
 	if (node->pei != NULL)
 		ExecParallelFinish(node->pei);
+
+	/* Flush local copy of reader array */
+	if (node->reader)
+		pfree(node->reader);
+	node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
  *		ExecShutdownGather
  *
  *		Destroy the setup for parallel workers including parallel context.
- *		Collect all the stats after workers are stopped, else some work
- *		done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
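The ExecParallelCreateReaders() call introduced above lives in execParallel.c, which this diffstat (limited to nodeGather.c) does not show. Judging from the call site and from the loop deleted from ExecGather(), it presumably rebuilds the same per-worker setup on the ParallelExecutorInfo side, roughly like this sketch (inferred from this diff, not the actual execParallel.c hunk; the real signature and body may differ):

/*
 * Sketch of the execParallel.c counterpart, inferred from the loop removed
 * from ExecGather() above.
 */
void
ExecParallelCreateReaders(ParallelExecutorInfo *pei, TupleDesc tupDesc)
{
	int			nworkers = pei->pcxt->nworkers_launched;
	int			i;

	Assert(pei->reader == NULL);

	if (nworkers > 0)
	{
		/* The canonical reader array now lives in shared executor state. */
		pei->reader = (TupleQueueReader **)
			palloc(nworkers * sizeof(TupleQueueReader *));

		for (i = 0; i < nworkers; i++)
		{
			/* Tie each tuple queue to its worker, then wrap it in a reader. */
			shm_mq_set_handle(pei->tqueue[i],
							  pei->pcxt->worker[i].bgwhandle);
			pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i], tupDesc);
		}
	}
}

Gather then memcpy()s pei->reader into its own scratch array, so gather_readnext() can memmove() entries out as individual readers go idle without disturbing the canonical list.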
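Likewise, the DestroyTupleQueueReader() calls dropped from gather_readnext() and ExecShutdownGatherWorkers() must now happen somewhere under ExecParallelFinish(), since that is all ExecShutdownGatherWorkers() still does before pfree'ing its scratch array. Something like the following hypothetical helper would do it (the name ExecParallelDestroyReaders and its placement are assumptions; in execParallel.c the logic could equally be inline in ExecParallelFinish()):

/*
 * Hypothetical sketch of reader teardown on the execParallel.c side; this
 * diff only implies it by removing DestroyTupleQueueReader() from
 * nodeGather.c.
 */
static void
ExecParallelDestroyReaders(ParallelExecutorInfo *pei)
{
	int			i;

	if (pei->reader != NULL)
	{
		/* Destroying a reader detaches its queue, so the worker can exit. */
		for (i = 0; i < pei->pcxt->nworkers_launched; i++)
			DestroyTupleQueueReader(pei->reader[i]);
		pfree(pei->reader);
		pei->reader = NULL;
	}
}

Centralizing creation and destruction in execParallel.c is what lets gather_readnext() simply return NULL once nreaders reaches zero, and it explains why the "collect all the stats after workers are stopped" caveat disappears from the nodeGather.c comments: that bookkeeping is no longer this node's responsibility.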