@ -82,7 +82,7 @@ static bool replication_started = false; /* Started streaming yet? */
/* User-settable parameters for walsender */
int max_wal_senders = 0 ; /* the maximum number of concurrent walsenders */
int replication _timeout = 60 * 1000 ; /* maximum time to send one
int wal_sender _timeout = 60 * 1000 ; /* maximum time to send one
* WAL data message */
/*
* State for WalSndWakeupRequest
@ -103,15 +103,20 @@ static uint32 sendOff = 0;
*/
static XLogRecPtr sentPtr = 0 ;
/* Buffer for processing reply messages. */
static StringInfoData reply_message ;
/*
* Buffer for processing reply messages .
* Buffer for constructing outgoing messages .
* ( 1 + sizeof ( WalDataMessageHeader ) + MAX_SEND_SIZE bytes )
*/
static StringInfoData reply _message;
static char * output _message;
/*
* Timestamp of the last receipt of the reply from the standby .
*/
static TimestampTz last_reply_timestamp ;
/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool ping_sent = false ;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false ;
@ -126,14 +131,14 @@ static void WalSndLastCycleHandler(SIGNAL_ARGS);
static void WalSndLoop ( void ) __attribute__ ( ( noreturn ) ) ;
static void InitWalSenderSlot ( void ) ;
static void WalSndKill ( int code , Datum arg ) ;
static void XLogSend ( char * msgbuf , bool * caughtup ) ;
static void XLogSend ( bool * caughtup ) ;
static void IdentifySystem ( void ) ;
static void StartReplication ( StartReplicationCmd * cmd ) ;
static void ProcessStandbyMessage ( void ) ;
static void ProcessStandbyReplyMessage ( void ) ;
static void ProcessStandbyHSFeedbackMessage ( void ) ;
static void ProcessRepliesIfAny ( void ) ;
static void WalSndKeepalive ( char * msgbuf ) ;
static void WalSndKeepalive ( bool requestReply ) ;
/* Initialize walsender process before entering the main command loop */
@ -465,7 +470,10 @@ ProcessRepliesIfAny(void)
* Save the last reply timestamp if we ' ve received at least one reply .
*/
if ( received )
{
last_reply_timestamp = GetCurrentTimestamp ( ) ;
ping_sent = false ;
}
}
/*
@ -527,6 +535,10 @@ ProcessStandbyReplyMessage(void)
( uint32 ) ( reply . flush > > 32 ) , ( uint32 ) reply . flush ,
( uint32 ) ( reply . apply > > 32 ) , ( uint32 ) reply . apply ) ;
/* Send a reply if the standby requested one. */
if ( reply . replyRequested )
WalSndKeepalive ( false ) ;
/*
* Update shared state for this WalSender process based on reply data from
* standby .
@ -620,7 +632,6 @@ ProcessStandbyHSFeedbackMessage(void)
static void
WalSndLoop ( void )
{
char * output_message ;
bool caughtup = false ;
/*
@ -638,6 +649,7 @@ WalSndLoop(void)
/* Initialize the last reply timestamp */
last_reply_timestamp = GetCurrentTimestamp ( ) ;
ping_sent = false ;
/* Loop forever, unless we get an error */
for ( ; ; )
@ -672,7 +684,7 @@ WalSndLoop(void)
* caught up .
*/
if ( ! pq_is_send_pending ( ) )
XLogSend ( output_message , & caughtup ) ;
XLogSend ( & caughtup ) ;
else
caughtup = false ;
@ -708,7 +720,7 @@ WalSndLoop(void)
if ( walsender_ready_to_stop )
{
/* ... let's just be real sure we're caught up ... */
XLogSend ( output_message , & caughtup ) ;
XLogSend ( & caughtup ) ;
if ( caughtup & & ! pq_is_send_pending ( ) )
{
/* Inform the standby that XLOG streaming is done */
@ -738,23 +750,34 @@ WalSndLoop(void)
if ( pq_is_send_pending ( ) )
wakeEvents | = WL_SOCKET_WRITEABLE ;
else if ( MyWalSnd - > sendKeepalive )
else if ( wal_sender_timeout > 0 & & ! ping_sent )
{
WalSndKeepalive ( output_message ) ;
/* Try to flush pending output to the client */
if ( pq_flush_if_writable ( ) ! = 0 )
break ;
/*
* If half of wal_sender_timeout has lapsed without receiving
* any reply from standby , send a keep - alive message to standby
* requesting an immediate reply .
*/
timeout = TimestampTzPlusMilliseconds ( last_reply_timestamp ,
wal_sender_timeout / 2 ) ;
if ( GetCurrentTimestamp ( ) > = timeout )
{
WalSndKeepalive ( true ) ;
ping_sent = true ;
/* Try to flush pending output to the client */
if ( pq_flush_if_writable ( ) ! = 0 )
break ;
}
}
/* Determine time until replication timeout */
if ( replication_timeout > 0 )
if ( wal_sender _timeout > 0 )
{
timeout = TimestampTzPlusMilliseconds ( last_reply_timestamp ,
replication_timeout ) ;
sleeptime = 1 + ( replication_timeout / 10 ) ;
wal_sender _timeout) ;
sleeptime = 1 + ( wal_sender _timeout / 10 ) ;
}
/* Sleep until something happens or replication timeout */
/* Sleep until something happens or we time out */
ImmediateInterruptOK = true ;
CHECK_FOR_INTERRUPTS ( ) ;
WaitLatchOrSocket ( & MyWalSnd - > latch , wakeEvents ,
@ -766,8 +789,7 @@ WalSndLoop(void)
* possibility that the client replied just as we reached the
* timeout . . . he ' s supposed to reply * before * that .
*/
if ( replication_timeout > 0 & &
GetCurrentTimestamp ( ) > = timeout )
if ( wal_sender_timeout > 0 & & GetCurrentTimestamp ( ) > = timeout )
{
/*
* Since typically expiration of replication timeout means
@ -1016,15 +1038,11 @@ retry:
* but not yet sent to the client , and buffer it in the libpq output
* buffer .
*
* msgbuf is a work area in which the output message is constructed . It ' s
* passed in just so we can avoid re - palloc ' ing the buffer on each cycle .
* It must be of size 1 + sizeof ( WalDataMessageHeader ) + MAX_SEND_SIZE .
*
* If there is no unsent WAL remaining , * caughtup is set to true , otherwise
* * caughtup is set to false .
*/
static void
XLogSend ( char * msgbuf , bool * caughtup )
XLogSend ( bool * caughtup )
{
XLogRecPtr SendRqstPtr ;
XLogRecPtr startptr ;
@ -1107,13 +1125,13 @@ XLogSend(char *msgbuf, bool *caughtup)
/*
* OK to read and send the slice .
*/
msgbuf [ 0 ] = ' w ' ;
output_message [ 0 ] = ' w ' ;
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls .
*/
XLogRead ( msgbuf + 1 + sizeof ( WalDataMessageHeader ) , startptr , nbytes ) ;
XLogRead ( output_message + 1 + sizeof ( WalDataMessageHeader ) , startptr , nbytes ) ;
/*
* We fill the message header last so that the send timestamp is taken as
@ -1123,9 +1141,9 @@ XLogSend(char *msgbuf, bool *caughtup)
msghdr . walEnd = SendRqstPtr ;
msghdr . sendTime = GetCurrentTimestamp ( ) ;
memcpy ( msgbuf + 1 , & msghdr , sizeof ( WalDataMessageHeader ) ) ;
memcpy ( output_message + 1 , & msghdr , sizeof ( WalDataMessageHeader ) ) ;
pq_putmessage_noblock ( ' d ' , msgbuf , 1 + sizeof ( WalDataMessageHeader ) + nbytes ) ;
pq_putmessage_noblock ( ' d ' , output_message , 1 + sizeof ( WalDataMessageHeader ) + nbytes ) ;
sentPtr = endptr ;
@ -1492,21 +1510,27 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
return ( Datum ) 0 ;
}
/*
* This function is used to send keepalive message to standby .
* If requestReply is set , sets a flag in the message requesting the standby
* to send a message back to us , for heartbeat purposes .
*/
static void
WalSndKeepalive ( char * msgbuf )
WalSndKeepalive ( bool requestReply )
{
PrimaryKeepaliveMessage keepalive_message ;
/* Construct a new message */
keepalive_message . walEnd = sentPtr ;
keepalive_message . sendTime = GetCurrentTimestamp ( ) ;
keepalive_message . replyRequested = requestReply ;
elog ( DEBUG2 , " sending replication keepalive " ) ;
/* Prepend with the message type and send it. */
msgbuf [ 0 ] = ' k ' ;
memcpy ( msgbuf + 1 , & keepalive_message , sizeof ( PrimaryKeepaliveMessage ) ) ;
pq_putmessage_noblock ( ' d ' , msgbuf , sizeof ( PrimaryKeepaliveMessage ) + 1 ) ;
output_message [ 0 ] = ' k ' ;
memcpy ( output_message + 1 , & keepalive_message , sizeof ( PrimaryKeepaliveMessage ) ) ;
pq_putmessage_noblock ( ' d ' , output_message , sizeof ( PrimaryKeepaliveMessage ) + 1 ) ;
}
/*