From 383032bccd47df692b25d76095b3ee814e674cf4 Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Tue, 9 Sep 2025 12:15:02 +0200 Subject: [PATCH] chore: Remove dataobj querier component (#19132) --- docs/sources/shared/configuration.md | 14 - pkg/dataobj/config/config.go | 6 - pkg/dataobj/querier/iter.go | 323 -------- pkg/dataobj/querier/iter_test.go | 124 ---- pkg/dataobj/querier/metadata.go | 336 --------- pkg/dataobj/querier/metadata_test.go | 330 -------- pkg/dataobj/querier/store.go | 813 -------------------- pkg/dataobj/querier/store_test.go | 1032 -------------------------- pkg/logql/bench/bench_test.go | 12 +- pkg/logql/bench/store_dataobj.go | 7 - pkg/loki/modules.go | 63 +- 11 files changed, 4 insertions(+), 3056 deletions(-) delete mode 100644 pkg/dataobj/querier/iter.go delete mode 100644 pkg/dataobj/querier/iter_test.go delete mode 100644 pkg/dataobj/querier/metadata.go delete mode 100644 pkg/dataobj/querier/metadata_test.go delete mode 100644 pkg/dataobj/querier/store.go delete mode 100644 pkg/dataobj/querier/store_test.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index f9fcd43605..52d1edb881 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -1157,20 +1157,6 @@ dataobj: # CLI flag: -dataobj-metastore.partition-ratio [partition_ratio: | default = 10] - querier: - # Enable the dataobj querier. - # CLI flag: -dataobj-querier-enabled - [enabled: | default = false] - - # The date of the first day of when the dataobj querier should start - # querying from. In YYYY-MM-DD format, for example: 2018-04-15. - # CLI flag: -dataobj-querier-from - [from: | default = 1970-01-01] - - # The number of shards to use for the dataobj querier. - # CLI flag: -dataobj-querier-shard-factor - [shard_factor: | default = 32] - # The prefix to use for the storage bucket. # CLI flag: -dataobj-storage-bucket-prefix [storage_bucket_prefix: | default = "dataobj/"] diff --git a/pkg/dataobj/config/config.go b/pkg/dataobj/config/config.go index ee2d6949cb..a4d0c1a1a9 100644 --- a/pkg/dataobj/config/config.go +++ b/pkg/dataobj/config/config.go @@ -6,14 +6,12 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/consumer" "github.com/grafana/loki/v3/pkg/dataobj/index" "github.com/grafana/loki/v3/pkg/dataobj/metastore" - "github.com/grafana/loki/v3/pkg/dataobj/querier" ) type Config struct { Consumer consumer.Config `yaml:"consumer"` Index index.Config `yaml:"index"` Metastore metastore.Config `yaml:"metastore"` - Querier querier.Config `yaml:"querier"` // StorageBucketPrefix is the prefix to use for the storage bucket. StorageBucketPrefix string `yaml:"storage_bucket_prefix"` } @@ -22,7 +20,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Consumer.RegisterFlags(f) cfg.Index.RegisterFlags(f) cfg.Metastore.RegisterFlags(f) - cfg.Querier.RegisterFlags(f) f.StringVar(&cfg.StorageBucketPrefix, "dataobj-storage-bucket-prefix", "dataobj/", "The prefix to use for the storage bucket.") } @@ -30,9 +27,6 @@ func (cfg *Config) Validate() error { if err := cfg.Consumer.Validate(); err != nil { return err } - if err := cfg.Querier.Validate(); err != nil { - return err - } if err := cfg.Metastore.Validate(); err != nil { return err } diff --git a/pkg/dataobj/querier/iter.go b/pkg/dataobj/querier/iter.go deleted file mode 100644 index 855a882c0b..0000000000 --- a/pkg/dataobj/querier/iter.go +++ /dev/null @@ -1,323 +0,0 @@ -package querier - -import ( - "context" - "fmt" - "io" - "slices" - "sync" - - "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" - "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" - "github.com/grafana/loki/v3/pkg/iter" - "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql" - "github.com/grafana/loki/v3/pkg/logql/log" - "github.com/grafana/loki/v3/pkg/logql/syntax" - "github.com/grafana/loki/v3/pkg/logqlmodel/stats" - "github.com/grafana/loki/v3/pkg/util/topk" -) - -var ( - recordsPool = sync.Pool{ - New: func() interface{} { - records := make([]logs.Record, 1024) - return &records - }, - } - samplesPool = sync.Pool{ - New: func() interface{} { - samples := make([]logproto.Sample, 0, 1024) - return &samples - }, - } - entryWithLabelsPool = sync.Pool{ - New: func() interface{} { - entries := make([]entryWithLabels, 0, 1024) - return &entries - }, - } -) - -type entryWithLabels struct { - Labels string - StreamHash uint64 - Entry logproto.Entry -} - -// newEntryIterator creates a new EntryIterator for the given context, streams, and reader. -// It reads records from the reader and adds them to the topk heap based on the direction. -// The topk heap is used to maintain the top k entries based on the direction. -// The final result is returned as a slice of entries. -func newEntryIterator(ctx context.Context, - streams map[int64]streams.Stream, - reader *logs.RowReader, - req logql.SelectLogParams, -) (iter.EntryIterator, error) { - bufPtr := recordsPool.Get().(*[]logs.Record) - defer recordsPool.Put(bufPtr) - buf := *bufPtr - - selector, err := req.LogSelector() - if err != nil { - return nil, err - } - pipeline, err := selector.Pipeline() - if err != nil { - return nil, err - } - - var ( - prevStreamID int64 = -1 - streamExtractor log.StreamPipeline - streamHash uint64 - top = topk.Heap[entryWithLabels]{ - Limit: int(req.Limit), - Less: lessFn(req.Direction), - } - - statistics = stats.FromContext(ctx) - ) - - for { - n, err := reader.Read(ctx, buf) - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to read log records: %w", err) - } - - if n == 0 && err == io.EOF { - break - } - - for _, record := range buf[:n] { - stream, ok := streams[record.StreamID] - if !ok { - continue - } - if prevStreamID != record.StreamID { - streamExtractor = pipeline.ForStream(stream.Labels) - streamHash = streamExtractor.BaseLabels().Hash() - prevStreamID = record.StreamID - } - - timestamp := record.Timestamp.UnixNano() - line, parsedLabels, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata) - if !ok { - continue - } - statistics.AddPostFilterRows(1) - - top.Push(entryWithLabels{ - Labels: parsedLabels.String(), - StreamHash: streamHash, - Entry: logproto.Entry{ - Timestamp: record.Timestamp, - Line: string(line), - StructuredMetadata: logproto.FromLabelsToLabelAdapters(parsedLabels.StructuredMetadata()), - Parsed: logproto.FromLabelsToLabelAdapters(parsedLabels.Parsed()), - }, - }) - } - } - - return heapIterator(&top), nil -} - -func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool { - switch direction { - case logproto.FORWARD: - return func(a, b entryWithLabels) bool { - if a.Entry.Timestamp.Equal(b.Entry.Timestamp) { - return a.Labels < b.Labels - } - return a.Entry.Timestamp.After(b.Entry.Timestamp) - } - case logproto.BACKWARD: - return func(a, b entryWithLabels) bool { - if a.Entry.Timestamp.Equal(b.Entry.Timestamp) { - return a.Labels < b.Labels - } - return a.Entry.Timestamp.Before(b.Entry.Timestamp) - } - default: - panic("invalid direction") - } -} - -// heapIterator creates a new EntryIterator for the given topk heap. After -// calling heapIterator, h is emptied. -func heapIterator(h *topk.Heap[entryWithLabels]) iter.EntryIterator { - elems := h.PopAll() - - // We need to reverse the order of the entries in the slice to maintain the order of logs we - // want to return: - // - // For FORWARD direction, we want smallest timestamps first (but the heap is - // ordered by largest timestamps first due to lessFn). - // - // For BACKWARD direction, we want largest timestamps first (but the heap is - // ordered by smallest timestamps first due to lessFn). - slices.Reverse(elems) - - return &sliceIterator{entries: elems} -} - -type sliceIterator struct { - entries []entryWithLabels - curr entryWithLabels -} - -func (s *sliceIterator) Next() bool { - if len(s.entries) == 0 { - return false - } - s.curr = s.entries[0] - s.entries = s.entries[1:] - return true -} - -func (s *sliceIterator) At() logproto.Entry { - return s.curr.Entry -} - -func (s *sliceIterator) Err() error { - return nil -} - -func (s *sliceIterator) Labels() string { - return s.curr.Labels -} - -func (s *sliceIterator) StreamHash() uint64 { - return s.curr.StreamHash -} - -func (s *sliceIterator) Close() error { - clear(s.entries) - entryWithLabelsPool.Put(&s.entries) - return nil -} - -func newSampleIterator(ctx context.Context, - streamsMap map[int64]streams.Stream, - extractors []syntax.SampleExtractor, - reader *logs.RowReader, -) (iter.SampleIterator, error) { - bufPtr := recordsPool.Get().(*[]logs.Record) - defer recordsPool.Put(bufPtr) - buf := *bufPtr - - var ( - iterators []iter.SampleIterator - prevStreamID int64 = -1 - streamExtractor log.StreamSampleExtractor - series = map[string]*logproto.Series{} - streamHash uint64 - ) - - statistics := stats.FromContext(ctx) - // For dataobjs, this maps to sections downloaded - statistics.AddChunksDownloaded(1) - - for { - n, err := reader.Read(ctx, buf) - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to read log records: %w", err) - } - - // Handle end of stream or empty read - if n == 0 && err == io.EOF { - iterators = appendIteratorFromSeries(iterators, series) - break - } - - // Process records in the current batch - for _, record := range buf[:n] { - stream, ok := streamsMap[record.StreamID] - if !ok { - continue - } - - for _, extractor := range extractors { - // Handle stream transition - if prevStreamID != record.StreamID { - iterators = appendIteratorFromSeries(iterators, series) - clear(series) - streamExtractor = extractor.ForStream(stream.Labels) - streamHash = streamExtractor.BaseLabels().Hash() - prevStreamID = record.StreamID - } - - // Process the record - timestamp := record.Timestamp.UnixNano() - - statistics.AddDecompressedLines(1) - samples, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata) - if !ok { - continue - } - statistics.AddPostFilterLines(1) - - for _, sample := range samples { - - parsedLabels := sample.Labels - value := sample.Value - - // Get or create series for the parsed labels - labelString := parsedLabels.String() - s, exists := series[labelString] - if !exists { - s = createNewSeries(labelString, streamHash) - series[labelString] = s - } - - // Add sample to the series - s.Samples = append(s.Samples, logproto.Sample{ - Timestamp: timestamp, - Value: value, - Hash: 0, - }) - } - } - } - } - - if len(iterators) == 0 { - return iter.NoopSampleIterator, nil - } - - return iter.NewSortSampleIterator(iterators), nil -} - -// createNewSeries creates a new Series for the given labels and stream hash -func createNewSeries(labels string, streamHash uint64) *logproto.Series { - samplesPtr := samplesPool.Get().(*[]logproto.Sample) - samples := *samplesPtr - return &logproto.Series{ - Labels: labels, - Samples: samples[:0], - StreamHash: streamHash, - } -} - -// appendIteratorFromSeries appends a new SampleIterator to the given list of iterators -func appendIteratorFromSeries(iterators []iter.SampleIterator, series map[string]*logproto.Series) []iter.SampleIterator { - if len(series) == 0 { - return iterators - } - - seriesResult := make([]logproto.Series, 0, len(series)) - for _, s := range series { - seriesResult = append(seriesResult, *s) - } - - return append(iterators, iter.SampleIteratorWithClose( - iter.NewMultiSeriesIterator(seriesResult), - func() error { - for _, s := range seriesResult { - samplesPool.Put(&s.Samples) - } - return nil - }, - )) -} diff --git a/pkg/dataobj/querier/iter_test.go b/pkg/dataobj/querier/iter_test.go deleted file mode 100644 index addeffd21c..0000000000 --- a/pkg/dataobj/querier/iter_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package querier - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/util/topk" -) - -// makeEntry is a helper function to create a log entry with given timestamp and line -func makeEntry(ts time.Time, line string) logproto.Entry { - return logproto.Entry{ - Timestamp: ts, - Line: line, - } -} - -func TestTopKIterator(t *testing.T) { - tests := []struct { - name string - k int - direction logproto.Direction - input []entryWithLabels - want []entryWithLabels - }{ - { - name: "forward direction with k=2", - k: 2, - direction: logproto.FORWARD, - input: []entryWithLabels{ - {Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, - {Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app2\"}", StreamHash: 2}, - }, - want: []entryWithLabels{ - {Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, - }, - }, - { - name: "backward direction with k=3", - k: 3, - direction: logproto.BACKWARD, - input: []entryWithLabels{ - {Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, - {Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app2\"}", StreamHash: 2}, - {Entry: makeEntry(time.Unix(5, 0), "line5"), Labels: "{app=\"app2\"}", StreamHash: 2}, - }, - want: []entryWithLabels{ - {Entry: makeEntry(time.Unix(5, 0), "line5"), Labels: "{app=\"app2\"}", StreamHash: 2}, - {Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app2\"}", StreamHash: 2}, - }, - }, - { - name: "k larger than available entries", - k: 10, - direction: logproto.FORWARD, - input: []entryWithLabels{ - {Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app1\"}", StreamHash: 1}, - }, - want: []entryWithLabels{ - {Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app1\"}", StreamHash: 1}, - }, - }, - { - name: "mixed timestamps with k=4", - k: 4, - direction: logproto.FORWARD, - input: []entryWithLabels{ - {Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(5, 0), "line5"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, - {Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app2\"}", StreamHash: 2}, - {Entry: makeEntry(time.Unix(6, 0), "line6"), Labels: "{app=\"app2\"}", StreamHash: 2}, - }, - want: []entryWithLabels{ - {Entry: makeEntry(time.Unix(1, 0), "line1"), Labels: "{app=\"app1\"}", StreamHash: 1}, - {Entry: makeEntry(time.Unix(2, 0), "line2"), Labels: "{app=\"app2\"}", StreamHash: 2}, - {Entry: makeEntry(time.Unix(3, 0), "line3"), Labels: "{app=\"app2\"}", StreamHash: 2}, - {Entry: makeEntry(time.Unix(4, 0), "line4"), Labels: "{app=\"app1\"}", StreamHash: 1}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create topk iterator - top := topk.Heap[entryWithLabels]{ - Limit: tt.k, - Less: lessFn(tt.direction), - } - - // Add entries - for _, e := range tt.input { - top.Push(e) - } - - // Collect results - var got []entryWithLabels - - iter := heapIterator(&top) - for iter.Next() { - got = append(got, entryWithLabels{ - Entry: iter.At(), - Labels: iter.Labels(), - StreamHash: iter.StreamHash(), - }) - } - - require.Equal(t, tt.want, got) - require.NoError(t, iter.Err()) - }) - } -} diff --git a/pkg/dataobj/querier/metadata.go b/pkg/dataobj/querier/metadata.go deleted file mode 100644 index f38720e5d2..0000000000 --- a/pkg/dataobj/querier/metadata.go +++ /dev/null @@ -1,336 +0,0 @@ -package querier - -import ( - "context" - "fmt" - "io" - "sort" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - "go.uber.org/atomic" - "golang.org/x/sync/errgroup" - - "github.com/grafana/loki/v3/pkg/dataobj" - "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" - "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql" - util_log "github.com/grafana/loki/v3/pkg/util/log" -) - -var streamsPool = sync.Pool{ - New: func() any { - streams := make([]streams.Stream, 1024) - return &streams - }, -} - -// SelectSeries implements querier.Store -func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { - logger := util_log.WithContext(ctx, s.logger) - - objects, err := s.objectsForTimeRange(ctx, req.Start, req.End, logger) - if err != nil { - return nil, err - } - - if len(objects) == 0 { - return nil, nil - } - - shard, err := parseShards(req.Shards) - if err != nil { - return nil, err - } - - var matchers []*labels.Matcher - if req.Selector != "" { - expr, err := req.LogSelector() - if err != nil { - return nil, err - } - matchers = expr.Matchers() - } - - uniqueSeries := &sync.Map{} - - processor := newStreamProcessor(req.Start, req.End, matchers, objects, shard, logger) - - err = processor.ProcessParallel(ctx, func(h uint64, stream streams.Stream) { - uniqueSeries.Store(h, labelsToSeriesIdentifier(stream.Labels)) - }) - if err != nil { - return nil, err - } - var result []logproto.SeriesIdentifier - - // Convert sync.Map to slice - uniqueSeries.Range(func(_, value interface{}) bool { - if sid, ok := value.(logproto.SeriesIdentifier); ok { - result = append(result, sid) - } - return true - }) - - return result, nil -} - -// LabelNamesForMetricName implements querier.Store -func (s *Store) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, matchers ...*labels.Matcher) ([]string, error) { - logger := util_log.WithContext(ctx, s.logger) - start, end := from.Time(), through.Time() - objects, err := s.objectsForTimeRange(ctx, start, end, logger) - if err != nil { - return nil, err - } - - if len(objects) == 0 { - return nil, nil - } - - processor := newStreamProcessor(start, end, matchers, objects, noShard, logger) - uniqueNames := sync.Map{} - - err = processor.ProcessParallel(ctx, func(_ uint64, stream streams.Stream) { - stream.Labels.Range(func(label labels.Label) { - uniqueNames.Store(label.Name, nil) - }) - }) - if err != nil { - return nil, err - } - - names := []string{} - uniqueNames.Range(func(key, _ interface{}) bool { - names = append(names, key.(string)) - return true - }) - - sort.Strings(names) - - return names, nil -} - -// LabelValuesForMetricName implements querier.Store -func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, labelName string, matchers ...*labels.Matcher) ([]string, error) { - logger := util_log.WithContext(ctx, s.logger) - start, end := from.Time(), through.Time() - - requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, labelName, "") - if err != nil { - return nil, fmt.Errorf("failed to instantiate label matcher: %w", err) - } - - matchers = append(matchers, requireLabel) - - objects, err := s.objectsForTimeRange(ctx, start, end, logger) - if err != nil { - return nil, err - } - - if len(objects) == 0 { - return nil, nil - } - - processor := newStreamProcessor(start, end, matchers, objects, noShard, logger) - uniqueValues := sync.Map{} - - err = processor.ProcessParallel(ctx, func(_ uint64, stream streams.Stream) { - uniqueValues.Store(stream.Labels.Get(labelName), nil) - }) - if err != nil { - return nil, err - } - - values := []string{} - uniqueValues.Range(func(key, _ interface{}) bool { - values = append(values, key.(string)) - return true - }) - - sort.Strings(values) - - return values, nil -} - -// streamProcessor handles processing of unique series with custom collection logic -type streamProcessor struct { - predicate streams.RowPredicate - seenSeries *sync.Map - objects []object - shard logql.Shard - logger log.Logger -} - -// newStreamProcessor creates a new streamProcessor with the given parameters -func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []object, shard logql.Shard, logger log.Logger) *streamProcessor { - return &streamProcessor{ - predicate: streamPredicate(matchers, start, end), - seenSeries: &sync.Map{}, - objects: objects, - shard: shard, - logger: logger, - } -} - -// ProcessParallel processes series from multiple readers in parallel -// dataobj.Stream objects returned to onNewStream may be reused and must be deep copied for further use, including the stream.Labels keys and values. -func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func(uint64, streams.Stream)) error { - readers, err := shardStreamReaders(ctx, sp.objects, sp.shard) - if err != nil { - return err - } - defer func() { - for _, reader := range readers { - _ = reader.Close() - streamReaderPool.Put(reader) - } - }() - - start := time.Now() - span := trace.SpanFromContext(ctx) - span.AddEvent("processing streams", trace.WithAttributes(attribute.Int("total_readers", len(readers)))) - level.Debug(sp.logger).Log("msg", "processing streams", "total_readers", len(readers)) - - // set predicate on all readers - for _, reader := range readers { - if err := reader.SetPredicate(sp.predicate); err != nil { - return err - } - } - - g, ctx := errgroup.WithContext(ctx) - var processedStreams atomic.Int64 - for _, reader := range readers { - g.Go(func() error { - ctx, span := tracer.Start(ctx, "streamProcessor.processSingleReader") - defer span.End() - - n, err := sp.processSingleReader(ctx, reader, onNewStream) - if err != nil { - return err - } - processedStreams.Add(n) - return nil - }) - } - err = g.Wait() - if err != nil { - return err - } - - level.Debug(sp.logger).Log("msg", "finished processing streams", - "total_readers", len(readers), - "total_streams_processed", processedStreams.Load(), - "duration", time.Since(start), - ) - span.AddEvent("streamProcessor.ProcessParallel done", trace.WithAttributes( - attribute.Int("total_readers", len(readers)), - attribute.Int64("total_streams_processed", processedStreams.Load()), - attribute.String("duration", time.Since(start).String()), - )) - - return nil -} - -func (sp *streamProcessor) processSingleReader(ctx context.Context, reader *streams.RowReader, onNewStream func(uint64, streams.Stream)) (int64, error) { - var ( - streamsPtr = streamsPool.Get().(*[]streams.Stream) - streams = *streamsPtr - buf = make([]byte, 0, 1024) - h uint64 - processed int64 - ) - - defer streamsPool.Put(streamsPtr) - - for { - n, err := reader.Read(ctx, streams) - if err != nil && err != io.EOF { - return processed, fmt.Errorf("failed to read streams: %w", err) - } - if n == 0 && err == io.EOF { - break - } - for _, stream := range streams[:n] { - h, buf = stream.Labels.HashWithoutLabels(buf, []string(nil)...) - // Try to claim this hash first - if _, seen := sp.seenSeries.LoadOrStore(h, nil); seen { - continue - } - onNewStream(h, stream) - processed++ - } - } - return processed, nil -} - -func labelsToSeriesIdentifier(lbls labels.Labels) logproto.SeriesIdentifier { - series := make([]logproto.SeriesIdentifier_LabelsEntry, 0, lbls.Len()) - lbls.Range(func(label labels.Label) { - series = append(series, logproto.SeriesIdentifier_LabelsEntry{ - Key: label.Name, - Value: label.Value, - }) - }) - return logproto.SeriesIdentifier{Labels: series} -} - -// shardStreamReaders fetches metadata of objects in parallel and shards them into a list of StreamsReaders -func shardStreamReaders(ctx context.Context, objects []object, shard logql.Shard) ([]*streams.RowReader, error) { - ctx, span := tracer.Start(ctx, "shardStreamReaders") - defer span.End() - - span.SetAttributes(attribute.Int("objects", len(objects))) - - var ( - // sectionIndex tracks the global section number across all objects to ensure consistent sharding - sectionIndex uint64 - readers []*streams.RowReader - ) - - for _, object := range objects { - // For sharded queries (e.g., "1 of 2"), we only read sections that belong to our shard - // The section is assigned to a shard based on its global index across all objects - if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 { - if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) { - sectionIndex++ - continue - } - } - - var found *dataobj.Section - for _, section := range object.Sections() { - if !streams.CheckSection(section) { - continue - } else if found != nil { - return nil, fmt.Errorf("object has unsupported multiple streams sections") - } - found = section - } - if found == nil { - return nil, fmt.Errorf("object has no streams sections") - } - - sec, err := streams.Open(ctx, found) - if err != nil { - return nil, fmt.Errorf("opening streams section: %w", err) - } - - reader := streamReaderPool.Get().(*streams.RowReader) - reader.Reset(sec) - readers = append(readers, reader) - sectionIndex++ - } - - span.AddEvent("shardStreamReaders done", trace.WithAttributes( - attribute.Int("readers", len(readers)), - )) - return readers, nil -} diff --git a/pkg/dataobj/querier/metadata_test.go b/pkg/dataobj/querier/metadata_test.go deleted file mode 100644 index d2f03611cb..0000000000 --- a/pkg/dataobj/querier/metadata_test.go +++ /dev/null @@ -1,330 +0,0 @@ -package querier - -import ( - "context" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/grafana/dskit/user" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/dataobj/metastore" - "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql" -) - -func TestStore_SelectSeries(t *testing.T) { - const testTenant = "test-tenant" - builder := newTestDataBuilder(t) - defer builder.close() - - ctx, _ := context.WithTimeout(t.Context(), time.Second) //nolint:govet - ctx = user.InjectOrgID(ctx, testTenant) - - // Setup test data - now := setupTestData(ctx, t, builder, testTenant) - meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil) - store := NewStore(builder.bucket, log.NewNopLogger(), meta) - - tests := []struct { - name string - selector string - want []string - }{ - { - name: "select all series", - selector: ``, - want: []string{ - `{app="foo", env="prod"}`, - `{app="foo", env="dev"}`, - `{app="bar", env="prod"}`, - `{app="bar", env="dev"}`, - `{app="baz", env="prod", team="a"}`, - }, - }, - { - name: "select with equality matcher", - selector: `{app="foo"}`, - want: []string{ - `{app="foo", env="prod"}`, - `{app="foo", env="dev"}`, - }, - }, - { - name: "select with regex matcher", - selector: `{app=~"foo|bar"}`, - want: []string{ - `{app="foo", env="prod"}`, - `{app="foo", env="dev"}`, - `{app="bar", env="prod"}`, - `{app="bar", env="dev"}`, - }, - }, - { - name: "select with negative equality matcher", - selector: `{app=~".+", app!="foo"}`, - want: []string{ - `{app="bar", env="prod"}`, - `{app="bar", env="dev"}`, - `{app="baz", env="prod", team="a"}`, - }, - }, - { - name: "select with negative regex matcher", - selector: `{app=~".+", app!~"foo|bar"}`, - want: []string{ - `{app="baz", env="prod", team="a"}`, - }, - }, - { - name: "select with multiple matchers", - selector: `{app="foo", env="prod"}`, - want: []string{ - `{app="foo", env="prod"}`, - }, - }, - { - name: "select with regex and equality matchers", - selector: `{app=~"foo|bar", env="prod"}`, - want: []string{ - `{app="foo", env="prod"}`, - `{app="bar", env="prod"}`, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - series, err := store.SelectSeries(ctx, logql.SelectLogParams{ - QueryRequest: &logproto.QueryRequest{ - Start: now.Add(-time.Hour), - End: now.Add(time.Hour), - Plan: planFromString(tt.selector), - Selector: tt.selector, - }, - }) - require.NoError(t, err) - - var got []string - for _, s := range series { - got = append(got, labelsFromSeriesID(s)) - } - require.ElementsMatch(t, tt.want, got) - }) - } - - t.Run("sharding", func(t *testing.T) { - // Query first shard - series1, err := store.SelectSeries(ctx, logql.SelectLogParams{ - QueryRequest: &logproto.QueryRequest{ - Start: now.Add(-time.Hour), - End: now.Add(time.Hour), - Plan: planFromString(`{app=~"foo|bar|baz"}`), - Selector: `{app=~"foo|bar|baz"}`, - Shards: []string{"0_of_2"}, - }, - }) - require.NoError(t, err) - require.NotEmpty(t, series1) - require.Less(t, len(series1), 5) // Should get less than all series - - // Query second shard - series2, err := store.SelectSeries(ctx, logql.SelectLogParams{ - QueryRequest: &logproto.QueryRequest{ - Start: now.Add(-time.Hour), - End: now.Add(time.Hour), - Plan: planFromString(`{app=~"foo|bar|baz"}`), - Selector: `{app=~"foo|bar|baz"}`, - Shards: []string{"1_of_2"}, - }, - }) - require.NoError(t, err) - require.NotEmpty(t, series2) - - // Combined shards should equal all series - var allSeries []string - for _, s := range append(series1, series2...) { - allSeries = append(allSeries, labelsFromSeriesID(s)) - } - - want := []string{ - `{app="foo", env="prod"}`, - `{app="foo", env="dev"}`, - `{app="bar", env="prod"}`, - `{app="bar", env="dev"}`, - `{app="baz", env="prod", team="a"}`, - } - require.ElementsMatch(t, want, allSeries) - }) -} - -func TestStore_LabelNamesForMetricName(t *testing.T) { - const testTenant = "test-tenant" - builder := newTestDataBuilder(t) - defer builder.close() - - ctx, _ := context.WithTimeout(t.Context(), time.Second) //nolint:govet - ctx = user.InjectOrgID(ctx, testTenant) - - // Setup test data - now := setupTestData(ctx, t, builder, testTenant) - meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil) - store := NewStore(builder.bucket, log.NewNopLogger(), meta) - - tests := []struct { - name string - matchers []*labels.Matcher - want []string - }{ - { - name: "no matchers", - matchers: nil, - want: []string{"app", "env", "team"}, - }, - { - name: "with equality matcher", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), - }, - want: []string{"app", "env"}, - }, - { - name: "with regex matcher", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"), - }, - want: []string{"app", "env"}, - }, - { - name: "with negative matcher", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"), - }, - want: []string{"app", "env", "team"}, - }, - { - name: "with negative regex matcher", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"), - }, - want: []string{"app", "env", "team"}, - }, - { - name: "with multiple matchers", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), - labels.MustNewMatcher(labels.MatchEqual, "env", "prod"), - }, - want: []string{"app", "env"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - names, err := store.LabelNamesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.matchers...) - require.NoError(t, err) - require.ElementsMatch(t, tt.want, names) - }) - } -} - -func TestStore_LabelValuesForMetricName(t *testing.T) { - const testTenant = "test-tenant" - builder := newTestDataBuilder(t) - defer builder.close() - - ctx, _ := context.WithTimeout(t.Context(), time.Second) //nolint:govet - ctx = user.InjectOrgID(ctx, testTenant) - - // Setup test data - now := setupTestData(ctx, t, builder, testTenant) - meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil) - store := NewStore(builder.bucket, log.NewNopLogger(), meta) - - tests := []struct { - name string - labelName string - matchers []*labels.Matcher - want []string - }{ - { - name: "app label without matchers", - labelName: "app", - matchers: nil, - want: []string{"bar", "baz", "foo"}, - }, - { - name: "env label without matchers", - labelName: "env", - matchers: nil, - want: []string{"dev", "prod"}, - }, - { - name: "team label without matchers", - labelName: "team", - matchers: nil, - want: []string{"a"}, - }, - { - name: "env label with app equality matcher", - labelName: "env", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), - }, - want: []string{"dev", "prod"}, - }, - { - name: "env label with app regex matcher", - labelName: "env", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"), - }, - want: []string{"dev", "prod"}, - }, - { - name: "env label with app negative matcher", - labelName: "env", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"), - }, - want: []string{"dev", "prod"}, - }, - { - name: "env label with app negative regex matcher", - labelName: "env", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"), - }, - want: []string{"prod"}, - }, - { - name: "env label with multiple matchers", - labelName: "env", - matchers: []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), - labels.MustNewMatcher(labels.MatchEqual, "env", "prod"), - }, - want: []string{"prod"}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - values, err := store.LabelValuesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.labelName, tt.matchers...) - require.NoError(t, err) - require.Equal(t, tt.want, values) - }) - } -} - -func labelsFromSeriesID(id logproto.SeriesIdentifier) string { - builder := labels.NewScratchBuilder(len(id.Labels)) - for _, l := range id.Labels { - builder.Add(l.Key, l.Value) - } - builder.Sort() - return builder.Labels().String() -} diff --git a/pkg/dataobj/querier/store.go b/pkg/dataobj/querier/store.go deleted file mode 100644 index f2bcf50949..0000000000 --- a/pkg/dataobj/querier/store.go +++ /dev/null @@ -1,813 +0,0 @@ -package querier - -import ( - "context" - "flag" - "fmt" - "io" - "slices" - "strings" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/user" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/thanos-io/objstore" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - "golang.org/x/sync/errgroup" - - "github.com/grafana/loki/v3/pkg/dataobj" - "github.com/grafana/loki/v3/pkg/dataobj/metastore" - "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" - "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" - "github.com/grafana/loki/v3/pkg/iter" - "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql" - "github.com/grafana/loki/v3/pkg/logql/syntax" - "github.com/grafana/loki/v3/pkg/querier" - "github.com/grafana/loki/v3/pkg/storage/chunk" - "github.com/grafana/loki/v3/pkg/storage/config" - storageconfig "github.com/grafana/loki/v3/pkg/storage/config" - "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" - "github.com/grafana/loki/v3/pkg/tracing" - util_log "github.com/grafana/loki/v3/pkg/util/log" -) - -var tracer = otel.Tracer("pkg/dataobj/querier") - -var ( - _ querier.Store = &Store{} - - noShard = logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: uint32(0), - Of: uint32(1), - }, - } - - shardedObjectsPool = sync.Pool{ - New: func() any { - return &shardedObject{ - streams: make(map[int64]streams.Stream), - streamsIDs: make([]int64, 0, 1024), - logReaders: make([]*logs.RowReader, 0, 16), - } - }, - } - logReaderPool = sync.Pool{ - New: func() any { - return &logs.RowReader{} - }, - } - streamReaderPool = sync.Pool{ - New: func() any { - return &streams.RowReader{} - }, - } -) - -type Config struct { - Enabled bool `yaml:"enabled" doc:"description=Enable the dataobj querier."` - From storageconfig.DayTime `yaml:"from" doc:"description=The date of the first day of when the dataobj querier should start querying from. In YYYY-MM-DD format, for example: 2018-04-15."` - ShardFactor int `yaml:"shard_factor" doc:"description=The number of shards to use for the dataobj querier."` -} - -func (c *Config) RegisterFlags(f *flag.FlagSet) { - f.BoolVar(&c.Enabled, "dataobj-querier-enabled", false, "Enable the dataobj querier.") - f.Var(&c.From, "dataobj-querier-from", "The start time to query from.") - f.IntVar(&c.ShardFactor, "dataobj-querier-shard-factor", 32, "The number of shards to use for the dataobj querier.") -} - -func (c *Config) Validate() error { - if c.Enabled && c.From.ModelTime().Time().IsZero() { - return fmt.Errorf("from is required when dataobj querier is enabled") - } - return nil -} - -func (c *Config) PeriodConfig() config.PeriodConfig { - return config.PeriodConfig{ - From: c.From, - RowShards: uint32(c.ShardFactor), - Schema: "v13", - } -} - -// Store implements querier.Store for querying data objects. -type Store struct { - bucket objstore.Bucket - logger log.Logger - metastore metastore.Metastore -} - -// NewStore creates a new Store. -func NewStore(bucket objstore.Bucket, logger log.Logger, metastore metastore.Metastore) *Store { - return &Store{ - bucket: bucket, - logger: logger, - metastore: metastore, - } -} - -func (s *Store) String() string { - return "dataobj" -} - -// SelectLogs implements querier.Store -func (s *Store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { - logger := util_log.WithContext(ctx, s.logger) - - objects, err := s.objectsForTimeRange(ctx, req.Start, req.End, logger) - if err != nil { - return nil, err - } - if len(objects) == 0 { - return iter.NoopEntryIterator, nil - } - - shard, err := parseShards(req.Shards) - if err != nil { - return nil, err - } - - return selectLogs(ctx, objects, shard, req, logger) -} - -// SelectSamples implements querier.Store -func (s *Store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { - logger := util_log.WithContext(ctx, s.logger) - - objects, err := s.objectsForTimeRange(ctx, req.Start, req.End, logger) - if err != nil { - return nil, err - } - if len(objects) == 0 { - return iter.NoopSampleIterator, nil - } - - shard, err := parseShards(req.Shards) - if err != nil { - return nil, err - } - expr, err := req.Expr() - if err != nil { - return nil, err - } - - return selectSamples(ctx, objects, shard, expr, req.Start, req.End, logger) -} - -// Stats implements querier.Store -func (s *Store) Stats(_ context.Context, _ string, _ model.Time, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) { - // TODO: Implement - return &stats.Stats{}, nil -} - -// Volume implements querier.Store -func (s *Store) Volume(_ context.Context, _ string, _ model.Time, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) { - // TODO: Implement - return &logproto.VolumeResponse{}, nil -} - -// GetShards implements querier.Store -func (s *Store) GetShards(_ context.Context, _ string, _ model.Time, _ model.Time, _ uint64, _ chunk.Predicate) (*logproto.ShardsResponse, error) { - // TODO: Implement - return &logproto.ShardsResponse{}, nil -} - -type object struct { - *dataobj.Object - path string -} - -// objectsForTimeRange returns data objects for the given time range. -func (s *Store) objectsForTimeRange(ctx context.Context, from, through time.Time, logger log.Logger) ([]object, error) { - ctx, span := tracer.Start(ctx, "objectsForTimeRange") - defer span.End() - - span.SetAttributes( - attribute.String("from", from.String()), - attribute.String("through", through.String()), - ) - - files, err := s.metastore.DataObjects(ctx, from, through) - if err != nil { - return nil, err - } - - level.Debug(logger).Log( - "msg", "found data objects for time range", - "count", len(files), - "from", from, - "through", through, - ) - span.AddEvent("found data objects for time range", trace.WithAttributes( - attribute.Int("count", len(files)), - attribute.StringSlice("files", files)), - ) - - objects := make([]object, 0, len(files)) - for _, path := range files { - obj, err := dataobj.FromBucket(ctx, s.bucket, path) - if err != nil { - return nil, fmt.Errorf("getting object from bucket: %w", err) - } - objects = append(objects, object{Object: obj, path: path}) - } - return objects, nil -} - -func selectLogs(ctx context.Context, objects []object, shard logql.Shard, req logql.SelectLogParams, logger log.Logger) (iter.EntryIterator, error) { - selector, err := req.LogSelector() - if err != nil { - return nil, err - } - shardedObjects, err := shardObjects(ctx, objects, shard, logger) - if err != nil { - return nil, err - } - defer func() { - for _, obj := range shardedObjects { - obj.reset() - shardedObjectsPool.Put(obj) - } - }() - streamsPredicate := streamPredicate(selector.Matchers(), req.Start, req.End) - var logsPredicates []logs.RowPredicate - logsPredicates = append(logsPredicates, logs.TimeRangeRowPredicate{ - StartTime: req.Start, - EndTime: req.End, - IncludeStart: true, - IncludeEnd: false, - }) - - p, expr := buildLogsPredicateFromPipeline(selector) - if p != nil { - logsPredicates = append(logsPredicates, p) - } - req.Plan.AST = expr - - g, ctx := errgroup.WithContext(ctx) - iterators := make([]iter.EntryIterator, len(shardedObjects)) - - for i, obj := range shardedObjects { - g.Go(func() error { - ctx, span := tracer.Start(ctx, "object selectLogs", trace.WithAttributes( - attribute.String("object", obj.object.path), - attribute.Int("sections", len(obj.logReaders)), - )) - defer span.End() - - iterator, err := obj.selectLogs(ctx, streamsPredicate, logsPredicates, req) - if err != nil { - return err - } - iterators[i] = iterator - return nil - }) - } - if err := g.Wait(); err != nil { - return nil, err - } - - return iter.NewSortEntryIterator(iterators, req.Direction), nil -} - -func selectSamples(ctx context.Context, objects []object, shard logql.Shard, expr syntax.SampleExpr, start, end time.Time, logger log.Logger) (iter.SampleIterator, error) { - shardedObjects, err := shardObjects(ctx, objects, shard, logger) - if err != nil { - return nil, err - } - defer func() { - for _, obj := range shardedObjects { - obj.reset() - shardedObjectsPool.Put(obj) - } - }() - selector, err := expr.Selector() - if err != nil { - return nil, err - } - - streamsPredicate := streamPredicate(selector.Matchers(), start, end) - // TODO: support more predicates and combine with log.Pipeline. - var logsPredicates []logs.RowPredicate - logsPredicates = append(logsPredicates, logs.TimeRangeRowPredicate{ - StartTime: start, - EndTime: end, - IncludeStart: true, - IncludeEnd: false, - }) - - var predicateFromExpr logs.RowPredicate - predicateFromExpr, expr = buildLogsPredicateFromSampleExpr(expr) - if predicateFromExpr != nil { - logsPredicates = append(logsPredicates, predicateFromExpr) - } - - g, ctx := errgroup.WithContext(ctx) - iterators := make([]iter.SampleIterator, len(shardedObjects)) - - for i, obj := range shardedObjects { - g.Go(func() error { - ctx, span := tracer.Start(ctx, "object selectSamples", trace.WithAttributes( - attribute.String("object", obj.object.path), - attribute.Int("sections", len(obj.logReaders)), - )) - - defer span.End() - - iterator, err := obj.selectSamples(ctx, streamsPredicate, logsPredicates, expr) - if err != nil { - return err - } - iterators[i] = iterator - return nil - }) - } - if err := g.Wait(); err != nil { - return nil, err - } - - return iter.NewSortSampleIterator(iterators), nil -} - -type shardedObject struct { - object object - streamReader *streams.RowReader - logReaders []*logs.RowReader - - streamsIDs []int64 - streams map[int64]streams.Stream -} - -// shardSections returns a list of section indices to read per metadata based on the sharding configuration. -// The returned slice has the same length as the input metadatas, and each element contains the list of section indices -// that should be read for that metadata. -func shardSections(metadatas []sectionsStats, shard logql.Shard) [][]int { - // Count total sections before sharding - var totalSections int - for _, metadata := range metadatas { - totalSections += metadata.LogsSections - if metadata.StreamsSections > 1 { - // We don't support multiple streams sections, but we still need to return a slice - // with the same length as the input metadatas. - return make([][]int, len(metadatas)) - } - } - - // sectionIndex tracks the global section number across all objects to ensure consistent sharding - var sectionIndex uint64 - result := make([][]int, len(metadatas)) - - for i, metadata := range metadatas { - sections := make([]int, 0, metadata.LogsSections) - for j := 0; j < metadata.LogsSections; j++ { - if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 { - if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) { - sectionIndex++ - continue - } - } - sections = append(sections, j) - sectionIndex++ - } - result[i] = sections - } - - return result -} - -func shardObjects( - ctx context.Context, - objects []object, - shard logql.Shard, - logger log.Logger, -) ([]*shardedObject, error) { - ctx, span := tracer.Start(ctx, "shardObjects") - defer span.End() - - metadatas, err := fetchSectionsStats(ctx, objects) - if err != nil { - return nil, err - } - - // Get the sections to read per metadata - sectionsPerMetadata := shardSections(metadatas, shard) - - // Count total sections that will be read - var totalSections int - var objectSections []int - for i, sections := range sectionsPerMetadata { - totalSections += len(sections) - objectSections = append(objectSections, metadatas[i].LogsSections) - } - - shardedReaders := make([]*shardedObject, 0, len(objects)) - - for i, sections := range sectionsPerMetadata { - if len(sections) == 0 { - continue - } - - reader := shardedObjectsPool.Get().(*shardedObject) - reader.streamReader = streamReaderPool.Get().(*streams.RowReader) - reader.object = objects[i] - - sec, err := findStreamsSection(ctx, objects[i].Object) - if err != nil { - return nil, fmt.Errorf("finding streams section: %w", err) - } - reader.streamReader.Reset(sec) - - for _, section := range sections { - sec, err := findLogsSection(ctx, objects[i].Object, section) - if err != nil { - return nil, fmt.Errorf("finding logs section: %w", err) - } - - logReader := logReaderPool.Get().(*logs.RowReader) - logReader.Reset(sec) - reader.logReaders = append(reader.logReaders, logReader) - } - shardedReaders = append(shardedReaders, reader) - } - var sectionsString strings.Builder - for _, sections := range sectionsPerMetadata { - sectionsString.WriteString(fmt.Sprintf("%v ", sections)) - } - - logParams := []interface{}{ - "msg", "sharding sections", - "sharded_factor", shard.String(), - "total_objects", len(objects), - "total_sections", totalSections, - "object_sections", fmt.Sprintf("%v", objectSections), - "sharded_total_objects", len(shardedReaders), - "sharded_sections", sectionsString.String(), - } - - level.Debug(logger).Log(logParams...) - sp := trace.SpanFromContext(ctx) - sp.SetAttributes(tracing.KeyValuesToOTelAttributes(logParams)...) - - return shardedReaders, nil -} - -func findLogsSection(ctx context.Context, obj *dataobj.Object, index int) (*logs.Section, error) { - var count int - - for _, section := range obj.Sections().Filter(logs.CheckSection) { - if count == index { - return logs.Open(ctx, section) - } - count++ - } - - return nil, fmt.Errorf("object does not have logs section %d (max %d)", index, count) -} - -func findStreamsSection(ctx context.Context, obj *dataobj.Object) (*streams.Section, error) { - targetTenant, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, fmt.Errorf("extracting org ID: %w", err) - } - - for _, section := range obj.Sections().Filter(streams.CheckSection) { - if section.Tenant != targetTenant { - continue - } - return streams.Open(ctx, section) - } - - return nil, fmt.Errorf("object has no streams sections") -} - -func (s *shardedObject) reset() { - _ = s.streamReader.Close() - streamReaderPool.Put(s.streamReader) - for i, reader := range s.logReaders { - _ = reader.Close() - logReaderPool.Put(reader) - s.logReaders[i] = nil - } - s.streamReader = nil - s.logReaders = s.logReaders[:0] - s.streamsIDs = s.streamsIDs[:0] - s.object = object{} - clear(s.streams) -} - -func (s *shardedObject) selectLogs(ctx context.Context, streamsPredicate streams.RowPredicate, logsPredicates []logs.RowPredicate, req logql.SelectLogParams) (iter.EntryIterator, error) { - if err := s.setPredicate(streamsPredicate, logsPredicates); err != nil { - return nil, err - } - - if err := s.matchStreams(ctx); err != nil { - return nil, err - } - iterators := make([]iter.EntryIterator, len(s.logReaders)) - g, ctx := errgroup.WithContext(ctx) - - for i, reader := range s.logReaders { - g.Go(func() error { - sp := trace.SpanFromContext(ctx) - sp.AddEvent("starting selectLogs in section", trace.WithAttributes( - attribute.Int("index", i), - )) - defer func() { - sp.AddEvent("selectLogs section done", trace.WithAttributes( - attribute.Int("index", i), - )) - }() - - iter, err := newEntryIterator(ctx, s.streams, reader, req) - if err != nil { - return err - } - - iterators[i] = iter - return nil - }) - } - if err := g.Wait(); err != nil { - return nil, err - } - return iter.NewSortEntryIterator(iterators, req.Direction), nil -} - -func (s *shardedObject) selectSamples(ctx context.Context, streamsPredicate streams.RowPredicate, logsPredicates []logs.RowPredicate, expr syntax.SampleExpr) (iter.SampleIterator, error) { - if err := s.setPredicate(streamsPredicate, logsPredicates); err != nil { - return nil, err - } - - if err := s.matchStreams(ctx); err != nil { - return nil, err - } - - iterators := make([]iter.SampleIterator, len(s.logReaders)) - g, ctx := errgroup.WithContext(ctx) - - for i, reader := range s.logReaders { - g.Go(func() error { - sp := trace.SpanFromContext(ctx) - sp.AddEvent("starting selectSamples in section", trace.WithAttributes( - attribute.Int("index", i), - )) - defer func() { - sp.AddEvent("selectSamples section done", trace.WithAttributes( - attribute.Int("index", i), - )) - }() - - // extractors is not thread safe, so we need to create a new one for each object - extractors, err := expr.Extractors() - if err != nil { - return err - } - iter, err := newSampleIterator(ctx, s.streams, extractors, reader) - if err != nil { - return err - } - iterators[i] = iter - return nil - }) - } - - if err := g.Wait(); err != nil { - return nil, err - } - - return iter.NewSortSampleIterator(iterators), nil -} - -func (s *shardedObject) setPredicate(streamsPredicate streams.RowPredicate, logsPredicates []logs.RowPredicate) error { - if err := s.streamReader.SetPredicate(streamsPredicate); err != nil { - return err - } - for _, reader := range s.logReaders { - if err := reader.SetPredicates(logsPredicates); err != nil { - return err - } - } - return nil -} - -func (s *shardedObject) matchStreams(ctx context.Context) error { - sp := trace.SpanFromContext(ctx) - sp.AddEvent("starting matchStreams") - defer sp.AddEvent("matchStreams done") - - streamsPtr := streamsPool.Get().(*[]streams.Stream) - defer streamsPool.Put(streamsPtr) - streams := *streamsPtr - - for { - n, err := s.streamReader.Read(ctx, streams) - if err != nil && err != io.EOF { - return err - } - if n == 0 && err == io.EOF { - break - } - - for _, stream := range streams[:n] { - s.streams[stream.ID] = stream - s.streamsIDs = append(s.streamsIDs, stream.ID) - } - } - // setup log readers to filter streams - for _, reader := range s.logReaders { - if err := reader.MatchStreams(slices.Values(s.streamsIDs)); err != nil { - return err - } - } - return nil -} - -// fetchSectionsStats retrieves section count of objects. -func fetchSectionsStats(ctx context.Context, objects []object) ([]sectionsStats, error) { - sp := trace.SpanFromContext(ctx) - sp.AddEvent("fetching metadata", trace.WithAttributes( - attribute.Int("objects", len(objects)), - )) - defer sp.AddEvent("fetched metadata") - - targetTenant, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, fmt.Errorf("extracting org ID: %w", err) - } - - res := make([]sectionsStats, 0, len(objects)) - - for _, obj := range objects { - var stats sectionsStats - - for _, section := range obj.Sections() { - if section.Tenant != targetTenant { - continue - } - switch { - case streams.CheckSection(section): - stats.StreamsSections++ - case logs.CheckSection(section): - stats.LogsSections++ - } - } - - res = append(res, stats) - } - - return res, nil -} - -type sectionsStats struct { - StreamsSections int - LogsSections int -} - -// streamPredicate creates a dataobj.StreamsPredicate from a list of matchers and a time range -func streamPredicate(matchers []*labels.Matcher, start, end time.Time) streams.RowPredicate { - var predicate streams.RowPredicate = streams.TimeRangeRowPredicate{ - StartTime: start, - EndTime: end, - IncludeStart: true, - IncludeEnd: true, - } - - // If there are any matchers, combine them with an AND predicate - if len(matchers) > 0 { - predicate = streams.AndRowPredicate{ - Left: predicate, - Right: matchersToPredicate(matchers), - } - } - return predicate -} - -// matchersToPredicate converts a list of matchers to a dataobj.StreamsPredicate -func matchersToPredicate(matchers []*labels.Matcher) streams.RowPredicate { - var left streams.RowPredicate - for _, matcher := range matchers { - var right streams.RowPredicate - switch matcher.Type { - case labels.MatchEqual: - right = streams.LabelMatcherRowPredicate{Name: matcher.Name, Value: matcher.Value} - default: - right = streams.LabelFilterRowPredicate{Name: matcher.Name, Keep: func(_, value string) bool { - return matcher.Matches(value) - }} - } - if left == nil { - left = right - } else { - left = streams.AndRowPredicate{ - Left: left, - Right: right, - } - } - } - return left -} - -func parseShards(shards []string) (logql.Shard, error) { - if len(shards) == 0 { - return noShard, nil - } - parsed, _, err := logql.ParseShards(shards) - if err != nil { - return noShard, err - } - if len(parsed) == 0 { - return noShard, nil - } - if parsed[0].Variant() != logql.PowerOfTwoVersion { - return noShard, fmt.Errorf("unsupported shard variant: %s", parsed[0].Variant()) - } - return parsed[0], nil -} - -func buildLogsPredicateFromSampleExpr(expr syntax.SampleExpr) (logs.RowPredicate, syntax.SampleExpr) { - var ( - predicate logs.RowPredicate - skip bool - ) - expr.Walk(func(e syntax.Expr) bool { - switch e := e.(type) { - case *syntax.BinOpExpr: - // we might not encounter BinOpExpr at this point since the lhs and rhs are evaluated separately? - skip = true - case *syntax.RangeAggregationExpr: - if !skip { - predicate, e.Left.Left = buildLogsPredicateFromPipeline(e.Left.Left) - } - } - return true - }) - - return predicate, expr -} - -func buildLogsPredicateFromPipeline(expr syntax.LogSelectorExpr) (logs.RowPredicate, syntax.LogSelectorExpr) { - // Check if expr is a PipelineExpr, other implementations have no stages - pipelineExpr, ok := expr.(*syntax.PipelineExpr) - if !ok { - return nil, expr - } - - var ( - predicate logs.RowPredicate - remainingStages = make([]syntax.StageExpr, 0, len(pipelineExpr.MultiStages)) - appendPredicate = func(p logs.RowPredicate) { - if predicate == nil { - predicate = p - } else { - predicate = logs.AndRowPredicate{ - Left: predicate, - Right: p, - } - } - } - ) - -Outer: - for i, stage := range pipelineExpr.MultiStages { - switch s := stage.(type) { - case *syntax.LineFmtExpr: - // modifies the log line, break early as we cannot apply any more predicates - remainingStages = append(remainingStages, pipelineExpr.MultiStages[i:]...) - break Outer - - case *syntax.LineFilterExpr: - // Convert the line filter to a predicate - f, err := s.Filter() - if err != nil { - remainingStages = append(remainingStages, s) - continue - } - - // Create a line filter predicate - appendPredicate(logs.LogMessageFilterRowPredicate{ - Keep: func(line []byte) bool { - return f.Filter(line) - }, - }) - - default: - remainingStages = append(remainingStages, s) - } - } - - if len(remainingStages) == 0 { - return predicate, pipelineExpr.Left // return MatchersExpr - } - pipelineExpr.MultiStages = remainingStages - - return predicate, pipelineExpr -} diff --git a/pkg/dataobj/querier/store_test.go b/pkg/dataobj/querier/store_test.go deleted file mode 100644 index 7b7432312e..0000000000 --- a/pkg/dataobj/querier/store_test.go +++ /dev/null @@ -1,1032 +0,0 @@ -package querier - -import ( - "cmp" - "context" - "os" - "path/filepath" - "slices" - "testing" - "time" - - "github.com/go-kit/log" - gocmp "github.com/google/go-cmp/cmp" - "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - "github.com/thanos-io/objstore" - "github.com/thanos-io/objstore/providers/filesystem" - - "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" - "github.com/grafana/loki/v3/pkg/dataobj/metastore" - "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" - "github.com/grafana/loki/v3/pkg/dataobj/uploader" - "github.com/grafana/loki/v3/pkg/iter" - "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql" - "github.com/grafana/loki/v3/pkg/logql/syntax" - "github.com/grafana/loki/v3/pkg/querier/plan" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -type sampleWithLabels struct { - Labels string - Samples logproto.Sample -} - -func TestStore_SelectSamples(t *testing.T) { - const testTenant = "test-tenant" - builder := newTestDataBuilder(t) - defer builder.close() - - ctx, _ := context.WithTimeout(t.Context(), time.Second) //nolint:govet - ctx = user.InjectOrgID(ctx, testTenant) - - // Setup test data - now := setupTestData(ctx, t, builder, testTenant) - meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil) - store := NewStore(builder.bucket, log.NewNopLogger(), meta) - - tests := []struct { - // NOTE(rfratto): Do not add tests for shards without a way to have - // consistently hashed objects! - // - // dataobj/querier is only used for testing dataobjs on the old engine, and - // will never hit production. We previously had tests for individual shards - // (which sharded based on the SHA-224 of the dataobjs), but it broke every - // time we updated the dataobj format, since the checksums would change. - // - // This meant constantly updating these tests, which was annoying. - // - // If you need to reintroduce tests for the shards, please first find a way - // to make sure the dataobj hashes are consistent (potentially via mocking) - // regardless of the fomrat so we don't have to keep updating these tests. - - name string - selector string - start time.Time - end time.Time - want []sampleWithLabels - }{ - { - name: "select all samples in range", - selector: `rate({app=~".+"}[1h])`, - start: now, - end: now.Add(time.Hour), - want: []sampleWithLabels{ - {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}}, - - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}}, - - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}}, - - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, - - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, - }, - }, - { - name: "select with time range filter", - selector: `rate({app="baz", env="prod", team="a"}[1h])`, - start: now.Add(20 * time.Second), - end: now.Add(40 * time.Second), - want: []sampleWithLabels{ - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}}, - }, - }, - { - name: "select with label matcher", - selector: `rate({app="foo"}[1h])`, - start: now, - end: now.Add(time.Hour), - want: []sampleWithLabels{ - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, - - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, - }, - }, - { - name: "select with regex matcher", - selector: `rate({app=~"foo|bar", env="prod"}[1h])`, - start: now, - end: now.Add(time.Hour), - want: []sampleWithLabels{ - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3605000000000, Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3615000000000, Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3625000000000, Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3640000000000, Value: 1}}, - - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3600000000000, Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3630000000000, Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3645000000000, Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3650000000000, Value: 1}}, - }, - }, - { - name: "select all samples in range with a filter", - selector: `count_over_time({app=~".+"} |= "bar2"[1h])`, - start: now, - end: now.Add(time.Hour), - want: []sampleWithLabels{ - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}}, - }, - }, - { - name: "select all samples in range with multiple line filters", - selector: `rate({app=~".+"} != "bar" |~ "foo[3-6]"[1h])`, - start: now, - end: now.Add(time.Hour), - want: []sampleWithLabels{ - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, - - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - it, err := store.SelectSamples(ctx, logql.SelectSampleParams{ - SampleQueryRequest: &logproto.SampleQueryRequest{ - Start: tt.start, - End: tt.end, - Plan: planFromString(tt.selector), - Selector: tt.selector, - }, - }) - require.NoError(t, err) - samples, err := readAllSamples(it) - require.NoError(t, err) - - // Sort the output by labels and timestamp; changes to how data objects - // are written can change the order of what we see, which is not what we - // care to test here. - slices.SortFunc(samples, func(a, b sampleWithLabels) int { - res := cmp.Compare(a.Labels, b.Labels) - if res == 0 { - return cmp.Compare(a.Samples.Timestamp, b.Samples.Timestamp) - } - return res - }) - - if diff := gocmp.Diff(tt.want, samples); diff != "" { - t.Errorf("samples mismatch (-want +got):\n%s", diff) - } - }) - } -} - -func TestStore_SelectLogs(t *testing.T) { - const testTenant = "test-tenant" - builder := newTestDataBuilder(t) - defer builder.close() - - ctx, _ := context.WithTimeout(t.Context(), time.Second) //nolint:govet - ctx = user.InjectOrgID(ctx, testTenant) - - // Setup test data - now := setupTestData(ctx, t, builder, testTenant) - meta := metastore.NewObjectMetastore(builder.bucket, log.NewNopLogger(), nil) - store := NewStore(builder.bucket, log.NewLogfmtLogger(os.Stdout), meta) - - tests := []struct { - // NOTE(rfratto): Do not add tests for shards without a way to have - // consistently hashed objects! - // - // dataobj/querier is only used for testing dataobjs on the old engine, and - // will never hit production. We previously had tests for individual shards - // (which sharded based on the SHA-224 of the dataobjs), but it broke every - // time we updated the dataobj format, since the checksums would change. - // - // This meant constantly updating these tests, which was annoying. - // - // If you need to reintroduce tests for the shards, please first find a way - // to make sure the dataobj hashes are consistent (potentially via mocking) - // regardless of the fomrat so we don't have to keep updating these tests. - - name string - selector string - start time.Time - end time.Time - limit uint32 - direction logproto.Direction - want []entryWithLabels - }{ - { - name: "select all logs in range", - selector: `{app=~".+"}`, - start: now, - end: now.Add(time.Hour), - limit: 100, - direction: logproto.FORWARD, - want: []entryWithLabels{ - {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}}, - {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}}, - {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}}, - - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}}, - - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}}, - - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, - - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, - }, - }, - { - name: "select with time range filter", - selector: `{app="baz", env="prod", team="a"}`, - start: now.Add(20 * time.Second), - end: now.Add(40 * time.Second), - limit: 100, - direction: logproto.FORWARD, - want: []entryWithLabels{ - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}}, - }, - }, - { - name: "select with label matcher", - selector: `{app="foo"}`, - start: now, - end: now.Add(time.Hour), - limit: 100, - direction: logproto.FORWARD, - want: []entryWithLabels{ - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, - - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, - }, - }, - { - name: "select with line filter", - selector: `{app=~".+"} |= "bar2"`, - start: now, - end: now.Add(time.Hour), - limit: 100, - direction: logproto.FORWARD, - want: []entryWithLabels{ - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}}, - }, - }, - { - name: "select with multiple line filters", - selector: `{app=~".+"} != "bar" |~ "foo[3-6]"`, - start: now, - end: now.Add(time.Hour), - limit: 100, - direction: logproto.FORWARD, - want: []entryWithLabels{ - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, - - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, - }, - }, - } - - emptyLabelAdapters := logproto.EmptyLabelAdapters() - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Make sure all empty label sets use an empty slice, rather than nil, to make assertions below easier. - for i := range tt.want { - if len(tt.want[i].Entry.Parsed) == 0 { - tt.want[i].Entry.Parsed = emptyLabelAdapters - } - - if len(tt.want[i].Entry.StructuredMetadata) == 0 { - tt.want[i].Entry.StructuredMetadata = emptyLabelAdapters - } - } - - it, err := store.SelectLogs(ctx, logql.SelectLogParams{ - QueryRequest: &logproto.QueryRequest{ - Start: tt.start, - End: tt.end, - Plan: planFromString(tt.selector), - Selector: tt.selector, - Limit: tt.limit, - Direction: tt.direction, - }, - }) - require.NoError(t, err) - entries, err := readAllEntries(it) - require.NoError(t, err) - - // Sort the output by labels; changes to how data objects are written can - // change the order of what we see, which is not what we care to test - // here. - slices.SortFunc(entries, func(a, b entryWithLabels) int { - res := cmp.Compare(a.Labels, b.Labels) - if res == 0 { - return a.Entry.Timestamp.Compare(b.Entry.Timestamp) - } - return res - }) - - if diff := gocmp.Diff(tt.want, entries); diff != "" { - t.Errorf("entries mismatch (-want +got):\n%s", diff) - } - }) - } -} - -func setupTestData(ctx context.Context, t *testing.T, builder *testDataBuilder, tenant string) time.Time { - t.Helper() - now := time.Unix(0, int64(time.Hour)).UTC() - - // Data before the query range (should not be included in results) - builder.addStream( - tenant, - `{app="foo", env="prod"}`, - logproto.Entry{Timestamp: now.Add(-2 * time.Hour), Line: "foo_before1"}, - logproto.Entry{Timestamp: now.Add(-2 * time.Hour).Add(30 * time.Second), Line: "foo_before2"}, - logproto.Entry{Timestamp: now.Add(-2 * time.Hour).Add(45 * time.Second), Line: "foo_before3"}, - ) - builder.flush(ctx) - - // Data within query range - builder.addStream( - tenant, - `{app="foo", env="prod"}`, - logproto.Entry{Timestamp: now, Line: "foo1"}, - logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}, - logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}, - logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}, - ) - builder.addStream( - tenant, - `{app="foo", env="dev"}`, - logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}, - logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}, - logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}, - ) - builder.flush(ctx) - - builder.addStream( - tenant, - `{app="bar", env="prod"}`, - logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}, - logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}, - logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}, - logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}, - ) - builder.addStream( - tenant, - `{app="bar", env="dev"}`, - logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}, - logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}, - logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}, - ) - builder.flush(ctx) - - builder.addStream( - tenant, - `{app="baz", env="prod", team="a"}`, - logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}, - logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}, - logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}, - logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}, - ) - builder.flush(ctx) - - // Data after the query range (should not be included in results) - builder.addStream( - tenant, - `{app="foo", env="prod"}`, - logproto.Entry{Timestamp: now.Add(2 * time.Hour), Line: "foo_after1"}, - logproto.Entry{Timestamp: now.Add(2 * time.Hour).Add(30 * time.Second), Line: "foo_after2"}, - logproto.Entry{Timestamp: now.Add(2 * time.Hour).Add(45 * time.Second), Line: "foo_after3"}, - ) - builder.flush(ctx) - - return now -} - -func planFromString(s string) *plan.QueryPlan { - if s == "" { - return nil - } - expr, err := syntax.ParseExpr(s) - if err != nil { - panic(err) - } - return &plan.QueryPlan{ - AST: expr, - } -} - -// testDataBuilder helps build test data for querier tests. -type testDataBuilder struct { - t *testing.T - bucket objstore.Bucket - dir string - - builder *logsobj.Builder - meta *metastore.TableOfContentsWriter - uploader *uploader.Uploader -} - -func newTestDataBuilder(t *testing.T) *testDataBuilder { - dir := t.TempDir() - bucket, err := filesystem.NewBucket(dir) - require.NoError(t, err) - - // Create required directories for metastore - metastoreDir := filepath.Join(dir, "tocs") - require.NoError(t, os.MkdirAll(metastoreDir, 0o755)) - - builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{ - TargetPageSize: 1024 * 1024, // 1MB - TargetObjectSize: 10 * 1024 * 1024, // 10MB - TargetSectionSize: 1024 * 1024, // 1MB - BufferSize: 1024 * 1024, // 1MB - - SectionStripeMergeLimit: 2, - }, nil) - require.NoError(t, err) - - meta := metastore.NewTableOfContentsWriter(bucket, log.NewNopLogger()) - require.NoError(t, meta.RegisterMetrics(prometheus.NewRegistry())) - - uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger()) - require.NoError(t, uploader.RegisterMetrics(prometheus.NewRegistry())) - - return &testDataBuilder{ - t: t, - bucket: bucket, - dir: dir, - builder: builder, - meta: meta, - uploader: uploader, - } -} - -func (b *testDataBuilder) addStream(tenant, labels string, entries ...logproto.Entry) { - err := b.builder.Append(tenant, logproto.Stream{ - Labels: labels, - Entries: entries, - }) - require.NoError(b.t, err) -} - -func (b *testDataBuilder) flush(ctx context.Context) { - timeRanges := b.builder.TimeRanges() - obj, closer, err := b.builder.Flush() - require.NoError(b.t, err) - defer closer.Close() - - // Upload the data object using the uploader - path, err := b.uploader.Upload(ctx, obj) - require.NoError(b.t, err) - - require.NoError(b.t, b.meta.WriteEntry(ctx, path, timeRanges)) - b.builder.Reset() -} - -func (b *testDataBuilder) close() { - require.NoError(b.t, b.bucket.Close()) - os.RemoveAll(b.dir) -} - -// Helper function to read all samples from an iterator -func readAllSamples(it iter.SampleIterator) ([]sampleWithLabels, error) { - var result []sampleWithLabels - defer it.Close() - for it.Next() { - sample := it.At() - result = append(result, sampleWithLabels{ - Labels: it.Labels(), - Samples: sample, - }) - } - return result, it.Err() -} - -// Helper function to read all entries from an iterator -func readAllEntries(it iter.EntryIterator) ([]entryWithLabels, error) { - var result []entryWithLabels - defer it.Close() - for it.Next() { - result = append(result, entryWithLabels{ - Labels: it.Labels(), - Entry: it.At(), - }) - } - return result, it.Err() -} - -func TestShardSections(t *testing.T) { - tests := []struct { - name string - metadatas []sectionsStats - shard logql.Shard - want [][]int - }{ - { - name: "single section, no sharding", - metadatas: []sectionsStats{ - {LogsSections: 1}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 0, - Of: 1, - }, - }, - want: [][]int{ - {0}, - }, - }, - { - name: "multiple sections, no sharding", - metadatas: []sectionsStats{ - {LogsSections: 3}, - {LogsSections: 2}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 0, - Of: 1, - }, - }, - want: [][]int{ - {0, 1, 2}, - {0, 1}, - }, - }, - { - name: "multiple sections, shard 0 of 2", - metadatas: []sectionsStats{ - {LogsSections: 3}, - {LogsSections: 2}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 0, - Of: 2, - }, - }, - want: [][]int{ - {0, 2}, - {1}, - }, - }, - { - name: "multiple sections, shard 1 of 2", - metadatas: []sectionsStats{ - {LogsSections: 3}, - {LogsSections: 2}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 1, - Of: 2, - }, - }, - want: [][]int{ - {1}, - {0}, - }, - }, - { - name: "more sections than shards, shard 0 of 2", - metadatas: []sectionsStats{ - {LogsSections: 5}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 0, - Of: 2, - }, - }, - want: [][]int{ - {0, 2, 4}, - }, - }, - { - name: "more sections than shards, shard 1 of 2", - metadatas: []sectionsStats{ - {LogsSections: 5}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 1, - Of: 2, - }, - }, - want: [][]int{ - {1, 3}, - }, - }, - { - name: "complex case, shard 0 of 4", - metadatas: []sectionsStats{ - {LogsSections: 7}, // sections 0,4 - {LogsSections: 5}, // sections 0,4 - {LogsSections: 3}, // sections 0 - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 0, - Of: 4, - }, - }, - want: [][]int{ - {0, 4}, - {1}, - {0}, - }, - }, - { - name: "complex case, shard 1 of 4", - metadatas: []sectionsStats{ - {LogsSections: 7}, // sections 1,5 - {LogsSections: 5}, // sections 1 - {LogsSections: 3}, // sections 1 - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 1, - Of: 4, - }, - }, - want: [][]int{ - {1, 5}, - {2}, - {1}, - }, - }, - { - name: "complex case, shard 2 of 4", - metadatas: []sectionsStats{ - {LogsSections: 7}, // sections 2,6 - {LogsSections: 5}, // sections 2 - {LogsSections: 3}, // sections 2 - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 2, - Of: 4, - }, - }, - want: [][]int{ - {2, 6}, - {3}, - {2}, - }, - }, - { - name: "complex case, shard 3 of 4", - metadatas: []sectionsStats{ - {LogsSections: 7}, // sections 3 - {LogsSections: 5}, // sections 3 - {LogsSections: 3}, // no sections - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 3, - Of: 4, - }, - }, - want: [][]int{ - {3}, - {0, 4}, - {}, - }, - }, - { - name: "less sections than shards, shard 1 of 4", - metadatas: []sectionsStats{ - {LogsSections: 1}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 1, - Of: 4, - }, - }, - want: [][]int{{}}, - }, - { - name: "less sections than shards, shard 0 of 4", - metadatas: []sectionsStats{ - {LogsSections: 1}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 0, - Of: 4, - }, - }, - want: [][]int{{0}}, - }, - { - name: "multiple streams sections not supported", - metadatas: []sectionsStats{ - {LogsSections: 1, StreamsSections: 2}, - }, - shard: logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: 0, - Of: 1, - }, - }, - want: [][]int{nil}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := shardSections(tt.metadatas, tt.shard) - require.Equal(t, tt.want, got) - - // For test cases with multiple shards, verify that all sections are covered exactly once - if tt.shard.PowerOfTwo != nil && tt.shard.PowerOfTwo.Of > 1 { - // Skip verification for invalid metadata - for _, meta := range tt.metadatas { - if meta.StreamsSections > 1 { - return - } - } - - // Track which sections are assigned - type sectionKey struct { - metaIdx int - section int - } - sectionCounts := make(map[sectionKey]int) - - // Count sections from all shards - for shardIdx := uint32(0); shardIdx < tt.shard.PowerOfTwo.Of; shardIdx++ { - shard := logql.Shard{ - PowerOfTwo: &index.ShardAnnotation{ - Shard: shardIdx, - Of: tt.shard.PowerOfTwo.Of, - }, - } - sections := shardSections(tt.metadatas, shard) - for metaIdx, metaSections := range sections { - for _, section := range metaSections { - key := sectionKey{metaIdx: metaIdx, section: section} - sectionCounts[key]++ - } - } - } - - // Verify each section is assigned exactly once - for metaIdx, meta := range tt.metadatas { - for section := range meta.LogsSections { - key := sectionKey{metaIdx: metaIdx, section: section} - count := sectionCounts[key] - require.Equal(t, 1, count, "section %d in metadata %d was assigned %d times", section, metaIdx, count) - } - } - } - }) - } -} - -func TestBuildLogsPredicateFromPipeline(t *testing.T) { - var evalPredicate func(p logs.RowPredicate, item []byte) bool - evalPredicate = func(p logs.RowPredicate, line []byte) bool { - switch p := p.(type) { - case logs.LogMessageFilterRowPredicate: - return p.Keep(line) - case logs.AndRowPredicate: - return evalPredicate(p.Left, line) && evalPredicate(p.Right, line) - default: - t.Fatalf("unsupported predicate type: %T", p) - return false - } - } - - // helper function to test predicates against sample data - testPredicate := func(t *testing.T, pred logs.RowPredicate, testData [][]byte, expected []bool) { - t.Helper() - require.Equal(t, len(testData), len(expected), "test data and expected results must have the same length") - - for i, line := range testData { - result := evalPredicate(pred, line) - require.Equal(t, expected[i], result, "predicate mismatch for line: %s", line) - } - } - - // Create test data sets - testData := [][]byte{ - []byte("this is an error message"), - []byte("this is a critical error"), - []byte("this is a success message"), - []byte("this is a warning message"), - } - - for _, tt := range []struct { - name string - query syntax.LogSelectorExpr - expectedExpr syntax.LogSelectorExpr - testFunc func(t *testing.T, pred logs.RowPredicate) - }{ - { - name: "single line match equal filter", - query: mustParseLogSelector(t, `{app="foo"} |= "error"`), - expectedExpr: mustParseLogSelector(t, `{app="foo"}`), // Line filter should be removed - testFunc: func(t *testing.T, pred logs.RowPredicate) { - require.NotNil(t, pred) - require.IsType(t, logs.LogMessageFilterRowPredicate{}, pred) - - // Verify the predicate works correctly - expected := []bool{true, true, false, false} - testPredicate(t, pred, testData, expected) - }, - }, - { - name: "single line match not equal filter", - query: mustParseLogSelector(t, `{app="foo"} != "error"`), - expectedExpr: mustParseLogSelector(t, `{app="foo"}`), // Line filter should be removed - testFunc: func(t *testing.T, pred logs.RowPredicate) { - require.NotNil(t, pred) - require.IsType(t, logs.LogMessageFilterRowPredicate{}, pred) - - // Verify the predicate works correctly - expected := []bool{false, false, true, true} - testPredicate(t, pred, testData, expected) - }, - }, - { - name: "multiple line filters", - query: mustParseLogSelector(t, `{app="foo"} |= "error" |= "critical"`), - expectedExpr: mustParseLogSelector(t, `{app="foo"}`), // Both line filters should be removed - testFunc: func(t *testing.T, pred logs.RowPredicate) { - require.NotNil(t, pred) - // we might expect AND predicate here, but the original expression - // only contains a single Filterer stage with chained OR filters - require.IsType(t, logs.LogMessageFilterRowPredicate{}, pred) - - // The result should match logs containing both "error" and "critical" - expected := []bool{false, true, false, false} - testPredicate(t, pred, testData, expected) - }, - }, - { - name: "line filter stage after line_format", - query: mustParseLogSelector(t, `{app="foo"} | line_format "{{.message}}" |= "error"`), - expectedExpr: mustParseLogSelector(t, `{app="foo"} | line_format "{{.message}}" |= "error"`), // No filters should be removed - testFunc: func(t *testing.T, pred logs.RowPredicate) { - // Line filters after stages that mutate the line should be ignored - require.Nil(t, pred, "expected nil predicate for line filter after parser") - }, - }, - { - name: "no line filter", - query: mustParseLogSelector(t, `{app="foo"} | json | bar>100`), - expectedExpr: mustParseLogSelector(t, `{app="foo"} | json | bar>100`), // No filters to remove - testFunc: func(t *testing.T, pred logs.RowPredicate) { - require.Nil(t, pred, "expected nil predicate for query without line filter") - }, - }, - { - name: "line filter stage after label_fmt", - query: mustParseLogSelector(t, `{app="foo"} | label_format foo=bar |= "error"`), - expectedExpr: mustParseLogSelector(t, `{app="foo"} | label_format foo=bar`), // Line filter should be removed - testFunc: func(t *testing.T, pred logs.RowPredicate) { - require.NotNil(t, pred) - require.IsType(t, logs.LogMessageFilterRowPredicate{}, pred) - - // Verify the predicate works correctly - expected := []bool{true, true, false, false} - testPredicate(t, pred, testData, expected) - }, - }, - { - name: "mixed filters with some removable", - query: mustParseLogSelector(t, `{app="foo"} |= "error" | json | status="critical" |= "critical"`), - expectedExpr: mustParseLogSelector(t, `{app="foo"} | json | status="critical"`), - testFunc: func(t *testing.T, pred logs.RowPredicate) { - require.NotNil(t, pred) - require.IsType(t, logs.AndRowPredicate{}, pred) - expected := []bool{false, true, false, false} - testPredicate(t, pred, testData, expected) - }, - }, - } { - t.Run(tt.name, func(t *testing.T) { - p, actualExpr := buildLogsPredicateFromPipeline(tt.query) - - tt.testFunc(t, p) - - // Verify the updated expression - syntax.AssertExpressions(t, tt.expectedExpr, actualExpr) - }) - } -} - -func TestBuildLogsPredicateFromSampleExpr(t *testing.T) { - for _, tt := range []struct { - name string - query syntax.SampleExpr - expectedExpr syntax.SampleExpr - testFunc func(t *testing.T, pred logs.RowPredicate) - }{ - { - name: "range aggregation with line filter", - query: mustParseSampleExpr(t, `count_over_time({app="foo"} |= "error" [5m])`), - expectedExpr: mustParseSampleExpr(t, `count_over_time({app="foo"}[5m])`), - testFunc: func(t *testing.T, p logs.RowPredicate) { - require.NotNil(t, p) - require.IsType(t, logs.LogMessageFilterRowPredicate{}, p) - }, - }, - { - name: "vector aggregation with line filter", - query: mustParseSampleExpr(t, `sum by (app)(count_over_time({app="foo"} |= "error"[5m]))`), - expectedExpr: mustParseSampleExpr(t, `sum by (app)(count_over_time({app="foo"}[5m]))`), - testFunc: func(t *testing.T, p logs.RowPredicate) { - require.NotNil(t, p) - require.IsType(t, logs.LogMessageFilterRowPredicate{}, p) - }, - }, - { - name: "binary expressions are not modified", - query: mustParseSampleExpr(t, `(count_over_time({app="foo"} |= "error"[5m]) + count_over_time({app="bar"} |= "info"[5m]))`), - expectedExpr: mustParseSampleExpr(t, `(count_over_time({app="foo"} |= "error"[5m]) + count_over_time({app="bar"} |= "info"[5m]))`), - testFunc: func(t *testing.T, p logs.RowPredicate) { - require.Nil(t, p) - }, - }, - { - name: "label replace with line filter", - query: mustParseSampleExpr(t, `label_replace(rate({app="foo"} |= "error"[5m]), "new_label", "new_value", "old_label", "old_value")`), - expectedExpr: mustParseSampleExpr(t, `label_replace(rate({app="foo"}[5m]),"new_label","new_value","old_label","old_value")`), - testFunc: func(t *testing.T, p logs.RowPredicate) { - require.NotNil(t, p) - require.IsType(t, logs.LogMessageFilterRowPredicate{}, p) - }, - }, - } { - t.Run(tt.name, func(t *testing.T) { - p, actualExpr := buildLogsPredicateFromSampleExpr(tt.query) - - tt.testFunc(t, p) - - // Verify the updated expression - syntax.AssertExpressions(t, tt.expectedExpr, actualExpr) - }) - } -} - -// Helper function to parse log selectors for test cases -func mustParseLogSelector(t *testing.T, s string) syntax.LogSelectorExpr { - expr, err := syntax.ParseLogSelector(s, false) - if err != nil { - t.Fatalf("failed to parse log selector %q: %v", s, err) - } - return expr -} - -// Helper function to parse sample expressions for test cases -func mustParseSampleExpr(t *testing.T, s string) syntax.SampleExpr { - expr, err := syntax.ParseSampleExpr(s) - if err != nil { - t.Fatalf("failed to parse sample expr %q: %v", s, err) - } - return expr -} diff --git a/pkg/logql/bench/bench_test.go b/pkg/logql/bench/bench_test.go index 07f4a174ad..b4034c09ab 100644 --- a/pkg/logql/bench/bench_test.go +++ b/pkg/logql/bench/bench_test.go @@ -36,12 +36,11 @@ var ( const testTenant = "test-tenant" const ( - StoreDataObj = "dataobj" StoreDataObjV2Engine = "dataobj-engine" StoreChunk = "chunk" ) -var allStores = []string{StoreDataObj, StoreDataObjV2Engine, StoreChunk} +var allStores = []string{StoreDataObjV2Engine, StoreChunk} //go:generate go run ./cmd/generate/main.go -size 2147483648 -dir ./data -tenant test-tenant @@ -63,15 +62,6 @@ func setupBenchmarkWithStore(tb testing.TB, storeType string) logql.Engine { } return store.engine - case StoreDataObj: - store, err := NewDataObjStore(DefaultDataDir, testTenant) - if err != nil { - tb.Fatal(err) - } - querier, err = store.Querier() - if err != nil { - tb.Fatal(err) - } case StoreChunk: store, err := NewChunkStore(DefaultDataDir, testTenant) if err != nil { diff --git a/pkg/logql/bench/store_dataobj.go b/pkg/logql/bench/store_dataobj.go index b8d34549bf..3ec62a7030 100644 --- a/pkg/logql/bench/store_dataobj.go +++ b/pkg/logql/bench/store_dataobj.go @@ -11,7 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" @@ -20,10 +19,8 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/index" "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" "github.com/grafana/loki/v3/pkg/dataobj/metastore" - "github.com/grafana/loki/v3/pkg/dataobj/querier" "github.com/grafana/loki/v3/pkg/dataobj/uploader" "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql" ) // DataObjStore implements Store using the dataobj format @@ -129,10 +126,6 @@ func (s *DataObjStore) Write(_ context.Context, streams []logproto.Stream) error return nil } -func (s *DataObjStore) Querier() (logql.Querier, error) { - return querier.NewStore(s.bucket, s.logger, metastore.NewObjectMetastore(s.bucket, s.logger, prometheus.DefaultRegisterer)), nil -} - func (s *DataObjStore) flush() error { // Reset the buffer s.buf.Reset() diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1e36bdbf0f..a4c74eafb3 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -11,7 +11,6 @@ import ( "net/http/httputil" "net/url" "os" - "sort" "strconv" "strings" "time" @@ -55,8 +54,6 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/consumer" "github.com/grafana/loki/v3/pkg/dataobj/explorer" dataobjindex "github.com/grafana/loki/v3/pkg/dataobj/index" - "github.com/grafana/loki/v3/pkg/dataobj/metastore" - dataobjquerier "github.com/grafana/loki/v3/pkg/dataobj/querier" "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/indexgateway" "github.com/grafana/loki/v3/pkg/ingester" @@ -531,37 +528,6 @@ func (t *Loki) initCodec() (services.Service, error) { return nil, nil } -func (t *Loki) getQuerierStore() (querier.Store, error) { - if !t.Cfg.DataObj.Querier.Enabled { - return t.Store, nil - } - - // verify that there's no schema with a date after the dataobj querier from date - for _, schema := range t.Cfg.SchemaConfig.Configs { - if schema.From.After(t.Cfg.DataObj.Querier.From) { - return nil, fmt.Errorf("dataobj querier From should be after the last schema date") - } - } - - store, err := t.createDataObjBucket("dataobj-querier") - if err != nil { - return nil, err - } - - logger := log.With(util_log.Logger, "component", "dataobj-querier") - storeCombiner := querier.NewStoreCombiner([]querier.StoreConfig{ - { - Store: dataobjquerier.NewStore(store, logger, metastore.NewObjectMetastore(store, logger, prometheus.DefaultRegisterer)), - From: t.Cfg.DataObj.Querier.From.Time, - }, - { - Store: t.Store, - }, - }) - - return storeCombiner, nil -} - func (t *Loki) initQuerier() (services.Service, error) { logger := log.With(util_log.Logger, "component", "querier") if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 { @@ -578,12 +544,7 @@ func (t *Loki) initQuerier() (services.Service, error) { return nil, err } - querierStore, err := t.getQuerierStore() - if err != nil { - return nil, err - } - - t.Querier, err = querier.New(t.Cfg.Querier, querierStore, t.ingesterQuerier, t.Overrides, deleteStore, logger) + t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, logger) if err != nil { return nil, err } @@ -1194,26 +1155,13 @@ func (i ingesterQueryOptions) QueryIngestersWithin() time.Duration { func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) { level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware") - schemas := t.Cfg.SchemaConfig - // Adjust schema config to use constant sharding for the timerange of dataobj querier. - if t.Cfg.DataObj.Querier.Enabled { - schemas = schemas.Clone() - schemas.Configs = append(schemas.Configs, t.Cfg.DataObj.Querier.PeriodConfig()) - sort.Slice(schemas.Configs, func(i, j int) bool { - return schemas.Configs[i].From.UnixNano() < schemas.Configs[j].From.UnixNano() - }) - for _, cfg := range schemas.Configs { - level.Debug(util_log.Logger).Log("msg", "schema config", "from", cfg.From, "row_shards", cfg.RowShards, "index_type", cfg.IndexType, "object_store", cfg.ObjectType, "schema", cfg.Schema) - } - } - middleware, stopper, err := queryrange.NewMiddleware( t.Cfg.QueryRange, t.Cfg.Querier.Engine, ingesterQueryOptions{t.Cfg.Querier}, util_log.Logger, t.Overrides, - schemas, + t.Cfg.SchemaConfig, t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled, prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace, @@ -2317,12 +2265,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLi } func (t *Loki) createRulerQueryEngine(logger log.Logger, deleteStore deletion.DeleteRequestsClient) (eng *logql.QueryEngine, err error) { - querierStore, err := t.getQuerierStore() - if err != nil { - return nil, err - } - - q, err := querier.New(t.Cfg.Querier, querierStore, t.ingesterQuerier, t.Overrides, deleteStore, logger) + q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.Overrides, deleteStore, logger) if err != nil { return nil, fmt.Errorf("could not create querier: %w", err) }