diff --git a/pkg/dataobj/metastore/object.go b/pkg/dataobj/metastore/object.go index 9fd682f50c..93b95ec5d7 100644 --- a/pkg/dataobj/metastore/object.go +++ b/pkg/dataobj/metastore/object.go @@ -272,13 +272,21 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma if len(predicates) > 0 { // Search the section AMQs to estimate sections that might match the predicates // AMQs may return false positives so this is an over-estimate. - pointerMatchers := pointerPredicateFromMatchers(predicates...) - sectionMembershipEstimates, err := m.estimateSectionsForPredicates(ctx, indexObjects, pointerMatchers) - if err != nil { - return nil, err - } + // + // Only match one predicate at a time in order to obtain the intersection of all estimates, rather than the union. + for _, predicate := range predicates { + pointerMatchers := pointerPredicateFromMatcher(predicate) + sectionMembershipEstimates, err := m.estimateSectionsForPredicates(ctx, indexObjects, pointerMatchers) + if err != nil { + return nil, err + } - streamSectionPointers = intersectSections(streamSectionPointers, sectionMembershipEstimates) + streamSectionPointers = intersectSections(streamSectionPointers, sectionMembershipEstimates) + if len(streamSectionPointers) == 0 { + // Short circuit here if no sections match the predicates + return streamSectionPointers, nil + } + } } duration := sectionsTimer.ObserveDuration() @@ -432,36 +440,20 @@ func streamPredicateFromMatchers(start, end time.Time, matchers ...*labels.Match return current } -func pointerPredicateFromMatchers(matchers ...*labels.Matcher) pointers.RowPredicate { - if len(matchers) == 0 { +func pointerPredicateFromMatcher(matcher *labels.Matcher) pointers.RowPredicate { + if matcher == nil { return nil } - - predicates := make([]pointers.RowPredicate, 0, len(matchers)+1) - for _, matcher := range matchers { - switch matcher.Type { - case labels.MatchEqual: - predicates = append(predicates, pointers.BloomExistenceRowPredicate{ - Name: matcher.Name, - Value: matcher.Value, - }) + switch matcher.Type { + case labels.MatchEqual: + return pointers.BloomExistenceRowPredicate{ + Name: matcher.Name, + Value: matcher.Value, } - } - - if len(predicates) == 0 { + default: + // unsupported matcher type return nil } - - current := predicates[0] - - for _, predicate := range predicates[1:] { - and := pointers.AndRowPredicate{ - Left: predicate, - Right: current, - } - current = and - } - return current } // listObjectsFromTables concurrently lists objects from multiple metastore files @@ -652,6 +644,12 @@ func (m *ObjectMetastore) estimateSectionsForPredicates(ctx context.Context, ind } pointerReadTimer.ObserveDuration() + if len(objectSectionDescriptors) == 0 { + // TODO(benclive): Find a way to differentiate between unknown columns and columns missing the target value. + // For now, log a warning to track how often this happens. + level.Warn(m.logger).Log("msg", "no section descriptors found for column") + } + sectionDescriptorsMutex.Lock() sectionDescriptors = append(sectionDescriptors, objectSectionDescriptors...) sectionDescriptorsMutex.Unlock() diff --git a/pkg/dataobj/metastore/object_test.go b/pkg/dataobj/metastore/object_test.go index d04fb3d50e..7ce57a9a12 100644 --- a/pkg/dataobj/metastore/object_test.go +++ b/pkg/dataobj/metastore/object_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/bits-and-blooms/bloom/v3" "github.com/go-kit/log" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" @@ -390,6 +391,124 @@ func TestSectionsForStreamMatchers(t *testing.T) { } } +func TestSectionsForPredicateMatchers(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), tenantID) + + builder, err := indexobj.NewBuilder(indexobj.BuilderConfig{ + TargetPageSize: 1024 * 1024, + TargetObjectSize: 10 * 1024 * 1024, + TargetSectionSize: 128, + BufferSize: 1024 * 1024, + SectionStripeMergeLimit: 2, + }, nil) + require.NoError(t, err) + + _, err = builder.AppendStream(tenantID, streams.Stream{ + ID: 1, + Labels: labels.New(labels.Label{Name: "app", Value: "foo"}), + MinTimestamp: now.Add(-3 * time.Hour), + MaxTimestamp: now.Add(-2 * time.Hour), + UncompressedSize: 5, + }) + require.NoError(t, err) + err = builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-3*time.Hour), 5) + require.NoError(t, err) + err = builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-2*time.Hour), 0) + require.NoError(t, err) + + traceIDBloom := bloom.NewWithEstimates(10, 0.01) + traceIDBloom.AddString("abcd") + traceIDBloom.AddString("1234") + traceIDBloomBytes, err := traceIDBloom.MarshalBinary() + require.NoError(t, err) + + err = builder.AppendColumnIndex(tenantID, "test-path", 0, "traceID", 0, traceIDBloomBytes) + require.NoError(t, err) + + // Build and store the object + timeRanges := builder.TimeRanges() + require.Len(t, timeRanges, 1) + + obj, closer, err := builder.Flush() + require.NoError(t, err) + t.Cleanup(func() { _ = closer.Close() }) + + bucket := objstore.NewInMemBucket() + + uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger()) + require.NoError(t, uploader.RegisterMetrics(prometheus.NewPedanticRegistry())) + + path, err := uploader.Upload(context.Background(), obj) + require.NoError(t, err) + + metastoreTocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger()) + err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges) + require.NoError(t, err) + + mstore := NewObjectMetastore(bucket, log.NewNopLogger(), prometheus.NewPedanticRegistry()) + + tests := []struct { + name string + predicates []*labels.Matcher + wantCount int + }{ + { + name: "no predicates returns all sections", + predicates: nil, + wantCount: 1, + }, + { + name: "single predicate returns matching sections", + predicates: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "traceID", "abcd"), + }, + wantCount: 1, + }, + { + name: "multiple valid predicates returns matching sections", + predicates: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "traceID", "abcd"), + labels.MustNewMatcher(labels.MatchEqual, "traceID", "1234"), + }, + wantCount: 1, + }, + { + name: "missing predicates returns no sections", + predicates: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "traceID", "cdef"), + }, + wantCount: 0, + }, + { + name: "partial missing predicates returns no sections", + predicates: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "traceID", "abcd"), + labels.MustNewMatcher(labels.MatchEqual, "traceID", "cdef"), + }, + wantCount: 0, + }, + { + name: "multiple missing predicates returns no sections", + predicates: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "traceID", "5678"), + labels.MustNewMatcher(labels.MatchEqual, "traceID", "cdef"), + }, + wantCount: 0, + }, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sections, err := mstore.Sections(ctx, now.Add(-3*time.Hour), now.Add(time.Hour), matchers, tt.predicates) + require.NoError(t, err) + require.Len(t, sections, tt.wantCount) + }) + } +} + func queryMetastore(t *testing.T, tenant string, mfunc func(context.Context, time.Time, time.Time, Metastore)) { now := time.Now().UTC() start := now.Add(-time.Hour * 5)