@ -128,16 +128,8 @@ bool log_replication_commands = false;
*/
bool wake_wal_senders = false ;
/*
* These variables are used similarly to openLogFile / SegNo / Off ,
* but for walsender to read the XLOG .
*/
static int sendFile = - 1 ;
static XLogSegNo sendSegNo = 0 ;
static uint32 sendOff = 0 ;
/* Timeline ID of the currently open file */
static TimeLineID curFileTimeLine = 0 ;
static WALOpenSegment * sendSeg = NULL ;
static WALSegmentContext * sendCxt = NULL ;
/*
* These variables keep track of the state of the timeline we ' re currently
@ -256,7 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead ( int head , XLogRecPtr lsn , TimestampTz now ) ;
static bool TransactionIdInRecentPast ( TransactionId xid , uint32 epoch ) ;
static void XLogRead ( char * buf , XLogRecPtr startptr , Size count ) ;
static void XLogRead ( WALSegmentContext * segcxt , char * buf , XLogRecPtr startptr , Size count ) ;
/* Initialize walsender process before entering the main command loop */
@ -285,6 +277,13 @@ InitWalSender(void)
/* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero ( TopMemoryContext , sizeof ( LagTracker ) ) ;
/* Make sure we can remember the current read position in XLOG. */
sendSeg = ( WALOpenSegment * )
MemoryContextAlloc ( TopMemoryContext , sizeof ( WALOpenSegment ) ) ;
sendCxt = ( WALSegmentContext * )
MemoryContextAlloc ( TopMemoryContext , sizeof ( WALSegmentContext ) ) ;
WALOpenSegmentInit ( sendSeg , sendCxt , wal_segment_size , NULL ) ;
}
/*
@ -301,10 +300,10 @@ WalSndErrorCleanup(void)
ConditionVariableCancelSleep ( ) ;
pgstat_report_wait_end ( ) ;
if ( sendF ile > = 0 )
if ( sendSeg - > ws_f ile > = 0 )
{
close ( sendF ile ) ;
sendF ile = - 1 ;
close ( sendSeg - > ws_f ile ) ;
sendSeg - > ws_f ile = - 1 ;
}
if ( MyReplicationSlot ! = NULL )
@ -763,7 +762,7 @@ StartReplication(StartReplicationCmd *cmd)
*/
static int
logical_read_xlog_page ( XLogReaderState * state , XLogRecPtr targetPagePtr , int reqLen ,
XLogRecPtr targetRecPtr , char * cur_page , TimeLineID * pageTLI )
XLogRecPtr targetRecPtr , char * cur_page )
{
XLogRecPtr flushptr ;
int count ;
@ -787,7 +786,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr ; /* part of the page available */
/* now actually read the data, we know it's there */
XLogRead ( cur_page , targetPagePtr , XLOG_BLCKSZ ) ;
XLogRead ( sendCxt , cur_page , targetPagePtr , XLOG_BLCKSZ ) ;
return count ;
}
@ -2364,7 +2363,7 @@ WalSndKill(int code, Datum arg)
* more than one .
*/
static void
XLogRead ( char * buf , XLogRecPtr startptr , Size count )
XLogRead ( WALSegmentContext * segcxt , char * buf , XLogRecPtr startptr , Size count )
{
char * p ;
XLogRecPtr recptr ;
@ -2382,17 +2381,18 @@ retry:
int segbytes ;
int readbytes ;
startoff = XLogSegmentOffset ( recptr , wal_segment_ size) ;
startoff = XLogSegmentOffset ( recptr , segcxt - > ws_seg size) ;
if ( sendFile < 0 | | ! XLByteInSeg ( recptr , sendSegNo , wal_segment_size ) )
if ( sendSeg - > ws_file < 0 | |
! XLByteInSeg ( recptr , sendSeg - > ws_segno , segcxt - > ws_segsize ) )
{
char path [ MAXPGPATH ] ;
/* Switch to another logfile segment */
if ( sendF ile > = 0 )
close ( sendF ile ) ;
if ( sendSeg - > ws_f ile > = 0 )
close ( sendSeg - > ws_f ile ) ;
XLByteToSeg ( recptr , sendSegNo , wal_segment_ size ) ;
XLByteToSeg ( recptr , sendSeg - > ws_segno , segcxt - > ws_seg size) ;
/*-------
* When reading from a historic timeline , and there is a timeline
@ -2420,20 +2420,20 @@ retry:
* used portion of the old segment is copied to the new file .
* - - - - - - -
*/
curFileTimeLine = sendTimeLine ;
sendSeg - > ws_tli = sendTimeLine ;
if ( sendTimeLineIsHistoric )
{
XLogSegNo endSegNo ;
XLByteToSeg ( sendTimeLineValidUpto , endSegNo , wal_segment_ size) ;
if ( sendSegN o = = endSegNo )
curFileTimeLine = sendTimeLineNextTLI ;
XLByteToSeg ( sendTimeLineValidUpto , endSegNo , segcxt - > ws_seg size) ;
if ( sendSeg - > ws_segn o = = endSegNo )
sendSeg - > ws_tli = sendTimeLineNextTLI ;
}
XLogFilePath ( path , curFileTimeLine , sendSegNo , wal_segment_ size) ;
XLogFilePath ( path , sendSeg - > ws_tli , sendSeg - > ws_segno , segcxt - > ws_seg size) ;
sendF ile = BasicOpenFile ( path , O_RDONLY | PG_BINARY ) ;
if ( sendF ile < 0 )
sendSeg - > ws_f ile = BasicOpenFile ( path , O_RDONLY | PG_BINARY ) ;
if ( sendSeg - > ws_f ile < 0 )
{
/*
* If the file is not found , assume it ' s because the standby
@ -2444,58 +2444,58 @@ retry:
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " requested WAL segment %s has already been removed " ,
XLogFileNameP ( curFileTimeLine , sendSegN o) ) ) ) ;
XLogFileNameP ( sendSeg - > ws_tli , sendSeg - > ws_segn o) ) ) ) ;
else
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not open file \" %s \" : %m " ,
path ) ) ) ;
}
sendO ff = 0 ;
sendSeg - > ws_o ff = 0 ;
}
/* Need to seek in the file? */
if ( sendO ff ! = startoff )
if ( sendSeg - > ws_o ff ! = startoff )
{
if ( lseek ( sendF ile , ( off_t ) startoff , SEEK_SET ) < 0 )
if ( lseek ( sendSeg - > ws_f ile , ( off_t ) startoff , SEEK_SET ) < 0 )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not seek in log segment %s to offset %u: %m " ,
XLogFileNameP ( curFileTimeLine , sendSegN o) ,
XLogFileNameP ( sendSeg - > ws_tli , sendSeg - > ws_segn o) ,
startoff ) ) ) ;
sendO ff = startoff ;
sendSeg - > ws_o ff = startoff ;
}
/* How many bytes are within this segment? */
if ( nbytes > ( wal_segment_ size - startoff ) )
segbytes = wal_segment_ size - startoff ;
if ( nbytes > ( segcxt - > ws_seg size - startoff ) )
segbytes = segcxt - > ws_seg size - startoff ;
else
segbytes = nbytes ;
pgstat_report_wait_start ( WAIT_EVENT_WAL_READ ) ;
readbytes = read ( sendF ile , p , segbytes ) ;
readbytes = read ( sendSeg - > ws_f ile , p , segbytes ) ;
pgstat_report_wait_end ( ) ;
if ( readbytes < 0 )
{
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not read from log segment %s, offset %u, length %zu: %m " ,
XLogFileNameP ( curFileTimeLine , sendSegN o) ,
sendO ff , ( Size ) segbytes ) ) ) ;
XLogFileNameP ( sendSeg - > ws_tli , sendSeg - > ws_segn o) ,
sendSeg - > ws_o ff , ( Size ) segbytes ) ) ) ;
}
else if ( readbytes = = 0 )
{
ereport ( ERROR ,
( errcode ( ERRCODE_DATA_CORRUPTED ) ,
errmsg ( " could not read from log segment %s, offset %u: read %d of %zu " ,
XLogFileNameP ( curFileTimeLine , sendSegN o) ,
sendO ff , readbytes , ( Size ) segbytes ) ) ) ;
XLogFileNameP ( sendSeg - > ws_tli , sendSeg - > ws_segn o) ,
sendSeg - > ws_o ff , readbytes , ( Size ) segbytes ) ) ) ;
}
/* Update state for read */
recptr + = readbytes ;
sendO ff + = readbytes ;
sendSeg - > ws_o ff + = readbytes ;
nbytes - = readbytes ;
p + = readbytes ;
}
@ -2507,7 +2507,7 @@ retry:
* read ( ) succeeds in that case , but the data we tried to read might
* already have been overwritten with new WAL records .
*/
XLByteToSeg ( startptr , segno , wal_segment_ size) ;
XLByteToSeg ( startptr , segno , segcxt - > ws_seg size) ;
CheckXLogRemoved ( segno , ThisTimeLineID ) ;
/*
@ -2526,10 +2526,10 @@ retry:
walsnd - > needreload = false ;
SpinLockRelease ( & walsnd - > mutex ) ;
if ( reload & & sendF ile > = 0 )
if ( reload & & sendSeg - > ws_f ile > = 0 )
{
close ( sendF ile ) ;
sendF ile = - 1 ;
close ( sendSeg - > ws_f ile ) ;
sendSeg - > ws_f ile = - 1 ;
goto retry ;
}
@ -2695,9 +2695,9 @@ XLogSendPhysical(void)
if ( sendTimeLineIsHistoric & & sendTimeLineValidUpto < = sentPtr )
{
/* close the current file. */
if ( sendF ile > = 0 )
close ( sendF ile ) ;
sendF ile = - 1 ;
if ( sendSeg - > ws_f ile > = 0 )
close ( sendSeg - > ws_f ile ) ;
sendSeg - > ws_f ile = - 1 ;
/* Send CopyDone */
pq_putmessage_noblock ( ' c ' , NULL , 0 ) ;
@ -2768,7 +2768,7 @@ XLogSendPhysical(void)
* calls .
*/
enlargeStringInfo ( & output_message , nbytes ) ;
XLogRead ( & output_message . data [ output_message . len ] , startptr , nbytes ) ;
XLogRead ( sendCxt , & output_message . data [ output_message . len ] , startptr , nbytes ) ;
output_message . len + = nbytes ;
output_message . data [ output_message . len ] = ' \0 ' ;