package queryrange import ( "context" "fmt" "net/http" "sync" "testing" "time" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" "go.uber.org/atomic" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/config" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/marshal" ) func TestLimits(t *testing.T) { l := fakeLimits{ splits: map[string]time.Duration{"a": time.Minute}, } wrapped := WithSplitByLimits(l, time.Hour) // Test default require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour) // Ensure we override the underlying implementation require.Equal(t, wrapped.QuerySplitDuration("a"), time.Hour) r := &LokiRequest{ Query: "qry", StartTs: time.Now(), Step: int64(time.Minute / time.Millisecond), } require.Equal( t, fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Hour/time.Millisecond), int64(time.Hour)), cacheKeyLimits{wrapped}.GenerateCacheKey("a", r), ) } func Test_seriesLimiter(t *testing.T) { cfg := testConfig cfg.CacheResults = false // split in 7 with 2 in // max. l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour) tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, nil) if stopper != nil { defer stopper.Stop() } require.NoError(t, err) lreq := &LokiRequest{ Query: `rate({app="foo"} |= "foo"[1m])`, Limit: 1000, Step: 30000, // 30sec StartTs: testTime.Add(-6 * time.Hour), EndTs: testTime, Direction: logproto.FORWARD, Path: "/query_range", } ctx := user.InjectOrgID(context.Background(), "1") req, err := LokiCodec.EncodeRequest(ctx, lreq) require.NoError(t, err) req = req.WithContext(ctx) err = user.InjectOrgIDIntoHTTPRequest(ctx, req) require.NoError(t, err) rt, err := newfakeRoundTripper() require.NoError(t, err) defer rt.Close() count, h := promqlResult(matrix) rt.setHandler(h) _, err = tpw(rt).RoundTrip(req) require.NoError(t, err) require.Equal(t, 7, *count) // 2 series should not be allowed. c := new(int) m := &sync.Mutex{} h = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { m.Lock() defer m.Unlock() defer func() { *c++ }() // first time returns a single series if *c == 0 { if err := marshal.WriteQueryResponseJSON(logqlmodel.Result{Data: matrix}, rw); err != nil { panic(err) } return } // second time returns a different series. if err := marshal.WriteQueryResponseJSON(logqlmodel.Result{ Data: promql.Matrix{ { Points: []promql.Point{ { T: toMs(testTime.Add(-4 * time.Hour)), V: 0.013333333333333334, }, }, Metric: []labels.Label{ { Name: "filename", Value: `/var/hostlog/apport.log`, }, { Name: "job", Value: "anotherjob", }, }, }, }, }, rw); err != nil { panic(err) } }) rt.setHandler(h) _, err = tpw(rt).RoundTrip(req) require.Error(t, err) require.LessOrEqual(t, *c, 4) } func Test_MaxQueryParallelism(t *testing.T) { maxQueryParallelism := 2 f, err := newfakeRoundTripper() require.Nil(t, err) var count atomic.Int32 var max atomic.Int32 f.setHandler(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { cur := count.Inc() if cur > max.Load() { max.Store(cur) } defer count.Dec() // simulate some work time.Sleep(20 * time.Millisecond) })) ctx := user.InjectOrgID(context.Background(), "foo") r, err := http.NewRequestWithContext(ctx, "GET", "/query_range", http.NoBody) require.Nil(t, err) _, _ = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism}, queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler { return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() _, _ = next.Do(c, &LokiRequest{}) }() } wg.Wait() return nil, nil }) }), ).RoundTrip(r) maxFound := int(max.Load()) require.LessOrEqual(t, maxFound, maxQueryParallelism, "max query parallelism: ", maxFound, " went over the configured one:", maxQueryParallelism) } func Test_MaxQueryParallelismLateScheduling(t *testing.T) { maxQueryParallelism := 2 f, err := newfakeRoundTripper() require.Nil(t, err) f.setHandler(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { // simulate some work time.Sleep(20 * time.Millisecond) })) ctx := user.InjectOrgID(context.Background(), "foo") r, err := http.NewRequestWithContext(ctx, "GET", "/query_range", http.NoBody) require.Nil(t, err) _, _ = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism}, queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler { return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { for i := 0; i < 10; i++ { go func() { _, _ = next.Do(c, &LokiRequest{}) }() } return nil, nil }) }), ).RoundTrip(r) } func Test_MaxQueryParallelismDisable(t *testing.T) { maxQueryParallelism := 0 f, err := newfakeRoundTripper() require.Nil(t, err) f.setHandler(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { // simulate some work time.Sleep(20 * time.Millisecond) })) ctx := user.InjectOrgID(context.Background(), "foo") r, err := http.NewRequestWithContext(ctx, "GET", "/query_range", http.NoBody) require.Nil(t, err) _, err = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism}, queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler { return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { for i := 0; i < 10; i++ { go func() { _, _ = next.Do(c, &LokiRequest{}) }() } return nil, nil }) }), ).RoundTrip(r) require.Error(t, err) } func Test_MaxQueryLookBack(t *testing.T) { tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{ maxQueryLookback: 1 * time.Hour, maxQueryParallelism: 1, }, config.SchemaConfig{}, nil, nil) if stopper != nil { defer stopper.Stop() } require.NoError(t, err) rt, err := newfakeRoundTripper() require.NoError(t, err) defer rt.Close() lreq := &LokiRequest{ Query: `{app="foo"} |= "foo"`, Limit: 10000, StartTs: testTime.Add(-6 * time.Hour), EndTs: testTime, Direction: logproto.FORWARD, Path: "/loki/api/v1/query_range", } ctx := user.InjectOrgID(context.Background(), "1") req, err := LokiCodec.EncodeRequest(ctx, lreq) require.NoError(t, err) req = req.WithContext(ctx) err = user.InjectOrgIDIntoHTTPRequest(ctx, req) require.NoError(t, err) _, err = tpw(rt).RoundTrip(req) require.NoError(t, err) } func Test_GenerateCacheKey_NoDivideZero(t *testing.T) { l := cacheKeyLimits{WithSplitByLimits(nil, 0)} start := time.Now() r := &LokiRequest{ Query: "qry", StartTs: start, Step: int64(time.Minute / time.Millisecond), } require.Equal( t, fmt.Sprintf("foo:qry:%d:0:0", r.GetStep()), l.GenerateCacheKey("foo", r), ) }