[Frontend] Log Result Cache (#5502)

* Rough version, still requires some fixes

* with file saved

* Finish off the code and logic

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Refactors metrics and hook the log cache middleware

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add tests, fixes and lint

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Hash the key

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* better comment

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* better comment

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update pkg/querier/queryrange/log_result_cache.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/querier/queryrange/log_result_cache.go

Co-authored-by: Christian Simon <simon@swine.de>

* Fixes typo in variable name

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Use only one ok bool

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Review feedbacks

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* check array boundaries

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Christian Simon <simon@swine.de>
pull/5627/head
Cyril Tovena 4 years ago committed by GitHub
parent 01d63c8ae8
commit 39101308f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      pkg/querier/queryrange/codec.go
  2. 279
      pkg/querier/queryrange/log_result_cache.go
  3. 452
      pkg/querier/queryrange/log_result_cache_test.go
  4. 26
      pkg/querier/queryrange/metrics.go
  5. 16
      pkg/querier/queryrange/queryrangebase/results_cache.go
  6. 37
      pkg/querier/queryrange/queryrangebase/results_cache_test.go
  7. 132
      pkg/querier/queryrange/queryrangebase/roundtrip.go
  8. 108
      pkg/querier/queryrange/queryrangebase/roundtrip_test.go
  9. 148
      pkg/querier/queryrange/roundtrip.go
  10. 1
      pkg/querier/queryrange/split_by_interval.go

@ -50,6 +50,13 @@ func (r *LokiRequest) WithStartEnd(s int64, e int64) queryrangebase.Request {
return &new
}
// WithStartEndTime returns a copy of the request with its start and end
// timestamps replaced; the receiver is left untouched.
func (r *LokiRequest) WithStartEndTime(s time.Time, e time.Time) *LokiRequest {
	clone := *r
	clone.StartTs = s
	clone.EndTs = e
	return &clone
}
func (r *LokiRequest) WithQuery(query string) queryrangebase.Request {
new := *r
new.Query = query
@ -565,28 +572,7 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
Statistics: mergedStats,
}, nil
case *LokiResponse:
lokiRes := responses[0].(*LokiResponse)
lokiResponses := make([]*LokiResponse, 0, len(responses))
for _, res := range responses {
lokiResult := res.(*LokiResponse)
mergedStats.Merge(lokiResult.Statistics)
lokiResponses = append(lokiResponses, lokiResult)
}
return &LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: lokiRes.Direction,
Limit: lokiRes.Limit,
Version: lokiRes.Version,
ErrorType: lokiRes.ErrorType,
Error: lokiRes.Error,
Statistics: mergedStats,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: mergeOrderedNonOverlappingStreams(lokiResponses, lokiRes.Limit, lokiRes.Direction),
},
}, nil
return mergeLokiResponse(responses...), nil
case *LokiSeriesResponse:
lokiSeriesRes := responses[0].(*LokiSeriesResponse)
@ -910,3 +896,34 @@ func NewEmptyResponse(r queryrangebase.Request) (queryrangebase.Response, error)
return nil, fmt.Errorf("unsupported request type %T", req)
}
}
// mergeLokiResponse merges any number of log query responses into one.
// Metadata (direction, limit, version, error fields) is taken from the first
// response, statistics are accumulated across all of them, and the stream
// payloads are merged in order. Returns nil when called with no responses.
func mergeLokiResponse(responses ...queryrangebase.Response) *LokiResponse {
	if len(responses) == 0 {
		return nil
	}
	first := responses[0].(*LokiResponse)
	merged := make([]*LokiResponse, 0, len(responses))
	var statistics stats.Result
	for _, r := range responses {
		resp := r.(*LokiResponse)
		statistics.Merge(resp.Statistics)
		merged = append(merged, resp)
	}
	return &LokiResponse{
		Status:     loghttp.QueryStatusSuccess,
		Direction:  first.Direction,
		Limit:      first.Limit,
		Version:    first.Version,
		ErrorType:  first.ErrorType,
		Error:      first.Error,
		Statistics: statistics,
		Data: LokiData{
			ResultType: loghttp.ResultTypeStream,
			Result:     mergeOrderedNonOverlappingStreams(merged, first.Limit, first.Direction),
		},
	}
}

