Diffstat (limited to 'src/include')
-rw-r--r--  src/include/executor/hashjoin.h        175
-rw-r--r--  src/include/executor/nodeHash.h         24
-rw-r--r--  src/include/executor/nodeHashjoin.h      6
-rw-r--r--  src/include/nodes/execnodes.h            6
-rw-r--r--  src/include/nodes/plannodes.h            1
-rw-r--r--  src/include/nodes/relation.h             2
-rw-r--r--  src/include/optimizer/cost.h             4
-rw-r--r--  src/include/optimizer/pathnode.h         1
-rw-r--r--  src/include/pgstat.h                    15
-rw-r--r--  src/include/storage/lwlock.h             1
10 files changed, 227 insertions, 8 deletions
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 82acadf85ba..d8c82d4e7c0 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -15,7 +15,10 @@
#define HASHJOIN_H
#include "nodes/execnodes.h"
+#include "port/atomics.h"
+#include "storage/barrier.h"
#include "storage/buffile.h"
+#include "storage/lwlock.h"
/* ----------------------------------------------------------------
* hash-join hash table structures
@@ -63,7 +66,12 @@
typedef struct HashJoinTupleData
{
- struct HashJoinTupleData *next; /* link to next tuple in same bucket */
+ /* link to next tuple in same bucket */
+ union
+ {
+ struct HashJoinTupleData *unshared;
+ dsa_pointer shared;
+ } next;
uint32 hashvalue; /* tuple's hash code */
/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;
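
The next link is now a union because a private hash join chains tuples with ordinary pointers, while Parallel Hash chains them with dsa_pointer values that every worker can translate into its own address space. A minimal sketch of following the chain in both modes, assuming the table's DSA area and parallel state are reachable as hashtable->area and hashtable->parallel_state (both added to HashJoinTableData later in this diff); the helper name is illustrative only:

static inline HashJoinTuple
next_tuple_in_bucket(HashJoinTable hashtable, HashJoinTuple tuple)
{
	if (hashtable->parallel_state != NULL)
	{
		/* shared table: translate the DSA pointer for this process */
		return (HashJoinTuple) dsa_get_address(hashtable->area,
											   tuple->next.shared);
	}
	/* private table: an ordinary pointer within this backend */
	return tuple->next.unshared;
}

dsa_get_address() returns NULL for an invalid dsa_pointer, so end-of-chain looks the same in both branches.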
@@ -112,8 +120,12 @@ typedef struct HashMemoryChunkData
size_t maxlen; /* size of the buffer holding the tuples */
size_t used; /* number of buffer bytes already used */
- struct HashMemoryChunkData *next; /* pointer to the next chunk (linked
- * list) */
+ /* pointer to the next chunk (linked list) */
+ union
+ {
+ struct HashMemoryChunkData *unshared;
+ dsa_pointer shared;
+ } next;
char data[FLEXIBLE_ARRAY_MEMBER]; /* buffer allocated at the end */
} HashMemoryChunkData;
@@ -121,8 +133,148 @@ typedef struct HashMemoryChunkData
typedef struct HashMemoryChunkData *HashMemoryChunk;
#define HASH_CHUNK_SIZE (32 * 1024L)
+#define HASH_CHUNK_HEADER_SIZE (offsetof(HashMemoryChunkData, data))
#define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4)
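
HASH_CHUNK_HEADER_SIZE makes the per-chunk header overhead explicit so that shared and private allocations can account for it the same way. An illustrative calculation only (the real placement policy lives in nodeHash.c's dense allocator), showing how a tuple's size decides between a dedicated chunk and the standard 32 kB chunk:

/* Illustration only: memory needed to place one tuple of tupleSize bytes. */
static size_t
chunk_size_for_tuple(size_t tupleSize)
{
	if (tupleSize > HASH_CHUNK_THRESHOLD)
	{
		/* oversized tuples get a dedicated chunk of exactly the right size */
		return HASH_CHUNK_HEADER_SIZE + MAXALIGN(tupleSize);
	}
	/* ordinary tuples are packed into the current 32 kB chunk */
	return HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE;
}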
+/*
+ * For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
+ * object in shared memory to coordinate access to it. Since they are
+ * followed by variable-sized objects, they are arranged in contiguous memory
+ * but not accessed directly as an array.
+ */
+typedef struct ParallelHashJoinBatch
+{
+ dsa_pointer buckets; /* array of hash table buckets */
+ Barrier batch_barrier; /* synchronization for joining this batch */
+
+ dsa_pointer chunks; /* chunks of tuples loaded */
+ size_t size; /* size of buckets + chunks in memory */
+ size_t estimated_size; /* size of buckets + chunks while writing */
+ size_t ntuples; /* number of tuples loaded */
+ size_t old_ntuples; /* number of tuples before repartitioning */
+ bool space_exhausted; /* has this batch's memory budget been exhausted? */
+
+ /*
+ * Variable-sized SharedTuplestore objects follow this struct in memory.
+ * See the accessor macros below.
+ */
+} ParallelHashJoinBatch;
+
+/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
+#define ParallelHashJoinBatchInner(batch) \
+ ((SharedTuplestore *) \
+ ((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))
+
+/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
+#define ParallelHashJoinBatchOuter(batch, nparticipants) \
+ ((SharedTuplestore *) \
+ ((char *) ParallelHashJoinBatchInner(batch) + \
+ MAXALIGN(sts_estimate(nparticipants))))
+
+/* Total size of a ParallelHashJoinBatch and tuplestores. */
+#define EstimateParallelHashJoinBatch(hashtable) \
+ (MAXALIGN(sizeof(ParallelHashJoinBatch)) + \
+ MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)
+
+/* Accessor for the nth ParallelHashJoinBatch given the base. */
+#define NthParallelHashJoinBatch(base, n) \
+ ((ParallelHashJoinBatch *) \
+ ((char *) (base) + \
+ EstimateParallelHashJoinBatch(hashtable) * (n)))
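
These macros locate the per-batch shared structures within one contiguous DSA allocation: each slot holds a ParallelHashJoinBatch followed by an inner and an outer SharedTuplestore. Note that NthParallelHashJoinBatch expands EstimateParallelHashJoinBatch(hashtable), so a variable named hashtable must be in scope at the call site. A hedged walking sketch (the function name is illustrative):

static void
visit_all_batches(HashJoinTable hashtable)
{
	ParallelHashJoinState *pstate = hashtable->parallel_state;
	char	   *base = dsa_get_address(hashtable->area, pstate->batches);
	int			i;

	for (i = 0; i < pstate->nbatch; ++i)
	{
		ParallelHashJoinBatch *batch = NthParallelHashJoinBatch(base, i);
		SharedTuplestore *inner = ParallelHashJoinBatchInner(batch);
		SharedTuplestore *outer =
			ParallelHashJoinBatchOuter(batch, pstate->nparticipants);

		/* ... inspect batch, inner and outer here ... */
		(void) inner;
		(void) outer;
	}
}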
+
+/*
+ * Each backend requires a small amount of per-batch state to interact with
+ * each ParallelHashJoinBatch.
+ */
+typedef struct ParallelHashJoinBatchAccessor
+{
+ ParallelHashJoinBatch *shared; /* pointer to shared state */
+
+ /* Per-backend partial counters to reduce contention. */
+ size_t preallocated; /* pre-allocated space for this backend */
+ size_t ntuples; /* number of tuples */
+ size_t size; /* size of partition in memory */
+ size_t estimated_size; /* size of partition on disk */
+ size_t old_ntuples; /* how many tuples before repartitioning? */
+ bool at_least_one_chunk; /* has this backend allocated a chunk? */
+
+ bool done; /* flag to remember that a batch is done */
+ SharedTuplestoreAccessor *inner_tuples;
+ SharedTuplestoreAccessor *outer_tuples;
+} ParallelHashJoinBatchAccessor;
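
The partial counters let each backend tally its own insertions without touching shared memory for every tuple; at synchronization points they are folded into the ParallelHashJoinBatch totals under the shared lock. A minimal sketch of that merge, assuming the accessor array hangs off hashtable->batches as declared later in this diff (the helper name is illustrative):

static void
merge_batch_counters(HashJoinTable hashtable)
{
	ParallelHashJoinState *pstate = hashtable->parallel_state;
	int			i;

	LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
	for (i = 0; i < pstate->nbatch; ++i)
	{
		ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];

		/* add this backend's contribution to the shared totals */
		accessor->shared->size += accessor->size;
		accessor->shared->estimated_size += accessor->estimated_size;
		accessor->shared->ntuples += accessor->ntuples;

		/* and reset the local counters for the next round */
		accessor->size = 0;
		accessor->estimated_size = 0;
		accessor->ntuples = 0;
	}
	LWLockRelease(&pstate->lock);
}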
+
+/*
+ * While hashing the inner relation, any participant might determine that it's
+ * time to increase the number of buckets (to reduce the load factor) or the
+ * number of batches (to reduce per-batch memory use). This is indicated by
+ * setting the growth flag to one of these values.
+ */
+typedef enum ParallelHashGrowth
+{
+ /* The current dimensions are sufficient. */
+ PHJ_GROWTH_OK,
+ /* The load factor is too high, so we need to add buckets. */
+ PHJ_GROWTH_NEED_MORE_BUCKETS,
+ /* The memory budget would be exhausted, so we need to repartition. */
+ PHJ_GROWTH_NEED_MORE_BATCHES,
+ /* Repartitioning didn't help last time, so don't try to do that again. */
+ PHJ_GROWTH_DISABLED
+} ParallelHashGrowth;
+
+/*
+ * The shared state used to coordinate a Parallel Hash Join. This is stored
+ * in the DSM segment.
+ */
+typedef struct ParallelHashJoinState
+{
+ dsa_pointer batches; /* array of ParallelHashJoinBatch */
+ dsa_pointer old_batches; /* previous generation during repartition */
+ int nbatch; /* number of batches now */
+ int old_nbatch; /* previous number of batches */
+ int nbuckets; /* number of buckets */
+ ParallelHashGrowth growth; /* control batch/bucket growth */
+ dsa_pointer chunk_work_queue; /* chunk work queue */
+ int nparticipants; /* number of participating processes */
+ size_t space_allowed; /* memory budget for the whole hash table */
+ size_t total_tuples; /* total number of inner tuples */
+ LWLock lock; /* lock protecting the above */
+
+ Barrier build_barrier; /* synchronization for the build phases */
+ Barrier grow_batches_barrier; /* synchronization for repartitioning */
+ Barrier grow_buckets_barrier; /* synchronization for adding buckets */
+ pg_atomic_uint32 distributor; /* counter for load balancing */
+
+ SharedFileSet fileset; /* space for shared temporary files */
+} ParallelHashJoinState;
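
All of this lives in the query's DSM segment, so the leader has to initialize the lock, barriers, atomic counter, and shared fileset in place before any worker touches them. A hedged sketch of that setup, roughly the job of ExecHashJoinInitializeDSM (declared later in this diff); the function name and argument list here are illustrative:

static void
init_parallel_hash_state(ParallelHashJoinState *pstate, dsm_segment *seg,
						 int nparticipants)
{
	pstate->nbatch = 0;
	pstate->nbuckets = 0;
	pstate->nparticipants = nparticipants;
	pstate->growth = PHJ_GROWTH_OK;
	LWLockInitialize(&pstate->lock, LWTRANCHE_PARALLEL_HASH_JOIN);
	BarrierInit(&pstate->build_barrier, 0);
	BarrierInit(&pstate->grow_batches_barrier, 0);
	BarrierInit(&pstate->grow_buckets_barrier, 0);
	pg_atomic_init_u32(&pstate->distributor, 0);
	SharedFileSetInit(&pstate->fileset, seg);
}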
+
+/* The phases for building batches, used by build_barrier. */
+#define PHJ_BUILD_ELECTING 0
+#define PHJ_BUILD_ALLOCATING 1
+#define PHJ_BUILD_HASHING_INNER 2
+#define PHJ_BUILD_HASHING_OUTER 3
+#define PHJ_BUILD_DONE 4
+
+/* The phases for probing each batch, used by batch_barrier. */
+#define PHJ_BATCH_ELECTING 0
+#define PHJ_BATCH_ALLOCATING 1
+#define PHJ_BATCH_LOADING 2
+#define PHJ_BATCH_PROBING 3
+#define PHJ_BATCH_DONE 4
+
+/* The phases of batch growth while hashing, for grow_batches_barrier. */
+#define PHJ_GROW_BATCHES_ELECTING 0
+#define PHJ_GROW_BATCHES_ALLOCATING 1
+#define PHJ_GROW_BATCHES_REPARTITIONING 2
+#define PHJ_GROW_BATCHES_DECIDING 3
+#define PHJ_GROW_BATCHES_FINISHING 4
+#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */
+
+/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
+#define PHJ_GROW_BUCKETS_ELECTING 0
+#define PHJ_GROW_BUCKETS_ALLOCATING 1
+#define PHJ_GROW_BUCKETS_REINSERTING 2
+#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */
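
The growth barriers are reused every time the table grows, so their raw phase numbers keep increasing; the PHJ_GROW_*_PHASE macros fold the monotonically increasing barrier phase back onto the small cycle of logical steps. A hedged sketch of how a participant can join bucket growth at whatever step is in progress (the real coordination logic is in nodeHash.c; the helper name is illustrative):

static void
help_grow_buckets(ParallelHashJoinState *pstate)
{
	Barrier    *barrier = &pstate->grow_buckets_barrier;

	switch (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(barrier)))
	{
		case PHJ_GROW_BUCKETS_ELECTING:
			/* one attached participant is elected to allocate new buckets */
			BarrierArriveAndWait(barrier,
								 WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING);
			/* FALLTHROUGH */
		case PHJ_GROW_BUCKETS_ALLOCATING:
			/* wait for the elected participant to finish allocating */
			BarrierArriveAndWait(barrier,
								 WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING);
			/* FALLTHROUGH */
		case PHJ_GROW_BUCKETS_REINSERTING:
			/* everyone helps reinsert tuples into the new bucket array */
			break;
	}
	BarrierDetach(barrier);
}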
+
typedef struct HashJoinTableData
{
int nbuckets; /* # buckets in the in-memory hash table */
@@ -133,8 +285,13 @@ typedef struct HashJoinTableData
int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */
/* buckets[i] is head of list of tuples in i'th in-memory bucket */
- struct HashJoinTupleData **buckets;
- /* buckets array is per-batch storage, as are all the tuples */
+ union
+ {
+ /* unshared array is per-batch storage, as are all the tuples */
+ struct HashJoinTupleData **unshared;
+ /* shared array is per-query DSA area, as are all the tuples */
+ dsa_pointer_atomic *shared;
+ } buckets;
bool keepNulls; /* true to store unmatchable NULL tuples */
@@ -153,6 +310,7 @@ typedef struct HashJoinTableData
bool growEnabled; /* flag to shut off nbatch increases */
double totalTuples; /* # tuples obtained from inner plan */
+ double partialTuples; /* # tuples obtained from inner plan by me */
double skewTuples; /* # tuples inserted into skew buckets */
/*
@@ -185,6 +343,13 @@ typedef struct HashJoinTableData
/* used for dense allocation of tuples (into linked chunks) */
HashMemoryChunk chunks; /* one list for the whole batch */
+
+ /* Shared and private state for Parallel Hash. */
+ HashMemoryChunk current_chunk; /* this backend's current chunk */
+ dsa_area *area; /* DSA area to allocate memory from */
+ ParallelHashJoinState *parallel_state; /* NULL if not a Parallel Hash */
+ ParallelHashJoinBatchAccessor *batches; /* per-batch accessors, or NULL */
+ dsa_pointer current_chunk_shared; /* DSA pointer to current_chunk */
} HashJoinTableData;
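
The buckets union follows the same pattern as the tuple and chunk links: a private table keeps an ordinary pointer array, while a shared table keeps an array of atomic dsa_pointer values so workers can push tuples with compare-and-swap. A minimal sketch of reading one bucket head in either mode (helper name illustrative; parallel_state selects the mode as elsewhere in this diff):

static HashJoinTuple
bucket_head(HashJoinTable hashtable, int bucketno)
{
	if (hashtable->parallel_state != NULL)
	{
		/* shared table: atomically read the DSA pointer, then translate it */
		dsa_pointer p =
			dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);

		return (HashJoinTuple) dsa_get_address(hashtable->area, p);
	}
	/* private table: plain pointer array in backend-local memory */
	return hashtable->buckets.unshared[bucketno];
}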
#endif /* HASHJOIN_H */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 0974f1edc21..84c166b3951 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -17,17 +17,33 @@
#include "access/parallel.h"
#include "nodes/execnodes.h"
+struct SharedHashJoinBatch;
+
extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
extern Node *MultiExecHash(HashState *node);
extern void ExecEndHash(HashState *node);
extern void ExecReScanHash(HashState *node);
-extern HashJoinTable ExecHashTableCreate(Hash *node, List *hashOperators,
+extern HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls);
+extern void ExecParallelHashTableAlloc(HashJoinTable hashtable,
+ int batchno);
extern void ExecHashTableDestroy(HashJoinTable hashtable);
+extern void ExecHashTableDetach(HashJoinTable hashtable);
+extern void ExecHashTableDetachBatch(HashJoinTable hashtable);
+extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable,
+ int batchno);
+extern void ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno);
+
extern void ExecHashTableInsert(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue);
+extern void ExecParallelHashTableInsert(HashJoinTable hashtable,
+ TupleTableSlot *slot,
+ uint32 hashvalue);
+extern void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
+ TupleTableSlot *slot,
+ uint32 hashvalue);
extern bool ExecHashGetHashValue(HashJoinTable hashtable,
ExprContext *econtext,
List *hashkeys,
@@ -39,12 +55,16 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
int *bucketno,
int *batchno);
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
+extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
+ bool try_combined_work_mem,
+ int parallel_workers,
+ size_t *space_allowed,
int *numbuckets,
int *numbatches,
int *num_skew_mcvs);
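
With try_combined_work_mem set, ExecChooseHashTableSize() sizes the table against a budget of roughly (parallel_workers + 1) times work_mem and, per this commit, falls back to a per-participant budget when multiple batches would be needed anyway; space_allowed reports the budget it settled on. A hedged calling sketch with purely hypothetical inputs (the wrapper function is illustrative):

static void
size_parallel_hash_table(double inner_rows, int inner_width,
						 int parallel_workers)
{
	size_t		space_allowed;
	int			numbuckets;
	int			numbatches;
	int			num_skew_mcvs;

	ExecChooseHashTableSize(inner_rows, inner_width,
							false,	/* useskew */
							true,	/* try_combined_work_mem */
							parallel_workers,
							&space_allowed,
							&numbuckets, &numbatches, &num_skew_mcvs);

	/* space_allowed, numbuckets and numbatches now describe the layout */
	elog(DEBUG1, "budget=%zu buckets=%d batches=%d",
		 space_allowed, numbuckets, numbatches);
}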
@@ -55,6 +75,6 @@ extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwc
extern void ExecHashRetrieveInstrumentation(HashState *node);
extern void ExecShutdownHash(HashState *node);
extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
- HashJoinTable hashtable);
+ HashJoinTable hashtable);
#endif /* NODEHASH_H */
diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h
index 7469bfbf60c..8469085d7e3 100644
--- a/src/include/executor/nodeHashjoin.h
+++ b/src/include/executor/nodeHashjoin.h
@@ -20,6 +20,12 @@
extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags);
extern void ExecEndHashJoin(HashJoinState *node);
extern void ExecReScanHashJoin(HashJoinState *node);
+extern void ExecShutdownHashJoin(HashJoinState *node);
+extern void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt);
+extern void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt);
+extern void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt);
+extern void ExecHashJoinInitializeWorker(HashJoinState *state,
+ ParallelWorkerContext *pwcxt);
extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
BufFile **fileptr);
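
These are the standard parallel-executor hooks; execParallel.c routes them to parallel-aware Hash Join nodes from its per-node dispatch. A rough sketch of the DSM-initialization case only, hedged because the actual switch lives in execParallel.c (the wrapper function here is illustrative):

static void
initialize_dsm_for_node(PlanState *planstate, ParallelContext *pcxt)
{
	switch (nodeTag(planstate))
	{
		case T_HashJoinState:
			if (planstate->plan->parallel_aware)
				ExecHashJoinInitializeDSM((HashJoinState *) planstate, pcxt);
			break;
		default:
			break;
	}
}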
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1a35c5c9ada..44d8c47d2c2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -25,6 +25,7 @@
#include "utils/hsearch.h"
#include "utils/queryenvironment.h"
#include "utils/reltrigger.h"
+#include "utils/sharedtuplestore.h"
#include "utils/sortsupport.h"
#include "utils/tuplestore.h"
#include "utils/tuplesort.h"
@@ -43,6 +44,8 @@ struct ExprState; /* forward references in this file */
struct ExprContext;
struct ExprEvalStep; /* avoid including execExpr.h everywhere */
+struct ParallelHashJoinState;
+
typedef Datum (*ExprStateEvalFunc) (struct ExprState *expression,
struct ExprContext *econtext,
bool *isNull);
@@ -2026,6 +2029,9 @@ typedef struct HashState
SharedHashInfo *shared_info; /* one entry per worker */
HashInstrumentation *hinstrument; /* this worker's entry */
+
+ /* Parallel hash state. */
+ struct ParallelHashJoinState *parallel_state;
} HashState;
/* ----------------
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 02fb366680f..d763da647b8 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -880,6 +880,7 @@ typedef struct Hash
AttrNumber skewColumn; /* outer join key's column #, or zero */
bool skewInherit; /* is outer join rel an inheritance tree? */
/* all other info is in the parent HashJoin node */
+ double rows_total; /* estimated total rows if parallel_aware */
} Hash;
/* ----------------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 1108b6a0ea0..3b9d303ce4e 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1464,6 +1464,7 @@ typedef struct HashPath
JoinPath jpath;
List *path_hashclauses; /* join clauses used for hashing */
int num_batches; /* number of batches expected */
+ double inner_rows_total; /* total inner rows expected */
} HashPath;
/*
@@ -2315,6 +2316,7 @@ typedef struct JoinCostWorkspace
/* private for cost_hashjoin code */
int numbuckets;
int numbatches;
+ double inner_rows_total;
} JoinCostWorkspace;
#endif /* RELATION_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 5a1fbf97c38..27afc2eaeb3 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -69,6 +69,7 @@ extern bool enable_hashjoin;
extern bool enable_gathermerge;
extern bool enable_partition_wise_join;
extern bool enable_parallel_append;
+extern bool enable_parallel_hash;
extern int constraint_exclusion;
extern double clamp_row_est(double nrows);
@@ -153,7 +154,8 @@ extern void initial_cost_hashjoin(PlannerInfo *root,
JoinType jointype,
List *hashclauses,
Path *outer_path, Path *inner_path,
- JoinPathExtraData *extra);
+ JoinPathExtraData *extra,
+ bool parallel_hash);
extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path,
JoinCostWorkspace *workspace,
JoinPathExtraData *extra);
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 99f65b44f22..3ef12b323bb 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -153,6 +153,7 @@ extern HashPath *create_hashjoin_path(PlannerInfo *root,
JoinPathExtraData *extra,
Path *outer_path,
Path *inner_path,
+ bool parallel_hash,
List *restrict_clauses,
Relids required_outer,
List *hashclauses);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 089b7c3a108..58f3a19bc61 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -803,6 +803,21 @@ typedef enum
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,
WAIT_EVENT_EXECUTE_GATHER,
+ WAIT_EVENT_HASH_BATCH_ALLOCATING,
+ WAIT_EVENT_HASH_BATCH_ELECTING,
+ WAIT_EVENT_HASH_BATCH_LOADING,
+ WAIT_EVENT_HASH_BUILD_ALLOCATING,
+ WAIT_EVENT_HASH_BUILD_ELECTING,
+ WAIT_EVENT_HASH_BUILD_HASHING_INNER,
+ WAIT_EVENT_HASH_BUILD_HASHING_OUTER,
+ WAIT_EVENT_HASH_GROW_BATCHES_ELECTING,
+ WAIT_EVENT_HASH_GROW_BATCHES_FINISHING,
+ WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING,
+ WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING,
+ WAIT_EVENT_HASH_GROW_BATCHES_DECIDING,
+ WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING,
+ WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING,
+ WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING,
WAIT_EVENT_LOGICAL_SYNC_DATA,
WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
WAIT_EVENT_MQ_INTERNAL,
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index a347ee4d7de..97e4a0bbbd3 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -211,6 +211,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_BUFFER_MAPPING,
LWTRANCHE_LOCK_MANAGER,
LWTRANCHE_PREDICATE_LOCK_MANAGER,
+ LWTRANCHE_PARALLEL_HASH_JOIN,
LWTRANCHE_PARALLEL_QUERY_DSA,
LWTRANCHE_SESSION_DSA,
LWTRANCHE_SESSION_RECORD_TABLE,