You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
postgres/src/backend/utils/activity/pgstat_relation.c

931 lines
27 KiB

/* -------------------------------------------------------------------------
*
* pgstat_relation.c
* Implementation of relation statistics.
*
* This file contains the implementation of relation statistics. It is kept
* separate from pgstat.c to enforce the line between the statistics access /
* storage implementation and the details about individual types of
* statistics.
*
* Copyright (c) 2001-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/utils/activity/pgstat_relation.c
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
#include "postmaster/autovacuum.h"
#include "utils/memutils.h"
#include "utils/pgstat_internal.h"
#include "utils/rel.h"
#include "utils/timestamp.h"
/*
 * Structures in which backends store per-table info that's waiting to be
 * sent to the collector.
 *
 * Entries are handed out from fixed-size chunks (TabStatusArray), each
 * holding TABSTAT_QUANTUM PgStat_TableStatus slots; the chunks form a
 * singly-linked list headed by pgStatTabList.
 *
 * NOTE: once allocated, TabStatusArray structures are never moved or deleted
 * for the life of the backend. Also, we zero out the t_id fields of the
 * contained PgStat_TableStatus structs whenever they are not actively in use.
 * This allows relcache pgstat_info pointers to be treated as long-lived data,
 * avoiding repeated searches in pgstat_relation_init() when a relation is
 * repeatedly opened during a transaction.
 */
#define TABSTAT_QUANTUM 100 /* we alloc this many at a time */
typedef struct TabStatusArray
{
struct TabStatusArray *tsa_next; /* link to next array, if any */
int tsa_used; /* # entries currently used */
PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM]; /* per-table data */
} TabStatusArray;
/*
 * Head of the chunk list. Stays NULL until get_tabstat_entry() allocates the
 * first chunk.
 */
static TabStatusArray *pgStatTabList = NULL;
/*
 * pgStatTabHash entry: map from relation OID to PgStat_TableStatus pointer
 *
 * t_id is the hash key (the table is created with keysize sizeof(Oid)), so
 * it must remain the first field.
 */
typedef struct TabStatHashEntry
{
Oid t_id; /* hash key: relation OID */
PgStat_TableStatus *tsa_entry; /* slot inside a TabStatusArray; NULL until
								 * get_tabstat_entry() assigns one */
} TabStatHashEntry;
/*
 * Record that's written to 2PC state file when pgstat state is persisted
 *
 * One record per modified table is filled in and registered by
 * AtPrepare_PgStat_Relations(), and later consumed by
 * pgstat_twophase_postcommit() or pgstat_twophase_postabort().
 */
typedef struct TwoPhasePgStatRecord
{
PgStat_Counter tuples_inserted; /* tuples inserted in xact */
PgStat_Counter tuples_updated; /* tuples updated in xact */
PgStat_Counter tuples_deleted; /* tuples deleted in xact */
/* tuples i/u/d prior to truncate/drop */
PgStat_Counter inserted_pre_truncdrop;
PgStat_Counter updated_pre_truncdrop;
PgStat_Counter deleted_pre_truncdrop;
Oid t_id; /* table's OID */
bool t_shared; /* is it a shared catalog? */
bool t_truncdropped; /* was the relation truncated/dropped? */
} TwoPhasePgStatRecord;
/* Forward declarations of helper routines private to this file */
static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
static void add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level);
static void ensure_tabstat_xact_level(PgStat_TableStatus *pgstat_info);
static void pgstat_truncdrop_save_counters(PgStat_TableXactStatus *trans, bool is_drop);
static void pgstat_truncdrop_restore_counters(PgStat_TableXactStatus *trans);
/*
 * Indicates if backend has some relation stats that it hasn't yet
 * sent to the collector.
 *
 * Set to true by get_tabstat_entry() each time it is called, and reset to
 * false at the end of pgstat_send_tabstats().
 */
bool have_relation_stats;
/*
 * Hash table for O(1) t_id -> tsa_entry lookup
 *
 * Created on demand by get_tabstat_entry() and destroyed again at the start
 * of pgstat_send_tabstats(), so it is NULL whenever no lookup table exists.
 */
static HTAB *pgStatTabHash = NULL;
/*
 * Carry stats over from relation "src" to relation "dst".  This is used for
 * things like REINDEX CONCURRENTLY.
 *
 * The values are taken from the stats entry fetched for src and written
 * into dst's pending counters.  Nothing happens if no entry can be fetched
 * for src, or if dst is not being counted.
 */
