diff options
-rw-r--r-- | contrib/test_decoding/expected/concurrent_stream.out | 5 | ||||
-rw-r--r-- | contrib/test_decoding/specs/concurrent_stream.spec | 10 | ||||
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 93 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 1 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 5 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 1 |
6 files changed, 96 insertions, 19 deletions
diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out index e731d13d8fa..6f8b2176db5 100644 --- a/contrib/test_decoding/expected/concurrent_stream.out +++ b/contrib/test_decoding/expected/concurrent_stream.out @@ -1,11 +1,12 @@ -Parsed test spec with 2 sessions +Parsed test spec with 3 sessions -starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes +starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes step s0_begin: BEGIN; step s0_ddl: CREATE TABLE stream_test1(data text); step s1_ddl: CREATE TABLE stream_test(data text); step s1_begin: BEGIN; step s1_toast_insert: INSERT INTO stream_test SELECT large_val(); +step s2_ddl: CREATE TABLE stream_test2(data text); step s1_commit: COMMIT; step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); data diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec index ad9fde9c284..54218a4b3f6 100644 --- a/contrib/test_decoding/specs/concurrent_stream.spec +++ b/contrib/test_decoding/specs/concurrent_stream.spec @@ -23,9 +23,15 @@ setup { SET synchronous_commit=on; } step "s0_begin" { BEGIN; } step "s0_ddl" {CREATE TABLE stream_test1(data text);} +session "s2" +setup { SET synchronous_commit=on; } +step "s2_ddl" {CREATE TABLE stream_test2(data text);} + # The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to # the currently running s0_ddl and we want to test that s0_ddl should not get -# streamed when user asked to skip-empty-xacts. +# streamed when user asked to skip-empty-xacts. Similarly, the +# INTERNAL_SNAPSHOT change added by s2_ddl should not change the results for +# what gets streamed. session "s1" setup { SET synchronous_commit=on; } step "s1_ddl" { CREATE TABLE stream_test(data text); } @@ -34,4 +40,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();} step "s1_commit" { COMMIT; } step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');} -permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes" +permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes" diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 8e33614f144..e12278beb58 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -34,10 +34,24 @@ typedef struct bool include_xids; bool include_timestamp; bool skip_empty_xacts; - bool xact_wrote_changes; bool only_local; } TestDecodingData; +/* + * Maintain the per-transaction level variables to track whether the + * transaction and or streams have written any changes. In streaming mode the + * transaction can be decoded in streams so along with maintaining whether the + * transaction has written any changes, we also need to track whether the + * current stream has written any changes. This is required so that if user + * has requested to skip the empty transactions we can skip the empty streams + * even though the transaction has written some changes. + */ +typedef struct +{ + bool xact_wrote_changes; + bool stream_wrote_changes; +} TestDecodingTxnData; + static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init); static void pg_decode_shutdown(LogicalDecodingContext *ctx); @@ -255,8 +269,12 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; - data->xact_wrote_changes = false; if (data->skip_empty_xacts) return; @@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + pfree(txndata); + txn->output_plugin_private = NULL; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { TestDecodingData *data; + TestDecodingTxnData *txndata; Form_pg_class class_form; TupleDesc tupdesc; MemoryContext old; data = ctx->output_plugin_private; + txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) { pg_output_begin(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = true; class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); @@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { TestDecodingData *data; + TestDecodingTxnData *txndata; MemoryContext old; int i; data = ctx->output_plugin_private; + txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) { pg_output_begin(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = true; /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - data->xact_wrote_changes = false; + /* + * Allocate the txn plugin data for the first stream in the transaction. + */ + if (txndata == NULL) + { + txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; + } + + txndata->stream_wrote_changes = false; if (data->skip_empty_xacts) return; pg_output_stream_start(ctx, data, txn, true); @@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + /* + * stream abort can be sent for an individual subtransaction but we + * maintain the output_plugin_private only under the toptxn so if this is + * not the toptxn then fetch the toptxn. + */ + ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn; + TestDecodingTxnData *txndata = toptxn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + if (txn->toptxn == NULL) + { + Assert(txn->output_plugin_private != NULL); + pfree(txndata); + txn->output_plugin_private = NULL; + } + + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, XLogRecPtr commit_lsn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + pfree(txndata); + txn->output_plugin_private = NULL; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferChange *change) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; /* output stream start if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) { pg_output_stream_start(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; OutputPluginPrepareWrite(ctx, true); if (data->include_xids) @@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) { pg_output_stream_start(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; OutputPluginPrepareWrite(ctx, true); if (data->include_xids) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c1bd68011c5..301baff2446 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -402,6 +402,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb) /* InvalidCommandId is not zero, so set it explicitly */ txn->command_id = InvalidCommandId; + txn->output_plugin_private = NULL; return txn; } diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index dfdda938b2a..bd9dd7ec676 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN /* If we have detected concurrent abort then ignore future changes. */ bool concurrent_abort; + + /* + * Private data pointer of the output plugin. + */ + void *output_plugin_private; } ReorderBufferTXN; /* so we can define the callbacks used inside struct ReorderBuffer itself */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b146b3ea73d..fde701bfd4d 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2505,6 +2505,7 @@ Tcl_Obj Tcl_Time TempNamespaceStatus TestDecodingData +TestDecodingTxnData TestSpec TextFreq TextPositionState |