aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2023-10-30 14:36:21 +0530
committerAmit Kapila <akapila@postgresql.org>2023-10-30 14:36:21 +0530
commit57891c256c345451b91ddb67a8e2a9c5eca9ae1c (patch)
tree77aa5e39ec8b055bf8444a958bf2a3c0ee68b817
parent675fed4df5db4e78d40a0ce9cb785cfba9fa480f (diff)
downloadpostgresql-57891c256c345451b91ddb67a8e2a9c5eca9ae1c.tar.gz
postgresql-57891c256c345451b91ddb67a8e2a9c5eca9ae1c.zip
Add STREAM_START/STREAM_STOP for transactional messages during decoding.
In test_decoding module, when skip_empty_xacts option was specified, add stream_start/stop for streaming transactional messages. This makes the handling of transactional messages stream consistent irrespective of whether skip_empty_xacts option was specified. Commit 26dd0284b9 made a similar change for non-streaming messages but forgot to update the streaming cases. Author: Peter Smith Reviewed-by: Amit Kapila Discussion: http://postgr.es/m/OS0PR01MB5716AEBD2988F8F5E9D5985794DFA@OS0PR01MB5716.jpnprd01.prod.outlook.com
-rw-r--r--contrib/test_decoding/expected/stream.out5
-rw-r--r--contrib/test_decoding/expected/twophase_stream.out10
-rw-r--r--contrib/test_decoding/test_decoding.c13
3 files changed, 25 insertions, 3 deletions
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 0f21dcb8e0e..4ab2d47bf8d 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -29,7 +29,10 @@ COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
+ opening a streamed block for transaction
streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
@@ -53,7 +56,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(24 rows)
+(27 rows)
-- streaming test for toast changes
ALTER TABLE stream_test ALTER COLUMN data set storage external;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index b08bb0e5730..a3574f73c8e 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -31,7 +31,10 @@ PREPARE TRANSACTION 'test1';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
+ opening a streamed block for transaction
streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
@@ -55,7 +58,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
streaming change for transaction
closing a streamed block for transaction
preparing streamed transaction 'test1'
-(24 rows)
+(27 rows)
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
@@ -84,8 +87,11 @@ PREPARE TRANSACTION 'test1_nodecode';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
+ opening a streamed block for transaction
streaming message: transactional: 1 prefix: test, sz: 50
-(1 row)
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
+(4 rows)
COMMIT PREPARED 'test1_nodecode';
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ab870d9e4dc..288fd0bb4ab 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -944,6 +944,19 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
const char *prefix, Size sz, const char *message)
{
+ /* Output stream start if we haven't yet for transactional messages. */
+ if (transactional)
+ {
+ TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
+ }
+
OutputPluginPrepareWrite(ctx, true);
if (transactional)