aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/concurrent_stream.out5
-rw-r--r--contrib/test_decoding/specs/concurrent_stream.spec10
-rw-r--r--contrib/test_decoding/test_decoding.c93
-rw-r--r--src/backend/replication/logical/reorderbuffer.c1
-rw-r--r--src/include/replication/reorderbuffer.h5
-rw-r--r--src/tools/pgindent/typedefs.list1
6 files changed, 96 insertions, 19 deletions
diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out
index e731d13d8fa..6f8b2176db5 100644
--- a/contrib/test_decoding/expected/concurrent_stream.out
+++ b/contrib/test_decoding/expected/concurrent_stream.out
@@ -1,11 +1,12 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
-starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s2_ddl s1_commit s1_get_stream_changes
step s0_begin: BEGIN;
step s0_ddl: CREATE TABLE stream_test1(data text);
step s1_ddl: CREATE TABLE stream_test(data text);
step s1_begin: BEGIN;
step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s2_ddl: CREATE TABLE stream_test2(data text);
step s1_commit: COMMIT;
step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
index ad9fde9c284..54218a4b3f6 100644
--- a/contrib/test_decoding/specs/concurrent_stream.spec
+++ b/contrib/test_decoding/specs/concurrent_stream.spec
@@ -23,9 +23,15 @@ setup { SET synchronous_commit=on; }
step "s0_begin" { BEGIN; }
step "s0_ddl" {CREATE TABLE stream_test1(data text);}
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_ddl" {CREATE TABLE stream_test2(data text);}
+
# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
# the currently running s0_ddl and we want to test that s0_ddl should not get
-# streamed when user asked to skip-empty-xacts.
+# streamed when user asked to skip-empty-xacts. Similarly, the
+# INTERNAL_SNAPSHOT change added by s2_ddl should not change the results for
+# what gets streamed.
session "s1"
setup { SET synchronous_commit=on; }
step "s1_ddl" { CREATE TABLE stream_test(data text); }
@@ -34,4 +40,4 @@ step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
step "s1_commit" { COMMIT; }
step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
-permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614f144..e12278beb58 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -34,10 +34,24 @@ typedef struct
bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
- bool xact_wrote_changes;
bool only_local;
} TestDecodingData;
+/*
+ * Maintain the per-transaction level variables to track whether the
+ * transaction and or streams have written any changes. In streaming mode the
+ * transaction can be decoded in streams so along with maintaining whether the
+ * transaction has written any changes, we also need to track whether the
+ * current stream has written any changes. This is required so that if user
+ * has requested to skip the empty transactions we can skip the empty streams
+ * even though the transaction has written some changes.
+ */
+typedef struct
+{
+ bool xact_wrote_changes;
+ bool stream_wrote_changes;
+} TestDecodingTxnData;
+
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init);
static void pg_decode_shutdown(LogicalDecodingContext *ctx);
@@ -255,8 +269,12 @@ static void
pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+
+ txndata->xact_wrote_changes = false;
+ txn->output_plugin_private = txndata;
- data->xact_wrote_changes = false;
if (data->skip_empty_xacts)
return;
@@ -280,8 +298,13 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
@@ -442,18 +465,20 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
TestDecodingData *data;
+ TestDecodingTxnData *txndata;
Form_pg_class class_form;
TupleDesc tupdesc;
MemoryContext old;
data = ctx->output_plugin_private;
+ txndata = txn->output_plugin_private;
/* output BEGIN if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = true;
class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);
@@ -527,17 +552,19 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
TestDecodingData *data;
+ TestDecodingTxnData *txndata;
MemoryContext old;
int i;
data = ctx->output_plugin_private;
+ txndata = txn->output_plugin_private;
/* output BEGIN if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
{
pg_output_begin(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = true;
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
@@ -592,8 +619,20 @@ pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- data->xact_wrote_changes = false;
+ /*
+ * Allocate the txn plugin data for the first stream in the transaction.
+ */
+ if (txndata == NULL)
+ {
+ txndata =
+ MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData));
+ txndata->xact_wrote_changes = false;
+ txn->output_plugin_private = txndata;
+ }
+
+ txndata->stream_wrote_changes = false;
if (data->skip_empty_xacts)
return;
pg_output_stream_start(ctx, data, txn, true);
@@ -615,8 +654,9 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
@@ -634,7 +674,23 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ /*
+ * stream abort can be sent for an individual subtransaction but we
+ * maintain the output_plugin_private only under the toptxn so if this is
+ * not the toptxn then fetch the toptxn.
+ */
+ ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+ TestDecodingTxnData *txndata = toptxn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ if (txn->toptxn == NULL)
+ {
+ Assert(txn->output_plugin_private != NULL);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+ }
+
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
@@ -651,8 +707,13 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
XLogRecPtr commit_lsn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
+ bool xact_wrote_changes = txndata->xact_wrote_changes;
+
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
@@ -681,13 +742,14 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
/* output stream start if we haven't yet */
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -734,12 +796,13 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
ReorderBufferChange *change)
{
TestDecodingData *data = ctx->output_plugin_private;
+ TestDecodingTxnData *txndata = txn->output_plugin_private;
- if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ if (data->skip_empty_xacts && !txndata->stream_wrote_changes)
{
pg_output_stream_start(ctx, data, txn, false);
}
- data->xact_wrote_changes = true;
+ txndata->xact_wrote_changes = txndata->stream_wrote_changes = true;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c1bd68011c5..301baff2446 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -402,6 +402,7 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
/* InvalidCommandId is not zero, so set it explicitly */
txn->command_id = InvalidCommandId;
+ txn->output_plugin_private = NULL;
return txn;
}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dfdda938b2a..bd9dd7ec676 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -378,6 +378,11 @@ typedef struct ReorderBufferTXN
/* If we have detected concurrent abort then ignore future changes. */
bool concurrent_abort;
+
+ /*
+ * Private data pointer of the output plugin.
+ */
+ void *output_plugin_private;
} ReorderBufferTXN;
/* so we can define the callbacks used inside struct ReorderBuffer itself */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b146b3ea73d..fde701bfd4d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2505,6 +2505,7 @@ Tcl_Obj
Tcl_Time
TempNamespaceStatus
TestDecodingData
+TestDecodingTxnData
TestSpec
TextFreq
TextPositionState