[Blooms] bloomshipper meta integration (#11874)

Builds on top of https://github.com/grafana/loki/pull/11872,
standardizing on the `bloomshipper` types `Meta` and `MetaRef`.
pull/11886/head
Owen Diehl 2 years ago committed by GitHub
parent 6b46e373fb
commit af66ecec6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 84
      pkg/bloomcompactor/controller.go
  2. 36
      pkg/bloomcompactor/controller_test.go
  3. 98
      pkg/bloomcompactor/meta.go
  4. 43
      pkg/bloomcompactor/spec.go
  5. 10
      pkg/bloomcompactor/spec_test.go
  6. 4
      pkg/storage/bloom/v1/block.go
  7. 18
      pkg/storage/bloom/v1/util.go
  8. 11
      pkg/storage/stores/shipper/bloomshipper/cache.go
  9. 59
      pkg/storage/stores/shipper/bloomshipper/client.go

@ -14,11 +14,19 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
)
type uploader interface {
PutBlock(ctx context.Context, block bloomshipper.Block) error
PutMeta(ctx context.Context, meta bloomshipper.Meta) error
}
type SimpleBloomController struct {
// TODO(owen-d): consider making tenant+table dynamic (not 1 struct per combination)
tenant string
table string
ownershipRange v1.FingerprintBounds // ownership range of this controller
tsdbStore TSDBStore
metaStore MetaStore
blockStore BlockStore
bloomStore bloomshipper.Store
uploader uploader
chunkLoader ChunkLoader
rwFn func() (v1.BlockWriter, v1.BlockReader)
metrics *Metrics
@ -28,20 +36,23 @@ type SimpleBloomController struct {
}
func NewSimpleBloomController(
tenant, table string,
ownershipRange v1.FingerprintBounds,
tsdbStore TSDBStore,
metaStore MetaStore,
blockStore BlockStore,
blockStore bloomshipper.Store,
uploader uploader,
chunkLoader ChunkLoader,
rwFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
) *SimpleBloomController {
return &SimpleBloomController{
tenant: tenant,
table: table,
ownershipRange: ownershipRange,
tsdbStore: tsdbStore,
metaStore: metaStore,
blockStore: blockStore,
bloomStore: blockStore,
uploader: uploader,
chunkLoader: chunkLoader,
rwFn: rwFn,
metrics: metrics,
@ -57,18 +68,8 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
return errors.Wrap(err, "failed to resolve tsdbs")
}
// 2. Resolve Metas
metaRefs, err := s.metaStore.ResolveMetas(s.ownershipRange)
if err != nil {
level.Error(s.logger).Log("msg", "failed to resolve metas", "err", err)
return errors.Wrap(err, "failed to resolve metas")
}
// 3. Fetch metas
metas, err := s.metaStore.GetMetas(metaRefs)
if err != nil {
level.Error(s.logger).Log("msg", "failed to get metas", "err", err)
return errors.Wrap(err, "failed to get metas")
if len(tsdbs) == 0 {
return nil
}
ids := make([]tsdb.Identifier, 0, len(tsdbs))
@ -76,7 +77,21 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
ids = append(ids, id)
}
// 4. Determine which TSDBs have gaps in the ownership range and need to
// 2. Fetch metas
metas, err := s.bloomStore.FetchMetas(
ctx,
bloomshipper.MetaSearchParams{
TenantID: s.tenant,
Interval: bloomshipper.Interval{}, // TODO(owen-d): gen interval
Keyspace: s.ownershipRange,
},
)
if err != nil {
level.Error(s.logger).Log("msg", "failed to get metas", "err", err)
return errors.Wrap(err, "failed to get metas")
}
// 3. Determine which TSDBs have gaps in the ownership range and need to
// be processed.
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(s.ownershipRange, ids, metas)
if err != nil {
@ -95,7 +110,7 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
return errors.Wrap(err, "failed to create plan")
}
// 5. Generate Blooms
// 4. Generate Blooms
// Now that we have the gaps, we will generate a bloom block for each gap.
// We can accelerate this by using existing blocks which may already contain
// needed chunks in their blooms, for instance after a new TSDB version is generated
@ -115,7 +130,7 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
for _, gap := range plan.gaps {
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation
seriesItr, preExistingBlocks, err := s.loadWorkForGap(plan.tsdb, gap)
seriesItr, preExistingBlocks, err := s.loadWorkForGap(ctx, plan.tsdb, gap)
if err != nil {
level.Error(s.logger).Log("msg", "failed to get series and blocks", "err", err)
return errors.Wrap(err, "failed to get series and blocks")
@ -142,7 +157,11 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
for newBlocks.Next() {
blockCt++
blk := newBlocks.At()
if err := s.blockStore.PutBlock(blk); err != nil {
if err := s.uploader.PutBlock(
ctx,
bloomshipper.BlockFrom(s.tenant, s.table, blk),
); err != nil {
level.Error(s.logger).Log("msg", "failed to write block", "err", err)
return errors.Wrap(err, "failed to write block")
}
@ -157,24 +176,31 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
}
}
// TODO(owen-d): build meta from blocks
// TODO(owen-d): reap tombstones, old metas
level.Debug(s.logger).Log("msg", "finished bloom generation", "blocks", blockCt, "tsdbs", tsdbCt)
return nil
}
func (s *SimpleBloomController) loadWorkForGap(id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*v1.Block, error) {
func (s *SimpleBloomController) loadWorkForGap(ctx context.Context, id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*bloomshipper.ClosableBlockQuerier, error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(id, gap.bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}
blocks, err := s.blockStore.GetBlocks(gap.blocks)
blocks, err := s.bloomStore.FetchBlocks(ctx, gap.blocks)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get blocks")
}
results := make([]*bloomshipper.ClosableBlockQuerier, 0, len(blocks))
for _, block := range blocks {
results = append(results, block.BlockQuerier())
}
return seriesItr, blocks, nil
return seriesItr, results, nil
}
type gapWithBlocks struct {
@ -199,7 +225,7 @@ type blockPlan struct {
// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) {
func blockPlansForGaps(tsdbs []tsdbGaps, metas []bloomshipper.Meta) ([]blockPlan, error) {
plans := make([]blockPlan, 0, len(tsdbs))
for _, idx := range tsdbs {
@ -215,7 +241,7 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) {
for _, meta := range metas {
if meta.OwnershipRange.Intersection(gap) == nil {
if meta.Bounds.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
}
@ -279,7 +305,7 @@ type tsdbGaps struct {
func gapsBetweenTSDBsAndMetas(
ownershipRange v1.FingerprintBounds,
tsdbs []tsdb.Identifier,
metas []Meta,
metas []bloomshipper.Meta,
) (res []tsdbGaps, err error) {
for _, db := range tsdbs {
id := db.Name()
@ -288,7 +314,7 @@ func gapsBetweenTSDBsAndMetas(
for _, meta := range metas {
for _, s := range meta.Sources {
if s.Name() == id {
relevantMetas = append(relevantMetas, meta.OwnershipRange)
relevantMetas = append(relevantMetas, meta.Bounds)
}
}
}

@ -120,10 +120,14 @@ func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier {
}
}
func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) Meta {
m := Meta{
OwnershipRange: v1.NewBounds(min, max),
Blocks: blocks,
func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) bloomshipper.Meta {
m := bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
Ref: bloomshipper.Ref{
Bounds: v1.NewBounds(min, max),
},
},
Blocks: blocks,
}
for _, source := range sources {
m.Sources = append(m.Sources, tsdbID(source))
@ -139,14 +143,14 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
exp []tsdbGaps
ownershipRange v1.FingerprintBounds
tsdbs []tsdb.Identifier
metas []Meta
metas []bloomshipper.Meta
}{
{
desc: "non-overlapping tsdbs and metas",
err: true,
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(11, 20, []int{0}, nil),
},
},
@ -154,7 +158,7 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
desc: "single tsdb",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(4, 8, []int{0}, nil),
},
exp: []tsdbGaps{
@ -171,7 +175,7 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
desc: "multiple tsdbs with separate blocks",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(0, 5, []int{0}, nil),
genMeta(6, 10, []int{1}, nil),
},
@ -194,7 +198,7 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
desc: "multiple tsdbs with the same blocks",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(0, 5, []int{0, 1}, nil),
genMeta(6, 8, []int{1}, nil),
},
@ -239,7 +243,7 @@ func Test_blockPlansForGaps(t *testing.T) {
desc string
ownershipRange v1.FingerprintBounds
tsdbs []tsdb.Identifier
metas []Meta
metas []bloomshipper.Meta
err bool
exp []blockPlan
}{
@ -247,7 +251,7 @@ func Test_blockPlansForGaps(t *testing.T) {
desc: "single overlapping meta+no overlapping block",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}),
},
exp: []blockPlan{
@ -265,7 +269,7 @@ func Test_blockPlansForGaps(t *testing.T) {
desc: "single overlapping meta+one overlapping block",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}),
},
exp: []blockPlan{
@ -287,7 +291,7 @@ func Test_blockPlansForGaps(t *testing.T) {
desc: "trims up to date area",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb
},
@ -306,7 +310,7 @@ func Test_blockPlansForGaps(t *testing.T) {
desc: "uses old block for overlapping range",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb
},
@ -326,7 +330,7 @@ func Test_blockPlansForGaps(t *testing.T) {
desc: "multi case",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
@ -374,7 +378,7 @@ func Test_blockPlansForGaps(t *testing.T) {
desc: "dedupes block refs",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
metas: []bloomshipper.Meta{
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(9, 20),

@ -1,13 +1,7 @@
package bloomcompactor
import (
"fmt"
"path"
"github.com/pkg/errors"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
)
@ -16,99 +10,7 @@ const (
MetasPrefix = "metas"
)
type MetaRef struct {
OwnershipRange v1.FingerprintBounds
Checksum uint32
}
// `bloom/<period>/<tenant>/metas/<start_fp>-<end_fp>-<checksum>.json`
func (m MetaRef) Address(tenant string, period int) (string, error) {
joined := path.Join(
BloomPrefix,
fmt.Sprintf("%v", period),
tenant,
MetasPrefix,
fmt.Sprintf("%v-%v", m.OwnershipRange, m.Checksum),
)
return fmt.Sprintf("%s.json", joined), nil
}
type Meta struct {
// The fingerprint range of the block. This is the range _owned_ by the meta and
// is greater than or equal to the range of the actual data in the underlying blocks.
OwnershipRange v1.FingerprintBounds
// Old blocks which can be deleted in the future. These should be from previous compaction rounds.
Tombstones []bloomshipper.BlockRef
// The specific TSDB files used to generate the block.
Sources []tsdb.SingleTenantTSDBIdentifier
// A list of blocks that were generated
Blocks []bloomshipper.BlockRef
}
// Generate MetaRef from Meta
func (m Meta) Ref() (MetaRef, error) {
checksum, err := m.Checksum()
if err != nil {
return MetaRef{}, errors.Wrap(err, "getting checksum")
}
return MetaRef{
OwnershipRange: m.OwnershipRange,
Checksum: checksum,
}, nil
}
func (m Meta) Checksum() (uint32, error) {
h := v1.Crc32HashPool.Get()
defer v1.Crc32HashPool.Put(h)
_, err := h.Write([]byte(m.OwnershipRange.String()))
if err != nil {
return 0, errors.Wrap(err, "writing OwnershipRange")
}
for _, tombstone := range m.Tombstones {
err = tombstone.Hash(h)
if err != nil {
return 0, errors.Wrap(err, "writing Tombstones")
}
}
for _, source := range m.Sources {
err = source.Hash(h)
if err != nil {
return 0, errors.Wrap(err, "writing Sources")
}
}
for _, block := range m.Blocks {
err = block.Hash(h)
if err != nil {
return 0, errors.Wrap(err, "writing Blocks")
}
}
return h.Sum32(), nil
}
type TSDBStore interface {
ResolveTSDBs() ([]*tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(id tsdb.Identifier, bounds v1.FingerprintBounds) (v1.CloseableIterator[*v1.Series], error)
}
type MetaStore interface {
ResolveMetas(bounds v1.FingerprintBounds) ([]MetaRef, error)
GetMetas([]MetaRef) ([]Meta, error)
PutMeta(Meta) error
}
type BlockStore interface {
// TODO(owen-d): flesh out|integrate against bloomshipper.Client
GetBlocks([]bloomshipper.BlockRef) ([]*v1.Block, error)
PutBlock(interface{}) error
}

@ -13,11 +13,14 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/grafana/dskit/multierror"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
logql_log "github.com/grafana/loki/pkg/logql/log"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
)
@ -71,7 +74,7 @@ type SimpleBloomGenerator struct {
chunkLoader ChunkLoader
// TODO(owen-d): blocks need not be all downloaded prior. Consider implementing
// as an iterator of iterators, where each iterator is a batch of overlapping blocks.
blocks []*v1.Block
blocks []*bloomshipper.ClosableBlockQuerier
// options to build blocks with
opts v1.BlockOptions
@ -92,7 +95,7 @@ func NewSimpleBloomGenerator(
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocks []*v1.Block,
blocks []*bloomshipper.ClosableBlockQuerier,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
@ -129,38 +132,62 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
}
func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error) {
func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, results v1.Iterator[*v1.Block], err error) {
var closeErrors multierror.MultiError
blocksMatchingSchema := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(s.blocks))
toClose := make([]*bloomshipper.ClosableBlockQuerier, 0, len(s.blocks))
// Close all remaining blocks on exit
defer func() {
for _, block := range toClose {
closeErrors.Add(block.Close())
}
if err := closeErrors.Err(); err != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}()
for _, block := range s.blocks {
// TODO(owen-d): implement block naming so we can log the affected block in all these calls
logger := log.With(s.logger, "block", fmt.Sprintf("%+v", block))
schema, err := block.Schema()
md, err := block.Metadata()
schema := md.Options.Schema
if err != nil {
level.Warn(logger).Log("msg", "failed to get schema for block", "err", err)
skippedBlocks = append(skippedBlocks, block)
skippedBlocks = append(skippedBlocks, md)
// Close unneeded block
closeErrors.Add(block.Close())
continue
}
if !s.opts.Schema.Compatible(schema) {
level.Warn(logger).Log("msg", "block schema incompatible with options", "generator_schema", fmt.Sprintf("%+v", s.opts.Schema), "block_schema", fmt.Sprintf("%+v", schema))
skippedBlocks = append(skippedBlocks, block)
skippedBlocks = append(skippedBlocks, md)
// Close unneeded block
closeErrors.Add(block.Close())
continue
}
level.Debug(logger).Log("msg", "adding compatible block to bloom generation inputs")
itr := v1.NewPeekingIter[*v1.SeriesWithBloom](v1.NewBlockQuerier(block))
itr := v1.NewPeekingIter[*v1.SeriesWithBloom](block)
blocksMatchingSchema = append(blocksMatchingSchema, itr)
// append needed block to close list (when finished)
toClose = append(toClose, block)
}
level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "num_blocks", len(blocksMatchingSchema), "skipped_blocks", len(skippedBlocks), "schema", fmt.Sprintf("%+v", s.opts.Schema))
// TODO(owen-d): implement bounded block sizes
mergeBuilder := v1.NewMergeBuilder(blocksMatchingSchema, s.store, s.populator(ctx))
writer, reader := s.readWriterFn()
blockBuilder, err := v1.NewBlockBuilder(v1.NewBlockOptionsFromSchema(s.opts.Schema), writer)
if err != nil {
return skippedBlocks, nil, errors.Wrap(err, "failed to create bloom block builder")
}
_, err = mergeBuilder.Build(blockBuilder)
if err != nil {
return skippedBlocks, nil, errors.Wrap(err, "failed to build bloom block")

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom) {
@ -63,11 +64,18 @@ func (dummyChunkLoader) Load(_ context.Context, series *v1.Series) (*ChunkItersB
}
func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block) *SimpleBloomGenerator {
bqs := make([]*bloomshipper.ClosableBlockQuerier, 0, len(blocks))
for _, b := range blocks {
bqs = append(bqs, &bloomshipper.ClosableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(b),
})
}
return NewSimpleBloomGenerator(
opts,
store,
dummyChunkLoader{},
blocks,
bqs,
func() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)

@ -127,6 +127,10 @@ func NewBlockQuerier(b *Block) *BlockQuerier {
}
}
// Metadata returns the metadata of the underlying block, delegating to bq.block.
func (bq *BlockQuerier) Metadata() (BlockMetadata, error) {
return bq.block.Metadata()
}
// Schema returns the schema of the underlying block, delegating to bq.block.
func (bq *BlockQuerier) Schema() (Schema, error) {
return bq.block.Schema()
}

@ -246,3 +246,21 @@ type CloseableIterator[T any] interface {
Iterator[T]
Close() error
}
// PeekingCloseableIterator is an iterator that supports both peeking at the
// next element and being closed when no longer needed.
type PeekingCloseableIterator[T any] interface {
PeekingIterator[T]
CloseableIterator[T]
}
// PeekCloseIter adapts a CloseableIterator into a PeekingCloseableIterator by
// embedding a PeekIter for peeking and retaining the source's close function.
type PeekCloseIter[T any] struct {
*PeekIter[T]
close func() error
}
// NewPeekCloseIter wraps itr with peeking support while preserving its Close.
func NewPeekCloseIter[T any](itr CloseableIterator[T]) *PeekCloseIter[T] {
return &PeekCloseIter[T]{PeekIter: NewPeekingIter(itr), close: itr.Close}
}
// Close closes the wrapped iterator via the retained close function.
// NOTE(review): close is never nil when constructed via NewPeekCloseIter, but
// a zero-valued PeekCloseIter would panic here — confirm that is acceptable.
func (it *PeekCloseIter[T]) Close() error {
return it.close()
}

@ -19,7 +19,14 @@ import (
type ClosableBlockQuerier struct {
*v1.BlockQuerier
Close func() error
close func() error
}
// Close invokes the querier's close callback if one was set (e.g. to decrement
// the owning BlockDirectory's active-querier count); it is a no-op returning
// nil when no callback is present, so calling Close is always safe.
func (c *ClosableBlockQuerier) Close() error {
if c.close != nil {
return c.close()
}
return nil
}
func NewBlocksCache(config config.Config, reg prometheus.Registerer, logger log.Logger) *cache.EmbeddedCache[string, BlockDirectory] {
@ -75,7 +82,7 @@ func (b BlockDirectory) BlockQuerier() *ClosableBlockQuerier {
b.activeQueriers.Inc()
return &ClosableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(b.Block()),
Close: func() error {
close: func() error {
_ = b.activeQueriers.Dec()
return nil
},

@ -16,6 +16,7 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/util/encoding"
)
@ -84,8 +85,49 @@ func (r MetaRef) String() string {
type Meta struct {
MetaRef `json:"-"`
// The specific TSDB files used to generate the block.
Sources []tsdb.SingleTenantTSDBIdentifier
// Old blocks which can be deleted in the future. These should be from previous compaction rounds.
Tombstones []BlockRef
Blocks []BlockRef
// A list of blocks that were generated
Blocks []BlockRef
}
// TODO(owen-d): use this to update internal ref's checksum.
// Checksum computes a CRC32 over the meta's bounds, tombstones, sources, and
// blocks (in that order), reusing a pooled hasher to avoid allocations.
// It returns a wrapped error if hashing any component fails.
func (m Meta) Checksum() (uint32, error) {
// Borrow a CRC32 hasher from the shared pool and return it when done.
h := v1.Crc32HashPool.Get()
defer v1.Crc32HashPool.Put(h)
err := m.Bounds.Hash(h)
if err != nil {
return 0, errors.Wrap(err, "writing OwnershipRange")
}
// Fold in each tombstoned (to-be-deleted) block ref.
for _, tombstone := range m.Tombstones {
err = tombstone.Hash(h)
if err != nil {
return 0, errors.Wrap(err, "writing Tombstones")
}
}
// Fold in each source TSDB identifier used to generate the blocks.
for _, source := range m.Sources {
err = source.Hash(h)
if err != nil {
return 0, errors.Wrap(err, "writing Sources")
}
}
// Fold in each generated block ref.
for _, block := range m.Blocks {
err = block.Hash(h)
if err != nil {
return 0, errors.Wrap(err, "writing Blocks")
}
}
return h.Sum32(), nil
}
type MetaSearchParams struct {
@ -107,6 +149,21 @@ type Block struct {
Data io.ReadSeekCloser
}
// BlockFrom builds a shipper Block for the given tenant and table from a
// v1.Block, deriving the Ref's bounds, timestamps, and checksum from the
// block's metadata. The returned Block carries only the ref; Data is left nil.
// NOTE(review): the error from blk.Metadata() is discarded — on failure the
// Ref fields below would be zero-valued; confirm callers tolerate this or
// propagate the error instead.
func BlockFrom(tenant, table string, blk *v1.Block) Block {
md, _ := blk.Metadata()
ref := Ref{
TenantID: tenant,
TableName: table,
Bounds: md.Series.Bounds,
StartTimestamp: md.Series.FromTs,
EndTimestamp: md.Series.ThroughTs,
Checksum: md.Checksum,
}
return Block{
BlockRef: BlockRef{Ref: ref},
}
}
type BlockClient interface {
KeyResolver
GetBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error)

Loading…
Cancel
Save