@ -0,0 +1,279 @@
package queryrange
import (
"context"
"fmt"
"net/http"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"
"golang.org/x/sync/errgroup"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/util/validation"
)
// LogResultCacheMetrics is the metrics wrapper used in log result cache.
type LogResultCacheMetrics struct {
	// CacheHit counts queries that found a matching cached entry.
	CacheHit prometheus.Counter
	// CacheMiss counts queries that found no cached entry.
	CacheMiss prometheus.Counter
}
// NewLogResultCacheMetrics creates metrics to be used in log result cache.
// Counters are registered via promauto against the given registerer
// (a nil registerer yields unregistered, still-usable counters).
func NewLogResultCacheMetrics(registerer prometheus.Registerer) *LogResultCacheMetrics {
	return &LogResultCacheMetrics{
		CacheHit: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
			Namespace: "loki",
			Name:      "query_frontend_log_result_cache_hit_total",
			// Help is required by Prometheus conventions for every metric.
			Help: "Total number of log result cache hits.",
		}),
		CacheMiss: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
			Namespace: "loki",
			Name:      "query_frontend_log_result_cache_miss_total",
			Help:      "Total number of log result cache misses.",
		}),
	}
}
// NewLogResultCache creates a new log result cache middleware.
// Currently it only caches empty filter queries, this is because those are usually easily and freely cacheable.
// Log hits are difficult to handle because of the limit query parameter and the size of the response.
// In the future it could be extended to cache non-empty query results.
// see https://docs.google.com/document/d/1_mACOpxdWZ5K0cIedaja5gzMbv-m0lUVazqZd2O4mEU/edit
func NewLogResultCache(logger log.Logger, limits Limits, cache cache.Cache, shouldCache queryrangebase.ShouldCacheFn, metrics *LogResultCacheMetrics) queryrangebase.Middleware {
	// Fall back to unregistered metrics when the caller provides none.
	if metrics == nil {
		metrics = NewLogResultCacheMetrics(nil)
	}
	return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
		c := &logResultCache{
			logger:      logger,
			limits:      limits,
			cache:       cache,
			shouldCache: shouldCache,
			metrics:     metrics,
			next:        next,
		}
		return c
	})
}
// logResultCache is the handler behind NewLogResultCache. It caches the
// *request* (not the response) of queries that returned an empty result,
// keyed by tenant, query and split-interval-aligned start time.
type logResultCache struct {
	next        queryrangebase.Handler
	limits      Limits
	cache       cache.Cache
	shouldCache queryrangebase.ShouldCacheFn // optional; nil means always consider caching
	metrics     *LogResultCacheMetrics
	logger      log.Logger
}
// Do implements queryrangebase.Handler. Only LokiRequest log queries whose
// end time is older than the tenant's max cache freshness are considered for
// caching; anything else is passed straight to the next handler.
func (l *logResultCache) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		// use an explicit format string so a '%' inside the error text cannot be
		// misinterpreted as a formatting directive.
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err)
	}
	if l.shouldCache != nil && !l.shouldCache(req) {
		return l.next.Do(ctx, req)
	}
	// Never cache results that are still within the freshness window.
	maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, l.limits.MaxCacheFreshness)
	maxCacheTime := int64(model.Now().Add(-maxCacheFreshness))
	if req.GetEnd() > maxCacheTime {
		return l.next.Do(ctx, req)
	}
	lokiReq, ok := req.(*LokiRequest)
	if !ok {
		return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request type %T", req)
	}
	interval := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.limits.QuerySplitDuration)
	// skip caching if the split interval is unset
	if interval == 0 {
		return l.next.Do(ctx, req)
	}
	// The first subquery might not be aligned.
	alignedStart := time.Unix(0, lokiReq.GetStartTs().UnixNano()-(lokiReq.GetStartTs().UnixNano()%interval.Nanoseconds()))
	// generate the cache key based on query, tenant and start time.
	cacheKey := fmt.Sprintf("log:%s:%s:%d:%d", tenant.JoinTenantIDs(tenantIDs), req.GetQuery(), interval.Nanoseconds(), alignedStart.UnixNano()/(interval.Nanoseconds()))
	_, buff, _, err := l.cache.Fetch(ctx, []string{cache.HashKey(cacheKey)})
	if err != nil {
		// a cache outage degrades to a plain query, never an error.
		level.Warn(l.logger).Log("msg", "error fetching cache", "err", err, "cacheKey", cacheKey)
		return l.next.Do(ctx, req)
	}
	// we expect only one key to be found or missing.
	if len(buff) > 1 {
		level.Warn(l.logger).Log("msg", "unexpected length of cache return values", "buff", len(buff))
		return l.next.Do(ctx, req)
	}
	if len(buff) == 0 {
		// cache miss
		return l.handleMiss(ctx, cacheKey, lokiReq)
	}
	// cache hit: the cached payload is the previously-empty request itself.
	var cachedRequest LokiRequest
	err = proto.Unmarshal(buff[0], &cachedRequest)
	if err != nil {
		level.Warn(l.logger).Log("msg", "error unmarshalling request from cache", "err", err)
		return l.next.Do(ctx, req)
	}
	return l.handleHit(ctx, cacheKey, &cachedRequest, lokiReq)
}
// handleMiss forwards the request downstream and, when the response is a
// successful empty result, stores the request in the cache so later
// overlapping queries can be answered without re-running the work.
func (l *logResultCache) handleMiss(ctx context.Context, cacheKey string, req *LokiRequest) (queryrangebase.Response, error) {
	l.metrics.CacheMiss.Inc()
	level.Debug(l.logger).Log("msg", "cache miss", "key", cacheKey)
	resp, err := l.next.Do(ctx, req)
	if err != nil {
		return nil, err
	}
	lokiRes, ok := resp.(*LokiResponse)
	if !ok {
		return nil, fmt.Errorf("unexpected response type %T", resp)
	}
	// At the moment we only cache empty results
	if !isEmpty(lokiRes) {
		return resp, nil
	}
	data, marshalErr := proto.Marshal(req)
	if marshalErr != nil {
		level.Warn(l.logger).Log("msg", "error marshalling request", "err", marshalErr)
		return resp, nil
	}
	// cache the result; a store failure is logged but never fails the query.
	if storeErr := l.cache.Store(ctx, []string{cache.HashKey(cacheKey)}, [][]byte{data}); storeErr != nil {
		level.Warn(l.logger).Log("msg", "error storing cache", "err", storeErr)
	}
	return resp, nil
}
// handleHit serves a request that overlaps a cached empty-result request.
// The overlapping range is answered from the cache; only the missing leading
// and trailing ranges are fetched downstream, concurrently. When a freshly
// fetched range turns out empty as well, the cached entry is widened to
// cover it.
func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedRequest *LokiRequest, lokiReq *LokiRequest) (queryrangebase.Response, error) {
	l.metrics.CacheHit.Inc()
	// we start with an empty response
	result := emptyResponse(cachedRequest)
	// if the request covers exactly the cached time range,
	// we can just return the cached (empty) result.
	// (Equal alone suffices; the previous !After/!Before checks were redundant.)
	if lokiReq.GetStartTs().Equal(cachedRequest.GetStartTs()) &&
		lokiReq.GetEndTs().Equal(cachedRequest.GetEndTs()) {
		return result, nil
	}
	// we could be missing data at the start and the end.
	// so we're going to fetch what is missing.
	var (
		startRequest, endRequest *LokiRequest
		startResp, endResp       *LokiResponse
		updateCache              bool
	)
	g, ctx := errgroup.WithContext(ctx)
	// if we're missing data at the start, fetch from the request start to the cached start.
	if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) {
		g.Go(func() error {
			startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs())
			resp, err := l.next.Do(ctx, startRequest)
			if err != nil {
				return err
			}
			// NOTE: the type-assertion ok must stay local to each goroutine;
			// sharing a single ok variable across both fetches is a data race.
			lokiResp, ok := resp.(*LokiResponse)
			if !ok {
				return fmt.Errorf("unexpected response type %T", resp)
			}
			startResp = lokiResp
			return nil
		})
	}
	// if we're missing data at the end, fetch from the cached end to the request end.
	if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) {
		g.Go(func() error {
			endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs())
			resp, err := l.next.Do(ctx, endRequest)
			if err != nil {
				return err
			}
			lokiResp, ok := resp.(*LokiResponse)
			if !ok {
				return fmt.Errorf("unexpected response type %T", resp)
			}
			endResp = lokiResp
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	// if we have data at the start, extend the cached range when it's empty
	// and remember to refresh the cache; otherwise merge the non-empty
	// response into the result.
	if startResp != nil {
		if isEmpty(startResp) {
			cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs())
			updateCache = true
		} else {
			if startResp.Status != loghttp.QueryStatusSuccess {
				return startResp, nil
			}
			result = mergeLokiResponse(startResp, result)
		}
	}
	// same handling for the trailing range.
	if endResp != nil {
		if isEmpty(endResp) {
			cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs())
			updateCache = true
		} else {
			if endResp.Status != loghttp.QueryStatusSuccess {
				return endResp, nil
			}
			result = mergeLokiResponse(endResp, result)
		}
	}
	// we need to update the cache since we fetched more either at the end or the start and it was empty.
	if updateCache {
		data, err := proto.Marshal(cachedRequest)
		if err != nil {
			// a cache refresh failure must not fail the query: we already
			// hold a valid result (consistent with handleMiss).
			level.Warn(l.logger).Log("msg", "error marshalling request", "err", err)
			return result, nil
		}
		// cache the result
		if err := l.cache.Store(ctx, []string{cache.HashKey(cacheKey)}, [][]byte{data}); err != nil {
			level.Warn(l.logger).Log("msg", "error storing cache", "err", err)
		}
	}
	return result, nil
}
// isEmpty reports whether the response is successful and carries no streams;
// only such responses are eligible for the log result cache.
func isEmpty(lokiRes *LokiResponse) bool {
	return lokiRes.Status == loghttp.QueryStatusSuccess && len(lokiRes.Data.Result) == 0
}
// emptyResponse builds a successful LokiResponse with zero streams,
// mirroring the request's direction, limit and API version (derived from
// the request path).
func emptyResponse(lokiReq *LokiRequest) *LokiResponse {
	return &LokiResponse{
		Status:     loghttp.QueryStatusSuccess,
		Statistics: stats.Result{},
		Direction:  lokiReq.Direction,
		Limit:      lokiReq.Limit,
		Version:    uint32(loghttp.GetVersion(lokiReq.Path)),
		Data: LokiData{
			ResultType: loghttp.ResultTypeStream,
			Result:     []logproto.Stream{},
		},
	}
}

