diff options
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 93 |
1 files changed, 78 insertions, 15 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 8e33614f144..e12278beb58 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -34,10 +34,24 @@ typedef struct bool include_xids; bool include_timestamp; bool skip_empty_xacts; - bool xact_wrote_changes; bool only_local; } TestDecodingData; +/* + * Maintain the per-transaction level variables to track whether the + * transaction and or streams have written any changes. In streaming mode the + * transaction can be decoded in streams so along with maintaining whether the + * transaction has written any changes, we also need to track whether the + * current stream has written any changes. This is required so that if user + * has requested to skip the empty transactions we can skip the empty streams + * even though the transaction has written some changes. + */ +typedef struct +{ + bool xact_wrote_changes; + bool stream_wrote_changes; +} TestDecodingTxnData; + static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init); static void pg_decode_shutdown(LogicalDecodingContext *ctx); @@ -255,8 +269,12 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; - data->xact_wrote_changes = false; if (data->skip_empty_xacts) return; @@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + pfree(txndata); + txn->output_plugin_private = NULL; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { TestDecodingData *data; + TestDecodingTxnData *txndata; Form_pg_class class_form; TupleDesc tupdesc; MemoryContext old; data = ctx->output_plugin_private; + txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) { pg_output_begin(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = true; class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); @@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { TestDecodingData *data; + TestDecodingTxnData *txndata; MemoryContext old; int i; data = ctx->output_plugin_private; + txndata = txn->output_plugin_private; /* output BEGIN if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) { pg_output_begin(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = true; /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - data->xact_wrote_changes = false; + /* + * Allocate the txn plugin data for the first stream in the transaction. + */ + if (txndata == NULL) + { + txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; + } + + txndata->stream_wrote_changes = false; if (data->skip_empty_xacts) return; pg_output_stream_start(ctx, data, txn, true); @@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + /* + * stream abort can be sent for an individual subtransaction but we + * maintain the output_plugin_private only under the toptxn so if this is + * not the toptxn then fetch the toptxn. + */ + ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn; + TestDecodingTxnData *txndata = toptxn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + if (txn->toptxn == NULL) + { + Assert(txn->output_plugin_private != NULL); + pfree(txndata); + txn->output_plugin_private = NULL; + } + + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, XLogRecPtr commit_lsn) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + bool xact_wrote_changes = txndata->xact_wrote_changes; + + pfree(txndata); + txn->output_plugin_private = NULL; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !xact_wrote_changes) return; OutputPluginPrepareWrite(ctx, true); @@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, ReorderBufferChange *change) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; /* output stream start if we haven't yet */ - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) { pg_output_stream_start(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; OutputPluginPrepareWrite(ctx, true); if (data->include_xids) @@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change) { TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; - if (data->skip_empty_xacts && !data->xact_wrote_changes) + if (data->skip_empty_xacts && !txndata->stream_wrote_changes) { pg_output_stream_start(ctx, data, txn, false); } - data->xact_wrote_changes = true; + txndata->xact_wrote_changes = txndata->stream_wrote_changes = true; OutputPluginPrepareWrite(ctx, true); if (data->include_xids) |