chore(engine): add base set of new scheduler metrics (#19913)

Signed-off-by: Robert Fratto <robertfratto@gmail.com>
pull/19924/head
Robert Fratto 6 months ago committed by GitHub
parent bce87fbce5
commit 2fce7da611
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 183
      pkg/engine/internal/scheduler/collector.go
  2. 66
      pkg/engine/internal/scheduler/metrics.go
  3. 128
      pkg/engine/internal/scheduler/scheduler.go
  4. 3
      pkg/engine/internal/scheduler/stream.go
  5. 7
      pkg/engine/internal/scheduler/task.go
  6. 95
      pkg/engine/internal/scheduler/worker_conn.go
  7. 11
      pkg/engine/scheduler.go
  8. 9
      pkg/loki/modules.go

@ -0,0 +1,183 @@
package scheduler
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
)
// collector implements [prometheus.Collector], collecting metrics for a
// [Scheduler].
type collector struct {
sched *Scheduler
tasksInflight *prometheus.Desc
streamsInflight *prometheus.Desc
connections *prometheus.Desc
workers *prometheus.Desc
threads *prometheus.Desc
}
var _ prometheus.Collector = (*collector)(nil)
func newCollector(sched *Scheduler) *collector {
return &collector{
sched: sched,
tasksInflight: prometheus.NewDesc(
"loki_engine_scheduler_tasks_inflight",
"Number of in-flight tasks by state",
[]string{"state"},
nil,
),
streamsInflight: prometheus.NewDesc(
"loki_engine_scheduler_streams_inflight",
"Number of in-flight streams by state",
[]string{"state"},
nil,
),
connections: prometheus.NewDesc(
"loki_engine_scheduler_connections_active",
"Current number active connections to the scheduler",
nil,
nil,
),
workers: prometheus.NewDesc(
"loki_engine_scheduler_workers",
"Current number of workers connected to the scheduler by state",
[]string{"state"},
nil,
),
threads: prometheus.NewDesc(
"loki_engine_scheduler_threads",
"Current number of worker threads connected to the scheduler by state",
[]string{"state"},
nil,
),
}
}
func (mc *collector) Collect(ch chan<- prometheus.Metric) {
mc.collectResourceStats(ch)
mc.collectConnStats(ch)
}
func (mc *collector) collectResourceStats(ch chan<- prometheus.Metric) {
mc.sched.resourcesMut.RLock()
defer mc.sched.resourcesMut.RUnlock()
var (
tasksByState = make(map[workflow.TaskState]int)
streamsByState = make(map[workflow.StreamState]int)
)
for _, t := range mc.sched.tasks {
tasksByState[t.status.State]++
}
for _, s := range mc.sched.streams {
streamsByState[s.state]++
}
for state, count := range tasksByState {
ch <- prometheus.MustNewConstMetric(mc.tasksInflight, prometheus.GaugeValue, float64(count), state.String())
}
for state, count := range streamsByState {
ch <- prometheus.MustNewConstMetric(mc.streamsInflight, prometheus.GaugeValue, float64(count), state.String())
}
}
func (mc *collector) collectConnStats(ch chan<- prometheus.Metric) {
var (
totalConnections int
workersByState = make(map[workerState]int)
threadsByState = make(map[workerState]int)
)
mc.sched.connections.Range(func(key, _ any) bool {
wc := key.(*workerConn)
// We only want to count connections which have been flagged as control
// plane connections (meaning they will be assigned work, and are a part
// of the set of compute).
if wc.Type() != connectionTypeControlPlane {
return true
}
workersByState[getWorkerState(wc)]++
idle, ready, busy := countThreadStates(wc)
threadsByState[workerStateIdle] += idle
threadsByState[workerStateReady] += ready
threadsByState[workerStateBusy] += busy
totalConnections++
return true
})
ch <- prometheus.MustNewConstMetric(mc.connections, prometheus.GaugeValue, float64(totalConnections))
for state, count := range workersByState {
ch <- prometheus.MustNewConstMetric(mc.workers, prometheus.GaugeValue, float64(count), state.String())
}
for state, count := range threadsByState {
ch <- prometheus.MustNewConstMetric(mc.threads, prometheus.GaugeValue, float64(count), state.String())
}
}
func (mc *collector) Describe(ch chan<- *prometheus.Desc) {
ch <- mc.tasksInflight
ch <- mc.streamsInflight
ch <- mc.connections
ch <- mc.workers
ch <- mc.threads
}
type workerState int
const (
workerStateIdle workerState = iota // workerStateIdle is used when the worker is sleeping.
workerStateReady // workerStateReady is used when a worker is ready for a task.
workerStateBusy // workerStateBusy is used when a worker is executing a task.
)
func (s workerState) String() string {
switch s {
case workerStateIdle:
return "idle"
case workerStateReady:
return "ready"
case workerStateBusy:
return "busy"
}
return ""
}
func getWorkerState(wc *workerConn) workerState {
_, ready, busy := countThreadStates(wc)
// Worker state is inferred from thread states in descending precedence
// (busy > ready > idle).
switch {
case busy > 0:
return workerStateBusy
case ready > 0:
return workerStateReady
default:
return workerStateIdle
}
}
func countThreadStates(wc *workerConn) (idle, ready, busy int) {
wc.mut.RLock()
defer wc.mut.RUnlock()
busy = len(wc.tasks)
ready = wc.readyThreads
idle = wc.maxThreads - ready - busy
return idle, ready, busy
}

