From 3fe3511d05127cc024b221040db2eeb352e7d716 Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Wed, 6 Apr 2016 10:05:41 +0100 Subject: Generic Messages for Logical Decoding API and mechanism to allow generic messages to be inserted into WAL that are intended to be read by logical decoding plugins. This commit adds an optional new callback to the logical decoding API. Messages are either text or bytea. Messages can be transactional, or not, and are identified by a prefix to allow multiple concurrent decoding plugins. (Not to be confused with Generic WAL records, which are intended to allow crash recovery of extensible objects.) Author: Petr Jelinek and Andres Freund Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs Discussion: 5685F999.6010202@2ndquadrant.com --- contrib/test_decoding/test_decoding.c | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'contrib/test_decoding/test_decoding.c') diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 4cf808f2814..3336e1e16e7 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -21,6 +21,7 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#include "replication/message.h" #include "replication/origin.h" #include "utils/builtins.h" @@ -63,6 +64,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferChange *change); static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); +static void pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + bool transactional, const char *prefix, + Size sz, const char *message); void _PG_init(void) @@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->commit_cb = pg_decode_commit_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; + cb->message_cb = pg_decode_message; } @@ -471,3 +477,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } + +static void +pg_decode_message(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional, + const char *prefix, Size sz, const char *message) +{ + OutputPluginPrepareWrite(ctx, true); + appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:", + transactional, prefix, sz); + appendBinaryStringInfo(ctx->out, message, sz); + OutputPluginWrite(ctx, true); +} -- cgit v1.2.3