@ -31,12 +31,14 @@ static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false ;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr ;
static int64 last_fsync = - 1 ; /* timestamp of last WAL file flush */
static bool still_sending = true ; /* feedback still needs to be sent? */
static PGresult * HandleCopyStream ( PGconn * conn , XLogRecPtr startpos ,
uint32 timeline , char * basedir ,
stream_stop_callback stream_stop , int standby_message_timeout ,
char * partial_suffix , XLogRecPtr * stoppos ) ;
char * partial_suffix , XLogRecPtr * stoppos ,
int fsync_interval ) ;
static int CopyStreamPoll ( PGconn * conn , long timeout_ms ) ;
static int CopyStreamReceive ( PGconn * conn , long timeout , char * * buffer ) ;
static bool ProcessKeepaliveMsg ( PGconn * conn , char * copybuf , int len ,
@ -48,6 +50,13 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
static PGresult * HandleEndOfCopyStream ( PGconn * conn , char * copybuf ,
XLogRecPtr blockpos , char * basedir , char * partial_suffix ,
XLogRecPtr * stoppos ) ;
static bool CheckCopyStreamStop ( PGconn * conn , XLogRecPtr blockpos ,
uint32 timeline , char * basedir ,
stream_stop_callback stream_stop ,
char * partial_suffix , XLogRecPtr * stoppos ) ;
static long CalculateCopyStreamSleeptime ( int64 now , int standby_message_timeout ,
int64 last_status , int fsync_interval ,
XLogRecPtr blockpos ) ;
static bool ReadEndOfStreamingResult ( PGresult * res , XLogRecPtr * startpos ,
uint32 * timeline ) ;
@ -200,6 +209,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
progname , current_walfile_name , partial_suffix ) ;
lastFlushPosition = pos ;
last_fsync = feGetCurrentTimestamp ( ) ;
return true ;
}
@ -430,13 +440,17 @@ CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files ,
* so that you can continue later where you left .
*
* fsync_interval controls how often we flush to the received WAL file ,
* in milliseconds .
*
* Note : The log position * must * be at a log segment start !
*/
bool
ReceiveXlogStream ( PGconn * conn , XLogRecPtr startpos , uint32 timeline ,
char * sysidentifier , char * basedir ,
stream_stop_callback stream_stop ,
int standby_message_timeout , char * partial_suffix )
int standby_message_timeout , char * partial_suffix ,
int fsync_interval )
{
char query [ 128 ] ;
char slotcmd [ 128 ] ;
@ -581,7 +595,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream ( conn , startpos , timeline , basedir , stream_stop ,
standby_message_timeout , partial_suffix ,
& stoppos ) ;
& stoppos , fsync_interval ) ;
if ( res = = NULL )
goto error ;
@ -746,7 +760,7 @@ static PGresult *
HandleCopyStream ( PGconn * conn , XLogRecPtr startpos , uint32 timeline ,
char * basedir , stream_stop_callback stream_stop ,
int standby_message_timeout , char * partial_suffix ,
XLogRecPtr * stoppos )
XLogRecPtr * stoppos , int fsync_interval )
{
char * copybuf = NULL ;
int64 last_status = - 1 ;
@ -763,26 +777,36 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Check if we should continue streaming , or abort at this point .
*/
if ( still_sending & & stream_stop ( blockpos , timeline , false ) )
if ( ! CheckCopyStreamStop ( conn , blockpos , timeline , basedir ,
stream_stop , partial_suffix , stoppos ) )
goto error ;
now = feGetCurrentTimestamp ( ) ;
/*
* If fsync_interval has elapsed since last WAL flush and we ' ve written
* some WAL data , flush them to disk .
*/
if ( lastFlushPosition < blockpos & &
walfile ! = - 1 & &
( ( fsync_interval > 0 & &
feTimestampDifferenceExceeds ( last_fsync , now , fsync_interval ) ) | |
fsync_interval < 0 ) )
{
if ( ! close_walfile ( basedir , partial_suffix , blockpos ) )
{
/* Potential error message is written by close_walfile */
goto error ;
}
if ( PQputCopyEnd ( conn , NULL ) < = 0 | | PQflush ( conn ) )
if ( fsync ( walfile ) ! = 0 )
{
fprintf ( stderr , _ ( " %s: could not send copy-end packet: %s " ) ,
progname , PQerrorMessage ( conn ) ) ;
fprintf ( stderr , _ ( " %s: could not fsync file \" %s \" : %s \n " ) ,
progname , current_walfile_name , strerror ( errno ) ) ;
goto error ;
}
still_sending = false ;
lastFlushPosition = blockpos ;
last_fsync = now ;
}
/*
* Potentially send a status message to the master
*/
now = feGetCurrentTimestamp ( ) ;
if ( still_sending & & standby_message_timeout > 0 & &
feTimestampDifferenceExceeds ( last_status , now ,
standby_message_timeout ) )
@ -794,64 +818,58 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
/*
* Compu te how long send / receive loops should sleep
* Calcula te how long send / receive loops should sleep
*/
if ( standby_message_timeout & & still_sending )
{
int64 targettime ;
long secs ;
int usecs ;
targettime = last_status + ( standby_message_timeout - 1 ) * ( ( int64 ) 1000 ) ;
feTimestampDifference ( now ,
targettime ,
& secs ,
& usecs ) ;
/* Always sleep at least 1 sec */
if ( secs < = 0 )
{
secs = 1 ;
usecs = 0 ;
}
sleeptime = secs * 1000 + usecs / 1000 ;
}
else
sleeptime = - 1 ;
sleeptime = CalculateCopyStreamSleeptime ( now , standby_message_timeout ,
last_status , fsync_interval , blockpos ) ;
r = CopyStreamReceive ( conn , sleeptime , & copybuf ) ;
if ( r = = 0 )
continue ;
if ( r = = - 1 )
goto error ;
if ( r = = - 2 )
while ( r ! = 0 )
{
PGresult * res = HandleEndOfCopyStream ( conn , copybuf , blockpos ,
basedir , partial_suffix , stoppos ) ;
if ( res = = NULL )
if ( r = = - 1 )
goto error ;
else
return res ;
}
if ( r = = - 2 )
{
PGresult * res = HandleEndOfCopyStream ( conn , copybuf , blockpos ,
basedir , partial_suffix , stoppos ) ;
if ( res = = NULL )
goto error ;
else
return res ;
}
/* Check the message type. */
if ( copybuf [ 0 ] = = ' k ' )
{
if ( ! ProcessKeepaliveMsg ( conn , copybuf , r , blockpos ,
& last_status ) )
goto error ;
}
else if ( copybuf [ 0 ] = = ' w ' )
{
if ( ! ProcessXLogDataMsg ( conn , copybuf , r , & blockpos ,
timeline , basedir , stream_stop , partial_suffix ) )
/* Check the message type. */
if ( copybuf [ 0 ] = = ' k ' )
{
if ( ! ProcessKeepaliveMsg ( conn , copybuf , r , blockpos ,
& last_status ) )
goto error ;
}
else if ( copybuf [ 0 ] = = ' w ' )
{
if ( ! ProcessXLogDataMsg ( conn , copybuf , r , & blockpos ,
timeline , basedir , stream_stop , partial_suffix ) )
goto error ;
/*
* Check if we should continue streaming , or abort at this point .
*/
if ( ! CheckCopyStreamStop ( conn , blockpos , timeline , basedir ,
stream_stop , partial_suffix , stoppos ) )
goto error ;
}
else
{
fprintf ( stderr , _ ( " %s: unrecognized streaming header: \" %c \" \n " ) ,
progname , copybuf [ 0 ] ) ;
goto error ;
}
else
{
fprintf ( stderr , _ ( " %s: unrecognized streaming header: \" %c \" \n " ) ,
progname , copybuf [ 0 ] ) ;
goto error ;
}
/*
* Process the received data , and any subsequent data we
* can read without blocking .
*/
r = CopyStreamReceive ( conn , 0 , & copybuf ) ;
}
}
@ -1193,3 +1211,80 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
* stoppos = blockpos ;
return res ;
}
/*
* Check if we should continue streaming , or abort at this point .
*/
static bool
CheckCopyStreamStop ( PGconn * conn , XLogRecPtr blockpos , uint32 timeline ,
char * basedir , stream_stop_callback stream_stop ,
char * partial_suffix , XLogRecPtr * stoppos )
{
if ( still_sending & & stream_stop ( blockpos , timeline , false ) )
{
if ( ! close_walfile ( basedir , partial_suffix , blockpos ) )
{
/* Potential error message is written by close_walfile */
return false ;
}
if ( PQputCopyEnd ( conn , NULL ) < = 0 | | PQflush ( conn ) )
{
fprintf ( stderr , _ ( " %s: could not send copy-end packet: %s " ) ,
progname , PQerrorMessage ( conn ) ) ;
return false ;
}
still_sending = false ;
}
return true ;
}
/*
* Calculate how long send / receive loops should sleep
*/
static long
CalculateCopyStreamSleeptime ( int64 now , int standby_message_timeout ,
int64 last_status , int fsync_interval , XLogRecPtr blockpos )
{
int64 targettime = 0 ;
int64 status_targettime = 0 ;
int64 fsync_targettime = 0 ;
long sleeptime ;
if ( standby_message_timeout & & still_sending )
status_targettime = last_status +
( standby_message_timeout - 1 ) * ( ( int64 ) 1000 ) ;
if ( fsync_interval > 0 & & lastFlushPosition < blockpos )
fsync_targettime = last_fsync +
( fsync_interval - 1 ) * ( ( int64 ) 1000 ) ;
if ( ( status_targettime < fsync_targettime & & status_targettime > 0 ) | |
fsync_targettime = = 0 )
targettime = status_targettime ;
else
targettime = fsync_targettime ;
if ( targettime > 0 )
{
long secs ;
int usecs ;
feTimestampDifference ( now ,
targettime ,
& secs ,
& usecs ) ;
/* Always sleep at least 1 sec */
if ( secs < = 0 )
{
secs = 1 ;
usecs = 0 ;
}
sleeptime = secs * 1000 + usecs / 1000 ;
}
else
sleeptime = - 1 ;
return sleeptime ;
}