From fefc33debc57c2db81ef9bfbddb1556c728ab6ff Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 3 Jun 2022 12:35:16 +0530 Subject: [PATCH] make tsdb store instance a singleton (#6273) * make tsdb store instance a singleton * make it mandatory to have 24h index period config for tsdb * avoid initializing head and tsdb manager when running tsdb store in read only mode --- pkg/storage/config/schema_config.go | 14 ++- pkg/storage/stores/tsdb/head_manager.go | 2 - pkg/storage/stores/tsdb/head_manager_test.go | 8 +- pkg/storage/stores/tsdb/index_client.go | 9 +- .../stores/tsdb/index_shipper_querier.go | 112 ++++++++++++++++++ pkg/storage/stores/tsdb/manager.go | 105 ++-------------- pkg/storage/stores/tsdb/store.go | 107 ++++++++++++----- 7 files changed, 218 insertions(+), 139 deletions(-) create mode 100644 pkg/storage/stores/tsdb/index_shipper_querier.go diff --git a/pkg/storage/config/schema_config.go b/pkg/storage/config/schema_config.go index 2e652e4928..f7856c72af 100644 --- a/pkg/storage/config/schema_config.go +++ b/pkg/storage/config/schema_config.go @@ -42,6 +42,9 @@ const ( // BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage BoltDBShipperType = "boltdb-shipper" TSDBType = "tsdb" + + // ObjectStorageIndexRequiredPeriod defines the required index period for object storage based index stores like boltdb-shipper and tsdb + ObjectStorageIndexRequiredPeriod = 24 * time.Hour ) var ( @@ -53,6 +56,7 @@ var ( errCurrentBoltdbShipperNon24Hours = errors.New("boltdb-shipper works best with 24h periodic index config. Either add a new config with future date set to 24h to retain the existing index or change the existing config to use 24h period") errUpcomingBoltdbShipperNon24Hours = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h") + errTSDBNon24HoursIndexPeriod = errors.New("tsdb must always have periodic config for index set to 24h") errZeroLengthConfig = errors.New("must specify at least one schema configuration") ) @@ -149,12 +153,14 @@ func (cfg *SchemaConfig) Validate() error { activePCIndex := ActivePeriodConfig((*cfg).Configs) // if current index type is boltdb-shipper and there are no upcoming index types then it should be set to 24 hours. - if cfg.Configs[activePCIndex].IndexType == BoltDBShipperType && cfg.Configs[activePCIndex].IndexTables.Period != 24*time.Hour && len(cfg.Configs)-1 == activePCIndex { + if cfg.Configs[activePCIndex].IndexType == BoltDBShipperType && + cfg.Configs[activePCIndex].IndexTables.Period != ObjectStorageIndexRequiredPeriod && len(cfg.Configs)-1 == activePCIndex { return errCurrentBoltdbShipperNon24Hours } // if upcoming index type is boltdb-shipper, it should always be set to 24 hours. - if len(cfg.Configs)-1 > activePCIndex && (cfg.Configs[activePCIndex+1].IndexType == BoltDBShipperType && cfg.Configs[activePCIndex+1].IndexTables.Period != 24*time.Hour) { + if len(cfg.Configs)-1 > activePCIndex && (cfg.Configs[activePCIndex+1].IndexType == BoltDBShipperType && + cfg.Configs[activePCIndex+1].IndexTables.Period != ObjectStorageIndexRequiredPeriod) { return errUpcomingBoltdbShipperNon24Hours } @@ -282,6 +288,10 @@ func (cfg PeriodConfig) validate() error { return validateError } + if cfg.IndexType == TSDBType && cfg.IndexTables.Period != ObjectStorageIndexRequiredPeriod { + return errTSDBNon24HoursIndexPeriod + } + // Ensure the tables period is a multiple of the bucket period if cfg.IndexTables.Period > 0 && cfg.IndexTables.Period%(24*time.Hour) != 0 { return errInvalidTablePeriod diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index 872c1d2e90..589e34a6ba 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -127,8 +127,6 @@ func NewHeadManager(logger log.Logger, dir string, metrics *Metrics, tsdbManager indices = append(indices, m.activeHeads) } - indices = append(indices, m.tsdbManager) - return NewMultiIndex(indices...) }) diff --git a/pkg/storage/stores/tsdb/head_manager_test.go b/pkg/storage/stores/tsdb/head_manager_test.go index e993b31f15..e778bdc423 100644 --- a/pkg/storage/stores/tsdb/head_manager_test.go +++ b/pkg/storage/stores/tsdb/head_manager_test.go @@ -272,9 +272,13 @@ func Test_HeadManager_Lifecycle(t *testing.T) { // Start, ensuring recovery from old WALs require.Nil(t, mgr.Start()) + // Ensure old WAL data is queryable + multiIndex, err := NewMultiIndex(mgr, mgr.tsdbManager.(noopTSDBManager).tenantHeads) + require.Nil(t, err) + for _, c := range cases { - refs, err := mgr.GetChunkRefs( + refs, err := multiIndex.GetChunkRefs( context.Background(), c.User, 0, math.MaxInt64, @@ -309,7 +313,7 @@ func Test_HeadManager_Lifecycle(t *testing.T) { // Ensure old + new data is queryable for _, c := range append(cases, newCase) { - refs, err := mgr.GetChunkRefs( + refs, err := multiIndex.GetChunkRefs( context.Background(), c.User, 0, math.MaxInt64, diff --git a/pkg/storage/stores/tsdb/index_client.go b/pkg/storage/stores/tsdb/index_client.go index 08f3d652ee..8a5fb86e19 100644 --- a/pkg/storage/stores/tsdb/index_client.go +++ b/pkg/storage/stores/tsdb/index_client.go @@ -9,21 +9,16 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" ) // implements stores.Index type IndexClient struct { - schema config.SchemaConfig - idx Index + idx Index } -func NewIndexClient(idx Index, pd config.PeriodConfig) *IndexClient { +func NewIndexClient(idx Index) *IndexClient { return &IndexClient{ - schema: config.SchemaConfig{ - Configs: []config.PeriodConfig{pd}, - }, idx: idx, } } diff --git a/pkg/storage/stores/tsdb/index_shipper_querier.go b/pkg/storage/stores/tsdb/index_shipper_querier.go new file mode 100644 index 0000000000..d292822a62 --- /dev/null +++ b/pkg/storage/stores/tsdb/index_shipper_querier.go @@ -0,0 +1,112 @@ +package tsdb + +import ( + "context" + "fmt" + "math" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/indexshipper" + shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" + "github.com/grafana/loki/pkg/storage/stores/tsdb/index" +) + +// indexShipperQuerier is used for querying index from the shipper. +type indexShipperQuerier struct { + shipper indexshipper.IndexShipper + chunkFilter chunk.RequestChunkFilterer +} + +func newIndexShipperQuerier(shipper indexshipper.IndexShipper) Index { + return &indexShipperQuerier{shipper: shipper} +} + +func (i *indexShipperQuerier) indices(ctx context.Context, from, through model.Time, user string) (Index, error) { + var indices []Index + + // Ensure we query both per tenant and multitenant TSDBs + + for _, bkt := range indexBuckets(from, through) { + if err := i.shipper.ForEach(ctx, fmt.Sprintf("%d", bkt), user, func(idx shipper_index.Index) error { + _, multitenant := parseMultitenantTSDBName(idx.Name()) + impl, ok := idx.(Index) + if !ok { + return fmt.Errorf("unexpected shipper index type: %T", idx) + } + if multitenant { + indices = append(indices, NewMultiTenantIndex(impl)) + } else { + indices = append(indices, impl) + } + return nil + }); err != nil { + return nil, err + } + + } + + if len(indices) == 0 { + return NoopIndex{}, nil + } + idx, err := NewMultiIndex(indices...) + if err != nil { + return nil, err + } + + if i.chunkFilter != nil { + idx.SetChunkFilterer(i.chunkFilter) + } + return idx, nil +} + +// TODO(owen-d): how to better implement this? +// setting 0->maxint will force the tsdbmanager to always query +// underlying tsdbs, which is safe, but can we optimize this? +func (i *indexShipperQuerier) Bounds() (model.Time, model.Time) { + return 0, math.MaxInt64 +} + +func (i *indexShipperQuerier) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { + i.chunkFilter = chunkFilter +} + +// Close implements Index.Close, but we offload this responsibility +// to the index shipper +func (i *indexShipperQuerier) Close() error { + return nil +} + +func (i *indexShipperQuerier) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { + idx, err := i.indices(ctx, from, through, userID) + if err != nil { + return nil, err + } + return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...) +} + +func (i *indexShipperQuerier) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { + idx, err := i.indices(ctx, from, through, userID) + if err != nil { + return nil, err + } + return idx.Series(ctx, userID, from, through, res, shard, matchers...) +} + +func (i *indexShipperQuerier) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) { + idx, err := i.indices(ctx, from, through, userID) + if err != nil { + return nil, err + } + return idx.LabelNames(ctx, userID, from, through, matchers...) +} + +func (i *indexShipperQuerier) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) { + idx, err := i.indices(ctx, from, through, userID) + if err != nil { + return nil, err + } + return idx.LabelValues(ctx, userID, from, through, name, matchers...) +} diff --git a/pkg/storage/stores/tsdb/manager.go b/pkg/storage/stores/tsdb/manager.go index f96d6f3b9a..4c8aad779c 100644 --- a/pkg/storage/stores/tsdb/manager.go +++ b/pkg/storage/stores/tsdb/manager.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io/ioutil" - "math" "path/filepath" "strconv" "sync" @@ -16,9 +15,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/indexshipper" - shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" ) @@ -27,13 +25,12 @@ import ( // TSDB files on disk type TSDBManager interface { Start() error - Index // Builds a new TSDB file from a set of WALs BuildFromWALs(time.Time, []WALIdentifier) error } /* -tsdbManager is responsible for: +tsdbManager is used for managing active index and is responsible for: * Turning WALs into optimized multi-tenant TSDBs when requested * Serving reads from these TSDBs * Shipping them to remote storage @@ -49,8 +46,7 @@ type tsdbManager struct { sync.RWMutex - chunkFilter chunk.RequestChunkFilterer - shipper indexshipper.IndexShipper + shipper indexshipper.IndexShipper } func NewTSDBManager( @@ -176,7 +172,7 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error // chunks may overlap index period bounds, in which case they're written to multiple pds := make(map[int]index.ChunkMetas) for _, chk := range chks { - for _, bucket := range indexBuckets(m.indexPeriod, chk.From(), chk.Through()) { + for _, bucket := range indexBuckets(chk.From(), chk.Through()) { pds[bucket] = append(pds[bucket], chk) } } @@ -249,98 +245,11 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error return nil } -func indexBuckets(indexPeriod time.Duration, from, through model.Time) (res []int) { - start := from.Time().UnixNano() / int64(indexPeriod) - end := through.Time().UnixNano() / int64(indexPeriod) +func indexBuckets(from, through model.Time) (res []int) { + start := from.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) + end := through.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) for cur := start; cur <= end; cur++ { res = append(res, int(cur)) } return } - -func (m *tsdbManager) indices(ctx context.Context, from, through model.Time, user string) (Index, error) { - var indices []Index - - // Ensure we query both per tenant and multitenant TSDBs - - for _, bkt := range indexBuckets(m.indexPeriod, from, through) { - if err := m.shipper.ForEach(ctx, fmt.Sprintf("%d", bkt), user, func(idx shipper_index.Index) error { - _, multitenant := parseMultitenantTSDBName(idx.Name()) - impl, ok := idx.(Index) - if !ok { - return fmt.Errorf("unexpected shipper index type: %T", idx) - } - if multitenant { - indices = append(indices, NewMultiTenantIndex(impl)) - } else { - indices = append(indices, impl) - } - return nil - }); err != nil { - return nil, err - } - - } - - if len(indices) == 0 { - return NoopIndex{}, nil - } - idx, err := NewMultiIndex(indices...) - if err != nil { - return nil, err - } - - if m.chunkFilter != nil { - idx.SetChunkFilterer(m.chunkFilter) - } - return idx, nil -} - -// TODO(owen-d): how to better implement this? -// setting 0->maxint will force the tsdbmanager to always query -// underlying tsdbs, which is safe, but can we optimize this? -func (m *tsdbManager) Bounds() (model.Time, model.Time) { - return 0, math.MaxInt64 -} - -func (m *tsdbManager) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { - m.chunkFilter = chunkFilter -} - -// Close implements Index.Close, but we offload this responsibility -// to the index shipper -func (m *tsdbManager) Close() error { - return nil -} - -func (m *tsdbManager) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { - idx, err := m.indices(ctx, from, through, userID) - if err != nil { - return nil, err - } - return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...) -} - -func (m *tsdbManager) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { - idx, err := m.indices(ctx, from, through, userID) - if err != nil { - return nil, err - } - return idx.Series(ctx, userID, from, through, res, shard, matchers...) -} - -func (m *tsdbManager) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) { - idx, err := m.indices(ctx, from, through, userID) - if err != nil { - return nil, err - } - return idx.LabelNames(ctx, userID, from, through, matchers...) -} - -func (m *tsdbManager) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) { - idx, err := m.indices(ctx, from, through, userID) - if err != nil { - return nil, err - } - return idx.LabelValues(ctx, userID, from, through, name, matchers...) -} diff --git a/pkg/storage/stores/tsdb/store.go b/pkg/storage/stores/tsdb/store.go index 286041c2bf..a353f9863c 100644 --- a/pkg/storage/stores/tsdb/store.go +++ b/pkg/storage/stores/tsdb/store.go @@ -1,7 +1,10 @@ package tsdb import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/chunk/fetcher" @@ -10,16 +13,39 @@ import ( "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads" "github.com/grafana/loki/pkg/storage/stores/series" + "github.com/grafana/loki/pkg/storage/stores/tsdb/index" util_log "github.com/grafana/loki/pkg/util/log" ) +// we do not need to build store for each schema config since we do not do any schema specific handling yet. +// If we do need to do schema specific handling, it would be a good idea to abstract away the handling since +// running multiple head managers would be complicated and wasteful. +var storeInstance *store + +type store struct { + indexWriter IndexWriter + indexStore series.IndexStore +} + +// NewStore creates a new store if not initialized already. +// Each call to NewStore will always build a new stores.ChunkWriter even if the store was already initialized since +// fetcher.Fetcher instances could be different due to periodic configs having different types of object storage configured +// for storing chunks. func NewStore(indexShipperCfg indexshipper.Config, p config.PeriodConfig, f *fetcher.Fetcher, objectClient client.ObjectClient, limits downloads.Limits, reg prometheus.Registerer) (stores.ChunkWriter, series.IndexStore, error) { - var ( - nodeName = indexShipperCfg.IngesterName - dir = indexShipperCfg.ActiveIndexDirectory - ) - tsdbMetrics := NewMetrics(reg) + if storeInstance == nil { + storeInstance = &store{} + err := storeInstance.init(indexShipperCfg, p, objectClient, limits, reg) + if err != nil { + return nil, nil, err + } + } + + return NewChunkWriter(f, p, storeInstance.indexWriter), storeInstance.indexStore, nil +} + +func (s *store) init(indexShipperCfg indexshipper.Config, p config.PeriodConfig, + objectClient client.ObjectClient, limits downloads.Limits, reg prometheus.Registerer) error { shpr, err := indexshipper.NewIndexShipper( indexShipperCfg, @@ -30,31 +56,56 @@ func NewStore(indexShipperCfg indexshipper.Config, p config.PeriodConfig, f *fet prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", reg), ) if err != nil { - return nil, nil, err + return err } - tsdbManager := NewTSDBManager( - nodeName, - dir, - shpr, - p.IndexTables.Period, - util_log.Logger, - tsdbMetrics, - ) - // TODO(owen-d): Only need HeadManager - // on the ingester. Otherwise, the TSDBManager is sufficient - headManager := NewHeadManager( - util_log.Logger, - dir, - tsdbMetrics, - tsdbManager, - ) - if err := headManager.Start(); err != nil { - return nil, nil, err + + var indices []Index + + if indexShipperCfg.Mode != indexshipper.ModeReadOnly { + var ( + nodeName = indexShipperCfg.IngesterName + dir = indexShipperCfg.ActiveIndexDirectory + ) + + tsdbMetrics := NewMetrics(reg) + tsdbManager := NewTSDBManager( + nodeName, + dir, + shpr, + p.IndexTables.Period, + util_log.Logger, + tsdbMetrics, + ) + + headManager := NewHeadManager( + util_log.Logger, + dir, + tsdbMetrics, + tsdbManager, + ) + if err := headManager.Start(); err != nil { + return err + } + + s.indexWriter = headManager + indices = append(indices, headManager) + } else { + s.indexWriter = failingIndexWriter{} } - idx := NewIndexClient(headManager, p) - writer := NewChunkWriter(f, p, headManager) - // TODO(owen-d): add TSDB index-gateway support + indices = append(indices, newIndexShipperQuerier(shpr)) + multiIndex, err := NewMultiIndex(indices...) + if err != nil { + return err + } + + s.indexStore = NewIndexClient(multiIndex) + + return nil +} + +type failingIndexWriter struct{} - return writer, idx, nil +func (f failingIndexWriter) Append(_ string, _ labels.Labels, _ index.ChunkMetas) error { + return fmt.Errorf("index writer is not initialized due to tsdb store being initialized in read-only mode") }