aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/concurrent_stream.out19
-rw-r--r--contrib/test_decoding/expected/stream.out5
-rw-r--r--contrib/test_decoding/specs/concurrent_stream.spec37
-rw-r--r--contrib/test_decoding/test_decoding.c55
5 files changed, 95 insertions, 23 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index ed9a3d6c0ed..f23f15b04d4 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
spill slot truncate stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
- oldest_xmin snapshot_transfer subxact_without_top
+ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/concurrent_stream.out b/contrib/test_decoding/expected/concurrent_stream.out
new file mode 100644
index 00000000000..e731d13d8fa
--- /dev/null
+++ b/contrib/test_decoding/expected/concurrent_stream.out
@@ -0,0 +1,19 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_ddl s1_ddl s1_begin s1_toast_insert s1_commit s1_get_stream_changes
+step s0_begin: BEGIN;
+step s0_ddl: CREATE TABLE stream_test1(data text);
+step s1_ddl: CREATE TABLE stream_test(data text);
+step s1_begin: BEGIN;
+step s1_toast_insert: INSERT INTO stream_test SELECT large_val();
+step s1_commit: COMMIT;
+step s1_get_stream_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+data
+
+opening a streamed block for transaction
+streaming change for transaction
+closing a streamed block for transaction
+committing streamed transaction
+?column?
+
+stop
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index d7e32f81854..e1c3bc838d5 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -29,10 +29,7 @@ COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
- opening a streamed block for transaction
streaming message: transactional: 1 prefix: test, sz: 50
- closing a streamed block for transaction
- aborting streamed (sub)transaction
opening a streamed block for transaction
streaming change for transaction
streaming change for transaction
@@ -56,7 +53,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
streaming change for transaction
closing a streamed block for transaction
committing streamed transaction
-(27 rows)
+(24 rows)
-- streaming test for toast changes
ALTER TABLE stream_test ALTER COLUMN data set storage external;
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
new file mode 100644
index 00000000000..ad9fde9c284
--- /dev/null
+++ b/contrib/test_decoding/specs/concurrent_stream.spec
@@ -0,0 +1,37 @@
+# Test decoding of in-progress transaction containing dml and a concurrent
+# transaction with ddl operation. The transaction containing ddl operation
+# should not get streamed as it doesn't have any changes.
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+
+ -- consume DDL
+ SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS stream_test;
+ DROP TABLE IF EXISTS stream_test1;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_ddl" {CREATE TABLE stream_test1(data text);}
+
+# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
+# the currently running s0_ddl and we want to test that s0_ddl should not get
+# streamed when user asked to skip-empty-xacts.
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_ddl" { CREATE TABLE stream_test(data text); }
+step "s1_begin" { BEGIN; }
+step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
+step "s1_commit" { COMMIT; }
+step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
+
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s1_commit" "s1_get_stream_changes"
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 34745150e9b..e60ab34a5a7 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -64,6 +64,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
Size sz, const char *message);
static void pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
+static void pg_output_stream_start(LogicalDecodingContext *ctx,
+ TestDecodingData *data,
+ ReorderBufferTXN *txn,
+ bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
@@ -583,34 +587,38 @@ pg_decode_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
- OutputPluginPrepareWrite(ctx, true);
+ data->xact_wrote_changes = false;
+ if (data->skip_empty_xacts)
+ return;
+ pg_output_stream_start(ctx, data, txn, true);
+}
+
+static void
+pg_output_stream_start(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
+{
+ OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
appendStringInfo(ctx->out, "opening a streamed block for transaction TXN %u", txn->xid);
else
appendStringInfo(ctx->out, "opening a streamed block for transaction");
- OutputPluginWrite(ctx, true);
+ OutputPluginWrite(ctx, last_write);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "closing a streamed block for transaction TXN %u", txn->xid);
@@ -619,10 +627,6 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -630,6 +634,9 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "aborting streamed (sub)transaction TXN %u", txn->xid);
@@ -638,10 +645,6 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-/*
- * We never try to stream any empty xact so we don't need any special handling
- * for skip_empty_xacts in streaming mode APIs.
- */
static void
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
@@ -649,6 +652,9 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ return;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -676,6 +682,13 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
{
TestDecodingData *data = ctx->output_plugin_private;
+ /* output stream start if we haven't yet */
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
@@ -722,6 +735,12 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
TestDecodingData *data = ctx->output_plugin_private;
+ if (data->skip_empty_xacts && !data->xact_wrote_changes)
+ {
+ pg_output_stream_start(ctx, data, txn, false);
+ }
+ data->xact_wrote_changes = true;
+
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);