@ -36,6 +36,7 @@
# include "access/xact.h"
# include "access/xlog_internal.h"
# include "access/xlogarchive.h"
# include "access/xlogprefetcher.h"
# include "access/xlogreader.h"
# include "access/xlogrecovery.h"
# include "access/xlogutils.h"
@ -183,6 +184,9 @@ static bool doRequestWalReceiverReply;
/* XLogReader object used to parse the WAL records */
static XLogReaderState * xlogreader = NULL ;
/* XLogPrefetcher object used to consume WAL records with read-ahead */
static XLogPrefetcher * xlogprefetcher = NULL ;
/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
typedef struct XLogPageReadPrivate
{
@ -404,18 +408,21 @@ static void recoveryPausesHere(bool endOfRecovery);
static bool recoveryApplyDelay ( XLogReaderState * record ) ;
static void ConfirmRecoveryPaused ( void ) ;
static XLogRecord * ReadRecord ( XLogReaderState * xlogreader ,
int emode , bool fetching_ckpt , TimeLineID replayTLI ) ;
static XLogRecord * ReadRecord ( XLogPrefetcher * xlogprefetcher ,
int emode , bool fetching_ckpt ,
TimeLineID replayTLI ) ;
static int XLogPageRead ( XLogReaderState * xlogreader , XLogRecPtr targetPagePtr ,
int reqLen , XLogRecPtr targetRecPtr , char * readBuf ) ;
static bool WaitForWALToBecomeAvailable ( XLogRecPtr RecPtr , bool randAccess ,
bool fetching_ckpt ,
XLogRecPtr tliRecPtr ,
TimeLineID replayTLI ,
XLogRecPtr replayLSN ) ;
static XLogPageReadResult WaitForWALToBecomeAvailable ( XLogRecPtr RecPtr ,
bool randAccess ,
bool fetching_ckpt ,
XLogRecPtr tliRecPtr ,
TimeLineID replayTLI ,
XLogRecPtr replayLSN ,
bool nonblocking ) ;
static int emode_for_corrupt_record ( int emode , XLogRecPtr RecPtr ) ;
static XLogRecord * ReadCheckpointRecord ( XLogReaderState * xlogreader , XLogRecPtr RecPtr ,
static XLogRecord * ReadCheckpointRecord ( XLogPrefetcher * xlogprefetch er , XLogRecPtr RecPtr ,
int whichChkpt , bool report , TimeLineID replayTLI ) ;
static bool rescanLatestTimeLine ( TimeLineID replayTLI , XLogRecPtr replayLSN ) ;
static int XLogFileRead ( XLogSegNo segno , int emode , TimeLineID tli ,
@ -561,6 +568,15 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
errdetail ( " Failed while allocating a WAL reading processor. " ) ) ) ;
xlogreader - > system_identifier = ControlFile - > system_identifier ;
/*
* Set the WAL decode buffer size . This limits how far ahead we can read
* in the WAL .
*/
XLogReaderSetDecodeBuffer ( xlogreader , NULL , wal_decode_buffer_size ) ;
/* Create a WAL prefetcher. */
xlogprefetcher = XLogPrefetcherAllocate ( xlogreader ) ;
/*
* Allocate two page buffers dedicated to WAL consistency checks . We do
* it this way , rather than just making static arrays , for two reasons :
@ -589,7 +605,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
* When a backup_label file is present , we want to roll forward from
* the checkpoint it identifies , rather than using pg_control .
*/
record = ReadCheckpointRecord ( xlogreader , CheckPointLoc , 0 , true , CheckPointTLI ) ;
record = ReadCheckpointRecord ( xlogprefetcher , CheckPointLoc , 0 , true ,
CheckPointTLI ) ;
if ( record ! = NULL )
{
memcpy ( & checkPoint , XLogRecGetData ( xlogreader ) , sizeof ( CheckPoint ) ) ;
@ -607,8 +624,8 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
*/
if ( checkPoint . redo < CheckPointLoc )
{
XLogBeginRead ( xlogread er , checkPoint . redo ) ;
if ( ! ReadRecord ( xlogread er , LOG , false ,
XLogPrefetcher BeginRead ( xlogprefetch er , checkPoint . redo ) ;
if ( ! ReadRecord ( xlogprefetch er , LOG , false ,
checkPoint . ThisTimeLineID ) )
ereport ( FATAL ,
( errmsg ( " could not find redo location referenced by checkpoint record " ) ,
@ -727,7 +744,7 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
CheckPointTLI = ControlFile - > checkPointCopy . ThisTimeLineID ;
RedoStartLSN = ControlFile - > checkPointCopy . redo ;
RedoStartTLI = ControlFile - > checkPointCopy . ThisTimeLineID ;
record = ReadCheckpointRecord ( xlogread er , CheckPointLoc , 1 , true ,
record = ReadCheckpointRecord ( xlogprefetch er , CheckPointLoc , 1 , true ,
CheckPointTLI ) ;
if ( record ! = NULL )
{
@ -1413,8 +1430,8 @@ FinishWalRecovery(void)
lastRec = XLogRecoveryCtl - > lastReplayedReadRecPtr ;
lastRecTLI = XLogRecoveryCtl - > lastReplayedTLI ;
}
XLogBeginRead ( xlogread er , lastRec ) ;
( void ) ReadRecord ( xlogread er , PANIC , false , lastRecTLI ) ;
XLogPrefetcher BeginRead ( xlogprefetch er , lastRec ) ;
( void ) ReadRecord ( xlogprefetch er , PANIC , false , lastRecTLI ) ;
endOfLog = xlogreader - > EndRecPtr ;
/*
@ -1503,6 +1520,9 @@ ShutdownWalRecovery(void)
{
char recoveryPath [ MAXPGPATH ] ;
/* Final update of pg_stat_recovery_prefetch. */
XLogPrefetcherComputeStats ( xlogprefetcher ) ;
/* Shut down xlogreader */
if ( readFile > = 0 )
{
@ -1510,6 +1530,7 @@ ShutdownWalRecovery(void)
readFile = - 1 ;
}
XLogReaderFree ( xlogreader ) ;
XLogPrefetcherFree ( xlogprefetcher ) ;
if ( ArchiveRecoveryRequested )
{
@ -1593,15 +1614,15 @@ PerformWalRecovery(void)
{
/* back up to find the record */
replayTLI = RedoStartTLI ;
XLogBeginRead ( xlogread er , RedoStartLSN ) ;
record = ReadRecord ( xlogread er , PANIC , false , replayTLI ) ;
XLogPrefetcher BeginRead ( xlogprefetch er , RedoStartLSN ) ;
record = ReadRecord ( xlogprefetch er , PANIC , false , replayTLI ) ;
}
else
{
/* just have to read next record after CheckPoint */
Assert ( xlogreader - > ReadRecPtr = = CheckPointLoc ) ;
replayTLI = CheckPointTLI ;
record = ReadRecord ( xlogread er , LOG , false , replayTLI ) ;
record = ReadRecord ( xlogprefetch er , LOG , false , replayTLI ) ;
}
if ( record ! = NULL )
@ -1710,7 +1731,7 @@ PerformWalRecovery(void)
}
/* Else, try to fetch the next WAL record */
record = ReadRecord ( xlogread er , LOG , false , replayTLI ) ;
record = ReadRecord ( xlogprefetch er , LOG , false , replayTLI ) ;
} while ( record ! = NULL ) ;
/*
@ -1921,6 +1942,9 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
*/
if ( AllowCascadeReplication ( ) )
WalSndWakeup ( ) ;
/* Reset the prefetcher. */
XLogPrefetchReconfigure ( ) ;
}
}
@ -2305,7 +2329,8 @@ verifyBackupPageConsistency(XLogReaderState *record)
* temporary page .
*/
buf = XLogReadBufferExtended ( rnode , forknum , blkno ,
RBM_NORMAL_NO_LOG ) ;
RBM_NORMAL_NO_LOG ,
InvalidBuffer ) ;
if ( ! BufferIsValid ( buf ) )
continue ;
@ -2917,17 +2942,18 @@ ConfirmRecoveryPaused(void)
* Attempt to read the next XLOG record .
*
* Before first call , the reader needs to be positioned to the first record
* by calling XLogBeginRead ( ) .
* by calling XLogPrefetcher BeginRead ( ) .
*
* If no valid record is available , returns NULL , or fails if emode is PANIC .
* ( emode must be either PANIC , LOG ) . In standby mode , retries until a valid
* record is available .
*/
static XLogRecord *
ReadRecord ( XLogReaderState * xlogread er , int emode ,
ReadRecord ( XLogPrefetcher * xlogprefetch er , int emode ,
bool fetching_ckpt , TimeLineID replayTLI )
{
XLogRecord * record ;
XLogReaderState * xlogreader = XLogPrefetcherGetReader ( xlogprefetcher ) ;
XLogPageReadPrivate * private = ( XLogPageReadPrivate * ) xlogreader - > private_data ;
/* Pass through parameters to XLogPageRead */
@ -2943,7 +2969,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
{
char * errormsg ;
record = XLogReadRecord ( xlogread er , & errormsg ) ;
record = XLogPrefetcher ReadRecord ( xlogprefetch er , & errormsg ) ;
if ( record = = NULL )
{
/*
@ -3056,9 +3082,12 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
/*
* Read the XLOG page containing RecPtr into readBuf ( if not read already ) .
* Returns number of bytes read , if the page is read successfully , or - 1
* in case of errors . When errors occur , they are ereport ' ed , but only
* if they have not been previously reported .
* Returns number of bytes read , if the page is read successfully , or
* XLREAD_FAIL in case of errors . When errors occur , they are ereport ' ed , but
* only if they have not been previously reported .
*
* While prefetching , xlogreader - > nonblocking may be set . In that case ,
* returns XLREAD_WOULDBLOCK if we ' d otherwise have to wait for more WAL .
*
* This is responsible for restoring files from archive as needed , as well
* as for waiting for the requested WAL record to arrive in standby mode .
@ -3066,7 +3095,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
* ' emode ' specifies the log level used for reporting " file not found " or
* " end of WAL " situations in archive recovery , or in standby mode when a
* trigger file is found . If set to WARNING or below , XLogPageRead ( ) returns
* false in those situations , on higher log levels the ereport ( ) won ' t
* XLREAD_FAIL in those situations , on higher log levels the ereport ( ) won ' t
* return .
*
* In standby mode , if after a successful return of XLogPageRead ( ) the
@ -3125,20 +3154,31 @@ retry:
( readSource = = XLOG_FROM_STREAM & &
flushedUpto < targetPagePtr + reqLen ) )
{
if ( ! WaitForWALToBecomeAvailable ( targetPagePtr + reqLen ,
private - > randAccess ,
private - > fetching_ckpt ,
targetRecPtr ,
private - > replayTLI ,
xlogreader - > EndRecPtr ) )
if ( readFile > = 0 & &
xlogreader - > nonblocking & &
readSource = = XLOG_FROM_STREAM & &
flushedUpto < targetPagePtr + reqLen )
return XLREAD_WOULDBLOCK ;
switch ( WaitForWALToBecomeAvailable ( targetPagePtr + reqLen ,
private - > randAccess ,
private - > fetching_ckpt ,
targetRecPtr ,
private - > replayTLI ,
xlogreader - > EndRecPtr ,
xlogreader - > nonblocking ) )
{
if ( readFile > = 0 )
close ( readFile ) ;
readFile = - 1 ;
readLen = 0 ;
readSource = XLOG_FROM_ANY ;
return - 1 ;
case XLREAD_WOULDBLOCK :
return XLREAD_WOULDBLOCK ;
case XLREAD_FAIL :
if ( readFile > = 0 )
close ( readFile ) ;
readFile = - 1 ;
readLen = 0 ;
readSource = XLOG_FROM_ANY ;
return XLREAD_FAIL ;
case XLREAD_SUCCESS :
break ;
}
}
@ -3263,7 +3303,7 @@ next_record_is_invalid:
if ( StandbyMode )
goto retry ;
else
return - 1 ;
return XLREAD_FAIL ;
}
/*
@ -3292,14 +3332,18 @@ next_record_is_invalid:
* available .
*
* When the requested record becomes available , the function opens the file
* containing it ( if not open already ) , and returns true . When end of standby
* mode is triggered by the user , and there is no more WAL available , returns
* false .
* containing it ( if not open already ) , and returns XLREAD_SUCCESS . When end
* of standby mode is triggered by the user , and there is no more WAL
* available , returns XLREAD_FAIL .
*
* If nonblocking is true , then give up immediately if we can ' t satisfy the
* request , returning XLREAD_WOULDBLOCK instead of waiting .
*/
static bool
static XLogPageReadResult
WaitForWALToBecomeAvailable ( XLogRecPtr RecPtr , bool randAccess ,
bool fetching_ckpt , XLogRecPtr tliRecPtr ,
TimeLineID replayTLI , XLogRecPtr replayLSN )
TimeLineID replayTLI , XLogRecPtr replayLSN ,
bool nonblocking )
{
static TimestampTz last_fail_time = 0 ;
TimestampTz now ;
@ -3353,6 +3397,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/
if ( lastSourceFailed )
{
/*
* Don ' t allow any retry loops to occur during nonblocking
* readahead . Let the caller process everything that has been
* decoded already first .
*/
if ( nonblocking )
return XLREAD_WOULDBLOCK ;
switch ( currentSource )
{
case XLOG_FROM_ARCHIVE :
@ -3367,7 +3419,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if ( StandbyMode & & CheckForStandbyTrigger ( ) )
{
XLogShutdownWalRcv ( ) ;
return false ;
return XLREAD_FAIL ;
}
/*
@ -3375,7 +3427,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* and pg_wal .
*/
if ( ! StandbyMode )
return false ;
return XLREAD_FAIL ;
/*
* Move to XLOG_FROM_STREAM state , and set to start a
@ -3519,7 +3571,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
currentSource = = XLOG_FROM_ARCHIVE ? XLOG_FROM_ANY :
currentSource ) ;
if ( readFile > = 0 )
return true ; /* success! */
return XLREAD_SUCCESS ; /* success! */
/*
* Nope , not found in archive or pg_wal .
@ -3674,11 +3726,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM ;
XLogReceiptSource = XLOG_FROM_STREAM ;
return true ;
return XLREAD_SUCCESS ;
}
break ;
}
/* In nonblocking mode, return rather than sleeping. */
if ( nonblocking )
return XLREAD_WOULDBLOCK ;
/*
* Data not here yet . Check for trigger , then wait for
* walreceiver to wake us up when new WAL arrives .
@ -3686,13 +3742,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if ( CheckForStandbyTrigger ( ) )
{
/*
* Note that we don ' t " return false " immediately here .
* After being triggered , we still want to replay all
* the WAL that was already streamed . It ' s in pg_wal
* now , so we just treat this as a failure , and the
* state machine will move on to replay the streamed
* WAL from pg_wal , and then recheck the trigger and
* exit replay .
* Note that we don ' t return XLREAD_FAIL immediately
* here . After being triggered , we still want to
* replay all the WAL that was already streamed . It ' s
* in pg_wal now , so we just treat this as a failure ,
* and the state machine will move on to replay the
* streamed WAL from pg_wal , and then recheck the
* trigger and exit replay .
*/
lastSourceFailed = true ;
break ;
@ -3711,6 +3767,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
streaming_reply_sent = true ;
}
/* Update pg_stat_recovery_prefetch before sleeping. */
XLogPrefetcherComputeStats ( xlogprefetcher ) ;
/*
* Wait for more WAL to arrive . Time out after 5 seconds
* to react to a trigger file promptly and to check if the
@ -3743,7 +3802,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
HandleStartupProcInterrupts ( ) ;
}
return false ; /* not reached */
return XLREAD_FAIL ; /* not reached */
}
@ -3788,7 +3847,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
* 1 for " primary " , 0 for " other " ( backup_label )
*/
static XLogRecord *
ReadCheckpointRecord ( XLogReaderState * xlogread er , XLogRecPtr RecPtr ,
ReadCheckpointRecord ( XLogPrefetcher * xlogprefetch er , XLogRecPtr RecPtr ,
int whichChkpt , bool report , TimeLineID replayTLI )
{
XLogRecord * record ;
@ -3815,8 +3874,8 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
return NULL ;
}
XLogBeginRead ( xlogread er , RecPtr ) ;
record = ReadRecord ( xlogread er , LOG , true , replayTLI ) ;
XLogPrefetcher BeginRead ( xlogprefetch er , RecPtr ) ;
record = ReadRecord ( xlogprefetch er , LOG , true , replayTLI ) ;
if ( record = = NULL )
{