Logqlv2 pushes groups down to edge (#2786)

Pushes down grouping into range aggregation to reduce labels at edges.

This works  only for sums.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/2788/head
Cyril Tovena 5 years ago committed by GitHub
parent dd5fceb348
commit 2e14c45ceb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/chunkenc/memchunk.go
  2. 5
      pkg/chunkenc/memchunk_test.go
  3. 4
      pkg/logproto/extensions.go
  4. 6
      pkg/logql/ast.go
  5. 31
      pkg/logql/engine_test.go
  6. 41
      pkg/logql/evaluator.go
  7. 34
      pkg/logql/functions.go
  8. 10
      pkg/logql/log/labels.go
  9. 56
      pkg/logql/log/metrics_extraction.go
  10. 18
      pkg/logql/log/metrics_extraction_test.go
  11. 1802
      pkg/logql/parser_test.go
  12. 2
      pkg/logql/sharding.go
  13. 3
      pkg/logql/sharding_test.go
  14. 4
      pkg/logql/test_utils.go
  15. 3
      pkg/storage/batch_test.go

@ -9,6 +9,7 @@ import (
"hash"
"hash/crc32"
"io"
"sort"
"time"
"github.com/cespare/xxhash/v2"
@ -646,6 +647,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs l
}
seriesRes := make([]logproto.Series, 0, len(series))
for _, s := range series {
sort.Sort(s)
seriesRes = append(seriesRes, *s)
}
return iter.NewMultiSeriesIterator(ctx, seriesRes)

@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/stats"
)
@ -128,7 +129,7 @@ func TestBlock(t *testing.T) {
require.NoError(t, it.Close())
require.Equal(t, len(cases), idx)
sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, logql.ExtractCount)
sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false))
idx = 0
for sampleIt.Next() {
s := sampleIt.Sample()
@ -276,7 +277,7 @@ func TestSerialization(t *testing.T) {
}
require.NoError(t, it.Error())
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, logql.ExtractCount)
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false))
for i := 0; i < numSamples; i++ {
require.True(t, sampleIt.Next(), i)

@ -17,3 +17,7 @@ type Streams []Stream
func (xs Streams) Len() int { return len(xs) }
func (xs Streams) Swap(i, j int) { xs[i], xs[j] = xs[j], xs[i] }
func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels }
func (s Series) Len() int { return len(s.Samples) }
func (s Series) Swap(i, j int) { s.Samples[i], s.Samples[j] = s.Samples[j], s.Samples[i] }
func (s Series) Less(i, j int) bool { return s.Samples[i].Timestamp < s.Samples[j].Timestamp }

@ -86,7 +86,6 @@ type SampleExtractor = log.SampleExtractor
var (
NoopPipeline = log.NoopPipeline
ExtractCount = log.CountExtractor.ToSampleExtractor()
)
// PipelineExpr is an expression defining a log pipeline.
@ -716,6 +715,11 @@ func (e *vectorAggregationExpr) Selector() LogSelectorExpr {
}
func (e *vectorAggregationExpr) Extractor() (log.SampleExtractor, error) {
// inject in the range vector extractor the outer groups to improve performance.
// This is only possible if the operation is a sum. Anything else needs all labels.
if r, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum {
return r.extractor(e.grouping, true)
}
return e.left.Extractor()
}

