aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/executor/execPartition.c17
-rw-r--r--src/backend/executor/nodeModifyTable.c160
-rw-r--r--src/backend/nodes/list.c15
-rw-r--r--src/include/foreign/fdwapi.h10
-rw-r--r--src/include/nodes/execnodes.h6
-rw-r--r--src/include/nodes/pg_list.h15
6 files changed, 223 insertions, 0 deletions
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 941731a0a9b..1746cb87936 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -993,6 +993,23 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
+ /*
+ * Determine if the FDW supports batch insert and determine the batch
+ * size (a FDW may support batching, but it may be disabled for the
+ * server/table or for this particular query).
+ *
+ * If the FDW does not support batching, we set the batch size to 1.
+ */
+ if (partRelInfo->ri_FdwRoutine != NULL &&
+ partRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
+ partRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
+ partRelInfo->ri_BatchSize =
+ partRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(partRelInfo);
+ else
+ partRelInfo->ri_BatchSize = 1;
+
+ Assert(partRelInfo->ri_BatchSize >= 1);
+
partRelInfo->ri_CopyMultiInsertBuffer = NULL;
/*
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 921e6954194..9c36860704a 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -58,6 +58,13 @@
#include "utils/rel.h"
+static void ExecBatchInsert(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int numSlots,
+ EState *estate,
+ bool canSetTag);
static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
ItemPointer conflictTid,
@@ -389,6 +396,7 @@ ExecInsert(ModifyTableState *mtstate,
ModifyTable *node = (ModifyTable *) mtstate->ps.plan;
OnConflictAction onconflict = node->onConflictAction;
PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing;
+ MemoryContext oldContext;
/*
* If the input result relation is a partitioned table, find the leaf
@@ -442,6 +450,55 @@ ExecInsert(ModifyTableState *mtstate,
CMD_INSERT);
/*
+ * If the FDW supports batching, and batching is requested, accumulate
+ * rows and insert them in batches. Otherwise use the per-row inserts.
+ */
+ if (resultRelInfo->ri_BatchSize > 1)
+ {
+ /*
+ * If a certain number of tuples have already been accumulated,
+ * or a tuple has come for a different relation than that for
+ * the accumulated tuples, perform the batch insert
+ */
+ if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
+ {
+ ExecBatchInsert(mtstate, resultRelInfo,
+ resultRelInfo->ri_Slots,
+ resultRelInfo->ri_PlanSlots,
+ resultRelInfo->ri_NumSlots,
+ estate, canSetTag);
+ resultRelInfo->ri_NumSlots = 0;
+ }
+
+ oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+ if (resultRelInfo->ri_Slots == NULL)
+ {
+ resultRelInfo->ri_Slots = palloc(sizeof(TupleTableSlot *) *
+ resultRelInfo->ri_BatchSize);
+ resultRelInfo->ri_PlanSlots = palloc(sizeof(TupleTableSlot *) *
+ resultRelInfo->ri_BatchSize);
+ }
+
+ resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots] =
+ MakeSingleTupleTableSlot(slot->tts_tupleDescriptor,
+ slot->tts_ops);
+ ExecCopySlot(resultRelInfo->ri_Slots[resultRelInfo->ri_NumSlots],
+ slot);
+ resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots] =
+ MakeSingleTupleTableSlot(planSlot->tts_tupleDescriptor,
+ planSlot->tts_ops);
+ ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
+ planSlot);
+
+ resultRelInfo->ri_NumSlots++;
+
+ MemoryContextSwitchTo(oldContext);
+
+ return NULL;
+ }
+
+ /*
* insert into foreign table: let the FDW do it
*/
slot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
@@ -699,6 +756,70 @@ ExecInsert(ModifyTableState *mtstate,
}
/* ----------------------------------------------------------------
+ * ExecBatchInsert
+ *
+ * Insert multiple tuples in an efficient way.
+ * Currently, this handles inserting into a foreign table without
+ * RETURNING clause.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecBatchInsert(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int numSlots,
+ EState *estate,
+ bool canSetTag)
+{
+ int i;
+ int numInserted = numSlots;
+ TupleTableSlot *slot = NULL;
+ TupleTableSlot **rslots;
+
+ /*
+ * insert into foreign table: let the FDW do it
+ */
+ rslots = resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
+ resultRelInfo,
+ slots,
+ planSlots,
+ &numInserted);
+
+ for (i = 0; i < numInserted; i++)
+ {
+ slot = rslots[i];
+
+ /*
+ * AFTER ROW Triggers or RETURNING expressions might reference the
+ * tableoid column, so (re-)initialize tts_tableOid before evaluating
+ * them.
+ */
+ slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+ /* AFTER ROW INSERT Triggers */
+ ExecARInsertTriggers(estate, resultRelInfo, slot, NIL,
+ mtstate->mt_transition_capture);
+
+ /*
+ * Check any WITH CHECK OPTION constraints from parent views. See the
+ * comment in ExecInsert.
+ */
+ if (resultRelInfo->ri_WithCheckOptions != NIL)
+ ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate);
+ }
+
+ if (canSetTag && numInserted > 0)
+ estate->es_processed += numInserted;
+
+ for (i = 0; i < numSlots; i++)
+ {
+ ExecDropSingleTupleTableSlot(slots[i]);
+ ExecDropSingleTupleTableSlot(planSlots[i]);
+ }
+}
+
+/* ----------------------------------------------------------------
* ExecDelete
*
* DELETE is like UPDATE, except that we delete the tuple and no
@@ -1937,6 +2058,9 @@ ExecModifyTable(PlanState *pstate)
ItemPointerData tuple_ctid;
HeapTupleData oldtupdata;
HeapTuple oldtuple;
+ PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
+ List *relinfos = NIL;
+ ListCell *lc;
CHECK_FOR_INTERRUPTS();
@@ -2153,6 +2277,25 @@ ExecModifyTable(PlanState *pstate)
}
/*
+ * Insert remaining tuples for batch insert.
+ */
+ if (proute)
+ relinfos = estate->es_tuple_routing_result_relations;
+ else
+ relinfos = estate->es_opened_result_relations;
+
+ foreach(lc, relinfos)
+ {
+ resultRelInfo = lfirst(lc);
+ if (resultRelInfo->ri_NumSlots > 0)
+ ExecBatchInsert(node, resultRelInfo,
+ resultRelInfo->ri_Slots,
+ resultRelInfo->ri_PlanSlots,
+ resultRelInfo->ri_NumSlots,
+ estate, node->canSetTag);
+ }
+
+ /*
* We're done, but fire AFTER STATEMENT triggers before exiting.
*/
fireASTriggers(node);
@@ -2651,6 +2794,23 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
/*
+ * Determine if the FDW supports batch insert and determine the batch
+ * size (a FDW may support batching, but it may be disabled for the
+ * server/table).
+ */
+ if (!resultRelInfo->ri_usesFdwDirectModify &&
+ operation == CMD_INSERT &&
+ resultRelInfo->ri_FdwRoutine != NULL &&
+ resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
+ resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
+ resultRelInfo->ri_BatchSize =
+ resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
+ else
+ resultRelInfo->ri_BatchSize = 1;
+
+ Assert(resultRelInfo->ri_BatchSize >= 1);
+
+ /*
* Lastly, if this is not the primary (canSetTag) ModifyTable node, add it
* to estate->es_auxmodifytables so that it will be run to completion by
* ExecPostprocessPlan. (It'd actually work fine to add the primary
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index c4eba6b053f..dbf6b30233a 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -277,6 +277,21 @@ list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
return list;
}
+List *
+list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+ ListCell datum3, ListCell datum4, ListCell datum5)
+{
+ List *list = new_list(t, 5);
+
+ list->elements[0] = datum1;
+ list->elements[1] = datum2;
+ list->elements[2] = datum3;
+ list->elements[3] = datum4;
+ list->elements[4] = datum5;
+ check_list_invariants(list);
+ return list;
+}
+
/*
* Make room for a new head cell in the given (non-NIL) list.
*
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 2953499fb10..248f78da452 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -85,6 +85,14 @@ typedef TupleTableSlot *(*ExecForeignInsert_function) (EState *estate,
TupleTableSlot *slot,
TupleTableSlot *planSlot);
+typedef TupleTableSlot **(*ExecForeignBatchInsert_function) (EState *estate,
+ ResultRelInfo *rinfo,
+ TupleTableSlot **slots,
+ TupleTableSlot **planSlots,
+ int *numSlots);
+
+typedef int (*GetForeignModifyBatchSize_function) (ResultRelInfo *rinfo);
+
typedef TupleTableSlot *(*ExecForeignUpdate_function) (EState *estate,
ResultRelInfo *rinfo,
TupleTableSlot *slot,
@@ -209,6 +217,8 @@ typedef struct FdwRoutine
PlanForeignModify_function PlanForeignModify;
BeginForeignModify_function BeginForeignModify;
ExecForeignInsert_function ExecForeignInsert;
+ ExecForeignBatchInsert_function ExecForeignBatchInsert;
+ GetForeignModifyBatchSize_function GetForeignModifyBatchSize;
ExecForeignUpdate_function ExecForeignUpdate;
ExecForeignDelete_function ExecForeignDelete;
EndForeignModify_function EndForeignModify;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 48c3f570fa9..d65099c94aa 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -446,6 +446,12 @@ typedef struct ResultRelInfo
/* true when modifying foreign table directly */
bool ri_usesFdwDirectModify;
+ /* batch insert stuff */
+ int ri_NumSlots; /* number of slots in the array */
+ int ri_BatchSize; /* max slots inserted in a single batch */
+ TupleTableSlot **ri_Slots; /* input tuples for batch insert */
+ TupleTableSlot **ri_PlanSlots;
+
/* list of WithCheckOption's to be checked */
List *ri_WithCheckOptions;
diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h
index 710dcd37ef4..404e03f132d 100644
--- a/src/include/nodes/pg_list.h
+++ b/src/include/nodes/pg_list.h
@@ -213,6 +213,10 @@ list_length(const List *l)
#define list_make4(x1,x2,x3,x4) \
list_make4_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
list_make_ptr_cell(x3), list_make_ptr_cell(x4))
+#define list_make5(x1,x2,x3,x4,x5) \
+ list_make5_impl(T_List, list_make_ptr_cell(x1), list_make_ptr_cell(x2), \
+ list_make_ptr_cell(x3), list_make_ptr_cell(x4), \
+ list_make_ptr_cell(x5))
#define list_make1_int(x1) \
list_make1_impl(T_IntList, list_make_int_cell(x1))
@@ -224,6 +228,10 @@ list_length(const List *l)
#define list_make4_int(x1,x2,x3,x4) \
list_make4_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
list_make_int_cell(x3), list_make_int_cell(x4))
+#define list_make5_int(x1,x2,x3,x4,x5) \
+ list_make5_impl(T_IntList, list_make_int_cell(x1), list_make_int_cell(x2), \
+ list_make_int_cell(x3), list_make_int_cell(x4), \
+ list_make_int_cell(x5))
#define list_make1_oid(x1) \
list_make1_impl(T_OidList, list_make_oid_cell(x1))
@@ -235,6 +243,10 @@ list_length(const List *l)
#define list_make4_oid(x1,x2,x3,x4) \
list_make4_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
list_make_oid_cell(x3), list_make_oid_cell(x4))
+#define list_make5_oid(x1,x2,x3,x4,x5) \
+ list_make5_impl(T_OidList, list_make_oid_cell(x1), list_make_oid_cell(x2), \
+ list_make_oid_cell(x3), list_make_oid_cell(x4), \
+ list_make_oid_cell(x5))
/*
* Locate the n'th cell (counting from 0) of the list.
@@ -520,6 +532,9 @@ extern List *list_make3_impl(NodeTag t, ListCell datum1, ListCell datum2,
ListCell datum3);
extern List *list_make4_impl(NodeTag t, ListCell datum1, ListCell datum2,
ListCell datum3, ListCell datum4);
+extern List *list_make5_impl(NodeTag t, ListCell datum1, ListCell datum2,
+ ListCell datum3, ListCell datum4,
+ ListCell datum5);
extern pg_nodiscard List *lappend(List *list, void *datum);
extern pg_nodiscard List *lappend_int(List *list, int datum);