recreate compacted boltdb files from compactor when they are more than 12 hours old (#4853)

Branch: pull/4873/head
Author: Sandeep Sukhani (4 years ago), committed by GitHub
Parent: 6347faf876
Commit: 3ac9818f82
Signature: No known key found for this signature in database (GPG Key ID: 4AEE18F83AFDEB23)
3 changed files:

1. CHANGELOG.md (1 change)
2. pkg/storage/stores/shipper/compactor/table.go (162 changes)
3. pkg/storage/stores/shipper/compactor/table_test.go (149 changes)

CHANGELOG.md:

@@ -9,6 +9,7 @@
 * [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
 * [4744](https://github.com/grafana/loki/pull/4744) **cyriltovena**: Promtail: Adds GELF UDP support.
 * [4741](https://github.com/grafana/loki/pull/4741) **sandeepsukhani**: index cleanup fixes while applying retention
+* [4853](https://github.com/grafana/loki/pull/4853) **sandeepsukhani**: recreate compacted boltdb files from compactor to reduce storage space usage
 
 # 2.4.1 (2021/11/07)

pkg/storage/stores/shipper/compactor/table.go:

@@ -22,11 +22,16 @@ import (
 )
 
 const (
 	compactMinDBs = 4
-	uploaderName  = "compactor"
+
+	uploaderName = "compactor"
 
 	readDBsParallelism = 50
 	batchSize          = 1000
+
+	// we want to recreate compactedDB when the chances of it changing due to compaction or deletion of data are low.
+	// this is to avoid recreation of the DB too often which would be too costly in a large cluster.
+	recreateCompactedDBOlderThan = 12 * time.Hour
+
+	dropFreePagesTxMaxSize = 100 * 1024 * 1024 // 100MB
+
+	recreatedCompactedDBSuffix = ".r.gz"
 )
 
 var bucketName = []byte("index")
@@ -42,8 +47,12 @@ type table struct {
 	applyRetention bool
 	tableMarker    retention.TableMarker
 
-	compactedDB *bbolt.DB
-	logger      log.Logger
+	sourceFiles          []storage.IndexFile
+	compactedDB          *bbolt.DB
+	compactedDBRecreated bool
+	uploadCompactedDB    bool
+	removeSourceFiles    bool
+	logger               log.Logger
 
 	ctx  context.Context
 	quit chan struct{}
@@ -75,6 +84,12 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
 		return err
 	}
 
+	if len(indexFiles) == 0 {
+		level.Info(t.logger).Log("msg", "no index files found")
+		return nil
+	}
+
+	t.sourceFiles = indexFiles
+
 	level.Info(t.logger).Log("msg", "listed files", "count", len(indexFiles))
 
 	defer func() {
@@ -86,74 +101,141 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
 	applyRetention := t.applyRetention && tableHasExpiredStreams
 
 	if !applyRetention {
 		if len(indexFiles) < compactMinDBs {
 			level.Info(t.logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(indexFiles)))
 			return nil
 		}
+	}
 
+	if len(indexFiles) > 1 {
 		if err := t.compactFiles(indexFiles); err != nil {
 			return err
 		}
 
-		// upload the compacted db
-		err = t.upload()
-		if err != nil {
-			return err
-		}
-
-		// remove source files from storage which were compacted
-		err = t.removeFilesFromStorage(indexFiles)
+		// we have compacted the files to a single file so let us upload the compacted db and remove the source files.
+		t.uploadCompactedDB = true
+		t.removeSourceFiles = true
+	} else if !applyRetention && !t.mustRecreateCompactedDB() {
+		return nil
+	} else {
+		// download the db for applying retention or recreating the compacted db
+		downloadAt := filepath.Join(t.workingDirectory, indexFiles[0].Name)
+		err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, indexFiles[0].Name, downloadAt, false)
 		if err != nil {
 			return err
 		}
 
-		return nil
+		t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt)
+		if err != nil {
+			return err
+		}
 	}
 
-	var compacted bool
-	if len(indexFiles) > 1 {
-		if err := t.compactFiles(indexFiles); err != nil {
+	if applyRetention {
+		empty, modified, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB)
+		if err != nil {
 			return err
 		}
-		compacted = true
-	}
 
-	if len(indexFiles) == 1 {
-		// download the db
-		downloadAt := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))
-		err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, indexFiles[0].Name, downloadAt, false)
-		if err != nil {
-			return err
+		if empty {
+			// we have deleted all the data so we can remove the source files without uploading the compacted db
+			t.removeSourceFiles = true
+			t.uploadCompactedDB = false
+		} else if modified {
+			// we have modified the compacted db so we need to upload the compacted db and remove the source file(s)
+			t.uploadCompactedDB = true
+			t.removeSourceFiles = true
 		}
-		t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt)
+	}
+
+	// file was not modified so see if we must recreate the compacted db to optimize storage usage
+	if !t.uploadCompactedDB && !t.removeSourceFiles && t.mustRecreateCompactedDB() {
+		err := t.recreateCompactedDB()
 		if err != nil {
 			return err
 		}
+
+		// we have recreated the compacted db so we need to upload the compacted db and remove the source file
+		t.uploadCompactedDB = true
+		t.removeSourceFiles = true
+		t.compactedDBRecreated = true
 	}
 
-	if t.compactedDB == nil {
-		level.Info(t.logger).Log("msg", "skipping compaction no files found.")
-		return nil
+	return t.done()
+}
+
+// done takes care of uploading the files and cleaning up the working directory based on the value in uploadCompactedDB and removeSourceFiles
+func (t *table) done() error {
+	if t.uploadCompactedDB {
+		err := t.upload()
+		if err != nil {
+			return err
+		}
 	}
 
-	empty, modified, err := t.tableMarker.MarkForDelete(t.ctx, t.name, t.compactedDB)
-	if err != nil {
-		return err
+	if t.removeSourceFiles {
+		err := t.removeSourceFilesFromStorage()
+		if err != nil {
+			return err
+		}
 	}
 
-	if empty {
-		return t.removeFilesFromStorage(indexFiles)
+	return nil
+}
+
+// mustRecreateCompactedDB returns true if the compacted db should be recreated
+func (t *table) mustRecreateCompactedDB() bool {
+	if len(t.sourceFiles) != 1 {
+		// do not recreate if there are multiple source files
+		return false
+	} else if time.Since(t.sourceFiles[0].ModifiedAt) < recreateCompactedDBOlderThan {
+		// do not recreate if the source file is younger than the threshold
+		return false
 	}
 
-	if !modified && !compacted {
-		// we didn't make a modification so let's just return
-		return nil
+	// recreate the compacted db only if we have not recreated it before
+	return !strings.HasSuffix(t.sourceFiles[0].Name, recreatedCompactedDBSuffix)
+}
+
+// recreateCompactedDB just copies the old db to the new one using bbolt.Compact for the following reasons:
+// 1. When files are deleted, boltdb leaves free pages in the file. The only way to drop those free pages is to re-create the file.
+//    See https://github.com/boltdb/bolt/issues/308 for more details.
+// 2. boltdb by default fills only about 50% of the page in the file. See https://github.com/etcd-io/bbolt/blob/master/bucket.go#L26.
+//    This setting is optimal for unordered writes.
+//    bbolt.Compact fills the whole page by setting FillPercent to 1 which works well here since while copying the data, it receives the index entries in order.
+// The storage space goes down from anywhere between 25% to 50% as per my (Sandeep) tests.
+func (t *table) recreateCompactedDB() error {
+	destDB, err := openBoltdbFileWithNoSync(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())))
+	if err != nil {
+		return err
 	}
 
-	err = t.upload()
+	level.Info(t.logger).Log("msg", "recreating compacted db")
+
+	err = bbolt.Compact(destDB, t.compactedDB, dropFreePagesTxMaxSize)
 	if err != nil {
 		return err
 	}
 
-	return t.removeFilesFromStorage(indexFiles)
+	sourceSize := int64(0)
+	destSize := int64(0)
+
+	if err := t.compactedDB.View(func(tx *bbolt.Tx) error {
+		sourceSize = tx.Size()
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	if err := destDB.View(func(tx *bbolt.Tx) error {
+		destSize = tx.Size()
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	level.Info(t.logger).Log("msg", "recreated compacted db", "src_size_bytes", sourceSize, "dest_size_bytes", destSize)
+
+	err = t.compactedDB.Close()
+	if err != nil {
+		return err
+	}
+
+	t.compactedDB = destDB
+	return nil
 }
 
 func (t *table) compactFiles(files []storage.IndexFile) error {
@@ -388,17 +470,21 @@ func (t *table) upload() error {
 		}
 	}()
 
-	fileName := fmt.Sprintf("%s.gz", shipper_util.BuildIndexFileName(t.name, uploaderName, fmt.Sprint(time.Now().Unix())))
+	fileNameFormat := "%s.gz"
+	if t.compactedDBRecreated {
+		fileNameFormat = "%s" + recreatedCompactedDBSuffix
+	}
+	fileName := fmt.Sprintf(fileNameFormat, shipper_util.BuildIndexFileName(t.name, uploaderName, fmt.Sprint(time.Now().Unix())))
 
 	level.Info(t.logger).Log("msg", "uploading the compacted file", "fileName", fileName)
 
 	return t.indexStorageClient.PutFile(t.ctx, t.name, fileName, compressedDB)
 }
 
-// removeFilesFromStorage deletes index files from storage.
-func (t *table) removeFilesFromStorage(files []storage.IndexFile) error {
-	level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(files))
-	for _, file := range files {
+// removeSourceFilesFromStorage deletes source db files from storage.
+func (t *table) removeSourceFilesFromStorage() error {
+	level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(t.sourceFiles))
+	for _, file := range t.sourceFiles {
 		err := t.indexStorageClient.DeleteFile(t.ctx, t.name, file.Name)
 		if err != nil {
 			return err

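The comment on recreateCompactedDB above gives the two reasons for rewriting the file with bbolt.Compact: deletions leave free pages behind that can only be dropped by recreating the file, and repacking with FillPercent set to 1 is safe here because the copy receives keys in sorted order. For reference, here is a minimal standalone sketch of the same technique against go.etcd.io/bbolt; the file names and the 10MB transaction cap are illustrative values, not taken from the Loki code:

```go
package main

import (
	"fmt"
	"log"

	"go.etcd.io/bbolt"
)

func main() {
	// Open the fragmented source db and a fresh destination db.
	// File names here are hypothetical.
	srcDB, err := bbolt.Open("index-old.db", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer srcDB.Close()

	dstDB, err := bbolt.Open("index-recreated.db", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer dstDB.Close()

	// bbolt.Compact walks every bucket and key of srcDB in order and
	// writes them into dstDB with FillPercent set to 1, committing a
	// write transaction roughly every txMaxSize bytes copied.
	const txMaxSize = 10 * 1024 * 1024 // 10MB, illustrative
	if err := bbolt.Compact(dstDB, srcDB, txMaxSize); err != nil {
		log.Fatal(err)
	}

	// tx.Size() reports the database size as seen by the transaction,
	// which is how the commit logs src_size_bytes/dest_size_bytes.
	logSize := func(name string, db *bbolt.DB) {
		_ = db.View(func(tx *bbolt.Tx) error {
			fmt.Printf("%s: %d bytes\n", name, tx.Size())
			return nil
		})
	}
	logSize("source", srcDB)
	logSize("destination", dstDB)
}
```

Loki's compactor caps copy transactions at 100MB (dropFreePagesTxMaxSize) and opens files through its openBoltdbFileWithNoSync helper rather than plain bbolt.Open, but the bbolt.Compact mechanics are the same.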
pkg/storage/stores/shipper/compactor/table_test.go:

@@ -8,6 +8,7 @@ import (
 	"path/filepath"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/bbolt"
@@ -51,7 +52,7 @@ func TestTable_Compaction(t *testing.T) {
 	tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
 
 	// setup some dbs
-	numDBs := compactMinDBs * 2
+	numDBs := 10
 	numRecordsPerDB := 100
 
 	dbsToSetup := make(map[string]testutil.DBRecords)
@@ -209,7 +210,7 @@ func TestTable_CompactionFailure(t *testing.T) {
 	tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
 
 	// setup some dbs
-	numDBs := compactMinDBs * 2
+	numDBs := 10
 	numRecordsPerDB := 100
 
 	dbsToSetup := make(map[string]testutil.DBRecords)
@@ -303,3 +304,147 @@ func compareCompactedDB(t *testing.T, compactedDBPath string, sourceDBsPath stri
 
 	require.NoError(t, err)
 }
+
+func TestTable_RecreateCompactedDB(t *testing.T) {
+	for name, tt := range map[string]struct {
+		dbCount                   int
+		assert                    func(t *testing.T, storagePath, tableName string)
+		tableMarker               retention.TableMarker
+		compactedDBMtime          time.Time
+		shouldRecreateCompactedDB bool
+	}{
+		// must not recreate compacted db test cases:
+		"more than 1 file in table": {
+			dbCount: 2,
+			assert: func(t *testing.T, storagePath, tableName string) {
+				files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
+				require.NoError(t, err)
+				require.Len(t, files, 1)
+				require.True(t, strings.HasSuffix(files[0].Name(), ".gz"))
+				require.False(t, strings.HasSuffix(files[0].Name(), recreatedCompactedDBSuffix))
+				compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy"))
+			},
+			tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+				return false, false, nil
+			}),
+		},
+		"compacted db not old enough": {
+			dbCount: 1,
+			assert: func(t *testing.T, storagePath, tableName string) {
+				files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
+				require.NoError(t, err)
+				require.Len(t, files, 1)
+				require.True(t, strings.HasSuffix(files[0].Name(), ".gz"))
+				require.False(t, strings.HasSuffix(files[0].Name(), recreatedCompactedDBSuffix))
+				compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy"))
+			},
+			tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+				return false, false, nil
+			}),
+			compactedDBMtime: time.Now().Add(-recreateCompactedDBOlderThan / 2),
+		},
+		"marked table": {
+			dbCount: 1,
+			assert: func(t *testing.T, storagePath, tableName string) {
+				files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
+				require.NoError(t, err)
+				require.Len(t, files, 1)
+				require.True(t, strings.HasSuffix(files[0].Name(), ".gz"))
+				require.False(t, strings.HasSuffix(files[0].Name(), recreatedCompactedDBSuffix))
+				compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy"))
+			},
+			tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+				return false, true, nil
+			}),
+		},
+		"emptied table": {
+			dbCount: 2,
+			assert: func(t *testing.T, storagePath, tableName string) {
+				_, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
+				require.True(t, os.IsNotExist(err))
+			},
+			tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+				return true, true, nil
+			}),
+		},
+		// must recreate compacted db test cases
+		"compacted db old enough": {
+			dbCount: 1,
+			assert: func(t *testing.T, storagePath, tableName string) {
+				files, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
+				require.NoError(t, err)
+				require.Len(t, files, 1)
+				require.True(t, strings.HasSuffix(files[0].Name(), recreatedCompactedDBSuffix))
+				compareCompactedDB(t, filepath.Join(storagePath, tableName, files[0].Name()), filepath.Join(storagePath, "test-copy"))
+			},
+			tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+				return false, false, nil
+			}),
+			compactedDBMtime:          time.Now().Add(-(recreateCompactedDBOlderThan + time.Minute)),
+			shouldRecreateCompactedDB: true,
+		},
+	} {
+		tt := tt
+		t.Run(name, func(t *testing.T) {
+			if !tt.compactedDBMtime.IsZero() {
+				require.Equal(t, 1, tt.dbCount)
+			}
+			tempDir := t.TempDir()
+
+			objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
+			tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
+
+			// setup some dbs
+			numDBs := tt.dbCount
+			numRecordsPerDB := 100
+
+			dbsToSetup := make(map[string]testutil.DBRecords)
+			for i := 0; i < numDBs; i++ {
+				dbsToSetup[fmt.Sprint(i)] = testutil.DBRecords{
+					Start:      i * numRecordsPerDB,
+					NumRecords: (i + 1) * numRecordsPerDB,
+				}
+			}
+
+			testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbsToSetup, true)
+			if !tt.compactedDBMtime.IsZero() && tt.dbCount == 1 {
+				err := os.Chtimes(filepath.Join(objectStoragePath, tableName, "0.gz"), tt.compactedDBMtime, tt.compactedDBMtime)
+				require.NoError(t, err)
+			}
+
+			// setup exact same copy of dbs for comparison.
+			testutil.SetupDBTablesAtPath(t, "test-copy", objectStoragePath, dbsToSetup, false)
+
+			// do the compaction
+			objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
+			require.NoError(t, err)
+
+			table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), true, tt.tableMarker)
+			require.NoError(t, err)
+
+			require.NoError(t, table.compact(true))
+			require.Equal(t, tt.shouldRecreateCompactedDB, table.compactedDBRecreated)
+			tt.assert(t, objectStoragePath, tableName)
+
+			// if the compacted db was recreated, running the compaction again must not recreate the file even if the mtime is older than the threshold
+			if tt.shouldRecreateCompactedDB {
+				files, err := ioutil.ReadDir(filepath.Join(objectStoragePath, tableName))
+				require.NoError(t, err)
+				require.Len(t, files, 1)
+
+				// change the mtime of the recreated file
+				err = os.Chtimes(filepath.Join(objectStoragePath, tableName, files[0].Name()), tt.compactedDBMtime, tt.compactedDBMtime)
+				require.NoError(t, err)
+
+				table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), true, tt.tableMarker)
+				require.NoError(t, err)
+				require.NoError(t, table.compact(true))
+				require.Equal(t, false, table.compactedDBRecreated)
+				tt.assert(t, objectStoragePath, tableName)
+			}
+		})
+	}
+}
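The mtime-driven cases above work by aging the uploaded db file so that time.Since(ModifiedAt) crosses the 12-hour threshold. A minimal sketch of that aging trick with os.Chtimes, using a hypothetical file name:

```go
package main

import (
	"fmt"
	"log"
	"os"
	"time"
)

func main() {
	const threshold = 12 * time.Hour // mirrors recreateCompactedDBOlderThan

	// Rewind both atime and mtime of "0.gz" (hypothetical file) to just
	// past the threshold, as the test does before running compaction.
	aged := time.Now().Add(-(threshold + time.Minute))
	if err := os.Chtimes("0.gz", aged, aged); err != nil {
		log.Fatal(err)
	}

	info, err := os.Stat("0.gz")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("old enough to recreate:", time.Since(info.ModTime()) >= threshold)
}
```

With the local FS object client used in the test, the ModifiedAt reported for a stored file comes from the file's mtime, which is why rewinding it is enough to trigger recreation once — and why the second compaction run still skips it, since the recreated file carries the `.r.gz` suffix.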

Loading…
Cancel
Save