Enforce per tenant queue size (#8947)

**What this PR does / why we need it**:

Prior to the changes in https://github.com/grafana/loki/pull/8752 the max queue size per tenant was enforced by the size of the buffered channel to which a request was enqueued.
However, since we have hierarchical queues, every sub-queue has the same channel capacity as the root (tenant) queue. Therefore the total queue size per tenant needs to be tracked separately, so requests can be rejected when the max queue size is reached.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/8955/head
Christian Haudum 3 years ago committed by GitHub
parent 0adedfa689
commit abc0fd26d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      pkg/scheduler/queue/queue.go
  2. 30
      pkg/scheduler/queue/queue_test.go
  3. 27
      pkg/scheduler/queue/tenant_queues.go

@ -93,6 +93,20 @@ func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQue
return errors.New("no queue found")
}
// Optimistically increase queue counter for tenant instead of doing separate
// get and set operations, because _most_ of the time the increased value is
// smaller than the max queue length.
// We need to keep track of queue length separately because the size of the
// buffered channel is the same across all sub-queues which would allow
// enqueuing more items than there are allowed at tenant level.
queueLen := q.queues.perUserQueueLen.Inc(tenant)
if queueLen > q.queues.maxUserQueueSize {
q.metrics.discardedRequests.WithLabelValues(tenant).Inc()
// decrement, because we already optimistically increased the counter
q.queues.perUserQueueLen.Dec(tenant)
return ErrTooManyRequests
}
select {
case queue.Chan() <- req:
q.metrics.queueLength.WithLabelValues(tenant).Inc()
@ -105,6 +119,8 @@ func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQue
return nil
default:
q.metrics.discardedRequests.WithLabelValues(tenant).Inc()
// decrement, because we already optimistically increased the counter
q.queues.perUserQueueLen.Dec(tenant)
return ErrTooManyRequests
}
}
@ -147,6 +163,7 @@ FindQueue:
q.queues.deleteQueue(tenant)
}
q.queues.perUserQueueLen.Dec(tenant)
q.metrics.queueLength.WithLabelValues(tenant).Dec()
// Tell close() we've processed a request.

@ -301,6 +301,36 @@ func TestContextCond(t *testing.T) {
})
}
func TestMaxQueueSize(t *testing.T) {
t.Run("queue size is tracked per tenant", func(t *testing.T) {
maxSize := 3
queue := NewRequestQueue(maxSize, 0, NewMetrics("query_scheduler", nil))
queue.RegisterQuerierConnection("querier")
// enqueue maxSize items with different actors
// different actors have individual channels with maxSize length
assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 1, 0, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 2, 0, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-c"}, 3, 0, nil))
// max queue length per tenant is tracked globally for all actors within a tenant
err := queue.Enqueue("tenant", []string{"user-a"}, 4, 0, nil)
assert.Equal(t, err, ErrTooManyRequests)
// dequeue and enqueue some items
_, _, err = queue.Dequeue(context.Background(), StartIndexWithLocalQueue, "querier")
assert.NoError(t, err)
_, _, err = queue.Dequeue(context.Background(), StartIndexWithLocalQueue, "querier")
assert.NoError(t, err)
assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 4, 0, nil))
assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 5, 0, nil))
err = queue.Enqueue("tenant", []string{"user-c"}, 6, 0, nil)
assert.Equal(t, err, ErrTooManyRequests)
})
}
func assertChanReceived(t *testing.T, c chan struct{}, timeout time.Duration, msg string) {
t.Helper()

@ -13,6 +13,31 @@ import (
"github.com/grafana/loki/pkg/util"
)
type intPointerMap map[string]*int
func (tqs intPointerMap) Inc(key string) int {
ptr, ok := tqs[key]
if !ok {
size := 1
tqs[key] = &size
return size
}
(*ptr)++
return *ptr
}
func (tqs intPointerMap) Dec(key string) int {
ptr, ok := tqs[key]
if !ok {
return 0
}
(*ptr)--
if *ptr == 0 {
delete(tqs, key)
}
return *ptr
}
// querier holds information about a querier registered in the queue.
type querier struct {
// Number of active connections.
@ -31,6 +56,7 @@ type tenantQueues struct {
mapping *Mapping[*tenantQueue]
maxUserQueueSize int
perUserQueueLen intPointerMap
// How long to wait before removing a querier which has got disconnected
// but hasn't notified about a graceful shutdown.
@ -76,6 +102,7 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue
return &tenantQueues{
mapping: mm,
maxUserQueueSize: maxUserQueueSize,
perUserQueueLen: make(intPointerMap),
forgetDelay: forgetDelay,
queriers: map[string]*querier{},
sortedQueriers: nil,

Loading…
Cancel
Save