@ -0,0 +1,66 @@
package scheduler
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// metrics is a container of metrics for a scheduler.
type metrics struct {
// registry to collect metrics as a unit.
reg *prometheus.Registry
tasksTotal *prometheus.CounterVec
streamsTotal *prometheus.CounterVec
connsTotal prometheus.Counter
taskQueueSeconds prometheus.Histogram
taskExecSeconds prometheus.Histogram
}
func newMetrics() *metrics {
reg := prometheus.NewRegistry()
return &metrics{
reg: reg,
tasksTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "loki_engine_scheduler_tasks_total",
Help: "Total number of tasks by state, counting transitions into state",
}, []string{"state"}),
streamsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "loki_engine_scheduler_streams_total",
Help: "Total number of streams by state, counting transitions into state",
}, []string{"state"}),
connsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "loki_engine_scheduler_connections_total",
Help: "Total number of connections to the scheduler for any purpose (control or data plane)",
}),
taskQueueSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "loki_engine_scheduler_task_queue_seconds",
Help: "Number of seconds a task sat in a queue before being assigned to a worker thread",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: time.Hour,
}),
taskExecSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "loki_engine_scheduler_task_exec_seconds",
Help: "Number of seconds a task took to complete successfully",
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: time.Hour,
}),
}
}
// Register registers metrics to report to reg.
func (m *metrics) Register(reg prometheus.Registerer) error { return reg.Register(m.reg) }
// Unregister unregisters metrics from the provided Registerer.
func (m *metrics) Unregister(reg prometheus.Registerer) { reg.Unregister(m.reg) }

