aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/adt/numeric.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2016-04-05 14:24:59 -0400
committerRobert Haas <rhaas@postgresql.org>2016-04-05 14:32:53 -0400
commit11c8669c0cc7abc7a7702594cf13452c378d2517 (patch)
treeaaea45448541428b702586fca7e30bec284f8c9a /src/backend/utils/adt/numeric.c
parent7117685461af50f50c03f43e6a622284c8d54694 (diff)
downloadpostgresql-11c8669c0cc7abc7a7702594cf13452c378d2517.tar.gz
postgresql-11c8669c0cc7abc7a7702594cf13452c378d2517.zip
Add parallel query support functions for assorted aggregates.
This lets us use parallel aggregate for a variety of useful cases that didn't work before, like sum(int8), sum(numeric), several versions of avg(), and various other functions. Add some regression tests, as well, testing the general sanity of these and future catalog entries. David Rowley, reviewed by Tomas Vondra, with a few further changes by me.
Diffstat (limited to 'src/backend/utils/adt/numeric.c')
-rw-r--r--src/backend/utils/adt/numeric.c860
1 files changed, 860 insertions, 0 deletions
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 07b264572d9..3ba373a15b9 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -436,6 +436,7 @@ static int32 numericvar_to_int32(NumericVar *var);
static bool numericvar_to_int64(NumericVar *var, int64 *result);
static void int64_to_numericvar(int64 val, NumericVar *var);
#ifdef HAVE_INT128
+static bool numericvar_to_int128(NumericVar *var, int128 *result);
static void int128_to_numericvar(int128 val, NumericVar *var);
#endif
static double numeric_to_double_no_overflow(Numeric num);
@@ -3349,6 +3350,77 @@ numeric_accum(PG_FUNCTION_ARGS)
}
/*
+ * Generic combine function for numeric aggregates which require sumX2
+ */
+Datum
+numeric_combine(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state1;
+ NumericAggState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ PG_RETURN_POINTER(state1);
+
+ /* manually copy all fields from state2 to state1 */
+ if (state1 == NULL)
+ {
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = makeNumericAggState(fcinfo, true);
+ state1->N = state2->N;
+ state1->NaNcount = state2->NaNcount;
+ state1->maxScale = state2->maxScale;
+ state1->maxScaleCount = state2->maxScaleCount;
+
+ init_var(&state1->sumX);
+ set_var_from_var(&state2->sumX, &state1->sumX);
+
+ init_var(&state1->sumX2);
+ set_var_from_var(&state2->sumX2, &state1->sumX2);
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state2->N > 0)
+ {
+ state1->N += state2->N;
+ state1->NaNcount += state2->NaNcount;
+
+ /*
+ * These are currently only needed for moving aggregates, but let's
+ * do the right thing anyway...
+ */
+ if (state2->maxScale > state1->maxScale)
+ {
+ state1->maxScale = state2->maxScale;
+ state1->maxScaleCount = state2->maxScaleCount;
+ }
+ else if (state2->maxScale == state1->maxScale)
+ state1->maxScaleCount += state2->maxScaleCount;
+
+ /* The rest of this needs to work in the aggregate context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ /* Accumulate sums */
+ add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+ add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+
+ MemoryContextSwitchTo(old_context);
+ }
+ PG_RETURN_POINTER(state1);
+}
+
+/*
* Generic transition function for numeric aggregates that don't require sumX2.
*/
Datum
@@ -3369,6 +3441,307 @@ numeric_avg_accum(PG_FUNCTION_ARGS)
}
/*
+ * Combine function for numeric aggregates which don't require sumX2
+ */
+Datum
+numeric_avg_combine(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state1;
+ NumericAggState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ PG_RETURN_POINTER(state1);
+
+ /* manually copy all fields from state2 to state1 */
+ if (state1 == NULL)
+ {
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = makeNumericAggState(fcinfo, false);
+ state1->N = state2->N;
+ state1->NaNcount = state2->NaNcount;
+ state1->maxScale = state2->maxScale;
+ state1->maxScaleCount = state2->maxScaleCount;
+
+ init_var(&state1->sumX);
+ set_var_from_var(&state2->sumX, &state1->sumX);
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state2->N > 0)
+ {
+ state1->N += state2->N;
+ state1->NaNcount += state2->NaNcount;
+
+ /*
+ * These are currently only needed for moving aggregates, but let's
+ * do the right thing anyway...
+ */
+ if (state2->maxScale > state1->maxScale)
+ {
+ state1->maxScale = state2->maxScale;
+ state1->maxScaleCount = state2->maxScaleCount;
+ }
+ else if (state2->maxScale == state1->maxScale)
+ state1->maxScaleCount += state2->maxScaleCount;
+
+ /* The rest of this needs to work in the aggregate context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ /* Accumulate sums */
+ add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+
+ MemoryContextSwitchTo(old_context);
+ }
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * numeric_avg_serialize
+ * Serialize NumericAggState for numeric aggregates that don't require
+ * sumX2. Serializes NumericAggState into bytea using the standard pq API.
+ *
+ * numeric_avg_deserialize(numeric_avg_serialize(state)) must result in a state
+ * which matches the original input state.
+ */
+Datum
+numeric_avg_serialize(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state;
+ StringInfoData buf;
+ Datum temp;
+ bytea *sumX;
+ bytea *result;
+
+ /* Ensure we disallow calling when not in aggregate context */
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state = (NumericAggState *) PG_GETARG_POINTER(0);
+
+ /*
+ * This is a little wasteful since make_result converts the NumericVar
+ * into a Numeric and numeric_send converts it back again. Is it worth
+ * splitting the tasks in numeric_send into separate functions to stop
+ * this? Doing so would also remove the fmgr call overhead.
+ */
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX)));
+ sumX = DatumGetByteaP(temp);
+
+ pq_begintypsend(&buf);
+
+ /* N */
+ pq_sendint64(&buf, state->N);
+
+ /* sumX */
+ pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
+
+ /* maxScale */
+ pq_sendint(&buf, state->maxScale, 4);
+
+ /* maxScaleCount */
+ pq_sendint64(&buf, state->maxScaleCount);
+
+ /* NaNcount */
+ pq_sendint64(&buf, state->NaNcount);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * numeric_avg_deserialize
+ * Deserialize bytea into NumericAggState for numeric aggregates that
+ * don't require sumX2. Deserializes bytea into NumericAggState using the
+ * standard pq API.
+ *
+ * numeric_avg_serialize(numeric_avg_deserialize(bytea)) must result in a value
+ * which matches the original bytea value.
+ */
+Datum
+numeric_avg_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate = PG_GETARG_BYTEA_P(0);
+ NumericAggState *result;
+ Datum temp;
+ StringInfoData buf;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard pq API.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
+
+ result = makeNumericAggState(fcinfo, false);
+
+ /* N */
+ result->N = pq_getmsgint64(&buf);
+
+ /* sumX */
+ temp = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+ set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+
+ /* maxScale */
+ result->maxScale = pq_getmsgint(&buf, 4);
+
+ /* maxScaleCount */
+ result->maxScaleCount = pq_getmsgint64(&buf);
+
+ /* NaNcount */
+ result->NaNcount = pq_getmsgint64(&buf);
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
+
+/*
+ * numeric_serialize
+ * Serialization function for NumericAggState for numeric aggregates that
+ * require sumX2. Serializes NumericAggState into bytea using the standard
+ * pq API.
+ *
+ * numeric_deserialize(numeric_serialize(state)) must result in a state which
+ * matches the original input state.
+ */
+Datum
+numeric_serialize(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state;
+ StringInfoData buf;
+ Datum temp;
+ bytea *sumX;
+ bytea *sumX2;
+ bytea *result;
+
+ /* Ensure we disallow calling when not in aggregate context */
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state = (NumericAggState *) PG_GETARG_POINTER(0);
+
+ /*
+ * This is a little wasteful since make_result converts the NumericVar
+ * into a Numeric and numeric_send converts it back again. Is it worth
+ * splitting the tasks in numeric_send into separate functions to stop
+ * this? Doing so would also remove the fmgr call overhead.
+ */
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX)));
+ sumX = DatumGetByteaP(temp);
+
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX2)));
+ sumX2 = DatumGetByteaP(temp);
+
+ pq_begintypsend(&buf);
+
+ /* N */
+ pq_sendint64(&buf, state->N);
+
+ /* sumX */
+ pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
+
+ /* sumX2 */
+ pq_sendbytes(&buf, VARDATA(sumX2), VARSIZE(sumX2) - VARHDRSZ);
+
+ /* maxScale */
+ pq_sendint(&buf, state->maxScale, 4);
+
+ /* maxScaleCount */
+ pq_sendint64(&buf, state->maxScaleCount);
+
+ /* NaNcount */
+ pq_sendint64(&buf, state->NaNcount);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * numeric_deserialize
+ * Deserialization function for NumericAggState for numeric aggregates that
+ * require sumX2. Deserializes bytea into into NumericAggState using the
+ * standard pq API.
+ *
+ * numeric_serialize(numeric_deserialize(bytea)) must result in a value which
+ * matches the original bytea value.
+ */
+Datum
+numeric_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate = PG_GETARG_BYTEA_P(0);
+ NumericAggState *result;
+ Datum temp;
+ StringInfoData buf;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard pq API.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
+
+ result = makeNumericAggState(fcinfo, false);
+
+ /* N */
+ result->N = pq_getmsgint64(&buf);
+
+ /* sumX */
+ temp = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+ set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+
+ /* sumX2 */
+ temp = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+ set_var_from_num(DatumGetNumeric(temp), &result->sumX2);
+
+ /* maxScale */
+ result->maxScale = pq_getmsgint(&buf, 4);
+
+ /* maxScaleCount */
+ result->maxScaleCount = pq_getmsgint64(&buf);
+
+ /* NaNcount */
+ result->NaNcount = pq_getmsgint64(&buf);
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
+
+/*
* Generic inverse transition function for numeric aggregates
* (with or without requirement for X^2).
*/
@@ -3552,6 +3925,215 @@ int8_accum(PG_FUNCTION_ARGS)
}
/*
+ * Combine function for numeric aggregates which require sumX2
+ */
+Datum
+numeric_poly_combine(PG_FUNCTION_ARGS)
+{
+ PolyNumAggState *state1;
+ PolyNumAggState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ PG_RETURN_POINTER(state1);
+
+ /* manually copy all fields from state2 to state1 */
+ if (state1 == NULL)
+ {
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = makePolyNumAggState(fcinfo, true);
+ state1->N = state2->N;
+
+#ifdef HAVE_INT128
+ state1->sumX = state2->sumX;
+ state1->sumX2 = state2->sumX2;
+#else
+ init_var(&(state1->sumX));
+ set_var_from_var(&(state2->sumX), &(state1->sumX));
+
+ init_var(&state1->sumX2);
+ set_var_from_var(&(state2->sumX2), &(state1->sumX2));
+#endif
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state2->N > 0)
+ {
+ state1->N += state2->N;
+
+#ifdef HAVE_INT128
+ state1->sumX += state2->sumX;
+ state1->sumX2 += state2->sumX2;
+#else
+ /* The rest of this needs to work in the aggregate context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ /* Accumulate sums */
+ add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+ add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+
+ MemoryContextSwitchTo(old_context);
+#endif
+
+ }
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * numeric_poly_serialize
+ * Serialize PolyNumAggState into bytea using the standard pq API for
+ * aggregate functions which require sumX2.
+ *
+ * numeric_poly_deserialize(numeric_poly_serialize(state)) must result in a
+ * state which matches the original input state.
+ */
+Datum
+numeric_poly_serialize(PG_FUNCTION_ARGS)
+{
+ PolyNumAggState *state;
+ StringInfoData buf;
+ bytea *sumX;
+ bytea *sumX2;
+ bytea *result;
+
+ /* Ensure we disallow calling when not in aggregate context */
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state = (PolyNumAggState *) PG_GETARG_POINTER(0);
+
+ /*
+ * If the platform supports int128 then sumX and sumX2 will be a 128 bit
+ * integer type. Here we'll convert that into a numeric type so that the
+ * combine state is in the same format for both int128 enabled machines
+ * and machines which don't support that type. The logic here is that one
+ * day we might like to send these over to another server for further
+ * processing and we want a standard format to work with.
+ */
+ {
+ Datum temp;
+
+#ifdef HAVE_INT128
+ NumericVar num;
+
+ init_var(&num);
+ int128_to_numericvar(state->sumX, &num);
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&num)));
+ sumX = DatumGetByteaP(temp);
+
+ int128_to_numericvar(state->sumX2, &num);
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&num)));
+ sumX2 = DatumGetByteaP(temp);
+ free_var(&num);
+#else
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX)));
+ sumX = DatumGetByteaP(temp);
+
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX2)));
+ sumX2 = DatumGetByteaP(temp);
+#endif
+ }
+
+ pq_begintypsend(&buf);
+
+ /* N */
+ pq_sendint64(&buf, state->N);
+
+ /* sumX */
+ pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
+
+ /* sumX2 */
+ pq_sendbytes(&buf, VARDATA(sumX2), VARSIZE(sumX2) - VARHDRSZ);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * numeric_poly_deserialize
+ * Deserialize PolyNumAggState from bytea using the standard pq API for
+ * aggregate functions which require sumX2.
+ *
+ * numeric_poly_serialize(numeric_poly_deserialize(bytea)) must result in a
+ * state which matches the original input state.
+ */
+Datum
+numeric_poly_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate = PG_GETARG_BYTEA_P(0);
+ PolyNumAggState *result;
+ Datum sumX;
+ Datum sumX2;
+ StringInfoData buf;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard pq API.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
+
+ result = makePolyNumAggState(fcinfo, false);
+
+ /* N */
+ result->N = pq_getmsgint64(&buf);
+
+ /* sumX */
+ sumX = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+
+ /* sumX2 */
+ sumX2 = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+
+#ifdef HAVE_INT128
+ {
+ NumericVar num;
+
+ init_var(&num);
+ set_var_from_num(DatumGetNumeric(sumX), &num);
+ numericvar_to_int128(&num, &result->sumX);
+
+ set_var_from_num(DatumGetNumeric(sumX2), &num);
+ numericvar_to_int128(&num, &result->sumX2);
+
+ free_var(&num);
+ }
+#else
+ set_var_from_num(DatumGetNumeric(sumX), &result->sumX);
+ set_var_from_num(DatumGetNumeric(sumX2), &result->sumX2);
+#endif
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
+
+/*
* Transition function for int8 input when we don't need sumX2.
*/
Datum
@@ -3581,6 +4163,180 @@ int8_avg_accum(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
+/*
+ * Combine function for PolyNumAggState for aggregates which don't require
+ * sumX2
+ */
+Datum
+int8_avg_combine(PG_FUNCTION_ARGS)
+{
+ PolyNumAggState *state1;
+ PolyNumAggState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ PG_RETURN_POINTER(state1);
+
+ /* manually copy all fields from state2 to state1 */
+ if (state1 == NULL)
+ {
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = makePolyNumAggState(fcinfo, false);
+ state1->N = state2->N;
+
+#ifdef HAVE_INT128
+ state1->sumX = state2->sumX;
+#else
+ init_var(&state1->sumX);
+ set_var_from_var(&state2->sumX, &state1->sumX);
+#endif
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state2->N > 0)
+ {
+ state1->N += state2->N;
+
+#ifdef HAVE_INT128
+ state1->sumX += state2->sumX;
+#else
+ /* The rest of this needs to work in the aggregate context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ /* Accumulate sums */
+ add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+
+ MemoryContextSwitchTo(old_context);
+#endif
+
+ }
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * int8_avg_serialize
+ * Serialize PolyNumAggState into bytea using the standard pq API.
+ *
+ * int8_avg_deserialize(int8_avg_serialize(state)) must result in a state which
+ * matches the original input state.
+ */
+Datum
+int8_avg_serialize(PG_FUNCTION_ARGS)
+{
+ PolyNumAggState *state;
+ StringInfoData buf;
+ bytea *sumX;
+ bytea *result;
+
+ /* Ensure we disallow calling when not in aggregate context */
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state = (PolyNumAggState *) PG_GETARG_POINTER(0);
+
+ /*
+ * If the platform supports int128 then sumX will be a 128 integer type.
+ * Here we'll convert that into a numeric type so that the combine state
+ * is in the same format for both int128 enabled machines and machines
+ * which don't support that type. The logic here is that one day we might
+ * like to send these over to another server for further processing and we
+ * want a standard format to work with.
+ */
+ {
+ Datum temp;
+#ifdef HAVE_INT128
+ NumericVar num;
+
+ init_var(&num);
+ int128_to_numericvar(state->sumX, &num);
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&num)));
+ free_var(&num);
+ sumX = DatumGetByteaP(temp);
+#else
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX)));
+ sumX = DatumGetByteaP(temp);
+#endif
+ }
+
+ pq_begintypsend(&buf);
+
+ /* N */
+ pq_sendint64(&buf, state->N);
+
+ /* sumX */
+ pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * int8_avg_deserialize
+ * Deserialize bytea back into PolyNumAggState.
+ *
+ * int8_avg_serialize(int8_avg_deserialize(bytea)) must result in a value which
+ * matches the original bytea value.
+ */
+Datum
+int8_avg_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate = PG_GETARG_BYTEA_P(0);
+ PolyNumAggState *result;
+ StringInfoData buf;
+ Datum temp;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard pq API.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
+
+ result = makePolyNumAggState(fcinfo, false);
+
+ /* N */
+ result->N = pq_getmsgint64(&buf);
+
+ /* sumX */
+ temp = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+
+#ifdef HAVE_INT128
+ {
+ NumericVar num;
+
+ init_var(&num);
+ set_var_from_num(DatumGetNumeric(temp), &num);
+ numericvar_to_int128(&num, &result->sumX);
+ free_var(&num);
+ }
+#else
+ set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+#endif
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
/*
* Inverse transition functions to go with the above.
@@ -4310,6 +5066,37 @@ int4_avg_accum(PG_FUNCTION_ARGS)
}
Datum
+int4_avg_combine(PG_FUNCTION_ARGS)
+{
+ ArrayType *transarray1;
+ ArrayType *transarray2;
+ Int8TransTypeData *state1;
+ Int8TransTypeData *state2;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ transarray1 = PG_GETARG_ARRAYTYPE_P(0);
+ transarray2 = PG_GETARG_ARRAYTYPE_P(1);
+
+ if (ARR_HASNULL(transarray1) ||
+ ARR_SIZE(transarray1) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
+ elog(ERROR, "expected 2-element int8 array");
+
+ if (ARR_HASNULL(transarray2) ||
+ ARR_SIZE(transarray2) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
+ elog(ERROR, "expected 2-element int8 array");
+
+ state1 = (Int8TransTypeData *) ARR_DATA_PTR(transarray1);
+ state2 = (Int8TransTypeData *) ARR_DATA_PTR(transarray2);
+
+ state1->count += state2->count;
+ state1->sum += state2->sum;
+
+ PG_RETURN_ARRAYTYPE_P(transarray1);
+}
+
+Datum
int2_avg_accum_inv(PG_FUNCTION_ARGS)
{
ArrayType *transarray;
@@ -5308,6 +6095,79 @@ int64_to_numericvar(int64 val, NumericVar *var)
#ifdef HAVE_INT128
/*
+ * Convert numeric to int128, rounding if needed.
+ *
+ * If overflow, return FALSE (no error is raised). Return TRUE if okay.
+ */
+static bool
+numericvar_to_int128(NumericVar *var, int128 *result)
+{
+ NumericDigit *digits;
+ int ndigits;
+ int weight;
+ int i;
+ int128 val,
+ oldval;
+ bool neg;
+ NumericVar rounded;
+
+ /* Round to nearest integer */
+ init_var(&rounded);
+ set_var_from_var(var, &rounded);
+ round_var(&rounded, 0);
+
+ /* Check for zero input */
+ strip_var(&rounded);
+ ndigits = rounded.ndigits;
+ if (ndigits == 0)
+ {
+ *result = 0;
+ free_var(&rounded);
+ return true;
+ }
+
+ /*
+ * For input like 10000000000, we must treat stripped digits as real. So
+ * the loop assumes there are weight+1 digits before the decimal point.
+ */
+ weight = rounded.weight;
+ Assert(weight >= 0 && ndigits <= weight + 1);
+
+ /* Construct the result */
+ digits = rounded.digits;
+ neg = (rounded.sign == NUMERIC_NEG);
+ val = digits[0];
+ for (i = 1; i <= weight; i++)
+ {
+ oldval = val;
+ val *= NBASE;
+ if (i < ndigits)
+ val += digits[i];
+
+ /*
+ * The overflow check is a bit tricky because we want to accept
+ * INT128_MIN, which will overflow the positive accumulator. We can
+ * detect this case easily though because INT128_MIN is the only
+ * nonzero value for which -val == val (on a two's complement machine,
+ * anyway).
+ */
+ if ((val / NBASE) != oldval) /* possible overflow? */
+ {
+ if (!neg || (-val) != val || val == 0 || oldval < 0)
+ {
+ free_var(&rounded);
+ return false;
+ }
+ }
+ }
+
+ free_var(&rounded);
+
+ *result = neg ? -val : val;
+ return true;
+}
+
+/*
* Convert 128 bit integer to numeric.
*/
static void