aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-10-16 17:18:05 -0400
committerRobert Haas <rhaas@postgresql.org>2015-10-16 17:18:05 -0400
commitb0b0d84b3d663a148022e900ebfc164284a95f55 (patch)
tree4bf51bc140d3a8a0a094e1353c57c40bfa4315ca /src/backend
parentafdfcd3f7617c9b7be5966d66ddabdc2e92eb99b (diff)
downloadpostgresql-b0b0d84b3d663a148022e900ebfc164284a95f55.tar.gz
postgresql-b0b0d84b3d663a148022e900ebfc164284a95f55.zip
Allow a parallel context to relaunch workers.
This may allow some callers to avoid the overhead involved in tearing down a parallel context and then setting up a new one, which means releasing the DSM and then allocating and populating a new one. I suspect we'll want to revise the Gather node to make use of this new capability, but even if not it may be useful elsewhere and requires very little additional code.
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/README.parallel5
-rw-r--r--src/backend/access/transam/parallel.c49
2 files changed, 54 insertions, 0 deletions
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
index 10051863fed..dfcbafabf08 100644
--- a/src/backend/access/transam/README.parallel
+++ b/src/backend/access/transam/README.parallel
@@ -221,3 +221,8 @@ pattern looks like this:
DestroyParallelContext(pcxt);
ExitParallelMode();
+
+If desired, after WaitForParallelWorkersToFinish() has been called, another
+call to LaunchParallelWorkers() can be made using the same parallel context.
+Calls to these two functions can be alternated any number of times before
+destroying the parallel context.
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 17f9a5ae6e4..0085987f324 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -404,6 +404,52 @@ LaunchParallelWorkers(ParallelContext *pcxt)
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+ /*
+ * This function can be called for a parallel context for which it has
+ * already been called previously, but only if all of the old workers
+ * have already exited. When this case arises, we need to do some extra
+ * reinitialization.
+ */
+ if (pcxt->nworkers_launched > 0)
+ {
+ FixedParallelState *fps;
+ char *error_queue_space;
+
+ /* Clean out old worker handles. */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ if (pcxt->worker[i].error_mqh != NULL)
+ elog(ERROR, "previously launched worker still alive");
+ if (pcxt->worker[i].bgwhandle != NULL)
+ {
+ pfree(pcxt->worker[i].bgwhandle);
+ pcxt->worker[i].bgwhandle = NULL;
+ }
+ }
+
+ /* Reset a few bits of fixed parallel state to a clean state. */
+ fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+ fps->workers_attached = 0;
+ fps->last_xlog_end = 0;
+
+ /* Recreate error queues. */
+ error_queue_space =
+ shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
+
+ /* Reset number of workers launched. */
+ pcxt->nworkers_launched = 0;
+ }
+
/* Configure a worker. */
snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
MyProcPid);
@@ -428,8 +474,11 @@ LaunchParallelWorkers(ParallelContext *pcxt)
if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle))
+ {
shm_mq_set_handle(pcxt->worker[i].error_mqh,
pcxt->worker[i].bgwhandle);
+ pcxt->nworkers_launched++;
+ }
else
{
/*