aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/decoding_into_rel.out25
-rw-r--r--contrib/test_decoding/sql/decoding_into_rel.sql11
-rw-r--r--src/backend/executor/spi.c42
-rw-r--r--src/include/executor/spi_priv.h3
4 files changed, 70 insertions, 11 deletions
diff --git a/contrib/test_decoding/expected/decoding_into_rel.out b/contrib/test_decoding/expected/decoding_into_rel.out
index be759caa31d..8fd3390066d 100644
--- a/contrib/test_decoding/expected/decoding_into_rel.out
+++ b/contrib/test_decoding/expected/decoding_into_rel.out
@@ -59,6 +59,31 @@ SELECT * FROM changeresult;
DROP TABLE changeresult;
DROP TABLE somechange;
+-- check calling logical decoding from pl/pgsql
+CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
+BEGIN
+ RETURN QUERY
+ SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+END$$ LANGUAGE plpgsql;
+SELECT * FROM slot_changes_wrapper('regression_slot');
+ slot_changes_wrapper
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.somechange: INSERT: id[integer]:1'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ table public.changeresult: INSERT: data[text]:'BEGIN'
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''BEGIN'''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''BEGIN'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''table public.somechange: INSERT: id[integer]:1'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''table public.changeresult: INSERT: data[text]:''''COMMIT'''''''
+ table public.changeresult: INSERT: data[text]:'table public.changeresult: INSERT: data[text]:''COMMIT'''
+ table public.changeresult: INSERT: data[text]:'COMMIT'
+ COMMIT
+(14 rows)
+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql
index 54670fd39e7..1068cec5888 100644
--- a/contrib/test_decoding/sql/decoding_into_rel.sql
+++ b/contrib/test_decoding/sql/decoding_into_rel.sql
@@ -27,5 +27,16 @@ INSERT INTO changeresult
SELECT * FROM changeresult;
DROP TABLE changeresult;
DROP TABLE somechange;
+
+-- check calling logical decoding from pl/pgsql
+CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
+BEGIN
+ RETURN QUERY
+ SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+END$$ LANGUAGE plpgsql;
+
+SELECT * FROM slot_changes_wrapper('regression_slot');
+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index cd00a6d9f25..1069013cd2a 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -71,8 +71,8 @@ static void _SPI_cursor_operation(Portal portal,
static SPIPlanPtr _SPI_make_plan_non_temp(SPIPlanPtr plan);
static SPIPlanPtr _SPI_save_plan(SPIPlanPtr plan);
-static int _SPI_begin_call(bool execmem);
-static int _SPI_end_call(bool procmem);
+static int _SPI_begin_call(bool use_exec);
+static int _SPI_end_call(bool use_exec);
static MemoryContext _SPI_execmem(void);
static MemoryContext _SPI_procmem(void);
static bool _SPI_checktuples(void);
@@ -118,6 +118,7 @@ SPI_connect(void)
_SPI_current->processed = 0;
_SPI_current->lastoid = InvalidOid;
_SPI_current->tuptable = NULL;
+ _SPI_current->execSubid = InvalidSubTransactionId;
slist_init(&_SPI_current->tuptables);
_SPI_current->procCxt = NULL; /* in case we fail to create 'em */
_SPI_current->execCxt = NULL;
@@ -149,7 +150,7 @@ SPI_finish(void)
{
int res;
- res = _SPI_begin_call(false); /* live in procedure memory */
+ res = _SPI_begin_call(false); /* just check we're connected */
if (res < 0)
return res;
@@ -268,8 +269,15 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
{
slist_mutable_iter siter;
- /* free Executor memory the same as _SPI_end_call would do */
- MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
+ /*
+ * Throw away executor state if current executor operation was started
+ * within current subxact (essentially, force a _SPI_end_call(true)).
+ */
+ if (_SPI_current->execSubid >= mySubid)
+ {
+ _SPI_current->execSubid = InvalidSubTransactionId;
+ MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
+ }
/* throw away any tuple tables created within current subxact */
slist_foreach_modify(siter, &_SPI_current->tuptables)
@@ -293,8 +301,6 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
MemoryContextDelete(tuptable->tuptabcxt);
}
}
- /* in particular we should have gotten rid of any in-progress table */
- Assert(_SPI_current->tuptable == NULL);
}
}
@@ -2444,15 +2450,24 @@ _SPI_procmem(void)
/*
* _SPI_begin_call: begin a SPI operation within a connected procedure
+ *
+ * use_exec is true if we intend to make use of the procedure's execCxt
+ * during this SPI operation. We'll switch into that context, and arrange
+ * for it to be cleaned up at _SPI_end_call or if an error occurs.
*/
static int
-_SPI_begin_call(bool execmem)
+_SPI_begin_call(bool use_exec)
{
if (_SPI_current == NULL)
return SPI_ERROR_UNCONNECTED;
- if (execmem) /* switch to the Executor memory context */
+ if (use_exec)
+ {
+ /* remember when the Executor operation started */
+ _SPI_current->execSubid = GetCurrentSubTransactionId();
+ /* switch to the Executor memory context */
_SPI_execmem();
+ }
return 0;
}
@@ -2460,14 +2475,19 @@ _SPI_begin_call(bool execmem)
/*
* _SPI_end_call: end a SPI operation within a connected procedure
*
+ * use_exec must be the same as in the previous _SPI_begin_call
+ *
* Note: this currently has no failure return cases, so callers don't check
*/
static int
-_SPI_end_call(bool procmem)
+_SPI_end_call(bool use_exec)
{
- if (procmem) /* switch to the procedure memory context */
+ if (use_exec)
{
+ /* switch to the procedure memory context */
_SPI_procmem();
+ /* mark Executor context no longer in use */
+ _SPI_current->execSubid = InvalidSubTransactionId;
/* and free Executor memory */
MemoryContextResetAndDeleteChildren(_SPI_current->execCxt);
}
diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h
index ba7fb988754..f2cfd59feb5 100644
--- a/src/include/executor/spi_priv.h
+++ b/src/include/executor/spi_priv.h
@@ -33,6 +33,9 @@ typedef struct
MemoryContext savedcxt; /* context of SPI_connect's caller */
SubTransactionId connectSubid; /* ID of connecting subtransaction */
QueryEnvironment *queryEnv; /* query environment setup for SPI level */
+
+ /* subtransaction in which current Executor call was started */
+ SubTransactionId execSubid;
} _SPI_connection;
/*