aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c374
1 files changed, 343 insertions, 31 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 20b25935ce6..cc73a6902f5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execAsync.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -37,6 +38,7 @@
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "postgres_fdw.h"
+#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/float.h"
#include "utils/guc.h"
@@ -143,6 +145,7 @@ typedef struct PgFdwScanState
/* for remote query execution */
PGconn *conn; /* connection for the scan */
+ PgFdwConnState *conn_state; /* extra per-connection state */
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
@@ -159,6 +162,9 @@ typedef struct PgFdwScanState
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ /* for asynchronous execution */
+ bool async_capable; /* engage asynchronous-capable logic? */
+
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@@ -176,6 +182,7 @@ typedef struct PgFdwModifyState
/* for remote query execution */
PGconn *conn; /* connection for the scan */
+ PgFdwConnState *conn_state; /* extra per-connection state */
char *p_name; /* name of prepared statement, if created */
/* extracted fdw_private data */
@@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState
/* for remote query execution */
PGconn *conn; /* connection for the update */
+ PgFdwConnState *conn_state; /* extra per-connection state */
int numParams; /* number of parameters passed to query */
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
@@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *output_rel,
void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static void postgresForeignAsyncRequest(AsyncRequest *areq);
+static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
+static void postgresForeignAsyncNotify(AsyncRequest *areq);
/*
* Helper functions
@@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void close_cursor(PGconn *conn, unsigned int cursor_number,
+ PgFdwConnState *conn_state);
static PgFdwModifyState *create_foreign_modify(EState *estate,
RangeTblEntry *rte,
ResultRelInfo *resultRelInfo,
@@ -491,6 +504,8 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
+static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
+static void fetch_more_data_begin(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
@@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support functions for asynchronous execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+ routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
+
PG_RETURN_POINTER(routine);
}
@@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root,
/*
* Extract user-settable option values. Note that per-table settings of
- * use_remote_estimate and fetch_size override per-server settings of
- * them, respectively.
+ * use_remote_estimate, fetch_size and async_capable override per-server
+ * settings of them, respectively.
*/
fpinfo->use_remote_estimate = false;
fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
fpinfo->shippable_extensions = NIL;
fpinfo->fetch_size = 100;
+ fpinfo->async_capable = false;
apply_server_options(fpinfo);
apply_table_options(fpinfo);
@@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
+
+ /* Set the async-capable flag */
+ fsstate->async_capable = node->ss.ps.plan->async_capable;
}
/*
@@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node)
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
/*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
+ * In sync mode, if this is the first call after Begin or ReScan, we need
+ * to create the cursor on the remote side. In async mode, we would have
+ * already created the cursor before we get here, even if this is the
+ * first call after Begin or ReScan.
*/
if (!fsstate->cursor_exists)
create_cursor(node);
@@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node)
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
+ /* In async mode, just clear tuple slot. */
+ if (fsstate->async_capable)
+ return ExecClearTuple(slot);
/* No point in another fetch if we already detected EOF, though. */
if (!fsstate->eof_reached)
fetch_more_data(node);
@@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
@@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node)
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
- close_cursor(fsstate->conn, fsstate->cursor_number);
+ close_cursor(fsstate->conn, fsstate->cursor_number,
+ fsstate->conn_state);
/* Release remote connection */
ReleaseConnection(fsstate->conn);
@@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root,
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false, NULL);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely.
*/
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_exec_query(conn, sql, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
@@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node)
StringInfoData buf;
PGresult *res;
+ /* First, process a pending asynchronous request, if any. */
+ if (fsstate->conn_state->pendingAreq)
+ process_pending_request(fsstate->conn_state->pendingAreq);
+
/*
* Construct array of query parameter values in text format. We do the
* conversions in the short-lived per-tuple context, so as not to cause a
@@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node)
PG_TRY();
{
PGconn *conn = fsstate->conn;
- char sql[64];
int numrows;
int i;
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
+ if (fsstate->async_capable)
+ {
+ Assert(fsstate->conn_state->pendingAreq);
- res = pgfdw_exec_query(conn, sql);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ /*
+ * The query was already sent by an earlier call to
+ * fetch_more_data_begin. So now we just fetch the result.
+ */
+ res = pgfdw_get_result(conn, fsstate->query);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+
+ /* Reset per-connection state */
+ fsstate->conn_state->pendingAreq = NULL;
+ }
+ else
+ {
+ char sql[64];
+
+ /* This is a regular synchronous fetch. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
@@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel)
* Utility routine to close a cursor.
*/
static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PGconn *conn, unsigned int cursor_number,
+ PgFdwConnState *conn_state)
{
char sql[64];
PGresult *res;
@@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_exec_query(conn, sql, conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate,
operation == CMD_UPDATE ||
operation == CMD_DELETE);
+ /* First, process a pending asynchronous request, if any. */
+ if (fmstate->conn_state->pendingAreq)
+ process_pending_request(fmstate->conn_state->pendingAreq);
+
/*
* If the existing query was deparsed and prepared for a different number
* of rows, rebuild it for the proper number.
@@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
char *p_name;
PGresult *res;
+ /*
+ * The caller would already have processed a pending asynchronous request
+ * if any, so no need to do it here.
+ */
+
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
GetPrepStmtNumber(fmstate->conn));
@@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
@@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node)
int numParams = dmstate->numParams;
const char **values = dmstate->param_values;
+ /* First, process a pending asynchronous request, if any. */
+ if (dmstate->conn_state->pendingAreq)
+ process_pending_request(dmstate->conn_state->pendingAreq);
+
/*
* Construct array of query parameter values in text format.
*/
@@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct command to get page count for relation.
@@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = pgfdw_exec_query(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct cursor that retrieves whole rows from remote.
@@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
int fetch_size;
ListCell *lc;
- res = pgfdw_exec_query(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
@@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
*/
/* Fetch some rows */
- res = pgfdw_exec_query(conn, fetch_sql);
+ res = pgfdw_exec_query(conn, fetch_sql, NULL);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
}
/* Close the cursor, just to be tidy. */
- close_cursor(conn, cursor_number);
+ close_cursor(conn, cursor_number, NULL);
}
PG_CATCH();
{
@@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(mapping, false);
+ conn = GetConnection(mapping, false, NULL);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
@@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
- res = pgfdw_exec_query(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
- res = pgfdw_exec_query(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
ExtractExtensionList(defGetString(def), false);
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+ else if (strcmp(def->defname, "async_capable") == 0)
+ fpinfo->async_capable = defGetBoolean(def);
}
}
@@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
fpinfo->use_remote_estimate = defGetBoolean(def);
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+ else if (strcmp(def->defname, "async_capable") == 0)
+ fpinfo->async_capable = defGetBoolean(def);
}
}
@@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
fpinfo->fetch_size = fpinfo_o->fetch_size;
+ fpinfo->async_capable = fpinfo_o->async_capable;
/* Merge the table level options from either side of the join. */
if (fpinfo_i)
@@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
* relation sizes.
*/
fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
+
+ /*
+ * We'll prefer to consider this join async-capable if any table from
+ * either side of the join is considered async-capable.
+ */
+ fpinfo->async_capable = fpinfo_o->async_capable ||
+ fpinfo_i->async_capable;
}
}
@@ -6490,6 +6572,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
}
/*
+ * postgresIsForeignPathAsyncCapable
+ * Check whether a given ForeignPath node is async-capable.
+ */
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ RelOptInfo *rel = ((Path *) path)->parent;
+ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
+
+ return fpinfo->async_capable;
+}
+
+/*
+ * postgresForeignAsyncRequest
+ * Asynchronously request next tuple from a foreign PostgreSQL table.
+ */
+static void
+postgresForeignAsyncRequest(AsyncRequest *areq)
+{
+ produce_tuple_asynchronously(areq, true);
+}
+
+/*
+ * postgresForeignAsyncConfigureWait
+ * Configure a file descriptor event for which we wish to wait.
+ */
+static void
+postgresForeignAsyncConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+ AppendState *requestor = (AppendState *) areq->requestor;
+ WaitEventSet *set = requestor->as_eventset;
+
+ /* This should not be called unless callback_pending */
+ Assert(areq->callback_pending);
+
+ /* The core code would have registered postmaster death event */
+ Assert(GetNumRegisteredWaitEvents(set) >= 1);
+
+ /* Begin an asynchronous data fetch if not already done */
+ if (!pendingAreq)
+ fetch_more_data_begin(areq);
+ else if (pendingAreq->requestor != areq->requestor)
+ {
+ /*
+ * This is the case when the in-process request was made by another
+ * Append. Note that it might be useless to process the request,
+ * because the query might not need tuples from that Append anymore.
+ * Skip the given request if there are any configured events other
+ * than the postmaster death event; otherwise process the request,
+ * then begin a fetch to configure the event below, because otherwise
+ * we might end up with no configured events other than the postmaster
+ * death event.
+ */
+ if (GetNumRegisteredWaitEvents(set) > 1)
+ return;
+ process_pending_request(pendingAreq);
+ fetch_more_data_begin(areq);
+ }
+ else if (pendingAreq->requestee != areq->requestee)
+ {
+ /*
+ * This is the case when the in-process request was made by the same
+ * parent but for a different child. Since we configure only the
+ * event for the request made for that child, skip the given request.
+ */
+ return;
+ }
+ else
+ Assert(pendingAreq == areq);
+
+ AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
+ NULL, areq);
+}
+
+/*
+ * postgresForeignAsyncNotify
+ * Fetch some more tuples from a file descriptor that becomes ready,
+ * requesting next tuple.
+ */
+static void
+postgresForeignAsyncNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+ /* The request should be currently in-process */
+ Assert(fsstate->conn_state->pendingAreq == areq);
+
+ /* The core code would have initialized the callback_pending flag */
+ Assert(!areq->callback_pending);
+
+ /* On error, report the original query, not the FETCH. */
+ if (!PQconsumeInput(fsstate->conn))
+ pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+ fetch_more_data(node);
+
+ produce_tuple_asynchronously(areq, true);
+}
+
+/*
+ * Asynchronously produce next tuple from a foreign PostgreSQL table.
+ */
+static void
+produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+ TupleTableSlot *result;
+
+ /* This should not be called if the request is currently in-process */
+ Assert(areq != pendingAreq);
+
+ /* Fetch some more tuples, if we've run out */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ /* No point in another fetch if we already detected EOF, though */
+ if (!fsstate->eof_reached)
+ {
+ /* Mark the request as pending for a callback */
+ ExecAsyncRequestPending(areq);
+ /* Begin another fetch if requested and if no pending request */
+ if (fetch && !pendingAreq)
+ fetch_more_data_begin(areq);
+ }
+ else
+ {
+ /* There's nothing more to do; just return a NULL pointer */
+ result = NULL;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ }
+ return;
+ }
+
+ /* Get a tuple from the ForeignScan node */
+ result = ExecProcNode((PlanState *) node);
+ if (!TupIsNull(result))
+ {
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ return;
+ }
+ Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
+ /* Fetch some more tuples, if we've not detected EOF yet */
+ if (!fsstate->eof_reached)
+ {
+ /* Mark the request as pending for a callback */
+ ExecAsyncRequestPending(areq);
+ /* Begin another fetch if requested and if no pending request */
+ if (fetch && !pendingAreq)
+ fetch_more_data_begin(areq);
+ }
+ else
+ {
+ /* There's nothing more to do; just return a NULL pointer */
+ result = NULL;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ }
+}
+
+/*
+ * Begin an asynchronous data fetch.
+ *
+ * Note: fetch_more_data must be called to fetch the result.
+ */
+static void
+fetch_more_data_begin(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ char sql[64];
+
+ Assert(!fsstate->conn_state->pendingAreq);
+
+ /* Create the cursor synchronously. */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ /* We will send this query, but not wait for the response. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (PQsendQuery(fsstate->conn, sql) < 0)
+ pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+ /* Remember that the request is in process */
+ fsstate->conn_state->pendingAreq = areq;
+}
+
+/*
+ * Process a pending asynchronous request.
+ */
+void
+process_pending_request(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ EState *estate = node->ss.ps.state;
+ MemoryContext oldcontext;
+
+ /* The request should be currently in-process */
+ Assert(fsstate->conn_state->pendingAreq == areq);
+
+ oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
+
+ /* Unlike AsyncNotify, we unset callback_pending ourselves */
+ areq->callback_pending = false;
+
+ fetch_more_data(node);
+
+ /* We need to send a new query afterwards; don't fetch */
+ produce_tuple_asynchronously(areq, false);
+
+ /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
+ ExecAsyncResponse(areq);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
* Create a tuple from the specified row of the PGresult.
*
* rel is the local representation of the foreign table, attinmeta is