void
pgstat_copy_relation_stats(Relation dst, Relation src)
{
	PgStat_StatTabEntry *srcstats = pgstat_fetch_stat_tabentry(RelationGetRelid(src));

	/* no stats known for the source relation: nothing to copy */
	if (!srcstats)
		return;

	/* destination isn't collecting stats: nowhere to copy to */
	if (!pgstat_relation_should_count(dst))
		return;

	/*
	 * XXX: for the time being this does less than the function name
	 * promises and copies only the index-related fields.  A subsequent
	 * commit will do more.
	 */
	dst->pgstat_info->t_counts.t_blocks_hit = srcstats->blocks_hit;
	dst->pgstat_info->t_counts.t_blocks_fetched = srcstats->blocks_fetched;
	dst->pgstat_info->t_counts.t_tuples_fetched = srcstats->tuples_fetched;
	dst->pgstat_info->t_counts.t_tuples_returned = srcstats->tuples_returned;
	dst->pgstat_info->t_counts.t_numscans = srcstats->numscans;

	/* the data will be sent by the next pgstat_report_stat() call */
}
/*
 * Set up a relcache entry for access-statistics counting.
 * Called whenever a relation is opened.
 *
 * relcache.c is assumed to zero the pgstat_info field when it builds the
 * relcache entry; from then on the pointer is long-lived data, which lets us
 * skip the TabStatus lookup when the same relation is touched repeatedly
 * within a transaction.
 *
 * On return rel->pgstat_info is either NULL (relation is not counted) or
 * points at the PgStat_TableStatus entry for this relation.
 */
void
pgstat_relation_init(Relation rel)
{
	char		kind = rel->rd_rel->relkind;
	Oid			relid = rel->rd_id;

	/*
	 * Stats are only kept for relations with storage and for partitioned
	 * tables, and only while collection is active at all.
	 */
	if ((!RELKIND_HAS_STORAGE(kind) && kind != RELKIND_PARTITIONED_TABLE) ||
		pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
	{
		rel->pgstat_info = NULL;
		return;
	}

	/*
	 * Only look up (or create) the PgStat_TableStatus entry if the cached
	 * link is missing or belongs to some other relation; otherwise this
	 * relation was already set up and there is nothing to do.
	 */
	if (rel->pgstat_info == NULL || rel->pgstat_info->t_id != relid)
		rel->pgstat_info = get_tabstat_entry(relid, rel->rd_rel->relisshared);
}
/*
 * Notify the collector that a relation has just been dropped.
 * (Should the message get lost, the dead entry is still cleaned up
 * eventually by later invocations of pgstat_vacuum_stat().)
 *
 * Compiled out at present: there is no good place to call it from, so we
 * depend entirely on pgstat_vacuum_stat() to remove stats for dead rels.
 */
#ifdef NOT_USED
void
pgstat_drop_relation(Oid relid)
{
	PgStat_MsgTabpurge msg;
	int			len;

	/* without a stats socket there is nobody to tell */
	if (pgStatSock == PGINVALID_SOCKET)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
	msg.m_databaseid = MyDatabaseId;

	/* the purge message carries exactly one table OID */
	msg.m_nentries = 1;
	msg.m_tableid[0] = relid;

	/* send only the fixed part plus that single array element */
	len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);

	pgstat_send(&msg, len);
}
#endif							/* NOT_USED */
/*
 * Report that a table has just been vacuumed.
 *
 * tableoid/shared identify the table; livetuples and deadtuples are the
 * caller's counts, passed through unchanged.  Nothing is sent when there is
 * no stats socket or pgstat_track_counts is off.
 */
void
pgstat_report_vacuum(Oid tableoid, bool shared,
					 PgStat_Counter livetuples, PgStat_Counter deadtuples)
{
	PgStat_MsgVacuum msg;

	if (pgStatSock == PGINVALID_SOCKET)
		return;
	if (!pgstat_track_counts)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);

	/* shared relations are reported under InvalidOid, not our database */
	if (shared)
		msg.m_databaseid = InvalidOid;
	else
		msg.m_databaseid = MyDatabaseId;

	msg.m_tableoid = tableoid;
	msg.m_autovacuum = IsAutoVacuumWorkerProcess();
	msg.m_vacuumtime = GetCurrentTimestamp();
	msg.m_live_tuples = livetuples;
	msg.m_dead_tuples = deadtuples;

	pgstat_send(&msg, sizeof(msg));
}
/*
 * Report that a table has just been analyzed.
 *
 * The caller supplies fresh live- and dead-tuple estimates plus a flag
 * saying whether the changes_since_analyze counter should be reset.
 * Nothing is sent when there is no stats socket or pgstat_track_counts is
 * off.
 */
