Fix panic in instant query splitting when using unwrapped rate (#6348)

* Fix panic in instant query splitting when using unwrapped rate

The range aggregation `rate()` supports both log ranges and unwrapped
ranges, e.g.

`rate({app="foo"} [$__interval])`
and
`rate({app="foo"} | unwrap bar [$__interval])`

Since `rate()` was split into multiple `count_over_time()` over total
duration, but `count_over_time()` does not support `unwrap`, unwrapped
rate queries caused panics.

This fix changes the splitting of `rate({app="foo"} | unwrap bar [$__interval]`
into multiple `sum_over_time()` over total duration.

Fixes #6344

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add tests

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Integrate review feedback

Co-authored-by: Susana Ferreira <susana.ferreira@grafana.com>

Co-authored-by: Susana Ferreira <susana.ferreira@grafana.com>
pull/6466/head
Christian Haudum 4 years ago committed by GitHub
parent 8dcc2d6e55
commit 798677ae7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      pkg/logql/downstream_test.go
  2. 14
      pkg/logql/rangemapper.go
  3. 29
      pkg/logql/rangemapper_test.go

@ -125,6 +125,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
{`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
{`rate({a=~".+"}[2s])`, time.Second},
{`rate({a=~".+"} | unwrap b [2s])`, time.Second},
{`bytes_rate({a=~".+"}[2s])`, time.Second},
// sum
@ -136,6 +137,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(rate({a=~".+"}[2s]))`, time.Second},
{`sum(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"}[2s]))`, time.Second},
// sum by
@ -147,6 +149,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`sum by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},
// count
@ -158,6 +161,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count(rate({a=~".+"}[2s]))`, time.Second},
{`count(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(bytes_rate({a=~".+"}[2s]))`, time.Second},
// count by
@ -169,6 +173,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`count by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},
// max
@ -180,6 +185,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max(rate({a=~".+"}[2s]))`, time.Second},
{`max(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(bytes_rate({a=~".+"}[2s]))`, time.Second},
// max by
@ -191,6 +197,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`max by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},
// min
@ -202,6 +209,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(rate({a=~".+"}[2s]))`, time.Second},
{`min(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(bytes_rate({a=~".+"}[2s]))`, time.Second},
// min by
@ -213,6 +221,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`min by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},
// Label extraction stage
@ -227,6 +236,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum(rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum(rate({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum by (a) (bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second},
{`sum by (a) (count_over_time({a=~".+"} | logfmt [2s]))`, time.Second},
@ -236,6 +246,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum by (a) (rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum by (a) (rate({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum by (a) (bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},

@ -183,9 +183,11 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool {
// sumOverFullRange returns an expression that sums up individual downstream queries (with preserving labels)
// and dividing it by the full range in seconds to calculate a rate value.
// The operation defines the range aggregation operation of the downstream queries.
// Example:
// Examples:
// rate({app="foo"}[2m])
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
// rate({app="foo"} | unwrap bar [2m])
// => (sum without (sum_over_time({app="foo"}[1m]) ++ sum_over_time({app="foo"}[1m]) offset 1m) / 120)
func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{
Left: expr.Left,
@ -373,7 +375,15 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum {
return expr
}
return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval, recorder)
// rate({app="foo"}[2m]) =>
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
op := syntax.OpRangeTypeCount
if expr.Left.Unwrap != nil {
// rate({app="foo"} | unwrap bar [2m])
// => (sum without (sum_over_time({app="foo"}[1m]) ++ sum_over_time({app="foo"}[1m]) offset 1m) / 120)
op = syntax.OpRangeTypeSum
}
return m.sumOverFullRange(expr, vectorAggrPushdown, op, rangeInterval, recorder)
case syntax.OpRangeTypeBytesRate:
if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum {
return expr

@ -130,6 +130,14 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
++ downstream<count_over_time({app="foo"}[1m]), shard=<nil>>
) / 180)`,
},
{
`rate({app="foo"} | unwrap bar[3m])`,
`(sum without(
downstream<sum_over_time({app="foo"} | unwrap bar [1m] offset 2m0s), shard=<nil>>
++ downstream<sum_over_time({app="foo"} | unwrap bar [1m] offset 1m0s), shard=<nil>>
++ downstream<sum_over_time({app="foo"} | unwrap bar [1m]), shard=<nil>>
) / 180)`,
},
{
`bytes_rate({app="foo"}[3m])`,
`(sum without(
@ -1471,6 +1479,23 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
)
)`,
},
// regression test queries
{
`topk(10,sum by (org_id) (rate({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [3m])))`,
`topk(10,
sum by (org_id) (
(
sum without(
downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m] offset 2m0s)),shard=<nil>>
++ downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m] offset 1m0s)),shard=<nil>>
++ downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m])),shard=<nil>>
)
/ 180
)
)
)`,
},
} {
tc := tc
t.Run(tc.expr, func(t *testing.T) {
@ -1500,10 +1525,6 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) {
`sum(avg_over_time({app="foo"} | unwrap bar[3m]))`,
`sum(avg_over_time({app="foo"} | unwrap bar[3m]))`,
},
{ // this query caused a panic in ops
`topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`,
`topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`,
},
// should be noop if range interval is lower or equal to split interval (1m)
{

Loading…
Cancel
Save