aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/parallel.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-11-05 12:05:38 -0500
committerRobert Haas <rhaas@postgresql.org>2015-11-05 12:13:56 -0500
commit64b2e7ad917a9a7814904d0f6dbde52cefbcfa00 (patch)
treeb46b16b54a6ca4e6bf0841ee93b7c1a4acaf43b4 /src/backend/access/transam/parallel.c
parent59464bd6f928ad0da30502cbe9b54baec9ca2c69 (diff)
downloadpostgresql-64b2e7ad917a9a7814904d0f6dbde52cefbcfa00.tar.gz
postgresql-64b2e7ad917a9a7814904d0f6dbde52cefbcfa00.zip
Pass extra data to bgworkers, and use this to fix parallel contexts.
Up until now, the total amount of data that could be passed to a background worker at startup was one datum, which can be a small as 4 bytes on some systems. That's enough to pass a dsm_handle or an array index, but not much else. Add a bgw_extra flag to the BackgroundWorker struct, allowing up to 128 bytes to be passed to a new worker on any platform. Use this to fix a problem I recently discovered with the parallel context machinery added in 9.5: the master assigns each worker an array index, and each worker subsequently assigns itself an array index, and there's nothing to guarantee that the two sets of indexes match, leading to chaos. Normally, I would not back-patch the change to add bgw_extra, since it is basically a feature addition. However, since 9.5 is still in beta and there seems to be no other sensible way to repair the broken parallel context machinery, back-patch to 9.5. Existing background worker code can ignore the bgw_extra field without a problem, but might need to be recompiled since the structure size has changed. Report and patch by me. Review by Amit Kapila.
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r--src/backend/access/transam/parallel.c24
1 files changed, 7 insertions, 17 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 79cc9880bbb..5edaaf4bd2a 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -77,10 +77,6 @@ typedef struct FixedParallelState
/* Mutex protects remaining fields. */
slock_t mutex;
- /* Track whether workers have attached. */
- int workers_expected;
- int workers_attached;
-
/* Maximum XactLastRecEnd of any worker. */
XLogRecPtr last_xlog_end;
} FixedParallelState;
@@ -295,8 +291,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
fps->parallel_master_backend_id = MyBackendId;
fps->entrypoint = pcxt->entrypoint;
SpinLockInit(&fps->mutex);
- fps->workers_expected = pcxt->nworkers;
- fps->workers_attached = 0;
fps->last_xlog_end = 0;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -403,7 +397,6 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
/* Reset a few bits of fixed parallel state to a clean state. */
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
- fps->workers_attached = 0;
fps->last_xlog_end = 0;
/* Recreate error queues. */
@@ -455,6 +448,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
worker.bgw_main = ParallelWorkerMain;
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
worker.bgw_notify_pid = MyProcPid;
+ memset(&worker.bgw_extra, 0, BGW_EXTRALEN);
/*
* Start workers.
@@ -466,6 +460,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
*/
for (i = 0; i < pcxt->nworkers; ++i)
{
+ memcpy(worker.bgw_extra, &i, sizeof(int));
if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle))
@@ -891,6 +886,10 @@ ParallelWorkerMain(Datum main_arg)
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
+ /* Determine and set our parallel worker number. */
+ Assert(ParallelWorkerNumber == -1);
+ memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
+
/* Set up a memory context and resource owner. */
Assert(CurrentResourceOwner == NULL);
CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
@@ -915,18 +914,9 @@ ParallelWorkerMain(Datum main_arg)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("bad magic number in dynamic shared memory segment")));
- /* Determine and set our worker number. */
+ /* Look up fixed parallel state. */
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
Assert(fps != NULL);
- Assert(ParallelWorkerNumber == -1);
- SpinLockAcquire(&fps->mutex);
- if (fps->workers_attached < fps->workers_expected)
- ParallelWorkerNumber = fps->workers_attached++;
- SpinLockRelease(&fps->mutex);
- if (ParallelWorkerNumber < 0)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("too many parallel workers already attached")));
MyFixedParallelState = fps;
/*