diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_basebackup.c')
-rw-r--r-- | src/bin/pg_basebackup/pg_basebackup.c | 409 |
1 files changed, 169 insertions, 240 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 2943d9ec1a0..3e6977df1aa 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -29,6 +29,7 @@ #include "access/xlog_internal.h" #include "bbstreamer.h" +#include "common/backup_compression.h" #include "common/file_perm.h" #include "common/file_utils.h" #include "common/logging.h" @@ -57,6 +58,7 @@ typedef struct TablespaceList typedef struct ArchiveStreamState { int tablespacenum; + bc_specification *compress; bbstreamer *streamer; bbstreamer *manifest_inject_streamer; PQExpBuffer manifest_buffer; @@ -132,9 +134,6 @@ static bool checksum_failure = false; static bool showprogress = false; static bool estimatesize = true; static int verbose = 0; -static int compresslevel = 0; -static WalCompressionMethod compressmethod = COMPRESSION_NONE; -static CompressionLocation compressloc = COMPRESS_LOCATION_UNSPECIFIED; static IncludeWal includewal = STREAM_WAL; static bool fastcheckpoint = false; static bool writerecoveryconf = false; @@ -198,7 +197,8 @@ static void progress_report(int tablespacenum, bool force, bool finished); static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, bbstreamer **manifest_inject_streamer_p, bool is_recovery_guc_supported, - bool expect_unterminated_tarfile); + bool expect_unterminated_tarfile, + bc_specification *compress); static void ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data); static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor); @@ -207,7 +207,7 @@ static uint64 GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor); static void GetCopyDataEnd(size_t r, char *copybuf, size_t cursor); static void ReportCopyDataParseError(size_t r, char *copybuf); static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, - bool tablespacenum); + bool tablespacenum, bc_specification *compress); static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data); static void ReceiveBackupManifest(PGconn *conn); static void ReceiveBackupManifestChunk(size_t r, char *copybuf, @@ -215,7 +215,9 @@ static void ReceiveBackupManifestChunk(size_t r, char *copybuf, static void ReceiveBackupManifestInMemory(PGconn *conn, PQExpBuffer buf); static void ReceiveBackupManifestInMemoryChunk(size_t r, char *copybuf, void *callback_data); -static void BaseBackup(void); +static void BaseBackup(char *compression_algorithm, char *compression_detail, + CompressionLocation compressloc, + bc_specification *client_compress); static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); @@ -405,8 +407,8 @@ usage(void) printf(_(" -X, --wal-method=none|fetch|stream\n" " include required WAL files with specified method\n")); printf(_(" -z, --gzip compress tar output\n")); - printf(_(" -Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL]\n" - " compress tar output with given compression method or level\n")); + printf(_(" -Z, --compress=[{client|server}-]METHOD[:DETAIL]\n" + " compress on client or server as specified\n")); printf(_(" -Z, --compress=none do not compress tar output\n")); printf(_("\nGeneral options:\n")); printf(_(" -c, --checkpoint=fast|spread\n" @@ -542,7 +544,9 @@ typedef struct } logstreamer_param; static int -LogStreamerMain(logstreamer_param *param) +LogStreamerMain(logstreamer_param *param, + WalCompressionMethod wal_compress_method, + int wal_compress_level) { StreamCtl stream; @@ -565,25 +569,14 @@ LogStreamerMain(logstreamer_param *param) stream.mark_done = true; stream.partial_suffix = NULL; stream.replication_slot = replication_slot; - if (format == 'p') stream.walmethod = CreateWalDirectoryMethod(param->xlog, COMPRESSION_NONE, 0, stream.do_sync); - else if (compressloc != COMPRESS_LOCATION_CLIENT) - stream.walmethod = CreateWalTarMethod(param->xlog, - COMPRESSION_NONE, - compresslevel, - stream.do_sync); - else if (compressmethod == COMPRESSION_GZIP) - stream.walmethod = CreateWalTarMethod(param->xlog, - compressmethod, - compresslevel, - stream.do_sync); else stream.walmethod = CreateWalTarMethod(param->xlog, - COMPRESSION_NONE, - compresslevel, + wal_compress_method, + wal_compress_level, stream.do_sync); if (!ReceiveXlogStream(param->bgconn, &stream)) @@ -629,7 +622,9 @@ LogStreamerMain(logstreamer_param *param) * stream the logfile in parallel with the backups. */ static void -StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) +StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier, + WalCompressionMethod wal_compress_method, + int wal_compress_level) { logstreamer_param *param; uint32 hi, @@ -729,7 +724,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) int ret; /* in child process */ - ret = LogStreamerMain(param); + ret = LogStreamerMain(param, wal_compress_method, wal_compress_level); /* temp debugging aid to analyze 019_replslot_limit failures */ if (verbose) @@ -1004,136 +999,81 @@ parse_max_rate(char *src) } /* - * Utility wrapper to parse the values specified for -Z/--compress. - * *methodres and *levelres will be optionally filled with values coming - * from the parsed results. + * Basic parsing of a value specified for -Z/--compress. + * + * We're not concerned here with understanding exactly what behavior the + * user wants, but we do need to know whether the user is requesting client + * or server side compression or leaving it unspecified, and we need to + * separate the name of the compression algorithm from the detail string. + * + * For instance, if the user writes --compress client-lz4:6, we want to + * separate that into (a) client-side compression, (b) algorithm "lz4", + * and (c) detail "6". Note, however, that all the client/server prefix is + * optional, and so is the detail. The algorithm name is required, unless + * the whole string is an integer, in which case we assume "gzip" as the + * algorithm and use the integer as the detail. + * + * We're not concerned with validation at this stage, so if the user writes + * --compress client-turkey:sandwich, the requested algorithm is "turkey" + * and the detail string is "sandwich". We'll sort out whether that's legal + * at a later stage. */ static void -parse_compress_options(char *src, WalCompressionMethod *methodres, - CompressionLocation *locationres, int *levelres) +parse_compress_options(char *option, char **algorithm, char **detail, + CompressionLocation *locationres) { char *sep; - int firstlen; - char *firstpart; + char *endp; /* - * clear 'levelres' so that if there are multiple compression options, - * the last one fully overrides the earlier ones - */ - *levelres = 0; - - /* check if the option is split in two */ - sep = strchr(src, ':'); - - /* - * The first part of the option value could be a method name, or just a - * level value. - */ - firstlen = (sep != NULL) ? (sep - src) : strlen(src); - firstpart = pg_malloc(firstlen + 1); - memcpy(firstpart, src, firstlen); - firstpart[firstlen] = '\0'; - - /* - * Check if the first part of the string matches with a supported - * compression method. + * Check whether the compression specification consists of a bare integer. + * + * If so, for backward compatibility, assume gzip. */ - if (pg_strcasecmp(firstpart, "gzip") == 0) + (void) strtol(option, &endp, 10); + if (*endp == '\0') { - *methodres = COMPRESSION_GZIP; *locationres = COMPRESS_LOCATION_UNSPECIFIED; + *algorithm = pstrdup("gzip"); + *detail = pstrdup(option); + return; } - else if (pg_strcasecmp(firstpart, "client-gzip") == 0) - { - *methodres = COMPRESSION_GZIP; - *locationres = COMPRESS_LOCATION_CLIENT; - } - else if (pg_strcasecmp(firstpart, "server-gzip") == 0) + + /* Strip off any "client-" or "server-" prefix. */ + if (strncmp(option, "server-", 7) == 0) { - *methodres = COMPRESSION_GZIP; *locationres = COMPRESS_LOCATION_SERVER; + option += 7; } - else if (pg_strcasecmp(firstpart, "lz4") == 0) - { - *methodres = COMPRESSION_LZ4; - *locationres = COMPRESS_LOCATION_UNSPECIFIED; - } - else if (pg_strcasecmp(firstpart, "client-lz4") == 0) + else if (strncmp(option, "client-", 7) == 0) { - *methodres = COMPRESSION_LZ4; *locationres = COMPRESS_LOCATION_CLIENT; - } - else if (pg_strcasecmp(firstpart, "server-lz4") == 0) - { - *methodres = COMPRESSION_LZ4; - *locationres = COMPRESS_LOCATION_SERVER; - } - else if (pg_strcasecmp(firstpart, "zstd") == 0) - { - *methodres = COMPRESSION_ZSTD; - *locationres = COMPRESS_LOCATION_UNSPECIFIED; - } - else if (pg_strcasecmp(firstpart, "client-zstd") == 0) - { - *methodres = COMPRESSION_ZSTD; - *locationres = COMPRESS_LOCATION_CLIENT; - } - else if (pg_strcasecmp(firstpart, "server-zstd") == 0) - { - *methodres = COMPRESSION_ZSTD; - *locationres = COMPRESS_LOCATION_SERVER; - } - else if (pg_strcasecmp(firstpart, "none") == 0) - { - *methodres = COMPRESSION_NONE; - *locationres = COMPRESS_LOCATION_UNSPECIFIED; + option += 7; } else - { - /* - * It does not match anything known, so check for the - * backward-compatible case of only an integer where the implied - * compression method changes depending on the level value. - */ - if (!option_parse_int(firstpart, "-Z/--compress", 0, - INT_MAX, levelres)) - exit(1); - - *methodres = (*levelres > 0) ? - COMPRESSION_GZIP : COMPRESSION_NONE; *locationres = COMPRESS_LOCATION_UNSPECIFIED; - free(firstpart); - return; - } - + /* + * Check whether there is a compression detail following the algorithm + * name. + */ + sep = strchr(option, ':'); if (sep == NULL) { - /* - * The caller specified a method without a colon separator, so let any - * subsequent checks assign a default level. - */ - free(firstpart); - return; + *algorithm = pstrdup(option); + *detail = NULL; } - - /* Check the contents after the colon separator. */ - sep++; - if (*sep == '\0') + else { - pg_log_error("no compression level defined for method %s", firstpart); - exit(1); - } + char *alg; - /* - * For any of the methods currently supported, the data after the - * separator can just be an integer. - */ - if (!option_parse_int(sep, "-Z/--compress", 0, INT_MAX, - levelres)) - exit(1); + alg = palloc((sep - option) + 1); + memcpy(alg, option, sep - option); + alg[sep - option] = '\0'; - free(firstpart); + *algorithm = alg; + *detail = pstrdup(sep + 1); + } } /* @@ -1200,7 +1140,8 @@ static bbstreamer * CreateBackupStreamer(char *archive_name, char *spclocation, bbstreamer **manifest_inject_streamer_p, bool is_recovery_guc_supported, - bool expect_unterminated_tarfile) + bool expect_unterminated_tarfile, + bc_specification *compress) { bbstreamer *streamer = NULL; bbstreamer *manifest_inject_streamer = NULL; @@ -1316,32 +1257,28 @@ CreateBackupStreamer(char *archive_name, char *spclocation, archive_file = NULL; } - if (compressmethod == COMPRESSION_NONE || - compressloc != COMPRESS_LOCATION_CLIENT) + if (compress->algorithm == BACKUP_COMPRESSION_NONE) streamer = bbstreamer_plain_writer_new(archive_filename, archive_file); - else if (compressmethod == COMPRESSION_GZIP) + else if (compress->algorithm == BACKUP_COMPRESSION_GZIP) { strlcat(archive_filename, ".gz", sizeof(archive_filename)); streamer = bbstreamer_gzip_writer_new(archive_filename, - archive_file, - compresslevel); + archive_file, compress); } - else if (compressmethod == COMPRESSION_LZ4) + else if (compress->algorithm == BACKUP_COMPRESSION_LZ4) { strlcat(archive_filename, ".lz4", sizeof(archive_filename)); streamer = bbstreamer_plain_writer_new(archive_filename, archive_file); - streamer = bbstreamer_lz4_compressor_new(streamer, - compresslevel); + streamer = bbstreamer_lz4_compressor_new(streamer, compress); } - else if (compressmethod == COMPRESSION_ZSTD) + else if (compress->algorithm == BACKUP_COMPRESSION_ZSTD) { strlcat(archive_filename, ".zst", sizeof(archive_filename)); streamer = bbstreamer_plain_writer_new(archive_filename, archive_file); - streamer = bbstreamer_zstd_compressor_new(streamer, - compresslevel); + streamer = bbstreamer_zstd_compressor_new(streamer, compress); } else { @@ -1395,13 +1332,13 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * If the user has requested a server compressed archive along with archive * extraction at client then we need to decompress it. */ - if (format == 'p' && compressloc == COMPRESS_LOCATION_SERVER) + if (format == 'p') { - if (compressmethod == COMPRESSION_GZIP) + if (is_tar_gz) streamer = bbstreamer_gzip_decompressor_new(streamer); - else if (compressmethod == COMPRESSION_LZ4) + else if (is_tar_lz4) streamer = bbstreamer_lz4_decompressor_new(streamer); - else if (compressmethod == COMPRESSION_ZSTD) + else if (is_tar_zstd) streamer = bbstreamer_zstd_decompressor_new(streamer); } @@ -1415,13 +1352,14 @@ CreateBackupStreamer(char *archive_name, char *spclocation, * manifest if present - as a single COPY stream. */ static void -ReceiveArchiveStream(PGconn *conn) +ReceiveArchiveStream(PGconn *conn, bc_specification *compress) { ArchiveStreamState state; /* Set up initial state. */ memset(&state, 0, sizeof(state)); state.tablespacenum = -1; + state.compress = compress; /* All the real work happens in ReceiveArchiveStreamChunk. */ ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state); @@ -1542,7 +1480,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) CreateBackupStreamer(archive_name, spclocation, &state->manifest_inject_streamer, - true, false); + true, false, + state->compress); } break; } @@ -1743,7 +1682,7 @@ ReportCopyDataParseError(size_t r, char *copybuf) */ static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, - bool tablespacenum) + bool tablespacenum, bc_specification *compress) { WriteTarState state; bbstreamer *manifest_inject_streamer; @@ -1759,7 +1698,8 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, state.streamer = CreateBackupStreamer(archive_name, spclocation, &manifest_inject_streamer, is_recovery_guc_supported, - expect_unterminated_tarfile); + expect_unterminated_tarfile, + compress); state.tablespacenum = tablespacenum; ReceiveCopyData(conn, ReceiveTarCopyChunk, &state); progress_update_filename(NULL); @@ -1902,7 +1842,8 @@ ReceiveBackupManifestInMemoryChunk(size_t r, char *copybuf, } static void -BaseBackup(void) +BaseBackup(char *compression_algorithm, char *compression_detail, + CompressionLocation compressloc, bc_specification *client_compress) { PGresult *res; char *sysidentifier; @@ -2055,33 +1996,17 @@ BaseBackup(void) if (compressloc == COMPRESS_LOCATION_SERVER) { - char *compressmethodstr = NULL; - if (!use_new_option_syntax) { pg_log_error("server does not support server-side compression"); exit(1); } - switch (compressmethod) - { - case COMPRESSION_GZIP: - compressmethodstr = "gzip"; - break; - case COMPRESSION_LZ4: - compressmethodstr = "lz4"; - break; - case COMPRESSION_ZSTD: - compressmethodstr = "zstd"; - break; - default: - Assert(false); - break; - } AppendStringCommandOption(&buf, use_new_option_syntax, - "COMPRESSION", compressmethodstr); - if (compresslevel >= 1) /* not 0 or Z_DEFAULT_COMPRESSION */ - AppendIntegerCommandOption(&buf, use_new_option_syntax, - "COMPRESSION_LEVEL", compresslevel); + "COMPRESSION", compression_algorithm); + if (compression_detail != NULL) + AppendStringCommandOption(&buf, use_new_option_syntax, + "COMPRESSION_DETAIL", + compression_detail); } if (verbose) @@ -2207,15 +2132,33 @@ BaseBackup(void) */ if (includewal == STREAM_WAL) { + WalCompressionMethod wal_compress_method; + int wal_compress_level; + if (verbose) pg_log_info("starting background WAL receiver"); - StartLogStreamer(xlogstart, starttli, sysidentifier); + + if (client_compress->algorithm == BACKUP_COMPRESSION_GZIP) + { + wal_compress_method = COMPRESSION_GZIP; + wal_compress_level = + (client_compress->options & BACKUP_COMPRESSION_OPTION_LEVEL) + != 0 ? client_compress->level : 0; + } + else + { + wal_compress_method = COMPRESSION_NONE; + wal_compress_level = 0; + } + + StartLogStreamer(xlogstart, starttli, sysidentifier, + wal_compress_method, wal_compress_level); } if (serverMajor >= 1500) { /* Receive a single tar stream with everything. */ - ReceiveArchiveStream(conn); + ReceiveArchiveStream(conn, client_compress); } else { @@ -2244,7 +2187,8 @@ BaseBackup(void) spclocation = PQgetvalue(res, i, 1); } - ReceiveTarFile(conn, archive_name, spclocation, i); + ReceiveTarFile(conn, archive_name, spclocation, i, + client_compress); } /* @@ -2511,6 +2455,10 @@ main(int argc, char **argv) int c; int option_index; + char *compression_algorithm = "none"; + char *compression_detail = NULL; + CompressionLocation compressloc = COMPRESS_LOCATION_UNSPECIFIED; + bc_specification client_compress; pg_logging_init(argv[0]); progname = get_progname(argv[0]); @@ -2616,17 +2564,13 @@ main(int argc, char **argv) do_sync = false; break; case 'z': -#ifdef HAVE_LIBZ - compresslevel = Z_DEFAULT_COMPRESSION; -#else - compresslevel = 1; /* will be rejected below */ -#endif - compressmethod = COMPRESSION_GZIP; + compression_algorithm = "gzip"; + compression_detail = NULL; compressloc = COMPRESS_LOCATION_UNSPECIFIED; break; case 'Z': - parse_compress_options(optarg, &compressmethod, - &compressloc, &compresslevel); + parse_compress_options(optarg, &compression_algorithm, + &compression_detail, &compressloc); break; case 'c': if (pg_strcasecmp(optarg, "fast") == 0) @@ -2753,12 +2697,11 @@ main(int argc, char **argv) } /* - * If we're compressing the backup and the user has not said where to - * perform the compression, do it on the client, unless they specified - * --target, in which case the server is the only choice. + * If the user has not specified where to perform backup compression, + * default to the client, unless the user specified --target, in which case + * the server is the only choice. */ - if (compressmethod != COMPRESSION_NONE && - compressloc == COMPRESS_LOCATION_UNSPECIFIED) + if (compressloc == COMPRESS_LOCATION_UNSPECIFIED) { if (backup_target == NULL) compressloc = COMPRESS_LOCATION_CLIENT; @@ -2767,6 +2710,40 @@ main(int argc, char **argv) } /* + * If any compression that we're doing is happening on the client side, + * we must try to parse the compression algorithm and detail, but if it's + * all on the server side, then we're just going to pass through whatever + * was requested and let the server decide what to do. + */ + if (compressloc == COMPRESS_LOCATION_CLIENT) + { + bc_algorithm alg; + char *error_detail; + + if (!parse_bc_algorithm(compression_algorithm, &alg)) + { + pg_log_error("unrecognized compression algorithm \"%s\"", + compression_algorithm); + exit(1); + } + + parse_bc_specification(alg, compression_detail, &client_compress); + error_detail = validate_bc_specification(&client_compress); + if (error_detail != NULL) + { + pg_log_error("invalid compression specification: %s", + error_detail); + exit(1); + } + } + else + { + Assert(compressloc == COMPRESS_LOCATION_SERVER); + client_compress.algorithm = BACKUP_COMPRESSION_NONE; + client_compress.options = 0; + } + + /* * Can't perform client-side compression if the backup is not being * sent to the client. */ @@ -2779,9 +2756,10 @@ main(int argc, char **argv) } /* - * Compression doesn't make sense unless tar format is in use. + * Client-side compression doesn't make sense unless tar format is in use. */ - if (format == 'p' && compressloc == COMPRESS_LOCATION_CLIENT) + if (format == 'p' && compressloc == COMPRESS_LOCATION_CLIENT && + client_compress.algorithm != BACKUP_COMPRESSION_NONE) { pg_log_error("only tar mode backups can be compressed"); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), @@ -2882,56 +2860,6 @@ main(int argc, char **argv) } } - /* Sanity checks for compression-related options. */ - switch (compressmethod) - { - case COMPRESSION_NONE: - if (compresslevel != 0) - { - pg_log_error("cannot use compression level with method %s", - "none"); - fprintf(stderr, _("Try \"%s --help\" for more information.\n"), - progname); - exit(1); - } - break; - case COMPRESSION_GZIP: - if (compresslevel > 9) - { - pg_log_error("compression level %d of method %s higher than maximum of 9", - compresslevel, "gzip"); - exit(1); - } - if (compressloc == COMPRESS_LOCATION_CLIENT) - { -#ifdef HAVE_LIBZ - if (compresslevel == 0) - compresslevel = Z_DEFAULT_COMPRESSION; -#else - pg_log_error("this build does not support compression with %s", - "gzip"); - exit(1); -#endif - } - break; - case COMPRESSION_LZ4: - if (compresslevel > 12) - { - pg_log_error("compression level %d of method %s higher than maximum of 12", - compresslevel, "lz4"); - exit(1); - } - break; - case COMPRESSION_ZSTD: - if (compresslevel > 22) - { - pg_log_error("compression level %d of method %s higher than maximum of 22", - compresslevel, "zstd"); - exit(1); - } - break; - } - /* * Sanity checks for progress reporting options. */ @@ -3040,7 +2968,8 @@ main(int argc, char **argv) free(linkloc); } - BaseBackup(); + BaseBackup(compression_algorithm, compression_detail, compressloc, + &client_compress); success = true; return 0; |