aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2021-11-05 10:08:30 -0400
committerRobert Haas <rhaas@postgresql.org>2021-11-05 10:08:30 -0400
commitbef47ff85df18bf4a3a9b13bd2a54820e27f3614 (patch)
tree9b0ff2c1fa76a38a425172a66d9afb2c3550743c
parentbd807be6935929bdefe74d1258ca08048f0aafa3 (diff)
downloadpostgresql-bef47ff85df18bf4a3a9b13bd2a54820e27f3614.tar.gz
postgresql-bef47ff85df18bf4a3a9b13bd2a54820e27f3614.zip
Introduce 'bbsink' abstraction to modularize base backup code.
The base backup code has accumulated a healthy number of new features over the years, but it's becoming increasingly difficult to maintain and further enhance that code because there's no real separation of concerns. For example, the code that understands knows the details of how we send data to the client using the libpq protocol is scattered throughout basebackup.c, rather than being centralized in one place. To try to improve this situation, introduce a new 'bbsink' object which acts as a recipient for archives generated during the base backup progress and also for the backup manifest. This commit introduces three types of bbsink: a 'copytblspc' bbsink forwards the backup to the client using one COPY OUT operation per tablespace and another for the manifest, a 'progress' bbsink performs command progress reporting, and a 'throttle' bbsink performs rate-limiting. The 'progress' and 'throttle' bbsink types also forward the data to a successor bbsink; at present, the last bbsink in the chain will always be of type 'copytblspc'. There are plans to add more types of 'bbsink' in future commits. This abstraction is a bit leaky in the case of progress reporting, but this still seems cleaner than what we had before. Patch by me, reviewed and tested by Andres Freund, Sumanta Mukherjee, Dilip Kumar, Suraj Kharage, Dipesh Pandit, Tushar Ahuja, Mark Dilger, and Jeevan Ladhe. Discussion: https://postgr.es/m/CA+TgmoZGwR=ZVWFeecncubEyPdwghnvfkkdBe9BLccLSiqdf9Q@mail.gmail.com Discussion: https://postgr.es/m/CA+TgmoZvqk7UuzxsX1xjJRmMGkqoUGYTZLDCH8SmU1xTPr1Xig@mail.gmail.com
-rw-r--r--src/backend/replication/Makefile4
-rw-r--r--src/backend/replication/backup_manifest.c28
-rw-r--r--src/backend/replication/basebackup.c692
-rw-r--r--src/backend/replication/basebackup_copy.c335
-rw-r--r--src/backend/replication/basebackup_progress.c246
-rw-r--r--src/backend/replication/basebackup_sink.c125
-rw-r--r--src/backend/replication/basebackup_throttle.c199
-rw-r--r--src/include/replication/backup_manifest.h5
-rw-r--r--src/include/replication/basebackup_sink.h296
9 files changed, 1414 insertions, 516 deletions
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index a0381e52f31..74b97cf126a 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -17,6 +17,10 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
OBJS = \
backup_manifest.o \
basebackup.o \
+ basebackup_copy.o \
+ basebackup_progress.o \
+ basebackup_sink.o \
+ basebackup_throttle.o \
repl_gram.o \
slot.o \
slotfuncs.o \
diff --git a/src/backend/replication/backup_manifest.c b/src/backend/replication/backup_manifest.c
index 04ca455ace8..4fe11a3b5cd 100644
--- a/src/backend/replication/backup_manifest.c
+++ b/src/backend/replication/backup_manifest.c
@@ -17,6 +17,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "replication/backup_manifest.h"
+#include "replication/basebackup_sink.h"
#include "utils/builtins.h"
#include "utils/json.h"
@@ -310,9 +311,8 @@ AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr,
* Finalize the backup manifest, and send it to the client.
*/
void
-SendBackupManifest(backup_manifest_info *manifest)
+SendBackupManifest(backup_manifest_info *manifest, bbsink *sink)
{
- StringInfoData protobuf;
uint8 checksumbuf[PG_SHA256_DIGEST_LENGTH];
char checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH];
size_t manifest_bytes_done = 0;
@@ -352,38 +352,28 @@ SendBackupManifest(backup_manifest_info *manifest)
(errcode_for_file_access(),
errmsg("could not rewind temporary file")));
- /* Send CopyOutResponse message */
- pq_beginmessage(&protobuf, 'H');
- pq_sendbyte(&protobuf, 0); /* overall format */
- pq_sendint16(&protobuf, 0); /* natts */
- pq_endmessage(&protobuf);
/*
- * Send CopyData messages.
- *
- * We choose to read back the data from the temporary file in chunks of
- * size BLCKSZ; this isn't necessary, but buffile.c uses that as the I/O
- * size, so it seems to make sense to match that value here.
+ * Send the backup manifest.
*/
+ bbsink_begin_manifest(sink);
while (manifest_bytes_done < manifest->manifest_size)
{
- char manifestbuf[BLCKSZ];
size_t bytes_to_read;
size_t rc;
- bytes_to_read = Min(sizeof(manifestbuf),
+ bytes_to_read = Min(sink->bbs_buffer_length,
manifest->manifest_size - manifest_bytes_done);
- rc = BufFileRead(manifest->buffile, manifestbuf, bytes_to_read);
+ rc = BufFileRead(manifest->buffile, sink->bbs_buffer,
+ bytes_to_read);
if (rc != bytes_to_read)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from temporary file: %m")));
- pq_putmessage('d', manifestbuf, bytes_to_read);
+ bbsink_manifest_contents(sink, bytes_to_read);
manifest_bytes_done += bytes_to_read;
}
-
- /* No more data, so send CopyDone message */
- pq_putemptymessage('c');
+ bbsink_end_manifest(sink);
/* Release resources */
BufFileClose(manifest->buffile);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index b7359f43903..38c82c46196 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -17,13 +17,9 @@
#include <time.h>
#include "access/xlog_internal.h" /* for pg_start/stop_backup */
-#include "catalog/pg_type.h"
#include "common/file_perm.h"
#include "commands/defrem.h"
-#include "commands/progress.h"
#include "lib/stringinfo.h"
-#include "libpq/libpq.h"
-#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "pgstat.h"
@@ -31,6 +27,7 @@
#include "port.h"
#include "postmaster/syslogger.h"
#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
#include "replication/backup_manifest.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
@@ -46,6 +43,16 @@
#include "utils/resowner.h"
#include "utils/timestamp.h"
+/*
+ * How much data do we want to send in one CopyData message? Note that
+ * this may also result in reading the underlying files in chunks of this
+ * size.
+ *
+ * NB: The buffer size is required to be a multiple of the system block
+ * size, so use that value instead if it's bigger than our preference.
+ */
+#define SINK_BUFFER_LENGTH Max(32768, BLCKSZ)
+
typedef struct
{
const char *label;
@@ -59,27 +66,25 @@ typedef struct
pg_checksum_type manifest_checksum_type;
} basebackup_options;
-static int64 sendTablespace(char *path, char *oid, bool sizeonly,
+static int64 sendTablespace(bbsink *sink, char *path, char *oid, bool sizeonly,
struct backup_manifest_info *manifest);
-static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
+static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
List *tablespaces, bool sendtblspclinks,
backup_manifest_info *manifest, const char *spcoid);
-static bool sendFile(const char *readfilename, const char *tarfilename,
+static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid,
backup_manifest_info *manifest, const char *spcoid);
-static void sendFileWithContent(const char *filename, const char *content,
+static void sendFileWithContent(bbsink *sink, const char *filename,
+ const char *content,
backup_manifest_info *manifest);
-static int64 _tarWriteHeader(const char *filename, const char *linktarget,
- struct stat *statbuf, bool sizeonly);
+static int64 _tarWriteHeader(bbsink *sink, const char *filename,
+ const char *linktarget, struct stat *statbuf,
+ bool sizeonly);
+static void _tarWritePadding(bbsink *sink, int len);
static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf);
-static void send_int8_string(StringInfoData *buf, int64 intval);
-static void SendBackupHeader(List *tablespaces);
-static void perform_base_backup(basebackup_options *opt);
+static void perform_base_backup(basebackup_options *opt, bbsink *sink);
static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const ListCell *a, const ListCell *b);
-static void throttle(size_t increment);
-static void update_basebackup_progress(int64 delta);
static bool is_checksummed_file(const char *fullpath, const char *filename);
static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
const char *filename, bool partial_read_ok);
@@ -90,31 +95,6 @@ static bool backup_started_in_recovery = false;
/* Relative path of temporary statistics directory */
static char *statrelpath = NULL;
-/*
- * Size of each block sent into the tar stream for larger files.
- */
-#define TAR_SEND_SIZE 32768
-
-/*
- * How frequently to throttle, as a fraction of the specified rate-second.
- */
-#define THROTTLING_FREQUENCY 8
-
-/* The actual number of bytes, transfer of which may cause sleep. */
-static uint64 throttling_sample;
-
-/* Amount of data already transferred but not yet throttled. */
-static int64 throttling_counter;
-
-/* The minimum time required to transfer throttling_sample bytes. */
-static TimeOffset elapsed_min_unit;
-
-/* The last check of the transfer rate. */
-static TimestampTz throttled_last;
-
-/* The starting XLOG position of the base backup. */
-static XLogRecPtr startptr;
-
/* Total number of checksum failures during base backup. */
static long long int total_checksum_failures;
@@ -122,15 +102,6 @@ static long long int total_checksum_failures;
static bool noverify_checksums = false;
/*
- * Total amount of backup data that will be streamed.
- * -1 means that the size is not estimated.
- */
-static int64 backup_total = 0;
-
-/* Amount of backup data already streamed */
-static int64 backup_streamed = 0;
-
-/*
* Definition of one element part of an exclusion list, used for paths part
* of checksum validation or base backups. "name" is the name of the file
* or path to check for exclusion. If "match_prefix" is true, any items
@@ -253,32 +224,22 @@ static const struct exclude_list_item noChecksumFiles[] = {
* clobbered by longjmp" from stupider versions of gcc.
*/
static void
-perform_base_backup(basebackup_options *opt)
+perform_base_backup(basebackup_options *opt, bbsink *sink)
{
- TimeLineID starttli;
+ bbsink_state state;
XLogRecPtr endptr;
TimeLineID endtli;
StringInfo labelfile;
StringInfo tblspc_map_file;
backup_manifest_info manifest;
int datadirpathlen;
- List *tablespaces = NIL;
- backup_total = 0;
- backup_streamed = 0;
- pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
-
- /*
- * If the estimation of the total backup size is disabled, make the
- * backup_total column in the view return NULL by setting the parameter to
- * -1.
- */
- if (!opt->progress)
- {
- backup_total = -1;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL,
- backup_total);
- }
+ /* Initial backup state, insofar as we know it now. */
+ state.tablespaces = NIL;
+ state.tablespace_num = 0;
+ state.bytes_done = 0;
+ state.bytes_total = 0;
+ state.bytes_total_is_valid = false;
/* we're going to use a BufFile, so we need a ResourceOwner */
Assert(CurrentResourceOwner == NULL);
@@ -295,11 +256,11 @@ perform_base_backup(basebackup_options *opt)
total_checksum_failures = 0;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
- startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
- labelfile, &tablespaces,
- tblspc_map_file);
+ basebackup_progress_wait_checkpoint();
+ state.startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint,
+ &state.starttli,
+ labelfile, &state.tablespaces,
+ tblspc_map_file);
/*
* Once do_pg_start_backup has been called, ensure that any failure causes
@@ -312,7 +273,6 @@ perform_base_backup(basebackup_options *opt)
{
ListCell *lc;
tablespaceinfo *ti;
- int tblspc_streamed = 0;
/*
* Calculate the relative path of temporary statistics directory in
@@ -329,7 +289,7 @@ perform_base_backup(basebackup_options *opt)
/* Add a node for the base directory at the end */
ti = palloc0(sizeof(tablespaceinfo));
ti->size = -1;
- tablespaces = lappend(tablespaces, ti);
+ state.tablespaces = lappend(state.tablespaces, ti);
/*
* Calculate the total backup size by summing up the size of each
@@ -337,100 +297,53 @@ perform_base_backup(basebackup_options *opt)
*/
if (opt->progress)
{
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
+ basebackup_progress_estimate_backup_size();
- foreach(lc, tablespaces)
+ foreach(lc, state.tablespaces)
{
tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc);
if (tmp->path == NULL)
- tmp->size = sendDir(".", 1, true, tablespaces, true, NULL,
- NULL);
+ tmp->size = sendDir(sink, ".", 1, true, state.tablespaces,
+ true, NULL, NULL);
else
- tmp->size = sendTablespace(tmp->path, tmp->oid, true,
+ tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true,
NULL);
- backup_total += tmp->size;
+ state.bytes_total += tmp->size;
}
+ state.bytes_total_is_valid = true;
}
- /* Report that we are now streaming database files as a base backup */
- {
- const int index[] = {
- PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_BACKUP_TOTAL,
- PROGRESS_BASEBACKUP_TBLSPC_TOTAL
- };
- const int64 val[] = {
- PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP,
- backup_total, list_length(tablespaces)
- };
-
- pgstat_progress_update_multi_param(3, index, val);
- }
-
- /* Send the starting position of the backup */
- SendXlogRecPtrResult(startptr, starttli);
-
- /* Send tablespace header */
- SendBackupHeader(tablespaces);
-
- /* Setup and activate network throttling, if client requested it */
- if (opt->maxrate > 0)
- {
- throttling_sample =
- (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
-
- /*
- * The minimum amount of time for throttling_sample bytes to be
- * transferred.
- */
- elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
-
- /* Enable throttling. */
- throttling_counter = 0;
-
- /* The 'real data' starts now (header was ignored). */
- throttled_last = GetCurrentTimestamp();
- }
- else
- {
- /* Disable throttling. */
- throttling_counter = -1;
- }
+ /* notify basebackup sink about start of backup */
+ bbsink_begin_backup(sink, &state, SINK_BUFFER_LENGTH);
/* Send off our tablespaces one by one */
- foreach(lc, tablespaces)
+ foreach(lc, state.tablespaces)
{
tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
- StringInfoData buf;
-
- /* Send CopyOutResponse message */
- pq_beginmessage(&buf, 'H');
- pq_sendbyte(&buf, 0); /* overall format */
- pq_sendint16(&buf, 0); /* natts */
- pq_endmessage(&buf);
if (ti->path == NULL)
{
struct stat statbuf;
bool sendtblspclinks = true;
+ bbsink_begin_archive(sink, "base.tar");
+
/* In the main tar, include the backup_label first... */
- sendFileWithContent(BACKUP_LABEL_FILE, labelfile->data,
+ sendFileWithContent(sink, BACKUP_LABEL_FILE, labelfile->data,
&manifest);
/* Then the tablespace_map file, if required... */
if (opt->sendtblspcmapfile)
{
- sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data,
+ sendFileWithContent(sink, TABLESPACE_MAP, tblspc_map_file->data,
&manifest);
sendtblspclinks = false;
}
/* Then the bulk of the files... */
- sendDir(".", 1, false, tablespaces, sendtblspclinks,
- &manifest, NULL);
+ sendDir(sink, ".", 1, false, state.tablespaces,
+ sendtblspclinks, &manifest, NULL);
/* ... and pg_control after everything else. */
if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -438,32 +351,33 @@ perform_base_backup(basebackup_options *opt)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m",
XLOG_CONTROL_FILE)));
- sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
+ sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
false, InvalidOid, &manifest, NULL);
}
else
- sendTablespace(ti->path, ti->oid, false, &manifest);
+ {
+ char *archive_name = psprintf("%s.tar", ti->oid);
+
+ bbsink_begin_archive(sink, archive_name);
+
+ sendTablespace(sink, ti->path, ti->oid, false, &manifest);
+ }
/*
* If we're including WAL, and this is the main data directory we
- * don't terminate the tar stream here. Instead, we will append
- * the xlog files below and terminate it then. This is safe since
- * the main data directory is always sent *last*.
+ * don't treat this as the end of the tablespace. Instead, we will
+ * include the xlog files below and stop afterwards. This is safe
+ * since the main data directory is always sent *last*.
*/
if (opt->includewal && ti->path == NULL)
{
- Assert(lnext(tablespaces, lc) == NULL);
+ Assert(lnext(state.tablespaces, lc) == NULL);
}
else
- pq_putemptymessage('c'); /* CopyDone */
-
- tblspc_streamed++;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
- tblspc_streamed);
+ bbsink_end_archive(sink);
}
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE);
+ basebackup_progress_wait_wal_archive(&state);
endptr = do_pg_stop_backup(labelfile->data, !opt->nowait, &endtli);
}
PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
@@ -489,8 +403,7 @@ perform_base_backup(basebackup_options *opt)
ListCell *lc;
TimeLineID tli;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
+ basebackup_progress_transfer_wal();
/*
* I'd rather not worry about timelines here, so scan pg_wal and
@@ -501,8 +414,8 @@ perform_base_backup(basebackup_options *opt)
* shouldn't be such files, but if there are, there's little harm in
* including them.
*/
- XLByteToSeg(startptr, startsegno, wal_segment_size);
- XLogFileName(firstoff, starttli, startsegno, wal_segment_size);
+ XLByteToSeg(state.startptr, startsegno, wal_segment_size);
+ XLogFileName(firstoff, state.starttli, startsegno, wal_segment_size);
XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
XLogFileName(lastoff, endtli, endsegno, wal_segment_size);
@@ -528,7 +441,7 @@ perform_base_backup(basebackup_options *opt)
* Before we go any further, check that none of the WAL segments we
* need were removed.
*/
- CheckXLogRemoved(startsegno, starttli);
+ CheckXLogRemoved(startsegno, state.starttli);
/*
* Sort the WAL filenames. We want to send the files in order from
@@ -555,7 +468,7 @@ perform_base_backup(basebackup_options *opt)
{
char startfname[MAXFNAMELEN];
- XLogFileName(startfname, starttli, startsegno,
+ XLogFileName(startfname, state.starttli, startsegno,
wal_segment_size);
ereport(ERROR,
(errmsg("could not find WAL file \"%s\"", startfname)));
@@ -590,7 +503,6 @@ perform_base_backup(basebackup_options *opt)
{
char *walFileName = (char *) lfirst(lc);
int fd;
- char buf[TAR_SEND_SIZE];
size_t cnt;
pgoff_t len = 0;
@@ -629,22 +541,17 @@ perform_base_backup(basebackup_options *opt)
}
/* send the WAL file itself */
- _tarWriteHeader(pathbuf, NULL, &statbuf, false);
+ _tarWriteHeader(sink, pathbuf, NULL, &statbuf, false);
- while ((cnt = basebackup_read_file(fd, buf,
- Min(sizeof(buf),
+ while ((cnt = basebackup_read_file(fd, sink->bbs_buffer,
+ Min(sink->bbs_buffer_length,
wal_segment_size - len),
len, pathbuf, true)) > 0)
{
CheckXLogRemoved(segno, tli);
- /* Send the chunk as a CopyData message */
- if (pq_putmessage('d', buf, cnt))
- ereport(ERROR,
- (errmsg("base backup could not send data, aborting backup")));
- update_basebackup_progress(cnt);
+ bbsink_archive_contents(sink, cnt);
len += cnt;
- throttle(cnt);
if (len == wal_segment_size)
break;
@@ -673,7 +580,7 @@ perform_base_backup(basebackup_options *opt)
* complete segment.
*/
StatusFilePath(pathbuf, walFileName, ".done");
- sendFileWithContent(pathbuf, "", &manifest);
+ sendFileWithContent(sink, pathbuf, "", &manifest);
}
/*
@@ -696,23 +603,23 @@ perform_base_backup(basebackup_options *opt)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", pathbuf)));
- sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid,
+ sendFile(sink, pathbuf, pathbuf, &statbuf, false, InvalidOid,
&manifest, NULL);
/* unconditionally mark file as archived */
StatusFilePath(pathbuf, fname, ".done");
- sendFileWithContent(pathbuf, "", &manifest);
+ sendFileWithContent(sink, pathbuf, "", &manifest);
}
- /* Send CopyDone message for the last tar file */
- pq_putemptymessage('c');
+ bbsink_end_archive(sink);
}
- AddWALInfoToBackupManifest(&manifest, startptr, starttli, endptr, endtli);
+ AddWALInfoToBackupManifest(&manifest, state.startptr, state.starttli,
+ endptr, endtli);
- SendBackupManifest(&manifest);
+ SendBackupManifest(&manifest, sink);
- SendXlogRecPtrResult(endptr, endtli);
+ bbsink_end_backup(sink, endptr, endtli);
if (total_checksum_failures)
{
@@ -738,7 +645,7 @@ perform_base_backup(basebackup_options *opt)
/* clean up the resource owner we created */
WalSndResourceCleanup(true);
- pgstat_progress_end_command();
+ basebackup_progress_done();
}
/*
@@ -943,6 +850,7 @@ void
SendBaseBackup(BaseBackupCmd *cmd)
{
basebackup_options opt;
+ bbsink *sink;
parse_basebackup_options(cmd->options, &opt);
@@ -957,158 +865,40 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg);
}
- perform_base_backup(&opt);
-}
-
-static void
-send_int8_string(StringInfoData *buf, int64 intval)
-{
- char is[32];
-
- sprintf(is, INT64_FORMAT, intval);
- pq_sendint32(buf, strlen(is));
- pq_sendbytes(buf, is, strlen(is));
-}
-
-static void
-SendBackupHeader(List *tablespaces)
-{
- StringInfoData buf;
- ListCell *lc;
-
- /* Construct and send the directory information */
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 3); /* 3 fields */
-
- /* First field - spcoid */
- pq_sendstring(&buf, "spcoid");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, OIDOID); /* type oid */
- pq_sendint16(&buf, 4); /* typlen */
- pq_sendint32(&buf, 0); /* typmod */
- pq_sendint16(&buf, 0); /* format code */
-
- /* Second field - spclocation */
- pq_sendstring(&buf, "spclocation");
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_sendint32(&buf, TEXTOID);
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
-
- /* Third field - size */
- pq_sendstring(&buf, "size");
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_sendint32(&buf, INT8OID);
- pq_sendint16(&buf, 8);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_endmessage(&buf);
-
- foreach(lc, tablespaces)
- {
- tablespaceinfo *ti = lfirst(lc);
-
- /* Send one datarow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint16(&buf, 3); /* number of columns */
- if (ti->path == NULL)
- {
- pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
- pq_sendint32(&buf, -1);
- }
- else
- {
- Size len;
-
- len = strlen(ti->oid);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, ti->oid, len);
-
- len = strlen(ti->path);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, ti->path, len);
- }
- if (ti->size >= 0)
- send_int8_string(&buf, ti->size / 1024);
- else
- pq_sendint32(&buf, -1); /* NULL */
+ /* Create a basic basebackup sink. */
+ sink = bbsink_copytblspc_new();
- pq_endmessage(&buf);
- }
+ /* Set up network throttling, if client requested it */
+ if (opt.maxrate > 0)
+ sink = bbsink_throttle_new(sink, opt.maxrate);
- /* Send a CommandComplete message */
- pq_puttextmessage('C', "SELECT");
-}
-
-/*
- * Send a single resultset containing just a single
- * XLogRecPtr record (in text format)
- */
-static void
-SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
-{
- StringInfoData buf;
- char str[MAXFNAMELEN];
- Size len;
-
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 2); /* 2 fields */
-
- /* Field headers */
- pq_sendstring(&buf, "recptr");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, TEXTOID); /* type oid */
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
-
- pq_sendstring(&buf, "tli");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
+ /* Set up progress reporting. */
+ sink = bbsink_progress_new(sink, opt.progress);
/*
- * int8 may seem like a surprising data type for this, but in theory int4
- * would not be wide enough for this, as TimeLineID is unsigned.
+ * Perform the base backup, but make sure we clean up the bbsink even if
+ * an error occurs.
*/
- pq_sendint32(&buf, INT8OID); /* type oid */
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_endmessage(&buf);
-
- /* Data row */
- pq_beginmessage(&buf, 'D');
- pq_sendint16(&buf, 2); /* number of columns */
-
- len = snprintf(str, sizeof(str),
- "%X/%X", LSN_FORMAT_ARGS(ptr));
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, str, len);
-
- len = snprintf(str, sizeof(str), "%u", tli);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, str, len);
-
- pq_endmessage(&buf);
-
- /* Send a CommandComplete message */
- pq_puttextmessage('C', "SELECT");
+ PG_TRY();
+ {
+ perform_base_backup(&opt, sink);
+ }
+ PG_FINALLY();
+ {
+ bbsink_cleanup(sink);
+ }
+ PG_END_TRY();
}
/*
* Inject a file with given name and content in the output tar stream.
*/
static void
-sendFileWithContent(const char *filename, const char *content,
+sendFileWithContent(bbsink *sink, const char *filename, const char *content,
backup_manifest_info *manifest)
{
struct stat statbuf;
- int pad,
+ int bytes_done = 0,
len;
pg_checksum_context checksum_ctx;
@@ -1134,25 +924,23 @@ sendFileWithContent(const char *filename, const char *content,
statbuf.st_mode = pg_file_create_mode;
statbuf.st_size = len;
- _tarWriteHeader(filename, NULL, &statbuf, false);
- /* Send the contents as a CopyData message */
- pq_putmessage('d', content, len);
- update_basebackup_progress(len);
+ _tarWriteHeader(sink, filename, NULL, &statbuf, false);
- /* Pad to a multiple of the tar block size. */
- pad = tarPaddingBytesRequired(len);
- if (pad > 0)
+ if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0)
+ elog(ERROR, "could not update checksum of file \"%s\"",
+ filename);
+
+ while (bytes_done < len)
{
- char buf[TAR_BLOCK_SIZE];
+ size_t remaining = len - bytes_done;
+ size_t nbytes = Min(sink->bbs_buffer_length, remaining);
- MemSet(buf, 0, pad);
- pq_putmessage('d', buf, pad);
- update_basebackup_progress(pad);
+ memcpy(sink->bbs_buffer, content, nbytes);
+ bbsink_archive_contents(sink, nbytes);
+ bytes_done += nbytes;
}
- if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0)
- elog(ERROR, "could not update checksum of file \"%s\"",
- filename);
+ _tarWritePadding(sink, len);
AddFileToBackupManifest(manifest, NULL, filename, len,
(pg_time_t) statbuf.st_mtime, &checksum_ctx);
@@ -1166,7 +954,7 @@ sendFileWithContent(const char *filename, const char *content,
* Only used to send auxiliary tablespaces, not PGDATA.
*/
static int64
-sendTablespace(char *path, char *spcoid, bool sizeonly,
+sendTablespace(bbsink *sink, char *path, char *spcoid, bool sizeonly,
backup_manifest_info *manifest)
{
int64 size;
@@ -1196,11 +984,11 @@ sendTablespace(char *path, char *spcoid, bool sizeonly,
return 0;
}
- size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
+ size = _tarWriteHeader(sink, TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
sizeonly);
/* Send all the files in the tablespace version directory */
- size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, manifest,
+ size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest,
spcoid);
return size;
@@ -1219,8 +1007,8 @@ sendTablespace(char *path, char *spcoid, bool sizeonly,
* as it will be sent separately in the tablespace_map file.
*/
static int64
-sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
- bool sendtblspclinks, backup_manifest_info *manifest,
+sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest,
const char *spcoid)
{
DIR *dir;
@@ -1380,8 +1168,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
excludeFound = true;
break;
}
@@ -1398,8 +1186,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
continue;
}
@@ -1412,15 +1200,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
/* If pg_wal is a symlink, write it as a directory anyway */
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
/*
* Also send archive_status directory (by hackishly reusing
* statbuf from above ...).
*/
- size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL,
+ &statbuf, sizeonly);
continue; /* don't recurse into pg_wal */
}
@@ -1451,7 +1239,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
pathbuf)));
linkpath[rllen] = '\0';
- size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, linkpath,
&statbuf, sizeonly);
#else
@@ -1475,7 +1263,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
* Store a directory entry in the tar file so we can get the
* permissions right.
*/
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, &statbuf,
sizeonly);
/*
@@ -1507,7 +1295,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces,
+ size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces,
sendtblspclinks, manifest, spcoid);
}
else if (S_ISREG(statbuf.st_mode))
@@ -1515,7 +1303,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
bool sent = false;
if (!sizeonly)
- sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
+ sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf,
true, isDbDir ? atooid(lastDir + 1) : InvalidOid,
manifest, spcoid);
@@ -1592,21 +1380,19 @@ is_checksummed_file(const char *fullpath, const char *filename)
* and the file did not exist.
*/
static bool
-sendFile(const char *readfilename, const char *tarfilename,
+sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid,
backup_manifest_info *manifest, const char *spcoid)
{
int fd;
BlockNumber blkno = 0;
bool block_retry = false;
- char buf[TAR_SEND_SIZE];
uint16 checksum;
int checksum_failures = 0;
off_t cnt;
int i;
pgoff_t len = 0;
char *page;
- size_t pad;
PageHeader phdr;
int segmentno = 0;
char *segmentpath;
@@ -1627,7 +1413,7 @@ sendFile(const char *readfilename, const char *tarfilename,
errmsg("could not open file \"%s\": %m", readfilename)));
}
- _tarWriteHeader(tarfilename, NULL, statbuf, false);
+ _tarWriteHeader(sink, tarfilename, NULL, statbuf, false);
if (!noverify_checksums && DataChecksumsEnabled())
{
@@ -1668,9 +1454,11 @@ sendFile(const char *readfilename, const char *tarfilename,
*/
while (len < statbuf->st_size)
{
+ size_t remaining = statbuf->st_size - len;
+
/* Try to read some more data. */
- cnt = basebackup_read_file(fd, buf,
- Min(sizeof(buf), statbuf->st_size - len),
+ cnt = basebackup_read_file(fd, sink->bbs_buffer,
+ Min(sink->bbs_buffer_length, remaining),
len, readfilename, true);
/*
@@ -1687,7 +1475,7 @@ sendFile(const char *readfilename, const char *tarfilename,
* TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple of
* BLCKSZ bytes.
*/
- Assert(TAR_SEND_SIZE % BLCKSZ == 0);
+ Assert((sink->bbs_buffer_length % BLCKSZ) == 0);
if (verify_checksum && (cnt % BLCKSZ != 0))
{
@@ -1703,7 +1491,7 @@ sendFile(const char *readfilename, const char *tarfilename,
{
for (i = 0; i < cnt / BLCKSZ; i++)
{
- page = buf + BLCKSZ * i;
+ page = sink->bbs_buffer + BLCKSZ * i;
/*
* Only check pages which have not been modified since the
@@ -1713,7 +1501,7 @@ sendFile(const char *readfilename, const char *tarfilename,
* this case. We also skip completely new pages, since they
* don't have a checksum yet.
*/
- if (!PageIsNew(page) && PageGetLSN(page) < startptr)
+ if (!PageIsNew(page) && PageGetLSN(page) < sink->bbs_state->startptr)
{
checksum = pg_checksum_page((char *) page, blkno + segmentno * RELSEG_SIZE);
phdr = (PageHeader) page;
@@ -1735,7 +1523,8 @@ sendFile(const char *readfilename, const char *tarfilename,
/* Reread the failed block */
reread_cnt =
- basebackup_read_file(fd, buf + BLCKSZ * i,
+ basebackup_read_file(fd,
+ sink->bbs_buffer + BLCKSZ * i,
BLCKSZ, len + BLCKSZ * i,
readfilename,
false);
@@ -1782,34 +1571,29 @@ sendFile(const char *readfilename, const char *tarfilename,
}
}
- /* Send the chunk as a CopyData message */
- if (pq_putmessage('d', buf, cnt))
- ereport(ERROR,
- (errmsg("base backup could not send data, aborting backup")));
- update_basebackup_progress(cnt);
+ bbsink_archive_contents(sink, cnt);
/* Also feed it to the checksum machinery. */
- if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0)
+ if (pg_checksum_update(&checksum_ctx,
+ (uint8 *) sink->bbs_buffer, cnt) < 0)
elog(ERROR, "could not update checksum of base backup");
len += cnt;
- throttle(cnt);
}
/* If the file was truncated while we were sending it, pad it with zeros */
- if (len < statbuf->st_size)
+ while (len < statbuf->st_size)
{
- MemSet(buf, 0, sizeof(buf));
- while (len < statbuf->st_size)
- {
- cnt = Min(sizeof(buf), statbuf->st_size - len);
- pq_putmessage('d', buf, cnt);
- if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0)
- elog(ERROR, "could not update checksum of base backup");
- update_basebackup_progress(cnt);
- len += cnt;
- throttle(cnt);
- }
+ size_t remaining = statbuf->st_size - len;
+ size_t nbytes = Min(sink->bbs_buffer_length, remaining);
+
+ MemSet(sink->bbs_buffer, 0, nbytes);
+ if (pg_checksum_update(&checksum_ctx,
+ (uint8 *) sink->bbs_buffer,
+ nbytes) < 0)
+ elog(ERROR, "could not update checksum of base backup");
+ bbsink_archive_contents(sink, nbytes);
+ len += nbytes;
}
/*
@@ -1817,13 +1601,7 @@ sendFile(const char *readfilename, const char *tarfilename,
* of data is probably not worth throttling, and is not checksummed
* because it's not actually part of the file.)
*/
- pad = tarPaddingBytesRequired(len);
- if (pad > 0)
- {
- MemSet(buf, 0, pad);
- pq_putmessage('d', buf, pad);
- update_basebackup_progress(pad);
- }
+ _tarWritePadding(sink, len);
CloseTransientFile(fd);
@@ -1846,18 +1624,28 @@ sendFile(const char *readfilename, const char *tarfilename,
return true;
}
-
static int64
-_tarWriteHeader(const char *filename, const char *linktarget,
+_tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget,
struct stat *statbuf, bool sizeonly)
{
- char h[TAR_BLOCK_SIZE];
enum tarError rc;
if (!sizeonly)
{
- rc = tarCreateHeader(h, filename, linktarget, statbuf->st_size,
- statbuf->st_mode, statbuf->st_uid, statbuf->st_gid,
+ /*
+ * As of this writing, the smallest supported block size is 1kB, which
+ * is twice TAR_BLOCK_SIZE. Since the buffer size is required to be a
+ * multiple of BLCKSZ, it should be safe to assume that the buffer is
+ * large enough to fit an entire tar block. We double-check by means
+ * of these assertions.
+ */
+ StaticAssertStmt(TAR_BLOCK_SIZE <= BLCKSZ,
+ "BLCKSZ too small for tar block");
+ Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+
+ rc = tarCreateHeader(sink->bbs_buffer, filename, linktarget,
+ statbuf->st_size, statbuf->st_mode,
+ statbuf->st_uid, statbuf->st_gid,
statbuf->st_mtime);
switch (rc)
@@ -1879,134 +1667,48 @@ _tarWriteHeader(const char *filename, const char *linktarget,
elog(ERROR, "unrecognized tar error: %d", rc);
}
- pq_putmessage('d', h, sizeof(h));
- update_basebackup_progress(sizeof(h));
+ bbsink_archive_contents(sink, TAR_BLOCK_SIZE);
}
- return sizeof(h);
+ return TAR_BLOCK_SIZE;
}
/*
- * If the entry in statbuf is a link, then adjust statbuf to make it look like a
- * directory, so that it will be written that way.
+ * Pad with zero bytes out to a multiple of TAR_BLOCK_SIZE.
*/
static void
-convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
+_tarWritePadding(bbsink *sink, int len)
{
- /* If symlink, write it as a directory anyway */
-#ifndef WIN32
- if (S_ISLNK(statbuf->st_mode))
-#else
- if (pgwin32_is_junction(pathbuf))
-#endif
- statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
-}
-
-/*
- * Increment the network transfer counter by the given number of bytes,
- * and sleep if necessary to comply with the requested network transfer
- * rate.
- */
-static void
-throttle(size_t increment)
-{
- TimeOffset elapsed_min;
-
- if (throttling_counter < 0)
- return;
-
- throttling_counter += increment;
- if (throttling_counter < throttling_sample)
- return;
-
- /* How much time should have elapsed at minimum? */
- elapsed_min = elapsed_min_unit *
- (throttling_counter / throttling_sample);
+ int pad = tarPaddingBytesRequired(len);
/*
- * Since the latch could be set repeatedly because of concurrently WAL
- * activity, sleep in a loop to ensure enough time has passed.
+ * As in _tarWriteHeader, it should be safe to assume that the buffer is
+ * large enough that we don't need to do this in multiple chunks.
*/
- for (;;)
- {
- TimeOffset elapsed,
- sleep;
- int wait_result;
-
- /* Time elapsed since the last measurement (and possible wake up). */
- elapsed = GetCurrentTimestamp() - throttled_last;
-
- /* sleep if the transfer is faster than it should be */
- sleep = elapsed_min - elapsed;
- if (sleep <= 0)
- break;
-
- ResetLatch(MyLatch);
-
- /* We're eating a potentially set latch, so check for interrupts */
- CHECK_FOR_INTERRUPTS();
+ Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+ Assert(pad <= TAR_BLOCK_SIZE);
- /*
- * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
- * the maximum time to sleep. Thus the cast to long is safe.
- */
- wait_result = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- (long) (sleep / 1000),
- WAIT_EVENT_BASE_BACKUP_THROTTLE);
-
- if (wait_result & WL_LATCH_SET)
- CHECK_FOR_INTERRUPTS();
-
- /* Done waiting? */
- if (wait_result & WL_TIMEOUT)
- break;
+ if (pad > 0)
+ {
+ MemSet(sink->bbs_buffer, 0, pad);
+ bbsink_archive_contents(sink, pad);
}
-
- /*
- * As we work with integers, only whole multiple of throttling_sample was
- * processed. The rest will be done during the next call of this function.
- */
- throttling_counter %= throttling_sample;
-
- /*
- * Time interval for the remaining amount and possible next increments
- * starts now.
- */
- throttled_last = GetCurrentTimestamp();
}
/*
- * Increment the counter for the amount of data already streamed
- * by the given number of bytes, and update the progress report for
- * pg_stat_progress_basebackup.
+ * If the entry in statbuf is a link, then adjust statbuf to make it look like a
+ * directory, so that it will be written that way.
*/
static void
-update_basebackup_progress(int64 delta)
+convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
{
- const int index[] = {
- PROGRESS_BASEBACKUP_BACKUP_STREAMED,
- PROGRESS_BASEBACKUP_BACKUP_TOTAL
- };
- int64 val[2];
- int nparam = 0;
-
- backup_streamed += delta;
- val[nparam++] = backup_streamed;
-
- /*
- * Avoid overflowing past 100% or the full size. This may make the total
- * size number change as we approach the end of the backup (the estimate
- * will always be wrong if WAL is included), but that's better than having
- * the done column be bigger than the total.
- */
- if (backup_total > -1 && backup_streamed > backup_total)
- {
- backup_total = backup_streamed;
- val[nparam++] = backup_total;
- }
-
- pgstat_progress_update_multi_param(nparam, index, val);
+ /* If symlink, write it as a directory anyway */
+#ifndef WIN32
+ if (S_ISLNK(statbuf->st_mode))
+#else
+ if (pgwin32_is_junction(pathbuf))
+#endif
+ statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
}
/*
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c
new file mode 100644
index 00000000000..30bab4546ef
--- /dev/null
+++ b/src/backend/replication/basebackup_copy.c
@@ -0,0 +1,335 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_copy.c
+ * send basebackup archives using one COPY OUT operation per
+ * tablespace, and an additional COPY OUT for the backup manifest
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/basebackup_copy.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_type_d.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
+
+static void bbsink_copytblspc_begin_backup(bbsink *sink);
+static void bbsink_copytblspc_begin_archive(bbsink *sink,
+ const char *archive_name);
+static void bbsink_copytblspc_archive_contents(bbsink *sink, size_t len);
+static void bbsink_copytblspc_end_archive(bbsink *sink);
+static void bbsink_copytblspc_begin_manifest(bbsink *sink);
+static void bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_copytblspc_end_manifest(bbsink *sink);
+static void bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+static void bbsink_copytblspc_cleanup(bbsink *sink);
+
+static void SendCopyOutResponse(void);
+static void SendCopyData(const char *data, size_t len);
+static void SendCopyDone(void);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
+static void SendTablespaceList(List *tablespaces);
+static void send_int8_string(StringInfoData *buf, int64 intval);
+
+const bbsink_ops bbsink_copytblspc_ops = {
+ .begin_backup = bbsink_copytblspc_begin_backup,
+ .begin_archive = bbsink_copytblspc_begin_archive,
+ .archive_contents = bbsink_copytblspc_archive_contents,
+ .end_archive = bbsink_copytblspc_end_archive,
+ .begin_manifest = bbsink_copytblspc_begin_manifest,
+ .manifest_contents = bbsink_copytblspc_manifest_contents,
+ .end_manifest = bbsink_copytblspc_end_manifest,
+ .end_backup = bbsink_copytblspc_end_backup,
+ .cleanup = bbsink_copytblspc_cleanup
+};
+
+/*
+ * Create a new 'copytblspc' bbsink.
+ */
+bbsink *
+bbsink_copytblspc_new(void)
+{
+ bbsink *sink = palloc0(sizeof(bbsink));
+
+ *((const bbsink_ops **) &sink->bbs_ops) = &bbsink_copytblspc_ops;
+
+ return sink;
+}
+
+/*
+ * Begin backup.
+ */
+static void
+bbsink_copytblspc_begin_backup(bbsink *sink)
+{
+ bbsink_state *state = sink->bbs_state;
+
+ /* Create a suitable buffer. */
+ sink->bbs_buffer = palloc(sink->bbs_buffer_length);
+
+ /* Tell client the backup start location. */
+ SendXlogRecPtrResult(state->startptr, state->starttli);
+
+ /* Send client a list of tablespaces. */
+ SendTablespaceList(state->tablespaces);
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * Each archive is set as a separate stream of COPY data, and thus begins
+ * with a CopyOutResponse message.
+ */
+static void
+bbsink_copytblspc_begin_archive(bbsink *sink, const char *archive_name)
+{
+ SendCopyOutResponse();
+}
+
+/*
+ * Each chunk of data within the archive is sent as a CopyData message.
+ */
+static void
+bbsink_copytblspc_archive_contents(bbsink *sink, size_t len)
+{
+ SendCopyData(sink->bbs_buffer, len);
+}
+
+/*
+ * The archive is terminated by a CopyDone message.
+ */
+static void
+bbsink_copytblspc_end_archive(bbsink *sink)
+{
+ SendCopyDone();
+}
+
+/*
+ * The backup manifest is sent as a separate stream of COPY data, and thus
+ * begins with a CopyOutResponse message.
+ */
+static void
+bbsink_copytblspc_begin_manifest(bbsink *sink)
+{
+ SendCopyOutResponse();
+}
+
+/*
+ * Each chunk of manifest data is sent using a CopyData message.
+ */
+static void
+bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len)
+{
+ SendCopyData(sink->bbs_buffer, len);
+}
+
+/*
+ * When we've finished sending the manifest, send a CopyDone message.
+ */
+static void
+bbsink_copytblspc_end_manifest(bbsink *sink)
+{
+ SendCopyDone();
+}
+
+/*
+ * Send end-of-backup wire protocol messages.
+ */
+static void
+bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli)
+{
+ SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * Cleanup.
+ */
+static void
+bbsink_copytblspc_cleanup(bbsink *sink)
+{
+ /* Nothing to do. */
+}
+
+/*
+ * Send a CopyOutResponse message.
+ */
+static void
+SendCopyOutResponse(void)
+{
+ StringInfoData buf;
+
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0); /* overall format */
+ pq_sendint16(&buf, 0); /* natts */
+ pq_endmessage(&buf);
+}
+
+/*
+ * Send a CopyData message.
+ */
+static void
+SendCopyData(const char *data, size_t len)
+{
+ pq_putmessage('d', data, len);
+}
+
+/*
+ * Send a CopyDone message.
+ */
+static void
+SendCopyDone(void)
+{
+ pq_putemptymessage('c');
+}
+
+/*
+ * Send a single resultset containing just a single
+ * XLogRecPtr record (in text format)
+ */
+static void
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
+{
+ StringInfoData buf;
+ char str[MAXFNAMELEN];
+ Size len;
+
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 2); /* 2 fields */
+
+ /* Field headers */
+ pq_sendstring(&buf, "recptr");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, TEXTOID); /* type oid */
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ pq_sendstring(&buf, "tli");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+
+ /*
+ * int8 may seem like a surprising data type for this, but in theory int4
+ * would not be wide enough for this, as TimeLineID is unsigned.
+ */
+ pq_sendint32(&buf, INT8OID); /* type oid */
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_endmessage(&buf);
+
+ /* Data row */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 2); /* number of columns */
+
+ len = snprintf(str, sizeof(str),
+ "%X/%X", LSN_FORMAT_ARGS(ptr));
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, str, len);
+
+ len = snprintf(str, sizeof(str), "%u", tli);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, str, len);
+
+ pq_endmessage(&buf);
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * Send a result set via libpq describing the tablespace list.
+ */
+static void
+SendTablespaceList(List *tablespaces)
+{
+ StringInfoData buf;
+ ListCell *lc;
+
+ /* Construct and send the directory information */
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 3); /* 3 fields */
+
+ /* First field - spcoid */
+ pq_sendstring(&buf, "spcoid");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, OIDOID); /* type oid */
+ pq_sendint16(&buf, 4); /* typlen */
+ pq_sendint32(&buf, 0); /* typmod */
+ pq_sendint16(&buf, 0); /* format code */
+
+ /* Second field - spclocation */
+ pq_sendstring(&buf, "spclocation");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, TEXTOID);
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ /* Third field - size */
+ pq_sendstring(&buf, "size");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, INT8OID);
+ pq_sendint16(&buf, 8);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_endmessage(&buf);
+
+ foreach(lc, tablespaces)
+ {
+ tablespaceinfo *ti = lfirst(lc);
+
+ /* Send one datarow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 3); /* number of columns */
+ if (ti->path == NULL)
+ {
+ pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
+ pq_sendint32(&buf, -1);
+ }
+ else
+ {
+ Size len;
+
+ len = strlen(ti->oid);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, ti->oid, len);
+
+ len = strlen(ti->path);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, ti->path, len);
+ }
+ if (ti->size >= 0)
+ send_int8_string(&buf, ti->size / 1024);
+ else
+ pq_sendint32(&buf, -1); /* NULL */
+
+ pq_endmessage(&buf);
+ }
+}
+
+/*
+ * Send a 64-bit integer as a string via the wire protocol.
+ */
+static void
+send_int8_string(StringInfoData *buf, int64 intval)
+{
+ char is[32];
+
+ sprintf(is, INT64_FORMAT, intval);
+ pq_sendint32(buf, strlen(is));
+ pq_sendbytes(buf, is, strlen(is));
+}
diff --git a/src/backend/replication/basebackup_progress.c b/src/backend/replication/basebackup_progress.c
new file mode 100644
index 00000000000..e1a196251ef
--- /dev/null
+++ b/src/backend/replication/basebackup_progress.c
@@ -0,0 +1,246 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_progress.c
+ * Basebackup sink implementing progress tracking, including but not
+ * limited to command progress reporting.
+ *
+ * This should be used even if the PROGRESS option to the replication
+ * command BASE_BACKUP is not specified. Without that option, we won't
+ * have tallied up the size of the files that are going to need to be
+ * backed up, but we can still report to the command progress reporting
+ * facility how much data we've processed.
+ *
+ * Moreover, we also use this as a convenient place to update certain
+ * fields of the bbsink_state. That work is accurately described as
+ * keeping track of our progress, but it's not just for introspection.
+ * We need those fields to be updated properly in order for base backups
+ * to work.
+ *
+ * This particular basebackup sink requires extra callbacks that most base
+ * backup sinks don't. Rather than cramming those into the interface, we just
+ * have a few extra functions here that basebackup.c can call. (We could put
+ * the logic directly into that file as it's fairly simple, but it seems
+ * cleaner to have everything related to progress reporting in one place.)
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/basebackup_progress.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/progress.h"
+#include "miscadmin.h"
+#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "utils/timestamp.h"
+
+static void bbsink_progress_begin_backup(bbsink *sink);
+static void bbsink_progress_archive_contents(bbsink *sink, size_t len);
+static void bbsink_progress_end_archive(bbsink *sink);
+
+const bbsink_ops bbsink_progress_ops = {
+ .begin_backup = bbsink_progress_begin_backup,
+ .begin_archive = bbsink_forward_begin_archive,
+ .archive_contents = bbsink_progress_archive_contents,
+ .end_archive = bbsink_progress_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_forward_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+
+/*
+ * Create a new basebackup sink that performs progress tracking functions and
+ * forwards data to a successor sink.
+ */
+bbsink *
+bbsink_progress_new(bbsink *next, bool estimate_backup_size)
+{
+ bbsink *sink;
+
+ Assert(next != NULL);
+
+ sink = palloc0(sizeof(bbsink));
+ *((const bbsink_ops **) &sink->bbs_ops) = &bbsink_progress_ops;
+ sink->bbs_next = next;
+
+ /*
+ * Report that a base backup is in progress, and set the total size of the
+ * backup to -1, which will get translated to NULL. If we're estimating
+ * the backup size, we'll insert the real estimate when we have it.
+ */
+ pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, -1);
+
+ return sink;
+}
+
+/*
+ * Progress reporting at start of backup.
+ */
+static void
+bbsink_progress_begin_backup(bbsink *sink)
+{
+ const int index[] = {
+ PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_BACKUP_TOTAL,
+ PROGRESS_BASEBACKUP_TBLSPC_TOTAL
+ };
+ int64 val[3];
+
+ /*
+ * Report that we are now streaming database files as a base backup. Also
+ * advertise the number of tablespaces, and, if known, the estimated total
+ * backup size.
+ */
+ val[0] = PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP;
+ if (sink->bbs_state->bytes_total_is_valid)
+ val[1] = sink->bbs_state->bytes_total;
+ else
+ val[1] = -1;
+ val[2] = list_length(sink->bbs_state->tablespaces);
+ pgstat_progress_update_multi_param(3, index, val);
+
+ /* Delegate to next sink. */
+ bbsink_forward_begin_backup(sink);
+}
+
+/*
+ * End-of archive progress reporting.
+ */
+static void
+bbsink_progress_end_archive(bbsink *sink)
+{
+ /*
+ * We expect one archive per tablespace, so reaching the end of an archive
+ * also means reaching the end of a tablespace. (Some day we might have a
+ * reason to decouple these concepts.)
+ *
+ * If WAL is included in the backup, we'll mark the last tablespace
+ * complete before the last archive is complete, so we need a guard here
+ * to ensure that the number of tablespaces streamed doesn't exceed the
+ * total.
+ */
+ if (sink->bbs_state->tablespace_num < list_length(sink->bbs_state->tablespaces))
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
+ sink->bbs_state->tablespace_num + 1);
+
+ /* Delegate to next sink. */
+ bbsink_forward_end_archive(sink);
+
+ /*
+ * This is a convenient place to update the bbsink_state's notion of which
+ * is the current tablespace. Note that the bbsink_state object is shared
+ * across all bbsink objects involved, but we're the outermost one and
+ * this is the very last thing we do.
+ */
+ sink->bbs_state->tablespace_num++;
+}
+
+/*
+ * Handle progress tracking for new archive contents.
+ *
+ * Increment the counter for the amount of data already streamed
+ * by the given number of bytes, and update the progress report for
+ * pg_stat_progress_basebackup.
+ */
+static void
+bbsink_progress_archive_contents(bbsink *sink, size_t len)
+{
+ bbsink_state *state = sink->bbs_state;
+ const int index[] = {
+ PROGRESS_BASEBACKUP_BACKUP_STREAMED,
+ PROGRESS_BASEBACKUP_BACKUP_TOTAL
+ };
+ int64 val[2];
+ int nparam = 0;
+
+ /* First update bbsink_state with # of bytes done. */
+ state->bytes_done += len;
+
+ /* Now forward to next sink. */
+ bbsink_forward_archive_contents(sink, len);
+
+ /* Prepare to set # of bytes done for command progress reporting. */
+ val[nparam++] = state->bytes_done;
+
+ /*
+ * We may also want to update # of total bytes, to avoid overflowing past
+ * 100% or the full size. This may make the total size number change as we
+ * approach the end of the backup (the estimate will always be wrong if
+ * WAL is included), but that's better than having the done column be
+ * bigger than the total.
+ */
+ if (state->bytes_total_is_valid && state->bytes_done > state->bytes_total)
+ val[nparam++] = state->bytes_done;
+
+ pgstat_progress_update_multi_param(nparam, index, val);
+}
+
+/*
+ * Advertise that we are waiting for the start-of-backup checkpoint.
+ */
+void
+basebackup_progress_wait_checkpoint(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
+}
+
+/*
+ * Advertise that we are estimating the backup size.
+ */
+void
+basebackup_progress_estimate_backup_size(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
+}
+
+/*
+ * Advertise that we are waiting for WAL archiving at end-of-backup.
+ */
+void
+basebackup_progress_wait_wal_archive(bbsink_state *state)
+{
+ const int index[] = {
+ PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_TBLSPC_STREAMED
+ };
+ int64 val[2];
+
+ /*
+ * We report having finished all tablespaces at this point, even if the
+ * archive for the main tablespace is still open, because what's going to
+ * be added is WAL files, not files that are really from the main
+ * tablespace.
+ */
+ val[0] = PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE;
+ val[1] = list_length(state->tablespaces);
+ pgstat_progress_update_multi_param(2, index, val);
+}
+
+/*
+ * Advertise that we are transferring WAL files into the final archive.
+ */
+void
+basebackup_progress_transfer_wal(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
+}
+
+/*
+ * Advertise that we are no longer performing a backup.
+ */
+void
+basebackup_progress_done(void)
+{
+ pgstat_progress_end_command();
+}
diff --git a/src/backend/replication/basebackup_sink.c b/src/backend/replication/basebackup_sink.c
new file mode 100644
index 00000000000..4a47854f81f
--- /dev/null
+++ b/src/backend/replication/basebackup_sink.c
@@ -0,0 +1,125 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_sink.c
+ * Default implementations for bbsink (basebackup sink) callbacks.
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * src/backend/replication/basebackup_sink.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "replication/basebackup_sink.h"
+
+/*
+ * Forward begin_backup callback.
+ *
+ * Only use this implementation if you want the bbsink you're implementing to
+ * share a buffer with the succesor bbsink.
+ */
+void
+bbsink_forward_begin_backup(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_state != NULL);
+ bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
+ sink->bbs_buffer_length);
+ sink->bbs_buffer = sink->bbs_next->bbs_buffer;
+}
+
+/*
+ * Forward begin_archive callback.
+ */
+void
+bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_archive(sink->bbs_next, archive_name);
+}
+
+/*
+ * Forward archive_contents callback.
+ *
+ * Code that wants to use this should initalize its own bbs_buffer and
+ * bbs_buffer_length fields to the values from the successor sink. In cases
+ * where the buffer isn't shared, the data needs to be copied before forwarding
+ * the callback. We don't do try to do that here, because there's really no
+ * reason to have separately allocated buffers containing the same identical
+ * data.
+ */
+void
+bbsink_forward_archive_contents(bbsink *sink, size_t len)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
+ Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
+ bbsink_archive_contents(sink->bbs_next, len);
+}
+
+/*
+ * Forward end_archive callback.
+ */
+void
+bbsink_forward_end_archive(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_archive(sink->bbs_next);
+}
+
+/*
+ * Forward begin_manifest callback.
+ */
+void
+bbsink_forward_begin_manifest(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_manifest(sink->bbs_next);
+}
+
+/*
+ * Forward manifest_contents callback.
+ *
+ * As with the archive_contents callback, it's expected that the buffer is
+ * shared.
+ */
+void
+bbsink_forward_manifest_contents(bbsink *sink, size_t len)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
+ Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
+ bbsink_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * Forward end_manifest callback.
+ */
+void
+bbsink_forward_end_manifest(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_manifest(sink->bbs_next);
+}
+
+/*
+ * Forward end_backup callback.
+ */
+void
+bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_backup(sink->bbs_next, endptr, endtli);
+}
+
+/*
+ * Forward cleanup callback.
+ */
+void
+bbsink_forward_cleanup(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_cleanup(sink->bbs_next);
+}
diff --git a/src/backend/replication/basebackup_throttle.c b/src/backend/replication/basebackup_throttle.c
new file mode 100644
index 00000000000..f163931f8a3
--- /dev/null
+++ b/src/backend/replication/basebackup_throttle.c
@@ -0,0 +1,199 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_throttle.c
+ * Basebackup sink implementing throttling. Data is forwarded to the
+ * next base backup sink in the chain at a rate no greater than the
+ * configured maximum.
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/basebackup_throttle.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "replication/basebackup_sink.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "utils/timestamp.h"
+
+typedef struct bbsink_throttle
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* The actual number of bytes, transfer of which may cause sleep. */
+ uint64 throttling_sample;
+
+ /* Amount of data already transferred but not yet throttled. */
+ int64 throttling_counter;
+
+ /* The minimum time required to transfer throttling_sample bytes. */
+ TimeOffset elapsed_min_unit;
+
+ /* The last check of the transfer rate. */
+ TimestampTz throttled_last;
+} bbsink_throttle;
+
+static void bbsink_throttle_begin_backup(bbsink *sink);
+static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
+static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
+static void throttle(bbsink_throttle *sink, size_t increment);
+
+const bbsink_ops bbsink_throttle_ops = {
+ .begin_backup = bbsink_throttle_begin_backup,
+ .begin_archive = bbsink_forward_begin_archive,
+ .archive_contents = bbsink_throttle_archive_contents,
+ .end_archive = bbsink_forward_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_throttle_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ */
+#define THROTTLING_FREQUENCY 8
+
+/*
+ * Create a new basebackup sink that performs throttling and forwards data
+ * to a successor sink.
+ */
+bbsink *
+bbsink_throttle_new(bbsink *next, uint32 maxrate)
+{
+ bbsink_throttle *sink;
+
+ Assert(next != NULL);
+ Assert(maxrate > 0);
+
+ sink = palloc0(sizeof(bbsink_throttle));
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
+ sink->base.bbs_next = next;
+
+ sink->throttling_sample =
+ (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
+
+ /*
+ * The minimum amount of time for throttling_sample bytes to be
+ * transferred.
+ */
+ sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+ return &sink->base;
+}
+
+/*
+ * There's no real work to do here, but we need to record the current time so
+ * that it can be used for future calculations.
+ */
+static void
+bbsink_throttle_begin_backup(bbsink *sink)
+{
+ bbsink_throttle *mysink = (bbsink_throttle *) sink;
+
+ bbsink_forward_begin_backup(sink);
+
+ /* The 'real data' starts now (header was ignored). */
+ mysink->throttled_last = GetCurrentTimestamp();
+}
+
+/*
+ * First throttle, and then pass archive contents to next sink.
+ */
+static void
+bbsink_throttle_archive_contents(bbsink *sink, size_t len)
+{
+ throttle((bbsink_throttle *) sink, len);
+
+ bbsink_forward_archive_contents(sink, len);
+}
+
+/*
+ * First throttle, and then pass manifest contents to next sink.
+ */
+static void
+bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
+{
+ throttle((bbsink_throttle *) sink, len);
+
+ bbsink_forward_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ */
+static void
+throttle(bbsink_throttle *sink, size_t increment)
+{
+ TimeOffset elapsed_min;
+
+ Assert(sink->throttling_counter >= 0);
+
+ sink->throttling_counter += increment;
+ if (sink->throttling_counter < sink->throttling_sample)
+ return;
+
+ /* How much time should have elapsed at minimum? */
+ elapsed_min = sink->elapsed_min_unit *
+ (sink->throttling_counter / sink->throttling_sample);
+
+ /*
+ * Since the latch could be set repeatedly because of concurrently WAL
+ * activity, sleep in a loop to ensure enough time has passed.
+ */
+ for (;;)
+ {
+ TimeOffset elapsed,
+ sleep;
+ int wait_result;
+
+ /* Time elapsed since the last measurement (and possible wake up). */
+ elapsed = GetCurrentTimestamp() - sink->throttled_last;
+
+ /* sleep if the transfer is faster than it should be */
+ sleep = elapsed_min - elapsed;
+ if (sleep <= 0)
+ break;
+
+ ResetLatch(MyLatch);
+
+ /* We're eating a potentially set latch, so check for interrupts */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+ * the maximum time to sleep. Thus the cast to long is safe.
+ */
+ wait_result = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ (long) (sleep / 1000),
+ WAIT_EVENT_BASE_BACKUP_THROTTLE);
+
+ if (wait_result & WL_LATCH_SET)
+ CHECK_FOR_INTERRUPTS();
+
+ /* Done waiting? */
+ if (wait_result & WL_TIMEOUT)
+ break;
+ }
+
+ /*
+ * As we work with integers, only whole multiple of throttling_sample was
+ * processed. The rest will be done during the next call of this function.
+ */
+ sink->throttling_counter %= sink->throttling_sample;
+
+ /*
+ * Time interval for the remaining amount and possible next increments
+ * starts now.
+ */
+ sink->throttled_last = GetCurrentTimestamp();
+}
diff --git a/src/include/replication/backup_manifest.h b/src/include/replication/backup_manifest.h
index 099108910ce..16ed7eec9bb 100644
--- a/src/include/replication/backup_manifest.h
+++ b/src/include/replication/backup_manifest.h
@@ -12,9 +12,9 @@
#ifndef BACKUP_MANIFEST_H
#define BACKUP_MANIFEST_H
-#include "access/xlogdefs.h"
#include "common/checksum_helper.h"
#include "pgtime.h"
+#include "replication/basebackup_sink.h"
#include "storage/buffile.h"
typedef enum manifest_option
@@ -47,7 +47,8 @@ extern void AddWALInfoToBackupManifest(backup_manifest_info *manifest,
XLogRecPtr startptr,
TimeLineID starttli, XLogRecPtr endptr,
TimeLineID endtli);
-extern void SendBackupManifest(backup_manifest_info *manifest);
+
+extern void SendBackupManifest(backup_manifest_info *manifest, bbsink *sink);
extern void FreeBackupManifest(backup_manifest_info *manifest);
#endif
diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h
new file mode 100644
index 00000000000..e6c073c5674
--- /dev/null
+++ b/src/include/replication/basebackup_sink.h
@@ -0,0 +1,296 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_sink.h
+ * API for filtering or sending to a final destination the archives
+ * produced by the base backup process
+ *
+ * Taking a base backup produces one archive per tablespace directory,
+ * plus a backup manifest unless that feature has been disabled. The
+ * goal of the backup process is to put those archives and that manifest
+ * someplace, possibly after postprocessing them in some way. A 'bbsink'
+ * is an object to which those archives, and the manifest if present,
+ * can be sent.
+ *
+ * In practice, there will be a chain of 'bbsink' objects rather than
+ * just one, with callbacks being forwarded from one to the next,
+ * possibly with modification. Each object is responsible for a
+ * single task e.g. command progress reporting, throttling, or
+ * communication with the client.
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * src/include/replication/basebackup_sink.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BASEBACKUP_SINK_H
+#define BASEBACKUP_SINK_H
+
+#include "access/xlog_internal.h"
+#include "nodes/pg_list.h"
+
+/* Forward declarations. */
+struct bbsink;
+struct bbsink_ops;
+typedef struct bbsink bbsink;
+typedef struct bbsink_ops bbsink_ops;
+
+/*
+ * Overall backup state shared by all bbsink objects for a backup.
+ *
+ * Before calling bbstate_begin_backup, caller must initiate a bbsink_state
+ * object which will last for the lifetime of the backup, and must thereafter
+ * update it as required before each new call to a bbsink method. The bbsink
+ * will retain a pointer to the state object and will consult it to understand
+ * the progress of the backup.
+ *
+ * 'tablespaces' is a list of tablespaceinfo objects. It must be set before
+ * calling bbstate_begin_backup() and must not be modified thereafter.
+ *
+ * 'tablespace_num' is the index of the current tablespace within the list
+ * stored in 'tablespaces'.
+ *
+ * 'bytes_done' is the number of bytes read so far from $PGDATA.
+ *
+ * 'bytes_total' is the total number of bytes estimated to be present in
+ * $PGDATA, if we have estimated this.
+ *
+ * 'bytes_total_is_valid' is true if and only if a proper estimate has been
+ * stored into 'bytes_total'.
+ *
+ * 'startptr' and 'starttli' identify the point in the WAL stream at which
+ * the backup began. They must be set before calling bbstate_begin_backup()
+ * and must not be modified thereafter.
+ */
+typedef struct bbsink_state
+{
+ List *tablespaces;
+ int tablespace_num;
+ uint64 bytes_done;
+ uint64 bytes_total;
+ bool bytes_total_is_valid;
+ XLogRecPtr startptr;
+ TimeLineID starttli;
+} bbsink_state;
+
+/*
+ * Common data for any type of basebackup sink.
+ *
+ * 'bbs_ops' is the relevant callback table.
+ *
+ * 'bbs_buffer' is the buffer into which data destined for the bbsink
+ * should be stored. It must be a multiple of BLCKSZ.
+ *
+ * 'bbs_buffer_length' is the allocated length of the buffer.
+ *
+ * 'bbs_next' is a pointer to another bbsink to which this bbsink is
+ * forwarding some or all operations.
+ *
+ * 'bbs_state' is a pointer to the bbsink_state object for this backup.
+ * Every bbsink associated with this backup should point to the same
+ * underlying state object.
+ *
+ * In general it is expected that the values of these fields are set when
+ * a bbsink is created and that they do not change thereafter. It's OK
+ * to modify the data to which bbs_buffer or bbs_state point, but no changes
+ * should be made to the contents of this struct.
+ */
+struct bbsink
+{
+ const bbsink_ops *bbs_ops;
+ char *bbs_buffer;
+ size_t bbs_buffer_length;
+ bbsink *bbs_next;
+ bbsink_state *bbs_state;
+};
+
+/*
+ * Callbacks for a base backup sink.
+ *
+ * All of these callbacks are required. If a particular callback just needs to
+ * forward the call to sink->bbs_next, use bbsink_forward_<callback_name> as
+ * the callback.
+ *
+ * Callers should always invoke these callbacks via the bbsink_* inline
+ * functions rather than calling them directly.
+ */
+struct bbsink_ops
+{
+ /*
+ * This callback is invoked just once, at the very start of the backup. It
+ * must set bbs_buffer to point to a chunk of storage where at least
+ * bbs_buffer_length bytes of data can be written.
+ */
+ void (*begin_backup) (bbsink *sink);
+
+ /*
+ * For each archive transmitted to a bbsink, there will be one call to the
+ * begin_archive() callback, some number of calls to the
+ * archive_contents() callback, and then one call to the end_archive()
+ * callback.
+ *
+ * Before invoking the archive_contents() callback, the caller should copy
+ * a number of bytes equal to what will be passed as len into bbs_buffer,
+ * but not more than bbs_buffer_length.
+ *
+ * It's generally good if the buffer is as full as possible before the
+ * archive_contents() callback is invoked, but it's not worth expending
+ * extra cycles to make sure it's absolutely 100% full.
+ */
+ void (*begin_archive) (bbsink *sink, const char *archive_name);
+ void (*archive_contents) (bbsink *sink, size_t len);
+ void (*end_archive) (bbsink *sink);
+
+ /*
+ * If a backup manifest is to be transmitted to a bbsink, there will be
+ * one call to the begin_manifest() callback, some number of calls to the
+ * manifest_contents() callback, and then one call to the end_manifest()
+ * callback. These calls will occur after all archives are transmitted.
+ *
+ * The rules for invoking the manifest_contents() callback are the same as
+ * for the archive_contents() callback above.
+ */
+ void (*begin_manifest) (bbsink *sink);
+ void (*manifest_contents) (bbsink *sink, size_t len);
+ void (*end_manifest) (bbsink *sink);
+
+ /*
+ * This callback is invoked just once, after all archives and the manifest
+ * have been sent.
+ */
+ void (*end_backup) (bbsink *sink, XLogRecPtr endptr, TimeLineID endtli);
+
+ /*
+ * If a backup is aborted by an error, this callback is invoked before the
+ * bbsink object is destroyed, so that it can release any resources that
+ * would not be released automatically. If no error occurs, this callback
+ * is invoked after the end_backup callback.
+ */
+ void (*cleanup) (bbsink *sink);
+};
+
+/* Begin a backup. */
+static inline void
+bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
+{
+ Assert(sink != NULL);
+
+ Assert(buffer_length > 0);
+
+ sink->bbs_state = state;
+ sink->bbs_buffer_length = buffer_length;
+ sink->bbs_ops->begin_backup(sink);
+
+ Assert(sink->bbs_buffer != NULL);
+ Assert((sink->bbs_buffer_length % BLCKSZ) == 0);
+}
+
+/* Begin an archive. */
+static inline void
+bbsink_begin_archive(bbsink *sink, const char *archive_name)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->begin_archive(sink, archive_name);
+}
+
+/* Process some of the contents of an archive. */
+static inline void
+bbsink_archive_contents(bbsink *sink, size_t len)
+{
+ Assert(sink != NULL);
+
+ /*
+ * The caller should make a reasonable attempt to fill the buffer before
+ * calling this function, so it shouldn't be completely empty. Nor should
+ * it be filled beyond capacity.
+ */
+ Assert(len > 0 && len <= sink->bbs_buffer_length);
+
+ sink->bbs_ops->archive_contents(sink, len);
+}
+
+/* Finish an archive. */
+static inline void
+bbsink_end_archive(bbsink *sink)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->end_archive(sink);
+}
+
+/* Begin the backup manifest. */
+static inline void
+bbsink_begin_manifest(bbsink *sink)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->begin_manifest(sink);
+}
+
+/* Process some of the manifest contents. */
+static inline void
+bbsink_manifest_contents(bbsink *sink, size_t len)
+{
+ Assert(sink != NULL);
+
+ /* See comments in bbsink_archive_contents. */
+ Assert(len > 0 && len <= sink->bbs_buffer_length);
+
+ sink->bbs_ops->manifest_contents(sink, len);
+}
+
+/* Finish the backup manifest. */
+static inline void
+bbsink_end_manifest(bbsink *sink)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->end_manifest(sink);
+}
+
+/* Finish a backup. */
+static inline void
+bbsink_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
+{
+ Assert(sink != NULL);
+ Assert(sink->bbs_state->tablespace_num == list_length(sink->bbs_state->tablespaces));
+
+ sink->bbs_ops->end_backup(sink, endptr, endtli);
+}
+
+/* Release resources before destruction. */
+static inline void
+bbsink_cleanup(bbsink *sink)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->cleanup(sink);
+}
+
+/* Forwarding callbacks. Use these to pass operations through to next sink. */
+extern void bbsink_forward_begin_backup(bbsink *sink);
+extern void bbsink_forward_begin_archive(bbsink *sink,
+ const char *archive_name);
+extern void bbsink_forward_archive_contents(bbsink *sink, size_t len);
+extern void bbsink_forward_end_archive(bbsink *sink);
+extern void bbsink_forward_begin_manifest(bbsink *sink);
+extern void bbsink_forward_manifest_contents(bbsink *sink, size_t len);
+extern void bbsink_forward_end_manifest(bbsink *sink);
+extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+extern void bbsink_forward_cleanup(bbsink *sink);
+
+/* Constructors for various types of sinks. */
+extern bbsink *bbsink_copytblspc_new(void);
+extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
+extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);
+
+/* Extra interface functions for progress reporting. */
+extern void basebackup_progress_wait_checkpoint(void);
+extern void basebackup_progress_estimate_backup_size(void);
+extern void basebackup_progress_wait_wal_archive(bbsink_state *);
+extern void basebackup_progress_transfer_wal(void);
+extern void basebackup_progress_done(void);
+
+#endif