diff --git a/pkg/engine/internal/scheduler/collector.go b/pkg/engine/internal/scheduler/collector.go new file mode 100644 index 0000000000..02fd538afc --- /dev/null +++ b/pkg/engine/internal/scheduler/collector.go @@ -0,0 +1,183 @@ +package scheduler + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/v3/pkg/engine/internal/workflow" +) + +// collector implements [prometheus.Collector], collecting metrics for a +// [Scheduler]. +type collector struct { + sched *Scheduler + + tasksInflight *prometheus.Desc + streamsInflight *prometheus.Desc + + connections *prometheus.Desc + workers *prometheus.Desc + threads *prometheus.Desc +} + +var _ prometheus.Collector = (*collector)(nil) + +func newCollector(sched *Scheduler) *collector { + return &collector{ + sched: sched, + + tasksInflight: prometheus.NewDesc( + "loki_engine_scheduler_tasks_inflight", + "Number of in-flight tasks by state", + []string{"state"}, + nil, + ), + streamsInflight: prometheus.NewDesc( + "loki_engine_scheduler_streams_inflight", + "Number of in-flight streams by state", + []string{"state"}, + nil, + ), + + connections: prometheus.NewDesc( + "loki_engine_scheduler_connections_active", + "Current number active connections to the scheduler", + nil, + nil, + ), + workers: prometheus.NewDesc( + "loki_engine_scheduler_workers", + "Current number of workers connected to the scheduler by state", + []string{"state"}, + nil, + ), + threads: prometheus.NewDesc( + "loki_engine_scheduler_threads", + "Current number of worker threads connected to the scheduler by state", + []string{"state"}, + nil, + ), + } +} + +func (mc *collector) Collect(ch chan<- prometheus.Metric) { + mc.collectResourceStats(ch) + mc.collectConnStats(ch) +} + +func (mc *collector) collectResourceStats(ch chan<- prometheus.Metric) { + mc.sched.resourcesMut.RLock() + defer mc.sched.resourcesMut.RUnlock() + + var ( + tasksByState = make(map[workflow.TaskState]int) + streamsByState = make(map[workflow.StreamState]int) + ) + + for _, t := range mc.sched.tasks { + tasksByState[t.status.State]++ + } + for _, s := range mc.sched.streams { + streamsByState[s.state]++ + } + + for state, count := range tasksByState { + ch <- prometheus.MustNewConstMetric(mc.tasksInflight, prometheus.GaugeValue, float64(count), state.String()) + } + for state, count := range streamsByState { + ch <- prometheus.MustNewConstMetric(mc.streamsInflight, prometheus.GaugeValue, float64(count), state.String()) + } +} + +func (mc *collector) collectConnStats(ch chan<- prometheus.Metric) { + var ( + totalConnections int + + workersByState = make(map[workerState]int) + threadsByState = make(map[workerState]int) + ) + + mc.sched.connections.Range(func(key, _ any) bool { + wc := key.(*workerConn) + + // We only want to count connections which have been flagged as control + // plane connections (meaning they will be assigned work, and are a part + // of the set of compute). + if wc.Type() != connectionTypeControlPlane { + return true + } + + workersByState[getWorkerState(wc)]++ + + idle, ready, busy := countThreadStates(wc) + threadsByState[workerStateIdle] += idle + threadsByState[workerStateReady] += ready + threadsByState[workerStateBusy] += busy + + totalConnections++ + return true + }) + + ch <- prometheus.MustNewConstMetric(mc.connections, prometheus.GaugeValue, float64(totalConnections)) + + for state, count := range workersByState { + ch <- prometheus.MustNewConstMetric(mc.workers, prometheus.GaugeValue, float64(count), state.String()) + } + for state, count := range threadsByState { + ch <- prometheus.MustNewConstMetric(mc.threads, prometheus.GaugeValue, float64(count), state.String()) + } +} + +func (mc *collector) Describe(ch chan<- *prometheus.Desc) { + ch <- mc.tasksInflight + ch <- mc.streamsInflight + ch <- mc.connections + ch <- mc.workers + ch <- mc.threads +} + +type workerState int + +const ( + workerStateIdle workerState = iota // workerStateIdle is used when the worker is sleeping. + workerStateReady // workerStateReady is used when a worker is ready for a task. + workerStateBusy // workerStateBusy is used when a worker is executing a task. +) + +func (s workerState) String() string { + switch s { + case workerStateIdle: + return "idle" + case workerStateReady: + return "ready" + case workerStateBusy: + return "busy" + } + + return "" +} + +func getWorkerState(wc *workerConn) workerState { + _, ready, busy := countThreadStates(wc) + + // Worker state is inferred from thread states in descending precedence + // (busy > ready > idle). + switch { + case busy > 0: + return workerStateBusy + case ready > 0: + return workerStateReady + default: + return workerStateIdle + } +} + +func countThreadStates(wc *workerConn) (idle, ready, busy int) { + wc.mut.RLock() + defer wc.mut.RUnlock() + + busy = len(wc.tasks) + ready = wc.readyThreads + idle = wc.maxThreads - ready - busy + + return idle, ready, busy +} diff --git a/pkg/engine/internal/scheduler/metrics.go b/pkg/engine/internal/scheduler/metrics.go new file mode 100644 index 0000000000..6b0bb24b77 --- /dev/null +++ b/pkg/engine/internal/scheduler/metrics.go @@ -0,0 +1,66 @@ +package scheduler + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// metrics is a container of metrics for a scheduler. +type metrics struct { + // registry to collect metrics as a unit. + reg *prometheus.Registry + + tasksTotal *prometheus.CounterVec + streamsTotal *prometheus.CounterVec + connsTotal prometheus.Counter + + taskQueueSeconds prometheus.Histogram + taskExecSeconds prometheus.Histogram +} + +func newMetrics() *metrics { + reg := prometheus.NewRegistry() + + return &metrics{ + reg: reg, + + tasksTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_engine_scheduler_tasks_total", + Help: "Total number of tasks by state, counting transitions into state", + }, []string{"state"}), + streamsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_engine_scheduler_streams_total", + Help: "Total number of streams by state, counting transitions into state", + }, []string{"state"}), + connsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "loki_engine_scheduler_connections_total", + Help: "Total number of connections to the scheduler for any purpose (control or data plane)", + }), + + taskQueueSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "loki_engine_scheduler_task_queue_seconds", + Help: "Number of seconds a task sat in a queue before being assigned to a worker thread", + + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: time.Hour, + }), + + taskExecSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "loki_engine_scheduler_task_exec_seconds", + Help: "Number of seconds a task took to complete successfully", + + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: time.Hour, + }), + } +} + +// Register registers metrics to report to reg. +func (m *metrics) Register(reg prometheus.Registerer) error { return reg.Register(m.reg) } + +// Unregister unregisters metrics from the provided Registerer. +func (m *metrics) Unregister(reg prometheus.Registerer) { reg.Unregister(m.reg) } diff --git a/pkg/engine/internal/scheduler/scheduler.go b/pkg/engine/internal/scheduler/scheduler.go index 3fae5b5c5f..635bbf0b30 100644 --- a/pkg/engine/internal/scheduler/scheduler.go +++ b/pkg/engine/internal/scheduler/scheduler.go @@ -16,6 +16,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/services" "github.com/oklog/ulid/v2" + "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/propagation" "golang.org/x/sync/errgroup" @@ -34,13 +35,18 @@ type Config struct { // Scheduler is a service that can schedule tasks to connected worker instances. type Scheduler struct { - logger log.Logger + logger log.Logger + metrics *metrics + collector *collector initOnce sync.Once svc services.Service listener wire.Listener + // Current set of connections, used for collecting metrics. + connections sync.Map // map[*workerConn]struct{} + resourcesMut sync.RWMutex streams map[ulid.ULID]*stream // All known streams (regardless of state) tasks map[ulid.ULID]*task // All known tasks (regardless of state) @@ -65,15 +71,21 @@ func New(config Config) (*Scheduler, error) { return nil, errors.New("listener must be provided") } - return &Scheduler{ - logger: config.Logger, + s := &Scheduler{ + logger: config.Logger, + listener: config.Listener, streams: make(map[ulid.ULID]*stream), tasks: make(map[ulid.ULID]*task), assignSema: make(chan struct{}, 1), - }, nil + } + + s.metrics = newMetrics() + s.collector = newCollector(s) + + return s, nil } // Service returns the service used to manage the lifecycle of the Scheduler. @@ -114,6 +126,11 @@ func (s *Scheduler) handleConn(ctx context.Context, conn wire.Conn) { wc := new(workerConn) + s.connections.Store(wc, struct{}{}) + defer s.connections.Delete(wc) + + s.metrics.connsTotal.Inc() + peer := &wire.Peer{ Logger: logger, Conn: conn, @@ -157,12 +174,8 @@ func (s *Scheduler) handleConn(ctx context.Context, conn wire.Conn) { } func (s *Scheduler) handleStreamData(ctx context.Context, worker *workerConn, msg wire.StreamDataMessage) error { - switch worker.ty { - case connectionTypeInitial: - // Flag the connection as a data plane connection. - worker.ty = connectionTypeDataPlane - case connectionTypeControlPlane: - return fmt.Errorf("workers in state %s can not send stream data messages", worker.ty) + if err := worker.MarkDataPlane(); err != nil { + return err } s.resourcesMut.RLock() @@ -178,30 +191,17 @@ func (s *Scheduler) handleStreamData(ctx context.Context, worker *workerConn, ms } func (s *Scheduler) handleWorkerHello(_ context.Context, worker *workerConn, msg wire.WorkerHelloMessage) error { - if got, want := worker.ty, connectionTypeInitial; got != want { - return fmt.Errorf("worker connection must be in state %q, got %q", want, got) - } else if msg.Threads <= 0 { - return errors.New("worker must advertise at least one thread") - } - - worker.ty = connectionTypeControlPlane - worker.maxThreads = msg.Threads - return nil + return worker.HandleHello(msg) } func (s *Scheduler) markWorkerReady(_ context.Context, worker *workerConn) error { - if got, want := worker.ty, connectionTypeControlPlane; got != want { - return fmt.Errorf("worker connection must be in state %q, got %q", want, got) + if err := worker.MarkReady(); err != nil { + return err } s.assignMut.Lock() defer s.assignMut.Unlock() - if !worker.hasCapacity() { - return fmt.Errorf("maximum capacity %d reached, wait for task assignment or complete existing assigned tasks", worker.maxThreads) - } - worker.readyThreads++ - s.readyWorkers = append(s.readyWorkers, worker) // Wake [Scheduler.runAssignLoop] if we have both peers and tasks available. @@ -223,7 +223,7 @@ func nudgeSemaphore(sema chan struct{}) { } func (s *Scheduler) handleTaskStatus(ctx context.Context, worker *workerConn, msg wire.TaskStatusMessage) error { - if got, want := worker.ty, connectionTypeControlPlane; got != want { + if got, want := worker.Type(), connectionTypeControlPlane; got != want { return fmt.Errorf("worker connection must be in state %q, got %q", want, got) } @@ -238,12 +238,18 @@ func (s *Scheduler) handleTaskStatus(ctx context.Context, worker *workerConn, ms return fmt.Errorf("task %s not found", msg.ID) } - changed, err := task.setState(msg.Status) + changed, err := task.setState(s.metrics, msg.Status) if err != nil { return err } else if changed { if owner := task.owner; owner != nil && task.status.State.Terminal() { - owner.untrackAssignment(task) + owner.Unassign(task) + } + + if task.status.State == workflow.TaskStateCompleted { + // The execution time of the task is the duration from when it was + // first assigned to when we received the completion status. + s.metrics.taskExecSeconds.Observe(time.Since(task.assignTime).Seconds()) } // Notify the handler about the change. @@ -257,7 +263,7 @@ func (s *Scheduler) handleTaskStatus(ctx context.Context, worker *workerConn, ms } func (s *Scheduler) handleStreamStatus(ctx context.Context, worker *workerConn, msg wire.StreamStatusMessage) error { - if got, want := worker.ty, connectionTypeControlPlane; got != want { + if got, want := worker.Type(), connectionTypeControlPlane; got != want { return fmt.Errorf("worker connection must be in state %q, got %q", want, got) } @@ -277,7 +283,7 @@ func (s *Scheduler) handleStreamStatus(ctx context.Context, worker *workerConn, // changeStreamState updates the state of the target stream. changeStreamState // must be called while the resourcesMut lock is held. func (s *Scheduler) changeStreamState(ctx context.Context, n *notifier, target *stream, newState workflow.StreamState) error { - changed, err := target.setState(newState) + changed, err := target.setState(s.metrics, newState) if err != nil { return err } else if !changed { @@ -323,10 +329,10 @@ func (s *Scheduler) abortWorkerTasks(ctx context.Context, worker *workerConn, re } } - for task := range worker.tasks { - worker.untrackAssignment(task) + for _, task := range worker.Assigned() { + worker.Unassign(task) - if changed, _ := task.setState(newStatus); !changed { + if changed, _ := task.setState(s.metrics, newStatus); !changed { continue } @@ -436,7 +442,15 @@ func (s *Scheduler) assignTask(ctx context.Context, task *task, worker *workerCo } // The worker accepted the message, so we can assign the task to it now. - worker.trackAssignment(task) + worker.Assign(task) + + // The queue time of a task is the duration from when it entered the queue + // to when a worker accepted the assignment. + // + // We track this moment as the "assign time" to be able to calculate + // execution time later. + task.assignTime = time.Now() + s.metrics.taskQueueSeconds.Observe(task.assignTime.Sub(task.queueTime).Seconds()) // Now that the task has been accepted, we can attempt address bindings. We // do this on task assignment to simplify the implementation, though it @@ -585,6 +599,21 @@ NextTask: return errors.Join(errs...) } + // Observe initial state for the streams and tasks. + { + var ( + initialStreamState = workflow.StreamStateIdle.String() + initialTaskState = workflow.TaskStateCreated.String() + ) + + for range manifestStreams { + s.metrics.streamsTotal.WithLabelValues(initialStreamState).Inc() + } + for range manifestTasks { + s.metrics.tasksTotal.WithLabelValues(initialTaskState).Inc() + } + } + // Once we hit this point, the manifest has been validated and we can // atomically update our internal state. maps.Copy(s.streams, manifestStreams) @@ -625,7 +654,7 @@ func (s *Scheduler) UnregisterManifest(ctx context.Context, manifest *workflow.M // Immediately clean up our own resources. s.deleteTask(registered) - if changed, _ := registered.setState(workflow.TaskStatus{State: workflow.TaskStateCancelled}); !changed { + if changed, _ := registered.setState(s.metrics, workflow.TaskStatus{State: workflow.TaskStateCancelled}); !changed { // Ignore if the task couldn't move into the canceled state, which // indicates it's already in a terminal state. continue @@ -653,7 +682,7 @@ func (s *Scheduler) UnregisterManifest(ctx context.Context, manifest *workflow.M for _, streamToRemove := range manifest.Streams { registered := s.streams[streamToRemove.ULID] // Validated to exist above - changed, _ := registered.setState(workflow.StreamStateClosed) + changed, _ := registered.setState(s.metrics, workflow.StreamStateClosed) if changed { n.AddStreamEvent(streamNotification{ Handler: registered.handler, @@ -672,7 +701,7 @@ func (s *Scheduler) deleteTask(t *task) { delete(s.tasks, t.inner.ULID) if owner := t.owner; owner != nil { - owner.untrackAssignment(t) + owner.Unassign(t) } } @@ -756,6 +785,7 @@ func (s *Scheduler) enqueueTasks(tasks []*task) error { continue } + task.queueTime = time.Now() s.taskQueue = append(s.taskQueue, task) } @@ -777,7 +807,7 @@ func (s *Scheduler) markPending(ctx context.Context, tasks []*task) { defer s.resourcesMut.Unlock() for _, task := range tasks { - if changed, _ := task.setState(workflow.TaskStatus{State: workflow.TaskStatePending}); !changed { + if changed, _ := task.setState(s.metrics, workflow.TaskStatus{State: workflow.TaskStatePending}); !changed { // If the state change failed, the task either got canceled or // picked up by a worker in between enqueueing it and calling this // method. @@ -811,13 +841,13 @@ func (s *Scheduler) Cancel(ctx context.Context, tasks ...*workflow.Task) error { continue } - if changed, _ := registered.setState(workflow.TaskStatus{State: workflow.TaskStateCancelled}); changed { + if changed, _ := registered.setState(s.metrics, workflow.TaskStatus{State: workflow.TaskStateCancelled}); changed { // If the task has an owner, we'll inform it that the task has been // canceled and it can stop processing it. // // This is a best-effort message, so we don't wait for acknowledgement. if owner := registered.owner; owner != nil { - owner.untrackAssignment(registered) + owner.Unassign(registered) _ = owner.SendMessageAsync(ctx, wire.TaskCancelMessage{ID: registered.inner.ULID}) } @@ -847,3 +877,19 @@ func (s *Scheduler) Cancel(ctx context.Context, tasks ...*workflow.Task) error { } return nil } + +// RegisterMetrics registers metrics about s to report to reg. +func (s *Scheduler) RegisterMetrics(reg prometheus.Registerer) error { + var errs []error + + errs = append(errs, reg.Register(s.collector)) + errs = append(errs, s.metrics.Register(reg)) + + return errors.Join(errs...) +} + +// UnregisterMetrics unregisters metrics about s from reg. +func (s *Scheduler) UnregisterMetrics(reg prometheus.Registerer) { + reg.Unregister(s.collector) + s.metrics.Unregister(reg) +} diff --git a/pkg/engine/internal/scheduler/stream.go b/pkg/engine/internal/scheduler/stream.go index 220b629648..f99a46e5c6 100644 --- a/pkg/engine/internal/scheduler/stream.go +++ b/pkg/engine/internal/scheduler/stream.go @@ -33,7 +33,7 @@ var validStreamTransitions = map[workflow.StreamState][]workflow.StreamState{ // // Returns true if the state was updated, false otherwise (such as if the task // is already in the desired state). -func (s *stream) setState(newState workflow.StreamState) (bool, error) { +func (s *stream) setState(m *metrics, newState workflow.StreamState) (bool, error) { oldState := s.state if newState == oldState { @@ -46,6 +46,7 @@ func (s *stream) setState(newState workflow.StreamState) (bool, error) { } s.state = newState + m.streamsTotal.WithLabelValues(newState.String()).Inc() return true, nil } diff --git a/pkg/engine/internal/scheduler/task.go b/pkg/engine/internal/scheduler/task.go index 2e5f31493a..cabd6c6d9f 100644 --- a/pkg/engine/internal/scheduler/task.go +++ b/pkg/engine/internal/scheduler/task.go @@ -3,12 +3,16 @@ package scheduler import ( "fmt" "slices" + "time" "github.com/grafana/loki/v3/pkg/engine/internal/workflow" ) // task wraps a [workflow.Task] with its handler. type task struct { + assignTime time.Time // Time when task was assigned to a worker. + queueTime time.Time // Time when task was enqueued. + inner *workflow.Task handler workflow.TaskEventHandler @@ -31,7 +35,7 @@ var validTaskTransitions = map[workflow.TaskState][]workflow.TaskState{ // // Returns true if the state was updated, false otherwise (such as if the task // is already in the desired state). -func (t *task) setState(newStatus workflow.TaskStatus) (bool, error) { +func (t *task) setState(m *metrics, newStatus workflow.TaskStatus) (bool, error) { oldState, newState := t.status.State, newStatus.State if newState == oldState { @@ -44,5 +48,6 @@ func (t *task) setState(newStatus workflow.TaskStatus) (bool, error) { } t.status = newStatus + m.tasksTotal.WithLabelValues(newState.String()).Inc() return true, nil } diff --git a/pkg/engine/internal/scheduler/worker_conn.go b/pkg/engine/internal/scheduler/worker_conn.go index 467d3b32fb..1b89aae448 100644 --- a/pkg/engine/internal/scheduler/worker_conn.go +++ b/pkg/engine/internal/scheduler/worker_conn.go @@ -1,7 +1,10 @@ package scheduler import ( + "errors" "fmt" + "maps" + "slices" "sync" "github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire" @@ -53,6 +56,9 @@ type workerConn struct { // Peer connection to the worker. *wire.Peer + // mutex of the worker. Protects all fields. + mut sync.RWMutex + // ty represents the type of worker connection. Messages sent by the worker // that are incompatible with the connection type are rejected. ty connectionType @@ -63,12 +69,87 @@ type workerConn struct { // len(tasks) + readyThreads must not exceed maxThreads. maxThreads int - mut sync.RWMutex - readyThreads int // readyThreads represents the number of threads that are ready to be assigned a task. - tasks map[*task]struct{} // tasks hold the collection of tasks currently assigned to the worker. + // readyThreads represents the number of threads that are ready to be assigned a task. + readyThreads int + + // tasks hold the collection of tasks currently assigned to the worker. + tasks map[*task]struct{} +} + +// Type returns the type of the worker connection. +func (wc *workerConn) Type() connectionType { + wc.mut.RLock() + defer wc.mut.RUnlock() + + return wc.ty } -func (wc *workerConn) trackAssignment(assigned *task) { +// HandleHello handles a WorkerHelloMessage. Returns an error if the worker is +// not in a valid state for a HelloMessage, or if the message is invalid. +// +// After HandleHello is called, the worker connection is marked as a control +// plane connection. +func (wc *workerConn) HandleHello(msg wire.WorkerHelloMessage) error { + wc.mut.Lock() + defer wc.mut.Unlock() + + if got, want := wc.ty, connectionTypeInitial; got != want { + return fmt.Errorf("worker connection must be in state %q, got %q", want, got) + } else if msg.Threads <= 0 { + return errors.New("worker must advertise at least one thread") + } + + wc.ty = connectionTypeControlPlane + wc.maxThreads = msg.Threads + return nil +} + +// MarkReady marks the worker as ready to receive tasks. Returns an error if the +// worker is not a control plane connection, or if the worker is at full +// capacity. +func (wc *workerConn) MarkReady() error { + wc.mut.Lock() + defer wc.mut.Unlock() + + if got, want := wc.ty, connectionTypeControlPlane; got != want { + return fmt.Errorf("worker connection must be in state %q, got %q", want, got) + } + + if !wc.hasCapacity() { + return fmt.Errorf("maximum capacity %d reached, wait for task assignment or complete existing assigned tasks", wc.maxThreads) + } + wc.readyThreads++ + return nil +} + +// MarkDataPlane marks the worker as a data plane connection. Returns an error +// if the worker is not in a valid state. MarkDataPlane is a no-op if the worker +// is already marked as a data plane connection. +func (wc *workerConn) MarkDataPlane() error { + wc.mut.Lock() + defer wc.mut.Unlock() + + switch wc.ty { + case connectionTypeInitial: + // Flag the connection as a data plane connection. + wc.ty = connectionTypeDataPlane + case connectionTypeControlPlane: + return fmt.Errorf("workers in state %s can not send stream data messages", wc.ty) + } + + return nil +} + +// Assigned returns a copy of the assigned tasks in an undefined order. +func (wc *workerConn) Assigned() []*task { + wc.mut.RLock() + defer wc.mut.RUnlock() + + return slices.Collect(maps.Keys(wc.tasks)) +} + +// Assign assigns a task to the worker. +func (wc *workerConn) Assign(assigned *task) { wc.mut.Lock() defer wc.mut.Unlock() @@ -84,7 +165,8 @@ func (wc *workerConn) trackAssignment(assigned *task) { wc.readyThreads-- } -func (wc *workerConn) untrackAssignment(assigned *task) { +// Unassign removes a task from the worker. +func (wc *workerConn) Unassign(assigned *task) { wc.mut.Lock() defer wc.mut.Unlock() @@ -93,8 +175,5 @@ func (wc *workerConn) untrackAssignment(assigned *task) { // hasCapacity returns true if the worker has capacity to accept more tasks. func (wc *workerConn) hasCapacity() bool { - wc.mut.RLock() - defer wc.mut.RUnlock() - return wc.maxThreads > len(wc.tasks)+wc.readyThreads } diff --git a/pkg/engine/scheduler.go b/pkg/engine/scheduler.go index 1ed111386f..00d634f413 100644 --- a/pkg/engine/scheduler.go +++ b/pkg/engine/scheduler.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/gorilla/mux" "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/v3/pkg/engine/internal/scheduler" "github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire" @@ -94,3 +95,13 @@ func (s *Scheduler) RegisterSchedulerServer(router *mux.Router) { func (s *Scheduler) Service() services.Service { return s.inner.Service() } + +// RegisterMetrics registers metrics about s to report to reg. +func (s *Scheduler) RegisterMetrics(reg prometheus.Registerer) error { + return s.inner.RegisterMetrics(reg) +} + +// UnregisterMetrics unregisters metrics about s from reg. +func (s *Scheduler) UnregisterMetrics(reg prometheus.Registerer) { + s.inner.UnregisterMetrics(reg) +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index f9f45d65f8..2abdf006fe 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1481,6 +1481,15 @@ func (t *Loki) initV2QueryEngineScheduler() (services.Service, error) { return nil, err } + if err := sched.RegisterMetrics(prometheus.DefaultRegisterer); err != nil { + return nil, err + } + sched.Service().AddListener(services.NewListener( + nil, nil, nil, + func(_ services.State) { sched.UnregisterMetrics(prometheus.DefaultRegisterer) }, + func(_ services.State, _ error) { sched.UnregisterMetrics(prometheus.DefaultRegisterer) }, + )) + // Only register HTTP handler when running distributed query execution if t.Cfg.Querier.EngineV2.Distributed { sched.RegisterSchedulerServer(t.Server.HTTP)