diff options
-rw-r--r-- | src/bin/pg_basebackup/Makefile | 12 | ||||
-rw-r--r-- | src/bin/pg_basebackup/astreamer.h | 226 | ||||
-rw-r--r-- | src/bin/pg_basebackup/astreamer_file.c (renamed from src/bin/pg_basebackup/bbstreamer_file.c) | 152 | ||||
-rw-r--r-- | src/bin/pg_basebackup/astreamer_gzip.c (renamed from src/bin/pg_basebackup/bbstreamer_gzip.c) | 156 | ||||
-rw-r--r-- | src/bin/pg_basebackup/astreamer_inject.c (renamed from src/bin/pg_basebackup/bbstreamer_inject.c) | 156 | ||||
-rw-r--r-- | src/bin/pg_basebackup/astreamer_lz4.c (renamed from src/bin/pg_basebackup/bbstreamer_lz4.c) | 178 | ||||
-rw-r--r-- | src/bin/pg_basebackup/astreamer_tar.c (renamed from src/bin/pg_basebackup/bbstreamer_tar.c) | 324 | ||||
-rw-r--r-- | src/bin/pg_basebackup/astreamer_zstd.c (renamed from src/bin/pg_basebackup/bbstreamer_zstd.c) | 166 | ||||
-rw-r--r-- | src/bin/pg_basebackup/bbstreamer.h | 226 | ||||
-rw-r--r-- | src/bin/pg_basebackup/meson.build | 12 | ||||
-rw-r--r-- | src/bin/pg_basebackup/nls.mk | 12 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 104 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 26 |
13 files changed, 875 insertions, 875 deletions
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile index 26c53e473f5..a71af2d48a7 100644 --- a/src/bin/pg_basebackup/Makefile +++ b/src/bin/pg_basebackup/Makefile @@ -37,12 +37,12 @@ OBJS = \ BBOBJS = \ pg_basebackup.o \ - bbstreamer_file.o \ - bbstreamer_gzip.o \ - bbstreamer_inject.o \ - bbstreamer_lz4.o \ - bbstreamer_tar.o \ - bbstreamer_zstd.o + astreamer_file.o \ + astreamer_gzip.o \ + astreamer_inject.o \ + astreamer_lz4.o \ + astreamer_tar.o \ + astreamer_zstd.o all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical diff --git a/src/bin/pg_basebackup/astreamer.h b/src/bin/pg_basebackup/astreamer.h new file mode 100644 index 00000000000..b5ed138f54e --- /dev/null +++ b/src/bin/pg_basebackup/astreamer.h @@ -0,0 +1,226 @@ +/*------------------------------------------------------------------------- + * + * astreamer.h + * + * Each tar archive returned by the server is passed to one or more + * astreamer objects for further processing. The astreamer may do + * something simple, like write the archive to a file, perhaps after + * compressing it, but it can also do more complicated things, like + * annotating the byte stream to indicate which parts of the data + * correspond to tar headers or trailing padding, vs. which parts are + * payload data. A subsequent astreamer may use this information to + * make further decisions about how to process the data; for example, + * it might choose to modify the archive contents. + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/astreamer.h + *------------------------------------------------------------------------- + */ + +#ifndef ASTREAMER_H +#define ASTREAMER_H + +#include "common/compression.h" +#include "lib/stringinfo.h" +#include "pqexpbuffer.h" + +struct astreamer; +struct astreamer_ops; +typedef struct astreamer astreamer; +typedef struct astreamer_ops astreamer_ops; + +/* + * Each chunk of archive data passed to a astreamer is classified into one + * of these categories. When data is first received from the remote server, + * each chunk will be categorized as ASTREAMER_UNKNOWN, and the chunks will + * be of whatever size the remote server chose to send. + * + * If the archive is parsed (e.g. see astreamer_tar_parser_new()), then all + * chunks should be labelled as one of the other types listed here. In + * addition, there should be exactly one ASTREAMER_MEMBER_HEADER chunk and + * exactly one ASTREAMER_MEMBER_TRAILER chunk per archive member, even if + * that means a zero-length call. There can be any number of + * ASTREAMER_MEMBER_CONTENTS chunks in between those calls. There + * should exactly ASTREAMER_ARCHIVE_TRAILER chunk, and it should follow the + * last ASTREAMER_MEMBER_TRAILER chunk. + * + * In theory, we could need other classifications here, such as a way of + * indicating an archive header, but the "tar" format doesn't need anything + * else, so for the time being there's no point. + */ +typedef enum +{ + ASTREAMER_UNKNOWN, + ASTREAMER_MEMBER_HEADER, + ASTREAMER_MEMBER_CONTENTS, + ASTREAMER_MEMBER_TRAILER, + ASTREAMER_ARCHIVE_TRAILER, +} astreamer_archive_context; + +/* + * Each chunk of data that is classified as ASTREAMER_MEMBER_HEADER, + * ASTREAMER_MEMBER_CONTENTS, or ASTREAMER_MEMBER_TRAILER should also + * pass a pointer to an instance of this struct. The details are expected + * to be present in the archive header and used to fill the struct, after + * which all subsequent calls for the same archive member are expected to + * pass the same details. + */ +typedef struct +{ + char pathname[MAXPGPATH]; + pgoff_t size; + mode_t mode; + uid_t uid; + gid_t gid; + bool is_directory; + bool is_link; + char linktarget[MAXPGPATH]; +} astreamer_member; + +/* + * Generally, each type of astreamer will define its own struct, but the + * first element should be 'astreamer base'. A astreamer that does not + * require any additional private data could use this structure directly. + * + * bbs_ops is a pointer to the astreamer_ops object which contains the + * function pointers appropriate to this type of astreamer. + * + * bbs_next is a pointer to the successor astreamer, for those types of + * astreamer which forward data to a successor. It need not be used and + * should be set to NULL when not relevant. + * + * bbs_buffer is a buffer for accumulating data for temporary storage. Each + * type of astreamer makes its own decisions about whether and how to use + * this buffer. + */ +struct astreamer +{ + const astreamer_ops *bbs_ops; + astreamer *bbs_next; + StringInfoData bbs_buffer; +}; + +/* + * There are three callbacks for a astreamer. The 'content' callback is + * called repeatedly, as described in the astreamer_archive_context comments. + * Then, the 'finalize' callback is called once at the end, to give the + * astreamer a chance to perform cleanup such as closing files. Finally, + * because this code is running in a frontend environment where, as of this + * writing, there are no memory contexts, the 'free' callback is called to + * release memory. These callbacks should always be invoked using the static + * inline functions defined below. + */ +struct astreamer_ops +{ + void (*content) (astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); + void (*finalize) (astreamer *streamer); + void (*free) (astreamer *streamer); +}; + +/* Send some content to a astreamer. */ +static inline void +astreamer_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) +{ + Assert(streamer != NULL); + streamer->bbs_ops->content(streamer, member, data, len, context); +} + +/* Finalize a astreamer. */ +static inline void +astreamer_finalize(astreamer *streamer) +{ + Assert(streamer != NULL); + streamer->bbs_ops->finalize(streamer); +} + +/* Free a astreamer. */ +static inline void +astreamer_free(astreamer *streamer) +{ + Assert(streamer != NULL); + streamer->bbs_ops->free(streamer); +} + +/* + * This is a convenience method for use when implementing a astreamer; it is + * not for use by outside callers. It adds the amount of data specified by + * 'nbytes' to the astreamer's buffer and adjusts '*len' and '*data' + * accordingly. + */ +static inline void +astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len, + int nbytes) +{ + Assert(nbytes <= *len); + + appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes); + *len -= nbytes; + *data += nbytes; +} + +/* + * This is a convenience method for use when implementing a astreamer; it is + * not for use by outsider callers. It attempts to add enough data to the + * astreamer's buffer to reach a length of target_bytes and adjusts '*len' + * and '*data' accordingly. It returns true if the target length has been + * reached and false otherwise. + */ +static inline bool +astreamer_buffer_until(astreamer *streamer, const char **data, int *len, + int target_bytes) +{ + int buflen = streamer->bbs_buffer.len; + + if (buflen >= target_bytes) + { + /* Target length already reached; nothing to do. */ + return true; + } + + if (buflen + *len < target_bytes) + { + /* Not enough data to reach target length; buffer all of it. */ + astreamer_buffer_bytes(streamer, data, len, *len); + return false; + } + + /* Buffer just enough to reach the target length. */ + astreamer_buffer_bytes(streamer, data, len, target_bytes - buflen); + return true; +} + +/* + * Functions for creating astreamer objects of various types. See the header + * comments for each of these functions for details. + */ +extern astreamer *astreamer_plain_writer_new(char *pathname, FILE *file); +extern astreamer *astreamer_gzip_writer_new(char *pathname, FILE *file, + pg_compress_specification *compress); +extern astreamer *astreamer_extractor_new(const char *basepath, + const char *(*link_map) (const char *), + void (*report_output_file) (const char *)); + +extern astreamer *astreamer_gzip_decompressor_new(astreamer *next); +extern astreamer *astreamer_lz4_compressor_new(astreamer *next, + pg_compress_specification *compress); +extern astreamer *astreamer_lz4_decompressor_new(astreamer *next); +extern astreamer *astreamer_zstd_compressor_new(astreamer *next, + pg_compress_specification *compress); +extern astreamer *astreamer_zstd_decompressor_new(astreamer *next); +extern astreamer *astreamer_tar_parser_new(astreamer *next); +extern astreamer *astreamer_tar_terminator_new(astreamer *next); +extern astreamer *astreamer_tar_archiver_new(astreamer *next); + +extern astreamer *astreamer_recovery_injector_new(astreamer *next, + bool is_recovery_guc_supported, + PQExpBuffer recoveryconfcontents); +extern void astreamer_inject_file(astreamer *streamer, char *pathname, + char *data, int len); + +#endif diff --git a/src/bin/pg_basebackup/bbstreamer_file.c b/src/bin/pg_basebackup/astreamer_file.c index bab6cd4a6b1..2742385e103 100644 --- a/src/bin/pg_basebackup/bbstreamer_file.c +++ b/src/bin/pg_basebackup/astreamer_file.c @@ -1,11 +1,11 @@ /*------------------------------------------------------------------------- * - * bbstreamer_file.c + * astreamer_file.c * * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group * * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_file.c + * src/bin/pg_basebackup/astreamer_file.c *------------------------------------------------------------------------- */ @@ -13,60 +13,60 @@ #include <unistd.h> -#include "bbstreamer.h" +#include "astreamer.h" #include "common/file_perm.h" #include "common/logging.h" #include "common/string.h" -typedef struct bbstreamer_plain_writer +typedef struct astreamer_plain_writer { - bbstreamer base; + astreamer base; char *pathname; FILE *file; bool should_close_file; -} bbstreamer_plain_writer; +} astreamer_plain_writer; -typedef struct bbstreamer_extractor +typedef struct astreamer_extractor { - bbstreamer base; + astreamer base; char *basepath; const char *(*link_map) (const char *); void (*report_output_file) (const char *); char filename[MAXPGPATH]; FILE *file; -} bbstreamer_extractor; - -static void bbstreamer_plain_writer_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_plain_writer_finalize(bbstreamer *streamer); -static void bbstreamer_plain_writer_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_plain_writer_ops = { - .content = bbstreamer_plain_writer_content, - .finalize = bbstreamer_plain_writer_finalize, - .free = bbstreamer_plain_writer_free +} astreamer_extractor; + +static void astreamer_plain_writer_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_plain_writer_finalize(astreamer *streamer); +static void astreamer_plain_writer_free(astreamer *streamer); + +static const astreamer_ops astreamer_plain_writer_ops = { + .content = astreamer_plain_writer_content, + .finalize = astreamer_plain_writer_finalize, + .free = astreamer_plain_writer_free }; -static void bbstreamer_extractor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_extractor_finalize(bbstreamer *streamer); -static void bbstreamer_extractor_free(bbstreamer *streamer); +static void astreamer_extractor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_extractor_finalize(astreamer *streamer); +static void astreamer_extractor_free(astreamer *streamer); static void extract_directory(const char *filename, mode_t mode); static void extract_link(const char *filename, const char *linktarget); static FILE *create_file_for_extract(const char *filename, mode_t mode); -static const bbstreamer_ops bbstreamer_extractor_ops = { - .content = bbstreamer_extractor_content, - .finalize = bbstreamer_extractor_finalize, - .free = bbstreamer_extractor_free +static const astreamer_ops astreamer_extractor_ops = { + .content = astreamer_extractor_content, + .finalize = astreamer_extractor_finalize, + .free = astreamer_extractor_free }; /* - * Create a bbstreamer that just writes data to a file. + * Create a astreamer that just writes data to a file. * * The caller must specify a pathname and may specify a file. The pathname is * used for error-reporting purposes either way. If file is NULL, the pathname @@ -74,14 +74,14 @@ static const bbstreamer_ops bbstreamer_extractor_ops = { * for writing and closed when done. If file is not NULL, the data is written * there. */ -bbstreamer * -bbstreamer_plain_writer_new(char *pathname, FILE *file) +astreamer * +astreamer_plain_writer_new(char *pathname, FILE *file) { - bbstreamer_plain_writer *streamer; + astreamer_plain_writer *streamer; - streamer = palloc0(sizeof(bbstreamer_plain_writer)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_plain_writer_ops; + streamer = palloc0(sizeof(astreamer_plain_writer)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_plain_writer_ops; streamer->pathname = pstrdup(pathname); streamer->file = file; @@ -101,13 +101,13 @@ bbstreamer_plain_writer_new(char *pathname, FILE *file) * Write archive content to file. */ static void -bbstreamer_plain_writer_content(bbstreamer *streamer, - bbstreamer_member *member, const char *data, - int len, bbstreamer_archive_context context) +astreamer_plain_writer_content(astreamer *streamer, + astreamer_member *member, const char *data, + int len, astreamer_archive_context context) { - bbstreamer_plain_writer *mystreamer; + astreamer_plain_writer *mystreamer; - mystreamer = (bbstreamer_plain_writer *) streamer; + mystreamer = (astreamer_plain_writer *) streamer; if (len == 0) return; @@ -128,11 +128,11 @@ bbstreamer_plain_writer_content(bbstreamer *streamer, * the file if we opened it, but not if the caller provided it. */ static void -bbstreamer_plain_writer_finalize(bbstreamer *streamer) +astreamer_plain_writer_finalize(astreamer *streamer) { - bbstreamer_plain_writer *mystreamer; + astreamer_plain_writer *mystreamer; - mystreamer = (bbstreamer_plain_writer *) streamer; + mystreamer = (astreamer_plain_writer *) streamer; if (mystreamer->should_close_file && fclose(mystreamer->file) != 0) pg_fatal("could not close file \"%s\": %m", @@ -143,14 +143,14 @@ bbstreamer_plain_writer_finalize(bbstreamer *streamer) } /* - * Free memory associated with this bbstreamer. + * Free memory associated with this astreamer. */ static void -bbstreamer_plain_writer_free(bbstreamer *streamer) +astreamer_plain_writer_free(astreamer *streamer) { - bbstreamer_plain_writer *mystreamer; + astreamer_plain_writer *mystreamer; - mystreamer = (bbstreamer_plain_writer *) streamer; + mystreamer = (astreamer_plain_writer *) streamer; Assert(!mystreamer->should_close_file); Assert(mystreamer->base.bbs_next == NULL); @@ -160,13 +160,13 @@ bbstreamer_plain_writer_free(bbstreamer *streamer) } /* - * Create a bbstreamer that extracts an archive. + * Create a astreamer that extracts an archive. * * All pathnames in the archive are interpreted relative to basepath. * - * Unlike e.g. bbstreamer_plain_writer_new() we can't do anything useful here + * Unlike e.g. astreamer_plain_writer_new() we can't do anything useful here * with untyped chunks; we need typed chunks which follow the rules described - * in bbstreamer.h. Assuming we have that, we don't need to worry about the + * in astreamer.h. Assuming we have that, we don't need to worry about the * original archive format; it's enough to just look at the member information * provided and write to the corresponding file. * @@ -179,16 +179,16 @@ bbstreamer_plain_writer_free(bbstreamer *streamer) * new output file. The pathname to that file is passed as an argument. If * NULL, the call is skipped. */ -bbstreamer * -bbstreamer_extractor_new(const char *basepath, - const char *(*link_map) (const char *), - void (*report_output_file) (const char *)) +astreamer * +astreamer_extractor_new(const char *basepath, + const char *(*link_map) (const char *), + void (*report_output_file) (const char *)) { - bbstreamer_extractor *streamer; + astreamer_extractor *streamer; - streamer = palloc0(sizeof(bbstreamer_extractor)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_extractor_ops; + streamer = palloc0(sizeof(astreamer_extractor)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_extractor_ops; streamer->basepath = pstrdup(basepath); streamer->link_map = link_map; streamer->report_output_file = report_output_file; @@ -200,19 +200,19 @@ bbstreamer_extractor_new(const char *basepath, * Extract archive contents to the filesystem. */ static void -bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_extractor_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer; + astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; int fnamelen; - Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER); - Assert(context != BBSTREAMER_UNKNOWN); + Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER); + Assert(context != ASTREAMER_UNKNOWN); switch (context) { - case BBSTREAMER_MEMBER_HEADER: + case ASTREAMER_MEMBER_HEADER: Assert(mystreamer->file == NULL); /* Prepend basepath. */ @@ -245,7 +245,7 @@ bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member, mystreamer->report_output_file(mystreamer->filename); break; - case BBSTREAMER_MEMBER_CONTENTS: + case ASTREAMER_MEMBER_CONTENTS: if (mystreamer->file == NULL) break; @@ -260,14 +260,14 @@ bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member, } break; - case BBSTREAMER_MEMBER_TRAILER: + case ASTREAMER_MEMBER_TRAILER: if (mystreamer->file == NULL) break; fclose(mystreamer->file); mystreamer->file = NULL; break; - case BBSTREAMER_ARCHIVE_TRAILER: + case ASTREAMER_ARCHIVE_TRAILER: break; default: @@ -375,10 +375,10 @@ create_file_for_extract(const char *filename, mode_t mode) * There's nothing to do here but sanity checking. */ static void -bbstreamer_extractor_finalize(bbstreamer *streamer) +astreamer_extractor_finalize(astreamer *streamer) { - bbstreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY - = (bbstreamer_extractor *) streamer; + astreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY + = (astreamer_extractor *) streamer; Assert(mystreamer->file == NULL); } @@ -387,9 +387,9 @@ bbstreamer_extractor_finalize(bbstreamer *streamer) * Free memory. */ static void -bbstreamer_extractor_free(bbstreamer *streamer) +astreamer_extractor_free(astreamer *streamer) { - bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer; + astreamer_extractor *mystreamer = (astreamer_extractor *) streamer; pfree(mystreamer->basepath); pfree(mystreamer); diff --git a/src/bin/pg_basebackup/bbstreamer_gzip.c b/src/bin/pg_basebackup/astreamer_gzip.c index 0417fd9bc2c..6f7c27afbbc 100644 --- a/src/bin/pg_basebackup/bbstreamer_gzip.c +++ b/src/bin/pg_basebackup/astreamer_gzip.c @@ -1,11 +1,11 @@ /*------------------------------------------------------------------------- * - * bbstreamer_gzip.c + * astreamer_gzip.c * * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group * * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_gzip.c + * src/bin/pg_basebackup/astreamer_gzip.c *------------------------------------------------------------------------- */ @@ -17,74 +17,74 @@ #include <zlib.h> #endif -#include "bbstreamer.h" +#include "astreamer.h" #include "common/file_perm.h" #include "common/logging.h" #include "common/string.h" #ifdef HAVE_LIBZ -typedef struct bbstreamer_gzip_writer +typedef struct astreamer_gzip_writer { - bbstreamer base; + astreamer base; char *pathname; gzFile gzfile; -} bbstreamer_gzip_writer; +} astreamer_gzip_writer; -typedef struct bbstreamer_gzip_decompressor +typedef struct astreamer_gzip_decompressor { - bbstreamer base; + astreamer base; z_stream zstream; size_t bytes_written; -} bbstreamer_gzip_decompressor; - -static void bbstreamer_gzip_writer_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer); -static void bbstreamer_gzip_writer_free(bbstreamer *streamer); +} astreamer_gzip_decompressor; + +static void astreamer_gzip_writer_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_gzip_writer_finalize(astreamer *streamer); +static void astreamer_gzip_writer_free(astreamer *streamer); static const char *get_gz_error(gzFile gzf); -static const bbstreamer_ops bbstreamer_gzip_writer_ops = { - .content = bbstreamer_gzip_writer_content, - .finalize = bbstreamer_gzip_writer_finalize, - .free = bbstreamer_gzip_writer_free +static const astreamer_ops astreamer_gzip_writer_ops = { + .content = astreamer_gzip_writer_content, + .finalize = astreamer_gzip_writer_finalize, + .free = astreamer_gzip_writer_free }; -static void bbstreamer_gzip_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer); -static void bbstreamer_gzip_decompressor_free(bbstreamer *streamer); +static void astreamer_gzip_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_gzip_decompressor_finalize(astreamer *streamer); +static void astreamer_gzip_decompressor_free(astreamer *streamer); static void *gzip_palloc(void *opaque, unsigned items, unsigned size); static void gzip_pfree(void *opaque, void *address); -static const bbstreamer_ops bbstreamer_gzip_decompressor_ops = { - .content = bbstreamer_gzip_decompressor_content, - .finalize = bbstreamer_gzip_decompressor_finalize, - .free = bbstreamer_gzip_decompressor_free +static const astreamer_ops astreamer_gzip_decompressor_ops = { + .content = astreamer_gzip_decompressor_content, + .finalize = astreamer_gzip_decompressor_finalize, + .free = astreamer_gzip_decompressor_free }; #endif /* - * Create a bbstreamer that just compresses data using gzip, and then writes + * Create a astreamer that just compresses data using gzip, and then writes * it to a file. * - * As in the case of bbstreamer_plain_writer_new, pathname is always used + * As in the case of astreamer_plain_writer_new, pathname is always used * for error reporting purposes; if file is NULL, it is also the opened and * closed so that the data may be written there. */ -bbstreamer * -bbstreamer_gzip_writer_new(char *pathname, FILE *file, - pg_compress_specification *compress) +astreamer * +astreamer_gzip_writer_new(char *pathname, FILE *file, + pg_compress_specification *compress) { #ifdef HAVE_LIBZ - bbstreamer_gzip_writer *streamer; + astreamer_gzip_writer *streamer; - streamer = palloc0(sizeof(bbstreamer_gzip_writer)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_gzip_writer_ops; + streamer = palloc0(sizeof(astreamer_gzip_writer)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_gzip_writer_ops; streamer->pathname = pstrdup(pathname); @@ -123,13 +123,13 @@ bbstreamer_gzip_writer_new(char *pathname, FILE *file, * Write archive content to gzip file. */ static void -bbstreamer_gzip_writer_content(bbstreamer *streamer, - bbstreamer_member *member, const char *data, - int len, bbstreamer_archive_context context) +astreamer_gzip_writer_content(astreamer *streamer, + astreamer_member *member, const char *data, + int len, astreamer_archive_context context) { - bbstreamer_gzip_writer *mystreamer; + astreamer_gzip_writer *mystreamer; - mystreamer = (bbstreamer_gzip_writer *) streamer; + mystreamer = (astreamer_gzip_writer *) streamer; if (len == 0) return; @@ -151,16 +151,16 @@ bbstreamer_gzip_writer_content(bbstreamer *streamer, * * It makes no difference whether we opened the file or the caller did it, * because libz provides no way of avoiding a close on the underlying file - * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to + * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to * work around this issue, so that the behavior from the caller's viewpoint - * is the same as for bbstreamer_plain_writer. + * is the same as for astreamer_plain_writer. */ static void -bbstreamer_gzip_writer_finalize(bbstreamer *streamer) +astreamer_gzip_writer_finalize(astreamer *streamer) { - bbstreamer_gzip_writer *mystreamer; + astreamer_gzip_writer *mystreamer; - mystreamer = (bbstreamer_gzip_writer *) streamer; + mystreamer = (astreamer_gzip_writer *) streamer; errno = 0; /* in case gzclose() doesn't set it */ if (gzclose(mystreamer->gzfile) != 0) @@ -171,14 +171,14 @@ bbstreamer_gzip_writer_finalize(bbstreamer *streamer) } /* - * Free memory associated with this bbstreamer. + * Free memory associated with this astreamer. */ static void -bbstreamer_gzip_writer_free(bbstreamer *streamer) +astreamer_gzip_writer_free(astreamer *streamer) { - bbstreamer_gzip_writer *mystreamer; + astreamer_gzip_writer *mystreamer; - mystreamer = (bbstreamer_gzip_writer *) streamer; + mystreamer = (astreamer_gzip_writer *) streamer; Assert(mystreamer->base.bbs_next == NULL); Assert(mystreamer->gzfile == NULL); @@ -208,18 +208,18 @@ get_gz_error(gzFile gzf) * Create a new base backup streamer that performs decompression of gzip * compressed blocks. */ -bbstreamer * -bbstreamer_gzip_decompressor_new(bbstreamer *next) +astreamer * +astreamer_gzip_decompressor_new(astreamer *next) { #ifdef HAVE_LIBZ - bbstreamer_gzip_decompressor *streamer; + astreamer_gzip_decompressor *streamer; z_stream *zs; Assert(next != NULL); - streamer = palloc0(sizeof(bbstreamer_gzip_decompressor)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_gzip_decompressor_ops; + streamer = palloc0(sizeof(astreamer_gzip_decompressor)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_gzip_decompressor_ops; streamer->base.bbs_next = next; initStringInfo(&streamer->base.bbs_buffer); @@ -258,15 +258,15 @@ bbstreamer_gzip_decompressor_new(bbstreamer *next) * to the next streamer. */ static void -bbstreamer_gzip_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_gzip_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_gzip_decompressor *mystreamer; + astreamer_gzip_decompressor *mystreamer; z_stream *zs; - mystreamer = (bbstreamer_gzip_decompressor *) streamer; + mystreamer = (astreamer_gzip_decompressor *) streamer; zs = &mystreamer->zstream; zs->next_in = (const uint8 *) data; @@ -301,9 +301,9 @@ bbstreamer_gzip_decompressor_content(bbstreamer *streamer, /* If output buffer is full then pass data to next streamer */ if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, context); + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, context); mystreamer->bytes_written = 0; } } @@ -313,31 +313,31 @@ bbstreamer_gzip_decompressor_content(bbstreamer *streamer, * End-of-stream processing. */ static void -bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer) +astreamer_gzip_decompressor_finalize(astreamer *streamer) { - bbstreamer_gzip_decompressor *mystreamer; + astreamer_gzip_decompressor *mystreamer; - mystreamer = (bbstreamer_gzip_decompressor *) streamer; + mystreamer = (astreamer_gzip_decompressor *) streamer; /* * End of the stream, if there is some pending data in output buffers then * we must forward it to next streamer. */ - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - BBSTREAMER_UNKNOWN); + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); - bbstreamer_finalize(mystreamer->base.bbs_next); + astreamer_finalize(mystreamer->base.bbs_next); } /* * Free memory. */ static void -bbstreamer_gzip_decompressor_free(bbstreamer *streamer) +astreamer_gzip_decompressor_free(astreamer *streamer) { - bbstreamer_free(streamer->bbs_next); + astreamer_free(streamer->bbs_next); pfree(streamer->bbs_buffer.data); pfree(streamer); } diff --git a/src/bin/pg_basebackup/bbstreamer_inject.c b/src/bin/pg_basebackup/astreamer_inject.c index 194026b56e9..7f1decded8d 100644 --- a/src/bin/pg_basebackup/bbstreamer_inject.c +++ b/src/bin/pg_basebackup/astreamer_inject.c @@ -1,51 +1,51 @@ /*------------------------------------------------------------------------- * - * bbstreamer_inject.c + * astreamer_inject.c * * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group * * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_inject.c + * src/bin/pg_basebackup/astreamer_inject.c *------------------------------------------------------------------------- */ #include "postgres_fe.h" -#include "bbstreamer.h" +#include "astreamer.h" #include "common/file_perm.h" #include "common/logging.h" -typedef struct bbstreamer_recovery_injector +typedef struct astreamer_recovery_injector { - bbstreamer base; + astreamer base; bool skip_file; bool is_recovery_guc_supported; bool is_postgresql_auto_conf; bool found_postgresql_auto_conf; PQExpBuffer recoveryconfcontents; - bbstreamer_member member; -} bbstreamer_recovery_injector; - -static void bbstreamer_recovery_injector_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_recovery_injector_finalize(bbstreamer *streamer); -static void bbstreamer_recovery_injector_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_recovery_injector_ops = { - .content = bbstreamer_recovery_injector_content, - .finalize = bbstreamer_recovery_injector_finalize, - .free = bbstreamer_recovery_injector_free + astreamer_member member; +} astreamer_recovery_injector; + +static void astreamer_recovery_injector_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_recovery_injector_finalize(astreamer *streamer); +static void astreamer_recovery_injector_free(astreamer *streamer); + +static const astreamer_ops astreamer_recovery_injector_ops = { + .content = astreamer_recovery_injector_content, + .finalize = astreamer_recovery_injector_finalize, + .free = astreamer_recovery_injector_free }; /* - * Create a bbstreamer that can edit recoverydata into an archive stream. + * Create a astreamer that can edit recoverydata into an archive stream. * - * The input should be a series of typed chunks (not BBSTREAMER_UNKNOWN) as - * per the conventions described in bbstreamer.h; the chunks forwarded to - * the next bbstreamer will be similarly typed, but the - * BBSTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've + * The input should be a series of typed chunks (not ASTREAMER_UNKNOWN) as + * per the conventions described in astreamer.h; the chunks forwarded to + * the next astreamer will be similarly typed, but the + * ASTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've * edited the archive stream. * * Our goal is to do one of the following three things with the content passed @@ -61,16 +61,16 @@ static const bbstreamer_ops bbstreamer_recovery_injector_ops = { * zero-length standby.signal file, dropping any file with that name from * the archive. */ -bbstreamer * -bbstreamer_recovery_injector_new(bbstreamer *next, - bool is_recovery_guc_supported, - PQExpBuffer recoveryconfcontents) +astreamer * +astreamer_recovery_injector_new(astreamer *next, + bool is_recovery_guc_supported, + PQExpBuffer recoveryconfcontents) { - bbstreamer_recovery_injector *streamer; + astreamer_recovery_injector *streamer; - streamer = palloc0(sizeof(bbstreamer_recovery_injector)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_recovery_injector_ops; + streamer = palloc0(sizeof(astreamer_recovery_injector)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_recovery_injector_ops; streamer->base.bbs_next = next; streamer->is_recovery_guc_supported = is_recovery_guc_supported; streamer->recoveryconfcontents = recoveryconfcontents; @@ -82,21 +82,21 @@ bbstreamer_recovery_injector_new(bbstreamer *next, * Handle each chunk of tar content while injecting recovery configuration. */ static void -bbstreamer_recovery_injector_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_recovery_injector_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_recovery_injector *mystreamer; + astreamer_recovery_injector *mystreamer; - mystreamer = (bbstreamer_recovery_injector *) streamer; - Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER); + mystreamer = (astreamer_recovery_injector *) streamer; + Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER); switch (context) { - case BBSTREAMER_MEMBER_HEADER: + case ASTREAMER_MEMBER_HEADER: /* Must copy provided data so we have the option to modify it. */ - memcpy(&mystreamer->member, member, sizeof(bbstreamer_member)); + memcpy(&mystreamer->member, member, sizeof(astreamer_member)); /* * On v12+, skip standby.signal and edit postgresql.auto.conf; on @@ -119,8 +119,8 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer, /* * Zap data and len because the archive header is no - * longer valid; some subsequent bbstreamer must - * regenerate it if it's necessary. + * longer valid; some subsequent astreamer must regenerate + * it if it's necessary. */ data = NULL; len = 0; @@ -135,26 +135,26 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer, return; break; - case BBSTREAMER_MEMBER_CONTENTS: + case ASTREAMER_MEMBER_CONTENTS: /* Do not forward if the file is to be skipped. */ if (mystreamer->skip_file) return; break; - case BBSTREAMER_MEMBER_TRAILER: + case ASTREAMER_MEMBER_TRAILER: /* Do not forward it the file is to be skipped. */ if (mystreamer->skip_file) return; /* Append provided content to whatever we already sent. */ if (mystreamer->is_postgresql_auto_conf) - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->recoveryconfcontents->data, - mystreamer->recoveryconfcontents->len, - BBSTREAMER_MEMBER_CONTENTS); + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len, + ASTREAMER_MEMBER_CONTENTS); break; - case BBSTREAMER_ARCHIVE_TRAILER: + case ASTREAMER_ARCHIVE_TRAILER: if (mystreamer->is_recovery_guc_supported) { /* @@ -163,22 +163,22 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer, * member now. */ if (!mystreamer->found_postgresql_auto_conf) - bbstreamer_inject_file(mystreamer->base.bbs_next, - "postgresql.auto.conf", - mystreamer->recoveryconfcontents->data, - mystreamer->recoveryconfcontents->len); + astreamer_inject_file(mystreamer->base.bbs_next, + "postgresql.auto.conf", + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len); /* Inject empty standby.signal file. */ - bbstreamer_inject_file(mystreamer->base.bbs_next, - "standby.signal", "", 0); + astreamer_inject_file(mystreamer->base.bbs_next, + "standby.signal", "", 0); } else { /* Inject recovery.conf file with specified contents. */ - bbstreamer_inject_file(mystreamer->base.bbs_next, - "recovery.conf", - mystreamer->recoveryconfcontents->data, - mystreamer->recoveryconfcontents->len); + astreamer_inject_file(mystreamer->base.bbs_next, + "recovery.conf", + mystreamer->recoveryconfcontents->data, + mystreamer->recoveryconfcontents->len); } /* Nothing to do here. */ @@ -189,26 +189,26 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer, pg_fatal("unexpected state while injecting recovery settings"); } - bbstreamer_content(mystreamer->base.bbs_next, &mystreamer->member, - data, len, context); + astreamer_content(mystreamer->base.bbs_next, &mystreamer->member, + data, len, context); } /* - * End-of-stream processing for this bbstreamer. + * End-of-stream processing for this astreamer. */ static void -bbstreamer_recovery_injector_finalize(bbstreamer *streamer) +astreamer_recovery_injector_finalize(astreamer *streamer) { - bbstreamer_finalize(streamer->bbs_next); + astreamer_finalize(streamer->bbs_next); } /* - * Free memory associated with this bbstreamer. + * Free memory associated with this astreamer. */ static void -bbstreamer_recovery_injector_free(bbstreamer *streamer) +astreamer_recovery_injector_free(astreamer *streamer) { - bbstreamer_free(streamer->bbs_next); + astreamer_free(streamer->bbs_next); pfree(streamer); } @@ -216,10 +216,10 @@ bbstreamer_recovery_injector_free(bbstreamer *streamer) * Inject a member into the archive with specified contents. */ void -bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data, - int len) +astreamer_inject_file(astreamer *streamer, char *pathname, char *data, + int len) { - bbstreamer_member member; + astreamer_member member; strlcpy(member.pathname, pathname, MAXPGPATH); member.size = len; @@ -238,12 +238,12 @@ bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data, /* * We don't know here how to generate valid member headers and trailers * for the archiving format in use, so if those are needed, some successor - * bbstreamer will have to generate them using the data from 'member'. + * astreamer will have to generate them using the data from 'member'. */ - bbstreamer_content(streamer, &member, NULL, 0, - BBSTREAMER_MEMBER_HEADER); - bbstreamer_content(streamer, &member, data, len, - BBSTREAMER_MEMBER_CONTENTS); - bbstreamer_content(streamer, &member, NULL, 0, - BBSTREAMER_MEMBER_TRAILER); + astreamer_content(streamer, &member, NULL, 0, + ASTREAMER_MEMBER_HEADER); + astreamer_content(streamer, &member, data, len, + ASTREAMER_MEMBER_CONTENTS); + astreamer_content(streamer, &member, NULL, 0, + ASTREAMER_MEMBER_TRAILER); } diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/astreamer_lz4.c index f5c9e68150c..1c40d7d8ad5 100644 --- a/src/bin/pg_basebackup/bbstreamer_lz4.c +++ b/src/bin/pg_basebackup/astreamer_lz4.c @@ -1,11 +1,11 @@ /*------------------------------------------------------------------------- * - * bbstreamer_lz4.c + * astreamer_lz4.c * * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group * * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_lz4.c + * src/bin/pg_basebackup/astreamer_lz4.c *------------------------------------------------------------------------- */ @@ -17,15 +17,15 @@ #include <lz4frame.h> #endif -#include "bbstreamer.h" +#include "astreamer.h" #include "common/file_perm.h" #include "common/logging.h" #include "common/string.h" #ifdef USE_LZ4 -typedef struct bbstreamer_lz4_frame +typedef struct astreamer_lz4_frame { - bbstreamer base; + astreamer base; LZ4F_compressionContext_t cctx; LZ4F_decompressionContext_t dctx; @@ -33,32 +33,32 @@ typedef struct bbstreamer_lz4_frame size_t bytes_written; bool header_written; -} bbstreamer_lz4_frame; - -static void bbstreamer_lz4_compressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer); -static void bbstreamer_lz4_compressor_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_lz4_compressor_ops = { - .content = bbstreamer_lz4_compressor_content, - .finalize = bbstreamer_lz4_compressor_finalize, - .free = bbstreamer_lz4_compressor_free +} astreamer_lz4_frame; + +static void astreamer_lz4_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_lz4_compressor_finalize(astreamer *streamer); +static void astreamer_lz4_compressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_lz4_compressor_ops = { + .content = astreamer_lz4_compressor_content, + .finalize = astreamer_lz4_compressor_finalize, + .free = astreamer_lz4_compressor_free }; -static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer); -static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_lz4_decompressor_ops = { - .content = bbstreamer_lz4_decompressor_content, - .finalize = bbstreamer_lz4_decompressor_finalize, - .free = bbstreamer_lz4_decompressor_free +static void astreamer_lz4_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_lz4_decompressor_finalize(astreamer *streamer); +static void astreamer_lz4_decompressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_lz4_decompressor_ops = { + .content = astreamer_lz4_decompressor_content, + .finalize = astreamer_lz4_decompressor_finalize, + .free = astreamer_lz4_decompressor_free }; #endif @@ -66,19 +66,19 @@ static const bbstreamer_ops bbstreamer_lz4_decompressor_ops = { * Create a new base backup streamer that performs lz4 compression of tar * blocks. */ -bbstreamer * -bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress) +astreamer * +astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress) { #ifdef USE_LZ4 - bbstreamer_lz4_frame *streamer; + astreamer_lz4_frame *streamer; LZ4F_errorCode_t ctxError; LZ4F_preferences_t *prefs; Assert(next != NULL); - streamer = palloc0(sizeof(bbstreamer_lz4_frame)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_lz4_compressor_ops; + streamer = palloc0(sizeof(astreamer_lz4_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_lz4_compressor_ops; streamer->base.bbs_next = next; initStringInfo(&streamer->base.bbs_buffer); @@ -113,19 +113,19 @@ bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compr * of output buffer to next streamer and empty the buffer. */ static void -bbstreamer_lz4_compressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_lz4_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_lz4_frame *mystreamer; + astreamer_lz4_frame *mystreamer; uint8 *next_in, *next_out; size_t out_bound, compressed_size, avail_out; - mystreamer = (bbstreamer_lz4_frame *) streamer; + mystreamer = (astreamer_lz4_frame *) streamer; next_in = (uint8 *) data; /* Write header before processing the first input chunk. */ @@ -159,10 +159,10 @@ bbstreamer_lz4_compressor_content(bbstreamer *streamer, out_bound = LZ4F_compressBound(len, &mystreamer->prefs); if (avail_out < out_bound) { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - context); + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + context); /* Enlarge buffer if it falls short of out bound. */ if (mystreamer->base.bbs_buffer.maxlen < out_bound) @@ -196,25 +196,25 @@ bbstreamer_lz4_compressor_content(bbstreamer *streamer, * End-of-stream processing. */ static void -bbstreamer_lz4_compressor_finalize(bbstreamer *streamer) +astreamer_lz4_compressor_finalize(astreamer *streamer) { - bbstreamer_lz4_frame *mystreamer; + astreamer_lz4_frame *mystreamer; uint8 *next_out; size_t footer_bound, compressed_size, avail_out; - mystreamer = (bbstreamer_lz4_frame *) streamer; + mystreamer = (astreamer_lz4_frame *) streamer; /* Find out the footer bound and update the output buffer. */ footer_bound = LZ4F_compressBound(0, &mystreamer->prefs); if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) < footer_bound) { - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - BBSTREAMER_UNKNOWN); + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); /* Enlarge buffer if it falls short of footer bound. */ if (mystreamer->base.bbs_buffer.maxlen < footer_bound) @@ -243,24 +243,24 @@ bbstreamer_lz4_compressor_finalize(bbstreamer *streamer) mystreamer->bytes_written += compressed_size; - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->bytes_written, - BBSTREAMER_UNKNOWN); + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->bytes_written, + ASTREAMER_UNKNOWN); - bbstreamer_finalize(mystreamer->base.bbs_next); + astreamer_finalize(mystreamer->base.bbs_next); } /* * Free memory. */ static void -bbstreamer_lz4_compressor_free(bbstreamer *streamer) +astreamer_lz4_compressor_free(astreamer *streamer) { - bbstreamer_lz4_frame *mystreamer; + astreamer_lz4_frame *mystreamer; - mystreamer = (bbstreamer_lz4_frame *) streamer; - bbstreamer_free(streamer->bbs_next); + mystreamer = (astreamer_lz4_frame *) streamer; + astreamer_free(streamer->bbs_next); LZ4F_freeCompressionContext(mystreamer->cctx); pfree(streamer->bbs_buffer.data); pfree(streamer); @@ -271,18 +271,18 @@ bbstreamer_lz4_compressor_free(bbstreamer *streamer) * Create a new base backup streamer that performs decompression of lz4 * compressed blocks. */ -bbstreamer * -bbstreamer_lz4_decompressor_new(bbstreamer *next) +astreamer * +astreamer_lz4_decompressor_new(astreamer *next) { #ifdef USE_LZ4 - bbstreamer_lz4_frame *streamer; + astreamer_lz4_frame *streamer; LZ4F_errorCode_t ctxError; Assert(next != NULL); - streamer = palloc0(sizeof(bbstreamer_lz4_frame)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_lz4_decompressor_ops; + streamer = palloc0(sizeof(astreamer_lz4_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_lz4_decompressor_ops; streamer->base.bbs_next = next; initStringInfo(&streamer->base.bbs_buffer); @@ -307,18 +307,18 @@ bbstreamer_lz4_decompressor_new(bbstreamer *next) * to the next streamer. */ static void -bbstreamer_lz4_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_lz4_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_lz4_frame *mystreamer; + astreamer_lz4_frame *mystreamer; uint8 *next_in, *next_out; size_t avail_in, avail_out; - mystreamer = (bbstreamer_lz4_frame *) streamer; + mystreamer = (astreamer_lz4_frame *) streamer; next_in = (uint8 *) data; next_out = (uint8 *) mystreamer->base.bbs_buffer.data; avail_in = len; @@ -366,10 +366,10 @@ bbstreamer_lz4_decompressor_content(bbstreamer *streamer, */ if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen) { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - context); + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + context); avail_out = mystreamer->base.bbs_buffer.maxlen; mystreamer->bytes_written = 0; @@ -387,34 +387,34 @@ bbstreamer_lz4_decompressor_content(bbstreamer *streamer, * End-of-stream processing. */ static void -bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer) +astreamer_lz4_decompressor_finalize(astreamer *streamer) { - bbstreamer_lz4_frame *mystreamer; + astreamer_lz4_frame *mystreamer; - mystreamer = (bbstreamer_lz4_frame *) streamer; + mystreamer = (astreamer_lz4_frame *) streamer; /* * End of the stream, if there is some pending data in output buffers then * we must forward it to next streamer. */ - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - BBSTREAMER_UNKNOWN); + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); - bbstreamer_finalize(mystreamer->base.bbs_next); + astreamer_finalize(mystreamer->base.bbs_next); } /* * Free memory. */ static void -bbstreamer_lz4_decompressor_free(bbstreamer *streamer) +astreamer_lz4_decompressor_free(astreamer *streamer) { - bbstreamer_lz4_frame *mystreamer; + astreamer_lz4_frame *mystreamer; - mystreamer = (bbstreamer_lz4_frame *) streamer; - bbstreamer_free(streamer->bbs_next); + mystreamer = (astreamer_lz4_frame *) streamer; + astreamer_free(streamer->bbs_next); LZ4F_freeDecompressionContext(mystreamer->dctx); pfree(streamer->bbs_buffer.data); pfree(streamer); diff --git a/src/bin/pg_basebackup/bbstreamer_tar.c b/src/bin/pg_basebackup/astreamer_tar.c index 9137d17ddc1..673690cd18f 100644 --- a/src/bin/pg_basebackup/bbstreamer_tar.c +++ b/src/bin/pg_basebackup/astreamer_tar.c @@ -1,13 +1,13 @@ /*------------------------------------------------------------------------- * - * bbstreamer_tar.c + * astreamer_tar.c * * This module implements three types of tar processing. A tar parser - * expects unlabelled chunks of data (e.g. BBSTREAMER_UNKNOWN) and splits - * it into labelled chunks (any other value of bbstreamer_archive_context). + * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits + * it into labelled chunks (any other value of astreamer_archive_context). * A tar archiver does the reverse: it takes a bunch of labelled chunks * and produces a tarfile, optionally replacing member headers and trailers - * so that upstream bbstreamer objects can perform surgery on the tarfile + * so that upstream astreamer objects can perform surgery on the tarfile * contents without knowing the details of the tar format. A tar terminator * just adds two blocks of NUL bytes to the end of the file, since older * server versions produce files with this terminator omitted. @@ -15,7 +15,7 @@ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group * * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_tar.c + * src/bin/pg_basebackup/astreamer_tar.c *------------------------------------------------------------------------- */ @@ -23,83 +23,83 @@ #include <time.h> -#include "bbstreamer.h" +#include "astreamer.h" #include "common/logging.h" #include "pgtar.h" -typedef struct bbstreamer_tar_parser +typedef struct astreamer_tar_parser { - bbstreamer base; - bbstreamer_archive_context next_context; - bbstreamer_member member; + astreamer base; + astreamer_archive_context next_context; + astreamer_member member; size_t file_bytes_sent; size_t pad_bytes_expected; -} bbstreamer_tar_parser; +} astreamer_tar_parser; -typedef struct bbstreamer_tar_archiver +typedef struct astreamer_tar_archiver { - bbstreamer base; + astreamer base; bool rearchive_member; -} bbstreamer_tar_archiver; - -static void bbstreamer_tar_parser_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_tar_parser_finalize(bbstreamer *streamer); -static void bbstreamer_tar_parser_free(bbstreamer *streamer); -static bool bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer); - -static const bbstreamer_ops bbstreamer_tar_parser_ops = { - .content = bbstreamer_tar_parser_content, - .finalize = bbstreamer_tar_parser_finalize, - .free = bbstreamer_tar_parser_free +} astreamer_tar_archiver; + +static void astreamer_tar_parser_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_parser_finalize(astreamer *streamer); +static void astreamer_tar_parser_free(astreamer *streamer); +static bool astreamer_tar_header(astreamer_tar_parser *mystreamer); + +static const astreamer_ops astreamer_tar_parser_ops = { + .content = astreamer_tar_parser_content, + .finalize = astreamer_tar_parser_finalize, + .free = astreamer_tar_parser_free }; -static void bbstreamer_tar_archiver_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_tar_archiver_finalize(bbstreamer *streamer); -static void bbstreamer_tar_archiver_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_tar_archiver_ops = { - .content = bbstreamer_tar_archiver_content, - .finalize = bbstreamer_tar_archiver_finalize, - .free = bbstreamer_tar_archiver_free +static void astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_archiver_finalize(astreamer *streamer); +static void astreamer_tar_archiver_free(astreamer *streamer); + +static const astreamer_ops astreamer_tar_archiver_ops = { + .content = astreamer_tar_archiver_content, + .finalize = astreamer_tar_archiver_finalize, + .free = astreamer_tar_archiver_free }; -static void bbstreamer_tar_terminator_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_tar_terminator_finalize(bbstreamer *streamer); -static void bbstreamer_tar_terminator_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_tar_terminator_ops = { - .content = bbstreamer_tar_terminator_content, - .finalize = bbstreamer_tar_terminator_finalize, - .free = bbstreamer_tar_terminator_free +static void astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_tar_terminator_finalize(astreamer *streamer); +static void astreamer_tar_terminator_free(astreamer *streamer); + +static const astreamer_ops astreamer_tar_terminator_ops = { + .content = astreamer_tar_terminator_content, + .finalize = astreamer_tar_terminator_finalize, + .free = astreamer_tar_terminator_free }; /* - * Create a bbstreamer that can parse a stream of content as tar data. + * Create a astreamer that can parse a stream of content as tar data. * - * The input should be a series of BBSTREAMER_UNKNOWN chunks; the bbstreamer + * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer * specified by 'next' will receive a series of typed chunks, as per the - * conventions described in bbstreamer.h. + * conventions described in astreamer.h. */ -bbstreamer * -bbstreamer_tar_parser_new(bbstreamer *next) +astreamer * +astreamer_tar_parser_new(astreamer *next) { - bbstreamer_tar_parser *streamer; + astreamer_tar_parser *streamer; - streamer = palloc0(sizeof(bbstreamer_tar_parser)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_tar_parser_ops; + streamer = palloc0(sizeof(astreamer_tar_parser)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_parser_ops; streamer->base.bbs_next = next; initStringInfo(&streamer->base.bbs_buffer); - streamer->next_context = BBSTREAMER_MEMBER_HEADER; + streamer->next_context = ASTREAMER_MEMBER_HEADER; return &streamer->base; } @@ -108,29 +108,29 @@ bbstreamer_tar_parser_new(bbstreamer *next) * Parse unknown content as tar data. */ static void -bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer; + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; size_t nbytes; /* Expect unparsed input. */ Assert(member == NULL); - Assert(context == BBSTREAMER_UNKNOWN); + Assert(context == ASTREAMER_UNKNOWN); while (len > 0) { switch (mystreamer->next_context) { - case BBSTREAMER_MEMBER_HEADER: + case ASTREAMER_MEMBER_HEADER: /* * If we're expecting an archive member header, accumulate a * full block of data before doing anything further. */ - if (!bbstreamer_buffer_until(streamer, &data, &len, - TAR_BLOCK_SIZE)) + if (!astreamer_buffer_until(streamer, &data, &len, + TAR_BLOCK_SIZE)) return; /* @@ -139,32 +139,32 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member, * thought was the next file header is actually the start of * the archive trailer. Switch modes accordingly. */ - if (bbstreamer_tar_header(mystreamer)) + if (astreamer_tar_header(mystreamer)) { if (mystreamer->member.size == 0) { /* No content; trailer is zero-length. */ - bbstreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - NULL, 0, - BBSTREAMER_MEMBER_TRAILER); + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); /* Expect next header. */ - mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; } else { /* Expect contents. */ - mystreamer->next_context = BBSTREAMER_MEMBER_CONTENTS; + mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS; } mystreamer->base.bbs_buffer.len = 0; mystreamer->file_bytes_sent = 0; } else - mystreamer->next_context = BBSTREAMER_ARCHIVE_TRAILER; + mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER; break; - case BBSTREAMER_MEMBER_CONTENTS: + case ASTREAMER_MEMBER_CONTENTS: /* * Send as much content as we have, but not more than the @@ -174,10 +174,10 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member, nbytes = mystreamer->member.size - mystreamer->file_bytes_sent; nbytes = Min(nbytes, len); Assert(nbytes > 0); - bbstreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - data, nbytes, - BBSTREAMER_MEMBER_CONTENTS); + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, nbytes, + ASTREAMER_MEMBER_CONTENTS); mystreamer->file_bytes_sent += nbytes; data += nbytes; len -= nbytes; @@ -193,53 +193,53 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member, if (mystreamer->pad_bytes_expected == 0) { /* Trailer is zero-length. */ - bbstreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - NULL, 0, - BBSTREAMER_MEMBER_TRAILER); + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + NULL, 0, + ASTREAMER_MEMBER_TRAILER); /* Expect next header. */ - mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; } else { /* Trailer is not zero-length. */ - mystreamer->next_context = BBSTREAMER_MEMBER_TRAILER; + mystreamer->next_context = ASTREAMER_MEMBER_TRAILER; } mystreamer->base.bbs_buffer.len = 0; } break; - case BBSTREAMER_MEMBER_TRAILER: + case ASTREAMER_MEMBER_TRAILER: /* * If we're expecting an archive member trailer, accumulate * the expected number of padding bytes before sending * anything onward. */ - if (!bbstreamer_buffer_until(streamer, &data, &len, - mystreamer->pad_bytes_expected)) + if (!astreamer_buffer_until(streamer, &data, &len, + mystreamer->pad_bytes_expected)) return; /* OK, now we can send it. */ - bbstreamer_content(mystreamer->base.bbs_next, - &mystreamer->member, - data, mystreamer->pad_bytes_expected, - BBSTREAMER_MEMBER_TRAILER); + astreamer_content(mystreamer->base.bbs_next, + &mystreamer->member, + data, mystreamer->pad_bytes_expected, + ASTREAMER_MEMBER_TRAILER); /* Expect next file header. */ - mystreamer->next_context = BBSTREAMER_MEMBER_HEADER; + mystreamer->next_context = ASTREAMER_MEMBER_HEADER; mystreamer->base.bbs_buffer.len = 0; break; - case BBSTREAMER_ARCHIVE_TRAILER: + case ASTREAMER_ARCHIVE_TRAILER: /* * We've seen an end-of-archive indicator, so anything more is * buffered and sent as part of the archive trailer. But we * don't expect more than 2 blocks. */ - bbstreamer_buffer_bytes(streamer, &data, &len, len); + astreamer_buffer_bytes(streamer, &data, &len, len); if (len > 2 * TAR_BLOCK_SIZE) pg_fatal("tar file trailer exceeds 2 blocks"); return; @@ -255,14 +255,14 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member, * Parse a file header within a tar stream. * * The return value is true if we found a file header and passed it on to the - * next bbstreamer; it is false if we have reached the archive trailer. + * next astreamer; it is false if we have reached the archive trailer. */ static bool -bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer) +astreamer_tar_header(astreamer_tar_parser *mystreamer) { bool has_nonzero_byte = false; int i; - bbstreamer_member *member = &mystreamer->member; + astreamer_member *member = &mystreamer->member; char *buffer = mystreamer->base.bbs_buffer.data; Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE); @@ -304,10 +304,10 @@ bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer) /* Compute number of padding bytes. */ mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size); - /* Forward the entire header to the next bbstreamer. */ - bbstreamer_content(mystreamer->base.bbs_next, member, - buffer, TAR_BLOCK_SIZE, - BBSTREAMER_MEMBER_HEADER); + /* Forward the entire header to the next astreamer. */ + astreamer_content(mystreamer->base.bbs_next, member, + buffer, TAR_BLOCK_SIZE, + ASTREAMER_MEMBER_HEADER); return true; } @@ -316,50 +316,50 @@ bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer) * End-of-stream processing for a tar parser. */ static void -bbstreamer_tar_parser_finalize(bbstreamer *streamer) +astreamer_tar_parser_finalize(astreamer *streamer) { - bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer; + astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer; - if (mystreamer->next_context != BBSTREAMER_ARCHIVE_TRAILER && - (mystreamer->next_context != BBSTREAMER_MEMBER_HEADER || + if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER && + (mystreamer->next_context != ASTREAMER_MEMBER_HEADER || mystreamer->base.bbs_buffer.len > 0)) pg_fatal("COPY stream ended before last file was finished"); /* Send the archive trailer, even if empty. */ - bbstreamer_content(streamer->bbs_next, NULL, - streamer->bbs_buffer.data, streamer->bbs_buffer.len, - BBSTREAMER_ARCHIVE_TRAILER); + astreamer_content(streamer->bbs_next, NULL, + streamer->bbs_buffer.data, streamer->bbs_buffer.len, + ASTREAMER_ARCHIVE_TRAILER); /* Now finalize successor. */ - bbstreamer_finalize(streamer->bbs_next); + astreamer_finalize(streamer->bbs_next); } /* * Free memory associated with a tar parser. */ static void -bbstreamer_tar_parser_free(bbstreamer *streamer) +astreamer_tar_parser_free(astreamer *streamer) { pfree(streamer->bbs_buffer.data); - bbstreamer_free(streamer->bbs_next); + astreamer_free(streamer->bbs_next); } /* - * Create a bbstreamer that can generate a tar archive. + * Create a astreamer that can generate a tar archive. * * This is intended to be usable either for generating a brand-new tar archive * or for modifying one on the fly. The input should be a series of typed - * chunks (i.e. not BBSTREAMER_UNKNOWN). See also the comments for - * bbstreamer_tar_parser_content. + * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for + * astreamer_tar_parser_content. */ -bbstreamer * -bbstreamer_tar_archiver_new(bbstreamer *next) +astreamer * +astreamer_tar_archiver_new(astreamer *next) { - bbstreamer_tar_archiver *streamer; + astreamer_tar_archiver *streamer; - streamer = palloc0(sizeof(bbstreamer_tar_archiver)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_tar_archiver_ops; + streamer = palloc0(sizeof(astreamer_tar_archiver)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_tar_archiver_ops; streamer->base.bbs_next = next; return &streamer->base; @@ -368,36 +368,36 @@ bbstreamer_tar_archiver_new(bbstreamer *next) /* * Fix up the stream of input chunks to create a valid tar file. * - * If a BBSTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a + * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a * newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is * passed through without change. Any other size is a fatal error (and * indicates a bug). * - * Whenever a new BBSTREAMER_MEMBER_HEADER chunk is constructed, the - * corresponding BBSTREAMER_MEMBER_TRAILER chunk is also constructed from + * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the + * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from * scratch. Specifically, we construct a block of zero bytes sufficient to * pad out to a block boundary, as required by the tar format. Other - * BBSTREAMER_MEMBER_TRAILER chunks are passed through without change. + * ASTREAMER_MEMBER_TRAILER chunks are passed through without change. * - * Any BBSTREAMER_MEMBER_CONTENTS chunks are passed through without change. + * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change. * - * The BBSTREAMER_ARCHIVE_TRAILER chunk is replaced with two + * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two * blocks of zero bytes. Not all tar programs require this, but apparently * some do. The server does not supply this trailer. If no archive trailer is - * present, one will be added by bbstreamer_tar_parser_finalize. + * present, one will be added by astreamer_tar_parser_finalize. */ static void -bbstreamer_tar_archiver_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_tar_archiver_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_tar_archiver *mystreamer = (bbstreamer_tar_archiver *) streamer; + astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer; char buffer[2 * TAR_BLOCK_SIZE]; - Assert(context != BBSTREAMER_UNKNOWN); + Assert(context != ASTREAMER_UNKNOWN); - if (context == BBSTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) + if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE) { Assert(len == 0); @@ -411,7 +411,7 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer, /* Also make a note to replace padding, in case size changed. */ mystreamer->rearchive_member = true; } - else if (context == BBSTREAMER_MEMBER_TRAILER && + else if (context == ASTREAMER_MEMBER_TRAILER && mystreamer->rearchive_member) { int pad_bytes = tarPaddingBytesRequired(member->size); @@ -424,7 +424,7 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer, /* Don't do this again unless we replace another header. */ mystreamer->rearchive_member = false; } - else if (context == BBSTREAMER_ARCHIVE_TRAILER) + else if (context == ASTREAMER_ARCHIVE_TRAILER) { /* Trailer should always be two blocks of zero bytes. */ memset(buffer, 0, 2 * TAR_BLOCK_SIZE); @@ -432,40 +432,40 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer, len = 2 * TAR_BLOCK_SIZE; } - bbstreamer_content(streamer->bbs_next, member, data, len, context); + astreamer_content(streamer->bbs_next, member, data, len, context); } /* * End-of-stream processing for a tar archiver. */ static void -bbstreamer_tar_archiver_finalize(bbstreamer *streamer) +astreamer_tar_archiver_finalize(astreamer *streamer) { - bbstreamer_finalize(streamer->bbs_next); + astreamer_finalize(streamer->bbs_next); } /* * Free memory associated with a tar archiver. */ static void -bbstreamer_tar_archiver_free(bbstreamer *streamer) +astreamer_tar_archiver_free(astreamer *streamer) { - bbstreamer_free(streamer->bbs_next); + astreamer_free(streamer->bbs_next); pfree(streamer); } /* - * Create a bbstreamer that blindly adds two blocks of NUL bytes to the + * Create a astreamer that blindly adds two blocks of NUL bytes to the * end of an incomplete tarfile that the server might send us. */ -bbstreamer * -bbstreamer_tar_terminator_new(bbstreamer *next) +astreamer * +astreamer_tar_terminator_new(astreamer *next) { - bbstreamer *streamer; + astreamer *streamer; - streamer = palloc0(sizeof(bbstreamer)); - *((const bbstreamer_ops **) &streamer->bbs_ops) = - &bbstreamer_tar_terminator_ops; + streamer = palloc0(sizeof(astreamer)); + *((const astreamer_ops **) &streamer->bbs_ops) = + &astreamer_tar_terminator_ops; streamer->bbs_next = next; return streamer; @@ -475,17 +475,17 @@ bbstreamer_tar_terminator_new(bbstreamer *next) * Pass all the content through without change. */ static void -bbstreamer_tar_terminator_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_tar_terminator_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { /* Expect unparsed input. */ Assert(member == NULL); - Assert(context == BBSTREAMER_UNKNOWN); + Assert(context == ASTREAMER_UNKNOWN); /* Just forward it. */ - bbstreamer_content(streamer->bbs_next, member, data, len, context); + astreamer_content(streamer->bbs_next, member, data, len, context); } /* @@ -493,22 +493,22 @@ bbstreamer_tar_terminator_content(bbstreamer *streamer, * to supply. */ static void -bbstreamer_tar_terminator_finalize(bbstreamer *streamer) +astreamer_tar_terminator_finalize(astreamer *streamer) { char buffer[2 * TAR_BLOCK_SIZE]; memset(buffer, 0, 2 * TAR_BLOCK_SIZE); - bbstreamer_content(streamer->bbs_next, NULL, buffer, - 2 * TAR_BLOCK_SIZE, BBSTREAMER_UNKNOWN); - bbstreamer_finalize(streamer->bbs_next); + astreamer_content(streamer->bbs_next, NULL, buffer, + 2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN); + astreamer_finalize(streamer->bbs_next); } /* * Free memory associated with a tar terminator. */ static void -bbstreamer_tar_terminator_free(bbstreamer *streamer) +astreamer_tar_terminator_free(astreamer *streamer) { - bbstreamer_free(streamer->bbs_next); + astreamer_free(streamer->bbs_next); pfree(streamer); } diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/astreamer_zstd.c index 20f11d4450e..58dc679ef99 100644 --- a/src/bin/pg_basebackup/bbstreamer_zstd.c +++ b/src/bin/pg_basebackup/astreamer_zstd.c @@ -1,11 +1,11 @@ /*------------------------------------------------------------------------- * - * bbstreamer_zstd.c + * astreamer_zstd.c * * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group * * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer_zstd.c + * src/bin/pg_basebackup/astreamer_zstd.c *------------------------------------------------------------------------- */ @@ -17,44 +17,44 @@ #include <zstd.h> #endif -#include "bbstreamer.h" +#include "astreamer.h" #include "common/logging.h" #ifdef USE_ZSTD -typedef struct bbstreamer_zstd_frame +typedef struct astreamer_zstd_frame { - bbstreamer base; + astreamer base; ZSTD_CCtx *cctx; ZSTD_DCtx *dctx; ZSTD_outBuffer zstd_outBuf; -} bbstreamer_zstd_frame; - -static void bbstreamer_zstd_compressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer); -static void bbstreamer_zstd_compressor_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_zstd_compressor_ops = { - .content = bbstreamer_zstd_compressor_content, - .finalize = bbstreamer_zstd_compressor_finalize, - .free = bbstreamer_zstd_compressor_free +} astreamer_zstd_frame; + +static void astreamer_zstd_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_zstd_compressor_finalize(astreamer *streamer); +static void astreamer_zstd_compressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_zstd_compressor_ops = { + .content = astreamer_zstd_compressor_content, + .finalize = astreamer_zstd_compressor_finalize, + .free = astreamer_zstd_compressor_free }; -static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); -static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer); -static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer); - -static const bbstreamer_ops bbstreamer_zstd_decompressor_ops = { - .content = bbstreamer_zstd_decompressor_content, - .finalize = bbstreamer_zstd_decompressor_finalize, - .free = bbstreamer_zstd_decompressor_free +static void astreamer_zstd_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context); +static void astreamer_zstd_decompressor_finalize(astreamer *streamer); +static void astreamer_zstd_decompressor_free(astreamer *streamer); + +static const astreamer_ops astreamer_zstd_decompressor_ops = { + .content = astreamer_zstd_decompressor_content, + .finalize = astreamer_zstd_decompressor_finalize, + .free = astreamer_zstd_decompressor_free }; #endif @@ -62,19 +62,19 @@ static const bbstreamer_ops bbstreamer_zstd_decompressor_ops = { * Create a new base backup streamer that performs zstd compression of tar * blocks. */ -bbstreamer * -bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress) +astreamer * +astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress) { #ifdef USE_ZSTD - bbstreamer_zstd_frame *streamer; + astreamer_zstd_frame *streamer; size_t ret; Assert(next != NULL); - streamer = palloc0(sizeof(bbstreamer_zstd_frame)); + streamer = palloc0(sizeof(astreamer_zstd_frame)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_zstd_compressor_ops; + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_zstd_compressor_ops; streamer->base.bbs_next = next; initStringInfo(&streamer->base.bbs_buffer); @@ -142,12 +142,12 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *comp * of output buffer to next streamer and empty the buffer. */ static void -bbstreamer_zstd_compressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_zstd_compressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; ZSTD_inBuffer inBuf = {data, len, 0}; while (inBuf.pos < inBuf.size) @@ -162,10 +162,10 @@ bbstreamer_zstd_compressor_content(bbstreamer *streamer, if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < max_needed) { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - context); + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); /* Reset the ZSTD output buffer. */ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; @@ -187,9 +187,9 @@ bbstreamer_zstd_compressor_content(bbstreamer *streamer, * End-of-stream processing. */ static void -bbstreamer_zstd_compressor_finalize(bbstreamer *streamer) +astreamer_zstd_compressor_finalize(astreamer *streamer) { - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; size_t yet_to_flush; do @@ -204,10 +204,10 @@ bbstreamer_zstd_compressor_finalize(bbstreamer *streamer) if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos < max_needed) { - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - BBSTREAMER_UNKNOWN); + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + ASTREAMER_UNKNOWN); /* Reset the ZSTD output buffer. */ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; @@ -227,23 +227,23 @@ bbstreamer_zstd_compressor_finalize(bbstreamer *streamer) /* Make sure to pass any remaining bytes to the next streamer. */ if (mystreamer->zstd_outBuf.pos > 0) - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - BBSTREAMER_UNKNOWN); + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + ASTREAMER_UNKNOWN); - bbstreamer_finalize(mystreamer->base.bbs_next); + astreamer_finalize(mystreamer->base.bbs_next); } /* * Free memory. */ static void -bbstreamer_zstd_compressor_free(bbstreamer *streamer) +astreamer_zstd_compressor_free(astreamer *streamer) { - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; - bbstreamer_free(streamer->bbs_next); + astreamer_free(streamer->bbs_next); ZSTD_freeCCtx(mystreamer->cctx); pfree(streamer->bbs_buffer.data); pfree(streamer); @@ -254,17 +254,17 @@ bbstreamer_zstd_compressor_free(bbstreamer *streamer) * Create a new base backup streamer that performs decompression of zstd * compressed blocks. */ -bbstreamer * -bbstreamer_zstd_decompressor_new(bbstreamer *next) +astreamer * +astreamer_zstd_decompressor_new(astreamer *next) { #ifdef USE_ZSTD - bbstreamer_zstd_frame *streamer; + astreamer_zstd_frame *streamer; Assert(next != NULL); - streamer = palloc0(sizeof(bbstreamer_zstd_frame)); - *((const bbstreamer_ops **) &streamer->base.bbs_ops) = - &bbstreamer_zstd_decompressor_ops; + streamer = palloc0(sizeof(astreamer_zstd_frame)); + *((const astreamer_ops **) &streamer->base.bbs_ops) = + &astreamer_zstd_decompressor_ops; streamer->base.bbs_next = next; initStringInfo(&streamer->base.bbs_buffer); @@ -293,12 +293,12 @@ bbstreamer_zstd_decompressor_new(bbstreamer *next) * to the next streamer. */ static void -bbstreamer_zstd_decompressor_content(bbstreamer *streamer, - bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) +astreamer_zstd_decompressor_content(astreamer *streamer, + astreamer_member *member, + const char *data, int len, + astreamer_archive_context context) { - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; ZSTD_inBuffer inBuf = {data, len, 0}; while (inBuf.pos < inBuf.size) @@ -311,10 +311,10 @@ bbstreamer_zstd_decompressor_content(bbstreamer *streamer, */ if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size) { - bbstreamer_content(mystreamer->base.bbs_next, member, - mystreamer->zstd_outBuf.dst, - mystreamer->zstd_outBuf.pos, - context); + astreamer_content(mystreamer->base.bbs_next, member, + mystreamer->zstd_outBuf.dst, + mystreamer->zstd_outBuf.pos, + context); /* Reset the ZSTD output buffer. */ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data; @@ -335,32 +335,32 @@ bbstreamer_zstd_decompressor_content(bbstreamer *streamer, * End-of-stream processing. */ static void -bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer) +astreamer_zstd_decompressor_finalize(astreamer *streamer) { - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; /* * End of the stream, if there is some pending data in output buffers then * we must forward it to next streamer. */ if (mystreamer->zstd_outBuf.pos > 0) - bbstreamer_content(mystreamer->base.bbs_next, NULL, - mystreamer->base.bbs_buffer.data, - mystreamer->base.bbs_buffer.maxlen, - BBSTREAMER_UNKNOWN); + astreamer_content(mystreamer->base.bbs_next, NULL, + mystreamer->base.bbs_buffer.data, + mystreamer->base.bbs_buffer.maxlen, + ASTREAMER_UNKNOWN); - bbstreamer_finalize(mystreamer->base.bbs_next); + astreamer_finalize(mystreamer->base.bbs_next); } /* * Free memory. */ static void -bbstreamer_zstd_decompressor_free(bbstreamer *streamer) +astreamer_zstd_decompressor_free(astreamer *streamer) { - bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer; + astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer; - bbstreamer_free(streamer->bbs_next); + astreamer_free(streamer->bbs_next); ZSTD_freeDCtx(mystreamer->dctx); pfree(streamer->bbs_buffer.data); pfree(streamer); diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h deleted file mode 100644 index 3b820f13b51..00000000000 --- a/src/bin/pg_basebackup/bbstreamer.h +++ /dev/null @@ -1,226 +0,0 @@ -/*------------------------------------------------------------------------- - * - * bbstreamer.h - * - * Each tar archive returned by the server is passed to one or more - * bbstreamer objects for further processing. The bbstreamer may do - * something simple, like write the archive to a file, perhaps after - * compressing it, but it can also do more complicated things, like - * annotating the byte stream to indicate which parts of the data - * correspond to tar headers or trailing padding, vs. which parts are - * payload data. A subsequent bbstreamer may use this information to - * make further decisions about how to process the data; for example, - * it might choose to modify the archive contents. - * - * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/bin/pg_basebackup/bbstreamer.h - *------------------------------------------------------------------------- - */ - -#ifndef BBSTREAMER_H -#define BBSTREAMER_H - -#include "common/compression.h" -#include "lib/stringinfo.h" -#include "pqexpbuffer.h" - -struct bbstreamer; -struct bbstreamer_ops; -typedef struct bbstreamer bbstreamer; -typedef struct bbstreamer_ops bbstreamer_ops; - -/* - * Each chunk of archive data passed to a bbstreamer is classified into one - * of these categories. When data is first received from the remote server, - * each chunk will be categorized as BBSTREAMER_UNKNOWN, and the chunks will - * be of whatever size the remote server chose to send. - * - * If the archive is parsed (e.g. see bbstreamer_tar_parser_new()), then all - * chunks should be labelled as one of the other types listed here. In - * addition, there should be exactly one BBSTREAMER_MEMBER_HEADER chunk and - * exactly one BBSTREAMER_MEMBER_TRAILER chunk per archive member, even if - * that means a zero-length call. There can be any number of - * BBSTREAMER_MEMBER_CONTENTS chunks in between those calls. There - * should exactly BBSTREAMER_ARCHIVE_TRAILER chunk, and it should follow the - * last BBSTREAMER_MEMBER_TRAILER chunk. - * - * In theory, we could need other classifications here, such as a way of - * indicating an archive header, but the "tar" format doesn't need anything - * else, so for the time being there's no point. - */ -typedef enum -{ - BBSTREAMER_UNKNOWN, - BBSTREAMER_MEMBER_HEADER, - BBSTREAMER_MEMBER_CONTENTS, - BBSTREAMER_MEMBER_TRAILER, - BBSTREAMER_ARCHIVE_TRAILER, -} bbstreamer_archive_context; - -/* - * Each chunk of data that is classified as BBSTREAMER_MEMBER_HEADER, - * BBSTREAMER_MEMBER_CONTENTS, or BBSTREAMER_MEMBER_TRAILER should also - * pass a pointer to an instance of this struct. The details are expected - * to be present in the archive header and used to fill the struct, after - * which all subsequent calls for the same archive member are expected to - * pass the same details. - */ -typedef struct -{ - char pathname[MAXPGPATH]; - pgoff_t size; - mode_t mode; - uid_t uid; - gid_t gid; - bool is_directory; - bool is_link; - char linktarget[MAXPGPATH]; -} bbstreamer_member; - -/* - * Generally, each type of bbstreamer will define its own struct, but the - * first element should be 'bbstreamer base'. A bbstreamer that does not - * require any additional private data could use this structure directly. - * - * bbs_ops is a pointer to the bbstreamer_ops object which contains the - * function pointers appropriate to this type of bbstreamer. - * - * bbs_next is a pointer to the successor bbstreamer, for those types of - * bbstreamer which forward data to a successor. It need not be used and - * should be set to NULL when not relevant. - * - * bbs_buffer is a buffer for accumulating data for temporary storage. Each - * type of bbstreamer makes its own decisions about whether and how to use - * this buffer. - */ -struct bbstreamer -{ - const bbstreamer_ops *bbs_ops; - bbstreamer *bbs_next; - StringInfoData bbs_buffer; -}; - -/* - * There are three callbacks for a bbstreamer. The 'content' callback is - * called repeatedly, as described in the bbstreamer_archive_context comments. - * Then, the 'finalize' callback is called once at the end, to give the - * bbstreamer a chance to perform cleanup such as closing files. Finally, - * because this code is running in a frontend environment where, as of this - * writing, there are no memory contexts, the 'free' callback is called to - * release memory. These callbacks should always be invoked using the static - * inline functions defined below. - */ -struct bbstreamer_ops -{ - void (*content) (bbstreamer *streamer, bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context); - void (*finalize) (bbstreamer *streamer); - void (*free) (bbstreamer *streamer); -}; - -/* Send some content to a bbstreamer. */ -static inline void -bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, - const char *data, int len, - bbstreamer_archive_context context) -{ - Assert(streamer != NULL); - streamer->bbs_ops->content(streamer, member, data, len, context); -} - -/* Finalize a bbstreamer. */ -static inline void -bbstreamer_finalize(bbstreamer *streamer) -{ - Assert(streamer != NULL); - streamer->bbs_ops->finalize(streamer); -} - -/* Free a bbstreamer. */ -static inline void -bbstreamer_free(bbstreamer *streamer) -{ - Assert(streamer != NULL); - streamer->bbs_ops->free(streamer); -} - -/* - * This is a convenience method for use when implementing a bbstreamer; it is - * not for use by outside callers. It adds the amount of data specified by - * 'nbytes' to the bbstreamer's buffer and adjusts '*len' and '*data' - * accordingly. - */ -static inline void -bbstreamer_buffer_bytes(bbstreamer *streamer, const char **data, int *len, - int nbytes) -{ - Assert(nbytes <= *len); - - appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes); - *len -= nbytes; - *data += nbytes; -} - -/* - * This is a convenience method for use when implementing a bbstreamer; it is - * not for use by outsider callers. It attempts to add enough data to the - * bbstreamer's buffer to reach a length of target_bytes and adjusts '*len' - * and '*data' accordingly. It returns true if the target length has been - * reached and false otherwise. - */ -static inline bool -bbstreamer_buffer_until(bbstreamer *streamer, const char **data, int *len, - int target_bytes) -{ - int buflen = streamer->bbs_buffer.len; - - if (buflen >= target_bytes) - { - /* Target length already reached; nothing to do. */ - return true; - } - - if (buflen + *len < target_bytes) - { - /* Not enough data to reach target length; buffer all of it. */ - bbstreamer_buffer_bytes(streamer, data, len, *len); - return false; - } - - /* Buffer just enough to reach the target length. */ - bbstreamer_buffer_bytes(streamer, data, len, target_bytes - buflen); - return true; -} - -/* - * Functions for creating bbstreamer objects of various types. See the header - * comments for each of these functions for details. - */ -extern bbstreamer *bbstreamer_plain_writer_new(char *pathname, FILE *file); -extern bbstreamer *bbstreamer_gzip_writer_new(char *pathname, FILE *file, - pg_compress_specification *compress); -extern bbstreamer *bbstreamer_extractor_new(const char *basepath, - const char *(*link_map) (const char *), - void (*report_output_file) (const char *)); - -extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next); -extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next, - pg_compress_specification *compress); -extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next); -extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next, - pg_compress_specification *compress); -extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next); -extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next); -extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next); -extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next); - -extern bbstreamer *bbstreamer_recovery_injector_new(bbstreamer *next, - bool is_recovery_guc_supported, - PQExpBuffer recoveryconfcontents); -extern void bbstreamer_inject_file(bbstreamer *streamer, char *pathname, - char *data, int len); - -#endif diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build index c00acd5e118..a68dbd7837d 100644 --- a/src/bin/pg_basebackup/meson.build +++ b/src/bin/pg_basebackup/meson.build @@ -1,12 +1,12 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group common_sources = files( - 'bbstreamer_file.c', - 'bbstreamer_gzip.c', - 'bbstreamer_inject.c', - 'bbstreamer_lz4.c', - 'bbstreamer_tar.c', - 'bbstreamer_zstd.c', + 'astreamer_file.c', + 'astreamer_gzip.c', + 'astreamer_inject.c', + 'astreamer_lz4.c', + 'astreamer_tar.c', + 'astreamer_zstd.c', 'receivelog.c', 'streamutil.c', 'walmethods.c', diff --git a/src/bin/pg_basebackup/nls.mk b/src/bin/pg_basebackup/nls.mk index 384dbb021e9..950b9797b1e 100644 --- a/src/bin/pg_basebackup/nls.mk +++ b/src/bin/pg_basebackup/nls.mk @@ -1,12 +1,12 @@ # src/bin/pg_basebackup/nls.mk CATALOG_NAME = pg_basebackup GETTEXT_FILES = $(FRONTEND_COMMON_GETTEXT_FILES) \ - bbstreamer_file.c \ - bbstreamer_gzip.c \ - bbstreamer_inject.c \ - bbstreamer_lz4.c \ - bbstreamer_tar.c \ - bbstreamer_zstd.c \ + astreamer_file.c \ + astreamer_gzip.c \ + astreamer_inject.c \ + astreamer_lz4.c \ + astreamer_tar.c \ + astreamer_zstd.c \ pg_basebackup.c \ pg_createsubscriber.c \ pg_receivewal.c \ diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 8f3dd04fd22..1966ada69c9 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -26,8 +26,8 @@ #endif #include "access/xlog_internal.h" +#include "astreamer.h" #include "backup/basebackup.h" -#include "bbstreamer.h" #include "common/compression.h" #include "common/file_perm.h" #include "common/file_utils.h" @@ -57,8 +57,8 @@ typedef struct ArchiveStreamState { int tablespacenum; pg_compress_specification *compress; - bbstreamer *streamer; - bbstreamer *manifest_inject_streamer; + astreamer *streamer; + astreamer *manifest_inject_streamer; PQExpBuffer manifest_buffer; char manifest_filename[MAXPGPATH]; FILE *manifest_file; @@ -67,7 +67,7 @@ typedef struct ArchiveStreamState typedef struct WriteTarState { int tablespacenum; - bbstreamer *streamer; + astreamer *streamer; } WriteTarState; typedef struct WriteManifestState @@ -199,11 +199,11 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo static void progress_update_filename(const char *filename); static void progress_report(int tablespacenum, bool force, bool finished); -static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, - bbstreamer **manifest_inject_streamer_p, - bool is_recovery_guc_supported, - bool expect_unterminated_tarfile, - pg_compress_specification *compress); +static astreamer *CreateBackupStreamer(char *archive_name, char *spclocation, + astreamer **manifest_inject_streamer_p, + bool is_recovery_guc_supported, + bool expect_unterminated_tarfile, + pg_compress_specification *compress); static void ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data); static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor); @@ -1053,19 +1053,19 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback, * the options selected by the user. We may just write the results directly * to a file, or we might compress first, or we might extract the tar file * and write each member separately. This function doesn't do any of that - * directly, but it works out what kind of bbstreamer we need to create so + * directly, but it works out what kind of astreamer we need to create so * that the right stuff happens when, down the road, we actually receive * the data. */ -static bbstreamer * +static astreamer * CreateBackupStreamer(char *archive_name, char *spclocation, - bbstreamer **manifest_inject_streamer_p, + astreamer **manifest_inject_streamer_p, bool is_recovery_guc_supported, bool expect_unterminated_tarfile, pg_compress_specification *compress) { - bbstreamer *streamer = NULL; - bbstreamer *manifest_inject_streamer = NULL; + astreamer *streamer = NULL; + astreamer *manifest_inject_streamer = NULL; bool inject_manifest; bool is_tar, is_tar_gz, @@ -1160,9 +1160,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, directory = psprintf("%s/%s", basedir, spclocation); else directory = get_tablespace_mapping(spclocation); - streamer = bbstreamer_extractor_new(directory, - get_tablespace_mapping, - progress_update_filename); + streamer = astreamer_extractor_new(directory, + get_tablespace_mapping, + progress_update_filename); } else { @@ -1188,27 +1188,27 @@ CreateBackupStreamer(char *archive_name, char *spclocation, } if (compress->algorithm == PG_COMPRESSION_NONE) - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); else if (compress->algorithm == PG_COMPRESSION_GZIP) { strlcat(archive_filename, ".gz", sizeof(archive_filename)); - streamer = bbstreamer_gzip_writer_new(archive_filename, - archive_file, compress); + streamer = astreamer_gzip_writer_new(archive_filename, + archive_file, compress); } else if (compress->algorithm == PG_COMPRESSION_LZ4) { strlcat(archive_filename, ".lz4", sizeof(archive_filename)); - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); - streamer = bbstreamer_lz4_compressor_new(streamer, compress); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); + streamer = astreamer_lz4_compressor_new(streamer, compress); } else if (compress->algorithm == PG_COMPRESSION_ZSTD) { strlcat(archive_filename, ".zst", sizeof(archive_filename)); - streamer = bbstreamer_plain_writer_new(archive_filename, - archive_file); - streamer = bbstreamer_zstd_compressor_new(streamer, compress); + streamer = astreamer_plain_writer_new(archive_filename, + archive_file); + streamer = astreamer_zstd_compressor_new(streamer, compress); } else { @@ -1222,7 +1222,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * into it. */ if (must_parse_archive) - streamer = bbstreamer_tar_archiver_new(streamer); + streamer = astreamer_tar_archiver_new(streamer); progress_update_filename(archive_filename); } @@ -1241,9 +1241,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, if (spclocation == NULL && writerecoveryconf) { Assert(must_parse_archive); - streamer = bbstreamer_recovery_injector_new(streamer, - is_recovery_guc_supported, - recoveryconfcontents); + streamer = astreamer_recovery_injector_new(streamer, + is_recovery_guc_supported, + recoveryconfcontents); } /* @@ -1253,9 +1253,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * we're talking to such a server we'll need to add the terminator here. */ if (must_parse_archive) - streamer = bbstreamer_tar_parser_new(streamer); + streamer = astreamer_tar_parser_new(streamer); else if (expect_unterminated_tarfile) - streamer = bbstreamer_tar_terminator_new(streamer); + streamer = astreamer_tar_terminator_new(streamer); /* * If the user has requested a server compressed archive along with @@ -1264,11 +1264,11 @@ CreateBackupStreamer(char *archive_name, char *spclocation, if (format == 'p') { if (is_tar_gz) - streamer = bbstreamer_gzip_decompressor_new(streamer); + streamer = astreamer_gzip_decompressor_new(streamer); else if (is_tar_lz4) - streamer = bbstreamer_lz4_decompressor_new(streamer); + streamer = astreamer_lz4_decompressor_new(streamer); else if (is_tar_zstd) - streamer = bbstreamer_zstd_decompressor_new(streamer); + streamer = astreamer_zstd_decompressor_new(streamer); } /* Return the results. */ @@ -1307,10 +1307,10 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress) if (state.manifest_inject_streamer != NULL && state.manifest_buffer != NULL) { - bbstreamer_inject_file(state.manifest_inject_streamer, - "backup_manifest", - state.manifest_buffer->data, - state.manifest_buffer->len); + astreamer_inject_file(state.manifest_inject_streamer, + "backup_manifest", + state.manifest_buffer->data, + state.manifest_buffer->len); destroyPQExpBuffer(state.manifest_buffer); state.manifest_buffer = NULL; } @@ -1318,8 +1318,8 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress) /* If there's still an archive in progress, end processing. */ if (state.streamer != NULL) { - bbstreamer_finalize(state.streamer); - bbstreamer_free(state.streamer); + astreamer_finalize(state.streamer); + astreamer_free(state.streamer); state.streamer = NULL; } } @@ -1383,8 +1383,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) /* End processing of any prior archive. */ if (state->streamer != NULL) { - bbstreamer_finalize(state->streamer); - bbstreamer_free(state->streamer); + astreamer_finalize(state->streamer); + astreamer_free(state->streamer); state->streamer = NULL; } @@ -1437,8 +1437,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) else if (state->streamer != NULL) { /* Archive data. */ - bbstreamer_content(state->streamer, NULL, copybuf + 1, - r - 1, BBSTREAMER_UNKNOWN); + astreamer_content(state->streamer, NULL, copybuf + 1, + r - 1, ASTREAMER_UNKNOWN); } else pg_fatal("unexpected payload data"); @@ -1600,7 +1600,7 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, bool tablespacenum, pg_compress_specification *compress) { WriteTarState state; - bbstreamer *manifest_inject_streamer; + astreamer *manifest_inject_streamer; bool is_recovery_guc_supported; bool expect_unterminated_tarfile; @@ -1636,16 +1636,16 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, pg_fatal("out of memory"); /* Inject it into the output tarfile. */ - bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest", - buf.data, buf.len); + astreamer_inject_file(manifest_inject_streamer, "backup_manifest", + buf.data, buf.len); /* Free memory. */ termPQExpBuffer(&buf); } /* Cleanup. */ - bbstreamer_finalize(state.streamer); - bbstreamer_free(state.streamer); + astreamer_finalize(state.streamer); + astreamer_free(state.streamer); progress_report(tablespacenum, true, false); @@ -1663,7 +1663,7 @@ ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data) { WriteTarState *state = callback_data; - bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN); + astreamer_content(state->streamer, NULL, copybuf, r, ASTREAMER_UNKNOWN); totaldone += r; progress_report(state->tablespacenum, false, false); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 6e6b7c27118..547d14b3e7c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3317,19 +3317,19 @@ bbsink_shell bbsink_state bbsink_throttle bbsink_zstd -bbstreamer -bbstreamer_archive_context -bbstreamer_extractor -bbstreamer_gzip_decompressor -bbstreamer_gzip_writer -bbstreamer_lz4_frame -bbstreamer_member -bbstreamer_ops -bbstreamer_plain_writer -bbstreamer_recovery_injector -bbstreamer_tar_archiver -bbstreamer_tar_parser -bbstreamer_zstd_frame +astreamer +astreamer_archive_context +astreamer_extractor +astreamer_gzip_decompressor +astreamer_gzip_writer +astreamer_lz4_frame +astreamer_member +astreamer_ops +astreamer_plain_writer +astreamer_recovery_injector +astreamer_tar_archiver +astreamer_tar_parser +astreamer_zstd_frame bgworker_main_type bh_node_type binaryheap |