mirror of https://github.com/grafana/loki
Enable results cache for volume queries (#10403)
This PR enables the results cache for volume queries to the `index/volume` and `index/volume_range` endpoints.

pull/10420/head
parent 60f57607b7
commit a05744a385
@@ -0,0 +1,134 @@
package queryrange

import (
	"context"
	"flag"
	"fmt"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/tenant"
	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
	"github.com/grafana/loki/pkg/storage/chunk/cache"
	"github.com/grafana/loki/pkg/util"
	"github.com/grafana/loki/pkg/util/validation"
)
type VolumeSplitter struct {
	cacheKeyLimits
}

// GenerateCacheKey generates a cache key based on the userID, Request and interval.
func (i VolumeSplitter) GenerateCacheKey(ctx context.Context, userID string, r queryrangebase.Request) string {
	cacheKey := i.cacheKeyLimits.GenerateCacheKey(ctx, userID, r)

	volumeReq := r.(*logproto.VolumeRequest)
	limit := volumeReq.GetLimit()
	aggregateBy := volumeReq.GetAggregateBy()
	targetLabels := volumeReq.GetTargetLabels()
	return fmt.Sprintf("volume:%s:%d:%s:%s", cacheKey, limit, aggregateBy, strings.Join(targetLabels, ","))
}
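// Illustrative sketch, not part of this diff: the exact prefix comes from the
// embedded cacheKeyLimits, so for tenant "fake", limit 10, AggregateBy
// "series" and TargetLabels ["foo", "bar"] the generated key has the shape
//
//	volume:<cacheKeyLimits key>:10:series:foo,bar
//
// Changing the limit, the aggregation dimension, or the target labels thus
// yields a different key and a separate cache entry, which is what the
// "caches are only valid for the same request parameters" test below relies on.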

type VolumeExtractor struct{}

// Extract favors the ability to cache over exactness of results. It assumes a constant distribution
// of log volumes over a range and will extract subsets proportionally.
func (p VolumeExtractor) Extract(start, end int64, res queryrangebase.Response, resStart, resEnd int64) queryrangebase.Response {
	factor := util.GetFactorOfTime(start, end, resStart, resEnd)

	volumeRes := res.(*VolumeResponse)
	volumes := volumeRes.Response.GetVolumes()
	for i, v := range volumes {
		volumes[i].Volume = uint64(float64(v.Volume) * factor)
	}
	return &VolumeResponse{
		Response: &logproto.VolumeResponse{
			Volumes: volumes,
			Limit:   volumeRes.Response.GetLimit(),
		},
	}
}
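// Worked example of the proportionality assumption (illustrative; the numbers
// mirror the overlap test below): if a cached response spans 60m with
// Volume = 42 and a new request overlaps only 45m of it, the factor is
// 45/60 = 0.75 and the extracted subset reports uint64(0.75 * 42) = 31.
// Exactness is traded for cacheability: if ingestion was not uniform across
// the hour, the true volume of that 45m window may differ.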

func (p VolumeExtractor) ResponseWithoutHeaders(resp queryrangebase.Response) queryrangebase.Response {
	volumeRes := resp.(*VolumeResponse)
	return &VolumeResponse{
		Response: volumeRes.Response,
	}
}

type VolumeCacheConfig struct {
	queryrangebase.ResultsCacheConfig `yaml:",inline"`
}

// RegisterFlags registers flags.
func (cfg *VolumeCacheConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.RegisterFlagsWithPrefix(f, "frontend.volume-results-cache.")
}

func (cfg *VolumeCacheConfig) Validate() error {
	return cfg.ResultsCacheConfig.Validate()
}
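// Note: every flag defined by the embedded queryrangebase.ResultsCacheConfig
// is re-registered under the "frontend.volume-results-cache." prefix; the
// concrete sub-flag names come from ResultsCacheConfig and are not shown in
// this diff.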

// volumeCacheMiddlewareNowTimeFunc is a function that returns the current time.
// It is used to allow tests to override the current time.
var volumeCacheMiddlewareNowTimeFunc = model.Now

// shouldCacheVolume returns true if the request should be cached.
// It returns false if:
// - The request end time falls within the max_stats_cache_freshness duration.
func shouldCacheVolume(ctx context.Context, req queryrangebase.Request, lim Limits) (bool, error) {
	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return false, err
	}

	cacheFreshnessCapture := func(id string) time.Duration { return lim.MaxStatsCacheFreshness(ctx, id) }
	maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture)

	now := volumeCacheMiddlewareNowTimeFunc()
	return maxCacheFreshness == 0 || model.Time(req.GetEnd()).Before(now.Add(-maxCacheFreshness)), nil
}
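// Worked example (illustrative; the values match TestVolumeCache_RecentData
// below): with max_stats_cache_freshness = 30m and now = 12:00, a request
// ending at 11:55 is not before 11:30, so it is too fresh to cache; a request
// ending at 11:15 is before 11:30 and is cached. A freshness limit of 0
// disables the check entirely.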

func NewVolumeCacheMiddleware(
	log log.Logger,
	limits Limits,
	merger queryrangebase.Merger,
	c cache.Cache,
	cacheGenNumberLoader queryrangebase.CacheGenNumberLoader,
	shouldCache queryrangebase.ShouldCacheFn,
	parallelismForReq func(ctx context.Context, tenantIDs []string, r queryrangebase.Request) int,
	retentionEnabled bool,
	transformer UserIDTransformer,
	metrics *queryrangebase.ResultsCacheMetrics,
) (queryrangebase.Middleware, error) {
	return queryrangebase.NewResultsCacheMiddleware(
		log,
		c,
		VolumeSplitter{cacheKeyLimits{limits, transformer}},
		limits,
		merger,
		VolumeExtractor{},
		cacheGenNumberLoader,
		func(ctx context.Context, r queryrangebase.Request) bool {
			if shouldCache != nil && !shouldCache(ctx, r) {
				return false
			}

			cacheStats, err := shouldCacheVolume(ctx, r, limits)
			if err != nil {
				level.Error(log).Log("msg", "failed to determine if volume should be cached. Won't cache", "err", err)
				return false
			}

			return cacheStats
		},
		parallelismForReq,
		retentionEnabled,
		metrics,
	)
}
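// Usage sketch (mirrors the tests below; not part of this diff): the returned
// middleware wraps any queryrangebase.Handler, e.g.
//
//	mw, _ := NewVolumeCacheMiddleware(logger, limits, DefaultCodec, c, nil, nil, parallelism, false, nil, nil)
//	cached := mw.Wrap(downstream)
//	resp, err := cached.Do(ctx, volumeReq)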
@@ -0,0 +1,334 @@
package queryrange

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/user"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
	"github.com/grafana/loki/pkg/storage/chunk/cache"
	"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
	"github.com/grafana/loki/pkg/util"
)
func TestVolumeCache(t *testing.T) {
	setup := func(volResp *VolumeResponse) (*int, queryrangebase.Handler) {
		cfg := queryrangebase.ResultsCacheConfig{
			CacheConfig: cache.Config{
				Cache: cache.NewMockCache(),
			},
		}
		c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger(), stats.ResultCache)
		require.NoError(t, err)
		cacheMiddleware, err := NewVolumeCacheMiddleware(
			log.NewNopLogger(),
			WithSplitByLimits(fakeLimits{}, 24*time.Hour),
			DefaultCodec,
			c,
			nil,
			nil,
			func(_ context.Context, _ []string, _ queryrangebase.Request) int {
				return 1
			},
			false,
			nil,
			nil,
		)
		require.NoError(t, err)

		calls, volHandler := volumeResultHandler(volResp)
		rc := cacheMiddleware.Wrap(volHandler)

		return calls, rc
	}

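	// Note on the setup (illustrative): parallelismForReq is pinned to 1, so
	// *calls counts exactly one downstream call per cache miss, and the 24h
	// split duration from WithSplitByLimits is assumed to be what
	// cacheKeyLimits buckets cache keys by, as in the other results caches in
	// this package.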
t.Run("caches the response for the same request", func(t *testing.T) { |
||||
volResp := &VolumeResponse{ |
||||
Response: &logproto.VolumeResponse{ |
||||
Volumes: []logproto.Volume{ |
||||
{ |
||||
Name: `{foo="bar"}`, |
||||
Volume: 42, |
||||
}, |
||||
}, |
||||
Limit: 10, |
||||
}, |
||||
} |
||||
calls, handler := setup(volResp) |
||||
|
||||
from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) |
||||
volReq := &logproto.VolumeRequest{ |
||||
From: from, |
||||
Through: through, |
||||
Matchers: `{foo="bar"}`, |
||||
Limit: 10, |
||||
} |
||||
|
||||
*calls = 0 |
||||
ctx := user.InjectOrgID(context.Background(), "fake") |
||||
resp, err := handler.Do(ctx, volReq) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, *calls) |
||||
require.Equal(t, volResp, resp) |
||||
|
||||
// Doing same request again shouldn't change anything.
|
||||
*calls = 0 |
||||
resp, err = handler.Do(ctx, volReq) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 0, *calls) |
||||
require.Equal(t, volResp, resp) |
||||
}) |
||||

	t.Run("a new request with overlapping time range should reuse part of the previous request for the overlap", func(t *testing.T) {
		volResp := &VolumeResponse{
			Response: &logproto.VolumeResponse{
				Volumes: []logproto.Volume{
					{
						Name:   `{foo="bar"}`,
						Volume: 42,
					},
				},
				Limit: 10,
			},
		}
		calls, handler := setup(volResp)

		from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour))
		volReq := &logproto.VolumeRequest{
			From:     from,
			Through:  through,
			Matchers: `{foo="bar"}`,
			Limit:    10,
		}

		ctx := user.InjectOrgID(context.Background(), "fake")
		resp, err := handler.Do(ctx, volReq)
		require.NoError(t, err)
		require.Equal(t, 1, *calls)
		require.Equal(t, volResp, resp)

		// The new start time is 15m (i.e. 25%) in the future with regard to the previous request time span.
		*calls = 0
		req := volReq.WithStartEnd(volReq.GetStart()+(15*time.Minute).Milliseconds(), volReq.GetEnd()+(15*time.Minute).Milliseconds())
		vol := float64(0.75)
		expectedVol := &VolumeResponse{
			Response: &logproto.VolumeResponse{
				Volumes: []logproto.Volume{
					{
						Name:   `{foo="bar"}`,
						Volume: uint64(vol*float64(42)) + 42,
					},
				},
				Limit: 10,
			},
		}

		resp, err = handler.Do(ctx, req)
		require.NoError(t, err)
		require.Equal(t, 1, *calls)
		require.Equal(t, expectedVol, resp)
	})
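	// Arithmetic behind expectedVol above: shifting the hour-long window
	// forward by 15m leaves a 45m overlap with the cached extent, so the
	// cached entry contributes uint64(0.75*42) = 31 via VolumeExtractor,
	// while the uncached 15m tail costs the single downstream call, whose
	// mock response adds another 42; the merged volume is 31 + 42 = 73.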

	t.Run("caches are only valid for the same request parameters", func(t *testing.T) {
		volResp := &VolumeResponse{
			Response: &logproto.VolumeResponse{
				Volumes: []logproto.Volume{
					{
						Name:   `{foo="bar"}`,
						Volume: 42,
					},
				},
				Limit: 10,
			},
		}
		calls, handler := setup(volResp)

		// initial call to fill cache
		from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour))
		volReq := &logproto.VolumeRequest{
			From:     from,
			Through:  through,
			Matchers: `{foo="bar"}`,
			Limit:    10,
			Step:     1,
		}

		ctx := user.InjectOrgID(context.Background(), "fake")
		_, err := handler.Do(ctx, volReq)
		require.NoError(t, err)
		require.Equal(t, 1, *calls)

		type testCase struct {
			fn func(*logproto.VolumeRequest)
		}
		testCases := map[string]testCase{
			"different step": {
				fn: func(req *logproto.VolumeRequest) {
					req.Step = 2
				},
			},
			"new limit": {
				fn: func(req *logproto.VolumeRequest) {
					req.Limit = 11
				},
			},
			"aggregate by labels": {
				fn: func(req *logproto.VolumeRequest) {
					req.AggregateBy = seriesvolume.Labels
				},
			},
			"target labels": {
				fn: func(req *logproto.VolumeRequest) {
					req.TargetLabels = []string{"foo"}
				},
			},
		}

		for name, tc := range testCases {
			*calls = 0

			volReq := &logproto.VolumeRequest{
				From:     from,
				Through:  through,
				Matchers: `{foo="bar"}`,
				Limit:    10,
				Step:     1,
			}
			tc.fn(volReq)

			_, err = handler.Do(ctx, volReq)
			require.NoError(t, err)
			require.Equal(t, 1, *calls, name)
		}
	})
}

func TestVolumeCache_RecentData(t *testing.T) {
	volumeCacheMiddlewareNowTimeFunc = func() model.Time { return model.Time(testTime.UnixMilli()) }
	now := volumeCacheMiddlewareNowTimeFunc()

	volResp := &VolumeResponse{
		Response: &logproto.VolumeResponse{
			Volumes: []logproto.Volume{
				{
					Name:   `{foo="bar"}`,
					Volume: 42,
				},
			},
			Limit: 10,
		},
	}

	for _, tc := range []struct {
		name                   string
		maxStatsCacheFreshness time.Duration
		req                    *logproto.VolumeRequest

		expectedCallsBeforeCache int
		expectedCallsAfterCache  int
		expectedResp             *VolumeResponse
	}{
		{
			name:                   "MaxStatsCacheFreshness disabled",
			maxStatsCacheFreshness: 0,
			req: &logproto.VolumeRequest{
				From:     now.Add(-1 * time.Hour),
				Through:  now.Add(-5 * time.Minute), // So we don't hit the max_cache_freshness_per_query limit (1m)
				Matchers: `{foo="bar"}`,
				Limit:    10,
			},

			expectedCallsBeforeCache: 1,
			expectedCallsAfterCache:  0,
			expectedResp:             volResp,
		},
		{
			name:                   "MaxStatsCacheFreshness enabled",
			maxStatsCacheFreshness: 30 * time.Minute,
			req: &logproto.VolumeRequest{
				From:     now.Add(-1 * time.Hour),
				Through:  now.Add(-5 * time.Minute), // So we don't hit the max_cache_freshness_per_query limit (1m)
				Matchers: `{foo="bar"}`,
				Limit:    10,
			},

			expectedCallsBeforeCache: 1,
			expectedCallsAfterCache:  1, // The whole request is done since it wasn't cached.
			expectedResp:             volResp,
		},
		{
			name:                   "MaxStatsCacheFreshness enabled, but request before the max freshness",
			maxStatsCacheFreshness: 30 * time.Minute,
			req: &logproto.VolumeRequest{
				From:     now.Add(-1 * time.Hour),
				Through:  now.Add(-45 * time.Minute),
				Matchers: `{foo="bar"}`,
				Limit:    10,
			},

			expectedCallsBeforeCache: 1,
			expectedCallsAfterCache:  0,
			expectedResp:             volResp,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			cfg := queryrangebase.ResultsCacheConfig{
				CacheConfig: cache.Config{
					Cache: cache.NewMockCache(),
				},
			}
			c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger(), stats.ResultCache)
			require.NoError(t, err)
			defer c.Stop()

			lim := fakeLimits{maxStatsCacheFreshness: tc.maxStatsCacheFreshness}

			cacheMiddleware, err := NewVolumeCacheMiddleware(
				log.NewNopLogger(),
				WithSplitByLimits(lim, 24*time.Hour),
				DefaultCodec,
				c,
				nil,
				nil,
				func(_ context.Context, _ []string, _ queryrangebase.Request) int {
					return 1
				},
				false,
				nil,
				nil,
			)
			require.NoError(t, err)

			calls, statsHandler := volumeResultHandler(volResp)
			rc := cacheMiddleware.Wrap(statsHandler)

			ctx := user.InjectOrgID(context.Background(), "fake")
			resp, err := rc.Do(ctx, tc.req)
			require.NoError(t, err)
			require.Equal(t, tc.expectedCallsBeforeCache, *calls)
			require.Equal(t, tc.expectedResp, resp)

			// Doing the same request again.
			*calls = 0
			resp, err = rc.Do(ctx, tc.req)
			require.NoError(t, err)
			require.Equal(t, tc.expectedCallsAfterCache, *calls)
			require.Equal(t, tc.expectedResp, resp)
		})
	}
}

func volumeResultHandler(v *VolumeResponse) (*int, queryrangebase.Handler) {
	calls := 0
	return &calls, queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		calls++
		return v, nil
	})
}