@ -248,8 +248,9 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead ( int head , XLogRecPtr lsn , TimestampTz now ) ;
static bool TransactionIdInRecentPast ( TransactionId xid , uint32 epoch ) ;
static int WalSndSegmentOpen ( XLogSegNo nextSegNo , WALSegmentContext * segcxt ,
TimeLineID * tli_p ) ;
static void UpdateSpillStats ( LogicalDecodingContext * ctx ) ;
static void XLogRead ( WALSegmentContext * segcxt , char * buf , XLogRecPtr startptr , Size count ) ;
/* Initialize walsender process before entering the main command loop */
@ -767,6 +768,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
{
XLogRecPtr flushptr ;
int count ;
WALReadError errinfo ;
XLogSegNo segno ;
XLogReadDetermineTimeline ( state , targetPagePtr , reqLen ) ;
sendTimeLineIsHistoric = ( state - > currTLI ! = ThisTimeLineID ) ;
@ -787,7 +790,27 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr ; /* part of the page available */
/* now actually read the data, we know it's there */
XLogRead ( sendCxt , cur_page , targetPagePtr , XLOG_BLCKSZ ) ;
if ( ! WALRead ( cur_page ,
targetPagePtr ,
XLOG_BLCKSZ ,
sendSeg - > ws_tli , /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new
* TLI is needed . */
sendSeg ,
sendCxt ,
WalSndSegmentOpen ,
& errinfo ) )
WALReadRaiseError ( & errinfo ) ;
/*
* After reading into the buffer , check that what we read was valid . We do
* this after reading , because even though the segment was present when we
* opened it , it might get recycled or removed while we read it . The
* read ( ) succeeds in that case , but the data we tried to read might
* already have been overwritten with new WAL records .
*/
XLByteToSeg ( targetPagePtr , segno , sendCxt - > ws_segsize ) ;
CheckXLogRemoved ( segno , sendSeg - > ws_tli ) ;
return count ;
}
@ -2360,189 +2383,68 @@ WalSndKill(int code, Datum arg)
SpinLockRelease ( & walsnd - > mutex ) ;
}
/*
* Read ' count ' bytes from WAL into ' buf ' , starting at location ' startptr '
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible .
*
* Will open , and keep open , one WAL segment stored in the global file
* descriptor sendFile . This means if XLogRead is used once , there will
* always be one descriptor left open until the process ends , but never
* more than one .
*/
static void
XLogRead ( WALSegmentContext * segcxt , char * buf , XLogRecPtr startptr , Size count )
/* walsender's openSegment callback for WALRead */
static int
WalSndSegmentOpen ( XLogSegNo nextSegNo , WALSegmentContext * segcxt ,
TimeLineID * tli_p )
{
char * p ;
XLogRecPtr recptr ;
Size nbytes ;
XLogSegNo segno ;
retry :
p = buf ;
recptr = startptr ;
nbytes = count ;
char path [ MAXPGPATH ] ;
int fd ;
while ( nbytes > 0 )
/*-------
* When reading from a historic timeline , and there is a timeline switch
* within this segment , read from the WAL segment belonging to the new
* timeline .
*
* For example , imagine that this server is currently on timeline 5 , and
* we ' re streaming timeline 4. The switch from timeline 4 to 5 happened at
* 0 / 13002088. In pg_wal , we have these files :
*
* . . .
* 000000040000000000000012
* 000000040000000000000013
* 000000050000000000000013
* 000000050000000000000014
* . . .
*
* In this situation , when requested to send the WAL from segment 0x13 , on
* timeline 4 , we read the WAL from file 000000050000000000000013. Archive
* recovery prefers files from newer timelines , so if the segment was
* restored from the archive on this server , the file belonging to the old
* timeline , 000000040000000000000013 , might not exist . Their contents are
* equal up to the switchpoint , because at a timeline switch , the used
* portion of the old segment is copied to the new file . - - - - - - -
*/
* tli_p = sendTimeLine ;
if ( sendTimeLineIsHistoric )
{
uint32 startoff ;
int segbytes ;
int readbytes ;
startoff = XLogSegmentOffset ( recptr , segcxt - > ws_segsize ) ;
if ( sendSeg - > ws_file < 0 | |
! XLByteInSeg ( recptr , sendSeg - > ws_segno , segcxt - > ws_segsize ) )
{
char path [ MAXPGPATH ] ;
/* Switch to another logfile segment */
if ( sendSeg - > ws_file > = 0 )
close ( sendSeg - > ws_file ) ;
XLByteToSeg ( recptr , sendSeg - > ws_segno , segcxt - > ws_segsize ) ;
/*-------
* When reading from a historic timeline , and there is a timeline
* switch within this segment , read from the WAL segment belonging
* to the new timeline .
*
* For example , imagine that this server is currently on timeline
* 5 , and we ' re streaming timeline 4. The switch from timeline 4
* to 5 happened at 0 / 13002088. In pg_wal , we have these files :
*
* . . .
* 000000040000000000000012
* 000000040000000000000013
* 000000050000000000000013
* 000000050000000000000014
* . . .
*
* In this situation , when requested to send the WAL from
* segment 0x13 , on timeline 4 , we read the WAL from file
* 000000050000000000000013. Archive recovery prefers files from
* newer timelines , so if the segment was restored from the
* archive on this server , the file belonging to the old timeline ,
* 000000040000000000000013 , might not exist . Their contents are
* equal up to the switchpoint , because at a timeline switch , the
* used portion of the old segment is copied to the new file .
* - - - - - - -
*/
sendSeg - > ws_tli = sendTimeLine ;
if ( sendTimeLineIsHistoric )
{
XLogSegNo endSegNo ;
XLByteToSeg ( sendTimeLineValidUpto , endSegNo , segcxt - > ws_segsize ) ;
if ( sendSeg - > ws_segno = = endSegNo )
sendSeg - > ws_tli = sendTimeLineNextTLI ;
}
XLogFilePath ( path , sendSeg - > ws_tli , sendSeg - > ws_segno , segcxt - > ws_segsize ) ;
sendSeg - > ws_file = BasicOpenFile ( path , O_RDONLY | PG_BINARY ) ;
if ( sendSeg - > ws_file < 0 )
{
/*
* If the file is not found , assume it ' s because the standby
* asked for a too old WAL segment that has already been
* removed or recycled .
*/
if ( errno = = ENOENT )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " requested WAL segment %s has already been removed " ,
XLogFileNameP ( sendSeg - > ws_tli , sendSeg - > ws_segno ) ) ) ) ;
else
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not open file \" %s \" : %m " ,
path ) ) ) ;
}
sendSeg - > ws_off = 0 ;
}
/* Need to seek in the file? */
if ( sendSeg - > ws_off ! = startoff )
{
if ( lseek ( sendSeg - > ws_file , ( off_t ) startoff , SEEK_SET ) < 0 )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not seek in log segment %s to offset %u: %m " ,
XLogFileNameP ( sendSeg - > ws_tli , sendSeg - > ws_segno ) ,
startoff ) ) ) ;
sendSeg - > ws_off = startoff ;
}
/* How many bytes are within this segment? */
if ( nbytes > ( segcxt - > ws_segsize - startoff ) )
segbytes = segcxt - > ws_segsize - startoff ;
else
segbytes = nbytes ;
pgstat_report_wait_start ( WAIT_EVENT_WAL_READ ) ;
readbytes = read ( sendSeg - > ws_file , p , segbytes ) ;
pgstat_report_wait_end ( ) ;
if ( readbytes < 0 )
{
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not read from log segment %s, offset %u, length %zu: %m " ,
XLogFileNameP ( sendSeg - > ws_tli , sendSeg - > ws_segno ) ,
sendSeg - > ws_off , ( Size ) segbytes ) ) ) ;
}
else if ( readbytes = = 0 )
{
ereport ( ERROR ,
( errcode ( ERRCODE_DATA_CORRUPTED ) ,
errmsg ( " could not read from log segment %s, offset %u: read %d of %zu " ,
XLogFileNameP ( sendSeg - > ws_tli , sendSeg - > ws_segno ) ,
sendSeg - > ws_off , readbytes , ( Size ) segbytes ) ) ) ;
}
XLogSegNo endSegNo ;
/* Update state for read */
recptr + = readbytes ;
sendSeg - > ws_off + = readbytes ;
nbytes - = readbytes ;
p + = readbytes ;
XLByteToSeg ( sendTimeLineValidUpto , endSegNo , segcxt - > ws_segsize ) ;
if ( sendSeg - > ws_segno = = endSegNo )
* tli_p = sendTimeLineNextTLI ;
}
/*
* After reading into the buffer , check that what we read was valid . We do
* this after reading , because even though the segment was present when we
* opened it , it might get recycled or removed while we read it . The
* read ( ) succeeds in that case , but the data we tried to read might
* already have been overwritten with new WAL records .
*/
XLByteToSeg ( startptr , segno , segcxt - > ws_segsize ) ;
CheckXLogRemoved ( segno , ThisTimeLineID ) ;
XLogFilePath ( path , * tli_p , nextSegNo , segcxt - > ws_segsize ) ;
fd = BasicOpenFile ( path , O_RDONLY | PG_BINARY ) ;
if ( fd > = 0 )
return fd ;
/*
* During recovery , the currently - open WAL file might be replaced with the
* file of the same name retrieved from archive . So we always need to
* check what we read was valid after reading into the buffer . If it ' s
* invalid , we try to open and read the file again .
* If the file is not found , assume it ' s because the standby asked for a
* too old WAL segment that has already been removed or recycled .
*/
if ( am_cascading_walsender )
{
WalSnd * walsnd = MyWalSnd ;
bool reload ;
SpinLockAcquire ( & walsnd - > mutex ) ;
reload = walsnd - > needreload ;
walsnd - > needreload = false ;
SpinLockRelease ( & walsnd - > mutex ) ;
if ( reload & & sendSeg - > ws_file > = 0 )
{
close ( sendSeg - > ws_file ) ;
sendSeg - > ws_file = - 1 ;
goto retry ;
}
}
if ( errno = = ENOENT )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " requested WAL segment %s has already been removed " ,
XLogFileNameP ( * tli_p , nextSegNo ) ) ) ) ;
else
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not open file \" %s \" : %m " ,
path ) ) ) ;
return - 1 ; /* keep compiler quiet */
}
/*
@ -2562,6 +2464,8 @@ XLogSendPhysical(void)
XLogRecPtr startptr ;
XLogRecPtr endptr ;
Size nbytes ;
XLogSegNo segno ;
WALReadError errinfo ;
/* If requested switch the WAL sender to the stopping state. */
if ( got_STOPPING )
@ -2777,7 +2681,49 @@ XLogSendPhysical(void)
* calls .
*/
enlargeStringInfo ( & output_message , nbytes ) ;
XLogRead ( sendCxt , & output_message . data [ output_message . len ] , startptr , nbytes ) ;
retry :
if ( ! WALRead ( & output_message . data [ output_message . len ] ,
startptr ,
nbytes ,
sendSeg - > ws_tli , /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new
* TLI is needed . */
sendSeg ,
sendCxt ,
WalSndSegmentOpen ,
& errinfo ) )
WALReadRaiseError ( & errinfo ) ;
/* See logical_read_xlog_page(). */
XLByteToSeg ( startptr , segno , sendCxt - > ws_segsize ) ;
CheckXLogRemoved ( segno , sendSeg - > ws_tli ) ;
/*
* During recovery , the currently - open WAL file might be replaced with the
* file of the same name retrieved from archive . So we always need to
* check what we read was valid after reading into the buffer . If it ' s
* invalid , we try to open and read the file again .
*/
if ( am_cascading_walsender )
{
WalSnd * walsnd = MyWalSnd ;
bool reload ;
SpinLockAcquire ( & walsnd - > mutex ) ;
reload = walsnd - > needreload ;
walsnd - > needreload = false ;
SpinLockRelease ( & walsnd - > mutex ) ;
if ( reload & & sendSeg - > ws_file > = 0 )
{
close ( sendSeg - > ws_file ) ;
sendSeg - > ws_file = - 1 ;
goto retry ;
}
}
output_message . len + = nbytes ;
output_message . data [ output_message . len ] = ' \0 ' ;