@ -125,6 +125,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg ( unsigned char type , char * buf , Size len ) ;
static void XLogWalRcvProcessMsg ( unsigned char type , char * buf , Size len ) ;
static void XLogWalRcvWrite ( char * buf , Size nbytes , XLogRecPtr recptr ) ;
static void XLogWalRcvWrite ( char * buf , Size nbytes , XLogRecPtr recptr ) ;
static void XLogWalRcvFlush ( bool dying ) ;
static void XLogWalRcvFlush ( bool dying ) ;
static void XLogWalRcvClose ( XLogRecPtr recptr ) ;
static void XLogWalRcvSendReply ( bool force , bool requestReply ) ;
static void XLogWalRcvSendReply ( bool force , bool requestReply ) ;
static void XLogWalRcvSendHSFeedback ( bool immed ) ;
static void XLogWalRcvSendHSFeedback ( bool immed ) ;
static void ProcessWalSndrMessage ( XLogRecPtr walEnd , TimestampTz sendTime ) ;
static void ProcessWalSndrMessage ( XLogRecPtr walEnd , TimestampTz sendTime ) ;
@ -883,42 +884,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
{
int segbytes ;
int segbytes ;
if ( recvFile < 0 | | ! XLByteInSeg ( recptr , recvSegNo , wal_segment_size ) )
/* Close the current segment if it's completed */
{
if ( recvFile > = 0 & & ! XLByteInSeg ( recptr , recvSegNo , wal_segment_size ) )
/*
XLogWalRcvClose ( recptr ) ;
* fsync ( ) and close current file before we switch to next one . We
* would otherwise have to reopen this file to fsync it later
*/
if ( recvFile > = 0 )
{
char xlogfname [ MAXFNAMELEN ] ;
XLogWalRcvFlush ( false ) ;
XLogFileName ( xlogfname , recvFileTLI , recvSegNo , wal_segment_size ) ;
/*
* XLOG segment files will be re - read by recovery in startup
* process soon , so we don ' t advise the OS to release cache
* pages associated with the file like XLogFileClose ( ) does .
*/
if ( close ( recvFile ) ! = 0 )
ereport ( PANIC ,
( errcode_for_file_access ( ) ,
errmsg ( " could not close log segment %s: %m " ,
xlogfname ) ) ) ;
/*
* Create . done file forcibly to prevent the streamed segment
* from being archived later .
*/
if ( XLogArchiveMode ! = ARCHIVE_MODE_ALWAYS )
XLogArchiveForceDone ( xlogfname ) ;
else
XLogArchiveNotify ( xlogfname ) ;
}
recvFile = - 1 ;
if ( recvFile < 0 )
{
/* Create/use new log file */
/* Create/use new log file */
XLByteToSeg ( recptr , recvSegNo , wal_segment_size ) ;
XLByteToSeg ( recptr , recvSegNo , wal_segment_size ) ;
recvFile = XLogFileInit ( recvSegNo ) ;
recvFile = XLogFileInit ( recvSegNo ) ;
@ -967,6 +938,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* Update shared-memory status */
/* Update shared-memory status */
pg_atomic_write_u64 ( & WalRcv - > writtenUpto , LogstreamResult . Write ) ;
pg_atomic_write_u64 ( & WalRcv - > writtenUpto , LogstreamResult . Write ) ;
/*
* Close the current segment if it ' s fully written up in the last cycle of
* the loop , to create its archive notification file soon . Otherwise WAL
* archiving of the segment will be delayed until any data in the next
* segment is received and written .
*/
if ( recvFile > = 0 & & ! XLByteInSeg ( recptr , recvSegNo , wal_segment_size ) )
XLogWalRcvClose ( recptr ) ;
}
}
/*
/*
@ -1020,6 +1000,52 @@ XLogWalRcvFlush(bool dying)
}
}
}
}
/*
* Close the current segment .
*
* Flush the segment to disk before closing it . Otherwise we have to
* reopen and fsync it later .
*
* Create an archive notification file since the segment is known completed .
*/
static void
XLogWalRcvClose ( XLogRecPtr recptr )
{
char xlogfname [ MAXFNAMELEN ] ;
Assert ( recvFile > = 0 & & ! XLByteInSeg ( recptr , recvSegNo , wal_segment_size ) ) ;
/*
* fsync ( ) and close current file before we switch to next one . We would
* otherwise have to reopen this file to fsync it later
*/
XLogWalRcvFlush ( false ) ;
XLogFileName ( xlogfname , recvFileTLI , recvSegNo , wal_segment_size ) ;
/*
* XLOG segment files will be re - read by recovery in startup process soon ,
* so we don ' t advise the OS to release cache pages associated with the
* file like XLogFileClose ( ) does .
*/
if ( close ( recvFile ) ! = 0 )
ereport ( PANIC ,
( errcode_for_file_access ( ) ,
errmsg ( " could not close log segment %s: %m " ,
xlogfname ) ) ) ;
/*
* Create . done file forcibly to prevent the streamed segment from being
* archived later .
*/
if ( XLogArchiveMode ! = ARCHIVE_MODE_ALWAYS )
XLogArchiveForceDone ( xlogfname ) ;
else
XLogArchiveNotify ( xlogfname ) ;
recvFile = - 1 ;
}
/*
/*
* Send reply message to primary , indicating our current WAL locations , oldest
* Send reply message to primary , indicating our current WAL locations , oldest
* xmin and the current time .
* xmin and the current time .