aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor')
-rw-r--r--src/backend/executor/nodeAgg.c4
-rw-r--r--src/backend/executor/nodeWindowAgg.c648
2 files changed, 528 insertions, 124 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index ea722f1ee3b..9f748ca6c0a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -71,7 +71,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.172 2010/02/08 20:39:51 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.173 2010/02/12 17:33:19 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -1999,7 +1999,7 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
{
if (aggcontext)
- *aggcontext = ((WindowAggState *) fcinfo->context)->wincontext;
+ *aggcontext = ((WindowAggState *) fcinfo->context)->aggcontext;
return AGG_CONTEXT_WINDOW;
}
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 0b90992b80c..c2c0af3cde2 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -27,7 +27,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.9 2010/01/02 16:57:45 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/executor/nodeWindowAgg.c,v 1.10 2010/02/12 17:33:19 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -165,6 +165,7 @@ static void release_partition(WindowAggState *winstate);
static bool row_is_in_frame(WindowAggState *winstate, int64 pos,
TupleTableSlot *slot);
+static void update_frameheadpos(WindowObject winobj, TupleTableSlot *slot);
static void update_frametailpos(WindowObject winobj, TupleTableSlot *slot);
static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
@@ -193,7 +194,7 @@ initialize_windowaggregate(WindowAggState *winstate,
peraggstate->transValue = peraggstate->initValue;
else
{
- oldContext = MemoryContextSwitchTo(winstate->wincontext);
+ oldContext = MemoryContextSwitchTo(winstate->aggcontext);
peraggstate->transValue = datumCopy(peraggstate->initValue,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -258,10 +259,10 @@ advance_windowaggregate(WindowAggState *winstate,
* already checked that the agg's input type is binary-compatible
* with its transtype, so straight copy here is OK.)
*
- * We must copy the datum into wincontext if it is pass-by-ref. We
+ * We must copy the datum into aggcontext if it is pass-by-ref. We
* do not need to pfree the old transValue, since it's NULL.
*/
- MemoryContextSwitchTo(winstate->wincontext);
+ MemoryContextSwitchTo(winstate->aggcontext);
peraggstate->transValue = datumCopy(fcinfo->arg[1],
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -294,7 +295,7 @@ advance_windowaggregate(WindowAggState *winstate,
newVal = FunctionCallInvoke(fcinfo);
/*
- * If pass-by-ref datatype, must copy the new value into wincontext and
+ * If pass-by-ref datatype, must copy the new value into aggcontext and
* pfree the prior transValue. But if transfn returned a pointer to its
* first input, we don't need to do anything.
*/
@@ -303,7 +304,7 @@ advance_windowaggregate(WindowAggState *winstate,
{
if (!fcinfo->isnull)
{
- MemoryContextSwitchTo(winstate->wincontext);
+ MemoryContextSwitchTo(winstate->aggcontext);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@@ -390,6 +391,7 @@ eval_windowaggregates(WindowAggState *winstate)
int i;
MemoryContext oldContext;
ExprContext *econtext;
+ WindowObject agg_winobj;
TupleTableSlot *agg_row_slot;
numaggs = winstate->numaggs;
@@ -398,10 +400,14 @@ eval_windowaggregates(WindowAggState *winstate)
/* final output execution is in ps_ExprContext */
econtext = winstate->ss.ps.ps_ExprContext;
+ agg_winobj = winstate->agg_winobj;
+ agg_row_slot = winstate->agg_row_slot;
/*
* Currently, we support only a subset of the SQL-standard window framing
- * rules. In all the supported cases, the window frame always consists of
+ * rules.
+ *
+ * If the frame start is UNBOUNDED_PRECEDING, the window frame consists of
* a contiguous group of rows extending forward from the start of the
* partition, and rows only enter the frame, never exit it, as the current
* row advances forward. This makes it possible to use an incremental
@@ -413,6 +419,10 @@ eval_windowaggregates(WindowAggState *winstate)
* damage the running transition value, but we have the same assumption
* in nodeAgg.c too (when it rescans an existing hash table).
*
+ * For other frame start rules, we discard the aggregate state and re-run
+ * the aggregates whenever the frame head row moves. We can still
+ * optimize as above whenever successive rows share the same frame head.
+ *
* In many common cases, multiple rows share the same frame and hence the
* same aggregate value. (In particular, if there's no ORDER BY in a RANGE
* window, then all rows are peers and so they all have window frame equal
@@ -424,63 +434,90 @@ eval_windowaggregates(WindowAggState *winstate)
* accumulated into the aggregate transition values. Whenever we start a
* new peer group, we accumulate forward to the end of the peer group.
*
- * TODO: In the future, we should implement the full SQL-standard set of
- * framing rules. We could implement the other cases by recalculating the
- * aggregates whenever a row exits the frame. That would be pretty slow,
- * though. For aggregates like SUM and COUNT we could implement a
- * "negative transition function" that would be called for each row as it
- * exits the frame. We'd have to think about avoiding recalculation of
- * volatile arguments of aggregate functions, too.
+ * TODO: Rerunning aggregates from the frame start can be pretty slow.
+ * For some aggregates like SUM and COUNT we could avoid that by
+ * implementing a "negative transition function" that would be called for
+ * each row as it exits the frame. We'd have to think about avoiding
+ * recalculation of volatile arguments of aggregate functions, too.
*/
/*
- * If we've already aggregated up through current row, reuse the saved
- * result values. NOTE: this test works for the currently supported
- * framing rules, but will need fixing when more are added.
+ * First, update the frame head position.
*/
- if (winstate->aggregatedupto > winstate->currentpos)
+ update_frameheadpos(agg_winobj, winstate->temp_slot_1);
+
+ /*
+ * Initialize aggregates on first call for partition, or if the frame
+ * head position moved since last time.
+ */
+ if (winstate->currentpos == 0 ||
+ winstate->frameheadpos != winstate->aggregatedbase)
{
+ /*
+ * Discard transient aggregate values
+ */
+ MemoryContextResetAndDeleteChildren(winstate->aggcontext);
+
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
- econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
- econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
+ initialize_windowaggregate(winstate,
+ &winstate->perfunc[wfuncno],
+ peraggstate);
}
- return;
+
+ /*
+ * If we created a mark pointer for aggregates, keep it pushed up
+ * to frame head, so that tuplestore can discard unnecessary rows.
+ */
+ if (agg_winobj->markptr >= 0)
+ WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
+
+ /*
+ * Initialize for loop below
+ */
+ ExecClearTuple(agg_row_slot);
+ winstate->aggregatedbase = winstate->frameheadpos;
+ winstate->aggregatedupto = winstate->frameheadpos;
}
- /* Initialize aggregates on first call for partition */
- if (winstate->currentpos == 0)
+ /*
+ * In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
+ * except when the frame head moves. In END_CURRENT_ROW mode, we only
+ * have to recalculate when the frame head moves or currentpos has advanced
+ * past the place we'd aggregated up to. Check for these cases and if
+ * so, reuse the saved result values.
+ */
+ if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
+ FRAMEOPTION_END_CURRENT_ROW)) &&
+ winstate->aggregatedbase <= winstate->currentpos &&
+ winstate->aggregatedupto > winstate->currentpos)
{
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
- initialize_windowaggregate(winstate,
- &winstate->perfunc[wfuncno],
- peraggstate);
+ econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
+ econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
}
+ return;
}
/*
* Advance until we reach a row not in frame (or end of partition).
*
* Note the loop invariant: agg_row_slot is either empty or holds the row
- * at position aggregatedupto. The agg_ptr read pointer must always point
- * to the next row to read into agg_row_slot.
+ * at position aggregatedupto. We advance aggregatedupto after processing
+ * a row.
*/
- agg_row_slot = winstate->agg_row_slot;
for (;;)
{
/* Fetch next row if we didn't already */
if (TupIsNull(agg_row_slot))
{
- spool_tuples(winstate, winstate->aggregatedupto);
- tuplestore_select_read_pointer(winstate->buffer,
- winstate->agg_ptr);
- if (!tuplestore_gettupleslot(winstate->buffer, true, true,
- agg_row_slot))
+ if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
+ agg_row_slot))
break; /* must be end of partition */
}
@@ -544,11 +581,11 @@ eval_windowaggregates(WindowAggState *winstate)
pfree(DatumGetPointer(peraggstate->resultValue));
/*
- * If pass-by-ref, copy it into our global context.
+ * If pass-by-ref, copy it into our aggregate context.
*/
if (!*isnull)
{
- oldContext = MemoryContextSwitchTo(winstate->wincontext);
+ oldContext = MemoryContextSwitchTo(winstate->aggcontext);
peraggstate->resultValue =
datumCopy(*result,
peraggstate->resulttypeByVal,
@@ -624,11 +661,12 @@ begin_partition(WindowAggState *winstate)
int i;
winstate->partition_spooled = false;
+ winstate->framehead_valid = false;
winstate->frametail_valid = false;
winstate->spooled_rows = 0;
winstate->currentpos = 0;
+ winstate->frameheadpos = 0;
winstate->frametailpos = -1;
- winstate->aggregatedupto = 0;
ExecClearTuple(winstate->agg_row_slot);
/*
@@ -654,18 +692,39 @@ begin_partition(WindowAggState *winstate)
winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
/*
- * Set up read pointers for the tuplestore. The current and agg pointers
- * don't need BACKWARD capability, but the per-window-function read
- * pointers do.
+ * Set up read pointers for the tuplestore. The current pointer doesn't
+ * need BACKWARD capability, but the per-window-function read pointers do,
+ * and the aggregate pointer does if frame start is movable.
*/
winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */
/* reset default REWIND capability bit for current ptr */
tuplestore_set_eflags(winstate->buffer, 0);
- /* create a read pointer for aggregates, if needed */
+ /* create read pointers for aggregates, if needed */
if (winstate->numaggs > 0)
- winstate->agg_ptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
+ {
+ WindowObject agg_winobj = winstate->agg_winobj;
+ int readptr_flags = 0;
+
+ /* If the frame head is potentially movable ... */
+ if (!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
+ {
+ /* ... create a mark pointer to track the frame head */
+ agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0);
+ /* and the read pointer will need BACKWARD capability */
+ readptr_flags |= EXEC_FLAG_BACKWARD;
+ }
+
+ agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
+ readptr_flags);
+ agg_winobj->markpos = -1;
+ agg_winobj->seekpos = -1;
+
+ /* Also reset the row counters for aggregates */
+ winstate->aggregatedbase = 0;
+ winstate->aggregatedupto = 0;
+ }
/* create mark and read pointers for each real window function */
for (i = 0; i < numfuncs; i++)
@@ -694,8 +753,8 @@ begin_partition(WindowAggState *winstate)
}
/*
- * Read tuples from the outer node, up to position 'pos', and store them
- * into the tuplestore. If pos is -1, reads the whole partition.
+ * Read tuples from the outer node, up to and including position 'pos', and
+ * store them into the tuplestore. If pos is -1, reads the whole partition.
*/
static void
spool_tuples(WindowAggState *winstate, int64 pos)
@@ -789,7 +848,8 @@ release_partition(WindowAggState *winstate)
* any aggregate temp data). We don't rely on retail pfree because some
* aggregates might have allocated data we don't have direct pointers to.
*/
- MemoryContextResetAndDeleteChildren(winstate->wincontext);
+ MemoryContextResetAndDeleteChildren(winstate->partcontext);
+ MemoryContextResetAndDeleteChildren(winstate->aggcontext);
if (winstate->buffer)
tuplestore_end(winstate->buffer);
@@ -809,108 +869,303 @@ release_partition(WindowAggState *winstate)
static bool
row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot)
{
- WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
- int frameOptions = node->frameOptions;
+ int frameOptions = winstate->frameOptions;
Assert(pos >= 0); /* else caller error */
- /* We only support frame start mode UNBOUNDED PRECEDING for now */
- Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
+ /* First, check frame starting conditions */
+ if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* rows before current row are out of frame */
+ if (pos < winstate->currentpos)
+ return false;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* preceding row that is not peer is out of frame */
+ if (pos < winstate->currentpos &&
+ !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
+ return false;
+ }
+ else
+ Assert(false);
+ }
+ else if (frameOptions & FRAMEOPTION_START_VALUE)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ int64 offset = DatumGetInt64(winstate->startOffsetValue);
- /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
- if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
- return true;
+ /* rows before current row + offset are out of frame */
+ if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
+ offset = -offset;
- /* Else frame tail mode must be CURRENT ROW */
- Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
+ if (pos < winstate->currentpos + offset)
+ return false;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* parser should have rejected this */
+ elog(ERROR, "window frame with value offset is not implemented");
+ }
+ else
+ Assert(false);
+ }
- /* if row is current row or a predecessor, it must be in frame */
- if (pos <= winstate->currentpos)
- return true;
+ /* Okay so far, now check frame ending conditions */
+ if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* rows after current row are out of frame */
+ if (pos > winstate->currentpos)
+ return false;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* following row that is not peer is out of frame */
+ if (pos > winstate->currentpos &&
+ !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
+ return false;
+ }
+ else
+ Assert(false);
+ }
+ else if (frameOptions & FRAMEOPTION_END_VALUE)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ int64 offset = DatumGetInt64(winstate->endOffsetValue);
- /* In ROWS mode, *only* such rows are in frame */
- if (frameOptions & FRAMEOPTION_ROWS)
- return false;
+ /* rows after current row + offset are out of frame */
+ if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
+ offset = -offset;
- /* Else must be RANGE mode */
- Assert(frameOptions & FRAMEOPTION_RANGE);
+ if (pos > winstate->currentpos + offset)
+ return false;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* parser should have rejected this */
+ elog(ERROR, "window frame with value offset is not implemented");
+ }
+ else
+ Assert(false);
+ }
- /* In frame iff it's a peer of current row */
- return are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot);
+ /* If we get here, it's in frame */
+ return true;
}
/*
- * update_frametailpos
- * make frametailpos valid for the current row
+ * update_frameheadpos
+ * make frameheadpos valid for the current row
*
- * Uses the winobj's read pointer for any required fetches; the winobj's
- * mark must not be past the currently known frame tail. Also uses the
- * specified slot for any required fetches.
+ * Uses the winobj's read pointer for any required fetches; hence, if the
+ * frame mode is one that requires row comparisons, the winobj's mark must
+ * not be past the currently known frame head. Also uses the specified slot
+ * for any required fetches.
*/
static void
-update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
+update_frameheadpos(WindowObject winobj, TupleTableSlot *slot)
{
WindowAggState *winstate = winobj->winstate;
WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
- int frameOptions = node->frameOptions;
- int64 ftnext;
+ int frameOptions = winstate->frameOptions;
- if (winstate->frametail_valid)
+ if (winstate->framehead_valid)
return; /* already known for current row */
- /* We only support frame start mode UNBOUNDED PRECEDING for now */
- Assert(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING);
-
- /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
- if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
+ if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
{
- spool_tuples(winstate, -1);
- winstate->frametailpos = winstate->spooled_rows - 1;
- winstate->frametail_valid = true;
- return;
+ /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */
+ winstate->frameheadpos = 0;
+ winstate->framehead_valid = true;
}
+ else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* In ROWS mode, frame head is the same as current */
+ winstate->frameheadpos = winstate->currentpos;
+ winstate->framehead_valid = true;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ int64 fhprev;
- /* Else frame tail mode must be CURRENT ROW */
- Assert(frameOptions & FRAMEOPTION_END_CURRENT_ROW);
+ /* If no ORDER BY, all rows are peers with each other */
+ if (node->ordNumCols == 0)
+ {
+ winstate->frameheadpos = 0;
+ winstate->framehead_valid = true;
+ return;
+ }
- /* In ROWS mode, exactly the rows up to current are in frame */
- if (frameOptions & FRAMEOPTION_ROWS)
+ /*
+ * In RANGE START_CURRENT mode, frame head is the first row that
+ * is a peer of current row. We search backwards from current,
+ * which could be a bit inefficient if peer sets are large.
+ * Might be better to have a separate read pointer that moves
+ * forward tracking the frame head.
+ */
+ fhprev = winstate->currentpos - 1;
+ for (;;)
+ {
+ /* assume the frame head can't go backwards */
+ if (fhprev < winstate->frameheadpos)
+ break;
+ if (!window_gettupleslot(winobj, fhprev, slot))
+ break; /* start of partition */
+ if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
+ break; /* not peer of current row */
+ fhprev--;
+ }
+ winstate->frameheadpos = fhprev + 1;
+ winstate->framehead_valid = true;
+ }
+ else
+ Assert(false);
+ }
+ else if (frameOptions & FRAMEOPTION_START_VALUE)
{
- winstate->frametailpos = winstate->currentpos;
- winstate->frametail_valid = true;
- return;
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* In ROWS mode, bound is physically n before/after current */
+ int64 offset = DatumGetInt64(winstate->startOffsetValue);
+
+ if (frameOptions & FRAMEOPTION_START_VALUE_PRECEDING)
+ offset = -offset;
+
+ winstate->frameheadpos = winstate->currentpos + offset;
+ /* frame head can't go before first row */
+ if (winstate->frameheadpos < 0)
+ winstate->frameheadpos = 0;
+ else if (winstate->frameheadpos > winstate->currentpos)
+ {
+ /* make sure frameheadpos is not past end of partition */
+ spool_tuples(winstate, winstate->frameheadpos - 1);
+ if (winstate->frameheadpos > winstate->spooled_rows)
+ winstate->frameheadpos = winstate->spooled_rows;
+ }
+ winstate->framehead_valid = true;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* parser should have rejected this */
+ elog(ERROR, "window frame with value offset is not implemented");
+ }
+ else
+ Assert(false);
}
+ else
+ Assert(false);
+}
- /* Else must be RANGE mode */
- Assert(frameOptions & FRAMEOPTION_RANGE);
+/*
+ * update_frametailpos
+ * make frametailpos valid for the current row
+ *
+ * Uses the winobj's read pointer for any required fetches; hence, if the
+ * frame mode is one that requires row comparisons, the winobj's mark must
+ * not be past the currently known frame tail. Also uses the specified slot
+ * for any required fetches.
+ */
+static void
+update_frametailpos(WindowObject winobj, TupleTableSlot *slot)
+{
+ WindowAggState *winstate = winobj->winstate;
+ WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan;
+ int frameOptions = winstate->frameOptions;
- /* If no ORDER BY, all rows are peers with each other */
- if (node->ordNumCols == 0)
+ if (winstate->frametail_valid)
+ return; /* already known for current row */
+
+ if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
{
+ /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */
spool_tuples(winstate, -1);
winstate->frametailpos = winstate->spooled_rows - 1;
winstate->frametail_valid = true;
- return;
}
+ else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW)
+ {
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* In ROWS mode, exactly the rows up to current are in frame */
+ winstate->frametailpos = winstate->currentpos;
+ winstate->frametail_valid = true;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ int64 ftnext;
- /*
- * Else we have to search for the first non-peer of the current row. We
- * assume the current value of frametailpos is a lower bound on the
- * possible frame tail location, ie, frame tail never goes backward, and
- * that currentpos is also a lower bound, ie, current row is always in
- * frame.
- */
- ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
- for (;;)
+ /* If no ORDER BY, all rows are peers with each other */
+ if (node->ordNumCols == 0)
+ {
+ spool_tuples(winstate, -1);
+ winstate->frametailpos = winstate->spooled_rows - 1;
+ winstate->frametail_valid = true;
+ return;
+ }
+
+ /*
+ * Else we have to search for the first non-peer of the current
+ * row. We assume the current value of frametailpos is a lower
+ * bound on the possible frame tail location, ie, frame tail never
+ * goes backward, and that currentpos is also a lower bound, ie,
+ * frame end always >= current row.
+ */
+ ftnext = Max(winstate->frametailpos, winstate->currentpos) + 1;
+ for (;;)
+ {
+ if (!window_gettupleslot(winobj, ftnext, slot))
+ break; /* end of partition */
+ if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
+ break; /* not peer of current row */
+ ftnext++;
+ }
+ winstate->frametailpos = ftnext - 1;
+ winstate->frametail_valid = true;
+ }
+ else
+ Assert(false);
+ }
+ else if (frameOptions & FRAMEOPTION_END_VALUE)
{
- if (!window_gettupleslot(winobj, ftnext, slot))
- break; /* end of partition */
- if (!are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot))
- break; /* not peer of current row */
- ftnext++;
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* In ROWS mode, bound is physically n before/after current */
+ int64 offset = DatumGetInt64(winstate->endOffsetValue);
+
+ if (frameOptions & FRAMEOPTION_END_VALUE_PRECEDING)
+ offset = -offset;
+
+ winstate->frametailpos = winstate->currentpos + offset;
+ /* smallest allowable value of frametailpos is -1 */
+ if (winstate->frametailpos < 0)
+ winstate->frametailpos = -1;
+ else if (winstate->frametailpos > winstate->currentpos)
+ {
+ /* make sure frametailpos is not past last row of partition */
+ spool_tuples(winstate, winstate->frametailpos);
+ if (winstate->frametailpos >= winstate->spooled_rows)
+ winstate->frametailpos = winstate->spooled_rows - 1;
+ }
+ winstate->frametail_valid = true;
+ }
+ else if (frameOptions & FRAMEOPTION_RANGE)
+ {
+ /* parser should have rejected this */
+ elog(ERROR, "window frame with value offset is not implemented");
+ }
+ else
+ Assert(false);
}
- winstate->frametailpos = ftnext - 1;
- winstate->frametail_valid = true;
+ else
+ Assert(false);
}
@@ -953,6 +1208,73 @@ ExecWindowAgg(WindowAggState *winstate)
winstate->ss.ps.ps_TupFromTlist = false;
}
+ /*
+ * Compute frame offset values, if any, during first call.
+ */
+ if (winstate->all_first)
+ {
+ int frameOptions = winstate->frameOptions;
+ ExprContext *econtext = winstate->ss.ps.ps_ExprContext;
+ Datum value;
+ bool isnull;
+ int16 len;
+ bool byval;
+
+ if (frameOptions & FRAMEOPTION_START_VALUE)
+ {
+ Assert(winstate->startOffset != NULL);
+ value = ExecEvalExprSwitchContext(winstate->startOffset,
+ econtext,
+ &isnull,
+ NULL);
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("frame starting offset must not be NULL")));
+ /* copy value into query-lifespan context */
+ get_typlenbyval(exprType((Node *) winstate->startOffset->expr),
+ &len, &byval);
+ winstate->startOffsetValue = datumCopy(value, byval, len);
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* value is known to be int8 */
+ int64 offset = DatumGetInt64(value);
+
+ if (offset < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("frame starting offset must not be negative")));
+ }
+ }
+ if (frameOptions & FRAMEOPTION_END_VALUE)
+ {
+ Assert(winstate->endOffset != NULL);
+ value = ExecEvalExprSwitchContext(winstate->endOffset,
+ econtext,
+ &isnull,
+ NULL);
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("frame ending offset must not be NULL")));
+ /* copy value into query-lifespan context */
+ get_typlenbyval(exprType((Node *) winstate->endOffset->expr),
+ &len, &byval);
+ winstate->endOffsetValue = datumCopy(value, byval, len);
+ if (frameOptions & FRAMEOPTION_ROWS)
+ {
+ /* value is known to be int8 */
+ int64 offset = DatumGetInt64(value);
+
+ if (offset < 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("frame ending offset must not be negative")));
+ }
+ }
+ winstate->all_first = false;
+ }
+
restart:
if (winstate->buffer == NULL)
{
@@ -964,7 +1286,8 @@ restart:
{
/* Advance current row within partition */
winstate->currentpos++;
- /* This might mean that the frame tail moves, too */
+ /* This might mean that the frame moves, too */
+ winstate->framehead_valid = false;
winstate->frametail_valid = false;
}
@@ -1099,10 +1422,18 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
winstate->tmpcontext = tmpcontext;
ExecAssignExprContext(estate, &winstate->ss.ps);
- /* Create long-lived context for storage of aggregate transvalues etc */
- winstate->wincontext =
+ /* Create long-lived context for storage of partition-local memory etc */
+ winstate->partcontext =
AllocSetContextCreate(CurrentMemoryContext,
- "WindowAggContext",
+ "WindowAgg_Partition",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /* Create mid-lived context for aggregate trans values etc */
+ winstate->aggcontext =
+ AllocSetContextCreate(CurrentMemoryContext,
+ "WindowAgg_Aggregates",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@@ -1229,7 +1560,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
perfuncstate->numArguments = list_length(wfuncstate->args);
fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo,
- tmpcontext->ecxt_per_query_memory);
+ econtext->ecxt_per_query_memory);
perfuncstate->flinfo.fn_expr = (Node *) wfunc;
get_typlenbyval(wfunc->wintype,
&perfuncstate->resulttypeLen,
@@ -1264,6 +1595,30 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
winstate->numfuncs = wfuncno + 1;
winstate->numaggs = aggno + 1;
+ /* Set up WindowObject for aggregates, if needed */
+ if (winstate->numaggs > 0)
+ {
+ WindowObject agg_winobj = makeNode(WindowObjectData);
+
+ agg_winobj->winstate = winstate;
+ agg_winobj->argstates = NIL;
+ agg_winobj->localmem = NULL;
+ /* make sure markptr = -1 to invalidate. It may not get used */
+ agg_winobj->markptr = -1;
+ agg_winobj->readptr = -1;
+ winstate->agg_winobj = agg_winobj;
+ }
+
+ /* copy frame options to state node for easy access */
+ winstate->frameOptions = node->frameOptions;
+
+ /* initialize frame bound offset expressions */
+ winstate->startOffset = ExecInitExpr((Expr *) node->startOffset,
+ (PlanState *) winstate);
+ winstate->endOffset = ExecInitExpr((Expr *) node->endOffset,
+ (PlanState *) winstate);
+
+ winstate->all_first = true;
winstate->partition_spooled = false;
winstate->more_partitions = false;
@@ -1297,7 +1652,8 @@ ExecEndWindowAgg(WindowAggState *node)
node->ss.ps.ps_ExprContext = node->tmpcontext;
ExecFreeExprContext(&node->ss.ps);
- MemoryContextDelete(node->wincontext);
+ MemoryContextDelete(node->partcontext);
+ MemoryContextDelete(node->aggcontext);
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);
@@ -1315,6 +1671,7 @@ ExecReScanWindowAgg(WindowAggState *node, ExprContext *exprCtxt)
node->all_done = false;
node->ss.ps.ps_TupFromTlist = false;
+ node->all_first = true;
/* release tuplestore et al */
release_partition(node);
@@ -1566,7 +1923,7 @@ window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot)
* There's no API to refetch the tuple at the current position. We have to
* move one tuple forward, and then one backward. (We don't do it the
* other way because we might try to fetch the row before our mark, which
- * isn't allowed.)
+ * isn't allowed.) XXX this case could stand to be optimized.
*/
if (winobj->seekpos == pos)
{
@@ -1616,8 +1973,8 @@ WinGetPartitionLocalMemory(WindowObject winobj, Size sz)
{
Assert(WindowObjectIsValid(winobj));
if (winobj->localmem == NULL)
- winobj->localmem = MemoryContextAllocZero(winobj->winstate->wincontext,
- sz);
+ winobj->localmem =
+ MemoryContextAllocZero(winobj->winstate->partcontext, sz);
return winobj->localmem;
}
@@ -1791,7 +2148,30 @@ WinGetFuncArgInPartition(WindowObject winobj, int argno,
if (isout)
*isout = false;
if (set_mark)
- WinSetMarkPosition(winobj, abs_pos);
+ {
+ int frameOptions = winstate->frameOptions;
+ int64 mark_pos = abs_pos;
+
+ /*
+ * In RANGE mode with a moving frame head, we must not let the
+ * mark advance past frameheadpos, since that row has to be
+ * fetchable during future update_frameheadpos calls.
+ *
+ * XXX it is very ugly to pollute window functions' marks with
+ * this consideration; it could for instance mask a logic bug
+ * that lets a window function fetch rows before what it had
+ * claimed was its mark. Perhaps use a separate mark for
+ * frame head probes?
+ */
+ if ((frameOptions & FRAMEOPTION_RANGE) &&
+ !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
+ {
+ update_frameheadpos(winobj, winstate->temp_slot_2);
+ if (mark_pos > winstate->frameheadpos)
+ mark_pos = winstate->frameheadpos;
+ }
+ WinSetMarkPosition(winobj, mark_pos);
+ }
econtext->ecxt_outertuple = slot;
return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
econtext, isnull, NULL);
@@ -1838,7 +2218,8 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno,
abs_pos = winstate->currentpos + relpos;
break;
case WINDOW_SEEK_HEAD:
- abs_pos = relpos;
+ update_frameheadpos(winobj, slot);
+ abs_pos = winstate->frameheadpos + relpos;
break;
case WINDOW_SEEK_TAIL:
update_frametailpos(winobj, slot);
@@ -1866,7 +2247,30 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno,
if (isout)
*isout = false;
if (set_mark)
- WinSetMarkPosition(winobj, abs_pos);
+ {
+ int frameOptions = winstate->frameOptions;
+ int64 mark_pos = abs_pos;
+
+ /*
+ * In RANGE mode with a moving frame head, we must not let the
+ * mark advance past frameheadpos, since that row has to be
+ * fetchable during future update_frameheadpos calls.
+ *
+ * XXX it is very ugly to pollute window functions' marks with
+ * this consideration; it could for instance mask a logic bug
+ * that lets a window function fetch rows before what it had
+ * claimed was its mark. Perhaps use a separate mark for
+ * frame head probes?
+ */
+ if ((frameOptions & FRAMEOPTION_RANGE) &&
+ !(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
+ {
+ update_frameheadpos(winobj, winstate->temp_slot_2);
+ if (mark_pos > winstate->frameheadpos)
+ mark_pos = winstate->frameheadpos;
+ }
+ WinSetMarkPosition(winobj, mark_pos);
+ }
econtext->ecxt_outertuple = slot;
return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno),
econtext, isnull, NULL);