// loki/pkg/compactor/table_test.go

package compactor
import (
"context"
"fmt"
"os"
"path/filepath"
"slices"
"sort"
"strings"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
const (
objectsStorageDirName = "objects"
workingDirName = "working-dir"
tableName = "test"
)
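
// indexSetState holds the post-compaction flags we expect on an index set:
// whether a compacted DB should have been uploaded and whether the source
// objects should have been removed.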
type indexSetState struct {
uploadCompactedDB bool
removeSourceObjects bool
}
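
// TestTable_Compaction builds tables from various mixes of compacted and
// uncompacted common and per-user DBs, runs compaction, and verifies both the
// resulting index-set state and the layout left behind in object storage. It
// also checks that running compaction a second time is a no-op.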
func TestTable_Compaction(t *testing.T) {
// user counts are aligned with default upload parallelism
for _, numUsers := range []int{5, 10, 20} {
t.Run(fmt.Sprintf("numUsers=%d", numUsers), func(t *testing.T) {
for _, tc := range []struct {
numUnCompactedCommonDBs int
numUnCompactedPerUserDBs int
numCompactedDBs int
commonIndexSetState indexSetState
userIndexSetState indexSetState
}{
{},
{
numCompactedDBs: 1,
},
{
numCompactedDBs: 2,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 1,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numCompactedDBs: 1,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numCompactedDBs: 2,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
commonIndexSetState: indexSetState{
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
numCompactedDBs: 1,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 1,
numCompactedDBs: 2,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedPerUserDBs: 10,
commonIndexSetState: indexSetState{
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
numCompactedDBs: 1,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
numCompactedDBs: 2,
commonIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
userIndexSetState: indexSetState{
uploadCompactedDB: true,
removeSourceObjects: true,
},
},
} {
commonDBsConfig := IndexesConfig{
NumCompactedFiles: tc.numCompactedDBs,
NumUnCompactedFiles: tc.numUnCompactedCommonDBs,
}
perUserDBsConfig := PerUserIndexesConfig{
IndexesConfig: IndexesConfig{
NumCompactedFiles: tc.numCompactedDBs,
NumUnCompactedFiles: tc.numUnCompactedPerUserDBs,
},
NumUsers: numUsers,
}
t.Run(fmt.Sprintf("%s ; %s", commonDBsConfig.String(), perUserDBsConfig.String()), func(t *testing.T) {
tempDir := t.TempDir()
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
tablePathInStorage := filepath.Join(objectStoragePath, tableName)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
SetupTable(t, filepath.Join(objectStoragePath, tableName), commonDBsConfig, perUserDBsConfig)
// do the compaction
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
require.NoError(t, err)
table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
require.NoError(t, err)
require.NoError(t, table.compact())
require.NoError(t, table.done())
table.cleanup()
numUserIndexSets, numCommonIndexSets := 0, 0
for _, is := range table.indexSets {
if is.baseIndexSet.IsUserBasedIndexSet() {
require.Equal(t, tc.userIndexSetState.uploadCompactedDB, is.uploadCompactedDB)
require.Equal(t, tc.userIndexSetState.removeSourceObjects, is.removeSourceObjects)
numUserIndexSets++
} else {
require.Equal(t, tc.commonIndexSetState.uploadCompactedDB, is.uploadCompactedDB)
require.Equal(t, tc.commonIndexSetState.removeSourceObjects, is.removeSourceObjects)
numCommonIndexSets++
}
}
// verify the state in the storage after compaction.
expectedNumCommonDBs := 0
if (commonDBsConfig.NumUnCompactedFiles + commonDBsConfig.NumCompactedFiles) > 0 {
require.Equal(t, 1, numCommonIndexSets)
expectedNumCommonDBs = 1
}
numExpectedUsers := 0
if (perUserDBsConfig.NumUnCompactedFiles + perUserDBsConfig.NumCompactedFiles) > 0 {
require.Equal(t, numUsers, numUserIndexSets)
numExpectedUsers = numUsers
}
validateTable(t, tablePathInStorage, expectedNumCommonDBs, numExpectedUsers, func(filename string) {
require.True(t, strings.HasSuffix(filename, ".gz"), filename)
})
verifyCompactedIndexTable(t, commonDBsConfig, perUserDBsConfig, tablePathInStorage)
// running compaction again should not do anything.
table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
require.NoError(t, err)
require.NoError(t, table.compact())
require.NoError(t, table.done())
table.cleanup()
for _, is := range table.indexSets {
require.False(t, is.uploadCompactedDB)
require.False(t, is.removeSourceObjects)
}
})
}
})
}
}
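
// TableMarkerFunc adapts a plain function into a retention.TableMarker for
// tests; its MarkChunksForDeletion is a no-op.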
type TableMarkerFunc func(ctx context.Context, tableName, userID string, indexFile retention.IndexProcessor, logger log.Logger) (bool, bool, error)
func (t TableMarkerFunc) FindAndMarkChunksForDeletion(ctx context.Context, tableName, userID string, indexFile retention.IndexProcessor, logger log.Logger) (bool, bool, error) {
return t(ctx, tableName, userID, indexFile, logger)
}
func (t TableMarkerFunc) MarkChunksForDeletion(_ string, _ []string) error {
return nil
}
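
// IntervalMayHaveExpiredChunksFunc adapts a plain function so tests can stub
// the check for whether an interval may have expired chunks.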
type IntervalMayHaveExpiredChunksFunc func(interval model.Interval, userID string) bool
func (f IntervalMayHaveExpiredChunksFunc) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
return f(interval, userID)
}
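
// TestTable_CompactionRetention compacts tables built from different DB setups,
// applies retention with a stubbed table marker, and asserts whether the table
// in storage ends up emptied, rewritten, or left untouched depending on what
// the marker reports.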
func TestTable_CompactionRetention(t *testing.T) {
numUsers := 10
type dbsSetup struct {
numUnCompactedCommonDBs int
numUnCompactedPerUserDBs int
numCompactedDBs int
}
for _, setup := range []dbsSetup{
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
},
{
numCompactedDBs: 1,
},
{
numCompactedDBs: 10,
},
{
numUnCompactedCommonDBs: 10,
numUnCompactedPerUserDBs: 10,
numCompactedDBs: 1,
},
{
numUnCompactedCommonDBs: 1,
},
{
numUnCompactedPerUserDBs: 1,
},
} {
for name, tt := range map[string]struct {
dbsSetup dbsSetup
dbCount int
assert func(t *testing.T, storagePath, tableName string)
tableMarker retention.TableMarker
}{
"emptied table": {
dbsSetup: setup,
assert: func(t *testing.T, storagePath, tableName string) {
_, err := os.ReadDir(filepath.Join(storagePath, tableName))
require.True(t, os.IsNotExist(err))
},
tableMarker: TableMarkerFunc(func(_ context.Context, _, _ string, _ retention.IndexProcessor, _ log.Logger) (bool, bool, error) {
return true, true, nil
}),
},
"marked table": {
dbsSetup: setup,
assert: func(t *testing.T, storagePath, tableName string) {
expectedNumCommonDBs := 0
if setup.numUnCompactedCommonDBs+setup.numCompactedDBs > 0 {
expectedNumCommonDBs = 1
}
expectedNumUsers := 0
if setup.numUnCompactedPerUserDBs+setup.numCompactedDBs > 0 {
expectedNumUsers = numUsers
}
validateTable(t, filepath.Join(storagePath, tableName), expectedNumCommonDBs, expectedNumUsers, func(filename string) {
require.True(t, strings.HasSuffix(filename, ".gz"))
})
},
tableMarker: TableMarkerFunc(func(_ context.Context, _, _ string, _ retention.IndexProcessor, _ log.Logger) (bool, bool, error) {
return false, true, nil
}),
},
"not modified": {
dbsSetup: setup,
assert: func(t *testing.T, storagePath, tableName string) {
expectedNumCommonDBs := 0
if setup.numUnCompactedCommonDBs+setup.numCompactedDBs > 0 {
expectedNumCommonDBs = 1
}
expectedNumUsers := 0
if setup.numUnCompactedPerUserDBs+setup.numCompactedDBs > 0 {
expectedNumUsers = numUsers
}
validateTable(t, filepath.Join(storagePath, tableName), expectedNumCommonDBs, expectedNumUsers, func(filename string) {
require.True(t, strings.HasSuffix(filename, ".gz"))
})
},
tableMarker: TableMarkerFunc(func(_ context.Context, _, _ string, _ retention.IndexProcessor, _ log.Logger) (bool, bool, error) {
return false, false, nil
}),
},
} {
commonDBsConfig := IndexesConfig{
NumCompactedFiles: tt.dbsSetup.numCompactedDBs,
NumUnCompactedFiles: tt.dbsSetup.numUnCompactedCommonDBs,
}
perUserDBsConfig := PerUserIndexesConfig{
IndexesConfig: IndexesConfig{
NumUnCompactedFiles: tt.dbsSetup.numUnCompactedPerUserDBs,
NumCompactedFiles: tt.dbsSetup.numCompactedDBs,
},
NumUsers: numUsers,
}
t.Run(fmt.Sprintf("%s - %s ; %s", name, commonDBsConfig.String(), perUserDBsConfig.String()), func(t *testing.T) {
tempDir := t.TempDir()
tableName := fmt.Sprintf("%s12345", tableName)
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
SetupTable(t, filepath.Join(objectStoragePath, tableName), commonDBsConfig, perUserDBsConfig)
// do the compaction
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
require.NoError(t, err)
table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{},
tt.tableMarker, IntervalMayHaveExpiredChunksFunc(func(_ model.Interval, _ string) bool {
return true
}), 10)
require.NoError(t, err)
defer table.cleanup()
require.NoError(t, table.compact())
require.NoError(t, table.applyRetention())
require.NoError(t, table.done())
tt.assert(t, objectStoragePath, tableName)
})
}
}
}
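
// validateTable asserts that the table directory contains the expected number
// of common DB files and per-user folders (each holding exactly one file) and
// invokes filesCallback on every file it finds.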
func validateTable(t *testing.T, path string, expectedNumCommonDBs, numUsers int, filesCallback func(filename string)) {
files, folders := listDir(t, path)
require.Len(t, files, expectedNumCommonDBs)
require.Len(t, folders, numUsers)
for _, fileName := range files {
filesCallback(fileName)
}
for _, folder := range folders {
files, folders := listDir(t, filepath.Join(path, folder))
require.Len(t, files, 1)
require.Len(t, folders, 0)
for _, fileName := range files {
filesCallback(fileName)
}
}
}
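
// listDir returns the names of the files and folders directly under path.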
func listDir(t *testing.T, path string) (files, folders []string) {
dirEntries, err := os.ReadDir(path)
require.NoError(t, err)
for _, entry := range dirEntries {
if entry.IsDir() {
folders = append(folders, entry.Name())
} else {
files = append(files, entry.Name())
}
}
return
}
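
// TestTable_CompactionFailure verifies that a failed compaction leaves the
// files in object storage intact and cleans up the local working directory,
// and that compaction succeeds once the offending file is removed.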
func TestTable_CompactionFailure(t *testing.T) {
tempDir := t.TempDir()
tableName := "test"
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
tablePathInStorage := filepath.Join(objectStoragePath, tableName)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName)
// set up some DBs
numDBs := 10
dbsToSetup := make(map[string]IndexFileConfig)
for i := 0; i < numDBs; i++ {
dbsToSetup[fmt.Sprint(i)] = IndexFileConfig{
CompressFile: i%2 == 0,
}
}
SetupTable(t, filepath.Join(objectStoragePath, tableName), IndexesConfig{NumCompactedFiles: numDBs}, PerUserIndexesConfig{})
// put an invalid gzip file in the table, which should cause the compaction to fail partway through because the boltdb client would fail to open it.
require.NoError(t, os.WriteFile(filepath.Join(tablePathInStorage, "fail.gz"), []byte("fail the compaction"), 0o666))
// do the compaction
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
require.NoError(t, err)
table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
require.NoError(t, err)
// compaction should fail due to a non-boltdb file.
require.Error(t, table.compact())
table.cleanup()
// ensure that files in storage are intact.
files, err := os.ReadDir(tablePathInStorage)
require.NoError(t, err)
require.Len(t, files, numDBs+1)
// ensure that we have cleaned up the local working directory after the failed compaction.
require.NoFileExists(t, tableWorkingDirectory)
// remove the invalid gzip file and ensure that compaction now succeeds.
require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, "fail.gz")))
table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""),
newTestIndexCompactor(), config.PeriodConfig{}, nil, nil, 10)
require.NoError(t, err)
require.NoError(t, table.compact())
require.NoError(t, table.done())
table.cleanup()
// ensure that cleanup removes the local working directory.
require.NoFileExists(t, tableWorkingDirectory)
}
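
// chunkDeletionMarkerRecorder records the chunk IDs passed to
// MarkChunksForDeletion so tests can assert which chunks were marked for
// deletion; the embedded retention.TableMarker covers the rest of the interface.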
type chunkDeletionMarkerRecorder struct {
retention.TableMarker
chunkIDs []string
}
func (r *chunkDeletionMarkerRecorder) MarkChunksForDeletion(_ string, chunks []string) error {
r.chunkIDs = append(r.chunkIDs, chunks...)
return nil
}
func (r *chunkDeletionMarkerRecorder) sortEntries() {
sort.Strings(r.chunkIDs)
}
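
// TestTable_applyStorageUpdates exercises table.applyStorageUpdates with
// combinations of de-indexed chunks, rebuilt chunks, and chunks missing from
// the index, and verifies which chunks get removed from the index, which get
// added to it, and which get marked for deletion.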
func TestTable_applyStorageUpdates(t *testing.T) {
user1 := "user1"
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: dayFromTime(0),
IndexType: "tsdb",
ObjectType: "filesystem",
Schema: "v13",
IndexTables: config.IndexPeriodicTableConfig{
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 24,
}},
RowShards: 16,
},
},
}
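// build 10 chunk IDs for chunks that should simply be removed from the index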
var chunksToDeIndex []string
for i := 0; i < 10; i++ {
chunkID := schemaCfg.ExternalKey(logproto.ChunkRef{
Fingerprint: uint64(i),
UserID: user1,
From: model.Time(i),
Through: model.Time(i + 1),
Checksum: uint32(i),
})
chunksToDeIndex = append(chunksToDeIndex, chunkID)
}
// build 10 source chunks, with only the last 5 having a new chunk built out of them
var sourceChunkIDs []string
var newChunkIDs []string
rebuiltChunks := make(map[string]deletion.Chunk)
for i := 10; i < 20; i++ {
chunkID := schemaCfg.ExternalKey(logproto.ChunkRef{
Fingerprint: uint64(i),
UserID: user1,
From: model.Time(i),
Through: model.Time(i + 1),
Checksum: uint32(i),
})
var newChunk deletion.Chunk
if i >= 15 {
newChunk = dummyChunk{
from: model.Time(i),
through: model.Time(i + 1),
fingerprint: uint64(i),
checksum: uint32(i + 1),
kb: uint32(i),
entries: uint32(i),
}
newChunkID := schemaCfg.ExternalKey(logproto.ChunkRef{
Fingerprint: newChunk.GetFingerprint(),
UserID: user1,
From: newChunk.GetFrom(),
Through: newChunk.GetThrough(),
Checksum: newChunk.GetChecksum(),
})
newChunkIDs = append(newChunkIDs, newChunkID)
}
rebuiltChunks[chunkID] = newChunk
sourceChunkIDs = append(sourceChunkIDs, chunkID)
}
lblFoo := labels.FromStrings("foo", "bar")
for _, tc := range []struct {
name string
indexUpdatesRecorder *indexUpdatesRecorder
expectedChunksToRemoveFromIndex func() map[string][]string
expectedChunksToAddToIndex func() map[string][]deletion.Chunk
expectedChunksMarkedForDeletion func() []string
noUserIndex bool
}{
{
name: "no source chunks missing",
indexUpdatesRecorder: newIndexUpdatesRecorder(schemaCfg, nil),
expectedChunksToRemoveFromIndex: func() map[string][]string {
entries := make([]string, 0, len(chunksToDeIndex)+len(rebuiltChunks))
entries = append(entries, chunksToDeIndex...)
for chunkID := range rebuiltChunks {
entries = append(entries, chunkID)
}
sort.Strings(entries)
return map[string][]string{lblFoo.String(): entries}
},
expectedChunksToAddToIndex: func() map[string][]deletion.Chunk {
chunks := make([]deletion.Chunk, 0, len(rebuiltChunks))
for _, newChunk := range rebuiltChunks {
if newChunk != nil {
chunks = append(chunks, newChunk)
}
}
slices.SortFunc(chunks, func(a, b deletion.Chunk) int {
if a.GetFrom() < b.GetFrom() {
return -1
} else if a.GetFrom() > b.GetFrom() {
return 1
}
return 0
})
return map[string][]deletion.Chunk{lblFoo.String(): chunks}
},
expectedChunksMarkedForDeletion: func() []string {
resp := make([]string, 0, len(rebuiltChunks))
for chunkID := range rebuiltChunks {
resp = append(resp, chunkID)
}
sort.Strings(resp)
return resp
},
},
{
name: "some source chunks missing with all the new chunks already indexed",
indexUpdatesRecorder: newIndexUpdatesRecorder(schemaCfg, sourceChunkIDs[3:7]),
expectedChunksToRemoveFromIndex: func() map[string][]string {
entries := append([]string{}, chunksToDeIndex...)
entries = append(entries, sourceChunkIDs[:3]...)
entries = append(entries, sourceChunkIDs[7:]...)
sort.Strings(entries)
return map[string][]string{lblFoo.String(): entries}
},
expectedChunksToAddToIndex: func() map[string][]deletion.Chunk {
chunks := make([]deletion.Chunk, 0, len(rebuiltChunks))
for _, sourceChunkID := range sourceChunkIDs[7:] {
newChunk := rebuiltChunks[sourceChunkID]
if newChunk == nil {
continue
}
chunks = append(chunks, newChunk)
}
slices.SortFunc(chunks, func(a, b deletion.Chunk) int {
if a.GetFrom() < b.GetFrom() {
return -1
} else if a.GetFrom() > b.GetFrom() {
return 1
}
return 0
})
return map[string][]deletion.Chunk{lblFoo.String(): chunks}
},
expectedChunksMarkedForDeletion: func() []string {
chunkIDs := make([]string, 0, len(rebuiltChunks))
// add all the source chunkIDs for deletion
for chunkID := range rebuiltChunks {
chunkIDs = append(chunkIDs, chunkID)
}
sort.Strings(chunkIDs)
return chunkIDs
},
},
{
name: "some source chunks missing with none of the new chunks indexed",
indexUpdatesRecorder: newIndexUpdatesRecorder(schemaCfg, append(append([]string{}, sourceChunkIDs[3:7]...), newChunkIDs...)),
expectedChunksToRemoveFromIndex: func() map[string][]string {
entries := append([]string{}, chunksToDeIndex...)
entries = append(entries, sourceChunkIDs[:3]...)
entries = append(entries, sourceChunkIDs[7:]...)
sort.Strings(entries)
return map[string][]string{lblFoo.String(): entries}
},
expectedChunksToAddToIndex: func() map[string][]deletion.Chunk {
chunks := make([]deletion.Chunk, 0, len(rebuiltChunks))
for _, sourceChunkID := range sourceChunkIDs[7:] {
newChunk := rebuiltChunks[sourceChunkID]
if newChunk == nil {
continue
}
chunks = append(chunks, newChunk)
}
slices.SortFunc(chunks, func(a, b deletion.Chunk) int {
if a.GetFrom() < b.GetFrom() {
return -1
} else if a.GetFrom() > b.GetFrom() {
return 1
}
return 0
})
return map[string][]deletion.Chunk{lblFoo.String(): chunks}
},
expectedChunksMarkedForDeletion: func() []string {
chunkIDs := make([]string, 0, len(rebuiltChunks))
// add all the source chunkIDs for deletion
for chunkID := range rebuiltChunks {
chunkIDs = append(chunkIDs, chunkID)
}
// also add the new chunks for deletion which have their source chunks missing
for _, sourceChunkID := range sourceChunkIDs[5:7] {
newChunk := rebuiltChunks[sourceChunkID]
chunkID := schemaCfg.ExternalKey(logproto.ChunkRef{
Fingerprint: newChunk.GetFingerprint(),
UserID: user1,
From: newChunk.GetFrom(),
Through: newChunk.GetThrough(),
Checksum: newChunk.GetChecksum(),
})
chunkIDs = append(chunkIDs, chunkID)
}
sort.Strings(chunkIDs)
return chunkIDs
},
},
{
name: "all the source chunks missing with all the new chunks already indexed",
indexUpdatesRecorder: newIndexUpdatesRecorder(schemaCfg, sourceChunkIDs),
expectedChunksToRemoveFromIndex: func() map[string][]string {
// only the chunksToDeIndex should be removed from index
return map[string][]string{lblFoo.String(): chunksToDeIndex}
},
expectedChunksToAddToIndex: func() map[string][]deletion.Chunk {
// no chunks to index since we have no source chunks in the index
return map[string][]deletion.Chunk{}
},
expectedChunksMarkedForDeletion: func() []string {
chunkIDs := make([]string, 0, len(rebuiltChunks))
// add all the source chunkIDs for deletion
for chunkID := range rebuiltChunks {
chunkIDs = append(chunkIDs, chunkID)
}
sort.Strings(chunkIDs)
return chunkIDs
},
},
{
name: "all the source chunks missing with none of the new chunks indexed",
indexUpdatesRecorder: newIndexUpdatesRecorder(schemaCfg, append(append([]string{}, sourceChunkIDs...), newChunkIDs...)),
expectedChunksToRemoveFromIndex: func() map[string][]string {
// only the chunksToDeIndex should be removed from index
return map[string][]string{lblFoo.String(): chunksToDeIndex}
},
expectedChunksToAddToIndex: func() map[string][]deletion.Chunk {
// no chunks to index since we have no source chunks in the index
return map[string][]deletion.Chunk{}
},
expectedChunksMarkedForDeletion: func() []string {
chunkIDs := make([]string, 0, len(rebuiltChunks))
// add all the source chunkIDs for deletion
for chunkID := range rebuiltChunks {
chunkIDs = append(chunkIDs, chunkID)
}
// all the newly built chunks should be marked for deletion since their source chunks are missing from index
for _, newChunk := range rebuiltChunks {
if newChunk == nil {
continue
}
chunkID := schemaCfg.ExternalKey(logproto.ChunkRef{
Fingerprint: newChunk.GetFingerprint(),
UserID: user1,
From: newChunk.GetFrom(),
Through: newChunk.GetThrough(),
Checksum: newChunk.GetChecksum(),
})
chunkIDs = append(chunkIDs, chunkID)
}
sort.Strings(chunkIDs)
return chunkIDs
},
},
{
name: "user without any index",
noUserIndex: true,
expectedChunksToRemoveFromIndex: func() map[string][]string {
return map[string][]string{}
},
expectedChunksToAddToIndex: func() map[string][]deletion.Chunk {
return map[string][]deletion.Chunk{}
},
expectedChunksMarkedForDeletion: func() []string {
chunkIDs := make([]string, 0, len(rebuiltChunks))
// all the newly built chunks should be marked for deletion since the whole user index is missing
for _, newChunk := range rebuiltChunks {
if newChunk == nil {
continue
}
chunkID := schemaCfg.ExternalKey(logproto.ChunkRef{
Fingerprint: newChunk.GetFingerprint(),
UserID: user1,
From: newChunk.GetFrom(),
Through: newChunk.GetThrough(),
Checksum: newChunk.GetChecksum(),
})
chunkIDs = append(chunkIDs, chunkID)
}
sort.Strings(chunkIDs)
return chunkIDs
},
},
} {
t.Run(tc.name, func(t *testing.T) {
chunkDeletionMarkerRecorder := &chunkDeletionMarkerRecorder{}
table := &table{
name: "t1",
periodConfig: schemaCfg.Configs[0],
indexSets: map[string]*indexSet{user1: {
userID: user1,
compactedIndex: tc.indexUpdatesRecorder,
}},
logger: util_log.Logger,
ctx: context.Background(),
tableMarker: chunkDeletionMarkerRecorder,
}
if tc.noUserIndex {
table.indexSets = map[string]*indexSet{}
}
lblFoo := labels.FromStrings("foo", "bar")
err := table.applyStorageUpdates(user1, lblFoo.String(), rebuiltChunks, chunksToDeIndex)
require.NoError(t, err)
expectedChunksMarkedForDeletion := tc.expectedChunksMarkedForDeletion()
chunkDeletionMarkerRecorder.sortEntries()
require.Equal(t, expectedChunksMarkedForDeletion, chunkDeletionMarkerRecorder.chunkIDs)
if !tc.noUserIndex {
tc.indexUpdatesRecorder.sortEntries()
expectedChunksToRemoveFromIndex := tc.expectedChunksToRemoveFromIndex()
require.Equal(t, expectedChunksToRemoveFromIndex, tc.indexUpdatesRecorder.removedChunks)
expectedChunksToAddToIndex := tc.expectedChunksToAddToIndex()
require.Equal(t, expectedChunksToAddToIndex, tc.indexUpdatesRecorder.indexedChunks)
}
})
}
}