diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_aggregate.c | 5 | ||||
-rw-r--r-- | src/backend/commands/aggregatecmds.c | 42 | ||||
-rw-r--r-- | src/backend/executor/nodeAgg.c | 65 | ||||
-rw-r--r-- | src/backend/executor/nodeWindowAgg.c | 43 |
4 files changed, 122 insertions, 33 deletions
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index a9204503d36..ca3fd819b48 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -19,6 +19,7 @@ #include "catalog/dependency.h" #include "catalog/indexing.h" #include "catalog/pg_aggregate.h" +#include "catalog/pg_aggregate_fn.h" #include "catalog/pg_language.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" @@ -65,6 +66,8 @@ AggregateCreate(const char *aggName, List *aggmfinalfnName, bool finalfnExtraArgs, bool mfinalfnExtraArgs, + char finalfnModify, + char mfinalfnModify, List *aggsortopName, Oid aggTransType, int32 aggTransSpace, @@ -656,6 +659,8 @@ AggregateCreate(const char *aggName, values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn); values[Anum_pg_aggregate_aggfinalextra - 1] = BoolGetDatum(finalfnExtraArgs); values[Anum_pg_aggregate_aggmfinalextra - 1] = BoolGetDatum(mfinalfnExtraArgs); + values[Anum_pg_aggregate_aggfinalmodify - 1] = CharGetDatum(finalfnModify); + values[Anum_pg_aggregate_aggmfinalmodify - 1] = CharGetDatum(mfinalfnModify); values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop); values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType); values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace); diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c index a63539ab217..adc9877e79e 100644 --- a/src/backend/commands/aggregatecmds.c +++ b/src/backend/commands/aggregatecmds.c @@ -26,6 +26,7 @@ #include "catalog/dependency.h" #include "catalog/indexing.h" #include "catalog/pg_aggregate.h" +#include "catalog/pg_aggregate_fn.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "commands/alter.h" @@ -39,6 +40,9 @@ #include "utils/syscache.h" +static char extractModify(DefElem *defel); + + /* * DefineAggregate * @@ -67,6 +71,8 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List List *mfinalfuncName = NIL; bool finalfuncExtraArgs = false; bool mfinalfuncExtraArgs = false; + char finalfuncModify = 0; + char mfinalfuncModify = 0; List *sortoperatorName = NIL; TypeName *baseType = NULL; TypeName *transType = NULL; @@ -143,6 +149,10 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List finalfuncExtraArgs = defGetBoolean(defel); else if (pg_strcasecmp(defel->defname, "mfinalfunc_extra") == 0) mfinalfuncExtraArgs = defGetBoolean(defel); + else if (pg_strcasecmp(defel->defname, "finalfunc_modify") == 0) + finalfuncModify = extractModify(defel); + else if (pg_strcasecmp(defel->defname, "mfinalfunc_modify") == 0) + mfinalfuncModify = extractModify(defel); else if (pg_strcasecmp(defel->defname, "sortop") == 0) sortoperatorName = defGetQualifiedName(defel); else if (pg_strcasecmp(defel->defname, "basetype") == 0) @@ -236,6 +246,15 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List } /* + * Default values for modify flags can only be determined once we know the + * aggKind. + */ + if (finalfuncModify == 0) + finalfuncModify = (aggKind == AGGKIND_NORMAL) ? AGGMODIFY_READ_ONLY : AGGMODIFY_READ_WRITE; + if (mfinalfuncModify == 0) + mfinalfuncModify = (aggKind == AGGKIND_NORMAL) ? AGGMODIFY_READ_ONLY : AGGMODIFY_READ_WRITE; + + /* * look up the aggregate's input datatype(s). */ if (oldstyle) @@ -437,6 +456,8 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List mfinalfuncName, /* final function name */ finalfuncExtraArgs, mfinalfuncExtraArgs, + finalfuncModify, + mfinalfuncModify, sortoperatorName, /* sort operator name */ transTypeId, /* transition data type */ transSpace, /* transition space */ @@ -446,3 +467,24 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List minitval, /* initial condition */ proparallel); /* parallel safe? */ } + +/* + * Convert the string form of [m]finalfunc_modify to the catalog representation + */ +static char +extractModify(DefElem *defel) +{ + char *val = defGetString(defel); + + if (strcmp(val, "read_only") == 0) + return AGGMODIFY_READ_ONLY; + if (strcmp(val, "sharable") == 0) + return AGGMODIFY_SHARABLE; + if (strcmp(val, "read_write") == 0) + return AGGMODIFY_READ_WRITE; + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("parameter \"%s\" must be READ_ONLY, SHARABLE, or READ_WRITE", + defel->defname))); + return 0; /* keep compiler quiet */ +} diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 6543ecebd3e..40d8ec9db46 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -248,9 +248,9 @@ typedef struct AggStatePerTransData /* * Link to an Aggref expr this state value is for. * - * There can be multiple Aggref's sharing the same state value, as long as - * the inputs and transition function are identical. This points to the - * first one of them. + * There can be multiple Aggref's sharing the same state value, so long as + * the inputs and transition functions are identical and the final + * functions are not read-write. This points to the first one of them. */ Aggref *aggref; @@ -419,8 +419,8 @@ typedef struct AggStatePerAggData Oid finalfn_oid; /* - * fmgr lookup data for final function --- only valid when finalfn_oid oid - * is not InvalidOid. + * fmgr lookup data for final function --- only valid when finalfn_oid is + * not InvalidOid. */ FmgrInfo finalfn; @@ -439,6 +439,11 @@ typedef struct AggStatePerAggData int16 resulttypeLen; bool resulttypeByVal; + /* + * "sharable" is false if this agg cannot share state values with other + * aggregates because the final function is read-write. + */ + bool sharable; } AggStatePerAggData; /* @@ -572,6 +577,7 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans, static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, int lastaggno, List **same_input_transnos); static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg, + bool sharable, Oid aggtransfn, Oid aggtranstype, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, @@ -3105,6 +3111,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) AclResult aclresult; Oid transfn_oid, finalfn_oid; + bool sharable; Oid serialfn_oid, deserialfn_oid; Expr *finalfnexpr; @@ -3177,6 +3184,15 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) else peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + /* + * If finalfn is marked read-write, we can't share transition states; + * but it is okay to share states for AGGMODIFY_SHARABLE aggs. Also, + * if we're not executing the finalfn here, we can share regardless. + */ + sharable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) || + (finalfn_oid == InvalidOid); + peragg->sharable = sharable; + serialfn_oid = InvalidOid; deserialfn_oid = InvalidOid; @@ -3315,11 +3331,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * 2. Build working state for invoking the transition function, or * look up previously initialized working state, if we can share it. * - * find_compatible_peragg() already collected a list of per-Trans's - * with the same inputs. Check if any of them have the same transition - * function and initial value. + * find_compatible_peragg() already collected a list of sharable + * per-Trans's with the same inputs. Check if any of them have the + * same transition function and initial value. */ existing_transno = find_compatible_pertrans(aggstate, aggref, + sharable, transfn_oid, aggtranstype, serialfn_oid, deserialfn_oid, initValue, initValueIsNull, @@ -3724,10 +3741,10 @@ GetAggInitVal(Datum textInitVal, Oid transtype) * with this one, with the same input parameters. If no compatible aggregate * can be found, returns -1. * - * As a side-effect, this also collects a list of existing per-Trans structs - * with matching inputs. If no identical Aggref is found, the list is passed - * later to find_compatible_pertrans, to see if we can at least reuse the - * state value of another aggregate. + * As a side-effect, this also collects a list of existing, sharable per-Trans + * structs with matching inputs. If no identical Aggref is found, the list is + * passed later to find_compatible_pertrans, to see if we can at least reuse + * the state value of another aggregate. */ static int find_compatible_peragg(Aggref *newagg, AggState *aggstate, @@ -3785,11 +3802,15 @@ find_compatible_peragg(Aggref *newagg, AggState *aggstate, } /* - * Not identical, but it had the same inputs. Return it to the caller, - * in case we can re-use its per-trans state. + * Not identical, but it had the same inputs. If the final function + * permits sharing, return its transno to the caller, in case we can + * re-use its per-trans state. (If there's already sharing going on, + * we might report a transno more than once. find_compatible_pertrans + * is cheap enough that it's not worth spending cycles to avoid that.) */ - *same_input_transnos = lappend_int(*same_input_transnos, - peragg->transno); + if (peragg->sharable) + *same_input_transnos = lappend_int(*same_input_transnos, + peragg->transno); } return -1; @@ -3804,7 +3825,7 @@ find_compatible_peragg(Aggref *newagg, AggState *aggstate, * verified to match.) */ static int -find_compatible_pertrans(AggState *aggstate, Aggref *newagg, +find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool sharable, Oid aggtransfn, Oid aggtranstype, Oid aggserialfn, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, @@ -3812,14 +3833,8 @@ find_compatible_pertrans(AggState *aggstate, Aggref *newagg, { ListCell *lc; - /* - * For the moment, never try to share transition states between different - * ordered-set aggregates. This is necessary because the finalfns of the - * built-in OSAs (see orderedsetaggs.c) are destructive of their - * transition states. We should fix them so we can allow this, but not - * losing performance in the normal non-shared case will take some work. - */ - if (AGGKIND_IS_ORDERED_SET(newagg->aggkind)) + /* If this aggregate can't share transition states, give up */ + if (!sharable) return -1; foreach(lc, transnos) diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 80be46029f4..02868749f60 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -49,6 +49,7 @@ #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/regproc.h" #include "utils/syscache.h" #include "windowapi.h" @@ -2096,10 +2097,12 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, Oid aggtranstype; AttrNumber initvalAttNo; AclResult aclresult; + bool use_ma_code; Oid transfn_oid, invtransfn_oid, finalfn_oid; bool finalextra; + char finalmodify; Expr *transfnexpr, *invtransfnexpr, *finalfnexpr; @@ -2125,20 +2128,32 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, * Figure out whether we want to use the moving-aggregate implementation, * and collect the right set of fields from the pg_attribute entry. * - * If the frame head can't move, we don't need moving-aggregate code. Even - * if we'd like to use it, don't do so if the aggregate's arguments (and - * FILTER clause if any) contain any calls to volatile functions. - * Otherwise, the difference between restarting and not restarting the - * aggregation would be user-visible. + * It's possible that an aggregate would supply a safe moving-aggregate + * implementation and an unsafe normal one, in which case our hand is + * forced. Otherwise, if the frame head can't move, we don't need + * moving-aggregate code. Even if we'd like to use it, don't do so if the + * aggregate's arguments (and FILTER clause if any) contain any calls to + * volatile functions. Otherwise, the difference between restarting and + * not restarting the aggregation would be user-visible. */ - if (OidIsValid(aggform->aggminvtransfn) && - !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) && - !contain_volatile_functions((Node *) wfunc)) + if (!OidIsValid(aggform->aggminvtransfn)) + use_ma_code = false; /* sine qua non */ + else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY && + aggform->aggfinalmodify != AGGMODIFY_READ_ONLY) + use_ma_code = true; /* decision forced by safety */ + else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) + use_ma_code = false; /* non-moving frame head */ + else if (contain_volatile_functions((Node *) wfunc)) + use_ma_code = false; /* avoid possible behavioral change */ + else + use_ma_code = true; /* yes, let's use it */ + if (use_ma_code) { peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; finalextra = aggform->aggmfinalextra; + finalmodify = aggform->aggmfinalmodify; aggtranstype = aggform->aggmtranstype; initvalAttNo = Anum_pg_aggregate_aggminitval; } @@ -2148,6 +2163,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; finalextra = aggform->aggfinalextra; + finalmodify = aggform->aggfinalmodify; aggtranstype = aggform->aggtranstype; initvalAttNo = Anum_pg_aggregate_agginitval; } @@ -2198,6 +2214,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, } } + /* + * If the selected finalfn isn't read-only, we can't run this aggregate as + * a window function. This is a user-facing error, so we take a bit more + * care with the error message than elsewhere in this function. + */ + if (finalmodify != AGGMODIFY_READ_ONLY) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function %s does not support use as a window function", + format_procedure(wfunc->winfnoid)))); + /* Detect how many arguments to pass to the finalfn */ if (finalextra) peraggstate->numFinalArgs = numArguments + 1; |