aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/receivelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/receivelog.c')
-rw-r--r--src/bin/pg_basebackup/receivelog.c74
1 files changed, 64 insertions, 10 deletions
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 33095304fa7..b1e103e6348 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -35,11 +35,41 @@ static char current_walfile_name[MAXPGPATH] = "";
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
- char *partial_suffix, XLogRecPtr *stoppos);
+ char *partial_suffix, XLogRecPtr *stoppos,
+ bool mark_done);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
+static bool
+mark_file_as_archived(const char *basedir, const char *fname)
+{
+ int fd;
+ static char tmppath[MAXPGPATH];
+
+ snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
+ basedir, fname);
+
+ fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
+ progname, tmppath, strerror(errno));
+ return false;
+ }
+
+ if (fsync(fd) != 0)
+ {
+ fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+ progname, tmppath, strerror(errno));
+ return false;
+ }
+
+ close(fd);
+
+ return true;
+}
+
/*
* Open a new WAL file in the specified directory.
*
@@ -133,7 +163,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
-close_walfile(char *basedir, char *partial_suffix)
+close_walfile(char *basedir, char *partial_suffix, bool mark_done)
{
off_t currpos;
@@ -187,6 +217,19 @@ close_walfile(char *basedir, char *partial_suffix)
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
+ /*
+ * Mark file as archived if requested by the caller - pg_basebackup needs
+ * to do so as files can otherwise get archived again after promotion of a
+ * new node. This is in line with walreceiver.c always doing a
+ * XLogArchiveForceDone() after a complete segment.
+ */
+ if (currpos == XLOG_SEG_SIZE && mark_done)
+ {
+ /* writes error message if failed */
+ if (!mark_file_as_archived(basedir, current_walfile_name))
+ return false;
+ }
+
return true;
}
@@ -285,7 +328,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
}
static bool
-writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
+writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
+ char *content, bool mark_done)
{
int size = strlen(content);
char path[MAXPGPATH];
@@ -364,6 +408,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
return false;
}
+ /* Maintain archive_status, check close_walfile() for details. */
+ if (mark_done)
+ {
+ /* writes error message if failed */
+ if (!mark_file_as_archived(basedir, histfname))
+ return false;
+ }
+
return true;
}
@@ -508,7 +560,8 @@ bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
- int standby_message_timeout, char *partial_suffix)
+ int standby_message_timeout, char *partial_suffix,
+ bool mark_done)
{
char query[128];
PGresult *res;
@@ -593,7 +646,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Write the history file to disk */
writeTimeLineHistoryFile(basedir, timeline,
PQgetvalue(res, 0, 0),
- PQgetvalue(res, 0, 1));
+ PQgetvalue(res, 0, 1),
+ mark_done);
PQclear(res);
}
@@ -622,7 +676,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
- &stoppos);
+ &stoppos, mark_done);
if (res == NULL)
goto error;
@@ -787,7 +841,7 @@ static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
- XLogRecPtr *stoppos)
+ XLogRecPtr *stoppos, bool mark_done)
{
char *copybuf = NULL;
int64 last_status = -1;
@@ -814,7 +868,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending && stream_stop(blockpos, timeline, false))
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, mark_done))
{
/* Potential error message is written by close_walfile */
goto error;
@@ -913,7 +967,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*/
if (still_sending)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, mark_done))
{
/* Error message written in close_walfile() */
PQclear(res);
@@ -1081,7 +1135,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(basedir, partial_suffix))
+ if (!close_walfile(basedir, partial_suffix, mark_done))
/* Error message written in close_walfile() */
goto error;