Add metrics to the distributor about stream sharding (#7390)

This PR adds some more metrics to the steam sharding interactions from
the Distributor.
1. `rate_store_stream_count`: The total number of streams
2. `rate_store_max_stream_shard_count`: The stream with the largest
number of shards
3. `rate_store_max_stream_rate_bytes`: The largest stream seen by a
distributor
4. `rate_store_refresh_duration_seconds`: A histogram of the latency to
query all the ingesters and process their rates
pull/7396/head
Travis Patterson 3 years ago committed by GitHub
parent 5dc27e8064
commit 543ea78b7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 56
      pkg/distributor/ratestore.go
  2. 50
      pkg/distributor/ratestore_metrics.go

@ -14,11 +14,9 @@ import (
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/ring/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/logproto"
)
@ -54,9 +52,9 @@ type rateStore struct {
rateCollectionInterval time.Duration
ingesterTimeout time.Duration
maxParallelism int
rateRefreshFailures *prometheus.CounterVec
refreshDuration *instrument.HistogramCollector
limits Limits
metrics *ratestoreMetrics
}
func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore { //nolint
@ -67,21 +65,7 @@ func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l
maxParallelism: cfg.MaxParallelism,
ingesterTimeout: cfg.IngesterReqTimeout,
limits: l,
rateRefreshFailures: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "rate_store_refresh_failures_total",
Help: "The total number of failed attempts to refresh the distributor's view of stream rates",
}, []string{"source"}),
refreshDuration: instrument.NewHistogramCollector(
promauto.With(registerer).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "loki",
Name: "rate_store_refresh_duration_seconds",
Help: "Time spent refreshing the rate store",
Buckets: prometheus.DefBuckets,
}, instrument.HistogramCollectorBuckets,
),
),
metrics: newRateStoreMetrics(registerer),
}
s.Service = services.
@ -96,20 +80,22 @@ func (s *rateStore) instrumentedUpdateAllRates(ctx context.Context) error {
return nil
}
return instrument.CollectedRequest(ctx, "GetAllStreamRates", s.refreshDuration, instrument.ErrorCode, s.updateAllRates)
return instrument.CollectedRequest(ctx, "GetAllStreamRates", s.metrics.refreshDuration, instrument.ErrorCode, s.updateAllRates)
}
func (s *rateStore) updateAllRates(ctx context.Context) error {
clients, err := s.getClients()
if err != nil {
level.Error(util_log.Logger).Log("msg", "error getting ingester clients", "err", err)
s.rateRefreshFailures.WithLabelValues("ring").Inc()
s.metrics.rateRefreshFailures.WithLabelValues("ring").Inc()
return nil // Don't fail the service because we have an error getting the clients once
}
streamRates := s.getRates(ctx, clients)
rates := s.aggregateByShard(streamRates)
s.metrics.streamCount.Set(float64(len(rates)))
s.rateLock.Lock()
defer s.rateLock.Unlock()
s.rates = rates
@ -134,18 +120,42 @@ func (s *rateStore) anyShardingEnabled() bool {
}
func (s *rateStore) aggregateByShard(streamRates map[uint64]*logproto.StreamRate) map[uint64]int64 {
var maxRate int64
shardCount := make(map[uint64]int)
rates := make(map[uint64]int64)
for _, sr := range streamRates {
if _, ok := rates[sr.StreamHashNoShard]; ok {
rates[sr.StreamHashNoShard] += sr.Rate
maxRate = max(rates[sr.StreamHashNoShard], maxRate)
shardCount[sr.StreamHashNoShard]++
continue
}
rates[sr.StreamHashNoShard] = sr.Rate
maxRate = max(rates[sr.StreamHashNoShard], maxRate)
shardCount[sr.StreamHashNoShard]++
}
var maxShards int64
for _, v := range shardCount {
maxShards = max(maxShards, int64(v))
}
s.metrics.maxStreamRate.Set(float64(maxRate))
s.metrics.maxStreamShardCount.Set(float64(maxShards))
return rates
}
func max(a, b int64) int64 {
if a > b {
return a
}
return b
}
func (s *rateStore) getRates(ctx context.Context, clients []ingesterClient) map[uint64]*logproto.StreamRate {
parallelClients := make(chan ingesterClient, len(clients))
responses := make(chan *logproto.StreamRatesResponse, len(clients))
@ -169,7 +179,7 @@ func (s *rateStore) getRatesFromIngesters(ctx context.Context, clients chan inge
resp, err := c.client.GetStreamRates(ctx, &logproto.StreamRatesRequest{})
if err != nil {
level.Error(util_log.Logger).Log("msg", "unable to get stream rates", "err", err)
s.rateRefreshFailures.WithLabelValues(c.addr).Inc()
s.metrics.rateRefreshFailures.WithLabelValues(c.addr).Inc()
}
responses <- resp

@ -0,0 +1,50 @@
package distributor
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"
)
type ratestoreMetrics struct {
rateRefreshFailures *prometheus.CounterVec
streamCount prometheus.Gauge
maxStreamShardCount prometheus.Gauge
maxStreamRate prometheus.Gauge
refreshDuration *instrument.HistogramCollector
}
func newRateStoreMetrics(reg prometheus.Registerer) *ratestoreMetrics {
return &ratestoreMetrics{
rateRefreshFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "rate_store_refresh_failures_total",
Help: "The total number of failed attempts to refresh the distributor's view of stream rates",
}, []string{"source"}),
streamCount: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "rate_store_streams",
Help: "The number of unique streams reported by all ingesters. Sharded streams are combined",
}),
maxStreamShardCount: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "rate_store_max_stream_shards",
Help: "The number of shards for a single stream reported by ingesters during a sync operation.",
}),
maxStreamRate: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "loki",
Name: "rate_store_max_stream_rate_bytes",
Help: "The maximum stream rate for any stream reported by ingesters during a sync operation. Sharded Streams are combined.",
}),
refreshDuration: instrument.NewHistogramCollector(
promauto.With(reg).NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "loki",
Name: "rate_store_refresh_duration_seconds",
Help: "Time spent refreshing the rate store",
Buckets: prometheus.DefBuckets,
}, instrument.HistogramCollectorBuckets,
),
),
}
}
Loading…
Cancel
Save