package compactor

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"unsafe"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/concurrency"
	"github.com/prometheus/common/model"

	"github.com/grafana/loki/v3/pkg/compactor/deletion"
	"github.com/grafana/loki/v3/pkg/compactor/retention"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	chunk_util "github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
	util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
	gzipExtension = ".gz"
)

var errFileCountNotOne = fmt.Errorf("can't apply retention or index updates when index file count is not one")
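
// tableExpirationChecker reports whether a table's time interval may contain
// expired chunks for a given user.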
type tableExpirationChecker interface {
	IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool
}
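
// IndexCompactor abstracts the index-format-specific parts of compaction:
// building a TableCompactor for a table, and re-opening an already-compacted
// index file so further updates can be applied to it.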
type IndexCompactor interface {
	// NewTableCompactor returns a new TableCompactor for compacting a table.
	// commonIndexSet refers to the common index files, in other words the multi-tenant index.
	// existingUserIndexSet refers to the existing user-specific index files in storage.
	// makeEmptyUserIndexSetFunc can be used to create an empty indexSet for a user
	// who does not yet have an index in existingUserIndexSet.
	// periodConfig holds the PeriodConfig for the table.
	NewTableCompactor(
		ctx context.Context,
		commonIndexSet IndexSet,
		existingUserIndexSet map[string]IndexSet,
		makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc,
		periodConfig config.PeriodConfig,
	) TableCompactor

	// OpenCompactedIndexFile opens a compressed index file at the given path.
	OpenCompactedIndexFile(
		ctx context.Context,
		path,
		tableName,
		userID,
		workingDir string,
		periodConfig config.PeriodConfig,
		logger log.Logger,
	) (
		CompactedIndex,
		error,
	)
}

type TableCompactor interface {
	// CompactTable compacts the table.
	// After compaction finishes successfully, it should set the new/updated CompactedIndex on the relevant IndexSets.
	CompactTable() (err error)
}
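
// Chunk exposes the chunk metadata the compactor needs for building chunk IDs
// and applying index updates.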
type Chunk interface {
	GetFrom() model.Time
	GetThrough() model.Time
	GetFingerprint() uint64
	GetChecksum() uint32
	GetSize() uint32
	GetEntriesCount() uint32
}
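
// MakeEmptyUserIndexSetFunc creates a fresh, empty IndexSet for a user that
// does not yet have one in the table.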
type MakeEmptyUserIndexSetFunc func(userID string) (IndexSet, error)
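
// table bundles everything needed to compact a single index table: the index
// storage client, the format-specific IndexCompactor, retention hooks, and the
// common and per-user index sets discovered in storage.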
type table struct {
	name               string
	workingDirectory   string
	uploadConcurrency  int
	indexStorageClient storage.Client
	indexCompactor     IndexCompactor
	tableMarker        retention.TableMarker
	expirationChecker  tableExpirationChecker
	periodConfig       config.PeriodConfig

	baseUserIndexSet, baseCommonIndexSet storage.IndexSet

	indexSets             map[string]*indexSet
	usersWithPerUserIndex []string
	logger                log.Logger

	ctx context.Context
}
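
// newTable creates a table whose name is the base name of workingDirectory,
// ensuring that the directory exists on disk.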
func newTable(ctx context.Context, workingDirectory string, indexStorageClient storage.Client,
	indexCompactor IndexCompactor, periodConfig config.PeriodConfig,
	tableMarker retention.TableMarker, expirationChecker tableExpirationChecker,
	uploadConcurrency int,
) (*table, error) {
	err := chunk_util.EnsureDirectory(workingDirectory)
	if err != nil {
		return nil, err
	}

	table := table{
		ctx:                ctx,
		name:               filepath.Base(workingDirectory),
		workingDirectory:   workingDirectory,
		indexStorageClient: indexStorageClient,
		indexCompactor:     indexCompactor,
		tableMarker:        tableMarker,
		expirationChecker:  expirationChecker,
		periodConfig:       periodConfig,
		indexSets:          map[string]*indexSet{},
		baseUserIndexSet:   storage.NewIndexSet(indexStorageClient, true),
		baseCommonIndexSet: storage.NewIndexSet(indexStorageClient, false),
		uploadConcurrency:  uploadConcurrency,
	}
	table.logger = log.With(util_log.Logger, "table-name", table.name)

	return &table, nil
}
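
// compact lists the index files in the table, sets up an indexSet for the
// common (multi-tenant) index and one for each user with a per-user index,
// and delegates the merge itself to the IndexCompactor's TableCompactor.
//
// A typical lifecycle, as driven by the compactor (a sketch only; the real
// caller also handles locking and error paths):
//
//	tbl, err := newTable(ctx, dir, client, idxCompactor, periodCfg, marker, checker, 10)
//	defer tbl.cleanup()
//	err = tbl.compact()        // merge index files per tenant
//	err = tbl.applyRetention() // drop expired chunks from the index
//	err = tbl.done()           // upload compacted indexes, delete sources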
func (t *table) compact() error {
	t.indexStorageClient.RefreshIndexTableCache(t.ctx, t.name)
	indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false)
	if err != nil {
		return err
	}

	if len(indexFiles) == 0 && len(usersWithPerUserIndex) == 0 {
		level.Info(t.logger).Log("msg", "no common index files and user index found")
		return nil
	}

	t.usersWithPerUserIndex = usersWithPerUserIndex

	level.Info(t.logger).Log("msg", "listed files", "count", len(indexFiles))

	t.indexSets[""], err = newCommonIndexSet(t.ctx, t.name, t.baseCommonIndexSet, t.workingDirectory, t.logger)
	if err != nil {
		return err
	}

	// userIndexSets is only for passing to NewTableCompactor, since Go considers
	// map[string]*indexSet a different type than map[string]IndexSet.
	userIndexSets := make(map[string]IndexSet, len(t.usersWithPerUserIndex))

	for _, userID := range t.usersWithPerUserIndex {
		var err error
		t.indexSets[userID], err = newUserIndexSet(t.ctx, t.name, userID, t.baseUserIndexSet, filepath.Join(t.workingDirectory, userID), t.logger)
		if err != nil {
			return err
		}
		userIndexSets[userID] = t.indexSets[userID]
	}

	// Protect indexSets with a mutex so that we stay concurrency-safe if the
	// TableCompactor calls MakeEmptyUserIndexSetFunc concurrently.
	indexSetsMtx := sync.Mutex{}
	tableCompactor := t.indexCompactor.NewTableCompactor(t.ctx, t.indexSets[""], userIndexSets, func(userID string) (IndexSet, error) {
		indexSetsMtx.Lock()
		defer indexSetsMtx.Unlock()

		indexSet, err := newUserIndexSet(t.ctx, t.name, userID, t.baseUserIndexSet, filepath.Join(t.workingDirectory, userID), t.logger)
		if err != nil {
			return nil, err
		}
		t.indexSets[userID] = indexSet
		return indexSet, nil
	}, t.periodConfig)

	return tableCompactor.CompactTable()
}

// done takes care of uploading the index to object storage and removing any
// source index files that were compacted away.
// No index updates must be done after calling this method.
func (t *table) done() error {
	userIDs := make([]string, 0, len(t.indexSets))
	for userID := range t.indexSets {
		// indexSet.done() uploads the compacted db and cleans up the source index files.
		// For user index sets, the files from the common index set are also a source of index.
		// If we cleaned up the common index set first and then failed to upload the newly
		// compacted dbs in the user index sets, we would lose data.
		// To avoid any data loss, we call done() on the common index set last.
		if userID == "" {
			continue
		}

		userIDs = append(userIDs, userID)
	}

	err := concurrency.ForEachJob(t.ctx, len(userIDs), t.uploadConcurrency, func(_ context.Context, idx int) error {
		return t.indexSets[userIDs[idx]].done()
	})
	if err != nil {
		return err
	}

	if commonIndexSet, ok := t.indexSets[""]; ok {
		if err := commonIndexSet.done(); err != nil {
			return err
		}
	}

	return nil
}

// applyRetention applies retention on the index sets.
func (t *table) applyRetention() error {
	tableInterval := retention.ExtractIntervalFromTableName(t.name)
	// Call runRetention on the index sets which may have expired chunks.
	for userID, is := range t.indexSets {
		// Make sure we do not apply retention on the common index set when one of the following is true:
		// 1. It got compacted away into the per-user indexes.
		// 2. There are no common index files.
		if userID == "" && is.compactedIndex == nil && ((is.removeSourceObjects && !is.uploadCompactedDB) || len(is.ListSourceFiles()) == 0) {
			continue
		}

		if !t.expirationChecker.IntervalMayHaveExpiredChunks(tableInterval, userID) {
			continue
		}

		// compactedIndex is only set on an indexSet when files have been compacted,
		// so if it is nil we need to open the compacted index file before applying retention.
		if is.compactedIndex == nil {
			if err := t.openCompactedIndexForUpdates(is); err != nil {
				return err
			}
		}

		err := is.runRetention(t.tableMarker)
		if err != nil {
			return err
		}
	}

	return nil
}
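
// openCompactedIndexForUpdates downloads the single index file left in an
// already-compacted indexSet and opens it so that retention or storage updates
// can be applied. It returns errFileCountNotOne when the set does not contain
// exactly one file.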
func (t *table) openCompactedIndexForUpdates(idxSet *indexSet) error {
	sourceFiles := idxSet.ListSourceFiles()
	if len(sourceFiles) != 1 {
		return errFileCountNotOne
	}

	downloadedAt, err := idxSet.GetSourceFile(sourceFiles[0])
	if err != nil {
		return err
	}

	compactedIndexFile, err := t.indexCompactor.OpenCompactedIndexFile(t.ctx, downloadedAt, t.name, idxSet.userID, filepath.Join(t.workingDirectory, idxSet.userID), t.periodConfig, idxSet.logger)
	if err != nil {
		return err
	}

	idxSet.setCompactedIndex(compactedIndexFile, false, false)

	return nil
}

// applyStorageUpdates applies storage updates for a single stream of a user.
func (t *table) applyStorageUpdates(userID, labelsStr string, rebuiltChunks map[string]deletion.Chunk, chunksToDeIndex []string) error {
	labels, err := syntax.ParseLabels(labelsStr)
	if err != nil {
		return err
	}

	is, ok := t.indexSets[userID]
	if !ok {
		// The index for the user does not exist, likely removed by retention or a
		// deletion without a line filter. Mark all the rebuilt chunks for deletion.
		level.Info(util_log.Logger).Log("msg", "user index not found, removing the newly built chunks", "table_name", t.name, "userID", userID)
		chunksToDelete := make([]string, 0, len(rebuiltChunks))
		cfg := config.SchemaConfig{Configs: []config.PeriodConfig{t.periodConfig}}

		for _, newChunk := range rebuiltChunks {
			if newChunk == nil {
				continue
			}
			chunkID := cfg.ExternalKey(logproto.ChunkRef{
				Fingerprint: newChunk.GetFingerprint(),
				UserID:      userID,
				From:        newChunk.GetFrom(),
				Through:     newChunk.GetThrough(),
				Checksum:    newChunk.GetChecksum(),
			})
			chunksToDelete = append(chunksToDelete, chunkID)
		}

		return t.tableMarker.MarkChunksForDeletion(t.name, chunksToDelete)
	}

	// compactedIndex is only set on an indexSet when files have been compacted,
	// so if it is nil we need to open the compacted index file before applying index updates.
	if is.compactedIndex == nil {
		if err := t.openCompactedIndexForUpdates(is); err != nil {
			return err
		}
	}

	chunksNotIndexed, err := is.applyUpdates(labels, rebuiltChunks, chunksToDeIndex)
	if err != nil {
		return err
	}
	// Build the list of source chunks to delete.
	chunksToDelete := make([]string, 0, len(rebuiltChunks)+len(chunksNotIndexed))
	for chunkID := range rebuiltChunks {
		chunksToDelete = append(chunksToDelete, chunkID)
	}

	// Remove the newly built chunks which were not indexed because their source chunks
	// are missing from the current index. Source chunks could have been deleted by
	// retention or by delete requests without line filters. However, since storage
	// updates are supposed to be idempotent, first check whether the chunk was already
	// indexed by a previous attempt, which would also have removed the source chunk.
	cfg := config.SchemaConfig{Configs: []config.PeriodConfig{t.periodConfig}}
	for _, chk := range chunksNotIndexed {
		chunkRef := logproto.ChunkRef{
			Fingerprint: chk.GetFingerprint(),
			UserID:      userID,
			From:        chk.GetFrom(),
			Through:     chk.GetThrough(),
			Checksum:    chk.GetChecksum(),
		}
		chunkExists, err := is.chunkExists(labels, chunkRef)
		if err != nil {
			return err
		}

		if chunkExists {
			continue
		}
		chunkID := cfg.ExternalKey(chunkRef)
		chunksToDelete = append(chunksToDelete, chunkID)
	}

	return t.tableMarker.MarkChunksForDeletion(t.name, chunksToDelete)
}

// cleanup takes care of cleaning up any local data on disk.
func (t *table) cleanup() {
	for _, is := range t.indexSets {
		is.cleanup()
	}

	if err := os.RemoveAll(t.workingDirectory); err != nil {
		level.Error(t.logger).Log("msg", fmt.Sprintf("failed to remove working directory %s", t.workingDirectory), "err", err)
	}
}
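
// GetUserIndex returns a SeriesIterator over the given user's compacted index,
// opening the compacted index file on demand. It returns a nil iterator when
// the user has no index in this table.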
func (t *table) GetUserIndex(userID string) (retention.SeriesIterator, error) {
	is, ok := t.indexSets[userID]
	if !ok {
		return nil, nil
	}

	// compactedIndex is only set on an indexSet when files have been compacted,
	// so if it is nil we need to open the compacted index file before reading from it.
	if is.compactedIndex == nil {
		if err := t.openCompactedIndexForUpdates(is); err != nil {
			return nil, err
		}
	}

	return is.compactedIndex, nil
}

// tableHasUncompactedIndex returns true if the table has more than one common index file.
// We check for more than one because the earlier boltdb-shipper index type had no
// per-tenant index, so there would only be common index files. With per-tenant indexes,
// it is okay to consider a single file compacted, since having just one uncompacted
// index file around for a while is fine.
func tableHasUncompactedIndex(ctx context.Context, tableName string, indexStorageClient storage.Client) (bool, error) {
	commonIndexFiles, _, err := indexStorageClient.ListFiles(ctx, tableName, false)
	return len(commonIndexFiles) > 1, err
}
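
// unsafeGetBytes converts a string to a byte slice without copying. The caller
// must not mutate the returned slice, since it aliases the string's backing array.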
func unsafeGetBytes(s string) []byte {
	return unsafe.Slice(unsafe.StringData(s), len(s)) // #nosec G103 -- we know the string is not mutated -- nosemgrep: use-of-unsafe-block
}