@ -7,6 +7,7 @@ import (
"time"
"github.com/grafana/loki/pkg/util"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/instrument"
@ -31,12 +32,14 @@ type RateStoreConfig struct {
MaxParallelism int ` yaml:"max_request_parallelism" `
StreamRateUpdateInterval time . Duration ` yaml:"stream_rate_update_interval" `
IngesterReqTimeout time . Duration ` yaml:"ingester_request_timeout" `
Debug bool ` yaml:"debug" `
}
// RegisterFlagsWithPrefix registers the rate store's command-line flags on fs.
// Every flag name is built by appending a fixed suffix to prefix, and each
// parsed value is stored in the corresponding field of cfg.
func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
	// Build the fully-qualified flag names up front so the registrations
	// below read as a simple field -> flag table.
	var (
		parallelismFlag = prefix + ".max-request-parallelism"
		intervalFlag    = prefix + ".stream-rate-update-interval"
		timeoutFlag     = prefix + ".ingester-request-timeout"
		debugFlag       = prefix + ".debug"
	)

	fs.IntVar(
		&cfg.MaxParallelism,
		parallelismFlag,
		200,
		"The max number of concurrent requests to make to ingester stream apis",
	)
	fs.DurationVar(
		&cfg.StreamRateUpdateInterval,
		intervalFlag,
		time.Second,
		"The interval on which distributors will update current stream rates from ingesters",
	)
	fs.DurationVar(
		&cfg.IngesterReqTimeout,
		timeoutFlag,
		500*time.Millisecond,
		"Timeout for communication between distributors and any given ingester when updating rates",
	)
	fs.BoolVar(
		&cfg.Debug,
		debugFlag,
		false,
		"If enabled, detailed logs and spans will be emitted.",
	)
}
type ingesterClient struct {
@ -63,6 +66,8 @@ type rateStore struct {
limits Limits
metrics * ratestoreMetrics
debug bool
}
func NewRateStore ( cfg RateStoreConfig , r ring . ReadRing , cf poolClientFactory , l Limits , registerer prometheus . Registerer ) * rateStore { //nolint
@ -75,6 +80,7 @@ func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l
limits : l ,
metrics : newRateStoreMetrics ( registerer ) ,
rates : make ( map [ string ] map [ uint64 ] expiringRate ) ,
debug : cfg . Debug ,
}
rateCollectionInterval := util . DurationWithJitter ( cfg . StreamRateUpdateInterval , 0.2 )
@ -94,7 +100,7 @@ func (s *rateStore) instrumentedUpdateAllRates(ctx context.Context) error {
}
func ( s * rateStore ) updateAllRates ( ctx context . Context ) error {
clients , err := s . getClients ( )
clients , err := s . getClients ( ctx )
if err != nil {
level . Error ( util_log . Logger ) . Log ( "msg" , "error getting ingester clients" , "err" , err )
s . metrics . rateRefreshFailures . WithLabelValues ( "ring" ) . Inc ( )
@ -102,8 +108,8 @@ func (s *rateStore) updateAllRates(ctx context.Context) error {
}
streamRates := s . getRates ( ctx , clients )
updated := s . aggregateByShard ( streamRates )
updateStats := s . updateRates ( updated )
updated := s . aggregateByShard ( ctx , streamRates )
updateStats := s . updateRates ( ctx , updated )
s . metrics . maxStreamRate . Set ( float64 ( updateStats . maxRate ) )
s . metrics . maxStreamShardCount . Set ( float64 ( updateStats . maxShards ) )
@ -120,7 +126,13 @@ type rateStats struct {
expiredCount int64
}
func ( s * rateStore ) updateRates ( updated map [ string ] map [ uint64 ] expiringRate ) rateStats {
func ( s * rateStore ) updateRates ( ctx context . Context , updated map [ string ] map [ uint64 ] expiringRate ) rateStats {
if s . debug {
if sp := opentracing . SpanFromContext ( ctx ) ; sp != nil {
sp . LogKV ( "event" , "started to update rates" )
defer sp . LogKV ( "event" , "finished to update rates" )
}
}
s . rateLock . Lock ( )
defer s . rateLock . Unlock ( )
@ -179,7 +191,13 @@ func (s *rateStore) anyShardingEnabled() bool {
return false
}
func ( s * rateStore ) aggregateByShard ( streamRates map [ string ] map [ uint64 ] * logproto . StreamRate ) map [ string ] map [ uint64 ] expiringRate {
func ( s * rateStore ) aggregateByShard ( ctx context . Context , streamRates map [ string ] map [ uint64 ] * logproto . StreamRate ) map [ string ] map [ uint64 ] expiringRate {
if s . debug {
if sp := opentracing . SpanFromContext ( ctx ) ; sp != nil {
sp . LogKV ( "started to aggregate by shard" )
defer sp . LogKV ( "finished to aggregate by shard" )
}
}
rates := map [ string ] map [ uint64 ] expiringRate { }
for tID , tenant := range streamRates {
@ -208,6 +226,13 @@ func max(a, b int64) int64 {
}
func ( s * rateStore ) getRates ( ctx context . Context , clients [ ] ingesterClient ) map [ string ] map [ uint64 ] * logproto . StreamRate {
if s . debug {
if sp := opentracing . SpanFromContext ( ctx ) ; sp != nil {
sp . LogKV ( "event" , "started to get rates from ingesters" )
defer sp . LogKV ( "event" , "finished to get rates from ingesters" )
}
}
parallelClients := make ( chan ingesterClient , len ( clients ) )
responses := make ( chan * logproto . StreamRatesResponse , len ( clients ) )
@ -225,16 +250,24 @@ func (s *rateStore) getRates(ctx context.Context, clients []ingesterClient) map[
// getRatesFromIngesters receives ingester clients from the clients channel
// until it is closed and, for each one, requests that ingester's stream rates
// and forwards the result on responses.
//
// Each request runs under its own context derived from ctx with a timeout of
// s.ingesterTimeout. A failed request is logged and counted in the
// rateRefreshFailures metric labelled with the ingester address; the response
// value returned alongside the error is still sent, so exactly one value is
// written to responses per client received.
//
// When s.debug is set, the time spent on each ingester (including the send on
// responses) is logged at debug level.
func (s *rateStore) getRatesFromIngesters(ctx context.Context, clients chan ingesterClient, responses chan *logproto.StreamRatesResponse) {
	for c := range clients {
		// Handle every client inside its own closure so the deferred calls
		// below fire once per ingester instead of when the loop ends.
		func() {
			if s.debug {
				startTime := time.Now()
				defer func() {
					level.Debug(util_log.Logger).Log("msg", "get rates from ingester", "duration", time.Since(startTime), "ingester", c.addr)
				}()
			}

			// Shadow ctx on purpose: the timeout applies to this request only
			// and is always derived from the caller's context.
			ctx, cancel := context.WithTimeout(ctx, s.ingesterTimeout)
			// Deferred so the timer is released on every exit path; it runs
			// after the send below, the same order as an explicit trailing call.
			defer cancel()

			resp, err := c.client.GetStreamRates(ctx, &logproto.StreamRatesRequest{})
			if err != nil {
				level.Error(util_log.Logger).Log("msg", "unable to get stream rates from ingester", "ingester", c.addr, "err", err)
				s.metrics.rateRefreshFailures.WithLabelValues(c.addr).Inc()
			}

			// NOTE(review): the response is forwarded even on error —
			// presumably the consumer expects one value per client; confirm
			// against the reader of this channel.
			responses <- resp
		}()
	}
}
@ -269,7 +302,14 @@ func (s *rateStore) ratesPerStream(responses chan *logproto.StreamRatesResponse,
return streamRates
}
func ( s * rateStore ) getClients ( ) ( [ ] ingesterClient , error ) {
func ( s * rateStore ) getClients ( ctx context . Context ) ( [ ] ingesterClient , error ) {
if s . debug {
if sp := opentracing . SpanFromContext ( ctx ) ; sp != nil {
sp . LogKV ( "event" , "ratestore started getting clients" )
defer sp . LogKV ( "event" , "ratestore finished getting clients" )
}
}
ingesters , err := s . ring . GetAllHealthy ( ring . Read )
if err != nil {
return nil , err