diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 114 |
1 files changed, 93 insertions, 21 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7e3cf80c031..e66b75aa7ad 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.427 2010/06/28 19:46:19 rhaas Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.428 2010/07/03 20:43:57 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -72,7 +72,6 @@ int XLogArchiveTimeout = 0; bool XLogArchiveMode = false; char *XLogArchiveCommand = NULL; bool EnableHotStandby = false; -int MaxStandbyDelay = 30 * 1000; bool fullPageWrites = true; bool log_checkpoints = false; int sync_method = DEFAULT_SYNC_METHOD; @@ -450,6 +449,15 @@ static ControlFileData *ControlFile = NULL; static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}}; /* + * Codes indicating where we got a WAL file from during recovery, or where + * to attempt to get one. These are chosen so that they can be OR'd together + * in a bitmask state variable. + */ +#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */ +#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */ +#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */ + +/* * openLogFile is -1 or a kernel FD for an open log file segment. * When it's open, openLogOff is the current seek offset in the file. * openLogId/openLogSeg identify the segment. These variables are only @@ -461,14 +469,6 @@ static uint32 openLogSeg = 0; static uint32 openLogOff = 0; /* - * Codes indicating where we got a WAL file from during recovery, or where - * to attempt to get one. - */ -#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */ -#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */ -#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */ - -/* * These variables are used similarly to the ones above, but for reading * the XLOG. Note, however, that readOff generally represents the offset * of the page just read, not the seek position of the FD itself, which @@ -487,7 +487,16 @@ static int readSource = 0; /* XLOG_FROM_* code */ * Keeps track of which sources we've tried to read the current WAL * record from and failed. */ -static int failedSources = 0; +static int failedSources = 0; /* OR of XLOG_FROM_* codes */ + +/* + * These variables track when we last obtained some WAL data to process, + * and where we got it from. (XLogReceiptSource is initially the same as + * readSource, but readSource gets reset to zero when we don't have data + * to process right now.) + */ +static TimestampTz XLogReceiptTime = 0; +static int XLogReceiptSource = 0; /* XLOG_FROM_* code */ /* Buffer for currently read page (XLOG_BLCKSZ bytes) */ static char *readBuf = NULL; @@ -2626,7 +2635,7 @@ XLogFileOpen(uint32 log, uint32 seg) * Open a logfile segment for reading (during recovery). * * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive. - * If source = XLOG_FROM_PG_XLOG, it's read from pg_xlog. + * Otherwise, it's assumed to be already available in pg_xlog. */ static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, @@ -2655,6 +2664,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, break; case XLOG_FROM_PG_XLOG: + case XLOG_FROM_STREAM: XLogFilePath(path, tli, log, seg); restoredFromArchive = false; break; @@ -2674,7 +2684,13 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, xlogfname); set_ps_display(activitymsg, false); + /* Track source of data in assorted state variables */ readSource = source; + XLogReceiptSource = source; + /* In FROM_STREAM case, caller tracks receipt time, not me */ + if (source != XLOG_FROM_STREAM) + XLogReceiptTime = GetCurrentTimestamp(); + return fd; } if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ @@ -5568,7 +5584,7 @@ pg_is_in_recovery(PG_FUNCTION_ARGS) /* * Returns timestamp of last recovered commit/abort record. */ -TimestampTz +static TimestampTz GetLatestXLogTime(void) { /* use volatile pointer to prevent code rearrangement */ @@ -5582,6 +5598,23 @@ GetLatestXLogTime(void) } /* + * Returns time of receipt of current chunk of XLOG data, as well as + * whether it was received from streaming replication or from archives. + */ +void +GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream) +{ + /* + * This must be executed in the startup process, since we don't export + * the relevant state to shared memory. + */ + Assert(InRecovery); + + *rtime = XLogReceiptTime; + *fromStream = (XLogReceiptSource == XLOG_FROM_STREAM); +} + +/* * Note that text field supplied is a parameter name and does not require * translation */ @@ -6060,6 +6093,9 @@ StartupXLOG(void) xlogctl->recoveryLastRecPtr = ReadRecPtr; SpinLockRelease(&xlogctl->info_lck); + /* Also ensure XLogReceiptTime has a sane value */ + XLogReceiptTime = GetCurrentTimestamp(); + /* * Let postmaster know we've started redo now, so that it can * launch bgwriter to perform restartpoints. We don't bother @@ -7647,7 +7683,7 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; /* Get the current (or recent) end of xlog */ - endptr = GetWalRcvWriteRecPtr(); + endptr = GetWalRcvWriteRecPtr(NULL); PrevLogSeg(_logId, _logSeg); RemoveOldXlogFiles(_logId, _logSeg, endptr); @@ -8757,7 +8793,7 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS) XLogRecPtr recptr; char location[MAXFNAMELEN]; - recptr = GetWalRcvWriteRecPtr(); + recptr = GetWalRcvWriteRecPtr(NULL); if (recptr.xlogid == 0 && recptr.xrecoff == 0) PG_RETURN_NULL(); @@ -9272,6 +9308,8 @@ retry: { if (WalRcvInProgress()) { + bool havedata; + /* * If we find an invalid record in the WAL streamed from * master, something is seriously wrong. There's little @@ -9289,28 +9327,62 @@ retry: } /* - * While walreceiver is active, wait for new WAL to arrive - * from primary. + * Walreceiver is active, so see if new data has arrived. + * + * We only advance XLogReceiptTime when we obtain fresh + * WAL from walreceiver and observe that we had already + * processed everything before the most recent "chunk" + * that it flushed to disk. In steady state where we are + * keeping up with the incoming data, XLogReceiptTime + * will be updated on each cycle. When we are behind, + * XLogReceiptTime will not advance, so the grace time + * alloted to conflicting queries will decrease. */ - receivedUpto = GetWalRcvWriteRecPtr(); if (XLByteLT(*RecPtr, receivedUpto)) + havedata = true; + else + { + XLogRecPtr latestChunkStart; + + receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); + if (XLByteLT(*RecPtr, receivedUpto)) + { + havedata = true; + if (!XLByteLT(*RecPtr, latestChunkStart)) + XLogReceiptTime = GetCurrentTimestamp(); + } + else + havedata = false; + } + if (havedata) { /* * Great, streamed far enough. Open the file if it's - * not open already. + * not open already. Use XLOG_FROM_STREAM so that + * source info is set correctly and XLogReceiptTime + * isn't changed. */ if (readFile < 0) { readFile = XLogFileRead(readId, readSeg, PANIC, recoveryTargetTLI, - XLOG_FROM_PG_XLOG, false); + XLOG_FROM_STREAM, false); + Assert(readFile >= 0); switched_segment = true; + } + else + { + /* just make sure source info is correct... */ readSource = XLOG_FROM_STREAM; + XLogReceiptSource = XLOG_FROM_STREAM; } break; } + /* + * Data not here yet, so check for trigger then sleep. + */ if (CheckForStandbyTrigger()) goto triggered; @@ -9388,7 +9460,7 @@ retry: readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, sources); switched_segment = true; - if (readFile != -1) + if (readFile >= 0) break; /* |