aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_aggregate.c5
-rw-r--r--src/backend/commands/aggregatecmds.c42
-rw-r--r--src/backend/executor/nodeAgg.c65
-rw-r--r--src/backend/executor/nodeWindowAgg.c43
4 files changed, 122 insertions, 33 deletions
diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c
index a9204503d36..ca3fd819b48 100644
--- a/src/backend/catalog/pg_aggregate.c
+++ b/src/backend/catalog/pg_aggregate.c
@@ -19,6 +19,7 @@
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_aggregate.h"
+#include "catalog/pg_aggregate_fn.h"
#include "catalog/pg_language.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
@@ -65,6 +66,8 @@ AggregateCreate(const char *aggName,
List *aggmfinalfnName,
bool finalfnExtraArgs,
bool mfinalfnExtraArgs,
+ char finalfnModify,
+ char mfinalfnModify,
List *aggsortopName,
Oid aggTransType,
int32 aggTransSpace,
@@ -656,6 +659,8 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
values[Anum_pg_aggregate_aggfinalextra - 1] = BoolGetDatum(finalfnExtraArgs);
values[Anum_pg_aggregate_aggmfinalextra - 1] = BoolGetDatum(mfinalfnExtraArgs);
+ values[Anum_pg_aggregate_aggfinalmodify - 1] = CharGetDatum(finalfnModify);
+ values[Anum_pg_aggregate_aggmfinalmodify - 1] = CharGetDatum(mfinalfnModify);
values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop);
values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType);
values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace);
diff --git a/src/backend/commands/aggregatecmds.c b/src/backend/commands/aggregatecmds.c
index a63539ab217..adc9877e79e 100644
--- a/src/backend/commands/aggregatecmds.c
+++ b/src/backend/commands/aggregatecmds.c
@@ -26,6 +26,7 @@
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_aggregate.h"
+#include "catalog/pg_aggregate_fn.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/alter.h"
@@ -39,6 +40,9 @@
#include "utils/syscache.h"
+static char extractModify(DefElem *defel);
+
+
/*
* DefineAggregate
*
@@ -67,6 +71,8 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
List *mfinalfuncName = NIL;
bool finalfuncExtraArgs = false;
bool mfinalfuncExtraArgs = false;
+ char finalfuncModify = 0;
+ char mfinalfuncModify = 0;
List *sortoperatorName = NIL;
TypeName *baseType = NULL;
TypeName *transType = NULL;
@@ -143,6 +149,10 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
finalfuncExtraArgs = defGetBoolean(defel);
else if (pg_strcasecmp(defel->defname, "mfinalfunc_extra") == 0)
mfinalfuncExtraArgs = defGetBoolean(defel);
+ else if (pg_strcasecmp(defel->defname, "finalfunc_modify") == 0)
+ finalfuncModify = extractModify(defel);
+ else if (pg_strcasecmp(defel->defname, "mfinalfunc_modify") == 0)
+ mfinalfuncModify = extractModify(defel);
else if (pg_strcasecmp(defel->defname, "sortop") == 0)
sortoperatorName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "basetype") == 0)
@@ -236,6 +246,15 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
}
/*
+ * Default values for modify flags can only be determined once we know the
+ * aggKind.
+ */
+ if (finalfuncModify == 0)
+ finalfuncModify = (aggKind == AGGKIND_NORMAL) ? AGGMODIFY_READ_ONLY : AGGMODIFY_READ_WRITE;
+ if (mfinalfuncModify == 0)
+ mfinalfuncModify = (aggKind == AGGKIND_NORMAL) ? AGGMODIFY_READ_ONLY : AGGMODIFY_READ_WRITE;
+
+ /*
* look up the aggregate's input datatype(s).
*/
if (oldstyle)
@@ -437,6 +456,8 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
mfinalfuncName, /* final function name */
finalfuncExtraArgs,
mfinalfuncExtraArgs,
+ finalfuncModify,
+ mfinalfuncModify,
sortoperatorName, /* sort operator name */
transTypeId, /* transition data type */
transSpace, /* transition space */
@@ -446,3 +467,24 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
minitval, /* initial condition */
proparallel); /* parallel safe? */
}
+
+/*
+ * Convert the string form of [m]finalfunc_modify to the catalog representation
+ */
+static char
+extractModify(DefElem *defel)
+{
+ char *val = defGetString(defel);
+
+ if (strcmp(val, "read_only") == 0)
+ return AGGMODIFY_READ_ONLY;
+ if (strcmp(val, "sharable") == 0)
+ return AGGMODIFY_SHARABLE;
+ if (strcmp(val, "read_write") == 0)
+ return AGGMODIFY_READ_WRITE;
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("parameter \"%s\" must be READ_ONLY, SHARABLE, or READ_WRITE",
+ defel->defname)));
+ return 0; /* keep compiler quiet */
+}
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 6543ecebd3e..40d8ec9db46 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -248,9 +248,9 @@ typedef struct AggStatePerTransData
/*
* Link to an Aggref expr this state value is for.
*
- * There can be multiple Aggref's sharing the same state value, as long as
- * the inputs and transition function are identical. This points to the
- * first one of them.
+ * There can be multiple Aggref's sharing the same state value, so long as
+ * the inputs and transition functions are identical and the final
+ * functions are not read-write. This points to the first one of them.
*/
Aggref *aggref;
@@ -419,8 +419,8 @@ typedef struct AggStatePerAggData
Oid finalfn_oid;
/*
- * fmgr lookup data for final function --- only valid when finalfn_oid oid
- * is not InvalidOid.
+ * fmgr lookup data for final function --- only valid when finalfn_oid is
+ * not InvalidOid.
*/
FmgrInfo finalfn;
@@ -439,6 +439,11 @@ typedef struct AggStatePerAggData
int16 resulttypeLen;
bool resulttypeByVal;
+ /*
+ * "sharable" is false if this agg cannot share state values with other
+ * aggregates because the final function is read-write.
+ */
+ bool sharable;
} AggStatePerAggData;
/*
@@ -572,6 +577,7 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
int lastaggno, List **same_input_transnos);
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
+ bool sharable,
Oid aggtransfn, Oid aggtranstype,
Oid aggserialfn, Oid aggdeserialfn,
Datum initValue, bool initValueIsNull,
@@ -3105,6 +3111,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
AclResult aclresult;
Oid transfn_oid,
finalfn_oid;
+ bool sharable;
Oid serialfn_oid,
deserialfn_oid;
Expr *finalfnexpr;
@@ -3177,6 +3184,15 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
else
peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
+ /*
+ * If finalfn is marked read-write, we can't share transition states;
+ * but it is okay to share states for AGGMODIFY_SHARABLE aggs. Also,
+ * if we're not executing the finalfn here, we can share regardless.
+ */
+ sharable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
+ (finalfn_oid == InvalidOid);
+ peragg->sharable = sharable;
+
serialfn_oid = InvalidOid;
deserialfn_oid = InvalidOid;
@@ -3315,11 +3331,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* 2. Build working state for invoking the transition function, or
* look up previously initialized working state, if we can share it.
*
- * find_compatible_peragg() already collected a list of per-Trans's
- * with the same inputs. Check if any of them have the same transition
- * function and initial value.
+ * find_compatible_peragg() already collected a list of sharable
+ * per-Trans's with the same inputs. Check if any of them have the
+ * same transition function and initial value.
*/
existing_transno = find_compatible_pertrans(aggstate, aggref,
+ sharable,
transfn_oid, aggtranstype,
serialfn_oid, deserialfn_oid,
initValue, initValueIsNull,
@@ -3724,10 +3741,10 @@ GetAggInitVal(Datum textInitVal, Oid transtype)
* with this one, with the same input parameters. If no compatible aggregate
* can be found, returns -1.
*
- * As a side-effect, this also collects a list of existing per-Trans structs
- * with matching inputs. If no identical Aggref is found, the list is passed
- * later to find_compatible_pertrans, to see if we can at least reuse the
- * state value of another aggregate.
+ * As a side-effect, this also collects a list of existing, sharable per-Trans
+ * structs with matching inputs. If no identical Aggref is found, the list is
+ * passed later to find_compatible_pertrans, to see if we can at least reuse
+ * the state value of another aggregate.
*/
static int
find_compatible_peragg(Aggref *newagg, AggState *aggstate,
@@ -3785,11 +3802,15 @@ find_compatible_peragg(Aggref *newagg, AggState *aggstate,
}
/*
- * Not identical, but it had the same inputs. Return it to the caller,
- * in case we can re-use its per-trans state.
+ * Not identical, but it had the same inputs. If the final function
+ * permits sharing, return its transno to the caller, in case we can
+ * re-use its per-trans state. (If there's already sharing going on,
+ * we might report a transno more than once. find_compatible_pertrans
+ * is cheap enough that it's not worth spending cycles to avoid that.)
*/
- *same_input_transnos = lappend_int(*same_input_transnos,
- peragg->transno);
+ if (peragg->sharable)
+ *same_input_transnos = lappend_int(*same_input_transnos,
+ peragg->transno);
}
return -1;
@@ -3804,7 +3825,7 @@ find_compatible_peragg(Aggref *newagg, AggState *aggstate,
* verified to match.)
*/
static int
-find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
+find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool sharable,
Oid aggtransfn, Oid aggtranstype,
Oid aggserialfn, Oid aggdeserialfn,
Datum initValue, bool initValueIsNull,
@@ -3812,14 +3833,8 @@ find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
{
ListCell *lc;
- /*
- * For the moment, never try to share transition states between different
- * ordered-set aggregates. This is necessary because the finalfns of the
- * built-in OSAs (see orderedsetaggs.c) are destructive of their
- * transition states. We should fix them so we can allow this, but not
- * losing performance in the normal non-shared case will take some work.
- */
- if (AGGKIND_IS_ORDERED_SET(newagg->aggkind))
+ /* If this aggregate can't share transition states, give up */
+ if (!sharable)
return -1;
foreach(lc, transnos)
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 80be46029f4..02868749f60 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -49,6 +49,7 @@
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
+#include "utils/regproc.h"
#include "utils/syscache.h"
#include "windowapi.h"
@@ -2096,10 +2097,12 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
Oid aggtranstype;
AttrNumber initvalAttNo;
AclResult aclresult;
+ bool use_ma_code;
Oid transfn_oid,
invtransfn_oid,
finalfn_oid;
bool finalextra;
+ char finalmodify;
Expr *transfnexpr,
*invtransfnexpr,
*finalfnexpr;
@@ -2125,20 +2128,32 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
* Figure out whether we want to use the moving-aggregate implementation,
* and collect the right set of fields from the pg_attribute entry.
*
- * If the frame head can't move, we don't need moving-aggregate code. Even
- * if we'd like to use it, don't do so if the aggregate's arguments (and
- * FILTER clause if any) contain any calls to volatile functions.
- * Otherwise, the difference between restarting and not restarting the
- * aggregation would be user-visible.
+ * It's possible that an aggregate would supply a safe moving-aggregate
+ * implementation and an unsafe normal one, in which case our hand is
+ * forced. Otherwise, if the frame head can't move, we don't need
+ * moving-aggregate code. Even if we'd like to use it, don't do so if the
+ * aggregate's arguments (and FILTER clause if any) contain any calls to
+ * volatile functions. Otherwise, the difference between restarting and
+ * not restarting the aggregation would be user-visible.
*/
- if (OidIsValid(aggform->aggminvtransfn) &&
- !(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) &&
- !contain_volatile_functions((Node *) wfunc))
+ if (!OidIsValid(aggform->aggminvtransfn))
+ use_ma_code = false; /* sine qua non */
+ else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
+ aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
+ use_ma_code = true; /* decision forced by safety */
+ else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
+ use_ma_code = false; /* non-moving frame head */
+ else if (contain_volatile_functions((Node *) wfunc))
+ use_ma_code = false; /* avoid possible behavioral change */
+ else
+ use_ma_code = true; /* yes, let's use it */
+ if (use_ma_code)
{
peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
finalextra = aggform->aggmfinalextra;
+ finalmodify = aggform->aggmfinalmodify;
aggtranstype = aggform->aggmtranstype;
initvalAttNo = Anum_pg_aggregate_aggminitval;
}
@@ -2148,6 +2163,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
finalextra = aggform->aggfinalextra;
+ finalmodify = aggform->aggfinalmodify;
aggtranstype = aggform->aggtranstype;
initvalAttNo = Anum_pg_aggregate_agginitval;
}
@@ -2198,6 +2214,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
}
}
+ /*
+ * If the selected finalfn isn't read-only, we can't run this aggregate as
+ * a window function. This is a user-facing error, so we take a bit more
+ * care with the error message than elsewhere in this function.
+ */
+ if (finalmodify != AGGMODIFY_READ_ONLY)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("aggregate function %s does not support use as a window function",
+ format_procedure(wfunc->winfnoid))));
+
/* Detect how many arguments to pass to the finalfn */
if (finalextra)
peraggstate->numFinalArgs = numArguments + 1;