@ -95,8 +95,6 @@ bool hot_standby_feedback;
static WalReceiverConn * wrconn = NULL ;
WalReceiverFunctionsType * WalReceiverFunctions = NULL ;
# define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
/*
* These variables are used similarly to openLogFile / SegNo ,
* but for walreceiver to write the XLOG . recvFileTLI is the TimeLineID
@ -116,6 +114,23 @@ static struct
XLogRecPtr Flush ; /* last byte + 1 flushed in the standby */
} LogstreamResult ;
/*
* Reasons to wake up and perform periodic tasks .
*/
typedef enum WalRcvWakeupReason
{
WALRCV_WAKEUP_TERMINATE ,
WALRCV_WAKEUP_PING ,
WALRCV_WAKEUP_REPLY ,
WALRCV_WAKEUP_HSFEEDBACK ,
NUM_WALRCV_WAKEUPS
} WalRcvWakeupReason ;
/*
* Wake up times for periodic tasks .
*/
static TimestampTz wakeup [ NUM_WALRCV_WAKEUPS ] ;
static StringInfoData reply_message ;
static StringInfoData incoming_message ;
@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
static void XLogWalRcvSendReply ( bool force , bool requestReply ) ;
static void XLogWalRcvSendHSFeedback ( bool immed ) ;
static void ProcessWalSndrMessage ( XLogRecPtr walEnd , TimestampTz sendTime ) ;
static void WalRcvComputeNextWakeup ( WalRcvWakeupReason reason , TimestampTz now ) ;
/*
* Process any interrupts the walreceiver process may have received .
@ -179,9 +195,7 @@ WalReceiverMain(void)
TimeLineID primaryTLI ;
bool first_stream ;
WalRcvData * walrcv = WalRcv ;
TimestampTz last_recv_timestamp ;
TimestampTz starttime ;
bool ping_sent ;
TimestampTz now ;
char * err ;
char * sender_host = NULL ;
int sender_port = 0 ;
@ -192,7 +206,7 @@ WalReceiverMain(void)
*/
Assert ( walrcv ! = NULL ) ;
starttime = GetCurrentTimestamp ( ) ;
now = GetCurrentTimestamp ( ) ;
/*
* Mark walreceiver as running in shared memory .
@ -248,7 +262,7 @@ WalReceiverMain(void)
/* Initialise to a sanish value */
walrcv - > lastMsgSendTime =
walrcv - > lastMsgReceiptTime = walrcv - > latestWalEndTime = starttime ;
walrcv - > lastMsgReceiptTime = walrcv - > latestWalEndTime = now ;
/* Report the latch to use to awaken this process */
walrcv - > latch = & MyProc - > procLatch ;
@ -414,9 +428,10 @@ WalReceiverMain(void)
initStringInfo ( & reply_message ) ;
initStringInfo ( & incoming_message ) ;
/* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp ( ) ;
ping_sent = false ;
/* Initialize nap wakeup times. */
now = GetCurrentTimestamp ( ) ;
for ( int i = 0 ; i < NUM_WALRCV_WAKEUPS ; + + i )
WalRcvComputeNextWakeup ( i , now ) ;
/* Loop until end-of-streaming or error */
for ( ; ; )
@ -426,6 +441,8 @@ WalReceiverMain(void)
bool endofwal = false ;
pgsocket wait_fd = PGINVALID_SOCKET ;
int rc ;
TimestampTz nextWakeup ;
int nap ;
/*
* Exit walreceiver if we ' re not in recovery . This should not
@ -443,11 +460,15 @@ WalReceiverMain(void)
{
ConfigReloadPending = false ;
ProcessConfigFile ( PGC_SIGHUP ) ;
now = GetCurrentTimestamp ( ) ;
for ( int i = 0 ; i < NUM_WALRCV_WAKEUPS ; + + i )
WalRcvComputeNextWakeup ( i , now ) ;
XLogWalRcvSendHSFeedback ( true ) ;
}
/* See if we can read data immediately */
len = walrcv_receive ( wrconn , & buf , & wait_fd ) ;
now = GetCurrentTimestamp ( ) ;
if ( len ! = 0 )
{
/*
@ -459,11 +480,12 @@ WalReceiverMain(void)
if ( len > 0 )
{
/*
* Something was received from primary , so rese t
* timeout
* Something was received from primary , so adjus t
* the ping and terminate wakeup times .
*/
last_recv_timestamp = GetCurrentTimestamp ( ) ;
ping_sent = false ;
WalRcvComputeNextWakeup ( WALRCV_WAKEUP_TERMINATE ,
now ) ;
WalRcvComputeNextWakeup ( WALRCV_WAKEUP_PING , now ) ;
XLogWalRcvProcessMsg ( buf [ 0 ] , & buf [ 1 ] , len - 1 ,
startpointTLI ) ;
}
@ -480,6 +502,7 @@ WalReceiverMain(void)
break ;
}
len = walrcv_receive ( wrconn , & buf , & wait_fd ) ;
now = GetCurrentTimestamp ( ) ;
}
/* Let the primary know that we received some data. */
@ -497,6 +520,20 @@ WalReceiverMain(void)
if ( endofwal )
break ;
/* Find the soonest wakeup time, to limit our nap. */
nextWakeup = PG_INT64_MAX ;
for ( int i = 0 ; i < NUM_WALRCV_WAKEUPS ; + + i )
nextWakeup = Min ( wakeup [ i ] , nextWakeup ) ;
/*
* Calculate the nap time . WaitLatchOrSocket ( ) doesn ' t accept
* timeouts longer than INT_MAX milliseconds , so we limit the
* result accordingly . Also , we round up to the next
* millisecond to avoid waking up too early and spinning until
* one of the wakeup times .
*/
nap = ( int ) Min ( INT_MAX , Max ( 0 , ( nextWakeup - now + 999 ) / 1000 ) ) ;
/*
* Ideally we would reuse a WaitEventSet object repeatedly
* here to avoid the overheads of WaitLatchOrSocket on epoll
@ -513,8 +550,9 @@ WalReceiverMain(void)
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET ,
wait_fd ,
NAPTIME_PER_CYCLE ,
nap ,
WAIT_EVENT_WAL_RECEIVER_MAIN ) ;
now = GetCurrentTimestamp ( ) ;
if ( rc & WL_LATCH_SET )
{
ResetLatch ( MyLatch ) ;
@ -550,34 +588,19 @@ WalReceiverMain(void)
* Check if time since last receive from primary has
* reached the configured limit .
*/
if ( wal_receiver_timeout > 0 )
{
TimestampTz now = GetCurrentTimestamp ( ) ;
TimestampTz timeout ;
timeout =
TimestampTzPlusMilliseconds ( last_recv_timestamp ,
wal_receiver_timeout ) ;
if ( now > = timeout )
if ( now > = wakeup [ WALRCV_WAKEUP_TERMINATE ] )
ereport ( ERROR ,
( errcode ( ERRCODE_CONNECTION_FAILURE ) ,
errmsg ( " terminating walreceiver due to timeout " ) ) ) ;
/*
* We didn ' t receive anything new , for half of
* receiver replication timeout . Ping the server .
* We didn ' t receive anything new , for half of receiver
* replication timeout . Ping the server .
*/
if ( ! ping_sent )
{
timeout = TimestampTzPlusMilliseconds ( last_recv_timestamp ,
( wal_receiver_timeout / 2 ) ) ;
if ( now > = timeout )
if ( now > = wakeup [ WALRCV_WAKEUP_PING ] )
{
requestReply = true ;
ping_sent = true ;
}
}
wakeup [ WALRCV_WAKEUP_PING ] = PG_INT64_MAX ;
}
XLogWalRcvSendReply ( requestReply , requestReply ) ;
@ -1076,7 +1099,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
static XLogRecPtr writePtr = 0 ;
static XLogRecPtr flushPtr = 0 ;
XLogRecPtr applyPtr ;
static TimestampTz sendTime = 0 ;
TimestampTz now ;
/*
@ -1101,10 +1123,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
if ( ! force
& & writePtr = = LogstreamResult . Write
& & flushPtr = = LogstreamResult . Flush
& & ! TimestampDifferenceExceeds ( sendTime , now ,
wal_receiver_status_interval * 1000 ) )
& & now < wakeup [ WALRCV_WAKEUP_REPLY ] )
return ;
sendTime = now ;
/* Make sure we wake up when it's time to send another reply. */
WalRcvComputeNextWakeup ( WALRCV_WAKEUP_REPLY , now ) ;
/* Construct a new message */
writePtr = LogstreamResult . Write ;
@ -1149,7 +1172,6 @@ XLogWalRcvSendHSFeedback(bool immed)
catalog_xmin_epoch ;
TransactionId xmin ,
catalog_xmin ;
static TimestampTz sendTime = 0 ;
/* initially true so we always send at least one feedback message */
static bool primary_has_standby_xmin = true ;
@ -1165,16 +1187,12 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Get current timestamp. */
now = GetCurrentTimestamp ( ) ;
if ( ! immed )
{
/*
* Send feedback at most once per wal_receiver_status_interval .
*/
if ( ! TimestampDifferenceExceeds ( sendTime , now ,
wal_receiver_status_interval * 1000 ) )
/* Send feedback at most once per wal_receiver_status_interval. */
if ( ! immed & & now < wakeup [ WALRCV_WAKEUP_HSFEEDBACK ] )
return ;
sendTime = now ;
}
/* Make sure we wake up when it's time to send feedback again. */
WalRcvComputeNextWakeup ( WALRCV_WAKEUP_HSFEEDBACK , now ) ;
/*
* If Hot Standby is not yet accepting connections there is nothing to
@ -1285,6 +1303,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
}
}
/*
* Compute the next wakeup time for a given wakeup reason . Can be called to
* initialize a wakeup time , to adjust it for the next wakeup , or to
* reinitialize it when GUCs have changed .
*/
static void
WalRcvComputeNextWakeup ( WalRcvWakeupReason reason , TimestampTz now )
{
switch ( reason )
{
case WALRCV_WAKEUP_TERMINATE :
if ( wal_receiver_timeout < = 0 )
wakeup [ reason ] = PG_INT64_MAX ;
else
wakeup [ reason ] = now + wal_receiver_timeout * INT64CONST ( 1000 ) ;
break ;
case WALRCV_WAKEUP_PING :
if ( wal_receiver_timeout < = 0 )
wakeup [ reason ] = PG_INT64_MAX ;
else
wakeup [ reason ] = now + ( wal_receiver_timeout / 2 ) * INT64CONST ( 1000 ) ;
break ;
case WALRCV_WAKEUP_HSFEEDBACK :
if ( ! hot_standby_feedback | | wal_receiver_status_interval < = 0 )
wakeup [ reason ] = PG_INT64_MAX ;
else
wakeup [ reason ] = now + wal_receiver_status_interval * INT64CONST ( 1000000 ) ;
break ;
case WALRCV_WAKEUP_REPLY :
if ( wal_receiver_status_interval < = 0 )
wakeup [ reason ] = PG_INT64_MAX ;
else
wakeup [ reason ] = now + wal_receiver_status_interval * INT64CONST ( 1000000 ) ;
break ;
default :
break ;
}
}
/*
* Wake up the walreceiver main loop .
*