diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 119 |
1 files changed, 115 insertions, 4 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6fa5479c92b..a76aef37f3d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -218,6 +218,8 @@ static bool recoveryPauseAtTarget = true; static TransactionId recoveryTargetXid; static TimestampTz recoveryTargetTime; static char *recoveryTargetName; +static int min_recovery_apply_delay = 0; +static TimestampTz recoveryDelayUntilTime; /* options taken from recovery.conf for XLOG streaming */ static bool StandbyModeRequested = false; @@ -728,8 +730,10 @@ static bool holdingAllSlots = false; static void readRecoveryCommandFile(void); static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo); -static bool recoveryStopsHere(XLogRecord *record, bool *includeThis); +static bool recoveryStopsHere(XLogRecord *record, bool *includeThis, bool *delayThis); static void recoveryPausesHere(void); +static void recoveryApplyDelay(void); +static bool SetRecoveryDelayUntilTime(TimestampTz xtime); static void SetLatestXTime(TimestampTz xtime); static void SetCurrentChunkStartTime(TimestampTz xtime); static void CheckRequiredParameterValues(void); @@ -5476,6 +5480,19 @@ readRecoveryCommandFile(void) (errmsg_internal("trigger_file = '%s'", TriggerFile))); } + else if (strcmp(item->name, "min_recovery_apply_delay") == 0) + { + const char *hintmsg; + + if (!parse_int(item->value, &min_recovery_apply_delay, GUC_UNIT_MS, + &hintmsg)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("parameter \"%s\" requires a temporal value", "min_recovery_apply_delay"), + hintmsg ? errhint("%s", _(hintmsg)) : 0)); + ereport(DEBUG2, + (errmsg("min_recovery_apply_delay = '%s'", item->value))); + } else ereport(FATAL, (errmsg("unrecognized recovery parameter \"%s\"", @@ -5625,10 +5642,11 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo) * We also track the timestamp of the latest applied COMMIT/ABORT * record in XLogCtl->recoveryLastXTime, for logging purposes. * Also, some information is saved in recoveryStopXid et al for use in - * annotating the new timeline's history file. + * annotating the new timeline's history file; and recoveryDelayUntilTime + * is updated, for time-delayed standbys. */ static bool -recoveryStopsHere(XLogRecord *record, bool *includeThis) +recoveryStopsHere(XLogRecord *record, bool *includeThis, bool *delayThis) { bool stopsHere; uint8 record_info; @@ -5645,6 +5663,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis) recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record); recordXtime = recordXactCommitData->xact_time; + + *delayThis = SetRecoveryDelayUntilTime(recordXactCommitData->xact_time); } else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT) { @@ -5652,6 +5672,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis) recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record); recordXtime = recordXactCommitData->xact_time; + + *delayThis = SetRecoveryDelayUntilTime(recordXactCommitData->xact_time); } else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT) { @@ -5659,6 +5681,13 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis) recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record); recordXtime = recordXactAbortData->xact_time; + + /* + * We deliberately choose not to delay aborts since they have no + * effect on MVCC. We already allow replay of records that don't + * have a timestamp, so there is already opportunity for issues + * caused by early conflicts on standbys. + */ } else if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT) { @@ -5667,6 +5696,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis) recordRestorePointData = (xl_restore_point *) XLogRecGetData(record); recordXtime = recordRestorePointData->rp_time; strncpy(recordRPName, recordRestorePointData->rp_name, MAXFNAMELEN); + + *delayThis = SetRecoveryDelayUntilTime(recordRestorePointData->rp_time); } else return false; @@ -5833,6 +5864,66 @@ SetRecoveryPause(bool recoveryPause) SpinLockRelease(&xlogctl->info_lck); } +static bool +SetRecoveryDelayUntilTime(TimestampTz xtime) +{ + if (min_recovery_apply_delay != 0) + { + recoveryDelayUntilTime = + TimestampTzPlusMilliseconds(xtime, min_recovery_apply_delay); + + return true; + } + + return false; +} +/* + * When min_recovery_apply_delay is set, we wait long enough to make sure + * certain record types are applied at least that interval behind the master. + * See recoveryStopsHere(). + * + * Note that the delay is calculated between the WAL record log time and + * the current time on standby. We would prefer to keep track of when this + * standby received each WAL record, which would allow a more consistent + * approach and one not affected by time synchronisation issues, but that + * is significantly more effort and complexity for little actual gain in + * usability. + */ +static void +recoveryApplyDelay(void) +{ + while (true) + { + long secs; + int microsecs; + + ResetLatch(&XLogCtl->recoveryWakeupLatch); + + /* might change the trigger file's location */ + HandleStartupProcInterrupts(); + + if (CheckForStandbyTrigger()) + break; + + /* + * Wait for difference between GetCurrentTimestamp() and + * recoveryDelayUntilTime + */ + TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime, + &secs, µsecs); + + if (secs <= 0 && microsecs <=0) + break; + + elog(DEBUG2, "recovery apply delay %ld seconds, %d milliseconds", + secs, microsecs / 1000); + + WaitLatch(&XLogCtl->recoveryWakeupLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + secs * 1000L + microsecs / 1000); + } +} + /* * Save timestamp of latest processed commit/abort record. * @@ -6660,6 +6751,7 @@ StartupXLOG(void) { bool recoveryContinue = true; bool recoveryApply = true; + bool recoveryDelay = false; ErrorContextCallback errcallback; TimestampTz xtime; @@ -6719,7 +6811,7 @@ StartupXLOG(void) /* * Have we reached our recovery target? */ - if (recoveryStopsHere(record, &recoveryApply)) + if (recoveryStopsHere(record, &recoveryApply, &recoveryDelay)) { if (recoveryPauseAtTarget) { @@ -6734,6 +6826,25 @@ StartupXLOG(void) break; } + /* + * If we've been asked to lag the master, wait on + * latch until enough time has passed. + */ + if (recoveryDelay) + { + recoveryApplyDelay(); + + /* + * We test for paused recovery again here. If + * user sets delayed apply, it may be because + * they expect to pause recovery in case of + * problems, so we must test again here otherwise + * pausing during the delay-wait wouldn't work. + */ + if (xlogctl->recoveryPause) + recoveryPausesHere(); + } + /* Setup error traceback support for ereport() */ errcallback.callback = rm_redo_error_callback; errcallback.arg = (void *) record; |