From cf71ac7114a6fb3a2c4bf71c7fbcd9c4e91a05eb Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 29 Mar 2024 15:43:18 +0100 Subject: [PATCH] fix(blooms): Remove blocks not matching any series in task (#12401) --- pkg/bloomgateway/processor.go | 2 +- pkg/bloomgateway/util.go | 2 +- pkg/bloomgateway/util_test.go | 20 ++++++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 5cf805b11a..5d43e79eec 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -175,7 +175,7 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie if sp := opentracing.SpanFromContext(task.ctx); sp != nil { md, _ := blockQuerier.Metadata() blk := bloomshipper.BlockRefFrom(task.Tenant, task.table.String(), md) - sp.LogKV("process block", blk.String()) + sp.LogKV("process block", blk.String(), "series", len(task.series)) } it := v1.NewPeekingIter(task.RequestIter(tokenizer)) diff --git a/pkg/bloomgateway/util.go b/pkg/bloomgateway/util.go index 5f2f2e31f7..e07c5740fd 100644 --- a/pkg/bloomgateway/util.go +++ b/pkg/bloomgateway/util.go @@ -78,7 +78,7 @@ func partitionTasks(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTas }) // All fingerprints fall outside of the consumer's range - if min == len(refs) || max == 0 { + if min == len(refs) || max == 0 || min == max { continue } diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 9bd158219e..f624d33709 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -136,6 +136,26 @@ func TestPartitionTasks(t *testing.T) { require.Len(t, res.tasks[0].series, 90) } }) + + t.Run("block series before and after task series", func(t *testing.T) { + bounds := []bloomshipper.BlockRef{ + mkBlockRef(100, 200), + } + + tasks := []Task{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 50}, + {Fingerprint: 75}, + {Fingerprint: 250}, + {Fingerprint: 300}, + }, + }, + } + + results := partitionTasks(tasks, bounds) + require.Len(t, results, 0) + }) } func TestPartitionRequest(t *testing.T) {