diff options
Diffstat (limited to 'src/include')
-rw-r--r-- | src/include/executor/hashjoin.h | 175 | ||||
-rw-r--r-- | src/include/executor/nodeHash.h | 24 | ||||
-rw-r--r-- | src/include/executor/nodeHashjoin.h | 6 | ||||
-rw-r--r-- | src/include/nodes/execnodes.h | 6 | ||||
-rw-r--r-- | src/include/nodes/plannodes.h | 1 | ||||
-rw-r--r-- | src/include/nodes/relation.h | 2 | ||||
-rw-r--r-- | src/include/optimizer/cost.h | 4 | ||||
-rw-r--r-- | src/include/optimizer/pathnode.h | 1 | ||||
-rw-r--r-- | src/include/pgstat.h | 15 | ||||
-rw-r--r-- | src/include/storage/lwlock.h | 1 |
10 files changed, 227 insertions, 8 deletions
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 82acadf85ba..d8c82d4e7c0 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -15,7 +15,10 @@ #define HASHJOIN_H #include "nodes/execnodes.h" +#include "port/atomics.h" +#include "storage/barrier.h" #include "storage/buffile.h" +#include "storage/lwlock.h" /* ---------------------------------------------------------------- * hash-join hash table structures @@ -63,7 +66,12 @@ typedef struct HashJoinTupleData { - struct HashJoinTupleData *next; /* link to next tuple in same bucket */ + /* link to next tuple in same bucket */ + union + { + struct HashJoinTupleData *unshared; + dsa_pointer shared; + } next; uint32 hashvalue; /* tuple's hash code */ /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */ } HashJoinTupleData; @@ -112,8 +120,12 @@ typedef struct HashMemoryChunkData size_t maxlen; /* size of the buffer holding the tuples */ size_t used; /* number of buffer bytes already used */ - struct HashMemoryChunkData *next; /* pointer to the next chunk (linked - * list) */ + /* pointer to the next chunk (linked list) */ + union + { + struct HashMemoryChunkData *unshared; + dsa_pointer shared; + } next; char data[FLEXIBLE_ARRAY_MEMBER]; /* buffer allocated at the end */ } HashMemoryChunkData; @@ -121,8 +133,148 @@ typedef struct HashMemoryChunkData typedef struct HashMemoryChunkData *HashMemoryChunk; #define HASH_CHUNK_SIZE (32 * 1024L) +#define HASH_CHUNK_HEADER_SIZE (offsetof(HashMemoryChunkData, data)) #define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4) +/* + * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch + * object in shared memory to coordinate access to it. Since they are + * followed by variable-sized objects, they are arranged in contiguous memory + * but not accessed directly as an array. + */ +typedef struct ParallelHashJoinBatch +{ + dsa_pointer buckets; /* array of hash table buckets */ + Barrier batch_barrier; /* synchronization for joining this batch */ + + dsa_pointer chunks; /* chunks of tuples loaded */ + size_t size; /* size of buckets + chunks in memory */ + size_t estimated_size; /* size of buckets + chunks while writing */ + size_t ntuples; /* number of tuples loaded */ + size_t old_ntuples; /* number of tuples before repartitioning */ + bool space_exhausted; + + /* + * Variable-sized SharedTuplestore objects follow this struct in memory. + * See the accessor macros below. + */ +} ParallelHashJoinBatch; + +/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */ +#define ParallelHashJoinBatchInner(batch) \ + ((SharedTuplestore *) \ + ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch)))) + +/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */ +#define ParallelHashJoinBatchOuter(batch, nparticipants) \ + ((SharedTuplestore *) \ + ((char *) ParallelHashJoinBatchInner(batch) + \ + MAXALIGN(sts_estimate(nparticipants)))) + +/* Total size of a ParallelHashJoinBatch and tuplestores. */ +#define EstimateParallelHashJoinBatch(hashtable) \ + (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \ + MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2) + +/* Accessor for the nth ParallelHashJoinBatch given the base. */ +#define NthParallelHashJoinBatch(base, n) \ + ((ParallelHashJoinBatch *) \ + ((char *) (base) + \ + EstimateParallelHashJoinBatch(hashtable) * (n))) + +/* + * Each backend requires a small amount of per-batch state to interact with + * each ParalellHashJoinBatch. + */ +typedef struct ParallelHashJoinBatchAccessor +{ + ParallelHashJoinBatch *shared; /* pointer to shared state */ + + /* Per-backend partial counters to reduce contention. */ + size_t preallocated; /* pre-allocated space for this backend */ + size_t ntuples; /* number of tuples */ + size_t size; /* size of partition in memory */ + size_t estimated_size; /* size of partition on disk */ + size_t old_ntuples; /* how many tuples before repartioning? */ + bool at_least_one_chunk; /* has this backend allocated a chunk? */ + + bool done; /* flag to remember that a batch is done */ + SharedTuplestoreAccessor *inner_tuples; + SharedTuplestoreAccessor *outer_tuples; +} ParallelHashJoinBatchAccessor; + +/* + * While hashing the inner relation, any participant might determine that it's + * time to increase the number of buckets to reduce the load factor or batches + * to reduce the memory size. This is indicated by setting the growth flag to + * these values. + */ +typedef enum ParallelHashGrowth +{ + /* The current dimensions are sufficient. */ + PHJ_GROWTH_OK, + /* The load factor is too high, so we need to add buckets. */ + PHJ_GROWTH_NEED_MORE_BUCKETS, + /* The memory budget would be exhausted, so we need to repartition. */ + PHJ_GROWTH_NEED_MORE_BATCHES, + /* Repartitioning didn't help last time, so don't try to do that again. */ + PHJ_GROWTH_DISABLED +} ParallelHashGrowth; + +/* + * The shared state used to coordinate a Parallel Hash Join. This is stored + * in the DSM segment. + */ +typedef struct ParallelHashJoinState +{ + dsa_pointer batches; /* array of ParallelHashJoinBatch */ + dsa_pointer old_batches; /* previous generation during repartition */ + int nbatch; /* number of batches now */ + int old_nbatch; /* previous number of batches */ + int nbuckets; /* number of buckets */ + ParallelHashGrowth growth; /* control batch/bucket growth */ + dsa_pointer chunk_work_queue; /* chunk work queue */ + int nparticipants; + size_t space_allowed; + size_t total_tuples; /* total number of inner tuples */ + LWLock lock; /* lock protecting the above */ + + Barrier build_barrier; /* synchronization for the build phases */ + Barrier grow_batches_barrier; + Barrier grow_buckets_barrier; + pg_atomic_uint32 distributor; /* counter for load balancing */ + + SharedFileSet fileset; /* space for shared temporary files */ +} ParallelHashJoinState; + +/* The phases for building batches, used by build_barrier. */ +#define PHJ_BUILD_ELECTING 0 +#define PHJ_BUILD_ALLOCATING 1 +#define PHJ_BUILD_HASHING_INNER 2 +#define PHJ_BUILD_HASHING_OUTER 3 +#define PHJ_BUILD_DONE 4 + +/* The phases for probing each batch, used by for batch_barrier. */ +#define PHJ_BATCH_ELECTING 0 +#define PHJ_BATCH_ALLOCATING 1 +#define PHJ_BATCH_LOADING 2 +#define PHJ_BATCH_PROBING 3 +#define PHJ_BATCH_DONE 4 + +/* The phases of batch growth while hashing, for grow_batches_barrier. */ +#define PHJ_GROW_BATCHES_ELECTING 0 +#define PHJ_GROW_BATCHES_ALLOCATING 1 +#define PHJ_GROW_BATCHES_REPARTITIONING 2 +#define PHJ_GROW_BATCHES_DECIDING 3 +#define PHJ_GROW_BATCHES_FINISHING 4 +#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */ + +/* The phases of bucket growth while hashing, for grow_buckets_barrier. */ +#define PHJ_GROW_BUCKETS_ELECTING 0 +#define PHJ_GROW_BUCKETS_ALLOCATING 1 +#define PHJ_GROW_BUCKETS_REINSERTING 2 +#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */ + typedef struct HashJoinTableData { int nbuckets; /* # buckets in the in-memory hash table */ @@ -133,8 +285,13 @@ typedef struct HashJoinTableData int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */ /* buckets[i] is head of list of tuples in i'th in-memory bucket */ - struct HashJoinTupleData **buckets; - /* buckets array is per-batch storage, as are all the tuples */ + union + { + /* unshared array is per-batch storage, as are all the tuples */ + struct HashJoinTupleData **unshared; + /* shared array is per-query DSA area, as are all the tuples */ + dsa_pointer_atomic *shared; + } buckets; bool keepNulls; /* true to store unmatchable NULL tuples */ @@ -153,6 +310,7 @@ typedef struct HashJoinTableData bool growEnabled; /* flag to shut off nbatch increases */ double totalTuples; /* # tuples obtained from inner plan */ + double partialTuples; /* # tuples obtained from inner plan by me */ double skewTuples; /* # tuples inserted into skew tuples */ /* @@ -185,6 +343,13 @@ typedef struct HashJoinTableData /* used for dense allocation of tuples (into linked chunks) */ HashMemoryChunk chunks; /* one list for the whole batch */ + + /* Shared and private state for Parallel Hash. */ + HashMemoryChunk current_chunk; /* this backend's current chunk */ + dsa_area *area; /* DSA area to allocate memory from */ + ParallelHashJoinState *parallel_state; + ParallelHashJoinBatchAccessor *batches; + dsa_pointer current_chunk_shared; } HashJoinTableData; #endif /* HASHJOIN_H */ diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 0974f1edc21..84c166b3951 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -17,17 +17,33 @@ #include "access/parallel.h" #include "nodes/execnodes.h" +struct SharedHashJoinBatch; + extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags); extern Node *MultiExecHash(HashState *node); extern void ExecEndHash(HashState *node); extern void ExecReScanHash(HashState *node); -extern HashJoinTable ExecHashTableCreate(Hash *node, List *hashOperators, +extern HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls); +extern void ExecParallelHashTableAlloc(HashJoinTable hashtable, + int batchno); extern void ExecHashTableDestroy(HashJoinTable hashtable); +extern void ExecHashTableDetach(HashJoinTable hashtable); +extern void ExecHashTableDetachBatch(HashJoinTable hashtable); +extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, + int batchno); +void ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno); + extern void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue); +extern void ExecParallelHashTableInsert(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue); +extern void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue); extern bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, @@ -39,12 +55,16 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, int *bucketno, int *batchno); extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); +extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext); extern void ExecHashTableReset(HashJoinTable hashtable); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, + bool try_combined_work_mem, + int parallel_workers, + size_t *space_allowed, int *numbuckets, int *numbatches, int *num_skew_mcvs); @@ -55,6 +75,6 @@ extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwc extern void ExecHashRetrieveInstrumentation(HashState *node); extern void ExecShutdownHash(HashState *node); extern void ExecHashGetInstrumentation(HashInstrumentation *instrument, - HashJoinTable hashtable); + HashJoinTable hashtable); #endif /* NODEHASH_H */ diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h index 7469bfbf60c..8469085d7e3 100644 --- a/src/include/executor/nodeHashjoin.h +++ b/src/include/executor/nodeHashjoin.h @@ -20,6 +20,12 @@ extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags); extern void ExecEndHashJoin(HashJoinState *node); extern void ExecReScanHashJoin(HashJoinState *node); +extern void ExecShutdownHashJoin(HashJoinState *node); +extern void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt); +extern void ExecHashJoinInitializeWorker(HashJoinState *state, + ParallelWorkerContext *pwcxt); extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, BufFile **fileptr); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 1a35c5c9ada..44d8c47d2c2 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -25,6 +25,7 @@ #include "utils/hsearch.h" #include "utils/queryenvironment.h" #include "utils/reltrigger.h" +#include "utils/sharedtuplestore.h" #include "utils/sortsupport.h" #include "utils/tuplestore.h" #include "utils/tuplesort.h" @@ -43,6 +44,8 @@ struct ExprState; /* forward references in this file */ struct ExprContext; struct ExprEvalStep; /* avoid including execExpr.h everywhere */ +struct ParallelHashJoinState; + typedef Datum (*ExprStateEvalFunc) (struct ExprState *expression, struct ExprContext *econtext, bool *isNull); @@ -2026,6 +2029,9 @@ typedef struct HashState SharedHashInfo *shared_info; /* one entry per worker */ HashInstrumentation *hinstrument; /* this worker's entry */ + + /* Parallel hash state. */ + struct ParallelHashJoinState *parallel_state; } HashState; /* ---------------- diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 02fb366680f..d763da647b8 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -880,6 +880,7 @@ typedef struct Hash AttrNumber skewColumn; /* outer join key's column #, or zero */ bool skewInherit; /* is outer join rel an inheritance tree? */ /* all other info is in the parent HashJoin node */ + double rows_total; /* estimate total rows if parallel_aware */ } Hash; /* ---------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 1108b6a0ea0..3b9d303ce4e 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1464,6 +1464,7 @@ typedef struct HashPath JoinPath jpath; List *path_hashclauses; /* join clauses used for hashing */ int num_batches; /* number of batches expected */ + double inner_rows_total; /* total inner rows expected */ } HashPath; /* @@ -2315,6 +2316,7 @@ typedef struct JoinCostWorkspace /* private for cost_hashjoin code */ int numbuckets; int numbatches; + double inner_rows_total; } JoinCostWorkspace; #endif /* RELATION_H */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 5a1fbf97c38..27afc2eaeb3 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -69,6 +69,7 @@ extern bool enable_hashjoin; extern bool enable_gathermerge; extern bool enable_partition_wise_join; extern bool enable_parallel_append; +extern bool enable_parallel_hash; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -153,7 +154,8 @@ extern void initial_cost_hashjoin(PlannerInfo *root, JoinType jointype, List *hashclauses, Path *outer_path, Path *inner_path, - JoinPathExtraData *extra); + JoinPathExtraData *extra, + bool parallel_hash); extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path, JoinCostWorkspace *workspace, JoinPathExtraData *extra); diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 99f65b44f22..3ef12b323bb 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -153,6 +153,7 @@ extern HashPath *create_hashjoin_path(PlannerInfo *root, JoinPathExtraData *extra, Path *outer_path, Path *inner_path, + bool parallel_hash, List *restrict_clauses, Relids required_outer, List *hashclauses); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 089b7c3a108..58f3a19bc61 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -803,6 +803,21 @@ typedef enum WAIT_EVENT_BGWORKER_STARTUP, WAIT_EVENT_BTREE_PAGE, WAIT_EVENT_EXECUTE_GATHER, + WAIT_EVENT_HASH_BATCH_ALLOCATING, + WAIT_EVENT_HASH_BATCH_ELECTING, + WAIT_EVENT_HASH_BATCH_LOADING, + WAIT_EVENT_HASH_BUILD_ALLOCATING, + WAIT_EVENT_HASH_BUILD_ELECTING, + WAIT_EVENT_HASH_BUILD_HASHING_INNER, + WAIT_EVENT_HASH_BUILD_HASHING_OUTER, + WAIT_EVENT_HASH_GROW_BATCHES_ELECTING, + WAIT_EVENT_HASH_GROW_BATCHES_FINISHING, + WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING, + WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING, + WAIT_EVENT_HASH_GROW_BATCHES_DECIDING, + WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING, + WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING, + WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING, WAIT_EVENT_LOGICAL_SYNC_DATA, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE, WAIT_EVENT_MQ_INTERNAL, diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index a347ee4d7de..97e4a0bbbd3 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -211,6 +211,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_BUFFER_MAPPING, LWTRANCHE_LOCK_MANAGER, LWTRANCHE_PREDICATE_LOCK_MANAGER, + LWTRANCHE_PARALLEL_HASH_JOIN, LWTRANCHE_PARALLEL_QUERY_DSA, LWTRANCHE_SESSION_DSA, LWTRANCHE_SESSION_RECORD_TABLE, |