aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c124
1 files changed, 80 insertions, 44 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a6a7a1425be..e04d59e1e77 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -116,7 +116,9 @@ static void WalSndKill(int code, Datum arg);
static bool XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
+static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
@@ -456,42 +458,45 @@ ProcessRepliesIfAny(void)
unsigned char firstchar;
int r;
- r = pq_getbyte_if_available(&firstchar);
- if (r < 0)
- {
- /* unexpected error or EOF */
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected EOF on standby connection")));
- proc_exit(0);
- }
- if (r == 0)
+ for (;;)
{
- /* no data available without blocking */
- return;
- }
+ r = pq_getbyte_if_available(&firstchar);
+ if (r < 0)
+ {
+ /* unexpected error or EOF */
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+ if (r == 0)
+ {
+ /* no data available without blocking */
+ return;
+ }
- /* Handle the very limited subset of commands expected in this phase */
- switch (firstchar)
- {
- /*
- * 'd' means a standby reply wrapped in a CopyData packet.
- */
- case 'd':
- ProcessStandbyReplyMessage();
- break;
+ /* Handle the very limited subset of commands expected in this phase */
+ switch (firstchar)
+ {
+ /*
+ * 'd' means a standby reply wrapped in a CopyData packet.
+ */
+ case 'd':
+ ProcessStandbyMessage();
+ break;
- /*
- * 'X' means that the standby is closing down the socket.
- */
- case 'X':
- proc_exit(0);
+ /*
+ * 'X' means that the standby is closing down the socket.
+ */
+ case 'X':
+ proc_exit(0);
- default:
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby closing message type %d",
- firstchar)));
+ default:
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby closing message type %d",
+ firstchar)));
+ }
}
}
@@ -499,11 +504,9 @@ ProcessRepliesIfAny(void)
* Process a status update message received from standby.
*/
static void
-ProcessStandbyReplyMessage(void)
+ProcessStandbyMessage(void)
{
- StandbyReplyMessage reply;
char msgtype;
- TransactionId newxmin = InvalidTransactionId;
resetStringInfo(&reply_message);
@@ -523,22 +526,39 @@ ProcessStandbyReplyMessage(void)
* one type.
*/
msgtype = pq_getmsgbyte(&reply_message);
- if (msgtype != 'r')
+
+ switch (msgtype)
{
- ereport(COMMERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("unexpected message type %c", msgtype)));
- proc_exit(0);
+ case 'r':
+ ProcessStandbyReplyMessage();
+ break;
+
+ case 'h':
+ ProcessStandbyHSFeedbackMessage();
+ break;
+
+ default:
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected message type %c", msgtype)));
+ proc_exit(0);
}
+}
+
+/*
+ * Regular reply from standby advising of WAL positions on standby server.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+ StandbyReplyMessage reply;
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
reply.write.xlogid, reply.write.xrecoff,
reply.flush.xlogid, reply.flush.xrecoff,
- reply.apply.xlogid, reply.apply.xrecoff,
- reply.xmin,
- reply.epoch);
+ reply.apply.xlogid, reply.apply.xrecoff);
/*
* Update shared state for this WalSender process
@@ -554,6 +574,22 @@ ProcessStandbyReplyMessage(void)
walsnd->apply = reply.apply;
SpinLockRelease(&walsnd->mutex);
}
+}
+
+/*
+ * Hot Standby feedback
+ */
+static void
+ProcessStandbyHSFeedbackMessage(void)
+{
+ StandbyHSFeedbackMessage reply;
+ TransactionId newxmin = InvalidTransactionId;
+
+ pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
+
+ elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+ reply.xmin,
+ reply.epoch);
/*
* Update the WalSender's proc xmin to allow it to be visible