perf(blooms): always return bloom pages to allocator (#13288)

pull/13289/head^2
Owen Diehl 11 months ago committed by GitHub
parent 0ddf1e6113
commit 0cb3ff1830
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 30
      pkg/bloomgateway/processor.go
  2. 11
      pkg/storage/bloom/v1/block.go
  3. 2
      pkg/storage/bloom/v1/bloom.go
  4. 9
      pkg/storage/bloom/v1/bloom_querier.go
  5. 8
      pkg/storage/stores/shipper/bloomshipper/cache.go

@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/multierror"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
@ -113,13 +114,14 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
}
func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error {
// We opportunistically close blocks during iteration to allow returning memory to the pool, etc,
// as soon as possible, but since we exit early on error, we need to ensure we close all blocks.
hasClosed := make([]bool, len(bqs))
defer func() {
for i := range bqs {
if bqs[i] == nil {
continue
for i, bq := range bqs {
if !hasClosed[i] {
_ = bq.Close()
}
bqs[i].Close()
}
}()
@ -136,15 +138,21 @@ func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.Close
return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds)
}
err := p.processBlock(ctx, bq.BlockQuerier, block.tasks)
if err != nil {
return errors.Wrap(err, "processing block")
}
return nil
var errs multierror.MultiError
errs.Add(
errors.Wrap(
p.processBlock(ctx, bq, block.tasks),
"processing block",
),
)
errs.Add(bq.Close())
hasClosed[i] = true
return errs.Err()
})
}
func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerier, tasks []Task) error {
func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBlockQuerier, tasks []Task) (err error) {
blockQuerier := bq.BlockQuerier
schema, err := blockQuerier.Schema()
if err != nil {
return err

@ -136,6 +136,7 @@ func (bq *BlockQuerier) Schema() (Schema, error) {
}
func (bq *BlockQuerier) Reset() error {
bq.blooms.Reset()
return bq.LazySeriesIter.Seek(0)
}
@ -147,10 +148,6 @@ func (bq *BlockQuerier) Err() error {
return bq.blooms.Err()
}
func (bq *BlockQuerier) Close() {
bq.blooms.Close()
}
type BlockQuerierIter struct {
*BlockQuerier
}
@ -163,7 +160,11 @@ func (bq *BlockQuerier) Iter() *BlockQuerierIter {
}
func (b *BlockQuerierIter) Next() bool {
return b.LazySeriesIter.Next()
next := b.LazySeriesIter.Next()
if !next {
b.blooms.Reset()
}
return next
}
func (b *BlockQuerierIter) At() *SeriesWithBlooms {

@ -175,10 +175,12 @@ func (d *BloomPageDecoder) Relinquish(alloc mempool.Allocator) {
data := d.data
d.data = nil
d.Reset() // Reset for cleaning up residual references to data via `dec`
if cap(data) > 0 {
_ = alloc.Put(data)
}
}
func (d *BloomPageDecoder) Reset() {

@ -162,6 +162,11 @@ func (it *LazyBloomIter) Err() error {
}
}
func (it *LazyBloomIter) Close() {
it.curPage.Relinquish(it.alloc)
func (it *LazyBloomIter) Reset() {
it.err = nil
it.curPageIndex = 0
if it.curPage != nil {
it.curPage.Relinquish(it.alloc)
}
it.curPage = nil
}

@ -8,6 +8,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@ -23,11 +24,12 @@ type CloseableBlockQuerier struct {
}
func (c *CloseableBlockQuerier) Close() error {
c.BlockQuerier.Close()
var err multierror.MultiError
err.Add(c.BlockQuerier.Reset())
if c.close != nil {
return c.close()
err.Add(c.close())
}
return nil
return err.Err()
}
func (c *CloseableBlockQuerier) SeriesIter() (v1.PeekingIterator[*v1.SeriesWithBlooms], error) {

Loading…
Cancel
Save