fix(blooms): fix reference leak and resulting race condition in BloomPageDecoder (#12050)

pull/12053/head
Owen Diehl authored 2 years ago, committed by GitHub
parent 8193063a4a
commit 7c8992f4e4
Changed files:
  1. pkg/storage/bloom/v1/bloom.go (41 lines changed)
  2. pkg/storage/bloom/v1/bloom_querier.go (6 lines changed)
  3. pkg/storage/bloom/v1/index.go (7 lines changed)
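The failure mode in miniature: a decoded Bloom aliases the page's pooled byte slice, and the old Drop() returned that slice to BlockPool while the Bloom was still live, so a later Get could hand the same memory to another goroutine. The sketch below reproduces the hazard with a plain sync.Pool and a toy bloom type standing in for Loki's size-bucketed BlockPool and real Bloom; every name in it is illustrative, not Loki code:

package main

import (
	"fmt"
	"sync"
)

// bufPool stands in for Loki's size-bucketed BlockPool.
var bufPool = sync.Pool{
	New: func() any { return make([]byte, 32) },
}

// bloom stands in for a decoded Bloom: it aliases the page buffer
// rather than copying out of it.
type bloom struct{ bits []byte }

func main() {
	page := bufPool.Get().([]byte)
	n := copy(page, "page A bloom bits")
	b := bloom{bits: page[:n]} // the yielded bloom still references page

	// The old BloomPageDecoder.Drop() did the equivalent of this while
	// b was still reachable -- the reference leak in the commit title.
	bufPool.Put(page)

	// A later Get -- in real code, on another goroutine -- may return the
	// same backing array and overwrite it, racing with reads through b.bits.
	next := bufPool.Get().([]byte)
	copy(next, "page B bloom bits")

	fmt.Printf("%s\n", b.bits) // typically prints page B's bytes, not page A's
}

On a single goroutine the aliasing is merely wrong output; with Gets and Puts racing across goroutines it becomes the data race the commit title names.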

pkg/storage/bloom/v1/bloom.go

@@ -20,6 +20,8 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
 	// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
 	buf := bytes.NewBuffer(BlockPool.Get(int(b.Capacity() / 8)))
+	// TODO(owen-d): have encoder implement writer directly so we don't need
+	// to indirect via a buffer
 	_, err := b.WriteTo(buf)
 	if err != nil {
 		return errors.Wrap(err, "encoding bloom filter")
@@ -56,7 +58,16 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
 	return nil
 }
 
-func LazyDecodeBloomPage(dec *encoding.Decbuf, pool chunkenc.ReaderPool, decompressedSize int) (*BloomPageDecoder, error) {
+func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
+	data := BlockPool.Get(page.Len)[:page.Len]
+	defer BlockPool.Put(data)
+
+	_, err := io.ReadFull(r, data)
+	if err != nil {
+		return nil, errors.Wrap(err, "reading bloom page")
+	}
+
+	dec := encoding.DecWith(data)
 	if err := dec.CheckCrc(castagnoliTable); err != nil {
 		return nil, errors.Wrap(err, "checksumming bloom page")
 	}
@@ -67,7 +78,7 @@ func LazyDecodeBloomPage(dec *encoding.Decbuf, pool chunkenc.ReaderPool, decompressedSize int) (*BloomPageDecoder, error) {
 	}
 	defer pool.PutReader(decompressor)
 
-	b := BlockPool.Get(decompressedSize)[:decompressedSize]
+	b := make([]byte, page.DecompressedLen)
 
 	if _, err = io.ReadFull(decompressor, b); err != nil {
 		return nil, errors.Wrap(err, "decompressing bloom page")
@@ -98,6 +109,13 @@ func NewBloomPageDecoder(data []byte) *BloomPageDecoder {
 }
 
 // Decoder is a seekable, reset-able iterator
+// TODO(owen-d): use buffer pools. The reason we don't currently
+// do this is because the `data` slice currently escapes the decoder
+// via the returned bloom, so we can't know when it's safe to return it to the pool.
+// This happens via `data ([]byte) -> dec (*encoding.Decbuf) -> bloom (Bloom)` where
+// the final Bloom has a reference to the data slice.
+// We could optimize this by encoding the mode (read, write) into our structs
+// and doing copy-on-write shenanigans, but I'm avoiding this for now.
 type BloomPageDecoder struct {
 	data []byte
 	dec  *encoding.Decbuf
@@ -107,15 +125,6 @@ type BloomPageDecoder struct {
 	err error
 }
 
-// Drop returns the underlying byte slice to the pool
-// for efficiency. It's intended to be used as a
-// perf optimization prior to garbage collection.
-func (d *BloomPageDecoder) Drop() {
-	if cap(d.data) > 0 {
-		BlockPool.Put(d.data)
-	}
-}
-
 func (d *BloomPageDecoder) Reset() {
 	d.err = nil
 	d.cur = nil
@@ -234,13 +243,5 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) {
 		return nil, errors.Wrap(err, "seeking to bloom page")
 	}
 
-	data := BlockPool.Get(page.Len)[:page.Len]
-	_, err := io.ReadFull(r, data)
-	if err != nil {
-		return nil, errors.Wrap(err, "reading bloom page")
-	}
-
-	dec := encoding.DecWith(data)
-	return LazyDecodeBloomPage(&dec, b.schema.DecompressorPool(), page.DecompressedLen)
+	return LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page)
 }
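The new LazyDecodeBloomPage splits buffer ownership in two: the compressed bytes sit in a pooled scratch slice that is fully consumed before the function returns (which is why the defer BlockPool.Put(data) is safe), while the decompressed bytes that escape into the decoder are allocated with make and left to the garbage collector. A standalone sketch of that pattern, assuming a hypothetical readPage helper, gzip in place of Loki's configurable chunkenc compression, and a sync.Pool in place of BlockPool:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// scratchPool stands in for BlockPool; assume pooled slices are big enough
// for any compressed page (Loki's real pool is bucketed by size).
var scratchPool = sync.Pool{
	New: func() any { return make([]byte, 1<<16) },
}

// readPage reads a compressed page from r and returns its decompressed bytes.
func readPage(r io.Reader, compressedLen, decompressedLen int) ([]byte, error) {
	scratch := scratchPool.Get().([]byte)[:compressedLen]
	defer scratchPool.Put(scratch) // safe: scratch never outlives this call

	if _, err := io.ReadFull(r, scratch); err != nil {
		return nil, err
	}

	zr, err := gzip.NewReader(bytes.NewReader(scratch))
	if err != nil {
		return nil, err
	}
	defer zr.Close()

	// This slice escapes to the caller (like the decoder's data slice),
	// so it must be GC-managed memory, never pooled.
	out := make([]byte, decompressedLen)
	if _, err := io.ReadFull(zr, out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte("hello bloom page"))
	zw.Close()

	page, err := readPage(&buf, buf.Len(), len("hello bloom page"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", page)
}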

pkg/storage/bloom/v1/bloom_querier.go

@@ -39,11 +39,6 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
 
 	// load the desired page
 	if it.curPageIndex != offset.Page || it.curPage == nil {
-		// drop the current page if it exists
-		if it.curPage != nil {
-			it.curPage.Drop()
-		}
-
 		r, err := it.b.reader.Blooms()
 		if err != nil {
 			it.err = errors.Wrap(err, "getting blooms reader")
@@ -103,7 +98,6 @@ func (it *LazyBloomIter) next() bool {
 			}
 			// we've exhausted the current page, progress to next
 			it.curPageIndex++
-			it.curPage.Drop()
 			it.curPage = nil
 			continue
 		}

pkg/storage/bloom/v1/index.go

@@ -173,19 +173,14 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
 		return nil, errors.Wrap(err, "getting decompressor")
 	}
 
-	decompressed := BlockPool.Get(header.DecompressedLen)[:header.DecompressedLen]
+	decompressed := make([]byte, header.DecompressedLen)
 	if _, err = io.ReadFull(decompressor, decompressed); err != nil {
 		return nil, errors.Wrap(err, "decompressing series page")
 	}
 
 	// replace decoder's input with the now-decompressed data
 	dec.B = decompressed
 
 	res := &SeriesPageDecoder{
 		data:   decompressed,
 		header: header.SeriesHeader,
 		i:      -1,
 	}
 
 	res.Reset()
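The index.go change applies the same ownership rule to series pages: the decompressed buffer becomes the SeriesPageDecoder's backing data slice and escapes through every series it yields, so it is now heap-allocated with make rather than borrowed from BlockPool.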
