correctly handle unpack when reordering stages (#9936)

The optimization to reorder stages doesn't take into account the
`unpack` stage. When line filters come after unpack stages, they
shouldn't be moved.
pull/9949/head^2
Travis Patterson 3 years ago committed by GitHub
parent 9e22a89bcb
commit 6dc722bc9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 15
      pkg/logql/syntax/ast.go
  3. 10
      pkg/logql/syntax/ast_test.go

@ -63,6 +63,7 @@
* [9754](https://github.com/grafana/loki/pull/9754) **ashwanthgoli**: Fixes an issue with indexes becoming unqueriable if the index prefix is different from the one configured in the latest period config.
* [9763](https://github.com/grafana/loki/pull/9763) **ssncferreira**: Fix the logic of the `offset` operator for downstream queries on instant query splitting of (range) vector aggregation expressions containing an offset.
* [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.
* [9936](https://github.com/grafana/loki/pull/9936) **masslessparticle**: Fix the way query stages are reordered when `unpack` is present.
##### Changes

@ -120,6 +120,21 @@ func (m MultiStageExpr) reorderStages() []StageExpr {
filters = filters[:0]
rest = rest[:0]
case *LabelParserExpr:
rest = append(rest, f)
// unpack modifies the contents of the line so any line filter
// originally after an unpack must still be after the same
// unpack.
if f.Op == OpParserTypeUnpack {
if len(filters) > 0 {
result = append(result, combineFilters(filters))
}
result = append(result, rest...)
filters = filters[:0]
rest = rest[:0]
}
default:
rest = append(rest, f)
}

@ -652,6 +652,16 @@ func TestFilterReodering(t *testing.T) {
require.Len(t, stages, 5)
require.Equal(t, `|= "foo" |= "next" |= "bar" |= "baz" | logfmt | line_format "{{.foo}}" |= "1" |= "2" |= "3" | logfmt`, MultiStageExpr(stages).String())
})
t.Run("unpack test", func(t *testing.T) {
logExpr := `{container_name="app"} |= "06497595" | unpack != "message" | json | line_format "new log: {{.foo}}"`
l, err := ParseExpr(logExpr)
require.NoError(t, err)
stages := l.(*PipelineExpr).MultiStages.reorderStages()
require.Len(t, stages, 5)
require.Equal(t, `|= "06497595" | unpack != "message" | json | line_format "new log: {{.foo}}"`, MultiStageExpr(stages).String())
})
}
var result bool

Loading…
Cancel
Save