loki/pkg/dataobj/metastore/object.go

package metastore

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"iter"
	"maps"
	"slices"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/scalar"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/tenant"
	"github.com/grafana/dskit/user"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/thanos-io/objstore"
	"golang.org/x/sync/errgroup"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
	utillog "github.com/grafana/loki/v3/pkg/util/log"
	"github.com/grafana/loki/v3/pkg/xcap"
)

// metastoreWindowSize is the width of a single table-of-contents window.
const (
	metastoreWindowSize = 12 * time.Hour
)

// ObjectMetastore resolves data object sections from index objects stored in
// an object store bucket.
type ObjectMetastore struct {
	bucket      objstore.Bucket
	parallelism int
	logger      log.Logger
	metrics     *objectMetastoreMetrics
}

// SectionKey uniquely identifies a section within a data object.
type SectionKey struct {
	ObjectPath string
	SectionIdx int64
}

// DataobjSectionDescriptor aggregates the stream metadata of a single section.
type DataobjSectionDescriptor struct {
	SectionKey

	StreamIDs []int64
	RowCount  int
	Size      int64
	Start     time.Time
	End       time.Time
}

// NewSectionDescriptor creates a descriptor from the first section pointer
// observed for a section.
func NewSectionDescriptor(pointer pointers.SectionPointer) *DataobjSectionDescriptor {
	return &DataobjSectionDescriptor{
		SectionKey: SectionKey{
			ObjectPath: pointer.Path,
			SectionIdx: pointer.Section,
		},
		StreamIDs: []int64{pointer.StreamIDRef},
		RowCount:  int(pointer.LineCount),
		Size:      pointer.UncompressedSize,
		Start:     pointer.StartTs,
		End:       pointer.EndTs,
	}
}

// Merge folds another section pointer for the same section into the
// descriptor, accumulating counts and widening the time range as needed.
func (d *DataobjSectionDescriptor) Merge(pointer pointers.SectionPointer) {
	d.StreamIDs = append(d.StreamIDs, pointer.StreamIDRef)
	d.RowCount += int(pointer.LineCount)
	d.Size += pointer.UncompressedSize
	if pointer.StartTs.Before(d.Start) {
		d.Start = pointer.StartTs
	}
	if pointer.EndTs.After(d.End) {
		d.End = pointer.EndTs
	}
}
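
// An illustrative sketch (not part of the package): merging two hypothetical
// pointers p1 and p2 that reference the same section combines their stream
// references and widens the descriptor's time range to cover both.
//
//	d := NewSectionDescriptor(p1) // e.g. 100 lines, 10:00-10:30
//	d.Merge(p2)                   // e.g. 50 lines, 09:45-10:15
//	// d.RowCount == 150, d.Start == 09:45, d.End == 10:30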

// Table-of-contents files are stored at well-known locations that can be
// computed from a known time.
func tableOfContentsPath(window time.Time) string {
	return fmt.Sprintf("tocs/%s.toc", strings.ReplaceAll(window.Format(time.RFC3339), ":", "_"))
}
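
// For example, the window starting at 2024-01-01 00:00 UTC maps to a fixed,
// deterministic path (colons are replaced with underscores to keep the name
// portable across object stores):
//
//	tableOfContentsPath(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC))
//	// => "tocs/2024-01-01T00_00_00Z.toc"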

func iterTableOfContentsPaths(start, end time.Time) iter.Seq2[string, multitenancy.TimeRange] {
	minTocWindow := start.Truncate(metastoreWindowSize).UTC()
	maxTocWindow := end.Truncate(metastoreWindowSize).UTC()

	return func(yield func(t string, timeRange multitenancy.TimeRange) bool) {
		for tocWindow := minTocWindow; !tocWindow.After(maxTocWindow); tocWindow = tocWindow.Add(metastoreWindowSize) {
			tocTimeRange := multitenancy.TimeRange{
				MinTime: tocWindow,
				MaxTime: tocWindow.Add(metastoreWindowSize),
			}

			if !yield(tableOfContentsPath(tocWindow), tocTimeRange) {
				return
			}
		}
	}
}
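
// Both query bounds are truncated to 12-hour window boundaries, so a minimal
// sketch covering 01:00-13:00 UTC on 2024-01-01 visits exactly two windows:
//
//	start := time.Date(2024, 1, 1, 1, 0, 0, 0, time.UTC)
//	end := time.Date(2024, 1, 1, 13, 0, 0, 0, time.UTC)
//	for path, window := range iterTableOfContentsPaths(start, end) {
//		// yields "tocs/2024-01-01T00_00_00Z.toc" (00:00-12:00)
//		// and    "tocs/2024-01-01T12_00_00Z.toc" (12:00-24:00)
//		_, _ = path, window
//	}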

// NewObjectMetastore creates a metastore that resolves sections from index
// objects in bucket. Lookups fan out with a fixed parallelism of 64.
func NewObjectMetastore(bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) *ObjectMetastore {
	store := &ObjectMetastore{
		bucket:      bucket,
		parallelism: 64,
		logger:      logger,
		metrics:     newObjectMetastoreMetrics(),
	}

	if reg != nil {
		store.metrics.register(reg)
	}

	return store
}
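
// A minimal construction sketch (the bucket, logger, and registry shown here
// are assumptions; any implementations of those interfaces work):
//
//	bucket := objstore.NewInMemBucket()
//	ms := NewObjectMetastore(bucket, log.NewNopLogger(), prometheus.NewRegistry())
//	sections, err := ms.Sections(ctx, start, end, matchers, nil)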

// matchersToString renders a set of matchers in LogQL selector syntax for logging.
func matchersToString(matchers []*labels.Matcher) string {
	var s strings.Builder
	s.WriteString("{")
	for i, m := range matchers {
		if i > 0 {
			s.WriteString(",")
		}
		s.WriteString(m.String())
	}
	s.WriteString("}")
	return s.String()
}
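
// For example, the matchers app="foo" and env!="prod" render as:
//
//	{app="foo",env!="prod"}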

// streams returns the deduplicated label sets of streams matching the
// matchers within the given time range.
func (m *ObjectMetastore) streams(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]*labels.Labels, error) {
	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}
	level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "ObjectMetastore.Streams", "tenant", tenantID, "start", start, "end", end, "matchers", matchersToString(matchers))

	// Get all table-of-contents paths for the time range
	var tablePaths []string
	for path := range iterTableOfContentsPaths(start, end) {
		tablePaths = append(tablePaths, path)
	}

	// List objects from all tables concurrently
	paths, err := m.listObjectsFromTables(ctx, tablePaths, start, end)
	if err != nil {
		return nil, err
	}

	// Search the stream sections of the matching objects to find matching streams
	predicate := streamPredicateFromMatchers(start, end, matchers...)
	return m.listStreamsFromObjects(ctx, paths, predicate)
}

// Sections resolves the data object sections that may contain logs for the
// given time range, stream matchers, and structured-metadata predicates.
func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, matchers []*labels.Matcher, predicates []*labels.Matcher) ([]*DataobjSectionDescriptor, error) {
	ctx, region := xcap.StartRegion(ctx, "ObjectMetastore.Sections")
	defer region.End()

	sectionsTimer := prometheus.NewTimer(m.metrics.resolvedSectionsTotalDuration)

	// Get all table-of-contents paths for the time range
	var tablePaths []string
	for path := range iterTableOfContentsPaths(start, end) {
		tablePaths = append(tablePaths, path)
	}

	// Return early if no TOC files are found
	if len(tablePaths) == 0 {
		m.metrics.indexObjectsTotal.Observe(0)
		m.metrics.resolvedSectionsTotal.Observe(0)
		level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "no sections resolved", "reason", "no toc paths")
		return nil, nil
	}

	// List index objects from all tables concurrently
	indexPaths, err := m.listObjectsFromTables(ctx, tablePaths, start, end)
	if err != nil {
		return nil, err
	}
	m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths)))
	region.Record(xcap.StatMetastoreIndexObjects.Observe(int64(len(indexPaths))))

	// Return early if no index files are found
	if len(indexPaths) == 0 {
		m.metrics.resolvedSectionsTotal.Observe(0)
		level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "no sections resolved", "reason", "no index paths")
		return nil, nil
	}

	// Open all index objects concurrently. Each goroutine uses its own error
	// variable: writing the enclosing err from every goroutine would be a data race.
	indexObjects := make([]*dataobj.Object, len(indexPaths))
	g, initCtx := errgroup.WithContext(ctx)
	g.SetLimit(m.parallelism)
	for idx, indexPath := range indexPaths {
		g.Go(func() error {
			obj, err := dataobj.FromBucket(initCtx, m.bucket, indexPath)
			if err != nil {
				return err
			}
			indexObjects[idx] = obj
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Search the stream sections of the matching objects to find matching streams
	streamMatchers := streamPredicateFromMatchers(start, end, matchers...)
	streamSectionPointers, err := m.getSectionsForStreams(ctx, indexObjects, streamMatchers, start, end)
	if err != nil {
		return nil, err
	}

	initialSectionPointersCount := len(streamSectionPointers)
	if len(predicates) > 0 {
		// Search the section AMQs to estimate sections that might match the predicates.
		// AMQs may return false positives, so this is an over-estimate.
		//
		// Only match one predicate at a time in order to obtain the intersection
		// of all estimates, rather than the union.
		for _, predicate := range predicates {
			if predicate.Type != labels.MatchEqual {
				continue
			}
			matchedSections, err := m.estimateSectionsForMatcher(ctx, indexObjects, predicate)
			if err != nil {
				return nil, err
			}
			streamSectionPointers = intersectSections(streamSectionPointers, matchedSections)
			if len(streamSectionPointers) == 0 {
				// Short-circuit: no sections can match all predicates.
				level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "no sections resolved", "reason", "no sections matched predicates")
				return streamSectionPointers, nil
			}
		}
	}

	duration := sectionsTimer.ObserveDuration()
	// Guard against a 0/0 NaN observation when no sections were resolved.
	ratio := 0.0
	if initialSectionPointersCount > 0 {
		ratio = float64(len(streamSectionPointers)) / float64(initialSectionPointersCount)
	}
	m.metrics.resolvedSectionsTotal.Observe(float64(len(streamSectionPointers)))
	m.metrics.resolvedSectionsRatio.Observe(ratio)
	region.Record(xcap.StatMetastoreResolvedSections.Observe(int64(len(streamSectionPointers))))
	level.Debug(utillog.WithContext(ctx, m.logger)).Log(
		"msg", "resolved sections",
		"duration", duration,
		"tables", len(tablePaths),
		"indexes", len(indexPaths),
		"sections", len(streamSectionPointers),
		"ratio", ratio,
		"matchers", matchersToString(matchers),
		"start", start,
		"end", end,
	)
	return streamSectionPointers, nil
}

// intersectSections filters sectionPointers in place, keeping only sections
// whose key appears in sectionMembershipEstimates.
func intersectSections(sectionPointers []*DataobjSectionDescriptor, sectionMembershipEstimates []SectionKey) []*DataobjSectionDescriptor {
	existence := make(map[SectionKey]struct{}, len(sectionMembershipEstimates))
	for _, section := range sectionMembershipEstimates {
		existence[section] = struct{}{}
	}

	nextEmptyIdx := 0
	key := SectionKey{}
	for _, section := range sectionPointers {
		key.ObjectPath = section.ObjectPath
		key.SectionIdx = section.SectionIdx
		if _, ok := existence[key]; ok {
			sectionPointers[nextEmptyIdx] = section
			nextEmptyIdx++
		}
	}
	return sectionPointers[:nextEmptyIdx]
}
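
// An illustrative call, with hypothetical descriptors a0, a1, b0 for sections
// ("a",0), ("a",1), ("b",0): only keys present in the estimate survive, and
// the result reuses the input's backing array rather than allocating. Calling
// this once per predicate narrows the candidate set monotonically.
//
//	exact := []*DataobjSectionDescriptor{a0, a1, b0}
//	estimate := []SectionKey{{ObjectPath: "a", SectionIdx: 1}, {ObjectPath: "b", SectionIdx: 0}}
//	exact = intersectSections(exact, estimate) // [a1 b0]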

// DataObjects lists the data object paths recorded in the TOC windows that
// overlap the given time range.
func (m *ObjectMetastore) DataObjects(ctx context.Context, start, end time.Time, _ ...*labels.Matcher) ([]string, error) {
	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}
	level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "ObjectMetastore.DataObjects", "tenant", tenantID, "start", start, "end", end)

	// Get all table-of-contents paths for the time range
	var tablePaths []string
	for path := range iterTableOfContentsPaths(start, end) {
		tablePaths = append(tablePaths, path)
	}

	// List objects from all tables concurrently
	return m.listObjectsFromTables(ctx, tablePaths, start, end)
}

// Labels returns the distinct label names across streams matching the given matchers.
func (m *ObjectMetastore) Labels(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) {
	uniqueLabels := map[string]struct{}{}
	err := m.forEachLabel(ctx, start, end, func(label labels.Label) {
		uniqueLabels[label.Name] = struct{}{}
	}, matchers...)
	return slices.Collect(maps.Keys(uniqueLabels)), err
}

// Values returns the distinct label values across all labels of streams
// matching the given matchers.
func (m *ObjectMetastore) Values(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) {
	values := map[string]struct{}{}
	err := m.forEachLabel(ctx, start, end, func(label labels.Label) {
		values[label.Value] = struct{}{}
	}, matchers...)
	return slices.Collect(maps.Keys(values)), err
}

// forEachLabel invokes foreach for every label of every stream matching the matchers.
func (m *ObjectMetastore) forEachLabel(ctx context.Context, start, end time.Time, foreach func(labels.Label), matchers ...*labels.Matcher) error {
	streams, err := m.streams(ctx, start, end, matchers...)
	if err != nil {
		return err
	}

	for _, streamLabels := range streams {
		if streamLabels == nil {
			continue
		}
		streamLabels.Range(func(streamLabel labels.Label) {
			foreach(streamLabel)
		})
	}
	return nil
}

// streamPredicateFromMatchers converts label matchers into a single stream row
// predicate, ANDed with an inclusive [start, end] time-range predicate. It
// returns nil when no matchers are given.
func streamPredicateFromMatchers(start, end time.Time, matchers ...*labels.Matcher) streams.RowPredicate {
	if len(matchers) == 0 {
		return nil
	}

	predicates := make([]streams.RowPredicate, 0, len(matchers)+1)
	predicates = append(predicates, streams.TimeRangeRowPredicate{
		StartTime:    start,
		EndTime:      end,
		IncludeStart: true,
		IncludeEnd:   true,
	})
	for _, matcher := range matchers {
		switch matcher.Type {
		case labels.MatchEqual:
			predicates = append(predicates, streams.LabelMatcherRowPredicate{
				Name:  matcher.Name,
				Value: matcher.Value,
			})
		case labels.MatchNotEqual:
			predicates = append(predicates, streams.NotRowPredicate{
				Inner: streams.LabelMatcherRowPredicate{
					Name:  matcher.Name,
					Value: matcher.Value,
				},
			})
		case labels.MatchRegexp:
			predicates = append(predicates, streams.LabelFilterRowPredicate{
				Name: matcher.Name,
				Keep: func(_, value string) bool {
					return matcher.Matches(value)
				},
			})
		case labels.MatchNotRegexp:
			predicates = append(predicates, streams.NotRowPredicate{
				Inner: streams.LabelFilterRowPredicate{
					Name: matcher.Name,
					Keep: func(_, value string) bool {
						return !matcher.Matches(value)
					},
				},
			})
		}
	}

	if len(predicates) == 1 {
		return predicates[0]
	}

	// Fold the predicates into a right-leaning chain of AND nodes.
	current := predicates[0]
	for _, predicate := range predicates[1:] {
		current = streams.AndRowPredicate{
			Left:  predicate,
			Right: current,
		}
	}
	return current
}
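
// For example, the matchers app="foo" and env!="prod" over [start, end] fold
// into (schematically):
//
//	AndRowPredicate{
//		Left: NotRowPredicate{Inner: LabelMatcherRowPredicate{Name: "env", Value: "prod"}},
//		Right: AndRowPredicate{
//			Left:  LabelMatcherRowPredicate{Name: "app", Value: "foo"},
//			Right: TimeRangeRowPredicate{StartTime: start, EndTime: end},
//		},
//	}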

// listObjectsFromTables concurrently lists objects from multiple metastore files
func (m *ObjectMetastore) listObjectsFromTables(ctx context.Context, tablePaths []string, start, end time.Time) ([]string, error) {
	objects := make([][]string, len(tablePaths))
	g, ctx := errgroup.WithContext(ctx)

	sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
	sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)

	for i, path := range tablePaths {
		g.Go(func() error {
			var err error
			objects[i], err = m.listObjects(ctx, path, sStart, sEnd)
			// If the metastore object is not found, it means it's outside of any
			// existing window and we can safely ignore it.
			if err != nil && !m.bucket.IsObjNotFoundErr(err) {
				return fmt.Errorf("listing objects from metastore %s: %w", path, err)
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	return dedupeAndSort(objects), nil
}

// listStreamsFromObjects concurrently reads the stream sections of the given
// objects and collects the deduplicated set of matching stream labels.
func (m *ObjectMetastore) listStreamsFromObjects(ctx context.Context, paths []string, predicate streams.RowPredicate) ([]*labels.Labels, error) {
	mu := sync.Mutex{}
	foundStreams := make(map[uint64][]*labels.Labels, 1024)

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(m.parallelism)
	for _, path := range paths {
		g.Go(func() error {
			object, err := dataobj.FromBucket(ctx, m.bucket, path)
			if err != nil {
				return fmt.Errorf("getting object from bucket: %w", err)
			}

			return forEachStream(ctx, object, predicate, func(stream streams.Stream) {
				addLabels(&mu, foundStreams, &stream.Labels)
			})
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

	streamsSlice := make([]*labels.Labels, 0, len(foundStreams))
	for _, labels := range foundStreams {
		streamsSlice = append(streamsSlice, labels...)
	}
	return streamsSlice, nil
}

// listStreamIDsFromLogObjects returns, for each object, the IDs of streams
// matching the predicate and the object's count of logs sections.
func (m *ObjectMetastore) listStreamIDsFromLogObjects(ctx context.Context, objectPaths []string, predicate streams.RowPredicate) ([][]int64, []int, error) {
	streamIDs := make([][]int64, len(objectPaths))
	sections := make([]int, len(objectPaths))

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(m.parallelism)
	for idx, objectPath := range objectPaths {
		g.Go(func() error {
			object, err := dataobj.FromBucket(ctx, m.bucket, objectPath)
			if err != nil {
				return fmt.Errorf("getting object from bucket: %w", err)
			}

			sections[idx] = object.Sections().Count(logs.CheckSection)
			streamIDs[idx] = make([]int64, 0, 8)

			return forEachStream(ctx, object, predicate, func(stream streams.Stream) {
				streamIDs[idx] = append(streamIDs[idx], stream.ID)
			})
		})
	}
	if err := g.Wait(); err != nil {
		return nil, nil, err
	}

	return streamIDs, sections, nil
}

// getSectionsForStreams reads the section data from matching streams and aggregates them into section descriptors.
// This is an exact lookup and includes metadata from the streams in each section: the stream IDs, the min/max
// timestamps, and the byte and line counts.
func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, start, end time.Time) ([]*DataobjSectionDescriptor, error) {
	if streamPredicate == nil {
		// At least one stream matcher is currently required.
		return nil, nil
	}

	sStart := scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)
	sEnd := scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns)

	timer := prometheus.NewTimer(m.metrics.streamFilterTotalDuration)
	defer timer.ObserveDuration()

	var sectionDescriptors []*DataobjSectionDescriptor
	sectionDescriptorsMutex := sync.Mutex{}

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(m.parallelism)
	for _, indexObject := range indexObjects {
		g.Go(func() error {
			var key SectionKey
			var matchingStreamIDs []int64

			streamReadTimer := prometheus.NewTimer(m.metrics.streamFilterStreamsReadDuration)
			err := forEachStream(ctx, indexObject, streamPredicate, func(stream streams.Stream) {
				matchingStreamIDs = append(matchingStreamIDs, stream.ID)
			})
			if err != nil {
				return fmt.Errorf("reading streams from index: %w", err)
			}
			streamReadTimer.ObserveDuration()

			if len(matchingStreamIDs) == 0 {
				// No streams match, so skip reading the section pointers or we'll match all of them.
				return nil
			}

			objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor)
			sectionPointerReadTimer := prometheus.NewTimer(m.metrics.streamFilterPointersReadDuration)
			err = forEachStreamSectionPointer(ctx, indexObject, sStart, sEnd, matchingStreamIDs, func(pointer pointers.SectionPointer) {
				key.ObjectPath = pointer.Path
				key.SectionIdx = pointer.Section

				sectionDescriptor, ok := objectSectionDescriptors[key]
				if !ok {
					objectSectionDescriptors[key] = NewSectionDescriptor(pointer)
					return
				}
				sectionDescriptor.Merge(pointer)
			})
			if err != nil {
				return fmt.Errorf("reading section pointers from index: %w", err)
			}
			sectionPointerReadTimer.ObserveDuration()

			// Merge the section descriptors for the object into the global section descriptors in one batch
			sectionDescriptorsMutex.Lock()
			for _, sectionDescriptor := range objectSectionDescriptors {
				sectionDescriptors = append(sectionDescriptors, sectionDescriptor)
			}
			sectionDescriptorsMutex.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	m.metrics.streamFilterSections.Observe(float64(len(sectionDescriptors)))
	return sectionDescriptors, nil
}

// estimateSectionsForMatcher checks the matcher against the section AMQs to determine approximate section membership.
// This is an inexact lookup that only returns probable sections: there may be false positives, but no false
// negatives. No additional metadata is returned beyond the section key.
func (m *ObjectMetastore) estimateSectionsForMatcher(ctx context.Context, indexObjects []*dataobj.Object, matcher *labels.Matcher) ([]SectionKey, error) {
	timer := prometheus.NewTimer(m.metrics.estimateSectionsTotalDuration)
	defer timer.ObserveDuration()

	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(m.parallelism)

	sColumnName := scalar.NewStringScalar(matcher.Name)

	var matchedSections []SectionKey
	var matchedSectionsMu sync.Mutex
	for _, indexObject := range indexObjects {
		g.Go(func() error {
			pointerReadTimer := prometheus.NewTimer(m.metrics.estimateSectionsPointerReadDuration)

			var objMatchedSections []SectionKey
			err := forEachMatchedPointerSectionKey(ctx, indexObject, sColumnName, matcher.Value, func(sk SectionKey) {
				objMatchedSections = append(objMatchedSections, sk)
			})
			if err != nil {
				return fmt.Errorf("reading section keys: %w", err)
			}
			pointerReadTimer.ObserveDuration()

			if len(objMatchedSections) == 0 {
				// TODO(benclive): Find a way to differentiate between unknown columns and columns missing the target value.
				// For now, log a warning to track how often this happens.
				level.Warn(m.logger).Log("msg", "no section keys found for column")
			}

			matchedSectionsMu.Lock()
			matchedSections = append(matchedSections, objMatchedSections...)
			matchedSectionsMu.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	m.metrics.estimateSectionsSections.Observe(float64(len(matchedSections)))
	return matchedSections, nil
}
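
// Assuming the AMQ behaves like a Bloom filter over (column, value) pairs: a
// section that truly contains the value is always reported, while a section
// without it may occasionally be reported anyway. A false positive only costs
// an unnecessary section read at query time; it never drops data, since the
// result is intersected with the exact per-stream lookup in Sections.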

// addLabels records newLabels in streams, deduplicating by stable hash and
// falling back to a full equality check on hash collisions.
func addLabels(mtx *sync.Mutex, streams map[uint64][]*labels.Labels, newLabels *labels.Labels) {
	mtx.Lock()
	defer mtx.Unlock()

	key := labels.StableHash(*newLabels)
	matches, ok := streams[key]
	if !ok {
		streams[key] = append(streams[key], newLabels)
		return
	}

	for _, lbs := range matches {
		if labels.Equal(*lbs, *newLabels) {
			return
		}
	}
	streams[key] = append(streams[key], newLabels)
}

// listObjects downloads a single TOC object into memory and returns the index
// object paths it records for the given time range.
func (m *ObjectMetastore) listObjects(ctx context.Context, path string, sStart, sEnd *scalar.Timestamp) ([]string, error) {
	var buf bytes.Buffer
	objectReader, err := m.bucket.Get(ctx, path)
	if err != nil {
		return nil, err
	}
	defer objectReader.Close()

	n, err := buf.ReadFrom(objectReader)
	if err != nil {
		return nil, fmt.Errorf("reading metastore object: %w", err)
	}

	object, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), n)
	if err != nil {
		return nil, fmt.Errorf("getting object from reader: %w", err)
	}

	var objectPaths []string
	err = forEachIndexPointer(ctx, object, sStart, sEnd, func(indexPointer indexpointers.IndexPointer) {
		objectPaths = append(objectPaths, indexPointer.Path)
	})
	if err != nil {
		return nil, err
	}
	return objectPaths, nil
}

// forEachStream invokes f for every stream row in the object's stream sections
// belonging to the tenant in ctx, filtered by predicate when non-nil.
func forEachStream(ctx context.Context, object *dataobj.Object, predicate streams.RowPredicate, f func(streams.Stream)) error {
	targetTenant, err := user.ExtractOrgID(ctx)
	if err != nil {
		return fmt.Errorf("extracting org ID: %w", err)
	}

	var reader streams.RowReader
	defer reader.Close()

	buf := make([]streams.Stream, 1024)
	for _, section := range object.Sections().Filter(streams.CheckSection) {
		if section.Tenant != targetTenant {
			continue
		}

		sec, err := streams.Open(ctx, section)
		if err != nil {
			return fmt.Errorf("opening section: %w", err)
		}

		reader.Reset(sec)
		if predicate != nil {
			if err := reader.SetPredicate(predicate); err != nil {
				return err
			}
		}

		for {
			num, err := reader.Read(ctx, buf)
			if err != nil && !errors.Is(err, io.EOF) {
				return err
			}
			if num == 0 && errors.Is(err, io.EOF) {
				break
			}
			for _, stream := range buf[:num] {
				f(stream)
			}
		}
	}
	return nil
}

// dedupeAndSort takes a slice of string slices and returns a sorted slice of unique strings
func dedupeAndSort(objects [][]string) []string {
	uniquePaths := make(map[string]struct{})
	for _, batch := range objects {
		for _, path := range batch {
			uniquePaths[path] = struct{}{}
		}
	}

	paths := make([]string, 0, len(uniquePaths))
	for path := range uniquePaths {
		paths = append(paths, path)
	}
	sort.Strings(paths)
	return paths
}
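
// For example:
//
//	dedupeAndSort([][]string{{"b", "a"}, {"a", "c"}})
//	// => []string{"a", "b", "c"}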