Introduce worker queue in bloom gateway (#10976)

Instead of calling the bloom store directly for each and every request to filter chunk refs based on the given filters, we want to queue requests in per-tenant queues and process them in batches. Batched requests can be multiplexed, which avoids excessive seeking in the bloom block queriers when checking for chunk matches.
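
To illustrate the idea, here is a simplified sketch (not code from this PR, which deliberately defers the multiplexing worker to a follow-up): merging the chunk refs of several queued requests into a single fingerprint-ordered batch lets a bloom block querier advance through a block once instead of seeking per request. The `chunkRefGroup` type is a hypothetical stand-in for `logproto.GroupedChunkRefs`.

```go
package main

import (
	"fmt"
	"sort"
)

// chunkRefGroup is a hypothetical stand-in for logproto.GroupedChunkRefs,
// reduced to the field that matters for ordering.
type chunkRefGroup struct {
	Fingerprint uint64
}

// multiplex merges the chunk refs of several queued requests into one slice
// ordered by fingerprint, so a single forward pass over a bloom block can
// answer all of them without seeking back and forth per request.
func multiplex(requests [][]chunkRefGroup) []chunkRefGroup {
	var merged []chunkRefGroup
	for _, refs := range requests {
		merged = append(merged, refs...)
	}
	sort.Slice(merged, func(i, j int) bool {
		return merged[i].Fingerprint < merged[j].Fingerprint
	})
	return merged
}

func main() {
	reqA := []chunkRefGroup{{Fingerprint: 0x300}, {Fingerprint: 0x100}}
	reqB := []chunkRefGroup{{Fingerprint: 0x200}}
	// Prints the merged refs in fingerprint order: [{256} {512} {768}]
	fmt.Println(multiplex([][]chunkRefGroup{reqA, reqB}))
}
```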

This PR re-uses the request queue implementation used in the query scheduler. To do so, it moves the queue-related code from the scheduler into a separate package, `pkg/queue`, and renames occurrences of "querier" to "consumer" to be more generic.
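
For reference, a minimal sketch of how a consumer interacts with the relocated queue package, based only on the renamed API that appears in the diff below (`RegisterConsumerConnection`, `Dequeue`, `StartIndexWithLocalQueue`); the `consume` helper and its `handle` callback are illustrative, not part of this PR.

```go
package example

import (
	"context"

	"github.com/grafana/loki/pkg/queue"
)

// consume runs a single consumer against the relocated request queue.
// handle is a placeholder for whatever the consumer does with a dequeued item.
func consume(ctx context.Context, q *queue.RequestQueue, consumerID string, handle func(queue.Request)) error {
	q.RegisterConsumerConnection(consumerID)
	defer q.UnregisterConsumerConnection(consumerID)

	// The bloom gateway worker in this PR starts from StartIndexWithLocalQueue;
	// the v1 frontend keeps using StartIndex.
	idx := queue.StartIndexWithLocalQueue
	for {
		item, newIdx, err := q.Dequeue(ctx, idx, consumerID)
		if err != nil {
			// Dequeue returns queue.ErrStopped once the queue service stops.
			return err
		}
		idx = newIdx
		handle(item)
	}
}
```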

The bloom gateway instantiates the request queue when starting the service. The gRPC method `FilterChunkRefs` then enqueues incoming requests to that queue.
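
Condensed, the enqueue-and-wait pattern in the new `FilterChunkRefs` looks roughly like the sketch below. The `filterViaQueue` helper name is hypothetical; the actual PR inlines this logic in the gRPC handler shown in the `pkg/bloomgateway/bloomgateway.go` hunk, and relies on the `Gateway`, `Task`, and `newTask` definitions added there.

```go
// filterViaQueue wraps the request in a Task, enqueues it for the workers,
// then blocks on the task's result and error channels until every ref group
// has been answered, an error is reported, or the request context is done.
func (g *Gateway) filterViaQueue(ctx context.Context, tenantID string, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
	task, resCh, errCh, err := newTask(tenantID, req)
	if err != nil {
		return nil, err
	}
	g.queue.Enqueue(tenantID, []string{}, task, 100, func() {
		// Track the task as pending until a worker dequeues it.
		g.pendingTasks.Add(task.ID, task)
	})

	response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case err := <-errCh:
			return nil, err
		case res := <-resCh:
			// Each worker response covers one ref group; the reply is
			// complete once all groups have been answered.
			response = append(response, res)
			if len(response) == len(req.Refs) {
				return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil
			}
		}
	}
}
```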

**Special notes for your reviewer**:

For testing purposes, this PR also contains a dummy implementation of the workers. The full worker implementation - which includes multiplexing of multiple tasks - is the subject of a separate PR.

---------

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Ref: pull/10983/head
Author: Christian Haudum, committed by GitHub
Parent: 9474be0fef
Commit: 27411cff08
Changed files:

  1. go.mod (2 changed lines)
  2. pkg/bloomgateway/bloomgateway.go (261 changed lines)
  3. pkg/bloomgateway/bloomgateway_test.go (53 changed lines)
  4. pkg/lokifrontend/frontend/v1/frontend.go (12 changed lines)
  5. pkg/lokifrontend/frontend/v1/frontend_test.go (4 changed lines)
  6. pkg/queue/dequeue_qos_test.go (12 changed lines)
  7. pkg/queue/mapping.go (0 changed lines)
  8. pkg/queue/mapping_test.go (0 changed lines)
  9. pkg/queue/metrics.go (0 changed lines)
  10. pkg/queue/queue.go (42 changed lines)
  11. pkg/queue/queue_test.go (12 changed lines)
  12. pkg/queue/tenant_queues.go (144 changed lines)
  13. pkg/queue/tenant_queues_test.go (108 changed lines)
  14. pkg/queue/treequeue.go (0 changed lines)
  15. pkg/queue/treequeue_test.go (0 changed lines)
  16. pkg/scheduler/scheduler.go (10 changed lines)
  17. pkg/util/active_user.go (10 changed lines)

@ -74,7 +74,7 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/ncw/swift v1.0.53
github.com/oklog/run v1.1.0
github.com/oklog/ulid v1.3.1 // indirect
github.com/oklog/ulid v1.3.1
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0

@ -39,27 +39,130 @@ package bloomgateway
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
)
var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
var errInvalidTenant = errors.New("invalid tenant in chunk refs")
type metrics struct{}
// TODO(chaudum): Make these configurable
const (
numWorkers = 4
maxTasksPerTenant = 1024
pendingTasksInitialCap = 1024
)
func newMetrics(r prometheus.Registerer) *metrics {
return &metrics{}
type metrics struct {
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
}
func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics {
return &metrics{
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: subsystem,
Name: "queue_duration_seconds",
Help: "Time spent by tasks in queue before getting picked up by a worker.",
Buckets: prometheus.DefBuckets,
}),
inflightRequests: promauto.With(registerer).NewSummary(prometheus.SummaryOpts{
Namespace: "loki",
Subsystem: subsystem,
Name: "inflight_tasks",
Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
MaxAge: time.Minute,
AgeBuckets: 6,
}),
}
}
// Task is the data structure that is enqueued to the internal queue and dequeued by query workers
type Task struct {
// ID is a lexicographically sortable unique identifier of the task
ID ulid.ULID
// Tenant is the tenant ID
Tenant string
// Request is the original request
Request *logproto.FilterChunkRefRequest
// ErrCh is a send-only channel to write an error to
ErrCh chan<- error
// ResCh is a send-only channel to write partial responses to
ResCh chan<- *logproto.GroupedChunkRefs
}
// newTask returns a new Task that can be enqueued to the task queue.
// In addition, it returns a result and an error channel, as well
// as an error if the instantiation fails.
func newTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan *logproto.GroupedChunkRefs, chan error, error) {
key, err := ulid.New(ulid.Now(), nil)
if err != nil {
return Task{}, nil, nil, err
}
errCh := make(chan error, 1)
resCh := make(chan *logproto.GroupedChunkRefs, 1)
task := Task{
ID: key,
Tenant: tenantID,
Request: req,
ErrCh: errCh,
ResCh: resCh,
}
return task, resCh, errCh, nil
}
// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
Map map[k]v
}
type pendingTasks SyncMap[ulid.ULID, Task]
func (t *pendingTasks) Len() int {
t.RLock()
defer t.RUnlock()
return len(t.Map)
}
func (t *pendingTasks) Add(k ulid.ULID, v Task) {
t.Lock()
t.Map[k] = v
t.Unlock()
}
func (t *pendingTasks) Delete(k ulid.ULID) {
t.Lock()
delete(t.Map, k)
t.Unlock()
}
// makePendingTasks creates a SyncMap that holds pending tasks
func makePendingTasks(n int) *pendingTasks {
return &pendingTasks{
RWMutex: sync.RWMutex{},
Map: make(map[ulid.ULID]Task, n),
}
}
type Gateway struct {
@ -69,20 +172,33 @@ type Gateway struct {
logger log.Logger
metrics *metrics
bloomStore bloomshipper.Store
queue *queue.RequestQueue
queueMetrics *queue.Metrics
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store
sharding ShardingStrategy
pendingTasks *pendingTasks
serviceMngr *services.Manager
serviceWatcher *services.FailureWatcher
}
// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
cfg: cfg,
logger: logger,
metrics: newMetrics(reg),
sharding: shardingStrategy,
cfg: cfg,
logger: logger,
metrics: newMetrics("bloom_gateway", reg),
sharding: shardingStrategy,
pendingTasks: makePendingTasks(pendingTasksInitialCap),
}
g.queueMetrics = queue.NewMetrics("bloom_gateway", reg)
g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)
client, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, cm)
if err != nil {
return nil, err
@ -99,18 +215,112 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, s
}
g.bloomStore = bloomStore
g.Service = services.NewIdleService(g.starting, g.stopping)
svcs := []services.Service{g.queue, g.activeUsers}
g.serviceMngr, err = services.NewManager(svcs...)
if err != nil {
return nil, err
}
g.serviceWatcher = services.NewFailureWatcher()
g.serviceWatcher.WatchManager(g.serviceMngr)
g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway")
return g, nil
}
func (g *Gateway) starting(ctx context.Context) error {
var err error
defer func() {
if err == nil || g.serviceMngr == nil {
return
}
if err := services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr); err != nil {
level.Error(g.logger).Log("msg", "failed to gracefully stop bloom gateway dependencies", "err", err)
}
}()
if err := services.StartManagerAndAwaitHealthy(ctx, g.serviceMngr); err != nil {
return errors.Wrap(err, "unable to start bloom gateway subservices")
}
for i := 0; i < numWorkers; i++ {
go g.startWorker(ctx, fmt.Sprintf("worker-%d", i))
}
return nil
}
func (g *Gateway) running(ctx context.Context) error {
// We observe inflight tasks frequently and at regular intervals, to have a good
// approximation of max inflight tasks over percentiles of time. We also do it with
// a ticker so that we keep tracking it even if we have no new requests but stuck inflight
// tasks (e.g. workers are all exhausted).
inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
defer inflightTasksTicker.Stop()
for {
select {
case <-ctx.Done():
return nil
case err := <-g.serviceWatcher.Chan():
return errors.Wrap(err, "bloom gateway subservice failed")
case <-inflightTasksTicker.C:
inflight := g.pendingTasks.Len()
g.metrics.inflightRequests.Observe(float64(inflight))
}
}
}
func (g *Gateway) stopping(_ error) error {
g.bloomStore.Stop()
return nil
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}
// This is just a dummy implementation of the worker!
// TODO(chaudum): Implement worker that dequeues multiple pending tasks and
// multiplexes them prior to execution.
func (g *Gateway) startWorker(_ context.Context, id string) error {
level.Info(g.logger).Log("msg", "starting worker", "worker", id)
g.queue.RegisterConsumerConnection(id)
defer g.queue.UnregisterConsumerConnection(id)
idx := queue.StartIndexWithLocalQueue
for {
ctx := context.Background()
item, newIdx, err := g.queue.Dequeue(ctx, idx, id)
if err != nil {
if err != queue.ErrStopped {
level.Error(g.logger).Log("msg", "failed to dequeue task", "worker", id, "err", err)
continue
}
level.Info(g.logger).Log("msg", "stopping worker", "worker", id)
return err
}
task, ok := item.(Task)
if !ok {
level.Error(g.logger).Log("msg", "failed to cast to Task", "item", item)
continue
}
idx = newIdx
level.Info(g.logger).Log("msg", "dequeued task", "worker", id, "task", task.ID)
g.pendingTasks.Delete(task.ID)
r := task.Request
if len(r.Filters) > 0 {
r.Refs, err = g.bloomStore.FilterChunkRefs(ctx, task.Tenant, r.From.Time(), r.Through.Time(), r.Refs, r.Filters...)
}
if err != nil {
task.ErrCh <- err
} else {
for _, ref := range r.Refs {
task.ResCh <- ref
}
}
}
}
// FilterChunkRefs implements BloomGatewayServer
@ -131,15 +341,32 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint
})
chunkRefs := req.Refs
task, resCh, errCh, err := newTask(tenantID, req)
if err != nil {
return nil, err
}
g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID)
g.queue.Enqueue(tenantID, []string{}, task, 100, func() {
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
})
// Only query bloom filters if filters are present
if len(req.Filters) > 0 {
chunkRefs, err = g.bloomStore.FilterChunkRefs(ctx, tenantID, req.From.Time(), req.Through.Time(), req.Refs, req.Filters...)
if err != nil {
response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errCh:
return nil, err
case res := <-resCh:
level.Info(g.logger).Log("msg", "got result", "task", task.ID, "tenant", tenantID, "res", res)
// wait for all parts of the full response
response = append(response, res)
if len(response) == len(req.Refs) {
return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil
}
}
}
return &logproto.FilterChunkRefResponse{ChunkRefs: chunkRefs}, nil
}

@ -2,6 +2,7 @@ package bloomgateway
import (
"context"
"os"
"testing"
"time"
@ -87,6 +88,10 @@ func TestBloomGateway_StartStopService(t *testing.T) {
err = services.StartAndAwaitRunning(context.Background(), gw)
require.NoError(t, err)
// Wait for workers to connect to queue
time.Sleep(50 * time.Millisecond)
require.Equal(t, float64(numWorkers), gw.queue.GetConnectedConsumersMetric())
err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
})
@ -96,7 +101,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
tenantID := "test"
ss := NewNoopStrategy()
logger := log.NewNopLogger()
logger := log.NewLogfmtLogger(os.Stderr)
reg := prometheus.NewRegistry()
cm := storage.NewClientMetrics()
@ -136,9 +141,17 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
}
t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), gw)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
})
ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
now := model.TimeFromUnix(ts.Unix())
@ -174,6 +187,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
})
t.Run("returns error if chunk refs do not belong to tenant", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
require.NoError(t, err)
@ -196,4 +210,41 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.Equal(t, "expected chunk refs from tenant test, got tenant other: invalid tenant in chunk refs", err.Error())
})
t.Run("gateway tracks active users", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), gw)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
})
ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
now := model.TimeFromUnix(ts.Unix())
tenants := []string{"tenant-a", "tenant-b", "tenant-c"}
for idx, tenantID := range tenants {
chunkRefs := []*logproto.ChunkRef{
{
Fingerprint: uint64(1000 + 100*idx),
UserID: tenantID,
From: now.Add(-24 * time.Hour),
Through: now,
Checksum: uint32(idx),
},
}
req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
}
ctx := user.InjectOrgID(context.Background(), tenantID)
_, err = gw.FilterChunkRefs(ctx, req)
require.NoError(t, err)
}
require.Equal(t, tenants, gw.activeUsers.ActiveUsers())
})
}

@ -20,7 +20,7 @@ import (
"github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb"
"github.com/grafana/loki/pkg/querier/stats"
"github.com/grafana/loki/pkg/scheduler/queue"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/util"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
"github.com/grafana/loki/pkg/util/validation"
@ -106,7 +106,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_frontend_connected_clients",
Help: "Number of worker clients currently connected to the frontend.",
}, f.requestQueue.GetConnectedQuerierWorkersMetric)
}, f.requestQueue.GetConnectedConsumersMetric)
f.Service = services.NewBasicService(f.starting, f.running, f.stopping)
return f, nil
@ -189,8 +189,8 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
return err
}
f.requestQueue.RegisterQuerierConnection(querierID)
defer f.requestQueue.UnregisterQuerierConnection(querierID)
f.requestQueue.RegisterConsumerConnection(querierID)
defer f.requestQueue.UnregisterConsumerConnection(querierID)
lastIndex := queue.StartIndex
@ -273,7 +273,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) {
level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID())
f.requestQueue.NotifyQuerierShutdown(req.GetClientID())
f.requestQueue.NotifyConsumerShutdown(req.GetClientID())
return &frontendv1pb.NotifyClientShutdownResponse{}, nil
}
@ -327,7 +327,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
// chosen to match the same method in the ingester
func (f *Frontend) CheckReady(_ context.Context) error {
// if we have more than one querier connected we will consider ourselves ready
connectedClients := f.requestQueue.GetConnectedQuerierWorkersMetric()
connectedClients := f.requestQueue.GetConnectedConsumersMetric()
if connectedClients > 0 {
return nil
}

@ -33,7 +33,7 @@ import (
"github.com/grafana/loki/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
querier_worker "github.com/grafana/loki/pkg/querier/worker"
"github.com/grafana/loki/pkg/scheduler/queue"
"github.com/grafana/loki/pkg/queue"
)
const (
@ -136,7 +136,7 @@ func TestFrontendCheckReady(t *testing.T) {
requestQueue: queue.NewRequestQueue(5, 0, qm),
}
for i := 0; i < tt.connectedClients; i++ {
f.requestQueue.RegisterQuerierConnection("test")
f.requestQueue.RegisterConsumerConnection("test")
}
err := f.CheckReady(context.Background())
errMsg := ""

@ -60,7 +60,7 @@ func BenchmarkQueryFairness(t *testing.B) {
enqueueRequestsForActor(t, []string{}, useActor, requestQueue, numSubRequestsActorA, 50*time.Millisecond)
enqueueRequestsForActor(t, []string{"a"}, useActor, requestQueue, numSubRequestsActorA, 100*time.Millisecond)
enqueueRequestsForActor(t, []string{"b"}, useActor, requestQueue, numSubRequestsActorB, 50*time.Millisecond)
requestQueue.queues.recomputeUserQueriers()
requestQueue.queues.recomputeUserConsumers()
// set timeout to minize impact on overall test run duration in case something goes wrong
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
@ -76,8 +76,8 @@ func BenchmarkQueryFairness(t *testing.B) {
go func(id string) {
defer wg.Done()
requestQueue.RegisterQuerierConnection(id)
defer requestQueue.UnregisterQuerierConnection(id)
requestQueue.RegisterConsumerConnection(id)
defer requestQueue.UnregisterConsumerConnection(id)
idx := StartIndex
for ctx.Err() == nil {
r, newIdx, err := requestQueue.Dequeue(ctx, idx, id)
@ -143,7 +143,7 @@ func TestQueryFairnessAcrossSameLevel(t *testing.T) {
_ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(22), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz", "123"}, r(200), 0, nil)
_ = requestQueue.Enqueue("tenant1", []string{"xyz", "456"}, r(210), 0, nil)
requestQueue.queues.recomputeUserQueriers()
requestQueue.queues.recomputeUserConsumers()
items := make([]int, 0)
@ -151,8 +151,8 @@ func TestQueryFairnessAcrossSameLevel(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
requestQueue.RegisterQuerierConnection("querier")
defer requestQueue.UnregisterQuerierConnection("querier")
requestQueue.RegisterConsumerConnection("querier")
defer requestQueue.UnregisterConsumerConnection("querier")
idx := StartIndexWithLocalQueue
for ctx.Err() == nil {

@ -51,7 +51,7 @@ type RequestChannel chan Request
type RequestQueue struct {
services.Service
connectedQuerierWorkers *atomic.Int32
connectedConsumers *atomic.Int32
mtx sync.Mutex
cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected.
@ -63,13 +63,13 @@ type RequestQueue struct {
func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue {
q := &RequestQueue{
queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay),
connectedQuerierWorkers: atomic.NewInt32(0),
metrics: metrics,
queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay),
connectedConsumers: atomic.NewInt32(0),
metrics: metrics,
}
q.cond = contextCond{Cond: sync.NewCond(&q.mtx)}
q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue")
q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedConsumers, q.stopping).WithName("request queue")
return q
}
@ -127,8 +127,8 @@ func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQue
// Dequeue finds the next tenant queue and takes the next request off of it. Will block if there are no requests.
// By passing tenant index from previous call of this method, querier guarantees that it iterates over all tenants fairly.
// If querier finds that request from the tenant is already expired, it can get a request for the same tenant by using UserIndex.ReuseLastUser.
func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID string) (Request, QueueIndex, error) {
// If consumer finds that request from the tenant is already expired, it can get a request for the same tenant by using UserIndex.ReuseLastUser.
func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, consumerID string) (Request, QueueIndex, error) {
q.mtx.Lock()
defer q.mtx.Unlock()
@ -140,7 +140,7 @@ FindQueue:
querierWait = false
start := time.Now()
q.cond.Wait(ctx)
q.metrics.querierWaitTime.WithLabelValues(querierID).Observe(time.Since(start).Seconds())
q.metrics.querierWaitTime.WithLabelValues(consumerID).Observe(time.Since(start).Seconds())
}
if q.stopped {
@ -152,7 +152,7 @@ FindQueue:
}
for {
queue, tenant, idx := q.queues.getNextQueueForQuerier(last, querierID)
queue, tenant, idx := q.queues.getNextQueueForConsumer(last, consumerID)
last = idx
if queue == nil {
break
@ -181,11 +181,11 @@ FindQueue:
goto FindQueue
}
func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
func (q *RequestQueue) forgetDisconnectedConsumers(_ context.Context) error {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.queues.forgetDisconnectedQueriers(time.Now()) > 0 {
if q.queues.forgetDisconnectedConsumers(time.Now()) > 0 {
// We need to notify goroutines cause having removed some queriers
// may have caused a resharding.
q.cond.Broadcast()
@ -198,7 +198,7 @@ func (q *RequestQueue) stopping(_ error) error {
q.mtx.Lock()
defer q.mtx.Unlock()
for !q.queues.hasNoTenantQueues() && q.connectedQuerierWorkers.Load() > 0 {
for !q.queues.hasNoTenantQueues() && q.connectedConsumers.Load() > 0 {
q.cond.Wait(context.Background())
}
@ -211,30 +211,30 @@ func (q *RequestQueue) stopping(_ error) error {
return nil
}
func (q *RequestQueue) RegisterQuerierConnection(querier string) {
q.connectedQuerierWorkers.Inc()
func (q *RequestQueue) RegisterConsumerConnection(querier string) {
q.connectedConsumers.Inc()
q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.addQuerierConnection(querier)
q.queues.addConsumerToConnection(querier)
}
func (q *RequestQueue) UnregisterQuerierConnection(querier string) {
q.connectedQuerierWorkers.Dec()
func (q *RequestQueue) UnregisterConsumerConnection(querier string) {
q.connectedConsumers.Dec()
q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.removeQuerierConnection(querier, time.Now())
q.queues.removeConsumerConnection(querier, time.Now())
}
func (q *RequestQueue) NotifyQuerierShutdown(querierID string) {
func (q *RequestQueue) NotifyConsumerShutdown(querierID string) {
q.mtx.Lock()
defer q.mtx.Unlock()
q.queues.notifyQuerierShutdown(querierID)
}
func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
return float64(q.connectedQuerierWorkers.Load())
func (q *RequestQueue) GetConnectedConsumersMetric() float64 {
return float64(q.connectedConsumers.Load())
}
// contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting.

@ -49,7 +49,7 @@ func BenchmarkGetNextRequest(b *testing.B) {
queues = append(queues, queue)
for ix := 0; ix < queriers; ix++ {
queue.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
queue.RegisterConsumerConnection(fmt.Sprintf("querier-%d", ix))
}
for i := 0; i < maxOutstandingPerTenant; i++ {
@ -106,7 +106,7 @@ func BenchmarkQueueRequest(b *testing.B) {
q := NewRequestQueue(maxOutstandingPerTenant, 0, NewMetrics("query_scheduler", nil))
for ix := 0; ix < queriers; ix++ {
q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
q.RegisterConsumerConnection(fmt.Sprintf("querier-%d", ix))
}
queues = append(queues, q)
@ -143,8 +143,8 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
})
// Two queriers connect.
queue.RegisterQuerierConnection("querier-1")
queue.RegisterQuerierConnection("querier-2")
queue.RegisterConsumerConnection("querier-1")
queue.RegisterConsumerConnection("querier-2")
// Querier-2 waits for a new request.
querier2wg := sync.WaitGroup{}
@ -156,7 +156,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe
}()
// Querier-1 crashes (no graceful shutdown notification).
queue.UnregisterQuerierConnection("querier-1")
queue.UnregisterConsumerConnection("querier-1")
// Enqueue a request from an user which would be assigned to querier-1.
// NOTE: "user-1" hash falls in the querier-1 shard.
@ -305,7 +305,7 @@ func TestMaxQueueSize(t *testing.T) {
t.Run("queue size is tracked per tenant", func(t *testing.T) {
maxSize := 3
queue := NewRequestQueue(maxSize, 0, NewMetrics("query_scheduler", nil))
queue.RegisterQuerierConnection("querier")
queue.RegisterConsumerConnection("querier")
// enqueue maxSize items with different actors
// different actors have individual channels with maxSize length

@ -38,35 +38,35 @@ func (tqs intPointerMap) Dec(key string) int {
return *ptr
}
// querier holds information about a querier registered in the queue.
type querier struct {
// consumer holds information about a consumer registered in the queue.
type consumer struct {
// Number of active connections.
connections int
// True if the querier notified it's gracefully shutting down.
// True if the consumer notified it's gracefully shutting down.
shuttingDown bool
// When the last connection has been unregistered.
disconnectedAt time.Time
}
// This struct holds tenant queues for pending requests. It also keeps track of connected queriers,
// and mapping between tenants and queriers.
// This struct holds tenant queues for pending requests. It also keeps track of connected consumers,
// and mapping between tenants and consumers.
type tenantQueues struct {
mapping *Mapping[*tenantQueue]
maxUserQueueSize int
perUserQueueLen intPointerMap
// How long to wait before removing a querier which has got disconnected
// How long to wait before removing a consumer which has got disconnected
// but hasn't notified about a graceful shutdown.
forgetDelay time.Duration
// Tracks queriers registered to the queue.
queriers map[string]*querier
// Tracks consumers registered to the queue.
consumers map[string]*consumer
// Sorted list of querier names, used when creating per-user shard.
sortedQueriers []string
// Sorted list of consumer IDs, used when creating per-user shard.
sortedConsumers []string
}
type Queue interface {
@ -86,12 +86,12 @@ type Mapable interface {
type tenantQueue struct {
*TreeQueue
// If not nil, only these queriers can handle user requests. If nil, all queriers can.
// We set this to nil if number of available queriers <= maxQueriers.
queriers map[string]struct{}
// If not nil, only these consumers can handle user requests. If nil, all consumers can.
// We set this to nil if number of available consumers <= maxQueriers.
consumers map[string]struct{}
maxQueriers int
// Seed for shuffle sharding of queriers. This seed is based on userID only and is therefore consistent
// Seed for shuffle sharding of consumers. This seed is based on userID only and is therefore consistent
// between different frontends.
seed int64
}
@ -104,8 +104,8 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue
maxUserQueueSize: maxUserQueueSize,
perUserQueueLen: make(intPointerMap),
forgetDelay: forgetDelay,
queriers: map[string]*querier{},
sortedQueriers: nil,
consumers: map[string]*consumer{},
sortedConsumers: nil,
}
}
@ -118,9 +118,9 @@ func (q *tenantQueues) deleteQueue(tenant string) {
}
// Returns existing or new queue for a tenant.
// MaxQueriers is used to compute which queriers should handle requests for this tenant.
// If maxQueriers is <= 0, all queriers can handle this tenant's requests.
// If maxQueriers has changed since the last call, queriers for this are recomputed.
// MaxQueriers is used to compute which consumers should handle requests for this tenant.
// If maxQueriers is <= 0, all consumers can handle this tenant's requests.
// If maxQueriers has changed since the last call, consumers for this are recomputed.
func (q *tenantQueues) getOrAddQueue(tenant string, path []string, maxQueriers int) Queue {
// Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot).
if tenant == "" {
@ -142,7 +142,7 @@ func (q *tenantQueues) getOrAddQueue(tenant string, path []string, maxQueriers i
if uq.maxQueriers != maxQueriers {
uq.maxQueriers = maxQueriers
uq.queriers = shuffleQueriersForTenants(uq.seed, maxQueriers, q.sortedQueriers, nil)
uq.consumers = shuffleConsumersForTenants(uq.seed, maxQueriers, q.sortedConsumers, nil)
}
if len(path) == 0 {
@ -151,10 +151,10 @@ func (q *tenantQueues) getOrAddQueue(tenant string, path []string, maxQueriers i
return uq.add(path)
}
// Finds next queue for the querier. To support fair scheduling between users, client is expected
// Finds next queue for the consumer. To support fair scheduling between users, client is expected
// to pass last user index returned by this function as argument. If there was no previous
// last user index, use -1.
func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (Queue, string, QueueIndex) {
func (q *tenantQueues) getNextQueueForConsumer(lastUserIndex QueueIndex, consumerID string) (Queue, string, QueueIndex) {
uid := lastUserIndex
// at the RequestQueue level we don't have local queues, so start index is -1
@ -162,9 +162,9 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI
uid = StartIndex
}
// Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward
// Ensure the consumer is not shutting down. If the consumer is shutting down, we shouldn't forward
// any more queries to it.
if info := q.queriers[querierID]; info == nil || info.shuttingDown {
if info := q.consumers[consumerID]; info == nil || info.shuttingDown {
return nil, "", uid
}
@ -180,9 +180,9 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI
}
uid = tq.pos
if tq.queriers != nil {
if _, ok := tq.queriers[querierID]; !ok {
// This querier is not handling the user.
if tq.consumers != nil {
if _, ok := tq.consumers[consumerID]; !ok {
// This consumer is not handling the user.
continue
}
}
@ -192,30 +192,30 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI
return nil, "", uid
}
func (q *tenantQueues) addQuerierConnection(querierID string) {
info := q.queriers[querierID]
func (q *tenantQueues) addConsumerToConnection(consumerID string) {
info := q.consumers[consumerID]
if info != nil {
info.connections++
// Reset in case the querier re-connected while it was in the forget waiting period.
// Reset in case the consumer re-connected while it was in the forget waiting period.
info.shuttingDown = false
info.disconnectedAt = time.Time{}
return
}
// First connection from this querier.
q.queriers[querierID] = &querier{connections: 1}
q.sortedQueriers = append(q.sortedQueriers, querierID)
sort.Strings(q.sortedQueriers)
// First connection from this consumer.
q.consumers[consumerID] = &consumer{connections: 1}
q.sortedConsumers = append(q.sortedConsumers, consumerID)
sort.Strings(q.sortedConsumers)
q.recomputeUserQueriers()
q.recomputeUserConsumers()
}
func (q *tenantQueues) removeQuerierConnection(querierID string, now time.Time) {
info := q.queriers[querierID]
func (q *tenantQueues) removeConsumerConnection(consumerID string, now time.Time) {
info := q.consumers[consumerID]
if info == nil || info.connections <= 0 {
panic("unexpected number of connections for querier")
panic("unexpected number of connections for consumer")
}
// Decrease the number of active connections.
@ -225,65 +225,65 @@ func (q *tenantQueues) removeQuerierConnection(querierID string, now time.Time)
}
// There are no more active connections. If the forget delay is configured then
// we can remove it only if querier has announced a graceful shutdown.
// we can remove it only if consumer has announced a graceful shutdown.
if info.shuttingDown || q.forgetDelay == 0 {
q.removeQuerier(querierID)
q.removeConsumer(consumerID)
return
}
// No graceful shutdown has been notified yet, so we should track the current time
// so that we'll remove the querier as soon as we receive the graceful shutdown
// so that we'll remove the consumer as soon as we receive the graceful shutdown
// notification (if any) or once the threshold expires.
info.disconnectedAt = now
}
func (q *tenantQueues) removeQuerier(querierID string) {
delete(q.queriers, querierID)
func (q *tenantQueues) removeConsumer(consumerID string) {
delete(q.consumers, consumerID)
ix := sort.SearchStrings(q.sortedQueriers, querierID)
if ix >= len(q.sortedQueriers) || q.sortedQueriers[ix] != querierID {
panic("incorrect state of sorted queriers")
ix := sort.SearchStrings(q.sortedConsumers, consumerID)
if ix >= len(q.sortedConsumers) || q.sortedConsumers[ix] != consumerID {
panic("incorrect state of sorted consumers")
}
q.sortedQueriers = append(q.sortedQueriers[:ix], q.sortedQueriers[ix+1:]...)
q.sortedConsumers = append(q.sortedConsumers[:ix], q.sortedConsumers[ix+1:]...)
q.recomputeUserQueriers()
q.recomputeUserConsumers()
}
// notifyQuerierShutdown records that a querier has sent notification about a graceful shutdown.
func (q *tenantQueues) notifyQuerierShutdown(querierID string) {
info := q.queriers[querierID]
// notifyQuerierShutdown records that a consumer has sent notification about a graceful shutdown.
func (q *tenantQueues) notifyQuerierShutdown(consumerID string) {
info := q.consumers[consumerID]
if info == nil {
// The querier may have already been removed, so we just ignore it.
// The consumer may have already been removed, so we just ignore it.
return
}
// If there are no more connections, we should remove the querier.
// If there are no more connections, we should remove the consumer.
if info.connections == 0 {
q.removeQuerier(querierID)
q.removeConsumer(consumerID)
return
}
// Otherwise we should annotate we received a graceful shutdown notification
// and the querier will be removed once all connections are unregistered.
// and the consumer will be removed once all connections are unregistered.
info.shuttingDown = true
}
// forgetDisconnectedQueriers removes all disconnected queriers that have gone since at least
// the forget delay. Returns the number of forgotten queriers.
func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int {
// forgetDisconnectedConsumers removes all disconnected consumers that have gone since at least
// the forget delay. Returns the number of forgotten consumers.
func (q *tenantQueues) forgetDisconnectedConsumers(now time.Time) int {
// Nothing to do if the forget delay is disabled.
if q.forgetDelay == 0 {
return 0
}
// Remove all queriers with no connections that have gone since at least the forget delay.
// Remove all consumers with no connections that have gone since at least the forget delay.
threshold := now.Add(-q.forgetDelay)
forgotten := 0
for querierID := range q.queriers {
if info := q.queriers[querierID]; info.connections == 0 && info.disconnectedAt.Before(threshold) {
q.removeQuerier(querierID)
for id := range q.consumers {
if info := q.consumers[id]; info.connections == 0 && info.disconnectedAt.Before(threshold) {
q.removeConsumer(id)
forgotten++
}
}
@ -291,30 +291,30 @@ func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int {
return forgotten
}
func (q *tenantQueues) recomputeUserQueriers() {
scratchpad := make([]string, 0, len(q.sortedQueriers))
func (q *tenantQueues) recomputeUserConsumers() {
scratchpad := make([]string, 0, len(q.sortedConsumers))
for _, uq := range q.mapping.Values() {
uq.queriers = shuffleQueriersForTenants(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad)
uq.consumers = shuffleConsumersForTenants(uq.seed, uq.maxQueriers, q.sortedConsumers, scratchpad)
}
}
// shuffleQueriersForTenants returns nil if queriersToSelect is 0 or there are not enough queriers to select from.
// In that case *all* queriers should be used.
// shuffleConsumersForTenants returns nil if consumersToSelect is 0 or there are not enough consumers to select from.
// In that case *all* consumers should be used.
// Scratchpad is used for shuffling, to avoid new allocations. If nil, new slice is allocated.
func shuffleQueriersForTenants(userSeed int64, queriersToSelect int, allSortedQueriers []string, scratchpad []string) map[string]struct{} {
if queriersToSelect == 0 || len(allSortedQueriers) <= queriersToSelect {
func shuffleConsumersForTenants(userSeed int64, consumersToSelect int, allSortedConsumers []string, scratchpad []string) map[string]struct{} {
if consumersToSelect == 0 || len(allSortedConsumers) <= consumersToSelect {
return nil
}
result := make(map[string]struct{}, queriersToSelect)
result := make(map[string]struct{}, consumersToSelect)
rnd := rand.New(rand.NewSource(userSeed))
scratchpad = scratchpad[:0]
scratchpad = append(scratchpad, allSortedQueriers...)
scratchpad = append(scratchpad, allSortedConsumers...)
last := len(scratchpad) - 1
for i := 0; i < queriersToSelect; i++ {
for i := 0; i < consumersToSelect; i++ {
r := rnd.Intn(last + 1)
result[scratchpad[r]] = struct{}{}
// move selected item to the end, it won't be selected anymore.

@ -22,10 +22,10 @@ func TestQueues(t *testing.T) {
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))
uq.addQuerierConnection("querier-1")
uq.addQuerierConnection("querier-2")
uq.addConsumerToConnection("querier-1")
uq.addConsumerToConnection("querier-2")
q, u, lastUserIndex := uq.getNextQueueForQuerier(-1, "querier-1")
q, u, lastUserIndex := uq.getNextQueueForConsumer(-1, "querier-1")
assert.Nil(t, q)
assert.Equal(t, "", u)
@ -71,7 +71,7 @@ func TestQueues(t *testing.T) {
uq.deleteQueue("four")
assert.NoError(t, isConsistent(uq))
q, _, _ = uq.getNextQueueForQuerier(lastUserIndex, "querier-1")
q, _, _ = uq.getNextQueueForConsumer(lastUserIndex, "querier-1")
assert.Nil(t, q)
}
@ -80,8 +80,8 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {
assert.NotNil(t, uq)
assert.NoError(t, isConsistent(uq))
uq.addQuerierConnection("querier-1")
uq.addQuerierConnection("querier-2")
uq.addConsumerToConnection("querier-1")
uq.addConsumerToConnection("querier-2")
// Add queues: [one, two]
qOne := getOrAdd(t, uq, "one", 0)
@ -91,7 +91,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {
// After notify shutdown for querier-2, it's expected to own no queue.
uq.notifyQuerierShutdown("querier-2")
q, u, _ := uq.getNextQueueForQuerier(-1, "querier-2")
q, u, _ := uq.getNextQueueForConsumer(-1, "querier-2")
assert.Nil(t, q)
assert.Equal(t, "", u)
@ -99,8 +99,8 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {
confirmOrderForQuerier(t, uq, "querier-1", -1, qOne, qTwo, qOne, qTwo)
// After disconnecting querier-2, it's expected to own no queue.
uq.removeQuerier("querier-2")
q, u, _ = uq.getNextQueueForQuerier(-1, "querier-2")
uq.removeConsumer("querier-2")
q, u, _ = uq.getNextQueueForConsumer(-1, "querier-2")
assert.Nil(t, q)
assert.Equal(t, "", u)
}
@ -117,10 +117,10 @@ func TestQueuesWithQueriers(t *testing.T) {
// Add some queriers.
for ix := 0; ix < queriers; ix++ {
qid := fmt.Sprintf("querier-%d", ix)
uq.addQuerierConnection(qid)
uq.addConsumerToConnection(qid)
// No querier has any queues yet.
q, u, _ := uq.getNextQueueForQuerier(-1, qid)
q, u, _ := uq.getNextQueueForConsumer(-1, qid)
assert.Nil(t, q)
assert.Equal(t, "", u)
}
@ -133,7 +133,7 @@ func TestQueuesWithQueriers(t *testing.T) {
getOrAdd(t, uq, uid, maxQueriersPerUser)
// Verify it has maxQueriersPerUser queriers assigned now.
qs := uq.mapping.GetByKey(uid).queriers
qs := uq.mapping.GetByKey(uid).consumers
assert.Equal(t, maxQueriersPerUser, len(qs))
}
@ -146,7 +146,7 @@ func TestQueuesWithQueriers(t *testing.T) {
lastUserIndex := StartIndex
for {
_, _, newIx := uq.getNextQueueForQuerier(lastUserIndex, qid)
_, _, newIx := uq.getNextQueueForConsumer(lastUserIndex, qid)
if newIx < lastUserIndex {
break
}
@ -199,18 +199,18 @@ func TestQueuesConsistency(t *testing.T) {
assert.NotNil(t, uq.getOrAddQueue(generateTenant(r), generateActor(r), 3))
case 1:
qid := generateQuerier(r)
_, _, luid := uq.getNextQueueForQuerier(lastUserIndexes[qid], qid)
_, _, luid := uq.getNextQueueForConsumer(lastUserIndexes[qid], qid)
lastUserIndexes[qid] = luid
case 2:
uq.deleteQueue(generateTenant(r))
case 3:
q := generateQuerier(r)
uq.addQuerierConnection(q)
uq.addConsumerToConnection(q)
conns[q]++
case 4:
q := generateQuerier(r)
if conns[q] > 0 {
uq.removeQuerierConnection(q, time.Now())
uq.removeConsumerConnection(q, time.Now())
conns[q]--
}
case 5:
@ -238,8 +238,8 @@ func TestQueues_ForgetDelay(t *testing.T) {
// 3 queriers open 2 connections each.
for i := 1; i <= 3; i++ {
uq.addQuerierConnection(fmt.Sprintf("querier-%d", i))
uq.addQuerierConnection(fmt.Sprintf("querier-%d", i))
uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i))
uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i))
}
// Add user queues.
@ -253,12 +253,12 @@ func TestQueues_ForgetDelay(t *testing.T) {
require.NotEmpty(t, querier1Users)
// Gracefully shutdown querier-1.
uq.removeQuerierConnection("querier-1", now.Add(20*time.Second))
uq.removeQuerierConnection("querier-1", now.Add(21*time.Second))
uq.removeConsumerConnection("querier-1", now.Add(20*time.Second))
uq.removeConsumerConnection("querier-1", now.Add(21*time.Second))
uq.notifyQuerierShutdown("querier-1")
// We expect querier-1 has been removed.
assert.NotContains(t, uq.queriers, "querier-1")
assert.NotContains(t, uq.consumers, "querier-1")
assert.NoError(t, isConsistent(uq))
// We expect querier-1 users have been shuffled to other queriers.
@ -267,8 +267,8 @@ func TestQueues_ForgetDelay(t *testing.T) {
}
// Querier-1 reconnects.
uq.addQuerierConnection("querier-1")
uq.addQuerierConnection("querier-1")
uq.addConsumerToConnection("querier-1")
uq.addConsumerToConnection("querier-1")
// We expect the initial querier-1 users have got back to querier-1.
for _, userID := range querier1Users {
@ -278,11 +278,11 @@ func TestQueues_ForgetDelay(t *testing.T) {
}
// Querier-1 abruptly terminates (no shutdown notification received).
uq.removeQuerierConnection("querier-1", now.Add(40*time.Second))
uq.removeQuerierConnection("querier-1", now.Add(41*time.Second))
uq.removeConsumerConnection("querier-1", now.Add(40*time.Second))
uq.removeConsumerConnection("querier-1", now.Add(41*time.Second))
// We expect querier-1 has NOT been removed.
assert.Contains(t, uq.queriers, "querier-1")
assert.Contains(t, uq.consumers, "querier-1")
assert.NoError(t, isConsistent(uq))
// We expect the querier-1 users have not been shuffled to other queriers.
@ -293,9 +293,9 @@ func TestQueues_ForgetDelay(t *testing.T) {
}
// Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet.
uq.forgetDisconnectedQueriers(now.Add(90 * time.Second))
uq.forgetDisconnectedConsumers(now.Add(90 * time.Second))
assert.Contains(t, uq.queriers, "querier-1")
assert.Contains(t, uq.consumers, "querier-1")
assert.NoError(t, isConsistent(uq))
for _, userID := range querier1Users {
@ -305,9 +305,9 @@ func TestQueues_ForgetDelay(t *testing.T) {
}
// Try to forget disconnected queriers. This time querier-1 forget delay has passed.
uq.forgetDisconnectedQueriers(now.Add(105 * time.Second))
uq.forgetDisconnectedConsumers(now.Add(105 * time.Second))
assert.NotContains(t, uq.queriers, "querier-1")
assert.NotContains(t, uq.consumers, "querier-1")
assert.NoError(t, isConsistent(uq))
// We expect querier-1 users have been shuffled to other queriers.
@ -330,8 +330,8 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
// 3 queriers open 2 connections each.
for i := 1; i <= 3; i++ {
uq.addQuerierConnection(fmt.Sprintf("querier-%d", i))
uq.addQuerierConnection(fmt.Sprintf("querier-%d", i))
uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i))
uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i))
}
// Add user queues.
@ -345,11 +345,11 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
require.NotEmpty(t, querier1Users)
// Querier-1 abruptly terminates (no shutdown notification received).
uq.removeQuerierConnection("querier-1", now.Add(40*time.Second))
uq.removeQuerierConnection("querier-1", now.Add(41*time.Second))
uq.removeConsumerConnection("querier-1", now.Add(40*time.Second))
uq.removeConsumerConnection("querier-1", now.Add(41*time.Second))
// We expect querier-1 has NOT been removed.
assert.Contains(t, uq.queriers, "querier-1")
assert.Contains(t, uq.consumers, "querier-1")
assert.NoError(t, isConsistent(uq))
// We expect the querier-1 users have not been shuffled to other queriers.
@ -360,13 +360,13 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
}
// Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet.
uq.forgetDisconnectedQueriers(now.Add(90 * time.Second))
uq.forgetDisconnectedConsumers(now.Add(90 * time.Second))
// Querier-1 reconnects.
uq.addQuerierConnection("querier-1")
uq.addQuerierConnection("querier-1")
uq.addConsumerToConnection("querier-1")
uq.addConsumerToConnection("querier-1")
assert.Contains(t, uq.queriers, "querier-1")
assert.Contains(t, uq.consumers, "querier-1")
assert.NoError(t, isConsistent(uq))
// We expect the querier-1 users have not been shuffled to other queriers.
@ -377,9 +377,9 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget
}
// Try to forget disconnected queriers far in the future, but there's no disconnected querier.
uq.forgetDisconnectedQueriers(now.Add(200 * time.Second))
uq.forgetDisconnectedConsumers(now.Add(200 * time.Second))
assert.Contains(t, uq.queriers, "querier-1")
assert.Contains(t, uq.consumers, "querier-1")
assert.NoError(t, isConsistent(uq))
for _, userID := range querier1Users {
@ -414,7 +414,7 @@ func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, last
t.Helper()
var n Queue
for _, q := range qs {
n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier)
n, _, lastUserIndex = uq.getNextQueueForConsumer(lastUserIndex, querier)
assert.Equal(t, q, n)
assert.NoError(t, isConsistent(uq))
}
@ -422,7 +422,7 @@ func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, last
}
func isConsistent(uq *tenantQueues) error {
if len(uq.sortedQueriers) != len(uq.queriers) {
if len(uq.sortedConsumers) != len(uq.consumers) {
return fmt.Errorf("inconsistent number of sorted queriers and querier connections")
}
@ -441,16 +441,16 @@ func isConsistent(uq *tenantQueues) error {
uc++
if q.maxQueriers == 0 && q.queriers != nil {
if q.maxQueriers == 0 && q.consumers != nil {
return fmt.Errorf("user %s has queriers, but maxQueriers=0", u)
}
if q.maxQueriers > 0 && len(uq.sortedQueriers) <= q.maxQueriers && q.queriers != nil {
if q.maxQueriers > 0 && len(uq.sortedConsumers) <= q.maxQueriers && q.consumers != nil {
return fmt.Errorf("user %s has queriers set despite not enough queriers available", u)
}
if q.maxQueriers > 0 && len(uq.sortedQueriers) > q.maxQueriers && len(q.queriers) != q.maxQueriers {
return fmt.Errorf("user %s has incorrect number of queriers, expected=%d, got=%d", u, len(q.queriers), q.maxQueriers)
if q.maxQueriers > 0 && len(uq.sortedConsumers) > q.maxQueriers && len(q.consumers) != q.maxQueriers {
return fmt.Errorf("user %s has incorrect number of queriers, expected=%d, got=%d", u, len(q.consumers), q.maxQueriers)
}
}
@ -466,12 +466,12 @@ func getUsersByQuerier(queues *tenantQueues, querierID string) []string {
var userIDs []string
for _, userID := range queues.mapping.Keys() {
q := queues.mapping.GetByKey(userID)
if q.queriers == nil {
if q.consumers == nil {
// If it's nil then all queriers can handle this user.
userIDs = append(userIDs, userID)
continue
}
if _, ok := q.queriers[querierID]; ok {
if _, ok := q.consumers[querierID]; ok {
userIDs = append(userIDs, userID)
}
}
@ -481,14 +481,14 @@ func getUsersByQuerier(queues *tenantQueues, querierID string) []string {
func TestShuffleQueriers(t *testing.T) {
allQueriers := []string{"a", "b", "c", "d", "e"}
require.Nil(t, shuffleQueriersForTenants(12345, 10, allQueriers, nil))
require.Nil(t, shuffleQueriersForTenants(12345, len(allQueriers), allQueriers, nil))
require.Nil(t, shuffleConsumersForTenants(12345, 10, allQueriers, nil))
require.Nil(t, shuffleConsumersForTenants(12345, len(allQueriers), allQueriers, nil))
r1 := shuffleQueriersForTenants(12345, 3, allQueriers, nil)
r1 := shuffleConsumersForTenants(12345, 3, allQueriers, nil)
require.Equal(t, 3, len(r1))
// Same input produces same output.
r2 := shuffleQueriersForTenants(12345, 3, allQueriers, nil)
r2 := shuffleConsumersForTenants(12345, 3, allQueriers, nil)
require.Equal(t, 3, len(r2))
require.Equal(t, r1, r2)
}
@ -510,7 +510,7 @@ func TestShuffleQueriersCorrectness(t *testing.T) {
toSelect = 3
}
selected := shuffleQueriersForTenants(r.Int63(), toSelect, allSortedQueriers, nil)
selected := shuffleConsumersForTenants(r.Int63(), toSelect, allSortedQueriers, nil)
require.Equal(t, toSelect, len(selected))

@ -29,7 +29,7 @@ import (
"google.golang.org/grpc"
"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/scheduler/queue"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/scheduler/schedulerpb"
"github.com/grafana/loki/pkg/util"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
@ -144,7 +144,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, ringManager *lokiri
s.connectedQuerierClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_connected_querier_clients",
Help: "Number of querier worker clients currently connected to the query-scheduler.",
}, s.requestQueue.GetConnectedQuerierWorkersMetric)
}, s.requestQueue.GetConnectedConsumersMetric)
s.connectedFrontendClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_connected_frontend_clients",
Help: "Number of query-frontend worker clients currently connected to the query-scheduler.",
@ -404,8 +404,8 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL
querierID := resp.GetQuerierID()
level.Debug(s.log).Log("msg", "querier connected", "querier", querierID)
s.requestQueue.RegisterQuerierConnection(querierID)
defer s.requestQueue.UnregisterQuerierConnection(querierID)
s.requestQueue.RegisterConsumerConnection(querierID)
defer s.requestQueue.UnregisterConsumerConnection(querierID)
lastIndex := queue.StartIndex
@ -463,7 +463,7 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL
func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.NotifyQuerierShutdownRequest) (*schedulerpb.NotifyQuerierShutdownResponse, error) {
level.Debug(s.log).Log("msg", "received shutdown notification from querier", "querier", req.GetQuerierID())
s.requestQueue.NotifyQuerierShutdown(req.GetQuerierID())
s.requestQueue.NotifyConsumerShutdown(req.GetQuerierID())
return &schedulerpb.NotifyQuerierShutdownResponse{}, nil
}

@ -128,3 +128,13 @@ func (s *ActiveUsersCleanupService) iteration(_ context.Context) error {
}
return nil
}
func (s *ActiveUsersCleanupService) ActiveUsers() []string {
s.activeUsers.mu.RLock()
defer s.activeUsers.mu.RUnlock()
users := make([]string, 0, len(s.activeUsers.timestamps))
for u := range s.activeUsers.timestamps {
users = append(users, u)
}
return users
}