void
pgstat_report_analyze(Relation rel,
					  PgStat_Counter livetuples, PgStat_Counter deadtuples,
					  bool resetcounter)
{
	PgStat_MsgAnalyze msg;

	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
		return;

	/*
	 * ANALYZE, unlike VACUUM, can run inside a transaction that has already
	 * inserted and/or deleted rows in the target table, and it will have
	 * counted those rows as live or dead respectively.  Our own counts for
	 * those rows get reported at transaction end, so take them out of the
	 * figures sent now; otherwise they would be double-counted after
	 * commit.  (Doing it this way also leaves the shared stats entry with
	 * the right numbers if we abort rather than commit.)
	 *
	 * Partitioned tables are not worth the effort, though.
	 */
	if (pgstat_relation_should_count(rel) &&
		rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
	{
		PgStat_TableXactStatus *trans = rel->pgstat_info->trans;

		/* walk from the innermost (sub)transaction level outwards */
		while (trans)
		{
			livetuples -= trans->tuples_inserted - trans->tuples_deleted;
			deadtuples -= trans->tuples_updated + trans->tuples_deleted;
			trans = trans->upper;
		}

		/* count stuff inserted by already-aborted subxacts, too */
		deadtuples -= rel->pgstat_info->t_counts.t_delta_dead_tuples;

		/* ANALYZE's counts are estimates, so clamp any underflow at zero */
		if (livetuples < 0)
			livetuples = 0;
		if (deadtuples < 0)
			deadtuples = 0;
	}

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);

	/* shared relations are reported under InvalidOid, not our database */
	if (rel->rd_rel->relisshared)
		msg.m_databaseid = InvalidOid;
	else
		msg.m_databaseid = MyDatabaseId;

	msg.m_tableoid = RelationGetRelid(rel);
	msg.m_autovacuum = IsAutoVacuumWorkerProcess();
	msg.m_resetcounter = resetcounter;
	msg.m_analyzetime = GetCurrentTimestamp();
	msg.m_live_tuples = livetuples;
	msg.m_dead_tuples = deadtuples;

	pgstat_send(&msg, sizeof(msg));
}
/*
 * count the insertion of n tuples
 *
 * The count goes into the transactional state of the current
 * (sub)transaction level; it is a no-op if rel is not being counted.
 */
void
pgstat_count_heap_insert(Relation rel, PgStat_Counter n)
{
	PgStat_TableStatus *tabstat;

	if (!pgstat_relation_should_count(rel))
		return;

	tabstat = rel->pgstat_info;

	/* make sure a record for the current nesting level exists */
	ensure_tabstat_xact_level(tabstat);
	tabstat->trans->tuples_inserted += n;
}
/*
 * count a tuple update
 *
 * "hot" says whether it was a HOT update.  No-op if rel is not being
 * counted.
 */
void
pgstat_count_heap_update(Relation rel, bool hot)
{
	PgStat_TableStatus *tabstat;

	if (!pgstat_relation_should_count(rel))
		return;

	tabstat = rel->pgstat_info;

	/* make sure a record for the current nesting level exists */
	ensure_tabstat_xact_level(tabstat);
	tabstat->trans->tuples_updated++;

	/* t_tuples_hot_updated is nontransactional, so just advance it */
	if (hot)
		tabstat->t_counts.t_tuples_hot_updated++;
}
/*
 * count a tuple deletion
 *
 * The count goes into the transactional state of the current
 * (sub)transaction level; it is a no-op if rel is not being counted.
 */
void
pgstat_count_heap_delete(Relation rel)
{
	PgStat_TableStatus *tabstat;

	if (!pgstat_relation_should_count(rel))
		return;

	tabstat = rel->pgstat_info;

	/* make sure a record for the current nesting level exists */
	ensure_tabstat_xact_level(tabstat);
	tabstat->trans->tuples_deleted++;
}
/*
 * update tuple counters due to truncate
 *
 * The current level's insert/update/delete counters are saved first, so
 * that they can be restored should the truncating (sub)transaction abort,
 * and are then reset to zero.  No-op if rel is not being counted.
 */
