Clean up scheduler code to prepare upcoming changes (#8643)

In preparation for https://github.com/grafana/loki/pull/8585 this PR contains some cleanups.

The changes in this PR do not alter the implementation; they only rename functions and data structures to make them more generic and/or suitable for extension.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/8667/head
Christian Haudum 3 years ago committed by GitHub
parent a15365202d
commit a5652c8027
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      pkg/lokifrontend/frontend/v1/frontend.go
  2. 69
      pkg/scheduler/queue/queue.go
  3. 14
      pkg/scheduler/queue/queue_test.go
  4. 104
      pkg/scheduler/queue/tenant_queues.go
  5. 46
      pkg/scheduler/queue/tenant_queues_test.go
  6. 17
      pkg/scheduler/scheduler.go

@ -198,14 +198,14 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
f.requestQueue.RegisterQuerierConnection(querierID)
defer f.requestQueue.UnregisterQuerierConnection(querierID)
lastUserIndex := queue.FirstUser()
lastIndex := queue.StartIndex
for {
reqWrapper, idx, err := f.requestQueue.GetNextRequestForQuerier(server.Context(), lastUserIndex, querierID)
reqWrapper, idx, err := f.requestQueue.Dequeue(server.Context(), lastIndex, querierID)
if err != nil {
return err
}
lastUserIndex = idx
lastIndex = idx
req := reqWrapper.(*request)
@ -224,7 +224,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
it's possible that its own queue would perpetually contain only expired requests.
*/
if req.originalCtx.Err() != nil {
lastUserIndex = lastUserIndex.ReuseLastUser()
lastIndex = lastIndex.ReuseLastIndex()
continue
}
@ -322,7 +322,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)
err = f.requestQueue.EnqueueRequest(joinedTenantID, req, maxQueriers, nil)
err = f.requestQueue.Enqueue(joinedTenantID, req, maxQueriers, nil)
if err == queue.ErrTooManyRequests {
return errTooManyRequest
}

@ -21,29 +21,28 @@ var (
ErrStopped = errors.New("queue is stopped")
)
// UserIndex is opaque type that allows to resume iteration over users between successive calls
// QueueIndex is opaque type that allows to resume iteration over tenants between successive calls
// of RequestQueue.Dequeue method.
type UserIndex struct {
last int
}
type QueueIndex int // nolint:revive
// Modify index to start iteration on the same user, for which last queue was returned.
func (ui UserIndex) ReuseLastUser() UserIndex {
if ui.last >= 0 {
return UserIndex{last: ui.last - 1}
// Modify index to start iteration on the same tenant, for which last queue was returned.
func (ui QueueIndex) ReuseLastIndex() QueueIndex {
if ui < 0 {
return ui
}
return ui
return ui - 1
}
// FirstUser returns UserIndex that starts iteration over user queues from the very first user.
func FirstUser() UserIndex {
return UserIndex{last: -1}
}
// StartIndex is the UserIndex that starts iteration over tenant queues from the very first tenant.
var StartIndex QueueIndex = -1
// Request stored into the queue.
type Request interface{}
type Request any
// RequestChannel is a channel that queues Requests
type RequestChannel chan Request
// RequestQueue holds incoming requests in per-user queues. It also assigns each user specified number of queriers,
// RequestQueue holds incoming requests in per-tenant queues. It also assigns each tenant specified number of queriers,
// and when querier asks for next request to handle (using Dequeue), it returns requests
// in a fair fashion.
type RequestQueue struct {
@ -53,16 +52,16 @@ type RequestQueue struct {
mtx sync.Mutex
cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
queues *queues
queues *tenantQueues
stopped bool
queueLength *prometheus.GaugeVec // Per user and reason.
discardedRequests *prometheus.CounterVec // Per user.
queueLength *prometheus.GaugeVec // Per tenant and reason.
discardedRequests *prometheus.CounterVec // Per tenant.
}
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue {
q := &RequestQueue{
queues: newUserQueues(maxOutstandingPerTenant, forgetDelay),
queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay),
connectedQuerierWorkers: atomic.NewInt32(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
@ -74,12 +73,12 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que
return q
}
// EnqueueRequest puts the request into the queue. MaxQueries is user-specific value that specifies how many queriers can
// this user use (zero or negative = all queriers). It is passed to each EnqueueRequest, because it can change
// Enqueue puts the request into the queue. MaxQueries is tenant-specific value that specifies how many queriers can
// this tenant use (zero or negative = all queriers). It is passed to each Enqueue, because it can change
// between calls.
//
// If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error {
func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, successFn func()) error {
q.mtx.Lock()
defer q.mtx.Unlock()
@ -87,15 +86,15 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
return ErrStopped
}
queue := q.queues.getOrAddQueue(userID, maxQueriers)
queue := q.queues.getOrAddQueue(tenant, maxQueriers)
if queue == nil {
// This can only happen if userID is "".
// This can only happen if tenant is "".
return errors.New("no queue found")
}
select {
case queue <- req:
q.queueLength.WithLabelValues(userID).Inc()
q.queueLength.WithLabelValues(tenant).Inc()
q.cond.Broadcast()
// Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns.
if successFn != nil {
@ -103,22 +102,22 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in
}
return nil
default:
q.discardedRequests.WithLabelValues(userID).Inc()
q.discardedRequests.WithLabelValues(tenant).Inc()
return ErrTooManyRequests
}
}
// GetNextRequestForQuerier find next user queue and takes the next request off of it. Will block if there are no requests.
// By passing user index from previous call of this method, querier guarantees that it iterates over all users fairly.
// If querier finds that request from the user is already expired, it can get a request for the same user by using UserIndex.ReuseLastUser.
func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error) {
// Dequeue find next tenant queue and takes the next request off of it. Will block if there are no requests.
// By passing tenant index from previous call of this method, querier guarantees that it iterates over all tenants fairly.
// If querier finds that request from the tenant is already expired, it can get a request for the same tenant by using QueueIndex.ReuseLastIndex.
func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID string) (Request, QueueIndex, error) {
q.mtx.Lock()
defer q.mtx.Unlock()
querierWait := false
FindQueue:
// We need to wait if there are no users, or no pending requests for given querier.
// We need to wait if there are no tenants, or no pending requests for given querier.
for (q.queues.len() == 0 || querierWait) && ctx.Err() == nil && !q.stopped {
querierWait = false
q.cond.Wait(ctx)
@ -133,8 +132,8 @@ FindQueue:
}
for {
queue, userID, idx := q.queues.getNextQueueForQuerier(last.last, querierID)
last.last = idx
queue, tenant, idx := q.queues.getNextQueueForQuerier(last, querierID)
last = idx
if queue == nil {
break
}
@ -143,10 +142,10 @@ FindQueue:
for {
request := <-queue
if len(queue) == 0 {
q.queues.deleteQueue(userID)
q.queues.deleteQueue(tenant)
}
q.queueLength.WithLabelValues(userID).Dec()
q.queueLength.WithLabelValues(tenant).Dec()
// Tell close() we've processed a request.
q.cond.Broadcast()

@ -36,7 +36,7 @@ func BenchmarkGetNextRequest(b *testing.B) {
for j := 0; j < numTenants; j++ {
userID := strconv.Itoa(j)
err := queue.EnqueueRequest(userID, "request", 0, nil)
err := queue.Enqueue(userID, "request", 0, nil)
if err != nil {
b.Fatal(err)
}
@ -48,19 +48,19 @@ func BenchmarkGetNextRequest(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
idx := FirstUser()
idx := StartIndex
for j := 0; j < maxOutstandingPerTenant*numTenants; j++ {
querier := ""
b:
// Find querier with at least one request to avoid blocking in getNextRequestForQuerier.
for _, q := range queues[i].queues.userQueues {
for _, q := range queues[i].queues.queues {
for qid := range q.queriers {
querier = qid
break b
}
}
_, nidx, err := queues[i].GetNextRequestForQuerier(ctx, idx, querier)
_, nidx, err := queues[i].Dequeue(ctx, idx, querier)
if err != nil {
b.Fatal(err)
}
@ -100,7 +100,7 @@ func BenchmarkQueueRequest(b *testing.B) {
for n := 0; n < b.N; n++ {
for i := 0; i < maxOutstandingPerTenant; i++ {
for j := 0; j < numTenants; j++ {
err := queues[n].EnqueueRequest(users[j], requests[j], 0, nil)
err := queues[n].Enqueue(users[j], requests[j], 0, nil)
if err != nil {
b.Fatal(err)
}
@ -132,7 +132,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
querier2wg.Add(1)
go func() {
defer querier2wg.Done()
_, _, err := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-2")
_, _, err := queue.Dequeue(ctx, StartIndex, "querier-2")
require.NoError(t, err)
}()
@ -141,7 +141,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
// Enqueue a request from a user which would be assigned to querier-1.
// NOTE: "user-1" hash falls in the querier-1 shard.
require.NoError(t, queue.EnqueueRequest("user-1", "request", 1, nil))
require.NoError(t, queue.Enqueue("user-1", "request", 1, nil))
startTime := time.Now()
querier2wg.Wait()

@ -25,15 +25,15 @@ type querier struct {
disconnectedAt time.Time
}
// This struct holds user queues for pending requests. It also keeps track of connected queriers,
// and mapping between users and queriers.
type queues struct {
userQueues map[string]*userQueue
// This struct holds tenant queues for pending requests. It also keeps track of connected queriers,
// and mapping between tenants and queriers.
type tenantQueues struct {
queues map[string]*tenantQueue
// List of all users with queues, used for iteration when searching for next queue to handle.
// Users removed from the middle are replaced with "". To avoid skipping users during iteration, we only shrink
// List of all tenants with queues, used for iteration when searching for next queue to handle.
// Tenants removed from the middle are replaced with "". To avoid skipping tenants during iteration, we only shrink
// this list when there are ""'s at the end of it.
users []string
tenants []string
maxUserQueueSize int
@ -48,8 +48,8 @@ type queues struct {
sortedQueriers []string
}
type userQueue struct {
ch chan Request
type tenantQueue struct {
ch RequestChannel
// If not nil, only these queriers can handle user requests. If nil, all queriers can.
// We set this to nil if number of available queriers <= maxQueriers.
@ -64,10 +64,10 @@ type userQueue struct {
index int
}
func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues {
return &queues{
userQueues: map[string]*userQueue{},
users: nil,
func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQueues {
return &tenantQueues{
queues: map[string]*tenantQueue{},
tenants: nil,
maxUserQueueSize: maxUserQueueSize,
forgetDelay: forgetDelay,
queriers: map[string]*querier{},
@ -75,32 +75,32 @@ func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues {
}
}
func (q *queues) len() int {
return len(q.userQueues)
func (q *tenantQueues) len() int {
return len(q.queues)
}
func (q *queues) deleteQueue(userID string) {
uq := q.userQueues[userID]
func (q *tenantQueues) deleteQueue(tenant string) {
uq := q.queues[tenant]
if uq == nil {
return
}
delete(q.userQueues, userID)
q.users[uq.index] = ""
delete(q.queues, tenant)
q.tenants[uq.index] = ""
// Shrink tenants list size if possible. This is safe, and no tenants will be skipped during iteration.
for ix := len(q.users) - 1; ix >= 0 && q.users[ix] == ""; ix-- {
q.users = q.users[:ix]
for ix := len(q.tenants) - 1; ix >= 0 && q.tenants[ix] == ""; ix-- {
q.tenants = q.tenants[:ix]
}
}
// Returns existing or new queue for user.
// MaxQueriers is used to compute which queriers should handle requests for this user.
// If maxQueriers is <= 0, all queriers can handle this user's requests.
// Returns existing or new queue for a tenant.
// MaxQueriers is used to compute which queriers should handle requests for this tenant.
// If maxQueriers is <= 0, all queriers can handle this tenant's requests.
// If maxQueriers has changed since the last call, queriers for this are recomputed.
func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request {
// Empty user is not allowed, as that would break our users list ("" is used for free spot).
if userID == "" {
func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChannel {
// Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot).
if tenant == "" {
return nil
}
@ -108,35 +108,35 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request {
maxQueriers = 0
}
uq := q.userQueues[userID]
uq := q.queues[tenant]
if uq == nil {
uq = &userQueue{
ch: make(chan Request, q.maxUserQueueSize),
seed: util.ShuffleShardSeed(userID, ""),
uq = &tenantQueue{
ch: make(RequestChannel, q.maxUserQueueSize),
seed: util.ShuffleShardSeed(tenant, ""),
index: -1,
}
q.userQueues[userID] = uq
q.queues[tenant] = uq
// Add tenant to the list of tenants... find first free spot, and put it there.
for ix, u := range q.users {
for ix, u := range q.tenants {
if u == "" {
uq.index = ix
q.users[ix] = userID
q.tenants[ix] = tenant
break
}
}
// ... or add to the end.
if uq.index < 0 {
uq.index = len(q.users)
q.users = append(q.users, userID)
uq.index = len(q.tenants)
q.tenants = append(q.tenants, tenant)
}
}
if uq.maxQueriers != maxQueriers {
uq.maxQueriers = maxQueriers
uq.queriers = shuffleQueriersForUser(uq.seed, maxQueriers, q.sortedQueriers, nil)
uq.queriers = shuffleQueriersForTenants(uq.seed, maxQueriers, q.sortedQueriers, nil)
}
return uq.ch
@ -145,7 +145,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request {
// Finds next queue for the querier. To support fair scheduling between users, client is expected
// to pass the last user index returned by this function as argument. If there was no previous
// last user index, use -1.
func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (chan Request, string, int) {
func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (RequestChannel, string, QueueIndex) {
uid := lastUserIndex
// Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward
@ -154,21 +154,21 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (ch
return nil, "", uid
}
for iters := 0; iters < len(q.users); iters++ {
for iters := 0; iters < len(q.tenants); iters++ {
uid = uid + 1
// Don't use "mod len(q.users)", as that could skip users at the beginning of the list
// for example when q.users has shrunk since last call.
if uid >= len(q.users) {
if int(uid) >= len(q.tenants) {
uid = 0
}
u := q.users[uid]
u := q.tenants[uid]
if u == "" {
continue
}
q := q.userQueues[u]
q := q.queues[u]
if q.queriers != nil {
if _, ok := q.queriers[querierID]; !ok {
@ -182,7 +182,7 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (ch
return nil, "", uid
}
func (q *queues) addQuerierConnection(querierID string) {
func (q *tenantQueues) addQuerierConnection(querierID string) {
info := q.queriers[querierID]
if info != nil {
info.connections++
@ -202,7 +202,7 @@ func (q *queues) addQuerierConnection(querierID string) {
q.recomputeUserQueriers()
}
func (q *queues) removeQuerierConnection(querierID string, now time.Time) {
func (q *tenantQueues) removeQuerierConnection(querierID string, now time.Time) {
info := q.queriers[querierID]
if info == nil || info.connections <= 0 {
panic("unexpected number of connections for querier")
@ -227,7 +227,7 @@ func (q *queues) removeQuerierConnection(querierID string, now time.Time) {
info.disconnectedAt = now
}
func (q *queues) removeQuerier(querierID string) {
func (q *tenantQueues) removeQuerier(querierID string) {
delete(q.queriers, querierID)
ix := sort.SearchStrings(q.sortedQueriers, querierID)
@ -241,7 +241,7 @@ func (q *queues) removeQuerier(querierID string) {
}
// notifyQuerierShutdown records that a querier has sent notification about a graceful shutdown.
func (q *queues) notifyQuerierShutdown(querierID string) {
func (q *tenantQueues) notifyQuerierShutdown(querierID string) {
info := q.queriers[querierID]
if info == nil {
// The querier may have already been removed, so we just ignore it.
@ -261,7 +261,7 @@ func (q *queues) notifyQuerierShutdown(querierID string) {
// forgetDisconnectedQueriers removes all disconnected queriers that have gone since at least
// the forget delay. Returns the number of forgotten queriers.
func (q *queues) forgetDisconnectedQueriers(now time.Time) int {
func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int {
// Nothing to do if the forget delay is disabled.
if q.forgetDelay == 0 {
return 0
@ -281,18 +281,18 @@ func (q *queues) forgetDisconnectedQueriers(now time.Time) int {
return forgotten
}
func (q *queues) recomputeUserQueriers() {
func (q *tenantQueues) recomputeUserQueriers() {
scratchpad := make([]string, 0, len(q.sortedQueriers))
for _, uq := range q.userQueues {
uq.queriers = shuffleQueriersForUser(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad)
for _, uq := range q.queues {
uq.queriers = shuffleQueriersForTenants(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad)
}
}
// shuffleQueriersForUser returns nil if queriersToSelect is 0 or there are not enough queriers to select from.
// shuffleQueriersForTenants returns nil if queriersToSelect is 0 or there are not enough queriers to select from.
// In that case *all* queriers should be used.
// Scratchpad is used for shuffling, to avoid new allocations. If nil, new slice is allocated.
func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueriers []string, scratchpad []string) map[string]struct{} {
func shuffleQueriersForTenants(userSeed int64, queriersToSelect int, allSortedQueriers []string, scratchpad []string) map[string]struct{} {
if queriersToSelect == 0 || len(allSortedQueriers) <= queriersToSelect {
return nil
}

@ -18,7 +18,7 @@ import (
)
func TestQueues(t *testing.T) {
uq := newUserQueues(0, 0)
uq := newTenantQueues(0, 0)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))
@ -76,7 +76,7 @@ func TestQueues(t *testing.T) {
}
func TestQueuesOnTerminatingQuerier(t *testing.T) {
uq := newUserQueues(0, 0)
uq := newTenantQueues(0, 0)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))
@ -106,7 +106,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {
}
func TestQueuesWithQueriers(t *testing.T) {
uq := newUserQueues(0, 0)
uq := newTenantQueues(0, 0)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))
@ -133,7 +133,7 @@ func TestQueuesWithQueriers(t *testing.T) {
getOrAdd(t, uq, uid, maxQueriersPerUser)
// Verify it has maxQueriersPerUser queriers assigned now.
qs := uq.userQueues[uid].queriers
qs := uq.queues[uid].queriers
assert.Equal(t, maxQueriersPerUser, len(qs))
}
@ -144,7 +144,7 @@ func TestQueuesWithQueriers(t *testing.T) {
for q := 0; q < queriers; q++ {
qid := fmt.Sprintf("querier-%d", q)
lastUserIndex := -1
lastUserIndex := StartIndex
for {
_, _, newIx := uq.getNextQueueForQuerier(lastUserIndex, qid)
if newIx < lastUserIndex {
@ -183,13 +183,13 @@ func TestQueuesConsistency(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
uq := newUserQueues(0, testData.forgetDelay)
uq := newTenantQueues(0, testData.forgetDelay)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))
r := rand.New(rand.NewSource(time.Now().Unix()))
lastUserIndexes := map[string]int{}
lastUserIndexes := map[string]QueueIndex{}
conns := map[string]int{}
@ -232,7 +232,7 @@ func TestQueues_ForgetDelay(t *testing.T) {
)
now := time.Now()
uq := newUserQueues(0, forgetDelay)
uq := newTenantQueues(0, forgetDelay)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))
@ -324,7 +324,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
)
now := time.Now()
uq := newUserQueues(0, forgetDelay)
uq := newTenantQueues(0, forgetDelay)
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))
@ -397,7 +397,7 @@ func generateQuerier(r *rand.Rand) string {
return fmt.Sprint("querier-", r.Int()%5)
}
func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) chan Request {
func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) RequestChannel {
q := uq.getOrAddQueue(tenant, maxQueriers)
assert.NotNil(t, q)
assert.NoError(t, isConsistent(uq))
@ -405,8 +405,8 @@ func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) chan Req
return q
}
func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIndex int, qs ...chan Request) int {
var n chan Request
func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...RequestChannel) QueueIndex {
var n RequestChannel
for _, q := range qs {
n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier)
assert.Equal(t, q, n)
@ -415,14 +415,14 @@ func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIn
return lastUserIndex
}
func isConsistent(uq *queues) error {
func isConsistent(uq *tenantQueues) error {
if len(uq.sortedQueriers) != len(uq.queriers) {
return fmt.Errorf("inconsistent number of sorted queriers and querier connections")
}
uc := 0
for ix, u := range uq.users {
q := uq.userQueues[u]
for ix, u := range uq.tenants {
q := uq.queues[u]
if u != "" && q == nil {
return fmt.Errorf("user %s doesn't have queue", u)
}
@ -452,7 +452,7 @@ func isConsistent(uq *queues) error {
}
}
if uc != len(uq.userQueues) {
if uc != len(uq.queues) {
return fmt.Errorf("inconsistent number of users list and user queues")
}
@ -460,9 +460,9 @@ func isConsistent(uq *queues) error {
}
// getUsersByQuerier returns the list of users handled by the provided querierID.
func getUsersByQuerier(queues *queues, querierID string) []string {
func getUsersByQuerier(queues *tenantQueues, querierID string) []string {
var userIDs []string
for userID, q := range queues.userQueues {
for userID, q := range queues.queues {
if q.queriers == nil {
// If it's nil then all queriers can handle this user.
userIDs = append(userIDs, userID)
@ -478,14 +478,14 @@ func getUsersByQuerier(queues *queues, querierID string) []string {
func TestShuffleQueriers(t *testing.T) {
allQueriers := []string{"a", "b", "c", "d", "e"}
require.Nil(t, shuffleQueriersForUser(12345, 10, allQueriers, nil))
require.Nil(t, shuffleQueriersForUser(12345, len(allQueriers), allQueriers, nil))
require.Nil(t, shuffleQueriersForTenants(12345, 10, allQueriers, nil))
require.Nil(t, shuffleQueriersForTenants(12345, len(allQueriers), allQueriers, nil))
r1 := shuffleQueriersForUser(12345, 3, allQueriers, nil)
r1 := shuffleQueriersForTenants(12345, 3, allQueriers, nil)
require.Equal(t, 3, len(r1))
// Same input produces same output.
r2 := shuffleQueriersForUser(12345, 3, allQueriers, nil)
r2 := shuffleQueriersForTenants(12345, 3, allQueriers, nil)
require.Equal(t, 3, len(r2))
require.Equal(t, r1, r2)
}
@ -507,7 +507,7 @@ func TestShuffleQueriersCorrectness(t *testing.T) {
toSelect = 3
}
selected := shuffleQueriersForUser(r.Int63(), toSelect, allSortedQueriers, nil)
selected := shuffleQueriersForTenants(r.Int63(), toSelect, allSortedQueriers, nil)
require.Equal(t, toSelect, len(selected))

@ -32,7 +32,6 @@ import (
"github.com/grafana/loki/pkg/scheduler/queue"
"github.com/grafana/loki/pkg/scheduler/schedulerpb"
"github.com/grafana/loki/pkg/util"
lokiutil "github.com/grafana/loki/pkg/util"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
lokihttpreq "github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
@ -123,8 +122,8 @@ type Config struct {
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
// Schedulers ring
UseSchedulerRing bool `yaml:"use_scheduler_ring"`
SchedulerRing lokiutil.RingConfig `yaml:"scheduler_ring,omitempty" doc:"description=The hash ring configuration. This option is required only if use_scheduler_ring is true."`
UseSchedulerRing bool `yaml:"use_scheduler_ring"`
SchedulerRing util.RingConfig `yaml:"scheduler_ring,omitempty" doc:"description=The hash ring configuration. This option is required only if use_scheduler_ring is true."`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@ -411,7 +410,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser)
s.activeUsers.UpdateUserTimestamp(userID, now)
return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() {
return s.requestQueue.Enqueue(userID, req, maxQueriers, func() {
shouldCancel = false
s.pendingRequestsMu.Lock()
@ -446,15 +445,15 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL
s.requestQueue.RegisterQuerierConnection(querierID)
defer s.requestQueue.UnregisterQuerierConnection(querierID)
lastUserIndex := queue.FirstUser()
lastIndex := queue.StartIndex
// In stopping state scheduler is not accepting new queries, but still dispatching queries in the queues.
for s.isRunningOrStopping() {
req, idx, err := s.requestQueue.GetNextRequestForQuerier(querier.Context(), lastUserIndex, querierID)
req, idx, err := s.requestQueue.Dequeue(querier.Context(), lastIndex, querierID)
if err != nil {
return err
}
lastUserIndex = idx
lastIndex = idx
r := req.(*schedulerRequest)
@ -484,7 +483,7 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL
// Remove from pending requests.
s.cancelRequestAndRemoveFromPending(r.frontendAddress, r.queryID)
lastUserIndex = lastUserIndex.ReuseLastUser()
lastIndex = lastIndex.ReuseLastIndex()
continue
}
@ -659,7 +658,7 @@ func (s *Scheduler) running(ctx context.Context) error {
if !s.cfg.UseSchedulerRing {
continue
}
isInSet, err := lokiutil.IsInReplicationSet(s.ring, lokiutil.RingKeyOfLeader, s.ringLifecycler.GetInstanceAddr())
isInSet, err := util.IsInReplicationSet(s.ring, util.RingKeyOfLeader, s.ringLifecycler.GetInstanceAddr())
if err != nil {
level.Error(s.log).Log("msg", "failed to query the ring to see if scheduler instance is in ReplicatonSet, will try again", "err", err)
continue

Loading…
Cancel
Save