path: root/src/backend/executor/nodeGatherMerge.c
author		Tom Lane <tgl@sss.pgh.pa.us>	2017-09-01 17:38:54 -0400
committer	Tom Lane <tgl@sss.pgh.pa.us>	2017-09-01 17:39:01 -0400
commit		51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae (patch)
tree		a527c43c5129b7f154b32326337e062202c04010 /src/backend/executor/nodeGatherMerge.c
parent		c039ba0716383ccaf88c9be1a7f0803a77823de1 (diff)
download	postgresql-51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae.tar.gz
		postgresql-51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae.zip
Improve division of labor between execParallel.c and nodeGather[Merge].c.
Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c.  Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.

These changes also eliminate a vestigial memory leak (of the pei->tqueue
array).  It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
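In rough terms, the resulting pattern on the Gather Merge side looks like the
sketch below (condensed from the diff that follows; error handling and the
surrounding node state are elided, so treat it as an illustration rather than
the exact source):

	/* Launch side: execParallel.c now creates the readers; the node only
	 * keeps a local working copy of the reader array. */
	LaunchParallelWorkers(pcxt);
	node->nworkers_launched = pcxt->nworkers_launched;
	if (pcxt->nworkers_launched > 0)
	{
		ExecParallelCreateReaders(node->pei, node->tupDesc);

		/* Make a working array showing the active readers */
		node->nreaders = pcxt->nworkers_launched;
		node->reader = (TupleQueueReader **)
			palloc(node->nreaders * sizeof(TupleQueueReader *));
		memcpy(node->reader, node->pei->reader,
			   node->nreaders * sizeof(TupleQueueReader *));
	}

	/* Shutdown side: ExecParallelFinish() detaches the queues and destroys
	 * the readers; the node just flushes its local copy of the array. */
	if (node->pei != NULL)
		ExecParallelFinish(node->pei);
	if (node->reader)
		pfree(node->reader);
	node->reader = NULL;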
Diffstat (limited to 'src/backend/executor/nodeGatherMerge.c')
-rw-r--r--	src/backend/executor/nodeGatherMerge.c | 50
1 file changed, 15 insertions(+), 35 deletions(-)
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index b8bb4f8eb04..d20d46606e4 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -178,7 +178,6 @@ ExecGatherMerge(PlanState *pstate)
 	GatherMergeState *node = castNode(GatherMergeState, pstate);
 	TupleTableSlot *slot;
 	ExprContext *econtext;
-	int			i;
 
 	CHECK_FOR_INTERRUPTS();
@@ -214,27 +213,23 @@ ExecGatherMerge(PlanState *pstate)
 			LaunchParallelWorkers(pcxt);
 			/* We save # workers launched for the benefit of EXPLAIN */
 			node->nworkers_launched = pcxt->nworkers_launched;
-			node->nreaders = 0;
 
 			/* Set up tuple queue readers to read the results. */
 			if (pcxt->nworkers_launched > 0)
 			{
-				node->reader = palloc(pcxt->nworkers_launched *
-									  sizeof(TupleQueueReader *));
-
-				for (i = 0; i < pcxt->nworkers_launched; ++i)
-				{
-					shm_mq_set_handle(node->pei->tqueue[i],
-									  pcxt->worker[i].bgwhandle);
-					node->reader[node->nreaders++] =
-						CreateTupleQueueReader(node->pei->tqueue[i],
-											   node->tupDesc);
-				}
+				ExecParallelCreateReaders(node->pei, node->tupDesc);
+
+				/* Make a working array showing the active readers */
+				node->nreaders = pcxt->nworkers_launched;
+				node->reader = (TupleQueueReader **)
+					palloc(node->nreaders * sizeof(TupleQueueReader *));
+				memcpy(node->reader, node->pei->reader,
+					   node->nreaders * sizeof(TupleQueueReader *));
 			}
 			else
 			{
 				/* No workers?  Then never mind. */
-				ExecShutdownGatherMergeWorkers(node);
+				node->nreaders = 0;
+				node->reader = NULL;
 			}
 		}
@@ -284,8 +279,6 @@ ExecEndGatherMerge(GatherMergeState *node)
  *		ExecShutdownGatherMerge
  *
  *		Destroy the setup for parallel workers including parallel context.
- *		Collect all the stats after workers are stopped, else some work
- *		done by workers won't be accounted.
  * ----------------------------------------------------------------
  */
 void
@@ -304,30 +297,19 @@ ExecShutdownGatherMerge(GatherMergeState *node)
 /* ----------------------------------------------------------------
  *		ExecShutdownGatherMergeWorkers
  *
- *		Destroy the parallel workers.  Collect all the stats after
- *		workers are stopped, else some work done by workers won't be
- *		accounted.
+ *		Stop all the parallel workers.
  * ----------------------------------------------------------------
  */
 static void
 ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 {
-	/* Shut down tuple queue readers before shutting down workers. */
-	if (node->reader != NULL)
-	{
-		int			i;
-
-		for (i = 0; i < node->nreaders; ++i)
-			if (node->reader[i])
-				DestroyTupleQueueReader(node->reader[i]);
-
-		pfree(node->reader);
-		node->reader = NULL;
-	}
-
-	/* Now shut down the workers. */
 	if (node->pei != NULL)
 		ExecParallelFinish(node->pei);
+
+	/* Flush local copy of reader array */
+	if (node->reader)
+		pfree(node->reader);
+	node->reader = NULL;
 }
 
 /* ----------------------------------------------------------------
@@ -672,8 +654,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 	else if (tuple_buffer->done)
 	{
 		/* Reader is known to be exhausted. */
-		DestroyTupleQueueReader(gm_state->reader[reader - 1]);
-		gm_state->reader[reader - 1] = NULL;
 		return false;
 	}
 	else