From 4bc95c0c68f5adb79ccafdf82a2023262fc31d33 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 12 Feb 2025 20:52:37 +0100 Subject: [PATCH] feat(dataobj): Add methods for querying data objects metadata (#16190) --- pkg/dataobj/builder.go | 4 +- pkg/dataobj/metastore/metastore.go | 6 +- pkg/dataobj/querier/metadata.go | 298 ++++++++++++++++++ pkg/dataobj/querier/store.go | 61 ++-- pkg/dataobj/querier/store_test.go | 485 +++++++++++++++++++++++++++++ pkg/loki/modules.go | 4 +- 6 files changed, 833 insertions(+), 25 deletions(-) create mode 100644 pkg/dataobj/querier/metadata.go create mode 100644 pkg/dataobj/querier/store_test.go diff --git a/pkg/dataobj/builder.go b/pkg/dataobj/builder.go index fa8f551956..245488bc18 100644 --- a/pkg/dataobj/builder.go +++ b/pkg/dataobj/builder.go @@ -141,9 +141,7 @@ func NewBuilder(cfg BuilderConfig) (*Builder, error) { return nil, fmt.Errorf("failed to create LRU cache: %w", err) } - var ( - metrics = newMetrics() - ) + metrics := newMetrics() metrics.ObserveConfig(cfg) return &Builder{ diff --git a/pkg/dataobj/metastore/metastore.go b/pkg/dataobj/metastore/metastore.go index d14d6a1421..eacc5ceb02 100644 --- a/pkg/dataobj/metastore/metastore.go +++ b/pkg/dataobj/metastore/metastore.go @@ -234,7 +234,9 @@ func listObjectsFromStores(ctx context.Context, bucket objstore.Bucket, storePat g.Go(func() error { var err error objects[i], err = listObjects(ctx, bucket, path, start, end) - if err != nil { + // If the metastore object is not found, it means it's outside of any existing window + // and we can safely ignore it. + if err != nil && !bucket.IsObjNotFoundErr(err) { return fmt.Errorf("listing objects from metastore %s: %w", path, err) } return nil @@ -252,7 +254,7 @@ func listObjects(ctx context.Context, bucket objstore.Bucket, path string, start var buf bytes.Buffer objectReader, err := bucket.Get(ctx, path) if err != nil { - return nil, fmt.Errorf("getting metastore object: %w", err) + return nil, err } n, err := buf.ReadFrom(objectReader) if err != nil { diff --git a/pkg/dataobj/querier/metadata.go b/pkg/dataobj/querier/metadata.go new file mode 100644 index 0000000000..eccb4afd3c --- /dev/null +++ b/pkg/dataobj/querier/metadata.go @@ -0,0 +1,298 @@ +package querier + +import ( + "context" + "fmt" + "io" + "sort" + "sync" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/errgroup" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql" +) + +// SelectSeries implements querier.Store +func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { + objects, err := s.objectsForTimeRange(ctx, req.Start, req.End) + if err != nil { + return nil, err + } + + shard, err := parseShards(req.Shards) + if err != nil { + return nil, err + } + + var matchers []*labels.Matcher + if req.Selector != "" { + expr, err := req.LogSelector() + if err != nil { + return nil, err + } + matchers = expr.Matchers() + } + + uniqueSeries := &sync.Map{} + + processor := newStreamProcessor(req.Start, req.End, matchers, objects, shard) + + err = processor.ProcessParallel(ctx, func(h uint64, stream dataobj.Stream) { + uniqueSeries.Store(h, labelsToSeriesIdentifier(stream.Labels)) + }) + if err != nil { + return nil, err + } + var result []logproto.SeriesIdentifier + + // Convert sync.Map to slice + uniqueSeries.Range(func(_, value interface{}) bool { + if sid, ok := 
value.(logproto.SeriesIdentifier); ok { + result = append(result, sid) + } + return true + }) + + return result, nil +} + +// LabelNamesForMetricName implements querier.Store +func (s *Store) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, matchers ...*labels.Matcher) ([]string, error) { + start, end := from.Time(), through.Time() + objects, err := s.objectsForTimeRange(ctx, start, end) + if err != nil { + return nil, err + } + + processor := newStreamProcessor(start, end, matchers, objects, noShard) + uniqueNames := sync.Map{} + + err = processor.ProcessParallel(ctx, func(_ uint64, stream dataobj.Stream) { + for _, label := range stream.Labels { + uniqueNames.Store(label.Name, nil) + } + }) + if err != nil { + return nil, err + } + + names := []string{} + uniqueNames.Range(func(key, _ interface{}) bool { + names = append(names, key.(string)) + return true + }) + + sort.Strings(names) + + return names, nil +} + +// LabelValuesForMetricName implements querier.Store +func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, labelName string, matchers ...*labels.Matcher) ([]string, error) { + start, end := from.Time(), through.Time() + + requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, labelName, "") + if err != nil { + return nil, fmt.Errorf("failed to instantiate label matcher: %w", err) + } + + matchers = append(matchers, requireLabel) + + objects, err := s.objectsForTimeRange(ctx, start, end) + if err != nil { + return nil, err + } + + processor := newStreamProcessor(start, end, matchers, objects, noShard) + uniqueValues := sync.Map{} + + err = processor.ProcessParallel(ctx, func(_ uint64, stream dataobj.Stream) { + uniqueValues.Store(stream.Labels.Get(labelName), nil) + }) + if err != nil { + return nil, err + } + + values := []string{} + uniqueValues.Range(func(key, _ interface{}) bool { + values = append(values, key.(string)) + return true + }) + + sort.Strings(values) + + return values, nil +} + +var streamsPool = sync.Pool{ + New: func() any { + streams := make([]dataobj.Stream, 1024) + return &streams + }, +} + +// streamProcessor handles processing of unique series with custom collection logic +type streamProcessor struct { + predicate dataobj.StreamsPredicate + seenSeries *sync.Map + objects []*dataobj.Object + shard logql.Shard +} + +// newStreamProcessor creates a new streamProcessor with the given parameters +func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []*dataobj.Object, shard logql.Shard) *streamProcessor { + // Create a time range predicate + var predicate dataobj.StreamsPredicate = dataobj.TimeRangePredicate[dataobj.StreamsPredicate]{ + StartTime: start, + EndTime: end, + IncludeStart: true, + IncludeEnd: true, + } + + // If there are any matchers, combine them with an AND predicate + if len(matchers) > 0 { + predicate = dataobj.AndPredicate[dataobj.StreamsPredicate]{ + Left: predicate, + Right: matchersToPredicate(matchers), + } + } + + return &streamProcessor{ + predicate: predicate, + seenSeries: &sync.Map{}, + objects: objects, + shard: shard, + } +} + +// matchersToPredicate converts a list of matchers to a dataobj.StreamsPredicate +func matchersToPredicate(matchers []*labels.Matcher) dataobj.StreamsPredicate { + var left dataobj.StreamsPredicate + for _, matcher := range matchers { + var right dataobj.StreamsPredicate + switch matcher.Type { + case labels.MatchEqual: + right = dataobj.LabelMatcherPredicate{Name: matcher.Name, 
Value: matcher.Value} + default: + right = dataobj.LabelFilterPredicate{Name: matcher.Name, Keep: func(_, value string) bool { + return matcher.Matches(value) + }} + } + if left == nil { + left = right + } else { + left = dataobj.AndPredicate[dataobj.StreamsPredicate]{ + Left: left, + Right: right, + } + } + } + return left +} + +// ProcessParallel processes series from multiple readers in parallel +func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func(uint64, dataobj.Stream)) error { + readers, err := shardStreamReaders(ctx, sp.objects, sp.shard) + if err != nil { + return err + } + + // set predicate on all readers + for _, reader := range readers { + if err := reader.SetPredicate(sp.predicate); err != nil { + return err + } + } + + g, ctx := errgroup.WithContext(ctx) + for _, reader := range readers { + g.Go(func() error { + return sp.processSingleReader(ctx, reader, onNewStream) + }) + } + return g.Wait() +} + +func (sp *streamProcessor) processSingleReader(ctx context.Context, reader *dataobj.StreamsReader, onNewStream func(uint64, dataobj.Stream)) error { + var ( + streamsPtr = streamsPool.Get().(*[]dataobj.Stream) + streams = *streamsPtr + buf = make([]byte, 0, 1024) + h uint64 + ) + + defer streamsPool.Put(streamsPtr) + + for { + n, err := reader.Read(ctx, streams) + if err != nil && err != io.EOF { + return err + } + if n == 0 { + break + } + for _, stream := range streams[:n] { + h, buf = stream.Labels.HashWithoutLabels(buf, []string(nil)...) + // Try to claim this hash first + if _, seen := sp.seenSeries.LoadOrStore(h, nil); seen { + continue + } + onNewStream(h, stream) + } + } + return nil +} + +func labelsToSeriesIdentifier(labels labels.Labels) logproto.SeriesIdentifier { + series := make([]logproto.SeriesIdentifier_LabelsEntry, len(labels)) + for i, label := range labels { + series[i] = logproto.SeriesIdentifier_LabelsEntry{ + Key: label.Name, + Value: label.Value, + } + } + return logproto.SeriesIdentifier{ + Labels: series, + } +} + +// shardStreamReaders fetches metadata of objects in parallel and shards them into a list of StreamsReaders +func shardStreamReaders(ctx context.Context, objects []*dataobj.Object, shard logql.Shard) ([]*dataobj.StreamsReader, error) { + // fetch all metadata of objects in parallel + g, ctx := errgroup.WithContext(ctx) + metadatas := make([]dataobj.Metadata, len(objects)) + for i, obj := range objects { + g.Go(func() error { + var err error + metadatas[i], err = obj.Metadata(ctx) + return err + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + // sectionIndex tracks the global section number across all objects to ensure consistent sharding + var sectionIndex uint64 + var readers []*dataobj.StreamsReader + for i, metadata := range metadatas { + for j := 0; j < metadata.StreamsSections; j++ { + // For sharded queries (e.g., "1 of 2"), we only read sections that belong to our shard + // The section is assigned to a shard based on its global index across all objects + if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 { + if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) { + sectionIndex++ + continue + } + } + reader := dataobj.NewStreamsReader(objects[i], j) + readers = append(readers, reader) + sectionIndex++ + } + } + return readers, nil +} diff --git a/pkg/dataobj/querier/store.go b/pkg/dataobj/querier/store.go index 31fcb96c0b..65e7b6d074 100644 --- a/pkg/dataobj/querier/store.go +++ b/pkg/dataobj/querier/store.go @@ -4,11 +4,15 @@ import ( "context" "flag" "fmt" + 
"time" + "github.com/grafana/dskit/tenant" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/objstore" + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" @@ -16,6 +20,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk" storageconfig "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) var _ querier.Store = &Store{} @@ -59,24 +64,6 @@ func (s *Store) SelectSamples(_ context.Context, _ logql.SelectSampleParams) (it return iter.NoopSampleIterator, nil } -// SelectSeries implements querier.Store -func (s *Store) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { - // TODO: Implement - return []logproto.SeriesIdentifier{}, nil -} - -// LabelValuesForMetricName implements querier.Store -func (s *Store) LabelValuesForMetricName(_ context.Context, _ string, _ model.Time, _ model.Time, _ string, _ string, _ ...*labels.Matcher) ([]string, error) { - // TODO: Implement - return []string{}, nil -} - -// LabelNamesForMetricName implements querier.Store -func (s *Store) LabelNamesForMetricName(_ context.Context, _ string, _ model.Time, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) { - // TODO: Implement - return []string{}, nil -} - // Stats implements querier.Store func (s *Store) Stats(_ context.Context, _ string, _ model.Time, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) { // TODO: Implement @@ -94,3 +81,41 @@ func (s *Store) GetShards(_ context.Context, _ string, _ model.Time, _ model.Tim // TODO: Implement return &logproto.ShardsResponse{}, nil } + +func (s *Store) objectsForTimeRange(ctx context.Context, from, through time.Time) ([]*dataobj.Object, error) { + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } + files, err := metastore.ListDataObjects(ctx, s.bucket, userID, from, through) + if err != nil { + return nil, err + } + + objects := make([]*dataobj.Object, 0, len(files)) + for _, path := range files { + objects = append(objects, dataobj.FromBucket(s.bucket, path)) + } + return objects, nil +} + +var noShard = logql.Shard{ + PowerOfTwo: &index.ShardAnnotation{ + Shard: uint32(1), + Of: uint32(1), + }, +} + +func parseShards(shards []string) (logql.Shard, error) { + if len(shards) == 0 { + return noShard, nil + } + parsed, _, err := logql.ParseShards(shards) + if err != nil { + return noShard, err + } + if len(parsed) == 0 { + return noShard, nil + } + return parsed[0], nil +} diff --git a/pkg/dataobj/querier/store_test.go b/pkg/dataobj/querier/store_test.go new file mode 100644 index 0000000000..a2877c77fb --- /dev/null +++ b/pkg/dataobj/querier/store_test.go @@ -0,0 +1,485 @@ +package querier + +import ( + "bytes" + "context" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/user" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/providers/filesystem" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" + 
"github.com/grafana/loki/v3/pkg/dataobj/uploader" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/querier/plan" +) + +func TestStore_SelectSeries(t *testing.T) { + const testTenant = "test-tenant" + builder := newTestDataBuilder(t, testTenant) + defer builder.close() + + // Setup test data + now := setupTestData(t, builder) + + store := NewStore(builder.bucket) + ctx := user.InjectOrgID(context.Background(), testTenant) + + tests := []struct { + name string + selector string + want []string + }{ + { + name: "select all series", + selector: ``, + want: []string{ + `{app="foo", env="prod"}`, + `{app="foo", env="dev"}`, + `{app="bar", env="prod"}`, + `{app="bar", env="dev"}`, + `{app="baz", env="prod", team="a"}`, + }, + }, + { + name: "select with equality matcher", + selector: `{app="foo"}`, + want: []string{ + `{app="foo", env="prod"}`, + `{app="foo", env="dev"}`, + }, + }, + { + name: "select with regex matcher", + selector: `{app=~"foo|bar"}`, + want: []string{ + `{app="foo", env="prod"}`, + `{app="foo", env="dev"}`, + `{app="bar", env="prod"}`, + `{app="bar", env="dev"}`, + }, + }, + { + name: "select with negative equality matcher", + selector: `{app=~".+", app!="foo"}`, + want: []string{ + `{app="bar", env="prod"}`, + `{app="bar", env="dev"}`, + `{app="baz", env="prod", team="a"}`, + }, + }, + { + name: "select with negative regex matcher", + selector: `{app=~".+", app!~"foo|bar"}`, + want: []string{ + `{app="baz", env="prod", team="a"}`, + }, + }, + { + name: "select with multiple matchers", + selector: `{app="foo", env="prod"}`, + want: []string{ + `{app="foo", env="prod"}`, + }, + }, + { + name: "select with regex and equality matchers", + selector: `{app=~"foo|bar", env="prod"}`, + want: []string{ + `{app="foo", env="prod"}`, + `{app="bar", env="prod"}`, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + series, err := store.SelectSeries(ctx, logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Start: now.Add(-time.Hour), + End: now.Add(time.Hour), + Plan: planFromString(tt.selector), + Selector: tt.selector, + }, + }) + require.NoError(t, err) + + var got []string + for _, s := range series { + got = append(got, labelsFromSeriesID(s)) + } + require.ElementsMatch(t, tt.want, got) + }) + } + + t.Run("sharding", func(t *testing.T) { + // Query first shard + series1, err := store.SelectSeries(ctx, logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Start: now.Add(-time.Hour), + End: now.Add(time.Hour), + Plan: planFromString(`{app=~"foo|bar|baz"}`), + Selector: `{app=~"foo|bar|baz"}`, + Shards: []string{"0_of_2"}, + }, + }) + require.NoError(t, err) + require.NotEmpty(t, series1) + require.Less(t, len(series1), 5) // Should get less than all series + + // Query second shard + series2, err := store.SelectSeries(ctx, logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Start: now.Add(-time.Hour), + End: now.Add(time.Hour), + Plan: planFromString(`{app=~"foo|bar|baz"}`), + Selector: `{app=~"foo|bar|baz"}`, + Shards: []string{"1_of_2"}, + }, + }) + require.NoError(t, err) + require.NotEmpty(t, series2) + + // Combined shards should equal all series + var allSeries []string + for _, s := range append(series1, series2...) 
{ + allSeries = append(allSeries, labelsFromSeriesID(s)) + } + + want := []string{ + `{app="foo", env="prod"}`, + `{app="foo", env="dev"}`, + `{app="bar", env="prod"}`, + `{app="bar", env="dev"}`, + `{app="baz", env="prod", team="a"}`, + } + require.ElementsMatch(t, want, allSeries) + }) +} + +func TestStore_LabelNamesForMetricName(t *testing.T) { + const testTenant = "test-tenant" + builder := newTestDataBuilder(t, testTenant) + defer builder.close() + + // Setup test data + now := setupTestData(t, builder) + + store := NewStore(builder.bucket) + ctx := user.InjectOrgID(context.Background(), testTenant) + + tests := []struct { + name string + matchers []*labels.Matcher + want []string + }{ + { + name: "no matchers", + matchers: nil, + want: []string{"app", "env", "team"}, + }, + { + name: "with equality matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), + }, + want: []string{"app", "env"}, + }, + { + name: "with regex matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"), + }, + want: []string{"app", "env"}, + }, + { + name: "with negative matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"), + }, + want: []string{"app", "env", "team"}, + }, + { + name: "with negative regex matcher", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"), + }, + want: []string{"app", "env", "team"}, + }, + { + name: "with multiple matchers", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), + labels.MustNewMatcher(labels.MatchEqual, "env", "prod"), + }, + want: []string{"app", "env"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + names, err := store.LabelNamesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.matchers...) 
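+			// Both string arguments (user ID and metric name) are ignored by this
+			// store implementation; the tenant is resolved from the request context.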
+ require.NoError(t, err) + require.ElementsMatch(t, tt.want, names) + }) + } +} + +func TestStore_LabelValuesForMetricName(t *testing.T) { + const testTenant = "test-tenant" + builder := newTestDataBuilder(t, testTenant) + defer builder.close() + + // Setup test data + now := setupTestData(t, builder) + + store := NewStore(builder.bucket) + ctx := user.InjectOrgID(context.Background(), testTenant) + + tests := []struct { + name string + labelName string + matchers []*labels.Matcher + want []string + }{ + { + name: "app label without matchers", + labelName: "app", + matchers: nil, + want: []string{"bar", "baz", "foo"}, + }, + { + name: "env label without matchers", + labelName: "env", + matchers: nil, + want: []string{"dev", "prod"}, + }, + { + name: "team label without matchers", + labelName: "team", + matchers: nil, + want: []string{"a"}, + }, + { + name: "env label with app equality matcher", + labelName: "env", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), + }, + want: []string{"dev", "prod"}, + }, + { + name: "env label with app regex matcher", + labelName: "env", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"), + }, + want: []string{"dev", "prod"}, + }, + { + name: "env label with app negative matcher", + labelName: "env", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"), + }, + want: []string{"dev", "prod"}, + }, + { + name: "env label with app negative regex matcher", + labelName: "env", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"), + }, + want: []string{"prod"}, + }, + { + name: "env label with multiple matchers", + labelName: "env", + matchers: []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "app", "foo"), + labels.MustNewMatcher(labels.MatchEqual, "env", "prod"), + }, + want: []string{"prod"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + values, err := store.LabelValuesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.labelName, tt.matchers...) 
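+			// LabelValuesForMetricName appends a labelName!="" matcher internally,
+			// so only streams that actually carry the label contribute values.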
+ require.NoError(t, err) + require.Equal(t, tt.want, values) + }) + } +} + +func setupTestData(t *testing.T, builder *testDataBuilder) time.Time { + t.Helper() + now := time.Now() + + // First object with app=foo series + builder.addStream( + `{app="foo", env="prod"}`, + logproto.Entry{Timestamp: now, Line: "foo1"}, + logproto.Entry{Timestamp: now.Add(time.Second), Line: "foo2"}, + ) + builder.addStream( + `{app="foo", env="dev"}`, + logproto.Entry{Timestamp: now, Line: "foo3"}, + logproto.Entry{Timestamp: now.Add(time.Second), Line: "foo4"}, + ) + builder.flush() + + // Second object with app=bar series + builder.addStream( + `{app="bar", env="prod"}`, + logproto.Entry{Timestamp: now, Line: "bar1"}, + logproto.Entry{Timestamp: now.Add(time.Second), Line: "bar2"}, + ) + builder.addStream( + `{app="bar", env="dev"}`, + logproto.Entry{Timestamp: now, Line: "bar3"}, + logproto.Entry{Timestamp: now.Add(time.Second), Line: "bar4"}, + ) + builder.flush() + + // Third object with app=baz series + builder.addStream( + `{app="baz", env="prod", team="a"}`, + logproto.Entry{Timestamp: now, Line: "baz1"}, + logproto.Entry{Timestamp: now.Add(time.Second), Line: "baz2"}, + ) + builder.flush() + + return now +} + +func labelsFromSeriesID(id logproto.SeriesIdentifier) string { + ls := make(labels.Labels, 0, len(id.Labels)) + for _, l := range id.Labels { + ls = append(ls, labels.Label{Name: l.Key, Value: l.Value}) + } + sort.Sort(ls) + return ls.String() +} + +func mustParseSeriesID(s string) logproto.SeriesIdentifier { + ls, err := syntax.ParseLabels(s) + if err != nil { + panic(err) + } + return logproto.SeriesIdentifier{ + Labels: labelsToSeriesLabels(ls), + } +} + +func labelsToSeriesLabels(ls labels.Labels) []logproto.SeriesIdentifier_LabelsEntry { + entries := make([]logproto.SeriesIdentifier_LabelsEntry, 0, len(ls)) + for _, l := range ls { + entries = append(entries, logproto.SeriesIdentifier_LabelsEntry{ + Key: l.Name, + Value: l.Value, + }) + } + return entries +} + +func planFromString(s string) *plan.QueryPlan { + if s == "" { + return nil + } + expr, err := syntax.ParseExpr(s) + if err != nil { + panic(err) + } + return &plan.QueryPlan{ + AST: expr, + } +} + +// testDataBuilder helps build test data for querier tests. 
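+// Entries are staged with addStream and persisted with flush, which encodes
+// the data object, uploads it via the uploader, and records it in the
+// metastore so the store under test can discover it.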
+type testDataBuilder struct { + t *testing.T + bucket objstore.Bucket + dir string + + tenantID string + builder *dataobj.Builder + meta *metastore.Manager + uploader *uploader.Uploader +} + +func newTestDataBuilder(t *testing.T, tenantID string) *testDataBuilder { + dir := t.TempDir() + bucket, err := filesystem.NewBucket(dir) + require.NoError(t, err) + + // Create required directories for metastore + metastoreDir := filepath.Join(dir, "tenant-"+tenantID, "metastore") + require.NoError(t, os.MkdirAll(metastoreDir, 0o755)) + + builder, err := dataobj.NewBuilder(dataobj.BuilderConfig{ + TargetPageSize: 1024 * 1024, // 1MB + TargetObjectSize: 10 * 1024 * 1024, // 10MB + TargetSectionSize: 1024 * 1024, // 1MB + BufferSize: 1024 * 1024, // 1MB + }) + require.NoError(t, err) + + meta := metastore.NewManager(bucket, tenantID, log.NewLogfmtLogger(os.Stdout)) + require.NoError(t, meta.RegisterMetrics(prometheus.NewRegistry())) + + uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, tenantID) + require.NoError(t, uploader.RegisterMetrics(prometheus.NewRegistry())) + + return &testDataBuilder{ + t: t, + bucket: bucket, + dir: dir, + tenantID: tenantID, + builder: builder, + meta: meta, + uploader: uploader, + } +} + +func (b *testDataBuilder) addStream(labels string, entries ...logproto.Entry) { + err := b.builder.Append(logproto.Stream{ + Labels: labels, + Entries: entries, + }) + require.NoError(b.t, err) +} + +func (b *testDataBuilder) flush() { + buf := bytes.NewBuffer(make([]byte, 0, 1024*1024)) + stats, err := b.builder.Flush(buf) + require.NoError(b.t, err) + + // Upload the data object using the uploader + path, err := b.uploader.Upload(context.Background(), buf) + require.NoError(b.t, err) + + // Update metastore with the new data object + err = b.meta.UpdateMetastore(context.Background(), path, stats) + require.NoError(b.t, err) + + b.builder.Reset() +} + +func (b *testDataBuilder) close() { + require.NoError(b.t, b.bucket.Close()) + os.RemoveAll(b.dir) +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5a49ea5362..c782123a56 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1956,13 +1956,13 @@ func (t *Loki) initDataObjConsumer() (services.Service, error) { return t.dataObjConsumer, nil } -func (t *Loki) createDataObjBucket(name string) (objstore.Bucket, error) { +func (t *Loki) createDataObjBucket(clientName string) (objstore.Bucket, error) { schema, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now()) if err != nil { return nil, fmt.Errorf("failed to get schema for now: %w", err) } var objstoreBucket objstore.Bucket - objstoreBucket, err = bucket.NewClient(context.Background(), schema.ObjectType, t.Cfg.StorageConfig.ObjectStore.Config, name, util_log.Logger) + objstoreBucket, err = bucket.NewClient(context.Background(), schema.ObjectType, t.Cfg.StorageConfig.ObjectStore.Config, clientName, util_log.Logger) if err != nil { return nil, err }
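A minimal usage sketch of the new metadata methods (illustrative, not part of the patch). It assumes NewStore takes only the objstore.Bucket, as exercised in the tests above; the queryMetadata helper and the tenant ID are hypothetical, and the query plan is populated alongside the selector the same way the tests do.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/user"
	"github.com/prometheus/common/model"
	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/dataobj/querier"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/querier/plan"
)

// queryMetadata is a hypothetical helper showing the three new metadata
// methods; the bucket is assumed to be configured elsewhere.
func queryMetadata(bucket objstore.Bucket) error {
	store := querier.NewStore(bucket)
	// The tenant is read from the context, not from the userID argument.
	ctx := user.InjectOrgID(context.Background(), "tenant-1")

	through := model.Now()
	from := through.Add(-time.Hour)

	// All label names for streams in the range (the string args are ignored).
	names, err := store.LabelNamesForMetricName(ctx, "", from, through, "")
	if err != nil {
		return err
	}
	fmt.Println("label names:", names)

	// Values of the "app" label in the same range.
	values, err := store.LabelValuesForMetricName(ctx, "", from, through, "", "app")
	if err != nil {
		return err
	}
	fmt.Println("app values:", values)

	// Series matching a selector; as in the tests, the plan is set from the
	// same selector string.
	selector := `{app="foo"}`
	series, err := store.SelectSeries(ctx, logql.SelectLogParams{
		QueryRequest: &logproto.QueryRequest{
			Start:    from.Time(),
			End:      through.Time(),
			Selector: selector,
			Plan:     &plan.QueryPlan{AST: syntax.MustParseExpr(selector)},
		},
	})
	if err != nil {
		return err
	}
	fmt.Println("series:", len(series))
	return nil
}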