void
pgstat_count_truncate(Relation rel)
{
	PgStat_TableStatus *tabstat;

	if (!pgstat_relation_should_count(rel))
		return;

	tabstat = rel->pgstat_info;

	/* make sure a record for the current nesting level exists */
	ensure_tabstat_xact_level(tabstat);

	/* stash the pre-truncate values before wiping them */
	pgstat_truncdrop_save_counters(tabstat->trans, false);

	tabstat->trans->tuples_deleted = 0;
	tabstat->trans->tuples_updated = 0;
	tabstat->trans->tuples_inserted = 0;
}
/*
 * update dead-tuples count
 *
 * What is being reported is the nontransactional recovery of "delta" dead
 * tuples.  Hence t_delta_dead_tuples goes down rather than up, and the
 * change is applied directly to the per-table counter instead of to any
 * transactional state.  No-op if rel is not being counted.
 */
void
pgstat_update_heap_dead_tuples(Relation rel, int delta)
{
	if (!pgstat_relation_should_count(rel))
		return;

	rel->pgstat_info->t_counts.t_delta_dead_tuples -= delta;
}
/*
 * look up the existing PgStat_TableStatus entry for rel, if any
 *
 * Returns NULL when there is none; a new entry is never created here.
 *
 * Note: after an error in the most recent execution of pgstat_report_stat,
 * an entry may exist without a hashtable entry pointing at it.  That's okay,
 * we simply treat that case as "doesn't exist".
 */
PgStat_TableStatus *
find_tabstat_entry(Oid rel_id)
{
	TabStatHashEntry *hit;

	/* no hashtable means there are no entries at all */
	if (pgStatTabHash == NULL)
		return NULL;

	hit = hash_search(pgStatTabHash, &rel_id, HASH_FIND, NULL);

	/* the stored pointer may itself be NULL, and that is a correct answer */
	return hit ? hit->tsa_entry : NULL;
}
/*
 * Relation-stats part of end-of-transaction processing.  Helper for
 * AtEOXact_PgStat.
 *
 * Moves the transactional insert/update/delete counts of every table in
 * xact_state into the table's base tabstat entry.  The transactional state
 * itself is not freed: it lives in TopTransactionContext and will go away
 * anyway.
 */
void
AtEOXact_PgStat_Relations(PgStat_SubXactStatus *xact_state, bool isCommit)
{
	PgStat_TableXactStatus *trans = xact_state->first;

	while (trans != NULL)
	{
		PgStat_TableStatus *tabstat = trans->parent;

		/* at top-level end only outermost-level records can remain */
		Assert(trans->nest_level == 1);
		Assert(trans->upper == NULL);
		Assert(tabstat->trans == trans);

		/* restore pre-truncate/drop stats (if any) in case of aborted xact */
		if (!isCommit)
			pgstat_truncdrop_restore_counters(trans);

		/* count attempted actions regardless of commit/abort */
		tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
		tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
		tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;

		if (!isCommit)
		{
			/* inserted tuples are dead, deleted tuples are unaffected */
			tabstat->t_counts.t_delta_dead_tuples +=
				trans->tuples_inserted + trans->tuples_updated;

			/* an aborted xact generates no changed_tuple events */
		}
		else
		{
			tabstat->t_counts.t_truncdropped = trans->truncdropped;

			/* forget live/dead stats seen by backend thus far */
			if (trans->truncdropped)
			{
				tabstat->t_counts.t_delta_live_tuples = 0;
				tabstat->t_counts.t_delta_dead_tuples = 0;
			}

			/* insert adds a live tuple, delete removes one */
			tabstat->t_counts.t_delta_live_tuples +=
				trans->tuples_inserted - trans->tuples_deleted;

			/* update and delete each create a dead tuple */
			tabstat->t_counts.t_delta_dead_tuples +=
				trans->tuples_updated + trans->tuples_deleted;

			/* insert, update, delete each count as one change event */
			tabstat->t_counts.t_changed_tuples +=
				trans->tuples_inserted + trans->tuples_updated +
				trans->tuples_deleted;
		}

		/* detach the transactional state from the base entry */
		tabstat->trans = NULL;

		trans = trans->next;
	}
}
/*
 * Relation-stats part of end-of-subtransaction processing.  Helper for
 * AtEOSubXact_PgStat.
 *
 * On commit, the subtransaction's insert/update/delete counts are handed to
 * the next higher (sub)transaction level.  On abort, they are folded
 * straight into the table's base tabstat entry and the subtransaction
 * record is discarded.
 */
