aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xlog.c114
-rw-r--r--src/backend/replication/walreceiver.c3
-rw-r--r--src/backend/replication/walreceiverfuncs.c17
-rw-r--r--src/backend/storage/ipc/standby.c131
-rw-r--r--src/backend/storage/lmgr/proc.c24
-rw-r--r--src/backend/utils/misc/guc.c34
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample14
-rw-r--r--src/include/access/xlog.h21
-rw-r--r--src/include/replication/walreceiver.h34
-rw-r--r--src/include/storage/standby.h7
10 files changed, 266 insertions, 133 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7e3cf80c031..e66b75aa7ad 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.427 2010/06/28 19:46:19 rhaas Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.428 2010/07/03 20:43:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -72,7 +72,6 @@ int XLogArchiveTimeout = 0;
bool XLogArchiveMode = false;
char *XLogArchiveCommand = NULL;
bool EnableHotStandby = false;
-int MaxStandbyDelay = 30 * 1000;
bool fullPageWrites = true;
bool log_checkpoints = false;
int sync_method = DEFAULT_SYNC_METHOD;
@@ -450,6 +449,15 @@ static ControlFileData *ControlFile = NULL;
static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
/*
+ * Codes indicating where we got a WAL file from during recovery, or where
+ * to attempt to get one. These are chosen so that they can be OR'd together
+ * in a bitmask state variable.
+ */
+#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
+#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
+#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
+
+/*
* openLogFile is -1 or a kernel FD for an open log file segment.
* When it's open, openLogOff is the current seek offset in the file.
* openLogId/openLogSeg identify the segment. These variables are only
@@ -461,14 +469,6 @@ static uint32 openLogSeg = 0;
static uint32 openLogOff = 0;
/*
- * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.
- */
-#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
-#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
-#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
-
-/*
* These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which
@@ -487,7 +487,16 @@ static int readSource = 0; /* XLOG_FROM_* code */
* Keeps track of which sources we've tried to read the current WAL
* record from and failed.
*/
-static int failedSources = 0;
+static int failedSources = 0; /* OR of XLOG_FROM_* codes */
+
+/*
+ * These variables track when we last obtained some WAL data to process,
+ * and where we got it from. (XLogReceiptSource is initially the same as
+ * readSource, but readSource gets reset to zero when we don't have data
+ * to process right now.)
+ */
+static TimestampTz XLogReceiptTime = 0;
+static int XLogReceiptSource = 0; /* XLOG_FROM_* code */
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL;
@@ -2626,7 +2635,7 @@ XLogFileOpen(uint32 log, uint32 seg)
* Open a logfile segment for reading (during recovery).
*
* If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
- * If source = XLOG_FROM_PG_XLOG, it's read from pg_xlog.
+ * Otherwise, it's assumed to be already available in pg_xlog.
*/
static int
XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
@@ -2655,6 +2664,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
break;
case XLOG_FROM_PG_XLOG:
+ case XLOG_FROM_STREAM:
XLogFilePath(path, tli, log, seg);
restoredFromArchive = false;
break;
@@ -2674,7 +2684,13 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
xlogfname);
set_ps_display(activitymsg, false);
+ /* Track source of data in assorted state variables */
readSource = source;
+ XLogReceiptSource = source;
+ /* In FROM_STREAM case, caller tracks receipt time, not me */
+ if (source != XLOG_FROM_STREAM)
+ XLogReceiptTime = GetCurrentTimestamp();
+
return fd;
}
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -5568,7 +5584,7 @@ pg_is_in_recovery(PG_FUNCTION_ARGS)
/*
* Returns timestamp of last recovered commit/abort record.
*/
-TimestampTz
+static TimestampTz
GetLatestXLogTime(void)
{
/* use volatile pointer to prevent code rearrangement */
@@ -5582,6 +5598,23 @@ GetLatestXLogTime(void)
}
/*
+ * Returns time of receipt of current chunk of XLOG data, as well as
+ * whether it was received from streaming replication or from archives.
+ */
+void
+GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream)
+{
+ /*
+ * This must be executed in the startup process, since we don't export
+ * the relevant state to shared memory.
+ */
+ Assert(InRecovery);
+
+ *rtime = XLogReceiptTime;
+ *fromStream = (XLogReceiptSource == XLOG_FROM_STREAM);
+}
+
+/*
* Note that text field supplied is a parameter name and does not require
* translation
*/
@@ -6060,6 +6093,9 @@ StartupXLOG(void)
xlogctl->recoveryLastRecPtr = ReadRecPtr;
SpinLockRelease(&xlogctl->info_lck);
+ /* Also ensure XLogReceiptTime has a sane value */
+ XLogReceiptTime = GetCurrentTimestamp();
+
/*
* Let postmaster know we've started redo now, so that it can
* launch bgwriter to perform restartpoints. We don't bother
@@ -7647,7 +7683,7 @@ CreateRestartPoint(int flags)
XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */
- endptr = GetWalRcvWriteRecPtr();
+ endptr = GetWalRcvWriteRecPtr(NULL);
PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, endptr);
@@ -8757,7 +8793,7 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- recptr = GetWalRcvWriteRecPtr();
+ recptr = GetWalRcvWriteRecPtr(NULL);
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
@@ -9272,6 +9308,8 @@ retry:
{
if (WalRcvInProgress())
{
+ bool havedata;
+
/*
* If we find an invalid record in the WAL streamed from
* master, something is seriously wrong. There's little
@@ -9289,28 +9327,62 @@ retry:
}
/*
- * While walreceiver is active, wait for new WAL to arrive
- * from primary.
+ * Walreceiver is active, so see if new data has arrived.
+ *
+ * We only advance XLogReceiptTime when we obtain fresh
+ * WAL from walreceiver and observe that we had already
+ * processed everything before the most recent "chunk"
+ * that it flushed to disk. In steady state where we are
+ * keeping up with the incoming data, XLogReceiptTime
+ * will be updated on each cycle. When we are behind,
+ * XLogReceiptTime will not advance, so the grace time
+ * alloted to conflicting queries will decrease.
*/
- receivedUpto = GetWalRcvWriteRecPtr();
if (XLByteLT(*RecPtr, receivedUpto))
+ havedata = true;
+ else
+ {
+ XLogRecPtr latestChunkStart;
+
+ receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+ if (XLByteLT(*RecPtr, receivedUpto))
+ {
+ havedata = true;
+ if (!XLByteLT(*RecPtr, latestChunkStart))
+ XLogReceiptTime = GetCurrentTimestamp();
+ }
+ else
+ havedata = false;
+ }
+ if (havedata)
{
/*
* Great, streamed far enough. Open the file if it's
- * not open already.
+ * not open already. Use XLOG_FROM_STREAM so that
+ * source info is set correctly and XLogReceiptTime
+ * isn't changed.
*/
if (readFile < 0)
{
readFile =
XLogFileRead(readId, readSeg, PANIC,
recoveryTargetTLI,
- XLOG_FROM_PG_XLOG, false);
+ XLOG_FROM_STREAM, false);
+ Assert(readFile >= 0);
switched_segment = true;
+ }
+ else
+ {
+ /* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM;
+ XLogReceiptSource = XLOG_FROM_STREAM;
}
break;
}
+ /*
+ * Data not here yet, so check for trigger then sleep.
+ */
if (CheckForStandbyTrigger())
goto triggered;
@@ -9388,7 +9460,7 @@ retry:
readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2,
sources);
switched_segment = true;
- if (readFile != -1)
+ if (readFile >= 0)
break;
/*
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index c5a6b315eba..153b7ff0e50 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.14 2010/06/09 15:04:07 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.15 2010/07/03 20:43:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -524,6 +524,7 @@ XLogWalRcvFlush(void)
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
+ walrcv->latestChunkStart = walrcv->receivedUpto;
walrcv->receivedUpto = LogstreamResult.Flush;
SpinLockRelease(&walrcv->mutex);
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 78ee7fb9f7e..4bc3bd875c0 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.5 2010/04/28 16:54:15 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.6 2010/07/03 20:43:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -187,10 +187,11 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
if (recptr.xrecoff % XLogSegSize != 0)
recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
+ SpinLockAcquire(&walrcv->mutex);
+
/* It better be stopped before we try to restart it */
Assert(walrcv->walRcvState == WALRCV_STOPPED);
- SpinLockAcquire(&walrcv->mutex);
if (conninfo != NULL)
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
@@ -199,16 +200,22 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
walrcv->startTime = now;
walrcv->receivedUpto = recptr;
+ walrcv->latestChunkStart = recptr;
+
SpinLockRelease(&walrcv->mutex);
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
}
/*
- * Returns the byte position that walreceiver has written
+ * Returns the last+1 byte position that walreceiver has written.
+ *
+ * Optionally, returns the previous chunk start, that is the first byte
+ * written in the most recent walreceiver flush cycle. Callers not
+ * interested in that value may pass NULL for latestChunkStart.
*/
XLogRecPtr
-GetWalRcvWriteRecPtr(void)
+GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
@@ -216,6 +223,8 @@ GetWalRcvWriteRecPtr(void)
SpinLockAcquire(&walrcv->mutex);
recptr = walrcv->receivedUpto;
+ if (latestChunkStart)
+ *latestChunkStart = walrcv->latestChunkStart;
SpinLockRelease(&walrcv->mutex);
return recptr;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index a61a4e62169..8525492bc25 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -11,7 +11,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.25 2010/06/14 00:49:24 itagaki Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.26 2010/07/03 20:43:58 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -30,7 +30,10 @@
#include "storage/standby.h"
#include "utils/ps_status.h"
+/* User-settable GUC parameters */
int vacuum_defer_cleanup_age;
+int max_standby_archive_delay = 30 * 1000;
+int max_standby_streaming_delay = 30 * 1000;
static List *RecoveryLockList;
@@ -40,13 +43,14 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
+
/*
* InitRecoveryTransactionEnvironment
- * Initiallize tracking of in-progress transactions in master
+ * Initialize tracking of in-progress transactions in master
*
* We need to issue shared invalidations and hold locks. Holding locks
- * means others may want to wait on us, so we need to make lock table
- * inserts to appear like a transaction. We could create and delete
+ * means others may want to wait on us, so we need to make a lock table
+ * vxact entry like a real transaction. We could create and delete
* lock table entries for each transaction but its simpler just to create
* one permanent entry and leave it there all the time. Locks are then
* acquired and released as needed. Yes, this means you can see the
@@ -58,7 +62,7 @@ InitRecoveryTransactionEnvironment(void)
VirtualTransactionId vxid;
/*
- * Initialise shared invalidation management for Startup process, being
+ * Initialize shared invalidation management for Startup process, being
* careful to register ourselves as a sendOnly process so we don't need to
* read messages, nor will we get signalled when the queue starts filling
* up.
@@ -113,6 +117,36 @@ ShutdownRecoveryTransactionEnvironment(void)
* -----------------------------------------------------
*/
+/*
+ * Determine the cutoff time at which we want to start canceling conflicting
+ * transactions. Returns zero (a time safely in the past) if we are willing
+ * to wait forever.
+ */
+static TimestampTz
+GetStandbyLimitTime(void)
+{
+ TimestampTz rtime;
+ bool fromStream;
+
+ /*
+ * The cutoff time is the last WAL data receipt time plus the appropriate
+ * delay variable. Delay of -1 means wait forever.
+ */
+ GetXLogReceiptTime(&rtime, &fromStream);
+ if (fromStream)
+ {
+ if (max_standby_streaming_delay < 0)
+ return 0; /* wait forever */
+ return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
+ }
+ else
+ {
+ if (max_standby_archive_delay < 0)
+ return 0; /* wait forever */
+ return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
+ }
+}
+
#define STANDBY_INITIAL_WAIT_US 1000
static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
@@ -124,10 +158,11 @@ static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
static bool
WaitExceedsMaxStandbyDelay(void)
{
- /* Are we past max_standby_delay? */
- if (MaxStandbyDelay >= 0 &&
- TimestampDifferenceExceeds(GetLatestXLogTime(), GetCurrentTimestamp(),
- MaxStandbyDelay))
+ TimestampTz ltime;
+
+ /* Are we past the limit time? */
+ ltime = GetStandbyLimitTime();
+ if (ltime && GetCurrentTimestamp() >= ltime)
return true;
/*
@@ -203,8 +238,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
pid = CancelVirtualTransaction(*waitlist, reason);
/*
- * Wait awhile for it to die so that we avoid flooding an
- * unresponsive backend when system is heavily loaded.
+ * Wait a little bit for it to die so that we avoid flooding
+ * an unresponsive backend when system is heavily loaded.
*/
if (pid != 0)
pg_usleep(5000L);
@@ -286,7 +321,7 @@ void
ResolveRecoveryConflictWithDatabase(Oid dbid)
{
/*
- * We don't do ResolveRecoveryConflictWithVirutalXIDs() here since that
+ * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
* only waits for transactions and completely idle sessions would block
* us. This is rare enough that we do this as simply as possible: no wait,
* just force them off immediately.
@@ -355,12 +390,11 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
* the limit of our patience. The sleep in LockBufferForCleanup() is
* performed here, for code clarity.
*
- * Resolve conflict by sending a SIGUSR1 reason to all backends to check if
+ * Resolve conflicts by sending a PROCSIG signal to all backends to check if
* they hold one of the buffer pins that is blocking Startup process. If so,
* backends will take an appropriate error action, ERROR or FATAL.
*
- * We also check for deadlocks before we wait, though applications that cause
- * these will be extremely rare. Deadlocks occur because if queries
+ * We also must check for deadlocks. Deadlocks occur because if queries
* wait on a lock, that must be behind an AccessExclusiveLock, which can only
* be cleared if the Startup process replays a transaction completion record.
* If Startup process is also waiting then that is a deadlock. The deadlock
@@ -368,66 +402,51 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
* Startup is sleeping and the query waits on a lock. We protect against
* only the former sequence here, the latter sequence is checked prior to
* the query sleeping, in CheckRecoveryConflictDeadlock().
+ *
+ * Deadlocks are extremely rare, and relatively expensive to check for,
+ * so we don't do a deadlock check right away ... only if we have had to wait
+ * at least deadlock_timeout. Most of the logic about that is in proc.c.
*/
void
ResolveRecoveryConflictWithBufferPin(void)
{
bool sig_alarm_enabled = false;
+ TimestampTz ltime;
+ TimestampTz now;
Assert(InHotStandby);
- if (MaxStandbyDelay == 0)
- {
- /*
- * We don't want to wait, so just tell everybody holding the pin to
- * get out of town.
- */
- SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
- }
- else if (MaxStandbyDelay < 0)
- {
- TimestampTz now = GetCurrentTimestamp();
+ ltime = GetStandbyLimitTime();
+ now = GetCurrentTimestamp();
+ if (!ltime)
+ {
/*
- * Set timeout for deadlock check (only)
+ * We're willing to wait forever for conflicts, so set timeout for
+ * deadlock check (only)
*/
if (enable_standby_sig_alarm(now, now, true))
sig_alarm_enabled = true;
else
elog(FATAL, "could not set timer for process wakeup");
}
+ else if (now >= ltime)
+ {
+ /*
+ * We're already behind, so clear a path as quickly as possible.
+ */
+ SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
+ }
else
{
- TimestampTz then = GetLatestXLogTime();
- TimestampTz now = GetCurrentTimestamp();
-
- /* Are we past max_standby_delay? */
- if (TimestampDifferenceExceeds(then, now, MaxStandbyDelay))
- {
- /*
- * We're already behind, so clear a path as quickly as possible.
- */
- SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
- }
+ /*
+ * Wake up at ltime, and check for deadlocks as well if we will be
+ * waiting longer than deadlock_timeout
+ */
+ if (enable_standby_sig_alarm(now, ltime, false))
+ sig_alarm_enabled = true;
else
- {
- TimestampTz max_standby_time;
-
- /*
- * At what point in the future do we hit MaxStandbyDelay?
- */
- max_standby_time = TimestampTzPlusMilliseconds(then, MaxStandbyDelay);
- Assert(max_standby_time > now);
-
- /*
- * Wake up at MaxStandby delay, and check for deadlocks as well
- * if we will be waiting longer than deadlock_timeout
- */
- if (enable_standby_sig_alarm(now, max_standby_time, false))
- sig_alarm_enabled = true;
- else
- elog(FATAL, "could not set timer for process wakeup");
- }
+ elog(FATAL, "could not set timer for process wakeup");
}
/* Wait to be signaled by UnpinBuffer() */
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index d7eb8695038..2d77be0a34e 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.219 2010/05/26 19:52:52 sriggs Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.220 2010/07/03 20:43:58 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -1627,12 +1627,13 @@ handle_sig_alarm(SIGNAL_ARGS)
bool
enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_only)
{
- TimestampTz deadlock_time = TimestampTzPlusMilliseconds(now, DeadlockTimeout);
+ TimestampTz deadlock_time = TimestampTzPlusMilliseconds(now,
+ DeadlockTimeout);
if (deadlock_only)
{
/*
- * Wake up at DeadlockTimeout only, then wait forever
+ * Wake up at deadlock_time only, then wait forever
*/
statement_fin_time = deadlock_time;
deadlock_timeout_active = true;
@@ -1641,7 +1642,7 @@ enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_on
else if (fin_time > deadlock_time)
{
/*
- * Wake up at DeadlockTimeout, then again at MaxStandbyDelay
+ * Wake up at deadlock_time, then again at fin_time
*/
statement_fin_time = deadlock_time;
statement_fin_time2 = fin_time;
@@ -1651,7 +1652,7 @@ enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_on
else
{
/*
- * Wake only at MaxStandbyDelay because its fairly soon
+ * Wake only at fin_time because its fairly soon
*/
statement_fin_time = fin_time;
deadlock_timeout_active = false;
@@ -1729,15 +1730,16 @@ CheckStandbyTimeout(void)
if (deadlock_timeout_active)
{
/*
- * We're still waiting when we reach DeadlockTimeout, so send out a request
- * to have other backends check themselves for deadlock. Then continue
- * waiting until MaxStandbyDelay.
+ * We're still waiting when we reach deadlock timeout, so send out
+ * a request to have other backends check themselves for
+ * deadlock. Then continue waiting until statement_fin_time,
+ * if that's set.
*/
SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
deadlock_timeout_active = false;
/*
- * Begin second waiting period to MaxStandbyDelay if required.
+ * Begin second waiting period if required.
*/
if (statement_timeout_active)
{
@@ -1748,8 +1750,8 @@ CheckStandbyTimeout(void)
else
{
/*
- * We've now reached MaxStandbyDelay, so ask all conflicts to leave, cos
- * its time for us to press ahead with applying changes in recovery.
+ * We've now reached statement_fin_time, so ask all conflicts to
+ * leave, so we can press ahead with applying changes in recovery.
*/
SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 73305486569..f2caad2cd9a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -10,7 +10,7 @@
* Written by Peter Eisentraut <peter_e@gmx.net>.
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.557 2010/06/25 13:11:25 sriggs Exp $
+ * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.558 2010/07/03 20:43:58 tgl Exp $
*
*--------------------------------------------------------------------
*/
@@ -57,6 +57,7 @@
#include "postmaster/walwriter.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
+#include "storage/standby.h"
#include "storage/fd.h"
#include "tcop/tcopprot.h"
#include "tsearch/ts_cache.h"
@@ -116,7 +117,6 @@ extern char *default_tablespace;
extern char *temp_tablespaces;
extern bool synchronize_seqscans;
extern bool fullPageWrites;
-extern int vacuum_defer_cleanup_age;
extern int ssl_renegotiation_limit;
#ifdef TRACE_SORT
@@ -1373,6 +1373,26 @@ static struct config_int ConfigureNamesInt[] =
1000, 1, INT_MAX / 1000, NULL, NULL
},
+ {
+ {"max_standby_archive_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+ gettext_noop("Sets the maximum delay before canceling queries when a hot standby server is processing archived WAL data."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &max_standby_archive_delay,
+ 30 * 1000, -1, INT_MAX / 1000, NULL, NULL
+ },
+
+ {
+ {"max_standby_streaming_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+ gettext_noop("Sets the maximum delay before canceling queries when a hot standby server is processing streamed WAL data."),
+ NULL,
+ GUC_UNIT_MS
+ },
+ &max_standby_streaming_delay,
+ 30 * 1000, -1, INT_MAX / 1000, NULL, NULL
+ },
+
/*
* Note: MaxBackends is limited to INT_MAX/4 because some places compute
* 4*MaxBackends without any overflow check. This check is made in
@@ -1393,16 +1413,6 @@ static struct config_int ConfigureNamesInt[] =
},
{
- {"max_standby_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
- gettext_noop("Sets the maximum delay to avoid conflict processing on hot standby servers."),
- NULL,
- GUC_UNIT_MS
- },
- &MaxStandbyDelay,
- 30 * 1000, -1, INT_MAX / 1000, NULL, NULL
- },
-
- {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 5ea568a3551..e765664ecc0 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -186,15 +186,19 @@
# - Streaming Replication -
#max_wal_senders = 0 # max number of walsender processes
-#wal_sender_delay = 200ms # 1-10000 milliseconds
+#wal_sender_delay = 200ms # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
# - Standby Servers -
-#hot_standby = off # allows queries during recovery
-#max_standby_delay = 30s # max acceptable lag to allow queries to
- # complete without conflict; -1 means forever
-#vacuum_defer_cleanup_age = 0 # num transactions by which cleanup is deferred
+#hot_standby = off # "on" allows queries during recovery
+#max_standby_archive_delay = 30s # max delay before canceling queries
+ # when reading WAL from archive;
+ # -1 allows indefinite delay
+#max_standby_streaming_delay = 30s # max delay before canceling queries
+ # when reading streaming WAL;
+ # -1 allows indefinite delay
+#vacuum_defer_cleanup_age = 0 # number of transactions by which cleanup is deferred
#------------------------------------------------------------------------------
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cbadd7f91fb..27e7f404d8d 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.113 2010/06/17 16:41:25 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.114 2010/07/03 20:43:58 tgl Exp $
*/
#ifndef XLOG_H
#define XLOG_H
@@ -135,22 +135,25 @@ typedef struct XLogRecData
extern PGDLLIMPORT TimeLineID ThisTimeLineID; /* current TLI */
/*
- * Prior to 8.4, all activity during recovery was carried out by Startup
+ * Prior to 8.4, all activity during recovery was carried out by the startup
* process. This local variable continues to be used in many parts of the
- * code to indicate actions taken by RecoveryManagers. Other processes who
- * potentially perform work during recovery should check RecoveryInProgress()
- * see XLogCtl notes in xlog.c
+ * code to indicate actions taken by RecoveryManagers. Other processes that
+ * potentially perform work during recovery should check RecoveryInProgress().
+ * See XLogCtl notes in xlog.c.
*/
extern bool InRecovery;
/*
* Like InRecovery, standbyState is only valid in the startup process.
+ * In all other processes it will have the value STANDBY_DISABLED (so
+ * InHotStandby will read as FALSE).
*
* In DISABLED state, we're performing crash recovery or hot standby was
* disabled in recovery.conf.
*
- * In INITIALIZED state, we haven't yet received a RUNNING_XACTS or shutdown
- * checkpoint record to initialize our master transaction tracking system.
+ * In INITIALIZED state, we've run InitRecoveryTransactionEnvironment, but
+ * we haven't yet processed a RUNNING_XACTS or shutdown-checkpoint WAL record
+ * to initialize our master-transaction tracking system.
*
* When the transaction tracking is initialized, we enter the SNAPSHOT_PENDING
* state. The tracked information might still be incomplete, so we can't allow
@@ -168,6 +171,7 @@ typedef enum
STANDBY_SNAPSHOT_PENDING,
STANDBY_SNAPSHOT_READY
} HotStandbyState;
+
extern HotStandbyState standbyState;
#define InHotStandby (standbyState >= STANDBY_SNAPSHOT_PENDING)
@@ -193,7 +197,6 @@ extern int XLogArchiveTimeout;
extern bool XLogArchiveMode;
extern char *XLogArchiveCommand;
extern bool EnableHotStandby;
-extern int MaxStandbyDelay;
extern bool log_checkpoints;
/* WAL levels */
@@ -279,7 +282,7 @@ extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
extern bool RecoveryInProgress(void);
extern bool XLogInsertAllowed(void);
-extern TimestampTz GetLatestXLogTime(void);
+extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 5dcaeba3f33..734380ee4f8 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -5,7 +5,7 @@
*
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.9 2010/06/03 22:17:32 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.10 2010/07/03 20:43:58 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -41,25 +41,35 @@ typedef enum
typedef struct
{
/*
- * connection string; is used for walreceiver to connect with the primary.
- */
- char conninfo[MAXCONNINFO];
-
- /*
- * PID of currently active walreceiver process, and the current state.
+ * PID of currently active walreceiver process, its current state and
+ * start time (actually, the time at which it was requested to be started).
*/
pid_t pid;
WalRcvState walRcvState;
pg_time_t startTime;
/*
- * receivedUpto-1 is the last byte position that has been already
- * received. When startup process starts the walreceiver, it sets this to
- * the point where it wants the streaming to begin. After that,
- * walreceiver updates this whenever it flushes the received WAL.
+ * receivedUpto-1 is the last byte position that has already been
+ * received. When startup process starts the walreceiver, it sets
+ * receivedUpto to the point where it wants the streaming to begin.
+ * After that, walreceiver updates this whenever it flushes the received
+ * WAL to disk.
*/
XLogRecPtr receivedUpto;
+ /*
+ * latestChunkStart is the starting byte position of the current "batch"
+ * of received WAL. It's actually the same as the previous value of
+ * receivedUpto before the last flush to disk. Startup process can use
+ * this to detect whether it's keeping up or not.
+ */
+ XLogRecPtr latestChunkStart;
+
+ /*
+ * connection string; is used for walreceiver to connect with the primary.
+ */
+ char conninfo[MAXCONNINFO];
+
slock_t mutex; /* locks shared variables shown above */
} WalRcvData;
@@ -83,6 +93,6 @@ extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
-extern XLogRecPtr GetWalRcvWriteRecPtr(void);
+extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
#endif /* _WALRECEIVER_H */
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 9159301a168..0654c5bcccf 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/include/storage/standby.h,v 1.10 2010/05/13 11:15:38 sriggs Exp $
+ * $PostgreSQL: pgsql/src/include/storage/standby.h,v 1.11 2010/07/03 20:43:58 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -19,7 +19,10 @@
#include "storage/procsignal.h"
#include "storage/relfilenode.h"
+/* User-settable GUC parameters */
extern int vacuum_defer_cleanup_age;
+extern int max_standby_archive_delay;
+extern int max_standby_streaming_delay;
extern void InitRecoveryTransactionEnvironment(void);
extern void ShutdownRecoveryTransactionEnvironment(void);
@@ -83,7 +86,7 @@ extern void standby_desc(StringInfo buf, uint8 xl_info, char *rec);
/*
* Declarations for GetRunningTransactionData(). Similar to Snapshots, but
* not quite. This has nothing at all to do with visibility on this server,
- * so this is completely separate from snapmgr.c and snapmgr.h
+ * so this is completely separate from snapmgr.c and snapmgr.h.
* This data is important for creating the initial snapshot state on a
* standby server. We need lots more information than a normal snapshot,
* hence we use a specific data structure for our needs. This data