From f80b09bac87d6b49f5dbb6131da5fbd9b9773c5c Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 5 Aug 2024 11:40:29 -0400 Subject: Move astreamer (except astreamer_inject) to fe_utils. This allows the code to be used by other frontend applications. Amul Sul, reviewed by Sravan Kumar, Andres Freund (whose input I specifically solicited regarding the meson.build changes), and me. Discussion: http://postgr.es/m/CAAJ_b94StvLWrc_p4q-f7n3OPfr6GhL8_XuAg2aAaYZp1tF-nw@mail.gmail.com --- src/bin/pg_basebackup/Makefile | 7 +- src/bin/pg_basebackup/astreamer.h | 220 ------------- src/bin/pg_basebackup/astreamer_file.c | 396 ------------------------ src/bin/pg_basebackup/astreamer_gzip.c | 364 ---------------------- src/bin/pg_basebackup/astreamer_inject.h | 2 +- src/bin/pg_basebackup/astreamer_lz4.c | 422 ------------------------- src/bin/pg_basebackup/astreamer_tar.c | 514 ------------------------------- src/bin/pg_basebackup/astreamer_zstd.c | 368 ---------------------- src/bin/pg_basebackup/meson.build | 5 - src/fe_utils/Makefile | 5 + src/fe_utils/astreamer_file.c | 396 ++++++++++++++++++++++++ src/fe_utils/astreamer_gzip.c | 364 ++++++++++++++++++++++ src/fe_utils/astreamer_lz4.c | 422 +++++++++++++++++++++++++ src/fe_utils/astreamer_tar.c | 514 +++++++++++++++++++++++++++++++ src/fe_utils/astreamer_zstd.c | 368 ++++++++++++++++++++++ src/fe_utils/meson.build | 5 + src/include/fe_utils/astreamer.h | 220 +++++++++++++ 17 files changed, 2296 insertions(+), 2296 deletions(-) delete mode 100644 src/bin/pg_basebackup/astreamer.h delete mode 100644 src/bin/pg_basebackup/astreamer_file.c delete mode 100644 src/bin/pg_basebackup/astreamer_gzip.c delete mode 100644 src/bin/pg_basebackup/astreamer_lz4.c delete mode 100644 src/bin/pg_basebackup/astreamer_tar.c delete mode 100644 src/bin/pg_basebackup/astreamer_zstd.c create mode 100644 src/fe_utils/astreamer_file.c create mode 100644 src/fe_utils/astreamer_gzip.c create mode 100644 src/fe_utils/astreamer_lz4.c create mode 100644 
src/fe_utils/astreamer_tar.c create mode 100644 src/fe_utils/astreamer_zstd.c create mode 100644 src/include/fe_utils/astreamer.h (limited to 'src') diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index a71af2d48a7..f1e73058b23 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -37,12 +37,7 @@ OBJS = \ BBOBJS = \ pg_basebackup.o \ - astreamer_file.o \ - astreamer_gzip.o \ - astreamer_inject.o \ - astreamer_lz4.o \ - astreamer_tar.o \ - astreamer_zstd.o + astreamer_inject.o all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical diff --git a/src/bin/pg_basebackup/astreamer.h b/src/bin/pg_basebackup/astreamer.h deleted file mode 100644 index 2c014dbddbe..00000000000 --- a/src/bin/pg_basebackup/astreamer.h +++ /dev/null @@ -1,220 +0,0 @@ -/*------------------------------------------------------------------------- - * - * astreamer.h - * - * Each tar archive returned by the server is passed to one or more - * astreamer objects for further processing. The astreamer may do - * something simple, like write the archive to a file, perhaps after - * compressing it, but it can also do more complicated things, like - * annotating the byte stream to indicate which parts of the data - * correspond to tar headers or trailing padding, vs. which parts are - * payload data. A subsequent astreamer may use this information to - * make further decisions about how to process the data; for example, - * it might choose to modify the archive contents. 
- * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/astreamer.h - *------------------------------------------------------------------------- - */ - -#ifndef ASTREAMER_H -#define ASTREAMER_H - -#include "common/compression.h" -#include "lib/stringinfo.h" -#include "pqexpbuffer.h" - -struct astreamer; -struct astreamer_ops; -typedef struct astreamer astreamer; -typedef struct astreamer_ops astreamer_ops; - -/* - * Each chunk of archive data passed to a astreamer is classified into one - * of these categories. When data is first received from the remote server, - * each chunk will be categorized as ASTREAMER_UNKNOWN, and the chunks will - * be of whatever size the remote server chose to send. - * - * If the archive is parsed (e.g. see astreamer_tar_parser_new()), then all - * chunks should be labelled as one of the other types listed here. In - * addition, there should be exactly one ASTREAMER_MEMBER_HEADER chunk and - * exactly one ASTREAMER_MEMBER_TRAILER chunk per archive member, even if - * that means a zero-length call. There can be any number of - * ASTREAMER_MEMBER_CONTENTS chunks in between those calls. There - * should be exactly one ASTREAMER_ARCHIVE_TRAILER chunk, and it should follow the - * last ASTREAMER_MEMBER_TRAILER chunk. - * - * In theory, we could need other classifications here, such as a way of - * indicating an archive header, but the "tar" format doesn't need anything - * else, so for the time being there's no point. - */ -typedef enum -{ - ASTREAMER_UNKNOWN, - ASTREAMER_MEMBER_HEADER, - ASTREAMER_MEMBER_CONTENTS, - ASTREAMER_MEMBER_TRAILER, - ASTREAMER_ARCHIVE_TRAILER, -} astreamer_archive_context; - -/* - * Each chunk of data that is classified as ASTREAMER_MEMBER_HEADER, - * ASTREAMER_MEMBER_CONTENTS, or ASTREAMER_MEMBER_TRAILER should also - * pass a pointer to an instance of this struct. 
The details are expected - * to be present in the archive header and used to fill the struct, after - * which all subsequent calls for the same archive member are expected to - * pass the same details. - */ -typedef struct -{ - char pathname[MAXPGPATH]; - pgoff_t size; - mode_t mode; - uid_t uid; - gid_t gid; - bool is_directory; - bool is_link; - char linktarget[MAXPGPATH]; -} astreamer_member; - -/* - * Generally, each type of astreamer will define its own struct, but the - * first element should be 'astreamer base'. A astreamer that does not - * require any additional private data could use this structure directly. - * - * bbs_ops is a pointer to the astreamer_ops object which contains the - * function pointers appropriate to this type of astreamer. - * - * bbs_next is a pointer to the successor astreamer, for those types of - * astreamer which forward data to a successor. It need not be used and - * should be set to NULL when not relevant. - * - * bbs_buffer is a buffer for accumulating data for temporary storage. Each - * type of astreamer makes its own decisions about whether and how to use - * this buffer. - */ -struct astreamer -{ - const astreamer_ops *bbs_ops; - astreamer *bbs_next; - StringInfoData bbs_buffer; -}; - -/* - * There are three callbacks for a astreamer. The 'content' callback is - * called repeatedly, as described in the astreamer_archive_context comments. - * Then, the 'finalize' callback is called once at the end, to give the - * astreamer a chance to perform cleanup such as closing files. Finally, - * because this code is running in a frontend environment where, as of this - * writing, there are no memory contexts, the 'free' callback is called to - * release memory. These callbacks should always be invoked using the static - * inline functions defined below. 
- */ -struct astreamer_ops -{ - void (*content) (astreamer *streamer, astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); - void (*finalize) (astreamer *streamer); - void (*free) (astreamer *streamer); -}; - -/* Send some content to a astreamer. */ -static inline void -astreamer_content(astreamer *streamer, astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - Assert(streamer != NULL); - streamer->bbs_ops->content(streamer, member, data, len, context); -} - -/* Finalize a astreamer. */ -static inline void -astreamer_finalize(astreamer *streamer) -{ - Assert(streamer != NULL); - streamer->bbs_ops->finalize(streamer); -} - -/* Free a astreamer. */ -static inline void -astreamer_free(astreamer *streamer) -{ - Assert(streamer != NULL); - streamer->bbs_ops->free(streamer); -} - -/* - * This is a convenience method for use when implementing a astreamer; it is - * not for use by outside callers. It adds the amount of data specified by - * 'nbytes' to the astreamer's buffer and adjusts '*len' and '*data' - * accordingly. - */ -static inline void -astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len, - int nbytes) -{ - Assert(nbytes <= *len); - - appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes); - *len -= nbytes; - *data += nbytes; -} - -/* - * This is a convenience method for use when implementing a astreamer; it is - * not for use by outside callers. It attempts to add enough data to the - * astreamer's buffer to reach a length of target_bytes and adjusts '*len' - * and '*data' accordingly. It returns true if the target length has been - * reached and false otherwise. - */ -static inline bool -astreamer_buffer_until(astreamer *streamer, const char **data, int *len, - int target_bytes) -{ - int buflen = streamer->bbs_buffer.len; - - if (buflen >= target_bytes) - { - /* Target length already reached; nothing to do. 
*/ - return true; - } - - if (buflen + *len < target_bytes) - { - /* Not enough data to reach target length; buffer all of it. */ - astreamer_buffer_bytes(streamer, data, len, *len); - return false; - } - - /* Buffer just enough to reach the target length. */ - astreamer_buffer_bytes(streamer, data, len, target_bytes - buflen); - return true; -} - -/* - * Functions for creating astreamer objects of various types. See the header - * comments for each of these functions for details. - */ -extern astreamer *astreamer_plain_writer_new(char *pathname, FILE *file); -extern astreamer *astreamer_gzip_writer_new(char *pathname, FILE *file, - pg_compress_specification *compress); -extern astreamer *astreamer_extractor_new(const char *basepath, - const char *(*link_map) (const char *), - void (*report_output_file) (const char *)); - -extern astreamer *astreamer_gzip_decompressor_new(astreamer *next); -extern astreamer *astreamer_lz4_compressor_new(astreamer *next, - pg_compress_specification *compress); -extern astreamer *astreamer_lz4_decompressor_new(astreamer *next); -extern astreamer *astreamer_zstd_compressor_new(astreamer *next, - pg_compress_specification *compress); -extern astreamer *astreamer_zstd_decompressor_new(astreamer *next); -extern astreamer *astreamer_tar_parser_new(astreamer *next); -extern astreamer *astreamer_tar_terminator_new(astreamer *next); -extern astreamer *astreamer_tar_archiver_new(astreamer *next); - -#endif diff --git a/src/bin/pg_basebackup/astreamer_file.c b/src/bin/pg_basebackup/astreamer_file.c deleted file mode 100644 index 2742385e103..00000000000 --- a/src/bin/pg_basebackup/astreamer_file.c +++ /dev/null @@ -1,396 +0,0 @@ -/*------------------------------------------------------------------------- - * - * astreamer_file.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/astreamer_file.c - 
*------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include <unistd.h> - -#include "astreamer.h" -#include "common/file_perm.h" -#include "common/logging.h" -#include "common/string.h" - -typedef struct astreamer_plain_writer -{ - astreamer base; - char *pathname; - FILE *file; - bool should_close_file; -} astreamer_plain_writer; - -typedef struct astreamer_extractor -{ - astreamer base; - char *basepath; - const char *(*link_map) (const char *); - void (*report_output_file) (const char *); - char filename[MAXPGPATH]; - FILE *file; -} astreamer_extractor; - -static void astreamer_plain_writer_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_plain_writer_finalize(astreamer *streamer); -static void astreamer_plain_writer_free(astreamer *streamer); - -static const astreamer_ops astreamer_plain_writer_ops = { - .content = astreamer_plain_writer_content, - .finalize = astreamer_plain_writer_finalize, - .free = astreamer_plain_writer_free -}; - -static void astreamer_extractor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_extractor_finalize(astreamer *streamer); -static void astreamer_extractor_free(astreamer *streamer); -static void extract_directory(const char *filename, mode_t mode); -static void extract_link(const char *filename, const char *linktarget); -static FILE *create_file_for_extract(const char *filename, mode_t mode); - -static const astreamer_ops astreamer_extractor_ops = { - .content = astreamer_extractor_content, - .finalize = astreamer_extractor_finalize, - .free = astreamer_extractor_free -}; - -/* - * Create a astreamer that just writes data to a file. - * - * The caller must specify a pathname and may specify a file. The pathname is - * used for error-reporting purposes either way. 
If file is NULL, the pathname - * also identifies the file to which the data should be written: it is opened - * for writing and closed when done. If file is not NULL, the data is written - * there. - */ -astreamer * -astreamer_plain_writer_new(char *pathname, FILE *file) -{ - astreamer_plain_writer *streamer; - - streamer = palloc0(sizeof(astreamer_plain_writer)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_plain_writer_ops; - - streamer->pathname = pstrdup(pathname); - streamer->file = file; - - if (file == NULL) - { - streamer->file = fopen(pathname, "wb"); - if (streamer->file == NULL) - pg_fatal("could not create file \"%s\": %m", pathname); - streamer->should_close_file = true; - } - - return &streamer->base; -} - -/* - * Write archive content to file. - */ -static void -astreamer_plain_writer_content(astreamer *streamer, - astreamer_member *member, const char *data, - int len, astreamer_archive_context context) -{ - astreamer_plain_writer *mystreamer; - - mystreamer = (astreamer_plain_writer *) streamer; - - if (len == 0) - return; - - errno = 0; - if (fwrite(data, len, 1, mystreamer->file) != 1) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_fatal("could not write to file \"%s\": %m", - mystreamer->pathname); - } -} - -/* - * End-of-archive processing when writing to a plain file consists of closing - * the file if we opened it, but not if the caller provided it. - */ -static void -astreamer_plain_writer_finalize(astreamer *streamer) -{ - astreamer_plain_writer *mystreamer; - - mystreamer = (astreamer_plain_writer *) streamer; - - if (mystreamer->should_close_file && fclose(mystreamer->file) != 0) - pg_fatal("could not close file \"%s\": %m", - mystreamer->pathname); - - mystreamer->file = NULL; - mystreamer->should_close_file = false; -} - -/* - * Free memory associated with this astreamer. 
- */ -static void -astreamer_plain_writer_free(astreamer *streamer) -{ - astreamer_plain_writer *mystreamer; - - mystreamer = (astreamer_plain_writer *) streamer; - - Assert(!mystreamer->should_close_file); - Assert(mystreamer->base.bbs_next == NULL); - - pfree(mystreamer->pathname); - pfree(mystreamer); -} - -/* - * Create a astreamer that extracts an archive. - * - * All pathnames in the archive are interpreted relative to basepath. - * - * Unlike e.g. astreamer_plain_writer_new() we can't do anything useful here - * with untyped chunks; we need typed chunks which follow the rules described - * in astreamer.h. Assuming we have that, we don't need to worry about the - * original archive format; it's enough to just look at the member information - * provided and write to the corresponding file. - * - * 'link_map' is a function that will be applied to the target of any - * symbolic link, and which should return a replacement pathname to be used - * in its place. If NULL, the symbolic link target is used without - * modification. - * - * 'report_output_file' is a function that will be called each time we open a - * new output file. The pathname to that file is passed as an argument. If - * NULL, the call is skipped. - */ -astreamer * -astreamer_extractor_new(const char *basepath, - const char *(*link_map) (const char *), - void (*report_output_file) (const char *)) -{ - astreamer_extractor *streamer; - - streamer = palloc0(sizeof(astreamer_extractor)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_extractor_ops; - streamer->basepath = pstrdup(basepath); - streamer->link_map = link_map; - streamer->report_output_file = report_output_file; - - return &streamer->base; -} - -/* - * Extract archive contents to the filesystem. 
- */ -static void -astreamer_extractor_content(astreamer *streamer, astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; - int fnamelen; - - Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER); - Assert(context != ASTREAMER_UNKNOWN); - - switch (context) - { - case ASTREAMER_MEMBER_HEADER: - Assert(mystreamer->file == NULL); - - /* Prepend basepath. */ - snprintf(mystreamer->filename, sizeof(mystreamer->filename), - "%s/%s", mystreamer->basepath, member->pathname); - - /* Remove any trailing slash. */ - fnamelen = strlen(mystreamer->filename); - if (mystreamer->filename[fnamelen - 1] == '/') - mystreamer->filename[fnamelen - 1] = '\0'; - - /* Dispatch based on file type. */ - if (member->is_directory) - extract_directory(mystreamer->filename, member->mode); - else if (member->is_link) - { - const char *linktarget = member->linktarget; - - if (mystreamer->link_map) - linktarget = mystreamer->link_map(linktarget); - extract_link(mystreamer->filename, linktarget); - } - else - mystreamer->file = - create_file_for_extract(mystreamer->filename, - member->mode); - - /* Report output file change. */ - if (mystreamer->report_output_file) - mystreamer->report_output_file(mystreamer->filename); - break; - - case ASTREAMER_MEMBER_CONTENTS: - if (mystreamer->file == NULL) - break; - - errno = 0; - if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_fatal("could not write to file \"%s\": %m", - mystreamer->filename); - } - break; - - case ASTREAMER_MEMBER_TRAILER: - if (mystreamer->file == NULL) - break; - fclose(mystreamer->file); - mystreamer->file = NULL; - break; - - case ASTREAMER_ARCHIVE_TRAILER: - break; - - default: - /* Shouldn't happen. 
*/ - pg_fatal("unexpected state while extracting archive"); - } -} - -/* - * Should we tolerate an already-existing directory? - * - * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will have been - * created by the wal receiver process. Also, when the WAL directory location - * was specified, pg_wal (or pg_xlog) has already been created as a symbolic - * link before starting the actual backup. So just ignore creation failures - * on related directories. - * - * If in-place tablespaces are used, pg_tblspc and subdirectories may already - * exist when we get here. So tolerate that case, too. - */ -static bool -should_allow_existing_directory(const char *pathname) -{ - const char *filename = last_dir_separator(pathname) + 1; - - if (strcmp(filename, "pg_wal") == 0 || - strcmp(filename, "pg_xlog") == 0 || - strcmp(filename, "archive_status") == 0 || - strcmp(filename, "summaries") == 0 || - strcmp(filename, "pg_tblspc") == 0) - return true; - - if (strspn(filename, "0123456789") == strlen(filename)) - { - const char *pg_tblspc = strstr(pathname, "/pg_tblspc/"); - - return pg_tblspc != NULL && pg_tblspc + 11 == filename; - } - - return false; -} - -/* - * Create a directory. - */ -static void -extract_directory(const char *filename, mode_t mode) -{ - if (mkdir(filename, pg_dir_create_mode) != 0 && - (errno != EEXIST || !should_allow_existing_directory(filename))) - pg_fatal("could not create directory \"%s\": %m", - filename); - -#ifndef WIN32 - if (chmod(filename, mode)) - pg_fatal("could not set permissions on directory \"%s\": %m", - filename); -#endif -} - -/* - * Create a symbolic link. - * - * It's most likely a link in pg_tblspc directory, to the location of a - * tablespace. Apply any tablespace mapping given on the command line - * (--tablespace-mapping). (We blindly apply the mapping without checking that - * the link really is inside pg_tblspc. 
We don't expect there to be other - * symlinks in a data directory, but if there are, you can call it an - * undocumented feature that you can map them too.) - */ -static void -extract_link(const char *filename, const char *linktarget) -{ - if (symlink(linktarget, filename) != 0) - pg_fatal("could not create symbolic link from \"%s\" to \"%s\": %m", - filename, linktarget); -} - -/* - * Create a regular file. - * - * Return the resulting handle so we can write the content to the file. - */ -static FILE * -create_file_for_extract(const char *filename, mode_t mode) -{ - FILE *file; - - file = fopen(filename, "wb"); - if (file == NULL) - pg_fatal("could not create file \"%s\": %m", filename); - -#ifndef WIN32 - if (chmod(filename, mode)) - pg_fatal("could not set permissions on file \"%s\": %m", - filename); -#endif - - return file; -} - -/* - * End-of-stream processing for extracting an archive. - * - * There's nothing to do here but sanity checking. - */ -static void -astreamer_extractor_finalize(astreamer *streamer) -{ - astreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY - = (astreamer_extractor *) streamer; - - Assert(mystreamer->file == NULL); -} - -/* - * Free memory. 
- */ -static void -astreamer_extractor_free(astreamer *streamer) -{ - astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; - - pfree(mystreamer->basepath); - pfree(mystreamer); -} diff --git a/src/bin/pg_basebackup/astreamer_gzip.c b/src/bin/pg_basebackup/astreamer_gzip.c deleted file mode 100644 index 6f7c27afbbc..00000000000 --- a/src/bin/pg_basebackup/astreamer_gzip.c +++ /dev/null @@ -1,364 +0,0 @@ -/*------------------------------------------------------------------------- - * - * astreamer_gzip.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/astreamer_gzip.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include <unistd.h> - -#ifdef HAVE_LIBZ -#include <zlib.h> -#endif - -#include "astreamer.h" -#include "common/file_perm.h" -#include "common/logging.h" -#include "common/string.h" - -#ifdef HAVE_LIBZ -typedef struct astreamer_gzip_writer -{ - astreamer base; - char *pathname; - gzFile gzfile; -} astreamer_gzip_writer; - -typedef struct astreamer_gzip_decompressor -{ - astreamer base; - z_stream zstream; - size_t bytes_written; -} astreamer_gzip_decompressor; - -static void astreamer_gzip_writer_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_gzip_writer_finalize(astreamer *streamer); -static void astreamer_gzip_writer_free(astreamer *streamer); -static const char *get_gz_error(gzFile gzf); - -static const astreamer_ops astreamer_gzip_writer_ops = { - .content = astreamer_gzip_writer_content, - .finalize = astreamer_gzip_writer_finalize, - .free = astreamer_gzip_writer_free -}; - -static void astreamer_gzip_decompressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_gzip_decompressor_finalize(astreamer *streamer); -static 
void astreamer_gzip_decompressor_free(astreamer *streamer); -static void *gzip_palloc(void *opaque, unsigned items, unsigned size); -static void gzip_pfree(void *opaque, void *address); - -static const astreamer_ops astreamer_gzip_decompressor_ops = { - .content = astreamer_gzip_decompressor_content, - .finalize = astreamer_gzip_decompressor_finalize, - .free = astreamer_gzip_decompressor_free -}; -#endif - -/* - * Create a astreamer that just compresses data using gzip, and then writes - * it to a file. - * - * As in the case of astreamer_plain_writer_new, pathname is always used - * for error reporting purposes; if file is NULL, it is also opened and - * closed so that the data may be written there. - */ -astreamer * -astreamer_gzip_writer_new(char *pathname, FILE *file, - pg_compress_specification *compress) -{ -#ifdef HAVE_LIBZ - astreamer_gzip_writer *streamer; - - streamer = palloc0(sizeof(astreamer_gzip_writer)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_gzip_writer_ops; - - streamer->pathname = pstrdup(pathname); - - if (file == NULL) - { - streamer->gzfile = gzopen(pathname, "wb"); - if (streamer->gzfile == NULL) - pg_fatal("could not create compressed file \"%s\": %m", - pathname); - } - else - { - int fd = dup(fileno(file)); - - if (fd < 0) - pg_fatal("could not duplicate stdout: %m"); - - streamer->gzfile = gzdopen(fd, "wb"); - if (streamer->gzfile == NULL) - pg_fatal("could not open output file: %m"); - } - - if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK) - pg_fatal("could not set compression level %d: %s", - compress->level, get_gz_error(streamer->gzfile)); - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "gzip"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef HAVE_LIBZ -/* - * Write archive content to gzip file. 
- */ -static void -astreamer_gzip_writer_content(astreamer *streamer, - astreamer_member *member, const char *data, - int len, astreamer_archive_context context) -{ - astreamer_gzip_writer *mystreamer; - - mystreamer = (astreamer_gzip_writer *) streamer; - - if (len == 0) - return; - - errno = 0; - if (gzwrite(mystreamer->gzfile, data, len) != len) - { - /* if write didn't set errno, assume problem is no disk space */ - if (errno == 0) - errno = ENOSPC; - pg_fatal("could not write to compressed file \"%s\": %s", - mystreamer->pathname, get_gz_error(mystreamer->gzfile)); - } -} - -/* - * End-of-archive processing when writing to a gzip file consists of just - * calling gzclose. - * - * It makes no difference whether we opened the file or the caller did it, - * because libz provides no way of avoiding a close on the underlying file - * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to - * work around this issue, so that the behavior from the caller's viewpoint - * is the same as for astreamer_plain_writer. - */ -static void -astreamer_gzip_writer_finalize(astreamer *streamer) -{ - astreamer_gzip_writer *mystreamer; - - mystreamer = (astreamer_gzip_writer *) streamer; - - errno = 0; /* in case gzclose() doesn't set it */ - if (gzclose(mystreamer->gzfile) != 0) - pg_fatal("could not close compressed file \"%s\": %m", - mystreamer->pathname); - - mystreamer->gzfile = NULL; -} - -/* - * Free memory associated with this astreamer. - */ -static void -astreamer_gzip_writer_free(astreamer *streamer) -{ - astreamer_gzip_writer *mystreamer; - - mystreamer = (astreamer_gzip_writer *) streamer; - - Assert(mystreamer->base.bbs_next == NULL); - Assert(mystreamer->gzfile == NULL); - - pfree(mystreamer->pathname); - pfree(mystreamer); -} - -/* - * Helper function for libz error reporting. 
- */ -static const char * -get_gz_error(gzFile gzf) -{ - int errnum; - const char *errmsg; - - errmsg = gzerror(gzf, &errnum); - if (errnum == Z_ERRNO) - return strerror(errno); - else - return errmsg; -} -#endif - -/* - * Create a new base backup streamer that performs decompression of gzip - * compressed blocks. - */ -astreamer * -astreamer_gzip_decompressor_new(astreamer *next) -{ -#ifdef HAVE_LIBZ - astreamer_gzip_decompressor *streamer; - z_stream *zs; - - Assert(next != NULL); - - streamer = palloc0(sizeof(astreamer_gzip_decompressor)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_gzip_decompressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - - /* Initialize internal stream state for decompression */ - zs = &streamer->zstream; - zs->zalloc = gzip_palloc; - zs->zfree = gzip_pfree; - zs->next_out = (uint8 *) streamer->base.bbs_buffer.data; - zs->avail_out = streamer->base.bbs_buffer.maxlen; - - /* - * Data compression was initialized using deflateInit2 to request a gzip - * header. Similarly, we are using inflateInit2 to initialize data - * decompression. - * - * Per the documentation for inflateInit2, the second argument is - * "windowBits" and its value must be greater than or equal to the value - * provided while compressing the data, so we are using the maximum - * possible value for safety. - */ - if (inflateInit2(zs, 15 + 16) != Z_OK) - pg_fatal("could not initialize compression library"); - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "gzip"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef HAVE_LIBZ -/* - * Decompress the input data to output buffer until we run out of input - * data. Each time the output buffer is full, pass on the decompressed data - * to the next streamer. 
- */ -static void -astreamer_gzip_decompressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - astreamer_gzip_decompressor *mystreamer; - z_stream *zs; - - mystreamer = (astreamer_gzip_decompressor *) streamer; - - zs = &mystreamer->zstream; - zs->next_in = (const uint8 *) data; - zs->avail_in = len; - - /* Process the current chunk */ - while (zs->avail_in > 0) - { - int res; - - Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen); - - zs->next_out = (uint8 *) - mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; - zs->avail_out = - mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; - - /* - * This call decompresses data starting at zs->next_in and updates - * zs->next_in and zs->avail_in. It generates output data starting - * at zs->next_out and updates zs->next_out and zs->avail_out - * accordingly. - */ - res = inflate(zs, Z_NO_FLUSH); - - if (res == Z_STREAM_ERROR) - pg_log_error("could not decompress data: %s", zs->msg); - - mystreamer->bytes_written = - mystreamer->base.bbs_buffer.maxlen - zs->avail_out; - - /* If output buffer is full then pass data to next streamer */ - if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) - { - astreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, context); - mystreamer->bytes_written = 0; - } - } -} - -/* - * End-of-stream processing. - */ -static void -astreamer_gzip_decompressor_finalize(astreamer *streamer) -{ - astreamer_gzip_decompressor *mystreamer; - - mystreamer = (astreamer_gzip_decompressor *) streamer; - - /* - * End of the stream, if there is some pending data in output buffers then - * we must forward it to next streamer. 
- */ - astreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - ASTREAMER_UNKNOWN); - - astreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -astreamer_gzip_decompressor_free(astreamer *streamer) -{ - astreamer_free(streamer->bbs_next); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} - -/* - * Wrapper function to adjust the signature of palloc to match what libz - * expects. - */ -static void * -gzip_palloc(void *opaque, unsigned items, unsigned size) -{ - return palloc(items * size); -} - -/* - * Wrapper function to adjust the signature of pfree to match what libz - * expects. - */ -static void -gzip_pfree(void *opaque, void *address) -{ - pfree(address); -} -#endif diff --git a/src/bin/pg_basebackup/astreamer_inject.h b/src/bin/pg_basebackup/astreamer_inject.h index 8504b3f5e0d..aeed533862b 100644 --- a/src/bin/pg_basebackup/astreamer_inject.h +++ b/src/bin/pg_basebackup/astreamer_inject.h @@ -12,7 +12,7 @@ #ifndef ASTREAMER_INJECT_H #define ASTREAMER_INJECT_H -#include "astreamer.h" +#include "fe_utils/astreamer.h" #include "pqexpbuffer.h" extern astreamer *astreamer_recovery_injector_new(astreamer *next, diff --git a/src/bin/pg_basebackup/astreamer_lz4.c b/src/bin/pg_basebackup/astreamer_lz4.c deleted file mode 100644 index 1c40d7d8ad5..00000000000 --- a/src/bin/pg_basebackup/astreamer_lz4.c +++ /dev/null @@ -1,422 +0,0 @@ -/*------------------------------------------------------------------------- - * - * astreamer_lz4.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/astreamer_lz4.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include <unistd.h> - -#ifdef USE_LZ4 -#include <lz4frame.h> -#endif - -#include "astreamer.h" -#include "common/file_perm.h" -#include "common/logging.h" -#include "common/string.h" - -#ifdef 
USE_LZ4 -typedef struct astreamer_lz4_frame -{ - astreamer base; - - LZ4F_compressionContext_t cctx; - LZ4F_decompressionContext_t dctx; - LZ4F_preferences_t prefs; - - size_t bytes_written; - bool header_written; -} astreamer_lz4_frame; - -static void astreamer_lz4_compressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_lz4_compressor_finalize(astreamer *streamer); -static void astreamer_lz4_compressor_free(astreamer *streamer); - -static const astreamer_ops astreamer_lz4_compressor_ops = { - .content = astreamer_lz4_compressor_content, - .finalize = astreamer_lz4_compressor_finalize, - .free = astreamer_lz4_compressor_free -}; - -static void astreamer_lz4_decompressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_lz4_decompressor_finalize(astreamer *streamer); -static void astreamer_lz4_decompressor_free(astreamer *streamer); - -static const astreamer_ops astreamer_lz4_decompressor_ops = { - .content = astreamer_lz4_decompressor_content, - .finalize = astreamer_lz4_decompressor_finalize, - .free = astreamer_lz4_decompressor_free -}; -#endif - -/* - * Create a new base backup streamer that performs lz4 compression of tar - * blocks. 
- */ -astreamer * -astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress) -{ -#ifdef USE_LZ4 - astreamer_lz4_frame *streamer; - LZ4F_errorCode_t ctxError; - LZ4F_preferences_t *prefs; - - Assert(next != NULL); - - streamer = palloc0(sizeof(astreamer_lz4_frame)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_lz4_compressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - streamer->header_written = false; - - /* Initialize stream compression preferences */ - prefs = &streamer->prefs; - memset(prefs, 0, sizeof(LZ4F_preferences_t)); - prefs->frameInfo.blockSizeID = LZ4F_max256KB; - prefs->compressionLevel = compress->level; - - ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION); - if (LZ4F_isError(ctxError)) - pg_log_error("could not create lz4 compression context: %s", - LZ4F_getErrorName(ctxError)); - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "LZ4"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef USE_LZ4 -/* - * Compress the input data to output buffer. - * - * Find out the compression bound based on input data length for each - * invocation to make sure that output buffer has enough capacity to - * accommodate the compressed data. In case if the output buffer - * capacity falls short of compression bound then forward the content - * of output buffer to next streamer and empty the buffer. - */ -static void -astreamer_lz4_compressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - astreamer_lz4_frame *mystreamer; - uint8 *next_in, - *next_out; - size_t out_bound, - compressed_size, - avail_out; - - mystreamer = (astreamer_lz4_frame *) streamer; - next_in = (uint8 *) data; - - /* Write header before processing the first input chunk. 
*/ - if (!mystreamer->header_written) - { - compressed_size = LZ4F_compressBegin(mystreamer->cctx, - (uint8 *) mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - &mystreamer->prefs); - - if (LZ4F_isError(compressed_size)) - pg_log_error("could not write lz4 header: %s", - LZ4F_getErrorName(compressed_size)); - - mystreamer->bytes_written += compressed_size; - mystreamer->header_written = true; - } - - /* - * Update the offset and capacity of output buffer based on number of - * bytes written to output buffer. - */ - next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; - avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; - - /* - * Find out the compression bound and make sure that output buffer has the - * required capacity for the success of LZ4F_compressUpdate. If needed - * forward the content to next streamer and empty the buffer. - */ - out_bound = LZ4F_compressBound(len, &mystreamer->prefs); - if (avail_out < out_bound) - { - astreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - context); - - /* Enlarge buffer if it falls short of out bound. */ - if (mystreamer->base.bbs_buffer.maxlen < out_bound) - enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound); - - avail_out = mystreamer->base.bbs_buffer.maxlen; - mystreamer->bytes_written = 0; - next_out = (uint8 *) mystreamer->base.bbs_buffer.data; - } - - /* - * This call compresses the data starting at next_in and generates the - * output starting at next_out. It expects the caller to provide the size - * of input buffer and capacity of output buffer by providing parameters - * len and avail_out. - * - * It returns the number of bytes compressed to output buffer. 
- */ - compressed_size = LZ4F_compressUpdate(mystreamer->cctx, - next_out, avail_out, - next_in, len, NULL); - - if (LZ4F_isError(compressed_size)) - pg_log_error("could not compress data: %s", - LZ4F_getErrorName(compressed_size)); - - mystreamer->bytes_written += compressed_size; -} - -/* - * End-of-stream processing. - */ -static void -astreamer_lz4_compressor_finalize(astreamer *streamer) -{ - astreamer_lz4_frame *mystreamer; - uint8 *next_out; - size_t footer_bound, - compressed_size, - avail_out; - - mystreamer = (astreamer_lz4_frame *) streamer; - - /* Find out the footer bound and update the output buffer. */ - footer_bound = LZ4F_compressBound(0, &mystreamer->prefs); - if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) < - footer_bound) - { - astreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - ASTREAMER_UNKNOWN); - - /* Enlarge buffer if it falls short of footer bound. */ - if (mystreamer->base.bbs_buffer.maxlen < footer_bound) - enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound); - - avail_out = mystreamer->base.bbs_buffer.maxlen; - mystreamer->bytes_written = 0; - next_out = (uint8 *) mystreamer->base.bbs_buffer.data; - } - else - { - next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; - avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; - } - - /* - * Finalize the frame and flush whatever data remaining in compression - * context. 
- */ - compressed_size = LZ4F_compressEnd(mystreamer->cctx, - next_out, avail_out, NULL); - - if (LZ4F_isError(compressed_size)) - pg_log_error("could not end lz4 compression: %s", - LZ4F_getErrorName(compressed_size)); - - mystreamer->bytes_written += compressed_size; - - astreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - ASTREAMER_UNKNOWN); - - astreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -astreamer_lz4_compressor_free(astreamer *streamer) -{ - astreamer_lz4_frame *mystreamer; - - mystreamer = (astreamer_lz4_frame *) streamer; - astreamer_free(streamer->bbs_next); - LZ4F_freeCompressionContext(mystreamer->cctx); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} -#endif - -/* - * Create a new base backup streamer that performs decompression of lz4 - * compressed blocks. - */ -astreamer * -astreamer_lz4_decompressor_new(astreamer *next) -{ -#ifdef USE_LZ4 - astreamer_lz4_frame *streamer; - LZ4F_errorCode_t ctxError; - - Assert(next != NULL); - - streamer = palloc0(sizeof(astreamer_lz4_frame)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_lz4_decompressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - - /* Initialize internal stream state for decompression */ - ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION); - if (LZ4F_isError(ctxError)) - pg_fatal("could not initialize compression library: %s", - LZ4F_getErrorName(ctxError)); - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "LZ4"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef USE_LZ4 -/* - * Decompress the input data to output buffer until we run out of input - * data. Each time the output buffer is full, pass on the decompressed data - * to the next streamer. 
- */ -static void -astreamer_lz4_decompressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - astreamer_lz4_frame *mystreamer; - uint8 *next_in, - *next_out; - size_t avail_in, - avail_out; - - mystreamer = (astreamer_lz4_frame *) streamer; - next_in = (uint8 *) data; - next_out = (uint8 *) mystreamer->base.bbs_buffer.data; - avail_in = len; - avail_out = mystreamer->base.bbs_buffer.maxlen; - - while (avail_in > 0) - { - size_t ret, - read_size, - out_size; - - read_size = avail_in; - out_size = avail_out; - - /* - * This call decompresses the data starting at next_in and generates - * the output data starting at next_out. It expects the caller to - * provide size of the input buffer and total capacity of the output - * buffer by providing the read_size and out_size parameters - * respectively. - * - * Per the documentation of LZ4, parameters read_size and out_size - * behaves as dual parameters. On return, the number of bytes consumed - * from the input buffer will be written back to read_size and the - * number of bytes decompressed to output buffer will be written back - * to out_size respectively. - */ - ret = LZ4F_decompress(mystreamer->dctx, - next_out, &out_size, - next_in, &read_size, NULL); - - if (LZ4F_isError(ret)) - pg_log_error("could not decompress data: %s", - LZ4F_getErrorName(ret)); - - /* Update input buffer based on number of bytes consumed */ - avail_in -= read_size; - next_in += read_size; - - mystreamer->bytes_written += out_size; - - /* - * If output buffer is full then forward the content to next streamer - * and update the output buffer. 
- */ - if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) - { - astreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - context); - - avail_out = mystreamer->base.bbs_buffer.maxlen; - mystreamer->bytes_written = 0; - next_out = (uint8 *) mystreamer->base.bbs_buffer.data; - } - else - { - avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; - next_out += mystreamer->bytes_written; - } - } -} - -/* - * End-of-stream processing. - */ -static void -astreamer_lz4_decompressor_finalize(astreamer *streamer) -{ - astreamer_lz4_frame *mystreamer; - - mystreamer = (astreamer_lz4_frame *) streamer; - - /* - * End of the stream, if there is some pending data in output buffers then - * we must forward it to next streamer. - */ - astreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - ASTREAMER_UNKNOWN); - - astreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -astreamer_lz4_decompressor_free(astreamer *streamer) -{ - astreamer_lz4_frame *mystreamer; - - mystreamer = (astreamer_lz4_frame *) streamer; - astreamer_free(streamer->bbs_next); - LZ4F_freeDecompressionContext(mystreamer->dctx); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} -#endif diff --git a/src/bin/pg_basebackup/astreamer_tar.c b/src/bin/pg_basebackup/astreamer_tar.c deleted file mode 100644 index 673690cd18f..00000000000 --- a/src/bin/pg_basebackup/astreamer_tar.c +++ /dev/null @@ -1,514 +0,0 @@ -/*------------------------------------------------------------------------- - * - * astreamer_tar.c - * - * This module implements three types of tar processing. A tar parser - * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits - * it into labelled chunks (any other value of astreamer_archive_context). 
- * A tar archiver does the reverse: it takes a bunch of labelled chunks - * and produces a tarfile, optionally replacing member headers and trailers - * so that upstream astreamer objects can perform surgery on the tarfile - * contents without knowing the details of the tar format. A tar terminator - * just adds two blocks of NUL bytes to the end of the file, since older - * server versions produce files with this terminator omitted. - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/astreamer_tar.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include - -#include "astreamer.h" -#include "common/logging.h" -#include "pgtar.h" - -typedef struct astreamer_tar_parser -{ - astreamer base; - astreamer_archive_context next_context; - astreamer_member member; - size_t file_bytes_sent; - size_t pad_bytes_expected; -} astreamer_tar_parser; - -typedef struct astreamer_tar_archiver -{ - astreamer base; - bool rearchive_member; -} astreamer_tar_archiver; - -static void astreamer_tar_parser_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_tar_parser_finalize(astreamer *streamer); -static void astreamer_tar_parser_free(astreamer *streamer); -static bool astreamer_tar_header(astreamer_tar_parser *mystreamer); - -static const astreamer_ops astreamer_tar_parser_ops = { - .content = astreamer_tar_parser_content, - .finalize = astreamer_tar_parser_finalize, - .free = astreamer_tar_parser_free -}; - -static void astreamer_tar_archiver_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_tar_archiver_finalize(astreamer *streamer); -static void astreamer_tar_archiver_free(astreamer *streamer); - -static const astreamer_ops astreamer_tar_archiver_ops = { - 
.content = astreamer_tar_archiver_content, - .finalize = astreamer_tar_archiver_finalize, - .free = astreamer_tar_archiver_free -}; - -static void astreamer_tar_terminator_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_tar_terminator_finalize(astreamer *streamer); -static void astreamer_tar_terminator_free(astreamer *streamer); - -static const astreamer_ops astreamer_tar_terminator_ops = { - .content = astreamer_tar_terminator_content, - .finalize = astreamer_tar_terminator_finalize, - .free = astreamer_tar_terminator_free -}; - -/* - * Create a astreamer that can parse a stream of content as tar data. - * - * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer - * specified by 'next' will receive a series of typed chunks, as per the - * conventions described in astreamer.h. - */ -astreamer * -astreamer_tar_parser_new(astreamer *next) -{ - astreamer_tar_parser *streamer; - - streamer = palloc0(sizeof(astreamer_tar_parser)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_tar_parser_ops; - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - streamer->next_context = ASTREAMER_MEMBER_HEADER; - - return &streamer->base; -} - -/* - * Parse unknown content as tar data. - */ -static void -astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; - size_t nbytes; - - /* Expect unparsed input. */ - Assert(member == NULL); - Assert(context == ASTREAMER_UNKNOWN); - - while (len > 0) - { - switch (mystreamer->next_context) - { - case ASTREAMER_MEMBER_HEADER: - - /* - * If we're expecting an archive member header, accumulate a - * full block of data before doing anything further. 
- */ - if (!astreamer_buffer_until(streamer, &data, &len, - TAR_BLOCK_SIZE)) - return; - - /* - * Now we can process the header and get ready to process the - * file contents; however, we might find out that what we - * thought was the next file header is actually the start of - * the archive trailer. Switch modes accordingly. - */ - if (astreamer_tar_header(mystreamer)) - { - if (mystreamer->member.size == 0) - { - /* No content; trailer is zero-length. */ - astreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - NULL, 0, - ASTREAMER_MEMBER_TRAILER); - - /* Expect next header. */ - mystreamer->next_context = ASTREAMER_MEMBER_HEADER; - } - else - { - /* Expect contents. */ - mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS; - } - mystreamer->base.bbs_buffer.len = 0; - mystreamer->file_bytes_sent = 0; - } - else - mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER; - break; - - case ASTREAMER_MEMBER_CONTENTS: - - /* - * Send as much content as we have, but not more than the - * remaining file length. - */ - Assert(mystreamer->file_bytes_sent < mystreamer->member.size); - nbytes = mystreamer->member.size - mystreamer->file_bytes_sent; - nbytes = Min(nbytes, len); - Assert(nbytes > 0); - astreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - data, nbytes, - ASTREAMER_MEMBER_CONTENTS); - mystreamer->file_bytes_sent += nbytes; - data += nbytes; - len -= nbytes; - - /* - * If we've not yet sent the whole file, then there's more - * content to come; otherwise, it's time to expect the file - * trailer. - */ - Assert(mystreamer->file_bytes_sent <= mystreamer->member.size); - if (mystreamer->file_bytes_sent == mystreamer->member.size) - { - if (mystreamer->pad_bytes_expected == 0) - { - /* Trailer is zero-length. */ - astreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - NULL, 0, - ASTREAMER_MEMBER_TRAILER); - - /* Expect next header. 
*/ - mystreamer->next_context = ASTREAMER_MEMBER_HEADER; - } - else - { - /* Trailer is not zero-length. */ - mystreamer->next_context = ASTREAMER_MEMBER_TRAILER; - } - mystreamer->base.bbs_buffer.len = 0; - } - break; - - case ASTREAMER_MEMBER_TRAILER: - - /* - * If we're expecting an archive member trailer, accumulate - * the expected number of padding bytes before sending - * anything onward. - */ - if (!astreamer_buffer_until(streamer, &data, &len, - mystreamer->pad_bytes_expected)) - return; - - /* OK, now we can send it. */ - astreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - data, mystreamer->pad_bytes_expected, - ASTREAMER_MEMBER_TRAILER); - - /* Expect next file header. */ - mystreamer->next_context = ASTREAMER_MEMBER_HEADER; - mystreamer->base.bbs_buffer.len = 0; - break; - - case ASTREAMER_ARCHIVE_TRAILER: - - /* - * We've seen an end-of-archive indicator, so anything more is - * buffered and sent as part of the archive trailer. But we - * don't expect more than 2 blocks. - */ - astreamer_buffer_bytes(streamer, &data, &len, len); - if (len > 2 * TAR_BLOCK_SIZE) - pg_fatal("tar file trailer exceeds 2 blocks"); - return; - - default: - /* Shouldn't happen. */ - pg_fatal("unexpected state while parsing tar archive"); - } - } -} - -/* - * Parse a file header within a tar stream. - * - * The return value is true if we found a file header and passed it on to the - * next astreamer; it is false if we have reached the archive trailer. - */ -static bool -astreamer_tar_header(astreamer_tar_parser *mystreamer) -{ - bool has_nonzero_byte = false; - int i; - astreamer_member *member = &mystreamer->member; - char *buffer = mystreamer->base.bbs_buffer.data; - - Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE); - - /* Check whether we've got a block of all zero bytes. 
*/ - for (i = 0; i < TAR_BLOCK_SIZE; ++i) - { - if (buffer[i] != '\0') - { - has_nonzero_byte = true; - break; - } - } - - /* - * If the entire block was zeros, this is the end of the archive, not the - * start of the next file. - */ - if (!has_nonzero_byte) - return false; - - /* - * Parse key fields out of the header. - */ - strlcpy(member->pathname, &buffer[TAR_OFFSET_NAME], MAXPGPATH); - if (member->pathname[0] == '\0') - pg_fatal("tar member has empty name"); - member->size = read_tar_number(&buffer[TAR_OFFSET_SIZE], 12); - member->mode = read_tar_number(&buffer[TAR_OFFSET_MODE], 8); - member->uid = read_tar_number(&buffer[TAR_OFFSET_UID], 8); - member->gid = read_tar_number(&buffer[TAR_OFFSET_GID], 8); - member->is_directory = - (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_DIRECTORY); - member->is_link = - (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_SYMLINK); - if (member->is_link) - strlcpy(member->linktarget, &buffer[TAR_OFFSET_LINKNAME], 100); - - /* Compute number of padding bytes. */ - mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size); - - /* Forward the entire header to the next astreamer. */ - astreamer_content(mystreamer->base.bbs_next, member, - buffer, TAR_BLOCK_SIZE, - ASTREAMER_MEMBER_HEADER); - - return true; -} - -/* - * End-of-stream processing for a tar parser. - */ -static void -astreamer_tar_parser_finalize(astreamer *streamer) -{ - astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; - - if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER && - (mystreamer->next_context != ASTREAMER_MEMBER_HEADER || - mystreamer->base.bbs_buffer.len > 0)) - pg_fatal("COPY stream ended before last file was finished"); - - /* Send the archive trailer, even if empty. */ - astreamer_content(streamer->bbs_next, NULL, - streamer->bbs_buffer.data, streamer->bbs_buffer.len, - ASTREAMER_ARCHIVE_TRAILER); - - /* Now finalize successor. 
*/ - astreamer_finalize(streamer->bbs_next); -} - -/* - * Free memory associated with a tar parser. - */ -static void -astreamer_tar_parser_free(astreamer *streamer) -{ - pfree(streamer->bbs_buffer.data); - astreamer_free(streamer->bbs_next); -} - -/* - * Create a astreamer that can generate a tar archive. - * - * This is intended to be usable either for generating a brand-new tar archive - * or for modifying one on the fly. The input should be a series of typed - * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for - * astreamer_tar_parser_content. - */ -astreamer * -astreamer_tar_archiver_new(astreamer *next) -{ - astreamer_tar_archiver *streamer; - - streamer = palloc0(sizeof(astreamer_tar_archiver)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_tar_archiver_ops; - streamer->base.bbs_next = next; - - return &streamer->base; -} - -/* - * Fix up the stream of input chunks to create a valid tar file. - * - * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a - * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is - * passed through without change. Any other size is a fatal error (and - * indicates a bug). - * - * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the - * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from - * scratch. Specifically, we construct a block of zero bytes sufficient to - * pad out to a block boundary, as required by the tar format. Other - * ASTREAMER_MEMBER_TRAILER chunks are passed through without change. - * - * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change. - * - * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two - * blocks of zero bytes. Not all tar programs require this, but apparently - * some do. The server does not supply this trailer. If no archive trailer is - * present, one will be added by astreamer_tar_parser_finalize. 
- */ -static void -astreamer_tar_archiver_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer; - char buffer[2 * TAR_BLOCK_SIZE]; - - Assert(context != ASTREAMER_UNKNOWN); - - if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) - { - Assert(len == 0); - - /* Replace zero-length tar header with a newly constructed one. */ - tarCreateHeader(buffer, member->pathname, NULL, - member->size, member->mode, member->uid, member->gid, - time(NULL)); - data = buffer; - len = TAR_BLOCK_SIZE; - - /* Also make a note to replace padding, in case size changed. */ - mystreamer->rearchive_member = true; - } - else if (context == ASTREAMER_MEMBER_TRAILER && - mystreamer->rearchive_member) - { - int pad_bytes = tarPaddingBytesRequired(member->size); - - /* Also replace padding, if we regenerated the header. */ - memset(buffer, 0, pad_bytes); - data = buffer; - len = pad_bytes; - - /* Don't do this again unless we replace another header. */ - mystreamer->rearchive_member = false; - } - else if (context == ASTREAMER_ARCHIVE_TRAILER) - { - /* Trailer should always be two blocks of zero bytes. */ - memset(buffer, 0, 2 * TAR_BLOCK_SIZE); - data = buffer; - len = 2 * TAR_BLOCK_SIZE; - } - - astreamer_content(streamer->bbs_next, member, data, len, context); -} - -/* - * End-of-stream processing for a tar archiver. - */ -static void -astreamer_tar_archiver_finalize(astreamer *streamer) -{ - astreamer_finalize(streamer->bbs_next); -} - -/* - * Free memory associated with a tar archiver. - */ -static void -astreamer_tar_archiver_free(astreamer *streamer) -{ - astreamer_free(streamer->bbs_next); - pfree(streamer); -} - -/* - * Create a astreamer that blindly adds two blocks of NUL bytes to the - * end of an incomplete tarfile that the server might send us. 
- */ -astreamer * -astreamer_tar_terminator_new(astreamer *next) -{ - astreamer *streamer; - - streamer = palloc0(sizeof(astreamer)); - *((const astreamer_ops **) &streamer->bbs_ops) = - &astreamer_tar_terminator_ops; - streamer->bbs_next = next; - - return streamer; -} - -/* - * Pass all the content through without change. - */ -static void -astreamer_tar_terminator_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - /* Expect unparsed input. */ - Assert(member == NULL); - Assert(context == ASTREAMER_UNKNOWN); - - /* Just forward it. */ - astreamer_content(streamer->bbs_next, member, data, len, context); -} - -/* - * At the end, blindly add the two blocks of NUL bytes which the server fails - * to supply. - */ -static void -astreamer_tar_terminator_finalize(astreamer *streamer) -{ - char buffer[2 * TAR_BLOCK_SIZE]; - - memset(buffer, 0, 2 * TAR_BLOCK_SIZE); - astreamer_content(streamer->bbs_next, NULL, buffer, - 2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN); - astreamer_finalize(streamer->bbs_next); -} - -/* - * Free memory associated with a tar terminator. 
- */ -static void -astreamer_tar_terminator_free(astreamer *streamer) -{ - astreamer_free(streamer->bbs_next); - pfree(streamer); -} diff --git a/src/bin/pg_basebackup/astreamer_zstd.c b/src/bin/pg_basebackup/astreamer_zstd.c deleted file mode 100644 index 58dc679ef99..00000000000 --- a/src/bin/pg_basebackup/astreamer_zstd.c +++ /dev/null @@ -1,368 +0,0 @@ -/*------------------------------------------------------------------------- - * - * astreamer_zstd.c - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/astreamer_zstd.c - *------------------------------------------------------------------------- - */ - -#include "postgres_fe.h" - -#include - -#ifdef USE_ZSTD -#include -#endif - -#include "astreamer.h" -#include "common/logging.h" - -#ifdef USE_ZSTD - -typedef struct astreamer_zstd_frame -{ - astreamer base; - - ZSTD_CCtx *cctx; - ZSTD_DCtx *dctx; - ZSTD_outBuffer zstd_outBuf; -} astreamer_zstd_frame; - -static void astreamer_zstd_compressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_zstd_compressor_finalize(astreamer *streamer); -static void astreamer_zstd_compressor_free(astreamer *streamer); - -static const astreamer_ops astreamer_zstd_compressor_ops = { - .content = astreamer_zstd_compressor_content, - .finalize = astreamer_zstd_compressor_finalize, - .free = astreamer_zstd_compressor_free -}; - -static void astreamer_zstd_decompressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context); -static void astreamer_zstd_decompressor_finalize(astreamer *streamer); -static void astreamer_zstd_decompressor_free(astreamer *streamer); - -static const astreamer_ops astreamer_zstd_decompressor_ops = { - .content = astreamer_zstd_decompressor_content, - .finalize = astreamer_zstd_decompressor_finalize, - .free = 
astreamer_zstd_decompressor_free -}; -#endif - -/* - * Create a new base backup streamer that performs zstd compression of tar - * blocks. - */ -astreamer * -astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress) -{ -#ifdef USE_ZSTD - astreamer_zstd_frame *streamer; - size_t ret; - - Assert(next != NULL); - - streamer = palloc0(sizeof(astreamer_zstd_frame)); - - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_zstd_compressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); - - streamer->cctx = ZSTD_createCCtx(); - if (!streamer->cctx) - pg_fatal("could not create zstd compression context"); - - /* Set compression level */ - ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, - compress->level); - if (ZSTD_isError(ret)) - pg_fatal("could not set zstd compression level to %d: %s", - compress->level, ZSTD_getErrorName(ret)); - - /* Set # of workers, if specified */ - if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0) - { - /* - * On older versions of libzstd, this option does not exist, and - * trying to set it will fail. Similarly for newer versions if they - * are compiled without threading support. - */ - ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers, - compress->workers); - if (ZSTD_isError(ret)) - pg_fatal("could not set compression worker count to %d: %s", - compress->workers, ZSTD_getErrorName(ret)); - } - - if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0) - { - ret = ZSTD_CCtx_setParameter(streamer->cctx, - ZSTD_c_enableLongDistanceMatching, - compress->long_distance); - if (ZSTD_isError(ret)) - { - pg_log_error("could not enable long-distance mode: %s", - ZSTD_getErrorName(ret)); - exit(1); - } - } - - /* Initialize the ZSTD output buffer. 
*/ - streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; - streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; - streamer->zstd_outBuf.pos = 0; - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "ZSTD"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef USE_ZSTD -/* - * Compress the input data to output buffer. - * - * Find out the compression bound based on input data length for each - * invocation to make sure that output buffer has enough capacity to - * accommodate the compressed data. In case if the output buffer - * capacity falls short of compression bound then forward the content - * of output buffer to next streamer and empty the buffer. - */ -static void -astreamer_zstd_compressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; - ZSTD_inBuffer inBuf = {data, len, 0}; - - while (inBuf.pos < inBuf.size) - { - size_t yet_to_flush; - size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); - - /* - * If the output buffer is not left with enough space, send the - * compressed bytes to the next streamer, and empty the buffer. - */ - if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < - max_needed) - { - astreamer_content(mystreamer->base.bbs_next, member, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - context); - - /* Reset the ZSTD output buffer. */ - mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; - mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; - mystreamer->zstd_outBuf.pos = 0; - } - - yet_to_flush = - ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf, - &inBuf, ZSTD_e_continue); - - if (ZSTD_isError(yet_to_flush)) - pg_log_error("could not compress data: %s", - ZSTD_getErrorName(yet_to_flush)); - } -} - -/* - * End-of-stream processing. 
- */ -static void -astreamer_zstd_compressor_finalize(astreamer *streamer) -{ - astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; - size_t yet_to_flush; - - do - { - ZSTD_inBuffer in = {NULL, 0, 0}; - size_t max_needed = ZSTD_compressBound(0); - - /* - * If the output buffer is not left with enough space, send the - * compressed bytes to the next streamer, and empty the buffer. - */ - if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < - max_needed) - { - astreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - ASTREAMER_UNKNOWN); - - /* Reset the ZSTD output buffer. */ - mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; - mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; - mystreamer->zstd_outBuf.pos = 0; - } - - yet_to_flush = ZSTD_compressStream2(mystreamer->cctx, - &mystreamer->zstd_outBuf, - &in, ZSTD_e_end); - - if (ZSTD_isError(yet_to_flush)) - pg_log_error("could not compress data: %s", - ZSTD_getErrorName(yet_to_flush)); - - } while (yet_to_flush > 0); - - /* Make sure to pass any remaining bytes to the next streamer. */ - if (mystreamer->zstd_outBuf.pos > 0) - astreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - ASTREAMER_UNKNOWN); - - astreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -astreamer_zstd_compressor_free(astreamer *streamer) -{ - astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; - - astreamer_free(streamer->bbs_next); - ZSTD_freeCCtx(mystreamer->cctx); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} -#endif - -/* - * Create a new base backup streamer that performs decompression of zstd - * compressed blocks. 
- */ -astreamer * -astreamer_zstd_decompressor_new(astreamer *next) -{ -#ifdef USE_ZSTD - astreamer_zstd_frame *streamer; - - Assert(next != NULL); - - streamer = palloc0(sizeof(astreamer_zstd_frame)); - *((const astreamer_ops **) &streamer->base.bbs_ops) = - &astreamer_zstd_decompressor_ops; - - streamer->base.bbs_next = next; - initStringInfo(&streamer->base.bbs_buffer); - enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); - - streamer->dctx = ZSTD_createDCtx(); - if (!streamer->dctx) - pg_fatal("could not create zstd decompression context"); - - /* Initialize the ZSTD output buffer. */ - streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; - streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; - streamer->zstd_outBuf.pos = 0; - - return &streamer->base; -#else - pg_fatal("this build does not support compression with %s", "ZSTD"); - return NULL; /* keep compiler quiet */ -#endif -} - -#ifdef USE_ZSTD -/* - * Decompress the input data to output buffer until we run out of input - * data. Each time the output buffer is full, pass on the decompressed data - * to the next streamer. - */ -static void -astreamer_zstd_decompressor_content(astreamer *streamer, - astreamer_member *member, - const char *data, int len, - astreamer_archive_context context) -{ - astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; - ZSTD_inBuffer inBuf = {data, len, 0}; - - while (inBuf.pos < inBuf.size) - { - size_t ret; - - /* - * If output buffer is full then forward the content to next streamer - * and update the output buffer. - */ - if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size) - { - astreamer_content(mystreamer->base.bbs_next, member, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - context); - - /* Reset the ZSTD output buffer. 
*/ - mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; - mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; - mystreamer->zstd_outBuf.pos = 0; - } - - ret = ZSTD_decompressStream(mystreamer->dctx, - &mystreamer->zstd_outBuf, &inBuf); - - if (ZSTD_isError(ret)) - pg_log_error("could not decompress data: %s", - ZSTD_getErrorName(ret)); - } -} - -/* - * End-of-stream processing. - */ -static void -astreamer_zstd_decompressor_finalize(astreamer *streamer) -{ - astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; - - /* - * End of the stream, if there is some pending data in output buffers then - * we must forward it to next streamer. - */ - if (mystreamer->zstd_outBuf.pos > 0) - astreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - ASTREAMER_UNKNOWN); - - astreamer_finalize(mystreamer->base.bbs_next); -} - -/* - * Free memory. - */ -static void -astreamer_zstd_decompressor_free(astreamer *streamer) -{ - astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; - - astreamer_free(streamer->bbs_next); - ZSTD_freeDCtx(mystreamer->dctx); - pfree(streamer->bbs_buffer.data); - pfree(streamer); -} -#endif diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build index a68dbd7837d..9101fc18438 100644 --- a/src/bin/pg_basebackup/meson.build +++ b/src/bin/pg_basebackup/meson.build @@ -1,12 +1,7 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group common_sources = files( - 'astreamer_file.c', - 'astreamer_gzip.c', 'astreamer_inject.c', - 'astreamer_lz4.c', - 'astreamer_tar.c', - 'astreamer_zstd.c', 'receivelog.c', 'streamutil.c', 'walmethods.c', diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile index 946c05258f0..2694be4b859 100644 --- a/src/fe_utils/Makefile +++ b/src/fe_utils/Makefile @@ -21,6 +21,11 @@ override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS) OBJS = \ archive.o \ + 
astreamer_file.o \ + astreamer_gzip.o \ + astreamer_lz4.o \ + astreamer_tar.o \ + astreamer_zstd.o \ cancel.o \ conditional.o \ connect_utils.o \ diff --git a/src/fe_utils/astreamer_file.c b/src/fe_utils/astreamer_file.c new file mode 100644 index 00000000000..13d1192c6e6 --- /dev/null +++ b/src/fe_utils/astreamer_file.c @@ -0,0 +1,396 @@ +/*------------------------------------------------------------------------- + * + * astreamer_file.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_file.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" +#include "fe_utils/astreamer.h" + +typedef struct astreamer_plain_writer +{ + astreamer base; + char *pathname; + FILE *file; + bool should_close_file; +} astreamer_plain_writer; + +typedef struct astreamer_extractor +{ + astreamer base; + char *basepath; + const char *(*link_map) (const char *); + void (*report_output_file) (const char *); + char filename[MAXPGPATH]; + FILE *file; +} astreamer_extractor; + +static void astreamer_plain_writer_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_plain_writer_finalize(astreamer *streamer); +static void astreamer_plain_writer_free(astreamer *streamer); + +static const astreamer_ops astreamer_plain_writer_ops = { + .content = astreamer_plain_writer_content, + .finalize = astreamer_plain_writer_finalize, + .free = astreamer_plain_writer_free +}; + +static void astreamer_extractor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_extractor_finalize(astreamer *streamer); +static void astreamer_extractor_free(astreamer *streamer); +static void 
extract_directory(const char *filename, mode_t mode); +static void extract_link(const char *filename, const char *linktarget); +static FILE *create_file_for_extract(const char *filename, mode_t mode); + +static const astreamer_ops astreamer_extractor_ops = { + .content = astreamer_extractor_content, + .finalize = astreamer_extractor_finalize, + .free = astreamer_extractor_free +}; + +/* + * Create a astreamer that just writes data to a file. + * + * The caller must specify a pathname and may specify a file. The pathname is + * used for error-reporting purposes either way. If file is NULL, the pathname + * also identifies the file to which the data should be written: it is opened + * for writing and closed when done. If file is not NULL, the data is written + * there. + */ +astreamer * +astreamer_plain_writer_new(char *pathname, FILE *file) +{ + astreamer_plain_writer *streamer; + + streamer = palloc0(sizeof(astreamer_plain_writer)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_plain_writer_ops; + + streamer->pathname = pstrdup(pathname); + streamer->file = file; + + if (file == NULL) + { + streamer->file = fopen(pathname, "wb"); + if (streamer->file == NULL) + pg_fatal("could not create file \"%s\": %m", pathname); + streamer->should_close_file = true; + } + + return &streamer->base; +} + +/* + * Write archive content to file. 
+ */ +static void +astreamer_plain_writer_content(astreamer *streamer, + astreamer_member *member, const char *data, + int len, astreamer_archive_context context) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + if (len == 0) + return; + + errno = 0; + if (fwrite(data, len, 1, mystreamer->file) != 1) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to file \"%s\": %m", + mystreamer->pathname); + } +} + +/* + * End-of-archive processing when writing to a plain file consists of closing + * the file if we opened it, but not if the caller provided it. + */ +static void +astreamer_plain_writer_finalize(astreamer *streamer) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + if (mystreamer->should_close_file && fclose(mystreamer->file) != 0) + pg_fatal("could not close file \"%s\": %m", + mystreamer->pathname); + + mystreamer->file = NULL; + mystreamer->should_close_file = false; +} + +/* + * Free memory associated with this astreamer. + */ +static void +astreamer_plain_writer_free(astreamer *streamer) +{ + astreamer_plain_writer *mystreamer; + + mystreamer = (astreamer_plain_writer *) streamer; + + Assert(!mystreamer->should_close_file); + Assert(mystreamer->base.bbs_next == NULL); + + pfree(mystreamer->pathname); + pfree(mystreamer); +} + +/* + * Create a astreamer that extracts an archive. + * + * All pathnames in the archive are interpreted relative to basepath. + * + * Unlike e.g. astreamer_plain_writer_new() we can't do anything useful here + * with untyped chunks; we need typed chunks which follow the rules described + * in astreamer.h. Assuming we have that, we don't need to worry about the + * original archive format; it's enough to just look at the member information + * provided and write to the corresponding file. 
+ * + * 'link_map' is a function that will be applied to the target of any + * symbolic link, and which should return a replacement pathname to be used + * in its place. If NULL, the symbolic link target is used without + * modification. + * + * 'report_output_file' is a function that will be called each time we open a + * new output file. The pathname to that file is passed as an argument. If + * NULL, the call is skipped. + */ +astreamer * +astreamer_extractor_new(const char *basepath, + const char *(*link_map) (const char *), + void (*report_output_file) (const char *)) +{ + astreamer_extractor *streamer; + + streamer = palloc0(sizeof(astreamer_extractor)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_extractor_ops; + streamer->basepath = pstrdup(basepath); + streamer->link_map = link_map; + streamer->report_output_file = report_output_file; + + return &streamer->base; +} + +/* + * Extract archive contents to the filesystem. + */ +static void +astreamer_extractor_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; + int fnamelen; + + Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER); + Assert(context != ASTREAMER_UNKNOWN); + + switch (context) + { + case ASTREAMER_MEMBER_HEADER: + Assert(mystreamer->file == NULL); + + /* Prepend basepath. */ + snprintf(mystreamer->filename, sizeof(mystreamer->filename), + "%s/%s", mystreamer->basepath, member->pathname); + + /* Remove any trailing slash. */ + fnamelen = strlen(mystreamer->filename); + if (mystreamer->filename[fnamelen - 1] == '/') + mystreamer->filename[fnamelen - 1] = '\0'; + + /* Dispatch based on file type. 
*/ + if (member->is_directory) + extract_directory(mystreamer->filename, member->mode); + else if (member->is_link) + { + const char *linktarget = member->linktarget; + + if (mystreamer->link_map) + linktarget = mystreamer->link_map(linktarget); + extract_link(mystreamer->filename, linktarget); + } + else + mystreamer->file = + create_file_for_extract(mystreamer->filename, + member->mode); + + /* Report output file change. */ + if (mystreamer->report_output_file) + mystreamer->report_output_file(mystreamer->filename); + break; + + case ASTREAMER_MEMBER_CONTENTS: + if (mystreamer->file == NULL) + break; + + errno = 0; + if (len > 0 && fwrite(data, len, 1, mystreamer->file) != 1) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to file \"%s\": %m", + mystreamer->filename); + } + break; + + case ASTREAMER_MEMBER_TRAILER: + if (mystreamer->file == NULL) + break; + fclose(mystreamer->file); + mystreamer->file = NULL; + break; + + case ASTREAMER_ARCHIVE_TRAILER: + break; + + default: + /* Shouldn't happen. */ + pg_fatal("unexpected state while extracting archive"); + } +} + +/* + * Should we tolerate an already-existing directory? + * + * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 clusters) will have been + * created by the wal receiver process. Also, when the WAL directory location + * was specified, pg_wal (or pg_xlog) has already been created as a symbolic + * link before starting the actual backup. So just ignore creation failures + * on related directories. + * + * If in-place tablespaces are used, pg_tblspc and subdirectories may already + * exist when we get here. So tolerate that case, too. 
+ */ +static bool +should_allow_existing_directory(const char *pathname) +{ + const char *filename = last_dir_separator(pathname) + 1; + + if (strcmp(filename, "pg_wal") == 0 || + strcmp(filename, "pg_xlog") == 0 || + strcmp(filename, "archive_status") == 0 || + strcmp(filename, "summaries") == 0 || + strcmp(filename, "pg_tblspc") == 0) + return true; + + if (strspn(filename, "0123456789") == strlen(filename)) + { + const char *pg_tblspc = strstr(pathname, "/pg_tblspc/"); + + return pg_tblspc != NULL && pg_tblspc + 11 == filename; + } + + return false; +} + +/* + * Create a directory. + */ +static void +extract_directory(const char *filename, mode_t mode) +{ + if (mkdir(filename, pg_dir_create_mode) != 0 && + (errno != EEXIST || !should_allow_existing_directory(filename))) + pg_fatal("could not create directory \"%s\": %m", + filename); + +#ifndef WIN32 + if (chmod(filename, mode)) + pg_fatal("could not set permissions on directory \"%s\": %m", + filename); +#endif +} + +/* + * Create a symbolic link. + * + * It's most likely a link in pg_tblspc directory, to the location of a + * tablespace. Apply any tablespace mapping given on the command line + * (--tablespace-mapping). (We blindly apply the mapping without checking that + * the link really is inside pg_tblspc. We don't expect there to be other + * symlinks in a data directory, but if there are, you can call it an + * undocumented feature that you can map them too.) + */ +static void +extract_link(const char *filename, const char *linktarget) +{ + if (symlink(linktarget, filename) != 0) + pg_fatal("could not create symbolic link from \"%s\" to \"%s\": %m", + filename, linktarget); +} + +/* + * Create a regular file. + * + * Return the resulting handle so we can write the content to the file. 
+ */ +static FILE * +create_file_for_extract(const char *filename, mode_t mode) +{ + FILE *file; + + file = fopen(filename, "wb"); + if (file == NULL) + pg_fatal("could not create file \"%s\": %m", filename); + +#ifndef WIN32 + if (chmod(filename, mode)) + pg_fatal("could not set permissions on file \"%s\": %m", + filename); +#endif + + return file; +} + +/* + * End-of-stream processing for extracting an archive. + * + * There's nothing to do here but sanity checking. + */ +static void +astreamer_extractor_finalize(astreamer *streamer) +{ + astreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY + = (astreamer_extractor *) streamer; + + Assert(mystreamer->file == NULL); +} + +/* + * Free memory. + */ +static void +astreamer_extractor_free(astreamer *streamer) +{ + astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; + + pfree(mystreamer->basepath); + pfree(mystreamer); +} diff --git a/src/fe_utils/astreamer_gzip.c b/src/fe_utils/astreamer_gzip.c new file mode 100644 index 00000000000..dd28defac7b --- /dev/null +++ b/src/fe_utils/astreamer_gzip.c @@ -0,0 +1,364 @@ +/*------------------------------------------------------------------------- + * + * astreamer_gzip.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_gzip.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#ifdef HAVE_LIBZ +#include +#endif + +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" +#include "fe_utils/astreamer.h" + +#ifdef HAVE_LIBZ +typedef struct astreamer_gzip_writer +{ + astreamer base; + char *pathname; + gzFile gzfile; +} astreamer_gzip_writer; + +typedef struct astreamer_gzip_decompressor +{ + astreamer base; + z_stream zstream; + size_t bytes_written; +} astreamer_gzip_decompressor; + +static void astreamer_gzip_writer_content(astreamer *streamer, + astreamer_member 
*member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_gzip_writer_finalize(astreamer *streamer); +static void astreamer_gzip_writer_free(astreamer *streamer); +static const char *get_gz_error(gzFile gzf); + +static const astreamer_ops astreamer_gzip_writer_ops = { + .content = astreamer_gzip_writer_content, + .finalize = astreamer_gzip_writer_finalize, + .free = astreamer_gzip_writer_free +}; + +static void astreamer_gzip_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_gzip_decompressor_finalize(astreamer *streamer); +static void astreamer_gzip_decompressor_free(astreamer *streamer); +static void *gzip_palloc(void *opaque, unsigned items, unsigned size); +static void gzip_pfree(void *opaque, void *address); + +static const astreamer_ops astreamer_gzip_decompressor_ops = { + .content = astreamer_gzip_decompressor_content, + .finalize = astreamer_gzip_decompressor_finalize, + .free = astreamer_gzip_decompressor_free +}; +#endif + +/* + * Create a astreamer that just compresses data using gzip, and then writes + * it to a file. + * + * As in the case of astreamer_plain_writer_new, pathname is always used + * for error reporting purposes; if file is NULL, it is also the opened and + * closed so that the data may be written there. 
+ */ +astreamer * +astreamer_gzip_writer_new(char *pathname, FILE *file, + pg_compress_specification *compress) +{ +#ifdef HAVE_LIBZ + astreamer_gzip_writer *streamer; + + streamer = palloc0(sizeof(astreamer_gzip_writer)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_gzip_writer_ops; + + streamer->pathname = pstrdup(pathname); + + if (file == NULL) + { + streamer->gzfile = gzopen(pathname, "wb"); + if (streamer->gzfile == NULL) + pg_fatal("could not create compressed file \"%s\": %m", + pathname); + } + else + { + int fd = dup(fileno(file)); + + if (fd < 0) + pg_fatal("could not duplicate stdout: %m"); + + streamer->gzfile = gzdopen(fd, "wb"); + if (streamer->gzfile == NULL) + pg_fatal("could not open output file: %m"); + } + + if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK) + pg_fatal("could not set compression level %d: %s", + compress->level, get_gz_error(streamer->gzfile)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "gzip"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef HAVE_LIBZ +/* + * Write archive content to gzip file. + */ +static void +astreamer_gzip_writer_content(astreamer *streamer, + astreamer_member *member, const char *data, + int len, astreamer_archive_context context) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + if (len == 0) + return; + + errno = 0; + if (gzwrite(mystreamer->gzfile, data, len) != len) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + pg_fatal("could not write to compressed file \"%s\": %s", + mystreamer->pathname, get_gz_error(mystreamer->gzfile)); + } +} + +/* + * End-of-archive processing when writing to a gzip file consists of just + * calling gzclose. 
+ * + * It makes no difference whether we opened the file or the caller did it, + * because libz provides no way of avoiding a close on the underlying file + * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to + * work around this issue, so that the behavior from the caller's viewpoint + * is the same as for astreamer_plain_writer. + */ +static void +astreamer_gzip_writer_finalize(astreamer *streamer) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + errno = 0; /* in case gzclose() doesn't set it */ + if (gzclose(mystreamer->gzfile) != 0) + pg_fatal("could not close compressed file \"%s\": %m", + mystreamer->pathname); + + mystreamer->gzfile = NULL; +} + +/* + * Free memory associated with this astreamer. + */ +static void +astreamer_gzip_writer_free(astreamer *streamer) +{ + astreamer_gzip_writer *mystreamer; + + mystreamer = (astreamer_gzip_writer *) streamer; + + Assert(mystreamer->base.bbs_next == NULL); + Assert(mystreamer->gzfile == NULL); + + pfree(mystreamer->pathname); + pfree(mystreamer); +} + +/* + * Helper function for libz error reporting. + */ +static const char * +get_gz_error(gzFile gzf) +{ + int errnum; + const char *errmsg; + + errmsg = gzerror(gzf, &errnum); + if (errnum == Z_ERRNO) + return strerror(errno); + else + return errmsg; +} +#endif + +/* + * Create a new base backup streamer that performs decompression of gzip + * compressed blocks. 
+ */ +astreamer * +astreamer_gzip_decompressor_new(astreamer *next) +{ +#ifdef HAVE_LIBZ + astreamer_gzip_decompressor *streamer; + z_stream *zs; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_gzip_decompressor)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_gzip_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + + /* Initialize internal stream state for decompression */ + zs = &streamer->zstream; + zs->zalloc = gzip_palloc; + zs->zfree = gzip_pfree; + zs->next_out = (uint8 *) streamer->base.bbs_buffer.data; + zs->avail_out = streamer->base.bbs_buffer.maxlen; + + /* + * Data compression was initialized using deflateInit2 to request a gzip + * header. Similarly, we are using inflateInit2 to initialize data + * decompression. + * + * Per the documentation for inflateInit2, the second argument is + * "windowBits" and its value must be greater than or equal to the value + * provided while compressing the data, so we are using the maximum + * possible value for safety. + */ + if (inflateInit2(zs, 15 + 16) != Z_OK) + pg_fatal("could not initialize compression library"); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "gzip"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef HAVE_LIBZ +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. 
+ */ +static void +astreamer_gzip_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_gzip_decompressor *mystreamer; + z_stream *zs; + + mystreamer = (astreamer_gzip_decompressor *) streamer; + + zs = &mystreamer->zstream; + zs->next_in = (const uint8 *) data; + zs->avail_in = len; + + /* Process the current chunk */ + while (zs->avail_in > 0) + { + int res; + + Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen); + + zs->next_out = (uint8 *) + mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + zs->avail_out = + mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + + /* + * This call decompresses data starting at zs->next_in and updates + * zs->next_in * and zs->avail_in. It generates output data starting + * at zs->next_out and updates zs->next_out and zs->avail_out + * accordingly. + */ + res = inflate(zs, Z_NO_FLUSH); + + if (res == Z_STREAM_ERROR) + pg_log_error("could not decompress data: %s", zs->msg); + + mystreamer->bytes_written = + mystreamer->base.bbs_buffer.maxlen - zs->avail_out; + + /* If output buffer is full then pass data to next streamer */ + if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, context); + mystreamer->bytes_written = 0; + } + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_gzip_decompressor_finalize(astreamer *streamer) +{ + astreamer_gzip_decompressor *mystreamer; + + mystreamer = (astreamer_gzip_decompressor *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. 
+ */ + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_gzip_decompressor_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} + +/* + * Wrapper function to adjust the signature of palloc to match what libz + * expects. + */ +static void * +gzip_palloc(void *opaque, unsigned items, unsigned size) +{ + return palloc(items * size); +} + +/* + * Wrapper function to adjust the signature of pfree to match what libz + * expects. + */ +static void +gzip_pfree(void *opaque, void *address) +{ + pfree(address); +} +#endif diff --git a/src/fe_utils/astreamer_lz4.c b/src/fe_utils/astreamer_lz4.c new file mode 100644 index 00000000000..d8b2a367e47 --- /dev/null +++ b/src/fe_utils/astreamer_lz4.c @@ -0,0 +1,422 @@ +/*------------------------------------------------------------------------- + * + * astreamer_lz4.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer_lz4.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include + +#ifdef USE_LZ4 +#include +#endif + +#include "common/file_perm.h" +#include "common/logging.h" +#include "common/string.h" +#include "fe_utils/astreamer.h" + +#ifdef USE_LZ4 +typedef struct astreamer_lz4_frame +{ + astreamer base; + + LZ4F_compressionContext_t cctx; + LZ4F_decompressionContext_t dctx; + LZ4F_preferences_t prefs; + + size_t bytes_written; + bool header_written; +} astreamer_lz4_frame; + +static void astreamer_lz4_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_lz4_compressor_finalize(astreamer *streamer); +static 
void astreamer_lz4_compressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_lz4_compressor_ops = { + .content = astreamer_lz4_compressor_content, + .finalize = astreamer_lz4_compressor_finalize, + .free = astreamer_lz4_compressor_free +}; + +static void astreamer_lz4_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_lz4_decompressor_finalize(astreamer *streamer); +static void astreamer_lz4_decompressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_lz4_decompressor_ops = { + .content = astreamer_lz4_decompressor_content, + .finalize = astreamer_lz4_decompressor_finalize, + .free = astreamer_lz4_decompressor_free +}; +#endif + +/* + * Create a new base backup streamer that performs lz4 compression of tar + * blocks. + */ +astreamer * +astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress) +{ +#ifdef USE_LZ4 + astreamer_lz4_frame *streamer; + LZ4F_errorCode_t ctxError; + LZ4F_preferences_t *prefs; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_lz4_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_lz4_compressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + streamer->header_written = false; + + /* Initialize stream compression preferences */ + prefs = &streamer->prefs; + memset(prefs, 0, sizeof(LZ4F_preferences_t)); + prefs->frameInfo.blockSizeID = LZ4F_max256KB; + prefs->compressionLevel = compress->level; + + ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION); + if (LZ4F_isError(ctxError)) + pg_log_error("could not create lz4 compression context: %s", + LZ4F_getErrorName(ctxError)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "LZ4"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_LZ4 +/* + * Compress the input 
data to output buffer. + * + * Find out the compression bound based on input data length for each + * invocation to make sure that output buffer has enough capacity to + * accommodate the compressed data. In case if the output buffer + * capacity falls short of compression bound then forward the content + * of output buffer to next streamer and empty the buffer. + */ +static void +astreamer_lz4_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_in, + *next_out; + size_t out_bound, + compressed_size, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + next_in = (uint8 *) data; + + /* Write header before processing the first input chunk. */ + if (!mystreamer->header_written) + { + compressed_size = LZ4F_compressBegin(mystreamer->cctx, + (uint8 *) mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + &mystreamer->prefs); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not write lz4 header: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; + mystreamer->header_written = true; + } + + /* + * Update the offset and capacity of output buffer based on number of + * bytes written to output buffer. + */ + next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + + /* + * Find out the compression bound and make sure that output buffer has the + * required capacity for the success of LZ4F_compressUpdate. If needed + * forward the content to next streamer and empty the buffer. + */ + out_bound = LZ4F_compressBound(len, &mystreamer->prefs); + if (avail_out < out_bound) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + context); + + /* Enlarge buffer if it falls short of out bound. 
*/ + if (mystreamer->base.bbs_buffer.maxlen < out_bound) + enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + + /* + * This call compresses the data starting at next_in and generates the + * output starting at next_out. It expects the caller to provide the size + * of input buffer and capacity of output buffer by providing parameters + * len and avail_out. + * + * It returns the number of bytes compressed to output buffer. + */ + compressed_size = LZ4F_compressUpdate(mystreamer->cctx, + next_out, avail_out, + next_in, len, NULL); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not compress data: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; +} + +/* + * End-of-stream processing. + */ +static void +astreamer_lz4_compressor_finalize(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_out; + size_t footer_bound, + compressed_size, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + + /* Find out the footer bound and update the output buffer. */ + footer_bound = LZ4F_compressBound(0, &mystreamer->prefs); + if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) < + footer_bound) + { + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); + + /* Enlarge buffer if it falls short of footer bound. 
*/ + if (mystreamer->base.bbs_buffer.maxlen < footer_bound) + enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + else + { + next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + } + + /* + * Finalize the frame and flush whatever data remaining in compression + * context. + */ + compressed_size = LZ4F_compressEnd(mystreamer->cctx, + next_out, avail_out, NULL); + + if (LZ4F_isError(compressed_size)) + pg_log_error("could not end lz4 compression: %s", + LZ4F_getErrorName(compressed_size)); + + mystreamer->bytes_written += compressed_size; + + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. + */ +static void +astreamer_lz4_compressor_free(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + astreamer_free(streamer->bbs_next); + LZ4F_freeCompressionContext(mystreamer->cctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif + +/* + * Create a new base backup streamer that performs decompression of lz4 + * compressed blocks. 
+ */ +astreamer * +astreamer_lz4_decompressor_new(astreamer *next) +{ +#ifdef USE_LZ4 + astreamer_lz4_frame *streamer; + LZ4F_errorCode_t ctxError; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_lz4_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_lz4_decompressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + + /* Initialize internal stream state for decompression */ + ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION); + if (LZ4F_isError(ctxError)) + pg_fatal("could not initialize compression library: %s", + LZ4F_getErrorName(ctxError)); + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "LZ4"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_LZ4 +/* + * Decompress the input data to output buffer until we run out of input + * data. Each time the output buffer is full, pass on the decompressed data + * to the next streamer. + * + * Output produced after the last full buffer stays in bbs_buffer, counted by + * bytes_written, until a later call fills the buffer or the finalize + * callback forwards it. + */ +static void +astreamer_lz4_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_lz4_frame *mystreamer; + uint8 *next_in, + *next_out; + size_t avail_in, + avail_out; + + mystreamer = (astreamer_lz4_frame *) streamer; + next_in = (uint8 *) data; + avail_in = len; + + /* + * Resume after any output a previous call left in the buffer; starting + * again at offset zero would overwrite data not yet forwarded. + */ + next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + + while (avail_in > 0) + { + size_t ret, + read_size, + out_size; + + read_size = avail_in; + out_size = avail_out; + + /* + * This call decompresses the data starting at next_in and generates + * the output data starting at next_out. It expects the caller to + * provide size of the input buffer and total capacity of the output + * buffer by providing the read_size and out_size parameters + * respectively. 
+ * + * Per the documentation of LZ4, parameters read_size and out_size + * behaves as dual parameters. On return, the number of bytes consumed + * from the input buffer will be written back to read_size and the + * number of bytes decompressed to output buffer will be written back + * to out_size respectively. + */ + ret = LZ4F_decompress(mystreamer->dctx, + next_out, &out_size, + next_in, &read_size, NULL); + + /* + * Carrying on after a failed call would forward corrupt output, or + * spin here if no input was consumed, so give up. + */ + if (LZ4F_isError(ret)) + pg_fatal("could not decompress data: %s", + LZ4F_getErrorName(ret)); + + /* Update input buffer based on number of bytes consumed */ + avail_in -= read_size; + next_in += read_size; + + mystreamer->bytes_written += out_size; + + /* + * If output buffer is full then forward the content to next streamer + * and update the output buffer. + */ + if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + context); + + avail_out = mystreamer->base.bbs_buffer.maxlen; + mystreamer->bytes_written = 0; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data; + } + else + { + /* + * bytes_written is the total pending output, so the write + * position is computed from the start of the buffer rather than + * added to the already-advanced next_out. + */ + avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written; + next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written; + } + } +} + +/* + * End-of-stream processing. + */ +static void +astreamer_lz4_decompressor_finalize(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + + /* + * End of the stream, if there is some pending data in output buffers then + * we must forward it to next streamer. Only the first bytes_written + * bytes of the buffer are valid output. + */ + if (mystreamer->bytes_written > 0) + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); + + astreamer_finalize(mystreamer->base.bbs_next); +} + +/* + * Free memory. 
+ */ +static void +astreamer_lz4_decompressor_free(astreamer *streamer) +{ + astreamer_lz4_frame *mystreamer; + + mystreamer = (astreamer_lz4_frame *) streamer; + astreamer_free(streamer->bbs_next); + LZ4F_freeDecompressionContext(mystreamer->dctx); + pfree(streamer->bbs_buffer.data); + pfree(streamer); +} +#endif diff --git a/src/fe_utils/astreamer_tar.c b/src/fe_utils/astreamer_tar.c new file mode 100644 index 00000000000..f5d3562d280 --- /dev/null +++ b/src/fe_utils/astreamer_tar.c @@ -0,0 +1,514 @@ +/*------------------------------------------------------------------------- + * + * astreamer_tar.c + * + * This module implements three types of tar processing. A tar parser + * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits + * it into labelled chunks (any other value of astreamer_archive_context). + * A tar archiver does the reverse: it takes a bunch of labelled chunks + * and produces a tarfile, optionally replacing member headers and trailers + * so that upstream astreamer objects can perform surgery on the tarfile + * contents without knowing the details of the tar format. A tar terminator + * just adds two blocks of NUL bytes to the end of the file, since older + * server versions produce files with this terminator omitted. 
+ * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/fe_utils/astreamer_tar.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <time.h> + +#include "common/logging.h" +#include "fe_utils/astreamer.h" +#include "pgtar.h" + +typedef struct astreamer_tar_parser +{ + astreamer base; + astreamer_archive_context next_context; + astreamer_member member; + size_t file_bytes_sent; + size_t pad_bytes_expected; +} astreamer_tar_parser; + +typedef struct astreamer_tar_archiver +{ + astreamer base; + bool rearchive_member; +} astreamer_tar_archiver; + +static void astreamer_tar_parser_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_parser_finalize(astreamer *streamer); +static void astreamer_tar_parser_free(astreamer *streamer); +static bool astreamer_tar_header(astreamer_tar_parser *mystreamer); + +static const astreamer_ops astreamer_tar_parser_ops = { + .content = astreamer_tar_parser_content, + .finalize = astreamer_tar_parser_finalize, + .free = astreamer_tar_parser_free +}; + +static void astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_archiver_finalize(astreamer *streamer); +static void astreamer_tar_archiver_free(astreamer *streamer); + +static const astreamer_ops astreamer_tar_archiver_ops = { + .content = astreamer_tar_archiver_content, + .finalize = astreamer_tar_archiver_finalize, + .free = astreamer_tar_archiver_free +}; + +static void astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_terminator_finalize(astreamer *streamer); +static void astreamer_tar_terminator_free(astreamer *streamer); 
+ +static const astreamer_ops astreamer_tar_terminator_ops = { + .content = astreamer_tar_terminator_content, + .finalize = astreamer_tar_terminator_finalize, + .free = astreamer_tar_terminator_free +}; + +/* + * Create a astreamer that can parse a stream of content as tar data. + * + * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer + * specified by 'next' will receive a series of typed chunks, as per the + * conventions described in astreamer.h. + */ +astreamer * +astreamer_tar_parser_new(astreamer *next) +{ + astreamer_tar_parser *streamer; + + streamer = palloc0(sizeof(astreamer_tar_parser)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_parser_ops; + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + streamer->next_context = ASTREAMER_MEMBER_HEADER; + + return &streamer->base; +} + +/* + * Parse unknown content as tar data. + */ +static void +astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; + size_t nbytes; + + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == ASTREAMER_UNKNOWN); + + while (len > 0) + { + switch (mystreamer->next_context) + { + case ASTREAMER_MEMBER_HEADER: + + /* + * If we're expecting an archive member header, accumulate a + * full block of data before doing anything further. + */ + if (!astreamer_buffer_until(streamer, &data, &len, + TAR_BLOCK_SIZE)) + return; + + /* + * Now we can process the header and get ready to process the + * file contents; however, we might find out that what we + * thought was the next file header is actually the start of + * the archive trailer. Switch modes accordingly. + */ + if (astreamer_tar_header(mystreamer)) + { + if (mystreamer->member.size == 0) + { + /* No content; trailer is zero-length. 
*/ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + } + else + { + /* Expect contents. */ + mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS; + } + mystreamer->base.bbs_buffer.len = 0; + mystreamer->file_bytes_sent = 0; + } + else + mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER; + break; + + case ASTREAMER_MEMBER_CONTENTS: + + /* + * Send as much content as we have, but not more than the + * remaining file length. + */ + Assert(mystreamer->file_bytes_sent < mystreamer->member.size); + nbytes = mystreamer->member.size - mystreamer->file_bytes_sent; + nbytes = Min(nbytes, len); + Assert(nbytes > 0); + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, nbytes, + ASTREAMER_MEMBER_CONTENTS); + mystreamer->file_bytes_sent += nbytes; + data += nbytes; + len -= nbytes; + + /* + * If we've not yet sent the whole file, then there's more + * content to come; otherwise, it's time to expect the file + * trailer. + */ + Assert(mystreamer->file_bytes_sent <= mystreamer->member.size); + if (mystreamer->file_bytes_sent == mystreamer->member.size) + { + if (mystreamer->pad_bytes_expected == 0) + { + /* Trailer is zero-length. */ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + } + else + { + /* Trailer is not zero-length. */ + mystreamer->next_context = ASTREAMER_MEMBER_TRAILER; + } + mystreamer->base.bbs_buffer.len = 0; + } + break; + + case ASTREAMER_MEMBER_TRAILER: + + /* + * If we're expecting an archive member trailer, accumulate + * the expected number of padding bytes before sending + * anything onward. + */ + if (!astreamer_buffer_until(streamer, &data, &len, + mystreamer->pad_bytes_expected)) + return; + + /* OK, now we can send it. 
*/ + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, mystreamer->pad_bytes_expected, + ASTREAMER_MEMBER_TRAILER); + + /* Expect next file header. */ + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; + mystreamer->base.bbs_buffer.len = 0; + break; + + case ASTREAMER_ARCHIVE_TRAILER: + + /* + * We've seen an end-of-archive indicator, so anything more is + * buffered and sent as part of the archive trailer. But we + * don't expect more than 2 blocks. + */ + astreamer_buffer_bytes(streamer, &data, &len, len); + if (len > 2 * TAR_BLOCK_SIZE) + pg_fatal("tar file trailer exceeds 2 blocks"); + return; + + default: + /* Shouldn't happen. */ + pg_fatal("unexpected state while parsing tar archive"); + } + } +} + +/* + * Parse a file header within a tar stream. + * + * The return value is true if we found a file header and passed it on to the + * next astreamer; it is false if we have reached the archive trailer. + */ +static bool +astreamer_tar_header(astreamer_tar_parser *mystreamer) +{ + bool has_nonzero_byte = false; + int i; + astreamer_member *member = &mystreamer->member; + char *buffer = mystreamer->base.bbs_buffer.data; + + Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE); + + /* Check whether we've got a block of all zero bytes. */ + for (i = 0; i < TAR_BLOCK_SIZE; ++i) + { + if (buffer[i] != '\0') + { + has_nonzero_byte = true; + break; + } + } + + /* + * If the entire block was zeros, this is the end of the archive, not the + * start of the next file. + */ + if (!has_nonzero_byte) + return false; + + /* + * Parse key fields out of the header. 
+ */ + strlcpy(member->pathname, &buffer[TAR_OFFSET_NAME], MAXPGPATH); + if (member->pathname[0] == '\0') + pg_fatal("tar member has empty name"); + member->size = read_tar_number(&buffer[TAR_OFFSET_SIZE], 12); + member->mode = read_tar_number(&buffer[TAR_OFFSET_MODE], 8); + member->uid = read_tar_number(&buffer[TAR_OFFSET_UID], 8); + member->gid = read_tar_number(&buffer[TAR_OFFSET_GID], 8); + member->is_directory = + (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_DIRECTORY); + member->is_link = + (buffer[TAR_OFFSET_TYPEFLAG] == TAR_FILETYPE_SYMLINK); + if (member->is_link) + strlcpy(member->linktarget, &buffer[TAR_OFFSET_LINKNAME], 100); + + /* Compute number of padding bytes. */ + mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size); + + /* Forward the entire header to the next astreamer. */ + astreamer_content(mystreamer->base.bbs_next, member, + buffer, TAR_BLOCK_SIZE, + ASTREAMER_MEMBER_HEADER); + + return true; +} + +/* + * End-of-stream processing for a tar parser. + */ +static void +astreamer_tar_parser_finalize(astreamer *streamer) +{ + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; + + if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER && + (mystreamer->next_context != ASTREAMER_MEMBER_HEADER || + mystreamer->base.bbs_buffer.len > 0)) + pg_fatal("COPY stream ended before last file was finished"); + + /* Send the archive trailer, even if empty. */ + astreamer_content(streamer->bbs_next, NULL, + streamer->bbs_buffer.data, streamer->bbs_buffer.len, + ASTREAMER_ARCHIVE_TRAILER); + + /* Now finalize successor. */ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar parser: its accumulation buffer, the + * successor streamer, and the parser object itself. + */ +static void +astreamer_tar_parser_free(astreamer *streamer) +{ + pfree(streamer->bbs_buffer.data); + astreamer_free(streamer->bbs_next); + + /* The other free callbacks release their own struct; do the same here. */ + pfree(streamer); +} + +/* + * Create a astreamer that can generate a tar archive. 
+ * + * This is intended to be usable either for generating a brand-new tar archive + * or for modifying one on the fly. The input should be a series of typed + * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for + * astreamer_tar_parser_content. + */ +astreamer * +astreamer_tar_archiver_new(astreamer *next) +{ + astreamer_tar_archiver *streamer; + + streamer = palloc0(sizeof(astreamer_tar_archiver)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_archiver_ops; + streamer->base.bbs_next = next; + + return &streamer->base; +} + +/* + * Fix up the stream of input chunks to create a valid tar file. + * + * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a + * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is + * passed through without change. Any other size is a fatal error (and + * indicates a bug). + * + * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the + * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from + * scratch. Specifically, we construct a block of zero bytes sufficient to + * pad out to a block boundary, as required by the tar format. Other + * ASTREAMER_MEMBER_TRAILER chunks are passed through without change. + * + * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change. + * + * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two + * blocks of zero bytes. Not all tar programs require this, but apparently + * some do. The server does not supply this trailer. If no archive trailer is + * present, one will be added by astreamer_tar_parser_finalize. 
+ */ +static void +astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer; + char buffer[2 * TAR_BLOCK_SIZE]; + + Assert(context != ASTREAMER_UNKNOWN); + + if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) + { + Assert(len == 0); + + /* Replace zero-length tar header with a newly constructed one. */ + tarCreateHeader(buffer, member->pathname, NULL, + member->size, member->mode, member->uid, member->gid, + time(NULL)); + data = buffer; + len = TAR_BLOCK_SIZE; + + /* Also make a note to replace padding, in case size changed. */ + mystreamer->rearchive_member = true; + } + else if (context == ASTREAMER_MEMBER_TRAILER && + mystreamer->rearchive_member) + { + int pad_bytes = tarPaddingBytesRequired(member->size); + + /* Also replace padding, if we regenerated the header. */ + memset(buffer, 0, pad_bytes); + data = buffer; + len = pad_bytes; + + /* Don't do this again unless we replace another header. */ + mystreamer->rearchive_member = false; + } + else if (context == ASTREAMER_ARCHIVE_TRAILER) + { + /* Trailer should always be two blocks of zero bytes. */ + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + data = buffer; + len = 2 * TAR_BLOCK_SIZE; + } + + astreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * End-of-stream processing for a tar archiver. + */ +static void +astreamer_tar_archiver_finalize(astreamer *streamer) +{ + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar archiver. + */ +static void +astreamer_tar_archiver_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} + +/* + * Create a astreamer that blindly adds two blocks of NUL bytes to the + * end of an incomplete tarfile that the server might send us. 
+ */ +astreamer * +astreamer_tar_terminator_new(astreamer *next) +{ + astreamer *streamer; + + streamer = palloc0(sizeof(astreamer)); + *((const astreamer_ops **) &streamer->bbs_ops) = + &astreamer_tar_terminator_ops; + streamer->bbs_next = next; + + return streamer; +} + +/* + * Pass all the content through without change. + */ +static void +astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + /* Expect unparsed input. */ + Assert(member == NULL); + Assert(context == ASTREAMER_UNKNOWN); + + /* Just forward it. */ + astreamer_content(streamer->bbs_next, member, data, len, context); +} + +/* + * At the end, blindly add the two blocks of NUL bytes which the server fails + * to supply. + */ +static void +astreamer_tar_terminator_finalize(astreamer *streamer) +{ + char buffer[2 * TAR_BLOCK_SIZE]; + + memset(buffer, 0, 2 * TAR_BLOCK_SIZE); + astreamer_content(streamer->bbs_next, NULL, buffer, + 2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN); + astreamer_finalize(streamer->bbs_next); +} + +/* + * Free memory associated with a tar terminator. 
+ */ +static void +astreamer_tar_terminator_free(astreamer *streamer) +{ + astreamer_free(streamer->bbs_next); + pfree(streamer); +} diff --git a/src/fe_utils/astreamer_zstd.c b/src/fe_utils/astreamer_zstd.c new file mode 100644 index 00000000000..45f6cb67363 --- /dev/null +++ b/src/fe_utils/astreamer_zstd.c @@ -0,0 +1,368 @@ +/*------------------------------------------------------------------------- + * + * astreamer_zstd.c + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/fe_utils/astreamer_zstd.c + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include <unistd.h> + +#ifdef USE_ZSTD +#include <zstd.h> +#endif + +#include "common/logging.h" +#include "fe_utils/astreamer.h" + +#ifdef USE_ZSTD + +typedef struct astreamer_zstd_frame +{ + astreamer base; + + ZSTD_CCtx *cctx; + ZSTD_DCtx *dctx; + ZSTD_outBuffer zstd_outBuf; +} astreamer_zstd_frame; + +static void astreamer_zstd_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_zstd_compressor_finalize(astreamer *streamer); +static void astreamer_zstd_compressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_zstd_compressor_ops = { + .content = astreamer_zstd_compressor_content, + .finalize = astreamer_zstd_compressor_finalize, + .free = astreamer_zstd_compressor_free +}; + +static void astreamer_zstd_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_zstd_decompressor_finalize(astreamer *streamer); +static void astreamer_zstd_decompressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_zstd_decompressor_ops = { + .content = astreamer_zstd_decompressor_content, + .finalize = astreamer_zstd_decompressor_finalize, + .free = astreamer_zstd_decompressor_free 
+}; +#endif + +/* + * Create a new base backup streamer that performs zstd compression of tar + * blocks. + */ +astreamer * +astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress) +{ +#ifdef USE_ZSTD + astreamer_zstd_frame *streamer; + size_t ret; + + Assert(next != NULL); + + streamer = palloc0(sizeof(astreamer_zstd_frame)); + + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_zstd_compressor_ops; + + streamer->base.bbs_next = next; + initStringInfo(&streamer->base.bbs_buffer); + enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize()); + + streamer->cctx = ZSTD_createCCtx(); + if (!streamer->cctx) + pg_fatal("could not create zstd compression context"); + + /* Set compression level */ + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, + compress->level); + if (ZSTD_isError(ret)) + pg_fatal("could not set zstd compression level to %d: %s", + compress->level, ZSTD_getErrorName(ret)); + + /* Set # of workers, if specified */ + if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0) + { + /* + * On older versions of libzstd, this option does not exist, and + * trying to set it will fail. Similarly for newer versions if they + * are compiled without threading support. + */ + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers, + compress->workers); + if (ZSTD_isError(ret)) + pg_fatal("could not set compression worker count to %d: %s", + compress->workers, ZSTD_getErrorName(ret)); + } + + if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0) + { + ret = ZSTD_CCtx_setParameter(streamer->cctx, + ZSTD_c_enableLongDistanceMatching, + compress->long_distance); + if (ZSTD_isError(ret)) + { + pg_log_error("could not enable long-distance mode: %s", + ZSTD_getErrorName(ret)); + exit(1); + } + } + + /* Initialize the ZSTD output buffer. 
*/ + streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; + streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; + streamer->zstd_outBuf.pos = 0; + + return &streamer->base; +#else + pg_fatal("this build does not support compression with %s", "ZSTD"); + return NULL; /* keep compiler quiet */ +#endif +} + +#ifdef USE_ZSTD +/* + * Compress the input data to output buffer. + * + * Find out the compression bound based on input data length for each + * invocation to make sure that output buffer has enough capacity to + * accommodate the compressed data. In case if the output buffer + * capacity falls short of compression bound then forward the content + * of output buffer to next streamer and empty the buffer. + */ +static void +astreamer_zstd_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; + ZSTD_inBuffer inBuf = {data, len, 0}; + + while (inBuf.pos < inBuf.size) + { + size_t yet_to_flush; + size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos); + + /* + * If the output buffer is not left with enough space, send the + * compressed bytes to the next streamer, and empty the buffer. + */ + if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < + max_needed) + { + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); + + /* Reset the ZSTD output buffer. */ + mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; + mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen; + mystreamer->zstd_outBuf.pos = 0; + } + + yet_to_flush = + ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf, + &inBuf, ZSTD_e_continue); + + if (ZSTD_isError(yet_to_flush)) + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(yet_to_flush)); + } +} + +/* + * End-of-stream processing. 
+ *
+ * Keep asking zstd to end the frame until it reports that nothing is left
+ * to flush, forwarding the output buffer to the next streamer whenever it
+ * runs short of space.  Then pass on any remaining bytes and finalize the
+ * next streamer.  A compression error is fatal.
+ */
+static void
+astreamer_zstd_compressor_finalize(astreamer *streamer)
+{
+	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+	size_t		yet_to_flush;
+
+	do
+	{
+		ZSTD_inBuffer in = {NULL, 0, 0};
+		size_t		max_needed = ZSTD_compressBound(0);
+
+		/*
+		 * If the output buffer is not left with enough space, send the
+		 * compressed bytes to the next streamer, and empty the buffer.
+		 */
+		if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
+			max_needed)
+		{
+			astreamer_content(mystreamer->base.bbs_next, NULL,
+							  mystreamer->zstd_outBuf.dst,
+							  mystreamer->zstd_outBuf.pos,
+							  ASTREAMER_UNKNOWN);
+
+			/* Reset the ZSTD output buffer. */
+			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->zstd_outBuf.pos = 0;
+		}
+
+		yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
+											&mystreamer->zstd_outBuf,
+											&in, ZSTD_e_end);
+
+		/*
+		 * Give up on error.  An error code is a nonzero size_t, so it would
+		 * satisfy the loop condition below and the loop would carry on
+		 * despite the failure.
+		 */
+		if (ZSTD_isError(yet_to_flush))
+			pg_fatal("could not compress data: %s",
+					 ZSTD_getErrorName(yet_to_flush));
+
+	} while (yet_to_flush > 0);
+
+	/* Make sure to pass any remaining bytes to the next streamer. */
+	if (mystreamer->zstd_outBuf.pos > 0)
+		astreamer_content(mystreamer->base.bbs_next, NULL,
+						  mystreamer->zstd_outBuf.dst,
+						  mystreamer->zstd_outBuf.pos,
+						  ASTREAMER_UNKNOWN);
+
+	astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ *
+ * Frees the next streamer first, then the zstd compression context, the
+ * buffer, and finally the streamer itself.
+ */
+static void
+astreamer_zstd_compressor_free(astreamer *streamer)
+{
+	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+	astreamer_free(streamer->bbs_next);
+	ZSTD_freeCCtx(mystreamer->cctx);
+	pfree(streamer->bbs_buffer.data);
+	pfree(streamer);
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of zstd
+ * compressed blocks.
+ *
+ * 'next' is the successor streamer that will receive the decompressed
+ * bytes; it must not be NULL.  Failure to create the zstd context is
+ * reported with pg_fatal(), as is any call in a build without USE_ZSTD.
+ */
+astreamer *
+astreamer_zstd_decompressor_new(astreamer *next)
+{
+#ifdef USE_ZSTD
+	astreamer_zstd_frame *streamer;
+
+	Assert(next != NULL);
+
+	streamer = palloc0(sizeof(astreamer_zstd_frame));
+	*((const astreamer_ops **) &streamer->base.bbs_ops) =
+		&astreamer_zstd_decompressor_ops;
+
+	streamer->base.bbs_next = next;
+	initStringInfo(&streamer->base.bbs_buffer);
+	enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
+
+	streamer->dctx = ZSTD_createDCtx();
+	if (!streamer->dctx)
+		pg_fatal("could not create zstd decompression context");
+
+	/* Initialize the ZSTD output buffer. */
+	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+	streamer->zstd_outBuf.pos = 0;
+
+	return &streamer->base;
+#else
+	pg_fatal("this build does not support compression with %s", "ZSTD");
+	return NULL;				/* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_ZSTD
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ *
+ * 'member' and 'context' are handed on unchanged to the next streamer.
+ * Output that does not fill the buffer stays there until a later call or
+ * until finalize.  A decompression error is fatal.
+ */
+static void
+astreamer_zstd_decompressor_content(astreamer *streamer,
+									astreamer_member *member,
+									const char *data, int len,
+									astreamer_archive_context context)
+{
+	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+	ZSTD_inBuffer inBuf = {data, len, 0};
+
+	while (inBuf.pos < inBuf.size)
+	{
+		size_t		ret;
+
+		/*
+		 * If output buffer is full then forward the content to next streamer
+		 * and update the output buffer.
+		 */
+		if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
+		{
+			astreamer_content(mystreamer->base.bbs_next, member,
+							  mystreamer->zstd_outBuf.dst,
+							  mystreamer->zstd_outBuf.pos,
+							  context);
+
+			/* Reset the ZSTD output buffer. */
+			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->zstd_outBuf.pos = 0;
+		}
+
+		ret = ZSTD_decompressStream(mystreamer->dctx,
+									&mystreamer->zstd_outBuf, &inBuf);
+
+		/*
+		 * Give up on error.  A failed call need not have consumed any
+		 * input, so merely logging and looping again could spin forever.
+		 */
+		if (ZSTD_isError(ret))
+			pg_fatal("could not decompress data: %s",
+					 ZSTD_getErrorName(ret));
+	}
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+astreamer_zstd_decompressor_finalize(astreamer *streamer)
+{
+	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+	/*
+	 * End of the stream, if there is some pending data in output buffers then
+	 * we must forward it to next streamer.  Only the first zstd_outBuf.pos
+	 * bytes have been written by the decompressor; the rest of the buffer,
+	 * up to its allocated size, is not valid output and must not be sent.
+	 */
+	if (mystreamer->zstd_outBuf.pos > 0)
+		astreamer_content(mystreamer->base.bbs_next, NULL,
+						  mystreamer->zstd_outBuf.dst,
+						  mystreamer->zstd_outBuf.pos,
+						  ASTREAMER_UNKNOWN);
+
+	astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ *
+ * Frees the next streamer first, then the zstd decompression context, the
+ * buffer, and finally the streamer itself.
+ */
+static void
+astreamer_zstd_decompressor_free(astreamer *streamer)
+{
+	astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+	astreamer_free(streamer->bbs_next);
+	ZSTD_freeDCtx(mystreamer->dctx);
+	pfree(streamer->bbs_buffer.data);
+	pfree(streamer);
+}
+#endif
diff --git a/src/fe_utils/meson.build b/src/fe_utils/meson.build
index 14d0482a2cc..043021d826d 100644
--- a/src/fe_utils/meson.build
+++ b/src/fe_utils/meson.build
@@ -2,6 +2,11 @@
 
 fe_utils_sources = files(
   'archive.c',
+  'astreamer_file.c',
+  'astreamer_gzip.c',
+  'astreamer_lz4.c',
+  'astreamer_tar.c',
+  'astreamer_zstd.c',
   'cancel.c',
   'conditional.c',
   'connect_utils.c',
diff --git a/src/include/fe_utils/astreamer.h b/src/include/fe_utils/astreamer.h
new file mode 100644
index 00000000000..2c014dbddbe
--- /dev/null
+++ b/src/include/fe_utils/astreamer.h
@@ -0,0 +1,220 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer.h
+ *
+ * Each tar archive returned by the server is passed to one or more
+ * 
astreamer objects for further processing. The astreamer may do
+ * something simple, like write the archive to a file, perhaps after
+ * compressing it, but it can also do more complicated things, like
+ * annotating the byte stream to indicate which parts of the data
+ * correspond to tar headers or trailing padding, vs. which parts are
+ * payload data. A subsequent astreamer may use this information to
+ * make further decisions about how to process the data; for example,
+ * it might choose to modify the archive contents.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/include/fe_utils/astreamer.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef ASTREAMER_H
+#define ASTREAMER_H
+
+#include "common/compression.h"
+#include "lib/stringinfo.h"
+#include "pqexpbuffer.h"
+
+struct astreamer;
+struct astreamer_ops;
+typedef struct astreamer astreamer;
+typedef struct astreamer_ops astreamer_ops;
+
+/*
+ * Each chunk of archive data passed to an astreamer is classified into one
+ * of these categories. When data is first received from the remote server,
+ * each chunk will be categorized as ASTREAMER_UNKNOWN, and the chunks will
+ * be of whatever size the remote server chose to send.
+ *
+ * If the archive is parsed (e.g. see astreamer_tar_parser_new()), then all
+ * chunks should be labelled as one of the other types listed here. In
+ * addition, there should be exactly one ASTREAMER_MEMBER_HEADER chunk and
+ * exactly one ASTREAMER_MEMBER_TRAILER chunk per archive member, even if
+ * that means a zero-length call. There can be any number of
+ * ASTREAMER_MEMBER_CONTENTS chunks in between those calls. There
+ * should be exactly one ASTREAMER_ARCHIVE_TRAILER chunk, and it should
+ * follow the last ASTREAMER_MEMBER_TRAILER chunk.
+ *
+ * In theory, we could need other classifications here, such as a way of
+ * indicating an archive header, but the "tar" format doesn't need anything
+ * else, so for the time being there's no point.
+ */
+typedef enum
+{
+	ASTREAMER_UNKNOWN,			/* not classified, e.g. unparsed data */
+	ASTREAMER_MEMBER_HEADER,	/* start of an archive member */
+	ASTREAMER_MEMBER_CONTENTS,	/* member data, between header and trailer */
+	ASTREAMER_MEMBER_TRAILER,	/* end of an archive member */
+	ASTREAMER_ARCHIVE_TRAILER,	/* follows the last member trailer */
+} astreamer_archive_context;
+
+/*
+ * Each chunk of data that is classified as ASTREAMER_MEMBER_HEADER,
+ * ASTREAMER_MEMBER_CONTENTS, or ASTREAMER_MEMBER_TRAILER should also
+ * pass a pointer to an instance of this struct. The details are expected
+ * to be present in the archive header and used to fill the struct, after
+ * which all subsequent calls for the same archive member are expected to
+ * pass the same details.
+ *
+ * NOTE(review): linktarget is presumably meaningful only when is_link is
+ * set; confirm against the code that fills this struct.
+ */
+typedef struct
+{
+	char		pathname[MAXPGPATH];
+	pgoff_t		size;
+	mode_t		mode;
+	uid_t		uid;
+	gid_t		gid;
+	bool		is_directory;
+	bool		is_link;
+	char		linktarget[MAXPGPATH];
+} astreamer_member;
+
+/*
+ * Generally, each type of astreamer will define its own struct, but the
+ * first element should be 'astreamer base'. An astreamer that does not
+ * require any additional private data could use this structure directly.
+ *
+ * bbs_ops is a pointer to the astreamer_ops object which contains the
+ * function pointers appropriate to this type of astreamer.
+ *
+ * bbs_next is a pointer to the successor astreamer, for those types of
+ * astreamer which forward data to a successor. It need not be used and
+ * should be set to NULL when not relevant.
+ *
+ * bbs_buffer is a buffer for accumulating data for temporary storage. Each
+ * type of astreamer makes its own decisions about whether and how to use
+ * this buffer.
+ */
+struct astreamer
+{
+	const astreamer_ops *bbs_ops;
+	astreamer  *bbs_next;
+	StringInfoData bbs_buffer;
+};
+
+/*
+ * There are three callbacks for an astreamer. The 'content' callback is
+ * called repeatedly, as described in the astreamer_archive_context comments.
+ * Then, the 'finalize' callback is called once at the end, to give the
+ * astreamer a chance to perform cleanup such as closing files. Finally,
+ * because this code is running in a frontend environment where, as of this
+ * writing, there are no memory contexts, the 'free' callback is called to
+ * release memory. These callbacks should always be invoked using the static
+ * inline functions defined below.
+ */
+struct astreamer_ops
+{
+	void		(*content) (astreamer *streamer, astreamer_member *member,
+							const char *data, int len,
+							astreamer_archive_context context);
+	void		(*finalize) (astreamer *streamer);
+	void		(*free) (astreamer *streamer);
+};
+
+/*
+ * Send some content to an astreamer, by way of its 'content' callback.
+ * 'streamer' must not be NULL.
+ */
+static inline void
+astreamer_content(astreamer *streamer, astreamer_member *member,
+				  const char *data, int len,
+				  astreamer_archive_context context)
+{
+	Assert(streamer != NULL);
+	streamer->bbs_ops->content(streamer, member, data, len, context);
+}
+
+/*
+ * Finalize an astreamer, by way of its 'finalize' callback. 'streamer' must
+ * not be NULL.
+ */
+static inline void
+astreamer_finalize(astreamer *streamer)
+{
+	Assert(streamer != NULL);
+	streamer->bbs_ops->finalize(streamer);
+}
+
+/*
+ * Free an astreamer, by way of its 'free' callback. 'streamer' must not be
+ * NULL.
+ */
+static inline void
+astreamer_free(astreamer *streamer)
+{
+	Assert(streamer != NULL);
+	streamer->bbs_ops->free(streamer);
+}
+
+/*
+ * This is a convenience method for use when implementing an astreamer; it is
+ * not for use by outside callers. It adds the amount of data specified by
+ * 'nbytes' to the astreamer's buffer and adjusts '*len' and '*data'
+ * accordingly. 'nbytes' must not exceed '*len'.
+ */
+static inline void
+astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len,
+					   int nbytes)
+{
+	Assert(nbytes <= *len);
+
+	appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
+	*len -= nbytes;
+	*data += nbytes;
+}
+
+/*
+ * This is a convenience method for use when implementing an astreamer; it is
+ * not for use by outside callers.
It attempts to add enough data to the
+ * astreamer's buffer to reach a length of target_bytes and adjusts '*len'
+ * and '*data' accordingly. It returns true if the target length has been
+ * reached and false otherwise.
+ *
+ * When false is returned, all of the caller's data has been buffered and
+ * '*len' is zero. When true is returned, '*data' and '*len' describe
+ * whatever input was not needed to reach the target.
+ */
+static inline bool
+astreamer_buffer_until(astreamer *streamer, const char **data, int *len,
+					   int target_bytes)
+{
+	int			buflen = streamer->bbs_buffer.len;
+	int			missing;
+
+	if (buflen >= target_bytes)
+	{
+		/* Target length already reached; nothing to do. */
+		return true;
+	}
+
+	/*
+	 * Compare against the shortfall rather than computing buflen + *len,
+	 * which could overflow a signed int for a large buffer and a large
+	 * input chunk. The shortfall is positive here.
+	 */
+	missing = target_bytes - buflen;
+
+	if (*len < missing)
+	{
+		/* Not enough data to reach target length; buffer all of it. */
+		astreamer_buffer_bytes(streamer, data, len, *len);
+		return false;
+	}
+
+	/* Buffer just enough to reach the target length. */
+	astreamer_buffer_bytes(streamer, data, len, missing);
+	return true;
+}
+
+/*
+ * Functions for creating astreamer objects of various types. See the header
+ * comments for each of these functions for details.
+ */
+extern astreamer *astreamer_plain_writer_new(char *pathname, FILE *file);
+extern astreamer *astreamer_gzip_writer_new(char *pathname, FILE *file,
+											pg_compress_specification *compress);
+extern astreamer *astreamer_extractor_new(const char *basepath,
+										  const char *(*link_map) (const char *),
+										  void (*report_output_file) (const char *));
+
+extern astreamer *astreamer_gzip_decompressor_new(astreamer *next);
+extern astreamer *astreamer_lz4_compressor_new(astreamer *next,
+											   pg_compress_specification *compress);
+extern astreamer *astreamer_lz4_decompressor_new(astreamer *next);
+extern astreamer *astreamer_zstd_compressor_new(astreamer *next,
+												pg_compress_specification *compress);
+extern astreamer *astreamer_zstd_decompressor_new(astreamer *next);
+extern astreamer *astreamer_tar_parser_new(astreamer *next);
+extern astreamer *astreamer_tar_terminator_new(astreamer *next);
+extern astreamer *astreamer_tar_archiver_new(astreamer *next);
+
+#endif
-- 
cgit v1.2.3