aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/parallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r--src/backend/access/transam/parallel.c175
1 files changed, 92 insertions, 83 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 35a873de6ba..79cc9880bbb 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
static void ParallelErrorContext(void *arg);
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
/*
* Establish a new parallel context. This should be done after entering
@@ -384,6 +385,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
}
/*
+ * Reinitialize the dynamic shared memory segment for a parallel context such
+ * that we could launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+ FixedParallelState *fps;
+ char *error_queue_space;
+ int i;
+
+ if (pcxt->nworkers_launched == 0)
+ return;
+
+ WaitForParallelWorkersToFinish(pcxt);
+ WaitForParallelWorkersToExit(pcxt);
+
+ /* Reset a few bits of fixed parallel state to a clean state. */
+ fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+ fps->workers_attached = 0;
+ fps->last_xlog_end = 0;
+
+ /* Recreate error queues. */
+ error_queue_space =
+ shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ char *start;
+ shm_mq *mq;
+
+ start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+ mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+ shm_mq_set_receiver(mq, MyProc);
+ pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+ }
+
+ /* Reset number of workers launched. */
+ pcxt->nworkers_launched = 0;
+}
+
+/*
* Launch parallel workers.
*/
void
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
- /*
- * This function can be called for a parallel context for which it has
- * already been called previously, but only if all of the old workers
- * have already exited. When this case arises, we need to do some extra
- * reinitialization.
- */
- if (pcxt->nworkers_launched > 0)
- {
- FixedParallelState *fps;
- char *error_queue_space;
-
- /* Clean out old worker handles. */
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- if (pcxt->worker[i].error_mqh != NULL)
- elog(ERROR, "previously launched worker still alive");
- if (pcxt->worker[i].bgwhandle != NULL)
- {
- pfree(pcxt->worker[i].bgwhandle);
- pcxt->worker[i].bgwhandle = NULL;
- }
- }
-
- /* Reset a few bits of fixed parallel state to a clean state. */
- fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
- fps->workers_attached = 0;
- fps->last_xlog_end = 0;
-
- /* Recreate error queues. */
- error_queue_space =
- shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- char *start;
- shm_mq *mq;
-
- start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
- mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
- shm_mq_set_receiver(mq, MyProc);
- pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
- }
-
- /* Reset number of workers launched. */
- pcxt->nworkers_launched = 0;
- }
-
/* Configure a worker. */
snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
}
/*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
*
* Even if the parallel operation seems to have completed successfully, it's
* important to call this function afterwards. We must not miss any errors
@@ -553,6 +548,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
}
/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that workers have been completely shutdown. The
+ * difference between WaitForParallelWorkersToFinish and this function is
+ * that former just ensures that last message sent by worker backend is
+ * received by master backend whereas this ensures the complete shutdown.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+ int i;
+
+ /* Wait until the workers actually die. */
+ for (i = 0; i < pcxt->nworkers; ++i)
+ {
+ BgwHandleStatus status;
+
+ if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
+ status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+ /*
+ * If the postmaster kicked the bucket, we have no chance of cleaning
+ * up safely -- we won't be able to tell when our workers are actually
+ * dead. This doesn't necessitate a PANIC since they will all abort
+ * eventually, but we can't safely continue this session.
+ */
+ if (status == BGWH_POSTMASTER_DIED)
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("postmaster exited during a parallel transaction")));
+
+ /* Release memory. */
+ pfree(pcxt->worker[i].bgwhandle);
+ pcxt->worker[i].bgwhandle = NULL;
+ }
+}
+
+/*
* Destroy a parallel context.
*
* If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
{
for (i = 0; i < pcxt->nworkers; ++i)
{
- if (pcxt->worker[i].bgwhandle != NULL)
- TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
if (pcxt->worker[i].error_mqh != NULL)
{
+ TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
pfree(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
pcxt->private_memory = NULL;
}
- /* Wait until the workers actually die. */
- for (i = 0; i < pcxt->nworkers; ++i)
- {
- BgwHandleStatus status;
-
- if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
- continue;
-
- /*
- * We can't finish transaction commit or abort until all of the
- * workers are dead. This means, in particular, that we can't respond
- * to interrupts at this stage.
- */
- HOLD_INTERRUPTS();
- status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
- RESUME_INTERRUPTS();
-
- /*
- * If the postmaster kicked the bucket, we have no chance of cleaning
- * up safely -- we won't be able to tell when our workers are actually
- * dead. This doesn't necessitate a PANIC since they will all abort
- * eventually, but we can't safely continue this session.
- */
- if (status == BGWH_POSTMASTER_DIED)
- ereport(FATAL,
- (errcode(ERRCODE_ADMIN_SHUTDOWN),
- errmsg("postmaster exited during a parallel transaction")));
-
- /* Release memory. */
- pfree(pcxt->worker[i].bgwhandle);
- pcxt->worker[i].bgwhandle = NULL;
- }
+ /*
+ * We can't finish transaction commit or abort until all of the
+ * workers have exited. This means, in particular, that we can't respond
+ * to interrupts at this stage.
+ */
+ HOLD_INTERRUPTS();
+ WaitForParallelWorkersToExit(pcxt);
+ RESUME_INTERRUPTS();
/* Free the worker array itself. */
if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
case 'X': /* Terminate, indicating clean exit */
{
- pfree(pcxt->worker[i].bgwhandle);
pfree(pcxt->worker[i].error_mqh);
- pcxt->worker[i].bgwhandle = NULL;
pcxt->worker[i].error_mqh = NULL;
break;
}