Pull/11950 amendments (#11972)

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
pull/11973/head
Owen Diehl 1 year ago committed by GitHub
parent 543aaab055
commit 87ae2efe41
  1. pkg/bloomcompactor/batch.go (367 changed lines)
  2. pkg/bloomcompactor/batch_test.go (211 changed lines)
  3. pkg/bloomcompactor/controller.go (44 changed lines)
  4. pkg/bloomcompactor/spec.go (239 changed lines)
  5. pkg/bloomcompactor/spec_test.go (177 changed lines)
  6. pkg/storage/bloom/v1/builder.go (23 changed lines)
  7. pkg/storage/bloom/v1/builder_test.go (21 changed lines)
  8. pkg/storage/bloom/v1/util.go (32 changed lines)
  9. pkg/storage/stores/shipper/bloomshipper/cache.go (7 changed lines)

@@ -2,94 +2,359 @@ package bloomcompactor
import (
"context"
"io"
"math"
"time"
"github.com/grafana/dskit/multierror"
"golang.org/x/exp/slices"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
logql_log "github.com/grafana/loki/pkg/logql/log"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
type Fetcher[A, B any] interface {
Fetch(ctx context.Context, inputs []A) ([]B, error)
}

type FetchFunc[A, B any] func(ctx context.Context, inputs []A) ([]B, error)

func (f FetchFunc[A, B]) Fetch(ctx context.Context, inputs []A) ([]B, error) {
return f(ctx, inputs)
}
// batchedLoader implements `v1.Iterator[C]` in batches
type batchedLoader[A, B, C any] struct {
metrics *Metrics
batchSize int
ctx context.Context
fetchers []Fetcher[A, B]
work [][]A

mapper func(B) (C, error)
cur C
batch []B
err error
}
const batchedLoaderDefaultBatchSize = 50
func newBatchedLoader[A, B, C any](
ctx context.Context,
fetchers []Fetcher[A, B],
inputs [][]A,
mapper func(B) (C, error),
batchSize int,
) *batchedLoader[A, B, C] {
return &batchedLoader[A, B, C]{
batchSize: max(batchSize, 1),
ctx: ctx,
fetchers: fetchers,
work: inputs,
mapper: mapper,
}
}
func (b *batchedLoader[A, B, C]) Next() bool {
// iterate work until we have non-zero length batch
for len(b.batch) == 0 {
// empty batch + no work remaining = we're done
if len(b.work) == 0 {
return false
}
// setup next batch
next := b.work[0]
batchSize := min(b.batchSize, len(next))
toFetch := next[:batchSize]
fetcher := b.fetchers[0]
// update work
b.work[0] = b.work[0][batchSize:]
if len(b.work[0]) == 0 {
// if we've exhausted work from this set of inputs,
// set pointer to next set of inputs
// and their respective fetcher
b.work = b.work[1:]
b.fetchers = b.fetchers[1:]
}
// there was no work in this batch; continue (should not happen)
if len(toFetch) == 0 {
continue
}
b.batch, b.err = fetcher.Fetch(b.ctx, toFetch)
// error fetching, short-circuit iteration
if b.err != nil {
return false
}
}
return b.prepNext()
}
func (b *batchedLoader[_, B, C]) prepNext() bool {
b.cur, b.err = b.mapper(b.batch[0])
b.batch = b.batch[1:]
return b.err == nil
}
func (b *batchedLoader[_, _, C]) At() C {
return b.cur
}
func (b *batchedLoader[_, _, _]) Err() error {
return b.err
}
// to ensure memory is bounded while loading chunks
// TODO(owen-d): testware
func newBatchedChunkLoader(
ctx context.Context,
fetchers []Fetcher[chunk.Chunk, chunk.Chunk],
inputs [][]chunk.Chunk,
metrics *Metrics,
batchSize int,
) *batchedLoader[chunk.Chunk, chunk.Chunk, v1.ChunkRefWithIter] {
mapper := func(c chunk.Chunk) (v1.ChunkRefWithIter, error) {
chk := c.Data.(*chunkenc.Facade).LokiChunk()
metrics.chunkSize.Observe(float64(chk.UncompressedSize()))
itr, err := chk.Iterator(
ctx,
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
logql_log.NewNoopPipeline().ForStream(c.Metric),
)
if err != nil {
return v1.ChunkRefWithIter{}, err
}
return v1.ChunkRefWithIter{
Ref: v1.ChunkRef{
Start: c.From,
End: c.Through,
Checksum: c.Checksum,
},
Itr: itr,
}, nil
}
return newBatchedLoader(ctx, fetchers, inputs, mapper, batchSize)
}

func newBatchedBlockLoader(
ctx context.Context,
fetcher Fetcher[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier],
blocks []bloomshipper.BlockRef,
batchSize int,
) *batchedLoader[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier, *bloomshipper.CloseableBlockQuerier] {
fetchers := []Fetcher[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier]{fetcher}
inputs := [][]bloomshipper.BlockRef{blocks}
mapper := func(a *bloomshipper.CloseableBlockQuerier) (*bloomshipper.CloseableBlockQuerier, error) {
return a, nil
}
return newBatchedLoader(ctx, fetchers, inputs, mapper, batchSize)
}
// compiler checks
var _ v1.Iterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
var _ v1.CloseableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
var _ v1.ResettableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
// TODO(chaudum): testware
func newBlockLoadingIter(ctx context.Context, blocks []bloomshipper.BlockRef, fetcher FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier], batchSize int) *blockLoadingIter {
return &blockLoadingIter{
ctx: ctx,
fetcher: fetcher,
inputs: blocks,
batchSize: batchSize,
loaded: make(map[io.Closer]struct{}),
}
}
type blockLoadingIter struct {
// constructor arguments
ctx context.Context
fetcher Fetcher[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier]
inputs []bloomshipper.BlockRef
overlapping v1.Iterator[[]bloomshipper.BlockRef]
batchSize int
// optional arguments
filter func(*bloomshipper.CloseableBlockQuerier) bool
// internals
initialized bool
err error
iter v1.Iterator[*v1.SeriesWithBloom]
loader *batchedLoader[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier, *bloomshipper.CloseableBlockQuerier]
loaded map[io.Closer]struct{}
}
// At implements v1.Iterator.
func (i *blockLoadingIter) At() *v1.SeriesWithBloom {
if !i.initialized {
panic("iterator not initialized")
}
return i.iter.At()
}
// Err implements v1.Iterator.
func (i *blockLoadingIter) Err() error {
if !i.initialized {
panic("iterator not initialized")
}
if i.err != nil {
return i.err
}
return i.iter.Err()
}
// Next implements v1.Iterator.
func (i *blockLoadingIter) Next() bool {
i.init()
// next from current batch
hasNext := i.iter.Next()
if !hasNext && !i.loadNext() {
return false
}
// next from next batch
return i.iter.Next()
}
// Close implements v1.CloseableIterator.
func (i *blockLoadingIter) Close() error {
var err multierror.MultiError
for k := range i.loaded {
err.Add(k.Close())
}
return err.Err()
}
// Reset implements v1.ResettableIterator.
// TODO(chaudum): Cache already fetched blocks to avoid the overhead of
// creating the reader.
func (i *blockLoadingIter) Reset() error {
if !i.initialized {
return nil
}
// close loaded queriers
err := i.Close()
i.initialized = false
clear(i.loaded)
return err
}
func (i *blockLoadingIter) init() {
if i.initialized {
return
}
// group overlapping blocks
i.overlapping = overlappingBlocksIter(i.inputs)
// set "match all" filter function if not present
if i.filter == nil {
i.filter = func(cbq *bloomshipper.CloseableBlockQuerier) bool { return true }
}
// load first batch
i.loadNext()
// done
i.initialized = true
}
func (i *blockLoadingIter) Filter(filter func(*bloomshipper.CloseableBlockQuerier) bool) {
if i.initialized {
panic("iterator already initialized")
}
i.filter = filter
}
func (i *blockLoadingIter) loadNext() bool {
// check if there are more overlapping groups to load
if !i.overlapping.Next() {
return false
}
if i.overlapping.Err() != nil {
i.err = i.overlapping.Err()
return false
}
blockRefs := i.overlapping.At()

loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)

iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
for filtered.Next() && filtered.Err() == nil {
bq := loader.At()
if _, ok := i.loaded[bq]; !ok {
i.loaded[bq] = struct{}{}
}
iter, _ := bq.SeriesIter()
iters = append(iters, iter)
}

if loader.Err() != nil {
i.err = loader.Err()
return false
}
if len(iters) == 0 {
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
return true
}
// Turn the list of blocks into a single iterator that returns the next series
mergedBlocks := v1.NewHeapIterForSeriesWithBloom(iters...)
// two overlapping blocks can conceivably have the same series, so we need to dedupe,
// preferring the one with the most chunks already indexed since we'll have
// to add fewer chunks to the bloom
i.iter = v1.NewDedupingIter[*v1.SeriesWithBloom, *v1.SeriesWithBloom](
func(a, b *v1.SeriesWithBloom) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
v1.Identity[*v1.SeriesWithBloom],
func(a, b *v1.SeriesWithBloom) *v1.SeriesWithBloom {
if len(a.Series.Chunks) > len(b.Series.Chunks) {
return a
}
return b
},
v1.NewPeekingIter(mergedBlocks),
)
return true
}
func overlappingBlocksIter(inputs []bloomshipper.BlockRef) v1.Iterator[[]bloomshipper.BlockRef] {
// can we assume sorted blocks?
peekIter := v1.NewPeekingIter(v1.NewSliceIter(inputs))
return v1.NewDedupingIter[bloomshipper.BlockRef, []bloomshipper.BlockRef](
func(a bloomshipper.BlockRef, b []bloomshipper.BlockRef) bool {
minFp := b[0].Bounds.Min
maxFp := slices.MaxFunc(b, func(a, b bloomshipper.BlockRef) int { return int(a.Bounds.Max - b.Bounds.Max) }).Bounds.Max
return a.Bounds.Overlaps(v1.NewBounds(minFp, maxFp))
},
func(a bloomshipper.BlockRef) []bloomshipper.BlockRef {
return []bloomshipper.BlockRef{a}
},
func(a bloomshipper.BlockRef, b []bloomshipper.BlockRef) []bloomshipper.BlockRef {
return append(b, a)
},
peekIter,
)
}
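For orientation, here is a minimal sketch of how `Fetcher`, `FetchFunc`, and `newBatchedLoader` compose. It is illustrative only and assumes it sits next to this code in package `bloomcompactor`; `exampleLoader` is not part of the change.

func exampleLoader(ctx context.Context) ([]int, error) {
    // FetchFunc adapts a plain function to the Fetcher interface.
    echo := FetchFunc[int, int](func(_ context.Context, xs []int) ([]int, error) {
        return xs, nil
    })

    // Two input sets paired with one fetcher each; items are fetched in
    // batches of 2 and passed through an identity mapper.
    loader := newBatchedLoader[int, int, int](
        ctx,
        []Fetcher[int, int]{echo, echo},
        [][]int{{1, 2, 3}, {4}},
        func(x int) (int, error) { return x, nil },
        2,
    )

    // v1.Collect drains the iterator: returns [1 2 3 4].
    return v1.Collect[int](loader)
}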

@@ -2,36 +2,209 @@ package bloomcompactor
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
func TestBatchedLoader(t *testing.T) {
t.Parallel()

errMapper := func(i int) (int, error) {
return 0, errors.New("bzzt")
}
successMapper := func(i int) (int, error) {
return i, nil
}

expired, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range []struct {
desc string
ctx context.Context
batchSize int
mapper func(int) (int, error)
err bool
inputs [][]int
exp []int
}{
{
desc: "OneBatch",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1}},
exp: []int{0, 1},
},
{
desc: "ZeroBatchSizeStillWorks",
ctx: context.Background(),
batchSize: 0,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1}},
exp: []int{0, 1},
},
{
desc: "OneBatchLessThanFull",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0}},
exp: []int{0},
},
{
desc: "TwoBatches",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1, 2, 3}},
exp: []int{0, 1, 2, 3},
},
{
desc: "MultipleBatchesMultipleLoaders",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1}, {2}, {3, 4, 5}},
exp: []int{0, 1, 2, 3, 4, 5},
},
{
desc: "HandlesEmptyInputs",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1, 2, 3}, nil, {4}},
exp: []int{0, 1, 2, 3, 4},
},
{
desc: "Timeout",
ctx: expired,
batchSize: 2,
mapper: successMapper,
err: true,
inputs: [][]int{{0}},
},
{
desc: "MappingFailure",
ctx: context.Background(),
batchSize: 2,
mapper: errMapper,
err: true,
inputs: [][]int{{0}},
},
} {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
fetchers := make([]Fetcher[int, int], 0, len(tc.inputs))
for range tc.inputs {
fetchers = append(
fetchers,
FetchFunc[int, int](func(ctx context.Context, xs []int) ([]int, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return xs, nil
}),
)
}
loader := newBatchedLoader[int, int, int](
tc.ctx,
fetchers,
tc.inputs,
tc.mapper,
tc.batchSize,
)
got, err := v1.Collect[int](loader)
if tc.err {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.exp, got)
})
}
}
func TestOverlappingBlocksIter(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
desc string
inp []bloomshipper.BlockRef
exp int // expected groups
}{
{
desc: "Empty",
inp: []bloomshipper.BlockRef{},
exp: 0,
},
{
desc: "NonOverlapping",
inp: []bloomshipper.BlockRef{
genBlockRef(0x0000, 0x00ff),
genBlockRef(0x0100, 0x01ff),
genBlockRef(0x0200, 0x02ff),
},
exp: 3,
},
{
desc: "AllOverlapping",
inp: []bloomshipper.BlockRef{
genBlockRef(0x0000, 0x02ff), // |-----------|
genBlockRef(0x0100, 0x01ff), // |---|
genBlockRef(0x0200, 0x02ff), // |---|
},
exp: 1,
},
{
desc: "PartialOverlapping",
inp: []bloomshipper.BlockRef{
genBlockRef(0x0000, 0x01ff), // group 1 |-------|
genBlockRef(0x0100, 0x02ff), // group 1 |-------|
genBlockRef(0x0200, 0x03ff), // group 1 |-------|
genBlockRef(0x0200, 0x02ff), // group 1 |---|
},
exp: 1,
},
{
desc: "PartialOverlapping",
inp: []bloomshipper.BlockRef{
genBlockRef(0x0000, 0x01ff), // group 1 |-------|
genBlockRef(0x0100, 0x02ff), // group 1 |-------|
genBlockRef(0x0100, 0x01ff), // group 1 |---|
genBlockRef(0x0300, 0x03ff), // group 2 |---|
genBlockRef(0x0310, 0x03ff), // group 2 |-|
},
exp: 2,
},
} {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
it := overlappingBlocksIter(tc.inp)
var overlapping [][]bloomshipper.BlockRef
var i int
for it.Next() && it.Err() == nil {
require.NotNil(t, it.At())
overlapping = append(overlapping, it.At())
for _, r := range it.At() {
t.Log(i, r)
}
i++
}
require.Equal(t, tc.exp, len(overlapping))
})
}
}
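A sketch of the grouping behavior these cases assert, reusing the package's `genBlockRef` test helper and assuming an `fmt` import; `exampleOverlap` is illustrative only:

func exampleOverlap() {
    refs := []bloomshipper.BlockRef{
        genBlockRef(0x0000, 0x00ff),
        genBlockRef(0x0080, 0x017f), // overlaps the first ref: same group
        genBlockRef(0x0200, 0x02ff), // disjoint: starts a new group
    }
    it := overlappingBlocksIter(refs)
    for it.Next() {
        fmt.Println(len(it.At())) // prints 2, then 1
    }
}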

@@ -4,12 +4,10 @@ import (
"bytes"
"context"
"fmt"
"io"
"sort"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
@@ -221,7 +219,7 @@ func (s *SimpleBloomController) loadWorkForGap(
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
) (v1.CloseableIterator[*v1.Series], v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier], error) {
) (v1.CloseableIterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBloom], error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds)
if err != nil {
@@ -234,10 +232,8 @@ func (s *SimpleBloomController) loadWorkForGap(
return nil, nil, errors.Wrap(err, "failed to get fetcher")
}
blocksIter, err := newBatchedBlockLoader(ctx, fetcher, gap.blocks)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load blocks")
}
f := FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier](fetcher.FetchBlocks)
blocksIter := newBlockLoadingIter(ctx, gap.blocks, f, 10)
return seriesItr, blocksIter, nil
}
@@ -311,10 +307,10 @@ func (s *SimpleBloomController) buildGaps(
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap),
)
_, loaded, newBlocks, err := gen.Generate(ctx)
newBlocks := gen.Generate(ctx)
if err != nil {
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
s.closeLoadedBlocks(loaded, blocksIter)
blocksIter.Close()
return nil, errors.Wrap(err, "failed to generate bloom")
}
@@ -325,6 +321,7 @@ func (s *SimpleBloomController) buildGaps(
built, err := bloomshipper.BlockFrom(tenant, table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
blocksIter.Close()
return nil, errors.Wrap(err, "failed to build block")
}
@@ -333,7 +330,7 @@ func (s *SimpleBloomController) buildGaps(
built,
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
s.closeLoadedBlocks(loaded, blocksIter)
blocksIter.Close()
return nil, errors.Wrap(err, "failed to write block")
}
@@ -342,12 +339,11 @@ func (s *SimpleBloomController) buildGaps(
if err := newBlocks.Err(); err != nil {
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
s.closeLoadedBlocks(loaded, blocksIter)
return nil, errors.Wrap(err, "failed to generate bloom")
}
// Close pre-existing blocks
s.closeLoadedBlocks(loaded, blocksIter)
blocksIter.Close()
// Write the new meta
ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks)
@@ -485,30 +481,6 @@ func tsdbsStrictlyNewer(as, bs []tsdb.SingleTenantTSDBIdentifier) bool {
return true
}
func (s *SimpleBloomController) closeLoadedBlocks(toClose []io.Closer, it v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]) {
// close loaded blocks
var err multierror.MultiError
for _, closer := range toClose {
err.Add(closer.Close())
}
switch itr := it.(type) {
case *batchedBlockLoader:
// close remaining loaded blocks from batch
err.Add(itr.CloseBatch())
default:
// close remaining loaded blocks
for itr.Next() && itr.Err() == nil {
err.Add(itr.At().Close())
}
}
// log error
if err.Err() != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}
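The removal above works because the block-loading iterator now owns every querier it fetches; one `Close` on the iterator replaces the per-block bookkeeping. A simplified, hypothetical sketch of the resulting call-site pattern (`exampleCleanup` and `fetchFn` are not part of the change):

func exampleCleanup(ctx context.Context, gap gapWithBlocks, fetchFn FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier]) error {
    blocksIter := newBlockLoadingIter(ctx, gap.blocks, fetchFn, 10)
    // Close releases every querier loaded so far, so early returns on
    // error paths need no extra cleanup logic.
    defer blocksIter.Close()

    for blocksIter.Next() {
        _ = blocksIter.At() // merged, deduped *v1.SeriesWithBloom
    }
    return blocksIter.Err()
}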
type gapWithBlocks struct {
bounds v1.FingerprintBounds
blocks []bloomshipper.BlockRef

@@ -4,17 +4,13 @@ import (
"context"
"fmt"
"io"
"math"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
logql_log "github.com/grafana/loki/pkg/logql/log"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
@@ -48,7 +44,8 @@ type SimpleBloomGenerator struct {
userID string
store v1.Iterator[*v1.Series]
chunkLoader ChunkLoader
blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]
blocksIter v1.ResettableIterator[*v1.SeriesWithBloom]
skipped []v1.BlockMetadata
// options to build blocks with
opts v1.BlockOptions
@@ -70,7 +67,7 @@ func NewSimpleBloomGenerator(
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier],
blocksIter v1.ResettableIterator[*v1.SeriesWithBloom],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
@@ -107,44 +104,41 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
}
func (s *SimpleBloomGenerator) Generate(ctx context.Context) ([]v1.BlockMetadata, []io.Closer, v1.Iterator[*v1.Block], error) {
skippedBlocks := make([]v1.BlockMetadata, 0)
toClose := make([]io.Closer, 0)
blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0)
func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Block] {
level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "schema", fmt.Sprintf("%+v", s.opts.Schema))
for s.blocksIter.Next() && s.blocksIter.Err() == nil {
block := s.blocksIter.At()
toClose = append(toClose, block)
logger := log.With(s.logger, "block", block.BlockRef)
md, err := block.Metadata()
schema := md.Options.Schema
if err != nil {
level.Warn(logger).Log("msg", "failed to get schema for block", "err", err)
skippedBlocks = append(skippedBlocks, md)
continue
}
if !s.opts.Schema.Compatible(schema) {
level.Warn(logger).Log("msg", "block schema incompatible with options", "generator_schema", fmt.Sprintf("%+v", s.opts.Schema), "block_schema", fmt.Sprintf("%+v", schema))
skippedBlocks = append(skippedBlocks, md)
continue
}
level.Debug(logger).Log("msg", "adding compatible block to bloom generation inputs")
blocksMatchingSchema = append(blocksMatchingSchema, block)
}
series := v1.NewPeekingIter(s.store)
if s.blocksIter.Err() != nil {
// should we ignore the error and continue with the blocks we got?
return skippedBlocks, toClose, v1.NewSliceIter([]*v1.Block{}), s.blocksIter.Err()
// TODO: Use interface
impl, ok := s.blocksIter.(*blockLoadingIter)
if ok {
impl.Filter(
func(bq *bloomshipper.CloseableBlockQuerier) bool {
logger := log.With(s.logger, "block", bq.BlockRef)
md, err := bq.Metadata()
schema := md.Options.Schema
if err != nil {
level.Warn(logger).Log("msg", "failed to get schema for block", "err", err)
s.skipped = append(s.skipped, md)
bq.Close() // close unused querier
return false
}
if !s.opts.Schema.Compatible(schema) {
level.Warn(logger).Log("msg", "block schema incompatible with options", "generator_schema", fmt.Sprintf("%+v", s.opts.Schema), "block_schema", fmt.Sprintf("%+v", schema))
s.skipped = append(s.skipped, md)
bq.Close() // close unused querier
return false
}
level.Debug(logger).Log("msg", "adding compatible block to bloom generation inputs")
return true
},
)
}
series := v1.NewPeekingIter(s.store)
return NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
}
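Note the changed contract: `Generate` no longer returns `(skipped, closers, iterator, err)`; errors surface through the returned iterator and skipped blocks are recorded on the generator. A minimal sketch of the new calling pattern (`exampleGenerate` is illustrative only):

func exampleGenerate(ctx context.Context, gen *SimpleBloomGenerator) ([]*v1.Block, error) {
    results := gen.Generate(ctx)

    var out []*v1.Block
    for results.Next() {
        out = append(out, results.At())
    }
    if err := results.Err(); err != nil {
        return nil, err // iteration stops at the first failure
    }
    // schema-incompatible inputs are tracked in gen.skipped rather than
    // returned alongside the iterator
    return out, nil
}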
// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
@@ -155,11 +149,10 @@ type LazyBlockBuilderIterator struct {
populate func(*v1.Series, *v1.Bloom) error
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
blocks v1.ResettableIterator[*v1.SeriesWithBloom]

curr *v1.Block
err error
}
func NewLazyBlockBuilderIterator(
@@ -168,20 +161,16 @@ func NewLazyBlockBuilderIterator(
populate func(*v1.Series, *v1.Bloom) error,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
blocks v1.ResettableIterator[*v1.SeriesWithBloom],
) *LazyBlockBuilderIterator {
return &LazyBlockBuilderIterator{
ctx: ctx,
opts: opts,
populate: populate,
readWriterFn: readWriterFn,
series: series,
blocks: blocks,
}
}
func (b *LazyBlockBuilderIterator) Next() bool {
@@ -190,21 +179,17 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}
if err := b.ctx.Err(); err != nil {
b.err = errors.Wrap(err, "context canceled")
return false
}

if err := b.blocks.Reset(); err != nil {
b.err = errors.Wrap(err, "reset blocks iterator")
return false
}

mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate)
writer, reader := b.readWriterFn()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
@@ -292,137 +277,3 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
itr: newBatchedChunkLoader(ctx, fetchers, inputs, s.metrics, batchedLoaderDefaultBatchSize),
}, nil
}
type Fetcher[A, B any] interface {
Fetch(ctx context.Context, inputs []A) ([]B, error)
}
type FetchFunc[A, B any] func(ctx context.Context, inputs []A) ([]B, error)
func (f FetchFunc[A, B]) Fetch(ctx context.Context, inputs []A) ([]B, error) {
return f(ctx, inputs)
}
// batchedLoader implements `v1.Iterator[v1.ChunkRefWithIter]` in batches
// to ensure memory is bounded while loading chunks
// TODO(owen-d): testware
type batchedLoader[A, B, C any] struct {
metrics *Metrics
batchSize int
ctx context.Context
fetchers []Fetcher[A, B]
work [][]A
mapper func(B) (C, error)
cur C
batch []B
err error
}
const batchedLoaderDefaultBatchSize = 50
func newBatchedLoader[A, B, C any](
ctx context.Context,
fetchers []Fetcher[A, B],
inputs [][]A,
mapper func(B) (C, error),
batchSize int,
) *batchedLoader[A, B, C] {
return &batchedLoader[A, B, C]{
batchSize: max(batchSize, 1),
ctx: ctx,
fetchers: fetchers,
work: inputs,
mapper: mapper,
}
}
func (b *batchedLoader[A, B, C]) Next() bool {
// iterate work until we have non-zero length batch
for len(b.batch) == 0 {
// empty batch + no work remaining = we're done
if len(b.work) == 0 {
return false
}
// setup next batch
next := b.work[0]
batchSize := min(b.batchSize, len(next))
toFetch := next[:batchSize]
fetcher := b.fetchers[0]
// update work
b.work[0] = b.work[0][batchSize:]
if len(b.work[0]) == 0 {
// if we've exhausted work from this set of inputs,
// set pointer to next set of inputs
// and their respective fetcher
b.work = b.work[1:]
b.fetchers = b.fetchers[1:]
}
// there was no work in this batch; continue (should not happen)
if len(toFetch) == 0 {
continue
}
b.batch, b.err = fetcher.Fetch(b.ctx, toFetch)
// error fetching, short-circuit iteration
if b.err != nil {
return false
}
}
return b.prepNext()
}
func (b *batchedLoader[_, B, C]) prepNext() bool {
b.cur, b.err = b.mapper(b.batch[0])
b.batch = b.batch[1:]
return b.err == nil
}
func newBatchedChunkLoader(
ctx context.Context,
fetchers []Fetcher[chunk.Chunk, chunk.Chunk],
inputs [][]chunk.Chunk,
metrics *Metrics,
batchSize int,
) *batchedLoader[chunk.Chunk, chunk.Chunk, v1.ChunkRefWithIter] {
mapper := func(c chunk.Chunk) (v1.ChunkRefWithIter, error) {
chk := c.Data.(*chunkenc.Facade).LokiChunk()
metrics.chunkSize.Observe(float64(chk.UncompressedSize()))
itr, err := chk.Iterator(
ctx,
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
logql_log.NewNoopPipeline().ForStream(c.Metric),
)
if err != nil {
return v1.ChunkRefWithIter{}, err
}
return v1.ChunkRefWithIter{
Ref: v1.ChunkRef{
Start: c.From,
End: c.Through,
Checksum: c.Checksum,
},
Itr: itr,
}, nil
}
return newBatchedLoader(ctx, fetchers, inputs, mapper, batchSize)
}
func (b *batchedLoader[_, _, C]) At() C {
return b.cur
}
func (b *batchedLoader[_, _, _]) Err() error {
return b.err
}
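The schema checks that used to run eagerly in Generate now live in the Filter hook on the loading iterator. A hedged sketch of installing such a filter directly; `exampleFilter`, `fetchFn`, and the target schema are illustrative assumptions:

func exampleFilter(ctx context.Context, refs []bloomshipper.BlockRef, fetchFn FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier]) {
    want := v1.NewBlockOptions(4, 0, 1<<10).Schema // hypothetical target schema

    itr := newBlockLoadingIter(ctx, refs, fetchFn, 10)
    // Filter must be installed before the first Next; the iterator panics
    // if it is set after initialization.
    itr.Filter(func(bq *bloomshipper.CloseableBlockQuerier) bool {
        md, err := bq.Metadata()
        if err != nil || !want.Compatible(md.Options.Schema) {
            bq.Close() // release queriers we will not consume
            return false
        }
        return true
    })
}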

@@ -3,7 +3,6 @@ package bloomcompactor
import (
"bytes"
"context"
"errors"
"testing"
"github.com/go-kit/log"
@@ -14,20 +13,19 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom) {
func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom, refs []bloomshipper.BlockRef) {
return blocksFromSchemaWithRange(t, n, options, 0, 0xffff)
}
// splits 100 series across `n` non-overlapping blocks.
// uses options to build blocks with.
func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBloom) {
func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBloom, refs []bloomshipper.BlockRef) {
if 100%n != 0 {
panic("100 series must be evenly divisible by n")
}
numSeries := 100
numKeysPerSeries := 10000
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFP, throughFp, 0, 10000)
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, 0, fromFP, throughFp, 0, 10000)
seriesPerBlock := numSeries / n
@@ -44,14 +42,19 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
)
require.Nil(t, err)
itr := v1.NewSliceIter[v1.SeriesWithBloom](data[i*seriesPerBlock : (i+1)*seriesPerBlock])
minIdx, maxIdx := i*seriesPerBlock, (i+1)*seriesPerBlock
itr := v1.NewSliceIter[v1.SeriesWithBloom](data[minIdx:maxIdx])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
res = append(res, v1.NewBlock(reader))
ref := genBlockRef(data[minIdx].Series.Fingerprint, data[maxIdx-1].Series.Fingerprint)
t.Log("create block", ref)
refs = append(refs, ref)
}
return res, data
return res, data, refs
}
// doesn't actually load any chunks
@@ -64,14 +67,30 @@ func (dummyChunkLoader) Load(_ context.Context, _ string, series *v1.Series) (*C
}, nil
}
func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block) *SimpleBloomGenerator {
func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block, refs []bloomshipper.BlockRef) *SimpleBloomGenerator {
bqs := make([]*bloomshipper.CloseableBlockQuerier, 0, len(blocks))
for _, b := range blocks {
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b),
})
}
blocksIter := v1.NewCloseableIterator(v1.NewSliceIter(bqs))
fetcher := func(_ context.Context, refs []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) {
res := make([]*bloomshipper.CloseableBlockQuerier, 0, len(refs))
for _, ref := range refs {
for _, bq := range bqs {
if ref.Bounds.Equal(bq.Bounds) {
res = append(res, bq)
}
}
}
t.Log("req", refs)
t.Log("res", res)
return res, nil
}
blocksIter := newBlockLoadingIter(context.Background(), refs, FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier](fetcher), 1)
return NewSimpleBloomGenerator(
"fake",
@@ -95,6 +114,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
desc string
fromSchema, toSchema v1.BlockOptions
sourceBlocks, numSkipped, outputBlocks int
overlapping bool
}{
{
desc: "SkipsIncompatibleSchemas",
@@ -118,11 +138,11 @@ func TestSimpleBloomGenerator(t *testing.T) {
toSchema: v1.NewBlockOptions(4, 0, 1<<10), // 1KB
sourceBlocks: 2,
numSkipped: 0,
outputBlocks: 3,
outputBlocks: 6,
},
} {
t.Run(tc.desc, func(t *testing.T) {
sourceBlocks, data := blocksFromSchema(t, tc.sourceBlocks, tc.fromSchema)
sourceBlocks, data, refs := blocksFromSchemaWithRange(t, tc.sourceBlocks, tc.fromSchema, 0x00000, 0x6ffff)
storeItr := v1.NewMapIter[v1.SeriesWithBloom, *v1.Series](
v1.NewSliceIter[v1.SeriesWithBloom](data),
func(swb v1.SeriesWithBloom) *v1.Series {
@@ -130,16 +150,15 @@ func TestSimpleBloomGenerator(t *testing.T) {
},
)
gen := dummyBloomGen(tc.toSchema, storeItr, sourceBlocks)
skipped, _, results, err := gen.Generate(context.Background())
require.Nil(t, err)
require.Equal(t, tc.numSkipped, len(skipped))
gen := dummyBloomGen(t, tc.toSchema, storeItr, sourceBlocks, refs)
results := gen.Generate(context.Background())
var outputBlocks []*v1.Block
for results.Next() {
outputBlocks = append(outputBlocks, results.At())
}
require.Equal(t, tc.outputBlocks, len(outputBlocks))
require.Equal(t, tc.numSkipped, len(gen.skipped))
// Check all the input series are present in the output blocks.
expectedRefs := v1.PointerSlice(data)
@@ -157,129 +176,3 @@ func TestSimpleBloomGenerator(t *testing.T) {
})
}
}
func TestBatchedLoader(t *testing.T) {
errMapper := func(i int) (int, error) {
return 0, errors.New("bzzt")
}
successMapper := func(i int) (int, error) {
return i, nil
}
expired, cancel := context.WithCancel(context.Background())
cancel()
for _, tc := range []struct {
desc string
ctx context.Context
batchSize int
mapper func(int) (int, error)
err bool
inputs [][]int
exp []int
}{
{
desc: "OneBatch",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1}},
exp: []int{0, 1},
},
{
desc: "ZeroBatchSizeStillWorks",
ctx: context.Background(),
batchSize: 0,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1}},
exp: []int{0, 1},
},
{
desc: "OneBatchLessThanFull",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0}},
exp: []int{0},
},
{
desc: "TwoBatches",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1, 2, 3}},
exp: []int{0, 1, 2, 3},
},
{
desc: "MultipleBatchesMultipleLoaders",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1}, {2}, {3, 4, 5}},
exp: []int{0, 1, 2, 3, 4, 5},
},
{
desc: "HandlesEmptyInputs",
ctx: context.Background(),
batchSize: 2,
mapper: successMapper,
err: false,
inputs: [][]int{{0, 1, 2, 3}, nil, {4}},
exp: []int{0, 1, 2, 3, 4},
},
{
desc: "Timeout",
ctx: expired,
batchSize: 2,
mapper: successMapper,
err: true,
inputs: [][]int{{0}},
},
{
desc: "MappingFailure",
ctx: context.Background(),
batchSize: 2,
mapper: errMapper,
err: true,
inputs: [][]int{{0}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
fetchers := make([]Fetcher[int, int], 0, len(tc.inputs))
for range tc.inputs {
fetchers = append(
fetchers,
FetchFunc[int, int](func(ctx context.Context, xs []int) ([]int, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return xs, nil
}),
)
}
loader := newBatchedLoader[int, int, int](
tc.ctx,
fetchers,
tc.inputs,
tc.mapper,
tc.batchSize,
)
got, err := v1.Collect[int](loader)
if tc.err {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.exp, got)
})
}
}

@@ -521,7 +521,7 @@ func (b *IndexBuilder) Close() (uint32, error) {
// from a list of blocks and a store of series.
type MergeBuilder struct {
// existing blocks
blocks []PeekingIterator[*SeriesWithBloom]
blocks Iterator[*SeriesWithBloom]
// store
store Iterator[*Series]
// Add chunks to a bloom
@@ -533,7 +533,7 @@ type MergeBuilder struct {
// i) When two blocks have the same series, it will prefer the one with the most chunks already indexed
// 2. iterates through the store, adding chunks to the relevant blooms via the `populate` argument
func NewMergeBuilder(
blocks []PeekingIterator[*SeriesWithBloom],
blocks Iterator[*SeriesWithBloom],
store Iterator[*Series],
populate func(*Series, *Bloom) error,
) *MergeBuilder {
@@ -549,24 +549,7 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
nextInBlocks *SeriesWithBloom
)
// Turn the list of blocks into a single iterator that returns the next series
mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewHeapIterForSeriesWithBloom(mb.blocks...))
// two overlapping blocks can conceivably have the same series, so we need to dedupe,
// preferring the one with the most chunks already indexed since we'll have
// to add fewer chunks to the bloom
deduped := NewDedupingIter[*SeriesWithBloom](
func(a, b *SeriesWithBloom) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
Identity[*SeriesWithBloom],
func(a, b *SeriesWithBloom) *SeriesWithBloom {
if len(a.Series.Chunks) > len(b.Series.Chunks) {
return a
}
return b
},
mergedBlocks,
)
deduped := mb.blocks
for mb.store.Next() {
nextInStore := mb.store.At()

@@ -150,6 +150,23 @@ func TestBlockBuilderRoundTrip(t *testing.T) {
}
}
func dedupedBlocks(blocks []PeekingIterator[*SeriesWithBloom]) Iterator[*SeriesWithBloom] {
orderedBlocks := NewHeapIterForSeriesWithBloom(blocks...)
return NewDedupingIter[*SeriesWithBloom](
func(a *SeriesWithBloom, b *SeriesWithBloom) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
Identity[*SeriesWithBloom],
func(a *SeriesWithBloom, b *SeriesWithBloom) *SeriesWithBloom {
if len(a.Series.Chunks) > len(b.Series.Chunks) {
return a
}
return b
},
NewPeekingIter[*SeriesWithBloom](orderedBlocks),
)
}
func TestMergeBuilder(t *testing.T) {
t.Parallel()
@@ -209,7 +226,7 @@ func TestMergeBuilder(t *testing.T) {
)
// Ensure that the merge builder combines all the blocks correctly
mergeBuilder := NewMergeBuilder(blocks, storeItr, pop)
mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop)
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
@@ -377,7 +394,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
writer := NewMemoryBlockWriter(indexBuf, bloomBuf)
reader := NewByteReader(indexBuf, bloomBuf)
mb := NewMergeBuilder(
blocks,
dedupedBlocks(blocks),
dedupedStore,
func(s *Series, b *Bloom) error {
// We're not actually indexing new data in this test

@@ -277,6 +277,38 @@ func (it *PeekCloseIter[T]) Close() error {
return it.close()
}
type ResettableIterator[T any] interface {
Reset() error
Iterator[T]
}
type CloseableResettableIterator[T any] interface {
CloseableIterator[T]
ResettableIterator[T]
}
type Predicate[T any] func(T) bool
func NewFilterIter[T any](it Iterator[T], p Predicate[T]) *FilterIter[T] {
return &FilterIter[T]{
Iterator: it,
match: p,
}
}
type FilterIter[T any] struct {
Iterator[T]
match Predicate[T]
}
func (i *FilterIter[T]) Next() bool {
hasNext := i.Iterator.Next()
for hasNext && !i.match(i.Iterator.At()) {
hasNext = i.Iterator.Next()
}
return hasNext
}
type CounterIterator[T any] interface {
Iterator[T]
Count() int
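A small sketch of `FilterIter` in use from inside package v1, assuming an `fmt` import; `exampleFilterIter` is illustrative only:

func exampleFilterIter() {
    nums := NewSliceIter([]int{1, 2, 3, 4, 5})
    evens := NewFilterIter[int](nums, func(n int) bool { return n%2 == 0 })
    for evens.Next() {
        fmt.Println(evens.At()) // prints 2, then 4
    }
}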

@@ -29,6 +29,13 @@ func (c *CloseableBlockQuerier) Close() error {
return nil
}
func (c *CloseableBlockQuerier) SeriesIter() (v1.PeekingIterator[*v1.SeriesWithBloom], error) {
if err := c.Reset(); err != nil {
return nil, err
}
return v1.NewPeekingIter[*v1.SeriesWithBloom](c.BlockQuerier), nil
}
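`SeriesIter` resets the querier before handing out the iterator, so every call starts from the first series. A hedged consumer-side sketch (`exampleSeriesIter` is not part of the change):

func exampleSeriesIter(bq *bloomshipper.CloseableBlockQuerier) error {
    itr, err := bq.SeriesIter() // resets, then wraps the querier for peeking
    if err != nil {
        return err
    }
    for itr.Next() {
        _ = itr.At() // *v1.SeriesWithBloom
    }
    return bq.Close()
}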
func NewBlocksCache(cfg cache.EmbeddedCacheConfig, reg prometheus.Registerer, logger log.Logger) *cache.EmbeddedCache[string, BlockDirectory] {
return cache.NewTypedEmbeddedCache[string, BlockDirectory](
"bloom-blocks-cache",
