@ -161,9 +161,12 @@ static StringInfoData output_message;
static StringInfoData reply_message ;
static StringInfoData tmpbuf ;
/* Timestamp of last ProcessRepliesIfAny(). */
static TimestampTz last_processing = 0 ;
/*
* Timestamp of the last receipt of the reply from the standby . Set to 0 if
* wal_sender_timeout doesn ' t need to be active .
* Timestamp of last ProcessRepliesIfAny ( ) that saw a reply from the
* standby . Set to 0 if wal_sender_timeout doesn ' t need to be active .
*/
static TimestampTz last_reply_timestamp = 0 ;
@ -240,8 +243,8 @@ static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage ( void ) ;
static void ProcessRepliesIfAny ( void ) ;
static void WalSndKeepalive ( bool requestReply ) ;
static void WalSndKeepaliveIfNecessary ( TimestampTz now ) ;
static void WalSndCheckTimeOut ( TimestampTz now ) ;
static void WalSndKeepaliveIfNecessary ( void ) ;
static void WalSndCheckTimeOut ( void ) ;
static long WalSndComputeSleeptime ( TimestampTz now ) ;
static void WalSndPrepareWrite ( LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid , bool last_write ) ;
static void WalSndWriteData ( LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid , bool last_write ) ;
@ -1202,18 +1205,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* Check for input from the client */
ProcessRepliesIfAny ( ) ;
now = GetCurrentTimestamp ( ) ;
/* die if timeout was reached */
WalSndCheckTimeOut ( now ) ;
WalSndCheckTimeOut ( ) ;
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary ( now ) ;
WalSndKeepaliveIfNecessary ( ) ;
if ( ! pq_is_send_pending ( ) )
break ;
sleeptime = WalSndComputeSleeptime ( now ) ;
sleeptime = WalSndComputeSleeptime ( GetCurrentTimestamp ( ) ) ;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT ;
@ -1308,7 +1309,6 @@ WalSndWaitForWal(XLogRecPtr loc)
for ( ; ; )
{
long sleeptime ;
TimestampTz now ;
/*
* Emergency bailout if postmaster has died . This is to avoid the
@ -1393,13 +1393,11 @@ WalSndWaitForWal(XLogRecPtr loc)
! pq_is_send_pending ( ) )
break ;
now = GetCurrentTimestamp ( ) ;
/* die if timeout was reached */
WalSndCheckTimeOut ( now ) ;
WalSndCheckTimeOut ( ) ;
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary ( now ) ;
WalSndKeepaliveIfNecessary ( ) ;
/*
* Sleep until something happens or we time out . Also wait for the
@ -1408,7 +1406,7 @@ WalSndWaitForWal(XLogRecPtr loc)
* new WAL to be generated . ( But if we have nothing to send , we don ' t
* want to wake on socket - writable . )
*/
sleeptime = WalSndComputeSleeptime ( now ) ;
sleeptime = WalSndComputeSleeptime ( GetCurrentTimestamp ( ) ) ;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
WL_SOCKET_READABLE | WL_TIMEOUT ;
@ -1605,6 +1603,8 @@ ProcessRepliesIfAny(void)
int r ;
bool received = false ;
last_processing = GetCurrentTimestamp ( ) ;
for ( ; ; )
{
pq_startmsgread ( ) ;
@ -1692,7 +1692,7 @@ ProcessRepliesIfAny(void)
*/
if ( received )
{
last_reply_timestamp = GetCurrentTimestamp ( ) ;
last_reply_timestamp = last_processing ;
waiting_for_ping_response = false ;
}
}
@ -2071,10 +2071,18 @@ WalSndComputeSleeptime(TimestampTz now)
/*
* Check whether there have been responses by the client within
* wal_sender_timeout and shutdown if not .
* wal_sender_timeout and shutdown if not . Using last_processing as the
* reference point avoids counting server - side stalls against the client .
* However , a long server - side stall can make WalSndKeepaliveIfNecessary ( )
* postdate last_processing by more than wal_sender_timeout . If that happens ,
* the client must reply almost immediately to avoid a timeout . This rarely
* affects the default configuration , under which clients spontaneously send a
* message every standby_message_timeout = wal_sender_timeout / 6 = 10 s . We
* could eliminate that problem by recognizing timeout expiration at
* wal_sender_timeout / 2 after the keepalive .
*/
static void
WalSndCheckTimeOut ( TimestampTz now )
WalSndCheckTimeOut ( void )
{
TimestampTz timeout ;
@ -2085,7 +2093,7 @@ WalSndCheckTimeOut(TimestampTz now)
timeout = TimestampTzPlusMilliseconds ( last_reply_timestamp ,
wal_sender_timeout ) ;
if ( wal_sender_timeout > 0 & & now > = timeout )
if ( wal_sender_timeout > 0 & & last_processing > = timeout )
{
/*
* Since typically expiration of replication timeout means
@ -2116,8 +2124,6 @@ WalSndLoop(WalSndSendDataCallback send_data)
*/
for ( ; ; )
{
TimestampTz now ;
/*
* Emergency bailout if postmaster has died . This is to avoid the
* necessity for manual cleanup of all postmaster children .
@ -2195,13 +2201,11 @@ WalSndLoop(WalSndSendDataCallback send_data)
WalSndDone ( send_data ) ;
}
now = GetCurrentTimestamp ( ) ;
/* Check for replication timeout. */
WalSndCheckTimeOut ( now ) ;
WalSndCheckTimeOut ( ) ;
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary ( now ) ;
WalSndKeepaliveIfNecessary ( ) ;
/*
* We don ' t block if not caught up , unless there is unsent data
@ -2219,7 +2223,11 @@ WalSndLoop(WalSndSendDataCallback send_data)
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
WL_SOCKET_READABLE ;
sleeptime = WalSndComputeSleeptime ( now ) ;
/*
* Use fresh timestamp , not last_processed , to reduce the chance
* of reaching wal_sender_timeout before sending a keepalive .
*/
sleeptime = WalSndComputeSleeptime ( GetCurrentTimestamp ( ) ) ;
if ( pq_is_send_pending ( ) )
wakeEvents | = WL_SOCKET_WRITEABLE ;
@ -3379,7 +3387,7 @@ WalSndKeepalive(bool requestReply)
* Send keepalive message if too much time has elapsed .
*/
static void
WalSndKeepaliveIfNecessary ( TimestampTz now )
WalSndKeepaliveIfNecessary ( void )
{
TimestampTz ping_time ;
@ -3400,7 +3408,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
*/
ping_time = TimestampTzPlusMilliseconds ( last_reply_timestamp ,
wal_sender_timeout / 2 ) ;
if ( now > = ping_time )
if ( last_processing > = ping_time )
{
WalSndKeepalive ( true ) ;
waiting_for_ping_response = true ;