aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/parallel.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-10-30 10:43:00 +0100
committerRobert Haas <rhaas@postgresql.org>2015-10-30 10:44:54 +0100
commit3a1f8611f2582df0a16bcd35caed2e1526387643 (patch)
treea246c057e24fd1a8870194a1d1ff3bf3e15ebb58 /src/backend/access/transam/parallel.c
parentc6baec92fc48387da8164d50f5699a7162267718 (diff)
downloadpostgresql-3a1f8611f2582df0a16bcd35caed2e1526387643.tar.gz
postgresql-3a1f8611f2582df0a16bcd35caed2e1526387643.zip
Update parallel executor support to reuse the same DSM.
Commit b0b0d84b3d663a148022e900ebfc164284a95f55 purported to make it possible to relaunch workers using the same parallel context, but it had an unpleasant race condition: we might reinitialize after the workers have sent their last control message but before they have dettached the DSM, leaving to crashes. Repair by introducing a new ParallelContext operation, ReinitializeParallelDSM. Adjust execParallel.c to use this new support, so that we can rescan a Gather node by relaunching workers but without needing to recreate the DSM. Amit Kapila, with some adjustments by me. Extracted from latest parallel sequential scan patch.
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r--src/backend/access/transam/parallel.c175
1 files changed, 92 insertions, 83 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 35a873de6ba..79cc9880bbb 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
static void ParallelErrorContext(void *arg);
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
/*
* Establish a new parallel context. This should be done after entering
@@ -384,6 +385,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
}
/*
+ * Reinitialize the dynamic shared memory segment for a parallel context such
+ * that we could launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+ FixedParallelState *fps;
+ char *error_queue_space;
+ int i;
+
+ if (pcxt->nworkers_launched == 0)
+ return;
+
+ WaitForParallelWorkersToFinish(pcxt);
+ WaitForParallelWorkersToExit(pcxt);
+
+ /* Reset a few bits of fixed parallel state to a clean state. */
+ fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+ fps->workers_attached = 0;
+ fps->last_xlog_end = 0;
+
+ /* Recreate error queues. */
+ error_queue_space =
+ shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
+
+ /* Reset number of workers launched. */
+ pcxt->nworkers_launched = 0;
+}
+
+/*
* Launch parallel workers.
*/
void
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
- /*
- * This function can be called for a parallel context for which it has
- * already been called previously, but only if all of the old workers
- * have already exited. When this case arises, we need to do some extra
- * reinitialization.
- */
- if (pcxt->nworkers_launched > 0)
- {
- FixedParallelState *fps;
- char *error_queue_space;
-
- /* Clean out old worker handles. */
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- if (pcxt->worker[i].error_mqh != NULL)
- elog(ERROR, "previously launched worker still alive");
- if (pcxt->worker[i].bgwhandle != NULL)
- {
- pfree(pcxt->worker[i].bgwhandle);
- pcxt->worker[i].bgwhandle = NULL;
- }
- }
-
- /* Reset a few bits of fixed parallel state to a clean state. */
- fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
- fps->workers_attached = 0;
- fps->last_xlog_end = 0;
-
- /* Recreate error queues. */
- error_queue_space =
- shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- char *start;
- shm_mq *mq;
-
- start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
- mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
- shm_mq_set_receiver(mq, MyProc);
- pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
- }
-
- /* Reset number of workers launched. */
- pcxt->nworkers_launched = 0;
- }
-
/* Configure a worker. */
snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
}
/*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
*
* Even if the parallel operation seems to have completed successfully, it's
* important to call this function afterwards. We must not miss any errors
@@ -553,6 +548,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
}
/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that workers have been completely shutdown. The
+ * difference between WaitForParallelWorkersToFinish and this function is
+ * that former just ensures that last message sent by worker backend is
+ * received by master backend whereas this ensures the complete shutdown.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+ int i;
+
+ /* Wait until the workers actually die. */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ BgwHandleStatus status;
+
+ if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
+ status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+ /*
+ * If the postmaster kicked the bucket, we have no chance of cleaning
+ * up safely -- we won't be able to tell when our workers are actually
+ * dead. This doesn't necessitate a PANIC since they will all abort
+ * eventually, but we can't safely continue this session.
+ */
+ if (status == BGWH_POSTMASTER_DIED)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("postmaster exited during a parallel transaction")));
+
+ /* Release memory. */
+ pfree(pcxt->worker[i].bgwhandle);
+ pcxt->worker[i].bgwhandle = NULL;
+ }
+}
+
+/*
* Destroy a parallel context.
*
* If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
{
for (i = 0; i < pcxt->nworkers; ++i)
{
- if (pcxt->worker[i].bgwhandle != NULL)
- TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
if (pcxt->worker[i].error_mqh != NULL)
{
+ TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
pfree(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
pcxt->private_memory = NULL;
}
- /* Wait until the workers actually die. */
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- BgwHandleStatus status;
-
- if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
- continue;
-
- /*
- * We can't finish transaction commit or abort until all of the
- * workers are dead. This means, in particular, that we can't respond
- * to interrupts at this stage.
- */
- HOLD_INTERRUPTS();
- status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
- RESUME_INTERRUPTS();
-
- /*
- * If the postmaster kicked the bucket, we have no chance of cleaning
- * up safely -- we won't be able to tell when our workers are actually
- * dead. This doesn't necessitate a PANIC since they will all abort
- * eventually, but we can't safely continue this session.
- */
- if (status == BGWH_POSTMASTER_DIED)
- ereport(FATAL,
- (errcode(ERRCODE_ADMIN_SHUTDOWN),
- errmsg("postmaster exited during a parallel transaction")));
-
- /* Release memory. */
- pfree(pcxt->worker[i].bgwhandle);
- pcxt->worker[i].bgwhandle = NULL;
- }
+ /*
+ * We can't finish transaction commit or abort until all of the
+ * workers have exited. This means, in particular, that we can't respond
+ * to interrupts at this stage.
+ */
+ HOLD_INTERRUPTS();
+ WaitForParallelWorkersToExit(pcxt);
+ RESUME_INTERRUPTS();
/* Free the worker array itself. */
if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
case 'X': /* Terminate, indicating clean exit */
{
- pfree(pcxt->worker[i].bgwhandle);
pfree(pcxt->worker[i].error_mqh);
- pcxt->worker[i].bgwhandle = NULL;
pcxt->worker[i].error_mqh = NULL;
break;
}