aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c114
1 files changed, 93 insertions, 21 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7e3cf80c031..e66b75aa7ad 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.427 2010/06/28 19:46:19 rhaas Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.428 2010/07/03 20:43:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -72,7 +72,6 @@ int XLogArchiveTimeout = 0;
bool XLogArchiveMode = false;
char *XLogArchiveCommand = NULL;
bool EnableHotStandby = false;
-int MaxStandbyDelay = 30 * 1000;
bool fullPageWrites = true;
bool log_checkpoints = false;
int sync_method = DEFAULT_SYNC_METHOD;
@@ -450,6 +449,15 @@ static ControlFileData *ControlFile = NULL;
static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
/*
+ * Codes indicating where we got a WAL file from during recovery, or where
+ * to attempt to get one. These are chosen so that they can be OR'd together
+ * in a bitmask state variable.
+ */
+#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
+#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
+#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
+
+/*
* openLogFile is -1 or a kernel FD for an open log file segment.
* When it's open, openLogOff is the current seek offset in the file.
* openLogId/openLogSeg identify the segment. These variables are only
@@ -461,14 +469,6 @@ static uint32 openLogSeg = 0;
static uint32 openLogOff = 0;
/*
- * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.
- */
-#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
-#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
-#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
-
-/*
* These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which
@@ -487,7 +487,16 @@ static int readSource = 0; /* XLOG_FROM_* code */
* Keeps track of which sources we've tried to read the current WAL
* record from and failed.
*/
-static int failedSources = 0;
+static int failedSources = 0; /* OR of XLOG_FROM_* codes */
+
+/*
+ * These variables track when we last obtained some WAL data to process,
+ * and where we got it from. (XLogReceiptSource is initially the same as
+ * readSource, but readSource gets reset to zero when we don't have data
+ * to process right now.)
+ */
+static TimestampTz XLogReceiptTime = 0;
+static int XLogReceiptSource = 0; /* XLOG_FROM_* code */
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL;
@@ -2626,7 +2635,7 @@ XLogFileOpen(uint32 log, uint32 seg)
* Open a logfile segment for reading (during recovery).
*
* If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
- * If source = XLOG_FROM_PG_XLOG, it's read from pg_xlog.
+ * Otherwise, it's assumed to be already available in pg_xlog.
*/
static int
XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
@@ -2655,6 +2664,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
break;
case XLOG_FROM_PG_XLOG:
+ case XLOG_FROM_STREAM:
XLogFilePath(path, tli, log, seg);
restoredFromArchive = false;
break;
@@ -2674,7 +2684,13 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
xlogfname);
set_ps_display(activitymsg, false);
+ /* Track source of data in assorted state variables */
readSource = source;
+ XLogReceiptSource = source;
+ /* In FROM_STREAM case, caller tracks receipt time, not me */
+ if (source != XLOG_FROM_STREAM)
+ XLogReceiptTime = GetCurrentTimestamp();
+
return fd;
}
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -5568,7 +5584,7 @@ pg_is_in_recovery(PG_FUNCTION_ARGS)
/*
* Returns timestamp of last recovered commit/abort record.
*/
-TimestampTz
+static TimestampTz
GetLatestXLogTime(void)
{
/* use volatile pointer to prevent code rearrangement */
@@ -5582,6 +5598,23 @@ GetLatestXLogTime(void)
}
/*
+ * Returns time of receipt of current chunk of XLOG data, as well as
+ * whether it was received from streaming replication or from archives.
+ */
+void
+GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream)
+{
+ /*
+ * This must be executed in the startup process, since we don't export
+ * the relevant state to shared memory.
+ */
+ Assert(InRecovery);
+
+ *rtime = XLogReceiptTime;
+ *fromStream = (XLogReceiptSource == XLOG_FROM_STREAM);
+}
+
+/*
* Note that text field supplied is a parameter name and does not require
* translation
*/
@@ -6060,6 +6093,9 @@ StartupXLOG(void)
xlogctl->recoveryLastRecPtr = ReadRecPtr;
SpinLockRelease(&xlogctl->info_lck);
+ /* Also ensure XLogReceiptTime has a sane value */
+ XLogReceiptTime = GetCurrentTimestamp();
+
/*
* Let postmaster know we've started redo now, so that it can
* launch bgwriter to perform restartpoints. We don't bother
@@ -7647,7 +7683,7 @@ CreateRestartPoint(int flags)
XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */
- endptr = GetWalRcvWriteRecPtr();
+ endptr = GetWalRcvWriteRecPtr(NULL);
PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, endptr);
@@ -8757,7 +8793,7 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- recptr = GetWalRcvWriteRecPtr();
+ recptr = GetWalRcvWriteRecPtr(NULL);
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
@@ -9272,6 +9308,8 @@ retry:
{
if (WalRcvInProgress())
{
+ bool havedata;
+
/*
* If we find an invalid record in the WAL streamed from
* master, something is seriously wrong. There's little
@@ -9289,28 +9327,62 @@ retry:
}
/*
- * While walreceiver is active, wait for new WAL to arrive
- * from primary.
+ * Walreceiver is active, so see if new data has arrived.
+ *
+ * We only advance XLogReceiptTime when we obtain fresh
+ * WAL from walreceiver and observe that we had already
+ * processed everything before the most recent "chunk"
+ * that it flushed to disk. In steady state where we are
+ * keeping up with the incoming data, XLogReceiptTime
+ * will be updated on each cycle. When we are behind,
+ * XLogReceiptTime will not advance, so the grace time
+ * alloted to conflicting queries will decrease.
*/
- receivedUpto = GetWalRcvWriteRecPtr();
if (XLByteLT(*RecPtr, receivedUpto))
+ havedata = true;
+ else
+ {
+ XLogRecPtr latestChunkStart;
+
+ receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+ if (XLByteLT(*RecPtr, receivedUpto))
+ {
+ havedata = true;
+ if (!XLByteLT(*RecPtr, latestChunkStart))
+ XLogReceiptTime = GetCurrentTimestamp();
+ }
+ else
+ havedata = false;
+ }
+ if (havedata)
{
/*
* Great, streamed far enough. Open the file if it's
- * not open already.
+ * not open already. Use XLOG_FROM_STREAM so that
+ * source info is set correctly and XLogReceiptTime
+ * isn't changed.
*/
if (readFile < 0)
{
readFile =
XLogFileRead(readId, readSeg, PANIC,
recoveryTargetTLI,
- XLOG_FROM_PG_XLOG, false);
+ XLOG_FROM_STREAM, false);
+ Assert(readFile >= 0);
switched_segment = true;
+ }
+ else
+ {
+ /* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM;
+ XLogReceiptSource = XLOG_FROM_STREAM;
}
break;
}
+ /*
+ * Data not here yet, so check for trigger then sleep.
+ */
if (CheckForStandbyTrigger())
goto triggered;
@@ -9388,7 +9460,7 @@ retry:
readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2,
sources);
switched_segment = true;
- if (readFile != -1)
+ if (readFile >= 0)
break;
/*