Prettify concat and downstream expressions. (#11877)

**What this PR does / why we need it**:
Prettifying `ConcatSampleExpr` et al will simplify debugging the shard
mapper.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)
ptodev/update-win-eventlog
Karsten Jeschkies 2 years ago committed by GitHub
parent 60551dacc4
commit ddaa497554
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 112
      pkg/logql/downstream.go
  2. 141
      pkg/logql/downstream_test.go
  3. 29
      pkg/logql/shardmapper_test.go
  4. 64
      pkg/logql/syntax/prettier.go
  5. 8
      pkg/logql/syntax/prettier_test.go

@ -83,6 +83,29 @@ func (d DownstreamSampleExpr) String() string {
return fmt.Sprintf("downstream<%s, shard=%s>", d.SampleExpr.String(), d.shard)
}
// The DownstreamSampleExpr is not part of LogQL. In the prettified version it's
// represented as e.g. `downstream<count_over_time({foo="bar"} |= "error"), shard=1_of_3>`
func (d DownstreamSampleExpr) Pretty(level int) string {
s := syntax.Indent(level)
if !syntax.NeedSplit(d) {
return s + d.String()
}
s += "downstream<\n"
s += d.SampleExpr.Pretty(level + 1)
s += ",\n"
s += syntax.Indent(level+1) + "shard="
if d.shard != nil {
s += d.shard.String() + "\n"
} else {
s += "nil\n"
}
s += syntax.Indent(level) + ">"
return s
}
// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation
type DownstreamLogSelectorExpr struct {
shard *astmapper.ShardAnnotation
@ -93,6 +116,29 @@ func (d DownstreamLogSelectorExpr) String() string {
return fmt.Sprintf("downstream<%s, shard=%s>", d.LogSelectorExpr.String(), d.shard)
}
// The DownstreamLogSelectorExpr is not part of LogQL. In the prettified version it's
// represented as e.g. `downstream<{foo="bar"} |= "error", shard=1_of_3>`
func (d DownstreamLogSelectorExpr) Pretty(level int) string {
s := syntax.Indent(level)
if !syntax.NeedSplit(d) {
return s + d.String()
}
s += "downstream<\n"
s += d.LogSelectorExpr.Pretty(level + 1)
s += ",\n"
s += syntax.Indent(level+1) + "shard="
if d.shard != nil {
s += d.shard.String() + "\n"
} else {
s += "nil\n"
}
s += syntax.Indent(level) + ">"
return s
}
func (d DownstreamSampleExpr) Walk(f syntax.WalkFn) { f(d) }
var defaultMaxDepth = 4
@ -105,7 +151,7 @@ type ConcatSampleExpr struct {
next *ConcatSampleExpr
}
func (c ConcatSampleExpr) String() string {
func (c *ConcatSampleExpr) String() string {
if c.next == nil {
return c.DownstreamSampleExpr.String()
}
@ -115,7 +161,7 @@ func (c ConcatSampleExpr) String() string {
// in order to not display huge queries with thousands of shards,
// we can limit the number of stringified subqueries.
func (c ConcatSampleExpr) string(maxDepth int) string {
func (c *ConcatSampleExpr) string(maxDepth int) string {
if c.next == nil {
return c.DownstreamSampleExpr.String()
}
@ -125,18 +171,46 @@ func (c ConcatSampleExpr) string(maxDepth int) string {
return fmt.Sprintf("%s ++ %s", c.DownstreamSampleExpr.String(), c.next.string(maxDepth-1))
}
func (c ConcatSampleExpr) Walk(f syntax.WalkFn) {
func (c *ConcatSampleExpr) Walk(f syntax.WalkFn) {
f(c)
f(c.next)
}
// ConcatSampleExpr has no LogQL repretenstation. It is expressed in in the
// prettified version as e.g. `concat(downstream<count_over_time({foo="bar"}), shard=...> ++ )`
func (c *ConcatSampleExpr) Pretty(level int) string {
s := syntax.Indent(level)
if !syntax.NeedSplit(c) {
return s + c.String()
}
s += "concat(\n"
head := c
for i := 0; i < defaultMaxDepth && head != nil; i++ {
if i > 0 {
s += syntax.Indent(level+1) + "++\n"
}
s += head.DownstreamSampleExpr.Pretty(level + 1)
s += "\n"
head = head.next
}
// There are more downstream samples...
if head != nil {
s += syntax.Indent(level+1) + "++ ...\n"
}
s += syntax.Indent(level) + ")"
return s
}
// ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr
type ConcatLogSelectorExpr struct {
DownstreamLogSelectorExpr
next *ConcatLogSelectorExpr
}
func (c ConcatLogSelectorExpr) String() string {
func (c *ConcatLogSelectorExpr) String() string {
if c.next == nil {
return c.DownstreamLogSelectorExpr.String()
}
@ -146,7 +220,7 @@ func (c ConcatLogSelectorExpr) String() string {
// in order to not display huge queries with thousands of shards,
// we can limit the number of stringified subqueries.
func (c ConcatLogSelectorExpr) string(maxDepth int) string {
func (c *ConcatLogSelectorExpr) string(maxDepth int) string {
if c.next == nil {
return c.DownstreamLogSelectorExpr.String()
}
@ -156,6 +230,34 @@ func (c ConcatLogSelectorExpr) string(maxDepth int) string {
return fmt.Sprintf("%s ++ %s", c.DownstreamLogSelectorExpr.String(), c.next.string(maxDepth-1))
}
// ConcatLogSelectorExpr has no representation in LogQL. Its prettified version
// is e.g. `concat(downstream<{foo="bar"} |= "error", shard=1_of_3>)`
func (c *ConcatLogSelectorExpr) Pretty(level int) string {
s := syntax.Indent(level)
if !syntax.NeedSplit(c) {
return s + c.String()
}
s += "concat(\n"
head := c
for i := 0; i < defaultMaxDepth && head != nil; i++ {
if i > 0 {
s += syntax.Indent(level+1) + "++\n"
}
s += head.DownstreamLogSelectorExpr.Pretty(level + 1)
s += "\n"
head = head.next
}
// There are more downstream samples...
if head != nil {
s += syntax.Indent(level+1) + "++ ...\n"
}
s += ")"
return s
}
// QuantileSketchEvalExpr evaluates a quantile sketch to the actual quantile.
type QuantileSketchEvalExpr struct {
syntax.SampleExpr

@ -8,12 +8,14 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
)
var nilShardMetrics = NewShardMapperMetrics(nil)
@ -543,3 +545,142 @@ func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64)
require.InEpsilonSlice(t, e, a, alpha)
}
}
func TestFormat_ShardedExpr(t *testing.T) {
oldMax := syntax.MaxCharsPerLine
syntax.MaxCharsPerLine = 20
oldDefaultDepth := defaultMaxDepth
defaultMaxDepth = 2
defer func() {
syntax.MaxCharsPerLine = oldMax
defaultMaxDepth = oldDefaultDepth
}()
cases := []struct {
name string
in syntax.Expr
exp string
}{
{
name: "ConcatSampleExpr",
in: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 3,
},
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: time.Minute,
},
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 3,
},
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: time.Minute,
},
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 3,
},
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: time.Minute,
},
},
},
next: nil,
},
},
},
exp: `concat(
downstream<
rate(
{foo="bar"} [1m]
),
shard=0_of_3
>
++
downstream<
rate(
{foo="bar"} [1m]
),
shard=1_of_3
>
++ ...
)`,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := syntax.Prettify(c.in)
assert.Equal(t, c.exp, got)
})
}
}
func TestPrettierWithoutShards(t *testing.T) {
q := `((quantile_over_time(0.5,{foo="bar"} | json | unwrap bytes[1d]) by (cluster) > 42) and (count by (cluster)(max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace)) > 10))`
e := syntax.MustParseExpr(q)
mapper := NewShardMapper(ConstantShards(4), nilShardMetrics, []string{})
_, _, mapped, err := mapper.Parse(e)
require.NoError(t, err)
got := syntax.Prettify(mapped)
expected := ` downstream<quantile_over_time(0.5,{foo="bar"} | json | unwrap bytes[1d]) by (cluster), shard=<nil>>
>
42
and
count by (cluster)(
max by (cluster, namespace)(
concat(
downstream<
max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace),
shard=0_of_4
>
++
downstream<
max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace),
shard=1_of_4
>
++
downstream<
max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace),
shard=2_of_4
>
++
downstream<
max_over_time({foo="baz"} |= "error" | json | unwrap bytes[1d]) by (cluster,namespace),
shard=3_of_4
>
)
)
)
>
10`
assert.Equal(t, expected, got)
}

