diff options
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r-- | src/backend/replication/logical/proto.c | 42 |
1 files changed, 32 insertions, 10 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index a2452525299..2d774567e08 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -145,15 +145,15 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da } /* - * Write PREPARE to the output stream. + * The core functionality for logicalrep_write_prepare. */ -void -logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr prepare_lsn) +static void +logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { uint8 flags = 0; - pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE); + pq_sendbyte(out, type); /* * This should only ever happen for two-phase commit transactions, in @@ -161,6 +161,7 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, */ Assert(txn->gid != NULL); Assert(rbtxn_prepared(txn)); + Assert(TransactionIdIsValid(txn->xid)); /* send the flags field */ pq_sendbyte(out, flags); @@ -176,24 +177,36 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, } /* - * Read transaction PREPARE from the stream. + * Write PREPARE to the output stream. */ void -logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE, + txn, prepare_lsn); +} + +/* + * The core functionality for logicalrep_read_prepare. + */ +static void +logicalrep_read_prepare_common(StringInfo in, char *msgtype, + LogicalRepPreparedTxnData *prepare_data) { /* read flags */ uint8 flags = pq_getmsgbyte(in); if (flags != 0) - elog(ERROR, "unrecognized flags %u in prepare message", flags); + elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype); /* read fields */ prepare_data->prepare_lsn = pq_getmsgint64(in); if (prepare_data->prepare_lsn == InvalidXLogRecPtr) - elog(ERROR, "prepare_lsn is not set in prepare message"); + elog(ERROR, "prepare_lsn is not set in %s message", msgtype); prepare_data->end_lsn = pq_getmsgint64(in); if (prepare_data->end_lsn == InvalidXLogRecPtr) - elog(ERROR, "end_lsn is not set in prepare message"); + elog(ERROR, "end_lsn is not set in %s message", msgtype); prepare_data->prepare_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); @@ -202,6 +215,15 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) } /* + * Read transaction PREPARE from the stream. + */ +void +logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + logicalrep_read_prepare_common(in, "prepare", prepare_data); +} + +/* * Write COMMIT PREPARED to the output stream. */ void |