feat(metastore): Implement Labels() and Values() on ObjectMetastore (#16734)

pull/16819/head
aarogoss 2 months ago committed by GitHub
parent 9691f3e5dc
commit a57a80ea32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 45
      pkg/dataobj/metastore/object.go
  2. 260
      pkg/dataobj/metastore/object_test.go

@ -6,6 +6,8 @@ import (
"fmt"
"io"
"iter"
"maps"
"slices"
"sort"
"strconv"
"sync"
@ -88,12 +90,47 @@ func (m *ObjectMetastore) DataObjects(ctx context.Context, start, end time.Time,
return m.listObjectsFromStores(ctx, storePaths, start, end)
}
func (m *ObjectMetastore) Labels(_ context.Context, _, _ time.Time, _ ...*labels.Matcher) ([]string, error) {
return nil, nil
func (m *ObjectMetastore) Labels(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) {
uniqueLabels := map[string]struct{}{}
err := m.forEachLabel(ctx, start, end, func(label labels.Label) {
if _, ok := uniqueLabels[label.Name]; !ok {
uniqueLabels[label.Name] = struct{}{}
}
}, matchers...)
return slices.Collect(maps.Keys(uniqueLabels)), err
}
func (m *ObjectMetastore) Values(_ context.Context, _, _ time.Time, _ ...*labels.Matcher) ([]string, error) {
return nil, nil
func (m *ObjectMetastore) Values(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) {
values := map[string]struct{}{}
err := m.forEachLabel(ctx, start, end, func(label labels.Label) {
if _, ok := values[label.Value]; !ok {
values[label.Value] = struct{}{}
}
}, matchers...)
return slices.Collect(maps.Keys(values)), err
}
func (m *ObjectMetastore) forEachLabel(ctx context.Context, start, end time.Time, foreach func(labels.Label), matchers ...*labels.Matcher) error {
streams, err := m.Streams(ctx, start, end, matchers...)
if err != nil {
return err
}
for _, streamLabels := range streams {
if streamLabels == nil {
continue
}
for _, streamLabel := range *streamLabels {
foreach(streamLabel)
}
}
return nil
}
func predicateFromMatchers(start, end time.Time, matchers ...*labels.Matcher) dataobj.StreamsPredicate {

@ -0,0 +1,260 @@
package metastore
import (
"bytes"
"context"
"os"
"slices"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/logproto"
)
const (
tenantID = "test-tenant"
)
var (
now = time.Now().UTC()
// our streams won't use any log lines, therefore leave them out of the Entry structs
streams = []logproto.Stream{
{
Labels: `{app="foo", env="prod"}`,
Entries: []logproto.Entry{{Timestamp: now.Add(-2 * time.Hour)}},
},
{
Labels: `{app="foo", env="dev"}`,
Entries: []logproto.Entry{{Timestamp: now}},
},
{
Labels: `{app="bar", env="prod"}`,
Entries: []logproto.Entry{{Timestamp: now.Add(5 * time.Second)}},
},
{
Labels: `{app="bar", env="dev"}`,
Entries: []logproto.Entry{{Timestamp: now.Add(8 * time.Minute)}},
},
{
Labels: `{app="baz", env="prod", team="a"}`,
Entries: []logproto.Entry{{Timestamp: now.Add(12 * time.Minute)}},
},
{
Labels: `{app="foo", env="prod"}`,
Entries: []logproto.Entry{{Timestamp: now.Add(-12 * time.Hour)}},
},
{
Labels: `{app="foo", env="prod"}`,
Entries: []logproto.Entry{{Timestamp: now.Add(12 * time.Hour)}},
},
}
)
// Similar to store_test.go -- we need a populated dataobj/builder/metastore to test labels and values
type testDataBuilder struct {
t *testing.T
bucket objstore.Bucket
builder *dataobj.Builder
meta *Updater
uploader *uploader.Uploader
}
func (b *testDataBuilder) addStreamAndFlush(stream logproto.Stream) {
err := b.builder.Append(stream)
require.NoError(b.t, err)
buf := bytes.NewBuffer(make([]byte, 0, 1024*1024))
stats, err := b.builder.Flush(buf)
require.NoError(b.t, err)
path, err := b.uploader.Upload(context.Background(), buf)
require.NoError(b.t, err)
err = b.meta.Update(context.Background(), path, stats)
require.NoError(b.t, err)
b.builder.Reset()
}
func TestLabels(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
}
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedLabels, len(matchers))
})
}
func TestNonExistentLabels(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "app", "invalid"),
labels.MustNewMatcher(labels.MatchEqual, "env", "ops"),
}
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedLabels, 0)
})
}
func TestMixedLabels(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
labels.MustNewMatcher(labels.MatchEqual, "env", "invalid"),
}
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedLabels, 0)
})
}
func TestLabelsSingleMatcher(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
}
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedLabels, 3)
for _, expectedLabel := range []string{"env", "team", "app"} {
require.NotEqual(t, slices.Index(matchedLabels, expectedLabel), -1)
}
})
}
func TestLabelsEmptyMatcher(t *testing.T) {
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedLabels, err := mstore.Labels(ctx, start, end)
require.NoError(t, err)
require.Len(t, matchedLabels, 3)
})
}
func TestValues(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
}
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedValues, err := mstore.Values(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedValues, len(matchers))
})
}
func TestNonExistentValues(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "app", "invalid"),
labels.MustNewMatcher(labels.MatchEqual, "env", "ops"),
}
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedValues, err := mstore.Values(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedValues, 0)
})
}
func TestMixedValues(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
labels.MustNewMatcher(labels.MatchEqual, "env", "ops"),
}
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedValues, err := mstore.Values(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedValues, 0)
})
}
func TestValuesSingleMatcher(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
}
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedValues, err := mstore.Values(ctx, start, end, matchers...)
require.NoError(t, err)
require.Len(t, matchedValues, 5)
})
}
func TestValuesEmptyMatcher(t *testing.T) {
queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
matchedValues, err := mstore.Values(ctx, start, end)
require.NoError(t, err)
require.Len(t, matchedValues, 6)
for _, expectedValue := range []string{"foo", "prod", "bar", "dev", "baz", "a"} {
require.NotEqual(t, slices.Index(matchedValues, expectedValue), -1)
}
})
}
func queryMetastore(t *testing.T, tenantID string, mfunc func(context.Context, time.Time, time.Time, Metastore)) {
now := time.Now().UTC()
start := now.Add(-time.Hour * 5)
end := now.Add(time.Hour * 5)
builder := newTestDataBuilder(t, tenantID)
for _, stream := range streams {
builder.addStreamAndFlush(stream)
}
mstore := NewObjectMetastore(builder.bucket)
defer func() {
require.NoError(t, mstore.bucket.Close())
}()
ctx := user.InjectOrgID(context.Background(), tenantID)
mfunc(ctx, start, end, mstore)
}
func newTestDataBuilder(t *testing.T, tenantID string) *testDataBuilder {
bucket := objstore.NewInMemBucket()
builder, err := dataobj.NewBuilder(dataobj.BuilderConfig{
TargetPageSize: 1024 * 1024, // 1MB
TargetObjectSize: 10 * 1024 * 1024, // 10MB
TargetSectionSize: 1024 * 1024, // 1MB
BufferSize: 1024 * 1024, // 1MB
})
require.NoError(t, err)
meta := NewUpdater(bucket, tenantID, log.NewLogfmtLogger(os.Stdout))
require.NoError(t, meta.RegisterMetrics(prometheus.NewPedanticRegistry()))
uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, tenantID)
require.NoError(t, uploader.RegisterMetrics(prometheus.NewPedanticRegistry()))
return &testDataBuilder{
t: t,
bucket: bucket,
builder: builder,
meta: meta,
uploader: uploader,
}
}
Loading…
Cancel
Save