diff options
author | Etsuro Fujita <efujita@postgresql.org> | 2021-07-30 17:00:00 +0900 |
---|---|---|
committer | Etsuro Fujita <efujita@postgresql.org> | 2021-07-30 17:00:00 +0900 |
commit | 1ec7fca8592178281cd5cdada0f27a340fb813fc (patch) | |
tree | b77835bd5114ae1d1773d0c6dc95c57e0c6160dc /contrib/postgres_fdw/postgres_fdw.c | |
parent | 16bd4becee32240d09db5c6cbec87957fdfcd2d9 (diff) | |
download | postgresql-1ec7fca8592178281cd5cdada0f27a340fb813fc.tar.gz postgresql-1ec7fca8592178281cd5cdada0f27a340fb813fc.zip |
postgres_fdw: Fix handling of pending asynchronous requests.
A pending asynchronous request is handled by process_pending_request(),
which previously not only processed an in-progress remote query but
performed ExecForeignScan() to produce a tuple to return to the local
server asynchronously from the result of the remote query. But that led
to a server crash when executing a query or led to an "InstrStartNode
called twice in a row" or "InstrEndLoop called on running node" failure
when doing EXPLAIN ANALYZE of it, in cases where the plan tree for it
contained multiple async-capable nodes accessing the same
initplan/subplan that contained multiple async-capable nodes scanning
the same foreign tables as for the parent async-capable nodes, as
reported by Andrey Lepikhov. The reason is that the second step in
process_pending_request() invoked when executing the initplan/subplan
for one of the parent async-capable nodes caused recursive execution of
the initplan/subplan for another of the parent async-capable nodes.
To fix, split process_pending_request() into the two steps and postpone
the second step until ForeignAsyncConfigureWait() is called for each of
the pending asynchronous requests. Also, in ExecAppendAsyncEventWait()
we assumed that FDWs would register at least one wait event in a
WaitEventSet created there when they were called from
ForeignAsyncConfigureWait() in that function, but allow FDWs to register
zero wait events in the WaitEventSet; modify ExecAppendAsyncEventWait()
to just return in that case.
Oversight in commit 27e1f1456. Back-patch to v14 where that commit went
in.
Andrey Lepikhov and Etsuro Fujita
Discussion: https://postgr.es/m/fe5eaa19-1704-e4a4-76ee-3b9d37ade399@postgrespro.ru
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 65 |
1 files changed, 52 insertions, 13 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 51fac77f3d6..8dfee2e02c6 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -503,6 +503,7 @@ static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); static void fetch_more_data_begin(AsyncRequest *areq); +static void complete_pending_request(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -6826,6 +6827,22 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) /* This should not be called unless callback_pending */ Assert(areq->callback_pending); + /* + * If process_pending_request() has been invoked on the given request + * before we get here, we might have some tuples already; in which case + * complete the request + */ + if (fsstate->next_tuple < fsstate->num_tuples) + { + complete_pending_request(areq); + if (areq->request_complete) + return; + Assert(areq->callback_pending); + } + + /* We must have run out of tuples */ + Assert(fsstate->next_tuple >= fsstate->num_tuples); + /* The core code would have registered postmaster death event */ Assert(GetNumRegisteredWaitEvents(set) >= 1); @@ -6838,12 +6855,15 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) * This is the case when the in-process request was made by another * Append. Note that it might be useless to process the request, * because the query might not need tuples from that Append anymore. - * Skip the given request if there are any configured events other - * than the postmaster death event; otherwise process the request, - * then begin a fetch to configure the event below, because otherwise - * we might end up with no configured events other than the postmaster - * death event. + * If there are any child subplans of the same parent that are ready + * for new requests, skip the given request. Likewise, if there are + * any configured events other than the postmaster death event, skip + * it. Otherwise, process the in-process request, then begin a fetch + * to configure the event below, because we might otherwise end up + * with no configured events other than the postmaster death event. */ + if (!bms_is_empty(requestor->as_needrequest)) + return; if (GetNumRegisteredWaitEvents(set) > 1) return; process_pending_request(pendingAreq); @@ -6995,23 +7015,44 @@ process_pending_request(AsyncRequest *areq) { ForeignScanState *node = (ForeignScanState *) areq->requestee; PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state; - EState *estate = node->ss.ps.state; - MemoryContext oldcontext; + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); /* The request should be currently in-process */ Assert(fsstate->conn_state->pendingAreq == areq); - oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + fetch_more_data(node); + /* + * If we didn't get any tuples, must be end of data; complete the request + * now. Otherwise, we postpone completing the request until we are called + * from postgresForeignAsyncConfigureWait(). + */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, NULL); + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + } +} + +/* + * Complete a pending asynchronous request. + */ +static void +complete_pending_request(AsyncRequest *areq) +{ /* The request would have been pending for a callback */ Assert(areq->callback_pending); /* Unlike AsyncNotify, we unset callback_pending ourselves */ areq->callback_pending = false; - fetch_more_data(node); - - /* We need to send a new query afterwards; don't fetch */ + /* We begin a fetch afterwards if necessary; don't fetch */ produce_tuple_asynchronously(areq, false); /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ @@ -7021,8 +7062,6 @@ process_pending_request(AsyncRequest *areq) if (areq->requestee->instrument) InstrUpdateTupleCount(areq->requestee->instrument, TupIsNull(areq->result) ? 0.0 : 1.0); - - MemoryContextSwitchTo(oldcontext); } /* |