caching: do not try to fill the gap in log results cache when the new query interval does not overlap the cached query interval (#9757)

**What this PR does / why we need it**:
Currently, when we find a relevant cached negative response for a logs
query, we do the following:
* If the cached query completely covers the new query:
  * return back an empty response.
* else:
  * fill the gaps on either/both sides of the cached query.

The problem with filling the gaps is that when the cached query does not
overlap at all with the new query, we have to extend the query beyond
what was requested. However, with the logs query, we have a
limit on the number of lines we can send back in the response. So, this
could result in the query response having logs which were not requested
by the query, which then get filtered out by the [response
extractor](b78d3f0552/pkg/querier/queryrange/log_result_cache.go (L299)),
unexpectedly resulting in an empty response. For example, if the query
was cached for start=15, end=20 and we get a `backwards` query for
start=5, end=10. To fill the gap, the query would be executed for
start=5, end=15. Now, if we have logs more than the query `limit` in the
range 10-15, we would filter out all the data in the response extractor
and send back an empty response to the user.

This PR fixes the issue by doing the following changes when handling
cache hit:
* If the cached query completely covers the new query:
  * return back an empty response[_existing_].
* else if the cached query does not overlap with the new query:
  * do the new query as requested.
  * if the new query results in an empty response and covers a longer
    interval than the cached query:
    * update the cache
* else:
  * query the data for missing intervals on both/either side[_existing_]
* update the cache with extended intervals if the new queries resulted
in an empty response[_existing_]

**Special notes for your reviewer**:
We could do further improvements in the handling of queries not
overlapping with cached query by selectively extending the queries based
on query direction and cached query lying before/after the new query.
For example, if the new query is a `backwards` query and
`cachedQuery.End` < `newQuery.Start`, it should be okay to extend the
query to cover `cachedQuery.End` to `newQuery.End` to fill the cache, since
the query would first fill the most relevant data before hitting the limits.
I did not want to complicate the fix so went without implementing this
approach. We can revisit later if we feel we need to improve our
caching.

**Checklist**
- [x] Tests updated
- [x] `CHANGELOG.md` updated

---------

Co-authored-by: Travis Patterson <travis.patterson@grafana.com>
pull/8178/head^2
Sandeep Sukhani 3 years ago committed by GitHub
parent 0da462558f
commit 3e1f2fc273
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 154
      pkg/querier/queryrange/log_result_cache.go
  3. 104
      pkg/querier/queryrange/log_result_cache_test.go
  4. 13
      pkg/storage/chunk/cache/mock.go

@ -56,6 +56,7 @@
* [9495](https://github.com/grafana/loki/pull/9495) **thampiotr**: Promtail: Fix potential goroutine leak in file tailer.
* [9650](https://github.com/grafana/loki/pull/9650) **ashwanthgoli**: Config: ensure storage config defaults apply to named stores.
* [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams.
* [9757](https://github.com/grafana/loki/pull/9757) **sandeepsukhani**: Frontend Caching: Fix a bug in negative logs results cache causing Loki to unexpectedly send empty/incorrect results.
* [9754](https://github.com/grafana/loki/pull/9754) **ashwanthgoli**: Fixes an issue with indexes becoming unqueriable if the index prefix is different from the one configured in the latest period config.
* [9763](https://github.com/grafana/loki/pull/9763) **ssncferreira**: Fix the logic of the `offset` operator for downstream queries on instant query splitting of (range) vector aggregation expressions containing an offset.
* [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.

@ -187,77 +187,95 @@ func (l *logResultCache) handleHit(ctx context.Context, cacheKey string, cachedR
if cachedRequest.StartTs.UnixNano() <= lokiReq.StartTs.UnixNano() && cachedRequest.EndTs.UnixNano() >= lokiReq.EndTs.UnixNano() {
return result, nil
}
// we could be missing data at the start and the end.
// so we're going to fetch what is missing.
var (
startRequest, endRequest *LokiRequest
startResp, endResp *LokiResponse
updateCache bool
ok bool
)
g, ctx := errgroup.WithContext(ctx)
// if we're missing data at the start, start fetching from the start to the cached start.
if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) {
g.Go(func() error {
startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs())
resp, err := l.next.Do(ctx, startRequest)
if err != nil {
return err
}
startResp, ok = resp.(*LokiResponse)
if !ok {
return fmt.Errorf("unexpected response type %T", resp)
}
return nil
})
}
// if we're missing data at the end, start fetching from the cached end to the end.
if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) {
g.Go(func() error {
endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs())
resp, err := l.next.Do(ctx, endRequest)
if err != nil {
return err
}
endResp, ok = resp.(*LokiResponse)
if !ok {
return fmt.Errorf("unexpected response type %T", resp)
}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
updateCache := false
// if the query does not overlap cached interval, do not try to fill the gap since it requires extending the queries beyond what is requested in the query.
// Extending the queries beyond what is requested could result in empty responses due to response limit set in the queries.
if !overlap(lokiReq.StartTs, lokiReq.EndTs, cachedRequest.StartTs, cachedRequest.EndTs) {
resp, err := l.next.Do(ctx, lokiReq)
if err != nil {
return nil, err
}
result = resp.(*LokiResponse)
// if we have data at the start, we need to merge it with the cached data if it's empty and update the cache.
// If it's not empty only merge the response.
if startResp != nil {
if isEmpty(startResp) {
cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs())
// if the response is empty and the query is larger than what is cached, update the cache
if isEmpty(result) && (lokiReq.EndTs.UnixNano()-lokiReq.StartTs.UnixNano() > cachedRequest.EndTs.UnixNano()-cachedRequest.StartTs.UnixNano()) {
cachedRequest = cachedRequest.WithStartEndTime(lokiReq.GetStartTs(), lokiReq.GetEndTs())
updateCache = true
} else {
if startResp.Status != loghttp.QueryStatusSuccess {
return startResp, nil
}
} else {
// we could be missing data at the start and the end.
// so we're going to fetch what is missing.
var (
startRequest, endRequest *LokiRequest
startResp, endResp *LokiResponse
)
g, ctx := errgroup.WithContext(ctx)
// if we're missing data at the start, start fetching from the start to the cached start.
if lokiReq.GetStartTs().Before(cachedRequest.GetStartTs()) {
g.Go(func() error {
startRequest = lokiReq.WithStartEndTime(lokiReq.GetStartTs(), cachedRequest.GetStartTs())
resp, err := l.next.Do(ctx, startRequest)
if err != nil {
return err
}
var ok bool
startResp, ok = resp.(*LokiResponse)
if !ok {
return fmt.Errorf("unexpected response type %T", resp)
}
return nil
})
}
// if we're missing data at the end, start fetching from the cached end to the end.
if lokiReq.GetEndTs().After(cachedRequest.GetEndTs()) {
g.Go(func() error {
endRequest = lokiReq.WithStartEndTime(cachedRequest.GetEndTs(), lokiReq.GetEndTs())
resp, err := l.next.Do(ctx, endRequest)
if err != nil {
return err
}
var ok bool
endResp, ok = resp.(*LokiResponse)
if !ok {
return fmt.Errorf("unexpected response type %T", resp)
}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
// if we have data at the start, we need to merge it with the cached data if it's empty and update the cache.
// If it's not empty only merge the response.
if startResp != nil {
if isEmpty(startResp) {
cachedRequest = cachedRequest.WithStartEndTime(startRequest.GetStartTs(), cachedRequest.GetEndTs())
updateCache = true
} else {
if startResp.Status != loghttp.QueryStatusSuccess {
return startResp, nil
}
result = mergeLokiResponse(startResp, result)
}
result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), startResp), result)
}
}
// if we have data at the end, we need to merge it with the cached data if it's empty and update the cache.
// If it's not empty only merge the response.
if endResp != nil {
if isEmpty(endResp) {
cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs())
updateCache = true
} else {
if endResp.Status != loghttp.QueryStatusSuccess {
return endResp, nil
// if we have data at the end, we need to merge it with the cached data if it's empty and update the cache.
// If it's not empty only merge the response.
if endResp != nil {
if isEmpty(endResp) {
cachedRequest = cachedRequest.WithStartEndTime(cachedRequest.GetStartTs(), endRequest.GetEndTs())
updateCache = true
} else {
if endResp.Status != loghttp.QueryStatusSuccess {
return endResp, nil
}
result = mergeLokiResponse(endResp, result)
}
result = mergeLokiResponse(extractLokiResponse(lokiReq.GetStartTs(), lokiReq.GetEndTs(), endResp), result)
}
}
@ -333,3 +351,11 @@ func emptyResponse(lokiReq *LokiRequest) *LokiResponse {
},
}
}
func overlap(aFrom, aThrough, bFrom, bThrough time.Time) bool {
if aFrom.After(bThrough) || bFrom.After(aThrough) {
return false
}
return true
}

@ -7,6 +7,8 @@ import (
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
@ -434,7 +436,10 @@ func Test_LogResultCacheDifferentRangeNonEmptyAndEmpty(t *testing.T) {
fake.AssertExpectations(t)
}
func Test_LogResultFillingGap(t *testing.T) {
// Test_LogResultNonOverlappingCache tests the scenario where the cached query does not overlap with the new request
func Test_LogResultNonOverlappingCache(t *testing.T) {
metrics := NewLogResultCacheMetrics(prometheus.NewPedanticRegistry())
mockCache := cache.NewMockCache()
var (
ctx = user.InjectOrgID(context.Background(), "foo")
lrc = NewLogResultCache(
@ -442,13 +447,18 @@ func Test_LogResultFillingGap(t *testing.T) {
fakeLimits{
splits: map[string]time.Duration{"foo": time.Minute},
},
cache.NewMockCache(),
nil,
mockCache,
nil,
nil,
metrics,
)
)
checkCacheMetrics := func(expectedHits, expectedMisses int) {
require.Equal(t, float64(expectedHits), testutil.ToFloat64(metrics.CacheHit))
require.Equal(t, float64(expectedMisses), testutil.ToFloat64(metrics.CacheMiss))
}
// data requested for just 1 sec, resulting in empty response
req1 := &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
@ -456,16 +466,24 @@ func Test_LogResultFillingGap(t *testing.T) {
Limit: entriesLimit,
}
// data requested for just 1 sec, within the same split but couple seconds apart
// data requested for just 1 sec(non-overlapping), resulting in empty response
req2 := &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+35*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
Limit: entriesLimit,
}
// data requested for larger interval than req1(overlapping with req2), returns empty response
req3 := &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+26*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
Limit: entriesLimit,
}
// data requested for larger interval than req3(non-overlapping), returns non-empty response
req4 := &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
Limit: entriesLimit,
}
@ -476,34 +494,49 @@ func Test_LogResultFillingGap(t *testing.T) {
Response: emptyResponse(req1),
},
},
// partial request being made for missing interval at the end
// req2 should do query for just its query range and should not update the cache
{
RequestResponse: queryrangebase.RequestResponse{
Request: &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
Limit: entriesLimit,
},
Response: nonEmptyResponse(&LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+31*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+36*time.Second.Nanoseconds()),
Response: emptyResponse(&LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
Limit: entriesLimit,
}, time.Unix(31, 0), time.Unix(34, 0), lblFooBar), // data not present for actual query interval i.e req2
}),
},
},
// partial request being made for missing interval at the beginning
// req3 should do query for just its query range and should update the cache
{
RequestResponse: queryrangebase.RequestResponse{
Request: &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
Limit: entriesLimit,
},
Response: emptyResponse(&LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+24*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+29*time.Second.Nanoseconds()),
Limit: entriesLimit,
}),
},
},
// req4 should do query for its query range. Data would be non-empty so cache should not be updated
{
RequestResponse: queryrangebase.RequestResponse{
Request: &LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
Limit: entriesLimit,
},
Response: nonEmptyResponse(&LokiRequest{
StartTs: time.Unix(0, time.Minute.Nanoseconds()+25*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+30*time.Second.Nanoseconds()),
StartTs: time.Unix(0, time.Minute.Nanoseconds()+10*time.Second.Nanoseconds()),
EndTs: time.Unix(0, time.Minute.Nanoseconds()+20*time.Second.Nanoseconds()),
Limit: entriesLimit,
}, time.Unix(27, 0), time.Unix(29, 0), lblFooBar), // data not present for actual query interval i.e req3
}, time.Unix(71, 0), time.Unix(79, 0), lblFooBar),
},
},
})
@ -513,16 +546,37 @@ func Test_LogResultFillingGap(t *testing.T) {
resp, err := h.Do(ctx, req1)
require.NoError(t, err)
require.Equal(t, emptyResponse(req1), resp)
checkCacheMetrics(0, 1)
require.Equal(t, 1, mockCache.NumKeyUpdates())
// although the caching code would request for more data than the actual query, we should have empty response here since we
// do not have any data for the query we made
// req2 should not update the cache since it has same length as previously cached query
resp, err = h.Do(ctx, req2)
require.NoError(t, err)
require.Equal(t, mergeLokiResponse(emptyResponse(req1), emptyResponse(req2)), resp)
require.Equal(t, emptyResponse(req2), resp)
checkCacheMetrics(1, 1)
require.Equal(t, 1, mockCache.NumKeyUpdates())
// req3 should update the cache since it has larger length than previously cached query
resp, err = h.Do(ctx, req3)
require.NoError(t, err)
require.Equal(t, mergeLokiResponse(emptyResponse(req1), emptyResponse(req3)), resp)
require.Equal(t, emptyResponse(req3), resp)
checkCacheMetrics(2, 1)
require.Equal(t, 2, mockCache.NumKeyUpdates())
// req4 returns non-empty response so it should not update the cache
resp, err = h.Do(ctx, req4)
require.NoError(t, err)
require.Equal(t, nonEmptyResponse(req4, time.Unix(71, 0), time.Unix(79, 0), lblFooBar), resp)
checkCacheMetrics(3, 1)
require.Equal(t, 2, mockCache.NumKeyUpdates())
// req2 should return back empty response from the cache, without updating the cache
resp, err = h.Do(ctx, req2)
require.NoError(t, err)
require.Equal(t, emptyResponse(req2), resp)
checkCacheMetrics(4, 1)
require.Equal(t, 2, mockCache.NumKeyUpdates())
fake.AssertExpectations(t)
}

