diff --git a/CHANGELOG.md b/CHANGELOG.md
index db221b15cb..34445ad9cf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -56,6 +56,7 @@
 * [9495](https://github.com/grafana/loki/pull/9495) **thampiotr**: Promtail: Fix potential goroutine leak in file tailer.
 * [9650](https://github.com/grafana/loki/pull/9650) **ashwanthgoli**: Config: ensure storage config defaults apply to named stores.
 * [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams.
+* [9757](https://github.com/grafana/loki/pull/9757) **sandeepsukhani**: Frontend Caching: Fix a bug in negative logs results cache causing Loki to unexpectedly send empty/incorrect results.
 * [9754](https://github.com/grafana/loki/pull/9754) **ashwanthgoli**: Fixes an issue with indexes becoming unqueriable if the index prefix is different from the one configured in the latest period config.
 * [9763](https://github.com/grafana/loki/pull/9763) **ssncferreira**: Fix the logic of the `offset` operator for downstream queries on instant query splitting of (range) vector aggregation expressions containing an offset.
 * [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.
diff --git a/pkg/querier/queryrange/log_result_cache.go b/pkg/querier/queryrange/log_result_cache.go
index 0d00a0d134..827229dd0d 100644
--- a/pkg/querier/queryrange/log_result_cache.go
+++ b/pkg/querier/queryrange/log_result_cache.go
@@ -187,77 +187,95 @@ func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedR
 	if cachedRequest.StartTs.UnixNano() <= lokiReq.StartTs.UnixNano() && cachedRequest.EndTs.UnixNano() >= lokiReq.EndTs.UnixNano() {
 		return result, nil
 	}
-	// we could be missing data at the start and the end.
-	// so we're going to fetch what is missing.
-	var (
-		startRequest, endRequest *LokiRequest
-		startResp, endResp       *LokiResponse
-		updateCache              bool
-		ok                       bool
-	)
-	g, ctx := errgroup.WithContext(ctx)
-
-	// if we're missing data at the start, start fetching from the start to the cached start.
-	if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) {
-		g.Go(func() error {
-			startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs())
-			resp, err := l.next.Do(ctx, startRequest)
-			if err != nil {
-				return err
-			}
-			startResp, ok = resp.(*LokiResponse)
-			if !ok {
-				return fmt.Errorf("unexpected response type %T", resp)
-			}
-			return nil
-		})
-	}
-
-	// if we're missing data at the end, start fetching from the cached end to the end.
-	if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) {
-		g.Go(func() error {
-			endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs())
-			resp, err := l.next.Do(ctx, endRequest)
-			if err != nil {
-				return err
-			}
-			endResp, ok = resp.(*LokiResponse)
-			if !ok {
-				return fmt.Errorf("unexpected response type %T", resp)
-			}
-			return nil
-		})
-	}
-	if err := g.Wait(); err != nil {
-		return nil, err
-	}
+	updateCache := false
+	// if the query does not overlap cached interval, do not try to fill the gap since it requires extending the queries beyond what is requested in the query.
+	// Extending the queries beyond what is requested could result in empty responses due to response limit set in the queries.
+	if !overlap(lokiReq.StartTs, lokiReq.EndTs, cachedRequest.StartTs, cachedRequest.EndTs) {
+		resp, err := l.next.Do(ctx, lokiReq)
+		if err != nil {
+			return nil, err
+		}
+		result = resp.(*LokiResponse)
 
-	// if we have data at the start, we need to merge it with the cached data if it's empty and update the cache.
-	// If it's not empty only merge the response.
-	if startResp != nil {
-		if isEmpty(startResp) {
-			cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs())
+		// if the response is empty and the query is larger than what is cached, update the cache
+		if isEmpty(result) && (lokiReq.EndTs.UnixNano()-lokiReq.StartTs.UnixNano() > cachedRequest.EndTs.UnixNano()-cachedRequest.StartTs.UnixNano()) {
+			cachedRequest = cachedRequest.WithStartEndTime(lokiReq.GetStartTs(), lokiReq.GetEndTs())
 			updateCache = true
-		} else {
-			if startResp.Status != loghttp.QueryStatusSuccess {
-				return startResp, nil
+		}
+	} else {
+		// we could be missing data at the start and the end.
+		// so we're going to fetch what is missing.
+		var (
+			startRequest, endRequest *LokiRequest
+			startResp, endResp       *LokiResponse
+		)
+		g, ctx := errgroup.WithContext(ctx)
+
+		// if we're missing data at the start, start fetching from the start to the cached start.
+		if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) {
+			g.Go(func() error {
+				startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs())
+				resp, err := l.next.Do(ctx, startRequest)
+				if err != nil {
+					return err
+				}
+				var ok bool
+				startResp, ok = resp.(*LokiResponse)
+				if !ok {
+					return fmt.Errorf("unexpected response type %T", resp)
+				}
+				return nil
+			})
+		}
+
+		// if we're missing data at the end, start fetching from the cached end to the end.
+		if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) {
+			g.Go(func() error {
+				endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs())
+				resp, err := l.next.Do(ctx, endRequest)
+				if err != nil {
+					return err
+				}
+				var ok bool
+				endResp, ok = resp.(*LokiResponse)
+				if !ok {
+					return fmt.Errorf("unexpected response type %T", resp)
+				}
+				return nil
+			})
+		}
+
+		if err := g.Wait(); err != nil {
+			return nil, err
+		}
+
+		// if we have data at the start, we need to merge it with the cached data if it's empty and update the cache.
+		// If it's not empty only merge the response.
+		if startResp != nil {
+			if isEmpty(startResp) {
+				cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs())
+				updateCache = true
+			} else {
+				if startResp.Status != loghttp.QueryStatusSuccess {
+					return startResp, nil
+				}
+				result = mergeLokiResponse(startResp, result)
 			}
-			result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), startResp), result)
 		}
-	}
 
-	// if we have data at the end, we need to merge it with the cached data if it's empty and update the cache.
-	// If it's not empty only merge the response.
-	if endResp != nil {
-		if isEmpty(endResp) {
-			cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs())
-			updateCache = true
-		} else {
-			if endResp.Status != loghttp.QueryStatusSuccess {
-				return endResp, nil
+		// if we have data at the end, we need to merge it with the cached data if it's empty and update the cache.
+		// If it's not empty only merge the response.
+		if endResp != nil {
+			if isEmpty(endResp) {
+				cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs())
+				updateCache = true
+			} else {
+				if endResp.Status != loghttp.QueryStatusSuccess {
+					return endResp, nil
+				}
+				result = mergeLokiResponse(endResp, result)
 			}
-			result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), endResp), result)
 		}
 	}
@@ -333,3 +351,11 @@ func emptyResponse(lokiReq *LokiRequest) *LokiResponse {
 		},
 	}
 }
+
+func overlap(aFrom, aThrough, bFrom, bThrough time.Time) bool {
+	if aFrom.After(bThrough) || bFrom.After(aThrough) {
+		return false
+	}
+
+	return true
+}
diff --git a/pkg/querier/queryrange/log_result_cache_test.go b/pkg/querier/queryrange/log_result_cache_test.go
index 5e2f172e50..916a4d20d1 100644
--- a/pkg/querier/queryrange/log_result_cache_test.go
+++ b/pkg/querier/queryrange/log_result_cache_test.go
@@ -7,6 +7,8 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"github.com/weaveworks/common/user"
@@ -434,7 +436,10 @@ func Test_LogResultCacheDifferentRangeNonEmptyAndEmpty(t *testing.T) {
 	fake.AssertExpectations(t)
 }
 
-func Test_LogResultFillingGap(t *testing.T) {
+// Test_LogResultNonOverlappingCache tests the scenario where the cached query does not overlap with the new request
+func Test_LogResultNonOverlappingCache(t *testing.T) {
+	metrics := NewLogResultCacheMetrics(prometheus.NewPedanticRegistry())
+	mockCache := cache.NewMockCache()
 	var (
 		ctx = user.InjectOrgID(context.Background(), "foo")
 		lrc = NewLogResultCache(
@@ -442,13 +447,18 @@ func Test_LogResultFillingGap(t *testing.T) {
 			fakeLimits{
 				splits: map[string]time.Duration{"foo": time.Minute},
 			},
-			cache.NewMockCache(),
-			nil,
+			mockCache,
 			nil,
 			nil,
+			metrics,
 		)
 	)
+
+	checkCacheMetrics := func(expectedHits, expectedMisses int) {
+		require.Equal(t, float64(expectedHits), testutil.ToFloat64(metrics.CacheHit))
+		require.Equal(t, float64(expectedMisses), testutil.ToFloat64(metrics.CacheMiss))
+	}
+
 	// data requested for just 1 sec, resulting in empty response
 	req1 := &LokiRequest{
 		StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
@@ -456,16 +466,24 @@ func Test_LogResultFillingGap(t *testing.T) {
 		EndTs:   time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()),
 		Limit:   entriesLimit,
 	}
 
-	// data requested for just 1 sec, within the same split but couple seconds apart
+	// data requested for just 1 sec(non-overlapping), resulting in empty response
 	req2 := &LokiRequest{
-		StartTs: time.Unix(0, time.Minute.Nanoseconds()+35*time.Second.Nanoseconds()),
-		EndTs:   time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
+		StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
+		EndTs:   time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
 		Limit:   entriesLimit,
 	}
 
+	// data requested for larger interval than req1(overlapping with req2), returns empty response
 	req3 := &LokiRequest{
-		StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
-		EndTs:   time.Unix(0, time.Minute.Nanoseconds()+26*time.Second.Nanoseconds()),
+		StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
+		EndTs:   time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
+		Limit:   entriesLimit,
+	}
+
+	// data requested for larger interval than req3(non-overlapping), returns non-empty response
+	req4 := &LokiRequest{
+		StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
+		EndTs:   time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
 		Limit:   entriesLimit,
 	}
@@ -476,34 +494,49 @@ func Test_LogResultFillingGap(t *testing.T) {
 				Response: emptyResponse(req1),
 			},
 		},
-		// partial request being made for missing interval at the end
+		// req2 should do query for just its query range and should not update the cache
 		{
 			RequestResponse: queryrangebase.RequestResponse{
 				Request: &LokiRequest{
-					StartTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()),
-					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
+					StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
+					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
 					Limit:   entriesLimit,
 				},
-				Response: nonEmptyResponse(&LokiRequest{
-					StartTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()),
-					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
+				Response: emptyResponse(&LokiRequest{
+					StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
+					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
 					Limit:   entriesLimit,
-				}, time.Unix(31, 0), time.Unix(34, 0), lblFooBar), // data not present for actual query interval i.e req2
+				}),
 			},
 		},
-		// partial request being made for missing interval at the beginning
+		// req3 should do query for just its query range and should update the cache
 		{
 			RequestResponse: queryrangebase.RequestResponse{
 				Request: &LokiRequest{
-					StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
-					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
+					StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
+					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
+					Limit:   entriesLimit,
+				},
+				Response: emptyResponse(&LokiRequest{
+					StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
+					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
+					Limit:   entriesLimit,
+				}),
+			},
+		},
+		// req4 should do query for its query range. Data would be non-empty so cache should not be updated
+		{
+			RequestResponse: queryrangebase.RequestResponse{
+				Request: &LokiRequest{
+					StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
+					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
 					Limit:   entriesLimit,
 				},
 				Response: nonEmptyResponse(&LokiRequest{
-					StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
-					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
+					StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
+					EndTs:   time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
 					Limit:   entriesLimit,
-				}, time.Unix(27, 0), time.Unix(29, 0), lblFooBar), // data not present for actual query interval i.e req3
+				}, time.Unix(71, 0), time.Unix(79, 0), lblFooBar),
 			},
 		},
 	})
@@ -513,16 +546,37 @@ func Test_LogResultFillingGap(t *testing.T) {
 	resp, err := h.Do(ctx, req1)
 	require.NoError(t, err)
 	require.Equal(t, emptyResponse(req1), resp)
+	checkCacheMetrics(0, 1)
+	require.Equal(t, 1, mockCache.NumKeyUpdates())
 
-	// although the caching code would request for more data than the actual query, we should have empty response here since we
-	// do not have any data for the query we made
+	// req2 should not update the cache since it has same length as previously cached query
 	resp, err = h.Do(ctx, req2)
 	require.NoError(t, err)
-	require.Equal(t, mergeLokiResponse(emptyResponse(req1), emptyResponse(req2)), resp)
+	require.Equal(t, emptyResponse(req2), resp)
+	checkCacheMetrics(1, 1)
+	require.Equal(t, 1, mockCache.NumKeyUpdates())
 
+	// req3 should update the cache since it has larger length than previously cached query
 	resp, err = h.Do(ctx, req3)
 	require.NoError(t, err)
-	require.Equal(t, mergeLokiResponse(emptyResponse(req1), emptyResponse(req3)), resp)
+	require.Equal(t, emptyResponse(req3), resp)
+	checkCacheMetrics(2, 1)
+	require.Equal(t, 2, mockCache.NumKeyUpdates())
+
+	// req4 returns non-empty response so it should not update the cache
+	resp, err = h.Do(ctx, req4)
+	require.NoError(t, err)
+	require.Equal(t, nonEmptyResponse(req4, time.Unix(71, 0), time.Unix(79, 0), lblFooBar), resp)
+	checkCacheMetrics(3, 1)
+	require.Equal(t, 2, mockCache.NumKeyUpdates())
+
+	// req2 should return back empty response from the cache, without updating the cache
+	resp, err = h.Do(ctx, req2)
+	require.NoError(t, err)
+	require.Equal(t, emptyResponse(req2), resp)
+	checkCacheMetrics(4, 1)
+	require.Equal(t, 2, mockCache.NumKeyUpdates())
+
 	fake.AssertExpectations(t)
 }
diff --git a/pkg/storage/chunk/cache/mock.go b/pkg/storage/chunk/cache/mock.go
index db672a1df2..a477de40b9 100644
--- a/pkg/storage/chunk/cache/mock.go
+++ b/pkg/storage/chunk/cache/mock.go
@@ -7,7 +7,13 @@ import (
 	"github.com/grafana/loki/pkg/logqlmodel/stats"
 )
 
+type MockCache interface {
+	Cache
+	NumKeyUpdates() int
+}
+
 type mockCache struct {
+	numKeyUpdates int
 	sync.Mutex
 	cache map[string][]byte
 }
@@ -17,6 +23,7 @@ func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) error
 	defer m.Unlock()
 	for i := range keys {
 		m.cache[keys[i]] = bufs[i]
+		m.numKeyUpdates++
 	}
 	return nil
 }
@@ -43,8 +50,12 @@ func (m *mockCache) GetCacheType() stats.CacheType {
 	return "mock"
 }
 
+func (m *mockCache) NumKeyUpdates() int {
+	return m.numKeyUpdates
+}
+
 // NewMockCache makes a new MockCache.
-func NewMockCache() Cache {
+func NewMockCache() MockCache {
 	return &mockCache{
 		cache: map[string][]byte{},
 	}
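
Below is an illustrative, standalone Go sketch of the behaviour this patch gives the negative results cache, for readers who want the decision logic outside the diff. It is not part of the patch: the real logic lives in logResultCache.handleHit above, and the interval/decide names here are hypothetical; only overlap mirrors the helper added in log_result_cache.go.

package main

import (
	"fmt"
	"time"
)

// interval is a hypothetical stand-in for the [StartTs, EndTs] range of a LokiRequest.
type interval struct{ start, end time.Time }

// overlap mirrors the overlap() helper added by the patch: two intervals overlap
// unless one starts after the other ends.
func overlap(a, b interval) bool {
	return !(a.start.After(b.end) || b.start.After(a.end))
}

// decide summarises what handleHit does once it finds a cached empty result:
//   - "serve-from-cache": the cached interval fully covers the request;
//   - "query-no-gap-fill": the request does not overlap the cached interval, so it is
//     executed as-is, and the cache entry is replaced only if the new result is also
//     empty and spans a larger range than the cached one;
//   - "fill-gaps": the intervals overlap, so only the missing edges are fetched and
//     merged with the cached empty response.
func decide(req, cached interval) string {
	if !cached.start.After(req.start) && !cached.end.Before(req.end) {
		return "serve-from-cache"
	}
	if !overlap(req, cached) {
		return "query-no-gap-fill"
	}
	return "fill-gaps"
}

func main() {
	base := time.Unix(60, 0) // mirrors the 1-minute split used in the test above
	cached := interval{base.Add(30 * time.Second), base.Add(31 * time.Second)}

	fmt.Println(decide(interval{base.Add(30 * time.Second), base.Add(31 * time.Second)}, cached)) // serve-from-cache
	fmt.Println(decide(interval{base.Add(24 * time.Second), base.Add(25 * time.Second)}, cached)) // query-no-gap-fill (req2 in the test)
	fmt.Println(decide(interval{base.Add(29 * time.Second), base.Add(35 * time.Second)}, cached)) // fill-gaps
}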