@ -248,6 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead ( int head , XLogRecPtr lsn , TimestampTz now ) ;
static bool TransactionIdInRecentPast ( TransactionId xid , uint32 epoch ) ;
static void UpdateSpillStats ( LogicalDecodingContext * ctx ) ;
static void XLogRead ( WALSegmentContext * segcxt , char * buf , XLogRecPtr startptr , Size count ) ;
@ -1261,7 +1262,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/*
* LogicalDecodingContext ' update_progress ' callback .
*
* Write the current position to the lag tracker ( see XLogSendPhysical ) .
* Write the current position to the lag tracker ( see XLogSendPhysical ) ,
* and update the spill statistics .
*/
static void
WalSndUpdateProgress ( LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid )
@ -1280,6 +1282,11 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
LagTrackerWrite ( lsn , now ) ;
sendTime = now ;
/*
* Update statistics about transactions that spilled to disk .
*/
UpdateSpillStats ( ctx ) ;
}
/*
@ -2318,6 +2325,9 @@ InitWalSenderSlot(void)
walsnd - > state = WALSNDSTATE_STARTUP ;
walsnd - > latch = & MyProc - > procLatch ;
walsnd - > replyTime = 0 ;
walsnd - > spillTxns = 0 ;
walsnd - > spillCount = 0 ;
walsnd - > spillBytes = 0 ;
SpinLockRelease ( & walsnd - > mutex ) ;
/* don't need the lock anymore */
MyWalSnd = ( WalSnd * ) walsnd ;
@ -3219,7 +3229,7 @@ offset_to_interval(TimeOffset offset)
Datum
pg_stat_get_wal_senders ( PG_FUNCTION_ARGS )
{
# define PG_STAT_GET_WAL_SENDERS_COLS 12
# define PG_STAT_GET_WAL_SENDERS_COLS 15
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
TupleDesc tupdesc ;
Tuplestorestate * tupstore ;
@ -3274,6 +3284,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int pid ;
WalSndState state ;
TimestampTz replyTime ;
int64 spillTxns ;
int64 spillCount ;
int64 spillBytes ;
Datum values [ PG_STAT_GET_WAL_SENDERS_COLS ] ;
bool nulls [ PG_STAT_GET_WAL_SENDERS_COLS ] ;
@ -3294,6 +3307,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
applyLag = walsnd - > applyLag ;
priority = walsnd - > sync_standby_priority ;
replyTime = walsnd - > replyTime ;
spillTxns = walsnd - > spillTxns ;
spillCount = walsnd - > spillCount ;
spillBytes = walsnd - > spillBytes ;
SpinLockRelease ( & walsnd - > mutex ) ;
memset ( nulls , 0 , sizeof ( nulls ) ) ;
@ -3375,6 +3391,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls [ 11 ] = true ;
else
values [ 11 ] = TimestampTzGetDatum ( replyTime ) ;
/* spill to disk */
values [ 12 ] = Int64GetDatum ( spillTxns ) ;
values [ 13 ] = Int64GetDatum ( spillCount ) ;
values [ 14 ] = Int64GetDatum ( spillBytes ) ;
}
tuplestore_putvalues ( tupstore , tupdesc , values , nulls ) ;
@ -3611,3 +3632,20 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Assert ( time ! = 0 ) ;
return now - time ;
}
static void
UpdateSpillStats ( LogicalDecodingContext * ctx )
{
ReorderBuffer * rb = ctx - > reorder ;
SpinLockAcquire ( & MyWalSnd - > mutex ) ;
MyWalSnd - > spillTxns = rb - > spillTxns ;
MyWalSnd - > spillCount = rb - > spillCount ;
MyWalSnd - > spillBytes = rb - > spillBytes ;
elog ( DEBUG2 , " UpdateSpillStats: updating stats %p %ld %ld %ld " ,
rb , rb - > spillTxns , rb - > spillCount , rb - > spillBytes ) ;
SpinLockRelease ( & MyWalSnd - > mutex ) ;
}