aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c329
1 files changed, 272 insertions, 57 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 2f2d4d171c1..9a31bbb86b2 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -87,8 +87,10 @@ enum FdwScanPrivateIndex
* 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
* 2) Integer list of target attribute numbers for INSERT/UPDATE
* (NIL for a DELETE)
- * 3) Boolean flag showing if the remote query has a RETURNING clause
- * 4) Integer list of attribute numbers retrieved by RETURNING, if any
+ * 3) Length till the end of VALUES clause for INSERT
+ * (-1 for a DELETE/UPDATE)
+ * 4) Boolean flag showing if the remote query has a RETURNING clause
+ * 5) Integer list of attribute numbers retrieved by RETURNING, if any
*/
enum FdwModifyPrivateIndex
{
@@ -96,6 +98,8 @@ enum FdwModifyPrivateIndex
FdwModifyPrivateUpdateSql,
/* Integer list of target attribute numbers for INSERT/UPDATE */
FdwModifyPrivateTargetAttnums,
+ /* Length till the end of VALUES clause (as an integer Value node) */
+ FdwModifyPrivateLen,
/* has-returning flag (as an integer Value node) */
FdwModifyPrivateHasReturning,
/* Integer list of attribute numbers retrieved by RETURNING */
@@ -176,7 +180,10 @@ typedef struct PgFdwModifyState
/* extracted fdw_private data */
char *query; /* text of INSERT/UPDATE/DELETE command */
+ char *orig_query; /* original text of INSERT command */
List *target_attrs; /* list of target attribute numbers */
+ int values_end; /* length up to the end of VALUES */
+ int batch_size; /* value of FDW option "batch_size" */
bool has_returning; /* is there a RETURNING clause? */
List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
@@ -185,6 +192,9 @@ typedef struct PgFdwModifyState
int p_nums; /* number of parameters to transmit */
FmgrInfo *p_flinfo; /* output conversion functions for them */
+ /* batch operation stuff */
+ int num_slots; /* number of slots to insert */
+
/* working memory context */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@@ -343,6 +353,12 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo,
TupleTableSlot *slot,
TupleTableSlot *planSlot);
+static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots);
+static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo);
static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
ResultRelInfo *resultRelInfo,
TupleTableSlot *slot,
@@ -429,20 +445,24 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
Plan *subplan,
char *query,
List *target_attrs,
+ int len,
bool has_returning,
List *retrieved_attrs);
-static TupleTableSlot *execute_foreign_modify(EState *estate,
+static TupleTableSlot **execute_foreign_modify(EState *estate,
ResultRelInfo *resultRelInfo,
CmdType operation,
- TupleTableSlot *slot,
- TupleTableSlot *planSlot);
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots);
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
- TupleTableSlot *slot);
+ TupleTableSlot **slots,
+ int numSlots);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
static void finish_foreign_modify(PgFdwModifyState *fmstate);
+static void deallocate_query(PgFdwModifyState *fmstate);
static List *build_remote_returning(Index rtindex, Relation rel,
List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -505,6 +525,7 @@ static void apply_table_options(PgFdwRelationInfo *fpinfo);
static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_o,
const PgFdwRelationInfo *fpinfo_i);
+static int get_batch_size_option(Relation rel);
/*
@@ -530,6 +551,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->PlanForeignModify = postgresPlanForeignModify;
routine->BeginForeignModify = postgresBeginForeignModify;
routine->ExecForeignInsert = postgresExecForeignInsert;
+ routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert;
+ routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize;
routine->ExecForeignUpdate = postgresExecForeignUpdate;
routine->ExecForeignDelete = postgresExecForeignDelete;
routine->EndForeignModify = postgresEndForeignModify;
@@ -1665,6 +1688,7 @@ postgresPlanForeignModify(PlannerInfo *root,
List *returningList = NIL;
List *retrieved_attrs = NIL;
bool doNothing = false;
+ int values_end_len = -1;
initStringInfo(&sql);
@@ -1752,7 +1776,7 @@ postgresPlanForeignModify(PlannerInfo *root,
deparseInsertSql(&sql, rte, resultRelation, rel,
targetAttrs, doNothing,
withCheckOptionList, returningList,
- &retrieved_attrs);
+ &retrieved_attrs, &values_end_len);
break;
case CMD_UPDATE:
deparseUpdateSql(&sql, rte, resultRelation, rel,
@@ -1776,8 +1800,9 @@ postgresPlanForeignModify(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor.
* Items in the list must match enum FdwModifyPrivateIndex, above.
*/
- return list_make4(makeString(sql.data),
+ return list_make5(makeString(sql.data),
targetAttrs,
+ makeInteger(values_end_len),
makeInteger((retrieved_attrs != NIL)),
retrieved_attrs);
}
@@ -1797,6 +1822,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
char *query;
List *target_attrs;
bool has_returning;
+ int values_end_len;
List *retrieved_attrs;
RangeTblEntry *rte;
@@ -1812,6 +1838,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateUpdateSql));
target_attrs = (List *) list_nth(fdw_private,
FdwModifyPrivateTargetAttnums);
+ values_end_len = intVal(list_nth(fdw_private,
+ FdwModifyPrivateLen));
has_returning = intVal(list_nth(fdw_private,
FdwModifyPrivateHasReturning));
retrieved_attrs = (List *) list_nth(fdw_private,
@@ -1829,6 +1857,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
mtstate->mt_plans[subplan_index]->plan,
query,
target_attrs,
+ values_end_len,
has_returning,
retrieved_attrs);
@@ -1846,7 +1875,37 @@ postgresExecForeignInsert(EState *estate,
TupleTableSlot *planSlot)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
- TupleTableSlot *rslot;
+ TupleTableSlot **rslot;
+ int numSlots = 1;
+
+ /*
+ * If the fmstate has aux_fmstate set, use the aux_fmstate (see
+ * postgresBeginForeignInsert())
+ */
+ if (fmstate->aux_fmstate)
+ resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
+ rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
+ &slot, &planSlot, &numSlots);
+ /* Revert that change */
+ if (fmstate->aux_fmstate)
+ resultRelInfo->ri_FdwState = fmstate;
+
+ return rslot ? *rslot : NULL;
+}
+
+/*
+ * postgresExecForeignBatchInsert
+ * Insert multiple rows into a foreign table
+ */
+static TupleTableSlot **
+postgresExecForeignBatchInsert(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots)
+{
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+ TupleTableSlot **rslot;
/*
* If the fmstate has aux_fmstate set, use the aux_fmstate (see
@@ -1855,7 +1914,7 @@ postgresExecForeignInsert(EState *estate,
if (fmstate->aux_fmstate)
resultRelInfo->ri_FdwState = fmstate->aux_fmstate;
rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT,
- slot, planSlot);
+ slots, planSlots, numSlots);
/* Revert that change */
if (fmstate->aux_fmstate)
resultRelInfo->ri_FdwState = fmstate;
@@ -1864,6 +1923,42 @@ postgresExecForeignInsert(EState *estate,
}
/*
+ * postgresGetForeignModifyBatchSize
+ * Determine the maximum number of tuples that can be inserted in bulk
+ *
+ * Returns the batch size specified for server or table. When batching is not
+ * allowed (e.g. for tables with AFTER ROW triggers or with RETURNING clause),
+ * returns 1.
+ */
+static int
+postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo)
+{
+ int batch_size;
+
+ /* should be called only once */
+ Assert(resultRelInfo->ri_BatchSize == 0);
+
+ /*
+ * In EXPLAIN without ANALYZE, ri_fdwstate is NULL, so we have to lookup
+ * the option directly in server/table options. Otherwise just use the
+ * value we determined earlier.
+ */
+ if (resultRelInfo->ri_FdwState)
+ batch_size = ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->batch_size;
+ else
+ batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc);
+
+ /* Disable batching when we have to use RETURNING. */
+ if (resultRelInfo->ri_projectReturning != NULL ||
+ (resultRelInfo->ri_TrigDesc &&
+ resultRelInfo->ri_TrigDesc->trig_insert_after_row))
+ return 1;
+
+ /* Otherwise use the batch size specified for server/table. */
+ return batch_size;
+}
+
+/*
* postgresExecForeignUpdate
* Update one row in a foreign table
*/
@@ -1873,8 +1968,13 @@ postgresExecForeignUpdate(EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot)
{
- return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
- slot, planSlot);
+ TupleTableSlot **rslot;
+ int numSlots = 1;
+
+ rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE,
+ &slot, &planSlot, &numSlots);
+
+ return rslot ? rslot[0] : NULL;
}
/*
@@ -1887,8 +1987,13 @@ postgresExecForeignDelete(EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot)
{
- return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
- slot, planSlot);
+ TupleTableSlot **rslot;
+ int numSlots = 1;
+
+ rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE,
+ &slot, &planSlot, &numSlots);
+
+ return rslot ? rslot[0] : NULL;
}
/*
@@ -1925,6 +2030,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
RangeTblEntry *rte;
TupleDesc tupdesc = RelationGetDescr(rel);
int attnum;
+ int values_end_len;
StringInfoData sql;
List *targetAttrs = NIL;
List *retrieved_attrs = NIL;
@@ -2001,7 +2107,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing,
resultRelInfo->ri_WithCheckOptions,
resultRelInfo->ri_returningList,
- &retrieved_attrs);
+ &retrieved_attrs, &values_end_len);
/* Construct an execution state. */
fmstate = create_foreign_modify(mtstate->ps.state,
@@ -2011,6 +2117,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
NULL,
sql.data,
targetAttrs,
+ values_end_len,
retrieved_attrs != NIL,
retrieved_attrs);
@@ -2636,6 +2743,13 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
FdwModifyPrivateUpdateSql));
ExplainPropertyText("Remote SQL", sql, es);
+
+ /*
+ * For INSERT we should always have batch size >= 1, but UPDATE
+ * and DELETE don't support batching so don't show the property.
+ */
+ if (rinfo->ri_BatchSize > 0)
+ ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es);
}
}
@@ -3530,6 +3644,7 @@ create_foreign_modify(EState *estate,
Plan *subplan,
char *query,
List *target_attrs,
+ int values_end,
bool has_returning,
List *retrieved_attrs)
{
@@ -3564,7 +3679,10 @@ create_foreign_modify(EState *estate,
/* Set up remote query information. */
fmstate->query = query;
+ if (operation == CMD_INSERT)
+ fmstate->orig_query = pstrdup(fmstate->query);
fmstate->target_attrs = target_attrs;
+ fmstate->values_end = values_end;
fmstate->has_returning = has_returning;
fmstate->retrieved_attrs = retrieved_attrs;
@@ -3616,6 +3734,12 @@ create_foreign_modify(EState *estate,
Assert(fmstate->p_nums <= n_params);
+ /* Set batch_size from foreign server/table options. */
+ if (operation == CMD_INSERT)
+ fmstate->batch_size = get_batch_size_option(rel);
+
+ fmstate->num_slots = 1;
+
/* Initialize auxiliary state */
fmstate->aux_fmstate = NULL;
@@ -3626,26 +3750,48 @@ create_foreign_modify(EState *estate,
* execute_foreign_modify
* Perform foreign-table modification as required, and fetch RETURNING
* result if any. (This is the shared guts of postgresExecForeignInsert,
- * postgresExecForeignUpdate, and postgresExecForeignDelete.)
+ * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and
+ * postgresExecForeignDelete.)
*/
-static TupleTableSlot *
+static TupleTableSlot **
execute_foreign_modify(EState *estate,
ResultRelInfo *resultRelInfo,
CmdType operation,
- TupleTableSlot *slot,
- TupleTableSlot *planSlot)
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
ItemPointer ctid = NULL;
const char **p_values;
PGresult *res;
int n_rows;
+ StringInfoData sql;
/* The operation should be INSERT, UPDATE, or DELETE */
Assert(operation == CMD_INSERT ||
operation == CMD_UPDATE ||
operation == CMD_DELETE);
+ /*
+ * If the existing query was deparsed and prepared for a different number
+ * of rows, rebuild it for the proper number.
+ */
+ if (operation == CMD_INSERT && fmstate->num_slots != *numSlots)
+ {
+ /* Destroy the prepared statement created previously */
+ if (fmstate->p_name)
+ deallocate_query(fmstate);
+
+ /* Build INSERT string with numSlots records in its VALUES clause. */
+ initStringInfo(&sql);
+ rebuildInsertSql(&sql, fmstate->orig_query, fmstate->values_end,
+ fmstate->p_nums, *numSlots - 1);
+ pfree(fmstate->query);
+ fmstate->query = sql.data;
+ fmstate->num_slots = *numSlots;
+ }
+
/* Set up the prepared statement on the remote server, if we didn't yet */
if (!fmstate->p_name)
prepare_foreign_modify(fmstate);
@@ -3658,7 +3804,7 @@ execute_foreign_modify(EState *estate,
Datum datum;
bool isNull;
- datum = ExecGetJunkAttribute(planSlot,
+ datum = ExecGetJunkAttribute(planSlots[0],
fmstate->ctidAttno,
&isNull);
/* shouldn't ever get a null result... */
@@ -3668,14 +3814,14 @@ execute_foreign_modify(EState *estate,
}
/* Convert parameters needed by prepared statement to text form */
- p_values = convert_prep_stmt_params(fmstate, ctid, slot);
+ p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots);
/*
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
- fmstate->p_nums,
+ fmstate->p_nums * (*numSlots),
p_values,
NULL,
NULL,
@@ -3696,9 +3842,10 @@ execute_foreign_modify(EState *estate,
/* Check number of rows affected, and fetch RETURNING tuple if any */
if (fmstate->has_returning)
{
+ Assert(*numSlots == 1);
n_rows = PQntuples(res);
if (n_rows > 0)
- store_returning_result(fmstate, slot, res);
+ store_returning_result(fmstate, slots[0], res);
}
else
n_rows = atoi(PQcmdTuples(res));
@@ -3708,10 +3855,12 @@ execute_foreign_modify(EState *estate,
MemoryContextReset(fmstate->temp_cxt);
+ *numSlots = n_rows;
+
/*
* Return NULL if nothing was inserted/updated/deleted on the remote end
*/
- return (n_rows > 0) ? slot : NULL;
+ return (n_rows > 0) ? slots : NULL;
}
/*
@@ -3771,52 +3920,64 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
static const char **
convert_prep_stmt_params(PgFdwModifyState *fmstate,
ItemPointer tupleid,
- TupleTableSlot *slot)
+ TupleTableSlot **slots,
+ int numSlots)
{
const char **p_values;
+ int i;
+ int j;
int pindex = 0;
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
- p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
+ p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots);
+
+ /* ctid is provided only for UPDATE/DELETE, which don't allow batching */
+ Assert(!(tupleid != NULL && numSlots > 1));
/* 1st parameter should be ctid, if it's in use */
if (tupleid != NULL)
{
+ Assert(numSlots == 1);
/* don't need set_transmission_modes for TID output */
p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
PointerGetDatum(tupleid));
pindex++;
}
- /* get following parameters from slot */
- if (slot != NULL && fmstate->target_attrs != NIL)
+ /* get following parameters from slots */
+ if (slots != NULL && fmstate->target_attrs != NIL)
{
int nestlevel;
ListCell *lc;
nestlevel = set_transmission_modes();
- foreach(lc, fmstate->target_attrs)
+ for (i = 0; i < numSlots; i++)
{
- int attnum = lfirst_int(lc);
- Datum value;
- bool isnull;
+ j = (tupleid != NULL) ? 1 : 0;
+ foreach(lc, fmstate->target_attrs)
+ {
+ int attnum = lfirst_int(lc);
+ Datum value;
+ bool isnull;
- value = slot_getattr(slot, attnum, &isnull);
- if (isnull)
- p_values[pindex] = NULL;
- else
- p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
- value);
- pindex++;
+ value = slot_getattr(slots[i], attnum, &isnull);
+ if (isnull)
+ p_values[pindex] = NULL;
+ else
+ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j],
+ value);
+ pindex++;
+ j++;
+ }
}
reset_transmission_modes(nestlevel);
}
- Assert(pindex == fmstate->p_nums);
+ Assert(pindex == fmstate->p_nums * numSlots);
MemoryContextSwitchTo(oldcontext);
@@ -3870,23 +4031,7 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
Assert(fmstate != NULL);
/* If we created a prepared statement, destroy it */
- if (fmstate->p_name)
- {
- char sql[64];
- PGresult *res;
-
- snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
-
- /*
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
- */
- res = pgfdw_exec_query(fmstate->conn, sql);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
- PQclear(res);
- fmstate->p_name = NULL;
- }
+ deallocate_query(fmstate);
/* Release remote connection */
ReleaseConnection(fmstate->conn);
@@ -3894,6 +4039,34 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
}
/*
+ * deallocate_query
+ * Deallocate a prepared statement for a foreign insert/update/delete
+ * operation
+ */
+static void
+deallocate_query(PgFdwModifyState *fmstate)
+{
+ char sql[64];
+ PGresult *res;
+
+ /* do nothing if the query is not allocated */
+ if (!fmstate->p_name)
+ return;
+
+ snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = pgfdw_exec_query(fmstate->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ PQclear(res);
+ fmstate->p_name = NULL;
+}
+
+/*
* build_remote_returning
* Build a RETURNING targetlist of a remote query for performing an
* UPDATE/DELETE .. RETURNING on a join directly
@@ -6577,3 +6750,45 @@ find_em_expr_for_input_target(PlannerInfo *root,
elog(ERROR, "could not find pathkey item to sort");
return NULL; /* keep compiler quiet */
}
+
+/*
+ * Determine batch size for a given foreign table. The option specified for
+ * a table has precedence.
+ */
+static int
+get_batch_size_option(Relation rel)
+{
+ Oid foreigntableid = RelationGetRelid(rel);
+ ForeignTable *table;
+ ForeignServer *server;
+ List *options;
+ ListCell *lc;
+
+ /* we use 1 by default, which means "no batching" */
+ int batch_size = 1;
+
+ /*
+ * Load options for table and server. We append server options after
+ * table options, because table options take precedence.
+ */
+ table = GetForeignTable(foreigntableid);
+ server = GetForeignServer(table->serverid);
+
+ options = NIL;
+ options = list_concat(options, table->options);
+ options = list_concat(options, server->options);
+
+ /* See if either table or server specifies batch_size. */
+ foreach(lc, options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "batch_size") == 0)
+ {
+ batch_size = strtol(defGetString(def), NULL, 10);
+ break;
+ }
+ }
+
+ return batch_size;
+}