@ -7,7 +7,13 @@ import (
"github.com/grafana/loki/pkg/logqlmodel/stats"
)
// MockCache is a Cache for use in tests that additionally exposes how many
// key writes it has received, so tests can assert whether a code path
// updated the cache.
type MockCache interface {
Cache
// NumKeyUpdates returns the total number of keys written via Store so far.
NumKeyUpdates() int
}
// mockCache is the in-memory MockCache implementation; the embedded mutex
// guards both the map and the update counter.
type mockCache struct {
// numKeyUpdates is incremented once per key stored (per key, not per
// Store call).
numKeyUpdates int
sync.Mutex
cache map[string][]byte
}
@ -17,6 +23,7 @@ func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) error
defer m.Unlock()
for i := range keys {
m.cache[keys[i]] = bufs[i]
m.numKeyUpdates++
}
return nil
}
@ -43,8 +50,12 @@ func (m *mockCache) GetCacheType() stats.CacheType {
return "mock"
}
// NumKeyUpdates returns the total number of keys written to the cache via
// Store. It acquires the embedded mutex: Store increments numKeyUpdates
// under the lock, so an unguarded read here would be a data race under
// `go test -race` when tests exercise the cache concurrently.
func (m *mockCache) NumKeyUpdates() int {
	m.Lock()
	defer m.Unlock()
	return m.numKeyUpdates
}
// NewMockCache makes a new MockCache.
func NewMockCache() Cache {
func NewMockCache() MockCache {
return &mockCache{
cache: map[string][]byte{},
}

Loading…
Cancel
Save