aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/basebackup.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/basebackup.c')
-rw-r--r-- src/backend/replication/basebackup.c 692
1 files changed, 197 insertions, 495 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index b7359f43903..38c82c46196 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -17,13 +17,9 @@
#include <time.h>
#include "access/xlog_internal.h" /* for pg_start/stop_backup */
-#include "catalog/pg_type.h"
#include "common/file_perm.h"
#include "commands/defrem.h"
-#include "commands/progress.h"
#include "lib/stringinfo.h"
-#include "libpq/libpq.h"
-#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "pgstat.h"
@@ -31,6 +27,7 @@
#include "port.h"
#include "postmaster/syslogger.h"
#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
#include "replication/backup_manifest.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
@@ -46,6 +43,16 @@
#include "utils/resowner.h"
#include "utils/timestamp.h"
+/*
+ * How much data do we want to send in one CopyData message? Note that
+ * this may also result in reading the underlying files in chunks of this
+ * size.
+ *
+ * NB: The buffer size is required to be a multiple of the system block
+ * size, so use that value instead if it's bigger than our preference.
+ */
+#define SINK_BUFFER_LENGTH Max(32768, BLCKSZ)
+
typedef struct
{
const char *label;
@@ -59,27 +66,25 @@ typedef struct
pg_checksum_type manifest_checksum_type;
} basebackup_options;
-static int64 sendTablespace(char *path, char *oid, bool sizeonly,
+static int64 sendTablespace(bbsink *sink, char *path, char *oid, bool sizeonly,
struct backup_manifest_info *manifest);
-static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
+static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
List *tablespaces, bool sendtblspclinks,
backup_manifest_info *manifest, const char *spcoid);
-static bool sendFile(const char *readfilename, const char *tarfilename,
+static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid,
backup_manifest_info *manifest, const char *spcoid);
-static void sendFileWithContent(const char *filename, const char *content,
+static void sendFileWithContent(bbsink *sink, const char *filename,
+ const char *content,
backup_manifest_info *manifest);
-static int64 _tarWriteHeader(const char *filename, const char *linktarget,
- struct stat *statbuf, bool sizeonly);
+static int64 _tarWriteHeader(bbsink *sink, const char *filename,
+ const char *linktarget, struct stat *statbuf,
+ bool sizeonly);
+static void _tarWritePadding(bbsink *sink, int len);
static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf);
-static void send_int8_string(StringInfoData *buf, int64 intval);
-static void SendBackupHeader(List *tablespaces);
-static void perform_base_backup(basebackup_options *opt);
+static void perform_base_backup(basebackup_options *opt, bbsink *sink);
static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const ListCell *a, const ListCell *b);
-static void throttle(size_t increment);
-static void update_basebackup_progress(int64 delta);
static bool is_checksummed_file(const char *fullpath, const char *filename);
static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
const char *filename, bool partial_read_ok);
@@ -90,31 +95,6 @@ static bool backup_started_in_recovery = false;
/* Relative path of temporary statistics directory */
static char *statrelpath = NULL;
-/*
- * Size of each block sent into the tar stream for larger files.
- */
-#define TAR_SEND_SIZE 32768
-
-/*
- * How frequently to throttle, as a fraction of the specified rate-second.
- */
-#define THROTTLING_FREQUENCY 8
-
-/* The actual number of bytes, transfer of which may cause sleep. */
-static uint64 throttling_sample;
-
-/* Amount of data already transferred but not yet throttled. */
-static int64 throttling_counter;
-
-/* The minimum time required to transfer throttling_sample bytes. */
-static TimeOffset elapsed_min_unit;
-
-/* The last check of the transfer rate. */
-static TimestampTz throttled_last;
-
-/* The starting XLOG position of the base backup. */
-static XLogRecPtr startptr;
-
/* Total number of checksum failures during base backup. */
static long long int total_checksum_failures;
@@ -122,15 +102,6 @@ static long long int total_checksum_failures;
static bool noverify_checksums = false;
/*
- * Total amount of backup data that will be streamed.
- * -1 means that the size is not estimated.
- */
-static int64 backup_total = 0;
-
-/* Amount of backup data already streamed */
-static int64 backup_streamed = 0;
-
-/*
* Definition of one element part of an exclusion list, used for paths part
* of checksum validation or base backups. "name" is the name of the file
* or path to check for exclusion. If "match_prefix" is true, any items
@@ -253,32 +224,22 @@ static const struct exclude_list_item noChecksumFiles[] = {
* clobbered by longjmp" from stupider versions of gcc.
*/
static void
-perform_base_backup(basebackup_options *opt)
+perform_base_backup(basebackup_options *opt, bbsink *sink)
{
- TimeLineID starttli;
+ bbsink_state state;
XLogRecPtr endptr;
TimeLineID endtli;
StringInfo labelfile;
StringInfo tblspc_map_file;
backup_manifest_info manifest;
int datadirpathlen;
- List *tablespaces = NIL;
- backup_total = 0;
- backup_streamed = 0;
- pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
-
- /*
- * If the estimation of the total backup size is disabled, make the
- * backup_total column in the view return NULL by setting the parameter to
- * -1.
- */
- if (!opt->progress)
- {
- backup_total = -1;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL,
- backup_total);
- }
+ /* Initial backup state, insofar as we know it now. */
+ state.tablespaces = NIL;
+ state.tablespace_num = 0;
+ state.bytes_done = 0;
+ state.bytes_total = 0;
+ state.bytes_total_is_valid = false;
/* we're going to use a BufFile, so we need a ResourceOwner */
Assert(CurrentResourceOwner == NULL);
@@ -295,11 +256,11 @@ perform_base_backup(basebackup_options *opt)
total_checksum_failures = 0;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
- startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
- labelfile, &tablespaces,
- tblspc_map_file);
+ basebackup_progress_wait_checkpoint();
+ state.startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint,
+ &state.starttli,
+ labelfile, &state.tablespaces,
+ tblspc_map_file);
/*
* Once do_pg_start_backup has been called, ensure that any failure causes
@@ -312,7 +273,6 @@ perform_base_backup(basebackup_options *opt)
{
ListCell *lc;
tablespaceinfo *ti;
- int tblspc_streamed = 0;
/*
* Calculate the relative path of temporary statistics directory in
@@ -329,7 +289,7 @@ perform_base_backup(basebackup_options *opt)
/* Add a node for the base directory at the end */
ti = palloc0(sizeof(tablespaceinfo));
ti->size = -1;
- tablespaces = lappend(tablespaces, ti);
+ state.tablespaces = lappend(state.tablespaces, ti);
/*
* Calculate the total backup size by summing up the size of each
@@ -337,100 +297,53 @@ perform_base_backup(basebackup_options *opt)
*/
if (opt->progress)
{
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
+ basebackup_progress_estimate_backup_size();
- foreach(lc, tablespaces)
+ foreach(lc, state.tablespaces)
{
tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc);
if (tmp->path == NULL)
- tmp->size = sendDir(".", 1, true, tablespaces, true, NULL,
- NULL);
+ tmp->size = sendDir(sink, ".", 1, true, state.tablespaces,
+ true, NULL, NULL);
else
- tmp->size = sendTablespace(tmp->path, tmp->oid, true,
+ tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true,
NULL);
- backup_total += tmp->size;
+ state.bytes_total += tmp->size;
}
+ state.bytes_total_is_valid = true;
}
- /* Report that we are now streaming database files as a base backup */
- {
- const int index[] = {
- PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_BACKUP_TOTAL,
- PROGRESS_BASEBACKUP_TBLSPC_TOTAL
- };
- const int64 val[] = {
- PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP,
- backup_total, list_length(tablespaces)
- };
-
- pgstat_progress_update_multi_param(3, index, val);
- }
-
- /* Send the starting position of the backup */
- SendXlogRecPtrResult(startptr, starttli);
-
- /* Send tablespace header */
- SendBackupHeader(tablespaces);
-
- /* Setup and activate network throttling, if client requested it */
- if (opt->maxrate > 0)
- {
- throttling_sample =
- (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
-
- /*
- * The minimum amount of time for throttling_sample bytes to be
- * transferred.
- */
- elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
-
- /* Enable throttling. */
- throttling_counter = 0;
-
- /* The 'real data' starts now (header was ignored). */
- throttled_last = GetCurrentTimestamp();
- }
- else
- {
- /* Disable throttling. */
- throttling_counter = -1;
- }
+ /* notify basebackup sink about start of backup */
+ bbsink_begin_backup(sink, &state, SINK_BUFFER_LENGTH);
/* Send off our tablespaces one by one */
- foreach(lc, tablespaces)
+ foreach(lc, state.tablespaces)
{
tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
- StringInfoData buf;
-
- /* Send CopyOutResponse message */
- pq_beginmessage(&buf, 'H');
- pq_sendbyte(&buf, 0); /* overall format */
- pq_sendint16(&buf, 0); /* natts */
- pq_endmessage(&buf);
if (ti->path == NULL)
{
struct stat statbuf;
bool sendtblspclinks = true;
+ bbsink_begin_archive(sink, "base.tar");
+
/* In the main tar, include the backup_label first... */
- sendFileWithContent(BACKUP_LABEL_FILE, labelfile->data,
+ sendFileWithContent(sink, BACKUP_LABEL_FILE, labelfile->data,
&manifest);
/* Then the tablespace_map file, if required... */
if (opt->sendtblspcmapfile)
{
- sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data,
+ sendFileWithContent(sink, TABLESPACE_MAP, tblspc_map_file->data,
&manifest);
sendtblspclinks = false;
}
/* Then the bulk of the files... */
- sendDir(".", 1, false, tablespaces, sendtblspclinks,
- &manifest, NULL);
+ sendDir(sink, ".", 1, false, state.tablespaces,
+ sendtblspclinks, &manifest, NULL);
/* ... and pg_control after everything else. */
if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -438,32 +351,33 @@ perform_base_backup(basebackup_options *opt)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m",
XLOG_CONTROL_FILE)));
- sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
+ sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
false, InvalidOid, &manifest, NULL);
}
else
- sendTablespace(ti->path, ti->oid, false, &manifest);
+ {
+ char *archive_name = psprintf("%s.tar", ti->oid);
+
+ bbsink_begin_archive(sink, archive_name);
+
+ sendTablespace(sink, ti->path, ti->oid, false, &manifest);
+ }
/*
* If we're including WAL, and this is the main data directory we
- * don't terminate the tar stream here. Instead, we will append
- * the xlog files below and terminate it then. This is safe since
- * the main data directory is always sent *last*.
+ * don't treat this as the end of the tablespace. Instead, we will
+ * include the xlog files below and stop afterwards. This is safe
+ * since the main data directory is always sent *last*.
*/
if (opt->includewal && ti->path == NULL)
{
- Assert(lnext(tablespaces, lc) == NULL);
+ Assert(lnext(state.tablespaces, lc) == NULL);
}
else
- pq_putemptymessage('c'); /* CopyDone */
-
- tblspc_streamed++;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
- tblspc_streamed);
+ bbsink_end_archive(sink);
}
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE);
+ basebackup_progress_wait_wal_archive(&state);
endptr = do_pg_stop_backup(labelfile->data, !opt->nowait, &endtli);
}
PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
@@ -489,8 +403,7 @@ perform_base_backup(basebackup_options *opt)
ListCell *lc;
TimeLineID tli;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
+ basebackup_progress_transfer_wal();
/*
* I'd rather not worry about timelines here, so scan pg_wal and
@@ -501,8 +414,8 @@ perform_base_backup(basebackup_options *opt)
* shouldn't be such files, but if there are, there's little harm in
* including them.
*/
- XLByteToSeg(startptr, startsegno, wal_segment_size);
- XLogFileName(firstoff, starttli, startsegno, wal_segment_size);
+ XLByteToSeg(state.startptr, startsegno, wal_segment_size);
+ XLogFileName(firstoff, state.starttli, startsegno, wal_segment_size);
XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
XLogFileName(lastoff, endtli, endsegno, wal_segment_size);
@@ -528,7 +441,7 @@ perform_base_backup(basebackup_options *opt)
* Before we go any further, check that none of the WAL segments we
* need were removed.
*/
- CheckXLogRemoved(startsegno, starttli);
+ CheckXLogRemoved(startsegno, state.starttli);
/*
* Sort the WAL filenames. We want to send the files in order from
@@ -555,7 +468,7 @@ perform_base_backup(basebackup_options *opt)
{
char startfname[MAXFNAMELEN];
- XLogFileName(startfname, starttli, startsegno,
+ XLogFileName(startfname, state.starttli, startsegno,
wal_segment_size);
ereport(ERROR,
(errmsg("could not find WAL file \"%s\"", startfname)));
@@ -590,7 +503,6 @@ perform_base_backup(basebackup_options *opt)
{
char *walFileName = (char *) lfirst(lc);
int fd;
- char buf[TAR_SEND_SIZE];
size_t cnt;
pgoff_t len = 0;
@@ -629,22 +541,17 @@ perform_base_backup(basebackup_options *opt)
}
/* send the WAL file itself */
- _tarWriteHeader(pathbuf, NULL, &statbuf, false);
+ _tarWriteHeader(sink, pathbuf, NULL, &statbuf, false);
- while ((cnt = basebackup_read_file(fd, buf,
- Min(sizeof(buf),
+ while ((cnt = basebackup_read_file(fd, sink->bbs_buffer,
+ Min(sink->bbs_buffer_length,
wal_segment_size - len),
len, pathbuf, true)) > 0)
{
CheckXLogRemoved(segno, tli);
- /* Send the chunk as a CopyData message */
- if (pq_putmessage('d', buf, cnt))
- ereport(ERROR,
- (errmsg("base backup could not send data, aborting backup")));
- update_basebackup_progress(cnt);
+ bbsink_archive_contents(sink, cnt);
len += cnt;
- throttle(cnt);
if (len == wal_segment_size)
break;
@@ -673,7 +580,7 @@ perform_base_backup(basebackup_options *opt)
* complete segment.
*/
StatusFilePath(pathbuf, walFileName, ".done");
- sendFileWithContent(pathbuf, "", &manifest);
+ sendFileWithContent(sink, pathbuf, "", &manifest);
}
/*
@@ -696,23 +603,23 @@ perform_base_backup(basebackup_options *opt)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", pathbuf)));
- sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid,
+ sendFile(sink, pathbuf, pathbuf, &statbuf, false, InvalidOid,
&manifest, NULL);
/* unconditionally mark file as archived */
StatusFilePath(pathbuf, fname, ".done");
- sendFileWithContent(pathbuf, "", &manifest);
+ sendFileWithContent(sink, pathbuf, "", &manifest);
}
- /* Send CopyDone message for the last tar file */
- pq_putemptymessage('c');
+ bbsink_end_archive(sink);
}
- AddWALInfoToBackupManifest(&manifest, startptr, starttli, endptr, endtli);
+ AddWALInfoToBackupManifest(&manifest, state.startptr, state.starttli,
+ endptr, endtli);
- SendBackupManifest(&manifest);
+ SendBackupManifest(&manifest, sink);
- SendXlogRecPtrResult(endptr, endtli);
+ bbsink_end_backup(sink, endptr, endtli);
if (total_checksum_failures)
{
@@ -738,7 +645,7 @@ perform_base_backup(basebackup_options *opt)
/* clean up the resource owner we created */
WalSndResourceCleanup(true);
- pgstat_progress_end_command();
+ basebackup_progress_done();
}
/*
@@ -943,6 +850,7 @@ void
SendBaseBackup(BaseBackupCmd *cmd)
{
basebackup_options opt;
+ bbsink *sink;
parse_basebackup_options(cmd->options, &opt);
@@ -957,158 +865,40 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg);
}
- perform_base_backup(&opt);
-}
-
-static void
-send_int8_string(StringInfoData *buf, int64 intval)
-{
- char is[32];
-
- sprintf(is, INT64_FORMAT, intval);
- pq_sendint32(buf, strlen(is));
- pq_sendbytes(buf, is, strlen(is));
-}
-
-static void
-SendBackupHeader(List *tablespaces)
-{
- StringInfoData buf;
- ListCell *lc;
-
- /* Construct and send the directory information */
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 3); /* 3 fields */
-
- /* First field - spcoid */
- pq_sendstring(&buf, "spcoid");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, OIDOID); /* type oid */
- pq_sendint16(&buf, 4); /* typlen */
- pq_sendint32(&buf, 0); /* typmod */
- pq_sendint16(&buf, 0); /* format code */
-
- /* Second field - spclocation */
- pq_sendstring(&buf, "spclocation");
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_sendint32(&buf, TEXTOID);
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
-
- /* Third field - size */
- pq_sendstring(&buf, "size");
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_sendint32(&buf, INT8OID);
- pq_sendint16(&buf, 8);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_endmessage(&buf);
-
- foreach(lc, tablespaces)
- {
- tablespaceinfo *ti = lfirst(lc);
-
- /* Send one datarow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint16(&buf, 3); /* number of columns */
- if (ti->path == NULL)
- {
- pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
- pq_sendint32(&buf, -1);
- }
- else
- {
- Size len;
-
- len = strlen(ti->oid);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, ti->oid, len);
-
- len = strlen(ti->path);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, ti->path, len);
- }
- if (ti->size >= 0)
- send_int8_string(&buf, ti->size / 1024);
- else
- pq_sendint32(&buf, -1); /* NULL */
+ /* Create a basic basebackup sink. */
+ sink = bbsink_copytblspc_new();
- pq_endmessage(&buf);
- }
+ /* Set up network throttling, if client requested it */
+ if (opt.maxrate > 0)
+ sink = bbsink_throttle_new(sink, opt.maxrate);
- /* Send a CommandComplete message */
- pq_puttextmessage('C', "SELECT");
-}
-
-/*
- * Send a single resultset containing just a single
- * XLogRecPtr record (in text format)
- */
-static void
-SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
-{
- StringInfoData buf;
- char str[MAXFNAMELEN];
- Size len;
-
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 2); /* 2 fields */
-
- /* Field headers */
- pq_sendstring(&buf, "recptr");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, TEXTOID); /* type oid */
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
-
- pq_sendstring(&buf, "tli");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
+ /* Set up progress reporting. */
+ sink = bbsink_progress_new(sink, opt.progress);
/*
- * int8 may seem like a surprising data type for this, but in theory int4
- * would not be wide enough for this, as TimeLineID is unsigned.
+ * Perform the base backup, but make sure we clean up the bbsink even if
+ * an error occurs.
*/
- pq_sendint32(&buf, INT8OID); /* type oid */
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_endmessage(&buf);
-
- /* Data row */
- pq_beginmessage(&buf, 'D');
- pq_sendint16(&buf, 2); /* number of columns */
-
- len = snprintf(str, sizeof(str),
- "%X/%X", LSN_FORMAT_ARGS(ptr));
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, str, len);
-
- len = snprintf(str, sizeof(str), "%u", tli);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, str, len);
-
- pq_endmessage(&buf);
-
- /* Send a CommandComplete message */
- pq_puttextmessage('C', "SELECT");
+ PG_TRY();
+ {
+ perform_base_backup(&opt, sink);
+ }
+ PG_FINALLY();
+ {
+ bbsink_cleanup(sink);
+ }
+ PG_END_TRY();
}
/*
* Inject a file with given name and content in the output tar stream.
*/
static void
-sendFileWithContent(const char *filename, const char *content,
+sendFileWithContent(bbsink *sink, const char *filename, const char *content,
backup_manifest_info *manifest)
{
struct stat statbuf;
- int pad,
+ int bytes_done = 0,
len;
pg_checksum_context checksum_ctx;
@@ -1134,25 +924,23 @@ sendFileWithContent(const char *filename, const char *content,
statbuf.st_mode = pg_file_create_mode;
statbuf.st_size = len;
- _tarWriteHeader(filename, NULL, &statbuf, false);
- /* Send the contents as a CopyData message */
- pq_putmessage('d', content, len);
- update_basebackup_progress(len);
+ _tarWriteHeader(sink, filename, NULL, &statbuf, false);
- /* Pad to a multiple of the tar block size. */
- pad = tarPaddingBytesRequired(len);
- if (pad > 0)
+ if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0)
+ elog(ERROR, "could not update checksum of file \"%s\"",
+ filename);
+
+ while (bytes_done < len)
{
- char buf[TAR_BLOCK_SIZE];
+ size_t remaining = len - bytes_done;
+ size_t nbytes = Min(sink->bbs_buffer_length, remaining);
- MemSet(buf, 0, pad);
- pq_putmessage('d', buf, pad);
- update_basebackup_progress(pad);
+ memcpy(sink->bbs_buffer, content + bytes_done, nbytes); /* skip chunks already sent */
+ bbsink_archive_contents(sink, nbytes);
+ bytes_done += nbytes;
}
- if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0)
- elog(ERROR, "could not update checksum of file \"%s\"",
- filename);
+ _tarWritePadding(sink, len);
AddFileToBackupManifest(manifest, NULL, filename, len,
(pg_time_t) statbuf.st_mtime, &checksum_ctx);
@@ -1166,7 +954,7 @@ sendFileWithContent(const char *filename, const char *content,
* Only used to send auxiliary tablespaces, not PGDATA.
*/
static int64
-sendTablespace(char *path, char *spcoid, bool sizeonly,
+sendTablespace(bbsink *sink, char *path, char *spcoid, bool sizeonly,
backup_manifest_info *manifest)
{
int64 size;
@@ -1196,11 +984,11 @@ sendTablespace(char *path, char *spcoid, bool sizeonly,
return 0;
}
- size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
+ size = _tarWriteHeader(sink, TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
sizeonly);
/* Send all the files in the tablespace version directory */
- size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, manifest,
+ size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest,
spcoid);
return size;
@@ -1219,8 +1007,8 @@ sendTablespace(char *path, char *spcoid, bool sizeonly,
* as it will be sent separately in the tablespace_map file.
*/
static int64
-sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
- bool sendtblspclinks, backup_manifest_info *manifest,
+sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest,
const char *spcoid)
{
DIR *dir;
@@ -1380,8 +1168,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
excludeFound = true;
break;
}
@@ -1398,8 +1186,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
continue;
}
@@ -1412,15 +1200,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
/* If pg_wal is a symlink, write it as a directory anyway */
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
/*
* Also send archive_status directory (by hackishly reusing
* statbuf from above ...).
*/
- size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL,
+ &statbuf, sizeonly);
continue; /* don't recurse into pg_wal */
}
@@ -1451,7 +1239,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
pathbuf)));
linkpath[rllen] = '\0';
- size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, linkpath,
&statbuf, sizeonly);
#else
@@ -1475,7 +1263,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
* Store a directory entry in the tar file so we can get the
* permissions right.
*/
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, &statbuf,
sizeonly);
/*
@@ -1507,7 +1295,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces,
+ size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces,
sendtblspclinks, manifest, spcoid);
}
else if (S_ISREG(statbuf.st_mode))
@@ -1515,7 +1303,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
bool sent = false;
if (!sizeonly)
- sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
+ sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf,
true, isDbDir ? atooid(lastDir + 1) : InvalidOid,
manifest, spcoid);
@@ -1592,21 +1380,19 @@ is_checksummed_file(const char *fullpath, const char *filename)
* and the file did not exist.
*/
static bool
-sendFile(const char *readfilename, const char *tarfilename,
+sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid,
backup_manifest_info *manifest, const char *spcoid)
{
int fd;
BlockNumber blkno = 0;
bool block_retry = false;
- char buf[TAR_SEND_SIZE];
uint16 checksum;
int checksum_failures = 0;
off_t cnt;
int i;
pgoff_t len = 0;
char *page;
- size_t pad;
PageHeader phdr;
int segmentno = 0;
char *segmentpath;
@@ -1627,7 +1413,7 @@ sendFile(const char *readfilename, const char *tarfilename,
errmsg("could not open file \"%s\": %m", readfilename)));
}
- _tarWriteHeader(tarfilename, NULL, statbuf, false);
+ _tarWriteHeader(sink, tarfilename, NULL, statbuf, false);
if (!noverify_checksums && DataChecksumsEnabled())
{
@@ -1668,9 +1454,11 @@ sendFile(const char *readfilename, const char *tarfilename,
*/
while (len < statbuf->st_size)
{
+ size_t remaining = statbuf->st_size - len;
+
/* Try to read some more data. */
- cnt = basebackup_read_file(fd, buf,
- Min(sizeof(buf), statbuf->st_size - len),
+ cnt = basebackup_read_file(fd, sink->bbs_buffer,
+ Min(sink->bbs_buffer_length, remaining),
len, readfilename, true);
/*
@@ -1687,7 +1475,7 @@ sendFile(const char *readfilename, const char *tarfilename,
- * TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple of
+ * the sink's buffer length is divisible by BLCKSZ and we read a multiple of
* BLCKSZ bytes.
*/
- Assert(TAR_SEND_SIZE % BLCKSZ == 0);
+ Assert((sink->bbs_buffer_length % BLCKSZ) == 0);
if (verify_checksum && (cnt % BLCKSZ != 0))
{
@@ -1703,7 +1491,7 @@ sendFile(const char *readfilename, const char *tarfilename,
{
for (i = 0; i < cnt / BLCKSZ; i++)
{
- page = buf + BLCKSZ * i;
+ page = sink->bbs_buffer + BLCKSZ * i;
/*
* Only check pages which have not been modified since the
@@ -1713,7 +1501,7 @@ sendFile(const char *readfilename, const char *tarfilename,
* this case. We also skip completely new pages, since they
* don't have a checksum yet.
*/
- if (!PageIsNew(page) && PageGetLSN(page) < startptr)
+ if (!PageIsNew(page) && PageGetLSN(page) < sink->bbs_state->startptr)
{
checksum = pg_checksum_page((char *) page, blkno + segmentno * RELSEG_SIZE);
phdr = (PageHeader) page;
@@ -1735,7 +1523,8 @@ sendFile(const char *readfilename, const char *tarfilename,
/* Reread the failed block */
reread_cnt =
- basebackup_read_file(fd, buf + BLCKSZ * i,
+ basebackup_read_file(fd,
+ sink->bbs_buffer + BLCKSZ * i,
BLCKSZ, len + BLCKSZ * i,
readfilename,
false);
@@ -1782,34 +1571,29 @@ sendFile(const char *readfilename, const char *tarfilename,
}
}
- /* Send the chunk as a CopyData message */
- if (pq_putmessage('d', buf, cnt))
- ereport(ERROR,
- (errmsg("base backup could not send data, aborting backup")));
- update_basebackup_progress(cnt);
+ bbsink_archive_contents(sink, cnt);
/* Also feed it to the checksum machinery. */
- if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0)
+ if (pg_checksum_update(&checksum_ctx,
+ (uint8 *) sink->bbs_buffer, cnt) < 0)
elog(ERROR, "could not update checksum of base backup");
len += cnt;
- throttle(cnt);
}
/* If the file was truncated while we were sending it, pad it with zeros */
- if (len < statbuf->st_size)
+ while (len < statbuf->st_size)
{
- MemSet(buf, 0, sizeof(buf));
- while (len < statbuf->st_size)
- {
- cnt = Min(sizeof(buf), statbuf->st_size - len);
- pq_putmessage('d', buf, cnt);
- if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0)
- elog(ERROR, "could not update checksum of base backup");
- update_basebackup_progress(cnt);
- len += cnt;
- throttle(cnt);
- }
+ size_t remaining = statbuf->st_size - len;
+ size_t nbytes = Min(sink->bbs_buffer_length, remaining);
+
+ MemSet(sink->bbs_buffer, 0, nbytes);
+ if (pg_checksum_update(&checksum_ctx,
+ (uint8 *) sink->bbs_buffer,
+ nbytes) < 0)
+ elog(ERROR, "could not update checksum of base backup");
+ bbsink_archive_contents(sink, nbytes);
+ len += nbytes;
}
/*
@@ -1817,13 +1601,7 @@ sendFile(const char *readfilename, const char *tarfilename,
* of data is probably not worth throttling, and is not checksummed
* because it's not actually part of the file.)
*/
- pad = tarPaddingBytesRequired(len);
- if (pad > 0)
- {
- MemSet(buf, 0, pad);
- pq_putmessage('d', buf, pad);
- update_basebackup_progress(pad);
- }
+ _tarWritePadding(sink, len);
CloseTransientFile(fd);
@@ -1846,18 +1624,28 @@ sendFile(const char *readfilename, const char *tarfilename,
return true;
}
-
static int64
-_tarWriteHeader(const char *filename, const char *linktarget,
+_tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget,
struct stat *statbuf, bool sizeonly)
{
- char h[TAR_BLOCK_SIZE];
enum tarError rc;
if (!sizeonly)
{
- rc = tarCreateHeader(h, filename, linktarget, statbuf->st_size,
- statbuf->st_mode, statbuf->st_uid, statbuf->st_gid,
+ /*
+ * As of this writing, the smallest supported block size is 1kB, which
+ * is twice TAR_BLOCK_SIZE. Since the buffer size is required to be a
+ * multiple of BLCKSZ, it should be safe to assume that the buffer is
+ * large enough to fit an entire tar block. We double-check by means
+ * of these assertions.
+ */
+ StaticAssertStmt(TAR_BLOCK_SIZE <= BLCKSZ,
+ "BLCKSZ too small for tar block");
+ Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+
+ rc = tarCreateHeader(sink->bbs_buffer, filename, linktarget,
+ statbuf->st_size, statbuf->st_mode,
+ statbuf->st_uid, statbuf->st_gid,
statbuf->st_mtime);
switch (rc)
@@ -1879,134 +1667,48 @@ _tarWriteHeader(const char *filename, const char *linktarget,
elog(ERROR, "unrecognized tar error: %d", rc);
}
- pq_putmessage('d', h, sizeof(h));
- update_basebackup_progress(sizeof(h));
+ bbsink_archive_contents(sink, TAR_BLOCK_SIZE);
}
- return sizeof(h);
+ return TAR_BLOCK_SIZE;
}
/*
- * If the entry in statbuf is a link, then adjust statbuf to make it look like a
- * directory, so that it will be written that way.
+ * Pad with zero bytes out to a multiple of TAR_BLOCK_SIZE.
*/
static void
-convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
+_tarWritePadding(bbsink *sink, int len)
{
- /* If symlink, write it as a directory anyway */
-#ifndef WIN32
- if (S_ISLNK(statbuf->st_mode))
-#else
- if (pgwin32_is_junction(pathbuf))
-#endif
- statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
-}
-
-/*
- * Increment the network transfer counter by the given number of bytes,
- * and sleep if necessary to comply with the requested network transfer
- * rate.
- */
-static void
-throttle(size_t increment)
-{
- TimeOffset elapsed_min;
-
- if (throttling_counter < 0)
- return;
-
- throttling_counter += increment;
- if (throttling_counter < throttling_sample)
- return;
-
- /* How much time should have elapsed at minimum? */
- elapsed_min = elapsed_min_unit *
- (throttling_counter / throttling_sample);
+ int pad = tarPaddingBytesRequired(len);
/*
- * Since the latch could be set repeatedly because of concurrently WAL
- * activity, sleep in a loop to ensure enough time has passed.
+ * As in _tarWriteHeader, it should be safe to assume that the buffer is
+ * large enough that we don't need to do this in multiple chunks.
*/
- for (;;)
- {
- TimeOffset elapsed,
- sleep;
- int wait_result;
-
- /* Time elapsed since the last measurement (and possible wake up). */
- elapsed = GetCurrentTimestamp() - throttled_last;
-
- /* sleep if the transfer is faster than it should be */
- sleep = elapsed_min - elapsed;
- if (sleep <= 0)
- break;
-
- ResetLatch(MyLatch);
-
- /* We're eating a potentially set latch, so check for interrupts */
- CHECK_FOR_INTERRUPTS();
+ Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+ Assert(pad <= TAR_BLOCK_SIZE);
- /*
- * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
- * the maximum time to sleep. Thus the cast to long is safe.
- */
- wait_result = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- (long) (sleep / 1000),
- WAIT_EVENT_BASE_BACKUP_THROTTLE);
-
- if (wait_result & WL_LATCH_SET)
- CHECK_FOR_INTERRUPTS();
-
- /* Done waiting? */
- if (wait_result & WL_TIMEOUT)
- break;
+ if (pad > 0)
+ {
+ MemSet(sink->bbs_buffer, 0, pad);
+ bbsink_archive_contents(sink, pad);
}
-
- /*
- * As we work with integers, only whole multiple of throttling_sample was
- * processed. The rest will be done during the next call of this function.
- */
- throttling_counter %= throttling_sample;
-
- /*
- * Time interval for the remaining amount and possible next increments
- * starts now.
- */
- throttled_last = GetCurrentTimestamp();
}
/*
- * Increment the counter for the amount of data already streamed
- * by the given number of bytes, and update the progress report for
- * pg_stat_progress_basebackup.
+ * If the entry in statbuf is a link, then adjust statbuf to make it look like a
+ * directory, so that it will be written that way.
*/
static void
-update_basebackup_progress(int64 delta)
+convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
{
- const int index[] = {
- PROGRESS_BASEBACKUP_BACKUP_STREAMED,
- PROGRESS_BASEBACKUP_BACKUP_TOTAL
- };
- int64 val[2];
- int nparam = 0;
-
- backup_streamed += delta;
- val[nparam++] = backup_streamed;
-
- /*
- * Avoid overflowing past 100% or the full size. This may make the total
- * size number change as we approach the end of the backup (the estimate
- * will always be wrong if WAL is included), but that's better than having
- * the done column be bigger than the total.
- */
- if (backup_total > -1 && backup_streamed > backup_total)
- {
- backup_total = backup_streamed;
- val[nparam++] = backup_total;
- }
-
- pgstat_progress_update_multi_param(nparam, index, val);
+ /* If symlink, write it as a directory anyway */
+#ifndef WIN32
+ if (S_ISLNK(statbuf->st_mode))
+#else
+ if (pgwin32_is_junction(pathbuf))
+#endif
+ statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
}
/*