aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/walmethods.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/walmethods.c')
-rw-r--r--src/bin/pg_basebackup/walmethods.c510
1 files changed, 267 insertions, 243 deletions
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index d98a2681b90..bc2e83d02be 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -2,9 +2,6 @@
*
* walmethods.c - implementations of different ways to write received wal
*
- * NOTE! The caller must ensure that only one method is instantiated in
- * any given program, and that it's only instantiated once!
- *
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
@@ -43,19 +40,41 @@
*-------------------------------------------------------------------------
*/
+static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
+ const char *pathname,
+ const char *temp_suffix,
+ size_t pad_to_size);
+static int dir_close(Walfile *f, WalCloseMethod method);
+static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
+ const char *pathname);
+static char *dir_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix);
+static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
+static int dir_sync(Walfile *f);
+static bool dir_finish(WalWriteMethod *wwmethod);
+static void dir_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalDirectoryMethodOps = {
+ .open_for_write = dir_open_for_write,
+ .close = dir_close,
+ .existsfile = dir_existsfile,
+ .get_file_size = dir_get_file_size,
+ .get_file_name = dir_get_file_name,
+ .write = dir_write,
+ .sync = dir_sync,
+ .finish = dir_finish,
+ .free = dir_free
+};
+
/*
* Global static data for this method
*/
typedef struct DirectoryMethodData
{
+ WalWriteMethod base;
char *basedir;
- pg_compress_algorithm compression_algorithm;
- int compression_level;
- bool sync;
- const char *lasterrstring; /* if set, takes precedence over lasterrno */
- int lasterrno;
} DirectoryMethodData;
-static DirectoryMethodData *dir_data = NULL;
/*
* Local file handle
@@ -76,36 +95,29 @@ typedef struct DirectoryMethodFile
#endif
} DirectoryMethodFile;
-#define dir_clear_error() \
- (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0)
-#define dir_set_error(msg) \
- (dir_data->lasterrstring = _(msg))
-
-static const char *
-dir_getlasterror(void)
-{
- if (dir_data->lasterrstring)
- return dir_data->lasterrstring;
- return strerror(dir_data->lasterrno);
-}
+#define clear_error(wwmethod) \
+ ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
static char *
-dir_get_file_name(const char *pathname, const char *temp_suffix)
+dir_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix)
{
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
snprintf(filename, MAXPGPATH, "%s%s%s",
pathname,
- dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
- dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
+ wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
+ wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
temp_suffix ? temp_suffix : "");
return filename;
}
static Walfile *
-dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix, size_t pad_to_size)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
char tmppath[MAXPGPATH];
char *filename;
int fd;
@@ -119,9 +131,9 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
void *lz4buf = NULL;
#endif
- dir_clear_error();
+ clear_error(wwmethod);
- filename = dir_get_file_name(pathname, temp_suffix);
+ filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
@@ -135,32 +147,32 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
if (fd < 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
gzfp = gzdopen(fd, "wb");
if (gzfp == NULL)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
close(fd);
return NULL;
}
- if (gzsetparams(gzfp, dir_data->compression_level,
+ if (gzsetparams(gzfp, wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
gzclose(gzfp);
return NULL;
}
}
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t ctx_out;
size_t header_size;
@@ -169,7 +181,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(ctx_out))
{
- dir_data->lasterrstring = LZ4F_getErrorName(ctx_out);
+ wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
close(fd);
return NULL;
}
@@ -179,13 +191,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
/* assign the compression level, default is 0 */
memset(&prefs, 0, sizeof(prefs));
- prefs.compressionLevel = dir_data->compression_level;
+ prefs.compressionLevel = wwmethod->compression_level;
/* add the header */
header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
if (LZ4F_isError(header_size))
{
- dir_data->lasterrstring = LZ4F_getErrorName(header_size);
+ wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
(void) LZ4F_freeCompressionContext(ctx);
pg_free(lz4buf);
close(fd);
@@ -196,7 +208,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (write(fd, lz4buf, header_size) != header_size)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
(void) LZ4F_freeCompressionContext(ctx);
pg_free(lz4buf);
close(fd);
@@ -206,7 +218,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
#endif
/* Do pre-padding on non-compressed files */
- if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
PGAlignedXLogBlock zerobuf;
int bytes;
@@ -218,7 +230,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
close(fd);
return NULL;
}
@@ -226,7 +238,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (lseek(fd, 0, SEEK_SET) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
close(fd);
return NULL;
}
@@ -238,19 +250,19 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
* important when using synchronous mode, where the file is modified and
* fsynced in-place, without a directory fsync.
*/
- if (dir_data->sync)
+ if (wwmethod->sync)
{
if (fsync_fname(tmppath, false) != 0 ||
fsync_parent_path(tmppath) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
gzclose(gzfp);
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
(void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
(void) LZ4F_freeCompressionContext(ctx);
@@ -266,11 +278,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
f = pg_malloc0(sizeof(DirectoryMethodFile));
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
f->gzfp = gzfp;
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
f->ctx = ctx;
f->lz4buf = lz4buf;
@@ -278,6 +290,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
}
#endif
+ f->base.wwmethod = wwmethod;
f->base.currpos = 0;
f->base.pathname = pg_strdup(pathname);
f->fd = fd;
@@ -295,23 +308,23 @@ dir_write(Walfile *f, const void *buf, size_t count)
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
errno = 0;
r = (ssize_t) gzwrite(df->gzfp, buf, count);
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
}
}
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t chunk;
size_t remaining;
@@ -335,7 +348,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -343,7 +356,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
@@ -361,7 +374,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
}
}
if (r > 0)
@@ -374,14 +387,15 @@ dir_close(Walfile *f, WalCloseMethod method)
{
int r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod;
char tmppath[MAXPGPATH];
char tmppath2[MAXPGPATH];
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
errno = 0; /* in case gzclose() doesn't set it */
r = gzclose(df->gzfp);
@@ -389,7 +403,7 @@ dir_close(Walfile *f, WalCloseMethod method)
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t compressed;
@@ -399,7 +413,7 @@ dir_close(Walfile *f, WalCloseMethod method)
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -407,7 +421,7 @@ dir_close(Walfile *f, WalCloseMethod method)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
@@ -429,17 +443,18 @@ dir_close(Walfile *f, WalCloseMethod method)
* If we have a temp prefix, normal operation is to rename the
* file.
*/
- filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+ filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+ df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
/* permanent name, so no need for the prefix */
- filename2 = dir_get_file_name(df->base.pathname, NULL);
+ filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
dir_data->basedir, filename2);
pg_free(filename2);
- if (dir_data->sync)
+ if (f->wwmethod->sync)
r = durable_rename(tmppath, tmppath2);
else
{
@@ -456,7 +471,8 @@ dir_close(Walfile *f, WalCloseMethod method)
char *filename;
/* Unlink the file once it's closed */
- filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+ filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+ df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
@@ -469,7 +485,7 @@ dir_close(Walfile *f, WalCloseMethod method)
* CLOSE_NO_RENAME. In this case, fsync the file and containing
* directory if sync mode is requested.
*/
- if (dir_data->sync)
+ if (f->wwmethod->sync)
{
r = fsync_fname(df->fullpath, false);
if (r == 0)
@@ -479,7 +495,7 @@ dir_close(Walfile *f, WalCloseMethod method)
}
if (r != 0)
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
#ifdef USE_LZ4
pg_free(df->lz4buf);
@@ -501,23 +517,23 @@ dir_sync(Walfile *f)
int r;
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
- if (!dir_data->sync)
+ if (!f->wwmethod->sync)
return 0;
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
{
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
}
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
size_t compressed;
@@ -526,7 +542,7 @@ dir_sync(Walfile *f)
compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -534,7 +550,7 @@ dir_sync(Walfile *f)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
}
@@ -542,13 +558,14 @@ dir_sync(Walfile *f)
r = fsync(((DirectoryMethodFile *) f)->fd);
if (r < 0)
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return r;
}
static ssize_t
-dir_get_file_size(const char *pathname)
+dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
struct stat statbuf;
char tmppath[MAXPGPATH];
@@ -557,26 +574,21 @@ dir_get_file_size(const char *pathname)
if (stat(tmppath, &statbuf) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return -1;
}
return statbuf.st_size;
}
-static pg_compress_algorithm
-dir_compression_algorithm(void)
-{
- return dir_data->compression_algorithm;
-}
-
static bool
-dir_existsfile(const char *pathname)
+dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
char tmppath[MAXPGPATH];
int fd;
- dir_clear_error();
+ clear_error(wwmethod);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
@@ -589,60 +601,54 @@ dir_existsfile(const char *pathname)
}
static bool
-dir_finish(void)
+dir_finish(WalWriteMethod *wwmethod)
{
- dir_clear_error();
+ clear_error(wwmethod);
- if (dir_data->sync)
+ if (wwmethod->sync)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
/*
* Files are fsynced when they are closed, but we need to fsync the
* directory entry here as well.
*/
if (fsync_fname(dir_data->basedir, true) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
return true;
}
+static void
+dir_free(WalWriteMethod *wwmethod)
+{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
+ pg_free(dir_data->basedir);
+ pg_free(wwmethod);
+}
+
WalWriteMethod *
CreateWalDirectoryMethod(const char *basedir,
pg_compress_algorithm compression_algorithm,
int compression_level, bool sync)
{
- WalWriteMethod *method;
-
- method = pg_malloc0(sizeof(WalWriteMethod));
- method->open_for_write = dir_open_for_write;
- method->write = dir_write;
- method->get_file_size = dir_get_file_size;
- method->get_file_name = dir_get_file_name;
- method->compression_algorithm = dir_compression_algorithm;
- method->close = dir_close;
- method->sync = dir_sync;
- method->existsfile = dir_existsfile;
- method->finish = dir_finish;
- method->getlasterror = dir_getlasterror;
-
- dir_data = pg_malloc0(sizeof(DirectoryMethodData));
- dir_data->compression_algorithm = compression_algorithm;
- dir_data->compression_level = compression_level;
- dir_data->basedir = pg_strdup(basedir);
- dir_data->sync = sync;
-
- return method;
-}
-
-void
-FreeWalDirectoryMethod(void)
-{
- pg_free(dir_data->basedir);
- pg_free(dir_data);
- dir_data = NULL;
+ DirectoryMethodData *wwmethod;
+
+ wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
+ *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+ &WalDirectoryMethodOps;
+ wwmethod->base.compression_algorithm = compression_algorithm;
+ wwmethod->base.compression_level = compression_level;
+ wwmethod->base.sync = sync;
+ clear_error(&wwmethod->base);
+ wwmethod->basedir = pg_strdup(basedir);
+
+ return &wwmethod->base;
}
@@ -651,6 +657,33 @@ FreeWalDirectoryMethod(void)
*-------------------------------------------------------------------------
*/
+static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
+ const char *pathname,
+ const char *temp_suffix,
+ size_t pad_to_size);
+static int tar_close(Walfile *f, WalCloseMethod method);
+static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
+ const char *pathname);
+static char *tar_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix);
+static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
+static int tar_sync(Walfile *f);
+static bool tar_finish(WalWriteMethod *wwmethod);
+static void tar_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalTarMethodOps = {
+ .open_for_write = tar_open_for_write,
+ .close = tar_close,
+ .existsfile = tar_existsfile,
+ .get_file_size = tar_get_file_size,
+ .get_file_name = tar_get_file_name,
+ .write = tar_write,
+ .sync = tar_sync,
+ .finish = tar_finish,
+ .free = tar_free
+};
+
typedef struct TarMethodFile
{
Walfile base;
@@ -661,37 +694,20 @@ typedef struct TarMethodFile
typedef struct TarMethodData
{
+ WalWriteMethod base;
char *tarfilename;
int fd;
- pg_compress_algorithm compression_algorithm;
- int compression_level;
- bool sync;
TarMethodFile *currentfile;
- const char *lasterrstring; /* if set, takes precedence over lasterrno */
- int lasterrno;
#ifdef HAVE_LIBZ
z_streamp zp;
void *zlibOut;
#endif
} TarMethodData;
-static TarMethodData *tar_data = NULL;
-
-#define tar_clear_error() \
- (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0)
-#define tar_set_error(msg) \
- (tar_data->lasterrstring = _(msg))
-
-static const char *
-tar_getlasterror(void)
-{
- if (tar_data->lasterrstring)
- return tar_data->lasterrstring;
- return strerror(tar_data->lasterrno);
-}
#ifdef HAVE_LIBZ
static bool
-tar_write_compressed_data(void *buf, size_t count, bool flush)
+tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count,
+ bool flush)
{
tar_data->zp->next_in = buf;
tar_data->zp->avail_in = count;
@@ -703,7 +719,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
if (r == Z_STREAM_ERROR)
{
- tar_set_error("could not compress data");
+ tar_data->base.lasterrstring = "could not compress data";
return false;
}
@@ -715,7 +731,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
if (write(tar_data->fd, tar_data->zlibOut, len) != len)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ tar_data->base.lasterrno = errno ? errno : ENOSPC;
return false;
}
@@ -732,7 +748,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
/* Reset the stream for writing */
if (deflateReset(tar_data->zp) != Z_OK)
{
- tar_set_error("could not reset compression stream");
+ tar_data->base.lasterrstring = "could not reset compression stream";
return false;
}
}
@@ -744,29 +760,31 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
static ssize_t
tar_write(Walfile *f, const void *buf, size_t count)
{
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
ssize_t r;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
/* Tarfile will always be positioned at the end */
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
r = write(tar_data->fd, buf, count);
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
f->currpos += r;
return r;
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
- if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
+ if (!tar_write_compressed_data(tar_data, unconstify(void *, buf),
+ count, false))
return -1;
f->currpos += count;
return count;
@@ -775,7 +793,7 @@ tar_write(Walfile *f, const void *buf, size_t count)
else
{
/* Can't happen - compression enabled with no method set */
- tar_data->lasterrno = ENOSYS;
+ f->wwmethod->lasterrno = ENOSYS;
return -1;
}
}
@@ -801,7 +819,8 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes)
}
static char *
-tar_get_file_name(const char *pathname, const char *temp_suffix)
+tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix)
{
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
@@ -812,11 +831,13 @@ tar_get_file_name(const char *pathname, const char *temp_suffix)
}
static Walfile *
-tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix, size_t pad_to_size)
{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
char *tmppath;
- tar_clear_error();
+ clear_error(wwmethod);
if (tar_data->fd < 0)
{
@@ -828,12 +849,12 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
pg_file_create_mode);
if (tar_data->fd < 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
tar_data->zp->zalloc = Z_NULL;
@@ -847,12 +868,13 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
* default 15 for the windowBits parameter makes the output be
* gzip instead of zlib.
*/
- if (deflateInit2(tar_data->zp, tar_data->compression_level,
+ if (deflateInit2(tar_data->zp, wwmethod->compression_level,
Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
{
pg_free(tar_data->zp);
tar_data->zp = NULL;
- tar_set_error("could not initialize compression library");
+ wwmethod->lasterrstring =
+ "could not initialize compression library";
return NULL;
}
}
@@ -863,13 +885,15 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (tar_data->currentfile != NULL)
{
- tar_set_error("implementation error: tar files can't have more than one open file");
+ wwmethod->lasterrstring =
+ "implementation error: tar files can't have more than one open file";
return NULL;
}
tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+ tar_data->currentfile->base.wwmethod = wwmethod;
- tmppath = tar_get_file_name(pathname, temp_suffix);
+ tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
/* Create a header with size set to 0 - we will fill out the size on close */
if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
@@ -877,23 +901,24 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
pg_free(tar_data->currentfile);
pg_free(tmppath);
tar_data->currentfile = NULL;
- tar_set_error("could not create tar header");
+ wwmethod->lasterrstring = "could not create tar header";
return NULL;
}
pg_free(tmppath);
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Flush existing data */
- if (!tar_write_compressed_data(NULL, 0, true))
+ if (!tar_write_compressed_data(tar_data, NULL, 0, true))
return NULL;
/* Turn off compression for header */
if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ wwmethod->lasterrstring =
+ "could not change compression parameters";
return NULL;
}
}
@@ -902,39 +927,39 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
if (tar_data->currentfile->ofs_start == -1)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
pg_free(tar_data->currentfile);
tar_data->currentfile = NULL;
return NULL;
}
tar_data->currentfile->base.currpos = 0;
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, tar_data->currentfile->header,
TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
pg_free(tar_data->currentfile);
tar_data->currentfile = NULL;
return NULL;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Write header through the zlib APIs but with no compression */
- if (!tar_write_compressed_data(tar_data->currentfile->header,
+ if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
TAR_BLOCK_SIZE, true))
return NULL;
/* Re-enable compression for the rest of the file */
- if (deflateParams(tar_data->zp, tar_data->compression_level,
+ if (deflateParams(tar_data->zp, wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ wwmethod->lasterrstring = "could not change compression parameters";
return NULL;
}
}
@@ -954,7 +979,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (pad_to_size)
{
tar_data->currentfile->pad_to_size = pad_to_size;
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
/* Uncompressed, so pad now */
if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
@@ -964,7 +989,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
@@ -976,42 +1001,37 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
}
static ssize_t
-tar_get_file_size(const char *pathname)
+tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
{
- tar_clear_error();
+ clear_error(wwmethod);
/* Currently not used, so not supported */
- tar_data->lasterrno = ENOSYS;
+ wwmethod->lasterrno = ENOSYS;
return -1;
}
-static pg_compress_algorithm
-tar_compression_algorithm(void)
-{
- return tar_data->compression_algorithm;
-}
-
static int
tar_sync(Walfile *f)
{
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
int r;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
- if (!tar_data->sync)
+ if (!f->wwmethod->sync)
return 0;
/*
* Always sync the whole tarfile, because that's all we can do. This makes
* no sense on compressed files, so just ignore those.
*/
- if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
return 0;
r = fsync(tar_data->fd);
if (r < 0)
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return r;
}
@@ -1020,16 +1040,17 @@ tar_close(Walfile *f, WalCloseMethod method)
{
ssize_t filesize;
int padding;
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
TarMethodFile *tf = (TarMethodFile *) f;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
if (method == CLOSE_UNLINK)
{
- if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
{
- tar_set_error("unlink not supported with compression");
+ f->wwmethod->lasterrstring = "unlink not supported with compression";
return -1;
}
@@ -1040,7 +1061,7 @@ tar_close(Walfile *f, WalCloseMethod method)
*/
if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
@@ -1058,7 +1079,7 @@ tar_close(Walfile *f, WalCloseMethod method)
*/
if (tf->pad_to_size)
{
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/*
* A compressed tarfile is padded on close since we cannot know
@@ -1098,10 +1119,10 @@ tar_close(Walfile *f, WalCloseMethod method)
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Flush the current buffer */
- if (!tar_write_compressed_data(NULL, 0, true))
+ if (!tar_write_compressed_data(tar_data, NULL, 0, true))
return -1;
}
#endif
@@ -1124,39 +1145,39 @@ tar_close(Walfile *f, WalCloseMethod method)
print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Turn off compression */
if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ f->wwmethod->lasterrstring = "could not change compression parameters";
return -1;
}
/* Overwrite the header, assuming the size will be the same */
- if (!tar_write_compressed_data(tar_data->currentfile->header,
+ if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
TAR_BLOCK_SIZE, true))
return -1;
/* Turn compression back on */
- if (deflateParams(tar_data->zp, tar_data->compression_level,
+ if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ f->wwmethod->lasterrstring = "could not change compression parameters";
return -1;
}
}
@@ -1170,7 +1191,7 @@ tar_close(Walfile *f, WalCloseMethod method)
/* Move file pointer back down to end, so we can write the next file */
if (lseek(tar_data->fd, 0, SEEK_END) < 0)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
@@ -1179,7 +1200,7 @@ tar_close(Walfile *f, WalCloseMethod method)
{
/* XXX this seems pretty bogus; why is only this case fatal? */
pg_fatal("could not fsync file \"%s\": %s",
- tf->base.pathname, tar_getlasterror());
+ tf->base.pathname, GetLastWalMethodError(f->wwmethod));
}
/* Clean up and done */
@@ -1191,19 +1212,20 @@ tar_close(Walfile *f, WalCloseMethod method)
}
static bool
-tar_existsfile(const char *pathname)
+tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
{
- tar_clear_error();
+ clear_error(wwmethod);
/* We only deal with new tarfiles, so nothing externally created exists */
return false;
}
static bool
-tar_finish(void)
+tar_finish(WalWriteMethod *wwmethod)
{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
char zerobuf[1024] = {0};
- tar_clear_error();
+ clear_error(wwmethod);
if (tar_data->currentfile)
{
@@ -1212,20 +1234,21 @@ tar_finish(void)
}
/* A tarfile always ends with two empty blocks */
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
return false;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
- if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+ if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
+ false))
return false;
/* Also flush all data to make sure the gzip stream is finished */
@@ -1239,7 +1262,7 @@ tar_finish(void)
if (r == Z_STREAM_ERROR)
{
- tar_set_error("could not compress data");
+ wwmethod->lasterrstring = "could not compress data";
return false;
}
if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
@@ -1253,7 +1276,7 @@ tar_finish(void)
* If write didn't set errno, assume problem is no disk
* space.
*/
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
return false;
}
}
@@ -1263,7 +1286,7 @@ tar_finish(void)
if (deflateEnd(tar_data->zp) != Z_OK)
{
- tar_set_error("could not close compression stream");
+ wwmethod->lasterrstring = "could not close compression stream";
return false;
}
}
@@ -1275,29 +1298,29 @@ tar_finish(void)
}
/* sync the empty blocks as well, since they're after the last file */
- if (tar_data->sync)
+ if (wwmethod->sync)
{
if (fsync(tar_data->fd) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
if (close(tar_data->fd) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
tar_data->fd = -1;
- if (tar_data->sync)
+ if (wwmethod->sync)
{
if (fsync_fname(tar_data->tarfilename, false) != 0 ||
fsync_parent_path(tar_data->tarfilename) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
@@ -1305,6 +1328,19 @@ tar_finish(void)
return true;
}
+static void
+tar_free(WalWriteMethod *wwmethod)
+{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
+
+ pg_free(tar_data->tarfilename);
+#ifdef HAVE_LIBZ
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
+ pg_free(tar_data->zlibOut);
+#endif
+ pg_free(wwmethod);
+}
+
/*
* The argument compression_algorithm is currently ignored. It is in place for
* symmetry with CreateWalDirectoryMethod which uses it for distinguishing
@@ -1316,45 +1352,33 @@ CreateWalTarMethod(const char *tarbase,
pg_compress_algorithm compression_algorithm,
int compression_level, bool sync)
{
- WalWriteMethod *method;
+ TarMethodData *wwmethod;
const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
".tar.gz" : ".tar";
- method = pg_malloc0(sizeof(WalWriteMethod));
- method->open_for_write = tar_open_for_write;
- method->write = tar_write;
- method->get_file_size = tar_get_file_size;
- method->get_file_name = tar_get_file_name;
- method->compression_algorithm = tar_compression_algorithm;
- method->close = tar_close;
- method->sync = tar_sync;
- method->existsfile = tar_existsfile;
- method->finish = tar_finish;
- method->getlasterror = tar_getlasterror;
-
- tar_data = pg_malloc0(sizeof(TarMethodData));
- tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
- sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
- tar_data->fd = -1;
- tar_data->compression_algorithm = compression_algorithm;
- tar_data->compression_level = compression_level;
- tar_data->sync = sync;
+ wwmethod = pg_malloc0(sizeof(TarMethodData));
+ *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+ &WalTarMethodOps;
+ wwmethod->base.compression_algorithm = compression_algorithm;
+ wwmethod->base.compression_level = compression_level;
+ wwmethod->base.sync = sync;
+ clear_error(&wwmethod->base);
+
+ wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+ sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
+ wwmethod->fd = -1;
#ifdef HAVE_LIBZ
if (compression_algorithm == PG_COMPRESSION_GZIP)
- tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+ wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
#endif
- return method;
+ return &wwmethod->base;
}
-void
-FreeWalTarMethod(void)
+const char *
+GetLastWalMethodError(WalWriteMethod *wwmethod)
{
- pg_free(tar_data->tarfilename);
-#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
- pg_free(tar_data->zlibOut);
-#endif
- pg_free(tar_data);
- tar_data = NULL;
+ if (wwmethod->lasterrstring)
+ return wwmethod->lasterrstring;
+ return strerror(wwmethod->lasterrno);
}