diff options
author | Robert Haas <rhaas@postgresql.org> | 2016-12-19 16:47:15 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2016-12-19 17:11:46 -0500 |
commit | e13029a5ce353574516c64fd1ec9c50201e705fd (patch) | |
tree | 529cb354f9f3a41fe0678d2733cf303ed62030df /src | |
parent | 2604438472c897fbbd1568b1a8ee177ba8cdb6e3 (diff) | |
download | postgresql-e13029a5ce353574516c64fd1ec9c50201e705fd.tar.gz postgresql-e13029a5ce353574516c64fd1ec9c50201e705fd.zip |
Provide a DSA area for all parallel queries.
This will allow future parallel query code to dynamically allocate
storage shared by all participants.
Thomas Munro, with assorted changes by me.
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/executor/execParallel.c | 51 | ||||
-rw-r--r-- | src/include/executor/execParallel.h | 2 | ||||
-rw-r--r-- | src/include/nodes/execnodes.h | 3 | ||||
-rw-r--r-- | src/include/storage/lwlock.h | 1 |
4 files changed, 56 insertions, 1 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index f9c85989d82..8a6f844e352 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -34,6 +34,7 @@ #include "optimizer/planner.h" #include "storage/spin.h" #include "tcop/tcopprot.h" +#include "utils/dsa.h" #include "utils/memutils.h" #include "utils/snapmgr.h" @@ -47,6 +48,7 @@ #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003) #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004) #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005) +#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -345,6 +347,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) int param_len; int instrumentation_len = 0; int instrument_offset = 0; + Size dsa_minsize = dsa_minimum_size(); /* Allocate object for return value. */ pei = palloc0(sizeof(ParallelExecutorInfo)); @@ -413,6 +416,10 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) shm_toc_estimate_keys(&pcxt->estimator, 1); } + /* Estimate space for DSA area. */ + shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Everyone's had a chance to ask for space, so now create the DSM. */ InitializeParallelDSM(pcxt); @@ -467,6 +474,29 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) } /* + * Create a DSA area that can be used by the leader and all workers. + * (However, if we failed to create a DSM and are using private memory + * instead, then skip this.) + */ + if (pcxt->seg != NULL) + { + char *area_space; + + area_space = shm_toc_allocate(pcxt->toc, dsa_minsize); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space); + pei->area = dsa_create_in_place(area_space, dsa_minsize, + LWTRANCHE_PARALLEL_QUERY_DSA, + "parallel_query_dsa", + pcxt->seg); + } + + /* + * Make the area available to executor nodes running in the leader. See + * also ParallelQueryMain which makes it available to workers. + */ + estate->es_query_dsa = pei->area; + + /* * Give parallel-aware nodes a chance to initialize their shared data. * This also initializes the elements of instrumentation->ps_instrument, * if it exists. @@ -571,6 +601,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei) void ExecParallelCleanup(ParallelExecutorInfo *pei) { + if (pei->area != NULL) + { + dsa_detach(pei->area); + pei->area = NULL; + } if (pei->pcxt != NULL) { DestroyParallelContext(pei->pcxt); @@ -728,6 +763,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; int instrument_options = 0; + void *area_space; + dsa_area *area; /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ receiver = ExecParallelGetReceiver(seg, toc); @@ -739,10 +776,21 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Prepare to track buffer usage during query execution. */ InstrStartParallelQuery(); - /* Start up the executor, have it run the plan, and then shut it down. */ + /* Attach to the dynamic shared memory area. */ + area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA); + area = dsa_attach_in_place(area_space, seg); + + /* Start up the executor */ ExecutorStart(queryDesc, 0); + + /* Special executor initialization steps for parallel workers */ + queryDesc->planstate->state->es_query_dsa = area; ExecParallelInitializeWorker(queryDesc->planstate, toc); + + /* Run the plan */ ExecutorRun(queryDesc, ForwardScanDirection, 0L); + + /* Shut down the executor */ ExecutorFinish(queryDesc); /* Report buffer usage during parallel execution. */ @@ -758,6 +806,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecutorEnd(queryDesc); /* Cleanup. */ + dsa_detach(area); FreeQueryDesc(queryDesc); (*receiver->rDestroy) (receiver); } diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index f4c6d37a119..4bbee691a7f 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -17,6 +17,7 @@ #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "nodes/plannodes.h" +#include "utils/dsa.h" typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation; @@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo BufferUsage *buffer_usage; SharedExecutorInstrumentation *instrumentation; shm_mq_handle **tqueue; + dsa_area *area; bool finished; } ParallelExecutorInfo; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 703604ab9d7..5c3b8683f5b 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -427,6 +427,9 @@ typedef struct EState HeapTuple *es_epqTuple; /* array of EPQ substitute tuples */ bool *es_epqTupleSet; /* true if EPQ tuple is provided */ bool *es_epqScanDone; /* true if EPQ tuple has been fetched */ + + /* The per-query shared memory area to use for parallel execution. */ + struct dsa_area *es_query_dsa; } EState; diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index db1c687e21e..3ca4db0a723 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -210,6 +210,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_BUFFER_MAPPING, LWTRANCHE_LOCK_MANAGER, LWTRANCHE_PREDICATE_LOCK_MANAGER, + LWTRANCHE_PARALLEL_QUERY_DSA, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; |