|
|
|
|
@ -690,6 +690,86 @@ func TestEngine_NewRangeQuery(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
|
|
|
|
|
func BenchmarkRangeQuery100000(b *testing.B) { |
|
|
|
|
benchmarkRangeQuery(int64(100000), b) |
|
|
|
|
} |
|
|
|
|
func BenchmarkRangeQuery200000(b *testing.B) { |
|
|
|
|
benchmarkRangeQuery(int64(200000), b) |
|
|
|
|
} |
|
|
|
|
func BenchmarkRangeQuery500000(b *testing.B) { |
|
|
|
|
benchmarkRangeQuery(int64(500000), b) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func BenchmarkRangeQuery1000000(b *testing.B) { |
|
|
|
|
benchmarkRangeQuery(int64(1000000), b) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var result promql.Value |
|
|
|
|
|
|
|
|
|
func benchmarkRangeQuery(testsize int64, b *testing.B) { |
|
|
|
|
b.ReportAllocs() |
|
|
|
|
eng := NewEngine(EngineOpts{}) |
|
|
|
|
start := time.Unix(0, 0) |
|
|
|
|
end := time.Unix(testsize, 0) |
|
|
|
|
querier := getLocalQuerier(testsize) |
|
|
|
|
b.ResetTimer() |
|
|
|
|
for i := 0; i < b.N; i++ { |
|
|
|
|
for _, test := range []struct { |
|
|
|
|
qs string |
|
|
|
|
direction logproto.Direction |
|
|
|
|
}{ |
|
|
|
|
{`{app="foo"}`, logproto.FORWARD}, |
|
|
|
|
{`{app="bar"} |= "foo" |~ ".+bar"`, logproto.BACKWARD}, |
|
|
|
|
{`rate({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD}, |
|
|
|
|
{`rate({app="foo"}[30s])`, logproto.FORWARD}, |
|
|
|
|
{`count_over_time({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD}, |
|
|
|
|
{`count_over_time(({app="foo"} |~".+bar")[5m])`, logproto.BACKWARD}, |
|
|
|
|
{`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD}, |
|
|
|
|
{`min(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD}, |
|
|
|
|
{`max by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD}, |
|
|
|
|
{`max(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD}, |
|
|
|
|
{`sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD}, |
|
|
|
|
{`sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) by (app)`, logproto.FORWARD}, |
|
|
|
|
{`count(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) without (app)`, logproto.FORWARD}, |
|
|
|
|
{`stdvar without (app) (count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, logproto.FORWARD}, |
|
|
|
|
{`stddev(count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, logproto.FORWARD}, |
|
|
|
|
{`rate(({app=~"foo|bar"} |~".+bar")[1m])`, logproto.FORWARD}, |
|
|
|
|
{`topk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD}, |
|
|
|
|
{`topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD}, |
|
|
|
|
{`topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, logproto.FORWARD}, |
|
|
|
|
{`bottomk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD}, |
|
|
|
|
{`bottomk(3,rate(({app=~"foo|bar"} |~".+bar")[1m])) without (app)`, logproto.FORWARD}, |
|
|
|
|
} { |
|
|
|
|
q := eng.NewRangeQuery(querier, test.qs, start, end, 60*time.Second, test.direction, 1000) |
|
|
|
|
res, err := q.Exec(context.Background()) |
|
|
|
|
if err != nil { |
|
|
|
|
b.Fatal(err) |
|
|
|
|
} |
|
|
|
|
result = res |
|
|
|
|
if res == nil { |
|
|
|
|
b.Fatal("unexpected nil result") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func getLocalQuerier(size int64) Querier { |
|
|
|
|
iters := []iter.EntryIterator{ |
|
|
|
|
iter.NewStreamIterator(newStream(size, identity, `{app="foo"}`)), |
|
|
|
|
iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="foo"}`)), |
|
|
|
|
iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="bazz"}`)), |
|
|
|
|
iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="fuzz"}`)), |
|
|
|
|
iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)), |
|
|
|
|
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="foo"}`)), |
|
|
|
|
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)), |
|
|
|
|
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="fuzz"}`)), |
|
|
|
|
} |
|
|
|
|
return QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) { |
|
|
|
|
return iter.NewHeapIterator(iters, p.Direction), nil |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type querierRecorder struct { |
|
|
|
|
source map[string][]*logproto.Stream |
|
|
|
|
} |
|
|
|
|
|