diff --git a/pkg/lokifrontend/frontend/v1/frontend.go b/pkg/lokifrontend/frontend/v1/frontend.go index 5e430e9436..d409820221 100644 --- a/pkg/lokifrontend/frontend/v1/frontend.go +++ b/pkg/lokifrontend/frontend/v1/frontend.go @@ -198,14 +198,14 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { f.requestQueue.RegisterQuerierConnection(querierID) defer f.requestQueue.UnregisterQuerierConnection(querierID) - lastUserIndex := queue.FirstUser() + lastIndex := queue.StartIndex for { - reqWrapper, idx, err := f.requestQueue.GetNextRequestForQuerier(server.Context(), lastUserIndex, querierID) + reqWrapper, idx, err := f.requestQueue.Dequeue(server.Context(), lastIndex, querierID) if err != nil { return err } - lastUserIndex = idx + lastIndex = idx req := reqWrapper.(*request) @@ -224,7 +224,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { it's possible that it's own queue would perpetually contain only expired requests. */ if req.originalCtx.Err() != nil { - lastUserIndex = lastUserIndex.ReuseLastUser() + lastIndex = lastIndex.ReuseLastIndex() continue } @@ -322,7 +322,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error { joinedTenantID := tenant.JoinTenantIDs(tenantIDs) f.activeUsers.UpdateUserTimestamp(joinedTenantID, now) - err = f.requestQueue.EnqueueRequest(joinedTenantID, req, maxQueriers, nil) + err = f.requestQueue.Enqueue(joinedTenantID, req, maxQueriers, nil) if err == queue.ErrTooManyRequests { return errTooManyRequest } diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 4d611dadb5..5a72ea8015 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -21,29 +21,28 @@ var ( ErrStopped = errors.New("queue is stopped") ) -// UserIndex is opaque type that allows to resume iteration over users between successive calls +// QueueIndex is opaque type that allows to resume iteration over tenants between successive calls // of 
RequestQueue.GetNextRequestForQuerier method. -type UserIndex struct { - last int -} +type QueueIndex int // nolint:revive -// Modify index to start iteration on the same user, for which last queue was returned. -func (ui UserIndex) ReuseLastUser() UserIndex { - if ui.last >= 0 { - return UserIndex{last: ui.last - 1} +// Modify index to start iteration on the same tenant, for which last queue was returned. +func (ui QueueIndex) ReuseLastIndex() QueueIndex { + if ui < 0 { + return ui } - return ui + return ui - 1 } -// FirstUser returns UserIndex that starts iteration over user queues from the very first user. -func FirstUser() UserIndex { - return UserIndex{last: -1} -} +// StartIndex is the QueueIndex that starts iteration over tenant queues from the very first tenant. +var StartIndex QueueIndex = -1 // Request stored into the queue. -type Request interface{} +type Request any + +// RequestChannel is a channel that queues Requests. +type RequestChannel chan Request -// RequestQueue holds incoming requests in per-user queues. It also assigns each user specified number of queriers, +// RequestQueue holds incoming requests in per-tenant queues. It also assigns each tenant specified number of queriers, // and when querier asks for next request to handle (using GetNextRequestForQuerier), it returns requests // in a fair fashion. type RequestQueue struct { @@ -53,16 +52,16 @@ type RequestQueue struct { mtx sync.Mutex cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected. - queues *queues + queues *tenantQueues stopped bool - queueLength *prometheus.GaugeVec // Per user and reason. - discardedRequests *prometheus.CounterVec // Per user. + queueLength *prometheus.GaugeVec // Per tenant and reason. + discardedRequests *prometheus.CounterVec // Per tenant. 
} func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec) *RequestQueue { q := &RequestQueue{ - queues: newUserQueues(maxOutstandingPerTenant, forgetDelay), + queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay), connectedQuerierWorkers: atomic.NewInt32(0), queueLength: queueLength, discardedRequests: discardedRequests, @@ -74,12 +73,12 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que return q } -// EnqueueRequest puts the request into the queue. MaxQueries is user-specific value that specifies how many queriers can -// this user use (zero or negative = all queriers). It is passed to each EnqueueRequest, because it can change +// Enqueue puts the request into the queue. MaxQueries is tenant-specific value that specifies how many queriers can +// this tenant use (zero or negative = all queriers). It is passed to each Enqueue, because it can change // between calls. // // If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request. -func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error { +func (q *RequestQueue) Enqueue(tenant string, req Request, maxQueriers int, successFn func()) error { q.mtx.Lock() defer q.mtx.Unlock() @@ -87,15 +86,15 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in return ErrStopped } - queue := q.queues.getOrAddQueue(userID, maxQueriers) + queue := q.queues.getOrAddQueue(tenant, maxQueriers) if queue == nil { - // This can only happen if userID is "". + // This can only happen if tenant is "". return errors.New("no queue found") } select { case queue <- req: - q.queueLength.WithLabelValues(userID).Inc() + q.queueLength.WithLabelValues(tenant).Inc() q.cond.Broadcast() // Call this function while holding a lock. 
This guarantees that no querier can fetch the request before function returns. if successFn != nil { @@ -103,22 +102,22 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers in } return nil default: - q.discardedRequests.WithLabelValues(userID).Inc() + q.discardedRequests.WithLabelValues(tenant).Inc() return ErrTooManyRequests } } -// GetNextRequestForQuerier find next user queue and takes the next request off of it. Will block if there are no requests. -// By passing user index from previous call of this method, querier guarantees that it iterates over all users fairly. -// If querier finds that request from the user is already expired, it can get a request for the same user by using UserIndex.ReuseLastUser. -func (q *RequestQueue) GetNextRequestForQuerier(ctx context.Context, last UserIndex, querierID string) (Request, UserIndex, error) { +// Dequeue finds the next tenant queue and takes the next request off of it. Will block if there are no requests. +// By passing tenant index from previous call of this method, querier guarantees that it iterates over all tenants fairly. +// If querier finds that request from the tenant is already expired, it can get a request for the same tenant by using QueueIndex.ReuseLastIndex. +func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID string) (Request, QueueIndex, error) { q.mtx.Lock() defer q.mtx.Unlock() querierWait := false FindQueue: - // We need to wait if there are no users, or no pending requests for given querier. + // We need to wait if there are no tenants, or no pending requests for given querier. 
for (q.queues.len() == 0 || querierWait) && ctx.Err() == nil && !q.stopped { querierWait = false q.cond.Wait(ctx) @@ -133,8 +132,8 @@ FindQueue: } for { - queue, userID, idx := q.queues.getNextQueueForQuerier(last.last, querierID) - last.last = idx + queue, tenant, idx := q.queues.getNextQueueForQuerier(last, querierID) + last = idx if queue == nil { break } @@ -143,10 +142,10 @@ FindQueue: for { request := <-queue if len(queue) == 0 { - q.queues.deleteQueue(userID) + q.queues.deleteQueue(tenant) } - q.queueLength.WithLabelValues(userID).Dec() + q.queueLength.WithLabelValues(tenant).Dec() // Tell close() we've processed a request. q.cond.Broadcast() diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index ff9ac6e8bf..2e48c1335b 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -36,7 +36,7 @@ func BenchmarkGetNextRequest(b *testing.B) { for j := 0; j < numTenants; j++ { userID := strconv.Itoa(j) - err := queue.EnqueueRequest(userID, "request", 0, nil) + err := queue.Enqueue(userID, "request", 0, nil) if err != nil { b.Fatal(err) } @@ -48,19 +48,19 @@ func BenchmarkGetNextRequest(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - idx := FirstUser() + idx := StartIndex for j := 0; j < maxOutstandingPerTenant*numTenants; j++ { querier := "" b: // Find querier with at least one request to avoid blocking in getNextRequestForQuerier. 
- for _, q := range queues[i].queues.userQueues { + for _, q := range queues[i].queues.queues { for qid := range q.queriers { querier = qid break b } } - _, nidx, err := queues[i].GetNextRequestForQuerier(ctx, idx, querier) + _, nidx, err := queues[i].Dequeue(ctx, idx, querier) if err != nil { b.Fatal(err) } @@ -100,7 +100,7 @@ func BenchmarkQueueRequest(b *testing.B) { for n := 0; n < b.N; n++ { for i := 0; i < maxOutstandingPerTenant; i++ { for j := 0; j < numTenants; j++ { - err := queues[n].EnqueueRequest(users[j], requests[j], 0, nil) + err := queues[n].Enqueue(users[j], requests[j], 0, nil) if err != nil { b.Fatal(err) } @@ -132,7 +132,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe querier2wg.Add(1) go func() { defer querier2wg.Done() - _, _, err := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-2") + _, _, err := queue.Dequeue(ctx, StartIndex, "querier-2") require.NoError(t, err) }() @@ -141,7 +141,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe // Enqueue a request from an user which would be assigned to querier-1. // NOTE: "user-1" hash falls in the querier-1 shard. - require.NoError(t, queue.EnqueueRequest("user-1", "request", 1, nil)) + require.NoError(t, queue.Enqueue("user-1", "request", 1, nil)) startTime := time.Now() querier2wg.Wait() diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/tenant_queues.go similarity index 70% rename from pkg/scheduler/queue/user_queues.go rename to pkg/scheduler/queue/tenant_queues.go index a4fb409313..d9327432e5 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/tenant_queues.go @@ -25,15 +25,15 @@ type querier struct { disconnectedAt time.Time } -// This struct holds user queues for pending requests. It also keeps track of connected queriers, -// and mapping between users and queriers. 
-type queues struct { - userQueues map[string]*userQueue +// This struct holds tenant queues for pending requests. It also keeps track of connected queriers, +// and mapping between tenants and queriers. +type tenantQueues struct { + queues map[string]*tenantQueue - // List of all users with queues, used for iteration when searching for next queue to handle. - // Users removed from the middle are replaced with "". To avoid skipping users during iteration, we only shrink + // List of all tenants with queues, used for iteration when searching for next queue to handle. + // Tenants removed from the middle are replaced with "". To avoid skipping tenants during iteration, we only shrink // this list when there are ""'s at the end of it. - users []string + tenants []string maxUserQueueSize int @@ -48,8 +48,8 @@ type queues struct { sortedQueriers []string } -type userQueue struct { - ch chan Request +type tenantQueue struct { + ch RequestChannel // If not nil, only these queriers can handle user requests. If nil, all queriers can. // We set this to nil if number of available queriers <= maxQueriers. 
@@ -64,10 +64,10 @@ type userQueue struct { index int } -func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues { - return &queues{ - userQueues: map[string]*userQueue{}, - users: nil, +func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQueues { + return &tenantQueues{ + queues: map[string]*tenantQueue{}, + tenants: nil, maxUserQueueSize: maxUserQueueSize, forgetDelay: forgetDelay, queriers: map[string]*querier{}, @@ -75,32 +75,32 @@ func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration) *queues { } } -func (q *queues) len() int { - return len(q.userQueues) +func (q *tenantQueues) len() int { + return len(q.queues) } -func (q *queues) deleteQueue(userID string) { - uq := q.userQueues[userID] +func (q *tenantQueues) deleteQueue(tenant string) { + uq := q.queues[tenant] if uq == nil { return } - delete(q.userQueues, userID) - q.users[uq.index] = "" + delete(q.queues, tenant) + q.tenants[uq.index] = "" // Shrink users list size if possible. This is safe, and no users will be skipped during iteration. - for ix := len(q.users) - 1; ix >= 0 && q.users[ix] == ""; ix-- { - q.users = q.users[:ix] + for ix := len(q.tenants) - 1; ix >= 0 && q.tenants[ix] == ""; ix-- { + q.tenants = q.tenants[:ix] } } -// Returns existing or new queue for user. -// MaxQueriers is used to compute which queriers should handle requests for this user. -// If maxQueriers is <= 0, all queriers can handle this user's requests. +// Returns existing or new queue for a tenant. +// MaxQueriers is used to compute which queriers should handle requests for this tenant. +// If maxQueriers is <= 0, all queriers can handle this tenant's requests. // If maxQueriers has changed since the last call, queriers for this are recomputed. -func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request { - // Empty user is not allowed, as that would break our users list ("" is used for free spot). 
- if userID == "" { +func (q *tenantQueues) getOrAddQueue(tenant string, maxQueriers int) RequestChannel { + // Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot). + if tenant == "" { return nil } @@ -108,35 +108,35 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request { maxQueriers = 0 } - uq := q.userQueues[userID] + uq := q.queues[tenant] if uq == nil { - uq = &userQueue{ - ch: make(chan Request, q.maxUserQueueSize), - seed: util.ShuffleShardSeed(userID, ""), + uq = &tenantQueue{ + ch: make(RequestChannel, q.maxUserQueueSize), + seed: util.ShuffleShardSeed(tenant, ""), index: -1, } - q.userQueues[userID] = uq + q.queues[tenant] = uq // Add user to the list of users... find first free spot, and put it there. - for ix, u := range q.users { + for ix, u := range q.tenants { if u == "" { uq.index = ix - q.users[ix] = userID + q.tenants[ix] = tenant break } } // ... or add to the end. if uq.index < 0 { - uq.index = len(q.users) - q.users = append(q.users, userID) + uq.index = len(q.tenants) + q.tenants = append(q.tenants, tenant) } } if uq.maxQueriers != maxQueriers { uq.maxQueriers = maxQueriers - uq.queriers = shuffleQueriersForUser(uq.seed, maxQueriers, q.sortedQueriers, nil) + uq.queriers = shuffleQueriersForTenants(uq.seed, maxQueriers, q.sortedQueriers, nil) } return uq.ch @@ -145,7 +145,7 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request { // Finds next queue for the querier. To support fair scheduling between users, client is expected // to pass last user index returned by this function as argument. Is there was no previous // last user index, use -1. -func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (chan Request, string, int) { +func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (RequestChannel, string, QueueIndex) { uid := lastUserIndex // Ensure the querier is not shutting down. 
If the querier is shutting down, we shouldn't forward @@ -154,21 +154,21 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (ch return nil, "", uid } - for iters := 0; iters < len(q.users); iters++ { + for iters := 0; iters < len(q.tenants); iters++ { uid = uid + 1 // Don't use "mod len(q.users)", as that could skip users at the beginning of the list // for example when q.users has shrunk since last call. - if uid >= len(q.users) { + if int(uid) >= len(q.tenants) { uid = 0 } - u := q.users[uid] + u := q.tenants[uid] if u == "" { continue } - q := q.userQueues[u] + q := q.queues[u] if q.queriers != nil { if _, ok := q.queriers[querierID]; !ok { @@ -182,7 +182,7 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (ch return nil, "", uid } -func (q *queues) addQuerierConnection(querierID string) { +func (q *tenantQueues) addQuerierConnection(querierID string) { info := q.queriers[querierID] if info != nil { info.connections++ @@ -202,7 +202,7 @@ func (q *queues) addQuerierConnection(querierID string) { q.recomputeUserQueriers() } -func (q *queues) removeQuerierConnection(querierID string, now time.Time) { +func (q *tenantQueues) removeQuerierConnection(querierID string, now time.Time) { info := q.queriers[querierID] if info == nil || info.connections <= 0 { panic("unexpected number of connections for querier") @@ -227,7 +227,7 @@ func (q *queues) removeQuerierConnection(querierID string, now time.Time) { info.disconnectedAt = now } -func (q *queues) removeQuerier(querierID string) { +func (q *tenantQueues) removeQuerier(querierID string) { delete(q.queriers, querierID) ix := sort.SearchStrings(q.sortedQueriers, querierID) @@ -241,7 +241,7 @@ func (q *queues) removeQuerier(querierID string) { } // notifyQuerierShutdown records that a querier has sent notification about a graceful shutdown. 
-func (q *queues) notifyQuerierShutdown(querierID string) { +func (q *tenantQueues) notifyQuerierShutdown(querierID string) { info := q.queriers[querierID] if info == nil { // The querier may have already been removed, so we just ignore it. @@ -261,7 +261,7 @@ func (q *queues) notifyQuerierShutdown(querierID string) { // forgetDisconnectedQueriers removes all disconnected queriers that have gone since at least // the forget delay. Returns the number of forgotten queriers. -func (q *queues) forgetDisconnectedQueriers(now time.Time) int { +func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int { // Nothing to do if the forget delay is disabled. if q.forgetDelay == 0 { return 0 @@ -281,18 +281,18 @@ func (q *queues) forgetDisconnectedQueriers(now time.Time) int { return forgotten } -func (q *queues) recomputeUserQueriers() { +func (q *tenantQueues) recomputeUserQueriers() { scratchpad := make([]string, 0, len(q.sortedQueriers)) - for _, uq := range q.userQueues { - uq.queriers = shuffleQueriersForUser(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad) + for _, uq := range q.queues { + uq.queriers = shuffleQueriersForTenants(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad) } } -// shuffleQueriersForUser returns nil if queriersToSelect is 0 or there are not enough queriers to select from. +// shuffleQueriersForTenants returns nil if queriersToSelect is 0 or there are not enough queriers to select from. // In that case *all* queriers should be used. // Scratchpad is used for shuffling, to avoid new allocations. If nil, new slice is allocated. 
-func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueriers []string, scratchpad []string) map[string]struct{} { +func shuffleQueriersForTenants(userSeed int64, queriersToSelect int, allSortedQueriers []string, scratchpad []string) map[string]struct{} { if queriersToSelect == 0 || len(allSortedQueriers) <= queriersToSelect { return nil } diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/tenant_queues_test.go similarity index 92% rename from pkg/scheduler/queue/user_queues_test.go rename to pkg/scheduler/queue/tenant_queues_test.go index 3c5f9b3814..4f18aae6a0 100644 --- a/pkg/scheduler/queue/user_queues_test.go +++ b/pkg/scheduler/queue/tenant_queues_test.go @@ -18,7 +18,7 @@ import ( ) func TestQueues(t *testing.T) { - uq := newUserQueues(0, 0) + uq := newTenantQueues(0, 0) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -76,7 +76,7 @@ func TestQueues(t *testing.T) { } func TestQueuesOnTerminatingQuerier(t *testing.T) { - uq := newUserQueues(0, 0) + uq := newTenantQueues(0, 0) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -106,7 +106,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) { } func TestQueuesWithQueriers(t *testing.T) { - uq := newUserQueues(0, 0) + uq := newTenantQueues(0, 0) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -133,7 +133,7 @@ func TestQueuesWithQueriers(t *testing.T) { getOrAdd(t, uq, uid, maxQueriersPerUser) // Verify it has maxQueriersPerUser queriers assigned now. 
- qs := uq.userQueues[uid].queriers + qs := uq.queues[uid].queriers assert.Equal(t, maxQueriersPerUser, len(qs)) } @@ -144,7 +144,7 @@ func TestQueuesWithQueriers(t *testing.T) { for q := 0; q < queriers; q++ { qid := fmt.Sprintf("querier-%d", q) - lastUserIndex := -1 + lastUserIndex := StartIndex for { _, _, newIx := uq.getNextQueueForQuerier(lastUserIndex, qid) if newIx < lastUserIndex { @@ -183,13 +183,13 @@ func TestQueuesConsistency(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - uq := newUserQueues(0, testData.forgetDelay) + uq := newTenantQueues(0, testData.forgetDelay) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) r := rand.New(rand.NewSource(time.Now().Unix())) - lastUserIndexes := map[string]int{} + lastUserIndexes := map[string]QueueIndex{} conns := map[string]int{} @@ -232,7 +232,7 @@ func TestQueues_ForgetDelay(t *testing.T) { ) now := time.Now() - uq := newUserQueues(0, forgetDelay) + uq := newTenantQueues(0, forgetDelay) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -324,7 +324,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget ) now := time.Now() - uq := newUserQueues(0, forgetDelay) + uq := newTenantQueues(0, forgetDelay) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -397,7 +397,7 @@ func generateQuerier(r *rand.Rand) string { return fmt.Sprint("querier-", r.Int()%5) } -func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) chan Request { +func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) RequestChannel { q := uq.getOrAddQueue(tenant, maxQueriers) assert.NotNil(t, q) assert.NoError(t, isConsistent(uq)) @@ -405,8 +405,8 @@ func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) chan Req return q } -func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIndex int, qs ...chan Request) int { - var n chan Request +func confirmOrderForQuerier(t 
*testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...RequestChannel) QueueIndex { + var n RequestChannel for _, q := range qs { n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier) assert.Equal(t, q, n) @@ -415,14 +415,14 @@ func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIn return lastUserIndex } -func isConsistent(uq *queues) error { +func isConsistent(uq *tenantQueues) error { if len(uq.sortedQueriers) != len(uq.queriers) { return fmt.Errorf("inconsistent number of sorted queriers and querier connections") } uc := 0 - for ix, u := range uq.users { - q := uq.userQueues[u] + for ix, u := range uq.tenants { + q := uq.queues[u] if u != "" && q == nil { return fmt.Errorf("user %s doesn't have queue", u) } @@ -452,7 +452,7 @@ func isConsistent(uq *queues) error { } } - if uc != len(uq.userQueues) { + if uc != len(uq.queues) { return fmt.Errorf("inconsistent number of users list and user queues") } @@ -460,9 +460,9 @@ func isConsistent(uq *queues) error { } // getUsersByQuerier returns the list of users handled by the provided querierID. -func getUsersByQuerier(queues *queues, querierID string) []string { +func getUsersByQuerier(queues *tenantQueues, querierID string) []string { var userIDs []string - for userID, q := range queues.userQueues { + for userID, q := range queues.queues { if q.queriers == nil { // If it's nil then all queriers can handle this user. 
userIDs = append(userIDs, userID) @@ -478,14 +478,14 @@ func getUsersByQuerier(queues *queues, querierID string) []string { func TestShuffleQueriers(t *testing.T) { allQueriers := []string{"a", "b", "c", "d", "e"} - require.Nil(t, shuffleQueriersForUser(12345, 10, allQueriers, nil)) - require.Nil(t, shuffleQueriersForUser(12345, len(allQueriers), allQueriers, nil)) + require.Nil(t, shuffleQueriersForTenants(12345, 10, allQueriers, nil)) + require.Nil(t, shuffleQueriersForTenants(12345, len(allQueriers), allQueriers, nil)) - r1 := shuffleQueriersForUser(12345, 3, allQueriers, nil) + r1 := shuffleQueriersForTenants(12345, 3, allQueriers, nil) require.Equal(t, 3, len(r1)) // Same input produces same output. - r2 := shuffleQueriersForUser(12345, 3, allQueriers, nil) + r2 := shuffleQueriersForTenants(12345, 3, allQueriers, nil) require.Equal(t, 3, len(r2)) require.Equal(t, r1, r2) } @@ -507,7 +507,7 @@ func TestShuffleQueriersCorrectness(t *testing.T) { toSelect = 3 } - selected := shuffleQueriersForUser(r.Int63(), toSelect, allSortedQueriers, nil) + selected := shuffleQueriersForTenants(r.Int63(), toSelect, allSortedQueriers, nil) require.Equal(t, toSelect, len(selected)) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6610ffe1e9..c81e393372 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -32,7 +32,6 @@ import ( "github.com/grafana/loki/pkg/scheduler/queue" "github.com/grafana/loki/pkg/scheduler/schedulerpb" "github.com/grafana/loki/pkg/util" - lokiutil "github.com/grafana/loki/pkg/util" lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" lokihttpreq "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" @@ -123,8 +122,8 @@ type Config struct { QuerierForgetDelay time.Duration `yaml:"querier_forget_delay"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."` // 
Schedulers ring - UseSchedulerRing bool `yaml:"use_scheduler_ring"` - SchedulerRing lokiutil.RingConfig `yaml:"scheduler_ring,omitempty" doc:"description=The hash ring configuration. This option is required only if use_scheduler_ring is true."` + UseSchedulerRing bool `yaml:"use_scheduler_ring"` + SchedulerRing util.RingConfig `yaml:"scheduler_ring,omitempty" doc:"description=The hash ring configuration. This option is required only if use_scheduler_ring is true."` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -411,7 +410,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser) s.activeUsers.UpdateUserTimestamp(userID, now) - return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() { + return s.requestQueue.Enqueue(userID, req, maxQueriers, func() { shouldCancel = false s.pendingRequestsMu.Lock() @@ -446,15 +445,15 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL s.requestQueue.RegisterQuerierConnection(querierID) defer s.requestQueue.UnregisterQuerierConnection(querierID) - lastUserIndex := queue.FirstUser() + lastIndex := queue.StartIndex // In stopping state scheduler is not accepting new queries, but still dispatching queries in the queues. for s.isRunningOrStopping() { - req, idx, err := s.requestQueue.GetNextRequestForQuerier(querier.Context(), lastUserIndex, querierID) + req, idx, err := s.requestQueue.Dequeue(querier.Context(), lastIndex, querierID) if err != nil { return err } - lastUserIndex = idx + lastIndex = idx r := req.(*schedulerRequest) @@ -484,7 +483,7 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL // Remove from pending requests. 
s.cancelRequestAndRemoveFromPending(r.frontendAddress, r.queryID) - lastUserIndex = lastUserIndex.ReuseLastUser() + lastIndex = lastIndex.ReuseLastIndex() continue } @@ -659,7 +658,7 @@ func (s *Scheduler) running(ctx context.Context) error { if !s.cfg.UseSchedulerRing { continue } - isInSet, err := lokiutil.IsInReplicationSet(s.ring, lokiutil.RingKeyOfLeader, s.ringLifecycler.GetInstanceAddr()) + isInSet, err := util.IsInReplicationSet(s.ring, util.RingKeyOfLeader, s.ringLifecycler.GetInstanceAddr()) if err != nil { level.Error(s.log).Log("msg", "failed to query the ring to see if scheduler instance is in ReplicatonSet, will try again", "err", err) continue