@ -16,6 +16,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/oklog/ulid/v2"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/propagation"
"golang.org/x/sync/errgroup"
@ -34,13 +35,18 @@ type Config struct {
// Scheduler is a service that can schedule tasks to connected worker instances.
type Scheduler struct {
logger log.Logger
logger log.Logger
metrics *metrics
collector *collector
initOnce sync.Once
svc services.Service
listener wire.Listener
// Current set of connections, used for collecting metrics.
connections sync.Map // map[*workerConn]struct{}
resourcesMut sync.RWMutex
streams map[ulid.ULID]*stream // All known streams (regardless of state)
tasks map[ulid.ULID]*task // All known tasks (regardless of state)
@ -65,15 +71,21 @@ func New(config Config) (*Scheduler, error) {
return nil, errors.New("listener must be provided")
}
return &Scheduler{
logger: config.Logger,
s := &Scheduler{
logger: config.Logger,
listener: config.Listener,
streams: make(map[ulid.ULID]*stream),
tasks: make(map[ulid.ULID]*task),
assignSema: make(chan struct{}, 1),
}, nil
}
s.metrics = newMetrics()
s.collector = newCollector(s)
return s, nil
}
// Service returns the service used to manage the lifecycle of the Scheduler.
@ -114,6 +126,11 @@ func (s *Scheduler) handleConn(ctx context.Context, conn wire.Conn) {
wc := new(workerConn)
s.connections.Store(wc, struct{}{})
defer s.connections.Delete(wc)
s.metrics.connsTotal.Inc()
peer := &wire.Peer{
Logger: logger,
Conn: conn,
@ -157,12 +174,8 @@ func (s *Scheduler) handleConn(ctx context.Context, conn wire.Conn) {
}
func (s *Scheduler) handleStreamData(ctx context.Context, worker *workerConn, msg wire.StreamDataMessage) error {
switch worker.ty {
case connectionTypeInitial:
// Flag the connection as a data plane connection.
worker.ty = connectionTypeDataPlane
case connectionTypeControlPlane:
return fmt.Errorf("workers in state %s can not send stream data messages", worker.ty)
if err := worker.MarkDataPlane(); err != nil {
return err
}
s.resourcesMut.RLock()
@ -178,30 +191,17 @@ func (s *Scheduler) handleStreamData(ctx context.Context, worker *workerConn, ms
}
func (s *Scheduler) handleWorkerHello(_ context.Context, worker *workerConn, msg wire.WorkerHelloMessage) error {
if got, want := worker.ty, connectionTypeInitial; got != want {
return fmt.Errorf("worker connection must be in state %q, got %q", want, got)
} else if msg.Threads <= 0 {
return errors.New("worker must advertise at least one thread")
}
worker.ty = connectionTypeControlPlane
worker.maxThreads = msg.Threads
return nil
return worker.HandleHello(msg)
}
func (s *Scheduler) markWorkerReady(_ context.Context, worker *workerConn) error {
if got, want := worker.ty, connectionTypeControlPlane; got != want {
return fmt.Errorf("worker connection must be in state %q, got %q", want, got)
if err := worker.MarkReady(); err != nil {
return err
}
s.assignMut.Lock()
defer s.assignMut.Unlock()
if !worker.hasCapacity() {
return fmt.Errorf("maximum capacity %d reached, wait for task assignment or complete existing assigned tasks", worker.maxThreads)
}
worker.readyThreads++
s.readyWorkers = append(s.readyWorkers, worker)
// Wake [Scheduler.runAssignLoop] if we have both peers and tasks available.
@ -223,7 +223,7 @@ func nudgeSemaphore(sema chan struct{}) {
}
func (s *Scheduler) handleTaskStatus(ctx context.Context, worker *workerConn, msg wire.TaskStatusMessage) error {
if got, want := worker.ty, connectionTypeControlPlane; got != want {
if got, want := worker.Type(), connectionTypeControlPlane; got != want {
return fmt.Errorf("worker connection must be in state %q, got %q", want, got)
}
@ -238,12 +238,18 @@ func (s *Scheduler) handleTaskStatus(ctx context.Context, worker *workerConn, ms
return fmt.Errorf("task %s not found", msg.ID)
}
changed, err := task.setState(msg.Status)
changed, err := task.setState(s.metrics, msg.Status)
if err != nil {
return err
} else if changed {
if owner := task.owner; owner != nil && task.status.State.Terminal() {
owner.untrackAssignment(task)
owner.Unassign(task)
}
if task.status.State == workflow.TaskStateCompleted {
// The execution time of the task is the duration from when it was
// first assigned to when we received the completion status.
s.metrics.taskExecSeconds.Observe(time.Since(task.assignTime).Seconds())
}
// Notify the handler about the change.
@ -257,7 +263,7 @@ func (s *Scheduler) handleTaskStatus(ctx context.Context, worker *workerConn, ms
}
func (s *Scheduler) handleStreamStatus(ctx context.Context, worker *workerConn, msg wire.StreamStatusMessage) error {
if got, want := worker.ty, connectionTypeControlPlane; got != want {
if got, want := worker.Type(), connectionTypeControlPlane; got != want {
return fmt.Errorf("worker connection must be in state %q, got %q", want, got)
}
@ -277,7 +283,7 @@ func (s *Scheduler) handleStreamStatus(ctx context.Context, worker *workerConn,
// changeStreamState updates the state of the target stream. changeStreamState
// must be called while the resourcesMut lock is held.
func (s *Scheduler) changeStreamState(ctx context.Context, n *notifier, target *stream, newState workflow.StreamState) error {
changed, err := target.setState(newState)
changed, err := target.setState(s.metrics, newState)
if err != nil {
return err
} else if !changed {
@ -323,10 +329,10 @@ func (s *Scheduler) abortWorkerTasks(ctx context.Context, worker *workerConn, re
}
}
for task := range worker.tasks {
worker.untrackAssignment(task)
for _, task := range worker.Assigned() {
worker.Unassign(task)
if changed, _ := task.setState(newStatus); !changed {
if changed, _ := task.setState(s.metrics, newStatus); !changed {
continue
}
@ -436,7 +442,15 @@ func (s *Scheduler) assignTask(ctx context.Context, task *task, worker *workerCo
}
// The worker accepted the message, so we can assign the task to it now.
worker.trackAssignment(task)
worker.Assign(task)
// The queue time of a task is the duration from when it entered the queue
// to when a worker accepted the assignment.
//
// We track this moment as the "assign time" to be able to calculate
// execution time later.
task.assignTime = time.Now()
s.metrics.taskQueueSeconds.Observe(task.assignTime.Sub(task.queueTime).Seconds())
// Now that the task has been accepted, we can attempt address bindings. We
// do this on task assignment to simplify the implementation, though it
@ -585,6 +599,21 @@ NextTask:
return errors.Join(errs...)
}
// Observe initial state for the streams and tasks.
{
var (
initialStreamState = workflow.StreamStateIdle.String()
initialTaskState = workflow.TaskStateCreated.String()
)
for range manifestStreams {
s.metrics.streamsTotal.WithLabelValues(initialStreamState).Inc()
}
for range manifestTasks {
s.metrics.tasksTotal.WithLabelValues(initialTaskState).Inc()
}
}
// Once we hit this point, the manifest has been validated and we can
// atomically update our internal state.
maps.Copy(s.streams, manifestStreams)
@ -625,7 +654,7 @@ func (s *Scheduler) UnregisterManifest(ctx context.Context, manifest *workflow.M
// Immediately clean up our own resources.
s.deleteTask(registered)
if changed, _ := registered.setState(workflow.TaskStatus{State: workflow.TaskStateCancelled}); !changed {
if changed, _ := registered.setState(s.metrics, workflow.TaskStatus{State: workflow.TaskStateCancelled}); !changed {
// Ignore if the task couldn't move into the canceled state, which
// indicates it's already in a terminal state.
continue
@ -653,7 +682,7 @@ func (s *Scheduler) UnregisterManifest(ctx context.Context, manifest *workflow.M
for _, streamToRemove := range manifest.Streams {
registered := s.streams[streamToRemove.ULID] // Validated to exist above
changed, _ := registered.setState(workflow.StreamStateClosed)
changed, _ := registered.setState(s.metrics, workflow.StreamStateClosed)
if changed {
n.AddStreamEvent(streamNotification{
Handler: registered.handler,
@ -672,7 +701,7 @@ func (s *Scheduler) deleteTask(t *task) {
delete(s.tasks, t.inner.ULID)
if owner := t.owner; owner != nil {
owner.untrackAssignment(t)
owner.Unassign(t)
}
}
@ -756,6 +785,7 @@ func (s *Scheduler) enqueueTasks(tasks []*task) error {
continue
}
task.queueTime = time.Now()
s.taskQueue = append(s.taskQueue, task)
}
@ -777,7 +807,7 @@ func (s *Scheduler) markPending(ctx context.Context, tasks []*task) {
defer s.resourcesMut.Unlock()
for _, task := range tasks {
if changed, _ := task.setState(workflow.TaskStatus{State: workflow.TaskStatePending}); !changed {
if changed, _ := task.setState(s.metrics, workflow.TaskStatus{State: workflow.TaskStatePending}); !changed {
// If the state change failed, the task either got canceled or
// picked up by a worker in between enqueueing it and calling this
// method.
@ -811,13 +841,13 @@ func (s *Scheduler) Cancel(ctx context.Context, tasks ...*workflow.Task) error {
continue
}
if changed, _ := registered.setState(workflow.TaskStatus{State: workflow.TaskStateCancelled}); changed {
if changed, _ := registered.setState(s.metrics, workflow.TaskStatus{State: workflow.TaskStateCancelled}); changed {
// If the task has an owner, we'll inform it that the task has been
// canceled and it can stop processing it.
//
// This is a best-effort message, so we don't wait for acknowledgement.
if owner := registered.owner; owner != nil {
owner.untrackAssignment(registered)
owner.Unassign(registered)
_ = owner.SendMessageAsync(ctx, wire.TaskCancelMessage{ID: registered.inner.ULID})
}
@ -847,3 +877,19 @@ func (s *Scheduler) Cancel(ctx context.Context, tasks ...*workflow.Task) error {
}
return nil
}
// RegisterMetrics registers metrics about s to report to reg.
func (s *Scheduler) RegisterMetrics(reg prometheus.Registerer) error {
var errs []error
errs = append(errs, reg.Register(s.collector))
errs = append(errs, s.metrics.Register(reg))
return errors.Join(errs...)
}
// UnregisterMetrics unregisters metrics about s from reg.
func (s *Scheduler) UnregisterMetrics(reg prometheus.Registerer) {
reg.Unregister(s.collector)
s.metrics.Unregister(reg)
}

@ -33,7 +33,7 @@ var validStreamTransitions = map[workflow.StreamState][]workflow.StreamState{
//
// Returns true if the state was updated, false otherwise (such as if the task
// is already in the desired state).
func (s *stream) setState(newState workflow.StreamState) (bool, error) {
func (s *stream) setState(m *metrics, newState workflow.StreamState) (bool, error) {
oldState := s.state
if newState == oldState {
@ -46,6 +46,7 @@ func (s *stream) setState(newState workflow.StreamState) (bool, error) {
}
s.state = newState
m.streamsTotal.WithLabelValues(newState.String()).Inc()
return true, nil
}

@ -3,12 +3,16 @@ package scheduler
import (
"fmt"
"slices"
"time"
"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
)
// task wraps a [workflow.Task] with its handler.
type task struct {
assignTime time.Time // Time when task was assigned to a worker.
queueTime time.Time // Time when task was enqueued.
inner *workflow.Task
handler workflow.TaskEventHandler
@ -31,7 +35,7 @@ var validTaskTransitions = map[workflow.TaskState][]workflow.TaskState{
//
// Returns true if the state was updated, false otherwise (such as if the task
// is already in the desired state).
func (t *task) setState(newStatus workflow.TaskStatus) (bool, error) {
func (t *task) setState(m *metrics, newStatus workflow.TaskStatus) (bool, error) {
oldState, newState := t.status.State, newStatus.State
if newState == oldState {
@ -44,5 +48,6 @@ func (t *task) setState(newStatus workflow.TaskStatus) (bool, error) {
}
t.status = newStatus
m.tasksTotal.WithLabelValues(newState.String()).Inc()
return true, nil
}

@ -1,7 +1,10 @@
package scheduler
import (
"errors"
"fmt"
"maps"
"slices"
"sync"
"github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire"
@ -53,6 +56,9 @@ type workerConn struct {
// Peer connection to the worker.
*wire.Peer
// mutex of the worker. Protects all fields.
mut sync.RWMutex
// ty represents the type of worker connection. Messages sent by the worker
// that are incompatible with the connection type are rejected.
ty connectionType
@ -63,12 +69,87 @@ type workerConn struct {
// len(tasks) + readyThreads must not exceed maxThreads.
maxThreads int
mut sync.RWMutex
readyThreads int // readyThreads represents the number of threads that are ready to be assigned a task.
tasks map[*task]struct{} // tasks hold the collection of tasks currently assigned to the worker.
// readyThreads represents the number of threads that are ready to be assigned a task.
readyThreads int
// tasks hold the collection of tasks currently assigned to the worker.
tasks map[*task]struct{}
}
// Type returns the type of the worker connection.
func (wc *workerConn) Type() connectionType {
wc.mut.RLock()
defer wc.mut.RUnlock()
return wc.ty
}
func (wc *workerConn) trackAssignment(assigned *task) {
// HandleHello handles a WorkerHelloMessage. Returns an error if the worker is
// not in a valid state for a HelloMessage, or if the message is invalid.
//
// After HandleHello is called, the worker connection is marked as a control
// plane connection.
func (wc *workerConn) HandleHello(msg wire.WorkerHelloMessage) error {
wc.mut.Lock()
defer wc.mut.Unlock()
if got, want := wc.ty, connectionTypeInitial; got != want {
return fmt.Errorf("worker connection must be in state %q, got %q", want, got)
} else if msg.Threads <= 0 {
return errors.New("worker must advertise at least one thread")
}
wc.ty = connectionTypeControlPlane
wc.maxThreads = msg.Threads
return nil
}
// MarkReady marks the worker as ready to receive tasks. Returns an error if the
// worker is not a control plane connection, or if the worker is at full
// capacity.
func (wc *workerConn) MarkReady() error {
wc.mut.Lock()
defer wc.mut.Unlock()
if got, want := wc.ty, connectionTypeControlPlane; got != want {
return fmt.Errorf("worker connection must be in state %q, got %q", want, got)
}
if !wc.hasCapacity() {
return fmt.Errorf("maximum capacity %d reached, wait for task assignment or complete existing assigned tasks", wc.maxThreads)
}
wc.readyThreads++
return nil
}
// MarkDataPlane marks the worker as a data plane connection. Returns an error
// if the worker is not in a valid state. MarkDataPlane is a no-op if the worker
// is already marked as a data plane connection.
func (wc *workerConn) MarkDataPlane() error {
wc.mut.Lock()
defer wc.mut.Unlock()
switch wc.ty {
case connectionTypeInitial:
// Flag the connection as a data plane connection.
wc.ty = connectionTypeDataPlane
case connectionTypeControlPlane:
return fmt.Errorf("workers in state %s can not send stream data messages", wc.ty)
}
return nil
}
// Assigned returns a copy of the assigned tasks in an undefined order.
func (wc *workerConn) Assigned() []*task {
wc.mut.RLock()
defer wc.mut.RUnlock()
return slices.Collect(maps.Keys(wc.tasks))
}
// Assign assigns a task to the worker.
func (wc *workerConn) Assign(assigned *task) {
wc.mut.Lock()
defer wc.mut.Unlock()
@ -84,7 +165,8 @@ func (wc *workerConn) trackAssignment(assigned *task) {
wc.readyThreads--
}
func (wc *workerConn) untrackAssignment(assigned *task) {
// Unassign removes a task from the worker.
func (wc *workerConn) Unassign(assigned *task) {
wc.mut.Lock()
defer wc.mut.Unlock()
@ -93,8 +175,5 @@ func (wc *workerConn) untrackAssignment(assigned *task) {
// hasCapacity returns true if the worker has capacity to accept more tasks.
func (wc *workerConn) hasCapacity() bool {
wc.mut.RLock()
defer wc.mut.RUnlock()
return wc.maxThreads > len(wc.tasks)+wc.readyThreads
}

@ -7,6 +7,7 @@ import (
"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/engine/internal/scheduler"
"github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire"
@ -94,3 +95,13 @@ func (s *Scheduler) RegisterSchedulerServer(router *mux.Router) {
func (s *Scheduler) Service() services.Service {
return s.inner.Service()
}
// RegisterMetrics registers metrics about s to report to reg.
func (s *Scheduler) RegisterMetrics(reg prometheus.Registerer) error {
return s.inner.RegisterMetrics(reg)
}
// UnregisterMetrics unregisters metrics about s from reg.
func (s *Scheduler) UnregisterMetrics(reg prometheus.Registerer) {
s.inner.UnregisterMetrics(reg)
}

@ -1481,6 +1481,15 @@ func (t *Loki) initV2QueryEngineScheduler() (services.Service, error) {
return nil, err
}
if err := sched.RegisterMetrics(prometheus.DefaultRegisterer); err != nil {
return nil, err
}
sched.Service().AddListener(services.NewListener(
nil, nil, nil,
func(_ services.State) { sched.UnregisterMetrics(prometheus.DefaultRegisterer) },
func(_ services.State, _ error) { sched.UnregisterMetrics(prometheus.DefaultRegisterer) },
))
// Only register HTTP handler when running distributed query execution
if t.Cfg.Querier.EngineV2.Distributed {
sched.RegisterSchedulerServer(t.Server.HTTP)

Loading…
Cancel
Save