aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c65
1 files changed, 52 insertions, 13 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 51fac77f3d6..8dfee2e02c6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -503,6 +503,7 @@ static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
static void fetch_more_data_begin(AsyncRequest *areq);
+static void complete_pending_request(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
@@ -6826,6 +6827,22 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
/* This should not be called unless callback_pending */
Assert(areq->callback_pending);
+ /*
+ * If process_pending_request() has been invoked on the given request
+ * before we get here, we might have some tuples already; in which case
+ * complete the request
+ */
+ if (fsstate->next_tuple < fsstate->num_tuples)
+ {
+ complete_pending_request(areq);
+ if (areq->request_complete)
+ return;
+ Assert(areq->callback_pending);
+ }
+
+ /* We must have run out of tuples */
+ Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
/* The core code would have registered postmaster death event */
Assert(GetNumRegisteredWaitEvents(set) >= 1);
@@ -6838,12 +6855,15 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
* This is the case when the in-process request was made by another
* Append. Note that it might be useless to process the request,
* because the query might not need tuples from that Append anymore.
- * Skip the given request if there are any configured events other
- * than the postmaster death event; otherwise process the request,
- * then begin a fetch to configure the event below, because otherwise
- * we might end up with no configured events other than the postmaster
- * death event.
+ * If there are any child subplans of the same parent that are ready
+ * for new requests, skip the given request. Likewise, if there are
+ * any configured events other than the postmaster death event, skip
+ * it. Otherwise, process the in-process request, then begin a fetch
+ * to configure the event below, because we might otherwise end up
+ * with no configured events other than the postmaster death event.
*/
+ if (!bms_is_empty(requestor->as_needrequest))
+ return;
if (GetNumRegisteredWaitEvents(set) > 1)
return;
process_pending_request(pendingAreq);
@@ -6995,23 +7015,44 @@ process_pending_request(AsyncRequest *areq)
{
ForeignScanState *node = (ForeignScanState *) areq->requestee;
PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state;
- EState *estate = node->ss.ps.state;
- MemoryContext oldcontext;
+
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
/* The request should be currently in-process */
Assert(fsstate->conn_state->pendingAreq == areq);
- oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
+ fetch_more_data(node);
+ /*
+ * If we didn't get any tuples, must be end of data; complete the request
+ * now. Otherwise, we postpone completing the request until we are called
+ * from postgresForeignAsyncConfigureWait().
+ */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ /* Unlike AsyncNotify, we unset callback_pending ourselves */
+ areq->callback_pending = false;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, NULL);
+ /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
+ ExecAsyncResponse(areq);
+ }
+}
+
+/*
+ * Complete a pending asynchronous request.
+ */
+static void
+complete_pending_request(AsyncRequest *areq)
+{
/* The request would have been pending for a callback */
Assert(areq->callback_pending);
/* Unlike AsyncNotify, we unset callback_pending ourselves */
areq->callback_pending = false;
- fetch_more_data(node);
-
- /* We need to send a new query afterwards; don't fetch */
+ /* We begin a fetch afterwards if necessary; don't fetch */
produce_tuple_asynchronously(areq, false);
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
@@ -7021,8 +7062,6 @@ process_pending_request(AsyncRequest *areq)
if (areq->requestee->instrument)
InstrUpdateTupleCount(areq->requestee->instrument,
TupIsNull(areq->result) ? 0.0 : 1.0);
-
- MemoryContextSwitchTo(oldcontext);
}
/*