@ -32,6 +32,14 @@
/* Time to sleep between reconnection attempts */
/* Time to sleep between reconnection attempts */
# define RECONNECT_SLEEP_TIME 5
# define RECONNECT_SLEEP_TIME 5
typedef enum
{
STREAM_STOP_NONE ,
STREAM_STOP_END_OF_WAL ,
STREAM_STOP_KEEPALIVE ,
STREAM_STOP_SIGNAL
} StreamStopReason ;
/* Global Options */
/* Global Options */
static char * outfile = NULL ;
static char * outfile = NULL ;
static int verbose = 0 ;
static int verbose = 0 ;
@ -55,6 +63,7 @@ static const char *plugin = "test_decoding";
/* Global State */
/* Global State */
static int outfd = - 1 ;
static int outfd = - 1 ;
static volatile sig_atomic_t time_to_abort = false ;
static volatile sig_atomic_t time_to_abort = false ;
static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE ;
static volatile sig_atomic_t output_reopen = false ;
static volatile sig_atomic_t output_reopen = false ;
static bool output_isfile ;
static bool output_isfile ;
static TimestampTz output_last_fsync = - 1 ;
static TimestampTz output_last_fsync = - 1 ;
@ -66,7 +75,8 @@ static void usage(void);
static void StreamLogicalLog ( void ) ;
static void StreamLogicalLog ( void ) ;
static bool flushAndSendFeedback ( PGconn * conn , TimestampTz * now ) ;
static bool flushAndSendFeedback ( PGconn * conn , TimestampTz * now ) ;
static void prepareToTerminate ( PGconn * conn , XLogRecPtr endpos ,
static void prepareToTerminate ( PGconn * conn , XLogRecPtr endpos ,
bool keepalive , XLogRecPtr lsn ) ;
StreamStopReason reason ,
XLogRecPtr lsn ) ;
static void
static void
usage ( void )
usage ( void )
@ -207,9 +217,11 @@ StreamLogicalLog(void)
TimestampTz last_status = - 1 ;
TimestampTz last_status = - 1 ;
int i ;
int i ;
PQExpBuffer query ;
PQExpBuffer query ;
XLogRecPtr cur_record_lsn ;
output_written_lsn = InvalidXLogRecPtr ;
output_written_lsn = InvalidXLogRecPtr ;
output_fsync_lsn = InvalidXLogRecPtr ;
output_fsync_lsn = InvalidXLogRecPtr ;
cur_record_lsn = InvalidXLogRecPtr ;
/*
/*
* Connect in replication mode to the server
* Connect in replication mode to the server
@ -275,7 +287,8 @@ StreamLogicalLog(void)
int bytes_written ;
int bytes_written ;
TimestampTz now ;
TimestampTz now ;
int hdr_len ;
int hdr_len ;
XLogRecPtr cur_record_lsn = InvalidXLogRecPtr ;
cur_record_lsn = InvalidXLogRecPtr ;
if ( copybuf ! = NULL )
if ( copybuf ! = NULL )
{
{
@ -487,7 +500,7 @@ StreamLogicalLog(void)
if ( endposReached )
if ( endposReached )
{
{
prepareToTerminate ( conn , endpos , true , InvalidXLogRecPtr ) ;
stop_reason = STREAM_STOP_KEEPALIVE ;
time_to_abort = true ;
time_to_abort = true ;
break ;
break ;
}
}
@ -527,7 +540,7 @@ StreamLogicalLog(void)
*/
*/
if ( ! flushAndSendFeedback ( conn , & now ) )
if ( ! flushAndSendFeedback ( conn , & now ) )
goto error ;
goto error ;
prepareToTerminate ( conn , endpos , false , cur_record_lsn ) ;
stop_reason = STREAM_STOP_END_OF_WAL ;
time_to_abort = true ;
time_to_abort = true ;
break ;
break ;
}
}
@ -572,12 +585,16 @@ StreamLogicalLog(void)
/* endpos was exactly the record we just processed, we're done */
/* endpos was exactly the record we just processed, we're done */
if ( ! flushAndSendFeedback ( conn , & now ) )
if ( ! flushAndSendFeedback ( conn , & now ) )
goto error ;
goto error ;
prepareToTerminate ( conn , endpos , false , cur_record_lsn ) ;
stop_reason = STREAM_STOP_END_OF_WAL ;
time_to_abort = true ;
time_to_abort = true ;
break ;
break ;
}
}
}
}
/* Clean up connection state if stream has been aborted */
if ( time_to_abort )
prepareToTerminate ( conn , endpos , stop_reason , cur_record_lsn ) ;
res = PQgetResult ( conn ) ;
res = PQgetResult ( conn ) ;
if ( PQresultStatus ( res ) = = PGRES_COPY_OUT )
if ( PQresultStatus ( res ) = = PGRES_COPY_OUT )
{
{
@ -656,6 +673,7 @@ error:
static void
static void
sigexit_handler ( SIGNAL_ARGS )
sigexit_handler ( SIGNAL_ARGS )
{
{
stop_reason = STREAM_STOP_SIGNAL ;
time_to_abort = true ;
time_to_abort = true ;
}
}
@ -1021,18 +1039,31 @@ flushAndSendFeedback(PGconn *conn, TimestampTz *now)
* retry on failure .
* retry on failure .
*/
*/
static void
static void
prepareToTerminate ( PGconn * conn , XLogRecPtr endpos , bool keepalive , XLogRecPtr lsn )
prepareToTerminate ( PGconn * conn , XLogRecPtr endpos , StreamStopReason reason ,
XLogRecPtr lsn )
{
{
( void ) PQputCopyEnd ( conn , NULL ) ;
( void ) PQputCopyEnd ( conn , NULL ) ;
( void ) PQflush ( conn ) ;
( void ) PQflush ( conn ) ;
if ( verbose )
if ( verbose )
{
{
if ( keepalive )
switch ( reason )
pg_log_info ( " end position %X/%X reached by keepalive " ,
{
LSN_FORMAT_ARGS ( endpos ) ) ;
case STREAM_STOP_SIGNAL :
else
pg_log_info ( " received interrupt signal, exiting " ) ;
pg_log_info ( " end position %X/%X reached by WAL record at %X/%X " ,
break ;
LSN_FORMAT_ARGS ( endpos ) , LSN_FORMAT_ARGS ( lsn ) ) ;
case STREAM_STOP_KEEPALIVE :
pg_log_info ( " end position %X/%X reached by keepalive " ,
LSN_FORMAT_ARGS ( endpos ) ) ;
break ;
case STREAM_STOP_END_OF_WAL :
Assert ( ! XLogRecPtrIsInvalid ( lsn ) ) ;
pg_log_info ( " end position %X/%X reached by WAL record at %X/%X " ,
LSN_FORMAT_ARGS ( endpos ) , LSN_FORMAT_ARGS ( lsn ) ) ;
break ;
case STREAM_STOP_NONE :
Assert ( false ) ;
break ;
}
}
}
}
}