aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/basebackup.c41
-rw-r--r--src/backend/replication/walsender.c50
2 files changed, 74 insertions, 17 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 2330fcc23ad..57946a9fa97 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -55,7 +55,7 @@ static void SendBackupHeader(List *tablespaces);
static void base_backup_cleanup(int code, Datum arg);
static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
static void parse_basebackup_options(List *options, basebackup_options *opt);
-static void SendXlogRecPtrResult(XLogRecPtr ptr);
+static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const void *a, const void *b);
/* Was the backup currently in-progress initiated in recovery mode? */
@@ -94,13 +94,16 @@ static void
perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
{
XLogRecPtr startptr;
+ TimeLineID starttli;
XLogRecPtr endptr;
+ TimeLineID endtli;
char *labelfile;
backup_started_in_recovery = RecoveryInProgress();
- startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
- SendXlogRecPtrResult(startptr);
+ startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+ &labelfile);
+ SendXlogRecPtrResult(startptr, starttli);
PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
{
@@ -218,7 +221,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
}
PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
- endptr = do_pg_stop_backup(labelfile, !opt->nowait);
+ endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
if (opt->includewal)
{
@@ -426,7 +429,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
/* Send CopyDone message for the last tar file */
pq_putemptymessage('c');
}
- SendXlogRecPtrResult(endptr);
+ SendXlogRecPtrResult(endptr, endtli);
}
/*
@@ -635,17 +638,15 @@ SendBackupHeader(List *tablespaces)
* XlogRecPtr record (in text format)
*/
static void
-SendXlogRecPtrResult(XLogRecPtr ptr)
+SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
{
StringInfoData buf;
char str[MAXFNAMELEN];
- snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
-
pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint(&buf, 1, 2); /* 1 field */
+ pq_sendint(&buf, 2, 2); /* 2 fields */
- /* Field header */
+ /* Field headers */
pq_sendstring(&buf, "recptr");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
@@ -653,11 +654,29 @@ SendXlogRecPtrResult(XLogRecPtr ptr)
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
+
+ pq_sendstring(&buf, "tli");
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ /*
+ * int8 may seem like a surprising data type for this, but in thory int4
+ * would not be wide enough for this, as TimeLineID is unsigned.
+ */
+ pq_sendint(&buf, INT8OID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2);
+ pq_sendint(&buf, 0, 4);
+ pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
/* Data row */
pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 1, 2); /* number of columns */
+ pq_sendint(&buf, 2, 2); /* number of columns */
+
+ snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
+ pq_sendint(&buf, strlen(str), 4); /* length */
+ pq_sendbytes(&buf, str, strlen(str));
+
+ snprintf(str, sizeof(str), "%u", tli);
pq_sendint(&buf, strlen(str), 4); /* length */
pq_sendbytes(&buf, str, strlen(str));
pq_endmessage(&buf);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ad7d1c911b3..ba138e73da3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -117,6 +117,7 @@ static uint32 sendOff = 0;
* history forked off from that timeline at sendTimeLineValidUpto.
*/
static TimeLineID sendTimeLine = 0;
+static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
@@ -449,7 +450,8 @@ StartReplication(StartReplicationCmd *cmd)
* requested start location is on that timeline.
*/
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
- switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory);
+ switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
+ &sendTimeLineNextTLI);
list_free_deep(timeLineHistory);
/*
@@ -496,8 +498,7 @@ StartReplication(StartReplicationCmd *cmd)
streamingDoneSending = streamingDoneReceiving = false;
/* If there is nothing to stream, don't even enter COPY mode */
- if (!sendTimeLineIsHistoric ||
- cmd->startpoint < sendTimeLineValidUpto)
+ if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
{
/*
* When we first start replication the standby will be behind the primary.
@@ -554,10 +555,46 @@ StartReplication(StartReplicationCmd *cmd)
if (walsender_ready_to_stop)
proc_exit(0);
WalSndSetState(WALSNDSTATE_STARTUP);
+
+ Assert(streamingDoneSending && streamingDoneReceiving);
+ }
+
+ /*
+ * Copy is finished now. Send a single-row result set indicating the next
+ * timeline.
+ */
+ if (sendTimeLineIsHistoric)
+ {
+ char str[11];
+ snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
+
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint(&buf, 1, 2); /* 1 field */
+
+ /* Field header */
+ pq_sendstring(&buf, "next_tli");
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ /*
+ * int8 may seem like a surprising data type for this, but in theory
+ * int4 would not be wide enough for this, as TimeLineID is unsigned.
+ */
+ pq_sendint(&buf, INT8OID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2);
+ pq_sendint(&buf, 0, 4);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+
+ /* Data row */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 1, 2); /* number of columns */
+ pq_sendint(&buf, strlen(str), 4); /* length */
+ pq_sendbytes(&buf, str, strlen(str));
+ pq_endmessage(&buf);
}
- /* Get out of COPY mode (CommandComplete). */
- EndCommand("COPY 0", DestRemote);
+ /* Send CommandComplete message */
+ pq_puttextmessage('C', "START_STREAMING");
}
/*
@@ -1377,8 +1414,9 @@ XLogSend(bool *caughtup)
List *history;
history = readTimeLineHistory(ThisTimeLineID);
- sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
+ sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
Assert(sentPtr <= sendTimeLineValidUpto);
+ Assert(sendTimeLine < sendTimeLineNextTLI);
list_free_deep(history);
/* the current send pointer should be <= the switchpoint */