void
AtEOSubXact_PgStat_Relations(PgStat_SubXactStatus *xact_state, bool isCommit, int nestDepth)
{
	PgStat_TableXactStatus *trans = xact_state->first;

	while (trans != NULL)
	{
		/* grab the successor first: trans gets freed or relinked below */
		PgStat_TableXactStatus *next_trans = trans->next;
		PgStat_TableStatus *tabstat = trans->parent;

		Assert(trans->nest_level == nestDepth);
		Assert(tabstat->trans == trans);

		if (!isCommit)
		{
			/*
			 * On abort, update top-level tabstat counts, then forget the
			 * subtransaction
			 */

			/* first restore values obliterated by truncate/drop */
			pgstat_truncdrop_restore_counters(trans);

			/* count attempted actions regardless of commit/abort */
			tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
			tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
			tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;

			/* inserted tuples are dead, deleted tuples are unaffected */
			tabstat->t_counts.t_delta_dead_tuples +=
				trans->tuples_inserted + trans->tuples_updated;

			tabstat->trans = trans->upper;
			pfree(trans);
		}
		else if (trans->upper && trans->upper->nest_level == nestDepth - 1)
		{
			/* the immediate parent level has its own record: merge into it */
			PgStat_TableXactStatus *upper = trans->upper;

			if (trans->truncdropped)
			{
				/* propagate the truncate/drop status one level up */
				pgstat_truncdrop_save_counters(upper, false);

				/* replace upper xact stats with ours */
				upper->tuples_inserted = trans->tuples_inserted;
				upper->tuples_updated = trans->tuples_updated;
				upper->tuples_deleted = trans->tuples_deleted;
			}
			else
			{
				upper->tuples_inserted += trans->tuples_inserted;
				upper->tuples_updated += trans->tuples_updated;
				upper->tuples_deleted += trans->tuples_deleted;
			}

			tabstat->trans = upper;
			pfree(trans);
		}
		else
		{
			/*
			 * There is no record at the immediate parent level, so rather
			 * than going through a palloc/pfree pushup we simply reuse this
			 * one (fine, since it's all in TopTransactionContext anyway).
			 * It still has to be re-linked into the parent level, which may
			 * mean pushing a new entry into the pgStatXactStack.
			 */
			PgStat_SubXactStatus *upper_xact_state;

			upper_xact_state = pgstat_xact_stack_level_get(nestDepth - 1);
			trans->next = upper_xact_state->first;
			upper_xact_state->first = trans;
			trans->nest_level = nestDepth - 1;
		}

		trans = next_trans;
	}
}
/*
 * Emit one 2PC record per table that has pending transaction-dependent
 * relation stats in xact_state.
 */
void
AtPrepare_PgStat_Relations(PgStat_SubXactStatus *xact_state)
{
	PgStat_TableXactStatus *trans = xact_state->first;

	while (trans != NULL)
	{
		PgStat_TableStatus *tabstat = trans->parent;
		TwoPhasePgStatRecord record;

		/* at PREPARE only outermost-level records can remain */
		Assert(trans->nest_level == 1);
		Assert(trans->upper == NULL);
		Assert(tabstat->trans == trans);

		/* which table this is about */
		record.t_id = tabstat->t_id;
		record.t_shared = tabstat->t_shared;

		/* current counters of the transaction */
		record.tuples_inserted = trans->tuples_inserted;
		record.tuples_updated = trans->tuples_updated;
		record.tuples_deleted = trans->tuples_deleted;

		/* counters as they stood before any truncate/drop */
		record.inserted_pre_truncdrop = trans->inserted_pre_truncdrop;
		record.updated_pre_truncdrop = trans->updated_pre_truncdrop;
		record.deleted_pre_truncdrop = trans->deleted_pre_truncdrop;
		record.t_truncdropped = trans->truncdropped;

		RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
							   &record, sizeof(record));

		trans = trans->next;
	}
}
/*
 * The only job here is to detach the transaction stats state from the
 * nontransactional state.  The nontransactional action counts get reported
 * to the stats system immediately, whereas the effects on live and dead
 * tuple counts are kept in the 2PC state file.
 *
 * Note: AtEOXact_PgStat_Relations is not called during PREPARE.
 */
void
PostPrepare_PgStat_Relations(PgStat_SubXactStatus *xact_state)
{
	PgStat_TableXactStatus *trans = xact_state->first;

	while (trans != NULL)
	{
		/* the base tabstat entry no longer has transactional state */
		trans->parent->trans = NULL;
		trans = trans->next;
	}
}
/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * Fold the counts saved in the 2PC record into our local pgstats state.
 * recdata is interpreted as a TwoPhasePgStatRecord; xid, info and len are
 * not used here.
 */
