aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/parallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r--src/backend/access/transam/parallel.c41
1 files changed, 21 insertions, 20 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 1e0eb10c860..9324e52fd3c 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,9 +14,9 @@
#include "postgres.h"
+#include "access/parallel.h"
#include "access/xact.h"
#include "access/xlog.h"
-#include "access/parallel.h"
#include "commands/async.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -33,6 +33,7 @@
#include "utils/resowner.h"
#include "utils/snapmgr.h"
+
/*
* We don't want to waste a lot of memory on an error queue which, most of
* the time, will process only a handful of small messages. However, it is
@@ -90,7 +91,7 @@ typedef struct FixedParallelState
int ParallelWorkerNumber = -1;
/* Is there a parallel message pending which we need to receive? */
-bool ParallelMessagePending = false;
+volatile bool ParallelMessagePending = false;
/* Are we initializing a parallel worker? */
bool InitializingParallelWorker = false;
@@ -102,11 +103,12 @@ static FixedParallelState *MyFixedParallelState;
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
/* Private functions. */
-static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
+static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void ParallelErrorContext(void *arg);
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
static void ParallelWorkerMain(Datum main_arg);
+
/*
* Establish a new parallel context. This should be done after entering
* parallel mode, and (unless there is an error) the context should be
@@ -178,8 +180,8 @@ CreateParallelContextForExternalFunction(char *library_name,
/*
* Establish the dynamic shared memory segment for a parallel context and
- * copied state and other bookkeeping information that will need by parallel
- * workers into it.
+ * copy state and other bookkeeping information that will be needed by
+ * parallel workers into it.
*/
void
InitializeParallelDSM(ParallelContext *pcxt)
@@ -231,7 +233,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
PARALLEL_ERROR_QUEUE_SIZE,
"parallel error queue size not buffer-aligned");
shm_toc_estimate_chunk(&pcxt->estimator,
- PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+ mul_size(PARALLEL_ERROR_QUEUE_SIZE,
+ pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate how much we'll need for extension entrypoint info. */
@@ -257,7 +260,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
* parallelism than to fail outright.
*/
segsize = shm_toc_estimate(&pcxt->estimator);
- if (pcxt->nworkers != 0)
+ if (pcxt->nworkers > 0)
pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
if (pcxt->seg != NULL)
pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
@@ -337,7 +340,8 @@ InitializeParallelDSM(ParallelContext *pcxt)
*/
error_queue_space =
shm_toc_allocate(pcxt->toc,
- PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+ mul_size(PARALLEL_ERROR_QUEUE_SIZE,
+ pcxt->nworkers));
for (i = 0; i < pcxt->nworkers; ++i)
{
char *start;
@@ -603,17 +607,17 @@ ParallelContextActive(void)
/*
* Handle receipt of an interrupt indicating a parallel worker message.
+ *
+ * Note: this is called within a signal handler! All we can do is set
+ * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
+ * HandleParallelMessages().
*/
void
HandleParallelMessageInterrupt(void)
{
- int save_errno = errno;
-
InterruptPending = true;
ParallelMessagePending = true;
SetLatch(MyLatch);
-
- errno = save_errno;
}
/*
@@ -664,11 +668,8 @@ HandleParallelMessages(void)
}
else
ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
- errmsg("lost connection to parallel worker")));
-
- /* This might make the error queue go away. */
- CHECK_FOR_INTERRUPTS();
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("lost connection to parallel worker")));
}
}
}
@@ -714,7 +715,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
errctx.previous = pcxt->error_context_stack;
error_context_stack = &errctx;
- /* Parse ErrorReponse or NoticeResponse. */
+ /* Parse ErrorResponse or NoticeResponse. */
pq_parse_errornotice(msg, &edata);
/* Death of a worker isn't enough justification for suicide. */
@@ -747,7 +748,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
default:
{
- elog(ERROR, "unknown message type: %c (%d bytes)",
+ elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
msgtype, msg->len);
}
}
@@ -847,7 +848,7 @@ ParallelWorkerMain(Datum main_arg)
if (toc == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("invalid magic number in dynamic shared memory segment")));
+ errmsg("invalid magic number in dynamic shared memory segment")));
/* Look up fixed parallel state. */
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);