Add way to override userId for caching (#7542)

**What this PR does / why we need it**:

Add a way to change the userId used in caching.

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the `CONTRIBUTING.md` guide
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
pull/7525/head
Michel Hollands 3 years ago committed by GitHub
parent 6b4563cd53
commit 16761723f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      pkg/querier/queryrange/limits.go
  2. 6
      pkg/querier/queryrange/limits_test.go
  3. 6
      pkg/querier/queryrange/queryrangebase/results_cache.go
  4. 4
      pkg/querier/queryrange/queryrangebase/results_cache_test.go
  5. 4
      pkg/querier/queryrange/roundtrip.go
  6. 2
      pkg/querier/queryrange/roundtrip_test.go

@ -58,12 +58,15 @@ func WithSplitByLimits(l Limits, splitBy time.Duration) Limits {
}
}
type UserIDTransformer func(context.Context, string) string
// cacheKeyLimits intersects Limits and CacheSplitter
type cacheKeyLimits struct {
Limits
transformer UserIDTransformer
}
func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrangebase.Request) string {
func (l cacheKeyLimits) GenerateCacheKey(ctx context.Context, userID string, r queryrangebase.Request) string {
split := l.QuerySplitDuration(userID)
var currentInterval int64
@ -71,6 +74,10 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrangebase.Request
currentInterval = r.GetStart() / denominator
}
if l.transformer != nil {
userID = l.transformer(ctx, userID)
}
// include both the currentInterval and the split duration in key to ensure
// a cache key can't be reused when an interval changes
return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split)

@ -43,7 +43,7 @@ func TestLimits(t *testing.T) {
require.Equal(
t,
fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Hour/time.Millisecond), int64(time.Hour)),
cacheKeyLimits{wrapped}.GenerateCacheKey("a", r),
cacheKeyLimits{wrapped, nil}.GenerateCacheKey(context.Background(), "a", r),
)
}
@ -268,7 +268,7 @@ func Test_MaxQueryLookBack(t *testing.T) {
}
func Test_GenerateCacheKey_NoDivideZero(t *testing.T) {
l := cacheKeyLimits{WithSplitByLimits(nil, 0)}
l := cacheKeyLimits{WithSplitByLimits(nil, 0), nil}
start := time.Now()
r := &LokiRequest{
Query: "qry",
@ -279,6 +279,6 @@ func Test_GenerateCacheKey_NoDivideZero(t *testing.T) {
require.Equal(
t,
fmt.Sprintf("foo:qry:%d:0:0", r.GetStep()),
l.GenerateCacheKey("foo", r),
l.GenerateCacheKey(context.Background(), "foo", r),
)
}

@ -135,14 +135,14 @@ func (PrometheusResponseExtractor) ResponseWithoutHeaders(resp Response) Respons
// CacheSplitter generates cache keys. This is a useful interface for downstream
// consumers who wish to implement their own strategies.
type CacheSplitter interface {
GenerateCacheKey(userID string, r Request) string
GenerateCacheKey(ctx context.Context, userID string, r Request) string
}
// constSplitter is a utility for using a constant split interval when determining cache keys
type constSplitter time.Duration
// GenerateCacheKey generates a cache key based on the userID, Request and interval.
func (t constSplitter) GenerateCacheKey(userID string, r Request) string {
func (t constSplitter) GenerateCacheKey(_ context.Context, userID string, r Request) string {
currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond)
return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}
@ -219,7 +219,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
}
var (
key = s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), r)
key = s.splitter.GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), r)
extents []Extent
response Response
)

@ -883,7 +883,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3))
// fill cache
key := constSplitter(day).GenerateCacheKey("1", req)
key := constSplitter(day).GenerateCacheKey(context.Background(), "1", req)
rc.(*resultsCache).put(ctx, key, []Extent{mkExtent(int64(modelNow)-(600*1e3), int64(modelNow))})
resp, err := rc.Do(ctx, req)
@ -966,7 +966,7 @@ func TestConstSplitter_generateCacheKey(t *testing.T) {
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) {
if got := constSplitter(tt.interval).GenerateCacheKey("fake", tt.r); got != tt.want {
if got := constSplitter(tt.interval).GenerateCacheKey(context.Background(), "fake", tt.r); got != tt.want {
t.Errorf("generateKey() = %v, want %v", got, tt.want)
}
})

@ -25,6 +25,7 @@ import (
// Config is the configuration for the queryrange tripperware
type Config struct {
queryrangebase.Config `yaml:",inline"`
Transformer UserIDTransformer `yaml:"-"`
}
// RegisterFlags adds the flags required to configure this flag set.
@ -412,11 +413,12 @@ func NewMetricTripperware(
SplitByIntervalMiddleware(limits, codec, splitMetricByTime, metrics.SplitByMetrics),
)
cacheKey := cacheKeyLimits{limits, cfg.Transformer}
if cfg.CacheResults {
queryCacheMiddleware, err := queryrangebase.NewResultsCacheMiddleware(
log,
c,
cacheKeyLimits{limits},
cacheKey,
limits,
codec,
extractor,

@ -46,7 +46,7 @@ var (
},
},
},
}}
}, nil}
matrix = promql.Matrix{
{
Points: []promql.Point{

Loading…
Cancel
Save