chore(bloomgateway): update bloom gateway to test for structured metadata blooms (#14130)

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
Robert Fratto 8 months ago committed by GitHub
parent 89c4282654
commit 4de51f9953
  1. pkg/bloomgateway/bloomgateway.go (12)
  2. pkg/bloomgateway/bloomgateway_test.go (24)
  3. pkg/bloomgateway/metrics.go (8)
  4. pkg/bloomgateway/multiplexing.go (17)
  5. pkg/bloomgateway/multiplexing_test.go (8)
  6. pkg/bloomgateway/processor.go (3)
  7. pkg/bloomgateway/processor_test.go (38)
  8. pkg/bloomgateway/stats.go (4)
  9. pkg/storage/bloom/v1/ast_extractor.go (4)
  10. pkg/storage/bloom/v1/bloom_tester.go (121)
  11. pkg/storage/bloom/v1/bloom_tester_test.go (121)
  12. pkg/storage/bloom/v1/builder_test.go (13)
  13. pkg/storage/bloom/v1/fuse.go (15)
  14. pkg/storage/bloom/v1/fuse_test.go (8)
  15. pkg/storage/bloom/v1/test_util.go (95)
  16. pkg/storage/bloom/v1/versioned_builder_test.go (58)

@ -193,12 +193,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.New("from time must not be after through time")
}
filters := v1.ExtractTestableLineFilters(req.Plan.AST)
stats.NumFilters = len(filters)
g.metrics.receivedFilters.Observe(float64(len(filters)))
matchers := v1.ExtractTestableLabelMatchers(req.Plan.AST)
stats.NumMatchers = len(matchers)
g.metrics.receivedMatchers.Observe(float64(len(matchers)))
// Shortcut if request does not contain filters
if len(filters) == 0 {
if len(matchers) == 0 {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
@ -227,7 +227,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
stats.NumTasks = len(seriesByDay)
sp.LogKV(
"filters", len(filters),
"matchers", len(matchers),
"days", len(seriesByDay),
"blocks", len(req.Blocks),
"series_requested", len(req.Refs),
@ -239,7 +239,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}
series := seriesByDay[0]
task := newTask(ctx, tenantID, series, filters, blocks)
task := newTask(ctx, tenantID, series, matchers, blocks)
// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(series.series))

@ -157,7 +157,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)
expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="nomatch"`)
require.NoError(t, err)
req := &logproto.FilterChunkRefRequest{
@ -196,7 +196,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
// saturate workers
// then send additional request
for i := 0; i < gw.cfg.WorkerConcurrency+1; i++ {
expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="nomatch"`)
require.NoError(t, err)
req := &logproto.FilterChunkRefRequest{
@ -240,7 +240,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
// saturate workers
// then send additional request
for i := 0; i < gw.cfg.WorkerConcurrency+1; i++ {
expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="nomatch"`)
require.NoError(t, err)
req := &logproto.FilterChunkRefRequest{
@ -341,7 +341,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
Checksum: uint32(idx),
},
}
expr, err := syntax.ParseExpr(`{foo="bar"} |= "foo"`)
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="nomatch"`)
require.NoError(t, err)
req := &logproto.FilterChunkRefRequest{
From: now.Add(-4 * time.Hour),
@ -380,7 +380,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("no match - return empty response", func(t *testing.T) {
inputChunkRefs := groupRefs(t, chunkRefs)
expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="nomatch"`)
require.NoError(t, err)
req := &logproto.FilterChunkRefRequest{
From: now.Add(-8 * time.Hour),
@ -403,16 +403,14 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
inputChunkRefs := groupRefs(t, chunkRefs)
// Hack to get search string for a specific series
// see MkBasicSeriesWithBlooms() in pkg/storage/bloom/v1/test_util.go
// each series has 1 chunk
// each chunk has multiple strings, from int(fp) to int(nextFp)-1
x := rand.Intn(len(inputChunkRefs))
fp := inputChunkRefs[x].Fingerprint
chks := inputChunkRefs[x].Refs
line := fmt.Sprintf("%04x:%04x", int(fp), 0) // first line
rnd := rand.Intn(len(inputChunkRefs))
fp := inputChunkRefs[rnd].Fingerprint
chks := inputChunkRefs[rnd].Refs
key := fmt.Sprintf("%s:%04x", model.Fingerprint(fp), 0)
t.Log("x=", x, "fp=", fp, "line=", line)
t.Log("rnd=", rnd, "fp=", fp, "key=", key)
expr, err := syntax.ParseExpr(fmt.Sprintf(`{foo="bar"} |= "%s"`, line))
expr, err := syntax.ParseExpr(fmt.Sprintf(`{foo="bar"} | trace_id="%s"`, key))
require.NoError(t, err)
req := &logproto.FilterChunkRefRequest{

@ -56,7 +56,7 @@ type serverMetrics struct {
filteredSeries prometheus.Histogram
requestedChunks prometheus.Histogram
filteredChunks prometheus.Histogram
receivedFilters prometheus.Histogram
receivedMatchers prometheus.Histogram
}
func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@ -105,11 +105,11 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str
Help: "Total amount of chunk refs filtered by bloom-gateway",
Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10),
}),
receivedFilters: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
receivedMatchers: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_filters",
Help: "Number of filters per request.",
Name: "request_matchers",
Help: "Number of matchers per request.",
Buckets: prometheus.ExponentialBuckets(1, 2, 9), // 1 -> 256
}),
}

@ -9,7 +9,6 @@ import (
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@ -56,8 +55,8 @@ type Task struct {
// series of the original request
series []*logproto.GroupedChunkRefs
// filters of the original request
filters []syntax.LineFilterExpr
// matchers to check against
matchers []v1.LabelMatcher
// blocks that were resolved on the index gateway and sent with the request
blocks []bloomshipper.BlockRef
// from..through date of the task's chunks
@ -75,13 +74,13 @@ type Task struct {
recorder *v1.BloomRecorder
}
func newTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) Task {
func newTask(ctx context.Context, tenantID string, refs seriesWithInterval, matchers []v1.LabelMatcher, blocks []bloomshipper.BlockRef) Task {
return Task{
tenant: tenantID,
recorder: v1.NewBloomRecorder(ctx, "task"),
err: new(wrappedError),
resCh: make(chan v1.Output),
filters: filters,
matchers: matchers,
blocks: blocks,
series: refs.series,
interval: refs.interval,
@ -122,7 +121,7 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
tenant: t.tenant,
err: t.err,
resCh: t.resCh,
filters: t.filters,
matchers: t.matchers,
blocks: t.blocks,
series: series,
interval: t.interval,
@ -132,13 +131,11 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
}
}
func (t Task) RequestIter(
tokenizer *v1.NGramTokenizer,
) iter.Iterator[v1.Request] {
func (t Task) RequestIter() iter.Iterator[v1.Request] {
return &requestIterator{
recorder: t.recorder,
series: iter.NewSliceIter(t.series),
search: v1.FiltersToBloomTest(tokenizer, t.filters...),
search: v1.LabelMatchersToBloomTest(t.matchers...),
channel: t.resCh,
curr: v1.Request{},
}

@ -11,7 +11,6 @@ import (
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)
@ -55,15 +54,14 @@ func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.F
func TestTask_RequestIterator(t *testing.T) {
ts := mktime("2024-01-24 12:00")
tenant := "fake"
tokenizer := v1.NewNGramTokenizer(4, 0)
t.Run("empty request yields empty iterator", func(t *testing.T) {
swb := seriesWithInterval{
interval: bloomshipper.Interval{Start: 0, End: math.MaxInt64},
series: []*logproto.GroupedChunkRefs{},
}
task := newTask(context.Background(), tenant, swb, []syntax.LineFilterExpr{}, nil)
it := task.RequestIter(tokenizer)
task := newTask(context.Background(), tenant, swb, nil, nil)
it := task.RequestIter()
// nothing to iterate over
require.False(t, it.Next())
})
@ -106,7 +104,7 @@ func TestTask_RequestIterator(t *testing.T) {
iters := make([]v2.PeekIterator[v1.Request], 0, len(tasks))
for _, task := range tasks {
iters = append(iters, v2.NewPeekIter(task.RequestIter(tokenizer)))
iters = append(iters, v2.NewPeekIter(task.RequestIter()))
}
// merge the request iterators using the heap sort iterator

@ -150,7 +150,6 @@ func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBl
return v1.ErrUnsupportedSchemaVersion
}
tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), schema.NGramSkip())
iters := make([]iter.PeekIterator[v1.Request], 0, len(tasks))
for _, task := range tasks {
@ -164,7 +163,7 @@ func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBl
// sp.LogKV("process block", blockID, "series", len(task.series))
// }
it := iter.NewPeekIter(task.RequestIter(tokenizer))
it := iter.NewPeekIter(task.RequestIter())
iters = append(iters, it)
}

@ -14,7 +14,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/logql/syntax"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
@ -140,17 +139,16 @@ func TestProcessor(t *testing.T) {
},
day: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilterExpr{
{
LineFilter: syntax.LineFilter{
Ty: 0,
Match: "no match",
},
matchers := []v1.LabelMatcher{
v1.PlainLabelMatcher{
Key: "trace_id",
Value: "nomatch",
},
}
t.Log("series", len(swb.series))
task := newTask(ctx, "fake", swb, filters, nil)
task := newTask(ctx, "fake", swb, matchers, nil)
tasks := []Task{task}
results := atomic.NewInt64(0)
@ -192,17 +190,15 @@ func TestProcessor(t *testing.T) {
},
day: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilterExpr{
{
LineFilter: syntax.LineFilter{
Ty: 0,
Match: "no match",
},
matchers := []v1.LabelMatcher{
v1.PlainLabelMatcher{
Key: "trace_id",
Value: "nomatch",
},
}
t.Log("series", len(swb.series))
task := newTask(ctx, "fake", swb, filters, blocks)
task := newTask(ctx, "fake", swb, matchers, blocks)
tasks := []Task{task}
results := atomic.NewInt64(0)
@ -241,17 +237,15 @@ func TestProcessor(t *testing.T) {
},
day: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilterExpr{
{
LineFilter: syntax.LineFilter{
Ty: 0,
Match: "no match",
},
matchers := []v1.LabelMatcher{
v1.PlainLabelMatcher{
Key: "trace_id",
Value: "nomatch",
},
}
t.Log("series", len(swb.series))
task := newTask(ctx, "fake", swb, filters, nil)
task := newTask(ctx, "fake", swb, matchers, nil)
tasks := []Task{task}
results := atomic.NewInt64(0)

@ -9,7 +9,7 @@ import (
type Stats struct {
Status string
NumTasks, NumFilters int
NumTasks, NumMatchers int
ChunksRequested, ChunksFiltered int
SeriesRequested, SeriesFiltered int
QueueTime *atomic.Duration
@ -70,7 +70,7 @@ func (s *Stats) KVArgs() []any {
"msg", "stats-report",
"status", s.Status,
"tasks", s.NumTasks,
"filters", s.NumFilters,
"matchers", s.NumMatchers,
"blocks_processed", s.ProcessedBlocks.Load(),
"series_requested", s.SeriesRequested,
"series_filtered", s.SeriesFiltered,

@ -35,6 +35,10 @@ type AndLabelMatcher struct{ Left, Right LabelMatcher }
// Unsupported LabelFilterExprs map to an UnsupportedLabelMatcher, for which
// bloom tests should always pass.
func ExtractTestableLabelMatchers(expr syntax.Expr) []LabelMatcher {
if expr == nil {
return nil
}
var (
exprs []*syntax.LabelFilterExpr
foundParseStage bool

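For context, a minimal sketch of how the gateway now obtains these matchers and turns them into a bloom test, using the functions touched in this commit. The helper name matchersForQuery, the example query, and the error handling are illustrative only; the exported functions and import paths are the ones shown in the diff.

    import (
        "github.com/grafana/loki/v3/pkg/logql/syntax"
        v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
    )

    // matchersForQuery is a hypothetical helper: parse a LogQL query and pull out
    // the label matchers that blooms can actually be tested against.
    func matchersForQuery(query string) ([]v1.LabelMatcher, error) {
        expr, err := syntax.ParseExpr(query)
        if err != nil {
            return nil, err
        }
        // Structured metadata filters such as `| trace_id="abc"` become testable
        // matchers; anything the blooms cannot test maps to UnsupportedLabelMatcher,
        // which always passes.
        return v1.ExtractTestableLabelMatchers(expr), nil
    }

    // Usage, roughly mirroring FilterChunkRefs above:
    //   matchers, _ := matchersForQuery(`{app="fake"} | trace_id="abc123"`)
    //   search := v1.LabelMatchersToBloomTest(matchers...)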
@ -1,7 +1,9 @@
package v1
import (
"fmt"
"unicode/utf8"
"unsafe"
"github.com/grafana/regexp"
@ -292,6 +294,25 @@ func (o orTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen
return o.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(bloom, buf, prefixLen)
}
type andTest struct {
left, right BloomTest
}
func newAndTest(left, right BloomTest) andTest {
return andTest{
left: left,
right: right,
}
}
func (a andTest) Matches(bloom filter.Checker) bool {
return a.left.Matches(bloom) && a.right.Matches(bloom)
}
func (a andTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
return a.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(bloom, buf, prefixLen)
}
func newPatternTest(b NGramBuilder, match string) BloomTest {
lit, err := pattern.ParseLiterals(match)
if err != nil {
@ -305,3 +326,103 @@ func newPatternTest(b NGramBuilder, match string) BloomTest {
}
return res
}
func LabelMatchersToBloomTest(matchers ...LabelMatcher) BloomTest {
tests := make(BloomTests, 0, len(matchers))
for _, matcher := range matchers {
tests = append(tests, matcherToBloomTest(matcher))
}
return tests
}
func matcherToBloomTest(matcher LabelMatcher) BloomTest {
switch matcher := matcher.(type) {
case UnsupportedLabelMatcher:
return matchAllTest{}
case PlainLabelMatcher:
return newStringMatcherTest(matcher)
case OrLabelMatcher:
return newOrTest(
matcherToBloomTest(matcher.Left),
matcherToBloomTest(matcher.Right),
)
case AndLabelMatcher:
return newAndTest(
matcherToBloomTest(matcher.Left),
matcherToBloomTest(matcher.Right),
)
default:
// Unhandled cases pass bloom tests by default.
return matchAllTest{}
}
}
type stringMatcherTest struct {
matcher PlainLabelMatcher
}
func newStringMatcherTest(matcher PlainLabelMatcher) stringMatcherTest {
return stringMatcherTest{matcher: matcher}
}
func (sm stringMatcherTest) Matches(bloom filter.Checker) bool {
// TODO(rfratto): reintroduce the use of a shared tokenizer here to avoid
// desyncing between how tokens are passed during building vs passed during
// querying.
//
// For a shared tokenizer to be ergonomic:
//
// 1. A prefix shouldn't be required until MatchesWithPrefixBuf is called
// 2. It should be possible to test for just the key
var (
combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value)
rawKey = unsafe.Slice(unsafe.StringData(sm.matcher.Key), len(sm.matcher.Key))
rawCombined = unsafe.Slice(unsafe.StringData(combined), len(combined))
)
if !bloom.Test(rawKey) {
// The structured metadata key wasn't indexed. We pass the bloom test
// since we can only filter data out if the key was indexed but the value
// wasn't.
//
// TODO(rfratto): The negative test here is a bit confusing, and the key
// presence test should likely be done higher up.
return true
}
return bloom.Test(rawCombined)
}
func (sm stringMatcherTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
var (
combined = fmt.Sprintf("%s=%s", sm.matcher.Key, sm.matcher.Value)
prefixedKey = appendToBuf(buf, prefixLen, sm.matcher.Key)
prefixedCombined = appendToBuf(buf, prefixLen, combined)
)
if !bloom.Test(prefixedKey) {
// The structured metadata key wasn't indexed for a prefix. We pass the
// bloom test since we can only filter data out if the key was indexed but
// the value wasn't.
//
// TODO(rfratto): The negative test here is a bit confusing, and the key
// presence test should likely be done higher up.
return true
}
return bloom.Test(prefixedCombined)
}
// appendToBuf is the equivalent of append(buf[:prefixLen], str). len(buf) must
// be greater than or equal to prefixLen+len(str) to avoid allocations.
func appendToBuf(buf []byte, prefixLen int, str string) []byte {
rawString := unsafe.Slice(unsafe.StringData(str), len(str))
return append(buf[:prefixLen], rawString...)
}
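To make the pass/fail semantics above concrete, a hedged sketch that exercises LabelMatchersToBloomTest against a toy checker. setChecker is a hypothetical stand-in written for this illustration; it assumes filter.Checker only requires Test([]byte) bool, which is what the fake blooms in the tests rely on. The sketch is written as if it lived inside package v1.

    // setChecker is a hypothetical in-memory stand-in for a bloom filter,
    // answering membership by exact string lookup.
    type setChecker map[string]struct{}

    func (s setChecker) Test(key []byte) bool {
        _, ok := s[string(key)]
        return ok
    }

    func exampleStringMatcherTest() {
        // stringMatcherTest expects blooms to hold both the bare key and the
        // key=value token for indexed structured metadata.
        bloom := setChecker{"trace_id": {}, "trace_id=exists": {}}

        pass := LabelMatchersToBloomTest(PlainLabelMatcher{Key: "trace_id", Value: "exists"})
        fail := LabelMatchersToBloomTest(PlainLabelMatcher{Key: "trace_id", Value: "other"})
        skip := LabelMatchersToBloomTest(PlainLabelMatcher{Key: "span_id", Value: "x"})

        fmt.Println(pass.Matches(bloom)) // true: key and key=value are both present
        fmt.Println(fail.Matches(bloom)) // false: key is present but this value is not
        fmt.Println(skip.Matches(bloom)) // true: key was never indexed, so nothing can be ruled out
    }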

@ -5,14 +5,15 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/logql/syntax"
)
type fakeBloom []string
type fakeLineBloom []string
// fakeBloom is a fake bloom filter that matches tokens exactly.
// It uses a tokenizer to build the tokens for a line
func newFakeBloom(tokenizer *NGramTokenizer, line string) (res fakeBloom) {
func newFakeBloom(tokenizer *NGramTokenizer, line string) (res fakeLineBloom) {
toks := tokenizer.Tokens(line)
for toks.Next() {
res = append(res, string(toks.At()))
@ -20,7 +21,7 @@ func newFakeBloom(tokenizer *NGramTokenizer, line string) (res fakeBloom) {
return
}
func (f fakeBloom) Test(data []byte) bool {
func (f fakeLineBloom) Test(data []byte) bool {
str := string(data)
for _, match := range f {
if str == match {
@ -117,3 +118,117 @@ func TestBloomQueryingLogic(t *testing.T) {
})
}
}
func TestLabelMatchersToBloomTest(t *testing.T) {
// All test cases below have access to a fake bloom filter with
// trace_id=exists_1 and trace_id=exists_2
var (
prefix = "fakeprefix"
tokenizer = NewStructuredMetadataTokenizer(prefix)
bloom = newFakeMetadataBloom(
tokenizer,
push.LabelAdapter{Name: "trace_id", Value: "exists_1"},
push.LabelAdapter{Name: "trace_id", Value: "exists_2"},
)
)
tt := []struct {
name string
query string
match bool
}{
{
name: "no matchers",
query: `{app="fake"}`,
match: true,
},
{
name: "basic matcher pass",
query: `{app="fake"} | trace_id="exists_1"`,
match: true,
},
{
name: "basic matcher fail",
query: `{app="fake"} | trace_id="noexist"`,
match: false,
},
{
name: "multiple matcher pass",
query: `{app="fake"} | trace_id="exists_1" | trace_id="exists_2"`,
match: true,
},
{
name: "multiple matcher fail",
query: `{app="fake"} | trace_id="exists_1" | trace_id="noexist"`,
match: false,
},
{
name: "ignore non-indexed key",
query: `{app="fake"} | noexist="noexist"`,
match: true,
},
{
name: "ignore unsupported operator",
query: `{app="fake"} | trace_id=~".*noexist.*"`,
match: true,
},
{
name: "or test pass",
query: `{app="fake"} | trace_id="noexist" or trace_id="exists_1"`,
match: true,
},
{
name: "or test fail",
query: `{app="fake"} | trace_id="noexist" or trace_id="noexist"`,
match: false,
},
{
name: "and test pass",
query: `{app="fake"} | trace_id="exists_1" or trace_id="exists_2"`,
match: true,
},
{
name: "and test fail",
query: `{app="fake"} | trace_id="exists_1" and trace_id="noexist"`,
match: false,
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
expr, err := syntax.ParseExpr(tc.query)
require.NoError(t, err)
matchers := ExtractTestableLabelMatchers(expr)
bloomTest := LabelMatchersToBloomTest(matchers...)
// .Matches and .MatchesWithPrefixBuf should both have the same result.
require.Equal(t, tc.match, bloomTest.Matches(bloom))
require.Equal(t, tc.match, bloomTest.MatchesWithPrefixBuf(bloom, []byte(prefix), len(prefix)))
})
}
}
type fakeMetadataBloom []string
// fakeBloom is a fake bloom filter that matches tokens exactly.
// It uses a tokenizer to build the tokens for a line
func newFakeMetadataBloom(tokenizer *StructuredMetadataTokenizer, kvs ...push.LabelAdapter) (res fakeLineBloom) {
for _, kv := range kvs {
it := tokenizer.Tokens(kv)
for it.Next() {
res = append(res, it.At())
}
}
return res
}
func (f fakeMetadataBloom) Test(data []byte) bool {
str := string(data)
for _, match := range f {
if str == match {
return true
}
}
return false
}

@ -49,7 +49,6 @@ func TestBlockOptions_RoundTrip(t *testing.T) {
func TestBlockBuilder_RoundTrip(t *testing.T) {
numSeries := 100
data, keys := MkBasicSeriesWithLiteralBlooms(numSeries, 0, 0xffff, 0, 10000)
for _, enc := range blockEncodings {
// references for linking in memory reader+writer
@ -97,16 +96,12 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
BloomPageSize: 10 << 10,
BlockSize: tc.maxBlockSize,
}
data, keys := MkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)
builder, err := NewBlockBuilder(blockOpts, tc.writer)
require.Nil(t, err)
itr := iter.NewPeekIter[SeriesWithBlooms](
iter.NewMapIter(
iter.NewSliceIter[SeriesWithLiteralBlooms](data),
func(x SeriesWithLiteralBlooms) SeriesWithBlooms { return x.SeriesWithBlooms() },
),
)
itr := iter.NewPeekIter(iter.NewSliceIter(data))
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
@ -134,7 +129,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
got := querier.At()
blooms, err := iter.Collect(got.Blooms)
require.Nil(t, err)
require.Equal(t, *processedData[i].Series, got.Series.Series)
require.Equal(t, processedData[i].Series.Series, got.Series.Series)
for _, key := range keys[i] {
found := false
for _, b := range blooms {
@ -161,7 +156,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
got := querier.At()
blooms, err := iter.Collect(got.Blooms)
require.Nil(t, err)
require.Equal(t, *halfData[j].Series, got.Series.Series)
require.Equal(t, halfData[j].Series.Series, got.Series.Series)
for _, key := range halfKeys[j] {
found := false
for _, b := range blooms {

@ -331,23 +331,16 @@ func (fq *FusedQuerier) runSeries(schema Schema, series *SeriesWithMeta, reqs []
continue
}
// TODO(owen-d): copying this over, but they're going to be the same
// across any block schema because prefix len is determined by n-gram and
// all chunks have the same encoding length. tl;dr: it's weird/unnecessary to have
// these defined this way and recreated across each bloom
var (
tokenBuf []byte
prefixLen int
)
for k, chk := range inputs[j].InBlooms {
// if we've already found this chunk in a previous bloom, skip testing it
if inputs[j].found[k] {
continue
}
// Get buf to concatenate the chunk and search token
tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf)
if matched := req.Search.MatchesWithPrefixBuf(bloom, tokenBuf, prefixLen); matched {
// TODO(rfratto): reuse buffer between multiple calls to
// prefixForChunkRef and MatchesWithPrefixBuf to avoid allocations.
tokenBuf := prefixForChunkRef(chk)
if matched := req.Search.MatchesWithPrefixBuf(bloom, tokenBuf, len(tokenBuf)); matched {
inputs[j].found[k] = true
}
}

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"sync"
"testing"
@ -27,13 +28,14 @@ var BloomPagePool = mempool.New("test", []mempool.Bucket{
// TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't
// have to refactor tests here in order to fix this elsewhere, but it can/should be fixed --
// the skip & n len are hardcoded based on data that's passed to it elsewhere.
// TODO(chaudum): Can be removed once matching with structured metadata is implemented.
type fakeNgramBuilder struct{}
func (f fakeNgramBuilder) N() int { return 4 }
func (f fakeNgramBuilder) N() int { return math.MaxInt } // do not tokenize
func (f fakeNgramBuilder) SkipFactor() int { return 0 }
func (f fakeNgramBuilder) Tokens(line string) v2.Iterator[[]byte] {
return v2.NewSliceIter[[]byte]([][]byte{[]byte(line)})
func (f fakeNgramBuilder) Tokens(key string) v2.Iterator[[]byte] {
return v2.NewSliceIter[[]byte]([][]byte{[]byte(key)})
}
func keysToBloomTest(keys [][]byte) BloomTest {

@ -9,9 +9,9 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/chunkenc"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
)
// TODO(owen-d): this should probably be in it's own testing-util package
@ -46,94 +46,73 @@ func MakeBlock(t testing.TB, nth int, fromFp, throughFp model.Fingerprint, fromT
return block, data, keys
}
// This is a helper type used in tests that buffers blooms and can be turned into
// the commonly used iterator form *SeriesWithBlooms.
type SeriesWithLiteralBlooms struct {
Series *Series
Blooms []*Bloom
}
func (s *SeriesWithLiteralBlooms) SeriesWithBlooms() SeriesWithBlooms {
offsets := make([]BloomOffset, 0, len(s.Blooms))
for i := range s.Blooms {
func newSeriesWithBlooms(series Series, blooms []*Bloom) SeriesWithBlooms {
offsets := make([]BloomOffset, 0, len(blooms))
for i := range blooms {
offsets = append(offsets, BloomOffset{Page: i, ByteOffset: 0})
}
return SeriesWithBlooms{
Series: &SeriesWithMeta{
Series: *s.Series,
Series: series,
Meta: Meta{
Fields: NewSetFromLiteral[Field]("trace_id"),
Offsets: offsets,
},
},
Blooms: iter.NewSliceIter(s.Blooms),
Blooms: iter.NewSliceIter(blooms),
}
}
func MkBasicSeriesWithBlooms(nSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBlooms, keysList [][][]byte) {
series, keys := MkBasicSeriesWithLiteralBlooms(nSeries, fromFp, throughFp, fromTs, throughTs)
mapped := make([]SeriesWithBlooms, 0, len(series))
for _, s := range series {
v := s.SeriesWithBlooms()
mapped = append(mapped, v)
}
return mapped, keys
}
func MkBasicSeriesWithBlooms(nSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) ([]SeriesWithBlooms, [][][]byte) {
// return values
seriesList := make([]SeriesWithBlooms, 0, nSeries)
keysList := make([][][]byte, 0, nSeries)
func MkBasicSeriesWithLiteralBlooms(nSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithLiteralBlooms, keysList [][][]byte) {
const nGramLen = 4
seriesList = make([]SeriesWithLiteralBlooms, 0, nSeries)
keysList = make([][][]byte, 0, nSeries)
numChunksPerSeries := 10
numBloomsPerSeries := 2
step := (throughFp - fromFp) / model.Fingerprint(nSeries)
timeDelta := time.Duration(throughTs.Sub(fromTs).Nanoseconds() / int64(nSeries))
timeDelta := time.Duration(throughTs.Sub(fromTs).Nanoseconds() / int64(numChunksPerSeries))
tokenizer := NewNGramTokenizer(nGramLen, 0)
for i := 0; i < nSeries; i++ {
var series Series
var blooms []*Bloom
series.Fingerprint = fromFp + model.Fingerprint(i)*step
from := fromTs.Add(timeDelta * time.Duration(i))
series.Chunks = []ChunkRef{
{
From: from,
Through: from.Add(timeDelta),
Checksum: uint32(i),
},
for from := fromTs; from < throughTs; from = from.Add(timeDelta) {
series.Chunks = append(series.Chunks,
ChunkRef{
From: from,
Through: from.Add(timeDelta),
},
)
}
var bloom Bloom
bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8)
keys := make([][]byte, 0, int(step))
for _, chk := range series.Chunks {
tokenBuf, prefixLen := prefixedToken(nGramLen, chk, nil)
for j := 0; j < int(step); j++ {
line := fmt.Sprintf("%04x:%04x", int(series.Fingerprint), j)
it := tokenizer.Tokens(line)
chunkBatchSize := (series.Chunks.Len() + numBloomsPerSeries - 1) / numBloomsPerSeries
for j := 0; j < numBloomsPerSeries; j++ {
bloom := NewBloom()
batchStart, batchEnd := j*chunkBatchSize, min(series.Chunks.Len(), (j+1)*chunkBatchSize)
for x, chk := range series.Chunks[batchStart:batchEnd] {
tokenizer := NewStructuredMetadataTokenizer(string(prefixForChunkRef(chk)))
kv := push.LabelAdapter{Name: "trace_id", Value: fmt.Sprintf("%s:%04x", series.Fingerprint, j*chunkBatchSize+x)}
it := tokenizer.Tokens(kv)
for it.Next() {
key := it.At()
// series-level key
key := []byte(it.At())
bloom.Add(key)
// chunk-level key
tokenBuf = append(tokenBuf[:prefixLen], key...)
bloom.Add(tokenBuf)
keyCopy := key
keys = append(keys, keyCopy)
keys = append(keys, key)
}
}
blooms = append(blooms, bloom)
}
seriesList = append(seriesList, SeriesWithLiteralBlooms{
Series: &series,
Blooms: []*Bloom{&bloom},
})
seriesList = append(seriesList, newSeriesWithBlooms(series, blooms))
keysList = append(keysList, keys)
}
return
return seriesList, keysList
}
func EqualIterators[T any](t *testing.T, test func(a, b T), expected, actual iter.Iterator[T]) {

@ -28,9 +28,9 @@ func smallBlockOpts(v Version, enc chunkenc.Encoding) BlockOptions {
}
}
func setup(v Version) (BlockOptions, []SeriesWithLiteralBlooms, BlockWriter, BlockReader) {
func setup(v Version) (BlockOptions, []SeriesWithBlooms, BlockWriter, BlockReader) {
numSeries := 100
data, _ := MkBasicSeriesWithLiteralBlooms(numSeries, 0, 0xffff, 0, 10000)
data, _ := MkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
@ -39,61 +39,47 @@ func setup(v Version) (BlockOptions, []SeriesWithLiteralBlooms, BlockWriter, Blo
}
func TestV3Roundtrip(t *testing.T) {
opts, data, writer, reader := setup(V3)
opts, sourceData, writer, reader := setup(V3)
data, err := v2.Collect(
v2.NewMapIter[SeriesWithLiteralBlooms, SeriesWithLiteralBlooms](
v2.NewSliceIter(data),
func(swlb SeriesWithLiteralBlooms) SeriesWithLiteralBlooms {
return SeriesWithLiteralBlooms{
Series: swlb.Series,
// hack(owen-d): data currently only creates one bloom per series, but I want to test multiple.
// we're not checking the contents here, so ensuring the same bloom is used twice is fine.
Blooms: []*Bloom{swlb.Blooms[0], swlb.Blooms[0]},
}
},
),
)
require.NoError(t, err)
// SeriesWithBlooms holds an interator of blooms,
// which will be exhausted after being consumed by the block builder
// therefore we need a deepcopy of the original data, or - and that's easier to achieve -
// we simply create the same data twice.
_, unmodifiedData, _, _ := setup(V3)
b, err := NewBlockBuilderV3(opts, writer)
require.NoError(t, err)
mapped := v2.NewMapIter[SeriesWithLiteralBlooms](
v2.NewSliceIter(data),
func(s SeriesWithLiteralBlooms) SeriesWithBlooms {
return s.SeriesWithBlooms()
},
)
_, err = b.BuildFrom(mapped)
_, err = b.BuildFrom(v2.NewSliceIter(sourceData))
require.NoError(t, err)
// Ensure Equality
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()
CompareIterators[SeriesWithLiteralBlooms, *SeriesWithBlooms](
CompareIterators[SeriesWithBlooms, *SeriesWithBlooms](
t,
func(t *testing.T, a SeriesWithLiteralBlooms, b *SeriesWithBlooms) {
require.Equal(t, *a.Series, b.Series.Series) // ensure series equality
bs, err := v2.Collect(b.Blooms)
func(t *testing.T, a SeriesWithBlooms, b *SeriesWithBlooms) {
require.Equal(t, a.Series.Series.Fingerprint, b.Series.Series.Fingerprint)
require.ElementsMatch(t, a.Series.Series.Chunks, b.Series.Series.Chunks)
bloomsA, err := v2.Collect(a.Blooms)
require.NoError(t, err)
bloomsB, err := v2.Collect(b.Blooms)
require.NoError(t, err)
// ensure we only have one bloom in v1
require.Equal(t, 2, len(a.Blooms))
require.Equal(t, 2, len(bs))
require.Equal(t, 2, len(bloomsA))
require.Equal(t, 2, len(bloomsB))
var encA, encB encoding.Encbuf
for i := range a.Blooms {
require.NoError(t, a.Blooms[i].Encode(&encA))
require.NoError(t, bs[i].Encode(&encB))
for i := range bloomsA {
require.NoError(t, bloomsA[i].Encode(&encA))
require.NoError(t, bloomsB[i].Encode(&encB))
require.Equal(t, encA.Get(), encB.Get())
encA.Reset()
encB.Reset()
}
},
v2.NewSliceIter(data),
v2.NewSliceIter(unmodifiedData),
querier,
)
}
