Implement hierarchical queues for query scheduler (#8691)

### What this PR does / why we need it:

This PR is the first step towards hierarchical queues within the query scheduler.
Hierarchical queues can be used to ensure query fairness across multiple actors within a single tenant, such as individual (human) users using the same tenant.

The change is described in the LID ["Query fairness across users within tenants"](https://grafana.com/docs/loki/next/lids/0003-queryfairnessinscheduler/).

* Extract the metrics of `RequestQueue` into their own `metrics` field, so that they can be extended and passed around more easily in the future.
* Move the mapping of tenant queues into its own data structure (an ordered map) so its functionality can be reused. This also simplifies the code inside the `tenantQueues` struct.
* Add the `LeafQueue` struct that implements hierarchical queues. Sub-queues are dequeued in a round-robin manner, using the same `Mapping` implementation that also backs the tenant queues. See the sketch after this list for how the pieces fit together.
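
A minimal sketch of the intended usage (illustrative only; `r()` is the dummy-request helper from `leafqueue_test.go`, and direct access to the `mapping` field mirrors the tests rather than a public API):

```go
// Build a small queue tree: a root with two sub-queues.
root := newLeafQueue(10, "root")
root.add(QueuePath{"user-a"})
root.add(QueuePath{"user-b"})

// Enqueue one request per sub-queue.
root.mapping.GetByKey("user-a").Chan() <- r(1)
root.mapping.GetByKey("user-b").Chan() <- r(2)

// Dequeue drains the root's local queue first, then round-robins
// across the sub-queues: id 1 ("user-a"), then id 2 ("user-b").
first := root.Dequeue()
second := root.Dequeue()
_, _ = first, second // keep the sketch free of unused-variable errors
```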

### Special notes for your reviewer

Part of https://github.com/grafana/loki/pull/8585
Contains the changes from PR https://github.com/grafana/loki/pull/8722

**This PR does not yet make use of the new hierarchical queues; it only adds the relevant data structures. Wiring them up will be done in a follow-up PR.**

While the new `Mapping` data structure adds some overhead to the time/op of `GetNextRequest`, I consider the simplification an overall better tradeoff; `QueueRequest` even gets slightly faster and allocates less.

```console
$ benchstat scheduler-bench-old.txt scheduler-bench-new.txt 
name              old time/op    new time/op    delta
GetNextRequest-8    18.0µs ± 2%    20.5µs ± 3%  +13.89%  (p=0.008 n=5+5)
QueueRequest-8      56.6µs ± 0%    53.5µs ± 3%   -5.48%  (p=0.008 n=5+5)

name              old alloc/op   new alloc/op   delta
GetNextRequest-8    1.60kB ± 0%    1.60kB ± 0%     ~     (all equal)
QueueRequest-8      35.6kB ± 0%    29.8kB ± 0%  -16.37%  (p=0.008 n=5+5)

name              old allocs/op  new allocs/op  delta
GetNextRequest-8       100 ± 0%       100 ± 0%     ~     (all equal)
QueueRequest-8         819 ± 0%       806 ± 0%   -1.59%  (p=0.008 n=5+5)
```
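
The input files above were presumably produced with something along these lines (the exact flags are an assumption, not recorded in the PR):

```console
$ go test ./pkg/scheduler/queue/ -run '^$' -bench 'GetNextRequest|QueueRequest' -count 5 | tee scheduler-bench-new.txt
```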

#### Complexity of operations

| Operation | Function | Complexity |
| --------- | -------- | ---------- |
| Insert | `Put(key string, value *tenantQueue) bool` | `O(1)` |
| Select | `GetNext(idx QueueIndex) *tenantQueue` | `O(1)` best case, `O(n)` worst case |
| Remove | `Remove(key string) bool` | `O(1)` |
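
The `O(n)` worst case of `GetNext` occurs when the internal key slice is dominated by empty slots left behind by removals, since the scan has to skip over them. `Put` and `Remove` stay `O(1)` because freed slots are recorded in a free list and reused on the next insert.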

---

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
**Changed files (lines changed):**

1. `pkg/lokifrontend/frontend/v1/frontend.go` (7)
2. `pkg/lokifrontend/frontend/v1/frontend_test.go` (11)
3. `pkg/scheduler/queue/leafqueue.go` (137)
4. `pkg/scheduler/queue/leafqueue_test.go` (171)
5. `pkg/scheduler/queue/mapping.go` (117)
6. `pkg/scheduler/queue/mapping_test.go` (85)
7. `pkg/scheduler/queue/queue.go` (30)
8. `pkg/scheduler/queue/queue_test.go` (57)
9. `pkg/scheduler/queue/tenant_queues.go` (130)
10. `pkg/scheduler/queue/tenant_queues_test.go` (28)
11. `pkg/scheduler/scheduler.go` (7)

@@ -99,7 +99,12 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
}),
}
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests)
metrics := &queue.Metrics{
QueueLength: f.queueLength,
DiscardedRequests: f.discardedRequests,
}
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, metrics)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)
var err error

@@ -126,12 +126,13 @@ func TestFrontendCheckReady(t *testing.T) {
{"no url, no clients is not ready", 0, "not ready: number of queriers connected to query-frontend is 0", false},
} {
t.Run(tt.name, func(t *testing.T) {
m := &queue.Metrics{
QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
}
f := &Frontend{
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
),
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5, 0, m),
}
for i := 0; i < tt.connectedClients; i++ {
f.requestQueue.RegisterQuerierConnection("test")

@@ -0,0 +1,137 @@
package queue
import (
"fmt"
"strings"
)
type QueuePath []string //nolint:revive
// LeafQueue is a hierarchical queue implementation where each sub-queue
// has the same guarantee of being chosen from.
// Each queue also has a local queue, which is chosen from first. Only if the
// local queue is empty are items dequeued from the sub-queues.
type LeafQueue struct {
// local queue
ch RequestChannel
// index of where this item is located in the mapping
pos QueueIndex
// round-robin position of the sub-queue that was dequeued from last
current QueueIndex
// mapping for sub-queues
mapping *Mapping[*LeafQueue]
// name of the queue
name string
// maximum queue size of the local queue
size int
}
// newLeafQueue creates a new LeafQueue instance
func newLeafQueue(size int, name string) *LeafQueue {
m := &Mapping[*LeafQueue]{}
m.Init(64) // TODO(chaudum): What is a good initial value?
return &LeafQueue{
ch: make(RequestChannel, size),
pos: StartIndex,
current: StartIndex,
mapping: m,
name: name,
size: size,
}
}
// add recursively adds sub-queues along the given path
func (q *LeafQueue) add(ident QueuePath) *LeafQueue {
if len(ident) == 0 {
return nil
}
curr := ident[0]
queue, created := q.getOrCreate(curr)
if created {
q.mapping.Put(queue.Name(), queue)
}
if len(ident[1:]) > 0 {
queue.add(ident[1:])
}
return queue
}
func (q *LeafQueue) getOrCreate(ident string) (subq *LeafQueue, created bool) {
subq = q.mapping.GetByKey(ident)
if subq == nil {
subq = newLeafQueue(q.size, ident)
created = true
}
return subq, created
}
// Chan implements Queue
func (q *LeafQueue) Chan() RequestChannel {
return q.ch
}
// Dequeue implements Queue
func (q *LeafQueue) Dequeue() Request {
// first, return item from local channel
if len(q.ch) > 0 {
return <-q.ch
}
// only if there are no items queued in the local queue, dequeue from sub-queues
maxIter := q.mapping.Len()
for iters := 0; iters < maxIter; iters++ {
subq := q.mapping.GetNext(q.current)
if subq != nil {
q.current = subq.pos
item := subq.Dequeue()
if item != nil {
return item
}
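// the sub-queue (and all of its sub-queues) is drained, so remove it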
q.mapping.Remove(subq.name)
}
}
return nil
}
// Name implements Queue
func (q *LeafQueue) Name() string {
return q.name
}
// Len implements Queue
// It returns the length of the local queue and all sub-queues.
// This may be expensive depending on the size of the queue tree.
func (q *LeafQueue) Len() int {
count := len(q.ch)
for _, subq := range q.mapping.Values() {
count += subq.Len()
}
return count
}
// Pos implements Mapable
func (q *LeafQueue) Pos() QueueIndex {
return q.pos
}
// SetPos implements Mapable
func (q *LeafQueue) SetPos(index QueueIndex) {
q.pos = index
}
// String makes the queue printable
func (q *LeafQueue) String() string {
sb := &strings.Builder{}
sb.WriteString("{")
fmt.Fprintf(sb, "name=%s, len=%d/%d, leafs=[", q.Name(), q.Len(), cap(q.ch))
subqs := q.mapping.Values()
for i, m := range subqs {
sb.WriteString(m.String())
if i < len(subqs)-1 {
sb.WriteString(",")
}
}
sb.WriteString("]")
sb.WriteString("}")
return sb.String()
}

@@ -0,0 +1,171 @@
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
type dummyRequest struct {
id int
}
func r(id int) *dummyRequest {
return &dummyRequest{id}
}
func TestLeafQueue(t *testing.T) {
t.Run("add sub queues recursively", func(t *testing.T) {
pathA := QueuePath([]string{"l0", "l1", "l3"})
pathB := QueuePath([]string{"l0", "l2", "l3"})
q := newLeafQueue(1, "root")
require.NotNil(t, q)
require.Equal(t, "root", q.Name())
require.Equal(t, 0, q.Len())
require.Equal(t, 0, q.mapping.Len())
q.add(pathA)
require.Equal(t, 1, q.mapping.Len())
q.add(pathB)
require.Equal(t, 1, q.mapping.Len())
})
t.Run("enqueue/dequeue to/from subqueues", func(t *testing.T) {
/**
root: [0]
a: [1]
b: [2]
b0: [20]
b1: [21]
c: [3]
c0: [30]
c00: [300]
c01: [301]
c1: [31]
c10: [310]
c11: [311]
**/
paths := []QueuePath{
QueuePath([]string{"a"}),
QueuePath([]string{"b", "b0"}),
QueuePath([]string{"b", "b1"}),
QueuePath([]string{"c", "c0", "c00"}),
QueuePath([]string{"c", "c0", "c01"}),
QueuePath([]string{"c", "c1", "c10"}),
QueuePath([]string{"c", "c1", "c11"}),
}
q := newLeafQueue(10, "root")
require.NotNil(t, q)
for _, p := range paths {
q.add(p)
}
require.Equal(t, 3, q.mapping.Len())
// no items in any queues
require.Equal(t, 0, q.Len())
q.Chan() <- r(0)
require.Equal(t, 1, q.Len())
q.mapping.GetByKey("a").Chan() <- r(1)
require.Equal(t, 2, q.Len())
q.mapping.GetByKey("b").Chan() <- r(2)
q.mapping.GetByKey("b").mapping.GetByKey("b0").Chan() <- r(20)
q.mapping.GetByKey("b").mapping.GetByKey("b1").Chan() <- r(21)
require.Equal(t, 5, q.Len())
q.mapping.GetByKey("c").Chan() <- r(3)
q.mapping.GetByKey("c").mapping.GetByKey("c0").Chan() <- r(30)
q.mapping.GetByKey("c").mapping.GetByKey("c0").mapping.GetByKey("c00").Chan() <- r(300)
q.mapping.GetByKey("c").mapping.GetByKey("c0").mapping.GetByKey("c01").Chan() <- r(301)
q.mapping.GetByKey("c").mapping.GetByKey("c1").Chan() <- r(31)
q.mapping.GetByKey("c").mapping.GetByKey("c1").mapping.GetByKey("c10").Chan() <- r(310)
q.mapping.GetByKey("c").mapping.GetByKey("c1").mapping.GetByKey("c11").Chan() <- r(311)
require.Equal(t, 12, q.Len())
t.Log(q)
items := make([]int, 0, q.Len())
for q.Len() > 0 {
r := q.Dequeue()
if r == nil {
continue
}
items = append(items, r.(*dummyRequest).id)
}
require.Len(t, items, 12)
require.Equal(t, []int{0, 1, 2, 3, 20, 30, 21, 31, 300, 310, 301, 311}, items)
})
t.Run("dequeue ensure round-robin", func(t *testing.T) {
/**
root:
a: [100, 101, 102]
b: [200]
c: [300, 301]
**/
paths := []QueuePath{
QueuePath([]string{"a"}),
QueuePath([]string{"b"}),
QueuePath([]string{"c"}),
}
q := newLeafQueue(10, "root")
require.NotNil(t, q)
for _, p := range paths {
q.add(p)
}
require.Equal(t, 3, q.mapping.Len())
// no items in any queues
require.Equal(t, 0, q.Len())
q.mapping.GetByKey("a").Chan() <- r(100)
q.mapping.GetByKey("a").Chan() <- r(101)
q.mapping.GetByKey("a").Chan() <- r(102)
q.mapping.GetByKey("b").Chan() <- r(200)
q.mapping.GetByKey("c").Chan() <- r(300)
q.mapping.GetByKey("c").Chan() <- r(301)
t.Log(q)
items := make([]int, 0, q.Len())
for q.Len() > 0 {
r := q.Dequeue()
if r == nil {
continue
}
items = append(items, r.(*dummyRequest).id)
}
require.Len(t, items, 6)
require.Equal(t, []int{100, 200, 300, 101, 301, 102}, items)
})
t.Run("empty sub-queues are removed", func(t *testing.T) {
q := newLeafQueue(10, "root")
q.add(QueuePath{"a"})
q.add(QueuePath{"b"})
q.mapping.GetByKey("a").Chan() <- r(1)
q.mapping.GetByKey("b").Chan() <- r(2)
t.Log(q)
// drain queue
r := q.Dequeue()
for r != nil {
r = q.Dequeue()
}
require.Nil(t, q.mapping.GetByKey("a"))
require.Nil(t, q.mapping.GetByKey("b"))
})
}

@@ -0,0 +1,117 @@
package queue
type Mapable interface {
*tenantQueue | *LeafQueue
// https://github.com/golang/go/issues/48522#issuecomment-924348755
Pos() QueueIndex
SetPos(index QueueIndex)
}
var empty = string([]byte{byte(0)})
// Mapping is a map-like data structure that allows accessing its items not
// only by key but also by index.
// When an item is removed, the internal key array is not resized; instead,
// the removed slot is marked as empty. This allows keys to be removed without
// changing the indices of the remaining items.
// Mapping uses keys of type string and values of any type satisfying Mapable.
// The data structure is not thread-safe.
type Mapping[v Mapable] struct {
m map[string]v
keys []string
empty []QueueIndex
}
func (m *Mapping[v]) Init(size int) {
m.m = make(map[string]v, size)
m.keys = make([]string, 0, size)
m.empty = make([]QueueIndex, 0, size)
}
func (m *Mapping[v]) Put(key string, value v) bool {
// do not allow empty string or 0 byte string as key
if key == "" || key == empty {
return false
}
if len(m.empty) == 0 {
value.SetPos(QueueIndex(len(m.keys)))
m.keys = append(m.keys, key)
} else {
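// reuse the first slot freed by a previous Remove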
idx := m.empty[0]
m.empty = m.empty[1:]
m.keys[idx] = key
value.SetPos(idx)
}
m.m[key] = value
return true
}
func (m *Mapping[v]) Get(idx QueueIndex) v {
if len(m.keys) == 0 {
return nil
}
k := m.keys[idx]
return m.GetByKey(k)
}
func (m *Mapping[v]) GetNext(idx QueueIndex) v {
if len(m.keys) == 0 {
return nil
}
// convert to int
i := int(idx)
// proceed to the next index
i = i + 1
// start from beginning if next index exceeds slice length
if i >= len(m.keys) {
i = 0
}
for i < len(m.keys) {
k := m.keys[i]
if k != empty {
return m.GetByKey(k)
}
i++
}
return nil
}
func (m *Mapping[v]) GetByKey(key string) v {
// do not allow empty string or 0 byte string as key
if key == "" || key == empty {
return nil
}
return m.m[key]
}
func (m *Mapping[v]) Remove(key string) bool {
e := m.m[key]
if e == nil {
return false
}
delete(m.m, key)
m.keys[e.Pos()] = empty
m.empty = append(m.empty, e.Pos())
return true
}
func (m *Mapping[v]) Keys() []string {
return m.keys
}
func (m *Mapping[v]) Values() []v {
values := make([]v, 0, len(m.keys))
for _, k := range m.keys {
if k == empty {
continue
}
values = append(values, m.m[k])
}
return values
}
func (m *Mapping[v]) Len() int {
return len(m.keys) - len(m.empty)
}

@@ -0,0 +1,85 @@
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestQueueMapping(t *testing.T) {
// Individual sub-tests in this test case reflect a scenario and need
// to be executed in sequential order.
m := &Mapping[*LeafQueue]{}
m.Init(16)
require.Equal(t, m.Len(), 0)
t.Run("put item to mapping", func(t *testing.T) {
q1 := newLeafQueue(10, "queue-1")
m.Put(q1.Name(), q1)
require.Equal(t, 1, m.Len())
require.Equal(t, []string{"queue-1"}, m.Keys())
})
t.Run("insert order is preserved if there is no empty slot", func(t *testing.T) {
q2 := newLeafQueue(10, "queue-2")
m.Put(q2.Name(), q2)
require.Equal(t, 2, m.Len())
require.Equal(t, []string{"queue-1", "queue-2"}, m.Keys())
})
t.Run("insert into empty slot if item was removed previously", func(t *testing.T) {
ok := m.Remove("queue-1")
require.True(t, ok)
require.Equal(t, 1, m.Len())
q3 := newLeafQueue(10, "queue-3")
m.Put(q3.Name(), q3)
require.Equal(t, 2, m.Len())
require.Equal(t, []string{"queue-3", "queue-2"}, m.Keys())
})
t.Run("insert order is preserved across keys and values", func(t *testing.T) {
q4 := newLeafQueue(10, "queue-4")
m.Put(q4.Name(), q4)
require.Equal(t, 3, m.Len())
for idx, v := range m.Values() {
require.Equal(t, v.Name(), m.Keys()[idx])
}
})
t.Run("get by key", func(t *testing.T) {
key := "queue-2"
item := m.GetByKey(key)
require.Equal(t, key, item.Name())
require.Equal(t, QueueIndex(1), item.Pos())
})
t.Run("get by empty key returns nil", func(t *testing.T) {
require.Nil(t, m.GetByKey(""))
require.Nil(t, m.GetByKey(empty))
})
t.Run("get next item based on index must not skip when items are removed", func(t *testing.T) {
item := m.GetNext(StartIndex)
require.Equal(t, "queue-3", item.Name())
item = m.GetNext(item.Pos())
require.Equal(t, "queue-2", item.Name())
m.Remove(item.Name())
item = m.GetNext(item.Pos())
require.Equal(t, "queue-4", item.Name())
})
t.Run("get next item out of range returns first item", func(t *testing.T) {
item := m.GetNext(100)
require.Equal(t, "queue-3", item.Name())
})
t.Run("get next item skips empty slots", func(t *testing.T) {
item := m.GetNext(100)
require.Equal(t, "queue-3", item.Name())
item = m.GetNext(item.Pos())
require.Equal(t, "queue-4", item.Name())
})
}

@@ -25,6 +25,9 @@ var (
// of RequestQueue.GetNextRequestForQuerier method.
type QueueIndex int // nolint:revive
// StartIndex is the QueueIndex that starts iteration over tenant queues from the very first tenant.
var StartIndex QueueIndex = -1
// ReuseLastIndex modifies the index so that iteration restarts at the tenant for which the last queue was returned.
func (ui QueueIndex) ReuseLastIndex() QueueIndex {
if ui < 0 {
@@ -33,9 +36,6 @@ func (ui QueueIndex) ReuseLastIndex() QueueIndex {
return ui - 1
}
// StartIndex is the UserIndex that starts iteration over tenant queues from the very first tenant.
var StartIndex QueueIndex = -1
// Request stored into the queue.
type Request any
@@ -54,17 +54,19 @@ type RequestQueue struct {
cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
queues *tenantQueues
stopped bool
metrics *Metrics
}
queueLength *prometheus.GaugeVec // Per tenant and reason.
discardedRequests *prometheus.CounterVec // Per tenant.
type Metrics struct {
QueueLength *prometheus.GaugeVec // Per tenant and reason.
DiscardedRequests *prometheus.CounterVec // Per tenant.
}
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue {
q := &RequestQueue{
queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
metrics: metrics,
}
q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
@@ -93,8 +95,8 @@ func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, succ
}
select {
case queue <- req:
q.queueLength.WithLabelValues(tenant).Inc()
case queue.Chan() <- req:
q.metrics.QueueLength.WithLabelValues(tenant).Inc()
q.cond.Broadcast()
// Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns.
if successFn != nil {
@@ -102,7 +104,7 @@ func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, succ
}
return nil
default:
q.discardedRequests.WithLabelValues(tenant).Inc()
q.metrics.DiscardedRequests.WithLabelValues(tenant).Inc()
return ErrTooManyRequests
}
}
@@ -140,12 +142,12 @@ FindQueue:
// Pick next request from the queue.
for {
request := <-queue
if len(queue) == 0 {
request := queue.Dequeue()
if queue.Len() == 0 {
q.queues.deleteQueue(tenant)
}
q.queueLength.WithLabelValues(tenant).Dec()
q.metrics.QueueLength.WithLabelValues(tenant).Dec()
// Tell close() we've processed a request.
q.cond.Broadcast()

@@ -22,10 +22,11 @@ func BenchmarkGetNextRequest(b *testing.B) {
queues := make([]*RequestQueue, 0, b.N)
for n := 0; n < b.N; n++ {
queue := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
m := &Metrics{
QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
}
queue := NewRequestQueue(maxOutstandingPerTenant, 0, m)
queues = append(queues, queue)
for ix := 0; ix < queriers; ix++ {
@@ -42,29 +43,30 @@ func BenchmarkGetNextRequest(b *testing.B) {
}
}
}
}
querierNames := make([]string, queriers)
for x := 0; x < queriers; x++ {
querierNames[x] = fmt.Sprintf("querier-%d", x)
}
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx := StartIndex
for j := 0; j < maxOutstandingPerTenant*numTenants; j++ {
querier := ""
b:
// Find querier with at least one request to avoid blocking in getNextRequestForQuerier.
for _, q := range queues[i].queues.queues {
for qid := range q.queriers {
querier = qid
break b
for j := 0; j < queriers; j++ {
idx := StartIndex
for x := 0; x < maxOutstandingPerTenant*numTenants/queriers; x++ {
r, nidx, err := queues[i].Dequeue(ctx, idx, querierNames[j])
if r == nil {
break
}
if err != nil {
b.Fatal(err)
}
idx = nidx
}
_, nidx, err := queues[i].Dequeue(ctx, idx, querier)
if err != nil {
b.Fatal(err)
}
idx = nidx
}
}
}
@@ -79,10 +81,11 @@ func BenchmarkQueueRequest(b *testing.B) {
requests := make([]string, 0, numTenants)
for n := 0; n < b.N; n++ {
q := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
m := &Metrics{
QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
}
q := NewRequestQueue(maxOutstandingPerTenant, 0, m)
for ix := 0; ix < queriers; ix++ {
q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
@@ -112,9 +115,11 @@ func BenchmarkQueueRequest(b *testing.B) {
func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
const forgetDelay = 3 * time.Second
queue := NewRequestQueue(1, forgetDelay,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))
m := &Metrics{
QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
}
queue := NewRequestQueue(1, forgetDelay, m)
// Start the queue service.
ctx := context.Background()

@@ -28,12 +28,7 @@ type querier struct {
// This struct holds tenant queues for pending requests. It also keeps track of connected queriers,
// and mapping between tenants and queriers.
type tenantQueues struct {
queues map[string]*tenantQueue
// List of all tenants with queues, used for iteration when searching for next queue to handle.
// Tenants removed from the middle are replaced with "". To avoid skipping tenants during iteration, we only shrink
// this list when there are ""'s at the end of it.
tenants []string
mapping *Mapping[*tenantQueue]
maxUserQueueSize int
@@ -48,9 +43,19 @@ type tenantQueues struct {
sortedQueriers []string
}
type Queue interface {
Chan() RequestChannel
Dequeue() Request
Name() string
Len() int
}
type tenantQueue struct {
ch RequestChannel
// name of the queue (aka tenant)
name string
// If not nil, only these queriers can handle user requests. If nil, all queriers can.
// We set this to nil if number of available queriers <= maxQueriers.
queriers map[string]struct{}
@@ -61,13 +66,44 @@ type tenantQueue struct {
seed int64
// Points back to this queue's position in the mapping. Enables quick cleanup.
index int
index QueueIndex
}
// Chan implements Queue
func (q *tenantQueue) Chan() RequestChannel {
return q.ch
}
// Dequeue implements Queue
func (q *tenantQueue) Dequeue() Request {
return <-q.ch
}
// Name implements Queue
func (q *tenantQueue) Name() string {
return q.name
}
// Len implements Queue
func (q *tenantQueue) Len() int {
return len(q.ch)
}
// Pos implements Mapable
func (q *tenantQueue) Pos() QueueIndex {
return q.index
}
// SetPos implements Mapable
func (q *tenantQueue) SetPos(index QueueIndex) {
q.index = index
}
func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQueues {
mm := &Mapping[*tenantQueue]{}
mm.Init(64)
return &tenantQueues{
queues: map[string]*tenantQueue{},
tenants: nil,
mapping: mm,
maxUserQueueSize: maxUserQueueSize,
forgetDelay: forgetDelay,
queriers: map[string]*querier{},
@@ -76,29 +112,18 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue
}
func (q *tenantQueues) len() int {
return len(q.queues)
return q.mapping.Len()
}
func (q *tenantQueues) deleteQueue(tenant string) {
uq := q.queues[tenant]
if uq == nil {
return
}
delete(q.queues, tenant)
q.tenants[uq.index] = ""
// Shrink users list size if possible. This is safe, and no users will be skipped during iteration.
for ix := len(q.tenants) - 1; ix >= 0 && q.tenants[ix] == ""; ix-- {
q.tenants = q.tenants[:ix]
}
q.mapping.Remove(tenant)
}
// Returns existing or new queue for a tenant.
// MaxQueriers is used to compute which queriers should handle requests for this tenant.
// If maxQueriers is <= 0, all queriers can handle this tenant's requests.
// If maxQueriers has changed since the last call, queriers for this tenant are recomputed.
func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChannel {
func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) Queue {
// Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot).
if tenant == "" {
return nil
@@ -108,30 +133,14 @@ func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChan
maxQueriers = 0
}
uq := q.queues[tenant]
uq := q.mapping.GetByKey(tenant)
if uq == nil {
uq = &tenantQueue{
ch: make(RequestChannel, q.maxUserQueueSize),
seed: util.ShuffleShardSeed(tenant, ""),
index: -1,
}
q.queues[tenant] = uq
// Add user to the list of users... find first free spot, and put it there.
for ix, u := range q.tenants {
if u == "" {
uq.index = ix
q.tenants[ix] = tenant
break
}
}
// ... or add to the end.
if uq.index < 0 {
uq.index = len(q.tenants)
q.tenants = append(q.tenants, tenant)
ch: make(RequestChannel, q.maxUserQueueSize),
seed: util.ShuffleShardSeed(tenant, ""),
name: tenant,
}
q.mapping.Put(tenant, uq)
}
if uq.maxQueriers != maxQueriers {
@@ -139,13 +148,13 @@ func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChan
uq.queriers = shuffleQueriersForTenants(uq.seed, maxQueriers, q.sortedQueriers, nil)
}
return uq.ch
return uq
}
// Finds next queue for the querier. To support fair scheduling between users, client is expected
// to pass the last user index returned by this function as argument. If there was no previous
// last user index, use -1.
func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (RequestChannel, string, QueueIndex) {
func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (Queue, string, QueueIndex) {
uid := lastUserIndex
// Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward
@@ -154,31 +163,22 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI
return nil, "", uid
}
for iters := 0; iters < len(q.tenants); iters++ {
uid = uid + 1
// Don't use "mod len(q.users)", as that could skip users at the beginning of the list
// for example when q.users has shrunk since last call.
if int(uid) >= len(q.tenants) {
uid = 0
}
u := q.tenants[uid]
if u == "" {
continue
for iters := 0; iters < q.mapping.Len(); iters++ {
tq := q.mapping.GetNext(uid)
if tq == nil {
break
}
uid = tq.index
q := q.queues[u]
if q.queriers != nil {
if _, ok := q.queriers[querierID]; !ok {
if tq.queriers != nil {
if _, ok := tq.queriers[querierID]; !ok {
// This querier is not handling the user.
continue
}
}
return q.ch, u, uid
return tq, tq.name, uid
}
return nil, "", uid
}
@@ -284,7 +284,7 @@ func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int {
func (q *tenantQueues) recomputeUserQueriers() {
scratchpad := make([]string, 0, len(q.sortedQueriers))
for _, uq := range q.queues {
for _, uq := range q.mapping.Values() {
uq.queriers = shuffleQueriersForTenants(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad)
}
}

@@ -133,7 +133,7 @@ func TestQueuesWithQueriers(t *testing.T) {
getOrAdd(t, uq, uid, maxQueriersPerUser)
// Verify it has maxQueriersPerUser queriers assigned now.
qs := uq.queues[uid].queriers
qs := uq.mapping.GetByKey(uid).queriers
assert.Equal(t, maxQueriersPerUser, len(qs))
}
@@ -397,7 +397,7 @@ func generateQuerier(r *rand.Rand) string {
return fmt.Sprint("querier-", r.Int()%5)
}
func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) RequestChannel {
func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) Queue {
q := uq.getOrAddQueue(tenant, maxQueriers)
assert.NotNil(t, q)
assert.NoError(t, isConsistent(uq))
@@ -405,8 +405,9 @@ func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) Re
return q
}
func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...RequestChannel) QueueIndex {
var n RequestChannel
func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...Queue) QueueIndex {
t.Helper()
var n Queue
for _, q := range qs {
n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier)
assert.Equal(t, q, n)
@@ -421,24 +422,20 @@ func isConsistent(uq *tenantQueues) error {
}
uc := 0
for ix, u := range uq.tenants {
q := uq.queues[u]
if u != "" && q == nil {
for _, u := range uq.mapping.Keys() {
q := uq.mapping.GetByKey(u)
if u != empty && q == nil {
return fmt.Errorf("user %s doesn't have queue", u)
}
if u == "" && q != nil {
if u == empty && q != nil {
return fmt.Errorf("user %s shouldn't have queue", u)
}
if u == "" {
if u == empty {
continue
}
uc++
if q.index != ix {
return fmt.Errorf("invalid user's index, expected=%d, got=%d", ix, q.index)
}
if q.maxQueriers == 0 && q.queriers != nil {
return fmt.Errorf("user %s has queriers, but maxQueriers=0", u)
}
@@ -452,7 +449,7 @@ func isConsistent(uq *tenantQueues) error {
}
}
if uc != len(uq.queues) {
if uc != uq.mapping.Len() {
return fmt.Errorf("inconsistent number of users list and user queues")
}
@@ -462,7 +459,8 @@ func isConsistent(uq *tenantQueues) error {
// getUsersByQuerier returns the list of users handled by the provided querierID.
func getUsersByQuerier(queues *tenantQueues, querierID string) []string {
var userIDs []string
for userID, q := range queues.queues {
for _, userID := range queues.mapping.Keys() {
q := queues.mapping.GetByKey(userID)
if q.queriers == nil {
// If it's nil then all queriers can handle this user.
userIDs = append(userIDs, userID)

@@ -154,7 +154,12 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests)
metrics := &queue.Metrics{
QueueLength: s.queueLength,
DiscardedRequests: s.discardedRequests,
}
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, metrics)
s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_scheduler_queue_duration_seconds",
