@ -40,6 +40,7 @@ static int noloop = 0;
static int standby_message_timeout = 10 * 1000 ; /* 10 sec = default */
static int fsync_interval = 10 * 1000 ; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr ;
static XLogRecPtr endpos = InvalidXLogRecPtr ;
static bool do_create_slot = false ;
static bool slot_exists_ok = false ;
static bool do_start_slot = false ;
@ -63,6 +64,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
static void usage ( void ) ;
static void StreamLogicalLog ( void ) ;
static void disconnect_and_exit ( int code ) ;
static bool flushAndSendFeedback ( PGconn * conn , TimestampTz * now ) ;
static void prepareToTerminate ( PGconn * conn , XLogRecPtr endpos ,
bool keepalive , XLogRecPtr lsn ) ;
static void
usage ( void )
@ -81,6 +85,7 @@ usage(void)
" time between fsyncs to the output file (default: %d) \n " ) , ( fsync_interval / 1000 ) ) ;
printf ( _ ( " --if-not-exists do not error if slot already exists when creating a slot \n " ) ) ;
printf ( _ ( " -I, --startpos=LSN where in an existing slot should the streaming start \n " ) ) ;
printf ( _ ( " -E, --endpos=LSN exit after receiving the specified LSN \n " ) ) ;
printf ( _ ( " -n, --no-loop do not loop on connection lost \n " ) ) ;
printf ( _ ( " -o, --option=NAME[=VALUE] \n "
" pass option NAME with optional value VALUE to the \n "
@ -281,6 +286,7 @@ StreamLogicalLog(void)
int bytes_written ;
int64 now ;
int hdr_len ;
XLogRecPtr cur_record_lsn = InvalidXLogRecPtr ;
if ( copybuf ! = NULL )
{
@ -454,6 +460,7 @@ StreamLogicalLog(void)
int pos ;
bool replyRequested ;
XLogRecPtr walEnd ;
bool endposReached = false ;
/*
* Parse the keepalive message , enclosed in the CopyData message .
@ -476,18 +483,32 @@ StreamLogicalLog(void)
}
replyRequested = copybuf [ pos ] ;
/* If the server requested an immediate reply, send one. */
if ( replyRequested )
if ( endpos ! = InvalidXLogRecPtr & & walEnd > = endpos )
{
/* fsync data, so we send a recent flush pointer */
if ( ! OutputFsync ( now ) )
goto error ;
/*
* If there ' s nothing to read on the socket until a keepalive
* we know that the server has nothing to send us ; and if
* walEnd has passed endpos , we know nothing else can have
* committed before endpos . So we can bail out now .
*/
endposReached = true ;
}
now = feGetCurrentTimestamp ( ) ;
if ( ! sendFeedback ( conn , now , true , false ) )
/* Send a reply, if necessary */
if ( replyRequested | | endposReached )
{
if ( ! flushAndSendFeedback ( conn , & now ) )
goto error ;
last_status = now ;
}
if ( endposReached )
{
prepareToTerminate ( conn , endpos , true , InvalidXLogRecPtr ) ;
time_to_abort = true ;
break ;
}
continue ;
}
else if ( copybuf [ 0 ] ! = ' w ' )
@ -497,7 +518,6 @@ StreamLogicalLog(void)
goto error ;
}
/*
* Read the header of the XLogData message , enclosed in the CopyData
* message . We only need the WAL location field ( dataStart ) , the rest
@ -515,12 +535,23 @@ StreamLogicalLog(void)
}
/* Extract WAL location for this block */
{
XLogRecPtr temp = fe_recvint64 ( & copybuf [ 1 ] ) ;
cur_record_lsn = fe_recvint64 ( & copybuf [ 1 ] ) ;
output_written_lsn = Max ( temp , output_written_lsn ) ;
if ( endpos ! = InvalidXLogRecPtr & & cur_record_lsn > endpos )
{
/*
* We ' ve read past our endpoint , so prepare to go away being
* cautious about what happens to our output data .
*/
if ( ! flushAndSendFeedback ( conn , & now ) )
goto error ;
prepareToTerminate ( conn , endpos , false , cur_record_lsn ) ;
time_to_abort = true ;
break ;
}
output_written_lsn = Max ( cur_record_lsn , output_written_lsn ) ;
bytes_left = r - hdr_len ;
bytes_written = 0 ;
@ -557,10 +588,29 @@ StreamLogicalLog(void)
strerror ( errno ) ) ;
goto error ;
}
if ( endpos ! = InvalidXLogRecPtr & & cur_record_lsn = = endpos )
{
/* endpos was exactly the record we just processed, we're done */
if ( ! flushAndSendFeedback ( conn , & now ) )
goto error ;
prepareToTerminate ( conn , endpos , false , cur_record_lsn ) ;
time_to_abort = true ;
break ;
}
}
res = PQgetResult ( conn ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
if ( PQresultStatus ( res ) = = PGRES_COPY_OUT )
{
/*
* We ' re doing a client - initiated clean exit and have sent CopyDone to
* the server . We ' ve already sent replay confirmation and fsync ' d so
* we can just clean up the connection now .
*/
goto error ;
}
else if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr ,
_ ( " %s: unexpected termination of replication stream: %s " ) ,
@ -638,6 +688,7 @@ main(int argc, char **argv)
{ " password " , no_argument , NULL , ' W ' } ,
/* replication options */
{ " startpos " , required_argument , NULL , ' I ' } ,
{ " endpos " , required_argument , NULL , ' E ' } ,
{ " option " , required_argument , NULL , ' o ' } ,
{ " plugin " , required_argument , NULL , ' P ' } ,
{ " status-interval " , required_argument , NULL , ' s ' } ,
@ -673,7 +724,7 @@ main(int argc, char **argv)
}
}
while ( ( c = getopt_long ( argc , argv , " f:F:nvd:h:p:U:wWI:o:P:s:S: " ,
while ( ( c = getopt_long ( argc , argv , " f:F:nvd:h:p:U:wWI:E: o:P:s:S: " ,
long_options , & option_index ) ) ! = - 1 )
{
switch ( c )
@ -733,6 +784,16 @@ main(int argc, char **argv)
}
startpos = ( ( uint64 ) hi ) < < 32 | lo ;
break ;
case ' E ' :
if ( sscanf ( optarg , " %X/%X " , & hi , & lo ) ! = 2 )
{
fprintf ( stderr ,
_ ( " %s: could not parse end position \" %s \" \n " ) ,
progname , optarg ) ;
exit ( 1 ) ;
}
endpos = ( ( uint64 ) hi ) < < 32 | lo ;
break ;
case ' o ' :
{
char * data = pg_strdup ( optarg ) ;
@ -857,6 +918,16 @@ main(int argc, char **argv)
exit ( 1 ) ;
}
if ( endpos ! = InvalidXLogRecPtr & & ! do_start_slot )
{
fprintf ( stderr ,
_ ( " %s: --endpos may only be specified with --start \n " ) ,
progname ) ;
fprintf ( stderr , _ ( " Try \" %s --help \" for more information. \n " ) ,
progname ) ;
exit ( 1 ) ;
}
# ifndef WIN32
pqsignal ( SIGINT , sigint_handler ) ;
pqsignal ( SIGHUP , sighup_handler ) ;
@ -923,8 +994,8 @@ main(int argc, char **argv)
if ( time_to_abort )
{
/*
* We ' ve been Ctrl - C ' ed . That ' s not an error , so exit without an
* errorcode .
* We ' ve been Ctrl - C ' ed or reached an exit limit condition . That ' s
* not an error , so exit without an errorcode .
*/
disconnect_and_exit ( 0 ) ;
}
@ -943,3 +1014,47 @@ main(int argc, char **argv)
}
}
}
/*
* Fsync our output data , and send a feedback message to the server . Returns
* true if successful , false otherwise .
*
* If successful , * now is updated to the current timestamp just before sending
* feedback .
*/
static bool
flushAndSendFeedback ( PGconn * conn , TimestampTz * now )
{
/* flush data to disk, so that we send a recent flush pointer */
if ( ! OutputFsync ( * now ) )
return false ;
* now = feGetCurrentTimestamp ( ) ;
if ( ! sendFeedback ( conn , * now , true , false ) )
return false ;
return true ;
}
/*
* Try to inform the server about of upcoming demise , but don ' t wait around or
* retry on failure .
*/
static void
prepareToTerminate ( PGconn * conn , XLogRecPtr endpos , bool keepalive , XLogRecPtr lsn )
{
( void ) PQputCopyEnd ( conn , NULL ) ;
( void ) PQflush ( conn ) ;
if ( verbose )
{
if ( keepalive )
fprintf ( stderr , " %s: endpos %X/%X reached by keepalive \n " ,
progname ,
( uint32 ) ( endpos > > 32 ) , ( uint32 ) endpos ) ;
else
fprintf ( stderr , " %s: endpos %X/%X reached by record at %X/%X \n " ,
progname , ( uint32 ) ( endpos > > 32 ) , ( uint32 ) ( endpos ) ,
( uint32 ) ( lsn > > 32 ) , ( uint32 ) lsn ) ;
}
}