diff options
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r-- | contrib/test_decoding/Makefile | 2 | ||||
-rw-r--r-- | contrib/test_decoding/expected/ddl.out | 21 | ||||
-rw-r--r-- | contrib/test_decoding/expected/messages.out | 79 | ||||
-rw-r--r-- | contrib/test_decoding/sql/ddl.sql | 3 | ||||
-rw-r--r-- | contrib/test_decoding/sql/messages.sql | 25 | ||||
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 18 |
6 files changed, 139 insertions, 9 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 06c95461f8a..309cb0b39a3 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -38,7 +38,7 @@ submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ - decoding_into_rel binary prepared replorigin time + decoding_into_rel binary prepared replorigin time messages regresscheck: | submake-regress submake-test_decoding temp-install $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 77719e8fed0..32cd24d6f0d 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc (7 rows) /* - * check that disk spooling works + * check that disk spooling works (also for logical messages) */ BEGIN; CREATE TABLE tr_etoomuch (id serial primary key, data int); INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); +SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg'); + ?column? +---------------- + tx logical msg +(1 row) + DELETE FROM tr_etoomuch WHERE id < 5000; UPDATE tr_etoomuch SET data = - data WHERE id > 5000; COMMIT; @@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY substring(data, 1, 24) ORDER BY 1,2; - count | min | max --------+-------------------------------------------------+------------------------------------------------------------------------ - 1 | BEGIN | BEGIN - 1 | COMMIT | COMMIT - 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999 -(3 rows) + count | min | max +-------+-----------------------------------------------------------------------+------------------------------------------------------------------------ + 1 | BEGIN | BEGIN + 1 | COMMIT | COMMIT + 1 | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg + 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999 +(4 rows) -- check updates of primary keys work correctly BEGIN; diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out new file mode 100644 index 00000000000..e26f793daf8 --- /dev/null +++ b/contrib/test_decoding/expected/messages.out @@ -0,0 +1,79 @@ +-- predictability +SET synchronous_commit = on; +SET client_encoding = 'utf8'; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); + ?column? +---------- + msg1 +(1 row) + +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); + ?column? +---------- + msg2 +(1 row) + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); + ?column? +---------- + msg3 +(1 row) + +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4'); + ?column? +---------- + msg4 +(1 row) + +ROLLBACK; +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5'); + ?column? +---------- + msg5 +(1 row) + +SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6'); + ?column? +---------- + msg6 +(1 row) + +SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7'); + ?column? +---------- + msg7 +(1 row) + +COMMIT; +SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň'); + ?column? +--------------- + žluťoučký kůň +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------- + message: transactional: 1 prefix: test, sz: 4 content:msg1 + message: transactional: 0 prefix: test, sz: 4 content:msg2 + message: transactional: 0 prefix: test, sz: 4 content:msg4 + message: transactional: 0 prefix: test, sz: 4 content:msg6 + message: transactional: 1 prefix: test, sz: 4 content:msg5 + message: transactional: 1 prefix: test, sz: 4 content:msg7 + message: transactional: 1 prefix: test, sz: 19 content:žluťoučký kůň +(7 rows) + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + init +(1 row) + diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index ad928ad5726..b1f7bf693a8 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -108,11 +108,12 @@ DELETE FROM tr_pkey; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); /* - * check that disk spooling works + * check that disk spooling works (also for logical messages) */ BEGIN; CREATE TABLE tr_etoomuch (id serial primary key, data int); INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); +SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg'); DELETE FROM tr_etoomuch WHERE id < 5000; UPDATE tr_etoomuch SET data = - data WHERE id > 5000; COMMIT; diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql new file mode 100644 index 00000000000..d72191b0a9c --- /dev/null +++ b/contrib/test_decoding/sql/messages.sql @@ -0,0 +1,25 @@ +-- predictability +SET synchronous_commit = on; +SET client_encoding = 'utf8'; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4'); +ROLLBACK; + +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5'); +SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6'); +SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7'); +COMMIT; + +SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň'); + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 4cf808f2814..3336e1e16e7 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -21,6 +21,7 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#include "replication/message.h" #include "replication/origin.h" #include "utils/builtins.h" @@ -63,6 +64,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); void _PG_init(void) @@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; + cb->message_cb = pg_decode_message; } @@ -471,3 +477,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } + +static void +pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} |