From 0bce8d95a2f42708d7ce8148e64cd0173067df05 Mon Sep 17 00:00:00 2001 From: Susana Ferreira Date: Fri, 8 Apr 2022 00:18:54 +0200 Subject: [PATCH] Split by range of instant queries (#5662) * Split by range on Instant queries POC v3 Co-authored-by: Christian Haudum * Handle uneven split by duration * Register SplitByRangeMiddleware in roundtripper Signed-off-by: Christian Haudum * fixup! Register SplitByRangeMiddleware in roundtripper Signed-off-by: Christian Haudum * fixup! fixup! Register SplitByRangeMiddleware in roundtripper Signed-off-by: Christian Haudum * Remove rewrite if range aggr has label extraction stage In case a range aggregation has a generic label extraction stage, such as `| json` or `| logfmt` and no group by, we cannot split it, because otherwise the downstream queries would result in too many series. Signed-off-by: Christian Haudum * Fix linting * Implement range splitting for rate() and bytes_rate() Signed-off-by: Christian Haudum * Fix linting * Calculate offset of downstream queries correctly if the outer query range contains an offset as well. Signed-off-by: Christian Haudum * Fix linting * Add optimization by moving the outer label grouping downstream * Add label grouping downstream optimization to rate and bytes_rate expressions * Add changelog entry Signed-off-by: Christian Haudum * Simplify types in rangemapper Signed-off-by: Christian Haudum * fixup! Simplify types in rangemapper Signed-off-by: Christian Haudum * Check in Map function if query is splittable by range Since this is the main function of the mapper, we can ensure here that only supported vector/range aggregations are handled. Signed-off-by: Christian Haudum * Some code cleanups and variable renaming Signed-off-by: Christian Haudum * Extract duplicate code in range aggr mapping into function Signed-off-by: Christian Haudum * Add topk to supported splittable vector aggregations Signed-off-by: Christian Haudum * Check if query is splittable by range before calling Map() Signed-off-by: Christian Haudum * Add more function comments Signed-off-by: Christian Haudum * Rename RangeVectorMapper to RangeMapper Signed-off-by: Christian Haudum * Fix incorrect import due to rebase Signed-off-by: Christian Haudum * Add equivalence test cases with `logfmt` pipeline stage Signed-off-by: Christian Haudum * Remove limitation of pushing down vector aggr only if grouping is present Signed-off-by: Christian Haudum * Remove TestRangeMappingEquivalenceMockMapper test This test is essentially the same as the test Test_SplitRangeVectorMapping, just using a different representation of the result. Signed-off-by: Christian Haudum * fixup! Remove limitation of pushing down vector aggr only if grouping is present Signed-off-by: Christian Haudum * fixup! fixup! 
Remove limitation of pushing down vector aggr only if grouping is present Signed-off-by: Christian Haudum * Fix linter errors Signed-off-by: Christian Haudum * Better naming of variable Signed-off-by: Christian Haudum * Split SplitRangeVectorMapping test into two to have the test for noop queries separated Signed-off-by: Christian Haudum Co-authored-by: Christian Haudum Co-authored-by: Owen Diehl --- pkg/logql/downstream_test.go | 202 +++- pkg/logql/rangemapper.go | 332 ++++++ pkg/logql/rangemapper_test.go | 1057 +++++++++++++++++ pkg/logql/test_utils.go | 2 +- pkg/querier/queryrange/roundtrip.go | 1 + pkg/querier/queryrange/split_by_range.go | 120 ++ pkg/querier/queryrange/split_by_range_test.go | 181 +++ 7 files changed, 1893 insertions(+), 2 deletions(-) create mode 100644 pkg/logql/rangemapper.go create mode 100644 pkg/logql/rangemapper_test.go create mode 100644 pkg/querier/queryrange/split_by_range.go create mode 100644 pkg/querier/queryrange/split_by_range_test.go diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 775393fd27..c8ffebfcf1 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -99,7 +99,207 @@ func TestMappingEquivalence(t *testing.T) { } } -// approximatelyEquals ensures two responses are approximately equal, up to 6 decimals precision per sample +func TestRangeMappingEquivalence(t *testing.T) { + var ( + shards = 3 + nStreams = 60 + rounds = 20 + streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}) + start = time.Unix(0, 0) + end = time.Unix(0, int64(time.Second*time.Duration(rounds))) + step = time.Second + interval = time.Duration(0) + limit = 100 + ) + + for _, tc := range []struct { + query string + splitByInterval time.Duration + }{ + // Range vector aggregators + {`bytes_over_time({a=~".+"}[2s])`, time.Second}, + {`count_over_time({a=~".+"}[2s])`, time.Second}, + {`sum_over_time({a=~".+"} | unwrap b [2s])`, time.Second}, + {`max_over_time({a=~".+"} | unwrap b [2s])`, time.Second}, + {`max_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second}, + {`max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second}, + {`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second}, + {`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second}, + {`min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second}, + {`rate({a=~".+"}[2s])`, time.Second}, + {`bytes_rate({a=~".+"}[2s])`, time.Second}, + + // sum + {`sum(bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`sum(count_over_time({a=~".+"}[2s]))`, time.Second}, + {`sum(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`sum(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum(rate({a=~".+"}[2s]))`, time.Second}, + {`sum(bytes_rate({a=~".+"}[2s]))`, time.Second}, + + // sum by + {`sum by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`sum by (a) (count_over_time({a=~".+"}[2s]))`, time.Second}, + {`sum by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by 
(a))`, time.Second}, + {`sum by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, + + // count + {`count(bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`count(count_over_time({a=~".+"}[2s]))`, time.Second}, + {`count(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`count(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`count(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`count(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`count(rate({a=~".+"}[2s]))`, time.Second}, + {`count(bytes_rate({a=~".+"}[2s]))`, time.Second}, + + // count by + {`count by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`count by (a) (count_over_time({a=~".+"}[2s]))`, time.Second}, + {`count by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`count by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`count by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`count by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, + + // max + {`max(bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`max(count_over_time({a=~".+"}[2s]))`, time.Second}, + {`max(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`max(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`max(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`max(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`max(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`max(rate({a=~".+"}[2s]))`, time.Second}, + {`max(bytes_rate({a=~".+"}[2s]))`, time.Second}, + + // max by + {`max by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`max by (a) (count_over_time({a=~".+"}[2s]))`, time.Second}, + {`max by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`max by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`max by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`max by (a) 
(rate({a=~".+"}[2s]))`, time.Second}, + {`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, + + // min + {`min(bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`min(count_over_time({a=~".+"}[2s]))`, time.Second}, + {`min(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`min(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min(rate({a=~".+"}[2s]))`, time.Second}, + {`min(bytes_rate({a=~".+"}[2s]))`, time.Second}, + + // min by + {`min by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second}, + {`min by (a) (count_over_time({a=~".+"}[2s]))`, time.Second}, + {`min by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`min by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, + {`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, + {`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min by (a) (rate({a=~".+"}[2s]))`, time.Second}, + {`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, + + // Binary operations + {`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second}, + {`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second}, + + // Multi vector aggregator layer queries + {`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second}, + {`sum(min by (a)(max(sum by (b) (count_over_time({a=~".+"} [2s])))))`, time.Second}, + + // Non-splittable vector aggregators + // TODO: Fix topk + //{`topk(2, count_over_time({a=~".+"}[2s]))`, time.Second}, + {`avg(count_over_time({a=~".+"}[2s]))`, time.Second}, + + // Uneven split times + {`bytes_over_time({a=~".+"}[3s])`, 2 * time.Second}, + {`count_over_time({a=~".+"}[5s])`, 2 * time.Second}, + + // range with offset + {`rate({a=~".+"}[2s] offset 2s)`, time.Second}, + } { + q := NewMockQuerier( + shards, + streams, + ) + + opts := EngineOpts{} + regularEngine := NewEngine(opts, q, NoLimits, log.NewNopLogger()) + downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, nilMetrics, NoLimits, log.NewNopLogger()) + + t.Run(tc.query, func(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "fake") + + params := NewLiteralParams( + tc.query, + start, + end, + step, + interval, + logproto.FORWARD, + uint32(limit), + nil, + ) + + // Regular engine + qry := regularEngine.Query(params) + res, err := qry.Exec(ctx) + require.Nil(t, err) + + // Downstream engine - split by range + rangeMapper, err := NewRangeMapper(tc.splitByInterval) + require.Nil(t, err) + noop, rangeExpr, err := rangeMapper.Parse(tc.query) + require.Nil(t, err) + + require.False(t, noop, "downstream engine cannot execute noop") + + rangeQry := downstreamEngine.Query(params, rangeExpr) + rangeRes, err := rangeQry.Exec(ctx) + require.Nil(t, err) + + require.Equal(t, res.Data, rangeRes.Data) + }) + } +} + +// approximatelyEquals ensures two responses are 
approximately equal, +// up to 6 decimals precision per sample func approximatelyEquals(t *testing.T, as, bs promql.Matrix) { require.Equal(t, len(as), len(bs)) diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go new file mode 100644 index 0000000000..0bfca08a50 --- /dev/null +++ b/pkg/logql/rangemapper.go @@ -0,0 +1,332 @@ +package logql + +import ( + "fmt" + "time" + + "github.com/go-kit/log/level" + "github.com/pkg/errors" + + "github.com/grafana/loki/pkg/logql/syntax" + util_log "github.com/grafana/loki/pkg/util/log" +) + +var splittableVectorOp = map[string]struct{}{ + syntax.OpTypeSum: {}, + syntax.OpTypeCount: {}, + syntax.OpTypeMax: {}, + syntax.OpTypeMin: {}, + syntax.OpTypeAvg: {}, + syntax.OpTypeTopK: {}, +} + +var splittableRangeVectorOp = map[string]struct{}{ + syntax.OpRangeTypeRate: {}, + syntax.OpRangeTypeBytesRate: {}, + syntax.OpRangeTypeBytes: {}, + syntax.OpRangeTypeCount: {}, + syntax.OpRangeTypeSum: {}, + syntax.OpRangeTypeMax: {}, + syntax.OpRangeTypeMin: {}, +} + +type RangeMapper struct { + splitByInterval time.Duration +} + +func NewRangeMapper(interval time.Duration) (RangeMapper, error) { + if interval <= 0 { + return RangeMapper{}, fmt.Errorf("cannot create RangeMapper with splitByInterval <= 0; got %s", interval) + } + return RangeMapper{ + splitByInterval: interval, + }, nil +} + +// Parse returns (noop, parsed expression, error) +func (m RangeMapper) Parse(query string) (bool, syntax.Expr, error) { + origExpr, err := syntax.ParseSampleExpr(query) + if err != nil { + return true, nil, err + } + + if !isSplittableByRange(origExpr) { + return true, origExpr, nil + } + + modExpr, err := m.Map(origExpr, nil) + if err != nil { + return true, nil, err + } + + return origExpr.String() == modExpr.String(), modExpr, err +} + +func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.VectorAggregationExpr) (syntax.SampleExpr, error) { + // immediately clone the passed expr to avoid mutating the original + expr, err := clone(expr) + if err != nil { + return nil, err + } + + switch e := expr.(type) { + case *syntax.VectorAggregationExpr: + return m.mapVectorAggregationExpr(e) + case *syntax.RangeAggregationExpr: + return m.mapRangeAggregationExpr(e, vectorAggrPushdown), nil + case *syntax.BinOpExpr: + lhsMapped, err := m.Map(e.SampleExpr, vectorAggrPushdown) + if err != nil { + return nil, err + } + rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown) + if err != nil { + return nil, err + } + e.SampleExpr = lhsMapped + e.RHS = rhsMapped + return e, nil + default: + return nil, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m) + } +} + +// getRangeInterval returns the interval in the range vector +// Note that this function must not be called with a BinOpExpr as argument +// as it returns only the range of the RHS. +// Example: expression `count_over_time({app="foo"}[10m])` returns 10m +func getRangeInterval(expr syntax.SampleExpr) time.Duration { + var rangeInterval time.Duration + expr.Walk(func(e interface{}) { + switch concrete := e.(type) { + case *syntax.RangeAggregationExpr: + rangeInterval = concrete.Left.Interval + } + }) + return rangeInterval +} + +// hasLabelExtractionStage returns true if an expression contains a stage for label extraction, +// such as `| json` or `| logfmt`, that would result in an exploding amount of series in downstream queries. 
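The correctness of these rewrites rests on the fact that the supported range aggregations distribute over adjacent sub-windows: counting over a 3m window is the same as summing the counts of three 1m windows at offsets 0, 1m, and 2m. A minimal, self-contained Go sketch of that identity, using hypothetical event timestamps:

```go
package main

import "fmt"

// countIn counts events whose timestamp falls in the half-open window (from, to].
func countIn(events []int, from, to int) int {
	n := 0
	for _, ts := range events {
		if ts > from && ts <= to {
			n++
		}
	}
	return n
}

func main() {
	// Hypothetical event timestamps, in seconds.
	events := []int{5, 30, 70, 95, 110, 150, 175}

	// count_over_time over [3m] at T=180s, versus the three downstream
	// windows [1m], [1m] offset 1m, and [1m] offset 2m.
	whole := countIn(events, 0, 180)
	split := countIn(events, 120, 180) + countIn(events, 60, 120) + countIn(events, 0, 60)
	fmt.Println(whole, split, whole == split) // 7 7 true
}
```

The same identity does not hold for `avg_over_time` or `quantile_over_time`, which is why they are absent from splittableRangeVectorOp and appear only in the noop test cases further down.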
+func hasLabelExtractionStage(expr syntax.SampleExpr) bool { + found := false + expr.Walk(func(e interface{}) { + switch concrete := e.(type) { + case *syntax.LabelParserExpr: + // It will **not** return true for `regexp`, `unpack` and `pattern`, since these label extraction + // stages can control how many labels, and therefore the resulting number of series, are extracted. + if concrete.Op == syntax.OpParserTypeJSON || concrete.Op == syntax.OpParserTypeLogfmt { + found = true + } + } + }) + return found +} + +// sumOverFullRange returns an expression that sums up individual downstream queries (preserving labels) +// and divides the result by the full range in seconds to calculate a rate value. +// The operation defines the range aggregation operation of the downstream queries. +// Example: +// rate({app="foo"}[2m]) +// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m] offset 1m)) / 120) +func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration) syntax.SampleExpr { + var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{ + Left: expr.Left, + Operation: operation, + } + // Optimization: in case overrideDownstream exists, the downstream expression can be optimized with the grouping + // and operation of the overrideDownstream expression in order to reduce the returned streams' label set. + if overrideDownstream != nil { + downstreamExpr = &syntax.VectorAggregationExpr{ + Left: downstreamExpr, + Grouping: overrideDownstream.Grouping, + Operation: overrideDownstream.Operation, + } + } + return &syntax.BinOpExpr{ + SampleExpr: &syntax.VectorAggregationExpr{ + Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval), + Grouping: &syntax.Grouping{ + Without: true, + }, + Operation: syntax.OpTypeSum, + }, + RHS: &syntax.LiteralExpr{Val: rangeInterval.Seconds()}, + Op: syntax.OpTypeDiv, + Opts: &syntax.BinOpOptions{}, + } +} + +// vectorAggrWithRangeDownstreams returns an expression that aggregates a concat sample expression of multiple range +// aggregations. If a vector aggregation is pushed down, the downstream queries of the concat sample expression are +// wrapped in the vector aggregation of the parent node. +// Example: +// min(bytes_over_time({job="bar"} [2m])) +// => min without (bytes_over_time({job="bar"} [1m]) ++ bytes_over_time({job="bar"} [1m] offset 1m)) +// min by (app) (bytes_over_time({job="bar"} [2m])) +// => min without (min by (app) (bytes_over_time({job="bar"} [1m])) ++ min by (app) (bytes_over_time({job="bar"} [1m] offset 1m))) +func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, op string, rangeInterval time.Duration) syntax.SampleExpr { + grouping := expr.Grouping + if expr.Grouping == nil { + grouping = &syntax.Grouping{ + Without: true, + } + } + var downstream syntax.SampleExpr = expr + if vectorAggrPushdown != nil { + downstream = vectorAggrPushdown + } + return &syntax.VectorAggregationExpr{ + Left: m.mapConcatSampleExpr(downstream, rangeInterval), + Grouping: grouping, + Operation: op, + } +} + +// appendDownstream adds expression expr with a range interval 'interval' and offset 'offset' to the downstreams list. +// Returns the updated downstream ConcatSampleExpr.
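To make the rate rewrite in sumOverFullRange concrete: `rate({app="foo"}[2m])` is the event count over the last two minutes divided by 120 seconds, so after a 1m split it becomes the sum of the two downstream counts divided by the full range. A small arithmetic sketch with hypothetical downstream results:

```go
package main

import "fmt"

func main() {
	// Hypothetical downstream results:
	//   count_over_time({app="foo"}[1m])           -> 40
	//   count_over_time({app="foo"}[1m] offset 1m) -> 20
	subCounts := []float64{40, 20}

	total := 0.0
	for _, c := range subCounts {
		total += c
	}

	// The rate is the summed count divided by the FULL range in seconds,
	// mirroring the BinOpExpr with OpTypeDiv that sumOverFullRange builds.
	fmt.Println(total / 120.0) // 0.5 events per second
}
```

Summing counts and dividing once by the full range also stays correct for uneven splits, where the remainder window is shorter than the others; averaging per-window rates would not. bytes_rate follows the same shape, with bytes_over_time as the downstream operation.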
+func appendDownstream(downstreams *ConcatSampleExpr, expr syntax.SampleExpr, interval time.Duration, offset time.Duration) *ConcatSampleExpr { + sampleExpr, _ := clone(expr) + sampleExpr.Walk(func(e interface{}) { + switch concrete := e.(type) { + case *syntax.RangeAggregationExpr: + concrete.Left.Interval = interval + if offset != 0 { + concrete.Left.Offset += offset + } + } + }) + downstreams = &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + SampleExpr: sampleExpr, + }, + next: downstreams, + } + return downstreams +} + +// mapConcatSampleExpr transforms expr into multiple downstream subexpressions split by the range interval, each shifted by an increasing offset. +// rangeInterval should be greater than m.splitByInterval, otherwise the resultant expression +// will have an unnecessary aggregation operation. +func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration) syntax.SampleExpr { + splitCount := int(rangeInterval / m.splitByInterval) + + if splitCount == 0 { + return expr + } + + var split int + var downstreams *ConcatSampleExpr + for split = 0; split < splitCount; split++ { + downstreams = appendDownstream(downstreams, expr, m.splitByInterval, time.Duration(split)*m.splitByInterval) + } + // Add the remainder offset interval + if rangeInterval%m.splitByInterval != 0 { + offset := time.Duration(split) * m.splitByInterval + downstreams = appendDownstream(downstreams, expr, rangeInterval-offset, offset) + } + + return downstreams +} + +func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr) (syntax.SampleExpr, error) { + rangeInterval := getRangeInterval(expr) + + // in case the interval is smaller than the configured split interval, + // don't split it. + // TODO: what if there is another inner expr with an interval that can be split? + if rangeInterval <= m.splitByInterval { + return expr, nil + } + + // In order to minimize the number of streams on the downstream query, + // we can push down the outer vector aggregation to the downstream query. + // This does not work for `count()` and `topk()`, though. + // We also do not want to push down if the inner expression is a binary operation. + // TODO: Currently, it is sending the last inner expression grouping downstream. + // Which grouping should be sent downstream? + var vectorAggrPushdown *syntax.VectorAggregationExpr + if _, ok := expr.Left.(*syntax.BinOpExpr); !ok && expr.Operation != syntax.OpTypeCount && expr.Operation != syntax.OpTypeTopK { + vectorAggrPushdown = expr + } + + // Split the vector aggregation's inner expression + lhsMapped, err := m.Map(expr.Left, vectorAggrPushdown) + if err != nil { + return nil, err + } + + return &syntax.VectorAggregationExpr{ + Left: lhsMapped, + Grouping: expr.Grouping, + Params: expr.Params, + Operation: expr.Operation, + }, nil +} + +// mapRangeAggregationExpr maps expr into a new SampleExpr with multiple downstream subqueries split by range interval. +// Optimization: in order to reduce the streams returned from the inner downstream functions, in case a range aggregation +// expression is aggregated by a vector aggregation expression with a label grouping, the downstream expression can be +// exactly the same as the initial query concatenated by a `sum` operation. If this is the case, overrideDownstream +// contains the initial query which will be the downstream expression with a split range interval.
+// Example: `sum by (a) (bytes_over_time)` +// Is mapped to `sum by (a) (sum without (downstream<...> ++ downstream<...> ++ ...))` +func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr) syntax.SampleExpr { + rangeInterval := getRangeInterval(expr) + + // in case the interval is smaller than the configured split interval, + // don't split it. + if rangeInterval <= m.splitByInterval { + return expr + } + + // We cannot execute downstream queries that would potentially produce a huge number of series + // and therefore would very likely fail. + if expr.Grouping == nil && hasLabelExtractionStage(expr) { + return expr + } + switch expr.Operation { + case syntax.OpRangeTypeBytes, syntax.OpRangeTypeCount, syntax.OpRangeTypeSum: + return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeSum, rangeInterval) + case syntax.OpRangeTypeMax: + return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMax, rangeInterval) + case syntax.OpRangeTypeMin: + return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMin, rangeInterval) + case syntax.OpRangeTypeRate: + return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval) + case syntax.OpRangeTypeBytesRate: + return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeBytes, rangeInterval) + default: + // This should not be reachable. + // If an operation is splittable it should have an optimization listed. + level.Warn(util_log.Logger).Log( + "msg", "unexpected range aggregation expression", + "operation", expr.Operation, + ) + return expr + } +} + +// isSplittableByRange returns whether it is possible to optimize the given sample expression +func isSplittableByRange(expr syntax.SampleExpr) bool { + switch e := expr.(type) { + case *syntax.VectorAggregationExpr: + _, ok := splittableVectorOp[e.Operation] + return ok && isSplittableByRange(e.Left) + case *syntax.BinOpExpr: + return isSplittableByRange(e.SampleExpr) && isSplittableByRange(e.RHS) + case *syntax.LabelReplaceExpr: + return isSplittableByRange(e.Left) + case *syntax.RangeAggregationExpr: + _, ok := splittableRangeVectorOp[e.Operation] + return ok + default: + return false + } +} + +// clone returns a copy of the given sample expression. +// This is needed whenever we want to modify the existing query tree. 
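Putting the mapper together, here is a usage sketch of the public surface defined above (NewRangeMapper and Parse); the query string is illustrative, and the snippet assumes the Loki module is importable:

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/logql"
)

func main() {
	mapper, err := logql.NewRangeMapper(time.Minute)
	if err != nil {
		panic(err)
	}

	// Parse reports noop=true when the query cannot, or need not, be split,
	// e.g. for range intervals <= 1m or unsupported aggregations.
	noop, expr, err := mapper.Parse(`sum(count_over_time({app="foo"}[3m]))`)
	if err != nil {
		panic(err)
	}
	fmt.Println(noop)          // false: the 3m range is split into three 1m downstream queries
	fmt.Println(expr.String()) // the rewritten expression handed to the downstream engine
}
```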
+func clone(expr syntax.SampleExpr) (syntax.SampleExpr, error) { + return syntax.ParseSampleExpr(expr.String()) +} diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go new file mode 100644 index 0000000000..9c77a03f22 --- /dev/null +++ b/pkg/logql/rangemapper_test.go @@ -0,0 +1,1057 @@ +package logql + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_SplitRangeInterval(t *testing.T) { + rvm, err := NewRangeMapper(2 * time.Second) + require.NoError(t, err) + + for _, tc := range []struct { + expr string + expected string + }{ + { + `bytes_over_time({app="foo"}[3s])`, + `sum without( + downstream> + ++ downstream> + )`, + }, + { + `count_over_time({app="foo"}[5s])`, + `sum without( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `rate({app="foo"}[4s] offset 1m)`, + `(sum without( + downstream> + ++ downstream> + ) / 4)`, + }, + } { + tc := tc + t.Run(tc.expr, func(t *testing.T) { + t.Parallel() + noop, mappedExpr, err := rvm.Parse(tc.expr) + require.NoError(t, err) + require.Equal(t, removeWhiteSpace(tc.expected), removeWhiteSpace(mappedExpr.String())) + require.Equal(t, false, noop) + }) + } +} + +func Test_SplitRangeVectorMapping(t *testing.T) { + rvm, err := NewRangeMapper(time.Minute) + require.NoError(t, err) + + for _, tc := range []struct { + expr string + expected string + }{ + // Range vector aggregators + { + `bytes_over_time({app="foo"}[3m])`, + `sum without( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `count_over_time({app="foo"}[3m])`, + `sum without( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `sum_over_time({app="foo"} | unwrap bar [3m])`, + `sum without( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `max_over_time({app="foo"} | unwrap bar [3m])`, + `max without( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `max_over_time({app="foo"} | json | unwrap bar [3m]) by (bar)`, + `max by (bar) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `max_over_time({app="foo"} | unwrap bar [3m]) by (baz)`, + `max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `min_over_time({app="foo"} | unwrap bar [3m])`, + `min without( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `min_over_time({app="foo"} | unwrap bar [3m]) by (baz)`, + `min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `rate({app="foo"}[3m])`, + `(sum without( + downstream> + ++ downstream> + ++ downstream> + ) / 180)`, + }, + { + `bytes_rate({app="foo"}[3m])`, + `(sum without( + downstream> + ++ downstream> + ++ downstream> + ) / 180)`, + }, + + // Vector aggregator - sum + { + `sum(bytes_over_time({app="foo"}[3m]))`, + `sum( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(count_over_time({app="foo"}[3m]))`, + `sum( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(sum_over_time({app="foo"} | unwrap bar [3m]))`, + `sum( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(max_over_time({app="foo"} | unwrap bar [3m]))`, + `sum( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(max_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `sum( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(min_over_time({app="foo"} | unwrap bar [3m]))`, + `sum( + min without ( + downstream> + ++ downstream> 
+ ++ downstream> + ) + )`, + }, + { + `sum(min_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `sum( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(rate({app="foo"}[3m]))`, + `sum( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `sum(bytes_rate({app="foo"}[3m]))`, + `sum( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + + // Vector aggregator - sum by + { + `sum by (baz) (bytes_over_time({app="foo"}[3m]))`, + `sum by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (count_over_time({app="foo"}[3m]))`, + `sum by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (sum_over_time({app="foo"} | unwrap bar [3m]))`, + `sum by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (max_over_time({app="foo"} | unwrap bar [3m]))`, + `sum by (baz) ( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (max_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `sum by (baz) ( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (min_over_time({app="foo"} | unwrap bar [3m]))`, + `sum by (baz) ( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (min_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `sum by (baz) ( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (rate({app="foo"}[3m]))`, + `sum by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `sum by (baz) (bytes_rate({app="foo"}[3m]))`, + `sum by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + + // Vector aggregator - count + { + `count(bytes_over_time({app="foo"}[3m]))`, + `count( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count(count_over_time({app="foo"}[3m]))`, + `count( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count(sum_over_time({app="foo"} | unwrap bar [3m]))`, + `count( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count(max_over_time({app="foo"} | unwrap bar [3m]))`, + `count( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count(max_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `count( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count(min_over_time({app="foo"} | unwrap bar [3m]))`, + `count( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count(min_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `count( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count(rate({app="foo"}[3m]))`, + `count( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `count(bytes_rate({app="foo"}[3m]))`, + `count( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + + // Vector aggregator - count by + { + `count by (baz) (bytes_over_time({app="foo"}[3m]))`, + `count by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) 
(count_over_time({app="foo"}[3m]))`, + `count by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) (sum_over_time({app="foo"} | unwrap bar [3m]))`, + `count by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) (max_over_time({app="foo"} | unwrap bar [3m]))`, + `count by (baz) ( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) (max_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `count by (baz) ( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) (min_over_time({app="foo"} | unwrap bar [3m]))`, + `count by (baz) ( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) (min_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `count by (baz) ( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) (rate({app="foo"}[3m]))`, + `count by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `count by (baz) (bytes_rate({app="foo"}[3m]))`, + `count by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + + // Vector aggregator - max + { + `max(bytes_over_time({app="foo"}[3m]))`, + `max( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(count_over_time({app="foo"}[3m]))`, + `max( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(sum_over_time({app="foo"} | unwrap bar [3m]))`, + `max( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(max_over_time({app="foo"} | unwrap bar [3m]))`, + `max( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(max_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `max( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(min_over_time({app="foo"} | unwrap bar [3m]))`, + `max( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(min_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `max( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(rate({app="foo"}[3m]))`, + `max( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `max(bytes_rate({app="foo"}[3m]))`, + `max( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + + // Vector aggregator - max by + { + `max by (baz) (bytes_over_time({app="foo"}[3m]))`, + `max by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (count_over_time({app="foo"}[3m]))`, + `max by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (sum_over_time({app="foo"} | unwrap bar [3m]))`, + `max by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (max_over_time({app="foo"} | unwrap bar [3m]))`, + `max by (baz) ( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (max_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `max by (baz) ( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (min_over_time({app="foo"} | unwrap bar 
[3m]))`, + `max by (baz) ( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (min_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `max by (baz) ( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (rate({app="foo"}[3m]))`, + `max by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `max by (baz) (bytes_rate({app="foo"}[3m]))`, + `max by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + + // Vector aggregator - min + { + `min(bytes_over_time({app="foo"}[3m]))`, + `min( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(count_over_time({app="foo"}[3m]))`, + `min( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(sum_over_time({app="foo"} | unwrap bar [3m]))`, + `min( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(max_over_time({app="foo"} | unwrap bar [3m]))`, + `min( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(max_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `min( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(min_over_time({app="foo"} | unwrap bar [3m]))`, + `min( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(min_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `min( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(rate({app="foo"}[3m]))`, + `min( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `min(bytes_rate({app="foo"}[3m]))`, + `min( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + + // Vector aggregator - min by + { + `min by (baz) (bytes_over_time({app="foo"}[3m]))`, + `min by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (count_over_time({app="foo"}[3m]))`, + `min by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (sum_over_time({app="foo"} | unwrap bar [3m]))`, + `min by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (max_over_time({app="foo"} | unwrap bar [3m]))`, + `min by (baz) ( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (max_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `min by (baz) ( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (min_over_time({app="foo"} | unwrap bar [3m]))`, + `min by (baz) ( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (min_over_time({app="foo"} | unwrap bar [3m]) by (baz))`, + `min by (baz) ( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (rate({app="foo"}[3m]))`, + `min by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `min by (baz) (bytes_rate({app="foo"}[3m]))`, + `min by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + + // Binary operations + { + `bytes_over_time({app="foo"}[3m]) + count_over_time({app="foo"}[5m])`, + `(sum without ( + downstream> + ++ 
downstream> + ++ downstream> + ) + + sum without ( + downstream> + ++ downstream> + ++ downstream> + ++ downstream> + ++ downstream> + )) + `, + }, + { + `sum(count_over_time({app="foo"}[3m]) * count(sum_over_time({app="foo"} | unwrap bar [5m])))`, + `sum( + (sum without( + downstream> + ++ downstream> + ++ downstream> + ) * + count ( + sum without( + downstream> + ++ downstream> + ++ downstream> + ++ downstream> + ++ downstream> + ) + )) + ) + `, + }, + { + `sum by (app) (bytes_rate({app="foo"}[3m])) / sum by (app) (rate({app="foo"}[3m]))`, + `( + sum by (app) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + ) + / + sum by (app) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + ) + )`, + }, + + // Multi vector aggregator layer queries + { + `sum(max(bytes_over_time({app="foo"}[3m])))`, + `sum( + max( + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + ) + ) + `, + }, + + // Non-splittable vector aggregators - should go deeper in the AST + { + `topk(2, count_over_time({app="foo"}[3m]))`, + `topk(2, + sum without( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + } { + tc := tc + t.Run(tc.expr, func(t *testing.T) { + t.Parallel() + noop, mappedExpr, err := rvm.Parse(tc.expr) + require.NoError(t, err) + require.False(t, noop) + require.Equal(t, removeWhiteSpace(tc.expected), removeWhiteSpace(mappedExpr.String())) + }) + } +} + +func Test_SplitRangeVectorMapping_Noop(t *testing.T) { + rvm, err := NewRangeMapper(time.Minute) + require.NoError(t, err) + + for _, tc := range []struct { + expr string + expected string + }{ + // Non-splittable range vector aggregators + { + `quantile_over_time(0.95, {app="foo"} | unwrap bar[3m])`, + `quantile_over_time(0.95, {app="foo"} | unwrap bar[3m])`, + }, + { + `sum(avg_over_time({app="foo"} | unwrap bar[3m]))`, + `sum(avg_over_time({app="foo"} | unwrap bar[3m]))`, + }, + + // should be noop if range interval is shorter than or equal to the split interval (1m) + { + `bytes_over_time({app="foo"}[1m])`, + `bytes_over_time({app="foo"}[1m])`, + }, + + // should be noop if inner range aggregation includes a stage for label extraction such as `| json` or `| logfmt` + // because otherwise the downstream queries would result in too many series + { + `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `max_over_time({app="foo"}
| json | unwrap bar [3m])`, + `max_over_time({app="foo"} | json | unwrap bar [3m])`, + }, + } { + tc := tc + t.Run(tc.expr, func(t *testing.T) { + t.Parallel() + noop, mappedExpr, err := rvm.Parse(tc.expr) + require.NoError(t, err) + require.True(t, noop) + require.Equal(t, removeWhiteSpace(tc.expected), removeWhiteSpace(mappedExpr.String())) + }) + } +} + +func Test_FailQuery(t *testing.T) { + rvm, err := NewRangeMapper(2 * time.Minute) + require.NoError(t, err) + _, _, err = rvm.Parse(`{app="foo"} |= "err"`) + require.Error(t, err) +} diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 55cef0e55b..983e989acd 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -261,7 +261,7 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream for j := 0; j <= nEntries; j++ { stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: time.Unix(0, int64(j*int(time.Second))), - Line: fmt.Sprintf("line number: %d", j), + Line: fmt.Sprintf("stream=stderr level=debug line=%d", j), }) } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 5f9fdbfcdb..b50c618887 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -478,6 +478,7 @@ func NewInstantMetricTripperware( if cfg.ShardedQueries { queryRangeMiddleware = append(queryRangeMiddleware, + NewSplitByRangeMiddleware(log, limits, nil), NewQueryShardMiddleware( log, schema.Configs, diff --git a/pkg/querier/queryrange/split_by_range.go b/pkg/querier/queryrange/split_by_range.go new file mode 100644 index 0000000000..43890cf1e9 --- /dev/null +++ b/pkg/querier/queryrange/split_by_range.go @@ -0,0 +1,120 @@ +package queryrange + +import ( + "context" + "fmt" + "net/http" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/tenant" + "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/httpgrpc" + + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" + util_log "github.com/grafana/loki/pkg/util/log" + "github.com/grafana/loki/pkg/util/marshal" + "github.com/grafana/loki/pkg/util/validation" +) + +type splitByRange struct { + logger log.Logger + next queryrangebase.Handler + limits Limits + + ng *logql.DownstreamEngine +} + +// NewSplitByRangeMiddleware creates a new Middleware that splits log requests by the range interval. 
+func NewSplitByRangeMiddleware(logger log.Logger, limits Limits, metrics *logql.ShardingMetrics) queryrangebase.Middleware { + return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler { + return &splitByRange{ + logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"), + next: next, + limits: limits, + ng: logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics, limits, logger), + } + }) +} + +func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (queryrangebase.Response, error) { + logger := util_log.WithContext(ctx, s.logger) + + tenants, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + interval := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, s.limits.QuerySplitDuration) + // if no interval configured, continue to the next middleware + if interval == 0 { + return s.next.Do(ctx, request) + } + + mapper, err := logql.NewRangeMapper(interval) + if err != nil { + return nil, err + } + + noop, parsed, err := mapper.Parse(request.GetQuery()) + if err != nil { + level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", request.GetQuery()) + return nil, err + } + level.Debug(logger).Log("msg", "mapped instant query", "interval", interval.String(), "noop", noop, "original", request.GetQuery(), "mapped", parsed.String()) + + if noop { + // the query cannot be split, so continue + return s.next.Do(ctx, request) + } + + params, err := paramsFromRequest(request) + if err != nil { + return nil, err + } + + if _, ok := request.(*LokiInstantRequest); !ok { + return nil, fmt.Errorf("expected *LokiInstantRequest") + } + + query := s.ng.Query(params, parsed) + + res, err := query.Exec(ctx) + if err != nil { + return nil, err + } + + value, err := marshal.NewResultValue(res.Data) + if err != nil { + return nil, err + } + + switch res.Data.Type() { + case parser.ValueTypeMatrix: + return &LokiPromResponse{ + Response: &queryrangebase.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: queryrangebase.PrometheusData{ + ResultType: loghttp.ResultTypeMatrix, + Result: toProtoMatrix(value.(loghttp.Matrix)), + }, + }, + Statistics: res.Statistics, + }, nil + case parser.ValueTypeVector: + return &LokiPromResponse{ + Statistics: res.Statistics, + Response: &queryrangebase.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: queryrangebase.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: toProtoVector(value.(loghttp.Vector)), + }, + }, + }, nil + default: + return nil, fmt.Errorf("unexpected downstream response type (%T)", res.Data.Type()) + } +} diff --git a/pkg/querier/queryrange/split_by_range_test.go b/pkg/querier/queryrange/split_by_range_test.go new file mode 100644 index 0000000000..9e261e8a57 --- /dev/null +++ b/pkg/querier/queryrange/split_by_range_test.go @@ -0,0 +1,181 @@ +package queryrange + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/grafana/loki/pkg/loghttp" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" +) + +func Test_RangeVectorSplit(t *testing.T) { + srm := NewSplitByRangeMiddleware(log.NewNopLogger(), fakeLimits{ + maxSeries: 10000, + splits: map[string]time.Duration{ + "tenant": time.Minute, + }, + }, nilShardingMetrics) + + ctx := 
user.InjectOrgID(context.TODO(), "tenant") + + for _, tc := range []struct { + in queryrangebase.Request + subQueries []queryrangebase.RequestResponse + expected queryrangebase.Response + }{ + { + in: &LokiInstantRequest{ + Query: `sum(bytes_over_time({app="foo"}[3m]))`, + TimeTs: time.Unix(1, 0), + Path: "/loki/api/v1/query", + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponse(`sum(bytes_over_time({app="foo"}[1m]))`, 1), + subQueryRequestResponse(`sum(bytes_over_time({app="foo"}[1m] offset 1m0s))`, 2), + subQueryRequestResponse(`sum(bytes_over_time({app="foo"}[1m] offset 2m0s))`, 3), + }, + expected: expectedMergedResponse(1 + 2 + 3), + }, + { + in: &LokiInstantRequest{ + Query: `sum by (bar) (bytes_over_time({app="foo"}[3m]))`, + TimeTs: time.Unix(1, 0), + Path: "/loki/api/v1/query", + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponse(`sum by(bar)(bytes_over_time({app="foo"}[1m]))`, 10), + subQueryRequestResponse(`sum by(bar)(bytes_over_time({app="foo"}[1m] offset 1m0s))`, 20), + subQueryRequestResponse(`sum by(bar)(bytes_over_time({app="foo"}[1m] offset 2m0s))`, 30), + }, + expected: expectedMergedResponse(10 + 20 + 30), + }, + { + in: &LokiInstantRequest{ + Query: `sum(count_over_time({app="foo"}[3m]))`, + TimeTs: time.Unix(1, 0), + Path: "/loki/api/v1/query", + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponse(`sum(count_over_time({app="foo"}[1m]))`, 1), + subQueryRequestResponse(`sum(count_over_time({app="foo"}[1m] offset 1m0s))`, 1), + subQueryRequestResponse(`sum(count_over_time({app="foo"}[1m] offset 2m0s))`, 1), + }, + expected: expectedMergedResponse(1 + 1 + 1), + }, + { + in: &LokiInstantRequest{ + Query: `sum by (bar) (count_over_time({app="foo"}[3m]))`, + TimeTs: time.Unix(1, 0), + Path: "/loki/api/v1/query", + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponse(`sum by(bar)(count_over_time({app="foo"}[1m]))`, 0), + subQueryRequestResponse(`sum by(bar)(count_over_time({app="foo"}[1m] offset 1m0s))`, 0), + subQueryRequestResponse(`sum by(bar)(count_over_time({app="foo"}[1m] offset 2m0s))`, 0), + }, + expected: expectedMergedResponse(0 + 0 + 0), + }, + { + in: &LokiInstantRequest{ + Query: `sum(sum_over_time({app="foo"} | unwrap bar [3m]))`, + TimeTs: time.Unix(1, 0), + Path: "/loki/api/v1/query", + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponse(`sum(sum_over_time({app="foo"} | unwrap bar[1m]))`, 1), + subQueryRequestResponse(`sum(sum_over_time({app="foo"} | unwrap bar[1m] offset 1m0s))`, 2), + subQueryRequestResponse(`sum(sum_over_time({app="foo"} | unwrap bar[1m] offset 2m0s))`, 3), + }, + expected: expectedMergedResponse(1 + 2 + 3), + }, + { + in: &LokiInstantRequest{ + Query: `sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3m]))`, + TimeTs: time.Unix(1, 0), + Path: "/loki/api/v1/query", + }, + subQueries: []queryrangebase.RequestResponse{ + subQueryRequestResponse(`sum by(bar)(sum_over_time({app="foo"} | unwrap bar[1m]))`, 1), + subQueryRequestResponse(`sum by(bar)(sum_over_time({app="foo"} | unwrap bar[1m] offset 1m0s))`, 2), + subQueryRequestResponse(`sum by(bar)(sum_over_time({app="foo"} | unwrap bar[1m] offset 2m0s))`, 3), + }, + expected: expectedMergedResponse(1 + 2 + 3), + }, + } { + tc := tc + t.Run(tc.in.GetQuery(), func(t *testing.T) { + resp, err := srm.Wrap(queryrangebase.HandlerFunc( + func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + // Assert subquery request + for _, reqResp := 
range tc.subQueries { + if req.GetQuery() == reqResp.Request.GetQuery() { + require.Equal(t, reqResp.Request, req) + // return the test data subquery response + return reqResp.Response, nil + } + } + + return nil, fmt.Errorf("subquery request '" + req.GetQuery() + "' not found") + })).Do(ctx, tc.in) + require.NoError(t, err) + require.Equal(t, tc.expected, resp.(*LokiPromResponse).Response) + }) + } +} + +// subQueryRequestResponse returns a RequestResponse containing the expected subQuery instant request +// and a response containing a sample value returned from the following wrapper +func subQueryRequestResponse(expectedSubQuery string, sampleValue float64) queryrangebase.RequestResponse { + return queryrangebase.RequestResponse{ + Request: &LokiInstantRequest{ + Query: expectedSubQuery, + TimeTs: time.Unix(1, 0), + Path: "/loki/api/v1/query", + }, + Response: &LokiPromResponse{ + Response: &queryrangebase.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: queryrangebase.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: []queryrangebase.SampleStream{ + { + Labels: []logproto.LabelAdapter{ + {Name: "app", Value: "foo"}, + }, + Samples: []logproto.LegacySample{ + {TimestampMs: 1000, Value: sampleValue}, + }, + }, + }, + }, + }, + }, + } +} + +// expectedMergedResponse returns the expected middleware Prometheus response with the samples +// as the expectedSampleValue +func expectedMergedResponse(expectedSampleValue float64) *queryrangebase.PrometheusResponse { + return &queryrangebase.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: queryrangebase.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: []queryrangebase.SampleStream{ + { + Labels: []logproto.LabelAdapter{}, + Samples: []logproto.LegacySample{ + {TimestampMs: 1000, Value: expectedSampleValue}, + }, + }, + }, + }, + } +}
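One detail of splitByRange.Do worth spelling out: the split interval comes from the per-tenant QuerySplitDuration limit, taking the smallest positive, non-zero value across all tenants of the request, and a zero result skips splitting entirely. The helper below is an illustrative re-implementation of that selection, not the validation.SmallestPositiveNonZeroDurationPerTenant call used in the actual code; tenant names and values are hypothetical:

```go
package main

import (
	"fmt"
	"time"
)

// smallestPositiveNonZero mirrors the per-tenant limit resolution in
// splitByRange.Do: a zero duration means "no split configured" and is skipped.
func smallestPositiveNonZero(tenants []string, limit func(string) time.Duration) time.Duration {
	var smallest time.Duration
	for _, t := range tenants {
		if v := limit(t); v > 0 && (smallest == 0 || v < smallest) {
			smallest = v
		}
	}
	return smallest
}

func main() {
	// Hypothetical per-tenant settings.
	limits := map[string]time.Duration{
		"tenant-a": time.Minute,
		"tenant-b": 30 * time.Second,
	}
	interval := smallestPositiveNonZero([]string{"tenant-a", "tenant-b"}, func(t string) time.Duration {
		return limits[t]
	})
	fmt.Println(interval) // 30s; a zero result would make Do pass the request straight through
}
```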