diff options
Diffstat (limited to 'src/backend/executor/nodeWindowAgg.c')
-rw-r--r-- | src/backend/executor/nodeWindowAgg.c | 639 |
1 files changed, 538 insertions, 101 deletions
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 0b558e59231..046637fb092 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -102,16 +102,18 @@ typedef struct WindowStatePerFuncData */ typedef struct WindowStatePerAggData { - /* Oids of transfer functions */ + /* Oids of transition functions */ Oid transfn_oid; + Oid invtransfn_oid; /* may be InvalidOid */ Oid finalfn_oid; /* may be InvalidOid */ /* - * fmgr lookup data for transfer functions --- only valid when + * fmgr lookup data for transition functions --- only valid when * corresponding oid is not InvalidOid. Note in particular that fn_strict * flags are kept here. */ FmgrInfo transfn; + FmgrInfo invtransfn; FmgrInfo finalfn; /* @@ -139,11 +141,17 @@ typedef struct WindowStatePerAggData int wfuncno; /* index of associated PerFuncData */ + /* Context holding transition value and possibly other subsidiary data */ + MemoryContext aggcontext; /* may be private, or winstate->aggcontext */ + /* Current transition value */ Datum transValue; /* current transition value */ bool transValueIsNull; - bool noTransValue; /* true if transValue not set yet */ + int64 transValueCount; /* number of currently-aggregated rows */ + + /* Data local to eval_windowaggregates() */ + bool restart; /* need to restart this agg in this cycle? */ } WindowStatePerAggData; static void initialize_windowaggregate(WindowAggState *winstate, @@ -152,6 +160,9 @@ static void initialize_windowaggregate(WindowAggState *winstate, static void advance_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate); +static bool advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate); static void finalize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate, @@ -193,18 +204,27 @@ initialize_windowaggregate(WindowAggState *winstate, { MemoryContext oldContext; + /* + * If we're using a private aggcontext, we may reset it here. But if the + * context is shared, we don't know which other aggregates may still need + * it, so we must leave it to the caller to reset at an appropriate time. + */ + if (peraggstate->aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(peraggstate->aggcontext); + if (peraggstate->initValueIsNull) peraggstate->transValue = peraggstate->initValue; else { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); MemoryContextSwitchTo(oldContext); } peraggstate->transValueIsNull = peraggstate->initValueIsNull; - peraggstate->noTransValue = peraggstate->initValueIsNull; + peraggstate->transValueCount = 0; + peraggstate->resultValue = (Datum) 0; peraggstate->resultValueIsNull = true; } @@ -258,7 +278,8 @@ advance_windowaggregate(WindowAggState *winstate, { /* * For a strict transfn, nothing happens when there's a NULL input; we - * just keep the prior transValue. + * just keep the prior transValue. Note transValueCount doesn't + * change either. */ for (i = 1; i <= numArguments; i++) { @@ -268,41 +289,47 @@ advance_windowaggregate(WindowAggState *winstate, return; } } - if (peraggstate->noTransValue) + + /* + * For strict transition functions with initial value NULL we use the + * first non-NULL input as the initial state. (We already checked + * that the agg's input type is binary-compatible with its transtype, + * so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. We do + * not need to pfree the old transValue, since it's NULL. + */ + if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull) { - /* - * transValue has not been initialized. This is the first non-NULL - * input value. We use it as the initial value for transValue. (We - * already checked that the agg's input type is binary-compatible - * with its transtype, so straight copy here is OK.) - * - * We must copy the datum into aggcontext if it is pass-by-ref. We - * do not need to pfree the old transValue, since it's NULL. - */ - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); peraggstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); peraggstate->transValueIsNull = false; - peraggstate->noTransValue = false; + peraggstate->transValueCount = 1; MemoryContextSwitchTo(oldContext); return; } + if (peraggstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the transfn is - * strict *and* returned a NULL on a prior cycle. If that happens - * we will propagate the NULL all the way to the end. + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. That can + * only happen if there's no inverse transition function, though, + * since we disallow transitions back to NULL when there is one. */ MemoryContextSwitchTo(oldContext); + Assert(!OidIsValid(peraggstate->invtransfn_oid)); return; } } /* - * OK to call the transition function + * OK to call the transition function. Set winstate->curaggcontext while + * calling it, for possible use by AggCheckCallContext. */ InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), numArguments + 1, @@ -310,7 +337,26 @@ advance_windowaggregate(WindowAggState *winstate, (void *) winstate, NULL); fcinfo->arg[0] = peraggstate->transValue; fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * Moving-aggregate transition functions must not return NULL, see + * advance_windowaggregate_base(). + */ + if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("moving-aggregate transition function must not return NULL"))); + + /* + * We must track the number of rows included in transValue, since to + * remove the last input, advance_windowaggregate_base() musn't call the + * inverse transition function, but simply reset transValue back to its + * initial value. + */ + peraggstate->transValueCount++; /* * If pass-by-ref datatype, must copy the new value into aggcontext and @@ -322,7 +368,161 @@ advance_windowaggregate(WindowAggState *winstate, { if (!fcinfo->isnull) { - MemoryContextSwitchTo(winstate->aggcontext); + MemoryContextSwitchTo(peraggstate->aggcontext); + newVal = datumCopy(newVal, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + } + if (!peraggstate->transValueIsNull) + pfree(DatumGetPointer(peraggstate->transValue)); + } + + MemoryContextSwitchTo(oldContext); + peraggstate->transValue = newVal; + peraggstate->transValueIsNull = fcinfo->isnull; +} + +/* + * advance_windowaggregate_base + * Remove the oldest tuple from an aggregation. + * + * This is very much like advance_windowaggregate, except that we will call + * the inverse transition function (which caller must have checked is + * available). + * + * Returns true if we successfully removed the current row from this + * aggregate, false if not (in the latter case, caller is responsible + * for cleaning up by restarting the aggregation). + */ +static bool +advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate) +{ + WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; + int numArguments = perfuncstate->numArguments; + FunctionCallInfoData fcinfodata; + FunctionCallInfo fcinfo = &fcinfodata; + Datum newVal; + ListCell *arg; + int i; + MemoryContext oldContext; + ExprContext *econtext = winstate->tmpcontext; + ExprState *filter = wfuncstate->aggfilter; + + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + /* Skip anything FILTERed out */ + if (filter) + { + bool isnull; + Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL); + + if (isnull || !DatumGetBool(res)) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + + /* We start from 1, since the 0th arg will be the transition value */ + i = 1; + foreach(arg, wfuncstate->args) + { + ExprState *argstate = (ExprState *) lfirst(arg); + + fcinfo->arg[i] = ExecEvalExpr(argstate, econtext, + &fcinfo->argnull[i], NULL); + i++; + } + + if (peraggstate->invtransfn.fn_strict) + { + /* + * For a strict (inv)transfn, nothing happens when there's a NULL + * input; we just keep the prior transValue. Note transValueCount + * doesn't change either. + */ + for (i = 1; i <= numArguments; i++) + { + if (fcinfo->argnull[i]) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + } + + /* There should still be an added but not yet removed value */ + Assert(peraggstate->transValueCount > 0); + + /* + * In moving-aggregate mode, the state must never be NULL, except possibly + * before any rows have been aggregated (which is surely not the case at + * this point). This restriction allows us to interpret a NULL result + * from the inverse function as meaning "sorry, can't do an inverse + * transition in this case". We already checked this in + * advance_windowaggregate, but just for safety, check again. + */ + if (peraggstate->transValueIsNull) + elog(ERROR, "aggregate transition value is NULL before inverse transition"); + + /* + * We mustn't use the inverse transition function to remove the last + * input. Doing so would yield a non-NULL state, whereas we should be in + * the initial state afterwards which may very well be NULL. So instead, + * we simply re-initialize the aggregate in this case. + */ + if (peraggstate->transValueCount == 1) + { + MemoryContextSwitchTo(oldContext); + initialize_windowaggregate(winstate, + &winstate->perfunc[peraggstate->wfuncno], + peraggstate); + return true; + } + + /* + * OK to call the inverse transition function. Set + * winstate->curaggcontext while calling it, for possible use by + * AggCheckCallContext. + */ + InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn), + numArguments + 1, + perfuncstate->winCollation, + (void *) winstate, NULL); + fcinfo->arg[0] = peraggstate->transValue; + fcinfo->argnull[0] = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; + newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * If the function returns NULL, report failure, forcing a restart. + */ + if (fcinfo->isnull) + { + MemoryContextSwitchTo(oldContext); + return false; + } + + /* Update number of rows included in transValue */ + peraggstate->transValueCount--; + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * pfree the prior transValue. But if invtransfn returned a pointer to + * its first input, we don't need to do anything. + * + * Note: the checks for null values here will never fire, but it seems + * best to have this stanza look just like advance_windowaggregate. + */ + if (!peraggstate->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) + { + if (!fcinfo->isnull) + { + MemoryContextSwitchTo(peraggstate->aggcontext); newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); @@ -334,6 +534,8 @@ advance_windowaggregate(WindowAggState *winstate, MemoryContextSwitchTo(oldContext); peraggstate->transValue = newVal; peraggstate->transValueIsNull = fcinfo->isnull; + + return true; } /* @@ -370,7 +572,9 @@ finalize_windowaggregate(WindowAggState *winstate, } else { + winstate->curaggcontext = peraggstate->aggcontext; *result = FunctionCallInvoke(&fcinfo); + winstate->curaggcontext = NULL; *isnull = fcinfo.isnull; } } @@ -396,7 +600,9 @@ finalize_windowaggregate(WindowAggState *winstate, * eval_windowaggregates * evaluate plain aggregates being used as window functions * - * Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be + * This differs from nodeAgg.c in two ways. First, if the window's frame + * start position moves, we use the inverse transition function (if it exists) + * to remove rows from the transition value. And second, we expect to be * able to call aggregate final functions repeatedly after aggregating more * data onto the same transition value. This is not a behavior required by * nodeAgg.c. @@ -406,12 +612,15 @@ eval_windowaggregates(WindowAggState *winstate) { WindowStatePerAgg peraggstate; int wfuncno, - numaggs; - int i; + numaggs, + numaggs_restart, + i; + int64 aggregatedupto_nonrestarted; MemoryContext oldContext; ExprContext *econtext; WindowObject agg_winobj; TupleTableSlot *agg_row_slot; + TupleTableSlot *temp_slot; numaggs = winstate->numaggs; if (numaggs == 0) @@ -421,6 +630,7 @@ eval_windowaggregates(WindowAggState *winstate) econtext = winstate->ss.ps.ps_ExprContext; agg_winobj = winstate->agg_winobj; agg_row_slot = winstate->agg_row_slot; + temp_slot = winstate->temp_slot_1; /* * Currently, we support only a subset of the SQL-standard window framing @@ -438,9 +648,17 @@ eval_windowaggregates(WindowAggState *winstate) * damage the running transition value, but we have the same assumption in * nodeAgg.c too (when it rescans an existing hash table). * - * For other frame start rules, we discard the aggregate state and re-run - * the aggregates whenever the frame head row moves. We can still - * optimize as above whenever successive rows share the same frame head. + * If the frame start does sometimes move, we can still optimize as above + * whenever successive rows share the same frame head, but if the frame + * head moves beyond the previous head we try to remove those rows using + * the aggregate's inverse transition function. This function restores + * the aggregate's current state to what it would be if the removed row + * had never been aggregated in the first place. Inverse transition + * functions may optionally return NULL, indicating that the function was + * unable to remove the tuple from aggregation. If this happens, or if + * the aggregate doesn't have an inverse transition function at all, we + * must perform the aggregation all over again for all tuples within the + * new frame boundaries. * * In many common cases, multiple rows share the same frame and hence the * same aggregate value. (In particular, if there's no ORDER BY in a RANGE @@ -452,75 +670,195 @@ eval_windowaggregates(WindowAggState *winstate) * 'aggregatedupto' keeps track of the first row that has not yet been * accumulated into the aggregate transition values. Whenever we start a * new peer group, we accumulate forward to the end of the peer group. - * - * TODO: Rerunning aggregates from the frame start can be pretty slow. For - * some aggregates like SUM and COUNT we could avoid that by implementing - * a "negative transition function" that would be called for each row as - * it exits the frame. We'd have to think about avoiding recalculation of - * volatile arguments of aggregate functions, too. */ /* * First, update the frame head position. + * + * The frame head should never move backwards, and the code below wouldn't + * cope if it did, so for safety we complain if it does. */ - update_frameheadpos(agg_winobj, winstate->temp_slot_1); + update_frameheadpos(agg_winobj, temp_slot); + if (winstate->frameheadpos < winstate->aggregatedbase) + elog(ERROR, "window frame head moved backward"); /* - * Initialize aggregates on first call for partition, or if the frame head - * position moved since last time. + * If the frame didn't change compared to the previous row, we can re-use + * the result values that were previously saved at the bottom of this + * function. Since we don't know the current frame's end yet, this is not + * possible to check for fully. But if the frame end mode is UNBOUNDED + * FOLLOWING or CURRENT ROW, and the current row lies within the previous + * row's frame, then the two frames' ends must coincide. Note that on the + * first row aggregatedbase == aggregatedupto, meaning this test must + * fail, so we don't need to check the "there was no previous row" case + * explicitly here. */ - if (winstate->currentpos == 0 || - winstate->frameheadpos != winstate->aggregatedbase) + if (winstate->aggregatedbase == winstate->frameheadpos && + (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | + FRAMEOPTION_END_CURRENT_ROW)) && + winstate->aggregatedbase <= winstate->currentpos && + winstate->aggregatedupto > winstate->currentpos) { - /* - * Discard transient aggregate values - */ - MemoryContextResetAndDeleteChildren(winstate->aggcontext); - for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - initialize_windowaggregate(winstate, - &winstate->perfunc[wfuncno], - peraggstate); + econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; + econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; } + return; + } + /*---------- + * Initialize restart flags. + * + * We restart the aggregation: + * - if we're processing the first row in the partition, or + * - if the frame's head moved and we cannot use an inverse + * transition function, or + * - if the new frame doesn't overlap the old one + * + * Note that we don't strictly need to restart in the last case, but if + * we're going to remove all rows from the aggregation anyway, a restart + * surely is faster. + *---------- + */ + numaggs_restart = 0; + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + if (winstate->currentpos == 0 || + (winstate->aggregatedbase != winstate->frameheadpos && + !OidIsValid(peraggstate->invtransfn_oid)) || + winstate->aggregatedupto <= winstate->frameheadpos) + { + peraggstate->restart = true; + numaggs_restart++; + } + else + peraggstate->restart = false; + } + + /* + * If we have any possibly-moving aggregates, attempt to advance + * aggregatedbase to match the frame's head by removing input rows that + * fell off the top of the frame from the aggregations. This can fail, + * i.e. advance_windowaggregate_base() can return false, in which case + * we'll restart that aggregate below. + */ + while (numaggs_restart < numaggs && + winstate->aggregatedbase < winstate->frameheadpos) + { /* - * If we created a mark pointer for aggregates, keep it pushed up to - * frame head, so that tuplestore can discard unnecessary rows. + * Fetch the next tuple of those being removed. This should never fail + * as we should have been here before. */ - if (agg_winobj->markptr >= 0) - WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase, + temp_slot)) + elog(ERROR, "could not re-fetch previously fetched frame row"); + + /* Set tuple context for evaluation of aggregate arguments */ + winstate->tmpcontext->ecxt_outertuple = temp_slot; /* - * Initialize for loop below + * Perform the inverse transition for each aggregate function in the + * window, unless it has already been marked as needing a restart. */ - ExecClearTuple(agg_row_slot); - winstate->aggregatedbase = winstate->frameheadpos; - winstate->aggregatedupto = winstate->frameheadpos; + for (i = 0; i < numaggs; i++) + { + bool ok; + + peraggstate = &winstate->peragg[i]; + if (peraggstate->restart) + continue; + + wfuncno = peraggstate->wfuncno; + ok = advance_windowaggregate_base(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + if (!ok) + { + /* Inverse transition function has failed, must restart */ + peraggstate->restart = true; + numaggs_restart++; + } + } + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(winstate->tmpcontext); + + /* And advance the aggregated-row state */ + winstate->aggregatedbase++; + ExecClearTuple(temp_slot); } /* - * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates - * except when the frame head moves. In END_CURRENT_ROW mode, we only - * have to recalculate when the frame head moves or currentpos has - * advanced past the place we'd aggregated up to. Check for these cases - * and if so, reuse the saved result values. + * If we successfully advanced the base rows of all the aggregates, + * aggregatedbase now equals frameheadpos; but if we failed for any, we + * must forcibly update aggregatedbase. */ - if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | - FRAMEOPTION_END_CURRENT_ROW)) && - winstate->aggregatedbase <= winstate->currentpos && - winstate->aggregatedupto > winstate->currentpos) + winstate->aggregatedbase = winstate->frameheadpos; + + /* + * If we created a mark pointer for aggregates, keep it pushed up to frame + * head, so that tuplestore can discard unnecessary rows. + */ + if (agg_winobj->markptr >= 0) + WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + + /* + * Now restart the aggregates that require it. + * + * We assume that aggregates using the shared context always restart if + * *any* aggregate restarts, and we may thus clean up the shared + * aggcontext if that is the case. Private aggcontexts are reset by + * initialize_windowaggregate() if their owning aggregate restarts. If we + * aren't restarting an aggregate, we need to free any previously saved + * result for it, else we'll leak memory. + */ + if (numaggs_restart > 0) + MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < numaggs; i++) { - for (i = 0; i < numaggs; i++) + peraggstate = &winstate->peragg[i]; + + /* Aggregates using the shared ctx must restart if *any* agg does */ + Assert(peraggstate->aggcontext != winstate->aggcontext || + numaggs_restart == 0 || + peraggstate->restart); + + if (peraggstate->restart) { - peraggstate = &winstate->peragg[i]; wfuncno = peraggstate->wfuncno; - econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; - econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; + initialize_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + } + else if (!peraggstate->resultValueIsNull) + { + if (!peraggstate->resulttypeByVal) + pfree(DatumGetPointer(peraggstate->resultValue)); + peraggstate->resultValue = (Datum) 0; + peraggstate->resultValueIsNull = true; } - return; + } + + /* + * Non-restarted aggregates now contain the rows between aggregatedbase + * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates + * contain no rows. If there are any restarted aggregates, we must thus + * begin aggregating anew at frameheadpos, otherwise we may simply + * continue at aggregatedupto. We must remember the old value of + * aggregatedupto to know how long to skip advancing non-restarted + * aggregates. If we modify aggregatedupto, we must also clear + * agg_row_slot, per the loop invariant below. + */ + aggregatedupto_nonrestarted = winstate->aggregatedupto; + if (numaggs_restart > 0 && + winstate->aggregatedupto != winstate->frameheadpos) + { + winstate->aggregatedupto = winstate->frameheadpos; + ExecClearTuple(agg_row_slot); } /* @@ -551,6 +889,12 @@ eval_windowaggregates(WindowAggState *winstate) for (i = 0; i < numaggs; i++) { peraggstate = &winstate->peragg[i]; + + /* Non-restarted aggs skip until aggregatedupto_nonrestarted */ + if (!peraggstate->restart && + winstate->aggregatedupto < aggregatedupto_nonrestarted) + continue; + wfuncno = peraggstate->wfuncno; advance_windowaggregate(winstate, &winstate->perfunc[wfuncno], @@ -565,6 +909,9 @@ eval_windowaggregates(WindowAggState *winstate) ExecClearTuple(agg_row_slot); } + /* The frame's end is not supposed to move backwards, ever */ + Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); + /* * finalize aggregates and fill result/isnull fields. */ @@ -589,28 +936,14 @@ eval_windowaggregates(WindowAggState *winstate) * advance that the next row can't possibly share the same frame. Is * it worth detecting that and skipping this code? */ - if (!peraggstate->resulttypeByVal) + if (!peraggstate->resulttypeByVal && !*isnull) { - /* - * clear old resultValue in order not to leak memory. (Note: the - * new result can't possibly be the same datum as old resultValue, - * because we never passed it to the trans function.) - */ - if (!peraggstate->resultValueIsNull) - pfree(DatumGetPointer(peraggstate->resultValue)); - - /* - * If pass-by-ref, copy it into our aggregate context. - */ - if (!*isnull) - { - oldContext = MemoryContextSwitchTo(winstate->aggcontext); - peraggstate->resultValue = - datumCopy(*result, - peraggstate->resulttypeByVal, - peraggstate->resulttypeLen); - MemoryContextSwitchTo(oldContext); - } + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); + peraggstate->resultValue = + datumCopy(*result, + peraggstate->resulttypeByVal, + peraggstate->resulttypeLen); + MemoryContextSwitchTo(oldContext); } else { @@ -650,6 +983,8 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, (void *) perfuncstate->winobj, NULL); /* Just in case, make all the regular argument slots be null */ memset(fcinfo.argnull, true, perfuncstate->numArguments); + /* Window functions don't have a current aggregate context, either */ + winstate->curaggcontext = NULL; *result = FunctionCallInvoke(&fcinfo); *isnull = fcinfo.isnull; @@ -870,6 +1205,11 @@ release_partition(WindowAggState *winstate) */ MemoryContextResetAndDeleteChildren(winstate->partcontext); MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < winstate->numaggs; i++) + { + if (winstate->peragg[i].aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext); + } if (winstate->buffer) tuplestore_end(winstate->buffer); @@ -1450,7 +1790,12 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - /* Create mid-lived context for aggregate trans values etc */ + /* + * Create mid-lived context for aggregate trans values etc. + * + * Note that moving aggregates each use their own private context, not + * this one. + */ winstate->aggcontext = AllocSetContextCreate(CurrentMemoryContext, "WindowAgg_Aggregates", @@ -1657,12 +2002,10 @@ void ExecEndWindowAgg(WindowAggState *node) { PlanState *outerPlan; + int i; release_partition(node); - pfree(node->perfunc); - pfree(node->peragg); - ExecClearTuple(node->ss.ss_ScanTupleSlot); ExecClearTuple(node->first_part_slot); ExecClearTuple(node->agg_row_slot); @@ -1676,9 +2019,17 @@ ExecEndWindowAgg(WindowAggState *node) node->ss.ps.ps_ExprContext = node->tmpcontext; ExecFreeExprContext(&node->ss.ps); + for (i = 0; i < node->numaggs; i++) + { + if (node->peragg[i].aggcontext != node->aggcontext) + MemoryContextDelete(node->peragg[i].aggcontext); + } MemoryContextDelete(node->partcontext); MemoryContextDelete(node->aggcontext); + pfree(node->perfunc); + pfree(node->peragg); + outerPlan = outerPlanState(node); ExecEndNode(outerPlan); } @@ -1733,10 +2084,13 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, HeapTuple aggTuple; Form_pg_aggregate aggform; Oid aggtranstype; + AttrNumber initvalAttNo; AclResult aclresult; Oid transfn_oid, + invtransfn_oid, finalfn_oid; Expr *transfnexpr, + *invtransfnexpr, *finalfnexpr; Datum textInitVal; int i; @@ -1757,13 +2111,39 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); /* + * Figure out whether we want to use the moving-aggregate implementation, + * and collect the right set of fields from the pg_attribute entry. + * + * If the frame head can't move, we don't need moving-aggregate code. Even + * if we'd like to use it, don't do so if the aggregate's arguments (and + * FILTER clause if any) contain any calls to volatile functions. + * Otherwise, the difference between restarting and not restarting the + * aggregation would be user-visible. + */ + if (OidIsValid(aggform->aggminvtransfn) && + !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) && + !contain_volatile_functions((Node *) wfunc)) + { + peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; + aggtranstype = aggform->aggmtranstype; + initvalAttNo = Anum_pg_aggregate_aggminitval; + } + else + { + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + aggtranstype = aggform->aggtranstype; + initvalAttNo = Anum_pg_aggregate_agginitval; + } + + /* * ExecInitWindowAgg already checked permission to call aggregate function * ... but we still need to check the component functions */ - peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; - peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; - /* Check that aggregate owner has permission to call component fns */ { HeapTuple procTuple; @@ -1783,6 +2163,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(transfn_oid)); InvokeFunctionExecuteHook(transfn_oid); + + if (OidIsValid(invtransfn_oid)) + { + aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_PROC, + get_func_name(invtransfn_oid)); + InvokeFunctionExecuteHook(invtransfn_oid); + } + if (OidIsValid(finalfn_oid)) { aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, @@ -1796,7 +2187,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, /* resolve actual type of transition state, if polymorphic */ aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid, - aggform->aggtranstype, + aggtranstype, inputTypes, numArguments); @@ -1810,13 +2201,21 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->wintype, wfunc->inputcollid, transfn_oid, + invtransfn_oid, finalfn_oid, &transfnexpr, + &invtransfnexpr, &finalfnexpr); fmgr_info(transfn_oid, &peraggstate->transfn); fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn); + if (OidIsValid(invtransfn_oid)) + { + fmgr_info(invtransfn_oid, &peraggstate->invtransfn); + fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn); + } + if (OidIsValid(finalfn_oid)) { fmgr_info(finalfn_oid, &peraggstate->finalfn); @@ -1834,8 +2233,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * initval is potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. */ - textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, - Anum_pg_aggregate_agginitval, + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo, &peraggstate->initValueIsNull); if (peraggstate->initValueIsNull) @@ -1848,7 +2246,8 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * If the transfn is strict and the initval is NULL, make sure input type * and transtype are the same (or at least binary-compatible), so that * it's OK to use the first input value as the initial transValue. This - * should have been checked at agg definition time, but just in case... + * should have been checked at agg definition time, but we must check + * again in case the transfn's strictness property has been changed. */ if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) { @@ -1860,6 +2259,44 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, wfunc->winfnoid))); } + /* + * Insist that forward and inverse transition functions have the same + * strictness setting. Allowing them to differ would require handling + * more special cases in advance_windowaggregate and + * advance_windowaggregate_base, for no discernible benefit. This should + * have been checked at agg definition time, but we must check again in + * case either function's strictness property has been changed. + */ + if (OidIsValid(invtransfn_oid) && + peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + /* + * Moving aggregates use their own aggcontext. + * + * This is necessary because they might restart at different times, so we + * might never be able to reset the shared context otherwise. We can't + * make it the aggregates' responsibility to clean up after themselves, + * because strict aggregates must be restarted whenever we remove their + * last non-NULL input, which the aggregate won't be aware is happening. + * Also, just pfree()ing the transValue upon restarting wouldn't help, + * since we'd miss any indirectly referenced data. We could, in theory, + * make the memory allocation rules for moving aggregates different than + * they have historically been for plain aggregates, but that seems grotty + * and likely to lead to memory leaks. + */ + if (OidIsValid(invtransfn_oid)) + peraggstate->aggcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg_AggregatePrivate", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + else + peraggstate->aggcontext = winstate->aggcontext; + ReleaseSysCache(aggTuple); return peraggstate; |