aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c8
-rw-r--r--src/bin/pg_basebackup/pg_receivewal.c6
-rw-r--r--src/bin/pg_basebackup/receivelog.c89
-rw-r--r--src/bin/pg_basebackup/walmethods.c510
-rw-r--r--src/bin/pg_basebackup/walmethods.h60
5 files changed, 357 insertions, 316 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a830b199f54..876d20611b6 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -570,7 +570,7 @@ LogStreamerMain(logstreamer_param *param)
return 1;
}
- if (!stream.walmethod->finish())
+ if (!stream.walmethod->ops->finish(stream.walmethod))
{
pg_log_error("could not finish writing WAL files: %m");
#ifdef WIN32
@@ -581,11 +581,7 @@ LogStreamerMain(logstreamer_param *param)
PQfinish(param->bgconn);
- if (format == 'p')
- FreeWalDirectoryMethod();
- else
- FreeWalTarMethod();
- pg_free(stream.walmethod);
+ stream.walmethod->ops->free(stream.walmethod);
return 0;
}
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 5c22c914bc7..a7180e2955b 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -658,7 +658,7 @@ StreamLog(void)
ReceiveXlogStream(conn, &stream);
- if (!stream.walmethod->finish())
+ if (!stream.walmethod->ops->finish(stream.walmethod))
{
pg_log_info("could not finish writing WAL files: %m");
return;
@@ -667,9 +667,7 @@ StreamLog(void)
PQfinish(conn);
conn = NULL;
- FreeWalDirectoryMethod();
- pg_free(stream.walmethod);
- pg_free(stream.sysidentifier);
+ stream.walmethod->ops->free(stream.walmethod);
}
/*
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index a619176511f..9c71323d708 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -59,18 +59,19 @@ mark_file_as_archived(StreamCtl *stream, const char *fname)
snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
fname);
- f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+ f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
+ NULL, 0);
if (f == NULL)
{
pg_log_error("could not create archive status file \"%s\": %s",
- tmppath, stream->walmethod->getlasterror());
+ tmppath, GetLastWalMethodError(stream->walmethod));
return false;
}
- if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
+ if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
{
pg_log_error("could not close archive status file \"%s\": %s",
- tmppath, stream->walmethod->getlasterror());
+ tmppath, GetLastWalMethodError(stream->walmethod));
return false;
}
@@ -98,8 +99,9 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
/* Note that this considers the compression used if necessary */
- fn = stream->walmethod->get_file_name(walfile_name,
- stream->partial_suffix);
+ fn = stream->walmethod->ops->get_file_name(stream->walmethod,
+ walfile_name,
+ stream->partial_suffix);
/*
* When streaming to files, if an existing file exists we verify that it's
@@ -111,35 +113,35 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
* When streaming to tar, no file with this name will exist before, so we
* never have to verify a size.
*/
- if (stream->walmethod->compression_algorithm() == PG_COMPRESSION_NONE &&
- stream->walmethod->existsfile(fn))
+ if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
+ stream->walmethod->ops->existsfile(stream->walmethod, fn))
{
- size = stream->walmethod->get_file_size(fn);
+ size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
if (size < 0)
{
pg_log_error("could not get size of write-ahead log file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
+ fn, GetLastWalMethodError(stream->walmethod));
pg_free(fn);
return false;
}
if (size == WalSegSz)
{
/* Already padded file. Open it for use */
- f = stream->walmethod->open_for_write(walfile_name, stream->partial_suffix, 0);
+ f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
if (f == NULL)
{
pg_log_error("could not open existing write-ahead log file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
+ fn, GetLastWalMethodError(stream->walmethod));
pg_free(fn);
return false;
}
/* fsync file in case of a previous crash */
- if (stream->walmethod->sync(f) != 0)
+ if (stream->walmethod->ops->sync(f) != 0)
{
pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
- stream->walmethod->close(f, CLOSE_UNLINK);
+ fn, GetLastWalMethodError(stream->walmethod));
+ stream->walmethod->ops->close(f, CLOSE_UNLINK);
exit(1);
}
@@ -164,12 +166,14 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
/* No file existed, so create one */
- f = stream->walmethod->open_for_write(walfile_name,
- stream->partial_suffix, WalSegSz);
+ f = stream->walmethod->ops->open_for_write(stream->walmethod,
+ walfile_name,
+ stream->partial_suffix,
+ WalSegSz);
if (f == NULL)
{
pg_log_error("could not open write-ahead log file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
+ fn, GetLastWalMethodError(stream->walmethod));
pg_free(fn);
return false;
}
@@ -199,28 +203,29 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
currpos = walfile->currpos;
/* Note that this considers the compression used if necessary */
- fn = stream->walmethod->get_file_name(walfile_name,
- stream->partial_suffix);
+ fn = stream->walmethod->ops->get_file_name(stream->walmethod,
+ walfile_name,
+ stream->partial_suffix);
if (stream->partial_suffix)
{
if (currpos == WalSegSz)
- r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+ r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
else
{
pg_log_info("not renaming \"%s\", segment is not complete", fn);
- r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+ r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
}
}
else
- r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+ r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
walfile = NULL;
if (r != 0)
{
pg_log_error("could not close file \"%s\": %s",
- fn, stream->walmethod->getlasterror());
+ fn, GetLastWalMethodError(stream->walmethod));
pg_free(fn);
return false;
@@ -263,7 +268,7 @@ existsTimeLineHistoryFile(StreamCtl *stream)
TLHistoryFileName(histfname, stream->timeline);
- return stream->walmethod->existsfile(histfname);
+ return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
}
static bool
@@ -285,31 +290,32 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
return false;
}
- f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+ f = stream->walmethod->ops->open_for_write(stream->walmethod,
+ histfname, ".tmp", 0);
if (f == NULL)
{
pg_log_error("could not create timeline history file \"%s\": %s",
- histfname, stream->walmethod->getlasterror());
+ histfname, GetLastWalMethodError(stream->walmethod));
return false;
}
- if ((int) stream->walmethod->write(f, content, size) != size)
+ if ((int) stream->walmethod->ops->write(f, content, size) != size)
{
pg_log_error("could not write timeline history file \"%s\": %s",
- histfname, stream->walmethod->getlasterror());
+ histfname, GetLastWalMethodError(stream->walmethod));
/*
* If we fail to make the file, delete it to release disk space
*/
- stream->walmethod->close(f, CLOSE_UNLINK);
+ stream->walmethod->ops->close(f, CLOSE_UNLINK);
return false;
}
- if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
+ if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
{
pg_log_error("could not close file \"%s\": %s",
- histfname, stream->walmethod->getlasterror());
+ histfname, GetLastWalMethodError(stream->walmethod));
return false;
}
@@ -678,9 +684,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
}
error:
- if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
+ if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
pg_log_error("could not close file \"%s\": %s",
- walfile->pathname, stream->walmethod->getlasterror());
+ walfile->pathname, GetLastWalMethodError(stream->walmethod));
walfile = NULL;
return false;
}
@@ -765,9 +771,9 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
*/
if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
{
- if (stream->walmethod->sync(walfile) != 0)
+ if (stream->walmethod->ops->sync(walfile) != 0)
pg_fatal("could not fsync file \"%s\": %s",
- walfile->pathname, stream->walmethod->getlasterror());
+ walfile->pathname, GetLastWalMethodError(stream->walmethod));
lastFlushPosition = blockpos;
/*
@@ -1012,9 +1018,9 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* data has been successfully replicated or not, at the normal
* shutdown of the server.
*/
- if (stream->walmethod->sync(walfile) != 0)
+ if (stream->walmethod->ops->sync(walfile) != 0)
pg_fatal("could not fsync file \"%s\": %s",
- walfile->pathname, stream->walmethod->getlasterror());
+ walfile->pathname, GetLastWalMethodError(stream->walmethod));
lastFlushPosition = blockpos;
}
@@ -1115,12 +1121,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
}
}
- if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
- bytes_to_write) != bytes_to_write)
+ if (stream->walmethod->ops->write(walfile,
+ copybuf + hdr_len + bytes_written,
+ bytes_to_write) != bytes_to_write)
{
pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
bytes_to_write, walfile->pathname,
- stream->walmethod->getlasterror());
+ GetLastWalMethodError(stream->walmethod));
return false;
}
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index d98a2681b90..bc2e83d02be 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -2,9 +2,6 @@
*
* walmethods.c - implementations of different ways to write received wal
*
- * NOTE! The caller must ensure that only one method is instantiated in
- * any given program, and that it's only instantiated once!
- *
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
@@ -43,19 +40,41 @@
*-------------------------------------------------------------------------
*/
+static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
+ const char *pathname,
+ const char *temp_suffix,
+ size_t pad_to_size);
+static int dir_close(Walfile *f, WalCloseMethod method);
+static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
+ const char *pathname);
+static char *dir_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix);
+static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
+static int dir_sync(Walfile *f);
+static bool dir_finish(WalWriteMethod *wwmethod);
+static void dir_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalDirectoryMethodOps = {
+ .open_for_write = dir_open_for_write,
+ .close = dir_close,
+ .existsfile = dir_existsfile,
+ .get_file_size = dir_get_file_size,
+ .get_file_name = dir_get_file_name,
+ .write = dir_write,
+ .sync = dir_sync,
+ .finish = dir_finish,
+ .free = dir_free
+};
+
/*
* Global static data for this method
*/
typedef struct DirectoryMethodData
{
+ WalWriteMethod base;
char *basedir;
- pg_compress_algorithm compression_algorithm;
- int compression_level;
- bool sync;
- const char *lasterrstring; /* if set, takes precedence over lasterrno */
- int lasterrno;
} DirectoryMethodData;
-static DirectoryMethodData *dir_data = NULL;
/*
* Local file handle
@@ -76,36 +95,29 @@ typedef struct DirectoryMethodFile
#endif
} DirectoryMethodFile;
-#define dir_clear_error() \
- (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0)
-#define dir_set_error(msg) \
- (dir_data->lasterrstring = _(msg))
-
-static const char *
-dir_getlasterror(void)
-{
- if (dir_data->lasterrstring)
- return dir_data->lasterrstring;
- return strerror(dir_data->lasterrno);
-}
+#define clear_error(wwmethod) \
+ ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
static char *
-dir_get_file_name(const char *pathname, const char *temp_suffix)
+dir_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix)
{
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
snprintf(filename, MAXPGPATH, "%s%s%s",
pathname,
- dir_data->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
- dir_data->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
+ wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
+ wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
temp_suffix ? temp_suffix : "");
return filename;
}
static Walfile *
-dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix, size_t pad_to_size)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
char tmppath[MAXPGPATH];
char *filename;
int fd;
@@ -119,9 +131,9 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
void *lz4buf = NULL;
#endif
- dir_clear_error();
+ clear_error(wwmethod);
- filename = dir_get_file_name(pathname, temp_suffix);
+ filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
@@ -135,32 +147,32 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
if (fd < 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
gzfp = gzdopen(fd, "wb");
if (gzfp == NULL)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
close(fd);
return NULL;
}
- if (gzsetparams(gzfp, dir_data->compression_level,
+ if (gzsetparams(gzfp, wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
gzclose(gzfp);
return NULL;
}
}
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t ctx_out;
size_t header_size;
@@ -169,7 +181,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(ctx_out))
{
- dir_data->lasterrstring = LZ4F_getErrorName(ctx_out);
+ wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
close(fd);
return NULL;
}
@@ -179,13 +191,13 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
/* assign the compression level, default is 0 */
memset(&prefs, 0, sizeof(prefs));
- prefs.compressionLevel = dir_data->compression_level;
+ prefs.compressionLevel = wwmethod->compression_level;
/* add the header */
header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
if (LZ4F_isError(header_size))
{
- dir_data->lasterrstring = LZ4F_getErrorName(header_size);
+ wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
(void) LZ4F_freeCompressionContext(ctx);
pg_free(lz4buf);
close(fd);
@@ -196,7 +208,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (write(fd, lz4buf, header_size) != header_size)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
(void) LZ4F_freeCompressionContext(ctx);
pg_free(lz4buf);
close(fd);
@@ -206,7 +218,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
#endif
/* Do pre-padding on non-compressed files */
- if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
PGAlignedXLogBlock zerobuf;
int bytes;
@@ -218,7 +230,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
close(fd);
return NULL;
}
@@ -226,7 +238,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (lseek(fd, 0, SEEK_SET) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
close(fd);
return NULL;
}
@@ -238,19 +250,19 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
* important when using synchronous mode, where the file is modified and
* fsynced in-place, without a directory fsync.
*/
- if (dir_data->sync)
+ if (wwmethod->sync)
{
if (fsync_fname(tmppath, false) != 0 ||
fsync_parent_path(tmppath) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
gzclose(gzfp);
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
(void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
(void) LZ4F_freeCompressionContext(ctx);
@@ -266,11 +278,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
f = pg_malloc0(sizeof(DirectoryMethodFile));
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
f->gzfp = gzfp;
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
f->ctx = ctx;
f->lz4buf = lz4buf;
@@ -278,6 +290,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
}
#endif
+ f->base.wwmethod = wwmethod;
f->base.currpos = 0;
f->base.pathname = pg_strdup(pathname);
f->fd = fd;
@@ -295,23 +308,23 @@ dir_write(Walfile *f, const void *buf, size_t count)
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
errno = 0;
r = (ssize_t) gzwrite(df->gzfp, buf, count);
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
}
}
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t chunk;
size_t remaining;
@@ -335,7 +348,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -343,7 +356,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
@@ -361,7 +374,7 @@ dir_write(Walfile *f, const void *buf, size_t count)
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
}
}
if (r > 0)
@@ -374,14 +387,15 @@ dir_close(Walfile *f, WalCloseMethod method)
{
int r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod;
char tmppath[MAXPGPATH];
char tmppath2[MAXPGPATH];
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
errno = 0; /* in case gzclose() doesn't set it */
r = gzclose(df->gzfp);
@@ -389,7 +403,7 @@ dir_close(Walfile *f, WalCloseMethod method)
else
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
size_t compressed;
@@ -399,7 +413,7 @@ dir_close(Walfile *f, WalCloseMethod method)
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -407,7 +421,7 @@ dir_close(Walfile *f, WalCloseMethod method)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
@@ -429,17 +443,18 @@ dir_close(Walfile *f, WalCloseMethod method)
* If we have a temp prefix, normal operation is to rename the
* file.
*/
- filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+ filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+ df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
/* permanent name, so no need for the prefix */
- filename2 = dir_get_file_name(df->base.pathname, NULL);
+ filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
dir_data->basedir, filename2);
pg_free(filename2);
- if (dir_data->sync)
+ if (f->wwmethod->sync)
r = durable_rename(tmppath, tmppath2);
else
{
@@ -456,7 +471,8 @@ dir_close(Walfile *f, WalCloseMethod method)
char *filename;
/* Unlink the file once it's closed */
- filename = dir_get_file_name(df->base.pathname, df->temp_suffix);
+ filename = dir_get_file_name(f->wwmethod, df->base.pathname,
+ df->temp_suffix);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, filename);
pg_free(filename);
@@ -469,7 +485,7 @@ dir_close(Walfile *f, WalCloseMethod method)
* CLOSE_NO_RENAME. In this case, fsync the file and containing
* directory if sync mode is requested.
*/
- if (dir_data->sync)
+ if (f->wwmethod->sync)
{
r = fsync_fname(df->fullpath, false);
if (r == 0)
@@ -479,7 +495,7 @@ dir_close(Walfile *f, WalCloseMethod method)
}
if (r != 0)
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
#ifdef USE_LZ4
pg_free(df->lz4buf);
@@ -501,23 +517,23 @@ dir_sync(Walfile *f)
int r;
Assert(f != NULL);
- dir_clear_error();
+ clear_error(f->wwmethod);
- if (!dir_data->sync)
+ if (!f->wwmethod->sync)
return 0;
#ifdef HAVE_LIBZ
- if (dir_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
{
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
}
#endif
#ifdef USE_LZ4
- if (dir_data->compression_algorithm == PG_COMPRESSION_LZ4)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
{
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
size_t compressed;
@@ -526,7 +542,7 @@ dir_sync(Walfile *f)
compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
if (LZ4F_isError(compressed))
{
- dir_data->lasterrstring = LZ4F_getErrorName(compressed);
+ f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
return -1;
}
@@ -534,7 +550,7 @@ dir_sync(Walfile *f)
if (write(df->fd, df->lz4buf, compressed) != compressed)
{
/* If write didn't set errno, assume problem is no disk space */
- dir_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
}
@@ -542,13 +558,14 @@ dir_sync(Walfile *f)
r = fsync(((DirectoryMethodFile *) f)->fd);
if (r < 0)
- dir_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return r;
}
static ssize_t
-dir_get_file_size(const char *pathname)
+dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
struct stat statbuf;
char tmppath[MAXPGPATH];
@@ -557,26 +574,21 @@ dir_get_file_size(const char *pathname)
if (stat(tmppath, &statbuf) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return -1;
}
return statbuf.st_size;
}
-static pg_compress_algorithm
-dir_compression_algorithm(void)
-{
- return dir_data->compression_algorithm;
-}
-
static bool
-dir_existsfile(const char *pathname)
+dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
char tmppath[MAXPGPATH];
int fd;
- dir_clear_error();
+ clear_error(wwmethod);
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
@@ -589,60 +601,54 @@ dir_existsfile(const char *pathname)
}
static bool
-dir_finish(void)
+dir_finish(WalWriteMethod *wwmethod)
{
- dir_clear_error();
+ clear_error(wwmethod);
- if (dir_data->sync)
+ if (wwmethod->sync)
{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
/*
* Files are fsynced when they are closed, but we need to fsync the
* directory entry here as well.
*/
if (fsync_fname(dir_data->basedir, true) != 0)
{
- dir_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
return true;
}
+static void
+dir_free(WalWriteMethod *wwmethod)
+{
+ DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
+
+ pg_free(dir_data->basedir);
+ pg_free(wwmethod);
+}
+
WalWriteMethod *
CreateWalDirectoryMethod(const char *basedir,
pg_compress_algorithm compression_algorithm,
int compression_level, bool sync)
{
- WalWriteMethod *method;
-
- method = pg_malloc0(sizeof(WalWriteMethod));
- method->open_for_write = dir_open_for_write;
- method->write = dir_write;
- method->get_file_size = dir_get_file_size;
- method->get_file_name = dir_get_file_name;
- method->compression_algorithm = dir_compression_algorithm;
- method->close = dir_close;
- method->sync = dir_sync;
- method->existsfile = dir_existsfile;
- method->finish = dir_finish;
- method->getlasterror = dir_getlasterror;
-
- dir_data = pg_malloc0(sizeof(DirectoryMethodData));
- dir_data->compression_algorithm = compression_algorithm;
- dir_data->compression_level = compression_level;
- dir_data->basedir = pg_strdup(basedir);
- dir_data->sync = sync;
-
- return method;
-}
-
-void
-FreeWalDirectoryMethod(void)
-{
- pg_free(dir_data->basedir);
- pg_free(dir_data);
- dir_data = NULL;
+ DirectoryMethodData *wwmethod;
+
+ wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
+ *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+ &WalDirectoryMethodOps;
+ wwmethod->base.compression_algorithm = compression_algorithm;
+ wwmethod->base.compression_level = compression_level;
+ wwmethod->base.sync = sync;
+ clear_error(&wwmethod->base);
+ wwmethod->basedir = pg_strdup(basedir);
+
+ return &wwmethod->base;
}
@@ -651,6 +657,33 @@ FreeWalDirectoryMethod(void)
*-------------------------------------------------------------------------
*/
+static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
+ const char *pathname,
+ const char *temp_suffix,
+ size_t pad_to_size);
+static int tar_close(Walfile *f, WalCloseMethod method);
+static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
+static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
+ const char *pathname);
+static char *tar_get_file_name(WalWriteMethod *wwmethod,
+ const char *pathname, const char *temp_suffix);
+static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
+static int tar_sync(Walfile *f);
+static bool tar_finish(WalWriteMethod *wwmethod);
+static void tar_free(WalWriteMethod *wwmethod);
+
+const WalWriteMethodOps WalTarMethodOps = {
+ .open_for_write = tar_open_for_write,
+ .close = tar_close,
+ .existsfile = tar_existsfile,
+ .get_file_size = tar_get_file_size,
+ .get_file_name = tar_get_file_name,
+ .write = tar_write,
+ .sync = tar_sync,
+ .finish = tar_finish,
+ .free = tar_free
+};
+
typedef struct TarMethodFile
{
Walfile base;
@@ -661,37 +694,20 @@ typedef struct TarMethodFile
typedef struct TarMethodData
{
+ WalWriteMethod base;
char *tarfilename;
int fd;
- pg_compress_algorithm compression_algorithm;
- int compression_level;
- bool sync;
TarMethodFile *currentfile;
- const char *lasterrstring; /* if set, takes precedence over lasterrno */
- int lasterrno;
#ifdef HAVE_LIBZ
z_streamp zp;
void *zlibOut;
#endif
} TarMethodData;
-static TarMethodData *tar_data = NULL;
-
-#define tar_clear_error() \
- (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0)
-#define tar_set_error(msg) \
- (tar_data->lasterrstring = _(msg))
-
-static const char *
-tar_getlasterror(void)
-{
- if (tar_data->lasterrstring)
- return tar_data->lasterrstring;
- return strerror(tar_data->lasterrno);
-}
#ifdef HAVE_LIBZ
static bool
-tar_write_compressed_data(void *buf, size_t count, bool flush)
+tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count,
+ bool flush)
{
tar_data->zp->next_in = buf;
tar_data->zp->avail_in = count;
@@ -703,7 +719,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
if (r == Z_STREAM_ERROR)
{
- tar_set_error("could not compress data");
+ tar_data->base.lasterrstring = "could not compress data";
return false;
}
@@ -715,7 +731,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
if (write(tar_data->fd, tar_data->zlibOut, len) != len)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ tar_data->base.lasterrno = errno ? errno : ENOSPC;
return false;
}
@@ -732,7 +748,7 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
/* Reset the stream for writing */
if (deflateReset(tar_data->zp) != Z_OK)
{
- tar_set_error("could not reset compression stream");
+ tar_data->base.lasterrstring = "could not reset compression stream";
return false;
}
}
@@ -744,29 +760,31 @@ tar_write_compressed_data(void *buf, size_t count, bool flush)
static ssize_t
tar_write(Walfile *f, const void *buf, size_t count)
{
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
ssize_t r;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
/* Tarfile will always be positioned at the end */
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
r = write(tar_data->fd, buf, count);
if (r != count)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
f->currpos += r;
return r;
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
- if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
+ if (!tar_write_compressed_data(tar_data, unconstify(void *, buf),
+ count, false))
return -1;
f->currpos += count;
return count;
@@ -775,7 +793,7 @@ tar_write(Walfile *f, const void *buf, size_t count)
else
{
/* Can't happen - compression enabled with no method set */
- tar_data->lasterrno = ENOSYS;
+ f->wwmethod->lasterrno = ENOSYS;
return -1;
}
}
@@ -801,7 +819,8 @@ tar_write_padding_data(TarMethodFile *f, size_t bytes)
}
static char *
-tar_get_file_name(const char *pathname, const char *temp_suffix)
+tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix)
{
char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
@@ -812,11 +831,13 @@ tar_get_file_name(const char *pathname, const char *temp_suffix)
}
static Walfile *
-tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
+ const char *temp_suffix, size_t pad_to_size)
{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
char *tmppath;
- tar_clear_error();
+ clear_error(wwmethod);
if (tar_data->fd < 0)
{
@@ -828,12 +849,12 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
pg_file_create_mode);
if (tar_data->fd < 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
tar_data->zp->zalloc = Z_NULL;
@@ -847,12 +868,13 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
* default 15 for the windowBits parameter makes the output be
* gzip instead of zlib.
*/
- if (deflateInit2(tar_data->zp, tar_data->compression_level,
+ if (deflateInit2(tar_data->zp, wwmethod->compression_level,
Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
{
pg_free(tar_data->zp);
tar_data->zp = NULL;
- tar_set_error("could not initialize compression library");
+ wwmethod->lasterrstring =
+ "could not initialize compression library";
return NULL;
}
}
@@ -863,13 +885,15 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (tar_data->currentfile != NULL)
{
- tar_set_error("implementation error: tar files can't have more than one open file");
+ wwmethod->lasterrstring =
+ "implementation error: tar files can't have more than one open file";
return NULL;
}
tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+ tar_data->currentfile->base.wwmethod = wwmethod;
- tmppath = tar_get_file_name(pathname, temp_suffix);
+ tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
/* Create a header with size set to 0 - we will fill out the size on close */
if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
@@ -877,23 +901,24 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
pg_free(tar_data->currentfile);
pg_free(tmppath);
tar_data->currentfile = NULL;
- tar_set_error("could not create tar header");
+ wwmethod->lasterrstring = "could not create tar header";
return NULL;
}
pg_free(tmppath);
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Flush existing data */
- if (!tar_write_compressed_data(NULL, 0, true))
+ if (!tar_write_compressed_data(tar_data, NULL, 0, true))
return NULL;
/* Turn off compression for header */
if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ wwmethod->lasterrstring =
+ "could not change compression parameters";
return NULL;
}
}
@@ -902,39 +927,39 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
if (tar_data->currentfile->ofs_start == -1)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
pg_free(tar_data->currentfile);
tar_data->currentfile = NULL;
return NULL;
}
tar_data->currentfile->base.currpos = 0;
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, tar_data->currentfile->header,
TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
pg_free(tar_data->currentfile);
tar_data->currentfile = NULL;
return NULL;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Write header through the zlib APIs but with no compression */
- if (!tar_write_compressed_data(tar_data->currentfile->header,
+ if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
TAR_BLOCK_SIZE, true))
return NULL;
/* Re-enable compression for the rest of the file */
- if (deflateParams(tar_data->zp, tar_data->compression_level,
+ if (deflateParams(tar_data->zp, wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ wwmethod->lasterrstring = "could not change compression parameters";
return NULL;
}
}
@@ -954,7 +979,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
if (pad_to_size)
{
tar_data->currentfile->pad_to_size = pad_to_size;
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
/* Uncompressed, so pad now */
if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
@@ -964,7 +989,7 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return NULL;
}
@@ -976,42 +1001,37 @@ tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
}
static ssize_t
-tar_get_file_size(const char *pathname)
+tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
{
- tar_clear_error();
+ clear_error(wwmethod);
/* Currently not used, so not supported */
- tar_data->lasterrno = ENOSYS;
+ wwmethod->lasterrno = ENOSYS;
return -1;
}
-static pg_compress_algorithm
-tar_compression_algorithm(void)
-{
- return tar_data->compression_algorithm;
-}
-
static int
tar_sync(Walfile *f)
{
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
int r;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
- if (!tar_data->sync)
+ if (!f->wwmethod->sync)
return 0;
/*
* Always sync the whole tarfile, because that's all we can do. This makes
* no sense on compressed files, so just ignore those.
*/
- if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
return 0;
r = fsync(tar_data->fd);
if (r < 0)
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return r;
}
@@ -1020,16 +1040,17 @@ tar_close(Walfile *f, WalCloseMethod method)
{
ssize_t filesize;
int padding;
+ TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
TarMethodFile *tf = (TarMethodFile *) f;
Assert(f != NULL);
- tar_clear_error();
+ clear_error(f->wwmethod);
if (method == CLOSE_UNLINK)
{
- if (tar_data->compression_algorithm != PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
{
- tar_set_error("unlink not supported with compression");
+ f->wwmethod->lasterrstring = "unlink not supported with compression";
return -1;
}
@@ -1040,7 +1061,7 @@ tar_close(Walfile *f, WalCloseMethod method)
*/
if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
@@ -1058,7 +1079,7 @@ tar_close(Walfile *f, WalCloseMethod method)
*/
if (tf->pad_to_size)
{
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/*
* A compressed tarfile is padded on close since we cannot know
@@ -1098,10 +1119,10 @@ tar_close(Walfile *f, WalCloseMethod method)
#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Flush the current buffer */
- if (!tar_write_compressed_data(NULL, 0, true))
+ if (!tar_write_compressed_data(tar_data, NULL, 0, true))
return -1;
}
#endif
@@ -1124,39 +1145,39 @@ tar_close(Walfile *f, WalCloseMethod method)
print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ f->wwmethod->lasterrno = errno ? errno : ENOSPC;
return -1;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
/* Turn off compression */
if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ f->wwmethod->lasterrstring = "could not change compression parameters";
return -1;
}
/* Overwrite the header, assuming the size will be the same */
- if (!tar_write_compressed_data(tar_data->currentfile->header,
+ if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
TAR_BLOCK_SIZE, true))
return -1;
/* Turn compression back on */
- if (deflateParams(tar_data->zp, tar_data->compression_level,
+ if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
Z_DEFAULT_STRATEGY) != Z_OK)
{
- tar_set_error("could not change compression parameters");
+ f->wwmethod->lasterrstring = "could not change compression parameters";
return -1;
}
}
@@ -1170,7 +1191,7 @@ tar_close(Walfile *f, WalCloseMethod method)
/* Move file pointer back down to end, so we can write the next file */
if (lseek(tar_data->fd, 0, SEEK_END) < 0)
{
- tar_data->lasterrno = errno;
+ f->wwmethod->lasterrno = errno;
return -1;
}
@@ -1179,7 +1200,7 @@ tar_close(Walfile *f, WalCloseMethod method)
{
/* XXX this seems pretty bogus; why is only this case fatal? */
pg_fatal("could not fsync file \"%s\": %s",
- tf->base.pathname, tar_getlasterror());
+ tf->base.pathname, GetLastWalMethodError(f->wwmethod));
}
/* Clean up and done */
@@ -1191,19 +1212,20 @@ tar_close(Walfile *f, WalCloseMethod method)
}
static bool
-tar_existsfile(const char *pathname)
+tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
{
- tar_clear_error();
+ clear_error(wwmethod);
/* We only deal with new tarfiles, so nothing externally created exists */
return false;
}
static bool
-tar_finish(void)
+tar_finish(WalWriteMethod *wwmethod)
{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
char zerobuf[1024] = {0};
- tar_clear_error();
+ clear_error(wwmethod);
if (tar_data->currentfile)
{
@@ -1212,20 +1234,21 @@ tar_finish(void)
}
/* A tarfile always ends with two empty blocks */
- if (tar_data->compression_algorithm == PG_COMPRESSION_NONE)
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
{
errno = 0;
if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
{
/* If write didn't set errno, assume problem is no disk space */
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
return false;
}
}
#ifdef HAVE_LIBZ
- else if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
+ else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
{
- if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+ if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
+ false))
return false;
/* Also flush all data to make sure the gzip stream is finished */
@@ -1239,7 +1262,7 @@ tar_finish(void)
if (r == Z_STREAM_ERROR)
{
- tar_set_error("could not compress data");
+ wwmethod->lasterrstring = "could not compress data";
return false;
}
if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
@@ -1253,7 +1276,7 @@ tar_finish(void)
* If write didn't set errno, assume problem is no disk
* space.
*/
- tar_data->lasterrno = errno ? errno : ENOSPC;
+ wwmethod->lasterrno = errno ? errno : ENOSPC;
return false;
}
}
@@ -1263,7 +1286,7 @@ tar_finish(void)
if (deflateEnd(tar_data->zp) != Z_OK)
{
- tar_set_error("could not close compression stream");
+ wwmethod->lasterrstring = "could not close compression stream";
return false;
}
}
@@ -1275,29 +1298,29 @@ tar_finish(void)
}
/* sync the empty blocks as well, since they're after the last file */
- if (tar_data->sync)
+ if (wwmethod->sync)
{
if (fsync(tar_data->fd) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
if (close(tar_data->fd) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
tar_data->fd = -1;
- if (tar_data->sync)
+ if (wwmethod->sync)
{
if (fsync_fname(tar_data->tarfilename, false) != 0 ||
fsync_parent_path(tar_data->tarfilename) != 0)
{
- tar_data->lasterrno = errno;
+ wwmethod->lasterrno = errno;
return false;
}
}
@@ -1305,6 +1328,19 @@ tar_finish(void)
return true;
}
+static void
+tar_free(WalWriteMethod *wwmethod)
+{
+ TarMethodData *tar_data = (TarMethodData *) wwmethod;
+
+ pg_free(tar_data->tarfilename);
+#ifdef HAVE_LIBZ
+ if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
+ pg_free(tar_data->zlibOut);
+#endif
+ pg_free(wwmethod);
+}
+
/*
* The argument compression_algorithm is currently ignored. It is in place for
* symmetry with CreateWalDirectoryMethod which uses it for distinguishing
@@ -1316,45 +1352,33 @@ CreateWalTarMethod(const char *tarbase,
pg_compress_algorithm compression_algorithm,
int compression_level, bool sync)
{
- WalWriteMethod *method;
+ TarMethodData *wwmethod;
const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
".tar.gz" : ".tar";
- method = pg_malloc0(sizeof(WalWriteMethod));
- method->open_for_write = tar_open_for_write;
- method->write = tar_write;
- method->get_file_size = tar_get_file_size;
- method->get_file_name = tar_get_file_name;
- method->compression_algorithm = tar_compression_algorithm;
- method->close = tar_close;
- method->sync = tar_sync;
- method->existsfile = tar_existsfile;
- method->finish = tar_finish;
- method->getlasterror = tar_getlasterror;
-
- tar_data = pg_malloc0(sizeof(TarMethodData));
- tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
- sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
- tar_data->fd = -1;
- tar_data->compression_algorithm = compression_algorithm;
- tar_data->compression_level = compression_level;
- tar_data->sync = sync;
+ wwmethod = pg_malloc0(sizeof(TarMethodData));
+ *((const WalWriteMethodOps **) &wwmethod->base.ops) =
+ &WalTarMethodOps;
+ wwmethod->base.compression_algorithm = compression_algorithm;
+ wwmethod->base.compression_level = compression_level;
+ wwmethod->base.sync = sync;
+ clear_error(&wwmethod->base);
+
+ wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+ sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
+ wwmethod->fd = -1;
#ifdef HAVE_LIBZ
if (compression_algorithm == PG_COMPRESSION_GZIP)
- tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+ wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
#endif
- return method;
+ return &wwmethod->base;
}
-void
-FreeWalTarMethod(void)
+const char *
+GetLastWalMethodError(WalWriteMethod *wwmethod)
{
- pg_free(tar_data->tarfilename);
-#ifdef HAVE_LIBZ
- if (tar_data->compression_algorithm == PG_COMPRESSION_GZIP)
- pg_free(tar_data->zlibOut);
-#endif
- pg_free(tar_data);
- tar_data = NULL;
+ if (wwmethod->lasterrstring)
+ return wwmethod->lasterrstring;
+ return strerror(wwmethod->lasterrno);
}
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index cf5ed87fbe8..0d7728d0864 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -16,6 +16,7 @@ typedef struct WalWriteMethod WalWriteMethod;
typedef struct
{
+ WalWriteMethod *wwmethod;
off_t currpos;
char *pathname;
/*
@@ -34,16 +35,9 @@ typedef enum
} WalCloseMethod;
/*
- * A WalWriteMethod structure represents the different methods used
- * to write the streaming WAL as it's received.
- *
- * All methods that have a failure return indicator will set state
- * allowing the getlasterror() method to return a suitable message.
- * Commonly, errno is this state (or part of it); so callers must take
- * care not to clobber errno between a failed method call and use of
- * getlasterror() to retrieve the message.
+ * Table of callbacks for a WalWriteMethod.
*/
-struct WalWriteMethod
+typedef struct WalWriteMethodOps
{
/*
* Open a target file. Returns Walfile, or NULL if open failed. If a temp
@@ -51,7 +45,7 @@ struct WalWriteMethod
* automatically renamed in close(). If pad_to_size is specified, the file
* will be padded with NUL up to that size, if supported by the Walmethod.
*/
- Walfile *(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+ Walfile *(*open_for_write) (WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size);
/*
* Close an open Walfile, using one or more methods for handling automatic
@@ -60,19 +54,16 @@ struct WalWriteMethod
int (*close) (Walfile *f, WalCloseMethod method);
/* Check if a file exist */
- bool (*existsfile) (const char *pathname);
+ bool (*existsfile) (WalWriteMethod *wwmethod, const char *pathname);
/* Return the size of a file, or -1 on failure. */
- ssize_t (*get_file_size) (const char *pathname);
+ ssize_t (*get_file_size) (WalWriteMethod *wwmethod, const char *pathname);
/*
* Return the name of the current file to work on in pg_malloc()'d string,
* without the base directory. This is useful for logging.
*/
- char *(*get_file_name) (const char *pathname, const char *temp_suffix);
-
- /* Returns the compression method */
- pg_compress_algorithm (*compression_algorithm) (void);
+ char *(*get_file_name) (WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix);
/*
* Write count number of bytes to the file, and return the number of bytes
@@ -91,10 +82,37 @@ struct WalWriteMethod
* close/write/sync of shared resources succeeded, otherwise returns false
* (but the resources are still closed).
*/
- bool (*finish) (void);
+ bool (*finish) (WalWriteMethod *wwmethod);
- /* Return a text for the last error in this Walfile */
- const char *(*getlasterror) (void);
+ /*
+ * Free subsidiary data associated with the WalWriteMethod, and the
+ * WalWriteMethod itself.
+ */
+ void (*free) (WalWriteMethod *wwmethod);
+} WalWriteMethodOps;
+
+/*
+ * A WalWriteMethod structure represents a way of writing streaming WAL as
+ * it's received.
+ *
+ * All methods that have a failure return indicator will set lasterrstring
+ * or lasterrno (the former takes precedence) so that the caller can signal
+ * a suitable error.
+ */
+struct WalWriteMethod
+{
+ const WalWriteMethodOps *ops;
+ pg_compress_algorithm compression_algorithm;
+ int compression_level;
+ bool sync;
+ const char *lasterrstring; /* if set, takes precedence over lasterrno */
+ int lasterrno;
+ /*
+ * MORE DATA FOLLOWS AT END OF STRUCT
+ *
+ * Each WalWriteMethod is expected to embed this as the first member of
+ * a larger struct with method-specific fields following.
+ */
};
/*
@@ -111,6 +129,4 @@ WalWriteMethod *CreateWalTarMethod(const char *tarbase,
pg_compress_algorithm compression_algo,
int compression, bool sync);
-/* Cleanup routines for previously-created methods */
-void FreeWalDirectoryMethod(void);
-void FreeWalTarMethod(void);
+const char *GetLastWalMethodError(WalWriteMethod *wwmethod);