Compactor improvements (#5587)

* upload and clean up index sets concurrently

* reuse the batch slice used for building a batch of index entries during compaction
pull/5593/head
Sandeep Sukhani 3 years ago committed by GitHub
parent 294fd33a38
commit d2ce08280f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      pkg/storage/stores/shipper/compactor/table.go
  2. 436
      pkg/storage/stores/shipper/compactor/table_test.go

@ -58,7 +58,8 @@ import (
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
const (
uploaderName = "compactor"
uploaderName = "compactor"
uploadIndexSetsConcurrency = 10
readDBsConcurrency = 50
batchSize = 1000
@ -223,7 +224,8 @@ func (t *table) done() error {
}
}
for userID, is := range t.indexSets {
userIDs := make([]string, 0, len(t.indexSets))
for userID := range t.indexSets {
// indexSet.done() uploads the compacted db and cleans up the source index files.
// For user index sets, the files from common index sets are also a source of index.
// If we clean up the common index sets first and then fail to upload the newly compacted dbs in the user index sets, we will lose data.
@ -232,9 +234,14 @@ func (t *table) done() error {
continue
}
if err := is.done(); err != nil {
return err
}
userIDs = append(userIDs, userID)
}
err := concurrency.ForEachJob(t.ctx, len(userIDs), uploadIndexSetsConcurrency, func(ctx context.Context, idx int) error {
return t.indexSets[userIDs[idx]].done()
})
if err != nil {
return err
}
if commonIndexSet, ok := t.indexSets[""]; ok {
@ -443,9 +450,11 @@ func readFile(logger log.Logger, path string, writeBatch func(userID string, bat
}
}()
batch := make([]indexEntry, 0, batchSize)
return db.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
batch := make([]indexEntry, 0, batchSize)
batch = batch[:0]
bucketNameStr := string(name)
err := b.ForEach(func(k, v []byte) error {
ie := indexEntry{
@ -466,8 +475,7 @@ func readFile(logger log.Logger, path string, writeBatch func(userID string, bat
if err != nil {
return err
}
// todo(cyriltovena) we should just re-slice to avoid allocations
batch = make([]indexEntry, 0, batchSize)
batch = batch[:0]
}
return nil

@ -35,236 +35,238 @@ type indexSetState struct {
}
func TestTable_Compaction(t *testing.T) {
numUsers := 5
for _, tc := range []struct {
numUnCompactedCommonDBs int
numUnCompactedPerUserDBs int
numCompactedDBs int
shouldInitializeCommonIndexSet bool
commonIndexSetState *indexSetState
shouldInitializeUserIndexSet bool
userIndexSetState *indexSetState
}{
{},
{
numCompactedDBs: 1,
},
{
numCompactedDBs: 2,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 1,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numCompactedDBs: 1,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numCompactedDBs: 2,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
commonIndexSetState: &indexSetState{
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
numCompactedDBs: 1,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
numCompactedDBs: 2,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 10,
commonIndexSetState: &indexSetState{
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
numCompactedDBs: 1,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
numCompactedDBs: 2,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
} {
commonDBsConfig := testutil.DBsConfig{
NumCompactedDBs: tc.numCompactedDBs,
NumUnCompactedDBs: tc.numUnCompactedCommonDBs,
}
perUserDBsConfig := testutil.PerUserDBsConfig{
DBsConfig: testutil.DBsConfig{
NumCompactedDBs: tc.numCompactedDBs,
NumUnCompactedDBs: tc.numUnCompactedPerUserDBs,
},
NumUsers: numUsers,
}
for _, numUsers := range []int{uploadIndexSetsConcurrency / 2, uploadIndexSetsConcurrency, uploadIndexSetsConcurrency * 2} {
t.Run(fmt.Sprintf("numUsers=%d", numUsers), func(t *testing.T) {
for _, tc := range []struct {
numUnCompactedCommonDBs int
numUnCompactedPerUserDBs int
numCompactedDBs int
shouldInitializeCommonIndexSet bool
commonIndexSetState *indexSetState
shouldInitializeUserIndexSet bool
userIndexSetState *indexSetState
}{
{},
{
numCompactedDBs: 1,
},
{
numCompactedDBs: 2,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 1,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numCompactedDBs: 1,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numCompactedDBs: 2,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
commonIndexSetState: &indexSetState{
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
numCompactedDBs: 1,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
numCompactedDBs: 2,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 10,
commonIndexSetState: &indexSetState{
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
numCompactedDBs: 1,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
numCompactedDBs: 2,
commonIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: &indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
} {
commonDBsConfig := testutil.DBsConfig{
NumCompactedDBs: tc.numCompactedDBs,
NumUnCompactedDBs: tc.numUnCompactedCommonDBs,
}
perUserDBsConfig := testutil.PerUserDBsConfig{
DBsConfig: testutil.DBsConfig{
NumCompactedDBs: tc.numCompactedDBs,
NumUnCompactedDBs: tc.numUnCompactedPerUserDBs,
},
NumUsers: numUsers,
}
t.Run(fmt.Sprintf("%s ; %s", commonDBsConfig.String(), perUserDBsConfig.String()), func(t *testing.T) {
tempDir := t.TempDir()
t.Run(fmt.Sprintf("%s ; %s", commonDBsConfig.String(), perUserDBsConfig.String()), func(t *testing.T) {
tempDir := t.TempDir()
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
tablePathInStorage := filepath.Join(objectStoragePath, tableName)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
tablePathInStorage := filepath.Join(objectStoragePath, tableName)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
testutil.SetupTable(t, filepath.Join(objectStoragePath, tableName), commonDBsConfig, perUserDBsConfig)
testutil.SetupTable(t, filepath.Join(objectStoragePath, fmt.Sprintf("%s-copy", tableName)), commonDBsConfig, perUserDBsConfig)
testutil.SetupTable(t, filepath.Join(objectStoragePath, tableName), commonDBsConfig, perUserDBsConfig)
testutil.SetupTable(t, filepath.Join(objectStoragePath, fmt.Sprintf("%s-copy", tableName)), commonDBsConfig, perUserDBsConfig)
// do the compaction
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
require.NoError(t, err)
// do the compaction
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
require.NoError(t, err)
table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
nil, nil)
require.NoError(t, err)
table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
nil, nil)
require.NoError(t, err)
require.NoError(t, table.compact(false))
numUserIndexSets, numCommonIndexSets := 0, 0
for _, is := range table.indexSets {
if is.baseIndexSet.IsUserBasedIndexSet() {
require.Equal(t, tc.userIndexSetState.uploadCompactedDB, is.uploadCompactedDB)
require.Equal(t, tc.userIndexSetState.removeSourceObjects, is.removeSourceObjects)
numUserIndexSets++
} else {
require.Equal(t, tc.commonIndexSetState.uploadCompactedDB, is.uploadCompactedDB)
require.Equal(t, tc.commonIndexSetState.removeSourceObjects, is.removeSourceObjects)
numCommonIndexSets++
}
require.False(t, is.compactedDBRecreated)
}
require.NoError(t, table.compact(false))
numUserIndexSets, numCommonIndexSets := 0, 0
for _, is := range table.indexSets {
if is.baseIndexSet.IsUserBasedIndexSet() {
require.Equal(t, tc.userIndexSetState.uploadCompactedDB, is.uploadCompactedDB)
require.Equal(t, tc.userIndexSetState.removeSourceObjects, is.removeSourceObjects)
numUserIndexSets++
} else {
require.Equal(t, tc.commonIndexSetState.uploadCompactedDB, is.uploadCompactedDB)
require.Equal(t, tc.commonIndexSetState.removeSourceObjects, is.removeSourceObjects)
numCommonIndexSets++
}
require.False(t, is.compactedDBRecreated)
}
if tc.commonIndexSetState != nil {
require.Equal(t, 1, numCommonIndexSets)
} else {
require.Equal(t, 0, numCommonIndexSets)
}
if tc.commonIndexSetState != nil {
require.Equal(t, 1, numCommonIndexSets)
} else {
require.Equal(t, 0, numCommonIndexSets)
}
if tc.userIndexSetState != nil {
require.Equal(t, numUsers, numUserIndexSets)
} else {
require.Equal(t, 0, numUserIndexSets)
}
if tc.userIndexSetState != nil {
require.Equal(t, numUsers, numUserIndexSets)
} else {
require.Equal(t, 0, numUserIndexSets)
}
// verify the state in the storage after compaction.
expectedNumCommonDBs := 0
if (commonDBsConfig.NumUnCompactedDBs + commonDBsConfig.NumCompactedDBs) > 0 {
expectedNumCommonDBs = 1
}
numExpectedUsers := 0
if (perUserDBsConfig.NumUnCompactedDBs + perUserDBsConfig.NumCompactedDBs) > 0 {
numExpectedUsers = numUsers
}
validateTable(t, tablePathInStorage, expectedNumCommonDBs, numExpectedUsers, func(filename string) {
require.True(t, strings.HasSuffix(filename, ".gz"))
})
// verify the state in the storage after compaction.
expectedNumCommonDBs := 0
if (commonDBsConfig.NumUnCompactedDBs + commonDBsConfig.NumCompactedDBs) > 0 {
expectedNumCommonDBs = 1
}
numExpectedUsers := 0
if (perUserDBsConfig.NumUnCompactedDBs + perUserDBsConfig.NumCompactedDBs) > 0 {
numExpectedUsers = numUsers
}
validateTable(t, tablePathInStorage, expectedNumCommonDBs, numExpectedUsers, func(filename string) {
require.True(t, strings.HasSuffix(filename, ".gz"))
})
// verify we have all the kvs in compacted db which were there in source dbs.
compareCompactedTable(t, tablePathInStorage, filepath.Join(objectStoragePath, "test-copy"))
// verify we have all the kvs in compacted db which were there in source dbs.
compareCompactedTable(t, tablePathInStorage, filepath.Join(objectStoragePath, "test-copy"))
// running compaction again should not do anything.
table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
nil, nil)
require.NoError(t, err)
// running compaction again should not do anything.
table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
nil, nil)
require.NoError(t, err)
require.NoError(t, table.compact(false))
require.NoError(t, table.compact(false))
for _, is := range table.indexSets {
require.False(t, is.uploadCompactedDB)
require.False(t, is.removeSourceObjects)
require.False(t, is.compactedDBRecreated)
for _, is := range table.indexSets {
require.False(t, is.uploadCompactedDB)
require.False(t, is.removeSourceObjects)
require.False(t, is.compactedDBRecreated)
}
})
}
})
}

Loading…
Cancel
Save