diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 65 |
1 files changed, 52 insertions, 13 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 51fac77f3d6..8dfee2e02c6 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -503,6 +503,7 @@ static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); static void fetch_more_data_begin(AsyncRequest *areq); +static void complete_pending_request(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -6826,6 +6827,22 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) /* This should not be called unless callback_pending */ Assert(areq->callback_pending); + /* + * If process_pending_request() has been invoked on the given request + * before we get here, we might have some tuples already; in which case + * complete the request + */ + if (fsstate->next_tuple < fsstate->num_tuples) + { + complete_pending_request(areq); + if (areq->request_complete) + return; + Assert(areq->callback_pending); + } + + /* We must have run out of tuples */ + Assert(fsstate->next_tuple >= fsstate->num_tuples); + /* The core code would have registered postmaster death event */ Assert(GetNumRegisteredWaitEvents(set) >= 1); @@ -6838,12 +6855,15 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) * This is the case when the in-process request was made by another * Append. Note that it might be useless to process the request, * because the query might not need tuples from that Append anymore. - * Skip the given request if there are any configured events other - * than the postmaster death event; otherwise process the request, - * then begin a fetch to configure the event below, because otherwise - * we might end up with no configured events other than the postmaster - * death event. + * If there are any child subplans of the same parent that are ready + * for new requests, skip the given request. Likewise, if there are + * any configured events other than the postmaster death event, skip + * it. Otherwise, process the in-process request, then begin a fetch + * to configure the event below, because we might otherwise end up + * with no configured events other than the postmaster death event. */ + if (!bms_is_empty(requestor->as_needrequest)) + return; if (GetNumRegisteredWaitEvents(set) > 1) return; process_pending_request(pendingAreq); @@ -6995,23 +7015,44 @@ process_pending_request(AsyncRequest *areq) { ForeignScanState *node = (ForeignScanState *) areq->requestee; PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state; - EState *estate = node->ss.ps.state; - MemoryContext oldcontext; + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); /* The request should be currently in-process */ Assert(fsstate->conn_state->pendingAreq == areq); - oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + fetch_more_data(node); + /* + * If we didn't get any tuples, must be end of data; complete the request + * now. Otherwise, we postpone completing the request until we are called + * from postgresForeignAsyncConfigureWait(). + */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, NULL); + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + } +} + +/* + * Complete a pending asynchronous request. + */ +static void +complete_pending_request(AsyncRequest *areq) +{ /* The request would have been pending for a callback */ Assert(areq->callback_pending); /* Unlike AsyncNotify, we unset callback_pending ourselves */ areq->callback_pending = false; - fetch_more_data(node); - - /* We need to send a new query afterwards; don't fetch */ + /* We begin a fetch afterwards if necessary; don't fetch */ produce_tuple_asynchronously(areq, false); /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ @@ -7021,8 +7062,6 @@ process_pending_request(AsyncRequest *areq) if (areq->requestee->instrument) InstrUpdateTupleCount(areq->requestee->instrument, TupIsNull(areq->result) ? 0.0 : 1.0); - - MemoryContextSwitchTo(oldcontext); } /* |