@ -7,7 +7,6 @@ import (
"math"
"strings"
// "math"
"testing"
"time"
@ -163,7 +162,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{newSeries(testSize, factor(5, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.4}, Metric: labels.Labels{}},
@ -175,7 +174,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(10, identity), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 6}, Metric: labels.Labels{labels.Label{Name: "app", Value: "bar"}}},
@ -191,7 +190,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (namespace,app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m])) `}},
},
promql.Vector{
promql.Sample{
@ -432,8 +431,8 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="bar"}[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without (app) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without (app) (count_over_time({app="bar"}[1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0}, Metric: labels.Labels{}},
@ -449,8 +448,8 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
{newSeries(testSize, identity, `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="bar"}[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without(app) (count_over_time({app="foo"}[1m]))`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum without(app) (count_over_time({app="bar"}[1m]))`}},
},
promql.Vector{
promql.Sample{Point: promql.Point{T: 60 * 1000, V: 60}, Metric: labels.Labels{}},
@ -688,7 +687,7 @@ func TestEngine_RangeQuery(t *testing.T) {
{newSeries(testSize, factor(5, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
@ -703,7 +702,7 @@ func TestEngine_RangeQuery(t *testing.T) {
{newSeries(testSize, factor(10, identity), `{app="foo"}`), newSeries(testSize, factor(5, identity), `{app="bar"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
@ -727,7 +726,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (namespace,cluster, app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
@ -759,7 +758,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (cluster, namespace, app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
@ -791,7 +790,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `count_over_time({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (namespace, app)(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
@ -1224,7 +1223,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
@ -1252,7 +1251,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{
@ -1280,7 +1279,7 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"[1m])`}},
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `sum by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`}},
},
promql.Matrix{
promql.Series{

@ -103,9 +103,23 @@ func GetRangeType(q Params) QueryRangeType {
// Evaluator is an interface for iterating over data at different nodes in the AST
type Evaluator interface {
SampleEvaluator
EntryEvaluator
}
type SampleEvaluator interface {
// StepEvaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another StepEvaluator// in order to enable arbitrary computation of embedded expressions. This allows more modular & extensible
// StepEvaluator implementations which can be composed.
StepEvaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error)
StepEvaluator(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error)
}
type SampleEvaluatorFunc func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error)
func (s SampleEvaluatorFunc) StepEvaluator(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
return s(ctx, nextEvaluator, expr, p)
}
type EntryEvaluator interface {
// Iterator returns the iter.EntryIterator for a given LogSelectorExpr
Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error)
}
@ -151,12 +165,31 @@ func (ev *DefaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr,
func (ev *DefaultEvaluator) StepEvaluator(
ctx context.Context,
nextEv Evaluator,
nextEv SampleEvaluator,
expr SampleExpr,
q Params,
) (StepEvaluator, error) {
switch e := expr.(type) {
case *vectorAggregationExpr:
if rangExpr, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum {
// if range expression is wrapped with a vector expression
// we should send the vector expression for allowing reducing labels at the source.
nextEv = SampleEvaluatorFunc(func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-rangExpr.left.interval),
End: q.End(),
Selector: e.String(), // intentionally send the the vector for reducing labels.
Shards: q.Shards(),
},
})
if err != nil {
return nil, err
}
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), rangExpr, q)
})
}
return vectorAggEvaluator(ctx, nextEv, e, q)
case *rangeAggregationExpr:
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
@ -180,7 +213,7 @@ func (ev *DefaultEvaluator) StepEvaluator(
func vectorAggEvaluator(
ctx context.Context,
ev Evaluator,
ev SampleEvaluator,
expr *vectorAggregationExpr,
q Params,
) (StepEvaluator, error) {
@ -458,7 +491,7 @@ func (r rangeVectorEvaluator) Error() error {
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func binOpStepEvaluator(
ctx context.Context,
ev Evaluator,
ev SampleEvaluator,
expr *binOpExpr,
q Params,
) (StepEvaluator, error) {

@ -13,9 +13,30 @@ import (
const unsupportedErr = "unsupported range vector aggregation operation: %s"
func (r rangeAggregationExpr) Extractor() (log.SampleExtractor, error) {
return r.extractor(nil, false)
}
func (r rangeAggregationExpr) extractor(gr *grouping, all bool) (log.SampleExtractor, error) {
if err := r.validate(); err != nil {
return nil, err
}
var groups []string
var without bool
// fallback to parents grouping
if gr != nil {
groups = gr.groups
without = gr.without
}
// range aggregation grouping takes priority
if r.grouping != nil {
groups = r.grouping.groups
without = r.grouping.without
}
sort.Strings(groups)
var stages []log.Stage
if p, ok := r.left.left.(*pipelineExpr); ok {
// if the expression is a pipeline then take all stages into account first.
@ -28,30 +49,25 @@ func (r rangeAggregationExpr) Extractor() (log.SampleExtractor, error) {
// unwrap...means we want to extract metrics from labels.
if r.left.unwrap != nil {
var convOp string
var groups []string
var without bool
switch r.left.unwrap.operation {
case OpConvDuration, OpConvDurationSeconds:
convOp = log.ConvertDuration
default:
convOp = log.ConvertFloat
}
if r.grouping != nil {
groups = r.grouping.groups
without = r.grouping.without
}
return log.LabelExtractorWithStages(
r.left.unwrap.identifier,
convOp, groups, without, stages,
convOp, groups, without, all, stages,
log.ReduceAndLabelFilter(r.left.unwrap.postFilters),
)
}
// otherwise we extract metrics from the log line.
switch r.operation {
case OpRangeTypeRate, OpRangeTypeCount:
return log.LineExtractorWithStages(log.CountExtractor, stages)
return log.LineExtractorWithStages(log.CountExtractor, stages, groups, without, all)
case OpRangeTypeBytes, OpRangeTypeBytesRate:
return log.LineExtractorWithStages(log.BytesExtractor, stages)
return log.LineExtractorWithStages(log.BytesExtractor, stages, groups, without, all)
default:
return nil, fmt.Errorf(unsupportedErr, r.operation)
}

@ -135,3 +135,13 @@ Outer:
return res
}
func (b *LabelsBuilder) WithoutLabels(names ...string) labels.Labels {
// naive implementation for now.
return b.Labels().WithoutLabels(names...)
}
func (b *LabelsBuilder) WithLabels(names ...string) labels.Labels {
// naive implementation for now.
return b.Labels().WithLabels(names...)
}

@ -1,6 +1,7 @@
package log
import (
"sort"
"strconv"
"time"
@ -29,8 +30,14 @@ type LineExtractor func([]byte) float64
// ToSampleExtractor transform a LineExtractor into a SampleExtractor.
// Useful for metric conversion without log Pipeline.
func (l LineExtractor) ToSampleExtractor() SampleExtractor {
func (l LineExtractor) ToSampleExtractor(groups []string, without bool, noLabels bool) SampleExtractor {
return SampleExtractorFunc(func(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) {
// todo(cyriltovena) grouping should be done once per stream/chunk not for everyline.
// so for now we'll cover just vector without grouping. This requires changes to SampleExtractor interface.
// For another day !
if len(groups) == 0 && noLabels {
return l(line), labels.Labels{}, true
}
return l(line), lbs, true
})
}
@ -44,7 +51,10 @@ type lineSampleExtractor struct {
Stage
LineExtractor
builder *LabelsBuilder
groups []string
without bool
noLabels bool
builder *LabelsBuilder
}
func (l lineSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) {
@ -53,16 +63,33 @@ func (l lineSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, l
if !ok {
return 0, nil, false
}
if len(l.groups) != 0 {
if l.without {
return l.LineExtractor(line), l.builder.WithoutLabels(l.groups...), true
}
return l.LineExtractor(line), l.builder.WithLabels(l.groups...), true
}
if l.noLabels {
// no grouping but it was a vector operation so we return a single vector
return l.LineExtractor(line), labels.Labels{}, true
}
return l.LineExtractor(line), l.builder.Labels(), true
}
// LineExtractorWithStages creates a SampleExtractor from a LineExtractor.
// Multiple log stages are run before converting the log line.
func LineExtractorWithStages(ex LineExtractor, stages []Stage) (SampleExtractor, error) {
func LineExtractorWithStages(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) {
if len(stages) == 0 {
return ex.ToSampleExtractor(), nil
return ex.ToSampleExtractor(groups, without, noLabels), nil
}
return lineSampleExtractor{Stage: ReduceStages(stages), LineExtractor: ex, builder: NewLabelsBuilder()}, nil
return lineSampleExtractor{
Stage: ReduceStages(stages),
LineExtractor: ex,
builder: NewLabelsBuilder(),
groups: groups,
without: without,
noLabels: noLabels,
}, nil
}
type convertionFn func(value string) (float64, error)
@ -76,6 +103,7 @@ type labelSampleExtractor struct {
conversionFn convertionFn
groups []string
without bool
noLabels bool
}
// LabelExtractorWithStages creates a SampleExtractor that will extract metrics from a labels.
@ -83,7 +111,7 @@ type labelSampleExtractor struct {
// to remove sample containing the __error__ label.
func LabelExtractorWithStages(
labelName, conversion string,
groups []string, without bool,
groups []string, without bool, noLabels bool,
preStages []Stage,
postFilter Stage,
) (SampleExtractor, error) {
@ -96,6 +124,10 @@ func LabelExtractorWithStages(
default:
return nil, errors.Errorf("unsupported conversion operation %s", conversion)
}
if len(groups) != 0 && without {
groups = append(groups, labelName)
sort.Strings(groups)
}
return &labelSampleExtractor{
preStage: ReduceStages(preStages),
conversionFn: convFn,
@ -104,6 +136,7 @@ func LabelExtractorWithStages(
postFilter: postFilter,
without: without,
builder: NewLabelsBuilder(),
noLabels: noLabels,
}, nil
}
@ -135,16 +168,19 @@ func (l *labelSampleExtractor) Process(line []byte, lbs labels.Labels) (float64,
// We need to return now before applying grouping otherwise the error might get lost.
return v, l.builder.Labels(), true
}
return v, l.groupLabels(l.builder.Labels()), true
return v, l.groupLabels(l.builder), true
}
func (l *labelSampleExtractor) groupLabels(lbs labels.Labels) labels.Labels {
if l.groups != nil {
func (l *labelSampleExtractor) groupLabels(lbs *LabelsBuilder) labels.Labels {
if len(l.groups) != 0 {
if l.without {
return lbs.WithoutLabels(append(l.groups, l.labelName)...)
return lbs.WithoutLabels(l.groups...)
}
return lbs.WithLabels(l.groups...)
}
if l.noLabels {
return labels.Labels{}
}
return lbs.WithoutLabels(l.labelName)
}

@ -20,17 +20,27 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
{
"convert float",
mustSampleExtractor(LabelExtractorWithStages(
"foo", ConvertFloat, nil, false, nil, NoopStage,
"foo", ConvertFloat, nil, false, false, nil, NoopStage,
)),
labels.Labels{labels.Label{Name: "foo", Value: "15.0"}},
15,
labels.Labels{},
true,
},
{
"convert float as vector with no grouping",
mustSampleExtractor(LabelExtractorWithStages(
"foo", ConvertFloat, nil, false, true, nil, NoopStage,
)),
labels.Labels{labels.Label{Name: "foo", Value: "15.0"}, labels.Label{Name: "bar", Value: "buzz"}},
15,
labels.Labels{},
true,
},
{
"convert float without",
mustSampleExtractor(LabelExtractorWithStages(
"foo", ConvertFloat, []string{"bar", "buzz"}, true, nil, NoopStage,
"foo", ConvertFloat, []string{"bar", "buzz"}, true, false, nil, NoopStage,
)),
labels.Labels{
{Name: "foo", Value: "10"},
@ -47,7 +57,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
{
"convert float with",
mustSampleExtractor(LabelExtractorWithStages(
"foo", ConvertFloat, []string{"bar", "buzz"}, false, nil, NoopStage,
"foo", ConvertFloat, []string{"bar", "buzz"}, false, false, nil, NoopStage,
)),
labels.Labels{
{Name: "foo", Value: "0.6"},
@ -65,7 +75,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
{
"convert duration with",
mustSampleExtractor(LabelExtractorWithStages(
"foo", ConvertDuration, []string{"bar", "buzz"}, false, nil, NoopStage,
"foo", ConvertDuration, []string{"bar", "buzz"}, false, false, nil, NoopStage,
)),
labels.Labels{
{Name: "foo", Value: "500ms"},

File diff suppressed because it is too large Load Diff

@ -192,7 +192,7 @@ func NewDownstreamEvaluator(downstreamer Downstreamer) *DownstreamEvaluator {
// Evaluator returns a StepEvaluator for a given SampleExpr
func (ev *DownstreamEvaluator) StepEvaluator(
ctx context.Context,
nextEv Evaluator,
nextEv SampleEvaluator,
expr SampleExpr,
params Params,
) (StepEvaluator, error) {

@ -37,9 +37,12 @@ func TestMappingEquivalence(t *testing.T) {
{`{a="1"} |= "number: 10"`, false},
{`rate({a=~".*"}[1s])`, false},
{`sum by (a) (rate({a=~".*"}[1s]))`, false},
{`sum(rate({a=~".*"}[1s]))`, false},
{`max without (a) (rate({a=~".*"}[1s]))`, false},
{`count(rate({a=~".*"}[1s]))`, false},
{`avg(rate({a=~".*"}[1s]))`, true},
{`avg(rate({a=~".*"}[1s])) by (a)`, true},
{`1 + sum by (cluster) (rate({a=~".*"}[1s]))`, false},
{`sum(max(rate({a=~".*"}[1s])))`, false},
{`max(count(rate({a=~".*"}[1s])))`, false},

@ -4,8 +4,10 @@ import (
"context"
"fmt"
logger "log"
"sort"
"time"
"github.com/cespare/xxhash/v2"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
@ -133,12 +135,14 @@ func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Seri
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.Timestamp.UnixNano(),
Value: f,
Hash: xxhash.Sum64([]byte(e.Line)),
})
}
}
}
series := []logproto.Series{}
for _, s := range resBySeries {
sort.Sort(s)
series = append(series, *s)
}
return series

@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/stats"
)
@ -1230,7 +1231,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), logql.ExtractCount, tt.start, tt.end)
it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.CountExtractor.ToSampleExtractor(nil, false, false), tt.start, tt.end)
require.NoError(t, err)
series, _, err := iter.ReadSampleBatch(it, 1000)
_ = it.Close()

Loading…
Cancel
Save