diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/timeline.c | 14 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 24 | ||||
-rw-r--r-- | src/backend/access/transam/xlogfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/replication/basebackup.c | 41 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 50 |
5 files changed, 106 insertions, 27 deletions
diff --git a/src/backend/access/transam/timeline.c b/src/backend/access/transam/timeline.c index 46379bbff88..ad4f3162c53 100644 --- a/src/backend/access/transam/timeline.c +++ b/src/backend/access/transam/timeline.c @@ -545,22 +545,26 @@ tliOfPointInHistory(XLogRecPtr ptr, List *history) } /* - * Returns the point in history where we branched off the given timeline. - * Returns InvalidXLogRecPtr if the timeline is current (= we have not - * branched off from it), and throws an error if the timeline is not part of - * this server's history. + * Returns the point in history where we branched off the given timeline, + * and the timeline we branched to (*nextTLI). Returns InvalidXLogRecPtr if + * the timeline is current, ie. we have not branched off from it, and throws + * an error if the timeline is not part of this server's history. */ XLogRecPtr -tliSwitchPoint(TimeLineID tli, List *history) +tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI) { ListCell *cell; + if (nextTLI) + *nextTLI = 0; foreach (cell, history) { TimeLineHistoryEntry *tle = (TimeLineHistoryEntry *) lfirst(cell); if (tle->tli == tli) return tle->end; + if (nextTLI) + *nextTLI = tle->tli; } ereport(ERROR, diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ac2b26b4982..90ba32ef0f5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4930,7 +4930,7 @@ StartupXLOG(void) * tliSwitchPoint will throw an error if the checkpoint's timeline * is not in expectedTLEs at all. */ - switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs); + switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL); ereport(FATAL, (errmsg("requested timeline %u is not a child of this server's history", recoveryTargetTLI), @@ -7870,16 +7870,21 @@ XLogFileNameP(TimeLineID tli, XLogSegNo segno) * non-exclusive backups active at the same time, and they don't conflict * with an exclusive backup either. * + * Returns the minimum WAL position that must be present to restore from this + * backup, and the corresponding timeline ID in *starttli_p. + * * Every successfully started non-exclusive backup must be stopped by calling * do_pg_stop_backup() or do_pg_abort_backup(). */ XLogRecPtr -do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile) +do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, + char **labelfile) { bool exclusive = (labelfile == NULL); bool backup_started_in_recovery = false; XLogRecPtr checkpointloc; XLogRecPtr startpoint; + TimeLineID starttli; pg_time_t stamp_time; char strfbuf[128]; char xlogfilename[MAXFNAMELEN]; @@ -8021,6 +8026,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile) LWLockAcquire(ControlFileLock, LW_SHARED); checkpointloc = ControlFile->checkPoint; startpoint = ControlFile->checkPointCopy.redo; + starttli = ControlFile->checkPointCopy.ThisTimeLineID; checkpointfpw = ControlFile->checkPointCopy.fullPageWrites; LWLockRelease(ControlFileLock); @@ -8154,6 +8160,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile) /* * We're done. As a convenience, return the starting WAL location. */ + if (starttli_p) + *starttli_p = starttli; return startpoint; } @@ -8190,14 +8198,18 @@ pg_start_backup_callback(int code, Datum arg) * If labelfile is NULL, this stops an exclusive backup. Otherwise this stops * the non-exclusive backup specified by 'labelfile'. + * + * Returns the last WAL position that must be present to restore from this + * backup, and the corresponding timeline ID in *stoptli_p. */ XLogRecPtr -do_pg_stop_backup(char *labelfile, bool waitforarchive) +do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) { bool exclusive = (labelfile == NULL); bool backup_started_in_recovery = false; XLogRecPtr startpoint; XLogRecPtr stoppoint; + TimeLineID stoptli; XLogRecData rdata; pg_time_t stamp_time; char strfbuf[128]; @@ -8401,8 +8413,11 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive) LWLockAcquire(ControlFileLock, LW_SHARED); stoppoint = ControlFile->minRecoveryPoint; + stoptli = ControlFile->minRecoveryPointTLI; LWLockRelease(ControlFileLock); + if (stoptli_p) + *stoptli_p = stoptli; return stoppoint; } @@ -8414,6 +8429,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive) rdata.buffer = InvalidBuffer; rdata.next = NULL; stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata); + stoptli = ThisTimeLineID; /* * Force a switch to a new xlog segment file, so that the backup is valid @@ -8529,6 +8545,8 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive) /* * We're done. As a convenience, return the ending WAL location. */ + if (stoptli_p) + *stoptli_p = stoptli; return stoppoint; } diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index 96db5dbbf52..b6bb6773d6b 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -56,7 +56,7 @@ pg_start_backup(PG_FUNCTION_ARGS) backupidstr = text_to_cstring(backupid); - startpoint = do_pg_start_backup(backupidstr, fast, NULL); + startpoint = do_pg_start_backup(backupidstr, fast, NULL, NULL); snprintf(startxlogstr, sizeof(startxlogstr), "%X/%X", (uint32) (startpoint >> 32), (uint32) startpoint); @@ -82,7 +82,7 @@ pg_stop_backup(PG_FUNCTION_ARGS) XLogRecPtr stoppoint; char stopxlogstr[MAXFNAMELEN]; - stoppoint = do_pg_stop_backup(NULL, true); + stoppoint = do_pg_stop_backup(NULL, true, NULL); snprintf(stopxlogstr, sizeof(stopxlogstr), "%X/%X", (uint32) (stoppoint >> 32), (uint32) stoppoint); diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 2330fcc23ad..57946a9fa97 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -55,7 +55,7 @@ static void SendBackupHeader(List *tablespaces); static void base_backup_cleanup(int code, Datum arg); static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir); static void parse_basebackup_options(List *options, basebackup_options *opt); -static void SendXlogRecPtrResult(XLogRecPtr ptr); +static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static int compareWalFileNames(const void *a, const void *b); /* Was the backup currently in-progress initiated in recovery mode? */ @@ -94,13 +94,16 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir) { XLogRecPtr startptr; + TimeLineID starttli; XLogRecPtr endptr; + TimeLineID endtli; char *labelfile; backup_started_in_recovery = RecoveryInProgress(); - startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile); - SendXlogRecPtrResult(startptr); + startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli, + &labelfile); + SendXlogRecPtrResult(startptr, starttli); PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); { @@ -218,7 +221,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) } PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); - endptr = do_pg_stop_backup(labelfile, !opt->nowait); + endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli); if (opt->includewal) { @@ -426,7 +429,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) /* Send CopyDone message for the last tar file */ pq_putemptymessage('c'); } - SendXlogRecPtrResult(endptr); + SendXlogRecPtrResult(endptr, endtli); } /* @@ -635,17 +638,15 @@ SendBackupHeader(List *tablespaces) * XlogRecPtr record (in text format) */ static void -SendXlogRecPtrResult(XLogRecPtr ptr) +SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) { StringInfoData buf; char str[MAXFNAMELEN]; - snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr); - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint(&buf, 1, 2); /* 1 field */ + pq_sendint(&buf, 2, 2); /* 2 fields */ - /* Field header */ + /* Field headers */ pq_sendstring(&buf, "recptr"); pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ @@ -653,11 +654,29 @@ SendXlogRecPtrResult(XLogRecPtr ptr) pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 2); + + pq_sendstring(&buf, "tli"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + /* + * int8 may seem like a surprising data type for this, but in thory int4 + * would not be wide enough for this, as TimeLineID is unsigned. + */ + pq_sendint(&buf, INT8OID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); pq_endmessage(&buf); /* Data row */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 1, 2); /* number of columns */ + pq_sendint(&buf, 2, 2); /* number of columns */ + + snprintf(str, sizeof(str), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr); + pq_sendint(&buf, strlen(str), 4); /* length */ + pq_sendbytes(&buf, str, strlen(str)); + + snprintf(str, sizeof(str), "%u", tli); pq_sendint(&buf, strlen(str), 4); /* length */ pq_sendbytes(&buf, str, strlen(str)); pq_endmessage(&buf); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ad7d1c911b3..ba138e73da3 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -117,6 +117,7 @@ static uint32 sendOff = 0; * history forked off from that timeline at sendTimeLineValidUpto. */ static TimeLineID sendTimeLine = 0; +static TimeLineID sendTimeLineNextTLI = 0; static bool sendTimeLineIsHistoric = false; static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; @@ -449,7 +450,8 @@ StartReplication(StartReplicationCmd *cmd) * requested start location is on that timeline. */ timeLineHistory = readTimeLineHistory(ThisTimeLineID); - switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory); + switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, + &sendTimeLineNextTLI); list_free_deep(timeLineHistory); /* @@ -496,8 +498,7 @@ StartReplication(StartReplicationCmd *cmd) streamingDoneSending = streamingDoneReceiving = false; /* If there is nothing to stream, don't even enter COPY mode */ - if (!sendTimeLineIsHistoric || - cmd->startpoint < sendTimeLineValidUpto) + if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) { /* * When we first start replication the standby will be behind the primary. @@ -554,10 +555,46 @@ StartReplication(StartReplicationCmd *cmd) if (walsender_ready_to_stop) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); + + Assert(streamingDoneSending && streamingDoneReceiving); + } + + /* + * Copy is finished now. Send a single-row result set indicating the next + * timeline. + */ + if (sendTimeLineIsHistoric) + { + char str[11]; + snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI); + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 1, 2); /* 1 field */ + + /* Field header */ + pq_sendstring(&buf, "next_tli"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + /* + * int8 may seem like a surprising data type for this, but in theory + * int4 would not be wide enough for this, as TimeLineID is unsigned. + */ + pq_sendint(&buf, INT8OID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + + /* Data row */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 1, 2); /* number of columns */ + pq_sendint(&buf, strlen(str), 4); /* length */ + pq_sendbytes(&buf, str, strlen(str)); + pq_endmessage(&buf); } - /* Get out of COPY mode (CommandComplete). */ - EndCommand("COPY 0", DestRemote); + /* Send CommandComplete message */ + pq_puttextmessage('C', "START_STREAMING"); } /* @@ -1377,8 +1414,9 @@ XLogSend(bool *caughtup) List *history; history = readTimeLineHistory(ThisTimeLineID); - sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); Assert(sentPtr <= sendTimeLineValidUpto); + Assert(sendTimeLine < sendTimeLineNextTLI); list_free_deep(history); /* the current send pointer should be <= the switchpoint */ |