diff options
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r-- | src/backend/executor/nodeAgg.c | 364 |
1 files changed, 77 insertions, 287 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 5bf9e99bbc8..dc640feb631 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -465,14 +465,6 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, Oid *inputTypes, int numArguments); -static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, - int lastaggno, List **same_input_transnos); -static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, - bool shareable, - Oid aggtransfn, Oid aggtranstype, - Oid aggserialfn, Oid aggdeserialfn, - Datum initValue, bool initValueIsNull, - List *transnos); /* @@ -3244,9 +3236,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) Plan *outerPlan; ExprContext *econtext; TupleDesc scanDesc; - int numaggs, - transno, - aggno; + int max_aggno; + int max_transno; + int numaggrefs; + int numaggs; + int numtrans; int phase; int phaseidx; ListCell *l; @@ -3422,9 +3416,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * semantics, and it's forbidden by the spec. Because it is true, we * don't need to worry about evaluating the aggs in any particular order. * - * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState - * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs - * in the targetlist are found during ExecAssignProjectionInfo, below. + * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs. + * Aggrefs in the qual are found here; Aggrefs in the targetlist are found + * during ExecAssignProjectionInfo, above. */ aggstate->ss.ps.qual = ExecInitQual(node->plan.qual, (PlanState *) aggstate); @@ -3432,8 +3426,18 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* * We should now have found all Aggrefs in the targetlist and quals. */ - numaggs = aggstate->numaggs; - Assert(numaggs == list_length(aggstate->aggs)); + numaggrefs = list_length(aggstate->aggs); + max_aggno = -1; + max_transno = -1; + foreach(l, aggstate->aggs) + { + Aggref *aggref = (Aggref *) lfirst(l); + + max_aggno = Max(max_aggno, aggref->aggno); + max_transno = Max(max_transno, aggref->aggtransno); + } + numaggs = max_aggno + 1; + numtrans = max_transno + 1; /* * For each phase, prepare grouping set data and fmgr lookup data for @@ -3604,7 +3608,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); - pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs); + pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans); aggstate->peragg = peraggs; aggstate->pertrans = pertransstates; @@ -3695,92 +3699,41 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) select_current_set(aggstate, 0, false); } - /* ----------------- + /* * Perform lookups of aggregate function info, and initialize the * unchanging fields of the per-agg and per-trans data. - * - * We try to optimize by detecting duplicate aggregate functions so that - * their state and final values are re-used, rather than needlessly being - * re-calculated independently. We also detect aggregates that are not - * the same, but which can share the same transition state. - * - * Scenarios: - * - * 1. Identical aggregate function calls appear in the query: - * - * SELECT SUM(x) FROM ... HAVING SUM(x) > 0 - * - * Since these aggregates are identical, we only need to calculate - * the value once. Both aggregates will share the same 'aggno' value. - * - * 2. Two different aggregate functions appear in the query, but the - * aggregates have the same arguments, transition functions and - * initial values (and, presumably, different final functions): - * - * SELECT AVG(x), STDDEV(x) FROM ... - * - * In this case we must create a new peragg for the varying aggregate, - * and we need to call the final functions separately, but we need - * only run the transition function once. (This requires that the - * final functions be nondestructive of the transition state, but - * that's required anyway for other reasons.) - * - * For either of these optimizations to be valid, all aggregate properties - * used in the transition phase must be the same, including any modifiers - * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't - * contain any volatile functions. - * ----------------- */ - aggno = -1; - transno = -1; foreach(l, aggstate->aggs) { - AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l); - Aggref *aggref = aggrefstate->aggref; + Aggref *aggref = lfirst(l); AggStatePerAgg peragg; AggStatePerTrans pertrans; - int existing_aggno; - int existing_transno; - List *same_input_transnos; Oid inputTypes[FUNC_MAX_ARGS]; int numArguments; int numDirectArgs; HeapTuple aggTuple; Form_pg_aggregate aggform; AclResult aclresult; - Oid transfn_oid, - finalfn_oid; - bool shareable; + Oid finalfn_oid; Oid serialfn_oid, deserialfn_oid; + Oid aggOwner; Expr *finalfnexpr; Oid aggtranstype; - Datum textInitVal; - Datum initValue; - bool initValueIsNull; /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); /* ... and the split mode should match */ Assert(aggref->aggsplit == aggstate->aggsplit); - /* 1. Check for already processed aggs which can be re-used */ - existing_aggno = find_compatible_peragg(aggref, aggstate, aggno, - &same_input_transnos); - if (existing_aggno != -1) - { - /* - * Existing compatible agg found. so just point the Aggref to the - * same per-agg struct. - */ - aggrefstate->aggno = existing_aggno; + peragg = &peraggs[aggref->aggno]; + + /* Check if we initialized the state for this aggregate already. */ + if (peragg->aggref != NULL) continue; - } - /* Mark Aggref state node with assigned index in the result array */ - peragg = &peraggs[++aggno]; peragg->aggref = aggref; - aggrefstate->aggno = aggno; + peragg->transno = aggref->aggtransno; /* Fetch the pg_aggregate row */ aggTuple = SearchSysCache1(AGGFNOID, @@ -3802,36 +3755,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggtranstype = aggref->aggtranstype; Assert(OidIsValid(aggtranstype)); - /* - * If this aggregation is performing state combines, then instead of - * using the transition function, we'll use the combine function - */ - if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) - { - transfn_oid = aggform->aggcombinefn; - - /* If not set then the planner messed up */ - if (!OidIsValid(transfn_oid)) - elog(ERROR, "combinefn not set for aggregate function"); - } - else - transfn_oid = aggform->aggtransfn; - /* Final function only required if we're finalizing the aggregates */ if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) peragg->finalfn_oid = finalfn_oid = InvalidOid; else peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - /* - * If finalfn is marked read-write, we can't share transition states; - * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also, - * if we're not executing the finalfn here, we can share regardless. - */ - shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) || - (finalfn_oid == InvalidOid); - peragg->shareable = shareable; - serialfn_oid = InvalidOid; deserialfn_oid = InvalidOid; @@ -3871,7 +3800,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; - Oid aggOwner; procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); @@ -3881,12 +3809,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner; ReleaseSysCache(procTuple); - aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, - ACL_EXECUTE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, OBJECT_FUNCTION, - get_func_name(transfn_oid)); - InvokeFunctionExecuteHook(transfn_oid); if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, @@ -3959,51 +3881,60 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) &peragg->resulttypeByVal); /* - * initval is potentially null, so don't try to access it as a struct - * field. Must do it the hard way with SysCacheGetAttr. + * Build working state for invoking the transition function, if we + * haven't done it already. */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, - Anum_pg_aggregate_agginitval, - &initValueIsNull); - if (initValueIsNull) - initValue = (Datum) 0; - else - initValue = GetAggInitVal(textInitVal, aggtranstype); - - /* - * 2. Build working state for invoking the transition function, or - * look up previously initialized working state, if we can share it. - * - * find_compatible_peragg() already collected a list of shareable - * per-Trans's with the same inputs. Check if any of them have the - * same transition function and initial value. - */ - existing_transno = find_compatible_pertrans(aggstate, aggref, - shareable, - transfn_oid, aggtranstype, - serialfn_oid, deserialfn_oid, - initValue, initValueIsNull, - same_input_transnos); - if (existing_transno != -1) + pertrans = &pertransstates[aggref->aggtransno]; + if (pertrans->aggref == NULL) { + Datum textInitVal; + Datum initValue; + bool initValueIsNull; + Oid transfn_oid; + /* - * Existing compatible trans found, so just point the 'peragg' to - * the same per-trans struct, and mark the trans state as shared. + * If this aggregation is performing state combines, then instead + * of using the transition function, we'll use the combine + * function */ - pertrans = &pertransstates[existing_transno]; - pertrans->aggshared = true; - peragg->transno = existing_transno; - } - else - { - pertrans = &pertransstates[++transno]; + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + { + transfn_oid = aggform->aggcombinefn; + + /* If not set then the planner messed up */ + if (!OidIsValid(transfn_oid)) + elog(ERROR, "combinefn not set for aggregate function"); + } + else + transfn_oid = aggform->aggtransfn; + + aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(transfn_oid)); + InvokeFunctionExecuteHook(transfn_oid); + + /* + * initval is potentially null, so don't try to access it as a + * struct field. Must do it the hard way with SysCacheGetAttr. + */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, + Anum_pg_aggregate_agginitval, + &initValueIsNull); + if (initValueIsNull) + initValue = (Datum) 0; + else + initValue = GetAggInitVal(textInitVal, aggtranstype); + build_pertrans_for_aggref(pertrans, aggstate, estate, aggref, transfn_oid, aggtranstype, serialfn_oid, deserialfn_oid, initValue, initValueIsNull, inputTypes, numArguments); - peragg->transno = transno; } + else + pertrans->aggshared = true; ReleaseSysCache(aggTuple); } @@ -4011,8 +3942,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * Update aggstate->numaggs to be the number of unique aggregates found. * Also set numstates to the number of unique transition states found. */ - aggstate->numaggs = aggno + 1; - aggstate->numtrans = transno + 1; + aggstate->numaggs = numaggs; + aggstate->numtrans = numtrans; /* * Last, check whether any more aggregates got added onto the node while @@ -4024,7 +3955,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * need to work hard on a helpful error message; but we defend against it * here anyway, just to be sure.) */ - if (numaggs != list_length(aggstate->aggs)) + if (numaggrefs != list_length(aggstate->aggs)) ereport(ERROR, (errcode(ERRCODE_GROUPING_ERROR), errmsg("aggregate function calls cannot be nested"))); @@ -4420,147 +4351,6 @@ GetAggInitVal(Datum textInitVal, Oid transtype) return initVal; } -/* - * find_compatible_peragg - search for a previously initialized per-Agg struct - * - * Searches the previously looked at aggregates to find one which is compatible - * with this one, with the same input parameters. If no compatible aggregate - * can be found, returns -1. - * - * As a side-effect, this also collects a list of existing, shareable per-Trans - * structs with matching inputs. If no identical Aggref is found, the list is - * passed later to find_compatible_pertrans, to see if we can at least reuse - * the state value of another aggregate. - */ -static int -find_compatible_peragg(Aggref *newagg, AggState *aggstate, - int lastaggno, List **same_input_transnos) -{ - int aggno; - AggStatePerAgg peraggs; - - *same_input_transnos = NIL; - - /* we mustn't reuse the aggref if it contains volatile function calls */ - if (contain_volatile_functions((Node *) newagg)) - return -1; - - peraggs = aggstate->peragg; - - /* - * Search through the list of already seen aggregates. If we find an - * existing identical aggregate call, then we can re-use that one. While - * searching, we'll also collect a list of Aggrefs with the same input - * parameters. If no matching Aggref is found, the caller can potentially - * still re-use the transition state of one of them. (At this stage we - * just compare the parsetrees; whether different aggregates share the - * same transition function will be checked later.) - */ - for (aggno = 0; aggno <= lastaggno; aggno++) - { - AggStatePerAgg peragg; - Aggref *existingRef; - - peragg = &peraggs[aggno]; - existingRef = peragg->aggref; - - /* all of the following must be the same or it's no match */ - if (newagg->inputcollid != existingRef->inputcollid || - newagg->aggtranstype != existingRef->aggtranstype || - newagg->aggstar != existingRef->aggstar || - newagg->aggvariadic != existingRef->aggvariadic || - newagg->aggkind != existingRef->aggkind || - !equal(newagg->args, existingRef->args) || - !equal(newagg->aggorder, existingRef->aggorder) || - !equal(newagg->aggdistinct, existingRef->aggdistinct) || - !equal(newagg->aggfilter, existingRef->aggfilter)) - continue; - - /* if it's the same aggregate function then report exact match */ - if (newagg->aggfnoid == existingRef->aggfnoid && - newagg->aggtype == existingRef->aggtype && - newagg->aggcollid == existingRef->aggcollid && - equal(newagg->aggdirectargs, existingRef->aggdirectargs)) - { - list_free(*same_input_transnos); - *same_input_transnos = NIL; - return aggno; - } - - /* - * Not identical, but it had the same inputs. If the final function - * permits sharing, return its transno to the caller, in case we can - * re-use its per-trans state. (If there's already sharing going on, - * we might report a transno more than once. find_compatible_pertrans - * is cheap enough that it's not worth spending cycles to avoid that.) - */ - if (peragg->shareable) - *same_input_transnos = lappend_int(*same_input_transnos, - peragg->transno); - } - - return -1; -} - -/* - * find_compatible_pertrans - search for a previously initialized per-Trans - * struct - * - * Searches the list of transnos for a per-Trans struct with the same - * transition function and initial condition. (The inputs have already been - * verified to match.) - */ -static int -find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable, - Oid aggtransfn, Oid aggtranstype, - Oid aggserialfn, Oid aggdeserialfn, - Datum initValue, bool initValueIsNull, - List *transnos) -{ - ListCell *lc; - - /* If this aggregate can't share transition states, give up */ - if (!shareable) - return -1; - - foreach(lc, transnos) - { - int transno = lfirst_int(lc); - AggStatePerTrans pertrans = &aggstate->pertrans[transno]; - - /* - * if the transfns or transition state types are not the same then the - * state can't be shared. - */ - if (aggtransfn != pertrans->transfn_oid || - aggtranstype != pertrans->aggtranstype) - continue; - - /* - * The serialization and deserialization functions must match, if - * present, as we're unable to share the trans state for aggregates - * which will serialize or deserialize into different formats. - * Remember that these will be InvalidOid if they're not required for - * this agg node. - */ - if (aggserialfn != pertrans->serialfn_oid || - aggdeserialfn != pertrans->deserialfn_oid) - continue; - - /* - * Check that the initial condition matches, too. - */ - if (initValueIsNull && pertrans->initValueIsNull) - return transno; - - if (!initValueIsNull && !pertrans->initValueIsNull && - datumIsEqual(initValue, pertrans->initValue, - pertrans->transtypeByVal, pertrans->transtypeLen)) - return transno; - } - return -1; -} - void ExecEndAgg(AggState *node) { |