feat: new stream count limiter (#13006)

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
Co-authored-by: JordanRushing <rushing.jordan@gmail.com>
pull/13021/head
Vladyslav Diachenko 1 year ago committed by GitHub
parent 987e551f9e
commit 1111595179
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      docs/sources/shared/configuration.md
  2. 75
      pkg/ingester/instance.go
  3. 81
      pkg/ingester/limiter.go
  4. 47
      pkg/ingester/limiter_test.go
  5. 44
      pkg/ingester/owned_streams.go
  6. 36
      pkg/ingester/owned_streams_test.go
  7. 6
      pkg/validation/limits.go

@ -2978,6 +2978,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -validation.discover-log-levels
[discover_log_levels: <boolean> | default = true]
# When true an ingester takes into account only the streams that it owns
# according to the ring while applying the stream limit.
# CLI flag: -ingester.use-owned-stream-count
[use_owned_stream_count: <boolean> | default = false]
# Maximum number of active streams per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-streams-per-user
[max_streams_per_user: <int> | default = 0]

@ -104,7 +104,10 @@ type instance struct {
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
limiter *Limiter
limiter *Limiter
streamCountLimiter *streamCountLimiter
ownedStreamsSvc *ownedStreamService
configs *runtime.TenantConfigs
wal WAL
@ -147,11 +150,12 @@ func newInstance(
if err != nil {
return nil, err
}
streams := newStreamsMap()
ownedStreamsSvc := newOwnedStreamService(instanceID, limiter)
c := config.SchemaConfig{Configs: periodConfigs}
i := &instance{
cfg: cfg,
streams: newStreamsMap(),
streams: streams,
buf: make([]byte, 0, 1024),
index: invertedIndex,
instanceID: instanceID,
@ -159,9 +163,11 @@ func newInstance(
streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
tailers: map[uint32]*tailer{},
limiter: limiter,
configs: configs,
tailers: map[uint32]*tailer{},
limiter: limiter,
streamCountLimiter: newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc),
ownedStreamsSvc: ownedStreamsSvc,
configs: configs,
wal: wal,
metrics: metrics,
@ -286,29 +292,11 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
}
if record != nil {
err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, i.streams.Len())
err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID)
}
if err != nil {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "failed to create stream, exceeded limit",
"org_id", i.instanceID,
"err", err,
"stream", pushReqStream.Labels,
)
}
validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
}
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID)
return i.onStreamCreationError(ctx, pushReqStream, err, labels)
}
fp := i.getHashForLabels(labels)
@ -333,21 +321,47 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
i.metrics.recoveredStreamsTotal.Inc()
}
i.onStreamCreated(s)
return s, nil
}
// onStreamCreationError accounts for a push whose stream could not be created
// and builds the error handed back to the caller.
//
// When stream-creation logging is enabled for the tenant, the failure (including
// err) is logged at debug level. The number of entries and the total length of
// their lines are then added to the discarded samples/bytes counters under the
// validation.StreamLimit reason, and the byte total is forwarded to
// customStreamsTracker when one is configured.
//
// It always returns a nil stream and an HTTP 429 error; err itself is only
// logged and is not part of the returned error.
func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logproto.Stream, err error, labels labels.Labels) (*stream, error) {
	// Size of the rejected payload: entry count and sum of line lengths.
	discardedEntries := len(pushReqStream.Entries)
	var discardedBytes int
	for idx := range pushReqStream.Entries {
		discardedBytes += len(pushReqStream.Entries[idx].Line)
	}

	if i.configs.LogStreamCreation(i.instanceID) {
		level.Debug(util_log.Logger).Log(
			"msg", "failed to create stream, exceeded limit",
			"org_id", i.instanceID,
			"err", err,
			"stream", pushReqStream.Labels,
		)
	}

	validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(discardedEntries))
	validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(discardedBytes))
	if tracker := i.customStreamsTracker; tracker != nil {
		tracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(discardedBytes))
	}

	return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID)
}
func (i *instance) onStreamCreated(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Inc()
memoryStreamsLabelsBytes.Add(float64(len(s.labels.String())))
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(s)
streamsCountStats.Add(1)
i.ownedStreamsSvc.incOwnedStreamCount()
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "successfully created stream",
"org_id", i.instanceID,
"stream", pushReqStream.Labels,
"stream", s.labels.String(),
)
}
return s, nil
}
func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*stream, error) {
@ -407,6 +421,7 @@ func (i *instance) removeStream(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Dec()
memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String())))
streamsCountStats.Add(-1)
i.ownedStreamsSvc.decOwnedStreamCount()
}
}

