index-store: fix indexing of chunks overlapping multiple schemas (#8251)

pull/8257/head
Sandeep Sukhani 3 years ago committed by GitHub
parent a183d6308e
commit 96a4fc1409
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 162
      pkg/storage/store_test.go
  3. 6
      pkg/storage/stores/index/index.go
  4. 4
      pkg/storage/stores/indexshipper/shipper.go
  5. 2
      pkg/storage/stores/series/series_index_gateway_store.go
  6. 4
      pkg/storage/stores/series/series_index_store.go
  7. 2
      pkg/storage/stores/series_store_write.go
  8. 2
      pkg/storage/stores/series_store_write_test.go
  9. 7
      pkg/storage/stores/tsdb/store.go

@ -41,6 +41,7 @@
* [7988](https://github.com/grafana/loki/pull/7988) **ashwanthgoli** store: write overlapping chunks to multiple stores.
* [7925](https://github.com/grafana/loki/pull/7925) **sandeepsukhani**: Fix bugs in logs results caching causing query-frontend to return logs outside of query window.
* [8120](https://github.com/grafana/loki/pull/8120) **ashwanthgoli** fix panic on hitting /scheduler/ring when ring is disabled.
* [8251](https://github.com/grafana/loki/pull/8251) **sandeepsukhani**: index-store: fix indexing of chunks overlapping multiple schemas.
##### Changes

@ -2,11 +2,15 @@ package storage
import (
"context"
"fmt"
"log"
"math"
"net/http"
_ "net/http/pprof"
"os"
"path"
"path/filepath"
"regexp"
"runtime"
"testing"
"time"
@ -200,7 +204,7 @@ func getLocalStore(cm ClientMetrics) Store {
{
From: config.DayTime{Time: start},
IndexType: "boltdb",
ObjectType: "filesystem",
ObjectType: config.StorageTypeFileSystem,
Schema: "v9",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
@ -1001,7 +1005,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
boltdbShipperConfig := shipper.Config{}
flagext.DefaultValues(&boltdbShipperConfig)
boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
boltdbShipperConfig.SharedStoreType = "filesystem"
boltdbShipperConfig.SharedStoreType = config.StorageTypeFileSystem
boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache")
boltdbShipperConfig.Mode = indexshipper.ModeReadWrite
@ -1019,7 +1023,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
{
From: config.DayTime{Time: timeToModelTime(firstStoreDate)},
IndexType: "boltdb-shipper",
ObjectType: "filesystem",
ObjectType: config.StorageTypeFileSystem,
Schema: "v9",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
@ -1029,7 +1033,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
{
From: config.DayTime{Time: timeToModelTime(secondStoreDate)},
IndexType: "boltdb-shipper",
ObjectType: "filesystem",
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
@ -1289,7 +1293,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) {
{
From: config.DayTime{Time: now.Add(30 * 24 * time.Hour)},
IndexType: config.BoltDBShipperType,
ObjectType: "filesystem",
ObjectType: config.StorageTypeFileSystem,
Schema: "v9",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
@ -1299,7 +1303,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) {
{
From: config.DayTime{Time: now.Add(20 * 24 * time.Hour)},
IndexType: config.BoltDBShipperType,
ObjectType: "filesystem",
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
@ -1310,7 +1314,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) {
{
From: config.DayTime{Time: now.Add(15 * 24 * time.Hour)},
IndexType: config.TSDBType,
ObjectType: "filesystem",
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
@ -1321,7 +1325,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) {
{
From: config.DayTime{Time: now.Add(10 * 24 * time.Hour)},
IndexType: config.StorageTypeBigTable,
ObjectType: "filesystem",
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
@ -1332,7 +1336,7 @@ func TestGetIndexStoreTableRanges(t *testing.T) {
{
From: config.DayTime{Time: now.Add(5 * 24 * time.Hour)},
IndexType: config.TSDBType,
ObjectType: "filesystem",
ObjectType: config.StorageTypeFileSystem,
Schema: "v11",
IndexTables: config.PeriodicTableConfig{
Prefix: "index_",
@ -1377,3 +1381,143 @@ func TestGetIndexStoreTableRanges(t *testing.T) {
},
}, getIndexStoreTableRanges(config.TSDBType, schemaConfig.Configs))
}
// TestStore_BoltdbTsdbSameIndexPrefix verifies that when a boltdb-shipper
// period and a tsdb period share the same index table prefix, a chunk whose
// time range overlaps both periods gets indexed in both stores, and that all
// added chunks remain queryable across the schema boundary.
func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
	tempDir := t.TempDir()
	ingesterName := "ingester-1"
	limits, err := validation.NewOverrides(validation.Limits{}, nil)
	require.NoError(t, err)

	// config for BoltDB Shipper
	boltdbShipperConfig := shipper.Config{}
	flagext.DefaultValues(&boltdbShipperConfig)
	boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
	boltdbShipperConfig.SharedStoreType = config.StorageTypeFileSystem
	boltdbShipperConfig.CacheLocation = path.Join(tempDir, "boltdb-shipper-cache")
	boltdbShipperConfig.Mode = indexshipper.ModeReadWrite
	boltdbShipperConfig.IngesterName = ingesterName

	// config for tsdb Shipper
	tsdbShipperConfig := indexshipper.Config{}
	flagext.DefaultValues(&tsdbShipperConfig)
	tsdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "tsdb-index")
	tsdbShipperConfig.SharedStoreType = config.StorageTypeFileSystem
	tsdbShipperConfig.CacheLocation = path.Join(tempDir, "tsdb-shipper-cache")
	tsdbShipperConfig.Mode = indexshipper.ModeReadWrite
	tsdbShipperConfig.IngesterName = ingesterName

	// dates for activation of boltdb shippers
	boltdbShipperStartDate := parseDate("2019-01-01")
	tsdbStartDate := parseDate("2019-01-02")

	cfg := Config{
		FSConfig:            local.FSConfig{Directory: path.Join(tempDir, "chunks")},
		BoltDBShipperConfig: boltdbShipperConfig,
		TSDBShipperConfig:   tsdbShipperConfig,
	}

	// both periods deliberately use the same "index_" prefix so their tables
	// land side by side in the same object-storage directory
	schemaConfig := config.SchemaConfig{
		Configs: []config.PeriodConfig{
			{
				From:       config.DayTime{Time: timeToModelTime(boltdbShipperStartDate)},
				IndexType:  "boltdb-shipper",
				ObjectType: config.StorageTypeFileSystem,
				Schema:     "v12",
				IndexTables: config.PeriodicTableConfig{
					Prefix: "index_",
					Period: time.Hour * 24,
				},
				RowShards: 2,
			},
			{
				From:       config.DayTime{Time: timeToModelTime(tsdbStartDate)},
				IndexType:  "tsdb",
				ObjectType: config.StorageTypeFileSystem,
				Schema:     "v12",
				IndexTables: config.PeriodicTableConfig{
					Prefix: "index_",
					Period: time.Hour * 24,
				},
			},
		},
	}

	ResetBoltDBIndexClientWithShipper()
	store, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
	require.NoError(t, err)

	// time ranges adding a chunk for each store and a chunk which overlaps both the stores
	chunksToBuildForTimeRanges := []timeRange{
		{
			// chunk just for first store
			tsdbStartDate.Add(-3 * time.Hour),
			tsdbStartDate.Add(-2 * time.Hour),
		},
		{
			// chunk overlapping both the stores
			tsdbStartDate.Add(-time.Hour),
			tsdbStartDate.Add(time.Hour),
		},
		{
			// chunk just for second store
			tsdbStartDate.Add(2 * time.Hour),
			tsdbStartDate.Add(3 * time.Hour),
		},
	}

	// build and add chunks to the store
	addedChunkIDs := map[string]struct{}{}
	for _, tr := range chunksToBuildForTimeRanges {
		chk := newChunk(buildTestStreams(fooLabelsWithName, tr))
		err := store.PutOne(ctx, chk.From, chk.Through, chk)
		require.NoError(t, err)
		addedChunkIDs[schemaConfig.ExternalKey(chk.ChunkRef)] = struct{}{}
	}

	// recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup.
	store.Stop()

	// there should be 2 index tables in the object storage; with a 24h period,
	// table numbers are days since epoch: 2019-01-01 -> index_17897 and
	// 2019-01-02 -> index_17898.
	indexTables, err := os.ReadDir(filepath.Join(cfg.FSConfig.Directory, "index"))
	require.NoError(t, err)
	require.Len(t, indexTables, 2)
	require.Equal(t, "index_17897", indexTables[0].Name())
	require.Equal(t, "index_17898", indexTables[1].Name())

	// there should be just 1 file in each table in the object storage
	boltdbFiles, err := os.ReadDir(filepath.Join(cfg.FSConfig.Directory, "index", indexTables[0].Name()))
	require.NoError(t, err)
	require.Len(t, boltdbFiles, 1)
	require.Regexp(t, regexp.MustCompile(fmt.Sprintf(`%s-\d{19}-\d{10}\.gz`, ingesterName)), boltdbFiles[0].Name())

	tsdbFiles, err := os.ReadDir(filepath.Join(cfg.FSConfig.Directory, "index", indexTables[1].Name()))
	require.NoError(t, err)
	require.Len(t, tsdbFiles, 1)
	require.Regexp(t, regexp.MustCompile(fmt.Sprintf(`\d{10}-%s\.tsdb\.gz`, ingesterName)), tsdbFiles[0].Name())

	store, err = NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
	require.NoError(t, err)
	defer store.Stop()

	// get all the chunks from both the stores
	chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(boltdbShipperStartDate), timeToModelTime(tsdbStartDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
	require.NoError(t, err)
	var totalChunks int
	for _, chks := range chunks {
		totalChunks += len(chks)
	}
	// we get common chunk twice because it is indexed in both the stores.
	// NOTE: require.Equal takes (expected, actual) — the original had them swapped,
	// which would have produced a misleading failure message.
	require.Equal(t, len(addedChunkIDs)+1, totalChunks)
	// check whether we got back all the chunks which were added
	for i := range chunks {
		for _, c := range chunks[i] {
			_, ok := addedChunkIDs[schemaConfig.ExternalKey(c.ChunkRef)]
			require.True(t, ok)
		}
	}
}

@ -26,7 +26,7 @@ type Reader interface {
}
type Writer interface {
IndexChunk(ctx context.Context, chk chunk.Chunk) error
IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error
}
type ReaderWriter interface {
@ -117,8 +117,8 @@ func (m monitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFi
m.rw.SetChunkFilterer(chunkFilter)
}
func (m monitoredReaderWriter) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
func (m monitoredReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
return instrument.CollectedRequest(ctx, "index_chunk", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
return m.rw.IndexChunk(ctx, chk)
return m.rw.IndexChunk(ctx, from, through, chk)
})
}

@ -70,6 +70,10 @@ type Config struct {
IngesterDBRetainPeriod time.Duration
}
// RegisterFlags registers flags without any prefix by delegating to
// RegisterFlagsWithPrefix with an empty prefix.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}
// RegisterFlagsWithPrefix registers flags.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.IndexGatewayClientConfig.RegisterFlagsWithPrefix(prefix+"shipper.index-gateway-client", f)

@ -146,7 +146,7 @@ func (c *IndexGatewayClientStore) SetChunkFilterer(chunkFilter chunk.RequestChun
}
}
func (c *IndexGatewayClientStore) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
func (c *IndexGatewayClientStore) IndexChunk(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
return fmt.Errorf("index writes not supported on index gateway client")
}

@ -82,8 +82,8 @@ func NewIndexReaderWriter(schemaCfg config.SchemaConfig, schema series_index.Ser
}
}
func (c *indexReaderWriter) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
writeReqs, keysToCache, err := c.calculateIndexEntries(ctx, chk.From, chk.Through, chk)
func (c *indexReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
writeReqs, keysToCache, err := c.calculateIndexEntries(ctx, from, through, chk)
if err != nil {
return err
}

@ -97,7 +97,7 @@ func (c *Writer) PutOne(ctx context.Context, from, through model.Time, chk chunk
}
}
if err := c.indexWriter.IndexChunk(ctx, chk); err != nil {
if err := c.indexWriter.IndexChunk(ctx, from, through, chk); err != nil {
return err
}

@ -46,7 +46,7 @@ type mockIndexWriter struct {
called int
}
func (m *mockIndexWriter) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
func (m *mockIndexWriter) IndexChunk(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
m.called++
return nil
}

@ -9,6 +9,7 @@ import (
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/storage/chunk"
@ -183,7 +184,7 @@ func (s *store) Stop() {
})
}
func (s *store) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
func (s *store) IndexChunk(ctx context.Context, from model.Time, through model.Time, chk chunk.Chunk) error {
// Always write the index to benefit durability via replication factor.
approxKB := math.Round(float64(chk.Data.UncompressedSize()) / float64(1<<10))
metas := tsdb_index.ChunkMetas{
@ -199,7 +200,7 @@ func (s *store) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
return errors.Wrap(err, "writing index entry")
}
return s.backupIndexWriter.IndexChunk(ctx, chk)
return s.backupIndexWriter.IndexChunk(ctx, from, through, chk)
}
type failingIndexWriter struct{}
@ -210,6 +211,6 @@ func (f failingIndexWriter) Append(_ string, _ labels.Labels, _ uint64, _ tsdb_i
type noopBackupIndexWriter struct{}
func (n noopBackupIndexWriter) IndexChunk(ctx context.Context, chk chunk.Chunk) error {
func (n noopBackupIndexWriter) IndexChunk(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
return nil
}

Loading…
Cancel
Save