Add version to TSDB index builder struct (#9566)

Add version as a property to the TSDB Index Builder, forcing
users of the builder to specify a version for each builder instance.
This is the first of many PRs that will add the TSDB index version to
the schema version config.
pull/9761/head
Trevor Whitney 3 years ago committed by GitHub
parent ce81895241
commit d74c67a9d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      pkg/storage/stores/tsdb/builder.go
  2. 127
      pkg/storage/stores/tsdb/builder_test.go
  3. 4
      pkg/storage/stores/tsdb/compactor.go
  4. 6
      pkg/storage/stores/tsdb/compactor_test.go
  5. 2
      pkg/storage/stores/tsdb/manager.go
  6. 2
      pkg/storage/stores/tsdb/querier_test.go
  7. 4
      pkg/storage/stores/tsdb/single_file_index.go
  8. 2
      pkg/storage/stores/tsdb/util_test.go
  9. 4
      tools/tsdb/migrate-versions/main_test.go
  10. 2
      tools/tsdb/tsdb-map/main.go

@ -24,6 +24,7 @@ import (
type Builder struct {
streams map[string]*stream
chunksFinalized bool
version int
}
type stream struct {
@ -32,8 +33,11 @@ type stream struct {
chunks index.ChunkMetas
}
func NewBuilder() *Builder {
return &Builder{streams: make(map[string]*stream)}
func NewBuilder(version int) *Builder {
return &Builder{
streams: make(map[string]*stream),
version: version,
}
}
func (b *Builder) AddSeries(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
@ -86,27 +90,9 @@ func (b *Builder) DropChunk(streamID string, chk index.ChunkMeta) (bool, error)
return chunkFound, nil
}
func (b *Builder) BuildWithVersion(
ctx context.Context,
version int, // build TSDB with specified version. 0 means default version.
scratchDir string,
createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, err error) {
return b.buildWithVersion(ctx, version, scratchDir, createFn)
}
func (b *Builder) Build(
ctx context.Context,
scratchDir string,
createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, err error) {
return b.buildWithVersion(ctx, 0, scratchDir, createFn)
}
func (b *Builder) buildWithVersion(
ctx context.Context,
version int, // build TSDB with specified version. 0 means default version.
scratchDir string,
// Determines how to create the resulting Identifier and file name.
// This is variable as we use Builder for multiple reasons,
// such as building multi-tenant tsdbs on the ingester
@ -127,18 +113,9 @@ func (b *Builder) buildWithVersion(
var writer *index.Writer
if version == 0 {
var err error
writer, err = index.NewWriter(ctx, tmpPath)
if err != nil {
return id, err
}
} else {
var err error
writer, err = index.NewWriterWithVersion(ctx, version, tmpPath)
if err != nil {
return id, err
}
writer, err = index.NewWriterWithVersion(ctx, b.version, tmpPath)
if err != nil {
return id, err
}
// TODO(owen-d): multithread

@ -0,0 +1,127 @@
package tsdb
import (
"context"
"fmt"
"path/filepath"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)
func Test_Build(t *testing.T) {
setup := func(version int) (context.Context, *Builder, string) {
builder := NewBuilder(version)
tmpDir := t.TempDir()
lbls1 := mustParseLabels(`{foo="bar", a="b"}`)
stream := stream{
labels: lbls1,
fp: model.Fingerprint(lbls1.Hash()),
chunks: buildChunkMetas(1, 5),
}
builder.AddSeries(
lbls1,
stream.fp,
stream.chunks,
)
return context.Background(), builder, tmpDir
}
getReader := func(path string) *index.Reader {
indexPath := fakeIdentifierPathForBounds(path, 1, 6) //default step is 1
files, err := filepath.Glob(indexPath)
require.NoError(t, err)
require.Len(t, files, 1)
reader, err := index.NewFileReader(files[0])
require.NoError(t, err)
return reader
}
t.Run("writes index to disk with from/through bounds of series in filename", func(t *testing.T) {
ctx, builder, tmpDir := setup(index.LiveFormat)
_, err := builder.Build(ctx, tmpDir, func(from, through model.Time, checksum uint32) Identifier {
return &fakeIdentifier{
parentPath: tmpDir,
from: from,
through: through,
checksum: checksum,
}
})
require.NoError(t, err)
indexPath := fakeIdentifierPathForBounds(tmpDir, 1, 6) //default step is 1
files, err := filepath.Glob(indexPath)
require.NoError(t, err)
require.Len(t, files, 1)
})
t.Run("sorts symbols before writing to the index", func(t *testing.T) {
ctx, builder, tmpDir := setup(index.LiveFormat)
_, err := builder.Build(ctx, tmpDir, func(from, through model.Time, checksum uint32) Identifier {
return &fakeIdentifier{
parentPath: tmpDir,
from: from,
through: through,
checksum: checksum,
}
})
require.NoError(t, err)
reader := getReader(tmpDir)
symbols := reader.Symbols()
require.NoError(t, err)
symbolsList := make([]string, 0, 2)
for symbols.Next() {
symbolsList = append(symbolsList, symbols.At())
}
require.Equal(t, symbolsList, []string{"a", "b", "bar", "foo"})
})
t.Run("write index with correct version", func(t *testing.T) {
ctx, builder, tmpDir := setup(index.FormatV2)
_, err := builder.Build(ctx, tmpDir, func(from, through model.Time, checksum uint32) Identifier {
return &fakeIdentifier{
parentPath: tmpDir,
from: from,
through: through,
checksum: checksum,
}
})
require.NoError(t, err)
reader := getReader(tmpDir)
require.Equal(t, index.FormatV2, reader.Version())
})
}
type fakeIdentifier struct {
parentPath string
from model.Time
through model.Time
checksum uint32
}
func (f *fakeIdentifier) Name() string {
return "need to implement Name() function for fakeIdentifier"
}
func (f *fakeIdentifier) Path() string {
path := fmt.Sprintf("%d-%d-%x.tsdb", f.from, f.through, f.checksum)
return filepath.Join(f.parentPath, path)
}
func fakeIdentifierPathForBounds(path string, from, through model.Time) string {
return filepath.Join(path, fmt.Sprintf("%d-%d-*.tsdb", from, through))
}

@ -47,7 +47,7 @@ func (i indexProcessor) OpenCompactedIndexFile(ctx context.Context, path, tableN
}
}()
builder := NewBuilder()
builder := NewBuilder(index.LiveFormat)
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
builder.AddSeries(lbls.Copy(), fp, chks)
}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
@ -193,7 +193,7 @@ func (t *tableCompactor) CompactTable() error {
// It combines the users index from multiTenantIndexes and its existing compacted index(es)
func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.IndexSet, multiTenantIndexes []Index) (*Builder, error) {
sourceIndexes := sourceIndexSet.ListSourceFiles()
builder := NewBuilder()
builder := NewBuilder(index.LiveFormat)
// add users index from multi-tenant indexes to the builder
for _, idx := range multiTenantIndexes {

@ -117,7 +117,7 @@ func (m *mockIndexSet) SetCompactedIndex(compactedIndex compactor.CompactedIndex
func setupMultiTenantIndex(t *testing.T, userStreams map[string][]stream, destDir string, ts time.Time) string {
require.NoError(t, util.EnsureDirectory(destDir))
b := NewBuilder()
b := NewBuilder(index.LiveFormat)
for userID, streams := range userStreams {
for _, stream := range streams {
lb := labels.NewBuilder(stream.labels)
@ -155,7 +155,7 @@ func setupMultiTenantIndex(t *testing.T, userStreams map[string][]stream, destDi
func setupPerTenantIndex(t *testing.T, streams []stream, destDir string, ts time.Time) string {
require.NoError(t, util.EnsureDirectory(destDir))
b := NewBuilder()
b := NewBuilder(index.LiveFormat)
for _, stream := range streams {
b.AddSeries(
stream.labels,
@ -881,7 +881,7 @@ func setupCompactedIndex(t *testing.T) *testContext {
userID := buildUserID(0)
buildCompactedIndex := func() *compactedIndex {
builder := NewBuilder()
builder := NewBuilder(index.LiveFormat)
stream := buildStream(lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10)), "")
builder.AddSeries(stream.labels, stream.fp, stream.chunks)

@ -175,7 +175,7 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, shipper indexshipper.Ind
for pd, matchingChks := range pds {
b, ok := periods[pd]
if !ok {
b = NewBuilder()
b = NewBuilder(index.LiveFormat)
periods[pd] = b
}

@ -24,7 +24,7 @@ func mustParseLabels(s string) labels.Labels {
func TestQueryIndex(t *testing.T) {
dir := t.TempDir()
b := NewBuilder()
b := NewBuilder(index.LiveFormat)
cases := []struct {
labels labels.Labels
chunks []index.ChunkMeta

@ -51,7 +51,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (index
return nil, ErrAlreadyOnDesiredVersion
}
builder := NewBuilder()
builder := NewBuilder(desiredVer)
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
builder.AddSeries(lbls.Copy(), fp, chks)
}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
@ -61,7 +61,7 @@ func RebuildWithVersion(ctx context.Context, path string, desiredVer int) (index
parentDir := filepath.Dir(path)
id, err := builder.BuildWithVersion(ctx, desiredVer, parentDir, func(from, through model.Time, checksum uint32) Identifier {
id, err := builder.Build(ctx, parentDir, func(from, through model.Time, checksum uint32) Identifier {
id := SingleTenantTSDBIdentifier{
TS: time.Now(),
From: from,

@ -18,7 +18,7 @@ type LoadableSeries struct {
}
func BuildIndex(t testing.TB, dir string, cases []LoadableSeries) *TSDBFile {
b := NewBuilder()
b := NewBuilder(index.LiveFormat)
for _, s := range cases {
b.AddSeries(s.Labels, model.Fingerprint(s.Labels.Hash()), s.Chunks)

@ -60,7 +60,7 @@ func TestMigrateTables(t *testing.T) {
// setup some tables
for i := currTableNum - 5; i <= currTableNum; i++ {
b := tsdb.NewBuilder()
b := tsdb.NewBuilder(index.FormatV2)
b.AddSeries(labels.Labels{
{
Name: "table_name",
@ -76,7 +76,7 @@ func TestMigrateTables(t *testing.T) {
},
})
id, err := b.BuildWithVersion(context.Background(), index.FormatV2, tempDir, func(from, through model.Time, checksum uint32) tsdb.Identifier {
id, err := b.Build(context.Background(), tempDir, func(from, through model.Time, checksum uint32) tsdb.Identifier {
id := tsdb.SingleTenantTSDBIdentifier{
TS: time.Now(),
From: from,

@ -67,7 +67,7 @@ func main() {
panic(err)
}
builder := tsdb.NewBuilder()
builder := tsdb.NewBuilder(index.LiveFormat)
log.Println("Loading index into memory")

Loading…
Cancel
Save