diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2003-03-21 01:58:05 +0000 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2003-03-21 01:58:05 +0000 |
commit | d72f6c75038d8d37e64a29a04b911f728044d83b (patch) | |
tree | eceda56ef435a8b8317d0e123cbdb0fe2cd844e4 /src/backend/utils/adt/numeric.c | |
parent | 5ae424529b1bb9bf36eddbec57c7ba887698371e (diff) | |
download | postgresql-d72f6c75038d8d37e64a29a04b911f728044d83b.tar.gz postgresql-d72f6c75038d8d37e64a29a04b911f728044d83b.zip |
Reimplement NUMERIC datatype using base-10000 arithmetic; also improve
some of the algorithms for higher functions. I see about a factor of ten
speedup on the 'numeric' regression test, but it's unlikely that that test
is representative of real-world applications.
initdb forced due to change of on-disk representation for NUMERIC.
Diffstat (limited to 'src/backend/utils/adt/numeric.c')
-rw-r--r-- | src/backend/utils/adt/numeric.c | 2457 |
1 files changed, 1546 insertions, 911 deletions
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c index 2214f570d6d..e6698fea6c0 100644 --- a/src/backend/utils/adt/numeric.c +++ b/src/backend/utils/adt/numeric.c @@ -1,19 +1,29 @@ -/* ---------- +/*------------------------------------------------------------------------- + * * numeric.c + * An exact numeric data type for the Postgres database system * - * An exact numeric data type for the Postgres database system + * Original coding 1998, Jan Wieck. Heavily revised 2003, Tom Lane. * - * 1998 Jan Wieck + * Many of the algorithmic ideas are borrowed from David M. Smith's "FM" + * multiple-precision math library, most recently published as Algorithm + * 786: Multiple-Precision Complex Arithmetic and Functions, ACM + * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998, + * pages 359-367. * - * $Header: /cvsroot/pgsql/src/backend/utils/adt/numeric.c,v 1.58 2003/03/14 00:15:32 tgl Exp $ + * Copyright (c) 1998-2003, PostgreSQL Global Development Group * - * ---------- + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/utils/adt/numeric.c,v 1.59 2003/03/21 01:58:04 tgl Exp $ + * + *------------------------------------------------------------------------- */ #include "postgres.h" #include <ctype.h> #include <float.h> +#include <limits.h> #include <math.h> #include <errno.h> @@ -43,84 +53,166 @@ /* ---------- * Local data types * - * Note: the first digit of a NumericVar's value is assumed to be multiplied - * by 10 ** weight. Another way to say it is that there are weight+1 digits - * before the decimal point. It is possible to have weight < 0. - * + * Numeric values are represented in a base-NBASE floating point format. + * Each "digit" ranges from 0 to NBASE-1. The type NumericDigit is signed + * and wide enough to store a digit. We assume that NBASE*NBASE can fit in + * an int. Although the purely calculational routines could handle any even + * NBASE that's less than sqrt(INT_MAX), in practice we are only interested + * in NBASE a power of ten, so that I/O conversions and decimal rounding + * are easy. Also, it's actually more efficient if NBASE is rather less than + * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var to + * postpone processing carries. + * ---------- + */ + +#if 0 +#define NBASE 10 +#define HALF_NBASE 5 +#define DEC_DIGITS 1 /* decimal digits per NBASE digit */ +#define MUL_GUARD_DIGITS 4 /* these are measured in NBASE digits */ +#define DIV_GUARD_DIGITS 8 + +typedef signed char NumericDigit; +#endif + +#if 0 +#define NBASE 100 +#define HALF_NBASE 50 +#define DEC_DIGITS 2 /* decimal digits per NBASE digit */ +#define MUL_GUARD_DIGITS 3 /* these are measured in NBASE digits */ +#define DIV_GUARD_DIGITS 6 + +typedef signed char NumericDigit; +#endif + +#if 1 +#define NBASE 10000 +#define HALF_NBASE 5000 +#define DEC_DIGITS 4 /* decimal digits per NBASE digit */ +#define MUL_GUARD_DIGITS 2 /* these are measured in NBASE digits */ +#define DIV_GUARD_DIGITS 4 + +typedef int16 NumericDigit; +#endif + + +/* ---------- * The value represented by a NumericVar is determined by the sign, weight, - * ndigits, and digits[] array. The rscale and dscale are carried along, - * but they are just auxiliary information until rounding is done before - * final storage or display. (Scales are the number of digits wanted - * *after* the decimal point. Scales are always >= 0.) + * ndigits, and digits[] array. + * Note: the first digit of a NumericVar's value is assumed to be multiplied + * by NBASE ** weight. Another way to say it is that there are weight+1 + * digits before the decimal point. It is possible to have weight < 0. * * buf points at the physical start of the palloc'd digit buffer for the * NumericVar. digits points at the first digit in actual use (the one - * with the specified weight). We normally leave an unused byte or two + * with the specified weight). We normally leave an unused digit or two * (preset to zeroes) between buf and digits, so that there is room to store * a carry out of the top digit without special pushups. We just need to * decrement digits (and increment weight) to make room for the carry digit. + * (There is no such extra space in a numeric value stored in the database, + * only in a NumericVar in memory.) * * If buf is NULL then the digit buffer isn't actually palloc'd and should * not be freed --- see the constants below for an example. * + * dscale, or display scale, is the nominal precision expressed as number + * of digits after the decimal point (it must always be >= 0 at present). + * dscale may be more than the number of physically stored fractional digits, + * implying that we have suppressed storage of significant trailing zeroes. + * It should never be less than the number of stored digits, since that would + * imply hiding digits that are present. NOTE that dscale is always expressed + * in *decimal* digits, and so it may correspond to a fractional number of + * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits. + * + * rscale, or result scale, is the target precision for a computation. + * Like dscale it is expressed as number of *decimal* digits after the decimal + * point, and is always >= 0 at present. + * Note that rscale is not stored in variables --- it's figured on-the-fly + * from the dscales of the inputs. + * * NB: All the variable-level functions are written in a style that makes it * possible to give one and the same variable as argument and destination. * This is feasible because the digit buffer is separate from the variable. * ---------- */ -typedef unsigned char NumericDigit; - typedef struct NumericVar { - int ndigits; /* number of digits in digits[] - can be - * 0! */ + int ndigits; /* # of digits in digits[] - can be 0! */ int weight; /* weight of first digit */ - int rscale; /* result scale */ - int dscale; /* display scale */ int sign; /* NUMERIC_POS, NUMERIC_NEG, or * NUMERIC_NAN */ + int dscale; /* display scale */ NumericDigit *buf; /* start of palloc'd space for digits[] */ - NumericDigit *digits; /* decimal digits */ + NumericDigit *digits; /* base-NBASE digits */ } NumericVar; /* ---------- - * Local data - * ---------- - */ -static int global_rscale = 0; - -/* ---------- - * Some preinitialized variables we need often + * Some preinitialized constants * ---------- */ static NumericDigit const_zero_data[1] = {0}; static NumericVar const_zero = -{0, 0, 0, 0, NUMERIC_POS, NULL, const_zero_data}; +{0, 0, NUMERIC_POS, 0, NULL, const_zero_data}; static NumericDigit const_one_data[1] = {1}; static NumericVar const_one = -{1, 0, 0, 0, NUMERIC_POS, NULL, const_one_data}; +{1, 0, NUMERIC_POS, 0, NULL, const_one_data}; static NumericDigit const_two_data[1] = {2}; static NumericVar const_two = -{1, 0, 0, 0, NUMERIC_POS, NULL, const_two_data}; - -static NumericDigit const_zero_point_one_data[1] = {1}; -static NumericVar const_zero_point_one = -{1, -1, 1, 1, NUMERIC_POS, NULL, const_zero_point_one_data}; - +{1, 0, NUMERIC_POS, 0, NULL, const_two_data}; + +#if DEC_DIGITS == 4 +static NumericDigit const_zero_point_five_data[1] = {5000}; +#elif DEC_DIGITS == 2 +static NumericDigit const_zero_point_five_data[1] = {50}; +#elif DEC_DIGITS == 1 +static NumericDigit const_zero_point_five_data[1] = {5}; +#endif +static NumericVar const_zero_point_five = +{1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data}; + +#if DEC_DIGITS == 4 +static NumericDigit const_zero_point_nine_data[1] = {9000}; +#elif DEC_DIGITS == 2 +static NumericDigit const_zero_point_nine_data[1] = {90}; +#elif DEC_DIGITS == 1 static NumericDigit const_zero_point_nine_data[1] = {9}; +#endif static NumericVar const_zero_point_nine = -{1, -1, 1, 1, NUMERIC_POS, NULL, const_zero_point_nine_data}; +{1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data}; + +#if DEC_DIGITS == 4 +static NumericDigit const_zero_point_01_data[1] = {100}; +static NumericVar const_zero_point_01 = +{1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data}; +#elif DEC_DIGITS == 2 +static NumericDigit const_zero_point_01_data[1] = {1}; +static NumericVar const_zero_point_01 = +{1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data}; +#elif DEC_DIGITS == 1 +static NumericDigit const_zero_point_01_data[1] = {1}; +static NumericVar const_zero_point_01 = +{1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data}; +#endif +#if DEC_DIGITS == 4 +static NumericDigit const_one_point_one_data[2] = {1, 1000}; +#elif DEC_DIGITS == 2 +static NumericDigit const_one_point_one_data[2] = {1, 10}; +#elif DEC_DIGITS == 1 static NumericDigit const_one_point_one_data[2] = {1, 1}; +#endif static NumericVar const_one_point_one = -{2, 0, 1, 1, NUMERIC_POS, NULL, const_one_point_one_data}; +{2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data}; static NumericVar const_nan = -{0, 0, 0, 0, NUMERIC_NAN, NULL, NULL}; +{0, 0, NUMERIC_NAN, 0, NULL, NULL}; +#if DEC_DIGITS == 4 +static const int round_powers[4] = { 0, 1000, 100, 10 }; +#endif /* ---------- @@ -129,27 +221,29 @@ static NumericVar const_nan = */ #ifdef NUMERIC_DEBUG -static void dump_numeric(char *str, Numeric num); -static void dump_var(char *str, NumericVar *var); +static void dump_numeric(const char *str, Numeric num); +static void dump_var(const char *str, NumericVar *var); #else #define dump_numeric(s,n) #define dump_var(s,v) #endif -#define digitbuf_alloc(size) ((NumericDigit *) palloc(size)) +#define digitbuf_alloc(ndigits) \ + ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit))) #define digitbuf_free(buf) \ do { \ if ((buf) != NULL) \ pfree(buf); \ } while (0) -#define init_var(v) memset(v,0,sizeof(NumericVar)) +#define init_var(v) MemSetAligned(v, 0, sizeof(NumericVar)) + static void alloc_var(NumericVar *var, int ndigits); static void free_var(NumericVar *var); static void zero_var(NumericVar *var); -static void set_var_from_str(char *str, NumericVar *dest); +static void set_var_from_str(const char *str, NumericVar *dest); static void set_var_from_num(Numeric value, NumericVar *dest); static void set_var_from_var(NumericVar *value, NumericVar *dest); static char *get_str_from_var(NumericVar *var, int dscale); @@ -158,6 +252,8 @@ static Numeric make_result(NumericVar *var); static void apply_typmod(NumericVar *var, int32 typmod); +static bool numericvar_to_int8(NumericVar *var, int64 *result); +static void int8_to_numericvar(int64 val, NumericVar *var); static double numeric_to_double_no_overflow(Numeric num); static double numericvar_to_double_no_overflow(NumericVar *var); @@ -165,23 +261,30 @@ static int cmp_numerics(Numeric num1, Numeric num2); static int cmp_var(NumericVar *var1, NumericVar *var2); static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result); static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result); -static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result); -static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result); +static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result, + int rscale); +static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result, + int rscale); static int select_div_scale(NumericVar *var1, NumericVar *var2); static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result); static void ceil_var(NumericVar *var, NumericVar *result); static void floor_var(NumericVar *var, NumericVar *result); -static void sqrt_var(NumericVar *arg, NumericVar *result); -static void exp_var(NumericVar *arg, NumericVar *result); -static void ln_var(NumericVar *arg, NumericVar *result); +static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale); +static void exp_var(NumericVar *arg, NumericVar *result, int rscale); +static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale); +static void ln_var(NumericVar *arg, NumericVar *result, int rscale); static void log_var(NumericVar *base, NumericVar *num, NumericVar *result); static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result); +static void power_var_int(NumericVar *base, int exp, NumericVar *result, + int rscale); static int cmp_abs(NumericVar *var1, NumericVar *var2); static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result); static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result); - +static void round_var(NumericVar *var, int rscale); +static void trunc_var(NumericVar *var, int rscale); +static void strip_var(NumericVar *var); /* ---------------------------------------------------------------------- @@ -192,11 +295,10 @@ static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result); */ -/* ---------- +/* * numeric_in() - * * Input function for numeric data type - * ---------- */ Datum numeric_in(PG_FUNCTION_ARGS) @@ -232,11 +334,10 @@ numeric_in(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_out() - * * Output function for numeric data type - * ---------- */ Datum numeric_out(PG_FUNCTION_ARGS) @@ -270,13 +371,12 @@ numeric_out(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric() - * * This is a special function called by the Postgres database system - * before a value is stored in a tuples attribute. The precision and + * before a value is stored in a tuple's attribute. The precision and * scale of the attribute have to be applied on the value. - * ---------- */ Datum numeric(PG_FUNCTION_ARGS) @@ -287,7 +387,8 @@ numeric(PG_FUNCTION_ARGS) int32 tmp_typmod; int precision; int scale; - int maxweight; + int ddigits; + int maxdigits; NumericVar var; /* @@ -313,18 +414,19 @@ numeric(PG_FUNCTION_ARGS) tmp_typmod = typmod - VARHDRSZ; precision = (tmp_typmod >> 16) & 0xffff; scale = tmp_typmod & 0xffff; - maxweight = precision - scale; + maxdigits = precision - scale; /* - * If the number is in bounds and due to the present result scale no + * If the number is certainly in bounds and due to the target scale no * rounding could be necessary, just make a copy of the input and - * modify its scale fields. + * modify its scale fields. (Note we assume the existing dscale is + * honest...) */ - if (num->n_weight < maxweight && scale >= num->n_rscale) + ddigits = (num->n_weight + 1) * DEC_DIGITS; + if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num)) { new = (Numeric) palloc(num->varlen); memcpy(new, num, num->varlen); - new->n_rscale = scale; new->n_sign_dscale = NUMERIC_SIGN(new) | ((uint16) scale & NUMERIC_DSCALE_MASK); PG_RETURN_NUMERIC(new); @@ -425,12 +527,11 @@ numeric_uplus(PG_FUNCTION_ARGS) PG_RETURN_NUMERIC(res); } -/* ---------- +/* * numeric_sign() - * * returns -1 if the argument is less than 0, 0 if the argument is equal * to 0, and 1 if the argument is greater than zero. - * ---------- */ Datum numeric_sign(PG_FUNCTION_ARGS) @@ -471,13 +572,12 @@ numeric_sign(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_round() - * * Round a value to have 'scale' digits after the decimal point. * We allow negative 'scale', implying rounding before the decimal * point --- Oracle interprets rounding that way. - * ---------- */ Datum numeric_round(PG_FUNCTION_ARGS) @@ -486,7 +586,6 @@ numeric_round(PG_FUNCTION_ARGS) int32 scale = PG_GETARG_INT32(1); Numeric res; NumericVar arg; - int i; /* * Handle NaN @@ -496,7 +595,6 @@ numeric_round(PG_FUNCTION_ARGS) /* * Limit the scale value to avoid possible overflow in calculations - * below. */ scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE); scale = Min(scale, NUMERIC_MAX_RESULT_SCALE); @@ -507,46 +605,11 @@ numeric_round(PG_FUNCTION_ARGS) init_var(&arg); set_var_from_num(num, &arg); - i = arg.weight + scale + 1; - - if (i < arg.ndigits) - { - /* - * If i = 0, the value loses all digits, but could round up if its - * first digit is more than 4. If i < 0 the result must be 0. - */ - if (i < 0) - arg.ndigits = 0; - else - { - int carry = (arg.digits[i] > 4) ? 1 : 0; - - arg.ndigits = i; - - while (carry) - { - carry += arg.digits[--i]; - arg.digits[i] = carry % 10; - carry /= 10; - } - - if (i < 0) - { - Assert(i == -1); /* better not have added more than 1 digit */ - Assert(arg.digits > arg.buf); - arg.digits--; - arg.ndigits++; - arg.weight++; - } - } - } + round_var(&arg, scale); - /* - * Set result's scale to something reasonable. - */ - scale = Min(NUMERIC_MAX_DISPLAY_SCALE, Max(0, scale)); - arg.rscale = scale; - arg.dscale = scale; + /* We don't allow negative output dscale */ + if (scale < 0) + arg.dscale = 0; /* * Return the rounded result @@ -558,13 +621,12 @@ numeric_round(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_trunc() - * * Truncate a value to have 'scale' digits after the decimal point. * We allow negative 'scale', implying a truncation before the decimal * point --- Oracle interprets truncation that way. - * ---------- */ Datum numeric_trunc(PG_FUNCTION_ARGS) @@ -582,7 +644,6 @@ numeric_trunc(PG_FUNCTION_ARGS) /* * Limit the scale value to avoid possible overflow in calculations - * below. */ scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE); scale = Min(scale, NUMERIC_MAX_RESULT_SCALE); @@ -593,14 +654,11 @@ numeric_trunc(PG_FUNCTION_ARGS) init_var(&arg); set_var_from_num(num, &arg); - arg.ndigits = Min(arg.ndigits, Max(0, arg.weight + scale + 1)); + trunc_var(&arg, scale); - /* - * Set result's scale to something reasonable. - */ - scale = Min(NUMERIC_MAX_DISPLAY_SCALE, Max(0, scale)); - arg.rscale = scale; - arg.dscale = scale; + /* We don't allow negative output dscale */ + if (scale < 0) + arg.dscale = 0; /* * Return the truncated result @@ -612,11 +670,10 @@ numeric_trunc(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_ceil() - * * Return the smallest integer greater than or equal to the argument - * ---------- */ Datum numeric_ceil(PG_FUNCTION_ARGS) @@ -633,8 +690,6 @@ numeric_ceil(PG_FUNCTION_ARGS) set_var_from_num(num, &result); ceil_var(&result, &result); - result.dscale = 0; - res = make_result(&result); free_var(&result); @@ -642,11 +697,10 @@ numeric_ceil(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_floor() - * * Return the largest integer equal to or less than the argument - * ---------- */ Datum numeric_floor(PG_FUNCTION_ARGS) @@ -663,8 +717,6 @@ numeric_floor(PG_FUNCTION_ARGS) set_var_from_num(num, &result); floor_var(&result, &result); - result.dscale = 0; - res = make_result(&result); free_var(&result); @@ -833,17 +885,16 @@ cmp_numerics(Numeric num1, Numeric num2) /* ---------------------------------------------------------------------- * - * Arithmetic base functions + * Basic arithmetic functions * * ---------------------------------------------------------------------- */ -/* ---------- +/* * numeric_add() - * * Add two numerics - * ---------- */ Datum numeric_add(PG_FUNCTION_ARGS) @@ -863,8 +914,6 @@ numeric_add(PG_FUNCTION_ARGS) /* * Unpack the values, let add_var() compute the result and return it. - * The internals of add_var() will automatically set the correct - * result and display scales in the result. */ init_var(&arg1); init_var(&arg2); @@ -874,6 +923,7 @@ numeric_add(PG_FUNCTION_ARGS) set_var_from_num(num2, &arg2); add_var(&arg1, &arg2, &result); + res = make_result(&result); free_var(&arg1); @@ -884,11 +934,10 @@ numeric_add(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_sub() - * * Subtract one numeric from another - * ---------- */ Datum numeric_sub(PG_FUNCTION_ARGS) @@ -907,8 +956,7 @@ numeric_sub(PG_FUNCTION_ARGS) PG_RETURN_NUMERIC(make_result(&const_nan)); /* - * Unpack the two arguments, let sub_var() compute the result and - * return it. + * Unpack the values, let sub_var() compute the result and return it. */ init_var(&arg1); init_var(&arg2); @@ -918,6 +966,7 @@ numeric_sub(PG_FUNCTION_ARGS) set_var_from_num(num2, &arg2); sub_var(&arg1, &arg2, &result); + res = make_result(&result); free_var(&arg1); @@ -928,11 +977,10 @@ numeric_sub(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_mul() - * * Calculate the product of two numerics - * ---------- */ Datum numeric_mul(PG_FUNCTION_ARGS) @@ -951,12 +999,11 @@ numeric_mul(PG_FUNCTION_ARGS) PG_RETURN_NUMERIC(make_result(&const_nan)); /* - * Unpack the arguments, let mul_var() compute the result and return - * it. Unlike add_var() and sub_var(), mul_var() will round the result - * to the scale stored in global_rscale. In the case of numeric_mul(), - * which is invoked for the * operator on numerics, we set it to the - * exact representation for the product (rscale = sum(rscale of arg1, - * rscale of arg2) and the same for the dscale). + * Unpack the values, let mul_var() compute the result and return it. + * Unlike add_var() and sub_var(), mul_var() will round its result. + * In the case of numeric_mul(), which is invoked for the * operator on + * numerics, we request exact representation for the product (rscale = + * sum(dscale of arg1, dscale of arg2)). */ init_var(&arg1); init_var(&arg2); @@ -965,11 +1012,7 @@ numeric_mul(PG_FUNCTION_ARGS) set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); - global_rscale = arg1.rscale + arg2.rscale; - - mul_var(&arg1, &arg2, &result); - - result.dscale = arg1.dscale + arg2.dscale; + mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale); res = make_result(&result); @@ -981,11 +1024,10 @@ numeric_mul(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_div() - * * Divide one numeric into another - * ---------- */ Datum numeric_div(PG_FUNCTION_ARGS) @@ -996,7 +1038,7 @@ numeric_div(PG_FUNCTION_ARGS) NumericVar arg2; NumericVar result; Numeric res; - int res_dscale; + int rscale; /* * Handle NaN @@ -1014,14 +1056,15 @@ numeric_div(PG_FUNCTION_ARGS) set_var_from_num(num1, &arg1); set_var_from_num(num2, &arg2); - res_dscale = select_div_scale(&arg1, &arg2); - /* - * Do the divide, set the display scale and return the result + * Select scale for division result */ - div_var(&arg1, &arg2, &result); + rscale = select_div_scale(&arg1, &arg2); - result.dscale = res_dscale; + /* + * Do the divide and return the result + */ + div_var(&arg1, &arg2, &result, rscale); res = make_result(&result); @@ -1033,11 +1076,10 @@ numeric_div(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_mod() - * * Calculate the modulo of two numerics - * ---------- */ Datum numeric_mod(PG_FUNCTION_ARGS) @@ -1071,11 +1113,10 @@ numeric_mod(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_inc() - * * Increment a number by one - * ---------- */ Datum numeric_inc(PG_FUNCTION_ARGS) @@ -1098,6 +1139,7 @@ numeric_inc(PG_FUNCTION_ARGS) set_var_from_num(num, &arg); add_var(&arg, &const_one, &arg); + res = make_result(&arg); free_var(&arg); @@ -1106,11 +1148,10 @@ numeric_inc(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_smaller() - * * Return the smaller of two numbers - * ---------- */ Datum numeric_smaller(PG_FUNCTION_ARGS) @@ -1148,11 +1189,10 @@ numeric_smaller(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_larger() - * * Return the larger of two numbers - * ---------- */ Datum numeric_larger(PG_FUNCTION_ARGS) @@ -1192,17 +1232,16 @@ numeric_larger(PG_FUNCTION_ARGS) /* ---------------------------------------------------------------------- * - * Complex math functions + * Advanced math functions * * ---------------------------------------------------------------------- */ -/* ---------- +/* * numeric_sqrt() - * * Compute the square root of a numeric. - * ---------- */ Datum numeric_sqrt(PG_FUNCTION_ARGS) @@ -1212,7 +1251,7 @@ numeric_sqrt(PG_FUNCTION_ARGS) NumericVar arg; NumericVar result; int sweight; - int res_dscale; + int rscale; /* * Handle NaN @@ -1221,7 +1260,7 @@ numeric_sqrt(PG_FUNCTION_ARGS) PG_RETURN_NUMERIC(make_result(&const_nan)); /* - * Unpack the argument and determine the scales. We choose a display + * Unpack the argument and determine the result scale. We choose a * scale to give at least NUMERIC_MIN_SIG_DIGITS significant digits; * but in any case not less than the input's dscale. */ @@ -1231,21 +1270,17 @@ numeric_sqrt(PG_FUNCTION_ARGS) set_var_from_num(num, &arg); /* Assume the input was normalized, so arg.weight is accurate */ - sweight = (arg.weight / 2) - 1; - - res_dscale = NUMERIC_MIN_SIG_DIGITS - sweight; - res_dscale = Max(res_dscale, arg.dscale); - res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE); - res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE); + sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1; - global_rscale = res_dscale + 8; + rscale = NUMERIC_MIN_SIG_DIGITS - sweight; + rscale = Max(rscale, arg.dscale); + rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); + rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); /* * Let sqrt_var() do the calculation and return the result. */ - sqrt_var(&arg, &result); - - result.dscale = res_dscale; + sqrt_var(&arg, &result, rscale); res = make_result(&result); @@ -1256,11 +1291,10 @@ numeric_sqrt(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_exp() - * * Raise e to the power of x - * ---------- */ Datum numeric_exp(PG_FUNCTION_ARGS) @@ -1269,7 +1303,7 @@ numeric_exp(PG_FUNCTION_ARGS) Numeric res; NumericVar arg; NumericVar result; - int res_dscale; + int rscale; double val; /* @@ -1279,7 +1313,7 @@ numeric_exp(PG_FUNCTION_ARGS) PG_RETURN_NUMERIC(make_result(&const_nan)); /* - * Unpack the argument and determine the scales. We choose a display + * Unpack the argument and determine the result scale. We choose a * scale to give at least NUMERIC_MIN_SIG_DIGITS significant digits; * but in any case not less than the input's dscale. */ @@ -1289,28 +1323,27 @@ numeric_exp(PG_FUNCTION_ARGS) set_var_from_num(num, &arg); /* convert input to float8, ignoring overflow */ - val = numeric_to_double_no_overflow(num); + val = numericvar_to_double_no_overflow(&arg); - /* log10(result) = num * log10(e), so this is approximately the weight: */ + /* + * log10(result) = num * log10(e), so this is approximately the decimal + * weight of the result: + */ val *= 0.434294481903252; /* limit to something that won't cause integer overflow */ val = Max(val, -NUMERIC_MAX_RESULT_SCALE); val = Min(val, NUMERIC_MAX_RESULT_SCALE); - res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) val; - res_dscale = Max(res_dscale, arg.dscale); - res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE); - res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE); - - global_rscale = res_dscale + NUMERIC_EXTRA_DIGITS; + rscale = NUMERIC_MIN_SIG_DIGITS - (int) val; + rscale = Max(rscale, arg.dscale); + rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); + rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); /* * Let exp_var() do the calculation and return the result. */ - exp_var(&arg, &result); - - result.dscale = res_dscale; + exp_var(&arg, &result, rscale); res = make_result(&result); @@ -1321,11 +1354,10 @@ numeric_exp(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_ln() - * * Compute the natural logarithm of x - * ---------- */ Datum numeric_ln(PG_FUNCTION_ARGS) @@ -1334,7 +1366,8 @@ numeric_ln(PG_FUNCTION_ARGS) Numeric res; NumericVar arg; NumericVar result; - int res_dscale; + int dec_digits; + int rscale; /* * Handle NaN @@ -1347,22 +1380,21 @@ numeric_ln(PG_FUNCTION_ARGS) set_var_from_num(num, &arg); - if (arg.weight > 0) - res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(arg.weight); - else if (arg.weight < 0) - res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(- arg.weight); - else - res_dscale = NUMERIC_MIN_SIG_DIGITS; - - res_dscale = Max(res_dscale, arg.dscale); - res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE); - res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE); + /* Approx decimal digits before decimal point */ + dec_digits = (arg.weight + 1) * DEC_DIGITS; - global_rscale = res_dscale + NUMERIC_EXTRA_DIGITS; + if (dec_digits > 1) + rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1); + else if (dec_digits < 1) + rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits); + else + rscale = NUMERIC_MIN_SIG_DIGITS; - ln_var(&arg, &result); + rscale = Max(rscale, arg.dscale); + rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); + rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); - result.dscale = res_dscale; + ln_var(&arg, &result, rscale); res = make_result(&result); @@ -1373,11 +1405,10 @@ numeric_ln(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_log() - * * Compute the logarithm of x in a given base - * ---------- */ Datum numeric_log(PG_FUNCTION_ARGS) @@ -1407,7 +1438,7 @@ numeric_log(PG_FUNCTION_ARGS) /* * Call log_var() to compute and return the result; note it handles - * rscale/dscale itself. + * scale selection itself. */ log_var(&arg1, &arg2, &result); @@ -1421,11 +1452,10 @@ numeric_log(PG_FUNCTION_ARGS) } -/* ---------- +/* * numeric_power() - * * Raise b to the power of x - * ---------- */ Datum numeric_power(PG_FUNCTION_ARGS) @@ -1455,7 +1485,7 @@ numeric_power(PG_FUNCTION_ARGS) /* * Call power_var() to compute and return the result; note it handles - * rscale/dscale itself. + * scale selection itself. */ power_var(&arg1, &arg2, &result); @@ -1483,17 +1513,14 @@ int4_numeric(PG_FUNCTION_ARGS) int32 val = PG_GETARG_INT32(0); Numeric res; NumericVar result; - char *tmp; init_var(&result); - tmp = DatumGetCString(DirectFunctionCall1(int4out, - Int32GetDatum(val))); - set_var_from_str(tmp, &result); + int8_to_numericvar((int64) val, &result); + res = make_result(&result); free_var(&result); - pfree(tmp); PG_RETURN_NUMERIC(res); } @@ -1504,46 +1531,47 @@ numeric_int4(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar x; - char *str; - Datum result; + int64 val; + int32 result; /* XXX would it be better to return NULL? */ if (NUMERIC_IS_NAN(num)) elog(ERROR, "Cannot convert NaN to int4"); - /* - * Get the number in the variable format so we can round to integer. - */ + /* Convert to variable format and thence to int8 */ init_var(&x); set_var_from_num(num, &x); - str = get_str_from_var(&x, 0); /* dscale = 0 produces rounding */ + if (!numericvar_to_int8(&x, &val)) + elog(ERROR, "numeric conversion to int4 is out of range"); free_var(&x); - result = DirectFunctionCall1(int4in, CStringGetDatum(str)); - pfree(str); + /* Down-convert to int4 */ + result = (int32) val; - PG_RETURN_DATUM(result); + /* Test for overflow by reverse-conversion. */ + if ((int64) result != val) + elog(ERROR, "numeric conversion to int4 is out of range"); + + PG_RETURN_INT32(result); } Datum int8_numeric(PG_FUNCTION_ARGS) { - Datum val = PG_GETARG_DATUM(0); + int64 val = PG_GETARG_INT64(0); Numeric res; NumericVar result; - char *tmp; init_var(&result); - tmp = DatumGetCString(DirectFunctionCall1(int8out, val)); - set_var_from_str(tmp, &result); + int8_to_numericvar(val, &result); + res = make_result(&result); free_var(&result); - pfree(tmp); PG_RETURN_NUMERIC(res); } @@ -1554,27 +1582,22 @@ numeric_int8(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar x; - char *str; - Datum result; + int64 result; /* XXX would it be better to return NULL? */ if (NUMERIC_IS_NAN(num)) elog(ERROR, "Cannot convert NaN to int8"); - /* - * Get the number in the variable format so we can round to integer. - */ + /* Convert to variable format and thence to int8 */ init_var(&x); set_var_from_num(num, &x); - str = get_str_from_var(&x, 0); /* dscale = 0 produces rounding */ + if (!numericvar_to_int8(&x, &result)) + elog(ERROR, "numeric conversion to int8 is out of range"); free_var(&x); - result = DirectFunctionCall1(int8in, CStringGetDatum(str)); - pfree(str); - - PG_RETURN_DATUM(result); + PG_RETURN_INT64(result); } @@ -1584,17 +1607,14 @@ int2_numeric(PG_FUNCTION_ARGS) int16 val = PG_GETARG_INT16(0); Numeric res; NumericVar result; - char *tmp; init_var(&result); - tmp = DatumGetCString(DirectFunctionCall1(int2out, - Int16GetDatum(val))); - set_var_from_str(tmp, &result); + int8_to_numericvar((int64) val, &result); + res = make_result(&result); free_var(&result); - pfree(tmp); PG_RETURN_NUMERIC(res); } @@ -1605,27 +1625,30 @@ numeric_int2(PG_FUNCTION_ARGS) { Numeric num = PG_GETARG_NUMERIC(0); NumericVar x; - char *str; - Datum result; + int64 val; + int16 result; /* XXX would it be better to return NULL? */ if (NUMERIC_IS_NAN(num)) elog(ERROR, "Cannot convert NaN to int2"); - /* - * Get the number in the variable format so we can round to integer. - */ + /* Convert to variable format and thence to int8 */ init_var(&x); set_var_from_num(num, &x); - str = get_str_from_var(&x, 0); /* dscale = 0 produces rounding */ + if (!numericvar_to_int8(&x, &val)) + elog(ERROR, "numeric conversion to int2 is out of range"); free_var(&x); - result = DirectFunctionCall1(int2in, CStringGetDatum(str)); - pfree(str); + /* Down-convert to int2 */ + result = (int16) val; - return result; + /* Test for overflow by reverse-conversion. */ + if ((int64) result != val) + elog(ERROR, "numeric conversion to int2 is out of range"); + + PG_RETURN_INT16(result); } @@ -1926,7 +1949,7 @@ numeric_variance(PG_FUNCTION_ARGS) vsumX, vsumX2, vNminus1; - int div_dscale; + int rscale; /* We assume the input is array of numeric */ deconstruct_array(transarray, @@ -1964,11 +1987,11 @@ numeric_variance(PG_FUNCTION_ARGS) init_var(&vsumX2); set_var_from_num(sumX2, &vsumX2); - /* set rscale for mul_var calls */ - global_rscale = vsumX.rscale * 2; + /* compute rscale for mul_var calls */ + rscale = vsumX.dscale * 2; - mul_var(&vsumX, &vsumX, &vsumX); /* now vsumX contains sumX * sumX */ - mul_var(&vN, &vsumX2, &vsumX2); /* now vsumX2 contains N * sumX2 */ + mul_var(&vsumX, &vsumX, &vsumX, rscale); /* vsumX = sumX * sumX */ + mul_var(&vN, &vsumX2, &vsumX2, rscale); /* vsumX2 = N * sumX2 */ sub_var(&vsumX2, &vsumX, &vsumX2); /* N * sumX2 - sumX * sumX */ if (cmp_var(&vsumX2, &const_zero) <= 0) @@ -1978,10 +2001,9 @@ numeric_variance(PG_FUNCTION_ARGS) } else { - mul_var(&vN, &vNminus1, &vNminus1); /* N * (N - 1) */ - div_dscale = select_div_scale(&vsumX2, &vNminus1); - div_var(&vsumX2, &vNminus1, &vsumX); /* variance */ - vsumX.dscale = div_dscale; + mul_var(&vN, &vNminus1, &vNminus1, 0); /* N * (N - 1) */ + rscale = select_div_scale(&vsumX2, &vNminus1); + div_var(&vsumX2, &vNminus1, &vsumX, rscale); /* variance */ res = make_result(&vsumX); } @@ -2008,7 +2030,7 @@ numeric_stddev(PG_FUNCTION_ARGS) vsumX, vsumX2, vNminus1; - int div_dscale; + int rscale; /* We assume the input is array of numeric */ deconstruct_array(transarray, @@ -2046,11 +2068,11 @@ numeric_stddev(PG_FUNCTION_ARGS) init_var(&vsumX2); set_var_from_num(sumX2, &vsumX2); - /* set rscale for mul_var calls */ - global_rscale = vsumX.rscale * 2; + /* compute rscale for mul_var calls */ + rscale = vsumX.dscale * 2; - mul_var(&vsumX, &vsumX, &vsumX); /* now vsumX contains sumX * sumX */ - mul_var(&vN, &vsumX2, &vsumX2); /* now vsumX2 contains N * sumX2 */ + mul_var(&vsumX, &vsumX, &vsumX, rscale); /* vsumX = sumX * sumX */ + mul_var(&vN, &vsumX2, &vsumX2, rscale); /* vsumX2 = N * sumX2 */ sub_var(&vsumX2, &vsumX, &vsumX2); /* N * sumX2 - sumX * sumX */ if (cmp_var(&vsumX2, &const_zero) <= 0) @@ -2060,11 +2082,10 @@ numeric_stddev(PG_FUNCTION_ARGS) } else { - mul_var(&vN, &vNminus1, &vNminus1); /* N * (N - 1) */ - div_dscale = select_div_scale(&vsumX2, &vNminus1); - div_var(&vsumX2, &vNminus1, &vsumX); /* variance */ - vsumX.dscale = div_dscale; - sqrt_var(&vsumX, &vsumX); /* stddev */ + mul_var(&vN, &vNminus1, &vNminus1, 0); /* N * (N - 1) */ + rscale = select_div_scale(&vsumX2, &vNminus1); + div_var(&vsumX2, &vNminus1, &vsumX, rscale); /* variance */ + sqrt_var(&vsumX, &vsumX, rscale); /* stddev */ res = make_result(&vsumX); } @@ -2267,25 +2288,26 @@ int8_avg(PG_FUNCTION_ARGS) /* ---------------------------------------------------------------------- * - * Local functions follow + * Debug support * * ---------------------------------------------------------------------- */ - #ifdef NUMERIC_DEBUG -/* ---------- +/* * dump_numeric() - Dump a value in the db storage format for debugging - * ---------- */ static void -dump_numeric(char *str, Numeric num) +dump_numeric(const char *str, Numeric num) { + NumericDigit *digits = (NumericDigit *) num->n_data; + int ndigits; int i; - printf("%s: NUMERIC w=%d r=%d d=%d ", str, num->n_weight, num->n_rscale, - NUMERIC_DSCALE(num)); + ndigits = (num->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit); + + printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num)); switch (NUMERIC_SIGN(num)) { case NUMERIC_POS: @@ -2302,23 +2324,21 @@ dump_numeric(char *str, Numeric num) break; } - for (i = 0; i < num->varlen - NUMERIC_HDRSZ; i++) - printf(" %d %d", (num->n_data[i] >> 4) & 0x0f, num->n_data[i] & 0x0f); + for (i = 0; i < ndigits; i++) + printf(" %0*d", DEC_DIGITS, digits[i]); printf("\n"); } -/* ---------- +/* * dump_var() - Dump a value in the variable format for debugging - * ---------- */ static void -dump_var(char *str, NumericVar *var) +dump_var(const char *str, NumericVar *var) { int i; - printf("%s: VAR w=%d r=%d d=%d ", str, var->weight, var->rscale, - var->dscale); + printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale); switch (var->sign) { case NUMERIC_POS: @@ -2336,35 +2356,45 @@ dump_var(char *str, NumericVar *var) } for (i = 0; i < var->ndigits; i++) - printf(" %d", var->digits[i]); + printf(" %0*d", DEC_DIGITS, var->digits[i]); printf("\n"); } + #endif /* NUMERIC_DEBUG */ -/* ---------- +/* ---------------------------------------------------------------------- + * + * Local functions follow + * + * In general, these do not support NaNs --- callers must eliminate + * the possibility of NaN first. (make_result() is an exception.) + * + * ---------------------------------------------------------------------- + */ + + +/* * alloc_var() - * * Allocate a digit buffer of ndigits digits (plus a spare digit for rounding) - * ---------- */ static void alloc_var(NumericVar *var, int ndigits) { digitbuf_free(var->buf); var->buf = digitbuf_alloc(ndigits + 1); - var->buf[0] = 0; + var->buf[0] = 0; /* spare digit for rounding */ var->digits = var->buf + 1; var->ndigits = ndigits; } -/* ---------- +/* * free_var() - * * Return the digit buffer of a variable to the free pool - * ---------- */ static void free_var(NumericVar *var) @@ -2376,12 +2406,11 @@ free_var(NumericVar *var) } -/* ---------- +/* * zero_var() - * * Set a variable to ZERO. - * Note: rscale and dscale are not touched. - * ---------- + * Note: its dscale is not touched. */ static void zero_var(NumericVar *var) @@ -2395,19 +2424,33 @@ zero_var(NumericVar *var) } -/* ---------- +/* * set_var_from_str() * * Parse a string and put the number into a variable - * ---------- */ static void -set_var_from_str(char *str, NumericVar *dest) +set_var_from_str(const char *str, NumericVar *dest) { - char *cp = str; + const char *cp = str; bool have_dp = FALSE; - int i = 0; + int i; + unsigned char *decdigits; + int sign = NUMERIC_POS; + int dweight = -1; + int ddigits; + int dscale = 0; + int weight; + int ndigits; + int offset; + NumericDigit *digits; + /* + * We first parse the string to extract decimal digits and determine the + * correct decimal weight. Then convert to NBASE representation. + */ + + /* skip leading spaces */ while (*cp) { if (!isspace((unsigned char) *cp)) @@ -2415,20 +2458,15 @@ set_var_from_str(char *str, NumericVar *dest) cp++; } - alloc_var(dest, strlen(cp)); - dest->weight = -1; - dest->dscale = 0; - dest->sign = NUMERIC_POS; - switch (*cp) { case '+': - dest->sign = NUMERIC_POS; + sign = NUMERIC_POS; cp++; break; case '-': - dest->sign = NUMERIC_NEG; + sign = NUMERIC_NEG; cp++; break; } @@ -2442,15 +2480,21 @@ set_var_from_str(char *str, NumericVar *dest) if (!isdigit((unsigned char) *cp)) elog(ERROR, "Bad numeric input format '%s'", str); + decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS*2); + + /* leading padding for digit alignment later */ + memset(decdigits, 0, DEC_DIGITS); + i = DEC_DIGITS; + while (*cp) { if (isdigit((unsigned char) *cp)) { - dest->digits[i++] = *cp++ - '0'; + decdigits[i++] = *cp++ - '0'; if (!have_dp) - dest->weight++; + dweight++; else - dest->dscale++; + dscale++; } else if (*cp == '.') { @@ -2462,7 +2506,10 @@ set_var_from_str(char *str, NumericVar *dest) else break; } - dest->ndigits = i; + + ddigits = i - DEC_DIGITS; + /* trailing padding for digit alignment later */ + memset(decdigits + i, 0, DEC_DIGITS-1); /* Handle exponent, if any */ if (*cp == 'e' || *cp == 'E') @@ -2478,10 +2525,10 @@ set_var_from_str(char *str, NumericVar *dest) if (exponent > NUMERIC_MAX_PRECISION || exponent < -NUMERIC_MAX_PRECISION) elog(ERROR, "Bad numeric input format '%s'", str); - dest->weight += (int) exponent; - dest->dscale -= (int) exponent; - if (dest->dscale < 0) - dest->dscale = 0; + dweight += (int) exponent; + dscale -= (int) exponent; + if (dscale < 0) + dscale = 0; } /* Should be nothing left but spaces */ @@ -2492,60 +2539,75 @@ set_var_from_str(char *str, NumericVar *dest) cp++; } - /* Strip any leading zeroes */ - while (dest->ndigits > 0 && *(dest->digits) == 0) + /* + * Okay, convert pure-decimal representation to base NBASE. First we + * need to determine the converted weight and ndigits. offset is the + * number of decimal zeroes to insert before the first given digit to + * have a correctly aligned first NBASE digit. + */ + if (dweight >= 0) + weight = (dweight + 1 + DEC_DIGITS-1) / DEC_DIGITS - 1; + else + weight = - ((-dweight - 1) / DEC_DIGITS + 1); + offset = (weight + 1) * DEC_DIGITS - (dweight + 1); + ndigits = (ddigits + offset + DEC_DIGITS-1) / DEC_DIGITS; + + alloc_var(dest, ndigits); + dest->sign = sign; + dest->weight = weight; + dest->dscale = dscale; + + i = DEC_DIGITS - offset; + digits = dest->digits; + + while (ndigits-- > 0) { - (dest->digits)++; - (dest->weight)--; - (dest->ndigits)--; +#if DEC_DIGITS == 4 + *digits++ = ((decdigits[i] * 10 + decdigits[i+1]) * 10 + + decdigits[i+2]) * 10 + decdigits[i+3]; +#elif DEC_DIGITS == 2 + *digits++ = decdigits[i] * 10 + decdigits[i+1]; +#elif DEC_DIGITS == 1 + *digits++ = decdigits[i]; +#else +#error unsupported NBASE +#endif + i += DEC_DIGITS; } - if (dest->ndigits == 0) - dest->weight = 0; - dest->rscale = dest->dscale; + pfree(decdigits); + + /* Strip any leading/trailing zeroes, and normalize weight if zero */ + strip_var(dest); } /* * set_var_from_num() - * - * Parse back the packed db format into a variable - * + * Convert the packed db format into a variable */ static void set_var_from_num(Numeric num, NumericVar *dest) { - NumericDigit *digit; - int i; - int n; + int ndigits; - n = num->varlen - NUMERIC_HDRSZ; /* number of digit-pairs in packed - * fmt */ + ndigits = (num->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit); - alloc_var(dest, n * 2); + alloc_var(dest, ndigits); dest->weight = num->n_weight; - dest->rscale = num->n_rscale; - dest->dscale = NUMERIC_DSCALE(num); dest->sign = NUMERIC_SIGN(num); + dest->dscale = NUMERIC_DSCALE(num); - digit = dest->digits; - - for (i = 0; i < n; i++) - { - unsigned char digitpair = num->n_data[i]; - - *digit++ = (digitpair >> 4) & 0x0f; - *digit++ = digitpair & 0x0f; - } + memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit)); } -/* ---------- +/* * set_var_from_var() - * * Copy one variable into another - * ---------- */ static void set_var_from_var(NumericVar *value, NumericVar *dest) @@ -2554,7 +2616,7 @@ set_var_from_var(NumericVar *value, NumericVar *dest) newbuf = digitbuf_alloc(value->ndigits + 1); newbuf[0] = 0; /* spare digit for rounding */ - memcpy(newbuf + 1, value->digits, value->ndigits); + memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit)); digitbuf_free(dest->buf); @@ -2564,56 +2626,47 @@ set_var_from_var(NumericVar *value, NumericVar *dest) } -/* ---------- +/* * get_str_from_var() - * * Convert a var to text representation (guts of numeric_out). * CAUTION: var's contents may be modified by rounding! - * Caller must have checked for NaN case. * Returns a palloc'd string. - * ---------- */ static char * get_str_from_var(NumericVar *var, int dscale) { char *str; char *cp; + char *endcp; int i; int d; + NumericDigit dig; +#if DEC_DIGITS > 1 + NumericDigit d1; +#endif + + if (dscale < 0) + dscale = 0; /* * Check if we must round up before printing the value and do so. */ - i = dscale + var->weight + 1; - if (i >= 0 && var->ndigits > i) - { - int carry = (var->digits[i] > 4) ? 1 : 0; - - var->ndigits = i; - - while (carry) - { - carry += var->digits[--i]; - var->digits[i] = carry % 10; - carry /= 10; - } - - if (i < 0) - { - Assert(i == -1); /* better not have added more than 1 digit */ - Assert(var->digits > var->buf); - var->digits--; - var->ndigits++; - var->weight++; - } - } - else - var->ndigits = Max(0, Min(i, var->ndigits)); + round_var(var, dscale); /* - * Allocate space for the result + * Allocate space for the result. + * + * i is set to to # of decimal digits before decimal point. + * dscale is the # of decimal digits we will print after decimal point. + * We may generate as many as DEC_DIGITS-1 excess digits at the end, + * and in addition we need room for sign, decimal point, null terminator. */ - str = palloc(Max(0, dscale) + Max(0, var->weight) + 4); + i = (var->weight + 1) * DEC_DIGITS; + if (i <= 0) + i = 1; + + str = palloc(i + dscale + DEC_DIGITS + 2); cp = str; /* @@ -2625,33 +2678,87 @@ get_str_from_var(NumericVar *var, int dscale) /* * Output all digits before the decimal point */ - i = Max(var->weight, 0); - d = 0; - - while (i >= 0) + if (var->weight < 0) { - if (i <= var->weight && d < var->ndigits) - *cp++ = var->digits[d++] + '0'; - else - *cp++ = '0'; - i--; + d = var->weight + 1; + *cp++ = '0'; + } + else + { + for (d = 0; d <= var->weight; d++) + { + dig = (d < var->ndigits) ? var->digits[d] : 0; + /* In the first digit, suppress extra leading decimal zeroes */ +#if DEC_DIGITS == 4 + { + bool putit = (d > 0); + + d1 = dig / 1000; + dig -= d1 * 1000; + putit |= (d1 > 0); + if (putit) + *cp++ = d1 + '0'; + d1 = dig / 100; + dig -= d1 * 100; + putit |= (d1 > 0); + if (putit) + *cp++ = d1 + '0'; + d1 = dig / 10; + dig -= d1 * 10; + putit |= (d1 > 0); + if (putit) + *cp++ = d1 + '0'; + *cp++ = dig + '0'; + } +#elif DEC_DIGITS == 2 + d1 = dig / 10; + dig -= d1 * 10; + if (d1 > 0 || d > 0) + *cp++ = d1 + '0'; + *cp++ = dig + '0'; +#elif DEC_DIGITS == 1 + *cp++ = dig + '0'; +#else +#error unsupported NBASE +#endif + } } /* * If requested, output a decimal point and all the digits that follow - * it. + * it. We initially put out a multiple of DEC_DIGITS digits, then + * truncate if needed. */ if (dscale > 0) { *cp++ = '.'; - while (i >= -dscale) + endcp = cp + dscale; + for (i = 0; i < dscale; d++, i += DEC_DIGITS) { - if (i <= var->weight && d < var->ndigits) - *cp++ = var->digits[d++] + '0'; - else - *cp++ = '0'; - i--; + dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0; +#if DEC_DIGITS == 4 + d1 = dig / 1000; + dig -= d1 * 1000; + *cp++ = d1 + '0'; + d1 = dig / 100; + dig -= d1 * 100; + *cp++ = d1 + '0'; + d1 = dig / 10; + dig -= d1 * 10; + *cp++ = d1 + '0'; + *cp++ = dig + '0'; +#elif DEC_DIGITS == 2 + d1 = dig / 10; + dig -= d1 * 10; + *cp++ = d1 + '0'; + *cp++ = dig + '0'; +#elif DEC_DIGITS == 1 + *cp++ = dig + '0'; +#else +#error unsupported NBASE +#endif } + cp = endcp; } /* @@ -2662,23 +2769,21 @@ get_str_from_var(NumericVar *var, int dscale) } -/* ---------- +/* * make_result() - * * Create the packed db numeric format in palloc()'d memory from - * a variable. The var's rscale determines the number of digits kept. - * ---------- + * a variable. */ static Numeric make_result(NumericVar *var) { Numeric result; - NumericDigit *digit = var->digits; + NumericDigit *digits = var->digits; int weight = var->weight; int sign = var->sign; int n; - int i, - j; + Size len; if (sign == NUMERIC_NAN) { @@ -2686,24 +2791,23 @@ make_result(NumericVar *var) result->varlen = NUMERIC_HDRSZ; result->n_weight = 0; - result->n_rscale = 0; result->n_sign_dscale = NUMERIC_NAN; dump_numeric("make_result()", result); return result; } - n = Max(0, Min(var->ndigits, var->weight + var->rscale + 1)); + n = var->ndigits; /* truncate leading zeroes */ - while (n > 0 && *digit == 0) + while (n > 0 && *digits == 0) { - digit++; + digits++; weight--; n--; } /* truncate trailing zeroes */ - while (n > 0 && digit[n - 1] == 0) + while (n > 0 && digits[n - 1] == 0) n--; /* If zero result, force to weight=0 and positive sign */ @@ -2713,42 +2817,38 @@ make_result(NumericVar *var) sign = NUMERIC_POS; } - result = (Numeric) palloc(NUMERIC_HDRSZ + (n + 1) / 2); - result->varlen = NUMERIC_HDRSZ + (n + 1) / 2; + /* Build the result */ + len = NUMERIC_HDRSZ + n * sizeof(NumericDigit); + result = (Numeric) palloc(len); + result->varlen = len; result->n_weight = weight; - result->n_rscale = var->rscale; - result->n_sign_dscale = sign | - ((uint16) var->dscale & NUMERIC_DSCALE_MASK); + result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK); - i = 0; - j = 0; - while (j < n) - { - unsigned char digitpair = digit[j++] << 4; + memcpy(result->n_data, digits, n * sizeof(NumericDigit)); - if (j < n) - digitpair |= digit[j++]; - result->n_data[i++] = digitpair; - } + /* Check for overflow of int16 fields */ + if (result->n_weight != weight || + NUMERIC_DSCALE(result) != var->dscale) + elog(ERROR, "Value overflows numeric format"); dump_numeric("make_result()", result); return result; } -/* ---------- +/* * apply_typmod() - * * Do bounds checking and rounding according to the attributes * typmod field. - * ---------- */ static void apply_typmod(NumericVar *var, int32 typmod) { int precision; int scale; - int maxweight; + int maxdigits; + int ddigits; int i; /* Do nothing if we have a default typmod (-1) */ @@ -2758,34 +2858,10 @@ apply_typmod(NumericVar *var, int32 typmod) typmod -= VARHDRSZ; precision = (typmod >> 16) & 0xffff; scale = typmod & 0xffff; - maxweight = precision - scale; + maxdigits = precision - scale; - /* Round to target scale */ - i = scale + var->weight + 1; - if (i >= 0 && var->ndigits > i) - { - int carry = (var->digits[i] > 4) ? 1 : 0; - - var->ndigits = i; - - while (carry) - { - carry += var->digits[--i]; - var->digits[i] = carry % 10; - carry /= 10; - } - - if (i < 0) - { - Assert(i == -1); /* better not have added more than 1 digit */ - Assert(var->digits > var->buf); - var->digits--; - var->ndigits++; - var->weight++; - } - } - else - var->ndigits = Max(0, Min(i, var->ndigits)); + /* Round to target scale (and set var->dscale) */ + round_var(var, scale); /* * Check for overflow - note we can't do this before rounding, because @@ -2794,30 +2870,146 @@ apply_typmod(NumericVar *var, int32 typmod) * storage but perhaps might not have been yet. In any case, we must * recognize a true zero, whose weight doesn't mean anything. */ - if (var->weight >= maxweight) + ddigits = (var->weight + 1) * DEC_DIGITS; + if (ddigits > maxdigits) { /* Determine true weight; and check for all-zero result */ - int tweight = var->weight; - for (i = 0; i < var->ndigits; i++) { - if (var->digits[i]) + NumericDigit dig = var->digits[i]; + + if (dig) + { + /* Adjust for any high-order decimal zero digits */ +#if DEC_DIGITS == 4 + if (dig < 10) + ddigits -= 3; + else if (dig < 100) + ddigits -= 2; + else if (dig < 1000) + ddigits -= 1; +#elif DEC_DIGITS == 2 + if (dig < 10) + ddigits -= 1; +#elif DEC_DIGITS == 1 + /* no adjustment */ +#else +#error unsupported NBASE +#endif + if (ddigits > maxdigits) + elog(ERROR, "overflow on numeric " + "ABS(value) >= 10^%d for field with precision %d scale %d", + ddigits-1, precision, scale); break; - tweight--; + } + ddigits -= DEC_DIGITS; } + } +} + +/* + * Convert numeric to int8, rounding if needed. + * + * If overflow, return FALSE (no error is raised). Return TRUE if okay. + * + * CAUTION: var's contents may be modified by rounding! + */ +static bool +numericvar_to_int8(NumericVar *var, int64 *result) +{ + NumericDigit *digits; + int ndigits; + int i; + int64 val, + oldval; + bool neg; + + /* Round to nearest integer */ + round_var(var, 0); + + /* Check for zero input */ + strip_var(var); + ndigits = var->ndigits; + if (ndigits == 0) + { + *result = 0; + return true; + } - if (tweight >= maxweight && i < var->ndigits) - elog(ERROR, "overflow on numeric " - "ABS(value) >= 10^%d for field with precision %d scale %d", - tweight, precision, scale); + /* Construct the result */ + digits = var->digits; + neg = (var->sign == NUMERIC_NEG); + val = digits[0]; + for (i = 1; i < ndigits; i++) + { + oldval = val; + val *= NBASE; + val += digits[i]; + /* + * The overflow check is a bit tricky because we want to accept + * INT64_MIN, which will overflow the positive accumulator. We + * can detect this case easily though because INT64_MIN is the + * only nonzero value for which -val == val (on a two's complement + * machine, anyway). + */ + if ((val / NBASE) != oldval) /* possible overflow? */ + { + if (!neg || (-val) != val || val == 0 || oldval < 0) + return false; + } } - var->rscale = scale; - var->dscale = scale; + *result = neg ? -val : val; + return true; } -/* Convert numeric to float8; if out of range, return +/- HUGE_VAL */ -/* Caller should have eliminated the possibility of NAN */ +/* + * Convert int8 value to numeric. + */ +static void +int8_to_numericvar(int64 val, NumericVar *var) +{ + uint64 uval, + newuval; + NumericDigit *ptr; + int ndigits; + + /* int8 can require at most 19 decimal digits; add one for safety */ + alloc_var(var, 20/DEC_DIGITS); + if (val < 0) + { + var->sign = NUMERIC_NEG; + uval = -val; + } + else + { + var->sign = NUMERIC_POS; + uval = val; + } + var->dscale = 0; + if (val == 0) + { + var->ndigits = 0; + var->weight = 0; + return; + } + ptr = var->digits + var->ndigits; + ndigits = 0; + do { + ptr--; + ndigits++; + newuval = uval / NBASE; + *ptr = uval - newuval * NBASE; + uval = newuval; + } while (uval); + var->digits = ptr; + var->ndigits = ndigits; + var->weight = ndigits - 1; +} + +/* + * Convert numeric to float8; if out of range, return +/- HUGE_VAL + */ static double numeric_to_double_no_overflow(Numeric num) { @@ -2865,11 +3057,11 @@ numericvar_to_double_no_overflow(NumericVar *var) } -/* ---------- +/* * cmp_var() - * - * Compare two values on variable level - * ---------- + * Compare two values on variable level. We assume zeroes have been + * truncated to no digits. */ static int cmp_var(NumericVar *var1, NumericVar *var2) @@ -2903,12 +3095,11 @@ cmp_var(NumericVar *var1, NumericVar *var2) } -/* ---------- +/* * add_var() - * * Full version of add functionality on variable level (handling signs). * result might point to one of the operands too without danger. - * ---------- */ static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result) @@ -2941,7 +3132,6 @@ add_var(NumericVar *var1, NumericVar *var2, NumericVar *result) * ---------- */ zero_var(result); - result->rscale = Max(var1->rscale, var2->rscale); result->dscale = Max(var1->dscale, var2->dscale); break; @@ -2985,7 +3175,6 @@ add_var(NumericVar *var1, NumericVar *var2, NumericVar *result) * ---------- */ zero_var(result); - result->rscale = Max(var1->rscale, var2->rscale); result->dscale = Max(var1->dscale, var2->dscale); break; @@ -3024,12 +3213,11 @@ add_var(NumericVar *var1, NumericVar *var2, NumericVar *result) } -/* ---------- +/* * sub_var() - * * Full version of sub functionality on variable level (handling signs). * result might point to one of the operands too without danger. - * ---------- */ static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result) @@ -3065,7 +3253,6 @@ sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result) * ---------- */ zero_var(result); - result->rscale = Max(var1->rscale, var2->rscale); result->dscale = Max(var1->dscale, var2->dscale); break; @@ -3109,7 +3296,6 @@ sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result) * ---------- */ zero_var(result); - result->rscale = Max(var1->rscale, var2->rscale); result->dscale = Max(var1->dscale, var2->dscale); break; @@ -3148,304 +3334,430 @@ sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result) } -/* ---------- +/* * mul_var() - * * Multiplication on variable level. Product of var1 * var2 is stored - * in result. Accuracy of result is determined by global_rscale. - * ---------- + * in result. Result is rounded to no more than rscale fractional digits. */ static void -mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result) +mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result, + int rscale) { - NumericDigit *res_buf; - NumericDigit *res_digits; int res_ndigits; - int res_weight; int res_sign; + int res_weight; + int maxdigits; + int *dig; + int carry; + int maxdig; + int newdig; + NumericDigit *res_digits; int i, ri, i1, i2; - long sum = 0; + /* copy these values into local vars for speed in inner loop */ + int var1ndigits = var1->ndigits; + int var2ndigits = var2->ndigits; + NumericDigit *var1digits = var1->digits; + NumericDigit *var2digits = var2->digits; - res_weight = var1->weight + var2->weight + 2; - res_ndigits = var1->ndigits + var2->ndigits + 1; + if (var1ndigits == 0 || var2ndigits == 0) + { + /* one or both inputs is zero; so is result */ + zero_var(result); + result->dscale = rscale; + return; + } + + /* Determine result sign and (maximum possible) weight */ if (var1->sign == var2->sign) res_sign = NUMERIC_POS; else res_sign = NUMERIC_NEG; + res_weight = var1->weight + var2->weight + 2; - res_buf = digitbuf_alloc(res_ndigits); - res_digits = res_buf; - memset(res_digits, 0, res_ndigits); - - ri = res_ndigits; - for (i1 = var1->ndigits - 1; i1 >= 0; i1--) + /* + * Determine number of result digits to compute. If the exact result + * would have more than rscale fractional digits, truncate the computation + * with MUL_GUARD_DIGITS guard digits. We do that by pretending that + * one or both inputs have fewer digits than they really do. + */ + res_ndigits = var1ndigits + var2ndigits + 1; + maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS; + if (res_ndigits > maxdigits) { - sum = 0; - i = --ri; - - for (i2 = var2->ndigits - 1; i2 >= 0; i2--) + if (maxdigits < 3) + { + /* no useful precision at all in the result... */ + zero_var(result); + result->dscale = rscale; + return; + } + /* force maxdigits odd so that input ndigits can be equal */ + if ((maxdigits & 1) == 0) + maxdigits++; + if (var1ndigits > var2ndigits) { - sum += res_digits[i] + var1->digits[i1] * var2->digits[i2]; - res_digits[i--] = sum % 10; - sum /= 10; + var1ndigits -= res_ndigits - maxdigits; + if (var1ndigits < var2ndigits) + var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2; } - res_digits[i] = sum; + else + { + var2ndigits -= res_ndigits - maxdigits; + if (var2ndigits < var1ndigits) + var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2; + } + res_ndigits = maxdigits; + Assert(res_ndigits == var1ndigits + var2ndigits + 1); } - i = res_weight + global_rscale + 2; - if (i >= 0 && i < res_ndigits) + /* + * We do the arithmetic in an array "dig[]" of signed int's. Since + * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom + * to avoid normalizing carries immediately. + * + * maxdig tracks the maximum possible value of any dig[] entry; + * when this threatens to exceed INT_MAX, we take the time to propagate + * carries. To avoid overflow in maxdig itself, it actually represents + * the max possible value divided by NBASE-1. + */ + dig = (int *) palloc0(res_ndigits * sizeof(int)); + maxdig = 0; + + ri = res_ndigits - 1; + for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--) { - sum = (res_digits[i] > 4) ? 1 : 0; - res_ndigits = i; - i--; - while (sum) + int var1digit = var1digits[i1]; + + if (var1digit == 0) + continue; + + /* Time to normalize? */ + maxdig += var1digit; + if (maxdig > INT_MAX/(NBASE-1)) { - sum += res_digits[i]; - res_digits[i--] = sum % 10; - sum /= 10; + /* Yes, do it */ + carry = 0; + for (i = res_ndigits-1; i >= 0; i--) + { + newdig = dig[i] + carry; + if (newdig >= NBASE) + { + carry = newdig/NBASE; + newdig -= carry*NBASE; + } + else + carry = 0; + dig[i] = newdig; + } + Assert(carry == 0); + /* Reset maxdig to indicate new worst-case */ + maxdig = 1 + var1digit; } - } - while (res_ndigits > 0 && *res_digits == 0) - { - res_digits++; - res_weight--; - res_ndigits--; + /* Add appropriate multiple of var2 into the accumulator */ + i = ri; + for (i2 = var2ndigits - 1; i2 >= 0; i2--) + { + dig[i--] += var1digit * var2digits[i2]; + } } - while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0) - res_ndigits--; - if (res_ndigits == 0) + /* + * Now we do a final carry propagation pass to normalize the result, + * which we combine with storing the result digits into the output. + * Note that this is still done at full precision w/guard digits. + */ + alloc_var(result, res_ndigits); + res_digits = result->digits; + carry = 0; + for (i = res_ndigits-1; i >= 0; i--) { - res_sign = NUMERIC_POS; - res_weight = 0; + newdig = dig[i] + carry; + if (newdig >= NBASE) + { + carry = newdig/NBASE; + newdig -= carry*NBASE; + } + else + carry = 0; + res_digits[i] = newdig; } + Assert(carry == 0); - digitbuf_free(result->buf); - result->buf = res_buf; - result->digits = res_digits; - result->ndigits = res_ndigits; + pfree(dig); + + /* + * Finally, round the result to the requested precision. + */ result->weight = res_weight; - result->rscale = global_rscale; result->sign = res_sign; + + /* Round to target rscale (and set result->dscale) */ + round_var(result, rscale); + + /* Strip leading and trailing zeroes */ + strip_var(result); } -/* ---------- +/* * div_var() - * - * Division on variable level. Accuracy of result is determined by - * global_rscale. - * ---------- + * Division on variable level. Quotient of var1 / var2 is stored + * in result. Result is rounded to no more than rscale fractional digits. */ static void -div_var(NumericVar *var1, NumericVar *var2, NumericVar *result) +div_var(NumericVar *var1, NumericVar *var2, NumericVar *result, + int rscale) { - NumericDigit *res_digits; - int res_ndigits; + int div_ndigits; int res_sign; int res_weight; - NumericVar dividend; - NumericVar divisor[10]; - int ndigits_tmp; - int weight_tmp; - int rscale_tmp; - int ri; + int *div; + int qdigit; + int carry; + int maxdiv; + int newdig; + NumericDigit *res_digits; + double fdividend, + fdivisor, + fdivisorinverse, + fquotient; + int qi; int i; - long guess; - long first_have; - long first_div; - int first_nextdigit; - int stat = 0; + /* copy these values into local vars for speed in inner loop */ + int var1ndigits = var1->ndigits; + int var2ndigits = var2->ndigits; + NumericDigit *var1digits = var1->digits; + NumericDigit *var2digits = var2->digits; /* - * First of all division by zero check + * First of all division by zero check; we must not be handed an + * unnormalized divisor. */ - ndigits_tmp = var2->ndigits + 1; - if (ndigits_tmp == 1) + if (var2ndigits == 0 || var2digits[0] == 0) elog(ERROR, "division by zero"); /* - * Determine the result sign, weight and number of digits to calculate - */ - if (var1->sign == var2->sign) - res_sign = NUMERIC_POS; - else - res_sign = NUMERIC_NEG; - res_weight = var1->weight - var2->weight + 1; - res_ndigits = global_rscale + res_weight; - if (res_ndigits <= 0) - res_ndigits = 1; - - /* * Now result zero check */ - if (var1->ndigits == 0) + if (var1ndigits == 0) { zero_var(result); - result->rscale = global_rscale; + result->dscale = rscale; return; } /* - * Initialize local variables + * Determine the result sign, weight and number of digits to calculate */ - init_var(÷nd); - for (i = 1; i < 10; i++) - init_var(&divisor[i]); + if (var1->sign == var2->sign) + res_sign = NUMERIC_POS; + else + res_sign = NUMERIC_NEG; + res_weight = var1->weight - var2->weight + 1; + /* The number of accurate result digits we need to produce: */ + div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS-1)/DEC_DIGITS; + /* Add guard digits for roundoff error */ + div_ndigits += DIV_GUARD_DIGITS; + if (div_ndigits < DIV_GUARD_DIGITS) + div_ndigits = DIV_GUARD_DIGITS; + /* Must be at least var1ndigits, too, to simplify data-loading loop */ + if (div_ndigits < var1ndigits) + div_ndigits = var1ndigits; /* - * Make a copy of the divisor which has one leading zero digit + * We do the arithmetic in an array "div[]" of signed int's. Since + * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom + * to avoid normalizing carries immediately. + * + * We start with div[] containing one zero digit followed by the + * dividend's digits (plus appended zeroes to reach the desired + * precision including guard digits). Each step of the main loop + * computes an (approximate) quotient digit and stores it into div[], + * removing one position of dividend space. A final pass of carry + * propagation takes care of any mistaken quotient digits. */ - divisor[1].ndigits = ndigits_tmp; - divisor[1].rscale = var2->ndigits; - divisor[1].sign = NUMERIC_POS; - divisor[1].buf = digitbuf_alloc(ndigits_tmp); - divisor[1].digits = divisor[1].buf; - divisor[1].digits[0] = 0; - memcpy(&(divisor[1].digits[1]), var2->digits, ndigits_tmp - 1); + div = (int *) palloc0((div_ndigits + 1) * sizeof(int)); + for (i = 0; i < var1ndigits; i++) + div[i+1] = var1digits[i]; /* - * Make a copy of the dividend + * We estimate each quotient digit using floating-point arithmetic, + * taking the first four digits of the (current) dividend and divisor. + * This must be float to avoid overflow. */ - dividend.ndigits = var1->ndigits; - dividend.weight = 0; - dividend.rscale = var1->ndigits; - dividend.sign = NUMERIC_POS; - dividend.buf = digitbuf_alloc(var1->ndigits); - dividend.digits = dividend.buf; - memcpy(dividend.digits, var1->digits, var1->ndigits); + fdivisor = (double) var2digits[0]; + for (i = 1; i < 4; i++) + { + fdivisor *= NBASE; + if (i < var2ndigits) + fdivisor += (double) var2digits[i]; + } + fdivisorinverse = 1.0 / fdivisor; /* - * Setup the result + * maxdiv tracks the maximum possible absolute value of any div[] entry; + * when this threatens to exceed INT_MAX, we take the time to propagate + * carries. To avoid overflow in maxdiv itself, it actually represents + * the max possible abs. value divided by NBASE-1. */ - digitbuf_free(result->buf); - result->buf = digitbuf_alloc(res_ndigits + 2); - res_digits = result->buf; - result->digits = res_digits; - result->ndigits = res_ndigits; - result->weight = res_weight; - result->rscale = global_rscale; - result->sign = res_sign; - res_digits[0] = 0; - - first_div = divisor[1].digits[1] * 10; - if (ndigits_tmp > 2) - first_div += divisor[1].digits[2]; - - first_have = 0; - first_nextdigit = 0; - - weight_tmp = 1; - rscale_tmp = divisor[1].rscale; + maxdiv = 1; - for (ri = 0; ri <= res_ndigits; ri++) + /* + * Outer loop computes next quotient digit, which will go into div[qi] + */ + for (qi = 0; qi < div_ndigits; qi++) { - first_have = first_have * 10; - if (first_nextdigit >= 0 && first_nextdigit < dividend.ndigits) - first_have += dividend.digits[first_nextdigit]; - first_nextdigit++; - - guess = (first_have * 10) / first_div + 1; - if (guess > 9) - guess = 9; + /* Approximate the current dividend value */ + fdividend = (double) div[qi]; + for (i = 1; i < 4; i++) + { + fdividend *= NBASE; + if (qi+i <= div_ndigits) + fdividend += (double) div[qi+i]; + } + /* Compute the (approximate) quotient digit */ + fquotient = fdividend * fdivisorinverse; + qdigit = (fquotient >= 0.0) ? ((int) fquotient) : + (((int) fquotient) - 1); /* truncate towards -infinity */ - while (guess > 0) + if (qdigit != 0) { - if (divisor[guess].buf == NULL) + /* Do we need to normalize now? */ + maxdiv += Abs(qdigit); + if (maxdiv > INT_MAX/(NBASE-1)) { - int i; - long sum = 0; - - memcpy(&divisor[guess], &divisor[1], sizeof(NumericVar)); - divisor[guess].buf = digitbuf_alloc(divisor[guess].ndigits); - divisor[guess].digits = divisor[guess].buf; - for (i = divisor[1].ndigits - 1; i >= 0; i--) + /* Yes, do it */ + carry = 0; + for (i = div_ndigits; i > qi; i--) { - sum += divisor[1].digits[i] * guess; - divisor[guess].digits[i] = sum % 10; - sum /= 10; + newdig = div[i] + carry; + if (newdig < 0) + { + carry = -((-newdig-1)/NBASE) - 1; + newdig -= carry*NBASE; + } + else if (newdig >= NBASE) + { + carry = newdig/NBASE; + newdig -= carry*NBASE; + } + else + carry = 0; + div[i] = newdig; } + newdig = div[qi] + carry; + div[qi] = newdig; + /* + * All the div[] digits except possibly div[qi] are now + * in the range 0..NBASE-1. + */ + maxdiv = Abs(newdig) / (NBASE-1); + maxdiv = Max(maxdiv, 1); + /* + * Recompute the quotient digit since new info may have + * propagated into the top four dividend digits + */ + fdividend = (double) div[qi]; + for (i = 1; i < 4; i++) + { + fdividend *= NBASE; + if (qi+i <= div_ndigits) + fdividend += (double) div[qi+i]; + } + /* Compute the (approximate) quotient digit */ + fquotient = fdividend * fdivisorinverse; + qdigit = (fquotient >= 0.0) ? ((int) fquotient) : + (((int) fquotient) - 1); /* truncate towards -infinity */ + maxdiv += Abs(qdigit); } - divisor[guess].weight = weight_tmp; - divisor[guess].rscale = rscale_tmp; - - stat = cmp_abs(÷nd, &divisor[guess]); - if (stat >= 0) - break; - - guess--; - } + /* Subtract off the appropriate multiple of the divisor */ + if (qdigit != 0) + { + int istop = Min(var2ndigits, div_ndigits-qi+1); - res_digits[ri + 1] = guess; - if (stat == 0) - { - ri++; - break; + for (i = 0; i < istop; i++) + div[qi+i] -= qdigit * var2digits[i]; + } } - weight_tmp--; - rscale_tmp++; - - if (guess == 0) - continue; - - sub_abs(÷nd, &divisor[guess], ÷nd); + /* + * The dividend digit we are about to replace might still be nonzero. + * Fold it into the next digit position. We don't need to worry about + * overflow here since this should nearly cancel with the subtraction + * of the divisor. + */ + div[qi+1] += div[qi] * NBASE; - first_nextdigit = dividend.weight - weight_tmp; - first_have = 0; - if (first_nextdigit >= 0 && first_nextdigit < dividend.ndigits) - first_have = dividend.digits[first_nextdigit]; - first_nextdigit++; + div[qi] = qdigit; } - result->ndigits = ri + 1; - if (ri == res_ndigits + 1) + /* + * Approximate and store the last quotient digit (div[div_ndigits]) + */ + fdividend = (double) div[qi]; + for (i = 1; i < 4; i++) { - int carry = (res_digits[ri] > 4) ? 1 : 0; - - result->ndigits = ri; - res_digits[ri] = 0; + fdividend *= NBASE; + } + fquotient = fdividend * fdivisorinverse; + qdigit = (fquotient >= 0.0) ? ((int) fquotient) : + (((int) fquotient) - 1); /* truncate towards -infinity */ + div[qi] = qdigit; - while (carry && ri > 0) + /* + * Now we do a final carry propagation pass to normalize the result, + * which we combine with storing the result digits into the output. + * Note that this is still done at full precision w/guard digits. + */ + alloc_var(result, div_ndigits+1); + res_digits = result->digits; + carry = 0; + for (i = div_ndigits; i >= 0; i--) + { + newdig = div[i] + carry; + if (newdig < 0) + { + carry = -((-newdig-1)/NBASE) - 1; + newdig -= carry*NBASE; + } + else if (newdig >= NBASE) { - carry += res_digits[--ri]; - res_digits[ri] = carry % 10; - carry /= 10; + carry = newdig/NBASE; + newdig -= carry*NBASE; } + else + carry = 0; + res_digits[i] = newdig; } + Assert(carry == 0); - while (result->ndigits > 0 && *(result->digits) == 0) - { - (result->digits)++; - (result->weight)--; - (result->ndigits)--; - } - while (result->ndigits > 0 && result->digits[result->ndigits - 1] == 0) - (result->ndigits)--; - if (result->ndigits == 0) - result->sign = NUMERIC_POS; + pfree(div); /* - * Tidy up + * Finally, round the result to the requested precision. */ - digitbuf_free(dividend.buf); - for (i = 1; i < 10; i++) - digitbuf_free(divisor[i].buf); + result->weight = res_weight; + result->sign = res_sign; + + /* Round to target rscale (and set result->dscale) */ + round_var(result, rscale); + + /* Strip leading and trailing zeroes */ + strip_var(result); } /* * Default scale selection for division * - * Returns the appropriate display scale for the division result, - * and sets global_rscale to the result scale to use during div_var. - * - * Note that this must be called before div_var. + * Returns the appropriate result scale for the division result. */ static int select_div_scale(NumericVar *var1, NumericVar *var2) @@ -3456,18 +3768,14 @@ select_div_scale(NumericVar *var1, NumericVar *var2) i; NumericDigit firstdigit1, firstdigit2; - int res_dscale; - int res_rscale; + int rscale; /* * The result scale of a division isn't specified in any SQL standard. - * For PostgreSQL we select a display scale that will give at least + * For PostgreSQL we select a result scale that will give at least * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a * result no less accurate than float8; but use a scale not less than * either input's display scale. - * - * The result scale is NUMERIC_EXTRA_DIGITS more than the display scale, - * to provide some guard digits in the calculation. */ /* Get the actual (normalized) weight and first digit of each input */ @@ -3504,74 +3812,59 @@ select_div_scale(NumericVar *var1, NumericVar *var2) if (firstdigit1 <= firstdigit2) qweight--; - /* Select display scale */ - res_dscale = NUMERIC_MIN_SIG_DIGITS - qweight; - res_dscale = Max(res_dscale, var1->dscale); - res_dscale = Max(res_dscale, var2->dscale); - res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE); - res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE); - /* Select result scale */ - res_rscale = res_dscale + NUMERIC_EXTRA_DIGITS; - global_rscale = res_rscale; + rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS; + rscale = Max(rscale, var1->dscale); + rscale = Max(rscale, var2->dscale); + rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); + rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); - return res_dscale; + return rscale; } -/* ---------- +/* * mod_var() - * * Calculate the modulo of two numerics at variable level - * ---------- */ static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result) { NumericVar tmp; - int save_global_rscale; - int div_dscale; + int rscale; init_var(&tmp); /* --------- * We do this using the equation * mod(x,y) = x - trunc(x/y)*y - * We set global_rscale the same way numeric_div and numeric_mul do + * We set rscale the same way numeric_div and numeric_mul do * to get the right answer from the equation. The final result, * however, need not be displayed to more precision than the inputs. * ---------- */ - save_global_rscale = global_rscale; - - div_dscale = select_div_scale(var1, var2); + rscale = select_div_scale(var1, var2); - div_var(var1, var2, &tmp); + div_var(var1, var2, &tmp, rscale); - tmp.dscale = div_dscale; + trunc_var(&tmp, 0); - /* do trunc() by forgetting digits to the right of the decimal point */ - tmp.ndigits = Max(0, Min(tmp.ndigits, tmp.weight + 1)); - - global_rscale = var2->rscale + tmp.rscale; - - mul_var(var2, &tmp, &tmp); + mul_var(var2, &tmp, &tmp, var2->dscale + tmp.dscale); sub_var(var1, &tmp, result); - result->dscale = Max(var1->dscale, var2->dscale); + round_var(result, Max(var1->dscale, var2->dscale)); - global_rscale = save_global_rscale; free_var(&tmp); } -/* ---------- +/* * ceil_var() - * * Return the smallest integer greater than or equal to the argument * on variable level - * ---------- */ static void ceil_var(NumericVar *var, NumericVar *result) @@ -3581,9 +3874,9 @@ ceil_var(NumericVar *var, NumericVar *result) init_var(&tmp); set_var_from_var(var, &tmp); - tmp.rscale = 0; - tmp.ndigits = Min(tmp.ndigits, Max(0, tmp.weight + 1)); - if (tmp.sign == NUMERIC_POS && cmp_var(var, &tmp) != 0) + trunc_var(&tmp, 0); + + if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0) add_var(&tmp, &const_one, &tmp); set_var_from_var(&tmp, result); @@ -3591,12 +3884,11 @@ ceil_var(NumericVar *var, NumericVar *result) } -/* ---------- +/* * floor_var() - * * Return the largest integer equal to or less than the argument * on variable level - * ---------- */ static void floor_var(NumericVar *var, NumericVar *result) @@ -3606,9 +3898,9 @@ floor_var(NumericVar *var, NumericVar *result) init_var(&tmp); set_var_from_var(var, &tmp); - tmp.rscale = 0; - tmp.ndigits = Min(tmp.ndigits, Max(0, tmp.weight + 1)); - if (tmp.sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0) + trunc_var(&tmp, 0); + + if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0) sub_var(&tmp, &const_one, &tmp); set_var_from_var(&tmp, result); @@ -3616,33 +3908,27 @@ floor_var(NumericVar *var, NumericVar *result) } -/* ---------- +/* * sqrt_var() - * * Compute the square root of x using Newton's algorithm - * ---------- */ static void -sqrt_var(NumericVar *arg, NumericVar *result) +sqrt_var(NumericVar *arg, NumericVar *result, int rscale) { NumericVar tmp_arg; NumericVar tmp_val; NumericVar last_val; - int res_rscale; - int save_global_rscale; + int local_rscale; int stat; - save_global_rscale = global_rscale; - global_rscale += 8; - res_rscale = global_rscale; + local_rscale = rscale + 8; stat = cmp_var(arg, &const_zero); if (stat == 0) { - set_var_from_var(&const_zero, result); - result->rscale = res_rscale; - result->sign = NUMERIC_POS; - global_rscale = save_global_rscale; + zero_var(result); + result->dscale = rscale; return; } @@ -3659,25 +3945,21 @@ sqrt_var(NumericVar *arg, NumericVar *result) /* * Initialize the result to the first guess */ - digitbuf_free(result->buf); - result->buf = digitbuf_alloc(1); - result->digits = result->buf; + alloc_var(result, 1); result->digits[0] = tmp_arg.digits[0] / 2; if (result->digits[0] == 0) result->digits[0] = 1; - result->ndigits = 1; result->weight = tmp_arg.weight / 2; - result->rscale = res_rscale; result->sign = NUMERIC_POS; set_var_from_var(result, &last_val); for (;;) { - div_var(&tmp_arg, result, &tmp_val); + div_var(&tmp_arg, result, &tmp_val, local_rscale); add_var(result, &tmp_val, result); - div_var(result, &const_two, result); + mul_var(result, &const_zero_point_five, result, local_rscale); if (cmp_var(&last_val, result) == 0) break; @@ -3688,37 +3970,34 @@ sqrt_var(NumericVar *arg, NumericVar *result) free_var(&tmp_val); free_var(&tmp_arg); - global_rscale = save_global_rscale; - div_var(result, &const_one, result); + /* Round to requested precision */ + round_var(result, rscale); } -/* ---------- +/* * exp_var() - * * Raise e to the power of x - * ---------- */ static void -exp_var(NumericVar *arg, NumericVar *result) +exp_var(NumericVar *arg, NumericVar *result, int rscale) { NumericVar x; - NumericVar xpow; - NumericVar ifac; - NumericVar elem; - NumericVar ni; - int d; - int i; int xintval; - int ndiv2 = 0; bool xneg = FALSE; - int save_global_rscale; - + int local_rscale; + + /*---------- + * We separate the integral and fraction parts of x, then compute + * e^x = e^xint * e^xfrac + * where e = exp(1) and e^xfrac = exp(xfrac) are computed by + * exp_var_internal; the limited range of inputs allows that routine + * to do a good job with a simple Taylor series. Raising e^xint is + * done by repeated multiplications in power_var_int. + *---------- + */ init_var(&x); - init_var(&xpow); - init_var(&ifac); - init_var(&elem); - init_var(&ni); set_var_from_var(arg, &x); @@ -3728,26 +4007,88 @@ exp_var(NumericVar *arg, NumericVar *result) x.sign = NUMERIC_POS; } - /* Select an appropriate scale for internal calculation */ + /* Extract the integer part, remove it from x */ xintval = 0; - for (i = x.weight, d = 0; i >= 0; i--, d++) + while (x.weight >= 0) { - xintval *= 10; - if (d < x.ndigits) - xintval += x.digits[d]; - if (xintval >= NUMERIC_MAX_RESULT_SCALE) + xintval *= NBASE; + if (x.ndigits > 0) + { + xintval += x.digits[0]; + x.digits++; + x.ndigits--; + } + x.weight--; + /* Guard against overflow */ + if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3) elog(ERROR, "argument for EXP() too big"); } - save_global_rscale = global_rscale; - global_rscale += xintval / 2 + 8; + /* Select an appropriate scale for internal calculation */ + local_rscale = rscale + MUL_GUARD_DIGITS * 2; - /* Reduce input into range 0 <= x <= 0.1 */ - while (cmp_var(&x, &const_zero_point_one) > 0) + /* Compute e^xfrac */ + exp_var_internal(&x, result, local_rscale); + + /* If there's an integer part, multiply by e^xint */ + if (xintval > 0) + { + NumericVar e; + + init_var(&e); + exp_var_internal(&const_one, &e, local_rscale); + power_var_int(&e, xintval, &e, local_rscale); + mul_var(&e, result, result, local_rscale); + free_var(&e); + } + + /* Compensate for input sign, and round to requested rscale */ + if (xneg) + div_var(&const_one, result, result, rscale); + else + round_var(result, rscale); + + free_var(&x); +} + + +/* + * exp_var_internal() - + * + * Raise e to the power of x, where 0 <= x <= 1 + * + * NB: the result should be good to at least rscale digits, but it has + * *not* been rounded off; the caller must do that if wanted. + */ +static void +exp_var_internal(NumericVar *arg, NumericVar *result, int rscale) +{ + NumericVar x; + NumericVar xpow; + NumericVar ifac; + NumericVar elem; + NumericVar ni; + int ndiv2 = 0; + int local_rscale; + + init_var(&x); + init_var(&xpow); + init_var(&ifac); + init_var(&elem); + init_var(&ni); + + set_var_from_var(arg, &x); + + Assert(x.sign == NUMERIC_POS); + + local_rscale = rscale + 8; + + /* Reduce input into range 0 <= x <= 0.01 */ + while (cmp_var(&x, &const_zero_point_01) > 0) { ndiv2++; - global_rscale++; - div_var(&x, &const_two, &x); + local_rscale++; + mul_var(&x, &const_zero_point_five, &x, x.dscale+1); } /* @@ -3756,7 +4097,7 @@ exp_var(NumericVar *arg, NumericVar *result) * exp(x) = 1 + x + x^2/2! + x^3/3! + ... * * Given the limited range of x, this should converge reasonably quickly. - * We run the series until the terms fall below the global_rscale limit. + * We run the series until the terms fall below the local_rscale limit. */ add_var(&const_one, &x, result); set_var_from_var(&x, &xpow); @@ -3766,9 +4107,9 @@ exp_var(NumericVar *arg, NumericVar *result) for (;;) { add_var(&ni, &const_one, &ni); - mul_var(&xpow, &x, &xpow); - mul_var(&ifac, &ni, &ifac); - div_var(&xpow, &ifac, &elem); + mul_var(&xpow, &x, &xpow, local_rscale); + mul_var(&ifac, &ni, &ifac, 0); + div_var(&xpow, &ifac, &elem, local_rscale); if (elem.ndigits == 0) break; @@ -3778,17 +4119,7 @@ exp_var(NumericVar *arg, NumericVar *result) /* Compensate for argument range reduction */ while (ndiv2-- > 0) - mul_var(result, result, result); - - /* Compensate for input sign, and round to caller's global_rscale */ - global_rscale = save_global_rscale; - - if (xneg) - div_var(&const_one, result, result); - else - div_var(result, &const_one, result); - - result->sign = NUMERIC_POS; + mul_var(result, result, result, local_rscale); free_var(&x); free_var(&xpow); @@ -3798,27 +4129,25 @@ exp_var(NumericVar *arg, NumericVar *result) } -/* ---------- +/* * ln_var() - * * Compute the natural log of x - * ---------- */ static void -ln_var(NumericVar *arg, NumericVar *result) +ln_var(NumericVar *arg, NumericVar *result, int rscale) { NumericVar x; NumericVar xx; NumericVar ni; NumericVar elem; NumericVar fact; - int save_global_rscale; + int local_rscale; if (cmp_var(arg, &const_zero) <= 0) elog(ERROR, "math error on numeric - cannot compute LN of value <= zero"); - save_global_rscale = global_rscale; - global_rscale += 8; + local_rscale = rscale + 8; init_var(&x); init_var(&xx); @@ -3826,21 +4155,21 @@ ln_var(NumericVar *arg, NumericVar *result) init_var(&elem); init_var(&fact); - set_var_from_var(&const_two, &fact); set_var_from_var(arg, &x); + set_var_from_var(&const_two, &fact); /* Reduce input into range 0.9 < x < 1.1 */ while (cmp_var(&x, &const_zero_point_nine) <= 0) { - global_rscale++; - sqrt_var(&x, &x); - mul_var(&fact, &const_two, &fact); + local_rscale++; + sqrt_var(&x, &x, local_rscale); + mul_var(&fact, &const_two, &fact, 0); } while (cmp_var(&x, &const_one_point_one) >= 0) { - global_rscale++; - sqrt_var(&x, &x); - mul_var(&fact, &const_two, &fact); + local_rscale++; + sqrt_var(&x, &x, local_rscale); + mul_var(&fact, &const_two, &fact, 0); } /* @@ -3856,31 +4185,29 @@ ln_var(NumericVar *arg, NumericVar *result) */ sub_var(&x, &const_one, result); add_var(&x, &const_one, &elem); - div_var(result, &elem, result); + div_var(result, &elem, result, local_rscale); set_var_from_var(result, &xx); - mul_var(result, result, &x); + mul_var(result, result, &x, local_rscale); set_var_from_var(&const_one, &ni); for (;;) { add_var(&ni, &const_two, &ni); - mul_var(&xx, &x, &xx); - div_var(&xx, &ni, &elem); + mul_var(&xx, &x, &xx, local_rscale); + div_var(&xx, &ni, &elem, local_rscale); if (elem.ndigits == 0) break; add_var(result, &elem, result); - if (elem.weight < (result->weight - 2 * global_rscale)) + if (elem.weight < (result->weight - local_rscale * 2/DEC_DIGITS)) break; } - /* Compensate for argument range reduction, round to caller's rscale */ - global_rscale = save_global_rscale; - - mul_var(result, &fact, result); + /* Compensate for argument range reduction, round to requested rscale */ + mul_var(result, &fact, result, rscale); free_var(&x); free_var(&xx); @@ -3890,105 +4217,137 @@ ln_var(NumericVar *arg, NumericVar *result) } -/* ---------- +/* * log_var() - * * Compute the logarithm of num in a given base. * - * Note: this routine chooses rscale and dscale of the result. - * ---------- + * Note: this routine chooses dscale of the result. */ static void log_var(NumericVar *base, NumericVar *num, NumericVar *result) { NumericVar ln_base; NumericVar ln_num; - int save_global_rscale = global_rscale; - int res_dscale; + int dec_digits; + int rscale; + int local_rscale; init_var(&ln_base); init_var(&ln_num); - /* Set scale for ln() calculations */ - if (num->weight > 0) - res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(num->weight); - else if (num->weight < 0) - res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(- num->weight); + /* Set scale for ln() calculations --- compare numeric_ln() */ + + /* Approx decimal digits before decimal point */ + dec_digits = (num->weight + 1) * DEC_DIGITS; + + if (dec_digits > 1) + rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1); + else if (dec_digits < 1) + rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits); else - res_dscale = NUMERIC_MIN_SIG_DIGITS; + rscale = NUMERIC_MIN_SIG_DIGITS; - res_dscale = Max(res_dscale, base->dscale); - res_dscale = Max(res_dscale, num->dscale); - res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE); - res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE); + rscale = Max(rscale, base->dscale); + rscale = Max(rscale, num->dscale); + rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); + rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); - global_rscale = res_dscale + 8; + local_rscale = rscale + 8; /* Form natural logarithms */ - ln_var(base, &ln_base); - ln_var(num, &ln_num); + ln_var(base, &ln_base, local_rscale); + ln_var(num, &ln_num, local_rscale); - ln_base.dscale = res_dscale; - ln_num.dscale = res_dscale; + ln_base.dscale = rscale; + ln_num.dscale = rscale; /* Select scale for division result */ - res_dscale = select_div_scale(&ln_num, &ln_base); - - div_var(&ln_num, &ln_base, result); + rscale = select_div_scale(&ln_num, &ln_base); - result->dscale = res_dscale; - - global_rscale = save_global_rscale; + div_var(&ln_num, &ln_base, result, rscale); free_var(&ln_num); free_var(&ln_base); } -/* ---------- +/* * power_var() - * * Raise base to the power of exp * - * Note: this routine chooses rscale and dscale of the result. - * ---------- + * Note: this routine chooses dscale of the result. */ static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result) { NumericVar ln_base; NumericVar ln_num; - int save_global_rscale = global_rscale; - int res_dscale; + int dec_digits; + int rscale; + int local_rscale; double val; + /* If exp can be represented as an integer, use power_var_int */ + if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1) + { + /* exact integer, but does it fit in int? */ + NumericVar x; + int64 expval64; + + /* must copy because numericvar_to_int8() scribbles on input */ + init_var(&x); + set_var_from_var(exp, &x); + if (numericvar_to_int8(&x, &expval64)) + { + int expval = (int) expval64; + + /* Test for overflow by reverse-conversion. */ + if ((int64) expval == expval64) + { + /* Okay, select rscale */ + rscale = NUMERIC_MIN_SIG_DIGITS; + rscale = Max(rscale, base->dscale); + rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); + rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); + + power_var_int(base, expval, result, rscale); + + free_var(&x); + return; + } + } + free_var(&x); + } + init_var(&ln_base); init_var(&ln_num); /* Set scale for ln() calculation --- need extra accuracy here */ - if (base->weight > 0) - res_dscale = NUMERIC_MIN_SIG_DIGITS*2 - (int) log10(base->weight); - else if (base->weight < 0) - res_dscale = NUMERIC_MIN_SIG_DIGITS*2 - (int) log10(- base->weight); - else - res_dscale = NUMERIC_MIN_SIG_DIGITS*2; - res_dscale = Max(res_dscale, base->dscale * 2); - res_dscale = Max(res_dscale, exp->dscale * 2); - res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE); - res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE); + /* Approx decimal digits before decimal point */ + dec_digits = (base->weight + 1) * DEC_DIGITS; - global_rscale = res_dscale + 8; + if (dec_digits > 1) + rscale = NUMERIC_MIN_SIG_DIGITS*2 - (int) log10(dec_digits - 1); + else if (dec_digits < 1) + rscale = NUMERIC_MIN_SIG_DIGITS*2 - (int) log10(1 - dec_digits); + else + rscale = NUMERIC_MIN_SIG_DIGITS*2; - ln_var(base, &ln_base); + rscale = Max(rscale, base->dscale * 2); + rscale = Max(rscale, exp->dscale * 2); + rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2); + rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2); - ln_base.dscale = res_dscale; + local_rscale = rscale + 8; - mul_var(&ln_base, exp, &ln_num); + ln_var(base, &ln_base, local_rscale); - ln_num.dscale = res_dscale; + mul_var(&ln_base, exp, &ln_num, local_rscale); - /* Set scale for exp() */ + /* Set scale for exp() -- compare numeric_exp() */ /* convert input to float8, ignoring overflow */ val = numericvar_to_double_no_overflow(&ln_num); @@ -4000,22 +4359,86 @@ power_var(NumericVar *base, NumericVar *exp, NumericVar *result) val = Max(val, -NUMERIC_MAX_RESULT_SCALE); val = Min(val, NUMERIC_MAX_RESULT_SCALE); - res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) val; - res_dscale = Max(res_dscale, base->dscale); - res_dscale = Max(res_dscale, exp->dscale); - res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE); - res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE); + rscale = NUMERIC_MIN_SIG_DIGITS - (int) val; + rscale = Max(rscale, base->dscale); + rscale = Max(rscale, exp->dscale); + rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE); + rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE); - global_rscale = res_dscale + 8; + exp_var(&ln_num, result, rscale); - exp_var(&ln_num, result); + free_var(&ln_num); + free_var(&ln_base); +} - result->dscale = res_dscale; +/* + * power_var_int() - + * + * Raise base to the power of exp, where exp is an integer. + */ +static void +power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale) +{ + bool neg; + NumericVar base_prod; + int local_rscale; - global_rscale = save_global_rscale; + /* Detect some special cases, particularly 0^0. */ - free_var(&ln_num); - free_var(&ln_base); + switch (exp) + { + case 0: + if (base->ndigits == 0) + elog(ERROR, "zero raised to zero is undefined"); + set_var_from_var(&const_one, result); + result->dscale = rscale; /* no need to round */ + return; + case 1: + set_var_from_var(base, result); + round_var(result, rscale); + return; + case -1: + div_var(&const_one, base, result, rscale); + return; + case 2: + mul_var(base, base, result, rscale); + return; + default: + break; + } + + /* + * The general case repeatedly multiplies base according to the + * bit pattern of exp. We do the multiplications with some extra + * precision. + */ + neg = (exp < 0); + exp = Abs(exp); + + local_rscale = rscale + MUL_GUARD_DIGITS * 2; + + init_var(&base_prod); + set_var_from_var(base, &base_prod); + + if (exp & 1) + set_var_from_var(base, result); + else + set_var_from_var(&const_one, result); + + while ((exp >>= 1) > 0) + { + mul_var(&base_prod, &base_prod, &base_prod, local_rscale); + if (exp & 1) + mul_var(&base_prod, result, result, local_rscale); + } + + free_var(&base_prod); + + /* Compensate for input sign, and round to requested rscale */ + if (neg) + div_var(&const_one, result, result, rscale); + else + round_var(result, rscale); } @@ -4040,30 +4463,36 @@ power_var(NumericVar *base, NumericVar *exp, NumericVar *result) static int cmp_abs(NumericVar *var1, NumericVar *var2) { + NumericDigit *var1digits = var1->digits; + NumericDigit *var2digits = var2->digits; int i1 = 0; int i2 = 0; int w1 = var1->weight; int w2 = var2->weight; - int stat; + + /* Check any digits before the first common digit */ while (w1 > w2 && i1 < var1->ndigits) { - if (var1->digits[i1++] != 0) + if (var1digits[i1++] != 0) return 1; w1--; } while (w2 > w1 && i2 < var2->ndigits) { - if (var2->digits[i2++] != 0) + if (var2digits[i2++] != 0) return -1; w2--; } + /* At this point, either w1 == w2 or we've run out of digits */ + if (w1 == w2) { while (i1 < var1->ndigits && i2 < var2->ndigits) { - stat = var1->digits[i1++] - var2->digits[i2++]; + int stat = var1digits[i1++] - var2digits[i2++]; + if (stat) { if (stat > 0) @@ -4073,14 +4502,18 @@ cmp_abs(NumericVar *var1, NumericVar *var2) } } + /* + * At this point, we've run out of digits on one side or the other; + * so any remaining nonzero digits imply that side is larger + */ while (i1 < var1->ndigits) { - if (var1->digits[i1++] != 0) + if (var1digits[i1++] != 0) return 1; } while (i2 < var2->ndigits) { - if (var2->digits[i2++] != 0) + if (var2digits[i2++] != 0) return -1; } @@ -4088,12 +4521,11 @@ cmp_abs(NumericVar *var1, NumericVar *var2) } -/* ---------- +/* * add_abs() - * * Add the absolute values of two variables into result. * result might point to one of the operands without danger. - * ---------- */ static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) @@ -4102,7 +4534,9 @@ add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) NumericDigit *res_digits; int res_ndigits; int res_weight; - int res_rscale; + int res_rscale, + rscale1, + rscale2; int res_dscale; int i, i1, @@ -4116,14 +4550,21 @@ add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) NumericDigit *var2digits = var2->digits; res_weight = Max(var1->weight, var2->weight) + 1; - res_rscale = Max(var1->rscale, var2->rscale); + res_dscale = Max(var1->dscale, var2->dscale); + + /* Note: here we are figuring rscale in base-NBASE digits */ + rscale1 = var1->ndigits - var1->weight - 1; + rscale2 = var2->ndigits - var2->weight - 1; + res_rscale = Max(rscale1, rscale2); + res_ndigits = res_rscale + res_weight + 1; if (res_ndigits <= 0) res_ndigits = 1; - res_buf = digitbuf_alloc(res_ndigits); - res_digits = res_buf; + res_buf = digitbuf_alloc(res_ndigits + 1); + res_buf[0] = 0; /* spare digit for later rounding */ + res_digits = res_buf + 1; i1 = res_rscale + var1->weight + 1; i2 = res_rscale + var2->weight + 1; @@ -4136,9 +4577,9 @@ add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) if (i2 >= 0 && i2 < var2ndigits) carry += var2digits[i2]; - if (carry >= 10) + if (carry >= NBASE) { - res_digits[i] = carry - 10; + res_digits[i] = carry - NBASE; carry = 1; } else @@ -4150,37 +4591,26 @@ add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) Assert(carry == 0); /* else we failed to allow for carry out */ - while (res_ndigits > 0 && *res_digits == 0) - { - res_digits++; - res_weight--; - res_ndigits--; - } - while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0) - res_ndigits--; - - if (res_ndigits == 0) - res_weight = 0; - digitbuf_free(result->buf); result->ndigits = res_ndigits; result->buf = res_buf; result->digits = res_digits; result->weight = res_weight; - result->rscale = res_rscale; result->dscale = res_dscale; + + /* Remove leading/trailing zeroes */ + strip_var(result); } -/* ---------- - * sub_abs() - +/* + * sub_abs() * * Subtract the absolute value of var2 from the absolute value of var1 * and store in result. result might point to one of the operands * without danger. * * ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!! - * ---------- */ static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) @@ -4189,7 +4619,9 @@ sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) NumericDigit *res_digits; int res_ndigits; int res_weight; - int res_rscale; + int res_rscale, + rscale1, + rscale2; int res_dscale; int i, i1, @@ -4203,14 +4635,21 @@ sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) NumericDigit *var2digits = var2->digits; res_weight = var1->weight; - res_rscale = Max(var1->rscale, var2->rscale); + res_dscale = Max(var1->dscale, var2->dscale); + + /* Note: here we are figuring rscale in base-NBASE digits */ + rscale1 = var1->ndigits - var1->weight - 1; + rscale2 = var2->ndigits - var2->weight - 1; + res_rscale = Max(rscale1, rscale2); + res_ndigits = res_rscale + res_weight + 1; if (res_ndigits <= 0) res_ndigits = 1; - res_buf = digitbuf_alloc(res_ndigits); - res_digits = res_buf; + res_buf = digitbuf_alloc(res_ndigits + 1); + res_buf[0] = 0; /* spare digit for later rounding */ + res_digits = res_buf + 1; i1 = res_rscale + var1->weight + 1; i2 = res_rscale + var2->weight + 1; @@ -4225,7 +4664,7 @@ sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) if (borrow < 0) { - res_digits[i] = borrow + 10; + res_digits[i] = borrow + NBASE; borrow = -1; } else @@ -4237,23 +4676,219 @@ sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result) Assert(borrow == 0); /* else caller gave us var1 < var2 */ - while (res_ndigits > 0 && *res_digits == 0) - { - res_digits++; - res_weight--; - res_ndigits--; - } - while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0) - res_ndigits--; - - if (res_ndigits == 0) - res_weight = 0; - digitbuf_free(result->buf); result->ndigits = res_ndigits; result->buf = res_buf; result->digits = res_digits; result->weight = res_weight; - result->rscale = res_rscale; result->dscale = res_dscale; + + /* Remove leading/trailing zeroes */ + strip_var(result); +} + +/* + * round_var + * + * Round the value of a variable to no more than rscale decimal digits + * after the decimal point. NOTE: we allow rscale < 0 here, implying + * rounding before the decimal point. + */ +static void +round_var(NumericVar *var, int rscale) +{ + NumericDigit *digits = var->digits; + int di; + int ndigits; + int carry; + + var->dscale = rscale; + + /* decimal digits wanted */ + di = (var->weight + 1) * DEC_DIGITS + rscale; + + /* + * If di = 0, the value loses all digits, but could round up to 1 + * if its first extra digit is >= 5. If di < 0 the result must be 0. + */ + if (di < 0) + { + var->ndigits = 0; + var->weight = 0; + var->sign = NUMERIC_POS; + } + else + { + /* NBASE digits wanted */ + ndigits = (di + DEC_DIGITS-1) / DEC_DIGITS; + + /* 0, or number of decimal digits to keep in last NBASE digit */ + di %= DEC_DIGITS; + + if (ndigits < var->ndigits || + (ndigits == var->ndigits && di > 0)) + { + var->ndigits = ndigits; + +#if DEC_DIGITS == 1 + /* di must be zero */ + carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0; +#else + if (di == 0) + { + carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0; + } + else + { + /* Must round within last NBASE digit */ + int extra, + pow10; + +#if DEC_DIGITS == 4 + pow10 = round_powers[di]; +#elif DEC_DIGITS == 2 + pow10 = 10; +#else +#error unsupported NBASE +#endif + extra = digits[--ndigits] % pow10; + digits[ndigits] -= extra; + carry = 0; + if (extra >= pow10/2) + { + pow10 += digits[ndigits]; + if (pow10 >= NBASE) + { + pow10 -= NBASE; + carry = 1; + } + digits[ndigits] = pow10; + } + } +#endif + + /* Propagate carry if needed */ + while (carry) + { + carry += digits[--ndigits]; + if (carry >= NBASE) + { + digits[ndigits] = carry - NBASE; + carry = 1; + } + else + { + digits[ndigits] = carry; + carry = 0; + } + } + + if (ndigits < 0) + { + Assert(ndigits == -1); /* better not have added > 1 digit */ + Assert(var->digits > var->buf); + var->digits--; + var->ndigits++; + var->weight++; + } + } + } +} + +/* + * trunc_var + * + * Truncate the value of a variable at rscale decimal digits after the + * decimal point. NOTE: we allow rscale < 0 here, implying + * truncation before the decimal point. + */ +static void +trunc_var(NumericVar *var, int rscale) +{ + int di; + int ndigits; + + var->dscale = rscale; + + /* decimal digits wanted */ + di = (var->weight + 1) * DEC_DIGITS + rscale; + + /* + * If di <= 0, the value loses all digits. + */ + if (di <= 0) + { + var->ndigits = 0; + var->weight = 0; + var->sign = NUMERIC_POS; + } + else + { + /* NBASE digits wanted */ + ndigits = (di + DEC_DIGITS-1) / DEC_DIGITS; + + if (ndigits <= var->ndigits) + { + var->ndigits = ndigits; + +#if DEC_DIGITS == 1 + /* no within-digit stuff to worry about */ +#else + /* 0, or number of decimal digits to keep in last NBASE digit */ + di %= DEC_DIGITS; + + if (di > 0) + { + /* Must truncate within last NBASE digit */ + NumericDigit *digits = var->digits; + int extra, + pow10; + +#if DEC_DIGITS == 4 + pow10 = round_powers[di]; +#elif DEC_DIGITS == 2 + pow10 = 10; +#else +#error unsupported NBASE +#endif + extra = digits[--ndigits] % pow10; + digits[ndigits] -= extra; + } +#endif + } + } +} + +/* + * strip_var + * + * Strip any leading and trailing zeroes from a numeric variable + */ +static void +strip_var(NumericVar *var) +{ + NumericDigit *digits = var->digits; + int ndigits = var->ndigits; + + /* Strip leading zeroes */ + while (ndigits > 0 && *digits == 0) + { + digits++; + var->weight--; + ndigits--; + } + + /* Strip trailing zeroes */ + while (ndigits > 0 && digits[ndigits - 1] == 0) + ndigits--; + + /* If it's zero, normalize the sign and weight */ + if (ndigits == 0) + { + var->sign = NUMERIC_POS; + var->weight = 0; + } + + var->digits = digits; + var->ndigits = ndigits; } |