feat(blooms): Separate page buffer pools for series pages and bloom pages (#12992)

Series pages are much smaller than bloom pages, so they benefit from a separate buffer pool with smaller buckets: the shared 4KB–128MB BlockPool is split into a 1KB–128KB SeriesPagePool and a 128KB–128MB BloomPagePool.
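Every call site in the diff below follows the same borrow/re-slice/return pattern against a BytePool. As a rough sketch of what that wrapper looks like around the prometheus util/pool bucketed pool (the shape is inferred from the call sites in this diff, not copied from util.go):

package v1

import "github.com/prometheus/prometheus/util/pool"

// BytePool wraps a bucketed pool.Pool and hands out []byte buffers.
// Sketch only: inferred from the call sites in this diff.
type BytePool struct {
	pool *pool.Pool
}

// Get returns an empty slice backed by a buffer whose capacity is at
// least size bytes; callers re-slice to the length they need: Get(n)[:n].
func (p *BytePool) Get(size int) []byte {
	return p.pool.Get(size).([]byte)[:0]
}

// Put hands the buffer back to its size bucket for reuse.
func (p *BytePool) Put(b []byte) {
	p.pool.Put(b)
}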

The second commit fixes a possible panic: the bytesSkipped metric in index.go is declared with two labels, but was incremented with only a single label value.
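For context, the panic is prometheus/client_golang behavior: (*prometheus.CounterVec).WithLabelValues panics with an "inconsistent label cardinality" error when the number of label values differs from the labels the vector was declared with. A minimal reproduction, with an illustrative metric name rather than the real one:

package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	// Declared with two labels: type and reason.
	bytesSkipped := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "bytes_skipped_total", // illustrative, not the real metric name
	}, []string{"type", "reason"})

	// Passing a single label value panics at runtime with
	// "inconsistent label cardinality". The fix in index.go below
	// adds the missing skipReasonErr value.
	bytesSkipped.WithLabelValues("series").Add(1)
}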


Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Changed files:
1. pkg/storage/bloom/v1/bloom.go (14 lines changed)
2. pkg/storage/bloom/v1/index.go (6 lines changed)
3. pkg/storage/bloom/v1/util.go (17 lines changed)

pkg/storage/bloom/v1/bloom.go

@@ -24,7 +24,7 @@ type Bloom struct {
 func (b *Bloom) Encode(enc *encoding.Encbuf) error {
 	// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
-	buf := bytes.NewBuffer(BlockPool.Get(int(b.Capacity() / 8)))
+	buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8)))
 	// TODO(owen-d): have encoder implement writer directly so we don't need
 	// to indirect via a buffer
@@ -36,7 +36,7 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
 	data := buf.Bytes()
 	enc.PutUvarint(len(data)) // length of bloom filter
 	enc.PutBytes(data)
-	BlockPool.Put(data[:0]) // release to pool
+	BloomPagePool.Put(data[:0]) // release to pool
 	return nil
 }
@@ -65,8 +65,8 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
 }

 func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
-	data := BlockPool.Get(page.Len)[:page.Len]
-	defer BlockPool.Put(data)
+	data := BloomPagePool.Get(page.Len)[:page.Len]
+	defer BloomPagePool.Put(data)
 	_, err := io.ReadFull(r, data)
 	if err != nil {
@@ -84,7 +84,7 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe
 	}
 	defer pool.PutReader(decompressor)
-	b := BlockPool.Get(page.DecompressedLen)[:page.DecompressedLen]
+	b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen]
 	if _, err = io.ReadFull(decompressor, b); err != nil {
 		return nil, errors.Wrap(err, "decompressing bloom page")
@@ -101,7 +101,7 @@ func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*Bloom
 	if page.Len != page.DecompressedLen+4 {
 		return nil, errors.New("the Len and DecompressedLen of the page do not match")
 	}
-	data := BlockPool.Get(page.Len)[:page.Len]
+	data := BloomPagePool.Get(page.Len)[:page.Len]
 	_, err := io.ReadFull(r, data)
 	if err != nil {
@@ -163,7 +163,7 @@ func (d *BloomPageDecoder) Relinquish() {
 	d.data = nil
 	if cap(data) > 0 {
-		BlockPool.Put(data)
+		BloomPagePool.Put(data)
 	}
 }

pkg/storage/bloom/v1/index.go

@@ -155,7 +155,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
 	defer func() {
 		if err != nil {
 			metrics.pagesSkipped.WithLabelValues(pageTypeSeries, skipReasonErr).Inc()
-			metrics.bytesSkipped.WithLabelValues(pageTypeSeries).Add(float64(header.DecompressedLen))
+			metrics.bytesSkipped.WithLabelValues(pageTypeSeries, skipReasonErr).Add(float64(header.DecompressedLen))
 		} else {
 			metrics.pagesRead.WithLabelValues(pageTypeSeries).Inc()
 			metrics.bytesRead.WithLabelValues(pageTypeSeries).Add(float64(header.DecompressedLen))
@@ -166,8 +166,8 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
 		return nil, errors.Wrap(err, "seeking to series page")
 	}
-	data := BlockPool.Get(header.Len)[:header.Len]
-	defer BlockPool.Put(data)
+	data := SeriesPagePool.Get(header.Len)[:header.Len]
+	defer SeriesPagePool.Put(data)
 	_, err = io.ReadFull(r, data)
 	if err != nil {
 		return nil, errors.Wrap(err, "reading series page")

pkg/storage/bloom/v1/util.go

@@ -32,10 +32,21 @@ var (
 		},
 	}
-	// 4KB -> 128MB
-	BlockPool = BytePool{
+	// buffer pool for series pages
+	// 1KB 2KB 4KB 8KB 16KB 32KB 64KB 128KB
+	SeriesPagePool = BytePool{
 		pool: pool.New(
-			4<<10, 128<<20, 2,
+			1<<10, 128<<10, 2,
 			func(size int) interface{} {
 				return make([]byte, size)
 			}),
 	}
+
+	// buffer pool for bloom pages
+	// 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB
+	BloomPagePool = BytePool{
+		pool: pool.New(
+			128<<10, 128<<20, 2,
+			func(size int) interface{} {
+				return make([]byte, size)
+			}),
+	}
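The size comments on the two pools follow from pool.New(minSize, maxSize, factor, makeFunc), which (assuming the prometheus util/pool semantics) keeps one bucket per power of factor from minSize through maxSize. A small sketch that prints both ranges:

package main

import "fmt"

// bucketSizes lists the capacities a pool created with
// pool.New(minSize, maxSize, factor, ...) would maintain.
func bucketSizes(minSize, maxSize, factor int) []int {
	var sizes []int
	for s := minSize; s <= maxSize; s *= factor {
		sizes = append(sizes, s)
	}
	return sizes
}

func main() {
	fmt.Println(bucketSizes(1<<10, 128<<10, 2))   // series pages: 1KB ... 128KB (8 buckets)
	fmt.Println(bucketSizes(128<<10, 128<<20, 2)) // bloom pages: 128KB ... 128MB (11 buckets)
}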
