diff options
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r-- | src/bin/pg_basebackup/receivelog.c | 74 |
1 files changed, 64 insertions, 10 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 33095304fa7..b1e103e6348 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -35,11 +35,41 @@ static char current_walfile_name[MAXPGPATH] = ""; static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, - char *partial_suffix, XLogRecPtr *stoppos); + char *partial_suffix, XLogRecPtr *stoppos, + bool mark_done); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); +static bool +mark_file_as_archived(const char *basedir, const char *fname) +{ + int fd; + static char tmppath[MAXPGPATH]; + + snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done", + basedir, fname); + + fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); + if (fd < 0) + { + fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"), + progname, tmppath, strerror(errno)); + return false; + } + + if (fsync(fd) != 0) + { + fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), + progname, tmppath, strerror(errno)); + return false; + } + + close(fd); + + return true; +} + /* * Open a new WAL file in the specified directory. * @@ -133,7 +163,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, * and returns false, otherwise returns true. */ static bool -close_walfile(char *basedir, char *partial_suffix) +close_walfile(char *basedir, char *partial_suffix, bool mark_done) { off_t currpos; @@ -187,6 +217,19 @@ close_walfile(char *basedir, char *partial_suffix) _("%s: not renaming \"%s%s\", segment is not complete\n"), progname, current_walfile_name, partial_suffix); + /* + * Mark file as archived if requested by the caller - pg_basebackup needs + * to do so as files can otherwise get archived again after promotion of a + * new node. This is in line with walreceiver.c always doing a + * XLogArchiveForceDone() after a complete segment. + */ + if (currpos == XLOG_SEG_SIZE && mark_done) + { + /* writes error message if failed */ + if (!mark_file_as_archived(basedir, current_walfile_name)) + return false; + } + return true; } @@ -285,7 +328,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) } static bool -writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content) +writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, + char *content, bool mark_done) { int size = strlen(content); char path[MAXPGPATH]; @@ -364,6 +408,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co return false; } + /* Maintain archive_status, check close_walfile() for details. */ + if (mark_done) + { + /* writes error message if failed */ + if (!mark_file_as_archived(basedir, histfname)) + return false; + } + return true; } @@ -508,7 +560,8 @@ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, - int standby_message_timeout, char *partial_suffix) + int standby_message_timeout, char *partial_suffix, + bool mark_done) { char query[128]; PGresult *res; @@ -593,7 +646,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Write the history file to disk */ writeTimeLineHistoryFile(basedir, timeline, PQgetvalue(res, 0, 0), - PQgetvalue(res, 0, 1)); + PQgetvalue(res, 0, 1), + mark_done); PQclear(res); } @@ -622,7 +676,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, - &stoppos); + &stoppos, mark_done); if (res == NULL) goto error; @@ -787,7 +841,7 @@ static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, - XLogRecPtr *stoppos) + XLogRecPtr *stoppos, bool mark_done) { char *copybuf = NULL; int64 last_status = -1; @@ -814,7 +868,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, */ if (still_sending && stream_stop(blockpos, timeline, false)) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, mark_done)) { /* Potential error message is written by close_walfile */ goto error; @@ -913,7 +967,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, */ if (still_sending) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, mark_done)) { /* Error message written in close_walfile() */ PQclear(res); @@ -1081,7 +1135,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Did we reach the end of a WAL segment? */ if (blockpos % XLOG_SEG_SIZE == 0) { - if (!close_walfile(basedir, partial_suffix)) + if (!close_walfile(basedir, partial_suffix, mark_done)) /* Error message written in close_walfile() */ goto error; |