fix(engine): Fix `Walk()` function implementation on various `Expr` implementations (#16033)

The `Walk(f WalkFn)` implementation expects to first visit the current node and then invoke `Walk(f)` on all its children if they are not `nil`.

This also fixes the `checkIntervalLimit(syntax.SampleExpr, time.Duration)` function, which did not visit the expression if it was a `*ConcatSampleExpr`.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/16080/head
Christian Haudum 1 year ago committed by GitHub
parent 3a02d64a04
commit 3888866616
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 33
      pkg/logql/downstream.go
  2. 7
      pkg/logql/engine.go
  3. 39
      pkg/logql/engine_test.go
  4. 46
      pkg/logql/syntax/ast.go
  5. 6
      pkg/logql/syntax/walk.go
  6. 4
      pkg/logql/syntax/walk_test.go
  7. 2
      pkg/querier/queryrange/roundtrip_test.go

@ -139,7 +139,12 @@ func (d DownstreamLogSelectorExpr) Pretty(level int) string {
return s
}
func (d DownstreamSampleExpr) Walk(f syntax.WalkFn) { f(d) }
func (d DownstreamSampleExpr) Walk(f syntax.WalkFn) {
f(d)
if d.SampleExpr != nil {
d.SampleExpr.Walk(f)
}
}
var defaultMaxDepth = 4
@ -173,7 +178,12 @@ func (c *ConcatSampleExpr) string(maxDepth int) string {
func (c *ConcatSampleExpr) Walk(f syntax.WalkFn) {
f(c)
f(c.next)
if c.SampleExpr != nil {
c.SampleExpr.Walk(f)
}
if c.next != nil {
c.next.Walk(f)
}
}
// ConcatSampleExpr has no LogQL repretenstation. It is expressed in in the
@ -271,7 +281,12 @@ func (e QuantileSketchEvalExpr) String() string {
func (e *QuantileSketchEvalExpr) Walk(f syntax.WalkFn) {
f(e)
e.quantileMergeExpr.Walk(f)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
if e.quantileMergeExpr != nil {
e.quantileMergeExpr.Walk(f)
}
}
type QuantileSketchMergeExpr struct {
@ -297,6 +312,9 @@ func (e QuantileSketchMergeExpr) String() string {
func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
for _, d := range e.downstreams {
d.Walk(f)
}
@ -326,6 +344,9 @@ func (e MergeFirstOverTimeExpr) String() string {
func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
for _, d := range e.downstreams {
d.Walk(f)
}
@ -355,6 +376,9 @@ func (e MergeLastOverTimeExpr) String() string {
func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
for _, d := range e.downstreams {
d.Walk(f)
}
@ -383,6 +407,9 @@ func (e CountMinSketchEvalExpr) String() string {
func (e *CountMinSketchEvalExpr) Walk(f syntax.WalkFn) {
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
for _, d := range e.downstreams {
d.Walk(f)
}

@ -489,11 +489,10 @@ func (q *query) checkIntervalLimit(expr syntax.SampleExpr, limit time.Duration)
var err error
expr.Walk(func(e syntax.Expr) {
switch e := e.(type) {
case *syntax.RangeAggregationExpr:
if e.Left == nil || e.Left.Interval <= limit {
return
case *syntax.LogRange:
if e.Interval > limit {
err = fmt.Errorf("%w: [%s] > [%s]", logqlmodel.ErrIntervalLimit, model.Duration(e.Interval), model.Duration(limit))
}
err = fmt.Errorf("%w: [%s] > [%s]", logqlmodel.ErrIntervalLimit, model.Duration(e.Left.Interval), model.Duration(limit))
}
})
return err

@ -38,6 +38,45 @@ var (
ErrMockMultiple = util.MultiError{ErrMock, ErrMock}
)
func TestEngine_checkIntervalLimit(t *testing.T) {
q := &query{}
for _, tc := range []struct {
query string
expErr string
}{
{query: `rate({app="foo"} [1m])`, expErr: ""},
{query: `rate({app="foo"} [10m])`, expErr: ""},
{query: `max(rate({app="foo"} [5m])) - max(rate({app="bar"} [10m]))`, expErr: ""},
{query: `rate({app="foo"} [5m]) - rate({app="bar"} [15m])`, expErr: "[15m] > [10m]"},
{query: `rate({app="foo"} [1h])`, expErr: "[1h] > [10m]"},
{query: `sum(rate({app="foo"} [1h]))`, expErr: "[1h] > [10m]"},
{query: `sum_over_time({app="foo"} |= "foo" | json | unwrap bar [1h])`, expErr: "[1h] > [10m]"},
} {
for _, downstream := range []bool{true, false} {
t.Run(fmt.Sprintf("%v/downstream=%v", tc.query, downstream), func(t *testing.T) {
expr := syntax.MustParseExpr(tc.query).(syntax.SampleExpr)
if downstream {
// Simulate downstream expression
expr = &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: nil,
SampleExpr: expr,
},
next: nil,
}
}
err := q.checkIntervalLimit(expr, 10*time.Minute)
if tc.expErr != "" {
require.ErrorContains(t, err, tc.expErr)
} else {
require.NoError(t, err)
}
})
}
}
}
func TestEngine_LogsRateUnwrap(t *testing.T) {
t.Parallel()
for _, test := range []struct {

@ -341,16 +341,12 @@ func (e *PipelineExpr) Shardable(topLevel bool) bool {
func (e *PipelineExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
xs := make([]Walkable, 0, len(e.MultiStages)+1)
xs = append(xs, e.Left)
for _, p := range e.MultiStages {
xs = append(xs, p)
p.Walk(f)
}
walkAll(f, xs...)
}
func (e *PipelineExpr) Accept(v RootVisitor) { v.VisitPipeline(e) }
@ -501,10 +497,12 @@ func (*LineFilterExpr) isStageExpr() {}
func (e *LineFilterExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
if e.Or != nil {
e.Or.Walk(f)
}
e.Left.Walk(f)
}
func (e *LineFilterExpr) Accept(v RootVisitor) {
@ -1153,10 +1151,9 @@ func (r *LogRange) Shardable(topLevel bool) bool { return r.Left.Shardable(topLe
func (r *LogRange) Walk(f WalkFn) {
f(r)
if r.Left == nil {
return
if r.Left != nil {
r.Left.Walk(f)
}
r.Left.Walk(f)
}
func (r *LogRange) Accept(v RootVisitor) {
@ -1476,10 +1473,9 @@ func (e *RangeAggregationExpr) Shardable(topLevel bool) bool {
func (e *RangeAggregationExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
e.Left.Walk(f)
}
func (e *RangeAggregationExpr) Accept(v RootVisitor) { v.VisitRangeAggregation(e) }
@ -1686,10 +1682,9 @@ func (e *VectorAggregationExpr) Shardable(topLevel bool) bool {
func (e *VectorAggregationExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
e.Left.Walk(f)
}
func (e *VectorAggregationExpr) Accept(v RootVisitor) { v.VisitVectorAggregation(e) }
@ -1806,7 +1801,13 @@ func (e *BinOpExpr) Shardable(topLevel bool) bool {
}
func (e *BinOpExpr) Walk(f WalkFn) {
walkAll(f, e.SampleExpr, e.RHS)
f(e)
if e.SampleExpr != nil {
e.SampleExpr.Walk(f)
}
if e.RHS != nil {
e.RHS.Walk(f)
}
}
func (e *BinOpExpr) Accept(v RootVisitor) { v.VisitBinOp(e) }
@ -2235,10 +2236,9 @@ func (e *LabelReplaceExpr) Shardable(_ bool) bool {
func (e *LabelReplaceExpr) Walk(f WalkFn) {
f(e)
if e.Left == nil {
return
if e.Left != nil {
e.Left.Walk(f)
}
e.Left.Walk(f)
}
func (e *LabelReplaceExpr) Accept(v RootVisitor) { v.VisitLabelReplace(e) }

@ -2,12 +2,6 @@ package syntax
type WalkFn = func(e Expr)
func walkAll(f WalkFn, xs ...Walkable) {
for _, x := range xs {
x.Walk(f)
}
}
type Walkable interface {
Walk(f WalkFn)
}

@ -22,7 +22,7 @@ func Test_Walkable(t *testing.T) {
{
desc: "bin op query",
expr: `(sum by(cluster)(rate({job="foo"} |= "bar" | logfmt | bazz="buzz"[5m])) / sum by(cluster)(rate({job="foo"} |= "bar" | logfmt | bazz="buzz"[5m])))`,
want: 16,
want: 17,
},
}
for _, test := range tests {
@ -79,8 +79,6 @@ func Test_AppendMatchers(t *testing.T) {
switch me := e.(type) {
case *MatchersExpr:
me.AppendMatchers(test.matchers)
default:
// Do nothing
}
})
require.Equal(t, test.want, expr.String())

@ -1464,7 +1464,7 @@ func (f fakeLimits) MaxQueryLength(context.Context, string) time.Duration {
}
func (f fakeLimits) MaxQueryRange(context.Context, string) time.Duration {
return time.Second
return time.Hour
}
func (f fakeLimits) MaxQueryParallelism(context.Context, string) int {

Loading…
Cancel
Save