aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/proto.c28
-rw-r--r--src/backend/replication/logical/worker.c9
2 files changed, 37 insertions, 0 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index f2c85cabb52..2a1f9830e05 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -25,6 +25,7 @@
*/
#define LOGICALREP_IS_REPLICA_IDENTITY 1
+#define MESSAGE_TRANSACTIONAL (1<<0)
#define TRUNCATE_CASCADE (1<<0)
#define TRUNCATE_RESTART_SEQS (1<<1)
@@ -362,6 +363,33 @@ logicalrep_read_truncate(StringInfo in,
}
/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+ /* encode and send message flags */
+ if (transactional)
+ flags |= MESSAGE_TRANSACTIONAL;
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+ pq_sendstring(out, prefix);
+ pq_sendint32(out, sz);
+ pq_sendbytes(out, message, sz);
+}
+
+/*
* Write relation description to the output stream.
*/
void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 354fbe4b4bc..74d538b5e37 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;
+ case LOGICAL_REP_MSG_MESSAGE:
+
+ /*
+ * Logical replication does not use generic logical messages yet.
+ * Although, it could be used by other applications that use this
+ * output plugin.
+ */
+ return;
+
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
return;