Revert "Implement hierarchical queues for query scheduler" (#8796)

Reverts grafana/loki#8691

These changes caused problems with the scheduler in our dev environments.

I will conduct further testing and include the reverted changes in https://github.com/grafana/loki/pull/8752
tpatterson/cache-json-label-values
Christian Haudum 3 years ago committed by GitHub
parent 61311957a3
commit 22b672986a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      pkg/lokifrontend/frontend/v1/frontend.go
  2. 11
      pkg/lokifrontend/frontend/v1/frontend_test.go
  3. 137
      pkg/scheduler/queue/leafqueue.go
  4. 171
      pkg/scheduler/queue/leafqueue_test.go
  5. 117
      pkg/scheduler/queue/mapping.go
  6. 85
      pkg/scheduler/queue/mapping_test.go
  7. 30
      pkg/scheduler/queue/queue.go
  8. 57
      pkg/scheduler/queue/queue_test.go
  9. 130
      pkg/scheduler/queue/tenant_queues.go
  10. 28
      pkg/scheduler/queue/tenant_queues_test.go
  11. 7
      pkg/scheduler/scheduler.go

@ -99,12 +99,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
}),
}
metrics := &queue.Metrics{
QueueLength: f.queueLength,
DiscardedRequests: f.discardedRequests,
}
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, metrics)
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)
var err error