@ -24,6 +24,7 @@ type RingCount interface {
type Limits interface {
UnorderedWrites(userID string) bool
UseOwnedStreamCount(userID string) bool
MaxLocalStreamsPerUser(userID string) int
MaxGlobalStreamsPerUser(userID string) int
PerStreamRateLimit(userID string) validation.RateLimit
@ -76,46 +77,39 @@ func (l *Limiter) UnorderedWrites(userID string) bool {
return l.limits.UnorderedWrites(userID)
}
// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
// Until the limiter actually starts, all accesses are successful.
// This is used to disable limits while recovering from the WAL.
l.mtx.RLock()
defer l.mtx.RUnlock()
if l.disabled {
return nil
}
func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) {
// Start by setting the local limit either from override or default
localLimit := l.limits.MaxLocalStreamsPerUser(userID)
localLimit = l.limits.MaxLocalStreamsPerUser(tenantID)
// We can assume that streams are evenly distributed across ingesters
// so we do convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)
globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID)
adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit)
// Set the calculated limit to the lesser of the local limit or the new calculated global limit
calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)
calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit)
// If both the local and global limits are disabled, we just
// use the largest int value
if calculatedLimit == 0 {
calculatedLimit = math.MaxInt32
}
return
}
if streams < calculatedLimit {
return nil
func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}
return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
return first
}
func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}
// todo: change to healthyInstancesInZoneCount() once
// Given we don't need a super accurate count (ie. when the ingesters
// topology changes) and we prefer to always be in favor of the tenant,
// we can use a per-ingester limit equal to:
@ -131,12 +125,53 @@ func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
return 0
}
func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
// supplier is a parameterless function that produces a value of type T each
// time it is called.
type supplier[T any] func() T
// streamCountLimiter decides whether a tenant may create another stream by
// comparing a stream count against the limit calculated by Limiter.
// Which count and which fixed limit are used is chosen per call by getSuppliers.
type streamCountLimiter struct {
// tenantID is set by newStreamCountLimiter. AssertNewStreamAllowed works with
// the tenant ID passed as its argument rather than this field.
tenantID string
// limiter provides the calculated stream limit and access to the tenant limits.
limiter *Limiter
// defaultStreamCountSupplier returns the stream count used when the owned
// stream count is not enabled for the tenant.
defaultStreamCountSupplier supplier[int]
// ownedStreamSvc supplies the owned stream count and the fixed limit when the
// owned stream count is enabled for the tenant.
ownedStreamSvc *ownedStreamService
}
// noopFixedLimitSupplier always returns 0. getSuppliers hands it out as the
// fixed-limit supplier when the owned stream count is not enabled for the tenant.
var noopFixedLimitSupplier = func() int {
return 0
}
// newStreamCountLimiter builds a streamCountLimiter for tenantID.
//
// defaultStreamCountSupplier provides the stream count used when the owned
// stream count is not enabled for the tenant; service provides the owned stream
// count and the fixed limit when it is enabled.
func newStreamCountLimiter(tenantID string, defaultStreamCountSupplier supplier[int], limiter *Limiter, service *ownedStreamService) *streamCountLimiter {
	scl := new(streamCountLimiter)
	scl.tenantID = tenantID
	scl.limiter = limiter
	scl.defaultStreamCountSupplier = defaultStreamCountSupplier
	scl.ownedStreamSvc = service
	return scl
}
return first
// AssertNewStreamAllowed returns nil when the tenant's current stream count is
// strictly below the effective limit, and an error describing the count and
// the limits otherwise.
//
// The stream count and the fixed limit taken into account are chosen by
// getSuppliers for the given tenantID.
func (l *streamCountLimiter) AssertNewStreamAllowed(tenantID string) error {
	countSupplier, fixedSupplier := l.getSuppliers(tenantID)
	calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit := l.getCurrentLimit(tenantID, fixedSupplier)

	current := countSupplier()
	if current >= calculatedLimit {
		return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, tenantID, current, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
	}
	return nil
}
// getCurrentLimit returns the limits reported by Limiter.GetStreamCountLimit for
// tenantID, with calculatedLimit raised to the value returned by
// fixedLimitSupplier when that value is greater.
func (l *streamCountLimiter) getCurrentLimit(tenantID string, fixedLimitSupplier supplier[int]) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) {
	calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit = l.limiter.GetStreamCountLimit(tenantID)
	// The fixed limit only ever raises the effective limit, never lowers it.
	if fixed := fixedLimitSupplier(); fixed > calculatedLimit {
		calculatedLimit = fixed
	}
	return calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit
}
// getSuppliers selects the stream-count and fixed-limit sources for tenant.
//
// When UseOwnedStreamCount is enabled for the tenant both come from the owned
// stream service; otherwise the default stream count supplier is paired with
// noopFixedLimitSupplier.
func (l *streamCountLimiter) getSuppliers(tenant string) (streamCountSupplier, fixedLimitSupplier supplier[int]) {
	if !l.limiter.limits.UseOwnedStreamCount(tenant) {
		return l.defaultStreamCountSupplier, noopFixedLimitSupplier
	}
	owned := l.ownedStreamSvc
	return owned.getOwnedStreamCount, owned.getFixedLimit
}
type RateLimiterStrategy interface {

@ -8,12 +8,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/time/rate"
"github.com/grafana/loki/v3/pkg/validation"
)
func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
@ -21,6 +22,9 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
ringIngesterCount int
streams int
expected error
useOwnedStreamService bool
fixedLimit int32
ownedStreamCount int64
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
@ -94,6 +98,36 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
},
"actual limit must be used if it's greater than fixed limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 20,
ownedStreamCount: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
},
"fixed limit must be used if it's greater than actual limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 2000,
ownedStreamCount: 2001,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 2001, 2000, 500, 1000, 300),
},
"fixed limit must not be used if both limits are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 2000,
ownedStreamCount: 2001,
expected: nil,
},
}
for testName, testData := range tests {
@ -107,11 +141,20 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser,
MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser,
UseOwnedStreamCount: testData.useOwnedStreamService,
}, nil)
require.NoError(t, err)
ownedStreamSvc := &ownedStreamService{
fixedLimit: atomic.NewInt32(testData.fixedLimit),
ownedStreamCount: atomic.NewInt64(testData.ownedStreamCount),
}
limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor)
actual := limiter.AssertMaxStreamsPerUser("test", testData.streams)
defaultCountSupplier := func() int {
return testData.streams
}
streamCountLimiter := newStreamCountLimiter("test", defaultCountSupplier, limiter, ownedStreamSvc)
actual := streamCountLimiter.AssertNewStreamAllowed("test")
assert.Equal(t, testData.expected, actual)
})

