Auto-expire old items from FIFO cache (#5148)

* Remove validity from FIFO cache

We can remove the validity setting of 1h that was added in 2.4 for the
chunks cache, because it does not reduce memory usage and instead leads
to valid chunks being ignored when querying the cache. Chunks are
immutable, so it does not make sense to withhold them if they are
present in the cache.

Closes #4922

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add periodic task to prune old FIFO cache elements

As described in #4921, this PR adds a periodic task that prunes expired
items from the FIFO cache to free up memory.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
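
For readers skimming the diff below: the new mechanism is a ticker-driven goroutine that, under the cache lock, removes entries whose age exceeds the TTL. The following is a minimal, self-contained sketch of that pattern, not the actual `FifoCache` code; the `miniCache` type, its fields, and the chosen intervals are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// miniCache is an illustrative stand-in for the FIFO cache: it only tracks
// when each entry was written so the prune loop can expire it by age.
type miniCache struct {
	mu      sync.Mutex
	updated map[string]time.Time
	done    chan struct{}
}

func newMiniCache(purgeInterval, ttl time.Duration) *miniCache {
	c := &miniCache{updated: map[string]time.Time{}, done: make(chan struct{})}
	// Background prune job: wake up every purgeInterval and drop expired entries.
	go func() {
		ticker := time.NewTicker(purgeInterval)
		defer ticker.Stop()
		for {
			select {
			case <-c.done:
				return
			case <-ticker.C:
				c.prune(ttl)
			}
		}
	}()
	return c
}

// prune removes every entry older than the TTL, mirroring the idea behind
// pruneExpiredItems in the diff below.
func (c *miniCache) prune(ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k, t := range c.updated {
		if time.Since(t) > ttl {
			delete(c.updated, k)
		}
	}
}

func (c *miniCache) put(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.updated[key] = time.Now()
}

func (c *miniCache) has(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.updated[key]
	return ok
}

func main() {
	c := newMiniCache(10*time.Millisecond, 50*time.Millisecond)
	defer close(c.done)

	c.put("chunk-1")
	fmt.Println(c.has("chunk-1")) // true: still within the TTL
	time.Sleep(120 * time.Millisecond)
	fmt.Println(c.has("chunk-1")) // false: the prune goroutine removed it
}
```

Lookups themselves never check the TTL anymore; only the background job evicts entries, which is what lets immutable chunks stay servable for as long as they sit in the cache.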

* Apply suggestions from code review

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

Christian Haudum committed via GitHub
parent 7e372cbc56
commit 57b384df8f
11 changed files (changed lines in parentheses):

  1. CHANGELOG.md (1)
  2. docs/sources/configuration/_index.md (7)
  3. pkg/loki/config_wrapper.go (2)
  4. pkg/loki/modules.go (9)
  5. pkg/querier/queryrange/roundtrip_test.go (2)
  6. pkg/storage/chunk/cache/cache.go (4)
  7. pkg/storage/chunk/cache/cache_test.go (2)
  8. pkg/storage/chunk/cache/fifo_cache.go (82)
  9. pkg/storage/chunk/cache/fifo_cache_test.go (93)
  10. pkg/storage/chunk/storage/caching_fixtures.go (2)
  11. pkg/storage/chunk/storage/caching_index_client_test.go (12)

@@ -5,6 +5,7 @@
* [5262](https://github.com/grafana/loki/pull/5262) **MichelHollands**: Remove the labelFilter field
* [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail.
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped.
* [5148](https://github.com/grafana/loki/pull/5148) **chaudum** Add periodic task to prune old expired items from the FIFO cache to free up memory.
* [5187](https://github.com/grafana/loki/pull/5187) **aknuds1** Rename metric `cortex_experimental_features_in_use_total` to `loki_experimental_features_in_use_total` and metric `log_messages_total` to `loki_log_messages_total`.
* [5170](https://github.com/grafana/loki/pull/5170) **chaudum** Fix deadlock in Promtail caused when targets got removed from a target group by the discovery manager.
* [5163](https://github.com/grafana/loki/pull/5163) **chaudum** Fix regression in fluentd plugin introduced with #5107 that caused `NoMethodError` when parsing non-string values of log lines.

@@ -1900,10 +1900,15 @@ fifocache:
# CLI flag: -<prefix>.fifocache.max-size-items
[max_size_items: <int> | default = 0]
# The expiry duration for the cache.
# Deprecated: The expiry duration for the cache. Use `-<prefix>.fifocache.ttl`.
# The default value of 0 disables expiration.
# CLI flag: -<prefix>.fifocache.duration
[validity: <duration>]
# The time for items to live in the cache before those items are purged.
# The value of 0 disables auto-expiration.
# CLI flag: -<prefix>.fifocache.ttl
[ttl: <duration> | default = 1h]
```
## schema_config
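
As an illustration of the new option (this snippet is not part of the diff; the surrounding `chunk_store_config`/`chunk_cache_config` keys are the usual Loki cache wiring, assumed here for context), a chunk cache that auto-expires items after one hour could be configured roughly like this:

```yaml
# Hypothetical example: enable the embedded FIFO cache for chunks and let the
# periodic prune task expire entries after 1h (ttl: 0 disables auto-expiration).
chunk_store_config:
  chunk_cache_config:
    enable_fifocache: true
    fifocache:
      max_size_bytes: 1GB
      ttl: 1h
```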

@@ -483,7 +483,7 @@ func applyFIFOCacheConfig(r *ConfigWrapper) {
// The query results fifocache is still in Cortex so we couldn't change the flag defaults
// so instead we will override them here.
r.QueryRange.ResultsCacheConfig.CacheConfig.Fifocache.MaxSizeBytes = "1GB"
r.QueryRange.ResultsCacheConfig.CacheConfig.Fifocache.Validity = 1 * time.Hour
r.QueryRange.ResultsCacheConfig.CacheConfig.Fifocache.TTL = 1 * time.Hour
}
}

@@ -42,7 +42,6 @@ import (
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/storage"
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/storage/stores/shipper"
@@ -323,12 +322,12 @@ func (t *Loki) initTableManager() (services.Service, error) {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)
tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.StorageConfig.Config, reg)
tableClient, err := chunk_storage.NewTableClient(lastConfig.IndexType, t.Cfg.StorageConfig.Config, reg)
if err != nil {
return nil, err
}
bucketClient, err := storage.NewBucketClient(t.Cfg.StorageConfig.Config)
bucketClient, err := chunk_storage.NewBucketClient(t.Cfg.StorageConfig.Config)
util_log.CheckFatal("initializing bucket client", err, util_log.Logger)
t.tableManager, err = chunk.NewTableManager(t.Cfg.TableManager, t.Cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer)
@@ -363,7 +362,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// however it has to be deserialized to do so, setting the cache validity to some arbitrary amount less than the
// IndexCacheValidity guarantees the FIFO cache will expire the object first which can be done without
// having to deserialize the object.
Validity: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
TTL: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
},
}
// Force the retain period to be longer than the IndexCacheValidity used in the store, this guarantees we don't
@@ -718,7 +717,7 @@ func (t *Loki) initCompactor() (services.Service, error) {
func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config, t.clientMetrics)
objectClient, err := chunk_storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config, t.clientMetrics)
if err != nil {
return nil, err
}

@@ -41,7 +41,7 @@ var (
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
MaxSizeItems: 1024,
Validity: 24 * time.Hour,
TTL: 24 * time.Hour,
},
},
},

@@ -91,8 +91,8 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger) (Cache, error
caches := []Cache{}
if cfg.EnableFifoCache {
if cfg.Fifocache.Validity == 0 && cfg.DefaultValidity != 0 {
cfg.Fifocache.Validity = cfg.DefaultValidity
if cfg.Fifocache.TTL == 0 && cfg.DefaultValidity != 0 {
cfg.Fifocache.TTL = cfg.DefaultValidity
}
if cache := NewFifoCache(cfg.Prefix+"fifocache", cfg.Fifocache, reg, logger); cache != nil {

@@ -199,7 +199,7 @@ func TestMemcache(t *testing.T) {
}
func TestFifoCache(t *testing.T) {
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 1e3, Validity: 1 * time.Hour},
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 1e3, TTL: 1 * time.Hour},
nil, log.NewNopLogger())
testCache(t, cache)
}

@@ -33,18 +33,22 @@ const (
type FifoCacheConfig struct {
MaxSizeBytes string `yaml:"max_size_bytes"`
MaxSizeItems int `yaml:"max_size_items"`
Validity time.Duration `yaml:"validity"`
TTL time.Duration `yaml:"ttl"`
DeprecatedSize int `yaml:"size"`
DeprecatedValidity time.Duration `yaml:"validity"`
DeprecatedSize int `yaml:"size"`
PurgeInterval time.Duration
}
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.MaxSizeBytes, prefix+"fifocache.max-size-bytes", "1GB", description+"Maximum memory size of the cache in bytes. A unit suffix (KB, MB, GB) may be applied.")
f.IntVar(&cfg.MaxSizeItems, prefix+"fifocache.max-size-items", 0, description+"Maximum number of entries in the cache.")
f.DurationVar(&cfg.Validity, prefix+"fifocache.duration", time.Hour, description+"The expiry duration for the cache.")
f.DurationVar(&cfg.TTL, prefix+"fifocache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")
f.IntVar(&cfg.DeprecatedSize, prefix+"fifocache.size", 0, "Deprecated (use max-size-items or max-size-bytes instead): "+description+"The number of entries to cache. ")
f.DurationVar(&cfg.DeprecatedValidity, prefix+"fifocache.duration", 0, "Deprecated (use ttl instead): "+description+"The expiry duration for the cache.")
f.IntVar(&cfg.DeprecatedSize, prefix+"fifocache.size", 0, "Deprecated (use max-size-items or max-size-bytes instead): "+description+"The number of entries to cache.")
}
func (cfg *FifoCacheConfig) Validate() error {
@@ -70,11 +74,12 @@ type FifoCache struct {
maxSizeItems int
maxSizeBytes uint64
currSizeBytes uint64
validity time.Duration
entries map[string]*list.Element
lru *list.List
done chan struct{}
entriesAdded prometheus.Counter
entriesAddedNew prometheus.Counter
entriesEvicted prometheus.Counter
@@ -107,13 +112,27 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l
level.Warn(logger).Log("msg", "neither fifocache.max-size-bytes nor fifocache.max-size-items is set", "cache", name)
return nil
}
return &FifoCache{
if cfg.DeprecatedValidity > 0 {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(logger).Log("msg", "running with DEPRECATED flag fifocache.duration, use fifocache.ttl instead", "cache", name)
cfg.TTL = cfg.DeprecatedValidity
}
// Set a default interval for the ticker
// This can be overwritten to a smaller value in tests
if cfg.PurgeInterval == 0 {
cfg.PurgeInterval = 1 * time.Minute
}
cache := &FifoCache{
maxSizeItems: cfg.MaxSizeItems,
maxSizeBytes: maxSizeBytes,
validity: cfg.Validity,
entries: make(map[string]*list.Element),
lru: list.New(),
done: make(chan struct{}),
entriesAdded: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
@@ -166,7 +185,7 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l
Namespace: "querier",
Subsystem: "cache",
Name: "stale_gets_total",
Help: "The total number of Get calls that had an entry which expired",
Help: "The total number of Get calls that had an entry which expired (deprecated)",
ConstLabels: prometheus.Labels{"cache": name},
}),
@@ -178,6 +197,43 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l
ConstLabels: prometheus.Labels{"cache": name},
}),
}
if cfg.TTL > 0 {
go cache.runPruneJob(cfg.PurgeInterval, cfg.TTL)
}
return cache
}
func (c *FifoCache) runPruneJob(interval, ttl time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-c.done:
return
case <-ticker.C:
c.pruneExpiredItems(ttl)
}
}
}
// pruneExpiredItems prunes items in the cache that exceeded their ttl
func (c *FifoCache) pruneExpiredItems(ttl time.Duration) {
c.lock.Lock()
defer c.lock.Unlock()
for k, v := range c.entries {
entry := v.Value.(*cacheEntry)
if time.Since(entry.updated) > ttl {
_ = c.lru.Remove(v).(*cacheEntry)
delete(c.entries, k)
c.currSizeBytes -= sizeOf(entry)
c.entriesCurrent.Dec()
c.entriesEvicted.Inc()
}
}
}
// Fetch implements Cache.
@@ -214,6 +270,8 @@ func (c *FifoCache) Stop() {
c.lock.Lock()
defer c.lock.Unlock()
close(c.done)
c.entriesEvicted.Add(float64(c.lru.Len()))
c.entries = make(map[string]*list.Element)
@@ -285,13 +343,7 @@ func (c *FifoCache) Get(ctx context.Context, key string) ([]byte, bool) {
element, ok := c.entries[key]
if ok {
entry := element.Value.(*cacheEntry)
if c.validity == 0 || time.Since(entry.updated) < c.validity {
return entry.value, true
}
c.totalMisses.Inc()
c.staleGets.Inc()
return nil, false
return entry.value, true
}
c.totalMisses.Inc()

@@ -29,11 +29,11 @@ func TestFifoCacheEviction(t *testing.T) {
}{
{
name: "test-memory-eviction",
cfg: FifoCacheConfig{MaxSizeBytes: strconv.FormatInt(int64(cnt*sizeOf(itemTemplate)), 10), Validity: 1 * time.Minute},
cfg: FifoCacheConfig{MaxSizeBytes: strconv.FormatInt(int64(cnt*sizeOf(itemTemplate)), 10), TTL: 1 * time.Minute},
},
{
name: "test-items-eviction",
cfg: FifoCacheConfig{MaxSizeItems: cnt, Validity: 1 * time.Minute},
cfg: FifoCacheConfig{MaxSizeItems: cnt, TTL: 1 * time.Minute},
},
}
@@ -63,7 +63,6 @@ func TestFifoCacheEviction(t *testing.T) {
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len()))
assert.Equal(t, testutil.ToFloat64(c.totalGets), float64(0))
assert.Equal(t, testutil.ToFloat64(c.totalMisses), float64(0))
assert.Equal(t, testutil.ToFloat64(c.staleGets), float64(0))
assert.Equal(t, testutil.ToFloat64(c.memoryBytes), float64(cnt*sizeOf(itemTemplate)))
for i := 0; i < cnt; i++ {
@@ -81,7 +80,6 @@ func TestFifoCacheEviction(t *testing.T) {
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len()))
assert.Equal(t, testutil.ToFloat64(c.totalGets), float64(cnt))
assert.Equal(t, testutil.ToFloat64(c.totalMisses), float64(0))
assert.Equal(t, testutil.ToFloat64(c.staleGets), float64(0))
assert.Equal(t, testutil.ToFloat64(c.memoryBytes), float64(cnt*sizeOf(itemTemplate)))
// Check evictions
@@ -106,7 +104,6 @@ func TestFifoCacheEviction(t *testing.T) {
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len()))
assert.Equal(t, testutil.ToFloat64(c.totalGets), float64(cnt))
assert.Equal(t, testutil.ToFloat64(c.totalMisses), float64(0))
assert.Equal(t, testutil.ToFloat64(c.staleGets), float64(0))
assert.Equal(t, testutil.ToFloat64(c.memoryBytes), float64(cnt*sizeOf(itemTemplate)))
for i := 0; i < cnt-evicted; i++ {
@@ -128,7 +125,6 @@ func TestFifoCacheEviction(t *testing.T) {
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len()))
assert.Equal(t, testutil.ToFloat64(c.totalGets), float64(cnt*2+evicted))
assert.Equal(t, testutil.ToFloat64(c.totalMisses), float64(cnt-evicted))
assert.Equal(t, testutil.ToFloat64(c.staleGets), float64(0))
assert.Equal(t, testutil.ToFloat64(c.memoryBytes), float64(cnt*sizeOf(itemTemplate)))
// Check updates work
@@ -159,7 +155,6 @@ func TestFifoCacheEviction(t *testing.T) {
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len()))
assert.Equal(t, testutil.ToFloat64(c.totalGets), float64(cnt*2+evicted*2))
assert.Equal(t, testutil.ToFloat64(c.totalMisses), float64(cnt-evicted))
assert.Equal(t, testutil.ToFloat64(c.staleGets), float64(0))
assert.Equal(t, testutil.ToFloat64(c.memoryBytes), float64(cnt*sizeOf(itemTemplate)))
c.Stop()
@@ -168,7 +163,7 @@ func TestFifoCacheEviction(t *testing.T) {
func TestFifoCacheExpiry(t *testing.T) {
key1, key2, key3, key4 := "01", "02", "03", "04"
data1, data2, data3 := genBytes(24), []byte("testdata"), genBytes(8)
data1, data2, data3, data4 := genBytes(32), genBytes(64), genBytes(128), genBytes(32)
memorySz := sizeOf(&cacheEntry{key: key1, value: data1}) +
sizeOf(&cacheEntry{key: key2, value: data2}) +
@@ -180,58 +175,64 @@ func TestFifoCacheExpiry(t *testing.T) {
}{
{
name: "test-memory-expiry",
cfg: FifoCacheConfig{MaxSizeBytes: strconv.FormatInt(int64(memorySz), 10), Validity: 5 * time.Millisecond},
cfg: FifoCacheConfig{
MaxSizeBytes: strconv.FormatInt(int64(memorySz), 10),
TTL: 250 * time.Millisecond,
PurgeInterval: 100 * time.Millisecond,
},
},
{
name: "test-items-expiry",
cfg: FifoCacheConfig{MaxSizeItems: 3, Validity: 5 * time.Millisecond},
cfg: FifoCacheConfig{
MaxSizeItems: 3,
TTL: 100 * time.Millisecond,
PurgeInterval: 50 * time.Millisecond,
},
},
}
for _, test := range tests {
c := NewFifoCache(test.name, test.cfg, nil, log.NewNopLogger())
ctx := context.Background()
t.Run(test.name, func(t *testing.T) {
c := NewFifoCache(test.name, test.cfg, nil, log.NewNopLogger())
ctx := context.Background()
err := c.Store(ctx,
[]string{key1, key2, key4, key3, key2, key1},
[][]byte{genBytes(16), []byte("dummy"), genBytes(20), data3, data2, data1})
require.NoError(t, err)
err := c.Store(ctx, []string{key1, key2, key3, key4}, [][]byte{data1, data2, data3, data4})
require.NoError(t, err)
value, ok := c.Get(ctx, key1)
require.True(t, ok)
require.Equal(t, data1, value)
value, ok := c.Get(ctx, key4)
require.True(t, ok)
require.Equal(t, data1, value)
_, ok = c.Get(ctx, key4)
require.False(t, ok)
_, ok = c.Get(ctx, key1)
require.False(t, ok)
assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(1))
assert.Equal(t, testutil.ToFloat64(c.entriesAddedNew), float64(5))
assert.Equal(t, testutil.ToFloat64(c.entriesEvicted), float64(2))
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(3))
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(len(c.entries)))
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len()))
assert.Equal(t, testutil.ToFloat64(c.totalGets), float64(2))
assert.Equal(t, testutil.ToFloat64(c.totalMisses), float64(1))
assert.Equal(t, testutil.ToFloat64(c.staleGets), float64(0))
assert.Equal(t, testutil.ToFloat64(c.memoryBytes), float64(memorySz))
assert.Equal(t, float64(1), testutil.ToFloat64(c.entriesAdded))
assert.Equal(t, float64(4), testutil.ToFloat64(c.entriesAddedNew))
assert.Equal(t, float64(1), testutil.ToFloat64(c.entriesEvicted))
assert.Equal(t, float64(3), testutil.ToFloat64(c.entriesCurrent))
assert.Equal(t, float64(len(c.entries)), testutil.ToFloat64(c.entriesCurrent))
assert.Equal(t, float64(c.lru.Len()), testutil.ToFloat64(c.entriesCurrent))
assert.Equal(t, float64(2), testutil.ToFloat64(c.totalGets))
assert.Equal(t, float64(1), testutil.ToFloat64(c.totalMisses))
assert.Equal(t, float64(memorySz), testutil.ToFloat64(c.memoryBytes))
// Expire the item.
time.Sleep(5 * time.Millisecond)
_, ok = c.Get(ctx, key1)
require.False(t, ok)
// Expire the item.
time.Sleep(2 * test.cfg.TTL)
_, ok = c.Get(ctx, key4)
require.False(t, ok)
assert.Equal(t, testutil.ToFloat64(c.entriesAdded), float64(1))
assert.Equal(t, testutil.ToFloat64(c.entriesAddedNew), float64(5))
assert.Equal(t, testutil.ToFloat64(c.entriesEvicted), float64(2))
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(3))
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(len(c.entries)))
assert.Equal(t, testutil.ToFloat64(c.entriesCurrent), float64(c.lru.Len()))
assert.Equal(t, testutil.ToFloat64(c.totalGets), float64(3))
assert.Equal(t, testutil.ToFloat64(c.totalMisses), float64(2))
assert.Equal(t, testutil.ToFloat64(c.staleGets), float64(1))
assert.Equal(t, testutil.ToFloat64(c.memoryBytes), float64(memorySz))
assert.Equal(t, float64(1), testutil.ToFloat64(c.entriesAdded))
assert.Equal(t, float64(4), testutil.ToFloat64(c.entriesAddedNew))
assert.Equal(t, float64(4), testutil.ToFloat64(c.entriesEvicted))
assert.Equal(t, float64(0), testutil.ToFloat64(c.entriesCurrent))
assert.Equal(t, float64(len(c.entries)), testutil.ToFloat64(c.entriesCurrent))
assert.Equal(t, float64(c.lru.Len()), testutil.ToFloat64(c.entriesCurrent))
assert.Equal(t, float64(3), testutil.ToFloat64(c.totalGets))
assert.Equal(t, float64(2), testutil.ToFloat64(c.totalMisses))
assert.Equal(t, float64(memorySz), testutil.ToFloat64(c.memoryBytes))
c.Stop()
c.Stop()
})
}
}

@@ -30,7 +30,7 @@ func (f fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient,
logger := log.NewNopLogger()
indexClient = newCachingIndexClient(indexClient, cache.NewFifoCache("index-fifo", cache.FifoCacheConfig{
MaxSizeItems: 500,
Validity: 5 * time.Minute,
TTL: 5 * time.Minute,
}, reg, logger), 5*time.Minute, limits, logger, false)
return indexClient, chunkClient, tableClient, schemaConfig, closer, err
}

@@ -44,7 +44,7 @@ func TestCachingStorageClientBasic(t *testing.T) {
limits, err := defaultLimits()
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, TTL: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false)
queries := []chunk.IndexQuery{{
TableName: "table",
@@ -76,7 +76,7 @@ func TestTempCachingStorageClient(t *testing.T) {
limits, err := defaultLimits()
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, TTL: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false)
queries := []chunk.IndexQuery{
{TableName: "table", HashValue: "foo"},
@@ -135,7 +135,7 @@ func TestPermCachingStorageClient(t *testing.T) {
limits, err := defaultLimits()
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, TTL: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false)
queries := []chunk.IndexQuery{
{TableName: "table", HashValue: "foo", Immutable: true},
@@ -187,7 +187,7 @@ func TestCachingStorageClientEmptyResponse(t *testing.T) {
limits, err := defaultLimits()
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, TTL: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false)
queries := []chunk.IndexQuery{{TableName: "table", HashValue: "foo"}}
err = client.QueryPages(ctx, queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
@@ -226,7 +226,7 @@ func TestCachingStorageClientCollision(t *testing.T) {
limits, err := defaultLimits()
require.NoError(t, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, TTL: 10 * time.Second}, nil, logger)
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, false)
queries := []chunk.IndexQuery{
{TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar")},
@@ -406,7 +406,7 @@ func TestCachingStorageClientStoreQueries(t *testing.T) {
require.NoError(t, err)
logger := log.NewNopLogger()
cache := &mockCache{
Cache: cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, Validity: 10 * time.Second}, nil, logger),
Cache: cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, TTL: 10 * time.Second}, nil, logger),
}
client := newCachingIndexClient(store, cache, 1*time.Second, limits, logger, disableBroadQueries)
var callbackQueries []chunk.IndexQuery
