Compactor improvements (#4018)

* use previously compacted file as seed file and copy index into it

* allow configuring compactor to compact multiple tables at a time
Sandeep Sukhani 4 years ago committed by GitHub
parent f811dbfc18
commit 8a9abdfe16
Changed files:
  1. pkg/storage/stores/shipper/compactor/compactor.go (70 lines changed)
  2. pkg/storage/stores/shipper/compactor/compactor_test.go (1 line changed)
  3. pkg/storage/stores/shipper/compactor/table.go (101 lines changed)
  4. pkg/storage/stores/shipper/compactor/table_test.go (92 lines changed)

@@ -40,6 +40,7 @@ type Config struct {
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
}
// RegisterFlags registers flags.
@@ -52,6 +53,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
}
func (cfg *Config) IsDefaults() bool {
@@ -61,6 +63,9 @@ func (cfg *Config) IsDefaults() bool {
}
func (cfg *Config) Validate() error {
if cfg.MaxCompactionParallelism < 1 {
return errors.New("max compaction parallelism must be >= 1")
}
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}
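// Illustrative sketch, not part of the diff: exercising the new setting from Go.
// The flag name and the ">= 1" validation rule come from the hunks above; the
// module import path (github.com/grafana/loki) is an assumption.
package main

import (
    "flag"
    "fmt"

    "github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
)

func main() {
    // Register the compactor flags on a private FlagSet and raise the new
    // parallelism setting above its default of 1.
    fs := flag.NewFlagSet("compactor", flag.ContinueOnError)
    var cfg compactor.Config
    cfg.RegisterFlags(fs)
    _ = fs.Parse([]string{"-boltdb.shipper.compactor.max-compaction-parallelism=3"})
    fmt.Println(cfg.MaxCompactionParallelism) // 3

    // Validate rejects values below 1.
    bad := compactor.Config{MaxCompactionParallelism: 0}
    fmt.Println(bad.Validate()) // max compaction parallelism must be >= 1
}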
@@ -246,23 +251,64 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
        tables[i] = strings.TrimSuffix(string(dir), delimiter)
    }

    compactTablesChan := make(chan string)
    errChan := make(chan error)

    for i := 0; i < c.cfg.MaxCompactionParallelism; i++ {
        go func() {
            var err error
            defer func() {
                errChan <- err
            }()

            for {
                select {
                case tableName, ok := <-compactTablesChan:
                    if !ok {
                        return
                    }

                    level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName)
                    err = c.CompactTable(ctx, tableName)
                    if err != nil {
                        return
                    }
                    level.Info(util_log.Logger).Log("msg", "finished compacting table", "table-name", tableName)
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    go func() {
        for _, tableName := range tables {
            if tableName == deletion.DeleteRequestsTableName {
                // we do not want to compact or apply retention on delete requests table
                continue
            }

            select {
            case compactTablesChan <- tableName:
            case <-ctx.Done():
                return
            }
        }

        close(compactTablesChan)
    }()

    var firstErr error
    // read all the errors
    for i := 0; i < c.cfg.MaxCompactionParallelism; i++ {
        err := <-errChan
        if err != nil && firstErr == nil {
            status = statusFailure
            firstErr = err
        }
    }

    return firstErr
}
type expirationChecker struct {
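// Illustrative sketch, not part of the diff: the fan-out/fan-in pattern that
// RunCompaction now uses, in isolation. A fixed number of workers (the new
// MaxCompactionParallelism setting) pull table names from a channel, a producer
// closes the channel once all names are sent, and exactly one error per worker
// is collected, keeping the first failure. compactOne is a placeholder for
// Compactor.CompactTable, not a Loki API.
package main

import (
    "context"
    "errors"
    "fmt"
)

func compactOne(ctx context.Context, table string) error {
    if table == "index_18502" {
        return errors.New("simulated compaction failure")
    }
    fmt.Println("compacted", table)
    return nil
}

func compactAll(ctx context.Context, tables []string, parallelism int) error {
    tablesChan := make(chan string)
    errChan := make(chan error)

    // fan out: start a bounded number of workers.
    for i := 0; i < parallelism; i++ {
        go func() {
            var err error
            defer func() { errChan <- err }()
            for {
                select {
                case table, ok := <-tablesChan:
                    if !ok {
                        return
                    }
                    if err = compactOne(ctx, table); err != nil {
                        return
                    }
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    // producer: feed table names and close the channel so idle workers exit.
    go func() {
        defer close(tablesChan)
        for _, table := range tables {
            select {
            case tablesChan <- table:
            case <-ctx.Done():
                return
            }
        }
    }()

    // fan in: one result per worker, remembering the first error.
    var firstErr error
    for i := 0; i < parallelism; i++ {
        if err := <-errChan; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

func main() {
    tables := []string{"index_18500", "index_18501", "index_18502", "index_18503"}
    fmt.Println("first error:", compactAll(context.Background(), tables, 2))
}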

@@ -50,6 +50,7 @@ func TestIsDefaults(t *testing.T) {
RetentionDeleteDelay: 2 * time.Hour,
RetentionDeleteWorkCount: 150,
DeleteRequestCancelPeriod: 24 * time.Hour,
MaxCompactionParallelism: 1,
}, true},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {

@@ -6,12 +6,14 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
util_math "github.com/cortexproject/cortex/pkg/util/math"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"
@@ -42,6 +44,7 @@ type table struct {
tableMarker retention.TableMarker
compactedDB *bbolt.DB
logger log.Logger
ctx context.Context
quit chan struct{}
@@ -62,6 +65,7 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O
applyRetention: applyRetention,
tableMarker: tableMarker,
}
table.logger = log.With(util_log.Logger, "table-name", table.name)
return &table, nil
}
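// Illustrative sketch, not part of the diff: what the per-table logger added in
// newTable buys. log.With attaches the "table-name" field once, so every later
// log line from that table carries it without repeating the key at each call
// site. The logger construction below is plain go-kit, not Loki's util_log.
package main

import (
    "os"

    "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"
)

func main() {
    base := log.NewLogfmtLogger(os.Stdout)
    logger := log.With(base, "table-name", "index_18500")

    level.Info(logger).Log("msg", "listed files", "count", 7)
    // prints roughly: level=info table-name=index_18500 msg="listed files" count=7
}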
@@ -72,12 +76,12 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
return err
}
level.Info(util_log.Logger).Log("msg", "listed files", "count", len(objects))
level.Info(t.logger).Log("msg", "listed files", "count", len(objects))
defer func() {
err := t.cleanup()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to cleanup table", "name", t.name)
level.Error(t.logger).Log("msg", "failed to cleanup table")
}
}()
@@ -85,7 +89,7 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
if !applyRetention {
if len(objects) < compactMinDBs {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects)))
level.Info(t.logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects)))
return nil
}
if err := t.compactFiles(objects); err != nil {
@@ -120,16 +124,14 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
if err != nil {
return err
}
t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(downloadAt)
t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt)
if err != nil {
return err
}
// no need to enforce write to disk, we'll upload and delete the file anyway.
t.compactedDB.NoSync = true
}
if t.compactedDB == nil {
level.Info(util_log.Logger).Log("msg", "skipping compaction no files found.")
level.Info(t.logger).Log("msg", "skipping compaction no files found.")
return nil
}
@@ -157,15 +159,25 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
func (t *table) compactFiles(objects []chunk.StorageObject) error {
    var err error
    level.Info(t.logger).Log("msg", "starting compaction of dbs")

    compactedDBName := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))
    seedFileIdx, err := findSeedObjectIdx(objects)
    if err != nil {
        return err
    }

    level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", objects[seedFileIdx].Key))
    err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[seedFileIdx].Key, compactedDBName, false)
    if err != nil {
        return err
    }

    t.compactedDB, err = openBoltdbFileWithNoSync(compactedDBName)
    if err != nil {
        return err
    }

    errChan := make(chan error)
    readObjectChan := make(chan string)
@@ -206,7 +218,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {
err = t.readFile(downloadAt)
if err != nil {
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err)
level.Error(t.logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err)
return
}
case <-t.quit:
@@ -220,7 +232,11 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {
// send all files to readObjectChan
go func() {
for _, object := range objects {
for i, object := range objects {
// skip seed file
if i == seedFileIdx {
continue
}
select {
case readObjectChan <- object.Key:
case <-t.quit:
@@ -230,7 +246,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {
}
}
level.Debug(util_log.Logger).Log("msg", "closing readObjectChan")
level.Debug(t.logger).Log("msg", "closing readObjectChan")
close(readObjectChan)
}()
@@ -257,7 +273,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {
default:
}
level.Info(util_log.Logger).Log("msg", "finished compacting the dbs")
level.Info(t.logger).Log("msg", "finished compacting the dbs")
return nil
}
@@ -293,20 +309,20 @@ func (t *table) writeBatch(batch []indexEntry) error {
// readFile reads a boltdb file from a path and writes the index in batched mode to compactedDB
func (t *table) readFile(path string) error {
level.Debug(util_log.Logger).Log("msg", "reading file for compaction", "path", path)
level.Debug(t.logger).Log("msg", "reading file for compaction", "path", path)
db, err := shipper_util.SafeOpenBoltdbFile(path)
db, err := openBoltdbFileWithNoSync(path)
if err != nil {
return err
}
db.NoSync = true
defer func() {
if err := db.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close db", "path", path, "err", err)
level.Error(t.logger).Log("msg", "failed to close db", "path", path, "err", err)
}
if err = os.Remove(path); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to remove file", "path", path, "err", err)
level.Error(t.logger).Log("msg", "failed to remove file", "path", path, "err", err)
}
}()
@@ -379,23 +395,23 @@ func (t *table) upload() error {
defer func() {
if err := compressedDB.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close file", "path", compactedDBPath, "err", err)
level.Error(t.logger).Log("msg", "failed to close file", "path", compactedDBPath, "err", err)
}
if err := os.Remove(compressedDBPath); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to remove file", "path", compressedDBPath, "err", err)
level.Error(t.logger).Log("msg", "failed to remove file", "path", compressedDBPath, "err", err)
}
}()
objectKey := fmt.Sprintf("%s.gz", shipper_util.BuildObjectKey(t.name, uploaderName, fmt.Sprint(time.Now().Unix())))
level.Info(util_log.Logger).Log("msg", "uploading the compacted file", "objectKey", objectKey)
level.Info(t.logger).Log("msg", "uploading the compacted file", "objectKey", objectKey)
return t.storageClient.PutObject(t.ctx, objectKey, compressedDB)
}
// removeObjectsFromStorage deletes objects from storage.
func (t *table) removeObjectsFromStorage(objects []chunk.StorageObject) error {
level.Info(util_log.Logger).Log("msg", "removing source db files from storage", "count", len(objects))
level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(objects))
for _, object := range objects {
err := t.storageClient.DeleteObject(t.ctx, object.Key)
@@ -406,3 +422,36 @@ func (t *table) removeObjectsFromStorage(objects []chunk.StorageObject) error {
return nil
}
// openBoltdbFileWithNoSync opens a boltdb file and configures it to not sync the file to disk.
// Compaction process is idempotent and we do not retain the files so there is no need to sync them to disk.
func openBoltdbFileWithNoSync(path string) (*bbolt.DB, error) {
boltdb, err := shipper_util.SafeOpenBoltdbFile(path)
if err != nil {
return nil, err
}
// no need to enforce write to disk, we'll upload and delete the file anyway.
boltdb.NoSync = true
return boltdb, nil
}
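// Illustrative sketch, not part of the diff: the same NoSync idea with the
// bbolt API used directly (SafeOpenBoltdbFile above is Loki's wrapper around
// it). For a scratch file that is rebuilt from the source objects on any
// retry, skipping fsync per transaction is safe and speeds up the many small
// writes that compaction performs. The path below is an arbitrary example.
package main

import (
    "log"

    "go.etcd.io/bbolt"
)

func main() {
    db, err := bbolt.Open("/tmp/compaction-scratch.db", 0o640, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // The file is disposable: uploaded (compressed) and deleted afterwards,
    // so durability of individual transactions does not matter.
    db.NoSync = true

    err = db.Update(func(tx *bbolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists([]byte("index"))
        if err != nil {
            return err
        }
        return b.Put([]byte("k1"), []byte("v1"))
    })
    if err != nil {
        log.Fatal(err)
    }
}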
// findSeedObjectIdx returns index of object to use as seed which would then get index from all the files written to.
// It tries to find previously compacted file(which has uploaderName) which would be the biggest file.
// In a large cluster, using previously compacted file as seed would significantly reduce compaction time.
// If it can't find a previously compacted file, it would just use the first file from the list of files.
func findSeedObjectIdx(objects []chunk.StorageObject) (int, error) {
for i, object := range objects {
dbName, err := shipper_util.GetDBNameFromObjectKey(object.Key)
if err != nil {
return 0, err
}
if strings.HasPrefix(dbName, uploaderName) {
return i, nil
}
}
return 0, nil
}
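// Illustrative sketch, not part of the diff: the seed-selection rule from
// findSeedObjectIdx in isolation. GetDBNameFromObjectKey and uploaderName are
// Loki helpers/constants that are not shown in this diff; the "compactor-"
// prefix and the "<table>/<db>.gz" key layout below are assumptions made for
// the example.
package main

import (
    "fmt"
    "path"
    "strings"
)

func findSeedIdx(keys []string, uploaderPrefix string) int {
    for i, key := range keys {
        dbName := path.Base(key) // e.g. "compactor-1623990000.gz"
        if strings.HasPrefix(dbName, uploaderPrefix) {
            // a previously compacted file already holds most of the index,
            // so copying into it avoids rewriting the bulk of the entries.
            return i
        }
    }
    return 0 // no previously compacted file: fall back to the first object
}

func main() {
    keys := []string{
        "index_18500/ingester-0-1623990000.gz",
        "index_18500/ingester-1-1623990300.gz",
        "index_18500/compactor-1623980000.gz",
    }
    fmt.Println(findSeedIdx(keys, "compactor")) // 2
}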

@@ -24,51 +24,75 @@ const (
)

func TestTable_Compaction(t *testing.T) {
    for _, tc := range []struct {
        name              string
        withCompactedFile bool
    }{
        {
            name:              "without compacted file",
            withCompactedFile: false,
        },
        {
            name:              "with compacted file",
            withCompactedFile: true,
        },
    } {
        t.Run(tc.name, func(t *testing.T) {
            tempDir, err := ioutil.TempDir("", fmt.Sprintf("table-compaction-%v", tc.withCompactedFile))
            require.NoError(t, err)

            defer func() {
                require.NoError(t, os.RemoveAll(tempDir))
            }()

            objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
            tablePathInStorage := filepath.Join(objectStoragePath, tableName)
            tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)

            // setup some dbs
            numDBs := compactMinDBs * 2
            numRecordsPerDB := 100

            dbsToSetup := make(map[string]testutil.DBRecords)
            for i := 0; i < numDBs; i++ {
                dbsToSetup[fmt.Sprint(i)] = testutil.DBRecords{
                    Start:      i * numRecordsPerDB,
                    NumRecords: (i + 1) * numRecordsPerDB,
                }
            }

            if tc.withCompactedFile {
                // add a compacted file with some overlap with previously created dbs
                dbsToSetup[fmt.Sprintf("%s-0", uploaderName)] = testutil.DBRecords{
                    Start:      (numDBs / 2) * numRecordsPerDB,
                    NumRecords: (numDBs + 10) * numRecordsPerDB,
                }
            }

            testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbsToSetup, true)

            // setup exact same copy of dbs for comparison.
            testutil.SetupDBTablesAtPath(t, "test-copy", objectStoragePath, dbsToSetup, false)

            // do the compaction
            objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
            require.NoError(t, err)

            table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil)
            require.NoError(t, err)

            require.NoError(t, table.compact(false))

            // verify that we have only 1 file left in storage after compaction.
            files, err := ioutil.ReadDir(tablePathInStorage)
            require.NoError(t, err)
            require.Len(t, files, 1)
            require.True(t, strings.HasSuffix(files[0].Name(), ".gz"))

            // verify we have all the kvs in compacted db which were there in source dbs.
            compareCompactedDB(t, filepath.Join(tablePathInStorage, files[0].Name()), filepath.Join(objectStoragePath, "test-copy"))
        })
    }
}
type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error)
