about | summary | refs | log | tree | commit | diff
path: root/src/backend/executor/nodeAgg.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r-- src/backend/executor/nodeAgg.c 173
1 files changed, 78 insertions, 95 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index a4479646129..b3187e66681 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -10,51 +10,33 @@
* transvalue = transfunc(transvalue, input_value(s))
* result = finalfunc(transvalue, direct_argument(s))
*
- * If a finalfunc is not supplied or finalizeAggs is false, then the result
- * is just the ending value of transvalue.
- *
- * Other behavior is also supported and is controlled by the 'combineStates'
- * and 'finalizeAggs'. 'combineStates' controls whether the trans func or
- * the combine func is used during aggregation. When 'combineStates' is
- * true we expect other (previously) aggregated states as input rather than
- * input tuples. This mode facilitates multiple aggregate stages which
- * allows us to support pushing aggregation down deeper into the plan rather
- * than leaving it for the final stage. For example with a query such as:
- *
- * SELECT count(*) FROM (SELECT * FROM a UNION ALL SELECT * FROM b);
- *
- * with this functionality the planner has the flexibility to generate a
- * plan which performs count(*) on table a and table b separately and then
- * add a combine phase to combine both results. In this case the combine
- * function would simply add both counts together.
- *
- * When multiple aggregate stages exist the planner should have set the
- * 'finalizeAggs' to true only for the final aggregtion state, and each
- * stage, apart from the very first one should have 'combineStates' set to
- * true. This permits plans such as:
- *
- * Finalize Aggregate
- * -> Partial Aggregate
- * -> Partial Aggregate
- *
- * Combine functions which use pass-by-ref states should be careful to
- * always update the 1st state parameter by adding the 2nd parameter to it,
- * rather than the other way around. If the 1st state is NULL, then it's not
- * sufficient to simply return the 2nd state, as the memory context is
- * incorrect. Instead a new state should be created in the correct aggregate
- * memory context and the 2nd state should be copied over.
- *
- * The 'serialStates' option can be used to allow multi-stage aggregation
- * for aggregates with an INTERNAL state type. When this mode is disabled
- * only a pointer to the INTERNAL aggregate states are passed around the
- * executor. When enabled, INTERNAL states are serialized and deserialized
- * as required; this is useful when data must be passed between processes.
+ * If a finalfunc is not supplied then the result is just the ending
+ * value of transvalue.
+ *
+ * Other behaviors can be selected by the "aggsplit" mode, which exists
+ * to support partial aggregation. It is possible to:
+ * * Skip running the finalfunc, so that the output is always the
+ * final transvalue state.
+ * * Substitute the combinefunc for the transfunc, so that transvalue
+ * states (propagated up from a child partial-aggregation step) are merged
+ * rather than processing raw input rows. (The statements below about
+ * the transfunc apply equally to the combinefunc, when it's selected.)
+ * * Apply the serializefunc to the output values (this only makes sense
+ * when skipping the finalfunc, since the serializefunc works on the
+ * transvalue data type).
+ * * Apply the deserializefunc to the input values (this only makes sense
+ * when using the combinefunc, for similar reasons).
+ * It is the planner's responsibility to connect up Agg nodes using these
+ * alternate behaviors in a way that makes sense, with partial aggregation
+ * results being fed to nodes that expect them.
*
* If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
* input tuples and eliminate duplicates (if required) before performing
* the above-depicted process. (However, we don't do that for ordered-set
* aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
- * so far as this module is concerned.)
+ * so far as this module is concerned.) Note that partial aggregation
+ * is not supported in these cases, since we couldn't ensure global
+ * ordering or distinctness of the inputs.
*
* If transfunc is marked "strict" in pg_proc and initcond is NULL,
* then the first non-NULL input_value is assigned directly to transvalue,
@@ -862,8 +844,6 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
int numGroupingSets = Max(aggstate->phase->numsets, 1);
int numTrans = aggstate->numtrans;
- Assert(!aggstate->combineStates);
-
for (transno = 0; transno < numTrans; transno++)
{
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
@@ -948,9 +928,11 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
}
/*
- * combine_aggregates is used when running in 'combineState' mode. This
- * advances each aggregate transition state by adding another transition state
- * to it.
+ * combine_aggregates replaces advance_aggregates in DO_AGGSPLIT_COMBINE
+ * mode. The principal difference is that here we may need to apply the
+ * deserialization function before running the transfn (which, in this mode,
+ * is actually the aggregate's combinefn). Also, we know we don't need to
+ * handle FILTER, DISTINCT, ORDER BY, or grouping sets.
*/
static void
combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
@@ -960,14 +942,13 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
/* combine not supported with grouping sets */
Assert(aggstate->phase->numsets == 0);
- Assert(aggstate->combineStates);
for (transno = 0; transno < numTrans; transno++)
{
AggStatePerTrans pertrans = &aggstate->pertrans[transno];
+ AggStatePerGroup pergroupstate = &pergroup[transno];
TupleTableSlot *slot;
FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
- AggStatePerGroup pergroupstate = &pergroup[transno];
/* Evaluate the current input expressions for this aggregate */
slot = ExecProject(pertrans->evalproj, NULL);
@@ -979,15 +960,12 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
*/
if (OidIsValid(pertrans->deserialfn_oid))
{
- /*
- * Don't call a strict deserialization function with NULL input. A
- * strict deserialization function and a null value means we skip
- * calling the combine function for this state. We assume that
- * this would be a waste of time and effort anyway so just skip
- * it.
- */
+ /* Don't call a strict deserialization function with NULL input */
if (pertrans->deserialfn.fn_strict && slot->tts_isnull[0])
- continue;
+ {
+ fcinfo->arg[1] = slot->tts_values[0];
+ fcinfo->argnull[1] = slot->tts_isnull[0];
+ }
else
{
FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo;
@@ -1110,7 +1088,6 @@ advance_combine_function(AggState *aggstate,
pergroupstate->transValueIsNull = fcinfo->isnull;
MemoryContextSwitchTo(oldContext);
-
}
@@ -1415,7 +1392,7 @@ finalize_aggregate(AggState *aggstate,
}
/*
- * Compute the final value of one partial aggregate.
+ * Compute the output value of one partial aggregate.
*
* The serialization function will be run, and the result delivered, in the
* output-tuple context; caller's CurrentMemoryContext does not matter.
@@ -1432,8 +1409,8 @@ finalize_partialaggregate(AggState *aggstate,
oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
/*
- * serialfn_oid will be set if we must serialize the input state before
- * calling the combine function on the state.
+ * serialfn_oid will be set if we must serialize the transvalue before
+ * returning it
*/
if (OidIsValid(pertrans->serialfn_oid))
{
@@ -1577,12 +1554,12 @@ finalize_aggregates(AggState *aggstate,
pergroupstate);
}
- if (aggstate->finalizeAggs)
- finalize_aggregate(aggstate, peragg, pergroupstate,
- &aggvalues[aggno], &aggnulls[aggno]);
- else
+ if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
finalize_partialaggregate(aggstate, peragg, pergroupstate,
&aggvalues[aggno], &aggnulls[aggno]);
+ else
+ finalize_aggregate(aggstate, peragg, pergroupstate,
+ &aggvalues[aggno], &aggnulls[aggno]);
}
}
@@ -2114,10 +2091,10 @@ agg_retrieve_direct(AggState *aggstate)
*/
for (;;)
{
- if (!aggstate->combineStates)
- advance_aggregates(aggstate, pergroup);
- else
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
combine_aggregates(aggstate, pergroup);
+ else
+ advance_aggregates(aggstate, pergroup);
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
@@ -2225,10 +2202,10 @@ agg_fill_hash_table(AggState *aggstate)
entry = lookup_hash_entry(aggstate, outerslot);
/* Advance the aggregates */
- if (!aggstate->combineStates)
- advance_aggregates(aggstate, entry->pergroup);
- else
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
combine_aggregates(aggstate, entry->pergroup);
+ else
+ advance_aggregates(aggstate, entry->pergroup);
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
@@ -2352,6 +2329,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->aggs = NIL;
aggstate->numaggs = 0;
aggstate->numtrans = 0;
+ aggstate->aggsplit = node->aggsplit;
aggstate->maxsets = 0;
aggstate->hashfunctions = NULL;
aggstate->projected_set = -1;
@@ -2359,11 +2337,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggstate->peragg = NULL;
aggstate->pertrans = NULL;
aggstate->curpertrans = NULL;
- aggstate->agg_done = false;
- aggstate->combineStates = node->combineStates;
- aggstate->finalizeAggs = node->finalizeAggs;
- aggstate->serialStates = node->serialStates;
aggstate->input_done = false;
+ aggstate->agg_done = false;
aggstate->pergroup = NULL;
aggstate->grp_firstTuple = NULL;
aggstate->hashtable = NULL;
@@ -2681,6 +2656,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
/* Planner should have assigned aggregate to correct level */
Assert(aggref->agglevelsup == 0);
+ /* ... and the split mode should match */
+ Assert(aggref->aggsplit == aggstate->aggsplit);
/* 1. Check for already processed aggs which can be re-used */
existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
@@ -2724,7 +2701,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* If this aggregation is performing state combines, then instead of
* using the transition function, we'll use the combine function
*/
- if (aggstate->combineStates)
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
{
transfn_oid = aggform->aggcombinefn;
@@ -2736,39 +2713,45 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
transfn_oid = aggform->aggtransfn;
/* Final function only required if we're finalizing the aggregates */
- if (aggstate->finalizeAggs)
- peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
- else
+ if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
peragg->finalfn_oid = finalfn_oid = InvalidOid;
+ else
+ peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
serialfn_oid = InvalidOid;
deserialfn_oid = InvalidOid;
/*
- * Determine if we require serialization or deserialization of the
- * aggregate states. This is only required if the aggregate state is
- * internal.
+ * Check if serialization/deserialization is required. We only do it
+ * for aggregates that have transtype INTERNAL.
*/
- if (aggstate->serialStates && aggtranstype == INTERNALOID)
+ if (aggtranstype == INTERNALOID)
{
/*
- * The planner should only have generated an agg node with
- * serialStates if every aggregate with an INTERNAL state has
- * serialization/deserialization functions. Verify that.
+ * The planner should only have generated a serialize agg node if
+ * every aggregate with an INTERNAL state has a serialization
+ * function. Verify that.
*/
- if (!OidIsValid(aggform->aggserialfn))
- elog(ERROR, "serialfunc not set during serialStates aggregation step");
-
- if (!OidIsValid(aggform->aggdeserialfn))
- elog(ERROR, "deserialfunc not set during serialStates aggregation step");
+ if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
+ {
+ /* serialization only valid when not running finalfn */
+ Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
- /* serialization func only required when not finalizing aggs */
- if (!aggstate->finalizeAggs)
+ if (!OidIsValid(aggform->aggserialfn))
+ elog(ERROR, "serialfunc not provided for serialization aggregation");
serialfn_oid = aggform->aggserialfn;
+ }
+
+ /* Likewise for deserialization functions */
+ if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
+ {
+ /* deserialization only valid when combining states */
+ Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));
- /* deserialization func only required when combining states */
- if (aggstate->combineStates)
+ if (!OidIsValid(aggform->aggdeserialfn))
+ elog(ERROR, "deserialfunc not provided for deserialization aggregation");
deserialfn_oid = aggform->aggdeserialfn;
+ }
}
/* Check that aggregate owner has permission to call component fns */
@@ -2853,7 +2836,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
}
/* get info about the output value's datatype */
- get_typlenbyval(aggref->aggoutputtype,
+ get_typlenbyval(aggref->aggtype,
&peragg->resulttypeLen,
&peragg->resulttypeByVal);
@@ -2972,7 +2955,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
* transfn and transfn_oid fields of pertrans refer to the combine
* function rather than the transition function.
*/
- if (aggstate->combineStates)
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
{
Expr *combinefnexpr;