Blooms/integration fixes (#11979)

pull/11989/head
Owen Diehl 2 years ago committed by GitHub
parent ffc61fbbf4
commit 85f7baaeda
  1. pkg/bloomcompactor/batch.go (14 changes)
  2. pkg/bloomcompactor/bloomcompactor.go (15 changes)
  3. pkg/bloomcompactor/controller.go (148 changes)
  4. pkg/bloomcompactor/metrics.go (41 changes)
  5. pkg/bloomcompactor/spec.go (7 changes)
  6. pkg/bloomcompactor/tsdb.go (3 changes)
  7. pkg/bloomgateway/util_test.go (3 changes)
  8. pkg/storage/bloom/v1/builder.go (15 changes)
  9. pkg/storage/bloom/v1/builder_test.go (3 changes)
  10. pkg/storage/bloom/v1/index.go (4 changes)
  11. pkg/storage/bloom/v1/metrics.go (10 changes)
  12. pkg/storage/stores/shipper/bloomshipper/client.go (4 changes)
  13. pkg/storage/stores/shipper/bloomshipper/client_test.go (6 changes)
  14. pkg/storage/stores/shipper/bloomshipper/fetcher_test.go (3 changes)
  15. pkg/storage/stores/shipper/bloomshipper/resolver.go (10 changes)
  16. pkg/storage/stores/shipper/bloomshipper/shipper.go (21 changes)
  17. pkg/storage/stores/shipper/bloomshipper/shipper_test.go (43 changes)
  18. pkg/storage/stores/shipper/bloomshipper/store_test.go (3 changes)
  19. pkg/validation/limits.go (7 changes)

@ -286,11 +286,10 @@ func (i *blockLoadingIter) loadNext() bool {
// check if there are more overlapping groups to load
if !i.overlapping.Next() {
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
return false
}
if i.overlapping.Err() != nil {
i.err = i.overlapping.Err()
}
if i.overlapping.Err() != nil {
i.err = i.overlapping.Err()
return false
}
@ -300,7 +299,7 @@ func (i *blockLoadingIter) loadNext() bool {
filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
for filtered.Next() && filtered.Err() == nil {
for filtered.Next() {
bq := loader.At()
if _, ok := i.loaded[bq]; !ok {
i.loaded[bq] = struct{}{}
@ -309,8 +308,9 @@ func (i *blockLoadingIter) loadNext() bool {
iters = append(iters, iter)
}
if loader.Err() != nil {
i.err = loader.Err()
if err := filtered.Err(); err != nil {
i.err = err
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
return false
}
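
The hunk above moves the post-loop error check from the block loader to the filtered iterator and returns early with an empty iterator when it fails. A minimal standalone sketch of that drain-then-check pattern, with a simplified Iterator interface and hypothetical names rather than the types from the diff:

package main

import "fmt"

// Iterator is a simplified stand-in for the v1 iterator interfaces used in the diff.
type Iterator[T any] interface {
	Next() bool
	At() T
	Err() error
}

type sliceIter[T any] struct {
	xs  []T
	cur int
	err error
}

func (s *sliceIter[T]) Next() bool { s.cur++; return s.cur <= len(s.xs) }
func (s *sliceIter[T]) At() T      { return s.xs[s.cur-1] }
func (s *sliceIter[T]) Err() error { return s.err }

// drain consumes the iterator and checks its error once after the loop,
// mirroring the "for filtered.Next() { ... }; if err := filtered.Err(); ..." shape above.
func drain[T any](it Iterator[T]) ([]T, error) {
	var out []T
	for it.Next() {
		out = append(out, it.At())
	}
	if err := it.Err(); err != nil {
		// the real code also resets the embedded iterator to an empty one here
		return nil, err
	}
	return out, nil
}

func main() {
	it := &sliceIter[int]{xs: []int{1, 2, 3}}
	got, err := drain[int](it)
	fmt.Println(got, err)
}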

@ -214,6 +214,7 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error
// runs a single round of compaction for all relevant tenants and tables
func (c *Compactor) runOne(ctx context.Context) error {
level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism)
var workersErr error
var wg sync.WaitGroup
ch := make(chan tenantTable)
@ -226,7 +227,11 @@ func (c *Compactor) runOne(ctx context.Context) error {
err := c.loadWork(ctx, ch)
wg.Wait()
return multierror.New(workersErr, err, ctx.Err()).Err()
err = multierror.New(workersErr, err, ctx.Err()).Err()
if err != nil {
level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err)
}
return err
}
func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
@ -241,6 +246,7 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
level.Debug(c.logger).Log("msg", "loaded tables for compaction", "from", fromDay, "through", throughDay)
return newDayRangeIterator(fromDay, throughDay, c.schemaCfg)
}
@ -250,6 +256,8 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
for tables.Next() && tables.Err() == nil && ctx.Err() == nil {
table := tables.At()
level.Debug(c.logger).Log("msg", "loading work for table", "table", table)
tenants, err := c.tenants(ctx, table)
if err != nil {
return errors.Wrap(err, "getting tenants")
@ -262,6 +270,7 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
if err != nil {
return errors.Wrap(err, "checking tenant ownership")
}
level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns)
if !owns {
c.metrics.tenantsSkipped.Inc()
continue
@ -280,12 +289,14 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
}
if err := tenants.Err(); err != nil {
level.Error(c.logger).Log("msg", "error iterating tenants", "err", err)
return errors.Wrap(err, "iterating tenants")
}
}
if err := tables.Err(); err != nil {
level.Error(c.logger).Log("msg", "error iterating tables", "err", err)
return errors.Wrap(err, "iterating tables")
}
@ -330,7 +341,7 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
}
func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error {
level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange)
level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange.String())
return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange)
}
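
runOne now folds the worker, load, and context errors together and logs the combined error once per iteration. A rough stdlib-only sketch of the same shape using errors.Join; the real code uses the repo's multierror helper and go-kit logging, and the names here are hypothetical:

package main

import (
	"context"
	"errors"
	"log"
)

// runOne collapses worker, load and context errors into one error and logs it once.
func runOne(ctx context.Context, workersErr, loadErr error) error {
	err := errors.Join(workersErr, loadErr, ctx.Err())
	if err != nil {
		log.Printf("msg=%q err=%v", "compaction iteration failed", err)
	}
	return err
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_ = runOne(ctx, nil, errors.New("loading work"))
}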

@ -70,7 +70,7 @@ func (s *SimpleBloomController) compactTenant(
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr())
logger := log.With(s.logger, "org_id", tenant, "table", table.Addr(), "ownership", ownershipRange.String())
client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
@ -92,6 +92,15 @@ func (s *SimpleBloomController) compactTenant(
return errors.Wrap(err, "failed to get metas")
}
level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas))
// fetch all metas overlapping our ownership range so we can safely
// check which metas can be deleted even if they only partially overlap our ownership range
superset, err := s.fetchSuperSet(ctx, tenant, table, ownershipRange, metas, logger)
if err != nil {
return errors.Wrap(err, "failed to fetch superset")
}
// build compaction plans
work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger)
if err != nil {
@ -104,6 +113,63 @@ func (s *SimpleBloomController) compactTenant(
return errors.Wrap(err, "failed to build gaps")
}
// combine built and superset metas
// in preparation for removing outdated ones
combined := append(superset, built...)
outdated := outdatedMetas(combined)
level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
var (
deletedMetas int
deletedBlocks int
)
defer func() {
s.metrics.metasDeleted.Add(float64(deletedMetas))
s.metrics.blocksDeleted.Add(float64(deletedBlocks))
}()
for _, meta := range outdated {
for _, block := range meta.Blocks {
err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block})
if err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String())
} else {
level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String())
return errors.Wrap(err, "failed to delete block")
}
}
deletedBlocks++
level.Debug(logger).Log("msg", "removed outdated block", "block", block.String())
}
err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
if err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
} else {
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
return errors.Wrap(err, "failed to delete meta")
}
}
deletedMetas++
level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String())
}
level.Debug(logger).Log("msg", "finished compaction")
return nil
}
// fetchSuperSet fetches all metas which overlap the ownership range of the first set of metas we've resolved
func (s *SimpleBloomController) fetchSuperSet(
ctx context.Context,
tenant string,
table config.DayTable,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]bloomshipper.Meta, error) {
// in order to delete outdated metas which only partially fall within the ownership range,
// we need to fetch all metas in the entire bound range of the first set of metas we've resolved
/*
@ -121,12 +187,28 @@ func (s *SimpleBloomController) compactTenant(
union := superset.Union(meta.Bounds)
if len(union) > 1 {
level.Error(logger).Log("msg", "meta bounds union is not a single range", "union", union)
return errors.New("meta bounds union is not a single range")
return nil, errors.New("meta bounds union is not a single range")
}
superset = union[0]
}
metas, err = s.bloomStore.FetchMetas(
within := superset.Within(ownershipRange)
level.Debug(logger).Log(
"msg", "looking for superset metas",
"superset", superset.String(),
"superset_within", within,
)
if within {
// we don't need to fetch any more metas
// NB(owen-d): here we copy metas into the output. This is slightly inefficient, but
// helps prevent mutability bugs by returning the same slice as the input.
results := make([]bloomshipper.Meta, len(metas))
copy(results, metas)
return results, nil
}
supersetMetas, err := s.bloomStore.FetchMetas(
ctx,
bloomshipper.MetaSearchParams{
TenantID: tenant,
@ -134,42 +216,20 @@ func (s *SimpleBloomController) compactTenant(
Keyspace: superset,
},
)
if err != nil {
level.Error(logger).Log("msg", "failed to get meta superset range", "err", err, "superset", superset)
return errors.Wrap(err, "failed to get meta supseret range")
return nil, errors.Wrap(err, "failed to get meta supseret range")
}
// combine built and pre-existing metas
// in preparation for removing outdated metas
metas = append(metas, built...)
outdated := outdatedMetas(metas)
for _, meta := range outdated {
for _, block := range meta.Blocks {
if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block)
continue
}
level.Error(logger).Log("msg", "failed to delete blocks", "err", err)
return errors.Wrap(err, "failed to delete blocks")
}
}
if err := client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef)
} else {
level.Error(logger).Log("msg", "failed to delete metas", "err", err)
return errors.Wrap(err, "failed to delete metas")
}
}
}
level.Debug(logger).Log("msg", "finished compaction")
return nil
level.Debug(logger).Log(
"msg", "found superset metas",
"metas", len(metas),
"fresh_metas", len(supersetMetas),
"delta", len(supersetMetas)-len(metas),
)
return supersetMetas, nil
}
func (s *SimpleBloomController) findOutdatedGaps(
@ -271,6 +331,7 @@ func (s *SimpleBloomController) buildGaps(
for i := range plan.gaps {
gap := plan.gaps[i]
logger := log.With(logger, "gap", gap.bounds.String(), "tsdb", plan.tsdb.Name())
meta := bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
@ -304,9 +365,11 @@ func (s *SimpleBloomController) buildGaps(
blocksIter,
s.rwFn,
s.metrics,
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap),
logger,
)
level.Debug(logger).Log("msg", "generating blocks", "overlapping_blocks", len(gap.blocks))
newBlocks := gen.Generate(ctx)
if err != nil {
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
@ -333,6 +396,16 @@ func (s *SimpleBloomController) buildGaps(
blocksIter.Close()
return nil, errors.Wrap(err, "failed to write block")
}
s.metrics.blocksCreated.Inc()
totalGapKeyspace := (gap.bounds.Max - gap.bounds.Min)
progress := (built.Bounds.Max - gap.bounds.Min)
pct := float64(progress) / float64(totalGapKeyspace) * 100
level.Debug(logger).Log(
"msg", "uploaded block",
"block", built.BlockRef.String(),
"progress_pct", fmt.Sprintf("%.2f", pct),
)
meta.Blocks = append(meta.Blocks, built.BlockRef)
}
@ -346,6 +419,7 @@ func (s *SimpleBloomController) buildGaps(
blocksIter.Close()
// Write the new meta
// TODO(owen-d): put total size in log, total time in metrics+log
ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks)
if err != nil {
level.Error(logger).Log("msg", "failed to checksum meta", "err", err)
@ -357,8 +431,10 @@ func (s *SimpleBloomController) buildGaps(
level.Error(logger).Log("msg", "failed to write meta", "err", err)
return nil, errors.Wrap(err, "failed to write meta")
}
created = append(created, meta)
s.metrics.metasCreated.Inc()
level.Debug(logger).Log("msg", "uploaded meta", "meta", meta.MetaRef.String())
created = append(created, meta)
totalSeries += uint64(seriesItrWithCounter.Count())
}
}
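
The new "uploaded block" debug line reports how far through the gap's fingerprint keyspace the builder has progressed. A small sketch of that arithmetic, with hypothetical bounds values and fingerprints simplified to uint64:

package main

import "fmt"

// progressPct mirrors the calculation in the hunk: how much of the gap's
// fingerprint keyspace is covered once a block ending at builtMax is uploaded.
func progressPct(gapMin, gapMax, builtMax uint64) float64 {
	total := gapMax - gapMin
	progress := builtMax - gapMin
	return float64(progress) / float64(total) * 100
}

func main() {
	// e.g. a gap covering fingerprints [0x1000, 0x2000) with a block ending at 0x1800
	fmt.Printf("%.2f%%\n", progressPct(0x1000, 0x2000, 0x1800)) // 50.00%
}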

@ -31,6 +31,11 @@ type Metrics struct {
tenantsCompleted *prometheus.CounterVec
tenantsCompletedTime *prometheus.HistogramVec
tenantsSeries prometheus.Histogram
blocksCreated prometheus.Counter
blocksDeleted prometheus.Counter
metasCreated prometheus.Counter
metasDeleted prometheus.Counter
}
func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
@ -53,13 +58,13 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
compactionsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "compactions_started",
Name: "compactions_started_total",
Help: "Total number of compactions started",
}),
compactionCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "compactions_completed",
Name: "compactions_completed_total",
Help: "Total number of compactions completed",
}, []string{"status"}),
compactionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
@ -73,7 +78,7 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_discovered",
Name: "tenants_discovered_total",
Help: "Number of tenants discovered during the current compaction run",
}),
tenantsOwned: promauto.With(r).NewCounter(prometheus.CounterOpts{
@ -85,19 +90,19 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
tenantsSkipped: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_skipped",
Name: "tenants_skipped_total",
Help: "Number of tenants skipped since they are not owned by this instance",
}),
tenantsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_started",
Name: "tenants_started_total",
Help: "Number of tenants started to process during the current compaction run",
}),
tenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_completed",
Name: "tenants_completed_total",
Help: "Number of tenants successfully processed during the current compaction run",
}, []string{"status"}),
tenantsCompletedTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
@ -115,6 +120,30 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
// Up to 10M series per tenant, way more than what we expect given our max_global_streams_per_user limits
Buckets: prometheus.ExponentialBucketsRange(1, 10000000, 10),
}),
blocksCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "blocks_created_total",
Help: "Number of blocks created",
}),
blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "blocks_deleted_total",
Help: "Number of blocks deleted",
}),
metasCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "metas_created_total",
Help: "Number of metas created",
}),
metasDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "metas_deleted_total",
Help: "Number of metas deleted",
}),
}
return &m
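
The renamed and newly added counters follow the Prometheus convention that monotonically increasing counters end in _total. A minimal sketch of registering one such counter with promauto; the namespace and subsystem values are assumptions for illustration:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func newBlocksCreated(r prometheus.Registerer) prometheus.Counter {
	return promauto.With(r).NewCounter(prometheus.CounterOpts{
		Namespace: "loki",           // assumed namespace for illustration
		Subsystem: "bloomcompactor", // assumed subsystem for illustration
		Name:      "blocks_created_total",
		Help:      "Number of blocks created",
	})
}

func main() {
	reg := prometheus.NewRegistry()
	c := newBlocksCreated(reg)
	c.Inc()
}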

@ -138,7 +138,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo
)
}
return NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
}
// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
@ -146,6 +146,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo
type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
populate func(*v1.Series, *v1.Bloom) error
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
@ -158,6 +159,7 @@ type LazyBlockBuilderIterator struct {
func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
metrics *Metrics,
populate func(*v1.Series, *v1.Bloom) error,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
@ -166,6 +168,7 @@ func NewLazyBlockBuilderIterator(
return &LazyBlockBuilderIterator{
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
readWriterFn: readWriterFn,
series: series,
@ -189,7 +192,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}
mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate)
mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics)
writer, reader := b.readWriterFn()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {

@ -236,8 +236,7 @@ func NewTSDBStores(
if err != nil {
return nil, errors.Wrap(err, "failed to create object client")
}
prefix := path.Join(cfg.IndexTables.PathPrefix, cfg.IndexTables.Prefix)
res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, prefix))
res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix))
}
}

@ -323,8 +323,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
MetaRef: bloomshipper.MetaRef{
Ref: ref,
},
BlockTombstones: []bloomshipper.BlockRef{},
Blocks: []bloomshipper.BlockRef{blockRef},
Blocks: []bloomshipper.BlockRef{blockRef},
}
block, data, _ := v1.MakeBlock(t, n, fromFp, throughFp, from, through)
// Printing fingerprints and the log lines of its chunks comes handy for debugging...

@ -526,6 +526,7 @@ type MergeBuilder struct {
store Iterator[*Series]
// Add chunks to a bloom
populate func(*Series, *Bloom) error
metrics *Metrics
}
// NewMergeBuilder is a specific builder which does the following:
@ -536,11 +537,13 @@ func NewMergeBuilder(
blocks Iterator[*SeriesWithBloom],
store Iterator[*Series],
populate func(*Series, *Bloom) error,
metrics *Metrics,
) *MergeBuilder {
return &MergeBuilder{
blocks: blocks,
store: store,
populate: populate,
metrics: metrics,
}
}
@ -568,6 +571,8 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
nextInBlocks = deduped.At()
}
var chunksIndexed, chunksCopied int
cur := nextInBlocks
chunksToAdd := nextInStore.Chunks
// The next series from the store doesn't exist in the blocks, so we add it
@ -583,8 +588,11 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
} else {
// if the series already exists in the block, we only need to add the new chunks
chunksToAdd = nextInStore.Chunks.Unless(nextInBlocks.Series.Chunks)
chunksCopied = len(nextInStore.Chunks) - len(chunksToAdd)
}
chunksIndexed = len(chunksToAdd)
if len(chunksToAdd) > 0 {
if err := mb.populate(
&Series{
@ -597,6 +605,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
}
}
mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(float64(chunksIndexed))
mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeCopied).Add(float64(chunksCopied))
blockFull, err := builder.AddSeries(*cur)
if err != nil {
return 0, errors.Wrap(err, "adding series to block")
@ -606,6 +617,10 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
}
}
if err := mb.store.Err(); err != nil {
return 0, errors.Wrap(err, "iterating store")
}
checksum, err := builder.Close()
if err != nil {
return 0, errors.Wrap(err, "closing block")

@ -226,7 +226,7 @@ func TestMergeBuilder(t *testing.T) {
)
// Ensure that the merge builder combines all the blocks correctly
mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop)
mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop, NewMetrics(nil))
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
@ -400,6 +400,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
// We're not actually indexing new data in this test
return nil
},
NewMetrics(nil),
)
builder, err := NewBlockBuilder(DefaultBlockOptions, writer)
require.Nil(t, err)

@ -234,8 +234,8 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
Bounds: NewBounds(fromFp, throughFP),
}
for _, x := range xs {
if x.FromTs < res.FromTs {
for i, x := range xs {
if i == 0 || x.FromTs < res.FromTs {
res.FromTs = x.FromTs
}
if x.ThroughTs > res.ThroughTs {
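
This fix addresses a zero-value initialization bug: res.FromTs starts at 0, so x.FromTs < res.FromTs never fired and the aggregated FromTs stayed at 0. Seeding from the first element restores a correct minimum. A standalone illustration with a simplified header type and hypothetical values:

package main

import "fmt"

type header struct{ FromTs, ThroughTs int64 }

// aggregate returns the min FromTs and max ThroughTs across headers,
// seeding the minimum from the first element so it isn't stuck at zero.
func aggregate(xs []header) header {
	var res header
	for i, x := range xs {
		if i == 0 || x.FromTs < res.FromTs {
			res.FromTs = x.FromTs
		}
		if x.ThroughTs > res.ThroughTs {
			res.ThroughTs = x.ThroughTs
		}
	}
	return res
}

func main() {
	hs := []header{{FromTs: 100, ThroughTs: 200}, {FromTs: 50, ThroughTs: 300}}
	fmt.Println(aggregate(hs)) // {50 300}
}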

@ -10,12 +10,16 @@ type Metrics struct {
bloomSize prometheus.Histogram // size of the bloom filter in bytes
hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter
estimatedCount prometheus.Histogram // estimated number of elements in the bloom filter
chunksIndexed *prometheus.CounterVec
}
const chunkIndexedTypeIterated = "iterated"
const chunkIndexedTypeCopied = "copied"
func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
sbfCreationTime: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "bloom_creation_time",
Name: "bloom_creation_time_total",
Help: "Time spent creating scalable bloom filters",
}),
bloomSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
@ -33,5 +37,9 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
Help: "Estimated number of elements in the bloom filter",
Buckets: prometheus.ExponentialBucketsRange(1, 33554432, 10),
}),
chunksIndexed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "bloom_chunks_indexed_total",
Help: "Number of chunks indexed in bloom filters, partitioned by type. Type can be iterated or copied, where iterated indicates the chunk data was fetched and ngrams for it's contents generated whereas copied indicates the chunk already existed in another source block and was copied to the new block",
}, []string{"type"}),
}
}
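
The new chunksIndexed metric is a CounterVec partitioned by a type label with the values iterated and copied, which the MergeBuilder feeds via WithLabelValues as shown in builder.go above. A minimal usage sketch with an illustrative registry and counts:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()
	chunksIndexed := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "bloom_chunks_indexed_total",
		Help: "Number of chunks indexed in bloom filters, partitioned by type.",
	}, []string{"type"})

	// after building a series: record how many chunks were freshly indexed vs copied
	chunksIndexed.WithLabelValues("iterated").Add(3)
	chunksIndexed.WithLabelValues("copied").Add(2)
}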

@ -88,10 +88,6 @@ type Meta struct {
// The specific TSDB files used to generate the block.
Sources []tsdb.SingleTenantTSDBIdentifier
// TODO(owen-d): remove, unused
// Old blocks which can be deleted in the future. These should be from previous compaction rounds.
BlockTombstones []BlockRef
// A list of blocks that were generated
Blocks []BlockRef
}

@ -63,8 +63,7 @@ func putMeta(c *BloomClient, tenant string, start model.Time, minFp, maxFp model
// EndTimestamp: start.Add(12 * time.Hour),
},
},
Blocks: []BlockRef{},
BlockTombstones: []BlockRef{},
Blocks: []BlockRef{},
}
raw, _ := json.Marshal(meta)
return meta, c.client.PutObject(context.Background(), c.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw))
@ -129,8 +128,7 @@ func TestBloomClient_PutMeta(t *testing.T) {
// EndTimestamp: start.Add(12 * time.Hour),
},
},
Blocks: []BlockRef{},
BlockTombstones: []BlockRef{},
Blocks: []BlockRef{},
}
err := c.PutMeta(ctx, meta)

@ -34,8 +34,7 @@ func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keysp
EndTimestamp: ts,
},
},
BlockTombstones: []BlockRef{},
Blocks: []BlockRef{},
Blocks: []BlockRef{},
}
}
return metas

@ -14,6 +14,9 @@ const (
BloomPrefix = "bloom"
MetasPrefix = "metas"
BlocksPrefix = "blocks"
extTarGz = ".tar.gz"
extJSON = ".json"
)
// KeyResolver is an interface for resolving keys to locations.
@ -36,7 +39,7 @@ func (defaultKeyResolver) Meta(ref MetaRef) Location {
fmt.Sprintf("%v", ref.TableName),
ref.TenantID,
MetasPrefix,
fmt.Sprintf("%v-%v", ref.Bounds, ref.Checksum),
fmt.Sprintf("%v-%x%s", ref.Bounds, ref.Checksum, extJSON),
}
}
@ -50,7 +53,8 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) {
if err != nil {
return MetaRef{}, fmt.Errorf("failed to parse bounds of meta key %s : %w", loc, err)
}
checksum, err := strconv.ParseUint(fnParts[2], 16, 64)
withoutExt := strings.TrimSuffix(fnParts[2], extJSON)
checksum, err := strconv.ParseUint(withoutExt, 16, 64)
if err != nil {
return MetaRef{}, fmt.Errorf("failed to parse checksum of meta key %s : %w", loc, err)
}
@ -77,7 +81,7 @@ func (defaultKeyResolver) Block(ref BlockRef) Location {
ref.TenantID,
BlocksPrefix,
ref.Bounds.String(),
fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum),
fmt.Sprintf("%d-%d-%x%s", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum, extTarGz),
}
}
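
With this change, meta keys gain a .json extension and block keys a .tar.gz extension, and ParseMetaKey strips the extension before parsing the hex checksum. A small sketch of that round trip; the key layout is simplified and the helper names are illustrative:

package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

const extJSON = ".json"

// metaKey builds a key shaped like bloom/<table>/<tenant>/metas/<bounds>-<checksum>.json
func metaKey(table, tenant, bounds string, checksum uint32) string {
	return path.Join("bloom", table, tenant, "metas",
		fmt.Sprintf("%s-%x%s", bounds, checksum, extJSON))
}

// parseChecksum recovers the hex checksum from the final path segment,
// trimming the .json extension first, as ParseMetaKey now does.
func parseChecksum(key string) (uint64, error) {
	base := path.Base(key)
	i := strings.LastIndex(base, "-")
	if i < 0 {
		return 0, fmt.Errorf("malformed key %q", key)
	}
	withoutExt := strings.TrimSuffix(base[i+1:], extJSON)
	return strconv.ParseUint(withoutExt, 16, 64)
}

func main() {
	k := metaKey("table_19801", "tenant-a", "0000000000000000-000000000000ffff", 0xabcd1234)
	sum, err := parseChecksum(k)
	fmt.Println(k)
	fmt.Printf("%x %v\n", sum, err)
}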

@ -55,30 +55,15 @@ func (s *Shipper) Stop() {
}
// BlocksForMetas returns all the blocks from all the metas listed that are within the requested bounds
// and not tombstoned in any of the metas
func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintBounds) []BlockRef {
blocks := make(map[BlockRef]bool) // block -> isTombstoned
func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintBounds) (refs []BlockRef) {
for _, meta := range metas {
for _, tombstone := range meta.BlockTombstones {
blocks[tombstone] = true
}
for _, block := range meta.Blocks {
tombstoned, ok := blocks[block]
if ok && tombstoned {
// skip tombstoned blocks
continue
if !isOutsideRange(block, interval, keyspaces) {
refs = append(refs, block)
}
blocks[block] = false
}
}
refs := make([]BlockRef, 0, len(blocks))
for ref, tombstoned := range blocks {
if !tombstoned && !isOutsideRange(ref, interval, keyspaces) {
refs = append(refs, ref)
}
}
sort.Slice(refs, func(i, j int) bool {
return refs[i].Bounds.Less(refs[j].Bounds)
})
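
With tombstones removed, BlocksForMetas reduces to filtering each meta's blocks by the requested interval and keyspaces and sorting the survivors by bounds. A stripped-down sketch of that filter with simplified block and bounds types and hypothetical names:

package main

import (
	"fmt"
	"sort"
)

type bounds struct{ Min, Max uint64 }

func (b bounds) overlaps(o bounds) bool { return b.Min <= o.Max && o.Min <= b.Max }

type block struct {
	Bounds     bounds
	Start, End int64
}

// blocksForMetas keeps every block that overlaps the interval and any keyspace,
// then sorts by fingerprint bounds, mirroring the simplified function above.
func blocksForMetas(metas [][]block, start, end int64, keyspaces []bounds) (refs []block) {
	for _, blocks := range metas {
		for _, b := range blocks {
			if b.End < start || b.Start > end {
				continue
			}
			for _, ks := range keyspaces {
				if b.Bounds.overlaps(ks) {
					refs = append(refs, b)
					break
				}
			}
		}
	}
	sort.Slice(refs, func(i, j int) bool { return refs[i].Bounds.Min < refs[j].Bounds.Min })
	return refs
}

func main() {
	metas := [][]block{{
		{Bounds: bounds{0, 100}, Start: 0, End: 10},
		{Bounds: bounds{200, 300}, Start: 0, End: 10},
	}}
	fmt.Println(blocksForMetas(metas, 0, 20, []bounds{{Min: 50, Max: 250}}))
}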

@ -14,49 +14,6 @@ import (
)
func TestBloomShipper_findBlocks(t *testing.T) {
t.Run("expected block that are specified in tombstones to be filtered out", func(t *testing.T) {
metas := []Meta{
{
Blocks: []BlockRef{
//this blockRef is marked as deleted in the next meta
createMatchingBlockRef(1),
createMatchingBlockRef(2),
},
},
{
Blocks: []BlockRef{
//this blockRef is marked as deleted in the next meta
createMatchingBlockRef(3),
createMatchingBlockRef(4),
},
},
{
BlockTombstones: []BlockRef{
createMatchingBlockRef(1),
createMatchingBlockRef(3),
},
Blocks: []BlockRef{
createMatchingBlockRef(5),
},
},
}
ts := model.Now()
interval := NewInterval(
ts.Add(-2*time.Hour),
ts.Add(-1*time.Hour),
)
blocks := BlocksForMetas(metas, interval, []v1.FingerprintBounds{{Min: 100, Max: 200}})
expectedBlockRefs := []BlockRef{
createMatchingBlockRef(2),
createMatchingBlockRef(4),
createMatchingBlockRef(5),
}
require.ElementsMatch(t, expectedBlockRefs, blocks)
})
tests := map[string]struct {
minFingerprint uint64
maxFingerprint uint64

@ -83,8 +83,7 @@ func createMetaInStorage(store *BloomStore, tenant string, start model.Time, min
// EndTimestamp: start.Add(12 * time.Hour),
},
},
Blocks: []BlockRef{},
BlockTombstones: []BlockRef{},
Blocks: []BlockRef{},
}
err := store.storeDo(start, func(s *bloomStoreEntry) error {
raw, _ := json.Marshal(meta)

@ -339,7 +339,12 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.")
f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Interval for computing the cache key in the Bloom Gateway.")
_ = l.BloomCompactorMaxBlockSize.Set(defaultBloomCompactorMaxBlockSize)
f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. A value of 0 sets an unlimited size. Default is 200MB. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.")
f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size",
fmt.Sprintf(
"The maximum bloom block size. A value of 0 sets an unlimited size. Default is %s. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.",
defaultBloomCompactorMaxBlockSize,
),
)
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
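
The limits change builds the flag's help text with fmt.Sprintf so the documented default stays in sync with the defaultBloomCompactorMaxBlockSize constant. A tiny sketch of the same pattern with the standard flag package; the flag name and default here are placeholders:

package main

import (
	"flag"
	"fmt"
)

const defaultMaxBlockSize = "200MB" // assumed default for illustration

func main() {
	var maxBlockSize string
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.StringVar(&maxBlockSize, "bloom-compactor.max-block-size", defaultMaxBlockSize,
		fmt.Sprintf("The maximum bloom block size. A value of 0 sets an unlimited size. Default is %s.", defaultMaxBlockSize))
	_ = fs.Parse([]string{})
	fmt.Println(maxBlockSize)
}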
