aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2013-02-21 05:26:23 -0500
committerTom Lane <tgl@sss.pgh.pa.us>2013-02-21 05:27:16 -0500
commitd0d75c402217421b691050857eb3d7af82d0c770 (patch)
tree1d934b75b12e41c80520ce3aea6830e3bbe4b718 /contrib/postgres_fdw/postgres_fdw.c
parentf435cd1d385859a0cdb1d70fccc21dde2b1ee116 (diff)
downloadpostgresql-d0d75c402217421b691050857eb3d7af82d0c770.tar.gz
postgresql-d0d75c402217421b691050857eb3d7af82d0c770.zip
Add postgres_fdw contrib module.
There's still a lot of room for improvement, but it basically works, and we need this to be present before we can do anything much with the writable-foreign-tables patch. So let's commit it and get on with testing. Shigeru Hanada, reviewed by KaiGai Kohei and Tom Lane
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c1400
1 files changed, 1400 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
new file mode 100644
index 00000000000..0aef00b738d
--- /dev/null
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -0,0 +1,1400 @@
+/*-------------------------------------------------------------------------
+ *
+ * postgres_fdw.c
+ * Foreign-data wrapper for remote PostgreSQL servers
+ *
+ * Portions Copyright (c) 2012-2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/postgres_fdw/postgres_fdw.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "postgres_fdw.h"
+
+#include "access/htup_details.h"
+#include "commands/defrem.h"
+#include "commands/explain.h"
+#include "commands/vacuum.h"
+#include "foreign/fdwapi.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "optimizer/cost.h"
+#include "optimizer/pathnode.h"
+#include "optimizer/planmain.h"
+#include "parser/parsetree.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+
+
+PG_MODULE_MAGIC;
+
+/* Default CPU cost to start up a foreign query. */
+#define DEFAULT_FDW_STARTUP_COST 100.0
+
+/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */
+#define DEFAULT_FDW_TUPLE_COST 0.01
+
+/*
+ * FDW-specific planner information kept in RelOptInfo.fdw_private for a
+ * foreign table. This information is collected by postgresGetForeignRelSize.
+ */
+typedef struct PgFdwRelationInfo
+{
+ /* XXX underdocumented, but a lot of this shouldn't be here anyway */
+ StringInfoData sql;
+ Cost startup_cost;
+ Cost total_cost;
+ List *remote_conds;
+ List *param_conds;
+ List *local_conds;
+ List *param_numbers;
+
+ /* Cached catalog information. */
+ ForeignTable *table;
+ ForeignServer *server;
+} PgFdwRelationInfo;
+
+/*
+ * Indexes of FDW-private information stored in fdw_private list.
+ *
+ * We store various information in ForeignScan.fdw_private to pass it from
+ * planner to executor. Specifically there is:
+ *
+ * 1) SELECT statement text to be sent to the remote server
+ * 2) IDs of PARAM_EXEC Params used in the SELECT statement
+ *
+ * These items are indexed with the enum FdwPrivateIndex, so an item can be
+ * fetched with list_nth(). For example, to get the SELECT statement:
+ * sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql));
+ */
+enum FdwPrivateIndex
+{
+ /* SQL statement to execute remotely (as a String node) */
+ FdwPrivateSelectSql,
+
+ /* Integer list of param IDs of PARAM_EXEC Params used in SQL stmt */
+ FdwPrivateExternParamIds,
+
+ /* # of elements stored in the list fdw_private */
+ FdwPrivateNum
+};
+
+/*
+ * Execution state of a foreign scan using postgres_fdw.
+ */
+typedef struct PgFdwExecutionState
+{
+ Relation rel; /* relcache entry for the foreign table */
+ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
+
+ List *fdw_private; /* FDW-private information from planner */
+
+ /* for remote query execution */
+ PGconn *conn; /* connection for the scan */
+ unsigned int cursor_number; /* quasi-unique ID for my cursor */
+ bool cursor_exists; /* have we created the cursor? */
+ bool extparams_done; /* have we converted PARAM_EXTERN params? */
+ int numParams; /* number of parameters passed to query */
+ Oid *param_types; /* array of types of query parameters */
+ const char **param_values; /* array of values of query parameters */
+
+ /* for storing result tuples */
+ HeapTuple *tuples; /* array of currently-retrieved tuples */
+ int num_tuples; /* # of tuples in array */
+ int next_tuple; /* index of next one to return */
+
+ /* batch-level state, for optimizing rewinds and avoiding useless fetch */
+ int fetch_ct_2; /* Min(# of fetches done, 2) */
+ bool eof_reached; /* true if last fetch reached EOF */
+
+ /* working memory contexts */
+ MemoryContext batch_cxt; /* context holding current batch of tuples */
+ MemoryContext temp_cxt; /* context for per-tuple temporary data */
+} PgFdwExecutionState;
+
+/*
+ * Workspace for analyzing a foreign table.
+ */
+typedef struct PgFdwAnalyzeState
+{
+ Relation rel; /* relcache entry for the foreign table */
+ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
+
+ /* collected sample rows */
+ HeapTuple *rows; /* array of size targrows */
+ int targrows; /* target # of sample rows */
+ int numrows; /* # of sample rows collected */
+
+ /* for random sampling */
+ double samplerows; /* # of rows fetched */
+ double rowstoskip; /* # of rows to skip before next sample */
+ double rstate; /* random state */
+
+ /* working memory contexts */
+ MemoryContext anl_cxt; /* context for per-analyze lifespan data */
+ MemoryContext temp_cxt; /* context for per-tuple temporary data */
+} PgFdwAnalyzeState;
+
+/*
+ * Identify the attribute where data conversion fails.
+ */
+typedef struct ConversionLocation
+{
+ Relation rel; /* foreign table's relcache entry */
+ AttrNumber cur_attno; /* attribute number being processed, or 0 */
+} ConversionLocation;
+
+/*
+ * SQL functions
+ */
+extern Datum postgres_fdw_handler(PG_FUNCTION_ARGS);
+
+PG_FUNCTION_INFO_V1(postgres_fdw_handler);
+
+/*
+ * FDW callback routines
+ */
+static void postgresGetForeignRelSize(PlannerInfo *root,
+ RelOptInfo *baserel,
+ Oid foreigntableid);
+static void postgresGetForeignPaths(PlannerInfo *root,
+ RelOptInfo *baserel,
+ Oid foreigntableid);
+static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
+ RelOptInfo *baserel,
+ Oid foreigntableid,
+ ForeignPath *best_path,
+ List *tlist,
+ List *scan_clauses);
+static void postgresExplainForeignScan(ForeignScanState *node,
+ ExplainState *es);
+static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
+static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
+static void postgresReScanForeignScan(ForeignScanState *node);
+static void postgresEndForeignScan(ForeignScanState *node);
+static bool postgresAnalyzeForeignTable(Relation relation,
+ AcquireSampleRowsFunc *func,
+ BlockNumber *totalpages);
+
+/*
+ * Helper functions
+ */
+static void get_remote_estimate(const char *sql,
+ PGconn *conn,
+ double *rows,
+ int *width,
+ Cost *startup_cost,
+ Cost *total_cost);
+static void create_cursor(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node);
+static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
+ HeapTuple *rows, int targrows,
+ double *totalrows,
+ double *totaldeadrows);
+static void analyze_row_processor(PGresult *res, int row,
+ PgFdwAnalyzeState *astate);
+static HeapTuple make_tuple_from_result_row(PGresult *res,
+ int row,
+ Relation rel,
+ AttInMetadata *attinmeta,
+ MemoryContext temp_context);
+static void conversion_error_callback(void *arg);
+
+
+/*
+ * Foreign-data wrapper handler function: return a struct with pointers
+ * to my callback routines.
+ */
+Datum
+postgres_fdw_handler(PG_FUNCTION_ARGS)
+{
+ FdwRoutine *routine = makeNode(FdwRoutine);
+
+ /* Required handler functions. */
+ routine->GetForeignRelSize = postgresGetForeignRelSize;
+ routine->GetForeignPaths = postgresGetForeignPaths;
+ routine->GetForeignPlan = postgresGetForeignPlan;
+ routine->ExplainForeignScan = postgresExplainForeignScan;
+ routine->BeginForeignScan = postgresBeginForeignScan;
+ routine->IterateForeignScan = postgresIterateForeignScan;
+ routine->ReScanForeignScan = postgresReScanForeignScan;
+ routine->EndForeignScan = postgresEndForeignScan;
+
+ /* Optional handler functions. */
+ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
+
+ PG_RETURN_POINTER(routine);
+}
+
+/*
+ * postgresGetForeignRelSize
+ * Estimate # of rows and width of the result of the scan
+ *
+ * Here we estimate number of rows returned by the scan in two steps. In the
+ * first step, we execute remote EXPLAIN command to obtain the number of rows
+ * returned from remote side. In the second step, we calculate the selectivity
+ * of the filtering done on local side, and modify first estimate.
+ *
+ * We have to get some catalog objects and generate remote query string here,
+ * so we store such expensive information in FDW private area of RelOptInfo and
+ * pass them to subsequent functions for reuse.
+ */
+static void
+postgresGetForeignRelSize(PlannerInfo *root,
+ RelOptInfo *baserel,
+ Oid foreigntableid)
+{
+ bool use_remote_explain = false;
+ ListCell *lc;
+ PgFdwRelationInfo *fpinfo;
+ StringInfo sql;
+ ForeignTable *table;
+ ForeignServer *server;
+ Selectivity sel;
+ double rows;
+ int width;
+ Cost startup_cost;
+ Cost total_cost;
+ List *remote_conds;
+ List *param_conds;
+ List *local_conds;
+ List *param_numbers;
+
+ /*
+ * We use PgFdwRelationInfo to pass various information to subsequent
+ * functions.
+ */
+ fpinfo = palloc0(sizeof(PgFdwRelationInfo));
+ initStringInfo(&fpinfo->sql);
+ sql = &fpinfo->sql;
+
+ /*
+ * Determine whether we use remote estimate or not. Note that per-table
+ * setting overrides per-server setting.
+ */
+ table = GetForeignTable(foreigntableid);
+ server = GetForeignServer(table->serverid);
+ foreach(lc, server->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "use_remote_explain") == 0)
+ {
+ use_remote_explain = defGetBoolean(def);
+ break;
+ }
+ }
+ foreach(lc, table->options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "use_remote_explain") == 0)
+ {
+ use_remote_explain = defGetBoolean(def);
+ break;
+ }
+ }
+
+ /*
+ * Construct remote query which consists of SELECT, FROM, and WHERE
+ * clauses. Conditions which contain any Param node are excluded because
+ * placeholder can't be used in EXPLAIN statement. Such conditions are
+ * appended later.
+ */
+ classifyConditions(root, baserel, &remote_conds, &param_conds,
+ &local_conds, &param_numbers);
+ deparseSimpleSql(sql, root, baserel, local_conds);
+ if (list_length(remote_conds) > 0)
+ appendWhereClause(sql, true, remote_conds, root);
+
+ /*
+ * If the table or the server is configured to use remote EXPLAIN, connect
+ * to the foreign server and execute EXPLAIN with the quals that don't
+ * contain any Param nodes. Otherwise, estimate rows using whatever
+ * statistics we have locally, in a way similar to ordinary tables.
+ */
+ if (use_remote_explain)
+ {
+ RangeTblEntry *rte;
+ Oid userid;
+ UserMapping *user;
+ PGconn *conn;
+
+ /*
+ * Identify which user to do the remote access as. This should match
+ * what ExecCheckRTEPerms() does. If we fail due to lack of
+ * permissions, the query would have failed at runtime anyway.
+ */
+ rte = planner_rt_fetch(baserel->relid, root);
+ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ user = GetUserMapping(userid, server->serverid);
+ conn = GetConnection(server, user);
+ get_remote_estimate(sql->data, conn, &rows, &width,
+ &startup_cost, &total_cost);
+ ReleaseConnection(conn);
+
+ /*
+ * Estimate selectivity of conditions which were not used in remote
+ * EXPLAIN by calling clauselist_selectivity(). The best we can do
+ * for these conditions is to estimate selectivity on the basis of
+ * local statistics.
+ */
+ sel = clauselist_selectivity(root, param_conds,
+ baserel->relid, JOIN_INNER, NULL);
+ sel *= clauselist_selectivity(root, local_conds,
+ baserel->relid, JOIN_INNER, NULL);
+
+ /* Report estimated numbers to planner. */
+ baserel->rows = clamp_row_est(rows * sel);
+ baserel->width = width;
+ }
+ else
+ {
+ /*
+ * Estimate rows from the result of the last ANALYZE, using all
+ * conditions specified in original query.
+ *
+ * If the foreign table has never been ANALYZEd, it will have relpages
+ * and reltuples equal to zero, which most likely has nothing to do
+ * with reality. We can't do a whole lot about that if we're not
+ * allowed to consult the remote server, but we can use a hack similar
+ * to plancat.c's treatment of empty relations: use a minimum size
+ * estimate of 10 pages, and divide by the column-datatype-based width
+ * estimate to get the corresponding number of tuples.
+ */
+ if (baserel->tuples <= 0)
+ baserel->tuples =
+ (10 * BLCKSZ) / (baserel->width + sizeof(HeapTupleHeaderData));
+
+ set_baserel_size_estimates(root, baserel);
+
+ /*
+ * XXX need to do something here to calculate sane startup and total
+ * cost estimates ... for the moment, we do this:
+ */
+ startup_cost = 0;
+ total_cost = baserel->rows * cpu_tuple_cost;
+ }
+
+ /*
+ * Finish deparsing remote query by adding conditions which were unusable
+ * in remote EXPLAIN since they contain Param nodes.
+ */
+ if (list_length(param_conds) > 0)
+ appendWhereClause(sql, !(list_length(remote_conds) > 0), param_conds,
+ root);
+
+ /*
+ * Store obtained information into FDW-private area of RelOptInfo so it's
+ * available to subsequent functions.
+ */
+ fpinfo->startup_cost = startup_cost;
+ fpinfo->total_cost = total_cost;
+ fpinfo->remote_conds = remote_conds;
+ fpinfo->param_conds = param_conds;
+ fpinfo->local_conds = local_conds;
+ fpinfo->param_numbers = param_numbers;
+ fpinfo->table = table;
+ fpinfo->server = server;
+ baserel->fdw_private = (void *) fpinfo;
+}
+
+/*
+ * postgresGetForeignPaths
+ * Create possible scan paths for a scan on the foreign table
+ */
+static void
+postgresGetForeignPaths(PlannerInfo *root,
+ RelOptInfo *baserel,
+ Oid foreigntableid)
+{
+ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
+ ForeignPath *path;
+ ListCell *lc;
+ double fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
+ double fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
+ Cost startup_cost;
+ Cost total_cost;
+ List *fdw_private;
+
+ /*
+ * Check for user override of fdw_startup_cost, fdw_tuple_cost values
+ */
+ foreach(lc, fpinfo->server->options)
+ {
+ DefElem *d = (DefElem *) lfirst(lc);
+
+ if (strcmp(d->defname, "fdw_startup_cost") == 0)
+ fdw_startup_cost = strtod(defGetString(d), NULL);
+ else if (strcmp(d->defname, "fdw_tuple_cost") == 0)
+ fdw_tuple_cost = strtod(defGetString(d), NULL);
+ }
+
+ /*
+ * We have cost values which are estimated on remote side, so adjust them
+ * for better estimate which respect various stuffs to complete the scan,
+ * such as sending query, transferring result, and local filtering.
+ */
+ startup_cost = fpinfo->startup_cost;
+ total_cost = fpinfo->total_cost;
+
+ /*----------
+ * Adjust costs with factors of the corresponding foreign server:
+ * - add cost to establish connection to both startup and total
+ * - add cost to manipulate on remote, and transfer result to total
+ * - add cost to manipulate tuples on local side to total
+ *----------
+ */
+ startup_cost += fdw_startup_cost;
+ total_cost += fdw_startup_cost;
+ total_cost += fdw_tuple_cost * baserel->rows;
+ total_cost += cpu_tuple_cost * baserel->rows;
+
+ /*
+ * Build the fdw_private list that will be available to the executor.
+ * Items in the list must match enum FdwPrivateIndex, above.
+ */
+ fdw_private = list_make2(makeString(fpinfo->sql.data),
+ fpinfo->param_numbers);
+
+ /*
+ * Create simplest ForeignScan path node and add it to baserel. This path
+ * corresponds to SeqScan path of regular tables (though depending on what
+ * baserestrict conditions we were able to send to remote, there might
+ * actually be an indexscan happening there).
+ */
+ path = create_foreignscan_path(root, baserel,
+ baserel->rows,
+ startup_cost,
+ total_cost,
+ NIL, /* no pathkeys */
+ NULL, /* no outer rel either */
+ fdw_private);
+ add_path(baserel, (Path *) path);
+
+ /*
+ * XXX We can consider sorted path or parameterized path here if we know
+ * that foreign table is indexed on remote end. For this purpose, we
+ * might have to support FOREIGN INDEX to represent possible sets of sort
+ * keys and/or filtering. Or we could just try some join conditions and
+ * see if remote side estimates using them as markedly cheaper. Note that
+ * executor functions need work to support internal Params before we can
+ * try generating any parameterized paths, though.
+ */
+}
+
+/*
+ * postgresGetForeignPlan
+ * Create ForeignScan plan node which implements selected best path
+ */
+static ForeignScan *
+postgresGetForeignPlan(PlannerInfo *root,
+ RelOptInfo *baserel,
+ Oid foreigntableid,
+ ForeignPath *best_path,
+ List *tlist,
+ List *scan_clauses)
+{
+ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
+ Index scan_relid = baserel->relid;
+ List *fdw_private = best_path->fdw_private;
+ List *remote_exprs = NIL;
+ List *local_exprs = NIL;
+ ListCell *lc;
+
+ /*
+ * Separate the scan_clauses into those that can be executed remotely and
+ * those that can't. For now, we accept only remote clauses that were
+ * previously determined to be safe by classifyClauses (so, only
+ * baserestrictinfo clauses can be used that way).
+ *
+ * This code must match "extract_actual_clauses(scan_clauses, false)"
+ * except for the additional decision about remote versus local execution.
+ */
+ foreach(lc, scan_clauses)
+ {
+ RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
+
+ Assert(IsA(rinfo, RestrictInfo));
+
+ /* Ignore any pseudoconstants, they're dealt with elsewhere */
+ if (rinfo->pseudoconstant)
+ continue;
+
+ /* Either simple or parameterized remote clauses are OK now */
+ if (list_member_ptr(fpinfo->remote_conds, rinfo) ||
+ list_member_ptr(fpinfo->param_conds, rinfo))
+ remote_exprs = lappend(remote_exprs, rinfo->clause);
+ else
+ local_exprs = lappend(local_exprs, rinfo->clause);
+ }
+
+ /*
+ * Create the ForeignScan node from target list, local filtering
+ * expressions, remote filtering expressions, and FDW private information.
+ *
+ * Note that the remote_exprs are stored in the fdw_exprs field of the
+ * finished plan node; we can't keep them in private state because then
+ * they wouldn't be subject to later planner processing.
+ *
+ * XXX Currently, the remote_exprs aren't actually used at runtime, so we
+ * don't need to store them at all. But we'll keep this behavior for a
+ * little while for debugging reasons.
+ */
+ return make_foreignscan(tlist,
+ local_exprs,
+ scan_relid,
+ remote_exprs,
+ fdw_private);
+}
+
+/*
+ * postgresExplainForeignScan
+ * Produce extra output for EXPLAIN
+ */
+static void
+postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
+{
+ List *fdw_private;
+ char *sql;
+
+ if (es->verbose)
+ {
+ fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
+ sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql));
+ ExplainPropertyText("Remote SQL", sql, es);
+ }
+}
+
+/*
+ * postgresBeginForeignScan
+ * Initiate an executor scan of a foreign PostgreSQL table.
+ */
+static void
+postgresBeginForeignScan(ForeignScanState *node, int eflags)
+{
+ ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+ EState *estate = node->ss.ps.state;
+ PgFdwExecutionState *festate;
+ RangeTblEntry *rte;
+ Oid userid;
+ ForeignTable *table;
+ ForeignServer *server;
+ UserMapping *user;
+ List *param_numbers;
+ int numParams;
+ int i;
+
+ /*
+ * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
+ */
+ if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ return;
+
+ /*
+ * We'll save private state in node->fdw_state.
+ */
+ festate = (PgFdwExecutionState *) palloc0(sizeof(PgFdwExecutionState));
+ node->fdw_state = (void *) festate;
+
+ /*
+ * Identify which user to do the remote access as. This should match what
+ * ExecCheckRTEPerms() does.
+ */
+ rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
+ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ /* Get info about foreign table. */
+ festate->rel = node->ss.ss_currentRelation;
+ table = GetForeignTable(RelationGetRelid(festate->rel));
+ server = GetForeignServer(table->serverid);
+ user = GetUserMapping(userid, server->serverid);
+
+ /*
+ * Get connection to the foreign server. Connection manager will
+ * establish new connection if necessary.
+ */
+ festate->conn = GetConnection(server, user);
+
+ /* Assign a unique ID for my cursor */
+ festate->cursor_number = GetCursorNumber(festate->conn);
+ festate->cursor_exists = false;
+
+ /* Get private info created by planner functions. */
+ festate->fdw_private = fsplan->fdw_private;
+
+ /* Create contexts for batches of tuples and per-tuple temp workspace. */
+ festate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ "postgres_fdw tuple data",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ "postgres_fdw temporary data",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+
+ /* Get info we'll need for data conversion. */
+ festate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(festate->rel));
+
+ /*
+ * Allocate buffer for query parameters, if the remote conditions use any.
+ *
+ * We use a parameter slot for each PARAM_EXTERN parameter, even though
+ * not all of them may get sent to the remote server. This allows us to
+ * refer to Params by their original number rather than remapping, and it
+ * doesn't cost much. Slots that are not actually used get filled with
+ * null values that are arbitrarily marked as being of type int4.
+ */
+ param_numbers = (List *)
+ list_nth(festate->fdw_private, FdwPrivateExternParamIds);
+ if (param_numbers != NIL)
+ {
+ ParamListInfo params = estate->es_param_list_info;
+
+ numParams = params ? params->numParams : 0;
+ }
+ else
+ numParams = 0;
+ festate->numParams = numParams;
+ if (numParams > 0)
+ {
+ /* we initially fill all slots with value = NULL, type = int4 */
+ festate->param_types = (Oid *) palloc(numParams * sizeof(Oid));
+ festate->param_values = (const char **) palloc0(numParams * sizeof(char *));
+ for (i = 0; i < numParams; i++)
+ festate->param_types[i] = INT4OID;
+ }
+ else
+ {
+ festate->param_types = NULL;
+ festate->param_values = NULL;
+ }
+ festate->extparams_done = false;
+}
+
+/*
+ * postgresIterateForeignScan
+ * Retrieve next row from the result set, or clear tuple slot to indicate
+ * EOF.
+ */
+static TupleTableSlot *
+postgresIterateForeignScan(ForeignScanState *node)
+{
+ PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+
+ /*
+ * If this is the first call after Begin or ReScan, we need to create the
+ * cursor on the remote side.
+ */
+ if (!festate->cursor_exists)
+ create_cursor(node);
+
+ /*
+ * Get some more tuples, if we've run out.
+ */
+ if (festate->next_tuple >= festate->num_tuples)
+ {
+ /* No point in another fetch if we already detected EOF, though. */
+ if (!festate->eof_reached)
+ fetch_more_data(node);
+ /* If we didn't get any tuples, must be end of data. */
+ if (festate->next_tuple >= festate->num_tuples)
+ return ExecClearTuple(slot);
+ }
+
+ /*
+ * Return the next tuple.
+ */
+ ExecStoreTuple(festate->tuples[festate->next_tuple++],
+ slot,
+ InvalidBuffer,
+ false);
+
+ return slot;
+}
+
+/*
+ * postgresReScanForeignScan
+ * Restart the scan.
+ */
+static void
+postgresReScanForeignScan(ForeignScanState *node)
+{
+ PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+ char sql[64];
+ PGresult *res;
+
+ /*
+ * Note: we assume that PARAM_EXTERN params don't change over the life of
+ * the query, so no need to reset extparams_done.
+ */
+
+ /* If we haven't created the cursor yet, nothing to do. */
+ if (!festate->cursor_exists)
+ return;
+
+ /*
+ * If any internal parameters affecting this node have changed, we'd
+ * better destroy and recreate the cursor. Otherwise, rewinding it should
+ * be good enough. If we've only fetched zero or one batch, we needn't
+ * even rewind the cursor, just rescan what we have.
+ */
+ if (node->ss.ps.chgParam != NULL)
+ {
+ festate->cursor_exists = false;
+ snprintf(sql, sizeof(sql), "CLOSE c%u",
+ festate->cursor_number);
+ }
+ else if (festate->fetch_ct_2 > 1)
+ {
+ snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
+ festate->cursor_number);
+ }
+ else
+ {
+ /* Easy: just rescan what we already have in memory, if anything */
+ festate->next_tuple = 0;
+ return;
+ }
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexec(festate->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, sql);
+ PQclear(res);
+
+ /* Now force a fresh FETCH. */
+ festate->tuples = NULL;
+ festate->num_tuples = 0;
+ festate->next_tuple = 0;
+ festate->fetch_ct_2 = 0;
+ festate->eof_reached = false;
+}
+
+/*
+ * postgresEndForeignScan
+ * Finish scanning foreign table and dispose objects used for this scan
+ */
+static void
+postgresEndForeignScan(ForeignScanState *node)
+{
+ PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+
+ /* if festate is NULL, we are in EXPLAIN; nothing to do */
+ if (festate == NULL)
+ return;
+
+ /* Close the cursor if open, to prevent accumulation of cursors */
+ if (festate->cursor_exists)
+ close_cursor(festate->conn, festate->cursor_number);
+
+ /* Release remote connection */
+ ReleaseConnection(festate->conn);
+ festate->conn = NULL;
+
+ /* MemoryContexts will be deleted automatically. */
+}
+
+/*
+ * Estimate costs of executing given SQL statement.
+ */
+static void
+get_remote_estimate(const char *sql, PGconn *conn,
+ double *rows, int *width,
+ Cost *startup_cost, Cost *total_cost)
+{
+ PGresult *volatile res = NULL;
+
+ /* PGresult must be released before leaving this function. */
+ PG_TRY();
+ {
+ StringInfoData buf;
+ char *line;
+ char *p;
+ int n;
+
+ /*
+ * Execute EXPLAIN remotely on given SQL statement.
+ */
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "EXPLAIN %s", sql);
+ res = PQexec(conn, buf.data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, false, buf.data);
+
+ /*
+ * Extract cost numbers for topmost plan node. Note we search for a
+ * left paren from the end of the line to avoid being confused by
+ * other uses of parentheses.
+ */
+ line = PQgetvalue(res, 0, 0);
+ p = strrchr(line, '(');
+ if (p == NULL)
+ elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
+ n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)",
+ startup_cost, total_cost, rows, width);
+ if (n != 4)
+ elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line);
+
+ PQclear(res);
+ res = NULL;
+ }
+ PG_CATCH();
+ {
+ if (res)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+}
+
+/*
+ * Create cursor for node's query with current parameter values.
+ */
+static void
+create_cursor(ForeignScanState *node)
+{
+ PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+ int numParams = festate->numParams;
+ Oid *types = festate->param_types;
+ const char **values = festate->param_values;
+ PGconn *conn = festate->conn;
+ char *sql;
+ StringInfoData buf;
+ PGresult *res;
+
+ /*
+ * Construct array of external parameter values in text format. Since
+ * there might be random unconvertible stuff in the ParamExternData array,
+ * take care to convert only values we actually need.
+ *
+ * Note that we leak the memory for the value strings until end of query;
+ * this doesn't seem like a big problem, and in any case we might need to
+ * recreate the cursor after a rescan, so we could need to re-use the
+ * values anyway.
+ */
+ if (numParams > 0 && !festate->extparams_done)
+ {
+ ParamListInfo params = node->ss.ps.state->es_param_list_info;
+ List *param_numbers;
+ ListCell *lc;
+
+ param_numbers = (List *)
+ list_nth(festate->fdw_private, FdwPrivateExternParamIds);
+ foreach(lc, param_numbers)
+ {
+ int paramno = lfirst_int(lc);
+ ParamExternData *prm = &params->params[paramno - 1];
+
+ /* give hook a chance in case parameter is dynamic */
+ if (!OidIsValid(prm->ptype) && params->paramFetch != NULL)
+ params->paramFetch(params, paramno);
+
+ /*
+ * Get string representation of each parameter value by invoking
+ * type-specific output function, unless the value is null.
+ */
+ types[paramno - 1] = prm->ptype;
+ if (prm->isnull)
+ values[paramno - 1] = NULL;
+ else
+ {
+ Oid out_func;
+ bool isvarlena;
+
+ getTypeOutputInfo(prm->ptype, &out_func, &isvarlena);
+ values[paramno - 1] = OidOutputFunctionCall(out_func,
+ prm->value);
+ }
+ }
+ festate->extparams_done = true;
+ }
+
+ /* Construct the DECLARE CURSOR command */
+ sql = strVal(list_nth(festate->fdw_private, FdwPrivateSelectSql));
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
+ festate->cursor_number, sql);
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexecParams(conn, buf.data, numParams, types, values,
+ NULL, NULL, 0);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, sql);
+ PQclear(res);
+
+ /* Mark the cursor as created, and show no tuples have been retrieved */
+ festate->cursor_exists = true;
+ festate->tuples = NULL;
+ festate->num_tuples = 0;
+ festate->next_tuple = 0;
+ festate->fetch_ct_2 = 0;
+ festate->eof_reached = false;
+
+ /* Clean up */
+ pfree(buf.data);
+}
+
+/*
+ * Fetch some more rows from the node's cursor.
+ */
+static void
+fetch_more_data(ForeignScanState *node)
+{
+ PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+ PGresult *volatile res = NULL;
+ MemoryContext oldcontext;
+
+ /*
+ * We'll store the tuples in the batch_cxt. First, flush the previous
+ * batch.
+ */
+ festate->tuples = NULL;
+ MemoryContextReset(festate->batch_cxt);
+ oldcontext = MemoryContextSwitchTo(festate->batch_cxt);
+
+ /* PGresult must be released before leaving this function. */
+ PG_TRY();
+ {
+ PGconn *conn = festate->conn;
+ char sql[64];
+ int fetch_size;
+ int numrows;
+ int i;
+
+ /* The fetch size is arbitrary, but shouldn't be enormous. */
+ fetch_size = 100;
+
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fetch_size, festate->cursor_number);
+
+ res = PQexec(conn, sql);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, false,
+ strVal(list_nth(festate->fdw_private,
+ FdwPrivateSelectSql)));
+
+ /* Convert the data into HeapTuples */
+ numrows = PQntuples(res);
+ festate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ festate->num_tuples = numrows;
+ festate->next_tuple = 0;
+
+ for (i = 0; i < numrows; i++)
+ {
+ festate->tuples[i] =
+ make_tuple_from_result_row(res, i,
+ festate->rel,
+ festate->attinmeta,
+ festate->temp_cxt);
+ }
+
+ /* Update fetch_ct_2 */
+ if (festate->fetch_ct_2 < 2)
+ festate->fetch_ct_2++;
+
+ /* Must be EOF if we didn't get as many tuples as we asked for. */
+ festate->eof_reached = (numrows < fetch_size);
+
+ PQclear(res);
+ res = NULL;
+ }
+ PG_CATCH();
+ {
+ if (res)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Utility routine to close a cursor.
+ */
+static void
+close_cursor(PGconn *conn, unsigned int cursor_number)
+{
+ char sql[64];
+ PGresult *res;
+
+ snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexec(conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, sql);
+ PQclear(res);
+}
+
+/*
+ * postgresAnalyzeForeignTable
+ * Test whether analyzing this foreign table is supported
+ */
+static bool
+postgresAnalyzeForeignTable(Relation relation,
+ AcquireSampleRowsFunc *func,
+ BlockNumber *totalpages)
+{
+ *totalpages = 0; /* XXX this is probably a bad idea */
+ *func = postgresAcquireSampleRowsFunc;
+
+ return true;
+}
+
+/*
+ * Acquire a random sample of rows from foreign table managed by postgres_fdw.
+ *
+ * We fetch the whole table from the remote side and pick out some sample rows.
+ *
+ * Selected rows are returned in the caller-allocated array rows[],
+ * which must have at least targrows entries.
+ * The actual number of rows selected is returned as the function result.
+ * We also count the total number of rows in the table and return it into
+ * *totalrows. Note that *totaldeadrows is always set to 0.
+ *
+ * Note that the returned list of rows is not always in order by physical
+ * position in the table. Therefore, correlation estimates derived later
+ * may be meaningless, but it's OK because we don't use the estimates
+ * currently (the planner only pays attention to correlation for indexscans).
+ */
+static int
+postgresAcquireSampleRowsFunc(Relation relation, int elevel,
+ HeapTuple *rows, int targrows,
+ double *totalrows,
+ double *totaldeadrows)
+{
+ PgFdwAnalyzeState astate;
+ ForeignTable *table;
+ ForeignServer *server;
+ UserMapping *user;
+ PGconn *conn;
+ unsigned int cursor_number;
+ StringInfoData sql;
+ PGresult *volatile res = NULL;
+
+ /* Initialize workspace state */
+ astate.rel = relation;
+ astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation));
+
+ astate.rows = rows;
+ astate.targrows = targrows;
+ astate.numrows = 0;
+ astate.samplerows = 0;
+ astate.rowstoskip = -1; /* -1 means not set yet */
+ astate.rstate = anl_init_selection_state(targrows);
+
+ /* Remember ANALYZE context, and create a per-tuple temp context */
+ astate.anl_cxt = CurrentMemoryContext;
+ astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext,
+ "postgres_fdw temporary data",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+
+ /*
+ * Get the connection to use. We do the remote access as the table's
+ * owner, even if the ANALYZE was started by some other user.
+ */
+ table = GetForeignTable(RelationGetRelid(relation));
+ server = GetForeignServer(table->serverid);
+ user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
+ conn = GetConnection(server, user);
+
+ /*
+ * Construct cursor that retrieves whole rows from remote.
+ */
+ cursor_number = GetCursorNumber(conn);
+ initStringInfo(&sql);
+ appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number);
+ deparseAnalyzeSql(&sql, relation);
+
+ /* In what follows, do not risk leaking any PGresults. */
+ PG_TRY();
+ {
+ res = PQexec(conn, sql.data);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, false, sql.data);
+ PQclear(res);
+ res = NULL;
+
+ /* Retrieve and process rows a batch at a time. */
+ for (;;)
+ {
+ char fetch_sql[64];
+ int fetch_size;
+ int numrows;
+ int i;
+
+ /* Allow users to cancel long query */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * XXX possible future improvement: if rowstoskip is large, we
+ * could issue a MOVE rather than physically fetching the rows,
+ * then just adjust rowstoskip and samplerows appropriately.
+ */
+
+ /* The fetch size is arbitrary, but shouldn't be enormous. */
+ fetch_size = 100;
+
+ /* Fetch some rows */
+ snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
+ fetch_size, cursor_number);
+
+ res = PQexec(conn, fetch_sql);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, false, sql.data);
+
+ /* Process whatever we got. */
+ numrows = PQntuples(res);
+ for (i = 0; i < numrows; i++)
+ analyze_row_processor(res, i, &astate);
+
+ PQclear(res);
+ res = NULL;
+
+ /* Must be EOF if we didn't get all the rows requested. */
+ if (numrows < fetch_size)
+ break;
+ }
+
+ /* Close the cursor, just to be tidy. */
+ close_cursor(conn, cursor_number);
+ }
+ PG_CATCH();
+ {
+ if (res)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+
+ ReleaseConnection(conn);
+
+ /* We assume that we have no dead tuple. */
+ *totaldeadrows = 0.0;
+
+ /* We've retrieved all living tuples from foreign server. */
+ *totalrows = astate.samplerows;
+
+ /*
+ * Emit some interesting relation info
+ */
+ ereport(elevel,
+ (errmsg("\"%s\": table contains %.0f rows, %d rows in sample",
+ RelationGetRelationName(relation),
+ astate.samplerows, astate.numrows)));
+
+ return astate.numrows;
+}
+
+/*
+ * Collect sample rows from the result of query.
+ * - Use all tuples in sample until target # of samples are collected.
+ * - Subsequently, replace already-sampled tuples randomly.
+ */
+static void
+analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
+{
+ int targrows = astate->targrows;
+ int pos; /* array index to store tuple in */
+ MemoryContext oldcontext;
+
+ /* Always increment sample row counter. */
+ astate->samplerows += 1;
+
+ /*
+ * Determine the slot where this sample row should be stored. Set pos to
+ * negative value to indicate the row should be skipped.
+ */
+ if (astate->numrows < targrows)
+ {
+ /* First targrows rows are always included into the sample */
+ pos = astate->numrows++;
+ }
+ else
+ {
+ /*
+ * Now we start replacing tuples in the sample until we reach the end
+ * of the relation. Same algorithm as in acquire_sample_rows in
+ * analyze.c; see Jeff Vitter's paper.
+ */
+ if (astate->rowstoskip < 0)
+ astate->rowstoskip = anl_get_next_S(astate->samplerows, targrows,
+ &astate->rstate);
+
+ if (astate->rowstoskip <= 0)
+ {
+ /* Choose a random reservoir element to replace. */
+ pos = (int) (targrows * anl_random_fract());
+ Assert(pos >= 0 && pos < targrows);
+ heap_freetuple(astate->rows[pos]);
+ }
+ else
+ {
+ /* Skip this tuple. */
+ pos = -1;
+ }
+
+ astate->rowstoskip -= 1;
+ }
+
+ if (pos >= 0)
+ {
+ /*
+ * Create sample tuple from current result row, and store it in the
+ * position determined above. The tuple has to be created in anl_cxt.
+ */
+ oldcontext = MemoryContextSwitchTo(astate->anl_cxt);
+
+ astate->rows[pos] = make_tuple_from_result_row(res, row,
+ astate->rel,
+ astate->attinmeta,
+ astate->temp_cxt);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+}
+
+/*
+ * Create a tuple from the specified row of the PGresult.
+ *
+ * rel is the local representation of the foreign table, attinmeta is
+ * conversion data for the rel's tupdesc, and temp_context is a working
+ * context that can be reset after each tuple.
+ */
+static HeapTuple
+make_tuple_from_result_row(PGresult *res,
+ int row,
+ Relation rel,
+ AttInMetadata *attinmeta,
+ MemoryContext temp_context)
+{
+ HeapTuple tuple;
+ TupleDesc tupdesc = RelationGetDescr(rel);
+ Form_pg_attribute *attrs = tupdesc->attrs;
+ Datum *values;
+ bool *nulls;
+ ConversionLocation errpos;
+ ErrorContextCallback errcallback;
+ MemoryContext oldcontext;
+ int i;
+ int j;
+
+ Assert(row < PQntuples(res));
+
+ /*
+ * Do the following work in a temp context that we reset after each tuple.
+ * This cleans up not only the data we have direct access to, but any
+ * cruft the I/O functions might leak.
+ */
+ oldcontext = MemoryContextSwitchTo(temp_context);
+
+ values = (Datum *) palloc(tupdesc->natts * sizeof(Datum));
+ nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
+
+ /*
+ * Set up and install callback to report where conversion error occurs.
+ */
+ errpos.rel = rel;
+ errpos.cur_attno = 0;
+ errcallback.callback = conversion_error_callback;
+ errcallback.arg = (void *) &errpos;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /*
+ * i indexes columns in the relation, j indexes columns in the PGresult.
+ * We assume dropped columns are not represented in the PGresult.
+ */
+ for (i = 0, j = 0; i < tupdesc->natts; i++)
+ {
+ char *valstr;
+
+ /* skip dropped columns. */
+ if (attrs[i]->attisdropped)
+ {
+ values[i] = (Datum) 0;
+ nulls[i] = true;
+ continue;
+ }
+
+ /* convert value to internal representation */
+ if (PQgetisnull(res, row, j))
+ {
+ valstr = NULL;
+ nulls[i] = true;
+ }
+ else
+ {
+ valstr = PQgetvalue(res, row, j);
+ nulls[i] = false;
+ }
+
+ /* Note: apply the input function even to nulls, to support domains */
+ errpos.cur_attno = i + 1;
+ values[i] = InputFunctionCall(&attinmeta->attinfuncs[i],
+ valstr,
+ attinmeta->attioparams[i],
+ attinmeta->atttypmods[i]);
+ errpos.cur_attno = 0;
+
+ j++;
+ }
+
+ /* Uninstall error context callback. */
+ error_context_stack = errcallback.previous;
+
+ /* check result and tuple descriptor have the same number of columns */
+ if (j != PQnfields(res))
+ elog(ERROR, "remote query result does not match the foreign table");
+
+ /*
+ * Build the result tuple in caller's memory context.
+ */
+ MemoryContextSwitchTo(oldcontext);
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+
+ /* Clean up */
+ MemoryContextReset(temp_context);
+
+ return tuple;
+}
+
+/*
+ * Callback function which is called when error occurs during column value
+ * conversion. Print names of column and relation.
+ */
+static void
+conversion_error_callback(void *arg)
+{
+ ConversionLocation *errpos = (ConversionLocation *) arg;
+ TupleDesc tupdesc = RelationGetDescr(errpos->rel);
+
+ if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
+ errcontext("column \"%s\" of foreign table \"%s\"",
+ NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
+ RelationGetRelationName(errpos->rel));
+}