aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/proto.c28
-rw-r--r--src/backend/replication/logical/worker.c9
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c47
3 files changed, 84 insertions, 0 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f2c85cabb52..2a1f9830e05 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -25,6 +25,7 @@
*/
#define LOGICALREP_IS_REPLICA_IDENTITY 1
+#define MESSAGE_TRANSACTIONAL (1<<0)
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
@@ -362,6 +363,33 @@ logicalrep_read_truncate(StringInfo in,
}
/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+ /* encode and send message flags */
+ if (transactional)
+ flags |= MESSAGE_TRANSACTIONAL;
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+ pq_sendstring(out, prefix);
+ pq_sendint32(out, sz);
+ pq_sendbytes(out, message, sz);
+}
+
+/*
* Write relation description to the output stream.
*/
void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 354fbe4b4bc..74d538b5e37 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;
+ case LOGICAL_REP_MSG_MESSAGE:
+
+ /*
+ * Logical replication does not use generic logical messages yet.
+ * Although, it could be used by other applications that use this
+ * output plugin.
+ */
+ return;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
return;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6146c5acdb3..f68348dcf45 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, int nrelations, Relation relations[],
ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+ bool transactional, const char *prefix,
+ Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pgoutput_begin_txn;
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
+ cb->message_cb = pgoutput_message;
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_abort_cb = pgoutput_stream_abort;
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
+ cb->stream_message_cb = pgoutput_message;
cb->stream_truncate_cb = pgoutput_truncate;
}
@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data)
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
+ bool messages_option_given = false;
bool streaming_given = false;
data->binary = false;
data->streaming = false;
+ data->messages = false;
foreach(lc, options)
{
@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data)
data->binary = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "messages") == 0)
+ {
+ if (messages_option_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ messages_option_given = true;
+
+ data->messages = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
}
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ TransactionId xid = InvalidTransactionId;
+
+ if (!data->messages)
+ return;
+
+ /*
+ * Remember the xid for the message in streaming mode. See
+ * pgoutput_change.
+ */
+ if (in_streaming)
+ xid = txn->xid;
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_message(ctx->out,
+ xid,
+ message_lsn,
+ transactional,
+ prefix,
+ sz,
+ message);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* Currently we always forward.
*/