Limit bloom block size (#11878)

**What this PR does / why we need it**:
This PR limits the size of the blocks created by the compactor.

The block builder keeps adding series' blooms to a block until the limit
is exceeded, meaning that blocks may grow beyond the configured maximum.
This is needed to avoid producing tiny blocks that had room for small
blooms but where a bigger bloom later didn't fit.

Blocks are built lazily: the generator returns an iterator that builds
one block at a time.

**Special notes for your reviewer**:
The maximum size is currently set to 50 MB. We will make this
configurable in a follow-up PR.

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)

---------

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
pull/11889/head
Salva Corts 1 year ago committed by GitHub
parent 57619b76a4
commit 681bb57971
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 20
      pkg/bloomcompactor/controller.go
  2. 113
      pkg/bloomcompactor/spec.go
  3. 62
      pkg/bloomcompactor/spec_test.go
  4. 4
      pkg/storage/bloom/v1/block.go
  5. 59
      pkg/storage/bloom/v1/builder.go
  6. 65
      pkg/storage/bloom/v1/builder_test.go
  7. 3
      pkg/storage/bloom/v1/fuse_test.go
  8. 2
      pkg/storage/bloom/v1/util.go

@ -8,6 +8,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
@ -134,10 +135,20 @@ func (s *SimpleBloomController) buildBlocks(
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return errors.Wrap(err, "failed to get series and blocks")
}
// Close all remaining blocks on exit
closePreExistingBlocks := func() {
var closeErrors multierror.MultiError
for _, block := range preExistingBlocks {
closeErrors.Add(block.Close())
}
if err := closeErrors.Err(); err != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}
gen := NewSimpleBloomGenerator(
tenant,
v1.DefaultBlockOptions,
v1.DefaultBlockOptions, // TODO(salvacorts) make block options configurable
seriesItr,
s.chunkLoader,
preExistingBlocks,
@ -150,13 +161,14 @@ func (s *SimpleBloomController) buildBlocks(
if err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
return errors.Wrap(err, "failed to generate bloom")
}
client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
closePreExistingBlocks()
return errors.Wrap(err, "failed to get client")
}
for newBlocks.Next() {
@ -168,6 +180,7 @@ func (s *SimpleBloomController) buildBlocks(
bloomshipper.BlockFrom(tenant, table.String(), blk),
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
closePreExistingBlocks()
return errors.Wrap(err, "failed to write block")
}
}
@ -175,9 +188,12 @@ func (s *SimpleBloomController) buildBlocks(
if err := newBlocks.Err(); err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
return errors.Wrap(err, "failed to generate bloom")
}
// Close pre-existing blocks
closePreExistingBlocks()
}
}