@ -0,0 +1,452 @@
package queryrange
import (
"context"
"fmt"
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
)
// Test_LogResultCacheSameRange verifies that an identical empty-result
// request is served from the cache on the second call: the fake downstream
// handler expects exactly one invocation.
func Test_LogResultCacheSameRange(t *testing.T) {
	var (
		ctx = user.InjectOrgID(context.Background(), "foo")
		lrc = NewLogResultCache(
			log.NewNopLogger(),
			fakeLimits{
				splits: map[string]time.Duration{"foo": time.Minute},
			},
			cache.NewMockCache(),
			nil,
			nil,
		)
	)
	req := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
	}
	fake := newFakeResponse([]mockResponse{
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request:  req,
				Response: emptyResponse(req),
			},
		},
	})
	h := lrc.Wrap(fake)
	resp, err := h.Do(ctx, req)
	require.NoError(t, err)
	require.Equal(t, emptyResponse(req), resp)
	// second identical call must be answered from the cache.
	resp, err = h.Do(ctx, req)
	require.NoError(t, err)
	require.Equal(t, emptyResponse(req), resp)
	fake.AssertExpectations(t)
}
// Test_LogResultCacheSameRangeNonEmpty verifies that non-empty results are
// NOT cached: the same request hits the downstream handler both times and
// each call returns the fresh (distinct) response.
func Test_LogResultCacheSameRangeNonEmpty(t *testing.T) {
	var (
		ctx = user.InjectOrgID(context.Background(), "foo")
		lrc = NewLogResultCache(
			log.NewNopLogger(),
			fakeLimits{
				splits: map[string]time.Duration{"foo": time.Minute},
			},
			cache.NewMockCache(),
			nil,
			nil,
		)
	)
	req := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
	}
	fake := newFakeResponse([]mockResponse{
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request:  req,
				Response: nonEmptyResponse(req, 1),
			},
		},
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request:  req,
				Response: nonEmptyResponse(req, 2),
			},
		},
	})
	h := lrc.Wrap(fake)
	resp, err := h.Do(ctx, req)
	require.NoError(t, err)
	require.Equal(t, nonEmptyResponse(req, 1), resp)
	resp, err = h.Do(ctx, req)
	require.NoError(t, err)
	require.Equal(t, nonEmptyResponse(req, 2), resp)
	fake.AssertExpectations(t)
}
// Test_LogResultCacheSmallerRange verifies that a request fully contained in
// a cached empty range is answered from the cache without any downstream
// call for the second, narrower query.
func Test_LogResultCacheSmallerRange(t *testing.T) {
	var (
		ctx = user.InjectOrgID(context.Background(), "foo")
		lrc = NewLogResultCache(
			log.NewNopLogger(),
			fakeLimits{
				splits: map[string]time.Duration{"foo": time.Minute},
			},
			cache.NewMockCache(),
			nil,
			nil,
		)
	)
	req := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
	}
	fake := newFakeResponse([]mockResponse{
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request:  req,
				Response: emptyResponse(req),
			},
		},
	})
	h := lrc.Wrap(fake)
	resp, err := h.Do(ctx, req)
	require.NoError(t, err)
	require.Equal(t, emptyResponse(req), resp)
	// narrower range inside the cached one: no downstream expectation exists.
	resp, err = h.Do(ctx, &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
	})
	require.NoError(t, err)
	require.Equal(t, emptyResponse(&LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
	}), resp)
	fake.AssertExpectations(t)
}
// Test_LogResultCacheDifferentRange verifies the partial-hit path: after
// caching an empty inner range, a wider request only fetches the missing
// 30s ranges before and after the cached span.
func Test_LogResultCacheDifferentRange(t *testing.T) {
	var (
		ctx = user.InjectOrgID(context.Background(), "foo")
		lrc = NewLogResultCache(
			log.NewNopLogger(),
			fakeLimits{
				splits: map[string]time.Duration{"foo": time.Minute},
			},
			cache.NewMockCache(),
			nil,
			nil,
		)
	)
	req1 := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
	}
	req2 := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
	}
	fake := newFakeResponse([]mockResponse{
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request:  req1,
				Response: emptyResponse(req1),
			},
		},
		// leading missing range of req2.
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request: &LokiRequest{
					StartTs: time.Unix(0, time.Minute.Nanoseconds()),
					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
				},
				Response: emptyResponse(&LokiRequest{
					StartTs: time.Unix(0, time.Minute.Nanoseconds()),
					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
				}),
			},
		},
		// trailing missing range of req2.
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request: &LokiRequest{
					StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
					EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
				},
				Response: emptyResponse(&LokiRequest{
					StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
					EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
				}),
			},
		},
	})
	h := lrc.Wrap(fake)
	resp, err := h.Do(ctx, req1)
	require.NoError(t, err)
	require.Equal(t, emptyResponse(req1), resp)
	resp, err = h.Do(ctx, req2)
	require.NoError(t, err)
	require.Equal(t, emptyResponse(req2), resp)
	fake.AssertExpectations(t)
}
// Test_LogResultCacheDifferentRangeNonEmpty verifies the partial-hit path
// when both of the missing leading/trailing ranges return data: the cached
// empty middle is merged with the two non-empty fetched responses.
func Test_LogResultCacheDifferentRangeNonEmpty(t *testing.T) {
	var (
		ctx = user.InjectOrgID(context.Background(), "foo")
		lrc = NewLogResultCache(
			log.NewNopLogger(),
			fakeLimits{
				splits: map[string]time.Duration{"foo": time.Minute},
			},
			cache.NewMockCache(),
			nil,
			nil,
		)
	)
	req1 := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
	}
	req2 := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
	}
	fake := newFakeResponse([]mockResponse{
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request:  req1,
				Response: emptyResponse(req1),
			},
		},
		// leading missing range returns data (stream entry "1").
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request: &LokiRequest{
					StartTs: time.Unix(0, time.Minute.Nanoseconds()),
					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
				},
				Response: nonEmptyResponse(&LokiRequest{
					StartTs: time.Unix(0, time.Minute.Nanoseconds()),
					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
				}, 1),
			},
		},
		// trailing missing range returns data (stream entry "2").
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request: &LokiRequest{
					StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
					EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
				},
				Response: nonEmptyResponse(&LokiRequest{
					StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
					EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
				}, 2),
			},
		},
	})
	h := lrc.Wrap(fake)
	resp, err := h.Do(ctx, req1)
	require.NoError(t, err)
	require.Equal(t, emptyResponse(req1), resp)
	resp, err = h.Do(ctx, req2)
	require.NoError(t, err)
	require.Equal(t, mergeLokiResponse(
		nonEmptyResponse(&LokiRequest{
			StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
			EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
		}, 2),
		nonEmptyResponse(&LokiRequest{
			StartTs: time.Unix(0, time.Minute.Nanoseconds()),
			EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
		}, 1),
	), resp)
	fake.AssertExpectations(t)
}
// Test_LogResultCacheDifferentRangeNonEmptyAndEmpty verifies the mixed case:
// the leading missing range is empty (so the cached entry is extended) while
// the trailing range has data (so it is fetched again on every request and
// merged into the result).
func Test_LogResultCacheDifferentRangeNonEmptyAndEmpty(t *testing.T) {
	var (
		ctx = user.InjectOrgID(context.Background(), "foo")
		lrc = NewLogResultCache(
			log.NewNopLogger(),
			fakeLimits{
				splits: map[string]time.Duration{"foo": time.Minute},
			},
			cache.NewMockCache(),
			nil,
			nil,
		)
	)
	req1 := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
	}
	req2 := &LokiRequest{
		StartTs: time.Unix(0, time.Minute.Nanoseconds()),
		EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
	}
	fake := newFakeResponse([]mockResponse{
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request:  req1,
				Response: emptyResponse(req1),
			},
		},
		// leading range is empty: extends the cached entry.
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request: &LokiRequest{
					StartTs: time.Unix(0, time.Minute.Nanoseconds()),
					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
				},
				Response: emptyResponse(&LokiRequest{
					StartTs: time.Unix(0, time.Minute.Nanoseconds()),
					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
				}),
			},
		},
		// trailing range has data: never cached, fetched per request.
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request: &LokiRequest{
					StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
					EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
				},
				Response: nonEmptyResponse(&LokiRequest{
					StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
					EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
				}, 2),
			},
		},
		// we call it twice
		{
			RequestResponse: queryrangebase.RequestResponse{
				Request: &LokiRequest{
					StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
					EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
				},
				Response: nonEmptyResponse(&LokiRequest{
					StartTs: time.Unix(0, 2*time.Minute.Nanoseconds()-30*time.Second.Nanoseconds()),
					EndTs:   time.Unix(0, 2*time.Minute.Nanoseconds()),
				}, 2),
			},
		},
	})
	h := lrc.Wrap(fake)
	resp, err := h.Do(ctx, req1)
	require.NoError(t, err)
	require.Equal(t, emptyResponse(req1), resp)
	resp, err = h.Do(ctx, req2)
	require.NoError(t, err)
	require.Equal(t, mergeLokiResponse(
		emptyResponse(req1),
		nonEmptyResponse(&LokiRequest{
			StartTs: time.Unix(0, time.Minute.Nanoseconds()),
			EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
		}, 1),
	), resp)
	resp, err = h.Do(ctx, req2)
	require.NoError(t, err)
	require.Equal(t, mergeLokiResponse(
		emptyResponse(req1),
		nonEmptyResponse(&LokiRequest{
			StartTs: time.Unix(0, time.Minute.Nanoseconds()),
			EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
		}, 1),
	), resp)
	fake.AssertExpectations(t)
}
// fakeResponse is a mock downstream handler whose expectations are declared
// up front via newFakeResponse.
type fakeResponse struct {
	*mock.Mock
}
// mockResponse pairs an expected request with the response (or error) the
// fake handler should return for it.
type mockResponse struct {
	queryrangebase.RequestResponse
	err error
}
// newFakeResponse builds a fakeResponse expecting exactly one Do call per
// entry, in any order; unmet expectations fail AssertExpectations.
func newFakeResponse(responses []mockResponse) fakeResponse {
	m := &mock.Mock{}
	for _, r := range responses {
		m.On("Do", mock.Anything, r.Request).Return(r.Response, r.err).Once()
	}
	return fakeResponse{
		Mock: m,
	}
}
// Do satisfies queryrangebase.Handler by replaying the recorded mock
// expectations for the given request.
func (f fakeResponse) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
	args := f.Mock.Called(ctx, r)
	var resp queryrangebase.Response
	if v := args.Get(0); v != nil {
		resp = v.(queryrangebase.Response)
	}
	var err error
	if v := args.Get(1); v != nil {
		err = v.(error)
	}
	return resp, err
}
// nonEmptyResponse builds a successful response containing a single stream
// with one entry whose line is the decimal rendering of i, so distinct calls
// are distinguishable in assertions.
func nonEmptyResponse(lokiReq *LokiRequest, i int) *LokiResponse {
	return &LokiResponse{
		Status:     loghttp.QueryStatusSuccess,
		Statistics: stats.Result{},
		Direction:  lokiReq.Direction,
		Limit:      lokiReq.Limit,
		Version:    uint32(loghttp.GetVersion(lokiReq.Path)),
		Data: LokiData{
			ResultType: loghttp.ResultTypeStream,
			Result: []logproto.Stream{
				{
					Labels: `{foo="bar"}`,
					Entries: []logproto.Entry{
						{
							Timestamp: time.Unix(1, 0),
							Line:      fmt.Sprintf("%d", i),
						},
					},
				},
			},
		},
	}
}