void
pgstat_twophase_postcommit(TransactionId xid, uint16 info,
						   void *recdata, uint32 len)
{
	TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
	PgStat_TableStatus *tabstat;

	/* Find or create a tabstat entry for the rel */
	tabstat = get_tabstat_entry(rec->t_id, rec->t_shared);

	/* Same math as in AtEOXact_PgStat_Relations, commit case */
	tabstat->t_counts.t_tuples_inserted += rec->tuples_inserted;
	tabstat->t_counts.t_tuples_updated += rec->tuples_updated;
	tabstat->t_counts.t_tuples_deleted += rec->tuples_deleted;

	tabstat->t_counts.t_truncdropped = rec->t_truncdropped;

	/* forget live/dead stats seen by backend thus far */
	if (rec->t_truncdropped)
	{
		tabstat->t_counts.t_delta_live_tuples = 0;
		tabstat->t_counts.t_delta_dead_tuples = 0;
	}

	/* insert adds a live tuple, delete removes one */
	tabstat->t_counts.t_delta_live_tuples +=
		rec->tuples_inserted - rec->tuples_deleted;

	/* update and delete each create a dead tuple */
	tabstat->t_counts.t_delta_dead_tuples +=
		rec->tuples_updated + rec->tuples_deleted;

	/* insert, update, delete each count as one change event */
	tabstat->t_counts.t_changed_tuples +=
		rec->tuples_inserted + rec->tuples_updated +
		rec->tuples_deleted;
}
/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * Load the saved counts into our local pgstats state, but treat them
 * as aborted.
 *
 * recdata is interpreted as a TwoPhasePgStatRecord; xid, info and len are
 * not used here.  The record buffer belongs to the caller, so it is only
 * read: the pre-truncate/drop substitution is done on a local copy rather
 * than by overwriting the caller's data.
 */
void
pgstat_twophase_postabort(TransactionId xid, uint16 info,
						  void *recdata, uint32 len)
{
	/* private copy, so the adjustments below never touch recdata */
	TwoPhasePgStatRecord rec = *(const TwoPhasePgStatRecord *) recdata;
	PgStat_TableStatus *pgstat_info;

	/* Find or create a tabstat entry for the rel */
	pgstat_info = get_tabstat_entry(rec.t_id, rec.t_shared);

	/* Same math as in AtEOXact_PgStat_Relations, abort case */
	if (rec.t_truncdropped)
	{
		/* the truncate/drop is undone, so its counter reset is too */
		rec.tuples_inserted = rec.inserted_pre_truncdrop;
		rec.tuples_updated = rec.updated_pre_truncdrop;
		rec.tuples_deleted = rec.deleted_pre_truncdrop;
	}

	/* count attempted actions regardless of commit/abort */
	pgstat_info->t_counts.t_tuples_inserted += rec.tuples_inserted;
	pgstat_info->t_counts.t_tuples_updated += rec.tuples_updated;
	pgstat_info->t_counts.t_tuples_deleted += rec.tuples_deleted;

	/* inserted tuples are dead, deleted tuples are unaffected */
	pgstat_info->t_counts.t_delta_dead_tuples +=
		rec.tuples_inserted + rec.tuples_updated;
}
/*
 * Subroutine for pgstat_report_stat: Send relation statistics
 *
 * "now" is handed on to pgstat_send_tabstat() for every message sent.
 * "disconnect" forces the regular (non-shared) message to be sent even when
 * it holds no table entries.
 */