@ -11,8 +11,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/grafana/dskit/multierror"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
logql_log "github.com/grafana/loki/pkg/logql/log"
@ -110,20 +108,7 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
}
func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, results v1.Iterator[*v1.Block], err error) {
var closeErrors multierror.MultiError
blocksMatchingSchema := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(s.blocks))
toClose := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))
// Close all remaining blocks on exit
defer func() {
for _, block := range toClose {
closeErrors.Add(block.Close())
}
if err := closeErrors.Err(); err != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}()
blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))
for _, block := range s.blocks {
logger := log.With(s.logger, "block", block.BlockRef)
md, err := block.Metadata()
@ -131,46 +116,106 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1
if err != nil {
level.Warn(logger).Log("msg", "failed to get schema for block", "err", err)
skippedBlocks = append(skippedBlocks, md)
// Close unneeded block
closeErrors.Add(block.Close())
continue
}
if !s.opts.Schema.Compatible(schema) {
level.Warn(logger).Log("msg", "block schema incompatible with options", "generator_schema", fmt.Sprintf("%+v", s.opts.Schema), "block_schema", fmt.Sprintf("%+v", schema))
skippedBlocks = append(skippedBlocks, md)
// Close unneeded block
closeErrors.Add(block.Close())
continue
}
level.Debug(logger).Log("msg", "adding compatible block to bloom generation inputs")
itr := v1.NewPeekingIter[*v1.SeriesWithBloom](block)
blocksMatchingSchema = append(blocksMatchingSchema, itr)
// append needed block to close list (when finished)
toClose = append(toClose, block)
blocksMatchingSchema = append(blocksMatchingSchema, block)
}
level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "num_blocks", len(blocksMatchingSchema), "skipped_blocks", len(skippedBlocks), "schema", fmt.Sprintf("%+v", s.opts.Schema))
// TODO(owen-d): implement bounded block sizes
mergeBuilder := v1.NewMergeBuilder(blocksMatchingSchema, s.store, s.populator(ctx))
writer, reader := s.readWriterFn()
series := v1.NewPeekingIter(s.store)
blockIter := NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, blocksMatchingSchema)
return skippedBlocks, blockIter, nil
}
blockBuilder, err := v1.NewBlockBuilder(v1.NewBlockOptionsFromSchema(s.opts.Schema), writer)
if err != nil {
return skippedBlocks, nil, errors.Wrap(err, "failed to create bloom block builder")
// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
// each block by adding series to it until it is full.
type LazyBlockBuilderIterator struct {
	ctx          context.Context
	opts         v1.BlockOptions                          // opts.BlockSize caps the bytes written per built block (0 = unlimited)
	populate     func(*v1.Series, *v1.Bloom) error        // fills a bloom for a series; presumably fetches/tokenizes chunks — see SimpleBloomGenerator.populator
	readWriterFn func() (v1.BlockWriter, v1.BlockReader)  // yields a fresh writer/reader pair for each block built
	series       v1.PeekingIterator[*v1.Series]           // remaining series still to be packed into blocks
	blocks       []*bloomshipper.CloseableBlockQuerier    // pre-existing source blocks, reset and re-read on every Next
	blocksAsPeekingIter []v1.PeekingIterator[*v1.SeriesWithBloom] // per-Next peeking views over blocks, rebuilt each iteration

	curr *v1.Block // block produced by the most recent successful Next
	err  error     // first error encountered; surfaced via Err
}
// NewLazyBlockBuilderIterator returns an iterator that lazily merges the
// given pre-existing blocks with the series from the store, building one
// new block per call to Next.
func NewLazyBlockBuilderIterator(
	ctx context.Context,
	opts v1.BlockOptions,
	populate func(*v1.Series, *v1.Bloom) error,
	readWriterFn func() (v1.BlockWriter, v1.BlockReader),
	series v1.PeekingIterator[*v1.Series],
	blocks []*bloomshipper.CloseableBlockQuerier,
) *LazyBlockBuilderIterator {
	return &LazyBlockBuilderIterator{
		ctx:          ctx,
		opts:         opts,
		populate:     populate,
		readWriterFn: readWriterFn,
		series:       series,
		blocks:       blocks,
		// Pre-size the peeking views; they are (re)assigned on every Next.
		blocksAsPeekingIter: make([]v1.PeekingIterator[*v1.SeriesWithBloom], len(blocks)),
	}
}
// Next builds the next block from the remaining series, returning false
// when no series are left, the context is canceled, or an error occurs
// (check Err). On success the built block is available via At.
//
// NOTE: this span contained leftover pre-change diff lines (old
// `return skippedBlocks, ...` returns) interleaved with the new code;
// they have been removed so the method is valid Go again.
func (b *LazyBlockBuilderIterator) Next() bool {
	// No more series to process.
	if _, hasNext := b.series.Peek(); !hasNext {
		return false
	}

	// Rewind all source blocks to the start: each Next re-merges them
	// against the remaining series.
	for i, block := range b.blocks {
		if err := block.Reset(); err != nil {
			b.err = errors.Wrapf(err, "failed to reset block iterator %d", i)
			return false
		}
		b.blocksAsPeekingIter[i] = v1.NewPeekingIter[*v1.SeriesWithBloom](block)
	}

	if err := b.ctx.Err(); err != nil {
		b.err = errors.Wrap(err, "context canceled")
		return false
	}

	mergeBuilder := v1.NewMergeBuilder(b.blocksAsPeekingIter, b.series, b.populate)
	writer, reader := b.readWriterFn()
	blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
	if err != nil {
		b.err = errors.Wrap(err, "failed to create bloom block builder")
		return false
	}

	// Build stops early once the block is full; unconsumed series remain
	// in b.series for the next iteration.
	_, err = mergeBuilder.Build(blockBuilder)
	if err != nil {
		b.err = errors.Wrap(err, "failed to build bloom block")
		return false
	}

	b.curr = v1.NewBlock(reader)
	return true
}
// At returns the block produced by the most recent successful call to Next.
func (b *LazyBlockBuilderIterator) At() *v1.Block {
	return b.curr
}
// Err returns the first error encountered during iteration, if any.
func (b *LazyBlockBuilderIterator) Err() error {
	return b.err
}
// IndexLoader loads an index. This helps us do things like

@ -28,7 +28,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
numKeysPerSeries := 10000
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFP, throughFp, 0, 10000)
seriesPerBlock := 100 / n
seriesPerBlock := numSeries / n
for i := 0; i < n; i++ {
// references for linking in memory reader+writer
@ -88,24 +88,35 @@ func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks [
}
func TestSimpleBloomGenerator(t *testing.T) {
const maxBlockSize = 100 << 20 // 100MB
for _, tc := range []struct {
desc string
fromSchema, toSchema v1.BlockOptions
sourceBlocks, numSkipped int
desc string
fromSchema, toSchema v1.BlockOptions
sourceBlocks, numSkipped, outputBlocks int
}{
{
desc: "SkipsIncompatibleSchemas",
fromSchema: v1.NewBlockOptions(3, 0),
toSchema: v1.NewBlockOptions(4, 0),
fromSchema: v1.NewBlockOptions(3, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(4, 0, maxBlockSize),
sourceBlocks: 2,
numSkipped: 2,
outputBlocks: 1,
},
{
desc: "CombinesBlocks",
fromSchema: v1.NewBlockOptions(4, 0),
toSchema: v1.NewBlockOptions(4, 0),
fromSchema: v1.NewBlockOptions(4, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(4, 0, maxBlockSize),
sourceBlocks: 2,
numSkipped: 0,
outputBlocks: 1,
},
{
desc: "MaxBlockSize",
fromSchema: v1.NewBlockOptions(4, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(4, 0, 1<<10), // 1KB
sourceBlocks: 2,
numSkipped: 0,
outputBlocks: 3,
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -122,22 +133,25 @@ func TestSimpleBloomGenerator(t *testing.T) {
require.Nil(t, err)
require.Equal(t, tc.numSkipped, len(skipped))
require.True(t, results.Next())
block := results.At()
require.False(t, results.Next())
refs := v1.PointerSlice[v1.SeriesWithBloom](data)
v1.EqualIterators[*v1.SeriesWithBloom](
t,
func(a, b *v1.SeriesWithBloom) {
// TODO(owen-d): better equality check
// once chunk fetching is implemented
require.Equal(t, a.Series, b.Series)
},
v1.NewSliceIter[*v1.SeriesWithBloom](refs),
block.Querier(),
)
var outputBlocks []*v1.Block
for results.Next() {
outputBlocks = append(outputBlocks, results.At())
}
require.Equal(t, tc.outputBlocks, len(outputBlocks))
// Check all the input series are present in the output blocks.
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := block.Querier()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
}
require.Equal(t, len(expectedRefs), len(outputRefs))
for i := range expectedRefs {
require.Equal(t, expectedRefs[i].Series, outputRefs[i].Series)
}
})
}
}

@ -135,6 +135,10 @@ func (bq *BlockQuerier) Schema() (Schema, error) {
return bq.block.Schema()
}
// Reset rewinds the querier to the first series (fingerprint 0) so the
// block can be iterated again from the start.
func (bq *BlockQuerier) Reset() error {
	return bq.series.Seek(0)
}
// Seek positions the querier at the given fingerprint — presumably the
// first series with fingerprint >= fp; confirm against the underlying
// series iterator's Seek semantics.
func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
	return bq.series.Seek(fp)
}

@ -15,7 +15,7 @@ import (
)
var (
DefaultBlockOptions = NewBlockOptions(4, 0)
DefaultBlockOptions = NewBlockOptions(4, 0, 50<<20) // 50MB
)
type BlockOptions struct {
@ -65,16 +65,19 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) {
type BlockBuilder struct {
opts BlockOptions
writer BlockWriter
index *IndexBuilder
blooms *BloomBlockBuilder
}
func NewBlockOptions(NGramLength, NGramSkip uint64) BlockOptions {
return NewBlockOptionsFromSchema(Schema{
func NewBlockOptions(NGramLength, NGramSkip, MaxBlockSizeBytes uint64) BlockOptions {
opts := NewBlockOptionsFromSchema(Schema{
version: byte(1),
nGramLength: NGramLength,
nGramSkip: NGramSkip,
})
opts.BlockSize = MaxBlockSizeBytes
return opts
}
func NewBlockOptionsFromSchema(s Schema) BlockOptions {
@ -98,6 +101,7 @@ func NewBlockBuilder(opts BlockOptions, writer BlockWriter) (*BlockBuilder, erro
return &BlockBuilder{
opts: opts,
writer: writer,
index: NewIndexBuilder(opts, index),
blooms: NewBloomBlockBuilder(opts, blooms),
}, nil
@ -110,10 +114,13 @@ type SeriesWithBloom struct {
func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error) {
for itr.Next() {
if err := b.AddSeries(itr.At()); err != nil {
blockFull, err := b.AddSeries(itr.At())
if err != nil {
return 0, err
}
if blockFull {
break
}
}
if err := itr.Err(); err != nil {
@ -135,20 +142,40 @@ func (b *BlockBuilder) Close() (uint32, error) {
return combineChecksums(indexCheckSum, bloomChecksum), nil
}
func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error {
// AddSeries adds a series to the block. It returns true after adding the series, the block is full.
func (b *BlockBuilder) AddSeries(series SeriesWithBloom) (bool, error) {
offset, err := b.blooms.Append(series)
if err != nil {
return errors.Wrapf(err, "writing bloom for series %v", series.Series.Fingerprint)
return false, errors.Wrapf(err, "writing bloom for series %v", series.Series.Fingerprint)
}
if err := b.index.Append(SeriesWithOffset{
Offset: offset,
Series: *series.Series,
}); err != nil {
return errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint)
return false, errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint)
}
return nil
full, err := b.isBlockFull()
if err != nil {
return false, errors.Wrap(err, "checking if block is full")
}
return full, nil
}
// isBlockFull reports whether the bytes written so far have reached the
// configured maximum block size (opts.BlockSize).
func (b *BlockBuilder) isBlockFull() (bool, error) {
	// if the block size is 0, the max size is unlimited
	if b.opts.BlockSize == 0 {
		return false, nil
	}

	size, err := b.writer.Size()
	if err != nil {
		return false, errors.Wrap(err, "getting block size")
	}

	return uint64(size) >= b.opts.BlockSize, nil
}
type BloomBlockBuilder struct {
@ -505,7 +532,11 @@ type MergeBuilder struct {
// 1. merges multiple blocks into a single ordered querier,
// i) When two blocks have the same series, it will prefer the one with the most chunks already indexed
// 2. iterates through the store, adding chunks to the relevant blooms via the `populate` argument
func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder {
func NewMergeBuilder(
blocks []PeekingIterator[*SeriesWithBloom],
store Iterator[*Series],
populate func(*Series, *Bloom) error,
) *MergeBuilder {
return &MergeBuilder{
blocks: blocks,
store: store,
@ -513,8 +544,6 @@ func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[
}
}
// NB: this will build one block. Ideally we would build multiple blocks once a target size threshold is met
// but this gives us a good starting point.
func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
var (
nextInBlocks *SeriesWithBloom
@ -585,9 +614,13 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
}
}
if err := builder.AddSeries(*cur); err != nil {
blockFull, err := builder.AddSeries(*cur)
if err != nil {
return 0, errors.Wrap(err, "adding series to block")
}
if blockFull {
break
}
}
checksum, err := builder.Close()

@ -3,6 +3,7 @@ package v1
import (
"bytes"
"errors"
"sort"
"testing"
"github.com/prometheus/common/model"
@ -48,9 +49,11 @@ func TestBlockBuilderRoundTrip(t *testing.T) {
tmpDir := t.TempDir()
for _, tc := range []struct {
desc string
writer BlockWriter
reader BlockReader
desc string
writer BlockWriter
reader BlockReader
maxBlockSize uint64
iterHasPendingData bool
}{
{
desc: "in-memory",
@ -62,6 +65,14 @@ func TestBlockBuilderRoundTrip(t *testing.T) {
writer: NewDirectoryBlockWriter(tmpDir),
reader: NewDirectoryBlockReader(tmpDir),
},
{
desc: "max block size",
writer: NewDirectoryBlockWriter(tmpDir),
reader: NewDirectoryBlockReader(tmpDir),
// Set max block big enough to fit a bunch of series but not all of them
maxBlockSize: 50 << 10,
iterHasPendingData: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
schema := Schema{
@ -76,14 +87,27 @@ func TestBlockBuilderRoundTrip(t *testing.T) {
Schema: schema,
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
BlockSize: tc.maxBlockSize,
},
tc.writer,
)
require.Nil(t, err)
itr := NewSliceIter[SeriesWithBloom](data)
itr := NewPeekingIter[SeriesWithBloom](NewSliceIter[SeriesWithBloom](data))
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
firstPendingSeries, iterHasPendingData := itr.Peek()
require.Equal(t, tc.iterHasPendingData, iterHasPendingData)
processedData := data
if iterHasPendingData {
lastProcessedIdx := sort.Search(len(data), func(i int) bool {
return data[i].Series.Fingerprint >= firstPendingSeries.Series.Fingerprint
})
processedData = data[:lastProcessedIdx]
}
block := NewBlock(tc.reader)
querier := NewBlockQuerier(block)
@ -91,10 +115,11 @@ func TestBlockBuilderRoundTrip(t *testing.T) {
require.Nil(t, err)
require.Equal(t, block.blooms.schema, schema)
for i := 0; i < len(data); i++ {
// Check processed data can be queried
for i := 0; i < len(processedData); i++ {
require.Equal(t, true, querier.Next(), "on iteration %d with error %v", i, querier.Err())
got := querier.At()
require.Equal(t, data[i].Series, got.Series)
require.Equal(t, processedData[i].Series, got.Series)
for _, key := range keys[i] {
require.True(t, got.Bloom.Test(key))
}
@ -104,20 +129,22 @@ func TestBlockBuilderRoundTrip(t *testing.T) {
require.False(t, querier.Next())
// test seek
i := numSeries / 2
halfData := data[i:]
halfKeys := keys[i:]
require.Nil(t, querier.Seek(halfData[0].Series.Fingerprint))
for j := 0; j < len(halfData); j++ {
require.Equal(t, true, querier.Next(), "on iteration %d", j)
got := querier.At()
require.Equal(t, halfData[j].Series, got.Series)
for _, key := range halfKeys[j] {
require.True(t, got.Bloom.Test(key))
if !iterHasPendingData {
i := numSeries / 2
halfData := data[i:]
halfKeys := keys[i:]
require.NoError(t, querier.Seek(halfData[0].Series.Fingerprint))
for j := 0; j < len(halfData); j++ {
require.Equal(t, true, querier.Next(), "on iteration %d", j)
got := querier.At()
require.Equal(t, halfData[j].Series, got.Series)
for _, key := range halfKeys[j] {
require.True(t, got.Bloom.Test(key))
}
require.NoError(t, querier.Err())
}
require.NoError(t, querier.Err())
require.False(t, querier.Next())
}
require.False(t, querier.Next())
})
}
@ -357,7 +384,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
return nil
},
)
builder, err := NewBlockBuilder(NewBlockOptions(4, 0), writer)
builder, err := NewBlockBuilder(DefaultBlockOptions, writer)
require.Nil(t, err)
checksum, err := mb.Build(builder)

@ -36,7 +36,8 @@ func TestFusedQuerier(t *testing.T) {
require.Nil(t, err)
itr := NewSliceIter[SeriesWithBloom](data)
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
require.NoError(t, err)
require.False(t, itr.Next())
block := NewBlock(reader)
querier := NewBlockQuerier(block)

@ -258,7 +258,7 @@ type PeekCloseIter[T any] struct {
}
// NewPeekCloseIter wraps a CloseableIterator in a PeekCloseIter, adding
// peeking support while preserving the underlying iterator's Close.
//
// NOTE: this span contained both the pre-change and post-change return
// lines from the diff rendering; only the current one (with the explicit
// type argument) is kept.
func NewPeekCloseIter[T any](itr CloseableIterator[T]) *PeekCloseIter[T] {
	return &PeekCloseIter[T]{PeekIter: NewPeekingIter[T](itr), close: itr.Close}
}
func (it *PeekCloseIter[T]) Close() error {

Loading…
Cancel
Save