Loki: Improve query timeouts behavior (#7230)

**What this PR does / why we need it**:
Improves the different query-related timeouts behavior by fixing a bug and having better defaults.
The changes are:
- Fix the timeout middleware by creating a new request that will use the new WithDeadline context
- `engine:timeout` was being ignored. This PR improves such scenario by using **the query timeout as a fallback** instead of the default `5min` for all cases
pull/7312/head
Dylan Guedes 4 years ago committed by GitHub
parent 8868957783
commit f9f1eddfc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/logcli/query/query_test.go
  2. 1
      pkg/logql/downstream.go
  3. 25
      pkg/logql/engine.go
  4. 7
      pkg/querier/http.go
  5. 157
      pkg/querier/http_test.go

@ -14,6 +14,7 @@ import (
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/loghttp"
@ -505,6 +506,7 @@ func mustParseLabels(t *testing.T, s string) loghttp.LabelSet {
type testQueryClient struct {
engine *logql.Engine
queryRangeCalls int
orgID string
}
func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
@ -521,10 +523,11 @@ func (t *testQueryClient) Query(queryStr string, limit int, time time.Time, dire
}
func (t *testQueryClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
ctx := user.InjectOrgID(context.Background(), "fake")
params := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil)
v, err := t.engine.Query(params).Exec(context.Background())
v, err := t.engine.Query(params).Exec(ctx)
if err != nil {
return nil, err
}

@ -67,6 +67,7 @@ func (ng *DownstreamEngine) Query(ctx context.Context, p Params, mapped syntax.E
logger: ng.logger,
params: p,
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer(ctx)),
timeout: ng.opts.Timeout,
parse: func(_ context.Context, _ string) (syntax.Expr, error) {
return mapped, nil
},

@ -119,6 +119,7 @@ func (opts *EngineOpts) applyDefault() {
// Engine is the LogQL engine.
type Engine struct {
Timeout time.Duration
logger log.Logger
evaluator Evaluator
limits Limits
@ -126,6 +127,7 @@ type Engine struct {
// NewEngine creates a new LogQL Engine.
func NewEngine(opts EngineOpts, q Querier, l Limits, logger log.Logger) *Engine {
queryTimeout := opts.Timeout
opts.applyDefault()
if logger == nil {
logger = log.NewNopLogger()
@ -134,6 +136,7 @@ func NewEngine(opts EngineOpts, q Querier, l Limits, logger log.Logger) *Engine
logger: logger,
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
limits: l,
Timeout: queryTimeout,
}
}
@ -146,8 +149,9 @@ func (ng *Engine) Query(params Params) Query {
parse: func(_ context.Context, query string) (syntax.Expr, error) {
return syntax.ParseExpr(query)
},
record: true,
limits: ng.limits,
record: true,
limits: ng.limits,
timeout: ng.Timeout,
}
}
@ -162,6 +166,7 @@ type query struct {
params Params
parse func(context.Context, string) (syntax.Expr, error)
limits Limits
timeout time.Duration
evaluator Evaluator
record bool
}
@ -222,13 +227,17 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
}
func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
queryTimeout := time.Minute * 5
userID, err := tenant.TenantID(ctx)
if err != nil {
level.Warn(q.logger).Log("msg", fmt.Sprintf("couldn't fetch tenantID to evaluate query timeout, using default value of %s", queryTimeout), "err", err)
} else {
queryTimeout = q.limits.QueryTimeout(userID)
queryTimeout := q.timeout
if q.timeout == 0 {
queryTimeout = time.Minute * 5
userID, err := tenant.TenantID(ctx)
if err != nil {
level.Warn(q.logger).Log("msg", fmt.Sprintf("couldn't fetch tenantID to evaluate query timeout, using default value of %s", queryTimeout), "err", err)
return nil, err
}
queryTimeout = q.limits.QueryTimeout(userID) + time.Second
}
ctx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()

@ -44,7 +44,7 @@ type QueryResponse struct {
Result parser.Value `json:"result"`
}
//nolint // QurierAPI defines HTTP handler functions for the querier.
// nolint // QuerierAPI defines HTTP handler functions for the querier.
type QuerierAPI struct {
querier Querier
cfg Config
@ -502,10 +502,11 @@ func WrapQuerySpanAndTimeout(call string, q *QuerierAPI) middleware.Interface {
queryTimeout = q.cfg.QueryTimeout
}
_, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout))
newCtx, cancel := context.WithTimeout(ctx, queryTimeout)
defer cancel()
next.ServeHTTP(w, req)
newReq := req.WithContext(newCtx)
next.ServeHTTP(w, newReq)
})
})
}

