Blooms/contiguous bounds union (#12009)

more-release-testing
Owen Diehl 2 years ago committed by GitHub
parent b91bcecb3a
commit 09aa0f80b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      pkg/bloomcompactor/controller.go
  2. 8
      pkg/bloomcompactor/metrics.go
  3. 17
      pkg/storage/bloom/v1/bounds.go
  4. 17
      pkg/storage/bloom/v1/bounds_test.go
  5. 4
      pkg/storage/bloom/v1/index.go

@ -346,6 +346,7 @@ func (s *SimpleBloomController) buildGaps(
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation
level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.blocks))
seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
@ -436,6 +437,8 @@ func (s *SimpleBloomController) buildGaps(
created = append(created, meta)
totalSeries += uint64(seriesItrWithCounter.Count())
s.metrics.blocksReused.Add(float64(len(gap.blocks)))
}
}

@ -32,6 +32,8 @@ type Metrics struct {
tenantsCompletedTime *prometheus.HistogramVec
tenantsSeries prometheus.Histogram
blocksReused prometheus.Counter
blocksCreated prometheus.Counter
blocksDeleted prometheus.Counter
metasCreated prometheus.Counter
@ -120,6 +122,12 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
// Up to 10M series per tenant, way more than what we expect given our max_global_streams_per_user limits
Buckets: prometheus.ExponentialBucketsRange(1, 10000000, 10),
}),
blocksReused: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "blocks_reused_total",
Help: "Number of overlapping bloom blocks reused when creating new blocks",
}),
blocksCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,

@ -125,10 +125,21 @@ func (b FingerprintBounds) Intersection(target FingerprintBounds) *FingerprintBo
// Union returns the union of the two bounds
func (b FingerprintBounds) Union(target FingerprintBounds) (res []FingerprintBounds) {
if !b.Overlaps(target) {
if b.Less(target) {
return []FingerprintBounds{b, target}
if target.Less(b) {
b, target = target, b
}
return []FingerprintBounds{target, b}
// special case: if the bounds are contiguous, merge them
if b.Max+1 == target.Min {
return []FingerprintBounds{
{
Min: min(b.Min, target.Min),
Max: max(b.Max, target.Max),
},
}
}
return []FingerprintBounds{b, target}
}
return []FingerprintBounds{

@ -82,20 +82,29 @@ func Test_FingerprintBounds_Intersection(t *testing.T) {
func Test_FingerprintBounds_Union(t *testing.T) {
t.Parallel()
target := NewBounds(10, 20)
assert.Equal(t, []FingerprintBounds{
{Min: 1, Max: 9},
{Min: 1, Max: 8},
{Min: 10, Max: 20},
}, NewBounds(1, 9).Union(target))
}, NewBounds(1, 8).Union(target))
assert.Equal(t, []FingerprintBounds{
{Min: 10, Max: 20},
{Min: 21, Max: 30},
}, NewBounds(21, 30).Union(target))
{Min: 22, Max: 30},
}, NewBounds(22, 30).Union(target))
assert.Equal(t, []FingerprintBounds{
{Min: 10, Max: 20},
}, NewBounds(10, 20).Union(target))
assert.Equal(t, []FingerprintBounds{
{Min: 5, Max: 20},
}, NewBounds(5, 15).Union(target))
// contiguous range, target before
assert.Equal(t, []FingerprintBounds{
{Min: 10, Max: 25},
}, NewBounds(21, 25).Union(target))
// contiguous range, target after
assert.Equal(t, []FingerprintBounds{
{Min: 5, Max: 20},
}, NewBounds(5, 9).Union(target))
}
func Test_FingerprintBounds_Unless(t *testing.T) {

@ -218,10 +218,6 @@ type SeriesHeader struct {
FromTs, ThroughTs model.Time
}
func (h SeriesHeader) OverlapFingerprintRange(other SeriesHeader) bool {
return h.Bounds.Overlaps(other.Bounds)
}
// build one aggregated header for the entire block
func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
if len(xs) == 0 {

Loading…
Cancel
Save