diff --git a/pkg/engine/internal/scheduler/collector.go b/pkg/engine/internal/scheduler/collector.go index 02fd538afc..cb905f0704 100644 --- a/pkg/engine/internal/scheduler/collector.go +++ b/pkg/engine/internal/scheduler/collector.go @@ -1,8 +1,14 @@ package scheduler import ( + "context" + "math" + "time" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/errgroup" + "github.com/grafana/loki/v3/pkg/engine/internal/util/ewma" "github.com/grafana/loki/v3/pkg/engine/internal/workflow" ) @@ -11,6 +17,11 @@ import ( type collector struct { sched *Scheduler + load prometheus.GaugeFunc + saturation prometheus.GaugeFunc + loadAverage *ewma.EWMA + saturationAverage *ewma.EWMA + tasksInflight *prometheus.Desc streamsInflight *prometheus.Desc @@ -21,10 +32,41 @@ type collector struct { var _ prometheus.Collector = (*collector)(nil) +// newCollector returns a new collector for the given scheduler. Load and +// saturation average metrics will only be computed after calling +// [collector.Process]. func newCollector(sched *Scheduler) *collector { + var ( + loadSource ewma.SourceFunc = func() float64 { return computeLoad(sched) } + saturationSource ewma.SourceFunc = func() float64 { return computeSaturation(sched) } + ) + return &collector{ sched: sched, + load: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "loki_engine_scheduler_load", + Help: "Current load on the scheduler (count of running and pending tasks)", + }, loadSource), + saturation: prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "loki_engine_scheduler_saturation", + Help: "Current saturation of the scheduler (loki_engine_scheduler_load divided by non-idle worker threads)", + }, saturationSource), + loadAverage: ewma.MustNew(ewma.Options{ + Name: "loki_engine_scheduler_load_average", + Help: "Average load on the scheduler over time", + + UpdateFrequency: 5 * time.Second, + Windows: []time.Duration{1 * time.Minute, 5 * time.Minute, 15 * time.Minute}, + }, loadSource), + saturationAverage: ewma.MustNew(ewma.Options{ + Name: "loki_engine_scheduler_saturation_average", + Help: "Average saturation of the scheduler over time", + + UpdateFrequency: 5 * time.Second, + Windows: []time.Duration{1 * time.Minute, 5 * time.Minute, 15 * time.Minute}, + }, saturationSource), + tasksInflight: prometheus.NewDesc( "loki_engine_scheduler_tasks_inflight", "Number of in-flight tasks by state", @@ -59,7 +101,65 @@ func newCollector(sched *Scheduler) *collector { } } +// computeLoad returns the active load on the scheduler: the sum of running and +// pending tasks. +func computeLoad(sched *Scheduler) float64 { + sched.resourcesMut.RLock() + defer sched.resourcesMut.RUnlock() + + var load uint64 + + for _, t := range sched.tasks { + if t.status.State == workflow.TaskStateRunning || t.status.State == workflow.TaskStatePending { + load++ + } + } + + return float64(load) +} + +// computeSaturation returns the saturation of the scheduler: the load divided by +// the number of non-idle threads. +func computeSaturation(sched *Scheduler) float64 { + // The value here may be slightly off due to the asynchronous nature of the + // scheduler: the load may change while we're counting compute capacity, and + // threads may change states while we're looking at other connections. + // + // However, we should still be providing a good enough approximation of the + // scheduler's saturation. + load := computeLoad(sched) + + var compute uint64 + + sched.connections.Range(func(key, _ any) bool { + _, ready, busy := countThreadStates(key.(*workerConn)) + compute += uint64(ready + busy) + return true + }) + + if compute == 0 { + return math.Inf(1) + } + return float64(load) / float64(compute) +} + +// Process performs stat computations for EWMA metrics of the collector. Process +// runs until the provided context is canceled. +func (mc *collector) Process(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { return mc.loadAverage.Monitor(ctx) }) + g.Go(func() error { return mc.saturationAverage.Monitor(ctx) }) + + return g.Wait() +} + func (mc *collector) Collect(ch chan<- prometheus.Metric) { + mc.load.Collect(ch) + mc.saturation.Collect(ch) + mc.loadAverage.Collect(ch) + mc.saturationAverage.Collect(ch) + mc.collectResourceStats(ch) mc.collectConnStats(ch) } diff --git a/pkg/engine/internal/scheduler/scheduler.go b/pkg/engine/internal/scheduler/scheduler.go index 635bbf0b30..2539458608 100644 --- a/pkg/engine/internal/scheduler/scheduler.go +++ b/pkg/engine/internal/scheduler/scheduler.go @@ -100,6 +100,7 @@ func (s *Scheduler) Service() services.Service { func (s *Scheduler) run(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { return s.collector.Process(ctx) }) g.Go(func() error { return s.runAcceptLoop(ctx) }) g.Go(func() error { return s.runAssignLoop(ctx) }) diff --git a/pkg/engine/internal/util/ewma/ewma.go b/pkg/engine/internal/util/ewma/ewma.go new file mode 100644 index 0000000000..f1f609e761 --- /dev/null +++ b/pkg/engine/internal/util/ewma/ewma.go @@ -0,0 +1,155 @@ +// Package ewma provides an implementation of an exponentially weighted moving +// average (EWMA) that can be reported as a Prometheus metric. +package ewma + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" +) + +// Source is an interface that provides a value for an EWMA calculation. +type Source interface { + // Get returns the current value for the EWMA calculation. + Get() float64 +} + +// SourceFunc is a [Source] that provides its value from a function. +type SourceFunc func() float64 + +// Get returns the value provided by the SourceFunc. +func (sf SourceFunc) Get() float64 { return sf() } + +// Options provides configuration options for an EWMA. +type Options struct { + // Name of the EWMA metric. + Name string + + // Help text for the EWMA metric. + Help string + + // UpdateFrequency is the frequency at which the EWMA is updated from the + // Source. A typical value is 5s. + UpdateFrequency time.Duration + + // Windows is a list of time windows for which averages will be calculated; + // such as 1m, 5m, and 15m averages. + Windows []time.Duration +} + +// EWMA provides an exponentially weighted moving average (EWMA). EWMA +// implements [prometheus.Collector]. +type EWMA struct { + source Source + metric *prometheus.Desc + updateFreq time.Duration + + running atomic.Bool + + mut sync.RWMutex + windows []*window +} + +var _ prometheus.Collector = (*EWMA)(nil) + +// New creates a new EWMA with the given options and source. The returned EWMA +// must be started by calling [EWMA.Monitor]. +func New(opts Options, source Source) (*EWMA, error) { + if source == nil { + return nil, errors.New("source must not be nil") + } else if opts.UpdateFrequency <= 0 { + return nil, errors.New("UpdateFrequency must be greater than zero") + } + + windows := make([]*window, 0, len(opts.Windows)) + for _, size := range opts.Windows { + if size <= 0 { + return nil, errors.New("window size must be greater than zero") + } + windows = append(windows, &window{Size: size}) + } + + return &EWMA{ + source: source, + metric: prometheus.NewDesc(opts.Name, opts.Help, []string{"window"}, nil), + updateFreq: opts.UpdateFrequency, + + windows: windows, + }, nil +} + +// MustNew calls [New] and panics on error. +func MustNew(opts Options, source Source) *EWMA { + ewma, err := New(opts, source) + if err != nil { + panic(err) + } + return ewma +} + +// Monitor starts the EWMA, polling values from the source at the configured +// update frequency. +// +// Monitor runs until the provided context is canceled. Monitor returns an error +// if a Monitor is already running. +func (ewma *EWMA) Monitor(ctx context.Context) error { + if !ewma.running.CompareAndSwap(false, true) { + return errors.New("EWMA is already running") + } + defer ewma.running.Store(false) + + ticker := time.NewTicker(ewma.updateFreq) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + ewma.updateWindows() + } + } +} + +func (ewma *EWMA) updateWindows() { + ewma.mut.Lock() + defer ewma.mut.Unlock() + + // Make sure to use the same sample and "now" time for all windows for + // consistency. + var ( + sample = ewma.source.Get() + now = time.Now() + ) + + for _, w := range ewma.windows { + w.Observe(sample, now) + } +} + +// Collect writes the EWMA metrics to the given channel. Metrics are only +// written when the EWMA is running via [EWMA.Monitor]. +func (ewma *EWMA) Collect(ch chan<- prometheus.Metric) { + if !ewma.running.Load() { + // Don't report any metrics unless the EWMA is running. Not running the + // EWMA is a bug and reporting its values would be misleading since + // they're not changing. + return + } + + ewma.mut.RLock() + defer ewma.mut.RUnlock() + + for _, w := range ewma.windows { + ch <- prometheus.MustNewConstMetric(ewma.metric, prometheus.GaugeValue, w.Value(), w.Name()) + } +} + +// Describe describes the metrics emitted by EWMA. +func (ewma *EWMA) Describe(ch chan<- *prometheus.Desc) { + ch <- ewma.metric +} diff --git a/pkg/engine/internal/util/ewma/ewma_test.go b/pkg/engine/internal/util/ewma/ewma_test.go new file mode 100644 index 0000000000..94a8141d5b --- /dev/null +++ b/pkg/engine/internal/util/ewma/ewma_test.go @@ -0,0 +1,69 @@ +package ewma_test + +import ( + "context" + "strings" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/engine/internal/util/ewma" +) + +func Test(t *testing.T) { + var idx int + observations := []float64{ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + } + + source := ewma.SourceFunc(func() float64 { + if idx >= len(observations) { + <-t.Context().Done() + return 0 + } + + res := observations[idx] + idx++ + return res + }) + + synctest.Test(t, func(t *testing.T) { + reg := prometheus.NewRegistry() + + m, err := ewma.New(ewma.Options{ + Name: "my_ewma_metric", + UpdateFrequency: time.Minute, + Windows: []time.Duration{time.Minute, 5 * time.Minute, 15 * time.Minute}, + }, source) + require.NoError(t, err) + + reg.MustRegister(m) + + var wg sync.WaitGroup + defer wg.Wait() + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + wg.Go(func() { _ = m.Monitor(ctx) }) + + // Wait for all observations to have been made. + time.Sleep(time.Duration(len(observations)) * time.Minute) + synctest.Wait() + + expect := ` +# HELP +# TYPE my_ewma_metric gauge +my_ewma_metric{window="15m"} 9.581664816797675 +my_ewma_metric{window="1m"} 19.41802329639137 +my_ewma_metric{window="5m"} 15.584385505095714 +` + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expect))) + }) +} diff --git a/pkg/engine/internal/util/ewma/window.go b/pkg/engine/internal/util/ewma/window.go new file mode 100644 index 0000000000..2f8689a926 --- /dev/null +++ b/pkg/engine/internal/util/ewma/window.go @@ -0,0 +1,58 @@ +package ewma + +import ( + "math" + "strings" + "time" +) + +// window represents a window size for EWMA calculations; such as a 15m window. +type window struct { + Size time.Duration + + initialized bool + value float64 + lastUpdate time.Time +} + +// Name returns a name for the window, based on its size. Unlike +// [time.Duration.String], trailing zero units are removed, so 15m0s becomes +// 15m. +func (w *window) Name() string { + name := w.Size.String() + + if strings.HasSuffix(name, "m0s") { + name = name[:len(name)-2] // Trim 0s + } + if strings.HasSuffix(name, "h0m") { + name = name[:len(name)-2] // Trim 0m + } + return name +} + +// Observe updates the window with a new value. Observe reinitializes the window +// the now timestamp is earlier than now timestamp on the previous call. +func (w *window) Observe(value float64, now time.Time) { + // We'll also treat clock drift as reinitialization. + if !w.initialized || now.Before(w.lastUpdate) { + w.initialized = true + w.value = value + w.lastUpdate = now + return + } + + // EWMA is calculated using the formula: + // ewma_new = decay * ewma_old + (1 - decay) * value + // + // Where decay is: + // e^(-delta/window_size) + + delta := now.Sub(w.lastUpdate) + decay := math.Exp(-delta.Seconds() / w.Size.Seconds()) + + w.value = decay*w.value + (1-decay)*value + w.lastUpdate = now +} + +// Value returns the current EWMA value. +func (w *window) Value() float64 { return w.value }