diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/executor/execAsync.c | 30 | ||||
-rw-r--r-- | src/backend/executor/execMain.c | 2 | ||||
-rw-r--r-- | src/backend/executor/execProcnode.c | 3 | ||||
-rw-r--r-- | src/backend/executor/instrument.c | 21 | ||||
-rw-r--r-- | src/backend/executor/nodeAppend.c | 12 | ||||
-rw-r--r-- | src/backend/executor/nodeForeignscan.c | 7 | ||||
-rw-r--r-- | src/include/executor/instrument.h | 5 | ||||
-rw-r--r-- | src/include/nodes/execnodes.h | 5 |
8 files changed, 74 insertions, 11 deletions
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c index f1985e658c4..75108d36be2 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "executor/execAsync.h" +#include "executor/executor.h" #include "executor/nodeAppend.h" #include "executor/nodeForeignscan.h" @@ -24,6 +25,13 @@ void ExecAsyncRequest(AsyncRequest *areq) { + if (areq->requestee->chgParam != NULL) /* something changed? */ + ExecReScan(areq->requestee); /* let ReScan handle this */ + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -36,6 +44,11 @@ ExecAsyncRequest(AsyncRequest *areq) } ExecAsyncResponse(areq); + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, + TupIsNull(areq->result) ? 0.0 : 1.0); } /* @@ -48,6 +61,10 @@ ExecAsyncRequest(AsyncRequest *areq) void ExecAsyncConfigureWait(AsyncRequest *areq) { + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -58,6 +75,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq) elog(ERROR, "unrecognized node type: %d", (int) nodeTag(areq->requestee)); } + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, 0.0); } /* @@ -66,6 +87,10 @@ ExecAsyncConfigureWait(AsyncRequest *areq) void ExecAsyncNotify(AsyncRequest *areq) { + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStartNode(areq->requestee->instrument); + switch (nodeTag(areq->requestee)) { case T_ForeignScanState: @@ -78,6 +103,11 @@ ExecAsyncNotify(AsyncRequest *areq) } ExecAsyncResponse(areq); + + /* must provide our own instrumentation support */ + if (areq->requestee->instrument) + InstrStopNode(areq->requestee->instrument, + TupIsNull(areq->result) ? 0.0 : 1.0); } /* diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index df3d7f9a8bc..58b49687350 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1214,7 +1214,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_TrigWhenExprs = (ExprState **) palloc0(n * sizeof(ExprState *)); if (instrument_options) - resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options); + resultRelInfo->ri_TrigInstrument = InstrAlloc(n, instrument_options, false); } else { diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 9f8c7582e04..753f46863b7 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -407,7 +407,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags) /* Set up instrumentation for this node if requested */ if (estate->es_instrument) - result->instrument = InstrAlloc(1, estate->es_instrument); + result->instrument = InstrAlloc(1, estate->es_instrument, + result->async_capable); return result; } diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index 237e13361b5..2b106d8473c 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -28,7 +28,7 @@ static void WalUsageAdd(WalUsage *dst, WalUsage *add); /* Allocate new instrumentation structure(s) */ Instrumentation * -InstrAlloc(int n, int instrument_options) +InstrAlloc(int n, int instrument_options, bool async_mode) { Instrumentation *instr; @@ -46,6 +46,7 @@ InstrAlloc(int n, int instrument_options) instr[i].need_bufusage = need_buffers; instr[i].need_walusage = need_wal; instr[i].need_timer = need_timer; + instr[i].async_mode = async_mode; } } @@ -82,6 +83,7 @@ InstrStartNode(Instrumentation *instr) void InstrStopNode(Instrumentation *instr, double nTuples) { + double save_tuplecount = instr->tuplecount; instr_time endtime; /* count the returned tuples */ @@ -114,6 +116,23 @@ InstrStopNode(Instrumentation *instr, double nTuples) instr->running = true; instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter); } + else + { + /* + * In async mode, if the plan node hadn't emitted any tuples before, + * this might be the first tuple + */ + if (instr->async_mode && save_tuplecount < 1.0) + instr->firsttuple = INSTR_TIME_GET_DOUBLE(instr->counter); + } +} + +/* Update tuple count */ +void +InstrUpdateTupleCount(Instrumentation *instr, double nTuples) +{ + /* count the returned tuples */ + instr->tuplecount += nTuples; } /* Finish a run cycle for a plan node */ diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 3c1f12adafb..1558fafad1e 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -362,9 +362,9 @@ ExecAppend(PlanState *pstate) } /* - * wait or poll async events if any. We do this before checking for - * the end of iteration, because it might drain the remaining async - * subplans. + * wait or poll for async events if any. We do this before checking + * for the end of iteration, because it might drain the remaining + * async subplans. */ if (node->as_nasyncremain > 0) ExecAppendAsyncEventWait(node); @@ -440,7 +440,7 @@ ExecReScanAppend(AppendState *node) /* * If chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. + * first ExecProcNode or by first ExecAsyncRequest. */ if (subnode->chgParam == NULL) ExecReScan(subnode); @@ -911,7 +911,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) { CHECK_FOR_INTERRUPTS(); - /* Wait or poll async events. */ + /* Wait or poll for async events. */ ExecAppendAsyncEventWait(node); /* Request a tuple asynchronously. */ @@ -1084,7 +1084,7 @@ ExecAsyncAppendResponse(AsyncRequest *areq) /* Nothing to do if the request is pending. */ if (!areq->request_complete) { - /* The request would have been pending for a callback */ + /* The request would have been pending for a callback. */ Assert(areq->callback_pending); return; } diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 898890fb08f..9dc38d47ea7 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -210,6 +210,13 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); /* + * Determine whether to scan the foreign relation asynchronously or not; + * this has to be kept in sync with the code in ExecInitAppend(). + */ + scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable && + estate->es_epq_active == NULL); + + /* * Initialize FDW-related state. */ scanstate->fdwroutine = fdwroutine; diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index c25aa1b04c2..fc87eed4fb2 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -55,6 +55,7 @@ typedef struct Instrumentation bool need_timer; /* true if we need timer data */ bool need_bufusage; /* true if we need buffer usage data */ bool need_walusage; /* true if we need WAL usage data */ + bool async_mode; /* true if node is in async mode */ /* Info about current plan cycle: */ bool running; /* true if we've completed first tuple */ instr_time starttime; /* start time of current iteration of node */ @@ -84,10 +85,12 @@ typedef struct WorkerInstrumentation extern PGDLLIMPORT BufferUsage pgBufferUsage; extern PGDLLIMPORT WalUsage pgWalUsage; -extern Instrumentation *InstrAlloc(int n, int instrument_options); +extern Instrumentation *InstrAlloc(int n, int instrument_options, + bool async_mode); extern void InstrInit(Instrumentation *instr, int instrument_options); extern void InstrStartNode(Instrumentation *instr); extern void InstrStopNode(Instrumentation *instr, double nTuples); +extern void InstrUpdateTupleCount(Instrumentation *instr, double nTuples); extern void InstrEndLoop(Instrumentation *instr); extern void InstrAggNode(Instrumentation *dst, Instrumentation *add); extern void InstrStartParallelQuery(void); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index e7ae21c023c..91a1c3a780e 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -538,7 +538,8 @@ typedef struct AsyncRequest int request_index; /* Scratch space for requestor */ bool callback_pending; /* Callback is needed */ bool request_complete; /* Request complete, result valid */ - TupleTableSlot *result; /* Result (NULL if no more tuples) */ + TupleTableSlot *result; /* Result (NULL or an empty slot if no more + * tuples) */ } AsyncRequest; /* ---------------- @@ -1003,6 +1004,8 @@ typedef struct PlanState ExprContext *ps_ExprContext; /* node's expression-evaluation context */ ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */ + bool async_capable; /* true if node is async-capable */ + /* * Scanslot's descriptor if known. This is a bit of a hack, but otherwise * it's hard for expression compilation to optimize based on the |