aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/commands/explain.c3
-rw-r--r--src/backend/executor/Makefile1
-rw-r--r--src/backend/executor/README40
-rw-r--r--src/backend/executor/execAmi.c4
-rw-r--r--src/backend/executor/execAsync.c124
-rw-r--r--src/backend/executor/nodeAppend.c461
-rw-r--r--src/backend/executor/nodeForeignscan.c48
-rw-r--r--src/backend/nodes/copyfuncs.c2
-rw-r--r--src/backend/nodes/outfuncs.c2
-rw-r--r--src/backend/nodes/readfuncs.c2
-rw-r--r--src/backend/optimizer/path/costsize.c1
-rw-r--r--src/backend/optimizer/plan/createplan.c41
-rw-r--r--src/backend/postmaster/pgstat.c3
-rw-r--r--src/backend/storage/ipc/latch.c9
-rw-r--r--src/backend/utils/misc/guc.c10
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample1
16 files changed, 739 insertions, 13 deletions
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index afc45429ba4..fe75cabdcc0 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1394,6 +1394,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
}
if (plan->parallel_aware)
appendStringInfoString(es->str, "Parallel ");
+ if (plan->async_capable)
+ appendStringInfoString(es->str, "Async ");
appendStringInfoString(es->str, pname);
es->indent++;
}
@@ -1413,6 +1415,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
if (custom_name)
ExplainPropertyText("Custom Plan Provider", custom_name, es);
ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es);
+ ExplainPropertyBool("Async Capable", plan->async_capable, es);
}
switch (nodeTag(plan))
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 74ac59faa13..680fd69151b 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
+ execAsync.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \
diff --git a/src/backend/executor/README b/src/backend/executor/README
index 18b2ac18659..3726048c4a7 100644
--- a/src/backend/executor/README
+++ b/src/backend/executor/README
@@ -359,3 +359,43 @@ query returning the same set of scan tuples multiple times. Likewise,
SRFs are disallowed in an UPDATE's targetlist. There, they would have the
effect of the same row being updated multiple times, which is not very
useful --- and updates after the first would have no effect anyway.
+
+
+Asynchronous Execution
+----------------------
+
+In cases where a node is waiting on an event external to the database system,
+such as a ForeignScan awaiting network I/O, it's desirable for the node to
+indicate that it cannot return any tuple immediately but may be able to do so
+at a later time. A process which discovers this type of situation can always
+handle it simply by blocking, but this may waste time that could be spent
+executing some other part of the plan tree where progress could be made
+immediately. This is particularly likely to occur when the plan tree contains
+an Append node. Asynchronous execution runs multiple parts of an Append node
+concurrently rather than serially to improve performance.
+
+For asynchronous execution, an Append node must first request a tuple from an
+async-capable child node using ExecAsyncRequest. Next, it must execute the
+asynchronous event loop using ExecAppendAsyncEventWait. Eventually, when a
+child node to which an asynchronous request has been made produces a tuple,
+the Append node will receive it from the event loop via ExecAsyncResponse. In
+the current implementation of asynchronous execution, the only node type that
+requests tuples from an async-capable child node is an Append, while the only
+node type that might be async-capable is a ForeignScan.
+
+Typically, the ExecAsyncResponse callback is the only one required for nodes
+that wish to request tuples asynchronously. On the other hand, async-capable
+nodes generally need to implement three methods:
+
+1. When an asynchronous request is made, the node's ExecAsyncRequest callback
+ will be invoked; it should use ExecAsyncRequestPending to indicate that the
+ request is pending for a callback described below. Alternatively, it can
+ instead use ExecAsyncRequestDone if a result is available immediately.
+
+2. When the event loop wishes to wait or poll for file descriptor events, the
+ node's ExecAsyncConfigureWait callback will be invoked to configure the
+ file descriptor event for which the node wishes to wait.
+
+3. When the file descriptor becomes ready, the node's ExecAsyncNotify callback
+ will be invoked; like #1, it should use ExecAsyncRequestPending for another
+ callback or ExecAsyncRequestDone to return a result immediately.
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 4543ac79edf..58a8aa5ab75 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -531,6 +531,10 @@ ExecSupportsBackwardScan(Plan *node)
{
ListCell *l;
+ /* With async, tuples may be interleaved, so can't back up. */
+ if (((Append *) node)->nasyncplans > 0)
+ return false;
+
foreach(l, ((Append *) node)->appendplans)
{
if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 00000000000..f1985e658c4
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+
+/*
+ * Asynchronously request a tuple from a designed async-capable node.
+ */
+void
+ExecAsyncRequest(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanRequest(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+}
+
+/*
+ * Give the asynchronous node a chance to configure the file descriptor event
+ * for which it wishes to wait. We expect the node-type specific callback to
+ * make a single call of the following form:
+ *
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ */
+void
+ExecAsyncConfigureWait(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanConfigureWait(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred.
+ */
+void
+ExecAsyncNotify(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanNotify(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+}
+
+/*
+ * Call the requestor back when an asynchronous node has produced a result.
+ */
+void
+ExecAsyncResponse(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestor))
+ {
+ case T_AppendState:
+ ExecAsyncAppendResponse(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestor));
+ }
+}
+
+/*
+ * A requestee node should call this function to deliver the tuple to its
+ * requestor node. The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
+{
+ areq->request_complete = true;
+ areq->result = result;
+}
+
+/*
+ * A requestee node should call this function to indicate that it is pending
+ * for a callback. The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestPending(AsyncRequest *areq)
+{
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ areq->result = NULL;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 15e4115bd6d..7da8ffe0652 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -57,10 +57,13 @@
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
@@ -78,12 +81,18 @@ struct ParallelAppendState
};
#define INVALID_SUBPLAN_INDEX -1
+#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
+static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
* ExecInitAppend
@@ -102,7 +111,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
Bitmapset *validsubplans;
+ Bitmapset *asyncplans;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
@@ -119,6 +130,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_syncdone = false;
+ appendstate->as_begun = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
@@ -191,6 +204,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* While at it, find out the first valid partial plan.
*/
j = 0;
+ asyncplans = NULL;
+ nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
@@ -198,6 +213,17 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
/*
+ * Record async subplans. When executing EvalPlanQual, we treat them
+ * as sync ones; don't do this when initializing an EvalPlanQual plan
+ * tree.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ nasyncplans++;
+ }
+
+ /*
* Record the lowest appendplans index which is a valid partial plan.
*/
if (i >= node->first_partial_plan && j < firstvalid)
@@ -210,6 +236,37 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ /* Initialize async state */
+ appendstate->as_asyncplans = asyncplans;
+ appendstate->as_nasyncplans = nasyncplans;
+ appendstate->as_asyncrequests = NULL;
+ appendstate->as_asyncresults = (TupleTableSlot **)
+ palloc0(nasyncplans * sizeof(TupleTableSlot *));
+ appendstate->as_needrequest = NULL;
+ appendstate->as_eventset = NULL;
+
+ if (nasyncplans > 0)
+ {
+ appendstate->as_asyncrequests = (AsyncRequest **)
+ palloc0(nplans * sizeof(AsyncRequest *));
+
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq;
+
+ areq = palloc(sizeof(AsyncRequest));
+ areq->requestor = (PlanState *) appendstate;
+ areq->requestee = appendplanstates[i];
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ appendstate->as_asyncrequests[i] = areq;
+ }
+ }
+
/*
* Miscellaneous initialization
*/
@@ -232,31 +289,59 @@ static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ TupleTableSlot *result;
- if (node->as_whichplan < 0)
+ /*
+ * If this is the first call after Init or ReScan, we need to do the
+ * initialization work.
+ */
+ if (!node->as_begun)
{
+ Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
+ Assert(!node->as_syncdone);
+
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ /* If there are any async subplans, begin executing them. */
+ if (node->as_nasyncplans > 0)
+ ExecAppendAsyncBegin(node);
+
/*
- * If no subplan has been chosen, we must choose one before
+ * If no sync subplan has been chosen, we must choose one before
* proceeding.
*/
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
- !node->choose_next_subplan(node))
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+ Assert(node->as_syncdone ||
+ (node->as_whichplan >= 0 &&
+ node->as_whichplan < node->as_nplans));
+
+ /* And we're initialized. */
+ node->as_begun = true;
}
for (;;)
{
PlanState *subnode;
- TupleTableSlot *result;
CHECK_FOR_INTERRUPTS();
/*
- * figure out which subplan we are currently processing
+ * try to get a tuple from an async subplan if any
+ */
+ if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
+ {
+ if (ExecAppendAsyncGetNext(node, &result))
+ return result;
+ Assert(!node->as_syncdone);
+ Assert(bms_is_empty(node->as_needrequest));
+ }
+
+ /*
+ * figure out which sync subplan we are currently processing
*/
Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
@@ -276,8 +361,16 @@ ExecAppend(PlanState *pstate)
return result;
}
- /* choose new subplan; if none, we're done */
- if (!node->choose_next_subplan(node))
+ /*
+ * wait or poll async events if any. We do this before checking for
+ * the end of iteration, because it might drain the remaining async
+ * subplans.
+ */
+ if (node->as_nasyncremain > 0)
+ ExecAppendAsyncEventWait(node);
+
+ /* choose new sync subplan; if no sync/async subplans, we're done */
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
}
@@ -313,6 +406,7 @@ ExecEndAppend(AppendState *node)
void
ExecReScanAppend(AppendState *node)
{
+ int nasyncplans = node->as_nasyncplans;
int i;
/*
@@ -326,6 +420,11 @@ ExecReScanAppend(AppendState *node)
{
bms_free(node->as_valid_subplans);
node->as_valid_subplans = NULL;
+ if (nasyncplans > 0)
+ {
+ bms_free(node->as_valid_asyncplans);
+ node->as_valid_asyncplans = NULL;
+ }
}
for (i = 0; i < node->as_nplans; i++)
@@ -347,8 +446,27 @@ ExecReScanAppend(AppendState *node)
ExecReScan(subnode);
}
+ /* Reset async state */
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+
+ bms_free(node->as_needrequest);
+ node->as_needrequest = NULL;
+ }
+
/* Let choose_next_subplan_* function handle setting the first subplan */
node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_syncdone = false;
+ node->as_begun = false;
}
/* ----------------------------------------------------------------
@@ -429,7 +547,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
/* ----------------------------------------------------------------
* choose_next_subplan_locally
*
- * Choose next subplan for a non-parallel-aware Append,
+ * Choose next sync subplan for a non-parallel-aware Append,
* returning false if there are no more.
* ----------------------------------------------------------------
*/
@@ -442,16 +560,25 @@ choose_next_subplan_locally(AppendState *node)
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
+ /* Nothing to do if syncdone */
+ if (node->as_syncdone)
+ return false;
+
/*
* If first call then have the bms member function choose the first valid
- * subplan by initializing whichplan to -1. If there happen to be no
- * valid subplans then the bms member function will handle that by
- * returning a negative number which will allow us to exit returning a
+ * sync subplan by initializing whichplan to -1. If there happen to be
+ * no valid sync subplans then the bms member function will handle that
+ * by returning a negative number which will allow us to exit returning a
* false value.
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
{
- if (node->as_valid_subplans == NULL)
+ if (node->as_nasyncplans > 0)
+ {
+ /* We'd have filled as_valid_subplans already */
+ Assert(node->as_valid_subplans);
+ }
+ else if (node->as_valid_subplans == NULL)
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state);
@@ -467,7 +594,12 @@ choose_next_subplan_locally(AppendState *node)
nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
if (nextplan < 0)
+ {
+ /* Set as_syncdone if in async mode */
+ if (node->as_nasyncplans > 0)
+ node->as_syncdone = true;
return false;
+ }
node->as_whichplan = nextplan;
@@ -709,3 +841,306 @@ mark_invalid_subplans_as_finished(AppendState *node)
node->as_pstate->pa_finished[i] = true;
}
}
+
+/* ----------------------------------------------------------------
+ * Asynchronous Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncBegin
+ *
+ * Begin executing designed async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+ int i;
+
+ /* Backward scan is not supported by async-aware Appends. */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ /* We should never be called when there are no async subplans. */
+ Assert(node->as_nasyncplans > 0);
+
+ /* If we've yet to determine the valid subplans then do so now. */
+ if (node->as_valid_subplans == NULL)
+ node->as_valid_subplans =
+ ExecFindMatchingSubPlans(node->as_prune_state);
+
+ classify_matching_subplans(node);
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (node->as_nasyncremain == 0)
+ return;
+
+ /* Make a request for each of the valid async subplans. */
+ i = -1;
+ while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncGetNext
+ *
+ * Get the next tuple from any of the asynchronous subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+ *result = NULL;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(node->as_nasyncremain > 0);
+
+ /* Request a tuple asynchronously. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ while (node->as_nasyncremain > 0)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Wait or poll async events. */
+ ExecAppendAsyncEventWait(node);
+
+ /* Request a tuple asynchronously. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ /* Break from loop if there's any sync subplan that isn't complete. */
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If all sync subplans are complete, we're totally done scanning the
+ * given node. Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the sync subplans.
+ */
+ if (node->as_syncdone)
+ {
+ Assert(node->as_nasyncremain == 0);
+ *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncRequest
+ *
+ * Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+ Bitmapset *needrequest;
+ int i;
+
+ /* Nothing to do if there are no async subplans needing a new request. */
+ if (bms_is_empty(node->as_needrequest))
+ return false;
+
+ /*
+ * If there are any asynchronously-generated results that have not yet
+ * been returned, we have nothing to do; just return one of them.
+ */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ /* Make a new request for each of the async subplans that need it. */
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
+ i = -1;
+ while ((i = bms_next_member(needrequest, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+ bms_free(needrequest);
+
+ /* Return one of the asynchronously-generated results if any. */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncEventWait
+ *
+ * Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
+ long timeout = node->as_syncdone ? -1 : 0;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred;
+ int i;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(node->as_nasyncremain > 0);
+
+ node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
+ node->as_nasyncplans + 1);
+ AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /* Wait for at least one event to occur. */
+ noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+ EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
+ FreeWaitEventSet(node->as_eventset);
+ node->as_eventset = NULL;
+ if (noccurred == 0)
+ return;
+
+ /* Deliver notifications. */
+ for (i = 0; i < noccurred; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ /*
+ * Each waiting subplan should have registered its wait event with
+ * user_data pointing back to its AsyncRequest.
+ */
+ if ((w->events & WL_SOCKET_READABLE) != 0)
+ {
+ AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+ /*
+ * Mark it as no longer needing a callback. We must do this
+ * before dispatching the callback in case the callback resets
+ * the flag.
+ */
+ Assert(areq->callback_pending);
+ areq->callback_pending = false;
+
+ /* Do the actual work. */
+ ExecAsyncNotify(areq);
+ }
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncAppendResponse
+ *
+ * Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+ AppendState *node = (AppendState *) areq->requestor;
+ TupleTableSlot *slot = areq->result;
+
+ /* The result should be a TupleTableSlot or NULL. */
+ Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+ /* Nothing to do if the request is pending. */
+ if (!areq->request_complete)
+ {
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
+ return;
+ }
+
+ /* If the result is NULL or an empty slot, there's nothing more to do. */
+ if (TupIsNull(slot))
+ {
+ /* The ending subplan wouldn't have been pending for a callback. */
+ Assert(!areq->callback_pending);
+ --node->as_nasyncremain;
+ return;
+ }
+
+ /* Save result so we can return it. */
+ Assert(node->as_nasyncresults < node->as_nasyncplans);
+ node->as_asyncresults[node->as_nasyncresults++] = slot;
+
+ /*
+ * Mark the subplan that returned a result as ready for a new request. We
+ * don't launch another one here immediately because it might complete.
+ */
+ node->as_needrequest = bms_add_member(node->as_needrequest,
+ areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ * classify_matching_subplans
+ *
+ * Classify the node's as_valid_subplans into sync ones and
+ * async ones, adjust it to contain sync ones only, and save
+ * async ones in the node's as_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(AppendState *node)
+{
+ Bitmapset *valid_asyncplans;
+
+ Assert(node->as_valid_asyncplans == NULL);
+
+ /* Nothing to do if there are no valid subplans. */
+ if (bms_is_empty(node->as_valid_subplans))
+ {
+ node->as_syncdone = true;
+ node->as_nasyncremain = 0;
+ return;
+ }
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+ {
+ node->as_nasyncremain = 0;
+ return;
+ }
+
+ /* Get valid async subplans. */
+ valid_asyncplans = bms_copy(node->as_asyncplans);
+ valid_asyncplans = bms_int_members(valid_asyncplans,
+ node->as_valid_subplans);
+
+ /* Adjust the valid subplans to contain sync subplans only. */
+ node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+ valid_asyncplans);
+ node->as_syncdone = bms_is_empty(node->as_valid_subplans);
+
+ /* Save valid async subplans. */
+ node->as_valid_asyncplans = valid_asyncplans;
+ node->as_nasyncremain = bms_num_members(valid_asyncplans);
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 0969e53c3a4..898890fb08f 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -391,3 +391,51 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanRequest
+ *
+ * Asynchronously request a tuple from a designed async-capable node
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanRequest(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncRequest != NULL);
+ fdwroutine->ForeignAsyncRequest(areq);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanConfigureWait
+ *
+ * In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+ fdwroutine->ForeignAsyncConfigureWait(areq);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanNotify
+ *
+ * Callback invoked when a relevant event has occurred
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncNotify != NULL);
+ fdwroutine->ForeignAsyncNotify(areq);
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 1d0bb6e2e74..d58b79d525c 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -120,6 +120,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
COPY_SCALAR_FIELD(plan_width);
COPY_SCALAR_FIELD(parallel_aware);
COPY_SCALAR_FIELD(parallel_safe);
+ COPY_SCALAR_FIELD(async_capable);
COPY_SCALAR_FIELD(plan_node_id);
COPY_NODE_FIELD(targetlist);
COPY_NODE_FIELD(qual);
@@ -241,6 +242,7 @@ _copyAppend(const Append *from)
*/
COPY_BITMAPSET_FIELD(apprelids);
COPY_NODE_FIELD(appendplans);
+ COPY_SCALAR_FIELD(nasyncplans);
COPY_SCALAR_FIELD(first_partial_plan);
COPY_NODE_FIELD(part_prune_info);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 301fa304902..ff127a19adf 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -333,6 +333,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
WRITE_INT_FIELD(plan_width);
WRITE_BOOL_FIELD(parallel_aware);
WRITE_BOOL_FIELD(parallel_safe);
+ WRITE_BOOL_FIELD(async_capable);
WRITE_INT_FIELD(plan_node_id);
WRITE_NODE_FIELD(targetlist);
WRITE_NODE_FIELD(qual);
@@ -431,6 +432,7 @@ _outAppend(StringInfo str, const Append *node)
WRITE_BITMAPSET_FIELD(apprelids);
WRITE_NODE_FIELD(appendplans);
+ WRITE_INT_FIELD(nasyncplans);
WRITE_INT_FIELD(first_partial_plan);
WRITE_NODE_FIELD(part_prune_info);
}
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 377185f7c67..6a563e99033 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1615,6 +1615,7 @@ ReadCommonPlan(Plan *local_node)
READ_INT_FIELD(plan_width);
READ_BOOL_FIELD(parallel_aware);
READ_BOOL_FIELD(parallel_safe);
+ READ_BOOL_FIELD(async_capable);
READ_INT_FIELD(plan_node_id);
READ_NODE_FIELD(targetlist);
READ_NODE_FIELD(qual);
@@ -1711,6 +1712,7 @@ _readAppend(void)
READ_BITMAPSET_FIELD(apprelids);
READ_NODE_FIELD(appendplans);
+ READ_INT_FIELD(nasyncplans);
READ_INT_FIELD(first_partial_plan);
READ_NODE_FIELD(part_prune_info);
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index b92c9485882..0c016a03dd9 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -147,6 +147,7 @@ bool enable_partitionwise_aggregate = false;
bool enable_parallel_append = true;
bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
+bool enable_async_append = true;
typedef struct
{
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 906cab70532..78ef068fb7b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
List *gating_quals);
static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
+static bool is_async_capable_path(Path *path);
static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
int flags);
static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
@@ -1081,6 +1082,31 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
}
/*
+ * is_async_capable_path
+ * Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+ switch (nodeTag(path))
+ {
+ case T_ForeignPath:
+ {
+ FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+ Assert(fdwroutine != NULL);
+ if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+ fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+ return true;
+ }
+ break;
+ default:
+ break;
+ }
+ return false;
+}
+
+/*
* create_append_plan
* Create an Append plan for 'best_path' and (recursively) plans
* for its subpaths.
@@ -1097,6 +1123,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
List *pathkeys = best_path->path.pathkeys;
List *subplans = NIL;
ListCell *subpaths;
+ int nasyncplans = 0;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
int nodenumsortkeys = 0;
@@ -1104,6 +1131,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
Oid *nodeSortOperators = NULL;
Oid *nodeCollations = NULL;
bool *nodeNullsFirst = NULL;
+ bool consider_async = false;
/*
* The subpaths list could be empty, if every child was proven empty by
@@ -1167,6 +1195,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
}
+ /* If appropriate, consider async append */
+ consider_async = (enable_async_append && pathkeys == NIL &&
+ !best_path->path.parallel_safe &&
+ list_length(best_path->subpaths) > 1);
+
/* Build the plan for each child */
foreach(subpaths, best_path->subpaths)
{
@@ -1234,6 +1267,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
subplans = lappend(subplans, subplan);
+
+ /* Check to see if subplan can be executed asynchronously */
+ if (consider_async && is_async_capable_path(subpath))
+ {
+ subplan->async_capable = true;
+ ++nasyncplans;
+ }
}
/*
@@ -1266,6 +1306,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
plan->appendplans = subplans;
+ plan->nasyncplans = nasyncplans;
plan->first_partial_plan = best_path->first_partial_path;
plan->part_prune_info = partpruneinfo;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 60f45ccc4ea..4b9bcd2b41a 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3995,6 +3995,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
switch (w)
{
+ case WAIT_EVENT_APPEND_READY:
+ event_name = "AppendReady";
+ break;
case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
event_name = "BackupWaitWalArchive";
break;
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 43a5fded103..5f3318fa8f1 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -2020,6 +2020,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
}
#endif
+/*
+ * Get the number of wait events registered in a given WaitEventSet.
+ */
+int
+GetNumRegisteredWaitEvents(WaitEventSet *set)
+{
+ return set->nevents;
+}
+
#if defined(WAIT_USE_POLL)
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0c5dc4d3e84..03daec9a085 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1129,6 +1129,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
{
+ {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of async append plans."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &enable_async_append,
+ true,
+ NULL, NULL, NULL
+ },
+ {
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),
gettext_noop("This algorithm attempts to do planning without "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b234a6bfe64..791d39cf078 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -371,6 +371,7 @@
#enable_partitionwise_aggregate = off
#enable_parallel_hash = on
#enable_partition_pruning = on
+#enable_async_append = on
# - Planner Cost Constants -