From 613116e3714fda2829952f084fdc6fa508d2daa6 Mon Sep 17 00:00:00 2001 From: benclive Date: Wed, 30 Jul 2025 11:37:02 +0100 Subject: [PATCH] chore: Fix empty matchers returning all entries in Metastore Section lookup (#18642) --- pkg/dataobj/metastore/object.go | 36 +++++++---- pkg/dataobj/metastore/object_test.go | 89 ++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 13 deletions(-) diff --git a/pkg/dataobj/metastore/object.go b/pkg/dataobj/metastore/object.go index df3226f97e..8ab9cdfcd3 100644 --- a/pkg/dataobj/metastore/object.go +++ b/pkg/dataobj/metastore/object.go @@ -229,17 +229,19 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma } initialSectionPointersCount := len(streamSectionPointers) - // Search the section AMQs to estimate sections that might match the predicates - // AMQs may return false positives so this is an over-estimate. - pointerMatchers := pointerPredicateFromMatchers(predicates...) - sectionMembershipEstimates, err := m.estimateSectionsForPredicates(ctx, paths, pointerMatchers) - if err != nil { - return nil, err - } + if len(predicates) > 0 { + // Search the section AMQs to estimate sections that might match the predicates + // AMQs may return false positives so this is an over-estimate. + pointerMatchers := pointerPredicateFromMatchers(predicates...) + sectionMembershipEstimates, err := m.estimateSectionsForPredicates(ctx, paths, pointerMatchers) + if err != nil { + return nil, err + } - streamSectionPointers = intersectSections(streamSectionPointers, sectionMembershipEstimates) - if len(streamSectionPointers) == 0 { - return nil, errors.New("no relevant sections returned") + streamSectionPointers = intersectSections(streamSectionPointers, sectionMembershipEstimates) + if len(streamSectionPointers) == 0 { + return nil, errors.New("no relevant sections returned") + } } duration := sectionsTimer.ObserveDuration() @@ -381,9 +383,7 @@ func streamPredicateFromMatchers(start, end time.Time, matchers ...*labels.Match return predicates[0] } - current := streams.AndRowPredicate{ - Left: predicates[0], - } + current := predicates[0] for _, predicate := range predicates[1:] { and := streams.AndRowPredicate{ @@ -513,6 +513,11 @@ func (m *ObjectMetastore) listStreamIDsFromObjects(ctx context.Context, paths [] // getSectionsForStreams reads the section data from matching streams and aggregates them into section descriptors. // This is an exact lookup and includes metadata from the streams in each section: the stream IDs, the min-max timestamps, the number of bytes & number of lines. func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, paths []string, streamPredicate streams.RowPredicate, timeRangePredicate pointers.TimeRangeRowPredicate) ([]*DataobjSectionDescriptor, error) { + if streamPredicate == nil { + // At least one stream matcher is required, currently. + return nil, nil + } + timer := prometheus.NewTimer(m.metrics.streamFilterTotalDuration) defer timer.ObserveDuration() @@ -541,6 +546,11 @@ func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, paths []str } streamReadTimer.ObserveDuration() + if len(matchingStreamIDs) == 0 { + // No streams match, so skip reading the section pointers or we'll match all of them. + return nil + } + objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor) sectionPointerReadTimer := prometheus.NewTimer(m.metrics.streamFilterPointersReadDuration) err = forEachObjPointer(ctx, idxObject, timeRangePredicate, matchingStreamIDs, func(pointer pointers.SectionPointer) { diff --git a/pkg/dataobj/metastore/object_test.go b/pkg/dataobj/metastore/object_test.go index 9e61f1a8c1..1c2f6bd6cc 100644 --- a/pkg/dataobj/metastore/object_test.go +++ b/pkg/dataobj/metastore/object_test.go @@ -16,8 +16,11 @@ import ( "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj" + "github.com/grafana/loki/v3/pkg/dataobj/index/indexobj" + "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" "github.com/grafana/loki/v3/pkg/dataobj/uploader" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" ) const ( @@ -243,6 +246,92 @@ func TestValuesEmptyMatcher(t *testing.T) { }) } +func TestSectionsForStreamMatchers(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), tenantID) + + builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + TargetPageSize: 1024 * 1024, + TargetObjectSize: 10 * 1024 * 1024, + TargetSectionSize: 128, + BufferSize: 1024 * 1024, + SectionStripeMergeLimit: 2, + }) + require.NoError(t, err) + + for i, ts := range testStreams { + lbls, err := syntax.ParseLabels(ts.Labels) + require.NoError(t, err) + + newIdx, err := builder.AppendStream(streams.Stream{ + ID: int64(i), + Labels: lbls, + MinTimestamp: ts.Entries[0].Timestamp, + MaxTimestamp: ts.Entries[0].Timestamp, + UncompressedSize: 0, + }) + require.NoError(t, err) + err = builder.ObserveLogLine("test-path", 0, newIdx, int64(i), ts.Entries[0].Timestamp, int64(len(ts.Entries[0].Line))) + require.NoError(t, err) + } + + buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) + stats, err := builder.Flush(buf) + require.NoError(t, err) + + bucket := objstore.NewInMemBucket() + + uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, tenantID, log.NewNopLogger()) + require.NoError(t, uploader.RegisterMetrics(prometheus.NewPedanticRegistry())) + + path, err := uploader.Upload(context.Background(), buf) + require.NoError(t, err) + + metastoreUpdater := NewUpdater(UpdaterConfig{}, bucket, tenantID, log.NewNopLogger()) + + err = metastoreUpdater.Update(context.Background(), path, stats.MinTimestamp, stats.MaxTimestamp) + require.NoError(t, err) + + mstore := NewObjectMetastore(bucket, log.NewNopLogger(), prometheus.NewPedanticRegistry()) + + tests := []struct { + name string + matchers []*labels.Matcher + predicates []*labels.Matcher + wantCount int + }{ + { + name: "no matchers returns no sections", + matchers: nil, + predicates: nil, + wantCount: 0, + }, + { + name: "single matcher returns matching sections", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), + }, + predicates: nil, + wantCount: 1, + }, + { + name: "non-existent matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "doesnotexist"), + }, + predicates: nil, + wantCount: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sections, err := mstore.Sections(ctx, now.Add(-time.Hour), now.Add(time.Hour), tt.matchers, tt.predicates) + require.NoError(t, err) + require.Len(t, sections, tt.wantCount) + }) + } +} + func queryMetastore(t *testing.T, tenantID string, mfunc func(context.Context, time.Time, time.Time, Metastore)) { now := time.Now().UTC() start := now.Add(-time.Hour * 5)