diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/nodeAgg.c | 466 |
1 files changed, 358 insertions, 108 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 00234f0e236..b4cc1e553b3 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -13,6 +13,10 @@ * If a finalfunc is not supplied then the result is just the ending * value of transvalue. * + * If an aggregate call specifies DISTINCT or ORDER BY, we sort the input + * tuples and eliminate duplicates (if required) before performing the + * above-depicted process. + * * If transfunc is marked "strict" in pg_proc and initcond is NULL, * then the first non-NULL input_value is assigned directly to transvalue, * and transfunc isn't applied until the second non-NULL input_value. @@ -63,7 +67,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.169 2009/09/27 21:10:53 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/executor/nodeAgg.c,v 1.170 2009/12/15 17:57:46 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -78,9 +82,9 @@ #include "miscadmin.h" #include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" +#include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" -#include "parser/parse_oper.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -104,9 +108,12 @@ typedef struct AggStatePerAggData AggrefExprState *aggrefstate; Aggref *aggref; - /* number of input arguments for aggregate */ + /* number of input arguments for aggregate function proper */ int numArguments; + /* number of inputs including ORDER BY expressions */ + int numInputs; + /* Oids of transfer functions */ Oid transfn_oid; Oid finalfn_oid; /* may be InvalidOid */ @@ -119,19 +126,24 @@ typedef struct AggStatePerAggData FmgrInfo transfn; FmgrInfo finalfn; - /* - * Type of input data and Oid of sort operator to use for it; only - * set/used when aggregate has DISTINCT flag. (These are not used - * directly by nodeAgg, but must be passed to the Tuplesort object.) - */ - Oid inputType; - Oid sortOperator; + /* number of sorting columns */ + int numSortCols; + + /* number of sorting columns to consider in DISTINCT comparisons */ + /* (this is either zero or the same as numSortCols) */ + int numDistinctCols; + + /* deconstructed sorting information (arrays of length numSortCols) */ + AttrNumber *sortColIdx; + Oid *sortOperators; + bool *sortNullsFirst; /* - * fmgr lookup data for input type's equality operator --- only set/used - * when aggregate has DISTINCT flag. + * fmgr lookup data for input columns' equality operators --- only + * set/used when aggregate has DISTINCT flag. Note that these are in + * order of sort column index, not parameter index. */ - FmgrInfo equalfn; + FmgrInfo *equalfns; /* array of length numDistinctCols */ /* * initial value from pg_aggregate entry @@ -142,6 +154,9 @@ typedef struct AggStatePerAggData /* * We need the len and byval info for the agg's input, result, and * transition data types in order to know how to copy/delete values. + * + * Note that the info for the input type is used only when handling + * DISTINCT aggs with just one argument, so there is only one input type. */ int16 inputtypeLen, resulttypeLen, @@ -151,17 +166,34 @@ typedef struct AggStatePerAggData transtypeByVal; /* + * Stuff for evaluation of inputs. We used to just use ExecEvalExpr, but + * with the addition of ORDER BY we now need at least a slot for passing + * data to the sort object, which requires a tupledesc, so we might as + * well go whole hog and use ExecProject too. + */ + TupleDesc evaldesc; /* descriptor of input tuples */ + ProjectionInfo *evalproj; /* projection machinery */ + + /* + * Slots for holding the evaluated input arguments. These are set up + * during ExecInitAgg() and then used for each input row. + */ + TupleTableSlot *evalslot; /* current input tuple */ + TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */ + + /* * These values are working state that is initialized at the start of an * input tuple group and updated for each input tuple. * - * For a simple (non DISTINCT) aggregate, we just feed the input values - * straight to the transition function. If it's DISTINCT, we pass the - * input values into a Tuplesort object; then at completion of the input - * tuple group, we scan the sorted values, eliminate duplicates, and run - * the transition function on the rest. + * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input + * values straight to the transition function. If it's DISTINCT or + * requires ORDER BY, we pass the input values into a Tuplesort object; + * then at completion of the input tuple group, we scan the sorted values, + * eliminate duplicates if needed, and run the transition function on the + * rest. */ - Tuplesortstate *sortstate; /* sort object, if a DISTINCT agg */ + Tuplesortstate *sortstate; /* sort object, if DISTINCT or ORDER BY */ } AggStatePerAggData; /* @@ -220,7 +252,10 @@ static void advance_transition_function(AggState *aggstate, AggStatePerGroup pergroupstate, FunctionCallInfoData *fcinfo); static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup); -static void process_sorted_aggregate(AggState *aggstate, +static void process_ordered_aggregate_single(AggState *aggstate, + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate); +static void process_ordered_aggregate_multi(AggState *aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate); static void finalize_aggregate(AggState *aggstate, @@ -254,12 +289,11 @@ initialize_aggregates(AggState *aggstate, { AggStatePerAgg peraggstate = &peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; - Aggref *aggref = peraggstate->aggref; /* - * Start a fresh sort operation for each DISTINCT aggregate. + * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate. */ - if (aggref->aggdistinct) + if (peraggstate->numSortCols > 0) { /* * In case of rescan, maybe there could be an uncompleted sort @@ -268,10 +302,23 @@ initialize_aggregates(AggState *aggstate, if (peraggstate->sortstate) tuplesort_end(peraggstate->sortstate); + /* + * We use a plain Datum sorter when there's a single input + * column; otherwise sort the full tuple. (See comments for + * process_ordered_aggregate_single.) + */ peraggstate->sortstate = - tuplesort_begin_datum(peraggstate->inputType, - peraggstate->sortOperator, false, - work_mem, false); + (peraggstate->numInputs == 1) ? + tuplesort_begin_datum(peraggstate->evaldesc->attrs[0]->atttypid, + peraggstate->sortOperators[0], + peraggstate->sortNullsFirst[0], + work_mem, false) : + tuplesort_begin_heap(peraggstate->evaldesc, + peraggstate->numSortCols, + peraggstate->sortColIdx, + peraggstate->sortOperators, + peraggstate->sortNullsFirst, + work_mem, false); } /* @@ -418,78 +465,110 @@ advance_transition_function(AggState *aggstate, static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup) { - ExprContext *econtext = aggstate->tmpcontext; int aggno; for (aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; - AggrefExprState *aggrefstate = peraggstate->aggrefstate; - Aggref *aggref = peraggstate->aggref; - FunctionCallInfoData fcinfo; + int nargs = peraggstate->numArguments; int i; - ListCell *arg; - MemoryContext oldContext; + TupleTableSlot *slot; - /* Switch memory context just once for all args */ - oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + /* Evaluate the current input expressions for this aggregate */ + slot = ExecProject(peraggstate->evalproj, NULL); - /* Evaluate inputs and save in fcinfo */ - /* We start from 1, since the 0th arg will be the transition value */ - i = 1; - foreach(arg, aggrefstate->args) + if (peraggstate->numSortCols > 0) { - ExprState *argstate = (ExprState *) lfirst(arg); - - fcinfo.arg[i] = ExecEvalExpr(argstate, econtext, - fcinfo.argnull + i, NULL); - i++; - } + /* DISTINCT and/or ORDER BY case */ + Assert(slot->tts_nvalid == peraggstate->numInputs); - /* Switch back */ - MemoryContextSwitchTo(oldContext); + /* + * If the transfn is strict, we want to check for nullity + * before storing the row in the sorter, to save space if + * there are a lot of nulls. Note that we must only check + * numArguments columns, not numInputs, since nullity in + * columns used only for sorting is not relevant here. + */ + if (peraggstate->transfn.fn_strict) + { + for (i = 0; i < nargs; i++) + { + if (slot->tts_isnull[i]) + break; + } + if (i < nargs) + continue; + } - if (aggref->aggdistinct) - { - /* in DISTINCT mode, we may ignore nulls */ - /* XXX we assume there is only one input column */ - if (fcinfo.argnull[1]) - continue; - tuplesort_putdatum(peraggstate->sortstate, fcinfo.arg[1], - fcinfo.argnull[1]); + /* OK, put the tuple into the tuplesort object */ + if (peraggstate->numInputs == 1) + tuplesort_putdatum(peraggstate->sortstate, + slot->tts_values[0], + slot->tts_isnull[0]); + else + tuplesort_puttupleslot(peraggstate->sortstate, slot); } else { + /* We can apply the transition function immediately */ + FunctionCallInfoData fcinfo; + + /* Load values into fcinfo */ + /* Start from 1, since the 0th arg will be the transition value */ + Assert(slot->tts_nvalid >= nargs); + for (i = 0; i < nargs; i++) + { + fcinfo.arg[i + 1] = slot->tts_values[i]; + fcinfo.argnull[i + 1] = slot->tts_isnull[i]; + } + advance_transition_function(aggstate, peraggstate, pergroupstate, &fcinfo); } } } + /* - * Run the transition function for a DISTINCT aggregate. This is called - * after we have completed entering all the input values into the sort - * object. We complete the sort, read out the values in sorted order, - * and run the transition function on each non-duplicate value. + * Run the transition function for a DISTINCT or ORDER BY aggregate + * with only one input. This is called after we have completed + * entering all the input values into the sort object. We complete the + * sort, read out the values in sorted order, and run the transition + * function on each value (applying DISTINCT if appropriate). + * + * Note that the strictness of the transition function was checked when + * entering the values into the sort, so we don't check it again here; + * we just apply standard SQL DISTINCT logic. + * + * The one-input case is handled separately from the multi-input case + * for performance reasons: for single by-value inputs, such as the + * common case of count(distinct id), the tuplesort_getdatum code path + * is around 300% faster. (The speedup for by-reference types is less + * but still noticeable.) * * When called, CurrentMemoryContext should be the per-query context. */ static void -process_sorted_aggregate(AggState *aggstate, - AggStatePerAgg peraggstate, - AggStatePerGroup pergroupstate) +process_ordered_aggregate_single(AggState *aggstate, + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate) { Datum oldVal = (Datum) 0; + bool oldIsNull = true; bool haveOldVal = false; MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; MemoryContext oldContext; + bool isDistinct = (peraggstate->numDistinctCols > 0); Datum *newVal; bool *isNull; FunctionCallInfoData fcinfo; + Assert(peraggstate->numDistinctCols < 2); + tuplesort_performsort(peraggstate->sortstate); + /* Load the column into argument 1 (arg 0 will be transition value) */ newVal = fcinfo.arg + 1; isNull = fcinfo.argnull + 1; @@ -503,25 +582,24 @@ process_sorted_aggregate(AggState *aggstate, newVal, isNull)) { /* - * DISTINCT always suppresses nulls, per SQL spec, regardless of the - * transition function's strictness. - */ - if (*isNull) - continue; - - /* * Clear and select the working context for evaluation of the equality * function and transition function. */ MemoryContextReset(workcontext); oldContext = MemoryContextSwitchTo(workcontext); - if (haveOldVal && - DatumGetBool(FunctionCall2(&peraggstate->equalfn, - oldVal, *newVal))) + /* + * If DISTINCT mode, and not distinct from prior, skip it. + */ + if (isDistinct && + haveOldVal && + ((oldIsNull && *isNull) || + (!oldIsNull && !*isNull && + DatumGetBool(FunctionCall2(&peraggstate->equalfns[0], + oldVal, *newVal))))) { /* equal to prior, so forget this one */ - if (!peraggstate->inputtypeByVal) + if (!peraggstate->inputtypeByVal && !*isNull) pfree(DatumGetPointer(*newVal)); } else @@ -529,17 +607,18 @@ process_sorted_aggregate(AggState *aggstate, advance_transition_function(aggstate, peraggstate, pergroupstate, &fcinfo); /* forget the old value, if any */ - if (haveOldVal && !peraggstate->inputtypeByVal) + if (!oldIsNull && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); /* and remember the new one for subsequent equality checks */ oldVal = *newVal; + oldIsNull = *isNull; haveOldVal = true; } MemoryContextSwitchTo(oldContext); } - if (haveOldVal && !peraggstate->inputtypeByVal) + if (!oldIsNull && !peraggstate->inputtypeByVal) pfree(DatumGetPointer(oldVal)); tuplesort_end(peraggstate->sortstate); @@ -547,6 +626,87 @@ process_sorted_aggregate(AggState *aggstate, } /* + * Run the transition function for a DISTINCT or ORDER BY aggregate + * with more than one input. This is called after we have completed + * entering all the input values into the sort object. We complete the + * sort, read out the values in sorted order, and run the transition + * function on each value (applying DISTINCT if appropriate). + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +process_ordered_aggregate_multi(AggState *aggstate, + AggStatePerAgg peraggstate, + AggStatePerGroup pergroupstate) +{ + MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; + FunctionCallInfoData fcinfo; + TupleTableSlot *slot1 = peraggstate->evalslot; + TupleTableSlot *slot2 = peraggstate->uniqslot; + int numArguments = peraggstate->numArguments; + int numDistinctCols = peraggstate->numDistinctCols; + bool haveOldValue = false; + int i; + + tuplesort_performsort(peraggstate->sortstate); + + ExecClearTuple(slot1); + if (slot2) + ExecClearTuple(slot2); + + while (tuplesort_gettupleslot(peraggstate->sortstate, true, slot1)) + { + /* + * Extract the first numArguments as datums to pass to the transfn. + * (This will help execTuplesMatch too, so do it immediately.) + */ + slot_getsomeattrs(slot1, numArguments); + + if (numDistinctCols == 0 || + !haveOldValue || + !execTuplesMatch(slot1, slot2, + numDistinctCols, + peraggstate->sortColIdx, + peraggstate->equalfns, + workcontext)) + { + /* Load values into fcinfo */ + /* Start from 1, since the 0th arg will be the transition value */ + for (i = 0; i < numArguments; i++) + { + fcinfo.arg[i + 1] = slot1->tts_values[i]; + fcinfo.argnull[i + 1] = slot1->tts_isnull[i]; + } + + advance_transition_function(aggstate, peraggstate, pergroupstate, + &fcinfo); + + if (numDistinctCols > 0) + { + /* swap the slot pointers to retain the current tuple */ + TupleTableSlot *tmpslot = slot2; + + slot2 = slot1; + slot1 = tmpslot; + haveOldValue = true; + } + } + + /* Reset context each time, unless execTuplesMatch did it for us */ + if (numDistinctCols == 0) + MemoryContextReset(workcontext); + + ExecClearTuple(slot1); + } + + if (slot2) + ExecClearTuple(slot2); + + tuplesort_end(peraggstate->sortstate); + peraggstate->sortstate = NULL; +} + +/* * Compute the final value of one aggregate. * * The finalfunction will be run, and the result delivered, in the @@ -983,8 +1143,17 @@ agg_retrieve_direct(AggState *aggstate) AggStatePerAgg peraggstate = &peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; - if (peraggstate->aggref->aggdistinct) - process_sorted_aggregate(aggstate, peraggstate, pergroupstate); + if (peraggstate->numSortCols > 0) + { + if (peraggstate->numInputs == 1) + process_ordered_aggregate_single(aggstate, + peraggstate, + pergroupstate); + else + process_ordered_aggregate_multi(aggstate, + peraggstate, + pergroupstate); + } finalize_aggregate(aggstate, peraggstate, pergroupstate, &aggvalues[aggno], &aggnulls[aggno]); @@ -1138,7 +1307,7 @@ agg_retrieve_hash_table(AggState *aggstate) AggStatePerAgg peraggstate = &peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; - Assert(!peraggstate->aggref->aggdistinct); + Assert(peraggstate->numSortCols == 0); finalize_aggregate(aggstate, peraggstate, pergroupstate, &aggvalues[aggno], &aggnulls[aggno]); } @@ -1363,6 +1532,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) AggStatePerAgg peraggstate; Oid inputTypes[FUNC_MAX_ARGS]; int numArguments; + int numInputs; + int numSortCols; + int numDistinctCols; + List *sortlist; HeapTuple aggTuple; Form_pg_aggregate aggform; Oid aggtranstype; @@ -1401,19 +1574,24 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* Fill in the peraggstate data */ peraggstate->aggrefstate = aggrefstate; peraggstate->aggref = aggref; - numArguments = list_length(aggref->args); - peraggstate->numArguments = numArguments; + numInputs = list_length(aggref->args); + peraggstate->numInputs = numInputs; + peraggstate->sortstate = NULL; /* * Get actual datatypes of the inputs. These could be different from * the agg's declared input types, when the agg accepts ANY or a * polymorphic type. */ - i = 0; + numArguments = 0; foreach(lc, aggref->args) { - inputTypes[i++] = exprType((Node *) lfirst(lc)); + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (!tle->resjunk) + inputTypes[numArguments++] = exprType((Node *) tle->expr); } + peraggstate->numArguments = numArguments; aggTuple = SearchSysCache(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid), @@ -1538,43 +1716,115 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggref->aggfnoid))); } + /* + * Get a tupledesc corresponding to the inputs (including sort + * expressions) of the agg. + */ + peraggstate->evaldesc = ExecTypeFromTL(aggref->args, false); + + /* Create slot we're going to do argument evaluation in */ + peraggstate->evalslot = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(peraggstate->evalslot, peraggstate->evaldesc); + + /* Set up projection info for evaluation */ + peraggstate->evalproj = ExecBuildProjectionInfo(aggrefstate->args, + aggstate->tmpcontext, + peraggstate->evalslot, + NULL); + + /* + * If we're doing either DISTINCT or ORDER BY, then we have a list + * of SortGroupClause nodes; fish out the data in them and + * stick them into arrays. + * + * Note that by construction, if there is a DISTINCT clause then the + * ORDER BY clause is a prefix of it (see transformDistinctClause). + */ if (aggref->aggdistinct) { - Oid lt_opr; - Oid eq_opr; + sortlist = aggref->aggdistinct; + numSortCols = numDistinctCols = list_length(sortlist); + Assert(numSortCols >= list_length(aggref->aggorder)); + } + else + { + sortlist = aggref->aggorder; + numSortCols = list_length(sortlist); + numDistinctCols = 0; + } - /* We don't implement DISTINCT aggs in the HASHED case */ - Assert(node->aggstrategy != AGG_HASHED); + peraggstate->numSortCols = numSortCols; + peraggstate->numDistinctCols = numDistinctCols; + if (numSortCols > 0) + { /* - * We don't currently implement DISTINCT aggs for aggs having more - * than one argument. This isn't required for anything in the SQL - * spec, but really it ought to be implemented for - * feature-completeness. FIXME someday. + * We don't implement DISTINCT or ORDER BY aggs in the HASHED case + * (yet) */ - if (numArguments != 1) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("DISTINCT is supported only for single-argument aggregates"))); + Assert(node->aggstrategy != AGG_HASHED); + + /* If we have only one input, we need its len/byval info. */ + if (numInputs == 1) + { + get_typlenbyval(inputTypes[0], + &peraggstate->inputtypeLen, + &peraggstate->inputtypeByVal); + } + else if (numDistinctCols > 0) + { + /* we will need an extra slot to store prior values */ + peraggstate->uniqslot = ExecInitExtraTupleSlot(estate); + ExecSetSlotDescriptor(peraggstate->uniqslot, + peraggstate->evaldesc); + } + + /* Extract the sort information for use later */ + peraggstate->sortColIdx = + (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber)); + peraggstate->sortOperators = + (Oid *) palloc(numSortCols * sizeof(Oid)); + peraggstate->sortNullsFirst = + (bool *) palloc(numSortCols * sizeof(bool)); + + i = 0; + foreach(lc, sortlist) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, + aggref->args); + + /* the parser should have made sure of this */ + Assert(OidIsValid(sortcl->sortop)); - peraggstate->inputType = inputTypes[0]; - get_typlenbyval(inputTypes[0], - &peraggstate->inputtypeLen, - &peraggstate->inputtypeByVal); + peraggstate->sortColIdx[i] = tle->resno; + peraggstate->sortOperators[i] = sortcl->sortop; + peraggstate->sortNullsFirst[i] = sortcl->nulls_first; + i++; + } + Assert(i == numSortCols); + } + + if (aggref->aggdistinct) + { + Assert(numArguments > 0); /* - * Look up the sorting and comparison operators to use. XXX it's - * pretty bletcherous to be making this sort of semantic decision - * in the executor. Probably the parser should decide this and - * record it in the Aggref node ... or at latest, do it in the - * planner. + * We need the equal function for each DISTINCT comparison we will + * make. */ - get_sort_group_operators(inputTypes[0], - true, true, false, - <_opr, &eq_opr, NULL); - fmgr_info(get_opcode(eq_opr), &(peraggstate->equalfn)); - peraggstate->sortOperator = lt_opr; - peraggstate->sortstate = NULL; + peraggstate->equalfns = + (FmgrInfo *) palloc(numDistinctCols * sizeof(FmgrInfo)); + + i = 0; + foreach(lc, aggref->aggdistinct) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc); + + fmgr_info(get_opcode(sortcl->eqop), &peraggstate->equalfns[i]); + i++; + } + Assert(i == numDistinctCols); } ReleaseSysCache(aggTuple); |