aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw')
-rw-r--r--contrib/postgres_fdw/connection.c26
-rw-r--r--contrib/postgres_fdw/expected/postgres_fdw.out509
-rw-r--r--contrib/postgres_fdw/option.c6
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c374
-rw-r--r--contrib/postgres_fdw/postgres_fdw.h17
-rw-r--r--contrib/postgres_fdw/sql/postgres_fdw.sql195
6 files changed, 1090 insertions, 37 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ee0b4acf0ba..54ab8edfab6 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -62,6 +62,7 @@ typedef struct ConnCacheEntry
Oid serverid; /* foreign server OID used to get server name */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ PgFdwConnState state; /* extra per-connection state */
} ConnCacheEntry;
/*
@@ -115,9 +116,12 @@ static bool disconnect_cached_connections(Oid serverid);
* will_prep_stmt must be true if caller intends to create any prepared
* statements. Since those don't go away automatically at transaction end
* (not even on error), we need this flag to cue manual cleanup.
+ *
+ * If state is not NULL, *state receives the per-connection state associated
+ * with the PGconn.
*/
PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
bool found;
bool retry = false;
@@ -196,6 +200,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
*/
PG_TRY();
{
+ /* Process a pending asynchronous request if any. */
+ if (entry->state.pendingAreq)
+ process_pending_request(entry->state.pendingAreq);
/* Start a new transaction or subtransaction if needed. */
begin_remote_xact(entry);
}
@@ -264,6 +271,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
+ /* If caller needs access to the per-connection state, return it. */
+ if (state)
+ *state = &entry->state;
+
return entry->conn;
}
@@ -291,6 +302,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
+ memset(&entry->state, 0, sizeof(entry->state));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
@@ -648,8 +660,12 @@ GetPrepStmtNumber(PGconn *conn)
* Caller is responsible for the error handling on the result.
*/
PGresult *
-pgfdw_exec_query(PGconn *conn, const char *query)
+pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
{
+ /* First, process a pending asynchronous request, if any. */
+ if (state && state->pendingAreq)
+ process_pending_request(state->pendingAreq);
+
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
@@ -940,6 +956,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
entry->have_prep_stmt = false;
entry->have_error = false;
+ /* Also reset per-connection state */
+ memset(&entry->state, 0, sizeof(entry->state));
}
/* Disarm changing_xact_state if it all worked. */
@@ -1172,6 +1190,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
* Cancel the currently-in-progress query (whose query text we do not have)
* and ignore the result. Returns true if we successfully cancel the query
* and discard any pending result, and false if not.
+ *
+ * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
+ * query text from the pendingAreq saved in the per-connection state, then
+ * report the query using it.
*/
static bool
pgfdw_cancel_query(PGconn *conn)
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index f2c91c47827..f61e59cd200 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8946,7 +8946,7 @@ DO $d$
END;
$d$;
ERROR: invalid option "password"
-HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
+HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size, async_capable
CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
PL/pgSQL function inline_code_block line 3 at EXECUTE
-- If we add a password for our user mapping instead, we should get a different
@@ -9437,3 +9437,510 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
-- Clean up
DROP TABLE batch_table, batch_cp_upd_test CASCADE;
+-- ===================================================================
+-- test asynchronous execution
+-- ===================================================================
+ALTER SERVER loopback OPTIONS (DROP extensions);
+ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
+ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
+CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
+CREATE TABLE base_tbl1 (a int, b int, c text);
+CREATE TABLE base_tbl2 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
+ SERVER loopback OPTIONS (table_name 'base_tbl1');
+CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
+ SERVER loopback2 OPTIONS (table_name 'base_tbl2');
+INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+-- simple queries
+CREATE TABLE result_tbl (a int, b int, c text);
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0))
+(8 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1000 | 0 | 0000
+ 1100 | 100 | 0100
+ 1200 | 200 | 0200
+ 1300 | 300 | 0300
+ 1400 | 400 | 0400
+ 1500 | 500 | 0500
+ 1600 | 600 | 0600
+ 1700 | 700 | 0700
+ 1800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2000 | 0 | 0000
+ 2100 | 100 | 0100
+ 2200 | 200 | 0200
+ 2300 | 300 | 0300
+ 2400 | 400 | 0400
+ 2500 | 500 | 0500
+ 2600 | 600 | 0600
+ 2700 | 700 | 0700
+ 2800 | 800 | 0800
+ 2900 | 900 | 0900
+(20 rows)
+
+DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+ QUERY PLAN
+----------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+(10 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+(2 rows)
+
+DELETE FROM result_tbl;
+-- Check case where multiple partitions use the same connection
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
+ SERVER loopback2 OPTIONS (table_name 'base_tbl3');
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+ QUERY PLAN
+----------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Async Foreign Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl3
+(14 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+DELETE FROM result_tbl;
+DROP FOREIGN TABLE async_p3;
+DROP TABLE base_tbl3;
+-- Check case where the partitioned table has local/remote partitions
+CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+ QUERY PLAN
+----------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+(13 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+DELETE FROM result_tbl;
+-- partitionwise joins
+SET enable_partitionwise_join TO true;
+CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Insert on public.join_tbl
+ -> Append
+ -> Async Foreign Scan
+ Output: t1_1.a, t1_1.b, t1_1.c, t2_1.a, t2_1.b, t2_1.c
+ Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1)
+ Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0))))
+ -> Async Foreign Scan
+ Output: t1_2.a, t1_2.b, t1_2.c, t2_2.a, t2_2.b, t2_2.c
+ Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2)
+ Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0))))
+ -> Hash Join
+ Output: t1_3.a, t1_3.b, t1_3.c, t2_3.a, t2_3.b, t2_3.c
+ Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b))
+ -> Seq Scan on public.async_p3 t2_3
+ Output: t2_3.a, t2_3.b, t2_3.c
+ -> Hash
+ Output: t1_3.a, t1_3.b, t1_3.c
+ -> Seq Scan on public.async_p3 t1_3
+ Output: t1_3.a, t1_3.b, t1_3.c
+ Filter: ((t1_3.b % 100) = 0)
+(20 rows)
+
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+SELECT * FROM join_tbl ORDER BY a1;
+ a1 | b1 | c1 | a2 | b2 | c2
+------+-----+------+------+-----+------
+ 1000 | 0 | 0000 | 1000 | 0 | 0000
+ 1100 | 100 | 0100 | 1100 | 100 | 0100
+ 1200 | 200 | 0200 | 1200 | 200 | 0200
+ 1300 | 300 | 0300 | 1300 | 300 | 0300
+ 1400 | 400 | 0400 | 1400 | 400 | 0400
+ 1500 | 500 | 0500 | 1500 | 500 | 0500
+ 1600 | 600 | 0600 | 1600 | 600 | 0600
+ 1700 | 700 | 0700 | 1700 | 700 | 0700
+ 1800 | 800 | 0800 | 1800 | 800 | 0800
+ 1900 | 900 | 0900 | 1900 | 900 | 0900
+ 2000 | 0 | 0000 | 2000 | 0 | 0000
+ 2100 | 100 | 0100 | 2100 | 100 | 0100
+ 2200 | 200 | 0200 | 2200 | 200 | 0200
+ 2300 | 300 | 0300 | 2300 | 300 | 0300
+ 2400 | 400 | 0400 | 2400 | 400 | 0400
+ 2500 | 500 | 0500 | 2500 | 500 | 0500
+ 2600 | 600 | 0600 | 2600 | 600 | 0600
+ 2700 | 700 | 0700 | 2700 | 700 | 0700
+ 2800 | 800 | 0800 | 2800 | 800 | 0800
+ 2900 | 900 | 0900 | 2900 | 900 | 0900
+ 3000 | 0 | 0000 | 3000 | 0 | 0000
+ 3100 | 100 | 0100 | 3100 | 100 | 0100
+ 3200 | 200 | 0200 | 3200 | 200 | 0200
+ 3300 | 300 | 0300 | 3300 | 300 | 0300
+ 3400 | 400 | 0400 | 3400 | 400 | 0400
+ 3500 | 500 | 0500 | 3500 | 500 | 0500
+ 3600 | 600 | 0600 | 3600 | 600 | 0600
+ 3700 | 700 | 0700 | 3700 | 700 | 0700
+ 3800 | 800 | 0800 | 3800 | 800 | 0800
+ 3900 | 900 | 0900 | 3900 | 900 | 0900
+(30 rows)
+
+DELETE FROM join_tbl;
+RESET enable_partitionwise_join;
+-- Test interaction of async execution with plan-time partition pruning
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 3000;
+ QUERY PLAN
+-----------------------------------------------------------------------------
+ Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000))
+(7 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 2000;
+ QUERY PLAN
+-----------------------------------------------------------------------
+ Foreign Scan on public.async_p1 async_pt
+ Output: async_pt.a, async_pt.b, async_pt.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000))
+(3 rows)
+
+-- Test interaction of async execution with run-time partition pruning
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+ INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (3000, 505);
+ QUERY PLAN
+------------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ Subplans Removed: 1
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer))
+(11 rows)
+
+EXECUTE async_pt_query (3000, 505);
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+(2 rows)
+
+DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (2000, 505);
+ QUERY PLAN
+------------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ Subplans Removed: 2
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
+(7 rows)
+
+EXECUTE async_pt_query (2000, 505);
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+(1 row)
+
+DELETE FROM result_tbl;
+RESET plan_cache_mode;
+CREATE TABLE local_tbl(a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
+ANALYZE local_tbl;
+CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
+CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
+CREATE INDEX async_p3_idx ON async_p3 (a);
+ANALYZE base_tbl1;
+ANALYZE base_tbl2;
+ANALYZE async_p3;
+ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
+ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+ QUERY PLAN
+------------------------------------------------------------------------------------------
+ Nested Loop
+ Output: local_tbl.a, local_tbl.b, local_tbl.c, async_pt.a, async_pt.b, async_pt.c
+ -> Seq Scan on public.local_tbl
+ Output: local_tbl.a, local_tbl.b, local_tbl.c
+ Filter: (local_tbl.c = 'bar'::text)
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (($1::integer = a))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (($1::integer = a))
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (local_tbl.a = async_pt_3.a)
+(15 rows)
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+ QUERY PLAN
+-------------------------------------------------------------------------------
+ Nested Loop (actual rows=1 loops=1)
+ -> Seq Scan on local_tbl (actual rows=1 loops=1)
+ Filter: (c = 'bar'::text)
+ Rows Removed by Filter: 1
+ -> Append (actual rows=1 loops=1)
+ -> Async Foreign Scan on async_p1 async_pt_1 (never executed)
+ -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=1 loops=1)
+ -> Seq Scan on async_p3 async_pt_3 (never executed)
+ Filter: (local_tbl.a = a)
+(9 rows)
+
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+ a | b | c | a | b | c
+------+-----+-----+------+-----+------
+ 2505 | 505 | bar | 2505 | 505 | 0505
+(1 row)
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
+ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
+DROP TABLE local_tbl;
+DROP INDEX base_tbl1_idx;
+DROP INDEX base_tbl2_idx;
+DROP INDEX async_p3_idx;
+-- Test that pending requests are processed properly
+SET enable_mergejoin TO false;
+SET enable_hashjoin TO false;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+ QUERY PLAN
+----------------------------------------------------------------
+ Nested Loop
+ Output: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
+ Join Filter: (t1.a = t2.a)
+ -> Append
+ -> Async Foreign Scan on public.async_p1 t1_1
+ Output: t1_1.a, t1_1.b, t1_1.c
+ Filter: (t1_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 t1_2
+ Output: t1_2.a, t1_2.b, t1_2.c
+ Filter: (t1_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 t1_3
+ Output: t1_3.a, t1_3.b, t1_3.c
+ Filter: (t1_3.b === 505)
+ -> Materialize
+ Output: t2.a, t2.b, t2.c
+ -> Foreign Scan on public.async_p2 t2
+ Output: t2.a, t2.b, t2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+(20 rows)
+
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+ a | b | c | a | b | c
+------+-----+------+------+-----+------
+ 2505 | 505 | 0505 | 2505 | 505 | 0505
+(1 row)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+ QUERY PLAN
+----------------------------------------------------------------
+ Limit
+ Output: t1.a, t1.b, t1.c
+ -> Append
+ -> Async Foreign Scan on public.async_p1 t1_1
+ Output: t1_1.a, t1_1.b, t1_1.c
+ Filter: (t1_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 t1_2
+ Output: t1_2.a, t1_2.b, t1_2.c
+ Filter: (t1_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 t1_3
+ Output: t1_3.a, t1_3.b, t1_3.c
+ Filter: (t1_3.b === 505)
+(14 rows)
+
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+ a | b | c
+------+-----+------
+ 3505 | 505 | 0505
+(1 row)
+
+-- Check with foreign modify
+CREATE TABLE local_tbl (a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo');
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
+ SERVER loopback OPTIONS (table_name 'base_tbl3');
+INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
+CREATE TABLE base_tbl4 (a int, b int, c text);
+CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
+ SERVER loopback OPTIONS (table_name 'base_tbl4');
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+ QUERY PLAN
+-------------------------------------------------------------------------
+ Insert on public.insert_tbl
+ Remote SQL: INSERT INTO public.base_tbl4(a, b, c) VALUES ($1, $2, $3)
+ Batch Size: 1
+ -> Append
+ -> Seq Scan on public.local_tbl
+ Output: local_tbl.a, local_tbl.b, local_tbl.c
+ -> Async Foreign Scan on public.remote_tbl
+ Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl3
+(9 rows)
+
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+SELECT * FROM insert_tbl ORDER BY a;
+ a | b | c
+------+-----+-----
+ 1505 | 505 | foo
+ 2505 | 505 | bar
+(2 rows)
+
+-- Check with direct modify
+EXPLAIN (VERBOSE, COSTS OFF)
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Insert on public.join_tbl
+ CTE t
+ -> Update on public.remote_tbl
+ Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
+ -> Foreign Update on public.remote_tbl
+ Remote SQL: UPDATE public.base_tbl3 SET c = (c || c) RETURNING a, b, c
+ -> Nested Loop Left Join
+ Output: async_pt.a, async_pt.b, async_pt.c, t.a, t.b, t.c
+ Join Filter: ((async_pt.a = t.a) AND (async_pt.b = t.b))
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+ -> CTE Scan on t
+ Output: t.a, t.b, t.c
+(23 rows)
+
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+SELECT * FROM join_tbl ORDER BY a1;
+ a1 | b1 | c1 | a2 | b2 | c2
+------+-----+------+------+-----+--------
+ 1505 | 505 | 0505 | | |
+ 2505 | 505 | 0505 | 2505 | 505 | barbar
+ 3505 | 505 | 0505 | | |
+(3 rows)
+
+DELETE FROM join_tbl;
+RESET enable_mergejoin;
+RESET enable_hashjoin;
+-- Clean up
+DROP TABLE async_pt;
+DROP TABLE base_tbl1;
+DROP TABLE base_tbl2;
+DROP TABLE result_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
+DROP TABLE join_tbl;
+ALTER SERVER loopback OPTIONS (DROP async_capable);
+ALTER SERVER loopback2 OPTIONS (DROP async_capable);
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 64698c4da3a..530d7a66d40 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
* Validate option value, when we can do so without any context.
*/
if (strcmp(def->defname, "use_remote_estimate") == 0 ||
- strcmp(def->defname, "updatable") == 0)
+ strcmp(def->defname, "updatable") == 0 ||
+ strcmp(def->defname, "async_capable") == 0)
{
/* these accept only boolean values */
(void) defGetBoolean(def);
@@ -217,6 +218,9 @@ InitPgFdwOptions(void)
/* batch_size is available on both server and table */
{"batch_size", ForeignServerRelationId, false},
{"batch_size", ForeignTableRelationId, false},
+ /* async_capable is available on both server and table */
+ {"async_capable", ForeignServerRelationId, false},
+ {"async_capable", ForeignTableRelationId, false},
{"password_required", UserMappingRelationId, false},
/*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 20b25935ce6..cc73a6902f5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execAsync.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -37,6 +38,7 @@
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "postgres_fdw.h"
+#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/float.h"
#include "utils/guc.h"
@@ -143,6 +145,7 @@ typedef struct PgFdwScanState
/* for remote query execution */
PGconn *conn; /* connection for the scan */
+ PgFdwConnState *conn_state; /* extra per-connection state */
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
@@ -159,6 +162,9 @@ typedef struct PgFdwScanState
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ /* for asynchronous execution */
+ bool async_capable; /* engage asynchronous-capable logic? */
+
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@@ -176,6 +182,7 @@ typedef struct PgFdwModifyState
/* for remote query execution */
PGconn *conn; /* connection for the scan */
+ PgFdwConnState *conn_state; /* extra per-connection state */
char *p_name; /* name of prepared statement, if created */
/* extracted fdw_private data */
@@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState
/* for remote query execution */
PGconn *conn; /* connection for the update */
+ PgFdwConnState *conn_state; /* extra per-connection state */
int numParams; /* number of parameters passed to query */
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
@@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *output_rel,
void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static void postgresForeignAsyncRequest(AsyncRequest *areq);
+static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
+static void postgresForeignAsyncNotify(AsyncRequest *areq);
/*
* Helper functions
@@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void close_cursor(PGconn *conn, unsigned int cursor_number,
+ PgFdwConnState *conn_state);
static PgFdwModifyState *create_foreign_modify(EState *estate,
RangeTblEntry *rte,
ResultRelInfo *resultRelInfo,
@@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
+static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
+static void fetch_more_data_begin(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
@@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support functions for asynchronous execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+ routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
+
PG_RETURN_POINTER(routine);
}
@@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root,
/*
* Extract user-settable option values. Note that per-table settings of
- * use_remote_estimate and fetch_size override per-server settings of
- * them, respectively.
+ * use_remote_estimate, fetch_size and async_capable override per-server
+ * settings of them, respectively.
*/
fpinfo->use_remote_estimate = false;
fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
fpinfo->shippable_extensions = NIL;
fpinfo->fetch_size = 100;
+ fpinfo->async_capable = false;
apply_server_options(fpinfo);
apply_table_options(fpinfo);
@@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
+
+ /* Set the async-capable flag */
+ fsstate->async_capable = node->ss.ps.plan->async_capable;
}
/*
@@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node)
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
/*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
+ * In sync mode, if this is the first call after Begin or ReScan, we need
+ * to create the cursor on the remote side. In async mode, we would have
+ * already created the cursor before we get here, even if this is the
+ * first call after Begin or ReScan.
*/
if (!fsstate->cursor_exists)
create_cursor(node);
@@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node)
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
+ /* In async mode, just clear tuple slot. */
+ if (fsstate->async_capable)
+ return ExecClearTuple(slot);
/* No point in another fetch if we already detected EOF, though. */
if (!fsstate->eof_reached)
fetch_more_data(node);
@@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
@@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node)
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
- close_cursor(fsstate->conn, fsstate->cursor_number);
+ close_cursor(fsstate->conn, fsstate->cursor_number,
+ fsstate->conn_state);
/* Release remote connection */
ReleaseConnection(fsstate->conn);
@@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root,
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false, NULL);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely.
*/
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_exec_query(conn, sql, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
@@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node)
StringInfoData buf;
PGresult *res;
+ /* First, process a pending asynchronous request, if any. */
+ if (fsstate->conn_state->pendingAreq)
+ process_pending_request(fsstate->conn_state->pendingAreq);
+
/*
* Construct array of query parameter values in text format. We do the
* conversions in the short-lived per-tuple context, so as not to cause a
@@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node)
PG_TRY();
{
PGconn *conn = fsstate->conn;
- char sql[64];
int numrows;
int i;
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
+ if (fsstate->async_capable)
+ {
+ Assert(fsstate->conn_state->pendingAreq);
- res = pgfdw_exec_query(conn, sql);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ /*
+ * The query was already sent by an earlier call to
+ * fetch_more_data_begin. So now we just fetch the result.
+ */
+ res = pgfdw_get_result(conn, fsstate->query);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+
+ /* Reset per-connection state */
+ fsstate->conn_state->pendingAreq = NULL;
+ }
+ else
+ {
+ char sql[64];
+
+ /* This is a regular synchronous fetch. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
@@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel)
* Utility routine to close a cursor.
*/
static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PGconn *conn, unsigned int cursor_number,
+ PgFdwConnState *conn_state)
{
char sql[64];
PGresult *res;
@@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_exec_query(conn, sql, conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate,
operation == CMD_UPDATE ||
operation == CMD_DELETE);
+ /* First, process a pending asynchronous request, if any. */
+ if (fmstate->conn_state->pendingAreq)
+ process_pending_request(fmstate->conn_state->pendingAreq);
+
/*
* If the existing query was deparsed and prepared for a different number
* of rows, rebuild it for the proper number.
@@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
char *p_name;
PGresult *res;
+ /*
+ * The caller would already have processed a pending asynchronous request
+ * if any, so no need to do it here.
+ */
+
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
GetPrepStmtNumber(fmstate->conn));
@@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
@@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node)
int numParams = dmstate->numParams;
const char **values = dmstate->param_values;
+ /* First, process a pending asynchronous request, if any. */
+ if (dmstate->conn_state->pendingAreq)
+ process_pending_request(dmstate->conn_state->pendingAreq);
+
/*
* Construct array of query parameter values in text format.
*/
@@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct command to get page count for relation.
@@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = pgfdw_exec_query(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct cursor that retrieves whole rows from remote.
@@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
int fetch_size;
ListCell *lc;
- res = pgfdw_exec_query(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
@@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
*/
/* Fetch some rows */
- res = pgfdw_exec_query(conn, fetch_sql);
+ res = pgfdw_exec_query(conn, fetch_sql, NULL);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
}
/* Close the cursor, just to be tidy. */
- close_cursor(conn, cursor_number);
+ close_cursor(conn, cursor_number, NULL);
}
PG_CATCH();
{
@@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(mapping, false);
+ conn = GetConnection(mapping, false, NULL);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
@@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
- res = pgfdw_exec_query(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
- res = pgfdw_exec_query(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
ExtractExtensionList(defGetString(def), false);
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+ else if (strcmp(def->defname, "async_capable") == 0)
+ fpinfo->async_capable = defGetBoolean(def);
}
}
@@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
fpinfo->use_remote_estimate = defGetBoolean(def);
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+ else if (strcmp(def->defname, "async_capable") == 0)
+ fpinfo->async_capable = defGetBoolean(def);
}
}
@@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
fpinfo->fetch_size = fpinfo_o->fetch_size;
+ fpinfo->async_capable = fpinfo_o->async_capable;
/* Merge the table level options from either side of the join. */
if (fpinfo_i)
@@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
* relation sizes.
*/
fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
+
+ /*
+ * We'll prefer to consider this join async-capable if any table from
+ * either side of the join is considered async-capable.
+ */
+ fpinfo->async_capable = fpinfo_o->async_capable ||
+ fpinfo_i->async_capable;
}
}
@@ -6490,6 +6572,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
}
/*
+ * postgresIsForeignPathAsyncCapable
+ * Check whether a given ForeignPath node is async-capable.
+ */
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ RelOptInfo *rel = ((Path *) path)->parent;
+ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
+
+ return fpinfo->async_capable;
+}
+
+/*
+ * postgresForeignAsyncRequest
+ * Asynchronously request next tuple from a foreign PostgreSQL table.
+ */
+static void
+postgresForeignAsyncRequest(AsyncRequest *areq)
+{
+ produce_tuple_asynchronously(areq, true);
+}
+
+/*
+ * postgresForeignAsyncConfigureWait
+ * Configure a file descriptor event for which we wish to wait.
+ */
+static void
+postgresForeignAsyncConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+ AppendState *requestor = (AppendState *) areq->requestor;
+ WaitEventSet *set = requestor->as_eventset;
+
+ /* This should not be called unless callback_pending */
+ Assert(areq->callback_pending);
+
+ /* The core code would have registered postmaster death event */
+ Assert(GetNumRegisteredWaitEvents(set) >= 1);
+
+ /* Begin an asynchronous data fetch if not already done */
+ if (!pendingAreq)
+ fetch_more_data_begin(areq);
+ else if (pendingAreq->requestor != areq->requestor)
+ {
+ /*
+ * This is the case when the in-process request was made by another
+ * Append. Note that it might be useless to process the request,
+ * because the query might not need tuples from that Append anymore.
+ * Skip the given request if there are any configured events other
+ * than the postmaster death event; otherwise process the request,
+ * then begin a fetch to configure the event below, because otherwise
+ * we might end up with no configured events other than the postmaster
+ * death event.
+ */
+ if (GetNumRegisteredWaitEvents(set) > 1)
+ return;
+ process_pending_request(pendingAreq);
+ fetch_more_data_begin(areq);
+ }
+ else if (pendingAreq->requestee != areq->requestee)
+ {
+ /*
+ * This is the case when the in-process request was made by the same
+ * parent but for a different child. Since we configure only the
+ * event for the request made for that child, skip the given request.
+ */
+ return;
+ }
+ else
+ Assert(pendingAreq == areq);
+
+ AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
+ NULL, areq);
+}
+
+/*
+ * postgresForeignAsyncNotify
+ * Fetch some more tuples from a file descriptor that becomes ready,
+ * requesting next tuple.
+ */
+static void
+postgresForeignAsyncNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+ /* The request should be currently in-process */
+ Assert(fsstate->conn_state->pendingAreq == areq);
+
+ /* The core code would have initialized the callback_pending flag */
+ Assert(!areq->callback_pending);
+
+ /* On error, report the original query, not the FETCH. */
+ if (!PQconsumeInput(fsstate->conn))
+ pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+ fetch_more_data(node);
+
+ produce_tuple_asynchronously(areq, true);
+}
+
+/*
+ * Asynchronously produce next tuple from a foreign PostgreSQL table.
+ */
+static void
+produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+ TupleTableSlot *result;
+
+ /* This should not be called if the request is currently in-process */
+ Assert(areq != pendingAreq);
+
+ /* Fetch some more tuples, if we've run out */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ /* No point in another fetch if we already detected EOF, though */
+ if (!fsstate->eof_reached)
+ {
+ /* Mark the request as pending for a callback */
+ ExecAsyncRequestPending(areq);
+ /* Begin another fetch if requested and if no pending request */
+ if (fetch && !pendingAreq)
+ fetch_more_data_begin(areq);
+ }
+ else
+ {
+ /* There's nothing more to do; just return a NULL pointer */
+ result = NULL;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ }
+ return;
+ }
+
+ /* Get a tuple from the ForeignScan node */
+ result = ExecProcNode((PlanState *) node);
+ if (!TupIsNull(result))
+ {
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ return;
+ }
+ Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
+ /* Fetch some more tuples, if we've not detected EOF yet */
+ if (!fsstate->eof_reached)
+ {
+ /* Mark the request as pending for a callback */
+ ExecAsyncRequestPending(areq);
+ /* Begin another fetch if requested and if no pending request */
+ if (fetch && !pendingAreq)
+ fetch_more_data_begin(areq);
+ }
+ else
+ {
+ /* There's nothing more to do; just return a NULL pointer */
+ result = NULL;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ }
+}
+
+/*
+ * Begin an asynchronous data fetch.
+ *
+ * Note: fetch_more_data must be called to fetch the result.
+ */
+static void
+fetch_more_data_begin(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ char sql[64];
+
+ Assert(!fsstate->conn_state->pendingAreq);
+
+ /* Create the cursor synchronously. */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ /* We will send this query, but not wait for the response. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (PQsendQuery(fsstate->conn, sql) < 0)
+ pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+ /* Remember that the request is in process */
+ fsstate->conn_state->pendingAreq = areq;
+}
+
+/*
+ * Process a pending asynchronous request.
+ */
+void
+process_pending_request(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ EState *estate = node->ss.ps.state;
+ MemoryContext oldcontext;
+
+ /* The request should be currently in-process */
+ Assert(fsstate->conn_state->pendingAreq == areq);
+
+ oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
+
+ /* Unlike AsyncNotify, we unset callback_pending ourselves */
+ areq->callback_pending = false;
+
+ fetch_more_data(node);
+
+ /* We need to send a new query afterwards; don't fetch */
+ produce_tuple_asynchronously(areq, false);
+
+ /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
+ ExecAsyncResponse(areq);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
* Create a tuple from the specified row of the PGresult.
*
* rel is the local representation of the foreign table, attinmeta is
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 1f67b4d9fd2..88d94da6f6b 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -16,6 +16,7 @@
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "libpq-fe.h"
+#include "nodes/execnodes.h"
#include "nodes/pathnodes.h"
#include "utils/relcache.h"
@@ -78,6 +79,7 @@ typedef struct PgFdwRelationInfo
Cost fdw_startup_cost;
Cost fdw_tuple_cost;
List *shippable_extensions; /* OIDs of shippable extensions */
+ bool async_capable;
/* Cached catalog information. */
ForeignTable *table;
@@ -124,17 +126,28 @@ typedef struct PgFdwRelationInfo
int relation_index;
} PgFdwRelationInfo;
+/*
+ * Extra control information relating to a connection.
+ */
+typedef struct PgFdwConnState
+{
+ AsyncRequest *pendingAreq; /* pending async request */
+} PgFdwConnState;
+
/* in postgres_fdw.c */
extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
+extern void process_pending_request(AsyncRequest *areq);
/* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+ PgFdwConnState **state);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
-extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
+ PgFdwConnState *state);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
bool clear, const char *sql);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index e9b30517a51..806a5bca28c 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2928,3 +2928,198 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
-- Clean up
DROP TABLE batch_table, batch_cp_upd_test CASCADE;
+
+-- ===================================================================
+-- test asynchronous execution
+-- ===================================================================
+
+ALTER SERVER loopback OPTIONS (DROP extensions);
+ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
+ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
+
+CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
+CREATE TABLE base_tbl1 (a int, b int, c text);
+CREATE TABLE base_tbl2 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
+ SERVER loopback OPTIONS (table_name 'base_tbl1');
+CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
+ SERVER loopback2 OPTIONS (table_name 'base_tbl2');
+INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+-- simple queries
+CREATE TABLE result_tbl (a int, b int, c text);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+-- Check case where multiple partitions use the same connection
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
+ SERVER loopback2 OPTIONS (table_name 'base_tbl3');
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+DROP FOREIGN TABLE async_p3;
+DROP TABLE base_tbl3;
+
+-- Check case where the partitioned table has local/remote partitions
+CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+-- partitionwise joins
+SET enable_partitionwise_join TO true;
+
+CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+
+SELECT * FROM join_tbl ORDER BY a1;
+DELETE FROM join_tbl;
+
+RESET enable_partitionwise_join;
+
+-- Test interaction of async execution with plan-time partition pruning
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 3000;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 2000;
+
+-- Test interaction of async execution with run-time partition pruning
+SET plan_cache_mode TO force_generic_plan;
+
+PREPARE async_pt_query (int, int) AS
+ INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (3000, 505);
+EXECUTE async_pt_query (3000, 505);
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (2000, 505);
+EXECUTE async_pt_query (2000, 505);
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+RESET plan_cache_mode;
+
+CREATE TABLE local_tbl(a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
+ANALYZE local_tbl;
+
+CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
+CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
+CREATE INDEX async_p3_idx ON async_p3 (a);
+ANALYZE base_tbl1;
+ANALYZE base_tbl2;
+ANALYZE async_p3;
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
+ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
+ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
+
+DROP TABLE local_tbl;
+DROP INDEX base_tbl1_idx;
+DROP INDEX base_tbl2_idx;
+DROP INDEX async_p3_idx;
+
+-- Test that pending requests are processed properly
+SET enable_mergejoin TO false;
+SET enable_hashjoin TO false;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+
+-- Check with foreign modify
+CREATE TABLE local_tbl (a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo');
+
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
+ SERVER loopback OPTIONS (table_name 'base_tbl3');
+INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
+
+CREATE TABLE base_tbl4 (a int, b int, c text);
+CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
+ SERVER loopback OPTIONS (table_name 'base_tbl4');
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+
+SELECT * FROM insert_tbl ORDER BY a;
+
+-- Check with direct modify
+EXPLAIN (VERBOSE, COSTS OFF)
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+
+SELECT * FROM join_tbl ORDER BY a1;
+DELETE FROM join_tbl;
+
+RESET enable_mergejoin;
+RESET enable_hashjoin;
+
+-- Clean up
+DROP TABLE async_pt;
+DROP TABLE base_tbl1;
+DROP TABLE base_tbl2;
+DROP TABLE result_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
+DROP TABLE join_tbl;
+
+ALTER SERVER loopback OPTIONS (DROP async_capable);
+ALTER SERVER loopback2 OPTIONS (DROP async_capable);