diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index a7562d0a1b..faab9b3a95 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -458,17 +458,19 @@ func (t *Loki) initStore() (_ services.Service, err error) { } } + if asyncStore { + t.Cfg.StorageConfig.EnableAsyncStore = true + t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{ + IngesterQuerier: t.ingesterQuerier, + QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)), + } + } + t.Store, err = storage.NewStore(t.Cfg.StorageConfig, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer, util_log.Logger) if err != nil { return } - if asyncStore { - t.Store = storage.NewAsyncStore(t.Store, t.Cfg.SchemaConfig, t.ingesterQuerier, - calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)), - ) - } - return services.NewIdleService(nil, func(_ error) error { t.Store.Stop() return nil diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go index 94c7af94ae..dd3fb35162 100644 --- a/pkg/storage/async_store.go +++ b/pkg/storage/async_store.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/fetcher" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores" "github.com/grafana/loki/pkg/util/spanlogger" util_log "github.com/grafana/loki/pkg/util/log" @@ -21,23 +22,29 @@ type IngesterQuerier interface { GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) } +type AsyncStoreCfg struct { + IngesterQuerier IngesterQuerier + // QueryIngestersWithin defines maximum lookback beyond which ingesters are not queried for chunk ids. + QueryIngestersWithin time.Duration +} + // AsyncStore does querying to both ingesters and chunk store and combines the results after deduping them. // This should be used when using an async store like boltdb-shipper. // AsyncStore is meant to be used only in queriers or any other service other than ingesters. // It should never be used in ingesters otherwise it would start spiraling around doing queries over and over again to other ingesters. type AsyncStore struct { - Store + stores.Store scfg config.SchemaConfig ingesterQuerier IngesterQuerier queryIngestersWithin time.Duration } -func NewAsyncStore(store Store, scfg config.SchemaConfig, querier IngesterQuerier, queryIngestersWithin time.Duration) *AsyncStore { +func NewAsyncStore(cfg AsyncStoreCfg, store stores.Store, scfg config.SchemaConfig) *AsyncStore { return &AsyncStore{ Store: store, scfg: scfg, - ingesterQuerier: querier, - queryIngestersWithin: queryIngestersWithin, + ingesterQuerier: cfg.IngesterQuerier, + queryIngestersWithin: cfg.QueryIngestersWithin, } } diff --git a/pkg/storage/async_store_test.go b/pkg/storage/async_store_test.go index 67af1ad9df..30e96381cc 100644 --- a/pkg/storage/async_store_test.go +++ b/pkg/storage/async_store_test.go @@ -211,7 +211,8 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) { ingesterQuerier := newIngesterQuerierMock() ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.ingesterChunkIDs, nil) - asyncStore := NewAsyncStore(store, config.SchemaConfig{}, ingesterQuerier, 0) + asyncStoreCfg := AsyncStoreCfg{IngesterQuerier: ingesterQuerier} + asyncStore := NewAsyncStore(asyncStoreCfg, store, config.SchemaConfig{}) chunks, fetchers, err := asyncStore.GetChunkRefs(context.Background(), "fake", model.Now(), model.Now(), nil) require.NoError(t, err) @@ -268,7 +269,11 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) { ingesterQuerier := newIngesterQuerierMock() ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - asyncStore := NewAsyncStore(store, config.SchemaConfig{}, ingesterQuerier, tc.queryIngestersWithin) + asyncStoreCfg := AsyncStoreCfg{ + IngesterQuerier: ingesterQuerier, + QueryIngestersWithin: tc.queryIngestersWithin, + } + asyncStore := NewAsyncStore(asyncStoreCfg, store, config.SchemaConfig{}) _, _, err := asyncStore.GetChunkRefs(context.Background(), "fake", tc.queryFrom, tc.queryThrough, nil) require.NoError(t, err) diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index bf54ad17a3..d22fee324b 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -63,6 +63,11 @@ type Config struct { MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"` + + // Config for using AsyncStore when using async index stores like `boltdb-shipper`. + // It is required for getting chunk ids of recently flushed chunks from the ingesters. + EnableAsyncStore bool `yaml:"-"` + AsyncStoreConfig AsyncStoreCfg `yaml:"-"` } // RegisterFlags adds the flags required to configure this flag set. diff --git a/pkg/storage/store.go b/pkg/storage/store.go index b9264b33aa..4179763354 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -159,6 +159,10 @@ func (s *store) init() error { } s.composite.AddStore(p.From.Time, f, idx, w, stop) } + + if s.cfg.EnableAsyncStore { + s.Store = NewAsyncStore(s.cfg.AsyncStoreConfig, s.Store, s.schemaCfg) + } return nil }