-rw-r--r--  src/backend/access/transam/README.parallel  |  10
-rw-r--r--  src/backend/access/transam/parallel.c       | 175
-rw-r--r--  src/backend/executor/execParallel.c         |  33
-rw-r--r--  src/backend/executor/nodeGather.c           |  54
-rw-r--r--  src/include/access/parallel.h               |   1
-rw-r--r--  src/include/executor/execParallel.h         |   1
6 files changed, 166 insertions(+), 108 deletions(-)
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index dfcbafabf08..db9ac3d504d 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -222,7 +222,9 @@ pattern looks like this:
ExitParallelMode();
-If desired, after WaitForParallelWorkersToFinish() has been called, another
-call to LaunchParallelWorkers() can be made using the same parallel context.
-Calls to these two functions can be alternated any number of times before
-destroying the parallel context.
+If desired, after WaitForParallelWorkersToFinish() has been called, the
+context can be reset so that workers can be launched anew using the same
+parallel context. To do this, first call ReinitializeParallelDSM() to
+reinitialize state managed by the parallel context machinery itself; then,
+perform any other necessary resetting of state; after that, you can again
+call LaunchParallelWorkers().
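
For reference, the full reuse pattern the new README text describes looks roughly
like this; a minimal sketch only, assuming a hypothetical worker entrypoint
my_worker_main and omitting the shm_toc estimation and setup steps that the
README's existing example shows:

	EnterParallelMode();

	pcxt = CreateParallelContext(my_worker_main, nworkers);
	/* ... estimate and store shared state for the workers here ... */
	InitializeParallelDSM(pcxt);

	LaunchParallelWorkers(pcxt);
	/* ... leader does its share of the work and reads results ... */
	WaitForParallelWorkersToFinish(pcxt);

	/* Reset the context so the same DSM segment can be used again. */
	ReinitializeParallelDSM(pcxt);
	/* ... perform any other necessary resetting of state ... */
	LaunchParallelWorkers(pcxt);
	WaitForParallelWorkersToFinish(pcxt);

	DestroyParallelContext(pcxt);
	ExitParallelMode();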
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 35a873de6ba..79cc9880bbb 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
static void ParallelErrorContext(void *arg);
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
/*
* Establish a new parallel context. This should be done after entering
@@ -384,6 +385,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
}
/*
+ * Reinitialize the dynamic shared memory segment for a parallel context so
+ * that we can launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+ FixedParallelState *fps;
+ char *error_queue_space;
+ int i;
+
+ if (pcxt->nworkers_launched == 0)
+ return;
+
+ WaitForParallelWorkersToFinish(pcxt);
+ WaitForParallelWorkersToExit(pcxt);
+
+ /* Reset a few bits of fixed parallel state to a clean state. */
+ fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+ fps->workers_attached = 0;
+ fps->last_xlog_end = 0;
+
+ /* Recreate error queues. */
+ error_queue_space =
+ shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
+
+ /* Reset number of workers launched. */
+ pcxt->nworkers_launched = 0;
+}
+
+/*
* Launch parallel workers.
*/
void
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
- /*
- * This function can be called for a parallel context for which it has
- * already been called previously, but only if all of the old workers
- * have already exited. When this case arises, we need to do some extra
- * reinitialization.
- */
- if (pcxt->nworkers_launched > 0)
- {
- FixedParallelState *fps;
- char *error_queue_space;
-
- /* Clean out old worker handles. */
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- if (pcxt->worker[i].error_mqh != NULL)
- elog(ERROR, "previously launched worker still alive");
- if (pcxt->worker[i].bgwhandle != NULL)
- {
- pfree(pcxt->worker[i].bgwhandle);
- pcxt->worker[i].bgwhandle = NULL;
- }
- }
-
- /* Reset a few bits of fixed parallel state to a clean state. */
- fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
- fps->workers_attached = 0;
- fps->last_xlog_end = 0;
-
- /* Recreate error queues. */
- error_queue_space =
- shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- char *start;
- shm_mq *mq;
-
- start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
- mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
- shm_mq_set_receiver(mq, MyProc);
- pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
- }
-
- /* Reset number of workers launched. */
- pcxt->nworkers_launched = 0;
- }
-
/* Configure a worker. */
snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
}
/*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
*
* Even if the parallel operation seems to have completed successfully, it's
* important to call this function afterwards. We must not miss any errors
@@ -553,6 +548,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
}
/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that the workers have completely shut down. The
+ * difference between WaitForParallelWorkersToFinish and this function is
+ * that the former only ensures that the last message sent by a worker
+ * backend has been received by the master backend, whereas this function
+ * ensures that the workers have actually exited.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+ int i;
+
+ /* Wait until the workers actually die. */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ BgwHandleStatus status;
+
+ if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
+ status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+ /*
+ * If the postmaster kicked the bucket, we have no chance of cleaning
+ * up safely -- we won't be able to tell when our workers are actually
+ * dead. This doesn't necessitate a PANIC since they will all abort
+ * eventually, but we can't safely continue this session.
+ */
+ if (status == BGWH_POSTMASTER_DIED)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("postmaster exited during a parallel transaction")));
+
+ /* Release memory. */
+ pfree(pcxt->worker[i].bgwhandle);
+ pcxt->worker[i].bgwhandle = NULL;
+ }
+}
+
+/*
* Destroy a parallel context.
*
* If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
{
for (i = 0; i < pcxt->nworkers; ++i)
{
- if (pcxt->worker[i].bgwhandle != NULL)
- TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
if (pcxt->worker[i].error_mqh != NULL)
{
+ TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
pfree(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
pcxt->private_memory = NULL;
}
- /* Wait until the workers actually die. */
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- BgwHandleStatus status;
-
- if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
- continue;
-
- /*
- * We can't finish transaction commit or abort until all of the
- * workers are dead. This means, in particular, that we can't respond
- * to interrupts at this stage.
- */
- HOLD_INTERRUPTS();
- status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
- RESUME_INTERRUPTS();
-
- /*
- * If the postmaster kicked the bucket, we have no chance of cleaning
- * up safely -- we won't be able to tell when our workers are actually
- * dead. This doesn't necessitate a PANIC since they will all abort
- * eventually, but we can't safely continue this session.
- */
- if (status == BGWH_POSTMASTER_DIED)
- ereport(FATAL,
- (errcode(ERRCODE_ADMIN_SHUTDOWN),
- errmsg("postmaster exited during a parallel transaction")));
-
- /* Release memory. */
- pfree(pcxt->worker[i].bgwhandle);
- pcxt->worker[i].bgwhandle = NULL;
- }
+ /*
+ * We can't finish transaction commit or abort until all of the
+ * workers have exited. This means, in particular, that we can't respond
+ * to interrupts at this stage.
+ */
+ HOLD_INTERRUPTS();
+ WaitForParallelWorkersToExit(pcxt);
+ RESUME_INTERRUPTS();
/* Free the worker array itself. */
if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
case 'X': /* Terminate, indicating clean exit */
{
- pfree(pcxt->worker[i].bgwhandle);
pfree(pcxt->worker[i].error_mqh);
- pcxt->worker[i].bgwhandle = NULL;
pcxt->worker[i].error_mqh = NULL;
break;
}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index efcbaef416c..99a9de3cdc3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+ bool reinitialize);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
* to the main backend and start the workers.
*/
static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
shm_mq_handle **responseq;
char *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
responseq = (shm_mq_handle **)
palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
- /* Allocate space from the DSM for the queues themselves. */
- tqueuespace = shm_toc_allocate(pcxt->toc,
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ /*
+ * If not reinitializing, allocate space from the DSM for the queues;
+ * otherwise, find the already allocated space.
+ */
+ if (!reinitialize)
+ tqueuespace =
+ shm_toc_allocate(pcxt->toc,
+ PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ else
+ tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
/* Create the queues, and become the receiver for each. */
for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,13 +256,24 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
}
/* Add array of queues to shm_toc, so others can find it. */
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+ if (!reinitialize)
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
/* Return array of handles. */
return responseq;
}
/*
+ * Re-initialize the tuple queues that the backend workers use to return
+ * tuples to the main backend.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+ return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
+/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
pei->buffer_usage = bufusage_space;
/* Set up tuple queues. */
- pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
/*
* If instrumentation options were supplied, allocate space for the
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 9c1533e3113..5f589614dc2 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -41,6 +41,7 @@
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static void ExecShutdownGatherWorkers(GatherState *node);
/* ----------------------------------------------------------------
@@ -150,9 +151,10 @@ ExecGather(GatherState *node)
bool got_any_worker = false;
/* Initialize the workers required to execute Gather node. */
- node->pei = ExecInitParallelPlan(node->ps.lefttree,
- estate,
- gather->num_workers);
+ if (!node->pei)
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ gather->num_workers);
/*
* Register backend workers. We might not get as many as we
@@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate)
gatherstate->need_to_scan_locally,
&done);
if (done)
- ExecShutdownGather(gatherstate);
+ ExecShutdownGatherWorkers(gatherstate);
if (HeapTupleIsValid(tup))
{
@@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate)
}
/* ----------------------------------------------------------------
- * ExecShutdownGather
+ * ExecShutdownGatherWorkers
*
- * Destroy the setup for parallel workers. Collect all the
- * stats after workers are stopped, else some work done by
- * workers won't be accounted.
+ * Destroy the parallel workers. Collect all the stats after the
+ * workers are stopped, else some work done by the workers won't be
+ * accounted for.
* ----------------------------------------------------------------
*/
void
-ExecShutdownGather(GatherState *node)
+ExecShutdownGatherWorkers(GatherState *node)
{
/* Shut down tuple queue funnel before shutting down workers. */
if (node->funnel != NULL)
@@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node)
/* Now shut down the workers. */
if (node->pei != NULL)
- {
ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGather
+ *
+ * Destroy the setup for parallel workers, including the parallel
+ * context. Collect all the stats after the workers are stopped, else
+ * some work done by the workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+ ExecShutdownGatherWorkers(node);
+
+ /* Now destroy the parallel context. */
+ if (node->pei != NULL)
+ {
ExecParallelCleanup(node->pei);
node->pei = NULL;
}
@@ -349,14 +368,21 @@ void
ExecReScanGather(GatherState *node)
{
/*
- * Re-initialize the parallel context and workers to perform rescan of
- * relation. We want to gracefully shutdown all the workers so that they
+ * Re-initialize the parallel workers to perform a rescan of the relation.
+ * We want to gracefully shut down all the workers so that they
* should be able to propagate any error or other information to master
- * backend before dying.
+ * backend before dying. The parallel context will be reused for the rescan.
*/
- ExecShutdownGather(node);
+ ExecShutdownGatherWorkers(node);
node->initialized = false;
+ if (node->pei)
+ {
+ ReinitializeParallelDSM(node->pei->pcxt);
+ node->pei->tqueue =
+ ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+ }
+
ExecReScan(node->ps.lefttree);
}
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index d4b7c5dd75b..411db7964db 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -56,6 +56,7 @@ extern bool InitializingParallelWorker;
extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
extern void InitializeParallelDSM(ParallelContext *);
+extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *);
extern void WaitForParallelWorkersToFinish(ParallelContext *);
extern void DestroyParallelContext(ParallelContext *);
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 505500e76b5..23c29ebb902 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -33,5 +33,6 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers);
extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
+extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);
#endif /* EXECPARALLEL_H */
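
Taken together, the new entry points let an executor node reuse its parallel
infrastructure across rescans. A condensed sketch of that flow, following the
ExecReScanGather change above (error handling and unrelated state resets
omitted):

	/* Stop the current batch of workers, collecting stats and errors. */
	ExecShutdownGatherWorkers(node);
	node->initialized = false;

	if (node->pei)
	{
		/* Wait for the old workers to exit and reset the shared state ... */
		ReinitializeParallelDSM(node->pei->pcxt);
		/* ... then recreate the tuple queues in the already-allocated space. */
		node->pei->tqueue =
			ExecParallelReinitializeTupleQueues(node->pei->pcxt);
	}

	ExecReScan(node->ps.lefttree);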