diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index c9b3ecae35..f6487d5f61 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -14,11 +14,19 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" ) +type uploader interface { + PutBlock(ctx context.Context, block bloomshipper.Block) error + PutMeta(ctx context.Context, meta bloomshipper.Meta) error +} + type SimpleBloomController struct { + // TODO(owen-d): consider making tenant+table dynamic (not 1 struct per combination) + tenant string + table string ownershipRange v1.FingerprintBounds // ownership range of this controller tsdbStore TSDBStore - metaStore MetaStore - blockStore BlockStore + bloomStore bloomshipper.Store + uploader uploader chunkLoader ChunkLoader rwFn func() (v1.BlockWriter, v1.BlockReader) metrics *Metrics @@ -28,20 +36,23 @@ type SimpleBloomController struct { } func NewSimpleBloomController( + tenant, table string, ownershipRange v1.FingerprintBounds, tsdbStore TSDBStore, - metaStore MetaStore, - blockStore BlockStore, + blockStore bloomshipper.Store, + uploader uploader, chunkLoader ChunkLoader, rwFn func() (v1.BlockWriter, v1.BlockReader), metrics *Metrics, logger log.Logger, ) *SimpleBloomController { return &SimpleBloomController{ + tenant: tenant, + table: table, ownershipRange: ownershipRange, tsdbStore: tsdbStore, - metaStore: metaStore, - blockStore: blockStore, + bloomStore: blockStore, + uploader: uploader, chunkLoader: chunkLoader, rwFn: rwFn, metrics: metrics, @@ -57,18 +68,8 @@ func (s *SimpleBloomController) do(ctx context.Context) error { return errors.Wrap(err, "failed to resolve tsdbs") } - // 2. Resolve Metas - metaRefs, err := s.metaStore.ResolveMetas(s.ownershipRange) - if err != nil { - level.Error(s.logger).Log("msg", "failed to resolve metas", "err", err) - return errors.Wrap(err, "failed to resolve metas") - } - - // 3. Fetch metas - metas, err := s.metaStore.GetMetas(metaRefs) - if err != nil { - level.Error(s.logger).Log("msg", "failed to get metas", "err", err) - return errors.Wrap(err, "failed to get metas") + if len(tsdbs) == 0 { + return nil } ids := make([]tsdb.Identifier, 0, len(tsdbs)) @@ -76,7 +77,21 @@ func (s *SimpleBloomController) do(ctx context.Context) error { ids = append(ids, id) } - // 4. Determine which TSDBs have gaps in the ownership range and need to + // 2. Fetch metas + metas, err := s.bloomStore.FetchMetas( + ctx, + bloomshipper.MetaSearchParams{ + TenantID: s.tenant, + Interval: bloomshipper.Interval{}, // TODO(owen-d): gen interval + Keyspace: s.ownershipRange, + }, + ) + if err != nil { + level.Error(s.logger).Log("msg", "failed to get metas", "err", err) + return errors.Wrap(err, "failed to get metas") + } + + // 3. Determine which TSDBs have gaps in the ownership range and need to // be processed. tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(s.ownershipRange, ids, metas) if err != nil { @@ -95,7 +110,7 @@ func (s *SimpleBloomController) do(ctx context.Context) error { return errors.Wrap(err, "failed to create plan") } - // 5. Generate Blooms + // 4. Generate Blooms // Now that we have the gaps, we will generate a bloom block for each gap. // We can accelerate this by using existing blocks which may already contain // needed chunks in their blooms, for instance after a new TSDB version is generated @@ -115,7 +130,7 @@ func (s *SimpleBloomController) do(ctx context.Context) error { for _, gap := range plan.gaps { // Fetch blocks that aren't up to date but are in the desired fingerprint range // to try and accelerate bloom creation - seriesItr, preExistingBlocks, err := s.loadWorkForGap(plan.tsdb, gap) + seriesItr, preExistingBlocks, err := s.loadWorkForGap(ctx, plan.tsdb, gap) if err != nil { level.Error(s.logger).Log("msg", "failed to get series and blocks", "err", err) return errors.Wrap(err, "failed to get series and blocks") @@ -142,7 +157,11 @@ func (s *SimpleBloomController) do(ctx context.Context) error { for newBlocks.Next() { blockCt++ blk := newBlocks.At() - if err := s.blockStore.PutBlock(blk); err != nil { + + if err := s.uploader.PutBlock( + ctx, + bloomshipper.BlockFrom(s.tenant, s.table, blk), + ); err != nil { level.Error(s.logger).Log("msg", "failed to write block", "err", err) return errors.Wrap(err, "failed to write block") } @@ -157,24 +176,31 @@ func (s *SimpleBloomController) do(ctx context.Context) error { } } + // TODO(owen-d): build meta from blocks + // TODO(owen-d): reap tombstones, old metas + level.Debug(s.logger).Log("msg", "finished bloom generation", "blocks", blockCt, "tsdbs", tsdbCt) return nil } -func (s *SimpleBloomController) loadWorkForGap(id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*v1.Block, error) { +func (s *SimpleBloomController) loadWorkForGap(ctx context.Context, id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*bloomshipper.ClosableBlockQuerier, error) { // load a series iterator for the gap seriesItr, err := s.tsdbStore.LoadTSDB(id, gap.bounds) if err != nil { return nil, nil, errors.Wrap(err, "failed to load tsdb") } - blocks, err := s.blockStore.GetBlocks(gap.blocks) + blocks, err := s.bloomStore.FetchBlocks(ctx, gap.blocks) if err != nil { return nil, nil, errors.Wrap(err, "failed to get blocks") } + results := make([]*bloomshipper.ClosableBlockQuerier, 0, len(blocks)) + for _, block := range blocks { + results = append(results, block.BlockQuerier()) + } - return seriesItr, blocks, nil + return seriesItr, results, nil } type gapWithBlocks struct { @@ -199,7 +225,7 @@ type blockPlan struct { // blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks. // This allows us to expedite bloom generation by using existing blocks to fill in the gaps // since many will contain the same chunks. -func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) { +func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) { plans := make([]blockPlan, 0, len(tsdbs)) for _, idx := range tsdbs { @@ -215,7 +241,7 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) { for _, meta := range metas { - if meta.OwnershipRange.Intersection(gap) == nil { + if meta.Bounds.Intersection(gap) == nil { // this meta doesn't overlap the gap, skip continue } @@ -279,7 +305,7 @@ type tsdbGaps struct { func gapsBetweenTSDBsAndMetas( ownershipRange v1.FingerprintBounds, tsdbs []tsdb.Identifier, - metas []Meta, + metas []bloomshipper.Meta, ) (res []tsdbGaps, err error) { for _, db := range tsdbs { id := db.Name() @@ -288,7 +314,7 @@ func gapsBetweenTSDBsAndMetas( for _, meta := range metas { for _, s := range meta.Sources { if s.Name() == id { - relevantMetas = append(relevantMetas, meta.OwnershipRange) + relevantMetas = append(relevantMetas, meta.Bounds) } } } diff --git a/pkg/bloomcompactor/controller_test.go b/pkg/bloomcompactor/controller_test.go index 1f89a0e318..0660a5b601 100644 --- a/pkg/bloomcompactor/controller_test.go +++ b/pkg/bloomcompactor/controller_test.go @@ -120,10 +120,14 @@ func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier { } } -func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) Meta { - m := Meta{ - OwnershipRange: v1.NewBounds(min, max), - Blocks: blocks, +func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta { + m := bloomshipper.Meta{ + MetaRef: bloomshipper.MetaRef{ + Ref: bloomshipper.Ref{ + Bounds: v1.NewBounds(min, max), + }, + }, + Blocks: blocks, } for _, source := range sources { m.Sources = append(m.Sources, tsdbID(source)) @@ -139,14 +143,14 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { exp []tsdbGaps ownershipRange v1.FingerprintBounds tsdbs []tsdb.Identifier - metas []Meta + metas []bloomshipper.Meta }{ { desc: "non-overlapping tsdbs and metas", err: true, ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(11, 20, []int{0}, nil), }, }, @@ -154,7 +158,7 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { desc: "single tsdb", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(4, 8, []int{0}, nil), }, exp: []tsdbGaps{ @@ -171,7 +175,7 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { desc: "multiple tsdbs with separate blocks", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(0, 5, []int{0}, nil), genMeta(6, 10, []int{1}, nil), }, @@ -194,7 +198,7 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { desc: "multiple tsdbs with the same blocks", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(0, 5, []int{0, 1}, nil), genMeta(6, 8, []int{1}, nil), }, @@ -239,7 +243,7 @@ func Test_blockPlansForGaps(t *testing.T) { desc string ownershipRange v1.FingerprintBounds tsdbs []tsdb.Identifier - metas []Meta + metas []bloomshipper.Meta err bool exp []blockPlan }{ @@ -247,7 +251,7 @@ func Test_blockPlansForGaps(t *testing.T) { desc: "single overlapping meta+no overlapping block", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}), }, exp: []blockPlan{ @@ -265,7 +269,7 @@ func Test_blockPlansForGaps(t *testing.T) { desc: "single overlapping meta+one overlapping block", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), }, exp: []blockPlan{ @@ -287,7 +291,7 @@ func Test_blockPlansForGaps(t *testing.T) { desc: "trims up to date area", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb }, @@ -306,7 +310,7 @@ func Test_blockPlansForGaps(t *testing.T) { desc: "uses old block for overlapping range", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb }, @@ -326,7 +330,7 @@ func Test_blockPlansForGaps(t *testing.T) { desc: "multi case", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{ genBlockRef(0, 1), genBlockRef(1, 2), @@ -374,7 +378,7 @@ func Test_blockPlansForGaps(t *testing.T) { desc: "dedupes block refs", ownershipRange: v1.NewBounds(0, 10), tsdbs: []tsdb.Identifier{tsdbID(0)}, - metas: []Meta{ + metas: []bloomshipper.Meta{ genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{ genBlockRef(1, 4), genBlockRef(9, 20), diff --git a/pkg/bloomcompactor/meta.go b/pkg/bloomcompactor/meta.go index c0a333c5c9..2f2c2cd9de 100644 --- a/pkg/bloomcompactor/meta.go +++ b/pkg/bloomcompactor/meta.go @@ -1,13 +1,7 @@ package bloomcompactor import ( - "fmt" - "path" - - "github.com/pkg/errors" - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" - "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" ) @@ -16,99 +10,7 @@ const ( MetasPrefix = "metas" ) -type MetaRef struct { - OwnershipRange v1.FingerprintBounds - Checksum uint32 -} - -// `bloom///metas/--.json` -func (m MetaRef) Address(tenant string, period int) (string, error) { - joined := path.Join( - BloomPrefix, - fmt.Sprintf("%v", period), - tenant, - MetasPrefix, - fmt.Sprintf("%v-%v", m.OwnershipRange, m.Checksum), - ) - - return fmt.Sprintf("%s.json", joined), nil -} - -type Meta struct { - - // The fingerprint range of the block. This is the range _owned_ by the meta and - // is greater than or equal to the range of the actual data in the underlying blocks. - OwnershipRange v1.FingerprintBounds - - // Old blocks which can be deleted in the future. These should be from previous compaction rounds. - Tombstones []bloomshipper.BlockRef - - // The specific TSDB files used to generate the block. - Sources []tsdb.SingleTenantTSDBIdentifier - - // A list of blocks that were generated - Blocks []bloomshipper.BlockRef -} - -// Generate MetaRef from Meta -func (m Meta) Ref() (MetaRef, error) { - checksum, err := m.Checksum() - if err != nil { - return MetaRef{}, errors.Wrap(err, "getting checksum") - } - return MetaRef{ - OwnershipRange: m.OwnershipRange, - Checksum: checksum, - }, nil -} - -func (m Meta) Checksum() (uint32, error) { - h := v1.Crc32HashPool.Get() - defer v1.Crc32HashPool.Put(h) - - _, err := h.Write([]byte(m.OwnershipRange.String())) - if err != nil { - return 0, errors.Wrap(err, "writing OwnershipRange") - } - - for _, tombstone := range m.Tombstones { - err = tombstone.Hash(h) - if err != nil { - return 0, errors.Wrap(err, "writing Tombstones") - } - } - - for _, source := range m.Sources { - err = source.Hash(h) - if err != nil { - return 0, errors.Wrap(err, "writing Sources") - } - } - - for _, block := range m.Blocks { - err = block.Hash(h) - if err != nil { - return 0, errors.Wrap(err, "writing Blocks") - } - } - - return h.Sum32(), nil - -} - type TSDBStore interface { ResolveTSDBs() ([]*tsdb.SingleTenantTSDBIdentifier, error) LoadTSDB(id tsdb.Identifier, bounds v1.FingerprintBounds) (v1.CloseableIterator[*v1.Series], error) } - -type MetaStore interface { - ResolveMetas(bounds v1.FingerprintBounds) ([]MetaRef, error) - GetMetas([]MetaRef) ([]Meta, error) - PutMeta(Meta) error -} - -type BlockStore interface { - // TODO(owen-d): flesh out|integrate against bloomshipper.Client - GetBlocks([]bloomshipper.BlockRef) ([]*v1.Block, error) - PutBlock(interface{}) error -} diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go index e0d964e9e9..341e397750 100644 --- a/pkg/bloomcompactor/spec.go +++ b/pkg/bloomcompactor/spec.go @@ -13,11 +13,14 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" + "github.com/grafana/dskit/multierror" + "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" logql_log "github.com/grafana/loki/pkg/logql/log" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" ) @@ -71,7 +74,7 @@ type SimpleBloomGenerator struct { chunkLoader ChunkLoader // TODO(owen-d): blocks need not be all downloaded prior. Consider implementing // as an iterator of iterators, where each iterator is a batch of overlapping blocks. - blocks []*v1.Block + blocks []*bloomshipper.ClosableBlockQuerier // options to build blocks with opts v1.BlockOptions @@ -92,7 +95,7 @@ func NewSimpleBloomGenerator( opts v1.BlockOptions, store v1.Iterator[*v1.Series], chunkLoader ChunkLoader, - blocks []*v1.Block, + blocks []*bloomshipper.ClosableBlockQuerier, readWriterFn func() (v1.BlockWriter, v1.BlockReader), metrics *Metrics, logger log.Logger, @@ -129,38 +132,62 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se } -func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error) { +func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, results v1.Iterator[*v1.Block], err error) { + var closeErrors multierror.MultiError blocksMatchingSchema := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(s.blocks)) + toClose := make([]*bloomshipper.ClosableBlockQuerier, 0, len(s.blocks)) + // Close all remaining blocks on exit + defer func() { + for _, block := range toClose { + closeErrors.Add(block.Close()) + } + if err := closeErrors.Err(); err != nil { + level.Error(s.logger).Log("msg", "failed to close blocks", "err", err) + } + }() + for _, block := range s.blocks { // TODO(owen-d): implement block naming so we can log the affected block in all these calls logger := log.With(s.logger, "block", fmt.Sprintf("%+v", block)) - schema, err := block.Schema() + md, err := block.Metadata() + schema := md.Options.Schema if err != nil { level.Warn(logger).Log("msg", "failed to get schema for block", "err", err) - skippedBlocks = append(skippedBlocks, block) + skippedBlocks = append(skippedBlocks, md) + + // Close unneeded block + closeErrors.Add(block.Close()) + continue } if !s.opts.Schema.Compatible(schema) { level.Warn(logger).Log("msg", "block schema incompatible with options", "generator_schema", fmt.Sprintf("%+v", s.opts.Schema), "block_schema", fmt.Sprintf("%+v", schema)) - skippedBlocks = append(skippedBlocks, block) + skippedBlocks = append(skippedBlocks, md) + + // Close unneeded block + closeErrors.Add(block.Close()) + continue } level.Debug(logger).Log("msg", "adding compatible block to bloom generation inputs") - itr := v1.NewPeekingIter[*v1.SeriesWithBloom](v1.NewBlockQuerier(block)) + itr := v1.NewPeekingIter[*v1.SeriesWithBloom](block) blocksMatchingSchema = append(blocksMatchingSchema, itr) + // append needed block to close list (when finished) + toClose = append(toClose, block) } level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "num_blocks", len(blocksMatchingSchema), "skipped_blocks", len(skippedBlocks), "schema", fmt.Sprintf("%+v", s.opts.Schema)) // TODO(owen-d): implement bounded block sizes - mergeBuilder := v1.NewMergeBuilder(blocksMatchingSchema, s.store, s.populator(ctx)) writer, reader := s.readWriterFn() + blockBuilder, err := v1.NewBlockBuilder(v1.NewBlockOptionsFromSchema(s.opts.Schema), writer) if err != nil { return skippedBlocks, nil, errors.Wrap(err, "failed to create bloom block builder") } + _, err = mergeBuilder.Build(blockBuilder) if err != nil { return skippedBlocks, nil, errors.Wrap(err, "failed to build bloom block") diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index 08c722d06e..efc0d70f20 100644 --- a/pkg/bloomcompactor/spec_test.go +++ b/pkg/bloomcompactor/spec_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" ) func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom) { @@ -63,11 +64,18 @@ func (dummyChunkLoader) Load(_ context.Context, series *v1.Series) (*ChunkItersB } func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block) *SimpleBloomGenerator { + bqs := make([]*bloomshipper.ClosableBlockQuerier, 0, len(blocks)) + for _, b := range blocks { + bqs = append(bqs, &bloomshipper.ClosableBlockQuerier{ + BlockQuerier: v1.NewBlockQuerier(b), + }) + } + return NewSimpleBloomGenerator( opts, store, dummyChunkLoader{}, - blocks, + bqs, func() (v1.BlockWriter, v1.BlockReader) { indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index c5b115df19..0b10b9d5b0 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -127,6 +127,10 @@ func NewBlockQuerier(b *Block) *BlockQuerier { } } +func (bq *BlockQuerier) Metadata() (BlockMetadata, error) { + return bq.block.Metadata() +} + func (bq *BlockQuerier) Schema() (Schema, error) { return bq.block.Schema() } diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index 31fcdc6439..27d347bfb4 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -246,3 +246,21 @@ type CloseableIterator[T any] interface { Iterator[T] Close() error } + +type PeekingCloseableIterator[T any] interface { + PeekingIterator[T] + CloseableIterator[T] +} + +type PeekCloseIter[T any] struct { + *PeekIter[T] + close func() error +} + +func NewPeekCloseIter[T any](itr CloseableIterator[T]) *PeekCloseIter[T] { + return &PeekCloseIter[T]{PeekIter: NewPeekingIter(itr), close: itr.Close} +} + +func (it *PeekCloseIter[T]) Close() error { + return it.close() +} diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 7fe678a1c3..2f9e98f89d 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -19,7 +19,14 @@ import ( type ClosableBlockQuerier struct { *v1.BlockQuerier - Close func() error + close func() error +} + +func (c *ClosableBlockQuerier) Close() error { + if c.close != nil { + return c.close() + } + return nil } func NewBlocksCache(config config.Config, reg prometheus.Registerer, logger log.Logger) *cache.EmbeddedCache[string, BlockDirectory] { @@ -75,7 +82,7 @@ func (b BlockDirectory) BlockQuerier() *ClosableBlockQuerier { b.activeQueriers.Inc() return &ClosableBlockQuerier{ BlockQuerier: v1.NewBlockQuerier(b.Block()), - Close: func() error { + close: func() error { _ = b.activeQueriers.Dec() return nil }, diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 396e8808c1..2e31106548 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -16,6 +16,7 @@ import ( v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" "github.com/grafana/loki/pkg/util/encoding" ) @@ -84,8 +85,49 @@ func (r MetaRef) String() string { type Meta struct { MetaRef `json:"-"` + // The specific TSDB files used to generate the block. + Sources []tsdb.SingleTenantTSDBIdentifier + + // Old blocks which can be deleted in the future. These should be from previous compaction rounds. Tombstones []BlockRef - Blocks []BlockRef + + // A list of blocks that were generated + Blocks []BlockRef +} + +// TODO(owen-d): use this to update internal ref's checksum. +func (m Meta) Checksum() (uint32, error) { + h := v1.Crc32HashPool.Get() + defer v1.Crc32HashPool.Put(h) + + err := m.Bounds.Hash(h) + if err != nil { + return 0, errors.Wrap(err, "writing OwnershipRange") + } + + for _, tombstone := range m.Tombstones { + err = tombstone.Hash(h) + if err != nil { + return 0, errors.Wrap(err, "writing Tombstones") + } + } + + for _, source := range m.Sources { + err = source.Hash(h) + if err != nil { + return 0, errors.Wrap(err, "writing Sources") + } + } + + for _, block := range m.Blocks { + err = block.Hash(h) + if err != nil { + return 0, errors.Wrap(err, "writing Blocks") + } + } + + return h.Sum32(), nil + } type MetaSearchParams struct { @@ -107,6 +149,21 @@ type Block struct { Data io.ReadSeekCloser } +func BlockFrom(tenant, table string, blk *v1.Block) Block { + md, _ := blk.Metadata() + ref := Ref{ + TenantID: tenant, + TableName: table, + Bounds: md.Series.Bounds, + StartTimestamp: md.Series.FromTs, + EndTimestamp: md.Series.ThroughTs, + Checksum: md.Checksum, + } + return Block{ + BlockRef: BlockRef{Ref: ref}, + } +} + type BlockClient interface { KeyResolver GetBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error)