Bugfix include cache generations on querier response (#7300)

We check the cache number (incremented by deletes) after executing a
query with the value returned by the queriers which executed it. If
they’re different, we know we can’t safely cache. However, we’re never
actually returning a number from the queriers. This means that as soon
as this cache number is incremented by a delete, it’ll disable the
results cache for that tenant completely.

I suspect this has been a latent bug since we forked cortex (goes back
to missing some code from this
[PR](https://github.com/cortexproject/cortex/pull/2279) into cortex from
@sandeepsukhani back in the day!). It just showed itself once we started
using deletes.
pull/7321/head
Owen Diehl 3 years ago committed by GitHub
parent 8886800bf6
commit 2bd9c91564
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      pkg/loki/loki.go
  2. 49
      pkg/loki/modules.go
  3. 11
      pkg/querier/queryrange/queryrangebase/results_cache.go
  4. 2
      pkg/storage/stores/indexshipper/compactor/generationnumber/gennumber_loader.go
  5. 21
      pkg/util/server/middleware.go

@ -33,6 +33,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
basetripper "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/querier/worker"
"github.com/grafana/loki/pkg/ruler"
@ -243,6 +244,7 @@ type Loki struct {
distributor *distributor.Distributor
Ingester ingester.Interface
Querier querier.Querier
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
@ -510,6 +512,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(UsageReport, t.initUsageReport)
mm.RegisterModule(CacheGenerationLoader, t.initCacheGenerationLoader)
mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
@ -526,9 +529,9 @@ func (t *Loki) setupModuleManager() error {
Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport},
Store: {Overrides, Embededcache, IndexGatewayRing},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport},
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport},
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport, CacheGenerationLoader},
QueryFrontendTripperware: {Server, Embededcache, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware, UsageReport},
QueryFrontend: {QueryFrontendTripperware, UsageReport, CacheGenerationLoader},
QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
TableManager: {Server, UsageReport},

@ -40,7 +40,6 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/querier"
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/ruler"
base_ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/runtime"
@ -81,6 +80,7 @@ const (
Distributor string = "distributor"
Ingester string = "ingester"
Querier string = "querier"
CacheGenerationLoader string = "cache-generation-loader"
IngesterQuerier string = "ingester-querier"
QueryFrontend string = "query-frontend"
QueryFrontendTripperware string = "query-frontend-tripperware"
@ -362,9 +362,16 @@ func (t *Loki) initQuerier() (services.Service, error) {
SchedulerRing: scheduler.SafeReadRing(t.queryScheduler),
}
httpMiddleware := middleware.Merge(
toMerge := []middleware.Interface{
httpreq.ExtractQueryMetricsMiddleware(),
)
}
if t.supportIndexDeleteRequest() {
toMerge = append(
toMerge,
serverutil.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader),
)
}
httpMiddleware := middleware.Merge(toMerge...)
logger := log.With(util_log.Logger, "component", "querier")
t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Querier, t.overrides, logger)
@ -664,20 +671,11 @@ func (disabledShuffleShardingLimits) MaxQueriersPerUser(userID string) int { ret
func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")
var genLoader queryrangebase.CacheGenNumberLoader
if t.supportIndexDeleteRequest() {
cacheGenClient, err := t.cacheGenClient()
if err != nil {
return nil, err
}
genLoader = generationnumber.NewGenNumberLoader(cacheGenClient, prometheus.DefaultRegisterer)
}
tripperware, stopper, err := queryrange.NewTripperware(
t.Cfg.QueryRange,
util_log.Logger,
t.overrides,
t.Cfg.SchemaConfig, genLoader,
t.Cfg.SchemaConfig, t.cacheGenerationLoader,
prometheus.DefaultRegisterer,
)
if err != nil {
@ -689,16 +687,25 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
return services.NewIdleService(nil, nil), nil
}
func (t *Loki) supportIndexDeleteRequest() bool {
return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs)
func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) {
var client generationnumber.CacheGenClient
if t.supportIndexDeleteRequest() {
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}
client, err = generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
if err != nil {
return nil, err
}
}
t.cacheGenerationLoader = generationnumber.NewGenNumberLoader(client, prometheus.DefaultRegisterer)
return services.NewIdleService(nil, nil), nil
}
func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}
return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
func (t *Loki) supportIndexDeleteRequest() bool {
return config.UsingObjectStorageIndex(t.Cfg.SchemaConfig.Configs)
}
func (t *Loki) compactorAddress() (string, error) {

@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/uber/jaeger-client-go"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/dskit/tenant"
@ -225,9 +226,13 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
// shouldCacheResponse says whether the response should be cached or not.
func (s resultsCache) shouldCacheResponse(ctx context.Context, req Request, r Response, maxCacheTime int64) bool {
headerValues := getHeaderValuesWithName(r, cacheControlHeader)
user, _ := user.ExtractOrgID(ctx)
logger := log.With(s.logger, "org_id", user)
for _, v := range headerValues {
if v == noStoreValue {
level.Debug(s.logger).Log("msg", fmt.Sprintf("%s header in response is equal to %s, not caching the response", cacheControlHeader, noStoreValue))
level.Debug(logger).Log("msg", fmt.Sprintf("%s header in response is equal to %s, not caching the response", cacheControlHeader, noStoreValue))
return false
}
}
@ -244,13 +249,13 @@ func (s resultsCache) shouldCacheResponse(ctx context.Context, req Request, r Re
genNumberFromCtx := cache.ExtractCacheGenNumber(ctx)
if len(genNumbersFromResp) == 0 && genNumberFromCtx != "" {
level.Debug(s.logger).Log("msg", fmt.Sprintf("we found results cache gen number %s set in store but none in headers", genNumberFromCtx))
level.Debug(logger).Log("msg", fmt.Sprintf("we found results cache gen number %s set in store but none in headers", genNumberFromCtx))
return false
}
for _, gen := range genNumbersFromResp {
if gen != genNumberFromCtx {
level.Debug(s.logger).Log("msg", fmt.Sprintf("inconsistency in results cache gen numbers %s (GEN-FROM-RESPONSE) != %s (GEN-FROM-STORE), not caching the response", gen, genNumberFromCtx))
level.Debug(logger).Log("msg", fmt.Sprintf("inconsistency in results cache gen numbers %s (GEN-FROM-RESPONSE) != %s (GEN-FROM-STORE), not caching the response", gen, genNumberFromCtx))
return false
}
}

@ -149,5 +149,5 @@ func (g *noopNumberGetter) GetCacheGenerationNumber(_ context.Context, _ string)
}
func (g *noopNumberGetter) Name() string {
return ""
return "noop-getter"
}

@ -3,8 +3,11 @@ package server
import (
"net/http"
"github.com/grafana/dskit/tenant"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)
// NewPrepopulateMiddleware creates a middleware which will parse incoming http forms.
@ -31,3 +34,21 @@ func ResponseJSONMiddleware() middleware.Interface {
})
})
}
// middleware for setting cache gen header to let consumer of response know all previous responses could be invalid due to delete operation
func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader queryrangebase.CacheGenNumberLoader) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs)
w.Header().Set(queryrangebase.ResultsCacheGenNumberHeaderName, cacheGenNumber)
next.ServeHTTP(w, r)
})
})
}

Loading…
Cancel
Save