@ -190,6 +190,26 @@ static volatile sig_atomic_t replication_active = false;
static LogicalDecodingContext * logical_decoding_ctx = NULL ;
static XLogRecPtr logical_startptr = InvalidXLogRecPtr ;
/* A sample associating a log position with the time it was written. */
typedef struct
{
XLogRecPtr lsn ;
TimestampTz time ;
} WalTimeSample ;
/* The size of our buffer of time samples. */
# define LAG_TRACKER_BUFFER_SIZE 8192
/* A mechanism for tracking replication lag. */
static struct
{
XLogRecPtr last_lsn ;
WalTimeSample buffer [ LAG_TRACKER_BUFFER_SIZE ] ;
int write_head ;
int read_heads [ NUM_SYNC_REP_WAIT_MODE ] ;
WalTimeSample last_read [ NUM_SYNC_REP_WAIT_MODE ] ;
} LagTracker ;
/* Signal handlers */
static void WalSndSigHupHandler ( SIGNAL_ARGS ) ;
static void WalSndXLogSendHandler ( SIGNAL_ARGS ) ;
@ -221,6 +241,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndPrepareWrite ( LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid , bool last_write ) ;
static void WalSndWriteData ( LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid , bool last_write ) ;
static XLogRecPtr WalSndWaitForWal ( XLogRecPtr loc ) ;
static TimeOffset LagTrackerRead ( int head , XLogRecPtr lsn , TimestampTz now ) ;
static void XLogRead ( char * buf , XLogRecPtr startptr , Size count ) ;
@ -246,6 +267,9 @@ InitWalSender(void)
*/
MarkPostmasterChildWalSender ( ) ;
SendPostmasterSignal ( PMSIGNAL_ADVANCE_STATE_MACHINE ) ;
/* Initialize empty timestamp buffer for lag tracking. */
memset ( & LagTracker , 0 , sizeof ( LagTracker ) ) ;
}
/*
@ -1646,6 +1670,13 @@ ProcessStandbyReplyMessage(void)
flushPtr ,
applyPtr ;
bool replyRequested ;
TimeOffset writeLag ,
flushLag ,
applyLag ;
bool clearLagTimes ;
TimestampTz now ;
static bool fullyAppliedLastTime = false ;
/* the caller already consumed the msgtype byte */
writePtr = pq_getmsgint64 ( & reply_message ) ;
@ -1660,6 +1691,30 @@ ProcessStandbyReplyMessage(void)
( uint32 ) ( applyPtr > > 32 ) , ( uint32 ) applyPtr ,
replyRequested ? " (reply requested) " : " " ) ;
/* See if we can compute the round-trip lag for these positions. */
now = GetCurrentTimestamp ( ) ;
writeLag = LagTrackerRead ( SYNC_REP_WAIT_WRITE , writePtr , now ) ;
flushLag = LagTrackerRead ( SYNC_REP_WAIT_FLUSH , flushPtr , now ) ;
applyLag = LagTrackerRead ( SYNC_REP_WAIT_APPLY , applyPtr , now ) ;
/*
* If the standby reports that it has fully replayed the WAL in two
* consecutive reply messages , then the second such message must result
* from wal_receiver_status_interval expiring on the standby . This is a
* convenient time to forget the lag times measured when it last
* wrote / flushed / applied a WAL record , to avoid displaying stale lag data
* until more WAL traffic arrives .
*/
clearLagTimes = false ;
if ( applyPtr = = sentPtr )
{
if ( fullyAppliedLastTime )
clearLagTimes = true ;
fullyAppliedLastTime = true ;
}
else
fullyAppliedLastTime = false ;
/* Send a reply if the standby requested one. */
if ( replyRequested )
WalSndKeepalive ( false ) ;
@ -1675,6 +1730,12 @@ ProcessStandbyReplyMessage(void)
walsnd - > write = writePtr ;
walsnd - > flush = flushPtr ;
walsnd - > apply = applyPtr ;
if ( writeLag ! = - 1 | | clearLagTimes )
walsnd - > writeLag = writeLag ;
if ( flushLag ! = - 1 | | clearLagTimes )
walsnd - > flushLag = flushLag ;
if ( applyLag ! = - 1 | | clearLagTimes )
walsnd - > applyLag = applyLag ;
SpinLockRelease ( & walsnd - > mutex ) ;
}
@ -2063,6 +2124,9 @@ InitWalSenderSlot(void)
walsnd - > write = InvalidXLogRecPtr ;
walsnd - > flush = InvalidXLogRecPtr ;
walsnd - > apply = InvalidXLogRecPtr ;
walsnd - > writeLag = - 1 ;
walsnd - > flushLag = - 1 ;
walsnd - > applyLag = - 1 ;
walsnd - > state = WALSNDSTATE_STARTUP ;
walsnd - > latch = & MyProc - > procLatch ;
SpinLockRelease ( & walsnd - > mutex ) ;
@ -2389,6 +2453,32 @@ XLogSendPhysical(void)
SendRqstPtr = GetFlushRecPtr ( ) ;
}
/*
* Record the current system time as an approximation of the time at which
* this WAL position was written for the purposes of lag tracking .
*
* In theory we could make XLogFlush ( ) record a time in shmem whenever WAL
* is flushed and we could get that time as well as the LSN when we call
* GetFlushRecPtr ( ) above ( and likewise for the cascading standby
* equivalent ) , but rather than putting any new code into the hot WAL path
* it seems good enough to capture the time here . We should reach this
* after XLogFlush ( ) runs WalSndWakeupProcessRequests ( ) , and although that
* may take some time , we read the WAL flush pointer and take the time
* very close to together here so that we ' ll get a later position if it
* is still moving .
*
* Because LagTrackerWriter ignores samples when the LSN hasn ' t advanced ,
* this gives us a cheap approximation for the WAL flush time for this
* LSN .
*
* Note that the LSN is not necessarily the LSN for the data contained in
* the present message ; it ' s the end of the the WAL , which might be
* further ahead . All the lag tracking machinery cares about is finding
* out when that arbitrary LSN is eventually reported as written , flushed
* and applied , so that it can measure the elapsed time .
*/
LagTrackerWrite ( SendRqstPtr , GetCurrentTimestamp ( ) ) ;
/*
* If this is a historic timeline and we ' ve reached the point where we
* forked to the next timeline , stop streaming .
@ -2543,6 +2633,11 @@ XLogSendLogical(void)
if ( record ! = NULL )
{
/*
* Note the lack of any call to LagTrackerWrite ( ) which is the responsibility
* of the logical decoding plugin . Response messages are handled normally ,
* so this responsibility does not extend to needing to call LagTrackerRead ( ) .
*/
LogicalDecodingProcessRecord ( logical_decoding_ctx , logical_decoding_ctx - > reader ) ;
sentPtr = logical_decoding_ctx - > reader - > EndRecPtr ;
@ -2839,6 +2934,17 @@ WalSndGetStateString(WalSndState state)
return " UNKNOWN " ;
}
static Interval *
offset_to_interval ( TimeOffset offset )
{
Interval * result = palloc ( sizeof ( Interval ) ) ;
result - > month = 0 ;
result - > day = 0 ;
result - > time = offset ;
return result ;
}
/*
* Returns activity of walsenders , including pids and xlog locations sent to
@ -2847,7 +2953,7 @@ WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders ( PG_FUNCTION_ARGS )
{
# define PG_STAT_GET_WAL_SENDERS_COLS 8
# define PG_STAT_GET_WAL_SENDERS_COLS 11
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
TupleDesc tupdesc ;
Tuplestorestate * tupstore ;
@ -2895,6 +3001,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
XLogRecPtr write ;
XLogRecPtr flush ;
XLogRecPtr apply ;
TimeOffset writeLag ;
TimeOffset flushLag ;
TimeOffset applyLag ;
int priority ;
WalSndState state ;
Datum values [ PG_STAT_GET_WAL_SENDERS_COLS ] ;
@ -2909,6 +3018,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
write = walsnd - > write ;
flush = walsnd - > flush ;
apply = walsnd - > apply ;
writeLag = walsnd - > writeLag ;
flushLag = walsnd - > flushLag ;
applyLag = walsnd - > applyLag ;
priority = walsnd - > sync_standby_priority ;
SpinLockRelease ( & walsnd - > mutex ) ;
@ -2950,7 +3062,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
priority = XLogRecPtrIsInvalid ( walsnd - > flush ) ? 0 : priority ;
values [ 6 ] = Int32GetDatum ( priority ) ;
if ( writeLag < 0 )
nulls [ 6 ] = true ;
else
values [ 6 ] = IntervalPGetDatum ( offset_to_interval ( writeLag ) ) ;
if ( flushLag < 0 )
nulls [ 7 ] = true ;
else
values [ 7 ] = IntervalPGetDatum ( offset_to_interval ( flushLag ) ) ;
if ( applyLag < 0 )
nulls [ 8 ] = true ;
else
values [ 8 ] = IntervalPGetDatum ( offset_to_interval ( applyLag ) ) ;
values [ 9 ] = Int32GetDatum ( priority ) ;
/*
* More easily understood version of standby state . This is purely
@ -2964,12 +3091,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* states . We report just " quorum " for them .
*/
if ( priority = = 0 )
values [ 7 ] = CStringGetTextDatum ( " async " ) ;
values [ 10 ] = CStringGetTextDatum ( " async " ) ;
else if ( list_member_int ( sync_standbys , i ) )
values [ 7 ] = SyncRepConfig - > syncrep_method = = SYNC_REP_PRIORITY ?
values [ 10 ] = SyncRepConfig - > syncrep_method = = SYNC_REP_PRIORITY ?
CStringGetTextDatum ( " sync " ) : CStringGetTextDatum ( " quorum " ) ;
else
values [ 7 ] = CStringGetTextDatum ( " potential " ) ;
values [ 10 ] = CStringGetTextDatum ( " potential " ) ;
}
tuplestore_putvalues ( tupstore , tupdesc , values , nulls ) ;
@ -3037,3 +3164,143 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
WalSndShutdown ( ) ;
}
}
/*
* Record the end of the WAL and the time it was flushed locally , so that
* LagTrackerRead can compute the elapsed time ( lag ) when this WAL position is
* eventually reported to have been written , flushed and applied by the
* standby in a reply message .
* Exported to allow logical decoding plugins to call this when they choose .
*/
void
LagTrackerWrite ( XLogRecPtr lsn , TimestampTz local_flush_time )
{
bool buffer_full ;
int new_write_head ;
int i ;
if ( ! am_walsender )
return ;
/*
* If the lsn hasn ' t advanced since last time , then do nothing . This way
* we only record a new sample when new WAL has been written .
*/
if ( LagTracker . last_lsn = = lsn )
return ;
LagTracker . last_lsn = lsn ;
/*
* If advancing the write head of the circular buffer would crash into any
* of the read heads , then the buffer is full . In other words , the
* slowest reader ( presumably apply ) is the one that controls the release
* of space .
*/
new_write_head = ( LagTracker . write_head + 1 ) % LAG_TRACKER_BUFFER_SIZE ;
buffer_full = false ;
for ( i = 0 ; i < NUM_SYNC_REP_WAIT_MODE ; + + i )
{
if ( new_write_head = = LagTracker . read_heads [ i ] )
buffer_full = true ;
}
/*
* If the buffer is full , for now we just rewind by one slot and overwrite
* the last sample , as a simple ( if somewhat uneven ) way to lower the
* sampling rate . There may be better adaptive compaction algorithms .
*/
if ( buffer_full )
{
new_write_head = LagTracker . write_head ;
if ( LagTracker . write_head > 0 )
LagTracker . write_head - - ;
else
LagTracker . write_head = LAG_TRACKER_BUFFER_SIZE - 1 ;
}
/* Store a sample at the current write head position. */
LagTracker . buffer [ LagTracker . write_head ] . lsn = lsn ;
LagTracker . buffer [ LagTracker . write_head ] . time = local_flush_time ;
LagTracker . write_head = new_write_head ;
}
/*
* Find out how much time has elapsed between the moment WAL position ' lsn '
* ( or the highest known earlier LSN ) was flushed locally and the time ' now ' .
* We have a separate read head for each of the reported LSN locations we
* receive in replies from standby ; ' head ' controls which read head is
* used . Whenever a read head crosses an LSN which was written into the
* lag buffer with LagTrackerWrite , we can use the associated timestamp to
* find out the time this LSN ( or an earlier one ) was flushed locally , and
* therefore compute the lag .
*
* Return - 1 if no new sample data is available , and otherwise the elapsed
* time in microseconds .
*/
static TimeOffset
LagTrackerRead ( int head , XLogRecPtr lsn , TimestampTz now )
{
TimestampTz time = 0 ;
/* Read all unread samples up to this LSN or end of buffer. */
while ( LagTracker . read_heads [ head ] ! = LagTracker . write_head & &
LagTracker . buffer [ LagTracker . read_heads [ head ] ] . lsn < = lsn )
{
time = LagTracker . buffer [ LagTracker . read_heads [ head ] ] . time ;
LagTracker . last_read [ head ] =
LagTracker . buffer [ LagTracker . read_heads [ head ] ] ;
LagTracker . read_heads [ head ] =
( LagTracker . read_heads [ head ] + 1 ) % LAG_TRACKER_BUFFER_SIZE ;
}
if ( time > now )
{
/* If the clock somehow went backwards, treat as not found. */
return - 1 ;
}
else if ( time = = 0 )
{
/*
* We didn ' t cross a time . If there is a future sample that we
* haven ' t reached yet , and we ' ve already reached at least one sample ,
* let ' s interpolate the local flushed time . This is mainly useful for
* reporting a completely stuck apply position as having increasing
* lag , since otherwise we ' d have to wait for it to eventually start
* moving again and cross one of our samples before we can show the
* lag increasing .
*/
if ( LagTracker . read_heads [ head ] ! = LagTracker . write_head & &
LagTracker . last_read [ head ] . time ! = 0 )
{
double fraction ;
WalTimeSample prev = LagTracker . last_read [ head ] ;
WalTimeSample next = LagTracker . buffer [ LagTracker . read_heads [ head ] ] ;
Assert ( lsn > = prev . lsn ) ;
Assert ( prev . lsn < next . lsn ) ;
if ( prev . time > next . time )
{
/* If the clock somehow went backwards, treat as not found. */
return - 1 ;
}
/* See how far we are between the previous and next samples. */
fraction =
( double ) ( lsn - prev . lsn ) / ( double ) ( next . lsn - prev . lsn ) ;
/* Scale the local flush time proportionally. */
time = ( TimestampTz )
( ( double ) prev . time + ( next . time - prev . time ) * fraction ) ;
}
else
{
/* Couldn't interpolate due to lack of data. */
return - 1 ;
}
}
/* Return the elapsed time since local flush time in microseconds. */
Assert ( time ! = 0 ) ;
return now - time ;
}