aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeGather.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r--src/backend/executor/nodeGather.c45
1 files changed, 26 insertions, 19 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 313b2344540..93a566ba629 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -214,8 +214,11 @@ ExecGather(GatherState *node)
/*
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle. Note we can't do this
- * until we're done projecting.
+ * until we're done projecting. This will also clear any previous tuple
+ * returned by a TupleQueueReader; to make sure we don't leave a dangling
+ * pointer around, clear the working slot first.
*/
+ ExecClearTuple(node->funnel_slot);
econtext = node->ps.ps_ExprContext;
ResetExprContext(econtext);
@@ -274,13 +277,19 @@ gather_getnext(GatherState *gatherstate)
PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot;
TupleTableSlot *fslot = gatherstate->funnel_slot;
+ MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
HeapTuple tup;
while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
{
if (gatherstate->reader != NULL)
{
+ MemoryContext oldContext;
+
+ /* Run TupleQueueReaders in per-tuple context */
+ oldContext = MemoryContextSwitchTo(tupleContext);
tup = gather_readnext(gatherstate);
+ MemoryContextSwitchTo(oldContext);
if (HeapTupleIsValid(tup))
{
@@ -288,8 +297,7 @@ gather_getnext(GatherState *gatherstate)
fslot, /* slot in which to store the tuple */
InvalidBuffer, /* buffer associated with this
* tuple */
- true); /* pfree this pointer if not from heap */
-
+ false); /* slot should not pfree tuple */
return fslot;
}
}
@@ -314,7 +322,7 @@ gather_getnext(GatherState *gatherstate)
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
- int waitpos = gatherstate->nextreader;
+ int nvisited = 0;
for (;;)
{
@@ -335,6 +343,7 @@ gather_readnext(GatherState *gatherstate)
*/
if (readerdone)
{
+ Assert(!tup);
DestroyTupleQueueReader(reader);
--gatherstate->nreaders;
if (gatherstate->nreaders == 0)
@@ -342,17 +351,12 @@ gather_readnext(GatherState *gatherstate)
ExecShutdownGatherWorkers(gatherstate);
return NULL;
}
- else
- {
- memmove(&gatherstate->reader[gatherstate->nextreader],
- &gatherstate->reader[gatherstate->nextreader + 1],
- sizeof(TupleQueueReader *)
- * (gatherstate->nreaders - gatherstate->nextreader));
- if (gatherstate->nextreader >= gatherstate->nreaders)
- gatherstate->nextreader = 0;
- if (gatherstate->nextreader < waitpos)
- --waitpos;
- }
+ memmove(&gatherstate->reader[gatherstate->nextreader],
+ &gatherstate->reader[gatherstate->nextreader + 1],
+ sizeof(TupleQueueReader *)
+ * (gatherstate->nreaders - gatherstate->nextreader));
+ if (gatherstate->nextreader >= gatherstate->nreaders)
+ gatherstate->nextreader = 0;
continue;
}
@@ -367,11 +371,13 @@ gather_readnext(GatherState *gatherstate)
* every tuple, but it turns out to be much more efficient to keep
* reading from the same queue until that would require blocking.
*/
- gatherstate->nextreader =
- (gatherstate->nextreader + 1) % gatherstate->nreaders;
+ gatherstate->nextreader++;
+ if (gatherstate->nextreader >= gatherstate->nreaders)
+ gatherstate->nextreader = 0;
- /* Have we visited every TupleQueueReader? */
- if (gatherstate->nextreader == waitpos)
+ /* Have we visited every (surviving) TupleQueueReader? */
+ nvisited++;
+ if (nvisited >= gatherstate->nreaders)
{
/*
* If (still) running plan locally, return NULL so caller can
@@ -384,6 +390,7 @@ gather_readnext(GatherState *gatherstate)
WaitLatch(MyLatch, WL_LATCH_SET, 0);
CHECK_FOR_INTERRUPTS();
ResetLatch(MyLatch);
+ nvisited = 0;
}
}
}