aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/Makefile11
-rw-r--r--src/backend/replication/backup_manifest.c401
-rw-r--r--src/backend/replication/basebackup.c1829
-rw-r--r--src/backend/replication/basebackup_copy.c420
-rw-r--r--src/backend/replication/basebackup_gzip.c308
-rw-r--r--src/backend/replication/basebackup_lz4.c301
-rw-r--r--src/backend/replication/basebackup_progress.c246
-rw-r--r--src/backend/replication/basebackup_server.c309
-rw-r--r--src/backend/replication/basebackup_sink.c125
-rw-r--r--src/backend/replication/basebackup_target.c241
-rw-r--r--src/backend/replication/basebackup_throttle.c199
-rw-r--r--src/backend/replication/basebackup_zstd.c316
-rw-r--r--src/backend/replication/walsender.c2
13 files changed, 1 insertions, 4707 deletions
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 3d8fb70c0e3..2bffac58c0d 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -15,17 +15,6 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
OBJS = \
- backup_manifest.o \
- basebackup.o \
- basebackup_copy.o \
- basebackup_gzip.o \
- basebackup_lz4.o \
- basebackup_zstd.o \
- basebackup_progress.o \
- basebackup_server.o \
- basebackup_sink.o \
- basebackup_target.o \
- basebackup_throttle.o \
repl_gram.o \
slot.o \
slotfuncs.o \
diff --git a/src/backend/replication/backup_manifest.c b/src/backend/replication/backup_manifest.c
deleted file mode 100644
index d47ab4c41e3..00000000000
--- a/src/backend/replication/backup_manifest.c
+++ /dev/null
@@ -1,401 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * backup_manifest.c
- * code for generating and sending a backup manifest
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/backup_manifest.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "access/timeline.h"
-#include "libpq/libpq.h"
-#include "libpq/pqformat.h"
-#include "mb/pg_wchar.h"
-#include "replication/backup_manifest.h"
-#include "replication/basebackup_sink.h"
-#include "utils/builtins.h"
-#include "utils/json.h"
-
-static void AppendStringToManifest(backup_manifest_info *manifest, char *s);
-
-/*
- * Does the user want a backup manifest?
- *
- * It's simplest to always have a manifest_info object, so that we don't need
- * checks for NULL pointers in too many places. However, if the user doesn't
- * want a manifest, we set manifest->buffile to NULL.
- */
-static inline bool
-IsManifestEnabled(backup_manifest_info *manifest)
-{
- return (manifest->buffile != NULL);
-}
-
-/*
- * Convenience macro for appending data to the backup manifest.
- */
-#define AppendToManifest(manifest, ...) \
- { \
- char *_manifest_s = psprintf(__VA_ARGS__); \
- AppendStringToManifest(manifest, _manifest_s); \
- pfree(_manifest_s); \
- }
-
-/*
- * Initialize state so that we can construct a backup manifest.
- *
- * NB: Although the checksum type for the data files is configurable, the
- * checksum for the manifest itself always uses SHA-256. See comments in
- * SendBackupManifest.
- */
-void
-InitializeBackupManifest(backup_manifest_info *manifest,
- backup_manifest_option want_manifest,
- pg_checksum_type manifest_checksum_type)
-{
- memset(manifest, 0, sizeof(backup_manifest_info));
- manifest->checksum_type = manifest_checksum_type;
-
- if (want_manifest == MANIFEST_OPTION_NO)
- manifest->buffile = NULL;
- else
- {
- manifest->buffile = BufFileCreateTemp(false);
- manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
- if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
- elog(ERROR, "failed to initialize checksum of backup manifest: %s",
- pg_cryptohash_error(manifest->manifest_ctx));
- }
-
- manifest->manifest_size = UINT64CONST(0);
- manifest->force_encode = (want_manifest == MANIFEST_OPTION_FORCE_ENCODE);
- manifest->first_file = true;
- manifest->still_checksumming = true;
-
- if (want_manifest != MANIFEST_OPTION_NO)
- AppendToManifest(manifest,
- "{ \"PostgreSQL-Backup-Manifest-Version\": 1,\n"
- "\"Files\": [");
-}
-
-/*
- * Free resources assigned to a backup manifest constructed.
- */
-void
-FreeBackupManifest(backup_manifest_info *manifest)
-{
- pg_cryptohash_free(manifest->manifest_ctx);
- manifest->manifest_ctx = NULL;
-}
-
-/*
- * Add an entry to the backup manifest for a file.
- */
-void
-AddFileToBackupManifest(backup_manifest_info *manifest, const char *spcoid,
- const char *pathname, size_t size, pg_time_t mtime,
- pg_checksum_context *checksum_ctx)
-{
- char pathbuf[MAXPGPATH];
- int pathlen;
- StringInfoData buf;
-
- if (!IsManifestEnabled(manifest))
- return;
-
- /*
- * If this file is part of a tablespace, the pathname passed to this
- * function will be relative to the tar file that contains it. We want the
- * pathname relative to the data directory (ignoring the intermediate
- * symlink traversal).
- */
- if (spcoid != NULL)
- {
- snprintf(pathbuf, sizeof(pathbuf), "pg_tblspc/%s/%s", spcoid,
- pathname);
- pathname = pathbuf;
- }
-
- /*
- * Each file's entry needs to be separated from any entry that follows by
- * a comma, but there's no comma before the first one or after the last
- * one. To make that work, adding a file to the manifest starts by
- * terminating the most recently added line, with a comma if appropriate,
- * but does not terminate the line inserted for this file.
- */
- initStringInfo(&buf);
- if (manifest->first_file)
- {
- appendStringInfoChar(&buf, '\n');
- manifest->first_file = false;
- }
- else
- appendStringInfoString(&buf, ",\n");
-
- /*
- * Write the relative pathname to this file out to the manifest. The
- * manifest is always stored in UTF-8, so we have to encode paths that are
- * not valid in that encoding.
- */
- pathlen = strlen(pathname);
- if (!manifest->force_encode &&
- pg_verify_mbstr(PG_UTF8, pathname, pathlen, true))
- {
- appendStringInfoString(&buf, "{ \"Path\": ");
- escape_json(&buf, pathname);
- appendStringInfoString(&buf, ", ");
- }
- else
- {
- appendStringInfoString(&buf, "{ \"Encoded-Path\": \"");
- enlargeStringInfo(&buf, 2 * pathlen);
- buf.len += hex_encode(pathname, pathlen,
- &buf.data[buf.len]);
- appendStringInfoString(&buf, "\", ");
- }
-
- appendStringInfo(&buf, "\"Size\": %zu, ", size);
-
- /*
- * Convert last modification time to a string and append it to the
- * manifest. Since it's not clear what time zone to use and since time
- * zone definitions can change, possibly causing confusion, use GMT
- * always.
- */
- appendStringInfoString(&buf, "\"Last-Modified\": \"");
- enlargeStringInfo(&buf, 128);
- buf.len += pg_strftime(&buf.data[buf.len], 128, "%Y-%m-%d %H:%M:%S %Z",
- pg_gmtime(&mtime));
- appendStringInfoChar(&buf, '"');
-
- /* Add checksum information. */
- if (checksum_ctx->type != CHECKSUM_TYPE_NONE)
- {
- uint8 checksumbuf[PG_CHECKSUM_MAX_LENGTH];
- int checksumlen;
-
- checksumlen = pg_checksum_final(checksum_ctx, checksumbuf);
- if (checksumlen < 0)
- elog(ERROR, "could not finalize checksum of file \"%s\"",
- pathname);
-
- appendStringInfo(&buf,
- ", \"Checksum-Algorithm\": \"%s\", \"Checksum\": \"",
- pg_checksum_type_name(checksum_ctx->type));
- enlargeStringInfo(&buf, 2 * checksumlen);
- buf.len += hex_encode((char *) checksumbuf, checksumlen,
- &buf.data[buf.len]);
- appendStringInfoChar(&buf, '"');
- }
-
- /* Close out the object. */
- appendStringInfoString(&buf, " }");
-
- /* OK, add it to the manifest. */
- AppendStringToManifest(manifest, buf.data);
-
- /* Avoid leaking memory. */
- pfree(buf.data);
-}
-
-/*
- * Add information about the WAL that will need to be replayed when restoring
- * this backup to the manifest.
- */
-void
-AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr,
- TimeLineID starttli, XLogRecPtr endptr,
- TimeLineID endtli)
-{
- List *timelines;
- ListCell *lc;
- bool first_wal_range = true;
- bool found_start_timeline = false;
-
- if (!IsManifestEnabled(manifest))
- return;
-
- /* Terminate the list of files. */
- AppendStringToManifest(manifest, "\n],\n");
-
- /* Read the timeline history for the ending timeline. */
- timelines = readTimeLineHistory(endtli);
-
- /* Start a list of LSN ranges. */
- AppendStringToManifest(manifest, "\"WAL-Ranges\": [\n");
-
- foreach(lc, timelines)
- {
- TimeLineHistoryEntry *entry = lfirst(lc);
- XLogRecPtr tl_beginptr;
-
- /*
- * We only care about timelines that were active during the backup.
- * Skip any that ended before the backup started. (Note that if
- * entry->end is InvalidXLogRecPtr, it means that the timeline has not
- * yet ended.)
- */
- if (!XLogRecPtrIsInvalid(entry->end) && entry->end < startptr)
- continue;
-
- /*
- * Because the timeline history file lists newer timelines before
- * older ones, the first timeline we encounter that is new enough to
- * matter ought to match the ending timeline of the backup.
- */
- if (first_wal_range && endtli != entry->tli)
- ereport(ERROR,
- errmsg("expected end timeline %u but found timeline %u",
- starttli, entry->tli));
-
- /*
- * If this timeline entry matches with the timeline on which the
- * backup started, WAL needs to be checked from the start LSN of the
- * backup. If this entry refers to a newer timeline, WAL needs to be
- * checked since the beginning of this timeline, so use the LSN where
- * the timeline began.
- */
- if (starttli == entry->tli)
- tl_beginptr = startptr;
- else
- {
- tl_beginptr = entry->begin;
-
- /*
- * If we reach a TLI that has no valid beginning LSN, there can't
- * be any more timelines in the history after this point, so we'd
- * better have arrived at the expected starting TLI. If not,
- * something's gone horribly wrong.
- */
- if (XLogRecPtrIsInvalid(entry->begin))
- ereport(ERROR,
- errmsg("expected start timeline %u but found timeline %u",
- starttli, entry->tli));
- }
-
- AppendToManifest(manifest,
- "%s{ \"Timeline\": %u, \"Start-LSN\": \"%X/%X\", \"End-LSN\": \"%X/%X\" }",
- first_wal_range ? "" : ",\n",
- entry->tli,
- LSN_FORMAT_ARGS(tl_beginptr),
- LSN_FORMAT_ARGS(endptr));
-
- if (starttli == entry->tli)
- {
- found_start_timeline = true;
- break;
- }
-
- endptr = entry->begin;
- first_wal_range = false;
- }
-
- /*
- * The last entry in the timeline history for the ending timeline should
- * be the ending timeline itself. Verify that this is what we observed.
- */
- if (!found_start_timeline)
- ereport(ERROR,
- errmsg("start timeline %u not found in history of timeline %u",
- starttli, endtli));
-
- /* Terminate the list of WAL ranges. */
- AppendStringToManifest(manifest, "\n],\n");
-}
-
-/*
- * Finalize the backup manifest, and send it to the client.
- */
-void
-SendBackupManifest(backup_manifest_info *manifest, bbsink *sink)
-{
- uint8 checksumbuf[PG_SHA256_DIGEST_LENGTH];
- char checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH];
- size_t manifest_bytes_done = 0;
-
- if (!IsManifestEnabled(manifest))
- return;
-
- /*
- * Append manifest checksum, so that the problems with the manifest itself
- * can be detected.
- *
- * We always use SHA-256 for this, regardless of what algorithm is chosen
- * for checksumming the files. If we ever want to make the checksum
- * algorithm used for the manifest file variable, the client will need a
- * way to figure out which algorithm to use as close to the beginning of
- * the manifest file as possible, to avoid having to read the whole thing
- * twice.
- */
- manifest->still_checksumming = false;
- if (pg_cryptohash_final(manifest->manifest_ctx, checksumbuf,
- sizeof(checksumbuf)) < 0)
- elog(ERROR, "failed to finalize checksum of backup manifest: %s",
- pg_cryptohash_error(manifest->manifest_ctx));
- AppendStringToManifest(manifest, "\"Manifest-Checksum\": \"");
-
- hex_encode((char *) checksumbuf, sizeof checksumbuf, checksumstringbuf);
- checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH - 1] = '\0';
-
- AppendStringToManifest(manifest, checksumstringbuf);
- AppendStringToManifest(manifest, "\"}\n");
-
- /*
- * We've written all the data to the manifest file. Rewind the file so
- * that we can read it all back.
- */
- if (BufFileSeek(manifest->buffile, 0, 0L, SEEK_SET))
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not rewind temporary file")));
-
-
- /*
- * Send the backup manifest.
- */
- bbsink_begin_manifest(sink);
- while (manifest_bytes_done < manifest->manifest_size)
- {
- size_t bytes_to_read;
- size_t rc;
-
- bytes_to_read = Min(sink->bbs_buffer_length,
- manifest->manifest_size - manifest_bytes_done);
- rc = BufFileRead(manifest->buffile, sink->bbs_buffer,
- bytes_to_read);
- if (rc != bytes_to_read)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read from temporary file: %m")));
- bbsink_manifest_contents(sink, bytes_to_read);
- manifest_bytes_done += bytes_to_read;
- }
- bbsink_end_manifest(sink);
-
- /* Release resources */
- BufFileClose(manifest->buffile);
-}
-
-/*
- * Append a cstring to the manifest.
- */
-static void
-AppendStringToManifest(backup_manifest_info *manifest, char *s)
-{
- int len = strlen(s);
-
- Assert(manifest != NULL);
- if (manifest->still_checksumming)
- {
- if (pg_cryptohash_update(manifest->manifest_ctx, (uint8 *) s, len) < 0)
- elog(ERROR, "failed to update checksum of backup manifest: %s",
- pg_cryptohash_error(manifest->manifest_ctx));
- }
- BufFileWrite(manifest->buffile, s, len);
- manifest->manifest_size += len;
-}
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
deleted file mode 100644
index deeddd09a9c..00000000000
--- a/src/backend/replication/basebackup.c
+++ /dev/null
@@ -1,1829 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup.c
- * code for taking a base backup and streaming it to a standby
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include <sys/stat.h>
-#include <unistd.h>
-#include <time.h>
-
-#include "access/xlog_internal.h" /* for pg_backup_start/stop */
-#include "common/compression.h"
-#include "common/file_perm.h"
-#include "commands/defrem.h"
-#include "lib/stringinfo.h"
-#include "miscadmin.h"
-#include "nodes/pg_list.h"
-#include "pgstat.h"
-#include "pgtar.h"
-#include "port.h"
-#include "postmaster/syslogger.h"
-#include "replication/basebackup.h"
-#include "replication/basebackup_sink.h"
-#include "replication/basebackup_target.h"
-#include "replication/backup_manifest.h"
-#include "replication/walsender.h"
-#include "replication/walsender_private.h"
-#include "storage/bufpage.h"
-#include "storage/checksum.h"
-#include "storage/dsm_impl.h"
-#include "storage/fd.h"
-#include "storage/ipc.h"
-#include "storage/reinit.h"
-#include "utils/builtins.h"
-#include "utils/ps_status.h"
-#include "utils/relcache.h"
-#include "utils/resowner.h"
-#include "utils/timestamp.h"
-
-/*
- * How much data do we want to send in one CopyData message? Note that
- * this may also result in reading the underlying files in chunks of this
- * size.
- *
- * NB: The buffer size is required to be a multiple of the system block
- * size, so use that value instead if it's bigger than our preference.
- */
-#define SINK_BUFFER_LENGTH Max(32768, BLCKSZ)
-
-typedef struct
-{
- const char *label;
- bool progress;
- bool fastcheckpoint;
- bool nowait;
- bool includewal;
- uint32 maxrate;
- bool sendtblspcmapfile;
- bool send_to_client;
- bool use_copytblspc;
- BaseBackupTargetHandle *target_handle;
- backup_manifest_option manifest;
- pg_compress_algorithm compression;
- pg_compress_specification compression_specification;
- pg_checksum_type manifest_checksum_type;
-} basebackup_options;
-
-static int64 sendTablespace(bbsink *sink, char *path, char *oid, bool sizeonly,
- struct backup_manifest_info *manifest);
-static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
- List *tablespaces, bool sendtblspclinks,
- backup_manifest_info *manifest, const char *spcoid);
-static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
- struct stat *statbuf, bool missing_ok, Oid dboid,
- backup_manifest_info *manifest, const char *spcoid);
-static void sendFileWithContent(bbsink *sink, const char *filename,
- const char *content,
- backup_manifest_info *manifest);
-static int64 _tarWriteHeader(bbsink *sink, const char *filename,
- const char *linktarget, struct stat *statbuf,
- bool sizeonly);
-static void _tarWritePadding(bbsink *sink, int len);
-static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf);
-static void perform_base_backup(basebackup_options *opt, bbsink *sink);
-static void parse_basebackup_options(List *options, basebackup_options *opt);
-static int compareWalFileNames(const ListCell *a, const ListCell *b);
-static bool is_checksummed_file(const char *fullpath, const char *filename);
-static int basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
- const char *filename, bool partial_read_ok);
-
-/* Was the backup currently in-progress initiated in recovery mode? */
-static bool backup_started_in_recovery = false;
-
-/* Total number of checksum failures during base backup. */
-static long long int total_checksum_failures;
-
-/* Do not verify checksums. */
-static bool noverify_checksums = false;
-
-/*
- * Definition of one element part of an exclusion list, used for paths part
- * of checksum validation or base backups. "name" is the name of the file
- * or path to check for exclusion. If "match_prefix" is true, any items
- * matching the name as prefix are excluded.
- */
-struct exclude_list_item
-{
- const char *name;
- bool match_prefix;
-};
-
-/*
- * The contents of these directories are removed or recreated during server
- * start so they are not included in backups. The directories themselves are
- * kept and included as empty to preserve access permissions.
- *
- * Note: this list should be kept in sync with the filter lists in pg_rewind's
- * filemap.c.
- */
-static const char *const excludeDirContents[] =
-{
- /*
- * Skip temporary statistics files. PG_STAT_TMP_DIR must be skipped
- * because extensions like pg_stat_statements store data there.
- */
- PG_STAT_TMP_DIR,
-
- /*
- * It is generally not useful to backup the contents of this directory
- * even if the intention is to restore to another primary. See backup.sgml
- * for a more detailed description.
- */
- "pg_replslot",
-
- /* Contents removed on startup, see dsm_cleanup_for_mmap(). */
- PG_DYNSHMEM_DIR,
-
- /* Contents removed on startup, see AsyncShmemInit(). */
- "pg_notify",
-
- /*
- * Old contents are loaded for possible debugging but are not required for
- * normal operation, see SerialInit().
- */
- "pg_serial",
-
- /* Contents removed on startup, see DeleteAllExportedSnapshotFiles(). */
- "pg_snapshots",
-
- /* Contents zeroed on startup, see StartupSUBTRANS(). */
- "pg_subtrans",
-
- /* end of list */
- NULL
-};
-
-/*
- * List of files excluded from backups.
- */
-static const struct exclude_list_item excludeFiles[] =
-{
- /* Skip auto conf temporary file. */
- {PG_AUTOCONF_FILENAME ".tmp", false},
-
- /* Skip current log file temporary file */
- {LOG_METAINFO_DATAFILE_TMP, false},
-
- /*
- * Skip relation cache because it is rebuilt on startup. This includes
- * temporary files.
- */
- {RELCACHE_INIT_FILENAME, true},
-
- /*
- * backup_label and tablespace_map should not exist in a running cluster
- * capable of doing an online backup, but exclude them just in case.
- */
- {BACKUP_LABEL_FILE, false},
- {TABLESPACE_MAP, false},
-
- /*
- * If there's a backup_manifest, it belongs to a backup that was used to
- * start this server. It is *not* correct for this backup. Our
- * backup_manifest is injected into the backup separately if users want
- * it.
- */
- {"backup_manifest", false},
-
- {"postmaster.pid", false},
- {"postmaster.opts", false},
-
- /* end of list */
- {NULL, false}
-};
-
-/*
- * List of files excluded from checksum validation.
- *
- * Note: this list should be kept in sync with what pg_checksums.c
- * includes.
- */
-static const struct exclude_list_item noChecksumFiles[] = {
- {"pg_control", false},
- {"pg_filenode.map", false},
- {"pg_internal.init", true},
- {"PG_VERSION", false},
-#ifdef EXEC_BACKEND
- {"config_exec_params", true},
-#endif
- {NULL, false}
-};
-
-/*
- * Actually do a base backup for the specified tablespaces.
- *
- * This is split out mainly to avoid complaints about "variable might be
- * clobbered by longjmp" from stupider versions of gcc.
- */
-static void
-perform_base_backup(basebackup_options *opt, bbsink *sink)
-{
- bbsink_state state;
- XLogRecPtr endptr;
- TimeLineID endtli;
- StringInfo labelfile;
- StringInfo tblspc_map_file;
- backup_manifest_info manifest;
-
- /* Initial backup state, insofar as we know it now. */
- state.tablespaces = NIL;
- state.tablespace_num = 0;
- state.bytes_done = 0;
- state.bytes_total = 0;
- state.bytes_total_is_valid = false;
-
- /* we're going to use a BufFile, so we need a ResourceOwner */
- Assert(CurrentResourceOwner == NULL);
- CurrentResourceOwner = ResourceOwnerCreate(NULL, "base backup");
-
- backup_started_in_recovery = RecoveryInProgress();
-
- labelfile = makeStringInfo();
- tblspc_map_file = makeStringInfo();
- InitializeBackupManifest(&manifest, opt->manifest,
- opt->manifest_checksum_type);
-
- total_checksum_failures = 0;
-
- basebackup_progress_wait_checkpoint();
- state.startptr = do_pg_backup_start(opt->label, opt->fastcheckpoint,
- &state.starttli,
- labelfile, &state.tablespaces,
- tblspc_map_file);
-
- /*
- * Once do_pg_backup_start has been called, ensure that any failure causes
- * us to abort the backup so we don't "leak" a backup counter. For this
- * reason, *all* functionality between do_pg_backup_start() and the end of
- * do_pg_backup_stop() should be inside the error cleanup block!
- */
-
- PG_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
- {
- ListCell *lc;
- tablespaceinfo *ti;
-
- /* Add a node for the base directory at the end */
- ti = palloc0(sizeof(tablespaceinfo));
- ti->size = -1;
- state.tablespaces = lappend(state.tablespaces, ti);
-
- /*
- * Calculate the total backup size by summing up the size of each
- * tablespace
- */
- if (opt->progress)
- {
- basebackup_progress_estimate_backup_size();
-
- foreach(lc, state.tablespaces)
- {
- tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc);
-
- if (tmp->path == NULL)
- tmp->size = sendDir(sink, ".", 1, true, state.tablespaces,
- true, NULL, NULL);
- else
- tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true,
- NULL);
- state.bytes_total += tmp->size;
- }
- state.bytes_total_is_valid = true;
- }
-
- /* notify basebackup sink about start of backup */
- bbsink_begin_backup(sink, &state, SINK_BUFFER_LENGTH);
-
- /* Send off our tablespaces one by one */
- foreach(lc, state.tablespaces)
- {
- tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
-
- if (ti->path == NULL)
- {
- struct stat statbuf;
- bool sendtblspclinks = true;
-
- bbsink_begin_archive(sink, "base.tar");
-
- /* In the main tar, include the backup_label first... */
- sendFileWithContent(sink, BACKUP_LABEL_FILE, labelfile->data,
- &manifest);
-
- /* Then the tablespace_map file, if required... */
- if (opt->sendtblspcmapfile)
- {
- sendFileWithContent(sink, TABLESPACE_MAP, tblspc_map_file->data,
- &manifest);
- sendtblspclinks = false;
- }
-
- /* Then the bulk of the files... */
- sendDir(sink, ".", 1, false, state.tablespaces,
- sendtblspclinks, &manifest, NULL);
-
- /* ... and pg_control after everything else. */
- if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m",
- XLOG_CONTROL_FILE)));
- sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf,
- false, InvalidOid, &manifest, NULL);
- }
- else
- {
- char *archive_name = psprintf("%s.tar", ti->oid);
-
- bbsink_begin_archive(sink, archive_name);
-
- sendTablespace(sink, ti->path, ti->oid, false, &manifest);
- }
-
- /*
- * If we're including WAL, and this is the main data directory we
- * don't treat this as the end of the tablespace. Instead, we will
- * include the xlog files below and stop afterwards. This is safe
- * since the main data directory is always sent *last*.
- */
- if (opt->includewal && ti->path == NULL)
- {
- Assert(lnext(state.tablespaces, lc) == NULL);
- }
- else
- {
- /* Properly terminate the tarfile. */
- StaticAssertStmt(2 * TAR_BLOCK_SIZE <= BLCKSZ,
- "BLCKSZ too small for 2 tar blocks");
- memset(sink->bbs_buffer, 0, 2 * TAR_BLOCK_SIZE);
- bbsink_archive_contents(sink, 2 * TAR_BLOCK_SIZE);
-
- /* OK, that's the end of the archive. */
- bbsink_end_archive(sink);
- }
- }
-
- basebackup_progress_wait_wal_archive(&state);
- endptr = do_pg_backup_stop(labelfile->data, !opt->nowait, &endtli);
- }
- PG_END_ENSURE_ERROR_CLEANUP(do_pg_abort_backup, BoolGetDatum(false));
-
-
- if (opt->includewal)
- {
- /*
- * We've left the last tar file "open", so we can now append the
- * required WAL files to it.
- */
- char pathbuf[MAXPGPATH];
- XLogSegNo segno;
- XLogSegNo startsegno;
- XLogSegNo endsegno;
- struct stat statbuf;
- List *historyFileList = NIL;
- List *walFileList = NIL;
- char firstoff[MAXFNAMELEN];
- char lastoff[MAXFNAMELEN];
- DIR *dir;
- struct dirent *de;
- ListCell *lc;
- TimeLineID tli;
-
- basebackup_progress_transfer_wal();
-
- /*
- * I'd rather not worry about timelines here, so scan pg_wal and
- * include all WAL files in the range between 'startptr' and 'endptr',
- * regardless of the timeline the file is stamped with. If there are
- * some spurious WAL files belonging to timelines that don't belong in
- * this server's history, they will be included too. Normally there
- * shouldn't be such files, but if there are, there's little harm in
- * including them.
- */
- XLByteToSeg(state.startptr, startsegno, wal_segment_size);
- XLogFileName(firstoff, state.starttli, startsegno, wal_segment_size);
- XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
- XLogFileName(lastoff, endtli, endsegno, wal_segment_size);
-
- dir = AllocateDir("pg_wal");
- while ((de = ReadDir(dir, "pg_wal")) != NULL)
- {
- /* Does it look like a WAL segment, and is it in the range? */
- if (IsXLogFileName(de->d_name) &&
- strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
- strcmp(de->d_name + 8, lastoff + 8) <= 0)
- {
- walFileList = lappend(walFileList, pstrdup(de->d_name));
- }
- /* Does it look like a timeline history file? */
- else if (IsTLHistoryFileName(de->d_name))
- {
- historyFileList = lappend(historyFileList, pstrdup(de->d_name));
- }
- }
- FreeDir(dir);
-
- /*
- * Before we go any further, check that none of the WAL segments we
- * need were removed.
- */
- CheckXLogRemoved(startsegno, state.starttli);
-
- /*
- * Sort the WAL filenames. We want to send the files in order from
- * oldest to newest, to reduce the chance that a file is recycled
- * before we get a chance to send it over.
- */
- list_sort(walFileList, compareWalFileNames);
-
- /*
- * There must be at least one xlog file in the pg_wal directory, since
- * we are doing backup-including-xlog.
- */
- if (walFileList == NIL)
- ereport(ERROR,
- (errmsg("could not find any WAL files")));
-
- /*
- * Sanity check: the first and last segment should cover startptr and
- * endptr, with no gaps in between.
- */
- XLogFromFileName((char *) linitial(walFileList),
- &tli, &segno, wal_segment_size);
- if (segno != startsegno)
- {
- char startfname[MAXFNAMELEN];
-
- XLogFileName(startfname, state.starttli, startsegno,
- wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", startfname)));
- }
- foreach(lc, walFileList)
- {
- char *walFileName = (char *) lfirst(lc);
- XLogSegNo currsegno = segno;
- XLogSegNo nextsegno = segno + 1;
-
- XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
- if (!(nextsegno == segno || currsegno == segno))
- {
- char nextfname[MAXFNAMELEN];
-
- XLogFileName(nextfname, tli, nextsegno, wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", nextfname)));
- }
- }
- if (segno != endsegno)
- {
- char endfname[MAXFNAMELEN];
-
- XLogFileName(endfname, endtli, endsegno, wal_segment_size);
- ereport(ERROR,
- (errmsg("could not find WAL file \"%s\"", endfname)));
- }
-
- /* Ok, we have everything we need. Send the WAL files. */
- foreach(lc, walFileList)
- {
- char *walFileName = (char *) lfirst(lc);
- int fd;
- size_t cnt;
- pgoff_t len = 0;
-
- snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
- XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
-
- fd = OpenTransientFile(pathbuf, O_RDONLY | PG_BINARY);
- if (fd < 0)
- {
- int save_errno = errno;
-
- /*
- * Most likely reason for this is that the file was already
- * removed by a checkpoint, so check for that to get a better
- * error message.
- */
- CheckXLogRemoved(segno, tli);
-
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m", pathbuf)));
- }
-
- if (fstat(fd, &statbuf) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m",
- pathbuf)));
- if (statbuf.st_size != wal_segment_size)
- {
- CheckXLogRemoved(segno, tli);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("unexpected WAL file size \"%s\"", walFileName)));
- }
-
- /* send the WAL file itself */
- _tarWriteHeader(sink, pathbuf, NULL, &statbuf, false);
-
- while ((cnt = basebackup_read_file(fd, sink->bbs_buffer,
- Min(sink->bbs_buffer_length,
- wal_segment_size - len),
- len, pathbuf, true)) > 0)
- {
- CheckXLogRemoved(segno, tli);
- bbsink_archive_contents(sink, cnt);
-
- len += cnt;
-
- if (len == wal_segment_size)
- break;
- }
-
- if (len != wal_segment_size)
- {
- CheckXLogRemoved(segno, tli);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("unexpected WAL file size \"%s\"", walFileName)));
- }
-
- /*
- * wal_segment_size is a multiple of TAR_BLOCK_SIZE, so no need
- * for padding.
- */
- Assert(wal_segment_size % TAR_BLOCK_SIZE == 0);
-
- CloseTransientFile(fd);
-
- /*
- * Mark file as archived, otherwise files can get archived again
- * after promotion of a new node. This is in line with
- * walreceiver.c always doing an XLogArchiveForceDone() after a
- * complete segment.
- */
- StatusFilePath(pathbuf, walFileName, ".done");
- sendFileWithContent(sink, pathbuf, "", &manifest);
- }
-
- /*
- * Send timeline history files too. Only the latest timeline history
- * file is required for recovery, and even that only if there happens
- * to be a timeline switch in the first WAL segment that contains the
- * checkpoint record, or if we're taking a base backup from a standby
- * server and the target timeline changes while the backup is taken.
- * But they are small and highly useful for debugging purposes, so
- * better include them all, always.
- */
- foreach(lc, historyFileList)
- {
- char *fname = lfirst(lc);
-
- snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
-
- if (lstat(pathbuf, &statbuf) != 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file \"%s\": %m", pathbuf)));
-
- sendFile(sink, pathbuf, pathbuf, &statbuf, false, InvalidOid,
- &manifest, NULL);
-
- /* unconditionally mark file as archived */
- StatusFilePath(pathbuf, fname, ".done");
- sendFileWithContent(sink, pathbuf, "", &manifest);
- }
-
- /* Properly terminate the tar file. */
- StaticAssertStmt(2 * TAR_BLOCK_SIZE <= BLCKSZ,
- "BLCKSZ too small for 2 tar blocks");
- memset(sink->bbs_buffer, 0, 2 * TAR_BLOCK_SIZE);
- bbsink_archive_contents(sink, 2 * TAR_BLOCK_SIZE);
-
- /* OK, that's the end of the archive. */
- bbsink_end_archive(sink);
- }
-
- AddWALInfoToBackupManifest(&manifest, state.startptr, state.starttli,
- endptr, endtli);
-
- SendBackupManifest(&manifest, sink);
-
- bbsink_end_backup(sink, endptr, endtli);
-
- if (total_checksum_failures)
- {
- if (total_checksum_failures > 1)
- ereport(WARNING,
- (errmsg_plural("%lld total checksum verification failure",
- "%lld total checksum verification failures",
- total_checksum_failures,
- total_checksum_failures)));
-
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("checksum verification failure during base backup")));
- }
-
- /*
- * Make sure to free the manifest before the resource owners as manifests
- * use cryptohash contexts that may depend on resource owners (like
- * OpenSSL).
- */
- FreeBackupManifest(&manifest);
-
- /* clean up the resource owner we created */
- WalSndResourceCleanup(true);
-
- basebackup_progress_done();
-}
-
-/*
- * list_sort comparison function, to compare log/seg portion of WAL segment
- * filenames, ignoring the timeline portion.
- */
-static int
-compareWalFileNames(const ListCell *a, const ListCell *b)
-{
- char *fna = (char *) lfirst(a);
- char *fnb = (char *) lfirst(b);
-
- return strcmp(fna + 8, fnb + 8);
-}
-
-/*
- * Parse the base backup options passed down by the parser
- */
-static void
-parse_basebackup_options(List *options, basebackup_options *opt)
-{
- ListCell *lopt;
- bool o_label = false;
- bool o_progress = false;
- bool o_checkpoint = false;
- bool o_nowait = false;
- bool o_wal = false;
- bool o_maxrate = false;
- bool o_tablespace_map = false;
- bool o_noverify_checksums = false;
- bool o_manifest = false;
- bool o_manifest_checksums = false;
- bool o_target = false;
- bool o_target_detail = false;
- char *target_str = NULL;
- char *target_detail_str = NULL;
- bool o_compression = false;
- bool o_compression_detail = false;
- char *compression_detail_str = NULL;
-
- MemSet(opt, 0, sizeof(*opt));
- opt->manifest = MANIFEST_OPTION_NO;
- opt->manifest_checksum_type = CHECKSUM_TYPE_CRC32C;
- opt->compression = PG_COMPRESSION_NONE;
- opt->compression_specification.algorithm = PG_COMPRESSION_NONE;
-
- foreach(lopt, options)
- {
- DefElem *defel = (DefElem *) lfirst(lopt);
-
- if (strcmp(defel->defname, "label") == 0)
- {
- if (o_label)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- opt->label = defGetString(defel);
- o_label = true;
- }
- else if (strcmp(defel->defname, "progress") == 0)
- {
- if (o_progress)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- opt->progress = defGetBoolean(defel);
- o_progress = true;
- }
- else if (strcmp(defel->defname, "checkpoint") == 0)
- {
- char *optval = defGetString(defel);
-
- if (o_checkpoint)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- if (pg_strcasecmp(optval, "fast") == 0)
- opt->fastcheckpoint = true;
- else if (pg_strcasecmp(optval, "spread") == 0)
- opt->fastcheckpoint = false;
- else
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("unrecognized checkpoint type: \"%s\"",
- optval)));
- o_checkpoint = true;
- }
- else if (strcmp(defel->defname, "wait") == 0)
- {
- if (o_nowait)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- opt->nowait = !defGetBoolean(defel);
- o_nowait = true;
- }
- else if (strcmp(defel->defname, "wal") == 0)
- {
- if (o_wal)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- opt->includewal = defGetBoolean(defel);
- o_wal = true;
- }
- else if (strcmp(defel->defname, "max_rate") == 0)
- {
- int64 maxrate;
-
- if (o_maxrate)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
-
- maxrate = defGetInt64(defel);
- if (maxrate < MAX_RATE_LOWER || maxrate > MAX_RATE_UPPER)
- ereport(ERROR,
- (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
- errmsg("%d is outside the valid range for parameter \"%s\" (%d .. %d)",
- (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER)));
-
- opt->maxrate = (uint32) maxrate;
- o_maxrate = true;
- }
- else if (strcmp(defel->defname, "tablespace_map") == 0)
- {
- if (o_tablespace_map)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- opt->sendtblspcmapfile = defGetBoolean(defel);
- o_tablespace_map = true;
- }
- else if (strcmp(defel->defname, "verify_checksums") == 0)
- {
- if (o_noverify_checksums)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- noverify_checksums = !defGetBoolean(defel);
- o_noverify_checksums = true;
- }
- else if (strcmp(defel->defname, "manifest") == 0)
- {
- char *optval = defGetString(defel);
- bool manifest_bool;
-
- if (o_manifest)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- if (parse_bool(optval, &manifest_bool))
- {
- if (manifest_bool)
- opt->manifest = MANIFEST_OPTION_YES;
- else
- opt->manifest = MANIFEST_OPTION_NO;
- }
- else if (pg_strcasecmp(optval, "force-encode") == 0)
- opt->manifest = MANIFEST_OPTION_FORCE_ENCODE;
- else
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("unrecognized manifest option: \"%s\"",
- optval)));
- o_manifest = true;
- }
- else if (strcmp(defel->defname, "manifest_checksums") == 0)
- {
- char *optval = defGetString(defel);
-
- if (o_manifest_checksums)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- if (!pg_checksum_parse_type(optval,
- &opt->manifest_checksum_type))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("unrecognized checksum algorithm: \"%s\"",
- optval)));
- o_manifest_checksums = true;
- }
- else if (strcmp(defel->defname, "target") == 0)
- {
- if (o_target)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- target_str = defGetString(defel);
- o_target = true;
- }
- else if (strcmp(defel->defname, "target_detail") == 0)
- {
- char *optval = defGetString(defel);
-
- if (o_target_detail)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- target_detail_str = optval;
- o_target_detail = true;
- }
- else if (strcmp(defel->defname, "compression") == 0)
- {
- char *optval = defGetString(defel);
-
- if (o_compression)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- if (!parse_compress_algorithm(optval, &opt->compression))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("unrecognized compression algorithm \"%s\"",
- optval)));
- o_compression = true;
- }
- else if (strcmp(defel->defname, "compression_detail") == 0)
- {
- if (o_compression_detail)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("duplicate option \"%s\"", defel->defname)));
- compression_detail_str = defGetString(defel);
- o_compression_detail = true;
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("unrecognized base backup option: \"%s\"",
- defel->defname)));
- }
-
- if (opt->label == NULL)
- opt->label = "base backup";
- if (opt->manifest == MANIFEST_OPTION_NO)
- {
- if (o_manifest_checksums)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("manifest checksums require a backup manifest")));
- opt->manifest_checksum_type = CHECKSUM_TYPE_NONE;
- }
-
- if (target_str == NULL)
- {
- if (target_detail_str != NULL)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("target detail cannot be used without target")));
- opt->use_copytblspc = true;
- opt->send_to_client = true;
- }
- else if (strcmp(target_str, "client") == 0)
- {
- if (target_detail_str != NULL)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("target '%s' does not accept a target detail",
- target_str)));
- opt->send_to_client = true;
- }
- else
- opt->target_handle =
- BaseBackupGetTargetHandle(target_str, target_detail_str);
-
- if (o_compression_detail && !o_compression)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("compression detail requires compression")));
-
- if (o_compression)
- {
- char *error_detail;
-
- parse_compress_specification(opt->compression, compression_detail_str,
- &opt->compression_specification);
- error_detail =
- validate_compress_specification(&opt->compression_specification);
- if (error_detail != NULL)
- ereport(ERROR,
- errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("invalid compression specification: %s",
- error_detail));
- }
-}
-
-
-/*
- * SendBaseBackup() - send a complete base backup.
- *
- * The function will put the system into backup mode like pg_backup_start()
- * does, so that the backup is consistent even though we read directly from
- * the filesystem, bypassing the buffer cache.
- */
-void
-SendBaseBackup(BaseBackupCmd *cmd)
-{
- basebackup_options opt;
- bbsink *sink;
- SessionBackupState status = get_backup_status();
-
- if (status == SESSION_BACKUP_RUNNING)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("a backup is already in progress in this session")));
-
- parse_basebackup_options(cmd->options, &opt);
-
- WalSndSetState(WALSNDSTATE_BACKUP);
-
- if (update_process_title)
- {
- char activitymsg[50];
-
- snprintf(activitymsg, sizeof(activitymsg), "sending backup \"%s\"",
- opt.label);
- set_ps_display(activitymsg);
- }
-
- /*
- * If the target is specifically 'client' then set up to stream the backup
- * to the client; otherwise, it's being sent someplace else and should not
- * be sent to the client. BaseBackupGetSink has the job of setting up a
- * sink to send the backup data wherever it needs to go.
- */
- sink = bbsink_copystream_new(opt.send_to_client);
- if (opt.target_handle != NULL)
- sink = BaseBackupGetSink(opt.target_handle, sink);
-
- /* Set up network throttling, if client requested it */
- if (opt.maxrate > 0)
- sink = bbsink_throttle_new(sink, opt.maxrate);
-
- /* Set up server-side compression, if client requested it */
- if (opt.compression == PG_COMPRESSION_GZIP)
- sink = bbsink_gzip_new(sink, &opt.compression_specification);
- else if (opt.compression == PG_COMPRESSION_LZ4)
- sink = bbsink_lz4_new(sink, &opt.compression_specification);
- else if (opt.compression == PG_COMPRESSION_ZSTD)
- sink = bbsink_zstd_new(sink, &opt.compression_specification);
-
- /* Set up progress reporting. */
- sink = bbsink_progress_new(sink, opt.progress);
-
- /*
- * Perform the base backup, but make sure we clean up the bbsink even if
- * an error occurs.
- */
- PG_TRY();
- {
- perform_base_backup(&opt, sink);
- }
- PG_FINALLY();
- {
- bbsink_cleanup(sink);
- }
- PG_END_TRY();
-}
-
-/*
- * Inject a file with given name and content in the output tar stream.
- */
-static void
-sendFileWithContent(bbsink *sink, const char *filename, const char *content,
- backup_manifest_info *manifest)
-{
- struct stat statbuf;
- int bytes_done = 0,
- len;
- pg_checksum_context checksum_ctx;
-
- if (pg_checksum_init(&checksum_ctx, manifest->checksum_type) < 0)
- elog(ERROR, "could not initialize checksum of file \"%s\"",
- filename);
-
- len = strlen(content);
-
- /*
- * Construct a stat struct for the backup_label file we're injecting in
- * the tar.
- */
- /* Windows doesn't have the concept of uid and gid */
-#ifdef WIN32
- statbuf.st_uid = 0;
- statbuf.st_gid = 0;
-#else
- statbuf.st_uid = geteuid();
- statbuf.st_gid = getegid();
-#endif
- statbuf.st_mtime = time(NULL);
- statbuf.st_mode = pg_file_create_mode;
- statbuf.st_size = len;
-
- _tarWriteHeader(sink, filename, NULL, &statbuf, false);
-
- if (pg_checksum_update(&checksum_ctx, (uint8 *) content, len) < 0)
- elog(ERROR, "could not update checksum of file \"%s\"",
- filename);
-
- while (bytes_done < len)
- {
- size_t remaining = len - bytes_done;
- size_t nbytes = Min(sink->bbs_buffer_length, remaining);
-
- memcpy(sink->bbs_buffer, content, nbytes);
- bbsink_archive_contents(sink, nbytes);
- bytes_done += nbytes;
- }
-
- _tarWritePadding(sink, len);
-
- AddFileToBackupManifest(manifest, NULL, filename, len,
- (pg_time_t) statbuf.st_mtime, &checksum_ctx);
-}
-
-/*
- * Include the tablespace directory pointed to by 'path' in the output tar
- * stream. If 'sizeonly' is true, we just calculate a total length and return
- * it, without actually sending anything.
- *
- * Only used to send auxiliary tablespaces, not PGDATA.
- */
-static int64
-sendTablespace(bbsink *sink, char *path, char *spcoid, bool sizeonly,
- backup_manifest_info *manifest)
-{
- int64 size;
- char pathbuf[MAXPGPATH];
- struct stat statbuf;
-
- /*
- * 'path' points to the tablespace location, but we only want to include
- * the version directory in it that belongs to us.
- */
- snprintf(pathbuf, sizeof(pathbuf), "%s/%s", path,
- TABLESPACE_VERSION_DIRECTORY);
-
- /*
- * Store a directory entry in the tar file so we get the permissions
- * right.
- */
- if (lstat(pathbuf, &statbuf) != 0)
- {
- if (errno != ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file or directory \"%s\": %m",
- pathbuf)));
-
- /* If the tablespace went away while scanning, it's no error. */
- return 0;
- }
-
- size = _tarWriteHeader(sink, TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
- sizeonly);
-
- /* Send all the files in the tablespace version directory */
- size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest,
- spcoid);
-
- return size;
-}
-
-/*
- * Include all files from the given directory in the output tar stream. If
- * 'sizeonly' is true, we just calculate a total length and return it, without
- * actually sending anything.
- *
- * Omit any directory in the tablespaces list, to avoid backing up
- * tablespaces twice when they were created inside PGDATA.
- *
- * If sendtblspclinks is true, we need to include symlink
- * information in the tar file. If not, we can skip that
- * as it will be sent separately in the tablespace_map file.
- */
-static int64
-sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly,
- List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest,
- const char *spcoid)
-{
- DIR *dir;
- struct dirent *de;
- char pathbuf[MAXPGPATH * 2];
- struct stat statbuf;
- int64 size = 0;
- const char *lastDir; /* Split last dir from parent path. */
- bool isDbDir = false; /* Does this directory contain relations? */
-
- /*
- * Determine if the current path is a database directory that can contain
- * relations.
- *
- * Start by finding the location of the delimiter between the parent path
- * and the current path.
- */
- lastDir = last_dir_separator(path);
-
- /* Does this path look like a database path (i.e. all digits)? */
- if (lastDir != NULL &&
- strspn(lastDir + 1, "0123456789") == strlen(lastDir + 1))
- {
- /* Part of path that contains the parent directory. */
- int parentPathLen = lastDir - path;
-
- /*
- * Mark path as a database directory if the parent path is either
- * $PGDATA/base or a tablespace version path.
- */
- if (strncmp(path, "./base", parentPathLen) == 0 ||
- (parentPathLen >= (sizeof(TABLESPACE_VERSION_DIRECTORY) - 1) &&
- strncmp(lastDir - (sizeof(TABLESPACE_VERSION_DIRECTORY) - 1),
- TABLESPACE_VERSION_DIRECTORY,
- sizeof(TABLESPACE_VERSION_DIRECTORY) - 1) == 0))
- isDbDir = true;
- }
-
- dir = AllocateDir(path);
- while ((de = ReadDir(dir, path)) != NULL)
- {
- int excludeIdx;
- bool excludeFound;
- ForkNumber relForkNum; /* Type of fork if file is a relation */
- int relnumchars; /* Chars in filename that are the
- * relnumber */
-
- /* Skip special stuff */
- if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
- continue;
-
- /* Skip temporary files */
- if (strncmp(de->d_name,
- PG_TEMP_FILE_PREFIX,
- strlen(PG_TEMP_FILE_PREFIX)) == 0)
- continue;
-
- /*
- * Check if the postmaster has signaled us to exit, and abort with an
- * error in that case. The error handler further up will call
- * do_pg_abort_backup() for us. Also check that if the backup was
- * started while still in recovery, the server wasn't promoted.
- * do_pg_backup_stop() will check that too, but it's better to stop
- * the backup early than continue to the end and fail there.
- */
- CHECK_FOR_INTERRUPTS();
- if (RecoveryInProgress() != backup_started_in_recovery)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("the standby was promoted during online backup"),
- errhint("This means that the backup being taken is corrupt "
- "and should not be used. "
- "Try taking another online backup.")));
-
- /* Scan for files that should be excluded */
- excludeFound = false;
- for (excludeIdx = 0; excludeFiles[excludeIdx].name != NULL; excludeIdx++)
- {
- int cmplen = strlen(excludeFiles[excludeIdx].name);
-
- if (!excludeFiles[excludeIdx].match_prefix)
- cmplen++;
- if (strncmp(de->d_name, excludeFiles[excludeIdx].name, cmplen) == 0)
- {
- elog(DEBUG1, "file \"%s\" excluded from backup", de->d_name);
- excludeFound = true;
- break;
- }
- }
-
- if (excludeFound)
- continue;
-
- /* Exclude all forks for unlogged tables except the init fork */
- if (isDbDir &&
- parse_filename_for_nontemp_relation(de->d_name, &relnumchars,
- &relForkNum))
- {
- /* Never exclude init forks */
- if (relForkNum != INIT_FORKNUM)
- {
- char initForkFile[MAXPGPATH];
- char relNumber[OIDCHARS + 1];
-
- /*
- * If any other type of fork, check if there is an init fork
- * with the same RelFileNumber. If so, the file can be
- * excluded.
- */
- memcpy(relNumber, de->d_name, relnumchars);
- relNumber[relnumchars] = '\0';
- snprintf(initForkFile, sizeof(initForkFile), "%s/%s_init",
- path, relNumber);
-
- if (lstat(initForkFile, &statbuf) == 0)
- {
- elog(DEBUG2,
- "unlogged relation file \"%s\" excluded from backup",
- de->d_name);
-
- continue;
- }
- }
- }
-
- /* Exclude temporary relations */
- if (isDbDir && looks_like_temp_rel_name(de->d_name))
- {
- elog(DEBUG2,
- "temporary relation file \"%s\" excluded from backup",
- de->d_name);
-
- continue;
- }
-
- snprintf(pathbuf, sizeof(pathbuf), "%s/%s", path, de->d_name);
-
- /* Skip pg_control here to back up it last */
- if (strcmp(pathbuf, "./global/pg_control") == 0)
- continue;
-
- if (lstat(pathbuf, &statbuf) != 0)
- {
- if (errno != ENOENT)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not stat file or directory \"%s\": %m",
- pathbuf)));
-
- /* If the file went away while scanning, it's not an error. */
- continue;
- }
-
- /* Scan for directories whose contents should be excluded */
- excludeFound = false;
- for (excludeIdx = 0; excludeDirContents[excludeIdx] != NULL; excludeIdx++)
- {
- if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
- {
- elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
- convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
- &statbuf, sizeonly);
- excludeFound = true;
- break;
- }
- }
-
- if (excludeFound)
- continue;
-
- /*
- * We can skip pg_wal, the WAL segments need to be fetched from the
- * WAL archive anyway. But include it as an empty directory anyway, so
- * we get permissions right.
- */
- if (strcmp(pathbuf, "./pg_wal") == 0)
- {
- /* If pg_wal is a symlink, write it as a directory anyway */
- convert_link_to_directory(pathbuf, &statbuf);
- size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL,
- &statbuf, sizeonly);
-
- /*
- * Also send archive_status directory (by hackishly reusing
- * statbuf from above ...).
- */
- size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL,
- &statbuf, sizeonly);
-
- continue; /* don't recurse into pg_wal */
- }
-
- /* Allow symbolic links in pg_tblspc only */
- if (strcmp(path, "./pg_tblspc") == 0 && S_ISLNK(statbuf.st_mode))
- {
- char linkpath[MAXPGPATH];
- int rllen;
-
- rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
- if (rllen < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read symbolic link \"%s\": %m",
- pathbuf)));
- if (rllen >= sizeof(linkpath))
- ereport(ERROR,
- (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
- errmsg("symbolic link \"%s\" target is too long",
- pathbuf)));
- linkpath[rllen] = '\0';
-
- size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, linkpath,
- &statbuf, sizeonly);
- }
- else if (S_ISDIR(statbuf.st_mode))
- {
- bool skip_this_dir = false;
- ListCell *lc;
-
- /*
- * Store a directory entry in the tar file so we can get the
- * permissions right.
- */
- size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, &statbuf,
- sizeonly);
-
- /*
- * Call ourselves recursively for a directory, unless it happens
- * to be a separate tablespace located within PGDATA.
- */
- foreach(lc, tablespaces)
- {
- tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
-
- /*
- * ti->rpath is the tablespace relative path within PGDATA, or
- * NULL if the tablespace has been properly located somewhere
- * else.
- *
- * Skip past the leading "./" in pathbuf when comparing.
- */
- if (ti->rpath && strcmp(ti->rpath, pathbuf + 2) == 0)
- {
- skip_this_dir = true;
- break;
- }
- }
-
- /*
- * skip sending directories inside pg_tblspc, if not required.
- */
- if (strcmp(pathbuf, "./pg_tblspc") == 0 && !sendtblspclinks)
- skip_this_dir = true;
-
- if (!skip_this_dir)
- size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces,
- sendtblspclinks, manifest, spcoid);
- }
- else if (S_ISREG(statbuf.st_mode))
- {
- bool sent = false;
-
- if (!sizeonly)
- sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf,
- true, isDbDir ? atooid(lastDir + 1) : InvalidOid,
- manifest, spcoid);
-
- if (sent || sizeonly)
- {
- /* Add size. */
- size += statbuf.st_size;
-
- /* Pad to a multiple of the tar block size. */
- size += tarPaddingBytesRequired(statbuf.st_size);
-
- /* Size of the header for the file. */
- size += TAR_BLOCK_SIZE;
- }
- }
- else
- ereport(WARNING,
- (errmsg("skipping special file \"%s\"", pathbuf)));
- }
- FreeDir(dir);
- return size;
-}
-
-/*
- * Check if a file should have its checksum validated.
- * We validate checksums on files in regular tablespaces
- * (including global and default) only, and in those there
- * are some files that are explicitly excluded.
- */
-static bool
-is_checksummed_file(const char *fullpath, const char *filename)
-{
- /* Check that the file is in a tablespace */
- if (strncmp(fullpath, "./global/", 9) == 0 ||
- strncmp(fullpath, "./base/", 7) == 0 ||
- strncmp(fullpath, "/", 1) == 0)
- {
- int excludeIdx;
-
- /* Compare file against noChecksumFiles skip list */
- for (excludeIdx = 0; noChecksumFiles[excludeIdx].name != NULL; excludeIdx++)
- {
- int cmplen = strlen(noChecksumFiles[excludeIdx].name);
-
- if (!noChecksumFiles[excludeIdx].match_prefix)
- cmplen++;
- if (strncmp(filename, noChecksumFiles[excludeIdx].name,
- cmplen) == 0)
- return false;
- }
-
- return true;
- }
- else
- return false;
-}
-
-/*****
- * Functions for handling tar file format
- *
- * Copied from pg_dump, but modified to work with libpq for sending
- */
-
-
-/*
- * Given the member, write the TAR header & send the file.
- *
- * If 'missing_ok' is true, will not throw an error if the file is not found.
- *
- * If dboid is anything other than InvalidOid then any checksum failures
- * detected will get reported to the cumulative stats system.
- *
- * Returns true if the file was successfully sent, false if 'missing_ok',
- * and the file did not exist.
- */
-static bool
-sendFile(bbsink *sink, const char *readfilename, const char *tarfilename,
- struct stat *statbuf, bool missing_ok, Oid dboid,
- backup_manifest_info *manifest, const char *spcoid)
-{
- int fd;
- BlockNumber blkno = 0;
- bool block_retry = false;
- uint16 checksum;
- int checksum_failures = 0;
- off_t cnt;
- int i;
- pgoff_t len = 0;
- char *page;
- PageHeader phdr;
- int segmentno = 0;
- char *segmentpath;
- bool verify_checksum = false;
- pg_checksum_context checksum_ctx;
-
- if (pg_checksum_init(&checksum_ctx, manifest->checksum_type) < 0)
- elog(ERROR, "could not initialize checksum of file \"%s\"",
- readfilename);
-
- fd = OpenTransientFile(readfilename, O_RDONLY | PG_BINARY);
- if (fd < 0)
- {
- if (errno == ENOENT && missing_ok)
- return false;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not open file \"%s\": %m", readfilename)));
- }
-
- _tarWriteHeader(sink, tarfilename, NULL, statbuf, false);
-
- if (!noverify_checksums && DataChecksumsEnabled())
- {
- char *filename;
-
- /*
- * Get the filename (excluding path). As last_dir_separator()
- * includes the last directory separator, we chop that off by
- * incrementing the pointer.
- */
- filename = last_dir_separator(readfilename) + 1;
-
- if (is_checksummed_file(readfilename, filename))
- {
- verify_checksum = true;
-
- /*
- * Cut off at the segment boundary (".") to get the segment number
- * in order to mix it into the checksum.
- */
- segmentpath = strstr(filename, ".");
- if (segmentpath != NULL)
- {
- segmentno = atoi(segmentpath + 1);
- if (segmentno == 0)
- ereport(ERROR,
- (errmsg("invalid segment number %d in file \"%s\"",
- segmentno, filename)));
- }
- }
- }
-
- /*
- * Loop until we read the amount of data the caller told us to expect. The
- * file could be longer, if it was extended while we were sending it, but
- * for a base backup we can ignore such extended data. It will be restored
- * from WAL.
- */
- while (len < statbuf->st_size)
- {
- size_t remaining = statbuf->st_size - len;
-
- /* Try to read some more data. */
- cnt = basebackup_read_file(fd, sink->bbs_buffer,
- Min(sink->bbs_buffer_length, remaining),
- len, readfilename, true);
-
- /*
- * If we hit end-of-file, a concurrent truncation must have occurred.
- * That's not an error condition, because WAL replay will fix things
- * up.
- */
- if (cnt == 0)
- break;
-
- /*
- * The checksums are verified at block level, so we iterate over the
- * buffer in chunks of BLCKSZ, after making sure that
- * TAR_SEND_SIZE/buf is divisible by BLCKSZ and we read a multiple of
- * BLCKSZ bytes.
- */
- Assert((sink->bbs_buffer_length % BLCKSZ) == 0);
-
- if (verify_checksum && (cnt % BLCKSZ != 0))
- {
- ereport(WARNING,
- (errmsg("could not verify checksum in file \"%s\", block "
- "%u: read buffer size %d and page size %d "
- "differ",
- readfilename, blkno, (int) cnt, BLCKSZ)));
- verify_checksum = false;
- }
-
- if (verify_checksum)
- {
- for (i = 0; i < cnt / BLCKSZ; i++)
- {
- page = sink->bbs_buffer + BLCKSZ * i;
-
- /*
- * Only check pages which have not been modified since the
- * start of the base backup. Otherwise, they might have been
- * written only halfway and the checksum would not be valid.
- * However, replaying WAL would reinstate the correct page in
- * this case. We also skip completely new pages, since they
- * don't have a checksum yet.
- */
- if (!PageIsNew(page) && PageGetLSN(page) < sink->bbs_state->startptr)
- {
- checksum = pg_checksum_page((char *) page, blkno + segmentno * RELSEG_SIZE);
- phdr = (PageHeader) page;
- if (phdr->pd_checksum != checksum)
- {
- /*
- * Retry the block on the first failure. It's
- * possible that we read the first 4K page of the
- * block just before postgres updated the entire block
- * so it ends up looking torn to us. We only need to
- * retry once because the LSN should be updated to
- * something we can ignore on the next pass. If the
- * error happens again then it is a true validation
- * failure.
- */
- if (block_retry == false)
- {
- int reread_cnt;
-
- /* Reread the failed block */
- reread_cnt =
- basebackup_read_file(fd,
- sink->bbs_buffer + BLCKSZ * i,
- BLCKSZ, len + BLCKSZ * i,
- readfilename,
- false);
- if (reread_cnt == 0)
- {
- /*
- * If we hit end-of-file, a concurrent
- * truncation must have occurred, so break out
- * of this loop just as if the initial fread()
- * returned 0. We'll drop through to the same
- * code that handles that case. (We must fix
- * up cnt first, though.)
- */
- cnt = BLCKSZ * i;
- break;
- }
-
- /* Set flag so we know a retry was attempted */
- block_retry = true;
-
- /* Reset loop to validate the block again */
- i--;
- continue;
- }
-
- checksum_failures++;
-
- if (checksum_failures <= 5)
- ereport(WARNING,
- (errmsg("checksum verification failed in "
- "file \"%s\", block %u: calculated "
- "%X but expected %X",
- readfilename, blkno, checksum,
- phdr->pd_checksum)));
- if (checksum_failures == 5)
- ereport(WARNING,
- (errmsg("further checksum verification "
- "failures in file \"%s\" will not "
- "be reported", readfilename)));
- }
- }
- block_retry = false;
- blkno++;
- }
- }
-
- bbsink_archive_contents(sink, cnt);
-
- /* Also feed it to the checksum machinery. */
- if (pg_checksum_update(&checksum_ctx,
- (uint8 *) sink->bbs_buffer, cnt) < 0)
- elog(ERROR, "could not update checksum of base backup");
-
- len += cnt;
- }
-
- /* If the file was truncated while we were sending it, pad it with zeros */
- while (len < statbuf->st_size)
- {
- size_t remaining = statbuf->st_size - len;
- size_t nbytes = Min(sink->bbs_buffer_length, remaining);
-
- MemSet(sink->bbs_buffer, 0, nbytes);
- if (pg_checksum_update(&checksum_ctx,
- (uint8 *) sink->bbs_buffer,
- nbytes) < 0)
- elog(ERROR, "could not update checksum of base backup");
- bbsink_archive_contents(sink, nbytes);
- len += nbytes;
- }
-
- /*
- * Pad to a block boundary, per tar format requirements. (This small piece
- * of data is probably not worth throttling, and is not checksummed
- * because it's not actually part of the file.)
- */
- _tarWritePadding(sink, len);
-
- CloseTransientFile(fd);
-
- if (checksum_failures > 1)
- {
- ereport(WARNING,
- (errmsg_plural("file \"%s\" has a total of %d checksum verification failure",
- "file \"%s\" has a total of %d checksum verification failures",
- checksum_failures,
- readfilename, checksum_failures)));
-
- pgstat_report_checksum_failures_in_db(dboid, checksum_failures);
- }
-
- total_checksum_failures += checksum_failures;
-
- AddFileToBackupManifest(manifest, spcoid, tarfilename, statbuf->st_size,
- (pg_time_t) statbuf->st_mtime, &checksum_ctx);
-
- return true;
-}
-
-static int64
-_tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget,
- struct stat *statbuf, bool sizeonly)
-{
- enum tarError rc;
-
- if (!sizeonly)
- {
- /*
- * As of this writing, the smallest supported block size is 1kB, which
- * is twice TAR_BLOCK_SIZE. Since the buffer size is required to be a
- * multiple of BLCKSZ, it should be safe to assume that the buffer is
- * large enough to fit an entire tar block. We double-check by means
- * of these assertions.
- */
- StaticAssertStmt(TAR_BLOCK_SIZE <= BLCKSZ,
- "BLCKSZ too small for tar block");
- Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
-
- rc = tarCreateHeader(sink->bbs_buffer, filename, linktarget,
- statbuf->st_size, statbuf->st_mode,
- statbuf->st_uid, statbuf->st_gid,
- statbuf->st_mtime);
-
- switch (rc)
- {
- case TAR_OK:
- break;
- case TAR_NAME_TOO_LONG:
- ereport(ERROR,
- (errmsg("file name too long for tar format: \"%s\"",
- filename)));
- break;
- case TAR_SYMLINK_TOO_LONG:
- ereport(ERROR,
- (errmsg("symbolic link target too long for tar format: "
- "file name \"%s\", target \"%s\"",
- filename, linktarget)));
- break;
- default:
- elog(ERROR, "unrecognized tar error: %d", rc);
- }
-
- bbsink_archive_contents(sink, TAR_BLOCK_SIZE);
- }
-
- return TAR_BLOCK_SIZE;
-}
-
-/*
- * Pad with zero bytes out to a multiple of TAR_BLOCK_SIZE.
- */
-static void
-_tarWritePadding(bbsink *sink, int len)
-{
- int pad = tarPaddingBytesRequired(len);
-
- /*
- * As in _tarWriteHeader, it should be safe to assume that the buffer is
- * large enough that we don't need to do this in multiple chunks.
- */
- Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
- Assert(pad <= TAR_BLOCK_SIZE);
-
- if (pad > 0)
- {
- MemSet(sink->bbs_buffer, 0, pad);
- bbsink_archive_contents(sink, pad);
- }
-}
-
-/*
- * If the entry in statbuf is a link, then adjust statbuf to make it look like a
- * directory, so that it will be written that way.
- */
-static void
-convert_link_to_directory(const char *pathbuf, struct stat *statbuf)
-{
- /* If symlink, write it as a directory anyway */
- if (S_ISLNK(statbuf->st_mode))
- statbuf->st_mode = S_IFDIR | pg_dir_create_mode;
-}
-
-/*
- * Read some data from a file, setting a wait event and reporting any error
- * encountered.
- *
- * If partial_read_ok is false, also report an error if the number of bytes
- * read is not equal to the number of bytes requested.
- *
- * Returns the number of bytes read.
- */
-static int
-basebackup_read_file(int fd, char *buf, size_t nbytes, off_t offset,
- const char *filename, bool partial_read_ok)
-{
- int rc;
-
- pgstat_report_wait_start(WAIT_EVENT_BASEBACKUP_READ);
- rc = pread(fd, buf, nbytes, offset);
- pgstat_report_wait_end();
-
- if (rc < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m", filename)));
- if (!partial_read_ok && rc > 0 && rc != nbytes)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": read %d of %zu",
- filename, rc, nbytes)));
-
- return rc;
-}
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c
deleted file mode 100644
index c384d63a341..00000000000
--- a/src/backend/replication/basebackup_copy.c
+++ /dev/null
@@ -1,420 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_copy.c
- * send basebackup archives using COPY OUT
- *
- * We send a result set with information about the tabelspaces to be included
- * in the backup before starting COPY OUT. Then, we start a single COPY OUT
- * operation and transmits all the archives and the manifest if present during
- * the course of that single COPY OUT. Each CopyData message begins with a
- * type byte, allowing us to signal the start of a new archive, or the
- * manifest, by some means other than ending the COPY stream. This also allows
- * for future protocol extensions, since we can include arbitrary information
- * in the message stream as long as we're certain that the client will know
- * what to do with it.
- *
- * An older method that sent each archive using a separate COPY OUT
- * operation is no longer supported.
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup_copy.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "access/tupdesc.h"
-#include "catalog/pg_type_d.h"
-#include "executor/executor.h"
-#include "libpq/libpq.h"
-#include "libpq/pqformat.h"
-#include "replication/basebackup.h"
-#include "replication/basebackup_sink.h"
-#include "tcop/dest.h"
-#include "utils/builtins.h"
-#include "utils/timestamp.h"
-
-typedef struct bbsink_copystream
-{
- /* Common information for all types of sink. */
- bbsink base;
-
- /* Are we sending the archives to the client, or somewhere else? */
- bool send_to_client;
-
- /*
- * Protocol message buffer. We assemble CopyData protocol messages by
- * setting the first character of this buffer to 'd' (archive or manifest
- * data) and then making base.bbs_buffer point to the second character so
- * that the rest of the data gets copied into the message just where we
- * want it.
- */
- char *msgbuffer;
-
- /*
- * When did we last report progress to the client, and how much progress
- * did we report?
- */
- TimestampTz last_progress_report_time;
- uint64 bytes_done_at_last_time_check;
-} bbsink_copystream;
-
-/*
- * We don't want to send progress messages to the client excessively
- * frequently. Ideally, we'd like to send a message when the time since the
- * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
- * the system time every time we send a tiny bit of data seems too expensive.
- * So we only check it after the number of bytes sine the last check reaches
- * PROGRESS_REPORT_BYTE_INTERVAL.
- */
-#define PROGRESS_REPORT_BYTE_INTERVAL 65536
-#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000
-
-static void bbsink_copystream_begin_backup(bbsink *sink);
-static void bbsink_copystream_begin_archive(bbsink *sink,
- const char *archive_name);
-static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
-static void bbsink_copystream_end_archive(bbsink *sink);
-static void bbsink_copystream_begin_manifest(bbsink *sink);
-static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
-static void bbsink_copystream_end_manifest(bbsink *sink);
-static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
- TimeLineID endtli);
-static void bbsink_copystream_cleanup(bbsink *sink);
-
-static void SendCopyOutResponse(void);
-static void SendCopyDone(void);
-static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
-static void SendTablespaceList(List *tablespaces);
-
-static const bbsink_ops bbsink_copystream_ops = {
- .begin_backup = bbsink_copystream_begin_backup,
- .begin_archive = bbsink_copystream_begin_archive,
- .archive_contents = bbsink_copystream_archive_contents,
- .end_archive = bbsink_copystream_end_archive,
- .begin_manifest = bbsink_copystream_begin_manifest,
- .manifest_contents = bbsink_copystream_manifest_contents,
- .end_manifest = bbsink_copystream_end_manifest,
- .end_backup = bbsink_copystream_end_backup,
- .cleanup = bbsink_copystream_cleanup
-};
-
-/*
- * Create a new 'copystream' bbsink.
- */
-bbsink *
-bbsink_copystream_new(bool send_to_client)
-{
- bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
-
- *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
- sink->send_to_client = send_to_client;
-
- /* Set up for periodic progress reporting. */
- sink->last_progress_report_time = GetCurrentTimestamp();
- sink->bytes_done_at_last_time_check = UINT64CONST(0);
-
- return &sink->base;
-}
-
-/*
- * Send start-of-backup wire protocol messages.
- */
-static void
-bbsink_copystream_begin_backup(bbsink *sink)
-{
- bbsink_copystream *mysink = (bbsink_copystream *) sink;
- bbsink_state *state = sink->bbs_state;
- char *buf;
-
- /*
- * Initialize buffer. We ultimately want to send the archive and manifest
- * data by means of CopyData messages where the payload portion of each
- * message begins with a type byte. However, basebackup.c expects the
- * buffer to be aligned, so we can't just allocate one extra byte for the
- * type byte. Instead, allocate enough extra bytes that the portion of the
- * buffer we reveal to our callers can be aligned, while leaving room to
- * slip the type byte in just beforehand. That will allow us to ship the
- * data with a single call to pq_putmessage and without needing any extra
- * copying.
- */
- buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
- mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
- mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
- mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
-
- /* Tell client the backup start location. */
- SendXlogRecPtrResult(state->startptr, state->starttli);
-
- /* Send client a list of tablespaces. */
- SendTablespaceList(state->tablespaces);
-
- /* Send a CommandComplete message */
- pq_puttextmessage('C', "SELECT");
-
- /* Begin COPY stream. This will be used for all archives + manifest. */
- SendCopyOutResponse();
-}
-
-/*
- * Send a CopyData message announcing the beginning of a new archive.
- */
-static void
-bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
-{
- bbsink_state *state = sink->bbs_state;
- tablespaceinfo *ti;
- StringInfoData buf;
-
- ti = list_nth(state->tablespaces, state->tablespace_num);
- pq_beginmessage(&buf, 'd'); /* CopyData */
- pq_sendbyte(&buf, 'n'); /* New archive */
- pq_sendstring(&buf, archive_name);
- pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
- pq_endmessage(&buf);
-}
-
-/*
- * Send a CopyData message containing a chunk of archive content.
- */
-static void
-bbsink_copystream_archive_contents(bbsink *sink, size_t len)
-{
- bbsink_copystream *mysink = (bbsink_copystream *) sink;
- bbsink_state *state = mysink->base.bbs_state;
- StringInfoData buf;
- uint64 targetbytes;
-
- /* Send the archive content to the client, if appropriate. */
- if (mysink->send_to_client)
- {
- /* Add one because we're also sending a leading type byte. */
- pq_putmessage('d', mysink->msgbuffer, len + 1);
- }
-
- /* Consider whether to send a progress report to the client. */
- targetbytes = mysink->bytes_done_at_last_time_check
- + PROGRESS_REPORT_BYTE_INTERVAL;
- if (targetbytes <= state->bytes_done)
- {
- TimestampTz now = GetCurrentTimestamp();
- long ms;
-
- /*
- * OK, we've sent a decent number of bytes, so check the system time
- * to see whether we're due to send a progress report.
- */
- mysink->bytes_done_at_last_time_check = state->bytes_done;
- ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
- now);
-
- /*
- * Send a progress report if enough time has passed. Also send one if
- * the system clock was set backward, so that such occurrences don't
- * have the effect of suppressing further progress messages.
- */
- if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
- {
- mysink->last_progress_report_time = now;
-
- pq_beginmessage(&buf, 'd'); /* CopyData */
- pq_sendbyte(&buf, 'p'); /* Progress report */
- pq_sendint64(&buf, state->bytes_done);
- pq_endmessage(&buf);
- pq_flush_if_writable();
- }
- }
-}
-
-/*
- * We don't need to explicitly signal the end of the archive; the client
- * will figure out that we've reached the end when we begin the next one,
- * or begin the manifest, or end the COPY stream. However, this seems like
- * a good time to force out a progress report. One reason for that is that
- * if this is the last archive, and we don't force a progress report now,
- * the client will never be told that we sent all the bytes.
- */
-static void
-bbsink_copystream_end_archive(bbsink *sink)
-{
- bbsink_copystream *mysink = (bbsink_copystream *) sink;
- bbsink_state *state = mysink->base.bbs_state;
- StringInfoData buf;
-
- mysink->bytes_done_at_last_time_check = state->bytes_done;
- mysink->last_progress_report_time = GetCurrentTimestamp();
- pq_beginmessage(&buf, 'd'); /* CopyData */
- pq_sendbyte(&buf, 'p'); /* Progress report */
- pq_sendint64(&buf, state->bytes_done);
- pq_endmessage(&buf);
- pq_flush_if_writable();
-}
-
-/*
- * Send a CopyData message announcing the beginning of the backup manifest.
- */
-static void
-bbsink_copystream_begin_manifest(bbsink *sink)
-{
- StringInfoData buf;
-
- pq_beginmessage(&buf, 'd'); /* CopyData */
- pq_sendbyte(&buf, 'm'); /* Manifest */
- pq_endmessage(&buf);
-}
-
-/*
- * Each chunk of manifest data is sent using a CopyData message.
- */
-static void
-bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
-{
- bbsink_copystream *mysink = (bbsink_copystream *) sink;
-
- if (mysink->send_to_client)
- {
- /* Add one because we're also sending a leading type byte. */
- pq_putmessage('d', mysink->msgbuffer, len + 1);
- }
-}
-
-/*
- * We don't need an explicit terminator for the backup manifest.
- */
-static void
-bbsink_copystream_end_manifest(bbsink *sink)
-{
- /* Do nothing. */
-}
-
-/*
- * Send end-of-backup wire protocol messages.
- */
-static void
-bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
- TimeLineID endtli)
-{
- SendCopyDone();
- SendXlogRecPtrResult(endptr, endtli);
-}
-
-/*
- * Cleanup.
- */
-static void
-bbsink_copystream_cleanup(bbsink *sink)
-{
- /* Nothing to do. */
-}
-
-/*
- * Send a CopyOutResponse message.
- */
-static void
-SendCopyOutResponse(void)
-{
- StringInfoData buf;
-
- pq_beginmessage(&buf, 'H');
- pq_sendbyte(&buf, 0); /* overall format */
- pq_sendint16(&buf, 0); /* natts */
- pq_endmessage(&buf);
-}
-
-/*
- * Send a CopyDone message.
- */
-static void
-SendCopyDone(void)
-{
- pq_putemptymessage('c');
-}
-
-/*
- * Send a single resultset containing just a single
- * XLogRecPtr record (in text format)
- */
-static void
-SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
-{
- DestReceiver *dest;
- TupOutputState *tstate;
- TupleDesc tupdesc;
- Datum values[2];
- bool nulls[2] = {0};
-
- dest = CreateDestReceiver(DestRemoteSimple);
-
- tupdesc = CreateTemplateTupleDesc(2);
- TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
- /*
- * int8 may seem like a surprising data type for this, but in theory int4
- * would not be wide enough for this, as TimeLineID is unsigned.
- */
- TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
-
- /* send RowDescription */
- tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
-
- /* Data row */
- values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
- values[1] = Int64GetDatum(tli);
- do_tup_output(tstate, values, nulls);
-
- end_tup_output(tstate);
-
- /* Send a CommandComplete message */
- pq_puttextmessage('C', "SELECT");
-}
-
-/*
- * Send a result set via libpq describing the tablespace list.
- */
-static void
-SendTablespaceList(List *tablespaces)
-{
- DestReceiver *dest;
- TupOutputState *tstate;
- TupleDesc tupdesc;
- ListCell *lc;
-
- dest = CreateDestReceiver(DestRemoteSimple);
-
- tupdesc = CreateTemplateTupleDesc(3);
- TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
- TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
- TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
-
- /* send RowDescription */
- tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
-
- /* Construct and send the directory information */
- foreach(lc, tablespaces)
- {
- tablespaceinfo *ti = lfirst(lc);
- Datum values[3];
- bool nulls[3] = {0};
-
- /* Send one datarow message */
- if (ti->path == NULL)
- {
- nulls[0] = true;
- nulls[1] = true;
- }
- else
- {
- values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
- values[1] = CStringGetTextDatum(ti->path);
- }
- if (ti->size >= 0)
- values[2] = Int64GetDatum(ti->size / 1024);
- else
- nulls[2] = true;
-
- do_tup_output(tstate, values, nulls);
- }
-
- end_tup_output(tstate);
-}
diff --git a/src/backend/replication/basebackup_gzip.c b/src/backend/replication/basebackup_gzip.c
deleted file mode 100644
index ef2b954946a..00000000000
--- a/src/backend/replication/basebackup_gzip.c
+++ /dev/null
@@ -1,308 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_gzip.c
- * Basebackup sink implementing gzip compression.
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup_gzip.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#ifdef HAVE_LIBZ
-#include <zlib.h>
-#endif
-
-#include "replication/basebackup_sink.h"
-
-#ifdef HAVE_LIBZ
-typedef struct bbsink_gzip
-{
- /* Common information for all types of sink. */
- bbsink base;
-
- /* Compression level. */
- int compresslevel;
-
- /* Compressed data stream. */
- z_stream zstream;
-
- /* Number of bytes staged in output buffer. */
- size_t bytes_written;
-} bbsink_gzip;
-
-static void bbsink_gzip_begin_backup(bbsink *sink);
-static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
-static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
-static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
-static void bbsink_gzip_end_archive(bbsink *sink);
-static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
-static void gzip_pfree(void *opaque, void *address);
-
-static const bbsink_ops bbsink_gzip_ops = {
- .begin_backup = bbsink_gzip_begin_backup,
- .begin_archive = bbsink_gzip_begin_archive,
- .archive_contents = bbsink_gzip_archive_contents,
- .end_archive = bbsink_gzip_end_archive,
- .begin_manifest = bbsink_forward_begin_manifest,
- .manifest_contents = bbsink_gzip_manifest_contents,
- .end_manifest = bbsink_forward_end_manifest,
- .end_backup = bbsink_forward_end_backup,
- .cleanup = bbsink_forward_cleanup
-};
-#endif
-
-/*
- * Create a new basebackup sink that performs gzip compression.
- */
-bbsink *
-bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
-{
-#ifndef HAVE_LIBZ
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("gzip compression is not supported by this build")));
- return NULL; /* keep compiler quiet */
-#else
- bbsink_gzip *sink;
- int compresslevel;
-
- Assert(next != NULL);
-
- if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) == 0)
- compresslevel = Z_DEFAULT_COMPRESSION;
- else
- {
- compresslevel = compress->level;
- Assert(compresslevel >= 1 && compresslevel <= 9);
- }
-
- sink = palloc0(sizeof(bbsink_gzip));
- *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
- sink->base.bbs_next = next;
- sink->compresslevel = compresslevel;
-
- return &sink->base;
-#endif
-}
-
-#ifdef HAVE_LIBZ
-
-/*
- * Begin backup.
- */
-static void
-bbsink_gzip_begin_backup(bbsink *sink)
-{
- /*
- * We need our own buffer, because we're going to pass different data to
- * the next sink than what gets passed to us.
- */
- sink->bbs_buffer = palloc(sink->bbs_buffer_length);
-
- /*
- * Since deflate() doesn't require the output buffer to be of any
- * particular size, we can just make it the same size as the input buffer.
- */
- bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
- sink->bbs_buffer_length);
-}
-
-/*
- * Prepare to compress the next archive.
- */
-static void
-bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
-{
- bbsink_gzip *mysink = (bbsink_gzip *) sink;
- char *gz_archive_name;
- z_stream *zs = &mysink->zstream;
-
- /* Initialize compressor object. */
- memset(zs, 0, sizeof(z_stream));
- zs->zalloc = gzip_palloc;
- zs->zfree = gzip_pfree;
- zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
- zs->avail_out = sink->bbs_next->bbs_buffer_length;
-
- /*
- * We need to use deflateInit2() rather than deflateInit() here so that we
- * can request a gzip header rather than a zlib header. Otherwise, we want
- * to supply the same values that would have been used by default if we
- * had just called deflateInit().
- *
- * Per the documentation for deflateInit2, the third argument must be
- * Z_DEFLATED; the fourth argument is the number of "window bits", by
- * default 15, but adding 16 gets you a gzip header rather than a zlib
- * header; the fifth argument controls memory usage, and 8 is the default;
- * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
- */
- if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
- Z_DEFAULT_STRATEGY) != Z_OK)
- ereport(ERROR,
- errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("could not initialize compression library"));
-
- /*
- * Add ".gz" to the archive name. Note that the pg_basebackup -z produces
- * archives named ".tar.gz" rather than ".tgz", so we match that here.
- */
- gz_archive_name = psprintf("%s.gz", archive_name);
- Assert(sink->bbs_next != NULL);
- bbsink_begin_archive(sink->bbs_next, gz_archive_name);
- pfree(gz_archive_name);
-}
-
-/*
- * Compress the input data to the output buffer until we run out of input
- * data. Each time the output buffer fills up, invoke the archive_contents()
- * method for then next sink.
- *
- * Note that since we're compressing the input, it may very commonly happen
- * that we consume all the input data without filling the output buffer. In
- * that case, the compressed representation of the current input data won't
- * actually be sent to the next bbsink until a later call to this function,
- * or perhaps even not until bbsink_gzip_end_archive() is invoked.
- */
-static void
-bbsink_gzip_archive_contents(bbsink *sink, size_t len)
-{
- bbsink_gzip *mysink = (bbsink_gzip *) sink;
- z_stream *zs = &mysink->zstream;
-
- /* Compress data from input buffer. */
- zs->next_in = (uint8 *) mysink->base.bbs_buffer;
- zs->avail_in = len;
-
- while (zs->avail_in > 0)
- {
- int res;
-
- /* Write output data into unused portion of output buffer. */
- Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
- zs->next_out = (uint8 *)
- mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
- zs->avail_out =
- mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
-
- /*
- * Try to compress. Note that this will update zs->next_in and
- * zs->avail_in according to how much input data was consumed, and
- * zs->next_out and zs->avail_out according to how many output bytes
- * were produced.
- *
- * According to the zlib documentation, Z_STREAM_ERROR should only
- * occur if we've made a programming error, or if say there's been a
- * memory clobber; we use elog() rather than Assert() here out of an
- * abundance of caution.
- */
- res = deflate(zs, Z_NO_FLUSH);
- if (res == Z_STREAM_ERROR)
- elog(ERROR, "could not compress data: %s", zs->msg);
-
- /* Update our notion of how many bytes we've written. */
- mysink->bytes_written =
- mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
-
- /*
- * If the output buffer is full, it's time for the next sink to
- * process the contents.
- */
- if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
- {
- bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
- mysink->bytes_written = 0;
- }
- }
-}
-
-/*
- * There might be some data inside zlib's internal buffers; we need to get
- * that flushed out and forwarded to the successor sink as archive content.
- *
- * Then we can end processing for this archive.
- */
-static void
-bbsink_gzip_end_archive(bbsink *sink)
-{
- bbsink_gzip *mysink = (bbsink_gzip *) sink;
- z_stream *zs = &mysink->zstream;
-
- /* There is no more data available. */
- zs->next_in = (uint8 *) mysink->base.bbs_buffer;
- zs->avail_in = 0;
-
- while (1)
- {
- int res;
-
- /* Write output data into unused portion of output buffer. */
- Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
- zs->next_out = (uint8 *)
- mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
- zs->avail_out =
- mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
-
- /*
- * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
- * no more input.
- */
- res = deflate(zs, Z_FINISH);
- if (res == Z_STREAM_ERROR)
- elog(ERROR, "could not compress data: %s", zs->msg);
-
- /* Update our notion of how many bytes we've written. */
- mysink->bytes_written =
- mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
-
- /*
- * Apparently we had no data in the output buffer and deflate() was
- * not able to add any. We must be done.
- */
- if (mysink->bytes_written == 0)
- break;
-
- /* Send whatever accumulated output bytes we have. */
- bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
- mysink->bytes_written = 0;
- }
-
- /* Must also pass on the information that this archive has ended. */
- bbsink_forward_end_archive(sink);
-}
-
-/*
- * Manifest contents are not compressed, but we do need to copy them into
- * the successor sink's buffer, because we have our own.
- */
-static void
-bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
-{
- memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
- bbsink_manifest_contents(sink->bbs_next, len);
-}
-
-/*
- * Wrapper function to adjust the signature of palloc to match what libz
- * expects.
- */
-static void *
-gzip_palloc(void *opaque, unsigned items, unsigned size)
-{
- return palloc(items * size);
-}
-
-/*
- * Wrapper function to adjust the signature of pfree to match what libz
- * expects.
- */
-static void
-gzip_pfree(void *opaque, void *address)
-{
- pfree(address);
-}
-
-#endif
diff --git a/src/backend/replication/basebackup_lz4.c b/src/backend/replication/basebackup_lz4.c
deleted file mode 100644
index c9d19b6c448..00000000000
--- a/src/backend/replication/basebackup_lz4.c
+++ /dev/null
@@ -1,301 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_lz4.c
- * Basebackup sink implementing lz4 compression.
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup_lz4.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#ifdef USE_LZ4
-#include <lz4frame.h>
-#endif
-
-#include "replication/basebackup_sink.h"
-
-#ifdef USE_LZ4
-
-typedef struct bbsink_lz4
-{
- /* Common information for all types of sink. */
- bbsink base;
-
- /* Compression level. */
- int compresslevel;
-
- LZ4F_compressionContext_t ctx;
- LZ4F_preferences_t prefs;
-
- /* Number of bytes staged in output buffer. */
- size_t bytes_written;
-} bbsink_lz4;
-
-static void bbsink_lz4_begin_backup(bbsink *sink);
-static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
-static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
-static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
-static void bbsink_lz4_end_archive(bbsink *sink);
-static void bbsink_lz4_cleanup(bbsink *sink);
-
-static const bbsink_ops bbsink_lz4_ops = {
- .begin_backup = bbsink_lz4_begin_backup,
- .begin_archive = bbsink_lz4_begin_archive,
- .archive_contents = bbsink_lz4_archive_contents,
- .end_archive = bbsink_lz4_end_archive,
- .begin_manifest = bbsink_forward_begin_manifest,
- .manifest_contents = bbsink_lz4_manifest_contents,
- .end_manifest = bbsink_forward_end_manifest,
- .end_backup = bbsink_forward_end_backup,
- .cleanup = bbsink_lz4_cleanup
-};
-#endif
-
-/*
- * Create a new basebackup sink that performs lz4 compression.
- */
-bbsink *
-bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
-{
-#ifndef USE_LZ4
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("lz4 compression is not supported by this build")));
- return NULL; /* keep compiler quiet */
-#else
- bbsink_lz4 *sink;
- int compresslevel;
-
- Assert(next != NULL);
-
- if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) == 0)
- compresslevel = 0;
- else
- {
- compresslevel = compress->level;
- Assert(compresslevel >= 1 && compresslevel <= 12);
- }
-
- sink = palloc0(sizeof(bbsink_lz4));
- *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
- sink->base.bbs_next = next;
- sink->compresslevel = compresslevel;
-
- return &sink->base;
-#endif
-}
-
-#ifdef USE_LZ4
-
-/*
- * Begin backup.
- */
-static void
-bbsink_lz4_begin_backup(bbsink *sink)
-{
- bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
- size_t output_buffer_bound;
- LZ4F_preferences_t *prefs = &mysink->prefs;
-
- /* Initialize compressor object. */
- memset(prefs, 0, sizeof(LZ4F_preferences_t));
- prefs->frameInfo.blockSizeID = LZ4F_max256KB;
- prefs->compressionLevel = mysink->compresslevel;
-
- /*
- * We need our own buffer, because we're going to pass different data to
- * the next sink than what gets passed to us.
- */
- mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
-
- /*
- * Since LZ4F_compressUpdate() requires the output buffer of size equal or
- * greater than that of LZ4F_compressBound(), make sure we have the next
- * sink's bbs_buffer of length that can accommodate the compressed input
- * buffer.
- */
- output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
- &mysink->prefs);
-
- /*
- * The buffer length is expected to be a multiple of BLCKSZ, so round up.
- */
- output_buffer_bound = output_buffer_bound + BLCKSZ -
- (output_buffer_bound % BLCKSZ);
-
- bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
-}
-
-/*
- * Prepare to compress the next archive.
- */
-static void
-bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
-{
- bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
- char *lz4_archive_name;
- LZ4F_errorCode_t ctxError;
- size_t headerSize;
-
- ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
- if (LZ4F_isError(ctxError))
- elog(ERROR, "could not create lz4 compression context: %s",
- LZ4F_getErrorName(ctxError));
-
- /* First of all write the frame header to destination buffer. */
- headerSize = LZ4F_compressBegin(mysink->ctx,
- mysink->base.bbs_next->bbs_buffer,
- mysink->base.bbs_next->bbs_buffer_length,
- &mysink->prefs);
-
- if (LZ4F_isError(headerSize))
- elog(ERROR, "could not write lz4 header: %s",
- LZ4F_getErrorName(headerSize));
-
- /*
- * We need to write the compressed data after the header in the output
- * buffer. So, make sure to update the notion of bytes written to output
- * buffer.
- */
- mysink->bytes_written += headerSize;
-
- /* Add ".lz4" to the archive name. */
- lz4_archive_name = psprintf("%s.lz4", archive_name);
- Assert(sink->bbs_next != NULL);
- bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
- pfree(lz4_archive_name);
-}
-
-/*
- * Compress the input data to the output buffer until we run out of input
- * data. Each time the output buffer falls below the compression bound for
- * the input buffer, invoke the archive_contents() method for then next sink.
- *
- * Note that since we're compressing the input, it may very commonly happen
- * that we consume all the input data without filling the output buffer. In
- * that case, the compressed representation of the current input data won't
- * actually be sent to the next bbsink until a later call to this function,
- * or perhaps even not until bbsink_lz4_end_archive() is invoked.
- */
-static void
-bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
-{
- bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
- size_t compressedSize;
- size_t avail_in_bound;
-
- avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
-
- /*
- * If the number of available bytes has fallen below the value computed by
- * LZ4F_compressBound(), ask the next sink to process the data so that we
- * can empty the buffer.
- */
- if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
- avail_in_bound)
- {
- bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
- mysink->bytes_written = 0;
- }
-
- /*
- * Compress the input buffer and write it into the output buffer.
- */
- compressedSize = LZ4F_compressUpdate(mysink->ctx,
- mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
- mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
- (uint8 *) mysink->base.bbs_buffer,
- avail_in,
- NULL);
-
- if (LZ4F_isError(compressedSize))
- elog(ERROR, "could not compress data: %s",
- LZ4F_getErrorName(compressedSize));
-
- /*
- * Update our notion of how many bytes we've written into output buffer.
- */
- mysink->bytes_written += compressedSize;
-}
-
-/*
- * There might be some data inside lz4's internal buffers; we need to get
- * that flushed out and also finalize the lz4 frame and then get that forwarded
- * to the successor sink as archive content.
- *
- * Then we can end processing for this archive.
- */
-static void
-bbsink_lz4_end_archive(bbsink *sink)
-{
- bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
- size_t compressedSize;
- size_t lz4_footer_bound;
-
- lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
-
- Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
-
- if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
- lz4_footer_bound)
- {
- bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
- mysink->bytes_written = 0;
- }
-
- compressedSize = LZ4F_compressEnd(mysink->ctx,
- mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
- mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
- NULL);
-
- if (LZ4F_isError(compressedSize))
- elog(ERROR, "could not end lz4 compression: %s",
- LZ4F_getErrorName(compressedSize));
-
- /* Update our notion of how many bytes we've written. */
- mysink->bytes_written += compressedSize;
-
- /* Send whatever accumulated output bytes we have. */
- bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
- mysink->bytes_written = 0;
-
- /* Release the resources. */
- LZ4F_freeCompressionContext(mysink->ctx);
- mysink->ctx = NULL;
-
- /* Pass on the information that this archive has ended. */
- bbsink_forward_end_archive(sink);
-}
-
-/*
- * Manifest contents are not compressed, but we do need to copy them into
- * the successor sink's buffer, because we have our own.
- */
-static void
-bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
-{
- memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
- bbsink_manifest_contents(sink->bbs_next, len);
-}
-
-/*
- * In case the backup fails, make sure we free the compression context by
- * calling LZ4F_freeCompressionContext() if needed to avoid memory leak.
- */
-static void
-bbsink_lz4_cleanup(bbsink *sink)
-{
- bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
-
- if (mysink->ctx)
- {
- LZ4F_freeCompressionContext(mysink->ctx);
- mysink->ctx = NULL;
- }
-}
-
-#endif
diff --git a/src/backend/replication/basebackup_progress.c b/src/backend/replication/basebackup_progress.c
deleted file mode 100644
index 36671ad3fda..00000000000
--- a/src/backend/replication/basebackup_progress.c
+++ /dev/null
@@ -1,246 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_progress.c
- * Basebackup sink implementing progress tracking, including but not
- * limited to command progress reporting.
- *
- * This should be used even if the PROGRESS option to the replication
- * command BASE_BACKUP is not specified. Without that option, we won't
- * have tallied up the size of the files that are going to need to be
- * backed up, but we can still report to the command progress reporting
- * facility how much data we've processed.
- *
- * Moreover, we also use this as a convenient place to update certain
- * fields of the bbsink_state. That work is accurately described as
- * keeping track of our progress, but it's not just for introspection.
- * We need those fields to be updated properly in order for base backups
- * to work.
- *
- * This particular basebackup sink requires extra callbacks that most base
- * backup sinks don't. Rather than cramming those into the interface, we just
- * have a few extra functions here that basebackup.c can call. (We could put
- * the logic directly into that file as it's fairly simple, but it seems
- * cleaner to have everything related to progress reporting in one place.)
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup_progress.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "commands/progress.h"
-#include "miscadmin.h"
-#include "replication/basebackup.h"
-#include "replication/basebackup_sink.h"
-#include "pgstat.h"
-#include "storage/latch.h"
-#include "utils/timestamp.h"
-
-static void bbsink_progress_begin_backup(bbsink *sink);
-static void bbsink_progress_archive_contents(bbsink *sink, size_t len);
-static void bbsink_progress_end_archive(bbsink *sink);
-
-static const bbsink_ops bbsink_progress_ops = {
- .begin_backup = bbsink_progress_begin_backup,
- .begin_archive = bbsink_forward_begin_archive,
- .archive_contents = bbsink_progress_archive_contents,
- .end_archive = bbsink_progress_end_archive,
- .begin_manifest = bbsink_forward_begin_manifest,
- .manifest_contents = bbsink_forward_manifest_contents,
- .end_manifest = bbsink_forward_end_manifest,
- .end_backup = bbsink_forward_end_backup,
- .cleanup = bbsink_forward_cleanup
-};
-
-/*
- * Create a new basebackup sink that performs progress tracking functions and
- * forwards data to a successor sink.
- */
-bbsink *
-bbsink_progress_new(bbsink *next, bool estimate_backup_size)
-{
- bbsink *sink;
-
- Assert(next != NULL);
-
- sink = palloc0(sizeof(bbsink));
- *((const bbsink_ops **) &sink->bbs_ops) = &bbsink_progress_ops;
- sink->bbs_next = next;
-
- /*
- * Report that a base backup is in progress, and set the total size of the
- * backup to -1, which will get translated to NULL. If we're estimating
- * the backup size, we'll insert the real estimate when we have it.
- */
- pgstat_progress_start_command(PROGRESS_COMMAND_BASEBACKUP, InvalidOid);
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_BACKUP_TOTAL, -1);
-
- return sink;
-}
-
-/*
- * Progress reporting at start of backup.
- */
-static void
-bbsink_progress_begin_backup(bbsink *sink)
-{
- const int index[] = {
- PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_BACKUP_TOTAL,
- PROGRESS_BASEBACKUP_TBLSPC_TOTAL
- };
- int64 val[3];
-
- /*
- * Report that we are now streaming database files as a base backup. Also
- * advertise the number of tablespaces, and, if known, the estimated total
- * backup size.
- */
- val[0] = PROGRESS_BASEBACKUP_PHASE_STREAM_BACKUP;
- if (sink->bbs_state->bytes_total_is_valid)
- val[1] = sink->bbs_state->bytes_total;
- else
- val[1] = -1;
- val[2] = list_length(sink->bbs_state->tablespaces);
- pgstat_progress_update_multi_param(3, index, val);
-
- /* Delegate to next sink. */
- bbsink_forward_begin_backup(sink);
-}
-
-/*
- * End-of archive progress reporting.
- */
-static void
-bbsink_progress_end_archive(bbsink *sink)
-{
- /*
- * We expect one archive per tablespace, so reaching the end of an archive
- * also means reaching the end of a tablespace. (Some day we might have a
- * reason to decouple these concepts.)
- *
- * If WAL is included in the backup, we'll mark the last tablespace
- * complete before the last archive is complete, so we need a guard here
- * to ensure that the number of tablespaces streamed doesn't exceed the
- * total.
- */
- if (sink->bbs_state->tablespace_num < list_length(sink->bbs_state->tablespaces))
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED,
- sink->bbs_state->tablespace_num + 1);
-
- /* Delegate to next sink. */
- bbsink_forward_end_archive(sink);
-
- /*
- * This is a convenient place to update the bbsink_state's notion of which
- * is the current tablespace. Note that the bbsink_state object is shared
- * across all bbsink objects involved, but we're the outermost one and
- * this is the very last thing we do.
- */
- sink->bbs_state->tablespace_num++;
-}
-
-/*
- * Handle progress tracking for new archive contents.
- *
- * Increment the counter for the amount of data already streamed
- * by the given number of bytes, and update the progress report for
- * pg_stat_progress_basebackup.
- */
-static void
-bbsink_progress_archive_contents(bbsink *sink, size_t len)
-{
- bbsink_state *state = sink->bbs_state;
- const int index[] = {
- PROGRESS_BASEBACKUP_BACKUP_STREAMED,
- PROGRESS_BASEBACKUP_BACKUP_TOTAL
- };
- int64 val[2];
- int nparam = 0;
-
- /* First update bbsink_state with # of bytes done. */
- state->bytes_done += len;
-
- /* Now forward to next sink. */
- bbsink_forward_archive_contents(sink, len);
-
- /* Prepare to set # of bytes done for command progress reporting. */
- val[nparam++] = state->bytes_done;
-
- /*
- * We may also want to update # of total bytes, to avoid overflowing past
- * 100% or the full size. This may make the total size number change as we
- * approach the end of the backup (the estimate will always be wrong if
- * WAL is included), but that's better than having the done column be
- * bigger than the total.
- */
- if (state->bytes_total_is_valid && state->bytes_done > state->bytes_total)
- val[nparam++] = state->bytes_done;
-
- pgstat_progress_update_multi_param(nparam, index, val);
-}
-
-/*
- * Advertise that we are waiting for the start-of-backup checkpoint.
- */
-void
-basebackup_progress_wait_checkpoint(void)
-{
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_WAIT_CHECKPOINT);
-}
-
-/*
- * Advertise that we are estimating the backup size.
- */
-void
-basebackup_progress_estimate_backup_size(void)
-{
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_ESTIMATE_BACKUP_SIZE);
-}
-
-/*
- * Advertise that we are waiting for WAL archiving at end-of-backup.
- */
-void
-basebackup_progress_wait_wal_archive(bbsink_state *state)
-{
- const int index[] = {
- PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_TBLSPC_STREAMED
- };
- int64 val[2];
-
- /*
- * We report having finished all tablespaces at this point, even if the
- * archive for the main tablespace is still open, because what's going to
- * be added is WAL files, not files that are really from the main
- * tablespace.
- */
- val[0] = PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE;
- val[1] = list_length(state->tablespaces);
- pgstat_progress_update_multi_param(2, index, val);
-}
-
-/*
- * Advertise that we are transferring WAL files into the final archive.
- */
-void
-basebackup_progress_transfer_wal(void)
-{
- pgstat_progress_update_param(PROGRESS_BASEBACKUP_PHASE,
- PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL);
-}
-
-/*
- * Advertise that we are no longer performing a backup.
- */
-void
-basebackup_progress_done(void)
-{
- pgstat_progress_end_command();
-}
diff --git a/src/backend/replication/basebackup_server.c b/src/backend/replication/basebackup_server.c
deleted file mode 100644
index 9b4847d90cc..00000000000
--- a/src/backend/replication/basebackup_server.c
+++ /dev/null
@@ -1,309 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_server.c
- * store basebackup archives on the server
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup_server.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "access/xact.h"
-#include "catalog/pg_authid.h"
-#include "miscadmin.h"
-#include "replication/basebackup.h"
-#include "replication/basebackup_sink.h"
-#include "storage/fd.h"
-#include "utils/acl.h"
-#include "utils/timestamp.h"
-#include "utils/wait_event.h"
-
-typedef struct bbsink_server
-{
- /* Common information for all types of sink. */
- bbsink base;
-
- /* Directory in which backup is to be stored. */
- char *pathname;
-
- /* Currently open file (or 0 if nothing open). */
- File file;
-
- /* Current file position. */
- off_t filepos;
-} bbsink_server;
-
-static void bbsink_server_begin_archive(bbsink *sink,
- const char *archive_name);
-static void bbsink_server_archive_contents(bbsink *sink, size_t len);
-static void bbsink_server_end_archive(bbsink *sink);
-static void bbsink_server_begin_manifest(bbsink *sink);
-static void bbsink_server_manifest_contents(bbsink *sink, size_t len);
-static void bbsink_server_end_manifest(bbsink *sink);
-
-static const bbsink_ops bbsink_server_ops = {
- .begin_backup = bbsink_forward_begin_backup,
- .begin_archive = bbsink_server_begin_archive,
- .archive_contents = bbsink_server_archive_contents,
- .end_archive = bbsink_server_end_archive,
- .begin_manifest = bbsink_server_begin_manifest,
- .manifest_contents = bbsink_server_manifest_contents,
- .end_manifest = bbsink_server_end_manifest,
- .end_backup = bbsink_forward_end_backup,
- .cleanup = bbsink_forward_cleanup
-};
-
-/*
- * Create a new 'server' bbsink.
- */
-bbsink *
-bbsink_server_new(bbsink *next, char *pathname)
-{
- bbsink_server *sink = palloc0(sizeof(bbsink_server));
-
- *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_server_ops;
- sink->pathname = pathname;
- sink->base.bbs_next = next;
-
- /* Replication permission is not sufficient in this case. */
- StartTransactionCommand();
- if (!has_privs_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES))
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("must be superuser or a role with privileges of the pg_write_server_files role to create server backup")));
- CommitTransactionCommand();
-
- /*
- * It's not a good idea to store your backups in the same directory that
- * you're backing up. If we allowed a relative path here, that could
- * easily happen accidentally, so we don't. The user could still
- * accomplish the same thing by including the absolute path to $PGDATA in
- * the pathname, but that's likely an intentional bad decision rather than
- * an accident.
- */
- if (!is_absolute_path(pathname))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_NAME),
- errmsg("relative path not allowed for server backup")));
-
- switch (pg_check_dir(pathname))
- {
- case 0:
-
- /*
- * Does not exist, so create it using the same permissions we'd
- * use for a new subdirectory of the data directory itself.
- */
- if (MakePGDirectory(pathname) < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create directory \"%s\": %m", pathname)));
- break;
-
- case 1:
- /* Exists, empty. */
- break;
-
- case 2:
- case 3:
- case 4:
- /* Exists, not empty. */
- ereport(ERROR,
- (errcode(ERRCODE_DUPLICATE_FILE),
- errmsg("directory \"%s\" exists but is not empty",
- pathname)));
- break;
-
- default:
- /* Access problem. */
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not access directory \"%s\": %m",
- pathname)));
- }
-
- return &sink->base;
-}
-
-/*
- * Open the correct output file for this archive.
- */
-static void
-bbsink_server_begin_archive(bbsink *sink, const char *archive_name)
-{
- bbsink_server *mysink = (bbsink_server *) sink;
- char *filename;
-
- Assert(mysink->file == 0);
- Assert(mysink->filepos == 0);
-
- filename = psprintf("%s/%s", mysink->pathname, archive_name);
-
- mysink->file = PathNameOpenFile(filename,
- O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
- if (mysink->file <= 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create file \"%s\": %m", filename)));
-
- pfree(filename);
-
- bbsink_forward_begin_archive(sink, archive_name);
-}
-
-/*
- * Write the data to the output file.
- */
-static void
-bbsink_server_archive_contents(bbsink *sink, size_t len)
-{
- bbsink_server *mysink = (bbsink_server *) sink;
- int nbytes;
-
- nbytes = FileWrite(mysink->file, mysink->base.bbs_buffer, len,
- mysink->filepos, WAIT_EVENT_BASEBACKUP_WRITE);
-
- if (nbytes != len)
- {
- if (nbytes < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- FilePathName(mysink->file)),
- errhint("Check free disk space.")));
- /* short write: complain appropriately */
- ereport(ERROR,
- (errcode(ERRCODE_DISK_FULL),
- errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u",
- FilePathName(mysink->file),
- nbytes, (int) len, (unsigned) mysink->filepos),
- errhint("Check free disk space.")));
- }
-
- mysink->filepos += nbytes;
-
- bbsink_forward_archive_contents(sink, len);
-}
-
-/*
- * fsync and close the current output file.
- */
-static void
-bbsink_server_end_archive(bbsink *sink)
-{
- bbsink_server *mysink = (bbsink_server *) sink;
-
- /*
- * We intentionally don't use data_sync_elevel here, because the server
- * shouldn't PANIC just because we can't guarantee that the backup has
- * been written down to disk. Running recovery won't fix anything in this
- * case anyway.
- */
- if (FileSync(mysink->file, WAIT_EVENT_BASEBACKUP_SYNC) < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not fsync file \"%s\": %m",
- FilePathName(mysink->file))));
-
-
- /* We're done with this file now. */
- FileClose(mysink->file);
- mysink->file = 0;
- mysink->filepos = 0;
-
- bbsink_forward_end_archive(sink);
-}
-
-/*
- * Open the output file to which we will write the manifest.
- *
- * Just like pg_basebackup, we write the manifest first under a temporary
- * name and then rename it into place after fsync. That way, if the manifest
- * is there and under the correct name, the user can be sure that the backup
- * completed.
- */
-static void
-bbsink_server_begin_manifest(bbsink *sink)
-{
- bbsink_server *mysink = (bbsink_server *) sink;
- char *tmp_filename;
-
- Assert(mysink->file == 0);
-
- tmp_filename = psprintf("%s/backup_manifest.tmp", mysink->pathname);
-
- mysink->file = PathNameOpenFile(tmp_filename,
- O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
- if (mysink->file <= 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not create file \"%s\": %m", tmp_filename)));
-
- pfree(tmp_filename);
-
- bbsink_forward_begin_manifest(sink);
-}
-
-/*
- * Each chunk of manifest data is sent using a CopyData message.
- */
-static void
-bbsink_server_manifest_contents(bbsink *sink, size_t len)
-{
- bbsink_server *mysink = (bbsink_server *) sink;
- int nbytes;
-
- nbytes = FileWrite(mysink->file, mysink->base.bbs_buffer, len,
- mysink->filepos, WAIT_EVENT_BASEBACKUP_WRITE);
-
- if (nbytes != len)
- {
- if (nbytes < 0)
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not write file \"%s\": %m",
- FilePathName(mysink->file)),
- errhint("Check free disk space.")));
- /* short write: complain appropriately */
- ereport(ERROR,
- (errcode(ERRCODE_DISK_FULL),
- errmsg("could not write file \"%s\": wrote only %d of %d bytes at offset %u",
- FilePathName(mysink->file),
- nbytes, (int) len, (unsigned) mysink->filepos),
- errhint("Check free disk space.")));
- }
-
- mysink->filepos += nbytes;
-
- bbsink_forward_manifest_contents(sink, len);
-}
-
-/*
- * fsync the backup manifest, close the file, and then rename it into place.
- */
-static void
-bbsink_server_end_manifest(bbsink *sink)
-{
- bbsink_server *mysink = (bbsink_server *) sink;
- char *tmp_filename;
- char *filename;
-
- /* We're done with this file now. */
- FileClose(mysink->file);
- mysink->file = 0;
-
- /*
- * Rename it into place. This also fsyncs the temporary file, so we don't
- * need to do that here. We don't use data_sync_elevel here for the same
- * reasons as in bbsink_server_end_archive.
- */
- tmp_filename = psprintf("%s/backup_manifest.tmp", mysink->pathname);
- filename = psprintf("%s/backup_manifest", mysink->pathname);
- durable_rename(tmp_filename, filename, ERROR);
- pfree(filename);
- pfree(tmp_filename);
-
- bbsink_forward_end_manifest(sink);
-}
diff --git a/src/backend/replication/basebackup_sink.c b/src/backend/replication/basebackup_sink.c
deleted file mode 100644
index 81353f8f4d1..00000000000
--- a/src/backend/replication/basebackup_sink.c
+++ /dev/null
@@ -1,125 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_sink.c
- * Default implementations for bbsink (basebackup sink) callbacks.
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * src/backend/replication/basebackup_sink.c
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-
-#include "replication/basebackup_sink.h"
-
-/*
- * Forward begin_backup callback.
- *
- * Only use this implementation if you want the bbsink you're implementing to
- * share a buffer with the successor bbsink.
- */
-void
-bbsink_forward_begin_backup(bbsink *sink)
-{
- Assert(sink->bbs_next != NULL);
- Assert(sink->bbs_state != NULL);
- bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
- sink->bbs_buffer_length);
- sink->bbs_buffer = sink->bbs_next->bbs_buffer;
-}
-
-/*
- * Forward begin_archive callback.
- */
-void
-bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
-{
- Assert(sink->bbs_next != NULL);
- bbsink_begin_archive(sink->bbs_next, archive_name);
-}
-
-/*
- * Forward archive_contents callback.
- *
- * Code that wants to use this should initialize its own bbs_buffer and
- * bbs_buffer_length fields to the values from the successor sink. In cases
- * where the buffer isn't shared, the data needs to be copied before forwarding
- * the callback. We don't do try to do that here, because there's really no
- * reason to have separately allocated buffers containing the same identical
- * data.
- */
-void
-bbsink_forward_archive_contents(bbsink *sink, size_t len)
-{
- Assert(sink->bbs_next != NULL);
- Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
- Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
- bbsink_archive_contents(sink->bbs_next, len);
-}
-
-/*
- * Forward end_archive callback.
- */
-void
-bbsink_forward_end_archive(bbsink *sink)
-{
- Assert(sink->bbs_next != NULL);
- bbsink_end_archive(sink->bbs_next);
-}
-
-/*
- * Forward begin_manifest callback.
- */
-void
-bbsink_forward_begin_manifest(bbsink *sink)
-{
- Assert(sink->bbs_next != NULL);
- bbsink_begin_manifest(sink->bbs_next);
-}
-
-/*
- * Forward manifest_contents callback.
- *
- * As with the archive_contents callback, it's expected that the buffer is
- * shared.
- */
-void
-bbsink_forward_manifest_contents(bbsink *sink, size_t len)
-{
- Assert(sink->bbs_next != NULL);
- Assert(sink->bbs_buffer == sink->bbs_next->bbs_buffer);
- Assert(sink->bbs_buffer_length == sink->bbs_next->bbs_buffer_length);
- bbsink_manifest_contents(sink->bbs_next, len);
-}
-
-/*
- * Forward end_manifest callback.
- */
-void
-bbsink_forward_end_manifest(bbsink *sink)
-{
- Assert(sink->bbs_next != NULL);
- bbsink_end_manifest(sink->bbs_next);
-}
-
-/*
- * Forward end_backup callback.
- */
-void
-bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
-{
- Assert(sink->bbs_next != NULL);
- bbsink_end_backup(sink->bbs_next, endptr, endtli);
-}
-
-/*
- * Forward cleanup callback.
- */
-void
-bbsink_forward_cleanup(bbsink *sink)
-{
- Assert(sink->bbs_next != NULL);
- bbsink_cleanup(sink->bbs_next);
-}
diff --git a/src/backend/replication/basebackup_target.c b/src/backend/replication/basebackup_target.c
deleted file mode 100644
index 9f73457320e..00000000000
--- a/src/backend/replication/basebackup_target.c
+++ /dev/null
@@ -1,241 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_target.c
- * Base backups can be "targeted", which means that they can be sent
- * somewhere other than to the client which requested the backup.
- * Furthermore, new targets can be defined by extensions. This file
- * contains code to support that functionality.
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup_target.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "replication/basebackup_target.h"
-#include "utils/memutils.h"
-
-typedef struct BaseBackupTargetType
-{
- char *name;
- void *(*check_detail) (char *, char *);
- bbsink *(*get_sink) (bbsink *, void *);
-} BaseBackupTargetType;
-
-struct BaseBackupTargetHandle
-{
- BaseBackupTargetType *type;
- void *detail_arg;
-};
-
-static void initialize_target_list(void);
-static bbsink *blackhole_get_sink(bbsink *next_sink, void *detail_arg);
-static bbsink *server_get_sink(bbsink *next_sink, void *detail_arg);
-static void *reject_target_detail(char *target, char *target_detail);
-static void *server_check_detail(char *target, char *target_detail);
-
-static BaseBackupTargetType builtin_backup_targets[] =
-{
- {
- "blackhole", reject_target_detail, blackhole_get_sink
- },
- {
- "server", server_check_detail, server_get_sink
- },
- {
- NULL
- }
-};
-
-static List *BaseBackupTargetTypeList = NIL;
-
-/*
- * Add a new base backup target type.
- *
- * This is intended for use by server extensions.
- */
-void
-BaseBackupAddTarget(char *name,
- void *(*check_detail) (char *, char *),
- bbsink *(*get_sink) (bbsink *, void *))
-{
- BaseBackupTargetType *ttype;
- MemoryContext oldcontext;
- ListCell *lc;
-
- /* If the target list is not yet initialized, do that first. */
- if (BaseBackupTargetTypeList == NIL)
- initialize_target_list();
-
- /* Search the target type list for an existing entry with this name. */
- foreach(lc, BaseBackupTargetTypeList)
- {
- BaseBackupTargetType *ttype = lfirst(lc);
-
- if (strcmp(ttype->name, name) == 0)
- {
- /*
- * We found one, so update it.
- *
- * It is probably not a great idea to call BaseBackupAddTarget for
- * the same name multiple times, but if it happens, this seems
- * like the sanest behavior.
- */
- ttype->check_detail = check_detail;
- ttype->get_sink = get_sink;
- return;
- }
- }
-
- /*
- * We use TopMemoryContext for allocations here to make sure that the data
- * we need doesn't vanish under us; that's also why we copy the target
- * name into a newly-allocated chunk of memory.
- */
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- ttype = palloc(sizeof(BaseBackupTargetType));
- ttype->name = pstrdup(name);
- ttype->check_detail = check_detail;
- ttype->get_sink = get_sink;
- BaseBackupTargetTypeList = lappend(BaseBackupTargetTypeList, ttype);
- MemoryContextSwitchTo(oldcontext);
-}
-
-/*
- * Look up a base backup target and validate the target_detail.
- *
- * Extensions that define new backup targets will probably define a new
- * type of bbsink to match. Validation of the target_detail can be performed
- * either in the check_detail routine called here, or in the bbsink
- * constructor, which will be called from BaseBackupGetSink. It's mostly
- * a matter of taste, but the check_detail function runs somewhat earlier.
- */
-BaseBackupTargetHandle *
-BaseBackupGetTargetHandle(char *target, char *target_detail)
-{
- ListCell *lc;
-
- /* If the target list is not yet initialized, do that first. */
- if (BaseBackupTargetTypeList == NIL)
- initialize_target_list();
-
- /* Search the target type list for a match. */
- foreach(lc, BaseBackupTargetTypeList)
- {
- BaseBackupTargetType *ttype = lfirst(lc);
-
- if (strcmp(ttype->name, target) == 0)
- {
- BaseBackupTargetHandle *handle;
-
- /* Found the target. */
- handle = palloc(sizeof(BaseBackupTargetHandle));
- handle->type = ttype;
- handle->detail_arg = ttype->check_detail(target, target_detail);
-
- return handle;
- }
- }
-
- /* Did not find the target. */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("unrecognized target: \"%s\"", target)));
-
- /* keep compiler quiet */
- return NULL;
-}
-
-/*
- * Construct a bbsink that will implement the backup target.
- *
- * The get_sink function does all the real work, so all we have to do here
- * is call it with the correct arguments. Whatever the check_detail function
- * returned is here passed through to the get_sink function. This lets those
- * two functions communicate with each other, if they wish. If not, the
- * check_detail function can simply return the target_detail and let the
- * get_sink function take it from there.
- */
-bbsink *
-BaseBackupGetSink(BaseBackupTargetHandle *handle, bbsink *next_sink)
-{
- return handle->type->get_sink(next_sink, handle->detail_arg);
-}
-
-/*
- * Load predefined target types into BaseBackupTargetTypeList.
- */
-static void
-initialize_target_list(void)
-{
- BaseBackupTargetType *ttype = builtin_backup_targets;
- MemoryContext oldcontext;
-
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- while (ttype->name != NULL)
- {
- BaseBackupTargetTypeList = lappend(BaseBackupTargetTypeList, ttype);
- ++ttype;
- }
- MemoryContextSwitchTo(oldcontext);
-}
-
-/*
- * Normally, a get_sink function should construct and return a new bbsink that
- * implements the backup target, but the 'blackhole' target just throws the
- * data away. We could implement that by adding a bbsink that does nothing
- * but forward, but it's even cheaper to implement that by not adding a bbsink
- * at all.
- */
-static bbsink *
-blackhole_get_sink(bbsink *next_sink, void *detail_arg)
-{
- return next_sink;
-}
-
-/*
- * Create a bbsink implementing a server-side backup.
- */
-static bbsink *
-server_get_sink(bbsink *next_sink, void *detail_arg)
-{
- return bbsink_server_new(next_sink, detail_arg);
-}
-
-/*
- * Implement target-detail checking for a target that does not accept a
- * detail.
- */
-static void *
-reject_target_detail(char *target, char *target_detail)
-{
- if (target_detail != NULL)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("target '%s' does not accept a target detail",
- target)));
-
- return NULL;
-}
-
-/*
- * Implement target-detail checking for a server-side backup.
- *
- * target_detail should be the name of the directory to which the backup
- * should be written, but we don't check that here. Rather, that check,
- * as well as the necessary permissions checking, happens in bbsink_server_new.
- */
-static void *
-server_check_detail(char *target, char *target_detail)
-{
- if (target_detail == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("target '%s' requires a target detail",
- target)));
-
- return target_detail;
-}
diff --git a/src/backend/replication/basebackup_throttle.c b/src/backend/replication/basebackup_throttle.c
deleted file mode 100644
index af0704c3ace..00000000000
--- a/src/backend/replication/basebackup_throttle.c
+++ /dev/null
@@ -1,199 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_throttle.c
- * Basebackup sink implementing throttling. Data is forwarded to the
- * next base backup sink in the chain at a rate no greater than the
- * configured maximum.
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup_throttle.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "miscadmin.h"
-#include "replication/basebackup_sink.h"
-#include "pgstat.h"
-#include "storage/latch.h"
-#include "utils/timestamp.h"
-
-typedef struct bbsink_throttle
-{
- /* Common information for all types of sink. */
- bbsink base;
-
- /* The actual number of bytes, transfer of which may cause sleep. */
- uint64 throttling_sample;
-
- /* Amount of data already transferred but not yet throttled. */
- int64 throttling_counter;
-
- /* The minimum time required to transfer throttling_sample bytes. */
- TimeOffset elapsed_min_unit;
-
- /* The last check of the transfer rate. */
- TimestampTz throttled_last;
-} bbsink_throttle;
-
-static void bbsink_throttle_begin_backup(bbsink *sink);
-static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
-static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
-static void throttle(bbsink_throttle *sink, size_t increment);
-
-static const bbsink_ops bbsink_throttle_ops = {
- .begin_backup = bbsink_throttle_begin_backup,
- .begin_archive = bbsink_forward_begin_archive,
- .archive_contents = bbsink_throttle_archive_contents,
- .end_archive = bbsink_forward_end_archive,
- .begin_manifest = bbsink_forward_begin_manifest,
- .manifest_contents = bbsink_throttle_manifest_contents,
- .end_manifest = bbsink_forward_end_manifest,
- .end_backup = bbsink_forward_end_backup,
- .cleanup = bbsink_forward_cleanup
-};
-
-/*
- * How frequently to throttle, as a fraction of the specified rate-second.
- */
-#define THROTTLING_FREQUENCY 8
-
-/*
- * Create a new basebackup sink that performs throttling and forwards data
- * to a successor sink.
- */
-bbsink *
-bbsink_throttle_new(bbsink *next, uint32 maxrate)
-{
- bbsink_throttle *sink;
-
- Assert(next != NULL);
- Assert(maxrate > 0);
-
- sink = palloc0(sizeof(bbsink_throttle));
- *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
- sink->base.bbs_next = next;
-
- sink->throttling_sample =
- (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
-
- /*
- * The minimum amount of time for throttling_sample bytes to be
- * transferred.
- */
- sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
-
- return &sink->base;
-}
-
-/*
- * There's no real work to do here, but we need to record the current time so
- * that it can be used for future calculations.
- */
-static void
-bbsink_throttle_begin_backup(bbsink *sink)
-{
- bbsink_throttle *mysink = (bbsink_throttle *) sink;
-
- bbsink_forward_begin_backup(sink);
-
- /* The 'real data' starts now (header was ignored). */
- mysink->throttled_last = GetCurrentTimestamp();
-}
-
-/*
- * First throttle, and then pass archive contents to next sink.
- */
-static void
-bbsink_throttle_archive_contents(bbsink *sink, size_t len)
-{
- throttle((bbsink_throttle *) sink, len);
-
- bbsink_forward_archive_contents(sink, len);
-}
-
-/*
- * First throttle, and then pass manifest contents to next sink.
- */
-static void
-bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
-{
- throttle((bbsink_throttle *) sink, len);
-
- bbsink_forward_manifest_contents(sink, len);
-}
-
-/*
- * Increment the network transfer counter by the given number of bytes,
- * and sleep if necessary to comply with the requested network transfer
- * rate.
- */
-static void
-throttle(bbsink_throttle *sink, size_t increment)
-{
- TimeOffset elapsed_min;
-
- Assert(sink->throttling_counter >= 0);
-
- sink->throttling_counter += increment;
- if (sink->throttling_counter < sink->throttling_sample)
- return;
-
- /* How much time should have elapsed at minimum? */
- elapsed_min = sink->elapsed_min_unit *
- (sink->throttling_counter / sink->throttling_sample);
-
- /*
- * Since the latch could be set repeatedly because of concurrently WAL
- * activity, sleep in a loop to ensure enough time has passed.
- */
- for (;;)
- {
- TimeOffset elapsed,
- sleep;
- int wait_result;
-
- /* Time elapsed since the last measurement (and possible wake up). */
- elapsed = GetCurrentTimestamp() - sink->throttled_last;
-
- /* sleep if the transfer is faster than it should be */
- sleep = elapsed_min - elapsed;
- if (sleep <= 0)
- break;
-
- ResetLatch(MyLatch);
-
- /* We're eating a potentially set latch, so check for interrupts */
- CHECK_FOR_INTERRUPTS();
-
- /*
- * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
- * the maximum time to sleep. Thus the cast to long is safe.
- */
- wait_result = WaitLatch(MyLatch,
- WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
- (long) (sleep / 1000),
- WAIT_EVENT_BASE_BACKUP_THROTTLE);
-
- if (wait_result & WL_LATCH_SET)
- CHECK_FOR_INTERRUPTS();
-
- /* Done waiting? */
- if (wait_result & WL_TIMEOUT)
- break;
- }
-
- /*
- * As we work with integers, only whole multiple of throttling_sample was
- * processed. The rest will be done during the next call of this function.
- */
- sink->throttling_counter %= sink->throttling_sample;
-
- /*
- * Time interval for the remaining amount and possible next increments
- * starts now.
- */
- sink->throttled_last = GetCurrentTimestamp();
-}
diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
deleted file mode 100644
index b23a37b29ed..00000000000
--- a/src/backend/replication/basebackup_zstd.c
+++ /dev/null
@@ -1,316 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * basebackup_zstd.c
- * Basebackup sink implementing zstd compression.
- *
- * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/backend/replication/basebackup_zstd.c
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#ifdef USE_ZSTD
-#include <zstd.h>
-#endif
-
-#include "replication/basebackup_sink.h"
-
-#ifdef USE_ZSTD
-
-typedef struct bbsink_zstd
-{
- /* Common information for all types of sink. */
- bbsink base;
-
- /* Compression options */
- pg_compress_specification *compress;
-
- ZSTD_CCtx *cctx;
- ZSTD_outBuffer zstd_outBuf;
-} bbsink_zstd;
-
-static void bbsink_zstd_begin_backup(bbsink *sink);
-static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
-static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
-static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
-static void bbsink_zstd_end_archive(bbsink *sink);
-static void bbsink_zstd_cleanup(bbsink *sink);
-static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
- TimeLineID endtli);
-
-static const bbsink_ops bbsink_zstd_ops = {
- .begin_backup = bbsink_zstd_begin_backup,
- .begin_archive = bbsink_zstd_begin_archive,
- .archive_contents = bbsink_zstd_archive_contents,
- .end_archive = bbsink_zstd_end_archive,
- .begin_manifest = bbsink_forward_begin_manifest,
- .manifest_contents = bbsink_zstd_manifest_contents,
- .end_manifest = bbsink_forward_end_manifest,
- .end_backup = bbsink_zstd_end_backup,
- .cleanup = bbsink_zstd_cleanup
-};
-#endif
-
-/*
- * Create a new basebackup sink that performs zstd compression.
- */
-bbsink *
-bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
-{
-#ifndef USE_ZSTD
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("zstd compression is not supported by this build")));
- return NULL; /* keep compiler quiet */
-#else
- bbsink_zstd *sink;
-
- Assert(next != NULL);
-
- sink = palloc0(sizeof(bbsink_zstd));
- *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
- sink->base.bbs_next = next;
- sink->compress = compress;
-
- return &sink->base;
-#endif
-}
-
-#ifdef USE_ZSTD
-
-/*
- * Begin backup.
- */
-static void
-bbsink_zstd_begin_backup(bbsink *sink)
-{
- bbsink_zstd *mysink = (bbsink_zstd *) sink;
- size_t output_buffer_bound;
- size_t ret;
- pg_compress_specification *compress = mysink->compress;
-
- mysink->cctx = ZSTD_createCCtx();
- if (!mysink->cctx)
- elog(ERROR, "could not create zstd compression context");
-
- if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) != 0)
- {
- ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
- compress->level);
- if (ZSTD_isError(ret))
- elog(ERROR, "could not set zstd compression level to %d: %s",
- compress->level, ZSTD_getErrorName(ret));
- }
-
- if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
- {
- /*
- * On older versions of libzstd, this option does not exist, and
- * trying to set it will fail. Similarly for newer versions if they
- * are compiled without threading support.
- */
- ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
- compress->workers);
- if (ZSTD_isError(ret))
- ereport(ERROR,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("could not set compression worker count to %d: %s",
- compress->workers, ZSTD_getErrorName(ret)));
- }
-
- /*
- * We need our own buffer, because we're going to pass different data to
- * the next sink than what gets passed to us.
- */
- mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
-
- /*
- * Make sure that the next sink's bbs_buffer is big enough to accommodate
- * the compressed input buffer.
- */
- output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
-
- /*
- * The buffer length is expected to be a multiple of BLCKSZ, so round up.
- */
- output_buffer_bound = output_buffer_bound + BLCKSZ -
- (output_buffer_bound % BLCKSZ);
-
- bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
-}
-
-/*
- * Prepare to compress the next archive.
- */
-static void
-bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
-{
- bbsink_zstd *mysink = (bbsink_zstd *) sink;
- char *zstd_archive_name;
-
- /*
- * At the start of each archive we reset the state to start a new
- * compression operation. The parameters are sticky and they will stick
- * around as we are resetting with option ZSTD_reset_session_only.
- */
- ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
-
- mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
- mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
- mysink->zstd_outBuf.pos = 0;
-
- /* Add ".zst" to the archive name. */
- zstd_archive_name = psprintf("%s.zst", archive_name);
- Assert(sink->bbs_next != NULL);
- bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
- pfree(zstd_archive_name);
-}
-
-/*
- * Compress the input data to the output buffer until we run out of input
- * data. Each time the output buffer falls below the compression bound for
- * the input buffer, invoke the archive_contents() method for the next sink.
- *
- * Note that since we're compressing the input, it may very commonly happen
- * that we consume all the input data without filling the output buffer. In
- * that case, the compressed representation of the current input data won't
- * actually be sent to the next bbsink until a later call to this function,
- * or perhaps even not until bbsink_zstd_end_archive() is invoked.
- */
-static void
-bbsink_zstd_archive_contents(bbsink *sink, size_t len)
-{
- bbsink_zstd *mysink = (bbsink_zstd *) sink;
- ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
-
- while (inBuf.pos < inBuf.size)
- {
- size_t yet_to_flush;
- size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
-
- /*
- * If the out buffer is not left with enough space, send the output
- * buffer to the next sink, and reset it.
- */
- if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
- {
- bbsink_archive_contents(mysink->base.bbs_next,
- mysink->zstd_outBuf.pos);
- mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
- mysink->zstd_outBuf.size =
- mysink->base.bbs_next->bbs_buffer_length;
- mysink->zstd_outBuf.pos = 0;
- }
-
- yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
- &inBuf, ZSTD_e_continue);
-
- if (ZSTD_isError(yet_to_flush))
- elog(ERROR,
- "could not compress data: %s",
- ZSTD_getErrorName(yet_to_flush));
- }
-}
-
-/*
- * There might be some data inside zstd's internal buffers; we need to get that
- * flushed out, also end the zstd frame and then get that forwarded to the
- * successor sink as archive content.
- *
- * Then we can end processing for this archive.
- */
-static void
-bbsink_zstd_end_archive(bbsink *sink)
-{
- bbsink_zstd *mysink = (bbsink_zstd *) sink;
- size_t yet_to_flush;
-
- do
- {
- ZSTD_inBuffer in = {NULL, 0, 0};
- size_t max_needed = ZSTD_compressBound(0);
-
- /*
- * If the out buffer is not left with enough space, send the output
- * buffer to the next sink, and reset it.
- */
- if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
- {
- bbsink_archive_contents(mysink->base.bbs_next,
- mysink->zstd_outBuf.pos);
- mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
- mysink->zstd_outBuf.size =
- mysink->base.bbs_next->bbs_buffer_length;
- mysink->zstd_outBuf.pos = 0;
- }
-
- yet_to_flush = ZSTD_compressStream2(mysink->cctx,
- &mysink->zstd_outBuf,
- &in, ZSTD_e_end);
-
- if (ZSTD_isError(yet_to_flush))
- elog(ERROR, "could not compress data: %s",
- ZSTD_getErrorName(yet_to_flush));
-
- } while (yet_to_flush > 0);
-
- /* Make sure to pass any remaining bytes to the next sink. */
- if (mysink->zstd_outBuf.pos > 0)
- bbsink_archive_contents(mysink->base.bbs_next,
- mysink->zstd_outBuf.pos);
-
- /* Pass on the information that this archive has ended. */
- bbsink_forward_end_archive(sink);
-}
-
-/*
- * Free the resources and context.
- */
-static void
-bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
- TimeLineID endtli)
-{
- bbsink_zstd *mysink = (bbsink_zstd *) sink;
-
- /* Release the context. */
- if (mysink->cctx)
- {
- ZSTD_freeCCtx(mysink->cctx);
- mysink->cctx = NULL;
- }
-
- bbsink_forward_end_backup(sink, endptr, endtli);
-}
-
-/*
- * Manifest contents are not compressed, but we do need to copy them into
- * the successor sink's buffer, because we have our own.
- */
-static void
-bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
-{
- memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
- bbsink_manifest_contents(sink->bbs_next, len);
-}
-
-/*
- * In case the backup fails, make sure we free any compression context that
- * got allocated, so that we don't leak memory.
- */
-static void
-bbsink_zstd_cleanup(bbsink *sink)
-{
- bbsink_zstd *mysink = (bbsink_zstd *) sink;
-
- /* Release the context if not already released. */
- if (mysink->cctx)
- {
- ZSTD_freeCCtx(mysink->cctx);
- mysink->cctx = NULL;
- }
-}
-
-#endif
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3a86786cc3a..724010dbd97 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -57,6 +57,7 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "backup/basebackup.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -68,7 +69,6 @@
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
-#include "replication/basebackup.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"