diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 641 |
1 files changed, 328 insertions, 313 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 76b52fb1dcb..dcd33c931c0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -200,14 +200,14 @@ static int LocalXLogInsertAllowed = -1; * will switch to using offline XLOG archives as soon as we reach the end of * WAL in pg_xlog. */ -bool ArchiveRecoveryRequested = false; -bool InArchiveRecovery = false; +bool ArchiveRecoveryRequested = false; +bool InArchiveRecovery = false; /* Was the last xlog file restored from archive, or local? */ static bool restoredFromArchive = false; /* options taken from recovery.conf for archive recovery */ -char *recoveryRestoreCommand = NULL; +char *recoveryRestoreCommand = NULL; static char *recoveryEndCommand = NULL; static char *archiveCleanupCommand = NULL; static RecoveryTargetType recoveryTarget = RECOVERY_TARGET_UNSET; @@ -223,7 +223,7 @@ static char *PrimaryConnInfo = NULL; static char *TriggerFile = NULL; /* are we currently in standby mode? */ -bool StandbyMode = false; +bool StandbyMode = false; /* whether request for fast promotion has been made yet */ static bool fast_promote = false; @@ -403,10 +403,11 @@ typedef struct XLogCtlData uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */ TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ - XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */ + XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG + * segment */ /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck */ - XLogRecPtr unloggedLSN; + XLogRecPtr unloggedLSN; slock_t ulsn_lck; /* Protected by WALWriteLock: */ @@ -548,14 +549,14 @@ static XLogwrtResult LogwrtResult = {0, 0}; */ typedef enum { - XLOG_FROM_ANY = 0, /* request to read WAL from any source */ - XLOG_FROM_ARCHIVE, /* restored using restore_command */ - XLOG_FROM_PG_XLOG, /* existing file in pg_xlog */ - XLOG_FROM_STREAM, /* streamed from master */ + XLOG_FROM_ANY = 0, /* request to read WAL from any source */ + XLOG_FROM_ARCHIVE, /* restored using restore_command */ + XLOG_FROM_PG_XLOG, /* existing file in pg_xlog */ + XLOG_FROM_STREAM, /* streamed from master */ } XLogSource; /* human-readable names for XLogSources, for debugging output */ -static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" }; +static const char *xlogSourceNames[] = {"any", "archive", "pg_xlog", "stream"}; /* * openLogFile is -1 or a kernel FD for an open log file segment. @@ -589,7 +590,7 @@ static XLogSource readSource = 0; /* XLOG_FROM_* code */ * next. */ static XLogSource currentSource = 0; /* XLOG_FROM_* code */ -static bool lastSourceFailed = false; +static bool lastSourceFailed = false; typedef struct XLogPageReadPrivate { @@ -607,7 +608,7 @@ typedef struct XLogPageReadPrivate * XLogReceiptSource tracks where we last successfully read some WAL.) */ static TimestampTz XLogReceiptTime = 0; -static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */ +static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */ /* State information for XLOG reading */ static XLogRecPtr ReadRecPtr; /* start of last record read */ @@ -649,7 +650,7 @@ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, XLogRecPtr *lsn, BkpBlock *bkpb); static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, - char *blk, bool get_cleanup_lock, bool keep_buffer); + char *blk, bool get_cleanup_lock, bool keep_buffer); static bool AdvanceXLInsertBuffer(bool new_segment); static bool XLogCheckpointNeeded(XLogSegNo new_segno); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); @@ -658,7 +659,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, bool use_lock); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, int source, bool notexistOk); -static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); +static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source); static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI); @@ -823,7 +824,7 @@ begin:; /* OK, put it in this slot */ dtbuf[i] = rdt->buffer; if (doPageWrites && XLogCheckBuffer(rdt, true, - &(dtbuf_lsn[i]), &(dtbuf_xlg[i]))) + &(dtbuf_lsn[i]), &(dtbuf_xlg[i]))) { dtbuf_bkp[i] = true; rdt->data = NULL; @@ -1251,10 +1252,10 @@ XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, page = BufferGetPage(rdata->buffer); /* - * We assume page LSN is first data on *every* page that can be passed - * to XLogInsert, whether it has the standard page layout or not. We - * don't need to take the buffer header lock for PageGetLSN if we hold - * an exclusive lock on the page and/or the relation. + * We assume page LSN is first data on *every* page that can be passed to + * XLogInsert, whether it has the standard page layout or not. We don't + * need to take the buffer header lock for PageGetLSN if we hold an + * exclusive lock on the page and/or the relation. */ if (holdsExclusiveLock) *lsn = PageGetLSN(page); @@ -1545,7 +1546,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) */ if (LogwrtResult.Write >= XLogCtl->xlblocks[curridx]) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", - (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write, + (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write, (uint32) (XLogCtl->xlblocks[curridx] >> 32), (uint32) XLogCtl->xlblocks[curridx]); @@ -1611,9 +1612,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0) ereport(PANIC, (errcode_for_file_access(), - errmsg("could not seek in log file %s to offset %u: %m", - XLogFileNameP(ThisTimeLineID, openLogSegNo), - startoffset))); + errmsg("could not seek in log file %s to offset %u: %m", + XLogFileNameP(ThisTimeLineID, openLogSegNo), + startoffset))); openLogOff = startoffset; } @@ -1858,7 +1859,7 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force) if (!force && newMinRecoveryPoint < lsn) elog(WARNING, "xlog min recovery request %X/%X is past current point %X/%X", - (uint32) (lsn >> 32) , (uint32) lsn, + (uint32) (lsn >> 32), (uint32) lsn, (uint32) (newMinRecoveryPoint >> 32), (uint32) newMinRecoveryPoint); @@ -1872,10 +1873,10 @@ UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force) minRecoveryPointTLI = newMinRecoveryPointTLI; ereport(DEBUG2, - (errmsg("updated min recovery point to %X/%X on timeline %u", - (uint32) (minRecoveryPoint >> 32), - (uint32) minRecoveryPoint, - newMinRecoveryPointTLI))); + (errmsg("updated min recovery point to %X/%X on timeline %u", + (uint32) (minRecoveryPoint >> 32), + (uint32) minRecoveryPoint, + newMinRecoveryPointTLI))); } } LWLockRelease(ControlFileLock); @@ -1915,7 +1916,7 @@ XLogFlush(XLogRecPtr record) elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X", (uint32) (record >> 32), (uint32) record, (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write, - (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush); + (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush); #endif START_CRIT_SECTION(); @@ -1979,8 +1980,8 @@ XLogFlush(XLogRecPtr record) /* * Sleep before flush! By adding a delay here, we may give further * backends the opportunity to join the backlog of group commit - * followers; this can significantly improve transaction throughput, at - * the risk of increasing transaction latency. + * followers; this can significantly improve transaction throughput, + * at the risk of increasing transaction latency. * * We do not sleep if enableFsync is not turned on, nor if there are * fewer than CommitSiblings other backends with active transactions. @@ -1995,7 +1996,7 @@ XLogFlush(XLogRecPtr record) XLogCtlInsert *Insert = &XLogCtl->Insert; uint32 freespace = INSERT_FREESPACE(Insert); - if (freespace == 0) /* buffer is full */ + if (freespace == 0) /* buffer is full */ WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; else { @@ -2048,7 +2049,7 @@ XLogFlush(XLogRecPtr record) elog(ERROR, "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X", (uint32) (record >> 32), (uint32) record, - (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush); + (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush); } /* @@ -2127,7 +2128,7 @@ XLogBackgroundFlush(void) elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X", (uint32) (WriteRqstPtr >> 32), (uint32) WriteRqstPtr, (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write, - (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush); + (uint32) (LogwrtResult.Flush >> 32), (uint32) LogwrtResult.Flush); #endif START_CRIT_SECTION(); @@ -2379,7 +2380,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock) if (fd < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", path))); + errmsg("could not open file \"%s\": %m", path))); elog(DEBUG2, "done creating and filling new WAL file"); @@ -2719,7 +2720,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source) * want to read. * * If we haven't read the timeline history file yet, read it now, so that - * we know which TLIs to scan. We don't save the list in expectedTLEs, + * we know which TLIs to scan. We don't save the list in expectedTLEs, * however, unless we actually find a valid segment. That way if there is * neither a timeline history file nor a WAL segment in the archive, and * streaming replication is set up, we'll read the timeline history file @@ -3215,8 +3216,8 @@ RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk, } /* - * The checksum value on this page is currently invalid. We don't - * need to reset it here since it will be set before being written. + * The checksum value on this page is currently invalid. We don't need to + * reset it here since it will be set before being written. */ PageSetLSN(page, lsn); @@ -3258,7 +3259,7 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, for (;;) { - char *errormsg; + char *errormsg; record = XLogReadRecord(xlogreader, RecPtr, &errormsg); ReadRecPtr = xlogreader->ReadRecPtr; @@ -3272,34 +3273,35 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, } /* - * We only end up here without a message when XLogPageRead() failed - * - in that case we already logged something. - * In StandbyMode that only happens if we have been triggered, so - * we shouldn't loop anymore in that case. + * We only end up here without a message when XLogPageRead() + * failed - in that case we already logged something. In + * StandbyMode that only happens if we have been triggered, so we + * shouldn't loop anymore in that case. */ if (errormsg) ereport(emode_for_corrupt_record(emode, RecPtr ? RecPtr : EndRecPtr), - (errmsg_internal("%s", errormsg) /* already translated */)); + (errmsg_internal("%s", errormsg) /* already translated */ )); } + /* * Check page TLI is one of the expected values. */ else if (!tliInHistory(xlogreader->latestPageTLI, expectedTLEs)) { char fname[MAXFNAMELEN]; - XLogSegNo segno; - int32 offset; + XLogSegNo segno; + int32 offset; XLByteToSeg(xlogreader->latestPagePtr, segno); offset = xlogreader->latestPagePtr % XLogSegSize; XLogFileName(fname, xlogreader->readPageTLI, segno); ereport(emode_for_corrupt_record(emode, RecPtr ? RecPtr : EndRecPtr), - (errmsg("unexpected timeline ID %u in log segment %s, offset %u", - xlogreader->latestPageTLI, - fname, - offset))); + (errmsg("unexpected timeline ID %u in log segment %s, offset %u", + xlogreader->latestPageTLI, + fname, + offset))); record = NULL; } @@ -3314,10 +3316,10 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode, lastSourceFailed = true; /* - * If archive recovery was requested, but we were still doing crash - * recovery, switch to archive recovery and retry using the offline - * archive. We have now replayed all the valid WAL in pg_xlog, so - * we are presumably now consistent. + * If archive recovery was requested, but we were still doing + * crash recovery, switch to archive recovery and retry using the + * offline archive. We have now replayed all the valid WAL in + * pg_xlog, so we are presumably now consistent. * * We require that there's at least some valid WAL present in * pg_xlog, however (!fetch_ckpt). We could recover using the WAL @@ -3401,11 +3403,11 @@ rescanLatestTimeLine(void) newExpectedTLEs = readTimeLineHistory(newtarget); /* - * If the current timeline is not part of the history of the new - * timeline, we cannot proceed to it. + * If the current timeline is not part of the history of the new timeline, + * we cannot proceed to it. */ found = false; - foreach (cell, newExpectedTLEs) + foreach(cell, newExpectedTLEs) { currentTle = (TimeLineHistoryEntry *) lfirst(cell); @@ -3812,7 +3814,7 @@ DataChecksumsEnabled(void) XLogRecPtr GetFakeLSNForUnloggedRel(void) { - XLogRecPtr nextUnloggedLSN; + XLogRecPtr nextUnloggedLSN; /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; @@ -4991,15 +4993,15 @@ StartupXLOG(void) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), - errdetail("Failed while allocating an XLog reading processor"))); + errdetail("Failed while allocating an XLog reading processor"))); xlogreader->system_identifier = ControlFile->system_identifier; if (read_backup_label(&checkPointLoc, &backupEndRequired, &backupFromStandby)) { /* - * Archive recovery was requested, and thanks to the backup label file, - * we know how far we need to replay to reach consistency. Enter + * Archive recovery was requested, and thanks to the backup label + * file, we know how far we need to replay to reach consistency. Enter * archive recovery directly. */ InArchiveRecovery = true; @@ -5017,7 +5019,7 @@ StartupXLOG(void) wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN); ereport(DEBUG1, (errmsg("checkpoint record is at %X/%X", - (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc))); + (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc))); InRecovery = true; /* force recovery even if SHUTDOWNED */ /* @@ -5049,8 +5051,8 @@ StartupXLOG(void) /* * It's possible that archive recovery was requested, but we don't * know how far we need to replay the WAL before we reach consistency. - * This can happen for example if a base backup is taken from a running - * server using an atomic filesystem snapshot, without calling + * This can happen for example if a base backup is taken from a + * running server using an atomic filesystem snapshot, without calling * pg_start/stop_backup. Or if you just kill a running master server * and put it into archive recovery by creating a recovery.conf file. * @@ -5058,8 +5060,8 @@ StartupXLOG(void) * replaying all the WAL present in pg_xlog, and only enter archive * recovery after that. * - * But usually we already know how far we need to replay the WAL (up to - * minRecoveryPoint, up to backupEndPoint, or until we see an + * But usually we already know how far we need to replay the WAL (up + * to minRecoveryPoint, up to backupEndPoint, or until we see an * end-of-backup record), and we can enter archive recovery directly. */ if (ArchiveRecoveryRequested && @@ -5084,7 +5086,7 @@ StartupXLOG(void) { ereport(DEBUG1, (errmsg("checkpoint record is at %X/%X", - (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc))); + (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc))); } else if (StandbyMode) { @@ -5103,7 +5105,7 @@ StartupXLOG(void) { ereport(LOG, (errmsg("using previous checkpoint record at %X/%X", - (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc))); + (uint32) (checkPointLoc >> 32), (uint32) checkPointLoc))); InRecovery = true; /* force recovery even if SHUTDOWNED */ } else @@ -5119,15 +5121,16 @@ StartupXLOG(void) * timeline in the history of the requested timeline, we cannot proceed: * the backup is not part of the history of the requested timeline. */ - Assert(expectedTLEs); /* was initialized by reading checkpoint record */ + Assert(expectedTLEs); /* was initialized by reading checkpoint + * record */ if (tliOfPointInHistory(checkPointLoc, expectedTLEs) != - checkPoint.ThisTimeLineID) + checkPoint.ThisTimeLineID) { - XLogRecPtr switchpoint; + XLogRecPtr switchpoint; /* - * tliSwitchPoint will throw an error if the checkpoint's timeline - * is not in expectedTLEs at all. + * tliSwitchPoint will throw an error if the checkpoint's timeline is + * not in expectedTLEs at all. */ switchpoint = tliSwitchPoint(ControlFile->checkPointCopy.ThisTimeLineID, expectedTLEs, NULL); ereport(FATAL, @@ -5146,8 +5149,8 @@ StartupXLOG(void) * history, too. */ if (!XLogRecPtrIsInvalid(ControlFile->minRecoveryPoint) && - tliOfPointInHistory(ControlFile->minRecoveryPoint - 1, expectedTLEs) != - ControlFile->minRecoveryPointTLI) + tliOfPointInHistory(ControlFile->minRecoveryPoint - 1, expectedTLEs) != + ControlFile->minRecoveryPointTLI) ereport(FATAL, (errmsg("requested timeline %u does not contain minimum recovery point %X/%X on timeline %u", recoveryTargetTLI, @@ -5159,7 +5162,7 @@ StartupXLOG(void) ereport(DEBUG1, (errmsg("redo record is at %X/%X; shutdown %s", - (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo, + (uint32) (checkPoint.redo >> 32), (uint32) checkPoint.redo, wasShutdown ? "TRUE" : "FALSE"))); ereport(DEBUG1, (errmsg("next transaction ID: %u/%u; next OID: %u", @@ -5206,16 +5209,16 @@ StartupXLOG(void) ThisTimeLineID = checkPoint.ThisTimeLineID; /* - * Copy any missing timeline history files between 'now' and the - * recovery target timeline from archive to pg_xlog. While we don't need - * those files ourselves - the history file of the recovery target - * timeline covers all the previous timelines in the history too - a - * cascading standby server might be interested in them. Or, if you - * archive the WAL from this server to a different archive than the - * master, it'd be good for all the history files to get archived there - * after failover, so that you can use one of the old timelines as a - * PITR target. Timeline history files are small, so it's better to copy - * them unnecessarily than not copy them and regret later. + * Copy any missing timeline history files between 'now' and the recovery + * target timeline from archive to pg_xlog. While we don't need those + * files ourselves - the history file of the recovery target timeline + * covers all the previous timelines in the history too - a cascading + * standby server might be interested in them. Or, if you archive the WAL + * from this server to a different archive than the master, it'd be good + * for all the history files to get archived there after failover, so that + * you can use one of the old timelines as a PITR target. Timeline history + * files are small, so it's better to copy them unnecessarily than not + * copy them and regret later. */ restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI); @@ -5271,10 +5274,10 @@ StartupXLOG(void) "automatic recovery in progress"))); if (recoveryTargetTLI > ControlFile->checkPointCopy.ThisTimeLineID) ereport(LOG, - (errmsg("crash recovery starts in timeline %u " - "and has target timeline %u", - ControlFile->checkPointCopy.ThisTimeLineID, - recoveryTargetTLI))); + (errmsg("crash recovery starts in timeline %u " + "and has target timeline %u", + ControlFile->checkPointCopy.ThisTimeLineID, + recoveryTargetTLI))); ControlFile->state = DB_IN_CRASH_RECOVERY; } ControlFile->prevCheckPoint = ControlFile->checkPoint; @@ -5509,14 +5512,15 @@ StartupXLOG(void) ereport(LOG, (errmsg("redo starts at %X/%X", - (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); + (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); /* * main redo apply loop */ do { - bool switchedTLI = false; + bool switchedTLI = false; + #ifdef WAL_DEBUG if (XLOG_DEBUG || (rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) || @@ -5526,8 +5530,8 @@ StartupXLOG(void) initStringInfo(&buf); appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ", - (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr, - (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr); + (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr, + (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr); xlog_outrec(&buf, record); appendStringInfo(&buf, " - "); RmgrTable[record->xl_rmid].rm_desc(&buf, @@ -5598,13 +5602,13 @@ StartupXLOG(void) } /* - * Before replaying this record, check if this record - * causes the current timeline to change. The record is - * already considered to be part of the new timeline, - * so we update ThisTimeLineID before replaying it. - * That's important so that replayEndTLI, which is - * recorded as the minimum recovery point's TLI if - * recovery stops after this record, is set correctly. + * Before replaying this record, check if this record causes + * the current timeline to change. The record is already + * considered to be part of the new timeline, so we update + * ThisTimeLineID before replaying it. That's important so + * that replayEndTLI, which is recorded as the minimum + * recovery point's TLI if recovery stops after this record, + * is set correctly. */ if (record->xl_rmid == RM_XLOG_ID) { @@ -5622,7 +5626,7 @@ StartupXLOG(void) } else if (info == XLOG_END_OF_RECOVERY) { - xl_end_of_recovery xlrec; + xl_end_of_recovery xlrec; memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery)); newTLI = xlrec.ThisTimeLineID; @@ -5699,7 +5703,7 @@ StartupXLOG(void) ereport(LOG, (errmsg("redo done at %X/%X", - (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); + (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr))); xtime = GetLatestXTime(); if (xtime) ereport(LOG, @@ -5804,7 +5808,7 @@ StartupXLOG(void) PrevTimeLineID = ThisTimeLineID; if (ArchiveRecoveryRequested) { - char reason[200]; + char reason[200]; Assert(InArchiveRecovery); @@ -5952,8 +5956,9 @@ StartupXLOG(void) * allows some extra error checking in xlog_redo. * * In fast promotion, only create a lightweight end-of-recovery record - * instead of a full checkpoint. A checkpoint is requested later, after - * we're fully out of recovery mode and already accepting queries. + * instead of a full checkpoint. A checkpoint is requested later, + * after we're fully out of recovery mode and already accepting + * queries. */ if (bgwriterLaunched) { @@ -5972,14 +5977,15 @@ StartupXLOG(void) fast_promoted = true; /* - * Insert a special WAL record to mark the end of recovery, - * since we aren't doing a checkpoint. That means that the - * checkpointer process may likely be in the middle of a - * time-smoothed restartpoint and could continue to be for - * minutes after this. That sounds strange, but the effect - * is roughly the same and it would be stranger to try to - * come out of the restartpoint and then checkpoint. - * We request a checkpoint later anyway, just for safety. + * Insert a special WAL record to mark the end of + * recovery, since we aren't doing a checkpoint. That + * means that the checkpointer process may likely be in + * the middle of a time-smoothed restartpoint and could + * continue to be for minutes after this. That sounds + * strange, but the effect is roughly the same and it + * would be stranger to try to come out of the + * restartpoint and then checkpoint. We request a + * checkpoint later anyway, just for safety. */ CreateEndOfRecoveryRecord(); } @@ -5987,8 +5993,8 @@ StartupXLOG(void) if (!fast_promoted) RequestCheckpoint(CHECKPOINT_END_OF_RECOVERY | - CHECKPOINT_IMMEDIATE | - CHECKPOINT_WAIT); + CHECKPOINT_IMMEDIATE | + CHECKPOINT_WAIT); } else CreateCheckPoint(CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_IMMEDIATE); @@ -6092,8 +6098,8 @@ StartupXLOG(void) } /* - * If there were cascading standby servers connected to us, nudge any - * wal sender processes to notice that we've been promoted. + * If there were cascading standby servers connected to us, nudge any wal + * sender processes to notice that we've been promoted. */ WalSndWakeup(); @@ -6151,9 +6157,9 @@ CheckRecoveryConsistency(void) } /* - * Have we passed our safe starting point? Note that minRecoveryPoint - * is known to be incorrectly set if ControlFile->backupEndRequired, - * until the XLOG_BACKUP_RECORD arrives to advise us of the correct + * Have we passed our safe starting point? Note that minRecoveryPoint is + * known to be incorrectly set if ControlFile->backupEndRequired, until + * the XLOG_BACKUP_RECORD arrives to advise us of the correct * minRecoveryPoint. All we know prior to that is that we're not * consistent yet. */ @@ -6770,7 +6776,7 @@ CreateCheckPoint(int flags) uint32 freespace; XLogSegNo _logSegNo; VirtualTransactionId *vxids; - int nvxids; + int nvxids; /* * An end-of-recovery checkpoint is really a shutdown checkpoint, just @@ -6946,13 +6952,13 @@ CreateCheckPoint(int flags) TRACE_POSTGRESQL_CHECKPOINT_START(flags); /* - * In some cases there are groups of actions that must all occur on - * one side or the other of a checkpoint record. Before flushing the + * In some cases there are groups of actions that must all occur on one + * side or the other of a checkpoint record. Before flushing the * checkpoint record we must explicitly wait for any backend currently * performing those groups of actions. * * One example is end of transaction, so we must wait for any transactions - * that are currently in commit critical sections. If an xact inserted + * that are currently in commit critical sections. If an xact inserted * its commit record into XLOG just before the REDO point, then a crash * restart from the REDO point would not replay that record, which means * that our flushing had better include the xact's update of pg_clog. So @@ -6977,7 +6983,7 @@ CreateCheckPoint(int flags) vxids = GetVirtualXIDsDelayingChkpt(&nvxids); if (nvxids > 0) { - uint32 nwaits = 0; + uint32 nwaits = 0; do { @@ -7182,9 +7188,9 @@ CreateCheckPoint(int flags) void CreateEndOfRecoveryRecord(void) { - xl_end_of_recovery xlrec; - XLogRecData rdata; - XLogRecPtr recptr; + xl_end_of_recovery xlrec; + XLogRecData rdata; + XLogRecPtr recptr; /* sanity check */ if (!RecoveryInProgress()) @@ -7211,8 +7217,8 @@ CreateEndOfRecoveryRecord(void) XLogFlush(recptr); /* - * Update the control file so that crash recovery can follow - * the timeline changes to this point. + * Update the control file so that crash recovery can follow the timeline + * changes to this point. */ LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->time = (pg_time_t) xlrec.end_time; @@ -7223,7 +7229,7 @@ CreateEndOfRecoveryRecord(void) END_CRIT_SECTION(); - LocalXLogInsertAllowed = -1; /* return to "check" state */ + LocalXLogInsertAllowed = -1; /* return to "check" state */ } /* @@ -7375,7 +7381,7 @@ CreateRestartPoint(int flags) { ereport(DEBUG2, (errmsg("skipping restartpoint, already performed at %X/%X", - (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo))); + (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo))); UpdateMinRecoveryPoint(InvalidXLogRecPtr, true); if (flags & CHECKPOINT_IS_SHUTDOWN) @@ -7458,7 +7464,8 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; /* - * Get the current end of xlog replayed or received, whichever is later. + * Get the current end of xlog replayed or received, whichever is + * later. */ receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); replayPtr = GetXLogReplayRecPtr(NULL); @@ -7468,8 +7475,8 @@ CreateRestartPoint(int flags) _logSegNo--; /* - * Update ThisTimeLineID to the timeline we're currently replaying, - * so that we install any recycled segments on that timeline. + * Update ThisTimeLineID to the timeline we're currently replaying, so + * that we install any recycled segments on that timeline. * * There is no guarantee that the WAL segments will be useful on the * current timeline; if recovery proceeds to a new timeline right @@ -7480,13 +7487,13 @@ CreateRestartPoint(int flags) * It's possible or perhaps even likely that we finish recovery while * a restartpoint is in progress. That means we may get to this point * some minutes afterwards. Setting ThisTimeLineID at that time would - * actually set it backwards, so we don't want that to persist; if - * we do reset it here, make sure to reset it back afterwards. This + * actually set it backwards, so we don't want that to persist; if we + * do reset it here, make sure to reset it back afterwards. This * doesn't look very clean or principled, but its the best of about * five different ways of handling this edge case. */ if (RecoveryInProgress()) - (void) GetXLogReplayRecPtr(&ThisTimeLineID); + (void) GetXLogReplayRecPtr(&ThisTimeLineID); RemoveOldXlogFiles(_logSegNo, endptr); @@ -7519,7 +7526,7 @@ CreateRestartPoint(int flags) xtime = GetLatestXTime(); ereport((log_checkpoints ? LOG : DEBUG2), (errmsg("recovery restart point at %X/%X", - (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo), + (uint32) (lastCheckPoint.redo >> 32), (uint32) lastCheckPoint.redo), xtime ? errdetail("last completed transaction was at log time %s", timestamptz_to_str(xtime)) : 0)); @@ -7677,10 +7684,10 @@ XLogRestorePoint(const char *rpName) XLogRecPtr XLogSaveBufferForHint(Buffer buffer) { - XLogRecPtr recptr = InvalidXLogRecPtr; - XLogRecPtr lsn; + XLogRecPtr recptr = InvalidXLogRecPtr; + XLogRecPtr lsn; XLogRecData rdata[2]; - BkpBlock bkpb; + BkpBlock bkpb; /* * Ensure no checkpoint can change our view of RedoRecPtr. @@ -7693,8 +7700,8 @@ XLogSaveBufferForHint(Buffer buffer) GetRedoRecPtr(); /* - * Setup phony rdata element for use within XLogCheckBuffer only. - * We reuse and reset rdata for any actual WAL record insert. + * Setup phony rdata element for use within XLogCheckBuffer only. We reuse + * and reset rdata for any actual WAL record insert. */ rdata[0].buffer = buffer; rdata[0].buffer_std = true; @@ -7704,8 +7711,8 @@ XLogSaveBufferForHint(Buffer buffer) */ if (XLogCheckBuffer(rdata, false, &lsn, &bkpb)) { - char copied_buffer[BLCKSZ]; - char *origdata = (char *) BufferGetBlock(buffer); + char copied_buffer[BLCKSZ]; + char *origdata = (char *) BufferGetBlock(buffer); /* * Copy buffer so we don't have to worry about concurrent hint bit or @@ -7714,8 +7721,8 @@ XLogSaveBufferForHint(Buffer buffer) */ memcpy(copied_buffer, origdata, bkpb.hole_offset); memcpy(copied_buffer + bkpb.hole_offset, - origdata + bkpb.hole_offset + bkpb.hole_length, - BLCKSZ - bkpb.hole_offset - bkpb.hole_length); + origdata + bkpb.hole_offset + bkpb.hole_length, + BLCKSZ - bkpb.hole_offset - bkpb.hole_length); /* * Header for backup block. @@ -7861,25 +7868,24 @@ checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI) ereport(PANIC, (errmsg("unexpected prev timeline ID %u (current timeline ID %u) in checkpoint record", prevTLI, ThisTimeLineID))); + /* - * The new timeline better be in the list of timelines we expect - * to see, according to the timeline history. It should also not - * decrease. + * The new timeline better be in the list of timelines we expect to see, + * according to the timeline history. It should also not decrease. */ if (newTLI < ThisTimeLineID || !tliInHistory(newTLI, expectedTLEs)) ereport(PANIC, - (errmsg("unexpected timeline ID %u (after %u) in checkpoint record", - newTLI, ThisTimeLineID))); + (errmsg("unexpected timeline ID %u (after %u) in checkpoint record", + newTLI, ThisTimeLineID))); /* - * If we have not yet reached min recovery point, and we're about - * to switch to a timeline greater than the timeline of the min - * recovery point: trouble. After switching to the new timeline, - * we could not possibly visit the min recovery point on the - * correct timeline anymore. This can happen if there is a newer - * timeline in the archive that branched before the timeline the - * min recovery point is on, and you attempt to do PITR to the - * new timeline. + * If we have not yet reached min recovery point, and we're about to + * switch to a timeline greater than the timeline of the min recovery + * point: trouble. After switching to the new timeline, we could not + * possibly visit the min recovery point on the correct timeline anymore. + * This can happen if there is a newer timeline in the archive that + * branched before the timeline the min recovery point is on, and you + * attempt to do PITR to the new timeline. */ if (!XLogRecPtrIsInvalid(minRecoveryPoint) && lsn < minRecoveryPoint && @@ -8101,21 +8107,21 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) } else if (info == XLOG_HINT) { - char *data; - BkpBlock bkpb; + char *data; + BkpBlock bkpb; /* - * Hint bit records contain a backup block stored "inline" in the normal - * data since the locking when writing hint records isn't sufficient to - * use the normal backup block mechanism, which assumes exclusive lock - * on the buffer supplied. + * Hint bit records contain a backup block stored "inline" in the + * normal data since the locking when writing hint records isn't + * sufficient to use the normal backup block mechanism, which assumes + * exclusive lock on the buffer supplied. * - * Since the only change in these backup block are hint bits, there are - * no recovery conflicts generated. + * Since the only change in these backup block are hint bits, there + * are no recovery conflicts generated. * - * This also means there is no corresponding API call for this, - * so an smgr implementation has no need to implement anything. - * Which means nothing is needed in md.c etc + * This also means there is no corresponding API call for this, so an + * smgr implementation has no need to implement anything. Which means + * nothing is needed in md.c etc */ data = XLogRecGetData(record); memcpy(&bkpb, data, sizeof(BkpBlock)); @@ -8318,7 +8324,7 @@ assign_xlog_sync_method(int new_sync_method, void *extra) ereport(PANIC, (errcode_for_file_access(), errmsg("could not fsync log segment %s: %m", - XLogFileNameP(ThisTimeLineID, openLogSegNo)))); + XLogFileNameP(ThisTimeLineID, openLogSegNo)))); if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method)) XLogFileClose(); } @@ -8349,8 +8355,8 @@ issue_xlog_fsync(int fd, XLogSegNo segno) if (pg_fsync_writethrough(fd) != 0) ereport(PANIC, (errcode_for_file_access(), - errmsg("could not fsync write-through log file %s: %m", - XLogFileNameP(ThisTimeLineID, segno)))); + errmsg("could not fsync write-through log file %s: %m", + XLogFileNameP(ThisTimeLineID, segno)))); break; #endif #ifdef HAVE_FDATASYNC @@ -8379,6 +8385,7 @@ char * XLogFileNameP(TimeLineID tli, XLogSegNo segno) { char *result = palloc(MAXFNAMELEN); + XLogFileName(result, tli, segno); return result; } @@ -8630,9 +8637,9 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, "%Y-%m-%d %H:%M:%S %Z", pg_localtime(&stamp_time, log_timezone)); appendStringInfo(&labelfbuf, "START WAL LOCATION: %X/%X (file %s)\n", - (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename); + (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename); appendStringInfo(&labelfbuf, "CHECKPOINT LOCATION: %X/%X\n", - (uint32) (checkpointloc >> 32), (uint32) checkpointloc); + (uint32) (checkpointloc >> 32), (uint32) checkpointloc); appendStringInfo(&labelfbuf, "BACKUP METHOD: %s\n", exclusive ? "pg_start_backup" : "streamed"); appendStringInfo(&labelfbuf, "BACKUP FROM: %s\n", @@ -8936,10 +8943,10 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("WAL generated with full_page_writes=off was replayed " "during online backup"), - errhint("This means that the backup being taken on the standby " - "is corrupt and should not be used. " + errhint("This means that the backup being taken on the standby " + "is corrupt and should not be used. " "Enable full_page_writes and run CHECKPOINT on the master, " - "and then try an online backup again."))); + "and then try an online backup again."))); LWLockAcquire(ControlFileLock, LW_SHARED); @@ -8990,7 +8997,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) errmsg("could not create file \"%s\": %m", histfilepath))); fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n", - (uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename); + (uint32) (startpoint >> 32), (uint32) startpoint, startxlogfilename); fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n", (uint32) (stoppoint >> 32), (uint32) stoppoint, stopxlogfilename); /* transfer remaining lines from label to history file */ @@ -9366,10 +9373,10 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI) { XLogPageReadPrivate *private = - (XLogPageReadPrivate *) xlogreader->private_data; + (XLogPageReadPrivate *) xlogreader->private_data; int emode = private->emode; uint32 targetPageOff; - XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; + XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; XLByteToSeg(targetPagePtr, targetSegNo); targetPageOff = targetPagePtr % XLogSegSize; @@ -9448,24 +9455,24 @@ retry: readOff = targetPageOff; if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) { - char fname[MAXFNAMELEN]; + char fname[MAXFNAMELEN]; XLogFileName(fname, curFileTLI, readSegNo); ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", + errmsg("could not seek in log segment %s to offset %u: %m", fname, readOff))); goto next_record_is_invalid; } if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { - char fname[MAXFNAMELEN]; + char fname[MAXFNAMELEN]; XLogFileName(fname, curFileTLI, readSegNo); ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen), (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u: %m", + errmsg("could not read from log segment %s, offset %u: %m", fname, readOff))); goto next_record_is_invalid; } @@ -9524,12 +9531,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr) { static pg_time_t last_fail_time = 0; - pg_time_t now; + pg_time_t now; /*------- * Standby mode is implemented by a state machine: * - * 1. Read from archive (XLOG_FROM_ARCHIVE) + * 1. Read from archive (XLOG_FROM_ARCHIVE) * 2. Read from pg_xlog (XLOG_FROM_PG_XLOG) * 3. Check trigger file * 4. Read from primary server via walreceiver (XLOG_FROM_STREAM) @@ -9554,7 +9561,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, for (;;) { - int oldSource = currentSource; + int oldSource = currentSource; /* * First check if we failed to read from the current source, and @@ -9571,11 +9578,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, break; case XLOG_FROM_PG_XLOG: + /* - * Check to see if the trigger file exists. Note that we do - * this only after failure, so when you create the trigger - * file, we still finish replaying as much as we can from - * archive and pg_xlog before failover. + * Check to see if the trigger file exists. Note that we + * do this only after failure, so when you create the + * trigger file, we still finish replaying as much as we + * can from archive and pg_xlog before failover. */ if (StandbyMode && CheckForStandbyTrigger()) { @@ -9584,15 +9592,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, } /* - * Not in standby mode, and we've now tried the archive and - * pg_xlog. + * Not in standby mode, and we've now tried the archive + * and pg_xlog. */ if (!StandbyMode) return false; /* - * If primary_conninfo is set, launch walreceiver to try to - * stream the missing WAL. + * If primary_conninfo is set, launch walreceiver to try + * to stream the missing WAL. * * If fetching_ckpt is TRUE, RecPtr points to the initial * checkpoint location. In that case, we use RedoStartLSN @@ -9602,8 +9610,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, */ if (PrimaryConnInfo) { - XLogRecPtr ptr; - TimeLineID tli; + XLogRecPtr ptr; + TimeLineID tli; if (fetching_ckpt) { @@ -9624,28 +9632,32 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, RequestXLogStreaming(tli, ptr, PrimaryConnInfo); receivedUpto = 0; } + /* - * Move to XLOG_FROM_STREAM state in either case. We'll get - * immediate failure if we didn't launch walreceiver, and - * move on to the next state. + * Move to XLOG_FROM_STREAM state in either case. We'll + * get immediate failure if we didn't launch walreceiver, + * and move on to the next state. */ currentSource = XLOG_FROM_STREAM; break; case XLOG_FROM_STREAM: + /* - * Failure while streaming. Most likely, we got here because - * streaming replication was terminated, or promotion was - * triggered. But we also get here if we find an invalid - * record in the WAL streamed from master, in which case - * something is seriously wrong. There's little chance that - * the problem will just go away, but PANIC is not good for - * availability either, especially in hot standby mode. So, - * we treat that the same as disconnection, and retry from - * archive/pg_xlog again. The WAL in the archive should be - * identical to what was streamed, so it's unlikely that it - * helps, but one can hope... + * Failure while streaming. Most likely, we got here + * because streaming replication was terminated, or + * promotion was triggered. But we also get here if we + * find an invalid record in the WAL streamed from master, + * in which case something is seriously wrong. There's + * little chance that the problem will just go away, but + * PANIC is not good for availability either, especially + * in hot standby mode. So, we treat that the same as + * disconnection, and retry from archive/pg_xlog again. + * The WAL in the archive should be identical to what was + * streamed, so it's unlikely that it helps, but one can + * hope... */ + /* * Before we leave XLOG_FROM_STREAM state, make sure that * walreceiver is not active, so that it won't overwrite @@ -9668,11 +9680,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, } /* - * XLOG_FROM_STREAM is the last state in our state machine, - * so we've exhausted all the options for obtaining the - * requested WAL. We're going to loop back and retry from - * the archive, but if it hasn't been long since last - * attempt, sleep 5 seconds to avoid busy-waiting. + * XLOG_FROM_STREAM is the last state in our state + * machine, so we've exhausted all the options for + * obtaining the requested WAL. We're going to loop back + * and retry from the archive, but if it hasn't been long + * since last attempt, sleep 5 seconds to avoid + * busy-waiting. */ now = (pg_time_t) time(NULL); if ((now - last_fail_time) < 5) @@ -9691,9 +9704,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, else if (currentSource == XLOG_FROM_PG_XLOG) { /* - * We just successfully read a file in pg_xlog. We prefer files - * in the archive over ones in pg_xlog, so try the next file - * again from the archive first. + * We just successfully read a file in pg_xlog. We prefer files in + * the archive over ones in pg_xlog, so try the next file again + * from the archive first. */ if (InArchiveRecovery) currentSource = XLOG_FROM_ARCHIVE; @@ -9739,107 +9752,110 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, break; case XLOG_FROM_STREAM: - { - bool havedata; - - /* - * Check if WAL receiver is still active. - */ - if (!WalRcvStreaming()) - { - lastSourceFailed = true; - break; - } - - /* - * Walreceiver is active, so see if new data has arrived. - * - * We only advance XLogReceiptTime when we obtain fresh WAL - * from walreceiver and observe that we had already processed - * everything before the most recent "chunk" that it flushed to - * disk. In steady state where we are keeping up with the - * incoming data, XLogReceiptTime will be updated on each cycle. - * When we are behind, XLogReceiptTime will not advance, so the - * grace time allotted to conflicting queries will decrease. - */ - if (RecPtr < receivedUpto) - havedata = true; - else { - XLogRecPtr latestChunkStart; + bool havedata; - receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); - if (RecPtr < receivedUpto && receiveTLI == curFileTLI) + /* + * Check if WAL receiver is still active. + */ + if (!WalRcvStreaming()) { + lastSourceFailed = true; + break; + } + + /* + * Walreceiver is active, so see if new data has arrived. + * + * We only advance XLogReceiptTime when we obtain fresh + * WAL from walreceiver and observe that we had already + * processed everything before the most recent "chunk" + * that it flushed to disk. In steady state where we are + * keeping up with the incoming data, XLogReceiptTime will + * be updated on each cycle. When we are behind, + * XLogReceiptTime will not advance, so the grace time + * allotted to conflicting queries will decrease. + */ + if (RecPtr < receivedUpto) havedata = true; - if (latestChunkStart <= RecPtr) + else + { + XLogRecPtr latestChunkStart; + + receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); + if (RecPtr < receivedUpto && receiveTLI == curFileTLI) { - XLogReceiptTime = GetCurrentTimestamp(); - SetCurrentChunkStartTime(XLogReceiptTime); + havedata = true; + if (latestChunkStart <= RecPtr) + { + XLogReceiptTime = GetCurrentTimestamp(); + SetCurrentChunkStartTime(XLogReceiptTime); + } } + else + havedata = false; } - else - havedata = false; - } - if (havedata) - { - /* - * Great, streamed far enough. Open the file if it's not - * open already. Also read the timeline history file if - * we haven't initialized timeline history yet; it should - * be streamed over and present in pg_xlog by now. Use - * XLOG_FROM_STREAM so that source info is set correctly - * and XLogReceiptTime isn't changed. - */ - if (readFile < 0) + if (havedata) { - if (!expectedTLEs) - expectedTLEs = readTimeLineHistory(receiveTLI); - readFile = XLogFileRead(readSegNo, PANIC, - receiveTLI, - XLOG_FROM_STREAM, false); - Assert(readFile >= 0); + /* + * Great, streamed far enough. Open the file if it's + * not open already. Also read the timeline history + * file if we haven't initialized timeline history + * yet; it should be streamed over and present in + * pg_xlog by now. Use XLOG_FROM_STREAM so that + * source info is set correctly and XLogReceiptTime + * isn't changed. + */ + if (readFile < 0) + { + if (!expectedTLEs) + expectedTLEs = readTimeLineHistory(receiveTLI); + readFile = XLogFileRead(readSegNo, PANIC, + receiveTLI, + XLOG_FROM_STREAM, false); + Assert(readFile >= 0); + } + else + { + /* just make sure source info is correct... */ + readSource = XLOG_FROM_STREAM; + XLogReceiptSource = XLOG_FROM_STREAM; + return true; + } + break; } - else + + /* + * Data not here yet. Check for trigger, then wait for + * walreceiver to wake us up when new WAL arrives. + */ + if (CheckForStandbyTrigger()) { - /* just make sure source info is correct... */ - readSource = XLOG_FROM_STREAM; - XLogReceiptSource = XLOG_FROM_STREAM; - return true; + /* + * Note that we don't "return false" immediately here. + * After being triggered, we still want to replay all + * the WAL that was already streamed. It's in pg_xlog + * now, so we just treat this as a failure, and the + * state machine will move on to replay the streamed + * WAL from pg_xlog, and then recheck the trigger and + * exit replay. + */ + lastSourceFailed = true; + break; } - break; - } - /* - * Data not here yet. Check for trigger, then wait for - * walreceiver to wake us up when new WAL arrives. - */ - if (CheckForStandbyTrigger()) - { /* - * Note that we don't "return false" immediately here. - * After being triggered, we still want to replay all the - * WAL that was already streamed. It's in pg_xlog now, so - * we just treat this as a failure, and the state machine - * will move on to replay the streamed WAL from pg_xlog, - * and then recheck the trigger and exit replay. + * Wait for more WAL to arrive. Time out after 5 seconds, + * like when polling the archive, to react to a trigger + * file promptly. */ - lastSourceFailed = true; + WaitLatch(&XLogCtl->recoveryWakeupLatch, + WL_LATCH_SET | WL_TIMEOUT, + 5000L); + ResetLatch(&XLogCtl->recoveryWakeupLatch); break; } - /* - * Wait for more WAL to arrive. Time out after 5 seconds, like - * when polling the archive, to react to a trigger file - * promptly. - */ - WaitLatch(&XLogCtl->recoveryWakeupLatch, - WL_LATCH_SET | WL_TIMEOUT, - 5000L); - ResetLatch(&XLogCtl->recoveryWakeupLatch); - break; - } - default: elog(ERROR, "unexpected WAL source %d", currentSource); } @@ -9903,11 +9919,10 @@ CheckForStandbyTrigger(void) if (IsPromoteTriggered()) { /* - * In 9.1 and 9.2 the postmaster unlinked the promote file - * inside the signal handler. We now leave the file in place - * and let the Startup process do the unlink. This allows - * Startup to know whether we're doing fast or normal - * promotion. Fast promotion takes precedence. + * In 9.1 and 9.2 the postmaster unlinked the promote file inside the + * signal handler. We now leave the file in place and let the Startup + * process do the unlink. This allows Startup to know whether we're + * doing fast or normal promotion. Fast promotion takes precedence. */ if (stat(FAST_PROMOTE_SIGNAL_FILE, &stat_buf) == 0) { |