chore(engine): track scheduler load/pressure metrics (#19924)

pull/19650/head^2
Robert Fratto 6 months ago committed by GitHub
parent 2fce7da611
commit e9030beedf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 100
      pkg/engine/internal/scheduler/collector.go
  2. 1
      pkg/engine/internal/scheduler/scheduler.go
  3. 155
      pkg/engine/internal/util/ewma/ewma.go
  4. 69
      pkg/engine/internal/util/ewma/ewma_test.go
  5. 58
      pkg/engine/internal/util/ewma/window.go

@ -1,8 +1,14 @@
package scheduler
import (
"context"
"math"
"time"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
"github.com/grafana/loki/v3/pkg/engine/internal/util/ewma"
"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
)
@ -11,6 +17,11 @@ import (
type collector struct {
sched *Scheduler
load prometheus.GaugeFunc
saturation prometheus.GaugeFunc
loadAverage *ewma.EWMA
saturationAverage *ewma.EWMA
tasksInflight *prometheus.Desc
streamsInflight *prometheus.Desc
@ -21,10 +32,41 @@ type collector struct {
var _ prometheus.Collector = (*collector)(nil)
// newCollector returns a new collector for the given scheduler. Load and
// saturation average metrics will only be computed after calling
// [collector.Process].
func newCollector(sched *Scheduler) *collector {
var (
loadSource ewma.SourceFunc = func() float64 { return computeLoad(sched) }
saturationSource ewma.SourceFunc = func() float64 { return computeSaturation(sched) }
)
return &collector{
sched: sched,
load: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "loki_engine_scheduler_load",
Help: "Current load on the scheduler (count of running and pending tasks)",
}, loadSource),
saturation: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "loki_engine_scheduler_saturation",
Help: "Current saturation of the scheduler (loki_engine_scheduler_load divided by non-idle worker threads)",
}, saturationSource),
loadAverage: ewma.MustNew(ewma.Options{
Name: "loki_engine_scheduler_load_average",
Help: "Average load on the scheduler over time",
UpdateFrequency: 5 * time.Second,
Windows: []time.Duration{1 * time.Minute, 5 * time.Minute, 15 * time.Minute},
}, loadSource),
saturationAverage: ewma.MustNew(ewma.Options{
Name: "loki_engine_scheduler_saturation_average",
Help: "Average saturation of the scheduler over time",
UpdateFrequency: 5 * time.Second,
Windows: []time.Duration{1 * time.Minute, 5 * time.Minute, 15 * time.Minute},
}, saturationSource),
tasksInflight: prometheus.NewDesc(
"loki_engine_scheduler_tasks_inflight",
"Number of in-flight tasks by state",
@ -59,7 +101,65 @@ func newCollector(sched *Scheduler) *collector {
}
}
// computeLoad returns the active load on the scheduler: the sum of running and
// pending tasks.
func computeLoad(sched *Scheduler) float64 {
sched.resourcesMut.RLock()
defer sched.resourcesMut.RUnlock()
var load uint64
for _, t := range sched.tasks {
if t.status.State == workflow.TaskStateRunning || t.status.State == workflow.TaskStatePending {
load++
}
}
return float64(load)
}
// computeSaturation returns the saturation of the scheduler: the load divided by
// the number of non-idle threads.
func computeSaturation(sched *Scheduler) float64 {
// The value here may be slightly off due to the asynchronous nature of the
// scheduler: the load may change while we're counting compute capacity, and
// threads may change states while we're looking at other connections.
//
// However, we should still be providing a good enough approximation of the
// scheduler's saturation.
load := computeLoad(sched)
var compute uint64
sched.connections.Range(func(key, _ any) bool {
_, ready, busy := countThreadStates(key.(*workerConn))
compute += uint64(ready + busy)
return true
})
if compute == 0 {
return math.Inf(1)
}
return float64(load) / float64(compute)
}
// Process performs stat computations for EWMA metrics of the collector. Process
// runs until the provided context is canceled.
func (mc *collector) Process(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return mc.loadAverage.Monitor(ctx) })
g.Go(func() error { return mc.saturationAverage.Monitor(ctx) })
return g.Wait()
}
func (mc *collector) Collect(ch chan<- prometheus.Metric) {
mc.load.Collect(ch)
mc.saturation.Collect(ch)
mc.loadAverage.Collect(ch)
mc.saturationAverage.Collect(ch)
mc.collectResourceStats(ch)
mc.collectConnStats(ch)
}

@ -100,6 +100,7 @@ func (s *Scheduler) Service() services.Service {
func (s *Scheduler) run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { return s.collector.Process(ctx) })
g.Go(func() error { return s.runAcceptLoop(ctx) })
g.Go(func() error { return s.runAssignLoop(ctx) })

