|
|
|
@ -3,13 +3,20 @@ package ingester |
|
|
|
|
import ( |
|
|
|
|
"sync" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/dskit/services" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus/promauto" |
|
|
|
|
"go.uber.org/atomic" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/v3/pkg/util/constants" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type ownedStreamService struct { |
|
|
|
|
services.Service |
|
|
|
|
var notOwnedStreamsMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{ |
|
|
|
|
Namespace: constants.Loki, |
|
|
|
|
Name: "ingester_not_owned_streams", |
|
|
|
|
Help: "The total number of not owned streams in memory per tenant.", |
|
|
|
|
}, []string{"tenant"}) |
|
|
|
|
|
|
|
|
|
type ownedStreamService struct { |
|
|
|
|
tenantID string |
|
|
|
|
limiter *Limiter |
|
|
|
|
fixedLimit *atomic.Int32 |
|
|
|
@ -53,6 +60,7 @@ func (s *ownedStreamService) incOwnedStreamCount() { |
|
|
|
|
func (s *ownedStreamService) incNotOwnedStreamCount() { |
|
|
|
|
s.lock.Lock() |
|
|
|
|
defer s.lock.Unlock() |
|
|
|
|
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Inc() |
|
|
|
|
s.notOwnedStreamCount++ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -60,6 +68,7 @@ func (s *ownedStreamService) decOwnedStreamCount() { |
|
|
|
|
s.lock.Lock() |
|
|
|
|
defer s.lock.Unlock() |
|
|
|
|
if s.notOwnedStreamCount > 0 { |
|
|
|
|
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Dec() |
|
|
|
|
s.notOwnedStreamCount-- |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
@ -71,4 +80,5 @@ func (s *ownedStreamService) resetStreamCounts() { |
|
|
|
|
defer s.lock.Unlock() |
|
|
|
|
s.ownedStreamCount = 0 |
|
|
|
|
s.notOwnedStreamCount = 0 |
|
|
|
|
notOwnedStreamsMetric.WithLabelValues(s.tenantID).Set(0) |
|
|
|
|
} |
|
|
|
|