diff options
author | Etsuro Fujita <efujita@postgresql.org> | 2021-03-31 18:45:00 +0900 |
---|---|---|
committer | Etsuro Fujita <efujita@postgresql.org> | 2021-03-31 18:45:00 +0900 |
commit | 27e1f14563cf982f1f4d71e21ef247866662a052 (patch) | |
tree | 4e1a17a61abbfc67a471760cc84e1e46182bfb9d /contrib/postgres_fdw/postgres_fdw.c | |
parent | 66392d396508c91c2ec07a61568bf96acb663ad8 (diff) | |
download | postgresql-27e1f14563cf982f1f4d71e21ef247866662a052.tar.gz postgresql-27e1f14563cf982f1f4d71e21ef247866662a052.zip |
Add support for asynchronous execution.
This implements asynchronous execution, which runs multiple parts of a
non-parallel-aware Append concurrently rather than serially to improve
performance when possible. Currently, the only node type that can be
run concurrently is a ForeignScan that is an immediate child of such an
Append. In the case where such ForeignScans access data on different
remote servers, this would run those ForeignScans concurrently, and
overlap the remote operations to be performed simultaneously, so it'll
improve the performance especially when the operations involve
time-consuming ones such as remote join and remote aggregation.
We may extend this to other node types such as joins or aggregates over
ForeignScans in the future.
This also adds the support for postgres_fdw, which is enabled by the
table-level/server-level option "async_capable". The default is false.
Robert Haas, Kyotaro Horiguchi, Thomas Munro, and myself. This commit
is mostly based on the patch proposed by Robert Haas, but also uses
stuff from the patch proposed by Kyotaro Horiguchi and from the patch
proposed by Thomas Munro. Reviewed by Kyotaro Horiguchi, Konstantin
Knizhnik, Andrey Lepikhov, Movead Li, Thomas Munro, Justin Pryzby, and
others.
Discussion: https://postgr.es/m/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com
Discussion: https://postgr.es/m/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.com
Discussion: https://postgr.es/m/20200228.170650.667613673625155850.horikyota.ntt%40gmail.com
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 374 |
1 files changed, 343 insertions, 31 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 20b25935ce6..cc73a6902f5 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -37,6 +38,7 @@ #include "optimizer/tlist.h" #include "parser/parsetree.h" #include "postgres_fdw.h" +#include "storage/latch.h" #include "utils/builtins.h" #include "utils/float.h" #include "utils/guc.h" @@ -143,6 +145,7 @@ typedef struct PgFdwScanState /* for remote query execution */ PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ @@ -159,6 +162,9 @@ typedef struct PgFdwScanState int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + /* for asynchronous execution */ + bool async_capable; /* engage asynchronous-capable logic? */ + /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -176,6 +182,7 @@ typedef struct PgFdwModifyState /* for remote query execution */ PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState /* for remote query execution */ PGconn *conn; /* connection for the update */ + PgFdwConnState *conn_state; /* extra per-connection state */ int numParams; /* number of parameters passed to query */ FmgrInfo *param_flinfo; /* output conversion functions for them */ List *param_exprs; /* executable expressions for param values */ @@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static void postgresForeignAsyncRequest(AsyncRequest *areq); +static void postgresForeignAsyncConfigureWait(AsyncRequest *areq); +static void postgresForeignAsyncNotify(AsyncRequest *areq); /* * Helper functions @@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, void *arg); static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); -static void close_cursor(PGconn *conn, unsigned int cursor_number); +static void close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state); static PgFdwModifyState *create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *resultRelInfo, @@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, double *totaldeadrows); static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); +static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); +static void fetch_more_data_begin(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for asynchronous execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncRequest = postgresForeignAsyncRequest; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + routine->ForeignAsyncNotify = postgresForeignAsyncNotify; + PG_RETURN_POINTER(routine); } @@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root, /* * Extract user-settable option values. Note that per-table settings of - * use_remote_estimate and fetch_size override per-server settings of - * them, respectively. + * use_remote_estimate, fetch_size and async_capable override per-server + * settings of them, respectively. */ fpinfo->use_remote_estimate = false; fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; fpinfo->fetch_size = 100; + fpinfo->async_capable = false; apply_server_options(fpinfo); apply_table_options(fpinfo); @@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); + + /* Set the async-capable flag */ + fsstate->async_capable = node->ss.ps.plan->async_capable; } /* @@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node) TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. + * In sync mode, if this is the first call after Begin or ReScan, we need + * to create the cursor on the remote side. In async mode, we would have + * already created the cursor before we get here, even if this is the + * first call after Begin or ReScan. */ if (!fsstate->cursor_exists) create_cursor(node); @@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node) */ if (fsstate->next_tuple >= fsstate->num_tuples) { + /* In async mode, just clear tuple slot. */ + if (fsstate->async_capable) + return ExecClearTuple(slot); /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); @@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node) /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) - close_cursor(fsstate->conn, fsstate->cursor_number); + close_cursor(fsstate->conn, fsstate->cursor_number, + fsstate->conn_state); /* Release remote connection */ ReleaseConnection(fsstate->conn); @@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, &dmstate->conn_state); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. */ - res = pgfdw_exec_query(conn, sql); + res = pgfdw_exec_query(conn, sql, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node) StringInfoData buf; PGresult *res; + /* First, process a pending asynchronous request, if any. */ + if (fsstate->conn_state->pendingAreq) + process_pending_request(fsstate->conn_state->pendingAreq); + /* * Construct array of query parameter values in text format. We do the * conversions in the short-lived per-tuple context, so as not to cause a @@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node) PG_TRY(); { PGconn *conn = fsstate->conn; - char sql[64]; int numrows; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + if (fsstate->async_capable) + { + Assert(fsstate->conn_state->pendingAreq); - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = pgfdw_get_result(conn, fsstate->query); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + + /* Reset per-connection state */ + fsstate->conn_state->pendingAreq = NULL; + } + else + { + char sql[64]; + + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + res = pgfdw_exec_query(conn, sql, fsstate->conn_state); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel) * Utility routine to close a cursor. */ static void -close_cursor(PGconn *conn, unsigned int cursor_number) +close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state) { char sql[64]; PGresult *res; @@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(conn, sql); + res = pgfdw_exec_query(conn, sql, conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, &fmstate->conn_state); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + /* First, process a pending asynchronous request, if any. */ + if (fmstate->conn_state->pendingAreq) + process_pending_request(fmstate->conn_state->pendingAreq); + /* * If the existing query was deparsed and prepared for a different number * of rows, rebuild it for the proper number. @@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) char *p_name; PGresult *res; + /* + * The caller would already have processed a pending asynchronous request + * if any, so no need to do it here. + */ + /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", GetPrepStmtNumber(fmstate->conn)); @@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node) int numParams = dmstate->numParams; const char **values = dmstate->param_values; + /* First, process a pending asynchronous request, if any. */ + if (dmstate->conn_state->pendingAreq) + process_pending_request(dmstate->conn_state->pendingAreq); + /* * Construct array of query parameter values in text format. */ @@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. @@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = pgfdw_exec_query(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct cursor that retrieves whole rows from remote. @@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, int fetch_size; ListCell *lc; - res = pgfdw_exec_query(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, */ /* Fetch some rows */ - res = pgfdw_exec_query(conn, fetch_sql); + res = pgfdw_exec_query(conn, fetch_sql, NULL); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, } /* Close the cursor, just to be tidy. */ - close_cursor(conn, cursor_number); + close_cursor(conn, cursor_number, NULL); } PG_CATCH(); { @@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, NULL); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = pgfdw_exec_query(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = pgfdw_exec_query(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo) ExtractExtensionList(defGetString(def), false); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); } } @@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo) fpinfo->use_remote_estimate = defGetBoolean(def); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); } } @@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; + fpinfo->async_capable = fpinfo_o->async_capable; /* Merge the table level options from either side of the join. */ if (fpinfo_i) @@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, * relation sizes. */ fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); + + /* + * We'll prefer to consider this join async-capable if any table from + * either side of the join is considered async-capable. + */ + fpinfo->async_capable = fpinfo_o->async_capable || + fpinfo_i->async_capable; } } @@ -6490,6 +6572,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, } /* + * postgresIsForeignPathAsyncCapable + * Check whether a given ForeignPath node is async-capable. + */ +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + RelOptInfo *rel = ((Path *) path)->parent; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + + return fpinfo->async_capable; +} + +/* + * postgresForeignAsyncRequest + * Asynchronously request next tuple from a foreign PostgreSQL table. + */ +static void +postgresForeignAsyncRequest(AsyncRequest *areq) +{ + produce_tuple_asynchronously(areq, true); +} + +/* + * postgresForeignAsyncConfigureWait + * Configure a file descriptor event for which we wish to wait. + */ +static void +postgresForeignAsyncConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + AppendState *requestor = (AppendState *) areq->requestor; + WaitEventSet *set = requestor->as_eventset; + + /* This should not be called unless callback_pending */ + Assert(areq->callback_pending); + + /* The core code would have registered postmaster death event */ + Assert(GetNumRegisteredWaitEvents(set) >= 1); + + /* Begin an asynchronous data fetch if not already done */ + if (!pendingAreq) + fetch_more_data_begin(areq); + else if (pendingAreq->requestor != areq->requestor) + { + /* + * This is the case when the in-process request was made by another + * Append. Note that it might be useless to process the request, + * because the query might not need tuples from that Append anymore. + * Skip the given request if there are any configured events other + * than the postmaster death event; otherwise process the request, + * then begin a fetch to configure the event below, because otherwise + * we might end up with no configured events other than the postmaster + * death event. + */ + if (GetNumRegisteredWaitEvents(set) > 1) + return; + process_pending_request(pendingAreq); + fetch_more_data_begin(areq); + } + else if (pendingAreq->requestee != areq->requestee) + { + /* + * This is the case when the in-process request was made by the same + * parent but for a different child. Since we configure only the + * event for the request made for that child, skip the given request. + */ + return; + } + else + Assert(pendingAreq == areq); + + AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), + NULL, areq); +} + +/* + * postgresForeignAsyncNotify + * Fetch some more tuples from a file descriptor that becomes ready, + * requesting next tuple. + */ +static void +postgresForeignAsyncNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + /* The core code would have initialized the callback_pending flag */ + Assert(!areq->callback_pending); + + /* On error, report the original query, not the FETCH. */ + if (!PQconsumeInput(fsstate->conn)) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + fetch_more_data(node); + + produce_tuple_asynchronously(areq, true); +} + +/* + * Asynchronously produce next tuple from a foreign PostgreSQL table. + */ +static void +produce_tuple_asynchronously(AsyncRequest *areq, bool fetch) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + TupleTableSlot *result; + + /* This should not be called if the request is currently in-process */ + Assert(areq != pendingAreq); + + /* Fetch some more tuples, if we've run out */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* No point in another fetch if we already detected EOF, though */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } + return; + } + + /* Get a tuple from the ForeignScan node */ + result = ExecProcNode((PlanState *) node); + if (!TupIsNull(result)) + { + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + return; + } + Assert(fsstate->next_tuple >= fsstate->num_tuples); + + /* Fetch some more tuples, if we've not detected EOF yet */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } +} + +/* + * Begin an asynchronous data fetch. + * + * Note: fetch_more_data must be called to fetch the result. + */ +static void +fetch_more_data_begin(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + char sql[64]; + + Assert(!fsstate->conn_state->pendingAreq); + + /* Create the cursor synchronously. */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (PQsendQuery(fsstate->conn, sql) < 0) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + /* Remember that the request is in process */ + fsstate->conn_state->pendingAreq = areq; +} + +/* + * Process a pending asynchronous request. + */ +void +process_pending_request(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + EState *estate = node->ss.ps.state; + MemoryContext oldcontext; + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); + + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + + fetch_more_data(node); + + /* We need to send a new query afterwards; don't fetch */ + produce_tuple_asynchronously(areq, false); + + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + + MemoryContextSwitchTo(oldcontext); +} + +/* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is |