@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline , char * basedir ,
stream_stop_callback stream_stop , int standby_message_timeout ,
char * partial_suffix , XLogRecPtr * stoppos ) ;
static int CopyStreamPoll ( PGconn * conn , long timeout_ms ) ;
static int CopyStreamReceive ( PGconn * conn , long timeout , char * * buffer ) ;
static bool ReadEndOfStreamingResult ( PGresult * res , XLogRecPtr * startpos ,
uint32 * timeline ) ;
@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
int bytes_written ;
int64 now ;
int hdr_len ;
if ( copybuf ! = NULL )
{
PQfreemem ( copybuf ) ;
copybuf = NULL ;
}
long sleeptime ;
/*
* Check if we should continue streaming , or abort at this point .
@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
last_status = now ;
}
r = PQgetCopyData ( conn , & copybuf , 1 ) ;
if ( r = = 0 )
/*
* Compute how long send / receive loops should sleep
*/
if ( standby_message_timeout & & still_sending )
{
/*
* No data available . Wait for some to appear , but not longer than
* the specified timeout , so that we can ping the server .
*/
fd_set input_mask ;
struct timeval timeout ;
struct timeval * timeoutptr ;
FD_ZERO ( & input_mask ) ;
FD_SET ( PQsocket ( conn ) , & input_mask ) ;
if ( standby_message_timeout & & still_sending )
int64 targettime ;
long secs ;
int usecs ;
targettime = last_status + ( standby_message_timeout - 1 ) * ( ( int64 ) 1000 ) ;
feTimestampDifference ( now ,
targettime ,
& secs ,
& usecs ) ;
/* Always sleep at least 1 sec */
if ( secs < = 0 )
{
int64 targettime ;
long secs ;
int usecs ;
targettime = last_status + ( standby_message_timeout - 1 ) * ( ( int64 ) 1000 ) ;
feTimestampDifference ( now ,
targettime ,
& secs ,
& usecs ) ;
if ( secs < = 0 )
timeout . tv_sec = 1 ; /* Always sleep at least 1 sec */
else
timeout . tv_sec = secs ;
timeout . tv_usec = usecs ;
timeoutptr = & timeout ;
secs = 1 ;
usecs = 0 ;
}
else
timeoutptr = NULL ;
r = select ( PQsocket ( conn ) + 1 , & input_mask , NULL , NULL , timeoutptr ) ;
if ( r = = 0 | | ( r < 0 & & errno = = EINTR ) )
{
/*
* Got a timeout or signal . Continue the loop and either
* deliver a status packet to the server or just go back into
* blocking .
*/
continue ;
}
else if ( r < 0 )
{
fprintf ( stderr , _ ( " %s: select() failed: %s \n " ) ,
progname , strerror ( errno ) ) ;
goto error ;
}
/* Else there is actually data on the socket */
if ( PQconsumeInput ( conn ) = = 0 )
{
fprintf ( stderr ,
_ ( " %s: could not receive data from WAL stream: %s " ) ,
progname , PQerrorMessage ( conn ) ) ;
goto error ;
}
continue ;
sleeptime = secs * 1000 + usecs / 1000 ;
}
else
sleeptime = - 1 ;
r = CopyStreamReceive ( conn , sleeptime , & copybuf ) ;
if ( r = = 0 )
continue ;
if ( r = = - 1 )
goto error ;
if ( r = = - 2 )
{
PGresult * res = PQgetResult ( conn ) ;
@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
if ( copybuf ! = NULL )
PQfreemem ( copybuf ) ;
copybuf = NULL ;
* stoppos = blockpos ;
return res ;
}
if ( r = = - 2 )
{
fprintf ( stderr , _ ( " %s: could not read COPY data: %s " ) ,
progname , PQerrorMessage ( conn ) ) ;
goto error ;
}
/* Check the message type. */
if ( copybuf [ 0 ] = = ' k ' )
@ -1056,3 +1019,115 @@ error:
PQfreemem ( copybuf ) ;
return NULL ;
}
/*
* Wait until we can read CopyData message , or timeout .
*
* Returns 1 if data has become available for reading , 0 if timed out
* or interrupted by signal , and - 1 on an error .
*/
static int
CopyStreamPoll ( PGconn * conn , long timeout_ms )
{
int ret ;
fd_set input_mask ;
struct timeval timeout ;
struct timeval * timeoutptr ;
if ( PQsocket ( conn ) < 0 )
{
fprintf ( stderr , _ ( " %s: socket not open " ) , progname ) ;
return - 1 ;
}
FD_ZERO ( & input_mask ) ;
FD_SET ( PQsocket ( conn ) , & input_mask ) ;
if ( timeout_ms < 0 )
timeoutptr = NULL ;
else
{
timeout . tv_sec = timeout_ms / 1000L ;
timeout . tv_usec = ( timeout_ms % 1000L ) * 1000L ;
timeoutptr = & timeout ;
}
ret = select ( PQsocket ( conn ) + 1 , & input_mask , NULL , NULL , timeoutptr ) ;
if ( ret = = 0 | | ( ret < 0 & & errno = = EINTR ) )
return 0 ; /* Got a timeout or signal */
else if ( ret < 0 )
{
fprintf ( stderr , _ ( " %s: select() failed: %s \n " ) ,
progname , strerror ( errno ) ) ;
return - 1 ;
}
return 1 ;
}
/*
* Receive CopyData message available from XLOG stream , blocking for
* maximum of ' timeout ' ms .
*
* If data was received , returns the length of the data . * buffer is set to
* point to a buffer holding the received message . The buffer is only valid
* until the next CopyStreamReceive call .
*
* 0 if no data was available within timeout , or wait was interrupted
* by signal . - 1 on error . - 2 if the server ended the COPY .
*/
static int
CopyStreamReceive ( PGconn * conn , long timeout , char * * buffer )
{
static char * copybuf = NULL ;
int rawlen ;
if ( copybuf ! = NULL )
PQfreemem ( copybuf ) ;
copybuf = NULL ;
* buffer = NULL ;
/* Try to receive a CopyData message */
rawlen = PQgetCopyData ( conn , & copybuf , 1 ) ;
if ( rawlen = = 0 )
{
/*
* No data available . Wait for some to appear , but not longer than
* the specified timeout , so that we can ping the server .
*/
if ( timeout > 0 )
{
int ret ;
ret = CopyStreamPoll ( conn , timeout ) ;
if ( ret < = 0 )
return ret ;
}
/* Else there is actually data on the socket */
if ( PQconsumeInput ( conn ) = = 0 )
{
fprintf ( stderr ,
_ ( " %s: could not receive data from WAL stream: %s " ) ,
progname , PQerrorMessage ( conn ) ) ;
return - 1 ;
}
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData ( conn , & copybuf , 1 ) ;
if ( rawlen = = 0 )
return 0 ;
}
if ( rawlen = = - 1 ) /* end-of-streaming or error */
return - 2 ;
if ( rawlen = = - 2 )
{
fprintf ( stderr , _ ( " %s: could not read COPY data: %s " ) ,
progname , PQerrorMessage ( conn ) ) ;
return - 1 ;
}
/* Return received messages to caller */
* buffer = copybuf ;
return rawlen ;
}