@ -0,0 +1,44 @@
package ingester
import "go.uber.org/atomic"
// ownedStreamService keeps a per-tenant counter of owned streams together with
// a snapshot of the tenant's calculated stream limit.
type ownedStreamService struct {
// tenantID and limiter are used by updateFixedLimit to compute the limit.
tenantID string
limiter *Limiter
// fixedLimit holds the limit captured by the last updateFixedLimit call; it
// does not change until updateFixedLimit is called again.
fixedLimit *atomic.Int32
// TODO: implement a job that recalculates the owned stream count.
// ownedStreamCount is adjusted by incOwnedStreamCount / decOwnedStreamCount.
ownedStreamCount *atomic.Int64
}
// newOwnedStreamService creates the owned stream service for tenantID with a
// zero owned stream count, and initialises the fixed limit from limiter before
// returning.
func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
	service := new(ownedStreamService)
	service.tenantID = tenantID
	service.limiter = limiter
	service.fixedLimit = atomic.NewInt32(0)
	service.ownedStreamCount = atomic.NewInt64(0)

	// Capture the current limit so getFixedLimit is meaningful right away.
	service.updateFixedLimit()
	return service
}
// getOwnedStreamCount returns the current owned stream counter as an int.
func (s *ownedStreamService) getOwnedStreamCount() int {
	count := s.ownedStreamCount.Load()
	return int(count)
}
// updateFixedLimit recomputes the tenant's calculated stream limit via the
// limiter and stores it as the fixed limit.
//
// The limit is an int but is stored in an atomic.Int32, so it is clamped to the
// int32 range first: a plain int32 conversion of an out-of-range value would
// silently wrap to an unrelated (possibly negative) number.
func (s *ownedStreamService) updateFixedLimit() {
	const (
		minInt32 = -1 << 31
		maxInt32 = 1<<31 - 1
	)

	limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID)
	switch {
	case limit > maxInt32:
		limit = maxInt32
	case limit < minInt32:
		limit = minInt32
	}
	s.fixedLimit.Store(int32(limit))
}
// getFixedLimit returns the limit stored by the most recent updateFixedLimit
// call, as an int.
func (s *ownedStreamService) getFixedLimit() int {
	fixed := s.fixedLimit.Load()
	return int(fixed)
}
// incOwnedStreamCount atomically adds one to the owned stream counter.
func (s *ownedStreamService) incOwnedStreamCount() {
	s.ownedStreamCount.Add(1)
}
// decOwnedStreamCount atomically subtracts one from the owned stream counter.
func (s *ownedStreamService) decOwnedStreamCount() {
	s.ownedStreamCount.Sub(1)
}

