feat(dataob): Implement SelectSamples (#16251)

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
pull/16259/head
Cyril Tovena 11 months ago committed by GitHub
parent 1b1471dca0
commit 13a6c33e32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 137
      pkg/dataobj/querier/iter.go
  2. 78
      pkg/dataobj/querier/metadata.go
  3. 323
      pkg/dataobj/querier/metadata_test.go
  4. 306
      pkg/dataobj/querier/store.go
  5. 459
      pkg/dataobj/querier/store_test.go
  6. 22
      pkg/iter/sample_iterator.go
  7. 50
      pkg/iter/sample_iterator_test.go

@ -0,0 +1,137 @@
package querier
import (
"context"
"io"
"sync"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/syntax"
)
var (
	// recordsPool recycles fixed-size scratch buffers used when reading
	// batches of records out of a dataobj.LogsReader. Stored as *[]T so
	// Get/Put do not allocate a new slice header each time.
	recordsPool = sync.Pool{
		New: func() interface{} {
			records := make([]dataobj.Record, 1024)
			return &records
		},
	}
	// samplesPool recycles the backing arrays of logproto.Series.Samples.
	// Slices are handed out truncated to length 0 (see createNewSeries)
	// and returned when the owning iterator is closed.
	samplesPool = sync.Pool{
		New: func() interface{} {
			samples := make([]logproto.Sample, 0, 1024)
			return &samples
		},
	}
)
// newSampleIterator drains reader, extracts a sample from every record whose
// stream ID appears in streams, and returns a sorted iterator over the
// resulting series. Returns iter.NoopSampleIterator when nothing matched.
//
// NOTE(review): the transition logic below flushes the accumulated series
// whenever the stream ID changes, so it assumes the reader yields records
// grouped by stream ID — confirm against dataobj.LogsReader's ordering
// guarantees.
func newSampleIterator(ctx context.Context,
	streams map[int64]dataobj.Stream,
	extractor syntax.SampleExtractor,
	reader *dataobj.LogsReader,
) (iter.SampleIterator, error) {
	// Borrow a scratch record buffer for the duration of the read loop;
	// samples are copied out of it, so it is safe to return on exit.
	bufPtr := recordsPool.Get().(*[]dataobj.Record)
	defer recordsPool.Put(bufPtr)
	buf := *bufPtr

	var (
		iterators       []iter.SampleIterator
		prevStreamID    int64 = -1
		streamExtractor log.StreamSampleExtractor
		series          = map[string]*logproto.Series{}
		streamHash      uint64
	)

	for {
		n, err := reader.Read(ctx, buf)
		if err != nil && err != io.EOF {
			return nil, err
		}
		// Handle end of stream or empty read
		if n == 0 {
			iterators = appendIteratorFromSeries(iterators, series)
			break
		}
		// Process records in the current batch
		for _, record := range buf[:n] {
			stream, ok := streams[record.StreamID]
			if !ok {
				// Record belongs to a stream that was not selected; skip it.
				continue
			}
			// Handle stream transition: flush the series accumulated for the
			// previous stream and set up the extractor for the new one.
			if prevStreamID != record.StreamID {
				iterators = appendIteratorFromSeries(iterators, series)
				clear(series)
				streamExtractor = extractor.ForStream(stream.Labels)
				streamHash = streamExtractor.BaseLabels().Hash()
				prevStreamID = record.StreamID
			}
			// Process the record
			timestamp := record.Timestamp.UnixNano()
			value, parsedLabels, ok := streamExtractor.ProcessString(timestamp, record.Line, record.Metadata...)
			if !ok {
				continue
			}
			// Get or create series for the parsed labels
			labelString := parsedLabels.String()
			s, exists := series[labelString]
			if !exists {
				s = createNewSeries(labelString, streamHash)
				series[labelString] = s
			}
			// Add sample to the series
			s.Samples = append(s.Samples, logproto.Sample{
				Timestamp: timestamp,
				Value:     value,
				Hash:      0, // todo write a test to verify that we should not try to dedupe when we don't have a hash
			})
		}
	}

	if len(iterators) == 0 {
		return iter.NoopSampleIterator, nil
	}
	return iter.NewSortSampleIterator(iterators), nil
}
// createNewSeries returns a fresh logproto.Series for the given label string
// and stream hash, backed by a sample slice borrowed from samplesPool (handed
// out at length zero so prior contents are never observed).
func createNewSeries(labels string, streamHash uint64) *logproto.Series {
	buf := samplesPool.Get().(*[]logproto.Sample)
	return &logproto.Series{
		Labels:     labels,
		Samples:    (*buf)[:0],
		StreamHash: streamHash,
	}
}
// appendIteratorFromSeries appends a new SampleIterator over the accumulated
// series to iterators and returns the extended slice. The series map is left
// untouched; callers clear it themselves. When the map is empty, iterators is
// returned unchanged.
//
// The returned iterator owns the pooled sample slices; its close callback
// returns them to samplesPool.
func appendIteratorFromSeries(iterators []iter.SampleIterator, series map[string]*logproto.Series) []iter.SampleIterator {
	if len(series) == 0 {
		return iterators
	}
	seriesResult := make([]logproto.Series, 0, len(series))
	for _, s := range series {
		seriesResult = append(seriesResult, *s)
	}
	return append(iterators, iter.SampleIteratorWithClose(
		iter.NewMultiSeriesIterator(seriesResult),
		func() error {
			// Index into seriesResult rather than ranging by value: ranging
			// copies each Series struct, so &s.Samples would pool pointers to
			// per-iteration copies (one extra allocation each) instead of
			// pointers to the slice headers the iterator actually held.
			for i := range seriesResult {
				samplesPool.Put(&seriesResult[i].Samples)
			}
			return nil
		},
	))
}

@ -17,6 +17,13 @@ import (
"github.com/grafana/loki/v3/pkg/logql"
)
var streamsPool = sync.Pool{
New: func() any {
streams := make([]dataobj.Stream, 1024)
return &streams
},
}
// SelectSeries implements querier.Store
func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
objects, err := s.objectsForTimeRange(ctx, req.Start, req.End)
@ -129,13 +136,6 @@ func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, th
return values, nil
}
var streamsPool = sync.Pool{
New: func() any {
streams := make([]dataobj.Stream, 1024)
return &streams
},
}
// streamProcessor handles processing of unique series with custom collection logic
type streamProcessor struct {
predicate dataobj.StreamsPredicate
@ -146,61 +146,25 @@ type streamProcessor struct {
// newStreamProcessor creates a new streamProcessor with the given parameters
func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []*dataobj.Object, shard logql.Shard) *streamProcessor {
// Create a time range predicate
var predicate dataobj.StreamsPredicate = dataobj.TimeRangePredicate[dataobj.StreamsPredicate]{
StartTime: start,
EndTime: end,
IncludeStart: true,
IncludeEnd: true,
}
// If there are any matchers, combine them with an AND predicate
if len(matchers) > 0 {
predicate = dataobj.AndPredicate[dataobj.StreamsPredicate]{
Left: predicate,
Right: matchersToPredicate(matchers),
}
}
return &streamProcessor{
predicate: predicate,
predicate: streamPredicate(matchers, start, end),
seenSeries: &sync.Map{},
objects: objects,
shard: shard,
}
}
// matchersToPredicate converts a list of matchers to a dataobj.StreamsPredicate
func matchersToPredicate(matchers []*labels.Matcher) dataobj.StreamsPredicate {
var left dataobj.StreamsPredicate
for _, matcher := range matchers {
var right dataobj.StreamsPredicate
switch matcher.Type {
case labels.MatchEqual:
right = dataobj.LabelMatcherPredicate{Name: matcher.Name, Value: matcher.Value}
default:
right = dataobj.LabelFilterPredicate{Name: matcher.Name, Keep: func(_, value string) bool {
return matcher.Matches(value)
}}
}
if left == nil {
left = right
} else {
left = dataobj.AndPredicate[dataobj.StreamsPredicate]{
Left: left,
Right: right,
}
}
}
return left
}
// ProcessParallel processes series from multiple readers in parallel
func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func(uint64, dataobj.Stream)) error {
readers, err := shardStreamReaders(ctx, sp.objects, sp.shard)
if err != nil {
return err
}
defer func() {
for _, reader := range readers {
streamReaderPool.Put(reader)
}
}()
// set predicate on all readers
for _, reader := range readers {
@ -263,17 +227,8 @@ func labelsToSeriesIdentifier(labels labels.Labels) logproto.SeriesIdentifier {
// shardStreamReaders fetches metadata of objects in parallel and shards them into a list of StreamsReaders
func shardStreamReaders(ctx context.Context, objects []*dataobj.Object, shard logql.Shard) ([]*dataobj.StreamsReader, error) {
// fetch all metadata of objects in parallel
g, ctx := errgroup.WithContext(ctx)
metadatas := make([]dataobj.Metadata, len(objects))
for i, obj := range objects {
g.Go(func() error {
var err error
metadatas[i], err = obj.Metadata(ctx)
return err
})
}
if err := g.Wait(); err != nil {
metadatas, err := fetchMetadatas(ctx, objects)
if err != nil {
return nil, err
}
// sectionIndex tracks the global section number across all objects to ensure consistent sharding
@ -289,7 +244,8 @@ func shardStreamReaders(ctx context.Context, objects []*dataobj.Object, shard lo
continue
}
}
reader := dataobj.NewStreamsReader(objects[i], j)
reader := streamReaderPool.Get().(*dataobj.StreamsReader)
reader.Reset(objects[i], j)
readers = append(readers, reader)
sectionIndex++
}

@ -0,0 +1,323 @@
package querier
import (
"context"
"sort"
"testing"
"time"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
)
// TestStore_SelectSeries verifies that Store.SelectSeries returns the expected
// series identifiers for a range of LogQL selectors (equality, regex, negated
// matchers, and combinations), and that sharded queries partition the series
// set without overlap or loss.
func TestStore_SelectSeries(t *testing.T) {
	const testTenant = "test-tenant"
	builder := newTestDataBuilder(t, testTenant)
	defer builder.close()

	// Setup test data
	now := setupTestData(t, builder)
	store := NewStore(builder.bucket)
	ctx := user.InjectOrgID(context.Background(), testTenant)

	tests := []struct {
		name     string
		selector string
		want     []string
	}{
		{
			name:     "select all series",
			selector: ``,
			want: []string{
				`{app="foo", env="prod"}`,
				`{app="foo", env="dev"}`,
				`{app="bar", env="prod"}`,
				`{app="bar", env="dev"}`,
				`{app="baz", env="prod", team="a"}`,
			},
		},
		{
			name:     "select with equality matcher",
			selector: `{app="foo"}`,
			want: []string{
				`{app="foo", env="prod"}`,
				`{app="foo", env="dev"}`,
			},
		},
		{
			name:     "select with regex matcher",
			selector: `{app=~"foo|bar"}`,
			want: []string{
				`{app="foo", env="prod"}`,
				`{app="foo", env="dev"}`,
				`{app="bar", env="prod"}`,
				`{app="bar", env="dev"}`,
			},
		},
		{
			name:     "select with negative equality matcher",
			selector: `{app=~".+", app!="foo"}`,
			want: []string{
				`{app="bar", env="prod"}`,
				`{app="bar", env="dev"}`,
				`{app="baz", env="prod", team="a"}`,
			},
		},
		{
			name:     "select with negative regex matcher",
			selector: `{app=~".+", app!~"foo|bar"}`,
			want: []string{
				`{app="baz", env="prod", team="a"}`,
			},
		},
		{
			name:     "select with multiple matchers",
			selector: `{app="foo", env="prod"}`,
			want: []string{
				`{app="foo", env="prod"}`,
			},
		},
		{
			name:     "select with regex and equality matchers",
			selector: `{app=~"foo|bar", env="prod"}`,
			want: []string{
				`{app="foo", env="prod"}`,
				`{app="bar", env="prod"}`,
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			series, err := store.SelectSeries(ctx, logql.SelectLogParams{
				QueryRequest: &logproto.QueryRequest{
					Start:    now.Add(-time.Hour),
					End:      now.Add(time.Hour),
					Plan:     planFromString(tt.selector),
					Selector: tt.selector,
				},
			})
			require.NoError(t, err)

			var got []string
			for _, s := range series {
				got = append(got, labelsFromSeriesID(s))
			}
			// Order is not significant; only the set of series matters.
			require.ElementsMatch(t, tt.want, got)
		})
	}

	t.Run("sharding", func(t *testing.T) {
		// Query first shard
		series1, err := store.SelectSeries(ctx, logql.SelectLogParams{
			QueryRequest: &logproto.QueryRequest{
				Start:    now.Add(-time.Hour),
				End:      now.Add(time.Hour),
				Plan:     planFromString(`{app=~"foo|bar|baz"}`),
				Selector: `{app=~"foo|bar|baz"}`,
				Shards:   []string{"0_of_2"},
			},
		})
		require.NoError(t, err)
		require.NotEmpty(t, series1)
		require.Less(t, len(series1), 5) // Should get less than all series

		// Query second shard
		series2, err := store.SelectSeries(ctx, logql.SelectLogParams{
			QueryRequest: &logproto.QueryRequest{
				Start:    now.Add(-time.Hour),
				End:      now.Add(time.Hour),
				Plan:     planFromString(`{app=~"foo|bar|baz"}`),
				Selector: `{app=~"foo|bar|baz"}`,
				Shards:   []string{"1_of_2"},
			},
		})
		require.NoError(t, err)
		require.NotEmpty(t, series2)

		// Combined shards should equal all series
		var allSeries []string
		for _, s := range append(series1, series2...) {
			allSeries = append(allSeries, labelsFromSeriesID(s))
		}
		want := []string{
			`{app="foo", env="prod"}`,
			`{app="foo", env="dev"}`,
			`{app="bar", env="prod"}`,
			`{app="bar", env="dev"}`,
			`{app="baz", env="prod", team="a"}`,
		}
		require.ElementsMatch(t, want, allSeries)
	})
}
// TestStore_LabelNamesForMetricName verifies that Store.LabelNamesForMetricName
// returns the label names present on the series selected by each matcher set.
// Note that negated matchers still select series carrying the "team" label,
// hence the wider expected sets for those cases.
func TestStore_LabelNamesForMetricName(t *testing.T) {
	const testTenant = "test-tenant"
	builder := newTestDataBuilder(t, testTenant)
	defer builder.close()

	// Setup test data
	now := setupTestData(t, builder)
	store := NewStore(builder.bucket)
	ctx := user.InjectOrgID(context.Background(), testTenant)

	tests := []struct {
		name     string
		matchers []*labels.Matcher
		want     []string
	}{
		{
			name:     "no matchers",
			matchers: nil,
			want:     []string{"app", "env", "team"},
		},
		{
			name: "with equality matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			want: []string{"app", "env"},
		},
		{
			name: "with regex matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"),
			},
			want: []string{"app", "env"},
		},
		{
			name: "with negative matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"),
			},
			want: []string{"app", "env", "team"},
		},
		{
			name: "with negative regex matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"),
			},
			want: []string{"app", "env", "team"},
		},
		{
			name: "with multiple matchers",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
				labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
			},
			want: []string{"app", "env"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Metric name and shard arguments are unused by this store; the
			// query window is now +/- one hour around the test data.
			names, err := store.LabelNamesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.matchers...)
			require.NoError(t, err)
			require.ElementsMatch(t, tt.want, names)
		})
	}
}
// TestStore_LabelValuesForMetricName verifies that
// Store.LabelValuesForMetricName returns the values of a given label across
// the series selected by each matcher set. Expected values are asserted with
// require.Equal, so the store is expected to return them sorted.
func TestStore_LabelValuesForMetricName(t *testing.T) {
	const testTenant = "test-tenant"
	builder := newTestDataBuilder(t, testTenant)
	defer builder.close()

	// Setup test data
	now := setupTestData(t, builder)
	store := NewStore(builder.bucket)
	ctx := user.InjectOrgID(context.Background(), testTenant)

	tests := []struct {
		name      string
		labelName string
		matchers  []*labels.Matcher
		want      []string
	}{
		{
			name:      "app label without matchers",
			labelName: "app",
			matchers:  nil,
			want:      []string{"bar", "baz", "foo"},
		},
		{
			name:      "env label without matchers",
			labelName: "env",
			matchers:  nil,
			want:      []string{"dev", "prod"},
		},
		{
			name:      "team label without matchers",
			labelName: "team",
			matchers:  nil,
			want:      []string{"a"},
		},
		{
			name:      "env label with app equality matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app regex matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app negative matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app negative regex matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"),
			},
			want: []string{"prod"},
		},
		{
			name:      "env label with multiple matchers",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
				labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
			},
			want: []string{"prod"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Metric name and shard arguments are unused by this store.
			values, err := store.LabelValuesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.labelName, tt.matchers...)
			require.NoError(t, err)
			require.Equal(t, tt.want, values)
		})
	}
}
// labelsFromSeriesID renders a SeriesIdentifier's key/value pairs as a sorted
// Prometheus label-set string, e.g. `{app="foo", env="prod"}`.
func labelsFromSeriesID(id logproto.SeriesIdentifier) string {
	set := make(labels.Labels, 0, len(id.Labels))
	for _, pair := range id.Labels {
		set = append(set, labels.Label{Name: pair.Key, Value: pair.Value})
	}
	sort.Sort(set)
	return set.String()
}

