From 3e1f2fc273a5c173f13dfd069b11a305e8dc36c2 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Sat, 24 Jun 2023 01:27:06 +0530 Subject: [PATCH] caching: do not try to fill the gap in log results cache when the new query interval does not overlap the cached query interval (#9757) **What this PR does / why we need it**: Currently, when we find a relevant cached negative response for a logs query, we do the following: * If the cached query completely covers the new query: * return back an empty response. * else: * fill the gaps on either/both sides of the cached query. The problem with filling the gaps is that when the cached query does not overlap at all with the new query, we have to extend the query beyond what the query requests for. However, with the logs query, we have a limit on the number of lines we can send back in the response. So, this could result in the query response having logs which were not requested by the query, which then get filtered out by the [response extractor](https://github.com/grafana/loki/blob/b78d3f05525d8bcab13e621bc2e5851aadc8fc91/pkg/querier/queryrange/log_result_cache.go#L299), unexpectedly resulting in an empty response. For example, if the query was cached for start=15, end=20 and we get a `backwards` query for start=5, end=10. To fill the gap, the query would be executed for start=5, end=15. Now, if we have logs more than the query `limit` in the range 10-15, we would filter out all the data in the response extractor and send back an empty response to the user. This PR fixes the issue by doing the following changes when handling cache hit: * If the cached query completely covers the new query: * return back an empty response[_existing_]. * else if the cached query does not overlap with the new query: * do the new query as requested. * If the new query results in an empty response and has a higher interval than the cached query: * update the cache * else: * query the data for missing intervals on both/either side[_existing_] * update the cache with extended intervals if the new queries resulted in an empty response[_existing_] **Special notes for your reviewer**: We could do further improvements in the handling of queries not overlapping with cached query by selectively extending the queries based on query direction and cached query lying before/after the new query. For example, if the new query is doing `backwards` query and the `cachedQuery.End` < `newQuery.Start`, it should be okay to extend the query and do `cachedQuery.End` to `newQuery.End` to fill the cache since query would first fill the most relevant data before hitting the limits. I did not want to complicate the fix so went without implementing this approach. We can revisit later if we feel we need to improve our caching. **Checklist** - [x] Tests updated - [x] `CHANGELOG.md` updated --------- Co-authored-by: Travis Patterson --- CHANGELOG.md | 1 + pkg/querier/queryrange/log_result_cache.go | 154 ++++++++++-------- .../queryrange/log_result_cache_test.go | 104 +++++++++--- pkg/storage/chunk/cache/mock.go | 13 +- 4 files changed, 182 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db221b15cb..34445ad9cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ * [9495](https://github.com/grafana/loki/pull/9495) **thampiotr**: Promtail: Fix potential goroutine leak in file tailer. * [9650](https://github.com/grafana/loki/pull/9650) **ashwanthgoli**: Config: ensure storage config defaults apply to named stores. * [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams. +* [9757](https://github.com/grafana/loki/pull/9757) **sandeepsukhani**: Frontend Caching: Fix a bug in negative logs results cache causing Loki to unexpectedly send empty/incorrect results. * [9754](https://github.com/grafana/loki/pull/9754) **ashwanthgoli**: Fixes an issue with indexes becoming unqueriable if the index prefix is different from the one configured in the latest period config. * [9763](https://github.com/grafana/loki/pull/9763) **ssncferreira**: Fix the logic of the `offset` operator for downstream queries on instant query splitting of (range) vector aggregation expressions containing an offset. * [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`. diff --git a/pkg/querier/queryrange/log_result_cache.go b/pkg/querier/queryrange/log_result_cache.go index 0d00a0d134..827229dd0d 100644 --- a/pkg/querier/queryrange/log_result_cache.go +++ b/pkg/querier/queryrange/log_result_cache.go @@ -187,77 +187,95 @@ func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedR if cachedRequest.StartTs.UnixNano() <= lokiReq.StartTs.UnixNano() && cachedRequest.EndTs.UnixNano() >= lokiReq.EndTs.UnixNano() { return result, nil } - // we could be missing data at the start and the end. - // so we're going to fetch what is missing. - var ( - startRequest, endRequest *LokiRequest - startResp, endResp *LokiResponse - updateCache bool - ok bool - ) - g, ctx := errgroup.WithContext(ctx) - - // if we're missing data at the start, start fetching from the start to the cached start. - if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) { - g.Go(func() error { - startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs()) - resp, err := l.next.Do(ctx, startRequest) - if err != nil { - return err - } - startResp, ok = resp.(*LokiResponse) - if !ok { - return fmt.Errorf("unexpected response type %T", resp) - } - return nil - }) - } - - // if we're missing data at the end, start fetching from the cached end to the end. - if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) { - g.Go(func() error { - endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs()) - resp, err := l.next.Do(ctx, endRequest) - if err != nil { - return err - } - endResp, ok = resp.(*LokiResponse) - if !ok { - return fmt.Errorf("unexpected response type %T", resp) - } - return nil - }) - } - if err := g.Wait(); err != nil { - return nil, err - } + updateCache := false + // if the query does not overlap cached interval, do not try to fill the gap since it requires extending the queries beyond what is requested in the query. + // Extending the queries beyond what is requested could result in empty responses due to response limit set in the queries. + if !overlap(lokiReq.StartTs, lokiReq.EndTs, cachedRequest.StartTs, cachedRequest.EndTs) { + resp, err := l.next.Do(ctx, lokiReq) + if err != nil { + return nil, err + } + result = resp.(*LokiResponse) - // if we have data at the start, we need to merge it with the cached data if it's empty and update the cache. - // If it's not empty only merge the response. - if startResp != nil { - if isEmpty(startResp) { - cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs()) + // if the response is empty and the query is larger than what is cached, update the cache + if isEmpty(result) && (lokiReq.EndTs.UnixNano()-lokiReq.StartTs.UnixNano() > cachedRequest.EndTs.UnixNano()-cachedRequest.StartTs.UnixNano()) { + cachedRequest = cachedRequest.WithStartEndTime(lokiReq.GetStartTs(), lokiReq.GetEndTs()) updateCache = true - } else { - if startResp.Status != loghttp.QueryStatusSuccess { - return startResp, nil + } + } else { + // we could be missing data at the start and the end. + // so we're going to fetch what is missing. + var ( + startRequest, endRequest *LokiRequest + startResp, endResp *LokiResponse + ) + g, ctx := errgroup.WithContext(ctx) + + // if we're missing data at the start, start fetching from the start to the cached start. + if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) { + g.Go(func() error { + startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs()) + resp, err := l.next.Do(ctx, startRequest) + if err != nil { + return err + } + var ok bool + startResp, ok = resp.(*LokiResponse) + if !ok { + return fmt.Errorf("unexpected response type %T", resp) + } + return nil + }) + } + + // if we're missing data at the end, start fetching from the cached end to the end. + if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) { + g.Go(func() error { + endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs()) + resp, err := l.next.Do(ctx, endRequest) + if err != nil { + return err + } + var ok bool + endResp, ok = resp.(*LokiResponse) + if !ok { + return fmt.Errorf("unexpected response type %T", resp) + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + // if we have data at the start, we need to merge it with the cached data if it's empty and update the cache. + // If it's not empty only merge the response. + if startResp != nil { + if isEmpty(startResp) { + cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs()) + updateCache = true + } else { + if startResp.Status != loghttp.QueryStatusSuccess { + return startResp, nil + } + result = mergeLokiResponse(startResp, result) } - result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), startResp), result) } - } - // if we have data at the end, we need to merge it with the cached data if it's empty and update the cache. - // If it's not empty only merge the response. - if endResp != nil { - if isEmpty(endResp) { - cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs()) - updateCache = true - } else { - if endResp.Status != loghttp.QueryStatusSuccess { - return endResp, nil + // if we have data at the end, we need to merge it with the cached data if it's empty and update the cache. + // If it's not empty only merge the response. + if endResp != nil { + if isEmpty(endResp) { + cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs()) + updateCache = true + } else { + if endResp.Status != loghttp.QueryStatusSuccess { + return endResp, nil + } + result = mergeLokiResponse(endResp, result) } - result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), endResp), result) } } @@ -333,3 +351,11 @@ func emptyResponse(lokiReq *LokiRequest) *LokiResponse { }, } } + +func overlap(aFrom, aThrough, bFrom, bThrough time.Time) bool { + if aFrom.After(bThrough) || bFrom.After(aThrough) { + return false + } + + return true +} diff --git a/pkg/querier/queryrange/log_result_cache_test.go b/pkg/querier/queryrange/log_result_cache_test.go index 5e2f172e50..916a4d20d1 100644 --- a/pkg/querier/queryrange/log_result_cache_test.go +++ b/pkg/querier/queryrange/log_result_cache_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" @@ -434,7 +436,10 @@ func Test_LogResultCacheDifferentRangeNonEmptyAndEmpty(t *testing.T) { fake.AssertExpectations(t) } -func Test_LogResultFillingGap(t *testing.T) { +// Test_LogResultNonOverlappingCache tests the scenario where the cached query does not overlap with the new request +func Test_LogResultNonOverlappingCache(t *testing.T) { + metrics := NewLogResultCacheMetrics(prometheus.NewPedanticRegistry()) + mockCache := cache.NewMockCache() var ( ctx = user.InjectOrgID(context.Background(), "foo") lrc = NewLogResultCache( @@ -442,13 +447,18 @@ func Test_LogResultFillingGap(t *testing.T) { fakeLimits{ splits: map[string]time.Duration{"foo": time.Minute}, }, - cache.NewMockCache(), - nil, + mockCache, nil, nil, + metrics, ) ) + checkCacheMetrics := func(expectedHits, expectedMisses int) { + require.Equal(t, float64(expectedHits), testutil.ToFloat64(metrics.CacheHit)) + require.Equal(t, float64(expectedMisses), testutil.ToFloat64(metrics.CacheMiss)) + } + // data requested for just 1 sec, resulting in empty response req1 := &LokiRequest{ StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), @@ -456,16 +466,24 @@ func Test_LogResultFillingGap(t *testing.T) { Limit: entriesLimit, } - // data requested for just 1 sec, within the same split but couple seconds apart + // data requested for just 1 sec(non-overlapping), resulting in empty response req2 := &LokiRequest{ - StartTs: time.Unix(0, time.Minute.Nanoseconds()+35*time.Second.Nanoseconds()), - EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()), + StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()), Limit: entriesLimit, } + // data requested for larger interval than req1(overlapping with req2), returns empty response req3 := &LokiRequest{ - StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()), - EndTs: time.Unix(0, time.Minute.Nanoseconds()+26*time.Second.Nanoseconds()), + StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()), + Limit: entriesLimit, + } + + // data requested for larger interval than req3(non-overlapping), returns non-empty response + req4 := &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()), Limit: entriesLimit, } @@ -476,34 +494,49 @@ func Test_LogResultFillingGap(t *testing.T) { Response: emptyResponse(req1), }, }, - // partial request being made for missing interval at the end + // req2 should do query for just its query range and should not update the cache { RequestResponse: queryrangebase.RequestResponse{ Request: &LokiRequest{ - StartTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()), - EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()), + StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()), Limit: entriesLimit, }, - Response: nonEmptyResponse(&LokiRequest{ - StartTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()), - EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()), + Response: emptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()), Limit: entriesLimit, - }, time.Unix(31, 0), time.Unix(34, 0), lblFooBar), // data not present for actual query interval i.e req2 + }), }, }, - // partial request being made for missing interval at the beginning + // req3 should do query for just its query range and should update the cache { RequestResponse: queryrangebase.RequestResponse{ Request: &LokiRequest{ - StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()), - EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()), + Limit: entriesLimit, + }, + Response: emptyResponse(&LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()), + Limit: entriesLimit, + }), + }, + }, + // req4 should do query for its query range. Data would be non-empty so cache should not be updated + { + RequestResponse: queryrangebase.RequestResponse{ + Request: &LokiRequest{ + StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()), Limit: entriesLimit, }, Response: nonEmptyResponse(&LokiRequest{ - StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()), - EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()), + StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()), + EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()), Limit: entriesLimit, - }, time.Unix(27, 0), time.Unix(29, 0), lblFooBar), // data not present for actual query interval i.e req3 + }, time.Unix(71, 0), time.Unix(79, 0), lblFooBar), }, }, }) @@ -513,16 +546,37 @@ func Test_LogResultFillingGap(t *testing.T) { resp, err := h.Do(ctx, req1) require.NoError(t, err) require.Equal(t, emptyResponse(req1), resp) + checkCacheMetrics(0, 1) + require.Equal(t, 1, mockCache.NumKeyUpdates()) - // although the caching code would request for more data than the actual query, we should have empty response here since we - // do not have any data for the query we made + // req2 should not update the cache since it has same length as previously cached query resp, err = h.Do(ctx, req2) require.NoError(t, err) - require.Equal(t, mergeLokiResponse(emptyResponse(req1), emptyResponse(req2)), resp) + require.Equal(t, emptyResponse(req2), resp) + checkCacheMetrics(1, 1) + require.Equal(t, 1, mockCache.NumKeyUpdates()) + // req3 should update the cache since it has larger length than previously cached query resp, err = h.Do(ctx, req3) require.NoError(t, err) - require.Equal(t, mergeLokiResponse(emptyResponse(req1), emptyResponse(req3)), resp) + require.Equal(t, emptyResponse(req3), resp) + checkCacheMetrics(2, 1) + require.Equal(t, 2, mockCache.NumKeyUpdates()) + + // req4 returns non-empty response so it should not update the cache + resp, err = h.Do(ctx, req4) + require.NoError(t, err) + require.Equal(t, nonEmptyResponse(req4, time.Unix(71, 0), time.Unix(79, 0), lblFooBar), resp) + checkCacheMetrics(3, 1) + require.Equal(t, 2, mockCache.NumKeyUpdates()) + + // req2 should return back empty response from the cache, without updating the cache + resp, err = h.Do(ctx, req2) + require.NoError(t, err) + require.Equal(t, emptyResponse(req2), resp) + checkCacheMetrics(4, 1) + require.Equal(t, 2, mockCache.NumKeyUpdates()) + fake.AssertExpectations(t) } diff --git a/pkg/storage/chunk/cache/mock.go b/pkg/storage/chunk/cache/mock.go index db672a1df2..a477de40b9 100644 --- a/pkg/storage/chunk/cache/mock.go +++ b/pkg/storage/chunk/cache/mock.go @@ -7,7 +7,13 @@ import ( "github.com/grafana/loki/pkg/logqlmodel/stats" ) +type MockCache interface { + Cache + NumKeyUpdates() int +} + type mockCache struct { + numKeyUpdates int sync.Mutex cache map[string][]byte } @@ -17,6 +23,7 @@ func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) error defer m.Unlock() for i := range keys { m.cache[keys[i]] = bufs[i] + m.numKeyUpdates++ } return nil } @@ -43,8 +50,12 @@ func (m *mockCache) GetCacheType() stats.CacheType { return "mock" } +func (m *mockCache) NumKeyUpdates() int { + return m.numKeyUpdates +} + // NewMockCache makes a new MockCache. -func NewMockCache() Cache { +func NewMockCache() MockCache { return &mockCache{ cache: map[string][]byte{}, }