aboutsummaryrefslogtreecommitdiff
path: root/src/backend/utils/adt/numeric.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils/adt/numeric.c')
-rw-r--r--src/backend/utils/adt/numeric.c860
1 files changed, 860 insertions, 0 deletions
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 07b264572d9..3ba373a15b9 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -436,6 +436,7 @@ static int32 numericvar_to_int32(NumericVar *var);
static bool numericvar_to_int64(NumericVar *var, int64 *result);
static void int64_to_numericvar(int64 val, NumericVar *var);
#ifdef HAVE_INT128
+static bool numericvar_to_int128(NumericVar *var, int128 *result);
static void int128_to_numericvar(int128 val, NumericVar *var);
#endif
static double numeric_to_double_no_overflow(Numeric num);
@@ -3349,6 +3350,77 @@ numeric_accum(PG_FUNCTION_ARGS)
}
/*
+ * Generic combine function for numeric aggregates which require sumX2
+ */
+Datum
+numeric_combine(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state1;
+ NumericAggState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ PG_RETURN_POINTER(state1);
+
+ /* manually copy all fields from state2 to state1 */
+ if (state1 == NULL)
+ {
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = makeNumericAggState(fcinfo, true);
+ state1->N = state2->N;
+ state1->NaNcount = state2->NaNcount;
+ state1->maxScale = state2->maxScale;
+ state1->maxScaleCount = state2->maxScaleCount;
+
+ init_var(&state1->sumX);
+ set_var_from_var(&state2->sumX, &state1->sumX);
+
+ init_var(&state1->sumX2);
+ set_var_from_var(&state2->sumX2, &state1->sumX2);
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state2->N > 0)
+ {
+ state1->N += state2->N;
+ state1->NaNcount += state2->NaNcount;
+
+ /*
+ * These are currently only needed for moving aggregates, but let's
+ * do the right thing anyway...
+ */
+ if (state2->maxScale > state1->maxScale)
+ {
+ state1->maxScale = state2->maxScale;
+ state1->maxScaleCount = state2->maxScaleCount;
+ }
+ else if (state2->maxScale == state1->maxScale)
+ state1->maxScaleCount += state2->maxScaleCount;
+
+ /* The rest of this needs to work in the aggregate context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ /* Accumulate sums */
+ add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+ add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+
+ MemoryContextSwitchTo(old_context);
+ }
+ PG_RETURN_POINTER(state1);
+}
+
+/*
* Generic transition function for numeric aggregates that don't require sumX2.
*/
Datum
@@ -3369,6 +3441,307 @@ numeric_avg_accum(PG_FUNCTION_ARGS)
}
/*
+ * Combine function for numeric aggregates which don't require sumX2
+ */
+Datum
+numeric_avg_combine(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state1;
+ NumericAggState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ PG_RETURN_POINTER(state1);
+
+ /* manually copy all fields from state2 to state1 */
+ if (state1 == NULL)
+ {
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = makeNumericAggState(fcinfo, false);
+ state1->N = state2->N;
+ state1->NaNcount = state2->NaNcount;
+ state1->maxScale = state2->maxScale;
+ state1->maxScaleCount = state2->maxScaleCount;
+
+ init_var(&state1->sumX);
+ set_var_from_var(&state2->sumX, &state1->sumX);
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state2->N > 0)
+ {
+ state1->N += state2->N;
+ state1->NaNcount += state2->NaNcount;
+
+ /*
+ * These are currently only needed for moving aggregates, but let's
+ * do the right thing anyway...
+ */
+ if (state2->maxScale > state1->maxScale)
+ {
+ state1->maxScale = state2->maxScale;
+ state1->maxScaleCount = state2->maxScaleCount;
+ }
+ else if (state2->maxScale == state1->maxScale)
+ state1->maxScaleCount += state2->maxScaleCount;
+
+ /* The rest of this needs to work in the aggregate context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ /* Accumulate sums */
+ add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+
+ MemoryContextSwitchTo(old_context);
+ }
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * numeric_avg_serialize
+ * Serialize NumericAggState for numeric aggregates that don't require
+ * sumX2. Serializes NumericAggState into bytea using the standard pq API.
+ *
+ * numeric_avg_deserialize(numeric_avg_serialize(state)) must result in a state
+ * which matches the original input state.
+ */
+Datum
+numeric_avg_serialize(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state;
+ StringInfoData buf;
+ Datum temp;
+ bytea *sumX;
+ bytea *result;
+
+ /* Ensure we disallow calling when not in aggregate context */
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state = (NumericAggState *) PG_GETARG_POINTER(0);
+
+ /*
+ * This is a little wasteful since make_result converts the NumericVar
+ * into a Numeric and numeric_send converts it back again. Is it worth
+ * splitting the tasks in numeric_send into separate functions to stop
+ * this? Doing so would also remove the fmgr call overhead.
+ */
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX)));
+ sumX = DatumGetByteaP(temp);
+
+ pq_begintypsend(&buf);
+
+ /* N */
+ pq_sendint64(&buf, state->N);
+
+ /* sumX */
+ pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
+
+ /* maxScale */
+ pq_sendint(&buf, state->maxScale, 4);
+
+ /* maxScaleCount */
+ pq_sendint64(&buf, state->maxScaleCount);
+
+ /* NaNcount */
+ pq_sendint64(&buf, state->NaNcount);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * numeric_avg_deserialize
+ * Deserialize bytea into NumericAggState for numeric aggregates that
+ * don't require sumX2. Deserializes bytea into NumericAggState using the
+ * standard pq API.
+ *
+ * numeric_avg_serialize(numeric_avg_deserialize(bytea)) must result in a value
+ * which matches the original bytea value.
+ */
+Datum
+numeric_avg_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate = PG_GETARG_BYTEA_P(0);
+ NumericAggState *result;
+ Datum temp;
+ StringInfoData buf;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard pq API.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
+
+ result = makeNumericAggState(fcinfo, false);
+
+ /* N */
+ result->N = pq_getmsgint64(&buf);
+
+ /* sumX */
+ temp = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+ set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+
+ /* maxScale */
+ result->maxScale = pq_getmsgint(&buf, 4);
+
+ /* maxScaleCount */
+ result->maxScaleCount = pq_getmsgint64(&buf);
+
+ /* NaNcount */
+ result->NaNcount = pq_getmsgint64(&buf);
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
+
+/*
+ * numeric_serialize
+ * Serialization function for NumericAggState for numeric aggregates that
+ * require sumX2. Serializes NumericAggState into bytea using the standard
+ * pq API.
+ *
+ * numeric_deserialize(numeric_serialize(state)) must result in a state which
+ * matches the original input state.
+ */
+Datum
+numeric_serialize(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state;
+ StringInfoData buf;
+ Datum temp;
+ bytea *sumX;
+ bytea *sumX2;
+ bytea *result;
+
+ /* Ensure we disallow calling when not in aggregate context */
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state = (NumericAggState *) PG_GETARG_POINTER(0);
+
+ /*
+ * This is a little wasteful since make_result converts the NumericVar
+ * into a Numeric and numeric_send converts it back again. Is it worth
+ * splitting the tasks in numeric_send into separate functions to stop
+ * this? Doing so would also remove the fmgr call overhead.
+ */
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX)));
+ sumX = DatumGetByteaP(temp);
+
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX2)));
+ sumX2 = DatumGetByteaP(temp);
+
+ pq_begintypsend(&buf);
+
+ /* N */
+ pq_sendint64(&buf, state->N);
+
+ /* sumX */
+ pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
+
+ /* sumX2 */
+ pq_sendbytes(&buf, VARDATA(sumX2), VARSIZE(sumX2) - VARHDRSZ);
+
+ /* maxScale */
+ pq_sendint(&buf, state->maxScale, 4);
+
+ /* maxScaleCount */
+ pq_sendint64(&buf, state->maxScaleCount);
+
+ /* NaNcount */
+ pq_sendint64(&buf, state->NaNcount);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * numeric_deserialize
+ * Deserialization function for NumericAggState for numeric aggregates that
+ * require sumX2. Deserializes bytea into into NumericAggState using the
+ * standard pq API.
+ *
+ * numeric_serialize(numeric_deserialize(bytea)) must result in a value which
+ * matches the original bytea value.
+ */
+Datum
+numeric_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate = PG_GETARG_BYTEA_P(0);
+ NumericAggState *result;
+ Datum temp;
+ StringInfoData buf;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard pq API.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
+
+ result = makeNumericAggState(fcinfo, false);
+
+ /* N */
+ result->N = pq_getmsgint64(&buf);
+
+ /* sumX */
+ temp = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+ set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+
+ /* sumX2 */
+ temp = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+ set_var_from_num(DatumGetNumeric(temp), &result->sumX2);
+
+ /* maxScale */
+ result->maxScale = pq_getmsgint(&buf, 4);
+
+ /* maxScaleCount */
+ result->maxScaleCount = pq_getmsgint64(&buf);
+
+ /* NaNcount */
+ result->NaNcount = pq_getmsgint64(&buf);
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
+
+/*
* Generic inverse transition function for numeric aggregates
* (with or without requirement for X^2).
*/
@@ -3552,6 +3925,215 @@ int8_accum(PG_FUNCTION_ARGS)
}
/*
+ * Combine function for numeric aggregates which require sumX2
+ */
+Datum
+numeric_poly_combine(PG_FUNCTION_ARGS)
+{
+ PolyNumAggState *state1;
+ PolyNumAggState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ PG_RETURN_POINTER(state1);
+
+ /* manually copy all fields from state2 to state1 */
+ if (state1 == NULL)
+ {
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = makePolyNumAggState(fcinfo, true);
+ state1->N = state2->N;
+
+#ifdef HAVE_INT128
+ state1->sumX = state2->sumX;
+ state1->sumX2 = state2->sumX2;
+#else
+ init_var(&(state1->sumX));
+ set_var_from_var(&(state2->sumX), &(state1->sumX));
+
+ init_var(&state1->sumX2);
+ set_var_from_var(&(state2->sumX2), &(state1->sumX2));
+#endif
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state2->N > 0)
+ {
+ state1->N += state2->N;
+
+#ifdef HAVE_INT128
+ state1->sumX += state2->sumX;
+ state1->sumX2 += state2->sumX2;
+#else
+ /* The rest of this needs to work in the aggregate context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ /* Accumulate sums */
+ add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+ add_var(&(state1->sumX2), &(state2->sumX2), &(state1->sumX2));
+
+ MemoryContextSwitchTo(old_context);
+#endif
+
+ }
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * numeric_poly_serialize
+ * Serialize PolyNumAggState into bytea using the standard pq API for
+ * aggregate functions which require sumX2.
+ *
+ * numeric_poly_deserialize(numeric_poly_serialize(state)) must result in a
+ * state which matches the original input state.
+ */
+Datum
+numeric_poly_serialize(PG_FUNCTION_ARGS)
+{
+ PolyNumAggState *state;
+ StringInfoData buf;
+ bytea *sumX;
+ bytea *sumX2;
+ bytea *result;
+
+ /* Ensure we disallow calling when not in aggregate context */
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state = (PolyNumAggState *) PG_GETARG_POINTER(0);
+
+ /*
+ * If the platform supports int128 then sumX and sumX2 will be a 128 bit
+ * integer type. Here we'll convert that into a numeric type so that the
+ * combine state is in the same format for both int128 enabled machines
+ * and machines which don't support that type. The logic here is that one
+ * day we might like to send these over to another server for further
+ * processing and we want a standard format to work with.
+ */
+ {
+ Datum temp;
+
+#ifdef HAVE_INT128
+ NumericVar num;
+
+ init_var(&num);
+ int128_to_numericvar(state->sumX, &num);
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&num)));
+ sumX = DatumGetByteaP(temp);
+
+ int128_to_numericvar(state->sumX2, &num);
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&num)));
+ sumX2 = DatumGetByteaP(temp);
+ free_var(&num);
+#else
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX)));
+ sumX = DatumGetByteaP(temp);
+
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX2)));
+ sumX2 = DatumGetByteaP(temp);
+#endif
+ }
+
+ pq_begintypsend(&buf);
+
+ /* N */
+ pq_sendint64(&buf, state->N);
+
+ /* sumX */
+ pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
+
+ /* sumX2 */
+ pq_sendbytes(&buf, VARDATA(sumX2), VARSIZE(sumX2) - VARHDRSZ);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * numeric_poly_deserialize
+ * Deserialize PolyNumAggState from bytea using the standard pq API for
+ * aggregate functions which require sumX2.
+ *
+ * numeric_poly_serialize(numeric_poly_deserialize(bytea)) must result in a
+ * state which matches the original input state.
+ */
+Datum
+numeric_poly_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate = PG_GETARG_BYTEA_P(0);
+ PolyNumAggState *result;
+ Datum sumX;
+ Datum sumX2;
+ StringInfoData buf;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard pq API.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
+
+ result = makePolyNumAggState(fcinfo, false);
+
+ /* N */
+ result->N = pq_getmsgint64(&buf);
+
+ /* sumX */
+ sumX = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+
+ /* sumX2 */
+ sumX2 = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+
+#ifdef HAVE_INT128
+ {
+ NumericVar num;
+
+ init_var(&num);
+ set_var_from_num(DatumGetNumeric(sumX), &num);
+ numericvar_to_int128(&num, &result->sumX);
+
+ set_var_from_num(DatumGetNumeric(sumX2), &num);
+ numericvar_to_int128(&num, &result->sumX2);
+
+ free_var(&num);
+ }
+#else
+ set_var_from_num(DatumGetNumeric(sumX), &result->sumX);
+ set_var_from_num(DatumGetNumeric(sumX2), &result->sumX2);
+#endif
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
+
+/*
* Transition function for int8 input when we don't need sumX2.
*/
Datum
@@ -3581,6 +4163,180 @@ int8_avg_accum(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(state);
}
+/*
+ * Combine function for PolyNumAggState for aggregates which don't require
+ * sumX2
+ */
+Datum
+int8_avg_combine(PG_FUNCTION_ARGS)
+{
+ PolyNumAggState *state1;
+ PolyNumAggState *state2;
+ MemoryContext agg_context;
+ MemoryContext old_context;
+
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state1 = PG_ARGISNULL(0) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(0);
+ state2 = PG_ARGISNULL(1) ? NULL : (PolyNumAggState *) PG_GETARG_POINTER(1);
+
+ if (state2 == NULL)
+ PG_RETURN_POINTER(state1);
+
+ /* manually copy all fields from state2 to state1 */
+ if (state1 == NULL)
+ {
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ state1 = makePolyNumAggState(fcinfo, false);
+ state1->N = state2->N;
+
+#ifdef HAVE_INT128
+ state1->sumX = state2->sumX;
+#else
+ init_var(&state1->sumX);
+ set_var_from_var(&state2->sumX, &state1->sumX);
+#endif
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(state1);
+ }
+
+ if (state2->N > 0)
+ {
+ state1->N += state2->N;
+
+#ifdef HAVE_INT128
+ state1->sumX += state2->sumX;
+#else
+ /* The rest of this needs to work in the aggregate context */
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ /* Accumulate sums */
+ add_var(&(state1->sumX), &(state2->sumX), &(state1->sumX));
+
+ MemoryContextSwitchTo(old_context);
+#endif
+
+ }
+ PG_RETURN_POINTER(state1);
+}
+
+/*
+ * int8_avg_serialize
+ * Serialize PolyNumAggState into bytea using the standard pq API.
+ *
+ * int8_avg_deserialize(int8_avg_serialize(state)) must result in a state which
+ * matches the original input state.
+ */
+Datum
+int8_avg_serialize(PG_FUNCTION_ARGS)
+{
+ PolyNumAggState *state;
+ StringInfoData buf;
+ bytea *sumX;
+ bytea *result;
+
+ /* Ensure we disallow calling when not in aggregate context */
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ state = (PolyNumAggState *) PG_GETARG_POINTER(0);
+
+ /*
+ * If the platform supports int128 then sumX will be a 128 integer type.
+ * Here we'll convert that into a numeric type so that the combine state
+ * is in the same format for both int128 enabled machines and machines
+ * which don't support that type. The logic here is that one day we might
+ * like to send these over to another server for further processing and we
+ * want a standard format to work with.
+ */
+ {
+ Datum temp;
+#ifdef HAVE_INT128
+ NumericVar num;
+
+ init_var(&num);
+ int128_to_numericvar(state->sumX, &num);
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&num)));
+ free_var(&num);
+ sumX = DatumGetByteaP(temp);
+#else
+ temp = DirectFunctionCall1(numeric_send,
+ NumericGetDatum(make_result(&state->sumX)));
+ sumX = DatumGetByteaP(temp);
+#endif
+ }
+
+ pq_begintypsend(&buf);
+
+ /* N */
+ pq_sendint64(&buf, state->N);
+
+ /* sumX */
+ pq_sendbytes(&buf, VARDATA(sumX), VARSIZE(sumX) - VARHDRSZ);
+
+ result = pq_endtypsend(&buf);
+
+ PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * int8_avg_deserialize
+ * Deserialize bytea back into PolyNumAggState.
+ *
+ * int8_avg_serialize(int8_avg_deserialize(bytea)) must result in a value which
+ * matches the original bytea value.
+ */
+Datum
+int8_avg_deserialize(PG_FUNCTION_ARGS)
+{
+ bytea *sstate = PG_GETARG_BYTEA_P(0);
+ PolyNumAggState *result;
+ StringInfoData buf;
+ Datum temp;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ /*
+ * Copy the bytea into a StringInfo so that we can "receive" it using the
+ * standard pq API.
+ */
+ initStringInfo(&buf);
+ appendBinaryStringInfo(&buf, VARDATA(sstate), VARSIZE(sstate) - VARHDRSZ);
+
+ result = makePolyNumAggState(fcinfo, false);
+
+ /* N */
+ result->N = pq_getmsgint64(&buf);
+
+ /* sumX */
+ temp = DirectFunctionCall3(numeric_recv,
+ PointerGetDatum(&buf),
+ InvalidOid,
+ -1);
+
+#ifdef HAVE_INT128
+ {
+ NumericVar num;
+
+ init_var(&num);
+ set_var_from_num(DatumGetNumeric(temp), &num);
+ numericvar_to_int128(&num, &result->sumX);
+ free_var(&num);
+ }
+#else
+ set_var_from_num(DatumGetNumeric(temp), &result->sumX);
+#endif
+
+ pq_getmsgend(&buf);
+ pfree(buf.data);
+
+ PG_RETURN_POINTER(result);
+}
/*
* Inverse transition functions to go with the above.
@@ -4310,6 +5066,37 @@ int4_avg_accum(PG_FUNCTION_ARGS)
}
Datum
+int4_avg_combine(PG_FUNCTION_ARGS)
+{
+ ArrayType *transarray1;
+ ArrayType *transarray2;
+ Int8TransTypeData *state1;
+ Int8TransTypeData *state2;
+
+ if (!AggCheckCallContext(fcinfo, NULL))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ transarray1 = PG_GETARG_ARRAYTYPE_P(0);
+ transarray2 = PG_GETARG_ARRAYTYPE_P(1);
+
+ if (ARR_HASNULL(transarray1) ||
+ ARR_SIZE(transarray1) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
+ elog(ERROR, "expected 2-element int8 array");
+
+ if (ARR_HASNULL(transarray2) ||
+ ARR_SIZE(transarray2) != ARR_OVERHEAD_NONULLS(1) + sizeof(Int8TransTypeData))
+ elog(ERROR, "expected 2-element int8 array");
+
+ state1 = (Int8TransTypeData *) ARR_DATA_PTR(transarray1);
+ state2 = (Int8TransTypeData *) ARR_DATA_PTR(transarray2);
+
+ state1->count += state2->count;
+ state1->sum += state2->sum;
+
+ PG_RETURN_ARRAYTYPE_P(transarray1);
+}
+
+Datum
int2_avg_accum_inv(PG_FUNCTION_ARGS)
{
ArrayType *transarray;
@@ -5308,6 +6095,79 @@ int64_to_numericvar(int64 val, NumericVar *var)
#ifdef HAVE_INT128
/*
+ * Convert numeric to int128, rounding if needed.
+ *
+ * If overflow, return FALSE (no error is raised). Return TRUE if okay.
+ */
+static bool
+numericvar_to_int128(NumericVar *var, int128 *result)
+{
+ NumericDigit *digits;
+ int ndigits;
+ int weight;
+ int i;
+ int128 val,
+ oldval;
+ bool neg;
+ NumericVar rounded;
+
+ /* Round to nearest integer */
+ init_var(&rounded);
+ set_var_from_var(var, &rounded);
+ round_var(&rounded, 0);
+
+ /* Check for zero input */
+ strip_var(&rounded);
+ ndigits = rounded.ndigits;
+ if (ndigits == 0)
+ {
+ *result = 0;
+ free_var(&rounded);
+ return true;
+ }
+
+ /*
+ * For input like 10000000000, we must treat stripped digits as real. So
+ * the loop assumes there are weight+1 digits before the decimal point.
+ */
+ weight = rounded.weight;
+ Assert(weight >= 0 && ndigits <= weight + 1);
+
+ /* Construct the result */
+ digits = rounded.digits;
+ neg = (rounded.sign == NUMERIC_NEG);
+ val = digits[0];
+ for (i = 1; i <= weight; i++)
+ {
+ oldval = val;
+ val *= NBASE;
+ if (i < ndigits)
+ val += digits[i];
+
+ /*
+ * The overflow check is a bit tricky because we want to accept
+ * INT128_MIN, which will overflow the positive accumulator. We can
+ * detect this case easily though because INT128_MIN is the only
+ * nonzero value for which -val == val (on a two's complement machine,
+ * anyway).
+ */
+ if ((val / NBASE) != oldval) /* possible overflow? */
+ {
+ if (!neg || (-val) != val || val == 0 || oldval < 0)
+ {
+ free_var(&rounded);
+ return false;
+ }
+ }
+ }
+
+ free_var(&rounded);
+
+ *result = neg ? -val : val;
+ return true;
+}
+
+/*
* Convert 128 bit integer to numeric.
*/
static void