@ -51,6 +51,7 @@
# include "postmaster/fork_process.h"
# include "postmaster/interrupt.h"
# include "postmaster/postmaster.h"
# include "replication/slot.h"
# include "replication/walsender.h"
# include "storage/backendid.h"
# include "storage/dsm.h"
@ -284,6 +285,8 @@ static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats ;
static PgStat_WalStats walStats ;
static PgStat_SLRUStats slruStats [ SLRU_NUM_ELEMENTS ] ;
static PgStat_ReplSlotStats * replSlotStats ;
static int nReplSlotStats ;
/*
* List of OIDs of databases we need to write out . If an entry is InvalidOid ,
@ -324,6 +327,9 @@ static void pgstat_read_current_status(void);
static bool pgstat_write_statsfile_needed ( void ) ;
static bool pgstat_db_requested ( Oid databaseid ) ;
static int pgstat_replslot_index ( const char * name , bool create_it ) ;
static void pgstat_reset_replslot ( int i , TimestampTz ts ) ;
static void pgstat_send_tabstat ( PgStat_MsgTabstat * tsmsg ) ;
static void pgstat_send_funcstats ( void ) ;
static void pgstat_send_slru ( void ) ;
@ -350,6 +356,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_resetsharedcounter ( PgStat_MsgResetsharedcounter * msg , int len ) ;
static void pgstat_recv_resetsinglecounter ( PgStat_MsgResetsinglecounter * msg , int len ) ;
static void pgstat_recv_resetslrucounter ( PgStat_MsgResetslrucounter * msg , int len ) ;
static void pgstat_recv_resetreplslotcounter ( PgStat_MsgResetreplslotcounter * msg , int len ) ;
static void pgstat_recv_autovac ( PgStat_MsgAutovacStart * msg , int len ) ;
static void pgstat_recv_vacuum ( PgStat_MsgVacuum * msg , int len ) ;
static void pgstat_recv_analyze ( PgStat_MsgAnalyze * msg , int len ) ;
@ -362,6 +369,7 @@ static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict ( PgStat_MsgRecoveryConflict * msg , int len ) ;
static void pgstat_recv_deadlock ( PgStat_MsgDeadlock * msg , int len ) ;
static void pgstat_recv_checksum_failure ( PgStat_MsgChecksumFailure * msg , int len ) ;
static void pgstat_recv_replslot ( PgStat_MsgReplSlot * msg , int len ) ;
static void pgstat_recv_tempfile ( PgStat_MsgTempFile * msg , int len ) ;
/* ------------------------------------------------------------
@ -1437,6 +1445,61 @@ pgstat_reset_slru_counter(const char *name)
pgstat_send ( & msg , sizeof ( msg ) ) ;
}
/* ----------
* pgstat_reset_replslot_counter ( ) -
*
* Tell the statistics collector to reset a single replication slot
* counter , or all replication slots counters ( when name is null ) .
*
* Permission checking for this function is managed through the normal
* GRANT system .
* - - - - - - - - - -
*/
void
pgstat_reset_replslot_counter ( const char * name )
{
PgStat_MsgResetreplslotcounter msg ;
if ( pgStatSock = = PGINVALID_SOCKET )
return ;
if ( name )
{
ReplicationSlot * slot ;
/*
* Check if the slot exits with the given name . It is possible that by
* the time this message is executed the slot is dropped but at least
* this check will ensure that the given name is for a valid slot .
*/
LWLockAcquire ( ReplicationSlotControlLock , LW_SHARED ) ;
slot = SearchNamedReplicationSlot ( name ) ;
LWLockRelease ( ReplicationSlotControlLock ) ;
if ( ! slot )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " replication slot \" %s \" does not exist " ,
name ) ) ) ;
/*
* Nothing to do for physical slots as we collect stats only for
* logical slots .
*/
if ( SlotIsPhysical ( slot ) )
return ;
memcpy ( & msg . m_slotname , name , NAMEDATALEN ) ;
msg . clearall = false ;
}
else
msg . clearall = true ;
pgstat_setheader ( & msg . m_hdr , PGSTAT_MTYPE_RESETREPLSLOTCOUNTER ) ;
pgstat_send ( & msg , sizeof ( msg ) ) ;
}
/* ----------
* pgstat_report_autovac ( ) -
*
@ -1637,6 +1700,46 @@ pgstat_report_tempfile(size_t filesize)
pgstat_send ( & msg , sizeof ( msg ) ) ;
}
/* ----------
* pgstat_report_replslot ( ) -
*
* Tell the collector about replication slot statistics .
* - - - - - - - - - -
*/
void
pgstat_report_replslot ( const char * slotname , int spilltxns , int spillcount ,
int spillbytes )
{
PgStat_MsgReplSlot msg ;
/*
* Prepare and send the message
*/
pgstat_setheader ( & msg . m_hdr , PGSTAT_MTYPE_REPLSLOT ) ;
memcpy ( & msg . m_slotname , slotname , NAMEDATALEN ) ;
msg . m_drop = false ;
msg . m_spill_txns = spilltxns ;
msg . m_spill_count = spillcount ;
msg . m_spill_bytes = spillbytes ;
pgstat_send ( & msg , sizeof ( PgStat_MsgReplSlot ) ) ;
}
/* ----------
* pgstat_report_replslot_drop ( ) -
*
* Tell the collector about dropping the replication slot .
* - - - - - - - - - -
*/
void
pgstat_report_replslot_drop ( const char * slotname )
{
PgStat_MsgReplSlot msg ;
pgstat_setheader ( & msg . m_hdr , PGSTAT_MTYPE_REPLSLOT ) ;
memcpy ( & msg . m_slotname , slotname , NAMEDATALEN ) ;
msg . m_drop = true ;
pgstat_send ( & msg , sizeof ( PgStat_MsgReplSlot ) ) ;
}
/* ----------
* pgstat_ping ( ) -
@ -2714,6 +2817,23 @@ pgstat_fetch_slru(void)
return slruStats ;
}
/*
* - - - - - - - - -
* pgstat_fetch_replslot ( ) -
*
* Support function for the SQL - callable pgstat * functions . Returns
* a pointer to the replication slot statistics struct and sets the
* number of entries in nslots_p .
* - - - - - - - - -
*/
PgStat_ReplSlotStats *
pgstat_fetch_replslot ( int * nslots_p )
{
backend_read_statsfile ( ) ;
* nslots_p = nReplSlotStats ;
return replSlotStats ;
}
/* ------------------------------------------------------------
* Functions for management of the shared - memory PgBackendStatus array
@ -4693,6 +4813,11 @@ PgstatCollectorMain(int argc, char *argv[])
len ) ;
break ;
case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER :
pgstat_recv_resetreplslotcounter ( & msg . msg_resetreplslotcounter ,
len ) ;
break ;
case PGSTAT_MTYPE_AUTOVAC_START :
pgstat_recv_autovac ( & msg . msg_autovacuum_start , len ) ;
break ;
@ -4747,6 +4872,10 @@ PgstatCollectorMain(int argc, char *argv[])
len ) ;
break ;
case PGSTAT_MTYPE_REPLSLOT :
pgstat_recv_replslot ( & msg . msg_replslot , len ) ;
break ;
default :
break ;
}
@ -4946,6 +5075,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
const char * tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname ;
const char * statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename ;
int rc ;
int i ;
elog ( DEBUG2 , " writing stats file \" %s \" " , statfile ) ;
@ -5025,6 +5155,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
( void ) rc ; /* we'll check for error with ferror */
}
/*
* Write replication slot stats struct
*/
for ( i = 0 ; i < nReplSlotStats ; i + + )
{
fputc ( ' R ' , fpout ) ;
rc = fwrite ( & replSlotStats [ i ] , sizeof ( PgStat_ReplSlotStats ) , 1 , fpout ) ;
( void ) rc ; /* we'll check for error with ferror */
}
/*
* No more output to be done . Close the temp file and replace the old
* pgstat . stat with it . The ferror ( ) check replaces testing for error
@ -5250,6 +5390,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbhash = hash_create ( " Databases hash " , PGSTAT_DB_HASH_SIZE , & hash_ctl ,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT ) ;
/* Allocate the space for replication slot statistics */
replSlotStats = palloc0 ( max_replication_slots * sizeof ( PgStat_ReplSlotStats ) ) ;
nReplSlotStats = 0 ;
/*
* Clear out global , archiver , WAL and SLRU statistics so they start from
* zero in case we can ' t load an existing statsfile .
@ -5273,6 +5417,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
for ( i = 0 ; i < SLRU_NUM_ELEMENTS ; i + + )
slruStats [ i ] . stat_reset_timestamp = globalStats . stat_reset_timestamp ;
/*
* Set the same reset timestamp for all replication slots too .
*/
for ( i = 0 ; i < max_replication_slots ; i + + )
replSlotStats [ i ] . stat_reset_timestamp = globalStats . stat_reset_timestamp ;
/*
* Try to open the stats file . If it doesn ' t exist , the backends simply
* return zero for anything and the collector simply starts from scratch
@ -5447,6 +5597,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break ;
/*
* ' R ' A PgStat_ReplSlotStats struct describing a replication
* slot follows .
*/
case ' R ' :
if ( fread ( & replSlotStats [ nReplSlotStats ] , 1 , sizeof ( PgStat_ReplSlotStats ) , fpin )
! = sizeof ( PgStat_ReplSlotStats ) )
{
ereport ( pgStatRunningInCollector ? LOG : WARNING ,
( errmsg ( " corrupted statistics file \" %s \" " ,
statfile ) ) ) ;
memset ( & replSlotStats [ nReplSlotStats ] , 0 , sizeof ( PgStat_ReplSlotStats ) ) ;
goto done ;
}
nReplSlotStats + + ;
break ;
case ' E ' :
goto done ;
@ -5658,6 +5825,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_ArchiverStats myArchiverStats ;
PgStat_WalStats myWalStats ;
PgStat_SLRUStats mySLRUStats [ SLRU_NUM_ELEMENTS ] ;
PgStat_ReplSlotStats myReplSlotStats ;
FILE * fpin ;
int32 format_id ;
const char * statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename ;
@ -5772,6 +5940,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
break ;
/*
* ' R ' A PgStat_ReplSlotStats struct describing a replication
* slot follows .
*/
case ' R ' :
if ( fread ( & myReplSlotStats , 1 , sizeof ( PgStat_ReplSlotStats ) , fpin )
! = sizeof ( PgStat_ReplSlotStats ) )
{
ereport ( pgStatRunningInCollector ? LOG : WARNING ,
( errmsg ( " corrupted statistics file \" %s \" " ,
statfile ) ) ) ;
FreeFile ( fpin ) ;
return false ;
}
break ;
case ' E ' :
goto done ;
@ -6367,6 +6551,46 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
}
}
/* ----------
* pgstat_recv_resetreplslotcounter ( ) -
*
* Reset some replication slot statistics of the cluster .
* - - - - - - - - - -
*/
static void
pgstat_recv_resetreplslotcounter ( PgStat_MsgResetreplslotcounter * msg ,
int len )
{
int i ;
int idx = - 1 ;
TimestampTz ts ;
ts = GetCurrentTimestamp ( ) ;
if ( msg - > clearall )
{
for ( i = 0 ; i < nReplSlotStats ; i + + )
pgstat_reset_replslot ( i , ts ) ;
}
else
{
/* Get the index of replication slot statistics to reset */
idx = pgstat_replslot_index ( msg - > m_slotname , false ) ;
/*
* Nothing to do if the given slot entry is not found . This could
* happen when the slot with the given name is removed and the
* corresponding statistics entry is also removed before receiving the
* reset message .
*/
if ( idx < 0 )
return ;
/* Reset the stats for the requested replication slot */
pgstat_reset_replslot ( idx , ts ) ;
}
}
/* ----------
* pgstat_recv_autovac ( ) -
*
@ -6626,6 +6850,51 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
dbentry - > last_checksum_failure = msg - > m_failure_time ;
}
/* ----------
* pgstat_recv_replslot ( ) -
*
* Process a REPLSLOT message .
* - - - - - - - - - -
*/
static void
pgstat_recv_replslot ( PgStat_MsgReplSlot * msg , int len )
{
int idx ;
/*
* Get the index of replication slot statistics . On dropping , we don ' t
* create the new statistics .
*/
idx = pgstat_replslot_index ( msg - > m_slotname , ! msg - > m_drop ) ;
/*
* The slot entry is not found or there is no space to accommodate the new
* entry . This could happen when the message for the creation of a slot
* reached before the drop message even though the actual operations
* happen in reverse order . In such a case , the next update of the
* statistics for the same slot will create the required entry .
*/
if ( idx < 0 )
return ;
Assert ( idx > = 0 & & idx < = max_replication_slots ) ;
if ( msg - > m_drop )
{
/* Remove the replication slot statistics with the given name */
memcpy ( & replSlotStats [ idx ] , & replSlotStats [ nReplSlotStats - 1 ] ,
sizeof ( PgStat_ReplSlotStats ) ) ;
nReplSlotStats - - ;
Assert ( nReplSlotStats > = 0 ) ;
}
else
{
/* Update the replication slot statistics */
replSlotStats [ idx ] . spill_txns + = msg - > m_spill_txns ;
replSlotStats [ idx ] . spill_count + = msg - > m_spill_count ;
replSlotStats [ idx ] . spill_bytes + = msg - > m_spill_bytes ;
}
}
/* ----------
* pgstat_recv_tempfile ( ) -
*
@ -6808,6 +7077,57 @@ pgstat_clip_activity(const char *raw_activity)
return activity ;
}
/* ----------
* pgstat_replslot_index
*
* Return the index of entry of a replication slot with the given name , or
* - 1 if the slot is not found .
*
* create_it tells whether to create the new slot entry if it is not found .
* - - - - - - - - - -
*/
static int
pgstat_replslot_index ( const char * name , bool create_it )
{
int i ;
Assert ( nReplSlotStats < = max_replication_slots ) ;
for ( i = 0 ; i < nReplSlotStats ; i + + )
{
if ( strcmp ( replSlotStats [ i ] . slotname , name ) = = 0 )
return i ; /* found */
}
/*
* The slot is not found . We don ' t want to register the new statistics if
* the list is already full or the caller didn ' t request .
*/
if ( i = = max_replication_slots | | ! create_it )
return - 1 ;
/* Register new slot */
memset ( & replSlotStats [ nReplSlotStats ] , 0 , sizeof ( PgStat_ReplSlotStats ) ) ;
memcpy ( & replSlotStats [ nReplSlotStats ] . slotname , name , NAMEDATALEN ) ;
return nReplSlotStats + + ;
}
/* ----------
* pgstat_reset_replslot
*
* Reset the replication slot stats at index ' i ' .
* - - - - - - - - - -
*/
static void
pgstat_reset_replslot ( int i , TimestampTz ts )
{
/* reset only counters. Don't clear slot name */
replSlotStats [ i ] . spill_txns = 0 ;
replSlotStats [ i ] . spill_count = 0 ;
replSlotStats [ i ] . spill_bytes = 0 ;
replSlotStats [ i ] . stat_reset_timestamp = ts ;
}
/*
* pgstat_slru_index
*