aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/postgres_fdw/expected/postgres_fdw.out55
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c65
-rw-r--r--contrib/postgres_fdw/sql/postgres_fdw.sql13
-rw-r--r--src/backend/executor/nodeAppend.c11
4 files changed, 126 insertions, 18 deletions
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index ed25e7a743f..78c05ee5ea8 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -10300,6 +10300,59 @@ SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
2505 | 505 | 0505 | 2505 | 505 | 0505
(1 row)
+CREATE TABLE local_tbl (a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo');
+ANALYZE local_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Nested Loop Left Join
+ Output: t1.a, t1.b, t1.c, async_pt.a, async_pt.b, async_pt.c, ($0)
+ Join Filter: (t1.a = async_pt.a)
+ InitPlan 1 (returns $0)
+ -> Aggregate
+ Output: count(*)
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_4
+ Remote SQL: SELECT NULL FROM public.base_tbl1 WHERE ((a < 3000))
+ -> Async Foreign Scan on public.async_p2 async_pt_5
+ Remote SQL: SELECT NULL FROM public.base_tbl2 WHERE ((a < 3000))
+ -> Seq Scan on public.local_tbl t1
+ Output: t1.a, t1.b, t1.c
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c, $0
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c, $0
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000))
+(20 rows)
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------
+ Nested Loop Left Join (actual rows=1 loops=1)
+ Join Filter: (t1.a = async_pt.a)
+ Rows Removed by Join Filter: 399
+ InitPlan 1 (returns $0)
+ -> Aggregate (actual rows=1 loops=1)
+ -> Append (actual rows=400 loops=1)
+ -> Async Foreign Scan on async_p1 async_pt_4 (actual rows=200 loops=1)
+ -> Async Foreign Scan on async_p2 async_pt_5 (actual rows=200 loops=1)
+ -> Seq Scan on local_tbl t1 (actual rows=1 loops=1)
+ -> Append (actual rows=400 loops=1)
+ -> Async Foreign Scan on async_p1 async_pt_1 (actual rows=200 loops=1)
+ -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=200 loops=1)
+(12 rows)
+
+SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
+ a | b | c | a | b | c | count
+------+-----+-----+------+-----+------+-------
+ 1505 | 505 | foo | 1505 | 505 | 0505 | 400
+(1 row)
+
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
QUERY PLAN
@@ -10342,8 +10395,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
(1 row)
-- Check with foreign modify
-CREATE TABLE local_tbl (a int, b int, c text);
-INSERT INTO local_tbl VALUES (1505, 505, 'foo');
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
SERVER loopback OPTIONS (table_name 'base_tbl3');
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 51fac77f3d6..8dfee2e02c6 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -503,6 +503,7 @@ static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
static void fetch_more_data_begin(AsyncRequest *areq);
+static void complete_pending_request(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
@@ -6826,6 +6827,22 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
/* This should not be called unless callback_pending */
Assert(areq->callback_pending);
+ /*
+ * If process_pending_request() has been invoked on the given request
+ * before we get here, we might have some tuples already; in which case
+ * complete the request
+ */
+ if (fsstate->next_tuple < fsstate->num_tuples)
+ {
+ complete_pending_request(areq);
+ if (areq->request_complete)
+ return;
+ Assert(areq->callback_pending);
+ }
+
+ /* We must have run out of tuples */
+ Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
/* The core code would have registered postmaster death event */
Assert(GetNumRegisteredWaitEvents(set) >= 1);
@@ -6838,12 +6855,15 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
* This is the case when the in-process request was made by another
* Append. Note that it might be useless to process the request,
* because the query might not need tuples from that Append anymore.
- * Skip the given request if there are any configured events other
- * than the postmaster death event; otherwise process the request,
- * then begin a fetch to configure the event below, because otherwise
- * we might end up with no configured events other than the postmaster
- * death event.
+ * If there are any child subplans of the same parent that are ready
+ * for new requests, skip the given request. Likewise, if there are
+ * any configured events other than the postmaster death event, skip
+ * it. Otherwise, process the in-process request, then begin a fetch
+ * to configure the event below, because we might otherwise end up
+ * with no configured events other than the postmaster death event.
*/
+ if (!bms_is_empty(requestor->as_needrequest))
+ return;
if (GetNumRegisteredWaitEvents(set) > 1)
return;
process_pending_request(pendingAreq);
@@ -6995,23 +7015,44 @@ process_pending_request(AsyncRequest *areq)
{
ForeignScanState *node = (ForeignScanState *) areq->requestee;
PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state;
- EState *estate = node->ss.ps.state;
- MemoryContext oldcontext;
+
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
/* The request should be currently in-process */
Assert(fsstate->conn_state->pendingAreq == areq);
- oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
+ fetch_more_data(node);
+ /*
+ * If we didn't get any tuples, must be end of data; complete the request
+ * now. Otherwise, we postpone completing the request until we are called
+ * from postgresForeignAsyncConfigureWait().
+ */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ /* Unlike AsyncNotify, we unset callback_pending ourselves */
+ areq->callback_pending = false;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, NULL);
+ /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
+ ExecAsyncResponse(areq);
+ }
+}
+
+/*
+ * Complete a pending asynchronous request.
+ */
+static void
+complete_pending_request(AsyncRequest *areq)
+{
/* The request would have been pending for a callback */
Assert(areq->callback_pending);
/* Unlike AsyncNotify, we unset callback_pending ourselves */
areq->callback_pending = false;
- fetch_more_data(node);
-
- /* We need to send a new query afterwards; don't fetch */
+ /* We begin a fetch afterwards if necessary; don't fetch */
produce_tuple_asynchronously(areq, false);
/* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
@@ -7021,8 +7062,6 @@ process_pending_request(AsyncRequest *areq)
if (areq->requestee->instrument)
InstrUpdateTupleCount(areq->requestee->instrument,
TupIsNull(areq->result) ? 0.0 : 1.0);
-
- MemoryContextSwitchTo(oldcontext);
}
/*
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 02a6b15a13f..75fff9bad04 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3274,6 +3274,16 @@ EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+CREATE TABLE local_tbl (a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo');
+ANALYZE local_tbl;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
+SELECT * FROM local_tbl t1 LEFT JOIN (SELECT *, (SELECT count(*) FROM async_pt WHERE a < 3000) FROM async_pt WHERE a < 3000) t2 ON t1.a = t2.a;
+
EXPLAIN (VERBOSE, COSTS OFF)
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
@@ -3281,9 +3291,6 @@ SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
-- Check with foreign modify
-CREATE TABLE local_tbl (a int, b int, c text);
-INSERT INTO local_tbl VALUES (1505, 505, 'foo');
-
CREATE TABLE base_tbl3 (a int, b int, c text);
CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
SERVER loopback OPTIONS (table_name 'base_tbl3');
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 755c1392f09..a4eef19d7f4 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1043,6 +1043,17 @@ ExecAppendAsyncEventWait(AppendState *node)
ExecAsyncConfigureWait(areq);
}
+ /*
+ * No need for further processing if there are no configured events other
+ * than the postmaster death event.
+ */
+ if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
+ {
+ FreeWaitEventSet(node->as_eventset);
+ node->as_eventset = NULL;
+ return;
+ }
+
/* We wait on at most EVENT_BUFFER_SIZE events. */
if (nevents > EVENT_BUFFER_SIZE)
nevents = EVENT_BUFFER_SIZE;