loki/pkg/logqlmodel/stats/context.go
/*
Package stats provides primitives for recording metrics across the query path.
Statistics are passed through the query context.

To start a new query statistics context, use:

	statsCtx, ctx := stats.NewContext(ctx)

Then update statistics by mutating data with one of the Add helpers:

	statsCtx.Add...(1)

To retrieve the statistics context from the current context, use:

	statsCtx := stats.FromContext(ctx)

Finally, to get a snapshot of the current query statistics, use:

	statsCtx.Result(time.Since(start), queueTime, totalEntriesReturned)
*/
package stats
import (
"context"
"sync"
"sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it.
"time"
"github.com/dustin/go-humanize"
"github.com/go-kit/log"
)
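// A minimal end-to-end sketch of the intended lifecycle (the counter value,
// entry count, and nop logger are illustrative, not prescribed by this package):
//
//	start := time.Now()
//	statsCtx, ctx := NewContext(ctx)
//	// ... execute the query, recording statistics along the way ...
//	statsCtx.AddChunksDownloaded(3)
//	// Snapshot the accumulated statistics once the query completes.
//	res := statsCtx.Result(time.Since(start), 0, 100)
//	res.Log(log.NewNopLogger())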
type (
ctxKeyType string
Component int64
)
const (
statsKey ctxKeyType = "stats"
)
// Context is the statistics context. It is passed through the query path and accumulates statistics.
type Context struct {
querier Querier
ingester Ingester
caches Caches
// store is the store statistics collected across the query path
store Store
// result accumulates results for JoinResults.
result Result
mtx sync.Mutex
}
type CacheType string
const (
ChunkCache CacheType = "chunk" //nolint:staticcheck
IndexCache = "index"
ResultCache = "result"
StatsResultCache = "stats-result"
WriteDedupeCache = "write-dedupe"
)
// NewContext creates a new statistics context
func NewContext(ctx context.Context) (*Context, context.Context) {
contextData := &Context{}
ctx = context.WithValue(ctx, statsKey, contextData)
return contextData, ctx
}
// FromContext returns the statistics context from ctx, or an empty Context
// (whose updates are discarded) if none was registered via NewContext.
func FromContext(ctx context.Context) *Context {
v, ok := ctx.Value(statsKey).(*Context)
if !ok {
return &Context{}
}
return v
}
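// Because FromContext never returns nil, callers can record statistics
// unconditionally; if no context was registered via NewContext, the updates
// land on a throwaway Context and are effectively discarded. A minimal sketch:
//
//	statsCtx := FromContext(ctx) // never nil
//	statsCtx.AddChunksRef(1)     // silently dropped if NewContext was never called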
// Ingester returns the ingester statistics accumulated so far.
func (c *Context) Ingester() Ingester {
return Ingester{
TotalReached: c.ingester.TotalReached,
TotalChunksMatched: c.ingester.TotalChunksMatched,
TotalBatches: c.ingester.TotalBatches,
TotalLinesSent: c.ingester.TotalLinesSent,
Store: c.store,
}
}
// Caches returns the cache statistics accumulated so far.
func (c *Context) Caches() Caches {
return Caches{
Chunk: c.caches.Chunk,
Index: c.caches.Index,
Result: c.caches.Result,
StatsResult: c.caches.StatsResult,
}
}
// Reset clears the statistics.
func (c *Context) Reset() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.store.Reset()
c.querier.Reset()
c.ingester.Reset()
c.result.Reset()
c.caches.Reset()
}
// Result returns a snapshot of the accumulated statistics, computing the summary from store, ingester, and cache data.
func (c *Context) Result(execTime time.Duration, queueTime time.Duration, totalEntriesReturned int) Result {
r := c.result
r.Merge(Result{
Querier: Querier{
Store: c.store,
},
Ingester: c.ingester,
Caches: c.caches,
})
r.ComputeSummary(execTime, queueTime, totalEntriesReturned)
return r
}
// JoinResults merges a Result with the embedded Result in a context in a concurrency-safe manner.
func JoinResults(ctx context.Context, res Result) {
stats := FromContext(ctx)
stats.mtx.Lock()
defer stats.mtx.Unlock()
stats.result.Merge(res)
}
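// A sketch of merging a downstream partial result (e.g. from a sharded or
// split query) into the context's accumulator; res is illustrative:
//
//	var res Result // decoded from a downstream querier response
//	JoinResults(ctx, res)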
// JoinIngesters merges ingester statistics into the context in a concurrency-safe manner.
func JoinIngesters(ctx context.Context, inc Ingester) {
stats := FromContext(ctx)
stats.mtx.Lock()
defer stats.mtx.Unlock()
stats.ingester.Merge(inc)
}
// ComputeSummary computes the summary of the statistics.
func (r *Result) ComputeSummary(execTime time.Duration, queueTime time.Duration, totalEntriesReturned int) {
r.Summary.TotalBytesProcessed = r.Querier.Store.Chunk.DecompressedBytes + r.Querier.Store.Chunk.HeadChunkBytes +
r.Ingester.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.HeadChunkBytes
r.Summary.TotalLinesProcessed = r.Querier.Store.Chunk.DecompressedLines + r.Querier.Store.Chunk.HeadChunkLines +
r.Ingester.Store.Chunk.DecompressedLines + r.Ingester.Store.Chunk.HeadChunkLines
r.Summary.TotalPostFilterLines = r.Querier.Store.Chunk.PostFilterLines + r.Ingester.Store.Chunk.PostFilterLines
r.Summary.ExecTime = execTime.Seconds()
if execTime != 0 {
r.Summary.BytesProcessedPerSecond = int64(float64(r.Summary.TotalBytesProcessed) /
execTime.Seconds())
r.Summary.LinesProcessedPerSecond = int64(float64(r.Summary.TotalLinesProcessed) /
execTime.Seconds())
}
if queueTime != 0 {
r.Summary.QueueTime = queueTime.Seconds()
}
r.Summary.TotalEntriesReturned = int64(totalEntriesReturned)
}
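// For example, a query that processed 10 MiB and 50,000 lines in 2s would
// yield (values illustrative):
//
//	TotalBytesProcessed     = 10485760              // 10 MiB
//	BytesProcessedPerSecond = int64(10485760 / 2.0) // 5242880
//	LinesProcessedPerSecond = int64(50000 / 2.0)    // 25000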
func (s *Store) Merge(m Store) {
s.TotalChunksRef += m.TotalChunksRef
s.TotalChunksDownloaded += m.TotalChunksDownloaded
s.ChunksDownloadTime += m.ChunksDownloadTime
s.Chunk.HeadChunkBytes += m.Chunk.HeadChunkBytes
s.Chunk.HeadChunkLines += m.Chunk.HeadChunkLines
s.Chunk.DecompressedBytes += m.Chunk.DecompressedBytes
s.Chunk.DecompressedLines += m.Chunk.DecompressedLines
s.Chunk.CompressedBytes += m.Chunk.CompressedBytes
s.Chunk.TotalDuplicates += m.Chunk.TotalDuplicates
s.Chunk.PostFilterLines += m.Chunk.PostFilterLines
}
func (s *Summary) Merge(m Summary) {
s.Splits += m.Splits
s.Shards += m.Shards
}
func (q *Querier) Merge(m Querier) {
q.Store.Merge(m.Store)
}
func (i *Ingester) Merge(m Ingester) {
i.Store.Merge(m.Store)
i.TotalBatches += m.TotalBatches
i.TotalLinesSent += m.TotalLinesSent
i.TotalChunksMatched += m.TotalChunksMatched
i.TotalReached += m.TotalReached
}
func (c *Caches) Merge(m Caches) {
c.Chunk.Merge(m.Chunk)
c.Index.Merge(m.Index)
c.Result.Merge(m.Result)
c.StatsResult.Merge(m.StatsResult)
}
func (c *Cache) Merge(m Cache) {
c.EntriesFound += m.EntriesFound
c.EntriesRequested += m.EntriesRequested
c.EntriesStored += m.EntriesStored
c.Requests += m.Requests
c.BytesSent += m.BytesSent
c.BytesReceived += m.BytesReceived
c.DownloadTime += m.DownloadTime
}
func (c *Cache) CacheDownloadTime() time.Duration {
return time.Duration(c.DownloadTime)
}
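// MergeSplit merges a Result coming from a single split of the query,
// counting it as exactly one split regardless of the inbound Summary.Splits.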
func (r *Result) MergeSplit(m Result) {
m.Summary.Splits = 1
r.Merge(m)
}
// Merge merges the statistics of m into r and recomputes the summary.
func (r *Result) Merge(m Result) {
r.Querier.Merge(m.Querier)
r.Ingester.Merge(m.Ingester)
r.Caches.Merge(m.Caches)
r.Summary.Merge(m.Summary)
r.ComputeSummary(ConvertSecondsToNanoseconds(r.Summary.ExecTime+m.Summary.ExecTime),
ConvertSecondsToNanoseconds(r.Summary.QueueTime+m.Summary.QueueTime), int(r.Summary.TotalEntriesReturned))
}
// ConvertSecondsToNanoseconds converts time.Duration representation of seconds (float64)
// into time.Duration representation of nanoseconds (int64)
func ConvertSecondsToNanoseconds(seconds float64) time.Duration {
return time.Duration(int64(seconds * float64(time.Second)))
}
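// For example:
//
//	ConvertSecondsToNanoseconds(1.5) // 1.5s, i.e. 1500000000ns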
func (r Result) ChunksDownloadTime() time.Duration {
return time.Duration(r.Querier.Store.ChunksDownloadTime + r.Ingester.Store.ChunksDownloadTime)
}
func (r Result) TotalDuplicates() int64 {
return r.Querier.Store.Chunk.TotalDuplicates + r.Ingester.Store.Chunk.TotalDuplicates
}
func (r Result) TotalChunksDownloaded() int64 {
return r.Querier.Store.TotalChunksDownloaded + r.Ingester.Store.TotalChunksDownloaded
}
func (r Result) TotalChunksRef() int64 {
return r.Querier.Store.TotalChunksRef + r.Ingester.Store.TotalChunksRef
}
func (r Result) TotalDecompressedBytes() int64 {
return r.Querier.Store.Chunk.DecompressedBytes + r.Ingester.Store.Chunk.DecompressedBytes
}
func (r Result) TotalDecompressedLines() int64 {
return r.Querier.Store.Chunk.DecompressedLines + r.Ingester.Store.Chunk.DecompressedLines
}
func (c *Context) AddIngesterBatch(size int64) {
atomic.AddInt64(&c.ingester.TotalBatches, 1)
atomic.AddInt64(&c.ingester.TotalLinesSent, size)
}
func (c *Context) AddIngesterTotalChunkMatched(i int64) {
atomic.AddInt64(&c.ingester.TotalChunksMatched, i)
}
func (c *Context) AddIngesterReached(i int32) {
atomic.AddInt32(&c.ingester.TotalReached, i)
}
func (c *Context) AddHeadChunkLines(i int64) {
atomic.AddInt64(&c.store.Chunk.HeadChunkLines, i)
}
func (c *Context) AddHeadChunkBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.HeadChunkBytes, i)
}
func (c *Context) AddCompressedBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.CompressedBytes, i)
}
func (c *Context) AddDecompressedBytes(i int64) {
atomic.AddInt64(&c.store.Chunk.DecompressedBytes, i)
}
func (c *Context) AddDecompressedLines(i int64) {
atomic.AddInt64(&c.store.Chunk.DecompressedLines, i)
}
func (c *Context) AddPostFilterLines(i int64) {
atomic.AddInt64(&c.store.Chunk.PostFilterLines, i)
}
func (c *Context) AddDuplicates(i int64) {
atomic.AddInt64(&c.store.Chunk.TotalDuplicates, i)
}
func (c *Context) AddChunksDownloadTime(i time.Duration) {
atomic.AddInt64(&c.store.ChunksDownloadTime, int64(i))
}
func (c *Context) AddChunksDownloaded(i int64) {
atomic.AddInt64(&c.store.TotalChunksDownloaded, i)
}
func (c *Context) AddChunksRef(i int64) {
atomic.AddInt64(&c.store.TotalChunksRef, i)
}
// AddCacheEntriesFound counts the number of cache entries requested and found
func (c *Context) AddCacheEntriesFound(t CacheType, i int) {
stats := c.getCacheStatsByType(t)
if stats == nil {
return
}
atomic.AddInt32(&stats.EntriesFound, int32(i))
}
// AddCacheEntriesRequested counts the number of keys requested from the cache
func (c *Context) AddCacheEntriesRequested(t CacheType, i int) {
stats := c.getCacheStatsByType(t)
if stats == nil {
return
}
atomic.AddInt32(&stats.EntriesRequested, int32(i))
}
// AddCacheEntriesStored counts the number of keys *attempted* to be stored in the cache.
// Note that if background writeback (https://grafana.com/docs/loki/latest/configuration/#cache_config)
// is configured, we cannot know whether these store attempts succeeded, since that happens asynchronously.
func (c *Context) AddCacheEntriesStored(t CacheType, i int) {
stats := c.getCacheStatsByType(t)
if stats == nil {
return
}
atomic.AddInt32(&stats.EntriesStored, int32(i))
}
// AddCacheBytesRetrieved counts the number of bytes retrieved from the cache
func (c *Context) AddCacheBytesRetrieved(t CacheType, i int) {
stats := c.getCacheStatsByType(t)
if stats == nil {
return
}
atomic.AddInt64(&stats.BytesReceived, int64(i))
}
// AddCacheBytesSent counts the number of bytes sent to the cache.
// Note that if background writeback (https://grafana.com/docs/loki/latest/configuration/#cache_config)
// is configured, we cannot know whether these bytes were actually stored, since that happens asynchronously.
func (c *Context) AddCacheBytesSent(t CacheType, i int) {
stats := c.getCacheStatsByType(t)
if stats == nil {
return
}
atomic.AddInt64(&stats.BytesSent, int64(i))
}
// AddCacheDownloadTime records the time taken to download data from the cache
func (c *Context) AddCacheDownloadTime(t CacheType, i time.Duration) {
stats := c.getCacheStatsByType(t)
if stats == nil {
return
}
atomic.AddInt64(&stats.DownloadTime, int64(i))
}
// AddCacheRequest counts the number of fetch/store requests to the cache
func (c *Context) AddCacheRequest(t CacheType, i int) {
stats := c.getCacheStatsByType(t)
if stats == nil {
return
}
atomic.AddInt32(&stats.Requests, int32(i))
}
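// A sketch of how a cache client might record its statistics with the helpers
// above (the cache type and counts are illustrative):
//
//	statsCtx := FromContext(ctx)
//	statsCtx.AddCacheRequest(ChunkCache, 1)
//	statsCtx.AddCacheEntriesRequested(ChunkCache, 10)
//	statsCtx.AddCacheEntriesFound(ChunkCache, 7)
//	statsCtx.AddCacheDownloadTime(ChunkCache, 25*time.Millisecond)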
func (c *Context) AddSplitQueries(num int64) {
atomic.AddInt64(&c.result.Summary.Splits, num)
}
func (c *Context) getCacheStatsByType(t CacheType) *Cache {
var stats *Cache
switch t {
case ChunkCache:
stats = &c.caches.Chunk
case IndexCache:
stats = &c.caches.Index
case ResultCache:
stats = &c.caches.Result
case StatsResultCache:
stats = &c.caches.StatsResult
default:
return nil
}
return stats
}
// Log logs a query statistics result.
func (r Result) Log(log log.Logger) {
_ = log.Log(
"Ingester.TotalReached", r.Ingester.TotalReached,
"Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched,
"Ingester.TotalBatches", r.Ingester.TotalBatches,
"Ingester.TotalLinesSent", r.Ingester.TotalLinesSent,
"Ingester.TotalChunksRef", r.Ingester.Store.TotalChunksRef,
"Ingester.TotalChunksDownloaded", r.Ingester.Store.TotalChunksDownloaded,
"Ingester.ChunksDownloadTime", time.Duration(r.Ingester.Store.ChunksDownloadTime),
"Ingester.HeadChunkBytes", humanize.Bytes(uint64(r.Ingester.Store.Chunk.HeadChunkBytes)),
"Ingester.HeadChunkLines", r.Ingester.Store.Chunk.HeadChunkLines,
"Ingester.DecompressedBytes", humanize.Bytes(uint64(r.Ingester.Store.Chunk.DecompressedBytes)),
"Ingester.DecompressedLines", r.Ingester.Store.Chunk.DecompressedLines,
"Ingester.PostFilterLInes", r.Ingester.Store.Chunk.PostFilterLines,
"Ingester.CompressedBytes", humanize.Bytes(uint64(r.Ingester.Store.Chunk.CompressedBytes)),
"Ingester.TotalDuplicates", r.Ingester.Store.Chunk.TotalDuplicates,
"Querier.TotalChunksRef", r.Querier.Store.TotalChunksRef,
"Querier.TotalChunksDownloaded", r.Querier.Store.TotalChunksDownloaded,
"Querier.ChunksDownloadTime", time.Duration(r.Querier.Store.ChunksDownloadTime),
"Querier.HeadChunkBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.HeadChunkBytes)),
"Querier.HeadChunkLines", r.Querier.Store.Chunk.HeadChunkLines,
"Querier.DecompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.DecompressedBytes)),
"Querier.DecompressedLines", r.Querier.Store.Chunk.DecompressedLines,
"Querier.PostFilterLInes", r.Querier.Store.Chunk.PostFilterLines,
"Querier.CompressedBytes", humanize.Bytes(uint64(r.Querier.Store.Chunk.CompressedBytes)),
"Querier.TotalDuplicates", r.Querier.Store.Chunk.TotalDuplicates,
)
r.Caches.Log(log)
r.Summary.Log(log)
}
func (s Summary) Log(log log.Logger) {
_ = log.Log(
"Summary.BytesProcessedPerSecond", humanize.Bytes(uint64(s.BytesProcessedPerSecond)),
"Summary.LinesProcessedPerSecond", s.LinesProcessedPerSecond,
"Summary.TotalBytesProcessed", humanize.Bytes(uint64(s.TotalBytesProcessed)),
"Summary.TotalLinesProcessed", s.TotalLinesProcessed,
"Summary.PostFilterLines", s.TotalPostFilterLines,
"Summary.ExecTime", ConvertSecondsToNanoseconds(s.ExecTime),
"Summary.QueueTime", ConvertSecondsToNanoseconds(s.QueueTime),
)
}
func (c Caches) Log(log log.Logger) {
_ = log.Log(
"Cache.Chunk.Requests", c.Chunk.Requests,
"Cache.Chunk.EntriesRequested", c.Chunk.EntriesRequested,
"Cache.Chunk.EntriesFound", c.Chunk.EntriesFound,
"Cache.Chunk.EntriesStored", c.Chunk.EntriesStored,
"Cache.Chunk.BytesSent", humanize.Bytes(uint64(c.Chunk.BytesSent)),
"Cache.Chunk.BytesReceived", humanize.Bytes(uint64(c.Chunk.BytesReceived)),
"Cache.Chunk.DownloadTime", c.Chunk.CacheDownloadTime(),
"Cache.Index.Requests", c.Index.Requests,
"Cache.Index.EntriesRequested", c.Index.EntriesRequested,
"Cache.Index.EntriesFound", c.Index.EntriesFound,
"Cache.Index.EntriesStored", c.Index.EntriesStored,
"Cache.Index.BytesSent", humanize.Bytes(uint64(c.Index.BytesSent)),
"Cache.Index.BytesReceived", humanize.Bytes(uint64(c.Index.BytesReceived)),
"Cache.Index.DownloadTime", c.Index.CacheDownloadTime(),
"Cache.StatsResult.Requests", c.StatsResult.Requests,
"Cache.StatsResult.EntriesRequested", c.StatsResult.EntriesRequested,
"Cache.StatsResult.EntriesFound", c.StatsResult.EntriesFound,
"Cache.StatsResult.EntriesStored", c.StatsResult.EntriesStored,
"Cache.StatsResult.BytesSent", humanize.Bytes(uint64(c.StatsResult.BytesSent)),
"Cache.StatsResult.BytesReceived", humanize.Bytes(uint64(c.StatsResult.BytesReceived)),
"Cache.Result.DownloadTime", c.Result.CacheDownloadTime(),
"Cache.Result.Requests", c.Result.Requests,
"Cache.Result.EntriesRequested", c.Result.EntriesRequested,
"Cache.Result.EntriesFound", c.Result.EntriesFound,
"Cache.Result.EntriesStored", c.Result.EntriesStored,
"Cache.Result.BytesSent", humanize.Bytes(uint64(c.Result.BytesSent)),
"Cache.Result.BytesReceived", humanize.Bytes(uint64(c.Result.BytesReceived)),
"Cache.Result.DownloadTime", c.Result.CacheDownloadTime(),
)
}