From 2e14c45ceb71a8285101341462190f6c7b94f9da Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 21 Oct 2020 05:44:45 -0400 Subject: [PATCH] Logqlv2 pushes groups down to edge (#2786) Pushes down grouping into range aggregation to reduce labels at edges. This works only for sums. Signed-off-by: Cyril Tovena --- pkg/chunkenc/memchunk.go | 2 + pkg/chunkenc/memchunk_test.go | 5 +- pkg/logproto/extensions.go | 4 + pkg/logql/ast.go | 6 +- pkg/logql/engine_test.go | 31 +- pkg/logql/evaluator.go | 41 +- pkg/logql/functions.go | 34 +- pkg/logql/log/labels.go | 10 + pkg/logql/log/metrics_extraction.go | 56 +- pkg/logql/log/metrics_extraction_test.go | 18 +- pkg/logql/parser_test.go | 1802 +++++++++++----------- pkg/logql/sharding.go | 2 +- pkg/logql/sharding_test.go | 3 + pkg/logql/test_utils.go | 4 + pkg/storage/batch_test.go | 3 +- 15 files changed, 1072 insertions(+), 949 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 32f4273246..5e47eb3a18 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -9,6 +9,7 @@ import ( "hash" "hash/crc32" "io" + "sort" "time" "github.com/cespare/xxhash/v2" @@ -646,6 +647,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs l } seriesRes := make([]logproto.Series, 0, len(series)) for _, s := range series { + sort.Sort(s) seriesRes = append(seriesRes, *s) } return iter.NewMultiSeriesIterator(ctx, seriesRes) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index f770a39789..3b09564e65 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logql/stats" ) @@ -128,7 +129,7 @@ func TestBlock(t *testing.T) { require.NoError(t, it.Close()) require.Equal(t, len(cases), idx) - sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, logql.ExtractCount) + sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false)) idx = 0 for sampleIt.Next() { s := sampleIt.Sample() @@ -276,7 +277,7 @@ func TestSerialization(t *testing.T) { } require.NoError(t, it.Error()) - sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, logql.ExtractCount) + sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false)) for i := 0; i < numSamples; i++ { require.True(t, sampleIt.Next(), i) diff --git a/pkg/logproto/extensions.go b/pkg/logproto/extensions.go index fe328c60a6..3a68d26148 100644 --- a/pkg/logproto/extensions.go +++ b/pkg/logproto/extensions.go @@ -17,3 +17,7 @@ type Streams []Stream func (xs Streams) Len() int { return len(xs) } func (xs Streams) Swap(i, j int) { xs[i], xs[j] = xs[j], xs[i] } func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels } + +func (s Series) Len() int { return len(s.Samples) } +func (s Series) Swap(i, j int) { s.Samples[i], s.Samples[j] = s.Samples[j], s.Samples[i] } +func (s Series) Less(i, j int) bool { return s.Samples[i].Timestamp < s.Samples[j].Timestamp } diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 0dba2bf487..031c51e3ad 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -86,7 +86,6 @@ type SampleExtractor = log.SampleExtractor var ( NoopPipeline = log.NoopPipeline - ExtractCount = log.CountExtractor.ToSampleExtractor() ) // PipelineExpr is an expression defining a log pipeline. @@ -716,6 +715,11 @@ func (e *vectorAggregationExpr) Selector() LogSelectorExpr { } func (e *vectorAggregationExpr) Extractor() (log.SampleExtractor, error) { + // inject in the range vector extractor the outer groups to improve performance. + // This is only possible if the operation is a sum. Anything else needs all labels. + if r, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum { + return r.extractor(e.grouping, true) + } return e.left.Extractor() } diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 4741734d0d..5acd40eb76 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -7,7 +7,6 @@ import ( "math" "strings" - // "math" "testing" "time" @@ -163,7 +162,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) { {newSeries(testSize, factor(5, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)}, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Vector{ promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.4}, Metric: labels.Labels{}}, @@ -175,7 +174,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) { {newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(10, identity), `{app="bar"}`)}, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Vector{ promql.Sample{Point: promql.Point{T: 60 * 1000, V: 6}, Metric: labels.Labels{labels.Label{Name: "app", Value: "bar"}}}, @@ -191,7 +190,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) { }, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (namespace,app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m])) `}}, }, promql.Vector{ promql.Sample{ @@ -432,8 +431,8 @@ func TestEngine_LogsInstantQuery(t *testing.T) { {}, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}}, - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="bar"}[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without (app) (count_over_time({app="foo"}[1m]))`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without (app) (count_over_time({app="bar"}[1m]))`}}, }, promql.Vector{ promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0}, Metric: labels.Labels{}}, @@ -449,8 +448,8 @@ func TestEngine_LogsInstantQuery(t *testing.T) { {newSeries(testSize, identity, `{app="bar"}`)}, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}}, - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="bar"}[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without(app) (count_over_time({app="foo"}[1m]))`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without(app) (count_over_time({app="bar"}[1m]))`}}, }, promql.Vector{ promql.Sample{Point: promql.Point{T: 60 * 1000, V: 60}, Metric: labels.Labels{}}, @@ -688,7 +687,7 @@ func TestEngine_RangeQuery(t *testing.T) { {newSeries(testSize, factor(5, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)}, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Matrix{ promql.Series{ @@ -703,7 +702,7 @@ func TestEngine_RangeQuery(t *testing.T) { {newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)}, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Matrix{ promql.Series{ @@ -727,7 +726,7 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (namespace,cluster, app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Matrix{ promql.Series{ @@ -759,7 +758,7 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (cluster, namespace, app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Matrix{ promql.Series{ @@ -791,7 +790,7 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (namespace, app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Matrix{ promql.Series{ @@ -1224,7 +1223,7 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Matrix{ promql.Series{ @@ -1252,7 +1251,7 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Matrix{ promql.Series{ @@ -1280,7 +1279,7 @@ func TestEngine_RangeQuery(t *testing.T) { }, }, []SelectSampleParams{ - {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}}, + {&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}}, }, promql.Matrix{ promql.Series{ diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 7a31da0266..6a97422415 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -103,9 +103,23 @@ func GetRangeType(q Params) QueryRangeType { // Evaluator is an interface for iterating over data at different nodes in the AST type Evaluator interface { + SampleEvaluator + EntryEvaluator +} + +type SampleEvaluator interface { // StepEvaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another StepEvaluator// in order to enable arbitrary computation of embedded expressions. This allows more modular & extensible // StepEvaluator implementations which can be composed. - StepEvaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error) + StepEvaluator(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) +} + +type SampleEvaluatorFunc func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) + +func (s SampleEvaluatorFunc) StepEvaluator(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) { + return s(ctx, nextEvaluator, expr, p) +} + +type EntryEvaluator interface { // Iterator returns the iter.EntryIterator for a given LogSelectorExpr Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error) } @@ -151,12 +165,31 @@ func (ev *DefaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr, func (ev *DefaultEvaluator) StepEvaluator( ctx context.Context, - nextEv Evaluator, + nextEv SampleEvaluator, expr SampleExpr, q Params, ) (StepEvaluator, error) { switch e := expr.(type) { case *vectorAggregationExpr: + if rangExpr, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum { + // if range expression is wrapped with a vector expression + // we should send the vector expression for allowing reducing labels at the source. + nextEv = SampleEvaluatorFunc(func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) { + it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{ + &logproto.SampleQueryRequest{ + Start: q.Start().Add(-rangExpr.left.interval), + End: q.End(), + Selector: e.String(), // intentionally send the the vector for reducing labels. + Shards: q.Shards(), + }, + }) + if err != nil { + return nil, err + } + return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), rangExpr, q) + }) + + } return vectorAggEvaluator(ctx, nextEv, e, q) case *rangeAggregationExpr: it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{ @@ -180,7 +213,7 @@ func (ev *DefaultEvaluator) StepEvaluator( func vectorAggEvaluator( ctx context.Context, - ev Evaluator, + ev SampleEvaluator, expr *vectorAggregationExpr, q Params, ) (StepEvaluator, error) { @@ -458,7 +491,7 @@ func (r rangeVectorEvaluator) Error() error { // it makes the type system simpler and these are reduced in mustNewBinOpExpr func binOpStepEvaluator( ctx context.Context, - ev Evaluator, + ev SampleEvaluator, expr *binOpExpr, q Params, ) (StepEvaluator, error) { diff --git a/pkg/logql/functions.go b/pkg/logql/functions.go index 8a797f0e05..9cf56d4dd9 100644 --- a/pkg/logql/functions.go +++ b/pkg/logql/functions.go @@ -13,9 +13,30 @@ import ( const unsupportedErr = "unsupported range vector aggregation operation: %s" func (r rangeAggregationExpr) Extractor() (log.SampleExtractor, error) { + return r.extractor(nil, false) +} + +func (r rangeAggregationExpr) extractor(gr *grouping, all bool) (log.SampleExtractor, error) { if err := r.validate(); err != nil { return nil, err } + var groups []string + var without bool + + // fallback to parents grouping + if gr != nil { + groups = gr.groups + without = gr.without + } + + // range aggregation grouping takes priority + if r.grouping != nil { + groups = r.grouping.groups + without = r.grouping.without + } + + sort.Strings(groups) + var stages []log.Stage if p, ok := r.left.left.(*pipelineExpr); ok { // if the expression is a pipeline then take all stages into account first. @@ -28,30 +49,25 @@ func (r rangeAggregationExpr) Extractor() (log.SampleExtractor, error) { // unwrap...means we want to extract metrics from labels. if r.left.unwrap != nil { var convOp string - var groups []string - var without bool switch r.left.unwrap.operation { case OpConvDuration, OpConvDurationSeconds: convOp = log.ConvertDuration default: convOp = log.ConvertFloat } - if r.grouping != nil { - groups = r.grouping.groups - without = r.grouping.without - } + return log.LabelExtractorWithStages( r.left.unwrap.identifier, - convOp, groups, without, stages, + convOp, groups, without, all, stages, log.ReduceAndLabelFilter(r.left.unwrap.postFilters), ) } // otherwise we extract metrics from the log line. switch r.operation { case OpRangeTypeRate, OpRangeTypeCount: - return log.LineExtractorWithStages(log.CountExtractor, stages) + return log.LineExtractorWithStages(log.CountExtractor, stages, groups, without, all) case OpRangeTypeBytes, OpRangeTypeBytesRate: - return log.LineExtractorWithStages(log.BytesExtractor, stages) + return log.LineExtractorWithStages(log.BytesExtractor, stages, groups, without, all) default: return nil, fmt.Errorf(unsupportedErr, r.operation) } diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index dcf21fd5b4..327c202c6d 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -135,3 +135,13 @@ Outer: return res } + +func (b *LabelsBuilder) WithoutLabels(names ...string) labels.Labels { + // naive implementation for now. + return b.Labels().WithoutLabels(names...) +} + +func (b *LabelsBuilder) WithLabels(names ...string) labels.Labels { + // naive implementation for now. + return b.Labels().WithLabels(names...) +} diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 48012c8e9e..7c0791c8c4 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -1,6 +1,7 @@ package log import ( + "sort" "strconv" "time" @@ -29,8 +30,14 @@ type LineExtractor func([]byte) float64 // ToSampleExtractor transform a LineExtractor into a SampleExtractor. // Useful for metric conversion without log Pipeline. -func (l LineExtractor) ToSampleExtractor() SampleExtractor { +func (l LineExtractor) ToSampleExtractor(groups []string, without bool, noLabels bool) SampleExtractor { return SampleExtractorFunc(func(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { + // todo(cyriltovena) grouping should be done once per stream/chunk not for everyline. + // so for now we'll cover just vector without grouping. This requires changes to SampleExtractor interface. + // For another day ! + if len(groups) == 0 && noLabels { + return l(line), labels.Labels{}, true + } return l(line), lbs, true }) } @@ -44,7 +51,10 @@ type lineSampleExtractor struct { Stage LineExtractor - builder *LabelsBuilder + groups []string + without bool + noLabels bool + builder *LabelsBuilder } func (l lineSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { @@ -53,16 +63,33 @@ func (l lineSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, l if !ok { return 0, nil, false } + if len(l.groups) != 0 { + if l.without { + return l.LineExtractor(line), l.builder.WithoutLabels(l.groups...), true + } + return l.LineExtractor(line), l.builder.WithLabels(l.groups...), true + } + if l.noLabels { + // no grouping but it was a vector operation so we return a single vector + return l.LineExtractor(line), labels.Labels{}, true + } return l.LineExtractor(line), l.builder.Labels(), true } // LineExtractorWithStages creates a SampleExtractor from a LineExtractor. // Multiple log stages are run before converting the log line. -func LineExtractorWithStages(ex LineExtractor, stages []Stage) (SampleExtractor, error) { +func LineExtractorWithStages(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) { if len(stages) == 0 { - return ex.ToSampleExtractor(), nil + return ex.ToSampleExtractor(groups, without, noLabels), nil } - return lineSampleExtractor{Stage: ReduceStages(stages), LineExtractor: ex, builder: NewLabelsBuilder()}, nil + return lineSampleExtractor{ + Stage: ReduceStages(stages), + LineExtractor: ex, + builder: NewLabelsBuilder(), + groups: groups, + without: without, + noLabels: noLabels, + }, nil } type convertionFn func(value string) (float64, error) @@ -76,6 +103,7 @@ type labelSampleExtractor struct { conversionFn convertionFn groups []string without bool + noLabels bool } // LabelExtractorWithStages creates a SampleExtractor that will extract metrics from a labels. @@ -83,7 +111,7 @@ type labelSampleExtractor struct { // to remove sample containing the __error__ label. func LabelExtractorWithStages( labelName, conversion string, - groups []string, without bool, + groups []string, without bool, noLabels bool, preStages []Stage, postFilter Stage, ) (SampleExtractor, error) { @@ -96,6 +124,10 @@ func LabelExtractorWithStages( default: return nil, errors.Errorf("unsupported conversion operation %s", conversion) } + if len(groups) != 0 && without { + groups = append(groups, labelName) + sort.Strings(groups) + } return &labelSampleExtractor{ preStage: ReduceStages(preStages), conversionFn: convFn, @@ -104,6 +136,7 @@ func LabelExtractorWithStages( postFilter: postFilter, without: without, builder: NewLabelsBuilder(), + noLabels: noLabels, }, nil } @@ -135,16 +168,19 @@ func (l *labelSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, // We need to return now before applying grouping otherwise the error might get lost. return v, l.builder.Labels(), true } - return v, l.groupLabels(l.builder.Labels()), true + return v, l.groupLabels(l.builder), true } -func (l *labelSampleExtractor) groupLabels(lbs labels.Labels) labels.Labels { - if l.groups != nil { +func (l *labelSampleExtractor) groupLabels(lbs *LabelsBuilder) labels.Labels { + if len(l.groups) != 0 { if l.without { - return lbs.WithoutLabels(append(l.groups, l.labelName)...) + return lbs.WithoutLabels(l.groups...) } return lbs.WithLabels(l.groups...) } + if l.noLabels { + return labels.Labels{} + } return lbs.WithoutLabels(l.labelName) } diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index 16c81a5c6a..c6133c6bb8 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -20,17 +20,27 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { { "convert float", mustSampleExtractor(LabelExtractorWithStages( - "foo", ConvertFloat, nil, false, nil, NoopStage, + "foo", ConvertFloat, nil, false, false, nil, NoopStage, )), labels.Labels{labels.Label{Name: "foo", Value: "15.0"}}, 15, labels.Labels{}, true, }, + { + "convert float as vector with no grouping", + mustSampleExtractor(LabelExtractorWithStages( + "foo", ConvertFloat, nil, false, true, nil, NoopStage, + )), + labels.Labels{labels.Label{Name: "foo", Value: "15.0"}, labels.Label{Name: "bar", Value: "buzz"}}, + 15, + labels.Labels{}, + true, + }, { "convert float without", mustSampleExtractor(LabelExtractorWithStages( - "foo", ConvertFloat, []string{"bar", "buzz"}, true, nil, NoopStage, + "foo", ConvertFloat, []string{"bar", "buzz"}, true, false, nil, NoopStage, )), labels.Labels{ {Name: "foo", Value: "10"}, @@ -47,7 +57,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { { "convert float with", mustSampleExtractor(LabelExtractorWithStages( - "foo", ConvertFloat, []string{"bar", "buzz"}, false, nil, NoopStage, + "foo", ConvertFloat, []string{"bar", "buzz"}, false, false, nil, NoopStage, )), labels.Labels{ {Name: "foo", Value: "0.6"}, @@ -65,7 +75,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { { "convert duration with", mustSampleExtractor(LabelExtractorWithStages( - "foo", ConvertDuration, []string{"bar", "buzz"}, false, nil, NoopStage, + "foo", ConvertDuration, []string{"bar", "buzz"}, false, false, nil, NoopStage, )), labels.Labels{ {Name: "foo", Value: "500ms"}, diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index 2883a0648a..7ee139bcb5 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -22,909 +22,909 @@ func TestParse(t *testing.T) { exp Expr err error }{ - // { - // // raw string - // in: "count_over_time({foo=~`bar\\w+`}[12h] |~ `error\\`)", - // exp: &rangeAggregationExpr{ - // operation: "count_over_time", - // left: &logRange{ - // left: &pipelineExpr{ - // pipeline: MultiStageExpr{ - // newLineFilterExpr(nil, labels.MatchRegexp, "error\\"), - // }, - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchRegexp, "foo", "bar\\w+"), - // }, - // }, - // }, - // interval: 12 * time.Hour, - // }, - // }, - // }, - // { - // // test [12h] before filter expr - // in: `count_over_time({foo="bar"}[12h] |= "error")`, - // exp: &rangeAggregationExpr{ - // operation: "count_over_time", - // left: &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "foo", Value: "bar"}}), - // MultiStageExpr{ - // newLineFilterExpr(nil, labels.MatchEqual, "error"), - // }, - // ), - // interval: 12 * time.Hour, - // }, - // }, - // }, - // { - // // test [12h] after filter expr - // in: `count_over_time({foo="bar"} |= "error" [12h])`, - // exp: &rangeAggregationExpr{ - // operation: "count_over_time", - // left: &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "foo", Value: "bar"}}), - // MultiStageExpr{newLineFilterExpr(nil, labels.MatchEqual, "error")}, - // ), - // interval: 12 * time.Hour, - // }, - // }, - // }, - // { - // in: `{foo="bar"}`, - // exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, - // }, - // { - // in: `{ foo = "bar" }`, - // exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, - // }, - // { - // in: `{ foo != "bar" }`, - // exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotEqual, "foo", "bar")}}, - // }, - // { - // in: `{ foo =~ "bar" }`, - // exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "foo", "bar")}}, - // }, - // { - // in: `{ foo !~ "bar" }`, - // exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // }, - // { - // in: `count_over_time({ foo !~ "bar" }[12m])`, - // exp: &rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 12 * time.Minute, - // }, - // operation: "count_over_time", - // }, - // }, - // { - // in: `bytes_over_time({ foo !~ "bar" }[12m])`, - // exp: &rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 12 * time.Minute, - // }, - // operation: OpRangeTypeBytes, - // }, - // }, - // { - // in: `bytes_rate({ foo !~ "bar" }[12m])`, - // exp: &rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 12 * time.Minute, - // }, - // operation: OpRangeTypeBytesRate, - // }, - // }, - // { - // in: `rate({ foo !~ "bar" }[5h])`, - // exp: &rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 5 * time.Hour, - // }, - // operation: "rate", - // }, - // }, - // { - // in: `rate({ foo !~ "bar" }[5d])`, - // exp: &rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 5 * 24 * time.Hour, - // }, - // operation: "rate", - // }, - // }, - // { - // in: `count_over_time({ foo !~ "bar" }[1w])`, - // exp: &rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 7 * 24 * time.Hour, - // }, - // operation: "count_over_time", - // }, - // }, - // { - // in: `sum(rate({ foo !~ "bar" }[5h]))`, - // exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 5 * time.Hour, - // }, - // operation: "rate", - // }, "sum", nil, nil), - // }, - // { - // in: `sum(rate({ foo !~ "bar" }[1y]))`, - // exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 365 * 24 * time.Hour, - // }, - // operation: "rate", - // }, "sum", nil, nil), - // }, - // { - // in: `avg(count_over_time({ foo !~ "bar" }[5h])) by (bar,foo)`, - // exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 5 * time.Hour, - // }, - // operation: "count_over_time", - // }, "avg", &grouping{ - // without: false, - // groups: []string{"bar", "foo"}, - // }, nil), - // }, - // { - // in: `max without (bar) (count_over_time({ foo !~ "bar" }[5h]))`, - // exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 5 * time.Hour, - // }, - // operation: "count_over_time", - // }, "max", &grouping{ - // without: true, - // groups: []string{"bar"}, - // }, nil), - // }, - // { - // in: `topk(10,count_over_time({ foo !~ "bar" }[5h])) without (bar)`, - // exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 5 * time.Hour, - // }, - // operation: "count_over_time", - // }, "topk", &grouping{ - // without: true, - // groups: []string{"bar"}, - // }, NewStringLabelFilter("10")), - // }, - // { - // in: `bottomk(30 ,sum(rate({ foo !~ "bar" }[5h])) by (foo))`, - // exp: mustNewVectorAggregationExpr(mustNewVectorAggregationExpr(&rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 5 * time.Hour, - // }, - // operation: "rate", - // }, "sum", &grouping{ - // groups: []string{"foo"}, - // without: false, - // }, nil), "bottomk", nil, - // NewStringLabelFilter("30")), - // }, - // { - // in: `max( sum(count_over_time({ foo !~ "bar" }[5h])) without (foo,bar) ) by (foo)`, - // exp: mustNewVectorAggregationExpr(mustNewVectorAggregationExpr(&rangeAggregationExpr{ - // left: &logRange{ - // left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, - // interval: 5 * time.Hour, - // }, - // operation: "count_over_time", - // }, "sum", &grouping{ - // groups: []string{"foo", "bar"}, - // without: true, - // }, nil), "max", &grouping{ - // groups: []string{"foo"}, - // without: false, - // }, nil), - // }, - // { - // in: `unk({ foo !~ "bar" }[5m])`, - // err: ParseError{ - // msg: "syntax error: unexpected IDENTIFIER", - // line: 1, - // col: 1, - // }, - // }, - // { - // in: `rate({ foo !~ "bar" }[5minutes])`, - // err: ParseError{ - // msg: `not a valid duration string: "5minutes"`, - // line: 0, - // col: 22, - // }, - // }, - // { - // in: `rate({ foo !~ "bar" }[5)`, - // err: ParseError{ - // msg: "missing closing ']' in duration", - // line: 0, - // col: 22, - // }, - // }, - // { - // in: `min({ foo !~ "bar" }[5m])`, - // err: ParseError{ - // msg: "syntax error: unexpected RANGE", - // line: 0, - // col: 21, - // }, - // }, - // { - // in: `sum(3 ,count_over_time({ foo !~ "bar" }[5h]))`, - // err: ParseError{ - // msg: "unsupported parameter for operation sum(3,", - // line: 0, - // col: 0, - // }, - // }, - // { - // in: `topk(count_over_time({ foo !~ "bar" }[5h]))`, - // err: ParseError{ - // msg: "parameter required for operation topk", - // line: 0, - // col: 0, - // }, - // }, - // { - // in: `bottomk(he,count_over_time({ foo !~ "bar" }[5h]))`, - // err: ParseError{ - // msg: "syntax error: unexpected IDENTIFIER", - // line: 1, - // col: 9, - // }, - // }, - // { - // in: `bottomk(1.2,count_over_time({ foo !~ "bar" }[5h]))`, - // err: ParseError{ - // msg: "invalid parameter bottomk(1.2,", - // line: 0, - // col: 0, - // }, - // }, - // { - // in: `stddev({ foo !~ "bar" })`, - // err: ParseError{ - // msg: "syntax error: unexpected )", - // line: 1, - // col: 24, - // }, - // }, - // { - // in: `{ foo = "bar", bar != "baz" }`, - // exp: &matchersExpr{matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // mustNewMatcher(labels.MatchNotEqual, "bar", "baz"), - // }}, - // }, - // { - // in: `{foo="bar"} |= "baz"`, - // exp: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{newLineFilterExpr(nil, labels.MatchEqual, "baz")}, - // ), - // }, - // { - // in: `{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`, - // exp: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // }, - // { - // in: `count_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])`, - // exp: newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // }, - // { - // in: `bytes_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])`, - // exp: newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeBytes, nil, nil), - // }, - // { - // in: `sum(count_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (foo)`, - // exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil), - // }, - // { - // in: `sum(bytes_rate(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (foo)`, - // exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeBytesRate, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil), - // }, - // { - // in: `topk(5,count_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) without (foo)`, - // exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "topk", - // &grouping{ - // without: true, - // groups: []string{"foo"}, - // }, - // NewStringLabelFilter("5")), - // }, - // { - // in: `topk(5,sum(rate(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (app))`, - // exp: mustNewVectorAggregationExpr( - // mustNewVectorAggregationExpr( - // newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeRate, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"app"}, - // }, - // nil), - // "topk", - // nil, - // NewStringLabelFilter("5")), - // }, - // { - // in: `count_over_time({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")`, - // exp: newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // }, - // { - // in: `sum(count_over_time({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")) by (foo)`, - // exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil), - // }, - // { - // in: `topk(5,count_over_time({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")) without (foo)`, - // exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "topk", - // &grouping{ - // without: true, - // groups: []string{"foo"}, - // }, - // NewStringLabelFilter("5")), - // }, - // { - // in: `topk(5,sum(rate({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")) by (app))`, - // exp: mustNewVectorAggregationExpr( - // mustNewVectorAggregationExpr( - // newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), - // MultiStageExpr{ - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr( - // newLineFilterExpr(nil, labels.MatchEqual, "baz"), - // labels.MatchRegexp, "blip"), - // labels.MatchNotEqual, "flip"), - // labels.MatchNotRegexp, "flap"), - // }, - // ), - // interval: 5 * time.Minute, - // }, OpRangeTypeRate, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"app"}, - // }, - // nil), - // "topk", - // nil, - // NewStringLabelFilter("5")), - // }, - // { - // in: `{foo="bar}`, - // err: ParseError{ - // msg: "literal not terminated", - // line: 1, - // col: 6, - // }, - // }, - // { - // in: `{foo="bar"`, - // err: ParseError{ - // msg: "syntax error: unexpected $end, expecting } or ,", - // line: 1, - // col: 11, - // }, - // }, + { + // raw string + in: "count_over_time({foo=~`bar\\w+`}[12h] |~ `error\\`)", + exp: &rangeAggregationExpr{ + operation: "count_over_time", + left: &logRange{ + left: &pipelineExpr{ + pipeline: MultiStageExpr{ + newLineFilterExpr(nil, labels.MatchRegexp, "error\\"), + }, + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchRegexp, "foo", "bar\\w+"), + }, + }, + }, + interval: 12 * time.Hour, + }, + }, + }, + { + // test [12h] before filter expr + in: `count_over_time({foo="bar"}[12h] |= "error")`, + exp: &rangeAggregationExpr{ + operation: "count_over_time", + left: &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "foo", Value: "bar"}}), + MultiStageExpr{ + newLineFilterExpr(nil, labels.MatchEqual, "error"), + }, + ), + interval: 12 * time.Hour, + }, + }, + }, + { + // test [12h] after filter expr + in: `count_over_time({foo="bar"} |= "error" [12h])`, + exp: &rangeAggregationExpr{ + operation: "count_over_time", + left: &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "foo", Value: "bar"}}), + MultiStageExpr{newLineFilterExpr(nil, labels.MatchEqual, "error")}, + ), + interval: 12 * time.Hour, + }, + }, + }, + { + in: `{foo="bar"}`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + }, + { + in: `{ foo = "bar" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + }, + { + in: `{ foo != "bar" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotEqual, "foo", "bar")}}, + }, + { + in: `{ foo =~ "bar" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchRegexp, "foo", "bar")}}, + }, + { + in: `{ foo !~ "bar" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + }, + { + in: `count_over_time({ foo !~ "bar" }[12m])`, + exp: &rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 12 * time.Minute, + }, + operation: "count_over_time", + }, + }, + { + in: `bytes_over_time({ foo !~ "bar" }[12m])`, + exp: &rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 12 * time.Minute, + }, + operation: OpRangeTypeBytes, + }, + }, + { + in: `bytes_rate({ foo !~ "bar" }[12m])`, + exp: &rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 12 * time.Minute, + }, + operation: OpRangeTypeBytesRate, + }, + }, + { + in: `rate({ foo !~ "bar" }[5h])`, + exp: &rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 5 * time.Hour, + }, + operation: "rate", + }, + }, + { + in: `rate({ foo !~ "bar" }[5d])`, + exp: &rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 5 * 24 * time.Hour, + }, + operation: "rate", + }, + }, + { + in: `count_over_time({ foo !~ "bar" }[1w])`, + exp: &rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 7 * 24 * time.Hour, + }, + operation: "count_over_time", + }, + }, + { + in: `sum(rate({ foo !~ "bar" }[5h]))`, + exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 5 * time.Hour, + }, + operation: "rate", + }, "sum", nil, nil), + }, + { + in: `sum(rate({ foo !~ "bar" }[1y]))`, + exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 365 * 24 * time.Hour, + }, + operation: "rate", + }, "sum", nil, nil), + }, + { + in: `avg(count_over_time({ foo !~ "bar" }[5h])) by (bar,foo)`, + exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 5 * time.Hour, + }, + operation: "count_over_time", + }, "avg", &grouping{ + without: false, + groups: []string{"bar", "foo"}, + }, nil), + }, + { + in: `max without (bar) (count_over_time({ foo !~ "bar" }[5h]))`, + exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 5 * time.Hour, + }, + operation: "count_over_time", + }, "max", &grouping{ + without: true, + groups: []string{"bar"}, + }, nil), + }, + { + in: `topk(10,count_over_time({ foo !~ "bar" }[5h])) without (bar)`, + exp: mustNewVectorAggregationExpr(&rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 5 * time.Hour, + }, + operation: "count_over_time", + }, "topk", &grouping{ + without: true, + groups: []string{"bar"}, + }, NewStringLabelFilter("10")), + }, + { + in: `bottomk(30 ,sum(rate({ foo !~ "bar" }[5h])) by (foo))`, + exp: mustNewVectorAggregationExpr(mustNewVectorAggregationExpr(&rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 5 * time.Hour, + }, + operation: "rate", + }, "sum", &grouping{ + groups: []string{"foo"}, + without: false, + }, nil), "bottomk", nil, + NewStringLabelFilter("30")), + }, + { + in: `max( sum(count_over_time({ foo !~ "bar" }[5h])) without (foo,bar) ) by (foo)`, + exp: mustNewVectorAggregationExpr(mustNewVectorAggregationExpr(&rangeAggregationExpr{ + left: &logRange{ + left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}}, + interval: 5 * time.Hour, + }, + operation: "count_over_time", + }, "sum", &grouping{ + groups: []string{"foo", "bar"}, + without: true, + }, nil), "max", &grouping{ + groups: []string{"foo"}, + without: false, + }, nil), + }, + { + in: `unk({ foo !~ "bar" }[5m])`, + err: ParseError{ + msg: "syntax error: unexpected IDENTIFIER", + line: 1, + col: 1, + }, + }, + { + in: `rate({ foo !~ "bar" }[5minutes])`, + err: ParseError{ + msg: `not a valid duration string: "5minutes"`, + line: 0, + col: 22, + }, + }, + { + in: `rate({ foo !~ "bar" }[5)`, + err: ParseError{ + msg: "missing closing ']' in duration", + line: 0, + col: 22, + }, + }, + { + in: `min({ foo !~ "bar" }[5m])`, + err: ParseError{ + msg: "syntax error: unexpected RANGE", + line: 0, + col: 21, + }, + }, + { + in: `sum(3 ,count_over_time({ foo !~ "bar" }[5h]))`, + err: ParseError{ + msg: "unsupported parameter for operation sum(3,", + line: 0, + col: 0, + }, + }, + { + in: `topk(count_over_time({ foo !~ "bar" }[5h]))`, + err: ParseError{ + msg: "parameter required for operation topk", + line: 0, + col: 0, + }, + }, + { + in: `bottomk(he,count_over_time({ foo !~ "bar" }[5h]))`, + err: ParseError{ + msg: "syntax error: unexpected IDENTIFIER", + line: 1, + col: 9, + }, + }, + { + in: `bottomk(1.2,count_over_time({ foo !~ "bar" }[5h]))`, + err: ParseError{ + msg: "invalid parameter bottomk(1.2,", + line: 0, + col: 0, + }, + }, + { + in: `stddev({ foo !~ "bar" })`, + err: ParseError{ + msg: "syntax error: unexpected )", + line: 1, + col: 24, + }, + }, + { + in: `{ foo = "bar", bar != "baz" }`, + exp: &matchersExpr{matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + mustNewMatcher(labels.MatchNotEqual, "bar", "baz"), + }}, + }, + { + in: `{foo="bar"} |= "baz"`, + exp: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{newLineFilterExpr(nil, labels.MatchEqual, "baz")}, + ), + }, + { + in: `{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`, + exp: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + }, + { + in: `count_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])`, + exp: newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + }, + { + in: `bytes_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])`, + exp: newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeBytes, nil, nil), + }, + { + in: `sum(count_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (foo)`, + exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil), + }, + { + in: `sum(bytes_rate(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (foo)`, + exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeBytesRate, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil), + }, + { + in: `topk(5,count_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) without (foo)`, + exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "topk", + &grouping{ + without: true, + groups: []string{"foo"}, + }, + NewStringLabelFilter("5")), + }, + { + in: `topk(5,sum(rate(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (app))`, + exp: mustNewVectorAggregationExpr( + mustNewVectorAggregationExpr( + newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeRate, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"app"}, + }, + nil), + "topk", + nil, + NewStringLabelFilter("5")), + }, + { + in: `count_over_time({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")`, + exp: newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + }, + { + in: `sum(count_over_time({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")) by (foo)`, + exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil), + }, + { + in: `topk(5,count_over_time({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")) without (foo)`, + exp: mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "topk", + &grouping{ + without: true, + groups: []string{"foo"}, + }, + NewStringLabelFilter("5")), + }, + { + in: `topk(5,sum(rate({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")) by (app))`, + exp: mustNewVectorAggregationExpr( + mustNewVectorAggregationExpr( + newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}), + MultiStageExpr{ + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr( + newLineFilterExpr(nil, labels.MatchEqual, "baz"), + labels.MatchRegexp, "blip"), + labels.MatchNotEqual, "flip"), + labels.MatchNotRegexp, "flap"), + }, + ), + interval: 5 * time.Minute, + }, OpRangeTypeRate, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"app"}, + }, + nil), + "topk", + nil, + NewStringLabelFilter("5")), + }, + { + in: `{foo="bar}`, + err: ParseError{ + msg: "literal not terminated", + line: 1, + col: 6, + }, + }, + { + in: `{foo="bar"`, + err: ParseError{ + msg: "syntax error: unexpected $end, expecting } or ,", + line: 1, + col: 11, + }, + }, - // { - // in: `{foo="bar"} |~`, - // err: ParseError{ - // msg: "syntax error: unexpected $end, expecting STRING", - // line: 1, - // col: 15, - // }, - // }, + { + in: `{foo="bar"} |~`, + err: ParseError{ + msg: "syntax error: unexpected $end, expecting STRING", + line: 1, + col: 15, + }, + }, - // { - // in: `{foo="bar"} "foo"`, - // err: ParseError{ - // msg: "syntax error: unexpected STRING", - // line: 1, - // col: 13, - // }, - // }, - // { - // in: `{foo="bar"} foo`, - // err: ParseError{ - // msg: "syntax error: unexpected IDENTIFIER", - // line: 1, - // col: 13, - // }, - // }, - // { - // // require left associativity - // in: ` - // sum(count_over_time({foo="bar"}[5m])) by (foo) / - // sum(count_over_time({foo="bar"}[5m])) by (foo) / - // sum(count_over_time({foo="bar"}[5m])) by (foo) - // `, - // exp: mustNewBinOpExpr( - // OpTypeDiv, - // BinOpOptions{}, - // mustNewBinOpExpr( - // OpTypeDiv, - // BinOpOptions{}, - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // ), - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // ), - // }, - // { - // in: ` - // sum(count_over_time({foo="bar"}[5m])) by (foo) ^ - // sum(count_over_time({foo="bar"}[5m])) by (foo) / - // sum(count_over_time({foo="bar"}[5m])) by (foo) - // `, - // exp: mustNewBinOpExpr( - // OpTypeDiv, - // BinOpOptions{}, - // mustNewBinOpExpr( - // OpTypePow, - // BinOpOptions{}, - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // ), - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // ), - // }, - // { - // // operator precedence before left associativity - // in: ` - // sum(count_over_time({foo="bar"}[5m])) by (foo) + - // sum(count_over_time({foo="bar"}[5m])) by (foo) / - // sum(count_over_time({foo="bar"}[5m])) by (foo) - // `, - // exp: mustNewBinOpExpr( - // OpTypeAdd, - // BinOpOptions{}, - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // mustNewBinOpExpr( - // OpTypeDiv, - // BinOpOptions{}, - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // mustNewVectorAggregationExpr(newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // ), - // ), - // }, - // { - // in: `sum by (job) ( - // count_over_time({namespace="tns"} |= "level=error"[5m]) - // / - // count_over_time({namespace="tns"}[5m]) - // )`, - // exp: mustNewVectorAggregationExpr( - // mustNewBinOpExpr(OpTypeDiv, - // BinOpOptions{}, - // newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "namespace", "tns"), - // }), - // MultiStageExpr{ - // newLineFilterExpr(nil, labels.MatchEqual, "level=error"), - // }), - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "namespace", "tns"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil)), OpTypeSum, &grouping{groups: []string{"job"}}, nil), - // }, - // { - // in: `sum by (job) ( - // count_over_time({namespace="tns"} |= "level=error"[5m]) - // / - // count_over_time({namespace="tns"}[5m]) - // ) * 100`, - // exp: mustNewBinOpExpr(OpTypeMul, BinOpOptions{}, mustNewVectorAggregationExpr( - // mustNewBinOpExpr(OpTypeDiv, - // BinOpOptions{}, - // newRangeAggregationExpr( - // &logRange{ - // left: newPipelineExpr( - // newMatcherExpr([]*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "namespace", "tns"), - // }), - // MultiStageExpr{ - // newLineFilterExpr(nil, labels.MatchEqual, "level=error"), - // }), - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "namespace", "tns"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil)), OpTypeSum, &grouping{groups: []string{"job"}}, nil), - // mustNewLiteralExpr("100", false), - // ), - // }, - // { - // // reduces binop with two literalExprs - // in: `sum(count_over_time({foo="bar"}[5m])) by (foo) + 1 / 2`, - // exp: mustNewBinOpExpr( - // OpTypeAdd, - // BinOpOptions{}, - // mustNewVectorAggregationExpr( - // newRangeAggregationExpr( - // &logRange{ - // left: &matchersExpr{ - // matchers: []*labels.Matcher{ - // mustNewMatcher(labels.MatchEqual, "foo", "bar"), - // }, - // }, - // interval: 5 * time.Minute, - // }, OpRangeTypeCount, nil, nil), - // "sum", - // &grouping{ - // without: false, - // groups: []string{"foo"}, - // }, - // nil, - // ), - // &literalExpr{value: 0.5}, - // ), - // }, - // { - // // test signs - // in: `1 + -2 / 1`, - // exp: mustNewBinOpExpr( - // OpTypeAdd, - // BinOpOptions{}, - // &literalExpr{value: 1}, - // mustNewBinOpExpr(OpTypeDiv, BinOpOptions{}, &literalExpr{value: -2}, &literalExpr{value: 1}), - // ), - // }, - // { - // // test signs/ops with equal associativity - // in: `1 + 1 - -1`, - // exp: mustNewBinOpExpr( - // OpTypeSub, - // BinOpOptions{}, - // mustNewBinOpExpr(OpTypeAdd, BinOpOptions{}, &literalExpr{value: 1}, &literalExpr{value: 1}), - // &literalExpr{value: -1}, - // ), - // }, + { + in: `{foo="bar"} "foo"`, + err: ParseError{ + msg: "syntax error: unexpected STRING", + line: 1, + col: 13, + }, + }, + { + in: `{foo="bar"} foo`, + err: ParseError{ + msg: "syntax error: unexpected IDENTIFIER", + line: 1, + col: 13, + }, + }, + { + // require left associativity + in: ` + sum(count_over_time({foo="bar"}[5m])) by (foo) / + sum(count_over_time({foo="bar"}[5m])) by (foo) / + sum(count_over_time({foo="bar"}[5m])) by (foo) + `, + exp: mustNewBinOpExpr( + OpTypeDiv, + BinOpOptions{}, + mustNewBinOpExpr( + OpTypeDiv, + BinOpOptions{}, + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + ), + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + ), + }, + { + in: ` + sum(count_over_time({foo="bar"}[5m])) by (foo) ^ + sum(count_over_time({foo="bar"}[5m])) by (foo) / + sum(count_over_time({foo="bar"}[5m])) by (foo) + `, + exp: mustNewBinOpExpr( + OpTypeDiv, + BinOpOptions{}, + mustNewBinOpExpr( + OpTypePow, + BinOpOptions{}, + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + ), + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + ), + }, + { + // operator precedence before left associativity + in: ` + sum(count_over_time({foo="bar"}[5m])) by (foo) + + sum(count_over_time({foo="bar"}[5m])) by (foo) / + sum(count_over_time({foo="bar"}[5m])) by (foo) + `, + exp: mustNewBinOpExpr( + OpTypeAdd, + BinOpOptions{}, + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + mustNewBinOpExpr( + OpTypeDiv, + BinOpOptions{}, + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + mustNewVectorAggregationExpr(newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + ), + ), + }, + { + in: `sum by (job) ( + count_over_time({namespace="tns"} |= "level=error"[5m]) + / + count_over_time({namespace="tns"}[5m]) + )`, + exp: mustNewVectorAggregationExpr( + mustNewBinOpExpr(OpTypeDiv, + BinOpOptions{}, + newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "namespace", "tns"), + }), + MultiStageExpr{ + newLineFilterExpr(nil, labels.MatchEqual, "level=error"), + }), + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "namespace", "tns"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil)), OpTypeSum, &grouping{groups: []string{"job"}}, nil), + }, + { + in: `sum by (job) ( + count_over_time({namespace="tns"} |= "level=error"[5m]) + / + count_over_time({namespace="tns"}[5m]) + ) * 100`, + exp: mustNewBinOpExpr(OpTypeMul, BinOpOptions{}, mustNewVectorAggregationExpr( + mustNewBinOpExpr(OpTypeDiv, + BinOpOptions{}, + newRangeAggregationExpr( + &logRange{ + left: newPipelineExpr( + newMatcherExpr([]*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "namespace", "tns"), + }), + MultiStageExpr{ + newLineFilterExpr(nil, labels.MatchEqual, "level=error"), + }), + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "namespace", "tns"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil)), OpTypeSum, &grouping{groups: []string{"job"}}, nil), + mustNewLiteralExpr("100", false), + ), + }, + { + // reduces binop with two literalExprs + in: `sum(count_over_time({foo="bar"}[5m])) by (foo) + 1 / 2`, + exp: mustNewBinOpExpr( + OpTypeAdd, + BinOpOptions{}, + mustNewVectorAggregationExpr( + newRangeAggregationExpr( + &logRange{ + left: &matchersExpr{ + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + interval: 5 * time.Minute, + }, OpRangeTypeCount, nil, nil), + "sum", + &grouping{ + without: false, + groups: []string{"foo"}, + }, + nil, + ), + &literalExpr{value: 0.5}, + ), + }, + { + // test signs + in: `1 + -2 / 1`, + exp: mustNewBinOpExpr( + OpTypeAdd, + BinOpOptions{}, + &literalExpr{value: 1}, + mustNewBinOpExpr(OpTypeDiv, BinOpOptions{}, &literalExpr{value: -2}, &literalExpr{value: 1}), + ), + }, + { + // test signs/ops with equal associativity + in: `1 + 1 - -1`, + exp: mustNewBinOpExpr( + OpTypeSub, + BinOpOptions{}, + mustNewBinOpExpr(OpTypeAdd, BinOpOptions{}, &literalExpr{value: 1}, &literalExpr{value: 1}), + &literalExpr{value: -1}, + ), + }, { in: `{app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)`, exp: &pipelineExpr{ diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index 01afe7fe1d..c108d64e59 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -192,7 +192,7 @@ func NewDownstreamEvaluator(downstreamer Downstreamer) *DownstreamEvaluator { // Evaluator returns a StepEvaluator for a given SampleExpr func (ev *DownstreamEvaluator) StepEvaluator( ctx context.Context, - nextEv Evaluator, + nextEv SampleEvaluator, expr SampleExpr, params Params, ) (StepEvaluator, error) { diff --git a/pkg/logql/sharding_test.go b/pkg/logql/sharding_test.go index 49b56ceab7..d6b364c6a9 100644 --- a/pkg/logql/sharding_test.go +++ b/pkg/logql/sharding_test.go @@ -37,9 +37,12 @@ func TestMappingEquivalence(t *testing.T) { {`{a="1"} |= "number: 10"`, false}, {`rate({a=~".*"}[1s])`, false}, {`sum by (a) (rate({a=~".*"}[1s]))`, false}, + {`sum(rate({a=~".*"}[1s]))`, false}, + {`max without (a) (rate({a=~".*"}[1s]))`, false}, {`count(rate({a=~".*"}[1s]))`, false}, {`avg(rate({a=~".*"}[1s]))`, true}, + {`avg(rate({a=~".*"}[1s])) by (a)`, true}, {`1 + sum by (cluster) (rate({a=~".*"}[1s]))`, false}, {`sum(max(rate({a=~".*"}[1s])))`, false}, {`max(count(rate({a=~".*"}[1s])))`, false}, diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 8ac0534af7..01db6fceeb 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -4,8 +4,10 @@ import ( "context" "fmt" logger "log" + "sort" "time" + "github.com/cespare/xxhash/v2" "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql/parser" @@ -133,12 +135,14 @@ func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Seri s.Samples = append(s.Samples, logproto.Sample{ Timestamp: e.Timestamp.UnixNano(), Value: f, + Hash: xxhash.Sum64([]byte(e.Line)), }) } } } series := []logproto.Series{} for _, s := range resBySeries { + sort.Sort(s) series = append(series, *s) } return series diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index b99cb58568..01742e5514 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logql/stats" ) @@ -1230,7 +1231,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { for name, tt := range tests { tt := tt t.Run(name, func(t *testing.T) { - it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), logql.ExtractCount, tt.start, tt.end) + it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.CountExtractor.ToSampleExtractor(nil, false, false), tt.start, tt.end) require.NoError(t, err) series, _, err := iter.ReadSampleBatch(it, 1000) _ = it.Close()