diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index a845337e61..f65ae9a60e 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -123,28 +123,19 @@ func TestRangeMappingEquivalence(t *testing.T) { {`sum_over_time({a=~".+"} | unwrap b [2s])`, time.Second}, {`max_over_time({a=~".+"} | unwrap b [2s])`, time.Second}, {`max_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second}, - {`max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second}, {`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second}, {`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second}, - {`min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second}, {`rate({a=~".+"}[2s])`, time.Second}, {`bytes_rate({a=~".+"}[2s])`, time.Second}, // sum {`sum(bytes_over_time({a=~".+"}[2s]))`, time.Second}, - {`sum(bytes_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second}, {`sum(count_over_time({a=~".+"}[2s]))`, time.Second}, - {`sum(count_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second}, {`sum(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, - {`sum(sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, - {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, - {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`sum(rate({a=~".+"}[2s]))`, time.Second}, {`sum(bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -154,10 +145,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`sum by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`sum by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`sum by (a) (rate({a=~".+"}[2s]))`, time.Second}, {`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -167,10 +156,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`count(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`count(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`count(rate({a=~".+"}[2s]))`, time.Second}, {`count(bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -180,10 +167,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`count by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`count by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`count by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`count by (a) (rate({a=~".+"}[2s]))`, time.Second}, {`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -193,10 +178,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`max(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`max(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`max(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`max(rate({a=~".+"}[2s]))`, time.Second}, {`max(bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -206,10 +189,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`max by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`max by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`max by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`max by (a) (rate({a=~".+"}[2s]))`, time.Second}, {`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -219,12 +200,8 @@ func TestRangeMappingEquivalence(t *testing.T) { {`min(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, - {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, - {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, {`min(rate({a=~".+"}[2s]))`, time.Second}, {`min(bytes_rate({a=~".+"}[2s]))`, time.Second}, @@ -234,17 +211,73 @@ func TestRangeMappingEquivalence(t *testing.T) { {`min by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`min by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second}, {`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second}, - {`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`min by (a) (rate({a=~".+"}[2s]))`, time.Second}, {`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second}, + // Label extraction stage + {`max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second}, + {`min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second}, + + {`sum(bytes_over_time({a=~".+"} | logfmt | line > 5 [2s]))`, time.Second}, + {`sum(count_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`sum(sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum(rate({a=~".+"} | logfmt[2s]))`, time.Second}, + {`sum(bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second}, + {`sum by (a) (bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`sum by (a) (count_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`sum by (a) (sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`sum by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`sum by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`sum by (a) (rate({a=~".+"} | logfmt[2s]))`, time.Second}, + {`sum by (a) (bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second}, + + {`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`count(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`count by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`count by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + + {`max(bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`max(count_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`max(sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`max(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`max(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`max(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`max(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`max by (a) (bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`max by (a) (count_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`max by (a) (sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`max by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`max by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`max by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`max by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + + {`min(bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`min(count_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`min(sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min by (a) (bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`min by (a) (count_over_time({a=~".+"} | logfmt [2s]))`, time.Second}, + {`min by (a) (sum_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`min by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`min by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + {`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second}, + {`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, + // Binary operations {`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second}, {`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second}, - {`sum(count_over_time({a=~".+"} | logfmt | b > 2 [3s])) / sum(count_over_time({a=~".+"} [3s]))`, time.Second}, + {`sum by (a) (count_over_time({a=~".+"} | logfmt | line > 5 [3s])) / sum by (a) (count_over_time({a=~".+"} [3s]))`, time.Second}, // Multi vector aggregator layer queries {`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second}, diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index 88de6b9ce2..3085828b6f 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -341,21 +341,42 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return expr } - // We cannot execute downstream queries that would potentially produce a huge amount of series - // and therefore would very likely fail. - if expr.Grouping == nil && vectorAggrPushdown == nil && hasLabelExtractionStage(expr) { + labelExtractor := hasLabelExtractionStage(expr) + + // Downstream queries with label extractors can potentially produce a huge amount of series + // which can impact the queries and consequently fail. + // Note: vector aggregation expressions aggregate the result in a single empty label set, + // so these expressions can be pushed downstream + if expr.Grouping == nil && vectorAggrPushdown == nil && labelExtractor { return expr } switch expr.Operation { - case syntax.OpRangeTypeBytes, syntax.OpRangeTypeCount, syntax.OpRangeTypeSum: + case syntax.OpRangeTypeSum: + return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeSum, rangeInterval, recorder) + case syntax.OpRangeTypeBytes, syntax.OpRangeTypeCount: + // Downstream queries with label extractors use concat as aggregation operator instead of sum + // in order to merge the resultant label sets + if labelExtractor { + var downstream syntax.SampleExpr = expr + if vectorAggrPushdown != nil { + downstream = vectorAggrPushdown + } + return m.mapConcatSampleExpr(downstream, rangeInterval, recorder) + } return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeSum, rangeInterval, recorder) case syntax.OpRangeTypeMax: return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMax, rangeInterval, recorder) case syntax.OpRangeTypeMin: return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMin, rangeInterval, recorder) case syntax.OpRangeTypeRate: + if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum { + return expr + } return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval, recorder) case syntax.OpRangeTypeBytesRate: + if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum { + return expr + } return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeBytes, rangeInterval, recorder) default: // this should not be reachable. diff --git a/pkg/logql/rangemapper_test.go b/pkg/logql/rangemapper_test.go index 0f178d2393..fed9bb38a5 100644 --- a/pkg/logql/rangemapper_test.go +++ b/pkg/logql/rangemapper_test.go @@ -875,6 +875,500 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, }, + // Label extraction stage + { + `max_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz)`, + `max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `min_over_time({app="foo"} | json | unwrap bar [3m]) by (baz)`, + `min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `sum(bytes_over_time({app="foo"} | logfmt [3m]))`, + `sum( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `sum(count_over_time({app="foo"} | json [3m]))`, + `sum( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(max_over_time({app="foo"} | json | unwrap bar [3m]))`, + `sum( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz))`, + `sum( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(min_over_time({app="foo"} | json | unwrap bar [3m]))`, + `sum( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz))`, + `sum( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum(rate({app="foo"} | json [3m]))`, + `sum( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `sum(bytes_rate({app="foo"} | logfmt [3m]))`, + `sum( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `sum by (baz) (bytes_over_time({app="foo"} | json [3m]))`, + `sum by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `sum by (baz) (count_over_time({app="foo"} | logfmt [3m]))`, + `sum by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `sum by (baz) (sum_over_time({app="foo"} | json | unwrap bar [3m]))`, + `sum by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum by (baz) ( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (max_over_time({app="foo"} | json | unwrap bar [3m]) by (baz))`, + `sum by (baz) ( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `sum by (baz) ( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (min_over_time({app="foo"} | json | unwrap bar [3m]) by (baz))`, + `sum by (baz) ( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `sum by (baz) (rate({app="foo"} | logfmt [3m]))`, + `sum by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `sum by (baz) (bytes_rate({app="foo"} | json [3m]))`, + `sum by (baz) ( + (sum without ( + downstream> + ++ downstream> + ++ downstream> + ) / 180) + )`, + }, + { + `count(max_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz))`, + `count( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count(min_over_time({app="foo"} | json | unwrap bar [3m]) by (baz))`, + `count( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) (max_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz))`, + `count by (baz) ( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `count by (baz) (min_over_time({app="foo"} | json | unwrap bar [3m]) by (baz))`, + `count by (baz) ( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(bytes_over_time({app="foo"} | logfmt [3m]))`, + `max( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `max(count_over_time({app="foo"} | json [3m]))`, + `max( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(max_over_time({app="foo"} | json | unwrap bar [3m]))`, + `max( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz))`, + `max( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(min_over_time({app="foo"} | json | unwrap bar [3m]))`, + `max( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz))`, + `max( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (bytes_over_time({app="foo"} | json [3m]))`, + `max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `max by (baz) (count_over_time({app="foo"} | logfmt [3m]))`, + `max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `max by (baz) (sum_over_time({app="foo"} | json | unwrap bar [3m]))`, + `max by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max by (baz) ( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (max_over_time({app="foo"} | json | unwrap bar [3m]) by (baz))`, + `max by (baz) ( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `max by (baz) ( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `max by (baz) (min_over_time({app="foo"} | json | unwrap bar [3m]) by (baz))`, + `max by (baz) ( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(bytes_over_time({app="foo"} | logfmt [3m]))`, + `min( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `min(count_over_time({app="foo"} | json [3m]))`, + `min( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(max_over_time({app="foo"} | json | unwrap bar [3m]))`, + `min( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz))`, + `min( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(min_over_time({app="foo"} | json | unwrap bar [3m]))`, + `min( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]) by (baz))`, + `min( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (bytes_over_time({app="foo"} | json [3m]))`, + `min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `min by (baz) (count_over_time({app="foo"} | logfmt [3m]))`, + `min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + )`, + }, + { + `min by (baz) (sum_over_time({app="foo"} | json | unwrap bar [3m]))`, + `min by (baz) ( + sum without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min by (baz) ( + max without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (max_over_time({app="foo"} | json | unwrap bar [3m]) by (baz))`, + `min by (baz) ( + max by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `min by (baz) ( + min without ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + { + `min by (baz) (min_over_time({app="foo"} | json | unwrap bar [3m]) by (baz))`, + `min by (baz) ( + min by (baz) ( + downstream> + ++ downstream> + ++ downstream> + ) + )`, + }, + // Binary operations { `bytes_over_time({app="foo"}[3m]) + count_over_time({app="foo"}[5m])`, @@ -933,14 +1427,12 @@ func Test_SplitRangeVectorMapping(t *testing.T) { )`, }, { - `sum (count_over_time({app="foo"} | logfmt | duration > 10s [3m])) / sum (count_over_time({app="foo"} [3m]))`, + `sum by (app) (count_over_time({app="foo"} | logfmt | duration > 10s [3m])) / sum (count_over_time({app="foo"} [3m]))`, `( - sum ( - sum without ( - downstream 10s [1m] offset 2m0s)), shard=> - ++ downstream 10s [1m] offset 1m0s)), shard=> - ++ downstream 10s [1m])), shard=> - ) + sum by (app) ( + downstream 10s [1m] offset 2m0s)), shard=> + ++ downstream 10s [1m] offset 1m0s)), shard=> + ++ downstream 10s [1m])), shard=> ) / sum ( @@ -979,98 +1471,6 @@ func Test_SplitRangeVectorMapping(t *testing.T) { ) )`, }, - - // outer vector aggregation is pushed down - { - `sum(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum( - min without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, - { - `sum(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum( - max without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, - { - `sum(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `sum( - sum without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, - { - `min(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min( - min without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, - { - `min(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min( - max without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, - { - `min(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `min( - sum without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, - { - `max(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max( - min without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, - { - `max(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max( - max without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, - { - `max(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, - `max( - sum without( - downstream> - ++ downstream> - ++ downstream> - ) - )`, - }, } { tc := tc t.Run(tc.expr, func(t *testing.T) { @@ -1101,7 +1501,7 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) { `sum(avg_over_time({app="foo"} | unwrap bar[3m]))`, }, - // should be noop if range interval is slower or equal to split interval (1m) + // should be noop if range interval is lower or equal to split interval (1m) { `bytes_over_time({app="foo"}[1m])`, `bytes_over_time({app="foo"}[1m])`, @@ -1109,10 +1509,126 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) { // should be noop if inner range aggregation includes a stage for label extraction such as `| json` or `| logfmt` // because otherwise the downstream queries would result in too many series + { + `bytes_over_time({app="foo"} | logfmt [3m])`, + `bytes_over_time({app="foo"} | logfmt [3m])`, + }, + { + `count_over_time({app="foo"} | json [3m])`, + `count_over_time({app="foo"} | json [3m])`, + }, + { + `sum_over_time({app="foo"} | logfmt | unwrap bar [3m])`, + `sum_over_time({app="foo"} | logfmt | unwrap bar [3m])`, + }, { `max_over_time({app="foo"} | json | unwrap bar [3m])`, `max_over_time({app="foo"} | json | unwrap bar [3m])`, }, + { + `min_over_time({app="foo"} | logfmt | unwrap bar [3m])`, + `min_over_time({app="foo"} | logfmt | unwrap bar [3m])`, + }, + { + `rate({app="foo"} | json [3m])`, + `rate({app="foo"} | json [3m])`, + }, + { + `bytes_rate({app="foo"} | logfmt [3m])`, + `bytes_rate({app="foo"} | logfmt [3m])`, + }, + // should be noop if inner range aggregation includes a stage for label extraction + // and the vector aggregator is count + { + `count(bytes_over_time({app="foo"} | logfmt [3m]))`, + `count(bytes_over_time({app="foo"} | logfmt [3m]))`, + }, + { + `count by (foo) (bytes_over_time({app="foo"} | json [3m]))`, + `count by (foo) (bytes_over_time({app="foo"} | json [3m]))`, + }, + { + `count(count_over_time({app="foo"} | logfmt [3m]))`, + `count(count_over_time({app="foo"} | logfmt [3m]))`, + }, + { + `count by (foo) (count_over_time({app="foo"} | json [3m]))`, + `count by (foo) (count_over_time({app="foo"} | json [3m]))`, + }, + { + `count(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `count(sum_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `count by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m]))`, + `count by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m]))`, + }, + { + `count(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `count(max_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `count by (foo) (max_over_time({app="foo"} | json | unwrap bar [3m]))`, + `count by (foo) (max_over_time({app="foo"} | json | unwrap bar [3m]))`, + }, + { + `count(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + `count(min_over_time({app="foo"} | logfmt | unwrap bar [3m]))`, + }, + { + `count by (foo) (min_over_time({app="foo"} | json | unwrap bar [3m]))`, + `count by (foo) (min_over_time({app="foo"} | json | unwrap bar [3m]))`, + }, + { + `count(rate({app="foo"} | logfmt [3m]))`, + `count(rate({app="foo"} | logfmt [3m]))`, + }, + { + `count by (foo) (rate({app="foo"} | json [3m]))`, + `count by (foo) (rate({app="foo"} | json [3m]))`, + }, + { + `count(bytes_rate({app="foo"} | logfmt [3m]))`, + `count(bytes_rate({app="foo"} | logfmt [3m]))`, + }, + { + `count by (foo) (bytes_rate({app="foo"} | json [3m]))`, + `count by (foo) (bytes_rate({app="foo"} | json [3m]))`, + }, + // should be noop if inner range aggregation includes a stage for label extraction + // the vector aggregator is max or min and the inner range aggregator is rate or bytes_rate + { + `max(rate({app="foo"} | logfmt [3m]))`, + `max(rate({app="foo"} | logfmt [3m]))`, + }, + { + `max by (foo) (rate({app="foo"} | json [3m]))`, + `max by (foo) (rate({app="foo"} | json [3m]))`, + }, + { + `max(bytes_rate({app="foo"} | logfmt [3m]))`, + `max(bytes_rate({app="foo"} | logfmt [3m]))`, + }, + { + `max by (foo) (bytes_rate({app="foo"} | json [3m]))`, + `max by (foo) (bytes_rate({app="foo"} | json [3m]))`, + }, + { + `min(rate({app="foo"} | logfmt [3m]))`, + `min(rate({app="foo"} | logfmt [3m]))`, + }, + { + `min by (foo) (rate({app="foo"} | json [3m]))`, + `min by (foo) (rate({app="foo"} | json [3m]))`, + }, + { + `min(bytes_rate({app="foo"} | logfmt [3m]))`, + `min(bytes_rate({app="foo"} | logfmt [3m]))`, + }, + { + `min by (foo) (bytes_rate({app="foo"} | json [3m]))`, + `min by (foo) (bytes_rate({app="foo"} | json [3m]))`, + }, // if one side of a binary expression is a noop, the full query is a noop as well {