aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/postgres_fdw/expected/postgres_fdw.out123
-rw-r--r--contrib/postgres_fdw/sql/postgres_fdw.sql86
-rw-r--r--src/backend/executor/execMain.c1
-rw-r--r--src/backend/executor/execPartition.c7
-rw-r--r--src/backend/executor/execUtils.c1
-rw-r--r--src/backend/executor/nodeModifyTable.c92
-rw-r--r--src/include/nodes/execnodes.h9
7 files changed, 298 insertions, 21 deletions
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index d4c8c5692c6..ee231cedcf4 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -10168,7 +10168,130 @@ SELECT * FROM batch_table ORDER BY x;
50 | test50 | test50
(50 rows)
+-- Clean up
+DROP TABLE batch_table;
+DROP TABLE batch_table_p0;
+DROP TABLE batch_table_p1;
ALTER SERVER loopback OPTIONS (DROP batch_size);
+-- Test that pending inserts are handled properly when needed
+CREATE TABLE batch_table (a text, b int);
+CREATE FOREIGN TABLE ftable (a text, b int)
+ SERVER loopback
+ OPTIONS (table_name 'batch_table', batch_size '2');
+CREATE TABLE ltable (a text, b int);
+CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS
+$$
+begin
+ raise notice '%: there are % rows in ftable',
+ TG_NAME, (SELECT count(*) FROM ftable);
+ if TG_OP = 'DELETE' then
+ return OLD;
+ else
+ return NEW;
+ end if;
+end;
+$$;
+CREATE TRIGGER ftable_rowcount_trigger
+BEFORE INSERT OR UPDATE OR DELETE ON ltable
+FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
+WITH t AS (
+ INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable
+NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable
+SELECT * FROM ltable;
+ a | b
+-----+----
+ AAA | 42
+ BBB | 42
+(2 rows)
+
+SELECT * FROM ftable;
+ a | b
+-----+----
+ AAA | 42
+ BBB | 42
+(2 rows)
+
+DELETE FROM ftable;
+WITH t AS (
+ UPDATE ltable SET b = b + 100 RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable
+NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable
+SELECT * FROM ltable;
+ a | b
+-----+-----
+ AAA | 142
+ BBB | 142
+(2 rows)
+
+SELECT * FROM ftable;
+ a | b
+-----+-----
+ AAA | 142
+ BBB | 142
+(2 rows)
+
+DELETE FROM ftable;
+WITH t AS (
+ DELETE FROM ltable RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable
+NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable
+SELECT * FROM ltable;
+ a | b
+---+---
+(0 rows)
+
+SELECT * FROM ftable;
+ a | b
+-----+-----
+ AAA | 142
+ BBB | 142
+(2 rows)
+
+DELETE FROM ftable;
+-- Clean up
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+DROP TRIGGER ftable_rowcount_trigger ON ltable;
+DROP TABLE ltable;
+CREATE TABLE parent (a text, b int) PARTITION BY LIST (a);
+CREATE TABLE batch_table (a text, b int);
+CREATE FOREIGN TABLE ftable
+ PARTITION OF parent
+ FOR VALUES IN ('AAA')
+ SERVER loopback
+ OPTIONS (table_name 'batch_table', batch_size '2');
+CREATE TABLE ltable
+ PARTITION OF parent
+ FOR VALUES IN ('BBB');
+CREATE TRIGGER ftable_rowcount_trigger
+BEFORE INSERT ON ltable
+FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
+INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42);
+NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable
+NOTICE: ftable_rowcount_trigger: there are 2 rows in ftable
+SELECT tableoid::regclass, * FROM parent;
+ tableoid | a | b
+----------+-----+----
+ ftable | AAA | 42
+ ftable | AAA | 42
+ ltable | BBB | 42
+ ltable | BBB | 42
+(4 rows)
+
+-- Clean up
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+DROP TRIGGER ftable_rowcount_trigger ON ltable;
+DROP TABLE ltable;
+DROP TABLE parent;
+DROP FUNCTION ftable_rowcount_trigf;
-- ===================================================================
-- test asynchronous execution
-- ===================================================================
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 94a7d367d12..258506b01a4 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3205,8 +3205,94 @@ INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1,
SELECT COUNT(*) FROM batch_table;
SELECT * FROM batch_table ORDER BY x;
+-- Clean up
+DROP TABLE batch_table;
+DROP TABLE batch_table_p0;
+DROP TABLE batch_table_p1;
+
ALTER SERVER loopback OPTIONS (DROP batch_size);
+-- Test that pending inserts are handled properly when needed
+CREATE TABLE batch_table (a text, b int);
+CREATE FOREIGN TABLE ftable (a text, b int)
+ SERVER loopback
+ OPTIONS (table_name 'batch_table', batch_size '2');
+CREATE TABLE ltable (a text, b int);
+CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS
+$$
+begin
+ raise notice '%: there are % rows in ftable',
+ TG_NAME, (SELECT count(*) FROM ftable);
+ if TG_OP = 'DELETE' then
+ return OLD;
+ else
+ return NEW;
+ end if;
+end;
+$$;
+CREATE TRIGGER ftable_rowcount_trigger
+BEFORE INSERT OR UPDATE OR DELETE ON ltable
+FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
+
+WITH t AS (
+ INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+
+SELECT * FROM ltable;
+SELECT * FROM ftable;
+DELETE FROM ftable;
+
+WITH t AS (
+ UPDATE ltable SET b = b + 100 RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+
+SELECT * FROM ltable;
+SELECT * FROM ftable;
+DELETE FROM ftable;
+
+WITH t AS (
+ DELETE FROM ltable RETURNING *
+)
+INSERT INTO ftable SELECT * FROM t;
+
+SELECT * FROM ltable;
+SELECT * FROM ftable;
+DELETE FROM ftable;
+
+-- Clean up
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+DROP TRIGGER ftable_rowcount_trigger ON ltable;
+DROP TABLE ltable;
+
+CREATE TABLE parent (a text, b int) PARTITION BY LIST (a);
+CREATE TABLE batch_table (a text, b int);
+CREATE FOREIGN TABLE ftable
+ PARTITION OF parent
+ FOR VALUES IN ('AAA')
+ SERVER loopback
+ OPTIONS (table_name 'batch_table', batch_size '2');
+CREATE TABLE ltable
+ PARTITION OF parent
+ FOR VALUES IN ('BBB');
+CREATE TRIGGER ftable_rowcount_trigger
+BEFORE INSERT ON ltable
+FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
+
+INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42);
+
+SELECT tableoid::regclass, * FROM parent;
+
+-- Clean up
+DROP FOREIGN TABLE ftable;
+DROP TABLE batch_table;
+DROP TRIGGER ftable_rowcount_trigger ON ltable;
+DROP TABLE ltable;
+DROP TABLE parent;
+DROP FUNCTION ftable_rowcount_trigf;
+
-- ===================================================================
-- test asynchronous execution
-- ===================================================================
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index b3ce4bae530..83d21d612b1 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1257,6 +1257,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_ChildToRootMap = NULL;
resultRelInfo->ri_ChildToRootMapValid = false;
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+ resultRelInfo->ri_ModifyTableState = NULL;
}
/*
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 606c920b068..216da08d0cf 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -934,6 +934,13 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
Assert(partRelInfo->ri_BatchSize >= 1);
+ /*
+ * If doing batch insert, setup back-link so we can easily find the
+ * mtstate again.
+ */
+ if (partRelInfo->ri_BatchSize > 1)
+ partRelInfo->ri_ModifyTableState = mtstate;
+
partRelInfo->ri_CopyMultiInsertBuffer = NULL;
/*
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index ad11392b99d..64a8c2e5931 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -127,6 +127,7 @@ CreateExecutorState(void)
estate->es_result_relations = NULL;
estate->es_opened_result_relations = NIL;
estate->es_tuple_routing_result_relations = NIL;
+ estate->es_insert_pending_result_relations = NIL;
estate->es_trig_target_relations = NIL;
estate->es_param_list_info = NULL;
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 37ba4755cbc..ee0f0422040 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -67,6 +67,7 @@ static void ExecBatchInsert(ModifyTableState *mtstate,
int numSlots,
EState *estate,
bool canSetTag);
+static void ExecPendingInserts(EState *estate);
static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
ItemPointer conflictTid,
@@ -645,6 +646,10 @@ ExecInsert(ModifyTableState *mtstate,
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row)
{
+ /* Flush any pending inserts, so rows are visible to the triggers */
+ if (estate->es_insert_pending_result_relations != NIL)
+ ExecPendingInserts(estate);
+
if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
return NULL; /* "do nothing" */
}
@@ -678,6 +683,8 @@ ExecInsert(ModifyTableState *mtstate,
*/
if (resultRelInfo->ri_BatchSize > 1)
{
+ bool flushed = false;
+
/*
* When we've reached the desired batch size, perform the
* insertion.
@@ -690,6 +697,7 @@ ExecInsert(ModifyTableState *mtstate,
resultRelInfo->ri_NumSlots,
estate, canSetTag);
resultRelInfo->ri_NumSlots = 0;
+ flushed = true;
}
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
@@ -732,6 +740,24 @@ ExecInsert(ModifyTableState *mtstate,
ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
planSlot);
+ /*
+ * If these are the first tuples stored in the buffers, add the
+ * target rel to the es_insert_pending_result_relations list,
+ * except in the case where flushing was done above, in which case
+ * the target rel would already have been added to the list, so no
+ * need to do this.
+ */
+ if (resultRelInfo->ri_NumSlots == 0 && !flushed)
+ {
+ Assert(!list_member_ptr(estate->es_insert_pending_result_relations,
+ resultRelInfo));
+ estate->es_insert_pending_result_relations =
+ lappend(estate->es_insert_pending_result_relations,
+ resultRelInfo);
+ }
+ Assert(list_member_ptr(estate->es_insert_pending_result_relations,
+ resultRelInfo));
+
resultRelInfo->ri_NumSlots++;
MemoryContextSwitchTo(oldContext);
@@ -1034,9 +1060,8 @@ ExecBatchInsert(ModifyTableState *mtstate,
slot = rslots[i];
/*
- * AFTER ROW Triggers or RETURNING expressions might reference the
- * tableoid column, so (re-)initialize tts_tableOid before evaluating
- * them.
+ * AFTER ROW Triggers might reference the tableoid column, so
+ * (re-)initialize tts_tableOid before evaluating them.
*/
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
@@ -1107,6 +1132,10 @@ ExecDelete(ModifyTableState *mtstate,
{
bool dodelete;
+ /* Flush any pending inserts, so rows are visible to the triggers */
+ if (estate->es_insert_pending_result_relations != NIL)
+ ExecPendingInserts(estate);
+
dodelete = ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
tupleid, oldtuple, epqreturnslot);
@@ -1411,6 +1440,32 @@ ldelete:;
}
/*
+ * ExecPendingInserts -- flushes all pending inserts to the foreign tables
+ */
+static void
+ExecPendingInserts(EState *estate)
+{
+ ListCell *lc;
+
+ foreach(lc, estate->es_insert_pending_result_relations)
+ {
+ ResultRelInfo *resultRelInfo = (ResultRelInfo *) lfirst(lc);
+ ModifyTableState *mtstate = resultRelInfo->ri_ModifyTableState;
+
+ Assert(mtstate);
+ ExecBatchInsert(mtstate, resultRelInfo,
+ resultRelInfo->ri_Slots,
+ resultRelInfo->ri_PlanSlots,
+ resultRelInfo->ri_NumSlots,
+ estate, mtstate->canSetTag);
+ resultRelInfo->ri_NumSlots = 0;
+ }
+
+ list_free(estate->es_insert_pending_result_relations);
+ estate->es_insert_pending_result_relations = NIL;
+}
+
+/*
* ExecCrossPartitionUpdate --- Move an updated tuple to another partition.
*
* This works by first deleting the old tuple from the current partition,
@@ -1634,6 +1689,10 @@ ExecUpdate(ModifyTableState *mtstate,
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
+ /* Flush any pending inserts, so rows are visible to the triggers */
+ if (estate->es_insert_pending_result_relations != NIL)
+ ExecPendingInserts(estate);
+
if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
tupleid, oldtuple, slot))
return NULL; /* "do nothing" */
@@ -2361,9 +2420,6 @@ ExecModifyTable(PlanState *pstate)
ItemPointerData tuple_ctid;
HeapTupleData oldtupdata;
HeapTuple oldtuple;
- PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
- List *relinfos = NIL;
- ListCell *lc;
CHECK_FOR_INTERRUPTS();
@@ -2620,21 +2676,8 @@ ExecModifyTable(PlanState *pstate)
/*
* Insert remaining tuples for batch insert.
*/
- if (proute)
- relinfos = estate->es_tuple_routing_result_relations;
- else
- relinfos = estate->es_opened_result_relations;
-
- foreach(lc, relinfos)
- {
- resultRelInfo = lfirst(lc);
- if (resultRelInfo->ri_NumSlots > 0)
- ExecBatchInsert(node, resultRelInfo,
- resultRelInfo->ri_Slots,
- resultRelInfo->ri_PlanSlots,
- resultRelInfo->ri_NumSlots,
- estate, node->canSetTag);
- }
+ if (estate->es_insert_pending_result_relations != NIL)
+ ExecPendingInserts(estate);
/*
* We're done, but fire AFTER STATEMENT triggers before exiting.
@@ -3140,6 +3183,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
else
resultRelInfo->ri_BatchSize = 1;
+
+ /*
+ * If doing batch insert, setup back-link so we can easily find the
+ * mtstate again.
+ */
+ if (resultRelInfo->ri_BatchSize > 1)
+ resultRelInfo->ri_ModifyTableState = mtstate;
}
/*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3dfac5bd5fe..71d0cf44dd5 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -524,6 +524,9 @@ typedef struct ResultRelInfo
/* for use by copyfrom.c when performing multi-inserts */
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
+
+ /* for use by nodeModifyTable.c when performing batch-inserts */
+ struct ModifyTableState *ri_ModifyTableState;
} ResultRelInfo;
/* ----------------
@@ -645,6 +648,12 @@ typedef struct EState
int es_jit_flags;
struct JitContext *es_jit;
struct JitInstrumentation *es_jit_worker_instr;
+
+ /*
+ * The following list contains ResultRelInfos for foreign tables on which
+ * batch-inserts are to be executed.
+ */
+ List *es_insert_pending_result_relations;
} EState;