diff options
Diffstat (limited to 'contrib/postgres_fdw')
-rw-r--r-- | contrib/postgres_fdw/expected/postgres_fdw.out | 334 | ||||
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 96 | ||||
-rw-r--r-- | contrib/postgres_fdw/sql/postgres_fdw.sql | 237 |
3 files changed, 667 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index fa0d1db5fbc..e4d9469fdd3 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -7371,6 +7371,340 @@ NOTICE: drop cascades to foreign table bar2 drop table loct1; drop table loct2; -- =================================================================== +-- test tuple routing for foreign-table partitions +-- =================================================================== +-- Test insert tuple routing +create table itrtest (a int, b text) partition by list (a); +create table loct1 (a int check (a in (1)), b text); +create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1'); +create table loct2 (a int check (a in (2)), b text); +create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2'); +alter table itrtest attach partition remp1 for values in (1); +alter table itrtest attach partition remp2 for values in (2); +insert into itrtest values (1, 'foo'); +insert into itrtest values (1, 'bar') returning *; + a | b +---+----- + 1 | bar +(1 row) + +insert into itrtest values (2, 'baz'); +insert into itrtest values (2, 'qux') returning *; + a | b +---+----- + 2 | qux +(1 row) + +insert into itrtest values (1, 'test1'), (2, 'test2') returning *; + a | b +---+------- + 1 | test1 + 2 | test2 +(2 rows) + +select tableoid::regclass, * FROM itrtest; + tableoid | a | b +----------+---+------- + remp1 | 1 | foo + remp1 | 1 | bar + remp1 | 1 | test1 + remp2 | 2 | baz + remp2 | 2 | qux + remp2 | 2 | test2 +(6 rows) + +select tableoid::regclass, * FROM remp1; + tableoid | a | b +----------+---+------- + remp1 | 1 | foo + remp1 | 1 | bar + remp1 | 1 | test1 +(3 rows) + +select tableoid::regclass, * FROM remp2; + tableoid | b | a +----------+-------+--- + remp2 | baz | 2 + remp2 | qux | 2 + remp2 | test2 | 2 +(3 rows) + +delete from itrtest; +create unique index loct1_idx on loct1 (a); +-- DO NOTHING without an inference specification is supported +insert into itrtest values (1, 'foo') on conflict do nothing returning *; + a | b +---+----- + 1 | foo +(1 row) + +insert into itrtest values (1, 'foo') on conflict do nothing returning *; + a | b +---+--- +(0 rows) + +-- But other cases are not supported +insert into itrtest values (1, 'bar') on conflict (a) do nothing; +ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification +insert into itrtest values (1, 'bar') on conflict (a) do update set b = excluded.b; +ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification +select tableoid::regclass, * FROM itrtest; + tableoid | a | b +----------+---+----- + remp1 | 1 | foo +(1 row) + +drop table itrtest; +drop table loct1; +drop table loct2; +-- Test update tuple routing +create table utrtest (a int, b text) partition by list (a); +create table loct (a int check (a in (1)), b text); +create foreign table remp (a int check (a in (1)), b text) server loopback options (table_name 'loct'); +create table locp (a int check (a in (2)), b text); +alter table utrtest attach partition remp for values in (1); +alter table utrtest attach partition locp for values in (2); +insert into utrtest values (1, 'foo'); +insert into utrtest values (2, 'qux'); +select tableoid::regclass, * FROM utrtest; + tableoid | a | b +----------+---+----- + remp | 1 | foo + locp | 2 | qux +(2 rows) + +select tableoid::regclass, * FROM remp; + tableoid | a | b +----------+---+----- + remp | 1 | foo +(1 row) + +select tableoid::regclass, * FROM locp; + tableoid | a | b +----------+---+----- + locp | 2 | qux +(1 row) + +-- It's not allowed to move a row from a partition that is foreign to another +update utrtest set a = 2 where b = 'foo' returning *; +ERROR: new row for relation "loct" violates check constraint "loct_a_check" +DETAIL: Failing row contains (2, foo). +CONTEXT: remote SQL command: UPDATE public.loct SET a = 2 WHERE ((b = 'foo'::text)) RETURNING a, b +-- But the reverse is allowed +update utrtest set a = 1 where b = 'qux' returning *; + a | b +---+----- + 1 | qux +(1 row) + +select tableoid::regclass, * FROM utrtest; + tableoid | a | b +----------+---+----- + remp | 1 | foo + remp | 1 | qux +(2 rows) + +select tableoid::regclass, * FROM remp; + tableoid | a | b +----------+---+----- + remp | 1 | foo + remp | 1 | qux +(2 rows) + +select tableoid::regclass, * FROM locp; + tableoid | a | b +----------+---+--- +(0 rows) + +-- The executor should not let unexercised FDWs shut down +update utrtest set a = 1 where b = 'foo'; +drop table utrtest; +drop table loct; +-- Test copy tuple routing +create table ctrtest (a int, b text) partition by list (a); +create table loct1 (a int check (a in (1)), b text); +create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1'); +create table loct2 (a int check (a in (2)), b text); +create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2'); +alter table ctrtest attach partition remp1 for values in (1); +alter table ctrtest attach partition remp2 for values in (2); +copy ctrtest from stdin; +select tableoid::regclass, * FROM ctrtest; + tableoid | a | b +----------+---+----- + remp1 | 1 | foo + remp2 | 2 | qux +(2 rows) + +select tableoid::regclass, * FROM remp1; + tableoid | a | b +----------+---+----- + remp1 | 1 | foo +(1 row) + +select tableoid::regclass, * FROM remp2; + tableoid | b | a +----------+-----+--- + remp2 | qux | 2 +(1 row) + +-- Copying into foreign partitions directly should work as well +copy remp1 from stdin; +select tableoid::regclass, * FROM remp1; + tableoid | a | b +----------+---+----- + remp1 | 1 | foo + remp1 | 1 | bar +(2 rows) + +drop table ctrtest; +drop table loct1; +drop table loct2; +-- =================================================================== +-- test COPY FROM +-- =================================================================== +create table loc2 (f1 int, f2 text); +alter table loc2 set (autovacuum_enabled = 'false'); +create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2'); +-- Test basic functionality +copy rem2 from stdin; +select * from rem2; + f1 | f2 +----+----- + 1 | foo + 2 | bar +(2 rows) + +delete from rem2; +-- Test check constraints +alter table loc2 add constraint loc2_f1positive check (f1 >= 0); +alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0); +-- check constraint is enforced on the remote side, not locally +copy rem2 from stdin; +copy rem2 from stdin; -- ERROR +ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive" +DETAIL: Failing row contains (-1, xyzzy). +CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2) +COPY rem2, line 1: "-1 xyzzy" +select * from rem2; + f1 | f2 +----+----- + 1 | foo + 2 | bar +(2 rows) + +alter foreign table rem2 drop constraint rem2_f1positive; +alter table loc2 drop constraint loc2_f1positive; +delete from rem2; +-- Test local triggers +create trigger trig_stmt_before before insert on rem2 + for each statement execute procedure trigger_func(); +create trigger trig_stmt_after after insert on rem2 + for each statement execute procedure trigger_func(); +create trigger trig_row_before before insert on rem2 + for each row execute procedure trigger_data(23,'skidoo'); +create trigger trig_row_after after insert on rem2 + for each row execute procedure trigger_data(23,'skidoo'); +copy rem2 from stdin; +NOTICE: trigger_func(<NULL>) called: action = INSERT, when = BEFORE, level = STATEMENT +NOTICE: trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2 +NOTICE: NEW: (1,foo) +NOTICE: trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2 +NOTICE: NEW: (2,bar) +NOTICE: trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2 +NOTICE: NEW: (1,foo) +NOTICE: trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2 +NOTICE: NEW: (2,bar) +NOTICE: trigger_func(<NULL>) called: action = INSERT, when = AFTER, level = STATEMENT +select * from rem2; + f1 | f2 +----+----- + 1 | foo + 2 | bar +(2 rows) + +drop trigger trig_row_before on rem2; +drop trigger trig_row_after on rem2; +drop trigger trig_stmt_before on rem2; +drop trigger trig_stmt_after on rem2; +delete from rem2; +create trigger trig_row_before_insert before insert on rem2 + for each row execute procedure trig_row_before_insupdate(); +-- The new values are concatenated with ' triggered !' +copy rem2 from stdin; +select * from rem2; + f1 | f2 +----+----------------- + 1 | foo triggered ! + 2 | bar triggered ! +(2 rows) + +drop trigger trig_row_before_insert on rem2; +delete from rem2; +create trigger trig_null before insert on rem2 + for each row execute procedure trig_null(); +-- Nothing happens +copy rem2 from stdin; +select * from rem2; + f1 | f2 +----+---- +(0 rows) + +drop trigger trig_null on rem2; +delete from rem2; +-- Test remote triggers +create trigger trig_row_before_insert before insert on loc2 + for each row execute procedure trig_row_before_insupdate(); +-- The new values are concatenated with ' triggered !' +copy rem2 from stdin; +select * from rem2; + f1 | f2 +----+----------------- + 1 | foo triggered ! + 2 | bar triggered ! +(2 rows) + +drop trigger trig_row_before_insert on loc2; +delete from rem2; +create trigger trig_null before insert on loc2 + for each row execute procedure trig_null(); +-- Nothing happens +copy rem2 from stdin; +select * from rem2; + f1 | f2 +----+---- +(0 rows) + +drop trigger trig_null on loc2; +delete from rem2; +-- Test a combination of local and remote triggers +create trigger rem2_trig_row_before before insert on rem2 + for each row execute procedure trigger_data(23,'skidoo'); +create trigger rem2_trig_row_after after insert on rem2 + for each row execute procedure trigger_data(23,'skidoo'); +create trigger loc2_trig_row_before_insert before insert on loc2 + for each row execute procedure trig_row_before_insupdate(); +copy rem2 from stdin; +NOTICE: rem2_trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2 +NOTICE: NEW: (1,foo) +NOTICE: rem2_trig_row_before(23, skidoo) BEFORE ROW INSERT ON rem2 +NOTICE: NEW: (2,bar) +NOTICE: rem2_trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2 +NOTICE: NEW: (1,"foo triggered !") +NOTICE: rem2_trig_row_after(23, skidoo) AFTER ROW INSERT ON rem2 +NOTICE: NEW: (2,"bar triggered !") +select * from rem2; + f1 | f2 +----+----------------- + 1 | foo triggered ! + 2 | bar triggered ! +(2 rows) + +drop trigger rem2_trig_row_before on rem2; +drop trigger rem2_trig_row_after on rem2; +drop trigger loc2_trig_row_before_insert on loc2; +delete from rem2; +-- =================================================================== -- test IMPORT FOREIGN SCHEMA -- =================================================================== CREATE SCHEMA import_source; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index e7441c759ba..30e572632ee 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -319,6 +319,10 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate, TupleTableSlot *planSlot); static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo); +static void postgresBeginForeignInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo); +static void postgresEndForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo); static int postgresIsForeignRelUpdatable(Relation rel); static bool postgresPlanDirectModify(PlannerInfo *root, ModifyTable *plan, @@ -473,6 +477,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; + routine->BeginForeignInsert = postgresBeginForeignInsert; + routine->EndForeignInsert = postgresEndForeignInsert; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; routine->PlanDirectModify = postgresPlanDirectModify; routine->BeginDirectModify = postgresBeginDirectModify; @@ -1960,6 +1966,96 @@ postgresEndForeignModify(EState *estate, } /* + * postgresBeginForeignInsert + * Begin an insert operation on a foreign table + */ +static void +postgresBeginForeignInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate; + Plan *plan = mtstate->ps.plan; + Relation rel = resultRelInfo->ri_RelationDesc; + RangeTblEntry *rte; + Query *query; + PlannerInfo *root; + TupleDesc tupdesc = RelationGetDescr(rel); + int attnum; + StringInfoData sql; + List *targetAttrs = NIL; + List *retrieved_attrs = NIL; + bool doNothing = false; + + initStringInfo(&sql); + + /* Set up largely-dummy planner state. */ + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = RELKIND_FOREIGN_TABLE; + query = makeNode(Query); + query->commandType = CMD_INSERT; + query->resultRelation = 1; + query->rtable = list_make1(rte); + root = makeNode(PlannerInfo); + root->parse = query; + + /* We transmit all columns that are defined in the foreign table. */ + for (attnum = 1; attnum <= tupdesc->natts; attnum++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (!attr->attisdropped) + targetAttrs = lappend_int(targetAttrs, attnum); + } + + /* Check if we add the ON CONFLICT clause to the remote query. */ + if (plan) + { + OnConflictAction onConflictAction = ((ModifyTable *) plan)->onConflictAction; + + /* We only support DO NOTHING without an inference specification. */ + if (onConflictAction == ONCONFLICT_NOTHING) + doNothing = true; + else if (onConflictAction != ONCONFLICT_NONE) + elog(ERROR, "unexpected ON CONFLICT specification: %d", + (int) onConflictAction); + } + + /* Construct the SQL command string. */ + deparseInsertSql(&sql, root, 1, rel, targetAttrs, doNothing, + resultRelInfo->ri_returningList, &retrieved_attrs); + + /* Construct an execution state. */ + fmstate = create_foreign_modify(mtstate->ps.state, + resultRelInfo, + CMD_INSERT, + NULL, + sql.data, + targetAttrs, + retrieved_attrs != NIL, + retrieved_attrs); + + resultRelInfo->ri_FdwState = fmstate; +} + +/* + * postgresEndForeignInsert + * Finish an insert operation on a foreign table + */ +static void +postgresEndForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + + Assert(fmstate != NULL); + + /* Destroy the execution state */ + finish_foreign_modify(fmstate); +} + +/* * postgresIsForeignRelUpdatable * Determine whether a foreign table supports INSERT, UPDATE and/or * DELETE. diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index cf32be4bfe2..e1df952e7af 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1768,6 +1768,243 @@ drop table loct1; drop table loct2; -- =================================================================== +-- test tuple routing for foreign-table partitions +-- =================================================================== + +-- Test insert tuple routing +create table itrtest (a int, b text) partition by list (a); +create table loct1 (a int check (a in (1)), b text); +create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1'); +create table loct2 (a int check (a in (2)), b text); +create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2'); +alter table itrtest attach partition remp1 for values in (1); +alter table itrtest attach partition remp2 for values in (2); + +insert into itrtest values (1, 'foo'); +insert into itrtest values (1, 'bar') returning *; +insert into itrtest values (2, 'baz'); +insert into itrtest values (2, 'qux') returning *; +insert into itrtest values (1, 'test1'), (2, 'test2') returning *; + +select tableoid::regclass, * FROM itrtest; +select tableoid::regclass, * FROM remp1; +select tableoid::regclass, * FROM remp2; + +delete from itrtest; + +create unique index loct1_idx on loct1 (a); + +-- DO NOTHING without an inference specification is supported +insert into itrtest values (1, 'foo') on conflict do nothing returning *; +insert into itrtest values (1, 'foo') on conflict do nothing returning *; + +-- But other cases are not supported +insert into itrtest values (1, 'bar') on conflict (a) do nothing; +insert into itrtest values (1, 'bar') on conflict (a) do update set b = excluded.b; + +select tableoid::regclass, * FROM itrtest; + +drop table itrtest; +drop table loct1; +drop table loct2; + +-- Test update tuple routing +create table utrtest (a int, b text) partition by list (a); +create table loct (a int check (a in (1)), b text); +create foreign table remp (a int check (a in (1)), b text) server loopback options (table_name 'loct'); +create table locp (a int check (a in (2)), b text); +alter table utrtest attach partition remp for values in (1); +alter table utrtest attach partition locp for values in (2); + +insert into utrtest values (1, 'foo'); +insert into utrtest values (2, 'qux'); + +select tableoid::regclass, * FROM utrtest; +select tableoid::regclass, * FROM remp; +select tableoid::regclass, * FROM locp; + +-- It's not allowed to move a row from a partition that is foreign to another +update utrtest set a = 2 where b = 'foo' returning *; + +-- But the reverse is allowed +update utrtest set a = 1 where b = 'qux' returning *; + +select tableoid::regclass, * FROM utrtest; +select tableoid::regclass, * FROM remp; +select tableoid::regclass, * FROM locp; + +-- The executor should not let unexercised FDWs shut down +update utrtest set a = 1 where b = 'foo'; + +drop table utrtest; +drop table loct; + +-- Test copy tuple routing +create table ctrtest (a int, b text) partition by list (a); +create table loct1 (a int check (a in (1)), b text); +create foreign table remp1 (a int check (a in (1)), b text) server loopback options (table_name 'loct1'); +create table loct2 (a int check (a in (2)), b text); +create foreign table remp2 (b text, a int check (a in (2))) server loopback options (table_name 'loct2'); +alter table ctrtest attach partition remp1 for values in (1); +alter table ctrtest attach partition remp2 for values in (2); + +copy ctrtest from stdin; +1 foo +2 qux +\. + +select tableoid::regclass, * FROM ctrtest; +select tableoid::regclass, * FROM remp1; +select tableoid::regclass, * FROM remp2; + +-- Copying into foreign partitions directly should work as well +copy remp1 from stdin; +1 bar +\. + +select tableoid::regclass, * FROM remp1; + +drop table ctrtest; +drop table loct1; +drop table loct2; + +-- =================================================================== +-- test COPY FROM +-- =================================================================== + +create table loc2 (f1 int, f2 text); +alter table loc2 set (autovacuum_enabled = 'false'); +create foreign table rem2 (f1 int, f2 text) server loopback options(table_name 'loc2'); + +-- Test basic functionality +copy rem2 from stdin; +1 foo +2 bar +\. +select * from rem2; + +delete from rem2; + +-- Test check constraints +alter table loc2 add constraint loc2_f1positive check (f1 >= 0); +alter foreign table rem2 add constraint rem2_f1positive check (f1 >= 0); + +-- check constraint is enforced on the remote side, not locally +copy rem2 from stdin; +1 foo +2 bar +\. +copy rem2 from stdin; -- ERROR +-1 xyzzy +\. +select * from rem2; + +alter foreign table rem2 drop constraint rem2_f1positive; +alter table loc2 drop constraint loc2_f1positive; + +delete from rem2; + +-- Test local triggers +create trigger trig_stmt_before before insert on rem2 + for each statement execute procedure trigger_func(); +create trigger trig_stmt_after after insert on rem2 + for each statement execute procedure trigger_func(); +create trigger trig_row_before before insert on rem2 + for each row execute procedure trigger_data(23,'skidoo'); +create trigger trig_row_after after insert on rem2 + for each row execute procedure trigger_data(23,'skidoo'); + +copy rem2 from stdin; +1 foo +2 bar +\. +select * from rem2; + +drop trigger trig_row_before on rem2; +drop trigger trig_row_after on rem2; +drop trigger trig_stmt_before on rem2; +drop trigger trig_stmt_after on rem2; + +delete from rem2; + +create trigger trig_row_before_insert before insert on rem2 + for each row execute procedure trig_row_before_insupdate(); + +-- The new values are concatenated with ' triggered !' +copy rem2 from stdin; +1 foo +2 bar +\. +select * from rem2; + +drop trigger trig_row_before_insert on rem2; + +delete from rem2; + +create trigger trig_null before insert on rem2 + for each row execute procedure trig_null(); + +-- Nothing happens +copy rem2 from stdin; +1 foo +2 bar +\. +select * from rem2; + +drop trigger trig_null on rem2; + +delete from rem2; + +-- Test remote triggers +create trigger trig_row_before_insert before insert on loc2 + for each row execute procedure trig_row_before_insupdate(); + +-- The new values are concatenated with ' triggered !' +copy rem2 from stdin; +1 foo +2 bar +\. +select * from rem2; + +drop trigger trig_row_before_insert on loc2; + +delete from rem2; + +create trigger trig_null before insert on loc2 + for each row execute procedure trig_null(); + +-- Nothing happens +copy rem2 from stdin; +1 foo +2 bar +\. +select * from rem2; + +drop trigger trig_null on loc2; + +delete from rem2; + +-- Test a combination of local and remote triggers +create trigger rem2_trig_row_before before insert on rem2 + for each row execute procedure trigger_data(23,'skidoo'); +create trigger rem2_trig_row_after after insert on rem2 + for each row execute procedure trigger_data(23,'skidoo'); +create trigger loc2_trig_row_before_insert before insert on loc2 + for each row execute procedure trig_row_before_insupdate(); + +copy rem2 from stdin; +1 foo +2 bar +\. +select * from rem2; + +drop trigger rem2_trig_row_before on rem2; +drop trigger rem2_trig_row_after on rem2; +drop trigger loc2_trig_row_before_insert on loc2; + +delete from rem2; + +-- =================================================================== -- test IMPORT FOREIGN SCHEMA -- =================================================================== |