aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/Makefile4
-rw-r--r--src/backend/replication/backup_manifest.c28
-rw-r--r--src/backend/replication/basebackup.c692
-rw-r--r--src/backend/replication/basebackup_copy.c335
-rw-r--r--src/backend/replication/basebackup_progress.c246
-rw-r--r--src/backend/replication/basebackup_sink.c125
-rw-r--r--src/backend/replication/basebackup_throttle.c199
-rw-r--r--src/include/replication/backup_manifest.h5
-rw-r--r--src/include/replication/basebackup_sink.h296
9 files changed, 1414 insertions, 516 deletions
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index a0381e52f31..74b97cf126a 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -17,6 +17,10 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
OBJS = \
backup_manifest.o \
basebackup.o \
+ basebackup_copy.o \
+ basebackup_progress.o \
+ basebackup_sink.o \
+ basebackup_throttle.o \
repl_gram.o \
slot.o \
slotfuncs.o \
diff --git a/src/backend/replication/backup_manifest.c b/src/backend/replication/backup_manifest.c
index 04ca455ace8..4fe11a3b5cd 100644
--- a/src/backend/replication/backup_manifest.c
+++ b/src/backend/replication/backup_manifest.c
@@ -17,6 +17,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "replication/backup_manifest.h"
+#include "replication/basebackup_sink.h"
#include "utils/builtins.h"
#include "utils/json.h"
@@ -310,9 +311,8 @@ AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr,
* Finalize the backup manifest, and send it to the client.
*/
void
-SendBackupManifest(backup_manifest_info *manifest)
+SendBackupManifest(backup_manifest_info *manifest, bbsink *sink)
{
- StringInfoData protobuf;
uint8 checksumbuf[PG_SHA256_DIGEST_LENGTH];
char checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH];
size_t manifest_bytes_done = 0;
@@ -352,38 +352,28 @@ SendBackupManifest(backup_manifest_info *manifest)
(errcode_for_file_access(),
errmsg("could not rewind temporary file")));
- /* Send CopyOutResponse message */
- pq_beginmessage(&protobuf, 'H');
- pq_sendbyte(&protobuf, 0); /* overall format */
- pq_sendint16(&protobuf, 0); /* natts */
- pq_endmessage(&protobuf);
/*
- * Send CopyData messages.
- *
- * We choose to read back the data from the temporary file in chunks of
- * size BLCKSZ; this isn't necessary, but buffile.c uses that as the I/O
- * size, so it seems to make sense to match that value here.
+ * Send the backup manifest.
*/
+ bbsink_begin_manifest(sink);
while (manifest_bytes_done < manifest->manifest_size)
{
- char manifestbuf[BLCKSZ];
size_t bytes_to_read;
size_t rc;
- bytes_to_read = Min(sizeof(manifestbuf),
+ bytes_to_read = Min(sink->bbs_buffer_length,
manifest->manifest_size - manifest_bytes_done);
- rc = BufFileRead(manifest->buffile, manifestbuf, bytes_to_read);
+ rc = BufFileRead(manifest->buffile, sink->bbs_buffer,
+ bytes_to_read);
if (rc != bytes_to_read)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from temporary file: %m")));
- pq_putmessage('d', manifestbuf, bytes_to_read);
+ bbsink_manifest_contents(sink, bytes_to_read);
manifest_bytes_done += bytes_to_read;
}
-
- /* No more data, so send CopyDone message */
- pq_putemptymessage('c');
+ bbsink_end_manifest(sink);
/* Release resources */
BufFileClose(manifest->buffile);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index b7359f43903..38c82c46196 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -17,13 +17,9 @@
#include <time.h>
#include "access/xlog_internal.h" /* for pg_start/stop_backup */
-#include "catalog/pg_type.h"
#include "common/file_perm.h"
#include "commands/defrem.h"
-#include "commands/progress.h"
#include "lib/stringinfo.h"
-#include "libpq/libpq.h"
-#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "pgstat.h"
@@ -31,6 +27,7 @@
#include "port.h"
#include "postmaster/syslogger.h"
#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
#include "replication/backup_manifest.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
@@ -46,6 +43,16 @@
#include "utils/resowner.h"
#include "utils/timestamp.h"
+/*
+ * How much data do we want to send in one CopyData message? Note that
+ * this may also result in reading the underlying files in chunks of this
+ * size.
+ *
+ * NB: The buffer size is required to be a multiple of the system block
+ * size, so use that value instead if it's bigger than our preference.
+ */
+#define SINK_BUFFER_LENGTH Max(32768, BLCKSZ)
+
typedef struct
{
const char *label;
@@ -59,27 +66,25 @@ typedef struct
pg_checksum_type manifest_checksum_type;
} basebackup_options;
-static int64 sendTablespace(char *path, char *oid, bool sizeonly,
+static int64 sendTablespace(bbsink *sink, char *path, char *oid, bool sizeonly,
struct backup_manifest_info *manifest);
-static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
+static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
List *tablespaces, bool sendtblspclinks,
backup_manifest_info *manifest, const char *spcoid);
-static bool sendFile(const char *readfilename, const char *tarfilename,
+static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid,
backup_manifest_info *manifest, const char *spcoid);
-static void sendFileWithContent(const char *filename, const char *content,
+static void sendFileWithContent(bbsink *sink, const char *filename,
+ const char *content,
backup_manifest_info *manifest);
-static int64 _tarWriteHeader(const char *filename, const char *linktarget,
- struct stat *statbuf, bool sizeonly);
+static int64 _tarWriteHeader(bbsink *sink, const char *filename,
+ const char *linktarget, struct stat *statbuf,
+ bool sizeonly);
+static void _tarWritePadding(bbsink *sink, int len);
static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf);
-static void send_int8_string(StringInfoData *buf, int64 intval);
-static void SendBackupHeader(List *tablespaces);
-static void perform_base_backup(basebackup_options *opt);
+static void perform_base_backup(basebackup_options *opt, bbsink *sink);
static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const ListCell *a, const ListCell *b);
-static void throttle(size_t increment);
-static void update_basebackup_progress(int64 delta);
static bool is_checksummed_file(const char *fullpath, const char *filename);
static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
const char *filename, bool partial_read_ok);
@@ -90,31 +95,6 @@ static bool backup_started_in_recovery = false;
/* Relative path of temporary statistics directory */
static char *statrelpath = NULL;
-/*
- * Size of each block sent into the tar stream for larger files.
- */
-#define TAR_SEND_SIZE 32768
-
-/*
- * How frequently to throttle, as a fraction of the specified rate-second.
- */
-#define THROTTLING_FREQUENCY 8
-
-/* The actual number of bytes, transfer of which may cause sleep. */
-static uint64 throttling_sample;
-
-/* Amount of data already transferred but not yet throttled. */
-static int64 throttling_counter;
-
-/* The minimum time required to transfer throttling_sample bytes. */
-static TimeOffset elapsed_min_unit;
-
-/* The last check of the transfer rate. */
-static TimestampTz throttled_last;
-
-/* The starting XLOG position of the base backup. */
-static XLogRecPtr startptr;
-
/* Total number of checksum failures during base backup. */
static long long int total_checksum_failures;
@@ -122,15 +102,6 @@ static long long int total_checksum_failures;
static bool noverify_checksums = false;
/*
- * Total amount of backup data that will be streamed.
- * -1 means that the size is not estimated.
- */
-static int64 backup_total = 0;
-
-/* Amount of backup data already streamed */
-static int64 backup_streamed = 0;
-
-/*
* Definition of one element part of an exclusion list, used for paths part
* of checksum validation or base backups. "name" is the name of the file
* or path to check for exclusion. If "match_prefix" is true, any items
@@ -253,32 +224,22 @@ static const struct exclude_list_item noChecksumFiles[] = {
* clobbered by longjmp" from stupider versions of gcc.
*/
static void
-perform_base_backup(basebackup_options *opt)
+perform_base_backup(basebackup_options *opt, bbsink *sink)
{
- TimeLineID starttli;
+ bbsink_state state;
XLogRecPtr endptr;
TimeLineID endtli;
StringInfo labelfile;
StringInfo tblspc_map_file;
backup_manifest_info manifest;
int datadirpathlen;
- List *tablespaces = NIL;
- backup_total = 0;
- backup_streamed = 0;
- pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
-
- /*
- * If the estimation of the total backup size is disabled, make the
- * backup_total column in the view return NULL by setting the parameter to
- * -1.
- */
- if (!opt->progress)
- {
- backup_total = -1;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL,
- backup_total);
- }
+ /* Initial backup state, insofar as we know it now. */
+ state.tablespaces = NIL;
+ state.tablespace_num = 0;
+ state.bytes_done = 0;
+ state.bytes_total = 0;
+ state.bytes_total_is_valid = false;
/* we're going to use a BufFile, so we need a ResourceOwner */
Assert(CurrentResourceOwner == NULL);
@@ -295,11 +256,11 @@ perform_base_backup(basebackup_options *opt)
total_checksum_failures = 0;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
- startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
- labelfile, &tablespaces,
- tblspc_map_file);
+ basebackup_progress_wait_checkpoint();
+ state.startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint,
+ &state.starttli,
+ labelfile, &state.tablespaces,
+ tblspc_map_file);
/*
* Once do_pg_start_backup has been called, ensure that any failure causes
@@ -312,7 +273,6 @@ perform_base_backup(basebackup_options *opt)
{
ListCell *lc;
tablespaceinfo *ti;
- int tblspc_streamed = 0;
/*
* Calculate the relative path of temporary statistics directory in
@@ -329,7 +289,7 @@ perform_base_backup(basebackup_options *opt)
/* Add a node for the base directory at the end */
ti = palloc0(sizeof(tablespaceinfo));
ti->size = -1;
- tablespaces = lappend(tablespaces, ti);
+ state.tablespaces = lappend(state.tablespaces, ti);
/*
* Calculate the total backup size by summing up the size of each
@@ -337,100 +297,53 @@ perform_base_backup(basebackup_options *opt)
*/
if (opt->progress)
{
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
+ basebackup_progress_estimate_backup_size();
- foreach(lc, tablespaces)
+ foreach(lc, state.tablespaces)
{
tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc);
if (tmp->path == NULL)
- tmp->size = sendDir(".", 1, true, tablespaces, true, NULL,
- NULL);
+ tmp->size = sendDir(sink, ".", 1, true, state.tablespaces,
+ true, NULL, NULL);
else
- tmp->size = sendTablespace(tmp->path, tmp->oid, true,
+ tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true,
NULL);
- backup_total += tmp->size;
+ state.bytes_total += tmp->size;
}
+ state.bytes_total_is_valid = true;
}
- /* Report that we are now streaming database files as a base backup */
- {
- const int index[] = {
- PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_BACKUP_TOTAL,
- PROGRESS_BASEBACKUP_TBLSPC_TOTAL
- };
- const int64 val[] = {
- PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP,
- backup_total, list_length(tablespaces)
- };
-
- pgstat_progress_update_multi_param(3, index, val);
- }
-
- /* Send the starting position of the backup */
- SendXlogRecPtrResult(startptr, starttli);
-
- /* Send tablespace header */
- SendBackupHeader(tablespaces);
-
- /* Setup and activate network throttling, if client requested it */
- if (opt->maxrate > 0)
- {
- throttling_sample =
- (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
-
- /*
- * The minimum amount of time for throttling_sample bytes to be
- * transferred.
- */
- elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
-
- /* Enable throttling. */
- throttling_counter = 0;
-
- /* The 'real data' starts now (header was ignored). */
- throttled_last = GetCurrentTimestamp();
- }
- else
- {
- /* Disable throttling. */
- throttling_counter = -1;
- }
+ /* notify basebackup sink about start of backup */
+ bbsink_begin_backup(sink, &state, SINK_BUFFER_LENGTH);
/* Send off our tablespaces one by one */
- foreach(lc, tablespaces)
+ foreach(lc, state.tablespaces)
{
tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
- StringInfoData buf;
-
- /* Send CopyOutResponse message */
- pq_beginmessage(&buf, 'H');
- pq_sendbyte(&buf, 0); /* overall format */
- pq_sendint16(&buf, 0); /* natts */
- pq_endmessage(&buf);
if (ti->path == NULL)
{
struct stat statbuf;
bool sendtblspclinks = true;
+ bbsink_begin_archive(sink, "base.tar");
+
/* In the main tar, include the backup_label first... */
- sendFileWithContent(BACKUP_LABEL_FILE, labelfile->data,
+ sendFileWithContent(sink, BACKUP_LABEL_FILE, labelfile->data,
&manifest);
/* Then the tablespace_map file, if required... */
if (opt->sendtblspcmapfile)
{
- sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data,
+ sendFileWithContent(sink, TABLESPACE_MAP, tblspc_map_file->data,
&manifest);
sendtblspclinks = false;
}
/* Then the bulk of the files... */
- sendDir(".", 1, false, tablespaces, sendtblspclinks,
- &manifest, NULL);
+ sendDir(sink, ".", 1, false, state.tablespaces,
+ sendtblspclinks, &manifest, NULL);
/* ... and pg_control after everything else. */
if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
@@ -438,32 +351,33 @@ perform_base_backup(basebackup_options *opt)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m",
XLOG_CONTROL_FILE)));
- sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
+ sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
false, InvalidOid, &manifest, NULL);
}
else
- sendTablespace(ti->path, ti->oid, false, &manifest);
+ {
+ char *archive_name = psprintf("%s.tar", ti->oid);
+
+ bbsink_begin_archive(sink, archive_name);
+
+ sendTablespace(sink, ti->path, ti->oid, false, &manifest);
+ }
/*
* If we're including WAL, and this is the main data directory we
- * don't terminate the tar stream here. Instead, we will append
- * the xlog files below and terminate it then. This is safe since
- * the main data directory is always sent *last*.
+ * don't treat this as the end of the tablespace. Instead, we will
+ * include the xlog files below and stop afterwards. This is safe
+ * since the main data directory is always sent *last*.
*/
if (opt->includewal && ti->path == NULL)
{
- Assert(lnext(tablespaces, lc) == NULL);
+ Assert(lnext(state.tablespaces, lc) == NULL);
}
else
- pq_putemptymessage('c'); /* CopyDone */
-
- tblspc_streamed++;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
- tblspc_streamed);
+ bbsink_end_archive(sink);
}
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE);
+ basebackup_progress_wait_wal_archive(&state);
endptr = do_pg_stop_backup(labelfile->data, !opt->nowait, &endtli);
}
PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
@@ -489,8 +403,7 @@ perform_base_backup(basebackup_options *opt)
ListCell *lc;
TimeLineID tli;
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
+ basebackup_progress_transfer_wal();
/*
* I'd rather not worry about timelines here, so scan pg_wal and
@@ -501,8 +414,8 @@ perform_base_backup(basebackup_options *opt)
* shouldn't be such files, but if there are, there's little harm in
* including them.
*/
- XLByteToSeg(startptr, startsegno, wal_segment_size);
- XLogFileName(firstoff, starttli, startsegno, wal_segment_size);
+ XLByteToSeg(state.startptr, startsegno, wal_segment_size);
+ XLogFileName(firstoff, state.starttli, startsegno, wal_segment_size);
XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
XLogFileName(lastoff, endtli, endsegno, wal_segment_size);
@@ -528,7 +441,7 @@ perform_base_backup(basebackup_options *opt)
* Before we go any further, check that none of the WAL segments we
* need were removed.
*/
- CheckXLogRemoved(startsegno, starttli);
+ CheckXLogRemoved(startsegno, state.starttli);
/*
* Sort the WAL filenames. We want to send the files in order from
@@ -555,7 +468,7 @@ perform_base_backup(basebackup_options *opt)
{
char startfname[MAXFNAMELEN];
- XLogFileName(startfname, starttli, startsegno,
+ XLogFileName(startfname, state.starttli, startsegno,
wal_segment_size);
ereport(ERROR,
(errmsg("could not find WAL file \"%s\"", startfname)));
@@ -590,7 +503,6 @@ perform_base_backup(basebackup_options *opt)
{
char *walFileName = (char *) lfirst(lc);
int fd;
- char buf[TAR_SEND_SIZE];
size_t cnt;
pgoff_t len = 0;
@@ -629,22 +541,17 @@ perform_base_backup(basebackup_options *opt)
}
/* send the WAL file itself */
- _tarWriteHeader(pathbuf, NULL, &statbuf, false);
+ _tarWriteHeader(sink, pathbuf, NULL, &statbuf, false);
- while ((cnt = basebackup_read_file(fd, buf,
- Min(sizeof(buf),
+ while ((cnt = basebackup_read_file(fd, sink->bbs_buffer,
+ Min(sink->bbs_buffer_length,
wal_segment_size - len),
len, pathbuf, true)) > 0)
{
CheckXLogRemoved(segno, tli);
- /* Send the chunk as a CopyData message */
- if (pq_putmessage('d', buf, cnt))
- ereport(ERROR,
- (errmsg("base backup could not send data, aborting backup")));
- update_basebackup_progress(cnt);
+ bbsink_archive_contents(sink, cnt);
len += cnt;
- throttle(cnt);
if (len == wal_segment_size)
break;
@@ -673,7 +580,7 @@ perform_base_backup(basebackup_options *opt)
* complete segment.
*/
StatusFilePath(pathbuf, walFileName, ".done");
- sendFileWithContent(pathbuf, "", &manifest);
+ sendFileWithContent(sink, pathbuf, "", &manifest);
}
/*
@@ -696,23 +603,23 @@ perform_base_backup(basebackup_options *opt)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m", pathbuf)));
- sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid,
+ sendFile(sink, pathbuf, pathbuf, &statbuf, false, InvalidOid,
&manifest, NULL);
/* unconditionally mark file as archived */
StatusFilePath(pathbuf, fname, ".done");
- sendFileWithContent(pathbuf, "", &manifest);
+ sendFileWithContent(sink, pathbuf, "", &manifest);
}
- /* Send CopyDone message for the last tar file */
- pq_putemptymessage('c');
+ bbsink_end_archive(sink);
}
- AddWALInfoToBackupManifest(&manifest, startptr, starttli, endptr, endtli);
+ AddWALInfoToBackupManifest(&manifest, state.startptr, state.starttli,
+ endptr, endtli);
- SendBackupManifest(&manifest);
+ SendBackupManifest(&manifest, sink);
- SendXlogRecPtrResult(endptr, endtli);
+ bbsink_end_backup(sink, endptr, endtli);
if (total_checksum_failures)
{
@@ -738,7 +645,7 @@ perform_base_backup(basebackup_options *opt)
/* clean up the resource owner we created */
WalSndResourceCleanup(true);
- pgstat_progress_end_command();
+ basebackup_progress_done();
}
/*
@@ -943,6 +850,7 @@ void
SendBaseBackup(BaseBackupCmd *cmd)
{
basebackup_options opt;
+ bbsink *sink;
parse_basebackup_options(cmd->options, &opt);
@@ -957,158 +865,40 @@ SendBaseBackup(BaseBackupCmd *cmd)
set_ps_display(activitymsg);
}
- perform_base_backup(&opt);
-}
-
-static void
-send_int8_string(StringInfoData *buf, int64 intval)
-{
- char is[32];
-
- sprintf(is, INT64_FORMAT, intval);
- pq_sendint32(buf, strlen(is));
- pq_sendbytes(buf, is, strlen(is));
-}
-
-static void
-SendBackupHeader(List *tablespaces)
-{
- StringInfoData buf;
- ListCell *lc;
-
- /* Construct and send the directory information */
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 3); /* 3 fields */
-
- /* First field - spcoid */
- pq_sendstring(&buf, "spcoid");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, OIDOID); /* type oid */
- pq_sendint16(&buf, 4); /* typlen */
- pq_sendint32(&buf, 0); /* typmod */
- pq_sendint16(&buf, 0); /* format code */
-
- /* Second field - spclocation */
- pq_sendstring(&buf, "spclocation");
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_sendint32(&buf, TEXTOID);
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
-
- /* Third field - size */
- pq_sendstring(&buf, "size");
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_sendint32(&buf, INT8OID);
- pq_sendint16(&buf, 8);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_endmessage(&buf);
-
- foreach(lc, tablespaces)
- {
- tablespaceinfo *ti = lfirst(lc);
-
- /* Send one datarow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint16(&buf, 3); /* number of columns */
- if (ti->path == NULL)
- {
- pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
- pq_sendint32(&buf, -1);
- }
- else
- {
- Size len;
-
- len = strlen(ti->oid);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, ti->oid, len);
-
- len = strlen(ti->path);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, ti->path, len);
- }
- if (ti->size >= 0)
- send_int8_string(&buf, ti->size / 1024);
- else
- pq_sendint32(&buf, -1); /* NULL */
+ /* Create a basic basebackup sink. */
+ sink = bbsink_copytblspc_new();
- pq_endmessage(&buf);
- }
+ /* Set up network throttling, if client requested it */
+ if (opt.maxrate > 0)
+ sink = bbsink_throttle_new(sink, opt.maxrate);
- /* Send a CommandComplete message */
- pq_puttextmessage('C', "SELECT");
-}
-
-/*
- * Send a single resultset containing just a single
- * XLogRecPtr record (in text format)
- */
-static void
-SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
-{
- StringInfoData buf;
- char str[MAXFNAMELEN];
- Size len;
-
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint16(&buf, 2); /* 2 fields */
-
- /* Field headers */
- pq_sendstring(&buf, "recptr");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
- pq_sendint32(&buf, TEXTOID); /* type oid */
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
-
- pq_sendstring(&buf, "tli");
- pq_sendint32(&buf, 0); /* table oid */
- pq_sendint16(&buf, 0); /* attnum */
+ /* Set up progress reporting. */
+ sink = bbsink_progress_new(sink, opt.progress);
/*
- * int8 may seem like a surprising data type for this, but in theory int4
- * would not be wide enough for this, as TimeLineID is unsigned.
+ * Perform the base backup, but make sure we clean up the bbsink even if
+ * an error occurs.
*/
- pq_sendint32(&buf, INT8OID); /* type oid */
- pq_sendint16(&buf, -1);
- pq_sendint32(&buf, 0);
- pq_sendint16(&buf, 0);
- pq_endmessage(&buf);
-
- /* Data row */
- pq_beginmessage(&buf, 'D');
- pq_sendint16(&buf, 2); /* number of columns */
-
- len = snprintf(str, sizeof(str),
- "%X/%X", LSN_FORMAT_ARGS(ptr));
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, str, len);
-
- len = snprintf(str, sizeof(str), "%u", tli);
- pq_sendint32(&buf, len);
- pq_sendbytes(&buf, str, len);
-
- pq_endmessage(&buf);
-
- /* Send a CommandComplete message */
- pq_puttextmessage('C', "SELECT");
+ PG_TRY();
+ {
+ perform_base_backup(&opt, sink);
+ }
+ PG_FINALLY();
+ {
+ bbsink_cleanup(sink);
+ }
+ PG_END_TRY();
}
/*
* Inject a file with given name and content in the output tar stream.
*/
static void
-sendFileWithContent(const char *filename, const char *content,
+sendFileWithContent(bbsink *sink, const char *filename, const char *content,
backup_manifest_info *manifest)
{
struct stat statbuf;
- int pad,
+ int bytes_done = 0,
len;
pg_checksum_context checksum_ctx;
@@ -1134,25 +924,23 @@ sendFileWithContent(const char *filename, const char *content,
statbuf.st_mode = pg_file_create_mode;
statbuf.st_size = len;
- _tarWriteHeader(filename, NULL, &statbuf, false);
- /* Send the contents as a CopyData message */
- pq_putmessage('d', content, len);
- update_basebackup_progress(len);
+ _tarWriteHeader(sink, filename, NULL, &statbuf, false);
- /* Pad to a multiple of the tar block size. */
- pad = tarPaddingBytesRequired(len);
- if (pad > 0)
+ if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0)
+ elog(ERROR, "could not update checksum of file \"%s\"",
+ filename);
+
+ while (bytes_done < len)
{
- char buf[TAR_BLOCK_SIZE];
+ size_t remaining = len - bytes_done;
+ size_t nbytes = Min(sink->bbs_buffer_length, remaining);
- MemSet(buf, 0, pad);
- pq_putmessage('d', buf, pad);
- update_basebackup_progress(pad);
+ memcpy(sink->bbs_buffer, content + bytes_done, nbytes);
+ bbsink_archive_contents(sink, nbytes);
+ bytes_done += nbytes;
}
- if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0)
- elog(ERROR, "could not update checksum of file \"%s\"",
- filename);
+ _tarWritePadding(sink, len);
AddFileToBackupManifest(manifest, NULL, filename, len,
(pg_time_t) statbuf.st_mtime, &checksum_ctx);
@@ -1166,7 +954,7 @@ sendFileWithContent(const char *filename, const char *content,
* Only used to send auxiliary tablespaces, not PGDATA.
*/
static int64
-sendTablespace(char *path, char *spcoid, bool sizeonly,
+sendTablespace(bbsink *sink, char *path, char *spcoid, bool sizeonly,
backup_manifest_info *manifest)
{
int64 size;
@@ -1196,11 +984,11 @@ sendTablespace(char *path, char *spcoid, bool sizeonly,
return 0;
}
- size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
+ size = _tarWriteHeader(sink, TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
sizeonly);
/* Send all the files in the tablespace version directory */
- size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, manifest,
+ size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest,
spcoid);
return size;
@@ -1219,8 +1007,8 @@ sendTablespace(char *path, char *spcoid, bool sizeonly,
* as it will be sent separately in the tablespace_map file.
*/
static int64
-sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
- bool sendtblspclinks, backup_manifest_info *manifest,
+sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
+ List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest,
const char *spcoid)
{
DIR *dir;
@@ -1380,8 +1168,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
excludeFound = true;
break;
}
@@ -1398,8 +1186,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
continue;
}
@@ -1412,15 +1200,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
{
/* If pg_wal is a symlink, write it as a directory anyway */
convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
+ &statbuf, sizeonly);
/*
* Also send archive_status directory (by hackishly reusing
* statbuf from above ...).
*/
- size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
- sizeonly);
+ size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL,
+ &statbuf, sizeonly);
continue; /* don't recurse into pg_wal */
}
@@ -1451,7 +1239,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
pathbuf)));
linkpath[rllen] = '\0';
- size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, linkpath,
&statbuf, sizeonly);
#else
@@ -1475,7 +1263,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
* Store a directory entry in the tar file so we can get the
* permissions right.
*/
- size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
+ size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, &statbuf,
sizeonly);
/*
@@ -1507,7 +1295,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
skip_this_dir = true;
if (!skip_this_dir)
- size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces,
+ size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces,
sendtblspclinks, manifest, spcoid);
}
else if (S_ISREG(statbuf.st_mode))
@@ -1515,7 +1303,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
bool sent = false;
if (!sizeonly)
- sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
+ sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf,
true, isDbDir ? atooid(lastDir + 1) : InvalidOid,
manifest, spcoid);
@@ -1592,21 +1380,19 @@ is_checksummed_file(const char *fullpath, const char *filename)
* and the file did not exist.
*/
static bool
-sendFile(const char *readfilename, const char *tarfilename,
+sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
struct stat *statbuf, bool missing_ok, Oid dboid,
backup_manifest_info *manifest, const char *spcoid)
{
int fd;
BlockNumber blkno = 0;
bool block_retry = false;
- char buf[TAR_SEND_SIZE];
uint16 checksum;
int checksum_failures = 0;
off_t cnt;
int i;
pgoff_t len = 0;
char *page;
- size_t pad;
PageHeader phdr;
int segmentno = 0;
char *segmentpath;
@@ -1627,7 +1413,7 @@ sendFile(const char *readfilename, const char *tarfilename,
errmsg("could not open file \"%s\": %m", readfilename)));
}
- _tarWriteHeader(tarfilename, NULL, statbuf, false);
+ _tarWriteHeader(sink, tarfilename, NULL, statbuf, false);
if (!noverify_checksums && DataChecksumsEnabled())
{
@@ -1668,9 +1454,11 @@ sendFile(const char *readfilename, const char *tarfilename,
*/
while (len < statbuf->st_size)
{
+ size_t remaining = statbuf->st_size - len;
+
/* Try to read some more data. */
- cnt = basebackup_read_file(fd, buf,
- Min(sizeof(buf), statbuf->st_size - len),
+ cnt = basebackup_read_file(fd, sink->bbs_buffer,
+ Min(sink->bbs_buffer_length, remaining),
len, readfilename, true);
/*
@@ -1687,7 +1475,7 @@ sendFile(const char *readfilename, const char *tarfilename,
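- * TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple of
- * BLCKSZ bytes.
+ * the sink's buffer length is divisible by BLCKSZ and we read a multiple
+ * of BLCKSZ bytes.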
*/
- Assert(TAR_SEND_SIZE % BLCKSZ == 0);
+ Assert((sink->bbs_buffer_length % BLCKSZ) == 0);
if (verify_checksum && (cnt % BLCKSZ != 0))
{
@@ -1703,7 +1491,7 @@ sendFile(const char *readfilename, const char *tarfilename,
{
for (i = 0; i < cnt / BLCKSZ; i++)
{
- page = buf + BLCKSZ * i;
+ page = sink->bbs_buffer + BLCKSZ * i;
/*
* Only check pages which have not been modified since the
@@ -1713,7 +1501,7 @@ sendFile(const char *readfilename, const char *tarfilename,
* this case. We also skip completely new pages, since they
* don't have a checksum yet.
*/
- if (!PageIsNew(page) && PageGetLSN(page) < startptr)
+ if (!PageIsNew(page) && PageGetLSN(page) < sink->bbs_state->startptr)
{
checksum = pg_checksum_page((char *) page, blkno + segmentno * RELSEG_SIZE);
phdr = (PageHeader) page;
@@ -1735,7 +1523,8 @@ sendFile(const char *readfilename, const char *tarfilename,
/* Reread the failed block */
reread_cnt =
- basebackup_read_file(fd, buf + BLCKSZ * i,
+ basebackup_read_file(fd,
+ sink->bbs_buffer + BLCKSZ * i,
BLCKSZ, len + BLCKSZ * i,
readfilename,
false);
@@ -1782,34 +1571,29 @@ sendFile(const char *readfilename, const char *tarfilename,
}
}
- /* Send the chunk as a CopyData message */
- if (pq_putmessage('d', buf, cnt))
- ereport(ERROR,
- (errmsg("base backup could not send data, aborting backup")));
- update_basebackup_progress(cnt);
+ bbsink_archive_contents(sink, cnt);
/* Also feed it to the checksum machinery. */
- if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0)
+ if (pg_checksum_update(&checksum_ctx,
+ (uint8 *) sink->bbs_buffer, cnt) < 0)
elog(ERROR, "could not update checksum of base backup");
len += cnt;
- throttle(cnt);
}
/* If the file was truncated while we were sending it, pad it with zeros */
- if (len < statbuf->st_size)
+ while (len < statbuf->st_size)
{
- MemSet(buf, 0, sizeof(buf));
- while (len < statbuf->st_size)
- {
- cnt = Min(sizeof(buf), statbuf->st_size - len);
- pq_putmessage('d', buf, cnt);
- if (pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt) < 0)
- elog(ERROR, "could not update checksum of base backup");
- update_basebackup_progress(cnt);
- len += cnt;
- throttle(cnt);
- }
+ size_t remaining = statbuf->st_size - len;
+ size_t nbytes = Min(sink->bbs_buffer_length, remaining);
+
+ MemSet(sink->bbs_buffer, 0, nbytes);
+ if (pg_checksum_update(&checksum_ctx,
+ (uint8 *) sink->bbs_buffer,
+ nbytes) < 0)
+ elog(ERROR, "could not update checksum of base backup");
+ bbsink_archive_contents(sink, nbytes);
+ len += nbytes;
}
/*
@@ -1817,13 +1601,7 @@ sendFile(const char *readfilename, const char *tarfilename,
* of data is probably not worth throttling, and is not checksummed
* because it's not actually part of the file.)
*/
- pad = tarPaddingBytesRequired(len);
- if (pad > 0)
- {
- MemSet(buf, 0, pad);
- pq_putmessage('d', buf, pad);
- update_basebackup_progress(pad);
- }
+ _tarWritePadding(sink, len);
CloseTransientFile(fd);
@@ -1846,18 +1624,28 @@ sendFile(const char *readfilename, const char *tarfilename,
return true;
}
-
static int64
-_tarWriteHeader(const char *filename, const char *linktarget,
+_tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget,
struct stat *statbuf, bool sizeonly)
{
- char h[TAR_BLOCK_SIZE];
enum tarError rc;
if (!sizeonly)
{
- rc = tarCreateHeader(h, filename, linktarget, statbuf->st_size,
- statbuf->st_mode, statbuf->st_uid, statbuf->st_gid,
+ /*
+ * As of this writing, the smallest supported block size is 1kB, which
+ * is twice TAR_BLOCK_SIZE. Since the buffer size is required to be a
+ * multiple of BLCKSZ, it should be safe to assume that the buffer is
+ * large enough to fit an entire tar block. We double-check by means
+ * of these assertions.
+ */
+ StaticAssertStmt(TAR_BLOCK_SIZE <= BLCKSZ,
+ "BLCKSZ too small for tar block");
+ Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+
+ rc = tarCreateHeader(sink->bbs_buffer, filename, linktarget,
+ statbuf->st_size, statbuf->st_mode,
+ statbuf->st_uid, statbuf->st_gid,
statbuf->st_mtime);
switch (rc)
@@ -1879,134 +1667,48 @@ _tarWriteHeader(const char *filename, const char *linktarget,
elog(ERROR, "unrecognized tar error: %d", rc);
}
- pq_putmessage('d', h, sizeof(h));
- update_basebackup_progress(sizeof(h));
+ bbsink_archive_contents(sink, TAR_BLOCK_SIZE);
}
- return sizeof(h);
+ return TAR_BLOCK_SIZE;
}
/*
- * If the entry in statbuf is a link, then adjust statbuf to make it look like a
- * directory, so that it will be written that way.
+ * Pad with zero bytes out to a multiple of TAR_BLOCK_SIZE.
*/
static void
-convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
+_tarWritePadding(bbsink *sink, int len)
{
- /* If symlink, write it as a directory anyway */
-#ifndef WIN32
- if (S_ISLNK(statbuf->st_mode))
-#else
- if (pgwin32_is_junction(pathbuf))
-#endif
- statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
-}
-
-/*
- * Increment the network transfer counter by the given number of bytes,
- * and sleep if necessary to comply with the requested network transfer
- * rate.
- */
-static void
-throttle(size_t increment)
-{
- TimeOffset elapsed_min;
-
- if (throttling_counter < 0)
- return;
-
- throttling_counter += increment;
- if (throttling_counter < throttling_sample)
- return;
-
- /* How much time should have elapsed at minimum? */
- elapsed_min = elapsed_min_unit *
- (throttling_counter / throttling_sample);
+ int pad = tarPaddingBytesRequired(len);
/*
- * Since the latch could be set repeatedly because of concurrently WAL
- * activity, sleep in a loop to ensure enough time has passed.
+ * As in _tarWriteHeader, it should be safe to assume that the buffer is
+ * large enough that we don't need to do this in multiple chunks.
*/
- for (;;)
- {
- TimeOffset elapsed,
- sleep;
- int wait_result;
-
- /* Time elapsed since the last measurement (and possible wake up). */
- elapsed = GetCurrentTimestamp() - throttled_last;
-
- /* sleep if the transfer is faster than it should be */
- sleep = elapsed_min - elapsed;
- if (sleep <= 0)
- break;
-
- ResetLatch(MyLatch);
-
- /* We're eating a potentially set latch, so check for interrupts */
- CHECK_FOR_INTERRUPTS();
+ Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+ Assert(pad <= TAR_BLOCK_SIZE);
- /*
- * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
- * the maximum time to sleep. Thus the cast to long is safe.
- */
- wait_result = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- (long) (sleep / 1000),
- WAIT_EVENT_BASE_BACKUP_THROTTLE);
-
- if (wait_result & WL_LATCH_SET)
- CHECK_FOR_INTERRUPTS();
-
- /* Done waiting? */
- if (wait_result & WL_TIMEOUT)
- break;
+ if (pad > 0)
+ {
+ MemSet(sink->bbs_buffer, 0, pad);
+ bbsink_archive_contents(sink, pad);
}
-
- /*
- * As we work with integers, only whole multiple of throttling_sample was
- * processed. The rest will be done during the next call of this function.
- */
- throttling_counter %= throttling_sample;
-
- /*
- * Time interval for the remaining amount and possible next increments
- * starts now.
- */
- throttled_last = GetCurrentTimestamp();
}
/*
- * Increment the counter for the amount of data already streamed
- * by the given number of bytes, and update the progress report for
- * pg_stat_progress_basebackup.
+ * If the entry in statbuf is a link, then adjust statbuf to make it look like a
+ * directory, so that it will be written that way.
*/
static void
-update_basebackup_progress(int64 delta)
+convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
{
- const int index[] = {
- PROGRESS_BASEBACKUP_BACKUP_STREAMED,
- PROGRESS_BASEBACKUP_BACKUP_TOTAL
- };
- int64 val[2];
- int nparam = 0;
-
- backup_streamed += delta;
- val[nparam++] = backup_streamed;
-
- /*
- * Avoid overflowing past 100% or the full size. This may make the total
- * size number change as we approach the end of the backup (the estimate
- * will always be wrong if WAL is included), but that's better than having
- * the done column be bigger than the total.
- */
- if (backup_total > -1 && backup_streamed > backup_total)
- {
- backup_total = backup_streamed;
- val[nparam++] = backup_total;
- }
-
- pgstat_progress_update_multi_param(nparam, index, val);
+ /* If symlink, write it as a directory anyway */
+#ifndef WIN32
+ if (S_ISLNK(statbuf->st_mode))
+#else
+ if (pgwin32_is_junction(pathbuf))
+#endif
+ statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
}
/*
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c
new file mode 100644
index 00000000000..30bab4546ef
--- /dev/null
+++ b/src/backend/replication/basebackup_copy.c
@@ -0,0 +1,335 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_copy.c
+ * send basebackup archives using one COPY OUT operation per
+ * tablespace, and an additional COPY OUT for the backup manifest
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/basebackup_copy.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_type_d.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
+
+static void bbsink_copytblspc_begin_backup(bbsink *sink);
+static void bbsink_copytblspc_begin_archive(bbsink *sink,
+ const char *archive_name);
+static void bbsink_copytblspc_archive_contents(bbsink *sink, size_t len);
+static void bbsink_copytblspc_end_archive(bbsink *sink);
+static void bbsink_copytblspc_begin_manifest(bbsink *sink);
+static void bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_copytblspc_end_manifest(bbsink *sink);
+static void bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+static void bbsink_copytblspc_cleanup(bbsink *sink);
+
+static void SendCopyOutResponse(void);
+static void SendCopyData(const char *data, size_t len);
+static void SendCopyDone(void);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
+static void SendTablespaceList(List *tablespaces);
+static void send_int8_string(StringInfoData *buf, int64 intval);
+
+/*
+ * Callback table for the 'copytblspc' sink.  Each of the nine callbacks
+ * listed here is implemented by a static function in this file.
+ */
+const bbsink_ops bbsink_copytblspc_ops = {
+ .begin_backup = bbsink_copytblspc_begin_backup,
+ .begin_archive = bbsink_copytblspc_begin_archive,
+ .archive_contents = bbsink_copytblspc_archive_contents,
+ .end_archive = bbsink_copytblspc_end_archive,
+ .begin_manifest = bbsink_copytblspc_begin_manifest,
+ .manifest_contents = bbsink_copytblspc_manifest_contents,
+ .end_manifest = bbsink_copytblspc_end_manifest,
+ .end_backup = bbsink_copytblspc_end_backup,
+ .cleanup = bbsink_copytblspc_cleanup
+};
+
+/*
+ * Create a new 'copytblspc' bbsink.
+ *
+ * The returned object is zero-filled apart from its callback table, so it
+ * has no successor sink and no buffer until begin_backup allocates one.
+ */
+bbsink *
+bbsink_copytblspc_new(void)
+{
+	bbsink	   *result;
+
+	result = palloc0(sizeof(bbsink));
+
+	/* bbs_ops is assigned through a cast rather than directly. */
+	*((const bbsink_ops **) &result->bbs_ops) = &bbsink_copytblspc_ops;
+
+	return result;
+}
+
+/*
+ * Begin backup.
+ *
+ * Allocates the sink's data buffer and then emits, in this order: the
+ * backup start location, the tablespace list, and a CommandComplete
+ * message.
+ */
+static void
+bbsink_copytblspc_begin_backup(bbsink *sink)
+{
+	/* The buffer is sized from the length already recorded in the sink. */
+	sink->bbs_buffer = palloc(sink->bbs_buffer_length);
+
+	/* Start LSN and timeline go out first ... */
+	SendXlogRecPtrResult(sink->bbs_state->startptr,
+						 sink->bbs_state->starttli);
+
+	/* ... followed by one row per tablespace ... */
+	SendTablespaceList(sink->bbs_state->tablespaces);
+
+	/* ... and finally a CommandComplete message. */
+	pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * Each archive is sent as a separate stream of COPY data, and thus begins
+ * with a CopyOutResponse message.
+ *
+ * Neither argument is used: the archive name is not transmitted by this
+ * callback.
+ */
+static void
+bbsink_copytblspc_begin_archive(bbsink *sink, const char *archive_name)
+{
+ SendCopyOutResponse();
+}
+
+/*
+ * Each chunk of data within the archive is sent as a CopyData message.
+ *
+ * The payload is the first 'len' bytes of this sink's bbs_buffer.
+ */
+static void
+bbsink_copytblspc_archive_contents(bbsink *sink, size_t len)
+{
+ SendCopyData(sink->bbs_buffer, len);
+}
+
+/*
+ * The archive is terminated by a CopyDone message.
+ *
+ * The sink argument is not consulted.
+ */
+static void
+bbsink_copytblspc_end_archive(bbsink *sink)
+{
+ SendCopyDone();
+}
+
+/*
+ * The backup manifest is sent as a separate stream of COPY data, and thus
+ * begins with a CopyOutResponse message.
+ *
+ * The sink argument is not consulted.
+ */
+static void
+bbsink_copytblspc_begin_manifest(bbsink *sink)
+{
+ SendCopyOutResponse();
+}
+
+/*
+ * Each chunk of manifest data is sent using a CopyData message.
+ *
+ * As for archive data, the payload is the first 'len' bytes of this sink's
+ * bbs_buffer.
+ */
+static void
+bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len)
+{
+ SendCopyData(sink->bbs_buffer, len);
+}
+
+/*
+ * When we've finished sending the manifest, send a CopyDone message.
+ *
+ * The sink argument is not consulted.
+ */
+static void
+bbsink_copytblspc_end_manifest(bbsink *sink)
+{
+ SendCopyDone();
+}
+
+/*
+ * Send end-of-backup wire protocol messages.
+ *
+ * The end position and timeline are sent as a one-row result set, in the
+ * same form used for the start position in begin_backup.
+ */
+static void
+bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli)
+{
+ SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * Cleanup.
+ *
+ * NOTE(review): the buffer allocated with palloc in begin_backup is not
+ * released here; presumably it goes away with its memory context -- confirm.
+ */
+static void
+bbsink_copytblspc_cleanup(bbsink *sink)
+{
+ /* Nothing to do. */
+}
+
+/*
+ * Send a CopyOutResponse message.
+ *
+ * The message carries an overall format byte of zero and declares zero
+ * columns.
+ */
+static void
+SendCopyOutResponse(void)
+{
+	StringInfoData msg;
+
+	pq_beginmessage(&msg, 'H');
+	pq_sendbyte(&msg, 0);		/* overall format */
+	pq_sendint16(&msg, 0);		/* natts */
+	pq_endmessage(&msg);
+}
+
+/*
+ * Send a CopyData message.
+ *
+ * 'data' points at 'len' bytes of payload.
+ *
+ * NOTE(review): the value returned by pq_putmessage is discarded here;
+ * confirm that a failed send is detected elsewhere.
+ */
+static void
+SendCopyData(const char *data, size_t len)
+{
+ pq_putmessage('d', data, len);
+}
+
+/*
+ * Send a CopyDone message.
+ *
+ * This is an empty message of type 'c'.
+ */
+static void
+SendCopyDone(void)
+{
+ pq_putemptymessage('c');
+}
+
+/*
+ * Send a result set consisting of a single row with two text-format
+ * columns: "recptr", the given XLogRecPtr rendered as %X/%X, and "tli", the
+ * given timeline ID rendered as an unsigned decimal.
+ *
+ * A RowDescription, one DataRow and a CommandComplete message are emitted.
+ */
+static void
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
+{
+	StringInfoData msg;
+	char		lsn_text[MAXFNAMELEN];
+	char		tli_text[MAXFNAMELEN];
+	Size		lsn_len;
+	Size		tli_len;
+
+	/* Render both values up front; they are only needed for the data row. */
+	lsn_len = snprintf(lsn_text, sizeof(lsn_text),
+					   "%X/%X", LSN_FORMAT_ARGS(ptr));
+	tli_len = snprintf(tli_text, sizeof(tli_text), "%u", tli);
+
+	/* RowDescription: two fields */
+	pq_beginmessage(&msg, 'T');
+	pq_sendint16(&msg, 2);
+
+	/* Column 1: recptr, declared as text */
+	pq_sendstring(&msg, "recptr");
+	pq_sendint32(&msg, 0);		/* table oid */
+	pq_sendint16(&msg, 0);		/* attnum */
+	pq_sendint32(&msg, TEXTOID);	/* type oid */
+	pq_sendint16(&msg, -1);
+	pq_sendint32(&msg, 0);
+	pq_sendint16(&msg, 0);
+
+	/*
+	 * Column 2: tli.  int8 may look like an odd choice, but TimeLineID is
+	 * unsigned, so in theory int4 would not be wide enough.
+	 */
+	pq_sendstring(&msg, "tli");
+	pq_sendint32(&msg, 0);		/* table oid */
+	pq_sendint16(&msg, 0);		/* attnum */
+	pq_sendint32(&msg, INT8OID);	/* type oid */
+	pq_sendint16(&msg, -1);
+	pq_sendint32(&msg, 0);
+	pq_sendint16(&msg, 0);
+	pq_endmessage(&msg);
+
+	/* DataRow: each column is a length word followed by the text bytes */
+	pq_beginmessage(&msg, 'D');
+	pq_sendint16(&msg, 2);		/* number of columns */
+
+	pq_sendint32(&msg, lsn_len);
+	pq_sendbytes(&msg, lsn_text, lsn_len);
+
+	pq_sendint32(&msg, tli_len);
+	pq_sendbytes(&msg, tli_text, tli_len);
+
+	pq_endmessage(&msg);
+
+	/* CommandComplete closes the result set */
+	pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * Send a result set via libpq describing the tablespace list.
+ *
+ * The row description declares three columns (spcoid, spclocation, size);
+ * one data row is then sent per list element.  No CommandComplete message is
+ * sent from here.
+ */
+static void
+SendTablespaceList(List *tablespaces)
+{
+	StringInfoData msg;
+	ListCell   *cell;
+
+	/* RowDescription: three fields */
+	pq_beginmessage(&msg, 'T');
+	pq_sendint16(&msg, 3);
+
+	/* spcoid */
+	pq_sendstring(&msg, "spcoid");
+	pq_sendint32(&msg, 0);		/* table oid */
+	pq_sendint16(&msg, 0);		/* attnum */
+	pq_sendint32(&msg, OIDOID); /* type oid */
+	pq_sendint16(&msg, 4);		/* typlen */
+	pq_sendint32(&msg, 0);		/* typmod */
+	pq_sendint16(&msg, 0);		/* format code */
+
+	/* spclocation */
+	pq_sendstring(&msg, "spclocation");
+	pq_sendint32(&msg, 0);
+	pq_sendint16(&msg, 0);
+	pq_sendint32(&msg, TEXTOID);
+	pq_sendint16(&msg, -1);
+	pq_sendint32(&msg, 0);
+	pq_sendint16(&msg, 0);
+
+	/* size */
+	pq_sendstring(&msg, "size");
+	pq_sendint32(&msg, 0);
+	pq_sendint16(&msg, 0);
+	pq_sendint32(&msg, INT8OID);
+	pq_sendint16(&msg, 8);
+	pq_sendint32(&msg, 0);
+	pq_sendint16(&msg, 0);
+	pq_endmessage(&msg);
+
+	/* One DataRow per tablespace */
+	foreach(cell, tablespaces)
+	{
+		tablespaceinfo *ts = lfirst(cell);
+
+		pq_beginmessage(&msg, 'D');
+		pq_sendint16(&msg, 3);	/* number of columns */
+
+		if (ts->path != NULL)
+		{
+			Size		oidlen = strlen(ts->oid);
+			Size		pathlen = strlen(ts->path);
+
+			pq_sendint32(&msg, oidlen);
+			pq_sendbytes(&msg, ts->oid, oidlen);
+
+			pq_sendint32(&msg, pathlen);
+			pq_sendbytes(&msg, ts->path, pathlen);
+		}
+		else
+		{
+			/* No path: both oid and location go out as NULL (length -1). */
+			pq_sendint32(&msg, -1);
+			pq_sendint32(&msg, -1);
+		}
+
+		/* A negative size is reported as NULL; otherwise size / 1024. */
+		if (ts->size < 0)
+			pq_sendint32(&msg, -1);
+		else
+			send_int8_string(&msg, ts->size / 1024);
+
+		pq_endmessage(&msg);
+	}
+}
+
+/*
+ * Send a 64-bit integer as a string via the wire protocol.
+ *
+ * The value is formatted in decimal and appended to 'buf' as a 32-bit
+ * length followed by that many characters (no terminating NUL).
+ */
+static void
+send_int8_string(StringInfoData *buf, int64 intval)
+{
+	char		digits[32];
+	Size		ndigits;
+
+	sprintf(digits, INT64_FORMAT, intval);
+	ndigits = strlen(digits);
+
+	pq_sendint32(buf, ndigits);
+	pq_sendbytes(buf, digits, ndigits);
+}
diff --git a/src/backend/replication/basebackup_progress.c b/src/backend/replication/basebackup_progress.c
new file mode 100644
index 00000000000..e1a196251ef
--- /dev/null
+++ b/src/backend/replication/basebackup_progress.c
@@ -0,0 +1,246 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_progress.c
+ * Basebackup sink implementing progress tracking, including but not
+ * limited to command progress reporting.
+ *
+ * This should be used even if the PROGRESS option to the replication
+ * command BASE_BACKUP is not specified. Without that option, we won't
+ * have tallied up the size of the files that are going to need to be
+ * backed up, but we can still report to the command progress reporting
+ * facility how much data we've processed.
+ *
+ * Moreover, we also use this as a convenient place to update certain
+ * fields of the bbsink_state. That work is accurately described as
+ * keeping track of our progress, but it's not just for introspection.
+ * We need those fields to be updated properly in order for base backups
+ * to work.
+ *
+ * This particular basebackup sink requires extra callbacks that most base
+ * backup sinks don't. Rather than cramming those into the interface, we just
+ * have a few extra functions here that basebackup.c can call. (We could put
+ * the logic directly into that file as it's fairly simple, but it seems
+ * cleaner to have everything related to progress reporting in one place.)
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/basebackup_progress.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "commands/progress.h"
+#include "miscadmin.h"
+#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "utils/timestamp.h"
+
+static void bbsink_progress_begin_backup(bbsink *sink);
+static void bbsink_progress_archive_contents(bbsink *sink, size_t len);
+static void bbsink_progress_end_archive(bbsink *sink);
+
+/*
+ * Callback table for the progress sink.  Only begin_backup,
+ * archive_contents and end_archive are overridden; every other callback is
+ * one of the generic bbsink_forward_* implementations.
+ */
+const bbsink_ops bbsink_progress_ops = {
+ .begin_backup = bbsink_progress_begin_backup,
+ .begin_archive = bbsink_forward_begin_archive,
+ .archive_contents = bbsink_progress_archive_contents,
+ .end_archive = bbsink_progress_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_forward_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+
+/*
+ * Create a new basebackup sink that performs progress tracking functions and
+ * forwards data to a successor sink.
+ *
+ * 'next' must not be NULL.  Command progress reporting is started as a side
+ * effect of constructing the sink.
+ *
+ * NOTE(review): 'estimate_backup_size' is not referenced in this function.
+ */
+bbsink *
+bbsink_progress_new(bbsink *next, bool estimate_backup_size)
+{
+	bbsink	   *result;
+
+	Assert(next != NULL);
+
+	result = palloc0(sizeof(bbsink));
+	*((const bbsink_ops **) &result->bbs_ops) = &bbsink_progress_ops;
+	result->bbs_next = next;
+
+	/*
+	 * Announce that a base backup is underway.  The total size starts out as
+	 * -1, which will get translated to NULL; if the size is being estimated,
+	 * the real figure is filled in once it is known.
+	 */
+	pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
+	pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, -1);
+
+	return result;
+}
+
+/*
+ * Progress reporting at start of backup.
+ *
+ * Publishes the streaming phase, the estimated total size (or -1 when no
+ * valid estimate exists) and the number of tablespaces, then hands the
+ * callback on to the next sink.
+ */
+static void
+bbsink_progress_begin_backup(bbsink *sink)
+{
+	const int	index[] = {
+		PROGRESS_BASEBACKUP_PHASE,
+		PROGRESS_BASEBACKUP_BACKUP_TOTAL,
+		PROGRESS_BASEBACKUP_TBLSPC_TOTAL
+	};
+	bbsink_state *state = sink->bbs_state;
+	int64		val[3];
+
+	/* Slots line up one-to-one with the index[] entries above. */
+	val[0] = PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP;
+	val[1] = state->bytes_total_is_valid ? (int64) state->bytes_total : -1;
+	val[2] = list_length(state->tablespaces);
+
+	pgstat_progress_update_multi_param(3, index, val);
+
+	/* Delegate to next sink. */
+	bbsink_forward_begin_backup(sink);
+}
+
+/*
+ * End-of-archive progress reporting.
+ *
+ * Reports one more tablespace as streamed (when that would not exceed the
+ * total), forwards the callback, and finally advances the shared state's
+ * current tablespace number.
+ */
+static void
+bbsink_progress_end_archive(bbsink *sink)
+{
+	bbsink_state *state = sink->bbs_state;
+
+	/*
+	 * One archive is expected per tablespace, so finishing an archive also
+	 * finishes a tablespace.  (Some day we might have a reason to decouple
+	 * these concepts.)
+	 *
+	 * When WAL is included in the backup, the last tablespace is marked
+	 * complete before the last archive ends, hence the guard that keeps the
+	 * streamed count from exceeding the total.
+	 */
+	if (state->tablespace_num < list_length(state->tablespaces))
+		pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
+									 state->tablespace_num + 1);
+
+	/* Delegate to next sink. */
+	bbsink_forward_end_archive(sink);
+
+	/*
+	 * The bbsink_state object is shared across all sinks in the chain, but
+	 * this sink is the outermost one and this is the very last step, so it
+	 * is a convenient place to move on to the next tablespace.
+	 */
+	state->tablespace_num++;
+}
+
+/*
+ * Handle progress tracking for new archive contents.
+ *
+ * Adds 'len' to the shared count of bytes done, forwards the data to the
+ * next sink, and then updates the progress report for
+ * pg_stat_progress_basebackup.
+ */
+static void
+bbsink_progress_archive_contents(bbsink *sink, size_t len)
+{
+	const int	index[] = {
+		PROGRESS_BASEBACKUP_BACKUP_STREAMED,
+		PROGRESS_BASEBACKUP_BACKUP_TOTAL
+	};
+	bbsink_state *state = sink->bbs_state;
+	int64		val[2];
+	int			nvals = 1;
+
+	/* Account for the bytes before passing them down the chain. */
+	state->bytes_done += len;
+	bbsink_forward_archive_contents(sink, len);
+
+	/* The streamed counter is always reported. */
+	val[0] = state->bytes_done;
+
+	/*
+	 * If a valid estimate exists and has been overtaken, report the streamed
+	 * amount as the total as well, so that the done column never exceeds the
+	 * total.  The total may therefore change as the backup nears its end
+	 * (the estimate will always be wrong if WAL is included).
+	 */
+	if (state->bytes_total_is_valid && state->bytes_done > state->bytes_total)
+	{
+		val[1] = state->bytes_done;
+		nvals = 2;
+	}
+
+	pgstat_progress_update_multi_param(nvals, index, val);
+}
+
+/*
+ * Advertise that we are waiting for the start-of-backup checkpoint.
+ *
+ * Sets the reported phase to PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT.
+ */
+void
+basebackup_progress_wait_checkpoint(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
+}
+
+/*
+ * Advertise that we are estimating the backup size.
+ *
+ * Sets the reported phase to PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE.
+ */
+void
+basebackup_progress_estimate_backup_size(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
+}
+
+/*
+ * Advertise that we are waiting for WAL archiving at end-of-backup.
+ *
+ * The phase and the streamed-tablespace count are updated together.
+ */
+void
+basebackup_progress_wait_wal_archive(bbsink_state *state)
+{
+	const int	index[] = {
+		PROGRESS_BASEBACKUP_PHASE,
+		PROGRESS_BASEBACKUP_TBLSPC_STREAMED
+	};
+	int64		val[2];
+
+	val[0] = PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE;
+
+	/*
+	 * All tablespaces are reported as finished here, even if the archive for
+	 * the main tablespace is still open: what remains to be added is WAL
+	 * files, not files that really belong to the main tablespace.
+	 */
+	val[1] = list_length(state->tablespaces);
+
+	pgstat_progress_update_multi_param(2, index, val);
+}
+
+/*
+ * Advertise that we are transferring WAL files into the final archive.
+ *
+ * Sets the reported phase to PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL.
+ */
+void
+basebackup_progress_transfer_wal(void)
+{
+ pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
+ PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
+}
+
+/*
+ * Advertise that we are no longer performing a backup.
+ *
+ * Ends the command progress report that bbsink_progress_new started with
+ * pgstat_progress_start_command.
+ */
+void
+basebackup_progress_done(void)
+{
+ pgstat_progress_end_command();
+}
diff --git a/src/backend/replication/basebackup_sink.c b/src/backend/replication/basebackup_sink.c
new file mode 100644
index 00000000000..4a47854f81f
--- /dev/null
+++ b/src/backend/replication/basebackup_sink.c
@@ -0,0 +1,125 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_sink.c
+ * Default implementations for bbsink (basebackup sink) callbacks.
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * src/backend/replication/basebackup_sink.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "replication/basebackup_sink.h"
+
+/*
+ * Forward begin_backup callback.
+ *
+ * Only use this implementation if you want the bbsink you're implementing to
+ * share a buffer with the successor bbsink.
+ *
+ * The successor is started with this sink's bbs_state and
+ * bbs_buffer_length; afterwards this sink's bbs_buffer is set to the
+ * successor's bbs_buffer.
+ */
+void
+bbsink_forward_begin_backup(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_state != NULL);
+ bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
+ sink->bbs_buffer_length);
+ sink->bbs_buffer = sink->bbs_next->bbs_buffer;
+}
+
+/*
+ * Forward begin_archive callback.
+ *
+ * The archive name is passed to the successor sink unchanged.
+ */
+void
+bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_archive(sink->bbs_next, archive_name);
+}
+
+/*
+ * Forward archive_contents callback.
+ *
+ * Code that wants to use this should initialize its own bbs_buffer and
+ * bbs_buffer_length fields to the values from the successor sink. In cases
+ * where the buffer isn't shared, the data needs to be copied before forwarding
+ * the callback. We don't try to do that here, because there's really no
+ * reason to have separately allocated buffers containing the same identical
+ * data.
+ *
+ * 'sink' is the sink doing the forwarding, not its successor: the successor
+ * is looked up here through sink->bbs_next.
+ */
+void
+bbsink_forward_archive_contents(bbsink *sink, size_t len)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
+ Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
+ bbsink_archive_contents(sink->bbs_next, len);
+}
+
+/*
+ * Forward end_archive callback.
+ *
+ * Requires a successor sink; does nothing beyond invoking it.
+ */
+void
+bbsink_forward_end_archive(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_archive(sink->bbs_next);
+}
+
+/*
+ * Forward begin_manifest callback.
+ *
+ * Requires a successor sink; does nothing beyond invoking it.
+ */
+void
+bbsink_forward_begin_manifest(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_begin_manifest(sink->bbs_next);
+}
+
+/*
+ * Forward manifest_contents callback.
+ *
+ * As with the archive_contents callback, it's expected that the buffer is
+ * shared.
+ *
+ * 'sink' is the sink doing the forwarding, not its successor: the successor
+ * is looked up here through sink->bbs_next.
+ */
+void
+bbsink_forward_manifest_contents(bbsink *sink, size_t len)
+{
+ Assert(sink->bbs_next != NULL);
+ Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
+ Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
+ bbsink_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * Forward end_manifest callback.
+ *
+ * Requires a successor sink; does nothing beyond invoking it.
+ */
+void
+bbsink_forward_end_manifest(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_manifest(sink->bbs_next);
+}
+
+/*
+ * Forward end_backup callback.
+ *
+ * The end position and timeline are passed to the successor unchanged.
+ */
+void
+bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_end_backup(sink->bbs_next, endptr, endtli);
+}
+
+/*
+ * Forward cleanup callback.
+ *
+ * Requires a successor sink; does nothing beyond invoking it.
+ */
+void
+bbsink_forward_cleanup(bbsink *sink)
+{
+ Assert(sink->bbs_next != NULL);
+ bbsink_cleanup(sink->bbs_next);
+}
diff --git a/src/backend/replication/basebackup_throttle.c b/src/backend/replication/basebackup_throttle.c
new file mode 100644
index 00000000000..f163931f8a3
--- /dev/null
+++ b/src/backend/replication/basebackup_throttle.c
@@ -0,0 +1,199 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_throttle.c
+ * Basebackup sink implementing throttling. Data is forwarded to the
+ * next base backup sink in the chain at a rate no greater than the
+ * configured maximum.
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/basebackup_throttle.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "replication/basebackup_sink.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "utils/timestamp.h"
+
+/*
+ * State for a throttling sink.  'base' comes first so that a bbsink pointer
+ * to this object can be cast to bbsink_throttle, as the callbacks in this
+ * file do.
+ */
+typedef struct bbsink_throttle
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /* The actual number of bytes, transfer of which may cause sleep. */
+ uint64 throttling_sample;
+
+ /* Amount of data already transferred but not yet throttled. */
+ int64 throttling_counter;
+
+ /* The minimum time required to transfer throttling_sample bytes. */
+ TimeOffset elapsed_min_unit;
+
+ /* The last check of the transfer rate. */
+ TimestampTz throttled_last;
+} bbsink_throttle;
+
+static void bbsink_throttle_begin_backup(bbsink *sink);
+static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
+static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
+static void throttle(bbsink_throttle *sink, size_t increment);
+
+/*
+ * Callback table for the throttling sink.  Only begin_backup,
+ * archive_contents and manifest_contents are overridden; every other
+ * callback is one of the generic bbsink_forward_* implementations.
+ */
+const bbsink_ops bbsink_throttle_ops = {
+ .begin_backup = bbsink_throttle_begin_backup,
+ .begin_archive = bbsink_forward_begin_archive,
+ .archive_contents = bbsink_throttle_archive_contents,
+ .end_archive = bbsink_forward_end_archive,
+ .begin_manifest = bbsink_forward_begin_manifest,
+ .manifest_contents = bbsink_throttle_manifest_contents,
+ .end_manifest = bbsink_forward_end_manifest,
+ .end_backup = bbsink_forward_end_backup,
+ .cleanup = bbsink_forward_cleanup
+};
+
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ *
+ * bbsink_throttle_new divides both the per-second byte budget and
+ * USECS_PER_SEC by this value to obtain the sample size and its minimum
+ * duration.
+ */
+#define THROTTLING_FREQUENCY 8
+
+/*
+ * Create a new basebackup sink that performs throttling and forwards data
+ * to a successor sink.
+ *
+ * 'next' must not be NULL and 'maxrate' must be greater than zero.  maxrate
+ * is multiplied by 1024 below, so it is presumably expressed in kilobytes
+ * per second -- confirm against the caller.
+ */
+bbsink *
+bbsink_throttle_new(bbsink *next, uint32 maxrate)
+{
+	bbsink_throttle *throttler;
+
+	Assert(next != NULL);
+	Assert(maxrate > 0);
+
+	throttler = palloc0(sizeof(bbsink_throttle));
+	*((const bbsink_ops **) &throttler->base.bbs_ops) = &bbsink_throttle_ops;
+	throttler->base.bbs_next = next;
+
+	/* Number of bytes allowed per 1/THROTTLING_FREQUENCY of a second. */
+	throttler->throttling_sample =
+		(int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
+
+	/* Minimum time that transferring one such sample must take. */
+	throttler->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+	return &throttler->base;
+}
+
+/*
+ * Forward the begin_backup callback, then record the current time as the
+ * reference point for the first rate calculation.  No other work is needed
+ * here.
+ */
+static void
+bbsink_throttle_begin_backup(bbsink *sink)
+{
+	bbsink_forward_begin_backup(sink);
+
+	/* The 'real data' starts now (header was ignored). */
+	((bbsink_throttle *) sink)->throttled_last = GetCurrentTimestamp();
+}
+
+/*
+ * First throttle, and then pass archive contents to next sink.
+ *
+ * Any sleep needed to honor the rate limit therefore happens before the
+ * 'len' bytes are forwarded.
+ */
+static void
+bbsink_throttle_archive_contents(bbsink *sink, size_t len)
+{
+ throttle((bbsink_throttle *) sink, len);
+
+ bbsink_forward_archive_contents(sink, len);
+}
+
+/*
+ * First throttle, and then pass manifest contents to next sink.
+ *
+ * The forwarding helper takes the sink that is doing the forwarding and
+ * looks up the successor itself via bbs_next, so it must be given this sink
+ * rather than sink->bbs_next.  Passing the successor would skip one sink in
+ * the chain, and would dereference a NULL bbs_next whenever the successor
+ * is the last sink.
+ */
+static void
+bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
+{
+	throttle((bbsink_throttle *) sink, len);
+
+	bbsink_forward_manifest_contents(sink, len);
+}
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ *
+ * Nothing happens until at least throttling_sample bytes have accumulated.
+ * Once they have, the function waits until the minimum time for the
+ * accumulated whole samples has passed since throttled_last, keeps the
+ * remainder in throttling_counter, and resets throttled_last.
+ */
+static void
+throttle(bbsink_throttle *sink, size_t increment)
+{
+ TimeOffset elapsed_min;
+
+ Assert(sink->throttling_counter >= 0);
+
+ sink->throttling_counter += increment;
+ if (sink->throttling_counter < sink->throttling_sample)
+ return;
+
+ /* How much time should have elapsed at minimum? */
+ elapsed_min = sink->elapsed_min_unit *
+ (sink->throttling_counter / sink->throttling_sample);
+
+ /*
+ * Since the latch could be set repeatedly because of concurrent WAL
+ * activity, sleep in a loop to ensure enough time has passed.
+ */
+ for (;;)
+ {
+ TimeOffset elapsed,
+ sleep;
+ int wait_result;
+
+ /* Time elapsed since the last measurement (and possible wake up). */
+ elapsed = GetCurrentTimestamp() - sink->throttled_last;
+
+ /* sleep if the transfer is faster than it should be */
+ sleep = elapsed_min - elapsed;
+ if (sleep <= 0)
+ break;
+
+ ResetLatch(MyLatch);
+
+ /* We're eating a potentially set latch, so check for interrupts */
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+ * the maximum time to sleep. Thus the cast to long is safe.
+ *
+ * NOTE(review): TAR_SEND_SIZE is not defined in this file; the bound
+ * presumably now depends on the sink's buffer length -- confirm.  The
+ * division by 1000 presumably converts microseconds (elapsed_min_unit
+ * is derived from USECS_PER_SEC) to the unit WaitLatch expects.
+ */
+ wait_result = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ (long) (sleep / 1000),
+ WAIT_EVENT_BASE_BACKUP_THROTTLE);
+
+ if (wait_result & WL_LATCH_SET)
+ CHECK_FOR_INTERRUPTS();
+
+ /* Done waiting? */
+ if (wait_result & WL_TIMEOUT)
+ break;
+ }
+
+ /*
+ * As we work with integers, only whole multiple of throttling_sample was
+ * processed. The rest will be done during the next call of this function.
+ */
+ sink->throttling_counter %= sink->throttling_sample;
+
+ /*
+ * Time interval for the remaining amount and possible next increments
+ * starts now.
+ */
+ sink->throttled_last = GetCurrentTimestamp();
+}
diff --git a/src/include/replication/backup_manifest.h b/src/include/replication/backup_manifest.h
index 099108910ce..16ed7eec9bb 100644
--- a/src/include/replication/backup_manifest.h
+++ b/src/include/replication/backup_manifest.h
@@ -12,9 +12,9 @@
#ifndef BACKUP_MANIFEST_H
#define BACKUP_MANIFEST_H
-#include "access/xlogdefs.h"
#include "common/checksum_helper.h"
#include "pgtime.h"
+#include "replication/basebackup_sink.h"
#include "storage/buffile.h"
typedef enum manifest_option
@@ -47,7 +47,8 @@ extern void AddWALInfoToBackupManifest(backup_manifest_info *manifest,
XLogRecPtr startptr,
TimeLineID starttli, XLogRecPtr endptr,
TimeLineID endtli);
-extern void SendBackupManifest(backup_manifest_info *manifest);
+
+extern void SendBackupManifest(backup_manifest_info *manifest, bbsink *sink);
extern void FreeBackupManifest(backup_manifest_info *manifest);
#endif
diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h
new file mode 100644
index 00000000000..e6c073c5674
--- /dev/null
+++ b/src/include/replication/basebackup_sink.h
@@ -0,0 +1,296 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_sink.h
+ * API for filtering or sending to a final destination the archives
+ * produced by the base backup process
+ *
+ * Taking a base backup produces one archive per tablespace directory,
+ * plus a backup manifest unless that feature has been disabled. The
+ * goal of the backup process is to put those archives and that manifest
+ * someplace, possibly after postprocessing them in some way. A 'bbsink'
+ * is an object to which those archives, and the manifest if present,
+ * can be sent.
+ *
+ * In practice, there will be a chain of 'bbsink' objects rather than
+ * just one, with callbacks being forwarded from one to the next,
+ * possibly with modification. Each object is responsible for a
+ * single task e.g. command progress reporting, throttling, or
+ * communication with the client.
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * src/include/replication/basebackup_sink.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BASEBACKUP_SINK_H
+#define BASEBACKUP_SINK_H
+
+#include "access/xlog_internal.h"
+#include "nodes/pg_list.h"
+
+/* Forward declarations, so the structs below can refer to each other. */
+struct bbsink;
+struct bbsink_ops;
+typedef struct bbsink bbsink;
+typedef struct bbsink_ops bbsink_ops;
+
+/*
+ * Overall backup state shared by all bbsink objects for a backup.
+ *
+ * Before calling bbsink_begin_backup, caller must initialize a bbsink_state
+ * object which will last for the lifetime of the backup, and must thereafter
+ * update it as required before each new call to a bbsink method. The bbsink
+ * will retain a pointer to the state object and will consult it to understand
+ * the progress of the backup.
+ *
+ * 'tablespaces' is a list of tablespaceinfo objects. It must be set before
+ * calling bbsink_begin_backup() and must not be modified thereafter.
+ *
+ * 'tablespace_num' is the index of the current tablespace within the list
+ * stored in 'tablespaces'.
+ *
+ * 'bytes_done' is the number of bytes read so far from $PGDATA.
+ *
+ * 'bytes_total' is the total number of bytes estimated to be present in
+ * $PGDATA, if we have estimated this.
+ *
+ * 'bytes_total_is_valid' is true if and only if a proper estimate has been
+ * stored into 'bytes_total'.
+ *
+ * 'startptr' and 'starttli' identify the point in the WAL stream at which
+ * the backup began. They must be set before calling bbsink_begin_backup()
+ * and must not be modified thereafter.
+ */
+typedef struct bbsink_state
+{
+ List *tablespaces;
+ int tablespace_num;
+ uint64 bytes_done;
+ uint64 bytes_total;
+ bool bytes_total_is_valid;
+ XLogRecPtr startptr;
+ TimeLineID starttli;
+} bbsink_state;
+
+/*
+ * Common data for any type of basebackup sink.
+ *
+ * 'bbs_ops' is the relevant callback table.
+ *
+ * 'bbs_buffer' is the buffer into which data destined for the bbsink
+ * should be stored. Its length must be a multiple of BLCKSZ.
+ *
+ * 'bbs_buffer_length' is the allocated length of the buffer.
+ *
+ * 'bbs_next' is a pointer to another bbsink to which this bbsink is
+ * forwarding some or all operations.
+ *
+ * 'bbs_state' is a pointer to the bbsink_state object for this backup.
+ * Every bbsink associated with this backup should point to the same
+ * underlying state object.
+ *
+ * In general it is expected that the values of these fields are set when a
+ * bbsink is created or when the backup begins, and that they do not change
+ * thereafter. It's OK to modify the data to which bbs_buffer or bbs_state
+ * point, but no changes should be made to the contents of this struct.
+ */
+struct bbsink
+{
+ const bbsink_ops *bbs_ops;
+ char *bbs_buffer;
+ size_t bbs_buffer_length;
+ bbsink *bbs_next;
+ bbsink_state *bbs_state;
+};
+
+/*
+ * Callbacks for a base backup sink.
+ *
+ * All of these callbacks are required. If a particular callback just needs to
+ * forward the call to sink->bbs_next, use bbsink_forward_<callback_name> as
+ * the callback.
+ *
+ * Callers should always invoke these callbacks via the bbsink_* inline
+ * functions rather than calling them directly.
+ */
+struct bbsink_ops
+{
+ /*
+ * This callback is invoked just once, at the very start of the backup,
+ * after bbsink_begin_backup() has set bbs_state and bbs_buffer_length. It
+ * must point bbs_buffer at storage for at least bbs_buffer_length bytes.
+ */
+ void (*begin_backup) (bbsink *sink);
+
+ /*
+ * For each archive transmitted to a bbsink, there will be one call to the
+ * begin_archive() callback, some number of calls to the
+ * archive_contents() callback, and then one call to the end_archive()
+ * callback.
+ *
+ * Before invoking the archive_contents() callback, the caller should copy
+ * a number of bytes equal to what will be passed as len into bbs_buffer,
+ * but not more than bbs_buffer_length.
+ *
+ * It's generally good if the buffer is as full as possible before the
+ * archive_contents() callback is invoked, but it's not worth expending
+ * extra cycles to make sure it's absolutely 100% full.
+ */
+ void (*begin_archive) (bbsink *sink, const char *archive_name);
+ void (*archive_contents) (bbsink *sink, size_t len);
+ void (*end_archive) (bbsink *sink);
+
+ /*
+ * If a backup manifest is to be transmitted to a bbsink, there will be
+ * one call to the begin_manifest() callback, some number of calls to the
+ * manifest_contents() callback, and then one call to the end_manifest()
+ * callback. These calls will occur after all archives are transmitted.
+ *
+ * The rules for invoking the manifest_contents() callback are the same as
+ * for the archive_contents() callback above.
+ */
+ void (*begin_manifest) (bbsink *sink);
+ void (*manifest_contents) (bbsink *sink, size_t len);
+ void (*end_manifest) (bbsink *sink);
+
+ /*
+ * This callback is invoked just once, after all archives and the manifest
+ * have been sent.
+ */
+ void (*end_backup) (bbsink *sink, XLogRecPtr endptr, TimeLineID endtli);
+
+ /*
+ * If a backup is aborted by an error, this callback is invoked before the
+ * bbsink object is destroyed, so that it can release any resources that
+ * would not be released automatically. If no error occurs, this callback
+ * is invoked after the end_backup callback.
+ */
+ void (*cleanup) (bbsink *sink);
+};
+
+/* Begin a backup: install state and buffer length, then notify the sink. */
+static inline void
+bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
+{
+ Assert(sink != NULL);
+
+ Assert(buffer_length > 0);
+
+ sink->bbs_state = state;
+ sink->bbs_buffer_length = buffer_length;
+ sink->bbs_ops->begin_backup(sink);
+
+ Assert(sink->bbs_buffer != NULL);
+ Assert((sink->bbs_buffer_length % BLCKSZ) == 0);
+}
+
+/* Begin an archive named 'archive_name'. */
+static inline void
+bbsink_begin_archive(bbsink *sink, const char *archive_name)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->begin_archive(sink, archive_name);
+}
+
+/* Process 'len' bytes of archive contents already stored in bbs_buffer. */
+static inline void
+bbsink_archive_contents(bbsink *sink, size_t len)
+{
+ Assert(sink != NULL);
+
+ /*
+ * The caller should make a reasonable attempt to fill the buffer before
+ * calling this function, so it shouldn't be completely empty. Nor should
+ * it be filled beyond capacity.
+ */
+ Assert(len > 0 && len <= sink->bbs_buffer_length);
+
+ sink->bbs_ops->archive_contents(sink, len);
+}
+
+/* Finish the archive started by bbsink_begin_archive(). */
+static inline void
+bbsink_end_archive(bbsink *sink)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->end_archive(sink);
+}
+
+/* Begin the backup manifest; called after all archives have been sent. */
+static inline void
+bbsink_begin_manifest(bbsink *sink)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->begin_manifest(sink);
+}
+
+/* Process 'len' bytes of manifest contents already stored in bbs_buffer. */
+static inline void
+bbsink_manifest_contents(bbsink *sink, size_t len)
+{
+ Assert(sink != NULL);
+
+ /* See comments in bbsink_archive_contents. */
+ Assert(len > 0 && len <= sink->bbs_buffer_length);
+
+ sink->bbs_ops->manifest_contents(sink, len);
+}
+
+/* Finish the manifest started by bbsink_begin_manifest(). */
+static inline void
+bbsink_end_manifest(bbsink *sink)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->end_manifest(sink);
+}
+
+/* Finish a backup; tablespace_num must equal the number of tablespaces. */
+static inline void
+bbsink_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
+{
+ Assert(sink != NULL);
+ Assert(sink->bbs_state->tablespace_num == list_length(sink->bbs_state->tablespaces));
+
+ sink->bbs_ops->end_backup(sink, endptr, endtli);
+}
+
+/* Release resources before destruction, after success or error alike. */
+static inline void
+bbsink_cleanup(bbsink *sink)
+{
+ Assert(sink != NULL);
+
+ sink->bbs_ops->cleanup(sink);
+}
+
+/* Forwarding callbacks: use these to pass a call through to sink->bbs_next. */
+extern void bbsink_forward_begin_backup(bbsink *sink);
+extern void bbsink_forward_begin_archive(bbsink *sink,
+ const char *archive_name);
+extern void bbsink_forward_archive_contents(bbsink *sink, size_t len);
+extern void bbsink_forward_end_archive(bbsink *sink);
+extern void bbsink_forward_begin_manifest(bbsink *sink);
+extern void bbsink_forward_manifest_contents(bbsink *sink, size_t len);
+extern void bbsink_forward_end_manifest(bbsink *sink);
+extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+extern void bbsink_forward_cleanup(bbsink *sink);
+
+/* Constructors for various types of sinks. */
+extern bbsink *bbsink_copytblspc_new(void);
+extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
+extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);
+
+/* Extra interface functions for progress reporting. */
+extern void basebackup_progress_wait_checkpoint(void);
+extern void basebackup_progress_estimate_backup_size(void);
+extern void basebackup_progress_wait_wal_archive(bbsink_state *);
+extern void basebackup_progress_transfer_wal(void);
+extern void basebackup_progress_done(void);
+
+#endif