diff --git a/pkg/storage/stores/tsdb/builder.go b/pkg/storage/stores/tsdb/builder.go index 8b0adbf75a..ec5a64d0a0 100644 --- a/pkg/storage/stores/tsdb/builder.go +++ b/pkg/storage/stores/tsdb/builder.go @@ -24,6 +24,7 @@ import ( type Builder struct { streams map[string]*stream chunksFinalized bool + version int } type stream struct { @@ -32,8 +33,11 @@ type stream struct { chunks index.ChunkMetas } -func NewBuilder() *Builder { - return &Builder{streams: make(map[string]*stream)} +func NewBuilder(version int) *Builder { + return &Builder{ + streams: make(map[string]*stream), + version: version, + } } func (b *Builder) AddSeries(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { @@ -86,27 +90,9 @@ func (b *Builder) DropChunk(streamID string, chk index.ChunkMeta) (bool, error) return chunkFound, nil } -func (b *Builder) BuildWithVersion( - ctx context.Context, - version int, // build TSDB with specified version. 0 means default version. - scratchDir string, - createFn func(from, through model.Time, checksum uint32) Identifier, -) (id Identifier, err error) { - return b.buildWithVersion(ctx, version, scratchDir, createFn) -} - func (b *Builder) Build( ctx context.Context, scratchDir string, - createFn func(from, through model.Time, checksum uint32) Identifier, -) (id Identifier, err error) { - return b.buildWithVersion(ctx, 0, scratchDir, createFn) -} - -func (b *Builder) buildWithVersion( - ctx context.Context, - version int, // build TSDB with specified version. 0 means default version. - scratchDir string, // Determines how to create the resulting Identifier and file name. // This is variable as we use Builder for multiple reasons, // such as building multi-tenant tsdbs on the ingester @@ -127,18 +113,9 @@ func (b *Builder) buildWithVersion( var writer *index.Writer - if version == 0 { - var err error - writer, err = index.NewWriter(ctx, tmpPath) - if err != nil { - return id, err - } - } else { - var err error - writer, err = index.NewWriterWithVersion(ctx, version, tmpPath) - if err != nil { - return id, err - } + writer, err = index.NewWriterWithVersion(ctx, b.version, tmpPath) + if err != nil { + return id, err } // TODO(owen-d): multithread diff --git a/pkg/storage/stores/tsdb/builder_test.go b/pkg/storage/stores/tsdb/builder_test.go new file mode 100644 index 0000000000..52accda20a --- /dev/null +++ b/pkg/storage/stores/tsdb/builder_test.go @@ -0,0 +1,127 @@ +package tsdb + +import ( + "context" + "fmt" + "path/filepath" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/stores/tsdb/index" +) + +func Test_Build(t *testing.T) { + setup := func(version int) (context.Context, *Builder, string) { + builder := NewBuilder(version) + tmpDir := t.TempDir() + + lbls1 := mustParseLabels(`{foo="bar", a="b"}`) + + stream := stream{ + labels: lbls1, + fp: model.Fingerprint(lbls1.Hash()), + chunks: buildChunkMetas(1, 5), + } + + builder.AddSeries( + lbls1, + stream.fp, + stream.chunks, + ) + + return context.Background(), builder, tmpDir + } + + getReader := func(path string) *index.Reader { + indexPath := fakeIdentifierPathForBounds(path, 1, 6) //default step is 1 + files, err := filepath.Glob(indexPath) + require.NoError(t, err) + require.Len(t, files, 1) + + reader, err := index.NewFileReader(files[0]) + require.NoError(t, err) + return reader + } + + t.Run("writes index to disk with from/through bounds of series in filename", func(t *testing.T) { + ctx, builder, tmpDir := setup(index.LiveFormat) + + _, err := builder.Build(ctx, tmpDir, func(from, through model.Time, checksum uint32) Identifier { + return &fakeIdentifier{ + parentPath: tmpDir, + from: from, + through: through, + checksum: checksum, + } + }) + + require.NoError(t, err) + indexPath := fakeIdentifierPathForBounds(tmpDir, 1, 6) //default step is 1 + + files, err := filepath.Glob(indexPath) + require.NoError(t, err) + require.Len(t, files, 1) + }) + + t.Run("sorts symbols before writing to the index", func(t *testing.T) { + ctx, builder, tmpDir := setup(index.LiveFormat) + _, err := builder.Build(ctx, tmpDir, func(from, through model.Time, checksum uint32) Identifier { + return &fakeIdentifier{ + parentPath: tmpDir, + from: from, + through: through, + checksum: checksum, + } + }) + require.NoError(t, err) + + reader := getReader(tmpDir) + + symbols := reader.Symbols() + require.NoError(t, err) + + symbolsList := make([]string, 0, 2) + for symbols.Next() { + symbolsList = append(symbolsList, symbols.At()) + } + + require.Equal(t, symbolsList, []string{"a", "b", "bar", "foo"}) + }) + + t.Run("write index with correct version", func(t *testing.T) { + ctx, builder, tmpDir := setup(index.FormatV2) + _, err := builder.Build(ctx, tmpDir, func(from, through model.Time, checksum uint32) Identifier { + return &fakeIdentifier{ + parentPath: tmpDir, + from: from, + through: through, + checksum: checksum, + } + }) + require.NoError(t, err) + reader := getReader(tmpDir) + require.Equal(t, index.FormatV2, reader.Version()) + }) +} + +type fakeIdentifier struct { + parentPath string + from model.Time + through model.Time + checksum uint32 +} + +func (f *fakeIdentifier) Name() string { + return "need to implement Name() function for fakeIdentifier" +} + +func (f *fakeIdentifier) Path() string { + path := fmt.Sprintf("%d-%d-%x.tsdb", f.from, f.through, f.checksum) + return filepath.Join(f.parentPath, path) +} + +func fakeIdentifierPathForBounds(path string, from, through model.Time) string { + return filepath.Join(path, fmt.Sprintf("%d-%d-*.tsdb", from, through)) +} diff --git a/pkg/storage/stores/tsdb/compactor.go b/pkg/storage/stores/tsdb/compactor.go index 928252e025..4530c295e0 100644 --- a/pkg/storage/stores/tsdb/compactor.go +++ b/pkg/storage/stores/tsdb/compactor.go @@ -47,7 +47,7 @@ func (i indexProcessor) OpenCompactedIndexFile(ctx context.Context, path, tableN } }() - builder := NewBuilder() + builder := NewBuilder(index.LiveFormat) err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { builder.AddSeries(lbls.Copy(), fp, chks) }, labels.MustNewMatcher(labels.MatchEqual, "", "")) @@ -193,7 +193,7 @@ func (t *tableCompactor) CompactTable() error { // It combines the users index from multiTenantIndexes and its existing compacted index(es) func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.IndexSet, multiTenantIndexes []Index) (*Builder, error) { sourceIndexes := sourceIndexSet.ListSourceFiles() - builder := NewBuilder() + builder := NewBuilder(index.LiveFormat) // add users index from multi-tenant indexes to the builder for _, idx := range multiTenantIndexes { diff --git a/pkg/storage/stores/tsdb/compactor_test.go b/pkg/storage/stores/tsdb/compactor_test.go index a8f8bd2816..0b6b2d09d8 100644 --- a/pkg/storage/stores/tsdb/compactor_test.go +++ b/pkg/storage/stores/tsdb/compactor_test.go @@ -117,7 +117,7 @@ func (m *mockIndexSet) SetCompactedIndex(compactedIndex compactor.CompactedIndex func setupMultiTenantIndex(t *testing.T, userStreams map[string][]stream, destDir string, ts time.Time) string { require.NoError(t, util.EnsureDirectory(destDir)) - b := NewBuilder() + b := NewBuilder(index.LiveFormat) for userID, streams := range userStreams { for _, stream := range streams { lb := labels.NewBuilder(stream.labels) @@ -155,7 +155,7 @@ func setupMultiTenantIndex(t *testing.T, userStreams map[string][]stream, destDi func setupPerTenantIndex(t *testing.T, streams []stream, destDir string, ts time.Time) string { require.NoError(t, util.EnsureDirectory(destDir)) - b := NewBuilder() + b := NewBuilder(index.LiveFormat) for _, stream := range streams { b.AddSeries( stream.labels, @@ -881,7 +881,7 @@ func setupCompactedIndex(t *testing.T) *testContext { userID := buildUserID(0) buildCompactedIndex := func() *compactedIndex { - builder := NewBuilder() + builder := NewBuilder(index.LiveFormat) stream := buildStream(lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10)), "") builder.AddSeries(stream.labels, stream.fp, stream.chunks) diff --git a/pkg/storage/stores/tsdb/manager.go b/pkg/storage/stores/tsdb/manager.go index c5b970e46d..9332b44e02 100644 --- a/pkg/storage/stores/tsdb/manager.go +++ b/pkg/storage/stores/tsdb/manager.go @@ -175,7 +175,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, shipper indexshipper.Ind for pd, matchingChks := range pds { b, ok := periods[pd] if !ok { - b = NewBuilder() + b = NewBuilder(index.LiveFormat) periods[pd] = b } diff --git a/pkg/storage/stores/tsdb/querier_test.go b/pkg/storage/stores/tsdb/querier_test.go index e2e5e03e90..e83fa824e3 100644 --- a/pkg/storage/stores/tsdb/querier_test.go +++ b/pkg/storage/stores/tsdb/querier_test.go @@ -24,7 +24,7 @@ func mustParseLabels(s string) labels.Labels { func TestQueryIndex(t *testing.T) { dir := t.TempDir() - b := NewBuilder() + b := NewBuilder(index.LiveFormat) cases := []struct { labels labels.Labels chunks []index.ChunkMeta diff --git a/pkg/storage/stores/tsdb/single_file_index.go b/pkg/storage/stores/tsdb/single_file_index.go index ead47d6767..6c1b020017 100644 --- a/pkg/storage/stores/tsdb/single_file_index.go +++ b/pkg/storage/stores/tsdb/single_file_index.go @@ -51,7 +51,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (index return nil, ErrAlreadyOnDesiredVersion } - builder := NewBuilder() + builder := NewBuilder(desiredVer) err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { builder.AddSeries(lbls.Copy(), fp, chks) }, labels.MustNewMatcher(labels.MatchEqual, "", "")) @@ -61,7 +61,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (index parentDir := filepath.Dir(path) - id, err := builder.BuildWithVersion(ctx, desiredVer, parentDir, func(from, through model.Time, checksum uint32) Identifier { + id, err := builder.Build(ctx, parentDir, func(from, through model.Time, checksum uint32) Identifier { id := SingleTenantTSDBIdentifier{ TS: time.Now(), From: from, diff --git a/pkg/storage/stores/tsdb/util_test.go b/pkg/storage/stores/tsdb/util_test.go index f3223f161b..2931e59d88 100644 --- a/pkg/storage/stores/tsdb/util_test.go +++ b/pkg/storage/stores/tsdb/util_test.go @@ -18,7 +18,7 @@ type LoadableSeries struct { } func BuildIndex(t testing.TB, dir string, cases []LoadableSeries) *TSDBFile { - b := NewBuilder() + b := NewBuilder(index.LiveFormat) for _, s := range cases { b.AddSeries(s.Labels, model.Fingerprint(s.Labels.Hash()), s.Chunks) diff --git a/tools/tsdb/migrate-versions/main_test.go b/tools/tsdb/migrate-versions/main_test.go index 3b2d1fc33a..92daabb960 100644 --- a/tools/tsdb/migrate-versions/main_test.go +++ b/tools/tsdb/migrate-versions/main_test.go @@ -60,7 +60,7 @@ func TestMigrateTables(t *testing.T) { // setup some tables for i := currTableNum - 5; i <= currTableNum; i++ { - b := tsdb.NewBuilder() + b := tsdb.NewBuilder(index.FormatV2) b.AddSeries(labels.Labels{ { Name: "table_name", @@ -76,7 +76,7 @@ func TestMigrateTables(t *testing.T) { }, }) - id, err := b.BuildWithVersion(context.Background(), index.FormatV2, tempDir, func(from, through model.Time, checksum uint32) tsdb.Identifier { + id, err := b.Build(context.Background(), tempDir, func(from, through model.Time, checksum uint32) tsdb.Identifier { id := tsdb.SingleTenantTSDBIdentifier{ TS: time.Now(), From: from, diff --git a/tools/tsdb/tsdb-map/main.go b/tools/tsdb/tsdb-map/main.go index fdb872f89a..5b65e0857b 100644 --- a/tools/tsdb/tsdb-map/main.go +++ b/tools/tsdb/tsdb-map/main.go @@ -67,7 +67,7 @@ func main() { panic(err) } - builder := tsdb.NewBuilder() + builder := tsdb.NewBuilder(index.LiveFormat) log.Println("Loading index into memory")