diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/nodeAgg.c | 861 |
1 files changed, 609 insertions, 252 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 471acc4b3ec..ef35da6ade6 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -122,12 +122,19 @@ * specific). * * Where more complex grouping sets are used, we break them down into - * "phases", where each phase has a different sort order. During each - * phase but the last, the input tuples are additionally stored in a - * tuplesort which is keyed to the next phase's sort order; during each - * phase but the first, the input tuples are drawn from the previously - * sorted data. (The sorting of the data for the first phase is handled by - * the planner, as it might be satisfied by underlying nodes.) + * "phases", where each phase has a different sort order (except phase 0 + * which is reserved for hashing). During each phase but the last, the + * input tuples are additionally stored in a tuplesort which is keyed to the + * next phase's sort order; during each phase but the first, the input + * tuples are drawn from the previously sorted data. (The sorting of the + * data for the first phase is handled by the planner, as it might be + * satisfied by underlying nodes.) + * + * Hashing can be mixed with sorted grouping. To do this, we have an + * AGG_MIXED strategy that populates the hashtables during the first sorted + * phase, and switches to reading them out after completing all sort phases. + * We can also support AGG_HASHED with multiple hash tables and no sorting + * at all. * * From the perspective of aggregate transition and final functions, the * only issue regarding grouping sets is this: a single call site (flinfo) @@ -139,7 +146,54 @@ * sensitive to the grouping set for which the aggregate function is * currently being called. * - * TODO: AGG_HASHED doesn't support multiple grouping sets yet. + * Plan structure: + * + * What we get from the planner is actually one "real" Agg node which is + * part of the plan tree proper, but which optionally has an additional list + * of Agg nodes hung off the side via the "chain" field. This is because an + * Agg node happens to be a convenient representation of all the data we + * need for grouping sets. + * + * For many purposes, we treat the "real" node as if it were just the first + * node in the chain. The chain must be ordered such that hashed entries + * come before sorted/plain entries; the real node is marked AGG_MIXED if + * there are both types present (in which case the real node describes one + * of the hashed groupings, other AGG_HASHED nodes may optionally follow in + * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If + * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained + * nodes must be of the same type; if it is AGG_PLAIN, there can be no + * chained nodes. + * + * We collect all hashed nodes into a single "phase", numbered 0, and create + * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node. + * Phase 0 is allocated even if there are no hashes, but remains unused in + * that case. + * + * AGG_HASHED nodes actually refer to only a single grouping set each, + * because for each hashed grouping we need a separate grpColIdx and + * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of + * grouping sets that share a sort order. Each AGG_SORTED node other than + * the first one has an associated Sort node which describes the sort order + * to be used; the first sorted node takes its input from the outer subtree, + * which the planner has already arranged to provide ordered data. + * + * Memory and ExprContext usage: + * + * Because we're accumulating aggregate values across input rows, we need to + * use more memory contexts than just simple input/output tuple contexts. + * In fact, for a rollup, we need a separate context for each grouping set + * so that we can reset the inner (finer-grained) aggregates on their group + * boundaries while continuing to accumulate values for outer + * (coarser-grained) groupings. On top of this, we might be simultaneously + * populating hashtables; however, we only need one context for all the + * hashtables. + * + * So we create an array, aggcontexts, with an ExprContext for each grouping + * set in the largest rollup that we're going to process, and use the + * per-tuple memory context of those ExprContexts to store the aggregate + * transition values. hashcontext is the single context created to support + * all hash tables. + * * * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -432,6 +486,7 @@ typedef struct AggStatePerGroupData */ typedef struct AggStatePerPhaseData { + AggStrategy aggstrategy; /* strategy for this phase */ int numsets; /* number of grouping sets (or 0) */ int *gset_lengths; /* lengths of grouping sets */ Bitmapset **grouped_cols; /* column groupings for rollup */ @@ -440,7 +495,30 @@ typedef struct AggStatePerPhaseData Sort *sortnode; /* Sort node for input ordering for phase */ } AggStatePerPhaseData; +/* + * AggStatePerHashData - per-hashtable state + * + * When doing grouping sets with hashing, we have one of these for each + * grouping set. (When doing hashing without grouping sets, we have just one of + * them.) + */ +typedef struct AggStatePerHashData +{ + TupleHashTable hashtable; /* hash table with one entry per group */ + TupleHashIterator hashiter; /* for iterating through hash table */ + TupleTableSlot *hashslot; /* slot for loading hash table */ + FmgrInfo *hashfunctions; /* per-grouping-field hash fns */ + FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ + int numCols; /* number of hash key columns */ + int numhashGrpCols; /* number of columns in hash table */ + int largestGrpColIdx; /* largest col required for hashing */ + AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */ + AttrNumber *hashGrpColIdxHash; /* indices in hashtbl tuples */ + Agg *aggnode; /* original Agg node, for numGroups etc. */ +} AggStatePerHashData; + +static void select_current_set(AggState *aggstate, int setno, bool is_hash); static void initialize_phase(AggState *aggstate, int newphase); static TupleTableSlot *fetch_input_tuple(AggState *aggstate); static void initialize_aggregates(AggState *aggstate, @@ -449,7 +527,8 @@ static void initialize_aggregates(AggState *aggstate, static void advance_transition_function(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate); -static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup); +static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, + AggStatePerGroup *pergroups); static void advance_combine_function(AggState *aggstate, AggStatePerTrans pertrans, AggStatePerGroup pergroupstate); @@ -473,14 +552,13 @@ static void prepare_projection_slot(AggState *aggstate, int currentSet); static void finalize_aggregates(AggState *aggstate, AggStatePerAgg peragg, - AggStatePerGroup pergroup, - int currentSet); + AggStatePerGroup pergroup); static TupleTableSlot *project_aggregates(AggState *aggstate); static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); static void build_hash_table(AggState *aggstate); -static TupleHashEntryData *lookup_hash_entry(AggState *aggstate, - TupleTableSlot *inputslot); +static TupleHashEntryData *lookup_hash_entry(AggState *aggstate); +static AggStatePerGroup *lookup_hash_entries(AggState *aggstate); static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); static void agg_fill_hash_table(AggState *aggstate); static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); @@ -501,13 +579,31 @@ static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, /* - * Switch to phase "newphase", which must either be 0 (to reset) or + * Select the current grouping set; affects current_set and + * curaggcontext. + */ +static void +select_current_set(AggState *aggstate, int setno, bool is_hash) +{ + if (is_hash) + aggstate->curaggcontext = aggstate->hashcontext; + else + aggstate->curaggcontext = aggstate->aggcontexts[setno]; + + aggstate->current_set = setno; +} + +/* + * Switch to phase "newphase", which must either be 0 or 1 (to reset) or * current_phase + 1. Juggle the tuplesorts accordingly. + * + * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED + * case, so when entering phase 0, all we need to do is drop open sorts. */ static void initialize_phase(AggState *aggstate, int newphase) { - Assert(newphase == 0 || newphase == aggstate->current_phase + 1); + Assert(newphase <= 1 || newphase == aggstate->current_phase + 1); /* * Whatever the previous state, we're now done with whatever input @@ -519,7 +615,7 @@ initialize_phase(AggState *aggstate, int newphase) aggstate->sort_in = NULL; } - if (newphase == 0) + if (newphase <= 1) { /* * Discard any existing output tuplesort. @@ -546,7 +642,7 @@ initialize_phase(AggState *aggstate, int newphase) * If this isn't the last phase, we need to sort appropriately for the * next phase in sequence. */ - if (newphase < aggstate->numphases - 1) + if (newphase > 0 && newphase < aggstate->numphases - 1) { Sort *sortnode = aggstate->phases[newphase + 1].sortnode; PlanState *outerNode = outerPlanState(aggstate); @@ -567,7 +663,7 @@ initialize_phase(AggState *aggstate, int newphase) } /* - * Fetch a tuple from either the outer plan (for phase 0) or from the sorter + * Fetch a tuple from either the outer plan (for phase 1) or from the sorter * populated by the previous phase. Copy it to the sorter for the next phase * if any. */ @@ -595,8 +691,8 @@ fetch_input_tuple(AggState *aggstate) /* * (Re)Initialize an individual aggregate. * - * This function handles only one grouping set (already set in - * aggstate->current_set). + * This function handles only one grouping set, already set in + * aggstate->current_set. * * When called, CurrentMemoryContext should be the per-query context. */ @@ -653,7 +749,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, MemoryContext oldContext; oldContext = MemoryContextSwitchTo( - aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); + aggstate->curaggcontext->ecxt_per_tuple_memory); pergroupstate->transValue = datumCopy(pertrans->initValue, pertrans->transtypeByVal, pertrans->transtypeLen); @@ -676,8 +772,9 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, * * If there are multiple grouping sets, we initialize only the first numReset * of them (the grouping sets are ordered so that the most specific one, which - * is reset most often, is first). As a convenience, if numReset is < 1, we - * reinitialize all sets. + * is reset most often, is first). As a convenience, if numReset is 0, we + * reinitialize all sets. numReset is -1 to initialize a hashtable entry, in + * which case the caller must have used select_current_set appropriately. * * When called, CurrentMemoryContext should be the per-query context. */ @@ -689,25 +786,37 @@ initialize_aggregates(AggState *aggstate, int transno; int numGroupingSets = Max(aggstate->phase->numsets, 1); int setno = 0; + int numTrans = aggstate->numtrans; AggStatePerTrans transstates = aggstate->pertrans; - if (numReset < 1) + if (numReset == 0) numReset = numGroupingSets; - for (transno = 0; transno < aggstate->numtrans; transno++) + for (transno = 0; transno < numTrans; transno++) { AggStatePerTrans pertrans = &transstates[transno]; - for (setno = 0; setno < numReset; setno++) + if (numReset < 0) { AggStatePerGroup pergroupstate; - pergroupstate = &pergroup[transno + (setno * (aggstate->numtrans))]; - - aggstate->current_set = setno; + pergroupstate = &pergroup[transno]; initialize_aggregate(aggstate, pertrans, pergroupstate); } + else + { + for (setno = 0; setno < numReset; setno++) + { + AggStatePerGroup pergroupstate; + + pergroupstate = &pergroup[transno + (setno * numTrans)]; + + select_current_set(aggstate, setno, false); + + initialize_aggregate(aggstate, pertrans, pergroupstate); + } + } } } @@ -757,7 +866,7 @@ advance_transition_function(AggState *aggstate, * do not need to pfree the old transValue, since it's NULL. */ oldContext = MemoryContextSwitchTo( - aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); + aggstate->curaggcontext->ecxt_per_tuple_memory); pergroupstate->transValue = datumCopy(fcinfo->arg[1], pertrans->transtypeByVal, pertrans->transtypeLen); @@ -807,7 +916,7 @@ advance_transition_function(AggState *aggstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); + MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory); if (DatumIsReadWriteExpandedObject(newVal, false, pertrans->transtypeLen) && @@ -838,17 +947,21 @@ advance_transition_function(AggState *aggstate, /* * Advance each aggregate transition state for one input tuple. The input * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is - * accessible to ExecEvalExpr. pergroup is the array of per-group structs to - * use (this might be in a hashtable entry). + * accessible to ExecEvalExpr. + * + * We have two sets of transition states to handle: one for sorted aggregation + * and one for hashed; we do them both here, to avoid multiple evaluation of + * the inputs. * * When called, CurrentMemoryContext should be the per-query context. */ static void -advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) +advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup, AggStatePerGroup *pergroups) { int transno; int setno = 0; int numGroupingSets = Max(aggstate->phase->numsets, 1); + int numHashes = aggstate->num_hashes; int numTrans = aggstate->numtrans; TupleTableSlot *slot = aggstate->evalslot; @@ -880,6 +993,7 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) { /* DISTINCT and/or ORDER BY case */ Assert(slot->tts_nvalid >= (pertrans->numInputs + inputoff)); + Assert(!pergroups); /* * If the transfn is strict, we want to check for nullity before @@ -940,13 +1054,36 @@ advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) fcinfo->argnull[i + 1] = slot->tts_isnull[i + inputoff]; } - for (setno = 0; setno < numGroupingSets; setno++) + if (pergroup) { - AggStatePerGroup pergroupstate = &pergroup[transno + (setno * numTrans)]; + /* advance transition states for ordered grouping */ + + for (setno = 0; setno < numGroupingSets; setno++) + { + AggStatePerGroup pergroupstate; - aggstate->current_set = setno; + select_current_set(aggstate, setno, false); - advance_transition_function(aggstate, pertrans, pergroupstate); + pergroupstate = &pergroup[transno + (setno * numTrans)]; + + advance_transition_function(aggstate, pertrans, pergroupstate); + } + } + + if (pergroups) + { + /* advance transition states for hashed grouping */ + + for (setno = 0; setno < numHashes; setno++) + { + AggStatePerGroup pergroupstate; + + select_current_set(aggstate, setno, true); + + pergroupstate = &pergroups[setno][transno]; + + advance_transition_function(aggstate, pertrans, pergroupstate); + } } } } @@ -967,7 +1104,7 @@ combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup) TupleTableSlot *slot; /* combine not supported with grouping sets */ - Assert(aggstate->phase->numsets == 0); + Assert(aggstate->phase->numsets <= 1); /* compute input for all aggregates */ slot = ExecProject(aggstate->evalproj); @@ -1060,7 +1197,7 @@ advance_combine_function(AggState *aggstate, if (!pertrans->transtypeByVal) { oldContext = MemoryContextSwitchTo( - aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); + aggstate->curaggcontext->ecxt_per_tuple_memory); pergroupstate->transValue = datumCopy(fcinfo->arg[1], pertrans->transtypeByVal, pertrans->transtypeLen); @@ -1105,7 +1242,7 @@ advance_combine_function(AggState *aggstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(aggstate->aggcontexts[aggstate->current_set]->ecxt_per_tuple_memory); + MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory); if (DatumIsReadWriteExpandedObject(newVal, false, pertrans->transtypeLen) && @@ -1559,15 +1696,16 @@ prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet /* * Compute the final value of all aggregates for one group. * - * This function handles only one grouping set at a time. + * This function handles only one grouping set at a time, which the caller must + * have selected. It's also the caller's responsibility to adjust the supplied + * pergroup parameter to point to the current set's transvalues. * * Results are stored in the output econtext aggvalues/aggnulls. */ static void finalize_aggregates(AggState *aggstate, AggStatePerAgg peraggs, - AggStatePerGroup pergroup, - int currentSet) + AggStatePerGroup pergroup) { ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; Datum *aggvalues = econtext->ecxt_aggvalues; @@ -1575,11 +1713,6 @@ finalize_aggregates(AggState *aggstate, int aggno; int transno; - Assert(currentSet == 0 || - ((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); - - aggstate->current_set = currentSet; - /* * If there were any DISTINCT and/or ORDER BY aggregates, sort their * inputs and run the transition functions. @@ -1589,11 +1722,12 @@ finalize_aggregates(AggState *aggstate, AggStatePerTrans pertrans = &aggstate->pertrans[transno]; AggStatePerGroup pergroupstate; - pergroupstate = &pergroup[transno + (currentSet * (aggstate->numtrans))]; + pergroupstate = &pergroup[transno]; if (pertrans->numSortCols > 0) { - Assert(((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + Assert(aggstate->aggstrategy != AGG_HASHED && + aggstate->aggstrategy != AGG_MIXED); if (pertrans->numInputs == 1) process_ordered_aggregate_single(aggstate, @@ -1615,7 +1749,7 @@ finalize_aggregates(AggState *aggstate, int transno = peragg->transno; AggStatePerGroup pergroupstate; - pergroupstate = &pergroup[transno + (currentSet * (aggstate->numtrans))]; + pergroupstate = &pergroup[transno]; if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) finalize_partialaggregate(aggstate, peragg, pergroupstate, @@ -1697,7 +1831,7 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) } /* - * Initialize the hash table to empty. + * Initialize the hash table(s) to empty. * * To implement hashed aggregation, we need a hashtable that stores a * representative tuple and an array of AggStatePerGroup structs for each @@ -1705,29 +1839,40 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(), * for each entry. * - * The hash table always lives in the aggcontext memory context. + * We have a separate hashtable and associated perhash data structure for each + * grouping set for which we're doing hashing. + * + * The hash tables always live in the hashcontext's per-tuple memory context + * (there is only one of these for all tables together, since they are all + * reset at the same time). */ static void build_hash_table(AggState *aggstate) { - Agg *node = (Agg *) aggstate->ss.ps.plan; MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory; Size additionalsize; + int i; - Assert(node->aggstrategy == AGG_HASHED); - Assert(node->numGroups > 0); + Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED); - additionalsize = aggstate->numaggs * sizeof(AggStatePerGroupData); + additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData); - aggstate->hashtable = BuildTupleHashTable(node->numCols, - aggstate->hashGrpColIdxHash, - aggstate->phase->eqfunctions, - aggstate->hashfunctions, - node->numGroups, - additionalsize, - aggstate->aggcontexts[0]->ecxt_per_tuple_memory, - tmpmem, + for (i = 0; i < aggstate->num_hashes; ++i) + { + AggStatePerHash perhash = &aggstate->perhash[i]; + + Assert(perhash->aggnode->numGroups > 0); + + perhash->hashtable = BuildTupleHashTable(perhash->numCols, + perhash->hashGrpColIdxHash, + perhash->eqfunctions, + perhash->hashfunctions, + perhash->aggnode->numGroups, + additionalsize, + aggstate->hashcontext->ecxt_per_tuple_memory, + tmpmem, DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); + } } /* @@ -1750,72 +1895,98 @@ build_hash_table(AggState *aggstate) * the array is preserved over ExecReScanAgg, so we allocate it in the * per-query context (unlike the hash table itself). */ -static List * +static void find_hash_columns(AggState *aggstate) { - Agg *node = (Agg *) aggstate->ss.ps.plan; - Bitmapset *colnos; - List *collist; - TupleDesc hashDesc; + Bitmapset *base_colnos; List *outerTlist = outerPlanState(aggstate)->plan->targetlist; - List *hashTlist = NIL; - int i; - - aggstate->largestGrpColIdx = 0; + int numHashes = aggstate->num_hashes; + int j; /* Find Vars that will be needed in tlist and qual */ - colnos = find_unaggregated_cols(aggstate); - /* Add in all the grouping columns */ - for (i = 0; i < node->numCols; i++) - colnos = bms_add_member(colnos, node->grpColIdx[i]); - /* Convert to list, using lcons so largest element ends up first */ - collist = NIL; + base_colnos = find_unaggregated_cols(aggstate); - aggstate->hashGrpColIdxInput = - palloc(bms_num_members(colnos) * sizeof(AttrNumber)); - aggstate->hashGrpColIdxHash = - palloc(node->numCols * sizeof(AttrNumber)); - - /* - * First build mapping for columns directly hashed. These are the first, - * because they'll be accessed when computing hash values and comparing - * tuples for exact matches. We also build simple mapping for - * execGrouping, so it knows where to find the to-be-hashed / compared - * columns in the input. - */ - for (i = 0; i < node->numCols; i++) + for (j = 0; j < numHashes; ++j) { - aggstate->hashGrpColIdxInput[i] = node->grpColIdx[i]; - aggstate->hashGrpColIdxHash[i] = i + 1; - aggstate->numhashGrpCols++; - /* delete already mapped columns */ - bms_del_member(colnos, node->grpColIdx[i]); - } + AggStatePerHash perhash = &aggstate->perhash[j]; + Bitmapset *colnos = bms_copy(base_colnos); + AttrNumber *grpColIdx = perhash->aggnode->grpColIdx; + List *hashTlist = NIL; + TupleDesc hashDesc; + int i; - /* and add the remaining columns */ - while ((i = bms_first_member(colnos)) >= 0) - { - aggstate->hashGrpColIdxInput[aggstate->numhashGrpCols] = i; - aggstate->numhashGrpCols++; - } + perhash->largestGrpColIdx = 0; - /* and build a tuple descriptor for the hashtable */ - for (i = 0; i < aggstate->numhashGrpCols; i++) - { - int varNumber = aggstate->hashGrpColIdxInput[i] - 1; + /* + * If we're doing grouping sets, then some Vars might be referenced in + * tlist/qual for the benefit of other grouping sets, but not needed + * when hashing; i.e. prepare_projection_slot will null them out, so + * there'd be no point storing them. Use prepare_projection_slot's + * logic to determine which. + */ + if (aggstate->phases[0].grouped_cols) + { + Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j]; + ListCell *lc; - hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber)); - aggstate->largestGrpColIdx = - Max(varNumber + 1, aggstate->largestGrpColIdx); - } + foreach(lc, aggstate->all_grouped_cols) + { + int attnum = lfirst_int(lc); - hashDesc = ExecTypeFromTL(hashTlist, false); - ExecSetSlotDescriptor(aggstate->hashslot, hashDesc); + if (!bms_is_member(attnum, grouped_cols)) + colnos = bms_del_member(colnos, attnum); + } + } + /* Add in all the grouping columns */ + for (i = 0; i < perhash->numCols; i++) + colnos = bms_add_member(colnos, grpColIdx[i]); - list_free(hashTlist); - bms_free(colnos); + perhash->hashGrpColIdxInput = + palloc(bms_num_members(colnos) * sizeof(AttrNumber)); + perhash->hashGrpColIdxHash = + palloc(perhash->numCols * sizeof(AttrNumber)); + + /* + * First build mapping for columns directly hashed. These are the + * first, because they'll be accessed when computing hash values and + * comparing tuples for exact matches. We also build simple mapping + * for execGrouping, so it knows where to find the to-be-hashed / + * compared columns in the input. + */ + for (i = 0; i < perhash->numCols; i++) + { + perhash->hashGrpColIdxInput[i] = grpColIdx[i]; + perhash->hashGrpColIdxHash[i] = i + 1; + perhash->numhashGrpCols++; + /* delete already mapped columns */ + bms_del_member(colnos, grpColIdx[i]); + } - return collist; + /* and add the remaining columns */ + while ((i = bms_first_member(colnos)) >= 0) + { + perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i; + perhash->numhashGrpCols++; + } + + /* and build a tuple descriptor for the hashtable */ + for (i = 0; i < perhash->numhashGrpCols; i++) + { + int varNumber = perhash->hashGrpColIdxInput[i] - 1; + + hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber)); + perhash->largestGrpColIdx = + Max(varNumber + 1, perhash->largestGrpColIdx); + } + + hashDesc = ExecTypeFromTL(hashTlist, false); + ExecSetSlotDescriptor(perhash->hashslot, hashDesc); + + list_free(hashTlist); + bms_free(colnos); + } + + bms_free(base_colnos); } /* @@ -1840,26 +2011,30 @@ hash_agg_entry_size(int numAggs) } /* - * Find or create a hashtable entry for the tuple group containing the - * given tuple. + * Find or create a hashtable entry for the tuple group containing the current + * tuple (already set in tmpcontext's outertuple slot), in the current grouping + * set (which the caller must have selected - note that initialize_aggregate + * depends on this). * * When called, CurrentMemoryContext should be the per-query context. */ static TupleHashEntryData * -lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) +lookup_hash_entry(AggState *aggstate) { - TupleTableSlot *hashslot = aggstate->hashslot; + TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple; + AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set]; + TupleTableSlot *hashslot = perhash->hashslot; TupleHashEntryData *entry; bool isnew; - int i; + int i; /* transfer just the needed columns into hashslot */ - slot_getsomeattrs(inputslot, aggstate->largestGrpColIdx); + slot_getsomeattrs(inputslot, perhash->largestGrpColIdx); ExecClearTuple(hashslot); - for (i = 0; i < aggstate->numhashGrpCols; i++) + for (i = 0; i < perhash->numhashGrpCols; i++) { - int varNumber = aggstate->hashGrpColIdxInput[i] - 1; + int varNumber = perhash->hashGrpColIdxInput[i] - 1; hashslot->tts_values[i] = inputslot->tts_values[varNumber]; hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber]; @@ -1867,22 +2042,44 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) ExecStoreVirtualTuple(hashslot); /* find or create the hashtable entry using the filtered tuple */ - entry = LookupTupleHashEntry(aggstate->hashtable, hashslot, &isnew); + entry = LookupTupleHashEntry(perhash->hashtable, hashslot, &isnew); if (isnew) { entry->additional = (AggStatePerGroup) - MemoryContextAlloc(aggstate->hashtable->tablecxt, + MemoryContextAlloc(perhash->hashtable->tablecxt, sizeof(AggStatePerGroupData) * aggstate->numtrans); /* initialize aggregates for new tuple group */ initialize_aggregates(aggstate, (AggStatePerGroup) entry->additional, - 0); + -1); } return entry; } /* + * Look up hash entries for the current tuple in all hashed grouping sets, + * returning an array of pergroup pointers suitable for advance_aggregates. + * + * Be aware that lookup_hash_entry can reset the tmpcontext. + */ +static AggStatePerGroup * +lookup_hash_entries(AggState *aggstate) +{ + int numHashes = aggstate->num_hashes; + AggStatePerGroup *pergroup = aggstate->hash_pergroup; + int setno; + + for (setno = 0; setno < numHashes; setno++) + { + select_current_set(aggstate, setno, true); + pergroup[setno] = lookup_hash_entry(aggstate)->additional; + } + + return pergroup; +} + +/* * ExecAgg - * * ExecAgg receives tuples from its outer subplan and aggregates over @@ -1898,19 +2095,22 @@ lookup_hash_entry(AggState *aggstate, TupleTableSlot *inputslot) TupleTableSlot * ExecAgg(AggState *node) { - TupleTableSlot *result; + TupleTableSlot *result = NULL; if (!node->agg_done) { /* Dispatch based on strategy */ - switch (node->phase->aggnode->aggstrategy) + switch (node->phase->aggstrategy) { case AGG_HASHED: if (!node->table_filled) agg_fill_hash_table(node); + /* FALLTHROUGH */ + case AGG_MIXED: result = agg_retrieve_hash_table(node); break; - default: + case AGG_PLAIN: + case AGG_SORTED: result = agg_retrieve_direct(node); break; } @@ -1933,6 +2133,7 @@ agg_retrieve_direct(AggState *aggstate) ExprContext *tmpcontext; AggStatePerAgg peragg; AggStatePerGroup pergroup; + AggStatePerGroup *hash_pergroups = NULL; TupleTableSlot *outerslot; TupleTableSlot *firstSlot; TupleTableSlot *result; @@ -2019,6 +2220,19 @@ agg_retrieve_direct(AggState *aggstate) node = aggstate->phase->aggnode; numReset = numGroupingSets; } + else if (aggstate->aggstrategy == AGG_MIXED) + { + /* + * Mixed mode; we've output all the grouped stuff and have + * full hashtables, so switch to outputting those. + */ + initialize_phase(aggstate, 0); + aggstate->table_filled = true; + ResetTupleHashIterator(aggstate->perhash[0].hashtable, + &aggstate->perhash[0].hashiter); + select_current_set(aggstate, 0, true); + return agg_retrieve_hash_table(aggstate); + } else { aggstate->agg_done = true; @@ -2055,7 +2269,7 @@ agg_retrieve_direct(AggState *aggstate) *---------- */ if (aggstate->input_done || - (node->aggstrategy == AGG_SORTED && + (node->aggstrategy != AGG_PLAIN && aggstate->projected_set != -1 && aggstate->projected_set < (numGroupingSets - 1) && nextSetSize > 0 && @@ -2168,10 +2382,22 @@ agg_retrieve_direct(AggState *aggstate) */ for (;;) { + /* + * During phase 1 only of a mixed agg, we need to update + * hashtables as well in advance_aggregates. + */ + if (aggstate->aggstrategy == AGG_MIXED && + aggstate->current_phase == 1) + { + hash_pergroups = lookup_hash_entries(aggstate); + } + else + hash_pergroups = NULL; + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) combine_aggregates(aggstate, pergroup); else - advance_aggregates(aggstate, pergroup); + advance_aggregates(aggstate, pergroup, hash_pergroups); /* Reset per-input-tuple context after each tuple */ ResetExprContext(tmpcontext); @@ -2198,7 +2424,7 @@ agg_retrieve_direct(AggState *aggstate) * If we are grouping, check whether we've crossed a group * boundary. */ - if (node->aggstrategy == AGG_SORTED) + if (node->aggstrategy != AGG_PLAIN) { if (!execTuplesMatch(firstSlot, outerslot, @@ -2231,7 +2457,11 @@ agg_retrieve_direct(AggState *aggstate) prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet); - finalize_aggregates(aggstate, peragg, pergroup, currentSet); + select_current_set(aggstate, currentSet, false); + + finalize_aggregates(aggstate, + peragg, + pergroup + (currentSet * aggstate->numtrans)); /* * If there's no row to project right now, we must continue rather @@ -2247,21 +2477,13 @@ agg_retrieve_direct(AggState *aggstate) } /* - * ExecAgg for hashed case: phase 1, read input and build hash table + * ExecAgg for hashed case: read input and build hash table */ static void agg_fill_hash_table(AggState *aggstate) { - ExprContext *tmpcontext; - TupleHashEntryData *entry; TupleTableSlot *outerslot; - - /* - * get state info from node - * - * tmpcontext is the per-input-tuple expression context - */ - tmpcontext = aggstate->tmpcontext; + ExprContext *tmpcontext = aggstate->tmpcontext; /* * Process each outer-plan tuple, and then fetch the next one, until we @@ -2269,32 +2491,40 @@ agg_fill_hash_table(AggState *aggstate) */ for (;;) { + AggStatePerGroup *pergroups; + outerslot = fetch_input_tuple(aggstate); if (TupIsNull(outerslot)) break; - /* set up for advance_aggregates call */ + + /* set up for lookup_hash_entries and advance_aggregates */ tmpcontext->ecxt_outertuple = outerslot; - /* Find or build hashtable entry for this tuple's group */ - entry = lookup_hash_entry(aggstate, outerslot); + /* Find or build hashtable entries */ + pergroups = lookup_hash_entries(aggstate); /* Advance the aggregates */ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) - combine_aggregates(aggstate, (AggStatePerGroup) entry->additional); + combine_aggregates(aggstate, pergroups[0]); else - advance_aggregates(aggstate, (AggStatePerGroup) entry->additional); + advance_aggregates(aggstate, NULL, pergroups); - /* Reset per-input-tuple context after each tuple */ - ResetExprContext(tmpcontext); + /* + * Reset per-input-tuple context after each tuple, but note that the + * hash lookups do this too + */ + ResetExprContext(aggstate->tmpcontext); } aggstate->table_filled = true; - /* Initialize to walk the hash table */ - ResetTupleHashIterator(aggstate->hashtable, &aggstate->hashiter); + /* Initialize to walk the first hash table */ + select_current_set(aggstate, 0, true); + ResetTupleHashIterator(aggstate->perhash[0].hashtable, + &aggstate->perhash[0].hashiter); } /* - * ExecAgg for hashed case: phase 2, retrieving groups from hash table + * ExecAgg for hashed case: retrieving groups from hash table */ static TupleTableSlot * agg_retrieve_hash_table(AggState *aggstate) @@ -2305,17 +2535,22 @@ agg_retrieve_hash_table(AggState *aggstate) TupleHashEntryData *entry; TupleTableSlot *firstSlot; TupleTableSlot *result; - TupleTableSlot *hashslot; + AggStatePerHash perhash; /* - * get state info from node + * get state info from node. + * + * econtext is the per-output-tuple expression context. */ - /* econtext is the per-output-tuple expression context */ econtext = aggstate->ss.ps.ps_ExprContext; peragg = aggstate->peragg; firstSlot = aggstate->ss.ss_ScanTupleSlot; - hashslot = aggstate->hashslot; + /* + * Note that perhash (and therefore anything accessed through it) can + * change inside the loop, as we change between grouping sets. + */ + perhash = &aggstate->perhash[aggstate->current_set]; /* * We loop retrieving groups until we find one satisfying @@ -2323,17 +2558,37 @@ agg_retrieve_hash_table(AggState *aggstate) */ while (!aggstate->agg_done) { - int i; + TupleTableSlot *hashslot = perhash->hashslot; + int i; /* * Find the next entry in the hash table */ - entry = ScanTupleHashTable(aggstate->hashtable, &aggstate->hashiter); + entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter); if (entry == NULL) { - /* No more entries in hashtable, so done */ - aggstate->agg_done = TRUE; - return NULL; + int nextset = aggstate->current_set + 1; + + if (nextset < aggstate->num_hashes) + { + /* + * Switch to next grouping set, reinitialize, and restart the + * loop. + */ + select_current_set(aggstate, nextset, true); + + perhash = &aggstate->perhash[aggstate->current_set]; + + ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter); + + continue; + } + else + { + /* No more hashtables, so done */ + aggstate->agg_done = TRUE; + return NULL; + } } /* @@ -2356,9 +2611,9 @@ agg_retrieve_hash_table(AggState *aggstate) memset(firstSlot->tts_isnull, true, firstSlot->tts_tupleDescriptor->natts * sizeof(bool)); - for (i = 0; i < aggstate->numhashGrpCols; i++) + for (i = 0; i < perhash->numhashGrpCols; i++) { - int varNumber = aggstate->hashGrpColIdxInput[i] - 1; + int varNumber = perhash->hashGrpColIdxInput[i] - 1; firstSlot->tts_values[varNumber] = hashslot->tts_values[i]; firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i]; @@ -2367,14 +2622,18 @@ agg_retrieve_hash_table(AggState *aggstate) pergroup = (AggStatePerGroup) entry->additional; - finalize_aggregates(aggstate, peragg, pergroup, 0); - /* * Use the representative input tuple for any references to * non-aggregated input columns in the qual and tlist. */ econtext->ecxt_outertuple = firstSlot; + prepare_projection_slot(aggstate, + econtext->ecxt_outertuple, + aggstate->current_set); + + finalize_aggregates(aggstate, peragg, pergroup); + result = project_aggregates(aggstate); if (result) return result; @@ -2388,7 +2647,8 @@ agg_retrieve_hash_table(AggState *aggstate) * ExecInitAgg * * Creates the run-time information for the agg node produced by the - * planner and initializes its outer subtree + * planner and initializes its outer subtree. + * * ----------------- */ AggState * @@ -2403,14 +2663,18 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) transno, aggno; int phase; + int phaseidx; List *combined_inputeval; ListCell *l; Bitmapset *all_grouped_cols = NULL; int numGroupingSets = 1; int numPhases; + int numHashes; int column_offset; int i = 0; int j = 0; + bool use_hashing = (node->aggstrategy == AGG_HASHED || + node->aggstrategy == AGG_MIXED); /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -2425,9 +2689,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->aggs = NIL; aggstate->numaggs = 0; aggstate->numtrans = 0; + aggstate->aggstrategy = node->aggstrategy; aggstate->aggsplit = node->aggsplit; aggstate->maxsets = 0; - aggstate->hashfunctions = NULL; aggstate->projected_set = -1; aggstate->current_set = 0; aggstate->peragg = NULL; @@ -2437,18 +2701,22 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->agg_done = false; aggstate->pergroup = NULL; aggstate->grp_firstTuple = NULL; - aggstate->hashtable = NULL; aggstate->sort_in = NULL; aggstate->sort_out = NULL; /* + * phases[0] always exists, but is dummy in sorted/plain mode + */ + numPhases = (use_hashing ? 1 : 2); + numHashes = (use_hashing ? 1 : 0); + + /* * Calculate the maximum number of grouping sets in any phase; this - * determines the size of some allocations. + * determines the size of some allocations. Also calculate the number of + * phases, since all hashed/mixed nodes contribute to only a single phase. */ if (node->groupingSets) { - Assert(node->aggstrategy != AGG_HASHED); - numGroupingSets = list_length(node->groupingSets); foreach(l, node->chain) @@ -2457,22 +2725,32 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) numGroupingSets = Max(numGroupingSets, list_length(agg->groupingSets)); + + /* + * additional AGG_HASHED aggs become part of phase 0, but all + * others add an extra phase. + */ + if (agg->aggstrategy != AGG_HASHED) + ++numPhases; + else + ++numHashes; } } aggstate->maxsets = numGroupingSets; - aggstate->numphases = numPhases = 1 + list_length(node->chain); + aggstate->numphases = numPhases; aggstate->aggcontexts = (ExprContext **) palloc0(sizeof(ExprContext *) * numGroupingSets); /* * Create expression contexts. We need three or more, one for - * per-input-tuple processing, one for per-output-tuple processing, and - * one for each grouping set. The per-tuple memory context of the - * per-grouping-set ExprContexts (aggcontexts) replaces the standalone - * memory context formerly used to hold transition values. We cheat a - * little by using ExecAssignExprContext() to build all of them. + * per-input-tuple processing, one for per-output-tuple processing, one + * for all the hashtables, and one for each grouping set. The per-tuple + * memory context of the per-grouping-set ExprContexts (aggcontexts) + * replaces the standalone memory context formerly used to hold transition + * values. We cheat a little by using ExecAssignExprContext() to build + * all of them. * * NOTE: the details of what is stored in aggcontexts and what is stored * in the regular per-query memory context are driven by a simple @@ -2488,14 +2766,21 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext; } + if (use_hashing) + { + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext; + } + ExecAssignExprContext(estate, &aggstate->ss.ps); /* - * tuple table initialization + * tuple table initialization. + * + * For hashtables, we create some additional slots below. */ ExecInitScanTupleSlot(estate, &aggstate->ss); ExecInitResultTupleSlot(estate, &aggstate->ss.ps); - aggstate->hashslot = ExecInitExtraTupleSlot(estate); aggstate->sort_slot = ExecInitExtraTupleSlot(estate); /* @@ -2559,19 +2844,26 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * For each phase, prepare grouping set data and fmgr lookup data for * compare functions. Accumulate all_grouped_cols in passing. */ - aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData)); - for (phase = 0; phase < numPhases; ++phase) + aggstate->num_hashes = numHashes; + if (numHashes) + { + aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes); + aggstate->phases[0].numsets = 0; + aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int)); + aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *)); + } + + phase = 0; + for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx) { - AggStatePerPhase phasedata = &aggstate->phases[phase]; Agg *aggnode; Sort *sortnode; - int num_sets; - if (phase > 0) + if (phaseidx > 0) { - aggnode = castNode(Agg, list_nth(node->chain, phase - 1)); + aggnode = castNode(Agg, list_nth(node->chain, phaseidx - 1)); sortnode = castNode(Sort, aggnode->plan.lefttree); } else @@ -2580,53 +2872,91 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) sortnode = NULL; } - phasedata->numsets = num_sets = list_length(aggnode->groupingSets); + Assert(phase <= 1 || sortnode); - if (num_sets) + if (aggnode->aggstrategy == AGG_HASHED + || aggnode->aggstrategy == AGG_MIXED) { - phasedata->gset_lengths = palloc(num_sets * sizeof(int)); - phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *)); + AggStatePerPhase phasedata = &aggstate->phases[0]; + AggStatePerHash perhash; + Bitmapset *cols = NULL; - i = 0; - foreach(l, aggnode->groupingSets) - { - int current_length = list_length(lfirst(l)); - Bitmapset *cols = NULL; + Assert(phase == 0); + i = phasedata->numsets++; + perhash = &aggstate->perhash[i]; - /* planner forces this to be correct */ - for (j = 0; j < current_length; ++j) - cols = bms_add_member(cols, aggnode->grpColIdx[j]); + /* phase 0 always points to the "real" Agg in the hash case */ + phasedata->aggnode = node; + phasedata->aggstrategy = node->aggstrategy; - phasedata->grouped_cols[i] = cols; - phasedata->gset_lengths[i] = current_length; - ++i; - } + /* but the actual Agg node representing this hash is saved here */ + perhash->aggnode = aggnode; + + phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols; + + for (j = 0; j < aggnode->numCols; ++j) + cols = bms_add_member(cols, aggnode->grpColIdx[j]); + + phasedata->grouped_cols[i] = cols; - all_grouped_cols = bms_add_members(all_grouped_cols, - phasedata->grouped_cols[0]); + all_grouped_cols = bms_add_members(all_grouped_cols, cols); + continue; } else { - Assert(phase == 0); + AggStatePerPhase phasedata = &aggstate->phases[++phase]; + int num_sets; - phasedata->gset_lengths = NULL; - phasedata->grouped_cols = NULL; - } + phasedata->numsets = num_sets = list_length(aggnode->groupingSets); - /* - * If we are grouping, precompute fmgr lookup data for inner loop. - */ - if (aggnode->aggstrategy == AGG_SORTED) - { - Assert(aggnode->numCols > 0); + if (num_sets) + { + phasedata->gset_lengths = palloc(num_sets * sizeof(int)); + phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *)); - phasedata->eqfunctions = - execTuplesMatchPrepare(aggnode->numCols, - aggnode->grpOperators); - } + i = 0; + foreach(l, aggnode->groupingSets) + { + int current_length = list_length(lfirst(l)); + Bitmapset *cols = NULL; + + /* planner forces this to be correct */ + for (j = 0; j < current_length; ++j) + cols = bms_add_member(cols, aggnode->grpColIdx[j]); + + phasedata->grouped_cols[i] = cols; + phasedata->gset_lengths[i] = current_length; + + ++i; + } + + all_grouped_cols = bms_add_members(all_grouped_cols, + phasedata->grouped_cols[0]); + } + else + { + Assert(phaseidx == 0); + + phasedata->gset_lengths = NULL; + phasedata->grouped_cols = NULL; + } + + /* + * If we are grouping, precompute fmgr lookup data for inner loop. + */ + if (aggnode->aggstrategy == AGG_SORTED) + { + Assert(aggnode->numCols > 0); + + phasedata->eqfunctions = + execTuplesMatchPrepare(aggnode->numCols, + aggnode->grpOperators); + } - phasedata->aggnode = aggnode; - phasedata->sortnode = sortnode; + phasedata->aggnode = aggnode; + phasedata->aggstrategy = aggnode->aggstrategy; + phasedata->sortnode = sortnode; + } } /* @@ -2637,13 +2967,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols); /* - * Initialize current phase-dependent values to initial phase - */ - - aggstate->current_phase = 0; - initialize_phase(aggstate, 0); - - /* * Set up aggregate-result storage in the output expr context, and also * allocate my private per-agg working storage */ @@ -2657,23 +2980,30 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->peragg = peraggs; aggstate->pertrans = pertransstates; - /* * Hashing can only appear in the initial phase. */ - if (node->aggstrategy == AGG_HASHED) + if (use_hashing) { - find_hash_columns(aggstate); + for (i = 0; i < numHashes; ++i) + { + aggstate->perhash[i].hashslot = ExecInitExtraTupleSlot(estate); + + execTuplesHashPrepare(aggstate->perhash[i].numCols, + aggstate->perhash[i].aggnode->grpOperators, + &aggstate->perhash[i].eqfunctions, + &aggstate->perhash[i].hashfunctions); + } - execTuplesHashPrepare(node->numCols, - node->grpOperators, - &aggstate->phases[0].eqfunctions, - &aggstate->hashfunctions); + /* this is an array of pointers, not structures */ + aggstate->hash_pergroup = palloc0(sizeof(AggStatePerGroup) * numHashes); + find_hash_columns(aggstate); build_hash_table(aggstate); aggstate->table_filled = false; } - else + + if (node->aggstrategy != AGG_HASHED) { AggStatePerGroup pergroup; @@ -2684,6 +3014,25 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->pergroup = pergroup; } + /* + * Initialize current phase-dependent values to initial phase. The initial + * phase is 1 (first sort pass) for all strategies that use sorting (if + * hashing is being done too, then phase 0 is processed last); but if only + * hashing is being done, then phase 0 is all there is. + */ + if (node->aggstrategy == AGG_HASHED) + { + aggstate->current_phase = 0; + initialize_phase(aggstate, 0); + select_current_set(aggstate, 0, true); + } + else + { + aggstate->current_phase = 1; + initialize_phase(aggstate, 1); + select_current_set(aggstate, 0, false); + } + /* ----------------- * Perform lookups of aggregate function info, and initialize the * unchanging fields of the per-agg and per-trans data. @@ -3261,7 +3610,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans, * We don't implement DISTINCT or ORDER BY aggs in the HASHED case * (yet) */ - Assert(((Agg *) aggstate->ss.ps.plan)->aggstrategy != AGG_HASHED); + Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED); /* If we have only one input, we need its len/byval info. */ if (numInputs == 1) @@ -3510,6 +3859,8 @@ ExecEndAgg(AggState *node) /* And ensure any agg shutdown callbacks have been called */ for (setno = 0; setno < numGroupingSets; setno++) ReScanExprContext(node->aggcontexts[setno]); + if (node->hashcontext) + ReScanExprContext(node->hashcontext); /* * We don't actually free any ExprContexts here (see comment in @@ -3537,7 +3888,7 @@ ExecReScanAgg(AggState *node) node->agg_done = false; - if (aggnode->aggstrategy == AGG_HASHED) + if (node->aggstrategy == AGG_HASHED) { /* * In the hashed case, if we haven't yet built the hash table then we @@ -3557,7 +3908,9 @@ ExecReScanAgg(AggState *node) if (outerPlan->chgParam == NULL && !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams)) { - ResetTupleHashIterator(node->hashtable, &node->hashiter); + ResetTupleHashIterator(node->perhash[0].hashtable, + &node->perhash[0].hashiter); + select_current_set(node, 0, true); return; } } @@ -3582,11 +3935,7 @@ ExecReScanAgg(AggState *node) * ExecReScan already did it. But we do need to reset our per-grouping-set * contexts, which may have transvalues stored in them. (We use rescan * rather than just reset because transfns may have registered callbacks - * that need to be run now.) - * - * Note that with AGG_HASHED, the hash table is allocated in a sub-context - * of the aggcontext. This used to be an issue, but now, resetting a - * context automatically deletes sub-contexts too. + * that need to be run now.) For the AGG_HASHED case, see below. */ for (setno = 0; setno < numGroupingSets; setno++) @@ -3606,13 +3955,21 @@ ExecReScanAgg(AggState *node) MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs); MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs); - if (aggnode->aggstrategy == AGG_HASHED) + /* + * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of + * the hashcontext. This used to be an issue, but now, resetting a context + * automatically deletes sub-contexts too. + */ + if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED) { + ReScanExprContext(node->hashcontext); /* Rebuild an empty hash table */ build_hash_table(node); node->table_filled = false; + /* iterator will be reset when the table is filled */ } - else + + if (node->aggstrategy != AGG_HASHED) { /* * Reset the per-group state (in particular, mark transvalues null) @@ -3620,8 +3977,8 @@ ExecReScanAgg(AggState *node) MemSet(node->pergroup, 0, sizeof(AggStatePerGroupData) * node->numaggs * numGroupingSets); - /* reset to phase 0 */ - initialize_phase(node, 0); + /* reset to phase 1 */ + initialize_phase(node, 1); node->input_done = false; node->projected_set = -1; @@ -3662,7 +4019,7 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) if (aggcontext) { AggState *aggstate = ((AggState *) fcinfo->context); - ExprContext *cxt = aggstate->aggcontexts[aggstate->current_set]; + ExprContext *cxt = aggstate->curaggcontext; *aggcontext = cxt->ecxt_per_tuple_memory; } @@ -3751,7 +4108,7 @@ AggRegisterCallback(FunctionCallInfo fcinfo, if (fcinfo->context && IsA(fcinfo->context, AggState)) { AggState *aggstate = (AggState *) fcinfo->context; - ExprContext *cxt = aggstate->aggcontexts[aggstate->current_set]; + ExprContext *cxt = aggstate->curaggcontext; RegisterExprContextCallback(cxt, func, arg); |