From f8658fdc38ae39d88b016fd0dda2bd2274a65ffd Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 29 Aug 2023 12:32:36 +0200 Subject: [PATCH] Enable sharding for avg_over_time. (#10373) **What this PR does / why we need it**: The range aggregation `avg_over_time` is simply shardable if there's no label reduction. If there is one it can be express as ``` sum by (method) ( sum_over_time( {container="app"} | json | unwrap bytes [$__interval]) ) / sum by (method) ( count_over_time( {container="app"} | json [$__interval] ) ) ``` Note that `sum by`, `sum_over_time` and `count_over_time` are shardable once more. **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [x] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Owen Diehl --- CHANGELOG.md | 1 + pkg/logql/downstream_test.go | 14 +- pkg/logql/range_vector.go | 2 + pkg/logql/shardmapper.go | 84 +++++++--- pkg/logql/shardmapper_test.go | 154 +++++++++++++++++- pkg/logql/syntax/ast.go | 27 ++- pkg/logql/test_utils.go | 10 +- .../loki-boltdb-storage-s3/config/loki.yaml | 3 - .../loki-boltdb-storage-s3/docker-compose.yml | 2 +- 9 files changed, 260 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37d3e05453..37a8a94315 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ * [10308](https://github.com/grafana/loki/pull/10308) **bboreham** Tracing: elide small traces for Stats call. * [10341](https://github.com/grafana/loki/pull/10341) **ashwanthgoli** Deprecate older index types and non-object stores - `aws-dynamo, gcp, gcp-columnkey, bigtable, bigtable-hashed, cassandra, grpc` * [10344](https://github.com/grafana/loki/pull/10344) **ashwanthgoli** Compactor: deprecate `-boltdb.shipper.compactor.` prefix in favor of `-compactor.`. +* [10373](https://github.com/grafana/loki/pull/10373) **jeschkies** Loki: Shard `avg_over_time` range aggregations. ##### Fixes diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 8c52f23cfa..c5f54b9e1c 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -23,7 +23,7 @@ func TestMappingEquivalence(t *testing.T) { shards = 3 nStreams = 60 rounds = 20 - streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}) + streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true) start = time.Unix(0, 0) end = time.Unix(0, int64(time.Second*time.Duration(rounds))) step = time.Second @@ -51,6 +51,8 @@ func TestMappingEquivalence(t *testing.T) { {`max(count(rate({a=~".+"}[1s])))`, false}, {`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false}, {`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false}, + {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, + {`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true}, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass. @@ -106,7 +108,7 @@ func TestShardCounter(t *testing.T) { shards = 3 nStreams = 60 rounds = 20 - streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}) + streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, false) start = time.Unix(0, 0) end = time.Unix(0, int64(time.Second*time.Duration(rounds))) step = time.Second @@ -168,7 +170,7 @@ func TestRangeMappingEquivalence(t *testing.T) { shards = 3 nStreams = 60 rounds = 20 - streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}) + streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, false) start = time.Unix(0, 0) end = time.Unix(0, int64(time.Second*time.Duration(rounds))) step = time.Second @@ -427,13 +429,13 @@ func TestRangeMappingEquivalence(t *testing.T) { // approximatelyEquals ensures two responses are approximately equal, // up to 6 decimals precision per sample func approximatelyEquals(t *testing.T, as, bs promql.Matrix) { - require.Equal(t, len(as), len(bs)) + require.Len(t, bs, len(as)) for i := 0; i < len(as); i++ { a := as[i] b := bs[i] require.Equal(t, a.Metric, b.Metric) - require.Equal(t, len(a.Floats), len(b.Floats)) + require.Lenf(t, b.Floats, len(a.Floats), "at step %d", i) for j := 0; j < len(a.Floats); j++ { aSample := &a.Floats[j] @@ -441,6 +443,6 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) { bSample := &b.Floats[j] bSample.F = math.Round(bSample.F*1e6) / 1e6 } - require.Equal(t, a, b) + require.Equalf(t, a, b, "metric %s differs from %s at %d", a.Metric, b.Metric, i) } } diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go index 6d09b38bc2..805446ff1b 100644 --- a/pkg/logql/range_vector.go +++ b/pkg/logql/range_vector.go @@ -422,6 +422,8 @@ func minOverTime(samples []promql.FPoint) float64 { return min } +// stdvarOverTime calculates the variance using Welford's online algorithm. +// See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm func stdvarOverTime(samples []promql.FPoint) float64 { var aux, count, mean float64 for _, v := range samples { diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index d94476671e..a4795ff621 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -311,6 +311,24 @@ func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downs return &cpy, bytesPerShard, nil } +// These functions require a different merge strategy than the default +// concatenation. +// This is because the same label sets may exist on multiple shards when label-reducing parsing is applied or when +// grouping by some subset of the labels. In this case, the resulting vector may have multiple values for the same +// series and we need to combine them appropriately given a particular operation. +var rangeMergeMap = map[string]string{ + // all these may be summed + syntax.OpRangeTypeCount: syntax.OpTypeSum, + syntax.OpRangeTypeRate: syntax.OpTypeSum, + syntax.OpRangeTypeBytes: syntax.OpTypeSum, + syntax.OpRangeTypeBytesRate: syntax.OpTypeSum, + syntax.OpRangeTypeSum: syntax.OpTypeSum, + + // min & max require taking the min|max of the shards + syntax.OpRangeTypeMin: syntax.OpTypeMin, + syntax.OpRangeTypeMax: syntax.OpTypeMax, +} + func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, uint64, error) { if !expr.Shardable() { exprStats, err := m.shards.GetStats(expr) @@ -332,24 +350,6 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return m.mapSampleExpr(expr, r) } - // These functions require a different merge strategy than the default - // concatenation. - // This is because the same label sets may exist on multiple shards when label-reducing parsing is applied or when - // grouping by some subset of the labels. In this case, the resulting vector may have multiple values for the same - // series and we need to combine them appropriately given a particular operation. - mergeMap := map[string]string{ - // all these may be summed - syntax.OpRangeTypeCount: syntax.OpTypeSum, - syntax.OpRangeTypeRate: syntax.OpTypeSum, - syntax.OpRangeTypeBytes: syntax.OpTypeSum, - syntax.OpRangeTypeBytesRate: syntax.OpTypeSum, - syntax.OpRangeTypeSum: syntax.OpTypeSum, - - // min & max require taking the min|max of the shards - syntax.OpRangeTypeMin: syntax.OpTypeMin, - syntax.OpRangeTypeMax: syntax.OpTypeMax, - } - // range aggregation groupings default to `without ()` behavior // so we explicitly set the wrapping vector aggregation to this // for parity when it's not explicitly set @@ -361,7 +361,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, mapped, bytes, err := m.mapSampleExpr(expr, r) // max_over_time(_) -> max without() (max_over_time(_) ++ max_over_time(_)...) // max_over_time(_) by (foo) -> max by (foo) (max_over_time(_) by (foo) ++ max_over_time(_) by (foo)...) - merger, ok := mergeMap[expr.Operation] + merger, ok := rangeMergeMap[expr.Operation] if !ok { return nil, 0, fmt.Errorf( "error while finding merge operation for %s", expr.Operation, @@ -373,6 +373,52 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, Operation: merger, }, bytes, err + case syntax.OpRangeTypeAvg: + potentialConflict := syntax.ReducesLabels(expr) + if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) { + return m.mapSampleExpr(expr, r) + } + + // avg_overtime() by (foo) -> sum by (foo) (sum_over_time()) / sum by (foo) (count_over_time()) + lhs, lhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{ + Left: &syntax.RangeAggregationExpr{ + Left: expr.Left, + Operation: syntax.OpRangeTypeSum, + }, + Grouping: expr.Grouping, + Operation: syntax.OpTypeSum, + }, r) + if err != nil { + return nil, 0, err + } + + // Strip unwrap from log range + countOverTimeSelector, err := expr.Left.WithoutUnwrap() + if err != nil { + return nil, 0, err + } + + rhs, rhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{ + Left: &syntax.RangeAggregationExpr{ + Left: countOverTimeSelector, + Operation: syntax.OpRangeTypeCount, + }, + Grouping: expr.Grouping, + Operation: syntax.OpTypeSum, + }, r) + if err != nil { + return nil, 0, err + } + + // We take the maximum bytes per shard of both sides of the operation + bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard))) + + return &syntax.BinOpExpr{ + SampleExpr: lhs, + RHS: rhs, + Op: syntax.OpTypeDiv, + }, bytesPerShard, nil + default: // don't shard if there's not an appropriate optimization exprStats, err := m.shards.GetStats(expr) diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 1f3eac75e0..d281ddd34b 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -328,12 +328,40 @@ func TestMappingStrings(t *testing.T) { )`, }, { - in: `avg(avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m]))`, - out: `avg(avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m]))`, + in: `avg(avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m]))`, + out: `( + sum( + downstream + ++ + downstream) + / + sum( + downstream + ++ + downstream + ) + )`, }, { - in: `avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m])`, - out: `avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m])`, + in: `avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m])`, + out: `downstream + ++ downstream`, + }, + { + in: `avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m]) by (cluster)`, + out: `( + sum by (cluster) ( + downstream + ++ + downstream + ) + / + sum by (cluster) ( + downstream + ++ + downstream + ) + )`, }, // should be noop if VectorExpr { @@ -1180,6 +1208,124 @@ func TestMapping(t *testing.T) { }, }, }, + { + in: `avg_over_time({foo="bar"} | unwrap bytes [5m]) by (cluster)`, + expr: &syntax.BinOpExpr{ + Op: syntax.OpTypeDiv, + SampleExpr: &syntax.VectorAggregationExpr{ + Grouping: &syntax.Grouping{ + Groups: []string{"cluster"}, + }, + Operation: syntax.OpTypeSum, + Left: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &syntax.VectorAggregationExpr{ + Grouping: &syntax.Grouping{ + Groups: []string{"cluster"}, + }, + Operation: syntax.OpTypeSum, + Left: &syntax.RangeAggregationExpr{ + Operation: syntax.OpRangeTypeSum, + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + Interval: 5 * time.Minute, + Unwrap: &syntax.UnwrapExpr{ + Identifier: "bytes", + }, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &syntax.VectorAggregationExpr{ + Grouping: &syntax.Grouping{ + Groups: []string{"cluster"}, + }, + Operation: syntax.OpTypeSum, + Left: &syntax.RangeAggregationExpr{ + Operation: syntax.OpRangeTypeSum, + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + Interval: 5 * time.Minute, + Unwrap: &syntax.UnwrapExpr{ + Identifier: "bytes", + }, + }, + }, + }, + }, + next: nil, + }, + }, + }, + RHS: &syntax.VectorAggregationExpr{ + Operation: syntax.OpTypeSum, + Grouping: &syntax.Grouping{ + Groups: []string{"cluster"}, + }, + Left: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 2, + }, + SampleExpr: &syntax.VectorAggregationExpr{ + Grouping: &syntax.Grouping{ + Groups: []string{"cluster"}, + }, + Operation: syntax.OpTypeSum, + Left: &syntax.RangeAggregationExpr{ + Operation: syntax.OpRangeTypeCount, + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + Interval: 5 * time.Minute, + }, + }, + }, + }, + next: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 2, + }, + SampleExpr: &syntax.VectorAggregationExpr{ + Grouping: &syntax.Grouping{ + Groups: []string{"cluster"}, + }, + Operation: syntax.OpTypeSum, + Left: &syntax.RangeAggregationExpr{ + Operation: syntax.OpRangeTypeCount, + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + Interval: 5 * time.Minute, + }, + }, + }, + }, + next: nil, + }, + }, + }, + }, + }, } { t.Run(tc.in, func(t *testing.T) { ast, err := syntax.ParseExpr(tc.in) diff --git a/pkg/logql/syntax/ast.go b/pkg/logql/syntax/ast.go index e3fb5d34da..0897149e7e 100644 --- a/pkg/logql/syntax/ast.go +++ b/pkg/logql/syntax/ast.go @@ -32,8 +32,17 @@ type Expr interface { Pretty(level int) string } -func Clone(e Expr) (Expr, error) { - return ParseExpr(e.String()) +func Clone[T Expr](e T) (T, error) { + var empty T + copied, err := ParseExpr(e.String()) + if err != nil { + return empty, err + } + cast, ok := copied.(T) + if !ok { + return empty, fmt.Errorf("unpexpected type of cloned expression: want %T, got %T", empty, copied) + } + return cast, nil } // implicit holds default implementations @@ -894,6 +903,19 @@ func (r *LogRange) Walk(f WalkFn) { r.Left.Walk(f) } +// WithoutUnwrap returns a copy of the log range without the unwrap statement. +func (r *LogRange) WithoutUnwrap() (*LogRange, error) { + left, err := Clone(r.Left) + if err != nil { + return nil, err + } + return &LogRange{ + Left: left, + Interval: r.Interval, + Offset: r.Offset, + }, nil +} + func newLogRange(left LogSelectorExpr, interval time.Duration, u *UnwrapExpr, o *OffsetExpr) *LogRange { var offset time.Duration if o != nil { @@ -1950,6 +1972,7 @@ var shardableOps = map[string]bool{ OpTypeMin: true, // range vector ops + OpRangeTypeAvg: true, OpRangeTypeCount: true, OpRangeTypeRate: true, OpRangeTypeBytes: true, diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 38bd735fac..982fa7f5f1 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -4,6 +4,7 @@ import ( "context" "fmt" logger "log" + "math/rand" "sort" "strings" "time" @@ -239,7 +240,8 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu // create nStreams of nEntries with labelNames each where each label value // with the exception of the "index" label is modulo'd into a shard -func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (streams []logproto.Stream) { +func randomStreams(nStreams, nEntries, nShards int, labelNames []string, valueField bool) (streams []logproto.Stream) { + r := rand.New(rand.NewSource(42)) for i := 0; i < nStreams; i++ { // labels stream := logproto.Stream{} @@ -259,9 +261,13 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream }) } for j := 0; j <= nEntries; j++ { + line := fmt.Sprintf("stream=stderr level=debug line=%d", j) + if valueField { + line = fmt.Sprintf("%s value=%f", line, r.Float64()*100.0) + } stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: time.Unix(0, int64(j*int(time.Second))), - Line: fmt.Sprintf("stream=stderr level=debug line=%d", j), + Line: line, }) } diff --git a/tools/dev/loki-boltdb-storage-s3/config/loki.yaml b/tools/dev/loki-boltdb-storage-s3/config/loki.yaml index 97835ff478..eb012f8ba5 100644 --- a/tools/dev/loki-boltdb-storage-s3/config/loki.yaml +++ b/tools/dev/loki-boltdb-storage-s3/config/loki.yaml @@ -87,10 +87,7 @@ limits_config: reject_old_samples_max_age: 168h split_queries_by_interval: 15m querier: - engine: - timeout: 5m query_ingesters_within: 2h - query_timeout: 5m multi_tenant_queries_enabled: true per_request_limits_enabled: true query_range: diff --git a/tools/dev/loki-boltdb-storage-s3/docker-compose.yml b/tools/dev/loki-boltdb-storage-s3/docker-compose.yml index fec53dad5f..4af04b7664 100644 --- a/tools/dev/loki-boltdb-storage-s3/docker-compose.yml +++ b/tools/dev/loki-boltdb-storage-s3/docker-compose.yml @@ -7,7 +7,7 @@ services: loki-url: "http://localhost:8001/loki/api/v1/push" loki-retries: "1" loki-tenant-id: "1" - image: consul + image: hashicorp/consul command: [ "agent", "-dev" ,"-client=0.0.0.0", "-log-level=info" ] ports: - 8500:8500