diff options
Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r-- | src/backend/executor/nodeGather.c | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 313b2344540..93a566ba629 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -214,8 +214,11 @@ ExecGather(GatherState *node) /* * Reset per-tuple memory context to free any expression evaluation * storage allocated in the previous tuple cycle. Note we can't do this - * until we're done projecting. + * until we're done projecting. This will also clear any previous tuple + * returned by a TupleQueueReader; to make sure we don't leave a dangling + * pointer around, clear the working slot first. */ + ExecClearTuple(node->funnel_slot); econtext = node->ps.ps_ExprContext; ResetExprContext(econtext); @@ -274,13 +277,19 @@ gather_getnext(GatherState *gatherstate) PlanState *outerPlan = outerPlanState(gatherstate); TupleTableSlot *outerTupleSlot; TupleTableSlot *fslot = gatherstate->funnel_slot; + MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory; HeapTuple tup; while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) { if (gatherstate->reader != NULL) { + MemoryContext oldContext; + + /* Run TupleQueueReaders in per-tuple context */ + oldContext = MemoryContextSwitchTo(tupleContext); tup = gather_readnext(gatherstate); + MemoryContextSwitchTo(oldContext); if (HeapTupleIsValid(tup)) { @@ -288,8 +297,7 @@ gather_getnext(GatherState *gatherstate) fslot, /* slot in which to store the tuple */ InvalidBuffer, /* buffer associated with this * tuple */ - true); /* pfree this pointer if not from heap */ - + false); /* slot should not pfree tuple */ return fslot; } } @@ -314,7 +322,7 @@ gather_getnext(GatherState *gatherstate) static HeapTuple gather_readnext(GatherState *gatherstate) { - int waitpos = gatherstate->nextreader; + int nvisited = 0; for (;;) { @@ -335,6 +343,7 @@ gather_readnext(GatherState *gatherstate) */ if (readerdone) { + Assert(!tup); DestroyTupleQueueReader(reader); --gatherstate->nreaders; if (gatherstate->nreaders == 0) @@ -342,17 +351,12 @@ gather_readnext(GatherState *gatherstate) ExecShutdownGatherWorkers(gatherstate); return NULL; } - else - { - memmove(&gatherstate->reader[gatherstate->nextreader], - &gatherstate->reader[gatherstate->nextreader + 1], - sizeof(TupleQueueReader *) - * (gatherstate->nreaders - gatherstate->nextreader)); - if (gatherstate->nextreader >= gatherstate->nreaders) - gatherstate->nextreader = 0; - if (gatherstate->nextreader < waitpos) - --waitpos; - } + memmove(&gatherstate->reader[gatherstate->nextreader], + &gatherstate->reader[gatherstate->nextreader + 1], + sizeof(TupleQueueReader *) + * (gatherstate->nreaders - gatherstate->nextreader)); + if (gatherstate->nextreader >= gatherstate->nreaders) + gatherstate->nextreader = 0; continue; } @@ -367,11 +371,13 @@ gather_readnext(GatherState *gatherstate) * every tuple, but it turns out to be much more efficient to keep * reading from the same queue until that would require blocking. */ - gatherstate->nextreader = - (gatherstate->nextreader + 1) % gatherstate->nreaders; + gatherstate->nextreader++; + if (gatherstate->nextreader >= gatherstate->nreaders) + gatherstate->nextreader = 0; - /* Have we visited every TupleQueueReader? */ - if (gatherstate->nextreader == waitpos) + /* Have we visited every (surviving) TupleQueueReader? */ + nvisited++; + if (nvisited >= gatherstate->nreaders) { /* * If (still) running plan locally, return NULL so caller can @@ -384,6 +390,7 @@ gather_readnext(GatherState *gatherstate) WaitLatch(MyLatch, WL_LATCH_SET, 0); CHECK_FOR_INTERRUPTS(); ResetLatch(MyLatch); + nvisited = 0; } } } |