@ -254,7 +254,6 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
static void WalSndSegmentOpen ( XLogReaderState * state , XLogSegNo nextSegNo ,
static void WalSndSegmentOpen ( XLogReaderState * state , XLogSegNo nextSegNo ,
TimeLineID * tli_p ) ;
TimeLineID * tli_p ) ;
static void UpdateSpillStats ( LogicalDecodingContext * ctx ) ;
/* Initialize walsender process before entering the main command loop */
/* Initialize walsender process before entering the main command loop */
@ -1348,8 +1347,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/*
/*
* LogicalDecodingContext ' update_progress ' callback .
* LogicalDecodingContext ' update_progress ' callback .
*
*
* Write the current position to the lag tracker ( see XLogSendPhysical ) ,
* Write the current position to the lag tracker ( see XLogSendPhysical ) .
* and update the spill statistics .
*/
*/
static void
static void
WalSndUpdateProgress ( LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid )
WalSndUpdateProgress ( LogicalDecodingContext * ctx , XLogRecPtr lsn , TransactionId xid )
@ -1368,11 +1366,6 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
LagTrackerWrite ( lsn , now ) ;
LagTrackerWrite ( lsn , now ) ;
sendTime = now ;
sendTime = now ;
/*
* Update statistics about transactions that spilled to disk .
*/
UpdateSpillStats ( ctx ) ;
}
}
/*
/*
@ -2418,9 +2411,6 @@ InitWalSenderSlot(void)
walsnd - > sync_standby_priority = 0 ;
walsnd - > sync_standby_priority = 0 ;
walsnd - > latch = & MyProc - > procLatch ;
walsnd - > latch = & MyProc - > procLatch ;
walsnd - > replyTime = 0 ;
walsnd - > replyTime = 0 ;
walsnd - > spillTxns = 0 ;
walsnd - > spillCount = 0 ;
walsnd - > spillBytes = 0 ;
SpinLockRelease ( & walsnd - > mutex ) ;
SpinLockRelease ( & walsnd - > mutex ) ;
/* don't need the lock anymore */
/* don't need the lock anymore */
MyWalSnd = ( WalSnd * ) walsnd ;
MyWalSnd = ( WalSnd * ) walsnd ;
@ -3256,7 +3246,7 @@ offset_to_interval(TimeOffset offset)
Datum
Datum
pg_stat_get_wal_senders ( PG_FUNCTION_ARGS )
pg_stat_get_wal_senders ( PG_FUNCTION_ARGS )
{
{
# define PG_STAT_GET_WAL_SENDERS_COLS 15
# define PG_STAT_GET_WAL_SENDERS_COLS 12
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
ReturnSetInfo * rsinfo = ( ReturnSetInfo * ) fcinfo - > resultinfo ;
TupleDesc tupdesc ;
TupleDesc tupdesc ;
Tuplestorestate * tupstore ;
Tuplestorestate * tupstore ;
@ -3310,9 +3300,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int pid ;
int pid ;
WalSndState state ;
WalSndState state ;
TimestampTz replyTime ;
TimestampTz replyTime ;
int64 spillTxns ;
int64 spillCount ;
int64 spillBytes ;
bool is_sync_standby ;
bool is_sync_standby ;
Datum values [ PG_STAT_GET_WAL_SENDERS_COLS ] ;
Datum values [ PG_STAT_GET_WAL_SENDERS_COLS ] ;
bool nulls [ PG_STAT_GET_WAL_SENDERS_COLS ] ;
bool nulls [ PG_STAT_GET_WAL_SENDERS_COLS ] ;
@ -3336,9 +3323,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
applyLag = walsnd - > applyLag ;
applyLag = walsnd - > applyLag ;
priority = walsnd - > sync_standby_priority ;
priority = walsnd - > sync_standby_priority ;
replyTime = walsnd - > replyTime ;
replyTime = walsnd - > replyTime ;
spillTxns = walsnd - > spillTxns ;
spillCount = walsnd - > spillCount ;
spillBytes = walsnd - > spillBytes ;
SpinLockRelease ( & walsnd - > mutex ) ;
SpinLockRelease ( & walsnd - > mutex ) ;
/*
/*
@ -3436,11 +3420,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls [ 11 ] = true ;
nulls [ 11 ] = true ;
else
else
values [ 11 ] = TimestampTzGetDatum ( replyTime ) ;
values [ 11 ] = TimestampTzGetDatum ( replyTime ) ;
/* spill to disk */
values [ 12 ] = Int64GetDatum ( spillTxns ) ;
values [ 13 ] = Int64GetDatum ( spillCount ) ;
values [ 14 ] = Int64GetDatum ( spillBytes ) ;
}
}
tuplestore_putvalues ( tupstore , tupdesc , values , nulls ) ;
tuplestore_putvalues ( tupstore , tupdesc , values , nulls ) ;
@ -3677,21 +3656,3 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Assert ( time ! = 0 ) ;
Assert ( time ! = 0 ) ;
return now - time ;
return now - time ;
}
}
static void
UpdateSpillStats ( LogicalDecodingContext * ctx )
{
ReorderBuffer * rb = ctx - > reorder ;
elog ( DEBUG2 , " UpdateSpillStats: updating stats %p %lld %lld %lld " ,
rb ,
( long long ) rb - > spillTxns ,
( long long ) rb - > spillCount ,
( long long ) rb - > spillBytes ) ;
SpinLockAcquire ( & MyWalSnd - > mutex ) ;
MyWalSnd - > spillTxns = rb - > spillTxns ;
MyWalSnd - > spillCount = rb - > spillCount ;
MyWalSnd - > spillBytes = rb - > spillBytes ;
SpinLockRelease ( & MyWalSnd - > mutex ) ;
}