diff options
author | Simon Riggs <simon@2ndQuadrant.com> | 2016-04-06 10:05:41 +0100 |
---|---|---|
committer | Simon Riggs <simon@2ndQuadrant.com> | 2016-04-06 10:05:41 +0100 |
commit | 3fe3511d05127cc024b221040db2eeb352e7d716 (patch) | |
tree | b17a084bec318a70a1c0fcd755596b771871bce7 /contrib/test_decoding | |
parent | 989be0810dffd08b54e1caecec0677608211c339 (diff) | |
download | postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.tar.gz postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.zip |
Generic Messages for Logical Decoding
API and mechanism to allow generic messages to be inserted into WAL that are
intended to be read by logical decoding plugins. This commit adds an optional
new callback to the logical decoding API.
Messages are either text or bytea. Messages can be transactional, or not, and
are identified by a prefix to allow multiple concurrent decoding plugins.
(Not to be confused with Generic WAL records, which are intended to allow crash
recovery of extensible objects.)
Author: Petr Jelinek and Andres Freund
Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs
Discussion: 5685F999.6010202@2ndquadrant.com
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r-- | contrib/test_decoding/Makefile | 2 | ||||
-rw-r--r-- | contrib/test_decoding/expected/ddl.out | 21 | ||||
-rw-r--r-- | contrib/test_decoding/expected/messages.out | 79 | ||||
-rw-r--r-- | contrib/test_decoding/sql/ddl.sql | 3 | ||||
-rw-r--r-- | contrib/test_decoding/sql/messages.sql | 25 | ||||
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 18 |
6 files changed, 139 insertions, 9 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 06c95461f8a..309cb0b39a3 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -38,7 +38,7 @@ submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ - decoding_into_rel binary prepared replorigin time + decoding_into_rel binary prepared replorigin time messages regresscheck: | submake-regress submake-test_decoding temp-install $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 77719e8fed0..32cd24d6f0d 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -220,11 +220,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc (7 rows) /* - * check that disk spooling works + * check that disk spooling works (also for logical messages) */ BEGIN; CREATE TABLE tr_etoomuch (id serial primary key, data int); INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); +SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg'); + ?column? +---------------- + tx logical msg +(1 row) + DELETE FROM tr_etoomuch WHERE id < 5000; UPDATE tr_etoomuch SET data = - data WHERE id > 5000; COMMIT; @@ -233,12 +239,13 @@ SELECT count(*), min(data), max(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY substring(data, 1, 24) ORDER BY 1,2; - count | min | max --------+-------------------------------------------------+------------------------------------------------------------------------ - 1 | BEGIN | BEGIN - 1 | COMMIT | COMMIT - 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999 -(3 rows) + count | min | max +-------+-----------------------------------------------------------------------+------------------------------------------------------------------------ + 1 | BEGIN | BEGIN + 1 | COMMIT | COMMIT + 1 | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg | message: transactional: 1 prefix: test, sz: 14 content:tx logical msg + 20467 | table public.tr_etoomuch: DELETE: id[integer]:1 | table public.tr_etoomuch: UPDATE: id[integer]:9999 data[integer]:-9999 +(4 rows) -- check updates of primary keys work correctly BEGIN; diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out new file mode 100644 index 00000000000..e26f793daf8 --- /dev/null +++ b/contrib/test_decoding/expected/messages.out @@ -0,0 +1,79 @@ +-- predictability +SET synchronous_commit = on; +SET client_encoding = 'utf8'; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); + ?column? +---------- + msg1 +(1 row) + +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); + ?column? +---------- + msg2 +(1 row) + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); + ?column? +---------- + msg3 +(1 row) + +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4'); + ?column? +---------- + msg4 +(1 row) + +ROLLBACK; +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5'); + ?column? +---------- + msg5 +(1 row) + +SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6'); + ?column? +---------- + msg6 +(1 row) + +SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7'); + ?column? +---------- + msg7 +(1 row) + +COMMIT; +SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň'); + ?column? +--------------- + žluťoučký kůň +(1 row) + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------- + message: transactional: 1 prefix: test, sz: 4 content:msg1 + message: transactional: 0 prefix: test, sz: 4 content:msg2 + message: transactional: 0 prefix: test, sz: 4 content:msg4 + message: transactional: 0 prefix: test, sz: 4 content:msg6 + message: transactional: 1 prefix: test, sz: 4 content:msg5 + message: transactional: 1 prefix: test, sz: 4 content:msg7 + message: transactional: 1 prefix: test, sz: 19 content:žluťoučký kůň +(7 rows) + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + init +(1 row) + diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql index ad928ad5726..b1f7bf693a8 100644 --- a/contrib/test_decoding/sql/ddl.sql +++ b/contrib/test_decoding/sql/ddl.sql @@ -108,11 +108,12 @@ DELETE FROM tr_pkey; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); /* - * check that disk spooling works + * check that disk spooling works (also for logical messages) */ BEGIN; CREATE TABLE tr_etoomuch (id serial primary key, data int); INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i); +SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg'); DELETE FROM tr_etoomuch WHERE id < 5000; UPDATE tr_etoomuch SET data = - data WHERE id > 5000; COMMIT; diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql new file mode 100644 index 00000000000..d72191b0a9c --- /dev/null +++ b/contrib/test_decoding/sql/messages.sql @@ -0,0 +1,25 @@ +-- predictability +SET synchronous_commit = on; +SET client_encoding = 'utf8'; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); +SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); + +BEGIN; +SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); +SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4'); +ROLLBACK; + +BEGIN; +SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5'); +SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6'); +SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7'); +COMMIT; + +SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň'); + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); + +SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 4cf808f2814..3336e1e16e7 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -21,6 +21,7 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#include "replication/message.h" #include "replication/origin.h" #include "utils/builtins.h" @@ -63,6 +64,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); void _PG_init(void) @@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; + cb->message_cb = pg_decode_message; } @@ -471,3 +477,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } + +static void +pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} |