diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 329 |
1 files changed, 272 insertions, 57 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 2f2d4d171c1..9a31bbb86b2 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -87,8 +87,10 @@ enum FdwScanPrivateIndex * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server * 2) Integer list of target attribute numbers for INSERT/UPDATE * (NIL for a DELETE) - * 3) Boolean flag showing if the remote query has a RETURNING clause - * 4) Integer list of attribute numbers retrieved by RETURNING, if any + * 3) Length till the end of VALUES clause for INSERT + * (-1 for a DELETE/UPDATE) + * 4) Boolean flag showing if the remote query has a RETURNING clause + * 5) Integer list of attribute numbers retrieved by RETURNING, if any */ enum FdwModifyPrivateIndex { @@ -96,6 +98,8 @@ enum FdwModifyPrivateIndex FdwModifyPrivateUpdateSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, + /* Length till the end of VALUES clause (as an integer Value node) */ + FdwModifyPrivateLen, /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ @@ -176,7 +180,10 @@ typedef struct PgFdwModifyState /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ + char *orig_query; /* original text of INSERT command */ List *target_attrs; /* list of target attribute numbers */ + int values_end; /* length up to the end of VALUES */ + int batch_size; /* value of FDW option "batch_size" */ bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ @@ -185,6 +192,9 @@ typedef struct PgFdwModifyState int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ + /* batch operation stuff */ + int num_slots; /* number of slots to insert */ + /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -343,6 +353,12 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); +static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo); static TupleTableSlot *postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, @@ -429,20 +445,24 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int len, bool has_returning, List *retrieved_attrs); -static TupleTableSlot *execute_foreign_modify(EState *estate, +static TupleTableSlot **execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot); + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot); + TupleTableSlot **slots, + int numSlots); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); static void finish_foreign_modify(PgFdwModifyState *fmstate); +static void deallocate_query(PgFdwModifyState *fmstate); static List *build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); @@ -505,6 +525,7 @@ static void apply_table_options(PgFdwRelationInfo *fpinfo); static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); +static int get_batch_size_option(Relation rel); /* @@ -530,6 +551,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->PlanForeignModify = postgresPlanForeignModify; routine->BeginForeignModify = postgresBeginForeignModify; routine->ExecForeignInsert = postgresExecForeignInsert; + routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert; + routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; @@ -1665,6 +1688,7 @@ postgresPlanForeignModify(PlannerInfo *root, List *returningList = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + int values_end_len = -1; initStringInfo(&sql); @@ -1752,7 +1776,7 @@ postgresPlanForeignModify(PlannerInfo *root, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, withCheckOptionList, returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, @@ -1776,8 +1800,9 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make4(makeString(sql.data), + return list_make5(makeString(sql.data), targetAttrs, + makeInteger(values_end_len), makeInteger((retrieved_attrs != NIL)), retrieved_attrs); } @@ -1797,6 +1822,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, char *query; List *target_attrs; bool has_returning; + int values_end_len; List *retrieved_attrs; RangeTblEntry *rte; @@ -1812,6 +1838,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, FdwModifyPrivateUpdateSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); + values_end_len = intVal(list_nth(fdw_private, + FdwModifyPrivateLen)); has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); retrieved_attrs = (List *) list_nth(fdw_private, @@ -1829,6 +1857,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, mtstate->mt_plans[subplan_index]->plan, query, target_attrs, + values_end_len, has_returning, retrieved_attrs); @@ -1846,7 +1875,37 @@ postgresExecForeignInsert(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - TupleTableSlot *rslot; + TupleTableSlot **rslot; + int numSlots = 1; + + /* + * If the fmstate has aux_fmstate set, use the aux_fmstate (see + * postgresBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate->aux_fmstate; + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, + &slot, &planSlot, &numSlots); + /* Revert that change */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate; + + return rslot ? *rslot : NULL; +} + +/* + * postgresExecForeignBatchInsert + * Insert multiple rows into a foreign table + */ +static TupleTableSlot ** +postgresExecForeignBatchInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + TupleTableSlot **rslot; /* * If the fmstate has aux_fmstate set, use the aux_fmstate (see @@ -1855,7 +1914,7 @@ postgresExecForeignInsert(EState *estate, if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate->aux_fmstate; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, - slot, planSlot); + slots, planSlots, numSlots); /* Revert that change */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate; @@ -1864,6 +1923,42 @@ postgresExecForeignInsert(EState *estate, } /* + * postgresGetForeignModifyBatchSize + * Determine the maximum number of tuples that can be inserted in bulk + * + * Returns the batch size specified for server or table. When batching is not + * allowed (e.g. for tables with AFTER ROW triggers or with RETURNING clause), + * returns 1. + */ +static int +postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) +{ + int batch_size; + + /* should be called only once */ + Assert(resultRelInfo->ri_BatchSize == 0); + + /* + * In EXPLAIN without ANALYZE, ri_fdwstate is NULL, so we have to lookup + * the option directly in server/table options. Otherwise just use the + * value we determined earlier. + */ + if (resultRelInfo->ri_FdwState) + batch_size = ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->batch_size; + else + batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc); + + /* Disable batching when we have to use RETURNING. */ + if (resultRelInfo->ri_projectReturning != NULL || + (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_after_row)) + return 1; + + /* Otherwise use the batch size specified for server/table. */ + return batch_size; +} + +/* * postgresExecForeignUpdate * Update one row in a foreign table */ @@ -1873,8 +1968,13 @@ postgresExecForeignUpdate(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, + &slot, &planSlot, &numSlots); + + return rslot ? rslot[0] : NULL; } /* @@ -1887,8 +1987,13 @@ postgresExecForeignDelete(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, + &slot, &planSlot, &numSlots); + + return rslot ? rslot[0] : NULL; } /* @@ -1925,6 +2030,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; + int values_end_len; StringInfoData sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; @@ -2001,7 +2107,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, resultRelInfo->ri_WithCheckOptions, resultRelInfo->ri_returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2011,6 +2117,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, NULL, sql.data, targetAttrs, + values_end_len, retrieved_attrs != NIL, retrieved_attrs); @@ -2636,6 +2743,13 @@ postgresExplainForeignModify(ModifyTableState *mtstate, FdwModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); + + /* + * For INSERT we should always have batch size >= 1, but UPDATE + * and DELETE don't support batching so don't show the property. + */ + if (rinfo->ri_BatchSize > 0) + ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es); } } @@ -3530,6 +3644,7 @@ create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int values_end, bool has_returning, List *retrieved_attrs) { @@ -3564,7 +3679,10 @@ create_foreign_modify(EState *estate, /* Set up remote query information. */ fmstate->query = query; + if (operation == CMD_INSERT) + fmstate->orig_query = pstrdup(fmstate->query); fmstate->target_attrs = target_attrs; + fmstate->values_end = values_end; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; @@ -3616,6 +3734,12 @@ create_foreign_modify(EState *estate, Assert(fmstate->p_nums <= n_params); + /* Set batch_size from foreign server/table options. */ + if (operation == CMD_INSERT) + fmstate->batch_size = get_batch_size_option(rel); + + fmstate->num_slots = 1; + /* Initialize auxiliary state */ fmstate->aux_fmstate = NULL; @@ -3626,26 +3750,48 @@ create_foreign_modify(EState *estate, * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING * result if any. (This is the shared guts of postgresExecForeignInsert, - * postgresExecForeignUpdate, and postgresExecForeignDelete.) + * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and + * postgresExecForeignDelete.) */ -static TupleTableSlot * +static TupleTableSlot ** execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot) + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; ItemPointer ctid = NULL; const char **p_values; PGresult *res; int n_rows; + StringInfoData sql; /* The operation should be INSERT, UPDATE, or DELETE */ Assert(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE); + /* + * If the existing query was deparsed and prepared for a different number + * of rows, rebuild it for the proper number. + */ + if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) + { + /* Destroy the prepared statement created previously */ + if (fmstate->p_name) + deallocate_query(fmstate); + + /* Build INSERT string with numSlots records in its VALUES clause. */ + initStringInfo(&sql); + rebuildInsertSql(&sql, fmstate->orig_query, fmstate->values_end, + fmstate->p_nums, *numSlots - 1); + pfree(fmstate->query); + fmstate->query = sql.data; + fmstate->num_slots = *numSlots; + } + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -3658,7 +3804,7 @@ execute_foreign_modify(EState *estate, Datum datum; bool isNull; - datum = ExecGetJunkAttribute(planSlot, + datum = ExecGetJunkAttribute(planSlots[0], fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ @@ -3668,14 +3814,14 @@ execute_foreign_modify(EState *estate, } /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, ctid, slot); + p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); /* * Execute the prepared statement. */ if (!PQsendQueryPrepared(fmstate->conn, fmstate->p_name, - fmstate->p_nums, + fmstate->p_nums * (*numSlots), p_values, NULL, NULL, @@ -3696,9 +3842,10 @@ execute_foreign_modify(EState *estate, /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { + Assert(*numSlots == 1); n_rows = PQntuples(res); if (n_rows > 0) - store_returning_result(fmstate, slot, res); + store_returning_result(fmstate, slots[0], res); } else n_rows = atoi(PQcmdTuples(res)); @@ -3708,10 +3855,12 @@ execute_foreign_modify(EState *estate, MemoryContextReset(fmstate->temp_cxt); + *numSlots = n_rows; + /* * Return NULL if nothing was inserted/updated/deleted on the remote end */ - return (n_rows > 0) ? slot : NULL; + return (n_rows > 0) ? slots : NULL; } /* @@ -3771,52 +3920,64 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot) + TupleTableSlot **slots, + int numSlots) { const char **p_values; + int i; + int j; int pindex = 0; MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); - p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); + p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots); + + /* ctid is provided only for UPDATE/DELETE, which don't allow batching */ + Assert(!(tupleid != NULL && numSlots > 1)); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { + Assert(numSlots == 1); /* don't need set_transmission_modes for TID output */ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); pindex++; } - /* get following parameters from slot */ - if (slot != NULL && fmstate->target_attrs != NIL) + /* get following parameters from slots */ + if (slots != NULL && fmstate->target_attrs != NIL) { int nestlevel; ListCell *lc; nestlevel = set_transmission_modes(); - foreach(lc, fmstate->target_attrs) + for (i = 0; i < numSlots; i++) { - int attnum = lfirst_int(lc); - Datum value; - bool isnull; + j = (tupleid != NULL) ? 1 : 0; + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Datum value; + bool isnull; - value = slot_getattr(slot, attnum, &isnull); - if (isnull) - p_values[pindex] = NULL; - else - p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], - value); - pindex++; + value = slot_getattr(slots[i], attnum, &isnull); + if (isnull) + p_values[pindex] = NULL; + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j], + value); + pindex++; + j++; + } } reset_transmission_modes(nestlevel); } - Assert(pindex == fmstate->p_nums); + Assert(pindex == fmstate->p_nums * numSlots); MemoryContextSwitchTo(oldcontext); @@ -3870,23 +4031,7 @@ finish_foreign_modify(PgFdwModifyState *fmstate) Assert(fmstate != NULL); /* If we created a prepared statement, destroy it */ - if (fmstate->p_name) - { - char sql[64]; - PGresult *res; - - snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); - - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_exec_query(fmstate->conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); - PQclear(res); - fmstate->p_name = NULL; - } + deallocate_query(fmstate); /* Release remote connection */ ReleaseConnection(fmstate->conn); @@ -3894,6 +4039,34 @@ finish_foreign_modify(PgFdwModifyState *fmstate) } /* + * deallocate_query + * Deallocate a prepared statement for a foreign insert/update/delete + * operation + */ +static void +deallocate_query(PgFdwModifyState *fmstate) +{ + char sql[64]; + PGresult *res; + + /* do nothing if the query is not allocated */ + if (!fmstate->p_name) + return; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + PQclear(res); + fmstate->p_name = NULL; +} + +/* * build_remote_returning * Build a RETURNING targetlist of a remote query for performing an * UPDATE/DELETE .. RETURNING on a join directly @@ -6577,3 +6750,45 @@ find_em_expr_for_input_target(PlannerInfo *root, elog(ERROR, "could not find pathkey item to sort"); return NULL; /* keep compiler quiet */ } + +/* + * Determine batch size for a given foreign table. The option specified for + * a table has precedence. + */ +static int +get_batch_size_option(Relation rel) +{ + Oid foreigntableid = RelationGetRelid(rel); + ForeignTable *table; + ForeignServer *server; + List *options; + ListCell *lc; + + /* we use 1 by default, which means "no batching" */ + int batch_size = 1; + + /* + * Load options for table and server. We append server options after + * table options, because table options take precedence. + */ + table = GetForeignTable(foreigntableid); + server = GetForeignServer(table->serverid); + + options = NIL; + options = list_concat(options, table->options); + options = list_concat(options, server->options); + + /* See if either table or server specifies batch_size. */ + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "batch_size") == 0) + { + batch_size = strtol(defGetString(def), NULL, 10); + break; + } + } + + return batch_size; +} |