@ -1,12 +1,16 @@
package querier
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
@ -34,3 +38,156 @@ func TestTailHandler(t *testing.T) {
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "multiple org IDs present\n", rr.Body.String())
}
type slowConnectionSimulator struct {
sleepFor time.Duration
deadline time.Duration
didTimeout bool
}
func (s *slowConnectionSimulator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if err := ctx.Err(); err != nil {
panic(fmt.Sprintf("context already errored: %s", err))
}
time.Sleep(s.sleepFor)
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
s.didTimeout = true
case context.Canceled:
panic("context already canceled")
}
case <-time.After(s.deadline):
}
}
func TestQueryWrapperMiddleware(t *testing.T) {
tenant.WithDefaultResolver(tenant.NewMultiResolver())
shortestTimeout := time.Millisecond * 5
t.Run("request timeout is the shortest one", func(t *testing.T) {
defaultLimits := defaultLimitsTestConfig()
limits, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
api := NewQuerierAPI(mockQuerierConfig(), nil, limits, log.NewNopLogger())
// request timeout is 5ms but it sleeps for 100ms, so timeout injected in the request is expected.
connSimulator := &slowConnectionSimulator{
sleepFor: time.Millisecond * 100,
deadline: shortestTimeout,
}
api.cfg.QueryTimeout = time.Millisecond * 10
midl := WrapQuerySpanAndTimeout("mycall", api).Wrap(connSimulator)
req, err := http.NewRequest("GET", "/loki/api/v1/label", nil)
ctx, cancelFunc := context.WithTimeout(user.InjectOrgID(req.Context(), "fake"), shortestTimeout)
defer cancelFunc()
req = req.WithContext(ctx)
require.NoError(t, err)
rr := httptest.NewRecorder()
srv := http.HandlerFunc(midl.ServeHTTP)
srv.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
select {
case <-ctx.Done():
break
case <-time.After(shortestTimeout):
require.FailNow(t, "should have timed out before %s", shortestTimeout)
default:
require.FailNow(t, "timeout expected")
}
require.True(t, connSimulator.didTimeout)
})
t.Run("old querier:query_timeout is configured to supersede all others", func(t *testing.T) {
defaultLimits := defaultLimitsTestConfig()
defaultLimits.QueryTimeout = model.Duration(shortestTimeout)
limits, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
api := NewQuerierAPI(mockQuerierConfig(), nil, limits, log.NewNopLogger())
// configure old querier:query_timeout parameter.
// although it is longer than the limits timeout, it should supersede it.
api.cfg.QueryTimeout = time.Millisecond * 100
// although limits:query_timeout is shorter than querier:query_timeout,
// limits:query_timeout should be ignored.
// here we configure it to sleep for 100ms and we want it to timeout at the 100ms.
connSimulator := &slowConnectionSimulator{
sleepFor: api.cfg.QueryTimeout,
deadline: time.Millisecond * 200,
}
midl := WrapQuerySpanAndTimeout("mycall", api).Wrap(connSimulator)
req, err := http.NewRequest("GET", "/loki/api/v1/label", nil)
ctx, cancelFunc := context.WithTimeout(user.InjectOrgID(req.Context(), "fake"), time.Millisecond*200)
defer cancelFunc()
req = req.WithContext(ctx)
require.NoError(t, err)
rr := httptest.NewRecorder()
srv := http.HandlerFunc(midl.ServeHTTP)
srv.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
select {
case <-ctx.Done():
require.FailNow(t, fmt.Sprintf("should timeout in %s", api.cfg.QueryTimeout))
case <-time.After(shortestTimeout):
// didn't use the limits timeout (i.e: shortest one), exactly what we want.
break
case <-time.After(api.cfg.QueryTimeout):
require.FailNow(t, fmt.Sprintf("should timeout in %s", api.cfg.QueryTimeout))
}
require.True(t, connSimulator.didTimeout)
})
t.Run("new limits query timeout is configured to supersede all others", func(t *testing.T) {
defaultLimits := defaultLimitsTestConfig()
defaultLimits.QueryTimeout = model.Duration(shortestTimeout)
limits, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
api := NewQuerierAPI(mockQuerierConfig(), nil, limits, log.NewNopLogger())
connSimulator := &slowConnectionSimulator{
sleepFor: time.Millisecond * 100,
deadline: shortestTimeout,
}
midl := WrapQuerySpanAndTimeout("mycall", api).Wrap(connSimulator)
req, err := http.NewRequest("GET", "/loki/api/v1/label", nil)
ctx, cancelFunc := context.WithTimeout(user.InjectOrgID(req.Context(), "fake"), time.Millisecond*100)
defer cancelFunc()
req = req.WithContext(ctx)
require.NoError(t, err)
rr := httptest.NewRecorder()
srv := http.HandlerFunc(midl.ServeHTTP)
srv.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
select {
case <-ctx.Done():
break
case <-time.After(shortestTimeout):
require.FailNow(t, "should have timed out before %s", shortestTimeout)
}
require.True(t, connSimulator.didTimeout)
})
}

Loading…
Cancel
Save