diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2017-09-01 17:38:54 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2017-09-01 17:39:01 -0400 |
commit | 51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae (patch) | |
tree | a527c43c5129b7f154b32326337e062202c04010 /src/backend/executor/nodeGatherMerge.c | |
parent | c039ba0716383ccaf88c9be1a7f0803a77823de1 (diff) | |
download | postgresql-51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae.tar.gz postgresql-51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae.zip |
Improve division of labor between execParallel.c and nodeGather[Merge].c.
Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c. Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.
These changes also eliminate a vestigial memory leak (of the pei->tqueue
array). It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.
Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
Diffstat (limited to 'src/backend/executor/nodeGatherMerge.c')
-rw-r--r-- | src/backend/executor/nodeGatherMerge.c | 50 |
1 files changed, 15 insertions, 35 deletions
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index b8bb4f8eb04..d20d46606e4 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -178,7 +178,6 @@ ExecGatherMerge(PlanState *pstate) GatherMergeState *node = castNode(GatherMergeState, pstate); TupleTableSlot *slot; ExprContext *econtext; - int i; CHECK_FOR_INTERRUPTS(); @@ -214,27 +213,23 @@ ExecGatherMerge(PlanState *pstate) LaunchParallelWorkers(pcxt); /* We save # workers launched for the benefit of EXPLAIN */ node->nworkers_launched = pcxt->nworkers_launched; - node->nreaders = 0; /* Set up tuple queue readers to read the results. */ if (pcxt->nworkers_launched > 0) { - node->reader = palloc(pcxt->nworkers_launched * - sizeof(TupleQueueReader *)); - - for (i = 0; i < pcxt->nworkers_launched; ++i) - { - shm_mq_set_handle(node->pei->tqueue[i], - pcxt->worker[i].bgwhandle); - node->reader[node->nreaders++] = - CreateTupleQueueReader(node->pei->tqueue[i], - node->tupDesc); - } + ExecParallelCreateReaders(node->pei, node->tupDesc); + /* Make a working array showing the active readers */ + node->nreaders = pcxt->nworkers_launched; + node->reader = (TupleQueueReader **) + palloc(node->nreaders * sizeof(TupleQueueReader *)); + memcpy(node->reader, node->pei->reader, + node->nreaders * sizeof(TupleQueueReader *)); } else { /* No workers? Then never mind. */ - ExecShutdownGatherMergeWorkers(node); + node->nreaders = 0; + node->reader = NULL; } } @@ -284,8 +279,6 @@ ExecEndGatherMerge(GatherMergeState *node) * ExecShutdownGatherMerge * * Destroy the setup for parallel workers including parallel context. - * Collect all the stats after workers are stopped, else some work - * done by workers won't be accounted. * ---------------------------------------------------------------- */ void @@ -304,30 +297,19 @@ ExecShutdownGatherMerge(GatherMergeState *node) /* ---------------------------------------------------------------- * ExecShutdownGatherMergeWorkers * - * Destroy the parallel workers. Collect all the stats after - * workers are stopped, else some work done by workers won't be - * accounted. + * Stop all the parallel workers. * ---------------------------------------------------------------- */ static void ExecShutdownGatherMergeWorkers(GatherMergeState *node) { - /* Shut down tuple queue readers before shutting down workers. */ - if (node->reader != NULL) - { - int i; - - for (i = 0; i < node->nreaders; ++i) - if (node->reader[i]) - DestroyTupleQueueReader(node->reader[i]); - - pfree(node->reader); - node->reader = NULL; - } - - /* Now shut down the workers. */ if (node->pei != NULL) ExecParallelFinish(node->pei); + + /* Flush local copy of reader array */ + if (node->reader) + pfree(node->reader); + node->reader = NULL; } /* ---------------------------------------------------------------- @@ -672,8 +654,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) else if (tuple_buffer->done) { /* Reader is known to be exhausted. */ - DestroyTupleQueueReader(gm_state->reader[reader - 1]); - gm_state->reader[reader - 1] = NULL; return false; } else |