diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 374 |
1 files changed, 343 insertions, 31 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 20b25935ce6..cc73a6902f5 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -37,6 +38,7 @@ #include "optimizer/tlist.h" #include "parser/parsetree.h" #include "postgres_fdw.h" +#include "storage/latch.h" #include "utils/builtins.h" #include "utils/float.h" #include "utils/guc.h" @@ -143,6 +145,7 @@ typedef struct PgFdwScanState /* for remote query execution */ PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ unsigned int cursor_number; /* quasi-unique ID for my cursor */ bool cursor_exists; /* have we created the cursor? */ int numParams; /* number of parameters passed to query */ @@ -159,6 +162,9 @@ typedef struct PgFdwScanState int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + /* for asynchronous execution */ + bool async_capable; /* engage asynchronous-capable logic? */ + /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -176,6 +182,7 @@ typedef struct PgFdwModifyState /* for remote query execution */ PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ char *p_name; /* name of prepared statement, if created */ /* extracted fdw_private data */ @@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState /* for remote query execution */ PGconn *conn; /* connection for the update */ + PgFdwConnState *conn_state; /* extra per-connection state */ int numParams; /* number of parameters passed to query */ FmgrInfo *param_flinfo; /* output conversion functions for them */ List *param_exprs; /* executable expressions for param values */ @@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static void postgresForeignAsyncRequest(AsyncRequest *areq); +static void postgresForeignAsyncConfigureWait(AsyncRequest *areq); +static void postgresForeignAsyncNotify(AsyncRequest *areq); /* * Helper functions @@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, void *arg); static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); -static void close_cursor(PGconn *conn, unsigned int cursor_number); +static void close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state); static PgFdwModifyState *create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *resultRelInfo, @@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, double *totaldeadrows); static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); +static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); +static void fetch_more_data_begin(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for asynchronous execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncRequest = postgresForeignAsyncRequest; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + routine->ForeignAsyncNotify = postgresForeignAsyncNotify; + PG_RETURN_POINTER(routine); } @@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root, /* * Extract user-settable option values. Note that per-table settings of - * use_remote_estimate and fetch_size override per-server settings of - * them, respectively. + * use_remote_estimate, fetch_size and async_capable override per-server + * settings of them, respectively. */ fpinfo->use_remote_estimate = false; fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; fpinfo->shippable_extensions = NIL; fpinfo->fetch_size = 100; + fpinfo->async_capable = false; apply_server_options(fpinfo); apply_table_options(fpinfo); @@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); + + /* Set the async-capable flag */ + fsstate->async_capable = node->ss.ps.plan->async_capable; } /* @@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node) TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. + * In sync mode, if this is the first call after Begin or ReScan, we need + * to create the cursor on the remote side. In async mode, we would have + * already created the cursor before we get here, even if this is the + * first call after Begin or ReScan. */ if (!fsstate->cursor_exists) create_cursor(node); @@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node) */ if (fsstate->next_tuple >= fsstate->num_tuples) { + /* In async mode, just clear tuple slot. */ + if (fsstate->async_capable) + return ExecClearTuple(slot); /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); @@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fsstate->conn, sql); + res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); PQclear(res); @@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node) /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) - close_cursor(fsstate->conn, fsstate->cursor_number); + close_cursor(fsstate->conn, fsstate->cursor_number, + fsstate->conn_state); /* Release remote connection */ ReleaseConnection(fsstate->conn); @@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, &dmstate->conn_state); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn, /* * Execute EXPLAIN remotely. */ - res = pgfdw_exec_query(conn, sql); + res = pgfdw_exec_query(conn, sql, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql); @@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node) StringInfoData buf; PGresult *res; + /* First, process a pending asynchronous request, if any. */ + if (fsstate->conn_state->pendingAreq) + process_pending_request(fsstate->conn_state->pendingAreq); + /* * Construct array of query parameter values in text format. We do the * conversions in the short-lived per-tuple context, so as not to cause a @@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node) PG_TRY(); { PGconn *conn = fsstate->conn; - char sql[64]; int numrows; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + if (fsstate->async_capable) + { + Assert(fsstate->conn_state->pendingAreq); - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = pgfdw_get_result(conn, fsstate->query); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + + /* Reset per-connection state */ + fsstate->conn_state->pendingAreq = NULL; + } + else + { + char sql[64]; + + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + res = pgfdw_exec_query(conn, sql, fsstate->conn_state); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel) * Utility routine to close a cursor. */ static void -close_cursor(PGconn *conn, unsigned int cursor_number) +close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state) { char sql[64]; PGresult *res; @@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(conn, sql); + res = pgfdw_exec_query(conn, sql, conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, &fmstate->conn_state); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate, operation == CMD_UPDATE || operation == CMD_DELETE); + /* First, process a pending asynchronous request, if any. */ + if (fmstate->conn_state->pendingAreq) + process_pending_request(fmstate->conn_state->pendingAreq); + /* * If the existing query was deparsed and prepared for a different number * of rows, rebuild it for the proper number. @@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) char *p_name; PGresult *res; + /* + * The caller would already have processed a pending asynchronous request + * if any, so no need to do it here. + */ + /* Construct name we'll use for the prepared statement. */ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", GetPrepStmtNumber(fmstate->conn)); @@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_exec_query(fmstate->conn, sql); + res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); PQclear(res); @@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node) int numParams = dmstate->numParams; const char **values = dmstate->param_values; + /* First, process a pending asynchronous request, if any. */ + if (dmstate->conn_state->pendingAreq) + process_pending_request(dmstate->conn_state->pendingAreq); + /* * Construct array of query parameter values in text format. */ @@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. @@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation, /* In what follows, do not risk leaking any PGresults. */ PG_TRY(); { - res = pgfdw_exec_query(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct cursor that retrieves whole rows from remote. @@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, int fetch_size; ListCell *lc; - res = pgfdw_exec_query(conn, sql.data); + res = pgfdw_exec_query(conn, sql.data, NULL); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); PQclear(res); @@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, */ /* Fetch some rows */ - res = pgfdw_exec_query(conn, fetch_sql); + res = pgfdw_exec_query(conn, fetch_sql, NULL); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, sql.data); @@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, } /* Close the cursor, just to be tidy. */ - close_cursor(conn, cursor_number); + close_cursor(conn, cursor_number, NULL); } PG_CATCH(); { @@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, NULL); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); deparseStringLiteral(&buf, stmt->remote_schema); - res = pgfdw_exec_query(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); /* Fetch the data */ - res = pgfdw_exec_query(conn, buf.data); + res = pgfdw_exec_query(conn, buf.data, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, buf.data); @@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo) ExtractExtensionList(defGetString(def), false); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); } } @@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo) fpinfo->use_remote_estimate = defGetBoolean(def); else if (strcmp(def->defname, "fetch_size") == 0) fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); } } @@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; fpinfo->fetch_size = fpinfo_o->fetch_size; + fpinfo->async_capable = fpinfo_o->async_capable; /* Merge the table level options from either side of the join. */ if (fpinfo_i) @@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo, * relation sizes. */ fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); + + /* + * We'll prefer to consider this join async-capable if any table from + * either side of the join is considered async-capable. + */ + fpinfo->async_capable = fpinfo_o->async_capable || + fpinfo_i->async_capable; } } @@ -6490,6 +6572,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, } /* + * postgresIsForeignPathAsyncCapable + * Check whether a given ForeignPath node is async-capable. + */ +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + RelOptInfo *rel = ((Path *) path)->parent; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + + return fpinfo->async_capable; +} + +/* + * postgresForeignAsyncRequest + * Asynchronously request next tuple from a foreign PostgreSQL table. + */ +static void +postgresForeignAsyncRequest(AsyncRequest *areq) +{ + produce_tuple_asynchronously(areq, true); +} + +/* + * postgresForeignAsyncConfigureWait + * Configure a file descriptor event for which we wish to wait. + */ +static void +postgresForeignAsyncConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + AppendState *requestor = (AppendState *) areq->requestor; + WaitEventSet *set = requestor->as_eventset; + + /* This should not be called unless callback_pending */ + Assert(areq->callback_pending); + + /* The core code would have registered postmaster death event */ + Assert(GetNumRegisteredWaitEvents(set) >= 1); + + /* Begin an asynchronous data fetch if not already done */ + if (!pendingAreq) + fetch_more_data_begin(areq); + else if (pendingAreq->requestor != areq->requestor) + { + /* + * This is the case when the in-process request was made by another + * Append. Note that it might be useless to process the request, + * because the query might not need tuples from that Append anymore. + * Skip the given request if there are any configured events other + * than the postmaster death event; otherwise process the request, + * then begin a fetch to configure the event below, because otherwise + * we might end up with no configured events other than the postmaster + * death event. + */ + if (GetNumRegisteredWaitEvents(set) > 1) + return; + process_pending_request(pendingAreq); + fetch_more_data_begin(areq); + } + else if (pendingAreq->requestee != areq->requestee) + { + /* + * This is the case when the in-process request was made by the same + * parent but for a different child. Since we configure only the + * event for the request made for that child, skip the given request. + */ + return; + } + else + Assert(pendingAreq == areq); + + AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), + NULL, areq); +} + +/* + * postgresForeignAsyncNotify + * Fetch some more tuples from a file descriptor that becomes ready, + * requesting next tuple. + */ +static void +postgresForeignAsyncNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + /* The core code would have initialized the callback_pending flag */ + Assert(!areq->callback_pending); + + /* On error, report the original query, not the FETCH. */ + if (!PQconsumeInput(fsstate->conn)) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + fetch_more_data(node); + + produce_tuple_asynchronously(areq, true); +} + +/* + * Asynchronously produce next tuple from a foreign PostgreSQL table. + */ +static void +produce_tuple_asynchronously(AsyncRequest *areq, bool fetch) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + TupleTableSlot *result; + + /* This should not be called if the request is currently in-process */ + Assert(areq != pendingAreq); + + /* Fetch some more tuples, if we've run out */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* No point in another fetch if we already detected EOF, though */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } + return; + } + + /* Get a tuple from the ForeignScan node */ + result = ExecProcNode((PlanState *) node); + if (!TupIsNull(result)) + { + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + return; + } + Assert(fsstate->next_tuple >= fsstate->num_tuples); + + /* Fetch some more tuples, if we've not detected EOF yet */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } +} + +/* + * Begin an asynchronous data fetch. + * + * Note: fetch_more_data must be called to fetch the result. + */ +static void +fetch_more_data_begin(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + char sql[64]; + + Assert(!fsstate->conn_state->pendingAreq); + + /* Create the cursor synchronously. */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (PQsendQuery(fsstate->conn, sql) < 0) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + /* Remember that the request is in process */ + fsstate->conn_state->pendingAreq = areq; +} + +/* + * Process a pending asynchronous request. + */ +void +process_pending_request(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + EState *estate = node->ss.ps.state; + MemoryContext oldcontext; + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + oldcontext = MemoryContextSwitchTo(estate->es_query_cxt); + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); + + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + + fetch_more_data(node); + + /* We need to send a new query afterwards; don't fetch */ + produce_tuple_asynchronously(areq, false); + + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + + MemoryContextSwitchTo(oldcontext); +} + +/* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is |