From 3ac9818f82b19be88bb99423a9b6de40c1e526d0 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 2 Dec 2021 18:00:24 +0530 Subject: [PATCH] recreate compacted boltdb files from compactor when they are more than 12 hours old (#4853) --- CHANGELOG.md | 1 + pkg/storage/stores/shipper/compactor/table.go | 162 ++++++++++++++---- .../stores/shipper/compactor/table_test.go | 149 +++++++++++++++- 3 files changed, 272 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c5beb43c8..90e203bf7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction * [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support. * [4741](https://github.com/grafana/loki/pull/4741) **sandeepsukhani**: index cleanup fixes while applying retention +* [4853](https://github.com/grafana/loki/pull/4853) **sandeepsukhani**: recreate compacted boltdb files from compactor to reduce storage space usage # 2.4.1 (2021/11/07) diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 54739f1d1b..bb43cfc0d7 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -22,11 +22,16 @@ import ( ) const ( - compactMinDBs = 4 - uploaderName = "compactor" + uploaderName = "compactor" readDBsParallelism = 50 batchSize = 1000 + + // we want to recreate compactedDB when the chances of it changing due to compaction or deletion of data are low. + // this is to avoid recreation of the DB too often which would be too costly in a large cluster. 
+ recreateCompactedDBOlderThan = 12 * time.Hour + dropFreePagesTxMaxSize = 100 * 1024 * 1024 // 100MB + recreatedCompactedDBSuffix = ".r.gz" ) var bucketName = []byte("index") @@ -42,8 +47,12 @@ type table struct { applyRetention bool tableMarker retention.TableMarker - compactedDB *bbolt.DB - logger log.Logger + sourceFiles []storage.IndexFile + compactedDB *bbolt.DB + compactedDBRecreated bool + uploadCompactedDB bool + removeSourceFiles bool + logger log.Logger ctx context.Context quit chan struct{} @@ -75,6 +84,12 @@ func (t *table) compact(tableHasExpiredStreams bool) error { return err } + if len(indexFiles) == 0 { + level.Info(t.logger).Log("msg", "no index files found") + return nil + } + + t.sourceFiles = indexFiles level.Info(t.logger).Log("msg", "listed files", "count", len(indexFiles)) defer func() { @@ -86,74 +101,141 @@ func (t *table) compact(tableHasExpiredStreams bool) error { applyRetention := t.applyRetention && tableHasExpiredStreams - if !applyRetention { - if len(indexFiles) < compactMinDBs { - level.Info(t.logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(indexFiles))) - return nil - } + if len(indexFiles) > 1 { if err := t.compactFiles(indexFiles); err != nil { return err } - // upload the compacted db - err = t.upload() + + // we have compacted the files to a single file so let us upload the compacted db and remove the source files. 
+ t.uploadCompactedDB = true + t.removeSourceFiles = true + } else if !applyRetention && !t.mustRecreateCompactedDB() { + return nil + } else { + // download the db for applying retention or recreating the compacted db + downloadAt := filepath.Join(t.workingDirectory, indexFiles[0].Name) + err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, indexFiles[0].Name, downloadAt, false) if err != nil { return err } + t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt) + if err != nil { + return err + } + } - // remove source files from storage which were compacted - err = t.removeFilesFromStorage(indexFiles) + if applyRetention { + empty, modified, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB) if err != nil { return err } - return nil + + if empty { + // we have deleted all the data so we can remove the source files without uploading the compacted db + t.removeSourceFiles = true + t.uploadCompactedDB = false + } else if modified { + // we have modified the compacted db so we need to upload the compacted db and remove the source file(s) + t.uploadCompactedDB = true + t.removeSourceFiles = true + } } - var compacted bool - if len(indexFiles) > 1 { - if err := t.compactFiles(indexFiles); err != nil { + // file was not modified so see if we must recreate the compacted db to optimize storage usage + if !t.uploadCompactedDB && !t.removeSourceFiles && t.mustRecreateCompactedDB() { + err := t.recreateCompactedDB() + if err != nil { return err } - compacted = true + + // we have recreated the compacted db so we need to upload the compacted db and remove the source file + t.uploadCompactedDB = true + t.removeSourceFiles = true + t.compactedDBRecreated = true } - if len(indexFiles) == 1 { - // download the db - downloadAt := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) - err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, indexFiles[0].Name, downloadAt, false) + return t.done() +} + +// done 
takes care of uploading the compacted db and removing the source files from storage based on the values of uploadCompactedDB and removeSourceFiles +func (t *table) done() error { + if t.uploadCompactedDB { + err := t.upload() if err != nil { return err } - t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt) + } + + if t.removeSourceFiles { + err := t.removeSourceFilesFromStorage() if err != nil { return err } } - if t.compactedDB == nil { - level.Info(t.logger).Log("msg", "skipping compaction no files found.") - return nil + return nil +} + +// mustRecreateCompactedDB returns true if the compacted db should be recreated +func (t *table) mustRecreateCompactedDB() bool { + if len(t.sourceFiles) != 1 { + // do not recreate if there are multiple source files + return false + } else if time.Since(t.sourceFiles[0].ModifiedAt) < recreateCompactedDBOlderThan { + // do not recreate if the source file is younger than the threshold + return false } - empty, modified, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB) + // recreate the compacted db only if we have not recreated it before + return !strings.HasSuffix(t.sourceFiles[0].Name, recreatedCompactedDBSuffix) +} + +// recreateCompactedDB just copies the old db to the new one using bbolt.Compact for the following reasons: +// 1. When data is deleted, boltdb leaves free pages in the file. The only way to drop those free pages is to re-create the file. +// See https://github.com/boltdb/bolt/issues/308 for more details. +// 2. boltdb by default fills only about 50% of the page in the file. See https://github.com/etcd-io/bbolt/blob/master/bucket.go#L26. +// This setting is optimal for unordered writes. +// bbolt.Compact fills the whole page by setting FillPercent to 1 which works well here since while copying the data, it receives the index entries in order. +// The storage space goes down by anywhere between 25% and 50% as per my(Sandeep) tests. 
+func (t *table) recreateCompactedDB() error { + destDB, err := openBoltdbFileWithNoSync(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))) if err != nil { return err } - if empty { - return t.removeFilesFromStorage(indexFiles) + level.Info(t.logger).Log("msg", "recreating compacted db") + + err = bbolt.Compact(destDB, t.compactedDB, dropFreePagesTxMaxSize) + if err != nil { + return err } - if !modified && !compacted { - // we didn't make a modification so let's just return + sourceSize := int64(0) + destSize := int64(0) + + if err := t.compactedDB.View(func(tx *bbolt.Tx) error { + sourceSize = tx.Size() return nil + }); err != nil { + return err } - err = t.upload() + if err := destDB.View(func(tx *bbolt.Tx) error { + destSize = tx.Size() + return nil + }); err != nil { + return err + } + + level.Info(t.logger).Log("msg", "recreated compacted db", "src_size_bytes", sourceSize, "dest_size_bytes", destSize) + + err = t.compactedDB.Close() if err != nil { return err } - return t.removeFilesFromStorage(indexFiles) + t.compactedDB = destDB + return nil } func (t *table) compactFiles(files []storage.IndexFile) error { @@ -388,17 +470,21 @@ func (t *table) upload() error { } }() - fileName := fmt.Sprintf("%s.gz", shipper_util.BuildIndexFileName(t.name, uploaderName, fmt.Sprint(time.Now().Unix()))) + fileNameFormat := "%s.gz" + if t.compactedDBRecreated { + fileNameFormat = "%s" + recreatedCompactedDBSuffix + } + fileName := fmt.Sprintf(fileNameFormat, shipper_util.BuildIndexFileName(t.name, uploaderName, fmt.Sprint(time.Now().Unix()))) level.Info(t.logger).Log("msg", "uploading the compacted file", "fileName", fileName) return t.indexStorageClient.PutFile(t.ctx, t.name, fileName, compressedDB) } -// removeFilesFromStorage deletes index files from storage. 
-func (t *table) removeFilesFromStorage(files []storage.IndexFile) error { - level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(files)) +// removeSourceFilesFromStorage deletes source db files from storage. +func (t *table) removeSourceFilesFromStorage() error { + level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(t.sourceFiles)) - for _, file := range files { + for _, file := range t.sourceFiles { err := t.indexStorageClient.DeleteFile(t.ctx, t.name, file.Name) if err != nil { return err diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index 3afc454ee1..02378b6d86 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" @@ -51,7 +52,7 @@ func TestTable_Compaction(t *testing.T) { tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) // setup some dbs - numDBs := compactMinDBs * 2 + numDBs := 10 numRecordsPerDB := 100 dbsToSetup := make(map[string]testutil.DBRecords) @@ -209,7 +210,7 @@ func TestTable_CompactionFailure(t *testing.T) { tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) // setup some dbs - numDBs := compactMinDBs * 2 + numDBs := 10 numRecordsPerDB := 100 dbsToSetup := make(map[string]testutil.DBRecords) @@ -303,3 +304,147 @@ func compareCompactedDB(t *testing.T, compactedDBPath string, sourceDBsPath stri require.NoError(t, err) } + +func TestTable_RecreateCompactedDB(t *testing.T) { + for name, tt := range map[string]struct { + dbCount int + assert func(t *testing.T, storagePath, tableName string) + tableMarker retention.TableMarker + compactedDBMtime time.Time + shouldRecreateCompactedDB bool + }{ + // must not recreate compacted db test cases: + "more than 1 file in table": { + 
dbCount: 2, + assert: func(t *testing.T, storagePath, tableName string) { + files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + require.False(t, strings.HasSuffix(files[0].Name(), recreatedCompactedDBSuffix)) + compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return false, false, nil + }), + }, + "compacted db not old enough": { + dbCount: 1, + assert: func(t *testing.T, storagePath, tableName string) { + files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + require.False(t, strings.HasSuffix(files[0].Name(), recreatedCompactedDBSuffix)) + compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return false, false, nil + }), + compactedDBMtime: time.Now().Add(-recreateCompactedDBOlderThan / 2), + }, + "marked table": { + dbCount: 1, + assert: func(t *testing.T, storagePath, tableName string) { + files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + require.False(t, strings.HasSuffix(files[0].Name(), recreatedCompactedDBSuffix)) + compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return false, true, nil + }), + }, + "emptied table": { + 
dbCount: 2, + assert: func(t *testing.T, storagePath, tableName string) { + _, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) + require.True(t, os.IsNotExist(err)) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return true, true, nil + }), + }, + + // must recreate compacted db test cases + "compacted db old enough": { + dbCount: 1, + assert: func(t *testing.T, storagePath, tableName string) { + files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), recreatedCompactedDBSuffix)) + compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy")) + }, + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + return false, false, nil + }), + compactedDBMtime: time.Now().Add(-(recreateCompactedDBOlderThan + time.Minute)), + shouldRecreateCompactedDB: true, + }, + } { + tt := tt + t.Run(name, func(t *testing.T) { + if !tt.compactedDBMtime.IsZero() { + require.Equal(t, 1, tt.dbCount) + } + tempDir := t.TempDir() + + objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) + tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) + + // setup some dbs + numDBs := tt.dbCount + numRecordsPerDB := 100 + + dbsToSetup := make(map[string]testutil.DBRecords) + for i := 0; i < numDBs; i++ { + dbsToSetup[fmt.Sprint(i)] = testutil.DBRecords{ + Start: i * numRecordsPerDB, + NumRecords: (i + 1) * numRecordsPerDB, + } + } + + testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbsToSetup, true) + + if !tt.compactedDBMtime.IsZero() && tt.dbCount == 1 { + err := os.Chtimes(filepath.Join(objectStoragePath, tableName, "0.gz"), tt.compactedDBMtime, tt.compactedDBMtime) + require.NoError(t, err) + } + + // setup exact same copy of dbs for 
comparison. + testutil.SetupDBTablesAtPath(t, "test-copy", objectStoragePath, dbsToSetup, false) + + // do the compaction + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) + + table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), true, tt.tableMarker) + require.NoError(t, err) + + require.NoError(t, table.compact(true)) + require.Equal(t, tt.shouldRecreateCompactedDB, table.compactedDBRecreated) + tt.assert(t, objectStoragePath, tableName) + + // if the compacted db was recreated, running the compaction again must not recreate the file even if the mtime is older than the threshold + if tt.shouldRecreateCompactedDB { + files, err := ioutil.ReadDir(filepath.Join(objectStoragePath, tableName)) + require.NoError(t, err) + require.Len(t, files, 1) + + // change the mtime of the recreated file + err = os.Chtimes(filepath.Join(objectStoragePath, tableName, files[0].Name()), tt.compactedDBMtime, tt.compactedDBMtime) + require.NoError(t, err) + + table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), true, tt.tableMarker) + require.NoError(t, err) + + require.NoError(t, table.compact(true)) + require.Equal(t, false, table.compactedDBRecreated) + tt.assert(t, objectStoragePath, tableName) + } + }) + } +}