add user-id transformer for logs results cache (#7581)

**What this PR does / why we need it**:
Adding transformer to results cache same as metrics cache done in PR
#7542
pull/7453/head
Sandeep Sukhani 3 years ago committed by GitHub
parent 1e2079eb5f
commit 020631ebac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      pkg/querier/queryrange/log_result_cache.go
  2. 6
      pkg/querier/queryrange/log_result_cache_test.go
  3. 1
      pkg/querier/queryrange/roundtrip.go

@ -50,7 +50,8 @@ func NewLogResultCacheMetrics(registerer prometheus.Registerer) *LogResultCacheM
// Log hits are difficult to handle because of the limit query parameter and the size of the response.
// In the future it could be extended to cache non-empty query results.
// see https://docs.google.com/document/d/1_mACOpxdWZ5K0cIedaja5gzMbv-m0lUVazqZd2O4mEU/edit
func NewLogResultCache(logger log.Logger, limits Limits, cache cache.Cache, shouldCache queryrangebase.ShouldCacheFn, metrics *LogResultCacheMetrics) queryrangebase.Middleware {
func NewLogResultCache(logger log.Logger, limits Limits, cache cache.Cache, shouldCache queryrangebase.ShouldCacheFn,
transformer UserIDTransformer, metrics *LogResultCacheMetrics) queryrangebase.Middleware {
if metrics == nil {
metrics = NewLogResultCacheMetrics(nil)
}
@ -61,6 +62,7 @@ func NewLogResultCache(logger log.Logger, limits Limits, cache cache.Cache, shou
cache: cache,
logger: logger,
shouldCache: shouldCache,
transformer: transformer,
metrics: metrics,
}
})
@ -71,6 +73,7 @@ type logResultCache struct {
limits Limits
cache cache.Cache
shouldCache queryrangebase.ShouldCacheFn
transformer UserIDTransformer
metrics *LogResultCacheMetrics
logger log.Logger
@ -105,7 +108,17 @@ func (l *logResultCache) Do(ctx context.Context, req queryrangebase.Request) (qu
// The first subquery might not be aligned.
alignedStart := time.Unix(0, lokiReq.GetStartTs().UnixNano()-(lokiReq.GetStartTs().UnixNano()%interval.Nanoseconds()))
// generate the cache key based on query, tenant and start time.
cacheKey := fmt.Sprintf("log:%s:%s:%d:%d", tenant.JoinTenantIDs(tenantIDs), req.GetQuery(), interval.Nanoseconds(), alignedStart.UnixNano()/(interval.Nanoseconds()))
transformedTenantIDs := tenantIDs
if l.transformer != nil {
transformedTenantIDs = make([]string, 0, len(tenantIDs))
for _, tenantID := range tenantIDs {
transformedTenantIDs = append(transformedTenantIDs, l.transformer(ctx, tenantID))
}
}
cacheKey := fmt.Sprintf("log:%s:%s:%d:%d", tenant.JoinTenantIDs(transformedTenantIDs), req.GetQuery(), interval.Nanoseconds(), alignedStart.UnixNano()/(interval.Nanoseconds()))
_, buff, _, err := l.cache.Fetch(ctx, []string{cache.HashKey(cacheKey)})
if err != nil {

@ -29,6 +29,7 @@ func Test_LogResultCacheSameRange(t *testing.T) {
cache.NewMockCache(),
nil,
nil,
nil,
)
)
@ -69,6 +70,7 @@ func Test_LogResultCacheSameRangeNonEmpty(t *testing.T) {
cache.NewMockCache(),
nil,
nil,
nil,
)
)
@ -115,6 +117,7 @@ func Test_LogResultCacheSmallerRange(t *testing.T) {
cache.NewMockCache(),
nil,
nil,
nil,
)
)
@ -161,6 +164,7 @@ func Test_LogResultCacheDifferentRange(t *testing.T) {
cache.NewMockCache(),
nil,
nil,
nil,
)
)
@ -230,6 +234,7 @@ func Test_LogResultCacheDifferentRangeNonEmpty(t *testing.T) {
cache.NewMockCache(),
nil,
nil,
nil,
)
)
@ -308,6 +313,7 @@ func Test_LogResultCacheDifferentRangeNonEmptyAndEmpty(t *testing.T) {
cache.NewMockCache(),
nil,
nil,
nil,
)
)

@ -269,6 +269,7 @@ func NewLogFilterTripperware(
func(r queryrangebase.Request) bool {
return !r.GetCachingOptions().Disabled
},
cfg.Transformer,
metrics.LogResultCacheMetrics,
)
queryRangeMiddleware = append(

Loading…
Cancel
Save