Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r--   src/backend/executor/nodeAgg.c   64
1 file changed, 42 insertions(+), 22 deletions(-)
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 586509c50b2..02a9165c694 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -245,9 +245,11 @@
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "common/hashfn.h"
 #include "executor/execExpr.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
+#include "lib/hyperloglog.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
@@ -296,6 +298,14 @@
 #define HASHAGG_WRITE_BUFFER_SIZE	BLCKSZ
 
 /*
+ * HyperLogLog is used for estimating the cardinality of the spilled tuples in
+ * a given partition. 5 bits corresponds to a size of about 32 bytes and a
+ * worst-case error of around 18%. That's effective enough to choose a
+ * reasonable number of partitions when recursing.
+ */
+#define HASHAGG_HLL_BIT_WIDTH 5
+
+/*
  * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
  * improved?
  */
@@ -339,6 +349,7 @@ typedef struct HashAggSpill
 	int64	   *ntuples;		/* number of tuples in each partition */
 	uint32		mask;			/* mask to find partition from hash value */
 	int			shift;			/* after masking, shift by this amount */
+	hyperLogLogState *hll_card; /* cardinality estimate for contents */
 } HashAggSpill;
 
 /*
@@ -357,6 +368,7 @@ typedef struct HashAggBatch
 	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
 	int			input_tapenum;	/* input partition tape */
 	int64		input_tuples;	/* number of tuples in this batch */
+	double		input_card;		/* estimated group cardinality */
 } HashAggBatch;
 
 /* used to find referenced colnos */
@@ -411,7 +423,7 @@ static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
 										  bool nullcheck);
 static long hash_choose_num_buckets(double hashentrysize,
 									long estimated_nbuckets, Size memory);
-static int	hash_choose_num_partitions(uint64 input_groups,
+static int	hash_choose_num_partitions(double input_groups,
 									   double hashentrysize,
 									   int used_bits,
 									   int *log2_npartittions);
@@ -432,10 +444,11 @@ static void hashagg_finish_initial_spills(AggState *aggstate);
 static void hashagg_reset_spill_state(AggState *aggstate);
 static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
 									   int input_tapenum, int setno,
-									   int64 input_tuples, int used_bits);
+									   int64 input_tuples, double input_card,
+									   int used_bits);
 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
 static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
-							   int used_bits, uint64 input_tuples,
+							   int used_bits, double input_groups,
 							   double hashentrysize);
 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 								TupleTableSlot *slot, uint32 hash);
@@ -1777,7 +1790,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
  * substantially larger than the initial value.
  */
 void
-hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
+hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 					Size *mem_limit, uint64 *ngroups_limit,
 					int *num_partitions)
 {
@@ -1969,7 +1982,7 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
  * *log2_npartitions to the log2() of the number of partitions.
  */
 static int
-hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
+hash_choose_num_partitions(double input_groups, double hashentrysize,
 						   int used_bits, int *log2_npartitions)
 {
 	Size		mem_wanted;
@@ -2574,7 +2587,6 @@ agg_refill_hash_table(AggState *aggstate)
 	AggStatePerHash perhash;
 	HashAggSpill spill;
 	HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-	uint64		ngroups_estimate;
 	bool		spill_initialized = false;
 
 	if (aggstate->hash_batches == NIL)
@@ -2583,16 +2595,7 @@ agg_refill_hash_table(AggState *aggstate)
 	batch = linitial(aggstate->hash_batches);
 	aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
 
-	/*
-	 * Estimate the number of groups for this batch as the total number of
-	 * tuples in its input file. Although that's a worst case, it's not bad
-	 * here for two reasons: (1) overestimating is better than
-	 * underestimating; and (2) we've already scanned the relation once, so
-	 * it's likely that we've already finalized many of the common values.
-	 */
-	ngroups_estimate = batch->input_tuples;
-
-	hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
+	hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
 						batch->used_bits, &aggstate->hash_mem_limit,
 						&aggstate->hash_ngroups_limit, NULL);
 
@@ -2678,7 +2681,7 @@ agg_refill_hash_table(AggState *aggstate)
 				 */
 				spill_initialized = true;
 				hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
-								   ngroups_estimate, aggstate->hashentrysize);
+								   batch->input_card, aggstate->hashentrysize);
 			}
 			/* no memory for a new group, spill */
 			hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
@@ -2936,7 +2939,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
  */
 static void
 hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
-				   uint64 input_groups, double hashentrysize)
+				   double input_groups, double hashentrysize)
 {
 	int			npartitions;
 	int			partition_bits;
@@ -2946,6 +2949,7 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
 
 	spill->partitions = palloc0(sizeof(int) * npartitions);
 	spill->ntuples = palloc0(sizeof(int64) * npartitions);
+	spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
 
 	hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
 
@@ -2953,6 +2957,9 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
 	spill->shift = 32 - used_bits - partition_bits;
 	spill->mask = (npartitions - 1) << spill->shift;
 	spill->npartitions = npartitions;
+
+	for (int i = 0; i < npartitions; i++)
+		initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
 }
 
 /*
@@ -3001,6 +3008,13 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 	partition = (hash & spill->mask) >> spill->shift;
 	spill->ntuples[partition]++;
 
+	/*
+	 * All hash values destined for a given partition have some bits in
+	 * common, which causes bad HLL cardinality estimates. Hash the hash to
+	 * get a more uniform distribution.
+	 */
+	addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
+
 	tapenum = spill->partitions[partition];
 
 	LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
@@ -3023,7 +3037,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
  */
 static HashAggBatch *
 hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
-				  int64 input_tuples, int used_bits)
+				  int64 input_tuples, double input_card, int used_bits)
 {
 	HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
@@ -3032,6 +3046,7 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
 	batch->tapeset = tapeset;
 	batch->input_tapenum = tapenum;
 	batch->input_tuples = input_tuples;
+	batch->input_card = input_card;
 
 	return batch;
 }
@@ -3135,21 +3150,26 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 
 	for (i = 0; i < spill->npartitions; i++)
 	{
-		int			tapenum = spill->partitions[i];
-		HashAggBatch *new_batch;
+		int				tapenum = spill->partitions[i];
+		HashAggBatch   *new_batch;
+		double			cardinality;
 
 		/* if the partition is empty, don't create a new batch of work */
 		if (spill->ntuples[i] == 0)
 			continue;
 
+		cardinality = estimateHyperLogLog(&spill->hll_card[i]);
+		freeHyperLogLog(&spill->hll_card[i]);
+
 		new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
 									  tapenum, setno, spill->ntuples[i],
-									  used_bits);
+									  cardinality, used_bits);
 		aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
 		aggstate->hash_batches_used++;
 	}
 
 	pfree(spill->ntuples);
+	pfree(spill->hll_card);
 	pfree(spill->partitions);
 }
 
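The numbers in the new HASHAGG_HLL_BIT_WIDTH comment can be sanity-checked with the usual HyperLogLog accuracy rule of thumb (relative error of roughly 1.04 / sqrt(m) for m one-byte registers). The following standalone sketch is not part of the patch; it only reproduces that arithmetic:

/*
 * Sanity check of the HASHAGG_HLL_BIT_WIDTH comment above, assuming the
 * standard HyperLogLog error estimate of about 1.04 / sqrt(m) and one byte
 * of state per register.  Not part of nodeAgg.c; build with: cc check.c -lm
 */
#include <math.h>
#include <stdio.h>

int
main(void)
{
	int			bwidth = 5;					/* HASHAGG_HLL_BIT_WIDTH */
	int			nregisters = 1 << bwidth;	/* 2^5 = 32 registers */
	double		rel_error = 1.04 / sqrt((double) nregisters);

	/* prints: registers = 32 (~32 bytes), relative error ~ 18% */
	printf("registers = %d (~%d bytes), relative error ~ %.0f%%\n",
		   nregisters, nregisters, rel_error * 100.0);
	return 0;
}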
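The comment added in hashagg_spill_tuple() (hash the hash before feeding it to HLL) is easy to see in isolation: the spill code fixes the high bits of every hash routed to a given partition, and the HLL register index is derived from the high bits of its input, so without re-hashing a whole partition would collapse into a handful of registers. A minimal standalone illustration under those assumptions, where mix32() is only a hypothetical stand-in for hash_bytes_uint32():

/*
 * Illustration of why hashagg_spill_tuple() re-hashes the hash before
 * calling addHyperLogLog().  Assumptions: 5-bit register width, partition
 * bits placed in the top of the 32-bit hash (used_bits = 0), and a register
 * index taken from the top bits.  mix32() is a generic finalizer-style
 * mixer standing in for hash_bytes_uint32(); it is not the real function.
 */
#include <stdint.h>
#include <stdio.h>

static uint32_t
mix32(uint32_t h)
{
	h ^= h >> 16;
	h *= 0x85ebca6b;
	h ^= h >> 13;
	h *= 0xc2b2ae35;
	h ^= h >> 16;
	return h;
}

int
main(void)
{
	int			bwidth = 5;				/* HASHAGG_HLL_BIT_WIDTH */
	int			shift = 32 - bwidth;	/* register index comes from top bits */
	uint32_t	partition = 3;			/* every hash below maps to partition 3 */

	for (uint32_t i = 0; i < 4; i++)
	{
		/* fabricate hashes whose top bits all encode the same partition */
		uint32_t	hash = (partition << shift) | i;

		/* the raw index is pinned to 3; the mixed index is no longer pinned */
		printf("raw register = %2u, re-hashed register = %2u\n",
			   (unsigned) (hash >> shift),
			   (unsigned) (mix32(hash) >> shift));
	}
	return 0;
}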