aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/basebackup_copy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/basebackup_copy.c')
-rw-r--r--src/backend/replication/basebackup_copy.c335
1 files changed, 335 insertions, 0 deletions
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c
new file mode 100644
index 00000000000..30bab4546ef
--- /dev/null
+++ b/src/backend/replication/basebackup_copy.c
@@ -0,0 +1,335 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_copy.c
+ * send basebackup archives using one COPY OUT operation per
+ * tablespace, and an additional COPY OUT for the backup manifest
+ *
+ * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/basebackup_copy.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_type_d.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "replication/basebackup.h"
+#include "replication/basebackup_sink.h"
+
+static void bbsink_copytblspc_begin_backup(bbsink *sink);
+static void bbsink_copytblspc_begin_archive(bbsink *sink,
+ const char *archive_name);
+static void bbsink_copytblspc_archive_contents(bbsink *sink, size_t len);
+static void bbsink_copytblspc_end_archive(bbsink *sink);
+static void bbsink_copytblspc_begin_manifest(bbsink *sink);
+static void bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_copytblspc_end_manifest(bbsink *sink);
+static void bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli);
+static void bbsink_copytblspc_cleanup(bbsink *sink);
+
+static void SendCopyOutResponse(void);
+static void SendCopyData(const char *data, size_t len);
+static void SendCopyDone(void);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
+static void SendTablespaceList(List *tablespaces);
+static void send_int8_string(StringInfoData *buf, int64 intval);
+
+const bbsink_ops bbsink_copytblspc_ops = {
+ .begin_backup = bbsink_copytblspc_begin_backup,
+ .begin_archive = bbsink_copytblspc_begin_archive,
+ .archive_contents = bbsink_copytblspc_archive_contents,
+ .end_archive = bbsink_copytblspc_end_archive,
+ .begin_manifest = bbsink_copytblspc_begin_manifest,
+ .manifest_contents = bbsink_copytblspc_manifest_contents,
+ .end_manifest = bbsink_copytblspc_end_manifest,
+ .end_backup = bbsink_copytblspc_end_backup,
+ .cleanup = bbsink_copytblspc_cleanup
+};
+
+/*
+ * Create a new 'copytblspc' bbsink.
+ */
+bbsink *
+bbsink_copytblspc_new(void)
+{
+ bbsink *sink = palloc0(sizeof(bbsink));
+
+ *((const bbsink_ops **) &sink->bbs_ops) = &bbsink_copytblspc_ops;
+
+ return sink;
+}
+
+/*
+ * Begin backup.
+ */
+static void
+bbsink_copytblspc_begin_backup(bbsink *sink)
+{
+ bbsink_state *state = sink->bbs_state;
+
+ /* Create a suitable buffer. */
+ sink->bbs_buffer = palloc(sink->bbs_buffer_length);
+
+ /* Tell client the backup start location. */
+ SendXlogRecPtrResult(state->startptr, state->starttli);
+
+ /* Send client a list of tablespaces. */
+ SendTablespaceList(state->tablespaces);
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * Each archive is set as a separate stream of COPY data, and thus begins
+ * with a CopyOutResponse message.
+ */
+static void
+bbsink_copytblspc_begin_archive(bbsink *sink, const char *archive_name)
+{
+ SendCopyOutResponse();
+}
+
+/*
+ * Each chunk of data within the archive is sent as a CopyData message.
+ */
+static void
+bbsink_copytblspc_archive_contents(bbsink *sink, size_t len)
+{
+ SendCopyData(sink->bbs_buffer, len);
+}
+
+/*
+ * The archive is terminated by a CopyDone message.
+ */
+static void
+bbsink_copytblspc_end_archive(bbsink *sink)
+{
+ SendCopyDone();
+}
+
+/*
+ * The backup manifest is sent as a separate stream of COPY data, and thus
+ * begins with a CopyOutResponse message.
+ */
+static void
+bbsink_copytblspc_begin_manifest(bbsink *sink)
+{
+ SendCopyOutResponse();
+}
+
+/*
+ * Each chunk of manifest data is sent using a CopyData message.
+ */
+static void
+bbsink_copytblspc_manifest_contents(bbsink *sink, size_t len)
+{
+ SendCopyData(sink->bbs_buffer, len);
+}
+
+/*
+ * When we've finished sending the manifest, send a CopyDone message.
+ */
+static void
+bbsink_copytblspc_end_manifest(bbsink *sink)
+{
+ SendCopyDone();
+}
+
+/*
+ * Send end-of-backup wire protocol messages.
+ */
+static void
+bbsink_copytblspc_end_backup(bbsink *sink, XLogRecPtr endptr,
+ TimeLineID endtli)
+{
+ SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * Cleanup.
+ */
+static void
+bbsink_copytblspc_cleanup(bbsink *sink)
+{
+ /* Nothing to do. */
+}
+
+/*
+ * Send a CopyOutResponse message.
+ */
+static void
+SendCopyOutResponse(void)
+{
+ StringInfoData buf;
+
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0); /* overall format */
+ pq_sendint16(&buf, 0); /* natts */
+ pq_endmessage(&buf);
+}
+
+/*
+ * Send a CopyData message.
+ */
+static void
+SendCopyData(const char *data, size_t len)
+{
+ pq_putmessage('d', data, len);
+}
+
+/*
+ * Send a CopyDone message.
+ */
+static void
+SendCopyDone(void)
+{
+ pq_putemptymessage('c');
+}
+
+/*
+ * Send a single resultset containing just a single
+ * XLogRecPtr record (in text format)
+ */
+static void
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
+{
+ StringInfoData buf;
+ char str[MAXFNAMELEN];
+ Size len;
+
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 2); /* 2 fields */
+
+ /* Field headers */
+ pq_sendstring(&buf, "recptr");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, TEXTOID); /* type oid */
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ pq_sendstring(&buf, "tli");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+
+ /*
+ * int8 may seem like a surprising data type for this, but in theory int4
+ * would not be wide enough for this, as TimeLineID is unsigned.
+ */
+ pq_sendint32(&buf, INT8OID); /* type oid */
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_endmessage(&buf);
+
+ /* Data row */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 2); /* number of columns */
+
+ len = snprintf(str, sizeof(str),
+ "%X/%X", LSN_FORMAT_ARGS(ptr));
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, str, len);
+
+ len = snprintf(str, sizeof(str), "%u", tli);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, str, len);
+
+ pq_endmessage(&buf);
+
+ /* Send a CommandComplete message */
+ pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * Send a result set via libpq describing the tablespace list.
+ */
+static void
+SendTablespaceList(List *tablespaces)
+{
+ StringInfoData buf;
+ ListCell *lc;
+
+ /* Construct and send the directory information */
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint16(&buf, 3); /* 3 fields */
+
+ /* First field - spcoid */
+ pq_sendstring(&buf, "spcoid");
+ pq_sendint32(&buf, 0); /* table oid */
+ pq_sendint16(&buf, 0); /* attnum */
+ pq_sendint32(&buf, OIDOID); /* type oid */
+ pq_sendint16(&buf, 4); /* typlen */
+ pq_sendint32(&buf, 0); /* typmod */
+ pq_sendint16(&buf, 0); /* format code */
+
+ /* Second field - spclocation */
+ pq_sendstring(&buf, "spclocation");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, TEXTOID);
+ pq_sendint16(&buf, -1);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+
+ /* Third field - size */
+ pq_sendstring(&buf, "size");
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_sendint32(&buf, INT8OID);
+ pq_sendint16(&buf, 8);
+ pq_sendint32(&buf, 0);
+ pq_sendint16(&buf, 0);
+ pq_endmessage(&buf);
+
+ foreach(lc, tablespaces)
+ {
+ tablespaceinfo *ti = lfirst(lc);
+
+ /* Send one datarow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint16(&buf, 3); /* number of columns */
+ if (ti->path == NULL)
+ {
+ pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
+ pq_sendint32(&buf, -1);
+ }
+ else
+ {
+ Size len;
+
+ len = strlen(ti->oid);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, ti->oid, len);
+
+ len = strlen(ti->path);
+ pq_sendint32(&buf, len);
+ pq_sendbytes(&buf, ti->path, len);
+ }
+ if (ti->size >= 0)
+ send_int8_string(&buf, ti->size / 1024);
+ else
+ pq_sendint32(&buf, -1); /* NULL */
+
+ pq_endmessage(&buf);
+ }
+}
+
+/*
+ * Send a 64-bit integer as a string via the wire protocol.
+ */
+static void
+send_int8_string(StringInfoData *buf, int64 intval)
+{
+ char is[32];
+
+ sprintf(is, INT64_FORMAT, intval);
+ pq_sendint32(buf, strlen(is));
+ pq_sendbytes(buf, is, strlen(is));
+}