aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bin/pg_basebackup/Makefile12
-rw-r--r--src/bin/pg_basebackup/astreamer.h226
-rw-r--r--src/bin/pg_basebackup/astreamer_file.c (renamed from src/bin/pg_basebackup/bbstreamer_file.c)152
-rw-r--r--src/bin/pg_basebackup/astreamer_gzip.c (renamed from src/bin/pg_basebackup/bbstreamer_gzip.c)156
-rw-r--r--src/bin/pg_basebackup/astreamer_inject.c (renamed from src/bin/pg_basebackup/bbstreamer_inject.c)156
-rw-r--r--src/bin/pg_basebackup/astreamer_lz4.c (renamed from src/bin/pg_basebackup/bbstreamer_lz4.c)178
-rw-r--r--src/bin/pg_basebackup/astreamer_tar.c (renamed from src/bin/pg_basebackup/bbstreamer_tar.c)324
-rw-r--r--src/bin/pg_basebackup/astreamer_zstd.c (renamed from src/bin/pg_basebackup/bbstreamer_zstd.c)166
-rw-r--r--src/bin/pg_basebackup/bbstreamer.h226
-rw-r--r--src/bin/pg_basebackup/meson.build12
-rw-r--r--src/bin/pg_basebackup/nls.mk12
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c104
-rw-r--r--src/tools/pgindent/typedefs.list26
13 files changed, 875 insertions, 875 deletions
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 26c53e473f5..a71af2d48a7 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -37,12 +37,12 @@ OBJS = \
BBOBJS = \
pg_basebackup.o \
- bbstreamer_file.o \
- bbstreamer_gzip.o \
- bbstreamer_inject.o \
- bbstreamer_lz4.o \
- bbstreamer_tar.o \
- bbstreamer_zstd.o
+ astreamer_file.o \
+ astreamer_gzip.o \
+ astreamer_inject.o \
+ astreamer_lz4.o \
+ astreamer_tar.o \
+ astreamer_zstd.o
all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical
diff --git a/src/bin/pg_basebackup/astreamer.h b/src/bin/pg_basebackup/astreamer.h
new file mode 100644
index 00000000000..b5ed138f54e
--- /dev/null
+++ b/src/bin/pg_basebackup/astreamer.h
@@ -0,0 +1,226 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer.h
+ *
+ * Each tar archive returned by the server is passed to one or more
+ * astreamer objects for further processing. The astreamer may do
+ * something simple, like write the archive to a file, perhaps after
+ * compressing it, but it can also do more complicated things, like
+ * annotating the byte stream to indicate which parts of the data
+ * correspond to tar headers or trailing padding, vs. which parts are
+ * payload data. A subsequent astreamer may use this information to
+ * make further decisions about how to process the data; for example,
+ * it might choose to modify the archive contents.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/astreamer.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef ASTREAMER_H
+#define ASTREAMER_H
+
+#include "common/compression.h"
+#include "lib/stringinfo.h"
+#include "pqexpbuffer.h"
+
+struct astreamer;
+struct astreamer_ops;
+typedef struct astreamer astreamer;
+typedef struct astreamer_ops astreamer_ops;
+
+/*
+ * Each chunk of archive data passed to a astreamer is classified into one
+ * of these categories. When data is first received from the remote server,
+ * each chunk will be categorized as ASTREAMER_UNKNOWN, and the chunks will
+ * be of whatever size the remote server chose to send.
+ *
+ * If the archive is parsed (e.g. see astreamer_tar_parser_new()), then all
+ * chunks should be labelled as one of the other types listed here. In
+ * addition, there should be exactly one ASTREAMER_MEMBER_HEADER chunk and
+ * exactly one ASTREAMER_MEMBER_TRAILER chunk per archive member, even if
+ * that means a zero-length call. There can be any number of
+ * ASTREAMER_MEMBER_CONTENTS chunks in between those calls. There
+ * should exactly ASTREAMER_ARCHIVE_TRAILER chunk, and it should follow the
+ * last ASTREAMER_MEMBER_TRAILER chunk.
+ *
+ * In theory, we could need other classifications here, such as a way of
+ * indicating an archive header, but the "tar" format doesn't need anything
+ * else, so for the time being there's no point.
+ */
+typedef enum
+{
+ ASTREAMER_UNKNOWN,
+ ASTREAMER_MEMBER_HEADER,
+ ASTREAMER_MEMBER_CONTENTS,
+ ASTREAMER_MEMBER_TRAILER,
+ ASTREAMER_ARCHIVE_TRAILER,
+} astreamer_archive_context;
+
+/*
+ * Each chunk of data that is classified as ASTREAMER_MEMBER_HEADER,
+ * ASTREAMER_MEMBER_CONTENTS, or ASTREAMER_MEMBER_TRAILER should also
+ * pass a pointer to an instance of this struct. The details are expected
+ * to be present in the archive header and used to fill the struct, after
+ * which all subsequent calls for the same archive member are expected to
+ * pass the same details.
+ */
+typedef struct
+{
+ char pathname[MAXPGPATH];
+ pgoff_t size;
+ mode_t mode;
+ uid_t uid;
+ gid_t gid;
+ bool is_directory;
+ bool is_link;
+ char linktarget[MAXPGPATH];
+} astreamer_member;
+
+/*
+ * Generally, each type of astreamer will define its own struct, but the
+ * first element should be 'astreamer base'. A astreamer that does not
+ * require any additional private data could use this structure directly.
+ *
+ * bbs_ops is a pointer to the astreamer_ops object which contains the
+ * function pointers appropriate to this type of astreamer.
+ *
+ * bbs_next is a pointer to the successor astreamer, for those types of
+ * astreamer which forward data to a successor. It need not be used and
+ * should be set to NULL when not relevant.
+ *
+ * bbs_buffer is a buffer for accumulating data for temporary storage. Each
+ * type of astreamer makes its own decisions about whether and how to use
+ * this buffer.
+ */
+struct astreamer
+{
+ const astreamer_ops *bbs_ops;
+ astreamer *bbs_next;
+ StringInfoData bbs_buffer;
+};
+
+/*
+ * There are three callbacks for a astreamer. The 'content' callback is
+ * called repeatedly, as described in the astreamer_archive_context comments.
+ * Then, the 'finalize' callback is called once at the end, to give the
+ * astreamer a chance to perform cleanup such as closing files. Finally,
+ * because this code is running in a frontend environment where, as of this
+ * writing, there are no memory contexts, the 'free' callback is called to
+ * release memory. These callbacks should always be invoked using the static
+ * inline functions defined below.
+ */
+struct astreamer_ops
+{
+ void (*content) (astreamer *streamer, astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+ void (*finalize) (astreamer *streamer);
+ void (*free) (astreamer *streamer);
+};
+
+/* Send some content to a astreamer. */
+static inline void
+astreamer_content(astreamer *streamer, astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->content(streamer, member, data, len, context);
+}
+
+/* Finalize a astreamer. */
+static inline void
+astreamer_finalize(astreamer *streamer)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->finalize(streamer);
+}
+
+/* Free a astreamer. */
+static inline void
+astreamer_free(astreamer *streamer)
+{
+ Assert(streamer != NULL);
+ streamer->bbs_ops->free(streamer);
+}
+
+/*
+ * This is a convenience method for use when implementing a astreamer; it is
+ * not for use by outside callers. It adds the amount of data specified by
+ * 'nbytes' to the astreamer's buffer and adjusts '*len' and '*data'
+ * accordingly.
+ */
+static inline void
+astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len,
+ int nbytes)
+{
+ Assert(nbytes <= *len);
+
+ appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
+ *len -= nbytes;
+ *data += nbytes;
+}
+
+/*
+ * This is a convenience method for use when implementing a astreamer; it is
+ * not for use by outsider callers. It attempts to add enough data to the
+ * astreamer's buffer to reach a length of target_bytes and adjusts '*len'
+ * and '*data' accordingly. It returns true if the target length has been
+ * reached and false otherwise.
+ */
+static inline bool
+astreamer_buffer_until(astreamer *streamer, const char **data, int *len,
+ int target_bytes)
+{
+ int buflen = streamer->bbs_buffer.len;
+
+ if (buflen >= target_bytes)
+ {
+ /* Target length already reached; nothing to do. */
+ return true;
+ }
+
+ if (buflen + *len < target_bytes)
+ {
+ /* Not enough data to reach target length; buffer all of it. */
+ astreamer_buffer_bytes(streamer, data, len, *len);
+ return false;
+ }
+
+ /* Buffer just enough to reach the target length. */
+ astreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
+ return true;
+}
+
+/*
+ * Functions for creating astreamer objects of various types. See the header
+ * comments for each of these functions for details.
+ */
+extern astreamer *astreamer_plain_writer_new(char *pathname, FILE *file);
+extern astreamer *astreamer_gzip_writer_new(char *pathname, FILE *file,
+ pg_compress_specification *compress);
+extern astreamer *astreamer_extractor_new(const char *basepath,
+ const char *(*link_map) (const char *),
+ void (*report_output_file) (const char *));
+
+extern astreamer *astreamer_gzip_decompressor_new(astreamer *next);
+extern astreamer *astreamer_lz4_compressor_new(astreamer *next,
+ pg_compress_specification *compress);
+extern astreamer *astreamer_lz4_decompressor_new(astreamer *next);
+extern astreamer *astreamer_zstd_compressor_new(astreamer *next,
+ pg_compress_specification *compress);
+extern astreamer *astreamer_zstd_decompressor_new(astreamer *next);
+extern astreamer *astreamer_tar_parser_new(astreamer *next);
+extern astreamer *astreamer_tar_terminator_new(astreamer *next);
+extern astreamer *astreamer_tar_archiver_new(astreamer *next);
+
+extern astreamer *astreamer_recovery_injector_new(astreamer *next,
+ bool is_recovery_guc_supported,
+ PQExpBuffer recoveryconfcontents);
+extern void astreamer_inject_file(astreamer *streamer, char *pathname,
+ char *data, int len);
+
+#endif
diff --git a/src/bin/pg_basebackup/bbstreamer_file.c b/src/bin/pg_basebackup/astreamer_file.c
index bab6cd4a6b1..2742385e103 100644
--- a/src/bin/pg_basebackup/bbstreamer_file.c
+++ b/src/bin/pg_basebackup/astreamer_file.c
@@ -1,11 +1,11 @@
/*-------------------------------------------------------------------------
*
- * bbstreamer_file.c
+ * astreamer_file.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * src/bin/pg_basebackup/bbstreamer_file.c
+ * src/bin/pg_basebackup/astreamer_file.c
*-------------------------------------------------------------------------
*/
@@ -13,60 +13,60 @@
#include <unistd.h>
-#include "bbstreamer.h"
+#include "astreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
-typedef struct bbstreamer_plain_writer
+typedef struct astreamer_plain_writer
{
- bbstreamer base;
+ astreamer base;
char *pathname;
FILE *file;
bool should_close_file;
-} bbstreamer_plain_writer;
+} astreamer_plain_writer;
-typedef struct bbstreamer_extractor
+typedef struct astreamer_extractor
{
- bbstreamer base;
+ astreamer base;
char *basepath;
const char *(*link_map) (const char *);
void (*report_output_file) (const char *);
char filename[MAXPGPATH];
FILE *file;
-} bbstreamer_extractor;
-
-static void bbstreamer_plain_writer_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_plain_writer_finalize(bbstreamer *streamer);
-static void bbstreamer_plain_writer_free(bbstreamer *streamer);
-
-static const bbstreamer_ops bbstreamer_plain_writer_ops = {
- .content = bbstreamer_plain_writer_content,
- .finalize = bbstreamer_plain_writer_finalize,
- .free = bbstreamer_plain_writer_free
+} astreamer_extractor;
+
+static void astreamer_plain_writer_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_plain_writer_finalize(astreamer *streamer);
+static void astreamer_plain_writer_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_plain_writer_ops = {
+ .content = astreamer_plain_writer_content,
+ .finalize = astreamer_plain_writer_finalize,
+ .free = astreamer_plain_writer_free
};
-static void bbstreamer_extractor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_extractor_finalize(bbstreamer *streamer);
-static void bbstreamer_extractor_free(bbstreamer *streamer);
+static void astreamer_extractor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_extractor_finalize(astreamer *streamer);
+static void astreamer_extractor_free(astreamer *streamer);
static void extract_directory(const char *filename, mode_t mode);
static void extract_link(const char *filename, const char *linktarget);
static FILE *create_file_for_extract(const char *filename, mode_t mode);
-static const bbstreamer_ops bbstreamer_extractor_ops = {
- .content = bbstreamer_extractor_content,
- .finalize = bbstreamer_extractor_finalize,
- .free = bbstreamer_extractor_free
+static const astreamer_ops astreamer_extractor_ops = {
+ .content = astreamer_extractor_content,
+ .finalize = astreamer_extractor_finalize,
+ .free = astreamer_extractor_free
};
/*
- * Create a bbstreamer that just writes data to a file.
+ * Create a astreamer that just writes data to a file.
*
* The caller must specify a pathname and may specify a file. The pathname is
* used for error-reporting purposes either way. If file is NULL, the pathname
@@ -74,14 +74,14 @@ static const bbstreamer_ops bbstreamer_extractor_ops = {
* for writing and closed when done. If file is not NULL, the data is written
* there.
*/
-bbstreamer *
-bbstreamer_plain_writer_new(char *pathname, FILE *file)
+astreamer *
+astreamer_plain_writer_new(char *pathname, FILE *file)
{
- bbstreamer_plain_writer *streamer;
+ astreamer_plain_writer *streamer;
- streamer = palloc0(sizeof(bbstreamer_plain_writer));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_plain_writer_ops;
+ streamer = palloc0(sizeof(astreamer_plain_writer));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_plain_writer_ops;
streamer->pathname = pstrdup(pathname);
streamer->file = file;
@@ -101,13 +101,13 @@ bbstreamer_plain_writer_new(char *pathname, FILE *file)
* Write archive content to file.
*/
static void
-bbstreamer_plain_writer_content(bbstreamer *streamer,
- bbstreamer_member *member, const char *data,
- int len, bbstreamer_archive_context context)
+astreamer_plain_writer_content(astreamer *streamer,
+ astreamer_member *member, const char *data,
+ int len, astreamer_archive_context context)
{
- bbstreamer_plain_writer *mystreamer;
+ astreamer_plain_writer *mystreamer;
- mystreamer = (bbstreamer_plain_writer *) streamer;
+ mystreamer = (astreamer_plain_writer *) streamer;
if (len == 0)
return;
@@ -128,11 +128,11 @@ bbstreamer_plain_writer_content(bbstreamer *streamer,
* the file if we opened it, but not if the caller provided it.
*/
static void
-bbstreamer_plain_writer_finalize(bbstreamer *streamer)
+astreamer_plain_writer_finalize(astreamer *streamer)
{
- bbstreamer_plain_writer *mystreamer;
+ astreamer_plain_writer *mystreamer;
- mystreamer = (bbstreamer_plain_writer *) streamer;
+ mystreamer = (astreamer_plain_writer *) streamer;
if (mystreamer->should_close_file && fclose(mystreamer->file) != 0)
pg_fatal("could not close file \"%s\": %m",
@@ -143,14 +143,14 @@ bbstreamer_plain_writer_finalize(bbstreamer *streamer)
}
/*
- * Free memory associated with this bbstreamer.
+ * Free memory associated with this astreamer.
*/
static void
-bbstreamer_plain_writer_free(bbstreamer *streamer)
+astreamer_plain_writer_free(astreamer *streamer)
{
- bbstreamer_plain_writer *mystreamer;
+ astreamer_plain_writer *mystreamer;
- mystreamer = (bbstreamer_plain_writer *) streamer;
+ mystreamer = (astreamer_plain_writer *) streamer;
Assert(!mystreamer->should_close_file);
Assert(mystreamer->base.bbs_next == NULL);
@@ -160,13 +160,13 @@ bbstreamer_plain_writer_free(bbstreamer *streamer)
}
/*
- * Create a bbstreamer that extracts an archive.
+ * Create a astreamer that extracts an archive.
*
* All pathnames in the archive are interpreted relative to basepath.
*
- * Unlike e.g. bbstreamer_plain_writer_new() we can't do anything useful here
+ * Unlike e.g. astreamer_plain_writer_new() we can't do anything useful here
* with untyped chunks; we need typed chunks which follow the rules described
- * in bbstreamer.h. Assuming we have that, we don't need to worry about the
+ * in astreamer.h. Assuming we have that, we don't need to worry about the
* original archive format; it's enough to just look at the member information
* provided and write to the corresponding file.
*
@@ -179,16 +179,16 @@ bbstreamer_plain_writer_free(bbstreamer *streamer)
* new output file. The pathname to that file is passed as an argument. If
* NULL, the call is skipped.
*/
-bbstreamer *
-bbstreamer_extractor_new(const char *basepath,
- const char *(*link_map) (const char *),
- void (*report_output_file) (const char *))
+astreamer *
+astreamer_extractor_new(const char *basepath,
+ const char *(*link_map) (const char *),
+ void (*report_output_file) (const char *))
{
- bbstreamer_extractor *streamer;
+ astreamer_extractor *streamer;
- streamer = palloc0(sizeof(bbstreamer_extractor));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_extractor_ops;
+ streamer = palloc0(sizeof(astreamer_extractor));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_extractor_ops;
streamer->basepath = pstrdup(basepath);
streamer->link_map = link_map;
streamer->report_output_file = report_output_file;
@@ -200,19 +200,19 @@ bbstreamer_extractor_new(const char *basepath,
* Extract archive contents to the filesystem.
*/
static void
-bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_extractor_content(astreamer *streamer, astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
+ astreamer_extractor *mystreamer = (astreamer_extractor *) streamer;
int fnamelen;
- Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
- Assert(context != BBSTREAMER_UNKNOWN);
+ Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER);
+ Assert(context != ASTREAMER_UNKNOWN);
switch (context)
{
- case BBSTREAMER_MEMBER_HEADER:
+ case ASTREAMER_MEMBER_HEADER:
Assert(mystreamer->file == NULL);
/* Prepend basepath. */
@@ -245,7 +245,7 @@ bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
mystreamer->report_output_file(mystreamer->filename);
break;
- case BBSTREAMER_MEMBER_CONTENTS:
+ case ASTREAMER_MEMBER_CONTENTS:
if (mystreamer->file == NULL)
break;
@@ -260,14 +260,14 @@ bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
}
break;
- case BBSTREAMER_MEMBER_TRAILER:
+ case ASTREAMER_MEMBER_TRAILER:
if (mystreamer->file == NULL)
break;
fclose(mystreamer->file);
mystreamer->file = NULL;
break;
- case BBSTREAMER_ARCHIVE_TRAILER:
+ case ASTREAMER_ARCHIVE_TRAILER:
break;
default:
@@ -375,10 +375,10 @@ create_file_for_extract(const char *filename, mode_t mode)
* There's nothing to do here but sanity checking.
*/
static void
-bbstreamer_extractor_finalize(bbstreamer *streamer)
+astreamer_extractor_finalize(astreamer *streamer)
{
- bbstreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY
- = (bbstreamer_extractor *) streamer;
+ astreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY
+ = (astreamer_extractor *) streamer;
Assert(mystreamer->file == NULL);
}
@@ -387,9 +387,9 @@ bbstreamer_extractor_finalize(bbstreamer *streamer)
* Free memory.
*/
static void
-bbstreamer_extractor_free(bbstreamer *streamer)
+astreamer_extractor_free(astreamer *streamer)
{
- bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
+ astreamer_extractor *mystreamer = (astreamer_extractor *) streamer;
pfree(mystreamer->basepath);
pfree(mystreamer);
diff --git a/src/bin/pg_basebackup/bbstreamer_gzip.c b/src/bin/pg_basebackup/astreamer_gzip.c
index 0417fd9bc2c..6f7c27afbbc 100644
--- a/src/bin/pg_basebackup/bbstreamer_gzip.c
+++ b/src/bin/pg_basebackup/astreamer_gzip.c
@@ -1,11 +1,11 @@
/*-------------------------------------------------------------------------
*
- * bbstreamer_gzip.c
+ * astreamer_gzip.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * src/bin/pg_basebackup/bbstreamer_gzip.c
+ * src/bin/pg_basebackup/astreamer_gzip.c
*-------------------------------------------------------------------------
*/
@@ -17,74 +17,74 @@
#include <zlib.h>
#endif
-#include "bbstreamer.h"
+#include "astreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
#ifdef HAVE_LIBZ
-typedef struct bbstreamer_gzip_writer
+typedef struct astreamer_gzip_writer
{
- bbstreamer base;
+ astreamer base;
char *pathname;
gzFile gzfile;
-} bbstreamer_gzip_writer;
+} astreamer_gzip_writer;
-typedef struct bbstreamer_gzip_decompressor
+typedef struct astreamer_gzip_decompressor
{
- bbstreamer base;
+ astreamer base;
z_stream zstream;
size_t bytes_written;
-} bbstreamer_gzip_decompressor;
-
-static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
-static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
+} astreamer_gzip_decompressor;
+
+static void astreamer_gzip_writer_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_gzip_writer_finalize(astreamer *streamer);
+static void astreamer_gzip_writer_free(astreamer *streamer);
static const char *get_gz_error(gzFile gzf);
-static const bbstreamer_ops bbstreamer_gzip_writer_ops = {
- .content = bbstreamer_gzip_writer_content,
- .finalize = bbstreamer_gzip_writer_finalize,
- .free = bbstreamer_gzip_writer_free
+static const astreamer_ops astreamer_gzip_writer_ops = {
+ .content = astreamer_gzip_writer_content,
+ .finalize = astreamer_gzip_writer_finalize,
+ .free = astreamer_gzip_writer_free
};
-static void bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer);
-static void bbstreamer_gzip_decompressor_free(bbstreamer *streamer);
+static void astreamer_gzip_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
+static void astreamer_gzip_decompressor_free(astreamer *streamer);
static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
static void gzip_pfree(void *opaque, void *address);
-static const bbstreamer_ops bbstreamer_gzip_decompressor_ops = {
- .content = bbstreamer_gzip_decompressor_content,
- .finalize = bbstreamer_gzip_decompressor_finalize,
- .free = bbstreamer_gzip_decompressor_free
+static const astreamer_ops astreamer_gzip_decompressor_ops = {
+ .content = astreamer_gzip_decompressor_content,
+ .finalize = astreamer_gzip_decompressor_finalize,
+ .free = astreamer_gzip_decompressor_free
};
#endif
/*
- * Create a bbstreamer that just compresses data using gzip, and then writes
+ * Create a astreamer that just compresses data using gzip, and then writes
* it to a file.
*
- * As in the case of bbstreamer_plain_writer_new, pathname is always used
+ * As in the case of astreamer_plain_writer_new, pathname is always used
* for error reporting purposes; if file is NULL, it is also the opened and
* closed so that the data may be written there.
*/
-bbstreamer *
-bbstreamer_gzip_writer_new(char *pathname, FILE *file,
- pg_compress_specification *compress)
+astreamer *
+astreamer_gzip_writer_new(char *pathname, FILE *file,
+ pg_compress_specification *compress)
{
#ifdef HAVE_LIBZ
- bbstreamer_gzip_writer *streamer;
+ astreamer_gzip_writer *streamer;
- streamer = palloc0(sizeof(bbstreamer_gzip_writer));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_gzip_writer_ops;
+ streamer = palloc0(sizeof(astreamer_gzip_writer));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_gzip_writer_ops;
streamer->pathname = pstrdup(pathname);
@@ -123,13 +123,13 @@ bbstreamer_gzip_writer_new(char *pathname, FILE *file,
* Write archive content to gzip file.
*/
static void
-bbstreamer_gzip_writer_content(bbstreamer *streamer,
- bbstreamer_member *member, const char *data,
- int len, bbstreamer_archive_context context)
+astreamer_gzip_writer_content(astreamer *streamer,
+ astreamer_member *member, const char *data,
+ int len, astreamer_archive_context context)
{
- bbstreamer_gzip_writer *mystreamer;
+ astreamer_gzip_writer *mystreamer;
- mystreamer = (bbstreamer_gzip_writer *) streamer;
+ mystreamer = (astreamer_gzip_writer *) streamer;
if (len == 0)
return;
@@ -151,16 +151,16 @@ bbstreamer_gzip_writer_content(bbstreamer *streamer,
*
* It makes no difference whether we opened the file or the caller did it,
* because libz provides no way of avoiding a close on the underlying file
- * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
+ * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
* work around this issue, so that the behavior from the caller's viewpoint
- * is the same as for bbstreamer_plain_writer.
+ * is the same as for astreamer_plain_writer.
*/
static void
-bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
+astreamer_gzip_writer_finalize(astreamer *streamer)
{
- bbstreamer_gzip_writer *mystreamer;
+ astreamer_gzip_writer *mystreamer;
- mystreamer = (bbstreamer_gzip_writer *) streamer;
+ mystreamer = (astreamer_gzip_writer *) streamer;
errno = 0; /* in case gzclose() doesn't set it */
if (gzclose(mystreamer->gzfile) != 0)
@@ -171,14 +171,14 @@ bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
}
/*
- * Free memory associated with this bbstreamer.
+ * Free memory associated with this astreamer.
*/
static void
-bbstreamer_gzip_writer_free(bbstreamer *streamer)
+astreamer_gzip_writer_free(astreamer *streamer)
{
- bbstreamer_gzip_writer *mystreamer;
+ astreamer_gzip_writer *mystreamer;
- mystreamer = (bbstreamer_gzip_writer *) streamer;
+ mystreamer = (astreamer_gzip_writer *) streamer;
Assert(mystreamer->base.bbs_next == NULL);
Assert(mystreamer->gzfile == NULL);
@@ -208,18 +208,18 @@ get_gz_error(gzFile gzf)
* Create a new base backup streamer that performs decompression of gzip
* compressed blocks.
*/
-bbstreamer *
-bbstreamer_gzip_decompressor_new(bbstreamer *next)
+astreamer *
+astreamer_gzip_decompressor_new(astreamer *next)
{
#ifdef HAVE_LIBZ
- bbstreamer_gzip_decompressor *streamer;
+ astreamer_gzip_decompressor *streamer;
z_stream *zs;
Assert(next != NULL);
- streamer = palloc0(sizeof(bbstreamer_gzip_decompressor));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_gzip_decompressor_ops;
+ streamer = palloc0(sizeof(astreamer_gzip_decompressor));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_gzip_decompressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@@ -258,15 +258,15 @@ bbstreamer_gzip_decompressor_new(bbstreamer *next)
* to the next streamer.
*/
static void
-bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_gzip_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_gzip_decompressor *mystreamer;
+ astreamer_gzip_decompressor *mystreamer;
z_stream *zs;
- mystreamer = (bbstreamer_gzip_decompressor *) streamer;
+ mystreamer = (astreamer_gzip_decompressor *) streamer;
zs = &mystreamer->zstream;
zs->next_in = (const uint8 *) data;
@@ -301,9 +301,9 @@ bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
/* If output buffer is full then pass data to next streamer */
if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
{
- bbstreamer_content(mystreamer->base.bbs_next, member,
- mystreamer->base.bbs_buffer.data,
- mystreamer->base.bbs_buffer.maxlen, context);
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen, context);
mystreamer->bytes_written = 0;
}
}
@@ -313,31 +313,31 @@ bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
-bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer)
+astreamer_gzip_decompressor_finalize(astreamer *streamer)
{
- bbstreamer_gzip_decompressor *mystreamer;
+ astreamer_gzip_decompressor *mystreamer;
- mystreamer = (bbstreamer_gzip_decompressor *) streamer;
+ mystreamer = (astreamer_gzip_decompressor *) streamer;
/*
* End of the stream, if there is some pending data in output buffers then
* we must forward it to next streamer.
*/
- bbstreamer_content(mystreamer->base.bbs_next, NULL,
- mystreamer->base.bbs_buffer.data,
- mystreamer->base.bbs_buffer.maxlen,
- BBSTREAMER_UNKNOWN);
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ ASTREAMER_UNKNOWN);
- bbstreamer_finalize(mystreamer->base.bbs_next);
+ astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
-bbstreamer_gzip_decompressor_free(bbstreamer *streamer)
+astreamer_gzip_decompressor_free(astreamer *streamer)
{
- bbstreamer_free(streamer->bbs_next);
+ astreamer_free(streamer->bbs_next);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
}
diff --git a/src/bin/pg_basebackup/bbstreamer_inject.c b/src/bin/pg_basebackup/astreamer_inject.c
index 194026b56e9..7f1decded8d 100644
--- a/src/bin/pg_basebackup/bbstreamer_inject.c
+++ b/src/bin/pg_basebackup/astreamer_inject.c
@@ -1,51 +1,51 @@
/*-------------------------------------------------------------------------
*
- * bbstreamer_inject.c
+ * astreamer_inject.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * src/bin/pg_basebackup/bbstreamer_inject.c
+ * src/bin/pg_basebackup/astreamer_inject.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
-#include "bbstreamer.h"
+#include "astreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
-typedef struct bbstreamer_recovery_injector
+typedef struct astreamer_recovery_injector
{
- bbstreamer base;
+ astreamer base;
bool skip_file;
bool is_recovery_guc_supported;
bool is_postgresql_auto_conf;
bool found_postgresql_auto_conf;
PQExpBuffer recoveryconfcontents;
- bbstreamer_member member;
-} bbstreamer_recovery_injector;
-
-static void bbstreamer_recovery_injector_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_recovery_injector_finalize(bbstreamer *streamer);
-static void bbstreamer_recovery_injector_free(bbstreamer *streamer);
-
-static const bbstreamer_ops bbstreamer_recovery_injector_ops = {
- .content = bbstreamer_recovery_injector_content,
- .finalize = bbstreamer_recovery_injector_finalize,
- .free = bbstreamer_recovery_injector_free
+ astreamer_member member;
+} astreamer_recovery_injector;
+
+static void astreamer_recovery_injector_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_recovery_injector_finalize(astreamer *streamer);
+static void astreamer_recovery_injector_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_recovery_injector_ops = {
+ .content = astreamer_recovery_injector_content,
+ .finalize = astreamer_recovery_injector_finalize,
+ .free = astreamer_recovery_injector_free
};
/*
- * Create a bbstreamer that can edit recoverydata into an archive stream.
+ * Create a astreamer that can edit recoverydata into an archive stream.
*
- * The input should be a series of typed chunks (not BBSTREAMER_UNKNOWN) as
- * per the conventions described in bbstreamer.h; the chunks forwarded to
- * the next bbstreamer will be similarly typed, but the
- * BBSTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've
+ * The input should be a series of typed chunks (not ASTREAMER_UNKNOWN) as
+ * per the conventions described in astreamer.h; the chunks forwarded to
+ * the next astreamer will be similarly typed, but the
+ * ASTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've
* edited the archive stream.
*
* Our goal is to do one of the following three things with the content passed
@@ -61,16 +61,16 @@ static const bbstreamer_ops bbstreamer_recovery_injector_ops = {
* zero-length standby.signal file, dropping any file with that name from
* the archive.
*/
-bbstreamer *
-bbstreamer_recovery_injector_new(bbstreamer *next,
- bool is_recovery_guc_supported,
- PQExpBuffer recoveryconfcontents)
+astreamer *
+astreamer_recovery_injector_new(astreamer *next,
+ bool is_recovery_guc_supported,
+ PQExpBuffer recoveryconfcontents)
{
- bbstreamer_recovery_injector *streamer;
+ astreamer_recovery_injector *streamer;
- streamer = palloc0(sizeof(bbstreamer_recovery_injector));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_recovery_injector_ops;
+ streamer = palloc0(sizeof(astreamer_recovery_injector));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_recovery_injector_ops;
streamer->base.bbs_next = next;
streamer->is_recovery_guc_supported = is_recovery_guc_supported;
streamer->recoveryconfcontents = recoveryconfcontents;
@@ -82,21 +82,21 @@ bbstreamer_recovery_injector_new(bbstreamer *next,
* Handle each chunk of tar content while injecting recovery configuration.
*/
static void
-bbstreamer_recovery_injector_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_recovery_injector_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_recovery_injector *mystreamer;
+ astreamer_recovery_injector *mystreamer;
- mystreamer = (bbstreamer_recovery_injector *) streamer;
- Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
+ mystreamer = (astreamer_recovery_injector *) streamer;
+ Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER);
switch (context)
{
- case BBSTREAMER_MEMBER_HEADER:
+ case ASTREAMER_MEMBER_HEADER:
/* Must copy provided data so we have the option to modify it. */
- memcpy(&mystreamer->member, member, sizeof(bbstreamer_member));
+ memcpy(&mystreamer->member, member, sizeof(astreamer_member));
/*
* On v12+, skip standby.signal and edit postgresql.auto.conf; on
@@ -119,8 +119,8 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer,
/*
* Zap data and len because the archive header is no
- * longer valid; some subsequent bbstreamer must
- * regenerate it if it's necessary.
+ * longer valid; some subsequent astreamer must regenerate
+ * it if it's necessary.
*/
data = NULL;
len = 0;
@@ -135,26 +135,26 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer,
return;
break;
- case BBSTREAMER_MEMBER_CONTENTS:
+ case ASTREAMER_MEMBER_CONTENTS:
/* Do not forward if the file is to be skipped. */
if (mystreamer->skip_file)
return;
break;
- case BBSTREAMER_MEMBER_TRAILER:
+ case ASTREAMER_MEMBER_TRAILER:
/* Do not forward it the file is to be skipped. */
if (mystreamer->skip_file)
return;
/* Append provided content to whatever we already sent. */
if (mystreamer->is_postgresql_auto_conf)
- bbstreamer_content(mystreamer->base.bbs_next, member,
- mystreamer->recoveryconfcontents->data,
- mystreamer->recoveryconfcontents->len,
- BBSTREAMER_MEMBER_CONTENTS);
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len,
+ ASTREAMER_MEMBER_CONTENTS);
break;
- case BBSTREAMER_ARCHIVE_TRAILER:
+ case ASTREAMER_ARCHIVE_TRAILER:
if (mystreamer->is_recovery_guc_supported)
{
/*
@@ -163,22 +163,22 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer,
* member now.
*/
if (!mystreamer->found_postgresql_auto_conf)
- bbstreamer_inject_file(mystreamer->base.bbs_next,
- "postgresql.auto.conf",
- mystreamer->recoveryconfcontents->data,
- mystreamer->recoveryconfcontents->len);
+ astreamer_inject_file(mystreamer->base.bbs_next,
+ "postgresql.auto.conf",
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len);
/* Inject empty standby.signal file. */
- bbstreamer_inject_file(mystreamer->base.bbs_next,
- "standby.signal", "", 0);
+ astreamer_inject_file(mystreamer->base.bbs_next,
+ "standby.signal", "", 0);
}
else
{
/* Inject recovery.conf file with specified contents. */
- bbstreamer_inject_file(mystreamer->base.bbs_next,
- "recovery.conf",
- mystreamer->recoveryconfcontents->data,
- mystreamer->recoveryconfcontents->len);
+ astreamer_inject_file(mystreamer->base.bbs_next,
+ "recovery.conf",
+ mystreamer->recoveryconfcontents->data,
+ mystreamer->recoveryconfcontents->len);
}
/* Nothing to do here. */
@@ -189,26 +189,26 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer,
pg_fatal("unexpected state while injecting recovery settings");
}
- bbstreamer_content(mystreamer->base.bbs_next, &mystreamer->member,
- data, len, context);
+ astreamer_content(mystreamer->base.bbs_next, &mystreamer->member,
+ data, len, context);
}
/*
- * End-of-stream processing for this bbstreamer.
+ * End-of-stream processing for this astreamer.
*/
static void
-bbstreamer_recovery_injector_finalize(bbstreamer *streamer)
+astreamer_recovery_injector_finalize(astreamer *streamer)
{
- bbstreamer_finalize(streamer->bbs_next);
+ astreamer_finalize(streamer->bbs_next);
}
/*
- * Free memory associated with this bbstreamer.
+ * Free memory associated with this astreamer.
*/
static void
-bbstreamer_recovery_injector_free(bbstreamer *streamer)
+astreamer_recovery_injector_free(astreamer *streamer)
{
- bbstreamer_free(streamer->bbs_next);
+ astreamer_free(streamer->bbs_next);
pfree(streamer);
}
@@ -216,10 +216,10 @@ bbstreamer_recovery_injector_free(bbstreamer *streamer)
* Inject a member into the archive with specified contents.
*/
void
-bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data,
- int len)
+astreamer_inject_file(astreamer *streamer, char *pathname, char *data,
+ int len)
{
- bbstreamer_member member;
+ astreamer_member member;
strlcpy(member.pathname, pathname, MAXPGPATH);
member.size = len;
@@ -238,12 +238,12 @@ bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data,
/*
* We don't know here how to generate valid member headers and trailers
* for the archiving format in use, so if those are needed, some successor
- * bbstreamer will have to generate them using the data from 'member'.
+ * astreamer will have to generate them using the data from 'member'.
*/
- bbstreamer_content(streamer, &member, NULL, 0,
- BBSTREAMER_MEMBER_HEADER);
- bbstreamer_content(streamer, &member, data, len,
- BBSTREAMER_MEMBER_CONTENTS);
- bbstreamer_content(streamer, &member, NULL, 0,
- BBSTREAMER_MEMBER_TRAILER);
+ astreamer_content(streamer, &member, NULL, 0,
+ ASTREAMER_MEMBER_HEADER);
+ astreamer_content(streamer, &member, data, len,
+ ASTREAMER_MEMBER_CONTENTS);
+ astreamer_content(streamer, &member, NULL, 0,
+ ASTREAMER_MEMBER_TRAILER);
}
diff --git a/src/bin/pg_basebackup/bbstreamer_lz4.c b/src/bin/pg_basebackup/astreamer_lz4.c
index f5c9e68150c..1c40d7d8ad5 100644
--- a/src/bin/pg_basebackup/bbstreamer_lz4.c
+++ b/src/bin/pg_basebackup/astreamer_lz4.c
@@ -1,11 +1,11 @@
/*-------------------------------------------------------------------------
*
- * bbstreamer_lz4.c
+ * astreamer_lz4.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * src/bin/pg_basebackup/bbstreamer_lz4.c
+ * src/bin/pg_basebackup/astreamer_lz4.c
*-------------------------------------------------------------------------
*/
@@ -17,15 +17,15 @@
#include <lz4frame.h>
#endif
-#include "bbstreamer.h"
+#include "astreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
#ifdef USE_LZ4
-typedef struct bbstreamer_lz4_frame
+typedef struct astreamer_lz4_frame
{
- bbstreamer base;
+ astreamer base;
LZ4F_compressionContext_t cctx;
LZ4F_decompressionContext_t dctx;
@@ -33,32 +33,32 @@ typedef struct bbstreamer_lz4_frame
size_t bytes_written;
bool header_written;
-} bbstreamer_lz4_frame;
-
-static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
-static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
-
-static const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
- .content = bbstreamer_lz4_compressor_content,
- .finalize = bbstreamer_lz4_compressor_finalize,
- .free = bbstreamer_lz4_compressor_free
+} astreamer_lz4_frame;
+
+static void astreamer_lz4_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_lz4_compressor_finalize(astreamer *streamer);
+static void astreamer_lz4_compressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_lz4_compressor_ops = {
+ .content = astreamer_lz4_compressor_content,
+ .finalize = astreamer_lz4_compressor_finalize,
+ .free = astreamer_lz4_compressor_free
};
-static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
-static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
-
-static const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
- .content = bbstreamer_lz4_decompressor_content,
- .finalize = bbstreamer_lz4_decompressor_finalize,
- .free = bbstreamer_lz4_decompressor_free
+static void astreamer_lz4_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
+static void astreamer_lz4_decompressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_lz4_decompressor_ops = {
+ .content = astreamer_lz4_decompressor_content,
+ .finalize = astreamer_lz4_decompressor_finalize,
+ .free = astreamer_lz4_decompressor_free
};
#endif
@@ -66,19 +66,19 @@ static const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
* Create a new base backup streamer that performs lz4 compression of tar
* blocks.
*/
-bbstreamer *
-bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
+astreamer *
+astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
{
#ifdef USE_LZ4
- bbstreamer_lz4_frame *streamer;
+ astreamer_lz4_frame *streamer;
LZ4F_errorCode_t ctxError;
LZ4F_preferences_t *prefs;
Assert(next != NULL);
- streamer = palloc0(sizeof(bbstreamer_lz4_frame));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_lz4_compressor_ops;
+ streamer = palloc0(sizeof(astreamer_lz4_frame));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_lz4_compressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@@ -113,19 +113,19 @@ bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compr
* of output buffer to next streamer and empty the buffer.
*/
static void
-bbstreamer_lz4_compressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_lz4_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_lz4_frame *mystreamer;
+ astreamer_lz4_frame *mystreamer;
uint8 *next_in,
*next_out;
size_t out_bound,
compressed_size,
avail_out;
- mystreamer = (bbstreamer_lz4_frame *) streamer;
+ mystreamer = (astreamer_lz4_frame *) streamer;
next_in = (uint8 *) data;
/* Write header before processing the first input chunk. */
@@ -159,10 +159,10 @@ bbstreamer_lz4_compressor_content(bbstreamer *streamer,
out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
if (avail_out < out_bound)
{
- bbstreamer_content(mystreamer->base.bbs_next, member,
- mystreamer->base.bbs_buffer.data,
- mystreamer->bytes_written,
- context);
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ context);
/* Enlarge buffer if it falls short of out bound. */
if (mystreamer->base.bbs_buffer.maxlen < out_bound)
@@ -196,25 +196,25 @@ bbstreamer_lz4_compressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
-bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
+astreamer_lz4_compressor_finalize(astreamer *streamer)
{
- bbstreamer_lz4_frame *mystreamer;
+ astreamer_lz4_frame *mystreamer;
uint8 *next_out;
size_t footer_bound,
compressed_size,
avail_out;
- mystreamer = (bbstreamer_lz4_frame *) streamer;
+ mystreamer = (astreamer_lz4_frame *) streamer;
/* Find out the footer bound and update the output buffer. */
footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
footer_bound)
{
- bbstreamer_content(mystreamer->base.bbs_next, NULL,
- mystreamer->base.bbs_buffer.data,
- mystreamer->bytes_written,
- BBSTREAMER_UNKNOWN);
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ ASTREAMER_UNKNOWN);
/* Enlarge buffer if it falls short of footer bound. */
if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
@@ -243,24 +243,24 @@ bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
mystreamer->bytes_written += compressed_size;
- bbstreamer_content(mystreamer->base.bbs_next, NULL,
- mystreamer->base.bbs_buffer.data,
- mystreamer->bytes_written,
- BBSTREAMER_UNKNOWN);
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->bytes_written,
+ ASTREAMER_UNKNOWN);
- bbstreamer_finalize(mystreamer->base.bbs_next);
+ astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
-bbstreamer_lz4_compressor_free(bbstreamer *streamer)
+astreamer_lz4_compressor_free(astreamer *streamer)
{
- bbstreamer_lz4_frame *mystreamer;
+ astreamer_lz4_frame *mystreamer;
- mystreamer = (bbstreamer_lz4_frame *) streamer;
- bbstreamer_free(streamer->bbs_next);
+ mystreamer = (astreamer_lz4_frame *) streamer;
+ astreamer_free(streamer->bbs_next);
LZ4F_freeCompressionContext(mystreamer->cctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
@@ -271,18 +271,18 @@ bbstreamer_lz4_compressor_free(bbstreamer *streamer)
* Create a new base backup streamer that performs decompression of lz4
* compressed blocks.
*/
-bbstreamer *
-bbstreamer_lz4_decompressor_new(bbstreamer *next)
+astreamer *
+astreamer_lz4_decompressor_new(astreamer *next)
{
#ifdef USE_LZ4
- bbstreamer_lz4_frame *streamer;
+ astreamer_lz4_frame *streamer;
LZ4F_errorCode_t ctxError;
Assert(next != NULL);
- streamer = palloc0(sizeof(bbstreamer_lz4_frame));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_lz4_decompressor_ops;
+ streamer = palloc0(sizeof(astreamer_lz4_frame));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_lz4_decompressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@@ -307,18 +307,18 @@ bbstreamer_lz4_decompressor_new(bbstreamer *next)
* to the next streamer.
*/
static void
-bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_lz4_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_lz4_frame *mystreamer;
+ astreamer_lz4_frame *mystreamer;
uint8 *next_in,
*next_out;
size_t avail_in,
avail_out;
- mystreamer = (bbstreamer_lz4_frame *) streamer;
+ mystreamer = (astreamer_lz4_frame *) streamer;
next_in = (uint8 *) data;
next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
avail_in = len;
@@ -366,10 +366,10 @@ bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
*/
if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
{
- bbstreamer_content(mystreamer->base.bbs_next, member,
- mystreamer->base.bbs_buffer.data,
- mystreamer->base.bbs_buffer.maxlen,
- context);
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ context);
avail_out = mystreamer->base.bbs_buffer.maxlen;
mystreamer->bytes_written = 0;
@@ -387,34 +387,34 @@ bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
-bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
+astreamer_lz4_decompressor_finalize(astreamer *streamer)
{
- bbstreamer_lz4_frame *mystreamer;
+ astreamer_lz4_frame *mystreamer;
- mystreamer = (bbstreamer_lz4_frame *) streamer;
+ mystreamer = (astreamer_lz4_frame *) streamer;
/*
* End of the stream, if there is some pending data in output buffers then
* we must forward it to next streamer.
*/
- bbstreamer_content(mystreamer->base.bbs_next, NULL,
- mystreamer->base.bbs_buffer.data,
- mystreamer->base.bbs_buffer.maxlen,
- BBSTREAMER_UNKNOWN);
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ ASTREAMER_UNKNOWN);
- bbstreamer_finalize(mystreamer->base.bbs_next);
+ astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
-bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
+astreamer_lz4_decompressor_free(astreamer *streamer)
{
- bbstreamer_lz4_frame *mystreamer;
+ astreamer_lz4_frame *mystreamer;
- mystreamer = (bbstreamer_lz4_frame *) streamer;
- bbstreamer_free(streamer->bbs_next);
+ mystreamer = (astreamer_lz4_frame *) streamer;
+ astreamer_free(streamer->bbs_next);
LZ4F_freeDecompressionContext(mystreamer->dctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
diff --git a/src/bin/pg_basebackup/bbstreamer_tar.c b/src/bin/pg_basebackup/astreamer_tar.c
index 9137d17ddc1..673690cd18f 100644
--- a/src/bin/pg_basebackup/bbstreamer_tar.c
+++ b/src/bin/pg_basebackup/astreamer_tar.c
@@ -1,13 +1,13 @@
/*-------------------------------------------------------------------------
*
- * bbstreamer_tar.c
+ * astreamer_tar.c
*
* This module implements three types of tar processing. A tar parser
- * expects unlabelled chunks of data (e.g. BBSTREAMER_UNKNOWN) and splits
- * it into labelled chunks (any other value of bbstreamer_archive_context).
+ * expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits
+ * it into labelled chunks (any other value of astreamer_archive_context).
* A tar archiver does the reverse: it takes a bunch of labelled chunks
* and produces a tarfile, optionally replacing member headers and trailers
- * so that upstream bbstreamer objects can perform surgery on the tarfile
+ * so that upstream astreamer objects can perform surgery on the tarfile
* contents without knowing the details of the tar format. A tar terminator
* just adds two blocks of NUL bytes to the end of the file, since older
* server versions produce files with this terminator omitted.
@@ -15,7 +15,7 @@
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * src/bin/pg_basebackup/bbstreamer_tar.c
+ * src/bin/pg_basebackup/astreamer_tar.c
*-------------------------------------------------------------------------
*/
@@ -23,83 +23,83 @@
#include <time.h>
-#include "bbstreamer.h"
+#include "astreamer.h"
#include "common/logging.h"
#include "pgtar.h"
-typedef struct bbstreamer_tar_parser
+typedef struct astreamer_tar_parser
{
- bbstreamer base;
- bbstreamer_archive_context next_context;
- bbstreamer_member member;
+ astreamer base;
+ astreamer_archive_context next_context;
+ astreamer_member member;
size_t file_bytes_sent;
size_t pad_bytes_expected;
-} bbstreamer_tar_parser;
+} astreamer_tar_parser;
-typedef struct bbstreamer_tar_archiver
+typedef struct astreamer_tar_archiver
{
- bbstreamer base;
+ astreamer base;
bool rearchive_member;
-} bbstreamer_tar_archiver;
-
-static void bbstreamer_tar_parser_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_tar_parser_finalize(bbstreamer *streamer);
-static void bbstreamer_tar_parser_free(bbstreamer *streamer);
-static bool bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer);
-
-static const bbstreamer_ops bbstreamer_tar_parser_ops = {
- .content = bbstreamer_tar_parser_content,
- .finalize = bbstreamer_tar_parser_finalize,
- .free = bbstreamer_tar_parser_free
+} astreamer_tar_archiver;
+
+static void astreamer_tar_parser_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_tar_parser_finalize(astreamer *streamer);
+static void astreamer_tar_parser_free(astreamer *streamer);
+static bool astreamer_tar_header(astreamer_tar_parser *mystreamer);
+
+static const astreamer_ops astreamer_tar_parser_ops = {
+ .content = astreamer_tar_parser_content,
+ .finalize = astreamer_tar_parser_finalize,
+ .free = astreamer_tar_parser_free
};
-static void bbstreamer_tar_archiver_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_tar_archiver_finalize(bbstreamer *streamer);
-static void bbstreamer_tar_archiver_free(bbstreamer *streamer);
-
-static const bbstreamer_ops bbstreamer_tar_archiver_ops = {
- .content = bbstreamer_tar_archiver_content,
- .finalize = bbstreamer_tar_archiver_finalize,
- .free = bbstreamer_tar_archiver_free
+static void astreamer_tar_archiver_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_tar_archiver_finalize(astreamer *streamer);
+static void astreamer_tar_archiver_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_tar_archiver_ops = {
+ .content = astreamer_tar_archiver_content,
+ .finalize = astreamer_tar_archiver_finalize,
+ .free = astreamer_tar_archiver_free
};
-static void bbstreamer_tar_terminator_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_tar_terminator_finalize(bbstreamer *streamer);
-static void bbstreamer_tar_terminator_free(bbstreamer *streamer);
-
-static const bbstreamer_ops bbstreamer_tar_terminator_ops = {
- .content = bbstreamer_tar_terminator_content,
- .finalize = bbstreamer_tar_terminator_finalize,
- .free = bbstreamer_tar_terminator_free
+static void astreamer_tar_terminator_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_tar_terminator_finalize(astreamer *streamer);
+static void astreamer_tar_terminator_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_tar_terminator_ops = {
+ .content = astreamer_tar_terminator_content,
+ .finalize = astreamer_tar_terminator_finalize,
+ .free = astreamer_tar_terminator_free
};
/*
- * Create a bbstreamer that can parse a stream of content as tar data.
+ * Create a astreamer that can parse a stream of content as tar data.
*
- * The input should be a series of BBSTREAMER_UNKNOWN chunks; the bbstreamer
+ * The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer
* specified by 'next' will receive a series of typed chunks, as per the
- * conventions described in bbstreamer.h.
+ * conventions described in astreamer.h.
*/
-bbstreamer *
-bbstreamer_tar_parser_new(bbstreamer *next)
+astreamer *
+astreamer_tar_parser_new(astreamer *next)
{
- bbstreamer_tar_parser *streamer;
+ astreamer_tar_parser *streamer;
- streamer = palloc0(sizeof(bbstreamer_tar_parser));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_tar_parser_ops;
+ streamer = palloc0(sizeof(astreamer_tar_parser));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_tar_parser_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
- streamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ streamer->next_context = ASTREAMER_MEMBER_HEADER;
return &streamer->base;
}
@@ -108,29 +108,29 @@ bbstreamer_tar_parser_new(bbstreamer *next)
* Parse unknown content as tar data.
*/
static void
-bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
+ astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
size_t nbytes;
/* Expect unparsed input. */
Assert(member == NULL);
- Assert(context == BBSTREAMER_UNKNOWN);
+ Assert(context == ASTREAMER_UNKNOWN);
while (len > 0)
{
switch (mystreamer->next_context)
{
- case BBSTREAMER_MEMBER_HEADER:
+ case ASTREAMER_MEMBER_HEADER:
/*
* If we're expecting an archive member header, accumulate a
* full block of data before doing anything further.
*/
- if (!bbstreamer_buffer_until(streamer, &data, &len,
- TAR_BLOCK_SIZE))
+ if (!astreamer_buffer_until(streamer, &data, &len,
+ TAR_BLOCK_SIZE))
return;
/*
@@ -139,32 +139,32 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
* thought was the next file header is actually the start of
* the archive trailer. Switch modes accordingly.
*/
- if (bbstreamer_tar_header(mystreamer))
+ if (astreamer_tar_header(mystreamer))
{
if (mystreamer->member.size == 0)
{
/* No content; trailer is zero-length. */
- bbstreamer_content(mystreamer->base.bbs_next,
- &mystreamer->member,
- NULL, 0,
- BBSTREAMER_MEMBER_TRAILER);
+ astreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ NULL, 0,
+ ASTREAMER_MEMBER_TRAILER);
/* Expect next header. */
- mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
}
else
{
/* Expect contents. */
- mystreamer->next_context = BBSTREAMER_MEMBER_CONTENTS;
+ mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS;
}
mystreamer->base.bbs_buffer.len = 0;
mystreamer->file_bytes_sent = 0;
}
else
- mystreamer->next_context = BBSTREAMER_ARCHIVE_TRAILER;
+ mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER;
break;
- case BBSTREAMER_MEMBER_CONTENTS:
+ case ASTREAMER_MEMBER_CONTENTS:
/*
* Send as much content as we have, but not more than the
@@ -174,10 +174,10 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
nbytes = mystreamer->member.size - mystreamer->file_bytes_sent;
nbytes = Min(nbytes, len);
Assert(nbytes > 0);
- bbstreamer_content(mystreamer->base.bbs_next,
- &mystreamer->member,
- data, nbytes,
- BBSTREAMER_MEMBER_CONTENTS);
+ astreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ data, nbytes,
+ ASTREAMER_MEMBER_CONTENTS);
mystreamer->file_bytes_sent += nbytes;
data += nbytes;
len -= nbytes;
@@ -193,53 +193,53 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
if (mystreamer->pad_bytes_expected == 0)
{
/* Trailer is zero-length. */
- bbstreamer_content(mystreamer->base.bbs_next,
- &mystreamer->member,
- NULL, 0,
- BBSTREAMER_MEMBER_TRAILER);
+ astreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ NULL, 0,
+ ASTREAMER_MEMBER_TRAILER);
/* Expect next header. */
- mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
}
else
{
/* Trailer is not zero-length. */
- mystreamer->next_context = BBSTREAMER_MEMBER_TRAILER;
+ mystreamer->next_context = ASTREAMER_MEMBER_TRAILER;
}
mystreamer->base.bbs_buffer.len = 0;
}
break;
- case BBSTREAMER_MEMBER_TRAILER:
+ case ASTREAMER_MEMBER_TRAILER:
/*
* If we're expecting an archive member trailer, accumulate
* the expected number of padding bytes before sending
* anything onward.
*/
- if (!bbstreamer_buffer_until(streamer, &data, &len,
- mystreamer->pad_bytes_expected))
+ if (!astreamer_buffer_until(streamer, &data, &len,
+ mystreamer->pad_bytes_expected))
return;
/* OK, now we can send it. */
- bbstreamer_content(mystreamer->base.bbs_next,
- &mystreamer->member,
- data, mystreamer->pad_bytes_expected,
- BBSTREAMER_MEMBER_TRAILER);
+ astreamer_content(mystreamer->base.bbs_next,
+ &mystreamer->member,
+ data, mystreamer->pad_bytes_expected,
+ ASTREAMER_MEMBER_TRAILER);
/* Expect next file header. */
- mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
+ mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
mystreamer->base.bbs_buffer.len = 0;
break;
- case BBSTREAMER_ARCHIVE_TRAILER:
+ case ASTREAMER_ARCHIVE_TRAILER:
/*
* We've seen an end-of-archive indicator, so anything more is
* buffered and sent as part of the archive trailer. But we
* don't expect more than 2 blocks.
*/
- bbstreamer_buffer_bytes(streamer, &data, &len, len);
+ astreamer_buffer_bytes(streamer, &data, &len, len);
if (len > 2 * TAR_BLOCK_SIZE)
pg_fatal("tar file trailer exceeds 2 blocks");
return;
@@ -255,14 +255,14 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
* Parse a file header within a tar stream.
*
* The return value is true if we found a file header and passed it on to the
- * next bbstreamer; it is false if we have reached the archive trailer.
+ * next astreamer; it is false if we have reached the archive trailer.
*/
static bool
-bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
+astreamer_tar_header(astreamer_tar_parser *mystreamer)
{
bool has_nonzero_byte = false;
int i;
- bbstreamer_member *member = &mystreamer->member;
+ astreamer_member *member = &mystreamer->member;
char *buffer = mystreamer->base.bbs_buffer.data;
Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE);
@@ -304,10 +304,10 @@ bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
/* Compute number of padding bytes. */
mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size);
- /* Forward the entire header to the next bbstreamer. */
- bbstreamer_content(mystreamer->base.bbs_next, member,
- buffer, TAR_BLOCK_SIZE,
- BBSTREAMER_MEMBER_HEADER);
+ /* Forward the entire header to the next astreamer. */
+ astreamer_content(mystreamer->base.bbs_next, member,
+ buffer, TAR_BLOCK_SIZE,
+ ASTREAMER_MEMBER_HEADER);
return true;
}
@@ -316,50 +316,50 @@ bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
* End-of-stream processing for a tar parser.
*/
static void
-bbstreamer_tar_parser_finalize(bbstreamer *streamer)
+astreamer_tar_parser_finalize(astreamer *streamer)
{
- bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
+ astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
- if (mystreamer->next_context != BBSTREAMER_ARCHIVE_TRAILER &&
- (mystreamer->next_context != BBSTREAMER_MEMBER_HEADER ||
+ if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER &&
+ (mystreamer->next_context != ASTREAMER_MEMBER_HEADER ||
mystreamer->base.bbs_buffer.len > 0))
pg_fatal("COPY stream ended before last file was finished");
/* Send the archive trailer, even if empty. */
- bbstreamer_content(streamer->bbs_next, NULL,
- streamer->bbs_buffer.data, streamer->bbs_buffer.len,
- BBSTREAMER_ARCHIVE_TRAILER);
+ astreamer_content(streamer->bbs_next, NULL,
+ streamer->bbs_buffer.data, streamer->bbs_buffer.len,
+ ASTREAMER_ARCHIVE_TRAILER);
/* Now finalize successor. */
- bbstreamer_finalize(streamer->bbs_next);
+ astreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with a tar parser.
*/
static void
-bbstreamer_tar_parser_free(bbstreamer *streamer)
+astreamer_tar_parser_free(astreamer *streamer)
{
pfree(streamer->bbs_buffer.data);
- bbstreamer_free(streamer->bbs_next);
+ astreamer_free(streamer->bbs_next);
}
/*
- * Create a bbstreamer that can generate a tar archive.
+ * Create a astreamer that can generate a tar archive.
*
* This is intended to be usable either for generating a brand-new tar archive
* or for modifying one on the fly. The input should be a series of typed
- * chunks (i.e. not BBSTREAMER_UNKNOWN). See also the comments for
- * bbstreamer_tar_parser_content.
+ * chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for
+ * astreamer_tar_parser_content.
*/
-bbstreamer *
-bbstreamer_tar_archiver_new(bbstreamer *next)
+astreamer *
+astreamer_tar_archiver_new(astreamer *next)
{
- bbstreamer_tar_archiver *streamer;
+ astreamer_tar_archiver *streamer;
- streamer = palloc0(sizeof(bbstreamer_tar_archiver));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_tar_archiver_ops;
+ streamer = palloc0(sizeof(astreamer_tar_archiver));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_tar_archiver_ops;
streamer->base.bbs_next = next;
return &streamer->base;
@@ -368,36 +368,36 @@ bbstreamer_tar_archiver_new(bbstreamer *next)
/*
* Fix up the stream of input chunks to create a valid tar file.
*
- * If a BBSTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
+ * If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
* newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is
* passed through without change. Any other size is a fatal error (and
* indicates a bug).
*
- * Whenever a new BBSTREAMER_MEMBER_HEADER chunk is constructed, the
- * corresponding BBSTREAMER_MEMBER_TRAILER chunk is also constructed from
+ * Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the
+ * corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from
* scratch. Specifically, we construct a block of zero bytes sufficient to
* pad out to a block boundary, as required by the tar format. Other
- * BBSTREAMER_MEMBER_TRAILER chunks are passed through without change.
+ * ASTREAMER_MEMBER_TRAILER chunks are passed through without change.
*
- * Any BBSTREAMER_MEMBER_CONTENTS chunks are passed through without change.
+ * Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change.
*
- * The BBSTREAMER_ARCHIVE_TRAILER chunk is replaced with two
+ * The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two
* blocks of zero bytes. Not all tar programs require this, but apparently
* some do. The server does not supply this trailer. If no archive trailer is
- * present, one will be added by bbstreamer_tar_parser_finalize.
+ * present, one will be added by astreamer_tar_parser_finalize.
*/
static void
-bbstreamer_tar_archiver_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_tar_archiver_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_tar_archiver *mystreamer = (bbstreamer_tar_archiver *) streamer;
+ astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer;
char buffer[2 * TAR_BLOCK_SIZE];
- Assert(context != BBSTREAMER_UNKNOWN);
+ Assert(context != ASTREAMER_UNKNOWN);
- if (context == BBSTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
+ if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
{
Assert(len == 0);
@@ -411,7 +411,7 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer,
/* Also make a note to replace padding, in case size changed. */
mystreamer->rearchive_member = true;
}
- else if (context == BBSTREAMER_MEMBER_TRAILER &&
+ else if (context == ASTREAMER_MEMBER_TRAILER &&
mystreamer->rearchive_member)
{
int pad_bytes = tarPaddingBytesRequired(member->size);
@@ -424,7 +424,7 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer,
/* Don't do this again unless we replace another header. */
mystreamer->rearchive_member = false;
}
- else if (context == BBSTREAMER_ARCHIVE_TRAILER)
+ else if (context == ASTREAMER_ARCHIVE_TRAILER)
{
/* Trailer should always be two blocks of zero bytes. */
memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
@@ -432,40 +432,40 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer,
len = 2 * TAR_BLOCK_SIZE;
}
- bbstreamer_content(streamer->bbs_next, member, data, len, context);
+ astreamer_content(streamer->bbs_next, member, data, len, context);
}
/*
* End-of-stream processing for a tar archiver.
*/
static void
-bbstreamer_tar_archiver_finalize(bbstreamer *streamer)
+astreamer_tar_archiver_finalize(astreamer *streamer)
{
- bbstreamer_finalize(streamer->bbs_next);
+ astreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with a tar archiver.
*/
static void
-bbstreamer_tar_archiver_free(bbstreamer *streamer)
+astreamer_tar_archiver_free(astreamer *streamer)
{
- bbstreamer_free(streamer->bbs_next);
+ astreamer_free(streamer->bbs_next);
pfree(streamer);
}
/*
- * Create a bbstreamer that blindly adds two blocks of NUL bytes to the
+ * Create a astreamer that blindly adds two blocks of NUL bytes to the
* end of an incomplete tarfile that the server might send us.
*/
-bbstreamer *
-bbstreamer_tar_terminator_new(bbstreamer *next)
+astreamer *
+astreamer_tar_terminator_new(astreamer *next)
{
- bbstreamer *streamer;
+ astreamer *streamer;
- streamer = palloc0(sizeof(bbstreamer));
- *((const bbstreamer_ops **) &streamer->bbs_ops) =
- &bbstreamer_tar_terminator_ops;
+ streamer = palloc0(sizeof(astreamer));
+ *((const astreamer_ops **) &streamer->bbs_ops) =
+ &astreamer_tar_terminator_ops;
streamer->bbs_next = next;
return streamer;
@@ -475,17 +475,17 @@ bbstreamer_tar_terminator_new(bbstreamer *next)
* Pass all the content through without change.
*/
static void
-bbstreamer_tar_terminator_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_tar_terminator_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
/* Expect unparsed input. */
Assert(member == NULL);
- Assert(context == BBSTREAMER_UNKNOWN);
+ Assert(context == ASTREAMER_UNKNOWN);
/* Just forward it. */
- bbstreamer_content(streamer->bbs_next, member, data, len, context);
+ astreamer_content(streamer->bbs_next, member, data, len, context);
}
/*
@@ -493,22 +493,22 @@ bbstreamer_tar_terminator_content(bbstreamer *streamer,
* to supply.
*/
static void
-bbstreamer_tar_terminator_finalize(bbstreamer *streamer)
+astreamer_tar_terminator_finalize(astreamer *streamer)
{
char buffer[2 * TAR_BLOCK_SIZE];
memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
- bbstreamer_content(streamer->bbs_next, NULL, buffer,
- 2 * TAR_BLOCK_SIZE, BBSTREAMER_UNKNOWN);
- bbstreamer_finalize(streamer->bbs_next);
+ astreamer_content(streamer->bbs_next, NULL, buffer,
+ 2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN);
+ astreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with a tar terminator.
*/
static void
-bbstreamer_tar_terminator_free(bbstreamer *streamer)
+astreamer_tar_terminator_free(astreamer *streamer)
{
- bbstreamer_free(streamer->bbs_next);
+ astreamer_free(streamer->bbs_next);
pfree(streamer);
}
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/astreamer_zstd.c
index 20f11d4450e..58dc679ef99 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/astreamer_zstd.c
@@ -1,11 +1,11 @@
/*-------------------------------------------------------------------------
*
- * bbstreamer_zstd.c
+ * astreamer_zstd.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * src/bin/pg_basebackup/bbstreamer_zstd.c
+ * src/bin/pg_basebackup/astreamer_zstd.c
*-------------------------------------------------------------------------
*/
@@ -17,44 +17,44 @@
#include <zstd.h>
#endif
-#include "bbstreamer.h"
+#include "astreamer.h"
#include "common/logging.h"
#ifdef USE_ZSTD
-typedef struct bbstreamer_zstd_frame
+typedef struct astreamer_zstd_frame
{
- bbstreamer base;
+ astreamer base;
ZSTD_CCtx *cctx;
ZSTD_DCtx *dctx;
ZSTD_outBuffer zstd_outBuf;
-} bbstreamer_zstd_frame;
-
-static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
-static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
-
-static const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
- .content = bbstreamer_zstd_compressor_content,
- .finalize = bbstreamer_zstd_compressor_finalize,
- .free = bbstreamer_zstd_compressor_free
+} astreamer_zstd_frame;
+
+static void astreamer_zstd_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_zstd_compressor_finalize(astreamer *streamer);
+static void astreamer_zstd_compressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_zstd_compressor_ops = {
+ .content = astreamer_zstd_compressor_content,
+ .finalize = astreamer_zstd_compressor_finalize,
+ .free = astreamer_zstd_compressor_free
};
-static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
-static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
-static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
-
-static const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
- .content = bbstreamer_zstd_decompressor_content,
- .finalize = bbstreamer_zstd_decompressor_finalize,
- .free = bbstreamer_zstd_decompressor_free
+static void astreamer_zstd_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
+static void astreamer_zstd_decompressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_zstd_decompressor_ops = {
+ .content = astreamer_zstd_decompressor_content,
+ .finalize = astreamer_zstd_decompressor_finalize,
+ .free = astreamer_zstd_decompressor_free
};
#endif
@@ -62,19 +62,19 @@ static const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
* Create a new base backup streamer that performs zstd compression of tar
* blocks.
*/
-bbstreamer *
-bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
+astreamer *
+astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
{
#ifdef USE_ZSTD
- bbstreamer_zstd_frame *streamer;
+ astreamer_zstd_frame *streamer;
size_t ret;
Assert(next != NULL);
- streamer = palloc0(sizeof(bbstreamer_zstd_frame));
+ streamer = palloc0(sizeof(astreamer_zstd_frame));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_zstd_compressor_ops;
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_zstd_compressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@@ -142,12 +142,12 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *comp
* of output buffer to next streamer and empty the buffer.
*/
static void
-bbstreamer_zstd_compressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_zstd_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
ZSTD_inBuffer inBuf = {data, len, 0};
while (inBuf.pos < inBuf.size)
@@ -162,10 +162,10 @@ bbstreamer_zstd_compressor_content(bbstreamer *streamer,
if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
max_needed)
{
- bbstreamer_content(mystreamer->base.bbs_next, member,
- mystreamer->zstd_outBuf.dst,
- mystreamer->zstd_outBuf.pos,
- context);
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ context);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
@@ -187,9 +187,9 @@ bbstreamer_zstd_compressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
-bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
+astreamer_zstd_compressor_finalize(astreamer *streamer)
{
- bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
size_t yet_to_flush;
do
@@ -204,10 +204,10 @@ bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
max_needed)
{
- bbstreamer_content(mystreamer->base.bbs_next, NULL,
- mystreamer->zstd_outBuf.dst,
- mystreamer->zstd_outBuf.pos,
- BBSTREAMER_UNKNOWN);
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ ASTREAMER_UNKNOWN);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
@@ -227,23 +227,23 @@ bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
/* Make sure to pass any remaining bytes to the next streamer. */
if (mystreamer->zstd_outBuf.pos > 0)
- bbstreamer_content(mystreamer->base.bbs_next, NULL,
- mystreamer->zstd_outBuf.dst,
- mystreamer->zstd_outBuf.pos,
- BBSTREAMER_UNKNOWN);
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ ASTREAMER_UNKNOWN);
- bbstreamer_finalize(mystreamer->base.bbs_next);
+ astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
-bbstreamer_zstd_compressor_free(bbstreamer *streamer)
+astreamer_zstd_compressor_free(astreamer *streamer)
{
- bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
- bbstreamer_free(streamer->bbs_next);
+ astreamer_free(streamer->bbs_next);
ZSTD_freeCCtx(mystreamer->cctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
@@ -254,17 +254,17 @@ bbstreamer_zstd_compressor_free(bbstreamer *streamer)
* Create a new base backup streamer that performs decompression of zstd
* compressed blocks.
*/
-bbstreamer *
-bbstreamer_zstd_decompressor_new(bbstreamer *next)
+astreamer *
+astreamer_zstd_decompressor_new(astreamer *next)
{
#ifdef USE_ZSTD
- bbstreamer_zstd_frame *streamer;
+ astreamer_zstd_frame *streamer;
Assert(next != NULL);
- streamer = palloc0(sizeof(bbstreamer_zstd_frame));
- *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
- &bbstreamer_zstd_decompressor_ops;
+ streamer = palloc0(sizeof(astreamer_zstd_frame));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_zstd_decompressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@@ -293,12 +293,12 @@ bbstreamer_zstd_decompressor_new(bbstreamer *next)
* to the next streamer.
*/
static void
-bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
- bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
+astreamer_zstd_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
{
- bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
ZSTD_inBuffer inBuf = {data, len, 0};
while (inBuf.pos < inBuf.size)
@@ -311,10 +311,10 @@ bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
*/
if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
{
- bbstreamer_content(mystreamer->base.bbs_next, member,
- mystreamer->zstd_outBuf.dst,
- mystreamer->zstd_outBuf.pos,
- context);
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ context);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
@@ -335,32 +335,32 @@ bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
-bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
+astreamer_zstd_decompressor_finalize(astreamer *streamer)
{
- bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
/*
* End of the stream, if there is some pending data in output buffers then
* we must forward it to next streamer.
*/
if (mystreamer->zstd_outBuf.pos > 0)
- bbstreamer_content(mystreamer->base.bbs_next, NULL,
- mystreamer->base.bbs_buffer.data,
- mystreamer->base.bbs_buffer.maxlen,
- BBSTREAMER_UNKNOWN);
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ ASTREAMER_UNKNOWN);
- bbstreamer_finalize(mystreamer->base.bbs_next);
+ astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
-bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
+astreamer_zstd_decompressor_free(astreamer *streamer)
{
- bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
- bbstreamer_free(streamer->bbs_next);
+ astreamer_free(streamer->bbs_next);
ZSTD_freeDCtx(mystreamer->dctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h
deleted file mode 100644
index 3b820f13b51..00000000000
--- a/src/bin/pg_basebackup/bbstreamer.h
+++ /dev/null
@@ -1,226 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * bbstreamer.h
- *
- * Each tar archive returned by the server is passed to one or more
- * bbstreamer objects for further processing. The bbstreamer may do
- * something simple, like write the archive to a file, perhaps after
- * compressing it, but it can also do more complicated things, like
- * annotating the byte stream to indicate which parts of the data
- * correspond to tar headers or trailing padding, vs. which parts are
- * payload data. A subsequent bbstreamer may use this information to
- * make further decisions about how to process the data; for example,
- * it might choose to modify the archive contents.
- *
- * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * src/bin/pg_basebackup/bbstreamer.h
- *-------------------------------------------------------------------------
- */
-
-#ifndef BBSTREAMER_H
-#define BBSTREAMER_H
-
-#include "common/compression.h"
-#include "lib/stringinfo.h"
-#include "pqexpbuffer.h"
-
-struct bbstreamer;
-struct bbstreamer_ops;
-typedef struct bbstreamer bbstreamer;
-typedef struct bbstreamer_ops bbstreamer_ops;
-
-/*
- * Each chunk of archive data passed to a bbstreamer is classified into one
- * of these categories. When data is first received from the remote server,
- * each chunk will be categorized as BBSTREAMER_UNKNOWN, and the chunks will
- * be of whatever size the remote server chose to send.
- *
- * If the archive is parsed (e.g. see bbstreamer_tar_parser_new()), then all
- * chunks should be labelled as one of the other types listed here. In
- * addition, there should be exactly one BBSTREAMER_MEMBER_HEADER chunk and
- * exactly one BBSTREAMER_MEMBER_TRAILER chunk per archive member, even if
- * that means a zero-length call. There can be any number of
- * BBSTREAMER_MEMBER_CONTENTS chunks in between those calls. There
- * should exactly BBSTREAMER_ARCHIVE_TRAILER chunk, and it should follow the
- * last BBSTREAMER_MEMBER_TRAILER chunk.
- *
- * In theory, we could need other classifications here, such as a way of
- * indicating an archive header, but the "tar" format doesn't need anything
- * else, so for the time being there's no point.
- */
-typedef enum
-{
- BBSTREAMER_UNKNOWN,
- BBSTREAMER_MEMBER_HEADER,
- BBSTREAMER_MEMBER_CONTENTS,
- BBSTREAMER_MEMBER_TRAILER,
- BBSTREAMER_ARCHIVE_TRAILER,
-} bbstreamer_archive_context;
-
-/*
- * Each chunk of data that is classified as BBSTREAMER_MEMBER_HEADER,
- * BBSTREAMER_MEMBER_CONTENTS, or BBSTREAMER_MEMBER_TRAILER should also
- * pass a pointer to an instance of this struct. The details are expected
- * to be present in the archive header and used to fill the struct, after
- * which all subsequent calls for the same archive member are expected to
- * pass the same details.
- */
-typedef struct
-{
- char pathname[MAXPGPATH];
- pgoff_t size;
- mode_t mode;
- uid_t uid;
- gid_t gid;
- bool is_directory;
- bool is_link;
- char linktarget[MAXPGPATH];
-} bbstreamer_member;
-
-/*
- * Generally, each type of bbstreamer will define its own struct, but the
- * first element should be 'bbstreamer base'. A bbstreamer that does not
- * require any additional private data could use this structure directly.
- *
- * bbs_ops is a pointer to the bbstreamer_ops object which contains the
- * function pointers appropriate to this type of bbstreamer.
- *
- * bbs_next is a pointer to the successor bbstreamer, for those types of
- * bbstreamer which forward data to a successor. It need not be used and
- * should be set to NULL when not relevant.
- *
- * bbs_buffer is a buffer for accumulating data for temporary storage. Each
- * type of bbstreamer makes its own decisions about whether and how to use
- * this buffer.
- */
-struct bbstreamer
-{
- const bbstreamer_ops *bbs_ops;
- bbstreamer *bbs_next;
- StringInfoData bbs_buffer;
-};
-
-/*
- * There are three callbacks for a bbstreamer. The 'content' callback is
- * called repeatedly, as described in the bbstreamer_archive_context comments.
- * Then, the 'finalize' callback is called once at the end, to give the
- * bbstreamer a chance to perform cleanup such as closing files. Finally,
- * because this code is running in a frontend environment where, as of this
- * writing, there are no memory contexts, the 'free' callback is called to
- * release memory. These callbacks should always be invoked using the static
- * inline functions defined below.
- */
-struct bbstreamer_ops
-{
- void (*content) (bbstreamer *streamer, bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context);
- void (*finalize) (bbstreamer *streamer);
- void (*free) (bbstreamer *streamer);
-};
-
-/* Send some content to a bbstreamer. */
-static inline void
-bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member,
- const char *data, int len,
- bbstreamer_archive_context context)
-{
- Assert(streamer != NULL);
- streamer->bbs_ops->content(streamer, member, data, len, context);
-}
-
-/* Finalize a bbstreamer. */
-static inline void
-bbstreamer_finalize(bbstreamer *streamer)
-{
- Assert(streamer != NULL);
- streamer->bbs_ops->finalize(streamer);
-}
-
-/* Free a bbstreamer. */
-static inline void
-bbstreamer_free(bbstreamer *streamer)
-{
- Assert(streamer != NULL);
- streamer->bbs_ops->free(streamer);
-}
-
-/*
- * This is a convenience method for use when implementing a bbstreamer; it is
- * not for use by outside callers. It adds the amount of data specified by
- * 'nbytes' to the bbstreamer's buffer and adjusts '*len' and '*data'
- * accordingly.
- */
-static inline void
-bbstreamer_buffer_bytes(bbstreamer *streamer, const char **data, int *len,
- int nbytes)
-{
- Assert(nbytes <= *len);
-
- appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
- *len -= nbytes;
- *data += nbytes;
-}
-
-/*
- * This is a convenience method for use when implementing a bbstreamer; it is
- * not for use by outsider callers. It attempts to add enough data to the
- * bbstreamer's buffer to reach a length of target_bytes and adjusts '*len'
- * and '*data' accordingly. It returns true if the target length has been
- * reached and false otherwise.
- */
-static inline bool
-bbstreamer_buffer_until(bbstreamer *streamer, const char **data, int *len,
- int target_bytes)
-{
- int buflen = streamer->bbs_buffer.len;
-
- if (buflen >= target_bytes)
- {
- /* Target length already reached; nothing to do. */
- return true;
- }
-
- if (buflen + *len < target_bytes)
- {
- /* Not enough data to reach target length; buffer all of it. */
- bbstreamer_buffer_bytes(streamer, data, len, *len);
- return false;
- }
-
- /* Buffer just enough to reach the target length. */
- bbstreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
- return true;
-}
-
-/*
- * Functions for creating bbstreamer objects of various types. See the header
- * comments for each of these functions for details.
- */
-extern bbstreamer *bbstreamer_plain_writer_new(char *pathname, FILE *file);
-extern bbstreamer *bbstreamer_gzip_writer_new(char *pathname, FILE *file,
- pg_compress_specification *compress);
-extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
- const char *(*link_map) (const char *),
- void (*report_output_file) (const char *));
-
-extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next);
-extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
- pg_compress_specification *compress);
-extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
-extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next,
- pg_compress_specification *compress);
-extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next);
-extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
-extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
-extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
-
-extern bbstreamer *bbstreamer_recovery_injector_new(bbstreamer *next,
- bool is_recovery_guc_supported,
- PQExpBuffer recoveryconfcontents);
-extern void bbstreamer_inject_file(bbstreamer *streamer, char *pathname,
- char *data, int len);
-
-#endif
diff --git a/src/bin/pg_basebackup/meson.build b/src/bin/pg_basebackup/meson.build
index c00acd5e118..a68dbd7837d 100644
--- a/src/bin/pg_basebackup/meson.build
+++ b/src/bin/pg_basebackup/meson.build
@@ -1,12 +1,12 @@
# Copyright (c) 2022-2024, PostgreSQL Global Development Group
common_sources = files(
- 'bbstreamer_file.c',
- 'bbstreamer_gzip.c',
- 'bbstreamer_inject.c',
- 'bbstreamer_lz4.c',
- 'bbstreamer_tar.c',
- 'bbstreamer_zstd.c',
+ 'astreamer_file.c',
+ 'astreamer_gzip.c',
+ 'astreamer_inject.c',
+ 'astreamer_lz4.c',
+ 'astreamer_tar.c',
+ 'astreamer_zstd.c',
'receivelog.c',
'streamutil.c',
'walmethods.c',
diff --git a/src/bin/pg_basebackup/nls.mk b/src/bin/pg_basebackup/nls.mk
index 384dbb021e9..950b9797b1e 100644
--- a/src/bin/pg_basebackup/nls.mk
+++ b/src/bin/pg_basebackup/nls.mk
@@ -1,12 +1,12 @@
# src/bin/pg_basebackup/nls.mk
CATALOG_NAME = pg_basebackup
GETTEXT_FILES = $(FRONTEND_COMMON_GETTEXT_FILES) \
- bbstreamer_file.c \
- bbstreamer_gzip.c \
- bbstreamer_inject.c \
- bbstreamer_lz4.c \
- bbstreamer_tar.c \
- bbstreamer_zstd.c \
+ astreamer_file.c \
+ astreamer_gzip.c \
+ astreamer_inject.c \
+ astreamer_lz4.c \
+ astreamer_tar.c \
+ astreamer_zstd.c \
pg_basebackup.c \
pg_createsubscriber.c \
pg_receivewal.c \
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8f3dd04fd22..1966ada69c9 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -26,8 +26,8 @@
#endif
#include "access/xlog_internal.h"
+#include "astreamer.h"
#include "backup/basebackup.h"
-#include "bbstreamer.h"
#include "common/compression.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
@@ -57,8 +57,8 @@ typedef struct ArchiveStreamState
{
int tablespacenum;
pg_compress_specification *compress;
- bbstreamer *streamer;
- bbstreamer *manifest_inject_streamer;
+ astreamer *streamer;
+ astreamer *manifest_inject_streamer;
PQExpBuffer manifest_buffer;
char manifest_filename[MAXPGPATH];
FILE *manifest_file;
@@ -67,7 +67,7 @@ typedef struct ArchiveStreamState
typedef struct WriteTarState
{
int tablespacenum;
- bbstreamer *streamer;
+ astreamer *streamer;
} WriteTarState;
typedef struct WriteManifestState
@@ -199,11 +199,11 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo
static void progress_update_filename(const char *filename);
static void progress_report(int tablespacenum, bool force, bool finished);
-static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
- bbstreamer **manifest_inject_streamer_p,
- bool is_recovery_guc_supported,
- bool expect_unterminated_tarfile,
- pg_compress_specification *compress);
+static astreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
+ astreamer **manifest_inject_streamer_p,
+ bool is_recovery_guc_supported,
+ bool expect_unterminated_tarfile,
+ pg_compress_specification *compress);
static void ReceiveArchiveStreamChunk(size_t r, char *copybuf,
void *callback_data);
static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor);
@@ -1053,19 +1053,19 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
* the options selected by the user. We may just write the results directly
* to a file, or we might compress first, or we might extract the tar file
* and write each member separately. This function doesn't do any of that
- * directly, but it works out what kind of bbstreamer we need to create so
+ * directly, but it works out what kind of astreamer we need to create so
* that the right stuff happens when, down the road, we actually receive
* the data.
*/
-static bbstreamer *
+static astreamer *
CreateBackupStreamer(char *archive_name, char *spclocation,
- bbstreamer **manifest_inject_streamer_p,
+ astreamer **manifest_inject_streamer_p,
bool is_recovery_guc_supported,
bool expect_unterminated_tarfile,
pg_compress_specification *compress)
{
- bbstreamer *streamer = NULL;
- bbstreamer *manifest_inject_streamer = NULL;
+ astreamer *streamer = NULL;
+ astreamer *manifest_inject_streamer = NULL;
bool inject_manifest;
bool is_tar,
is_tar_gz,
@@ -1160,9 +1160,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
directory = psprintf("%s/%s", basedir, spclocation);
else
directory = get_tablespace_mapping(spclocation);
- streamer = bbstreamer_extractor_new(directory,
- get_tablespace_mapping,
- progress_update_filename);
+ streamer = astreamer_extractor_new(directory,
+ get_tablespace_mapping,
+ progress_update_filename);
}
else
{
@@ -1188,27 +1188,27 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
}
if (compress->algorithm == PG_COMPRESSION_NONE)
- streamer = bbstreamer_plain_writer_new(archive_filename,
- archive_file);
+ streamer = astreamer_plain_writer_new(archive_filename,
+ archive_file);
else if (compress->algorithm == PG_COMPRESSION_GZIP)
{
strlcat(archive_filename, ".gz", sizeof(archive_filename));
- streamer = bbstreamer_gzip_writer_new(archive_filename,
- archive_file, compress);
+ streamer = astreamer_gzip_writer_new(archive_filename,
+ archive_file, compress);
}
else if (compress->algorithm == PG_COMPRESSION_LZ4)
{
strlcat(archive_filename, ".lz4", sizeof(archive_filename));
- streamer = bbstreamer_plain_writer_new(archive_filename,
- archive_file);
- streamer = bbstreamer_lz4_compressor_new(streamer, compress);
+ streamer = astreamer_plain_writer_new(archive_filename,
+ archive_file);
+ streamer = astreamer_lz4_compressor_new(streamer, compress);
}
else if (compress->algorithm == PG_COMPRESSION_ZSTD)
{
strlcat(archive_filename, ".zst", sizeof(archive_filename));
- streamer = bbstreamer_plain_writer_new(archive_filename,
- archive_file);
- streamer = bbstreamer_zstd_compressor_new(streamer, compress);
+ streamer = astreamer_plain_writer_new(archive_filename,
+ archive_file);
+ streamer = astreamer_zstd_compressor_new(streamer, compress);
}
else
{
@@ -1222,7 +1222,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
* into it.
*/
if (must_parse_archive)
- streamer = bbstreamer_tar_archiver_new(streamer);
+ streamer = astreamer_tar_archiver_new(streamer);
progress_update_filename(archive_filename);
}
@@ -1241,9 +1241,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
if (spclocation == NULL && writerecoveryconf)
{
Assert(must_parse_archive);
- streamer = bbstreamer_recovery_injector_new(streamer,
- is_recovery_guc_supported,
- recoveryconfcontents);
+ streamer = astreamer_recovery_injector_new(streamer,
+ is_recovery_guc_supported,
+ recoveryconfcontents);
}
/*
@@ -1253,9 +1253,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
* we're talking to such a server we'll need to add the terminator here.
*/
if (must_parse_archive)
- streamer = bbstreamer_tar_parser_new(streamer);
+ streamer = astreamer_tar_parser_new(streamer);
else if (expect_unterminated_tarfile)
- streamer = bbstreamer_tar_terminator_new(streamer);
+ streamer = astreamer_tar_terminator_new(streamer);
/*
* If the user has requested a server compressed archive along with
@@ -1264,11 +1264,11 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
if (format == 'p')
{
if (is_tar_gz)
- streamer = bbstreamer_gzip_decompressor_new(streamer);
+ streamer = astreamer_gzip_decompressor_new(streamer);
else if (is_tar_lz4)
- streamer = bbstreamer_lz4_decompressor_new(streamer);
+ streamer = astreamer_lz4_decompressor_new(streamer);
else if (is_tar_zstd)
- streamer = bbstreamer_zstd_decompressor_new(streamer);
+ streamer = astreamer_zstd_decompressor_new(streamer);
}
/* Return the results. */
@@ -1307,10 +1307,10 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress)
if (state.manifest_inject_streamer != NULL &&
state.manifest_buffer != NULL)
{
- bbstreamer_inject_file(state.manifest_inject_streamer,
- "backup_manifest",
- state.manifest_buffer->data,
- state.manifest_buffer->len);
+ astreamer_inject_file(state.manifest_inject_streamer,
+ "backup_manifest",
+ state.manifest_buffer->data,
+ state.manifest_buffer->len);
destroyPQExpBuffer(state.manifest_buffer);
state.manifest_buffer = NULL;
}
@@ -1318,8 +1318,8 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress)
/* If there's still an archive in progress, end processing. */
if (state.streamer != NULL)
{
- bbstreamer_finalize(state.streamer);
- bbstreamer_free(state.streamer);
+ astreamer_finalize(state.streamer);
+ astreamer_free(state.streamer);
state.streamer = NULL;
}
}
@@ -1383,8 +1383,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
/* End processing of any prior archive. */
if (state->streamer != NULL)
{
- bbstreamer_finalize(state->streamer);
- bbstreamer_free(state->streamer);
+ astreamer_finalize(state->streamer);
+ astreamer_free(state->streamer);
state->streamer = NULL;
}
@@ -1437,8 +1437,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
else if (state->streamer != NULL)
{
/* Archive data. */
- bbstreamer_content(state->streamer, NULL, copybuf + 1,
- r - 1, BBSTREAMER_UNKNOWN);
+ astreamer_content(state->streamer, NULL, copybuf + 1,
+ r - 1, ASTREAMER_UNKNOWN);
}
else
pg_fatal("unexpected payload data");
@@ -1600,7 +1600,7 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
bool tablespacenum, pg_compress_specification *compress)
{
WriteTarState state;
- bbstreamer *manifest_inject_streamer;
+ astreamer *manifest_inject_streamer;
bool is_recovery_guc_supported;
bool expect_unterminated_tarfile;
@@ -1636,16 +1636,16 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
pg_fatal("out of memory");
/* Inject it into the output tarfile. */
- bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest",
- buf.data, buf.len);
+ astreamer_inject_file(manifest_inject_streamer, "backup_manifest",
+ buf.data, buf.len);
/* Free memory. */
termPQExpBuffer(&buf);
}
/* Cleanup. */
- bbstreamer_finalize(state.streamer);
- bbstreamer_free(state.streamer);
+ astreamer_finalize(state.streamer);
+ astreamer_free(state.streamer);
progress_report(tablespacenum, true, false);
@@ -1663,7 +1663,7 @@ ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
{
WriteTarState *state = callback_data;
- bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN);
+ astreamer_content(state->streamer, NULL, copybuf, r, ASTREAMER_UNKNOWN);
totaldone += r;
progress_report(state->tablespacenum, false, false);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 6e6b7c27118..547d14b3e7c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3317,19 +3317,19 @@ bbsink_shell
bbsink_state
bbsink_throttle
bbsink_zstd
-bbstreamer
-bbstreamer_archive_context
-bbstreamer_extractor
-bbstreamer_gzip_decompressor
-bbstreamer_gzip_writer
-bbstreamer_lz4_frame
-bbstreamer_member
-bbstreamer_ops
-bbstreamer_plain_writer
-bbstreamer_recovery_injector
-bbstreamer_tar_archiver
-bbstreamer_tar_parser
-bbstreamer_zstd_frame
+astreamer
+astreamer_archive_context
+astreamer_extractor
+astreamer_gzip_decompressor
+astreamer_gzip_writer
+astreamer_lz4_frame
+astreamer_member
+astreamer_ops
+astreamer_plain_writer
+astreamer_recovery_injector
+astreamer_tar_archiver
+astreamer_tar_parser
+astreamer_zstd_frame
bgworker_main_type
bh_node_type
binaryheap