aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/parallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r--src/backend/access/transam/parallel.c22
1 files changed, 11 insertions, 11 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index ab5ef2573cf..9325b628da3 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -191,8 +191,8 @@ CreateParallelContextForExternalFunction(char *library_name,
/*
* Establish the dynamic shared memory segment for a parallel context and
- * copied state and other bookkeeping information that will need by parallel
- * workers into it.
+ * copy state and other bookkeeping information that will be needed by
+ * parallel workers into it.
*/
void
InitializeParallelDSM(ParallelContext *pcxt)
@@ -271,7 +271,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
* parallelism than to fail outright.
*/
segsize = shm_toc_estimate(&pcxt->estimator);
- if (pcxt->nworkers != 0)
+ if (pcxt->nworkers > 0)
pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
if (pcxt->seg != NULL)
pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
@@ -397,11 +397,13 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
char *error_queue_space;
int i;
- if (pcxt->nworkers_launched == 0)
- return;
-
- WaitForParallelWorkersToFinish(pcxt);
- WaitForParallelWorkersToExit(pcxt);
+ /* Wait for any old workers to exit. */
+ if (pcxt->nworkers_launched > 0)
+ {
+ WaitForParallelWorkersToFinish(pcxt);
+ WaitForParallelWorkersToExit(pcxt);
+ pcxt->nworkers_launched = 0;
+ }
/* Reset a few bits of fixed parallel state to a clean state. */
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
@@ -420,9 +422,6 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
shm_mq_set_receiver(mq, MyProc);
pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
}
-
- /* Reset number of workers launched. */
- pcxt->nworkers_launched = 0;
}
/*
@@ -493,6 +492,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
*/
any_registrations_failed = true;
pcxt->worker[i].bgwhandle = NULL;
+ pfree(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
}