From 96a4fc1409338b6f4a3df4e2a4ed195fc7da1990 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Tue, 24 Jan 2023 17:57:07 +0530 Subject: [PATCH] index-store: fix indexing of chunks overlapping multiple schemas (#8251) --- CHANGELOG.md | 1 + pkg/storage/store_test.go | 162 +++++++++++++++++- pkg/storage/stores/index/index.go | 6 +- pkg/storage/stores/indexshipper/shipper.go | 4 + .../series/series_index_gateway_store.go | 2 +- .../stores/series/series_index_store.go | 4 +- pkg/storage/stores/series_store_write.go | 2 +- pkg/storage/stores/series_store_write_test.go | 2 +- pkg/storage/stores/tsdb/store.go | 7 +- 9 files changed, 170 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9271cfec72..ffa76c1fb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ * [7988](https://github.com/grafana/loki/pull/7988) **ashwanthgoli** store: write overlapping chunks to multiple stores. * [7925](https://github.com/grafana/loki/pull/7925) **sandeepsukhani**: Fix bugs in logs results caching causing query-frontend to return logs outside of query window. * [8120](https://github.com/grafana/loki/pull/8120) **ashwanthgoli** fix panic on hitting /scheduler/ring when ring is disabled. +* [8251](https://github.com/grafana/loki/pull/8251) **sandeepsukhani** index-store: fix indexing of chunks overlapping multiple schemas. ##### Changes diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 749701aef3..b7063f897f 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -2,11 +2,15 @@ package storage import ( "context" + "fmt" "log" "math" "net/http" _ "net/http/pprof" + "os" "path" + "path/filepath" + "regexp" "runtime" "testing" "time" @@ -200,7 +204,7 @@ func getLocalStore(cm ClientMetrics) Store { { From: config.DayTime{Time: start}, IndexType: "boltdb", - ObjectType: "filesystem", + ObjectType: config.StorageTypeFileSystem, Schema: "v9", IndexTables: config.PeriodicTableConfig{ Prefix: "index_", @@ -1001,7 +1005,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { boltdbShipperConfig := shipper.Config{} flagext.DefaultValues(&boltdbShipperConfig) boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index") - boltdbShipperConfig.SharedStoreType = "filesystem" + boltdbShipperConfig.SharedStoreType = config.StorageTypeFileSystem boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache") boltdbShipperConfig.Mode = indexshipper.ModeReadWrite @@ -1019,7 +1023,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { { From: config.DayTime{Time: timeToModelTime(firstStoreDate)}, IndexType: "boltdb-shipper", - ObjectType: "filesystem", + ObjectType: config.StorageTypeFileSystem, Schema: "v9", IndexTables: config.PeriodicTableConfig{ Prefix: "index_", @@ -1029,7 +1033,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { { From: config.DayTime{Time: timeToModelTime(secondStoreDate)}, IndexType: "boltdb-shipper", - ObjectType: "filesystem", + ObjectType: config.StorageTypeFileSystem, Schema: "v11", IndexTables: config.PeriodicTableConfig{ Prefix: "index_", @@ -1289,7 +1293,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) { { From: config.DayTime{Time: now.Add(30 * 24 * time.Hour)}, IndexType: config.BoltDBShipperType, - ObjectType: "filesystem", + ObjectType: config.StorageTypeFileSystem, Schema: "v9", IndexTables: config.PeriodicTableConfig{ Prefix: "index_", @@ -1299,7 +1303,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) { { From: config.DayTime{Time: now.Add(20 * 24 * time.Hour)}, IndexType: config.BoltDBShipperType, - ObjectType: "filesystem", + ObjectType: config.StorageTypeFileSystem, Schema: "v11", IndexTables: config.PeriodicTableConfig{ Prefix: "index_", @@ -1310,7 +1314,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) { { From: config.DayTime{Time: now.Add(15 * 24 * time.Hour)}, IndexType: config.TSDBType, - ObjectType: "filesystem", + ObjectType: config.StorageTypeFileSystem, Schema: "v11", IndexTables: config.PeriodicTableConfig{ Prefix: "index_", @@ -1321,7 +1325,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) { { From: config.DayTime{Time: now.Add(10 * 24 * time.Hour)}, IndexType: config.StorageTypeBigTable, - ObjectType: "filesystem", + ObjectType: config.StorageTypeFileSystem, Schema: "v11", IndexTables: config.PeriodicTableConfig{ Prefix: "index_", @@ -1332,7 +1336,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) { { From: config.DayTime{Time: now.Add(5 * 24 * time.Hour)}, IndexType: config.TSDBType, - ObjectType: "filesystem", + ObjectType: config.StorageTypeFileSystem, Schema: "v11", IndexTables: config.PeriodicTableConfig{ Prefix: "index_", @@ -1377,3 +1381,143 @@ func TestGetIndexStoreTableRanges(t *testing.T) { }, }, getIndexStoreTableRanges(config.TSDBType, schemaConfig.Configs)) } + +func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) { + tempDir := t.TempDir() + + ingesterName := "ingester-1" + limits, err := validation.NewOverrides(validation.Limits{}, nil) + require.NoError(t, err) + + // config for BoltDB Shipper + boltdbShipperConfig := shipper.Config{} + flagext.DefaultValues(&boltdbShipperConfig) + boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index") + boltdbShipperConfig.SharedStoreType = config.StorageTypeFileSystem + boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache") + boltdbShipperConfig.Mode = indexshipper.ModeReadWrite + boltdbShipperConfig.IngesterName = ingesterName + + // config for tsdb Shipper + tsdbShipperConfig := indexshipper.Config{} + flagext.DefaultValues(&tsdbShipperConfig) + tsdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "tsdb-index") + tsdbShipperConfig.SharedStoreType = config.StorageTypeFileSystem + tsdbShipperConfig.CacheLocation = path.Join(tempDir, "tsdb-shipper-cache") + tsdbShipperConfig.Mode = indexshipper.ModeReadWrite + tsdbShipperConfig.IngesterName = ingesterName + + // dates for activation of boltdb shippers + boltdbShipperStartDate := parseDate("2019-01-01") + tsdbStartDate := parseDate("2019-01-02") + + cfg := Config{ + FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")}, + BoltDBShipperConfig: boltdbShipperConfig, + TSDBShipperConfig: tsdbShipperConfig, + } + + schemaConfig := config.SchemaConfig{ + Configs: []config.PeriodConfig{ + { + From: config.DayTime{Time: timeToModelTime(boltdbShipperStartDate)}, + IndexType: "boltdb-shipper", + ObjectType: config.StorageTypeFileSystem, + Schema: "v12", + IndexTables: config.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + RowShards: 2, + }, + { + From: config.DayTime{Time: timeToModelTime(tsdbStartDate)}, + IndexType: "tsdb", + ObjectType: config.StorageTypeFileSystem, + Schema: "v12", + IndexTables: config.PeriodicTableConfig{ + Prefix: "index_", + Period: time.Hour * 24, + }, + }, + }, + } + + ResetBoltDBIndexClientWithShipper() + store, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger) + require.NoError(t, err) + + // time ranges adding a chunk for each store and a chunk which overlaps both the stores + chunksToBuildForTimeRanges := []timeRange{ + { + // chunk just for first store + tsdbStartDate.Add(-3 * time.Hour), + tsdbStartDate.Add(-2 * time.Hour), + }, + { + // chunk overlapping both the stores + tsdbStartDate.Add(-time.Hour), + tsdbStartDate.Add(time.Hour), + }, + { + // chunk just for second store + tsdbStartDate.Add(2 * time.Hour), + tsdbStartDate.Add(3 * time.Hour), + }, + } + + // build and add chunks to the store + addedChunkIDs := map[string]struct{}{} + for _, tr := range chunksToBuildForTimeRanges { + chk := newChunk(buildTestStreams(fooLabelsWithName, tr)) + + err := store.PutOne(ctx, chk.From, chk.Through, chk) + require.NoError(t, err) + + addedChunkIDs[schemaConfig.ExternalKey(chk.ChunkRef)] = struct{}{} + } + + // recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup. + store.Stop() + + // there should be 2 index tables in the object storage + indexTables, err := os.ReadDir(filepath.Join(cfg.FSConfig.Directory, "index")) + require.NoError(t, err) + require.Len(t, indexTables, 2) + require.Equal(t, "index_17897", indexTables[0].Name()) + require.Equal(t, "index_17898", indexTables[1].Name()) + + // there should be just 1 file in each table in the object storage + boltdbFiles, err := os.ReadDir(filepath.Join(cfg.FSConfig.Directory, "index", indexTables[0].Name())) + require.NoError(t, err) + require.Len(t, boltdbFiles, 1) + require.Regexp(t, regexp.MustCompile(fmt.Sprintf(`%s-\d{19}-\d{10}\.gz`, ingesterName)), boltdbFiles[0].Name()) + + tsdbFiles, err := os.ReadDir(filepath.Join(cfg.FSConfig.Directory, "index", indexTables[1].Name())) + require.NoError(t, err) + require.Len(t, tsdbFiles, 1) + require.Regexp(t, regexp.MustCompile(fmt.Sprintf(`\d{10}-%s\.tsdb\.gz`, ingesterName)), tsdbFiles[0].Name()) + + store, err = NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger) + require.NoError(t, err) + + defer store.Stop() + + // get all the chunks from both the stores + chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(boltdbShipperStartDate), timeToModelTime(tsdbStartDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) + require.NoError(t, err) + var totalChunks int + for _, chks := range chunks { + totalChunks += len(chks) + } + // we get common chunk twice because it is indexed in both the stores + require.Equal(t, totalChunks, len(addedChunkIDs)+1) + + // check whether we got back all the chunks which were added + for i := range chunks { + for _, c := range chunks[i] { + _, ok := addedChunkIDs[schemaConfig.ExternalKey(c.ChunkRef)] + require.True(t, ok) + } + } +} diff --git a/pkg/storage/stores/index/index.go b/pkg/storage/stores/index/index.go index 264e7392e8..b9be68d05a 100644 --- a/pkg/storage/stores/index/index.go +++ b/pkg/storage/stores/index/index.go @@ -26,7 +26,7 @@ type Reader interface { } type Writer interface { - IndexChunk(ctx context.Context, chk chunk.Chunk) error + IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error } type ReaderWriter interface { @@ -117,8 +117,8 @@ func (m monitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFi m.rw.SetChunkFilterer(chunkFilter) } -func (m monitoredReaderWriter) IndexChunk(ctx context.Context, chk chunk.Chunk) error { +func (m monitoredReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error { return instrument.CollectedRequest(ctx, "index_chunk", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { - return m.rw.IndexChunk(ctx, chk) + return m.rw.IndexChunk(ctx, from, through, chk) }) } diff --git a/pkg/storage/stores/indexshipper/shipper.go b/pkg/storage/stores/indexshipper/shipper.go index 5ee8dd3f8d..14d170659a 100644 --- a/pkg/storage/stores/indexshipper/shipper.go +++ b/pkg/storage/stores/indexshipper/shipper.go @@ -70,6 +70,10 @@ type Config struct { IngesterDBRetainPeriod time.Duration } +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("", f) +} + // RegisterFlagsWithPrefix registers flags. func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.IndexGatewayClientConfig.RegisterFlagsWithPrefix(prefix+"shipper.index-gateway-client", f) diff --git a/pkg/storage/stores/series/series_index_gateway_store.go b/pkg/storage/stores/series/series_index_gateway_store.go index b6ccc3363e..8b9d374c96 100644 --- a/pkg/storage/stores/series/series_index_gateway_store.go +++ b/pkg/storage/stores/series/series_index_gateway_store.go @@ -146,7 +146,7 @@ func (c *IndexGatewayClientStore) SetChunkFilterer(chunkFilter chunk.RequestChun } } -func (c *IndexGatewayClientStore) IndexChunk(ctx context.Context, chk chunk.Chunk) error { +func (c *IndexGatewayClientStore) IndexChunk(_ context.Context, _, _ model.Time, _ chunk.Chunk) error { return fmt.Errorf("index writes not supported on index gateway client") } diff --git a/pkg/storage/stores/series/series_index_store.go b/pkg/storage/stores/series/series_index_store.go index c0ca13e11e..043344e0e1 100644 --- a/pkg/storage/stores/series/series_index_store.go +++ b/pkg/storage/stores/series/series_index_store.go @@ -82,8 +82,8 @@ func NewIndexReaderWriter(schemaCfg config.SchemaConfig, schema series_index.Ser } } -func (c *indexReaderWriter) IndexChunk(ctx context.Context, chk chunk.Chunk) error { - writeReqs, keysToCache, err := c.calculateIndexEntries(ctx, chk.From, chk.Through, chk) +func (c *indexReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error { + writeReqs, keysToCache, err := c.calculateIndexEntries(ctx, from, through, chk) if err != nil { return err } diff --git a/pkg/storage/stores/series_store_write.go b/pkg/storage/stores/series_store_write.go index f9b5648fb6..3401fdbaed 100644 --- a/pkg/storage/stores/series_store_write.go +++ b/pkg/storage/stores/series_store_write.go @@ -97,7 +97,7 @@ func (c *Writer) PutOne(ctx context.Context, from, through model.Time, chk chunk } } - if err := c.indexWriter.IndexChunk(ctx, chk); err != nil { + if err := c.indexWriter.IndexChunk(ctx, from, through, chk); err != nil { return err } diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index b7576bc258..b01d08921c 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -46,7 +46,7 @@ type mockIndexWriter struct { called int } -func (m *mockIndexWriter) IndexChunk(ctx context.Context, chk chunk.Chunk) error { +func (m *mockIndexWriter) IndexChunk(_ context.Context, _, _ model.Time, _ chunk.Chunk) error { m.called++ return nil } diff --git a/pkg/storage/stores/tsdb/store.go b/pkg/storage/stores/tsdb/store.go index e73bcbdd00..f9c0f8576d 100644 --- a/pkg/storage/stores/tsdb/store.go +++ b/pkg/storage/stores/tsdb/store.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/chunk" @@ -183,7 +184,7 @@ func (s *store) Stop() { }) } -func (s *store) IndexChunk(ctx context.Context, chk chunk.Chunk) error { +func (s *store) IndexChunk(ctx context.Context, from model.Time, through model.Time, chk chunk.Chunk) error { // Always write the index to benefit durability via replication factor. approxKB := math.Round(float64(chk.Data.UncompressedSize()) / float64(1<<10)) metas := tsdb_index.ChunkMetas{ @@ -199,7 +200,7 @@ func (s *store) IndexChunk(ctx context.Context, chk chunk.Chunk) error { return errors.Wrap(err, "writing index entry") } - return s.backupIndexWriter.IndexChunk(ctx, chk) + return s.backupIndexWriter.IndexChunk(ctx, from, through, chk) } type failingIndexWriter struct{} @@ -210,6 +211,6 @@ func (f failingIndexWriter) Append(_ string, _ labels.Labels, _ uint64, _ tsdb_i type noopBackupIndexWriter struct{} -func (n noopBackupIndexWriter) IndexChunk(ctx context.Context, chk chunk.Chunk) error { +func (n noopBackupIndexWriter) IndexChunk(_ context.Context, _, _ model.Time, _ chunk.Chunk) error { return nil }