aboutsummaryrefslogtreecommitdiff
path: root/contrib/test_decoding/test_decoding.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r--contrib/test_decoding/test_decoding.c55
1 files changed, 37 insertions, 18 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 34745150e9b..e60ab34a5a7 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+ TestDecodingData *data,
+ ReorderBufferTXN *txn,
+ bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
- OutputPluginPrepareWrite(ctx, true);
+ data->xact_wrote_changes = false;
+ if (data->skip_empty_xacts)
+ return;
+ pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
+ OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "opening a streamed block for transaction");
- OutputPluginWrite(ctx, true);
+ OutputPluginWrite(ctx, last_write);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ /* output stream start if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);