@ -4,18 +4,23 @@ import (
"context"
"flag"
"fmt"
"io"
"slices"
"sync"
"time"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/querier"
"github.com/grafana/loki/v3/pkg/storage/chunk"
storageconfig "github.com/grafana/loki/v3/pkg/storage/config"
@ -23,7 +28,36 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)
var _ querier.Store = &Store{}
var (
_ querier.Store = &Store{}
noShard = logql.Shard{
PowerOfTwo: &index.ShardAnnotation{
Shard: uint32(0),
Of: uint32(1),
},
}
shardedObjectsPool = sync.Pool{
New: func() any {
return &shardedObject{
streams: make(map[int64]dataobj.Stream),
streamsIDs: make([]int64, 0, 1024),
logReaders: make([]*dataobj.LogsReader, 0, 16),
}
},
}
logReaderPool = sync.Pool{
New: func() any {
return &dataobj.LogsReader{}
},
}
streamReaderPool = sync.Pool{
New: func() any {
return &dataobj.StreamsReader{}
},
}
)
type Config struct {
Enabled bool `yaml:"enabled" doc:"description=Enable the dataobj querier."`
@ -42,10 +76,12 @@ func (c *Config) Validate() error {
return nil
}
// Store implements querier.Store for querying data objects.
type Store struct {
bucket objstore.Bucket
}
// NewStore creates a new Store.
func NewStore(bucket objstore.Bucket) *Store {
return &Store{
bucket: bucket,
@ -59,9 +95,22 @@ func (s *Store) SelectLogs(_ context.Context, _ logql.SelectLogParams) (iter.Ent
}
// SelectSamples implements querier.Store
func (s *Store) SelectSamples(_ context.Context, _ logql.SelectSampleParams) (iter.SampleIterator, error) {
// TODO: Implement
return iter.NoopSampleIterator, nil
func (s *Store) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
objects, err := s.objectsForTimeRange(ctx, req.Start, req.End)
if err != nil {
return nil, err
}
shard, err := parseShards(req.Shards)
if err != nil {
return nil, err
}
expr, err := req.Expr()
if err != nil {
return nil, err
}
return selectSamples(ctx, objects, shard, expr, req.Start, req.End)
}
// Stats implements querier.Store
@ -82,6 +131,7 @@ func (s *Store) GetShards(_ context.Context, _ string, _ model.Time, _ model.Tim
return &logproto.ShardsResponse{}, nil
}
// objectsForTimeRange returns data objects for the given time range.
func (s *Store) objectsForTimeRange(ctx context.Context, from, through time.Time) ([]*dataobj.Object, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -99,11 +149,249 @@ func (s *Store) objectsForTimeRange(ctx context.Context, from, through time.Time
return objects, nil
}
var noShard = logql.Shard{
PowerOfTwo: &index.ShardAnnotation{
Shard: uint32(1),
Of: uint32(1),
},
// selectSamples builds a sorted sample iterator over the given objects for the
// shard and time range, evaluating expr against every matching stream.
// Objects are sharded by logs section, processed concurrently, and their
// pooled resources are returned once all per-object iterators have been built.
//
// NOTE(review): the deferred reset/Put below relies on every reader being
// fully consumed before this function returns (newSampleIterator drains its
// reader eagerly) — confirm this invariant holds if iteration ever becomes
// lazy.
func selectSamples(ctx context.Context, objects []*dataobj.Object, shard logql.Shard, expr syntax.SampleExpr, start, end time.Time) (iter.SampleIterator, error) {
	selector, err := expr.Selector()
	if err != nil {
		return nil, err
	}

	shardedObjects, err := shardObjects(ctx, objects, shard)
	if err != nil {
		return nil, err
	}
	defer func() {
		for _, obj := range shardedObjects {
			obj.reset()
			shardedObjectsPool.Put(obj)
		}
	}()

	// Streams are filtered by label matchers and the query window.
	streamsPredicate := streamPredicate(selector.Matchers(), start, end)
	// TODO: support more predicates and combine with log.Pipeline.
	logsPredicate := dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
		StartTime:    start,
		EndTime:      end,
		IncludeStart: true,
		IncludeEnd:   false,
	}

	// One goroutine per sharded object; results land at fixed indexes so no
	// synchronization beyond the errgroup is needed.
	g, ctx := errgroup.WithContext(ctx)
	iterators := make([]iter.SampleIterator, len(shardedObjects))

	for i, obj := range shardedObjects {
		g.Go(func() error {
			iterator, err := obj.selectSamples(ctx, streamsPredicate, logsPredicate, expr)
			if err != nil {
				return err
			}
			iterators[i] = iterator
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return iter.NewSortSampleIterator(iterators), nil
}
// shardedObject groups the readers of a single data object that belong to the
// current shard: one streams reader plus one logs reader per selected logs
// section. Instances are pooled (shardedObjectsPool) and recycled via reset.
type shardedObject struct {
	streamReader *dataobj.StreamsReader
	logReaders   []*dataobj.LogsReader

	// streamsIDs and streams record the streams matched by streamReader;
	// they are used to restrict the log readers to those streams.
	streamsIDs []int64
	streams    map[int64]dataobj.Stream
}
// shardObjects fetches object metadata in parallel and assigns each logs
// section to a shard, returning one pooled shardedObject per object that owns
// at least one section of the requested shard. Objects with no matching
// sections produce no entry.
func shardObjects(
	ctx context.Context,
	objects []*dataobj.Object,
	shard logql.Shard,
) ([]*shardedObject, error) {
	metadatas, err := fetchMetadatas(ctx, objects)
	if err != nil {
		return nil, err
	}

	// sectionIndex tracks the global section number across all objects to ensure consistent sharding
	var sectionIndex uint64
	shardedReaders := make([]*shardedObject, 0, len(objects))

	for i, metadata := range metadatas {
		var reader *shardedObject
		for j := 0; j < metadata.LogsSections; j++ {
			// Skip sections that belong to other shards; a power-of-two
			// shard K_of_N keeps sections where index % N == K.
			if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 {
				if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) {
					sectionIndex++
					continue
				}
			}
			// Lazily acquire the shardedObject (and its streams reader) the
			// first time a section of this object is kept.
			if reader == nil {
				reader = shardedObjectsPool.Get().(*shardedObject)
				reader.streamReader = streamReaderPool.Get().(*dataobj.StreamsReader)
				reader.streamReader.Reset(objects[i], j)
			}
			logReader := logReaderPool.Get().(*dataobj.LogsReader)
			logReader.Reset(objects[i], j)
			reader.logReaders = append(reader.logReaders, logReader)

			sectionIndex++
		}
		// if reader is not nil, it means we have at least one log reader
		if reader != nil {
			shardedReaders = append(shardedReaders, reader)
		}
	}

	return shardedReaders, nil
}
// reset returns the readers held by s to their pools and clears all state so
// the shardedObject itself can be recycled.
func (s *shardedObject) reset() {
	streamReaderPool.Put(s.streamReader)
	s.streamReader = nil
	for i := range s.logReaders {
		logReaderPool.Put(s.logReaders[i])
		s.logReaders[i] = nil
	}
	s.logReaders = s.logReaders[:0]
	s.streamsIDs = s.streamsIDs[:0]
	clear(s.streams)
}
// selectSamples applies the predicates, resolves the matching streams, then
// evaluates expr over every logs reader concurrently and returns a sorted
// iterator over all resulting samples.
func (s *shardedObject) selectSamples(ctx context.Context, streamsPredicate dataobj.StreamsPredicate, logsPredicate dataobj.LogsPredicate, expr syntax.SampleExpr) (iter.SampleIterator, error) {
	if err := s.setPredicate(streamsPredicate, logsPredicate); err != nil {
		return nil, err
	}

	if err := s.matchStreams(ctx); err != nil {
		return nil, err
	}

	// One goroutine per logs reader; each writes to its own slot.
	iterators := make([]iter.SampleIterator, len(s.logReaders))
	g, ctx := errgroup.WithContext(ctx)

	for i, reader := range s.logReaders {
		g.Go(func() error {
			// extractor is not thread safe, so we need to create a new one for each object
			extractor, err := expr.Extractor()
			if err != nil {
				return err
			}
			// Named "it" (not "iter") so the imported iter package is not
			// shadowed inside this closure.
			it, err := newSampleIterator(ctx, s.streams, extractor, reader)
			if err != nil {
				return err
			}
			iterators[i] = it
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return iter.NewSortSampleIterator(iterators), nil
}
// setPredicate installs the streams predicate on the stream reader and the
// logs predicate on every logs reader, stopping at the first error.
func (s *shardedObject) setPredicate(streamsPredicate dataobj.StreamsPredicate, logsPredicate dataobj.LogsPredicate) error {
	if err := s.streamReader.SetPredicate(streamsPredicate); err != nil {
		return err
	}
	for _, r := range s.logReaders {
		err := r.SetPredicate(logsPredicate)
		if err != nil {
			return err
		}
	}
	return nil
}
// matchStreams drains the stream reader (which has the streams predicate set),
// records every matching stream in s.streams/s.streamsIDs, and restricts each
// logs reader to those stream IDs.
func (s *shardedObject) matchStreams(ctx context.Context) error {
	// Borrow a scratch stream buffer for the duration of the read loop.
	streamsPtr := streamsPool.Get().(*[]dataobj.Stream)
	defer streamsPool.Put(streamsPtr)
	streams := *streamsPtr

	for {
		n, err := s.streamReader.Read(ctx, streams)
		if err != nil && err != io.EOF {
			return err
		}
		if n == 0 {
			break
		}
		for _, stream := range streams[:n] {
			s.streams[stream.ID] = stream
			s.streamsIDs = append(s.streamsIDs, stream.ID)
		}
	}
	// setup log readers to filter streams
	for _, reader := range s.logReaders {
		if err := reader.MatchStreams(slices.Values(s.streamsIDs)); err != nil {
			return err
		}
	}
	return nil
}
// fetchMetadatas resolves the metadata of every object concurrently and
// returns the results in the same order as the input slice.
func fetchMetadatas(ctx context.Context, objects []*dataobj.Object) ([]dataobj.Metadata, error) {
	metadatas := make([]dataobj.Metadata, len(objects))
	g, ctx := errgroup.WithContext(ctx)
	for idx := range objects {
		obj := objects[idx]
		g.Go(func() error {
			var err error
			metadatas[idx], err = obj.Metadata(ctx)
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return metadatas, nil
}
// streamPredicate builds a dataobj.StreamsPredicate selecting streams inside
// the [start, end] window (both bounds inclusive), additionally constrained by
// the label matchers when any are given.
func streamPredicate(matchers []*labels.Matcher, start, end time.Time) dataobj.StreamsPredicate {
	timeRange := dataobj.TimeRangePredicate[dataobj.StreamsPredicate]{
		StartTime:    start,
		EndTime:      end,
		IncludeStart: true,
		IncludeEnd:   true,
	}
	if len(matchers) == 0 {
		return timeRange
	}
	// AND the matcher predicate with the time range.
	return dataobj.AndPredicate[dataobj.StreamsPredicate]{
		Left:  timeRange,
		Right: matchersToPredicate(matchers),
	}
}
// matchersToPredicate folds a list of label matchers into a single
// dataobj.StreamsPredicate by AND-ing them together. Equality matchers map to
// LabelMatcherPredicate; every other matcher type falls back to a
// LabelFilterPredicate driven by Matcher.Matches. Returns nil for an empty
// matcher list.
func matchersToPredicate(matchers []*labels.Matcher) dataobj.StreamsPredicate {
	var combined dataobj.StreamsPredicate
	for _, m := range matchers {
		var p dataobj.StreamsPredicate
		if m.Type == labels.MatchEqual {
			p = dataobj.LabelMatcherPredicate{Name: m.Name, Value: m.Value}
		} else {
			p = dataobj.LabelFilterPredicate{Name: m.Name, Keep: func(_, value string) bool {
				return m.Matches(value)
			}}
		}
		if combined == nil {
			combined = p
			continue
		}
		combined = dataobj.AndPredicate[dataobj.StreamsPredicate]{
			Left:  combined,
			Right: p,
		}
	}
	return combined
}
func parseShards(shards []string) (logql.Shard, error) {

@ -5,15 +5,13 @@ import (
"context"
"os"
"path/filepath"
"sort"
"testing"
"time"
"github.com/go-kit/log"
"github.com/google/go-cmp/cmp"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
@ -21,379 +19,229 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/querier/plan"
)
func TestStore_SelectSeries(t *testing.T) {
func TestStore_SelectSamples(t *testing.T) {
const testTenant = "test-tenant"
builder := newTestDataBuilder(t, testTenant)
defer builder.close()
// Setup test data
now := setupTestData(t, builder)
store := NewStore(builder.bucket)
ctx := user.InjectOrgID(context.Background(), testTenant)
tests := []struct {
name string
selector string
want []string
start time.Time
end time.Time
shards []string
want []sampleWithLabels
}{
{
name: "select all series",
selector: ``,
want: []string{
`{app="foo", env="prod"}`,
`{app="foo", env="dev"}`,
`{app="bar", env="prod"}`,
`{app="bar", env="dev"}`,
`{app="baz", env="prod", team="a"}`,
name: "select all samples in range",
selector: `rate({app=~".+"}[1h])`,
start: now,
end: now.Add(time.Hour),
want: []sampleWithLabels{
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}},
},
},
{
name: "select with equality matcher",
selector: `{app="foo"}`,
want: []string{
`{app="foo", env="prod"}`,
`{app="foo", env="dev"}`,
name: "select with time range filter",
selector: `rate({app="baz", env="prod", team="a"}[1h])`,
start: now.Add(20 * time.Second),
end: now.Add(40 * time.Second),
want: []sampleWithLabels{
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}},
},
},
{
name: "select with regex matcher",
selector: `{app=~"foo|bar"}`,
want: []string{
`{app="foo", env="prod"}`,
`{app="foo", env="dev"}`,
`{app="bar", env="prod"}`,
`{app="bar", env="dev"}`,
name: "select with label matcher",
selector: `rate({app="foo"}[1h])`,
start: now,
end: now.Add(time.Hour),
want: []sampleWithLabels{
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}},
},
},
{
name: "select with negative equality matcher",
selector: `{app=~".+", app!="foo"}`,
want: []string{
`{app="bar", env="prod"}`,
`{app="bar", env="dev"}`,
`{app="baz", env="prod", team="a"}`,
name: "select with regex matcher",
selector: `rate({app=~"foo|bar", env="prod"}[1h])`,
start: now,
end: now.Add(time.Hour),
want: []sampleWithLabels{
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3600000000000, Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3605000000000, Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3615000000000, Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3625000000000, Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3630000000000, Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3640000000000, Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3645000000000, Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3650000000000, Value: 1}},
},
},
{
name: "select with negative regex matcher",
selector: `{app=~".+", app!~"foo|bar"}`,
want: []string{
`{app="baz", env="prod", team="a"}`,
name: "select first shard",
selector: `rate({app=~".+"}[1h])`,
start: now,
end: now.Add(time.Hour),
shards: []string{"0_of_2"},
want: []sampleWithLabels{
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}},
},
},
{
name: "select with multiple matchers",
selector: `{app="foo", env="prod"}`,
want: []string{
`{app="foo", env="prod"}`,
name: "select second shard",
selector: `rate({app=~".+"}[1h])`,
start: now,
end: now.Add(time.Hour),
shards: []string{"1_of_2"},
want: []sampleWithLabels{
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}},
},
},
{
name: "select with regex and equality matchers",
selector: `{app=~"foo|bar", env="prod"}`,
want: []string{
`{app="foo", env="prod"}`,
`{app="bar", env="prod"}`,
name: "select all samples in range with a filter",
selector: `count_over_time({app=~".+"} |= "bar2"[1h])`,
start: now,
end: now.Add(time.Hour),
want: []sampleWithLabels{
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
series, err := store.SelectSeries(ctx, logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Start: now.Add(-time.Hour),
End: now.Add(time.Hour),
it, err := store.SelectSamples(ctx, logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Start: tt.start,
End: tt.end,
Plan: planFromString(tt.selector),
Selector: tt.selector,
Shards: tt.shards,
},
})
require.NoError(t, err)
var got []string
for _, s := range series {
got = append(got, labelsFromSeriesID(s))
}
require.ElementsMatch(t, tt.want, got)
})
}
t.Run("sharding", func(t *testing.T) {
// Query first shard
series1, err := store.SelectSeries(ctx, logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Start: now.Add(-time.Hour),
End: now.Add(time.Hour),
Plan: planFromString(`{app=~"foo|bar|baz"}`),
Selector: `{app=~"foo|bar|baz"}`,
Shards: []string{"0_of_2"},
},
})
require.NoError(t, err)
require.NotEmpty(t, series1)
require.Less(t, len(series1), 5) // Should get less than all series
// Query second shard
series2, err := store.SelectSeries(ctx, logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Start: now.Add(-time.Hour),
End: now.Add(time.Hour),
Plan: planFromString(`{app=~"foo|bar|baz"}`),
Selector: `{app=~"foo|bar|baz"}`,
Shards: []string{"1_of_2"},
},
})
require.NoError(t, err)
require.NotEmpty(t, series2)
// Combined shards should equal all series
var allSeries []string
for _, s := range append(series1, series2...) {
allSeries = append(allSeries, labelsFromSeriesID(s))
}
want := []string{
`{app="foo", env="prod"}`,
`{app="foo", env="dev"}`,
`{app="bar", env="prod"}`,
`{app="bar", env="dev"}`,
`{app="baz", env="prod", team="a"}`,
}
require.ElementsMatch(t, want, allSeries)
})
}
// TestStore_LabelNamesForMetricName verifies that the store returns the
// correct set of label names for various matcher combinations against
// the shared fixture data built by setupTestData.
func TestStore_LabelNamesForMetricName(t *testing.T) {
	const testTenant = "test-tenant"

	builder := newTestDataBuilder(t, testTenant)
	defer builder.close()

	// Populate the object store with the shared fixture streams.
	now := setupTestData(t, builder)

	store := NewStore(builder.bucket)
	ctx := user.InjectOrgID(context.Background(), testTenant)

	testCases := []struct {
		name     string
		matchers []*labels.Matcher
		want     []string
	}{
		{
			name:     "no matchers",
			matchers: nil,
			want:     []string{"app", "env", "team"},
		},
		{
			name: "with equality matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			want: []string{"app", "env"},
		},
		{
			name: "with regex matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"),
			},
			want: []string{"app", "env"},
		},
		{
			name: "with negative matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"),
			},
			want: []string{"app", "env", "team"},
		},
		{
			name: "with negative regex matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"),
			},
			want: []string{"app", "env", "team"},
		},
		{
			name: "with multiple matchers",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
				labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
			},
			want: []string{"app", "env"},
		},
	}

	// Query one hour either side of the fixture timestamps so every
	// in-range stream is covered.
	start := model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano())
	end := model.TimeFromUnixNano(now.Add(time.Hour).UnixNano())

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			names, err := store.LabelNamesForMetricName(ctx, "", start, end, "", tc.matchers...)
			require.NoError(t, err)
			require.ElementsMatch(t, tc.want, names)
		})
	}
}
// TestStore_LabelValuesForMetricName verifies that the store returns the
// correct values for a given label name under various matcher
// combinations against the shared fixture data built by setupTestData.
func TestStore_LabelValuesForMetricName(t *testing.T) {
	const testTenant = "test-tenant"
	builder := newTestDataBuilder(t, testTenant)
	defer builder.close()
	// Setup test data
	now := setupTestData(t, builder)
	store := NewStore(builder.bucket)
	ctx := user.InjectOrgID(context.Background(), testTenant)
	tests := []struct {
		name      string
		labelName string
		matchers  []*labels.Matcher
		want      []string
	}{
		{
			name:      "app label without matchers",
			labelName: "app",
			matchers:  nil,
			want:      []string{"bar", "baz", "foo"},
		},
		{
			name:      "env label without matchers",
			labelName: "env",
			matchers:  nil,
			want:      []string{"dev", "prod"},
		},
		{
			name:      "team label without matchers",
			labelName: "team",
			matchers:  nil,
			want:      []string{"a"},
		},
		{
			name:      "env label with app equality matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app regex matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app negative matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app negative regex matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"),
			},
			want: []string{"prod"},
		},
		{
			name:      "env label with multiple matchers",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
				labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
			},
			want: []string{"prod"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// NOTE(review): the original body contained spliced-in lines
			// from an unrelated SelectSamples test (`readAllSamples(it)`,
			// `cmp.Diff`) referencing identifiers that do not exist in
			// this scope; they have been removed.
			values, err := store.LabelValuesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.labelName, tt.matchers...)
			require.NoError(t, err)
			require.Equal(t, tt.want, values)
		})
	}
}
// setupTestData writes a fixed set of streams to the builder, spread
// across several flushed objects, and returns the reference time that
// all in-range entries are anchored to.
//
// Layout:
//   - one object with app=foo data *before* the query range
//   - one object with two app=foo streams inside the range
//   - one object with two app=bar streams inside the range
//   - one object with one app=baz stream inside the range
func setupTestData(t *testing.T, builder *testDataBuilder) time.Time {
	t.Helper()
	// Use a fixed instant so sample timestamps are deterministic across
	// test runs. (The original also declared `now := time.Now()` here,
	// which was a `:=` redeclaration compile error; the deterministic
	// value is kept.)
	now := time.Unix(0, int64(time.Hour))
	// Data before the query range (should not be included in results)
	builder.addStream(
		`{app="foo", env="prod"}`,
		logproto.Entry{Timestamp: now.Add(-2 * time.Hour), Line: "foo_before1"},
		logproto.Entry{Timestamp: now.Add(-2 * time.Hour).Add(30 * time.Second), Line: "foo_before2"},
		logproto.Entry{Timestamp: now.Add(-2 * time.Hour).Add(45 * time.Second), Line: "foo_before3"},
	)
	builder.flush()
	// First object with app=foo series
	// Data within query range
	builder.addStream(
		`{app="foo", env="prod"}`,
		logproto.Entry{Timestamp: now, Line: "foo1"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "foo2"},
		logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"},
		logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"},
		logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"},
	)
	builder.addStream(
		`{app="foo", env="dev"}`,
		logproto.Entry{Timestamp: now, Line: "foo3"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "foo4"},
		logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"},
		logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"},
		logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"},
	)
	builder.flush()
	// Second object with app=bar series
	builder.addStream(
		`{app="bar", env="prod"}`,
		logproto.Entry{Timestamp: now, Line: "bar1"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "bar2"},
		logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"},
		logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"},
		logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"},
		logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"},
	)
	builder.addStream(
		`{app="bar", env="dev"}`,
		logproto.Entry{Timestamp: now, Line: "bar3"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "bar4"},
		logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"},
		logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"},
		logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"},
	)
	builder.flush()
	// Third object with app=baz series
	builder.addStream(
		`{app="baz", env="prod", team="a"}`,
		logproto.Entry{Timestamp: now, Line: "baz1"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "baz2"},
		logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"},
		logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"},
		logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"},
		logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"},
	)
	builder.flush()
	return now
}
// labelsFromSeriesID renders a series identifier as its canonical
// (sorted) label-set string, e.g. `{app="foo", env="prod"}`.
func labelsFromSeriesID(id logproto.SeriesIdentifier) string {
	out := make(labels.Labels, len(id.Labels))
	for i, entry := range id.Labels {
		out[i] = labels.Label{Name: entry.Key, Value: entry.Value}
	}
	sort.Sort(out)
	return out.String()
}
// mustParseSeriesID parses a label-set string into a SeriesIdentifier,
// panicking on malformed input. Test helper only.
func mustParseSeriesID(s string) logproto.SeriesIdentifier {
	parsed, err := syntax.ParseLabels(s)
	if err != nil {
		panic(err)
	}
	return logproto.SeriesIdentifier{Labels: labelsToSeriesLabels(parsed)}
}
// Data after the query range (should not be included in results)
builder.addStream(
`{app="foo", env="prod"}`,
logproto.Entry{Timestamp: now.Add(2 * time.Hour), Line: "foo_after1"},
logproto.Entry{Timestamp: now.Add(2 * time.Hour).Add(30 * time.Second), Line: "foo_after2"},
logproto.Entry{Timestamp: now.Add(2 * time.Hour).Add(45 * time.Second), Line: "foo_after3"},
)
builder.flush()
// labelsToSeriesLabels converts a prometheus label set into the
// protobuf key/value entries used by logproto.SeriesIdentifier.
//
// The original body contained a stray `return now` after the real
// return statement — unreachable and referencing an undefined
// identifier (diff-splice residue) — which has been removed.
func labelsToSeriesLabels(ls labels.Labels) []logproto.SeriesIdentifier_LabelsEntry {
	entries := make([]logproto.SeriesIdentifier_LabelsEntry, 0, len(ls))
	for _, l := range ls {
		entries = append(entries, logproto.SeriesIdentifier_LabelsEntry{
			Key:   l.Name,
			Value: l.Value,
		})
	}
	return entries
}
func planFromString(s string) *plan.QueryPlan {
@ -483,3 +331,22 @@ func (b *testDataBuilder) close() {
require.NoError(b.t, b.bucket.Close())
os.RemoveAll(b.dir)
}
// sampleWithLabels pairs a single extracted sample with the label-set
// string of the series it came from, for easy comparison in tests.
type sampleWithLabels struct {
	// Labels is the series label-set string as reported by the iterator.
	Labels string
	// Samples is the individual sample read at that position.
	Samples logproto.Sample
}
// readAllSamples drains the iterator, collecting every sample together
// with its series labels. The iterator is always closed before
// returning; the iterator's own error (if any) is returned alongside
// whatever was collected.
func readAllSamples(it iter.SampleIterator) ([]sampleWithLabels, error) {
	defer it.Close()

	var collected []sampleWithLabels
	for it.Next() {
		collected = append(collected, sampleWithLabels{
			Labels:  it.Labels(),
			Samples: it.At(),
		})
	}
	return collected, it.Err()
}

@ -253,11 +253,13 @@ Outer:
heap.Pop(i.heap)
previous := i.buffer
var dupe bool
for _, t := range previous {
if t.Sample.Hash == sample.Hash {
i.stats.AddDuplicates(1)
dupe = true
break
if sample.Hash != 0 {
for _, t := range previous {
if t.Sample.Hash == sample.Hash {
i.stats.AddDuplicates(1)
dupe = true
break
}
}
}
if !dupe {
@ -277,10 +279,12 @@ Outer:
sample.Timestamp != i.buffer[0].Timestamp {
break
}
for _, t := range previous {
if t.Hash == sample.Hash {
i.stats.AddDuplicates(1)
continue inner
if sample.Hash != 0 {
for _, t := range previous {
if t.Hash == sample.Hash {
i.stats.AddDuplicates(1)
continue inner
}
}
}
i.buffer = append(i.buffer, sampleWithLabels{

@ -491,3 +491,53 @@ func TestDedupeMergeSampleIterator(t *testing.T) {
require.Equal(t, 1., it.At().Value)
require.Equal(t, xxhash.Sum64String("3"), it.At().Hash)
}
// TestMergeSampleIteratorZeroHash checks the dedupe semantics of the
// merge iterator around the zero hash: samples with Hash == 0 at the
// same timestamp must all be emitted (never deduplicated against each
// other), while an identical sample with a non-zero hash appearing in
// both inputs must be emitted exactly once.
func TestMergeSampleIteratorZeroHash(t *testing.T) {
	series1 := logproto.Series{
		Labels:     `{foo="bar"}`,
		StreamHash: hashLabels(`{foo="bar"}`),
		Samples: []logproto.Sample{
			{Timestamp: 1, Value: 1.0, Hash: 0},  // zero hash
			{Timestamp: 1, Value: 2.0, Hash: 0},  // zero hash, same timestamp
			{Timestamp: 2, Value: 3.0, Hash: 42}, // non-zero hash
		},
	}
	series2 := logproto.Series{
		Labels:     `{foo="bar"}`,
		StreamHash: hashLabels(`{foo="bar"}`),
		Samples: []logproto.Sample{
			{Timestamp: 1, Value: 4.0, Hash: 0},  // zero hash, same timestamp
			{Timestamp: 2, Value: 3.0, Hash: 42}, // duplicate of series1's sample
		},
	}

	it := NewMergeSampleIterator(context.Background(), []SampleIterator{
		NewSeriesIterator(series1),
		NewSeriesIterator(series2),
	})

	// All three zero-hash samples at timestamp 1 survive; the non-zero
	// hash sample at timestamp 2 is deduplicated down to one.
	expected := []logproto.Sample{
		{Timestamp: 1, Value: 1.0, Hash: 0},
		{Timestamp: 1, Value: 2.0, Hash: 0},
		{Timestamp: 1, Value: 4.0, Hash: 0},
		{Timestamp: 2, Value: 3.0, Hash: 42},
	}
	for _, want := range expected {
		require.True(t, it.Next())
		require.Equal(t, `{foo="bar"}`, it.Labels())
		require.Equal(t, want, it.At())
	}

	// Iterator is exhausted with no error.
	require.False(t, it.Next())
	require.NoError(t, it.Err())
	require.NoError(t, it.Close())
}

Loading…
Cancel
Save