From b663a4136331de6c7364226e3dbf7c88bfee7145 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Wed, 20 Jan 2021 23:05:46 +0100 Subject: Implement support for bulk inserts in postgres_fdw Extends the FDW API to allow batching inserts into foreign tables. That is usually much more efficient than inserting individual rows, due to high latency for each round-trip to the foreign server. It was possible to implement something similar in the regular FDW API, but it was inconvenient and there were issues with reporting the number of actually inserted rows etc. This extends the FDW API with two new functions: * GetForeignModifyBatchSize - allows the FDW picking optimal batch size * ExecForeignBatchInsert - inserts a batch of rows at once Currently, only INSERT queries support batching. Support for DELETE and UPDATE may be added in the future. This also implements batching for postgres_fdw. The batch size may be specified using "batch_size" option both at the server and table level. The initial patch version was written by me, but it was rewritten and improved in many ways by Takayuki Tsunakawa. Author: Takayuki Tsunakawa Reviewed-by: Tomas Vondra, Amit Langote Discussion: https://postgr.es/m/20200628151002.7x5laxwpgvkyiu3q@development --- contrib/postgres_fdw/postgres_fdw.c | 329 +++++++++++++++++++++++++++++------- 1 file changed, 272 insertions(+), 57 deletions(-) (limited to 'contrib/postgres_fdw/postgres_fdw.c') diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 2f2d4d171c1..9a31bbb86b2 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -87,8 +87,10 @@ enum FdwScanPrivateIndex * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server * 2) Integer list of target attribute numbers for INSERT/UPDATE * (NIL for a DELETE) - * 3) Boolean flag showing if the remote query has a RETURNING clause - * 4) Integer list of attribute numbers retrieved by RETURNING, if any + * 3) Length till the end of VALUES clause for INSERT + * (-1 for a DELETE/UPDATE) + * 4) Boolean flag showing if the remote query has a RETURNING clause + * 5) Integer list of attribute numbers retrieved by RETURNING, if any */ enum FdwModifyPrivateIndex { @@ -96,6 +98,8 @@ enum FdwModifyPrivateIndex FdwModifyPrivateUpdateSql, /* Integer list of target attribute numbers for INSERT/UPDATE */ FdwModifyPrivateTargetAttnums, + /* Length till the end of VALUES clause (as an integer Value node) */ + FdwModifyPrivateLen, /* has-returning flag (as an integer Value node) */ FdwModifyPrivateHasReturning, /* Integer list of attribute numbers retrieved by RETURNING */ @@ -176,7 +180,10 @@ typedef struct PgFdwModifyState /* extracted fdw_private data */ char *query; /* text of INSERT/UPDATE/DELETE command */ + char *orig_query; /* original text of INSERT command */ List *target_attrs; /* list of target attribute numbers */ + int values_end; /* length up to the end of VALUES */ + int batch_size; /* value of FDW option "batch_size" */ bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ @@ -185,6 +192,9 @@ typedef struct PgFdwModifyState int p_nums; /* number of parameters to transmit */ FmgrInfo *p_flinfo; /* output conversion functions for them */ + /* batch operation stuff */ + int num_slots; /* number of slots to insert */ + /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -343,6 +353,12 @@ static TupleTableSlot *postgresExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot); +static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); +static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo); static TupleTableSlot *postgresExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, @@ -429,20 +445,24 @@ static PgFdwModifyState *create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int len, bool has_returning, List *retrieved_attrs); -static TupleTableSlot *execute_foreign_modify(EState *estate, +static TupleTableSlot **execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot); + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot); + TupleTableSlot **slots, + int numSlots); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); static void finish_foreign_modify(PgFdwModifyState *fmstate); +static void deallocate_query(PgFdwModifyState *fmstate); static List *build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); @@ -505,6 +525,7 @@ static void apply_table_options(PgFdwRelationInfo *fpinfo); static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); +static int get_batch_size_option(Relation rel); /* @@ -530,6 +551,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->PlanForeignModify = postgresPlanForeignModify; routine->BeginForeignModify = postgresBeginForeignModify; routine->ExecForeignInsert = postgresExecForeignInsert; + routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert; + routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize; routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; @@ -1665,6 +1688,7 @@ postgresPlanForeignModify(PlannerInfo *root, List *returningList = NIL; List *retrieved_attrs = NIL; bool doNothing = false; + int values_end_len = -1; initStringInfo(&sql); @@ -1752,7 +1776,7 @@ postgresPlanForeignModify(PlannerInfo *root, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, withCheckOptionList, returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); break; case CMD_UPDATE: deparseUpdateSql(&sql, rte, resultRelation, rel, @@ -1776,8 +1800,9 @@ postgresPlanForeignModify(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwModifyPrivateIndex, above. */ - return list_make4(makeString(sql.data), + return list_make5(makeString(sql.data), targetAttrs, + makeInteger(values_end_len), makeInteger((retrieved_attrs != NIL)), retrieved_attrs); } @@ -1797,6 +1822,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, char *query; List *target_attrs; bool has_returning; + int values_end_len; List *retrieved_attrs; RangeTblEntry *rte; @@ -1812,6 +1838,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate, FdwModifyPrivateUpdateSql)); target_attrs = (List *) list_nth(fdw_private, FdwModifyPrivateTargetAttnums); + values_end_len = intVal(list_nth(fdw_private, + FdwModifyPrivateLen)); has_returning = intVal(list_nth(fdw_private, FdwModifyPrivateHasReturning)); retrieved_attrs = (List *) list_nth(fdw_private, @@ -1829,6 +1857,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, mtstate->mt_plans[subplan_index]->plan, query, target_attrs, + values_end_len, has_returning, retrieved_attrs); @@ -1846,7 +1875,37 @@ postgresExecForeignInsert(EState *estate, TupleTableSlot *planSlot) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; - TupleTableSlot *rslot; + TupleTableSlot **rslot; + int numSlots = 1; + + /* + * If the fmstate has aux_fmstate set, use the aux_fmstate (see + * postgresBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate->aux_fmstate; + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, + &slot, &planSlot, &numSlots); + /* Revert that change */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate; + + return rslot ? *rslot : NULL; +} + +/* + * postgresExecForeignBatchInsert + * Insert multiple rows into a foreign table + */ +static TupleTableSlot ** +postgresExecForeignBatchInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + TupleTableSlot **rslot; /* * If the fmstate has aux_fmstate set, use the aux_fmstate (see @@ -1855,7 +1914,7 @@ postgresExecForeignInsert(EState *estate, if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate->aux_fmstate; rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, - slot, planSlot); + slots, planSlots, numSlots); /* Revert that change */ if (fmstate->aux_fmstate) resultRelInfo->ri_FdwState = fmstate; @@ -1863,6 +1922,42 @@ postgresExecForeignInsert(EState *estate, return rslot; } +/* + * postgresGetForeignModifyBatchSize + * Determine the maximum number of tuples that can be inserted in bulk + * + * Returns the batch size specified for server or table. When batching is not + * allowed (e.g. for tables with AFTER ROW triggers or with RETURNING clause), + * returns 1. + */ +static int +postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) +{ + int batch_size; + + /* should be called only once */ + Assert(resultRelInfo->ri_BatchSize == 0); + + /* + * In EXPLAIN without ANALYZE, ri_fdwstate is NULL, so we have to lookup + * the option directly in server/table options. Otherwise just use the + * value we determined earlier. + */ + if (resultRelInfo->ri_FdwState) + batch_size = ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->batch_size; + else + batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc); + + /* Disable batching when we have to use RETURNING. */ + if (resultRelInfo->ri_projectReturning != NULL || + (resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_after_row)) + return 1; + + /* Otherwise use the batch size specified for server/table. */ + return batch_size; +} + /* * postgresExecForeignUpdate * Update one row in a foreign table @@ -1873,8 +1968,13 @@ postgresExecForeignUpdate(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, + &slot, &planSlot, &numSlots); + + return rslot ? rslot[0] : NULL; } /* @@ -1887,8 +1987,13 @@ postgresExecForeignDelete(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot) { - return execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, - slot, planSlot); + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, + &slot, &planSlot, &numSlots); + + return rslot ? rslot[0] : NULL; } /* @@ -1925,6 +2030,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, RangeTblEntry *rte; TupleDesc tupdesc = RelationGetDescr(rel); int attnum; + int values_end_len; StringInfoData sql; List *targetAttrs = NIL; List *retrieved_attrs = NIL; @@ -2001,7 +2107,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, resultRelInfo->ri_WithCheckOptions, resultRelInfo->ri_returningList, - &retrieved_attrs); + &retrieved_attrs, &values_end_len); /* Construct an execution state. */ fmstate = create_foreign_modify(mtstate->ps.state, @@ -2011,6 +2117,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate, NULL, sql.data, targetAttrs, + values_end_len, retrieved_attrs != NIL, retrieved_attrs); @@ -2636,6 +2743,13 @@ postgresExplainForeignModify(ModifyTableState *mtstate, FdwModifyPrivateUpdateSql)); ExplainPropertyText("Remote SQL", sql, es); + + /* + * For INSERT we should always have batch size >= 1, but UPDATE + * and DELETE don't support batching so don't show the property. + */ + if (rinfo->ri_BatchSize > 0) + ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es); } } @@ -3530,6 +3644,7 @@ create_foreign_modify(EState *estate, Plan *subplan, char *query, List *target_attrs, + int values_end, bool has_returning, List *retrieved_attrs) { @@ -3564,7 +3679,10 @@ create_foreign_modify(EState *estate, /* Set up remote query information. */ fmstate->query = query; + if (operation == CMD_INSERT) + fmstate->orig_query = pstrdup(fmstate->query); fmstate->target_attrs = target_attrs; + fmstate->values_end = values_end; fmstate->has_returning = has_returning; fmstate->retrieved_attrs = retrieved_attrs; @@ -3616,6 +3734,12 @@ create_foreign_modify(EState *estate, Assert(fmstate->p_nums <= n_params); + /* Set batch_size from foreign server/table options. */ + if (operation == CMD_INSERT) + fmstate->batch_size = get_batch_size_option(rel); + + fmstate->num_slots = 1; + /* Initialize auxiliary state */ fmstate->aux_fmstate = NULL; @@ -3626,26 +3750,48 @@ create_foreign_modify(EState *estate, * execute_foreign_modify * Perform foreign-table modification as required, and fetch RETURNING * result if any. (This is the shared guts of postgresExecForeignInsert, - * postgresExecForeignUpdate, and postgresExecForeignDelete.) + * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and + * postgresExecForeignDelete.) */ -static TupleTableSlot * +static TupleTableSlot ** execute_foreign_modify(EState *estate, ResultRelInfo *resultRelInfo, CmdType operation, - TupleTableSlot *slot, - TupleTableSlot *planSlot) + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) { PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; ItemPointer ctid = NULL; const char **p_values; PGresult *res; int n_rows; + StringInfoData sql; /* The operation should be INSERT, UPDATE, or DELETE */ Assert(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE); + /* + * If the existing query was deparsed and prepared for a different number + * of rows, rebuild it for the proper number. + */ + if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) + { + /* Destroy the prepared statement created previously */ + if (fmstate->p_name) + deallocate_query(fmstate); + + /* Build INSERT string with numSlots records in its VALUES clause. */ + initStringInfo(&sql); + rebuildInsertSql(&sql, fmstate->orig_query, fmstate->values_end, + fmstate->p_nums, *numSlots - 1); + pfree(fmstate->query); + fmstate->query = sql.data; + fmstate->num_slots = *numSlots; + } + /* Set up the prepared statement on the remote server, if we didn't yet */ if (!fmstate->p_name) prepare_foreign_modify(fmstate); @@ -3658,7 +3804,7 @@ execute_foreign_modify(EState *estate, Datum datum; bool isNull; - datum = ExecGetJunkAttribute(planSlot, + datum = ExecGetJunkAttribute(planSlots[0], fmstate->ctidAttno, &isNull); /* shouldn't ever get a null result... */ @@ -3668,14 +3814,14 @@ execute_foreign_modify(EState *estate, } /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, ctid, slot); + p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); /* * Execute the prepared statement. */ if (!PQsendQueryPrepared(fmstate->conn, fmstate->p_name, - fmstate->p_nums, + fmstate->p_nums * (*numSlots), p_values, NULL, NULL, @@ -3696,9 +3842,10 @@ execute_foreign_modify(EState *estate, /* Check number of rows affected, and fetch RETURNING tuple if any */ if (fmstate->has_returning) { + Assert(*numSlots == 1); n_rows = PQntuples(res); if (n_rows > 0) - store_returning_result(fmstate, slot, res); + store_returning_result(fmstate, slots[0], res); } else n_rows = atoi(PQcmdTuples(res)); @@ -3708,10 +3855,12 @@ execute_foreign_modify(EState *estate, MemoryContextReset(fmstate->temp_cxt); + *numSlots = n_rows; + /* * Return NULL if nothing was inserted/updated/deleted on the remote end */ - return (n_rows > 0) ? slot : NULL; + return (n_rows > 0) ? slots : NULL; } /* @@ -3771,52 +3920,64 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) static const char ** convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, - TupleTableSlot *slot) + TupleTableSlot **slots, + int numSlots) { const char **p_values; + int i; + int j; int pindex = 0; MemoryContext oldcontext; oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); - p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); + p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots); + + /* ctid is provided only for UPDATE/DELETE, which don't allow batching */ + Assert(!(tupleid != NULL && numSlots > 1)); /* 1st parameter should be ctid, if it's in use */ if (tupleid != NULL) { + Assert(numSlots == 1); /* don't need set_transmission_modes for TID output */ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); pindex++; } - /* get following parameters from slot */ - if (slot != NULL && fmstate->target_attrs != NIL) + /* get following parameters from slots */ + if (slots != NULL && fmstate->target_attrs != NIL) { int nestlevel; ListCell *lc; nestlevel = set_transmission_modes(); - foreach(lc, fmstate->target_attrs) + for (i = 0; i < numSlots; i++) { - int attnum = lfirst_int(lc); - Datum value; - bool isnull; + j = (tupleid != NULL) ? 1 : 0; + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Datum value; + bool isnull; - value = slot_getattr(slot, attnum, &isnull); - if (isnull) - p_values[pindex] = NULL; - else - p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], - value); - pindex++; + value = slot_getattr(slots[i], attnum, &isnull); + if (isnull) + p_values[pindex] = NULL; + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j], + value); + pindex++; + j++; + } } reset_transmission_modes(nestlevel); } - Assert(pindex == fmstate->p_nums); + Assert(pindex == fmstate->p_nums * numSlots); MemoryContextSwitchTo(oldcontext); @@ -3870,29 +4031,41 @@ finish_foreign_modify(PgFdwModifyState *fmstate) Assert(fmstate != NULL); /* If we created a prepared statement, destroy it */ - if (fmstate->p_name) - { - char sql[64]; - PGresult *res; - - snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); - - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_exec_query(fmstate->conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); - PQclear(res); - fmstate->p_name = NULL; - } + deallocate_query(fmstate); /* Release remote connection */ ReleaseConnection(fmstate->conn); fmstate->conn = NULL; } +/* + * deallocate_query + * Deallocate a prepared statement for a foreign insert/update/delete + * operation + */ +static void +deallocate_query(PgFdwModifyState *fmstate) +{ + char sql[64]; + PGresult *res; + + /* do nothing if the query is not allocated */ + if (!fmstate->p_name) + return; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + PQclear(res); + fmstate->p_name = NULL; +} + /* * build_remote_returning * Build a RETURNING targetlist of a remote query for performing an @@ -6577,3 +6750,45 @@ find_em_expr_for_input_target(PlannerInfo *root, elog(ERROR, "could not find pathkey item to sort"); return NULL; /* keep compiler quiet */ } + +/* + * Determine batch size for a given foreign table. The option specified for + * a table has precedence. + */ +static int +get_batch_size_option(Relation rel) +{ + Oid foreigntableid = RelationGetRelid(rel); + ForeignTable *table; + ForeignServer *server; + List *options; + ListCell *lc; + + /* we use 1 by default, which means "no batching" */ + int batch_size = 1; + + /* + * Load options for table and server. We append server options after + * table options, because table options take precedence. + */ + table = GetForeignTable(foreigntableid); + server = GetForeignServer(table->serverid); + + options = NIL; + options = list_concat(options, table->options); + options = list_concat(options, server->options); + + /* See if either table or server specifies batch_size. */ + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "batch_size") == 0) + { + batch_size = strtol(defGetString(def), NULL, 10); + break; + } + } + + return batch_size; +} -- cgit v1.2.3