From 39101308f105418fb486cd4e53e68b11db4d8357 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 15 Mar 2022 15:49:20 +0100 Subject: [PATCH] [Frontend] Log Result Cache (#5502) * Rough version still require some fixes * with file saved * Finish off the code and logic Signed-off-by: Cyril Tovena * Refactors metrics and hook the log cache middleware Signed-off-by: Cyril Tovena * Add tests, fixes and lint Signed-off-by: Cyril Tovena * Hash the key Signed-off-by: Cyril Tovena * better comment Signed-off-by: Cyril Tovena * better comment Signed-off-by: Cyril Tovena * Update pkg/querier/queryrange/log_result_cache.go Co-authored-by: Christian Simon * Update pkg/querier/queryrange/log_result_cache.go Co-authored-by: Christian Simon * Fixes typo in variable name Signed-off-by: Cyril Tovena * Use only one ok bool Signed-off-by: Cyril Tovena * Review feedbacks Signed-off-by: Cyril Tovena * check array boundaries Signed-off-by: Cyril Tovena Co-authored-by: Christian Simon --- pkg/querier/queryrange/codec.go | 61 ++- pkg/querier/queryrange/log_result_cache.go | 279 +++++++++++ .../queryrange/log_result_cache_test.go | 452 ++++++++++++++++++ pkg/querier/queryrange/metrics.go | 26 + .../queryrangebase/results_cache.go | 16 +- .../queryrangebase/results_cache_test.go | 37 +- .../queryrange/queryrangebase/roundtrip.go | 132 +---- .../queryrangebase/roundtrip_test.go | 108 ----- pkg/querier/queryrange/roundtrip.go | 148 +++--- pkg/querier/queryrange/split_by_interval.go | 1 - 10 files changed, 911 insertions(+), 349 deletions(-) create mode 100644 pkg/querier/queryrange/log_result_cache.go create mode 100644 pkg/querier/queryrange/log_result_cache_test.go create mode 100644 pkg/querier/queryrange/metrics.go diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 8932429c43..712997198e 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -50,6 +50,13 @@ func (r *LokiRequest) WithStartEnd(s int64, e int64) 
queryrangebase.Request { return &new } +func (r *LokiRequest) WithStartEndTime(s time.Time, e time.Time) *LokiRequest { + new := *r + new.StartTs = s + new.EndTs = e + return &new +} + func (r *LokiRequest) WithQuery(query string) queryrangebase.Request { new := *r new.Query = query @@ -565,28 +572,7 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase Statistics: mergedStats, }, nil case *LokiResponse: - lokiRes := responses[0].(*LokiResponse) - - lokiResponses := make([]*LokiResponse, 0, len(responses)) - for _, res := range responses { - lokiResult := res.(*LokiResponse) - mergedStats.Merge(lokiResult.Statistics) - lokiResponses = append(lokiResponses, lokiResult) - } - - return &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: lokiRes.Direction, - Limit: lokiRes.Limit, - Version: lokiRes.Version, - ErrorType: lokiRes.ErrorType, - Error: lokiRes.Error, - Statistics: mergedStats, - Data: LokiData{ - ResultType: loghttp.ResultTypeStream, - Result: mergeOrderedNonOverlappingStreams(lokiResponses, lokiRes.Limit, lokiRes.Direction), - }, - }, nil + return mergeLokiResponse(responses...), nil case *LokiSeriesResponse: lokiSeriesRes := responses[0].(*LokiSeriesResponse) @@ -910,3 +896,34 @@ func NewEmptyResponse(r queryrangebase.Request) (queryrangebase.Response, error) return nil, fmt.Errorf("unsupported request type %T", req) } } + +func mergeLokiResponse(responses ...queryrangebase.Response) *LokiResponse { + if len(responses) == 0 { + return nil + } + var ( + lokiRes = responses[0].(*LokiResponse) + mergedStats stats.Result + lokiResponses = make([]*LokiResponse, 0, len(responses)) + ) + + for _, res := range responses { + lokiResult := res.(*LokiResponse) + mergedStats.Merge(lokiResult.Statistics) + lokiResponses = append(lokiResponses, lokiResult) + } + + return &LokiResponse{ + Status: loghttp.QueryStatusSuccess, + Direction: lokiRes.Direction, + Limit: lokiRes.Limit, + Version: lokiRes.Version, + ErrorType: 
lokiRes.ErrorType, + Error: lokiRes.Error, + Statistics: mergedStats, + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: mergeOrderedNonOverlappingStreams(lokiResponses, lokiRes.Limit, lokiRes.Direction), + }, + } +} diff --git a/pkg/querier/queryrange/log_result_cache.go b/pkg/querier/queryrange/log_result_cache.go new file mode 100644 index 0000000000..4e06fcd746 --- /dev/null +++ b/pkg/querier/queryrange/log_result_cache.go @@ -0,0 +1,279 @@ +package queryrange + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + "github.com/weaveworks/common/httpgrpc" + "golang.org/x/sync/errgroup" + + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" + "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/util/validation" +) + +// LogResultCacheMetrics is the metrics wrapper used in log result cache. +type LogResultCacheMetrics struct { + CacheHit prometheus.Counter + CacheMiss prometheus.Counter +} + +// NewLogResultCacheMetrics creates metrics to be used in log result cache. +func NewLogResultCacheMetrics(registerer prometheus.Registerer) *LogResultCacheMetrics { + return &LogResultCacheMetrics{ + CacheHit: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: "loki", + Name: "query_frontend_log_result_cache_hit_total", + }), + CacheMiss: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: "loki", + Name: "query_frontend_log_result_cache_miss_total", + }), + } +} + +// NewLogResultCache creates a new log result cache middleware. 
+// Currently it only caches empty filter queries, this is because those are usually easily and freely cacheable. +// Log hits are difficult to handle because of the limit query parameter and the size of the response. +// In the future it could be extended to cache non-empty query results. +// see https://docs.google.com/document/d/1_mACOpxdWZ5K0cIedaja5gzMbv-m0lUVazqZd2O4mEU/edit +func NewLogResultCache(logger log.Logger, limits Limits, cache cache.Cache, shouldCache queryrangebase.ShouldCacheFn, metrics *LogResultCacheMetrics) queryrangebase.Middleware { + if metrics == nil { + metrics = NewLogResultCacheMetrics(nil) + } + return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler { + return &logResultCache{ + next: next, + limits: limits, + cache: cache, + logger: logger, + shouldCache: shouldCache, + metrics: metrics, + } + }) +} + +type logResultCache struct { + next queryrangebase.Handler + limits Limits + cache cache.Cache + shouldCache queryrangebase.ShouldCacheFn + + metrics *LogResultCacheMetrics + logger log.Logger +} + +func (l *logResultCache) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + if l.shouldCache != nil && !l.shouldCache(req) { + return l.next.Do(ctx, req) + } + + maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, l.limits.MaxCacheFreshness) + maxCacheTime := int64(model.Now().Add(-maxCacheFreshness)) + if req.GetEnd() > maxCacheTime { + return l.next.Do(ctx, req) + } + + lokiReq, ok := req.(*LokiRequest) + if !ok { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request type %T", req) + } + + interval := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.limits.QuerySplitDuration) + // skip caching by if interval is unset + if interval == 0 { + return l.next.Do(ctx, req) + } + // The 
first subquery might not be aligned. + alignedStart := time.Unix(0, lokiReq.GetStartTs().UnixNano()-(lokiReq.GetStartTs().UnixNano()%interval.Nanoseconds())) + // generate the cache key based on query, tenant and start time. + cacheKey := fmt.Sprintf("log:%s:%s:%d:%d", tenant.JoinTenantIDs(tenantIDs), req.GetQuery(), interval.Nanoseconds(), alignedStart.UnixNano()/(interval.Nanoseconds())) + + _, buff, _, err := l.cache.Fetch(ctx, []string{cache.HashKey(cacheKey)}) + if err != nil { + level.Warn(l.logger).Log("msg", "error fetching cache", "err", err, "cacheKey", cacheKey) + return l.next.Do(ctx, req) + } + // we expect only one key to be found or missing. + if len(buff) > 1 { + level.Warn(l.logger).Log("msg", "unexpected length of cache return values", "buff", len(buff)) + return l.next.Do(ctx, req) + } + + if len(buff) == 0 { + // cache miss + return l.handleMiss(ctx, cacheKey, lokiReq) + } + + // cache hit + var cachedRequest LokiRequest + err = proto.Unmarshal(buff[0], &cachedRequest) + if err != nil { + level.Warn(l.logger).Log("msg", "error unmarshalling request from cache", "err", err) + return l.next.Do(ctx, req) + } + return l.handleHit(ctx, cacheKey, &cachedRequest, lokiReq) +} + +func (l *logResultCache) handleMiss(ctx context.Context, cacheKey string, req *LokiRequest) (queryrangebase.Response, error) { + l.metrics.CacheMiss.Inc() + level.Debug(l.logger).Log("msg", "cache miss", "key", cacheKey) + resp, err := l.next.Do(ctx, req) + if err != nil { + return nil, err + } + lokiRes, ok := resp.(*LokiResponse) + if !ok { + return nil, fmt.Errorf("unexpected response type %T", resp) + } + // At the moment we only cache empty results + if !isEmpty(lokiRes) { + return resp, nil + } + data, err := proto.Marshal(req) + if err != nil { + level.Warn(l.logger).Log("msg", "error marshalling request", "err", err) + return resp, nil + } + // cache the result + err = l.cache.Store(ctx, []string{cache.HashKey(cacheKey)}, [][]byte{data}) + if err != nil { + 
level.Warn(l.logger).Log("msg", "error storing cache", "err", err) + } + return resp, nil +} + +func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedRequest *LokiRequest, lokiReq *LokiRequest) (queryrangebase.Response, error) { + l.metrics.CacheHit.Inc() + // we start with an empty response + result := emptyResponse(cachedRequest) + // if the request is the same and cover the whole time range, + // we can just return the cached result. + if !lokiReq.GetStartTs().After(cachedRequest.GetStartTs()) && lokiReq.GetStartTs().Equal(cachedRequest.GetStartTs()) && + !lokiReq.GetEndTs().Before(cachedRequest.GetEndTs()) && lokiReq.GetEndTs().Equal(cachedRequest.GetEndTs()) { + return result, nil + } + // we could be missing data at the start and the end. + // so we're going to fetch what is missing. + var ( + startRequest, endRequest *LokiRequest + startResp, endResp *LokiResponse + updateCache bool + ok bool + ) + g, ctx := errgroup.WithContext(ctx) + + // if we're missing data at the start, start fetching from the start to the cached start. + if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) { + g.Go(func() error { + startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs()) + resp, err := l.next.Do(ctx, startRequest) + if err != nil { + return err + } + startResp, ok = resp.(*LokiResponse) + if !ok { + return fmt.Errorf("unexpected response type %T", resp) + } + return nil + }) + } + + // if we're missing data at the end, start fetching from the cached end to the end. 
+ if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) { + g.Go(func() error { + endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs()) + resp, err := l.next.Do(ctx, endRequest) + if err != nil { + return err + } + endResp, ok = resp.(*LokiResponse) + if !ok { + return fmt.Errorf("unexpected response type %T", resp) + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + // if we have data at the start, we need to merge it with the cached data if it's empty and update the cache. + // If it's not empty only merge the response. + if startResp != nil { + if isEmpty(startResp) { + cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs()) + updateCache = true + } else { + if startResp.Status != loghttp.QueryStatusSuccess { + return startResp, nil + } + result = mergeLokiResponse(startResp, result) + } + } + + // if we have data at the end, we need to merge it with the cached data if it's empty and update the cache. + // If it's not empty only merge the response. + if endResp != nil { + if isEmpty(endResp) { + cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs()) + updateCache = true + } else { + if endResp.Status != loghttp.QueryStatusSuccess { + return endResp, nil + } + result = mergeLokiResponse(endResp, result) + } + } + + // we need to update the cache since we fetched more either at the end or the start and it was empty. 
+ if updateCache { + data, err := proto.Marshal(cachedRequest) + if err != nil { + level.Warn(l.logger).Log("msg", "error marshalling request", "err", err) + return result, err + } + // cache the result + err = l.cache.Store(ctx, []string{cache.HashKey(cacheKey)}, [][]byte{data}) + if err != nil { + level.Warn(l.logger).Log("msg", "error storing cache", "err", err) + } + } + return result, nil +} + +func isEmpty(lokiRes *LokiResponse) bool { + return lokiRes.Status == loghttp.QueryStatusSuccess && len(lokiRes.Data.Result) == 0 +} + +func emptyResponse(lokiReq *LokiRequest) *LokiResponse { + return &LokiResponse{ + Status: loghttp.QueryStatusSuccess, + Statistics: stats.Result{}, + Direction: lokiReq.Direction, + Limit: lokiReq.Limit, + Version: uint32(loghttp.GetVersion(lokiReq.Path)), + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: []logproto.Stream{}, + }, + } +} diff --git a/pkg/querier/queryrange/log_result_cache_test.go b/pkg/querier/queryrange/log_result_cache_test.go new file mode 100644 index 0000000000..19567b98a3 --- /dev/null +++ b/pkg/querier/queryrange/log_result_cache_test.go @@ -0,0 +1,452 @@ +package queryrange + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" + "github.com/grafana/loki/pkg/storage/chunk/cache" +) + +func Test_LogResultCacheSameRange(t *testing.T) { + var ( + ctx = user.InjectOrgID(context.Background(), "foo") + lrc = NewLogResultCache( + log.NewNopLogger(), + fakeLimits{ + splits: map[string]time.Duration{"foo": time.Minute}, + }, + cache.NewMockCache(), + nil, + nil, + ) + ) + + req := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, 
2*time.Minute.Nanoseconds()), + } + + fake := newFakeResponse([]mockResponse{ + { + RequestResponse: queryrangebase.RequestResponse{ + Request: req, + Response: emptyResponse(req), + }, + }, + }) + + h := lrc.Wrap(fake) + + resp, err := h.Do(ctx, req) + require.NoError(t, err) + require.Equal(t, emptyResponse(req), resp) + resp, err = h.Do(ctx, req) + require.NoError(t, err) + require.Equal(t, emptyResponse(req), resp) + + fake.AssertExpectations(t) +} + +func Test_LogResultCacheSameRangeNonEmpty(t *testing.T) { + var ( + ctx = user.InjectOrgID(context.Background(), "foo") + lrc = NewLogResultCache( + log.NewNopLogger(), + fakeLimits{ + splits: map[string]time.Duration{"foo": time.Minute}, + }, + cache.NewMockCache(), + nil, + nil, + ) + ) + + req := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + } + + fake := newFakeResponse([]mockResponse{ + { + RequestResponse: queryrangebase.RequestResponse{ + Request: req, + Response: nonEmptyResponse(req, 1), + }, + }, + { + RequestResponse: queryrangebase.RequestResponse{ + Request: req, + Response: nonEmptyResponse(req, 2), + }, + }, + }) + + h := lrc.Wrap(fake) + + resp, err := h.Do(ctx, req) + require.NoError(t, err) + require.Equal(t, nonEmptyResponse(req, 1), resp) + resp, err = h.Do(ctx, req) + require.NoError(t, err) + require.Equal(t, nonEmptyResponse(req, 2), resp) + + fake.AssertExpectations(t) +} + +func Test_LogResultCacheSmallerRange(t *testing.T) { + var ( + ctx = user.InjectOrgID(context.Background(), "foo") + lrc = NewLogResultCache( + log.NewNopLogger(), + fakeLimits{ + splits: map[string]time.Duration{"foo": time.Minute}, + }, + cache.NewMockCache(), + nil, + nil, + ) + ) + + req := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + } + + fake := newFakeResponse([]mockResponse{ + { + RequestResponse: queryrangebase.RequestResponse{ + Request: req, + Response: 
emptyResponse(req), + }, + }, + }) + + h := lrc.Wrap(fake) + + resp, err := h.Do(ctx, req) + require.NoError(t, err) + require.Equal(t, emptyResponse(req), resp) + resp, err = h.Do(ctx, &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + }) + require.NoError(t, err) + require.Equal(t, emptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + }), resp) + + fake.AssertExpectations(t) +} + +func Test_LogResultCacheDifferentRange(t *testing.T) { + var ( + ctx = user.InjectOrgID(context.Background(), "foo") + lrc = NewLogResultCache( + log.NewNopLogger(), + fakeLimits{ + splits: map[string]time.Duration{"foo": time.Minute}, + }, + cache.NewMockCache(), + nil, + nil, + ) + ) + + req1 := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + } + + req2 := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + } + + fake := newFakeResponse([]mockResponse{ + { + RequestResponse: queryrangebase.RequestResponse{ + Request: req1, + Response: emptyResponse(req1), + }, + }, + { + RequestResponse: queryrangebase.RequestResponse{ + Request: &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }, + Response: emptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }), + }, + }, + { + RequestResponse: queryrangebase.RequestResponse{ + Request: &LokiRequest{ + StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: 
time.Unix(0, 2*time.Minute.Nanoseconds()), + }, + Response: emptyResponse(&LokiRequest{ + StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + }), + }, + }, + }) + + h := lrc.Wrap(fake) + + resp, err := h.Do(ctx, req1) + require.NoError(t, err) + require.Equal(t, emptyResponse(req1), resp) + resp, err = h.Do(ctx, req2) + require.NoError(t, err) + require.Equal(t, emptyResponse(req2), resp) + + fake.AssertExpectations(t) +} + +func Test_LogResultCacheDifferentRangeNonEmpty(t *testing.T) { + var ( + ctx = user.InjectOrgID(context.Background(), "foo") + lrc = NewLogResultCache( + log.NewNopLogger(), + fakeLimits{ + splits: map[string]time.Duration{"foo": time.Minute}, + }, + cache.NewMockCache(), + nil, + nil, + ) + ) + + req1 := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + } + + req2 := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + } + + fake := newFakeResponse([]mockResponse{ + { + RequestResponse: queryrangebase.RequestResponse{ + Request: req1, + Response: emptyResponse(req1), + }, + }, + { + RequestResponse: queryrangebase.RequestResponse{ + Request: &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }, + Response: nonEmptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }, 1), + }, + }, + { + RequestResponse: queryrangebase.RequestResponse{ + Request: &LokiRequest{ + StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + }, + Response: nonEmptyResponse(&LokiRequest{ + StartTs: time.Unix(0, 
2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + }, 2), + }, + }, + }) + + h := lrc.Wrap(fake) + + resp, err := h.Do(ctx, req1) + require.NoError(t, err) + require.Equal(t, emptyResponse(req1), resp) + resp, err = h.Do(ctx, req2) + require.NoError(t, err) + require.Equal(t, mergeLokiResponse( + nonEmptyResponse(&LokiRequest{ + StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + }, 2), + nonEmptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }, 1), + ), resp) + + fake.AssertExpectations(t) +} + +func Test_LogResultCacheDifferentRangeNonEmptyAndEmpty(t *testing.T) { + var ( + ctx = user.InjectOrgID(context.Background(), "foo") + lrc = NewLogResultCache( + log.NewNopLogger(), + fakeLimits{ + splits: map[string]time.Duration{"foo": time.Minute}, + }, + cache.NewMockCache(), + nil, + nil, + ) + ) + + req1 := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + } + + req2 := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + } + + fake := newFakeResponse([]mockResponse{ + { + RequestResponse: queryrangebase.RequestResponse{ + Request: req1, + Response: emptyResponse(req1), + }, + }, + { + RequestResponse: queryrangebase.RequestResponse{ + Request: &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }, + Response: emptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }), + }, + }, + { + RequestResponse: 
queryrangebase.RequestResponse{ + Request: &LokiRequest{ + StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + }, + Response: nonEmptyResponse(&LokiRequest{ + StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + }, 2), + }, + }, + // we call it twice + { + RequestResponse: queryrangebase.RequestResponse{ + Request: &LokiRequest{ + StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + }, + Response: nonEmptyResponse(&LokiRequest{ + StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()), + EndTs: time.Unix(0, 2*time.Minute.Nanoseconds()), + }, 2), + }, + }, + }) + + h := lrc.Wrap(fake) + + resp, err := h.Do(ctx, req1) + require.NoError(t, err) + require.Equal(t, emptyResponse(req1), resp) + resp, err = h.Do(ctx, req2) + require.NoError(t, err) + require.Equal(t, mergeLokiResponse( + emptyResponse(req1), + nonEmptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }, 1), + ), resp) + resp, err = h.Do(ctx, req2) + require.NoError(t, err) + require.Equal(t, mergeLokiResponse( + emptyResponse(req1), + nonEmptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + }, 1), + ), resp) + fake.AssertExpectations(t) +} + +type fakeResponse struct { + *mock.Mock +} + +type mockResponse struct { + queryrangebase.RequestResponse + err error +} + +func newFakeResponse(responses []mockResponse) fakeResponse { + m := &mock.Mock{} + for _, r := range responses { + m.On("Do", mock.Anything, r.Request).Return(r.Response, r.err).Once() + } + return fakeResponse{ + Mock: m, + } +} + +func (f fakeResponse) Do(ctx 
context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + var ( + resp queryrangebase.Response + err error + args = f.Mock.Called(ctx, r) + ) + if args.Get(0) != nil { + resp = args.Get(0).(queryrangebase.Response) + } + if args.Get(1) != nil { + err = args.Get(1).(error) + } + return resp, err +} + +func nonEmptyResponse(lokiReq *LokiRequest, i int) *LokiResponse { + return &LokiResponse{ + Status: loghttp.QueryStatusSuccess, + Statistics: stats.Result{}, + Direction: lokiReq.Direction, + Limit: lokiReq.Limit, + Version: uint32(loghttp.GetVersion(lokiReq.Path)), + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: []logproto.Stream{ + { + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: fmt.Sprintf("%d", i), + }, + }, + }, + }, + }, + } +} diff --git a/pkg/querier/queryrange/metrics.go b/pkg/querier/queryrange/metrics.go new file mode 100644 index 0000000000..25a9d883b9 --- /dev/null +++ b/pkg/querier/queryrange/metrics.go @@ -0,0 +1,26 @@ +package queryrange + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" +) + +type Metrics struct { + *queryrangebase.InstrumentMiddlewareMetrics + *queryrangebase.RetryMiddlewareMetrics + *logql.ShardingMetrics + *SplitByMetrics + *LogResultCacheMetrics +} + +func NewMetrics(registerer prometheus.Registerer) *Metrics { + return &Metrics{ + InstrumentMiddlewareMetrics: queryrangebase.NewInstrumentMiddlewareMetrics(registerer), + RetryMiddlewareMetrics: queryrangebase.NewRetryMiddlewareMetrics(registerer), + ShardingMetrics: logql.NewShardingMetrics(registerer), + SplitByMetrics: NewSplitByMetrics(registerer), + LogResultCacheMetrics: NewLogResultCacheMetrics(registerer), + } +} diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index 8f2fa87614..48d217dc6a 
100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -128,7 +128,6 @@ type ShouldCacheFn func(r Request) bool type resultsCache struct { logger log.Logger - cfg ResultsCacheConfig next Handler cache cache.Cache limits Limits @@ -149,7 +148,7 @@ type resultsCache struct { // see `generateKey`. func NewResultsCacheMiddleware( logger log.Logger, - cfg ResultsCacheConfig, + c cache.Cache, splitter CacheSplitter, limits Limits, merger Merger, @@ -157,15 +156,7 @@ func NewResultsCacheMiddleware( cacheGenNumberLoader CacheGenNumberLoader, shouldCache ShouldCacheFn, reg prometheus.Registerer, -) (Middleware, cache.Cache, error) { - c, err := cache.New(cfg.CacheConfig, reg, logger) - if err != nil { - return nil, nil, err - } - if cfg.Compression == "snappy" { - c = cache.NewSnappy(c, logger) - } - +) (Middleware, error) { if cacheGenNumberLoader != nil { c = cache.NewCacheGenNumMiddleware(c) } @@ -173,7 +164,6 @@ func NewResultsCacheMiddleware( return MiddlewareFunc(func(next Handler) Handler { return &resultsCache{ logger: logger, - cfg: cfg, next: next, cache: c, limits: limits, @@ -184,7 +174,7 @@ func NewResultsCacheMiddleware( cacheGenNumberLoader: cacheGenNumberLoader, shouldCache: shouldCache, } - }), c, nil + }), nil } func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index f5dd7ec238..7fe5a05faf 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -430,7 +430,8 @@ func TestPartition(t *testing.T) { &PrometheusRequest{ Start: 0, End: 100, - }}, + }, + }, }, { name: "Test a partial hit.", @@ -751,9 +752,11 @@ func TestResultsCache(t *testing.T) { Cache: cache.NewMockCache(), }, } - rcm, _, err := NewResultsCacheMiddleware( + c, err := 
cache.New(cfg.CacheConfig, nil, log.NewNopLogger()) + require.NoError(t, err) + rcm, err := NewResultsCacheMiddleware( log.NewNopLogger(), - cfg, + c, constSplitter(day), mockLimits{}, PrometheusCodec, @@ -791,9 +794,11 @@ func TestResultsCacheRecent(t *testing.T) { var cfg ResultsCacheConfig flagext.DefaultValues(&cfg) cfg.CacheConfig.Cache = cache.NewMockCache() - rcm, _, err := NewResultsCacheMiddleware( + c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger()) + require.NoError(t, err) + rcm, err := NewResultsCacheMiddleware( log.NewNopLogger(), - cfg, + c, constSplitter(day), mockLimits{maxCacheFreshness: 10 * time.Minute}, PrometheusCodec, @@ -852,11 +857,12 @@ func TestResultsCacheMaxFreshness(t *testing.T) { var cfg ResultsCacheConfig flagext.DefaultValues(&cfg) cfg.CacheConfig.Cache = cache.NewMockCache() - + c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger()) + require.NoError(t, err) fakeLimits := tc.fakeLimits - rcm, _, err := NewResultsCacheMiddleware( + rcm, err := NewResultsCacheMiddleware( log.NewNopLogger(), - cfg, + c, constSplitter(day), fakeLimits, PrometheusCodec, @@ -891,9 +897,11 @@ func Test_resultsCache_MissingData(t *testing.T) { Cache: cache.NewMockCache(), }, } - rm, _, err := NewResultsCacheMiddleware( + c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger()) + require.NoError(t, err) + rm, err := NewResultsCacheMiddleware( log.NewNopLogger(), - cfg, + c, constSplitter(day), mockLimits{}, PrometheusCodec, @@ -996,9 +1004,11 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { var cfg ResultsCacheConfig flagext.DefaultValues(&cfg) cfg.CacheConfig.Cache = cache.NewMockCache() - rcm, _, err := NewResultsCacheMiddleware( + c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger()) + require.NoError(t, err) + rcm, err := NewResultsCacheMiddleware( log.NewNopLogger(), - cfg, + c, constSplitter(day), mockLimits{maxCacheFreshness: 10 * time.Minute}, PrometheusCodec, @@ -1024,8 +1034,7 @@ func 
TestResultsCacheShouldCacheFunc(t *testing.T) { } } -type mockCacheGenNumberLoader struct { -} +type mockCacheGenNumberLoader struct{} func newMockCacheGenNumberLoader() CacheGenNumberLoader { return mockCacheGenNumberLoader{} diff --git a/pkg/querier/queryrange/queryrangebase/roundtrip.go b/pkg/querier/queryrange/queryrangebase/roundtrip.go index 714f95c3f6..210de8ee06 100644 --- a/pkg/querier/queryrange/queryrangebase/roundtrip.go +++ b/pkg/querier/queryrange/queryrangebase/roundtrip.go @@ -21,36 +21,21 @@ import ( "io" "io/ioutil" "net/http" - "strings" "time" - "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" - - "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/chunk/cache" - "github.com/grafana/loki/pkg/tenant" - "github.com/grafana/loki/pkg/util" ) const day = 24 * time.Hour -var ( - // PassthroughMiddleware is a noop middleware - PassthroughMiddleware = MiddlewareFunc(func(next Handler) Handler { - return next - }) - - errInvalidMinShardingLookback = errors.New("a non-zero value is required for querier.query-ingesters-within when -querier.parallelise-shardable-queries is enabled") -) +// PassthroughMiddleware is a noop middleware +var PassthroughMiddleware = MiddlewareFunc(func(next Handler) Handler { + return next +}) // Config for query_range middleware chain. type Config struct { @@ -137,114 +122,6 @@ func (f RoundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } -// NewTripperware returns a Tripperware configured with middlewares to limit, align, split, retry and cache requests. 
-func NewTripperware( - cfg Config, - log log.Logger, - limits Limits, - codec Codec, - cacheExtractor Extractor, - schema chunk.SchemaConfig, - engineOpts promql.EngineOpts, - minShardingLookback time.Duration, - registerer prometheus.Registerer, - cacheGenNumberLoader CacheGenNumberLoader, -) (Tripperware, cache.Cache, error) { - // Per tenant query metrics. - queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_query_frontend_queries_total", - Help: "Total queries sent per tenant.", - }, []string{"op", "user"}) - - activeUsers := util.NewActiveUsersCleanupWithDefaultValues(func(user string) { - err := util.DeleteMatchingLabels(queriesPerTenant, map[string]string{"user": user}) - if err != nil { - level.Warn(log).Log("msg", "failed to remove cortex_query_frontend_queries_total metric for user", "user", user) - } - }) - - // Metric used to keep track of each middleware execution duration. - metrics := NewInstrumentMiddlewareMetrics(registerer) - - queryRangeMiddleware := []Middleware{NewLimitsMiddleware(limits)} - if cfg.AlignQueriesWithStep { - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align", metrics), StepAlignMiddleware) - } - if cfg.SplitQueriesByInterval != 0 { - staticIntervalFn := func(_ Request) time.Duration { return cfg.SplitQueriesByInterval } - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, codec, registerer)) - } - - var c cache.Cache - if cfg.CacheResults { - shouldCache := func(r Request) bool { - return !r.GetCachingOptions().Disabled - } - queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, codec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer) - if err != nil { - return nil, nil, err - } - c = cache - queryRangeMiddleware = append(queryRangeMiddleware, 
InstrumentMiddleware("results_cache", metrics), queryCacheMiddleware) - } - - if cfg.ShardedQueries { - if minShardingLookback == 0 { - return nil, nil, errInvalidMinShardingLookback - } - - shardingware := NewQueryShardMiddleware( - log, - promql.NewEngine(engineOpts), - schema.Configs, - codec, - minShardingLookback, - metrics, - registerer, - ) - - queryRangeMiddleware = append( - queryRangeMiddleware, - shardingware, // instrumentation is included in the sharding middleware - ) - } - - if cfg.MaxRetries > 0 { - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) - } - - // Start cleanup. If cleaner stops or fail, we will simply not clean the metrics for inactive users. - _ = activeUsers.StartAsync(context.Background()) - return func(next http.RoundTripper) http.RoundTripper { - // Finally, if the user selected any query range middleware, stitch it in. - if len(queryRangeMiddleware) > 0 { - queryrange := NewRoundTripper(next, codec, cfg.ForwardHeaders, queryRangeMiddleware...) - return RoundTripFunc(func(r *http.Request) (*http.Response, error) { - isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range") - op := "query" - if isQueryRange { - op = "query_range" - } - - tenantIDs, err := tenant.TenantIDs(r.Context()) - // This should never happen anyways because we have auth middleware before this. 
- if err != nil { - return nil, err - } - userStr := tenant.JoinTenantIDs(tenantIDs) - activeUsers.UpdateUserTimestamp(userStr, time.Now()) - queriesPerTenant.WithLabelValues(op, userStr).Inc() - - if !isQueryRange { - return next.RoundTrip(r) - } - return queryrange.RoundTrip(r) - }) - } - return next - }, c, nil -} - type roundTripper struct { next http.RoundTripper handler Handler @@ -265,7 +142,6 @@ func NewRoundTripper(next http.RoundTripper, codec Codec, headers []string, midd } func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - // include the headers specified in the roundTripper during decoding the request. request, err := q.codec.DecodeRequest(r.Context(), r, q.headers) if err != nil { diff --git a/pkg/querier/queryrange/queryrangebase/roundtrip_test.go b/pkg/querier/queryrange/queryrangebase/roundtrip_test.go index 790eed2569..f834ba868d 100644 --- a/pkg/querier/queryrange/queryrangebase/roundtrip_test.go +++ b/pkg/querier/queryrange/queryrangebase/roundtrip_test.go @@ -1,100 +1,9 @@ package queryrangebase import ( - "context" - "io/ioutil" "net/http" - "net/http/httptest" - "net/url" - "strconv" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/prometheus/prometheus/promql" - "github.com/stretchr/testify/require" - "github.com/weaveworks/common/middleware" - "github.com/weaveworks/common/user" - - "github.com/grafana/loki/pkg/storage/chunk" ) -func TestRoundTrip(t *testing.T) { - s := httptest.NewServer( - middleware.AuthenticateUser.Wrap( - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var err error - if r.RequestURI == query { - _, err = w.Write([]byte(responseBody)) - } else { - _, err = w.Write([]byte("bar")) - } - if err != nil { - t.Fatal(err) - } - }), - ), - ) - defer s.Close() - - u, err := url.Parse(s.URL) - require.NoError(t, err) - - downstream := singleHostRoundTripper{ - host: u.Host, - next: http.DefaultTransport, - } - - tw, _, err := NewTripperware(Config{}, - log.NewNopLogger(), 
- mockLimits{}, - PrometheusCodec, - nil, - chunk.SchemaConfig{}, - promql.EngineOpts{ - Logger: log.NewNopLogger(), - Reg: nil, - MaxSamples: 1000, - Timeout: time.Minute, - }, - 0, - nil, - nil, - ) - - if err != nil { - t.Fatal(err) - } - - for i, tc := range []struct { - path, expectedBody string - }{ - {"/foo", "bar"}, - {query, responseBody}, - } { - t.Run(strconv.Itoa(i), func(t *testing.T) { - req, err := http.NewRequest("GET", tc.path, http.NoBody) - require.NoError(t, err) - - // query-frontend doesn't actually authenticate requests, we rely on - // the queriers to do this. Hence we ensure the request doesn't have a - // org ID in the ctx, but does have the header. - ctx := user.InjectOrgID(context.Background(), "1") - req = req.WithContext(ctx) - err = user.InjectOrgIDIntoHTTPRequest(ctx, req) - require.NoError(t, err) - - resp, err := tw(downstream).RoundTrip(req) - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) - - bs, err := ioutil.ReadAll(resp.Body) - require.NoError(t, err) - require.Equal(t, tc.expectedBody, string(bs)) - }) - } -} - type singleHostRoundTripper struct { host string next http.RoundTripper @@ -105,20 +14,3 @@ func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro r.URL.Host = s.host return s.next.RoundTrip(r) } - -func Test_ShardingConfigError(t *testing.T) { - _, _, err := NewTripperware( - Config{ShardedQueries: true}, - log.NewNopLogger(), - nil, - nil, - nil, - chunk.SchemaConfig{}, - promql.EngineOpts{}, - 0, - nil, - nil, - ) - - require.EqualError(t, err, errInvalidMinShardingLookback.Error()) -} diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index d2bb701329..f662b1c505 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -12,7 +12,6 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/loghttp" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/syntax" 
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk" @@ -43,35 +42,46 @@ func NewTripperware( schema chunk.SchemaConfig, registerer prometheus.Registerer, ) (queryrangebase.Tripperware, Stopper, error) { - instrumentMetrics := queryrangebase.NewInstrumentMiddlewareMetrics(registerer) - retryMetrics := queryrangebase.NewRetryMiddlewareMetrics(registerer) - shardingMetrics := logql.NewShardingMetrics(registerer) - splitByMetrics := NewSplitByMetrics(registerer) + metrics := NewMetrics(registerer) - metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, - PrometheusExtractor{}, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics, registerer) + var ( + c cache.Cache + err error + ) + if cfg.CacheResults { + c, err = cache.New(cfg.CacheConfig, registerer, log) + if err != nil { + return nil, nil, err + } + if cfg.Compression == "snappy" { + c = cache.NewSnappy(c, log) + } + } + + metricsTripperware, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, c, + PrometheusExtractor{}, metrics, registerer) if err != nil { return nil, nil, err } // NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in // MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170 - logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics) + logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, LokiCodec, c, metrics) if err != nil { return nil, nil, err } - seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, LokiCodec, instrumentMetrics, retryMetrics, splitByMetrics, shardingMetrics, schema) + seriesTripperware, err := 
NewSeriesTripperware(cfg, log, limits, LokiCodec, metrics, schema) if err != nil { return nil, nil, err } - labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, instrumentMetrics, retryMetrics, splitByMetrics) + labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, metrics) if err != nil { return nil, nil, err } - instantMetricTripperware, err := NewInstantMetricTripperware(cfg, log, limits, schema, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics) + instantMetricTripperware, err := NewInstantMetricTripperware(cfg, log, limits, schema, LokiCodec, metrics) if err != nil { return nil, nil, err } @@ -82,7 +92,7 @@ func NewTripperware( labelsRT := labelsTripperware(next) instantRT := instantMetricTripperware(next) return newRoundTripper(next, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, limits) - }, cache, nil + }, c, nil } type roundTripper struct { @@ -235,16 +245,31 @@ func NewLogFilterTripperware( limits Limits, schema chunk.SchemaConfig, codec queryrangebase.Codec, - instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics, - retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics, - shardingMetrics *logql.ShardingMetrics, - splitByMetrics *SplitByMetrics, + c cache.Cache, + metrics *Metrics, ) (queryrangebase.Tripperware, error) { queryRangeMiddleware := []queryrangebase.Middleware{ StatsCollectorMiddleware(), NewLimitsMiddleware(limits), - queryrangebase.InstrumentMiddleware("split_by_interval", instrumentMetrics), - SplitByIntervalMiddleware(limits, codec, splitByTime, splitByMetrics), + queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), + SplitByIntervalMiddleware(limits, codec, splitByTime, metrics.SplitByMetrics), + } + + if cfg.CacheResults { + queryCacheMiddleware := NewLogResultCache( + log, + limits, + c, + func(r queryrangebase.Request) bool { + return !r.GetCachingOptions().Disabled + }, + 
metrics.LogResultCacheMetrics, + ) + queryRangeMiddleware = append( + queryRangeMiddleware, + queryrangebase.InstrumentMiddleware("log_results_cache", metrics.InstrumentMiddlewareMetrics), + queryCacheMiddleware, + ) } if cfg.ShardedQueries { @@ -252,15 +277,18 @@ func NewLogFilterTripperware( NewQueryShardMiddleware( log, schema.Configs, - instrumentMetrics, // instrumentation is included in the sharding middleware - shardingMetrics, + metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware + metrics.ShardingMetrics, limits, ), ) } if cfg.MaxRetries > 0 { - queryRangeMiddleware = append(queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", instrumentMetrics), queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics)) + queryRangeMiddleware = append( + queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics), + queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics), + ) } return func(next http.RoundTripper) http.RoundTripper { @@ -277,23 +305,23 @@ func NewSeriesTripperware( log log.Logger, limits Limits, codec queryrangebase.Codec, - instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics, - retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics, - splitByMetrics *SplitByMetrics, - shardingMetrics *logql.ShardingMetrics, + metrics *Metrics, schema chunk.SchemaConfig, ) (queryrangebase.Tripperware, error) { queryRangeMiddleware := []queryrangebase.Middleware{ NewLimitsMiddleware(limits), - queryrangebase.InstrumentMiddleware("split_by_interval", instrumentMetrics), + queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), // The Series API needs to pull one chunk per series to extract the label set, which is much cheaper than iterating through all matching chunks. 
// Force a 24 hours split by for series API, this will be more efficient with our static daily bucket storage. // This would avoid queriers downloading chunks for same series over and over again for serving smaller queries. - SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, splitByMetrics), + SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, metrics.SplitByMetrics), } if cfg.MaxRetries > 0 { - queryRangeMiddleware = append(queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", instrumentMetrics), queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics)) + queryRangeMiddleware = append(queryRangeMiddleware, + queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics), + queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics), + ) } if cfg.ShardedQueries { @@ -301,8 +329,8 @@ func NewSeriesTripperware( NewSeriesQueryShardMiddleware( log, schema.Configs, - instrumentMetrics, - shardingMetrics, + metrics.InstrumentMiddlewareMetrics, + metrics.ShardingMetrics, limits, codec, ), @@ -323,20 +351,21 @@ func NewLabelsTripperware( log log.Logger, limits Limits, codec queryrangebase.Codec, - instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics, - retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics, - splitByMetrics *SplitByMetrics, + metrics *Metrics, ) (queryrangebase.Tripperware, error) { queryRangeMiddleware := []queryrangebase.Middleware{ NewLimitsMiddleware(limits), - queryrangebase.InstrumentMiddleware("split_by_interval", instrumentMetrics), + queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), // Force a 24 hours split by for labels API, this will be more efficient with our static daily bucket storage. // This is because the labels API is an index-only operation. 
- SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, splitByMetrics), + SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, metrics.SplitByMetrics), } if cfg.MaxRetries > 0 { - queryRangeMiddleware = append(queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", instrumentMetrics), queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics)) + queryRangeMiddleware = append(queryRangeMiddleware, + queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics), + queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics), + ) } return func(next http.RoundTripper) http.RoundTripper { @@ -355,33 +384,30 @@ func NewMetricTripperware( limits Limits, schema chunk.SchemaConfig, codec queryrangebase.Codec, + c cache.Cache, extractor queryrangebase.Extractor, - instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics, - retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics, - shardingMetrics *logql.ShardingMetrics, - splitByMetrics *SplitByMetrics, + metrics *Metrics, registerer prometheus.Registerer, -) (queryrangebase.Tripperware, Stopper, error) { +) (queryrangebase.Tripperware, error) { queryRangeMiddleware := []queryrangebase.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)} if cfg.AlignQueriesWithStep { queryRangeMiddleware = append( queryRangeMiddleware, - queryrangebase.InstrumentMiddleware("step_align", instrumentMetrics), + queryrangebase.InstrumentMiddleware("step_align", metrics.InstrumentMiddlewareMetrics), queryrangebase.StepAlignMiddleware, ) } queryRangeMiddleware = append( queryRangeMiddleware, - queryrangebase.InstrumentMiddleware("split_by_interval", instrumentMetrics), - SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics), + queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), + SplitByIntervalMiddleware(limits, 
codec, splitMetricByTime, metrics.SplitByMetrics), ) - var c cache.Cache if cfg.CacheResults { - queryCacheMiddleware, cache, err := queryrangebase.NewResultsCacheMiddleware( + queryCacheMiddleware, err := queryrangebase.NewResultsCacheMiddleware( log, - cfg.ResultsCacheConfig, + c, cacheKeyLimits{limits}, limits, codec, @@ -393,12 +419,11 @@ func NewMetricTripperware( registerer, ) if err != nil { - return nil, nil, err + return nil, err } - c = cache queryRangeMiddleware = append( queryRangeMiddleware, - queryrangebase.InstrumentMiddleware("results_cache", instrumentMetrics), + queryrangebase.InstrumentMiddleware("results_cache", metrics.InstrumentMiddlewareMetrics), queryCacheMiddleware, ) } @@ -408,8 +433,8 @@ func NewMetricTripperware( NewQueryShardMiddleware( log, schema.Configs, - instrumentMetrics, // instrumentation is included in the sharding middleware - shardingMetrics, + metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware + metrics.ShardingMetrics, limits, ), ) @@ -418,8 +443,8 @@ func NewMetricTripperware( if cfg.MaxRetries > 0 { queryRangeMiddleware = append( queryRangeMiddleware, - queryrangebase.InstrumentMiddleware("retry", instrumentMetrics), - queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), + queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics), + queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics), ) } @@ -435,7 +460,7 @@ func NewMetricTripperware( }) } return next - }, c, nil + }, nil } // NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries @@ -445,10 +470,7 @@ func NewInstantMetricTripperware( limits Limits, schema chunk.SchemaConfig, codec queryrangebase.Codec, - instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics, - retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics, - shardingMetrics *logql.ShardingMetrics, - splitByMetrics 
*SplitByMetrics, + metrics *Metrics, ) (queryrangebase.Tripperware, error) { queryRangeMiddleware := []queryrangebase.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)} @@ -457,8 +479,8 @@ func NewInstantMetricTripperware( NewQueryShardMiddleware( log, schema.Configs, - instrumentMetrics, // instrumentation is included in the sharding middleware - shardingMetrics, + metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware + metrics.ShardingMetrics, limits, ), ) @@ -467,8 +489,8 @@ func NewInstantMetricTripperware( if cfg.MaxRetries > 0 { queryRangeMiddleware = append( queryRangeMiddleware, - queryrangebase.InstrumentMiddleware("retry", instrumentMetrics), - queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics), + queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics), + queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics), ) } diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index c040423ba5..16c704e24b 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -281,7 +281,6 @@ func forInterval(interval time.Duration, start, end time.Time, endTimeInclusive } else if endTimeInclusive { newEnd = newEnd.Add(-time.Millisecond) } - if firstInterval { callback(ogStart, newEnd) firstInterval = false