diff options
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- | src/backend/access/transam/parallel.c | 41 |
1 files changed, 21 insertions, 20 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 1e0eb10c860..9324e52fd3c 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -14,9 +14,9 @@ #include "postgres.h" +#include "access/parallel.h" #include "access/xact.h" #include "access/xlog.h" -#include "access/parallel.h" #include "commands/async.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -33,6 +33,7 @@ #include "utils/resowner.h" #include "utils/snapmgr.h" + /* * We don't want to waste a lot of memory on an error queue which, most of * the time, will process only a handful of small messages. However, it is @@ -90,7 +91,7 @@ typedef struct FixedParallelState int ParallelWorkerNumber = -1; /* Is there a parallel message pending which we need to receive? */ -bool ParallelMessagePending = false; +volatile bool ParallelMessagePending = false; /* Are we initializing a parallel worker? */ bool InitializingParallelWorker = false; @@ -102,11 +103,12 @@ static FixedParallelState *MyFixedParallelState; static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); /* Private functions. */ -static void HandleParallelMessage(ParallelContext *, int, StringInfo msg); +static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg); static void ParallelErrorContext(void *arg); static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); static void ParallelWorkerMain(Datum main_arg); + /* * Establish a new parallel context. This should be done after entering * parallel mode, and (unless there is an error) the context should be @@ -178,8 +180,8 @@ CreateParallelContextForExternalFunction(char *library_name, /* * Establish the dynamic shared memory segment for a parallel context and - * copied state and other bookkeeping information that will need by parallel - * workers into it. + * copy state and other bookkeeping information that will be needed by + * parallel workers into it. */ void InitializeParallelDSM(ParallelContext *pcxt) @@ -231,7 +233,8 @@ InitializeParallelDSM(ParallelContext *pcxt) PARALLEL_ERROR_QUEUE_SIZE, "parallel error queue size not buffer-aligned"); shm_toc_estimate_chunk(&pcxt->estimator, - PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + mul_size(PARALLEL_ERROR_QUEUE_SIZE, + pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Estimate how much we'll need for extension entrypoint info. */ @@ -257,7 +260,7 @@ InitializeParallelDSM(ParallelContext *pcxt) * parallelism than to fail outright. */ segsize = shm_toc_estimate(&pcxt->estimator); - if (pcxt->nworkers != 0) + if (pcxt->nworkers > 0) pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); if (pcxt->seg != NULL) pcxt->toc = shm_toc_create(PARALLEL_MAGIC, @@ -337,7 +340,8 @@ InitializeParallelDSM(ParallelContext *pcxt) */ error_queue_space = shm_toc_allocate(pcxt->toc, - PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + mul_size(PARALLEL_ERROR_QUEUE_SIZE, + pcxt->nworkers)); for (i = 0; i < pcxt->nworkers; ++i) { char *start; @@ -603,17 +607,17 @@ ParallelContextActive(void) /* * Handle receipt of an interrupt indicating a parallel worker message. + * + * Note: this is called within a signal handler! All we can do is set + * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke + * HandleParallelMessages(). */ void HandleParallelMessageInterrupt(void) { - int save_errno = errno; - InterruptPending = true; ParallelMessagePending = true; SetLatch(MyLatch); - - errno = save_errno; } /* @@ -664,11 +668,8 @@ HandleParallelMessages(void) } else ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ - errmsg("lost connection to parallel worker"))); - - /* This might make the error queue go away. */ - CHECK_FOR_INTERRUPTS(); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("lost connection to parallel worker"))); } } } @@ -714,7 +715,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) errctx.previous = pcxt->error_context_stack; error_context_stack = &errctx; - /* Parse ErrorReponse or NoticeResponse. */ + /* Parse ErrorResponse or NoticeResponse. */ pq_parse_errornotice(msg, &edata); /* Death of a worker isn't enough justification for suicide. */ @@ -747,7 +748,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) default: { - elog(ERROR, "unknown message type: %c (%d bytes)", + elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)", msgtype, msg->len); } } @@ -847,7 +848,7 @@ ParallelWorkerMain(Datum main_arg) if (toc == NULL) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("invalid magic number in dynamic shared memory segment"))); + errmsg("invalid magic number in dynamic shared memory segment"))); /* Look up fixed parallel state. */ fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); |