From 27411cff087e6fa2c660c6bfa577c31945ab5232 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 20 Oct 2023 15:27:55 +0200 Subject: [PATCH] Introduce worker queue in bloom gateway (#10976) Instead of calling the bloom store directly on each and every request to filter chunk refs based on the given filters, we want to queue requests in per-tenant queues and process batches of requests that can be multiplexed to avoid excessive seeking in the bloom block queriers when checking chunk matches. This PR re-uses the request queue implementation used in the query scheduler. To do so, it moves the queue related code from the scheduler into a separate package `pkg/queue` and renames occurrences of "querier" to "consumer" to be more generic. The bloom gateway instantiates the request queue when starting the service. The gRPC method `FilterChunkRefs` then enqueues incoming requests to that queue. **Special notes for your reviewer**: For testing purposes, this PR also contains a dummy implementation of the workers. The worker implementation - which includes multiplexing of multiple tasks - is subject to a separate PR. --------- Signed-off-by: Christian Haudum --- go.mod | 2 +- pkg/bloomgateway/bloomgateway.go | 261 ++++++++++++++++-- pkg/bloomgateway/bloomgateway_test.go | 53 +++- pkg/lokifrontend/frontend/v1/frontend.go | 12 +- pkg/lokifrontend/frontend/v1/frontend_test.go | 4 +- pkg/{scheduler => }/queue/dequeue_qos_test.go | 12 +- pkg/{scheduler => }/queue/mapping.go | 0 pkg/{scheduler => }/queue/mapping_test.go | 0 pkg/{scheduler => }/queue/metrics.go | 0 pkg/{scheduler => }/queue/queue.go | 42 +-- pkg/{scheduler => }/queue/queue_test.go | 12 +- pkg/{scheduler => }/queue/tenant_queues.go | 144 +++++----- .../queue/tenant_queues_test.go | 108 ++++---- pkg/{scheduler => }/queue/treequeue.go | 0 pkg/{scheduler => }/queue/treequeue_test.go | 0 pkg/scheduler/scheduler.go | 10 +- pkg/util/active_user.go | 10 + 17 files changed, 479 insertions(+), 191 deletions(-) rename pkg/{scheduler => }/queue/dequeue_qos_test.go (93%) rename pkg/{scheduler => }/queue/mapping.go (100%) rename pkg/{scheduler => }/queue/mapping_test.go (100%) rename pkg/{scheduler => }/queue/metrics.go (100%) rename pkg/{scheduler => }/queue/queue.go (86%) rename pkg/{scheduler => }/queue/queue_test.go (96%) rename pkg/{scheduler => }/queue/tenant_queues.go (50%) rename pkg/{scheduler => }/queue/tenant_queues_test.go (81%) rename pkg/{scheduler => }/queue/treequeue.go (100%) rename pkg/{scheduler => }/queue/treequeue_test.go (100%) diff --git a/go.mod b/go.mod index 0a3b7f5343..432cd5b97c 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,7 @@ require ( github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f github.com/ncw/swift v1.0.53 github.com/oklog/run v1.1.0 - github.com/oklog/ulid v1.3.1 // indirect + github.com/oklog/ulid v1.3.1 github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e github.com/opentracing-contrib/go-stdlib v1.0.0 github.com/opentracing/opentracing-go v1.2.0 diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 44eae46e4b..2b920e270b 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -39,27 +39,130 @@ package bloomgateway import ( "context" + "fmt" "sort" + "sync" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + 
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/queue" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/pkg/util" ) var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring") var errInvalidTenant = errors.New("invalid tenant in chunk refs") -type metrics struct{} +// TODO(chaudum): Make these configurable +const ( + numWorkers = 4 + maxTasksPerTenant = 1024 + pendingTasksInitialCap = 1024 +) -func newMetrics(r prometheus.Registerer) *metrics { - return &metrics{} +type metrics struct { + queueDuration prometheus.Histogram + inflightRequests prometheus.Summary +} + +func newMetrics(subsystem string, registerer prometheus.Registerer) *metrics { + return &metrics{ + queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Namespace: "loki", + Subsystem: subsystem, + Name: "queue_duration_seconds", + Help: "Time spent by tasks in queue before getting picked up by a worker.", + Buckets: prometheus.DefBuckets, + }), + inflightRequests: promauto.With(registerer).NewSummary(prometheus.SummaryOpts{ + Namespace: "loki", + Subsystem: subsystem, + Name: "inflight_tasks", + Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, + }), + } +} + +// Task is the data structure that is enqueued to the internal queue and queued by query workers +type Task struct { + // ID is a lexcographically sortable unique identifier of the task + ID ulid.ULID + // Tenant is the tenant ID + Tenant string + // Request is the original request + Request *logproto.FilterChunkRefRequest + // ErrCh is a send-only channel to write an error to + ErrCh chan<- error + // ResCh is a send-only channel to write partial responses to + ResCh chan<- *logproto.GroupedChunkRefs +} + +// newTask returns a new Task that can be enqueued to the task queue. +// As additional arguments, it returns a result and an error channel, as well +// as an error if the instantiation fails. 
+func newTask(tenantID string, req *logproto.FilterChunkRefRequest) (Task, chan *logproto.GroupedChunkRefs, chan error, error) { + key, err := ulid.New(ulid.Now(), nil) + if err != nil { + return Task{}, nil, nil, err + } + errCh := make(chan error, 1) + resCh := make(chan *logproto.GroupedChunkRefs, 1) + task := Task{ + ID: key, + Tenant: tenantID, + Request: req, + ErrCh: errCh, + ResCh: resCh, + } + return task, resCh, errCh, nil +} + +// SyncMap is a map structure which can be synchronized using the RWMutex +type SyncMap[k comparable, v any] struct { + sync.RWMutex + Map map[k]v +} + +type pendingTasks SyncMap[ulid.ULID, Task] + +func (t *pendingTasks) Len() int { + t.RLock() + defer t.Unlock() + return len(t.Map) +} + +func (t *pendingTasks) Add(k ulid.ULID, v Task) { + t.Lock() + t.Map[k] = v + t.Unlock() +} + +func (t *pendingTasks) Delete(k ulid.ULID) { + t.Lock() + delete(t.Map, k) + t.Unlock() +} + +// makePendingTasks creates a SyncMap that holds pending tasks +func makePendingTasks(n int) *pendingTasks { + return &pendingTasks{ + RWMutex: sync.RWMutex{}, + Map: make(map[ulid.ULID]Task, n), + } } type Gateway struct { @@ -69,20 +172,33 @@ type Gateway struct { logger log.Logger metrics *metrics - bloomStore bloomshipper.Store + queue *queue.RequestQueue + queueMetrics *queue.Metrics + activeUsers *util.ActiveUsersCleanupService + bloomStore bloomshipper.Store sharding ShardingStrategy + + pendingTasks *pendingTasks + + serviceMngr *services.Manager + serviceWatcher *services.FailureWatcher } // New returns a new instance of the Bloom Gateway. func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) { g := &Gateway{ - cfg: cfg, - logger: logger, - metrics: newMetrics(reg), - sharding: shardingStrategy, + cfg: cfg, + logger: logger, + metrics: newMetrics("bloom_gateway", reg), + sharding: shardingStrategy, + pendingTasks: makePendingTasks(pendingTasksInitialCap), } + g.queueMetrics = queue.NewMetrics("bloom_gateway", reg) + g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics) + g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup) + client, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, cm) if err != nil { return nil, err @@ -99,18 +215,112 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, s } g.bloomStore = bloomStore - g.Service = services.NewIdleService(g.starting, g.stopping) + + svcs := []services.Service{g.queue, g.activeUsers} + g.serviceMngr, err = services.NewManager(svcs...) 
+ if err != nil { + return nil, err + } + g.serviceWatcher = services.NewFailureWatcher() + g.serviceWatcher.WatchManager(g.serviceMngr) + + g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway") return g, nil } func (g *Gateway) starting(ctx context.Context) error { + var err error + defer func() { + if err == nil || g.serviceMngr == nil { + return + } + if err := services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr); err != nil { + level.Error(g.logger).Log("msg", "failed to gracefully stop bloom gateway dependencies", "err", err) + } + }() + + if err := services.StartManagerAndAwaitHealthy(ctx, g.serviceMngr); err != nil { + return errors.Wrap(err, "unable to start bloom gateway subservices") + } + + for i := 0; i < numWorkers; i++ { + go g.startWorker(ctx, fmt.Sprintf("worker-%d", i)) + } + return nil } +func (g *Gateway) running(ctx context.Context) error { + // We observe inflight tasks frequently and at regular intervals, to have a good + // approximation of max inflight tasks over percentiles of time. We also do it with + // a ticker so that we keep tracking it even if we have no new requests but stuck inflight + // tasks (eg. worker are all exhausted). + inflightTasksTicker := time.NewTicker(250 * time.Millisecond) + defer inflightTasksTicker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case err := <-g.serviceWatcher.Chan(): + return errors.Wrap(err, "bloom gateway subservice failed") + case <-inflightTasksTicker.C: + inflight := g.pendingTasks.Len() + g.metrics.inflightRequests.Observe(float64(inflight)) + } + } +} + func (g *Gateway) stopping(_ error) error { g.bloomStore.Stop() - return nil + return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr) +} + +// This is just a dummy implementation of the worker! +// TODO(chaudum): Implement worker that dequeues multiple pending tasks and +// multiplexes them prior to execution. +func (g *Gateway) startWorker(_ context.Context, id string) error { + level.Info(g.logger).Log("msg", "starting worker", "worker", id) + + g.queue.RegisterConsumerConnection(id) + defer g.queue.UnregisterConsumerConnection(id) + + idx := queue.StartIndexWithLocalQueue + + for { + ctx := context.Background() + item, newIdx, err := g.queue.Dequeue(ctx, idx, id) + if err != nil { + if err != queue.ErrStopped { + level.Error(g.logger).Log("msg", "failed to dequeue task", "worker", id, "err", err) + continue + } + level.Info(g.logger).Log("msg", "stopping worker", "worker", id) + return err + } + task, ok := item.(Task) + if !ok { + level.Error(g.logger).Log("msg", "failed to cast to Task", "item", item) + continue + } + + idx = newIdx + level.Info(g.logger).Log("msg", "dequeued task", "worker", id, "task", task.ID) + g.pendingTasks.Delete(task.ID) + + r := task.Request + if len(r.Filters) > 0 { + r.Refs, err = g.bloomStore.FilterChunkRefs(ctx, task.Tenant, r.From.Time(), r.Through.Time(), r.Refs, r.Filters...) 
+ } + if err != nil { + task.ErrCh <- err + } else { + for _, ref := range r.Refs { + task.ResCh <- ref + } + } + } } // FilterChunkRefs implements BloomGatewayServer @@ -131,15 +341,32 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint }) - chunkRefs := req.Refs + task, resCh, errCh, err := newTask(tenantID, req) + if err != nil { + return nil, err + } + + g.activeUsers.UpdateUserTimestamp(tenantID, time.Now()) + level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID) + g.queue.Enqueue(tenantID, []string{}, task, 100, func() { + // When enqueuing, we also add the task to the pending tasks + g.pendingTasks.Add(task.ID, task) + }) - // Only query bloom filters if filters are present - if len(req.Filters) > 0 { - chunkRefs, err = g.bloomStore.FilterChunkRefs(ctx, tenantID, req.From.Time(), req.Through.Time(), req.Refs, req.Filters...) - if err != nil { + response := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs)) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-errCh: return nil, err + case res := <-resCh: + level.Info(g.logger).Log("msg", "got result", "task", task.ID, "tenant", tenantID, "res", res) + // wait for all parts of the full response + response = append(response, res) + if len(response) == len(req.Refs) { + return &logproto.FilterChunkRefResponse{ChunkRefs: response}, nil + } } } - - return &logproto.FilterChunkRefResponse{ChunkRefs: chunkRefs}, nil } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 285d11aaf4..bfb2b9b9d8 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -2,6 +2,7 @@ package bloomgateway import ( "context" + "os" "testing" "time" @@ -87,6 +88,10 @@ func TestBloomGateway_StartStopService(t *testing.T) { err = services.StartAndAwaitRunning(context.Background(), gw) require.NoError(t, err) + // Wait for workers to connect to queue + time.Sleep(50 * time.Millisecond) + require.Equal(t, float64(numWorkers), gw.queue.GetConnectedConsumersMetric()) + err = services.StopAndAwaitTerminated(context.Background(), gw) require.NoError(t, err) }) @@ -96,7 +101,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { tenantID := "test" ss := NewNoopStrategy() - logger := log.NewNopLogger() + logger := log.NewLogfmtLogger(os.Stderr) reg := prometheus.NewRegistry() cm := storage.NewClientMetrics() @@ -136,9 +141,17 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { } t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) { + reg := prometheus.NewRegistry() gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg) require.NoError(t, err) + err = services.StartAndAwaitRunning(context.Background(), gw) + require.NoError(t, err) + t.Cleanup(func() { + err = services.StopAndAwaitTerminated(context.Background(), gw) + require.NoError(t, err) + }) + ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00") now := model.TimeFromUnix(ts.Unix()) @@ -174,6 +187,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { }) t.Run("returns error if chunk refs do not belong to tenant", func(t *testing.T) { + reg := prometheus.NewRegistry() gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg) require.NoError(t, err) @@ -196,4 +210,41 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { require.Equal(t, "expected chunk refs from tenant test, got tenant other: invalid tenant in chunk refs", err.Error()) }) + 
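Reviewer note: the FilterChunkRefs handler above no longer calls the bloom store directly; it enqueues a Task and then blocks on the task's result and error channels until it has received one partial response per chunk-ref group, or the request context ends. Below is a minimal standalone sketch of that collect loop; every name in it is hypothetical, only the channel pattern mirrors the handler.

package main

import (
	"context"
	"fmt"
)

// collect drains partial responses until `want` parts have arrived, an error
// is reported, or the request context is cancelled.
func collect(ctx context.Context, resCh <-chan string, errCh <-chan error, want int) ([]string, error) {
	out := make([]string, 0, want)
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case err := <-errCh:
			return nil, err
		case part := <-resCh:
			out = append(out, part)
			if len(out) == want {
				return out, nil
			}
		}
	}
}

func main() {
	resCh := make(chan string, 2)
	errCh := make(chan error, 1)
	resCh <- "group-1"
	resCh <- "group-2"
	fmt.Println(collect(context.Background(), resCh, errCh, 2))
}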
t.Run("gateway tracks active users", func(t *testing.T) { + reg := prometheus.NewRegistry() + gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), gw) + require.NoError(t, err) + t.Cleanup(func() { + err = services.StopAndAwaitTerminated(context.Background(), gw) + require.NoError(t, err) + }) + + ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00") + now := model.TimeFromUnix(ts.Unix()) + + tenants := []string{"tenant-a", "tenant-b", "tenant-c"} + for idx, tenantID := range tenants { + chunkRefs := []*logproto.ChunkRef{ + { + Fingerprint: uint64(1000 + 100*idx), + UserID: tenantID, + From: now.Add(-24 * time.Hour), + Through: now, + Checksum: uint32(idx), + }, + } + req := &logproto.FilterChunkRefRequest{ + From: now.Add(-24 * time.Hour), + Through: now, + Refs: groupRefs(t, chunkRefs), + } + ctx := user.InjectOrgID(context.Background(), tenantID) + _, err = gw.FilterChunkRefs(ctx, req) + require.NoError(t, err) + } + require.Equal(t, tenants, gw.activeUsers.ActiveUsers()) + }) } diff --git a/pkg/lokifrontend/frontend/v1/frontend.go b/pkg/lokifrontend/frontend/v1/frontend.go index 320e3dd50f..836baf283a 100644 --- a/pkg/lokifrontend/frontend/v1/frontend.go +++ b/pkg/lokifrontend/frontend/v1/frontend.go @@ -20,7 +20,7 @@ import ( "github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb" "github.com/grafana/loki/pkg/querier/stats" - "github.com/grafana/loki/pkg/scheduler/queue" + "github.com/grafana/loki/pkg/queue" "github.com/grafana/loki/pkg/util" lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" "github.com/grafana/loki/pkg/util/validation" @@ -106,7 +106,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ Name: "cortex_query_frontend_connected_clients", Help: "Number of worker clients currently connected to the frontend.", - }, f.requestQueue.GetConnectedQuerierWorkersMetric) + }, f.requestQueue.GetConnectedConsumersMetric) f.Service = services.NewBasicService(f.starting, f.running, f.stopping) return f, nil @@ -189,8 +189,8 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { return err } - f.requestQueue.RegisterQuerierConnection(querierID) - defer f.requestQueue.UnregisterQuerierConnection(querierID) + f.requestQueue.RegisterConsumerConnection(querierID) + defer f.requestQueue.UnregisterConsumerConnection(querierID) lastIndex := queue.StartIndex @@ -273,7 +273,7 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error { func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) { level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID()) - f.requestQueue.NotifyQuerierShutdown(req.GetClientID()) + f.requestQueue.NotifyConsumerShutdown(req.GetClientID()) return &frontendv1pb.NotifyClientShutdownResponse{}, nil } @@ -327,7 +327,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error { // chosen to match the same method in the ingester func (f *Frontend) CheckReady(_ context.Context) error { // if we have more than one querier connected we will consider ourselves ready - connectedClients := f.requestQueue.GetConnectedQuerierWorkersMetric() + connectedClients := f.requestQueue.GetConnectedConsumersMetric() if connectedClients > 0 { return nil } 
diff --git a/pkg/lokifrontend/frontend/v1/frontend_test.go b/pkg/lokifrontend/frontend/v1/frontend_test.go index 82422b79b5..cbe34776e6 100644 --- a/pkg/lokifrontend/frontend/v1/frontend_test.go +++ b/pkg/lokifrontend/frontend/v1/frontend_test.go @@ -33,7 +33,7 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" querier_worker "github.com/grafana/loki/pkg/querier/worker" - "github.com/grafana/loki/pkg/scheduler/queue" + "github.com/grafana/loki/pkg/queue" ) const ( @@ -136,7 +136,7 @@ func TestFrontendCheckReady(t *testing.T) { requestQueue: queue.NewRequestQueue(5, 0, qm), } for i := 0; i < tt.connectedClients; i++ { - f.requestQueue.RegisterQuerierConnection("test") + f.requestQueue.RegisterConsumerConnection("test") } err := f.CheckReady(context.Background()) errMsg := "" diff --git a/pkg/scheduler/queue/dequeue_qos_test.go b/pkg/queue/dequeue_qos_test.go similarity index 93% rename from pkg/scheduler/queue/dequeue_qos_test.go rename to pkg/queue/dequeue_qos_test.go index 82dc3f66a2..0709f4723d 100644 --- a/pkg/scheduler/queue/dequeue_qos_test.go +++ b/pkg/queue/dequeue_qos_test.go @@ -60,7 +60,7 @@ func BenchmarkQueryFairness(t *testing.B) { enqueueRequestsForActor(t, []string{}, useActor, requestQueue, numSubRequestsActorA, 50*time.Millisecond) enqueueRequestsForActor(t, []string{"a"}, useActor, requestQueue, numSubRequestsActorA, 100*time.Millisecond) enqueueRequestsForActor(t, []string{"b"}, useActor, requestQueue, numSubRequestsActorB, 50*time.Millisecond) - requestQueue.queues.recomputeUserQueriers() + requestQueue.queues.recomputeUserConsumers() // set timeout to minize impact on overall test run duration in case something goes wrong ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -76,8 +76,8 @@ func BenchmarkQueryFairness(t *testing.B) { go func(id string) { defer wg.Done() - requestQueue.RegisterQuerierConnection(id) - defer requestQueue.UnregisterQuerierConnection(id) + requestQueue.RegisterConsumerConnection(id) + defer requestQueue.UnregisterConsumerConnection(id) idx := StartIndex for ctx.Err() == nil { r, newIdx, err := requestQueue.Dequeue(ctx, idx, id) @@ -143,7 +143,7 @@ func TestQueryFairnessAcrossSameLevel(t *testing.T) { _ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(22), 0, nil) _ = requestQueue.Enqueue("tenant1", []string{"xyz", "123"}, r(200), 0, nil) _ = requestQueue.Enqueue("tenant1", []string{"xyz", "456"}, r(210), 0, nil) - requestQueue.queues.recomputeUserQueriers() + requestQueue.queues.recomputeUserConsumers() items := make([]int, 0) @@ -151,8 +151,8 @@ func TestQueryFairnessAcrossSameLevel(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - requestQueue.RegisterQuerierConnection("querier") - defer requestQueue.UnregisterQuerierConnection("querier") + requestQueue.RegisterConsumerConnection("querier") + defer requestQueue.UnregisterConsumerConnection("querier") idx := StartIndexWithLocalQueue for ctx.Err() == nil { diff --git a/pkg/scheduler/queue/mapping.go b/pkg/queue/mapping.go similarity index 100% rename from pkg/scheduler/queue/mapping.go rename to pkg/queue/mapping.go diff --git a/pkg/scheduler/queue/mapping_test.go b/pkg/queue/mapping_test.go similarity index 100% rename from pkg/scheduler/queue/mapping_test.go rename to pkg/queue/mapping_test.go diff --git a/pkg/scheduler/queue/metrics.go b/pkg/queue/metrics.go similarity index 100% rename from pkg/scheduler/queue/metrics.go rename to 
pkg/queue/metrics.go diff --git a/pkg/scheduler/queue/queue.go b/pkg/queue/queue.go similarity index 86% rename from pkg/scheduler/queue/queue.go rename to pkg/queue/queue.go index cb25388a0c..fa1860e4e8 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/queue/queue.go @@ -51,7 +51,7 @@ type RequestChannel chan Request type RequestQueue struct { services.Service - connectedQuerierWorkers *atomic.Int32 + connectedConsumers *atomic.Int32 mtx sync.Mutex cond contextCond // Notified when request is enqueued or dequeued, or querier is disconnected. @@ -63,13 +63,13 @@ type RequestQueue struct { func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue { q := &RequestQueue{ - queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay), - connectedQuerierWorkers: atomic.NewInt32(0), - metrics: metrics, + queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay), + connectedConsumers: atomic.NewInt32(0), + metrics: metrics, } q.cond = contextCond{Cond: sync.NewCond(&q.mtx)} - q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedQueriers, q.stopping).WithName("request queue") + q.Service = services.NewTimerService(forgetCheckPeriod, nil, q.forgetDisconnectedConsumers, q.stopping).WithName("request queue") return q } @@ -127,8 +127,8 @@ func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQue // Dequeue find next tenant queue and takes the next request off of it. Will block if there are no requests. // By passing tenant index from previous call of this method, querier guarantees that it iterates over all tenants fairly. -// If querier finds that request from the tenant is already expired, it can get a request for the same tenant by using UserIndex.ReuseLastUser. -func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, querierID string) (Request, QueueIndex, error) { +// If consumer finds that request from the tenant is already expired, it can get a request for the same tenant by using UserIndex.ReuseLastUser. +func (q *RequestQueue) Dequeue(ctx context.Context, last QueueIndex, consumerID string) (Request, QueueIndex, error) { q.mtx.Lock() defer q.mtx.Unlock() @@ -140,7 +140,7 @@ FindQueue: querierWait = false start := time.Now() q.cond.Wait(ctx) - q.metrics.querierWaitTime.WithLabelValues(querierID).Observe(time.Since(start).Seconds()) + q.metrics.querierWaitTime.WithLabelValues(consumerID).Observe(time.Since(start).Seconds()) } if q.stopped { @@ -152,7 +152,7 @@ FindQueue: } for { - queue, tenant, idx := q.queues.getNextQueueForQuerier(last, querierID) + queue, tenant, idx := q.queues.getNextQueueForConsumer(last, consumerID) last = idx if queue == nil { break @@ -181,11 +181,11 @@ FindQueue: goto FindQueue } -func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error { +func (q *RequestQueue) forgetDisconnectedConsumers(_ context.Context) error { q.mtx.Lock() defer q.mtx.Unlock() - if q.queues.forgetDisconnectedQueriers(time.Now()) > 0 { + if q.queues.forgetDisconnectedConsumers(time.Now()) > 0 { // We need to notify goroutines cause having removed some queriers // may have caused a resharding. 
q.cond.Broadcast() @@ -198,7 +198,7 @@ func (q *RequestQueue) stopping(_ error) error { q.mtx.Lock() defer q.mtx.Unlock() - for !q.queues.hasNoTenantQueues() && q.connectedQuerierWorkers.Load() > 0 { + for !q.queues.hasNoTenantQueues() && q.connectedConsumers.Load() > 0 { q.cond.Wait(context.Background()) } @@ -211,30 +211,30 @@ func (q *RequestQueue) stopping(_ error) error { return nil } -func (q *RequestQueue) RegisterQuerierConnection(querier string) { - q.connectedQuerierWorkers.Inc() +func (q *RequestQueue) RegisterConsumerConnection(querier string) { + q.connectedConsumers.Inc() q.mtx.Lock() defer q.mtx.Unlock() - q.queues.addQuerierConnection(querier) + q.queues.addConsumerToConnection(querier) } -func (q *RequestQueue) UnregisterQuerierConnection(querier string) { - q.connectedQuerierWorkers.Dec() +func (q *RequestQueue) UnregisterConsumerConnection(querier string) { + q.connectedConsumers.Dec() q.mtx.Lock() defer q.mtx.Unlock() - q.queues.removeQuerierConnection(querier, time.Now()) + q.queues.removeConsumerConnection(querier, time.Now()) } -func (q *RequestQueue) NotifyQuerierShutdown(querierID string) { +func (q *RequestQueue) NotifyConsumerShutdown(querierID string) { q.mtx.Lock() defer q.mtx.Unlock() q.queues.notifyQuerierShutdown(querierID) } -func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 { - return float64(q.connectedQuerierWorkers.Load()) +func (q *RequestQueue) GetConnectedConsumersMetric() float64 { + return float64(q.connectedConsumers.Load()) } // contextCond is a *sync.Cond with Wait() method overridden to support context-based waiting. diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/queue/queue_test.go similarity index 96% rename from pkg/scheduler/queue/queue_test.go rename to pkg/queue/queue_test.go index 86adbbfe53..fe8d1a0a6a 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/queue/queue_test.go @@ -49,7 +49,7 @@ func BenchmarkGetNextRequest(b *testing.B) { queues = append(queues, queue) for ix := 0; ix < queriers; ix++ { - queue.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix)) + queue.RegisterConsumerConnection(fmt.Sprintf("querier-%d", ix)) } for i := 0; i < maxOutstandingPerTenant; i++ { @@ -106,7 +106,7 @@ func BenchmarkQueueRequest(b *testing.B) { q := NewRequestQueue(maxOutstandingPerTenant, 0, NewMetrics("query_scheduler", nil)) for ix := 0; ix < queriers; ix++ { - q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix)) + q.RegisterConsumerConnection(fmt.Sprintf("querier-%d", ix)) } queues = append(queues, q) @@ -143,8 +143,8 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe }) // Two queriers connect. - queue.RegisterQuerierConnection("querier-1") - queue.RegisterQuerierConnection("querier-2") + queue.RegisterConsumerConnection("querier-1") + queue.RegisterConsumerConnection("querier-2") // Querier-2 waits for a new request. querier2wg := sync.WaitGroup{} @@ -156,7 +156,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe }() // Querier-1 crashes (no graceful shutdown notification). - queue.UnregisterQuerierConnection("querier-1") + queue.UnregisterConsumerConnection("querier-1") // Enqueue a request from an user which would be assigned to querier-1. // NOTE: "user-1" hash falls in the querier-1 shard. 
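Reviewer note: Dequeue blocks in q.cond.Wait(ctx) until an item is enqueued, a consumer disconnects, or the caller's context ends; the patch only carries over the contextCond type, not its implementation. As a refresher, here is one generic way to make sync.Cond.Wait respect context cancellation (not necessarily how pkg/queue does it): wake all waiters when the context is done and let each one re-check its predicate and the context.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitWithContext must be called with cond.L held, like sync.Cond.Wait.
func waitWithContext(ctx context.Context, cond *sync.Cond, ready func() bool) error {
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			cond.L.Lock()
			cond.Broadcast() // wake every waiter so nobody blocks past cancellation
			cond.L.Unlock()
		case <-done:
		}
	}()

	for !ready() {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		cond.Wait()
	}
	return nil
}

func main() {
	mu := &sync.Mutex{}
	cond := sync.NewCond(mu)
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	mu.Lock()
	err := waitWithContext(ctx, cond, func() bool { return false })
	mu.Unlock()
	fmt.Println(err) // context deadline exceeded
}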
@@ -305,7 +305,7 @@ func TestMaxQueueSize(t *testing.T) { t.Run("queue size is tracked per tenant", func(t *testing.T) { maxSize := 3 queue := NewRequestQueue(maxSize, 0, NewMetrics("query_scheduler", nil)) - queue.RegisterQuerierConnection("querier") + queue.RegisterConsumerConnection("querier") // enqueue maxSize items with different actors // different actors have individual channels with maxSize length diff --git a/pkg/scheduler/queue/tenant_queues.go b/pkg/queue/tenant_queues.go similarity index 50% rename from pkg/scheduler/queue/tenant_queues.go rename to pkg/queue/tenant_queues.go index 9a7b42cbfd..46e8a999fb 100644 --- a/pkg/scheduler/queue/tenant_queues.go +++ b/pkg/queue/tenant_queues.go @@ -38,35 +38,35 @@ func (tqs intPointerMap) Dec(key string) int { return *ptr } -// querier holds information about a querier registered in the queue. -type querier struct { +// consumer holds information about a consumer registered in the queue. +type consumer struct { // Number of active connections. connections int - // True if the querier notified it's gracefully shutting down. + // True if the consumer notified it's gracefully shutting down. shuttingDown bool // When the last connection has been unregistered. disconnectedAt time.Time } -// This struct holds tenant queues for pending requests. It also keeps track of connected queriers, -// and mapping between tenants and queriers. +// This struct holds tenant queues for pending requests. It also keeps track of connected consumers, +// and mapping between tenants and consumers. type tenantQueues struct { mapping *Mapping[*tenantQueue] maxUserQueueSize int perUserQueueLen intPointerMap - // How long to wait before removing a querier which has got disconnected + // How long to wait before removing a consumer which has got disconnected // but hasn't notified about a graceful shutdown. forgetDelay time.Duration - // Tracks queriers registered to the queue. - queriers map[string]*querier + // Tracks consumers registered to the queue. + consumers map[string]*consumer - // Sorted list of querier names, used when creating per-user shard. - sortedQueriers []string + // sortedConsumer list of consumer IDs, used when creating per-user shard. + sortedConsumers []string } type Queue interface { @@ -86,12 +86,12 @@ type Mapable interface { type tenantQueue struct { *TreeQueue - // If not nil, only these queriers can handle user requests. If nil, all queriers can. - // We set this to nil if number of available queriers <= maxQueriers. - queriers map[string]struct{} + // If not nil, only these consumers can handle user requests. If nil, all consumers can. + // We set this to nil if number of available consumers <= maxQueriers. + consumers map[string]struct{} maxQueriers int - // Seed for shuffle sharding of queriers. This seed is based on userID only and is therefore consistent + // Seed for shuffle sharding of consumers. This seed is based on userID only and is therefore consistent // between different frontends. seed int64 } @@ -104,8 +104,8 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue maxUserQueueSize: maxUserQueueSize, perUserQueueLen: make(intPointerMap), forgetDelay: forgetDelay, - queriers: map[string]*querier{}, - sortedQueriers: nil, + consumers: map[string]*consumer{}, + sortedConsumers: nil, } } @@ -118,9 +118,9 @@ func (q *tenantQueues) deleteQueue(tenant string) { } // Returns existing or new queue for a tenant. -// MaxQueriers is used to compute which queriers should handle requests for this tenant. 
-// If maxQueriers is <= 0, all queriers can handle this tenant's requests. -// If maxQueriers has changed since the last call, queriers for this are recomputed. +// MaxQueriers is used to compute which consumers should handle requests for this tenant. +// If maxQueriers is <= 0, all consumers can handle this tenant's requests. +// If maxQueriers has changed since the last call, consumers for this are recomputed. func (q *tenantQueues) getOrAddQueue(tenant string, path []string, maxQueriers int) Queue { // Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot). if tenant == "" { @@ -142,7 +142,7 @@ func (q *tenantQueues) getOrAddQueue(tenant string, path []string, maxQueriers i if uq.maxQueriers != maxQueriers { uq.maxQueriers = maxQueriers - uq.queriers = shuffleQueriersForTenants(uq.seed, maxQueriers, q.sortedQueriers, nil) + uq.consumers = shuffleConsumersForTenants(uq.seed, maxQueriers, q.sortedConsumers, nil) } if len(path) == 0 { @@ -151,10 +151,10 @@ func (q *tenantQueues) getOrAddQueue(tenant string, path []string, maxQueriers i return uq.add(path) } -// Finds next queue for the querier. To support fair scheduling between users, client is expected +// Finds next queue for the consumer. To support fair scheduling between users, client is expected // to pass last user index returned by this function as argument. Is there was no previous // last user index, use -1. -func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierID string) (Queue, string, QueueIndex) { +func (q *tenantQueues) getNextQueueForConsumer(lastUserIndex QueueIndex, consumerID string) (Queue, string, QueueIndex) { uid := lastUserIndex // at the RequestQueue level we don't have local queues, so start index is -1 @@ -162,9 +162,9 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI uid = StartIndex } - // Ensure the querier is not shutting down. If the querier is shutting down, we shouldn't forward + // Ensure the consumer is not shutting down. If the consumer is shutting down, we shouldn't forward // any more queries to it. - if info := q.queriers[querierID]; info == nil || info.shuttingDown { + if info := q.consumers[consumerID]; info == nil || info.shuttingDown { return nil, "", uid } @@ -180,9 +180,9 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI } uid = tq.pos - if tq.queriers != nil { - if _, ok := tq.queriers[querierID]; !ok { - // This querier is not handling the user. + if tq.consumers != nil { + if _, ok := tq.consumers[consumerID]; !ok { + // This consumer is not handling the user. continue } } @@ -192,30 +192,30 @@ func (q *tenantQueues) getNextQueueForQuerier(lastUserIndex QueueIndex, querierI return nil, "", uid } -func (q *tenantQueues) addQuerierConnection(querierID string) { - info := q.queriers[querierID] +func (q *tenantQueues) addConsumerToConnection(consumerID string) { + info := q.consumers[consumerID] if info != nil { info.connections++ - // Reset in case the querier re-connected while it was in the forget waiting period. + // Reset in case the consumer re-connected while it was in the forget waiting period. info.shuttingDown = false info.disconnectedAt = time.Time{} return } - // First connection from this querier. - q.queriers[querierID] = &querier{connections: 1} - q.sortedQueriers = append(q.sortedQueriers, querierID) - sort.Strings(q.sortedQueriers) + // First connection from this consumer. 
+ q.consumers[consumerID] = &consumer{connections: 1} + q.sortedConsumers = append(q.sortedConsumers, consumerID) + sort.Strings(q.sortedConsumers) - q.recomputeUserQueriers() + q.recomputeUserConsumers() } -func (q *tenantQueues) removeQuerierConnection(querierID string, now time.Time) { - info := q.queriers[querierID] +func (q *tenantQueues) removeConsumerConnection(consumerID string, now time.Time) { + info := q.consumers[consumerID] if info == nil || info.connections <= 0 { - panic("unexpected number of connections for querier") + panic("unexpected number of connections for consumer") } // Decrease the number of active connections. @@ -225,65 +225,65 @@ func (q *tenantQueues) removeQuerierConnection(querierID string, now time.Time) } // There no more active connections. If the forget delay is configured then - // we can remove it only if querier has announced a graceful shutdown. + // we can remove it only if consumer has announced a graceful shutdown. if info.shuttingDown || q.forgetDelay == 0 { - q.removeQuerier(querierID) + q.removeConsumer(consumerID) return } // No graceful shutdown has been notified yet, so we should track the current time - // so that we'll remove the querier as soon as we receive the graceful shutdown + // so that we'll remove the consumer as soon as we receive the graceful shutdown // notification (if any) or once the threshold expires. info.disconnectedAt = now } -func (q *tenantQueues) removeQuerier(querierID string) { - delete(q.queriers, querierID) +func (q *tenantQueues) removeConsumer(consumerID string) { + delete(q.consumers, consumerID) - ix := sort.SearchStrings(q.sortedQueriers, querierID) - if ix >= len(q.sortedQueriers) || q.sortedQueriers[ix] != querierID { - panic("incorrect state of sorted queriers") + ix := sort.SearchStrings(q.sortedConsumers, consumerID) + if ix >= len(q.sortedConsumers) || q.sortedConsumers[ix] != consumerID { + panic("incorrect state of sorted consumers") } - q.sortedQueriers = append(q.sortedQueriers[:ix], q.sortedQueriers[ix+1:]...) + q.sortedConsumers = append(q.sortedConsumers[:ix], q.sortedConsumers[ix+1:]...) - q.recomputeUserQueriers() + q.recomputeUserConsumers() } -// notifyQuerierShutdown records that a querier has sent notification about a graceful shutdown. -func (q *tenantQueues) notifyQuerierShutdown(querierID string) { - info := q.queriers[querierID] +// notifyQuerierShutdown records that a consumer has sent notification about a graceful shutdown. +func (q *tenantQueues) notifyQuerierShutdown(consumerID string) { + info := q.consumers[consumerID] if info == nil { - // The querier may have already been removed, so we just ignore it. + // The consumer may have already been removed, so we just ignore it. return } - // If there are no more connections, we should remove the querier. + // If there are no more connections, we should remove the consumer. if info.connections == 0 { - q.removeQuerier(querierID) + q.removeConsumer(consumerID) return } // Otherwise we should annotate we received a graceful shutdown notification - // and the querier will be removed once all connections are unregistered. + // and the consumer will be removed once all connections are unregistered. info.shuttingDown = true } -// forgetDisconnectedQueriers removes all disconnected queriers that have gone since at least -// the forget delay. Returns the number of forgotten queriers. 
-func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int { +// forgetDisconnectedConsumers removes all disconnected consumer that have gone since at least +// the forget delay. Returns the number of forgotten consumers. +func (q *tenantQueues) forgetDisconnectedConsumers(now time.Time) int { // Nothing to do if the forget delay is disabled. if q.forgetDelay == 0 { return 0 } - // Remove all queriers with no connections that have gone since at least the forget delay. + // Remove all consumers with no connections that have gone since at least the forget delay. threshold := now.Add(-q.forgetDelay) forgotten := 0 - for querierID := range q.queriers { - if info := q.queriers[querierID]; info.connections == 0 && info.disconnectedAt.Before(threshold) { - q.removeQuerier(querierID) + for id := range q.consumers { + if info := q.consumers[id]; info.connections == 0 && info.disconnectedAt.Before(threshold) { + q.removeConsumer(id) forgotten++ } } @@ -291,30 +291,30 @@ func (q *tenantQueues) forgetDisconnectedQueriers(now time.Time) int { return forgotten } -func (q *tenantQueues) recomputeUserQueriers() { - scratchpad := make([]string, 0, len(q.sortedQueriers)) +func (q *tenantQueues) recomputeUserConsumers() { + scratchpad := make([]string, 0, len(q.sortedConsumers)) for _, uq := range q.mapping.Values() { - uq.queriers = shuffleQueriersForTenants(uq.seed, uq.maxQueriers, q.sortedQueriers, scratchpad) + uq.consumers = shuffleConsumersForTenants(uq.seed, uq.maxQueriers, q.sortedConsumers, scratchpad) } } -// shuffleQueriersForTenants returns nil if queriersToSelect is 0 or there are not enough queriers to select from. -// In that case *all* queriers should be used. +// shuffleConsumersForTenants returns nil if consumersToSelect is 0 or there are not enough consumers to select from. +// In that case *all* consumers should be used. // Scratchpad is used for shuffling, to avoid new allocations. If nil, new slice is allocated. -func shuffleQueriersForTenants(userSeed int64, queriersToSelect int, allSortedQueriers []string, scratchpad []string) map[string]struct{} { - if queriersToSelect == 0 || len(allSortedQueriers) <= queriersToSelect { +func shuffleConsumersForTenants(userSeed int64, consumersToSelect int, allSortedConsumers []string, scratchpad []string) map[string]struct{} { + if consumersToSelect == 0 || len(allSortedConsumers) <= consumersToSelect { return nil } - result := make(map[string]struct{}, queriersToSelect) + result := make(map[string]struct{}, consumersToSelect) rnd := rand.New(rand.NewSource(userSeed)) scratchpad = scratchpad[:0] - scratchpad = append(scratchpad, allSortedQueriers...) + scratchpad = append(scratchpad, allSortedConsumers...) last := len(scratchpad) - 1 - for i := 0; i < queriersToSelect; i++ { + for i := 0; i < consumersToSelect; i++ { r := rnd.Intn(last + 1) result[scratchpad[r]] = struct{}{} // move selected item to the end, it won't be selected anymore. 
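Reviewer note: shuffleConsumersForTenants above is what keeps a tenant pinned to the same consumer subset on every frontend: the selection is a partial shuffle driven purely by a per-tenant seed, so identical inputs always yield identical shards. A standalone sketch of that idea follows; the FNV-based seed derivation is an assumption for the demo, not the seed the code actually uses.

package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

func pickConsumers(seed int64, n int, all []string) map[string]struct{} {
	if n <= 0 || len(all) <= n {
		return nil // nil means "all consumers may serve this tenant"
	}
	rnd := rand.New(rand.NewSource(seed))
	pool := append([]string(nil), all...)
	picked := make(map[string]struct{}, n)
	last := len(pool) - 1
	for i := 0; i < n; i++ {
		r := rnd.Intn(last + 1)
		picked[pool[r]] = struct{}{}
		// Move the selected item out of the candidate range so it cannot be
		// selected again.
		pool[r], pool[last] = pool[last], pool[r]
		last--
	}
	return picked
}

func seedFor(tenant string) int64 {
	h := fnv.New64a()
	_, _ = h.Write([]byte(tenant))
	return int64(h.Sum64())
}

func main() {
	consumers := []string{"a", "b", "c", "d", "e"}
	s := seedFor("tenant-1")
	fmt.Println(pickConsumers(s, 3, consumers)) // same subset on every run and every instance
}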
diff --git a/pkg/scheduler/queue/tenant_queues_test.go b/pkg/queue/tenant_queues_test.go similarity index 81% rename from pkg/scheduler/queue/tenant_queues_test.go rename to pkg/queue/tenant_queues_test.go index 626fbc57cf..95f2a67963 100644 --- a/pkg/scheduler/queue/tenant_queues_test.go +++ b/pkg/queue/tenant_queues_test.go @@ -22,10 +22,10 @@ func TestQueues(t *testing.T) { assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) - uq.addQuerierConnection("querier-1") - uq.addQuerierConnection("querier-2") + uq.addConsumerToConnection("querier-1") + uq.addConsumerToConnection("querier-2") - q, u, lastUserIndex := uq.getNextQueueForQuerier(-1, "querier-1") + q, u, lastUserIndex := uq.getNextQueueForConsumer(-1, "querier-1") assert.Nil(t, q) assert.Equal(t, "", u) @@ -71,7 +71,7 @@ func TestQueues(t *testing.T) { uq.deleteQueue("four") assert.NoError(t, isConsistent(uq)) - q, _, _ = uq.getNextQueueForQuerier(lastUserIndex, "querier-1") + q, _, _ = uq.getNextQueueForConsumer(lastUserIndex, "querier-1") assert.Nil(t, q) } @@ -80,8 +80,8 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) { assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) - uq.addQuerierConnection("querier-1") - uq.addQuerierConnection("querier-2") + uq.addConsumerToConnection("querier-1") + uq.addConsumerToConnection("querier-2") // Add queues: [one, two] qOne := getOrAdd(t, uq, "one", 0) @@ -91,7 +91,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) { // After notify shutdown for querier-2, it's expected to own no queue. uq.notifyQuerierShutdown("querier-2") - q, u, _ := uq.getNextQueueForQuerier(-1, "querier-2") + q, u, _ := uq.getNextQueueForConsumer(-1, "querier-2") assert.Nil(t, q) assert.Equal(t, "", u) @@ -99,8 +99,8 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) { confirmOrderForQuerier(t, uq, "querier-1", -1, qOne, qTwo, qOne, qTwo) // After disconnecting querier-2, it's expected to own no queue. - uq.removeQuerier("querier-2") - q, u, _ = uq.getNextQueueForQuerier(-1, "querier-2") + uq.removeConsumer("querier-2") + q, u, _ = uq.getNextQueueForConsumer(-1, "querier-2") assert.Nil(t, q) assert.Equal(t, "", u) } @@ -117,10 +117,10 @@ func TestQueuesWithQueriers(t *testing.T) { // Add some queriers. for ix := 0; ix < queriers; ix++ { qid := fmt.Sprintf("querier-%d", ix) - uq.addQuerierConnection(qid) + uq.addConsumerToConnection(qid) // No querier has any queues yet. - q, u, _ := uq.getNextQueueForQuerier(-1, qid) + q, u, _ := uq.getNextQueueForConsumer(-1, qid) assert.Nil(t, q) assert.Equal(t, "", u) } @@ -133,7 +133,7 @@ func TestQueuesWithQueriers(t *testing.T) { getOrAdd(t, uq, uid, maxQueriersPerUser) // Verify it has maxQueriersPerUser queriers assigned now. 
- qs := uq.mapping.GetByKey(uid).queriers + qs := uq.mapping.GetByKey(uid).consumers assert.Equal(t, maxQueriersPerUser, len(qs)) } @@ -146,7 +146,7 @@ func TestQueuesWithQueriers(t *testing.T) { lastUserIndex := StartIndex for { - _, _, newIx := uq.getNextQueueForQuerier(lastUserIndex, qid) + _, _, newIx := uq.getNextQueueForConsumer(lastUserIndex, qid) if newIx < lastUserIndex { break } @@ -199,18 +199,18 @@ func TestQueuesConsistency(t *testing.T) { assert.NotNil(t, uq.getOrAddQueue(generateTenant(r), generateActor(r), 3)) case 1: qid := generateQuerier(r) - _, _, luid := uq.getNextQueueForQuerier(lastUserIndexes[qid], qid) + _, _, luid := uq.getNextQueueForConsumer(lastUserIndexes[qid], qid) lastUserIndexes[qid] = luid case 2: uq.deleteQueue(generateTenant(r)) case 3: q := generateQuerier(r) - uq.addQuerierConnection(q) + uq.addConsumerToConnection(q) conns[q]++ case 4: q := generateQuerier(r) if conns[q] > 0 { - uq.removeQuerierConnection(q, time.Now()) + uq.removeConsumerConnection(q, time.Now()) conns[q]-- } case 5: @@ -238,8 +238,8 @@ func TestQueues_ForgetDelay(t *testing.T) { // 3 queriers open 2 connections each. for i := 1; i <= 3; i++ { - uq.addQuerierConnection(fmt.Sprintf("querier-%d", i)) - uq.addQuerierConnection(fmt.Sprintf("querier-%d", i)) + uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i)) + uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i)) } // Add user queues. @@ -253,12 +253,12 @@ func TestQueues_ForgetDelay(t *testing.T) { require.NotEmpty(t, querier1Users) // Gracefully shutdown querier-1. - uq.removeQuerierConnection("querier-1", now.Add(20*time.Second)) - uq.removeQuerierConnection("querier-1", now.Add(21*time.Second)) + uq.removeConsumerConnection("querier-1", now.Add(20*time.Second)) + uq.removeConsumerConnection("querier-1", now.Add(21*time.Second)) uq.notifyQuerierShutdown("querier-1") // We expect querier-1 has been removed. - assert.NotContains(t, uq.queriers, "querier-1") + assert.NotContains(t, uq.consumers, "querier-1") assert.NoError(t, isConsistent(uq)) // We expect querier-1 users have been shuffled to other queriers. @@ -267,8 +267,8 @@ func TestQueues_ForgetDelay(t *testing.T) { } // Querier-1 reconnects. - uq.addQuerierConnection("querier-1") - uq.addQuerierConnection("querier-1") + uq.addConsumerToConnection("querier-1") + uq.addConsumerToConnection("querier-1") // We expect the initial querier-1 users have got back to querier-1. for _, userID := range querier1Users { @@ -278,11 +278,11 @@ func TestQueues_ForgetDelay(t *testing.T) { } // Querier-1 abruptly terminates (no shutdown notification received). - uq.removeQuerierConnection("querier-1", now.Add(40*time.Second)) - uq.removeQuerierConnection("querier-1", now.Add(41*time.Second)) + uq.removeConsumerConnection("querier-1", now.Add(40*time.Second)) + uq.removeConsumerConnection("querier-1", now.Add(41*time.Second)) // We expect querier-1 has NOT been removed. - assert.Contains(t, uq.queriers, "querier-1") + assert.Contains(t, uq.consumers, "querier-1") assert.NoError(t, isConsistent(uq)) // We expect the querier-1 users have not been shuffled to other queriers. @@ -293,9 +293,9 @@ func TestQueues_ForgetDelay(t *testing.T) { } // Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet. 
- uq.forgetDisconnectedQueriers(now.Add(90 * time.Second)) + uq.forgetDisconnectedConsumers(now.Add(90 * time.Second)) - assert.Contains(t, uq.queriers, "querier-1") + assert.Contains(t, uq.consumers, "querier-1") assert.NoError(t, isConsistent(uq)) for _, userID := range querier1Users { @@ -305,9 +305,9 @@ func TestQueues_ForgetDelay(t *testing.T) { } // Try to forget disconnected queriers. This time querier-1 forget delay has passed. - uq.forgetDisconnectedQueriers(now.Add(105 * time.Second)) + uq.forgetDisconnectedConsumers(now.Add(105 * time.Second)) - assert.NotContains(t, uq.queriers, "querier-1") + assert.NotContains(t, uq.consumers, "querier-1") assert.NoError(t, isConsistent(uq)) // We expect querier-1 users have been shuffled to other queriers. @@ -330,8 +330,8 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget // 3 queriers open 2 connections each. for i := 1; i <= 3; i++ { - uq.addQuerierConnection(fmt.Sprintf("querier-%d", i)) - uq.addQuerierConnection(fmt.Sprintf("querier-%d", i)) + uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i)) + uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i)) } // Add user queues. @@ -345,11 +345,11 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget require.NotEmpty(t, querier1Users) // Querier-1 abruptly terminates (no shutdown notification received). - uq.removeQuerierConnection("querier-1", now.Add(40*time.Second)) - uq.removeQuerierConnection("querier-1", now.Add(41*time.Second)) + uq.removeConsumerConnection("querier-1", now.Add(40*time.Second)) + uq.removeConsumerConnection("querier-1", now.Add(41*time.Second)) // We expect querier-1 has NOT been removed. - assert.Contains(t, uq.queriers, "querier-1") + assert.Contains(t, uq.consumers, "querier-1") assert.NoError(t, isConsistent(uq)) // We expect the querier-1 users have not been shuffled to other queriers. @@ -360,13 +360,13 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget } // Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet. - uq.forgetDisconnectedQueriers(now.Add(90 * time.Second)) + uq.forgetDisconnectedConsumers(now.Add(90 * time.Second)) // Querier-1 reconnects. - uq.addQuerierConnection("querier-1") - uq.addQuerierConnection("querier-1") + uq.addConsumerToConnection("querier-1") + uq.addConsumerToConnection("querier-1") - assert.Contains(t, uq.queriers, "querier-1") + assert.Contains(t, uq.consumers, "querier-1") assert.NoError(t, isConsistent(uq)) // We expect the querier-1 users have not been shuffled to other queriers. @@ -377,9 +377,9 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget } // Try to forget disconnected queriers far in the future, but there's no disconnected querier. 
- uq.forgetDisconnectedQueriers(now.Add(200 * time.Second)) + uq.forgetDisconnectedConsumers(now.Add(200 * time.Second)) - assert.Contains(t, uq.queriers, "querier-1") + assert.Contains(t, uq.consumers, "querier-1") assert.NoError(t, isConsistent(uq)) for _, userID := range querier1Users { @@ -414,7 +414,7 @@ func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, last t.Helper() var n Queue for _, q := range qs { - n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier) + n, _, lastUserIndex = uq.getNextQueueForConsumer(lastUserIndex, querier) assert.Equal(t, q, n) assert.NoError(t, isConsistent(uq)) } @@ -422,7 +422,7 @@ func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, last } func isConsistent(uq *tenantQueues) error { - if len(uq.sortedQueriers) != len(uq.queriers) { + if len(uq.sortedConsumers) != len(uq.consumers) { return fmt.Errorf("inconsistent number of sorted queriers and querier connections") } @@ -441,16 +441,16 @@ func isConsistent(uq *tenantQueues) error { uc++ - if q.maxQueriers == 0 && q.queriers != nil { + if q.maxQueriers == 0 && q.consumers != nil { return fmt.Errorf("user %s has queriers, but maxQueriers=0", u) } - if q.maxQueriers > 0 && len(uq.sortedQueriers) <= q.maxQueriers && q.queriers != nil { + if q.maxQueriers > 0 && len(uq.sortedConsumers) <= q.maxQueriers && q.consumers != nil { return fmt.Errorf("user %s has queriers set despite not enough queriers available", u) } - if q.maxQueriers > 0 && len(uq.sortedQueriers) > q.maxQueriers && len(q.queriers) != q.maxQueriers { - return fmt.Errorf("user %s has incorrect number of queriers, expected=%d, got=%d", u, len(q.queriers), q.maxQueriers) + if q.maxQueriers > 0 && len(uq.sortedConsumers) > q.maxQueriers && len(q.consumers) != q.maxQueriers { + return fmt.Errorf("user %s has incorrect number of queriers, expected=%d, got=%d", u, len(q.consumers), q.maxQueriers) } } @@ -466,12 +466,12 @@ func getUsersByQuerier(queues *tenantQueues, querierID string) []string { var userIDs []string for _, userID := range queues.mapping.Keys() { q := queues.mapping.GetByKey(userID) - if q.queriers == nil { + if q.consumers == nil { // If it's nil then all queriers can handle this user. userIDs = append(userIDs, userID) continue } - if _, ok := q.queriers[querierID]; ok { + if _, ok := q.consumers[querierID]; ok { userIDs = append(userIDs, userID) } } @@ -481,14 +481,14 @@ func getUsersByQuerier(queues *tenantQueues, querierID string) []string { func TestShuffleQueriers(t *testing.T) { allQueriers := []string{"a", "b", "c", "d", "e"} - require.Nil(t, shuffleQueriersForTenants(12345, 10, allQueriers, nil)) - require.Nil(t, shuffleQueriersForTenants(12345, len(allQueriers), allQueriers, nil)) + require.Nil(t, shuffleConsumersForTenants(12345, 10, allQueriers, nil)) + require.Nil(t, shuffleConsumersForTenants(12345, len(allQueriers), allQueriers, nil)) - r1 := shuffleQueriersForTenants(12345, 3, allQueriers, nil) + r1 := shuffleConsumersForTenants(12345, 3, allQueriers, nil) require.Equal(t, 3, len(r1)) // Same input produces same output. 
- r2 := shuffleQueriersForTenants(12345, 3, allQueriers, nil) + r2 := shuffleConsumersForTenants(12345, 3, allQueriers, nil) require.Equal(t, 3, len(r2)) require.Equal(t, r1, r2) } @@ -510,7 +510,7 @@ func TestShuffleQueriersCorrectness(t *testing.T) { toSelect = 3 } - selected := shuffleQueriersForTenants(r.Int63(), toSelect, allSortedQueriers, nil) + selected := shuffleConsumersForTenants(r.Int63(), toSelect, allSortedQueriers, nil) require.Equal(t, toSelect, len(selected)) diff --git a/pkg/scheduler/queue/treequeue.go b/pkg/queue/treequeue.go similarity index 100% rename from pkg/scheduler/queue/treequeue.go rename to pkg/queue/treequeue.go diff --git a/pkg/scheduler/queue/treequeue_test.go b/pkg/queue/treequeue_test.go similarity index 100% rename from pkg/scheduler/queue/treequeue_test.go rename to pkg/queue/treequeue_test.go diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index ef2a1b5bbe..8dfbc5eb77 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc" "github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb" - "github.com/grafana/loki/pkg/scheduler/queue" + "github.com/grafana/loki/pkg/queue" "github.com/grafana/loki/pkg/scheduler/schedulerpb" "github.com/grafana/loki/pkg/util" lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" @@ -144,7 +144,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, ringManager *lokiri s.connectedQuerierClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ Name: "cortex_query_scheduler_connected_querier_clients", Help: "Number of querier worker clients currently connected to the query-scheduler.", - }, s.requestQueue.GetConnectedQuerierWorkersMetric) + }, s.requestQueue.GetConnectedConsumersMetric) s.connectedFrontendClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ Name: "cortex_query_scheduler_connected_frontend_clients", Help: "Number of query-frontend worker clients currently connected to the query-scheduler.", @@ -404,8 +404,8 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL querierID := resp.GetQuerierID() level.Debug(s.log).Log("msg", "querier connected", "querier", querierID) - s.requestQueue.RegisterQuerierConnection(querierID) - defer s.requestQueue.UnregisterQuerierConnection(querierID) + s.requestQueue.RegisterConsumerConnection(querierID) + defer s.requestQueue.UnregisterConsumerConnection(querierID) lastIndex := queue.StartIndex @@ -463,7 +463,7 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL func (s *Scheduler) NotifyQuerierShutdown(_ context.Context, req *schedulerpb.NotifyQuerierShutdownRequest) (*schedulerpb.NotifyQuerierShutdownResponse, error) { level.Debug(s.log).Log("msg", "received shutdown notification from querier", "querier", req.GetQuerierID()) - s.requestQueue.NotifyQuerierShutdown(req.GetQuerierID()) + s.requestQueue.NotifyConsumerShutdown(req.GetQuerierID()) return &schedulerpb.NotifyQuerierShutdownResponse{}, nil } diff --git a/pkg/util/active_user.go b/pkg/util/active_user.go index 3e495b9b84..08b327df93 100644 --- a/pkg/util/active_user.go +++ b/pkg/util/active_user.go @@ -128,3 +128,13 @@ func (s *ActiveUsersCleanupService) iteration(_ context.Context) error { } return nil } + +func (s *ActiveUsersCleanupService) ActiveUsers() []string { + s.activeUsers.mu.RLock() + defer s.activeUsers.mu.RUnlock() + users := make([]string, 0, len(s.activeUsers.timestamps)) + for u := range s.activeUsers.timestamps 
{ + users = append(users, u) + } + return users +}
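Reviewer note: the new ActiveUsers accessor exists so the bloom gateway test can assert which tenants have touched the gateway. A small usage sketch of the service as wired up in this PR follows; the cleanup function is a stand-in for queueMetrics.Cleanup, and the cleanup loop itself only runs once the service is started (omitted here). Since the accessor iterates a map, the returned order is not guaranteed, so callers that need a stable order should sort the result.

package main

import (
	"fmt"
	"sort"
	"time"

	"github.com/grafana/loki/pkg/util"
)

func main() {
	// Stand-in for queueMetrics.Cleanup: invoked for tenants that have been
	// inactive longer than the configured timeout once the service is running.
	cleanup := func(user string) { fmt.Println("cleaning up", user) }
	activeUsers := util.NewActiveUsersCleanupWithDefaultValues(cleanup)

	// FilterChunkRefs stamps the tenant as active before enqueueing its task.
	activeUsers.UpdateUserTimestamp("tenant-a", time.Now())
	activeUsers.UpdateUserTimestamp("tenant-b", time.Now())

	users := activeUsers.ActiveUsers()
	sort.Strings(users) // map iteration order is not stable
	fmt.Println(users)  // [tenant-a tenant-b]
}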