Add label_replace and literal expressions to instant query split by range (#6515)

* Add label_replace logic to instant query split by range

* Add label_replace and literal expressions support
* Support binary operations with literal expressions

* Remove Literal expression as splittable

* Fix lint

* Address comments
pull/6534/head
Susana Ferreira 3 years ago committed by GitHub
parent 002dd8aa78
commit 79b2d01970
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/logql/downstream_test.go
  2. 36
      pkg/logql/rangemapper.go
  3. 60
      pkg/logql/rangemapper_test.go

@ -285,6 +285,8 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second}, {`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
// Binary operations // Binary operations
{`2 * bytes_over_time({a=~".+"}[3s])`, time.Second},
{`count_over_time({a=~".+"}[3s]) * 2`, time.Second},
{`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second}, {`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second},
{`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second}, {`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second},
{`sum by (a) (count_over_time({a=~".+"} | logfmt | line > 5 [3s])) / sum by (a) (count_over_time({a=~".+"} [3s]))`, time.Second}, {`sum by (a) (count_over_time({a=~".+"} | logfmt | line > 5 [3s])) / sum by (a) (count_over_time({a=~".+"} [3s]))`, time.Second},
@ -304,6 +306,10 @@ func TestRangeMappingEquivalence(t *testing.T) {
// range with offset // range with offset
{`rate({a=~".+"}[2s] offset 2s)`, time.Second}, {`rate({a=~".+"}[2s] offset 2s)`, time.Second},
// label_replace
{`label_replace(sum by (a) (count_over_time({a=~".+"}[3s])), "", "", "", "")`, time.Second},
{`label_replace(sum by (a) (count_over_time({a=~".+"}[3s])), "foo", "$1", "a", "(.*)")`, time.Second},
} { } {
q := NewMockQuerier( q := NewMockQuerier(
shards, shards,

@ -124,26 +124,40 @@ func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.Vect
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if left hand side is a noop, we need to return the original expression // if left-hand side is a noop, we need to return the original expression
// so the whole expression is a noop and thus not executed using the // so the whole expression is a noop and thus not executed using the
// downstream engine // downstream engine.
if e.SampleExpr.String() == lhsMapped.String() { // Note: literal expressions are identical to their mapped expression,
// map binary expression if left-hand size is a literal
if _, ok := e.SampleExpr.(*syntax.LiteralExpr); e.SampleExpr.String() == lhsMapped.String() && !ok {
return e, nil return e, nil
} }
rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown, recorder) rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown, recorder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if right hand side is a noop, we need to return the original expression // if right-hand side is a noop, we need to return the original expression
// so the whole expression is a noop and thus not executed using the // so the whole expression is a noop and thus not executed using the
// downstream engine // downstream engine
if e.RHS.String() == rhsMapped.String() { // Note: literal expressions are identical to their mapped expression,
// map binary expression if right-hand size is a literal
if _, ok := e.RHS.(*syntax.LiteralExpr); e.RHS.String() == rhsMapped.String() && !ok {
return e, nil return e, nil
} }
e.SampleExpr = lhsMapped e.SampleExpr = lhsMapped
e.RHS = rhsMapped e.RHS = rhsMapped
return e, nil return e, nil
case *syntax.LabelReplaceExpr:
lhsMapped, err := m.Map(e.Left, vectorAggrPushdown, recorder)
if err != nil {
return nil, err
}
e.Left = lhsMapped
return e, nil
case *syntax.LiteralExpr:
return e, nil
default: default:
// ConcatSampleExpr and DownstreamSampleExpr are not supported input expression types
return nil, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m) return nil, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m)
} }
} }
@ -413,13 +427,17 @@ func isSplittableByRange(expr syntax.SampleExpr) bool {
case *syntax.VectorAggregationExpr: case *syntax.VectorAggregationExpr:
_, ok := splittableVectorOp[e.Operation] _, ok := splittableVectorOp[e.Operation]
return ok && isSplittableByRange(e.Left) return ok && isSplittableByRange(e.Left)
case *syntax.BinOpExpr:
return isSplittableByRange(e.SampleExpr) && isSplittableByRange(e.RHS)
case *syntax.LabelReplaceExpr:
return isSplittableByRange(e.Left)
case *syntax.RangeAggregationExpr: case *syntax.RangeAggregationExpr:
_, ok := splittableRangeVectorOp[e.Operation] _, ok := splittableRangeVectorOp[e.Operation]
return ok return ok
case *syntax.BinOpExpr:
_, literalLHS := e.SampleExpr.(*syntax.LiteralExpr)
_, literalRHS := e.RHS.(*syntax.LiteralExpr)
// Note: if both left-hand side and right-hand side are literal expressions,
// the syntax.ParseSampleExpr returns a literal expression
return isSplittableByRange(e.SampleExpr) || literalLHS && isSplittableByRange(e.RHS) || literalRHS
case *syntax.LabelReplaceExpr:
return isSplittableByRange(e.Left)
default: default:
return false return false
} }

@ -1378,6 +1378,28 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
}, },
// Binary operations // Binary operations
{
`2 * bytes_over_time({app="foo"}[3m])`,
`(
2 *
sum without (
downstream<bytes_over_time({app="foo"}[1m] offset 2m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m] offset 1m0s), shard=<nil>>
++ downstream<bytes_over_time({app="foo"}[1m]), shard=<nil>>
)
)`,
},
{
`count_over_time({app="foo"}[3m]) * 2`,
`(
sum without (
downstream<count_over_time({app="foo"}[1m] offset 2m0s), shard=<nil>>
++ downstream<count_over_time({app="foo"}[1m] offset 1m0s), shard=<nil>>
++ downstream<count_over_time({app="foo"}[1m]), shard=<nil>>
)
* 2
)`,
},
{ {
`bytes_over_time({app="foo"}[3m]) + count_over_time({app="foo"}[5m])`, `bytes_over_time({app="foo"}[3m]) + count_over_time({app="foo"}[5m])`,
`(sum without ( `(sum without (
@ -1496,6 +1518,34 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
) )
)`, )`,
}, },
// label_replace
{
`label_replace(sum by (baz) (count_over_time({app="foo"}[3m])), "x", "$1", "a", "(.*)")`,
`label_replace(
sum by (baz) (
sum without (
downstream<sum by (baz) (count_over_time({app="foo"} [1m] offset 2m0s)), shard=<nil>>
++ downstream<sum by (baz) (count_over_time({app="foo"} [1m] offset 1m0s)), shard=<nil>>
++ downstream<sum by (baz) (count_over_time({app="foo"} [1m])), shard=<nil>>
)
),
"x", "$1", "a", "(.*)"
)`,
},
{
`label_replace(rate({job="api-server", service="a:c"} |= "err" [3m]), "foo", "$1", "service", "(.*):.*")`,
`label_replace(
(
sum without (
downstream<count_over_time({job="api-server",service="a:c"} |= "err" [1m] offset 2m0s), shard=<nil>>
++ downstream<count_over_time({job="api-server",service="a:c"} |= "err" [1m] offset 1m0s), shard=<nil>>
++ downstream<count_over_time({job="api-server",service="a:c"} |= "err" [1m]), shard=<nil>>
)
/ 180),
"foo", "$1", "service", "(.*):.*"
)`,
},
} { } {
tc := tc tc := tc
t.Run(tc.expr, func(t *testing.T) { t.Run(tc.expr, func(t *testing.T) {
@ -1660,6 +1710,16 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) {
`sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m])`, `sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m])`,
`(sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m]))`, `(sum by (foo) (sum_over_time({app="foo"} | json | unwrap bar [3m])) / sum_over_time({app="foo"} | json | unwrap bar [6m]))`,
}, },
// should be noop if literal expression
{
`5`,
`5`,
},
{
`5 * 5`,
`25`,
},
} { } {
tc := tc tc := tc
t.Run(tc.expr, func(t *testing.T) { t.Run(tc.expr, func(t *testing.T) {

Loading…
Cancel
Save