The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
grafana/pkg/util/debouncer/debouncer.go

311 lines
8.4 KiB

package debouncer
import (
"context"
"errors"
"sync"
"time"
"github.com/grafana/dskit/instrument"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
ErrBufferFull = errors.New("debouncer buffer full")
)
type ProcessFunc[T comparable] func(context.Context, T) error
type ErrorFunc[T comparable] func(T, error)
type metrics struct {
itemsAddedCounter prometheus.Counter
itemsDroppedCounter prometheus.Counter
itemsProcessedCounter prometheus.Counter
processingErrorsCounter prometheus.Counter
processingDurationHistogram prometheus.Histogram
}
func newMetrics(reg prometheus.Registerer, name string) *metrics {
return &metrics{
itemsAddedCounter: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "debouncer_items_added_total",
Help: "Total number of items added to the debouncer",
ConstLabels: prometheus.Labels{
"name": name,
},
}),
itemsDroppedCounter: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "debouncer_items_dropped_total",
Help: "Total number of items dropped due to a full buffer",
ConstLabels: prometheus.Labels{
"name": name,
},
}),
itemsProcessedCounter: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "debouncer_items_processed_total",
Help: "Total number of items processed by the debouncer",
ConstLabels: prometheus.Labels{
"name": name,
},
}),
processingErrorsCounter: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "debouncer_processing_errors_total",
Help: "Total number of errors during processing",
ConstLabels: prometheus.Labels{
"name": name,
},
}),
processingDurationHistogram: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "debouncer_processing_duration_seconds",
Help: "Time taken to process items",
Buckets: instrument.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 160,
NativeHistogramMinResetDuration: time.Hour,
ConstLabels: prometheus.Labels{
"name": name,
},
}),
}
}
// DebouncerOpts hold all the options to create a debouncer group.
type DebouncerOpts[T comparable] struct {
// Name should be a unique name for this debouncer group. It is
// also used a name label value for the metrics.
Name string
// BufferSize is the maximum number of pending events to buffer.
BufferSize int
// ErrorHandler is the function that is called when a process for a given
// key returns an error while running.
ErrorHandler ErrorFunc[T]
// ProcessHandler is the function that is called once a process for a given
// key should be run.
ProcessHandler ProcessFunc[T]
// MinWait is the cooldown period after receiving an event. If another event with the
// same key arrives during this period, the timer resets and we wait another MinWait duration.
MinWait time.Duration
// MaxWait is the maximum time any event will wait before processing. Even if new events
// for the same key keep arriving, we guarantee processing after MaxWait from the first event.
MaxWait time.Duration
Reg prometheus.Registerer
}
type Group[T comparable] struct {
buffer chan T
// mutex protecting the debouncers map.
debouncersMu sync.Mutex
debouncers map[T]*debouncer[T]
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
errorHandler ErrorFunc[T]
processHandler ProcessFunc[T]
minWait time.Duration
maxWait time.Duration
metrics *metrics
}
// NewGroup creates a new debouncer group for processing events with unique keys.
//
// A debouncer group helps optimize expensive operations by:
// 1. Grouping identical events that occur in rapid succession
// 2. Processing each unique key only once after waiting periods expire
//
// Example usage:
//
// group := debouncer.NewGroup(DebouncerOpts[string]{
// BufferSize: 1000,
// ProcessHandler: func(ctx context.Context, key string) error {
// // This is where you perform the expensive operation
// return doSuperExpensiveCommand(key)
// }
// MinWait: time.Second * 10,
// MaxWait: time.Minute,
// })
//
// // Start the debouncer group.
// group.Start(ctx)
//
// // Queue events
// if err := group.Add("user-1"); err != nil {
// // Do something with the error.
// }
// // Adding the same key resets MinWait but not MaxWait
// if err := group.Add("user-1"); err != nil {
// // Do something with the error.
// }
//
// The event will be processed when either MinWait expires (after the most recent add)
// or MaxWait expires (after the first add), whichever comes first.
func NewGroup[T comparable](opts DebouncerOpts[T]) (*Group[T], error) {
if opts.BufferSize <= 0 {
opts.BufferSize = 100
}
if opts.MinWait <= 0 {
opts.MinWait = time.Minute
}
if opts.MaxWait <= 0 {
opts.MaxWait = 5 * time.Minute
}
if opts.MinWait > opts.MaxWait {
return nil, errors.New("minWait is bigger than maxWait")
}
if opts.ProcessHandler == nil {
return nil, errors.New("processHandler is required")
}
if opts.ErrorHandler == nil {
opts.ErrorHandler = func(_ T, _ error) {}
}
return &Group[T]{
buffer: make(chan T, opts.BufferSize),
debouncers: make(map[T]*debouncer[T]),
processHandler: opts.ProcessHandler,
errorHandler: opts.ErrorHandler,
minWait: opts.MinWait,
maxWait: opts.MaxWait,
metrics: newMetrics(opts.Reg, opts.Name),
}, nil
}
// Add will create a new debouncer for the given Key if it doesn't exist yet.
// If a key has already a debouncer it will either reset the MinWait timer for
// this key, or if they key is already running its process be no-op.
func (g *Group[T]) Add(value T) error {
select {
case g.buffer <- value:
g.metrics.itemsAddedCounter.Inc()
return nil
default:
g.metrics.itemsDroppedCounter.Inc()
return ErrBufferFull
}
}
func (g *Group[T]) Start(ctx context.Context) {
g.ctx, g.cancel = context.WithCancel(ctx)
g.wg.Add(1)
go func() {
defer g.wg.Done()
for {
select {
case <-g.ctx.Done():
return
case value := <-g.buffer:
g.processValue(value)
}
}
}()
}
func (g *Group[T]) Stop() {
if g.cancel != nil {
g.cancel()
g.wg.Wait()
}
}
func (g *Group[T]) processValue(key T) {
g.debouncersMu.Lock()
deb, ok := g.debouncers[key]
if !ok {
deb = newDebouncer[T](g.minWait, g.maxWait, key, func(v T) {
g.processWithMetrics(g.ctx, v, g.processHandler)
g.debouncersMu.Lock()
defer g.debouncersMu.Unlock()
if current, exists := g.debouncers[key]; exists && current == deb {
delete(g.debouncers, key)
}
})
g.wg.Add(1)
go func() {
defer g.wg.Done()
deb.run(g.ctx)
}()
g.debouncers[key] = deb
}
g.debouncersMu.Unlock()
deb.reset()
}
func (g *Group[T]) processWithMetrics(ctx context.Context, value T, processFunc ProcessFunc[T]) {
timer := prometheus.NewTimer(g.metrics.processingDurationHistogram)
defer timer.ObserveDuration()
g.metrics.itemsProcessedCounter.Inc()
if err := processFunc(ctx, value); err != nil {
g.errorHandler(value, err)
g.metrics.processingErrorsCounter.Inc()
}
}
// debouncer handles debouncing for a specific key.
type debouncer[T comparable] struct {
key T
resetChan chan struct{}
minWait time.Duration
maxWait time.Duration
processFunc func(T)
}
// newDebouncer creates a new key debouncer.
func newDebouncer[T comparable](minWait, maxWait time.Duration, key T, processFunc func(T)) *debouncer[T] {
deb := &debouncer[T]{
key: key,
resetChan: make(chan struct{}, 1),
minWait: minWait,
maxWait: maxWait,
processFunc: processFunc,
}
return deb
}
// reset triggers a timer reset for the minWait.
func (d *debouncer[T]) reset() {
select {
case d.resetChan <- struct{}{}:
// Value sent successfully.
default:
// Value was dropped. Is not an issue as
// a reset is already about to being processed
// or the process is being run.
}
}
// run manages the debouncing process for a specific key.
func (d *debouncer[T]) run(ctx context.Context) {
// Create timers after getting the first updateChan.
minTimer := time.NewTimer(d.minWait)
maxTimer := time.NewTimer(d.maxWait)
defer func() {
minTimer.Stop()
maxTimer.Stop()
}()
for {
select {
case <-ctx.Done():
return
case <-d.resetChan:
minTimer.Stop()
minTimer.Reset(d.minWait)
case <-minTimer.C:
d.processFunc(d.key)
return
case <-maxTimer.C:
d.processFunc(d.key)
return
}
}
}