From ddaa497554dd1f9b7e20713384c034d66f024f42 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Fri, 9 Feb 2024 14:03:57 +0100 Subject: [PATCH] Prettify concat and downstream expressions. (#11877) **What this PR does / why we need it**: Prettifying `ConcatSampleExpr` et al will simplify debugging the shard mapper. **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --- pkg/logql/downstream.go | 112 ++++++++++++++++++++++-- pkg/logql/downstream_test.go | 141 ++++++++++++++++++++++++++++++ pkg/logql/shardmapper_test.go | 29 ++++++ pkg/logql/syntax/prettier.go | 64 +++++++------- pkg/logql/syntax/prettier_test.go | 8 +- 5 files changed, 313 insertions(+), 41 deletions(-) diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index e29b47054f..33d945f11b 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -83,6 +83,29 @@ func (d DownstreamSampleExpr) String() string { return fmt.Sprintf("downstream<%s, shard=%s>", d.SampleExpr.String(), d.shard) } +// The DownstreamSampleExpr is not part of LogQL. In the prettified version it's +// represented as e.g. `downstream` +func (d DownstreamSampleExpr) Pretty(level int) string { + s := syntax.Indent(level) + if !syntax.NeedSplit(d) { + return s + d.String() + } + + s += "downstream<\n" + + s += d.SampleExpr.Pretty(level + 1) + s += ",\n" + s += syntax.Indent(level+1) + "shard=" + if d.shard != nil { + s += d.shard.String() + "\n" + } else { + s += "nil\n" + } + + s += syntax.Indent(level) + ">" + return s +} + // DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation type DownstreamLogSelectorExpr struct { shard *astmapper.ShardAnnotation @@ -93,6 +116,29 @@ func (d DownstreamLogSelectorExpr) String() string { return fmt.Sprintf("downstream<%s, shard=%s>", d.LogSelectorExpr.String(), d.shard) } +// The DownstreamLogSelectorExpr is not part of LogQL. In the prettified version it's +// represented as e.g. `downstream<{foo="bar"} |= "error", shard=1_of_3>` +func (d DownstreamLogSelectorExpr) Pretty(level int) string { + s := syntax.Indent(level) + if !syntax.NeedSplit(d) { + return s + d.String() + } + + s += "downstream<\n" + + s += d.LogSelectorExpr.Pretty(level + 1) + s += ",\n" + s += syntax.Indent(level+1) + "shard=" + if d.shard != nil { + s += d.shard.String() + "\n" + } else { + s += "nil\n" + } + + s += syntax.Indent(level) + ">" + return s +} + func (d DownstreamSampleExpr) Walk(f syntax.WalkFn) { f(d) } var defaultMaxDepth = 4 @@ -105,7 +151,7 @@ type ConcatSampleExpr struct { next *ConcatSampleExpr } -func (c ConcatSampleExpr) String() string { +func (c *ConcatSampleExpr) String() string { if c.next == nil { return c.DownstreamSampleExpr.String() } @@ -115,7 +161,7 @@ func (c ConcatSampleExpr) String() string { // in order to not display huge queries with thousands of shards, // we can limit the number of stringified subqueries. -func (c ConcatSampleExpr) string(maxDepth int) string { +func (c *ConcatSampleExpr) string(maxDepth int) string { if c.next == nil { return c.DownstreamSampleExpr.String() } @@ -125,18 +171,46 @@ func (c ConcatSampleExpr) string(maxDepth int) string { return fmt.Sprintf("%s ++ %s", c.DownstreamSampleExpr.String(), c.next.string(maxDepth-1)) } -func (c ConcatSampleExpr) Walk(f syntax.WalkFn) { +func (c *ConcatSampleExpr) Walk(f syntax.WalkFn) { f(c) f(c.next) } +// ConcatSampleExpr has no LogQL repretenstation. It is expressed in in the +// prettified version as e.g. `concat(downstream ++ )` +func (c *ConcatSampleExpr) Pretty(level int) string { + s := syntax.Indent(level) + if !syntax.NeedSplit(c) { + return s + c.String() + } + + s += "concat(\n" + + head := c + for i := 0; i < defaultMaxDepth && head != nil; i++ { + if i > 0 { + s += syntax.Indent(level+1) + "++\n" + } + s += head.DownstreamSampleExpr.Pretty(level + 1) + s += "\n" + head = head.next + } + // There are more downstream samples... + if head != nil { + s += syntax.Indent(level+1) + "++ ...\n" + } + s += syntax.Indent(level) + ")" + + return s +} + // ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr type ConcatLogSelectorExpr struct { DownstreamLogSelectorExpr next *ConcatLogSelectorExpr } -func (c ConcatLogSelectorExpr) String() string { +func (c *ConcatLogSelectorExpr) String() string { if c.next == nil { return c.DownstreamLogSelectorExpr.String() } @@ -146,7 +220,7 @@ func (c ConcatLogSelectorExpr) String() string { // in order to not display huge queries with thousands of shards, // we can limit the number of stringified subqueries. -func (c ConcatLogSelectorExpr) string(maxDepth int) string { +func (c *ConcatLogSelectorExpr) string(maxDepth int) string { if c.next == nil { return c.DownstreamLogSelectorExpr.String() } @@ -156,6 +230,34 @@ func (c ConcatLogSelectorExpr) string(maxDepth int) string { return fmt.Sprintf("%s ++ %s", c.DownstreamLogSelectorExpr.String(), c.next.string(maxDepth-1)) } +// ConcatLogSelectorExpr has no representation in LogQL. Its prettified version +// is e.g. `concat(downstream<{foo="bar"} |= "error", shard=1_of_3>)` +func (c *ConcatLogSelectorExpr) Pretty(level int) string { + s := syntax.Indent(level) + if !syntax.NeedSplit(c) { + return s + c.String() + } + + s += "concat(\n" + + head := c + for i := 0; i < defaultMaxDepth && head != nil; i++ { + if i > 0 { + s += syntax.Indent(level+1) + "++\n" + } + s += head.DownstreamLogSelectorExpr.Pretty(level + 1) + s += "\n" + head = head.next + } + // There are more downstream samples... + if head != nil { + s += syntax.Indent(level+1) + "++ ...\n" + } + s += ")" + + return s +} + // QuantileSketchEvalExpr evaluates a quantile sketch to the actual quantile. type QuantileSketchEvalExpr struct { syntax.SampleExpr diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 426722a554..ec5f317046 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -8,12 +8,14 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/user" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/querier/astmapper" ) var nilShardMetrics = NewShardMapperMetrics(nil) @@ -543,3 +545,142 @@ func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) require.InEpsilonSlice(t, e, a, alpha) } } + +func TestFormat_ShardedExpr(t *testing.T) { + oldMax := syntax.MaxCharsPerLine + syntax.MaxCharsPerLine = 20 + + oldDefaultDepth := defaultMaxDepth + defaultMaxDepth = 2 + defer func() { + syntax.MaxCharsPerLine = oldMax + defaultMaxDepth = oldDefaultDepth + }() + + cases := []struct { + name string + in syntax.Expr + exp string + }{ + { + name: "ConcatSampleExpr", + in: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 0, + Of: 3, + }, + SampleExpr: &syntax.RangeAggregationExpr{ + Operation: syntax.OpRangeTypeRate, + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + Interval: time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 3, + }, + SampleExpr: &syntax.RangeAggregationExpr{ + Operation: syntax.OpRangeTypeRate, + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + Interval: time.Minute, + }, + }, + }, + next: &ConcatSampleExpr{ + DownstreamSampleExpr: DownstreamSampleExpr{ + shard: &astmapper.ShardAnnotation{ + Shard: 1, + Of: 3, + }, + SampleExpr: &syntax.RangeAggregationExpr{ + Operation: syntax.OpRangeTypeRate, + Left: &syntax.LogRange{ + Left: &syntax.MatchersExpr{ + Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}, + }, + Interval: time.Minute, + }, + }, + }, + next: nil, + }, + }, + }, + exp: `concat( + downstream< + rate( + {foo="bar"} [1m] + ), + shard=0_of_3 + > + ++ + downstream< + rate( + {foo="bar"} [1m] + ), + shard=1_of_3 + > + ++ ... +)`, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got := syntax.Prettify(c.in) + assert.Equal(t, c.exp, got) + }) + } +} + +func TestPrettierWithoutShards(t *testing.T) { + q := `((quantile_over_time(0.5,{foo="bar"} | json | unwrap bytes[1d]) by (cluster) > 42) and (count by (cluster)(max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace)) > 10))` + e := syntax.MustParseExpr(q) + + mapper := NewShardMapper(ConstantShards(4), nilShardMetrics, []string{}) + _, _, mapped, err := mapper.Parse(e) + require.NoError(t, err) + got := syntax.Prettify(mapped) + expected := ` downstream> + > + 42 +and + count by (cluster)( + max by (cluster, namespace)( + concat( + downstream< + max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace), + shard=0_of_4 + > + ++ + downstream< + max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace), + shard=1_of_4 + > + ++ + downstream< + max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace), + shard=2_of_4 + > + ++ + downstream< + max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace), + shard=3_of_4 + > + ) + ) + ) + > + 10` + assert.Equal(t, expected, got) +} diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index 96955109a9..0e345291ee 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -1598,3 +1598,32 @@ func TestStringTrimming(t *testing.T) { func float64p(v float64) *float64 { return &v } + +func TestShardTopk(t *testing.T) { + expr := `topk( + 10, + sum by (ip) ( + sum_over_time({job="foo"} | json | unwrap bytes(bytes)[1m]) + ) + )` + m := NewShardMapper(ConstantShards(5), nilShardMetrics, []string{ShardQuantileOverTime}) + _, _, mappedExpr, err := m.Parse(syntax.MustParseExpr(expr)) + require.NoError(t, err) + + expected := `topk( + 10, + sum by (ip)( + concat( + downstream + ++ + downstream + ++ + downstream + ++ + downstream + ++ ... + ) + ) +)` + require.Equal(t, expected, mappedExpr.Pretty(0)) +} diff --git a/pkg/logql/syntax/prettier.go b/pkg/logql/syntax/prettier.go index cf346e26c5..1b40745385 100644 --- a/pkg/logql/syntax/prettier.go +++ b/pkg/logql/syntax/prettier.go @@ -35,8 +35,8 @@ import ( // var ( - // maxCharsPerLine is used to qualify whether some LogQL expressions are worth `splitting` into new lines. - maxCharsPerLine = 100 + // MaxCharsPerLine is used to qualify whether some LogQL expressions are worth `splitting` into new lines. + MaxCharsPerLine = 100 ) func Prettify(e Expr) string { @@ -51,8 +51,8 @@ func (e *MatchersExpr) Pretty(level int) string { // e.g: `{foo="bar"} | logfmt | level="error"` // Here, left = `{foo="bar"}` and multistages would collection of each stage in pipeline, here `logfmt` and `level="error"` func (e *PipelineExpr) Pretty(level int) string { - if !needSplit(e) { - return indent(level) + e.String() + if !NeedSplit(e) { + return Indent(level) + e.String() } s := fmt.Sprintf("%s\n", e.Left.Pretty(level)) @@ -73,8 +73,8 @@ func (e *PipelineExpr) Pretty(level int) string { // e.g: `|= "error" != "memcache" |= ip("192.168.0.1")` // NOTE: here `ip` is Op in this expression. func (e *LineFilterExpr) Pretty(level int) string { - if !needSplit(e) { - return indent(level) + e.String() + if !NeedSplit(e) { + return Indent(level) + e.String() } var s string @@ -90,7 +90,7 @@ func (e *LineFilterExpr) Pretty(level int) string { s += "\n" } - s += indent(level) + s += Indent(level) // We re-use LineFilterExpr's String() implementation to avoid duplication. // We create new LineFilterExpr without `Left`. @@ -153,7 +153,7 @@ func (e *LogfmtExpressionParser) Pretty(level int) string { // e.g: sum_over_time({foo="bar"} | logfmt | unwrap bytes_processed [5m]) func (e *UnwrapExpr) Pretty(level int) string { - s := indent(level) + s := Indent(level) if e.Operation != "" { s += fmt.Sprintf("%s %s %s(%s)", OpPipe, OpUnwrap, e.Operation, e.Identifier) @@ -161,7 +161,7 @@ func (e *UnwrapExpr) Pretty(level int) string { s += fmt.Sprintf("%s %s %s", OpPipe, OpUnwrap, e.Identifier) } for _, f := range e.PostFilters { - s += fmt.Sprintf("\n%s%s %s", indent(level), OpPipe, f) + s += fmt.Sprintf("\n%s%s %s", Indent(level), OpPipe, f) } return s } @@ -200,8 +200,8 @@ func (e *OffsetExpr) Pretty(_ int) string { // e.g: count_over_time({foo="bar"}[5m]) func (e *RangeAggregationExpr) Pretty(level int) string { - s := indent(level) - if !needSplit(e) { + s := Indent(level) + if !NeedSplit(e) { return s + e.String() } @@ -211,13 +211,13 @@ func (e *RangeAggregationExpr) Pretty(level int) string { // print args to the function. if e.Params != nil { - s = fmt.Sprintf("%s%s%s,", s, indent(level+1), fmt.Sprint(*e.Params)) + s = fmt.Sprintf("%s%s%s,", s, Indent(level+1), fmt.Sprint(*e.Params)) s += "\n" } s += e.Left.Pretty(level + 1) - s += "\n" + indent(level) + ")" + s += "\n" + Indent(level) + ")" if e.Grouping != nil { s += e.Grouping.Pretty(level) @@ -236,9 +236,9 @@ func (e *RangeAggregationExpr) Pretty(level int) string { // - vector on which aggregation is done. // [without|by (