diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 93 |
1 files changed, 47 insertions, 46 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 98e874f4ffe..7ce81125bfe 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -30,11 +30,11 @@ /* fd and filename for currently open WAL file */ static int walfile = -1; -static char current_walfile_name[MAXPGPATH] = ""; +static char current_walfile_name[MAXPGPATH] = ""; static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, - stream_stop_callback stream_stop, int standby_message_timeout, + stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, @@ -200,7 +200,7 @@ close_walfile(char *basedir, char *partial_suffix) static int64 localGetCurrentTimestamp(void) { - int64 result; + int64 result; struct timeval tp; gettimeofday(&tp, NULL); @@ -221,7 +221,7 @@ static void localTimestampDifference(int64 start_time, int64 stop_time, long *secs, int *microsecs) { - int64 diff = stop_time - start_time; + int64 diff = stop_time - start_time; if (diff <= 0) { @@ -244,7 +244,7 @@ localTimestampDifferenceExceeds(int64 start_time, int64 stop_time, int msec) { - int64 diff = stop_time - start_time; + int64 diff = stop_time - start_time; return (diff >= msec * INT64CONST(1000)); } @@ -309,7 +309,7 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co /* * Write into a temp file name. */ - snprintf(tmppath, MAXPGPATH, "%s.tmp", path); + snprintf(tmppath, MAXPGPATH, "%s.tmp", path); unlink(tmppath); @@ -414,19 +414,19 @@ static bool sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested) { char replybuf[1 + 8 + 8 + 8 + 8 + 1]; - int len = 0; + int len = 0; replybuf[len] = 'r'; len += 1; - sendint64(blockpos, &replybuf[len]); /* write */ + sendint64(blockpos, &replybuf[len]); /* write */ len += 8; - sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ + sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */ len += 8; - sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ + sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ len += 8; - sendint64(now, &replybuf[len]); /* sendTime */ + sendint64(now, &replybuf[len]); /* sendTime */ len += 8; - replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ + replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ len += 1; if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) @@ -464,6 +464,7 @@ CheckServerVersionForStreaming(PGconn *conn) if (serverMajor < minServerMajor || serverMajor > maxServerMajor) { const char *serverver = PQparameterStatus(conn, "server_version"); + fprintf(stderr, _("%s: incompatible server version %s; streaming is only supported with server version %s\n"), progname, serverver ? serverver : "'unknown'", @@ -550,7 +551,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (timeline > atoi(PQgetvalue(res, 0, 1))) { fprintf(stderr, - _("%s: starting timeline %u is not present in the server\n"), + _("%s: starting timeline %u is not present in the server\n"), progname, timeline); PQclear(res); return false; @@ -561,8 +562,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, while (1) { /* - * Fetch the timeline history file for this timeline, if we don't - * have it already. + * Fetch the timeline history file for this timeline, if we don't have + * it already. */ if (!existsTimeLineHistoryFile(basedir, timeline)) { @@ -572,7 +573,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, { /* FIXME: we might send it ok, but get an error */ fprintf(stderr, _("%s: could not send replication command \"%s\": %s"), - progname, "TIMELINE_HISTORY", PQresultErrorMessage(res)); + progname, "TIMELINE_HISTORY", PQresultErrorMessage(res)); PQclear(res); return false; } @@ -585,7 +586,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, { fprintf(stderr, _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"), - progname, PQntuples(res), PQnfields(res), 1, 2); + progname, PQntuples(res), PQnfields(res), 1, 2); } /* Write the history file to disk */ @@ -597,8 +598,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } /* - * Before we start streaming from the requested location, check - * if the callback tells us to stop here. + * Before we start streaming from the requested location, check if the + * callback tells us to stop here. */ if (stream_stop(startpos, timeline, false)) return true; @@ -627,8 +628,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Streaming finished. * - * There are two possible reasons for that: a controlled shutdown, - * or we reached the end of the current timeline. In case of + * There are two possible reasons for that: a controlled shutdown, or + * we reached the end of the current timeline. In case of * end-of-timeline, the server sends a result set after Copy has * finished, containing information about the next timeline. Read * that, and restart streaming from the next timeline. In case of @@ -667,7 +668,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"), progname, timeline, (uint32) (stoppos >> 32), (uint32) stoppos, - newtimeline, (uint32) (startpos >> 32), (uint32) startpos); + newtimeline, (uint32) (startpos >> 32), (uint32) startpos); goto error; } @@ -676,15 +677,15 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, - _("%s: unexpected termination of replication stream: %s"), + _("%s: unexpected termination of replication stream: %s"), progname, PQresultErrorMessage(res)); goto error; } PQclear(res); /* - * Loop back to start streaming from the new timeline. - * Always start streaming at the beginning of a segment. + * Loop back to start streaming from the new timeline. Always + * start streaming at the beginning of a segment. */ timeline = newtimeline; startpos = startpos - (startpos % XLOG_SEG_SIZE); @@ -738,9 +739,9 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) /*---------- * The result set consists of one row and two columns, e.g: * - * next_tli | next_tli_startpos + * next_tli | next_tli_startpos * ----------+------------------- - * 4 | 0/9949AE0 + * 4 | 0/9949AE0 * * next_tli is the timeline ID of the next timeline after the one that * just finished streaming. next_tli_startpos is the XLOG position where @@ -760,7 +761,7 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) &startpos_xrecoff) != 2) { fprintf(stderr, - _("%s: could not parse next timeline's starting point \"%s\"\n"), + _("%s: could not parse next timeline's starting point \"%s\"\n"), progname, PQgetvalue(res, 0, 1)); return false; } @@ -840,8 +841,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (r == 0) { /* - * No data available. Wait for some to appear, but not longer - * than the specified timeout, so that we can ping the server. + * No data available. Wait for some to appear, but not longer than + * the specified timeout, so that we can ping the server. */ fd_set input_mask; struct timeval timeout; @@ -875,8 +876,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, { /* * Got a timeout or signal. Continue the loop and either - * deliver a status packet to the server or just go back - * into blocking. + * deliver a status packet to the server or just go back into + * blocking. */ continue; } @@ -940,17 +941,17 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Check the message type. */ if (copybuf[0] == 'k') { - int pos; - bool replyRequested; + int pos; + bool replyRequested; /* * Parse the keepalive message, enclosed in the CopyData message. * We just check if the server requested a reply, and ignore the * rest. */ - pos = 1; /* skip msgtype 'k' */ - pos += 8; /* skip walEnd */ - pos += 8; /* skip sendTime */ + pos = 1; /* skip msgtype 'k' */ + pos += 8; /* skip walEnd */ + pos += 8; /* skip sendTime */ if (r < pos + 1) { @@ -983,10 +984,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, * CopyData message. We only need the WAL location field * (dataStart), the rest of the header is ignored. */ - hdr_len = 1; /* msgtype 'w' */ - hdr_len += 8; /* dataStart */ - hdr_len += 8; /* walEnd */ - hdr_len += 8; /* sendTime */ + hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ if (r < hdr_len + 1) { fprintf(stderr, _("%s: streaming header too small: %d\n"), @@ -999,8 +1000,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, xlogoff = blockpos % XLOG_SEG_SIZE; /* - * Verify that the initial location in the stream matches where - * we think we are. + * Verify that the initial location in the stream matches where we + * think we are. */ if (walfile == -1) { @@ -1020,8 +1021,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (lseek(walfile, 0, SEEK_CUR) != xlogoff) { fprintf(stderr, - _("%s: got WAL data offset %08x, expected %08x\n"), - progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + _("%s: got WAL data offset %08x, expected %08x\n"), + progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); goto error; } } @@ -1087,7 +1088,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, goto error; } still_sending = false; - break; /* ignore the rest of this XLogData packet */ + break; /* ignore the rest of this XLogData packet */ } } } |