@ -0,0 +1,26 @@
package queryrange
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)
// Metrics bundles every metrics set used by the query-range middlewares,
// embedding one struct per middleware so a single Metrics value can be
// shared across the whole tripperware.
type Metrics struct {
	*queryrangebase.InstrumentMiddlewareMetrics
	*queryrangebase.RetryMiddlewareMetrics
	*logql.ShardingMetrics
	*SplitByMetrics
	*LogResultCacheMetrics
}
// NewMetrics creates and registers all query-range middleware metrics
// against the provided registerer.
func NewMetrics(registerer prometheus.Registerer) *Metrics {
	return &Metrics{
		InstrumentMiddlewareMetrics: queryrangebase.NewInstrumentMiddlewareMetrics(registerer),
		RetryMiddlewareMetrics:      queryrangebase.NewRetryMiddlewareMetrics(registerer),
		ShardingMetrics:             logql.NewShardingMetrics(registerer),
		SplitByMetrics:              NewSplitByMetrics(registerer),
		LogResultCacheMetrics:       NewLogResultCacheMetrics(registerer),
	}
}

@ -128,7 +128,6 @@ type ShouldCacheFn func(r Request) bool
type resultsCache struct {
logger log.Logger
cfg ResultsCacheConfig
next Handler
cache cache.Cache
limits Limits
@ -149,7 +148,7 @@ type resultsCache struct {
// see `generateKey`.
func NewResultsCacheMiddleware(
logger log.Logger,
cfg ResultsCacheConfig,
c cache.Cache,
splitter CacheSplitter,
limits Limits,
merger Merger,
@ -157,15 +156,7 @@ func NewResultsCacheMiddleware(
cacheGenNumberLoader CacheGenNumberLoader,
shouldCache ShouldCacheFn,
reg prometheus.Registerer,
) (Middleware, cache.Cache, error) {
c, err := cache.New(cfg.CacheConfig, reg, logger)
if err != nil {
return nil, nil, err
}
if cfg.Compression == "snappy" {
c = cache.NewSnappy(c, logger)
}
) (Middleware, error) {
if cacheGenNumberLoader != nil {
c = cache.NewCacheGenNumMiddleware(c)
}
@ -173,7 +164,6 @@ func NewResultsCacheMiddleware(
return MiddlewareFunc(func(next Handler) Handler {
return &resultsCache{
logger: logger,
cfg: cfg,
next: next,
cache: c,
limits: limits,
@ -184,7 +174,7 @@ func NewResultsCacheMiddleware(
cacheGenNumberLoader: cacheGenNumberLoader,
shouldCache: shouldCache,
}
}), c, nil
}), nil
}
func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {

@ -430,7 +430,8 @@ func TestPartition(t *testing.T) {
&PrometheusRequest{
Start: 0,
End: 100,
}},
},
},
},
{
name: "Test a partial hit.",
@ -751,9 +752,11 @@ func TestResultsCache(t *testing.T) {
Cache: cache.NewMockCache(),
},
}
rcm, _, err := NewResultsCacheMiddleware(
c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger())
require.NoError(t, err)
rcm, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
c,
constSplitter(day),
mockLimits{},
PrometheusCodec,
@ -791,9 +794,11 @@ func TestResultsCacheRecent(t *testing.T) {
var cfg ResultsCacheConfig
flagext.DefaultValues(&cfg)
cfg.CacheConfig.Cache = cache.NewMockCache()
rcm, _, err := NewResultsCacheMiddleware(
c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger())
require.NoError(t, err)
rcm, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
c,
constSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
@ -852,11 +857,12 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
var cfg ResultsCacheConfig
flagext.DefaultValues(&cfg)
cfg.CacheConfig.Cache = cache.NewMockCache()
c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger())
require.NoError(t, err)
fakeLimits := tc.fakeLimits
rcm, _, err := NewResultsCacheMiddleware(
rcm, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
c,
constSplitter(day),
fakeLimits,
PrometheusCodec,
@ -891,9 +897,11 @@ func Test_resultsCache_MissingData(t *testing.T) {
Cache: cache.NewMockCache(),
},
}
rm, _, err := NewResultsCacheMiddleware(
c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger())
require.NoError(t, err)
rm, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
c,
constSplitter(day),
mockLimits{},
PrometheusCodec,
@ -996,9 +1004,11 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
var cfg ResultsCacheConfig
flagext.DefaultValues(&cfg)
cfg.CacheConfig.Cache = cache.NewMockCache()
rcm, _, err := NewResultsCacheMiddleware(
c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger())
require.NoError(t, err)
rcm, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
c,
constSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
@ -1024,8 +1034,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
}
}
type mockCacheGenNumberLoader struct {
}
type mockCacheGenNumberLoader struct{}
func newMockCacheGenNumberLoader() CacheGenNumberLoader {
return mockCacheGenNumberLoader{}

@ -21,36 +21,21 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/util"
)
const day = 24 * time.Hour
var (
// PassthroughMiddleware is a noop middleware
PassthroughMiddleware = MiddlewareFunc(func(next Handler) Handler {
return next
})
errInvalidMinShardingLookback = errors.New("a non-zero value is required for querier.query-ingesters-within when -querier.parallelise-shardable-queries is enabled")
)
// PassthroughMiddleware is a noop middleware
var PassthroughMiddleware = MiddlewareFunc(func(next Handler) Handler {
return next
})
// Config for query_range middleware chain.
type Config struct {
@ -137,114 +122,6 @@ func (f RoundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}
// NewTripperware returns a Tripperware configured with middlewares to limit, align, split, retry and cache requests.
func NewTripperware(
cfg Config,
log log.Logger,
limits Limits,
codec Codec,
cacheExtractor Extractor,
schema chunk.SchemaConfig,
engineOpts promql.EngineOpts,
minShardingLookback time.Duration,
registerer prometheus.Registerer,
cacheGenNumberLoader CacheGenNumberLoader,
) (Tripperware, cache.Cache, error) {
// Per tenant query metrics.
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_queries_total",
Help: "Total queries sent per tenant.",
}, []string{"op", "user"})
activeUsers := util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
err := util.DeleteMatchingLabels(queriesPerTenant, map[string]string{"user": user})
if err != nil {
level.Warn(log).Log("msg", "failed to remove cortex_query_frontend_queries_total metric for user", "user", user)
}
})
// Metric used to keep track of each middleware execution duration.
metrics := NewInstrumentMiddlewareMetrics(registerer)
queryRangeMiddleware := []Middleware{NewLimitsMiddleware(limits)}
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, codec, registerer))
}
var c cache.Cache
if cfg.CacheResults {
shouldCache := func(r Request) bool {
return !r.GetCachingOptions().Disabled
}
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, codec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer)
if err != nil {
return nil, nil, err
}
c = cache
queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("results_cache", metrics), queryCacheMiddleware)
}
if cfg.ShardedQueries {
if minShardingLookback == 0 {
return nil, nil, errInvalidMinShardingLookback
}
shardingware := NewQueryShardMiddleware(
log,
promql.NewEngine(engineOpts),
schema.Configs,
codec,
minShardingLookback,
metrics,
registerer,
)
queryRangeMiddleware = append(
queryRangeMiddleware,
shardingware, // instrumentation is included in the sharding middleware
)
}
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
}
// Start cleanup. If cleaner stops or fail, we will simply not clean the metrics for inactive users.
_ = activeUsers.StartAsync(context.Background())
return func(next http.RoundTripper) http.RoundTripper {
// Finally, if the user selected any query range middleware, stitch it in.
if len(queryRangeMiddleware) > 0 {
queryrange := NewRoundTripper(next, codec, cfg.ForwardHeaders, queryRangeMiddleware...)
return RoundTripFunc(func(r *http.Request) (*http.Response, error) {
isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range")
op := "query"
if isQueryRange {
op = "query_range"
}
tenantIDs, err := tenant.TenantIDs(r.Context())
// This should never happen anyways because we have auth middleware before this.
if err != nil {
return nil, err
}
userStr := tenant.JoinTenantIDs(tenantIDs)
activeUsers.UpdateUserTimestamp(userStr, time.Now())
queriesPerTenant.WithLabelValues(op, userStr).Inc()
if !isQueryRange {
return next.RoundTrip(r)
}
return queryrange.RoundTrip(r)
})
}
return next
}, c, nil
}
type roundTripper struct {
next http.RoundTripper
handler Handler
@ -265,7 +142,6 @@ func NewRoundTripper(next http.RoundTripper, codec Codec, headers []string, midd
}
func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
// include the headers specified in the roundTripper during decoding the request.
request, err := q.codec.DecodeRequest(r.Context(), r, q.headers)
if err != nil {

@ -1,100 +1,9 @@
package queryrangebase
import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/storage/chunk"
)
// TestRoundTrip wires the tripperware in front of a stub HTTP server and
// verifies that /query_range requests flow through the middleware chain while
// any other path is forwarded untouched to the downstream round tripper.
func TestRoundTrip(t *testing.T) {
	s := httptest.NewServer(
		middleware.AuthenticateUser.Wrap(
			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				var err error
				if r.RequestURI == query {
					_, err = w.Write([]byte(responseBody))
				} else {
					_, err = w.Write([]byte("bar"))
				}
				if err != nil {
					// This handler runs on the server's goroutine, not the test
					// goroutine; t.Fatal (FailNow) must only be called from the
					// test goroutine, so report the failure with t.Error.
					t.Error(err)
				}
			}),
		),
	)
	defer s.Close()

	u, err := url.Parse(s.URL)
	require.NoError(t, err)

	downstream := singleHostRoundTripper{
		host: u.Host,
		next: http.DefaultTransport,
	}

	tw, _, err := NewTripperware(Config{},
		log.NewNopLogger(),
		mockLimits{},
		PrometheusCodec,
		nil,
		chunk.SchemaConfig{},
		promql.EngineOpts{
			Logger:     log.NewNopLogger(),
			Reg:        nil,
			MaxSamples: 1000,
			Timeout:    time.Minute,
		},
		0,
		nil,
		nil,
	)
	if err != nil {
		t.Fatal(err)
	}

	for i, tc := range []struct {
		path, expectedBody string
	}{
		{"/foo", "bar"},
		{query, responseBody},
	} {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			req, err := http.NewRequest("GET", tc.path, http.NoBody)
			require.NoError(t, err)

			// query-frontend doesn't actually authenticate requests, we rely on
			// the queriers to do this. Hence we ensure the request doesn't have a
			// org ID in the ctx, but does have the header.
			ctx := user.InjectOrgID(context.Background(), "1")
			req = req.WithContext(ctx)
			err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
			require.NoError(t, err)

			resp, err := tw(downstream).RoundTrip(req)
			require.NoError(t, err)
			// Close the body so the underlying transport can reuse the connection.
			defer resp.Body.Close()
			require.Equal(t, 200, resp.StatusCode)

			bs, err := ioutil.ReadAll(resp.Body)
			require.NoError(t, err)
			require.Equal(t, tc.expectedBody, string(bs))
		})
	}
}
type singleHostRoundTripper struct {
host string
next http.RoundTripper
@ -105,20 +14,3 @@ func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro
r.URL.Host = s.host
return s.next.RoundTrip(r)
}
func Test_ShardingConfigError(t *testing.T) {
_, _, err := NewTripperware(
Config{ShardedQueries: true},
log.NewNopLogger(),
nil,
nil,
nil,
chunk.SchemaConfig{},
promql.EngineOpts{},
0,
nil,
nil,
)
require.EqualError(t, err, errInvalidMinShardingLookback.Error())
}

