From 1fc0ffde8a5f9139b2e30eeadacad89f7ef515ca Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Mon, 9 Dec 2019 09:49:00 -0500
Subject: [PATCH] Logql benchmark and performance improvement. (#1371)

* Add logql benchmark test.

Signed-off-by: Cyril Tovena

* Fixes store benchmark limits.

Signed-off-by: Cyril Tovena

* Add LogQL improvements.

Signed-off-by: Cyril Tovena

* Add back logging in the storage benchmark

Signed-off-by: Cyril Tovena
---
 pkg/iter/iterator.go      | 17 ++++-----
 pkg/logql/engine.go       |  3 +-
 pkg/logql/engine_test.go  | 80 +++++++++++++++++++++++++++++++++++++++
 pkg/logql/range_vector.go | 52 +++++++++++++++++++------
 pkg/storage/store_test.go |  8 +++-
 5 files changed, 137 insertions(+), 23 deletions(-)

diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go
index ab36a38ffe..21619daca2 100644
--- a/pkg/iter/iterator.go
+++ b/pkg/iter/iterator.go
@@ -602,26 +602,27 @@ type PeekingEntryIterator interface {
 func NewPeekingIterator(iter EntryIterator) PeekingEntryIterator {
 	// initialize the next entry so we can peek right from the start.
 	var cache *entryWithLabels
+	next := &entryWithLabels{}
 	if iter.Next() {
 		cache = &entryWithLabels{
 			entry:  iter.Entry(),
 			labels: iter.Labels(),
 		}
+		next.entry = cache.entry
+		next.labels = cache.labels
 	}
 	return &peekingEntryIterator{
 		iter:  iter,
 		cache: cache,
-		next:  cache,
+		next:  next,
 	}
 }
 
 // Next implements `EntryIterator`
 func (it *peekingEntryIterator) Next() bool {
 	if it.cache != nil {
-		it.next = &entryWithLabels{
-			entry:  it.cache.entry,
-			labels: it.cache.labels,
-		}
+		it.next.entry = it.cache.entry
+		it.next.labels = it.cache.labels
 		it.cacheNext()
 		return true
 	}
@@ -631,10 +632,8 @@ func (it *peekingEntryIterator) Next() bool {
 // cacheNext caches the next element if it exists.
 func (it *peekingEntryIterator) cacheNext() {
 	if it.iter.Next() {
-		it.cache = &entryWithLabels{
-			entry:  it.iter.Entry(),
-			labels: it.iter.Labels(),
-		}
+		it.cache.entry = it.iter.Entry()
+		it.cache.labels = it.iter.Labels()
 		return
 	}
 	// nothing left removes the cached entry
diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go
index 48ef9cb779..feab29db40 100644
--- a/pkg/logql/engine.go
+++ b/pkg/logql/engine.go
@@ -296,8 +296,9 @@ type groupedAggregation struct {
 // Evaluator implements `SampleExpr` for a vectorAggregationExpr
 // this is copied and adapted from Prometheus vector aggregation code.
 func (v *vectorAggregationExpr) Evaluator() StepEvaluator {
+	nextEvaluator := v.left.Evaluator()
 	return StepEvaluatorFn(func() (bool, int64, promql.Vector) {
-		next, ts, vec := v.left.Evaluator().Next()
+		next, ts, vec := nextEvaluator.Next()
 		if !next {
 			return false, 0, promql.Vector{}
 		}
diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go
index 92026ce3f9..4a60063876 100644
--- a/pkg/logql/engine_test.go
+++ b/pkg/logql/engine_test.go
@@ -690,6 +690,86 @@ func TestEngine_NewRangeQuery(t *testing.T) {
 	}
 }
 
+// go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
+func BenchmarkRangeQuery100000(b *testing.B) {
+	benchmarkRangeQuery(int64(100000), b)
+}
+func BenchmarkRangeQuery200000(b *testing.B) {
+	benchmarkRangeQuery(int64(200000), b)
+}
+func BenchmarkRangeQuery500000(b *testing.B) {
+	benchmarkRangeQuery(int64(500000), b)
+}
+
+func BenchmarkRangeQuery1000000(b *testing.B) {
+	benchmarkRangeQuery(int64(1000000), b)
+}
+
+var result promql.Value
+
+func benchmarkRangeQuery(testsize int64, b *testing.B) {
+	b.ReportAllocs()
+	eng := NewEngine(EngineOpts{})
+	start := time.Unix(0, 0)
+	end := time.Unix(testsize, 0)
+	querier := getLocalQuerier(testsize)
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		for _, test := range []struct {
+			qs        string
+			direction logproto.Direction
+		}{
+			{`{app="foo"}`, logproto.FORWARD},
+			{`{app="bar"} |= "foo" |~ ".+bar"`, logproto.BACKWARD},
+			{`rate({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD},
+			{`rate({app="foo"}[30s])`, logproto.FORWARD},
+			{`count_over_time({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD},
+			{`count_over_time(({app="foo"} |~".+bar")[5m])`, logproto.BACKWARD},
+			{`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
+			{`min(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
+			{`max by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
+			{`max(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
+			{`sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
+			{`sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) by (app)`, logproto.FORWARD},
+			{`count(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) without (app)`, logproto.FORWARD},
+			{`stdvar without (app) (count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, logproto.FORWARD},
+			{`stddev(count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, logproto.FORWARD},
+			{`rate(({app=~"foo|bar"} |~".+bar")[1m])`, logproto.FORWARD},
+			{`topk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD},
+			{`topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD},
+			{`topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, logproto.FORWARD},
+			{`bottomk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD},
+			{`bottomk(3,rate(({app=~"foo|bar"} |~".+bar")[1m])) without (app)`, logproto.FORWARD},
+		} {
+			q := eng.NewRangeQuery(querier, test.qs, start, end, 60*time.Second, test.direction, 1000)
+			res, err := q.Exec(context.Background())
+			if err != nil {
+				b.Fatal(err)
+			}
+			result = res
+			if res == nil {
+				b.Fatal("unexpected nil result")
+			}
+		}
+	}
+}
+
+func getLocalQuerier(size int64) Querier {
+	iters := []iter.EntryIterator{
+		iter.NewStreamIterator(newStream(size, identity, `{app="foo"}`)),
+		iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="foo"}`)),
+		iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="bazz"}`)),
+		iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="fuzz"}`)),
+		iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)),
+		iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="foo"}`)),
+		iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)),
+		iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="fuzz"}`)),
+	}
+	return QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) {
+		return iter.NewHeapIterator(iters, p.Direction), nil
+	})
+}
+
 type querierRecorder struct {
 	source map[string][]*logproto.Stream
 }
diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go
index 8ec3bc1087..63f2725ddc 100644
--- a/pkg/logql/range_vector.go
+++ b/pkg/logql/range_vector.go
@@ -1,7 +1,10 @@
 package logql
 
 import (
+	"sync"
+
 	"github.com/grafana/loki/pkg/iter"
+	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/promql"
 )
 
@@ -22,6 +25,7 @@ type rangeVectorIterator struct {
 	iter                         iter.PeekingEntryIterator
 	selRange, step, end, current int64
 	window                       map[string]*promql.Series
+	metrics                      map[string]labels.Labels
 }
 
 func newRangeVectorIterator(
@@ -38,6 +42,7 @@
 		selRange: selRange,
 		current:  start - step, // first loop iteration will set it to start
 		window:   map[string]*promql.Series{},
+		metrics:  map[string]labels.Labels{},
 	}
 }
 
@@ -73,7 +78,9 @@ func (r *rangeVectorIterator) popBack(newStart int64) {
 		}
 		r.window[fp].Points = r.window[fp].Points[lastPoint+1:]
 		if len(r.window[fp].Points) == 0 {
+			s := r.window[fp]
 			delete(r.window, fp)
+			putSeries(s)
 		}
 	}
 }
@@ -95,15 +102,25 @@ func (r *rangeVectorIterator) load(start, end int64) {
 		var ok bool
 		series, ok = r.window[lbs]
 		if !ok {
-			series = &promql.Series{
-				Points: []promql.Point{},
+			var metric labels.Labels
+			if metric, ok = r.metrics[lbs]; !ok {
+				var err error
+				metric, err = promql.ParseMetric(lbs)
+				if err != nil {
+					continue
+				}
+				r.metrics[lbs] = metric
 			}
+
+			series = getSeries()
+			series.Metric = metric
 			r.window[lbs] = series
 		}
-		series.Points = append(series.Points, promql.Point{
+		p := promql.Point{
 			T: entry.Timestamp.UnixNano(),
 			V: 1,
-		})
+		}
+		series.Points = append(series.Points, p)
 		_ = r.iter.Next()
 	}
 }
@@ -112,20 +129,31 @@ func (r *rangeVectorIterator) At(aggregator RangeVectorAggregator) (int64, promq
 	result := make([]promql.Sample, 0, len(r.window))
 	// convert ts from nano to milli seconds as the iterator work with nanoseconds
 	ts := r.current / 1e+6
-	for lbs, series := range r.window {
-		labels, err := promql.ParseMetric(lbs)
-		if err != nil {
-			continue
-		}
-
+	for _, series := range r.window {
 		result = append(result, promql.Sample{
 			Point: promql.Point{
 				V: aggregator(ts, series.Points),
 				T: ts,
 			},
-			Metric: labels,
+			Metric: series.Metric,
 		})
-
 	}
 	return ts, result
 }
+
+var seriesPool sync.Pool
+
+func getSeries() *promql.Series {
+	if r := seriesPool.Get(); r != nil {
+		s := r.(*promql.Series)
+		s.Points = s.Points[:0]
+		return s
+	}
+	return &promql.Series{
+		Points: make([]promql.Point, 0, 1024),
+	}
+}
+
+func putSeries(s *promql.Series) {
+	seriesPool.Put(s)
+}
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 799c494007..ebd2c1fa13 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -139,6 +139,12 @@ func printHeap(b *testing.B, show bool) {
 }
 
 func getLocalStore() Store {
+	limits, err := validation.NewOverrides(validation.Limits{
+		MaxQueryLength: 6000 * time.Hour,
+	})
+	if err != nil {
+		panic(err)
+	}
 	store, err := NewStore(Config{
 		Config: storage.Config{
 			BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
@@ -158,7 +164,7 @@ func getLocalStore() Store {
 			},
 		},
 	},
-	}, &validation.Overrides{})
+	}, limits)
 	if err != nil {
 		panic(err)
 	}
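The allocation savings in range_vector.go come from two things: parsed label sets are cached in r.metrics so promql.ParseMetric runs once per stream instead of once per step, and promql.Series values are recycled through a sync.Pool (getSeries/putSeries) instead of being rebuilt for every window. Below is a minimal, self-contained sketch of that pooling pattern; the point and series types are hypothetical stand-ins for the promql types and this is not code from the patch itself.

// Sketch only: the pool-based reuse pattern applied by getSeries/putSeries above.
package main

import (
	"fmt"
	"sync"
)

// point and series stand in for promql.Point and promql.Series.
type point struct {
	T int64
	V float64
}

type series struct {
	Metric string
	Points []point
}

// seriesPool hands back previously released series so hot loops avoid reallocating.
// (The patch uses a bare sync.Pool and nil-checks Get; using New is an equivalent variant.)
var seriesPool = sync.Pool{
	New: func() interface{} {
		return &series{Points: make([]point, 0, 1024)}
	},
}

func getSeries() *series {
	s := seriesPool.Get().(*series)
	s.Points = s.Points[:0] // keep the backing array, drop stale values
	return s
}

func putSeries(s *series) {
	seriesPool.Put(s)
}

func main() {
	s := getSeries()
	s.Metric = `{app="foo"}`
	s.Points = append(s.Points, point{T: 1, V: 1})
	fmt.Println(s.Metric, len(s.Points), cap(s.Points))
	putSeries(s) // return it once nothing else references it
}

Pre-sizing Points with capacity 1024, as the patch does, trades one larger up-front allocation for fewer growth copies on dense windows, and the pool then lets that backing array be reused across evaluation steps.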