diff options
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r-- | src/backend/executor/nodeAgg.c | 173 |
1 files changed, 78 insertions, 95 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index a4479646129..b3187e66681 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -10,51 +10,33 @@ * transvalue = transfunc(transvalue, input_value(s)) * result = finalfunc(transvalue, direct_argument(s)) * - * If a finalfunc is not supplied or finalizeAggs is false, then the result - * is just the ending value of transvalue. - * - * Other behavior is also supported and is controlled by the 'combineStates' - * and 'finalizeAggs'. 'combineStates' controls whether the trans func or - * the combine func is used during aggregation. When 'combineStates' is - * true we expect other (previously) aggregated states as input rather than - * input tuples. This mode facilitates multiple aggregate stages which - * allows us to support pushing aggregation down deeper into the plan rather - * than leaving it for the final stage. For example with a query such as: - * - * SELECT count(*) FROM (SELECT * FROM a UNION ALL SELECT * FROM b); - * - * with this functionality the planner has the flexibility to generate a - * plan which performs count(*) on table a and table b separately and then - * add a combine phase to combine both results. In this case the combine - * function would simply add both counts together. - * - * When multiple aggregate stages exist the planner should have set the - * 'finalizeAggs' to true only for the final aggregtion state, and each - * stage, apart from the very first one should have 'combineStates' set to - * true. This permits plans such as: - * - * Finalize Aggregate - * -> Partial Aggregate - * -> Partial Aggregate - * - * Combine functions which use pass-by-ref states should be careful to - * always update the 1st state parameter by adding the 2nd parameter to it, - * rather than the other way around. If the 1st state is NULL, then it's not - * sufficient to simply return the 2nd state, as the memory context is - * incorrect. Instead a new state should be created in the correct aggregate - * memory context and the 2nd state should be copied over. - * - * The 'serialStates' option can be used to allow multi-stage aggregation - * for aggregates with an INTERNAL state type. When this mode is disabled - * only a pointer to the INTERNAL aggregate states are passed around the - * executor. When enabled, INTERNAL states are serialized and deserialized - * as required; this is useful when data must be passed between processes. + * If a finalfunc is not supplied then the result is just the ending + * value of transvalue. + * + * Other behaviors can be selected by the "aggsplit" mode, which exists + * to support partial aggregation. It is possible to: + * * Skip running the finalfunc, so that the output is always the + * final transvalue state. + * * Substitute the combinefunc for the transfunc, so that transvalue + * states (propagated up from a child partial-aggregation step) are merged + * rather than processing raw input rows. (The statements below about + * the transfunc apply equally to the combinefunc, when it's selected.) + * * Apply the serializefunc to the output values (this only makes sense + * when skipping the finalfunc, since the serializefunc works on the + * transvalue data type). + * * Apply the deserializefunc to the input values (this only makes sense + * when using the combinefunc, for similar reasons). + * It is the planner's responsibility to connect up Agg nodes using these + * alternate behaviors in a way that makes sense, with partial aggregation + * results being fed to nodes that expect them. * * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the * input tuples and eliminate duplicates (if required) before performing * the above-depicted process. (However, we don't do that for ordered-set * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments - * so far as this module is concerned.) + * so far as this module is concerned.) Note that partial aggregation + * is not supported in these cases, since we couldn't ensure global + * ordering or distinctness of the inputs. * * If transfunc is marked "strict" in pg_proc and initcond is NULL, * then the first non-NULL input_value is assigned directly to transvalue, @@ -862,8 +844,6 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) int numGroupingSets = Max(aggstate->phase->numsets, 1); int numTrans = aggstate->numtrans; - Assert(!aggstate->combineStates); - for (transno = 0; transno < numTrans; transno++) { AggStatePerTrans pertrans = &aggstate->pertrans[transno]; @@ -948,9 +928,11 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) } /* - * combine_aggregates is used when running in 'combineState' mode. This - * advances each aggregate transition state by adding another transition state - * to it. + * combine_aggregates replaces advance_aggregates in DO_AGGSPLIT_COMBINE + * mode. The principal difference is that here we may need to apply the + * deserialization function before running the transfn (which, in this mode, + * is actually the aggregate's combinefn). Also, we know we don't need to + * handle FILTER, DISTINCT, ORDER BY, or grouping sets. */ static void combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup) @@ -960,14 +942,13 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup) /* combine not supported with grouping sets */ Assert(aggstate->phase->numsets == 0); - Assert(aggstate->combineStates); for (transno = 0; transno < numTrans; transno++) { AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + AggStatePerGroup pergroupstate = &pergroup[transno]; TupleTableSlot *slot; FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; - AggStatePerGroup pergroupstate = &pergroup[transno]; /* Evaluate the current input expressions for this aggregate */ slot = ExecProject(pertrans->evalproj, NULL); @@ -979,15 +960,12 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup) */ if (OidIsValid(pertrans->deserialfn_oid)) { - /* - * Don't call a strict deserialization function with NULL input. A - * strict deserialization function and a null value means we skip - * calling the combine function for this state. We assume that - * this would be a waste of time and effort anyway so just skip - * it. - */ + /* Don't call a strict deserialization function with NULL input */ if (pertrans->deserialfn.fn_strict && slot->tts_isnull[0]) - continue; + { + fcinfo->arg[1] = slot->tts_values[0]; + fcinfo->argnull[1] = slot->tts_isnull[0]; + } else { FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo; @@ -1110,7 +1088,6 @@ advance_combine_function(AggState *aggstate, pergroupstate->transValueIsNull = fcinfo->isnull; MemoryContextSwitchTo(oldContext); - } @@ -1415,7 +1392,7 @@ finalize_aggregate(AggState *aggstate, } /* - * Compute the final value of one partial aggregate. + * Compute the output value of one partial aggregate. * * The serialization function will be run, and the result delivered, in the * output-tuple context; caller's CurrentMemoryContext does not matter. @@ -1432,8 +1409,8 @@ finalize_partialaggregate(AggState *aggstate, oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); /* - * serialfn_oid will be set if we must serialize the input state before - * calling the combine function on the state. + * serialfn_oid will be set if we must serialize the transvalue before + * returning it */ if (OidIsValid(pertrans->serialfn_oid)) { @@ -1577,12 +1554,12 @@ finalize_aggregates(AggState *aggstate, pergroupstate); } - if (aggstate->finalizeAggs) - finalize_aggregate(aggstate, peragg, pergroupstate, - &aggvalues[aggno], &aggnulls[aggno]); - else + if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) finalize_partialaggregate(aggstate, peragg, pergroupstate, &aggvalues[aggno], &aggnulls[aggno]); + else + finalize_aggregate(aggstate, peragg, pergroupstate, + &aggvalues[aggno], &aggnulls[aggno]); } } @@ -2114,10 +2091,10 @@ agg_retrieve_direct(AggState *aggstate) */ for (;;) { - if (!aggstate->combineStates) - advance_aggregates(aggstate, pergroup); - else + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) combine_aggregates(aggstate, pergroup); + else + advance_aggregates(aggstate, pergroup); /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); @@ -2225,10 +2202,10 @@ agg_fill_hash_table(AggState *aggstate) entry = lookup_hash_entry(aggstate, outerslot); /* Advance the aggregates */ - if (!aggstate->combineStates) - advance_aggregates(aggstate, entry->pergroup); - else + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) combine_aggregates(aggstate, entry->pergroup); + else + advance_aggregates(aggstate, entry->pergroup); /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); @@ -2352,6 +2329,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->aggs = NIL; aggstate->numaggs = 0; aggstate->numtrans = 0; + aggstate->aggsplit = node->aggsplit; aggstate->maxsets = 0; aggstate->hashfunctions = NULL; aggstate->projected_set = -1; @@ -2359,11 +2337,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->peragg = NULL; aggstate->pertrans = NULL; aggstate->curpertrans = NULL; - aggstate->agg_done = false; - aggstate->combineStates = node->combineStates; - aggstate->finalizeAggs = node->finalizeAggs; - aggstate->serialStates = node->serialStates; aggstate->input_done = false; + aggstate->agg_done = false; aggstate->pergroup = NULL; aggstate->grp_firstTuple = NULL; aggstate->hashtable = NULL; @@ -2681,6 +2656,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); + /* ... and the split mode should match */ + Assert(aggref->aggsplit == aggstate->aggsplit); /* 1. Check for already processed aggs which can be re-used */ existing_aggno = find_compatible_peragg(aggref, aggstate, aggno, @@ -2724,7 +2701,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * If this aggregation is performing state combines, then instead of * using the transition function, we'll use the combine function */ - if (aggstate->combineStates) + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) { transfn_oid = aggform->aggcombinefn; @@ -2736,39 +2713,45 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) transfn_oid = aggform->aggtransfn; /* Final function only required if we're finalizing the aggregates */ - if (aggstate->finalizeAggs) - peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - else + if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) peragg->finalfn_oid = finalfn_oid = InvalidOid; + else + peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; serialfn_oid = InvalidOid; deserialfn_oid = InvalidOid; /* - * Determine if we require serialization or deserialization of the - * aggregate states. This is only required if the aggregate state is - * internal. + * Check if serialization/deserialization is required. We only do it + * for aggregates that have transtype INTERNAL. */ - if (aggstate->serialStates && aggtranstype == INTERNALOID) + if (aggtranstype == INTERNALOID) { /* - * The planner should only have generated an agg node with - * serialStates if every aggregate with an INTERNAL state has - * serialization/deserialization functions. Verify that. + * The planner should only have generated a serialize agg node if + * every aggregate with an INTERNAL state has a serialization + * function. Verify that. */ - if (!OidIsValid(aggform->aggserialfn)) - elog(ERROR, "serialfunc not set during serialStates aggregation step"); - - if (!OidIsValid(aggform->aggdeserialfn)) - elog(ERROR, "deserialfunc not set during serialStates aggregation step"); + if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit)) + { + /* serialization only valid when not running finalfn */ + Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); - /* serialization func only required when not finalizing aggs */ - if (!aggstate->finalizeAggs) + if (!OidIsValid(aggform->aggserialfn)) + elog(ERROR, "serialfunc not provided for serialization aggregation"); serialfn_oid = aggform->aggserialfn; + } + + /* Likewise for deserialization functions */ + if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit)) + { + /* deserialization only valid when combining states */ + Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit)); - /* deserialization func only required when combining states */ - if (aggstate->combineStates) + if (!OidIsValid(aggform->aggdeserialfn)) + elog(ERROR, "deserialfunc not provided for deserialization aggregation"); deserialfn_oid = aggform->aggdeserialfn; + } } /* Check that aggregate owner has permission to call component fns */ @@ -2853,7 +2836,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } /* get info about the output value's datatype */ - get_typlenbyval(aggref->aggoutputtype, + get_typlenbyval(aggref->aggtype, &peragg->resulttypeLen, &peragg->resulttypeByVal); @@ -2972,7 +2955,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans, * transfn and transfn_oid fields of pertrans refer to the combine * function rather than the transition function. */ - if (aggstate->combineStates) + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) { Expr *combinefnexpr; |