aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-04-06 08:40:47 +0530
committerAmit Kapila <akapila@postgresql.org>2021-04-06 08:40:47 +0530
commitac4645c0157fc5fcef0af8ff571512aa284a2cec (patch)
treea414a4b21e4a79ac04cd67b8ed4ac907d9856f76 /src/backend/replication/logical
parent531737ddad214cb8a675953208e2f3a6b1be122b (diff)
downloadpostgresql-ac4645c0157fc5fcef0af8ff571512aa284a2cec.tar.gz
postgresql-ac4645c0157fc5fcef0af8ff571512aa284a2cec.zip
Allow pgoutput to send logical decoding messages.
The output plugin accepts a new parameter (messages) that controls if logical decoding messages are written into the replication stream. It is useful for those clients that use pgoutput as an output plugin and needs to process messages that were written by pg_logical_emit_message(). Although logical streaming replication protocol supports logical decoding messages now, logical replication does not use this feature yet. Author: David Pirotte, Euler Taveira Reviewed-by: Euler Taveira, Andres Freund, Ashutosh Bapat, Amit Kapila Discussion: https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/proto.c28
-rw-r--r--src/backend/replication/logical/worker.c9
2 files changed, 37 insertions, 0 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f2c85cabb52..2a1f9830e05 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -25,6 +25,7 @@
*/
#define LOGICALREP_IS_REPLICA_IDENTITY 1
+#define MESSAGE_TRANSACTIONAL (1<<0)
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
@@ -362,6 +363,33 @@ logicalrep_read_truncate(StringInfo in,
}
/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+ /* encode and send message flags */
+ if (transactional)
+ flags |= MESSAGE_TRANSACTIONAL;
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+ pq_sendstring(out, prefix);
+ pq_sendint32(out, sz);
+ pq_sendbytes(out, message, sz);
+}
+
+/*
* Write relation description to the output stream.
*/
void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 354fbe4b4bc..74d538b5e37 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;
+ case LOGICAL_REP_MSG_MESSAGE:
+
+ /*
+ * Logical replication does not use generic logical messages yet.
+ * Although, it could be used by other applications that use this
+ * output plugin.
+ */
+ return;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
return;