@ -0,0 +1,155 @@
// Package ewma provides an implementation of an exponentially weighted moving
// average (EWMA) that can be reported as a Prometheus metric.
package ewma
import (
"context"
"errors"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)
// Source is an interface that provides a value for an EWMA calculation.
type Source interface {
// Get returns the current value for the EWMA calculation.
Get() float64
}
// SourceFunc is a [Source] that provides its value from a function.
type SourceFunc func() float64
// Get returns the value provided by the SourceFunc.
func (sf SourceFunc) Get() float64 { return sf() }
// Options provides configuration options for an EWMA.
type Options struct {
// Name of the EWMA metric.
Name string
// Help text for the EWMA metric.
Help string
// UpdateFrequency is the frequency at which the EWMA is updated from the
// Source. A typical value is 5s.
UpdateFrequency time.Duration
// Windows is a list of time windows for which averages will be calculated;
// such as 1m, 5m, and 15m averages.
Windows []time.Duration
}
// EWMA provides an exponentially weighted moving average (EWMA). EWMA
// implements [prometheus.Collector].
type EWMA struct {
source Source
metric *prometheus.Desc
updateFreq time.Duration
running atomic.Bool
mut sync.RWMutex
windows []*window
}
var _ prometheus.Collector = (*EWMA)(nil)
// New creates a new EWMA with the given options and source. The returned EWMA
// must be started by calling [EWMA.Monitor].
func New(opts Options, source Source) (*EWMA, error) {
if source == nil {
return nil, errors.New("source must not be nil")
} else if opts.UpdateFrequency <= 0 {
return nil, errors.New("UpdateFrequency must be greater than zero")
}
windows := make([]*window, 0, len(opts.Windows))
for _, size := range opts.Windows {
if size <= 0 {
return nil, errors.New("window size must be greater than zero")
}
windows = append(windows, &window{Size: size})
}
return &EWMA{
source: source,
metric: prometheus.NewDesc(opts.Name, opts.Help, []string{"window"}, nil),
updateFreq: opts.UpdateFrequency,
windows: windows,
}, nil
}
// MustNew calls [New] and panics on error.
func MustNew(opts Options, source Source) *EWMA {
ewma, err := New(opts, source)
if err != nil {
panic(err)
}
return ewma
}
// Monitor starts the EWMA, polling values from the source at the configured
// update frequency.
//
// Monitor runs until the provided context is canceled. Monitor returns an error
// if a Monitor is already running.
func (ewma *EWMA) Monitor(ctx context.Context) error {
if !ewma.running.CompareAndSwap(false, true) {
return errors.New("EWMA is already running")
}
defer ewma.running.Store(false)
ticker := time.NewTicker(ewma.updateFreq)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
ewma.updateWindows()
}
}
}
func (ewma *EWMA) updateWindows() {
ewma.mut.Lock()
defer ewma.mut.Unlock()
// Make sure to use the same sample and "now" time for all windows for
// consistency.
var (
sample = ewma.source.Get()
now = time.Now()
)
for _, w := range ewma.windows {
w.Observe(sample, now)
}
}
// Collect writes the EWMA metrics to the given channel. Metrics are only
// written when the EWMA is running via [EWMA.Monitor].
func (ewma *EWMA) Collect(ch chan<- prometheus.Metric) {
if !ewma.running.Load() {
// Don't report any metrics unless the EWMA is running. Not running the
// EWMA is a bug and reporting its values would be misleading since
// they're not changing.
return
}
ewma.mut.RLock()
defer ewma.mut.RUnlock()
for _, w := range ewma.windows {
ch <- prometheus.MustNewConstMetric(ewma.metric, prometheus.GaugeValue, w.Value(), w.Name())
}
}
// Describe describes the metrics emitted by EWMA.
func (ewma *EWMA) Describe(ch chan<- *prometheus.Desc) {
ch <- ewma.metric
}

@ -0,0 +1,69 @@
package ewma_test
import (
"context"
"strings"
"sync"
"testing"
"testing/synctest"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/util/ewma"
)
func Test(t *testing.T) {
var idx int
observations := []float64{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
}
source := ewma.SourceFunc(func() float64 {
if idx >= len(observations) {
<-t.Context().Done()
return 0
}
res := observations[idx]
idx++
return res
})
synctest.Test(t, func(t *testing.T) {
reg := prometheus.NewRegistry()
m, err := ewma.New(ewma.Options{
Name: "my_ewma_metric",
UpdateFrequency: time.Minute,
Windows: []time.Duration{time.Minute, 5 * time.Minute, 15 * time.Minute},
}, source)
require.NoError(t, err)
reg.MustRegister(m)
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
wg.Go(func() { _ = m.Monitor(ctx) })
// Wait for all observations to have been made.
time.Sleep(time.Duration(len(observations)) * time.Minute)
synctest.Wait()
expect := `
# HELP
# TYPE my_ewma_metric gauge
my_ewma_metric{window="15m"} 9.581664816797675
my_ewma_metric{window="1m"} 19.41802329639137
my_ewma_metric{window="5m"} 15.584385505095714
`
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expect)))
})
}

@ -0,0 +1,58 @@
package ewma
import (
"math"
"strings"
"time"
)
// window represents a window size for EWMA calculations; such as a 15m window.
type window struct {
Size time.Duration
initialized bool
value float64
lastUpdate time.Time
}
// Name returns a name for the window, based on its size. Unlike
// [time.Duration.String], trailing zero units are removed, so 15m0s becomes
// 15m.
func (w *window) Name() string {
name := w.Size.String()
if strings.HasSuffix(name, "m0s") {
name = name[:len(name)-2] // Trim 0s
}
if strings.HasSuffix(name, "h0m") {
name = name[:len(name)-2] // Trim 0m
}
return name
}
// Observe updates the window with a new value. Observe reinitializes the window
// the now timestamp is earlier than now timestamp on the previous call.
func (w *window) Observe(value float64, now time.Time) {
// We'll also treat clock drift as reinitialization.
if !w.initialized || now.Before(w.lastUpdate) {
w.initialized = true
w.value = value
w.lastUpdate = now
return
}
// EWMA is calculated using the formula:
// ewma_new = decay * ewma_old + (1 - decay) * value
//
// Where decay is:
// e^(-delta/window_size)
delta := now.Sub(w.lastUpdate)
decay := math.Exp(-delta.Seconds() / w.Size.Seconds())
w.value = decay*w.value + (1-decay)*value
w.lastUpdate = now
}
// Value returns the current EWMA value.
func (w *window) Value() float64 { return w.value }
Loading…
Cancel
Save