From d2ce08280f3daf17f00a9dc344be5047f944b398 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 9 Mar 2022 17:40:10 +0530 Subject: [PATCH] Compactor improvements (#5587) * upload and cleanup index sets concurrently * reuse batch slice used for building a batch of index during compaction --- pkg/storage/stores/shipper/compactor/table.go | 24 +- .../stores/shipper/compactor/table_test.go | 436 +++++++++--------- 2 files changed, 235 insertions(+), 225 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 6289216459..dc7e1624d0 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -58,7 +58,8 @@ import ( //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// const ( - uploaderName = "compactor" + uploaderName = "compactor" + uploadIndexSetsConcurrency = 10 readDBsConcurrency = 50 batchSize = 1000 @@ -223,7 +224,8 @@ func (t *table) done() error { } } - for userID, is := range t.indexSets { + userIDs := make([]string, 0, len(t.indexSets)) + for userID := range t.indexSets { // indexSet.done() uploads the compacted db and cleans up the source index files. // For user index sets, the files from common index sets are also a source of index. // if we cleanup common index sets first, and we fail to upload newly compacted dbs in user index sets, then we will lose data. @@ -232,9 +234,14 @@ func (t *table) done() error { continue } - if err := is.done(); err != nil { - return err - } + userIDs = append(userIDs, userID) + } + + err := concurrency.ForEachJob(t.ctx, len(userIDs), uploadIndexSetsConcurrency, func(ctx context.Context, idx int) error { + return t.indexSets[userIDs[idx]].done() + }) + if err != nil { + return err } if commonIndexSet, ok := t.indexSets[""]; ok { @@ -443,9 +450,11 @@ func readFile(logger log.Logger, path string, writeBatch func(userID string, bat } }() + batch := make([]indexEntry, 0, batchSize) + return db.View(func(tx *bbolt.Tx) error { return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { - batch := make([]indexEntry, 0, batchSize) + batch = batch[:0] bucketNameStr := string(name) err := b.ForEach(func(k, v []byte) error { ie := indexEntry{ @@ -466,8 +475,7 @@ func readFile(logger log.Logger, path string, writeBatch func(userID string, bat if err != nil { return err } - // todo(cyriltovena) we should just re-slice to avoid allocations - batch = make([]indexEntry, 0, batchSize) + batch = batch[:0] } return nil diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index 34467df1e6..2ba6c00308 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -35,236 +35,238 @@ type indexSetState struct { } func TestTable_Compaction(t *testing.T) { - numUsers := 5 - - for _, tc := range []struct { - numUnCompactedCommonDBs int - numUnCompactedPerUserDBs int - numCompactedDBs int - - shouldInitializeCommonIndexSet bool - commonIndexSetState *indexSetState - - shouldInitializeUserIndexSet bool - userIndexSetState *indexSetState - }{ - {}, - { - numCompactedDBs: 1, - }, - { - numCompactedDBs: 2, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedCommonDBs: 1, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedCommonDBs: 10, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedCommonDBs: 10, - numCompactedDBs: 1, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedCommonDBs: 10, - numCompactedDBs: 2, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedPerUserDBs: 1, - commonIndexSetState: &indexSetState{ - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedPerUserDBs: 1, - numCompactedDBs: 1, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedPerUserDBs: 1, - numCompactedDBs: 2, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedPerUserDBs: 10, - commonIndexSetState: &indexSetState{ - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedCommonDBs: 10, - numUnCompactedPerUserDBs: 10, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedCommonDBs: 10, - numUnCompactedPerUserDBs: 10, - numCompactedDBs: 1, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - { - numUnCompactedCommonDBs: 10, - numUnCompactedPerUserDBs: 10, - numCompactedDBs: 2, - commonIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - userIndexSetState: &indexSetState{ - uploadCompactedDB: true, - removeSourceObjects: true, - }, - }, - } { - commonDBsConfig := testutil.DBsConfig{ - NumCompactedDBs: tc.numCompactedDBs, - NumUnCompactedDBs: tc.numUnCompactedCommonDBs, - } - perUserDBsConfig := testutil.PerUserDBsConfig{ - DBsConfig: testutil.DBsConfig{ - NumCompactedDBs: tc.numCompactedDBs, - NumUnCompactedDBs: tc.numUnCompactedPerUserDBs, - }, - NumUsers: numUsers, - } + for _, numUsers := range []int{uploadIndexSetsConcurrency / 2, uploadIndexSetsConcurrency, uploadIndexSetsConcurrency * 2} { + t.Run(fmt.Sprintf("numUsers=%d", numUsers), func(t *testing.T) { + for _, tc := range []struct { + numUnCompactedCommonDBs int + numUnCompactedPerUserDBs int + numCompactedDBs int + + shouldInitializeCommonIndexSet bool + commonIndexSetState *indexSetState + + shouldInitializeUserIndexSet bool + userIndexSetState *indexSetState + }{ + {}, + { + numCompactedDBs: 1, + }, + { + numCompactedDBs: 2, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedCommonDBs: 1, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedCommonDBs: 10, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedCommonDBs: 10, + numCompactedDBs: 1, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedCommonDBs: 10, + numCompactedDBs: 2, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedPerUserDBs: 1, + commonIndexSetState: &indexSetState{ + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedPerUserDBs: 1, + numCompactedDBs: 1, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedPerUserDBs: 1, + numCompactedDBs: 2, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedPerUserDBs: 10, + commonIndexSetState: &indexSetState{ + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedCommonDBs: 10, + numUnCompactedPerUserDBs: 10, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedCommonDBs: 10, + numUnCompactedPerUserDBs: 10, + numCompactedDBs: 1, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + { + numUnCompactedCommonDBs: 10, + numUnCompactedPerUserDBs: 10, + numCompactedDBs: 2, + commonIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + userIndexSetState: &indexSetState{ + uploadCompactedDB: true, + removeSourceObjects: true, + }, + }, + } { + commonDBsConfig := testutil.DBsConfig{ + NumCompactedDBs: tc.numCompactedDBs, + NumUnCompactedDBs: tc.numUnCompactedCommonDBs, + } + perUserDBsConfig := testutil.PerUserDBsConfig{ + DBsConfig: testutil.DBsConfig{ + NumCompactedDBs: tc.numCompactedDBs, + NumUnCompactedDBs: tc.numUnCompactedPerUserDBs, + }, + NumUsers: numUsers, + } - t.Run(fmt.Sprintf("%s ; %s", commonDBsConfig.String(), perUserDBsConfig.String()), func(t *testing.T) { - tempDir := t.TempDir() + t.Run(fmt.Sprintf("%s ; %s", commonDBsConfig.String(), perUserDBsConfig.String()), func(t *testing.T) { + tempDir := t.TempDir() - objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) - tablePathInStorage := filepath.Join(objectStoragePath, tableName) - tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) + objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) + tablePathInStorage := filepath.Join(objectStoragePath, tableName) + tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) - testutil.SetupTable(t, filepath.Join(objectStoragePath, tableName), commonDBsConfig, perUserDBsConfig) - testutil.SetupTable(t, filepath.Join(objectStoragePath, fmt.Sprintf("%s-copy", tableName)), commonDBsConfig, perUserDBsConfig) + testutil.SetupTable(t, filepath.Join(objectStoragePath, tableName), commonDBsConfig, perUserDBsConfig) + testutil.SetupTable(t, filepath.Join(objectStoragePath, fmt.Sprintf("%s-copy", tableName)), commonDBsConfig, perUserDBsConfig) - // do the compaction - objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) - require.NoError(t, err) + // do the compaction + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) - table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), - nil, nil) - require.NoError(t, err) + table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), + nil, nil) + require.NoError(t, err) - require.NoError(t, table.compact(false)) - - numUserIndexSets, numCommonIndexSets := 0, 0 - for _, is := range table.indexSets { - if is.baseIndexSet.IsUserBasedIndexSet() { - require.Equal(t, tc.userIndexSetState.uploadCompactedDB, is.uploadCompactedDB) - require.Equal(t, tc.userIndexSetState.removeSourceObjects, is.removeSourceObjects) - numUserIndexSets++ - } else { - require.Equal(t, tc.commonIndexSetState.uploadCompactedDB, is.uploadCompactedDB) - require.Equal(t, tc.commonIndexSetState.removeSourceObjects, is.removeSourceObjects) - numCommonIndexSets++ - } - require.False(t, is.compactedDBRecreated) - } + require.NoError(t, table.compact(false)) + + numUserIndexSets, numCommonIndexSets := 0, 0 + for _, is := range table.indexSets { + if is.baseIndexSet.IsUserBasedIndexSet() { + require.Equal(t, tc.userIndexSetState.uploadCompactedDB, is.uploadCompactedDB) + require.Equal(t, tc.userIndexSetState.removeSourceObjects, is.removeSourceObjects) + numUserIndexSets++ + } else { + require.Equal(t, tc.commonIndexSetState.uploadCompactedDB, is.uploadCompactedDB) + require.Equal(t, tc.commonIndexSetState.removeSourceObjects, is.removeSourceObjects) + numCommonIndexSets++ + } + require.False(t, is.compactedDBRecreated) + } - if tc.commonIndexSetState != nil { - require.Equal(t, 1, numCommonIndexSets) - } else { - require.Equal(t, 0, numCommonIndexSets) - } + if tc.commonIndexSetState != nil { + require.Equal(t, 1, numCommonIndexSets) + } else { + require.Equal(t, 0, numCommonIndexSets) + } - if tc.userIndexSetState != nil { - require.Equal(t, numUsers, numUserIndexSets) - } else { - require.Equal(t, 0, numUserIndexSets) - } + if tc.userIndexSetState != nil { + require.Equal(t, numUsers, numUserIndexSets) + } else { + require.Equal(t, 0, numUserIndexSets) + } - // verify the state in the storage after compaction. - expectedNumCommonDBs := 0 - if (commonDBsConfig.NumUnCompactedDBs + commonDBsConfig.NumCompactedDBs) > 0 { - expectedNumCommonDBs = 1 - } - numExpectedUsers := 0 - if (perUserDBsConfig.NumUnCompactedDBs + perUserDBsConfig.NumCompactedDBs) > 0 { - numExpectedUsers = numUsers - } - validateTable(t, tablePathInStorage, expectedNumCommonDBs, numExpectedUsers, func(filename string) { - require.True(t, strings.HasSuffix(filename, ".gz")) - }) + // verify the state in the storage after compaction. + expectedNumCommonDBs := 0 + if (commonDBsConfig.NumUnCompactedDBs + commonDBsConfig.NumCompactedDBs) > 0 { + expectedNumCommonDBs = 1 + } + numExpectedUsers := 0 + if (perUserDBsConfig.NumUnCompactedDBs + perUserDBsConfig.NumCompactedDBs) > 0 { + numExpectedUsers = numUsers + } + validateTable(t, tablePathInStorage, expectedNumCommonDBs, numExpectedUsers, func(filename string) { + require.True(t, strings.HasSuffix(filename, ".gz")) + }) - // verify we have all the kvs in compacted db which were there in source dbs. - compareCompactedTable(t, tablePathInStorage, filepath.Join(objectStoragePath, "test-copy")) + // verify we have all the kvs in compacted db which were there in source dbs. + compareCompactedTable(t, tablePathInStorage, filepath.Join(objectStoragePath, "test-copy")) - // running compaction again should not do anything. - table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), - nil, nil) - require.NoError(t, err) + // running compaction again should not do anything. + table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), + nil, nil) + require.NoError(t, err) - require.NoError(t, table.compact(false)) + require.NoError(t, table.compact(false)) - for _, is := range table.indexSets { - require.False(t, is.uploadCompactedDB) - require.False(t, is.removeSourceObjects) - require.False(t, is.compactedDBRecreated) + for _, is := range table.indexSets { + require.False(t, is.uploadCompactedDB) + require.False(t, is.removeSourceObjects) + require.False(t, is.compactedDBRecreated) + } + }) } }) }