aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/executor/nodeGather.c87
-rw-r--r--src/backend/optimizer/plan/setrefs.c5
-rw-r--r--src/include/nodes/execnodes.h1
3 files changed, 75 insertions, 18 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index ef810a5834d..9c1533e3113 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,6 +36,7 @@
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
+#include "utils/memutils.h"
#include "utils/rel.h"
@@ -50,6 +51,9 @@ GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
GatherState *gatherstate;
+ Plan *outerNode;
+ bool hasoid;
+ TupleDesc tupDesc;
/* Gather node doesn't have innerPlan node. */
Assert(innerPlan(node) == NULL);
@@ -82,13 +86,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
/*
* tuple table initialization
*/
+ gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
ExecInitResultTupleSlot(estate, &gatherstate->ps);
/*
* now initialize outer plan
*/
- outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags);
-
+ outerNode = outerPlan(node);
+ outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
gatherstate->ps.ps_TupFromTlist = false;
@@ -98,6 +103,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
ExecAssignResultTypeFromTL(&gatherstate->ps);
ExecAssignProjectionInfo(&gatherstate->ps, NULL);
+ /*
+ * Initialize funnel slot to same tuple descriptor as outer plan.
+ */
+ if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
+ hasoid = false;
+ tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
+ ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
+
return gatherstate;
}
@@ -113,6 +126,9 @@ ExecGather(GatherState *node)
{
int i;
TupleTableSlot *slot;
+ TupleTableSlot *resultSlot;
+ ExprDoneCond isDone;
+ ExprContext *econtext;
/*
* Initialize the parallel context and workers on first execution. We do
@@ -169,7 +185,53 @@ ExecGather(GatherState *node)
node->initialized = true;
}
- slot = gather_getnext(node);
+ /*
+ * Check to see if we're still projecting out tuples from a previous scan
+ * tuple (because there is a function-returning-set in the projection
+ * expressions). If so, try to project another one.
+ */
+ if (node->ps.ps_TupFromTlist)
+ {
+ resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
+ if (isDone == ExprMultipleResult)
+ return resultSlot;
+ /* Done with that source tuple... */
+ node->ps.ps_TupFromTlist = false;
+ }
+
+ /*
+ * Reset per-tuple memory context to free any expression evaluation
+ * storage allocated in the previous tuple cycle. Note we can't do this
+ * until we're done projecting.
+ */
+ econtext = node->ps.ps_ExprContext;
+ ResetExprContext(econtext);
+
+ /* Get and return the next tuple, projecting if necessary. */
+ for (;;)
+ {
+ /*
+ * Get next tuple, either from one of our workers, or by running the
+ * plan ourselves.
+ */
+ slot = gather_getnext(node);
+ if (TupIsNull(slot))
+ return NULL;
+
+ /*
+ * form the result tuple using ExecProject(), and return it --- unless
+ * the projection produces an empty set, in which case we must loop
+ * back around for another tuple
+ */
+ econtext->ecxt_outertuple = slot;
+ resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
+
+ if (isDone != ExprEndResult)
+ {
+ node->ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
+ return resultSlot;
+ }
+ }
return slot;
}
@@ -201,18 +263,11 @@ ExecEndGather(GatherState *node)
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
- PlanState *outerPlan;
+ PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot;
- TupleTableSlot *slot;
+ TupleTableSlot *fslot = gatherstate->funnel_slot;
HeapTuple tup;
- /*
- * We can use projection info of Gather for the tuples received from
- * worker backends as currently for all cases worker backends sends the
- * projected tuple as required by Gather node.
- */
- slot = gatherstate->ps.ps_ProjInfo->pi_slot;
-
while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
{
if (gatherstate->funnel != NULL)
@@ -229,19 +284,17 @@ gather_getnext(GatherState *gatherstate)
if (HeapTupleIsValid(tup))
{
ExecStoreTuple(tup, /* tuple to store */
- slot, /* slot to store in */
+ fslot, /* slot in which to store the tuple */
InvalidBuffer, /* buffer associated with this
* tuple */
true); /* pfree this pointer if not from heap */
- return slot;
+ return fslot;
}
}
if (gatherstate->need_to_scan_locally)
{
- outerPlan = outerPlanState(gatherstate);
-
outerTupleSlot = ExecProcNode(outerPlan);
if (!TupIsNull(outerTupleSlot))
@@ -251,7 +304,7 @@ gather_getnext(GatherState *gatherstate)
}
}
- return ExecClearTuple(slot);
+ return ExecClearTuple(fslot);
}
/* ----------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 8c6c57101c0..48d6e6fd783 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -602,12 +602,15 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
set_join_references(root, (Join *) plan, rtoffset);
break;
+ case T_Gather:
+ set_upper_references(root, plan, rtoffset);
+ break;
+
case T_Hash:
case T_Material:
case T_Sort:
case T_Unique:
case T_SetOp:
- case T_Gather:
/*
* These plan types don't actually bother to evaluate their
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4fcdcc4067a..939bc0ed734 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1964,6 +1964,7 @@ typedef struct GatherState
bool initialized;
struct ParallelExecutorInfo *pei;
struct TupleQueueFunnel *funnel;
+ TupleTableSlot *funnel_slot;
bool need_to_scan_locally;
} GatherState;