diff options
Diffstat (limited to 'contrib/postgres_fdw')
-rw-r--r-- | contrib/postgres_fdw/connection.c | 26 | ||||
-rw-r--r-- | contrib/postgres_fdw/expected/postgres_fdw.out | 509 | ||||
-rw-r--r-- | contrib/postgres_fdw/option.c | 6 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 374 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.h | 17 | ||||
-rw-r--r-- | contrib/postgres_fdw/sql/postgres_fdw.sql | 195 |
6 files changed, 1090 insertions, 37 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index ee0b4acf0ba..54ab8edfab6 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -62,6 +62,7 @@ typedef struct ConnCacheEntry Oid serverid; /* foreign server OID used to get server name */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + PgFdwConnState state; /* extra per-connection state */ } ConnCacheEntry; /* @@ -115,9 +116,12 @@ static bool disconnect_cached_connections(Oid serverid); * will_prep_stmt must be true if caller intends to create any prepared * statements. Since those don't go away automatically at transaction end * (not even on error), we need this flag to cue manual cleanup. + * + * If state is not NULL, *state receives the per-connection state associated + * with the PGconn. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { bool found; bool retry = false; @@ -196,6 +200,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt) */ PG_TRY(); { + /* Process a pending asynchronous request if any. */ + if (entry->state.pendingAreq) + process_pending_request(entry->state.pendingAreq); /* Start a new transaction or subtransaction if needed. */ begin_remote_xact(entry); } @@ -264,6 +271,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; + /* If caller needs access to the per-connection state, return it. */ + if (state) + *state = &entry->state; + return entry->conn; } @@ -291,6 +302,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->mapping_hashvalue = GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(user->umid)); + memset(&entry->state, 0, sizeof(entry->state)); /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); @@ -648,8 +660,12 @@ GetPrepStmtNumber(PGconn *conn) * Caller is responsible for the error handling on the result. */ PGresult * -pgfdw_exec_query(PGconn *conn, const char *query) +pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) { + /* First, process a pending asynchronous request, if any. */ + if (state && state->pendingAreq) + process_pending_request(state->pendingAreq); + /* * Submit a query. Since we don't use non-blocking mode, this also can * block. But its risk is relatively small, so we ignore that for now. @@ -940,6 +956,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) { entry->have_prep_stmt = false; entry->have_error = false; + /* Also reset per-connection state */ + memset(&entry->state, 0, sizeof(entry->state)); } /* Disarm changing_xact_state if it all worked. */ @@ -1172,6 +1190,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) * Cancel the currently-in-progress query (whose query text we do not have) * and ignore the result. Returns true if we successfully cancel the query * and discard any pending result, and false if not. + * + * XXX: if the query was one sent by fetch_more_data_begin(), we could get the + * query text from the pendingAreq saved in the per-connection state, then + * report the query using it. */ static bool pgfdw_cancel_query(PGconn *conn) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index f2c91c47827..f61e59cd200 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -8946,7 +8946,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size, async_capable CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different @@ -9437,3 +9437,510 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test; -- Clean up DROP TABLE batch_table, batch_cp_upd_test CASCADE; +-- =================================================================== +-- test asynchronous execution +-- =================================================================== +ALTER SERVER loopback OPTIONS (DROP extensions); +ALTER SERVER loopback OPTIONS (ADD async_capable 'true'); +ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true'); +CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a); +CREATE TABLE base_tbl1 (a int, b int, c text); +CREATE TABLE base_tbl2 (a int, b int, c text); +CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000) + SERVER loopback OPTIONS (table_name 'base_tbl1'); +CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000) + SERVER loopback2 OPTIONS (table_name 'base_tbl2'); +INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; +-- simple queries +CREATE TABLE result_tbl (a int, b int, c text); +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0; + QUERY PLAN +---------------------------------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) +(8 rows) + +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1000 | 0 | 0000 + 1100 | 100 | 0100 + 1200 | 200 | 0200 + 1300 | 300 | 0300 + 1400 | 400 | 0400 + 1500 | 500 | 0500 + 1600 | 600 | 0600 + 1700 | 700 | 0700 + 1800 | 800 | 0800 + 1900 | 900 | 0900 + 2000 | 0 | 0000 + 2100 | 100 | 0100 + 2200 | 200 | 0200 + 2300 | 300 | 0300 + 2400 | 400 | 0400 + 2500 | 500 | 0500 + 2600 | 600 | 0600 + 2700 | 700 | 0700 + 2800 | 800 | 0800 + 2900 | 900 | 0900 +(20 rows) + +DELETE FROM result_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + QUERY PLAN +---------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 +(10 rows) + +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 +(2 rows) + +DELETE FROM result_tbl; +-- Check case where multiple partitions use the same connection +CREATE TABLE base_tbl3 (a int, b int, c text); +CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000) + SERVER loopback2 OPTIONS (table_name 'base_tbl3'); +INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + QUERY PLAN +---------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Async Foreign Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (async_pt_3.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl3 +(14 rows) + +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 + 3505 | 505 | 0505 +(3 rows) + +DELETE FROM result_tbl; +DROP FOREIGN TABLE async_p3; +DROP TABLE base_tbl3; +-- Check case where the partitioned table has local/remote partitions +CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000); +INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + QUERY PLAN +---------------------------------------------------------------- + Insert on public.result_tbl + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (async_pt_3.b === 505) +(13 rows) + +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 + 3505 | 505 | 0505 +(3 rows) + +DELETE FROM result_tbl; +-- partitionwise joins +SET enable_partitionwise_join TO true; +CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text); +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Insert on public.join_tbl + -> Append + -> Async Foreign Scan + Output: t1_1.a, t1_1.b, t1_1.c, t2_1.a, t2_1.b, t2_1.c + Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1) + Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0)))) + -> Async Foreign Scan + Output: t1_2.a, t1_2.b, t1_2.c, t2_2.a, t2_2.b, t2_2.c + Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2) + Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0)))) + -> Hash Join + Output: t1_3.a, t1_3.b, t1_3.c, t2_3.a, t2_3.b, t2_3.c + Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b)) + -> Seq Scan on public.async_p3 t2_3 + Output: t2_3.a, t2_3.b, t2_3.c + -> Hash + Output: t1_3.a, t1_3.b, t1_3.c + -> Seq Scan on public.async_p3 t1_3 + Output: t1_3.a, t1_3.b, t1_3.c + Filter: ((t1_3.b % 100) = 0) +(20 rows) + +INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; +SELECT * FROM join_tbl ORDER BY a1; + a1 | b1 | c1 | a2 | b2 | c2 +------+-----+------+------+-----+------ + 1000 | 0 | 0000 | 1000 | 0 | 0000 + 1100 | 100 | 0100 | 1100 | 100 | 0100 + 1200 | 200 | 0200 | 1200 | 200 | 0200 + 1300 | 300 | 0300 | 1300 | 300 | 0300 + 1400 | 400 | 0400 | 1400 | 400 | 0400 + 1500 | 500 | 0500 | 1500 | 500 | 0500 + 1600 | 600 | 0600 | 1600 | 600 | 0600 + 1700 | 700 | 0700 | 1700 | 700 | 0700 + 1800 | 800 | 0800 | 1800 | 800 | 0800 + 1900 | 900 | 0900 | 1900 | 900 | 0900 + 2000 | 0 | 0000 | 2000 | 0 | 0000 + 2100 | 100 | 0100 | 2100 | 100 | 0100 + 2200 | 200 | 0200 | 2200 | 200 | 0200 + 2300 | 300 | 0300 | 2300 | 300 | 0300 + 2400 | 400 | 0400 | 2400 | 400 | 0400 + 2500 | 500 | 0500 | 2500 | 500 | 0500 + 2600 | 600 | 0600 | 2600 | 600 | 0600 + 2700 | 700 | 0700 | 2700 | 700 | 0700 + 2800 | 800 | 0800 | 2800 | 800 | 0800 + 2900 | 900 | 0900 | 2900 | 900 | 0900 + 3000 | 0 | 0000 | 3000 | 0 | 0000 + 3100 | 100 | 0100 | 3100 | 100 | 0100 + 3200 | 200 | 0200 | 3200 | 200 | 0200 + 3300 | 300 | 0300 | 3300 | 300 | 0300 + 3400 | 400 | 0400 | 3400 | 400 | 0400 + 3500 | 500 | 0500 | 3500 | 500 | 0500 + 3600 | 600 | 0600 | 3600 | 600 | 0600 + 3700 | 700 | 0700 | 3700 | 700 | 0700 + 3800 | 800 | 0800 | 3800 | 800 | 0800 + 3900 | 900 | 0900 | 3900 | 900 | 0900 +(30 rows) + +DELETE FROM join_tbl; +RESET enable_partitionwise_join; +-- Test interaction of async execution with plan-time partition pruning +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 3000; + QUERY PLAN +----------------------------------------------------------------------------- + Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000)) +(7 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 2000; + QUERY PLAN +----------------------------------------------------------------------- + Foreign Scan on public.async_p1 async_pt + Output: async_pt.a, async_pt.b, async_pt.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000)) +(3 rows) + +-- Test interaction of async execution with run-time partition pruning +SET plan_cache_mode TO force_generic_plan; +PREPARE async_pt_query (int, int) AS + INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2; +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_query (3000, 505); + QUERY PLAN +------------------------------------------------------------------------------------------ + Insert on public.result_tbl + -> Append + Subplans Removed: 1 + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer)) +(11 rows) + +EXECUTE async_pt_query (3000, 505); +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 +(2 rows) + +DELETE FROM result_tbl; +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_query (2000, 505); + QUERY PLAN +------------------------------------------------------------------------------------------ + Insert on public.result_tbl + -> Append + Subplans Removed: 2 + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) +(7 rows) + +EXECUTE async_pt_query (2000, 505); +SELECT * FROM result_tbl ORDER BY a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 +(1 row) + +DELETE FROM result_tbl; +RESET plan_cache_mode; +CREATE TABLE local_tbl(a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar'); +ANALYZE local_tbl; +CREATE INDEX base_tbl1_idx ON base_tbl1 (a); +CREATE INDEX base_tbl2_idx ON base_tbl2 (a); +CREATE INDEX async_p3_idx ON async_p3 (a); +ANALYZE base_tbl1; +ANALYZE base_tbl2; +ANALYZE async_p3; +ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true'); +ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true'); +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; + QUERY PLAN +------------------------------------------------------------------------------------------ + Nested Loop + Output: local_tbl.a, local_tbl.b, local_tbl.c, async_pt.a, async_pt.b, async_pt.c + -> Seq Scan on public.local_tbl + Output: local_tbl.a, local_tbl.b, local_tbl.c + Filter: (local_tbl.c = 'bar'::text) + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (($1::integer = a)) + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (($1::integer = a)) + -> Seq Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (local_tbl.a = async_pt_3.a) +(15 rows) + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; + QUERY PLAN +------------------------------------------------------------------------------- + Nested Loop (actual rows=1 loops=1) + -> Seq Scan on local_tbl (actual rows=1 loops=1) + Filter: (c = 'bar'::text) + Rows Removed by Filter: 1 + -> Append (actual rows=1 loops=1) + -> Async Foreign Scan on async_p1 async_pt_1 (never executed) + -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=1 loops=1) + -> Seq Scan on async_p3 async_pt_3 (never executed) + Filter: (local_tbl.a = a) +(9 rows) + +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; + a | b | c | a | b | c +------+-----+-----+------+-----+------ + 2505 | 505 | bar | 2505 | 505 | 0505 +(1 row) + +ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate); +ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate); +DROP TABLE local_tbl; +DROP INDEX base_tbl1_idx; +DROP INDEX base_tbl2_idx; +DROP INDEX async_p3_idx; +-- Test that pending requests are processed properly +SET enable_mergejoin TO false; +SET enable_hashjoin TO false; +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; + QUERY PLAN +---------------------------------------------------------------- + Nested Loop + Output: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c + Join Filter: (t1.a = t2.a) + -> Append + -> Async Foreign Scan on public.async_p1 t1_1 + Output: t1_1.a, t1_1.b, t1_1.c + Filter: (t1_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 t1_2 + Output: t1_2.a, t1_2.b, t1_2.c + Filter: (t1_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 t1_3 + Output: t1_3.a, t1_3.b, t1_3.c + Filter: (t1_3.b === 505) + -> Materialize + Output: t2.a, t2.b, t2.c + -> Foreign Scan on public.async_p2 t2 + Output: t2.a, t2.b, t2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 +(20 rows) + +SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; + a | b | c | a | b | c +------+-----+------+------+-----+------ + 2505 | 505 | 0505 | 2505 | 505 | 0505 +(1 row) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; + QUERY PLAN +---------------------------------------------------------------- + Limit + Output: t1.a, t1.b, t1.c + -> Append + -> Async Foreign Scan on public.async_p1 t1_1 + Output: t1_1.a, t1_1.b, t1_1.c + Filter: (t1_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 t1_2 + Output: t1_2.a, t1_2.b, t1_2.c + Filter: (t1_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 t1_3 + Output: t1_3.a, t1_3.b, t1_3.c + Filter: (t1_3.b === 505) +(14 rows) + +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; + a | b | c +------+-----+------ + 3505 | 505 | 0505 +(1 row) + +-- Check with foreign modify +CREATE TABLE local_tbl (a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'); +CREATE TABLE base_tbl3 (a int, b int, c text); +CREATE FOREIGN TABLE remote_tbl (a int, b int, c text) + SERVER loopback OPTIONS (table_name 'base_tbl3'); +INSERT INTO remote_tbl VALUES (2505, 505, 'bar'); +CREATE TABLE base_tbl4 (a int, b int, c text); +CREATE FOREIGN TABLE insert_tbl (a int, b int, c text) + SERVER loopback OPTIONS (table_name 'base_tbl4'); +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl); + QUERY PLAN +------------------------------------------------------------------------- + Insert on public.insert_tbl + Remote SQL: INSERT INTO public.base_tbl4(a, b, c) VALUES ($1, $2, $3) + Batch Size: 1 + -> Append + -> Seq Scan on public.local_tbl + Output: local_tbl.a, local_tbl.b, local_tbl.c + -> Async Foreign Scan on public.remote_tbl + Output: remote_tbl.a, remote_tbl.b, remote_tbl.c + Remote SQL: SELECT a, b, c FROM public.base_tbl3 +(9 rows) + +INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl); +SELECT * FROM insert_tbl ORDER BY a; + a | b | c +------+-----+----- + 1505 | 505 | foo + 2505 | 505 | bar +(2 rows) + +-- Check with direct modify +EXPLAIN (VERBOSE, COSTS OFF) +WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *) +INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505; + QUERY PLAN +---------------------------------------------------------------------------------------- + Insert on public.join_tbl + CTE t + -> Update on public.remote_tbl + Output: remote_tbl.a, remote_tbl.b, remote_tbl.c + -> Foreign Update on public.remote_tbl + Remote SQL: UPDATE public.base_tbl3 SET c = (c || c) RETURNING a, b, c + -> Nested Loop Left Join + Output: async_pt.a, async_pt.b, async_pt.c, t.a, t.b, t.c + Join Filter: ((async_pt.a = t.a) AND (async_pt.b = t.b)) + -> Append + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Seq Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (async_pt_3.b === 505) + -> CTE Scan on t + Output: t.a, t.b, t.c +(23 rows) + +WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *) +INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505; +SELECT * FROM join_tbl ORDER BY a1; + a1 | b1 | c1 | a2 | b2 | c2 +------+-----+------+------+-----+-------- + 1505 | 505 | 0505 | | | + 2505 | 505 | 0505 | 2505 | 505 | barbar + 3505 | 505 | 0505 | | | +(3 rows) + +DELETE FROM join_tbl; +RESET enable_mergejoin; +RESET enable_hashjoin; +-- Clean up +DROP TABLE async_pt; +DROP TABLE base_tbl1; +DROP TABLE base_tbl2; +DROP TABLE result_tbl; +DROP TABLE local_tbl; +DROP FOREIGN TABLE remote_tbl; +DROP FOREIGN TABLE insert_tbl; +DROP TABLE base_tbl3; +DROP TABLE base_tbl4; +DROP TABLE join_tbl; +ALTER SERVER loopback OPTIONS (DROP async_capable); +ALTER SERVER loopback2 OPTIONS (DROP async_capable); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 64698c4da3a..530d7a66d40 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) * Validate option value, when we can do so without any context. */ if (strcmp(def->defname, "use_remote_estimate") == 0 || - strcmp(def->defname, "updatable") == 0) + strcmp(def->defname, "updatable") == 0 || + strcmp(def->defname, "async_capable") == 0) { /* these accept only boolean values */ (void) defGetBoolean(def); @@ -217,6 +218,9 @@ InitPgFdwOptions(void) /* batch_size is available on both server and table */ {"batch_size", ForeignServerRelationId, false}, {"batch_size", ForeignTableRelationId, false}, + /* async_capable is available on both server and table */ + {"async_capable", ForeignServerRelationId, false}, + {"async_capable", ForeignTableRelationId, false}, {"password_required", UserMappingRelationId, false}, /* diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 20b25935ce6..cc73a6902f5 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -37,6 +38,7 @@ #include "optimizer/tlist.h" #include "parser/parsetree.h" #include "postgres_fdw.h" +#include "storage/latch.h" #include "utils/builtins.h" #include "utils/float.h" #include "utils/guc.h" @@ -143,6 +145,7 @@ typedef struct PgFdwScanState /* for remote query execution */ PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ @@ -159,6 +162,9 @@ typedef struct PgFdwScanState int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + /* for asynchronous execution */ + bool async_capable; /* engage asynchronous-capable logic? */ + /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -176,6 +182,7 @@ typedef struct PgFdwModifyState /* for remote query execution */ PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState /* for remote query execution */ PGconn *conn; /* connection for the update */ + PgFdwConnState *conn_state; /* extra per-connection state */ int numParams; /* number of parameters passed to query */ FmgrInfo *param_flinfo; /* output conversion functions for them */ List *param_exprs; /* executable expressions for param values */ @@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static void postgresForeignAsyncRequest(AsyncRequest *areq); +static void postgresForeignAsyncConfigureWait(AsyncRequest *areq); +static void postgresForeignAsyncNotify(AsyncRequest *areq); /* * Helper functions @@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, void *arg); static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); -static void close_cursor(PGconn *conn, unsigned int cursor_number); +static void close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state); static PgFdwModifyState *create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *resultRelInfo, @@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, double *totaldeadrows); static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); +static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); +static void fetch_more_data_begin(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for asynchronous execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncRequest = postgresForeignAsyncRequest; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + routine->ForeignAsyncNotify = postgresForeignAsyncNotify; + PG_RETURN_POINTER(routine); } @@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root, /* * Extract user-settable option values. Note that per-table settings of - * use_remote_estimate and fetch_size override per-server settings of - * them, respectively. + * use_remote_estimate, fetch_size and async_capable override per-server + * settings of them, respectively. */ fpinfo->use_remote_estimate = false; fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; fpinfo->fetch_size = 100; + fpinfo->async_capable = false; apply_server_options(fpinfo); apply_table_options(fpinfo); @@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); + + /* Set the async-capable flag */ + fsstate->async_capable = node->ss.ps.plan->async_capable; } /* @@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node) TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. + * In sync mode, if this is the first call after Begin or ReScan, we need + * to create the cursor on the remote side. In async mode, we would have + * already created the cursor before we get here, even if this is the + * first call after Begin or ReScan. */ if (!fsstate->cursor_exists) create_cursor(node); @@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node) */ if (fsstate->next_tuple >= fsstate->num_tuples) { + /* In async mode, just clear tuple slot. */ + if (fsstate->async_capable) + return ExecClearTuple(slot); /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); @@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node) /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) - close_cursor(fsstate->conn, fsstate->cursor_number); + close_cursor(fsstate->conn, fsstate->cursor_number, + fsstate->conn_state); /* Release remote connection */ ReleaseConnection(fsstate->conn); @@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, &dmstate->conn_state); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. */ - res = pgfdw_exec_query(conn, sql); + res = pgfdw_exec_query(conn, sql, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node) StringInfoData buf; PGresult *res; + /* First, process a pending asynchronous request, if any. */ + if (fsstate->conn_state->pendingAreq) + process_pending_request(fsstate->conn_state->pendingAreq); + /* * Construct array of query parameter values in text format. We do the * conversions in the short-lived per-tuple context, so as not to cause a @@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node) PG_TRY(); { PGconn *conn = fsstate->conn; - char sql[64]; int numrows; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + if (fsstate->async_capable) + { + Assert(fsstate->conn_state->pendingAreq); - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = pgfdw_get_result(conn, fsstate->query); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + + /* Reset per-connection state */ + fsstate->conn_state->pendingAreq = NULL; + } + else + { + char sql[64]; + + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + res = pgfdw_exec_query(conn, sql, fsstate->conn_state); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel) * Utility routine to close a cursor. */ static void -close_cursor(PGconn *conn, unsigned int cursor_number) +close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state) { char sql[64]; PGresult *res; @@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(conn, sql); + res = pgfdw_exec_query(conn, sql, conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, &fmstate->conn_state); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + /* First, process a pending asynchronous request, if any. */ + if (fmstate->conn_state->pendingAreq) + process_pending_request(fmstate->conn_state->pendingAreq); + /* * If the existing query was deparsed and prepared for a different number * of rows, rebuild it for the proper number. @@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) char *p_name; PGresult *res; + /* + * The caller would already have processed a pending asynchronous request + * if any, so no need to do it here. + */ + /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", GetPrepStmtNumber(fmstate->conn)); @@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node) int numParams = dmstate->numParams; const char **values = dmstate->param_values; + /* First, process a pending asynchronous request, if any. */ + if (dmstate->conn_state->pendingAreq) + process_pending_request(dmstate->conn_state->pendingAreq); + /* * Construct array of query parameter values in text format. */ @@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. @@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = pgfdw_exec_query(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct cursor that retrieves whole rows from remote. @@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, int fetch_size; ListCell *lc; - res = pgfdw_exec_query(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, */ /* Fetch some rows */ - res = pgfdw_exec_query(conn, fetch_sql); + res = pgfdw_exec_query(conn, fetch_sql, NULL); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, } /* Close the cursor, just to be tidy. */ - close_cursor(conn, cursor_number); + close_cursor(conn, cursor_number, NULL); } PG_CATCH(); { @@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, NULL); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = pgfdw_exec_query(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = pgfdw_exec_query(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo) ExtractExtensionList(defGetString(def), false); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); } } @@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo) fpinfo->use_remote_estimate = defGetBoolean(def); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); } } @@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; + fpinfo->async_capable = fpinfo_o->async_capable; /* Merge the table level options from either side of the join. */ if (fpinfo_i) @@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, * relation sizes. */ fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); + + /* + * We'll prefer to consider this join async-capable if any table from + * either side of the join is considered async-capable. + */ + fpinfo->async_capable = fpinfo_o->async_capable || + fpinfo_i->async_capable; } } @@ -6490,6 +6572,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, } /* + * postgresIsForeignPathAsyncCapable + * Check whether a given ForeignPath node is async-capable. + */ +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + RelOptInfo *rel = ((Path *) path)->parent; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + + return fpinfo->async_capable; +} + +/* + * postgresForeignAsyncRequest + * Asynchronously request next tuple from a foreign PostgreSQL table. + */ +static void +postgresForeignAsyncRequest(AsyncRequest *areq) +{ + produce_tuple_asynchronously(areq, true); +} + +/* + * postgresForeignAsyncConfigureWait + * Configure a file descriptor event for which we wish to wait. + */ +static void +postgresForeignAsyncConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + AppendState *requestor = (AppendState *) areq->requestor; + WaitEventSet *set = requestor->as_eventset; + + /* This should not be called unless callback_pending */ + Assert(areq->callback_pending); + + /* The core code would have registered postmaster death event */ + Assert(GetNumRegisteredWaitEvents(set) >= 1); + + /* Begin an asynchronous data fetch if not already done */ + if (!pendingAreq) + fetch_more_data_begin(areq); + else if (pendingAreq->requestor != areq->requestor) + { + /* + * This is the case when the in-process request was made by another + * Append. Note that it might be useless to process the request, + * because the query might not need tuples from that Append anymore. + * Skip the given request if there are any configured events other + * than the postmaster death event; otherwise process the request, + * then begin a fetch to configure the event below, because otherwise + * we might end up with no configured events other than the postmaster + * death event. + */ + if (GetNumRegisteredWaitEvents(set) > 1) + return; + process_pending_request(pendingAreq); + fetch_more_data_begin(areq); + } + else if (pendingAreq->requestee != areq->requestee) + { + /* + * This is the case when the in-process request was made by the same + * parent but for a different child. Since we configure only the + * event for the request made for that child, skip the given request. + */ + return; + } + else + Assert(pendingAreq == areq); + + AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), + NULL, areq); +} + +/* + * postgresForeignAsyncNotify + * Fetch some more tuples from a file descriptor that becomes ready, + * requesting next tuple. + */ +static void +postgresForeignAsyncNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + /* The core code would have initialized the callback_pending flag */ + Assert(!areq->callback_pending); + + /* On error, report the original query, not the FETCH. */ + if (!PQconsumeInput(fsstate->conn)) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + fetch_more_data(node); + + produce_tuple_asynchronously(areq, true); +} + +/* + * Asynchronously produce next tuple from a foreign PostgreSQL table. + */ +static void +produce_tuple_asynchronously(AsyncRequest *areq, bool fetch) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + TupleTableSlot *result; + + /* This should not be called if the request is currently in-process */ + Assert(areq != pendingAreq); + + /* Fetch some more tuples, if we've run out */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* No point in another fetch if we already detected EOF, though */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } + return; + } + + /* Get a tuple from the ForeignScan node */ + result = ExecProcNode((PlanState *) node); + if (!TupIsNull(result)) + { + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + return; + } + Assert(fsstate->next_tuple >= fsstate->num_tuples); + + /* Fetch some more tuples, if we've not detected EOF yet */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } +} + +/* + * Begin an asynchronous data fetch. + * + * Note: fetch_more_data must be called to fetch the result. + */ +static void +fetch_more_data_begin(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + char sql[64]; + + Assert(!fsstate->conn_state->pendingAreq); + + /* Create the cursor synchronously. */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (PQsendQuery(fsstate->conn, sql) < 0) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + /* Remember that the request is in process */ + fsstate->conn_state->pendingAreq = areq; +} + +/* + * Process a pending asynchronous request. + */ +void +process_pending_request(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + EState *estate = node->ss.ps.state; + MemoryContext oldcontext; + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); + + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + + fetch_more_data(node); + + /* We need to send a new query afterwards; don't fetch */ + produce_tuple_asynchronously(areq, false); + + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + + MemoryContextSwitchTo(oldcontext); +} + +/* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 1f67b4d9fd2..88d94da6f6b 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -16,6 +16,7 @@ #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "libpq-fe.h" +#include "nodes/execnodes.h" #include "nodes/pathnodes.h" #include "utils/relcache.h" @@ -78,6 +79,7 @@ typedef struct PgFdwRelationInfo Cost fdw_startup_cost; Cost fdw_tuple_cost; List *shippable_extensions; /* OIDs of shippable extensions */ + bool async_capable; /* Cached catalog information. */ ForeignTable *table; @@ -124,17 +126,28 @@ typedef struct PgFdwRelationInfo int relation_index; } PgFdwRelationInfo; +/* + * Extra control information relating to a connection. + */ +typedef struct PgFdwConnState +{ + AsyncRequest *pendingAreq; /* pending async request */ +} PgFdwConnState; + /* in postgres_fdw.c */ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); +extern void process_pending_request(AsyncRequest *areq); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + PgFdwConnState **state); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); -extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query); +extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, + PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index e9b30517a51..806a5bca28c 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2928,3 +2928,198 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test; -- Clean up DROP TABLE batch_table, batch_cp_upd_test CASCADE; + +-- =================================================================== +-- test asynchronous execution +-- =================================================================== + +ALTER SERVER loopback OPTIONS (DROP extensions); +ALTER SERVER loopback OPTIONS (ADD async_capable 'true'); +ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true'); + +CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a); +CREATE TABLE base_tbl1 (a int, b int, c text); +CREATE TABLE base_tbl2 (a int, b int, c text); +CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000) + SERVER loopback OPTIONS (table_name 'base_tbl1'); +CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000) + SERVER loopback2 OPTIONS (table_name 'base_tbl2'); +INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; + +-- simple queries +CREATE TABLE result_tbl (a int, b int, c text); + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0; +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0; + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +-- Check case where multiple partitions use the same connection +CREATE TABLE base_tbl3 (a int, b int, c text); +CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000) + SERVER loopback2 OPTIONS (table_name 'base_tbl3'); +INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +DROP FOREIGN TABLE async_p3; +DROP TABLE base_tbl3; + +-- Check case where the partitioned table has local/remote partitions +CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000); +INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i; +ANALYZE async_pt; + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; +INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +-- partitionwise joins +SET enable_partitionwise_join TO true; + +CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text); + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; +INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0; + +SELECT * FROM join_tbl ORDER BY a1; +DELETE FROM join_tbl; + +RESET enable_partitionwise_join; + +-- Test interaction of async execution with plan-time partition pruning +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 3000; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 2000; + +-- Test interaction of async execution with run-time partition pruning +SET plan_cache_mode TO force_generic_plan; + +PREPARE async_pt_query (int, int) AS + INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2; + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_query (3000, 505); +EXECUTE async_pt_query (3000, 505); + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_query (2000, 505); +EXECUTE async_pt_query (2000, 505); + +SELECT * FROM result_tbl ORDER BY a; +DELETE FROM result_tbl; + +RESET plan_cache_mode; + +CREATE TABLE local_tbl(a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar'); +ANALYZE local_tbl; + +CREATE INDEX base_tbl1_idx ON base_tbl1 (a); +CREATE INDEX base_tbl2_idx ON base_tbl2 (a); +CREATE INDEX async_p3_idx ON async_p3 (a); +ANALYZE base_tbl1; +ANALYZE base_tbl2; +ANALYZE async_p3; + +ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true'); +ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true'); + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; +SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; + +ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate); +ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate); + +DROP TABLE local_tbl; +DROP INDEX base_tbl1_idx; +DROP INDEX base_tbl2_idx; +DROP INDEX async_p3_idx; + +-- Test that pending requests are processed properly +SET enable_mergejoin TO false; +SET enable_hashjoin TO false; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; +SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; +SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1; + +-- Check with foreign modify +CREATE TABLE local_tbl (a int, b int, c text); +INSERT INTO local_tbl VALUES (1505, 505, 'foo'); + +CREATE TABLE base_tbl3 (a int, b int, c text); +CREATE FOREIGN TABLE remote_tbl (a int, b int, c text) + SERVER loopback OPTIONS (table_name 'base_tbl3'); +INSERT INTO remote_tbl VALUES (2505, 505, 'bar'); + +CREATE TABLE base_tbl4 (a int, b int, c text); +CREATE FOREIGN TABLE insert_tbl (a int, b int, c text) + SERVER loopback OPTIONS (table_name 'base_tbl4'); + +EXPLAIN (VERBOSE, COSTS OFF) +INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl); +INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl); + +SELECT * FROM insert_tbl ORDER BY a; + +-- Check with direct modify +EXPLAIN (VERBOSE, COSTS OFF) +WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *) +INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505; +WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *) +INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505; + +SELECT * FROM join_tbl ORDER BY a1; +DELETE FROM join_tbl; + +RESET enable_mergejoin; +RESET enable_hashjoin; + +-- Clean up +DROP TABLE async_pt; +DROP TABLE base_tbl1; +DROP TABLE base_tbl2; +DROP TABLE result_tbl; +DROP TABLE local_tbl; +DROP FOREIGN TABLE remote_tbl; +DROP FOREIGN TABLE insert_tbl; +DROP TABLE base_tbl3; +DROP TABLE base_tbl4; +DROP TABLE join_tbl; + +ALTER SERVER loopback OPTIONS (DROP async_capable); +ALTER SERVER loopback2 OPTIONS (DROP async_capable); |