Added max streams per user global limit (#1493)

* Added max streams per user global limit

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Updated changelog

Signed-off-by: Marco Pracucci <marco@pracucci.com>

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/1499/head
Marco Pracucci 6 years ago committed by Cyril Tovena
parent ce407d3276
commit ec40515d31
  1. 1
      CHANGELOG.md
  2. 8
      docs/configuration/README.md
  3. 9
      pkg/ingester/ingester.go
  4. 2
      pkg/ingester/ingester_test.go
  5. 13
      pkg/ingester/instance.go
  6. 15
      pkg/ingester/instance_test.go
  7. 94
      pkg/ingester/limiter.go
  8. 195
      pkg/ingester/limiter_test.go
  9. 19
      pkg/util/validation/limits.go

@ -5,6 +5,7 @@
### Features
* [1493](https://github.com/grafana/loki/pull/1493) **pracucci**: pkg/ingester: added a per-cluster limit on the maximum number of series per-user, configured via the `max_global_streams_per_user` config option.
* [FEATURE] promtail positions file corruptions can be ignored with the `positions.ignore-invalid-yaml` flag. In the case the positions yaml is corrupted an empty positions config will be used and should later overwrite the malformed yaml.
* [1486](https://github.com/grafana/loki/pull/1486) **pracucci**: Added `global` ingestion rate limiter strategy support.

@ -762,9 +762,15 @@ logs in Loki.
# Enforce every sample has a metric name.
[enforce_metric_name: <boolean> | default = true]
# Maximum number of active streams per user.
# Maximum number of active streams per user, per ingester. 0 to disable.
[max_streams_per_user: <int> | default = 10000]
# Maximum number of active streams per user, across the cluster. 0 to disable.
# When the global limit is enabled, each ingester is configured with a dynamic
# local limit based on the replication factor and the current number of healthy
# ingesters, and is kept updated whenever the number of ingesters change.
[max_global_streams_per_user: <int> | default = 0]
# Maximum number of chunks that can be fetched by a single query.
[max_chunks_per_query: <int> | default = 2000000]

@ -99,7 +99,7 @@ type Ingester struct {
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup
limits *validation.Overrides
limiter *Limiter
factory func() chunkenc.Chunk
}
@ -126,7 +126,6 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
limits: limits,
factory: func() chunkenc.Chunk {
return chunkenc.NewMemChunkSize(enc, cfg.BlockSize, cfg.TargetChunkSize)
},
@ -145,6 +144,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
i.lifecycler.Start()
// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
i.done.Add(1)
go i.loop()
@ -208,7 +211,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(instanceID, i.factory, i.limits, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
inst = newInstance(instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
i.instances[instanceID] = inst
}
return inst

@ -189,7 +189,7 @@ func TestIngester(t *testing.T) {
func TestIngesterStreamLimitExceeded(t *testing.T) {
ingesterConfig := defaultIngesterTestConfig(t)
defaultLimits := defaultLimitsTestConfig()
defaultLimits.MaxStreamsPerUser = 1
defaultLimits.MaxLocalStreamsPerUser = 1
overrides, err := validation.NewOverrides(defaultLimits)
require.NoError(t, err)

@ -24,7 +24,6 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
const queryBatchSize = 128
@ -66,7 +65,7 @@ type instance struct {
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
limits *validation.Overrides
limiter *Limiter
factory func() chunkenc.Chunk
// sync
@ -74,7 +73,7 @@ type instance struct {
syncMinUtil float64
}
func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *validation.Overrides, syncPeriod time.Duration, syncMinUtil float64) *instance {
func newInstance(instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
@ -85,7 +84,7 @@ func newInstance(instanceID string, factory func() chunkenc.Chunk, limits *valid
factory: factory,
tailers: map[uint32]*tailer{},
limits: limits,
limiter: limiter,
syncPeriod: syncPeriod,
syncMinUtil: syncMinUtil,
@ -160,9 +159,11 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return stream, nil
}
if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
}
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.factory)
i.streams[fp] = stream

@ -24,10 +24,11 @@ var defaultFactory = func() chunkenc.Chunk {
}
func TestLabelsCollisions(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
i := newInstance("test", defaultFactory, o, 0, 0)
i := newInstance("test", defaultFactory, limiter, 0, 0)
// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@ -50,10 +51,11 @@ func TestLabelsCollisions(t *testing.T) {
}
func TestConcurrentPushes(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
inst := newInstance("test", defaultFactory, o, 0, 0)
inst := newInstance("test", defaultFactory, limiter, 0, 0)
const (
concurrent = 10
@ -100,8 +102,9 @@ func TestConcurrentPushes(t *testing.T) {
}
func TestSyncPeriod(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000})
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
const (
syncPeriod = 1 * time.Minute
@ -110,7 +113,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)
inst := newInstance("test", defaultFactory, o, syncPeriod, minUtil)
inst := newInstance("test", defaultFactory, limiter, syncPeriod, minUtil)
lbls := makeRandomLabels()
tt := time.Now()

@ -0,0 +1,94 @@
package ingester
import (
"fmt"
"math"
"github.com/grafana/loki/pkg/util/validation"
)
const (
errMaxStreamsPerUserLimitExceeded = "per-user streams limit (local: %d global: %d actual local: %d) exceeded"
)
// RingCount is the interface exposed by a ring implementation which allows
// to count members
type RingCount interface {
HealthyInstancesCount() int
}
// Limiter implements primitives to get the maximum number of streams
// an ingester can handle for a specific tenant
type Limiter struct {
limits *validation.Overrides
ring RingCount
replicationFactor int
}
// NewLimiter makes a new limiter
func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int) *Limiter {
return &Limiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
}
}
// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
actualLimit := l.maxStreamsPerUser(userID)
if streams < actualLimit {
return nil
}
localLimit := l.limits.MaxLocalStreamsPerUser(userID)
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, localLimit, globalLimit, actualLimit)
}
func (l *Limiter) maxStreamsPerUser(userID string) int {
localLimit := l.limits.MaxLocalStreamsPerUser(userID)
// We can assume that streams are evenly distributed across ingesters
// so we do convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))
// If both the local and global limits are disabled, we just
// use the largest int value
if localLimit == 0 {
localLimit = math.MaxInt32
}
return localLimit
}
func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}
// Given we don't need a super accurate count (ie. when the ingesters
// topology changes) and we prefer to always be in favor of the tenant,
// we can use a per-ingester limit equal to:
// (global limit / number of ingesters) * replication factor
numIngesters := l.ring.HealthyInstancesCount()
// May happen because the number of ingesters is asynchronously updated.
// If happens, we just temporarily ignore the global limit.
if numIngesters > 0 {
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
}
return 0
}
func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}
return first
}

