Stats cache can be configured independently (#9535)

**What this PR does / why we need it**:

Before this PR, the index stats cache would use the same config as the
query results cache. This was a limitation since:

1. We would not be able to point to a different cache for storing the
index stats if needed.
2. We would not be able to add specific settings for this cache, without
adding it to the results cache.

In this PR, we refactor the index stats cache config to be independently
configurable. Note that if it's not configured, it will try to use the
results cache settings.

**Which issue(s) this PR fixes**:
This is needed for:
- https://github.com/grafana/loki/pull/9537
- https://github.com/grafana/loki/pull/9536

**Special notes for your reviewer**:

- This PR also refactors all the tripperwares in roundtrip.go to reuse
the same stats tripperware instead of each one creating its own.
- Configuring a new cache in roundtrip.go is a requirement for
https://github.com/grafana/loki/pull/9536 so the stats summary can
distinguish between the stats cache and the results cache.

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
pull/9536/head
Salva Corts 3 years ago committed by GitHub
parent 3c52034d63
commit 1694ad0f9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 22
      docs/sources/configuration/_index.md
  3. 1
      pkg/logqlmodel/stats/context.go
  4. 15
      pkg/loki/config_wrapper.go
  5. 43
      pkg/loki/config_wrapper_test.go
  6. 14
      pkg/querier/queryrange/index_stats_cache.go
  7. 1
      pkg/querier/queryrange/limits_test.go
  8. 14
      pkg/querier/queryrange/queryrangebase/results_cache.go
  9. 14
      pkg/querier/queryrange/queryrangebase/roundtrip.go
  10. 132
      pkg/querier/queryrange/roundtrip.go
  11. 180
      pkg/querier/queryrange/roundtrip_test.go
  12. 17
      pkg/storage/chunk/cache/cache.go

@ -32,6 +32,7 @@
* [9357](https://github.com/grafana/loki/pull/9357) **Indransh**: Add HTTP API to change the log level at runtime
* [9431](https://github.com/grafana/loki/pull/9431) **dannykopping**: Add more buckets to `loki_memcache_request_duration_seconds` metric; latencies can increase if using memcached with NVMe
* [8684](https://github.com/grafana/loki/pull/8684) **oleksii-boiko-ua**: Helm: Add hpa templates for read, write and backend components.
* [9535](https://github.com/grafana/loki/pull/9535) **salvacorts**: Index stats cache can be configured independently of the results cache. If it's not configured, but it's enabled, it will use the results cache configuration.
* [9604](https://github.com/grafana/loki/pull/9604) **dannykopping**: Querier: configurable writeback queue bytes size
##### Fixes

@ -772,10 +772,6 @@ results_cache:
# CLI flag: -querier.cache-results
[cache_results: <boolean> | default = false]
# Cache index stats query results.
# CLI flag: -querier.cache-index-stats-results
[cache_index_stats_results: <boolean> | default = false]
# Maximum number of retries for a single request; beyond this, the downstream
# error is returned.
# CLI flag: -querier.max-retries-per-request
@ -789,6 +785,23 @@ results_cache:
# List of headers forwarded by the query Frontend to downstream querier.
# CLI flag: -frontend.forward-headers-list
[forward_headers_list: <list of strings> | default = []]
# Cache index stats query results.
# CLI flag: -querier.cache-index-stats-results
[cache_index_stats_results: <boolean> | default = false]
# If a cache config is not specified and cache_index_stats_results is true, the
# config for the results cache is used.
index_stats_results_cache:
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is:
# frontend.index-stats-results-cache
[cache: <cache_config>]
# Use compression in results cache. Supported values are: 'snappy' and ''
# (disable compression).
# CLI flag: -frontend.index-stats-results-cache.compression
[compression: <string> | default = ""]
```
### ruler
@ -3728,6 +3741,7 @@ The TLS configuration.
The cache block configures the cache backend. The supported CLI flags `<prefix>` used to reference this configuration block are:
- `frontend`
- `frontend.index-stats-results-cache`
- `store.chunks-cache`
- `store.index-cache-read`
- `store.index-cache-write`

@ -58,6 +58,7 @@ const (
ChunkCache CacheType = "chunk" //nolint:staticcheck
IndexCache = "index"
ResultCache = "result"
StatsResultCache = "stats-result"
WriteDedupeCache = "write-dedupe"
)

@ -551,11 +551,10 @@ func betterTSDBShipperDefaults(cfg, defaults *ConfigWrapper, period config.Perio
}
}
// applyFIFOCacheConfig turns on FIFO cache for the chunk store and for the query range results,
// but only if no other cache storage is configured (redis or memcache).
//
// This behavior is only applied for the chunk store cache and for the query range results cache
// (i.e: not applicable for the index queries cache or for the write dedupe cache).
// applyFIFOCacheConfig turns on FIFO cache for the chunk store, for the query range results,
// and for the index stats results, but only if no other cache storage is configured (redis or memcache).
// This behavior is only applied for the chunk store cache, for the query range results cache, and for
// the index stats results (i.e: not applicable for the index queries cache or for the write dedupe cache).
func applyFIFOCacheConfig(r *ConfigWrapper) {
chunkCacheConfig := r.ChunkStoreConfig.ChunkCacheConfig
if !cache.IsCacheConfigured(chunkCacheConfig) {
@ -570,6 +569,12 @@ func applyFIFOCacheConfig(r *ConfigWrapper) {
r.QueryRange.ResultsCacheConfig.CacheConfig.Fifocache.MaxSizeBytes = "1GB"
r.QueryRange.ResultsCacheConfig.CacheConfig.Fifocache.TTL = 1 * time.Hour
}
indexStatsCacheConfig := r.QueryRange.StatsCacheConfig.CacheConfig
if !cache.IsCacheConfigured(indexStatsCacheConfig) {
// We use the same config as the query range results cache.
r.QueryRange.StatsCacheConfig.CacheConfig = r.QueryRange.ResultsCacheConfig.CacheConfig
}
}
func applyIngesterFinalSleep(cfg *ConfigWrapper) {

@ -959,8 +959,8 @@ query_range:
endpoint: endpoint.redis.org`
config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.EqualValues(t, config.QueryRange.CacheConfig.Redis.Endpoint, "endpoint.redis.org")
assert.False(t, config.QueryRange.CacheConfig.EnableFifoCache)
assert.EqualValues(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Redis.Endpoint, "endpoint.redis.org")
assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableFifoCache)
})
t.Run("no FIFO cache enabled by default if Memcache is set", func(t *testing.T) {
@ -972,13 +972,46 @@ query_range:
host: memcached.host.org`
config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.EqualValues(t, "memcached.host.org", config.QueryRange.CacheConfig.MemcacheClient.Host)
assert.False(t, config.QueryRange.CacheConfig.EnableFifoCache)
assert.EqualValues(t, "memcached.host.org", config.QueryRange.ResultsCacheConfig.CacheConfig.MemcacheClient.Host)
assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableFifoCache)
})
t.Run("FIFO cache is enabled by default if no other cache is set", func(t *testing.T) {
config, _, _ := configWrapperFromYAML(t, minimalConfig, nil)
assert.True(t, config.QueryRange.CacheConfig.EnableFifoCache)
assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableFifoCache)
})
})
t.Run("for the index stats results cache config", func(t *testing.T) {
t.Run("no FIFO cache enabled by default if Redis is set", func(t *testing.T) {
configFileString := `---
query_range:
index_stats_results_cache:
cache:
redis:
endpoint: endpoint.redis.org`
config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.EqualValues(t, config.QueryRange.StatsCacheConfig.CacheConfig.Redis.Endpoint, "endpoint.redis.org")
assert.False(t, config.QueryRange.StatsCacheConfig.CacheConfig.EnableFifoCache)
})
t.Run("no FIFO cache enabled by default if Memcache is set", func(t *testing.T) {
configFileString := `---
query_range:
index_stats_results_cache:
cache:
memcached_client:
host: memcached.host.org`
config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.EqualValues(t, "memcached.host.org", config.QueryRange.StatsCacheConfig.CacheConfig.MemcacheClient.Host)
assert.False(t, config.QueryRange.StatsCacheConfig.CacheConfig.EnableFifoCache)
})
t.Run("FIFO cache is enabled by default if no other cache is set", func(t *testing.T) {
config, _, _ := configWrapperFromYAML(t, minimalConfig, nil)
assert.True(t, config.QueryRange.StatsCacheConfig.CacheConfig.EnableFifoCache)
})
})
}

@ -2,6 +2,7 @@ package queryrange
import (
"context"
"flag"
"fmt"
"github.com/go-kit/log"
@ -47,6 +48,19 @@ func (p IndexStatsExtractor) ResponseWithoutHeaders(resp queryrangebase.Response
}
}
// IndexStatsCacheConfig holds the configuration for the index stats results
// cache. It embeds the generic results-cache settings so the same cache
// backends and compression options are available, but registers them under a
// dedicated CLI flag prefix.
type IndexStatsCacheConfig struct {
	queryrangebase.ResultsCacheConfig `yaml:",inline"`
}

// RegisterFlags registers the embedded results-cache flags under the
// "frontend.index-stats-results-cache." prefix so the stats cache can be
// configured independently of the query results cache.
func (cfg *IndexStatsCacheConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.ResultsCacheConfig.RegisterFlagsWithPrefix(f, "frontend.index-stats-results-cache.")
}

// Validate delegates to the embedded results-cache config validation.
func (cfg *IndexStatsCacheConfig) Validate() error {
	return cfg.ResultsCacheConfig.Validate()
}
func NewIndexStatsCacheMiddleware(
log log.Logger,
limits Limits,

@ -53,6 +53,7 @@ func TestLimits(t *testing.T) {
func Test_seriesLimiter(t *testing.T) {
cfg := testConfig
cfg.CacheResults = false
cfg.CacheIndexStatsResults = false
// split in 7 with 2 in // max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewTripperware(cfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{

@ -78,13 +78,17 @@ type ResultsCacheConfig struct {
Compression string `yaml:"compression"`
}
// RegisterFlags registers flags.
func (cfg *ResultsCacheConfig) RegisterFlags(f *flag.FlagSet) {
cfg.CacheConfig.RegisterFlagsWithPrefix("frontend.", "", f)
func (cfg *ResultsCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
cfg.CacheConfig.RegisterFlagsWithPrefix(prefix, "", f)
f.StringVar(&cfg.Compression, "frontend.compression", "", "Use compression in results cache. Supported values are: 'snappy' and '' (disable compression).")
f.StringVar(&cfg.Compression, prefix+"compression", "", "Use compression in results cache. Supported values are: 'snappy' and '' (disable compression).")
//lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
flagext.DeprecatedFlag(f, "frontend.cache-split-interval", "Deprecated: The maximum interval expected for each request, results will be cached per single interval. This behavior is now determined by querier.split-queries-by-interval.", util_log.Logger)
flagext.DeprecatedFlag(f, prefix+"cache-split-interval", "Deprecated: The maximum interval expected for each request, results will be cached per single interval. This behavior is now determined by querier.split-queries-by-interval.", util_log.Logger)
}
// RegisterFlags registers the results-cache flags under the default
// "frontend." prefix (e.g. -frontend.compression), preserving the historical
// flag names for the query results cache.
func (cfg *ResultsCacheConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.RegisterFlagsWithPrefix(f, "frontend.")
}
func (cfg *ResultsCacheConfig) Validate() error {

@ -41,12 +41,11 @@ type Config struct {
// Deprecated: SplitQueriesByInterval will be removed in the next major release
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval" doc:"deprecated|description=Use -querier.split-queries-by-interval instead. CLI flag: -querier.split-queries-by-day. Split queries by day and execute in parallel."`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
CacheIndexStatsResults bool `yaml:"cache_index_stats_results"`
MaxRetries int `yaml:"max_retries"`
ShardedQueries bool `yaml:"parallelise_shardable_queries"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
ShardedQueries bool `yaml:"parallelise_shardable_queries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`
}
@ -56,7 +55,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.CacheIndexStatsResults, "querier.cache-index-stats-results", false, "Cache index stats query results.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
@ -69,7 +67,7 @@ func (cfg *Config) Validate() error {
}
if cfg.CacheResults {
if err := cfg.ResultsCacheConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid ResultsCache config")
return errors.Wrap(err, "invalid results_cache config")
}
}
return nil

@ -9,6 +9,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -26,13 +27,31 @@ import (
// Config is the configuration for the queryrange tripperware
type Config struct {
queryrangebase.Config `yaml:",inline"`
Transformer UserIDTransformer `yaml:"-"`
queryrangebase.Config `yaml:",inline"`
Transformer UserIDTransformer `yaml:"-"`
CacheIndexStatsResults bool `yaml:"cache_index_stats_results"`
StatsCacheConfig IndexStatsCacheConfig `yaml:"index_stats_results_cache" doc:"description=If a cache config is not specified and cache_index_stats_results is true, the config for the results cache is used."`
}
// RegisterFlags adds the flags required to configure this flag set: the
// embedded queryrangebase flags, the toggle for caching index stats results,
// and the dedicated index stats results cache flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.Config.RegisterFlags(f)
	f.BoolVar(&cfg.CacheIndexStatsResults, "querier.cache-index-stats-results", false, "Cache index stats query results.")
	cfg.StatsCacheConfig.RegisterFlags(f)
}
// Validate validates the config. The stats cache config is only validated
// when index stats caching is enabled; it may legitimately be left empty
// otherwise, in which case the results cache settings are used as a fallback
// at tripperware construction time.
func (cfg *Config) Validate() error {
	if err := cfg.Config.Validate(); err != nil {
		return err
	}
	if cfg.CacheIndexStatsResults {
		if err := cfg.StatsCacheConfig.Validate(); err != nil {
			return errors.Wrap(err, "invalid index_stats_results_cache config")
		}
	}
	return nil
}
// Stopper gracefully shutdown resources created
@ -40,6 +59,34 @@ type Stopper interface {
Stop()
}
// StopperWrapper aggregates several Stoppers so they can be shut down as one.
type StopperWrapper []Stopper

// Stop gracefully shuts down every wrapped resource, skipping nil entries
// (a slot may be nil when the corresponding cache was never created).
func (s StopperWrapper) Stop() {
	for i := range s {
		if s[i] == nil {
			continue
		}
		s[i].Stop()
	}
}
// newResultsCacheFromConfig builds a cache for the given results-cache config
// and stats cache type. It errors out when no backend is configured, and
// wraps the backend with snappy compression when requested.
func newResultsCacheFromConfig(cfg queryrangebase.ResultsCacheConfig, registerer prometheus.Registerer, log log.Logger, cacheType stats.CacheType) (cache.Cache, error) {
	if !cache.IsCacheConfigured(cfg.CacheConfig) {
		return nil, errors.Errorf("%s cache is not configured", cacheType)
	}

	backend, err := cache.New(cfg.CacheConfig, registerer, log, cacheType)
	if err != nil {
		return nil, err
	}

	if cfg.Compression == "snappy" {
		backend = cache.NewSnappy(backend, log)
	}

	return backend, nil
}
// NewTripperware returns a Tripperware configured with middlewares to align, split and cache requests.
func NewTripperware(
cfg Config,
@ -54,58 +101,67 @@ func NewTripperware(
metrics := NewMetrics(registerer)
var (
c cache.Cache
err error
resultsCache cache.Cache
statsCache cache.Cache
err error
)
if cfg.CacheResults {
c, err = cache.New(cfg.CacheConfig, registerer, log, stats.ResultCache)
resultsCache, err = newResultsCacheFromConfig(cfg.ResultsCacheConfig, registerer, log, stats.ResultCache)
if err != nil {
return nil, nil, err
}
if cfg.Compression == "snappy" {
c = cache.NewSnappy(c, log)
}
if cfg.CacheIndexStatsResults {
// If the stats cache is not configured, use the results cache config.
cacheCfg := cfg.StatsCacheConfig.ResultsCacheConfig
if !cache.IsCacheConfigured(cacheCfg.CacheConfig) {
level.Debug(log).Log("msg", "using results cache config for stats cache")
cacheCfg = cfg.ResultsCacheConfig
}
statsCache, err = newResultsCacheFromConfig(cacheCfg, registerer, log, stats.StatsResultCache)
if err != nil {
return nil, nil, err
}
}
metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, c,
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics)
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, LokiCodec, statsCache,
cacheGenNumLoader, retentionEnabled, metrics)
if err != nil {
return nil, nil, err
}
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, c,
cacheGenNumLoader, retentionEnabled, metrics)
metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, resultsCache,
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware)
if err != nil {
return nil, nil, err
}
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, c,
cacheGenNumLoader, retentionEnabled, metrics)
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, metrics, indexStatsTripperware)
if err != nil {
return nil, nil, err
}
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, LokiCodec, metrics, schema)
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, resultsCache, metrics, indexStatsTripperware)
if err != nil {
return nil, nil, err
}
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, metrics, schema)
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, LokiCodec, metrics, schema)
if err != nil {
return nil, nil, err
}
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, c,
cacheGenNumLoader, retentionEnabled, metrics)
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, metrics, schema)
if err != nil {
return nil, nil, err
}
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, LokiCodec, c,
cacheGenNumLoader, retentionEnabled, metrics)
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, metrics, indexStatsTripperware)
if err != nil {
return nil, nil, err
}
@ -119,7 +175,7 @@ func NewTripperware(
instantRT := instantMetricTripperware(next)
statsRT := indexStatsTripperware(next)
return newRoundTripper(log, next, limitedRT, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, statsRT, limits)
}, c, nil
}, StopperWrapper{resultsCache, statsCache}, nil
}
type roundTripper struct {
@ -308,15 +364,9 @@ func NewLogFilterTripperware(
schema config.SchemaConfig,
codec queryrangebase.Codec,
c cache.Cache,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
retentionEnabled bool,
metrics *Metrics,
indexStatsTripperware queryrangebase.Tripperware,
) (queryrangebase.Tripperware, error) {
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, c, cacheGenNumLoader, retentionEnabled, metrics)
if err != nil {
return nil, err
}
return func(next http.RoundTripper) http.RoundTripper {
statsHandler := queryrangebase.NewRoundTripperHandler(indexStatsTripperware(next), codec)
@ -390,16 +440,9 @@ func NewLimitedTripperware(
limits Limits,
schema config.SchemaConfig,
codec queryrangebase.Codec,
c cache.Cache,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
retentionEnabled bool,
metrics *Metrics,
indexStatsTripperware queryrangebase.Tripperware,
) (queryrangebase.Tripperware, error) {
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, c, cacheGenNumLoader, retentionEnabled, metrics)
if err != nil {
return nil, err
}
return func(next http.RoundTripper) http.RoundTripper {
statsHandler := queryrangebase.NewRoundTripperHandler(indexStatsTripperware(next), codec)
@ -518,12 +561,8 @@ func NewMetricTripperware(
retentionEnabled bool,
extractor queryrangebase.Extractor,
metrics *Metrics,
indexStatsTripperware queryrangebase.Tripperware,
) (queryrangebase.Tripperware, error) {
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, c, cacheGenNumLoader, retentionEnabled, metrics)
if err != nil {
return nil, err
}
cacheKey := cacheKeyLimits{limits, cfg.Transformer}
var queryCacheMiddleware queryrangebase.Middleware
if cfg.CacheResults {
@ -640,16 +679,9 @@ func NewInstantMetricTripperware(
limits Limits,
schema config.SchemaConfig,
codec queryrangebase.Codec,
c cache.Cache,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
retentionEnabled bool,
metrics *Metrics,
indexStatsTripperware queryrangebase.Tripperware,
) (queryrangebase.Tripperware, error) {
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, c, cacheGenNumLoader, retentionEnabled, metrics)
if err != nil {
return nil, err
}
return func(next http.RoundTripper) http.RoundTripper {
statsHandler := queryrangebase.NewRoundTripperHandler(indexStatsTripperware(next), codec)

@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
@ -37,21 +38,35 @@ import (
var (
testTime = time.Date(2019, 12, 2, 11, 10, 10, 10, time.UTC)
testConfig = Config{queryrangebase.Config{
AlignQueriesWithStep: true,
MaxRetries: 3,
CacheResults: true,
testConfig = Config{
Config: queryrangebase.Config{
AlignQueriesWithStep: true,
MaxRetries: 3,
CacheResults: true,
ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
CacheConfig: cache.Config{
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
MaxSizeItems: 1024,
TTL: 24 * time.Hour,
},
},
},
},
Transformer: nil,
CacheIndexStatsResults: true,
ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
CacheConfig: cache.Config{
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
MaxSizeItems: 1024,
TTL: 24 * time.Hour,
StatsCacheConfig: IndexStatsCacheConfig{
ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
CacheConfig: cache.Config{
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
MaxSizeItems: 1024,
TTL: 24 * time.Hour,
},
},
},
},
}, nil}
}
testEngineOpts = logql.EngineOpts{
Timeout: 30 * time.Second,
MaxLookBackPeriod: 30 * time.Second,
@ -524,6 +539,149 @@ func TestIndexStatsTripperware(t *testing.T) {
require.Equal(t, response.Entries*2, res.Response.Entries)
}
// TestNewTripperware_Caches exercises NewTripperware across combinations of
// the results cache and index stats cache settings, asserting how many cache
// instances are created, whether the stats cache falls back to the results
// cache config, and that enabling a cache without a backend config errors.
func TestNewTripperware_Caches(t *testing.T) {
	for _, tc := range []struct {
		name        string
		config      Config
		numCaches   int  // expected number of caches returned via the StopperWrapper
		equalCaches bool // whether both caches should be built from the same config
		err         string
	}{
		{
			// Neither cache is enabled: no cache instances should be created.
			name: "results cache disabled, stats cache disabled",
			config: Config{
				Config: queryrangebase.Config{
					CacheResults: false,
				},
				CacheIndexStatsResults: false,
			},
			numCaches: 0,
			err:       "",
		},
		{
			name: "results cache enabled, stats cache disabled",
			config: Config{
				Config: queryrangebase.Config{
					CacheResults: true,
					ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
						CacheConfig: cache.Config{
							EmbeddedCache: cache.EmbeddedCacheConfig{
								Enabled: true,
							},
						},
					},
				},
				CacheIndexStatsResults: false,
			},
			numCaches: 1,
			err:       "",
		},
		{
			// The stats cache is enabled without its own config, so it must
			// fall back to the results cache settings: two equal caches.
			name: "results cache enabled, stats cache enabled",
			config: Config{
				Config: queryrangebase.Config{
					CacheResults: true,
					ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
						CacheConfig: cache.Config{
							EmbeddedCache: cache.EmbeddedCacheConfig{
								Enabled: true,
							},
						},
					},
				},
				CacheIndexStatsResults: true,
			},
			numCaches:   2,
			equalCaches: true,
			err:         "",
		},
		{
			// The stats cache has an explicit config that differs from the
			// results cache, so the two caches must not be equal.
			name: "results cache enabled, stats cache enabled but different",
			config: Config{
				Config: queryrangebase.Config{
					CacheResults: true,
					ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
						CacheConfig: cache.Config{
							EmbeddedCache: cache.EmbeddedCacheConfig{
								Enabled:   true,
								MaxSizeMB: 2000,
							},
						},
					},
				},
				CacheIndexStatsResults: true,
				StatsCacheConfig: IndexStatsCacheConfig{
					ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
						CacheConfig: cache.Config{
							EmbeddedCache: cache.EmbeddedCacheConfig{
								Enabled:   true,
								MaxSizeMB: 1000,
							},
						},
					},
				},
			},
			numCaches:   2,
			equalCaches: false,
			err:         "",
		},
		{
			// Enabling the results cache without a backend config must fail.
			name: "results cache enabled (no config provided)",
			config: Config{
				Config: queryrangebase.Config{
					CacheResults: true,
				},
			},
			err: fmt.Sprintf("%s cache is not configured", stats.ResultCache),
		},
		{
			// With the results cache disabled there is no fallback config, so
			// enabling the stats cache without its own config must fail.
			name: "results cache disabled, stats cache enabled (no config provided)",
			config: Config{
				Config: queryrangebase.Config{
					CacheResults: false,
				},
				CacheIndexStatsResults: true,
			},
			numCaches: 0,
			err:       fmt.Sprintf("%s cache is not configured", stats.StatsResultCache),
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			_, stopper, err := NewTripperware(tc.config, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
			if stopper != nil {
				defer stopper.Stop()
			}

			if tc.err != "" {
				require.ErrorContains(t, err, tc.err)
				return
			}
			require.NoError(t, err)

			// Collect the non-nil caches from the returned StopperWrapper.
			require.IsType(t, StopperWrapper{}, stopper)
			var caches []cache.Cache
			for _, s := range stopper.(StopperWrapper) {
				if s != nil {
					c, ok := s.(cache.Cache)
					require.True(t, ok)
					caches = append(caches, c)
				}
			}
			require.Equal(t, tc.numCaches, len(caches))

			if tc.numCaches == 2 {
				if tc.equalCaches {
					require.Equal(t, caches[0], caches[1])
				} else {
					require.NotEqual(t, caches[0], caches[1])
				}
			}
		})
	}
}
func TestLogNoFilter(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {

@ -88,9 +88,22 @@ func IsEmbeddedCacheSet(cfg Config) bool {
return cfg.EmbeddedCache.Enabled
}
// IsFifoCacheSet returns true when the in-process FIFO cache is enabled in
// the given config.
func IsFifoCacheSet(cfg Config) bool {
	return cfg.EnableFifoCache
}
// IsSpecificImplementationSet returns true when a concrete cache
// implementation is already set on the config, rather than being built from
// the backend-specific settings.
func IsSpecificImplementationSet(cfg Config) bool {
	return cfg.Cache != nil
}
// IsCacheConfigured determines if either of the following caches is configured:
// - memcached
// - redis
// - embedded-cache
// - fifo-cache
// - specific cache implementation
func IsCacheConfigured(cfg Config) bool {
	// Defect fixed: the block previously contained two consecutive return
	// statements (a stale copy of the narrower three-backend check made the
	// complete check below unreachable). Only the complete check remains.
	return IsMemcacheSet(cfg) || IsRedisSet(cfg) || IsEmbeddedCacheSet(cfg) || IsFifoCacheSet(cfg) || IsSpecificImplementationSet(cfg)
}
// New creates a new Cache using Config.

Loading…
Cancel
Save