From 3ece2ea6c470b7a53e92c395e28af2999c328199 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Thu, 4 Apr 2024 19:36:23 -0400 Subject: [PATCH] refactor!: remove async cache writeback on chunk fetcher (#12456) Signed-off-by: Edward Welch --- docs/sources/configure/_index.md | 21 ++--- docs/sources/setup/upgrade/_index.md | 9 +++ pkg/storage/chunk/cache/background.go | 6 +- pkg/storage/chunk/cache/cache.go | 7 -- pkg/storage/chunk/cache/cache_test.go | 2 +- pkg/storage/chunk/cache/memcached.go | 4 +- pkg/storage/chunk/fetcher/fetcher.go | 79 +++---------------- pkg/storage/chunk/fetcher/fetcher_test.go | 4 +- pkg/storage/store.go | 2 +- pkg/storage/stores/series_store_write_test.go | 2 +- pkg/storage/util_test.go | 2 +- 11 files changed, 38 insertions(+), 100 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index ba9e5c7235..d3265e8644 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -4727,15 +4727,16 @@ The cache block configures the cache backend. The supported CLI flags `` background: # At what concurrency to write back to cache. # CLI flag: -.background.write-back-concurrency - [writeback_goroutines: | default = 10] + [writeback_goroutines: | default = 1] - # How many key batches to buffer for background write-back. + # How many key batches to buffer for background write-back. Default is large + # to prefer size based limiting. # CLI flag: -.background.write-back-buffer - [writeback_buffer: | default = 10000] + [writeback_buffer: | default = 500000] # Size limit in bytes for background write-back. # CLI flag: -.background.write-back-size-limit - [writeback_size_limit: | default = 1GB] + [writeback_size_limit: | default = 500MB] memcached: # How long keys stay in the memcache. @@ -4744,11 +4745,11 @@ memcached: # How many keys to fetch in each batch. # CLI flag: -.memcached.batchsize - [batch_size: | default = 256] + [batch_size: | default = 4] # Maximum active requests to memcache. # CLI flag: -.memcached.parallelism - [parallelism: | default = 10] + [parallelism: | default = 5] memcached_client: # Hostname for memcached service to use. If empty and if addresses is unset, @@ -4942,14 +4943,6 @@ embedded_cache: # The time to live for items in the cache before they get purged. # CLI flag: -.embedded-cache.ttl [ttl: | default = 1h] - -# The maximum number of concurrent asynchronous writeback cache can occur. -# CLI flag: -.max-async-cache-write-back-concurrency -[async_cache_write_back_concurrency: | default = 16] - -# The maximum number of enqueued asynchronous writeback cache allowed. -# CLI flag: -.max-async-cache-write-back-buffer-size -[async_cache_write_back_buffer_size: | default = 500] ``` ### period_config diff --git a/docs/sources/setup/upgrade/_index.md b/docs/sources/setup/upgrade/_index.md index 2cec750395..4badb1a4c5 100644 --- a/docs/sources/setup/upgrade/_index.md +++ b/docs/sources/setup/upgrade/_index.md @@ -107,6 +107,15 @@ Going forward compactor will run compaction and retention on all the object stor `-compactor.delete-request-store` or its YAML setting should be explicitly configured when retention is enabled, this is required for storing delete requests. The path prefix under which the delete requests are stored is decided by `-compactor.delete-request-store.key-prefix`, it defaults to `index/`. +#### Configuration `async_cache_write_back_concurrency` and `async_cache_write_back_buffer_size` have been removed + +These configurations were redundant with the `Background` configuration in the [cache-config]({{< relref "../../configure#cache_config" >}}). + +`async_cache_write_back_concurrency` can be set with `writeback_goroutines` +`async_cache_write_back_buffer_size` can be set with `writeback_buffer` + +additionally the `Background` configuration also lest you set `writeback_size_limit` which can be used to set a maximum amount of memory to use for writeback objects vs a count of objects. + #### Configuration `use_boltdb_shipper_as_backup` is removed The setting `use_boltdb_shipper_as_backup` (`-tsdb.shipper.use-boltdb-shipper-as-backup`) was a remnant from the development of the TSDB storage. diff --git a/pkg/storage/chunk/cache/background.go b/pkg/storage/chunk/cache/background.go index 859bdf96f9..c5899e3c8b 100644 --- a/pkg/storage/chunk/cache/background.go +++ b/pkg/storage/chunk/cache/background.go @@ -26,9 +26,9 @@ type BackgroundConfig struct { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *BackgroundConfig) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) { - f.IntVar(&cfg.WriteBackGoroutines, prefix+"background.write-back-concurrency", 10, description+"At what concurrency to write back to cache.") - f.IntVar(&cfg.WriteBackBuffer, prefix+"background.write-back-buffer", 10000, description+"How many key batches to buffer for background write-back.") - _ = cfg.WriteBackSizeLimit.Set("1GB") + f.IntVar(&cfg.WriteBackGoroutines, prefix+"background.write-back-concurrency", 1, description+"At what concurrency to write back to cache.") + f.IntVar(&cfg.WriteBackBuffer, prefix+"background.write-back-buffer", 500000, description+"How many key batches to buffer for background write-back. Default is large to prefer size based limiting.") + _ = cfg.WriteBackSizeLimit.Set("500MB") f.Var(&cfg.WriteBackSizeLimit, prefix+"background.write-back-size-limit", description+"Size limit in bytes for background write-back.") } diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 6e1565fcaa..9239fe88d7 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -38,11 +38,6 @@ type Config struct { // For tests to inject specific implementations. Cache Cache `yaml:"-"` - - // AsyncCacheWriteBackConcurrency specifies the number of goroutines to use when asynchronously writing chunks fetched from the store to the chunk cache. - AsyncCacheWriteBackConcurrency int `yaml:"async_cache_write_back_concurrency"` - // AsyncCacheWriteBackBufferSize specifies the maximum number of fetched chunks to buffer for writing back to the chunk cache. - AsyncCacheWriteBackBufferSize int `yaml:"async_cache_write_back_buffer_size"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet @@ -52,8 +47,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f) cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix+"embedded-cache.", description, f) - f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.") - f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") cfg.Prefix = prefix diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index 5595b2df0a..23550dd349 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -131,7 +131,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) { }, } - fetcher, err := fetcher.New(c, nil, false, s, nil, 10, 100, 0) + fetcher, err := fetcher.New(c, nil, false, s, nil, 0) require.NoError(t, err) defer fetcher.Stop() diff --git a/pkg/storage/chunk/cache/memcached.go b/pkg/storage/chunk/cache/memcached.go index ca8e2e2f92..8e47168afc 100644 --- a/pkg/storage/chunk/cache/memcached.go +++ b/pkg/storage/chunk/cache/memcached.go @@ -30,8 +30,8 @@ type MemcachedConfig struct { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { f.DurationVar(&cfg.Expiration, prefix+"memcached.expiration", 0, description+"How long keys stay in the memcache.") - f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 256, description+"How many keys to fetch in each batch.") - f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 10, description+"Maximum active requests to memcache.") + f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 4, description+"How many keys to fetch in each batch.") + f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 5, description+"Maximum active requests to memcache.") } // Memcached type caches chunks in memcached diff --git a/pkg/storage/chunk/fetcher/fetcher.go b/pkg/storage/chunk/fetcher/fetcher.go index 7801143932..cf763b9cbe 100644 --- a/pkg/storage/chunk/fetcher/fetcher.go +++ b/pkg/storage/chunk/fetcher/fetcher.go @@ -2,7 +2,6 @@ package fetcher import ( "context" - "errors" "sync" "time" @@ -23,19 +22,6 @@ import ( ) var ( - errAsyncBufferFull = errors.New("the async buffer is full") - skipped = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total", - Help: "Total number of operations against cache that have been skipped.", - }) - chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_enqueued_total", - Help: "Total number of chunks enqueued to a buffer to be asynchronously written back to the chunk cache.", - }) - chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{ - Name: "loki_chunk_fetcher_cache_dequeued_total", - Help: "Total number of chunks asynchronously dequeued from a buffer and written back to the chunk cache.", - }) cacheCorrupt = promauto.NewCounter(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "cache_corrupt_chunks_total", @@ -69,12 +55,7 @@ type Fetcher struct { wait sync.WaitGroup decodeRequests chan decodeRequest - maxAsyncConcurrency int - maxAsyncBufferSize int - - asyncQueue chan []chunk.Chunk - stopOnce sync.Once - stop chan struct{} + stopOnce sync.Once } type decodeRequest struct { @@ -89,18 +70,15 @@ type decodeResponse struct { } // New makes a new ChunkFetcher. -func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) { +func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) { c := &Fetcher{ - schema: schema, - storage: storage, - cache: cache, - cachel2: cachel2, - l2CacheHandoff: l2CacheHandoff, - cacheStubs: cacheStubs, - decodeRequests: make(chan decodeRequest), - maxAsyncConcurrency: maxAsyncConcurrency, - maxAsyncBufferSize: maxAsyncBufferSize, - stop: make(chan struct{}), + schema: schema, + storage: storage, + cache: cache, + cachel2: cachel2, + l2CacheHandoff: l2CacheHandoff, + cacheStubs: cacheStubs, + decodeRequests: make(chan decodeRequest), } c.wait.Add(chunkDecodeParallelism) @@ -108,48 +86,15 @@ func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config. go c.worker() } - // Start a number of goroutines - processing async operations - equal - // to the max concurrency we have. - c.asyncQueue = make(chan []chunk.Chunk, c.maxAsyncBufferSize) - for i := 0; i < c.maxAsyncConcurrency; i++ { - go c.asyncWriteBackCacheQueueProcessLoop() - } - return c, nil } -func (c *Fetcher) writeBackCacheAsync(fromStorage []chunk.Chunk) error { - select { - case c.asyncQueue <- fromStorage: - chunkFetcherCacheQueueEnqueue.Add(float64(len(fromStorage))) - return nil - default: - return errAsyncBufferFull - } -} - -func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() { - for { - select { - case fromStorage := <-c.asyncQueue: - chunkFetcherCacheQueueDequeue.Add(float64(len(fromStorage))) - cacheErr := c.WriteBackCache(context.Background(), fromStorage) - if cacheErr != nil { - level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr) - } - case <-c.stop: - return - } - } -} - // Stop the ChunkFetcher. func (c *Fetcher) Stop() { c.stopOnce.Do(func() { close(c.decodeRequests) c.wait.Wait() c.cache.Stop() - close(c.stop) }) } @@ -267,10 +212,8 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun st.AddCacheBytesSent(stats.ChunkCache, bytes) // Always cache any chunks we did get - if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil { - if cacheErr == errAsyncBufferFull { - skipped.Inc() - } + + if cacheErr := c.WriteBackCache(ctx, fromStorage); cacheErr != nil { level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index c6215bde5b..902b0dae1d 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -193,7 +193,7 @@ func Test(t *testing.T) { assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart)) // Build fetcher - f, err := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff) + f, err := New(c1, c2, false, sc, chunkClient, test.handoff) assert.NoError(t, err) // Run the test @@ -290,7 +290,7 @@ func BenchmarkFetch(b *testing.B) { _ = chunkClient.PutChunks(context.Background(), test.storeStart) // Build fetcher - f, _ := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff) + f, _ := New(c1, c2, false, sc, chunkClient, test.handoff) for i := 0; i < b.N; i++ { _, err := f.FetchChunks(context.Background(), test.fetch) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 1a4fa38606..9e50d1531d 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -199,7 +199,7 @@ func (s *LokiStore) init() error { if err != nil { return err } - f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackConcurrency, s.storeCfg.ChunkCacheConfig.AsyncCacheWriteBackBufferSize, s.storeCfg.L2ChunkCacheHandoff) + f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff) if err != nil { return err } diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index 823f5bf11f..cac84a17eb 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -155,7 +155,7 @@ func TestChunkWriter_PutOne(t *testing.T) { idx := &mockIndexWriter{} client := &mockChunksClient{} - f, err := fetcher.New(cache, nil, false, schemaConfig, client, 1, 1, 0) + f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0) require.NoError(t, err) cw := NewChunkWriter(f, schemaConfig, idx, true) diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 7743bce2fb..db71f70af5 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -260,7 +260,7 @@ func (m *mockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time, panic(err) } - f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 10, 100, 0) + f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0) if err != nil { panic(err) }