aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/proto.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r--src/backend/replication/logical/proto.c42
1 files changed, 32 insertions, 10 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index a2452525299..2d774567e08 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -145,15 +145,15 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
}
/*
- * Write PREPARE to the output stream.
+ * The core functionality for logicalrep_write_prepare.
*/
-void
-logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
- XLogRecPtr prepare_lsn)
+static void
+logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
{
uint8 flags = 0;
- pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+ pq_sendbyte(out, type);
/*
* This should only ever happen for two-phase commit transactions, in
@@ -161,6 +161,7 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
*/
Assert(txn->gid != NULL);
Assert(rbtxn_prepared(txn));
+ Assert(TransactionIdIsValid(txn->xid));
/* send the flags field */
pq_sendbyte(out, flags);
@@ -176,24 +177,36 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
}
/*
- * Read transaction PREPARE from the stream.
+ * Write PREPARE to the output stream.
*/
void
-logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
+ txn, prepare_lsn);
+}
+
+/*
+ * The core functionality for logicalrep_read_prepare.
+ */
+static void
+logicalrep_read_prepare_common(StringInfo in, char *msgtype,
+ LogicalRepPreparedTxnData *prepare_data)
{
/* read flags */
uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
- elog(ERROR, "unrecognized flags %u in prepare message", flags);
+ elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
/* read fields */
prepare_data->prepare_lsn = pq_getmsgint64(in);
if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
- elog(ERROR, "prepare_lsn is not set in prepare message");
+ elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
prepare_data->end_lsn = pq_getmsgint64(in);
if (prepare_data->end_lsn == InvalidXLogRecPtr)
- elog(ERROR, "end_lsn is not set in prepare message");
+ elog(ERROR, "end_lsn is not set in %s message", msgtype);
prepare_data->prepare_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);
@@ -202,6 +215,15 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
}
/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+ logicalrep_read_prepare_common(in, "prepare", prepare_data);
+}
+
+/*
* Write COMMIT PREPARED to the output stream.
*/
void