diff options
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r-- | src/backend/executor/nodeAgg.c | 864 |
1 file changed, 98 insertions, 766 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 46ee8804152..061acad80f1 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -90,7 +90,7 @@ * but in the aggregate case we know the left input is either the initial * transition value or a previous function result, and in either case its * value need not be preserved. See int8inc() for an example. Notice that - * advance_transition_function() is coded to avoid a data copy step when + * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when * the previous transition value pointer is returned. It is also possible * to avoid repeated data copying when the transition value is an expanded * object: to do that, the transition function must take care to return @@ -194,6 +194,16 @@ * transition values. hashcontext is the single context created to support * all hash tables. * + * Transition / Combine function invocation: + * + * For performance reasons transition functions, including combine + * functions, aren't invoked one-by-one from nodeAgg.c after computing + * arguments using the expression evaluation engine. Instead + * ExecBuildAggTrans() builds one large expression that does both argument + * evaluation and transition function invocation. That avoids performance + * issues due to repeated uses of expression evaluation, complications due + * to filter expressions having to be evaluated early, and allows to JIT + * the entire expression into one native function. * * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -229,305 +239,6 @@ #include "utils/datum.h" -/* - * AggStatePerTransData - per aggregate state value information - * - * Working state for updating the aggregate's state value, by calling the - * transition function with an input row. 
This struct does not store the - * information needed to produce the final aggregate result from the transition - * state, that's stored in AggStatePerAggData instead. This separation allows - * multiple aggregate results to be produced from a single state value. - */ -typedef struct AggStatePerTransData -{ - /* - * These values are set up during ExecInitAgg() and do not change - * thereafter: - */ - - /* - * Link to an Aggref expr this state value is for. - * - * There can be multiple Aggref's sharing the same state value, so long as - * the inputs and transition functions are identical and the final - * functions are not read-write. This points to the first one of them. - */ - Aggref *aggref; - - /* - * Is this state value actually being shared by more than one Aggref? - */ - bool aggshared; - - /* - * Number of aggregated input columns. This includes ORDER BY expressions - * in both the plain-agg and ordered-set cases. Ordered-set direct args - * are not counted, though. - */ - int numInputs; - - /* - * Number of aggregated input columns to pass to the transfn. This - * includes the ORDER BY columns for ordered-set aggs, but not for plain - * aggs. (This doesn't count the transition state value!) - */ - int numTransInputs; - - /* - * At each input row, we perform a single ExecProject call to evaluate all - * argument expressions that will certainly be needed at this row; that - * includes this aggregate's filter expression if it has one, or its - * regular argument expressions (including any ORDER BY columns) if it - * doesn't. inputoff is the starting index of this aggregate's required - * expressions in the resulting tuple. 
- */ - int inputoff; - - /* Oid of the state transition or combine function */ - Oid transfn_oid; - - /* Oid of the serialization function or InvalidOid */ - Oid serialfn_oid; - - /* Oid of the deserialization function or InvalidOid */ - Oid deserialfn_oid; - - /* Oid of state value's datatype */ - Oid aggtranstype; - - /* - * fmgr lookup data for transition function or combine function. Note in - * particular that the fn_strict flag is kept here. - */ - FmgrInfo transfn; - - /* fmgr lookup data for serialization function */ - FmgrInfo serialfn; - - /* fmgr lookup data for deserialization function */ - FmgrInfo deserialfn; - - /* Input collation derived for aggregate */ - Oid aggCollation; - - /* number of sorting columns */ - int numSortCols; - - /* number of sorting columns to consider in DISTINCT comparisons */ - /* (this is either zero or the same as numSortCols) */ - int numDistinctCols; - - /* deconstructed sorting information (arrays of length numSortCols) */ - AttrNumber *sortColIdx; - Oid *sortOperators; - Oid *sortCollations; - bool *sortNullsFirst; - - /* - * fmgr lookup data for input columns' equality operators --- only - * set/used when aggregate has DISTINCT flag. Note that these are in - * order of sort column index, not parameter index. - */ - FmgrInfo *equalfns; /* array of length numDistinctCols */ - - /* - * initial value from pg_aggregate entry - */ - Datum initValue; - bool initValueIsNull; - - /* - * We need the len and byval info for the agg's input and transition data - * types in order to know how to copy/delete values. - * - * Note that the info for the input type is used only when handling - * DISTINCT aggs with just one argument, so there is only one input type. - */ - int16 inputtypeLen, - transtypeLen; - bool inputtypeByVal, - transtypeByVal; - - /* - * Stuff for evaluation of aggregate inputs, when they must be evaluated - * separately because there's a FILTER expression. 
In such cases we will - * create a sortslot and the result will be stored there, whether or not - * we're actually sorting. - */ - ProjectionInfo *evalproj; /* projection machinery */ - - /* - * Slots for holding the evaluated input arguments. These are set up - * during ExecInitAgg() and then used for each input row requiring either - * FILTER or ORDER BY/DISTINCT processing. - */ - TupleTableSlot *sortslot; /* current input tuple */ - TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */ - TupleDesc sortdesc; /* descriptor of input tuples */ - - /* - * These values are working state that is initialized at the start of an - * input tuple group and updated for each input tuple. - * - * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input - * values straight to the transition function. If it's DISTINCT or - * requires ORDER BY, we pass the input values into a Tuplesort object; - * then at completion of the input tuple group, we scan the sorted values, - * eliminate duplicates if needed, and run the transition function on the - * rest. - * - * We need a separate tuplesort for each grouping set. - */ - - Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */ - - /* - * This field is a pre-initialized FunctionCallInfo struct used for - * calling this aggregate's transfn. We save a few cycles per row by not - * re-initializing the unchanging fields; which isn't much, but it seems - * worth the extra space consumption. - */ - FunctionCallInfoData transfn_fcinfo; - - /* Likewise for serialization and deserialization functions */ - FunctionCallInfoData serialfn_fcinfo; - - FunctionCallInfoData deserialfn_fcinfo; -} AggStatePerTransData; - -/* - * AggStatePerAggData - per-aggregate information - * - * This contains the information needed to call the final function, to produce - * a final aggregate result from the state value. If there are multiple - * identical Aggrefs in the query, they can all share the same per-agg data. 
- * - * These values are set up during ExecInitAgg() and do not change thereafter. - */ -typedef struct AggStatePerAggData -{ - /* - * Link to an Aggref expr this state value is for. - * - * There can be multiple identical Aggref's sharing the same per-agg. This - * points to the first one of them. - */ - Aggref *aggref; - - /* index to the state value which this agg should use */ - int transno; - - /* Optional Oid of final function (may be InvalidOid) */ - Oid finalfn_oid; - - /* - * fmgr lookup data for final function --- only valid when finalfn_oid is - * not InvalidOid. - */ - FmgrInfo finalfn; - - /* - * Number of arguments to pass to the finalfn. This is always at least 1 - * (the transition state value) plus any ordered-set direct args. If the - * finalfn wants extra args then we pass nulls corresponding to the - * aggregated input columns. - */ - int numFinalArgs; - - /* ExprStates for any direct-argument expressions */ - List *aggdirectargs; - - /* - * We need the len and byval info for the agg's result data type in order - * to know how to copy/delete values. - */ - int16 resulttypeLen; - bool resulttypeByVal; - - /* - * "sharable" is false if this agg cannot share state values with other - * aggregates because the final function is read-write. - */ - bool sharable; -} AggStatePerAggData; - -/* - * AggStatePerGroupData - per-aggregate-per-group working state - * - * These values are working state that is initialized at the start of - * an input tuple group and updated for each input tuple. - * - * In AGG_PLAIN and AGG_SORTED modes, we have a single array of these - * structs (pointed to by aggstate->pergroup); we re-use the array for - * each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the - * hash table contains an array of these structs for each tuple group. 
- * - * Logically, the sortstate field belongs in this struct, but we do not - * keep it here for space reasons: we don't support DISTINCT aggregates - * in AGG_HASHED mode, so there's no reason to use up a pointer field - * in every entry of the hashtable. - */ -typedef struct AggStatePerGroupData -{ - Datum transValue; /* current transition value */ - bool transValueIsNull; - - bool noTransValue; /* true if transValue not set yet */ - - /* - * Note: noTransValue initially has the same value as transValueIsNull, - * and if true both are cleared to false at the same time. They are not - * the same though: if transfn later returns a NULL, we want to keep that - * NULL and not auto-replace it with a later input value. Only the first - * non-NULL input will be auto-substituted. - */ -} AggStatePerGroupData; - -/* - * AggStatePerPhaseData - per-grouping-set-phase state - * - * Grouping sets are divided into "phases", where a single phase can be - * processed in one pass over the input. If there is more than one phase, then - * at the end of input from the current phase, state is reset and another pass - * taken over the data which has been re-sorted in the mean time. - * - * Accordingly, each phase specifies a list of grouping sets and group clause - * information, plus each phase after the first also has a sort order. - */ -typedef struct AggStatePerPhaseData -{ - AggStrategy aggstrategy; /* strategy for this phase */ - int numsets; /* number of grouping sets (or 0) */ - int *gset_lengths; /* lengths of grouping sets */ - Bitmapset **grouped_cols; /* column groupings for rollup */ - FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ - Agg *aggnode; /* Agg node for phase data */ - Sort *sortnode; /* Sort node for input ordering for phase */ -} AggStatePerPhaseData; - -/* - * AggStatePerHashData - per-hashtable state - * - * When doing grouping sets with hashing, we have one of these for each - * grouping set. 
(When doing hashing without grouping sets, we have just one of - * them.) - */ -typedef struct AggStatePerHashData -{ - TupleHashTable hashtable; /* hash table with one entry per group */ - TupleHashIterator hashiter; /* for iterating through hash table */ - TupleTableSlot *hashslot; /* slot for loading hash table */ - FmgrInfo *hashfunctions; /* per-grouping-field hash fns */ - FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ - int numCols; /* number of hash key columns */ - int numhashGrpCols; /* number of columns in hash table */ - int largestGrpColIdx; /* largest col required for hashing */ - AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */ - AttrNumber *hashGrpColIdxHash; /* indices in hashtbl tuples */ - Agg *aggnode; /* original Agg node, for numGroups etc. */ -} AggStatePerHashData; - - static void select_current_set(AggState *aggstate, int setno, bool is_hash); static void initialize_phase(AggState *aggstate, int newphase); static TupleTableSlot *fetch_input_tuple(AggState *aggstate); @@ -537,13 +248,7 @@ static void initialize_aggregates(AggState *aggstate, static void advance_transition_function(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate); -static void advance_aggregates(AggState *aggstate, - AggStatePerGroup *sort_pergroups, - AggStatePerGroup *hash_pergroups); -static void advance_combine_function(AggState *aggstate, - AggStatePerTrans pertrans, - AggStatePerGroup pergroupstate); -static void combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup); +static void advance_aggregates(AggState *aggstate); static void process_ordered_aggregate_single(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate); @@ -569,7 +274,7 @@ static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); static void build_hash_table(AggState *aggstate); static TupleHashEntryData 
*lookup_hash_entry(AggState *aggstate); -static AggStatePerGroup *lookup_hash_entries(AggState *aggstate); +static void lookup_hash_entries(AggState *aggstate); static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); static void agg_fill_hash_table(AggState *aggstate); static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); @@ -597,6 +302,7 @@ static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, static void select_current_set(AggState *aggstate, int setno, bool is_hash) { + /* when changing this, also adapt ExecInterpExpr() and friends */ if (is_hash) aggstate->curaggcontext = aggstate->hashcontext; else @@ -967,351 +673,16 @@ advance_transition_function(AggState *aggstate, * When called, CurrentMemoryContext should be the per-query context. */ static void -advance_aggregates(AggState *aggstate, - AggStatePerGroup *sort_pergroups, - AggStatePerGroup *hash_pergroups) +advance_aggregates(AggState *aggstate) { - int transno; - int setno = 0; - int numGroupingSets = Max(aggstate->phase->numsets, 1); - int numHashes = aggstate->num_hashes; - int numTrans = aggstate->numtrans; - TupleTableSlot *combinedslot; - - /* compute required inputs for all aggregates */ - combinedslot = ExecProject(aggstate->combinedproj); - - for (transno = 0; transno < numTrans; transno++) - { - AggStatePerTrans pertrans = &aggstate->pertrans[transno]; - int numTransInputs = pertrans->numTransInputs; - int inputoff = pertrans->inputoff; - TupleTableSlot *slot; - int i; - - /* Skip anything FILTERed out */ - if (pertrans->aggref->aggfilter) - { - /* Check the result of the filter expression */ - if (combinedslot->tts_isnull[inputoff] || - !DatumGetBool(combinedslot->tts_values[inputoff])) - continue; - - /* Now it's safe to evaluate this agg's arguments */ - slot = ExecProject(pertrans->evalproj); - /* There's no offset needed in this slot, of course */ - inputoff = 0; - } - else - { - /* arguments are already evaluated into combinedslot @ inputoff */ - slot 
= combinedslot; - } - - if (pertrans->numSortCols > 0) - { - /* DISTINCT and/or ORDER BY case */ - Assert(slot->tts_nvalid >= (pertrans->numInputs + inputoff)); - Assert(!hash_pergroups); - - /* - * If the transfn is strict, we want to check for nullity before - * storing the row in the sorter, to save space if there are a lot - * of nulls. Note that we must only check numTransInputs columns, - * not numInputs, since nullity in columns used only for sorting - * is not relevant here. - */ - if (pertrans->transfn.fn_strict) - { - for (i = 0; i < numTransInputs; i++) - { - if (slot->tts_isnull[i + inputoff]) - break; - } - if (i < numTransInputs) - continue; - } - - for (setno = 0; setno < numGroupingSets; setno++) - { - /* OK, put the tuple into the tuplesort object */ - if (pertrans->numInputs == 1) - tuplesort_putdatum(pertrans->sortstates[setno], - slot->tts_values[inputoff], - slot->tts_isnull[inputoff]); - else if (pertrans->aggref->aggfilter) - { - /* - * When filtering and ordering, we already have a slot - * containing just the argument columns. - */ - Assert(slot == pertrans->sortslot); - tuplesort_puttupleslot(pertrans->sortstates[setno], slot); - } - else - { - /* - * Copy argument columns from combined slot, starting at - * inputoff, into sortslot, so that we can store just the - * columns we want. 
- */ - ExecClearTuple(pertrans->sortslot); - memcpy(pertrans->sortslot->tts_values, - &slot->tts_values[inputoff], - pertrans->numInputs * sizeof(Datum)); - memcpy(pertrans->sortslot->tts_isnull, - &slot->tts_isnull[inputoff], - pertrans->numInputs * sizeof(bool)); - ExecStoreVirtualTuple(pertrans->sortslot); - tuplesort_puttupleslot(pertrans->sortstates[setno], - pertrans->sortslot); - } - } - } - else - { - /* We can apply the transition function immediately */ - FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; + bool dummynull; - /* Load values into fcinfo */ - /* Start from 1, since the 0th arg will be the transition value */ - Assert(slot->tts_nvalid >= (numTransInputs + inputoff)); - - for (i = 0; i < numTransInputs; i++) - { - fcinfo->arg[i + 1] = slot->tts_values[i + inputoff]; - fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff]; - } - - if (sort_pergroups) - { - /* advance transition states for ordered grouping */ - - for (setno = 0; setno < numGroupingSets; setno++) - { - AggStatePerGroup pergroupstate; - - select_current_set(aggstate, setno, false); - - pergroupstate = &sort_pergroups[setno][transno]; - - advance_transition_function(aggstate, pertrans, pergroupstate); - } - } - - if (hash_pergroups) - { - /* advance transition states for hashed grouping */ - - for (setno = 0; setno < numHashes; setno++) - { - AggStatePerGroup pergroupstate; - - select_current_set(aggstate, setno, true); - - pergroupstate = &hash_pergroups[setno][transno]; - - advance_transition_function(aggstate, pertrans, pergroupstate); - } - } - } - } + ExecEvalExprSwitchContext(aggstate->phase->evaltrans, + aggstate->tmpcontext, + &dummynull); } /* - * combine_aggregates replaces advance_aggregates in DO_AGGSPLIT_COMBINE - * mode. The principal difference is that here we may need to apply the - * deserialization function before running the transfn (which, in this mode, - * is actually the aggregate's combinefn). 
Also, we know we don't need to - * handle FILTER, DISTINCT, ORDER BY, or grouping sets. - */ -static void -combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup) -{ - int transno; - int numTrans = aggstate->numtrans; - TupleTableSlot *slot; - - /* combine not supported with grouping sets */ - Assert(aggstate->phase->numsets <= 1); - - /* compute input for all aggregates */ - slot = ExecProject(aggstate->combinedproj); - - for (transno = 0; transno < numTrans; transno++) - { - AggStatePerTrans pertrans = &aggstate->pertrans[transno]; - AggStatePerGroup pergroupstate = &pergroup[transno]; - FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; - int inputoff = pertrans->inputoff; - - Assert(slot->tts_nvalid > inputoff); - - /* - * deserialfn_oid will be set if we must deserialize the input state - * before calling the combine function - */ - if (OidIsValid(pertrans->deserialfn_oid)) - { - /* Don't call a strict deserialization function with NULL input */ - if (pertrans->deserialfn.fn_strict && slot->tts_isnull[inputoff]) - { - fcinfo->arg[1] = slot->tts_values[inputoff]; - fcinfo->argnull[1] = slot->tts_isnull[inputoff]; - } - else - { - FunctionCallInfo dsinfo = &pertrans->deserialfn_fcinfo; - MemoryContext oldContext; - - dsinfo->arg[0] = slot->tts_values[inputoff]; - dsinfo->argnull[0] = slot->tts_isnull[inputoff]; - /* Dummy second argument for type-safety reasons */ - dsinfo->arg[1] = PointerGetDatum(NULL); - dsinfo->argnull[1] = false; - - /* - * We run the deserialization functions in per-input-tuple - * memory context. 
- */ - oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); - - fcinfo->arg[1] = FunctionCallInvoke(dsinfo); - fcinfo->argnull[1] = dsinfo->isnull; - - MemoryContextSwitchTo(oldContext); - } - } - else - { - fcinfo->arg[1] = slot->tts_values[inputoff]; - fcinfo->argnull[1] = slot->tts_isnull[inputoff]; - } - - advance_combine_function(aggstate, pertrans, pergroupstate); - } -} - -/* - * Perform combination of states between 2 aggregate states. Effectively this - * 'adds' two states together by whichever logic is defined in the aggregate - * function's combine function. - * - * Note that in this case transfn is set to the combination function. This - * perhaps should be changed to avoid confusion, but one field is ok for now - * as they'll never be needed at the same time. - */ -static void -advance_combine_function(AggState *aggstate, - AggStatePerTrans pertrans, - AggStatePerGroup pergroupstate) -{ - FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo; - MemoryContext oldContext; - Datum newVal; - - if (pertrans->transfn.fn_strict) - { - /* if we're asked to merge to a NULL state, then do nothing */ - if (fcinfo->argnull[1]) - return; - - if (pergroupstate->noTransValue) - { - /* - * transValue has not yet been initialized. If pass-by-ref - * datatype we must copy the combining state value into - * aggcontext. - */ - if (!pertrans->transtypeByVal) - { - oldContext = MemoryContextSwitchTo( - aggstate->curaggcontext->ecxt_per_tuple_memory); - pergroupstate->transValue = datumCopy(fcinfo->arg[1], - pertrans->transtypeByVal, - pertrans->transtypeLen); - MemoryContextSwitchTo(oldContext); - } - else - pergroupstate->transValue = fcinfo->arg[1]; - - pergroupstate->transValueIsNull = false; - pergroupstate->noTransValue = false; - return; - } - - if (pergroupstate->transValueIsNull) - { - /* - * Don't call a strict function with NULL inputs. 
Note it is - * possible to get here despite the above tests, if the combinefn - * is strict *and* returned a NULL on a prior cycle. If that - * happens we will propagate the NULL all the way to the end. - */ - return; - } - } - - /* We run the combine functions in per-input-tuple memory context */ - oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); - - /* set up aggstate->curpertrans for AggGetAggref() */ - aggstate->curpertrans = pertrans; - - /* - * OK to call the combine function - */ - fcinfo->arg[0] = pergroupstate->transValue; - fcinfo->argnull[0] = pergroupstate->transValueIsNull; - fcinfo->isnull = false; /* just in case combine func doesn't set it */ - - newVal = FunctionCallInvoke(fcinfo); - - aggstate->curpertrans = NULL; - - /* - * If pass-by-ref datatype, must copy the new value into aggcontext and - * free the prior transValue. But if the combine function returned a - * pointer to its first input, we don't need to do anything. Also, if the - * combine function returned a pointer to a R/W expanded object that is - * already a child of the aggcontext, assume we can adopt that value - * without copying it. 
- */ - if (!pertrans->transtypeByVal && - DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) - { - if (!fcinfo->isnull) - { - MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory); - if (DatumIsReadWriteExpandedObject(newVal, - false, - pertrans->transtypeLen) && - MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext) - /* do nothing */ ; - else - newVal = datumCopy(newVal, - pertrans->transtypeByVal, - pertrans->transtypeLen); - } - if (!pergroupstate->transValueIsNull) - { - if (DatumIsReadWriteExpandedObject(pergroupstate->transValue, - false, - pertrans->transtypeLen)) - DeleteExpandedObject(pergroupstate->transValue); - else - pfree(DatumGetPointer(pergroupstate->transValue)); - } - } - - pergroupstate->transValue = newVal; - pergroupstate->transValueIsNull = fcinfo->isnull; - - MemoryContextSwitchTo(oldContext); -} - - -/* * Run the transition function for a DISTINCT or ORDER BY aggregate * with only one input. This is called after we have completed * entering all the input values into the sort object. We complete the @@ -2118,7 +1489,7 @@ lookup_hash_entry(AggState *aggstate) * * Be aware that lookup_hash_entry can reset the tmpcontext. 
*/ -static AggStatePerGroup * +static void lookup_hash_entries(AggState *aggstate) { int numHashes = aggstate->num_hashes; @@ -2130,8 +1501,6 @@ lookup_hash_entries(AggState *aggstate) select_current_set(aggstate, setno, true); pergroup[setno] = lookup_hash_entry(aggstate)->additional; } - - return pergroup; } /* @@ -2191,7 +1560,6 @@ agg_retrieve_direct(AggState *aggstate) ExprContext *tmpcontext; AggStatePerAgg peragg; AggStatePerGroup *pergroups; - AggStatePerGroup *hash_pergroups = NULL; TupleTableSlot *outerslot; TupleTableSlot *firstSlot; TupleTableSlot *result; @@ -2446,15 +1814,11 @@ agg_retrieve_direct(AggState *aggstate) if (aggstate->aggstrategy == AGG_MIXED && aggstate->current_phase == 1) { - hash_pergroups = lookup_hash_entries(aggstate); + lookup_hash_entries(aggstate); } - else - hash_pergroups = NULL; - if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) - combine_aggregates(aggstate, pergroups[0]); - else - advance_aggregates(aggstate, pergroups, hash_pergroups); + /* Advance the aggregates (or combine functions) */ + advance_aggregates(aggstate); /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); @@ -2548,8 +1912,6 @@ agg_fill_hash_table(AggState *aggstate) */ for (;;) { - AggStatePerGroup *pergroups; - outerslot = fetch_input_tuple(aggstate); if (TupIsNull(outerslot)) break; @@ -2558,13 +1920,10 @@ agg_fill_hash_table(AggState *aggstate) tmpcontext->ecxt_outertuple = outerslot; /* Find or build hashtable entries */ - pergroups = lookup_hash_entries(aggstate); + lookup_hash_entries(aggstate); - /* Advance the aggregates */ - if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) - combine_aggregates(aggstate, pergroups[0]); - else - advance_aggregates(aggstate, NULL, pergroups); + /* Advance the aggregates (or combine functions) */ + advance_aggregates(aggstate); /* * Reset per-input-tuple context after each tuple, but note that the @@ -2716,6 +2075,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) AggState *aggstate; 
AggStatePerAgg peraggs; AggStatePerTrans pertransstates; + AggStatePerGroup *pergroups; Plan *outerPlan; ExprContext *econtext; int numaggs, @@ -2723,15 +2083,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggno; int phase; int phaseidx; - List *combined_inputeval; - TupleDesc combineddesc; - TupleTableSlot *combinedslot; ListCell *l; Bitmapset *all_grouped_cols = NULL; int numGroupingSets = 1; int numPhases; int numHashes; - int column_offset; int i = 0; int j = 0; bool use_hashing = (node->aggstrategy == AGG_HASHED || @@ -3033,6 +2389,24 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->peragg = peraggs; aggstate->pertrans = pertransstates; + + aggstate->all_pergroups = + (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup) + * (numGroupingSets + numHashes)); + pergroups = aggstate->all_pergroups; + + if (node->aggstrategy != AGG_HASHED) + { + for (i = 0; i < numGroupingSets; i++) + { + pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) + * numaggs); + } + + aggstate->pergroups = pergroups; + pergroups += numGroupingSets; + } + /* * Hashing can only appear in the initial phase. */ @@ -3049,27 +2423,13 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } /* this is an array of pointers, not structures */ - aggstate->hash_pergroup = palloc0(sizeof(AggStatePerGroup) * numHashes); + aggstate->hash_pergroup = pergroups; find_hash_columns(aggstate); build_hash_table(aggstate); aggstate->table_filled = false; } - if (node->aggstrategy != AGG_HASHED) - { - AggStatePerGroup *pergroups; - - pergroups = (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup) * - numGroupingSets); - - for (i = 0; i < numGroupingSets; i++) - pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) - * numaggs); - - aggstate->pergroups = pergroups; - } - /* * Initialize current phase-dependent values to initial phase. 
The initial * phase is 1 (first sort pass) for all strategies that use sorting (if @@ -3409,98 +2769,72 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->numtrans = transno + 1; /* - * Build a single projection computing the required arguments for all - * aggregates at once; if there's more than one, that's considerably - * faster than doing it separately for each. - * - * First create a targetlist representing the values to compute. + * Last, check whether any more aggregates got added onto the node while + * we processed the expressions for the aggregate arguments (including not + * only the regular arguments and FILTER expressions handled immediately + * above, but any direct arguments we might've handled earlier). If so, + * we have nested aggregate functions, which is semantically nonsensical, + * so complain. (This should have been caught by the parser, so we don't + * need to work hard on a helpful error message; but we defend against it + * here anyway, just to be sure.) */ - combined_inputeval = NIL; - column_offset = 0; - for (transno = 0; transno < aggstate->numtrans; transno++) + if (numaggs != list_length(aggstate->aggs)) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("aggregate function calls cannot be nested"))); + + /* + * Build expressions doing all the transition work at once. We build a + * different one for each phase, as the number of transition function + * invocation can differ between phases. Note this'll work both for + * transition and combination functions (although there'll only be one + * phase in the latter case). + */ + for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) { - AggStatePerTrans pertrans = &pertransstates[transno]; + AggStatePerPhase phase = &aggstate->phases[phaseidx]; + bool dohash = false; + bool dosort = false; - /* - * Mark this per-trans state with its starting column in the combined - * slot. 
- */ - pertrans->inputoff = column_offset; + /* phase 0 doesn't necessarily exist */ + if (!phase->aggnode) + continue; - /* - * If the aggregate has a FILTER, we can only evaluate the filter - * expression, not the actual input expressions, during the combined - * eval step --- unless we're ignoring the filter because this node is - * running combinefns not transfns. - */ - if (pertrans->aggref->aggfilter && - !DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1) { - TargetEntry *tle; - - tle = makeTargetEntry(pertrans->aggref->aggfilter, - column_offset + 1, NULL, false); - combined_inputeval = lappend(combined_inputeval, tle); - column_offset++; - /* - * We'll need separate projection machinery for the real args. - * Arrange to evaluate them into the sortslot previously created. + * Phase one, and only phase one, in a mixed agg performs both + * sorting and aggregation. */ - Assert(pertrans->sortslot); - pertrans->evalproj = ExecBuildProjectionInfo(pertrans->aggref->args, - aggstate->tmpcontext, - pertrans->sortslot, - &aggstate->ss.ps, - NULL); + dohash = true; + dosort = true; } - else + else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0) { /* - * Add agg's input expressions to combined_inputeval, adjusting - * resnos in the copied target entries to match the combined slot. + * No need to compute a transition function for an AGG_MIXED phase + * 0 - the contents of the hashtables will have been computed + * during phase 1. 
*/ - ListCell *arg; - - foreach(arg, pertrans->aggref->args) - { - TargetEntry *source_tle = lfirst_node(TargetEntry, arg); - TargetEntry *tle; - - tle = flatCopyTargetEntry(source_tle); - tle->resno += column_offset; - - combined_inputeval = lappend(combined_inputeval, tle); - } - - column_offset += list_length(pertrans->aggref->args); + continue; } - } + else if (phase->aggstrategy == AGG_PLAIN || + phase->aggstrategy == AGG_SORTED) + { + dohash = false; + dosort = true; + } + else if (phase->aggstrategy == AGG_HASHED) + { + dohash = true; + dosort = false; + } + else + Assert(false); - /* Now create a projection for the combined targetlist */ - combineddesc = ExecTypeFromTL(combined_inputeval, false); - combinedslot = ExecInitExtraTupleSlot(estate); - ExecSetSlotDescriptor(combinedslot, combineddesc); - aggstate->combinedproj = ExecBuildProjectionInfo(combined_inputeval, - aggstate->tmpcontext, - combinedslot, - &aggstate->ss.ps, - NULL); + phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash); - /* - * Last, check whether any more aggregates got added onto the node while - * we processed the expressions for the aggregate arguments (including not - * only the regular arguments and FILTER expressions handled immediately - * above, but any direct arguments we might've handled earlier). If so, - * we have nested aggregate functions, which is semantically nonsensical, - * so complain. (This should have been caught by the parser, so we don't - * need to work hard on a helpful error message; but we defend against it - * here anyway, just to be sure.) 
- */ - if (numaggs != list_length(aggstate->aggs)) - ereport(ERROR, - (errcode(ERRCODE_GROUPING_ERROR), - errmsg("aggregate function calls cannot be nested"))); + } return aggstate; } @@ -3557,8 +2891,6 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans, else pertrans->numTransInputs = numArguments; - /* inputoff and evalproj will be set up later, in ExecInitAgg */ - /* * When combining states, we have no use at all for the aggregate * function's transfn. Instead we use the combinefn. In this case, the |