The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/alerting/ticker.go

86 lines
2.7 KiB

package alerting
import (
"fmt"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting/metrics"
)
// Ticker is a ticker to power the alerting scheduler. It is like a time.Ticker, except:
//   - it does not drop ticks for slow receivers: the send on C blocks until the tick is received and the ticker only
//     advances afterwards, so pending ticks are delivered one after another and callers are in control to instrument
//     what is going on.
//   - it ticks on interval marks or very shortly after. This provides a predictable load pattern
//     (this shouldn't cause too much load contention because the next steps in the pipeline just process at their own pace).
//   - NOTE(review): the previous comment stated that the timestamps mark the "last datapoint to query for" and are a
//     configurable amount of seconds in the past; no such offset is visible in this file — confirm against callers.
type Ticker struct {
// C is the channel on which tick timestamps are delivered.
C chan time.Time
// clock supplies the current time and the wait timers used by the run loop.
clock clock.Clock
// last is the timestamp of the most recently delivered tick; it starts at the value returned by getStartTick.
last time.Time
// interval is the distance between two consecutive ticks.
interval time.Duration
// metrics receives the interval, next-tick and last-tick values reported by the ticker.
metrics *metrics.Ticker
// stopCh carries the stop request from Stop to the run loop.
stopCh chan struct{}
}
// NewTicker returns a Ticker that ticks on interval marks (or very shortly after) and never drops ticks.
// The first tick is the first interval mark after c.Now(). The ticker goroutine is started before
// NewTicker returns and keeps running until Stop is called.
//
// c is the clock used to read the current time and to wait for the next tick.
// interval is the distance between ticks; NewTicker panics if it is zero or negative.
// metric receives the interval, next-tick and last-tick gauges; it must not be nil because it is
// dereferenced here and in the run loop.
func NewTicker(c clock.Clock, interval time.Duration, metric *metrics.Ticker) *Ticker {
	if interval <= 0 {
		panic(fmt.Errorf("non-positive interval [%v] is not allowed", interval))
	}
	t := &Ticker{
		C:        make(chan time.Time),
		clock:    c,
		last:     getStartTick(c, interval),
		interval: interval,
		metrics:  metric,
		// Capacity of one: Stop sends without blocking, so with an unbuffered channel a stop request
		// issued while the run goroutine is not parked in a select (for example before it has been
		// scheduled) would be silently discarded and the goroutine would never exit. The buffer keeps
		// the request pending until run reaches its next select.
		stopCh: make(chan struct{}, 1),
	}
	// Seconds reports the fractional part as well, so it matches the format of the timestamps
	// reported by the run loop.
	metric.IntervalSeconds.Set(t.interval.Seconds())
	go t.run()
	return t
}
// getStartTick reads the current time from clk as nanoseconds since the Unix epoch and removes the
// remainder of its division by interval, yielding the interval mark the ticker starts counting from.
func getStartTick(clk clock.Clock, interval time.Duration) time.Time {
	nowNanos := clk.Now().UnixNano()
	pastMark := nowNanos % interval.Nanoseconds()
	return time.Unix(0, nowNanos-pastMark)
}
// run is the ticker loop. On every pass it computes the next tick (last + interval), publishes it
// as the next-tick metric and then either waits until that moment or, if it has already been
// reached, delivers the tick on C. A tick only becomes "last" once a receiver has taken it, so
// slow receivers see every tick in order. The loop ends when a stop request arrives on stopCh.
func (t *Ticker) run() {
	logger := log.New("ticker")
	logger.Info("starting", "first_tick", t.last.Add(t.interval))

	// unixSeconds converts a timestamp to fractional seconds since the Unix epoch for the gauges.
	unixSeconds := func(ts time.Time) float64 {
		return float64(ts.UnixNano()) / 1e9
	}

	for running := true; running; {
		due := t.last.Add(t.interval)
		t.metrics.NextTickTime.Set(unixSeconds(due))

		// Negative lag means the tick is still in the future.
		lag := t.clock.Now().Sub(due)
		if lag < 0 {
			// Sleep exactly until the tick is due, unless asked to stop first.
			select {
			case <-t.stopCh:
				running = false
			case <-t.clock.After(-lag):
			}
			continue
		}

		// The tick is due: block until a receiver takes it or a stop request arrives.
		select {
		case <-t.stopCh:
			running = false
		case t.C <- due:
			t.last = due
			t.metrics.LastTickTime.Set(unixSeconds(due))
		}
	}

	logger.Info("stopped", "last_tick", t.last)
}
// Stop asks the ticker loop to exit. The request is sent on stopCh without blocking: if the send
// cannot proceed immediately (for example because the ticker has already stopped), Stop returns
// without waiting. It does not close the C channel.
func (t *Ticker) Stop() {
	stopSignal := struct{}{}
	select {
	case t.stopCh <- stopSignal:
		// request handed over
	default:
		// nothing could take the request right now; treated as already stopped
	}
}