From 8bd9f752aa8ee53129d3f5cd68c3e3fec1e32855 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 25 Jul 2019 15:45:49 -0400 Subject: [PATCH] Adds more tests for range queries --- pkg/logql/engine_test.go | 391 +++++++++++++++++++++++++++++++++++- pkg/querier/http.go | 19 +- pkg/querier/querier.go | 3 - pkg/querier/querier_test.go | 50 ----- 4 files changed, 406 insertions(+), 57 deletions(-) diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 58f29df3d5..9f8dcde020 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/assert" ) -var testSize = int64(100) +var testSize = int64(300) func TestEngine_NewInstantQuery(t *testing.T) { t.Parallel() @@ -301,6 +301,395 @@ func TestEngine_NewInstantQuery(t *testing.T) { } } +func TestEngine_NewRangeQuery(t *testing.T) { + t.Parallel() + eng := NewEngine(EngineOpts{}) + for _, test := range []struct { + qs string + start time.Time + end time.Time + step time.Duration + direction logproto.Direction + limit uint32 + + // an array of streams per SelectParams will be returned by the querier. + // This is to cover logql that requires multiple queries. + streams [][]*logproto.Stream + params []SelectParams + + expected promql.Value + }{ + { + `{app="foo"}`, time.Unix(0, 0), time.Unix(30, 0), time.Second, logproto.FORWARD, 10, + [][]*logproto.Stream{ + {newStream(testSize, identity, `{app="foo"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 10, Selector: `{app="foo"}`}}, + }, + Streams([]*logproto.Stream{newStream(10, identity, `{app="foo"}`)}), + }, + { + `{app="bar"} |= "foo" |~ ".+bar"`, time.Unix(0, 0), time.Unix(30, 0), time.Second, logproto.BACKWARD, 30, + [][]*logproto.Stream{ + {newStream(testSize, identity, `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 30, Selector: `{app="bar"}|="foo"|~".+bar"`}}, + }, + Streams([]*logproto.Stream{newStream(30, identity, `{app="bar"}`)}), + }, + { + `rate({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), time.Unix(120, 0), time.Minute, logproto.BACKWARD, 10, + [][]*logproto.Stream{ + {newStream(testSize, identity, `{app="foo"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 1}, {T: 120000000000, V: 1}}, + }, + }, + }, + { + `rate({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, logproto.FORWARD, 10, + [][]*logproto.Stream{ + {newStream(testSize, factor(2, identity), `{app="foo"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 0.5}, {T: 75000000000, V: 0.5}, {T: 90000000000, V: 0.5}, {T: 105000000000, V: 0.5}, {T: 120000000000, V: 0.5}}, + }, + }, + }, + { + `count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), time.Unix(120, 0), 30 * time.Second, logproto.BACKWARD, 10, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`)}, // 10 , 20 , 30 .. 60 = 6 total + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 6}, {T: 90000000000, V: 6}, {T: 120000000000, V: 6}}, + }, + }, + }, + { + `count_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), time.Unix(5*120, 0), 30 * time.Second, logproto.BACKWARD, 10, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`)}, // 10 , 20 , 30 .. 300 = 30 total + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(5*120, 0), Limit: 0, Selector: `{app="foo"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{ + {T: 300000000000, V: 30}, + {T: 330000000000, V: 30}, + {T: 360000000000, V: 30}, + {T: 390000000000, V: 30}, + {T: 420000000000, V: 30}, + {T: 450000000000, V: 30}, + {T: 480000000000, V: 30}, + {T: 510000000000, V: 30}, + {T: 540000000000, V: 30}, + {T: 570000000000, V: 30}, + {T: 600000000000, V: 30}, + }, + }, + }, + }, + { + `avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(10, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{}, + Points: []promql.Point{{T: 60000000000, V: 6}, {T: 90000000000, V: 6}, {T: 120000000000, V: 6}, {T: 150000000000, V: 6}, {T: 180000000000, V: 6}}, + }, + }, + }, + { + `min(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(10, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{}, + Points: []promql.Point{{T: 60000000000, V: 0.1}, {T: 90000000000, V: 0.1}, {T: 120000000000, V: 0.1}, {T: 150000000000, V: 0.1}, {T: 180000000000, V: 0.1}}, + }, + }, + }, + { + `max by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "bar"}}, + Points: []promql.Point{{T: 60000000000, V: 0.2}, {T: 90000000000, V: 0.2}, {T: 120000000000, V: 0.2}, {T: 150000000000, V: 0.2}, {T: 180000000000, V: 0.2}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 0.1}, {T: 90000000000, V: 0.1}, {T: 120000000000, V: 0.1}, {T: 150000000000, V: 0.1}, {T: 180000000000, V: 0.1}}, + }, + }, + }, + { + `max(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{}, + Points: []promql.Point{{T: 60000000000, V: 0.2}, {T: 90000000000, V: 0.2}, {T: 120000000000, V: 0.2}, {T: 150000000000, V: 0.2}, {T: 180000000000, V: 0.2}}, + }, + }, + }, + { + `sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(5, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{}, + Points: []promql.Point{{T: 60000000000, V: 0.4}, {T: 90000000000, V: 0.4}, {T: 120000000000, V: 0.4}, {T: 150000000000, V: 0.4}, {T: 180000000000, V: 0.4}}, + }, + }, + }, + { + `sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "bar"}}, + Points: []promql.Point{{T: 60000000000, V: 12}, {T: 90000000000, V: 12}, {T: 120000000000, V: 12}, {T: 150000000000, V: 12}, {T: 180000000000, V: 12}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 6}, {T: 90000000000, V: 6}, {T: 120000000000, V: 6}, {T: 150000000000, V: 6}, {T: 180000000000, V: 6}}, + }, + }, + }, + { + `count(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) without (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(10, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{}, + Points: []promql.Point{{T: 60000000000, V: 2}, {T: 90000000000, V: 2}, {T: 120000000000, V: 2}, {T: 150000000000, V: 2}, {T: 180000000000, V: 2}}, + }, + }, + }, + { + `stdvar without (app) (count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{}, + Points: []promql.Point{{T: 60000000000, V: 9}, {T: 90000000000, V: 9}, {T: 120000000000, V: 9}, {T: 150000000000, V: 9}, {T: 180000000000, V: 9}}, + }, + }, + }, + { + `stddev(count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(2, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{}, + Points: []promql.Point{{T: 60000000000, V: 12}, {T: 90000000000, V: 12}, {T: 120000000000, V: 12}, {T: 150000000000, V: 12}, {T: 180000000000, V: 12}}, + }, + }, + }, + { + `rate(({app=~"foo|bar"} |~".+bar")[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "bar"}}, + Points: []promql.Point{{T: 60000000000, V: 0.2}, {T: 90000000000, V: 0.2}, {T: 120000000000, V: 0.2}, {T: 150000000000, V: 0.2}, {T: 180000000000, V: 0.2}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 0.1}, {T: 90000000000, V: 0.1}, {T: 120000000000, V: 0.1}, {T: 150000000000, V: 0.1}, {T: 180000000000, V: 0.1}}, + }, + }, + }, + { + `topk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`), newStream(testSize, factor(15, identity), `{app="boo"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "bar"}}, + Points: []promql.Point{{T: 60000000000, V: 0.2}, {T: 90000000000, V: 0.2}, {T: 120000000000, V: 0.2}, {T: 150000000000, V: 0.2}, {T: 180000000000, V: 0.2}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 0.1}, {T: 90000000000, V: 0.1}, {T: 120000000000, V: 0.1}, {T: 150000000000, V: 0.1}, {T: 180000000000, V: 0.1}}, + }, + }, + }, + { + `topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(5, identity), `{app="bar"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "bar"}}, + Points: []promql.Point{{T: 60000000000, V: 0.2}, {T: 90000000000, V: 0.2}, {T: 120000000000, V: 0.2}, {T: 150000000000, V: 0.2}, {T: 180000000000, V: 0.2}}, + }, + }, + }, + { + `topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(15, identity), `{app="fuzz"}`), + newStream(testSize, factor(5, identity), `{app="fuzz"}`), newStream(testSize, identity, `{app="buzz"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "buzz"}}, + Points: []promql.Point{{T: 60000000000, V: 1}, {T: 90000000000, V: 1}, {T: 120000000000, V: 1}, {T: 150000000000, V: 1}, {T: 180000000000, V: 1}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 0.1}, {T: 90000000000, V: 0.1}, {T: 120000000000, V: 0.1}, {T: 150000000000, V: 0.1}, {T: 180000000000, V: 0.1}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "fuzz"}}, + Points: []promql.Point{{T: 60000000000, V: 0.2}, {T: 90000000000, V: 0.2}, {T: 120000000000, V: 0.2}, {T: 150000000000, V: 0.2}, {T: 180000000000, V: 0.2}}, + }, + }, + }, + { + `bottomk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(20, identity), `{app="bar"}`), + newStream(testSize, factor(5, identity), `{app="fuzz"}`), newStream(testSize, identity, `{app="buzz"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "bar"}}, + Points: []promql.Point{{T: 60000000000, V: 0.05}, {T: 90000000000, V: 0.05}, {T: 120000000000, V: 0.05}, {T: 150000000000, V: 0.05}, {T: 180000000000, V: 0.05}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 0.1}, {T: 90000000000, V: 0.1}, {T: 120000000000, V: 0.1}, {T: 150000000000, V: 0.1}, {T: 180000000000, V: 0.1}}, + }, + }, + }, + { + `bottomk(3,rate(({app=~"foo|bar"} |~".+bar")[1m])) without (app)`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, logproto.FORWARD, 100, + [][]*logproto.Stream{ + {newStream(testSize, factor(10, identity), `{app="foo"}`), newStream(testSize, factor(20, identity), `{app="bar"}`), + newStream(testSize, factor(5, identity), `{app="fuzz"}`), newStream(testSize, identity, `{app="buzz"}`)}, + }, + []SelectParams{ + {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(180, 0), Limit: 0, Selector: `{app=~"foo|bar"}|~".+bar"`}}, + }, + promql.Matrix{ + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "bar"}}, + Points: []promql.Point{{T: 60000000000, V: 0.05}, {T: 90000000000, V: 0.05}, {T: 120000000000, V: 0.05}, {T: 150000000000, V: 0.05}, {T: 180000000000, V: 0.05}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "foo"}}, + Points: []promql.Point{{T: 60000000000, V: 0.1}, {T: 90000000000, V: 0.1}, {T: 120000000000, V: 0.1}, {T: 150000000000, V: 0.1}, {T: 180000000000, V: 0.1}}, + }, + promql.Series{ + Metric: labels.Labels{{Name: "app", Value: "fuzz"}}, + Points: []promql.Point{{T: 60000000000, V: 0.2}, {T: 90000000000, V: 0.2}, {T: 120000000000, V: 0.2}, {T: 150000000000, V: 0.2}, {T: 180000000000, V: 0.2}}, + }, + }, + }, + } { + test := test + t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { + t.Parallel() + + q := eng.NewRangeQuery(newQuerierRecorder(test.streams, test.params), test.qs, test.start, test.end, test.step, test.direction, test.limit) + res, err := q.Exec(context.Background()) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, test.expected, res) + }) + } +} + type querierRecorder struct { source map[string][]*logproto.Stream } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index eb88a5fab0..6c5ef8809e 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -1,6 +1,7 @@ package querier import ( + "context" "encoding/json" "fmt" "net/http" @@ -187,13 +188,17 @@ type instantQueryRequest struct { // RangeQueryHandler is a http.HandlerFunc for range queries. func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { + // Enforce the query timeout while querying backends + ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) + defer cancel() + request, err := httpRequestToRangeQueryRequest(r) if err != nil { server.WriteError(w, err) return } query := q.engine.NewRangeQuery(q, request.query, request.start, request.end, request.step, request.direction, request.limit) - result, err := query.Exec(r.Context()) + result, err := query.Exec(ctx) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -212,13 +217,17 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { // InstantQueryHandler is a http.HandlerFunc for instant queries. func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) { + // Enforce the query timeout while querying backends + ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) + defer cancel() + request, err := httpRequestToInstantQueryRequest(r) if err != nil { server.WriteError(w, err) return } query := q.engine.NewInstantQuery(q, request.query, request.ts, request.direction, request.limit) - result, err := query.Exec(r.Context()) + result, err := query.Exec(ctx) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -237,13 +246,17 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) { // LogQueryHandler is a http.HandlerFunc for log only queries. func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) { + // Enforce the query timeout while querying backends + ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) + defer cancel() + request, err := httpRequestToRangeQueryRequest(r) if err != nil { server.WriteError(w, err) return } query := q.engine.NewRangeQuery(q, request.query, request.start, request.end, request.step, request.direction, request.limit) - result, err := query.Exec(r.Context()) + result, err := query.Exec(ctx) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 33efadb22b..88b276cd03 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -144,9 +144,6 @@ func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(l // Select Implements logql.Querier which select logs via matchers and regex filters. func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.EntryIterator, error) { - // Enforce the query timeout while querying backends - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout)) - defer cancel() ingesterIterators, err := q.queryIngesters(ctx, params) if err != nil { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 46ed4a088b..160f9d97ca 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -7,7 +7,6 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -20,54 +19,6 @@ const ( queryTimeout = 12 * time.Second ) -func TestQuerier_Query_QueryTimeoutConfigFlag(t *testing.T) { - query := &logproto.QueryRequest{ - Selector: "{type=\"test\"}", - Limit: 10, - Start: time.Now().Add(-1 * time.Minute), - End: time.Now(), - Direction: logproto.FORWARD, - } - request := logql.SelectParams{ - QueryRequest: query, - } - - store := newStoreMock() - store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil) - - queryClient := newQueryClientMock() - queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil) - - ingesterClient := newQuerierClientMock() - ingesterClient.On("Query", mock.Anything, query, mock.Anything).Return(queryClient, nil) - - q, err := newQuerier( - mockQuerierConfig(), - mockIngesterClientConfig(), - newIngesterClientMockFactory(ingesterClient), - mockReadRingWithOneActiveIngester(), - store) - require.NoError(t, err) - - ctx := user.InjectOrgID(context.Background(), "test") - _, err = q.Select(ctx, request) - require.NoError(t, err) - - calls := ingesterClient.GetMockedCallsByMethod("Query") - assert.Equal(t, 1, len(calls)) - deadline, ok := calls[0].Arguments.Get(0).(context.Context).Deadline() - assert.True(t, ok) - assert.WithinDuration(t, deadline, time.Now().Add(queryTimeout), 1*time.Second) - - calls = store.GetMockedCallsByMethod("LazyQuery") - assert.Equal(t, 1, len(calls)) - deadline, ok = calls[0].Arguments.Get(0).(context.Context).Deadline() - assert.True(t, ok) - assert.WithinDuration(t, deadline, time.Now().Add(queryTimeout), 1*time.Second) - - store.AssertExpectations(t) -} - func TestQuerier_Label_QueryTimeoutConfigFlag(t *testing.T) { startTime := time.Now().Add(-1 * time.Minute) endTime := time.Now() @@ -221,7 +172,6 @@ func TestQuerier_tailDisconnectedIngesters(t *testing.T) { t.Run(testName, func(t *testing.T) { req := logproto.TailRequest{ Query: "{type=\"test\"}", - Regex: "", DelayFor: 0, Limit: 10, Start: time.Now(),