diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2016-08-02 16:09:09 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2016-08-02 16:09:09 -0400 |
commit | 93ac14efb465d3160a77b5f75dad8e4721cee41a (patch) | |
tree | 2369d99588738b1171557163ea4fdefbbe206246 /src/backend/access/transam/parallel.c | |
parent | 89c30d1133be5ba4da6098da2ee12114e527f03b (diff) | |
download | postgresql-93ac14efb465d3160a77b5f75dad8e4721cee41a.tar.gz postgresql-93ac14efb465d3160a77b5f75dad8e4721cee41a.zip |
Sync 9.5 version of access/transam/parallel.c with HEAD.
This back-patches commit a5fe473ad (notably, marking ParallelMessagePending
as volatile, which is not particularly optional). I also back-patched some
previous cosmetic changes to remove unnecessary diffs between the two
branches. I'm unsure how much of this code is actually reachable in 9.5,
but to the extent that it is reachable, it needs to be maintained, and
minimizing cross-branch diffs will make that easier.
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- | src/backend/access/transam/parallel.c | 41 |
1 files changed, 21 insertions, 20 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 1e0eb10c860..9324e52fd3c 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -14,9 +14,9 @@ #include "postgres.h" +#include "access/parallel.h" #include "access/xact.h" #include "access/xlog.h" -#include "access/parallel.h" #include "commands/async.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -33,6 +33,7 @@ #include "utils/resowner.h" #include "utils/snapmgr.h" + /* * We don't want to waste a lot of memory on an error queue which, most of * the time, will process only a handful of small messages. However, it is @@ -90,7 +91,7 @@ typedef struct FixedParallelState int ParallelWorkerNumber = -1; /* Is there a parallel message pending which we need to receive? */ -bool ParallelMessagePending = false; +volatile bool ParallelMessagePending = false; /* Are we initializing a parallel worker? */ bool InitializingParallelWorker = false; @@ -102,11 +103,12 @@ static FixedParallelState *MyFixedParallelState; static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); /* Private functions. */ -static void HandleParallelMessage(ParallelContext *, int, StringInfo msg); +static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg); static void ParallelErrorContext(void *arg); static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); static void ParallelWorkerMain(Datum main_arg); + /* * Establish a new parallel context. This should be done after entering * parallel mode, and (unless there is an error) the context should be @@ -178,8 +180,8 @@ CreateParallelContextForExternalFunction(char *library_name, /* * Establish the dynamic shared memory segment for a parallel context and - * copied state and other bookkeeping information that will need by parallel - * workers into it. + * copy state and other bookkeeping information that will be needed by + * parallel workers into it. */ void InitializeParallelDSM(ParallelContext *pcxt) @@ -231,7 +233,8 @@ InitializeParallelDSM(ParallelContext *pcxt) PARALLEL_ERROR_QUEUE_SIZE, "parallel error queue size not buffer-aligned"); shm_toc_estimate_chunk(&pcxt->estimator, - PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + mul_size(PARALLEL_ERROR_QUEUE_SIZE, + pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); /* Estimate how much we'll need for extension entrypoint info. */ @@ -257,7 +260,7 @@ InitializeParallelDSM(ParallelContext *pcxt) * parallelism than to fail outright. */ segsize = shm_toc_estimate(&pcxt->estimator); - if (pcxt->nworkers != 0) + if (pcxt->nworkers > 0) pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); if (pcxt->seg != NULL) pcxt->toc = shm_toc_create(PARALLEL_MAGIC, @@ -337,7 +340,8 @@ InitializeParallelDSM(ParallelContext *pcxt) */ error_queue_space = shm_toc_allocate(pcxt->toc, - PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + mul_size(PARALLEL_ERROR_QUEUE_SIZE, + pcxt->nworkers)); for (i = 0; i < pcxt->nworkers; ++i) { char *start; @@ -603,17 +607,17 @@ ParallelContextActive(void) /* * Handle receipt of an interrupt indicating a parallel worker message. + * + * Note: this is called within a signal handler! All we can do is set + * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke + * HandleParallelMessages(). */ void HandleParallelMessageInterrupt(void) { - int save_errno = errno; - InterruptPending = true; ParallelMessagePending = true; SetLatch(MyLatch); - - errno = save_errno; } /* @@ -664,11 +668,8 @@ HandleParallelMessages(void) } else ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ - errmsg("lost connection to parallel worker"))); - - /* This might make the error queue go away. */ - CHECK_FOR_INTERRUPTS(); + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("lost connection to parallel worker"))); } } } @@ -714,7 +715,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) errctx.previous = pcxt->error_context_stack; error_context_stack = &errctx; - /* Parse ErrorReponse or NoticeResponse. */ + /* Parse ErrorResponse or NoticeResponse. */ pq_parse_errornotice(msg, &edata); /* Death of a worker isn't enough justification for suicide. */ @@ -747,7 +748,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) default: { - elog(ERROR, "unknown message type: %c (%d bytes)", + elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)", msgtype, msg->len); } } @@ -847,7 +848,7 @@ ParallelWorkerMain(Datum main_arg) if (toc == NULL) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("invalid magic number in dynamic shared memory segment"))); + errmsg("invalid magic number in dynamic shared memory segment"))); /* Look up fixed parallel state. */ fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); |