Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
loki/pkg/queue/mapping.go

111 lines
2.2 KiB

Scheduler: Add query fairness control across multiple actors within a tenant (#8752) **What this PR does / why we need it**: This PR wires up the scheduler with the hierarchical queues. It is the last PR to implement https://github.com/grafana/loki/pull/8585. When these changes are in place, the client performing query requests can control their QoS (query fairness) using the `X-Actor-Path` HTTP header. This header controls in which sub-queue of the tenant's scheduler queue the query request is enqueued. The place within the hierarchy where it is enqueued defines the probability with which the request gets dequeued. A common use-case for this QoS control is giving each Grafana user within a tenant their fair share of query execution time. Any documentation is still missing and will be provided by follow-up PRs. **Special notes for your reviewer**: ```console $ gotest -count=1 -v ./pkg/scheduler/queue/... -test.run=TestQueryFairness === RUN TestQueryFairness === RUN TestQueryFairness/use_hierarchical_queues_=_false dequeue_qos_test.go:109: duration actor a 2.007765568s dequeue_qos_test.go:109: duration actor b 2.209088331s dequeue_qos_test.go:112: total duration 2.209280772s === RUN TestQueryFairness/use_hierarchical_queues_=_true dequeue_qos_test.go:109: duration actor b 605.283144ms dequeue_qos_test.go:109: duration actor a 2.270931324s dequeue_qos_test.go:112: total duration 2.271108551s --- PASS: TestQueryFairness (4.48s) --- PASS: TestQueryFairness/use_hierarchical_queues_=_false (2.21s) --- PASS: TestQueryFairness/use_hierarchical_queues_=_true (2.27s) PASS ok github.com/grafana/loki/pkg/scheduler/queue 4.491s ``` ```console $ gotest -count=5 -v ./pkg/scheduler/queue/... 
-bench=Benchmark -test.run=^$ -benchtime=10000x -benchmem goos: linux goarch: amd64 pkg: github.com/grafana/loki/pkg/scheduler/queue cpu: 11th Gen Intel(R) Core(TM) i7-1185G7 @ 3.00GHz BenchmarkGetNextRequest BenchmarkGetNextRequest/without_sub-queues BenchmarkGetNextRequest/without_sub-queues-8 10000 29337 ns/op 1600 B/op 100 allocs/op BenchmarkGetNextRequest/without_sub-queues-8 10000 21348 ns/op 1600 B/op 100 allocs/op BenchmarkGetNextRequest/without_sub-queues-8 10000 21595 ns/op 1600 B/op 100 allocs/op BenchmarkGetNextRequest/without_sub-queues-8 10000 21189 ns/op 1600 B/op 100 allocs/op BenchmarkGetNextRequest/without_sub-queues-8 10000 21602 ns/op 1600 B/op 100 allocs/op BenchmarkGetNextRequest/with_1_level_of_sub-queues BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 10000 33770 ns/op 2400 B/op 200 allocs/op BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 10000 33596 ns/op 2400 B/op 200 allocs/op BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 10000 34432 ns/op 2400 B/op 200 allocs/op BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 10000 33760 ns/op 2400 B/op 200 allocs/op BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 10000 33664 ns/op 2400 B/op 200 allocs/op BenchmarkGetNextRequest/with_2_levels_of_sub-queues BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8 10000 71405 ns/op 3200 B/op 300 allocs/op BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8 10000 59472 ns/op 3200 B/op 300 allocs/op BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8 10000 117163 ns/op 3200 B/op 300 allocs/op BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8 10000 106505 ns/op 3200 B/op 300 allocs/op BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8 10000 64374 ns/op 3200 B/op 300 allocs/op BenchmarkQueueRequest BenchmarkQueueRequest-8 10000 168391 ns/op 320588 B/op 1156 allocs/op BenchmarkQueueRequest-8 10000 166203 ns/op 320587 B/op 1156 allocs/op BenchmarkQueueRequest-8 10000 149518 ns/op 320584 B/op 1156 allocs/op 
BenchmarkQueueRequest-8 10000 219776 ns/op 320583 B/op 1156 allocs/op BenchmarkQueueRequest-8 10000 185198 ns/op 320597 B/op 1156 allocs/op PASS ok github.com/grafana/loki/pkg/scheduler/queue 64.648s ``` Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
2 years ago
package queue
import (
"github.com/pkg/errors"
)
var (
	// ErrOutOfBounds is returned by index-based lookups when there is no
	// item at or after the requested position.
	ErrOutOfBounds = errors.New("queue index out of bounds")

	// empty is the placeholder key, a single NUL byte, that marks a slot
	// whose item has been removed.
	empty = "\x00"
)
// Mapping is a map-like data structure that allows accessing its items not
// only by key but also by index.
// When an item is removed, the internal key array is not resized; instead the
// removed slot is marked as empty. This allows removing keys without
// changing the index of the remaining items after the removed key.
// Keys are strings and values are of any type v that satisfies Mapable.
// The data structure is not thread-safe.
type Mapping[v Mapable] struct {
// m holds the stored items by key.
m map[string]v
// keys holds the key of every slot in index order; a slot whose item was
// removed contains the empty placeholder instead of a key.
keys []string
// empty lists the indexes of removed slots that Put reuses before growing
// keys.
empty []QueueIndex
}
// Init allocates the internal containers of the mapping, each with room for
// size entries. Whatever the mapping held before the call is discarded.
func (m *Mapping[v]) Init(size int) {
	*m = Mapping[v]{
		m:     make(map[string]v, size),
		keys:  make([]string, 0, size),
		empty: make([]QueueIndex, 0, size),
	}
}
// Put stores value under key and reports whether it was stored.
//
// The empty string and the single-NUL-byte placeholder are reserved and are
// rejected with false. For a new key, the value is placed in the oldest freed
// slot if one exists, otherwise in a new slot at the end. If key is already
// present, its value is replaced in place and the key keeps its slot, so the
// indexes of all items stay stable and no duplicate slot is created.
// In every successful case the slot index is recorded on value via SetPos.
//
// Put may be called on a Mapping that was never initialised with Init.
func (m *Mapping[v]) Put(key string, value v) bool {
	// do not allow empty string or 0 byte string as key
	if key == "" || key == empty {
		return false
	}

	// Writing to a nil map panics, so allocate it lazily for the zero value.
	if m.m == nil {
		m.m = make(map[string]v)
	}

	// Existing key: overwrite the value but reuse its slot. Taking a second
	// slot would leave a stale duplicate in keys and make Len over-count.
	if prev, ok := m.m[key]; ok {
		value.SetPos(prev.Pos())
		m.m[key] = value
		return true
	}

	if len(m.empty) > 0 {
		// Reuse the slot that was freed first.
		idx := m.empty[0]
		m.empty = m.empty[1:]
		m.keys[idx] = key
		value.SetPos(idx)
	} else {
		value.SetPos(QueueIndex(len(m.keys)))
		m.keys = append(m.keys, key)
	}
	m.m[key] = value
	return true
}
// Get returns the item stored in slot idx.
//
// It returns nil when idx does not address an existing slot (negative, or at
// or beyond the number of slots) and when the slot's item has been removed.
func (m *Mapping[v]) Get(idx QueueIndex) v {
	// Bounds-check explicitly: indexing keys with an out-of-range value
	// would panic instead of reporting "no item".
	i := int(idx)
	if i < 0 || i >= len(m.keys) {
		return nil
	}
	// GetByKey maps the placeholder of a removed slot to nil.
	return m.GetByKey(m.keys[i])
}
// GetNext returns the first item stored in a slot after idx, skipping slots
// whose item has been removed. Passing -1 starts the search at the first
// slot.
//
// It returns ErrOutOfBounds when the mapping holds no items, when idx is
// below -1, and when no occupied slot follows idx.
func (m *Mapping[v]) GetNext(idx QueueIndex) (v, error) {
	if m.Len() == 0 {
		return nil, ErrOutOfBounds
	}

	next := int(idx) + 1
	// An index below -1 would address a negative slot and panic on the
	// slice access below; report it as out of bounds instead.
	if next < 0 {
		return nil, ErrOutOfBounds
	}

	for ; next < len(m.keys); next++ {
		if k := m.keys[next]; k != empty {
			return m.GetByKey(k), nil
		}
	}
	return nil, ErrOutOfBounds
}
// GetByKey looks up the item stored under key. It returns nil when the key
// is not present, and also for the two reserved keys (the empty string and
// the single-NUL-byte placeholder), which are never looked up in the map.
func (m *Mapping[v]) GetByKey(key string) v {
	switch key {
	case "", empty:
		// Reserved keys never refer to a stored item.
		return nil
	default:
		return m.m[key]
	}
}
// Remove deletes the item stored under key and reports whether an item was
// removed. The item's slot is kept but overwritten with the placeholder, and
// its index is queued for reuse, so the indexes of the other items do not
// change.
func (m *Mapping[v]) Remove(key string) bool {
	item := m.m[key]
	if item == nil {
		// Nothing stored under this key.
		return false
	}

	delete(m.m, key)

	// Free the slot without shrinking keys.
	pos := item.Pos()
	m.keys[pos] = empty
	m.empty = append(m.empty, pos)
	return true
}
// Keys returns the internal key slice of the mapping, one entry per slot in
// index order. The slice is returned as-is, without copying or filtering, so
// slots whose item has been removed still appear in it, holding the
// single-NUL-byte placeholder instead of a key.
func (m *Mapping[v]) Keys() []string {
return m.keys
}
// Values returns the stored items in slot order, leaving out the slots whose
// item has been removed. The result is a newly allocated slice.
func (m *Mapping[v]) Values() []v {
	out := make([]v, 0, len(m.keys))
	for i := range m.keys {
		// Only occupied slots contribute a value.
		if key := m.keys[i]; key != empty {
			out = append(out, m.m[key])
		}
	}
	return out
}
// Len returns the number of items currently stored: the total number of
// slots minus the slots that have been freed and not yet reused.
func (m *Mapping[v]) Len() int {
	slots, free := len(m.keys), len(m.empty)
	return slots - free
}