diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 124 |
1 files changed, 80 insertions, 44 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a6a7a1425be..e04d59e1e77 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -116,7 +116,9 @@ static void WalSndKill(int code, Datum arg); static bool XLogSend(char *msgbuf, bool *caughtup); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd * cmd); +static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); +static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); @@ -456,42 +458,45 @@ ProcessRepliesIfAny(void) unsigned char firstchar; int r; - r = pq_getbyte_if_available(&firstchar); - if (r < 0) - { - /* unexpected error or EOF */ - ereport(COMMERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("unexpected EOF on standby connection"))); - proc_exit(0); - } - if (r == 0) + for (;;) { - /* no data available without blocking */ - return; - } + r = pq_getbyte_if_available(&firstchar); + if (r < 0) + { + /* unexpected error or EOF */ + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + proc_exit(0); + } + if (r == 0) + { + /* no data available without blocking */ + return; + } - /* Handle the very limited subset of commands expected in this phase */ - switch (firstchar) - { - /* - * 'd' means a standby reply wrapped in a CopyData packet. - */ - case 'd': - ProcessStandbyReplyMessage(); - break; + /* Handle the very limited subset of commands expected in this phase */ + switch (firstchar) + { + /* + * 'd' means a standby reply wrapped in a CopyData packet. + */ + case 'd': + ProcessStandbyMessage(); + break; - /* - * 'X' means that the standby is closing down the socket. - */ - case 'X': - proc_exit(0); + /* + * 'X' means that the standby is closing down the socket. + */ + case 'X': + proc_exit(0); - default: - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby closing message type %d", - firstchar))); + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby closing message type %d", + firstchar))); + } } } @@ -499,11 +504,9 @@ ProcessRepliesIfAny(void) * Process a status update message received from standby. */ static void -ProcessStandbyReplyMessage(void) +ProcessStandbyMessage(void) { - StandbyReplyMessage reply; char msgtype; - TransactionId newxmin = InvalidTransactionId; resetStringInfo(&reply_message); @@ -523,22 +526,39 @@ ProcessStandbyReplyMessage(void) * one type. */ msgtype = pq_getmsgbyte(&reply_message); - if (msgtype != 'r') + + switch (msgtype) { - ereport(COMMERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("unexpected message type %c", msgtype))); - proc_exit(0); + case 'r': + ProcessStandbyReplyMessage(); + break; + + case 'h': + ProcessStandbyHSFeedbackMessage(); + break; + + default: + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected message type %c", msgtype))); + proc_exit(0); } +} + +/* + * Regular reply from standby advising of WAL positions on standby server. + */ +static void +ProcessStandbyReplyMessage(void) +{ + StandbyReplyMessage reply; pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage)); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u", + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X", reply.write.xlogid, reply.write.xrecoff, reply.flush.xlogid, reply.flush.xrecoff, - reply.apply.xlogid, reply.apply.xrecoff, - reply.xmin, - reply.epoch); + reply.apply.xlogid, reply.apply.xrecoff); /* * Update shared state for this WalSender process @@ -554,6 +574,22 @@ ProcessStandbyReplyMessage(void) walsnd->apply = reply.apply; SpinLockRelease(&walsnd->mutex); } +} + +/* + * Hot Standby feedback + */ +static void +ProcessStandbyHSFeedbackMessage(void) +{ + StandbyHSFeedbackMessage reply; + TransactionId newxmin = InvalidTransactionId; + + pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage)); + + elog(DEBUG2, "hot standby feedback xmin %u epoch %u", + reply.xmin, + reply.epoch); /* * Update the WalSender's proc xmin to allow it to be visible |