about | summary | refs | log | tree | commit | diff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- src/backend/access/transam/xlog.c 119
1 files changed, 115 insertions, 4 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6fa5479c92b..a76aef37f3d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -218,6 +218,8 @@ static bool recoveryPauseAtTarget = true;
static TransactionId recoveryTargetXid;
static TimestampTz recoveryTargetTime;
static char *recoveryTargetName;
+static int min_recovery_apply_delay = 0;	/* parsed from the "min_recovery_apply_delay" recovery parameter with GUC_UNIT_MS; 0 means no delay is applied */
+static TimestampTz recoveryDelayUntilTime;	/* set by SetRecoveryDelayUntilTime(); recoveryApplyDelay() waits until this time */
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyModeRequested = false;
@@ -728,8 +730,10 @@ static bool holdingAllSlots = false;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
-static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
+static bool recoveryStopsHere(XLogRecord *record, bool *includeThis, bool *delayThis);
static void recoveryPausesHere(void);
+static void recoveryApplyDelay(void);
+static bool SetRecoveryDelayUntilTime(TimestampTz xtime);
static void SetLatestXTime(TimestampTz xtime);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
@@ -5476,6 +5480,19 @@ readRecoveryCommandFile(void)
(errmsg_internal("trigger_file = '%s'",
TriggerFile)));
}
+ else if (strcmp(item->name, "min_recovery_apply_delay") == 0)
+ {
+ const char *hintmsg;
+
+ if (!parse_int(item->value, &min_recovery_apply_delay, GUC_UNIT_MS,
+ &hintmsg))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("parameter \"%s\" requires a temporal value", "min_recovery_apply_delay"),
+ hintmsg ? errhint("%s", _(hintmsg)) : 0));
+ ereport(DEBUG2,
+ (errmsg("min_recovery_apply_delay = '%s'", item->value)));
+ }
else
ereport(FATAL,
(errmsg("unrecognized recovery parameter \"%s\"",
@@ -5625,10 +5642,11 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
* We also track the timestamp of the latest applied COMMIT/ABORT
* record in XLogCtl->recoveryLastXTime, for logging purposes.
* Also, some information is saved in recoveryStopXid et al for use in
- * annotating the new timeline's history file.
+ * annotating the new timeline's history file; and recoveryDelayUntilTime
+ * is updated, for time-delayed standbys.
*/
static bool
-recoveryStopsHere(XLogRecord *record, bool *includeThis)
+recoveryStopsHere(XLogRecord *record, bool *includeThis, bool *delayThis)
{
bool stopsHere;
uint8 record_info;
@@ -5645,6 +5663,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record);
recordXtime = recordXactCommitData->xact_time;
+
+ *delayThis = SetRecoveryDelayUntilTime(recordXactCommitData->xact_time);
}
else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
{
@@ -5652,6 +5672,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
recordXtime = recordXactCommitData->xact_time;
+
+ *delayThis = SetRecoveryDelayUntilTime(recordXactCommitData->xact_time);
}
else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
{
@@ -5659,6 +5681,13 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
recordXtime = recordXactAbortData->xact_time;
+
+ /*
+ * We deliberately choose not to delay aborts since they have no
+ * effect on MVCC. We already allow replay of records that don't
+ * have a timestamp, so there is already opportunity for issues
+ * caused by early conflicts on standbys.
+ */
}
else if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
{
@@ -5667,6 +5696,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recordRestorePointData = (xl_restore_point *) XLogRecGetData(record);
recordXtime = recordRestorePointData->rp_time;
strncpy(recordRPName, recordRestorePointData->rp_name, MAXFNAMELEN);
+
+ *delayThis = SetRecoveryDelayUntilTime(recordRestorePointData->rp_time);
}
else
return false;
@@ -5833,6 +5864,66 @@ SetRecoveryPause(bool recoveryPause)
SpinLockRelease(&xlogctl->info_lck);
}
+/*
+ * Record the time until which replay of the current record must be held back.
+ *
+ * xtime is the timestamp carried by the WAL record being examined.  When
+ * min_recovery_apply_delay is zero, recoveryDelayUntilTime is left untouched
+ * and false is returned.  Otherwise recoveryDelayUntilTime becomes xtime
+ * advanced by min_recovery_apply_delay (via TimestampTzPlusMilliseconds) and
+ * true is returned, telling the caller that a delay applies to this record.
+ */
+static bool
+SetRecoveryDelayUntilTime(TimestampTz xtime)
+{
+	/* No delay configured: nothing to compute. */
+	if (min_recovery_apply_delay == 0)
+		return false;
+
+	recoveryDelayUntilTime =
+		TimestampTzPlusMilliseconds(xtime, min_recovery_apply_delay);
+	return true;
+}
+
+/*
+ * When min_recovery_apply_delay is set, we wait long enough to make sure
+ * certain record types are applied at least that interval behind the master.
+ * See recoveryStopsHere().
+ *
+ * Note that the delay is calculated between the WAL record log time and
+ * the current time on standby. We would prefer to keep track of when this
+ * standby received each WAL record, which would allow a more consistent
+ * approach and one not affected by time synchronisation issues, but that
+ * is significantly more effort and complexity for little actual gain in
+ * usability.
+ *
+ * The function loops until recoveryDelayUntilTime has been reached (to
+ * millisecond resolution) or CheckForStandbyTrigger() reports a trigger.
+ * Each iteration resets the recovery wakeup latch, services startup-process
+ * interrupts, and then sleeps on the latch for the remaining time.
+ */
+static void
+recoveryApplyDelay(void)
+{
+	while (true)
+	{
+		long		secs;
+		int			microsecs;
+		long		wait_ms;
+
+		ResetLatch(&XLogCtl->recoveryWakeupLatch);
+
+		/* might change the trigger file's location */
+		HandleStartupProcInterrupts();
+
+		if (CheckForStandbyTrigger())
+			break;
+
+		/*
+		 * Wait for difference between GetCurrentTimestamp() and
+		 * recoveryDelayUntilTime
+		 */
+		TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
+							&secs, &microsecs);
+
+		/*
+		 * The latch timeout has millisecond resolution, so a remaining wait
+		 * of less than one millisecond would turn into a zero timeout and
+		 * make this loop spin without sleeping.  Treat such a remainder as
+		 * "delay satisfied" instead.
+		 */
+		wait_ms = secs * 1000L + microsecs / 1000;
+		if (wait_ms <= 0)
+			break;
+
+		elog(DEBUG2, "recovery apply delay %ld seconds, %d milliseconds",
+			 secs, microsecs / 1000);
+
+		WaitLatch(&XLogCtl->recoveryWakeupLatch,
+				  WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+				  wait_ms);
+	}
+}
+
/*
* Save timestamp of latest processed commit/abort record.
*
@@ -6660,6 +6751,7 @@ StartupXLOG(void)
{
bool recoveryContinue = true;
bool recoveryApply = true;
+ bool recoveryDelay = false;
ErrorContextCallback errcallback;
TimestampTz xtime;
@@ -6719,7 +6811,7 @@ StartupXLOG(void)
/*
* Have we reached our recovery target?
*/
- if (recoveryStopsHere(record, &recoveryApply))
+ if (recoveryStopsHere(record, &recoveryApply, &recoveryDelay))
{
if (recoveryPauseAtTarget)
{
@@ -6734,6 +6826,25 @@ StartupXLOG(void)
break;
}
+ /*
+ * If we've been asked to lag the master, wait on
+ * latch until enough time has passed.
+ */
+ if (recoveryDelay)
+ {
+ recoveryApplyDelay();
+
+ /*
+ * We test for paused recovery again here. If
+ * user sets delayed apply, it may be because
+ * they expect to pause recovery in case of
+ * problems, so we must test again here otherwise
+ * pausing during the delay-wait wouldn't work.
+ */
+ if (xlogctl->recoveryPause)
+ recoveryPausesHere();
+ }
+
/* Setup error traceback support for ereport() */
errcallback.callback = rm_redo_error_callback;
errcallback.arg = (void *) record;