aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/basebackup_copy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/basebackup_copy.c')
-rw-r--r--src/backend/replication/basebackup_copy.c277
1 files changed, 275 insertions, 2 deletions
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c
index abacc35fcd2..f42b368c033 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -1,8 +1,27 @@
/*-------------------------------------------------------------------------
*
* basebackup_copy.c
- * send basebackup archives using one COPY OUT operation per
- * tablespace, and an additional COPY OUT for the backup manifest
+ * send basebackup archives using COPY OUT
+ *
+ * We have two different ways of doing this.
+ *
+ * 'copytblspc' is an older method still supported for compatibility
+ * with releases prior to v15. In this method, a separate COPY OUT
+ * operation is used for each tablespace. The manifest, if it is sent,
+ * uses an additional COPY OUT operation.
+ *
+ * 'copystream' sends a starts a single COPY OUT operation and transmits
+ * all the archives and the manifest if present during the course of that
+ * single COPY OUT. Each CopyData message begins with a type byte,
+ * allowing us to signal the start of a new archive, or the manifest,
+ * by some means other than ending the COPY stream. This also allows
+ * this protocol to be extended more easily, since we can include
+ * arbitrary information in the message stream as long as we're certain
+ * that the client will know what to do with it.
+ *
+ * Regardless of which method is used, we sent a result set with
+ * information about the tabelspaces to be included in the backup before
+ * starting COPY OUT. This result has the same format in every method.
*
* Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
*
@@ -18,6 +37,52 @@
#include "libpq/pqformat.h"
#include "replication/basebackup.h"
#include "replication/basebackup_sink.h"
+#include "utils/timestamp.h"
+
+typedef struct bbsink_copystream
+{
+ /* Common information for all types of sink. */
+ bbsink base;
+
+ /*
+ * Protocol message buffer. We assemble CopyData protocol messages by
+ * setting the first character of this buffer to 'd' (archive or manifest
+ * data) and then making base.bbs_buffer point to the second character so
+ * that the rest of the data gets copied into the message just where we
+ * want it.
+ */
+ char *msgbuffer;
+
+ /*
+ * When did we last report progress to the client, and how much progress
+ * did we report?
+ */
+ TimestampTz last_progress_report_time;
+ uint64 bytes_done_at_last_time_check;
+} bbsink_copystream;
+
+/*
+ * We don't want to send progress messages to the client excessively
+ * frequently. Ideally, we'd like to send a message when the time since the
+ * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
+ * the system time every time we send a tiny bit of data seems too expensive.
+ * So we only check it after the number of bytes sine the last check reaches
+ * PROGRESS_REPORT_BYTE_INTERVAL.
+ */
+#define PROGRESS_REPORT_BYTE_INTERVAL 65536
+#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000
+
+static void bbsink_copystream_begin_backup(bbsink *sink);
+static void bbsink_copystream_begin_archive(bbsink *sink,
+ const char *archive_name);
+static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
+static void bbsink_copystream_end_archive(bbsink *sink);
+static void bbsink_copystream_begin_manifest(bbsink *sink);
+static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_copystream_end_manifest(bbsink *sink);
+static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+static void bbsink_copystream_cleanup(bbsink *sink);
static void bbsink_copytblspc_begin_backup(bbsink *sink);
static void bbsink_copytblspc_begin_archive(bbsink *sink,
@@ -38,6 +103,18 @@ static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static void SendTablespaceList(List *tablespaces);
static void send_int8_string(StringInfoData *buf, int64 intval);
+const bbsink_ops bbsink_copystream_ops = {
+ .begin_backup = bbsink_copystream_begin_backup,
+ .begin_archive = bbsink_copystream_begin_archive,
+ .archive_contents = bbsink_copystream_archive_contents,
+ .end_archive = bbsink_copystream_end_archive,
+ .begin_manifest = bbsink_copystream_begin_manifest,
+ .manifest_contents = bbsink_copystream_manifest_contents,
+ .end_manifest = bbsink_copystream_end_manifest,
+ .end_backup = bbsink_copystream_end_backup,
+ .cleanup = bbsink_copystream_cleanup
+};
+
const bbsink_ops bbsink_copytblspc_ops = {
.begin_backup = bbsink_copytblspc_begin_backup,
.begin_archive = bbsink_copytblspc_begin_archive,
@@ -51,6 +128,202 @@ const bbsink_ops bbsink_copytblspc_ops = {
};
/*
+ * Create a new 'copystream' bbsink.
+ */
+bbsink *
+bbsink_copystream_new(void)
+{
+ bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
+
+ *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
+
+ /* Set up for periodic progress reporting. */
+ sink->last_progress_report_time = GetCurrentTimestamp();
+ sink->bytes_done_at_last_time_check = UINT64CONST(0);
+
+ return &sink->base;
+}
+
+/*
+ * Send start-of-backup wire protocol messages.
+ */
+static void
+bbsink_copystream_begin_backup(bbsink *sink)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = sink->bbs_state;
+
+ /*
+ * Initialize buffer. We ultimately want to send the archive and manifest
+ * data by means of CopyData messages where the payload portion of each
+ * message begins with a type byte, so we set up a buffer that begins with
+ * a the type byte we're going to need, and then arrange things so that
+ * the data we're given will be written just after that type byte. That
+ * will allow us to ship the data with a single call to pq_putmessage and
+ * without needing any extra copying.
+ */
+ mysink->msgbuffer = palloc(mysink->base.bbs_buffer_length + 1);
+ mysink->base.bbs_buffer = mysink->msgbuffer + 1;
+ mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+
+ /* Tell client the backup start location. */
+ SendXlogRecPtrResult(state->startptr, state->starttli);
+
+ /* Send client a list of tablespaces. */
+ SendTablespaceList(state->tablespaces);
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+
+ /* Begin COPY stream. This will be used for all archives + manifest. */
+ SendCopyOutResponse();
+}
+
+/*
+ * Send a CopyData message announcing the beginning of a new archive.
+ */
+static void
+bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
+{
+ bbsink_state *state = sink->bbs_state;
+ tablespaceinfo *ti;
+ StringInfoData buf;
+
+ ti = list_nth(state->tablespaces, state->tablespace_num);
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'n'); /* New archive */
+ pq_sendstring(&buf, archive_name);
+ pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
+ pq_endmessage(&buf);
+}
+
+/*
+ * Send a CopyData message containing a chunk of archive content.
+ */
+static void
+bbsink_copystream_archive_contents(bbsink *sink, size_t len)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = mysink->base.bbs_state;
+ StringInfoData buf;
+ uint64 targetbytes;
+
+ /* Send the archive content to the client (with leading type byte). */
+ pq_putmessage('d', mysink->msgbuffer, len + 1);
+
+ /* Consider whether to send a progress report to the client. */
+ targetbytes = mysink->bytes_done_at_last_time_check
+ + PROGRESS_REPORT_BYTE_INTERVAL;
+ if (targetbytes <= state->bytes_done)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ long ms;
+
+ /*
+ * OK, we've sent a decent number of bytes, so check the system time
+ * to see whether we're due to send a progress report.
+ */
+ mysink->bytes_done_at_last_time_check = state->bytes_done;
+ ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
+ now);
+
+ /*
+ * Send a progress report if enough time has passed. Also send one if
+ * the system clock was set backward, so that such occurrences don't
+ * have the effect of suppressing further progress messages.
+ */
+ if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
+ {
+ mysink->last_progress_report_time = now;
+
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendint64(&buf, state->bytes_done);
+ pq_endmessage(&buf);
+ pq_flush_if_writable();
+ }
+ }
+}
+
+/*
+ * We don't need to explicitly signal the end of the archive; the client
+ * will figure out that we've reached the end when we begin the next one,
+ * or begin the manifest, or end the COPY stream. However, this seems like
+ * a good time to force out a progress report. One reason for that is that
+ * if this is the last archive, and we don't force a progress report now,
+ * the client will never be told that we sent all the bytes.
+ */
+static void
+bbsink_copystream_end_archive(bbsink *sink)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+ bbsink_state *state = mysink->base.bbs_state;
+ StringInfoData buf;
+
+ mysink->bytes_done_at_last_time_check = state->bytes_done;
+ mysink->last_progress_report_time = GetCurrentTimestamp();
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'p'); /* Progress report */
+ pq_sendint64(&buf, state->bytes_done);
+ pq_endmessage(&buf);
+ pq_flush_if_writable();
+}
+
+/*
+ * Send a CopyData message announcing the beginning of the backup manifest.
+ */
+static void
+bbsink_copystream_begin_manifest(bbsink *sink)
+{
+ StringInfoData buf;
+
+ pq_beginmessage(&buf, 'd'); /* CopyData */
+ pq_sendbyte(&buf, 'm'); /* Manifest */
+ pq_endmessage(&buf);
+}
+
+/*
+ * Each chunk of manifest data is sent using a CopyData message.
+ */
+static void
+bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
+{
+ bbsink_copystream *mysink = (bbsink_copystream *) sink;
+
+ /* Send the manifest content to the client (with leading type byte). */
+ pq_putmessage('d', mysink->msgbuffer, len + 1);
+}
+
+/*
+ * We don't need an explicit terminator for the backup manifest.
+ */
+static void
+bbsink_copystream_end_manifest(bbsink *sink)
+{
+ /* Do nothing. */
+}
+
+/*
+ * Send end-of-backup wire protocol messages.
+ */
+static void
+bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli)
+{
+ SendCopyDone();
+ SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * Cleanup.
+ */
+static void
+bbsink_copystream_cleanup(bbsink *sink)
+{
+ /* Nothing to do. */
+}
+
+/*
* Create a new 'copytblspc' bbsink.
*/
bbsink *