diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 35 |
1 files changed, 18 insertions, 17 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 6b6e32dfbdf..25b13c7f55c 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -21,6 +21,7 @@ #include "access/xlog_internal.h" #include "common/logging.h" #include "libpq-fe.h" +#include "libpq/protocol.h" #include "receivelog.h" #include "streamutil.h" @@ -38,8 +39,8 @@ static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, XLogRecPtr blockpos, TimestampTz *last_status); -static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, - XLogRecPtr *blockpos); +static bool ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, + XLogRecPtr *blockpos); static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, XLogRecPtr blockpos, XLogRecPtr *stoppos); static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos); @@ -338,7 +339,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque char replybuf[1 + 8 + 8 + 8 + 8 + 1]; int len = 0; - replybuf[len] = 'r'; + replybuf[len] = PqReplMsg_StandbyStatusUpdate; len += 1; fe_sendint64(blockpos, &replybuf[len]); /* write */ len += 8; @@ -571,7 +572,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) return true; /* Initiate the replication stream at specified location */ - snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u", + snprintf(query, sizeof(query), "START_REPLICATION %s%X/%08X TIMELINE %u", slotcmd, LSN_FORMAT_ARGS(stream->startpos), stream->timeline); @@ -628,7 +629,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) } if (stream->startpos > stoppos) { - pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X", + pg_log_error("server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X", stream->timeline, LSN_FORMAT_ARGS(stoppos), newtimeline, LSN_FORMAT_ARGS(stream->startpos)); goto error; @@ -720,7 +721,7 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) } *timeline = atoi(PQgetvalue(res, 0, 0)); - if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid, + if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &startpos_xlogid, &startpos_xrecoff) != 2) { pg_log_error("could not parse next timeline's starting point \"%s\"", @@ -823,15 +824,15 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, } /* Check the message type. */ - if (copybuf[0] == 'k') + if (copybuf[0] == PqReplMsg_Keepalive) { if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos, &last_status)) goto error; } - else if (copybuf[0] == 'w') + else if (copybuf[0] == PqReplMsg_WALData) { - if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos)) + if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos)) goto error; /* @@ -1001,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, * Parse the keepalive message, enclosed in the CopyData message. We just * check if the server requested a reply, and ignore the rest. */ - pos = 1; /* skip msgtype 'k' */ + pos = 1; /* skip msgtype PqReplMsg_Keepalive */ pos += 8; /* skip walEnd */ pos += 8; /* skip sendTime */ @@ -1041,11 +1042,11 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, } /* - * Process XLogData message. + * Process WALData message. */ static bool -ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, - XLogRecPtr *blockpos) +ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, + XLogRecPtr *blockpos) { int xlogoff; int bytes_left; @@ -1054,17 +1055,17 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, /* * Once we've decided we don't want to receive any more, just ignore any - * subsequent XLogData messages. + * subsequent WALData messages. */ if (!(still_sending)) return true; /* - * Read the header of the XLogData message, enclosed in the CopyData + * Read the header of the WALData message, enclosed in the CopyData * message. We only need the WAL location field (dataStart), the rest of * the header is ignored. */ - hdr_len = 1; /* msgtype 'w' */ + hdr_len = 1; /* msgtype PqReplMsg_WALData */ hdr_len += 8; /* dataStart */ hdr_len += 8; /* walEnd */ hdr_len += 8; /* sendTime */ @@ -1162,7 +1163,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, return false; } still_sending = false; - return true; /* ignore the rest of this XLogData packet */ + return true; /* ignore the rest of this WALData packet */ } } } |