@ -218,6 +218,8 @@ static bool recoveryPauseAtTarget = true;
static TransactionId recoveryTargetXid ;
static TimestampTz recoveryTargetTime ;
static char * recoveryTargetName ;
static int min_recovery_apply_delay = 0 ;
static TimestampTz recoveryDelayUntilTime ;
/* options taken from recovery.conf for XLOG streaming */
static bool StandbyModeRequested = false ;
@ -728,8 +730,10 @@ static bool holdingAllSlots = false;
static void readRecoveryCommandFile ( void ) ;
static void exitArchiveRecovery ( TimeLineID endTLI , XLogSegNo endLogSegNo ) ;
static bool recoveryStopsHere ( XLogRecord * record , bool * includeThis ) ;
static bool recoveryStopsHere ( XLogRecord * record , bool * includeThis , bool * delayThis ) ;
static void recoveryPausesHere ( void ) ;
static void recoveryApplyDelay ( void ) ;
static bool SetRecoveryDelayUntilTime ( TimestampTz xtime ) ;
static void SetLatestXTime ( TimestampTz xtime ) ;
static void SetCurrentChunkStartTime ( TimestampTz xtime ) ;
static void CheckRequiredParameterValues ( void ) ;
@ -5476,6 +5480,19 @@ readRecoveryCommandFile(void)
( errmsg_internal ( " trigger_file = '%s' " ,
TriggerFile ) ) ) ;
}
else if ( strcmp ( item - > name , " min_recovery_apply_delay " ) = = 0 )
{
const char * hintmsg ;
if ( ! parse_int ( item - > value , & min_recovery_apply_delay , GUC_UNIT_MS ,
& hintmsg ) )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " parameter \" %s \" requires a temporal value " , " min_recovery_apply_delay " ) ,
hintmsg ? errhint ( " %s " , _ ( hintmsg ) ) : 0 ) ) ;
ereport ( DEBUG2 ,
( errmsg ( " min_recovery_apply_delay = '%s' " , item - > value ) ) ) ;
}
else
ereport ( FATAL ,
( errmsg ( " unrecognized recovery parameter \" %s \" " ,
@ -5625,10 +5642,11 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
* We also track the timestamp of the latest applied COMMIT / ABORT
* record in XLogCtl - > recoveryLastXTime , for logging purposes .
* Also , some information is saved in recoveryStopXid et al for use in
* annotating the new timeline ' s history file .
* annotating the new timeline ' s history file ; and recoveryDelayUntilTime
* is updated , for time - delayed standbys .
*/
static bool
recoveryStopsHere ( XLogRecord * record , bool * includeThis )
recoveryStopsHere ( XLogRecord * record , bool * includeThis , bool * delayThis )
{
bool stopsHere ;
uint8 record_info ;
@ -5645,6 +5663,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recordXactCommitData = ( xl_xact_commit_compact * ) XLogRecGetData ( record ) ;
recordXtime = recordXactCommitData - > xact_time ;
* delayThis = SetRecoveryDelayUntilTime ( recordXactCommitData - > xact_time ) ;
}
else if ( record - > xl_rmid = = RM_XACT_ID & & record_info = = XLOG_XACT_COMMIT )
{
@ -5652,6 +5672,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recordXactCommitData = ( xl_xact_commit * ) XLogRecGetData ( record ) ;
recordXtime = recordXactCommitData - > xact_time ;
* delayThis = SetRecoveryDelayUntilTime ( recordXactCommitData - > xact_time ) ;
}
else if ( record - > xl_rmid = = RM_XACT_ID & & record_info = = XLOG_XACT_ABORT )
{
@ -5659,6 +5681,13 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recordXactAbortData = ( xl_xact_abort * ) XLogRecGetData ( record ) ;
recordXtime = recordXactAbortData - > xact_time ;
/*
* We deliberately choose not to delay aborts since they have no
* effect on MVCC . We already allow replay of records that don ' t
* have a timestamp , so there is already opportunity for issues
* caused by early conflicts on standbys .
*/
}
else if ( record - > xl_rmid = = RM_XLOG_ID & & record_info = = XLOG_RESTORE_POINT )
{
@ -5667,6 +5696,8 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
recordRestorePointData = ( xl_restore_point * ) XLogRecGetData ( record ) ;
recordXtime = recordRestorePointData - > rp_time ;
strncpy ( recordRPName , recordRestorePointData - > rp_name , MAXFNAMELEN ) ;
* delayThis = SetRecoveryDelayUntilTime ( recordRestorePointData - > rp_time ) ;
}
else
return false ;
@ -5833,6 +5864,66 @@ SetRecoveryPause(bool recoveryPause)
SpinLockRelease ( & xlogctl - > info_lck ) ;
}
static bool
SetRecoveryDelayUntilTime ( TimestampTz xtime )
{
if ( min_recovery_apply_delay ! = 0 )
{
recoveryDelayUntilTime =
TimestampTzPlusMilliseconds ( xtime , min_recovery_apply_delay ) ;
return true ;
}
return false ;
}
/*
* When min_recovery_apply_delay is set , we wait long enough to make sure
* certain record types are applied at least that interval behind the master .
* See recoveryStopsHere ( ) .
*
* Note that the delay is calculated between the WAL record log time and
* the current time on standby . We would prefer to keep track of when this
* standby received each WAL record , which would allow a more consistent
* approach and one not affected by time synchronisation issues , but that
* is significantly more effort and complexity for little actual gain in
* usability .
*/
static void
recoveryApplyDelay ( void )
{
while ( true )
{
long secs ;
int microsecs ;
ResetLatch ( & XLogCtl - > recoveryWakeupLatch ) ;
/* might change the trigger file's location */
HandleStartupProcInterrupts ( ) ;
if ( CheckForStandbyTrigger ( ) )
break ;
/*
* Wait for difference between GetCurrentTimestamp ( ) and
* recoveryDelayUntilTime
*/
TimestampDifference ( GetCurrentTimestamp ( ) , recoveryDelayUntilTime ,
& secs , & microsecs ) ;
if ( secs < = 0 & & microsecs < = 0 )
break ;
elog ( DEBUG2 , " recovery apply delay %ld seconds, %d milliseconds " ,
secs , microsecs / 1000 ) ;
WaitLatch ( & XLogCtl - > recoveryWakeupLatch ,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
secs * 1000L + microsecs / 1000 ) ;
}
}
/*
* Save timestamp of latest processed commit / abort record .
*
@ -6660,6 +6751,7 @@ StartupXLOG(void)
{
bool recoveryContinue = true ;
bool recoveryApply = true ;
bool recoveryDelay = false ;
ErrorContextCallback errcallback ;
TimestampTz xtime ;
@ -6719,7 +6811,7 @@ StartupXLOG(void)
/*
* Have we reached our recovery target ?
*/
if ( recoveryStopsHere ( record , & recoveryApply ) )
if ( recoveryStopsHere ( record , & recoveryApply , & recoveryDelay ) )
{
if ( recoveryPauseAtTarget )
{
@ -6734,6 +6826,25 @@ StartupXLOG(void)
break ;
}
/*
* If we ' ve been asked to lag the master , wait on
* latch until enough time has passed .
*/
if ( recoveryDelay )
{
recoveryApplyDelay ( ) ;
/*
* We test for paused recovery again here . If
* user sets delayed apply , it may be because
* they expect to pause recovery in case of
* problems , so we must test again here otherwise
* pausing during the delay - wait wouldn ' t work .
*/
if ( xlogctl - > recoveryPause )
recoveryPausesHere ( ) ;
}
/* Setup error traceback support for ereport() */
errcallback . callback = rm_redo_error_callback ;
errcallback . arg = ( void * ) record ;