@ -126,13 +126,12 @@ func TestFrontendCheckReady(t *testing.T) {
{"no url, no clients is not ready", 0, "not ready: number of queriers connected to query-frontend is 0", false},
} {
t.Run(tt.name, func(t *testing.T) {
m := &queue.Metrics{
QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
}
f := &Frontend{
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5, 0, m),
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
),
}
for i := 0; i < tt.connectedClients; i++ {
f.requestQueue.RegisterQuerierConnection("test")

@ -1,137 +0,0 @@
package queue
import (
"fmt"
"strings"
)
// QueuePath describes the route to a queue inside a hierarchical queue tree,
// one name per level, e.g. {"tenant-a", "actor-b"}.
type QueuePath []string //nolint:revive

// LeafQueue is an hierarchical queue implementation where each sub-queue
// has the same guarantees to be chosen from.
// Each queue has also a local queue, which gets chosen from first. Only if the
// local queue is empty, items from the sub-queues are dequeued.
type LeafQueue struct {
	// local queue; items here are dequeued before any sub-queue is visited
	ch RequestChannel
	// index of where this item is located in the parent's mapping
	pos QueueIndex
	// index of the sub-queue that was dequeued from last; acts as the
	// round-robin cursor for Dequeue
	current QueueIndex
	// mapping for sub-queues, addressable both by name and by index
	mapping *Mapping[*LeafQueue]
	// name of the queue
	name string
	// maximum queue size of the local queue; also inherited by sub-queues
	// created via getOrCreate
	size int
}
// newLeafQueue constructs a LeafQueue with a buffered local channel of the
// given size, an empty sub-queue mapping, and both index cursors set to
// StartIndex.
func newLeafQueue(size int, name string) *LeafQueue {
	mapping := &Mapping[*LeafQueue]{}
	mapping.Init(64) // TODO(chaudum): What is a good initial value?
	return &LeafQueue{
		name:    name,
		size:    size,
		ch:      make(RequestChannel, size),
		pos:     StartIndex,
		current: StartIndex,
		mapping: mapping,
	}
}
// add recursively creates the queues along the given path.
// It returns the direct sub-queue for the first path element, or nil for an
// empty path.
func (q *LeafQueue) add(ident QueuePath) *LeafQueue {
	if len(ident) == 0 {
		return nil
	}
	head, tail := ident[0], ident[1:]
	subq, created := q.getOrCreate(head)
	if created {
		q.mapping.Put(subq.Name(), subq)
	}
	if len(tail) > 0 {
		// descend and create the remaining levels
		subq.add(tail)
	}
	return subq
}
// getOrCreate looks up the direct sub-queue with the given name, creating a
// new one (with the parent's local queue size) when it does not exist yet.
// created reports whether a new queue was allocated; the caller is
// responsible for registering it in the mapping.
func (q *LeafQueue) getOrCreate(ident string) (subq *LeafQueue, created bool) {
	if subq = q.mapping.GetByKey(ident); subq == nil {
		subq = newLeafQueue(q.size, ident)
		created = true
	}
	return subq, created
}
// Chan implements Queue.
// It exposes the local request channel for enqueueing directly into this
// queue (as opposed to one of its sub-queues).
func (q *LeafQueue) Chan() RequestChannel {
	return q.ch
}
// Dequeue implements Queue.
// It prefers items from the local channel; only when that is empty are the
// sub-queues visited, in round-robin order starting after the sub-queue that
// was dequeued from last (tracked in q.current). A sub-queue whose Dequeue
// returns nil is fully drained (including its own sub-queues) and is removed
// from the mapping. Returns nil when no item is available anywhere in the
// tree.
func (q *LeafQueue) Dequeue() Request {
	// first, return item from local channel
	if len(q.ch) > 0 {
		return <-q.ch
	}
	// only if there are no items queued in the local queue, dequeue from sub-queues
	// bound the scan by the current number of sub-queues
	maxIter := q.mapping.Len()
	for iters := 0; iters < maxIter; iters++ {
		subq := q.mapping.GetNext(q.current)
		if subq != nil {
			// advance the round-robin cursor to this sub-queue
			q.current = subq.pos
			item := subq.Dequeue()
			if item != nil {
				return item
			}
			// sub-queue is empty: drop it so it no longer takes part in
			// the round-robin
			q.mapping.Remove(subq.name)
		}
	}
	return nil
}
// Name implements Queue.
// The name doubles as the key under which this queue is registered in its
// parent's mapping.
func (q *LeafQueue) Name() string {
	return q.name
}
// Len implements Queue.
// It returns the number of items in the local queue plus all sub-queues,
// computed recursively.
// This may be expensive depending on the size of the queue tree.
func (q *LeafQueue) Len() int {
	total := len(q.ch)
	for _, sub := range q.mapping.Values() {
		total += sub.Len()
	}
	return total
}
// Pos implements Mapable.
// It returns this queue's slot index within its parent's mapping.
func (q *LeafQueue) Pos() QueueIndex {
	return q.pos
}
// SetPos implements Mapable.
// It is called by Mapping.Put when the queue is assigned a slot.
func (q *LeafQueue) SetPos(index QueueIndex) {
	q.pos = index
}
// String makes the queue printable for debugging; it renders the queue's
// name, fill level, and all sub-queues recursively.
func (q *LeafQueue) String() string {
	var sb strings.Builder
	fmt.Fprintf(&sb, "{name=%s, len=%d/%d, leafs=[", q.Name(), q.Len(), cap(q.ch))
	for i, sub := range q.mapping.Values() {
		if i > 0 {
			sb.WriteString(",")
		}
		sb.WriteString(sub.String())
	}
	sb.WriteString("]}")
	return sb.String()
}

@ -1,171 +0,0 @@
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
// dummyRequest is a minimal Request payload used by the queue tests; it
// carries only an integer id so dequeue order can be asserted.
type dummyRequest struct {
	id int
}

// r is a shorthand constructor for dummyRequest.
func r(id int) *dummyRequest {
	return &dummyRequest{id: id}
}
// TestLeafQueue verifies the hierarchical queue behavior: recursive path
// creation, preference of local items over sub-queue items, round-robin
// scheduling across sibling sub-queues, and removal of drained sub-queues.
func TestLeafQueue(t *testing.T) {
	t.Run("add sub queues recursively", func(t *testing.T) {
		pathA := QueuePath([]string{"l0", "l1", "l3"})
		pathB := QueuePath([]string{"l0", "l2", "l3"})
		q := newLeafQueue(1, "root")
		require.NotNil(t, q)
		require.Equal(t, "root", q.Name())
		require.Equal(t, 0, q.Len())
		require.Equal(t, 0, q.mapping.Len())
		q.add(pathA)
		require.Equal(t, 1, q.mapping.Len())
		// pathB shares its first element "l0" with pathA, so the root still
		// has exactly one direct sub-queue.
		q.add(pathB)
		require.Equal(t, 1, q.mapping.Len())
	})
	t.Run("enqueue/dequeue to/from subqueues", func(t *testing.T) {
		// Queue tree and the item enqueued into each node's local channel:
		/**
		root: [0]
		  a: [1]
		  b: [2]
		    b0: [20]
		    b1: [21]
		  c: [3]
		    c0: [30]
		      c00: [300]
		      c01: [301]
		    c1: [31]
		      c10: [310]
		      c11: [311]
		**/
		paths := []QueuePath{
			QueuePath([]string{"a"}),
			QueuePath([]string{"b", "b0"}),
			QueuePath([]string{"b", "b1"}),
			QueuePath([]string{"c", "c0", "c00"}),
			QueuePath([]string{"c", "c0", "c01"}),
			QueuePath([]string{"c", "c1", "c10"}),
			QueuePath([]string{"c", "c1", "c11"}),
		}
		q := newLeafQueue(10, "root")
		require.NotNil(t, q)
		for _, p := range paths {
			q.add(p)
		}
		// root has exactly three direct sub-queues: a, b, c
		require.Equal(t, 3, q.mapping.Len())
		// no items in any queues
		require.Equal(t, 0, q.Len())
		q.Chan() <- r(0)
		require.Equal(t, 1, q.Len())
		q.mapping.GetByKey("a").Chan() <- r(1)
		require.Equal(t, 2, q.Len())
		q.mapping.GetByKey("b").Chan() <- r(2)
		q.mapping.GetByKey("b").mapping.GetByKey("b0").Chan() <- r(20)
		q.mapping.GetByKey("b").mapping.GetByKey("b1").Chan() <- r(21)
		require.Equal(t, 5, q.Len())
		q.mapping.GetByKey("c").Chan() <- r(3)
		q.mapping.GetByKey("c").mapping.GetByKey("c0").Chan() <- r(30)
		q.mapping.GetByKey("c").mapping.GetByKey("c0").mapping.GetByKey("c00").Chan() <- r(300)
		q.mapping.GetByKey("c").mapping.GetByKey("c0").mapping.GetByKey("c01").Chan() <- r(301)
		q.mapping.GetByKey("c").mapping.GetByKey("c1").Chan() <- r(31)
		q.mapping.GetByKey("c").mapping.GetByKey("c1").mapping.GetByKey("c10").Chan() <- r(310)
		q.mapping.GetByKey("c").mapping.GetByKey("c1").mapping.GetByKey("c11").Chan() <- r(311)
		require.Equal(t, 12, q.Len())
		t.Log(q)
		items := make([]int, 0, q.Len())
		for q.Len() > 0 {
			r := q.Dequeue()
			if r == nil {
				continue
			}
			items = append(items, r.(*dummyRequest).id)
		}
		require.Len(t, items, 12)
		// Local items drain first at each level; siblings are served one
		// item per round, descending into deeper levels as shallower local
		// queues empty out.
		require.Equal(t, []int{0, 1, 2, 3, 20, 30, 21, 31, 300, 310, 301, 311}, items)
	})
	t.Run("dequeue ensure round-robin", func(t *testing.T) {
		// Three flat sub-queues with unequal item counts:
		/**
		root:
		  a: [100, 101, 102]
		  b: [200]
		  c: [300, 301]
		**/
		paths := []QueuePath{
			QueuePath([]string{"a"}),
			QueuePath([]string{"b"}),
			QueuePath([]string{"c"}),
		}
		q := newLeafQueue(10, "root")
		require.NotNil(t, q)
		for _, p := range paths {
			q.add(p)
		}
		require.Equal(t, 3, q.mapping.Len())
		// no items in any queues
		require.Equal(t, 0, q.Len())
		q.mapping.GetByKey("a").Chan() <- r(100)
		q.mapping.GetByKey("a").Chan() <- r(101)
		q.mapping.GetByKey("a").Chan() <- r(102)
		q.mapping.GetByKey("b").Chan() <- r(200)
		q.mapping.GetByKey("c").Chan() <- r(300)
		q.mapping.GetByKey("c").Chan() <- r(301)
		t.Log(q)
		items := make([]int, 0, q.Len())
		for q.Len() > 0 {
			r := q.Dequeue()
			if r == nil {
				continue
			}
			items = append(items, r.(*dummyRequest).id)
		}
		require.Len(t, items, 6)
		// b is exhausted (and removed) after its single item, so later
		// rounds alternate between a and c only.
		require.Equal(t, []int{100, 200, 300, 101, 301, 102}, items)
	})
	t.Run("empty sub-queues are removed", func(t *testing.T) {
		q := newLeafQueue(10, "root")
		q.add(QueuePath{"a"})
		q.add(QueuePath{"b"})
		q.mapping.GetByKey("a").Chan() <- r(1)
		q.mapping.GetByKey("b").Chan() <- r(2)
		t.Log(q)
		// drain queue
		r := q.Dequeue()
		for r != nil {
			r = q.Dequeue()
		}
		// drained sub-queues must have been dropped from the mapping
		require.Nil(t, q.mapping.GetByKey("a"))
		require.Nil(t, q.mapping.GetByKey("b"))
	})
}

@ -1,117 +0,0 @@
package queue
// Mapable is the constraint for values stored in a Mapping. Implementations
// must expose (and accept) their slot position within the mapping's key
// list, because Go does not allow declaring methods on a type-parameter
// union directly; see
// https://github.com/golang/go/issues/48522#issuecomment-924348755
type Mapable interface {
	*tenantQueue | *LeafQueue
	// Pos returns the value's slot index in the mapping's key list.
	Pos() QueueIndex
	// SetPos records the slot index assigned by Mapping.Put.
	SetPos(index QueueIndex)
}
// empty is the sentinel key that marks a free slot in the internal key list.
// It is a single NUL byte; Put rejects it (and "") as user keys so it can
// never collide with a real entry.
var empty = string([]byte{byte(0)})

// Mapping is a map-like data structure that allows accessing its items not
// only by key but also by index.
// When an item is removed, the internal key array is not resized, but the
// removed place is marked as empty. This allows to remove keys without
// changing the index of the remaining items after the removed key.
// Values must satisfy the Mapable constraint; keys are strings.
// The data structure is not thread-safe.
type Mapping[v Mapable] struct {
	// m holds the key → value associations
	m map[string]v
	// keys records slot order; removed entries are replaced with the
	// `empty` sentinel instead of being shifted, keeping indices stable
	keys []string
	// empty lists the indices of free slots in keys, reused by Put (FIFO)
	empty []QueueIndex
}
// Init prepares the mapping for use, pre-allocating the internal map and
// slices for the given expected number of entries. It must be called before
// Put, since writing to a nil map would panic.
func (m *Mapping[v]) Init(size int) {
	m.m = make(map[string]v, size)
	m.keys = make([]string, 0, size)
	m.empty = make([]QueueIndex, 0, size)
}
// Put stores value under key and assigns the value its slot index via
// SetPos. A previously freed slot is reused (oldest first) before the key
// list is grown. It reports whether the value was stored; the reserved keys
// "" and the NUL sentinel are rejected.
func (m *Mapping[v]) Put(key string, value v) bool {
	// do not allow empty string or 0 byte string as key
	if key == "" || key == empty {
		return false
	}
	if len(m.empty) > 0 {
		// reuse the oldest free slot
		idx := m.empty[0]
		m.empty = m.empty[1:]
		m.keys[idx] = key
		value.SetPos(idx)
	} else {
		// no free slot: append at the end
		value.SetPos(QueueIndex(len(m.keys)))
		m.keys = append(m.keys, key)
	}
	m.m[key] = value
	return true
}
// Get returns the value stored in the slot with the given index, or nil when
// the index is out of range or the slot is empty.
// The original implementation only guarded against an empty key list and
// panicked for any other out-of-range index (including StartIndex = -1);
// the explicit bounds check below returns nil instead, which is what all
// other lookup failures in this type return.
func (m *Mapping[v]) Get(idx QueueIndex) v {
	if idx < 0 || int(idx) >= len(m.keys) {
		return nil
	}
	return m.GetByKey(m.keys[idx])
}
// GetNext returns the value in the first non-empty slot after the given
// index, wrapping around the end of the key list, or nil when the mapping
// holds no values.
//
// Fix: the previous implementation wrapped to 0 only when the start index
// exceeded the list length and then scanned strictly forward. If every slot
// after idx was empty while slots before idx still held values, it returned
// nil and callers (e.g. LeafQueue.Dequeue) would starve those earlier
// queues. The circular scan below visits every slot exactly once.
func (m *Mapping[v]) GetNext(idx QueueIndex) v {
	n := len(m.keys)
	if n == 0 {
		return nil
	}
	// start at the slot after idx; out-of-range start wraps to the beginning
	i := int(idx) + 1
	if i < 0 || i >= n {
		i = 0
	}
	// scan at most n slots, wrapping around, skipping freed (sentinel) slots
	for scanned := 0; scanned < n; scanned++ {
		if k := m.keys[i]; k != empty {
			return m.GetByKey(k)
		}
		i++
		if i >= n {
			i = 0
		}
	}
	return nil
}
// GetByKey returns the value stored under key, or nil for unknown keys and
// for the reserved keys ("" and the NUL sentinel), which can never hold a
// value.
func (m *Mapping[v]) GetByKey(key string) v {
	switch key {
	case "", empty:
		// reserved keys are never stored
		return nil
	}
	return m.m[key]
}
// Remove deletes the value stored under key and marks its slot in the key
// list as free for reuse by a later Put. It reports whether the key was
// present.
func (m *Mapping[v]) Remove(key string) bool {
	value := m.m[key]
	if value == nil {
		return false
	}
	delete(m.m, key)
	pos := value.Pos()
	m.keys[pos] = empty
	m.empty = append(m.empty, pos)
	return true
}
// Keys returns the internal key list, including `empty` sentinel entries
// for removed items. Callers iterating the result must skip sentinel
// values.
func (m *Mapping[v]) Keys() []string {
	return m.keys
}
// Values returns all stored values in slot order, skipping freed slots.
func (m *Mapping[v]) Values() []v {
	out := make([]v, 0, len(m.keys))
	for _, key := range m.keys {
		if key != empty {
			out = append(out, m.m[key])
		}
	}
	return out
}
// Len returns the number of stored items: the key list length minus the
// freed slots that are awaiting reuse.
func (m *Mapping[v]) Len() int {
	return len(m.keys) - len(m.empty)
}

@ -1,85 +0,0 @@
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestQueueMapping exercises the Mapping data structure end to end:
// insertion, slot reuse after removal, key/value iteration order, key-based
// lookup, and index-based traversal via GetNext.
func TestQueueMapping(t *testing.T) {
	// Individual sub-tests in this test case are reflecting a scenario and need
	// to be executed in sequential order.
	m := &Mapping[*LeafQueue]{}
	m.Init(16)
	require.Equal(t, m.Len(), 0)
	t.Run("put item to mapping", func(t *testing.T) {
		q1 := newLeafQueue(10, "queue-1")
		m.Put(q1.Name(), q1)
		require.Equal(t, 1, m.Len())
		require.Equal(t, []string{"queue-1"}, m.Keys())
	})
	t.Run("insert order is preserved if there is no empty slot", func(t *testing.T) {
		q2 := newLeafQueue(10, "queue-2")
		m.Put(q2.Name(), q2)
		require.Equal(t, 2, m.Len())
		require.Equal(t, []string{"queue-1", "queue-2"}, m.Keys())
	})
	t.Run("insert into empty slot if item was removed previously", func(t *testing.T) {
		ok := m.Remove("queue-1")
		require.True(t, ok)
		require.Equal(t, 1, m.Len())
		// queue-3 takes over the slot freed by queue-1 at index 0
		q3 := newLeafQueue(10, "queue-3")
		m.Put(q3.Name(), q3)
		require.Equal(t, 2, m.Len())
		require.Equal(t, []string{"queue-3", "queue-2"}, m.Keys())
	})
	t.Run("insert order is preserved across keys and values", func(t *testing.T) {
		q4 := newLeafQueue(10, "queue-4")
		m.Put(q4.Name(), q4)
		require.Equal(t, 3, m.Len())
		// Values() must line up index-for-index with Keys()
		for idx, v := range m.Values() {
			require.Equal(t, v.Name(), m.Keys()[idx])
		}
	})
	t.Run("get by key", func(t *testing.T) {
		key := "queue-2"
		item := m.GetByKey(key)
		require.Equal(t, key, item.Name())
		require.Equal(t, QueueIndex(1), item.Pos())
	})
	t.Run("get by empty key returns nil", func(t *testing.T) {
		// both reserved keys ("" and the NUL sentinel) must yield nil
		require.Nil(t, m.GetByKey(""))
		require.Nil(t, m.GetByKey(empty))
	})
	t.Run("get next item based on index must not skip when items are removed", func(t *testing.T) {
		item := m.GetNext(StartIndex)
		require.Equal(t, "queue-3", item.Name())
		item = m.GetNext(item.Pos())
		require.Equal(t, "queue-2", item.Name())
		// removing queue-2 leaves a freed slot at index 1, which GetNext
		// must step over without skipping queue-4
		m.Remove(item.Name())
		item = m.GetNext(item.Pos())
		require.Equal(t, "queue-4", item.Name())
	})
	t.Run("get next item out of range returns first item", func(t *testing.T) {
		item := m.GetNext(100)
		require.Equal(t, "queue-3", item.Name())
	})
	t.Run("get next item skips empty slots", func(t *testing.T) {
		item := m.GetNext(100)
		require.Equal(t, "queue-3", item.Name())
		item = m.GetNext(item.Pos())
		require.Equal(t, "queue-4", item.Name())
	})
}

@ -25,9 +25,6 @@ var (
// of RequestQueue.GetNextRequestForQuerier method.
type QueueIndex int // nolint:revive
// StartIndex is the UserIndex that starts iteration over tenant queues from the very first tenant.
var StartIndex QueueIndex = -1
// Modify index to start iteration on the same tenant, for which last queue was returned.
func (ui QueueIndex) ReuseLastIndex() QueueIndex {
if ui < 0 {
@ -36,6 +33,9 @@ func (ui QueueIndex) ReuseLastIndex() QueueIndex {
return ui - 1
}
// StartIndex is the UserIndex that starts iteration over tenant queues from the very first tenant.
var StartIndex QueueIndex = -1
// Request stored into the queue.
type Request any
@ -54,19 +54,17 @@ type RequestQueue struct {
cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
queues *tenantQueues
stopped bool
metrics *Metrics
}
type Metrics struct {
QueueLength *prometheus.GaugeVec // Per tenant and reason.
DiscardedRequests *prometheus.CounterVec // Per tenant.
queueLength *prometheus.GaugeVec // Per tenant and reason.
discardedRequests *prometheus.CounterVec // Per tenant.
}
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue {
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
q := &RequestQueue{
queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay),
connectedQuerierWorkers: atomic.NewInt32(0),
metrics: metrics,
queueLength: queueLength,
discardedRequests: discardedRequests,
}
q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
@ -95,8 +93,8 @@ func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, succ
}
select {
case queue.Chan() <- req:
q.metrics.QueueLength.WithLabelValues(tenant).Inc()
case queue <- req:
q.queueLength.WithLabelValues(tenant).Inc()
q.cond.Broadcast()
// Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns.
if successFn != nil {
@ -104,7 +102,7 @@ func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, succ
}
return nil
default:
q.metrics.DiscardedRequests.WithLabelValues(tenant).Inc()
q.discardedRequests.WithLabelValues(tenant).Inc()
return ErrTooManyRequests
}
}
@ -142,12 +140,12 @@ FindQueue:
// Pick next request from the queue.
for {
request := queue.Dequeue()
if queue.Len() == 0 {
request := <-queue
if len(queue) == 0 {
q.queues.deleteQueue(tenant)
}
q.metrics.QueueLength.WithLabelValues(tenant).Dec()
q.queueLength.WithLabelValues(tenant).Dec()
// Tell close() we've processed a request.
q.cond.Broadcast()

@ -22,11 +22,10 @@ func BenchmarkGetNextRequest(b *testing.B) {
queues := make([]*RequestQueue, 0, b.N)
for n := 0; n < b.N; n++ {
m := &Metrics{
QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
}
queue := NewRequestQueue(maxOutstandingPerTenant, 0, m)
queue := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
queues = append(queues, queue)
for ix := 0; ix < queriers; ix++ {
@ -43,30 +42,29 @@ func BenchmarkGetNextRequest(b *testing.B) {
}
}
}
}
querierNames := make([]string, queriers)
for x := 0; x < queriers; x++ {
querierNames[x] = fmt.Sprintf("querier-%d", x)
}
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < queriers; j++ {
idx := StartIndex
for x := 0; x < maxOutstandingPerTenant*numTenants/queriers; x++ {
r, nidx, err := queues[i].Dequeue(ctx, idx, querierNames[j])
if r == nil {
break
idx := StartIndex
for j := 0; j < maxOutstandingPerTenant*numTenants; j++ {
querier := ""
b:
// Find querier with at least one request to avoid blocking in getNextRequestForQuerier.
for _, q := range queues[i].queues.queues {
for qid := range q.queriers {
querier = qid
break b
}
if err != nil {
b.Fatal(err)
}
idx = nidx
}
_, nidx, err := queues[i].Dequeue(ctx, idx, querier)
if err != nil {
b.Fatal(err)
}
idx = nidx
}
}
}
@ -81,11 +79,10 @@ func BenchmarkQueueRequest(b *testing.B) {
requests := make([]string, 0, numTenants)
for n := 0; n < b.N; n++ {
m := &Metrics{
QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
}
q := NewRequestQueue(maxOutstandingPerTenant, 0, m)
q := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
for ix := 0; ix < queriers; ix++ {
q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
@ -115,11 +112,9 @@ func BenchmarkQueueRequest(b *testing.B) {
func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
const forgetDelay = 3 * time.Second
m := &Metrics{
QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
}
queue := NewRequestQueue(1, forgetDelay, m)
queue := NewRequestQueue(1, forgetDelay,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))
// Start the queue service.
ctx := context.Background()

@ -28,7 +28,12 @@ type querier struct {
// This struct holds tenant queues for pending requests. It also keeps track of connected queriers,
// and mapping between tenants and queriers.
type tenantQueues struct {
mapping *Mapping[*tenantQueue]
queues map[string]*tenantQueue
// List of all tenants with queues, used for iteration when searching for next queue to handle.
// Tenants removed from the middle are replaced with "". To avoid skipping tenants during iteration, we only shrink
// this list when there are ""'s at the end of it.
tenants []string
maxUserQueueSize int
@ -43,19 +48,9 @@ type tenantQueues struct {
sortedQueriers []string
}
type Queue interface {
Chan() RequestChannel
Dequeue() Request
Name() string
Len() int
}
type tenantQueue struct {
ch RequestChannel
// name of the queue (aka tenant)
name string
// If not nil, only these queriers can handle user requests. If nil, all queriers can.
// We set this to nil if number of available queriers <= maxQueriers.
queriers map[string]struct{}
@ -66,44 +61,13 @@ type tenantQueue struct {
seed int64
// Points back to 'users' field in queues. Enables quick cleanup.
index QueueIndex
}
// Chan implements Queue
func (q *tenantQueue) Chan() RequestChannel {
return q.ch
}
// Dequeue implements Queue
func (q *tenantQueue) Dequeue() Request {
return <-q.ch
}
// Name implements Queue
func (q *tenantQueue) Name() string {
return q.name
}
// Len implements Queue
func (q *tenantQueue) Len() int {
return len(q.ch)
}
// Len implements Mapable
func (q *tenantQueue) Pos() QueueIndex {
return q.index
}
// Len implements Mapable
func (q *tenantQueue) SetPos(index QueueIndex) {
q.index = index
index int
}
func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQueues {
mm := &Mapping[*tenantQueue]{}
mm.Init(64)
return &tenantQueues{
mapping: mm,
queues: map[string]*tenantQueue{},
tenants: nil,
maxUserQueueSize: maxUserQueueSize,
forgetDelay: forgetDelay,
queriers: map[string]*querier{},
@ -112,18 +76,29 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue
}
func (q *tenantQueues) len() int {
return q.mapping.Len()
return len(q.queues)
}
func (q *tenantQueues) deleteQueue(tenant string) {
q.mapping.Remove(tenant)
uq := q.queues[tenant]
if uq == nil {
return
}
delete(q.queues, tenant)
q.tenants[uq.index] = ""
// Shrink users list size if possible. This is safe, and no users will be skipped during iteration.
for ix := len(q.tenants) - 1; ix >= 0 && q.tenants[ix] == ""; ix-- {
q.tenants = q.tenants[:ix]
}
}
// Returns existing or new queue for a tenant.
// MaxQueriers is used to compute which queriers should handle requests for this tenant.
// If maxQueriers is <= 0, all queriers can handle this tenant's requests.
// If maxQueriers has changed since the last call, queriers for this are recomputed.
func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) Queue {
func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChannel {
// Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot).
if tenant == "" {
return nil
@ -133,14 +108,30 @@ func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) Queue {
maxQueriers = 0
}
uq := q.mapping.GetByKey(tenant)
uq := q.queues[tenant]
if uq == nil {
uq = &tenantQueue{
ch: make(RequestChannel, q.maxUserQueueSize),
seed: util.ShuffleShardSeed(tenant, ""),
name: tenant,
ch: make(RequestChannel, q.maxUserQueueSize),
seed: util.ShuffleShardSeed(tenant, ""),
index: -1,
}
q.queues[tenant] = uq
// Add user to the list of users... find first free spot, and put it there.
for ix, u := range q.tenants {
if u == "" {
uq.index = ix
q.tenants[ix] = tenant
break
}
}
// ... or add to the end.
if uq.index < 0 {
uq.index = len(q.tenants)
q.tenants = append(q.tenants, tenant)
}
q.mapping.Put(tenant, uq)
}
if uq.maxQueriers != maxQueriers {
@ -148,13 +139,13 @@ func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) Queue {
uq.queriers = shuffleQueriersForTenants(uq.seed, maxQueriers, q.sortedQueriers, nil)
}
return uq
return uq.ch
}
// Finds next queue for the querier. To support fair scheduling between users, client is expected
// to pass last user index returned by this function as argument. Is there was no previous
// last user index, use -1.
func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (Queue, string, QueueIndex) {
func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (RequestChannel, string, QueueIndex) {
uid := lastUserIndex
// Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward
@ -163,22 +154,31 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI
return nil, "", uid
}
for iters := 0; iters < q.mapping.Len(); iters++ {
tq := q.mapping.GetNext(uid)
if tq == nil {
break
for iters := 0; iters < len(q.tenants); iters++ {
uid = uid + 1
// Don't use "mod len(q.users)", as that could skip users at the beginning of the list
// for example when q.users has shrunk since last call.
if int(uid) >= len(q.tenants) {
uid = 0
}
u := q.tenants[uid]
if u == "" {
continue
}
uid = tq.index
if tq.queriers != nil {
if _, ok := tq.queriers[querierID]; !ok {
q := q.queues[u]
if q.queriers != nil {
if _, ok := q.queriers[querierID]; !ok {
// This querier is not handling the user.
continue
}
}
return tq, tq.name, uid
}
return q.ch, u, uid
}
return nil, "", uid
}
@ -284,7 +284,7 @@ func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int {
func (q *tenantQueues) recomputeUserQueriers() {
scratchpad := make([]string, 0, len(q.sortedQueriers))
for _, uq := range q.mapping.Values() {
for _, uq := range q.queues {
uq.queriers = shuffleQueriersForTenants(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad)
}
}

@ -133,7 +133,7 @@ func TestQueuesWithQueriers(t *testing.T) {
getOrAdd(t, uq, uid, maxQueriersPerUser)
// Verify it has maxQueriersPerUser queriers assigned now.
qs := uq.mapping.GetByKey(uid).queriers
qs := uq.queues[uid].queriers
assert.Equal(t, maxQueriersPerUser, len(qs))
}
@ -397,7 +397,7 @@ func generateQuerier(r *rand.Rand) string {
return fmt.Sprint("querier-", r.Int()%5)
}
func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) Queue {
func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) RequestChannel {
q := uq.getOrAddQueue(tenant, maxQueriers)
assert.NotNil(t, q)
assert.NoError(t, isConsistent(uq))
@ -405,9 +405,8 @@ func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) Qu
return q
}
func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...Queue) QueueIndex {
t.Helper()
var n Queue
func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...RequestChannel) QueueIndex {
var n RequestChannel
for _, q := range qs {
n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier)
assert.Equal(t, q, n)
@ -422,20 +421,24 @@ func isConsistent(uq *tenantQueues) error {
}
uc := 0
for _, u := range uq.mapping.Keys() {
q := uq.mapping.GetByKey(u)
if u != empty && q == nil {
for ix, u := range uq.tenants {
q := uq.queues[u]
if u != "" && q == nil {
return fmt.Errorf("user %s doesn't have queue", u)
}
if u == empty && q != nil {
if u == "" && q != nil {
return fmt.Errorf("user %s shouldn't have queue", u)
}
if u == empty {
if u == "" {
continue
}
uc++
if q.index != ix {
return fmt.Errorf("invalid user's index, expected=%d, got=%d", ix, q.index)
}
if q.maxQueriers == 0 && q.queriers != nil {
return fmt.Errorf("user %s has queriers, but maxQueriers=0", u)
}
@ -449,7 +452,7 @@ func isConsistent(uq *tenantQueues) error {
}
}
if uc != uq.mapping.Len() {
if uc != len(uq.queues) {
return fmt.Errorf("inconsistent number of users list and user queues")
}
@ -459,8 +462,7 @@ func isConsistent(uq *tenantQueues) error {
// getUsersByQuerier returns the list of users handled by the provided querierID.
func getUsersByQuerier(queues *tenantQueues, querierID string) []string {
var userIDs []string
for _, userID := range queues.mapping.Keys() {
q := queues.mapping.GetByKey(userID)
for userID, q := range queues.queues {
if q.queriers == nil {
// If it's nil then all queriers can handle this user.
userIDs = append(userIDs, userID)

@ -154,12 +154,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
metrics := &queue.Metrics{
QueueLength: s.queueLength,
DiscardedRequests: s.discardedRequests,
}
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, metrics)
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests)
s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_scheduler_queue_duration_seconds",

Loading…
Cancel
Save