aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xlog.c81
-rw-r--r--src/backend/access/transam/xlogfuncs.c2
-rw-r--r--src/backend/replication/walreceiver.c4
-rw-r--r--src/backend/replication/walreceiverfuncs.c2
-rw-r--r--src/backend/replication/walsender.c82
-rw-r--r--src/include/access/xlog.h3
6 files changed, 92 insertions, 82 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d7e83a937c9..d808607ecdb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -453,6 +453,7 @@ typedef struct XLogCtlData
* replayed, otherwise it's equal to lastReplayedEndRecPtr.
*/
XLogRecPtr lastReplayedEndRecPtr;
+ TimeLineID lastReplayedTLI;
XLogRecPtr replayEndRecPtr;
TimeLineID replayEndTLI;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
@@ -3829,7 +3830,6 @@ rescanLatestTimeLine(void)
TimeLineID newtarget;
TimeLineHistoryEntry *currentTle = NULL;
/* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
newtarget = findNewestTimeLine(recoveryTargetTLI);
if (newtarget == recoveryTargetTLI)
@@ -3888,20 +3888,10 @@ rescanLatestTimeLine(void)
list_free_deep(expectedTLEs);
expectedTLEs = newExpectedTLEs;
- SpinLockAcquire(&xlogctl->info_lck);
- xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
- SpinLockRelease(&xlogctl->info_lck);
-
ereport(LOG,
(errmsg("new target timeline is %u",
recoveryTargetTLI)));
- /*
- * Wake up any walsenders to notice that we have a new target timeline.
- */
- if (AllowCascadeReplication())
- WalSndWakeup();
-
return true;
}
@@ -5389,11 +5379,9 @@ StartupXLOG(void)
ControlFile->minRecoveryPointTLI)));
/*
- * Save the selected recovery target timeline ID and
- * archive_cleanup_command in shared memory so that other processes can
- * see them
+ * Save archive_cleanup_command in shared memory so that other processes
+ * can see it.
*/
- XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
strncpy(XLogCtl->archiveCleanupCommand,
archiveCleanupCommand ? archiveCleanupCommand : "",
sizeof(XLogCtl->archiveCleanupCommand));
@@ -5770,6 +5758,7 @@ StartupXLOG(void)
xlogctl->replayEndRecPtr = ReadRecPtr;
xlogctl->replayEndTLI = ThisTimeLineID;
xlogctl->lastReplayedEndRecPtr = EndRecPtr;
+ xlogctl->lastReplayedEndRecPtr = ThisTimeLineID;
xlogctl->recoveryLastXTime = 0;
xlogctl->currentChunkStartTime = 0;
xlogctl->recoveryPause = false;
@@ -5837,6 +5826,7 @@ StartupXLOG(void)
*/
do
{
+ bool switchedTLI = false;
#ifdef WAL_DEBUG
if (XLOG_DEBUG ||
(rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
@@ -5942,6 +5932,7 @@ StartupXLOG(void)
/* Following WAL records should be run with new TLI */
ThisTimeLineID = newTLI;
+ switchedTLI = true;
}
}
@@ -5974,6 +5965,7 @@ StartupXLOG(void)
*/
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->lastReplayedEndRecPtr = EndRecPtr;
+ xlogctl->lastReplayedTLI = ThisTimeLineID;
SpinLockRelease(&xlogctl->info_lck);
/* Remember this record as the last-applied one */
@@ -5982,6 +5974,13 @@ StartupXLOG(void)
/* Allow read-only connections if we're consistent now */
CheckRecoveryConsistency();
+ /*
+ * If this record was a timeline switch, wake up any
+ * walsenders to notice that we are on a new timeline.
+ */
+ if (switchedTLI && AllowCascadeReplication())
+ WalSndWakeup();
+
/* Exit loop if we reached inclusive recovery target */
if (!recoveryContinue)
break;
@@ -6823,23 +6822,6 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
}
/*
- * GetRecoveryTargetTLI - get the current recovery target timeline ID
- */
-TimeLineID
-GetRecoveryTargetTLI(void)
-{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
- TimeLineID result;
-
- SpinLockAcquire(&xlogctl->info_lck);
- result = xlogctl->RecoveryTargetTLI;
- SpinLockRelease(&xlogctl->info_lck);
-
- return result;
-}
-
-/*
* This must be called ONCE during postmaster or standalone-backend shutdown
*/
void
@@ -7642,10 +7624,16 @@ CreateRestartPoint(int flags)
*/
if (_logSegNo)
{
+ XLogRecPtr receivePtr;
+ XLogRecPtr replayPtr;
XLogRecPtr endptr;
- /* Get the current (or recent) end of xlog */
- endptr = GetStandbyFlushRecPtr();
+ /*
+ * Get the current end of xlog replayed or received, whichever is later.
+ */
+ receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+ replayPtr = GetXLogReplayRecPtr(NULL);
+ endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
KeepLogSeg(endptr, &_logSegNo);
_logSegNo--;
@@ -9109,39 +9097,24 @@ do_pg_abort_backup(void)
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
-GetXLogReplayRecPtr(void)
+GetXLogReplayRecPtr(TimeLineID *replayTLI)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
+ TimeLineID tli;
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->lastReplayedEndRecPtr;
+ tli = xlogctl->lastReplayedTLI;
SpinLockRelease(&xlogctl->info_lck);
+ if (replayTLI)
+ *replayTLI = tli;
return recptr;
}
/*
- * Get current standby flush position, ie, the last WAL position
- * known to be fsync'd to disk in standby.
- */
-XLogRecPtr
-GetStandbyFlushRecPtr(void)
-{
- XLogRecPtr receivePtr;
- XLogRecPtr replayPtr;
-
- receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
- replayPtr = GetXLogReplayRecPtr();
-
- if (XLByteLT(receivePtr, replayPtr))
- return replayPtr;
- else
- return receivePtr;
-}
-
-/*
* Get latest WAL insert pointer
*/
XLogRecPtr
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index e91bdc3f4af..47624c3e75f 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -248,7 +248,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- recptr = GetXLogReplayRecPtr();
+ recptr = GetXLogReplayRecPtr(NULL);
if (recptr == 0)
PG_RETURN_NULL();
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 303edb75a32..a0960f2ceab 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -370,7 +370,7 @@ WalReceiverMain(void)
first_stream = false;
/* Initialize LogstreamResult and buffers for processing messages */
- LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr();
+ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
initStringInfo(&reply_message);
initStringInfo(&incoming_message);
@@ -1026,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
- applyPtr = GetXLogReplayRecPtr();
+ applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index a8ccfc66398..1aaafbb49fc 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -324,7 +324,7 @@ GetReplicationApplyDelay(void)
receivePtr = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
- replayPtr = GetXLogReplayRecPtr();
+ replayPtr = GetXLogReplayRecPtr(NULL);
if (XLByteEQ(receivePtr, replayPtr))
return 0;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index aec57f5535f..29a25eb9035 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -169,6 +169,7 @@ static void WalSndLoop(void);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
static void XLogSend(bool *caughtup);
+static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID currentTLI);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
@@ -191,12 +192,6 @@ InitWalSender(void)
CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
/*
- * Use the recovery target timeline ID during recovery
- */
- if (am_cascading_walsender)
- ThisTimeLineID = GetRecoveryTargetTLI();
-
- /*
* Let postmaster know that we're a WAL sender. Once we've declared us as
* a WAL sender process, postmaster will let us outlive the bgwriter and
* kill us last in the shutdown sequence, so we get a chance to stream all
@@ -254,8 +249,8 @@ IdentifySystem(void)
am_cascading_walsender = RecoveryInProgress();
if (am_cascading_walsender)
{
- logptr = GetStandbyFlushRecPtr();
- ThisTimeLineID = GetRecoveryTargetTLI();
+ /* this also updates ThisTimeLineID */
+ logptr = GetStandbyFlushRecPtr(0);
}
else
logptr = GetInsertRecPtr();
@@ -409,6 +404,7 @@ static void
StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
+ XLogRecPtr FlushPtr;
/*
* We assume here that we're logging enough information in the WAL for
@@ -421,8 +417,17 @@ StartReplication(StartReplicationCmd *cmd)
/*
* Select the timeline. If it was given explicitly by the client, use
- * that. Otherwise use the current ThisTimeLineID.
+ * that. Otherwise use the timeline of the last replayed record, which
+ * is kept in ThisTimeLineID.
*/
+ if (am_cascading_walsender)
+ {
+ /* this also updates ThisTimeLineID */
+ FlushPtr = GetStandbyFlushRecPtr(0);
+ }
+ else
+ FlushPtr = GetFlushRecPtr();
+
if (cmd->timeline != 0)
{
XLogRecPtr switchpoint;
@@ -494,7 +499,6 @@ StartReplication(StartReplicationCmd *cmd)
if (!sendTimeLineIsHistoric ||
XLByteLT(cmd->startpoint, sendTimeLineValidUpto))
{
- XLogRecPtr FlushPtr;
/*
* When we first start replication the standby will be behind the primary.
* For some applications, for example, synchronous replication, it is
@@ -516,10 +520,6 @@ StartReplication(StartReplicationCmd *cmd)
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
- if (am_cascading_walsender)
- FlushPtr = GetStandbyFlushRecPtr();
- else
- FlushPtr = GetFlushRecPtr();
if (XLByteLT(FlushPtr, cmd->startpoint))
{
ereport(ERROR,
@@ -1330,7 +1330,7 @@ XLogSend(bool *caughtup)
* that gets lost on the master.
*/
if (am_cascading_walsender)
- FlushPtr = GetStandbyFlushRecPtr();
+ FlushPtr = GetStandbyFlushRecPtr(sendTimeLine);
else
FlushPtr = GetFlushRecPtr();
@@ -1347,7 +1347,6 @@ XLogSend(bool *caughtup)
if (!sendTimeLineIsHistoric && am_cascading_walsender)
{
bool becameHistoric = false;
- TimeLineID targetTLI;
if (!RecoveryInProgress())
{
@@ -1355,7 +1354,6 @@ XLogSend(bool *caughtup)
* We have been promoted. RecoveryInProgress() updated
* ThisTimeLineID to the new current timeline.
*/
- targetTLI = ThisTimeLineID;
am_cascading_walsender = false;
becameHistoric = true;
}
@@ -1363,11 +1361,9 @@ XLogSend(bool *caughtup)
{
/*
* Still a cascading standby. But is the timeline we're sending
- * still the recovery target timeline?
+ * still the one recovery is recovering from?
*/
- targetTLI = GetRecoveryTargetTLI();
-
- if (targetTLI != sendTimeLine)
+ if (sendTimeLine != ThisTimeLineID)
becameHistoric = true;
}
@@ -1380,7 +1376,7 @@ XLogSend(bool *caughtup)
*/
List *history;
- history = readTimeLineHistory(targetTLI);
+ history = readTimeLineHistory(ThisTimeLineID);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
Assert(XLByteLE(sentPtr, sendTimeLineValidUpto));
list_free_deep(history);
@@ -1522,6 +1518,48 @@ XLogSend(bool *caughtup)
}
/*
+ * Returns the latest point in WAL that has been safely flushed to disk, and
+ * can be sent to the standby. This should only be called when in recovery,
+ * ie. we're streaming to a cascaded standby.
+ *
+ * If currentTLI is non-zero, the function returns the point that the WAL on
+ * the given timeline has been flushed upto. If recovery has already switched
+ * to a different timeline, InvalidXLogRecPtr is returned.
+ *
+ * As a side-effect, ThisTimeLineID is updated to the TLI of the last
+ * replayed WAL record.
+ */
+static XLogRecPtr
+GetStandbyFlushRecPtr(TimeLineID currentTLI)
+{
+ XLogRecPtr replayPtr;
+ TimeLineID replayTLI;
+ XLogRecPtr receivePtr;
+ TimeLineID receiveTLI;
+ XLogRecPtr result;
+
+ /*
+ * We can safely send what's already been replayed. Also, if walreceiver
+ * is streaming WAL from the same timeline, we can send anything that
+ * it has streamed, but hasn't been replayed yet.
+ */
+
+ receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
+ replayPtr = GetXLogReplayRecPtr(&replayTLI);
+
+ ThisTimeLineID = replayTLI;
+
+ if (currentTLI != replayTLI && currentTLI != 0)
+ return InvalidXLogRecPtr;
+
+ result = replayPtr;
+ if (receiveTLI == currentTLI && receivePtr > replayPtr)
+ result = receivePtr;
+
+ return result;
+}
+
+/*
* Request walsenders to reload the currently-open WAL file
*/
void
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c8cd37981c5..95d01b97444 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -283,8 +283,7 @@ extern bool RecoveryInProgress(void);
extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(void);
-extern XLogRecPtr GetStandbyFlushRecPtr(void);
+extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void);