@ -0,0 +1,36 @@
package ingester
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/validation"
)
// Test_OwnedStreamService checks that the fixed limit is captured when the
// service is created, stays unchanged until updateFixedLimit is called, and
// that the owned stream counter follows inc/dec calls.
func Test_OwnedStreamService(t *testing.T) {
	limits, err := validation.NewOverrides(validation.Limits{
		MaxGlobalStreamsPerUser: 100,
	}, nil)
	require.NoError(t, err)
	// Mock the ring
	ring := &ringCountMock{count: 30}
	limiter := NewLimiter(limits, NilMetrics, ring, 3)

	service := newOwnedStreamService("test", limiter)
	require.Equal(t, 0, service.getOwnedStreamCount())
	// Global limit 100 with the mocked ring count of 30 and replication factor 3
	// is expected to yield a per-ingester limit of 10.
	require.Equal(t, 10, service.getFixedLimit(), "fixed limit must be initialised during the instantiation")

	// Changing the underlying limit must not be visible until an explicit update.
	limits.DefaultLimits().MaxGlobalStreamsPerUser = 1000
	require.Equal(t, 10, service.getFixedLimit(), "fixed limit must not be changed until update is triggered")

	service.updateFixedLimit()
	require.Equal(t, 100, service.getFixedLimit())

	service.incOwnedStreamCount()
	service.incOwnedStreamCount()
	require.Equal(t, 2, service.getOwnedStreamCount())

	service.decOwnedStreamCount()
	require.Equal(t, 1, service.getOwnedStreamCount())
}

@ -85,6 +85,7 @@ type Limits struct {
DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"`
// Ingester enforced limits.
UseOwnedStreamCount bool `yaml:"use_owned_stream_count" json:"use_owned_stream_count"`
MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"`
MaxGlobalStreamsPerUser int `yaml:"max_global_streams_per_user" json:"max_global_streams_per_user"`
UnorderedWrites bool `yaml:"unordered_writes" json:"unordered_writes"`
@ -270,6 +271,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
f.IntVar(&l.MaxEntriesLimitPerQuery, "validation.max-entries-limit", 5000, "Maximum number of log entries that will be returned for a query.")
f.BoolVar(&l.UseOwnedStreamCount, "ingester.use-owned-stream-count", false, "When true an ingester takes into account only the streams that it owns according to the ring while applying the stream limit.")
f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 0, "Maximum number of active streams per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 5000, "Maximum number of active streams per user, across the cluster. 0 to disable. When the global limit is enabled, each ingester is configured with a dynamic local limit based on the replication factor and the current number of healthy ingesters, and is kept updated whenever the number of ingesters change.")
@ -588,6 +590,10 @@ func (o *Overrides) CreationGracePeriod(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).CreationGracePeriod)
}
// UseOwnedStreamCount returns the use_owned_stream_count setting from the
// limits resolved for userID.
func (o *Overrides) UseOwnedStreamCount(userID string) bool {
	userLimits := o.getOverridesForUser(userID)
	return userLimits.UseOwnedStreamCount
}
// MaxLocalStreamsPerUser returns the maximum number of streams a user is allowed to store
// in a single ingester.
func (o *Overrides) MaxLocalStreamsPerUser(userID string) int {

Loading…
Cancel
Save