store: fix bug in getting chunk ids from ingesters when using boltdb-shipper (#5945)

* initialize async store in store factory function itself

* add a comment about QueryIngestersWithin
pull/5947/head
Sandeep Sukhani 4 years ago committed by GitHub
parent 27a536718f
commit d1aff7b29f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      pkg/loki/modules.go
  2. 15
      pkg/storage/async_store.go
  3. 9
      pkg/storage/async_store_test.go
  4. 5
      pkg/storage/factory.go
  5. 4
      pkg/storage/store.go

@ -458,17 +458,19 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
}
if asyncStore {
t.Cfg.StorageConfig.EnableAsyncStore = true
t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{
IngesterQuerier: t.ingesterQuerier,
QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)),
}
}
t.Store, err = storage.NewStore(t.Cfg.StorageConfig, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return
}
if asyncStore {
t.Store = storage.NewAsyncStore(t.Store, t.Cfg.SchemaConfig, t.ingesterQuerier,
calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)),
)
}
return services.NewIdleService(nil, func(_ error) error {
t.Store.Stop()
return nil

@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/util/spanlogger"
util_log "github.com/grafana/loki/pkg/util/log"
@ -21,23 +22,29 @@ type IngesterQuerier interface {
GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)
}
type AsyncStoreCfg struct {
IngesterQuerier IngesterQuerier
// QueryIngestersWithin defines maximum lookback beyond which ingesters are not queried for chunk ids.
QueryIngestersWithin time.Duration
}
// AsyncStore does querying to both ingesters and chunk store and combines the results after deduping them.
// This should be used when using an async store like boltdb-shipper.
// AsyncStore is meant to be used only in queriers or any other service other than ingesters.
// It should never be used in ingesters otherwise it would start spiraling around doing queries over and over again to other ingesters.
type AsyncStore struct {
Store
stores.Store
scfg config.SchemaConfig
ingesterQuerier IngesterQuerier
queryIngestersWithin time.Duration
}
func NewAsyncStore(store Store, scfg config.SchemaConfig, querier IngesterQuerier, queryIngestersWithin time.Duration) *AsyncStore {
func NewAsyncStore(cfg AsyncStoreCfg, store stores.Store, scfg config.SchemaConfig) *AsyncStore {
return &AsyncStore{
Store: store,
scfg: scfg,
ingesterQuerier: querier,
queryIngestersWithin: queryIngestersWithin,
ingesterQuerier: cfg.IngesterQuerier,
queryIngestersWithin: cfg.QueryIngestersWithin,
}
}

@ -211,7 +211,8 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
ingesterQuerier := newIngesterQuerierMock()
ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.ingesterChunkIDs, nil)
asyncStore := NewAsyncStore(store, config.SchemaConfig{}, ingesterQuerier, 0)
asyncStoreCfg := AsyncStoreCfg{IngesterQuerier: ingesterQuerier}
asyncStore := NewAsyncStore(asyncStoreCfg, store, config.SchemaConfig{})
chunks, fetchers, err := asyncStore.GetChunkRefs(context.Background(), "fake", model.Now(), model.Now(), nil)
require.NoError(t, err)
@ -268,7 +269,11 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) {
ingesterQuerier := newIngesterQuerierMock()
ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)
asyncStore := NewAsyncStore(store, config.SchemaConfig{}, ingesterQuerier, tc.queryIngestersWithin)
asyncStoreCfg := AsyncStoreCfg{
IngesterQuerier: ingesterQuerier,
QueryIngestersWithin: tc.queryIngestersWithin,
}
asyncStore := NewAsyncStore(asyncStoreCfg, store, config.SchemaConfig{})
_, _, err := asyncStore.GetChunkRefs(context.Background(), "fake", tc.queryFrom, tc.queryThrough, nil)
require.NoError(t, err)

@ -63,6 +63,11 @@ type Config struct {
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"`
// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
// It is required for getting chunk ids of recently flushed chunks from the ingesters.
EnableAsyncStore bool `yaml:"-"`
AsyncStoreConfig AsyncStoreCfg `yaml:"-"`
}
// RegisterFlags adds the flags required to configure this flag set.

@ -159,6 +159,10 @@ func (s *store) init() error {
}
s.composite.AddStore(p.From.Time, f, idx, w, stop)
}
if s.cfg.EnableAsyncStore {
s.Store = NewAsyncStore(s.cfg.AsyncStoreConfig, s.Store, s.schemaCfg)
}
return nil
}

Loading…
Cancel
Save