@ -12,7 +12,6 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk"
@ -43,35 +42,46 @@ func NewTripperware(
schema chunk.SchemaConfig,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, Stopper, error) {
instrumentMetrics := queryrangebase.NewInstrumentMiddlewareMetrics(registerer)
retryMetrics := queryrangebase.NewRetryMiddlewareMetrics(registerer)
shardingMetrics := logql.NewShardingMetrics(registerer)
splitByMetrics := NewSplitByMetrics(registerer)
metrics := NewMetrics(registerer)
metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec,
PrometheusExtractor{}, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics, registerer)
var (
c cache.Cache
err error
)
if cfg.CacheResults {
c, err = cache.New(cfg.CacheConfig, registerer, log)
if err != nil {
return nil, nil, err
}
if cfg.Compression == "snappy" {
c = cache.NewSnappy(c, log)
}
}
metricsTripperware, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, c,
PrometheusExtractor{}, metrics, registerer)
if err != nil {
return nil, nil, err
}
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics)
logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, LokiCodec, c, metrics)
if err != nil {
return nil, nil, err
}
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, LokiCodec, instrumentMetrics, retryMetrics, splitByMetrics, shardingMetrics, schema)
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, LokiCodec, metrics, schema)
if err != nil {
return nil, nil, err
}
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, instrumentMetrics, retryMetrics, splitByMetrics)
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, metrics)
if err != nil {
return nil, nil, err
}
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, log, limits, schema, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics)
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, log, limits, schema, LokiCodec, metrics)
if err != nil {
return nil, nil, err
}
@ -82,7 +92,7 @@ func NewTripperware(
labelsRT := labelsTripperware(next)
instantRT := instantMetricTripperware(next)
return newRoundTripper(next, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, limits)
}, cache, nil
}, c, nil
}
type roundTripper struct {
@ -235,16 +245,31 @@ func NewLogFilterTripperware(
limits Limits,
schema chunk.SchemaConfig,
codec queryrangebase.Codec,
instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
c cache.Cache,
metrics *Metrics,
) (queryrangebase.Tripperware, error) {
queryRangeMiddleware := []queryrangebase.Middleware{
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
queryrangebase.InstrumentMiddleware("split_by_interval", instrumentMetrics),
SplitByIntervalMiddleware(limits, codec, splitByTime, splitByMetrics),
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(limits, codec, splitByTime, metrics.SplitByMetrics),
}
if cfg.CacheResults {
queryCacheMiddleware := NewLogResultCache(
log,
limits,
c,
func(r queryrangebase.Request) bool {
return !r.GetCachingOptions().Disabled
},
metrics.LogResultCacheMetrics,
)
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("log_results_cache", metrics.InstrumentMiddlewareMetrics),
queryCacheMiddleware,
)
}
if cfg.ShardedQueries {
@ -252,15 +277,18 @@ func NewLogFilterTripperware(
NewQueryShardMiddleware(
log,
schema.Configs,
instrumentMetrics, // instrumentation is included in the sharding middleware
shardingMetrics,
metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware
metrics.ShardingMetrics,
limits,
),
)
}
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", instrumentMetrics), queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
queryRangeMiddleware = append(
queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),
queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics),
)
}
return func(next http.RoundTripper) http.RoundTripper {
@ -277,23 +305,23 @@ func NewSeriesTripperware(
log log.Logger,
limits Limits,
codec queryrangebase.Codec,
instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics,
splitByMetrics *SplitByMetrics,
shardingMetrics *logql.ShardingMetrics,
metrics *Metrics,
schema chunk.SchemaConfig,
) (queryrangebase.Tripperware, error) {
queryRangeMiddleware := []queryrangebase.Middleware{
NewLimitsMiddleware(limits),
queryrangebase.InstrumentMiddleware("split_by_interval", instrumentMetrics),
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
// The Series API needs to pull one chunk per series to extract the label set, which is much cheaper than iterating through all matching chunks.
// Force a 24 hours split by for series API, this will be more efficient with our static daily bucket storage.
// This would avoid queriers downloading chunks for same series over and over again for serving smaller queries.
SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, splitByMetrics),
SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, metrics.SplitByMetrics),
}
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", instrumentMetrics), queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
queryRangeMiddleware = append(queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),
queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics),
)
}
if cfg.ShardedQueries {
@ -301,8 +329,8 @@ func NewSeriesTripperware(
NewSeriesQueryShardMiddleware(
log,
schema.Configs,
instrumentMetrics,
shardingMetrics,
metrics.InstrumentMiddlewareMetrics,
metrics.ShardingMetrics,
limits,
codec,
),
@ -323,20 +351,21 @@ func NewLabelsTripperware(
log log.Logger,
limits Limits,
codec queryrangebase.Codec,
instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics,
splitByMetrics *SplitByMetrics,
metrics *Metrics,
) (queryrangebase.Tripperware, error) {
queryRangeMiddleware := []queryrangebase.Middleware{
NewLimitsMiddleware(limits),
queryrangebase.InstrumentMiddleware("split_by_interval", instrumentMetrics),
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
// Force a 24 hours split by for labels API, this will be more efficient with our static daily bucket storage.
// This is because the labels API is an index-only operation.
SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, splitByMetrics),
SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, metrics.SplitByMetrics),
}
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(queryRangeMiddleware, queryrangebase.InstrumentMiddleware("retry", instrumentMetrics), queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
queryRangeMiddleware = append(queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),
queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics),
)
}
return func(next http.RoundTripper) http.RoundTripper {
@ -355,33 +384,30 @@ func NewMetricTripperware(
limits Limits,
schema chunk.SchemaConfig,
codec queryrangebase.Codec,
c cache.Cache,
extractor queryrangebase.Extractor,
instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
metrics *Metrics,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, Stopper, error) {
) (queryrangebase.Tripperware, error) {
queryRangeMiddleware := []queryrangebase.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("step_align", instrumentMetrics),
queryrangebase.InstrumentMiddleware("step_align", metrics.InstrumentMiddlewareMetrics),
queryrangebase.StepAlignMiddleware,
)
}
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("split_by_interval", instrumentMetrics),
SplitByIntervalMiddleware(limits, codec, splitMetricByTime, splitByMetrics),
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(limits, codec, splitMetricByTime, metrics.SplitByMetrics),
)
var c cache.Cache
if cfg.CacheResults {
queryCacheMiddleware, cache, err := queryrangebase.NewResultsCacheMiddleware(
queryCacheMiddleware, err := queryrangebase.NewResultsCacheMiddleware(
log,
cfg.ResultsCacheConfig,
c,
cacheKeyLimits{limits},
limits,
codec,
@ -393,12 +419,11 @@ func NewMetricTripperware(
registerer,
)
if err != nil {
return nil, nil, err
return nil, err
}
c = cache
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("results_cache", instrumentMetrics),
queryrangebase.InstrumentMiddleware("results_cache", metrics.InstrumentMiddlewareMetrics),
queryCacheMiddleware,
)
}
@ -408,8 +433,8 @@ func NewMetricTripperware(
NewQueryShardMiddleware(
log,
schema.Configs,
instrumentMetrics, // instrumentation is included in the sharding middleware
shardingMetrics,
metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware
metrics.ShardingMetrics,
limits,
),
)
@ -418,8 +443,8 @@ func NewMetricTripperware(
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("retry", instrumentMetrics),
queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics),
queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),
queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics),
)
}
@ -435,7 +460,7 @@ func NewMetricTripperware(
})
}
return next
}, c, nil
}, nil
}
// NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries
@ -445,10 +470,7 @@ func NewInstantMetricTripperware(
limits Limits,
schema chunk.SchemaConfig,
codec queryrangebase.Codec,
instrumentMetrics *queryrangebase.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrangebase.RetryMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
metrics *Metrics,
) (queryrangebase.Tripperware, error) {
queryRangeMiddleware := []queryrangebase.Middleware{StatsCollectorMiddleware(), NewLimitsMiddleware(limits)}
@ -457,8 +479,8 @@ func NewInstantMetricTripperware(
NewQueryShardMiddleware(
log,
schema.Configs,
instrumentMetrics, // instrumentation is included in the sharding middleware
shardingMetrics,
metrics.InstrumentMiddlewareMetrics, // instrumentation is included in the sharding middleware
metrics.ShardingMetrics,
limits,
),
)
@ -467,8 +489,8 @@ func NewInstantMetricTripperware(
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("retry", instrumentMetrics),
queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics),
queryrangebase.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics),
queryrangebase.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics),
)
}

@ -281,7 +281,6 @@ func forInterval(interval time.Duration, start, end time.Time, endTimeInclusive
} else if endTimeInclusive {
newEnd = newEnd.Add(-time.Millisecond)
}
if firstInterval {
callback(ogStart, newEnd)
firstInterval = false

Loading…
Cancel
Save