feat(dataobj): Add methods for querying data objects metadata (#16190)

pull/16227/head
Cyril Tovena 11 months ago committed by GitHub
parent 5aa9e47d1c
commit 4bc95c0c68
Changed files:

  1. pkg/dataobj/builder.go (4 changes)
  2. pkg/dataobj/metastore/metastore.go (6 changes)
  3. pkg/dataobj/querier/metadata.go (298 changes)
  4. pkg/dataobj/querier/store.go (61 changes)
  5. pkg/dataobj/querier/store_test.go (485 changes)
  6. pkg/loki/modules.go (4 changes)

@@ -141,9 +141,7 @@ func NewBuilder(cfg BuilderConfig) (*Builder, error) {
 		return nil, fmt.Errorf("failed to create LRU cache: %w", err)
 	}
-	var (
-		metrics = newMetrics()
-	)
+	metrics := newMetrics()
 	metrics.ObserveConfig(cfg)
 	return &Builder{

@@ -234,7 +234,9 @@ func listObjectsFromStores(ctx context.Context, bucket objstore.Bucket, storePat
 		g.Go(func() error {
 			var err error
 			objects[i], err = listObjects(ctx, bucket, path, start, end)
-			if err != nil {
+			// If the metastore object is not found, it means it's outside of any existing window
+			// and we can safely ignore it.
+			if err != nil && !bucket.IsObjNotFoundErr(err) {
 				return fmt.Errorf("listing objects from metastore %s: %w", path, err)
 			}
 			return nil
@@ -252,7 +254,7 @@ func listObjects(ctx context.Context, bucket objstore.Bucket, path string, start
 	var buf bytes.Buffer
 	objectReader, err := bucket.Get(ctx, path)
 	if err != nil {
-		return nil, fmt.Errorf("getting metastore object: %w", err)
+		return nil, err
 	}
 	n, err := buf.ReadFrom(objectReader)
 	if err != nil {
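These two hunks work together: listObjects now returns the bucket error unwrapped, presumably so the caller's bucket.IsObjNotFoundErr check can recognize it, and listObjectsFromStores then skips metastore windows that simply don't exist yet. A minimal sketch of that pattern under stated assumptions — listWindow is a hypothetical helper, and the []string return type is assumed for illustration:

	// listWindow is a hypothetical helper (not part of this commit) showing the
	// pattern above: a missing metastore window object is treated as empty,
	// while any other error is surfaced with context.
	func listWindow(ctx context.Context, bucket objstore.Bucket, path string, start, end time.Time) ([]string, error) {
		objs, err := listObjects(ctx, bucket, path, start, end)
		if err != nil {
			if bucket.IsObjNotFoundErr(err) {
				// The window has no metastore object yet; nothing to list.
				return nil, nil
			}
			return nil, fmt.Errorf("listing objects from metastore %s: %w", path, err)
		}
		return objs, nil
	}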

@@ -0,0 +1,298 @@
package querier

import (
	"context"
	"fmt"
	"io"
	"sort"
	"sync"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"golang.org/x/sync/errgroup"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql"
)

// SelectSeries implements querier.Store
func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
	objects, err := s.objectsForTimeRange(ctx, req.Start, req.End)
	if err != nil {
		return nil, err
	}

	shard, err := parseShards(req.Shards)
	if err != nil {
		return nil, err
	}

	var matchers []*labels.Matcher
	if req.Selector != "" {
		expr, err := req.LogSelector()
		if err != nil {
			return nil, err
		}
		matchers = expr.Matchers()
	}

	uniqueSeries := &sync.Map{}

	processor := newStreamProcessor(req.Start, req.End, matchers, objects, shard)

	err = processor.ProcessParallel(ctx, func(h uint64, stream dataobj.Stream) {
		uniqueSeries.Store(h, labelsToSeriesIdentifier(stream.Labels))
	})
	if err != nil {
		return nil, err
	}

	var result []logproto.SeriesIdentifier

	// Convert sync.Map to slice
	uniqueSeries.Range(func(_, value interface{}) bool {
		if sid, ok := value.(logproto.SeriesIdentifier); ok {
			result = append(result, sid)
		}
		return true
	})

	return result, nil
}

// LabelNamesForMetricName implements querier.Store
func (s *Store) LabelNamesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, matchers ...*labels.Matcher) ([]string, error) {
	start, end := from.Time(), through.Time()
	objects, err := s.objectsForTimeRange(ctx, start, end)
	if err != nil {
		return nil, err
	}

	processor := newStreamProcessor(start, end, matchers, objects, noShard)
	uniqueNames := sync.Map{}

	err = processor.ProcessParallel(ctx, func(_ uint64, stream dataobj.Stream) {
		for _, label := range stream.Labels {
			uniqueNames.Store(label.Name, nil)
		}
	})
	if err != nil {
		return nil, err
	}

	names := []string{}
	uniqueNames.Range(func(key, _ interface{}) bool {
		names = append(names, key.(string))
		return true
	})

	sort.Strings(names)
	return names, nil
}

// LabelValuesForMetricName implements querier.Store
func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, through model.Time, _ string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
	start, end := from.Time(), through.Time()

	requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, labelName, "")
	if err != nil {
		return nil, fmt.Errorf("failed to instantiate label matcher: %w", err)
	}
	matchers = append(matchers, requireLabel)

	objects, err := s.objectsForTimeRange(ctx, start, end)
	if err != nil {
		return nil, err
	}

	processor := newStreamProcessor(start, end, matchers, objects, noShard)
	uniqueValues := sync.Map{}

	err = processor.ProcessParallel(ctx, func(_ uint64, stream dataobj.Stream) {
		uniqueValues.Store(stream.Labels.Get(labelName), nil)
	})
	if err != nil {
		return nil, err
	}

	values := []string{}
	uniqueValues.Range(func(key, _ interface{}) bool {
		values = append(values, key.(string))
		return true
	})

	sort.Strings(values)
	return values, nil
}

var streamsPool = sync.Pool{
	New: func() any {
		streams := make([]dataobj.Stream, 1024)
		return &streams
	},
}

// streamProcessor handles processing of unique series with custom collection logic
type streamProcessor struct {
	predicate  dataobj.StreamsPredicate
	seenSeries *sync.Map
	objects    []*dataobj.Object
	shard      logql.Shard
}

// newStreamProcessor creates a new streamProcessor with the given parameters
func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []*dataobj.Object, shard logql.Shard) *streamProcessor {
	// Create a time range predicate
	var predicate dataobj.StreamsPredicate = dataobj.TimeRangePredicate[dataobj.StreamsPredicate]{
		StartTime:    start,
		EndTime:      end,
		IncludeStart: true,
		IncludeEnd:   true,
	}

	// If there are any matchers, combine them with an AND predicate
	if len(matchers) > 0 {
		predicate = dataobj.AndPredicate[dataobj.StreamsPredicate]{
			Left:  predicate,
			Right: matchersToPredicate(matchers),
		}
	}

	return &streamProcessor{
		predicate:  predicate,
		seenSeries: &sync.Map{},
		objects:    objects,
		shard:      shard,
	}
}

// matchersToPredicate converts a list of matchers to a dataobj.StreamsPredicate
func matchersToPredicate(matchers []*labels.Matcher) dataobj.StreamsPredicate {
	var left dataobj.StreamsPredicate
	for _, matcher := range matchers {
		var right dataobj.StreamsPredicate
		switch matcher.Type {
		case labels.MatchEqual:
			right = dataobj.LabelMatcherPredicate{Name: matcher.Name, Value: matcher.Value}
		default:
			right = dataobj.LabelFilterPredicate{Name: matcher.Name, Keep: func(_, value string) bool {
				return matcher.Matches(value)
			}}
		}
		if left == nil {
			left = right
		} else {
			left = dataobj.AndPredicate[dataobj.StreamsPredicate]{
				Left:  left,
				Right: right,
			}
		}
	}
	return left
}

// ProcessParallel processes series from multiple readers in parallel
func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func(uint64, dataobj.Stream)) error {
	readers, err := shardStreamReaders(ctx, sp.objects, sp.shard)
	if err != nil {
		return err
	}

	// set predicate on all readers
	for _, reader := range readers {
		if err := reader.SetPredicate(sp.predicate); err != nil {
			return err
		}
	}

	g, ctx := errgroup.WithContext(ctx)
	for _, reader := range readers {
		g.Go(func() error {
			return sp.processSingleReader(ctx, reader, onNewStream)
		})
	}
	return g.Wait()
}

func (sp *streamProcessor) processSingleReader(ctx context.Context, reader *dataobj.StreamsReader, onNewStream func(uint64, dataobj.Stream)) error {
	var (
		streamsPtr = streamsPool.Get().(*[]dataobj.Stream)
		streams    = *streamsPtr
		buf        = make([]byte, 0, 1024)
		h          uint64
	)
	defer streamsPool.Put(streamsPtr)

	for {
		n, err := reader.Read(ctx, streams)
		if err != nil && err != io.EOF {
			return err
		}
		if n == 0 {
			break
		}
		for _, stream := range streams[:n] {
			h, buf = stream.Labels.HashWithoutLabels(buf, []string(nil)...)
			// Try to claim this hash first
			if _, seen := sp.seenSeries.LoadOrStore(h, nil); seen {
				continue
			}
			onNewStream(h, stream)
		}
	}
	return nil
}

func labelsToSeriesIdentifier(labels labels.Labels) logproto.SeriesIdentifier {
	series := make([]logproto.SeriesIdentifier_LabelsEntry, len(labels))
	for i, label := range labels {
		series[i] = logproto.SeriesIdentifier_LabelsEntry{
			Key:   label.Name,
			Value: label.Value,
		}
	}
	return logproto.SeriesIdentifier{
		Labels: series,
	}
}

// shardStreamReaders fetches metadata of objects in parallel and shards them into a list of StreamsReaders
func shardStreamReaders(ctx context.Context, objects []*dataobj.Object, shard logql.Shard) ([]*dataobj.StreamsReader, error) {
	// fetch all metadata of objects in parallel
	g, ctx := errgroup.WithContext(ctx)
	metadatas := make([]dataobj.Metadata, len(objects))
	for i, obj := range objects {
		g.Go(func() error {
			var err error
			metadatas[i], err = obj.Metadata(ctx)
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// sectionIndex tracks the global section number across all objects to ensure consistent sharding
	var sectionIndex uint64
	var readers []*dataobj.StreamsReader
	for i, metadata := range metadatas {
		for j := 0; j < metadata.StreamsSections; j++ {
			// For sharded queries (e.g., "1 of 2"), we only read sections that belong to our shard.
			// The section is assigned to a shard based on its global index across all objects.
			if shard.PowerOfTwo != nil && shard.PowerOfTwo.Of > 1 {
				if sectionIndex%uint64(shard.PowerOfTwo.Of) != uint64(shard.PowerOfTwo.Shard) {
					sectionIndex++
					continue
				}
			}
			reader := dataobj.NewStreamsReader(objects[i], j)
			readers = append(readers, reader)
			sectionIndex++
		}
	}
	return readers, nil
}

@@ -4,11 +4,15 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"time"

+	"github.com/grafana/dskit/tenant"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/thanos-io/objstore"

 	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
 	"github.com/grafana/loki/v3/pkg/iter"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql"
@@ -16,6 +20,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	storageconfig "github.com/grafana/loki/v3/pkg/storage/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
 )

 var _ querier.Store = &Store{}
@@ -59,24 +64,6 @@ func (s *Store) SelectSamples(_ context.Context, _ logql.SelectSampleParams) (it
 	return iter.NoopSampleIterator, nil
 }

-// SelectSeries implements querier.Store
-func (s *Store) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
-	// TODO: Implement
-	return []logproto.SeriesIdentifier{}, nil
-}
-
-// LabelValuesForMetricName implements querier.Store
-func (s *Store) LabelValuesForMetricName(_ context.Context, _ string, _ model.Time, _ model.Time, _ string, _ string, _ ...*labels.Matcher) ([]string, error) {
-	// TODO: Implement
-	return []string{}, nil
-}
-
-// LabelNamesForMetricName implements querier.Store
-func (s *Store) LabelNamesForMetricName(_ context.Context, _ string, _ model.Time, _ model.Time, _ string, _ ...*labels.Matcher) ([]string, error) {
-	// TODO: Implement
-	return []string{}, nil
-}
-
 // Stats implements querier.Store
 func (s *Store) Stats(_ context.Context, _ string, _ model.Time, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) {
 	// TODO: Implement
@@ -94,3 +81,41 @@ func (s *Store) GetShards(_ context.Context, _ string, _ model.Time, _ model.Tim
 	// TODO: Implement
 	return &logproto.ShardsResponse{}, nil
 }
+
+func (s *Store) objectsForTimeRange(ctx context.Context, from, through time.Time) ([]*dataobj.Object, error) {
+	userID, err := tenant.TenantID(ctx)
+	if err != nil {
+		return nil, err
+	}
+	files, err := metastore.ListDataObjects(ctx, s.bucket, userID, from, through)
+	if err != nil {
+		return nil, err
+	}
+	objects := make([]*dataobj.Object, 0, len(files))
+	for _, path := range files {
+		objects = append(objects, dataobj.FromBucket(s.bucket, path))
+	}
+	return objects, nil
+}
+
+var noShard = logql.Shard{
+	PowerOfTwo: &index.ShardAnnotation{
+		Shard: uint32(1),
+		Of:    uint32(1),
+	},
+}
+
+func parseShards(shards []string) (logql.Shard, error) {
+	if len(shards) == 0 {
+		return noShard, nil
+	}
+	parsed, _, err := logql.ParseShards(shards)
+	if err != nil {
+		return noShard, err
+	}
+	if len(parsed) == 0 {
+		return noShard, nil
+	}
+	return parsed[0], nil
+}
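Note that noShard is "1 of 1": shardStreamReaders only skips sections when Of > 1, so unsharded queries read every section. For sharded queries the assignment is plain modular arithmetic over the global section index. A small sketch with the "0_of_2" shard string the tests below use — exampleShardOwnership is a hypothetical helper, not part of the commit:

	// exampleShardOwnership shows which global section indexes a "0_of_2"
	// shard would read, mirroring the modulo check in shardStreamReaders.
	func exampleShardOwnership() error {
		shard, err := parseShards([]string{"0_of_2"}) // PowerOfTwo: shard 0 of 2
		if err != nil {
			return err
		}
		for sectionIndex := uint64(0); sectionIndex < 6; sectionIndex++ {
			owned := sectionIndex%uint64(shard.PowerOfTwo.Of) == uint64(shard.PowerOfTwo.Shard)
			fmt.Printf("section %d owned=%v\n", sectionIndex, owned) // sections 0, 2, 4 are owned
		}
		return nil
	}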

@@ -0,0 +1,485 @@
package querier

import (
	"bytes"
	"context"
	"os"
	"path/filepath"
	"sort"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/user"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/objstore/providers/filesystem"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/querier/plan"
)

func TestStore_SelectSeries(t *testing.T) {
	const testTenant = "test-tenant"
	builder := newTestDataBuilder(t, testTenant)
	defer builder.close()

	// Setup test data
	now := setupTestData(t, builder)

	store := NewStore(builder.bucket)
	ctx := user.InjectOrgID(context.Background(), testTenant)

	tests := []struct {
		name     string
		selector string
		want     []string
	}{
		{
			name:     "select all series",
			selector: ``,
			want: []string{
				`{app="foo", env="prod"}`,
				`{app="foo", env="dev"}`,
				`{app="bar", env="prod"}`,
				`{app="bar", env="dev"}`,
				`{app="baz", env="prod", team="a"}`,
			},
		},
		{
			name:     "select with equality matcher",
			selector: `{app="foo"}`,
			want: []string{
				`{app="foo", env="prod"}`,
				`{app="foo", env="dev"}`,
			},
		},
		{
			name:     "select with regex matcher",
			selector: `{app=~"foo|bar"}`,
			want: []string{
				`{app="foo", env="prod"}`,
				`{app="foo", env="dev"}`,
				`{app="bar", env="prod"}`,
				`{app="bar", env="dev"}`,
			},
		},
		{
			name:     "select with negative equality matcher",
			selector: `{app=~".+", app!="foo"}`,
			want: []string{
				`{app="bar", env="prod"}`,
				`{app="bar", env="dev"}`,
				`{app="baz", env="prod", team="a"}`,
			},
		},
		{
			name:     "select with negative regex matcher",
			selector: `{app=~".+", app!~"foo|bar"}`,
			want: []string{
				`{app="baz", env="prod", team="a"}`,
			},
		},
		{
			name:     "select with multiple matchers",
			selector: `{app="foo", env="prod"}`,
			want: []string{
				`{app="foo", env="prod"}`,
			},
		},
		{
			name:     "select with regex and equality matchers",
			selector: `{app=~"foo|bar", env="prod"}`,
			want: []string{
				`{app="foo", env="prod"}`,
				`{app="bar", env="prod"}`,
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			series, err := store.SelectSeries(ctx, logql.SelectLogParams{
				QueryRequest: &logproto.QueryRequest{
					Start:    now.Add(-time.Hour),
					End:      now.Add(time.Hour),
					Plan:     planFromString(tt.selector),
					Selector: tt.selector,
				},
			})
			require.NoError(t, err)

			var got []string
			for _, s := range series {
				got = append(got, labelsFromSeriesID(s))
			}
			require.ElementsMatch(t, tt.want, got)
		})
	}

	t.Run("sharding", func(t *testing.T) {
		// Query first shard
		series1, err := store.SelectSeries(ctx, logql.SelectLogParams{
			QueryRequest: &logproto.QueryRequest{
				Start:    now.Add(-time.Hour),
				End:      now.Add(time.Hour),
				Plan:     planFromString(`{app=~"foo|bar|baz"}`),
				Selector: `{app=~"foo|bar|baz"}`,
				Shards:   []string{"0_of_2"},
			},
		})
		require.NoError(t, err)
		require.NotEmpty(t, series1)
		require.Less(t, len(series1), 5) // Should get less than all series

		// Query second shard
		series2, err := store.SelectSeries(ctx, logql.SelectLogParams{
			QueryRequest: &logproto.QueryRequest{
				Start:    now.Add(-time.Hour),
				End:      now.Add(time.Hour),
				Plan:     planFromString(`{app=~"foo|bar|baz"}`),
				Selector: `{app=~"foo|bar|baz"}`,
				Shards:   []string{"1_of_2"},
			},
		})
		require.NoError(t, err)
		require.NotEmpty(t, series2)

		// Combined shards should equal all series
		var allSeries []string
		for _, s := range append(series1, series2...) {
			allSeries = append(allSeries, labelsFromSeriesID(s))
		}
		want := []string{
			`{app="foo", env="prod"}`,
			`{app="foo", env="dev"}`,
			`{app="bar", env="prod"}`,
			`{app="bar", env="dev"}`,
			`{app="baz", env="prod", team="a"}`,
		}
		require.ElementsMatch(t, want, allSeries)
	})
}

func TestStore_LabelNamesForMetricName(t *testing.T) {
	const testTenant = "test-tenant"
	builder := newTestDataBuilder(t, testTenant)
	defer builder.close()

	// Setup test data
	now := setupTestData(t, builder)

	store := NewStore(builder.bucket)
	ctx := user.InjectOrgID(context.Background(), testTenant)

	tests := []struct {
		name     string
		matchers []*labels.Matcher
		want     []string
	}{
		{
			name:     "no matchers",
			matchers: nil,
			want:     []string{"app", "env", "team"},
		},
		{
			name: "with equality matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			want: []string{"app", "env"},
		},
		{
			name: "with regex matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"),
			},
			want: []string{"app", "env"},
		},
		{
			name: "with negative matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"),
			},
			want: []string{"app", "env", "team"},
		},
		{
			name: "with negative regex matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"),
			},
			want: []string{"app", "env", "team"},
		},
		{
			name: "with multiple matchers",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
				labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
			},
			want: []string{"app", "env"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			names, err := store.LabelNamesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.matchers...)
			require.NoError(t, err)
			require.ElementsMatch(t, tt.want, names)
		})
	}
}

func TestStore_LabelValuesForMetricName(t *testing.T) {
	const testTenant = "test-tenant"
	builder := newTestDataBuilder(t, testTenant)
	defer builder.close()

	// Setup test data
	now := setupTestData(t, builder)

	store := NewStore(builder.bucket)
	ctx := user.InjectOrgID(context.Background(), testTenant)

	tests := []struct {
		name      string
		labelName string
		matchers  []*labels.Matcher
		want      []string
	}{
		{
			name:      "app label without matchers",
			labelName: "app",
			matchers:  nil,
			want:      []string{"bar", "baz", "foo"},
		},
		{
			name:      "env label without matchers",
			labelName: "env",
			matchers:  nil,
			want:      []string{"dev", "prod"},
		},
		{
			name:      "team label without matchers",
			labelName: "team",
			matchers:  nil,
			want:      []string{"a"},
		},
		{
			name:      "env label with app equality matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app regex matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app negative matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotEqual, "app", "foo"),
			},
			want: []string{"dev", "prod"},
		},
		{
			name:      "env label with app negative regex matcher",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchNotRegexp, "app", "foo|bar"),
			},
			want: []string{"prod"},
		},
		{
			name:      "env label with multiple matchers",
			labelName: "env",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
				labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
			},
			want: []string{"prod"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			values, err := store.LabelValuesForMetricName(ctx, "", model.TimeFromUnixNano(now.Add(-time.Hour).UnixNano()), model.TimeFromUnixNano(now.Add(time.Hour).UnixNano()), "", tt.labelName, tt.matchers...)
			require.NoError(t, err)
			require.Equal(t, tt.want, values)
		})
	}
}

func setupTestData(t *testing.T, builder *testDataBuilder) time.Time {
	t.Helper()
	now := time.Now()

	// First object with app=foo series
	builder.addStream(
		`{app="foo", env="prod"}`,
		logproto.Entry{Timestamp: now, Line: "foo1"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "foo2"},
	)
	builder.addStream(
		`{app="foo", env="dev"}`,
		logproto.Entry{Timestamp: now, Line: "foo3"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "foo4"},
	)
	builder.flush()

	// Second object with app=bar series
	builder.addStream(
		`{app="bar", env="prod"}`,
		logproto.Entry{Timestamp: now, Line: "bar1"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "bar2"},
	)
	builder.addStream(
		`{app="bar", env="dev"}`,
		logproto.Entry{Timestamp: now, Line: "bar3"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "bar4"},
	)
	builder.flush()

	// Third object with app=baz series
	builder.addStream(
		`{app="baz", env="prod", team="a"}`,
		logproto.Entry{Timestamp: now, Line: "baz1"},
		logproto.Entry{Timestamp: now.Add(time.Second), Line: "baz2"},
	)
	builder.flush()

	return now
}

func labelsFromSeriesID(id logproto.SeriesIdentifier) string {
	ls := make(labels.Labels, 0, len(id.Labels))
	for _, l := range id.Labels {
		ls = append(ls, labels.Label{Name: l.Key, Value: l.Value})
	}
	sort.Sort(ls)
	return ls.String()
}

func mustParseSeriesID(s string) logproto.SeriesIdentifier {
	ls, err := syntax.ParseLabels(s)
	if err != nil {
		panic(err)
	}
	return logproto.SeriesIdentifier{
		Labels: labelsToSeriesLabels(ls),
	}
}

func labelsToSeriesLabels(ls labels.Labels) []logproto.SeriesIdentifier_LabelsEntry {
	entries := make([]logproto.SeriesIdentifier_LabelsEntry, 0, len(ls))
	for _, l := range ls {
		entries = append(entries, logproto.SeriesIdentifier_LabelsEntry{
			Key:   l.Name,
			Value: l.Value,
		})
	}
	return entries
}

func planFromString(s string) *plan.QueryPlan {
	if s == "" {
		return nil
	}
	expr, err := syntax.ParseExpr(s)
	if err != nil {
		panic(err)
	}
	return &plan.QueryPlan{
		AST: expr,
	}
}

// testDataBuilder helps build test data for querier tests.
type testDataBuilder struct {
	t *testing.T

	bucket   objstore.Bucket
	dir      string
	tenantID string

	builder  *dataobj.Builder
	meta     *metastore.Manager
	uploader *uploader.Uploader
}

func newTestDataBuilder(t *testing.T, tenantID string) *testDataBuilder {
	dir := t.TempDir()
	bucket, err := filesystem.NewBucket(dir)
	require.NoError(t, err)

	// Create required directories for metastore
	metastoreDir := filepath.Join(dir, "tenant-"+tenantID, "metastore")
	require.NoError(t, os.MkdirAll(metastoreDir, 0o755))

	builder, err := dataobj.NewBuilder(dataobj.BuilderConfig{
		TargetPageSize:    1024 * 1024,      // 1MB
		TargetObjectSize:  10 * 1024 * 1024, // 10MB
		TargetSectionSize: 1024 * 1024,      // 1MB
		BufferSize:        1024 * 1024,      // 1MB
	})
	require.NoError(t, err)

	meta := metastore.NewManager(bucket, tenantID, log.NewLogfmtLogger(os.Stdout))
	require.NoError(t, meta.RegisterMetrics(prometheus.NewRegistry()))

	uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, tenantID)
	require.NoError(t, uploader.RegisterMetrics(prometheus.NewRegistry()))

	return &testDataBuilder{
		t:        t,
		bucket:   bucket,
		dir:      dir,
		tenantID: tenantID,
		builder:  builder,
		meta:     meta,
		uploader: uploader,
	}
}

func (b *testDataBuilder) addStream(labels string, entries ...logproto.Entry) {
	err := b.builder.Append(logproto.Stream{
		Labels:  labels,
		Entries: entries,
	})
	require.NoError(b.t, err)
}

func (b *testDataBuilder) flush() {
	buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
	stats, err := b.builder.Flush(buf)
	require.NoError(b.t, err)

	// Upload the data object using the uploader
	path, err := b.uploader.Upload(context.Background(), buf)
	require.NoError(b.t, err)

	// Update metastore with the new data object
	err = b.meta.UpdateMetastore(context.Background(), path, stats)
	require.NoError(b.t, err)

	b.builder.Reset()
}

func (b *testDataBuilder) close() {
	require.NoError(b.t, b.bucket.Close())
	os.RemoveAll(b.dir)
}
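testDataBuilder.flush doubles as a compact description of the write path the querier depends on: encode the data object, upload it, then register it in the metastore — the same metastore that objectsForTimeRange lists at query time. Outside a test, the same sequence might look like the sketch below; flushAndRegister is a hypothetical name, using only the calls exercised above:

	// flushAndRegister is an illustrative sketch, not part of this commit.
	func flushAndRegister(ctx context.Context, b *dataobj.Builder, up *uploader.Uploader, meta *metastore.Manager) error {
		buf := bytes.NewBuffer(nil)
		stats, err := b.Flush(buf) // 1. encode the in-memory data object
		if err != nil {
			return err
		}
		path, err := up.Upload(ctx, buf) // 2. persist it to object storage
		if err != nil {
			return err
		}
		if err := meta.UpdateMetastore(ctx, path, stats); err != nil { // 3. make it discoverable by queriers
			return err
		}
		b.Reset() // 4. start accumulating the next object
		return nil
	}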

@@ -1956,13 +1956,13 @@ func (t *Loki) initDataObjConsumer() (services.Service, error) {
 	return t.dataObjConsumer, nil
 }

-func (t *Loki) createDataObjBucket(name string) (objstore.Bucket, error) {
+func (t *Loki) createDataObjBucket(clientName string) (objstore.Bucket, error) {
 	schema, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now())
 	if err != nil {
 		return nil, fmt.Errorf("failed to get schema for now: %w", err)
 	}
 	var objstoreBucket objstore.Bucket
-	objstoreBucket, err = bucket.NewClient(context.Background(), schema.ObjectType, t.Cfg.StorageConfig.ObjectStore.Config, name, util_log.Logger)
+	objstoreBucket, err = bucket.NewClient(context.Background(), schema.ObjectType, t.Cfg.StorageConfig.ObjectStore.Config, clientName, util_log.Logger)
 	if err != nil {
 		return nil, err
 	}