diff options
-rw-r--r-- | contrib/test_decoding/expected/decoding_into_rel.out | 25 | ||||
-rw-r--r-- | contrib/test_decoding/sql/decoding_into_rel.sql | 11 | ||||
-rw-r--r-- | src/backend/executor/spi.c | 42 | ||||
-rw-r--r-- | src/include/executor/spi_priv.h | 3 |
4 files changed, 70 insertions, 11 deletions
diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out index be759caa31d..8fd3390066d 100644 --- a/contrib/test_decoding/expected/decoding_into_rel.out +++ b/contrib/test_decoding/expected/decoding_into_rel.out @@ -59,6 +59,31 @@ SELECT * FROM changeresult; DROP TABLE changeresult; DROP TABLE somechange; +-- check calling logical decoding from pl/pgsql +CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$ +BEGIN + RETURN QUERY + SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +END$$ LANGUAGE plpgsql; +SELECT * FROM slot_changes_wrapper('regression_slot'); + slot_changes_wrapper +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' + table public.changeresult: INSERT: data[text]:'COMMIT' + table public.changeresult: INSERT: data[text]:'BEGIN' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT''''''' + table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT''' + table public.changeresult: INSERT: data[text]:'COMMIT' + COMMIT +(14 rows) + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql index 54670fd39e7..1068cec5888 100644 --- a/contrib/test_decoding/sql/decoding_into_rel.sql +++ b/contrib/test_decoding/sql/decoding_into_rel.sql @@ -27,5 +27,16 @@ INSERT INTO changeresult SELECT * FROM changeresult; DROP TABLE changeresult; DROP TABLE somechange; + +-- check calling logical decoding from pl/pgsql +CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$ +BEGIN + RETURN QUERY + SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +END$$ LANGUAGE plpgsql; + +SELECT * FROM slot_changes_wrapper('regression_slot'); + SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index cd00a6d9f25..1069013cd2a 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -71,8 +71,8 @@ static void _SPI_cursor_operation(Portal portal, static SPIPlanPtr _SPI_make_plan_non_temp(SPIPlanPtr plan); static SPIPlanPtr _SPI_save_plan(SPIPlanPtr plan); -static int _SPI_begin_call(bool execmem); -static int _SPI_end_call(bool procmem); +static int _SPI_begin_call(bool use_exec); +static int _SPI_end_call(bool use_exec); static MemoryContext _SPI_execmem(void); static MemoryContext _SPI_procmem(void); static bool _SPI_checktuples(void); @@ -118,6 +118,7 @@ SPI_connect(void) _SPI_current->processed = 0; _SPI_current->lastoid = InvalidOid; _SPI_current->tuptable = NULL; + _SPI_current->execSubid = InvalidSubTransactionId; slist_init(&_SPI_current->tuptables); _SPI_current->procCxt = NULL; /* in case we fail to create 'em */ _SPI_current->execCxt = NULL; @@ -149,7 +150,7 @@ SPI_finish(void) { int res; - res = _SPI_begin_call(false); /* live in procedure memory */ + res = _SPI_begin_call(false); /* just check we're connected */ if (res < 0) return res; @@ -268,8 +269,15 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid) { slist_mutable_iter siter; - /* free Executor memory the same as _SPI_end_call would do */ - MemoryContextResetAndDeleteChildren(_SPI_current->execCxt); + /* + * Throw away executor state if current executor operation was started + * within current subxact (essentially, force a _SPI_end_call(true)). + */ + if (_SPI_current->execSubid >= mySubid) + { + _SPI_current->execSubid = InvalidSubTransactionId; + MemoryContextResetAndDeleteChildren(_SPI_current->execCxt); + } /* throw away any tuple tables created within current subxact */ slist_foreach_modify(siter, &_SPI_current->tuptables) @@ -293,8 +301,6 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid) MemoryContextDelete(tuptable->tuptabcxt); } } - /* in particular we should have gotten rid of any in-progress table */ - Assert(_SPI_current->tuptable == NULL); } } @@ -2444,15 +2450,24 @@ _SPI_procmem(void) /* * _SPI_begin_call: begin a SPI operation within a connected procedure + * + * use_exec is true if we intend to make use of the procedure's execCxt + * during this SPI operation. We'll switch into that context, and arrange + * for it to be cleaned up at _SPI_end_call or if an error occurs. */ static int -_SPI_begin_call(bool execmem) +_SPI_begin_call(bool use_exec) { if (_SPI_current == NULL) return SPI_ERROR_UNCONNECTED; - if (execmem) /* switch to the Executor memory context */ + if (use_exec) + { + /* remember when the Executor operation started */ + _SPI_current->execSubid = GetCurrentSubTransactionId(); + /* switch to the Executor memory context */ _SPI_execmem(); + } return 0; } @@ -2460,14 +2475,19 @@ _SPI_begin_call(bool execmem) /* * _SPI_end_call: end a SPI operation within a connected procedure * + * use_exec must be the same as in the previous _SPI_begin_call + * * Note: this currently has no failure return cases, so callers don't check */ static int -_SPI_end_call(bool procmem) +_SPI_end_call(bool use_exec) { - if (procmem) /* switch to the procedure memory context */ + if (use_exec) { + /* switch to the procedure memory context */ _SPI_procmem(); + /* mark Executor context no longer in use */ + _SPI_current->execSubid = InvalidSubTransactionId; /* and free Executor memory */ MemoryContextResetAndDeleteChildren(_SPI_current->execCxt); } diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h index ba7fb988754..f2cfd59feb5 100644 --- a/src/include/executor/spi_priv.h +++ b/src/include/executor/spi_priv.h @@ -33,6 +33,9 @@ typedef struct MemoryContext savedcxt; /* context of SPI_connect's caller */ SubTransactionId connectSubid; /* ID of connecting subtransaction */ QueryEnvironment *queryEnv; /* query environment setup for SPI level */ + + /* subtransaction in which current Executor call was started */ + SubTransactionId execSubid; } _SPI_connection; /* |