diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go
index 5a579f95fd..e8dc880f9d 100644
--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -109,9 +109,7 @@ func New(
 
 		c.logger,
 	)
-	c.metrics.compactionRunInterval.Set(cfg.CompactionInterval.Seconds())
 
 	c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
-
 	return c, nil
 }
@@ -138,11 +136,17 @@ func (c *Compactor) running(ctx context.Context) error {
 		case <-ctx.Done():
 			return ctx.Err()
 
-		case <-ticker.C:
+		case start := <-ticker.C:
+			c.metrics.compactionsStarted.Inc()
 			if err := c.runOne(ctx); err != nil {
-				level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err)
+				level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err, "duration", time.Since(start))
+				c.metrics.compactionCompleted.WithLabelValues(statusFailure).Inc()
+				c.metrics.compactionTime.WithLabelValues(statusFailure).Observe(time.Since(start).Seconds())
 				return err
 			}
+			level.Info(c.logger).Log("msg", "compaction iteration completed", "duration", time.Since(start))
+			c.metrics.compactionCompleted.WithLabelValues(statusSuccess).Inc()
+			c.metrics.compactionTime.WithLabelValues(statusSuccess).Observe(time.Since(start).Seconds())
 		}
 	}
 }
@@ -252,14 +256,17 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
 	}
 
 	for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
+		c.metrics.tenantsDiscovered.Inc()
 		tenant := tenants.At()
 		ownershipRange, owns, err := c.ownsTenant(tenant)
 		if err != nil {
 			return errors.Wrap(err, "checking tenant ownership")
 		}
 		if !owns {
+			c.metrics.tenantsSkipped.Inc()
 			continue
 		}
+		c.metrics.tenantsOwned.Inc()
 
 		select {
 		case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}:
@@ -296,7 +303,11 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
 				return nil
 			}
 
+			start := time.Now()
+			c.metrics.tenantsStarted.Inc()
 			if err := c.compactTenantTable(ctx, tt); err != nil {
+				c.metrics.tenantsCompleted.WithLabelValues(statusFailure).Inc()
+				c.metrics.tenantsCompletedTime.WithLabelValues(statusFailure).Observe(time.Since(start).Seconds())
 				return errors.Wrapf(
 					err,
 					"compacting tenant table (%s) for tenant (%s) with ownership (%s)",
@@ -305,6 +316,8 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
 					tt.ownershipRange,
 				)
 			}
+			c.metrics.tenantsCompleted.WithLabelValues(statusSuccess).Inc()
+			c.metrics.tenantsCompletedTime.WithLabelValues(statusSuccess).Observe(time.Since(start).Seconds())
 		}
 	}
 }
diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go
index cc801dc27e..089ab800c7 100644
--- a/pkg/bloomcompactor/controller.go
+++ b/pkg/bloomcompactor/controller.go
@@ -25,7 +25,6 @@ type SimpleBloomController struct {
 	metrics *Metrics
 	limits  Limits
 
-	// TODO(owen-d): add metrics
 	logger log.Logger
 }
 
@@ -269,6 +268,7 @@ func (s *SimpleBloomController) buildGaps(
 		maxBlockSize = uint64(s.limits.BloomCompactorMaxBlockSize(tenant))
 		blockOpts    = v1.NewBlockOptions(nGramSize, nGramSkip, maxBlockSize)
 		created      []bloomshipper.Meta
+		totalSeries  uint64
 	)
 
 	for _, plan := range work {
@@ -295,10 +295,15 @@ func (s *SimpleBloomController) buildGaps(
 				return nil, errors.Wrap(err, "failed to get series and blocks")
 			}
 
+			// Blocks are built consuming the series iterator. For observability, we wrap the series iterator
+			// with a counter iterator to count the number of times Next() is called on it.
+			// This is used to observe the number of series that are being processed.
+			seriesItrWithCounter := v1.NewCounterIter[*v1.Series](seriesItr)
+
 			gen := NewSimpleBloomGenerator(
 				tenant,
 				blockOpts,
-				seriesItr,
+				seriesItrWithCounter,
 				s.chunkLoader,
 				blocksIter,
 				s.rwFn,
@@ -307,9 +312,7 @@ func (s *SimpleBloomController) buildGaps(
 			)
 
 			_, loaded, newBlocks, err := gen.Generate(ctx)
-
 			if err != nil {
-				// TODO(owen-d): metrics
 				level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
 				s.closeLoadedBlocks(loaded, blocksIter)
 				return nil, errors.Wrap(err, "failed to generate bloom")
@@ -338,7 +341,6 @@ func (s *SimpleBloomController) buildGaps(
 			}
 
 			if err := newBlocks.Err(); err != nil {
-				// TODO(owen-d): metrics
 				level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
 				s.closeLoadedBlocks(loaded, blocksIter)
 				return nil, errors.Wrap(err, "failed to generate bloom")
@@ -360,9 +362,12 @@ func (s *SimpleBloomController) buildGaps(
 				return nil, errors.Wrap(err, "failed to write meta")
 			}
 			created = append(created, meta)
+
+			totalSeries += uint64(seriesItrWithCounter.Count())
 		}
 	}
 
+	s.metrics.tenantsSeries.Observe(float64(totalSeries))
 	level.Debug(logger).Log("msg", "finished bloom generation", "blocks", blockCt, "tsdbs", tsdbCt)
 	return created, nil
 }
diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go
index b02ac32aca..350e3ed7e4 100644
--- a/pkg/bloomcompactor/metrics.go
+++ b/pkg/bloomcompactor/metrics.go
@@ -16,105 +16,104 @@ const (
 )
 
 type Metrics struct {
-	bloomMetrics *v1.Metrics
-	chunkSize    prometheus.Histogram // uncompressed size of all chunks summed per series
+	bloomMetrics     *v1.Metrics
+	compactorRunning prometheus.Gauge
+	chunkSize        prometheus.Histogram // uncompressed size of all chunks summed per series
 
-	compactionRunsStarted          prometheus.Counter
-	compactionRunsCompleted        *prometheus.CounterVec
-	compactionRunTime              *prometheus.HistogramVec
-	compactionRunDiscoveredTenants prometheus.Counter
-	compactionRunSkippedTenants    prometheus.Counter
-	compactionRunTenantsCompleted  *prometheus.CounterVec
-	compactionRunTenantsTime       *prometheus.HistogramVec
-	compactionRunJobStarted        prometheus.Counter
-	compactionRunJobCompleted      *prometheus.CounterVec
-	compactionRunJobTime           *prometheus.HistogramVec
-	compactionRunInterval          prometheus.Gauge
-	compactorRunning               prometheus.Gauge
+	compactionsStarted  prometheus.Counter
+	compactionCompleted *prometheus.CounterVec
+	compactionTime      *prometheus.HistogramVec
+
+	tenantsDiscovered    prometheus.Counter
+	tenantsOwned         prometheus.Counter
+	tenantsSkipped       prometheus.Counter
+	tenantsStarted       prometheus.Counter
+	tenantsCompleted     *prometheus.CounterVec
+	tenantsCompletedTime *prometheus.HistogramVec
+	tenantsSeries        prometheus.Histogram
 }
 
 func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
 	m := Metrics{
 		bloomMetrics: bloomMetrics,
+		compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+			Namespace: metricsNamespace,
+			Subsystem: metricsSubsystem,
+			Name:      "running",
+			Help:      "Value will be 1 if compactor is currently running on this instance",
+		}),
 		chunkSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
-			Name:    "bloom_chunk_series_size",
-			Help:    "Uncompressed size of chunks in a series",
-			Buckets: prometheus.ExponentialBucketsRange(1024, 1073741824, 10),
+			Namespace: metricsNamespace,
+			Subsystem: metricsSubsystem,
+			Name:      "chunk_series_size",
+			Help:      "Uncompressed size of chunks in a series",
+			Buckets:   prometheus.ExponentialBucketsRange(1024, 1073741824, 10),
 		}),
-		compactionRunsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
+
+		compactionsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "runs_started_total",
+			Name:      "compactions_started",
 			Help:      "Total number of compactions started",
 		}),
-		compactionRunsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		compactionCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "runs_completed_total",
-			Help:      "Total number of compactions completed successfully",
+			Name:      "compactions_completed",
+			Help:      "Total number of compactions completed",
 		}, []string{"status"}),
-		compactionRunTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+		compactionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "runs_time_seconds",
+			Name:      "compactions_time_seconds",
 			Help:      "Time spent during a compaction cycle.",
 			Buckets:   prometheus.DefBuckets,
 		}, []string{"status"}),
-		compactionRunDiscoveredTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
+
+		tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
 			Name:      "tenants_discovered",
 			Help:      "Number of tenants discovered during the current compaction run",
 		}),
-		compactionRunSkippedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
+		tenantsOwned: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "tenants_skipped",
-			Help:      "Number of tenants skipped during the current compaction run",
+			Name:      "tenants_owned",
+			Help:      "Number of tenants owned by this instance",
 		}),
-		compactionRunTenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
-			Namespace: metricsNamespace,
-			Subsystem: metricsSubsystem,
-			Name:      "tenants_completed",
-			Help:      "Number of tenants successfully processed during the current compaction run",
-		}, []string{"status"}),
-		compactionRunTenantsTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+		tenantsSkipped: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "tenants_time_seconds",
-			Help:      "Time spent processing tenants.",
-			Buckets:   prometheus.DefBuckets,
-		}, []string{"status"}),
-		compactionRunJobStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Name:      "tenants_skipped",
+			Help:      "Number of tenants skipped because they are not owned by this instance",
+		}),
+		tenantsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "job_started",
-			Help:      "Number of jobs started processing during the current compaction run",
+			Name:      "tenants_started",
+			Help:      "Number of tenants whose processing started during the current compaction run",
 		}),
-		compactionRunJobCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+		tenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "job_completed",
-			Help:      "Number of jobs successfully processed during the current compaction run",
+			Name:      "tenants_completed",
+			Help:      "Number of tenants processed during the current compaction run",
 		}, []string{"status"}),
-		compactionRunJobTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+		tenantsCompletedTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "job_time_seconds",
-			Help:      "Time spent processing jobs.",
+			Name:      "tenants_time_seconds",
+			Help:      "Time spent processing tenants.",
 			Buckets:   prometheus.DefBuckets,
 		}, []string{"status"}),
-		compactionRunInterval: promauto.With(r).NewGauge(prometheus.GaugeOpts{
-			Namespace: metricsNamespace,
-			Subsystem: metricsSubsystem,
-			Name:      "compaction_interval_seconds",
-			Help:      "The configured interval on which compaction is run in seconds",
-		}),
-		compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+		tenantsSeries: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "running",
-			Help:      "Value will be 1 if compactor is currently running on this instance",
+			Name:      "tenants_series",
+			Help:      "Number of series processed per tenant in the owned fingerprint range.",
+			// Up to 10M series per tenant, way more than what we expect given our max_global_streams_per_user limits
+			Buckets: prometheus.ExponentialBucketsRange(1, 10000000, 10),
 		}),
 	}
diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go
index 3b9e0631b7..67c0087a0d 100644
--- a/pkg/storage/bloom/v1/util.go
+++ b/pkg/storage/bloom/v1/util.go
@@ -276,3 +276,29 @@ func NewPeekCloseIter[T any](itr CloseableIterator[T]) *PeekCloseIter[T] {
 func (it *PeekCloseIter[T]) Close() error {
 	return it.close()
 }
+
+type CounterIterator[T any] interface {
+	Iterator[T]
+	Count() int
+}
+
+type CounterIter[T any] struct {
+	Iterator[T] // the underlying iterator
+	count       int
+}
+
+func NewCounterIter[T any](itr Iterator[T]) *CounterIter[T] {
+	return &CounterIter[T]{Iterator: itr}
+}
+
+func (it *CounterIter[T]) Next() bool {
+	if it.Iterator.Next() {
+		it.count++
+		return true
+	}
+	return false
+}
+
+func (it *CounterIter[T]) Count() int {
+	return it.count
+}
diff --git a/pkg/storage/bloom/v1/util_test.go b/pkg/storage/bloom/v1/util_test.go
index ad89a226ec..afafa4d05a 100644
--- a/pkg/storage/bloom/v1/util_test.go
+++ b/pkg/storage/bloom/v1/util_test.go
@@ -26,3 +26,29 @@ func TestPeekingIterator(t *testing.T) {
 
 	require.False(t, itr.Next())
 }
+
+func TestCounterIter(t *testing.T) {
+	t.Parallel()
+
+	data := []int{1, 2, 3, 4, 5}
+	itr := NewCounterIter[int](NewSliceIter[int](data))
+	peekItr := NewPeekingIter[int](itr)
+
+	// Consume the outer iterator, peeking before each Next()
+	for {
+		if _, ok := peekItr.Peek(); !ok {
+			break
+		}
+		if !peekItr.Next() {
+			break
+		}
+	}
+	// Both iterators should be exhausted
+	require.False(t, itr.Next())
+	require.Nil(t, itr.Err())
+	require.False(t, peekItr.Next())
+	require.Nil(t, peekItr.Err())
+
+	// Assert that the count is correct and that peeking did not affect it
+	require.Equal(t, len(data), itr.Count())
+}
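
For reference, the wrapper added in pkg/storage/bloom/v1/util.go boils down to the self-contained sketch below. The Iterator interface and the newSliceIter/newCounterIter names here are simplified stand-ins for the v1 package's types, not its real API; only the embed-and-override-Next technique matches the diff.

package main

import "fmt"

// Iterator is a simplified stand-in for the v1 iterator interface (Next/At/Err)
// that CounterIter wraps in the diff above.
type Iterator[T any] interface {
	Next() bool
	At() T
	Err() error
}

// sliceIter is a minimal slice-backed iterator, analogous to a slice iterator in v1.
type sliceIter[T any] struct {
	items []T
	cur   int
}

func newSliceIter[T any](items []T) *sliceIter[T] { return &sliceIter[T]{items: items, cur: -1} }
func (s *sliceIter[T]) Next() bool                { s.cur++; return s.cur < len(s.items) }
func (s *sliceIter[T]) At() T                     { return s.items[s.cur] }
func (s *sliceIter[T]) Err() error                { return nil }

// counterIter mirrors v1.CounterIter: it embeds the underlying iterator and
// counts only successful Next() calls, so draining it also measures how many
// items were handed to the consumer.
type counterIter[T any] struct {
	Iterator[T]
	count int
}

func newCounterIter[T any](itr Iterator[T]) *counterIter[T] { return &counterIter[T]{Iterator: itr} }

func (c *counterIter[T]) Next() bool {
	if c.Iterator.Next() {
		c.count++
		return true
	}
	return false
}

func (c *counterIter[T]) Count() int { return c.count }

func main() {
	itr := newCounterIter[int](newSliceIter([]int{10, 20, 30}))
	for itr.Next() {
		_ = itr.At() // the consumer (the bloom generator in the diff) sees an ordinary iterator
	}
	// After the consumer is done, the wrapper reports how many items it yielded,
	// which the controller feeds into the tenantsSeries histogram.
	fmt.Println("processed:", itr.Count()) // processed: 3
}

Because only Next() is overridden, At() and Err() are promoted from the embedded iterator, so callers such as the bloom generator keep working against a plain iterator while the controller reads Count() afterwards.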
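The per-iteration and per-tenant instrumentation in bloomcompactor.go follows a single pattern: record a start time, run the work, then bump a status-labeled counter and a status-labeled duration histogram. The condensed sketch below illustrates that pattern under assumed names; exampleMetrics, doWork, and the example_operations_* metric names are hypothetical, while the prometheus client-go calls (promauto.With, WithLabelValues, Observe) are the same ones the diff uses.

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
	statusSuccess = "success"
	statusFailure = "failure"
)

// exampleMetrics mirrors the completed-counter plus duration-histogram pair the
// compactor keeps per compaction and per tenant, both labeled by status.
type exampleMetrics struct {
	completed *prometheus.CounterVec
	duration  *prometheus.HistogramVec
}

func newExampleMetrics(r prometheus.Registerer) *exampleMetrics {
	return &exampleMetrics{
		completed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
			Name: "example_operations_completed", // hypothetical name
			Help: "Operations completed, labeled by status",
		}, []string{"status"}),
		duration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
			Name:    "example_operations_time_seconds", // hypothetical name
			Help:    "Time spent per operation, labeled by status",
			Buckets: prometheus.DefBuckets,
		}, []string{"status"}),
	}
}

// observe records one outcome the way the compactor records each iteration and
// each tenant: pick the status label from the error, then bump both metrics.
func (m *exampleMetrics) observe(start time.Time, err error) {
	status := statusSuccess
	if err != nil {
		status = statusFailure
	}
	m.completed.WithLabelValues(status).Inc()
	m.duration.WithLabelValues(status).Observe(time.Since(start).Seconds())
}

func doWork() error { return nil } // stand-in for runOne / compactTenantTable

func main() {
	m := newExampleMetrics(prometheus.NewRegistry())

	start := time.Now()
	err := doWork()
	m.observe(start, err)
}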