From 22b672986a83c2aa160a9ff37bc953429848987b Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 14 Mar 2023 15:31:48 +0100 Subject: [PATCH] Revert "Implement hierarchical queues for query scheduler" (#8796) Reverts grafana/loki#8691 These changes caused problems with the scheduler in our dev environments. I will conduct further testing and include the reverted changes in https://github.com/grafana/loki/pull/8752 --- pkg/lokifrontend/frontend/v1/frontend.go | 7 +- pkg/lokifrontend/frontend/v1/frontend_test.go | 11 +- pkg/scheduler/queue/leafqueue.go | 137 -------------- pkg/scheduler/queue/leafqueue_test.go | 171 ------------------ pkg/scheduler/queue/mapping.go | 117 ------------ pkg/scheduler/queue/mapping_test.go | 85 --------- pkg/scheduler/queue/queue.go | 30 ++- pkg/scheduler/queue/queue_test.go | 57 +++--- pkg/scheduler/queue/tenant_queues.go | 130 ++++++------- pkg/scheduler/queue/tenant_queues_test.go | 28 +-- pkg/scheduler/scheduler.go | 7 +- 11 files changed, 127 insertions(+), 653 deletions(-) delete mode 100644 pkg/scheduler/queue/leafqueue.go delete mode 100644 pkg/scheduler/queue/leafqueue_test.go delete mode 100644 pkg/scheduler/queue/mapping.go delete mode 100644 pkg/scheduler/queue/mapping_test.go diff --git a/pkg/lokifrontend/frontend/v1/frontend.go b/pkg/lokifrontend/frontend/v1/frontend.go index 7748f6313f..d409820221 100644 --- a/pkg/lokifrontend/frontend/v1/frontend.go +++ b/pkg/lokifrontend/frontend/v1/frontend.go @@ -99,12 +99,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist }), } - metrics := &queue.Metrics{ - QueueLength: f.queueLength, - DiscardedRequests: f.discardedRequests, - } - - f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, metrics) + f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests) f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics) var err error diff --git a/pkg/lokifrontend/frontend/v1/frontend_test.go b/pkg/lokifrontend/frontend/v1/frontend_test.go index 5c5ad1912f..54a2d65e6e 100644 --- a/pkg/lokifrontend/frontend/v1/frontend_test.go +++ b/pkg/lokifrontend/frontend/v1/frontend_test.go @@ -126,13 +126,12 @@ func TestFrontendCheckReady(t *testing.T) { {"no url, no clients is not ready", 0, "not ready: number of queriers connected to query-frontend is 0", false}, } { t.Run(tt.name, func(t *testing.T) { - m := &queue.Metrics{ - QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - } f := &Frontend{ - log: log.NewNopLogger(), - requestQueue: queue.NewRequestQueue(5, 0, m), + log: log.NewNopLogger(), + requestQueue: queue.NewRequestQueue(5, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + ), } for i := 0; i < tt.connectedClients; i++ { f.requestQueue.RegisterQuerierConnection("test") diff --git a/pkg/scheduler/queue/leafqueue.go b/pkg/scheduler/queue/leafqueue.go deleted file mode 100644 index eea2545133..0000000000 --- a/pkg/scheduler/queue/leafqueue.go +++ /dev/null @@ -1,137 +0,0 @@ -package queue - -import ( - "fmt" - "strings" -) - -type QueuePath []string //nolint:revive - -// LeafQueue is an hierarchical queue implementation where each sub-queue -// has the same guarantees to be chosen from. -// Each queue has also a local queue, which gets chosen from first. Only if the -// local queue is empty, items from the sub-queues are dequeued. -type LeafQueue struct { - // local queue - ch RequestChannel - // index of where this item is located in the mapping - pos QueueIndex - // index of the sub-queues - current QueueIndex - // mapping for sub-queues - mapping *Mapping[*LeafQueue] - // name of the queue - name string - // maximum queue size of the local queue - size int -} - -// newLeafQueue creates a new LeafQueue instance -func newLeafQueue(size int, name string) *LeafQueue { - m := &Mapping[*LeafQueue]{} - m.Init(64) // TODO(chaudum): What is a good initial value? - return &LeafQueue{ - ch: make(RequestChannel, size), - pos: StartIndex, - current: StartIndex, - mapping: m, - name: name, - size: size, - } -} - -// add recursively adds queues based on given path -func (q *LeafQueue) add(ident QueuePath) *LeafQueue { - if len(ident) == 0 { - return nil - } - curr := ident[0] - queue, created := q.getOrCreate(curr) - if created { - q.mapping.Put(queue.Name(), queue) - } - if len(ident[1:]) > 0 { - queue.add(ident[1:]) - } - return queue -} - -func (q *LeafQueue) getOrCreate(ident string) (subq *LeafQueue, created bool) { - subq = q.mapping.GetByKey(ident) - if subq == nil { - subq = newLeafQueue(q.size, ident) - created = true - } - return subq, created -} - -// Chan implements Queue -func (q *LeafQueue) Chan() RequestChannel { - return q.ch -} - -// Dequeue implements Queue -func (q *LeafQueue) Dequeue() Request { - // first, return item from local channel - if len(q.ch) > 0 { - return <-q.ch - } - - // only if there are no items queued in the local queue, dequeue from sub-queues - maxIter := q.mapping.Len() - for iters := 0; iters < maxIter; iters++ { - subq := q.mapping.GetNext(q.current) - if subq != nil { - q.current = subq.pos - item := subq.Dequeue() - if item != nil { - return item - } - q.mapping.Remove(subq.name) - } - } - return nil -} - -// Name implements Queue -func (q *LeafQueue) Name() string { - return q.name -} - -// Len implements Queue -// It returns the length of the local queue and all sub-queues. -// This may be expensive depending on the size of the queue tree. -func (q *LeafQueue) Len() int { - count := len(q.ch) - for _, subq := range q.mapping.Values() { - count += subq.Len() - } - return count -} - -// Index implements Mapable -func (q *LeafQueue) Pos() QueueIndex { - return q.pos -} - -// Index implements Mapable -func (q *LeafQueue) SetPos(index QueueIndex) { - q.pos = index -} - -// String makes the queue printable -func (q *LeafQueue) String() string { - sb := &strings.Builder{} - sb.WriteString("{") - fmt.Fprintf(sb, "name=%s, len=%d/%d, leafs=[", q.Name(), q.Len(), cap(q.ch)) - subqs := q.mapping.Values() - for i, m := range subqs { - sb.WriteString(m.String()) - if i < len(subqs)-1 { - sb.WriteString(",") - } - } - sb.WriteString("]") - sb.WriteString("}") - return sb.String() -} diff --git a/pkg/scheduler/queue/leafqueue_test.go b/pkg/scheduler/queue/leafqueue_test.go deleted file mode 100644 index ce6e85bb57..0000000000 --- a/pkg/scheduler/queue/leafqueue_test.go +++ /dev/null @@ -1,171 +0,0 @@ -package queue - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -type dummyRequest struct { - id int -} - -func r(id int) *dummyRequest { - return &dummyRequest{id} -} - -func TestLeafQueue(t *testing.T) { - - t.Run("add sub queues recursively", func(t *testing.T) { - pathA := QueuePath([]string{"l0", "l1", "l3"}) - pathB := QueuePath([]string{"l0", "l2", "l3"}) - - q := newLeafQueue(1, "root") - require.NotNil(t, q) - require.Equal(t, "root", q.Name()) - require.Equal(t, 0, q.Len()) - require.Equal(t, 0, q.mapping.Len()) - - q.add(pathA) - require.Equal(t, 1, q.mapping.Len()) - - q.add(pathB) - require.Equal(t, 1, q.mapping.Len()) - }) - - t.Run("enqueue/dequeue to/from subqueues", func(t *testing.T) { - /** - root: [0] - a: [1] - b: [2] - b0: [20] - b1: [21] - c: [3] - c0: [30] - c00: [300] - c01: [301] - c1: [31] - c10: [310] - c11: [311] - **/ - paths := []QueuePath{ - QueuePath([]string{"a"}), - QueuePath([]string{"b", "b0"}), - QueuePath([]string{"b", "b1"}), - QueuePath([]string{"c", "c0", "c00"}), - QueuePath([]string{"c", "c0", "c01"}), - QueuePath([]string{"c", "c1", "c10"}), - QueuePath([]string{"c", "c1", "c11"}), - } - - q := newLeafQueue(10, "root") - require.NotNil(t, q) - for _, p := range paths { - q.add(p) - } - - require.Equal(t, 3, q.mapping.Len()) - - // no items in any queues - require.Equal(t, 0, q.Len()) - - q.Chan() <- r(0) - require.Equal(t, 1, q.Len()) - - q.mapping.GetByKey("a").Chan() <- r(1) - require.Equal(t, 2, q.Len()) - - q.mapping.GetByKey("b").Chan() <- r(2) - q.mapping.GetByKey("b").mapping.GetByKey("b0").Chan() <- r(20) - q.mapping.GetByKey("b").mapping.GetByKey("b1").Chan() <- r(21) - require.Equal(t, 5, q.Len()) - - q.mapping.GetByKey("c").Chan() <- r(3) - q.mapping.GetByKey("c").mapping.GetByKey("c0").Chan() <- r(30) - q.mapping.GetByKey("c").mapping.GetByKey("c0").mapping.GetByKey("c00").Chan() <- r(300) - q.mapping.GetByKey("c").mapping.GetByKey("c0").mapping.GetByKey("c01").Chan() <- r(301) - q.mapping.GetByKey("c").mapping.GetByKey("c1").Chan() <- r(31) - q.mapping.GetByKey("c").mapping.GetByKey("c1").mapping.GetByKey("c10").Chan() <- r(310) - q.mapping.GetByKey("c").mapping.GetByKey("c1").mapping.GetByKey("c11").Chan() <- r(311) - require.Equal(t, 12, q.Len()) - t.Log(q) - - items := make([]int, 0, q.Len()) - - for q.Len() > 0 { - r := q.Dequeue() - if r == nil { - continue - } - items = append(items, r.(*dummyRequest).id) - } - require.Len(t, items, 12) - require.Equal(t, []int{0, 1, 2, 3, 20, 30, 21, 31, 300, 310, 301, 311}, items) - }) - - t.Run("dequeue ensure round-robin", func(t *testing.T) { - /** - root: - a: [100, 101, 102] - b: [200] - c: [300, 301] - **/ - paths := []QueuePath{ - QueuePath([]string{"a"}), - QueuePath([]string{"b"}), - QueuePath([]string{"c"}), - } - - q := newLeafQueue(10, "root") - require.NotNil(t, q) - for _, p := range paths { - q.add(p) - } - - require.Equal(t, 3, q.mapping.Len()) - - // no items in any queues - require.Equal(t, 0, q.Len()) - - q.mapping.GetByKey("a").Chan() <- r(100) - q.mapping.GetByKey("a").Chan() <- r(101) - q.mapping.GetByKey("a").Chan() <- r(102) - q.mapping.GetByKey("b").Chan() <- r(200) - q.mapping.GetByKey("c").Chan() <- r(300) - q.mapping.GetByKey("c").Chan() <- r(301) - - t.Log(q) - - items := make([]int, 0, q.Len()) - - for q.Len() > 0 { - r := q.Dequeue() - if r == nil { - continue - } - items = append(items, r.(*dummyRequest).id) - } - require.Len(t, items, 6) - require.Equal(t, []int{100, 200, 300, 101, 301, 102}, items) - }) - - t.Run("empty sub-queues are removed", func(t *testing.T) { - q := newLeafQueue(10, "root") - q.add(QueuePath{"a"}) - q.add(QueuePath{"b"}) - - q.mapping.GetByKey("a").Chan() <- r(1) - q.mapping.GetByKey("b").Chan() <- r(2) - - t.Log(q) - - // drain queue - r := q.Dequeue() - for r != nil { - r = q.Dequeue() - } - - require.Nil(t, q.mapping.GetByKey("a")) - require.Nil(t, q.mapping.GetByKey("b")) - }) -} diff --git a/pkg/scheduler/queue/mapping.go b/pkg/scheduler/queue/mapping.go deleted file mode 100644 index c85eb1a57b..0000000000 --- a/pkg/scheduler/queue/mapping.go +++ /dev/null @@ -1,117 +0,0 @@ -package queue - -type Mapable interface { - *tenantQueue | *LeafQueue - // https://github.com/golang/go/issues/48522#issuecomment-924348755 - Pos() QueueIndex - SetPos(index QueueIndex) -} - -var empty = string([]byte{byte(0)}) - -// Mapping is a map-like data structure that allows accessing its items not -// only by key but also by index. -// When an item is removed, the iinternal key array is not resized, but the -// removed place is marked as empty. This allows to remove keys without -// changing the index of the remaining items after the removed key. -// Mapping uses *tenantQueue as concrete value and keys of type string. -// The data structure is not thread-safe. -type Mapping[v Mapable] struct { - m map[string]v - keys []string - empty []QueueIndex -} - -func (m *Mapping[v]) Init(size int) { - m.m = make(map[string]v, size) - m.keys = make([]string, 0, size) - m.empty = make([]QueueIndex, 0, size) -} - -func (m *Mapping[v]) Put(key string, value v) bool { - // do not allow empty string or 0 byte string as key - if key == "" || key == empty { - return false - } - if len(m.empty) == 0 { - value.SetPos(QueueIndex(len(m.keys))) - m.keys = append(m.keys, key) - } else { - idx := m.empty[0] - m.empty = m.empty[1:] - m.keys[idx] = key - value.SetPos(idx) - } - m.m[key] = value - return true -} - -func (m *Mapping[v]) Get(idx QueueIndex) v { - if len(m.keys) == 0 { - return nil - } - k := m.keys[idx] - return m.GetByKey(k) -} - -func (m *Mapping[v]) GetNext(idx QueueIndex) v { - if len(m.keys) == 0 { - return nil - } - - // convert to int - i := int(idx) - // proceed to the next index - i = i + 1 - // start from beginning if next index exceeds slice length - if i >= len(m.keys) { - i = 0 - } - - for i < len(m.keys) { - k := m.keys[i] - if k != empty { - return m.GetByKey(k) - } - i++ - } - return nil -} - -func (m *Mapping[v]) GetByKey(key string) v { - // do not allow empty string or 0 byte string as key - if key == "" || key == empty { - return nil - } - return m.m[key] -} - -func (m *Mapping[v]) Remove(key string) bool { - e := m.m[key] - if e == nil { - return false - } - delete(m.m, key) - m.keys[e.Pos()] = empty - m.empty = append(m.empty, e.Pos()) - return true -} - -func (m *Mapping[v]) Keys() []string { - return m.keys -} - -func (m *Mapping[v]) Values() []v { - values := make([]v, 0, len(m.keys)) - for _, k := range m.keys { - if k == empty { - continue - } - values = append(values, m.m[k]) - } - return values -} - -func (m *Mapping[v]) Len() int { - return len(m.keys) - len(m.empty) -} diff --git a/pkg/scheduler/queue/mapping_test.go b/pkg/scheduler/queue/mapping_test.go deleted file mode 100644 index 200e73b99e..0000000000 --- a/pkg/scheduler/queue/mapping_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package queue - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestQueueMapping(t *testing.T) { - // Individual sub-tests in this test case are reflecting a scenario and need - // to be executed in sequential order. - - m := &Mapping[*LeafQueue]{} - m.Init(16) - - require.Equal(t, m.Len(), 0) - - t.Run("put item to mapping", func(t *testing.T) { - q1 := newLeafQueue(10, "queue-1") - m.Put(q1.Name(), q1) - require.Equal(t, 1, m.Len()) - require.Equal(t, []string{"queue-1"}, m.Keys()) - }) - - t.Run("insert order is preserved if there is no empty slot", func(t *testing.T) { - q2 := newLeafQueue(10, "queue-2") - m.Put(q2.Name(), q2) - require.Equal(t, 2, m.Len()) - require.Equal(t, []string{"queue-1", "queue-2"}, m.Keys()) - }) - - t.Run("insert into empty slot if item was removed previously", func(t *testing.T) { - ok := m.Remove("queue-1") - require.True(t, ok) - require.Equal(t, 1, m.Len()) - q3 := newLeafQueue(10, "queue-3") - m.Put(q3.Name(), q3) - require.Equal(t, 2, m.Len()) - require.Equal(t, []string{"queue-3", "queue-2"}, m.Keys()) - }) - - t.Run("insert order is preserved across keys and values", func(t *testing.T) { - q4 := newLeafQueue(10, "queue-4") - m.Put(q4.Name(), q4) - require.Equal(t, 3, m.Len()) - for idx, v := range m.Values() { - require.Equal(t, v.Name(), m.Keys()[idx]) - } - }) - - t.Run("get by key", func(t *testing.T) { - key := "queue-2" - item := m.GetByKey(key) - require.Equal(t, key, item.Name()) - require.Equal(t, QueueIndex(1), item.Pos()) - }) - - t.Run("get by empty key returns nil", func(t *testing.T) { - require.Nil(t, m.GetByKey("")) - require.Nil(t, m.GetByKey(empty)) - }) - - t.Run("get next item based on index must not skip when items are removed", func(t *testing.T) { - item := m.GetNext(StartIndex) - require.Equal(t, "queue-3", item.Name()) - item = m.GetNext(item.Pos()) - require.Equal(t, "queue-2", item.Name()) - m.Remove(item.Name()) - item = m.GetNext(item.Pos()) - require.Equal(t, "queue-4", item.Name()) - }) - - t.Run("get next item out of range returns first item", func(t *testing.T) { - item := m.GetNext(100) - require.Equal(t, "queue-3", item.Name()) - }) - - t.Run("get next item skips empty slots", func(t *testing.T) { - item := m.GetNext(100) - require.Equal(t, "queue-3", item.Name()) - item = m.GetNext(item.Pos()) - require.Equal(t, "queue-4", item.Name()) - }) - -} diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index b5cc69f369..5a72ea8015 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -25,9 +25,6 @@ var ( // of RequestQueue.GetNextRequestForQuerier method. type QueueIndex int // nolint:revive -// StartIndex is the UserIndex that starts iteration over tenant queues from the very first tenant. -var StartIndex QueueIndex = -1 - // Modify index to start iteration on the same tenant, for which last queue was returned. func (ui QueueIndex) ReuseLastIndex() QueueIndex { if ui < 0 { @@ -36,6 +33,9 @@ func (ui QueueIndex) ReuseLastIndex() QueueIndex { return ui - 1 } +// StartIndex is the UserIndex that starts iteration over tenant queues from the very first tenant. +var StartIndex QueueIndex = -1 + // Request stored into the queue. type Request any @@ -54,19 +54,17 @@ type RequestQueue struct { cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected. queues *tenantQueues stopped bool - metrics *Metrics -} -type Metrics struct { - QueueLength *prometheus.GaugeVec // Per tenant and reason. - DiscardedRequests *prometheus.CounterVec // Per tenant. + queueLength *prometheus.GaugeVec // Per tenant and reason. + discardedRequests *prometheus.CounterVec // Per tenant. } -func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue { +func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue { q := &RequestQueue{ queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay), connectedQuerierWorkers: atomic.NewInt32(0), - metrics: metrics, + queueLength: queueLength, + discardedRequests: discardedRequests, } q.cond = contextCond{Cond: sync.NewCond(&q.mtx)} @@ -95,8 +93,8 @@ func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, succ } select { - case queue.Chan() <- req: - q.metrics.QueueLength.WithLabelValues(tenant).Inc() + case queue <- req: + q.queueLength.WithLabelValues(tenant).Inc() q.cond.Broadcast() // Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns. if successFn != nil { @@ -104,7 +102,7 @@ func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, succ } return nil default: - q.metrics.DiscardedRequests.WithLabelValues(tenant).Inc() + q.discardedRequests.WithLabelValues(tenant).Inc() return ErrTooManyRequests } } @@ -142,12 +140,12 @@ FindQueue: // Pick next request from the queue. for { - request := queue.Dequeue() - if queue.Len() == 0 { + request := <-queue + if len(queue) == 0 { q.queues.deleteQueue(tenant) } - q.metrics.QueueLength.WithLabelValues(tenant).Dec() + q.queueLength.WithLabelValues(tenant).Dec() // Tell close() we've processed a request. q.cond.Broadcast() diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 0bd8a6ba26..2e48c1335b 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -22,11 +22,10 @@ func BenchmarkGetNextRequest(b *testing.B) { queues := make([]*RequestQueue, 0, b.N) for n := 0; n < b.N; n++ { - m := &Metrics{ - QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - } - queue := NewRequestQueue(maxOutstandingPerTenant, 0, m) + queue := NewRequestQueue(maxOutstandingPerTenant, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + ) queues = append(queues, queue) for ix := 0; ix < queriers; ix++ { @@ -43,30 +42,29 @@ func BenchmarkGetNextRequest(b *testing.B) { } } } - - } - - querierNames := make([]string, queriers) - for x := 0; x < queriers; x++ { - querierNames[x] = fmt.Sprintf("querier-%d", x) } ctx := context.Background() b.ResetTimer() for i := 0; i < b.N; i++ { - for j := 0; j < queriers; j++ { - idx := StartIndex - for x := 0; x < maxOutstandingPerTenant*numTenants/queriers; x++ { - r, nidx, err := queues[i].Dequeue(ctx, idx, querierNames[j]) - if r == nil { - break + idx := StartIndex + for j := 0; j < maxOutstandingPerTenant*numTenants; j++ { + querier := "" + b: + // Find querier with at least one request to avoid blocking in getNextRequestForQuerier. + for _, q := range queues[i].queues.queues { + for qid := range q.queriers { + querier = qid + break b } - if err != nil { - b.Fatal(err) - } - idx = nidx } + + _, nidx, err := queues[i].Dequeue(ctx, idx, querier) + if err != nil { + b.Fatal(err) + } + idx = nidx } } } @@ -81,11 +79,10 @@ func BenchmarkQueueRequest(b *testing.B) { requests := make([]string, 0, numTenants) for n := 0; n < b.N; n++ { - m := &Metrics{ - QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - } - q := NewRequestQueue(maxOutstandingPerTenant, 0, m) + q := NewRequestQueue(maxOutstandingPerTenant, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + ) for ix := 0; ix < queriers; ix++ { q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix)) @@ -115,11 +112,9 @@ func BenchmarkQueueRequest(b *testing.B) { func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) { const forgetDelay = 3 * time.Second - m := &Metrics{ - QueueLength: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - DiscardedRequests: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), - } - queue := NewRequestQueue(1, forgetDelay, m) + queue := NewRequestQueue(1, forgetDelay, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"})) // Start the queue service. ctx := context.Background() diff --git a/pkg/scheduler/queue/tenant_queues.go b/pkg/scheduler/queue/tenant_queues.go index add29f84fd..d9327432e5 100644 --- a/pkg/scheduler/queue/tenant_queues.go +++ b/pkg/scheduler/queue/tenant_queues.go @@ -28,7 +28,12 @@ type querier struct { // This struct holds tenant queues for pending requests. It also keeps track of connected queriers, // and mapping between tenants and queriers. type tenantQueues struct { - mapping *Mapping[*tenantQueue] + queues map[string]*tenantQueue + + // List of all tenants with queues, used for iteration when searching for next queue to handle. + // Tenants removed from the middle are replaced with "". To avoid skipping tenants during iteration, we only shrink + // this list when there are ""'s at the end of it. + tenants []string maxUserQueueSize int @@ -43,19 +48,9 @@ type tenantQueues struct { sortedQueriers []string } -type Queue interface { - Chan() RequestChannel - Dequeue() Request - Name() string - Len() int -} - type tenantQueue struct { ch RequestChannel - // name of the queue (aka tenant) - name string - // If not nil, only these queriers can handle user requests. If nil, all queriers can. // We set this to nil if number of available queriers <= maxQueriers. queriers map[string]struct{} @@ -66,44 +61,13 @@ type tenantQueue struct { seed int64 // Points back to 'users' field in queues. Enables quick cleanup. - index QueueIndex -} - -// Chan implements Queue -func (q *tenantQueue) Chan() RequestChannel { - return q.ch -} - -// Dequeue implements Queue -func (q *tenantQueue) Dequeue() Request { - return <-q.ch -} - -// Name implements Queue -func (q *tenantQueue) Name() string { - return q.name -} - -// Len implements Queue -func (q *tenantQueue) Len() int { - return len(q.ch) -} - -// Len implements Mapable -func (q *tenantQueue) Pos() QueueIndex { - return q.index -} - -// Len implements Mapable -func (q *tenantQueue) SetPos(index QueueIndex) { - q.index = index + index int } func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQueues { - mm := &Mapping[*tenantQueue]{} - mm.Init(64) return &tenantQueues{ - mapping: mm, + queues: map[string]*tenantQueue{}, + tenants: nil, maxUserQueueSize: maxUserQueueSize, forgetDelay: forgetDelay, queriers: map[string]*querier{}, @@ -112,18 +76,29 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue } func (q *tenantQueues) len() int { - return q.mapping.Len() + return len(q.queues) } func (q *tenantQueues) deleteQueue(tenant string) { - q.mapping.Remove(tenant) + uq := q.queues[tenant] + if uq == nil { + return + } + + delete(q.queues, tenant) + q.tenants[uq.index] = "" + + // Shrink users list size if possible. This is safe, and no users will be skipped during iteration. + for ix := len(q.tenants) - 1; ix >= 0 && q.tenants[ix] == ""; ix-- { + q.tenants = q.tenants[:ix] + } } // Returns existing or new queue for a tenant. // MaxQueriers is used to compute which queriers should handle requests for this tenant. // If maxQueriers is <= 0, all queriers can handle this tenant's requests. // If maxQueriers has changed since the last call, queriers for this are recomputed. -func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) Queue { +func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChannel { // Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot). if tenant == "" { return nil @@ -133,14 +108,30 @@ func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) Queue { maxQueriers = 0 } - uq := q.mapping.GetByKey(tenant) + uq := q.queues[tenant] + if uq == nil { uq = &tenantQueue{ - ch: make(RequestChannel, q.maxUserQueueSize), - seed: util.ShuffleShardSeed(tenant, ""), - name: tenant, + ch: make(RequestChannel, q.maxUserQueueSize), + seed: util.ShuffleShardSeed(tenant, ""), + index: -1, + } + q.queues[tenant] = uq + + // Add user to the list of users... find first free spot, and put it there. + for ix, u := range q.tenants { + if u == "" { + uq.index = ix + q.tenants[ix] = tenant + break + } + } + + // ... or add to the end. + if uq.index < 0 { + uq.index = len(q.tenants) + q.tenants = append(q.tenants, tenant) } - q.mapping.Put(tenant, uq) } if uq.maxQueriers != maxQueriers { @@ -148,13 +139,13 @@ func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) Queue { uq.queriers = shuffleQueriersForTenants(uq.seed, maxQueriers, q.sortedQueriers, nil) } - return uq + return uq.ch } // Finds next queue for the querier. To support fair scheduling between users, client is expected // to pass last user index returned by this function as argument. Is there was no previous // last user index, use -1. -func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (Queue, string, QueueIndex) { +func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (RequestChannel, string, QueueIndex) { uid := lastUserIndex // Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward @@ -163,22 +154,31 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI return nil, "", uid } - for iters := 0; iters < q.mapping.Len(); iters++ { - tq := q.mapping.GetNext(uid) - if tq == nil { - break + for iters := 0; iters < len(q.tenants); iters++ { + uid = uid + 1 + + // Don't use "mod len(q.users)", as that could skip users at the beginning of the list + // for example when q.users has shrunk since last call. + if int(uid) >= len(q.tenants) { + uid = 0 + } + + u := q.tenants[uid] + if u == "" { + continue } - uid = tq.index - if tq.queriers != nil { - if _, ok := tq.queriers[querierID]; !ok { + q := q.queues[u] + + if q.queriers != nil { + if _, ok := q.queriers[querierID]; !ok { // This querier is not handling the user. continue } } - return tq, tq.name, uid - } + return q.ch, u, uid + } return nil, "", uid } @@ -284,7 +284,7 @@ func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int { func (q *tenantQueues) recomputeUserQueriers() { scratchpad := make([]string, 0, len(q.sortedQueriers)) - for _, uq := range q.mapping.Values() { + for _, uq := range q.queues { uq.queriers = shuffleQueriersForTenants(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad) } } diff --git a/pkg/scheduler/queue/tenant_queues_test.go b/pkg/scheduler/queue/tenant_queues_test.go index e79ea91d7a..4f18aae6a0 100644 --- a/pkg/scheduler/queue/tenant_queues_test.go +++ b/pkg/scheduler/queue/tenant_queues_test.go @@ -133,7 +133,7 @@ func TestQueuesWithQueriers(t *testing.T) { getOrAdd(t, uq, uid, maxQueriersPerUser) // Verify it has maxQueriersPerUser queriers assigned now. - qs := uq.mapping.GetByKey(uid).queriers + qs := uq.queues[uid].queriers assert.Equal(t, maxQueriersPerUser, len(qs)) } @@ -397,7 +397,7 @@ func generateQuerier(r *rand.Rand) string { return fmt.Sprint("querier-", r.Int()%5) } -func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) Queue { +func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) RequestChannel { q := uq.getOrAddQueue(tenant, maxQueriers) assert.NotNil(t, q) assert.NoError(t, isConsistent(uq)) @@ -405,9 +405,8 @@ func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) Qu return q } -func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...Queue) QueueIndex { - t.Helper() - var n Queue +func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...RequestChannel) QueueIndex { + var n RequestChannel for _, q := range qs { n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier) assert.Equal(t, q, n) @@ -422,20 +421,24 @@ func isConsistent(uq *tenantQueues) error { } uc := 0 - for _, u := range uq.mapping.Keys() { - q := uq.mapping.GetByKey(u) - if u != empty && q == nil { + for ix, u := range uq.tenants { + q := uq.queues[u] + if u != "" && q == nil { return fmt.Errorf("user %s doesn't have queue", u) } - if u == empty && q != nil { + if u == "" && q != nil { return fmt.Errorf("user %s shouldn't have queue", u) } - if u == empty { + if u == "" { continue } uc++ + if q.index != ix { + return fmt.Errorf("invalid user's index, expected=%d, got=%d", ix, q.index) + } + if q.maxQueriers == 0 && q.queriers != nil { return fmt.Errorf("user %s has queriers, but maxQueriers=0", u) } @@ -449,7 +452,7 @@ func isConsistent(uq *tenantQueues) error { } } - if uc != uq.mapping.Len() { + if uc != len(uq.queues) { return fmt.Errorf("inconsistent number of users list and user queues") } @@ -459,8 +462,7 @@ func isConsistent(uq *tenantQueues) error { // getUsersByQuerier returns the list of users handled by the provided querierID. func getUsersByQuerier(queues *tenantQueues, querierID string) []string { var userIDs []string - for _, userID := range queues.mapping.Keys() { - q := queues.mapping.GetByKey(userID) + for userID, q := range queues.queues { if q.queriers == nil { // If it's nil then all queriers can handle this user. userIDs = append(userIDs, userID) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 783324efe7..c81e393372 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -154,12 +154,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe Name: "cortex_query_scheduler_discarded_requests_total", Help: "Total number of query requests discarded.", }, []string{"user"}) - - metrics := &queue.Metrics{ - QueueLength: s.queueLength, - DiscardedRequests: s.discardedRequests, - } - s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, metrics) + s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests) s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_query_scheduler_queue_duration_seconds",