diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2017-09-01 17:38:54 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2017-09-01 17:39:01 -0400 |
commit | 51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae (patch) | |
tree | a527c43c5129b7f154b32326337e062202c04010 /src/backend/executor/execParallel.c | |
parent | c039ba0716383ccaf88c9be1a7f0803a77823de1 (diff) | |
download | postgresql-51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae.tar.gz postgresql-51daa7bdb39e1bdc31eb99fd3f54f61743ebb7ae.zip |
Improve division of labor between execParallel.c and nodeGather[Merge].c.
Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c. Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.
These changes also eliminate a vestigial memory leak (of the pei->tqueue
array). It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.
Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- | src/backend/executor/execParallel.c | 72 |
1 files changed, 68 insertions, 4 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index c713b851399..59f3744a147 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -534,9 +534,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); pei->buffer_usage = bufusage_space; - /* Set up tuple queues. */ + /* Set up the tuple queues that the workers will write into. */ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); + /* We don't need the TupleQueueReaders yet, though. */ + pei->reader = NULL; + /* * If instrumentation options were supplied, allocate space for the data. * It only gets partially initialized here; the rest happens during @@ -604,6 +607,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, } /* + * Set up tuple queue readers to read the results of a parallel subplan. + * All the workers are expected to return tuples matching tupDesc. + * + * This is separate from ExecInitParallelPlan() because we can launch the + * worker processes and let them start doing something before we do this. + */ +void +ExecParallelCreateReaders(ParallelExecutorInfo *pei, + TupleDesc tupDesc) +{ + int nworkers = pei->pcxt->nworkers_launched; + int i; + + Assert(pei->reader == NULL); + + if (nworkers > 0) + { + pei->reader = (TupleQueueReader **) + palloc(nworkers * sizeof(TupleQueueReader *)); + + for (i = 0; i < nworkers; i++) + { + shm_mq_set_handle(pei->tqueue[i], + pei->pcxt->worker[i].bgwhandle); + pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i], + tupDesc); + } + } +} + +/* * Re-initialize the parallel executor shared memory state before launching * a fresh batch of workers. */ @@ -616,6 +650,7 @@ ExecParallelReinitialize(PlanState *planstate, ReinitializeParallelDSM(pei->pcxt); pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); + pei->reader = NULL; pei->finished = false; /* Traverse plan tree and let each child node reset associated state. */ @@ -741,16 +776,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, void ExecParallelFinish(ParallelExecutorInfo *pei) { + int nworkers = pei->pcxt->nworkers_launched; int i; + /* Make this be a no-op if called twice in a row. */ if (pei->finished) return; - /* First, wait for the workers to finish. */ + /* + * Detach from tuple queues ASAP, so that any still-active workers will + * notice that no further results are wanted. + */ + if (pei->tqueue != NULL) + { + for (i = 0; i < nworkers; i++) + shm_mq_detach(pei->tqueue[i]); + pfree(pei->tqueue); + pei->tqueue = NULL; + } + + /* + * While we're waiting for the workers to finish, let's get rid of the + * tuple queue readers. (Any other local cleanup could be done here too.) + */ + if (pei->reader != NULL) + { + for (i = 0; i < nworkers; i++) + DestroyTupleQueueReader(pei->reader[i]); + pfree(pei->reader); + pei->reader = NULL; + } + + /* Now wait for the workers to finish. */ WaitForParallelWorkersToFinish(pei->pcxt); - /* Next, accumulate buffer usage. */ - for (i = 0; i < pei->pcxt->nworkers_launched; ++i) + /* + * Next, accumulate buffer usage. (This must wait for the workers to + * finish, or we might get incomplete data.) + */ + for (i = 0; i < nworkers; i++) InstrAccumParallelQuery(&pei->buffer_usage[i]); /* Finally, accumulate instrumentation, if any. */ |