Add o11y for chunk data retrieved from ingesters (#12003)

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
pull/12004/head
Danny Kopping 1 year ago committed by GitHub
parent fac5997b18
commit bdea0b6df1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 30
      pkg/logql/metrics.go
  2. 4
      pkg/logqlmodel/stats/context.go

@ -142,9 +142,9 @@ func RecordRangeAndInstantQueryMetrics(
"status", status,
"limit", p.Limit(),
"returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"total_bytes_structured_metadata", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalStructuredMetadataBytesProcessed)), " ", "", 1),
"throughput", humanizeBytes(uint64(stats.Summary.BytesProcessedPerSecond)),
"total_bytes", humanizeBytes(uint64(stats.Summary.TotalBytesProcessed)),
"total_bytes_structured_metadata", humanizeBytes(uint64(stats.Summary.TotalStructuredMetadataBytesProcessed)),
"lines_per_second", stats.Summary.LinesProcessedPerSecond,
"total_lines", stats.Summary.TotalLinesProcessed,
"post_filter_lines", stats.Summary.TotalPostFilterLines,
@ -173,6 +173,26 @@ func RecordRangeAndInstantQueryMetrics(
"cache_result_hit", resultCache.EntriesFound,
"cache_result_download_time", resultCache.CacheDownloadTime(),
"cache_result_query_length_served", resultCache.CacheQueryLengthServed(),
// The total of chunk reference fetched from index.
"ingester_chunk_refs", stats.Ingester.Store.GetTotalChunksRef(),
// Total number of chunks fetched.
"ingester_chunk_downloaded", stats.Ingester.Store.GetTotalChunksDownloaded(),
// Time spent fetching chunks in nanoseconds.
"ingester_chunk_fetch_time", stats.Ingester.Store.ChunksDownloadDuration(),
// Total of chunks matched by the query from ingesters.
"ingester_chunk_matches", stats.Ingester.GetTotalChunksMatched(),
// Total ingester reached for this query.
"ingester_requests", stats.Ingester.GetTotalReached(),
// Total bytes processed but was already in memory (found in the headchunk). Includes structured metadata bytes.
"ingester_chunk_head_bytes", humanizeBytes(uint64(stats.Ingester.Store.Chunk.GetHeadChunkBytes())),
// Total bytes of compressed chunks (blocks) processed.
"ingester_chunk_compressed_bytes", humanizeBytes(uint64(stats.Ingester.Store.Chunk.GetCompressedBytes())),
// Total bytes decompressed and processed from chunks. Includes structured metadata bytes.
"ingester_chunk_decompressed_bytes", humanizeBytes(uint64(stats.Ingester.Store.Chunk.GetDecompressedBytes())),
// Total duplicates found while processing.
"ingester_chunk_duplicates", stats.Ingester.Store.Chunk.GetTotalDuplicates(),
// Total lines post filtering.
"ingester_post_filter_lines", stats.Ingester.Store.Chunk.GetPostFilterLines(),
}...)
logValues = append(logValues, tagsToKeyValues(queryTags)...)
@ -200,6 +220,10 @@ func RecordRangeAndInstantQueryMetrics(
recordUsageStats(queryType, stats)
}
func humanizeBytes(val uint64) string {
return strings.Replace(humanize.Bytes(val), " ", "", 1)
}
func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,

@ -199,6 +199,10 @@ func (s *Store) Merge(m Store) {
}
}
func (s *Store) ChunksDownloadDuration() time.Duration {
return time.Duration(s.GetChunksDownloadTime())
}
func (s *Summary) Merge(m Summary) {
s.Splits += m.Splits
s.Shards += m.Shards

Loading…
Cancel
Save