aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2016-12-19 16:47:15 -0500
committerRobert Haas <rhaas@postgresql.org>2016-12-19 17:11:46 -0500
commite13029a5ce353574516c64fd1ec9c50201e705fd (patch)
tree529cb354f9f3a41fe0678d2733cf303ed62030df /src/backend/executor/execParallel.c
parent2604438472c897fbbd1568b1a8ee177ba8cdb6e3 (diff)
downloadpostgresql-e13029a5ce353574516c64fd1ec9c50201e705fd.tar.gz
postgresql-e13029a5ce353574516c64fd1ec9c50201e705fd.zip
Provide a DSA area for all parallel queries.
This will allow future parallel query code to dynamically allocate storage shared by all participants. Thomas Munro, with assorted changes by me.
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c51
1 files changed, 50 insertions, 1 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f9c85989d82..8a6f844e352 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -34,6 +34,7 @@
#include "optimizer/planner.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
+#include "utils/dsa.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
@@ -47,6 +48,7 @@
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003)
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -345,6 +347,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
int param_len;
int instrumentation_len = 0;
int instrument_offset = 0;
+ Size dsa_minsize = dsa_minimum_size();
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -413,6 +416,10 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
+ /* Estimate space for DSA area. */
+ shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Everyone's had a chance to ask for space, so now create the DSM. */
InitializeParallelDSM(pcxt);
@@ -467,6 +474,29 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
}
/*
+ * Create a DSA area that can be used by the leader and all workers.
+ * (However, if we failed to create a DSM and are using private memory
+ * instead, then skip this.)
+ */
+ if (pcxt->seg != NULL)
+ {
+ char *area_space;
+
+ area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
+ pei->area = dsa_create_in_place(area_space, dsa_minsize,
+ LWTRANCHE_PARALLEL_QUERY_DSA,
+ "parallel_query_dsa",
+ pcxt->seg);
+ }
+
+ /*
+ * Make the area available to executor nodes running in the leader. See
+ * also ParallelQueryMain which makes it available to workers.
+ */
+ estate->es_query_dsa = pei->area;
+
+ /*
* Give parallel-aware nodes a chance to initialize their shared data.
* This also initializes the elements of instrumentation->ps_instrument,
* if it exists.
@@ -571,6 +601,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
void
ExecParallelCleanup(ParallelExecutorInfo *pei)
{
+ if (pei->area != NULL)
+ {
+ dsa_detach(pei->area);
+ pei->area = NULL;
+ }
if (pei->pcxt != NULL)
{
DestroyParallelContext(pei->pcxt);
@@ -728,6 +763,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
QueryDesc *queryDesc;
SharedExecutorInstrumentation *instrumentation;
int instrument_options = 0;
+ void *area_space;
+ dsa_area *area;
/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
receiver = ExecParallelGetReceiver(seg, toc);
@@ -739,10 +776,21 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Prepare to track buffer usage during query execution. */
InstrStartParallelQuery();
- /* Start up the executor, have it run the plan, and then shut it down. */
+ /* Attach to the dynamic shared memory area. */
+ area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA);
+ area = dsa_attach_in_place(area_space, seg);
+
+ /* Start up the executor */
ExecutorStart(queryDesc, 0);
+
+ /* Special executor initialization steps for parallel workers */
+ queryDesc->planstate->state->es_query_dsa = area;
ExecParallelInitializeWorker(queryDesc->planstate, toc);
+
+ /* Run the plan */
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+
+ /* Shut down the executor */
ExecutorFinish(queryDesc);
/* Report buffer usage during parallel execution. */
@@ -758,6 +806,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ExecutorEnd(queryDesc);
/* Cleanup. */
+ dsa_detach(area);
FreeQueryDesc(queryDesc);
(*receiver->rDestroy) (receiver);
}