diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2020-11-24 10:45:00 +0200 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2020-11-24 10:45:00 +0200 |
commit | 0a2bc5d61e713e3fe72438f020eea5fcc90b0f0b (patch) | |
tree | 8f630f828fa6bbfb5984f037118211cd68d8a50e /src/backend/executor | |
parent | e522024bd8dd28a0f13dcccfd39170698f45c939 (diff) | |
download | postgresql-0a2bc5d61e713e3fe72438f020eea5fcc90b0f0b.tar.gz postgresql-0a2bc5d61e713e3fe72438f020eea5fcc90b0f0b.zip |
Move per-agg and per-trans duplicate finding to the planner.
This has the advantage that the cost estimates for aggregates can count
the number of calls to transition and final functions correctly.
Bump catalog version, because views can contain Aggrefs.
Reviewed-by: Andres Freund
Discussion: https://www.postgresql.org/message-id/b2e3536b-1dbc-8303-c97e-89cb0b4a9a48%40iki.fi
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execExpr.c | 10 | ||||
-rw-r--r-- | src/backend/executor/execExprInterp.c | 6 | ||||
-rw-r--r-- | src/backend/executor/nodeAgg.c | 364 |
3 files changed, 83 insertions, 297 deletions
diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index d76836c09b1..79b325c7cfb 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -99,8 +99,7 @@ static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate, * the same as the per-query context of the associated ExprContext. * * Any Aggref, WindowFunc, or SubPlan nodes found in the tree are added to - * the lists of such nodes held by the parent PlanState (or more accurately, - * the AggrefExprState etc. nodes created for them are added). + * the lists of such nodes held by the parent PlanState. * * Note: there is no ExecEndExpr function; we assume that any resource * cleanup needed will be handled by just releasing the memory context @@ -779,18 +778,15 @@ ExecInitExprRec(Expr *node, ExprState *state, case T_Aggref: { Aggref *aggref = (Aggref *) node; - AggrefExprState *astate = makeNode(AggrefExprState); scratch.opcode = EEOP_AGGREF; - scratch.d.aggref.astate = astate; - astate->aggref = aggref; + scratch.d.aggref.aggno = aggref->aggno; if (state->parent && IsA(state->parent, AggState)) { AggState *aggstate = (AggState *) state->parent; - aggstate->aggs = lappend(aggstate->aggs, astate); - aggstate->numaggs++; + aggstate->aggs = lappend(aggstate->aggs, aggref); } else { diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index 26c2b496321..c09371ad58f 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -1494,12 +1494,12 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) * Returns a Datum whose value is the precomputed aggregate value * found in the given expression context. */ - AggrefExprState *aggref = op->d.aggref.astate; + int aggno = op->d.aggref.aggno; Assert(econtext->ecxt_aggvalues != NULL); - *op->resvalue = econtext->ecxt_aggvalues[aggref->aggno]; - *op->resnull = econtext->ecxt_aggnulls[aggref->aggno]; + *op->resvalue = econtext->ecxt_aggvalues[aggno]; + *op->resnull = econtext->ecxt_aggnulls[aggno]; EEO_NEXT(); } diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 5bf9e99bbc8..dc640feb631 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -465,14 +465,6 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, Oid *inputTypes, int numArguments); -static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, - int lastaggno, List **same_input_transnos); -static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, - bool shareable, - Oid aggtransfn, Oid aggtranstype, - Oid aggserialfn, Oid aggdeserialfn, - Datum initValue, bool initValueIsNull, - List *transnos); /* @@ -3244,9 +3236,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) Plan *outerPlan; ExprContext *econtext; TupleDesc scanDesc; - int numaggs, - transno, - aggno; + int max_aggno; + int max_transno; + int numaggrefs; + int numaggs; + int numtrans; int phase; int phaseidx; ListCell *l; @@ -3422,9 +3416,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * semantics, and it's forbidden by the spec. Because it is true, we * don't need to worry about evaluating the aggs in any particular order. * - * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState - * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs - * in the targetlist are found during ExecAssignProjectionInfo, below. + * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs. + * Aggrefs in the qual are found here; Aggrefs in the targetlist are found + * during ExecAssignProjectionInfo, above. */ aggstate->ss.ps.qual = ExecInitQual(node->plan.qual, (PlanState *) aggstate); @@ -3432,8 +3426,18 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* * We should now have found all Aggrefs in the targetlist and quals. */ - numaggs = aggstate->numaggs; - Assert(numaggs == list_length(aggstate->aggs)); + numaggrefs = list_length(aggstate->aggs); + max_aggno = -1; + max_transno = -1; + foreach(l, aggstate->aggs) + { + Aggref *aggref = (Aggref *) lfirst(l); + + max_aggno = Max(max_aggno, aggref->aggno); + max_transno = Max(max_transno, aggref->aggtransno); + } + numaggs = max_aggno + 1; + numtrans = max_transno + 1; /* * For each phase, prepare grouping set data and fmgr lookup data for @@ -3604,7 +3608,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); - pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs); + pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans); aggstate->peragg = peraggs; aggstate->pertrans = pertransstates; @@ -3695,92 +3699,41 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) select_current_set(aggstate, 0, false); } - /* ----------------- + /* * Perform lookups of aggregate function info, and initialize the * unchanging fields of the per-agg and per-trans data. - * - * We try to optimize by detecting duplicate aggregate functions so that - * their state and final values are re-used, rather than needlessly being - * re-calculated independently. We also detect aggregates that are not - * the same, but which can share the same transition state. - * - * Scenarios: - * - * 1. Identical aggregate function calls appear in the query: - * - * SELECT SUM(x) FROM ... HAVING SUM(x) > 0 - * - * Since these aggregates are identical, we only need to calculate - * the value once. Both aggregates will share the same 'aggno' value. - * - * 2. Two different aggregate functions appear in the query, but the - * aggregates have the same arguments, transition functions and - * initial values (and, presumably, different final functions): - * - * SELECT AVG(x), STDDEV(x) FROM ... - * - * In this case we must create a new peragg for the varying aggregate, - * and we need to call the final functions separately, but we need - * only run the transition function once. (This requires that the - * final functions be nondestructive of the transition state, but - * that's required anyway for other reasons.) - * - * For either of these optimizations to be valid, all aggregate properties - * used in the transition phase must be the same, including any modifiers - * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't - * contain any volatile functions. - * ----------------- */ - aggno = -1; - transno = -1; foreach(l, aggstate->aggs) { - AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l); - Aggref *aggref = aggrefstate->aggref; + Aggref *aggref = lfirst(l); AggStatePerAgg peragg; AggStatePerTrans pertrans; - int existing_aggno; - int existing_transno; - List *same_input_transnos; Oid inputTypes[FUNC_MAX_ARGS]; int numArguments; int numDirectArgs; HeapTuple aggTuple; Form_pg_aggregate aggform; AclResult aclresult; - Oid transfn_oid, - finalfn_oid; - bool shareable; + Oid finalfn_oid; Oid serialfn_oid, deserialfn_oid; + Oid aggOwner; Expr *finalfnexpr; Oid aggtranstype; - Datum textInitVal; - Datum initValue; - bool initValueIsNull; /* Planner should have assigned aggregate to correct level */ Assert(aggref->agglevelsup == 0); /* ... and the split mode should match */ Assert(aggref->aggsplit == aggstate->aggsplit); - /* 1. Check for already processed aggs which can be re-used */ - existing_aggno = find_compatible_peragg(aggref, aggstate, aggno, - &same_input_transnos); - if (existing_aggno != -1) - { - /* - * Existing compatible agg found. so just point the Aggref to the - * same per-agg struct. - */ - aggrefstate->aggno = existing_aggno; + peragg = &peraggs[aggref->aggno]; + + /* Check if we initialized the state for this aggregate already. */ + if (peragg->aggref != NULL) continue; - } - /* Mark Aggref state node with assigned index in the result array */ - peragg = &peraggs[++aggno]; peragg->aggref = aggref; - aggrefstate->aggno = aggno; + peragg->transno = aggref->aggtransno; /* Fetch the pg_aggregate row */ aggTuple = SearchSysCache1(AGGFNOID, @@ -3802,36 +3755,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggtranstype = aggref->aggtranstype; Assert(OidIsValid(aggtranstype)); - /* - * If this aggregation is performing state combines, then instead of - * using the transition function, we'll use the combine function - */ - if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) - { - transfn_oid = aggform->aggcombinefn; - - /* If not set then the planner messed up */ - if (!OidIsValid(transfn_oid)) - elog(ERROR, "combinefn not set for aggregate function"); - } - else - transfn_oid = aggform->aggtransfn; - /* Final function only required if we're finalizing the aggregates */ if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) peragg->finalfn_oid = finalfn_oid = InvalidOid; else peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - /* - * If finalfn is marked read-write, we can't share transition states; - * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also, - * if we're not executing the finalfn here, we can share regardless. - */ - shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) || - (finalfn_oid == InvalidOid); - peragg->shareable = shareable; - serialfn_oid = InvalidOid; deserialfn_oid = InvalidOid; @@ -3871,7 +3800,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; - Oid aggOwner; procTuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(aggref->aggfnoid)); @@ -3881,12 +3809,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner; ReleaseSysCache(procTuple); - aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, - ACL_EXECUTE); - if (aclresult != ACLCHECK_OK) - aclcheck_error(aclresult, OBJECT_FUNCTION, - get_func_name(transfn_oid)); - InvokeFunctionExecuteHook(transfn_oid); if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, @@ -3959,51 +3881,60 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) &peragg->resulttypeByVal); /* - * initval is potentially null, so don't try to access it as a struct - * field. Must do it the hard way with SysCacheGetAttr. + * Build working state for invoking the transition function, if we + * haven't done it already. */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, - Anum_pg_aggregate_agginitval, - &initValueIsNull); - if (initValueIsNull) - initValue = (Datum) 0; - else - initValue = GetAggInitVal(textInitVal, aggtranstype); - - /* - * 2. Build working state for invoking the transition function, or - * look up previously initialized working state, if we can share it. - * - * find_compatible_peragg() already collected a list of shareable - * per-Trans's with the same inputs. Check if any of them have the - * same transition function and initial value. - */ - existing_transno = find_compatible_pertrans(aggstate, aggref, - shareable, - transfn_oid, aggtranstype, - serialfn_oid, deserialfn_oid, - initValue, initValueIsNull, - same_input_transnos); - if (existing_transno != -1) + pertrans = &pertransstates[aggref->aggtransno]; + if (pertrans->aggref == NULL) { + Datum textInitVal; + Datum initValue; + bool initValueIsNull; + Oid transfn_oid; + /* - * Existing compatible trans found, so just point the 'peragg' to - * the same per-trans struct, and mark the trans state as shared. + * If this aggregation is performing state combines, then instead + * of using the transition function, we'll use the combine + * function */ - pertrans = &pertransstates[existing_transno]; - pertrans->aggshared = true; - peragg->transno = existing_transno; - } - else - { - pertrans = &pertransstates[++transno]; + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + { + transfn_oid = aggform->aggcombinefn; + + /* If not set then the planner messed up */ + if (!OidIsValid(transfn_oid)) + elog(ERROR, "combinefn not set for aggregate function"); + } + else + transfn_oid = aggform->aggtransfn; + + aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(transfn_oid)); + InvokeFunctionExecuteHook(transfn_oid); + + /* + * initval is potentially null, so don't try to access it as a + * struct field. Must do it the hard way with SysCacheGetAttr. + */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, + Anum_pg_aggregate_agginitval, + &initValueIsNull); + if (initValueIsNull) + initValue = (Datum) 0; + else + initValue = GetAggInitVal(textInitVal, aggtranstype); + build_pertrans_for_aggref(pertrans, aggstate, estate, aggref, transfn_oid, aggtranstype, serialfn_oid, deserialfn_oid, initValue, initValueIsNull, inputTypes, numArguments); - peragg->transno = transno; } + else + pertrans->aggshared = true; ReleaseSysCache(aggTuple); } @@ -4011,8 +3942,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * Update aggstate->numaggs to be the number of unique aggregates found. * Also set numstates to the number of unique transition states found. */ - aggstate->numaggs = aggno + 1; - aggstate->numtrans = transno + 1; + aggstate->numaggs = numaggs; + aggstate->numtrans = numtrans; /* * Last, check whether any more aggregates got added onto the node while @@ -4024,7 +3955,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * need to work hard on a helpful error message; but we defend against it * here anyway, just to be sure.) */ - if (numaggs != list_length(aggstate->aggs)) + if (numaggrefs != list_length(aggstate->aggs)) ereport(ERROR, (errcode(ERRCODE_GROUPING_ERROR), errmsg("aggregate function calls cannot be nested"))); @@ -4420,147 +4351,6 @@ GetAggInitVal(Datum textInitVal, Oid transtype) return initVal; } -/* - * find_compatible_peragg - search for a previously initialized per-Agg struct - * - * Searches the previously looked at aggregates to find one which is compatible - * with this one, with the same input parameters. If no compatible aggregate - * can be found, returns -1. - * - * As a side-effect, this also collects a list of existing, shareable per-Trans - * structs with matching inputs. If no identical Aggref is found, the list is - * passed later to find_compatible_pertrans, to see if we can at least reuse - * the state value of another aggregate. - */ -static int -find_compatible_peragg(Aggref *newagg, AggState *aggstate, - int lastaggno, List **same_input_transnos) -{ - int aggno; - AggStatePerAgg peraggs; - - *same_input_transnos = NIL; - - /* we mustn't reuse the aggref if it contains volatile function calls */ - if (contain_volatile_functions((Node *) newagg)) - return -1; - - peraggs = aggstate->peragg; - - /* - * Search through the list of already seen aggregates. If we find an - * existing identical aggregate call, then we can re-use that one. While - * searching, we'll also collect a list of Aggrefs with the same input - * parameters. If no matching Aggref is found, the caller can potentially - * still re-use the transition state of one of them. (At this stage we - * just compare the parsetrees; whether different aggregates share the - * same transition function will be checked later.) - */ - for (aggno = 0; aggno <= lastaggno; aggno++) - { - AggStatePerAgg peragg; - Aggref *existingRef; - - peragg = &peraggs[aggno]; - existingRef = peragg->aggref; - - /* all of the following must be the same or it's no match */ - if (newagg->inputcollid != existingRef->inputcollid || - newagg->aggtranstype != existingRef->aggtranstype || - newagg->aggstar != existingRef->aggstar || - newagg->aggvariadic != existingRef->aggvariadic || - newagg->aggkind != existingRef->aggkind || - !equal(newagg->args, existingRef->args) || - !equal(newagg->aggorder, existingRef->aggorder) || - !equal(newagg->aggdistinct, existingRef->aggdistinct) || - !equal(newagg->aggfilter, existingRef->aggfilter)) - continue; - - /* if it's the same aggregate function then report exact match */ - if (newagg->aggfnoid == existingRef->aggfnoid && - newagg->aggtype == existingRef->aggtype && - newagg->aggcollid == existingRef->aggcollid && - equal(newagg->aggdirectargs, existingRef->aggdirectargs)) - { - list_free(*same_input_transnos); - *same_input_transnos = NIL; - return aggno; - } - - /* - * Not identical, but it had the same inputs. If the final function - * permits sharing, return its transno to the caller, in case we can - * re-use its per-trans state. (If there's already sharing going on, - * we might report a transno more than once. find_compatible_pertrans - * is cheap enough that it's not worth spending cycles to avoid that.) - */ - if (peragg->shareable) - *same_input_transnos = lappend_int(*same_input_transnos, - peragg->transno); - } - - return -1; -} - -/* - * find_compatible_pertrans - search for a previously initialized per-Trans - * struct - * - * Searches the list of transnos for a per-Trans struct with the same - * transition function and initial condition. (The inputs have already been - * verified to match.) - */ -static int -find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable, - Oid aggtransfn, Oid aggtranstype, - Oid aggserialfn, Oid aggdeserialfn, - Datum initValue, bool initValueIsNull, - List *transnos) -{ - ListCell *lc; - - /* If this aggregate can't share transition states, give up */ - if (!shareable) - return -1; - - foreach(lc, transnos) - { - int transno = lfirst_int(lc); - AggStatePerTrans pertrans = &aggstate->pertrans[transno]; - - /* - * if the transfns or transition state types are not the same then the - * state can't be shared. - */ - if (aggtransfn != pertrans->transfn_oid || - aggtranstype != pertrans->aggtranstype) - continue; - - /* - * The serialization and deserialization functions must match, if - * present, as we're unable to share the trans state for aggregates - * which will serialize or deserialize into different formats. - * Remember that these will be InvalidOid if they're not required for - * this agg node. - */ - if (aggserialfn != pertrans->serialfn_oid || - aggdeserialfn != pertrans->deserialfn_oid) - continue; - - /* - * Check that the initial condition matches, too. - */ - if (initValueIsNull && pertrans->initValueIsNull) - return transno; - - if (!initValueIsNull && !pertrans->initValueIsNull && - datumIsEqual(initValue, pertrans->initValue, - pertrans->transtypeByVal, pertrans->transtypeLen)) - return transno; - } - return -1; -} - void ExecEndAgg(AggState *node) { |