// loki/pkg/compactor/table_test.go
package compactor

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "strings"
    "testing"

    "github.com/go-kit/log"
    "github.com/prometheus/common/model"
    "github.com/stretchr/testify/require"

    "github.com/grafana/loki/v3/pkg/compactor/retention"
    "github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
    "github.com/grafana/loki/v3/pkg/storage/config"
    "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
)

const (
    objectsStorageDirName = "objects"
    workingDirName        = "working-dir"
    tableName             = "test"
)

// indexSetState captures the compaction outcome expected for an index set:
// whether a compacted DB gets uploaded and whether the source objects get
// removed from storage.
type indexSetState struct {
    uploadCompactedDB   bool
    removeSourceObjects bool
}
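
// TestTable_Compaction seeds tables with varying mixes of compacted and
// uncompacted common and per-user index files, runs compaction, and verifies
// which index sets upload a compacted DB and remove their source objects,
// and that a second compaction pass is a no-op.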
func TestTable_Compaction(t *testing.T) {
    // user counts are aligned with default upload parallelism
    for _, numUsers := range []int{5, 10, 20} {
        t.Run(fmt.Sprintf("numUsers=%d", numUsers), func(t *testing.T) {
            // each case seeds the table with a mix of compacted and uncompacted
            // files and records the expected post-compaction state of the
            // common and per-user index sets.
            for _, tc := range []struct {
                numUnCompactedCommonDBs  int
                numUnCompactedPerUserDBs int
                numCompactedDBs          int
                commonIndexSetState      indexSetState
                userIndexSetState        indexSetState
            }{
                {},
                {
                    numCompactedDBs: 1,
                },
                {
                    numCompactedDBs: 2,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedCommonDBs: 1,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedCommonDBs: 10,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedCommonDBs: 10,
                    numCompactedDBs:         1,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedCommonDBs: 10,
                    numCompactedDBs:         2,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedPerUserDBs: 1,
                    commonIndexSetState: indexSetState{
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedPerUserDBs: 1,
                    numCompactedDBs:          1,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedPerUserDBs: 1,
                    numCompactedDBs:          2,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedPerUserDBs: 10,
                    commonIndexSetState: indexSetState{
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedCommonDBs:  10,
                    numUnCompactedPerUserDBs: 10,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedCommonDBs:  10,
                    numUnCompactedPerUserDBs: 10,
                    numCompactedDBs:          1,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
                {
                    numUnCompactedCommonDBs:  10,
                    numUnCompactedPerUserDBs: 10,
                    numCompactedDBs:          2,
                    commonIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                    userIndexSetState: indexSetState{
                        uploadCompactedDB:   true,
                        removeSourceObjects: true,
                    },
                },
            } {
                commonDBsConfig := IndexesConfig{
                    NumCompactedFiles:   tc.numCompactedDBs,
                    NumUnCompactedFiles: tc.numUnCompactedCommonDBs,
                }
                perUserDBsConfig := PerUserIndexesConfig{
                    IndexesConfig: IndexesConfig{
                        NumCompactedFiles:   tc.numCompactedDBs,
                        NumUnCompactedFiles: tc.numUnCompactedPerUserDBs,
                    },
                    NumUsers: numUsers,
                }

                t.Run(fmt.Sprintf("%s ; %s", commonDBsConfig.String(), perUserDBsConfig.String()), func(t *testing.T) {
                    tempDir := t.TempDir()
                    objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
                    tablePathInStorage := filepath.Join(objectStoragePath, tableName)
                    tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)

                    SetupTable(t, filepath.Join(objectStoragePath, tableName), commonDBsConfig, perUserDBsConfig)

                    // do the compaction
                    objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
                    require.NoError(t, err)

                    table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
                        newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
                    require.NoError(t, err)

                    require.NoError(t, table.compact())
                    require.NoError(t, table.done())
                    table.cleanup()

                    numUserIndexSets, numCommonIndexSets := 0, 0
                    for _, is := range table.indexSets {
                        if is.baseIndexSet.IsUserBasedIndexSet() {
                            require.Equal(t, tc.userIndexSetState.uploadCompactedDB, is.uploadCompactedDB)
                            require.Equal(t, tc.userIndexSetState.removeSourceObjects, is.removeSourceObjects)
                            numUserIndexSets++
                        } else {
                            require.Equal(t, tc.commonIndexSetState.uploadCompactedDB, is.uploadCompactedDB)
                            require.Equal(t, tc.commonIndexSetState.removeSourceObjects, is.removeSourceObjects)
                            numCommonIndexSets++
                        }
                    }

                    // verify the state in the storage after compaction.
                    expectedNumCommonDBs := 0
                    if (commonDBsConfig.NumUnCompactedFiles + commonDBsConfig.NumCompactedFiles) > 0 {
                        require.Equal(t, 1, numCommonIndexSets)
                        expectedNumCommonDBs = 1
                    }
                    numExpectedUsers := 0
                    if (perUserDBsConfig.NumUnCompactedFiles + perUserDBsConfig.NumCompactedFiles) > 0 {
                        require.Equal(t, numUsers, numUserIndexSets)
                        numExpectedUsers = numUsers
                    }
                    validateTable(t, tablePathInStorage, expectedNumCommonDBs, numExpectedUsers, func(filename string) {
                        require.True(t, strings.HasSuffix(filename, ".gz"), filename)
                    })
                    verifyCompactedIndexTable(t, commonDBsConfig, perUserDBsConfig, tablePathInStorage)

                    // running compaction again should not do anything.
                    table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
                        newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
                    require.NoError(t, err)

                    require.NoError(t, table.compact())
                    require.NoError(t, table.done())
                    table.cleanup()
                    for _, is := range table.indexSets {
                        require.False(t, is.uploadCompactedDB)
                        require.False(t, is.removeSourceObjects)
                    }
                })
            }
        })
    }
}
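
// TableMarkerFunc adapts a plain function to the retention.TableMarker
// interface, letting each test case plug in its own marking behavior.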
type TableMarkerFunc func(ctx context.Context, tableName, userID string, indexFile retention.IndexProcessor, logger log.Logger) (bool, bool, error)

func (t TableMarkerFunc) FindAndMarkChunksForDeletion(ctx context.Context, tableName, userID string, indexFile retention.IndexProcessor, logger log.Logger) (bool, bool, error) {
    return t(ctx, tableName, userID, indexFile, logger)
}

func (t TableMarkerFunc) MarkChunksForDeletion(_ string, _ []string) error {
    return nil
}

// IntervalMayHaveExpiredChunksFunc adapts a plain function to the
// expiration check consulted when applying retention.
type IntervalMayHaveExpiredChunksFunc func(interval model.Interval, userID string) bool

func (f IntervalMayHaveExpiredChunksFunc) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
    return f(interval, userID)
}
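
// TestTable_CompactionRetention compacts seeded tables and then applies
// retention using stub table markers. The case names reflect how the test
// reads the two booleans a marker returns: (true, true) empties the table,
// (false, true) re-uploads the marked index, and (false, false) leaves
// storage untouched.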
func TestTable_CompactionRetention(t *testing.T) {
    numUsers := 10
    type dbsSetup struct {
        numUnCompactedCommonDBs  int
        numUnCompactedPerUserDBs int
        numCompactedDBs          int
    }
    for _, setup := range []dbsSetup{
        {
            numUnCompactedCommonDBs:  10,
            numUnCompactedPerUserDBs: 10,
        },
        {
            numCompactedDBs: 1,
        },
        {
            numCompactedDBs: 10,
        },
        {
            numUnCompactedCommonDBs:  10,
            numUnCompactedPerUserDBs: 10,
            numCompactedDBs:          1,
        },
        {
            numUnCompactedCommonDBs: 1,
        },
        {
            numUnCompactedPerUserDBs: 1,
        },
    } {
        for name, tt := range map[string]struct {
            dbsSetup    dbsSetup
            dbCount     int
            assert      func(t *testing.T, storagePath, tableName string)
            tableMarker retention.TableMarker
        }{
            "emptied table": {
                dbsSetup: setup,
                assert: func(t *testing.T, storagePath, tableName string) {
                    _, err := os.ReadDir(filepath.Join(storagePath, tableName))
                    require.True(t, os.IsNotExist(err))
                },
                tableMarker: TableMarkerFunc(func(_ context.Context, _, _ string, _ retention.IndexProcessor, _ log.Logger) (bool, bool, error) {
                    return true, true, nil
                }),
            },
            "marked table": {
                dbsSetup: setup,
                assert: func(t *testing.T, storagePath, tableName string) {
                    expectedNumCommonDBs := 0
                    if setup.numUnCompactedCommonDBs+setup.numCompactedDBs > 0 {
                        expectedNumCommonDBs = 1
                    }
                    expectedNumUsers := 0
                    if setup.numUnCompactedPerUserDBs+setup.numCompactedDBs > 0 {
                        expectedNumUsers = numUsers
                    }
                    validateTable(t, filepath.Join(storagePath, tableName), expectedNumCommonDBs, expectedNumUsers, func(filename string) {
                        require.True(t, strings.HasSuffix(filename, ".gz"))
                    })
                },
                tableMarker: TableMarkerFunc(func(_ context.Context, _, _ string, _ retention.IndexProcessor, _ log.Logger) (bool, bool, error) {
                    return false, true, nil
                }),
            },
            "not modified": {
                dbsSetup: setup,
                assert: func(t *testing.T, storagePath, tableName string) {
                    expectedNumCommonDBs := 0
                    if setup.numUnCompactedCommonDBs+setup.numCompactedDBs > 0 {
                        expectedNumCommonDBs = 1
                    }
                    expectedNumUsers := 0
                    if setup.numUnCompactedPerUserDBs+setup.numCompactedDBs > 0 {
                        expectedNumUsers = numUsers
                    }
                    validateTable(t, filepath.Join(storagePath, tableName), expectedNumCommonDBs, expectedNumUsers, func(filename string) {
                        require.True(t, strings.HasSuffix(filename, ".gz"))
                    })
                },
                tableMarker: TableMarkerFunc(func(_ context.Context, _, _ string, _ retention.IndexProcessor, _ log.Logger) (bool, bool, error) {
                    return false, false, nil
                }),
            },
        } {
            commonDBsConfig := IndexesConfig{
                NumCompactedFiles:   tt.dbsSetup.numCompactedDBs,
                NumUnCompactedFiles: tt.dbsSetup.numUnCompactedCommonDBs,
            }
            perUserDBsConfig := PerUserIndexesConfig{
                IndexesConfig: IndexesConfig{
                    NumUnCompactedFiles: tt.dbsSetup.numUnCompactedPerUserDBs,
                    NumCompactedFiles:   tt.dbsSetup.numCompactedDBs,
                },
                NumUsers: numUsers,
            }

            t.Run(fmt.Sprintf("%s - %s ; %s", name, commonDBsConfig.String(), perUserDBsConfig.String()), func(t *testing.T) {
                tempDir := t.TempDir()
                tableName := fmt.Sprintf("%s12345", tableName)
                objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
                tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)

                SetupTable(t, filepath.Join(objectStoragePath, tableName), commonDBsConfig, perUserDBsConfig)

                // do the compaction and apply retention
                objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
                require.NoError(t, err)

                table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
                    newTestIndexCompactor(), config.PeriodConfig{},
                    tt.tableMarker, IntervalMayHaveExpiredChunksFunc(func(_ model.Interval, _ string) bool {
                        return true
                    }), 10)
                require.NoError(t, err)
                defer table.cleanup()

                require.NoError(t, table.compact())
                require.NoError(t, table.applyRetention())
                require.NoError(t, table.done())

                tt.assert(t, objectStoragePath, tableName)
            })
        }
    }
}
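
// validateTable asserts the layout of a table in object storage: the expected
// number of common index files at the root and one folder per user, each
// holding exactly one index file, with filesCallback invoked for every file
// name seen.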
func validateTable(t *testing.T, path string, expectedNumCommonDBs, numUsers int, filesCallback func(filename string)) {
    files, folders := listDir(t, path)
    require.Len(t, files, expectedNumCommonDBs)
    require.Len(t, folders, numUsers)
    for _, fileName := range files {
        filesCallback(fileName)
    }

    for _, folder := range folders {
        files, folders := listDir(t, filepath.Join(path, folder))
        require.Len(t, files, 1)
        require.Len(t, folders, 0)
        for _, fileName := range files {
            filesCallback(fileName)
        }
    }
}
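
// listDir splits the entries directly under path into file and folder names.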
func listDir(t *testing.T, path string) (files, folders []string) {
    dirEntries, err := os.ReadDir(path)
    require.NoError(t, err)

    for _, entry := range dirEntries {
        if entry.IsDir() {
            folders = append(folders, entry.Name())
        } else {
            files = append(files, entry.Name())
        }
    }
    return
}
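
// TestTable_CompactionFailure checks that a corrupt file makes compaction fail
// while leaving the source objects in storage intact, that the local working
// directory is cleaned up after the failure, and that compaction succeeds once
// the corrupt file is removed.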
func TestTable_CompactionFailure(t *testing.T) {
    tempDir := t.TempDir()

    tableName := "test"
    objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
    tablePathInStorage := filepath.Join(objectStoragePath, tableName)
    tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)

    // setup some dbs
    numDBs := 10
    dbsToSetup := make(map[string]IndexFileConfig)
    for i := 0; i < numDBs; i++ {
        dbsToSetup[fmt.Sprint(i)] = IndexFileConfig{
            CompressFile: i%2 == 0,
        }
    }

    SetupTable(t, filepath.Join(objectStoragePath, tableName), IndexesConfig{NumCompactedFiles: numDBs}, PerUserIndexesConfig{})

    // put a corrupt gzip file in the table, which should make the compaction
    // fail partway through because the boltdb client will fail to open it.
    require.NoError(t, os.WriteFile(filepath.Join(tablePathInStorage, "fail.gz"), []byte("fail the compaction"), 0o666))

    // do the compaction
    objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
    require.NoError(t, err)

    table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
        newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
    require.NoError(t, err)

    // compaction should fail due to the non-boltdb file.
    require.Error(t, table.compact())
    table.cleanup()

    // ensure that the files in storage are intact.
    files, err := os.ReadDir(tablePathInStorage)
    require.NoError(t, err)
    require.Len(t, files, numDBs+1)

    // ensure that the local working directory was cleaned up after the failed compaction.
    require.NoFileExists(t, tableWorkingDirectory)

    // remove the corrupt file and ensure that compaction now succeeds.
    require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, "fail.gz")))
    table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
        newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
    require.NoError(t, err)

    require.NoError(t, table.compact())
    require.NoError(t, table.done())
    table.cleanup()

    // ensure that cleanup removes the local working directory.
    require.NoFileExists(t, tableWorkingDirectory)
}