diff options
Diffstat (limited to 'src/backend/executor/nodeHashjoin.c')
-rw-r--r-- | src/backend/executor/nodeHashjoin.c | 54 |
1 files changed, 38 insertions, 16 deletions
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 5532b91a71d..72256693f99 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -45,7 +45,8 @@ * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0 * PHJ_BUILD_HASHING_INNER -- all hash the inner rel * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer - * PHJ_BUILD_DONE -- building done, probing can begin + * PHJ_BUILD_RUNNING -- building done, probing can begin + * PHJ_BUILD_DONE -- all work complete, one frees batches * * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may * be used repeatedly as required to coordinate expansions in the number of @@ -73,7 +74,7 @@ * batches whenever it encounters them while scanning and probing, which it * can do because it processes batches in serial order. * - * Once PHJ_BUILD_DONE is reached, backends then split up and process + * Once PHJ_BUILD_RUNNING is reached, backends then split up and process * different batches, or gang up and work together on probing batches if there * aren't enough to go around. For each batch there is a separate barrier * with the following phases: @@ -95,11 +96,16 @@ * * To avoid deadlocks, we never wait for any barrier unless it is known that * all other backends attached to it are actively executing the node or have - * already arrived. Practically, that means that we never return a tuple - * while attached to a barrier, unless the barrier has reached its final - * state. In the slightly special case of the per-batch barrier, we return - * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use - * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * finished. Practically, that means that we never emit a tuple while attached + * to a barrier, unless the barrier has reached a phase that means that no + * process will wait on it again. We emit tuples while attached to the build + * barrier in phase PHJ_BUILD_RUNNING, and to a per-batch barrier in phase + * PHJ_BATCH_PROBING. These are advanced to PHJ_BUILD_DONE and PHJ_BATCH_DONE + * respectively without waiting, using BarrierArriveAndDetach(). The last to + * detach receives a different return value so that it knows that it's safe to + * clean up. Any straggler process that attaches after that phase is reached + * will see that it's too late to participate or access the relevant shared + * memory objects. * *------------------------------------------------------------------------- */ @@ -296,7 +302,21 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) * outer relation. */ if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node)) + { + if (parallel) + { + /* + * Advance the build barrier to PHJ_BUILD_RUNNING + * before proceeding so we can negotiate resource + * cleanup. + */ + Barrier *build_barrier = ¶llel_state->build_barrier; + + while (BarrierPhase(build_barrier) < PHJ_BUILD_RUNNING) + BarrierArriveAndWait(build_barrier, 0); + } return NULL; + } /* * need to remember whether nbatch has increased since we @@ -317,6 +337,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) build_barrier = ¶llel_state->build_barrier; Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || + BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING || BarrierPhase(build_barrier) == PHJ_BUILD_DONE); if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER) { @@ -329,9 +350,18 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_HASH_OUTER); } - Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE); + else if (BarrierPhase(build_barrier) == PHJ_BUILD_DONE) + { + /* + * If we attached so late that the job is finished and + * the batch state has been freed, we can return + * immediately. + */ + return NULL; + } /* Each backend should now select a batch to work on. */ + Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING); hashtable->curbatch = -1; node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -1091,14 +1121,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) int batchno; /* - * If we started up so late that the batch tracking array has been freed - * already by ExecHashTableDetach(), then we are finished. See also - * ExecParallelHashEnsureBatchAccessors(). - */ - if (hashtable->batches == NULL) - return false; - - /* * If we were already attached to a batch, remember not to bother checking * it again, and detach from it (possibly freeing the hash table if we are * last to detach). |