aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeAgg.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r--src/backend/executor/nodeAgg.c364
1 files changed, 77 insertions, 287 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 5bf9e99bbc8..dc640feb631 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -465,14 +465,6 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
Oid aggserialfn, Oid aggdeserialfn,
Datum initValue, bool initValueIsNull,
Oid *inputTypes, int numArguments);
-static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
- int lastaggno, List **same_input_transnos);
-static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
- bool shareable,
- Oid aggtransfn, Oid aggtranstype,
- Oid aggserialfn, Oid aggdeserialfn,
- Datum initValue, bool initValueIsNull,
- List *transnos);
/*
@@ -3244,9 +3236,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
Plan *outerPlan;
ExprContext *econtext;
TupleDesc scanDesc;
- int numaggs,
- transno,
- aggno;
+ int max_aggno;
+ int max_transno;
+ int numaggrefs;
+ int numaggs;
+ int numtrans;
int phase;
int phaseidx;
ListCell *l;
@@ -3422,9 +3416,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* semantics, and it's forbidden by the spec. Because it is true, we
* don't need to worry about evaluating the aggs in any particular order.
*
- * Note: execExpr.c finds Aggrefs for us, and adds their AggrefExprState
- * nodes to aggstate->aggs. Aggrefs in the qual are found here; Aggrefs
- * in the targetlist are found during ExecAssignProjectionInfo, below.
+ * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
+ * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
+ * during ExecAssignProjectionInfo, above.
*/
aggstate->ss.ps.qual =
ExecInitQual(node->plan.qual, (PlanState *) aggstate);
@@ -3432,8 +3426,18 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
/*
* We should now have found all Aggrefs in the targetlist and quals.
*/
- numaggs = aggstate->numaggs;
- Assert(numaggs == list_length(aggstate->aggs));
+ numaggrefs = list_length(aggstate->aggs);
+ max_aggno = -1;
+ max_transno = -1;
+ foreach(l, aggstate->aggs)
+ {
+ Aggref *aggref = (Aggref *) lfirst(l);
+
+ max_aggno = Max(max_aggno, aggref->aggno);
+ max_transno = Max(max_transno, aggref->aggtransno);
+ }
+ numaggs = max_aggno + 1;
+ numtrans = max_transno + 1;
/*
* For each phase, prepare grouping set data and fmgr lookup data for
@@ -3604,7 +3608,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);
peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
- pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numaggs);
+ pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans);
aggstate->peragg = peraggs;
aggstate->pertrans = pertransstates;
@@ -3695,92 +3699,41 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
select_current_set(aggstate, 0, false);
}
- /* -----------------
+ /*
* Perform lookups of aggregate function info, and initialize the
* unchanging fields of the per-agg and per-trans data.
- *
- * We try to optimize by detecting duplicate aggregate functions so that
- * their state and final values are re-used, rather than needlessly being
- * re-calculated independently. We also detect aggregates that are not
- * the same, but which can share the same transition state.
- *
- * Scenarios:
- *
- * 1. Identical aggregate function calls appear in the query:
- *
- * SELECT SUM(x) FROM ... HAVING SUM(x) > 0
- *
- * Since these aggregates are identical, we only need to calculate
- * the value once. Both aggregates will share the same 'aggno' value.
- *
- * 2. Two different aggregate functions appear in the query, but the
- * aggregates have the same arguments, transition functions and
- * initial values (and, presumably, different final functions):
- *
- * SELECT AVG(x), STDDEV(x) FROM ...
- *
- * In this case we must create a new peragg for the varying aggregate,
- * and we need to call the final functions separately, but we need
- * only run the transition function once. (This requires that the
- * final functions be nondestructive of the transition state, but
- * that's required anyway for other reasons.)
- *
- * For either of these optimizations to be valid, all aggregate properties
- * used in the transition phase must be the same, including any modifiers
- * such as ORDER BY, DISTINCT and FILTER, and the arguments mustn't
- * contain any volatile functions.
- * -----------------
*/
- aggno = -1;
- transno = -1;
foreach(l, aggstate->aggs)
{
- AggrefExprState *aggrefstate = (AggrefExprState *) lfirst(l);
- Aggref *aggref = aggrefstate->aggref;
+ Aggref *aggref = lfirst(l);
AggStatePerAgg peragg;
AggStatePerTrans pertrans;
- int existing_aggno;
- int existing_transno;
- List *same_input_transnos;
Oid inputTypes[FUNC_MAX_ARGS];
int numArguments;
int numDirectArgs;
HeapTuple aggTuple;
Form_pg_aggregate aggform;
AclResult aclresult;
- Oid transfn_oid,
- finalfn_oid;
- bool shareable;
+ Oid finalfn_oid;
Oid serialfn_oid,
deserialfn_oid;
+ Oid aggOwner;
Expr *finalfnexpr;
Oid aggtranstype;
- Datum textInitVal;
- Datum initValue;
- bool initValueIsNull;
/* Planner should have assigned aggregate to correct level */
Assert(aggref->agglevelsup == 0);
/* ... and the split mode should match */
Assert(aggref->aggsplit == aggstate->aggsplit);
- /* 1. Check for already processed aggs which can be re-used */
- existing_aggno = find_compatible_peragg(aggref, aggstate, aggno,
- &same_input_transnos);
- if (existing_aggno != -1)
- {
- /*
- * Existing compatible agg found. so just point the Aggref to the
- * same per-agg struct.
- */
- aggrefstate->aggno = existing_aggno;
+ peragg = &peraggs[aggref->aggno];
+
+ /* Check if we initialized the state for this aggregate already. */
+ if (peragg->aggref != NULL)
continue;
- }
- /* Mark Aggref state node with assigned index in the result array */
- peragg = &peraggs[++aggno];
peragg->aggref = aggref;
- aggrefstate->aggno = aggno;
+ peragg->transno = aggref->aggtransno;
/* Fetch the pg_aggregate row */
aggTuple = SearchSysCache1(AGGFNOID,
@@ -3802,36 +3755,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggtranstype = aggref->aggtranstype;
Assert(OidIsValid(aggtranstype));
- /*
- * If this aggregation is performing state combines, then instead of
- * using the transition function, we'll use the combine function
- */
- if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
- {
- transfn_oid = aggform->aggcombinefn;
-
- /* If not set then the planner messed up */
- if (!OidIsValid(transfn_oid))
- elog(ERROR, "combinefn not set for aggregate function");
- }
- else
- transfn_oid = aggform->aggtransfn;
-
/* Final function only required if we're finalizing the aggregates */
if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
peragg->finalfn_oid = finalfn_oid = InvalidOid;
else
peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
- /*
- * If finalfn is marked read-write, we can't share transition states;
- * but it is okay to share states for AGGMODIFY_SHAREABLE aggs. Also,
- * if we're not executing the finalfn here, we can share regardless.
- */
- shareable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
- (finalfn_oid == InvalidOid);
- peragg->shareable = shareable;
-
serialfn_oid = InvalidOid;
deserialfn_oid = InvalidOid;
@@ -3871,7 +3800,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
/* Check that aggregate owner has permission to call component fns */
{
HeapTuple procTuple;
- Oid aggOwner;
procTuple = SearchSysCache1(PROCOID,
ObjectIdGetDatum(aggref->aggfnoid));
@@ -3881,12 +3809,6 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
ReleaseSysCache(procTuple);
- aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
- ACL_EXECUTE);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, OBJECT_FUNCTION,
- get_func_name(transfn_oid));
- InvokeFunctionExecuteHook(transfn_oid);
if (OidIsValid(finalfn_oid))
{
aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
@@ -3959,51 +3881,60 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
&peragg->resulttypeByVal);
/*
- * initval is potentially null, so don't try to access it as a struct
- * field. Must do it the hard way with SysCacheGetAttr.
+ * Build working state for invoking the transition function, if we
+ * haven't done it already.
*/
- textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
- Anum_pg_aggregate_agginitval,
- &initValueIsNull);
- if (initValueIsNull)
- initValue = (Datum) 0;
- else
- initValue = GetAggInitVal(textInitVal, aggtranstype);
-
- /*
- * 2. Build working state for invoking the transition function, or
- * look up previously initialized working state, if we can share it.
- *
- * find_compatible_peragg() already collected a list of shareable
- * per-Trans's with the same inputs. Check if any of them have the
- * same transition function and initial value.
- */
- existing_transno = find_compatible_pertrans(aggstate, aggref,
- shareable,
- transfn_oid, aggtranstype,
- serialfn_oid, deserialfn_oid,
- initValue, initValueIsNull,
- same_input_transnos);
- if (existing_transno != -1)
+ pertrans = &pertransstates[aggref->aggtransno];
+ if (pertrans->aggref == NULL)
{
+ Datum textInitVal;
+ Datum initValue;
+ bool initValueIsNull;
+ Oid transfn_oid;
+
/*
- * Existing compatible trans found, so just point the 'peragg' to
- * the same per-trans struct, and mark the trans state as shared.
+ * If this aggregation is performing state combines, then instead
+ * of using the transition function, we'll use the combine
+ * function
*/
- pertrans = &pertransstates[existing_transno];
- pertrans->aggshared = true;
- peragg->transno = existing_transno;
- }
- else
- {
- pertrans = &pertransstates[++transno];
+ if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
+ {
+ transfn_oid = aggform->aggcombinefn;
+
+ /* If not set then the planner messed up */
+ if (!OidIsValid(transfn_oid))
+ elog(ERROR, "combinefn not set for aggregate function");
+ }
+ else
+ transfn_oid = aggform->aggtransfn;
+
+ aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
+ ACL_EXECUTE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, OBJECT_FUNCTION,
+ get_func_name(transfn_oid));
+ InvokeFunctionExecuteHook(transfn_oid);
+
+ /*
+ * initval is potentially null, so don't try to access it as a
+ * struct field. Must do it the hard way with SysCacheGetAttr.
+ */
+ textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
+ Anum_pg_aggregate_agginitval,
+ &initValueIsNull);
+ if (initValueIsNull)
+ initValue = (Datum) 0;
+ else
+ initValue = GetAggInitVal(textInitVal, aggtranstype);
+
build_pertrans_for_aggref(pertrans, aggstate, estate,
aggref, transfn_oid, aggtranstype,
serialfn_oid, deserialfn_oid,
initValue, initValueIsNull,
inputTypes, numArguments);
- peragg->transno = transno;
}
+ else
+ pertrans->aggshared = true;
ReleaseSysCache(aggTuple);
}
@@ -4011,8 +3942,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* Update aggstate->numaggs to be the number of unique aggregates found.
* Also set numstates to the number of unique transition states found.
*/
- aggstate->numaggs = aggno + 1;
- aggstate->numtrans = transno + 1;
+ aggstate->numaggs = numaggs;
+ aggstate->numtrans = numtrans;
/*
* Last, check whether any more aggregates got added onto the node while
@@ -4024,7 +3955,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* need to work hard on a helpful error message; but we defend against it
* here anyway, just to be sure.)
*/
- if (numaggs != list_length(aggstate->aggs))
+ if (numaggrefs != list_length(aggstate->aggs))
ereport(ERROR,
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("aggregate function calls cannot be nested")));
@@ -4420,147 +4351,6 @@ GetAggInitVal(Datum textInitVal, Oid transtype)
return initVal;
}
-/*
- * find_compatible_peragg - search for a previously initialized per-Agg struct
- *
- * Searches the previously looked at aggregates to find one which is compatible
- * with this one, with the same input parameters. If no compatible aggregate
- * can be found, returns -1.
- *
- * As a side-effect, this also collects a list of existing, shareable per-Trans
- * structs with matching inputs. If no identical Aggref is found, the list is
- * passed later to find_compatible_pertrans, to see if we can at least reuse
- * the state value of another aggregate.
- */
-static int
-find_compatible_peragg(Aggref *newagg, AggState *aggstate,
- int lastaggno, List **same_input_transnos)
-{
- int aggno;
- AggStatePerAgg peraggs;
-
- *same_input_transnos = NIL;
-
- /* we mustn't reuse the aggref if it contains volatile function calls */
- if (contain_volatile_functions((Node *) newagg))
- return -1;
-
- peraggs = aggstate->peragg;
-
- /*
- * Search through the list of already seen aggregates. If we find an
- * existing identical aggregate call, then we can re-use that one. While
- * searching, we'll also collect a list of Aggrefs with the same input
- * parameters. If no matching Aggref is found, the caller can potentially
- * still re-use the transition state of one of them. (At this stage we
- * just compare the parsetrees; whether different aggregates share the
- * same transition function will be checked later.)
- */
- for (aggno = 0; aggno <= lastaggno; aggno++)
- {
- AggStatePerAgg peragg;
- Aggref *existingRef;
-
- peragg = &peraggs[aggno];
- existingRef = peragg->aggref;
-
- /* all of the following must be the same or it's no match */
- if (newagg->inputcollid != existingRef->inputcollid ||
- newagg->aggtranstype != existingRef->aggtranstype ||
- newagg->aggstar != existingRef->aggstar ||
- newagg->aggvariadic != existingRef->aggvariadic ||
- newagg->aggkind != existingRef->aggkind ||
- !equal(newagg->args, existingRef->args) ||
- !equal(newagg->aggorder, existingRef->aggorder) ||
- !equal(newagg->aggdistinct, existingRef->aggdistinct) ||
- !equal(newagg->aggfilter, existingRef->aggfilter))
- continue;
-
- /* if it's the same aggregate function then report exact match */
- if (newagg->aggfnoid == existingRef->aggfnoid &&
- newagg->aggtype == existingRef->aggtype &&
- newagg->aggcollid == existingRef->aggcollid &&
- equal(newagg->aggdirectargs, existingRef->aggdirectargs))
- {
- list_free(*same_input_transnos);
- *same_input_transnos = NIL;
- return aggno;
- }
-
- /*
- * Not identical, but it had the same inputs. If the final function
- * permits sharing, return its transno to the caller, in case we can
- * re-use its per-trans state. (If there's already sharing going on,
- * we might report a transno more than once. find_compatible_pertrans
- * is cheap enough that it's not worth spending cycles to avoid that.)
- */
- if (peragg->shareable)
- *same_input_transnos = lappend_int(*same_input_transnos,
- peragg->transno);
- }
-
- return -1;
-}
-
-/*
- * find_compatible_pertrans - search for a previously initialized per-Trans
- * struct
- *
- * Searches the list of transnos for a per-Trans struct with the same
- * transition function and initial condition. (The inputs have already been
- * verified to match.)
- */
-static int
-find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool shareable,
- Oid aggtransfn, Oid aggtranstype,
- Oid aggserialfn, Oid aggdeserialfn,
- Datum initValue, bool initValueIsNull,
- List *transnos)
-{
- ListCell *lc;
-
- /* If this aggregate can't share transition states, give up */
- if (!shareable)
- return -1;
-
- foreach(lc, transnos)
- {
- int transno = lfirst_int(lc);
- AggStatePerTrans pertrans = &aggstate->pertrans[transno];
-
- /*
- * if the transfns or transition state types are not the same then the
- * state can't be shared.
- */
- if (aggtransfn != pertrans->transfn_oid ||
- aggtranstype != pertrans->aggtranstype)
- continue;
-
- /*
- * The serialization and deserialization functions must match, if
- * present, as we're unable to share the trans state for aggregates
- * which will serialize or deserialize into different formats.
- * Remember that these will be InvalidOid if they're not required for
- * this agg node.
- */
- if (aggserialfn != pertrans->serialfn_oid ||
- aggdeserialfn != pertrans->deserialfn_oid)
- continue;
-
- /*
- * Check that the initial condition matches, too.
- */
- if (initValueIsNull && pertrans->initValueIsNull)
- return transno;
-
- if (!initValueIsNull && !pertrans->initValueIsNull &&
- datumIsEqual(initValue, pertrans->initValue,
- pertrans->transtypeByVal, pertrans->transtypeLen))
- return transno;
- }
- return -1;
-}
-
void
ExecEndAgg(AggState *node)
{