(Blooms) Add metrics to compactor (#11966)

Branch: pull/11972/head^2
Author: Salva Corts (1 year ago), committed by GitHub
Parent: 443720f47b
Commit: 543aaab055
Changed files:
  1. pkg/bloomcompactor/bloomcompactor.go (21 lines changed)
  2. pkg/bloomcompactor/controller.go (15 lines changed)
  3. pkg/bloomcompactor/metrics.go (113 lines changed)
  4. pkg/storage/bloom/v1/util.go (26 lines changed)
  5. pkg/storage/bloom/v1/util_test.go (26 lines changed)

--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -109,9 +109,7 @@ func New(
 		c.logger,
 	)
 
-	c.metrics.compactionRunInterval.Set(cfg.CompactionInterval.Seconds())
-
 	c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
 	return c, nil
 }
@@ -138,11 +136,17 @@ func (c *Compactor) running(ctx context.Context) error {
 		case <-ctx.Done():
 			return ctx.Err()
 
-		case <-ticker.C:
+		case start := <-ticker.C:
+			c.metrics.compactionsStarted.Inc()
 			if err := c.runOne(ctx); err != nil {
-				level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err)
+				level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err, "duration", time.Since(start))
+				c.metrics.compactionCompleted.WithLabelValues(statusFailure).Inc()
+				c.metrics.compactionTime.WithLabelValues(statusFailure).Observe(time.Since(start).Seconds())
 				return err
 			}
+
+			level.Info(c.logger).Log("msg", "compaction iteration completed", "duration", time.Since(start))
+			c.metrics.compactionCompleted.WithLabelValues(statusSuccess).Inc()
+			c.metrics.compactionTime.WithLabelValues(statusSuccess).Observe(time.Since(start).Seconds())
 		}
 	}
 }
@@ -252,14 +256,17 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
 		}
 
 		for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
+			c.metrics.tenantsDiscovered.Inc()
 			tenant := tenants.At()
 			ownershipRange, owns, err := c.ownsTenant(tenant)
 			if err != nil {
 				return errors.Wrap(err, "checking tenant ownership")
 			}
 			if !owns {
+				c.metrics.tenantsSkipped.Inc()
 				continue
 			}
+			c.metrics.tenantsOwned.Inc()
 
 			select {
 			case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}:
@@ -296,7 +303,11 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
 				return nil
 			}
 
+			start := time.Now()
+			c.metrics.tenantsStarted.Inc()
 			if err := c.compactTenantTable(ctx, tt); err != nil {
+				c.metrics.tenantsCompleted.WithLabelValues(statusFailure).Inc()
+				c.metrics.tenantsCompletedTime.WithLabelValues(statusFailure).Observe(time.Since(start).Seconds())
 				return errors.Wrapf(
 					err,
 					"compacting tenant table (%s) for tenant (%s) with ownership (%s)",
@@ -305,6 +316,8 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
 					tt.ownershipRange,
 				)
 			}
+			c.metrics.tenantsCompleted.WithLabelValues(statusSuccess).Inc()
+			c.metrics.tenantsCompletedTime.WithLabelValues(statusSuccess).Observe(time.Since(start).Seconds())
 		}
 	}
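
Both the `running` loop and the per-tenant worker path above follow the same instrumentation pattern: capture a start time, then on both the failure and success branches increment a status-labeled counter and observe a status-labeled duration histogram. A minimal, self-contained sketch of that pattern follows; the metric names and the `statusSuccess`/`statusFailure` values here are illustrative assumptions, not the PR's actual identifiers.

```go
package main

import (
	"errors"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Illustrative status label values; the PR defines similar constants in pkg/bloomcompactor.
const (
	statusSuccess = "success"
	statusFailure = "failure"
)

// Hypothetical metrics mirroring compactionCompleted / compactionTime in this PR.
var (
	completed = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "example_compactions_completed",
		Help: "Compactions completed, labeled by status.",
	}, []string{"status"})
	duration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "example_compactions_time_seconds",
		Help:    "Compaction duration in seconds, labeled by status.",
		Buckets: prometheus.DefBuckets,
	}, []string{"status"})
)

// observe records one operation's outcome and duration under a single status label,
// so success and failure rates and latencies can be compared in one query.
func observe(start time.Time, err error) {
	status := statusSuccess
	if err != nil {
		status = statusFailure
	}
	completed.WithLabelValues(status).Inc()
	duration.WithLabelValues(status).Observe(time.Since(start).Seconds())
}

func main() {
	start := time.Now()
	err := errors.New("simulated failure") // stand-in for c.runOne(ctx)
	observe(start, err)                    // increments the "failure" series
}
```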

--- a/pkg/bloomcompactor/controller.go
+++ b/pkg/bloomcompactor/controller.go
@@ -25,7 +25,6 @@ type SimpleBloomController struct {
 	metrics *Metrics
 	limits  Limits
-	// TODO(owen-d): add metrics
 
 	logger log.Logger
 }
@@ -269,6 +268,7 @@ func (s *SimpleBloomController) buildGaps(
 		maxBlockSize = uint64(s.limits.BloomCompactorMaxBlockSize(tenant))
 		blockOpts    = v1.NewBlockOptions(nGramSize, nGramSkip, maxBlockSize)
 		created      []bloomshipper.Meta
+		totalSeries  uint64
 	)
 
 	for _, plan := range work {
@@ -295,10 +295,15 @@ func (s *SimpleBloomController) buildGaps(
 			return nil, errors.Wrap(err, "failed to get series and blocks")
 		}
 
+			// Blocks are built consuming the series iterator. For observability, we wrap the series iterator
+			// with a counter iterator to count the number of times Next() is called on it.
+			// This is used to observe the number of series that are being processed.
+			seriesItrWithCounter := v1.NewCounterIter[*v1.Series](seriesItr)
+
 			gen := NewSimpleBloomGenerator(
 				tenant,
 				blockOpts,
-				seriesItr,
+				seriesItrWithCounter,
 				s.chunkLoader,
 				blocksIter,
 				s.rwFn,
@@ -307,9 +312,7 @@ func (s *SimpleBloomController) buildGaps(
 			)
 
 			_, loaded, newBlocks, err := gen.Generate(ctx)
 			if err != nil {
-				// TODO(owen-d): metrics
-
 				level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
 				s.closeLoadedBlocks(loaded, blocksIter)
 				return nil, errors.Wrap(err, "failed to generate bloom")
@@ -338,7 +341,6 @@ func (s *SimpleBloomController) buildGaps(
 			}
 
 			if err := newBlocks.Err(); err != nil {
-				// TODO(owen-d): metrics
 				level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
 				s.closeLoadedBlocks(loaded, blocksIter)
 				return nil, errors.Wrap(err, "failed to generate bloom")
@@ -360,9 +362,12 @@ func (s *SimpleBloomController) buildGaps(
 				return nil, errors.Wrap(err, "failed to write meta")
 			}
 			created = append(created, meta)
+			totalSeries += uint64(seriesItrWithCounter.Count())
 		}
 	}
 
+	s.metrics.tenantsSeries.Observe(float64(totalSeries))
+
 	level.Debug(logger).Log("msg", "finished bloom generation", "blocks", blockCt, "tsdbs", tsdbCt)
 	return created, nil
 }

--- a/pkg/bloomcompactor/metrics.go
+++ b/pkg/bloomcompactor/metrics.go
@@ -16,105 +16,104 @@ const (
 )
 
 type Metrics struct {
-	bloomMetrics *v1.Metrics
-	chunkSize    prometheus.Histogram // uncompressed size of all chunks summed per series
+	bloomMetrics     *v1.Metrics
+	compactorRunning prometheus.Gauge
+	chunkSize        prometheus.Histogram // uncompressed size of all chunks summed per series
 
-	compactionRunsStarted          prometheus.Counter
-	compactionRunsCompleted        *prometheus.CounterVec
-	compactionRunTime              *prometheus.HistogramVec
-	compactionRunDiscoveredTenants prometheus.Counter
-	compactionRunSkippedTenants    prometheus.Counter
-	compactionRunTenantsCompleted  *prometheus.CounterVec
-	compactionRunTenantsTime       *prometheus.HistogramVec
-	compactionRunJobStarted        prometheus.Counter
-	compactionRunJobCompleted      *prometheus.CounterVec
-	compactionRunJobTime           *prometheus.HistogramVec
-	compactionRunInterval          prometheus.Gauge
-	compactorRunning               prometheus.Gauge
+	compactionsStarted  prometheus.Counter
+	compactionCompleted *prometheus.CounterVec
+	compactionTime      *prometheus.HistogramVec
+
+	tenantsDiscovered    prometheus.Counter
+	tenantsOwned         prometheus.Counter
+	tenantsSkipped       prometheus.Counter
+	tenantsStarted       prometheus.Counter
+	tenantsCompleted     *prometheus.CounterVec
+	tenantsCompletedTime *prometheus.HistogramVec
+	tenantsSeries        prometheus.Histogram
 }
 
 func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
 	m := Metrics{
 		bloomMetrics: bloomMetrics,
+		compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Namespace: metricsNamespace,
+			Subsystem: metricsSubsystem,
+			Name:      "running",
+			Help:      "Value will be 1 if compactor is currently running on this instance",
+		}),
 		chunkSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
-			Name:    "bloom_chunk_series_size",
-			Help:    "Uncompressed size of chunks in a series",
-			Buckets: prometheus.ExponentialBucketsRange(1024, 1073741824, 10),
+			Namespace: metricsNamespace,
+			Subsystem: metricsSubsystem,
+			Name:      "chunk_series_size",
+			Help:      "Uncompressed size of chunks in a series",
+			Buckets:   prometheus.ExponentialBucketsRange(1024, 1073741824, 10),
 		}),
-		compactionRunsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
+		compactionsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "runs_started_total",
+			Name:      "compactions_started",
 			Help:      "Total number of compactions started",
 		}),
-		compactionRunsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		compactionCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "runs_completed_total",
-			Help:      "Total number of compactions completed successfully",
+			Name:      "compactions_completed",
+			Help:      "Total number of compactions completed",
 		}, []string{"status"}),
-		compactionRunTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+		compactionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "runs_time_seconds",
+			Name:      "compactions_time_seconds",
 			Help:      "Time spent during a compaction cycle.",
 			Buckets:   prometheus.DefBuckets,
 		}, []string{"status"}),
-		compactionRunDiscoveredTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
+		tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
 			Name:      "tenants_discovered",
 			Help:      "Number of tenants discovered during the current compaction run",
 		}),
-		compactionRunSkippedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
+		tenantsOwned: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "tenants_skipped",
-			Help:      "Number of tenants skipped during the current compaction run",
+			Name:      "tenants_owned",
+			Help:      "Number of tenants owned by this instance",
 		}),
-		compactionRunTenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
-			Namespace: metricsNamespace,
-			Subsystem: metricsSubsystem,
-			Name:      "tenants_completed",
-			Help:      "Number of tenants successfully processed during the current compaction run",
-		}, []string{"status"}),
-		compactionRunTenantsTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+		tenantsSkipped: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "tenants_time_seconds",
-			Help:      "Time spent processing tenants.",
-			Buckets:   prometheus.DefBuckets,
-		}, []string{"status"}),
-		compactionRunJobStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name:      "tenants_skipped",
+			Help:      "Number of tenants skipped since they are not owned by this instance",
+		}),
+		tenantsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "job_started",
-			Help:      "Number of jobs started processing during the current compaction run",
+			Name:      "tenants_started",
+			Help:      "Number of tenants started to process during the current compaction run",
 		}),
-		compactionRunJobCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		tenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "job_completed",
-			Help:      "Number of jobs successfully processed during the current compaction run",
+			Name:      "tenants_completed",
+			Help:      "Number of tenants successfully processed during the current compaction run",
 		}, []string{"status"}),
-		compactionRunJobTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+		tenantsCompletedTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "job_time_seconds",
-			Help:      "Time spent processing jobs.",
+			Name:      "tenants_time_seconds",
+			Help:      "Time spent processing tenants.",
 			Buckets:   prometheus.DefBuckets,
 		}, []string{"status"}),
-		compactionRunInterval: promauto.With(r).NewGauge(prometheus.GaugeOpts{
-			Namespace: metricsNamespace,
-			Subsystem: metricsSubsystem,
-			Name:      "compaction_interval_seconds",
-			Help:      "The configured interval on which compaction is run in seconds",
-		}),
-		compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+		tenantsSeries: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "running",
-			Help:      "Value will be 1 if compactor is currently running on this instance",
+			Name:      "tenants_series",
+			Help:      "Number of series processed per tenant in the owned fingerprint-range.",
+			// Up to 10M series per tenant, way more than what we expect given our max_global_streams_per_user limits
+			Buckets: prometheus.ExponentialBucketsRange(1, 10000000, 10),
 		}),
 	}
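
Not shown in this hunk is the const block at the top of metrics.go that it relies on (`metricsNamespace`, `metricsSubsystem`, `statusSuccess`, `statusFailure`). A sketch of what it plausibly looks like, based on the `loki`/`bloomcompactor` prefixes Loki's metrics carry; the exact values are assumptions, not part of this diff:

```go
// Assumed const block in pkg/bloomcompactor/metrics.go; values inferred
// from Loki's metric naming conventions, not shown in this diff.
const (
	metricsNamespace = "loki"
	metricsSubsystem = "bloomcompactor"

	statusSuccess = "success"
	statusFailure = "failure"
)
```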

--- a/pkg/storage/bloom/v1/util.go
+++ b/pkg/storage/bloom/v1/util.go
@@ -276,3 +276,29 @@ func NewPeekCloseIter[T any](itr CloseableIterator[T]) *PeekCloseIter[T] {
 func (it *PeekCloseIter[T]) Close() error {
 	return it.close()
 }
+
+type CounterIterator[T any] interface {
+	Iterator[T]
+	Count() int
+}
+
+type CounterIter[T any] struct {
+	Iterator[T] // the underlying iterator
+	count       int
+}
+
+func NewCounterIter[T any](itr Iterator[T]) *CounterIter[T] {
+	return &CounterIter[T]{Iterator: itr}
+}
+
+func (it *CounterIter[T]) Next() bool {
+	if it.Iterator.Next() {
+		it.count++
+		return true
+	}
+	return false
+}
+
+func (it *CounterIter[T]) Count() int {
+	return it.count
+}
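
Because `CounterIter` embeds the wrapped `Iterator[T]`, `At()` and `Err()` are promoted from the inner iterator unchanged; only `Next()` is overridden, so `Count()` reports the number of successful `Next()` calls even if a consumer stops early. A small usage sketch, assuming it sits in the same `v1` package as the code above (with `fmt` imported):

```go
func ExampleCounterIter() {
	// Wrap any Iterator[T]; iteration behaves exactly as before,
	// but Count() reveals how many elements were consumed.
	itr := NewCounterIter[int](NewSliceIter[int]([]int{10, 20, 30}))

	sum := 0
	for itr.Next() {
		sum += itr.At() // At() is promoted from the inner SliceIter
	}
	fmt.Println(sum, itr.Count())
	// Output: 60 3
}
```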

--- a/pkg/storage/bloom/v1/util_test.go
+++ b/pkg/storage/bloom/v1/util_test.go
@@ -26,3 +26,29 @@ func TestPeekingIterator(t *testing.T) {
 	require.False(t, itr.Next())
 }
+
+func TestCounterIter(t *testing.T) {
+	t.Parallel()
+
+	data := []int{1, 2, 3, 4, 5}
+	itr := NewCounterIter[int](NewSliceIter[int](data))
+	peekItr := NewPeekingIter[int](itr)
+
+	// Consume the outer iter and use peek
+	for {
+		if _, ok := peekItr.Peek(); !ok {
+			break
+		}
+		if !peekItr.Next() {
+			break
+		}
+	}
+
+	// Both iterators should be exhausted
+	require.False(t, itr.Next())
+	require.Nil(t, itr.Err())
+	require.False(t, peekItr.Next())
+	require.Nil(t, peekItr.Err())
+
+	// Assert that the count is correct and peeking hasn't jeopardized the count
+	require.Equal(t, len(data), itr.Count())
+}
