package metastore

import (
	"context"
	"os"
	"testing"
	"time"

	"github.com/bits-and-blooms/bloom/v3"
	"github.com/go-kit/log"
	"github.com/grafana/dskit/user"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"
	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
	"github.com/grafana/loki/v3/pkg/dataobj/index/indexobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
	"github.com/grafana/loki/v3/pkg/dataobj/uploader"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
)

const (
	tenantID = "test-tenant"
)

var (
	now = time.Now().UTC()

	// Our streams don't need log lines, so the Entry structs carry only timestamps.
	testStreams = []logproto.Stream{
		{
			Labels:  `{app="foo", env="prod"}`,
			Entries: []logproto.Entry{{Timestamp: now.Add(-2 * time.Hour)}},
		},
		{
			Labels:  `{app="foo", env="dev"}`,
			Entries: []logproto.Entry{{Timestamp: now}},
		},
		{
			Labels:  `{app="bar", env="prod"}`,
			Entries: []logproto.Entry{{Timestamp: now.Add(5 * time.Second)}},
		},
		{
			Labels:  `{app="bar", env="dev"}`,
			Entries: []logproto.Entry{{Timestamp: now.Add(8 * time.Minute)}},
		},
		{
			Labels:  `{app="baz", env="prod", team="a"}`,
			Entries: []logproto.Entry{{Timestamp: now.Add(12 * time.Minute)}},
		},
		{
			Labels:  `{app="foo", env="prod"}`,
			Entries: []logproto.Entry{{Timestamp: now.Add(-12 * time.Hour)}},
		},
		{
			Labels:  `{app="foo", env="prod"}`,
			Entries: []logproto.Entry{{Timestamp: now.Add(12 * time.Hour)}},
		},
	}
)

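// Across testStreams there are three label names (app, env, team) and six
// distinct label values (foo, bar, baz, prod, dev, a); the count assertions
// in the Labels and Values tests below depend on these totals.
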
// As in store_test.go, we need a populated data object builder and metastore
// to test label and value lookups.
type testDataBuilder struct {
	t      testing.TB
	bucket objstore.Bucket

	builder  *logsobj.Builder
	meta     *TableOfContentsWriter
	uploader *uploader.Uploader
}

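// addStreamAndFlush appends a single stream to the builder, flushes the
// resulting data object, uploads it, and records it in the table of contents.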
func (b *testDataBuilder) addStreamAndFlush(tenant string, stream logproto.Stream) {
	err := b.builder.Append(tenant, stream)
	require.NoError(b.t, err)

	timeRanges := b.builder.TimeRanges()
	obj, closer, err := b.builder.Flush()
	require.NoError(b.t, err)
	defer closer.Close()

	path, err := b.uploader.Upload(b.t.Context(), obj)
	require.NoError(b.t, err)

	require.NoError(b.t, b.meta.WriteEntry(context.Background(), path, timeRanges))
}

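// TestLabels verifies that Labels returns one label name per matcher when the
// matchers select streams that exist in the metastore.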
func TestLabels(t *testing.T) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
		labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
	}

	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
		require.NoError(t, err)
		require.Len(t, matchedLabels, len(matchers))
	})
}

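// TestNonExistentLabels verifies that matchers selecting no streams yield no
// label names.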
func TestNonExistentLabels(t *testing.T) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "app", "invalid"),
		labels.MustNewMatcher(labels.MatchEqual, "env", "ops"),
	}

	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
		require.NoError(t, err)
		require.Len(t, matchedLabels, 0)
	})
}

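// TestMixedLabels verifies that one non-matching matcher is enough for the
// selector as a whole to match nothing.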
func TestMixedLabels(t *testing.T) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
		labels.MustNewMatcher(labels.MatchEqual, "env", "invalid"),
	}

	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
		require.NoError(t, err)
		require.Len(t, matchedLabels, 0)
	})
}

func TestLabelsSingleMatcher(t *testing.T) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
	}

	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedLabels, err := mstore.Labels(ctx, start, end, matchers...)
		require.NoError(t, err)

		require.Len(t, matchedLabels, 3)
		for _, expectedLabel := range []string{"env", "team", "app"} {
			require.Contains(t, matchedLabels, expectedLabel)
		}
	})
}

func TestLabelsEmptyMatcher(t *testing.T) {
	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedLabels, err := mstore.Labels(ctx, start, end)
		require.NoError(t, err)
		require.Len(t, matchedLabels, 3)
	})
}

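// TestValues and the remaining Values tests mirror the Labels tests above,
// asserting on label values rather than label names.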
func TestValues(t *testing.T) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
		labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
	}

	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedValues, err := mstore.Values(ctx, start, end, matchers...)
		require.NoError(t, err)
		require.Len(t, matchedValues, len(matchers))
	})
}

func TestNonExistentValues(t *testing.T) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "app", "invalid"),
		labels.MustNewMatcher(labels.MatchEqual, "env", "ops"),
	}

	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedValues, err := mstore.Values(ctx, start, end, matchers...)
		require.NoError(t, err)
		require.Len(t, matchedValues, 0)
	})
}

func TestMixedValues(t *testing.T) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
		labels.MustNewMatcher(labels.MatchEqual, "env", "ops"),
	}

	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedValues, err := mstore.Values(ctx, start, end, matchers...)
		require.NoError(t, err)
		require.Len(t, matchedValues, 0)
	})
}

func TestValuesSingleMatcher(t *testing.T) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
	}

	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedValues, err := mstore.Values(ctx, start, end, matchers...)
		require.NoError(t, err)
		require.Len(t, matchedValues, 5)
	})
}

func TestValuesEmptyMatcher(t *testing.T) {
	queryMetastore(t, tenantID, func(ctx context.Context, start, end time.Time, mstore Metastore) {
		matchedValues, err := mstore.Values(ctx, start, end)
		require.NoError(t, err)
		require.Len(t, matchedValues, 6)
		for _, expectedValue := range []string{"foo", "prod", "bar", "dev", "baz", "a"} {
			require.Contains(t, matchedValues, expectedValue)
		}
	})
}

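// TestSectionsForStreamMatchers builds an index object holding the test
// streams plus one stream for a second tenant, then verifies that Sections
// resolves only the querying tenant's sections across matcher, predicate, and
// time-range combinations.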
func TestSectionsForStreamMatchers(t *testing.T) {
	ctx := user.InjectOrgID(context.Background(), tenantID)

	builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{
		TargetPageSize:          1024 * 1024,
		TargetObjectSize:        10 * 1024 * 1024,
		TargetSectionSize:       128,
		BufferSize:              1024 * 1024,
		SectionStripeMergeLimit: 2,
	}, nil)
	require.NoError(t, err)

	for i, ts := range testStreams {
		lbls, err := syntax.ParseLabels(ts.Labels)
		require.NoError(t, err)

		newIdx, err := builder.AppendStream(tenantID, streams.Stream{
			ID:               int64(i),
			Labels:           lbls,
			MinTimestamp:     ts.Entries[0].Timestamp,
			MaxTimestamp:     ts.Entries[0].Timestamp,
			UncompressedSize: 0,
		})
		require.NoError(t, err)
		err = builder.ObserveLogLine(tenantID, "test-path", 1, newIdx, int64(i), ts.Entries[0].Timestamp, int64(len(ts.Entries[0].Line)))
		require.NoError(t, err)
	}

	// Add one more stream for a different tenant to ensure it is not resolved.
	altTenant := "tenant-alt"
	altTenantSection := int64(99) // Emulate a section index from another log object that doesn't collide with the main tenant's section.
	newIdx, err := builder.AppendStream(altTenant, streams.Stream{
		ID:               1,
		Labels:           labels.New(labels.Label{Name: "app", Value: "foo"}, labels.Label{Name: "tenant", Value: altTenant}),
		MinTimestamp:     now.Add(-3 * time.Hour),
		MaxTimestamp:     now.Add(-2 * time.Hour),
		UncompressedSize: 5,
	})
	require.NoError(t, err)
	err = builder.ObserveLogLine(altTenant, "test-path", altTenantSection, newIdx, 1, now.Add(-2*time.Hour), 5)
	require.NoError(t, err)

	// Build and store the object.
	timeRanges := builder.TimeRanges()
	require.Len(t, timeRanges, 2)

	obj, closer, err := builder.Flush()
	require.NoError(t, err)
	t.Cleanup(func() { _ = closer.Close() })

	bucket := objstore.NewInMemBucket()

	uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger())
	require.NoError(t, uploader.RegisterMetrics(prometheus.NewPedanticRegistry()))

	path, err := uploader.Upload(context.Background(), obj)
	require.NoError(t, err)

	metastoreTocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger())
	err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges)
	require.NoError(t, err)

	mstore := newTestObjectMetastore(bucket)

	tests := []struct {
		name       string
		matchers   []*labels.Matcher
		predicates []*labels.Matcher
		start, end time.Time
		wantCount  int
	}{
		{
			name:       "no matchers returns no sections",
			matchers:   nil,
			predicates: nil,
			start:      now.Add(-time.Hour),
			end:        now.Add(time.Hour),
			wantCount:  0,
		},
		{
			name: "single matcher returns matching sections",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			predicates: nil,
			start:      now.Add(-time.Hour),
			end:        now.Add(time.Hour),
			wantCount:  1,
		},
		{
			name: "non-existent matcher",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "doesnotexist"),
			},
			predicates: nil,
			start:      now.Add(-time.Hour),
			end:        now.Add(time.Hour),
			wantCount:  0,
		},
		{
			name: "matching selector with unsupported predicate type",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchRegexp, "bar", "something"),
			},
			start:     now.Add(-time.Hour),
			end:       now.Add(time.Hour),
			wantCount: 1,
		},
		{
			name: "stream matcher with non-matching predicate returns no sections",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "bar", "something"),
			},
			start:     now.Add(-time.Hour),
			end:       now.Add(time.Hour),
			wantCount: 0,
		},
		{
			name: "matcher returns no sections if time range is out of bounds",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			predicates: nil,
			start:      now.Add(-3 * time.Hour),
			end:        now.Add(-2 * time.Hour),
			wantCount:  0,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			sectionsResp, err := mstore.Sections(ctx, SectionsRequest{tt.start, tt.end, tt.matchers, tt.predicates})
			require.NoError(t, err)
			require.Len(t, sectionsResp.Sections, tt.wantCount)
			for _, section := range sectionsResp.Sections {
				require.NotEqual(t, section.SectionIdx, altTenantSection)
			}
		})
	}
}

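// TestSectionsForPredicateMatchers verifies that section resolution consults
// the column bloom filters: predicates whose values are present in the
// traceID bloom keep the section, while predicates on absent values prune it.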
func TestSectionsForPredicateMatchers(t *testing.T) {
	ctx := user.InjectOrgID(context.Background(), tenantID)

	builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{
		TargetPageSize:          1024 * 1024,
		TargetObjectSize:        10 * 1024 * 1024,
		TargetSectionSize:       128,
		BufferSize:              1024 * 1024,
		SectionStripeMergeLimit: 2,
	}, nil)
	require.NoError(t, err)

	_, err = builder.AppendStream(tenantID, streams.Stream{
		ID:               1,
		Labels:           labels.New(labels.Label{Name: "app", Value: "foo"}),
		MinTimestamp:     now.Add(-3 * time.Hour),
		MaxTimestamp:     now.Add(-2 * time.Hour),
		UncompressedSize: 5,
	})
	require.NoError(t, err)
	err = builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-3*time.Hour), 5)
	require.NoError(t, err)
	err = builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-2*time.Hour), 0)
	require.NoError(t, err)

	traceIDBloom := bloom.NewWithEstimates(10, 0.01)
	traceIDBloom.AddString("abcd")
	traceIDBloom.AddString("1234")
	traceIDBloomBytes, err := traceIDBloom.MarshalBinary()
	require.NoError(t, err)

	err = builder.AppendColumnIndex(tenantID, "test-path", 0, "traceID", 0, traceIDBloomBytes)
	require.NoError(t, err)

	// Build and store the object.
	timeRanges := builder.TimeRanges()
	require.Len(t, timeRanges, 1)

	obj, closer, err := builder.Flush()
	require.NoError(t, err)
	t.Cleanup(func() { _ = closer.Close() })

	bucket := objstore.NewInMemBucket()

	uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger())
	require.NoError(t, uploader.RegisterMetrics(prometheus.NewPedanticRegistry()))

	path, err := uploader.Upload(context.Background(), obj)
	require.NoError(t, err)

	metastoreTocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger())
	err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges)
	require.NoError(t, err)

	mstore := newTestObjectMetastore(bucket)

	tests := []struct {
		name       string
		predicates []*labels.Matcher
		wantCount  int
	}{
		{
			name:       "no predicates returns all sections",
			predicates: nil,
			wantCount:  1,
		},
		{
			name: "single predicate returns matching sections",
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "abcd"),
			},
			wantCount: 1,
		},
		{
			name: "multiple valid predicates return matching sections",
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "abcd"),
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "1234"),
			},
			wantCount: 1,
		},
		{
			name: "missing predicate returns no sections",
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "cdef"),
			},
			wantCount: 0,
		},
		{
			name: "partially missing predicates return no sections",
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "abcd"),
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "cdef"),
			},
			wantCount: 0,
		},
		{
			name: "multiple missing predicates return no sections",
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "5678"),
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "cdef"),
			},
			wantCount: 0,
		},
	}

	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			sectionsResp, err := mstore.Sections(ctx, SectionsRequest{now.Add(-3 * time.Hour), now.Add(time.Hour), matchers, tt.predicates})
			require.NoError(t, err)
			require.Len(t, sectionsResp.Sections, tt.wantCount)
		})
	}
}

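// TestSectionsForLabelsByStreamID verifies that every resolved section
// reports the indexed label names for each stream it contains.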
func TestSectionsForLabelsByStreamID(t *testing.T) {
	ctx := user.InjectOrgID(context.Background(), tenantID)

	builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{
		TargetPageSize:          1024 * 1024,
		TargetObjectSize:        10 * 1024 * 1024,
		TargetSectionSize:       128,
		BufferSize:              1024 * 1024,
		SectionStripeMergeLimit: 2,
	}, nil)
	require.NoError(t, err)

	// Stream 1: app=foo, env=prod
	_, err = builder.AppendStream(tenantID, streams.Stream{
		ID:               1,
		Labels:           labels.New(labels.Label{Name: "app", Value: "foo"}, labels.Label{Name: "env", Value: "prod"}),
		MinTimestamp:     now.Add(-3 * time.Hour),
		MaxTimestamp:     now.Add(-2 * time.Hour),
		UncompressedSize: 5,
	})
	require.NoError(t, err)
	err = builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-3*time.Hour), 5)
	require.NoError(t, err)

	// Stream 2: app=bar, env=dev
	_, err = builder.AppendStream(tenantID, streams.Stream{
		ID:               2,
		Labels:           labels.New(labels.Label{Name: "app", Value: "bar"}, labels.Label{Name: "env", Value: "dev"}),
		MinTimestamp:     now.Add(-1 * time.Hour),
		MaxTimestamp:     now,
		UncompressedSize: 10,
	})
	require.NoError(t, err)
	err = builder.ObserveLogLine(tenantID, "test-path", 1, 2, 2, now.Add(-1*time.Hour), 10)
	require.NoError(t, err)

	// Stream 3: app=foo, env=dev (shares the app label with stream 1 and env with stream 2)
	_, err = builder.AppendStream(tenantID, streams.Stream{
		ID:               3,
		Labels:           labels.New(labels.Label{Name: "app", Value: "foo"}, labels.Label{Name: "env", Value: "dev"}),
		MinTimestamp:     now.Add(-30 * time.Minute),
		MaxTimestamp:     now.Add(-15 * time.Minute),
		UncompressedSize: 7,
	})
	require.NoError(t, err)
	err = builder.ObserveLogLine(tenantID, "test-path", 2, 3, 3, now.Add(-30*time.Minute), 7)
	require.NoError(t, err)

	// Build and store the object.
	timeRanges := builder.TimeRanges()
	require.Len(t, timeRanges, 1)

	obj, closer, err := builder.Flush()
	require.NoError(t, err)
	t.Cleanup(func() { _ = closer.Close() })

	bucket := objstore.NewInMemBucket()

	uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger())
	require.NoError(t, uploader.RegisterMetrics(prometheus.NewPedanticRegistry()))

	path, err := uploader.Upload(context.Background(), obj)
	require.NoError(t, err)

	metastoreTocWriter := NewTableOfContentsWriter(bucket, log.NewNopLogger())
	err = metastoreTocWriter.WriteEntry(context.Background(), path, timeRanges)
	require.NoError(t, err)

	mstore := newTestObjectMetastore(bucket)

	streamLabels := []string{"app", "env"}

	tests := []struct {
		name                 string
		matchers             []*labels.Matcher
		predicates           []*labels.Matcher
		start, end           time.Time
		wantCount            int
		wantLabelsByStreamID map[int64][]string // Expected stream ID -> labels mapping
	}{
		{
			name: "exact match on app=foo returns sections with correct labels per stream",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			predicates: nil,
			start:      now.Add(-4 * time.Hour),
			end:        now.Add(time.Hour),
			wantCount:  2,
			wantLabelsByStreamID: map[int64][]string{
				1: streamLabels,
				3: streamLabels,
			},
		},
		{
			name: "exact match on env=dev returns sections with correct labels per stream",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "env", "dev"),
			},
			predicates: nil,
			start:      now.Add(-4 * time.Hour),
			end:        now.Add(time.Hour),
			wantCount:  2,
			wantLabelsByStreamID: map[int64][]string{
				2: streamLabels,
				3: streamLabels,
			},
		},
		{
			name: "combined matchers return only matching section with correct stream labels",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
				labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
			},
			predicates: nil,
			start:      now.Add(-4 * time.Hour),
			end:        now.Add(time.Hour),
			wantCount:  1,
			wantLabelsByStreamID: map[int64][]string{
				1: streamLabels,
			},
		},
		{
			name: "regex matcher returns multiple matching sections with correct labels per stream",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchRegexp, "app", "foo|bar"),
			},
			predicates: nil,
			start:      now.Add(-4 * time.Hour),
			end:        now.Add(time.Hour),
			wantCount:  3,
			wantLabelsByStreamID: map[int64][]string{
				1: streamLabels,
				2: streamLabels,
				3: streamLabels,
			},
		},
		{
			name: "non-matching combined matchers return no sections",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "bar"),
				labels.MustNewMatcher(labels.MatchEqual, "env", "prod"),
			},
			predicates:           nil,
			start:                now.Add(-4 * time.Hour),
			end:                  now.Add(time.Hour),
			wantCount:            0,
			wantLabelsByStreamID: nil,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			sectionsResp, err := mstore.Sections(ctx, SectionsRequest{tt.start, tt.end, tt.matchers, tt.predicates})
			require.NoError(t, err)
			require.Len(t, sectionsResp.Sections, tt.wantCount)

			if tt.wantLabelsByStreamID != nil {
				// Collect all stream ID -> labels mappings across all sections.
				gotLabelsByStreamID := make(map[int64][]string)
				for _, section := range sectionsResp.Sections {
					for streamID, lbls := range section.LabelsByStreamID {
						gotLabelsByStreamID[streamID] = lbls
					}
				}

				// Verify each expected stream ID has the correct labels.
				require.Len(t, gotLabelsByStreamID, len(tt.wantLabelsByStreamID), "unexpected number of streams")
				for streamID, wantLabels := range tt.wantLabelsByStreamID {
					gotLabels, ok := gotLabelsByStreamID[streamID]
					require.True(t, ok, "missing labels for stream ID %d", streamID)
					require.Equal(t, wantLabels, gotLabels, "labels mismatch for stream ID %d", streamID)
				}
			}
		})
	}
}

// TestIndexSectionsReader_LabelPredicatesNotFilteredByBlooms tests that predicates on known
// stream labels are not incorrectly filtered out by bloom filters. Previously, label filters
// after stream selectors (e.g. `{app="foo"} | app="foo"`) would return no results because the
// bloom filter lookup failed for labels that only exist as indexed stream labels, not as
// structured metadata.
func TestIndexSectionsReader_LabelPredicatesNotFilteredByBlooms(t *testing.T) {
	ctx := user.InjectOrgID(context.Background(), tenantID)

	builder, err := indexobj.NewBuilder(logsobj.BuilderBaseConfig{
		TargetPageSize:          1024 * 1024,
		TargetObjectSize:        10 * 1024 * 1024,
		TargetSectionSize:       128,
		BufferSize:              1024 * 1024,
		SectionStripeMergeLimit: 2,
	}, nil)
	require.NoError(t, err)

	// Create a stream with label app=foo.
	_, err = builder.AppendStream(tenantID, streams.Stream{
		ID:               1,
		Labels:           labels.New(labels.Label{Name: "app", Value: "foo"}),
		MinTimestamp:     now.Add(-3 * time.Hour),
		MaxTimestamp:     now.Add(-2 * time.Hour),
		UncompressedSize: 5,
	})
	require.NoError(t, err)
	err = builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-3*time.Hour), 5)
	require.NoError(t, err)
	err = builder.ObserveLogLine(tenantID, "test-path", 0, 1, 1, now.Add(-2*time.Hour), 0)
	require.NoError(t, err)

	// Add a bloom filter for a metadata column (traceID), NOT for the stream label (app).
	traceIDBloom := bloom.NewWithEstimates(10, 0.01)
	traceIDBloom.AddString("abcd")
	traceIDBloomBytes, err := traceIDBloom.MarshalBinary()
	require.NoError(t, err)
	err = builder.AppendColumnIndex(tenantID, "test-path", 0, "traceID", 0, traceIDBloomBytes)
	require.NoError(t, err)

	// Build and store the object.
	timeRanges := builder.TimeRanges()
	require.Len(t, timeRanges, 1)

	obj, closer, err := builder.Flush()
	require.NoError(t, err)
	t.Cleanup(func() { _ = closer.Close() })

	bucket := objstore.NewInMemBucket()

	uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, log.NewNopLogger())
	require.NoError(t, uploader.RegisterMetrics(prometheus.NewPedanticRegistry()))

	path, err := uploader.Upload(context.Background(), obj)
	require.NoError(t, err)

	mstore := newTestObjectMetastore(bucket)

	tests := []struct {
		name          string
		matchers      []*labels.Matcher
		predicates    []*labels.Matcher
		expectResults bool
	}{
		{
			name: "predicate on stream label should not be filtered by blooms",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			expectResults: true,
		},
		{
			name: "predicate on metadata column still works",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "abcd"),
			},
			expectResults: true,
		},
		{
			name: "predicate on metadata column with non-matching value returns no results",
			matchers: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "app", "foo"),
			},
			predicates: []*labels.Matcher{
				labels.MustNewMatcher(labels.MatchEqual, "traceID", "not-in-bloom"),
			},
			expectResults: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			resp, err := mstore.IndexSectionsReader(ctx, IndexSectionsReaderRequest{
				IndexPath: path,
				SectionsRequest: SectionsRequest{
					Start:      now.Add(-4 * time.Hour),
					End:        now.Add(time.Hour),
					Matchers:   tt.matchers,
					Predicates: tt.predicates,
				},
			})
			require.NoError(t, err)
			t.Cleanup(resp.Reader.Close)

			// Read all records from the reader.
			var totalRows int64
			for {
				rec, err := resp.Reader.Read(ctx)
				if err != nil {
					break
				}
				totalRows += rec.NumRows()
			}

			if !tt.expectResults {
				require.Equal(t, int64(0), totalRows, "expected no results")
			} else {
				require.NotEmpty(t, totalRows, "expected results to be returned")
			}
		})
	}
}

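// queryMetastore populates a metastore with testStreams for the given tenant
// and invokes mfunc with a ten-hour query window centered on now.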
func queryMetastore(t *testing.T, tenant string, mfunc func(context.Context, time.Time, time.Time, Metastore)) {
	now := time.Now().UTC()
	start := now.Add(-time.Hour * 5)
	end := now.Add(time.Hour * 5)

	builder := newTestDataBuilder(t)

	for _, stream := range testStreams {
		builder.addStreamAndFlush(tenant, stream)
	}

	mstore := newTestObjectMetastore(builder.bucket)
	defer func() {
		require.NoError(t, mstore.bucket.Close())
	}()

	ctx := user.InjectOrgID(context.Background(), tenant)

	mfunc(ctx, start, end, mstore)
}

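// newTestDataBuilder wires a logs object builder, a table-of-contents writer,
// and an uploader to a shared in-memory bucket.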
func newTestDataBuilder(t testing.TB) *testDataBuilder {
	bucket := objstore.NewInMemBucket()

	builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{
		BuilderBaseConfig: logsobj.BuilderBaseConfig{
			TargetPageSize:          1024 * 1024,      // 1MB
			TargetObjectSize:        10 * 1024 * 1024, // 10MB
			TargetSectionSize:       1024 * 1024,      // 1MB
			BufferSize:              1024 * 1024,      // 1MB
			SectionStripeMergeLimit: 2,
		},
	}, nil)
	require.NoError(t, err)

	logger := log.NewLogfmtLogger(os.Stdout)
	logger = log.With(logger, "test", t.Name())

	meta := NewTableOfContentsWriter(bucket, logger)
	require.NoError(t, meta.RegisterMetrics(prometheus.NewPedanticRegistry()))

	uploader := uploader.New(uploader.Config{SHAPrefixSize: 2}, bucket, logger)
	require.NoError(t, uploader.RegisterMetrics(prometheus.NewPedanticRegistry()))

	return &testDataBuilder{
		t:        t,
		bucket:   bucket,
		builder:  builder,
		meta:     meta,
		uploader: uploader,
	}
}

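// newTestObjectMetastore returns an ObjectMetastore backed by the given
// bucket, using a default config and a no-op logger.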
func newTestObjectMetastore(bucket objstore.Bucket) *ObjectMetastore {
	return NewObjectMetastore(bucket, Config{}, log.NewNopLogger(), NewObjectMetastoreMetrics(prometheus.NewRegistry()))
}