diff options
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 34745150e9b..e60ab34a5a7 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx, Size sz, const char *message); static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pg_output_stream_start(LogicalDecodingContext *ctx, + TestDecodingData *data, + ReorderBufferTXN *txn, + bool last_write); static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pg_decode_stream_abort(LogicalDecodingContext *ctx, @@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } -/* - * We never try to stream any empty xact so we don't need any special handling - * for skip_empty_xacts in streaming mode APIs. - */ static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; - OutputPluginPrepareWrite(ctx, true); + data->xact_wrote_changes = false; + if (data->skip_empty_xacts) + return; + pg_output_stream_start(ctx, data, txn, true); +} + +static void +pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) +{ + OutputPluginPrepareWrite(ctx, last_write); if (data->include_xids) appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid); else appendStringInfo(ctx->out, "opening a streamed block for transaction"); - OutputPluginWrite(ctx, true); + OutputPluginWrite(ctx, last_write); } -/* - * We never try to stream any empty xact so we don't need any special handling - * for skip_empty_xacts in streaming mode APIs. - */ static void pg_decode_stream_stop(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid); @@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } -/* - * We never try to stream any empty xact so we don't need any special handling - * for skip_empty_xacts in streaming mode APIs. - */ static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid); @@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } -/* - * We never try to stream any empty xact so we don't need any special handling - * for skip_empty_xacts in streaming mode APIs. - */ static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) @@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx, { TestDecodingData *data = ctx->output_plugin_private; + /* output stream start if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_stream_start(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid); @@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_stream_start(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid); |