diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2014-04-12 11:58:53 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2014-04-12 12:03:30 -0400 |
commit | a9d9acbf219b9e96585779cd5f99d674d4ccba74 (patch) | |
tree | 4bd26a78fa7f6f0bc558c611278e42a9f41d4875 /src/backend/executor/nodeWindowAgg.c | |
parent | 3c41b812c5578fd7bd5c2de42941012d7d56dde2 (diff) | |
download | postgresql-a9d9acbf219b9e96585779cd5f99d674d4ccba74.tar.gz postgresql-a9d9acbf219b9e96585779cd5f99d674d4ccba74.zip |
Create infrastructure for moving-aggregate optimization.
Until now, when executing an aggregate function as a window function
within a window with moving frame start (that is, any frame start mode
except UNBOUNDED PRECEDING), we had to recalculate the aggregate from
scratch each time the frame head moved. This patch allows an aggregate
definition to include an alternate "moving aggregate" implementation
that includes an inverse transition function for removing rows from
the aggregate's running state. As long as this can be done successfully,
runtime is proportional to the total number of input rows, rather than
to the number of input rows times the average frame length.
This commit includes the core infrastructure, documentation, and regression
tests using user-defined aggregates. Follow-on commits will update some
of the built-in aggregates to use this feature.
David Rowley and Florian Pflug, reviewed by Dean Rasheed; additional
hacking by me
Diffstat (limited to 'src/backend/executor/nodeWindowAgg.c')
-rw-r--r-- | src/backend/executor/nodeWindowAgg.c | 639 |
1 files changed, 538 insertions, 101 deletions
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 0b558e59231..046637fb092 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -102,16 +102,18 @@ typedef struct WindowStatePerFuncData */ typedef struct WindowStatePerAggData { - /* Oids of transfer functions */ + /* Oids of transition functions */ Oid transfn_oid; + Oid invtransfn_oid; /* may be InvalidOid */ Oid finalfn_oid; /* may be InvalidOid */ /* - * fmgr lookup data for transfer functions --- only valid when + * fmgr lookup data for transition functions --- only valid when * corresponding oid is not InvalidOid. Note in particular that fn_strict * flags are kept here. */ FmgrInfo transfn; + FmgrInfo invtransfn; FmgrInfo finalfn; /* @@ -139,11 +141,17 @@ typedef struct WindowStatePerAggData int wfuncno; /* index of associated PerFuncData */ + /* Context holding transition value and possibly other subsidiary data */ + MemoryContext aggcontext; /* may be private, or winstate->aggcontext */ + /* Current transition value */ Datum transValue; /* current transition value */ bool transValueIsNull; - bool noTransValue; /* true if transValue not set yet */ + int64 transValueCount; /* number of currently-aggregated rows */ + + /* Data local to eval_windowaggregates() */ + bool restart; /* need to restart this agg in this cycle? */ } WindowStatePerAggData; static void initialize_windowaggregate(WindowAggState *winstate, @@ -152,6 +160,9 @@ static void initialize_windowaggregate(WindowAggState *winstate, static void advance_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate); +static bool advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate); static void finalize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate, @@ -193,18 +204,27 @@ initialize_windowaggregate(WindowAggState *winstate, { MemoryContext oldContext; + /* + * If we're using a private aggcontext, we may reset it here. But if the + * context is shared, we don't know which other aggregates may still need + * it, so we must leave it to the caller to reset at an appropriate time. + */ + if (peraggstate->aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(peraggstate->aggcontext); + if (peraggstate->initValueIsNull) peraggstate->transValue = peraggstate->initValue; else { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } peraggstate->transValueIsNull = peraggstate->initValueIsNull; - peraggstate->noTransValue = peraggstate->initValueIsNull; + peraggstate->transValueCount = 0; + peraggstate->resultValue = (Datum) 0; peraggstate->resultValueIsNull = true; } @@ -258,7 +278,8 @@ advance_windowaggregate(WindowAggState *winstate, { /* * For a strict transfn, nothing happens when there's a NULL input; we - * just keep the prior transValue. + * just keep the prior transValue. Note transValueCount doesn't + * change either. */ for (i = 1; i <= numArguments; i++) { @@ -268,41 +289,47 @@ advance_windowaggregate(WindowAggState *winstate, return; } } - if (peraggstate->noTransValue) + + /* + * For strict transition functions with initial value NULL we use the + * first non-NULL input as the initial state. (We already checked + * that the agg's input type is binary-compatible with its transtype, + * so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. We do + * not need to pfree the old transValue, since it's NULL. + */ + if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull) { - /* - * transValue has not been initialized. This is the first non-NULL - * input value. We use it as the initial value for transValue. (We - * already checked that the agg's input type is binary-compatible - * with its transtype, so straight copy here is OK.) - * - * We must copy the datum into aggcontext if it is pass-by-ref. We - * do not need to pfree the old transValue, since it's NULL. - */ - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); peraggstate->transValueIsNull = false; - peraggstate->noTransValue = false; + peraggstate->transValueCount = 1; MemoryContextSwitchTo(oldContext); return; } + if (peraggstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the transfn is - * strict *and* returned a NULL on a prior cycle. If that happens - * we will propagate the NULL all the way to the end. + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. That can + * only happen if there's no inverse transition function, though, + * since we disallow transitions back to NULL when there is one. */ MemoryContextSwitchTo(oldContext); + Assert(!OidIsValid(peraggstate->invtransfn_oid)); return; } } /* - * OK to call the transition function + * OK to call the transition function. Set winstate->curaggcontext while + * calling it, for possible use by AggCheckCallContext. */ InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), numArguments + 1, @@ -310,7 +337,26 @@ advance_windowaggregate(WindowAggState *winstate, (void *) winstate, NULL); fcinfo->arg[0] = peraggstate->transValue; fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * Moving-aggregate transition functions must not return NULL, see + * advance_windowaggregate_base(). + */ + if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("moving-aggregate transition function must not return NULL"))); + + /* + * We must track the number of rows included in transValue, since to + * remove the last input, advance_windowaggregate_base() musn't call the + * inverse transition function, but simply reset transValue back to its + * initial value. + */ + peraggstate->transValueCount++; /* * If pass-by-ref datatype, must copy the new value into aggcontext and @@ -322,7 +368,161 @@ advance_windowaggregate(WindowAggState *winstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); + newVal = datumCopy(newVal, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + } + if (!peraggstate->transValueIsNull) + pfree(DatumGetPointer(peraggstate->transValue)); + } + + MemoryContextSwitchTo(oldContext); + peraggstate->transValue = newVal; + peraggstate->transValueIsNull = fcinfo->isnull; +} + +/* + * advance_windowaggregate_base + * Remove the oldest tuple from an aggregation. + * + * This is very much like advance_windowaggregate, except that we will call + * the inverse transition function (which caller must have checked is + * available). + * + * Returns true if we successfully removed the current row from this + * aggregate, false if not (in the latter case, caller is responsible + * for cleaning up by restarting the aggregation). + */ +static bool +advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate) +{ + WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; + int numArguments = perfuncstate->numArguments; + FunctionCallInfoData fcinfodata; + FunctionCallInfo fcinfo = &fcinfodata; + Datum newVal; + ListCell *arg; + int i; + MemoryContext oldContext; + ExprContext *econtext = winstate->tmpcontext; + ExprState *filter = wfuncstate->aggfilter; + + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + /* Skip anything FILTERed out */ + if (filter) + { + bool isnull; + Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL); + + if (isnull || !DatumGetBool(res)) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + + /* We start from 1, since the 0th arg will be the transition value */ + i = 1; + foreach(arg, wfuncstate->args) + { + ExprState *argstate = (ExprState *) lfirst(arg); + + fcinfo->arg[i] = ExecEvalExpr(argstate, econtext, + &fcinfo->argnull[i], NULL); + i++; + } + + if (peraggstate->invtransfn.fn_strict) + { + /* + * For a strict (inv)transfn, nothing happens when there's a NULL + * input; we just keep the prior transValue. Note transValueCount + * doesn't change either. + */ + for (i = 1; i <= numArguments; i++) + { + if (fcinfo->argnull[i]) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + } + + /* There should still be an added but not yet removed value */ + Assert(peraggstate->transValueCount > 0); + + /* + * In moving-aggregate mode, the state must never be NULL, except possibly + * before any rows have been aggregated (which is surely not the case at + * this point). This restriction allows us to interpret a NULL result + * from the inverse function as meaning "sorry, can't do an inverse + * transition in this case". We already checked this in + * advance_windowaggregate, but just for safety, check again. + */ + if (peraggstate->transValueIsNull) + elog(ERROR, "aggregate transition value is NULL before inverse transition"); + + /* + * We mustn't use the inverse transition function to remove the last + * input. Doing so would yield a non-NULL state, whereas we should be in + * the initial state afterwards which may very well be NULL. So instead, + * we simply re-initialize the aggregate in this case. + */ + if (peraggstate->transValueCount == 1) + { + MemoryContextSwitchTo(oldContext); + initialize_windowaggregate(winstate, + &winstate->perfunc[peraggstate->wfuncno], + peraggstate); + return true; + } + + /* + * OK to call the inverse transition function. Set + * winstate->curaggcontext while calling it, for possible use by + * AggCheckCallContext. + */ + InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn), + numArguments + 1, + perfuncstate->winCollation, + (void *) winstate, NULL); + fcinfo->arg[0] = peraggstate->transValue; + fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; + newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * If the function returns NULL, report failure, forcing a restart. + */ + if (fcinfo->isnull) + { + MemoryContextSwitchTo(oldContext); + return false; + } + + /* Update number of rows included in transValue */ + peraggstate->transValueCount--; + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * pfree the prior transValue. But if invtransfn returned a pointer to + * its first input, we don't need to do anything. + * + * Note: the checks for null values here will never fire, but it seems + * best to have this stanza look just like advance_windowaggregate. + */ + if (!peraggstate->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) + { + if (!fcinfo->isnull) + { + MemoryContextSwitchTo(peraggstate->aggcontext); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -334,6 +534,8 @@ advance_windowaggregate(WindowAggState *winstate, MemoryContextSwitchTo(oldContext); peraggstate->transValue = newVal; peraggstate->transValueIsNull = fcinfo->isnull; + + return true; } /* @@ -370,7 +572,9 @@ finalize_windowaggregate(WindowAggState *winstate, } else { + winstate->curaggcontext = peraggstate->aggcontext; *result = FunctionCallInvoke(&fcinfo); + winstate->curaggcontext = NULL; *isnull = fcinfo.isnull; } } @@ -396,7 +600,9 @@ finalize_windowaggregate(WindowAggState *winstate, * eval_windowaggregates * evaluate plain aggregates being used as window functions * - * Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be + * This differs from nodeAgg.c in two ways. First, if the window's frame + * start position moves, we use the inverse transition function (if it exists) + * to remove rows from the transition value. And second, we expect to be * able to call aggregate final functions repeatedly after aggregating more * data onto the same transition value. This is not a behavior required by * nodeAgg.c. @@ -406,12 +612,15 @@ eval_windowaggregates(WindowAggState *winstate) { WindowStatePerAgg peraggstate; int wfuncno, - numaggs; - int i; + numaggs, + numaggs_restart, + i; + int64 aggregatedupto_nonrestarted; MemoryContext oldContext; ExprContext *econtext; WindowObject agg_winobj; TupleTableSlot *agg_row_slot; + TupleTableSlot *temp_slot; numaggs = winstate->numaggs; if (numaggs == 0) @@ -421,6 +630,7 @@ eval_windowaggregates(WindowAggState *winstate) econtext = winstate->ss.ps.ps_ExprContext; agg_winobj = winstate->agg_winobj; agg_row_slot = winstate->agg_row_slot; + temp_slot = winstate->temp_slot_1; /* * Currently, we support only a subset of the SQL-standard window framing @@ -438,9 +648,17 @@ eval_windowaggregates(WindowAggState *winstate) * damage the running transition value, but we have the same assumption in * nodeAgg.c too (when it rescans an existing hash table). * - * For other frame start rules, we discard the aggregate state and re-run - * the aggregates whenever the frame head row moves. We can still - * optimize as above whenever successive rows share the same frame head. + * If the frame start does sometimes move, we can still optimize as above + * whenever successive rows share the same frame head, but if the frame + * head moves beyond the previous head we try to remove those rows using + * the aggregate's inverse transition function. This function restores + * the aggregate's current state to what it would be if the removed row + * had never been aggregated in the first place. Inverse transition + * functions may optionally return NULL, indicating that the function was + * unable to remove the tuple from aggregation. If this happens, or if + * the aggregate doesn't have an inverse transition function at all, we + * must perform the aggregation all over again for all tuples within the + * new frame boundaries. * * In many common cases, multiple rows share the same frame and hence the * same aggregate value. (In particular, if there's no ORDER BY in a RANGE @@ -452,75 +670,195 @@ eval_windowaggregates(WindowAggState *winstate) * 'aggregatedupto' keeps track of the first row that has not yet been * accumulated into the aggregate transition values. Whenever we start a * new peer group, we accumulate forward to the end of the peer group. - * - * TODO: Rerunning aggregates from the frame start can be pretty slow. For - * some aggregates like SUM and COUNT we could avoid that by implementing - * a "negative transition function" that would be called for each row as - * it exits the frame. We'd have to think about avoiding recalculation of - * volatile arguments of aggregate functions, too. */ /* * First, update the frame head position. + * + * The frame head should never move backwards, and the code below wouldn't + * cope if it did, so for safety we complain if it does. */ - update_frameheadpos(agg_winobj, winstate->temp_slot_1); + update_frameheadpos(agg_winobj, temp_slot); + if (winstate->frameheadpos < winstate->aggregatedbase) + elog(ERROR, "window frame head moved backward"); /* - * Initialize aggregates on first call for partition, or if the frame head - * position moved since last time. + * If the frame didn't change compared to the previous row, we can re-use + * the result values that were previously saved at the bottom of this + * function. Since we don't know the current frame's end yet, this is not + * possible to check for fully. But if the frame end mode is UNBOUNDED + * FOLLOWING or CURRENT ROW, and the current row lies within the previous + * row's frame, then the two frames' ends must coincide. Note that on the + * first row aggregatedbase == aggregatedupto, meaning this test must + * fail, so we don't need to check the "there was no previous row" case + * explicitly here. */ - if (winstate->currentpos == 0 || - winstate->frameheadpos != winstate->aggregatedbase) + if (winstate->aggregatedbase == winstate->frameheadpos && + (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | + FRAMEOPTION_END_CURRENT_ROW)) && + winstate->aggregatedbase <= winstate->currentpos && + winstate->aggregatedupto > winstate->currentpos) { - /* - * Discard transient aggregate values - */ - MemoryContextResetAndDeleteChildren(winstate->aggcontext); - for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - initialize_windowaggregate(winstate, - &winstate->perfunc[wfuncno], - peraggstate); + econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; + econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; } + return; + } + /*---------- + * Initialize restart flags. + * + * We restart the aggregation: + * - if we're processing the first row in the partition, or + * - if the frame's head moved and we cannot use an inverse + * transition function, or + * - if the new frame doesn't overlap the old one + * + * Note that we don't strictly need to restart in the last case, but if + * we're going to remove all rows from the aggregation anyway, a restart + * surely is faster. + *---------- + */ + numaggs_restart = 0; + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + if (winstate->currentpos == 0 || + (winstate->aggregatedbase != winstate->frameheadpos && + !OidIsValid(peraggstate->invtransfn_oid)) || + winstate->aggregatedupto <= winstate->frameheadpos) + { + peraggstate->restart = true; + numaggs_restart++; + } + else + peraggstate->restart = false; + } + + /* + * If we have any possibly-moving aggregates, attempt to advance + * aggregatedbase to match the frame's head by removing input rows that + * fell off the top of the frame from the aggregations. This can fail, + * i.e. advance_windowaggregate_base() can return false, in which case + * we'll restart that aggregate below. + */ + while (numaggs_restart < numaggs && + winstate->aggregatedbase < winstate->frameheadpos) + { /* - * If we created a mark pointer for aggregates, keep it pushed up to - * frame head, so that tuplestore can discard unnecessary rows. + * Fetch the next tuple of those being removed. This should never fail + * as we should have been here before. */ - if (agg_winobj->markptr >= 0) - WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase, + temp_slot)) + elog(ERROR, "could not re-fetch previously fetched frame row"); + + /* Set tuple context for evaluation of aggregate arguments */ + winstate->tmpcontext->ecxt_outertuple = temp_slot; /* - * Initialize for loop below + * Perform the inverse transition for each aggregate function in the + * window, unless it has already been marked as needing a restart. */ - ExecClearTuple(agg_row_slot); - winstate->aggregatedbase = winstate->frameheadpos; - winstate->aggregatedupto = winstate->frameheadpos; + for (i = 0; i < numaggs; i++) + { + bool ok; + + peraggstate = &winstate->peragg[i]; + if (peraggstate->restart) + continue; + + wfuncno = peraggstate->wfuncno; + ok = advance_windowaggregate_base(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + if (!ok) + { + /* Inverse transition function has failed, must restart */ + peraggstate->restart = true; + numaggs_restart++; + } + } + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(winstate->tmpcontext); + + /* And advance the aggregated-row state */ + winstate->aggregatedbase++; + ExecClearTuple(temp_slot); } /* - * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates - * except when the frame head moves. In END_CURRENT_ROW mode, we only - * have to recalculate when the frame head moves or currentpos has - * advanced past the place we'd aggregated up to. Check for these cases - * and if so, reuse the saved result values. + * If we successfully advanced the base rows of all the aggregates, + * aggregatedbase now equals frameheadpos; but if we failed for any, we + * must forcibly update aggregatedbase. */ - if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | - FRAMEOPTION_END_CURRENT_ROW)) && - winstate->aggregatedbase <= winstate->currentpos && - winstate->aggregatedupto > winstate->currentpos) + winstate->aggregatedbase = winstate->frameheadpos; + + /* + * If we created a mark pointer for aggregates, keep it pushed up to frame + * head, so that tuplestore can discard unnecessary rows. + */ + if (agg_winobj->markptr >= 0) + WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + + /* + * Now restart the aggregates that require it. + * + * We assume that aggregates using the shared context always restart if + * *any* aggregate restarts, and we may thus clean up the shared + * aggcontext if that is the case. Private aggcontexts are reset by + * initialize_windowaggregate() if their owning aggregate restarts. If we + * aren't restarting an aggregate, we need to free any previously saved + * result for it, else we'll leak memory. + */ + if (numaggs_restart > 0) + MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < numaggs; i++) { - for (i = 0; i < numaggs; i++) + peraggstate = &winstate->peragg[i]; + + /* Aggregates using the shared ctx must restart if *any* agg does */ + Assert(peraggstate->aggcontext != winstate->aggcontext || + numaggs_restart == 0 || + peraggstate->restart); + + if (peraggstate->restart) { - peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; - econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; + initialize_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + } + else if (!peraggstate->resultValueIsNull) + { + if (!peraggstate->resulttypeByVal) + pfree(DatumGetPointer(peraggstate->resultValue)); + peraggstate->resultValue = (Datum) 0; + peraggstate->resultValueIsNull = true; } - return; + } + + /* + * Non-restarted aggregates now contain the rows between aggregatedbase + * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates + * contain no rows. If there are any restarted aggregates, we must thus + * begin aggregating anew at frameheadpos, otherwise we may simply + * continue at aggregatedupto. We must remember the old value of + * aggregatedupto to know how long to skip advancing non-restarted + * aggregates. If we modify aggregatedupto, we must also clear + * agg_row_slot, per the loop invariant below. + */ + aggregatedupto_nonrestarted = winstate->aggregatedupto; + if (numaggs_restart > 0 && + winstate->aggregatedupto != winstate->frameheadpos) + { + winstate->aggregatedupto = winstate->frameheadpos; + ExecClearTuple(agg_row_slot); } /* @@ -551,6 +889,12 @@ eval_windowaggregates(WindowAggState *winstate) for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; + + /* Non-restarted aggs skip until aggregatedupto_nonrestarted */ + if (!peraggstate->restart && + winstate->aggregatedupto < aggregatedupto_nonrestarted) + continue; + wfuncno = peraggstate->wfuncno; advance_windowaggregate(winstate, &winstate->perfunc[wfuncno], @@ -565,6 +909,9 @@ eval_windowaggregates(WindowAggState *winstate) ExecClearTuple(agg_row_slot); } + /* The frame's end is not supposed to move backwards, ever */ + Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); + /* * finalize aggregates and fill result/isnull fields. */ @@ -589,28 +936,14 @@ eval_windowaggregates(WindowAggState *winstate) * advance that the next row can't possibly share the same frame. Is * it worth detecting that and skipping this code? */ - if (!peraggstate->resulttypeByVal) + if (!peraggstate->resulttypeByVal && !*isnull) { - /* - * clear old resultValue in order not to leak memory. (Note: the - * new result can't possibly be the same datum as old resultValue, - * because we never passed it to the trans function.) - */ - if (!peraggstate->resultValueIsNull) - pfree(DatumGetPointer(peraggstate->resultValue)); - - /* - * If pass-by-ref, copy it into our aggregate context. - */ - if (!*isnull) - { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); - peraggstate->resultValue = - datumCopy(*result, - peraggstate->resulttypeByVal, - peraggstate->resulttypeLen); - MemoryContextSwitchTo(oldContext); - } + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); + peraggstate->resultValue = + datumCopy(*result, + peraggstate->resulttypeByVal, + peraggstate->resulttypeLen); + MemoryContextSwitchTo(oldContext); } else { @@ -650,6 +983,8 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, (void *) perfuncstate->winobj, NULL); /* Just in case, make all the regular argument slots be null */ memset(fcinfo.argnull, true, perfuncstate->numArguments); + /* Window functions don't have a current aggregate context, either */ + winstate->curaggcontext = NULL; *result = FunctionCallInvoke(&fcinfo); *isnull = fcinfo.isnull; @@ -870,6 +1205,11 @@ release_partition(WindowAggState *winstate) */ MemoryContextResetAndDeleteChildren(winstate->partcontext); MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < winstate->numaggs; i++) + { + if (winstate->peragg[i].aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext); + } if (winstate->buffer) tuplestore_end(winstate->buffer); @@ -1450,7 +1790,12 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - /* Create mid-lived context for aggregate trans values etc */ + /* + * Create mid-lived context for aggregate trans values etc. + * + * Note that moving aggregates each use their own private context, not + * this one. + */ winstate->aggcontext = AllocSetContextCreate(CurrentMemoryContext, "WindowAgg_Aggregates", @@ -1657,12 +2002,10 @@ void ExecEndWindowAgg(WindowAggState *node) { PlanState *outerPlan; + int i; release_partition(node); - pfree(node->perfunc); - pfree(node->peragg); - ExecClearTuple(node->ss.ss_ScanTupleSlot); ExecClearTuple(node->first_part_slot); ExecClearTuple(node->agg_row_slot); @@ -1676,9 +2019,17 @@ ExecEndWindowAgg(WindowAggState *node) node->ss.ps.ps_ExprContext = node->tmpcontext; ExecFreeExprContext(&node->ss.ps); + for (i = 0; i < node->numaggs; i++) + { + if (node->peragg[i].aggcontext != node->aggcontext) + MemoryContextDelete(node->peragg[i].aggcontext); + } MemoryContextDelete(node->partcontext); MemoryContextDelete(node->aggcontext); + pfree(node->perfunc); + pfree(node->peragg); + outerPlan = outerPlanState(node); ExecEndNode(outerPlan); } @@ -1733,10 +2084,13 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, HeapTuple aggTuple; Form_pg_aggregate aggform; Oid aggtranstype; + AttrNumber initvalAttNo; AclResult aclresult; Oid transfn_oid, + invtransfn_oid, finalfn_oid; Expr *transfnexpr, + *invtransfnexpr, *finalfnexpr; Datum textInitVal; int i; @@ -1757,13 +2111,39 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); /* + * Figure out whether we want to use the moving-aggregate implementation, + * and collect the right set of fields from the pg_attribute entry. + * + * If the frame head can't move, we don't need moving-aggregate code. Even + * if we'd like to use it, don't do so if the aggregate's arguments (and + * FILTER clause if any) contain any calls to volatile functions. + * Otherwise, the difference between restarting and not restarting the + * aggregation would be user-visible. + */ + if (OidIsValid(aggform->aggminvtransfn) && + !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) && + !contain_volatile_functions((Node *) wfunc)) + { + peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; + aggtranstype = aggform->aggmtranstype; + initvalAttNo = Anum_pg_aggregate_aggminitval; + } + else + { + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + aggtranstype = aggform->aggtranstype; + initvalAttNo = Anum_pg_aggregate_agginitval; + } + + /* * ExecInitWindowAgg already checked permission to call aggregate function * ... but we still need to check the component functions */ - peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; - peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; @@ -1783,6 +2163,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); InvokeFunctionExecuteHook(transfn_oid); + + if (OidIsValid(invtransfn_oid)) + { + aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, + get_func_name(invtransfn_oid)); + InvokeFunctionExecuteHook(invtransfn_oid); + } + if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, @@ -1796,7 +2187,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, /* resolve actual type of transition state, if polymorphic */ aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid, - aggform->aggtranstype, + aggtranstype, inputTypes, numArguments); @@ -1810,13 +2201,21 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->wintype, wfunc->inputcollid, transfn_oid, + invtransfn_oid, finalfn_oid, &transfnexpr, + &invtransfnexpr, &finalfnexpr); fmgr_info(transfn_oid, &peraggstate->transfn); fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn); + if (OidIsValid(invtransfn_oid)) + { + fmgr_info(invtransfn_oid, &peraggstate->invtransfn); + fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn); + } + if (OidIsValid(finalfn_oid)) { fmgr_info(finalfn_oid, &peraggstate->finalfn); @@ -1834,8 +2233,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * initval is potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, - Anum_pg_aggregate_agginitval, + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo, &peraggstate->initValueIsNull); if (peraggstate->initValueIsNull) @@ -1848,7 +2246,8 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * If the transfn is strict and the initval is NULL, make sure input type * and transtype are the same (or at least binary-compatible), so that * it's OK to use the first input value as the initial transValue. This - * should have been checked at agg definition time, but just in case... + * should have been checked at agg definition time, but we must check + * again in case the transfn's strictness property has been changed. */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { @@ -1860,6 +2259,44 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->winfnoid))); } + /* + * Insist that forward and inverse transition functions have the same + * strictness setting. Allowing them to differ would require handling + * more special cases in advance_windowaggregate and + * advance_windowaggregate_base, for no discernible benefit. This should + * have been checked at agg definition time, but we must check again in + * case either function's strictness property has been changed. + */ + if (OidIsValid(invtransfn_oid) && + peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + /* + * Moving aggregates use their own aggcontext. + * + * This is necessary because they might restart at different times, so we + * might never be able to reset the shared context otherwise. We can't + * make it the aggregates' responsibility to clean up after themselves, + * because strict aggregates must be restarted whenever we remove their + * last non-NULL input, which the aggregate won't be aware is happening. + * Also, just pfree()ing the transValue upon restarting wouldn't help, + * since we'd miss any indirectly referenced data. We could, in theory, + * make the memory allocation rules for moving aggregates different than + * they have historically been for plain aggregates, but that seems grotty + * and likely to lead to memory leaks. + */ + if (OidIsValid(invtransfn_oid)) + peraggstate->aggcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg_AggregatePrivate", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + else + peraggstate->aggcontext = winstate->aggcontext; + ReleaseSysCache(aggTuple); return peraggstate; |