Like Prometheus, but for logs.
loki/pkg/logproto/extensions.go

package logproto

import (
"sort"
"strings"
"sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it.
"github.com/cespare/xxhash/v2"
"github.com/dustin/go-humanize"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

var seps = []byte{'\xff'}

// Hash returns hash of the labels according to Prometheus' Labels.Hash function.
// `b` and `keysForLabels` are buffers that should be reused to avoid
// allocations.
func (id SeriesIdentifier) Hash(b []byte, keysForLabels []string) (uint64, []string) {
	keysForLabels = keysForLabels[:0]
	for k := range id.Labels {
		keysForLabels = append(keysForLabels, k)
	}
	sort.Strings(keysForLabels)

	// Use xxhash.Sum64(b) for fast path as it's faster.
	b = b[:0]
	for i, name := range keysForLabels {
		value := id.Labels[name]
		if len(b)+len(name)+len(value)+2 >= cap(b) {
			// If labels entry is 1KB+ do not allocate whole entry.
			h := xxhash.New()
			_, _ = h.Write(b)
			for _, name := range keysForLabels[i:] {
				value := id.Labels[name]
				_, _ = h.WriteString(name)
				_, _ = h.Write(seps)
				_, _ = h.WriteString(value)
				_, _ = h.Write(seps)
			}
			return h.Sum64(), keysForLabels
		}

		b = append(b, name...)
		b = append(b, seps[0])
		b = append(b, value...)
		b = append(b, seps[0])
	}
	return xxhash.Sum64(b), keysForLabels
}
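
The hot path appends `name 0xff value 0xff` pairs into the caller-supplied buffer and hashes it in one call to `xxhash.Sum64`; only when the label set outgrows the buffer does it fall back to streaming the remainder through an `xxhash.Digest`. A minimal usage sketch follows: the `uniqueSeries` helper is hypothetical, but it mirrors how the query-frontend's merge path deduplicates series by `uint64` hash instead of by `series.String()`.

```go
// uniqueSeries is a hypothetical helper: it drops duplicate SeriesIdentifiers
// by comparing their uint64 hashes, reusing the two scratch buffers across
// calls so the fast path stays allocation-free.
func uniqueSeries(ids []SeriesIdentifier) []SeriesIdentifier {
	var (
		b    = make([]byte, 0, 1024) // scratch space for the flattened label pairs
		keys = make([]string, 0, 32) // scratch space for the sorted label names
		seen = make(map[uint64]struct{}, len(ids))
		out  = make([]SeriesIdentifier, 0, len(ids))
	)
	for _, id := range ids {
		var h uint64
		h, keys = id.Hash(b, keys)
		if _, dup := seen[h]; dup {
			continue
		}
		seen[h] = struct{}{}
		out = append(out, id)
	}
	return out
}
```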

type Streams []Stream

func (xs Streams) Len() int           { return len(xs) }
func (xs Streams) Swap(i, j int)      { xs[i], xs[j] = xs[j], xs[i] }
func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels }

func (s Series) Len() int           { return len(s.Samples) }
func (s Series) Swap(i, j int)      { s.Samples[i], s.Samples[j] = s.Samples[j], s.Samples[i] }
func (s Series) Less(i, j int) bool { return s.Samples[i].Timestamp < s.Samples[j].Timestamp }
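
Both `Streams` and `Series` satisfy `sort.Interface`, so results can be ordered directly with the standard library. A short sketch (the `sortResults` wrapper is hypothetical):

```go
// sortResults is a hypothetical wrapper: streams are ordered by their label
// string, and the samples inside each series by timestamp.
func sortResults(streams Streams, series []Series) {
	sort.Sort(streams)
	for i := range series {
		sort.Sort(series[i])
	}
}
```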

// Safe for concurrent use
func (m *IndexStatsResponse) AddStream(_ model.Fingerprint) {
	atomic.AddUint64(&m.Streams, 1)
}

// Safe for concurrent use
func (m *IndexStatsResponse) AddChunkStats(s index.ChunkStats) {
	atomic.AddUint64(&m.Chunks, s.Chunks)
	// s.KB counts units of 1024 bytes, so shifting left by 10 converts it to bytes.
	atomic.AddUint64(&m.Bytes, s.KB<<10)
	atomic.AddUint64(&m.Entries, s.Entries)
}

// Stats returns a copy of the accumulated counters.
func (m *IndexStatsResponse) Stats() IndexStatsResponse {
	return *m
}
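
Because the counters are incremented with `sync/atomic`, a single `IndexStatsResponse` can be shared by concurrent index readers without extra locking. A hypothetical sketch of that pattern (it assumes an additional `sync` import):

```go
// gatherStats is a hypothetical accumulator: each goroutine folds its shard's
// chunk statistics into the shared response, and Stats() copies out the totals.
func gatherStats(shards [][]index.ChunkStats) IndexStatsResponse {
	var (
		resp IndexStatsResponse
		wg   sync.WaitGroup
	)
	for _, shard := range shards {
		wg.Add(1)
		go func(shard []index.ChunkStats) {
			defer wg.Done()
			for _, s := range shard {
				resp.AddChunkStats(s)
			}
		}(shard)
	}
	wg.Wait()
	return resp.Stats()
}
```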

// Helper function for returning the key value pairs
// to be passed to a logger
func (m *IndexStatsResponse) LoggingKeyValues() []interface{} {
	if m == nil {
		return nil
	}
	return []interface{}{
		"bytes", strings.Replace(humanize.Bytes(m.Bytes), " ", "", 1),
		"chunks", m.Chunks,
		"streams", m.Streams,
		"entries", m.Entries,
	}
}
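
The flat `key, value, key, value, ...` shape matches variadic structured loggers. A hypothetical sketch, assuming a go-kit style logger (`github.com/go-kit/log`):

```go
// logStats is a hypothetical helper: the nil-safe receiver means a missing
// response simply contributes no extra fields to the log line.
func logStats(logger log.Logger, resp *IndexStatsResponse) {
	kvs := append([]interface{}{"msg", "index stats"}, resp.LoggingKeyValues()...)
	_ = logger.Log(kvs...)
}
```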