make tsdb store instance a singleton (#6273)

* make tsdb store instance a singleton

* make it mandatory to have 24h index period config for tsdb
* avoid initializing head and tsdb manager when running tsdb store in read only mode
Sandeep Sukhani committed by GitHub
parent 3c1accccb1
commit fefc33debc
  1. pkg/storage/config/schema_config.go (14 changed lines)
  2. pkg/storage/stores/tsdb/head_manager.go (2 changed lines)
  3. pkg/storage/stores/tsdb/head_manager_test.go (8 changed lines)
  4. pkg/storage/stores/tsdb/index_client.go (9 changed lines)
  5. pkg/storage/stores/tsdb/index_shipper_querier.go (112 changed lines)
  6. pkg/storage/stores/tsdb/manager.go (105 changed lines)
  7. pkg/storage/stores/tsdb/store.go (107 changed lines)

pkg/storage/config/schema_config.go

@@ -42,6 +42,9 @@ const (
 	// BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage
 	BoltDBShipperType = "boltdb-shipper"
 	TSDBType          = "tsdb"
+
+	// ObjectStorageIndexRequiredPeriod defines the required index period for object storage based index stores like boltdb-shipper and tsdb
+	ObjectStorageIndexRequiredPeriod = 24 * time.Hour
 )

 var (
@@ -53,6 +56,7 @@ var (
 	errCurrentBoltdbShipperNon24Hours  = errors.New("boltdb-shipper works best with 24h periodic index config. Either add a new config with future date set to 24h to retain the existing index or change the existing config to use 24h period")
 	errUpcomingBoltdbShipperNon24Hours = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h")
+	errTSDBNon24HoursIndexPeriod       = errors.New("tsdb must always have periodic config for index set to 24h")
 	errZeroLengthConfig                = errors.New("must specify at least one schema configuration")
 )
@@ -149,12 +153,14 @@ func (cfg *SchemaConfig) Validate() error {
 	activePCIndex := ActivePeriodConfig((*cfg).Configs)

 	// if current index type is boltdb-shipper and there are no upcoming index types then it should be set to 24 hours.
-	if cfg.Configs[activePCIndex].IndexType == BoltDBShipperType && cfg.Configs[activePCIndex].IndexTables.Period != 24*time.Hour && len(cfg.Configs)-1 == activePCIndex {
+	if cfg.Configs[activePCIndex].IndexType == BoltDBShipperType &&
+		cfg.Configs[activePCIndex].IndexTables.Period != ObjectStorageIndexRequiredPeriod && len(cfg.Configs)-1 == activePCIndex {
 		return errCurrentBoltdbShipperNon24Hours
 	}

 	// if upcoming index type is boltdb-shipper, it should always be set to 24 hours.
-	if len(cfg.Configs)-1 > activePCIndex && (cfg.Configs[activePCIndex+1].IndexType == BoltDBShipperType && cfg.Configs[activePCIndex+1].IndexTables.Period != 24*time.Hour) {
+	if len(cfg.Configs)-1 > activePCIndex && (cfg.Configs[activePCIndex+1].IndexType == BoltDBShipperType &&
+		cfg.Configs[activePCIndex+1].IndexTables.Period != ObjectStorageIndexRequiredPeriod) {
 		return errUpcomingBoltdbShipperNon24Hours
 	}
@@ -282,6 +288,10 @@ func (cfg PeriodConfig) validate() error {
 		return validateError
 	}

+	if cfg.IndexType == TSDBType && cfg.IndexTables.Period != ObjectStorageIndexRequiredPeriod {
+		return errTSDBNon24HoursIndexPeriod
+	}
+
 	// Ensure the tables period is a multiple of the bucket period
 	if cfg.IndexTables.Period > 0 && cfg.IndexTables.Period%(24*time.Hour) != 0 {
 		return errInvalidTablePeriod
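
Taken together, these hunks turn a non-24h index period for tsdb into a hard validation error, mirroring the existing boltdb-shipper checks. A minimal standalone sketch of the added rule, using a hypothetical periodConfig stand-in rather than Loki's real config.PeriodConfig:

package main

import (
	"errors"
	"fmt"
	"time"
)

// Mirrors the ObjectStorageIndexRequiredPeriod constant added above.
const objectStorageIndexRequiredPeriod = 24 * time.Hour

var errTSDBNon24HoursIndexPeriod = errors.New("tsdb must always have periodic config for index set to 24h")

// periodConfig is a hypothetical stand-in for Loki's config.PeriodConfig.
type periodConfig struct {
	indexType   string
	indexPeriod time.Duration
}

// validate applies the same rule the diff adds to PeriodConfig.validate().
func validate(cfg periodConfig) error {
	if cfg.indexType == "tsdb" && cfg.indexPeriod != objectStorageIndexRequiredPeriod {
		return errTSDBNon24HoursIndexPeriod
	}
	return nil
}

func main() {
	fmt.Println(validate(periodConfig{indexType: "tsdb", indexPeriod: 168 * time.Hour})) // rejected
	fmt.Println(validate(periodConfig{indexType: "tsdb", indexPeriod: 24 * time.Hour}))  // <nil>
}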

pkg/storage/stores/tsdb/head_manager.go

@@ -127,8 +127,6 @@ func NewHeadManager(logger log.Logger, dir string, metrics *Metrics, tsdbManager
 			indices = append(indices, m.activeHeads)
 		}
-		indices = append(indices, m.tsdbManager)
-
 		return NewMultiIndex(indices...)
 	})

pkg/storage/stores/tsdb/head_manager_test.go

@@ -272,9 +272,13 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
 	// Start, ensuring recovery from old WALs
 	require.Nil(t, mgr.Start())
+
 	// Ensure old WAL data is queryable
+	multiIndex, err := NewMultiIndex(mgr, mgr.tsdbManager.(noopTSDBManager).tenantHeads)
+	require.Nil(t, err)
+
 	for _, c := range cases {
-		refs, err := mgr.GetChunkRefs(
+		refs, err := multiIndex.GetChunkRefs(
 			context.Background(),
 			c.User,
 			0, math.MaxInt64,
@@ -309,7 +313,7 @@ func Test_HeadManager_Lifecycle(t *testing.T) {
 	// Ensure old + new data is queryable
 	for _, c := range append(cases, newCase) {
-		refs, err := mgr.GetChunkRefs(
+		refs, err := multiIndex.GetChunkRefs(
 			context.Background(),
 			c.User,
 			0, math.MaxInt64,

pkg/storage/stores/tsdb/index_client.go

@@ -9,21 +9,16 @@ import (
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/querier/astmapper"
 	"github.com/grafana/loki/pkg/storage/chunk"
-	"github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
 )

 // implements stores.Index
 type IndexClient struct {
-	schema config.SchemaConfig
-	idx    Index
+	idx Index
 }

-func NewIndexClient(idx Index, pd config.PeriodConfig) *IndexClient {
+func NewIndexClient(idx Index) *IndexClient {
 	return &IndexClient{
-		schema: config.SchemaConfig{
-			Configs: []config.PeriodConfig{pd},
-		},
 		idx: idx,
 	}
 }

pkg/storage/stores/tsdb/index_shipper_querier.go

@@ -0,0 +1,112 @@
+package tsdb
+
+import (
+	"context"
+	"fmt"
+	"math"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+
+	"github.com/grafana/loki/pkg/storage/chunk"
+	"github.com/grafana/loki/pkg/storage/stores/indexshipper"
+	shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
+	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
+)
+
+// indexShipperQuerier is used for querying index from the shipper.
+type indexShipperQuerier struct {
+	shipper     indexshipper.IndexShipper
+	chunkFilter chunk.RequestChunkFilterer
+}
+
+func newIndexShipperQuerier(shipper indexshipper.IndexShipper) Index {
+	return &indexShipperQuerier{shipper: shipper}
+}
+
+func (i *indexShipperQuerier) indices(ctx context.Context, from, through model.Time, user string) (Index, error) {
+	var indices []Index
+
+	// Ensure we query both per tenant and multitenant TSDBs
+	for _, bkt := range indexBuckets(from, through) {
+		if err := i.shipper.ForEach(ctx, fmt.Sprintf("%d", bkt), user, func(idx shipper_index.Index) error {
+			_, multitenant := parseMultitenantTSDBName(idx.Name())
+			impl, ok := idx.(Index)
+			if !ok {
+				return fmt.Errorf("unexpected shipper index type: %T", idx)
+			}
+			if multitenant {
+				indices = append(indices, NewMultiTenantIndex(impl))
+			} else {
+				indices = append(indices, impl)
+			}
+			return nil
+		}); err != nil {
+			return nil, err
+		}
+	}
+
+	if len(indices) == 0 {
+		return NoopIndex{}, nil
+	}
+	idx, err := NewMultiIndex(indices...)
+	if err != nil {
+		return nil, err
+	}
+
+	if i.chunkFilter != nil {
+		idx.SetChunkFilterer(i.chunkFilter)
+	}
+
+	return idx, nil
+}
+
+// TODO(owen-d): how to better implement this?
+// setting 0->maxint will force the tsdbmanager to always query
+// underlying tsdbs, which is safe, but can we optimize this?
+func (i *indexShipperQuerier) Bounds() (model.Time, model.Time) {
+	return 0, math.MaxInt64
+}
+
+func (i *indexShipperQuerier) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
+	i.chunkFilter = chunkFilter
+}
+
+// Close implements Index.Close, but we offload this responsibility
+// to the index shipper
+func (i *indexShipperQuerier) Close() error {
+	return nil
+}
+
+func (i *indexShipperQuerier) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
+	idx, err := i.indices(ctx, from, through, userID)
+	if err != nil {
+		return nil, err
+	}
+	return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...)
+}
+
+func (i *indexShipperQuerier) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
+	idx, err := i.indices(ctx, from, through, userID)
+	if err != nil {
+		return nil, err
+	}
+	return idx.Series(ctx, userID, from, through, res, shard, matchers...)
+}
+
+func (i *indexShipperQuerier) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
+	idx, err := i.indices(ctx, from, through, userID)
+	if err != nil {
+		return nil, err
+	}
+	return idx.LabelNames(ctx, userID, from, through, matchers...)
+}
+
+func (i *indexShipperQuerier) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
+	idx, err := i.indices(ctx, from, through, userID)
+	if err != nil {
+		return nil, err
+	}
+	return idx.LabelValues(ctx, userID, from, through, name, matchers...)
+}
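
The querier above assembles a fresh combined index per request instead of caching one, since the set of TSDB files the shipper has downloaded can change between queries, and it degrades to a no-op index when nothing matches. A toy sketch of that fan-out-and-merge shape, with a hypothetical one-method Index interface standing in for Loki's:

package main

import "fmt"

// Index is a toy stand-in for the tsdb.Index interface in the diff.
type Index interface {
	LabelNames() []string
}

type memIndex struct{ names []string }

func (m memIndex) LabelNames() []string { return m.names }

// noopIndex mirrors the role of NoopIndex: returned when no TSDB
// files exist for the queried buckets.
type noopIndex struct{}

func (noopIndex) LabelNames() []string { return nil }

// multiIndex mirrors NewMultiIndex: fan a call out to every
// underlying index and merge the results.
type multiIndex struct{ indices []Index }

func (m multiIndex) LabelNames() []string {
	seen := map[string]struct{}{}
	var out []string
	for _, idx := range m.indices {
		for _, n := range idx.LabelNames() {
			if _, ok := seen[n]; !ok {
				seen[n] = struct{}{}
				out = append(out, n)
			}
		}
	}
	return out
}

// indices mirrors indexShipperQuerier.indices: collect whatever is
// available for the request, falling back to a no-op index.
func indices(available []Index) Index {
	if len(available) == 0 {
		return noopIndex{}
	}
	return multiIndex{indices: available}
}

func main() {
	a := memIndex{names: []string{"app", "env"}}
	b := memIndex{names: []string{"env", "pod"}}
	fmt.Println(indices([]Index{a, b}).LabelNames()) // [app env pod]
	fmt.Println(indices(nil).LabelNames())           // []
}

As in the real indices method, an empty bucket range is not an error for callers; it just yields empty results.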

pkg/storage/stores/tsdb/manager.go

@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"io/ioutil"
-	"math"
 	"path/filepath"
 	"strconv"
 	"sync"
@@ -16,9 +15,8 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"

-	"github.com/grafana/loki/pkg/storage/chunk"
+	"github.com/grafana/loki/pkg/storage/config"
 	"github.com/grafana/loki/pkg/storage/stores/indexshipper"
-	shipper_index "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
 	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
 )
@@ -27,13 +25,12 @@ import (
 // TSDB files on disk
 type TSDBManager interface {
 	Start() error
-	Index
 	// Builds a new TSDB file from a set of WALs
 	BuildFromWALs(time.Time, []WALIdentifier) error
 }

 /*
-tsdbManager is responsible for:
+tsdbManager is used for managing active index and is responsible for:
  * Turning WALs into optimized multi-tenant TSDBs when requested
  * Serving reads from these TSDBs
  * Shipping them to remote storage
@@ -49,8 +46,7 @@ type tsdbManager struct {
 	sync.RWMutex

-	chunkFilter chunk.RequestChunkFilterer
-	shipper     indexshipper.IndexShipper
+	shipper indexshipper.IndexShipper
 }

 func NewTSDBManager(
@@ -176,7 +172,7 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error
 		// chunks may overlap index period bounds, in which case they're written to multiple
 		pds := make(map[int]index.ChunkMetas)
 		for _, chk := range chks {
-			for _, bucket := range indexBuckets(m.indexPeriod, chk.From(), chk.Through()) {
+			for _, bucket := range indexBuckets(chk.From(), chk.Through()) {
 				pds[bucket] = append(pds[bucket], chk)
 			}
 		}
@@ -249,98 +245,11 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier) (err error
 	return nil
 }

-func indexBuckets(indexPeriod time.Duration, from, through model.Time) (res []int) {
-	start := from.Time().UnixNano() / int64(indexPeriod)
-	end := through.Time().UnixNano() / int64(indexPeriod)
+func indexBuckets(from, through model.Time) (res []int) {
+	start := from.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod)
+	end := through.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod)
 	for cur := start; cur <= end; cur++ {
 		res = append(res, int(cur))
 	}
 	return
 }
-
-func (m *tsdbManager) indices(ctx context.Context, from, through model.Time, user string) (Index, error) {
-	var indices []Index
-
-	// Ensure we query both per tenant and multitenant TSDBs
-	for _, bkt := range indexBuckets(m.indexPeriod, from, through) {
-		if err := m.shipper.ForEach(ctx, fmt.Sprintf("%d", bkt), user, func(idx shipper_index.Index) error {
-			_, multitenant := parseMultitenantTSDBName(idx.Name())
-			impl, ok := idx.(Index)
-			if !ok {
-				return fmt.Errorf("unexpected shipper index type: %T", idx)
-			}
-			if multitenant {
-				indices = append(indices, NewMultiTenantIndex(impl))
-			} else {
-				indices = append(indices, impl)
-			}
-			return nil
-		}); err != nil {
-			return nil, err
-		}
-	}
-
-	if len(indices) == 0 {
-		return NoopIndex{}, nil
-	}
-	idx, err := NewMultiIndex(indices...)
-	if err != nil {
-		return nil, err
-	}
-
-	if m.chunkFilter != nil {
-		idx.SetChunkFilterer(m.chunkFilter)
-	}
-
-	return idx, nil
-}
-
-// TODO(owen-d): how to better implement this?
-// setting 0->maxint will force the tsdbmanager to always query
-// underlying tsdbs, which is safe, but can we optimize this?
-func (m *tsdbManager) Bounds() (model.Time, model.Time) {
-	return 0, math.MaxInt64
-}
-
-func (m *tsdbManager) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
-	m.chunkFilter = chunkFilter
-}
-
-// Close implements Index.Close, but we offload this responsibility
-// to the index shipper
-func (m *tsdbManager) Close() error {
-	return nil
-}
-
-func (m *tsdbManager) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
-	idx, err := m.indices(ctx, from, through, userID)
-	if err != nil {
-		return nil, err
-	}
-	return idx.GetChunkRefs(ctx, userID, from, through, res, shard, matchers...)
-}
-
-func (m *tsdbManager) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
-	idx, err := m.indices(ctx, from, through, userID)
-	if err != nil {
-		return nil, err
-	}
-	return idx.Series(ctx, userID, from, through, res, shard, matchers...)
-}
-
-func (m *tsdbManager) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
-	idx, err := m.indices(ctx, from, through, userID)
-	if err != nil {
-		return nil, err
-	}
-	return idx.LabelNames(ctx, userID, from, through, matchers...)
-}
-
-func (m *tsdbManager) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
-	idx, err := m.indices(ctx, from, through, userID)
-	if err != nil {
-		return nil, err
-	}
-	return idx.LabelValues(ctx, userID, from, through, name, matchers...)
-}
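
With the period now pinned to config.ObjectStorageIndexRequiredPeriod, indexBuckets reduces to days-since-Unix-epoch arithmetic, and a chunk whose [From, Through] range crosses a UTC midnight lands in two buckets, which matches the comment in BuildFromWALs that such chunks are written to multiple per-period TSDBs. A self-contained sketch of the arithmetic, using plain time.Time in place of model.Time:

package main

import (
	"fmt"
	"time"
)

// indexBuckets mirrors the diff's implementation: one bucket per
// 24h window since the Unix epoch, inclusive of both endpoints.
func indexBuckets(from, through time.Time) (res []int) {
	const period = 24 * time.Hour
	start := from.UnixNano() / int64(period)
	end := through.UnixNano() / int64(period)
	for cur := start; cur <= end; cur++ {
		res = append(res, int(cur))
	}
	return
}

func main() {
	from := time.Date(2022, 6, 1, 22, 0, 0, 0, time.UTC)
	through := time.Date(2022, 6, 2, 2, 0, 0, 0, time.UTC)
	// The 4h range crosses a bucket boundary, so the shipper is
	// queried for two buckets: [19144 19145].
	fmt.Println(indexBuckets(from, through))
}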

pkg/storage/stores/tsdb/store.go

@@ -1,7 +1,10 @@
 package tsdb

 import (
+	"fmt"
+
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/model/labels"

 	"github.com/grafana/loki/pkg/storage/chunk/client"
 	"github.com/grafana/loki/pkg/storage/chunk/fetcher"
@@ -10,16 +13,39 @@ import (
 	"github.com/grafana/loki/pkg/storage/stores/indexshipper"
 	"github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads"
 	"github.com/grafana/loki/pkg/storage/stores/series"
+	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
 	util_log "github.com/grafana/loki/pkg/util/log"
 )

+// we do not need to build store for each schema config since we do not do any schema specific handling yet.
+// If we do need to do schema specific handling, it would be a good idea to abstract away the handling since
+// running multiple head managers would be complicated and wasteful.
+var storeInstance *store
+
+type store struct {
+	indexWriter IndexWriter
+	indexStore  series.IndexStore
+}
+
+// NewStore creates a new store if not initialized already.
+// Each call to NewStore will always build a new stores.ChunkWriter even if the store was already initialized since
+// fetcher.Fetcher instances could be different due to periodic configs having different types of object storage configured
+// for storing chunks.
 func NewStore(indexShipperCfg indexshipper.Config, p config.PeriodConfig, f *fetcher.Fetcher,
 	objectClient client.ObjectClient, limits downloads.Limits, reg prometheus.Registerer) (stores.ChunkWriter, series.IndexStore, error) {
-	var (
-		nodeName = indexShipperCfg.IngesterName
-		dir      = indexShipperCfg.ActiveIndexDirectory
-	)
-
-	tsdbMetrics := NewMetrics(reg)
+	if storeInstance == nil {
+		storeInstance = &store{}
+		err := storeInstance.init(indexShipperCfg, p, objectClient, limits, reg)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
+
+	return NewChunkWriter(f, p, storeInstance.indexWriter), storeInstance.indexStore, nil
+}

+func (s *store) init(indexShipperCfg indexshipper.Config, p config.PeriodConfig,
+	objectClient client.ObjectClient, limits downloads.Limits, reg prometheus.Registerer) error {
 	shpr, err := indexshipper.NewIndexShipper(
 		indexShipperCfg,
@@ -30,31 +56,56 @@ func NewStore(indexShipperCfg indexshipper.Config, p config.PeriodConfig, f *fetcher.Fetcher,
 		prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", reg),
 	)
 	if err != nil {
-		return nil, nil, err
+		return err
 	}

-	tsdbManager := NewTSDBManager(
-		nodeName,
-		dir,
-		shpr,
-		p.IndexTables.Period,
-		util_log.Logger,
-		tsdbMetrics,
-	)
-	// TODO(owen-d): Only need HeadManager
-	// on the ingester. Otherwise, the TSDBManager is sufficient
-	headManager := NewHeadManager(
-		util_log.Logger,
-		dir,
-		tsdbMetrics,
-		tsdbManager,
-	)
-	if err := headManager.Start(); err != nil {
-		return nil, nil, err
+	var indices []Index
+	if indexShipperCfg.Mode != indexshipper.ModeReadOnly {
+		var (
+			nodeName = indexShipperCfg.IngesterName
+			dir      = indexShipperCfg.ActiveIndexDirectory
+		)
+
+		tsdbMetrics := NewMetrics(reg)
+		tsdbManager := NewTSDBManager(
+			nodeName,
+			dir,
+			shpr,
+			p.IndexTables.Period,
+			util_log.Logger,
+			tsdbMetrics,
+		)
+
+		headManager := NewHeadManager(
+			util_log.Logger,
+			dir,
+			tsdbMetrics,
+			tsdbManager,
+		)
+		if err := headManager.Start(); err != nil {
+			return err
+		}
+
+		s.indexWriter = headManager
+		indices = append(indices, headManager)
+	} else {
+		s.indexWriter = failingIndexWriter{}
 	}

-	idx := NewIndexClient(headManager, p)
-	writer := NewChunkWriter(f, p, headManager)
+	// TODO(owen-d): add TSDB index-gateway support
+	indices = append(indices, newIndexShipperQuerier(shpr))
+	multiIndex, err := NewMultiIndex(indices...)
+	if err != nil {
+		return err
+	}

-	return writer, idx, nil
+	s.indexStore = NewIndexClient(multiIndex)
+	return nil
 }
+
+type failingIndexWriter struct{}
+
+func (f failingIndexWriter) Append(_ string, _ labels.Labels, _ index.ChunkMetas) error {
+	return fmt.Errorf("index writer is not initialized due to tsdb store being initialized in read-only mode")
+}
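
The singleton above is a plain nil check on a package-level pointer rather than a sync.Once: store construction is assumed to happen sequentially while Loki wires up its schema periods, and only the ChunkWriter is rebuilt on each call because every period can carry a different fetcher.Fetcher. A stripped-down sketch of that shape, with hypothetical stand-in types:

package main

import "fmt"

// store holds the parts that are expensive to build and must be
// shared: in the diff, the index writer and the index store.
type store struct{ indexStore string }

var storeInstance *store

// chunkWriter is rebuilt per call since its fetcher can differ
// between schema periods.
type chunkWriter struct {
	fetcher string
	idx     *store
}

// newStore mirrors NewStore in the diff: initialize the shared
// state only once, but always hand back a fresh writer.
func newStore(fetcher string) (chunkWriter, *store) {
	if storeInstance == nil {
		storeInstance = &store{indexStore: "tsdb-index"}
		fmt.Println("initialized store singleton")
	}
	return chunkWriter{fetcher: fetcher, idx: storeInstance}, storeInstance
}

func main() {
	w1, s1 := newStore("gcs-fetcher")
	w2, s2 := newStore("s3-fetcher")
	fmt.Println(s1 == s2)               // true: same shared index store
	fmt.Println(w1.fetcher, w2.fetcher) // distinct writers per call
}

If stores were ever constructed concurrently, the bare nil check would race; guarding initialization with sync.Once would be the natural hardening.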
