aboutsummaryrefslogtreecommitdiff
path: root/contrib/test_decoding/test_decoding.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r--contrib/test_decoding/test_decoding.c93
1 files changed, 78 insertions, 15 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614f144..e12278beb58 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,10 +34,24 @@ typedef struct
bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
- bool xact_wrote_changes;
bool only_local;
} TestDecodingData;
+/*
+ * Maintain the per-transaction level variables to track whether the
+ * transaction and or streams have written any changes. In streaming mode the
+ * transaction can be decoded in streams so along with maintaining whether the
+ * transaction has written any changes, we also need to track whether the
+ * current stream has written any changes. This is required so that if user
+ * has requested to skip the empty transactions we can skip the empty streams
+ * even though the transaction has written some changes.
+ */
+typedef struct
+{
+ bool xact_wrote_changes;
+ bool stream_wrote_changes;
+} TestDecodingTxnData;
+
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init);
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -255,8 +269,12 @@ static void
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+ txndata->xact_wrote_changes = false;
+ txn->output_plugin_private = txndata;
- data->xact_wrote_changes = false;
if (data->skip_empty_xacts)
return;
@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
TestDecodingData *data;
+ TestDecodingTxnData *txndata;
Form_pg_class class_form;
TupleDesc tupdesc;
MemoryContext old;
data = ctx->output_plugin_private;
+ txndata = txn->output_plugin_private;
/* output BEGIN if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = true;
class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
TestDecodingData *data;
+ TestDecodingTxnData *txndata;
MemoryContext old;
int i;
data = ctx->output_plugin_private;
+ txndata = txn->output_plugin_private;
/* output BEGIN if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = true;
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
@@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- data->xact_wrote_changes = false;
+ /*
+ * Allocate the txn plugin data for the first stream in the transaction.
+ */
+ if (txndata == NULL)
+ {
+ txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+ txndata->xact_wrote_changes = false;
+ txn->output_plugin_private = txndata;
+ }
+
+ txndata->stream_wrote_changes = false;
if (data->skip_empty_xacts)
return;
pg_output_stream_start(ctx, data, txn, true);
@@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
@@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ /*
+ * stream abort can be sent for an individual subtransaction but we
+ * maintain the output_plugin_private only under the toptxn so if this is
+ * not the toptxn then fetch the toptxn.
+ */
+ ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+ TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ if (txn->toptxn == NULL)
+ {
+ Assert(txn->output_plugin_private != NULL);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+ }
+
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
/* output stream start if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)