Enable sharding for avg_over_time. (#10373)

**What this PR does / why we need it**:
The range aggregation `avg_over_time` is simply shardable if there's no
label reduction.

If there is one, it can be expressed as
```
sum by (method) (
    sum_over_time(
        {container="app"} | json | unwrap bytes [$__interval])
) 
/
sum by (method) (
    count_over_time(
        {container="app"} | json [$__interval]
    )
) 
```

Note that `sum by`, `sum_over_time` and `count_over_time` are shardable
once more.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)

---------

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
pull/10379/head^2
Karsten Jeschkies 3 years ago committed by GitHub
parent 05195f2e9b
commit f8658fdc38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 14
      pkg/logql/downstream_test.go
  3. 2
      pkg/logql/range_vector.go
  4. 84
      pkg/logql/shardmapper.go
  5. 154
      pkg/logql/shardmapper_test.go
  6. 27
      pkg/logql/syntax/ast.go
  7. 10
      pkg/logql/test_utils.go
  8. 3
      tools/dev/loki-boltdb-storage-s3/config/loki.yaml
  9. 2
      tools/dev/loki-boltdb-storage-s3/docker-compose.yml

@ -62,6 +62,7 @@
* [10308](https://github.com/grafana/loki/pull/10308) **bboreham** Tracing: elide small traces for Stats call.
* [10341](https://github.com/grafana/loki/pull/10341) **ashwanthgoli** Deprecate older index types and non-object stores - `aws-dynamo, gcp, gcp-columnkey, bigtable, bigtable-hashed, cassandra, grpc`
* [10344](https://github.com/grafana/loki/pull/10344) **ashwanthgoli** Compactor: deprecate `-boltdb.shipper.compactor.` prefix in favor of `-compactor.`.
* [10373](https://github.com/grafana/loki/pull/10373) **jeschkies** Loki: Shard `avg_over_time` range aggregations.
##### Fixes

@ -23,7 +23,7 @@ func TestMappingEquivalence(t *testing.T) {
shards = 3
nStreams = 60
rounds = 20
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"})
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true)
start = time.Unix(0, 0)
end = time.Unix(0, int64(time.Second*time.Duration(rounds)))
step = time.Second
@ -51,6 +51,8 @@ func TestMappingEquivalence(t *testing.T) {
{`max(count(rate({a=~".+"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false},
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
@ -106,7 +108,7 @@ func TestShardCounter(t *testing.T) {
shards = 3
nStreams = 60
rounds = 20
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"})
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, false)
start = time.Unix(0, 0)
end = time.Unix(0, int64(time.Second*time.Duration(rounds)))
step = time.Second
@ -168,7 +170,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
shards = 3
nStreams = 60
rounds = 20
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"})
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, false)
start = time.Unix(0, 0)
end = time.Unix(0, int64(time.Second*time.Duration(rounds)))
step = time.Second
@ -427,13 +429,13 @@ func TestRangeMappingEquivalence(t *testing.T) {
// approximatelyEquals ensures two responses are approximately equal,
// up to 6 decimals precision per sample
func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
require.Equal(t, len(as), len(bs))
require.Len(t, bs, len(as))
for i := 0; i < len(as); i++ {
a := as[i]
b := bs[i]
require.Equal(t, a.Metric, b.Metric)
require.Equal(t, len(a.Floats), len(b.Floats))
require.Lenf(t, b.Floats, len(a.Floats), "at step %d", i)
for j := 0; j < len(a.Floats); j++ {
aSample := &a.Floats[j]
@ -441,6 +443,6 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
bSample := &b.Floats[j]
bSample.F = math.Round(bSample.F*1e6) / 1e6
}
require.Equal(t, a, b)
require.Equalf(t, a, b, "metric %s differs from %s at %d", a.Metric, b.Metric, i)
}
}

@ -422,6 +422,8 @@ func minOverTime(samples []promql.FPoint) float64 {
return min
}
// stdvarOverTime calculates the variance using Welford's online algorithm.
// See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
func stdvarOverTime(samples []promql.FPoint) float64 {
var aux, count, mean float64
for _, v := range samples {

@ -311,6 +311,24 @@ func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downs
return &cpy, bytesPerShard, nil
}
// These functions require a different merge strategy than the default
// concatenation.
// This is because the same label sets may exist on multiple shards when label-reducing parsing is applied or when
// grouping by some subset of the labels. In this case, the resulting vector may have multiple values for the same
// series and we need to combine them appropriately given a particular operation.
var rangeMergeMap = map[string]string{
// all these may be summed
syntax.OpRangeTypeCount: syntax.OpTypeSum,
syntax.OpRangeTypeRate: syntax.OpTypeSum,
syntax.OpRangeTypeBytes: syntax.OpTypeSum,
syntax.OpRangeTypeBytesRate: syntax.OpTypeSum,
syntax.OpRangeTypeSum: syntax.OpTypeSum,
// min & max require taking the min|max of the shards
syntax.OpRangeTypeMin: syntax.OpTypeMin,
syntax.OpRangeTypeMax: syntax.OpTypeMax,
}
func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, uint64, error) {
if !expr.Shardable() {
exprStats, err := m.shards.GetStats(expr)
@ -332,24 +350,6 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return m.mapSampleExpr(expr, r)
}
// These functions require a different merge strategy than the default
// concatenation.
// This is because the same label sets may exist on multiple shards when label-reducing parsing is applied or when
// grouping by some subset of the labels. In this case, the resulting vector may have multiple values for the same
// series and we need to combine them appropriately given a particular operation.
mergeMap := map[string]string{
// all these may be summed
syntax.OpRangeTypeCount: syntax.OpTypeSum,
syntax.OpRangeTypeRate: syntax.OpTypeSum,
syntax.OpRangeTypeBytes: syntax.OpTypeSum,
syntax.OpRangeTypeBytesRate: syntax.OpTypeSum,
syntax.OpRangeTypeSum: syntax.OpTypeSum,
// min & max require taking the min|max of the shards
syntax.OpRangeTypeMin: syntax.OpTypeMin,
syntax.OpRangeTypeMax: syntax.OpTypeMax,
}
// range aggregation groupings default to `without ()` behavior
// so we explicitly set the wrapping vector aggregation to this
// for parity when it's not explicitly set
@ -361,7 +361,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
mapped, bytes, err := m.mapSampleExpr(expr, r)
// max_over_time(_) -> max without() (max_over_time(_) ++ max_over_time(_)...)
// max_over_time(_) by (foo) -> max by (foo) (max_over_time(_) by (foo) ++ max_over_time(_) by (foo)...)
merger, ok := mergeMap[expr.Operation]
merger, ok := rangeMergeMap[expr.Operation]
if !ok {
return nil, 0, fmt.Errorf(
"error while finding merge operation for %s", expr.Operation,
@ -373,6 +373,52 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
Operation: merger,
}, bytes, err
case syntax.OpRangeTypeAvg:
potentialConflict := syntax.ReducesLabels(expr)
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
return m.mapSampleExpr(expr, r)
}
// avg_over_time() by (foo) -> sum by (foo) (sum_over_time()) / sum by (foo) (count_over_time())
lhs, lhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: expr.Left,
Operation: syntax.OpRangeTypeSum,
},
Grouping: expr.Grouping,
Operation: syntax.OpTypeSum,
}, r)
if err != nil {
return nil, 0, err
}
// Strip unwrap from log range
countOverTimeSelector, err := expr.Left.WithoutUnwrap()
if err != nil {
return nil, 0, err
}
rhs, rhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: countOverTimeSelector,
Operation: syntax.OpRangeTypeCount,
},
Grouping: expr.Grouping,
Operation: syntax.OpTypeSum,
}, r)
if err != nil {
return nil, 0, err
}
// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard)))
return &syntax.BinOpExpr{
SampleExpr: lhs,
RHS: rhs,
Op: syntax.OpTypeDiv,
}, bytesPerShard, nil
default:
// don't shard if there's not an appropriate optimization
exprStats, err := m.shards.GetStats(expr)

@ -328,12 +328,40 @@ func TestMappingStrings(t *testing.T) {
)`,
},
{
in: `avg(avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m]))`,
out: `avg(avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m]))`,
in: `avg(avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m]))`,
out: `(
sum(
downstream<sum(avg_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" | unwrap busy [5m])),shard=0_of_2>
++
downstream<sum(avg_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" | unwrap busy [5m])),shard=1_of_2>)
/
sum(
downstream<count(avg_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" | unwrap busy [5m])),shard=0_of_2>
++
downstream<count(avg_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" |unwrap busy [5m])),shard=1_of_2>
)
)`,
},
{
in: `avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m])`,
out: `avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m])`,
in: `avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m])`,
out: `downstream<avg_over_time({job=~"myapps.*"}|= "stats" | json busy="utilization" | unwrap busy [5m]),shard=0_of_2>
++ downstream<avg_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" | unwrap busy [5m]),shard=1_of_2>`,
},
{
in: `avg_over_time({job=~"myapps.*"} |= "stats" | json busy="utilization" | unwrap busy [5m]) by (cluster)`,
out: `(
sum by (cluster) (
downstream<sum by (cluster) (sum_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" | unwrap busy [5m])),shard=0_of_2>
++
downstream<sum by (cluster) (sum_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" | unwrap busy [5m])),shard=1_of_2>
)
/
sum by (cluster) (
downstream<sum by (cluster) (count_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" [5m])),shard=0_of_2>
++
downstream<sum by (cluster) (count_over_time({job=~"myapps.*"}|="stats" | json busy="utilization" [5m])),shard=1_of_2>
)
)`,
},
// should be noop if VectorExpr
{
@ -1180,6 +1208,124 @@ func TestMapping(t *testing.T) {
},
},
},
{
in: `avg_over_time({foo="bar"} | unwrap bytes [5m]) by (cluster)`,
expr: &syntax.BinOpExpr{
Op: syntax.OpTypeDiv,
SampleExpr: &syntax.VectorAggregationExpr{
Grouping: &syntax.Grouping{
Groups: []string{"cluster"},
},
Operation: syntax.OpTypeSum,
Left: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &syntax.VectorAggregationExpr{
Grouping: &syntax.Grouping{
Groups: []string{"cluster"},
},
Operation: syntax.OpTypeSum,
Left: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeSum,
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: 5 * time.Minute,
Unwrap: &syntax.UnwrapExpr{
Identifier: "bytes",
},
},
},
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &syntax.VectorAggregationExpr{
Grouping: &syntax.Grouping{
Groups: []string{"cluster"},
},
Operation: syntax.OpTypeSum,
Left: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeSum,
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: 5 * time.Minute,
Unwrap: &syntax.UnwrapExpr{
Identifier: "bytes",
},
},
},
},
},
next: nil,
},
},
},
RHS: &syntax.VectorAggregationExpr{
Operation: syntax.OpTypeSum,
Grouping: &syntax.Grouping{
Groups: []string{"cluster"},
},
Left: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &syntax.VectorAggregationExpr{
Grouping: &syntax.Grouping{
Groups: []string{"cluster"},
},
Operation: syntax.OpTypeSum,
Left: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeCount,
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &syntax.VectorAggregationExpr{
Grouping: &syntax.Grouping{
Groups: []string{"cluster"},
},
Operation: syntax.OpTypeSum,
Left: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeCount,
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
},
},
} {
t.Run(tc.in, func(t *testing.T) {
ast, err := syntax.ParseExpr(tc.in)

@ -32,8 +32,17 @@ type Expr interface {
Pretty(level int) string
}
func Clone(e Expr) (Expr, error) {
return ParseExpr(e.String())
// Clone returns a deep copy of the given expression by re-parsing its
// string representation. The generic type parameter preserves the
// caller's concrete expression type instead of returning the bare Expr
// interface.
func Clone[T Expr](e T) (T, error) {
	var empty T
	copied, err := ParseExpr(e.String())
	if err != nil {
		return empty, err
	}
	cast, ok := copied.(T)
	if !ok {
		// Re-parsing yielded a different concrete type than the input;
		// report both so the mismatch is diagnosable.
		return empty, fmt.Errorf("unexpected type of cloned expression: want %T, got %T", empty, copied)
	}
	return cast, nil
}
// implicit holds default implementations
@ -894,6 +903,19 @@ func (r *LogRange) Walk(f WalkFn) {
r.Left.Walk(f)
}
// WithoutUnwrap returns a copy of the log range without the unwrap statement.
// The log selector is deep-copied via Clone, so the returned range shares no
// mutable state with the receiver; Interval and Offset are carried over as-is.
func (r *LogRange) WithoutUnwrap() (*LogRange, error) {
	cloned, err := Clone(r.Left)
	if err != nil {
		return nil, err
	}
	stripped := &LogRange{
		Left:     cloned,
		Interval: r.Interval,
		Offset:   r.Offset,
	}
	return stripped, nil
}
func newLogRange(left LogSelectorExpr, interval time.Duration, u *UnwrapExpr, o *OffsetExpr) *LogRange {
var offset time.Duration
if o != nil {
@ -1950,6 +1972,7 @@ var shardableOps = map[string]bool{
OpTypeMin: true,
// range vector ops
OpRangeTypeAvg: true,
OpRangeTypeCount: true,
OpRangeTypeRate: true,
OpRangeTypeBytes: true,

@ -4,6 +4,7 @@ import (
"context"
"fmt"
logger "log"
"math/rand"
"sort"
"strings"
"time"
@ -239,7 +240,8 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu
// create nStreams of nEntries with labelNames each where each label value
// with the exception of the "index" label is modulo'd into a shard
func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (streams []logproto.Stream) {
func randomStreams(nStreams, nEntries, nShards int, labelNames []string, valueField bool) (streams []logproto.Stream) {
r := rand.New(rand.NewSource(42))
for i := 0; i < nStreams; i++ {
// labels
stream := logproto.Stream{}
@ -259,9 +261,13 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream
})
}
for j := 0; j <= nEntries; j++ {
line := fmt.Sprintf("stream=stderr level=debug line=%d", j)
if valueField {
line = fmt.Sprintf("%s value=%f", line, r.Float64()*100.0)
}
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Unix(0, int64(j*int(time.Second))),
Line: fmt.Sprintf("stream=stderr level=debug line=%d", j),
Line: line,
})
}

@ -87,10 +87,7 @@ limits_config:
reject_old_samples_max_age: 168h
split_queries_by_interval: 15m
querier:
engine:
timeout: 5m
query_ingesters_within: 2h
query_timeout: 5m
multi_tenant_queries_enabled: true
per_request_limits_enabled: true
query_range:

@ -7,7 +7,7 @@ services:
loki-url: "http://localhost:8001/loki/api/v1/push"
loki-retries: "1"
loki-tenant-id: "1"
image: consul
image: hashicorp/consul
command: [ "agent", "-dev" ,"-client=0.0.0.0", "-log-level=info" ]
ports:
- 8500:8500

Loading…
Cancel
Save