Tsdb tenant index querying (#6598)

* update the per-tenant index identifier and make the related changes

* add a flag to the ForEach index callback to tell whether it is a multitenant index or not
pull/6600/head
Sandeep Sukhani 4 years ago committed by GitHub
parent 7f24b0f6ab
commit 9590e5ca4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/storage/stores/indexshipper/downloads/index_set.go
  2. 6
      pkg/storage/stores/indexshipper/downloads/index_set_test.go
  3. 4
      pkg/storage/stores/indexshipper/downloads/table_manager_test.go
  4. 18
      pkg/storage/stores/indexshipper/downloads/table_test.go
  5. 2
      pkg/storage/stores/indexshipper/index/index.go
  6. 4
      pkg/storage/stores/indexshipper/shipper.go
  7. 2
      pkg/storage/stores/indexshipper/uploads/index_set.go
  8. 6
      pkg/storage/stores/indexshipper/uploads/index_set_test.go
  9. 2
      pkg/storage/stores/indexshipper/uploads/table_manager_test.go
  10. 2
      pkg/storage/stores/indexshipper/uploads/table_test.go
  11. 2
      pkg/storage/stores/shipper/index/querier.go
  12. 2
      pkg/storage/stores/shipper/index/table_manager.go
  13. 2
      pkg/storage/stores/shipper/index/table_manager_test.go
  14. 8
      pkg/storage/stores/shipper/index/table_test.go
  15. 3
      pkg/storage/stores/tsdb/compact.go
  16. 65
      pkg/storage/stores/tsdb/identifier.go
  17. 21
      pkg/storage/stores/tsdb/identifier_test.go
  18. 3
      pkg/storage/stores/tsdb/index_shipper_querier.go
  19. 3
      pkg/storage/stores/tsdb/querier_test.go
  20. 5
      pkg/storage/stores/tsdb/single_file_index.go
  21. 3
      pkg/storage/stores/tsdb/util_test.go

@ -187,7 +187,7 @@ func (t *indexSet) ForEach(ctx context.Context, callback index.ForEachIndexCallb
level.Debug(logger).Log("index-files-count", len(t.index))
for _, idx := range t.index {
if err := callback(idx); err != nil {
if err := callback(t.userID == "", idx); err != nil {
return err
}
}

@ -40,7 +40,7 @@ func TestIndexSet_Init(t *testing.T) {
checkIndexSet := func() {
indexSet, stopFunc := buildTestIndexSet(t, userID, tempDir)
require.Len(t, indexSet.index, len(indexesSetup))
verifyIndexForEach(t, indexesSetup, func(callbackFunc func(index.Index) error) error {
verifyIndexForEach(t, indexesSetup, func(callbackFunc index.ForEachIndexCallback) error {
return indexSet.ForEach(context.Background(), callbackFunc)
})
stopFunc()
@ -85,7 +85,7 @@ func TestIndexSet_doConcurrentDownload(t *testing.T) {
if tc > 0 {
require.Len(t, indexSet.index, tc)
}
verifyIndexForEach(t, indexesSetup, func(callbackFunc func(index.Index) error) error {
verifyIndexForEach(t, indexesSetup, func(callbackFunc index.ForEachIndexCallback) error {
return indexSet.ForEach(context.Background(), callbackFunc)
})
})
@ -104,7 +104,7 @@ func TestIndexSet_Sync(t *testing.T) {
checkIndexSet := func() {
require.Len(t, indexSet.index, len(indexesSetup))
verifyIndexForEach(t, indexesSetup, func(callbackFunc func(index.Index) error) error {
verifyIndexForEach(t, indexesSetup, func(callbackFunc index.ForEachIndexCallback) error {
return indexSet.ForEach(context.Background(), callbackFunc)
})
}

@ -82,7 +82,7 @@ func TestTableManager_ForEach(t *testing.T) {
if i == 0 {
expectedIndexes = append(expectedIndexes, buildListOfExpectedIndexes(userID, 1, 5)...)
}
verifyIndexForEach(t, expectedIndexes, func(callbackFunc func(index.Index) error) error {
verifyIndexForEach(t, expectedIndexes, func(callbackFunc index.ForEachIndexCallback) error {
return tableManager.ForEach(context.Background(), tableName, userID, callbackFunc)
})
}
@ -357,7 +357,7 @@ func TestTableManager_loadTables(t *testing.T) {
if i == 0 {
expectedIndexes = append(expectedIndexes, buildListOfExpectedIndexes(userID, 1, 5)...)
}
verifyIndexForEach(t, expectedIndexes, func(callbackFunc func(index.Index) error) error {
verifyIndexForEach(t, expectedIndexes, func(callbackFunc index.ForEachIndexCallback) error {
return tableManager.ForEach(context.Background(), tableName, userID, callbackFunc)
})
}

@ -83,7 +83,7 @@ type mockIndexSet struct {
func (m *mockIndexSet) ForEach(ctx context.Context, callback index.ForEachIndexCallback) error {
for _, idx := range m.indexes {
if err := callback(idx); err != nil {
if err := callback(false, idx); err != nil {
return err
}
}
@ -148,7 +148,7 @@ func TestTable_ForEach(t *testing.T) {
var indexesFound []index.Index
err := table.ForEach(context.Background(), tc.withUserID, func(idx index.Index) error {
err := table.ForEach(context.Background(), tc.withUserID, func(_ bool, idx index.Index) error {
indexesFound = append(indexesFound, idx)
return nil
})
@ -313,7 +313,7 @@ func TestTable_Sync(t *testing.T) {
// check that table has expected indexes setup
var indexesFound []string
err := table.ForEach(context.Background(), userID, func(idx index.Index) error {
err := table.ForEach(context.Background(), userID, func(_ bool, idx index.Index) error {
indexesFound = append(indexesFound, idx.Name())
return nil
})
@ -334,7 +334,7 @@ func TestTable_Sync(t *testing.T) {
// check that table got the new index and dropped the deleted index
indexesFound = []string{}
err = table.ForEach(context.Background(), userID, func(idx index.Index) error {
err = table.ForEach(context.Background(), userID, func(_ bool, idx index.Index) error {
indexesFound = append(indexesFound, idx.Name())
return nil
})
@ -376,7 +376,7 @@ func TestTable_Sync(t *testing.T) {
// verify that table has got only compacted db
indexesFound = []string{}
err = table.ForEach(context.Background(), userID, func(idx index.Index) error {
err = table.ForEach(context.Background(), userID, func(_ bool, idx index.Index) error {
indexesFound = append(indexesFound, idx.Name())
return nil
})
@ -409,7 +409,7 @@ func TestLoadTable(t *testing.T) {
// check the loaded table to see it has right index files.
expectedIndexes := append(buildListOfExpectedIndexes(userID, 0, 5), buildListOfExpectedIndexes("", 0, 5)...)
verifyIndexForEach(t, expectedIndexes, func(callbackFunc func(index.Index) error) error {
verifyIndexForEach(t, expectedIndexes, func(callbackFunc index.ForEachIndexCallback) error {
return table.ForEach(context.Background(), userID, callbackFunc)
})
@ -430,7 +430,7 @@ func TestLoadTable(t *testing.T) {
defer table.Close()
expectedIndexes = append(buildListOfExpectedIndexes(userID, 0, 10), buildListOfExpectedIndexes("", 0, 10)...)
verifyIndexForEach(t, expectedIndexes, func(callbackFunc func(index.Index) error) error {
verifyIndexForEach(t, expectedIndexes, func(callbackFunc index.ForEachIndexCallback) error {
return table.ForEach(context.Background(), userID, callbackFunc)
})
}
@ -449,9 +449,9 @@ func ensureIndexSetExistsInTable(t *testing.T, table *table, indexSetName string
require.True(t, ok)
}
func verifyIndexForEach(t *testing.T, expectedIndexes []string, forEachFunc func(callbackFunc func(index.Index) error) error) {
func verifyIndexForEach(t *testing.T, expectedIndexes []string, forEachFunc func(callbackFunc index.ForEachIndexCallback) error) {
var indexesFound []string
err := forEachFunc(func(idx index.Index) error {
err := forEachFunc(func(_ bool, idx index.Index) error {
// get the reader for the index.
readSeeker, err := idx.Reader()
require.NoError(t, err)

@ -13,4 +13,4 @@ type Index interface {
// There is a possibility of files being corrupted due to abrupt shutdown so
// the implementation should take care of gracefully handling failures in opening corrupted files.
type OpenIndexFileFunc func(string) (Index, error)
type ForEachIndexCallback func(Index) error
type ForEachIndexCallback func(isMultiTenantIndex bool, idx Index) error

@ -49,7 +49,7 @@ type IndexShipper interface {
// ForEach lets us iterate through each index file in a table for a specific user.
// On the write path, it would iterate on the files given to the shipper for uploading, until they eventually get dropped from local disk.
// On the read path, it would iterate through the files if already downloaded else it would download and iterate through them.
ForEach(ctx context.Context, tableName, userID string, callback func(index index.Index) error) error
ForEach(ctx context.Context, tableName, userID string, callback index.ForEachIndexCallback) error
Stop()
}
@ -166,7 +166,7 @@ func (s *indexShipper) AddIndex(tableName, userID string, index index.Index) err
return s.uploadsManager.AddIndex(tableName, userID, index)
}
func (s *indexShipper) ForEach(ctx context.Context, tableName, userID string, callback func(index index.Index) error) error {
func (s *indexShipper) ForEach(ctx context.Context, tableName, userID string, callback index.ForEachIndexCallback) error {
if s.downloadsManager != nil {
if err := s.downloadsManager.ForEach(ctx, tableName, userID, callback); err != nil {
return err

@ -70,7 +70,7 @@ func (t *indexSet) ForEach(callback index.ForEachIndexCallback) error {
defer t.indexMtx.RUnlock()
for _, idx := range t.index {
if err := callback(idx); err != nil {
if err := callback(t.userID == "", idx); err != nil {
return err
}
}

@ -35,7 +35,7 @@ func TestIndexSet_Add(t *testing.T) {
// see if we can find all the added indexes in the table.
indexesFound := map[string]*mockIndex{}
err = indexSet.ForEach(func(index index.Index) error {
err = indexSet.ForEach(func(_ bool, index index.Index) error {
indexesFound[index.Path()] = index.(*mockIndex)
return nil
})
@ -106,7 +106,7 @@ func TestIndexSet_Cleanup(t *testing.T) {
// all the indexes should be retained since they were just uploaded
indexesFound := map[string]*mockIndex{}
err = idxSet.ForEach(func(index index.Index) error {
err = idxSet.ForEach(func(_ bool, index index.Index) error {
indexesFound[index.Path()] = index.(*mockIndex)
return nil
})
@ -130,7 +130,7 @@ func TestIndexSet_Cleanup(t *testing.T) {
// get all the indexes that are retained
indexesFound = map[string]*mockIndex{}
err = idxSet.ForEach(func(index index.Index) error {
err = idxSet.ForEach(func(_ bool, index index.Index) error {
indexesFound[index.Path()] = index.(*mockIndex)
return nil
})

@ -64,7 +64,7 @@ func TestTableManager(t *testing.T) {
// see if we can find all the added indexes in the table.
indexesFound := map[string]*mockIndex{}
err := testTableManager.ForEach(tableName, userID, func(index index.Index) error {
err := testTableManager.ForEach(tableName, userID, func(_ bool, index index.Index) error {
indexesFound[index.Path()] = index.(*mockIndex)
return nil
})

@ -35,7 +35,7 @@ func TestTable(t *testing.T) {
// see if we can find all the added indexes in the table.
indexesFound := map[string]*mockIndex{}
err := testTable.ForEach(userID, func(index index.Index) error {
err := testTable.ForEach(userID, func(_ bool, index index.Index) error {
indexesFound[index.Path()] = index.(*mockIndex)
return nil
})

@ -54,7 +54,7 @@ func (q *querier) QueryPages(ctx context.Context, queries []index.Query, callbac
}
}
return q.indexShipper.ForEach(ctx, table, userID, func(idx shipper_index.Index) error {
return q.indexShipper.ForEach(ctx, table, userID, func(_ bool, idx shipper_index.Index) error {
boltdbIndexFile, ok := idx.(*indexfile.IndexFile)
if !ok {
return fmt.Errorf("unexpected index type %T", idx)

@ -45,7 +45,7 @@ type TableManager struct {
type Shipper interface {
AddIndex(tableName, userID string, index shipper_index.Index) error
ForEach(ctx context.Context, tableName, userID string, callback func(index shipper_index.Index) error) error
ForEach(ctx context.Context, tableName, userID string, callback shipper_index.ForEachIndexCallback) error
}
func NewTableManager(cfg Config, indexShipper Shipper, registerer prometheus.Registerer) (*TableManager, error) {

@ -111,7 +111,7 @@ func TestLoadTables(t *testing.T) {
// see if index shipper has the index files
testutil.VerifyIndexes(t, userID, []index.Query{{TableName: tableName}},
func(ctx context.Context, table string, callback func(boltdb *bbolt.DB) error) error {
return tm.indexShipper.ForEach(ctx, table, userID, func(index index_shipper.Index) error {
return tm.indexShipper.ForEach(ctx, table, userID, func(_ bool, index index_shipper.Index) error {
return callback(index.(*indexfile.IndexFile).GetBoltDB())
})
},

@ -41,9 +41,9 @@ func (m *mockIndexShipper) AddIndex(tableName, _ string, index shipper_index.Ind
return nil
}
func (m *mockIndexShipper) ForEach(ctx context.Context, tableName, _ string, callback func(index shipper_index.Index) error) error {
func (m *mockIndexShipper) ForEach(ctx context.Context, tableName, _ string, callback shipper_index.ForEachIndexCallback) error {
for _, idx := range m.addedIndexes[tableName] {
if err := callback(idx); err != nil {
if err := callback(false, idx); err != nil {
return err
}
}
@ -228,7 +228,7 @@ func TestTable_HandoverIndexesToShipper(t *testing.T) {
testutil.VerifyIndexes(t, userID, []index.Query{{TableName: table.name}},
func(ctx context.Context, _ string, callback func(boltdb *bbolt.DB) error) error {
return indexShipper.ForEach(ctx, table.name, "", func(index shipper_index.Index) error {
return indexShipper.ForEach(ctx, table.name, "", func(_ bool, index shipper_index.Index) error {
return callback(index.(*indexfile.IndexFile).GetBoltDB())
})
},
@ -248,7 +248,7 @@ func TestTable_HandoverIndexesToShipper(t *testing.T) {
require.Len(t, indexShipper.addedIndexes[table.name], 2)
testutil.VerifyIndexes(t, userID, []index.Query{{TableName: table.name}},
func(ctx context.Context, _ string, callback func(boltdb *bbolt.DB) error) error {
return indexShipper.ForEach(ctx, table.name, "", func(index shipper_index.Index) error {
return indexShipper.ForEach(ctx, table.name, "", func(_ bool, index shipper_index.Index) error {
return callback(index.(*indexfile.IndexFile).GetBoltDB())
})
},

@ -3,6 +3,7 @@ package tsdb
import (
"context"
"fmt"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -75,7 +76,7 @@ func (c *Compactor) Compact(ctx context.Context, indices ...*TSDBIndex) (res Ide
c.parentDir,
func(from, through model.Time, checksum uint32) Identifier {
id := SingleTenantTSDBIdentifier{
Tenant: c.tenant,
TS: time.Now(),
From: from,
Through: through,
Checksum: checksum,

@ -9,10 +9,10 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)
const compactedFileUploader = "compactor"
// Identifier can resolve an index to a name (in object storage)
// and a path (on disk)
type Identifier interface {
@ -22,18 +22,19 @@ type Identifier interface {
// identifierFromPath will detect whether this is a single or multitenant TSDB
func identifierFromPath(p string) (Identifier, error) {
multiID, multitenant := parseMultitenantTSDBPath(p)
if multitenant {
parent := filepath.Dir(p)
return newPrefixedIdentifier(multiID, parent, ""), nil
// try parsing as single tenant since the filename is more deterministic without an arbitrary nodename for uploader
id, ok := parseSingleTenantTSDBPath(p)
if ok {
return newPrefixedIdentifier(id, filepath.Dir(p), ""), nil
}
id, parent, ok := parseSingleTenantTSDBPath(p)
multiID, ok := parseMultitenantTSDBPath(p)
if !ok {
return nil, fmt.Errorf("invalid tsdb path: %s", p)
}
return newPrefixedIdentifier(id, parent, ""), nil
parent := filepath.Dir(p)
return newPrefixedIdentifier(multiID, parent, ""), nil
}
func newPrefixedIdentifier(id Identifier, path, name string) prefixedIdentifier {
@ -78,15 +79,16 @@ func (s suffixedIdentifier) Path() string {
// Identifier has all the information needed to resolve a TSDB index
// Notably this abstracts away OS path separators, etc.
type SingleTenantTSDBIdentifier struct {
Tenant string
TS time.Time
From, Through model.Time
Checksum uint32
}
func (i SingleTenantTSDBIdentifier) str() string {
return fmt.Sprintf(
"%s-%d-%d-%x.tsdb",
index.IndexFilename,
"%d-%s-%d-%d-%x.tsdb",
i.TS.Unix(),
compactedFileUploader,
i.From,
i.Through,
i.Checksum,
@ -94,61 +96,57 @@ func (i SingleTenantTSDBIdentifier) str() string {
}
func (i SingleTenantTSDBIdentifier) Name() string {
return path.Join(i.Tenant, i.str())
return i.str()
}
func (i SingleTenantTSDBIdentifier) Path() string {
return filepath.Join(i.Tenant, i.str())
return i.str()
}
func parseSingleTenantTSDBPath(p string) (id SingleTenantTSDBIdentifier, parent string, ok bool) {
func parseSingleTenantTSDBPath(p string) (id SingleTenantTSDBIdentifier, ok bool) {
// parsing as multitenant didn't work, so try single tenant
file := filepath.Base(p)
parents := filepath.Dir(p)
pathPrefix := filepath.Dir(parents)
tenant := filepath.Base(parents)
// no tenant was provided
if tenant == "." {
// incorrect suffix
trimmed := strings.TrimSuffix(p, ".tsdb")
if trimmed == p {
return
}
// incorrect suffix
trimmed := strings.TrimSuffix(file, ".tsdb")
if trimmed == file {
elems := strings.Split(trimmed, "-")
if len(elems) != 5 {
return
}
elems := strings.Split(trimmed, "-")
if len(elems) != 4 {
ts, err := strconv.Atoi(elems[0])
if err != nil {
return
}
if elems[0] != index.IndexFilename {
if elems[1] != compactedFileUploader {
return
}
from, err := strconv.ParseInt(elems[1], 10, 64)
from, err := strconv.ParseInt(elems[2], 10, 64)
if err != nil {
return
}
through, err := strconv.ParseInt(elems[2], 10, 64)
through, err := strconv.ParseInt(elems[3], 10, 64)
if err != nil {
return
}
checksum, err := strconv.ParseInt(elems[3], 16, 32)
checksum, err := strconv.ParseInt(elems[4], 16, 32)
if err != nil {
return
}
return SingleTenantTSDBIdentifier{
Tenant: tenant,
TS: time.Unix(int64(ts), 0),
From: model.Time(from),
Through: model.Time(through),
Checksum: uint32(checksum),
}, pathPrefix, true
}, true
}
@ -171,11 +169,6 @@ func parseMultitenantTSDBPath(p string) (id MultitenantTSDBIdentifier, ok bool)
return parseMultitenantTSDBNameFromBase(cleaned)
}
func parseMultitenantTSDBName(p string) (id MultitenantTSDBIdentifier, ok bool) {
cleaned := path.Base(p)
return parseMultitenantTSDBNameFromBase(cleaned)
}
func parseMultitenantTSDBNameFromBase(name string) (res MultitenantTSDBIdentifier, ok bool) {
trimmed := strings.TrimSuffix(name, ".tsdb")

@ -2,6 +2,7 @@ package tsdb
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
@ -16,9 +17,9 @@ func TestParseSingleTenantTSDBPath(t *testing.T) {
}{
{
desc: "simple_works",
input: "parent/fake/index-1-10-ff.tsdb",
input: "1-compactor-1-10-ff.tsdb",
id: SingleTenantTSDBIdentifier{
Tenant: "fake",
TS: time.Unix(1, 0),
From: 1,
Through: 10,
Checksum: 255,
@ -27,30 +28,24 @@ func TestParseSingleTenantTSDBPath(t *testing.T) {
ok: true,
},
{
desc: "no tenant dir",
input: "index-1-10-ff.tsdb",
ok: false,
},
{
desc: "wrong index name",
input: "fake/notindex-1-10-ff.tsdb",
desc: "wrong uploader name",
input: "1-notcompactor-1-10-ff.tsdb",
ok: false,
},
{
desc: "wrong argument len",
input: "fake/index-10-ff.tsdb",
input: "1-compactor-10-ff.tsdb",
ok: false,
},
{
desc: "wrong argument encoding",
input: "fake/index-ff-10-ff.tsdb",
input: "1-compactor-ff-10-ff.tsdb",
ok: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
id, parent, ok := parseSingleTenantTSDBPath(tc.input)
id, ok := parseSingleTenantTSDBPath(tc.input)
require.Equal(t, tc.id, id)
require.Equal(t, tc.parent, parent)
require.Equal(t, tc.ok, ok)
})
}

@ -31,8 +31,7 @@ func (i *indexShipperQuerier) indices(ctx context.Context, from, through model.T
// Ensure we query both per tenant and multitenant TSDBs
for _, bkt := range indexBuckets(from, through) {
if err := i.shipper.ForEach(ctx, fmt.Sprintf("%d", bkt), user, func(idx shipper_index.Index) error {
_, multitenant := parseMultitenantTSDBName(idx.Name())
if err := i.shipper.ForEach(ctx, fmt.Sprintf("%d", bkt), user, func(multitenant bool, idx shipper_index.Index) error {
impl, ok := idx.(Index)
if !ok {
return fmt.Errorf("unexpected shipper index type: %T", idx)

@ -3,6 +3,7 @@ package tsdb
import (
"context"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -91,7 +92,7 @@ func TestQueryIndex(t *testing.T) {
dst, err := b.Build(context.Background(), dir, func(from, through model.Time, checksum uint32) Identifier {
id := SingleTenantTSDBIdentifier{
Tenant: "fake",
TS: time.Now(),
From: from,
Through: through,
Checksum: checksum,

@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"strings"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -253,10 +254,10 @@ func (i *TSDBIndex) Checksum() uint32 {
return i.reader.Checksum()
}
func (i *TSDBIndex) Identifier(tenant string) SingleTenantTSDBIdentifier {
func (i *TSDBIndex) Identifier(string) SingleTenantTSDBIdentifier {
lower, upper := i.Bounds()
return SingleTenantTSDBIdentifier{
Tenant: tenant,
TS: time.Now(),
From: lower,
Through: upper,
Checksum: i.Checksum(),

@ -3,6 +3,7 @@ package tsdb
import (
"context"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -25,7 +26,7 @@ func BuildIndex(t *testing.T, dir, tenant string, cases []LoadableSeries) *TSDBF
dst, err := b.Build(context.Background(), dir, func(from, through model.Time, checksum uint32) Identifier {
id := SingleTenantTSDBIdentifier{
Tenant: tenant,
TS: time.Now(),
From: from,
Through: through,
Checksum: checksum,

Loading…
Cancel
Save