From 2bd9c91564d3a71b721224178bbbf41a6afa3d8e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 3 Oct 2022 10:32:27 -0500 Subject: [PATCH] Bugfix include cache generations on querier response (#7300) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We check the cache number (incremented by deletes) after executing a query with the value returned by the queriers which executed it. If they’re different, we know we can’t safely cache. However, we’re never actually returning a number from the queriers. This means that as soon as this cache number is incremented by a delete, it’ll disable the results cache for that tenant completely. I suspect this has been a latent bug since we forked cortex (goes back to missing some code from this [PR](https://github.com/cortexproject/cortex/pull/2279) into cortex from @sandeepsukhani back in the day!). It just showed itself once we started using deletes. --- pkg/loki/loki.go | 7 ++- pkg/loki/modules.go | 49 +++++++++++-------- .../queryrangebase/results_cache.go | 11 +++-- .../generationnumber/gennumber_loader.go | 2 +- pkg/util/server/middleware.go | 21 ++++++++ 5 files changed, 63 insertions(+), 27 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 3ca73cff17..e58bd034cd 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -33,6 +33,7 @@ import ( "github.com/grafana/loki/pkg/lokifrontend" "github.com/grafana/loki/pkg/querier" "github.com/grafana/loki/pkg/querier/queryrange" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" basetripper "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/querier/worker" "github.com/grafana/loki/pkg/ruler" @@ -243,6 +244,7 @@ type Loki struct { distributor *distributor.Distributor Ingester ingester.Interface Querier querier.Querier + cacheGenerationLoader queryrangebase.CacheGenNumberLoader querierAPI *querier.QuerierAPI ingesterQuerier *querier.IngesterQuerier Store storage.Store @@ -510,6 +512,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(QueryScheduler, t.initQueryScheduler) mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule) mm.RegisterModule(UsageReport, t.initUsageReport) + mm.RegisterModule(CacheGenerationLoader, t.initCacheGenerationLoader) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -526,9 +529,9 @@ func (t *Loki) setupModuleManager() error { Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport}, Store: {Overrides, Embededcache, IndexGatewayRing}, Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport}, - Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport}, + Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport, CacheGenerationLoader}, QueryFrontendTripperware: {Server, Embededcache, Overrides, TenantConfigs}, - QueryFrontend: {QueryFrontendTripperware, UsageReport}, + QueryFrontend: {QueryFrontendTripperware, UsageReport, CacheGenerationLoader}, QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport}, Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport}, TableManager: {Server, UsageReport}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 6b2be4dd92..60888b0791 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -40,7 +40,6 @@ import ( "github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb" "github.com/grafana/loki/pkg/querier" "github.com/grafana/loki/pkg/querier/queryrange" - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/ruler" base_ruler "github.com/grafana/loki/pkg/ruler/base" "github.com/grafana/loki/pkg/runtime" @@ -81,6 +80,7 @@ const ( Distributor string = "distributor" Ingester string = "ingester" Querier string = "querier" + CacheGenerationLoader string = "cache-generation-loader" IngesterQuerier string = "ingester-querier" QueryFrontend string = "query-frontend" QueryFrontendTripperware string = "query-frontend-tripperware" @@ -362,9 +362,16 @@ func (t *Loki) initQuerier() (services.Service, error) { SchedulerRing: scheduler.SafeReadRing(t.queryScheduler), } - httpMiddleware := middleware.Merge( + toMerge := []middleware.Interface{ httpreq.ExtractQueryMetricsMiddleware(), - ) + } + if t.supportIndexDeleteRequest() { + toMerge = append( + toMerge, + serverutil.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader), + ) + } + httpMiddleware := middleware.Merge(toMerge...) logger := log.With(util_log.Logger, "component", "querier") t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Querier, t.overrides, logger) @@ -664,20 +671,11 @@ func (disabledShuffleShardingLimits) MaxQueriersPerUser(userID string) int { ret func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware") - var genLoader queryrangebase.CacheGenNumberLoader - if t.supportIndexDeleteRequest() { - cacheGenClient, err := t.cacheGenClient() - if err != nil { - return nil, err - } - genLoader = generationnumber.NewGenNumberLoader(cacheGenClient, prometheus.DefaultRegisterer) - } - tripperware, stopper, err := queryrange.NewTripperware( t.Cfg.QueryRange, util_log.Logger, t.overrides, - t.Cfg.SchemaConfig, genLoader, + t.Cfg.SchemaConfig, t.cacheGenerationLoader, prometheus.DefaultRegisterer, ) if err != nil { @@ -689,16 +687,25 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { return services.NewIdleService(nil, nil), nil } -func (t *Loki) supportIndexDeleteRequest() bool { - return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) +func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) { + var client generationnumber.CacheGenClient + if t.supportIndexDeleteRequest() { + compactorAddress, err := t.compactorAddress() + if err != nil { + return nil, err + } + client, err = generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) + if err != nil { + return nil, err + } + } + + t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer) + return services.NewIdleService(nil, nil), nil } -func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) { - compactorAddress, err := t.compactorAddress() - if err != nil { - return nil, err - } - return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) +func (t *Loki) supportIndexDeleteRequest() bool { + return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs) } func (t *Loki) compactorAddress() (string, error) { diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index 280deab6d5..60f5256cac 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/uber/jaeger-client-go" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/grafana/dskit/tenant" @@ -225,9 +226,13 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { // shouldCacheResponse says whether the response should be cached or not. func (s resultsCache) shouldCacheResponse(ctx context.Context, req Request, r Response, maxCacheTime int64) bool { headerValues := getHeaderValuesWithName(r, cacheControlHeader) + + user, _ := user.ExtractOrgID(ctx) + logger := log.With(s.logger, "org_id", user) + for _, v := range headerValues { if v == noStoreValue { - level.Debug(s.logger).Log("msg", fmt.Sprintf("%s header in response is equal to %s, not caching the response", cacheControlHeader, noStoreValue)) + level.Debug(logger).Log("msg", fmt.Sprintf("%s header in response is equal to %s, not caching the response", cacheControlHeader, noStoreValue)) return false } } @@ -244,13 +249,13 @@ func (s resultsCache) shouldCacheResponse(ctx context.Context, req Request, r Re genNumberFromCtx := cache.ExtractCacheGenNumber(ctx) if len(genNumbersFromResp) == 0 && genNumberFromCtx != "" { - level.Debug(s.logger).Log("msg", fmt.Sprintf("we found results cache gen number %s set in store but none in headers", genNumberFromCtx)) + level.Debug(logger).Log("msg", fmt.Sprintf("we found results cache gen number %s set in store but none in headers", genNumberFromCtx)) return false } for _, gen := range genNumbersFromResp { if gen != genNumberFromCtx { - level.Debug(s.logger).Log("msg", fmt.Sprintf("inconsistency in results cache gen numbers %s (GEN-FROM-RESPONSE) != %s (GEN-FROM-STORE), not caching the response", gen, genNumberFromCtx)) + level.Debug(logger).Log("msg", fmt.Sprintf("inconsistency in results cache gen numbers %s (GEN-FROM-RESPONSE) != %s (GEN-FROM-STORE), not caching the response", gen, genNumberFromCtx)) return false } } diff --git a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go index 97a92fd6dc..55267987cf 100644 --- a/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go +++ b/pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go @@ -149,5 +149,5 @@ func (g *noopNumberGetter) GetCacheGenerationNumber(_ context.Context, _ string) } func (g *noopNumberGetter) Name() string { - return "" + return "noop-getter" } diff --git a/pkg/util/server/middleware.go b/pkg/util/server/middleware.go index 6131e11aaa..def74997aa 100644 --- a/pkg/util/server/middleware.go +++ b/pkg/util/server/middleware.go @@ -3,8 +3,11 @@ package server import ( "net/http" + "github.com/grafana/dskit/tenant" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" + + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" ) // NewPrepopulateMiddleware creates a middleware which will parse incoming http forms. @@ -31,3 +34,21 @@ func ResponseJSONMiddleware() middleware.Interface { }) }) } + +// middleware for setting cache gen header to let consumer of response know all previous responses could be invalid due to delete operation +func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader queryrangebase.CacheGenNumberLoader) middleware.Interface { + return middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + userIDs, err := tenant.TenantIDs(r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs) + + w.Header().Set(queryrangebase.ResultsCacheGenNumberHeaderName, cacheGenNumber) + next.ServeHTTP(w, r) + }) + }) +}