aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/timeline.c2
-rw-r--r--src/backend/access/transam/xlog.c220
-rw-r--r--src/backend/access/transam/xlogarchive.c17
-rw-r--r--src/backend/postmaster/walwriter.c7
-rw-r--r--src/backend/replication/walreceiver.c6
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/access/xlogarchive.h4
-rw-r--r--src/include/access/xlogdefs.h1
8 files changed, 234 insertions, 24 deletions
diff --git a/src/backend/access/transam/timeline.c b/src/backend/access/transam/timeline.c
index 8d0903c1756..acd5c2431da 100644
--- a/src/backend/access/transam/timeline.c
+++ b/src/backend/access/transam/timeline.c
@@ -452,7 +452,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
if (XLogArchivingActive())
{
TLHistoryFileName(histfname, newTLI);
- XLogArchiveNotify(histfname);
+ XLogArchiveNotify(histfname, true);
}
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749da..24165ab03ec 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -724,6 +724,18 @@ typedef struct XLogCtlData
XLogRecPtr lastFpwDisableRecPtr;
slock_t info_lck; /* locks shared variables shown above */
+
+ /*
+ * Variables used to track segment-boundary-crossing WAL records. See
+ * RegisterSegmentBoundary. Protected by segtrack_lck.
+ */
+ XLogSegNo lastNotifiedSeg;
+ XLogSegNo earliestSegBoundary;
+ XLogRecPtr earliestSegBoundaryEndPtr;
+ XLogSegNo latestSegBoundary;
+ XLogRecPtr latestSegBoundaryEndPtr;
+
+ slock_t segtrack_lck; /* locks shared variables shown above */
} XLogCtlData;
static XLogCtlData *XLogCtl = NULL;
@@ -920,6 +932,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
XLogSegNo *endlogSegNo);
static void UpdateLastRemovedPtr(char *filename);
static void ValidateXLOGDirectoryStructure(void);
+static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
@@ -1154,23 +1167,56 @@ XLogInsertRecord(XLogRecData *rdata,
END_CRIT_SECTION();
/*
- * Update shared LogwrtRqst.Write, if we crossed page boundary.
+ * If we crossed page boundary, update LogwrtRqst.Write; if we crossed
+ * segment boundary, register that and wake up walwriter.
*/
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
+ XLogSegNo StartSeg;
+ XLogSegNo EndSeg;
+
+ XLByteToSeg(StartPos, StartSeg, wal_segment_size);
+ XLByteToSeg(EndPos, EndSeg, wal_segment_size);
+
+ /*
+ * Register our crossing the segment boundary if that occurred.
+ *
+ * Note that we did not use XLByteToPrevSeg() for determining the
+ * ending segment. This is so that a record that fits perfectly into
+ * the end of the segment causes the latter to get marked ready for
+ * archival immediately.
+ */
+ if (StartSeg != EndSeg && XLogArchivingActive())
+ RegisterSegmentBoundary(EndSeg, EndPos);
+
+ /*
+ * Advance LogwrtRqst.Write so that it includes new block(s).
+ *
+ * We do this after registering the segment boundary so that the
+ * comparison with the flushed pointer below can use the latest value
+ * known globally.
+ */
SpinLockAcquire(&XLogCtl->info_lck);
- /* advance global request to include new block(s) */
if (XLogCtl->LogwrtRqst.Write < EndPos)
XLogCtl->LogwrtRqst.Write = EndPos;
/* update local result copy while I have the chance */
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
+
+ /*
+ * There's a chance that the record was already flushed to disk and we
+ * missed marking segments as ready for archive. If this happens, we
+ * nudge the WALWriter, which will take care of notifying segments as
+ * needed.
+ */
+ if (StartSeg != EndSeg && XLogArchivingActive() &&
+ LogwrtResult.Flush >= EndPos && ProcGlobal->walwriterLatch)
+ SetLatch(ProcGlobal->walwriterLatch);
}
/*
* If this was an XLOG_SWITCH record, flush the record and the empty
- * padding space that fills the rest of the segment, and perform
- * end-of-segment actions (eg, notifying archiver).
+ * padding space that fills the rest of the segment.
*/
if (isLogSwitch)
{
@@ -2421,6 +2467,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
/* We should always be inside a critical section here */
Assert(CritSectionCount > 0);
+ Assert(LWLockHeldByMe(WALWriteLock));
/*
* Update local LogwrtResult (caller probably did this already, but...)
@@ -2586,11 +2633,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
- * This is also the right place to notify the Archiver that the
- * segment is ready to copy to archival storage, and to update the
- * timer for archive_timeout, and to signal for a checkpoint if
- * too many logfile segments have been used since the last
- * checkpoint.
+ * If WAL archiving is active, we attempt to notify the archiver
+ * of any segments that are now ready for archival.
+ *
+ * This is also the right place to update the timer for
+ * archive_timeout and to signal for a checkpoint if too many
+ * logfile segments have been used since the last checkpoint.
*/
if (finishing_seg)
{
@@ -2602,7 +2650,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
- XLogArchiveNotifySeg(openLogSegNo);
+ NotifySegmentsReadyForArchive(LogwrtResult.Flush);
XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
@@ -2690,6 +2738,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
SpinLockRelease(&XLogCtl->info_lck);
}
+
+ if (XLogArchivingActive())
+ NotifySegmentsReadyForArchive(LogwrtResult.Flush);
}
/*
@@ -4329,6 +4380,131 @@ ValidateXLOGDirectoryStructure(void)
}
/*
+ * RegisterSegmentBoundary
+ *
+ * WAL records that are split across a segment boundary require special
+ * treatment for archiving: the initial segment must not be archived until
+ * the end segment has been flushed, in case we crash before we have
+ * the chance to flush the end segment (because after recovery we would
+ * overwrite that WAL record with a different one, and so the file we
+ * archived no longer represents truth.) This also applies to streaming
+ * physical replication.
+ *
+ * To handle this, we keep track of the LSN of WAL records that cross
+ * segment boundaries. Two such are sufficient: the ones with the
+ * earliest and the latest end pointers we know about, since the flush
+ * position advances monotonically. WAL record writers register
+ * boundary-crossing records here, which is used by .ready file creation
+ * to delay until the end segment is known flushed.
+ */
+static void
+RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
+{
+ XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY;
+
+ /* verify caller computed segment number correctly */
+ AssertArg((XLByteToSeg(endpos, segno, wal_segment_size), segno == seg));
+
+ SpinLockAcquire(&XLogCtl->segtrack_lck);
+
+ /*
+ * If no segment boundaries are registered, store the new segment boundary
+ * in earliestSegBoundary. Otherwise, store the greater segment
+ * boundaries in latestSegBoundary.
+ */
+ if (XLogCtl->earliestSegBoundary == MaxXLogSegNo)
+ {
+ XLogCtl->earliestSegBoundary = seg;
+ XLogCtl->earliestSegBoundaryEndPtr = endpos;
+ }
+ else if (seg > XLogCtl->earliestSegBoundary &&
+ (XLogCtl->latestSegBoundary == MaxXLogSegNo ||
+ seg > XLogCtl->latestSegBoundary))
+ {
+ XLogCtl->latestSegBoundary = seg;
+ XLogCtl->latestSegBoundaryEndPtr = endpos;
+ }
+
+ SpinLockRelease(&XLogCtl->segtrack_lck);
+}
+
+/*
+ * NotifySegmentsReadyForArchive
+ *
+ * Mark segments as ready for archival, given that it is safe to do so.
+ * This function is idempotent.
+ */
+void
+NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
+{
+ XLogSegNo latest_boundary_seg;
+ XLogSegNo last_notified;
+ XLogSegNo flushed_seg;
+ XLogSegNo seg;
+ bool keep_latest;
+
+ XLByteToSeg(flushRecPtr, flushed_seg, wal_segment_size);
+
+ SpinLockAcquire(&XLogCtl->segtrack_lck);
+
+ if (XLogCtl->latestSegBoundary <= flushed_seg &&
+ XLogCtl->latestSegBoundaryEndPtr <= flushRecPtr)
+ {
+ latest_boundary_seg = XLogCtl->latestSegBoundary;
+ keep_latest = false;
+ }
+ else if (XLogCtl->earliestSegBoundary <= flushed_seg &&
+ XLogCtl->earliestSegBoundaryEndPtr <= flushRecPtr)
+ {
+ latest_boundary_seg = XLogCtl->earliestSegBoundary;
+ keep_latest = true;
+ }
+ else
+ {
+ SpinLockRelease(&XLogCtl->segtrack_lck);
+ return;
+ }
+
+ last_notified = XLogCtl->lastNotifiedSeg;
+
+ /*
+ * Update shared memory and discard segment boundaries that are no longer
+ * needed.
+ *
+ * It is safe to update shared memory before we attempt to create the
+ * .ready files. If our calls to XLogArchiveNotifySeg() fail,
+ * RemoveOldXlogFiles() will retry it as needed.
+ */
+ if (last_notified < latest_boundary_seg - 1)
+ XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1;
+
+ if (keep_latest)
+ {
+ XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
+ XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
+ }
+ else
+ {
+ XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+ XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
+ }
+
+ XLogCtl->latestSegBoundary = MaxXLogSegNo;
+ XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
+
+ SpinLockRelease(&XLogCtl->segtrack_lck);
+
+ /*
+ * Notify archiver about segments that are ready for archival (by creating
+ * the corresponding .ready files).
+ */
+ for (seg = last_notified + 1; seg < latest_boundary_seg; seg++)
+ XLogArchiveNotifySeg(seg, false);
+
+ PgArchWakeup();
+}
+
+/*
* Remove previous backup history files. This also retries creation of
* .ready files for any backup history files for which XLogArchiveNotify
* failed earlier.
@@ -5230,9 +5406,17 @@ XLOGShmemInit(void)
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
+ SpinLockInit(&XLogCtl->segtrack_lck);
SpinLockInit(&XLogCtl->ulsn_lck);
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
+
+ /* Initialize stuff for marking segments as ready for archival. */
+ XLogCtl->lastNotifiedSeg = MaxXLogSegNo;
+ XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+ XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
+ XLogCtl->latestSegBoundary = MaxXLogSegNo;
+ XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
}
/*
@@ -7874,6 +8058,20 @@ StartupXLOG(void)
XLogCtl->LogwrtRqst.Flush = EndOfLog;
/*
+ * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file.
+ */
+ if (XLogArchivingActive())
+ {
+ XLogSegNo EndOfLogSeg;
+
+ XLByteToSeg(EndOfLog, EndOfLogSeg, wal_segment_size);
+
+ SpinLockAcquire(&XLogCtl->segtrack_lck);
+ XLogCtl->lastNotifiedSeg = EndOfLogSeg - 1;
+ SpinLockRelease(&XLogCtl->segtrack_lck);
+ }
+
+ /*
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
* record before resource manager writes cleanup WAL records or checkpoint
* record is written.
@@ -8000,7 +8198,7 @@ StartupXLOG(void)
XLogArchiveCleanup(partialfname);
durable_rename(origpath, partialpath, ERROR);
- XLogArchiveNotify(partialfname);
+ XLogArchiveNotify(partialfname, true);
}
}
}
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index 26b023e754b..b9c19b20856 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -433,7 +433,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
XLogArchiveForceDone(xlogfname);
else
- XLogArchiveNotify(xlogfname);
+ XLogArchiveNotify(xlogfname, true);
/*
* If the existing file was replaced, since walsenders might have it open,
@@ -462,9 +462,12 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
* by the archiver, e.g. we write 0000000100000001000000C6.ready
* and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
* then when complete, rename it to 0000000100000001000000C6.done
+ *
+ * Optionally, nudge the archiver process so that it'll notice the file we
+ * create.
*/
void
-XLogArchiveNotify(const char *xlog)
+XLogArchiveNotify(const char *xlog, bool nudge)
{
char archiveStatusPath[MAXPGPATH];
FILE *fd;
@@ -489,8 +492,8 @@ XLogArchiveNotify(const char *xlog)
return;
}
- /* Notify archiver that it's got something to do */
- if (IsUnderPostmaster)
+ /* If caller requested, let archiver know it's got work to do */
+ if (nudge)
PgArchWakeup();
}
@@ -498,12 +501,12 @@ XLogArchiveNotify(const char *xlog)
* Convenience routine to notify using segment number representation of filename
*/
void
-XLogArchiveNotifySeg(XLogSegNo segno)
+XLogArchiveNotifySeg(XLogSegNo segno, bool nudge)
{
char xlog[MAXFNAMELEN];
XLogFileName(xlog, ThisTimeLineID, segno, wal_segment_size);
- XLogArchiveNotify(xlog);
+ XLogArchiveNotify(xlog, nudge);
}
/*
@@ -608,7 +611,7 @@ XLogArchiveCheckDone(const char *xlog)
return true;
/* Retry creation of the .ready file */
- XLogArchiveNotify(xlog);
+ XLogArchiveNotify(xlog, true);
return false;
}
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 626fae8454c..6a1e16edc23 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -249,6 +249,13 @@ WalWriterMain(void)
HandleWalWriterInterrupts();
/*
+ * Notify the archiver of any WAL segments that are ready. We do this
+ * here to handle a race condition where WAL is flushed to disk prior
+ * to registering the segment boundary.
+ */
+ NotifySegmentsReadyForArchive(GetFlushRecPtr());
+
+ /*
* Do what we're here for; then, if XLogBackgroundFlush() found useful
* work to do, reset hibernation counter.
*/
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 9a2bc37fd71..60de3be92c2 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -622,7 +622,7 @@ WalReceiverMain(void)
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
XLogArchiveForceDone(xlogfname);
else
- XLogArchiveNotify(xlogfname);
+ XLogArchiveNotify(xlogfname, true);
}
recvFile = -1;
@@ -760,7 +760,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
XLogArchiveForceDone(fname);
else
- XLogArchiveNotify(fname);
+ XLogArchiveNotify(fname, true);
pfree(fname);
pfree(content);
@@ -915,7 +915,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
XLogArchiveForceDone(xlogfname);
else
- XLogArchiveNotify(xlogfname);
+ XLogArchiveNotify(xlogfname, true);
}
recvFile = -1;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0a8ede700de..6b6ae81c2d5 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -315,6 +315,7 @@ extern XLogRecPtr GetInsertRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(void);
extern XLogRecPtr GetLastImportantRecPtr(void);
extern void RemovePromoteSignalFiles(void);
+extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);
extern bool PromoteIsTriggered(void);
extern bool CheckPromoteSignal(void);
diff --git a/src/include/access/xlogarchive.h b/src/include/access/xlogarchive.h
index 3edd1a976c1..935b4cb02d5 100644
--- a/src/include/access/xlogarchive.h
+++ b/src/include/access/xlogarchive.h
@@ -23,8 +23,8 @@ extern bool RestoreArchivedFile(char *path, const char *xlogfname,
extern void ExecuteRecoveryCommand(const char *command, const char *commandName,
bool failOnSignal);
extern void KeepFileRestoredFromArchive(const char *path, const char *xlogfname);
-extern void XLogArchiveNotify(const char *xlog);
-extern void XLogArchiveNotifySeg(XLogSegNo segno);
+extern void XLogArchiveNotify(const char *xlog, bool nudge);
+extern void XLogArchiveNotifySeg(XLogSegNo segno, bool nudge);
extern void XLogArchiveForceDone(const char *xlog);
extern bool XLogArchiveCheckDone(const char *xlog);
extern bool XLogArchiveIsBusy(const char *xlog);
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 60348d18509..9b455e88e31 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -46,6 +46,7 @@ typedef uint64 XLogRecPtr;
* XLogSegNo - physical log file sequence number.
*/
typedef uint64 XLogSegNo;
+#define MaxXLogSegNo ((XLogSegNo) 0xFFFFFFFFFFFFFFFF)
/*
* TimeLineID (TLI) - identifies different database histories to prevent