fix(blooms): skip empty blooms on reads (#13500)

Read path safeguard to prevent filtering chunks associated with empty blooms while we work on finding the underlying cause of empty blooms in compaction.
pull/13502/head
Owen Diehl 11 months ago committed by GitHub
parent 652ad2455c
commit bfa69556af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. pkg/storage/bloom/v1/filter/scalable.go (+7)
  2. pkg/storage/bloom/v1/fuse.go (+7)
  3. pkg/storage/bloom/v1/fuse_test.go (+63)

@ -110,6 +110,13 @@ func (s *ScalableBloomFilter) K() uint {
return s.filters[len(s.filters)-1].K()
}
// Count returns the total number of members added across every constituent
// filter of the scalable bloom filter.
func (s *ScalableBloomFilter) Count() (ct int) {
	for i := range s.filters {
		ct += int(s.filters[i].Count())
	}
	return
}
// FillRatio returns the average ratio of set bits across every filter.
func (s *ScalableBloomFilter) FillRatio() float64 {
var sum, count float64

@ -300,6 +300,13 @@ func (fq *FusedQuerier) runSeries(schema Schema, series *SeriesWithOffsets, reqs
// Test each bloom individually
bloom := fq.bq.blooms.At()
for j, req := range reqs {
// TODO(owen-d): this is a stopgap to avoid filtering broken blooms until we find their cause.
// In the case we don't have any data in the bloom, don't filter any chunks.
if bloom.ScalableBloomFilter.Count() == 0 {
for k := range inputs[j].InBlooms {
inputs[j].found[k] = true
}
}
// shortcut: series level removal
// we can skip testing chunk keys individually if the bloom doesn't match

@ -356,6 +356,69 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
}
}
// TestFusedQuerierSkipsEmptyBlooms verifies the read-path safeguard for broken
// (empty) blooms: when a series' bloom contains no data, the fused querier must
// not filter out any of that series' chunks, even for a search term the bloom
// cannot possibly contain.
func TestFusedQuerierSkipsEmptyBlooms(t *testing.T) {
	// references for linking in memory reader+writer
	indexBuf := bytes.NewBuffer(nil)
	bloomsBuf := bytes.NewBuffer(nil)
	writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
	reader := NewByteReader(indexBuf, bloomsBuf)

	builder, err := NewBlockBuilder(
		BlockOptions{
			Schema: Schema{
				version:  DefaultSchemaVersion,
				encoding: chunkenc.EncNone,
			},
			SeriesPageSize: 100,
			BloomPageSize:  10 << 10,
		},
		writer,
	)
	require.NoError(t, err)

	// One series with one chunk, paired with a freshly constructed bloom that
	// has had nothing added to it (Count() == 0).
	data := SeriesWithBlooms{
		Series: &Series{
			Fingerprint: 0,
			Chunks: []ChunkRef{
				{
					From:     0,
					Through:  10,
					Checksum: 0x1234,
				},
			},
		},
		Blooms: v2.NewSliceIter([]*Bloom{
			// simulate empty bloom
			{
				*filter.NewScalableBloomFilter(1024, 0.01, 0.8),
			},
		}),
	}
	itr := v2.NewSliceIter[SeriesWithBlooms]([]SeriesWithBlooms{data})
	_, err = builder.BuildFrom(itr)
	require.NoError(t, err)
	require.False(t, itr.Next())

	block := NewBlock(reader, NewMetrics(nil))
	ch := make(chan Output, 1)
	req := Request{
		Fp:   data.Series.Fingerprint,
		Chks: data.Series.Chunks,
		// "foobar" is absent from the (empty) bloom; without the empty-bloom
		// safeguard this search would remove the chunk.
		Search:   keysToBloomTest([][]byte{[]byte("foobar")}),
		Response: ch,
		Recorder: NewBloomRecorder(context.Background(), "unknown"),
	}
	err = NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize).Fuse(
		[]v2.PeekIterator[Request]{
			v2.NewPeekIter[Request](v2.NewSliceIter[Request]([]Request{req})),
		},
		log.NewNopLogger(),
	).Run()
	require.NoError(t, err)

	x := <-ch
	// Empty bloom => no chunks may be filtered out.
	require.Empty(t, x.Removals)
}
func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Output) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)

Loading…
Cancel
Save