void
pgstat_send_tabstats(TimestampTz now, bool disconnect)
{
	/* static storage, so we assume this inits to all zeroes: */
	static const PgStat_TableCounts all_zeroes;
	PgStat_MsgTabstat regular_msg;
	PgStat_MsgTabstat shared_msg;
	TabStatusArray *chunk;

	/*
	 * Get rid of pgStatTabHash before we start invalidating the
	 * PgStat_TableStatus entries it points to.  (If we fail partway through
	 * the loop below, having removed the hashtable already is fine --- the
	 * only consequence is that the same table could show up more than once
	 * in pgStatTabList, and that's safe.)
	 */
	if (pgStatTabHash)
	{
		hash_destroy(pgStatTabHash);
		pgStatTabHash = NULL;
	}

	/*
	 * Shared relations are kept apart from regular ones because the
	 * databaseid field in the message header has to depend on that.
	 */
	regular_msg.m_nentries = 0;
	regular_msg.m_databaseid = MyDatabaseId;
	shared_msg.m_nentries = 0;
	shared_msg.m_databaseid = InvalidOid;

	/*
	 * Walk the TabStatusArray struct(s), picking out the tables that
	 * actually have counts, and build the messages to send.
	 */
	for (chunk = pgStatTabList; chunk != NULL; chunk = chunk->tsa_next)
	{
		int			slot;

		for (slot = 0; slot < chunk->tsa_used; slot++)
		{
			PgStat_TableStatus *entry = &chunk->tsa_entries[slot];

			/* Shouldn't have any pending transaction-dependent counts */
			Assert(entry->trans == NULL);

			/*
			 * Only entries that accumulated some actual counts are sent;
			 * e.g. indexes opened by the planner but never used are not.
			 */
			if (memcmp(&entry->t_counts, &all_zeroes,
					   sizeof(PgStat_TableCounts)) != 0)
			{
				PgStat_MsgTabstat *this_msg;
				PgStat_TableEntry *this_ent;

				/* append to the matching message ... */
				this_msg = entry->t_shared ? &shared_msg : &regular_msg;
				this_ent = &this_msg->m_entry[this_msg->m_nentries];
				this_ent->t_id = entry->t_id;
				memcpy(&this_ent->t_counts, &entry->t_counts,
					   sizeof(PgStat_TableCounts));
				this_msg->m_nentries++;

				/* ... and ship it off once it is full */
				if (this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
				{
					pgstat_send_tabstat(this_msg, now);
					this_msg->m_nentries = 0;
				}
			}
		}

		/* zero out PgStat_TableStatus structs after use */
		MemSet(chunk->tsa_entries, 0,
			   chunk->tsa_used * sizeof(PgStat_TableStatus));
		chunk->tsa_used = 0;
	}

	/*
	 * Flush the partially filled messages.  The regular one also goes out
	 * when there are no table stats at all, so that pending xact
	 * commit/abort and connection stats still get counted.
	 */
	if (regular_msg.m_nentries > 0 ||
		pgStatXactCommit > 0 || pgStatXactRollback > 0 || disconnect)
		pgstat_send_tabstat(&regular_msg, now);

	if (shared_msg.m_nentries > 0)
		pgstat_send_tabstat(&shared_msg, now);

	have_relation_stats = false;
}
/*
 * Subroutine for pgstat_send_tabstats: finish and send one tabstat message
 *
 * Only the header part of the message plus the m_nentries entries actually
 * in use are transmitted.
 */
static void
pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now)
{
	int			nentries;
	int			msglen;

	/* Getting here without a socket is unlikely, but maybe not impossible */
	if (pgStatSock == PGINVALID_SOCKET)
		return;

	/*
	 * Every normal tabstat message also reports and resets the accumulated
	 * xact commit/rollback and I/O timings
	 */
	pgstat_update_dbstats(tsmsg, now);

	/* size of the fixed part plus the entries in use */
	nentries = tsmsg->m_nentries;
	msglen = offsetof(PgStat_MsgTabstat, m_entry[0]) +
		nentries * sizeof(PgStat_TableEntry);

	pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
	pgstat_send(tsmsg, msglen);
}
/*
 * find or create a PgStat_TableStatus entry for rel
 *
 * rel_id is the relation's OID.  isshared is stored in a newly created
 * entry's t_shared field and is ignored when an entry already exists.
 * Also sets have_relation_stats.
 */
