From 3452dc5240da43e833118484e1e9b4894d04431c Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Tue, 29 Aug 2017 13:12:23 -0400
Subject: Push tuple limits through Gather and Gather Merge.

If we only need, say, 10 tuples in total, then we certainly don't need
more than 10 tuples from any single process.  Pushing down the limit
lets workers exit early when possible.  For Gather Merge, there is an
additional benefit: a Sort immediately below the Gather Merge can be
done as a bounded sort if there is an applicable limit.

Robert Haas and Tom Lane

Discussion: http://postgr.es/m/CA+TgmoYa3QKKrLj5rX7UvGqhH73G1Li4B-EKxrmASaca2tFu9Q@mail.gmail.com
---
 src/backend/executor/execParallel.c | 54 ++++++++++++++++++++++++++++++-------
 1 file changed, 44 insertions(+), 10 deletions(-)

(limited to 'src/backend/executor/execParallel.c')

diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d4a8b..ad9eba63dd3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,16 +47,25 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS			UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED		UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS			UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
+/*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
 /*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+					 int64 tuples_needed)
 {
 	ParallelExecutorInfo *pei;
 	ParallelContext *pcxt;
 	ExecParallelEstimateContext e;
 	ExecParallelInitializeDSMContext d;
+	FixedParallelExecutorState *fpes;
 	char	   *pstmt_data;
 	char	   *pstmt_space;
 	char	   *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * for the various things we need to store.
 	 */
 
+	/* Estimate space for fixed-size state. */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   sizeof(FixedParallelExecutorState));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for query text. */
 	query_len = strlen(estate->es_sourceText);
 	shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * asked for has been allocated or initialized yet, though, so do that.
 	 */
 
+	/* Store fixed-size state. */
+	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+	fpes->tuples_needed = tuples_needed;
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
 	/* Store query string */
 	query_string = shm_toc_allocate(pcxt->toc, query_len);
 	memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 
+	/* Get fixed-size state. */
+	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
 	receiver = ExecParallelGetReceiver(seg, toc);
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	queryDesc->planstate->state->es_query_dsa = area;
 	ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
-	/* Run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+	/* Pass down any tuple bound */
+	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+	/*
+	 * Run the plan.  If we specified a tuple bound, be careful not to demand
+	 * more tuples than that.
+	 */
+	ExecutorRun(queryDesc,
+				ForwardScanDirection,
+				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+				true);
 
 	/* Shut down the executor */
 	ExecutorFinish(queryDesc);
--
cgit v1.2.3
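
Note on the ExecutorRun change: a negative tuples_needed is mapped to a
count of 0 because, in the executor, a count of 0 means "fetch all
tuples", while a negative bound means "no limit".  A minimal,
self-contained C sketch of that convention and of the early-exit payoff
(toy code with invented names, not PostgreSQL internals):

#include <stdint.h>
#include <stdio.h>

/*
 * Toy model of the bound convention above: a negative bound means "no
 * limit known" and is mapped to 0, which ExecutorRun reads as "fetch
 * all tuples".
 */
static int64_t
clamp_bound(int64_t tuples_needed)
{
	return tuples_needed < 0 ? 0 : tuples_needed;
}

/*
 * Simulate one worker that could scan `available` tuples but exits as
 * soon as it has produced `bound` of them (0 = unbounded).
 */
static int64_t
run_worker(int64_t available, int64_t bound)
{
	int64_t		produced = 0;

	for (int64_t i = 0; i < available; i++)
	{
		produced++;				/* "emit" one tuple */
		if (bound != 0 && produced >= bound)
			break;				/* bound satisfied: stop scanning */
	}
	return produced;
}

int
main(void)
{
	const int	nworkers = 4;
	const int64_t scan_size = 1000000;	/* tuples each worker could scan */
	int64_t		tuples_needed = 10; /* as if for LIMIT 10 */
	int64_t		bound = clamp_bound(tuples_needed);
	int64_t		total = 0;

	for (int i = 0; i < nworkers; i++)
		total += run_worker(scan_size, bound);

	/* at most nworkers * tuples_needed tuples of work, not 4 million */
	printf("tuples produced across workers: %lld\n", (long long) total);
	return 0;
}

With the bound pushed down, the four simulated workers produce 40
tuples between them instead of 4 million.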
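
The fixed-size struct travels through PostgreSQL's shm_toc machinery:
estimate space for the chunk, allocate it in the DSM segment, insert it
under a well-known key, and have each worker look it up by that key.
The following toy table of contents (an invented in-process stand-in,
not the real shm_toc API, which manages a shared-memory segment)
sketches the same allocate/insert/lookup flow:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Toy "table of contents": 64-bit keys mapped to chunks inside one
 * flat buffer.  This is NOT the real shm_toc API.
 */
#define TOC_MAX_ENTRIES 8
#define TOC_BUF_SIZE	1024

typedef struct
{
	uint64_t	key;
	size_t		offset;
} toc_entry;

typedef struct
{
	toc_entry	entries[TOC_MAX_ENTRIES];
	int			nentries;
	size_t		used;
	char		buf[TOC_BUF_SIZE];
} toy_toc;

static void *
toc_allocate(toy_toc *toc, size_t size)
{
	void	   *chunk;

	assert(toc->used + size <= TOC_BUF_SIZE);
	chunk = toc->buf + toc->used;
	toc->used += size;
	return chunk;
}

static void
toc_insert(toy_toc *toc, uint64_t key, void *chunk)
{
	assert(toc->nentries < TOC_MAX_ENTRIES);
	toc->entries[toc->nentries].key = key;
	toc->entries[toc->nentries].offset = (size_t) ((char *) chunk - toc->buf);
	toc->nentries++;
}

static void *
toc_lookup(toy_toc *toc, uint64_t key)
{
	for (int i = 0; i < toc->nentries; i++)
		if (toc->entries[i].key == key)
			return toc->buf + toc->entries[i].offset;
	return NULL;			/* the real shm_toc_lookup can error instead */
}

/* Fixed-size state to ship to workers, as in the patch. */
typedef struct
{
	int64_t		tuples_needed;
} FixedState;

int
main(void)
{
	toy_toc		toc = {0};
	FixedState *fpes;
	FixedState *found;

	/* "Leader": allocate the chunk, fill it in, register it by key. */
	fpes = toc_allocate(&toc, sizeof(FixedState));
	fpes->tuples_needed = 10;
	toc_insert(&toc, 0xE000000000000001ULL, fpes);

	/* "Worker": find the same chunk via the well-known key. */
	found = toc_lookup(&toc, 0xE000000000000001ULL);
	printf("tuples_needed seen by worker: %lld\n",
		   (long long) found->tuples_needed);
	return 0;
}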
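
Finally, the Gather Merge benefit: once ExecSetTupleBound tells a Sort
node that only N tuples are needed, it can keep just the top N instead
of sorting its whole input (PostgreSQL's tuplesort does this with a
heap).  A self-contained toy version of the idea, using simple
insertion into a sorted prefix rather than a heap:

#include <stdio.h>

/*
 * Toy bounded sort: maintain the `bound` smallest values seen so far
 * in sorted order, discarding everything else -- the idea behind doing
 * a bounded sort under a LIMIT, rather than sorting the full input.
 */
static void
bounded_sort(const int *input, int n, int *out, int bound)
{
	int			kept = 0;

	for (int i = 0; i < n; i++)
	{
		int			v = input[i];
		int			pos;

		if (kept == bound && v >= out[kept - 1])
			continue;			/* can't make the top `bound`; skip */

		/* Insert v into the sorted prefix, dropping the largest if full. */
		pos = (kept < bound) ? kept++ : bound - 1;
		while (pos > 0 && out[pos - 1] > v)
		{
			out[pos] = out[pos - 1];
			pos--;
		}
		out[pos] = v;
	}
}

int
main(void)
{
	int			input[] = {42, 7, 99, 3, 58, 21, 64, 5, 88, 13};
	int			n = sizeof(input) / sizeof(input[0]);
	int			top3[3];

	bounded_sort(input, n, top3, 3);
	for (int i = 0; i < 3; i++)
		printf("%d\n", top3[i]);	/* prints 3, 5, 7 */
	return 0;
}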