The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/ngalert/schedule/schedule.go

515 lines
15 KiB

package schedule
import (
"context"
"fmt"
"sync"
"time"
"github.com/benbjohnson/clock"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/tsdb"
)
// timeNow makes it possible to test usage of time
var timeNow = time.Now
// ScheduleService handles scheduling
type ScheduleService interface {
Ticker(context.Context, *state.Manager) error
Pause() error
Unpause() error
WarmStateCache(*state.Manager)
// the following are used by tests only used for tests
evalApplied(models.AlertRuleKey, time.Time)
stopApplied(models.AlertRuleKey)
overrideCfg(cfg SchedulerCfg)
}
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, stopCh <-chan struct{}, stateManager *state.Manager) error {
sch.log.Debug("alert rule routine started", "key", key)
evalRunning := false
var attempt int64
var alertRule *models.AlertRule
for {
select {
case ctx := <-evalCh:
if evalRunning {
continue
}
evaluate := func(attempt int64) error {
start := timeNow()
// fetch latest alert rule version
if alertRule == nil || alertRule.Version < ctx.version {
q := models.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID}
err := sch.ruleStore.GetAlertRuleByUID(&q)
if err != nil {
sch.log.Error("failed to fetch alert rule", "key", key)
return err
}
alertRule = q.Result
sch.log.Debug("new alert rule version fetched", "title", alertRule.Title, "key", key, "version", alertRule.Version)
}
condition := models.Condition{
Condition: alertRule.Condition,
OrgID: alertRule.OrgID,
Data: alertRule.Data,
}
results, err := sch.evaluator.ConditionEval(&condition, ctx.now, sch.dataService)
var (
end = timeNow()
tenant = fmt.Sprint(alertRule.OrgID)
dur = end.Sub(start).Seconds()
)
sch.metrics.EvalTotal.WithLabelValues(tenant).Inc()
sch.metrics.EvalDuration.WithLabelValues(tenant).Observe(dur)
if err != nil {
sch.metrics.EvalFailures.WithLabelValues(tenant).Inc()
// consider saving alert instance on error
sch.log.Error("failed to evaluate alert rule", "title", alertRule.Title,
"key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err)
return err
}
processedStates := stateManager.ProcessEvalResults(alertRule, results)
sch.saveAlertStates(processedStates)
alerts := FromAlertStateToPostableAlerts(processedStates)
sch.log.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
err = sch.sendAlerts(alerts)
if err != nil {
sch.log.Error("failed to put alerts in the notifier", "count", len(alerts.PostableAlerts), "err", err)
}
return nil
}
func() {
evalRunning = true
defer func() {
evalRunning = false
sch.evalApplied(key, ctx.now)
}()
for attempt = 0; attempt < sch.maxAttempts; attempt++ {
err := evaluate(attempt)
if err == nil {
break
}
}
}()
case <-stopCh:
sch.stopApplied(key)
sch.log.Debug("stopping alert rule routine", "key", key)
// interrupt evaluation if it's running
return nil
case <-grafanaCtx.Done():
return grafanaCtx.Err()
}
}
}
// Notifier handles the delivery of alert notifications to the end user
type Notifier interface {
PutAlerts(alerts apimodels.PostableAlerts) error
}
type schedule struct {
// base tick rate (fastest possible configured check)
baseInterval time.Duration
// each alert rule gets its own channel and routine
registry alertRuleRegistry
maxAttempts int64
clock clock.Clock
heartbeat *alerting.Ticker
// evalApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from evalApplied is handled.
evalAppliedFunc func(models.AlertRuleKey, time.Time)
// stopApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from stopApplied is handled.
stopAppliedFunc func(models.AlertRuleKey)
log log.Logger
evaluator eval.Evaluator
ruleStore store.RuleStore
instanceStore store.InstanceStore
dataService *tsdb.Service
notifier Notifier
metrics *metrics.Metrics
}
// SchedulerCfg is the scheduler configuration.
type SchedulerCfg struct {
C clock.Clock
BaseInterval time.Duration
Logger log.Logger
EvalAppliedFunc func(models.AlertRuleKey, time.Time)
MaxAttempts int64
StopAppliedFunc func(models.AlertRuleKey)
Evaluator eval.Evaluator
RuleStore store.RuleStore
InstanceStore store.InstanceStore
Notifier Notifier
Metrics *metrics.Metrics
}
// NewScheduler returns a new schedule.
func NewScheduler(cfg SchedulerCfg, dataService *tsdb.Service) *schedule {
ticker := alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
sch := schedule{
registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]alertRuleInfo)},
maxAttempts: cfg.MaxAttempts,
clock: cfg.C,
baseInterval: cfg.BaseInterval,
log: cfg.Logger,
heartbeat: ticker,
evalAppliedFunc: cfg.EvalAppliedFunc,
stopAppliedFunc: cfg.StopAppliedFunc,
evaluator: cfg.Evaluator,
ruleStore: cfg.RuleStore,
instanceStore: cfg.InstanceStore,
dataService: dataService,
notifier: cfg.Notifier,
metrics: cfg.Metrics,
}
return &sch
}
func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
sch.clock = cfg.C
sch.baseInterval = cfg.BaseInterval
sch.heartbeat = alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
sch.evalAppliedFunc = cfg.EvalAppliedFunc
sch.stopAppliedFunc = cfg.StopAppliedFunc
}
func (sch *schedule) evalApplied(alertDefKey models.AlertRuleKey, now time.Time) {
if sch.evalAppliedFunc == nil {
return
}
sch.evalAppliedFunc(alertDefKey, now)
}
func (sch *schedule) stopApplied(alertDefKey models.AlertRuleKey) {
if sch.stopAppliedFunc == nil {
return
}
sch.stopAppliedFunc(alertDefKey)
}
func (sch *schedule) Pause() error {
if sch == nil {
return fmt.Errorf("scheduler is not initialised")
}
sch.heartbeat.Pause()
sch.log.Info("alert rule scheduler paused", "now", sch.clock.Now())
return nil
}
func (sch *schedule) Unpause() error {
if sch == nil {
return fmt.Errorf("scheduler is not initialised")
}
sch.heartbeat.Unpause()
sch.log.Info("alert rule scheduler unpaused", "now", sch.clock.Now())
return nil
}
func (sch *schedule) Ticker(grafanaCtx context.Context, stateManager *state.Manager) error {
dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx)
for {
select {
case tick := <-sch.heartbeat.C:
tickNum := tick.Unix() / int64(sch.baseInterval.Seconds())
alertRules := sch.fetchAllDetails()
sch.log.Debug("alert rules fetched", "count", len(alertRules))
// registeredDefinitions is a map used for finding deleted alert rules
// initially it is assigned to all known alert rules from the previous cycle
// each alert rule found also in this cycle is removed
// so, at the end, the remaining registered alert rules are the deleted ones
registeredDefinitions := sch.registry.keyMap()
type readyToRunItem struct {
key models.AlertRuleKey
ruleInfo alertRuleInfo
}
readyToRun := make([]readyToRunItem, 0)
for _, item := range alertRules {
key := item.GetKey()
itemVersion := item.Version
newRoutine := !sch.registry.exists(key)
ruleInfo := sch.registry.getOrCreateInfo(key, itemVersion)
invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0
if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error {
return sch.ruleRoutine(ctx, key, ruleInfo.evalCh, ruleInfo.stopCh, stateManager)
})
}
if invalidInterval {
// this is expected to be always false
// give that we validate interval during alert rule updates
sch.log.Debug("alert rule with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", sch.baseInterval)
continue
}
itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
readyToRun = append(readyToRun, readyToRunItem{key: key, ruleInfo: ruleInfo})
}
// remove the alert rule from the registered alert rules
delete(registeredDefinitions, key)
}
var step int64 = 0
if len(readyToRun) > 0 {
step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun))
}
for i := range readyToRun {
item := readyToRun[i]
time.AfterFunc(time.Duration(int64(i)*step), func() {
item.ruleInfo.evalCh <- &evalContext{now: tick, version: item.ruleInfo.version}
})
}
// unregister and stop routines of the deleted alert rules
for key := range registeredDefinitions {
ruleInfo, err := sch.registry.get(key)
if err != nil {
sch.log.Error("failed to get alert rule routine information", "err", err)
continue
}
ruleInfo.stopCh <- struct{}{}
sch.registry.del(key)
}
case <-grafanaCtx.Done():
waitErr := dispatcherGroup.Wait()
orgIds, err := sch.instanceStore.FetchOrgIds()
if err != nil {
sch.log.Error("unable to fetch orgIds", "msg", err.Error())
}
for _, v := range orgIds {
sch.saveAlertStates(stateManager.GetAll(v))
}
stateManager.Close()
return waitErr
}
}
}
func (sch *schedule) sendAlerts(alerts apimodels.PostableAlerts) error {
return sch.notifier.PutAlerts(alerts)
}
func (sch *schedule) saveAlertStates(states []*state.State) {
sch.log.Debug("saving alert states", "count", len(states))
for _, s := range states {
cmd := models.SaveAlertInstanceCommand{
RuleOrgID: s.OrgID,
RuleUID: s.AlertRuleUID,
Labels: models.InstanceLabels(s.Labels),
State: models.InstanceStateType(s.State.String()),
LastEvalTime: s.LastEvaluationTime,
CurrentStateSince: s.StartsAt,
CurrentStateEnd: s.EndsAt,
}
err := sch.instanceStore.SaveAlertInstance(&cmd)
if err != nil {
sch.log.Error("failed to save alert state", "uid", s.AlertRuleUID, "orgId", s.OrgID, "labels", s.Labels.String(), "state", s.State.String(), "msg", err.Error())
}
}
}
func (sch *schedule) WarmStateCache(st *state.Manager) {
sch.log.Info("warming cache for startup")
st.ResetCache()
orgIds, err := sch.instanceStore.FetchOrgIds()
if err != nil {
sch.log.Error("unable to fetch orgIds", "msg", err.Error())
}
var states []*state.State
for _, orgId := range orgIds {
// Get Rules
ruleCmd := models.ListAlertRulesQuery{
OrgID: orgId,
}
if err := sch.ruleStore.GetOrgAlertRules(&ruleCmd); err != nil {
sch.log.Error("unable to fetch previous state", "msg", err.Error())
}
ruleByUID := make(map[string]*models.AlertRule, len(ruleCmd.Result))
for _, rule := range ruleCmd.Result {
ruleByUID[rule.UID] = rule
}
// Get Instances
cmd := models.ListAlertInstancesQuery{
RuleOrgID: orgId,
}
if err := sch.instanceStore.ListAlertInstances(&cmd); err != nil {
sch.log.Error("unable to fetch previous state", "msg", err.Error())
}
for _, entry := range cmd.Result {
ruleForEntry, ok := ruleByUID[entry.RuleUID]
if !ok {
sch.log.Error("rule not found for instance, ignoring", "rule", entry.RuleUID)
continue
}
lbs := map[string]string(entry.Labels)
cacheId, err := entry.Labels.StringKey()
if err != nil {
sch.log.Error("error getting cacheId for entry", "msg", err.Error())
}
stateForEntry := &state.State{
AlertRuleUID: entry.RuleUID,
OrgID: entry.RuleOrgID,
CacheId: cacheId,
Labels: lbs,
State: translateInstanceState(entry.CurrentState),
Results: []state.Evaluation{},
StartsAt: entry.CurrentStateSince,
EndsAt: entry.CurrentStateEnd,
LastEvaluationTime: entry.LastEvalTime,
Annotations: ruleForEntry.Annotations,
}
states = append(states, stateForEntry)
}
}
st.Put(states)
}
func translateInstanceState(state models.InstanceStateType) eval.State {
switch {
case state == models.InstanceStateFiring:
return eval.Alerting
case state == models.InstanceStateNormal:
return eval.Normal
default:
return eval.Error
}
}
type alertRuleRegistry struct {
mu sync.Mutex
alertRuleInfo map[models.AlertRuleKey]alertRuleInfo
}
// getOrCreateInfo returns the channel for the specific alert rule
// if it does not exists creates one and returns it
func (r *alertRuleRegistry) getOrCreateInfo(key models.AlertRuleKey, ruleVersion int64) alertRuleInfo {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
r.alertRuleInfo[key] = alertRuleInfo{evalCh: make(chan *evalContext), stopCh: make(chan struct{}), version: ruleVersion}
return r.alertRuleInfo[key]
}
info.version = ruleVersion
r.alertRuleInfo[key] = info
return info
}
// get returns the channel for the specific alert rule
// if the key does not exist returns an error
func (r *alertRuleRegistry) get(key models.AlertRuleKey) (*alertRuleInfo, error) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
return nil, fmt.Errorf("%v key not found", key)
}
return &info, nil
}
func (r *alertRuleRegistry) exists(key models.AlertRuleKey) bool {
r.mu.Lock()
defer r.mu.Unlock()
_, ok := r.alertRuleInfo[key]
return ok
}
func (r *alertRuleRegistry) del(key models.AlertRuleKey) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.alertRuleInfo, key)
}
func (r *alertRuleRegistry) iter() <-chan models.AlertRuleKey {
c := make(chan models.AlertRuleKey)
f := func() {
r.mu.Lock()
defer r.mu.Unlock()
for k := range r.alertRuleInfo {
c <- k
}
close(c)
}
go f()
return c
}
func (r *alertRuleRegistry) keyMap() map[models.AlertRuleKey]struct{} {
definitionsIDs := make(map[models.AlertRuleKey]struct{})
for k := range r.iter() {
definitionsIDs[k] = struct{}{}
}
return definitionsIDs
}
type alertRuleInfo struct {
evalCh chan *evalContext
stopCh chan struct{}
version int64
}
type evalContext struct {
now time.Time
version int64
}