loki/pkg/compactor/table.go

package compactor

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"unsafe"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/concurrency"
	"github.com/prometheus/common/model"

	"github.com/grafana/loki/v3/pkg/compactor/deletion"
	"github.com/grafana/loki/v3/pkg/compactor/retention"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	chunk_util "github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
	util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
	gzipExtension = ".gz"
)

var errFileCountNotOne = fmt.Errorf("can't apply retention or index updates when index file count is not one")
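
// tableExpirationChecker reports whether a given table interval may contain
// expired chunks for a user, which lets applyRetention skip index sets that
// cannot have anything to delete.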
type tableExpirationChecker interface {
	IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool
}

type IndexCompactor interface {
	// NewTableCompactor returns a new TableCompactor for compacting a table.
	// commonIndexSet refers to the common index files, in other words the multi-tenant index.
	// existingUserIndexSet refers to the existing user-specific index files in the storage.
	// makeEmptyUserIndexSetFunc can be used for creating an empty indexSet for a user
	// who does not have an index in existingUserIndexSet.
	// periodConfig holds the PeriodConfig for the table.
	NewTableCompactor(
		ctx context.Context,
		commonIndexSet IndexSet,
		existingUserIndexSet map[string]IndexSet,
		makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc,
		periodConfig config.PeriodConfig,
	) TableCompactor

	// OpenCompactedIndexFile opens a compressed index file at the given path.
	OpenCompactedIndexFile(
		ctx context.Context,
		path,
		tableName,
		userID,
		workingDir string,
		periodConfig config.PeriodConfig,
		logger log.Logger,
	) (
		CompactedIndex,
		error,
	)
}

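// An illustrative sketch of the shape an IndexCompactor implementation could
// take; the type names below are hypothetical and the method bodies are
// intentionally trivial:
//
//	type noopIndexCompactor struct{}
//
//	func (noopIndexCompactor) NewTableCompactor(
//		_ context.Context,
//		commonIndexSet IndexSet,
//		existingUserIndexSet map[string]IndexSet,
//		_ MakeEmptyUserIndexSetFunc,
//		_ config.PeriodConfig,
//	) TableCompactor {
//		return noopTableCompactor{}
//	}
//
//	func (noopIndexCompactor) OpenCompactedIndexFile(
//		_ context.Context,
//		path, tableName, userID, workingDir string,
//		_ config.PeriodConfig,
//		_ log.Logger,
//	) (CompactedIndex, error) {
//		return nil, fmt.Errorf("not implemented")
//	}
//
//	type noopTableCompactor struct{}
//
//	func (noopTableCompactor) CompactTable() error { return nil }
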
type TableCompactor interface {
	// CompactTable compacts the table.
	// After compaction is done successfully, it should set the new/updated CompactedIndex for relevant IndexSets.
	CompactTable() (err error)
}

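// Chunk exposes the metadata of a chunk that the compactor needs when applying
// storage updates, for example to build a logproto.ChunkRef for the chunk.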
type Chunk interface {
	GetFrom() model.Time
	GetThrough() model.Time
	GetFingerprint() uint64
	GetChecksum() uint32
	GetSize() uint32
	GetEntriesCount() uint32
}

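// MakeEmptyUserIndexSetFunc builds an empty IndexSet for a user which does not
// yet have a per-user index in storage.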
type MakeEmptyUserIndexSetFunc func(userID string) (IndexSet, error)
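
// table manages a single index table through compaction, retention, storage
// updates and upload. It keeps one indexSet per user plus the common
// (multi-tenant) indexSet, which is stored under the empty-string key.
// A caller is expected to run compact first, then optionally applyRetention
// and applyStorageUpdates, then done to upload the results, and finally
// cleanup to remove the local files.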
type table struct {
	name              string
	workingDirectory  string
	uploadConcurrency int

	indexStorageClient storage.Client
	indexCompactor     IndexCompactor
	tableMarker        retention.TableMarker
	expirationChecker  tableExpirationChecker
	periodConfig       config.PeriodConfig

	baseUserIndexSet, baseCommonIndexSet storage.IndexSet
	indexSets                            map[string]*indexSet
	usersWithPerUserIndex                []string

	logger log.Logger
	ctx    context.Context
}

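// newTable creates the working directory for the table and returns a table
// whose name is derived from the base name of that directory.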
func newTable(ctx context.Context, workingDirectory string, indexStorageClient storage.Client,
	indexCompactor IndexCompactor, periodConfig config.PeriodConfig,
	tableMarker retention.TableMarker, expirationChecker tableExpirationChecker,
	uploadConcurrency int,
) (*table, error) {
	err := chunk_util.EnsureDirectory(workingDirectory)
	if err != nil {
		return nil, err
	}

	table := table{
		ctx:                ctx,
		name:               filepath.Base(workingDirectory),
		workingDirectory:   workingDirectory,
		indexStorageClient: indexStorageClient,
		indexCompactor:     indexCompactor,
		tableMarker:        tableMarker,
		expirationChecker:  expirationChecker,
		periodConfig:       periodConfig,
		indexSets:          map[string]*indexSet{},
		baseUserIndexSet:   storage.NewIndexSet(indexStorageClient, true),
		baseCommonIndexSet: storage.NewIndexSet(indexStorageClient, false),
		uploadConcurrency:  uploadConcurrency,
	}
	table.logger = log.With(util_log.Logger, "table-name", table.name)

	return &table, nil
}

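// compact lists the index files of the table, builds the common indexSet and
// one indexSet per user that has a per-user index, and then hands the actual
// compaction over to the TableCompactor returned by the IndexCompactor.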
func (t *table) compact() error {
	t.indexStorageClient.RefreshIndexTableCache(t.ctx, t.name)
	indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false)
	if err != nil {
		return err
	}

	if len(indexFiles) == 0 && len(usersWithPerUserIndex) == 0 {
		level.Info(t.logger).Log("msg", "no common index files and user index found")
		return nil
	}

	t.usersWithPerUserIndex = usersWithPerUserIndex

	level.Info(t.logger).Log("msg", "listed files", "count", len(indexFiles))

	t.indexSets[""], err = newCommonIndexSet(t.ctx, t.name, t.baseCommonIndexSet, t.workingDirectory, t.logger)
	if err != nil {
		return err
	}

	// userIndexSets is just for passing to NewTableCompactor since Go considers map[string]*indexSet a different type than map[string]IndexSet
	userIndexSets := make(map[string]IndexSet, len(t.usersWithPerUserIndex))
	for _, userID := range t.usersWithPerUserIndex {
		var err error
		t.indexSets[userID], err = newUserIndexSet(t.ctx, t.name, userID, t.baseUserIndexSet, filepath.Join(t.workingDirectory, userID), t.logger)
		if err != nil {
			return err
		}
		userIndexSets[userID] = t.indexSets[userID]
	}

	// protect indexSets with a mutex so that we are concurrency safe if the TableCompactor calls MakeEmptyUserIndexSetFunc concurrently
	indexSetsMtx := sync.Mutex{}
	tableCompactor := t.indexCompactor.NewTableCompactor(t.ctx, t.indexSets[""], userIndexSets, func(userID string) (IndexSet, error) {
		indexSetsMtx.Lock()
		defer indexSetsMtx.Unlock()

		indexSet, err := newUserIndexSet(t.ctx, t.name, userID, t.baseUserIndexSet, filepath.Join(t.workingDirectory, userID), t.logger)
		if err != nil {
			return nil, err
		}

		t.indexSets[userID] = indexSet
		return indexSet, nil
	}, t.periodConfig)

	return tableCompactor.CompactTable()
}

// done takes care of uploading the index to the object storage and removing any source index files that were compacted away.
// No index updates must be done after calling this method.
func (t *table) done() error {
	userIDs := make([]string, 0, len(t.indexSets))
	for userID := range t.indexSets {
		// indexSet.done() uploads the compacted db and cleans up the source index files.
		// For user index sets, the files from the common index set are also a source of index.
		// If we clean up the common index set first and then fail to upload the newly compacted dbs in the user index sets, we will lose data.
		// To avoid any data loss, we should call done() on the common index set at the end.
		if userID == "" {
			continue
		}

		userIDs = append(userIDs, userID)
	}

	err := concurrency.ForEachJob(t.ctx, len(userIDs), t.uploadConcurrency, func(_ context.Context, idx int) error {
		return t.indexSets[userIDs[idx]].done()
	})
	if err != nil {
		return err
	}

	if commonIndexSet, ok := t.indexSets[""]; ok {
		if err := commonIndexSet.done(); err != nil {
			return err
		}
	}

	return nil
}

// applyRetention applies retention on the index sets
func (t *table) applyRetention() error {
	tableInterval := retention.ExtractIntervalFromTableName(t.name)
	// call runRetention on the index sets which may have expired chunks
	for userID, is := range t.indexSets {
		// Make sure we do not apply retention on the common index set when one of the following is true:
		// 1. It got compacted away to the per-user indexes.
		// 2. There are no common index files.
		if userID == "" && is.compactedIndex == nil && ((is.removeSourceObjects && !is.uploadCompactedDB) || len(is.ListSourceFiles()) == 0) {
			continue
		}
		if !t.expirationChecker.IntervalMayHaveExpiredChunks(tableInterval, userID) {
			continue
		}

		// compactedIndex is only set in indexSet when files have been compacted,
		// so we need to open the compacted index file for applying retention if compactedIndex is nil
		if is.compactedIndex == nil {
			if err := t.openCompactedIndexForUpdates(is); err != nil {
				return err
			}
		}

		err := is.runRetention(t.tableMarker)
		if err != nil {
			return err
		}
	}

	return nil
}

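// openCompactedIndexForUpdates downloads the single index file left in the
// given indexSet and opens it as a CompactedIndex so that retention or storage
// updates can be applied to it. It returns errFileCountNotOne when the set
// does not contain exactly one index file.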
func (t *table) openCompactedIndexForUpdates(idxSet *indexSet) error {
	sourceFiles := idxSet.ListSourceFiles()
	if len(sourceFiles) != 1 {
		return errFileCountNotOne
	}

	downloadedAt, err := idxSet.GetSourceFile(sourceFiles[0])
	if err != nil {
		return err
	}

	compactedIndexFile, err := t.indexCompactor.OpenCompactedIndexFile(t.ctx, downloadedAt, t.name, idxSet.userID, filepath.Join(t.workingDirectory, idxSet.userID), t.periodConfig, idxSet.logger)
	if err != nil {
		return err
	}

	idxSet.setCompactedIndex(compactedIndexFile, false, false)
	return nil
}

// applyStorageUpdates applies storage updates for a single stream of a user
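// rebuiltChunks is expected to map the ID of a source chunk to the chunk
// rebuilt from it (the rebuilt chunk may be nil), while chunksToDeIndex lists
// chunk IDs that should only be removed from the index.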
func (t *table) applyStorageUpdates(userID, labelsStr string, rebuiltChunks map[string]deletion.Chunk, chunksToDeIndex []string) error {
	labels, err := syntax.ParseLabels(labelsStr)
	if err != nil {
		return err
	}

	is, ok := t.indexSets[userID]
	if !ok {
		// Index for the user does not exist, likely removed by retention/deletion without line filter.
		// Mark all the rebuilt chunks for deletion.
		level.Info(util_log.Logger).Log("msg", "user index not found, removing the newly built chunks", "table_name", t.name, "userID", userID)
		chunksToDelete := make([]string, 0, len(rebuiltChunks))

		cfg := config.SchemaConfig{Configs: []config.PeriodConfig{t.periodConfig}}
		for _, newChunk := range rebuiltChunks {
			if newChunk == nil {
				continue
			}

			chunkID := cfg.ExternalKey(logproto.ChunkRef{
				Fingerprint: newChunk.GetFingerprint(),
				UserID:      userID,
				From:        newChunk.GetFrom(),
				Through:     newChunk.GetThrough(),
				Checksum:    newChunk.GetChecksum(),
			})
			chunksToDelete = append(chunksToDelete, chunkID)
		}

		return t.tableMarker.MarkChunksForDeletion(t.name, chunksToDelete)
	}

	// compactedIndex is only set in indexSet when files have been compacted,
	// so we need to open the compacted index file for applying index updates if compactedIndex is nil
	if is.compactedIndex == nil {
		if err := t.openCompactedIndexForUpdates(is); err != nil {
			return err
		}
	}

	chunksNotIndexed, err := is.applyUpdates(labels, rebuiltChunks, chunksToDeIndex)
	if err != nil {
		return err
	}

	// build the list of source chunks to delete
	chunksToDelete := make([]string, 0, len(rebuiltChunks)+len(chunksNotIndexed))
	for chunkID := range rebuiltChunks {
		chunksToDelete = append(chunksToDelete, chunkID)
	}

	// Remove the newly built chunks which were not indexed because their source chunks are missing from the current index.
	// Source chunks could have been deleted by retention or by delete requests without line filters.
	// However, since storage updates are supposed to be idempotent, check whether the chunk was already indexed by a previous attempt which also already removed the source chunk.
	cfg := config.SchemaConfig{Configs: []config.PeriodConfig{t.periodConfig}}
	for _, chk := range chunksNotIndexed {
		chunkRef := logproto.ChunkRef{
			Fingerprint: chk.GetFingerprint(),
			UserID:      userID,
			From:        chk.GetFrom(),
			Through:     chk.GetThrough(),
			Checksum:    chk.GetChecksum(),
		}

		chunkExists, err := is.chunkExists(labels, chunkRef)
		if err != nil {
			return err
		}

		if chunkExists {
			continue
		}

		chunkID := cfg.ExternalKey(logproto.ChunkRef{
			Fingerprint: chk.GetFingerprint(),
			UserID:      userID,
			From:        chk.GetFrom(),
			Through:     chk.GetThrough(),
			Checksum:    chk.GetChecksum(),
		})
		chunksToDelete = append(chunksToDelete, chunkID)
	}

	return t.tableMarker.MarkChunksForDeletion(t.name, chunksToDelete)
}

// cleanup takes care of cleaning up any local data on disk
func (t *table) cleanup() {
	for _, is := range t.indexSets {
		is.cleanup()
	}

	if err := os.RemoveAll(t.workingDirectory); err != nil {
		level.Error(t.logger).Log("msg", fmt.Sprintf("failed to remove working directory %s", t.workingDirectory), "err", err)
	}
}

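// GetUserIndex returns a retention.SeriesIterator over the compacted index of
// the given user, opening the compacted index file on demand. It returns a nil
// iterator when the user has no index in this table.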
func (t *table) GetUserIndex(userID string) (retention.SeriesIterator, error) {
	is, ok := t.indexSets[userID]
	if !ok {
		return nil, nil
	}

	// compactedIndex is only set in indexSet when files have been compacted,
	// so we need to open the compacted index file for applying index updates if compactedIndex is nil
	if is.compactedIndex == nil {
		if err := t.openCompactedIndexForUpdates(is); err != nil {
			return nil, err
		}
	}

	return is.compactedIndex, nil
}

// tableHasUncompactedIndex returns true if there is more than one common index file.
// We check for more than one because the earlier boltdb-shipper index type did not have a per-tenant index, so there would only be common index files.
// In the case of a per-tenant index, it is okay to consider it compacted since having just one uncompacted index file for a while should be fine.
func tableHasUncompactedIndex(ctx context.Context, tableName string, indexStorageClient storage.Client) (bool, error) {
	commonIndexFiles, _, err := indexStorageClient.ListFiles(ctx, tableName, false)
	return len(commonIndexFiles) > 1, err
}

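// unsafeGetBytes converts a string to a byte slice without copying. The
// returned slice aliases the string's backing array and must not be mutated.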
func unsafeGetBytes(s string) []byte {
	return unsafe.Slice(unsafe.StringData(s), len(s)) // #nosec G103 -- we know the string is not mutated -- nosemgrep: use-of-unsafe-block
}