mirror of https://github.com/grafana/loki
chore: Remove dataobj querier component (#19132)
parent
18b8995a32
commit
383032bccd
@ -1,323 +0,0 @@ |
||||
package querier |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"io" |
||||
"slices" |
||||
"sync" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams" |
||||
"github.com/grafana/loki/v3/pkg/iter" |
||||
"github.com/grafana/loki/v3/pkg/logproto" |
||||
"github.com/grafana/loki/v3/pkg/logql" |
||||
"github.com/grafana/loki/v3/pkg/logql/log" |
||||
"github.com/grafana/loki/v3/pkg/logql/syntax" |
||||
"github.com/grafana/loki/v3/pkg/logqlmodel/stats" |
||||
"github.com/grafana/loki/v3/pkg/util/topk" |
||||
) |
||||
|
||||
var ( |
||||
recordsPool = sync.Pool{ |
||||
New: func() interface{} { |
||||
records := make([]logs.Record, 1024) |
||||
return &records |
||||
}, |
||||
} |
||||
samplesPool = sync.Pool{ |
||||
New: func() interface{} { |
||||
samples := make([]logproto.Sample, 0, 1024) |
||||
return &samples |
||||
}, |
||||
} |
||||
entryWithLabelsPool = sync.Pool{ |
||||
New: func() interface{} { |
||||
entries := make([]entryWithLabels, 0, 1024) |
||||
return &entries |
||||
}, |
||||
} |
||||
) |
||||
|
||||
type entryWithLabels struct { |
||||
Labels string |
||||
StreamHash uint64 |
||||
Entry logproto.Entry |
||||
} |
||||
|
||||
// newEntryIterator creates a new EntryIterator for the given context, streams, and reader.
|
||||
// It reads records from the reader and adds them to the topk heap based on the direction.
|
||||
// The topk heap is used to maintain the top k entries based on the direction.
|
||||
// The final result is returned as a slice of entries.
|
||||
func newEntryIterator(ctx context.Context, |
||||
streams map[int64]streams.Stream, |
||||
reader *logs.RowReader, |
||||
req logql.SelectLogParams, |
||||
) (iter.EntryIterator, error) { |
||||
bufPtr := recordsPool.Get().(*[]logs.Record) |
||||
defer recordsPool.Put(bufPtr) |
||||
buf := *bufPtr |
||||
|
||||
selector, err := req.LogSelector() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
pipeline, err := selector.Pipeline() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
var ( |
||||
prevStreamID int64 = -1 |
||||
streamExtractor log.StreamPipeline |
||||
streamHash uint64 |
||||
top = topk.Heap[entryWithLabels]{ |
||||
Limit: int(req.Limit), |
||||
Less: lessFn(req.Direction), |
||||
} |
||||
|
||||
statistics = stats.FromContext(ctx) |
||||
) |
||||
|
||||
for { |
||||
n, err := reader.Read(ctx, buf) |
||||
if err != nil && err != io.EOF { |
||||
return nil, fmt.Errorf("failed to read log records: %w", err) |
||||
} |
||||
|
||||
if n == 0 && err == io.EOF { |
||||
break |
||||
} |
||||
|
||||
for _, record := range buf[:n] { |
||||
stream, ok := streams[record.StreamID] |
||||
if !ok { |
||||
continue |
||||
} |
||||
if prevStreamID != record.StreamID { |
||||
streamExtractor = pipeline.ForStream(stream.Labels) |
||||
streamHash = streamExtractor.BaseLabels().Hash() |
||||
prevStreamID = record.StreamID |
||||
} |
||||
|
||||
timestamp := record.Timestamp.UnixNano() |
||||
line, parsedLabels, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata) |
||||
if !ok { |
||||
continue |
||||
} |
||||
statistics.AddPostFilterRows(1) |
||||
|
||||
top.Push(entryWithLabels{ |
||||
Labels: parsedLabels.String(), |
||||
StreamHash: streamHash, |
||||
Entry: logproto.Entry{ |
||||
Timestamp: record.Timestamp, |
||||
Line: string(line), |
||||
StructuredMetadata: logproto.FromLabelsToLabelAdapters(parsedLabels.StructuredMetadata()), |
||||
Parsed: logproto.FromLabelsToLabelAdapters(parsedLabels.Parsed()), |
||||
}, |
||||
}) |
||||
} |
||||
} |
||||
|
||||
return heapIterator(&top), nil |
||||
} |
||||
|
||||
func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool { |
||||
switch direction { |
||||
case logproto.FORWARD: |
||||
return func(a, b entryWithLabels) bool { |
||||
if a.Entry.Timestamp.Equal(b.Entry.Timestamp) { |
||||
return a.Labels < b.Labels |
||||
} |
||||
return a.Entry.Timestamp.After(b.Entry.Timestamp) |
||||
} |
||||
case logproto.BACKWARD: |
||||
return func(a, b entryWithLabels) bool { |
||||
if a.Entry.Timestamp.Equal(b.Entry.Timestamp) { |
||||
return a.Labels < b.Labels |
||||
} |
||||
return a.Entry.Timestamp.Before(b.Entry.Timestamp) |
||||
} |
||||
default: |
||||
panic("invalid direction") |
||||
} |
||||
} |
||||
|
||||
// heapIterator creates a new EntryIterator for the given topk heap. After
|
||||
// calling heapIterator, h is emptied.
|
||||
func heapIterator(h *topk.Heap[entryWithLabels]) iter.EntryIterator { |
||||
elems := h.PopAll() |
||||
|
||||
// We need to reverse the order of the entries in the slice to maintain the order of logs we
|
||||
// want to return:
|
||||
//
|
||||
// For FORWARD direction, we want smallest timestamps first (but the heap is
|
||||
// ordered by largest timestamps first due to lessFn).
|
||||
//
|
||||
// For BACKWARD direction, we want largest timestamps first (but the heap is
|
||||
// ordered by smallest timestamps first due to lessFn).
|
||||
slices.Reverse(elems) |
||||
|
||||
return &sliceIterator{entries: elems} |
||||
} |
||||
|
||||
type sliceIterator struct { |
||||
entries []entryWithLabels |
||||
curr entryWithLabels |
||||
} |
||||
|
||||
func (s *sliceIterator) Next() bool { |
||||
if len(s.entries) == 0 { |
||||
return false |
||||
} |
||||
s.curr = s.entries[0] |
||||
s.entries = s.entries[1:] |
||||
return true |
||||
} |
||||
|
||||
func (s *sliceIterator) At() logproto.Entry { |
||||
return s.curr.Entry |
||||
} |
||||
|
||||
func (s *sliceIterator) Err() error { |
||||
return nil |
||||
} |
||||
|
||||
func (s *sliceIterator) Labels() string { |
||||
return s.curr.Labels |
||||
} |
||||
|
||||
func (s *sliceIterator) StreamHash() uint64 { |
||||
return s.curr.StreamHash |
||||
} |
||||
|
||||
func (s *sliceIterator) Close() error { |
||||
clear(s.entries) |
||||
entryWithLabelsPool.Put(&s.entries) |
||||
return nil |
||||
} |
||||
|
||||
func newSampleIterator(ctx context.Context, |
||||
streamsMap map[int64]streams.Stream, |
||||
extractors []syntax.SampleExtractor, |
||||
reader *logs.RowReader, |
||||
) (iter.SampleIterator, error) { |
||||
bufPtr := recordsPool.Get().(*[]logs.Record) |
||||
defer recordsPool.Put(bufPtr) |
||||
buf := *bufPtr |
||||
|
||||
var ( |
||||
iterators []iter.SampleIterator |
||||
prevStreamID int64 = -1 |
||||
streamExtractor log.StreamSampleExtractor |
||||
series = map[string]*logproto.Series{} |
||||
streamHash uint64 |
||||
) |
||||
|
||||
statistics := stats.FromContext(ctx) |
||||
// For dataobjs, this maps to sections downloaded
|
||||
statistics.AddChunksDownloaded(1) |
||||
|
||||
for { |
||||
n, err := reader.Read(ctx, buf) |
||||
if err != nil && err != io.EOF { |
||||
return nil, fmt.Errorf("failed to read log records: %w", err) |
||||
} |
||||
|
||||
// Handle end of stream or empty read
|
||||
if n == 0 && err == io.EOF { |
||||
iterators = appendIteratorFromSeries(iterators, series) |
||||
break |
||||
} |
||||
|
||||
// Process records in the current batch
|
||||
for _, record := range buf[:n] { |
||||
stream, ok := streamsMap[record.StreamID] |
||||
if !ok { |
||||
continue |
||||
} |
||||
|
||||
for _, extractor := range extractors { |
||||
// Handle stream transition
|
||||
if prevStreamID != record.StreamID { |
||||
iterators = appendIteratorFromSeries(iterators, series) |
||||
clear(series) |
||||
streamExtractor = extractor.ForStream(stream.Labels) |
||||
streamHash = streamExtractor.BaseLabels().Hash() |
||||
prevStreamID = record.StreamID |
||||
} |
||||
|
||||
// Process the record
|
||||
timestamp := record.Timestamp.UnixNano() |
||||
|
||||
statistics.AddDecompressedLines(1) |
||||
samples, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata) |
||||
if !ok { |
||||
continue |
||||
} |
||||
statistics.AddPostFilterLines(1) |
||||
|
||||
for _, sample := range samples { |
||||
|
||||
parsedLabels := sample.Labels |
||||
value := sample.Value |
||||
|
||||
// Get or create series for the parsed labels
|
||||
labelString := parsedLabels.String() |
||||
s, exists := series[labelString] |
||||
if !exists { |
||||
s = createNewSeries(labelString, streamHash) |
||||
series[labelString] = s |
||||
} |
||||
|
||||
// Add sample to the series
|
||||
s.Samples = append(s.Samples, logproto.Sample{ |
||||
Timestamp: timestamp, |
||||
Value: value, |
||||
Hash: 0, |
||||
}) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
if len(iterators) == 0 { |
||||
return iter.NoopSampleIterator, nil |
||||
} |
||||
|
||||
return iter.NewSortSampleIterator(iterators), nil |
||||
} |
||||
|
||||
// createNewSeries creates a new Series for the given labels and stream hash
|
||||
func createNewSeries(labels string, streamHash uint64) *logproto.Series { |
||||
samplesPtr := samplesPool.Get().(*[]logproto.Sample) |
||||
samples := *samplesPtr |
||||
return &logproto.Series{ |
||||
Labels: labels, |
||||
Samples: samples[:0], |
||||
StreamHash: streamHash, |
||||
} |
||||
} |
||||
|
||||
// appendIteratorFromSeries appends a new SampleIterator to the given list of iterators
|
||||
func appendIteratorFromSeries(iterators []iter.SampleIterator, series map[string]*logproto.Series) []iter.SampleIterator { |
||||
if len(series) == 0 { |
||||
return iterators |
||||
} |
||||
|
||||
seriesResult := make([]logproto.Series, 0, len(series)) |
||||
for _, s := range series { |
||||
seriesResult = append(seriesResult, *s) |
||||
} |
||||
|
||||
return append(iterators, iter.SampleIteratorWithClose( |
||||
iter.NewMultiSeriesIterator(seriesResult), |
||||
func() error { |
||||
for _, s := range seriesResult { |
||||
samplesPool.Put(&s.Samples) |
||||
} |
||||
return nil |
||||
}, |
||||
)) |
||||
} |
||||
@ -1,124 +0,0 @@ |
||||
package querier |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/logproto" |
||||
"github.com/grafana/loki/v3/pkg/util/topk" |
||||
) |
||||
|
||||
// makeEntry is a helper function to create a log entry with given timestamp and line
|
||||
func makeEntry(ts time.Time, line string) logproto.Entry { |
||||
return logproto.Entry{ |
||||
Timestamp: ts, |
||||
Line: line, |
||||
} |
||||
} |
||||
|
||||
func TestTopKIterator(t *testing.T) { |
||||
tests := []struct { |
||||
name string |
||||
k int |
||||
direction logproto.Direction |
||||
input []entryWithLabels |
||||
want []entryWithLabels |
||||
}{ |
||||
{ |
||||
name: "forward direction with k=2", |
||||
k: 2, |
||||
direction: logproto.FORWARD, |
||||
input: []entryWithLabels{ |
||||
{Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
{Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
}, |
||||
want: []entryWithLabels{ |
||||
{Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "backward direction with k=3", |
||||
k: 3, |
||||
direction: logproto.BACKWARD, |
||||
input: []entryWithLabels{ |
||||
{Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
{Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
{Entry: makeEntry(time.Unix(5, 0), "line5"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
}, |
||||
want: []entryWithLabels{ |
||||
{Entry: makeEntry(time.Unix(5, 0), "line5"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
{Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "k larger than available entries", |
||||
k: 10, |
||||
direction: logproto.FORWARD, |
||||
input: []entryWithLabels{ |
||||
{Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
}, |
||||
want: []entryWithLabels{ |
||||
{Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "mixed timestamps with k=4", |
||||
k: 4, |
||||
direction: logproto.FORWARD, |
||||
input: []entryWithLabels{ |
||||
{Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(5, 0), "line5"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
{Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
{Entry: makeEntry(time.Unix(6, 0), "line6"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
}, |
||||
want: []entryWithLabels{ |
||||
{Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
{Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
{Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app2\"}", StreamHash: 2}, |
||||
{Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app1\"}", StreamHash: 1}, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
// Create topk iterator
|
||||
top := topk.Heap[entryWithLabels]{ |
||||
Limit: tt.k, |
||||
Less: lessFn(tt.direction), |
||||
} |
||||
|
||||
// Add entries
|
||||
for _, e := range tt.input { |
||||
top.Push(e) |
||||
} |
||||
|
||||
// Collect results
|
||||
var got []entryWithLabels |
||||
|
||||
iter := heapIterator(&top) |
||||
for iter.Next() { |
||||
got = append(got, entryWithLabels{ |
||||
Entry: iter.At(), |
||||
Labels: iter.Labels(), |
||||
StreamHash: iter.StreamHash(), |
||||
}) |
||||
} |
||||
|
||||
require.Equal(t, tt.want, got) |
||||
require.NoError(t, iter.Err()) |
||||
}) |
||||
} |
||||
} |
||||
@ -1,336 +0,0 @@ |
||||
package querier |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"io" |
||||
"sort" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/prometheus/prometheus/model/labels" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"go.uber.org/atomic" |
||||
"golang.org/x/sync/errgroup" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams" |
||||
"github.com/grafana/loki/v3/pkg/logproto" |
||||
"github.com/grafana/loki/v3/pkg/logql" |
||||
util_log "github.com/grafana/loki/v3/pkg/util/log" |
||||
) |
||||
|
||||
var streamsPool = sync.Pool{ |
||||
New: func() any { |
||||
streams := make([]streams.Stream, 1024) |
||||
return &streams |
||||
}, |
||||
} |
||||
|
||||
// SelectSeries implements querier.Store
|
||||
func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { |
||||
logger := util_log.WithContext(ctx, s.logger) |
||||
|
||||
objects, err := s.objectsForTimeRange(ctx, req.Start, req.End, logger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if len(objects) == 0 { |
||||
return nil, nil |
||||
} |
||||
|
||||
shard, err := parseShards(req.Shards) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
var matchers []*labels.Matcher |
||||
if req.Selector != "" { |
||||
expr, err := req.LogSelector() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
matchers = expr.Matchers() |
||||
} |
||||
|
||||
uniqueSeries := &sync.Map{} |
||||
|
||||
processor := newStreamProcessor(req.Start, req.End, matchers, objects, shard, logger) |
||||
|
||||
err = processor.ProcessParallel(ctx, func(h uint64, stream streams.Stream) { |
||||
uniqueSeries.Store(h, labelsToSeriesIdentifier(stream.Labels)) |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
var result []logproto.SeriesIdentifier |
||||
|
||||
// Convert sync.Map to slice
|
||||
uniqueSeries.Range(func(_, value interface{}) bool { |
||||
if sid, ok := value.(logproto.SeriesIdentifier); ok { |
||||
result = append(result, sid) |
||||
} |
||||
return true |
||||
}) |
||||
|
||||
return result, nil |
||||
} |
||||
|
||||
// LabelNamesForMetricName implements querier.Store
|
||||
func (s *Store) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, matchers ...*labels.Matcher) ([]string, error) { |
||||
logger := util_log.WithContext(ctx, s.logger) |
||||
start, end := from.Time(), through.Time() |
||||
objects, err := s.objectsForTimeRange(ctx, start, end, logger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if len(objects) == 0 { |
||||
return nil, nil |
||||
} |
||||
|
||||
processor := newStreamProcessor(start, end, matchers, objects, noShard, logger) |
||||
uniqueNames := sync.Map{} |
||||
|
||||
err = processor.ProcessParallel(ctx, func(_ uint64, stream streams.Stream) { |
||||
stream.Labels.Range(func(label labels.Label) { |
||||
uniqueNames.Store(label.Name, nil) |
||||
}) |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
names := []string{} |
||||
uniqueNames.Range(func(key, _ interface{}) bool { |
||||
names = append(names, key.(string)) |
||||
return true |
||||
}) |
||||
|
||||
sort.Strings(names) |
||||
|
||||
return names, nil |
||||
} |
||||
|
||||
// LabelValuesForMetricName implements querier.Store
|
||||
func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, labelName string, matchers ...*labels.Matcher) ([]string, error) { |
||||
logger := util_log.WithContext(ctx, s.logger) |
||||
start, end := from.Time(), through.Time() |
||||
|
||||
requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, labelName, "") |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to instantiate label matcher: %w", err) |
||||
} |
||||
|
||||
matchers = append(matchers, requireLabel) |
||||
|
||||
objects, err := s.objectsForTimeRange(ctx, start, end, logger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if len(objects) == 0 { |
||||
return nil, nil |
||||
} |
||||
|
||||
processor := newStreamProcessor(start, end, matchers, objects, noShard, logger) |
||||
uniqueValues := sync.Map{} |
||||
|
||||
err = processor.ProcessParallel(ctx, func(_ uint64, stream streams.Stream) { |
||||
uniqueValues.Store(stream.Labels.Get(labelName), nil) |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
values := []string{} |
||||
uniqueValues.Range(func(key, _ interface{}) bool { |
||||
values = append(values, key.(string)) |
||||
return true |
||||
}) |
||||
|
||||
sort.Strings(values) |
||||
|
||||
return values, nil |
||||
} |
||||
|
||||
// streamProcessor handles processing of unique series with custom collection logic
|
||||
type streamProcessor struct { |
||||
predicate streams.RowPredicate |
||||
seenSeries *sync.Map |
||||
objects []object |
||||
shard logql.Shard |
||||
logger log.Logger |
||||
} |
||||
|
||||
// newStreamProcessor creates a new streamProcessor with the given parameters
|
||||
func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []object, shard logql.Shard, logger log.Logger) *streamProcessor { |
||||
return &streamProcessor{ |
||||
predicate: streamPredicate(matchers, start, end), |
||||
seenSeries: &sync.Map{}, |
||||
objects: objects, |
||||
shard: shard, |
||||
logger: logger, |
||||
} |
||||
} |
||||
|
||||
// ProcessParallel processes series from multiple readers in parallel
|
||||
// dataobj.Stream objects returned to onNewStream may be reused and must be deep copied for further use, including the stream.Labels keys and values.
|
||||
func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func(uint64, streams.Stream)) error { |
||||
readers, err := shardStreamReaders(ctx, sp.objects, sp.shard) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer func() { |
||||
for _, reader := range readers { |
||||
_ = reader.Close() |
||||
streamReaderPool.Put(reader) |
||||
} |
||||
}() |
||||
|
||||
start := time.Now() |
||||
span := trace.SpanFromContext(ctx) |
||||
span.AddEvent("processing streams", trace.WithAttributes(attribute.Int("total_readers", len(readers)))) |
||||
level.Debug(sp.logger).Log("msg", "processing streams", "total_readers", len(readers)) |
||||
|
||||
// set predicate on all readers
|
||||
for _, reader := range readers { |
||||
if err := reader.SetPredicate(sp.predicate); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
g, ctx := errgroup.WithContext(ctx) |
||||
var processedStreams atomic.Int64 |
||||
for _, reader := range readers { |
||||
g.Go(func() error { |
||||
ctx, span := tracer.Start(ctx, "streamProcessor.processSingleReader") |
||||
defer span.End() |
||||
|
||||
n, err := sp.processSingleReader(ctx, reader, onNewStream) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
processedStreams.Add(n) |
||||
return nil |
||||
}) |
||||
} |
||||
err = g.Wait() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
level.Debug(sp.logger).Log("msg", "finished processing streams", |
||||
"total_readers", len(readers), |
||||
"total_streams_processed", processedStreams.Load(), |
||||
"duration", time.Since(start), |
||||
) |
||||
span.AddEvent("streamProcessor.ProcessParallel done", trace.WithAttributes( |
||||
attribute.Int("total_readers", len(readers)), |
||||
attribute.Int64("total_streams_processed", processedStreams.Load()), |
||||
attribute.String("duration", time.Since(start).String()), |
||||
)) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (sp *streamProcessor) processSingleReader(ctx context.Context, reader *streams.RowReader, onNewStream func(uint64, streams.Stream)) (int64, error) { |
||||
var ( |
||||
streamsPtr = streamsPool.Get().(*[]streams.Stream) |
||||
streams = *streamsPtr |
||||
buf = make([]byte, 0, 1024) |
||||
h uint64 |
||||
processed int64 |
||||
) |
||||
|
||||
defer streamsPool.Put(streamsPtr) |
||||
|
||||
for { |
||||
n, err := reader.Read(ctx, streams) |
||||
if err != nil && err != io.EOF { |
||||
return processed, fmt.Errorf("failed to read streams: %w", err) |
||||
} |
||||
if n == 0 && err == io.EOF { |
||||
break |
||||
} |
||||
for _, stream := range streams[:n] { |
||||
h, buf = stream.Labels.HashWithoutLabels(buf, []string(nil)...) |
||||
// Try to claim this hash first
|
||||
if _, seen := sp.seenSeries.LoadOrStore(h, nil); seen { |
||||
continue |
||||
} |
||||
onNewStream(h, stream) |
||||
processed++ |
||||
} |
||||
} |
||||
return processed, nil |
||||
} |
||||
|
||||
func labelsToSeriesIdentifier(lbls labels.Labels) logproto.SeriesIdentifier { |
||||
series := make([]logproto.SeriesIdentifier_LabelsEntry, 0, lbls.Len()) |
||||
lbls.Range(func(label labels.Label) { |
||||
series = append(series, logproto.SeriesIdentifier_LabelsEntry{ |
||||
Key: label.Name, |
||||
Value: label.Value, |
||||
}) |
||||
}) |
||||
return logproto.SeriesIdentifier{Labels: series} |
||||
} |
||||
|
||||
// shardStreamReaders fetches metadata of objects in parallel and shards them into a list of StreamsReaders
|
||||
func shardStreamReaders(ctx context.Context, objects []object, shard logql.Shard) ([]*streams.RowReader, error) { |
||||
ctx, span := tracer.Start(ctx, "shardStreamReaders") |
||||
defer span.End() |
||||
|
||||
span.SetAttributes(attribute.Int("objects", len(objects))) |
||||
|
||||
var ( |
||||
// sectionIndex tracks the global section number across all objects to ensure consistent sharding
|
||||
sectionIndex uint64 |
||||
readers []*streams.RowReader |
||||
) |
||||
|
||||
for _, object := range objects { |
||||
// For sharded queries (e.g., "1 of 2"), we only read sections that belong to our shard
|
||||
// The section is assigned to a shard based on its global index across all objects
|
||||
if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 { |
||||
if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) { |
||||
sectionIndex++ |
||||
continue |
||||
} |
||||
} |
||||
|
||||
var found *dataobj.Section |
||||
for _, section := range object.Sections() { |
||||
if !streams.CheckSection(section) { |
||||
continue |
||||
} else if found != nil { |
||||
return nil, fmt.Errorf("object has unsupported multiple streams sections") |
||||
} |
||||
found = section |
||||
} |
||||
if found == nil { |
||||
return nil, fmt.Errorf("object has no streams sections") |
||||
} |
||||
|
||||
sec, err := streams.Open(ctx, found) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("opening streams section: %w", err) |
||||
} |
||||
|
||||
reader := streamReaderPool.Get().(*streams.RowReader) |
||||
reader.Reset(sec) |
||||
readers = append(readers, reader) |
||||
sectionIndex++ |
||||
} |
||||
|
||||
span.AddEvent("shardStreamReaders done", trace.WithAttributes( |
||||
attribute.Int("readers", len(readers)), |
||||
)) |
||||
return readers, nil |
||||
} |
||||
@ -1,330 +0,0 @@ |
||||
package querier |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/grafana/dskit/user" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/prometheus/prometheus/model/labels" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/metastore" |
||||
"github.com/grafana/loki/v3/pkg/logproto" |
||||
"github.com/grafana/loki/v3/pkg/logql" |
||||
) |
||||
|
||||
func TestStore_SelectSeries(t *testing.T) { |
||||
const testTenant = "test-tenant" |
||||
builder := newTestDataBuilder(t) |
||||
defer builder.close() |
||||
|
||||
ctx, _ := context.WithTimeout(t.Context(), time.Second) //nolint:govet
|
||||
ctx = user.InjectOrgID(ctx, testTenant) |
||||
|
||||
// Setup test data
|
||||
now := setupTestData(ctx, t, builder, testTenant) |
||||
meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil) |
||||
store := NewStore(builder.bucket, log.NewNopLogger(), meta) |
||||
|
||||
tests := []struct { |
||||
name string |
||||
selector string |
||||
want []string |
||||
}{ |
||||
{ |
||||
name: "select all series", |
||||
selector: ``, |
||||
want: []string{ |
||||
`{app="foo", env="prod"}`, |
||||
`{app="foo", env="dev"}`, |
||||
`{app="bar", env="prod"}`, |
||||
`{app="bar", env="dev"}`, |
||||
`{app="baz", env="prod", team="a"}`, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "select with equality matcher", |
||||
selector: `{app="foo"}`, |
||||
want: []string{ |
||||
`{app="foo", env="prod"}`, |
||||
`{app="foo", env="dev"}`, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "select with regex matcher", |
||||
selector: `{app=~"foo|bar"}`, |
||||
want: []string{ |
||||
`{app="foo", env="prod"}`, |
||||
`{app="foo", env="dev"}`, |
||||
`{app="bar", env="prod"}`, |
||||
`{app="bar", env="dev"}`, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "select with negative equality matcher", |
||||
selector: `{app=~".+", app!="foo"}`, |
||||
want: []string{ |
||||
`{app="bar", env="prod"}`, |
||||
`{app="bar", env="dev"}`, |
||||
`{app="baz", env="prod", team="a"}`, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "select with negative regex matcher", |
||||
selector: `{app=~".+", app!~"foo|bar"}`, |
||||
want: []string{ |
||||
`{app="baz", env="prod", team="a"}`, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "select with multiple matchers", |
||||
selector: `{app="foo", env="prod"}`, |
||||
want: []string{ |
||||
`{app="foo", env="prod"}`, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "select with regex and equality matchers", |
||||
selector: `{app=~"foo|bar", env="prod"}`, |
||||
want: []string{ |
||||
`{app="foo", env="prod"}`, |
||||
`{app="bar", env="prod"}`, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
series, err := store.SelectSeries(ctx, logql.SelectLogParams{ |
||||
QueryRequest: &logproto.QueryRequest{ |
||||
Start: now.Add(-time.Hour), |
||||
End: now.Add(time.Hour), |
||||
Plan: planFromString(tt.selector), |
||||
Selector: tt.selector, |
||||
}, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
var got []string |
||||
for _, s := range series { |
||||
got = append(got, labelsFromSeriesID(s)) |
||||
} |
||||
require.ElementsMatch(t, tt.want, got) |
||||
}) |
||||
} |
||||
|
||||
t.Run("sharding", func(t *testing.T) { |
||||
// Query first shard
|
||||
series1, err := store.SelectSeries(ctx, logql.SelectLogParams{ |
||||
QueryRequest: &logproto.QueryRequest{ |
||||
Start: now.Add(-time.Hour), |
||||
End: now.Add(time.Hour), |
||||
Plan: planFromString(`{app=~"foo|bar|baz"}`), |
||||
Selector: `{app=~"foo|bar|baz"}`, |
||||
Shards: []string{"0_of_2"}, |
||||
}, |
||||
}) |
||||
require.NoError(t, err) |
||||
require.NotEmpty(t, series1) |
||||
require.Less(t, len(series1), 5) // Should get less than all series
|
||||
|
||||
// Query second shard
|
||||
series2, err := store.SelectSeries(ctx, logql.SelectLogParams{ |
||||
QueryRequest: &logproto.QueryRequest{ |
||||
Start: now.Add(-time.Hour), |
||||
End: now.Add(time.Hour), |
||||
Plan: planFromString(`{app=~"foo|bar|baz"}`), |
||||
Selector: `{app=~"foo|bar|baz"}`, |
||||
Shards: []string{"1_of_2"}, |
||||
}, |
||||
}) |
||||
require.NoError(t, err) |
||||
require.NotEmpty(t, series2) |
||||
|
||||
// Combined shards should equal all series
|
||||
var allSeries []string |
||||
for _, s := range append(series1, series2...) { |
||||
allSeries = append(allSeries, labelsFromSeriesID(s)) |
||||
} |
||||
|
||||
want := []string{ |
||||
`{app="foo", env="prod"}`, |
||||
`{app="foo", env="dev"}`, |
||||
`{app="bar", env="prod"}`, |
||||
`{app="bar", env="dev"}`, |
||||
`{app="baz", env="prod", team="a"}`, |
||||
} |
||||
require.ElementsMatch(t, want, allSeries) |
||||
}) |
||||
} |
||||
|
||||
func TestStore_LabelNamesForMetricName(t *testing.T) { |
||||
const testTenant = "test-tenant" |
||||
builder := newTestDataBuilder(t) |
||||
defer builder.close() |
||||
|
||||
ctx, _ := context.WithTimeout(t.Context(), time.Second) //nolint:govet
|
||||
ctx = user.InjectOrgID(ctx, testTenant) |
||||
|
||||
// Setup test data
|
||||
now := setupTestData(ctx, t, builder, testTenant) |
||||
meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil) |
||||
store := NewStore(builder.bucket, log.NewNopLogger(), meta) |
||||
|
||||
tests := []struct { |
||||
name string |
||||
matchers []*labels.Matcher |
||||
want []string |
||||
}{ |
||||
{ |
||||
name: "no matchers", |
||||
matchers: nil, |
||||
want: []string{"app", "env", "team"}, |
||||
}, |
||||
{ |
||||
name: "with equality matcher", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), |
||||
}, |
||||
want: []string{"app", "env"}, |
||||
}, |
||||
{ |
||||
name: "with regex matcher", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"), |
||||
}, |
||||
want: []string{"app", "env"}, |
||||
}, |
||||
{ |
||||
name: "with negative matcher", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"), |
||||
}, |
||||
want: []string{"app", "env", "team"}, |
||||
}, |
||||
{ |
||||
name: "with negative regex matcher", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"), |
||||
}, |
||||
want: []string{"app", "env", "team"}, |
||||
}, |
||||
{ |
||||
name: "with multiple matchers", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), |
||||
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"), |
||||
}, |
||||
want: []string{"app", "env"}, |
||||
}, |
||||
} |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
names, err := store.LabelNamesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.matchers...) |
||||
require.NoError(t, err) |
||||
require.ElementsMatch(t, tt.want, names) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestStore_LabelValuesForMetricName(t *testing.T) { |
||||
const testTenant = "test-tenant" |
||||
builder := newTestDataBuilder(t) |
||||
defer builder.close() |
||||
|
||||
ctx, _ := context.WithTimeout(t.Context(), time.Second) //nolint:govet
|
||||
ctx = user.InjectOrgID(ctx, testTenant) |
||||
|
||||
// Setup test data
|
||||
now := setupTestData(ctx, t, builder, testTenant) |
||||
meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil) |
||||
store := NewStore(builder.bucket, log.NewNopLogger(), meta) |
||||
|
||||
tests := []struct { |
||||
name string |
||||
labelName string |
||||
matchers []*labels.Matcher |
||||
want []string |
||||
}{ |
||||
{ |
||||
name: "app label without matchers", |
||||
labelName: "app", |
||||
matchers: nil, |
||||
want: []string{"bar", "baz", "foo"}, |
||||
}, |
||||
{ |
||||
name: "env label without matchers", |
||||
labelName: "env", |
||||
matchers: nil, |
||||
want: []string{"dev", "prod"}, |
||||
}, |
||||
{ |
||||
name: "team label without matchers", |
||||
labelName: "team", |
||||
matchers: nil, |
||||
want: []string{"a"}, |
||||
}, |
||||
{ |
||||
name: "env label with app equality matcher", |
||||
labelName: "env", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), |
||||
}, |
||||
want: []string{"dev", "prod"}, |
||||
}, |
||||
{ |
||||
name: "env label with app regex matcher", |
||||
labelName: "env", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"), |
||||
}, |
||||
want: []string{"dev", "prod"}, |
||||
}, |
||||
{ |
||||
name: "env label with app negative matcher", |
||||
labelName: "env", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"), |
||||
}, |
||||
want: []string{"dev", "prod"}, |
||||
}, |
||||
{ |
||||
name: "env label with app negative regex matcher", |
||||
labelName: "env", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"), |
||||
}, |
||||
want: []string{"prod"}, |
||||
}, |
||||
{ |
||||
name: "env label with multiple matchers", |
||||
labelName: "env", |
||||
matchers: []*labels.Matcher{ |
||||
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), |
||||
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"), |
||||
}, |
||||
want: []string{"prod"}, |
||||
}, |
||||
} |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
values, err := store.LabelValuesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.labelName, tt.matchers...) |
||||
require.NoError(t, err) |
||||
require.Equal(t, tt.want, values) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func labelsFromSeriesID(id logproto.SeriesIdentifier) string { |
||||
builder := labels.NewScratchBuilder(len(id.Labels)) |
||||
for _, l := range id.Labels { |
||||
builder.Add(l.Key, l.Value) |
||||
} |
||||
builder.Sort() |
||||
return builder.Labels().String() |
||||
} |
||||
@ -1,813 +0,0 @@ |
||||
package querier |
||||
|
||||
import ( |
||||
"context" |
||||
"flag" |
||||
"fmt" |
||||
"io" |
||||
"slices" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/grafana/dskit/user" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/prometheus/prometheus/model/labels" |
||||
"github.com/thanos-io/objstore" |
||||
"go.opentelemetry.io/otel" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"golang.org/x/sync/errgroup" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/metastore" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams" |
||||
"github.com/grafana/loki/v3/pkg/iter" |
||||
"github.com/grafana/loki/v3/pkg/logproto" |
||||
"github.com/grafana/loki/v3/pkg/logql" |
||||
"github.com/grafana/loki/v3/pkg/logql/syntax" |
||||
"github.com/grafana/loki/v3/pkg/querier" |
||||
"github.com/grafana/loki/v3/pkg/storage/chunk" |
||||
"github.com/grafana/loki/v3/pkg/storage/config" |
||||
storageconfig "github.com/grafana/loki/v3/pkg/storage/config" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats" |
||||
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" |
||||
"github.com/grafana/loki/v3/pkg/tracing" |
||||
util_log "github.com/grafana/loki/v3/pkg/util/log" |
||||
) |
||||
|
||||
var tracer = otel.Tracer("pkg/dataobj/querier") |
||||
|
||||
var ( |
||||
_ querier.Store = &Store{} |
||||
|
||||
noShard = logql.Shard{ |
||||
PowerOfTwo: &index.ShardAnnotation{ |
||||
Shard: uint32(0), |
||||
Of: uint32(1), |
||||
}, |
||||
} |
||||
|
||||
shardedObjectsPool = sync.Pool{ |
||||
New: func() any { |
||||
return &shardedObject{ |
||||
streams: make(map[int64]streams.Stream), |
||||
streamsIDs: make([]int64, 0, 1024), |
||||
logReaders: make([]*logs.RowReader, 0, 16), |
||||
} |
||||
}, |
||||
} |
||||
logReaderPool = sync.Pool{ |
||||
New: func() any { |
||||
return &logs.RowReader{} |
||||
}, |
||||
} |
||||
streamReaderPool = sync.Pool{ |
||||
New: func() any { |
||||
return &streams.RowReader{} |
||||
}, |
||||
} |
||||
) |
||||
|
||||
type Config struct { |
||||
Enabled bool `yaml:"enabled" doc:"description=Enable the dataobj querier."` |
||||
From storageconfig.DayTime `yaml:"from" doc:"description=The date of the first day of when the dataobj querier should start querying from. In YYYY-MM-DD format, for example: 2018-04-15."` |
||||
ShardFactor int `yaml:"shard_factor" doc:"description=The number of shards to use for the dataobj querier."` |
||||
} |
||||
|
||||
func (c *Config) RegisterFlags(f *flag.FlagSet) { |
||||
f.BoolVar(&c.Enabled, "dataobj-querier-enabled", false, "Enable the dataobj querier.") |
||||
f.Var(&c.From, "dataobj-querier-from", "The start time to query from.") |
||||
f.IntVar(&c.ShardFactor, "dataobj-querier-shard-factor", 32, "The number of shards to use for the dataobj querier.") |
||||
} |
||||
|
||||
func (c *Config) Validate() error { |
||||
if c.Enabled && c.From.ModelTime().Time().IsZero() { |
||||
return fmt.Errorf("from is required when dataobj querier is enabled") |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (c *Config) PeriodConfig() config.PeriodConfig { |
||||
return config.PeriodConfig{ |
||||
From: c.From, |
||||
RowShards: uint32(c.ShardFactor), |
||||
Schema: "v13", |
||||
} |
||||
} |
||||
|
||||
// Store implements querier.Store for querying data objects.
|
||||
type Store struct { |
||||
bucket objstore.Bucket |
||||
logger log.Logger |
||||
metastore metastore.Metastore |
||||
} |
||||
|
||||
// NewStore creates a new Store.
|
||||
func NewStore(bucket objstore.Bucket, logger log.Logger, metastore metastore.Metastore) *Store { |
||||
return &Store{ |
||||
bucket: bucket, |
||||
logger: logger, |
||||
metastore: metastore, |
||||
} |
||||
} |
||||
|
||||
func (s *Store) String() string { |
||||
return "dataobj" |
||||
} |
||||
|
||||
// SelectLogs implements querier.Store
|
||||
func (s *Store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { |
||||
logger := util_log.WithContext(ctx, s.logger) |
||||
|
||||
objects, err := s.objectsForTimeRange(ctx, req.Start, req.End, logger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if len(objects) == 0 { |
||||
return iter.NoopEntryIterator, nil |
||||
} |
||||
|
||||
shard, err := parseShards(req.Shards) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return selectLogs(ctx, objects, shard, req, logger) |
||||
} |
||||
|
||||
// SelectSamples implements querier.Store
|
||||
func (s *Store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { |
||||
logger := util_log.WithContext(ctx, s.logger) |
||||
|
||||
objects, err := s.objectsForTimeRange(ctx, req.Start, req.End, logger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if len(objects) == 0 { |
||||
return iter.NoopSampleIterator, nil |
||||
} |
||||
|
||||
shard, err := parseShards(req.Shards) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
expr, err := req.Expr() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return selectSamples(ctx, objects, shard, expr, req.Start, req.End, logger) |
||||
} |
||||
|
||||
// Stats implements querier.Store
|
||||
func (s *Store) Stats(_ context.Context, _ string, _ model.Time, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) { |
||||
// TODO: Implement
|
||||
return &stats.Stats{}, nil |
||||
} |
||||
|
||||
// Volume implements querier.Store
|
||||
func (s *Store) Volume(_ context.Context, _ string, _ model.Time, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) { |
||||
// TODO: Implement
|
||||
return &logproto.VolumeResponse{}, nil |
||||
} |
||||
|
||||
// GetShards implements querier.Store
|
||||
func (s *Store) GetShards(_ context.Context, _ string, _ model.Time, _ model.Time, _ uint64, _ chunk.Predicate) (*logproto.ShardsResponse, error) { |
||||
// TODO: Implement
|
||||
return &logproto.ShardsResponse{}, nil |
||||
} |
||||
|
||||
type object struct { |
||||
*dataobj.Object |
||||
path string |
||||
} |
||||
|
||||
// objectsForTimeRange returns data objects for the given time range.
|
||||
func (s *Store) objectsForTimeRange(ctx context.Context, from, through time.Time, logger log.Logger) ([]object, error) { |
||||
ctx, span := tracer.Start(ctx, "objectsForTimeRange") |
||||
defer span.End() |
||||
|
||||
span.SetAttributes( |
||||
attribute.String("from", from.String()), |
||||
attribute.String("through", through.String()), |
||||
) |
||||
|
||||
files, err := s.metastore.DataObjects(ctx, from, through) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
level.Debug(logger).Log( |
||||
"msg", "found data objects for time range", |
||||
"count", len(files), |
||||
"from", from, |
||||
"through", through, |
||||
) |
||||
span.AddEvent("found data objects for time range", trace.WithAttributes( |
||||
attribute.Int("count", len(files)), |
||||
attribute.StringSlice("files", files)), |
||||
) |
||||
|
||||
objects := make([]object, 0, len(files)) |
||||
for _, path := range files { |
||||
obj, err := dataobj.FromBucket(ctx, s.bucket, path) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("getting object from bucket: %w", err) |
||||
} |
||||
objects = append(objects, object{Object: obj, path: path}) |
||||
} |
||||
return objects, nil |
||||
} |
||||
|
||||
func selectLogs(ctx context.Context, objects []object, shard logql.Shard, req logql.SelectLogParams, logger log.Logger) (iter.EntryIterator, error) { |
||||
selector, err := req.LogSelector() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
shardedObjects, err := shardObjects(ctx, objects, shard, logger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer func() { |
||||
for _, obj := range shardedObjects { |
||||
obj.reset() |
||||
shardedObjectsPool.Put(obj) |
||||
} |
||||
}() |
||||
streamsPredicate := streamPredicate(selector.Matchers(), req.Start, req.End) |
||||
var logsPredicates []logs.RowPredicate |
||||
logsPredicates = append(logsPredicates, logs.TimeRangeRowPredicate{ |
||||
StartTime: req.Start, |
||||
EndTime: req.End, |
||||
IncludeStart: true, |
||||
IncludeEnd: false, |
||||
}) |
||||
|
||||
p, expr := buildLogsPredicateFromPipeline(selector) |
||||
if p != nil { |
||||
logsPredicates = append(logsPredicates, p) |
||||
} |
||||
req.Plan.AST = expr |
||||
|
||||
g, ctx := errgroup.WithContext(ctx) |
||||
iterators := make([]iter.EntryIterator, len(shardedObjects)) |
||||
|
||||
for i, obj := range shardedObjects { |
||||
g.Go(func() error { |
||||
ctx, span := tracer.Start(ctx, "object selectLogs", trace.WithAttributes( |
||||
attribute.String("object", obj.object.path), |
||||
attribute.Int("sections", len(obj.logReaders)), |
||||
)) |
||||
defer span.End() |
||||
|
||||
iterator, err := obj.selectLogs(ctx, streamsPredicate, logsPredicates, req) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
iterators[i] = iterator |
||||
return nil |
||||
}) |
||||
} |
||||
if err := g.Wait(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return iter.NewSortEntryIterator(iterators, req.Direction), nil |
||||
} |
||||
|
||||
func selectSamples(ctx context.Context, objects []object, shard logql.Shard, expr syntax.SampleExpr, start, end time.Time, logger log.Logger) (iter.SampleIterator, error) { |
||||
shardedObjects, err := shardObjects(ctx, objects, shard, logger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer func() { |
||||
for _, obj := range shardedObjects { |
||||
obj.reset() |
||||
shardedObjectsPool.Put(obj) |
||||
} |
||||
}() |
||||
selector, err := expr.Selector() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
streamsPredicate := streamPredicate(selector.Matchers(), start, end) |
||||
// TODO: support more predicates and combine with log.Pipeline.
|
||||
var logsPredicates []logs.RowPredicate |
||||
logsPredicates = append(logsPredicates, logs.TimeRangeRowPredicate{ |
||||
StartTime: start, |
||||
EndTime: end, |
||||
IncludeStart: true, |
||||
IncludeEnd: false, |
||||
}) |
||||
|
||||
var predicateFromExpr logs.RowPredicate |
||||
predicateFromExpr, expr = buildLogsPredicateFromSampleExpr(expr) |
||||
if predicateFromExpr != nil { |
||||
logsPredicates = append(logsPredicates, predicateFromExpr) |
||||
} |
||||
|
||||
g, ctx := errgroup.WithContext(ctx) |
||||
iterators := make([]iter.SampleIterator, len(shardedObjects)) |
||||
|
||||
for i, obj := range shardedObjects { |
||||
g.Go(func() error { |
||||
ctx, span := tracer.Start(ctx, "object selectSamples", trace.WithAttributes( |
||||
attribute.String("object", obj.object.path), |
||||
attribute.Int("sections", len(obj.logReaders)), |
||||
)) |
||||
|
||||
defer span.End() |
||||
|
||||
iterator, err := obj.selectSamples(ctx, streamsPredicate, logsPredicates, expr) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
iterators[i] = iterator |
||||
return nil |
||||
}) |
||||
} |
||||
if err := g.Wait(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return iter.NewSortSampleIterator(iterators), nil |
||||
} |
||||
|
||||
type shardedObject struct { |
||||
object object |
||||
streamReader *streams.RowReader |
||||
logReaders []*logs.RowReader |
||||
|
||||
streamsIDs []int64 |
||||
streams map[int64]streams.Stream |
||||
} |
||||
|
||||
// shardSections returns a list of section indices to read per metadata based on the sharding configuration.
|
||||
// The returned slice has the same length as the input metadatas, and each element contains the list of section indices
|
||||
// that should be read for that metadata.
|
||||
func shardSections(metadatas []sectionsStats, shard logql.Shard) [][]int { |
||||
// Count total sections before sharding
|
||||
var totalSections int |
||||
for _, metadata := range metadatas { |
||||
totalSections += metadata.LogsSections |
||||
if metadata.StreamsSections > 1 { |
||||
// We don't support multiple streams sections, but we still need to return a slice
|
||||
// with the same length as the input metadatas.
|
||||
return make([][]int, len(metadatas)) |
||||
} |
||||
} |
||||
|
||||
// sectionIndex tracks the global section number across all objects to ensure consistent sharding
|
||||
var sectionIndex uint64 |
||||
result := make([][]int, len(metadatas)) |
||||
|
||||
for i, metadata := range metadatas { |
||||
sections := make([]int, 0, metadata.LogsSections) |
||||
for j := 0; j < metadata.LogsSections; j++ { |
||||
if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 { |
||||
if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) { |
||||
sectionIndex++ |
||||
continue |
||||
} |
||||
} |
||||
sections = append(sections, j) |
||||
sectionIndex++ |
||||
} |
||||
result[i] = sections |
||||
} |
||||
|
||||
return result |
||||
} |
||||
|
||||
func shardObjects( |
||||
ctx context.Context, |
||||
objects []object, |
||||
shard logql.Shard, |
||||
logger log.Logger, |
||||
) ([]*shardedObject, error) { |
||||
ctx, span := tracer.Start(ctx, "shardObjects") |
||||
defer span.End() |
||||
|
||||
metadatas, err := fetchSectionsStats(ctx, objects) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// Get the sections to read per metadata
|
||||
sectionsPerMetadata := shardSections(metadatas, shard) |
||||
|
||||
// Count total sections that will be read
|
||||
var totalSections int |
||||
var objectSections []int |
||||
for i, sections := range sectionsPerMetadata { |
||||
totalSections += len(sections) |
||||
objectSections = append(objectSections, metadatas[i].LogsSections) |
||||
} |
||||
|
||||
shardedReaders := make([]*shardedObject, 0, len(objects)) |
||||
|
||||
for i, sections := range sectionsPerMetadata { |
||||
if len(sections) == 0 { |
||||
continue |
||||
} |
||||
|
||||
reader := shardedObjectsPool.Get().(*shardedObject) |
||||
reader.streamReader = streamReaderPool.Get().(*streams.RowReader) |
||||
reader.object = objects[i] |
||||
|
||||
sec, err := findStreamsSection(ctx, objects[i].Object) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("finding streams section: %w", err) |
||||
} |
||||
reader.streamReader.Reset(sec) |
||||
|
||||
for _, section := range sections { |
||||
sec, err := findLogsSection(ctx, objects[i].Object, section) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("finding logs section: %w", err) |
||||
} |
||||
|
||||
logReader := logReaderPool.Get().(*logs.RowReader) |
||||
logReader.Reset(sec) |
||||
reader.logReaders = append(reader.logReaders, logReader) |
||||
} |
||||
shardedReaders = append(shardedReaders, reader) |
||||
} |
||||
var sectionsString strings.Builder |
||||
for _, sections := range sectionsPerMetadata { |
||||
sectionsString.WriteString(fmt.Sprintf("%v ", sections)) |
||||
} |
||||
|
||||
logParams := []interface{}{ |
||||
"msg", "sharding sections", |
||||
"sharded_factor", shard.String(), |
||||
"total_objects", len(objects), |
||||
"total_sections", totalSections, |
||||
"object_sections", fmt.Sprintf("%v", objectSections), |
||||
"sharded_total_objects", len(shardedReaders), |
||||
"sharded_sections", sectionsString.String(), |
||||
} |
||||
|
||||
level.Debug(logger).Log(logParams...) |
||||
sp := trace.SpanFromContext(ctx) |
||||
sp.SetAttributes(tracing.KeyValuesToOTelAttributes(logParams)...) |
||||
|
||||
return shardedReaders, nil |
||||
} |
||||
|
||||
func findLogsSection(ctx context.Context, obj *dataobj.Object, index int) (*logs.Section, error) { |
||||
var count int |
||||
|
||||
for _, section := range obj.Sections().Filter(logs.CheckSection) { |
||||
if count == index { |
||||
return logs.Open(ctx, section) |
||||
} |
||||
count++ |
||||
} |
||||
|
||||
return nil, fmt.Errorf("object does not have logs section %d (max %d)", index, count) |
||||
} |
||||
|
||||
func findStreamsSection(ctx context.Context, obj *dataobj.Object) (*streams.Section, error) { |
||||
targetTenant, err := user.ExtractOrgID(ctx) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("extracting org ID: %w", err) |
||||
} |
||||
|
||||
for _, section := range obj.Sections().Filter(streams.CheckSection) { |
||||
if section.Tenant != targetTenant { |
||||
continue |
||||
} |
||||
return streams.Open(ctx, section) |
||||
} |
||||
|
||||
return nil, fmt.Errorf("object has no streams sections") |
||||
} |
||||
|
||||
func (s *shardedObject) reset() { |
||||
_ = s.streamReader.Close() |
||||
streamReaderPool.Put(s.streamReader) |
||||
for i, reader := range s.logReaders { |
||||
_ = reader.Close() |
||||
logReaderPool.Put(reader) |
||||
s.logReaders[i] = nil |
||||
} |
||||
s.streamReader = nil |
||||
s.logReaders = s.logReaders[:0] |
||||
s.streamsIDs = s.streamsIDs[:0] |
||||
s.object = object{} |
||||
clear(s.streams) |
||||
} |
||||
|
||||
func (s *shardedObject) selectLogs(ctx context.Context, streamsPredicate streams.RowPredicate, logsPredicates []logs.RowPredicate, req logql.SelectLogParams) (iter.EntryIterator, error) { |
||||
if err := s.setPredicate(streamsPredicate, logsPredicates); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if err := s.matchStreams(ctx); err != nil { |
||||
return nil, err |
||||
} |
||||
iterators := make([]iter.EntryIterator, len(s.logReaders)) |
||||
g, ctx := errgroup.WithContext(ctx) |
||||
|
||||
for i, reader := range s.logReaders { |
||||
g.Go(func() error { |
||||
sp := trace.SpanFromContext(ctx) |
||||
sp.AddEvent("starting selectLogs in section", trace.WithAttributes( |
||||
attribute.Int("index", i), |
||||
)) |
||||
defer func() { |
||||
sp.AddEvent("selectLogs section done", trace.WithAttributes( |
||||
attribute.Int("index", i), |
||||
)) |
||||
}() |
||||
|
||||
iter, err := newEntryIterator(ctx, s.streams, reader, req) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
iterators[i] = iter |
||||
return nil |
||||
}) |
||||
} |
||||
if err := g.Wait(); err != nil { |
||||
return nil, err |
||||
} |
||||
return iter.NewSortEntryIterator(iterators, req.Direction), nil |
||||
} |
||||
|
||||
func (s *shardedObject) selectSamples(ctx context.Context, streamsPredicate streams.RowPredicate, logsPredicates []logs.RowPredicate, expr syntax.SampleExpr) (iter.SampleIterator, error) { |
||||
if err := s.setPredicate(streamsPredicate, logsPredicates); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if err := s.matchStreams(ctx); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
iterators := make([]iter.SampleIterator, len(s.logReaders)) |
||||
g, ctx := errgroup.WithContext(ctx) |
||||
|
||||
for i, reader := range s.logReaders { |
||||
g.Go(func() error { |
||||
sp := trace.SpanFromContext(ctx) |
||||
sp.AddEvent("starting selectSamples in section", trace.WithAttributes( |
||||
attribute.Int("index", i), |
||||
)) |
||||
defer func() { |
||||
sp.AddEvent("selectSamples section done", trace.WithAttributes( |
||||
attribute.Int("index", i), |
||||
)) |
||||
}() |
||||
|
||||
// extractors is not thread safe, so we need to create a new one for each object
|
||||
extractors, err := expr.Extractors() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
iter, err := newSampleIterator(ctx, s.streams, extractors, reader) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
iterators[i] = iter |
||||
return nil |
||||
}) |
||||
} |
||||
|
||||
if err := g.Wait(); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return iter.NewSortSampleIterator(iterators), nil |
||||
} |
||||
|
||||
func (s *shardedObject) setPredicate(streamsPredicate streams.RowPredicate, logsPredicates []logs.RowPredicate) error { |
||||
if err := s.streamReader.SetPredicate(streamsPredicate); err != nil { |
||||
return err |
||||
} |
||||
for _, reader := range s.logReaders { |
||||
if err := reader.SetPredicates(logsPredicates); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (s *shardedObject) matchStreams(ctx context.Context) error { |
||||
sp := trace.SpanFromContext(ctx) |
||||
sp.AddEvent("starting matchStreams") |
||||
defer sp.AddEvent("matchStreams done") |
||||
|
||||
streamsPtr := streamsPool.Get().(*[]streams.Stream) |
||||
defer streamsPool.Put(streamsPtr) |
||||
streams := *streamsPtr |
||||
|
||||
for { |
||||
n, err := s.streamReader.Read(ctx, streams) |
||||
if err != nil && err != io.EOF { |
||||
return err |
||||
} |
||||
if n == 0 && err == io.EOF { |
||||
break |
||||
} |
||||
|
||||
for _, stream := range streams[:n] { |
||||
s.streams[stream.ID] = stream |
||||
s.streamsIDs = append(s.streamsIDs, stream.ID) |
||||
} |
||||
} |
||||
// setup log readers to filter streams
|
||||
for _, reader := range s.logReaders { |
||||
if err := reader.MatchStreams(slices.Values(s.streamsIDs)); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// fetchSectionsStats retrieves section count of objects.
|
||||
func fetchSectionsStats(ctx context.Context, objects []object) ([]sectionsStats, error) { |
||||
sp := trace.SpanFromContext(ctx) |
||||
sp.AddEvent("fetching metadata", trace.WithAttributes( |
||||
attribute.Int("objects", len(objects)), |
||||
)) |
||||
defer sp.AddEvent("fetched metadata") |
||||
|
||||
targetTenant, err := user.ExtractOrgID(ctx) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("extracting org ID: %w", err) |
||||
} |
||||
|
||||
res := make([]sectionsStats, 0, len(objects)) |
||||
|
||||
for _, obj := range objects { |
||||
var stats sectionsStats |
||||
|
||||
for _, section := range obj.Sections() { |
||||
if section.Tenant != targetTenant { |
||||
continue |
||||
} |
||||
switch { |
||||
case streams.CheckSection(section): |
||||
stats.StreamsSections++ |
||||
case logs.CheckSection(section): |
||||
stats.LogsSections++ |
||||
} |
||||
} |
||||
|
||||
res = append(res, stats) |
||||
} |
||||
|
||||
return res, nil |
||||
} |
||||
|
||||
// sectionsStats holds the number of streams and logs sections counted for
// one object by fetchSectionsStats.
type sectionsStats struct {
	StreamsSections int
	LogsSections    int
}
||||
|
||||
// streamPredicate creates a dataobj.StreamsPredicate from a list of matchers and a time range
|
||||
func streamPredicate(matchers []*labels.Matcher, start, end time.Time) streams.RowPredicate { |
||||
var predicate streams.RowPredicate = streams.TimeRangeRowPredicate{ |
||||
StartTime: start, |
||||
EndTime: end, |
||||
IncludeStart: true, |
||||
IncludeEnd: true, |
||||
} |
||||
|
||||
// If there are any matchers, combine them with an AND predicate
|
||||
if len(matchers) > 0 { |
||||
predicate = streams.AndRowPredicate{ |
||||
Left: predicate, |
||||
Right: matchersToPredicate(matchers), |
||||
} |
||||
} |
||||
return predicate |
||||
} |
||||
|
||||
// matchersToPredicate converts a list of matchers to a dataobj.StreamsPredicate
|
||||
func matchersToPredicate(matchers []*labels.Matcher) streams.RowPredicate { |
||||
var left streams.RowPredicate |
||||
for _, matcher := range matchers { |
||||
var right streams.RowPredicate |
||||
switch matcher.Type { |
||||
case labels.MatchEqual: |
||||
right = streams.LabelMatcherRowPredicate{Name: matcher.Name, Value: matcher.Value} |
||||
default: |
||||
right = streams.LabelFilterRowPredicate{Name: matcher.Name, Keep: func(_, value string) bool { |
||||
return matcher.Matches(value) |
||||
}} |
||||
} |
||||
if left == nil { |
||||
left = right |
||||
} else { |
||||
left = streams.AndRowPredicate{ |
||||
Left: left, |
||||
Right: right, |
||||
} |
||||
} |
||||
} |
||||
return left |
||||
} |
||||
|
||||
func parseShards(shards []string) (logql.Shard, error) { |
||||
if len(shards) == 0 { |
||||
return noShard, nil |
||||
} |
||||
parsed, _, err := logql.ParseShards(shards) |
||||
if err != nil { |
||||
return noShard, err |
||||
} |
||||
if len(parsed) == 0 { |
||||
return noShard, nil |
||||
} |
||||
if parsed[0].Variant() != logql.PowerOfTwoVersion { |
||||
return noShard, fmt.Errorf("unsupported shard variant: %s", parsed[0].Variant()) |
||||
} |
||||
return parsed[0], nil |
||||
} |
||||
|
||||
func buildLogsPredicateFromSampleExpr(expr syntax.SampleExpr) (logs.RowPredicate, syntax.SampleExpr) { |
||||
var ( |
||||
predicate logs.RowPredicate |
||||
skip bool |
||||
) |
||||
expr.Walk(func(e syntax.Expr) bool { |
||||
switch e := e.(type) { |
||||
case *syntax.BinOpExpr: |
||||
// we might not encounter BinOpExpr at this point since the lhs and rhs are evaluated separately?
|
||||
skip = true |
||||
case *syntax.RangeAggregationExpr: |
||||
if !skip { |
||||
predicate, e.Left.Left = buildLogsPredicateFromPipeline(e.Left.Left) |
||||
} |
||||
} |
||||
return true |
||||
}) |
||||
|
||||
return predicate, expr |
||||
} |
||||
|
||||
func buildLogsPredicateFromPipeline(expr syntax.LogSelectorExpr) (logs.RowPredicate, syntax.LogSelectorExpr) { |
||||
// Check if expr is a PipelineExpr, other implementations have no stages
|
||||
pipelineExpr, ok := expr.(*syntax.PipelineExpr) |
||||
if !ok { |
||||
return nil, expr |
||||
} |
||||
|
||||
var ( |
||||
predicate logs.RowPredicate |
||||
remainingStages = make([]syntax.StageExpr, 0, len(pipelineExpr.MultiStages)) |
||||
appendPredicate = func(p logs.RowPredicate) { |
||||
if predicate == nil { |
||||
predicate = p |
||||
} else { |
||||
predicate = logs.AndRowPredicate{ |
||||
Left: predicate, |
||||
Right: p, |
||||
} |
||||
} |
||||
} |
||||
) |
||||
|
||||
Outer: |
||||
for i, stage := range pipelineExpr.MultiStages { |
||||
switch s := stage.(type) { |
||||
case *syntax.LineFmtExpr: |
||||
// modifies the log line, break early as we cannot apply any more predicates
|
||||
remainingStages = append(remainingStages, pipelineExpr.MultiStages[i:]...) |
||||
break Outer |
||||
|
||||
case *syntax.LineFilterExpr: |
||||
// Convert the line filter to a predicate
|
||||
f, err := s.Filter() |
||||
if err != nil { |
||||
remainingStages = append(remainingStages, s) |
||||
continue |
||||
} |
||||
|
||||
// Create a line filter predicate
|
||||
appendPredicate(logs.LogMessageFilterRowPredicate{ |
||||
Keep: func(line []byte) bool { |
||||
return f.Filter(line) |
||||
}, |
||||
}) |
||||
|
||||
default: |
||||
remainingStages = append(remainingStages, s) |
||||
} |
||||
} |
||||
|
||||
if len(remainingStages) == 0 { |
||||
return predicate, pipelineExpr.Left // return MatchersExpr
|
||||
} |
||||
pipelineExpr.MultiStages = remainingStages |
||||
|
||||
return predicate, pipelineExpr |
||||
} |
||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in new issue