@ -39,8 +39,9 @@ static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult * HandleCopyStream ( PGconn * conn , StreamCtl * stream ,
XLogRecPtr * stoppos ) ;
static int CopyStreamPoll ( PGconn * conn , long timeout_ms ) ;
static int CopyStreamReceive ( PGconn * conn , long timeout , char * * buffer ) ;
static int CopyStreamPoll ( PGconn * conn , long timeout_ms , pgsocket stop_socket ) ;
static int CopyStreamReceive ( PGconn * conn , long timeout , pgsocket stop_socket ,
char * * buffer ) ;
static bool ProcessKeepaliveMsg ( PGconn * conn , StreamCtl * stream , char * copybuf ,
int len , XLogRecPtr blockpos , TimestampTz * last_status ) ;
static bool ProcessXLogDataMsg ( PGconn * conn , StreamCtl * stream , char * copybuf , int len ,
@ -417,8 +418,15 @@ CheckServerVersionForStreaming(PGconn *conn)
* return . As long as it returns false , streaming will continue
* indefinitely .
*
* If stream_stop ( ) checks for external input , stop_socket should be set to
* the FD it checks . This will allow such input to be detected promptly
* rather than after standby_message_timeout ( which might be indefinite ) .
* Note that signals will interrupt waits for input as well , but that is
* race - y since a signal received while busy won ' t interrupt the wait .
*
* standby_message_timeout controls how often we send a message
* back to the master letting it know our progress , in milliseconds .
* Zero means no messages are sent .
* This message will only contain the write location , and never
* flush or replay .
*
@ -825,7 +833,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
sleeptime = CalculateCopyStreamSleeptime ( now , stream - > standby_message_timeout ,
last_status ) ;
r = CopyStreamReceive ( conn , sleeptime , & copybuf ) ;
r = CopyStreamReceive ( conn , sleeptime , stream - > stop_socket , & copybuf ) ;
while ( r ! = 0 )
{
if ( r = = - 1 )
@ -870,7 +878,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
* Process the received data , and any subsequent data we can read
* without blocking .
*/
r = CopyStreamReceive ( conn , 0 , & copybuf ) ;
r = CopyStreamReceive ( conn , 0 , stream - > stop_socket , & copybuf ) ;
}
}
@ -881,20 +889,25 @@ error:
}
/*
* Wait until we can read CopyData message , or timeout .
* Wait until we can read a CopyData message ,
* or timeout , or occurrence of a signal or input on the stop_socket .
* ( timeout_ms < 0 means wait indefinitely ; 0 means don ' t wait . )
*
* Returns 1 if data has become available for reading , 0 if timed out
* or interrupted by signal , and - 1 on an error .
* or interrupted by signal or stop_socket input , and - 1 on an error .
*/
static int
CopyStreamPoll ( PGconn * conn , long timeout_ms )
CopyStreamPoll ( PGconn * conn , long timeout_ms , pgsocket stop_socket )
{
int ret ;
fd_set input_mask ;
int connsocket ;
int maxfd ;
struct timeval timeout ;
struct timeval * timeoutptr ;
if ( PQsocket ( conn ) < 0 )
connsocket = PQsocket ( conn ) ;
if ( connsocket < 0 )
{
fprintf ( stderr , _ ( " %s: invalid socket: %s " ) , progname ,
PQerrorMessage ( conn ) ) ;
@ -902,7 +915,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
}
FD_ZERO ( & input_mask ) ;
FD_SET ( PQsocket ( conn ) , & input_mask ) ;
FD_SET ( connsocket , & input_mask ) ;
maxfd = connsocket ;
if ( stop_socket ! = PGINVALID_SOCKET )
{
FD_SET ( stop_socket , & input_mask ) ;
maxfd = Max ( maxfd , stop_socket ) ;
}
if ( timeout_ms < 0 )
timeoutptr = NULL ;
@ -913,17 +932,20 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
timeoutptr = & timeout ;
}
ret = select ( PQsocket ( conn ) + 1 , & input_mask , NULL , NULL , timeoutptr ) ;
if ( ret = = 0 | | ( ret < 0 & & errno = = EINTR ) )
return 0 ; /* Got a timeout or signal */
else if ( ret < 0 )
ret = select ( maxfd + 1 , & input_mask , NULL , NULL , timeoutptr ) ;
if ( ret < 0 )
{
if ( errno = = EINTR )
return 0 ; /* Got a signal, so not an error */
fprintf ( stderr , _ ( " %s: select() failed: %s \n " ) ,
progname , strerror ( errno ) ) ;
return - 1 ;
}
if ( ret > 0 & & FD_ISSET ( connsocket , & input_mask ) )
return 1 ; /* Got input on connection socket */
return 1 ;
return 0 ; /* Got timeout or input on stop_socket */
}
/*
@ -934,11 +956,13 @@ CopyStreamPoll(PGconn *conn, long timeout_ms)
* point to a buffer holding the received message . The buffer is only valid
* until the next CopyStreamReceive call .
*
* 0 if no data was available within timeout , or wait was interrupted
* by signal . - 1 on error . - 2 if the server ended the COPY .
* Returns 0 if no data was available within timeout , or if wait was
* interrupted by signal or stop_socket input .
* - 1 on error . - 2 if the server ended the COPY .
*/
static int
CopyStreamReceive ( PGconn * conn , long timeout , char * * buffer )
CopyStreamReceive ( PGconn * conn , long timeout , pgsocket stop_socket ,
char * * buffer )
{
char * copybuf = NULL ;
int rawlen ;
@ -951,20 +975,18 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
rawlen = PQgetCopyData ( conn , & copybuf , 1 ) ;
if ( rawlen = = 0 )
{
int ret ;
/*
* No data available . Wait for some to appear , but not longer than the
* specified timeout , so that we can ping the server .
* No data available . Wait for some to appear , but not longer than
* the specified timeout , so that we can ping the server . Also stop
* waiting if input appears on stop_socket .
*/
if ( timeout ! = 0 )
{
int ret ;
ret = CopyStreamPoll ( conn , timeout ) ;
if ( ret < = 0 )
return ret ;
}
ret = CopyStreamPoll ( conn , timeout , stop_socket ) ;
if ( ret < = 0 )
return ret ;
/* Else there is actually data on the socket */
/* Now there is actually data on the socket */
if ( PQconsumeInput ( conn ) = = 0 )
{
fprintf ( stderr ,