@ -0,0 +1,195 @@
package ingester
import (
"fmt"
"math"
"testing"
"github.com/grafana/loki/pkg/util/validation"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLimiter_maxStreamsPerUser(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
ringReplicationFactor int
ringIngesterCount int
expected int
}{
"both local and global limits are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
expected: math.MaxInt32,
},
"only local limit is enabled": {
maxLocalStreamsPerUser: 1000,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
expected: 1000,
},
"only global limit is enabled with replication-factor=1": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 1,
ringIngesterCount: 10,
expected: 100,
},
"only global limit is enabled with replication-factor=3": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 300,
},
"both local and global limits are set with local limit < global limit": {
maxLocalStreamsPerUser: 150,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 150,
},
"both local and global limits are set with local limit > global limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
expected: 300,
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
// Mock the ring
ring := &ringCountMock{count: testData.ringIngesterCount}
// Mock limits
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser,
MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser,
})
require.NoError(t, err)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor)
actual := limiter.maxStreamsPerUser("test")
assert.Equal(t, testData.expected, actual)
})
}
}
func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
ringReplicationFactor int
ringIngesterCount int
streams int
expected error
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 1,
ringIngesterCount: 1,
streams: 100,
expected: nil,
},
"current number of streams is below the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 299,
expected: nil,
},
"current number of streams is above the limit": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
streams: 300,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, 0, 1000, 300),
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
// Mock the ring
ring := &ringCountMock{count: testData.ringIngesterCount}
// Mock limits
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser,
MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser,
})
require.NoError(t, err)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor)
actual := limiter.AssertMaxStreamsPerUser("test", testData.streams)
assert.Equal(t, testData.expected, actual)
})
}
}
func TestLimiter_minNonZero(t *testing.T) {
t.Parallel()
tests := map[string]struct {
first int
second int
expected int
}{
"both zero": {
first: 0,
second: 0,
expected: 0,
},
"first is zero": {
first: 0,
second: 1,
expected: 1,
},
"second is zero": {
first: 1,
second: 0,
expected: 1,
},
"both non zero, second > first": {
first: 1,
second: 2,
expected: 1,
},
"both non zero, first > second": {
first: 2,
second: 1,
expected: 1,
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
limiter := NewLimiter(nil, nil, 0)
assert.Equal(t, testData.expected, limiter.minNonZero(testData.first, testData.second))
})
}
}
type ringCountMock struct {
count int
}
func (m *ringCountMock) HealthyInstancesCount() int {
return m.count
}

@ -36,7 +36,8 @@ type Limits struct {
EnforceMetricName bool `yaml:"enforce_metric_name"`
// Ingester enforced limits.
MaxStreamsPerUser int `yaml:"max_streams_per_user"`
MaxLocalStreamsPerUser int `yaml:"max_streams_per_user"`
MaxGlobalStreamsPerUser int `yaml:"max_global_streams_per_user"`
// Querier enforced limits.
MaxChunksPerQuery int `yaml:"max_chunks_per_query"`
@ -63,7 +64,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&l.CreationGracePeriod, "validation.create-grace-period", 10*time.Minute, "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
f.IntVar(&l.MaxStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user.")
f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 0, "Maximum number of active streams per user, across the cluster. 0 to disable.")
f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")
f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.")
@ -180,9 +182,16 @@ func (o *Overrides) CreationGracePeriod(userID string) time.Duration {
return o.overridesManager.GetLimits(userID).(*Limits).CreationGracePeriod
}
// MaxStreamsPerUser returns the maximum number of streams a user is allowed to store.
func (o *Overrides) MaxStreamsPerUser(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxStreamsPerUser
// MaxLocalStreamsPerUser returns the maximum number of streams a user is allowed to store
// in a single ingester.
func (o *Overrides) MaxLocalStreamsPerUser(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxLocalStreamsPerUser
}
// MaxGlobalStreamsPerUser returns the maximum number of streams a user is allowed to store
// across the cluster.
func (o *Overrides) MaxGlobalStreamsPerUser(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MaxGlobalStreamsPerUser
}
// MaxChunksPerQuery returns the maximum number of chunks allowed per query.

Loading…
Cancel
Save