diff options
Diffstat (limited to 'src/backend/replication/basebackup_copy.c')
-rw-r--r-- | src/backend/replication/basebackup_copy.c | 277 |
1 files changed, 275 insertions, 2 deletions
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c index abacc35fcd2..f42b368c033 100644 --- a/src/backend/replication/basebackup_copy.c +++ b/src/backend/replication/basebackup_copy.c @@ -1,8 +1,27 @@ /*------------------------------------------------------------------------- * * basebackup_copy.c - * send basebackup archives using one COPY OUT operation per - * tablespace, and an additional COPY OUT for the backup manifest + * send basebackup archives using COPY OUT + * + * We have two different ways of doing this. + * + * 'copytblspc' is an older method still supported for compatibility + * with releases prior to v15. In this method, a separate COPY OUT + * operation is used for each tablespace. The manifest, if it is sent, + * uses an additional COPY OUT operation. + * + * 'copystream' sends a starts a single COPY OUT operation and transmits + * all the archives and the manifest if present during the course of that + * single COPY OUT. Each CopyData message begins with a type byte, + * allowing us to signal the start of a new archive, or the manifest, + * by some means other than ending the COPY stream. This also allows + * this protocol to be extended more easily, since we can include + * arbitrary information in the message stream as long as we're certain + * that the client will know what to do with it. + * + * Regardless of which method is used, we sent a result set with + * information about the tabelspaces to be included in the backup before + * starting COPY OUT. This result has the same format in every method. * * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group * @@ -18,6 +37,52 @@ #include "libpq/pqformat.h" #include "replication/basebackup.h" #include "replication/basebackup_sink.h" +#include "utils/timestamp.h" + +typedef struct bbsink_copystream +{ + /* Common information for all types of sink. */ + bbsink base; + + /* + * Protocol message buffer. We assemble CopyData protocol messages by + * setting the first character of this buffer to 'd' (archive or manifest + * data) and then making base.bbs_buffer point to the second character so + * that the rest of the data gets copied into the message just where we + * want it. + */ + char *msgbuffer; + + /* + * When did we last report progress to the client, and how much progress + * did we report? + */ + TimestampTz last_progress_report_time; + uint64 bytes_done_at_last_time_check; +} bbsink_copystream; + +/* + * We don't want to send progress messages to the client excessively + * frequently. Ideally, we'd like to send a message when the time since the + * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking + * the system time every time we send a tiny bit of data seems too expensive. + * So we only check it after the number of bytes sine the last check reaches + * PROGRESS_REPORT_BYTE_INTERVAL. + */ +#define PROGRESS_REPORT_BYTE_INTERVAL 65536 +#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000 + +static void bbsink_copystream_begin_backup(bbsink *sink); +static void bbsink_copystream_begin_archive(bbsink *sink, + const char *archive_name); +static void bbsink_copystream_archive_contents(bbsink *sink, size_t len); +static void bbsink_copystream_end_archive(bbsink *sink); +static void bbsink_copystream_begin_manifest(bbsink *sink); +static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len); +static void bbsink_copystream_end_manifest(bbsink *sink); +static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); +static void bbsink_copystream_cleanup(bbsink *sink); static void bbsink_copytblspc_begin_backup(bbsink *sink); static void bbsink_copytblspc_begin_archive(bbsink *sink, @@ -38,6 +103,18 @@ static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static void SendTablespaceList(List *tablespaces); static void send_int8_string(StringInfoData *buf, int64 intval); +const bbsink_ops bbsink_copystream_ops = { + .begin_backup = bbsink_copystream_begin_backup, + .begin_archive = bbsink_copystream_begin_archive, + .archive_contents = bbsink_copystream_archive_contents, + .end_archive = bbsink_copystream_end_archive, + .begin_manifest = bbsink_copystream_begin_manifest, + .manifest_contents = bbsink_copystream_manifest_contents, + .end_manifest = bbsink_copystream_end_manifest, + .end_backup = bbsink_copystream_end_backup, + .cleanup = bbsink_copystream_cleanup +}; + const bbsink_ops bbsink_copytblspc_ops = { .begin_backup = bbsink_copytblspc_begin_backup, .begin_archive = bbsink_copytblspc_begin_archive, @@ -51,6 +128,202 @@ const bbsink_ops bbsink_copytblspc_ops = { }; /* + * Create a new 'copystream' bbsink. + */ +bbsink * +bbsink_copystream_new(void) +{ + bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream)); + + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops; + + /* Set up for periodic progress reporting. */ + sink->last_progress_report_time = GetCurrentTimestamp(); + sink->bytes_done_at_last_time_check = UINT64CONST(0); + + return &sink->base; +} + +/* + * Send start-of-backup wire protocol messages. + */ +static void +bbsink_copystream_begin_backup(bbsink *sink) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = sink->bbs_state; + + /* + * Initialize buffer. We ultimately want to send the archive and manifest + * data by means of CopyData messages where the payload portion of each + * message begins with a type byte, so we set up a buffer that begins with + * a the type byte we're going to need, and then arrange things so that + * the data we're given will be written just after that type byte. That + * will allow us to ship the data with a single call to pq_putmessage and + * without needing any extra copying. + */ + mysink->msgbuffer = palloc(mysink->base.bbs_buffer_length + 1); + mysink->base.bbs_buffer = mysink->msgbuffer + 1; + mysink->msgbuffer[0] = 'd'; /* archive or manifest data */ + + /* Tell client the backup start location. */ + SendXlogRecPtrResult(state->startptr, state->starttli); + + /* Send client a list of tablespaces. */ + SendTablespaceList(state->tablespaces); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); + + /* Begin COPY stream. This will be used for all archives + manifest. */ + SendCopyOutResponse(); +} + +/* + * Send a CopyData message announcing the beginning of a new archive. + */ +static void +bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name) +{ + bbsink_state *state = sink->bbs_state; + tablespaceinfo *ti; + StringInfoData buf; + + ti = list_nth(state->tablespaces, state->tablespace_num); + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'n'); /* New archive */ + pq_sendstring(&buf, archive_name); + pq_sendstring(&buf, ti->path == NULL ? "" : ti->path); + pq_endmessage(&buf); +} + +/* + * Send a CopyData message containing a chunk of archive content. + */ +static void +bbsink_copystream_archive_contents(bbsink *sink, size_t len) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = mysink->base.bbs_state; + StringInfoData buf; + uint64 targetbytes; + + /* Send the archive content to the client (with leading type byte). */ + pq_putmessage('d', mysink->msgbuffer, len + 1); + + /* Consider whether to send a progress report to the client. */ + targetbytes = mysink->bytes_done_at_last_time_check + + PROGRESS_REPORT_BYTE_INTERVAL; + if (targetbytes <= state->bytes_done) + { + TimestampTz now = GetCurrentTimestamp(); + long ms; + + /* + * OK, we've sent a decent number of bytes, so check the system time + * to see whether we're due to send a progress report. + */ + mysink->bytes_done_at_last_time_check = state->bytes_done; + ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time, + now); + + /* + * Send a progress report if enough time has passed. Also send one if + * the system clock was set backward, so that such occurrences don't + * have the effect of suppressing further progress messages. + */ + if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD) + { + mysink->last_progress_report_time = now; + + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'p'); /* Progress report */ + pq_sendint64(&buf, state->bytes_done); + pq_endmessage(&buf); + pq_flush_if_writable(); + } + } +} + +/* + * We don't need to explicitly signal the end of the archive; the client + * will figure out that we've reached the end when we begin the next one, + * or begin the manifest, or end the COPY stream. However, this seems like + * a good time to force out a progress report. One reason for that is that + * if this is the last archive, and we don't force a progress report now, + * the client will never be told that we sent all the bytes. + */ +static void +bbsink_copystream_end_archive(bbsink *sink) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = mysink->base.bbs_state; + StringInfoData buf; + + mysink->bytes_done_at_last_time_check = state->bytes_done; + mysink->last_progress_report_time = GetCurrentTimestamp(); + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'p'); /* Progress report */ + pq_sendint64(&buf, state->bytes_done); + pq_endmessage(&buf); + pq_flush_if_writable(); +} + +/* + * Send a CopyData message announcing the beginning of the backup manifest. + */ +static void +bbsink_copystream_begin_manifest(bbsink *sink) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'm'); /* Manifest */ + pq_endmessage(&buf); +} + +/* + * Each chunk of manifest data is sent using a CopyData message. + */ +static void +bbsink_copystream_manifest_contents(bbsink *sink, size_t len) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + + /* Send the manifest content to the client (with leading type byte). */ + pq_putmessage('d', mysink->msgbuffer, len + 1); +} + +/* + * We don't need an explicit terminator for the backup manifest. + */ +static void +bbsink_copystream_end_manifest(bbsink *sink) +{ + /* Do nothing. */ +} + +/* + * Send end-of-backup wire protocol messages. + */ +static void +bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli) +{ + SendCopyDone(); + SendXlogRecPtrResult(endptr, endtli); +} + +/* + * Cleanup. + */ +static void +bbsink_copystream_cleanup(bbsink *sink) +{ + /* Nothing to do. */ +} + +/* * Create a new 'copytblspc' bbsink. */ bbsink * |