static PgStat_TableStatus *
get_tabstat_entry(Oid rel_id, bool isshared)
{
	TabStatHashEntry *hash_entry;
	PgStat_TableStatus *entry;
	TabStatusArray *tsa;
	bool		found;

	pgstat_assert_is_up();

	have_relation_stats = true;

	/*
	 * Build the lookup hash table on first use.
	 */
	if (!pgStatTabHash)
	{
		HASHCTL		ctl;

		ctl.entrysize = sizeof(TabStatHashEntry);
		ctl.keysize = sizeof(Oid);

		pgStatTabHash = hash_create("pgstat TabStatusArray lookup hash table",
									TABSTAT_QUANTUM,
									&ctl,
									HASH_ELEM | HASH_BLOBS);
	}

	/*
	 * Look the relation up, creating the hash entry if it is missing.
	 */
	hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_ENTER, &found);

	if (!found)
	{
		/* brand-new hash entry: it has no tabstat slot yet */
		hash_entry->tsa_entry = NULL;
	}
	else if (hash_entry->tsa_entry != NULL)
	{
		/* an existing, valid entry: we're done */
		return hash_entry->tsa_entry;
	}

	/*
	 * Find the first pgStatTabList chunk that still has a free slot,
	 * appending a new chunk if needed.  An OOM failure is possible here,
	 * but if that happens the hashtable and the list have been left in a
	 * consistent state.
	 */
	if (pgStatTabList == NULL)
	{
		/* Set up first pgStatTabList entry */
		pgStatTabList = (TabStatusArray *)
			MemoryContextAllocZero(TopMemoryContext,
								   sizeof(TabStatusArray));
	}

	for (tsa = pgStatTabList;
		 tsa->tsa_used >= TABSTAT_QUANTUM;
		 tsa = tsa->tsa_next)
	{
		/* this chunk is full; make sure there is a next one to move on to */
		if (tsa->tsa_next == NULL)
			tsa->tsa_next = (TabStatusArray *)
				MemoryContextAllocZero(TopMemoryContext,
									   sizeof(TabStatusArray));
	}

	/*
	 * Claim the next PgStat_TableStatus slot in this chunk.  The slot is
	 * assumed to be zeroed already, either at creation or after last use.
	 */
	entry = &tsa->tsa_entries[tsa->tsa_used];
	tsa->tsa_used++;
	entry->t_id = rel_id;
	entry->t_shared = isshared;

	/*
	 * Only now is it safe to point the pgStatTabHash entry at the slot.
	 */
	hash_entry->tsa_entry = entry;

	return entry;
}
/*
 * add a new (sub)transaction state record
 *
 * The zero-initialized record is allocated in TopTransactionContext, placed
 * on top of the table's chain of per-level records, and linked into the
 * list of records belonging to nest_level.
 */
static void
add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
{
	PgStat_SubXactStatus *xact_state;
	PgStat_TableXactStatus *trans;

	/*
	 * If this is the first rel to be modified at the current nest level, we
	 * first have to push a transaction stack entry.
	 */
	xact_state = pgstat_xact_stack_level_get(nest_level);

	/* Now make a per-table stack entry */
	trans = (PgStat_TableXactStatus *)
		MemoryContextAllocZero(TopTransactionContext,
							   sizeof(PgStat_TableXactStatus));
	trans->nest_level = nest_level;
	trans->parent = pgstat_info;

	/* stack it on top of the table's previous innermost record, if any */
	trans->upper = pgstat_info->trans;
	pgstat_info->trans = trans;

	/* and put it at the head of this nesting level's list */
	trans->next = xact_state->first;
	xact_state->first = trans;
}
/*
 * Make sure pgstat_info has a (sub)transaction record for the current
 * transaction nesting level, adding one if it does not.
 */
static void
ensure_tabstat_xact_level(PgStat_TableStatus *pgstat_info)
{
	int			level = GetCurrentTransactionNestLevel();

	/* the innermost record already belongs to this level: nothing to do */
	if (pgstat_info->trans != NULL &&
		pgstat_info->trans->nest_level == level)
		return;

	add_tabstat_xact_level(pgstat_info, level);
}
/*
 * When a table is truncated/dropped its i/u/d counters get cleared.  We
 * save them here beforehand, so that they can be put back to the saved
 * (pre-truncate/drop) values if the (sub)xact that executed the
 * truncate/drop later aborts.
 *
 * Note that for truncate this is done for the first truncate in any
 * particular subxact level only; with is_drop set the counters are saved
 * unconditionally.
 */
static void
pgstat_truncdrop_save_counters(PgStat_TableXactStatus *trans, bool is_drop)
{
	/* already saved at this level, and this is not a drop: keep the old copy */
	if (trans->truncdropped && !is_drop)
		return;

	trans->inserted_pre_truncdrop = trans->tuples_inserted;
	trans->updated_pre_truncdrop = trans->tuples_updated;
	trans->deleted_pre_truncdrop = trans->tuples_deleted;
	trans->truncdropped = true;
}
/*
 * restore counters when a truncate aborts
 *
 * Puts back the i/u/d values saved by pgstat_truncdrop_save_counters().
 * Does nothing if no truncate/drop was recorded in this record.
 */
static void
pgstat_truncdrop_restore_counters(PgStat_TableXactStatus *trans)
{
	if (!trans->truncdropped)
		return;

	trans->tuples_inserted = trans->inserted_pre_truncdrop;
	trans->tuples_updated = trans->updated_pre_truncdrop;
	trans->tuples_deleted = trans->deleted_pre_truncdrop;
}