author     Tom Lane <tgl@sss.pgh.pa.us>  2017-09-01 17:38:54 -0400
committer  Tom Lane <tgl@sss.pgh.pa.us>  2017-09-01 17:39:01 -0400
commit     51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae (patch)
tree       a527c43c5129b7f154b32326337e062202c04010 /src/backend/executor/execParallel.c
parent     c039ba0716383ccaf88c9be1a7f0803a77823de1 (diff)
download   postgresql-51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae.tar.gz
           postgresql-51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae.zip
Improve division of labor between execParallel.c and nodeGather[Merge].c.
Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c.  Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.

These changes also eliminate a vestigial memory leak (of the pei->tqueue
array).  It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
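For context, a rough caller-side sketch (not code from this commit; the real
logic lives in nodeGather.c and nodeGatherMerge.c) of how the relocated
responsibilities are meant to be used: launch the workers first, then let
execParallel.c attach the readers, and leave all detach/destroy work to
ExecParallelFinish().  Apart from the functions touched by this commit, the
names below are illustrative only.

#include "postgres.h"
#include "access/parallel.h"
#include "executor/execParallel.h"

/*
 * Hypothetical startup sequence for a Gather-style node.  Assumes pei was
 * built earlier by ExecInitParallelPlan() and that tupDesc describes the
 * tuples the workers will return.
 */
static void
gather_startup_sketch(ParallelExecutorInfo *pei, TupleDesc tupDesc)
{
    /* Start the background workers registered in pei->pcxt. */
    LaunchParallelWorkers(pei->pcxt);

    /*
     * Only now create the TupleQueueReaders; ExecParallelCreateReaders()
     * does nothing if no workers were actually launched.
     */
    if (pei->pcxt->nworkers_launched > 0)
        ExecParallelCreateReaders(pei, tupDesc);

    /*
     * The node then reads tuples from pei->reader[0 .. nworkers_launched-1];
     * when it is done (or before a rescan), ExecParallelFinish(pei) detaches
     * the queues, destroys the readers, and waits for the workers to exit.
     */
}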
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--   src/backend/executor/execParallel.c   72
1 file changed, 68 insertions, 4 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index c713b851399..59f3744a147 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -534,9 +534,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
     shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
     pei->buffer_usage = bufusage_space;
 
-    /* Set up tuple queues. */
+    /* Set up the tuple queues that the workers will write into. */
     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
+    /* We don't need the TupleQueueReaders yet, though. */
+    pei->reader = NULL;
+
     /*
      * If instrumentation options were supplied, allocate space for the data.
      * It only gets partially initialized here; the rest happens during
@@ -604,6 +607,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
 }
 
 /*
+ * Set up tuple queue readers to read the results of a parallel subplan.
+ * All the workers are expected to return tuples matching tupDesc.
+ *
+ * This is separate from ExecInitParallelPlan() because we can launch the
+ * worker processes and let them start doing something before we do this.
+ */
+void
+ExecParallelCreateReaders(ParallelExecutorInfo *pei,
+                          TupleDesc tupDesc)
+{
+    int         nworkers = pei->pcxt->nworkers_launched;
+    int         i;
+
+    Assert(pei->reader == NULL);
+
+    if (nworkers > 0)
+    {
+        pei->reader = (TupleQueueReader **)
+            palloc(nworkers * sizeof(TupleQueueReader *));
+
+        for (i = 0; i < nworkers; i++)
+        {
+            shm_mq_set_handle(pei->tqueue[i],
+                              pei->pcxt->worker[i].bgwhandle);
+            pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
+                                                    tupDesc);
+        }
+    }
+}
+
+/*
  * Re-initialize the parallel executor shared memory state before launching
  * a fresh batch of workers.
  */
@@ -616,6 +650,7 @@ ExecParallelReinitialize(PlanState *planstate,
     ReinitializeParallelDSM(pei->pcxt);
     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+    pei->reader = NULL;
     pei->finished = false;
 
     /* Traverse plan tree and let each child node reset associated state. */
@@ -741,16 +776,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
 {
+    int         nworkers = pei->pcxt->nworkers_launched;
     int         i;
 
+    /* Make this be a no-op if called twice in a row. */
     if (pei->finished)
         return;
 
-    /* First, wait for the workers to finish. */
+    /*
+     * Detach from tuple queues ASAP, so that any still-active workers will
+     * notice that no further results are wanted.
+     */
+    if (pei->tqueue != NULL)
+    {
+        for (i = 0; i < nworkers; i++)
+            shm_mq_detach(pei->tqueue[i]);
+        pfree(pei->tqueue);
+        pei->tqueue = NULL;
+    }
+
+    /*
+     * While we're waiting for the workers to finish, let's get rid of the
+     * tuple queue readers.  (Any other local cleanup could be done here too.)
+     */
+    if (pei->reader != NULL)
+    {
+        for (i = 0; i < nworkers; i++)
+            DestroyTupleQueueReader(pei->reader[i]);
+        pfree(pei->reader);
+        pei->reader = NULL;
+    }
+
+    /* Now wait for the workers to finish. */
     WaitForParallelWorkersToFinish(pei->pcxt);
 
-    /* Next, accumulate buffer usage. */
-    for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
+    /*
+     * Next, accumulate buffer usage.  (This must wait for the workers to
+     * finish, or we might get incomplete data.)
+     */
+    for (i = 0; i < nworkers; i++)
         InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
     /* Finally, accumulate instrumentation, if any. */
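Once set up this way, the readers are consumed on the leader side until every
worker's queue reports completion.  As a hedged illustration of that
consumption side (again not code from this commit, and much simplified
relative to nodeGather.c's gather_readnext(): no leader participation, no
latch waits, queues drained one at a time), assuming the tqueue.h API of this
era in which TupleQueueReaderNext() returns a HeapTuple and signals
exhaustion through a bool *done flag:

#include "postgres.h"
#include "executor/tqueue.h"

/*
 * Hypothetical, simplified consumer of the readers built by
 * ExecParallelCreateReaders().  Returns the next tuple from any worker,
 * or NULL once every worker has detached its queue.
 */
static HeapTuple
drain_workers_sketch(TupleQueueReader **reader, int *nreaders)
{
    while (*nreaders > 0)
    {
        bool        done = false;
        HeapTuple   tup;

        /* Blocking read: NULL with done == true means this worker is finished. */
        tup = TupleQueueReaderNext(reader[0], false, &done);
        if (tup != NULL)
            return tup;
        if (done)
        {
            /* Drop the exhausted reader; ExecParallelFinish() destroys it later. */
            reader[0] = reader[--(*nreaders)];
        }
    }
    return NULL;                /* all workers exhausted */
}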