aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c18
1 files changed, 11 insertions, 7 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 6df62a7dccb..f03cd9b07b3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -287,7 +287,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
if (!reinitialize)
tqueuespace =
shm_toc_allocate(pcxt->toc,
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
+ pcxt->nworkers));
else
tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);
@@ -296,7 +297,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
shm_mq *mq;
- mq = shm_mq_create(tqueuespace + i * PARALLEL_TUPLE_QUEUE_SIZE,
+ mq = shm_mq_create(tqueuespace +
+ ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
(Size) PARALLEL_TUPLE_QUEUE_SIZE);
shm_mq_set_receiver(mq, MyProc);
@@ -380,12 +382,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* looking at pgBufferUsage, so do it unconditionally.
*/
shm_toc_estimate_chunk(&pcxt->estimator,
- sizeof(BufferUsage) * pcxt->nworkers);
+ mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk(&pcxt->estimator,
- PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+ mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
/*
@@ -404,7 +406,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
sizeof(int) * e.nnodes;
instrumentation_len = MAXALIGN(instrumentation_len);
instrument_offset = instrumentation_len;
- instrumentation_len += sizeof(Instrumentation) * e.nnodes * nworkers;
+ instrumentation_len +=
+ mul_size(sizeof(Instrumentation),
+ mul_size(e.nnodes, nworkers));
shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
@@ -432,7 +436,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
/* Allocate space for each worker's BufferUsage; no need to initialize. */
bufusage_space = shm_toc_allocate(pcxt->toc,
- sizeof(BufferUsage) * pcxt->nworkers);
+ mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
pei->buffer_usage = bufusage_space;
@@ -511,7 +515,7 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
InstrAggNode(planstate->instrument, &instrument[n]);
/* Also store the per-worker detail. */
- ibytes = instrumentation->num_workers * sizeof(Instrumentation);
+ ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
planstate->worker_instrument =
palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
planstate->worker_instrument->num_workers = instrumentation->num_workers;