diff options
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 45 |
1 files changed, 42 insertions, 3 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 5ce052b5c61..1167cc7c49b 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -42,6 +42,8 @@ typedef struct MemoryContext context; bool include_xids; bool include_timestamp; + bool skip_empty_xacts; + bool xact_wrote_changes; } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -49,6 +51,10 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions * static void pg_decode_shutdown(LogicalDecodingContext *ctx); static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pg_output_begin(LogicalDecodingContext *ctx, + TestDecodingData *data, + ReorderBufferTXN *txn, + bool last_write); static void pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void pg_decode_change(LogicalDecodingContext *ctx, @@ -83,7 +89,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ListCell *option; TestDecodingData *data; - data = palloc(sizeof(TestDecodingData)); + data = palloc0(sizeof(TestDecodingData)); data->context = AllocSetContextCreate(ctx->context, "text conversion context", ALLOCSET_DEFAULT_MINSIZE, @@ -91,6 +97,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ALLOCSET_DEFAULT_MAXSIZE); data->include_xids = true; data->include_timestamp = false; + data->skip_empty_xacts = false; ctx->output_plugin_private = data; @@ -138,6 +145,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, if (force_binary) opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; } + else if (strcmp(elem->defname, "skip-empty-xacts") == 0) + { + + if (elem->arg == NULL) + data->skip_empty_xacts = true; + else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -165,12 +183,22 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { TestDecodingData *data = ctx->output_plugin_private; - OutputPluginPrepareWrite(ctx, true); + data->xact_wrote_changes = false; + if (data->skip_empty_xacts) + return; + + pg_output_begin(ctx, data, txn, true); +} + +static void +pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write) +{ + OutputPluginPrepareWrite(ctx, last_write); if (data->include_xids) appendStringInfo(ctx->out, "BEGIN %u", txn->xid); else appendStringInfoString(ctx->out, "BEGIN"); - OutputPluginWrite(ctx, true); + OutputPluginWrite(ctx, last_write); } /* COMMIT callback */ @@ -180,6 +208,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { TestDecodingData *data = ctx->output_plugin_private; + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); if (data->include_xids) appendStringInfo(ctx->out, "COMMIT %u", txn->xid); @@ -339,6 +370,14 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContext old; data = ctx->output_plugin_private; + + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) + { + pg_output_begin(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); |