Scheduler: Add query fairness control across multiple actors within a tenant (#8752)

**What this PR does / why we need it**:

This PR wires up the scheduler with the hierarchical queues. It is the last in the series of PRs implementing https://github.com/grafana/loki/pull/8585.

With these changes in place, a client performing query requests can control its QoS (query fairness) using the `X-Loki-Actor-Path` HTTP header. The header determines in which sub-queue of the tenant's scheduler queue the query request is enqueued; the request's position within the hierarchy defines the probability with which it gets dequeued.

A common use case for this QoS control is giving each Grafana user within a tenant their fair share of query execution time.
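For illustration, tagging a query request could look like the following sketch (the header name and the `|` delimiter are taken from `pkg/util/httpreq/headers.go` in this PR; the endpoint and the actor values are made up):

```go
package main

import "net/http"

// newQueryRequest builds a query-range request whose scheduling is scoped
// to the sub-queue tenant -> user-a -> dashboard-7.
func newQueryRequest() (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet,
		`http://loki:3100/loki/api/v1/query_range?query={job="app"}`, nil)
	if err != nil {
		return nil, err
	}
	// "|" separates the levels of the queue hierarchy.
	req.Header.Set("X-Loki-Actor-Path", "user-a|dashboard-7")
	return req, nil
}
```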

Documentation is still missing and will be provided in follow-up PRs.

**Special notes for your reviewer**:

```console
$ gotest -count=1 -v ./pkg/scheduler/queue/... -test.run=TestQueryFairness
=== RUN   TestQueryFairness
=== RUN   TestQueryFairness/use_hierarchical_queues_=_false
    dequeue_qos_test.go:109: duration actor a 2.007765568s
    dequeue_qos_test.go:109: duration actor b 2.209088331s
    dequeue_qos_test.go:112: total duration 2.209280772s
=== RUN   TestQueryFairness/use_hierarchical_queues_=_true
    dequeue_qos_test.go:109: duration actor b 605.283144ms
    dequeue_qos_test.go:109: duration actor a 2.270931324s
    dequeue_qos_test.go:112: total duration 2.271108551s
--- PASS: TestQueryFairness (4.48s)
    --- PASS: TestQueryFairness/use_hierarchical_queues_=_false (2.21s)
    --- PASS: TestQueryFairness/use_hierarchical_queues_=_true (2.27s)
PASS
ok  	github.com/grafana/loki/pkg/scheduler/queue	4.491s
```

```console
$ gotest -count=5 -v ./pkg/scheduler/queue/... -bench=Benchmark -test.run=^$ -benchtime=10000x -benchmem
goos: linux
goarch: amd64
pkg: github.com/grafana/loki/pkg/scheduler/queue
cpu: 11th Gen Intel(R) Core(TM) i7-1185G7 @ 3.00GHz
BenchmarkGetNextRequest
BenchmarkGetNextRequest/without_sub-queues
BenchmarkGetNextRequest/without_sub-queues-8         	   10000	     29337 ns/op	    1600 B/op	     100 allocs/op
BenchmarkGetNextRequest/without_sub-queues-8         	   10000	     21348 ns/op	    1600 B/op	     100 allocs/op
BenchmarkGetNextRequest/without_sub-queues-8         	   10000	     21595 ns/op	    1600 B/op	     100 allocs/op
BenchmarkGetNextRequest/without_sub-queues-8         	   10000	     21189 ns/op	    1600 B/op	     100 allocs/op
BenchmarkGetNextRequest/without_sub-queues-8         	   10000	     21602 ns/op	    1600 B/op	     100 allocs/op
BenchmarkGetNextRequest/with_1_level_of_sub-queues
BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 	   10000	     33770 ns/op	    2400 B/op	     200 allocs/op
BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 	   10000	     33596 ns/op	    2400 B/op	     200 allocs/op
BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 	   10000	     34432 ns/op	    2400 B/op	     200 allocs/op
BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 	   10000	     33760 ns/op	    2400 B/op	     200 allocs/op
BenchmarkGetNextRequest/with_1_level_of_sub-queues-8 	   10000	     33664 ns/op	    2400 B/op	     200 allocs/op
BenchmarkGetNextRequest/with_2_levels_of_sub-queues
BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8         	   10000	     71405 ns/op	    3200 B/op	     300 allocs/op
BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8         	   10000	     59472 ns/op	    3200 B/op	     300 allocs/op
BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8         	   10000	    117163 ns/op	    3200 B/op	     300 allocs/op
BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8         	   10000	    106505 ns/op	    3200 B/op	     300 allocs/op
BenchmarkGetNextRequest/with_2_levels_of_sub-queues-8         	   10000	     64374 ns/op	    3200 B/op	     300 allocs/op
BenchmarkQueueRequest
BenchmarkQueueRequest-8                                       	   10000	    168391 ns/op	  320588 B/op	    1156 allocs/op
BenchmarkQueueRequest-8                                       	   10000	    166203 ns/op	  320587 B/op	    1156 allocs/op
BenchmarkQueueRequest-8                                       	   10000	    149518 ns/op	  320584 B/op	    1156 allocs/op
BenchmarkQueueRequest-8                                       	   10000	    219776 ns/op	  320583 B/op	    1156 allocs/op
BenchmarkQueueRequest-8                                       	   10000	    185198 ns/op	  320597 B/op	    1156 allocs/op
PASS
ok  	github.com/grafana/loki/pkg/scheduler/queue	64.648s
```

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Changed files (21), with the number of changed lines per file:

- CHANGELOG.md (1)
- docs/sources/configuration/_index.md (5)
- pkg/loki/modules.go (1)
- pkg/lokifrontend/frontend/v1/frontend.go (2)
- pkg/lokifrontend/frontend/v2/frontend.go (11)
- pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go (3)
- pkg/querier/queryrange/codec.go (5)
- pkg/scheduler/queue/dequeue_qos_test.go (183)
- pkg/scheduler/queue/leafqueue.go (154)
- pkg/scheduler/queue/leafqueue_test.go (171)
- pkg/scheduler/queue/mapping.go (117)
- pkg/scheduler/queue/mapping_test.go (92)
- pkg/scheduler/queue/queue.go (25)
- pkg/scheduler/queue/queue_test.go (107)
- pkg/scheduler/queue/tenant_queues.go (112)
- pkg/scheduler/queue/tenant_queues_test.go (39)
- pkg/scheduler/scheduler.go (42)
- pkg/scheduler/schedulerpb/scheduler.pb.go (153)
- pkg/scheduler/schedulerpb/scheduler.proto (2)
- pkg/util/httpreq/headers.go (48)
- pkg/util/httpreq/tags.go (1)

@ -8,6 +8,7 @@
##### Enhancements
* [8752](https://github.com/grafana/loki/pull/8752) **chaudum**: Add query fairness control across actors within a tenant to scheduler, which can be enabled by passing the `X-Loki-Actor-Path` header to the HTTP request of the query.
* [8786](https://github.com/grafana/loki/pull/8786) **DylanGuedes**: Ingester: add new /ingester/prepare_shutdown endpoint.
* [8744](https://github.com/grafana/loki/pull/8744) **dannykopping**: Ruler: remote rule evaluation.
* [8727](https://github.com/grafana/loki/pull/8727) **cstyan** **jeschkies**: Propagate per-request limit header to querier.

@ -522,6 +522,11 @@ The `query_scheduler` block configures the Loki query scheduler. When configured
# CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
[max_outstanding_requests_per_tenant: <int> | default = 100]
# Maximum number of levels of nesting of hierarchical queues. 0 means that
# hierarchical queues are disabled.
# CLI flag: -query-scheduler.max-queue-hierarchy-levels
[max_queue_hierarchy_levels: <int> | default = 3]
# If a querier disconnects without sending notification about graceful shutdown,
# the query-scheduler will keep the querier in the tenant's shard until the
# forget delay has passed. This feature is useful to reduce the blast radius

@ -795,6 +795,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
toMerge := []middleware.Interface{
httpreq.ExtractQueryTagsMiddleware(),
httpreq.PropagateHeadersMiddleware(httpreq.LokiActorPathHeader),
serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,

@ -322,7 +322,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)
err = f.requestQueue.Enqueue(joinedTenantID, req, maxQueriers, nil)
err = f.requestQueue.Enqueue(joinedTenantID, nil, req, maxQueriers, nil)
if err == queue.ErrTooManyRequests {
return errTooManyRequest
}

@ -28,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/querier/stats"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -81,7 +82,8 @@ type Frontend struct {
type frontendRequest struct {
queryID uint64
request *httpgrpc.HTTPRequest
userID string
tenantID string
actor []string
statsEnabled bool
cancel context.CancelFunc
@ -188,7 +190,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)
tenantID := tenant.JoinTenantIDs(tenantIDs)
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
@ -205,7 +207,8 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
freq := &frontendRequest{
queryID: f.lastQueryID.Inc(),
request: req,
userID: userID,
tenantID: tenantID,
actor: httpreq.ExtractActorPath(ctx),
statsEnabled: stats.IsEnabled(ctx),
cancel: cancel,
@ -278,7 +281,7 @@ func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryRes
// It is possible that some old response belonging to different user was received, if frontend has restarted.
// To avoid leaking query results between users, we verify the user here.
// To avoid mixing results from different queries, we randomize queryID counter on start.
if req != nil && req.userID == userID {
if req != nil && req.tenantID == userID {
select {
case req.response <- qrReq:
// Should always be possible, unless QueryResult is called multiple times with the same queryID.

@ -286,7 +286,8 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.ENQUEUE,
QueryID: req.queryID,
UserID: req.userID,
UserID: req.tenantID,
QueuePath: req.actor,
HttpRequest: req.request,
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,

@ -283,6 +283,11 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http
header.Set(string(httpreq.QueryTagsHTTPHeader), queryTags)
}
actor := httpreq.ExtractHeader(ctx, httpreq.LokiActorPathHeader)
if actor != "" {
header.Set(httpreq.LokiActorPathHeader, actor)
}
switch request := r.(type) {
case *LokiRequest:
params := url.Values{

@ -0,0 +1,183 @@
package queue
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
const (
numRequestsPerActor = 1
numQueriers = 5
)
type req struct {
duration time.Duration
tenant string
actor string
queryID int
subQueryID int
}
func enqueueRequestsForActor(t testing.TB, actor []string, useActor bool, queue *RequestQueue, numSubRequests int, d time.Duration) {
tenant := "tenant"
serializedActor := strings.Join(actor, "|")
for x := 0; x < numRequestsPerActor; x++ {
for y := 0; y < numSubRequests; y++ {
r := &req{
duration: d,
queryID: x,
subQueryID: y,
tenant: tenant,
actor: serializedActor,
}
if !useActor {
actor = nil
}
err := queue.Enqueue("tenant", actor, r, 0, nil)
if err != nil {
t.Fatal(err)
}
}
}
}
func BenchmarkQueryFairness(t *testing.B) {
numSubRequestsActorA, numSubRequestsActorB := 123, 45
total := int64((numSubRequestsActorA + numSubRequestsActorA + numSubRequestsActorB) * numRequestsPerActor)
for _, useActor := range []bool{false, true} {
t.Run(fmt.Sprintf("use hierarchical queues = %v", useActor), func(t *testing.B) {
requestQueue := NewRequestQueue(1024, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
enqueueRequestsForActor(t, []string{}, useActor, requestQueue, numSubRequestsActorA, 50*time.Millisecond)
enqueueRequestsForActor(t, []string{"a"}, useActor, requestQueue, numSubRequestsActorA, 100*time.Millisecond)
enqueueRequestsForActor(t, []string{"b"}, useActor, requestQueue, numSubRequestsActorB, 50*time.Millisecond)
requestQueue.queues.recomputeUserQueriers()
// set timeout to minimize impact on overall test run duration in case something goes wrong
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
start := time.Now()
durations := &sync.Map{}
var wg sync.WaitGroup
var responseCount atomic.Int64
// Simulate querier loop
for q := 0; q < numQueriers; q++ {
wg.Add(1)
go func(id string) {
defer wg.Done()
requestQueue.RegisterQuerierConnection(id)
defer requestQueue.UnregisterQuerierConnection(id)
idx := StartIndex
for ctx.Err() == nil {
r, newIdx, err := requestQueue.Dequeue(ctx, idx, id)
if err != nil {
if err != context.Canceled {
t.Log("Dequeue() returned error:", err)
}
break
}
if r == nil {
t.Log("Dequeue() returned nil response")
break
}
res, _ := r.(*req)
idx = newIdx
time.Sleep(res.duration)
count := responseCount.Add(1)
durations.Store(res.actor, time.Since(start))
if count == total {
t.Log("count", count, "total", total)
cancel()
}
}
}(fmt.Sprintf("querier-%d", q))
}
wg.Wait()
require.Equal(t, total, responseCount.Load())
durations.Range(func(k, v any) bool {
t.Log("duration actor", k, v)
return true
})
t.Log("total duration", time.Since(start))
})
}
}
func TestQueryFairnessAcrossSameLevel(t *testing.T) {
/**
`tenant1`, `tenant1|abc`, and `tenant1|xyz` have equal preference
`tenant1|xyz|123` and `tenant1|xyz|456` have equal preference
root:
tenant1: [0, 1, 2]
abc: [10, 11, 12]
xyz: [20, 21, 22]
123: [200]
456: [210]
**/
requestQueue := NewRequestQueue(1024, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
_ = requestQueue.Enqueue("tenant1", []string{}, r(0), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{}, r(1), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{}, r(2), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(10), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(11), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(12), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(20), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(21), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(22), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz", "123"}, r(200), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz", "456"}, r(210), 0, nil)
requestQueue.queues.recomputeUserQueriers()
items := make([]int, 0)
// set timeout to minimize impact on overall test run duration in case something goes wrong
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
requestQueue.RegisterQuerierConnection("querier")
defer requestQueue.UnregisterQuerierConnection("querier")
idx := StartIndexWithLocalQueue
for ctx.Err() == nil {
r, newIdx, err := requestQueue.Dequeue(ctx, idx, "querier")
if err != nil {
if err != context.Canceled {
t.Log("Dequeue() returned error:", err)
}
break
}
if r == nil {
t.Log("Dequeue() returned nil response")
break
}
res, _ := r.(*dummyRequest)
idx = newIdx
items = append(items, res.id)
}
require.Equal(t, []int{0, 10, 20, 1, 11, 200, 2, 12, 210, 21, 22}, items)
}

@ -0,0 +1,154 @@
package queue
import (
"fmt"
"strings"
)
type QueuePath []string //nolint:revive
// LeafQueue is a hierarchical queue implementation where each sub-queue
// has the same guarantee of being chosen from.
// Each queue also has a local queue, which is chosen with the same preference as the sub-queues.
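// For example, adding the path ["user-a", "dashboard-7"] creates the chain
// root -> user-a -> dashboard-7, where every node in the chain also owns a
// local channel that takes part in the round-robin.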
type LeafQueue struct {
// local queue
ch RequestChannel
// index of where this item is located in the mapping
pos QueueIndex
// index of the sub-queues
current QueueIndex
// mapping for sub-queues
mapping *Mapping[*LeafQueue]
// name of the queue
name string
// maximum queue size of the local queue
size int
}
// newLeafQueue creates a new LeafQueue instance
func newLeafQueue(size int, name string) *LeafQueue {
m := &Mapping[*LeafQueue]{}
m.Init(64) // TODO(chaudum): What is a good initial value?
return &LeafQueue{
ch: make(RequestChannel, size),
pos: StartIndexWithLocalQueue,
current: StartIndexWithLocalQueue,
mapping: m,
name: name,
size: size,
}
}
// add recursively adds queues based on given path
func (q *LeafQueue) add(path QueuePath) *LeafQueue {
if len(path) == 0 {
return q
}
curr, remaining := path[0], path[1:]
queue, created := q.getOrCreate(curr)
if created {
q.mapping.Put(queue.Name(), queue)
}
return queue.add(remaining)
}
func (q *LeafQueue) getOrCreate(name string) (subq *LeafQueue, created bool) {
subq = q.mapping.GetByKey(name)
if subq == nil {
subq = newLeafQueue(q.size, name)
created = true
}
return subq, created
}
// Chan implements Queue
func (q *LeafQueue) Chan() RequestChannel {
return q.ch
}
// Dequeue implements Queue
func (q *LeafQueue) Dequeue() Request {
var item Request
// shortcut: if there are no sub-queues,
// always use the local queue
if q.mapping.Len() == 0 {
if len(q.ch) > 0 {
return <-q.ch
}
return nil
}
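// Round-robin over the local queue and the sub-queues: q.current ==
// StartIndexWithLocalQueue (-2) addresses the local channel, indexes >= 0
// address the sub-queues. maxIter bounds one full cycle so the loop
// terminates even when all queues turn out to be empty.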
maxIter := len(q.mapping.keys) + 1
for iters := 0; iters < maxIter; iters++ {
if q.current == StartIndexWithLocalQueue {
q.current++
if len(q.ch) > 0 {
item = <-q.ch
if item != nil {
return item
}
}
}
subq, err := q.mapping.GetNext(q.current)
if err == ErrOutOfBounds {
q.current = StartIndexWithLocalQueue
continue
}
if subq != nil {
q.current = subq.pos
item := subq.Dequeue()
if item != nil {
if subq.Len() == 0 {
q.mapping.Remove(subq.name)
}
return item
}
}
}
return nil
}
// Name implements Queue
func (q *LeafQueue) Name() string {
return q.name
}
// Len implements Queue
// It returns the length of the local queue and all sub-queues.
// This may be expensive depending on the size of the queue tree.
func (q *LeafQueue) Len() int {
count := len(q.ch)
for _, subq := range q.mapping.Values() {
count += subq.Len()
}
return count
}
// Pos implements Mapable
func (q *LeafQueue) Pos() QueueIndex {
return q.pos
}
// SetPos implements Mapable
func (q *LeafQueue) SetPos(index QueueIndex) {
q.pos = index
}
// String makes the queue printable
func (q *LeafQueue) String() string {
sb := &strings.Builder{}
sb.WriteString("{")
fmt.Fprintf(sb, "name=%s, len=%d/%d, leafs=[", q.Name(), q.Len(), cap(q.ch))
subqs := q.mapping.Values()
for i, m := range subqs {
sb.WriteString(m.String())
if i < len(subqs)-1 {
sb.WriteString(",")
}
}
sb.WriteString("]")
sb.WriteString("}")
return sb.String()
}
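To make the mechanics concrete, here is a minimal usage sketch for `LeafQueue` (a sketch placed in package `queue` for illustration; the sizes, names, and the request value are made up):

```go
package queue

func exampleLeafQueue() Request {
	// root queue of a tenant with a local channel capacity of 16
	root := newLeafQueue(16, "tenant1")

	// create (or reuse) the chain root -> "user-a" -> "dashboard-7"
	leaf := root.add(QueuePath{"user-a", "dashboard-7"})

	// enqueue two levels deep
	leaf.Chan() <- Request("query")

	// Dequeue round-robins over the local queue and the sub-queues,
	// descending recursively until it finds an item.
	return root.Dequeue() // yields Request("query")
}
```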

@ -0,0 +1,171 @@
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
type dummyRequest struct {
id int
}
func r(id int) *dummyRequest {
return &dummyRequest{id}
}
func TestLeafQueue(t *testing.T) {
t.Run("add sub queues recursively", func(t *testing.T) {
pathA := QueuePath([]string{"l0", "l1", "l3"})
pathB := QueuePath([]string{"l0", "l2", "l3"})
q := newLeafQueue(1, "root")
require.NotNil(t, q)
require.Equal(t, "root", q.Name())
require.Equal(t, 0, q.Len())
require.Equal(t, 0, q.mapping.Len())
q.add(pathA)
require.Equal(t, 1, q.mapping.Len())
q.add(pathB)
require.Equal(t, 1, q.mapping.Len())
})
t.Run("enqueue/dequeue to/from subqueues", func(t *testing.T) {
/**
root: [0]
a: [1]
b: [2]
b0: [20]
b1: [21]
c: [3]
c0: [30]
c00: [300]
c01: [301]
c1: [31]
c10: [310]
c11: [311]
**/
paths := []QueuePath{
QueuePath([]string{"a"}),
QueuePath([]string{"b", "b0"}),
QueuePath([]string{"b", "b1"}),
QueuePath([]string{"c", "c0", "c00"}),
QueuePath([]string{"c", "c0", "c01"}),
QueuePath([]string{"c", "c1", "c10"}),
QueuePath([]string{"c", "c1", "c11"}),
}
q := newLeafQueue(10, "root")
require.NotNil(t, q)
for _, p := range paths {
q.add(p)
}
require.Equal(t, 3, q.mapping.Len())
// no items in any queues
require.Equal(t, 0, q.Len())
q.Chan() <- r(0)
require.Equal(t, 1, q.Len())
q.mapping.GetByKey("a").Chan() <- r(1)
require.Equal(t, 2, q.Len())
q.mapping.GetByKey("b").Chan() <- r(2)
q.mapping.GetByKey("b").mapping.GetByKey("b0").Chan() <- r(20)
q.mapping.GetByKey("b").mapping.GetByKey("b1").Chan() <- r(21)
require.Equal(t, 5, q.Len())
q.mapping.GetByKey("c").Chan() <- r(3)
q.mapping.GetByKey("c").mapping.GetByKey("c0").Chan() <- r(30)
q.mapping.GetByKey("c").mapping.GetByKey("c0").mapping.GetByKey("c00").Chan() <- r(300)
q.mapping.GetByKey("c").mapping.GetByKey("c0").mapping.GetByKey("c01").Chan() <- r(301)
q.mapping.GetByKey("c").mapping.GetByKey("c1").Chan() <- r(31)
q.mapping.GetByKey("c").mapping.GetByKey("c1").mapping.GetByKey("c10").Chan() <- r(310)
q.mapping.GetByKey("c").mapping.GetByKey("c1").mapping.GetByKey("c11").Chan() <- r(311)
require.Equal(t, 12, q.Len())
t.Log(q)
items := make([]int, 0, q.Len())
for q.Len() > 0 {
r := q.Dequeue()
if r == nil {
continue
}
items = append(items, r.(*dummyRequest).id)
}
require.Len(t, items, 12)
require.Equal(t, []int{0, 1, 2, 3, 20, 30, 21, 31, 300, 310, 301, 311}, items)
})
t.Run("dequeue ensure round-robin", func(t *testing.T) {
/**
root:
a: [100, 101, 102]
b: [200]
c: [300, 301]
**/
paths := []QueuePath{
QueuePath([]string{"a"}),
QueuePath([]string{"b"}),
QueuePath([]string{"c"}),
}
q := newLeafQueue(10, "root")
require.NotNil(t, q)
for _, p := range paths {
q.add(p)
}
require.Equal(t, 3, q.mapping.Len())
// no items in any queues
require.Equal(t, 0, q.Len())
q.mapping.GetByKey("a").Chan() <- r(100)
q.mapping.GetByKey("a").Chan() <- r(101)
q.mapping.GetByKey("a").Chan() <- r(102)
q.mapping.GetByKey("b").Chan() <- r(200)
q.mapping.GetByKey("c").Chan() <- r(300)
q.mapping.GetByKey("c").Chan() <- r(301)
t.Log(q)
items := make([]int, 0, q.Len())
for q.Len() > 0 {
r := q.Dequeue()
if r == nil {
continue
}
items = append(items, r.(*dummyRequest).id)
}
require.Len(t, items, 6)
require.Equal(t, []int{100, 200, 300, 101, 301, 102}, items)
})
t.Run("empty sub-queues are removed", func(t *testing.T) {
q := newLeafQueue(10, "root")
q.add(QueuePath{"a"})
q.add(QueuePath{"b"})
q.mapping.GetByKey("a").Chan() <- r(1)
q.mapping.GetByKey("b").Chan() <- r(2)
t.Log(q)
// drain queue
r := q.Dequeue()
for r != nil {
r = q.Dequeue()
}
require.Nil(t, q.mapping.GetByKey("a"))
require.Nil(t, q.mapping.GetByKey("b"))
})
}

@ -0,0 +1,117 @@
package queue
import (
"github.com/pkg/errors"
)
type Mapable interface {
*tenantQueue | *LeafQueue
// https://github.com/golang/go/issues/48522#issuecomment-924348755
Pos() QueueIndex
SetPos(index QueueIndex)
}
var ErrOutOfBounds = errors.New("queue index out of bounds")
var empty = string([]byte{byte(0)})
// Mapping is a map-like data structure that allows accessing its items not
// only by key but also by index.
// When an item is removed, the internal key array is not resized; instead, the
// removed slot is marked as empty. This allows removing keys without changing
// the indexes of the items that come after the removed key.
// Values are constrained by the Mapable interface (e.g. *tenantQueue or
// *LeafQueue) and keys are of type string.
// The data structure is not thread-safe.
type Mapping[v Mapable] struct {
m map[string]v
keys []string
empty []QueueIndex
}
func (m *Mapping[v]) Init(size int) {
m.m = make(map[string]v, size)
m.keys = make([]string, 0, size)
m.empty = make([]QueueIndex, 0, size)
}
func (m *Mapping[v]) Put(key string, value v) bool {
// do not allow empty string or 0 byte string as key
if key == "" || key == empty {
return false
}
if len(m.empty) == 0 {
value.SetPos(QueueIndex(len(m.keys)))
m.keys = append(m.keys, key)
} else {
idx := m.empty[0]
m.empty = m.empty[1:]
m.keys[idx] = key
value.SetPos(idx)
}
m.m[key] = value
return true
}
func (m *Mapping[v]) Get(idx QueueIndex) v {
if len(m.keys) == 0 {
return nil
}
k := m.keys[idx]
return m.GetByKey(k)
}
func (m *Mapping[v]) GetNext(idx QueueIndex) (v, error) {
if m.Len() == 0 {
return nil, ErrOutOfBounds
}
i := int(idx)
i++
for i < len(m.keys) {
k := m.keys[i]
if k != empty {
return m.GetByKey(k), nil
}
i++
}
return nil, ErrOutOfBounds
}
func (m *Mapping[v]) GetByKey(key string) v {
// do not allow empty string or 0 byte string as key
if key == "" || key == empty {
return nil
}
return m.m[key]
}
func (m *Mapping[v]) Remove(key string) bool {
e := m.m[key]
if e == nil {
return false
}
delete(m.m, key)
m.keys[e.Pos()] = empty
m.empty = append(m.empty, e.Pos())
return true
}
func (m *Mapping[v]) Keys() []string {
return m.keys
}
func (m *Mapping[v]) Values() []v {
values := make([]v, 0, len(m.keys))
for _, k := range m.keys {
if k == empty {
continue
}
values = append(values, m.m[k])
}
return values
}
func (m *Mapping[v]) Len() int {
return len(m.keys) - len(m.empty)
}
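A small sketch of the index-stability property described above (a sketch placed in package `queue` for illustration; keys and sizes are made up):

```go
package queue

func exampleMapping() *LeafQueue {
	m := &Mapping[*LeafQueue]{}
	m.Init(4)
	m.Put("a", newLeafQueue(1, "a")) // "a" takes position 0
	m.Put("b", newLeafQueue(1, "b")) // "b" takes position 1
	m.Remove("a")                    // keys[0] is marked empty; "b" keeps position 1
	m.Put("c", newLeafQueue(1, "c")) // "c" reuses the freed position 0

	next, _ := m.GetNext(StartIndex) // iteration starts after -1 and yields "c"
	return next
}
```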

@ -0,0 +1,92 @@
package queue
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestQueueMapping(t *testing.T) {
// Individual sub-tests in this test case are reflecting a scenario and need
// to be executed in sequential order.
m := &Mapping[*LeafQueue]{}
m.Init(16)
require.Equal(t, m.Len(), 0)
t.Run("put item to mapping", func(t *testing.T) {
q1 := newLeafQueue(10, "queue-1")
m.Put(q1.Name(), q1)
require.Equal(t, 1, m.Len())
require.Equal(t, []string{"queue-1"}, m.Keys())
})
t.Run("insert order is preserved if there is no empty slot", func(t *testing.T) {
q2 := newLeafQueue(10, "queue-2")
m.Put(q2.Name(), q2)
require.Equal(t, 2, m.Len())
require.Equal(t, []string{"queue-1", "queue-2"}, m.Keys())
})
t.Run("insert into empty slot if item was removed previously", func(t *testing.T) {
ok := m.Remove("queue-1")
require.True(t, ok)
require.Equal(t, 1, m.Len())
q3 := newLeafQueue(10, "queue-3")
m.Put(q3.Name(), q3)
require.Equal(t, 2, m.Len())
require.Equal(t, []string{"queue-3", "queue-2"}, m.Keys())
})
t.Run("insert order is preserved across keys and values", func(t *testing.T) {
q4 := newLeafQueue(10, "queue-4")
m.Put(q4.Name(), q4)
require.Equal(t, 3, m.Len())
for idx, v := range m.Values() {
require.Equal(t, v.Name(), m.Keys()[idx])
}
})
t.Run("get by key", func(t *testing.T) {
key := "queue-2"
item := m.GetByKey(key)
require.Equal(t, key, item.Name())
require.Equal(t, QueueIndex(1), item.Pos())
})
t.Run("get by empty key returns nil", func(t *testing.T) {
require.Nil(t, m.GetByKey(""))
require.Nil(t, m.GetByKey(empty))
})
t.Run("get next item based on index must not skip when items are removed", func(t *testing.T) {
item, err := m.GetNext(-1)
require.Nil(t, err)
require.Equal(t, "queue-3", item.Name())
item, err = m.GetNext(item.Pos())
require.Nil(t, err)
require.Equal(t, "queue-2", item.Name())
m.Remove(item.Name())
item, err = m.GetNext(item.Pos())
require.Nil(t, err)
require.Equal(t, "queue-4", item.Name())
})
t.Run("get next item out of range returns ErrOutOfBounds", func(t *testing.T) {
item, err := m.GetNext(100)
require.Nil(t, item)
require.ErrorIs(t, err, ErrOutOfBounds)
})
t.Run("get next item skips empty slots", func(t *testing.T) {
item, err := m.GetNext(-1)
require.Nil(t, err)
require.Equal(t, "queue-3", item.Name())
item, err = m.GetNext(item.Pos())
require.Nil(t, err)
require.Equal(t, "queue-4", item.Name())
})
}

@ -25,17 +25,20 @@ var (
// of RequestQueue.GetNextRequestForQuerier method.
type QueueIndex int // nolint:revive
// StartIndexWithLocalQueue is the index of the queue that starts iteration over local and sub queues.
var StartIndexWithLocalQueue QueueIndex = -2
// StartIndex is the index of the queue that starts iteration over sub queues.
var StartIndex QueueIndex = -1
// ReuseLastIndex modifies the index so that iteration restarts at the same tenant for which the last queue was returned.
func (ui QueueIndex) ReuseLastIndex() QueueIndex {
if ui < 0 {
if ui < StartIndex {
return ui
}
return ui - 1
}
// StartIndex is the UserIndex that starts iteration over tenant queues from the very first tenant.
var StartIndex QueueIndex = -1
// Request stored into the queue.
type Request any
@ -78,7 +81,7 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que
// between calls.
//
// If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, successFn func()) error {
func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQueriers int, successFn func()) error {
q.mtx.Lock()
defer q.mtx.Unlock()
@ -86,14 +89,14 @@ func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, succ
return ErrStopped
}
queue := q.queues.getOrAddQueue(tenant, maxQueriers)
queue := q.queues.getOrAddQueue(tenant, path, maxQueriers)
if queue == nil {
// This can only happen if tenant is "".
return errors.New("no queue found")
}
select {
case queue <- req:
case queue.Chan() <- req:
q.queueLength.WithLabelValues(tenant).Inc()
q.cond.Broadcast()
// Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns.
@ -118,7 +121,7 @@ func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID s
FindQueue:
// We need to wait if there are no tenants, or no pending requests for given querier.
for (q.queues.len() == 0 || querierWait) && ctx.Err() == nil && !q.stopped {
for (q.queues.hasNoTenantQueues() || querierWait) && ctx.Err() == nil && !q.stopped {
querierWait = false
q.cond.Wait(ctx)
}
@ -140,8 +143,8 @@ FindQueue:
// Pick next request from the queue.
for {
request := <-queue
if len(queue) == 0 {
request := queue.Dequeue()
if queue.Len() == 0 {
q.queues.deleteQueue(tenant)
}
@ -177,7 +180,7 @@ func (q *RequestQueue) stopping(_ error) error {
q.mtx.Lock()
defer q.mtx.Unlock()
for q.queues.len() > 0 && q.connectedQuerierWorkers.Load() > 0 {
for !q.queues.hasNoTenantQueues() && q.connectedQuerierWorkers.Load() > 0 {
q.cond.Wait(context.Background())
}

@ -19,54 +19,82 @@ func BenchmarkGetNextRequest(b *testing.B) {
const numTenants = 50
const queriers = 5
queues := make([]*RequestQueue, 0, b.N)
type generateActor func(i int) []string
benchCases := []struct {
name string
fn generateActor
}{
{
"without sub-queues",
func(i int) []string { return nil },
},
{
"with 1 level of sub-queues",
func(i int) []string { return []string{fmt.Sprintf("user-%d", i%11)} },
},
{
"with 2 levels of sub-queues",
func(i int) []string { return []string{fmt.Sprintf("user-%d", i%11), fmt.Sprintf("tool-%d", i%9)} },
},
}
for n := 0; n < b.N; n++ {
queue := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
queues = append(queues, queue)
for _, benchCase := range benchCases {
benchCase := benchCase
for ix := 0; ix < queriers; ix++ {
queue.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
}
b.Run(benchCase.name, func(b *testing.B) {
for i := 0; i < maxOutstandingPerTenant; i++ {
for j := 0; j < numTenants; j++ {
userID := strconv.Itoa(j)
queues := make([]*RequestQueue, 0, b.N)
for n := 0; n < b.N; n++ {
queue := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
queues = append(queues, queue)
err := queue.Enqueue(userID, "request", 0, nil)
if err != nil {
b.Fatal(err)
for ix := 0; ix < queriers; ix++ {
queue.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
}
for i := 0; i < maxOutstandingPerTenant; i++ {
for j := 0; j < numTenants; j++ {
userID := strconv.Itoa(j)
err := queue.Enqueue(userID, benchCase.fn(j), "request", 0, nil)
if err != nil {
b.Fatal(err)
}
}
}
}
}
}
ctx := context.Background()
b.ResetTimer()
querierNames := make([]string, queriers)
for x := 0; x < queriers; x++ {
querierNames[x] = fmt.Sprintf("querier-%d", x)
}
for i := 0; i < b.N; i++ {
idx := StartIndex
for j := 0; j < maxOutstandingPerTenant*numTenants; j++ {
querier := ""
b:
// Find querier with at least one request to avoid blocking in getNextRequestForQuerier.
for _, q := range queues[i].queues.queues {
for qid := range q.queriers {
querier = qid
break b
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < queriers; j++ {
idx := StartIndexWithLocalQueue
for x := 0; x < maxOutstandingPerTenant*numTenants/queriers; x++ {
r, nidx, err := queues[i].Dequeue(ctx, idx, querierNames[j])
if r == nil {
break
}
if err != nil {
b.Fatal(err)
}
idx = nidx
}
}
}
_, nidx, err := queues[i].Dequeue(ctx, idx, querier)
if err != nil {
b.Fatal(err)
}
idx = nidx
}
})
}
}
func BenchmarkQueueRequest(b *testing.B) {
@ -100,7 +128,7 @@ func BenchmarkQueueRequest(b *testing.B) {
for n := 0; n < b.N; n++ {
for i := 0; i < maxOutstandingPerTenant; i++ {
for j := 0; j < numTenants; j++ {
err := queues[n].Enqueue(users[j], requests[j], 0, nil)
err := queues[n].Enqueue(users[j], nil, requests[j], 0, nil)
if err != nil {
b.Fatal(err)
}
@ -114,7 +142,8 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
queue := NewRequestQueue(1, forgetDelay,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}))
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
)
// Start the queue service.
ctx := context.Background()
@ -141,7 +170,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
// Enqueue a request from an user which would be assigned to querier-1.
// NOTE: "user-1" hash falls in the querier-1 shard.
require.NoError(t, queue.Enqueue("user-1", "request", 1, nil))
require.NoError(t, queue.Enqueue("user-1", nil, "request", 1, nil))
startTime := time.Now()
querier2wg.Wait()

@ -28,12 +28,7 @@ type querier struct {
// This struct holds tenant queues for pending requests. It also keeps track of connected queriers,
// and mapping between tenants and queriers.
type tenantQueues struct {
queues map[string]*tenantQueue
// List of all tenants with queues, used for iteration when searching for next queue to handle.
// Tenants removed from the middle are replaced with "". To avoid skipping tenants during iteration, we only shrink
// this list when there are ""'s at the end of it.
tenants []string
mapping *Mapping[*tenantQueue]
maxUserQueueSize int
@ -48,8 +43,15 @@ type tenantQueues struct {
sortedQueriers []string
}
type Queue interface {
Chan() RequestChannel
Dequeue() Request
Name() string
Len() int
}
type tenantQueue struct {
ch RequestChannel
*LeafQueue
// If not nil, only these queriers can handle user requests. If nil, all queriers can.
// We set this to nil if number of available queriers <= maxQueriers.
@ -59,15 +61,13 @@ type tenantQueue struct {
// Seed for shuffle sharding of queriers. This seed is based on userID only and is therefore consistent
// between different frontends.
seed int64
// Points back to 'users' field in queues. Enables quick cleanup.
index int
}
func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQueues {
mm := &Mapping[*tenantQueue]{}
mm.Init(64)
return &tenantQueues{
queues: map[string]*tenantQueue{},
tenants: nil,
mapping: mm,
maxUserQueueSize: maxUserQueueSize,
forgetDelay: forgetDelay,
queriers: map[string]*querier{},
@ -75,30 +75,19 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue
}
}
func (q *tenantQueues) len() int {
return len(q.queues)
func (q *tenantQueues) hasNoTenantQueues() bool {
return q.mapping.Len() == 0
}
func (q *tenantQueues) deleteQueue(tenant string) {
uq := q.queues[tenant]
if uq == nil {
return
}
delete(q.queues, tenant)
q.tenants[uq.index] = ""
// Shrink users list size if possible. This is safe, and no users will be skipped during iteration.
for ix := len(q.tenants) - 1; ix >= 0 && q.tenants[ix] == ""; ix-- {
q.tenants = q.tenants[:ix]
}
q.mapping.Remove(tenant)
}
// Returns existing or new queue for a tenant.
// MaxQueriers is used to compute which queriers should handle requests for this tenant.
// If maxQueriers is <= 0, all queriers can handle this tenant's requests.
// If maxQueriers has changed since the last call, queriers for this are recomputed.
func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChannel {
func (q *tenantQueues) getOrAddQueue(tenant string, path []string, maxQueriers int) Queue {
// Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot).
if tenant == "" {
return nil
@ -108,30 +97,13 @@ func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChan
maxQueriers = 0
}
uq := q.queues[tenant]
uq := q.mapping.GetByKey(tenant)
if uq == nil {
uq = &tenantQueue{
ch: make(RequestChannel, q.maxUserQueueSize),
seed: util.ShuffleShardSeed(tenant, ""),
index: -1,
}
q.queues[tenant] = uq
// Add user to the list of users... find first free spot, and put it there.
for ix, u := range q.tenants {
if u == "" {
uq.index = ix
q.tenants[ix] = tenant
break
}
}
// ... or add to the end.
if uq.index < 0 {
uq.index = len(q.tenants)
q.tenants = append(q.tenants, tenant)
seed: util.ShuffleShardSeed(tenant, ""),
}
uq.LeafQueue = newLeafQueue(q.maxUserQueueSize, tenant)
q.mapping.Put(tenant, uq)
}
if uq.maxQueriers != maxQueriers {
@ -139,46 +111,50 @@ func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChan
uq.queriers = shuffleQueriersForTenants(uq.seed, maxQueriers, q.sortedQueriers, nil)
}
return uq.ch
if len(path) == 0 {
return uq
}
return uq.add(path)
}
// Finds next queue for the querier. To support fair scheduling between users, the client is expected
// to pass the last user index returned by this function as an argument. If there was no previous
// last user index, use -1.
func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (RequestChannel, string, QueueIndex) {
func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (Queue, string, QueueIndex) {
uid := lastUserIndex
// at the RequestQueue level we don't have local queues, so start index is -1
if uid == StartIndexWithLocalQueue {
uid = StartIndex
}
// Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward
// any more queries to it.
if info := q.queriers[querierID]; info == nil || info.shuttingDown {
return nil, "", uid
}
for iters := 0; iters < len(q.tenants); iters++ {
uid = uid + 1
// Don't use "mod len(q.users)", as that could skip users at the beginning of the list
// for example when q.users has shrunk since last call.
if int(uid) >= len(q.tenants) {
uid = 0
}
u := q.tenants[uid]
if u == "" {
maxIters := len(q.mapping.keys) + 1
for iters := 0; iters < maxIters; iters++ {
tq, err := q.mapping.GetNext(uid)
if err == ErrOutOfBounds {
uid = StartIndex
continue
}
if tq == nil {
break
}
uid = tq.pos
q := q.queues[u]
if q.queriers != nil {
if _, ok := q.queriers[querierID]; !ok {
if tq.queriers != nil {
if _, ok := tq.queriers[querierID]; !ok {
// This querier is not handling the user.
continue
}
}
return q.ch, u, uid
return tq, tq.name, uid
}
return nil, "", uid
}
@ -284,7 +260,7 @@ func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int {
func (q *tenantQueues) recomputeUserQueriers() {
scratchpad := make([]string, 0, len(q.sortedQueriers))
for _, uq := range q.queues {
for _, uq := range q.mapping.Values() {
uq.queriers = shuffleQueriersForTenants(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad)
}
}

@ -133,7 +133,7 @@ func TestQueuesWithQueriers(t *testing.T) {
getOrAdd(t, uq, uid, maxQueriersPerUser)
// Verify it has maxQueriersPerUser queriers assigned now.
qs := uq.queues[uid].queriers
qs := uq.mapping.GetByKey(uid).queriers
assert.Equal(t, maxQueriersPerUser, len(qs))
}
@ -196,7 +196,7 @@ func TestQueuesConsistency(t *testing.T) {
for i := 0; i < 10000; i++ {
switch r.Int() % 6 {
case 0:
assert.NotNil(t, uq.getOrAddQueue(generateTenant(r), 3))
assert.NotNil(t, uq.getOrAddQueue(generateTenant(r), generateActor(r), 3))
case 1:
qid := generateQuerier(r)
_, _, luid := uq.getNextQueueForQuerier(lastUserIndexes[qid], qid)
@ -389,6 +389,10 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
}
}
func generateActor(r *rand.Rand) []string {
return []string{fmt.Sprint("actor-", r.Int()%10)}
}
func generateTenant(r *rand.Rand) string {
return fmt.Sprint("tenant-", r.Int()%5)
}
@ -397,16 +401,18 @@ func generateQuerier(r *rand.Rand) string {
return fmt.Sprint("querier-", r.Int()%5)
}
func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) RequestChannel {
q := uq.getOrAddQueue(tenant, maxQueriers)
func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) Queue {
actor := []string{}
q := uq.getOrAddQueue(tenant, actor, maxQueriers)
assert.NotNil(t, q)
assert.NoError(t, isConsistent(uq))
assert.Equal(t, q, uq.getOrAddQueue(tenant, maxQueriers))
assert.Equal(t, q, uq.getOrAddQueue(tenant, actor, maxQueriers))
return q
}
func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...RequestChannel) QueueIndex {
var n RequestChannel
func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...Queue) QueueIndex {
t.Helper()
var n Queue
for _, q := range qs {
n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier)
assert.Equal(t, q, n)
@ -421,24 +427,20 @@ func isConsistent(uq *tenantQueues) error {
}
uc := 0
for ix, u := range uq.tenants {
q := uq.queues[u]
if u != "" && q == nil {
for _, u := range uq.mapping.Keys() {
q := uq.mapping.GetByKey(u)
if u != empty && q == nil {
return fmt.Errorf("user %s doesn't have queue", u)
}
if u == "" && q != nil {
if u == empty && q != nil {
return fmt.Errorf("user %s shouldn't have queue", u)
}
if u == "" {
if u == empty {
continue
}
uc++
if q.index != ix {
return fmt.Errorf("invalid user's index, expected=%d, got=%d", ix, q.index)
}
if q.maxQueriers == 0 && q.queriers != nil {
return fmt.Errorf("user %s has queriers, but maxQueriers=0", u)
}
@ -452,7 +454,7 @@ func isConsistent(uq *tenantQueues) error {
}
}
if uc != len(uq.queues) {
if uc != uq.mapping.Len() {
return fmt.Errorf("inconsistent number of users list and user queues")
}
@ -462,7 +464,8 @@ func isConsistent(uq *tenantQueues) error {
// getUsersByQuerier returns the list of users handled by the provided querierID.
func getUsersByQuerier(queues *tenantQueues, querierID string) []string {
var userIDs []string
for userID, q := range queues.queues {
for _, userID := range queues.mapping.Keys() {
q := queues.mapping.GetByKey(userID)
if q.queriers == nil {
// If it's nil then all queriers can handle this user.
userIDs = append(userIDs, userID)

@ -3,9 +3,11 @@ package scheduler
import (
"context"
"flag"
"fmt"
"io"
"net/http"
"net/textproto"
"strings"
"sync"
"time"
@ -119,6 +121,7 @@ type connectedFrontend struct {
type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant"`
MaxQueueHierarchyLevels int `yaml:"max_queue_hierarchy_levels"`
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
// Schedulers ring
@ -128,6 +131,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "query-scheduler.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per query-scheduler. In-flight requests above this limit will fail with HTTP response status code 429.")
f.IntVar(&cfg.MaxQueueHierarchyLevels, "query-scheduler.max-queue-hierarchy-levels", 3, "Maximum number of levels of nesting of hierarchical queues. 0 means that hierarchical queues are disabled.")
f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
f.BoolVar(&cfg.UseSchedulerRing, "query-scheduler.use-scheduler-ring", false, "Set to true to have the query schedulers create and place themselves in a ring. If no frontend_address or scheduler_address are present anywhere else in the configuration, Loki will toggle this value to true.")
@ -154,6 +158,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Name: "cortex_query_scheduler_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"})
s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests)
s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
@ -245,7 +250,7 @@ type Limits interface {
type schedulerRequest struct {
frontendAddress string
userID string
tenantID string
queryID uint64
request *httpgrpc.HTTPRequest
statsEnabled bool
@ -385,11 +390,9 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
return err
}
userID := msg.GetUserID()
req := &schedulerRequest{
frontendAddress: frontendAddr,
userID: msg.UserID,
tenantID: msg.UserID,
queryID: msg.QueryID,
request: msg.HttpRequest,
statsEnabled: msg.StatsEnabled,
@ -403,14 +406,31 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
req.ctxCancel = cancel
// aggregate the max queriers limit in the case of a multi tenant query
tenantIDs, err := tenant.TenantIDsFromOrgID(userID)
tenantIDs, err := tenant.TenantIDsFromOrgID(req.tenantID)
if err != nil {
return err
}
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser)
s.activeUsers.UpdateUserTimestamp(userID, now)
return s.requestQueue.Enqueue(userID, req, maxQueriers, func() {
var queuePath []string
if s.cfg.MaxQueueHierarchyLevels > 0 {
queuePath = msg.QueuePath
if len(queuePath) > s.cfg.MaxQueueHierarchyLevels {
msg := fmt.Sprintf(
"The header %s with value '%s' would result in a sub-queue which is "+
"nested %d levels deep, however only %d levels are allowed based on the "+
"configuration setting -query-scheduler.max-queue-hierarchy-levels",
lokihttpreq.LokiActorPathHeader,
strings.Join(queuePath, lokihttpreq.LokiActorPathDelimiter),
len(queuePath),
s.cfg.MaxQueueHierarchyLevels,
)
return fmt.Errorf("desired queue level exceeds maximum depth of queue hierarchy: %s", msg)
}
}
s.activeUsers.UpdateUserTimestamp(req.tenantID, now)
return s.requestQueue.Enqueue(req.tenantID, queuePath, req, maxQueriers, func() {
shouldCancel = false
s.pendingRequestsMu.Lock()
@ -455,6 +475,10 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL
}
lastIndex = idx
// This really should not happen, but log additional information before the scheduler panics.
if req == nil {
level.Error(s.log).Log("msg", "dequeue() call resulted in nil response", "querier", querierID)
}
r := req.(*schedulerRequest)
reqQueueTime := time.Since(r.queueTime)
@ -511,7 +535,7 @@ func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuer
errCh := make(chan error, 1)
go func() {
err := querier.Send(&schedulerpb.SchedulerToQuerier{
UserID: req.userID,
UserID: req.tenantID,
QueryID: req.queryID,
FrontendAddress: req.frontendAddress,
HttpRequest: req.request,
@ -567,7 +591,7 @@ func (s *Scheduler) forwardErrorToFrontend(ctx context.Context, req *schedulerRe
client := frontendv2pb.NewFrontendForQuerierClient(conn)
userCtx := user.InjectOrgID(ctx, req.userID)
userCtx := user.InjectOrgID(ctx, req.tenantID)
_, err = client.QueryResult(userCtx, &frontendv2pb.QueryResultRequest{
QueryID: req.queryID,
HttpResponse: &httpgrpc.HTTPResponse{

@ -219,6 +219,8 @@ type FrontendToScheduler struct {
UserID string `protobuf:"bytes,4,opt,name=userID,proto3" json:"userID,omitempty"`
HttpRequest *httpgrpc.HTTPRequest `protobuf:"bytes,5,opt,name=httpRequest,proto3" json:"httpRequest,omitempty"`
StatsEnabled bool `protobuf:"varint,6,opt,name=statsEnabled,proto3" json:"statsEnabled,omitempty"`
// Path to queue to which the request will be enqueued.
QueuePath []string `protobuf:"bytes,7,rep,name=queuePath,proto3" json:"queuePath,omitempty"`
}
func (m *FrontendToScheduler) Reset() { *m = FrontendToScheduler{} }
@ -295,6 +297,13 @@ func (m *FrontendToScheduler) GetStatsEnabled() bool {
return false
}
func (m *FrontendToScheduler) GetQueuePath() []string {
if m != nil {
return m.QueuePath
}
return nil
}
type SchedulerToFrontend struct {
Status SchedulerToFrontendStatus `protobuf:"varint,1,opt,name=status,proto3,enum=schedulerpb.SchedulerToFrontendStatus" json:"status,omitempty"`
Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
@ -440,48 +449,49 @@ func init() {
}
var fileDescriptor_c3657184e8d38989 = []byte{
// 650 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xcf, 0x53, 0xd3, 0x40,
0x14, 0xce, 0x96, 0xb6, 0xc0, 0x2b, 0x6a, 0x5c, 0x40, 0x6b, 0x07, 0x97, 0x4e, 0xc7, 0x71, 0x0a,
0x87, 0xd6, 0xa9, 0xce, 0xe8, 0x81, 0x71, 0xa6, 0x42, 0x90, 0x8e, 0x98, 0xc2, 0x76, 0x3b, 0xfe,
0xb8, 0x74, 0x68, 0xbb, 0xb4, 0x0c, 0x90, 0x0d, 0x9b, 0x44, 0xa6, 0x37, 0x8f, 0x1e, 0xfd, 0x33,
0xfc, 0x53, 0xbc, 0x38, 0xc3, 0x91, 0x83, 0x07, 0x09, 0x17, 0x8f, 0xfc, 0x09, 0x4e, 0xd3, 0xa4,
0xa4, 0x98, 0x02, 0xb7, 0xb7, 0x2f, 0xdf, 0x97, 0x7c, 0xdf, 0xf7, 0xde, 0x06, 0x96, 0xcc, 0xfd,
0x4e, 0xd1, 0x6a, 0x75, 0x79, 0xdb, 0x39, 0xe0, 0xf2, 0xb2, 0x32, 0x9b, 0x97, 0x75, 0xc1, 0x94,
0xc2, 0x16, 0x38, 0x15, 0x7a, 0x98, 0x79, 0xd1, 0xd9, 0xb3, 0xbb, 0x4e, 0xb3, 0xd0, 0x12, 0x87,
0xc5, 0x63, 0xbe, 0xf3, 0x85, 0x1f, 0x0b, 0xb9, 0x6f, 0x15, 0x5b, 0xe2, 0xf0, 0x50, 0x18, 0xc5,
0xae, 0x6d, 0x9b, 0x1d, 0x69, 0xb6, 0x86, 0xc5, 0xe0, 0x15, 0x99, 0xb9, 0x8e, 0xe8, 0x08, 0xaf,
0x2c, 0xf6, 0xab, 0x41, 0x37, 0x57, 0x02, 0xbc, 0xed, 0x70, 0xb9, 0xc7, 0x25, 0x13, 0xb5, 0xe0,
0x1b, 0x78, 0x01, 0xa6, 0x8f, 0x06, 0xdd, 0xca, 0x5a, 0x1a, 0x65, 0x51, 0x7e, 0x9a, 0x5e, 0x36,
0x72, 0xbf, 0x10, 0xe0, 0x21, 0x96, 0x09, 0x9f, 0x8f, 0xd3, 0x30, 0xd9, 0xc7, 0xf4, 0x7c, 0x4a,
0x9c, 0x06, 0x47, 0xfc, 0x12, 0x52, 0x7d, 0x31, 0x94, 0x1f, 0x39, 0xdc, 0xb2, 0xd3, 0xb1, 0x2c,
0xca, 0xa7, 0x4a, 0xf3, 0x85, 0xa1, 0xc0, 0x0d, 0xc6, 0xb6, 0xfc, 0x87, 0x34, 0x8c, 0xc4, 0x79,
0xb8, 0xb7, 0x2b, 0x85, 0x61, 0x73, 0xa3, 0x5d, 0x6e, 0xb7, 0x25, 0xb7, 0xac, 0xf4, 0x84, 0xa7,
0xe6, 0x6a, 0x1b, 0x3f, 0x80, 0xa4, 0x63, 0x79, 0x72, 0xe3, 0x1e, 0xc0, 0x3f, 0xe1, 0x1c, 0xcc,
0x58, 0xf6, 0x8e, 0x6d, 0x69, 0xc6, 0x4e, 0xf3, 0x80, 0xb7, 0xd3, 0x89, 0x2c, 0xca, 0x4f, 0xd1,
0x91, 0x5e, 0xee, 0x5b, 0x0c, 0x66, 0xd7, 0xfd, 0xf7, 0x85, 0x53, 0x78, 0x05, 0x71, 0xbb, 0x67,
0x72, 0xcf, 0xcd, 0xdd, 0xd2, 0x93, 0x42, 0x68, 0x06, 0x85, 0x08, 0x3c, 0xeb, 0x99, 0x9c, 0x7a,
0x8c, 0x28, 0xdd, 0xb1, 0x68, 0xdd, 0xa1, 0xd0, 0x26, 0x46, 0x43, 0x1b, 0xe7, 0xe8, 0x4a, 0x98,
0x89, 0x5b, 0x87, 0x79, 0x35, 0x8a, 0x64, 0x44, 0x14, 0xfb, 0x30, 0x1b, 0x9a, 0x6c, 0x60, 0x12,
0xbf, 0x86, 0x64, 0x1f, 0xe6, 0x58, 0x7e, 0x16, 0x4f, 0x47, 0xb2, 0x88, 0x60, 0xd4, 0x3c, 0x34,
0xf5, 0x59, 0x78, 0x0e, 0x12, 0x5c, 0x4a, 0x21, 0xfd, 0x14, 0x06, 0x87, 0xdc, 0x0a, 0x2c, 0xe8,
0xc2, 0xde, 0xdb, 0xed, 0xf9, 0x1b, 0x54, 0xeb, 0x3a, 0x76, 0x5b, 0x1c, 0x1b, 0x81, 0xe0, 0xeb,
0xb7, 0x70, 0x11, 0x1e, 0x8f, 0x61, 0x5b, 0xa6, 0x30, 0x2c, 0xbe, 0xbc, 0x02, 0x0f, 0xc7, 0x4c,
0x09, 0x4f, 0x41, 0xbc, 0xa2, 0x57, 0x98, 0xaa, 0xe0, 0x14, 0x4c, 0x6a, 0xfa, 0x76, 0x5d, 0xab,
0x6b, 0x2a, 0xc2, 0x00, 0xc9, 0xd5, 0xb2, 0xbe, 0xaa, 0x6d, 0xaa, 0xb1, 0xe5, 0x16, 0x3c, 0x1a,
0xeb, 0x0b, 0x27, 0x21, 0x56, 0x7d, 0xa7, 0x2a, 0x38, 0x0b, 0x0b, 0xac, 0x5a, 0x6d, 0xbc, 0x2f,
0xeb, 0x9f, 0x1a, 0x54, 0xdb, 0xae, 0x6b, 0x35, 0x56, 0x6b, 0x6c, 0x69, 0xb4, 0xc1, 0x34, 0xbd,
0xac, 0x33, 0x15, 0xe1, 0x69, 0x48, 0x68, 0x94, 0x56, 0xa9, 0x1a, 0xc3, 0xf7, 0xe1, 0x4e, 0x6d,
0xa3, 0xce, 0x58, 0x45, 0x7f, 0xdb, 0x58, 0xab, 0x7e, 0xd0, 0xd5, 0x89, 0xd2, 0x6f, 0x14, 0xca,
0x7b, 0x5d, 0xc8, 0xe0, 0x2a, 0xd5, 0x21, 0xe5, 0x97, 0x9b, 0x42, 0x98, 0x78, 0x71, 0x24, 0xee,
0xff, 0xef, 0x6b, 0x66, 0x71, 0xdc, 0x3c, 0x7c, 0x6c, 0x4e, 0xc9, 0xa3, 0x67, 0x08, 0x1b, 0x30,
0x1f, 0x19, 0x19, 0x5e, 0x1a, 0xe1, 0x5f, 0x37, 0x94, 0xcc, 0xf2, 0x6d, 0xa0, 0x83, 0x09, 0x94,
0x4c, 0x98, 0x0b, 0xbb, 0x1b, 0xae, 0xd3, 0x47, 0x98, 0x09, 0x6a, 0xcf, 0x5f, 0xf6, 0xa6, 0xab,
0x95, 0xc9, 0xde, 0xb4, 0x70, 0x03, 0x87, 0x6f, 0xca, 0x27, 0x67, 0x44, 0x39, 0x3d, 0x23, 0xca,
0xc5, 0x19, 0x41, 0x5f, 0x5d, 0x82, 0x7e, 0xb8, 0x04, 0xfd, 0x74, 0x09, 0x3a, 0x71, 0x09, 0xfa,
0xe3, 0x12, 0xf4, 0xd7, 0x25, 0xca, 0x85, 0x4b, 0xd0, 0xf7, 0x73, 0xa2, 0x9c, 0x9c, 0x13, 0xe5,
0xf4, 0x9c, 0x28, 0x9f, 0xc3, 0x7f, 0xd7, 0x66, 0xd2, 0xfb, 0x31, 0x3e, 0xff, 0x17, 0x00, 0x00,
0xff, 0xff, 0x72, 0x76, 0x0e, 0xdb, 0x9e, 0x05, 0x00, 0x00,
// 667 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x94, 0xcb, 0x4e, 0xdb, 0x4c,
0x14, 0xc7, 0x3d, 0xb9, 0x01, 0x27, 0x7c, 0x5f, 0xdd, 0x01, 0xda, 0x34, 0xa2, 0x83, 0x65, 0x55,
0x55, 0x60, 0x91, 0x54, 0x69, 0xa5, 0x76, 0x81, 0x2a, 0xa5, 0x60, 0x4a, 0x54, 0xea, 0x80, 0xe3,
0xa8, 0x97, 0x4d, 0x94, 0xcb, 0x90, 0x20, 0xc0, 0x63, 0xec, 0x71, 0x51, 0x76, 0x7d, 0x84, 0x3e,
0x45, 0xd5, 0x47, 0xe9, 0xa6, 0x12, 0x4b, 0x16, 0x5d, 0x14, 0xb3, 0xe9, 0x92, 0x47, 0xa8, 0x62,
0x4f, 0x82, 0x03, 0x09, 0xb0, 0x3b, 0x73, 0xfc, 0x3f, 0xe3, 0x73, 0x7e, 0xff, 0x99, 0x81, 0x65,
0x7b, 0xbf, 0x53, 0x70, 0x5b, 0x5d, 0xda, 0xf6, 0x0e, 0xa8, 0x73, 0x19, 0xd9, 0xcd, 0xcb, 0x38,
0x6f, 0x3b, 0x8c, 0x33, 0x9c, 0x8e, 0x7c, 0xcc, 0xbe, 0xe8, 0xec, 0xf1, 0xae, 0xd7, 0xcc, 0xb7,
0xd8, 0x61, 0xe1, 0x98, 0x36, 0xbe, 0xd0, 0x63, 0xe6, 0xec, 0xbb, 0x85, 0x16, 0x3b, 0x3c, 0x64,
0x56, 0xa1, 0xcb, 0xb9, 0xdd, 0x71, 0xec, 0xd6, 0x30, 0x08, 0xb7, 0xc8, 0xce, 0x77, 0x58, 0x87,
0x05, 0x61, 0xa1, 0x1f, 0x85, 0x59, 0xb5, 0x08, 0x78, 0xc7, 0xa3, 0xce, 0x1e, 0x75, 0x4c, 0x56,
0x1d, 0xfc, 0x03, 0x2f, 0xc2, 0xcc, 0x51, 0x98, 0x2d, 0xaf, 0x67, 0x90, 0x82, 0x72, 0x33, 0xc6,
0x65, 0x42, 0xfd, 0x85, 0x00, 0x0f, 0xb5, 0x26, 0x13, 0xf5, 0x38, 0x03, 0x53, 0x7d, 0x4d, 0x4f,
0x94, 0x24, 0x8c, 0xc1, 0x12, 0xbf, 0x84, 0x74, 0xbf, 0x19, 0x83, 0x1e, 0x79, 0xd4, 0xe5, 0x99,
0x98, 0x82, 0x72, 0xe9, 0xe2, 0x42, 0x7e, 0xd8, 0xe0, 0xa6, 0x69, 0x6e, 0x8b, 0x8f, 0x46, 0x54,
0x89, 0x73, 0x70, 0x6f, 0xd7, 0x61, 0x16, 0xa7, 0x56, 0xbb, 0xd4, 0x6e, 0x3b, 0xd4, 0x75, 0x33,
0xf1, 0xa0, 0x9b, 0xab, 0x69, 0xfc, 0x00, 0x52, 0x9e, 0x1b, 0xb4, 0x9b, 0x08, 0x04, 0x62, 0x85,
0x55, 0x98, 0x75, 0x79, 0x83, 0xbb, 0x9a, 0xd5, 0x68, 0x1e, 0xd0, 0x76, 0x26, 0xa9, 0xa0, 0xdc,
0xb4, 0x31, 0x92, 0x53, 0xbf, 0xc7, 0x60, 0x6e, 0x43, 0xec, 0x17, 0xa5, 0xf0, 0x0a, 0x12, 0xbc,
0x67, 0xd3, 0x60, 0x9a, 0xff, 0x8b, 0x4f, 0xf2, 0x11, 0x0f, 0xf2, 0x63, 0xf4, 0x66, 0xcf, 0xa6,
0x46, 0x50, 0x31, 0xae, 0xef, 0xd8, 0xf8, 0xbe, 0x23, 0xd0, 0xe2, 0xa3, 0xd0, 0x26, 0x4d, 0x74,
0x05, 0x66, 0xf2, 0xce, 0x30, 0xaf, 0xa2, 0x48, 0x5d, 0x47, 0x21, 0x8c, 0xf7, 0xe8, 0x76, 0x83,
0x77, 0x33, 0x53, 0x4a, 0x5c, 0x18, 0x1f, 0x26, 0xd4, 0x7d, 0x98, 0x8b, 0xf8, 0x3e, 0x40, 0x80,
0x5f, 0x43, 0xaa, 0xbf, 0x89, 0xe7, 0x0a, 0x52, 0x4f, 0x47, 0x48, 0x8d, 0xa9, 0xa8, 0x06, 0x6a,
0x43, 0x54, 0xe1, 0x79, 0x48, 0x52, 0xc7, 0x61, 0x8e, 0x60, 0x14, 0x2e, 0xd4, 0x55, 0x58, 0xd4,
0x19, 0xdf, 0xdb, 0xed, 0x89, 0xf3, 0x55, 0xed, 0x7a, 0xbc, 0xcd, 0x8e, 0xad, 0xc1, 0x38, 0x37,
0x9f, 0xd1, 0x25, 0x78, 0x3c, 0xa1, 0xda, 0xb5, 0x99, 0xe5, 0xd2, 0x95, 0x55, 0x78, 0x38, 0xc1,
0x43, 0x3c, 0x0d, 0x89, 0xb2, 0x5e, 0x36, 0x65, 0x09, 0xa7, 0x61, 0x4a, 0xd3, 0x77, 0x6a, 0x5a,
0x4d, 0x93, 0x11, 0x06, 0x48, 0xad, 0x95, 0xf4, 0x35, 0x6d, 0x4b, 0x8e, 0xad, 0xb4, 0xe0, 0xd1,
0xc4, 0xb9, 0x70, 0x0a, 0x62, 0x95, 0x77, 0xb2, 0x84, 0x15, 0x58, 0x34, 0x2b, 0x95, 0xfa, 0xfb,
0x92, 0xfe, 0xa9, 0x6e, 0x68, 0x3b, 0x35, 0xad, 0x6a, 0x56, 0xeb, 0xdb, 0x9a, 0x51, 0x37, 0x35,
0xbd, 0xa4, 0x9b, 0x32, 0xc2, 0x33, 0x90, 0xd4, 0x0c, 0xa3, 0x62, 0xc8, 0x31, 0x7c, 0x1f, 0xfe,
0xab, 0x6e, 0xd6, 0x4c, 0xb3, 0xac, 0xbf, 0xad, 0xaf, 0x57, 0x3e, 0xe8, 0x72, 0xbc, 0xf8, 0x1b,
0x45, 0x78, 0x6f, 0x30, 0x67, 0x70, 0xd1, 0x6a, 0x90, 0x16, 0xe1, 0x16, 0x63, 0x36, 0x5e, 0x1a,
0xc1, 0x7d, 0xfd, 0x36, 0x67, 0x97, 0x26, 0xf9, 0x21, 0xb4, 0xaa, 0x94, 0x43, 0xcf, 0x10, 0xb6,
0x60, 0x61, 0x2c, 0x32, 0xbc, 0x3c, 0x52, 0x7f, 0x93, 0x29, 0xd9, 0x95, 0xbb, 0x48, 0x43, 0x07,
0x8a, 0x36, 0xcc, 0x47, 0xa7, 0x1b, 0x1e, 0xa7, 0x8f, 0x30, 0x3b, 0x88, 0x83, 0xf9, 0x94, 0xdb,
0x2e, 0x5e, 0x56, 0xb9, 0xed, 0xc0, 0x85, 0x13, 0xbe, 0x29, 0x9d, 0x9c, 0x11, 0xe9, 0xf4, 0x8c,
0x48, 0x17, 0x67, 0x04, 0x7d, 0xf5, 0x09, 0xfa, 0xe1, 0x13, 0xf4, 0xd3, 0x27, 0xe8, 0xc4, 0x27,
0xe8, 0x8f, 0x4f, 0xd0, 0x5f, 0x9f, 0x48, 0x17, 0x3e, 0x41, 0xdf, 0xce, 0x89, 0x74, 0x72, 0x4e,
0xa4, 0xd3, 0x73, 0x22, 0x7d, 0x8e, 0xbe, 0xbd, 0xcd, 0x54, 0xf0, 0x6c, 0x3e, 0xff, 0x17, 0x00,
0x00, 0xff, 0xff, 0xf4, 0x06, 0xc6, 0xd8, 0xbc, 0x05, 0x00, 0x00,
}
func (x FrontendToSchedulerType) String() string {
@ -595,6 +605,14 @@ func (this *FrontendToScheduler) Equal(that interface{}) bool {
if this.StatsEnabled != that1.StatsEnabled {
return false
}
if len(this.QueuePath) != len(that1.QueuePath) {
return false
}
for i := range this.QueuePath {
if this.QueuePath[i] != that1.QueuePath[i] {
return false
}
}
return true
}
func (this *SchedulerToFrontend) Equal(that interface{}) bool {
@ -699,7 +717,7 @@ func (this *FrontendToScheduler) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 10)
s := make([]string, 0, 11)
s = append(s, "&schedulerpb.FrontendToScheduler{")
s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n")
s = append(s, "FrontendAddress: "+fmt.Sprintf("%#v", this.FrontendAddress)+",\n")
@ -709,6 +727,7 @@ func (this *FrontendToScheduler) GoString() string {
s = append(s, "HttpRequest: "+fmt.Sprintf("%#v", this.HttpRequest)+",\n")
}
s = append(s, "StatsEnabled: "+fmt.Sprintf("%#v", this.StatsEnabled)+",\n")
s = append(s, "QueuePath: "+fmt.Sprintf("%#v", this.QueuePath)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -1144,6 +1163,15 @@ func (m *FrontendToScheduler) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if len(m.QueuePath) > 0 {
for iNdEx := len(m.QueuePath) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.QueuePath[iNdEx])
copy(dAtA[i:], m.QueuePath[iNdEx])
i = encodeVarintScheduler(dAtA, i, uint64(len(m.QueuePath[iNdEx])))
i--
dAtA[i] = 0x3a
}
}
if m.StatsEnabled {
i--
if m.StatsEnabled {
@ -1359,6 +1387,12 @@ func (m *FrontendToScheduler) Size() (n int) {
if m.StatsEnabled {
n += 2
}
if len(m.QueuePath) > 0 {
for _, s := range m.QueuePath {
l = len(s)
n += 1 + l + sovScheduler(uint64(l))
}
}
return n
}
@ -1441,6 +1475,7 @@ func (this *FrontendToScheduler) String() string {
`UserID:` + fmt.Sprintf("%v", this.UserID) + `,`,
`HttpRequest:` + strings.Replace(fmt.Sprintf("%v", this.HttpRequest), "HTTPRequest", "httpgrpc.HTTPRequest", 1) + `,`,
`StatsEnabled:` + fmt.Sprintf("%v", this.StatsEnabled) + `,`,
`QueuePath:` + fmt.Sprintf("%v", this.QueuePath) + `,`,
`}`,
}, "")
return s
@ -1947,6 +1982,38 @@ func (m *FrontendToScheduler) Unmarshal(dAtA []byte) error {
}
}
m.StatsEnabled = bool(v != 0)
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field QueuePath", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowScheduler
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthScheduler
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthScheduler
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.QueuePath = append(m.QueuePath, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipScheduler(dAtA[iNdEx:])

@ -77,6 +77,8 @@ message FrontendToScheduler {
string userID = 4;
httpgrpc.HTTPRequest httpRequest = 5;
bool statsEnabled = 6;
// Path to queue to which the request will be enqueued.
repeated string queuePath = 7;
}
enum SchedulerToFrontendStatus {

@ -0,0 +1,48 @@
package httpreq
import (
"context"
"net/http"
"strings"
"github.com/weaveworks/common/middleware"
)
type headerContextKey string
var (
// LokiActorPathHeader is the name of the header used, e.g., to enqueue requests into hierarchical queues.
LokiActorPathHeader = "X-Loki-Actor-Path"
// LokiActorPathDelimiter is the delimiter used to serialise the hierarchy of the actor.
LokiActorPathDelimiter = "|"
)
func PropagateHeadersMiddleware(headers ...string) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
for _, h := range headers {
value := req.Header.Get(h)
if value != "" {
ctx := req.Context()
ctx = context.WithValue(ctx, headerContextKey(h), value)
req = req.WithContext(ctx)
}
}
next.ServeHTTP(w, req)
})
})
}
func ExtractHeader(ctx context.Context, name string) string {
s, _ := ctx.Value(headerContextKey(name)).(string)
return s
}
func ExtractActorPath(ctx context.Context) []string {
value := ExtractHeader(ctx, LokiActorPathHeader)
if value == "" {
return nil
}
return strings.Split(value, LokiActorPathDelimiter)
}
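A sketch of how these two functions combine (the inner handler is hypothetical; `Wrap` is the `middleware.Interface` method from `weaveworks/common`):

```go
package httpreq

import "net/http"

func exampleActorPath() http.Handler {
	return PropagateHeadersMiddleware(LokiActorPathHeader).Wrap(
		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// For "X-Loki-Actor-Path: user-a|dashboard-7" this yields
			// []string{"user-a", "dashboard-7"}.
			path := ExtractActorPath(r.Context())
			_ = path // e.g. handed on to the scheduler as the queue path
		}),
	)
}
```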

@ -11,6 +11,7 @@ import (
// NOTE(kavi): Why new type?
// Our linter won't allow basic types like string to be used as keys in context.
// TODO(chaudum): Can we safely change the type of the header key?
type ctxKey string
var (
