aboutsummaryrefslogtreecommitdiff
path: root/contrib/test_decoding
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/ddl.out21
-rw-r--r--contrib/test_decoding/expected/messages.out79
-rw-r--r--contrib/test_decoding/sql/ddl.sql3
-rw-r--r--contrib/test_decoding/sql/messages.sql25
-rw-r--r--contrib/test_decoding/test_decoding.c18
6 files changed, 139 insertions, 9 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 06c95461f8a..309cb0b39a3 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
- decoding_into_rel binary prepared replorigin time
+ decoding_into_rel binary prepared replorigin time messages
regresscheck: | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 77719e8fed0..32cd24d6f0d 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
(7 rows)
/*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
*/
BEGIN;
CREATE TABLE tr_etoomuch (id serial primary key, data int);
INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
+ ?column?
+----------------
+ tx logical msg
+(1 row)
+
DELETE FROM tr_etoomuch WHERE id < 5000;
UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
COMMIT;
@@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data)
FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
GROUP BY substring(data, 1, 24)
ORDER BY 1,2;
- count | min | max
--------+-------------------------------------------------+------------------------------------------------------------------------
- 1 | BEGIN | BEGIN
- 1 | COMMIT | COMMIT
- 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999
-(3 rows)
+ count | min | max
+-------+-----------------------------------------------------------------------+------------------------------------------------------------------------
+ 1 | BEGIN | BEGIN
+ 1 | COMMIT | COMMIT
+ 1 | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg
+ 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999
+(4 rows)
-- check updates of primary keys work correctly
BEGIN;
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
new file mode 100644
index 00000000000..e26f793daf8
--- /dev/null
+++ b/contrib/test_decoding/expected/messages.out
@@ -0,0 +1,79 @@
+-- predictability
+SET synchronous_commit = on;
+SET client_encoding = 'utf8';
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+ ?column?
+----------
+ msg1
+(1 row)
+
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+ ?column?
+----------
+ msg2
+(1 row)
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+ ?column?
+----------
+ msg3
+(1 row)
+
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ ?column?
+----------
+ msg4
+(1 row)
+
+ROLLBACK;
+BEGIN;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+ ?column?
+----------
+ msg5
+(1 row)
+
+SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6');
+ ?column?
+----------
+ msg6
+(1 row)
+
+SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7');
+ ?column?
+----------
+ msg7
+(1 row)
+
+COMMIT;
+SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň');
+ ?column?
+---------------
+ žluťoučký kůň
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------------------------
+ message: transactional: 1 prefix: test, sz: 4 content:msg1
+ message: transactional: 0 prefix: test, sz: 4 content:msg2
+ message: transactional: 0 prefix: test, sz: 4 content:msg4
+ message: transactional: 0 prefix: test, sz: 4 content:msg6
+ message: transactional: 1 prefix: test, sz: 4 content:msg5
+ message: transactional: 1 prefix: test, sz: 4 content:msg7
+ message: transactional: 1 prefix: test, sz: 19 content:žluťoučký kůň
+(7 rows)
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+ ?column?
+----------
+ init
+(1 row)
+
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index ad928ad5726..b1f7bf693a8 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -108,11 +108,12 @@ DELETE FROM tr_pkey;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
/*
- * check that disk spooling works
+ * check that disk spooling works (also for logical messages)
*/
BEGIN;
CREATE TABLE tr_etoomuch (id serial primary key, data int);
INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
DELETE FROM tr_etoomuch WHERE id < 5000;
UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
COMMIT;
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
new file mode 100644
index 00000000000..d72191b0a9c
--- /dev/null
+++ b/contrib/test_decoding/sql/messages.sql
@@ -0,0 +1,25 @@
+-- predictability
+SET synchronous_commit = on;
+SET client_encoding = 'utf8';
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ROLLBACK;
+
+BEGIN;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6');
+SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7');
+COMMIT;
+
+SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň');
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 4cf808f2814..3336e1e16e7 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
#include "replication/output_plugin.h"
#include "replication/logical.h"
+#include "replication/message.h"
#include "replication/origin.h"
#include "utils/builtins.h"
@@ -63,6 +64,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferChange *change);
static bool pg_decode_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static void pg_decode_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
void
_PG_init(void)
@@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->commit_cb = pg_decode_commit_txn;
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
+ cb->message_cb = pg_decode_message;
}
@@ -471,3 +477,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+
+static void
+pg_decode_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
+ const char *prefix, Size sz, const char *message)
+{
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
+ transactional, prefix, sz);
+ appendBinaryStringInfo(ctx->out, message, sz);
+ OutputPluginWrite(ctx, true);
+}