@ -1598,3 +1598,32 @@ func TestStringTrimming(t *testing.T) {
func float64p(v float64) *float64 {
return &v
}
func TestShardTopk(t *testing.T) {
expr := `topk(
10,
sum by (ip) (
sum_over_time({job="foo"} | json | unwrap bytes(bytes)[1m])
)
)`
m := NewShardMapper(ConstantShards(5), nilShardMetrics, []string{ShardQuantileOverTime})
_, _, mappedExpr, err := m.Parse(syntax.MustParseExpr(expr))
require.NoError(t, err)
expected := `topk(
10,
sum by (ip)(
concat(
downstream<sum by (ip)(sum_over_time({job="foo"} | json | unwrap bytes(bytes)[1m])), shard=0_of_5>
++
downstream<sum by (ip)(sum_over_time({job="foo"} | json | unwrap bytes(bytes)[1m])), shard=1_of_5>
++
downstream<sum by (ip)(sum_over_time({job="foo"} | json | unwrap bytes(bytes)[1m])), shard=2_of_5>
++
downstream<sum by (ip)(sum_over_time({job="foo"} | json | unwrap bytes(bytes)[1m])), shard=3_of_5>
++ ...
)
)
)`
require.Equal(t, expected, mappedExpr.Pretty(0))
}

@ -35,8 +35,8 @@ import (
//
var (
// maxCharsPerLine is used to qualify whether some LogQL expressions are worth `splitting` into new lines.
maxCharsPerLine = 100
// MaxCharsPerLine is used to qualify whether some LogQL expressions are worth `splitting` into new lines.
MaxCharsPerLine = 100
)
func Prettify(e Expr) string {
@ -51,8 +51,8 @@ func (e *MatchersExpr) Pretty(level int) string {
// e.g: `{foo="bar"} | logfmt | level="error"`
// Here, left = `{foo="bar"}` and multistages would collection of each stage in pipeline, here `logfmt` and `level="error"`
func (e *PipelineExpr) Pretty(level int) string {
if !needSplit(e) {
return indent(level) + e.String()
if !NeedSplit(e) {
return Indent(level) + e.String()
}
s := fmt.Sprintf("%s\n", e.Left.Pretty(level))
@ -73,8 +73,8 @@ func (e *PipelineExpr) Pretty(level int) string {
// e.g: `|= "error" != "memcache" |= ip("192.168.0.1")`
// NOTE: here `ip` is Op in this expression.
func (e *LineFilterExpr) Pretty(level int) string {
if !needSplit(e) {
return indent(level) + e.String()
if !NeedSplit(e) {
return Indent(level) + e.String()
}
var s string
@ -90,7 +90,7 @@ func (e *LineFilterExpr) Pretty(level int) string {
s += "\n"
}
s += indent(level)
s += Indent(level)
// We re-use LineFilterExpr's String() implementation to avoid duplication.
// We create new LineFilterExpr without `Left`.
@ -153,7 +153,7 @@ func (e *LogfmtExpressionParser) Pretty(level int) string {
// e.g: sum_over_time({foo="bar"} | logfmt | unwrap bytes_processed [5m])
func (e *UnwrapExpr) Pretty(level int) string {
s := indent(level)
s := Indent(level)
if e.Operation != "" {
s += fmt.Sprintf("%s %s %s(%s)", OpPipe, OpUnwrap, e.Operation, e.Identifier)
@ -161,7 +161,7 @@ func (e *UnwrapExpr) Pretty(level int) string {
s += fmt.Sprintf("%s %s %s", OpPipe, OpUnwrap, e.Identifier)
}
for _, f := range e.PostFilters {
s += fmt.Sprintf("\n%s%s %s", indent(level), OpPipe, f)
s += fmt.Sprintf("\n%s%s %s", Indent(level), OpPipe, f)
}
return s
}
@ -200,8 +200,8 @@ func (e *OffsetExpr) Pretty(_ int) string {
// e.g: count_over_time({foo="bar"}[5m])
func (e *RangeAggregationExpr) Pretty(level int) string {
s := indent(level)
if !needSplit(e) {
s := Indent(level)
if !NeedSplit(e) {
return s + e.String()
}
@ -211,13 +211,13 @@ func (e *RangeAggregationExpr) Pretty(level int) string {
// print args to the function.
if e.Params != nil {
s = fmt.Sprintf("%s%s%s,", s, indent(level+1), fmt.Sprint(*e.Params))
s = fmt.Sprintf("%s%s%s,", s, Indent(level+1), fmt.Sprint(*e.Params))
s += "\n"
}
s += e.Left.Pretty(level + 1)
s += "\n" + indent(level) + ")"
s += "\n" + Indent(level) + ")"
if e.Grouping != nil {
s += e.Grouping.Pretty(level)
@ -236,9 +236,9 @@ func (e *RangeAggregationExpr) Pretty(level int) string {
// <vector expression> - vector on which aggregation is done.
// [without|by (<label list)] - optional labels to aggregate either with `by` or `without` clause.
func (e *VectorAggregationExpr) Pretty(level int) string {
s := indent(level)
s := Indent(level)
if !needSplit(e) {
if !NeedSplit(e) {
return s + e.String()
}
@ -249,11 +249,11 @@ func (e *VectorAggregationExpr) Pretty(level int) string {
switch e.Operation {
// e.Params default value (0) can mean a legit param for topk and bottomk
case OpTypeBottomK, OpTypeTopK:
params = []string{fmt.Sprintf("%s%d", indent(level+1), e.Params), left}
params = []string{fmt.Sprintf("%s%d", Indent(level+1), e.Params), left}
default:
if e.Params != 0 {
params = []string{fmt.Sprintf("%s%d", indent(level+1), e.Params), left}
params = []string{fmt.Sprintf("%s%d", Indent(level+1), e.Params), left}
} else {
params = []string{left}
}
@ -274,7 +274,7 @@ func (e *VectorAggregationExpr) Pretty(level int) string {
}
s += "\n"
}
s += indent(level) + ")"
s += Indent(level) + ")"
return s
}
@ -285,15 +285,15 @@ func (e *VectorAggregationExpr) Pretty(level int) string {
// "==", "!=", ">", ">=", "<", "<=" (comparison)
func (e *BinOpExpr) Pretty(level int) string {
s := indent(level)
if !needSplit(e) {
s := Indent(level)
if !NeedSplit(e) {
return s + e.String()
}
s = e.SampleExpr.Pretty(level+1) + "\n"
op := formatBinaryOp(e.Op, e.Opts)
s += indent(level) + op + "\n"
s += Indent(level) + op + "\n"
s += e.RHS.Pretty(level + 1)
return s
@ -306,9 +306,9 @@ func (e *LiteralExpr) Pretty(level int) string {
// e.g: label_replace(rate({job="api-server",service="a:c"}[5m]), "foo", "$1", "service", "(.*):.*")
func (e *LabelReplaceExpr) Pretty(level int) string {
s := indent(level)
s := Indent(level)
if !needSplit(e) {
if !NeedSplit(e) {
return s + e.String()
}
@ -318,10 +318,10 @@ func (e *LabelReplaceExpr) Pretty(level int) string {
params := []string{
e.Left.Pretty(level + 1),
indent(level+1) + strconv.Quote(e.Dst),
indent(level+1) + strconv.Quote(e.Replacement),
indent(level+1) + strconv.Quote(e.Src),
indent(level+1) + strconv.Quote(e.Regex),
Indent(level+1) + strconv.Quote(e.Dst),
Indent(level+1) + strconv.Quote(e.Replacement),
Indent(level+1) + strconv.Quote(e.Src),
Indent(level+1) + strconv.Quote(e.Regex),
}
for i, v := range params {
@ -333,7 +333,7 @@ func (e *LabelReplaceExpr) Pretty(level int) string {
s += "\n"
}
s += indent(level) + ")"
s += Indent(level) + ")"
return s
}
@ -369,19 +369,19 @@ func (g *Grouping) Pretty(_ int) string {
// Helpers
func commonPrefixIndent(level int, current Expr) string {
return fmt.Sprintf("%s%s", indent(level), current.String())
return fmt.Sprintf("%s%s", Indent(level), current.String())
}
func needSplit(e Expr) bool {
func NeedSplit(e Expr) bool {
if e == nil {
return false
}
return len(e.String()) > maxCharsPerLine
return len(e.String()) > MaxCharsPerLine
}
const indentString = " "
func indent(level int) string {
func Indent(level int) string {
return strings.Repeat(indentString, level)
}

@ -8,7 +8,7 @@ import (
)
func TestFormat(t *testing.T) {
maxCharsPerLine = 20
MaxCharsPerLine = 20
cases := []struct {
name string
@ -108,7 +108,7 @@ func TestFormat(t *testing.T) {
}
func TestFormat_VectorAggregation(t *testing.T) {
maxCharsPerLine = 20
MaxCharsPerLine = 20
cases := []struct {
name string
@ -147,7 +147,7 @@ func TestFormat_VectorAggregation(t *testing.T) {
}
func TestFormat_LabelReplace(t *testing.T) {
maxCharsPerLine = 20
MaxCharsPerLine = 20
cases := []struct {
name string
@ -201,7 +201,7 @@ func TestFormat_LabelReplace(t *testing.T) {
}
func TestFormat_BinOp(t *testing.T) {
maxCharsPerLine = 20
MaxCharsPerLine = 20
cases := []struct {
name string

Loading…
Cancel
Save