aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/proto.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-07-29 15:51:45 +0530
committerAmit Kapila <akapila@postgresql.org>2021-07-29 15:51:45 +0530
commit91f9861242cd7dcf28fae216b1d8b47551c9159d (patch)
tree20c15db5314a5809714068b84b172453131e81e6 /src/backend/replication/logical/proto.c
parent454ae15d10ea2d11669b69e82c98fbd03126fd69 (diff)
downloadpostgresql-91f9861242cd7dcf28fae216b1d8b47551c9159d.tar.gz
postgresql-91f9861242cd7dcf28fae216b1d8b47551c9159d.zip
Refactor to make common functions in proto.c and worker.c.
This is a non-functional change only to refactor code to extract some replication logic into static functions. This is done as preparation for the 2PC streaming patch which also shares this common logic. Author: Peter Smith Reviewed-By: Amit Kapila Discussion: https://postgr.es/m/CAHut+PuiSA8AiLcE2N5StzSKs46SQEP_vDOUD5fX2XCVtfZ7mQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r--src/backend/replication/logical/proto.c42
1 files changed, 32 insertions, 10 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index a2452525299..2d774567e08 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -145,15 +145,15 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
}
/*
- * Write PREPARE to the output stream.
+ * The core functionality for logicalrep_write_prepare.
*/
-void
-logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
- XLogRecPtr prepare_lsn)
+static void
+logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
{
uint8 flags = 0;
- pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+ pq_sendbyte(out, type);
/*
* This should only ever happen for two-phase commit transactions, in
@@ -161,6 +161,7 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
*/
Assert(txn->gid != NULL);
Assert(rbtxn_prepared(txn));
+ Assert(TransactionIdIsValid(txn->xid));
/* send the flags field */
pq_sendbyte(out, flags);
@@ -176,24 +177,36 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
}
/*
- * Read transaction PREPARE from the stream.
+ * Write PREPARE to the output stream.
*/
void
-logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
+ txn, prepare_lsn);
+}
+
+/*
+ * The core functionality for logicalrep_read_prepare.
+ */
+static void
+logicalrep_read_prepare_common(StringInfo in, char *msgtype,
+ LogicalRepPreparedTxnData *prepare_data)
{
/* read flags */
uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
- elog(ERROR, "unrecognized flags %u in prepare message", flags);
+ elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
/* read fields */
prepare_data->prepare_lsn = pq_getmsgint64(in);
if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
- elog(ERROR, "prepare_lsn is not set in prepare message");
+ elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
prepare_data->end_lsn = pq_getmsgint64(in);
if (prepare_data->end_lsn == InvalidXLogRecPtr)
- elog(ERROR, "end_lsn is not set in prepare message");
+ elog(ERROR, "end_lsn is not set in %s message", msgtype);
prepare_data->prepare_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);
@@ -202,6 +215,15 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
}
/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+ logicalrep_read_prepare_common(in, "prepare", prepare_data);
+}
+
+/*
* Write COMMIT PREPARED to the output stream.
*/
void