author     Jeff Davis <jdavis@postgresql.org>   2020-03-18 15:42:02 -0700
committer  Jeff Davis <jdavis@postgresql.org>   2020-03-18 15:42:02 -0700
commit     1f39bce021540fde00990af55b4432c55ef4b3c7 (patch)
tree       c2403fb61234d93408b23350a82ad429b3625af3 /src/backend/executor/nodeAgg.c
parent     e00912e11a9ec2d29274ed8a6465e81385906dc2 (diff)
download   postgresql-1f39bce021540fde00990af55b4432c55ef4b3c7.tar.gz
           postgresql-1f39bce021540fde00990af55b4432c55ef4b3c7.zip
Disk-based Hash Aggregation.
While performing hash aggregation, track memory usage when adding new
groups to a hash table. If the memory usage exceeds work_mem, enter
"spill mode".

In spill mode, new groups are not created in the hash table(s), but
existing groups continue to be advanced if input tuples match. Tuples
that would cause a new group to be created are instead spilled to a
logical tape to be processed later.

The tuples are spilled in a partitioned fashion. When all tuples from
the outer plan are processed (either by advancing the group or spilling
the tuple), finalize and emit the groups from the hash table. Then,
create new batches of work from the spilled partitions, and select one
of the saved batches and process it (possibly spilling recursively).

Author: Jeff Davis
Reviewed-by: Tomas Vondra, Adam Lee, Justin Pryzby, Taylor Vesely, Melanie Plageman
Discussion: https://postgr.es/m/507ac540ec7c20136364b5272acbcd4574aa76ef.camel@j-davis.com
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r--   src/backend/executor/nodeAgg.c   1092
1 file changed, 1072 insertions(+), 20 deletions(-)
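
For orientation (an editorial sketch, not part of the patch): during the initial
scan, the per-tuple control flow reduces roughly to the loop below, condensed from
agg_fill_hash_table() and lookup_hash_entries() in the diff; grouping sets, slot
preparation, and expression details are omitted.

    /* Editorial sketch only -- simplified from the functions in the diff. */
    for (;;)
    {
        TupleTableSlot *outerslot = fetch_input_tuple(aggstate);
        uint32          hash;

        if (TupIsNull(outerslot))
            break;

        hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot);

        /* lookup_hash_entry() returns NULL for a new group once in spill mode */
        pergroup[0] = lookup_hash_entry(aggstate, hash);

        if (pergroup[0] != NULL)
            advance_aggregates(aggstate);                /* group is in memory */
        else
            hashagg_spill_tuple(spill, outerslot, hash); /* defer to a tape */
    }

    /* turn any spilled partitions into batches of work for later passes */
    hashagg_finish_initial_spills(aggstate);
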
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 7aebb247d88..44c159ab2a3 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -194,6 +194,29 @@
* transition values. hashcontext is the single context created to support
* all hash tables.
*
+ * Spilling To Disk
+ *
+ * When performing hash aggregation, if the hash table memory exceeds the
+ * limit (see hash_agg_check_limits()), we enter "spill mode". In spill
+ * mode, we advance the transition states only for groups already in the
+ * hash table. For tuples that would need to create new hash table
+ * entries (and initialize new transition states), we instead spill them to
+ * disk to be processed later. The tuples are spilled in a partitioned
+ * manner, so that subsequent batches are smaller and less likely to exceed
+ * work_mem (if a batch does exceed work_mem, it must be spilled
+ * recursively).
+ *
+ * Spilled data is written to logical tapes. These provide better control
+ * over memory usage, disk space, and the number of files than if we were
+ * to use a BufFile for each spill.
+ *
+ * Note that it's possible for transition states to start small but then
+ * grow very large; for instance in the case of ARRAY_AGG. In such cases,
+ * it's still possible to significantly exceed work_mem. We try to avoid
+ * this situation by estimating what will fit in the available memory, and
+ * imposing a limit on the number of groups separately from the amount of
+ * memory consumed.
+ *
* Transition / Combine function invocation:
*
* For performance reasons transition functions, including combine
@@ -233,12 +256,105 @@
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
+#include "utils/dynahash.h"
#include "utils/expandeddatum.h"
+#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"
+/*
+ * Control how many partitions are created when spilling HashAgg to
+ * disk.
+ *
+ * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
+ * partitions needed such that each partition will fit in memory. The factor
+ * is set higher than one because there's not a high cost to having a few too
+ * many partitions, and it makes it less likely that a partition will need to
+ * be spilled recursively. Another benefit of having more, smaller partitions
+ * is that small hash tables may perform better than large ones due to memory
+ * caching effects.
+ *
+ * We also specify a min and max number of partitions per spill. Too few might
+ * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
+ * many will result in lots of memory wasted buffering the spill files (which
+ * could instead be spent on a larger hash table).
+ */
+#define HASHAGG_PARTITION_FACTOR 1.50
+#define HASHAGG_MIN_PARTITIONS 4
+#define HASHAGG_MAX_PARTITIONS 1024
+
+/*
+ * For reading from tapes, the buffer size must be a multiple of
+ * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
+ * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
+ * tape always uses a buffer of size BLCKSZ.
+ */
+#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
+#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
+
+/* minimum number of initial hash table buckets */
+#define HASHAGG_MIN_BUCKETS 256
+
+/*
+ * Track all tapes needed for a HashAgg that spills. We don't know the maximum
+ * number of tapes needed at the start of the algorithm (because it can
+ * recurse), so one tape set is allocated and extended as needed for new
+ * tapes. Once a particular tape has been fully read, it is rewound for
+ * writing and put on the free list.
+ *
+ * Tapes' buffers can take up substantial memory when many tapes are open at
+ * once. We only need one tape open at a time in read mode (using a buffer
+ * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
+ * requiring a buffer of size BLCKSZ) for each partition.
+ */
+typedef struct HashTapeInfo
+{
+ LogicalTapeSet *tapeset;
+ int ntapes;
+ int *freetapes;
+ int nfreetapes;
+ int freetapes_alloc;
+} HashTapeInfo;
+
+/*
+ * Represents partitioned spill data for a single hashtable. Contains the
+ * necessary information to route tuples to the correct partition, and to
+ * transform the spilled data into new batches.
+ *
+ * The high bits are used for partition selection (when recursing, we ignore
+ * the bits that have already been used for partition selection at an earlier
+ * level).
+ */
+typedef struct HashAggSpill
+{
+ LogicalTapeSet *tapeset; /* borrowed reference to tape set */
+ int npartitions; /* number of partitions */
+ int *partitions; /* spill partition tape numbers */
+ int64 *ntuples; /* number of tuples in each partition */
+ uint32 mask; /* mask to find partition from hash value */
+ int shift; /* after masking, shift by this amount */
+} HashAggSpill;
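
A small editorial example (not part of the patch) of how mask and shift select a
partition from the high bits of the hash, assuming used_bits = 0 and 16 partitions
(partition_bits = 4), mirroring hashagg_spill_init() and hashagg_spill_tuple() below:

    uint32 hash  = 0xDEADBEEF;                  /* hypothetical tuple hash */
    int    shift = 32 - 0 - 4;                  /* 32 - used_bits - partition_bits = 28 */
    uint32 mask  = (16 - 1) << shift;           /* 0xF0000000 */
    int    partition = (hash & mask) >> shift;  /* top nibble 0xD, i.e. partition 13 */
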
+
+/*
+ * Represents work to be done for one pass of hash aggregation (with only one
+ * grouping set).
+ *
+ * Also tracks the bits of the hash already used for partition selection by
+ * earlier iterations, so that this batch can use new bits. If all bits have
+ * already been used, no partitioning will be done (any spilled data will go
+ * to a single output tape).
+ */
+typedef struct HashAggBatch
+{
+ int setno; /* grouping set */
+ int used_bits; /* number of bits of hash already used */
+ LogicalTapeSet *tapeset; /* borrowed reference to tape set */
+ int input_tapenum; /* input partition tape */
+ int64 input_tuples; /* number of tuples in this batch */
+} HashAggBatch;
+
static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
@@ -275,11 +391,43 @@ static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
static void build_hash_tables(AggState *aggstate);
static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
+static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
+ bool nullcheck);
+static long hash_choose_num_buckets(double hashentrysize,
+ long estimated_nbuckets,
+ Size memory);
+static int hash_choose_num_partitions(uint64 input_groups,
+ double hashentrysize,
+ int used_bits,
+ int *log2_npartitions);
static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
+static bool agg_refill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
+static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
+static void hash_agg_check_limits(AggState *aggstate);
+static void hash_agg_enter_spill_mode(AggState *aggstate);
+static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
+ int npartitions);
+static void hashagg_finish_initial_spills(AggState *aggstate);
+static void hashagg_reset_spill_state(AggState *aggstate);
+static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
+ int input_tapenum, int setno,
+ int64 input_tuples, int used_bits);
+static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
+static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
+ int used_bits, uint64 input_tuples,
+ double hashentrysize);
+static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot,
+ uint32 hash);
+static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
+ int setno);
+static void hashagg_tapeinfo_init(AggState *aggstate);
+static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
+ int ndest);
+static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
AggState *aggstate, EState *estate,
@@ -1287,14 +1435,27 @@ build_hash_tables(AggState *aggstate)
for (setno = 0; setno < aggstate->num_hashes; ++setno)
{
AggStatePerHash perhash = &aggstate->perhash[setno];
+ long nbuckets;
+ Size memory;
+
+ if (perhash->hashtable != NULL)
+ {
+ ResetTupleHashTable(perhash->hashtable);
+ continue;
+ }
Assert(perhash->aggnode->numGroups > 0);
- if (perhash->hashtable)
- ResetTupleHashTable(perhash->hashtable);
- else
- build_hash_table(aggstate, setno, perhash->aggnode->numGroups);
+ memory = aggstate->hash_mem_limit / aggstate->num_hashes;
+
+ /* choose reasonable number of buckets per hashtable */
+ nbuckets = hash_choose_num_buckets(
+ aggstate->hashentrysize, perhash->aggnode->numGroups, memory);
+
+ build_hash_table(aggstate, setno, nbuckets);
}
+
+ aggstate->hash_ngroups_current = 0;
}
/*
@@ -1304,7 +1465,7 @@ static void
build_hash_table(AggState *aggstate, int setno, long nbuckets)
{
AggStatePerHash perhash = &aggstate->perhash[setno];
- MemoryContext metacxt = aggstate->ss.ps.state->es_query_cxt;
+ MemoryContext metacxt = aggstate->hash_metacxt;
MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
Size additionalsize;
@@ -1488,6 +1649,320 @@ hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace)
}
/*
+ * hashagg_recompile_expressions()
+ *
+ * Identifies the right phase, compiles the right expression given the
+ * arguments, and then sets phase->evaltrans to that expression.
+ *
+ * Different versions of the compiled expression are needed depending on
+ * whether hash aggregation has spilled or not, and whether it's reading from
+ * the outer plan or a tape. Before spilling to disk, the expression reads
+ * from the outer plan and does not need to perform a NULL check. After
+ * HashAgg begins to spill, new groups will not be created in the hash table,
+ * and the AggStatePerGroup array may be NULL; therefore we need to add a null
+ * pointer check to the expression. Then, when reading spilled data from a
+ * tape, we change the outer slot type to be a fixed minimal tuple slot.
+ *
+ * It would be wasteful to recompile every time, so cache the compiled
+ * expressions in the AggStatePerPhase, and reuse when appropriate.
+ */
+static void
+hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
+{
+ AggStatePerPhase phase;
+ int i = minslot ? 1 : 0;
+ int j = nullcheck ? 1 : 0;
+
+ Assert(aggstate->aggstrategy == AGG_HASHED ||
+ aggstate->aggstrategy == AGG_MIXED);
+
+ if (aggstate->aggstrategy == AGG_HASHED)
+ phase = &aggstate->phases[0];
+ else /* AGG_MIXED */
+ phase = &aggstate->phases[1];
+
+ if (phase->evaltrans_cache[i][j] == NULL)
+ {
+ const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
+ bool outerfixed = aggstate->ss.ps.outeropsfixed;
+ bool dohash = true;
+ bool dosort;
+
+ dosort = aggstate->aggstrategy == AGG_MIXED ? true : false;
+
+ /* temporarily change the outerops while compiling the expression */
+ if (minslot)
+ {
+ aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
+ aggstate->ss.ps.outeropsfixed = true;
+ }
+
+ phase->evaltrans_cache[i][j] = ExecBuildAggTrans(
+ aggstate, phase, dosort, dohash, nullcheck);
+
+ /* change back */
+ aggstate->ss.ps.outerops = outerops;
+ aggstate->ss.ps.outeropsfixed = outerfixed;
+ }
+
+ phase->evaltrans = phase->evaltrans_cache[i][j];
+}
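
For reference (editorial note, not part of the patch), the flag combinations used by
the rest of this patch are:

    /* entering spill mode: add the NULL-pointer check (hash_agg_enter_spill_mode) */
    hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);

    /* refilling from a tape: minimal-tuple outer slot, NULL check still needed
     * (agg_refill_hash_table) */
    hashagg_recompile_expressions(aggstate, true, true);

    /* rescanning from the outer plan: original slot type, no NULL check
     * (ExecReScanAgg) */
    hashagg_recompile_expressions(node, false, false);
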
+
+/*
+ * Set limits that trigger spilling to avoid exceeding work_mem. Consider the
+ * number of partitions we expect to create (if we do spill).
+ *
+ * There are two limits: a memory limit, and also an ngroups limit. The
+ * ngroups limit becomes important when we expect transition values to grow
+ * substantially larger than the initial value.
+ */
+void
+hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits,
+ Size *mem_limit, uint64 *ngroups_limit,
+ int *num_partitions)
+{
+ int npartitions;
+ Size partition_mem;
+
+ /* if not expected to spill, use all of work_mem */
+ if (input_groups * hashentrysize < work_mem * 1024L)
+ {
+ *mem_limit = work_mem * 1024L;
+ *ngroups_limit = *mem_limit / hashentrysize;
+ return;
+ }
+
+ /*
+ * Calculate expected memory requirements for spilling, which is the size
+ * of the buffers needed for all the tapes that need to be open at
+ * once. Then, subtract that from the memory available for holding hash
+ * tables.
+ */
+ npartitions = hash_choose_num_partitions(input_groups,
+ hashentrysize,
+ used_bits,
+ NULL);
+ if (num_partitions != NULL)
+ *num_partitions = npartitions;
+
+ partition_mem =
+ HASHAGG_READ_BUFFER_SIZE +
+ HASHAGG_WRITE_BUFFER_SIZE * npartitions;
+
+ /*
+ * Don't set the limit below 3/4 of work_mem. In that case, we are at the
+ * minimum number of partitions, so we aren't going to dramatically exceed
+ * work_mem anyway.
+ */
+ if (work_mem * 1024L > 4 * partition_mem)
+ *mem_limit = work_mem * 1024L - partition_mem;
+ else
+ *mem_limit = work_mem * 1024L * 0.75;
+
+ if (*mem_limit > hashentrysize)
+ *ngroups_limit = *mem_limit / hashentrysize;
+ else
+ *ngroups_limit = 1;
+}
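
A worked editorial example (not from the patch), assuming the default BLCKSZ of 8192,
work_mem = 4MB (4,194,304 bytes), hashentrysize = 64 bytes, and 1,000,000 estimated
input groups:

    /*
     * input_groups * hashentrysize = 64,000,000 > 4,194,304   => plan to spill
     * npartitions     = 32                      (see hash_choose_num_partitions)
     * partition_mem   = 8,192 + 32 * 8,192      = 270,336 bytes
     * *mem_limit      = 4,194,304 - 270,336     = 3,923,968 bytes
     * *ngroups_limit  = 3,923,968 / 64          = 61,312 groups
     */
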
+
+/*
+ * hash_agg_check_limits
+ *
+ * After adding a new group to the hash table, check whether we need to enter
+ * spill mode. Allocations may happen without adding new groups (for instance,
+ * if the transition state size grows), so this check is imperfect.
+ */
+static void
+hash_agg_check_limits(AggState *aggstate)
+{
+ uint64 ngroups = aggstate->hash_ngroups_current;
+ Size meta_mem = MemoryContextMemAllocated(
+ aggstate->hash_metacxt, true);
+ Size hash_mem = MemoryContextMemAllocated(
+ aggstate->hashcontext->ecxt_per_tuple_memory, true);
+
+ /*
+ * Don't spill unless there's at least one group in the hash table so we
+ * can be sure to make progress even in edge cases.
+ */
+ if (aggstate->hash_ngroups_current > 0 &&
+ (meta_mem + hash_mem > aggstate->hash_mem_limit ||
+ ngroups > aggstate->hash_ngroups_limit))
+ {
+ hash_agg_enter_spill_mode(aggstate);
+ }
+}
+
+/*
+ * Enter "spill mode", meaning that no new groups are added to any of the hash
+ * tables. Tuples that would create a new group are instead spilled, and
+ * processed later.
+ */
+static void
+hash_agg_enter_spill_mode(AggState *aggstate)
+{
+ aggstate->hash_spill_mode = true;
+ hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);
+
+ if (!aggstate->hash_ever_spilled)
+ {
+ Assert(aggstate->hash_tapeinfo == NULL);
+ Assert(aggstate->hash_spills == NULL);
+
+ aggstate->hash_ever_spilled = true;
+
+ hashagg_tapeinfo_init(aggstate);
+
+ aggstate->hash_spills = palloc(
+ sizeof(HashAggSpill) * aggstate->num_hashes);
+
+ for (int setno = 0; setno < aggstate->num_hashes; setno++)
+ {
+ AggStatePerHash perhash = &aggstate->perhash[setno];
+ HashAggSpill *spill = &aggstate->hash_spills[setno];
+
+ hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+ perhash->aggnode->numGroups,
+ aggstate->hashentrysize);
+ }
+ }
+}
+
+/*
+ * Update metrics after filling the hash table.
+ *
+ * If reading from the outer plan, from_tape should be false; if reading from
+ * a spill tape, from_tape should be true.
+ */
+static void
+hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
+{
+ Size meta_mem;
+ Size hash_mem;
+ Size buffer_mem;
+ Size total_mem;
+
+ if (aggstate->aggstrategy != AGG_MIXED &&
+ aggstate->aggstrategy != AGG_HASHED)
+ return;
+
+ /* memory for the hash table itself */
+ meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
+
+ /* memory for the group keys and transition states */
+ hash_mem = MemoryContextMemAllocated(
+ aggstate->hashcontext->ecxt_per_tuple_memory, true);
+
+ /* memory for read/write tape buffers, if spilled */
+ buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
+ if (from_tape)
+ buffer_mem += HASHAGG_READ_BUFFER_SIZE;
+
+ /* update peak mem */
+ total_mem = meta_mem + hash_mem + buffer_mem;
+ if (total_mem > aggstate->hash_mem_peak)
+ aggstate->hash_mem_peak = total_mem;
+
+ /* update disk usage */
+ if (aggstate->hash_tapeinfo != NULL)
+ {
+ uint64 disk_used = LogicalTapeSetBlocks(
+ aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
+
+ if (aggstate->hash_disk_used < disk_used)
+ aggstate->hash_disk_used = disk_used;
+ }
+
+ /*
+ * Update hashentrysize estimate based on contents. Don't include meta_mem
+ * in the memory used, because empty buckets would inflate the per-entry
+ * cost. An underestimate of the per-entry size is better than an
+ * overestimate, because an overestimate could compound with each level of
+ * recursion.
+ */
+ if (aggstate->hash_ngroups_current > 0)
+ {
+ aggstate->hashentrysize =
+ hash_mem / (double)aggstate->hash_ngroups_current;
+ }
+}
+
+/*
+ * Choose a reasonable number of buckets for the initial hash table size.
+ */
+static long
+hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
+{
+ long max_nbuckets;
+ long nbuckets = ngroups;
+
+ max_nbuckets = memory / hashentrysize;
+
+ /*
+ * Leave room for slop to avoid a case where the initial hash table size
+ * exceeds the memory limit (though that may still happen in edge cases).
+ */
+ max_nbuckets *= 0.75;
+
+ if (nbuckets > max_nbuckets)
+ nbuckets = max_nbuckets;
+ if (nbuckets < HASHAGG_MIN_BUCKETS)
+ nbuckets = HASHAGG_MIN_BUCKETS;
+ return nbuckets;
+}
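
Continuing the same editorial example (hash_mem_limit = 3,923,968 bytes and a single
grouping set, so memory = 3,923,968 bytes; hashentrysize = 64 bytes):

    /*
     * max_nbuckets = (3,923,968 / 64) * 0.75 = 45,984
     * nbuckets     = Min(estimated ngroups, 45,984), clamped to at least
     *                HASHAGG_MIN_BUCKETS (256)
     */
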
+
+/*
+ * Determine the number of partitions to create when spilling, which will
+ * always be a power of two. If log2_npartitions is non-NULL, set
+ * *log2_npartitions to the log2() of the number of partitions.
+ */
+static int
+hash_choose_num_partitions(uint64 input_groups, double hashentrysize,
+ int used_bits, int *log2_npartitions)
+{
+ Size mem_wanted;
+ int partition_limit;
+ int npartitions;
+ int partition_bits;
+
+ /*
+ * Avoid creating so many partitions that the memory requirements of the
+ * open partition files are greater than 1/4 of work_mem.
+ */
+ partition_limit =
+ (work_mem * 1024L * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
+ HASHAGG_WRITE_BUFFER_SIZE;
+
+ mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;
+
+ /* make enough partitions so that each one is likely to fit in memory */
+ npartitions = 1 + (mem_wanted / (work_mem * 1024L));
+
+ if (npartitions > partition_limit)
+ npartitions = partition_limit;
+
+ if (npartitions < HASHAGG_MIN_PARTITIONS)
+ npartitions = HASHAGG_MIN_PARTITIONS;
+ if (npartitions > HASHAGG_MAX_PARTITIONS)
+ npartitions = HASHAGG_MAX_PARTITIONS;
+
+ /* ceil(log2(npartitions)) */
+ partition_bits = my_log2(npartitions);
+
+ /* make sure that we don't exhaust the hash bits */
+ if (partition_bits + used_bits >= 32)
+ partition_bits = 32 - used_bits;
+
+ if (log2_npartitions != NULL)
+ *log2_npartitions = partition_bits;
+
+ /* number of partitions will be a power of two */
+ npartitions = 1L << partition_bits;
+
+ return npartitions;
+}
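
Tracing the same editorial example through this function (BLCKSZ = 8192, work_mem =
4MB, 1,000,000 input groups, hashentrysize = 64, used_bits = 0):

    /*
     * partition_limit = (4,194,304 * 0.25 - 8,192) / 8,192  = 127
     * mem_wanted      = 1.5 * 1,000,000 * 64                = 96,000,000 bytes
     * npartitions     = 1 + 96,000,000 / 4,194,304          = 23   (within bounds)
     * partition_bits  = my_log2(23)                         = 5
     * npartitions     = 1 << 5                              = 32
     */
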
+
+/*
* Find or create a hashtable entry for the tuple group containing the current
* tuple (already set in tmpcontext's outertuple slot), in the current grouping
* set (which the caller must have selected - note that initialize_aggregate
@@ -1495,6 +1970,10 @@ hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace)
*
* When called, CurrentMemoryContext should be the per-query context. The
* already-calculated hash value for the tuple must be specified.
+ *
+ * If in "spill mode", then only find existing hashtable entries; don't create
+ * new ones. If a tuple's group is not already present in the hash table for
+ * the current grouping set, return NULL and the caller will spill it to disk.
*/
static AggStatePerGroup
lookup_hash_entry(AggState *aggstate, uint32 hash)
@@ -1502,16 +1981,26 @@ lookup_hash_entry(AggState *aggstate, uint32 hash)
AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set];
TupleTableSlot *hashslot = perhash->hashslot;
TupleHashEntryData *entry;
- bool isnew;
+ bool isnew = false;
+ bool *p_isnew;
+
+ /* if hash table already spilled, don't create new entries */
+ p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;
/* find or create the hashtable entry using the filtered tuple */
- entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, &isnew,
+ entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot, p_isnew,
hash);
+ if (entry == NULL)
+ return NULL;
+
if (isnew)
{
- AggStatePerGroup pergroup;
- int transno;
+ AggStatePerGroup pergroup;
+ int transno;
+
+ aggstate->hash_ngroups_current++;
+ hash_agg_check_limits(aggstate);
pergroup = (AggStatePerGroup)
MemoryContextAlloc(perhash->hashtable->tablecxt,
@@ -1539,23 +2028,48 @@ lookup_hash_entry(AggState *aggstate, uint32 hash)
* returning an array of pergroup pointers suitable for advance_aggregates.
*
* Be aware that lookup_hash_entry can reset the tmpcontext.
+ *
+ * Some entries may be left NULL if we are in "spill mode". The same tuple
+ * will belong to different groups for each grouping set, so may match a group
+ * already in memory for one set and match a group not in memory for another
+ * set. When in "spill mode", the tuple will be spilled for each grouping set
+ * where it doesn't match a group in memory.
+ *
+ * NB: It's possible to spill the same tuple for several different grouping
+ * sets. This may seem wasteful, but it's actually a trade-off: if we spill
+ * the tuple multiple times for multiple grouping sets, it can be partitioned
+ * for each grouping set, making the refilling of the hash table very
+ * efficient.
*/
static void
lookup_hash_entries(AggState *aggstate)
{
- int numHashes = aggstate->num_hashes;
AggStatePerGroup *pergroup = aggstate->hash_pergroup;
int setno;
- for (setno = 0; setno < numHashes; setno++)
+ for (setno = 0; setno < aggstate->num_hashes; setno++)
{
- AggStatePerHash perhash = &aggstate->perhash[setno];
+ AggStatePerHash perhash = &aggstate->perhash[setno];
uint32 hash;
select_current_set(aggstate, setno, true);
prepare_hash_slot(aggstate);
hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot);
pergroup[setno] = lookup_hash_entry(aggstate, hash);
+
+ /* check to see if we need to spill the tuple for this grouping set */
+ if (pergroup[setno] == NULL)
+ {
+ HashAggSpill *spill = &aggstate->hash_spills[setno];
+ TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
+
+ if (spill->partitions == NULL)
+ hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+ perhash->aggnode->numGroups,
+ aggstate->hashentrysize);
+
+ hashagg_spill_tuple(spill, slot, hash);
+ }
}
}
@@ -1878,6 +2392,12 @@ agg_retrieve_direct(AggState *aggstate)
if (TupIsNull(outerslot))
{
/* no more outer-plan tuples available */
+
+ /* if we built hash tables, finalize any spills */
+ if (aggstate->aggstrategy == AGG_MIXED &&
+ aggstate->current_phase == 1)
+ hashagg_finish_initial_spills(aggstate);
+
if (hasGroupingSets)
{
aggstate->input_done = true;
@@ -1980,6 +2500,9 @@ agg_fill_hash_table(AggState *aggstate)
ResetExprContext(aggstate->tmpcontext);
}
+ /* finalize spills, if any */
+ hashagg_finish_initial_spills(aggstate);
+
aggstate->table_filled = true;
/* Initialize to walk the first hash table */
select_current_set(aggstate, 0, true);
@@ -1988,11 +2511,189 @@ agg_fill_hash_table(AggState *aggstate)
}
/*
+ * If any data was spilled during hash aggregation, reset the hash table and
+ * reprocess one batch of spilled data. After reprocessing a batch, the hash
+ * table will again contain data, ready to be consumed by
+ * agg_retrieve_hash_table_in_memory().
+ *
+ * Should only be called after all in-memory hash table entries have been
+ * finalized and emitted.
+ *
+ * Return false when input is exhausted and there's no more work to be done;
+ * otherwise return true.
+ */
+static bool
+agg_refill_hash_table(AggState *aggstate)
+{
+ HashAggBatch *batch;
+ HashAggSpill spill;
+ HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
+ uint64 ngroups_estimate;
+ bool spill_initialized = false;
+
+ if (aggstate->hash_batches == NIL)
+ return false;
+
+ batch = linitial(aggstate->hash_batches);
+ aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
+
+ /*
+ * Estimate the number of groups for this batch as the total number of
+ * tuples in its input file. Although that's a worst case, it's not bad
+ * here for two reasons: (1) overestimating is better than
+ * underestimating; and (2) we've already scanned the relation once, so
+ * it's likely that we've already finalized many of the common values.
+ */
+ ngroups_estimate = batch->input_tuples;
+
+ hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
+ batch->used_bits, &aggstate->hash_mem_limit,
+ &aggstate->hash_ngroups_limit, NULL);
+
+ /* there could be residual pergroup pointers; clear them */
+ for (int setoff = 0;
+ setoff < aggstate->maxsets + aggstate->num_hashes;
+ setoff++)
+ aggstate->all_pergroups[setoff] = NULL;
+
+ /* free memory and reset hash tables */
+ ReScanExprContext(aggstate->hashcontext);
+ for (int setno = 0; setno < aggstate->num_hashes; setno++)
+ ResetTupleHashTable(aggstate->perhash[setno].hashtable);
+
+ aggstate->hash_ngroups_current = 0;
+
+ /*
+ * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
+ * happens in phase 0. So, we switch to phase 1 when processing a batch,
+ * and back to phase 0 after the batch is done.
+ */
+ Assert(aggstate->current_phase == 0);
+ if (aggstate->phase->aggstrategy == AGG_MIXED)
+ {
+ aggstate->current_phase = 1;
+ aggstate->phase = &aggstate->phases[aggstate->current_phase];
+ }
+
+ select_current_set(aggstate, batch->setno, true);
+
+ /*
+ * Spilled tuples are always read back as MinimalTuples, which may be
+ * different from the outer plan, so recompile the aggregate expressions.
+ *
+ * We still need the NULL check, because we are only processing one
+ * grouping set at a time and the rest will be NULL.
+ */
+ hashagg_recompile_expressions(aggstate, true, true);
+
+ LogicalTapeRewindForRead(tapeinfo->tapeset, batch->input_tapenum,
+ HASHAGG_READ_BUFFER_SIZE);
+ for (;;)
+ {
+ TupleTableSlot *slot = aggstate->hash_spill_slot;
+ MinimalTuple tuple;
+ uint32 hash;
+
+ CHECK_FOR_INTERRUPTS();
+
+ tuple = hashagg_batch_read(batch, &hash);
+ if (tuple == NULL)
+ break;
+
+ ExecStoreMinimalTuple(tuple, slot, true);
+ aggstate->tmpcontext->ecxt_outertuple = slot;
+
+ prepare_hash_slot(aggstate);
+ aggstate->hash_pergroup[batch->setno] = lookup_hash_entry(aggstate, hash);
+
+ if (aggstate->hash_pergroup[batch->setno] != NULL)
+ {
+ /* Advance the aggregates (or combine functions) */
+ advance_aggregates(aggstate);
+ }
+ else
+ {
+ if (!spill_initialized)
+ {
+ /*
+ * Avoid initializing the spill until we actually need it so
+ * that we don't assign tapes that will never be used.
+ */
+ spill_initialized = true;
+ hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
+ ngroups_estimate, aggstate->hashentrysize);
+ }
+ /* no memory for a new group, spill */
+ hashagg_spill_tuple(&spill, slot, hash);
+ }
+
+ /*
+ * Reset per-input-tuple context after each tuple, but note that the
+ * hash lookups do this too
+ */
+ ResetExprContext(aggstate->tmpcontext);
+ }
+
+ hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
+
+ /* change back to phase 0 */
+ aggstate->current_phase = 0;
+ aggstate->phase = &aggstate->phases[aggstate->current_phase];
+
+ if (spill_initialized)
+ {
+ hash_agg_update_metrics(aggstate, true, spill.npartitions);
+ hashagg_spill_finish(aggstate, &spill, batch->setno);
+ }
+ else
+ hash_agg_update_metrics(aggstate, true, 0);
+
+ aggstate->hash_spill_mode = false;
+
+ /* prepare to walk the first hash table */
+ select_current_set(aggstate, batch->setno, true);
+ ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
+ &aggstate->perhash[batch->setno].hashiter);
+
+ pfree(batch);
+
+ return true;
+}
+
+/*
* ExecAgg for hashed case: retrieving groups from hash table
+ *
+ * After exhausting in-memory tuples, also try refilling the hash table using
+ * previously-spilled tuples. Only returns NULL after all in-memory and
+ * spilled tuples are exhausted.
*/
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
+ TupleTableSlot *result = NULL;
+
+ while (result == NULL)
+ {
+ result = agg_retrieve_hash_table_in_memory(aggstate);
+ if (result == NULL)
+ {
+ if (!agg_refill_hash_table(aggstate))
+ {
+ aggstate->agg_done = true;
+ break;
+ }
+ }
+ }
+
+ return result;
+}
+
+/*
+ * Retrieve the groups from the in-memory hash tables without considering any
+ * spilled tuples.
+ */
+static TupleTableSlot *
+agg_retrieve_hash_table_in_memory(AggState *aggstate)
+{
ExprContext *econtext;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
@@ -2020,7 +2721,7 @@ agg_retrieve_hash_table(AggState *aggstate)
* We loop retrieving groups until we find one satisfying
* aggstate->ss.ps.qual
*/
- while (!aggstate->agg_done)
+ for (;;)
{
TupleTableSlot *hashslot = perhash->hashslot;
int i;
@@ -2051,8 +2752,6 @@ agg_retrieve_hash_table(AggState *aggstate)
}
else
{
- /* No more hashtables, so done */
- aggstate->agg_done = true;
return NULL;
}
}
@@ -2109,6 +2808,313 @@ agg_retrieve_hash_table(AggState *aggstate)
return NULL;
}
+/*
+ * Initialize HashTapeInfo
+ */
+static void
+hashagg_tapeinfo_init(AggState *aggstate)
+{
+ HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
+ int init_tapes = 16; /* expanded dynamically */
+
+ tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, NULL, NULL, -1);
+ tapeinfo->ntapes = init_tapes;
+ tapeinfo->nfreetapes = init_tapes;
+ tapeinfo->freetapes_alloc = init_tapes;
+ tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
+ for (int i = 0; i < init_tapes; i++)
+ tapeinfo->freetapes[i] = i;
+
+ aggstate->hash_tapeinfo = tapeinfo;
+}
+
+/*
+ * Assign unused tapes to spill partitions, extending the tape set if
+ * necessary.
+ */
+static void
+hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
+ int npartitions)
+{
+ int partidx = 0;
+
+ /* use free tapes if available */
+ while (partidx < npartitions && tapeinfo->nfreetapes > 0)
+ partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
+
+ if (partidx < npartitions)
+ {
+ LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
+
+ while (partidx < npartitions)
+ partitions[partidx++] = tapeinfo->ntapes++;
+ }
+}
+
+/*
+ * After a tape has already been written to and then read, this function
+ * rewinds it for writing and adds it to the free list.
+ */
+static void
+hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
+{
+ LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
+ if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
+ {
+ tapeinfo->freetapes_alloc <<= 1;
+ tapeinfo->freetapes = repalloc(
+ tapeinfo->freetapes, tapeinfo->freetapes_alloc * sizeof(int));
+ }
+ tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
+}
+
+/*
+ * hashagg_spill_init
+ *
+ * Called after we determined that spilling is necessary. Chooses the number
+ * of partitions to create, and initializes them.
+ */
+static void
+hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
+ uint64 input_groups, double hashentrysize)
+{
+ int npartitions;
+ int partition_bits;
+
+ npartitions = hash_choose_num_partitions(
+ input_groups, hashentrysize, used_bits, &partition_bits);
+
+ spill->partitions = palloc0(sizeof(int) * npartitions);
+ spill->ntuples = palloc0(sizeof(int64) * npartitions);
+
+ hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
+
+ spill->tapeset = tapeinfo->tapeset;
+ spill->shift = 32 - used_bits - partition_bits;
+ spill->mask = (npartitions - 1) << spill->shift;
+ spill->npartitions = npartitions;
+}
+
+/*
+ * hashagg_spill_tuple
+ *
+ * No room for new groups in the hash table. Save for later in the appropriate
+ * partition.
+ */
+static Size
+hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash)
+{
+ LogicalTapeSet *tapeset = spill->tapeset;
+ int partition;
+ MinimalTuple tuple;
+ int tapenum;
+ int total_written = 0;
+ bool shouldFree;
+
+ Assert(spill->partitions != NULL);
+
+ /* XXX: may contain unnecessary attributes, should project */
+ tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+ partition = (hash & spill->mask) >> spill->shift;
+ spill->ntuples[partition]++;
+
+ tapenum = spill->partitions[partition];
+
+ LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
+ total_written += sizeof(uint32);
+
+ LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
+ total_written += tuple->t_len;
+
+ if (shouldFree)
+ pfree(tuple);
+
+ return total_written;
+}
+
+/*
+ * hashagg_batch_new
+ *
+ * Construct a HashAggBatch item, which represents one iteration of HashAgg to
+ * be done.
+ */
+static HashAggBatch *
+hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
+ int64 input_tuples, int used_bits)
+{
+ HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
+
+ batch->setno = setno;
+ batch->used_bits = used_bits;
+ batch->tapeset = tapeset;
+ batch->input_tapenum = tapenum;
+ batch->input_tuples = input_tuples;
+
+ return batch;
+}
+
+/*
+ * hashagg_batch_read
+ * read the next tuple from a batch's tape. Return NULL if no more.
+ */
+static MinimalTuple
+hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
+{
+ LogicalTapeSet *tapeset = batch->tapeset;
+ int tapenum = batch->input_tapenum;
+ MinimalTuple tuple;
+ uint32 t_len;
+ size_t nread;
+ uint32 hash;
+
+ nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
+ if (nread == 0)
+ return NULL;
+ if (nread != sizeof(uint32))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
+ tapenum, sizeof(uint32), nread)));
+ if (hashp != NULL)
+ *hashp = hash;
+
+ nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
+ if (nread != sizeof(uint32))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
+ tapenum, sizeof(uint32), nread)));
+
+ tuple = (MinimalTuple) palloc(t_len);
+ tuple->t_len = t_len;
+
+ nread = LogicalTapeRead(tapeset, tapenum,
+ (void *)((char *)tuple + sizeof(uint32)),
+ t_len - sizeof(uint32));
+ if (nread != t_len - sizeof(uint32))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
+ tapenum, t_len - sizeof(uint32), nread)));
+
+ return tuple;
+}
+
+/*
+ * hashagg_finish_initial_spills
+ *
+ * After the initial scan of the input is complete, some tuples may have been
+ * spilled to disk. If so, turn the spilled partitions into new batches that must later
+ * be executed.
+ */
+static void
+hashagg_finish_initial_spills(AggState *aggstate)
+{
+ int setno;
+ int total_npartitions = 0;
+
+ if (aggstate->hash_spills != NULL)
+ {
+ for (setno = 0; setno < aggstate->num_hashes; setno++)
+ {
+ HashAggSpill *spill = &aggstate->hash_spills[setno];
+ total_npartitions += spill->npartitions;
+ hashagg_spill_finish(aggstate, spill, setno);
+ }
+
+ /*
+ * We're not processing tuples from the outer plan any more; only
+ * processing batches of spilled tuples. The initial spill structures
+ * are no longer needed.
+ */
+ pfree(aggstate->hash_spills);
+ aggstate->hash_spills = NULL;
+ }
+
+ hash_agg_update_metrics(aggstate, false, total_npartitions);
+ aggstate->hash_spill_mode = false;
+}
+
+/*
+ * hashagg_spill_finish
+ *
+ * Transform spill partitions into new batches.
+ */
+static void
+hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
+{
+ int i;
+ int used_bits = 32 - spill->shift;
+
+ if (spill->npartitions == 0)
+ return; /* didn't spill */
+
+ for (i = 0; i < spill->npartitions; i++)
+ {
+ int tapenum = spill->partitions[i];
+ HashAggBatch *new_batch;
+
+ /* if the partition is empty, don't create a new batch of work */
+ if (spill->ntuples[i] == 0)
+ continue;
+
+ new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
+ tapenum, setno, spill->ntuples[i],
+ used_bits);
+ aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
+ aggstate->hash_batches_used++;
+ }
+
+ pfree(spill->ntuples);
+ pfree(spill->partitions);
+}
+
+/*
+ * Free resources related to a spilled HashAgg.
+ */
+static void
+hashagg_reset_spill_state(AggState *aggstate)
+{
+ ListCell *lc;
+
+ /* free spills from initial pass */
+ if (aggstate->hash_spills != NULL)
+ {
+ int setno;
+
+ for (setno = 0; setno < aggstate->num_hashes; setno++)
+ {
+ HashAggSpill *spill = &aggstate->hash_spills[setno];
+ pfree(spill->ntuples);
+ pfree(spill->partitions);
+ }
+ pfree(aggstate->hash_spills);
+ aggstate->hash_spills = NULL;
+ }
+
+ /* free batches */
+ foreach(lc, aggstate->hash_batches)
+ {
+ HashAggBatch *batch = (HashAggBatch*) lfirst(lc);
+ pfree(batch);
+ }
+ list_free(aggstate->hash_batches);
+ aggstate->hash_batches = NIL;
+
+ /* close tape set */
+ if (aggstate->hash_tapeinfo != NULL)
+ {
+ HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
+
+ LogicalTapeSetClose(tapeinfo->tapeset);
+ pfree(tapeinfo->freetapes);
+ pfree(tapeinfo);
+ aggstate->hash_tapeinfo = NULL;
+ }
+}
+
+
/* -----------------
* ExecInitAgg
*
@@ -2518,9 +3524,36 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
*/
if (use_hashing)
{
+ Plan *outerplan = outerPlan(node);
+ uint64 totalGroups = 0;
+ int i;
+
+ aggstate->hash_metacxt = AllocSetContextCreate(
+ aggstate->ss.ps.state->es_query_cxt,
+ "HashAgg meta context",
+ ALLOCSET_DEFAULT_SIZES);
+ aggstate->hash_spill_slot = ExecInitExtraTupleSlot(
+ estate, scanDesc, &TTSOpsMinimalTuple);
+
/* this is an array of pointers, not structures */
aggstate->hash_pergroup = pergroups;
+ aggstate->hashentrysize = hash_agg_entry_size(
+ aggstate->numtrans, outerplan->plan_width, node->transitionSpace);
+
+ /*
+ * Consider all of the grouping sets together when setting the limits
+ * and estimating the number of partitions. This can be inaccurate
+ * when there is more than one grouping set, but should still be
+ * reasonable.
+ */
+ for (i = 0; i < aggstate->num_hashes; i++)
+ totalGroups += aggstate->perhash[i].aggnode->numGroups;
+
+ hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
+ &aggstate->hash_mem_limit,
+ &aggstate->hash_ngroups_limit,
+ &aggstate->hash_planned_partitions);
find_hash_columns(aggstate);
build_hash_tables(aggstate);
aggstate->table_filled = false;
@@ -2931,6 +3964,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
false);
+ /* cache compiled expression for outer slot without NULL check */
+ phase->evaltrans_cache[0][0] = phase->evaltrans;
}
return aggstate;
@@ -3424,6 +4459,14 @@ ExecEndAgg(AggState *node)
if (node->sort_out)
tuplesort_end(node->sort_out);
+ hashagg_reset_spill_state(node);
+
+ if (node->hash_metacxt != NULL)
+ {
+ MemoryContextDelete(node->hash_metacxt);
+ node->hash_metacxt = NULL;
+ }
+
for (transno = 0; transno < node->numtrans; transno++)
{
AggStatePerTrans pertrans = &node->pertrans[transno];
@@ -3479,12 +4522,13 @@ ExecReScanAgg(AggState *node)
return;
/*
- * If we do have the hash table, and the subplan does not have any
- * parameter changes, and none of our own parameter changes affect
- * input expressions of the aggregated functions, then we can just
- * rescan the existing hash table; no need to build it again.
+ * If we do have the hash table, and it never spilled, and the subplan
+ * does not have any parameter changes, and none of our own parameter
+ * changes affect input expressions of the aggregated functions, then
+ * we can just rescan the existing hash table; no need to build it
+ * again.
*/
- if (outerPlan->chgParam == NULL &&
+ if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
{
ResetTupleHashIterator(node->perhash[0].hashtable,
@@ -3541,11 +4585,19 @@ ExecReScanAgg(AggState *node)
*/
if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
{
+ hashagg_reset_spill_state(node);
+
+ node->hash_ever_spilled = false;
+ node->hash_spill_mode = false;
+ node->hash_ngroups_current = 0;
+
ReScanExprContext(node->hashcontext);
/* Rebuild an empty hash table */
build_hash_tables(node);
node->table_filled = false;
/* iterator will be reset when the table is filled */
+
+ hashagg_recompile_expressions(node, false, false);
}
if (node->aggstrategy != AGG_HASHED)