@ -106,7 +106,7 @@
# define PGSTAT_DB_HASH_SIZE 16
# define PGSTAT_TAB_HASH_SIZE 512
# define PGSTAT_FUNCTION_HASH_SIZE 512
# define PGSTAT_SUBWORKER _HASH_SIZE 32
# define PGSTAT_SUBSCRIPTION _HASH_SIZE 32
# define PGSTAT_REPLSLOT_HASH_SIZE 32
@ -284,6 +284,7 @@ static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats ;
static PgStat_SLRUStats slruStats [ SLRU_NUM_ELEMENTS ] ;
static HTAB * replSlotStatHash = NULL ;
static HTAB * subscriptionStatHash = NULL ;
/*
* List of OIDs of databases we need to write out . If an entry is InvalidOid ,
@ -322,14 +323,13 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no
static PgStat_StatDBEntry * pgstat_get_db_entry ( Oid databaseid , bool create ) ;
static PgStat_StatTabEntry * pgstat_get_tab_entry ( PgStat_StatDBEntry * dbentry ,
Oid tableoid , bool create ) ;
static PgStat_StatSubWorkerEntry * pgstat_get_subworker_entry ( PgStat_StatDBEntry * dbentry ,
Oid subid , Oid subrelid ,
bool create ) ;
static PgStat_StatSubEntry * pgstat_get_subscription_entry ( Oid subid , bool create ) ;
static void pgstat_reset_subscription ( PgStat_StatSubEntry * subentry , TimestampTz ts ) ;
static void pgstat_write_statsfiles ( bool permanent , bool allDbs ) ;
static void pgstat_write_db_statsfile ( PgStat_StatDBEntry * dbentry , bool permanent ) ;
static HTAB * pgstat_read_statsfiles ( Oid onlydb , bool permanent , bool deep ) ;
static void pgstat_read_db_statsfile ( Oid databaseid , HTAB * tabhash , HTAB * funchash ,
HTAB * subworkerhash , bool permanent ) ;
bool permanent ) ;
static void backend_read_statsfile ( void ) ;
static bool pgstat_write_statsfile_needed ( void ) ;
@ -341,7 +341,6 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp
static void pgstat_send_tabstat ( PgStat_MsgTabstat * tsmsg , TimestampTz now ) ;
static void pgstat_send_funcstats ( void ) ;
static void pgstat_send_slru ( void ) ;
static void pgstat_send_subscription_purge ( PgStat_MsgSubscriptionPurge * msg ) ;
static HTAB * pgstat_collect_oids ( Oid catalogid , AttrNumber anum_oid ) ;
static bool pgstat_should_report_connstat ( void ) ;
static void pgstat_report_disconnect ( Oid dboid ) ;
@ -363,6 +362,7 @@ static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, in
static void pgstat_recv_resetsinglecounter ( PgStat_MsgResetsinglecounter * msg , int len ) ;
static void pgstat_recv_resetslrucounter ( PgStat_MsgResetslrucounter * msg , int len ) ;
static void pgstat_recv_resetreplslotcounter ( PgStat_MsgResetreplslotcounter * msg , int len ) ;
static void pgstat_recv_resetsubcounter ( PgStat_MsgResetsubcounter * msg , int len ) ;
static void pgstat_recv_autovac ( PgStat_MsgAutovacStart * msg , int len ) ;
static void pgstat_recv_vacuum ( PgStat_MsgVacuum * msg , int len ) ;
static void pgstat_recv_analyze ( PgStat_MsgAnalyze * msg , int len ) ;
@ -380,8 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
static void pgstat_recv_disconnect ( PgStat_MsgDisconnect * msg , int len ) ;
static void pgstat_recv_replslot ( PgStat_MsgReplSlot * msg , int len ) ;
static void pgstat_recv_tempfile ( PgStat_MsgTempFile * msg , int len ) ;
static void pgstat_recv_subscription_purge ( PgStat_MsgSubscriptionPurge * msg , int len ) ;
static void pgstat_recv_subworker_error ( PgStat_MsgSubWorker Error * msg , int len ) ;
static void pgstat_recv_subscription_dro p ( PgStat_MsgSubscriptionDrop * msg , int len ) ;
static void pgstat_recv_subscription_error ( PgStat_MsgSubscription Error * msg , int len ) ;
/* ------------------------------------------------------------
* Public functions called from postmaster follow
@ -1187,6 +1187,32 @@ pgstat_vacuum_stat(void)
}
}
/*
* Repeat the above steps for subscriptions , if subscription stats are
* being collected .
*/
if ( subscriptionStatHash )
{
PgStat_StatSubEntry * subentry ;
/*
* Read pg_subscription and make a list of OIDs of all existing
* subscriptions .
*/
htab = pgstat_collect_oids ( SubscriptionRelationId , Anum_pg_subscription_oid ) ;
hash_seq_init ( & hstat , subscriptionStatHash ) ;
while ( ( subentry = ( PgStat_StatSubEntry * ) hash_seq_search ( & hstat ) ) ! = NULL )
{
CHECK_FOR_INTERRUPTS ( ) ;
if ( hash_search ( htab , ( void * ) & ( subentry - > subid ) , HASH_FIND , NULL ) = = NULL )
pgstat_report_subscription_drop ( subentry - > subid ) ;
}
hash_destroy ( htab ) ;
}
/*
* Lookup our own database entry ; if not found , nothing more to do .
*/
@ -1311,74 +1337,6 @@ pgstat_vacuum_stat(void)
hash_destroy ( htab ) ;
}
/*
* Repeat for subscription workers . Similarly , we needn ' t bother in the
* common case where no subscription workers ' stats are being collected .
*/
if ( dbentry - > subworkers ! = NULL & &
hash_get_num_entries ( dbentry - > subworkers ) > 0 )
{
PgStat_StatSubWorkerEntry * subwentry ;
PgStat_MsgSubscriptionPurge spmsg ;
/*
* Read pg_subscription and make a list of OIDs of all existing
* subscriptions
*/
htab = pgstat_collect_oids ( SubscriptionRelationId , Anum_pg_subscription_oid ) ;
spmsg . m_databaseid = MyDatabaseId ;
spmsg . m_nentries = 0 ;
hash_seq_init ( & hstat , dbentry - > subworkers ) ;
while ( ( subwentry = ( PgStat_StatSubWorkerEntry * ) hash_seq_search ( & hstat ) ) ! = NULL )
{
bool exists = false ;
Oid subid = subwentry - > key . subid ;
CHECK_FOR_INTERRUPTS ( ) ;
if ( hash_search ( htab , ( void * ) & subid , HASH_FIND , NULL ) ! = NULL )
continue ;
/*
* It is possible that we have multiple entries for the
* subscription corresponding to apply worker and tablesync
* workers . In such cases , we don ' t need to add the same subid
* again .
*/
for ( int i = 0 ; i < spmsg . m_nentries ; i + + )
{
if ( spmsg . m_subids [ i ] = = subid )
{
exists = true ;
break ;
}
}
if ( exists )
continue ;
/* This subscription is dead, add the subid to the message */
spmsg . m_subids [ spmsg . m_nentries + + ] = subid ;
/*
* If the message is full , send it out and reinitialize to empty
*/
if ( spmsg . m_nentries > = PGSTAT_NUM_SUBSCRIPTIONPURGE )
{
pgstat_send_subscription_purge ( & spmsg ) ;
spmsg . m_nentries = 0 ;
}
}
/* Send the rest of dead subscriptions */
if ( spmsg . m_nentries > 0 )
pgstat_send_subscription_purge ( & spmsg ) ;
hash_destroy ( htab ) ;
}
}
@ -1551,8 +1509,7 @@ pgstat_reset_shared_counters(const char *target)
* - - - - - - - - - -
*/
void
pgstat_reset_single_counter ( Oid objoid , Oid subobjoid ,
PgStat_Single_Reset_Type type )
pgstat_reset_single_counter ( Oid objoid , PgStat_Single_Reset_Type type )
{
PgStat_MsgResetsinglecounter msg ;
@ -1563,7 +1520,6 @@ pgstat_reset_single_counter(Oid objoid, Oid subobjoid,
msg . m_databaseid = MyDatabaseId ;
msg . m_resettype = type ;
msg . m_objectid = objoid ;
msg . m_subobjectid = subobjoid ;
pgstat_send ( & msg , sizeof ( msg ) ) ;
}
@ -1623,6 +1579,30 @@ pgstat_reset_replslot_counter(const char *name)
pgstat_send ( & msg , sizeof ( msg ) ) ;
}
/* ----------
* pgstat_reset_subscription_counter ( ) -
*
* Tell the statistics collector to reset a single subscription
* counter , or all subscription counters ( when subid is InvalidOid ) .
*
* Permission checking for this function is managed through the normal
* GRANT system .
* - - - - - - - - - -
*/
void
pgstat_reset_subscription_counter ( Oid subid )
{
PgStat_MsgResetsubcounter msg ;
if ( pgStatSock = = PGINVALID_SOCKET )
return ;
msg . m_subid = subid ;
pgstat_setheader ( & msg . m_hdr , PGSTAT_MTYPE_RESETSUBCOUNTER ) ;
pgstat_send ( & msg , sizeof ( msg ) ) ;
}
/* ----------
* pgstat_report_autovac ( ) -
*
@ -1949,31 +1929,20 @@ pgstat_report_replslot_drop(const char *slotname)
}
/* ----------
* pgstat_report_subworker _error ( ) -
* pgstat_report_subscription _error ( ) -
*
* Tell the collector about the subscription worker error .
* Tell the collector about the subscription error .
* - - - - - - - - - -
*/
void
pgstat_report_subworker_error ( Oid subid , Oid subrelid , Oid relid ,
LogicalRepMsgType command , TransactionId xid ,
const char * errmsg )
pgstat_report_subscription_error ( Oid subid , bool is_apply_error )
{
PgStat_MsgSubWorkerError msg ;
int len ;
PgStat_MsgSubscriptionError msg ;
pgstat_setheader ( & msg . m_hdr , PGSTAT_MTYPE_SUBWORKERERROR ) ;
msg . m_databaseid = MyDatabaseId ;
pgstat_setheader ( & msg . m_hdr , PGSTAT_MTYPE_SUBSCRIPTIONERROR ) ;
msg . m_subid = subid ;
msg . m_subrelid = subrelid ;
msg . m_relid = relid ;
msg . m_command = command ;
msg . m_xid = xid ;
msg . m_timestamp = GetCurrentTimestamp ( ) ;
strlcpy ( msg . m_message , errmsg , PGSTAT_SUBWORKERERROR_MSGLEN ) ;
len = offsetof ( PgStat_MsgSubWorkerError , m_message ) + strlen ( msg . m_message ) + 1 ;
pgstat_send ( & msg , len ) ;
msg . m_is_apply_error = is_apply_error ;
pgstat_send ( & msg , sizeof ( PgStat_MsgSubscriptionError ) ) ;
}
/* ----------
@ -1985,12 +1954,11 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
void
pgstat_report_subscription_drop ( Oid subid )
{
PgStat_MsgSubscriptionPurge msg ;
PgStat_MsgSubscriptionDrop msg ;
msg . m_databaseid = MyDatabaseId ;
msg . m_subids [ 0 ] = subid ;
msg . m_nentries = 1 ;
pgstat_send_subscription_purge ( & msg ) ;
pgstat_setheader ( & msg . m_hdr , PGSTAT_MTYPE_SUBSCRIPTIONDROP ) ;
msg . m_subid = subid ;
pgstat_send ( & msg , sizeof ( PgStat_MsgSubscriptionDrop ) ) ;
}
/* ----------
@ -2998,36 +2966,6 @@ pgstat_fetch_stat_funcentry(Oid func_id)
return funcentry ;
}
/*
* - - - - - - - - -
* pgstat_fetch_stat_subworker_entry ( ) -
*
* Support function for the SQL - callable pgstat * functions . Returns
* the collected statistics for subscription worker or NULL .
* - - - - - - - - -
*/
PgStat_StatSubWorkerEntry *
pgstat_fetch_stat_subworker_entry ( Oid subid , Oid subrelid )
{
PgStat_StatDBEntry * dbentry ;
PgStat_StatSubWorkerEntry * wentry = NULL ;
/* Load the stats file if needed */
backend_read_statsfile ( ) ;
/*
* Lookup our database , then find the requested subscription worker stats .
*/
dbentry = pgstat_fetch_stat_dbentry ( MyDatabaseId ) ;
if ( dbentry ! = NULL & & dbentry - > subworkers ! = NULL )
{
wentry = pgstat_get_subworker_entry ( dbentry , subid , subrelid ,
false ) ;
}
return wentry ;
}
/*
* - - - - - - - - -
* pgstat_fetch_stat_archiver ( ) -
@ -3140,6 +3078,23 @@ pgstat_fetch_replslot(NameData slotname)
return pgstat_get_replslot_entry ( slotname , false ) ;
}
/*
* - - - - - - - - -
* pgstat_fetch_stat_subscription ( ) -
*
* Support function for the SQL - callable pgstat * functions . Returns
* the collected statistics for one subscription or NULL .
* - - - - - - - - -
*/
PgStat_StatSubEntry *
pgstat_fetch_stat_subscription ( Oid subid )
{
/* Load the stats file if needed */
backend_read_statsfile ( ) ;
return pgstat_get_subscription_entry ( subid , false ) ;
}
/*
* Shut down a single backend ' s statistics reporting at process exit .
*
@ -3465,24 +3420,6 @@ pgstat_send_slru(void)
}
}
/* --------
* pgstat_send_subscription_purge ( ) -
*
* Send a subscription purge message to the collector
* - - - - - - - -
*/
static void
pgstat_send_subscription_purge ( PgStat_MsgSubscriptionPurge * msg )
{
int len ;
len = offsetof ( PgStat_MsgSubscriptionPurge , m_subids [ 0 ] )
+ msg - > m_nentries * sizeof ( Oid ) ;
pgstat_setheader ( & msg - > m_hdr , PGSTAT_MTYPE_SUBSCRIPTIONPURGE ) ;
pgstat_send ( msg , len ) ;
}
/* ----------
* PgstatCollectorMain ( ) -
*
@ -3668,6 +3605,10 @@ PgstatCollectorMain(int argc, char *argv[])
len ) ;
break ;
case PGSTAT_MTYPE_RESETSUBCOUNTER :
pgstat_recv_resetsubcounter ( & msg . msg_resetsubcounter , len ) ;
break ;
case PGSTAT_MTYPE_AUTOVAC_START :
pgstat_recv_autovac ( & msg . msg_autovacuum_start , len ) ;
break ;
@ -3738,12 +3679,12 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_recv_disconnect ( & msg . msg_disconnect , len ) ;
break ;
case PGSTAT_MTYPE_SUBSCRIPTIONPURGE :
pgstat_recv_subscription_purge ( & msg . msg_subscriptionpurge , len ) ;
case PGSTAT_MTYPE_SUBSCRIPTIONDRO P :
pgstat_recv_subscription_dro p ( & msg . msg_subscriptiondro p , len ) ;
break ;
case PGSTAT_MTYPE_SUBWORKER ERROR :
pgstat_recv_subworker _error ( & msg . msg_subworker error , len ) ;
case PGSTAT_MTYPE_SUBSCRIPTION ERROR :
pgstat_recv_subscription _error ( & msg . msg_subscription error , len ) ;
break ;
default :
@ -3791,8 +3732,7 @@ PgstatCollectorMain(int argc, char *argv[])
/*
* Subroutine to clear stats in a database entry
*
* Tables , functions , and subscription workers hashes are initialized
* to empty .
* Tables and functions hashes are initialized to empty .
*/
static void
reset_dbentry_counters ( PgStat_StatDBEntry * dbentry )
@ -3845,13 +3785,6 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
PGSTAT_FUNCTION_HASH_SIZE ,
& hash_ctl ,
HASH_ELEM | HASH_BLOBS ) ;
hash_ctl . keysize = sizeof ( PgStat_StatSubWorkerKey ) ;
hash_ctl . entrysize = sizeof ( PgStat_StatSubWorkerEntry ) ;
dbentry - > subworkers = hash_create ( " Per-database subscription worker " ,
PGSTAT_SUBWORKER_HASH_SIZE ,
& hash_ctl ,
HASH_ELEM | HASH_BLOBS ) ;
}
/*
@ -3876,7 +3809,7 @@ pgstat_get_db_entry(Oid databaseid, bool create)
/*
* If not found , initialize the new one . This creates empty hash tables
* for tables , functions , and subscription worker s, too .
* for tables and function s , too .
*/
if ( ! found )
reset_dbentry_counters ( result ) ;
@ -3934,48 +3867,6 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
return result ;
}
/* ----------
* pgstat_get_subworker_entry
*
* Return subscription worker entry with the given subscription OID and
* relation OID . If subrelid is InvalidOid , it returns an entry of the
* apply worker otherwise returns an entry of the table sync worker
* associated with subrelid . If no subscription worker entry exists ,
* initialize it , if the create parameter is true . Else , return NULL .
* - - - - - - - - - -
*/
static PgStat_StatSubWorkerEntry *
pgstat_get_subworker_entry ( PgStat_StatDBEntry * dbentry , Oid subid , Oid subrelid ,
bool create )
{
PgStat_StatSubWorkerEntry * subwentry ;
PgStat_StatSubWorkerKey key ;
bool found ;
HASHACTION action = ( create ? HASH_ENTER : HASH_FIND ) ;
key . subid = subid ;
key . subrelid = subrelid ;
subwentry = ( PgStat_StatSubWorkerEntry * ) hash_search ( dbentry - > subworkers ,
( void * ) & key ,
action , & found ) ;
if ( ! create & & ! found )
return NULL ;
/* If not found, initialize the new one */
if ( ! found )
{
subwentry - > last_error_relid = InvalidOid ;
subwentry - > last_error_command = 0 ;
subwentry - > last_error_xid = InvalidTransactionId ;
subwentry - > last_error_count = 0 ;
subwentry - > last_error_time = 0 ;
subwentry - > last_error_message [ 0 ] = ' \0 ' ;
}
return subwentry ;
}
/* ----------
* pgstat_write_statsfiles ( ) -
* Write the global statistics file , as well as requested DB files .
@ -4059,8 +3950,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
while ( ( dbentry = ( PgStat_StatDBEntry * ) hash_seq_search ( & hstat ) ) ! = NULL )
{
/*
* Write out the table , function , and subscription - worker stats for
* this DB into the appropriate per - DB stat file , if required .
* Write out the table and function stats for this DB into the
* appropriate per - DB stat file , if required .
*/
if ( allDbs | | pgstat_db_requested ( dbentry - > databaseid ) )
{
@ -4095,6 +3986,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
}
}
/*
* Write subscription stats struct
*/
if ( subscriptionStatHash )
{
PgStat_StatSubEntry * subentry ;
hash_seq_init ( & hstat , subscriptionStatHash ) ;
while ( ( subentry = ( PgStat_StatSubEntry * ) hash_seq_search ( & hstat ) ) ! = NULL )
{
fputc ( ' S ' , fpout ) ;
rc = fwrite ( subentry , sizeof ( PgStat_StatSubEntry ) , 1 , fpout ) ;
( void ) rc ; /* we'll check for error with ferror */
}
}
/*
* No more output to be done . Close the temp file and replace the old
* pgstat . stat with it . The ferror ( ) check replaces testing for error
@ -4174,10 +4081,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
{
HASH_SEQ_STATUS tstat ;
HASH_SEQ_STATUS fstat ;
HASH_SEQ_STATUS sstat ;
PgStat_StatTabEntry * tabentry ;
PgStat_StatFuncEntry * funcentry ;
PgStat_StatSubWorkerEntry * subwentry ;
FILE * fpout ;
int32 format_id ;
Oid dbid = dbentry - > databaseid ;
@ -4232,17 +4137,6 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
( void ) rc ; /* we'll check for error with ferror */
}
/*
* Walk through the database ' s subscription worker stats table .
*/
hash_seq_init ( & sstat , dbentry - > subworkers ) ;
while ( ( subwentry = ( PgStat_StatSubWorkerEntry * ) hash_seq_search ( & sstat ) ) ! = NULL )
{
fputc ( ' S ' , fpout ) ;
rc = fwrite ( subwentry , sizeof ( PgStat_StatSubWorkerEntry ) , 1 , fpout ) ;
( void ) rc ; /* we'll check for error with ferror */
}
/*
* No more output to be done . Close the temp file and replace the old
* pgstat . stat with it . The ferror ( ) check replaces testing for error
@ -4301,9 +4195,8 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
* files after reading ; the in - memory status is now authoritative , and the
* files would be out of date in case somebody else reads them .
*
* If a ' deep ' read is requested , table / function / subscription - worker stats are
* read , otherwise the table / function / subscription - worker hash tables remain
* empty .
* If a ' deep ' read is requested , table / function stats are read , otherwise
* the table / function hash tables remain empty .
* - - - - - - - - - -
*/
static HTAB *
@ -4482,7 +4375,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
memcpy ( dbentry , & dbbuf , sizeof ( PgStat_StatDBEntry ) ) ;
dbentry - > tables = NULL ;
dbentry - > functions = NULL ;
dbentry - > subworkers = NULL ;
/*
* In the collector , disregard the timestamp we read from the
@ -4494,8 +4386,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbentry - > stats_timestamp = 0 ;
/*
* Don ' t create tables / functions / subworkers hashtables for
* uninteresting databases .
* Don ' t create tables / functions hashtables for uninteresting
* databases .
*/
if ( onlydb ! = InvalidOid )
{
@ -4520,14 +4412,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
& hash_ctl ,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT ) ;
hash_ctl . keysize = sizeof ( PgStat_StatSubWorkerKey ) ;
hash_ctl . entrysize = sizeof ( PgStat_StatSubWorkerEntry ) ;
hash_ctl . hcxt = pgStatLocalContext ;
dbentry - > subworkers = hash_create ( " Per-database subscription worker " ,
PGSTAT_SUBWORKER_HASH_SIZE ,
& hash_ctl ,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT ) ;
/*
* If requested , read the data from the database - specific
* file . Otherwise we just leave the hashtables empty .
@ -4536,7 +4420,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
pgstat_read_db_statsfile ( dbentry - > databaseid ,
dbentry - > tables ,
dbentry - > functions ,
dbentry - > subworkers ,
permanent ) ;
break ;
@ -4580,6 +4463,45 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break ;
}
/*
* ' S ' A PgStat_StatSubEntry struct describing subscription
* statistics .
*/
case ' S ' :
{
PgStat_StatSubEntry subbuf ;
PgStat_StatSubEntry * subentry ;
if ( fread ( & subbuf , 1 , sizeof ( PgStat_StatSubEntry ) , fpin )
! = sizeof ( PgStat_StatSubEntry ) )
{
ereport ( pgStatRunningInCollector ? LOG : WARNING ,
( errmsg ( " corrupted statistics file \" %s \" " ,
statfile ) ) ) ;
goto done ;
}
if ( subscriptionStatHash = = NULL )
{
HASHCTL hash_ctl ;
hash_ctl . keysize = sizeof ( Oid ) ;
hash_ctl . entrysize = sizeof ( PgStat_StatSubEntry ) ;
hash_ctl . hcxt = pgStatLocalContext ;
subscriptionStatHash = hash_create ( " Subscription hash " ,
PGSTAT_SUBSCRIPTION_HASH_SIZE ,
& hash_ctl ,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT ) ;
}
subentry = ( PgStat_StatSubEntry * ) hash_search ( subscriptionStatHash ,
( void * ) & subbuf . subid ,
HASH_ENTER , NULL ) ;
memcpy ( subentry , & subbuf , sizeof ( subbuf ) ) ;
break ;
}
case ' E ' :
goto done ;
@ -4614,21 +4536,19 @@ done:
* As in pgstat_read_statsfiles , if the permanent file is requested , it is
* removed after reading .
*
* Note : this code has the ability to skip storing per - table , per - function , or
* per - subscription - worker data , if NULL is passed for the corresponding hashtable .
* That ' s not used at the moment though .
* Note : this code has the ability to skip storing per - table or per - function
* data , if NULL is passed for the corresponding hashtable . That ' s not used
* at the moment though .
* - - - - - - - - - -
*/
static void
pgstat_read_db_statsfile ( Oid databaseid , HTAB * tabhash , HTAB * funchash ,
HTAB * subworkerhash , bool permanent )
bool permanent )
{
PgStat_StatTabEntry * tabentry ;
PgStat_StatTabEntry tabbuf ;
PgStat_StatFuncEntry funcbuf ;
PgStat_StatFuncEntry * funcentry ;
PgStat_StatSubWorkerEntry subwbuf ;
PgStat_StatSubWorkerEntry * subwentry ;
FILE * fpin ;
int32 format_id ;
bool found ;
@ -4742,41 +4662,6 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
memcpy ( funcentry , & funcbuf , sizeof ( funcbuf ) ) ;
break ;
/*
* ' S ' A PgStat_StatSubWorkerEntry struct describing
* subscription worker statistics .
*/
case ' S ' :
if ( fread ( & subwbuf , 1 , sizeof ( PgStat_StatSubWorkerEntry ) ,
fpin ) ! = sizeof ( PgStat_StatSubWorkerEntry ) )
{
ereport ( pgStatRunningInCollector ? LOG : WARNING ,
( errmsg ( " corrupted statistics file \" %s \" " ,
statfile ) ) ) ;
goto done ;
}
/*
* Skip if subscription worker data not wanted .
*/
if ( subworkerhash = = NULL )
break ;
subwentry = ( PgStat_StatSubWorkerEntry * ) hash_search ( subworkerhash ,
( void * ) & subwbuf . key ,
HASH_ENTER , & found ) ;
if ( found )
{
ereport ( pgStatRunningInCollector ? LOG : WARNING ,
( errmsg ( " corrupted statistics file \" %s \" " ,
statfile ) ) ) ;
goto done ;
}
memcpy ( subwentry , & subwbuf , sizeof ( subwbuf ) ) ;
break ;
/*
* ' E ' The EOF marker of a complete stats file .
*/
@ -4829,6 +4714,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_WalStats myWalStats ;
PgStat_SLRUStats mySLRUStats [ SLRU_NUM_ELEMENTS ] ;
PgStat_StatReplSlotEntry myReplSlotStats ;
PgStat_StatSubEntry mySubStats ;
FILE * fpin ;
int32 format_id ;
const char * statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename ;
@ -4959,6 +4845,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
}
break ;
/*
* ' S ' A PgStat_StatSubEntry struct describing subscription
* statistics follows .
*/
case ' S ' :
if ( fread ( & mySubStats , 1 , sizeof ( PgStat_StatSubEntry ) , fpin )
! = sizeof ( PgStat_StatSubEntry ) )
{
ereport ( pgStatRunningInCollector ? LOG : WARNING ,
( errmsg ( " corrupted statistics file \" %s \" " ,
statfile ) ) ) ;
FreeFile ( fpin ) ;
return false ;
}
break ;
case ' E ' :
goto done ;
@ -5164,6 +5066,7 @@ pgstat_clear_snapshot(void)
pgStatLocalContext = NULL ;
pgStatDBHash = NULL ;
replSlotStatHash = NULL ;
subscriptionStatHash = NULL ;
/*
* Historically the backend_status . c facilities lived in this file , and
@ -5450,8 +5353,6 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
hash_destroy ( dbentry - > tables ) ;
if ( dbentry - > functions ! = NULL )
hash_destroy ( dbentry - > functions ) ;
if ( dbentry - > subworkers ! = NULL )
hash_destroy ( dbentry - > subworkers ) ;
if ( hash_search ( pgStatDBHash ,
( void * ) & dbid ,
@ -5489,16 +5390,13 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
hash_destroy ( dbentry - > tables ) ;
if ( dbentry - > functions ! = NULL )
hash_destroy ( dbentry - > functions ) ;
if ( dbentry - > subworkers ! = NULL )
hash_destroy ( dbentry - > subworkers ) ;
dbentry - > tables = NULL ;
dbentry - > functions = NULL ;
dbentry - > subworkers = NULL ;
/*
* Reset database - level stats , too . This creates empty hash tables for
* tables , functions , and subscription worker s.
* tables and function s .
*/
reset_dbentry_counters ( dbentry ) ;
}
@ -5567,14 +5465,6 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
else if ( msg - > m_resettype = = RESET_FUNCTION )
( void ) hash_search ( dbentry - > functions , ( void * ) & ( msg - > m_objectid ) ,
HASH_REMOVE , NULL ) ;
else if ( msg - > m_resettype = = RESET_SUBWORKER )
{
PgStat_StatSubWorkerKey key ;
key . subid = msg - > m_objectid ;
key . subrelid = msg - > m_subobjectid ;
( void ) hash_search ( dbentry - > subworkers , ( void * ) & key , HASH_REMOVE , NULL ) ;
}
}
/* ----------
@ -5645,6 +5535,51 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
}
}
/* ----------
* pgstat_recv_resetsubcounter ( ) -
*
* Reset some subscription statistics of the cluster .
* - - - - - - - - - -
*/
static void
pgstat_recv_resetsubcounter ( PgStat_MsgResetsubcounter * msg , int len )
{
PgStat_StatSubEntry * subentry ;
TimestampTz ts ;
/* Return if we don't have replication subscription statistics */
if ( subscriptionStatHash = = NULL )
return ;
ts = GetCurrentTimestamp ( ) ;
if ( ! OidIsValid ( msg - > m_subid ) )
{
HASH_SEQ_STATUS sstat ;
/* Clear all subscription counters */
hash_seq_init ( & sstat , subscriptionStatHash ) ;
while ( ( subentry = ( PgStat_StatSubEntry * ) hash_seq_search ( & sstat ) ) ! = NULL )
pgstat_reset_subscription ( subentry , ts ) ;
}
else
{
/* Get the subscription statistics to reset */
subentry = pgstat_get_subscription_entry ( msg - > m_subid , false ) ;
/*
* Nothing to do if the given subscription entry is not found . This
* could happen when the subscription with the subid is removed and
* the corresponding statistics entry is also removed before receiving
* the reset message .
*/
if ( ! subentry )
return ;
/* Reset the stats for the requested subscription */
pgstat_reset_subscription ( subentry , ts ) ;
}
}
/* ----------
* pgstat_recv_autovac ( ) -
@ -6118,81 +6053,42 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
}
/* ----------
* pgstat_recv_subscription_purge ( ) -
* pgstat_recv_subscription_dro p ( ) -
*
* Process a SUBSCRIPTIONPURGE message .
* Process a SUBSCRIPTIONDRO P message .
* - - - - - - - - - -
*/
static void
pgstat_recv_subscription_purge ( PgStat_MsgSubscriptionPurge * msg , int len )
pgstat_recv_subscription_dro p ( PgStat_MsgSubscriptionDrop * msg , int len )
{
HASH_SEQ_STATUS hstat ;
PgStat_StatDBEntry * dbentry ;
PgStat_StatSubWorkerEntry * subwentry ;
dbentry = pgstat_get_db_entry ( msg - > m_databaseid , false ) ;
/* No need to purge if we don't even know the database */
if ( ! dbentry | | ! dbentry - > subworkers )
/* Return if we don't have replication subscription statistics */
if ( subscriptionStatHash = = NULL )
return ;
/* Remove all subscription worker statistics for the given subscriptions */
hash_seq_init ( & hstat , dbentry - > subworkers ) ;
while ( ( subwentry = ( PgStat_StatSubWorkerEntry * ) hash_seq_search ( & hstat ) ) ! = NULL )
{
for ( int i = 0 ; i < msg - > m_nentries ; i + + )
{
if ( subwentry - > key . subid = = msg - > m_subids [ i ] )
{
( void ) hash_search ( dbentry - > subworkers , ( void * ) & ( subwentry - > key ) ,
HASH_REMOVE , NULL ) ;
break ;
}
}
}
/* Remove from hashtable if present; we don't care if it's not */
( void ) hash_search ( subscriptionStatHash , ( void * ) & ( msg - > m_subid ) ,
HASH_REMOVE , NULL ) ;
}
/* ----------
* pgstat_recv_subworker _error ( ) -
* pgstat_recv_subscription_error ( ) -
*
* Process a SUBWORKER ERROR message .
* Process a SUBSCRIPTIONERROR message .
* - - - - - - - - - -
*/
static void
pgstat_recv_subworker_error ( PgStat_MsgSubWorker Error * msg , int len )
pgstat_recv_subscription_error ( PgStat_MsgSubscriptionError * msg , int len )
{
PgStat_StatDBEntry * dbentry ;
PgStat_StatSubWorkerEntry * subwentry ;
dbentry = pgstat_get_db_entry ( msg - > m_databaseid , true ) ;
PgStat_StatSubEntry * subentry ;
/* Get the subscription worker stats */
subwentry = pgstat_get_subworker_entry ( dbentry , msg - > m_subid ,
msg - > m_subrelid , true ) ;
Assert ( subwentry ) ;
if ( subwentry - > last_error_relid = = msg - > m_relid & &
subwentry - > last_error_command = = msg - > m_command & &
subwentry - > last_error_xid = = msg - > m_xid & &
strcmp ( subwentry - > last_error_message , msg - > m_message ) = = 0 )
{
/*
* The same error occurred again in succession , just update its
* timestamp and count .
*/
subwentry - > last_error_count + + ;
subwentry - > last_error_time = msg - > m_timestamp ;
return ;
}
/* Get the subscription stats */
subentry = pgstat_get_subscription_entry ( msg - > m_subid , true ) ;
Assert ( subentry ) ;
/* Otherwise, update the error information */
subwentry - > last_error_relid = msg - > m_relid ;
subwentry - > last_error_command = msg - > m_command ;
subwentry - > last_error_xid = msg - > m_xid ;
subwentry - > last_error_count = 1 ;
subwentry - > last_error_time = msg - > m_timestamp ;
strlcpy ( subwentry - > last_error_message , msg - > m_message ,
PGSTAT_SUBWORKERERROR_MSGLEN ) ;
if ( msg - > m_is_apply_error )
subentry - > apply_error_count + + ;
else
subentry - > sync_error_count + + ;
}
/* ----------
@ -6313,6 +6209,68 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
slotent - > stat_reset_timestamp = ts ;
}
/* ----------
* pgstat_get_subscription_entry
*
* Return the subscription statistics entry with the given subscription OID .
* If no subscription entry exists , initialize it , if the create parameter is
* true . Else , return NULL .
* - - - - - - - - - -
*/
static PgStat_StatSubEntry *
pgstat_get_subscription_entry ( Oid subid , bool create )
{
PgStat_StatSubEntry * subentry ;
bool found ;
HASHACTION action = ( create ? HASH_ENTER : HASH_FIND ) ;
if ( subscriptionStatHash = = NULL )
{
HASHCTL hash_ctl ;
/*
* Quick return NULL if the hash table is empty and the caller didn ' t
* request to create the entry .
*/
if ( ! create )
return NULL ;
hash_ctl . keysize = sizeof ( Oid ) ;
hash_ctl . entrysize = sizeof ( PgStat_StatSubEntry ) ;
subscriptionStatHash = hash_create ( " Subscription hash " ,
PGSTAT_SUBSCRIPTION_HASH_SIZE ,
& hash_ctl ,
HASH_ELEM | HASH_BLOBS ) ;
}
subentry = ( PgStat_StatSubEntry * ) hash_search ( subscriptionStatHash ,
( void * ) & subid ,
action , & found ) ;
if ( ! create & & ! found )
return NULL ;
/* If not found, initialize the new one */
if ( ! found )
pgstat_reset_subscription ( subentry , 0 ) ;
return subentry ;
}
/* ----------
* pgstat_reset_subscription
*
* Reset the given subscription stats .
* - - - - - - - - - -
*/
static void
pgstat_reset_subscription ( PgStat_StatSubEntry * subentry , TimestampTz ts )
{
subentry - > apply_error_count = 0 ;
subentry - > sync_error_count = 0 ;
subentry - > stat_reset_timestamp = ts ;
}
/*
* pgstat_slru_index
*