Alerting: Scheduler to use AlertRule (#52354)

* update GetAlertRulesForSchedulingQuery to return results of type AlertRule
* update the fetcher utilities and the registry to support AlertRule
* make alertRuleInfo carry the alert rule instead of just its version
* update the updateCh handler of ruleRoutine to only clean up the state; the updated rule is provided with the next evaluation (see the sketch after this list)
* update the evalCh handler of ruleRoutine to use the rule from the message, clear the state and refresh the extra labels

* remove an unused function in ruleRoutine
* remove the unused model SchedulableAlertRule

* store the rule version in ruleRoutine instead of the whole rule
* do not call the sender when there is nothing to send
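To make the last few bullets concrete, here is a minimal, self-contained Go sketch of the new message flow. It is illustrative only, not the Grafana code itself: AlertRule, evaluation, ruleVersion and ruleRoutine are simplified stand-ins for the types in the diff below, and evaluate/clearState are hypothetical placeholders for the evaluator and the state manager.

package main

import (
	"context"
	"fmt"
	"time"
)

// Simplified stand-in for ngalert's models.AlertRule.
type AlertRule struct {
	UID     string
	Title   string
	Version int64
}

// evaluation now carries the rule itself instead of just its version.
type evaluation struct {
	scheduledAt time.Time
	rule        *AlertRule
}

// ruleVersion is what the API pushes on updateCh when a rule changes.
type ruleVersion int64

// ruleRoutine mirrors the control flow this commit introduces.
func ruleRoutine(ctx context.Context, evalCh <-chan *evaluation, updateCh <-chan ruleVersion) {
	var currentRuleVersion int64
	// Placeholder for clearing the rule's state and expiring its firing alerts.
	clearState := func() { fmt.Println("state cleared") }

	for {
		select {
		case v := <-updateCh:
			// The update handler no longer re-fetches the rule from the store;
			// it only clears state. The fresh rule arrives with the next evaluation.
			if currentRuleVersion >= int64(v) {
				continue // already evaluating this (or a newer) version
			}
			clearState()
		case e, ok := <-evalCh:
			if !ok {
				return
			}
			if currentRuleVersion != e.rule.Version {
				if currentRuleVersion > 0 { // do not clear state when the loop has just started
					clearState()
				}
				currentRuleVersion = e.rule.Version
			}
			alerts := evaluate(e) // placeholder for ConditionEval + ProcessEvalResults
			if len(alerts) > 0 {  // do not call the sender if there is nothing to send
				fmt.Printf("sending %d alerts for %q\n", len(alerts), e.rule.Title)
			}
		case <-ctx.Done():
			return
		}
	}
}

// evaluate is a hypothetical placeholder that returns the alerts to notify about.
func evaluate(e *evaluation) []string {
	fmt.Printf("evaluating %q v%d at %s\n", e.rule.Title, e.rule.Version, e.scheduledAt.Format(time.RFC3339))
	return nil // pretend nothing is firing
}

func main() {
	evalCh := make(chan *evaluation)
	updateCh := make(chan ruleVersion)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go ruleRoutine(ctx, evalCh, updateCh)

	v1 := &AlertRule{UID: "foo", Title: "demo rule", Version: 1}
	evalCh <- &evaluation{scheduledAt: time.Now(), rule: v1}

	updateCh <- ruleVersion(2) // the rule changed: state is cleared

	v2 := &AlertRule{UID: "foo", Title: "demo rule", Version: 2}
	evalCh <- &evaluation{scheduledAt: time.Now(), rule: v2} // the next tick brings the new version

	time.Sleep(100 * time.Millisecond)
}

The practical effect, visible in the schedule.go hunk below, is that ruleRoutine no longer needs to call GetAlertRuleByUID at all: the scheduler already fetches every rule for scheduling, so it hands the rule to the routine with each tick, and an update notification only has to invalidate state.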
Yuriy Tseretyan 3 years ago committed by GitHub
parent 58d8d7c44d
commit a081764fd8
  1. pkg/services/ngalert/models/alert_rule.go (18 changed lines)
  2. pkg/services/ngalert/schedule/fetcher.go (4 changed lines)
  3. pkg/services/ngalert/schedule/fetcher_test.go (10 changed lines)
  4. pkg/services/ngalert/schedule/registry.go (26 changed lines)
  5. pkg/services/ngalert/schedule/registry_test.go (40 changed lines)
  6. pkg/services/ngalert/schedule/schedule.go (94 changed lines)
  7. pkg/services/ngalert/schedule/schedule_unit_test.go (557 changed lines)
  8. pkg/services/ngalert/store/alert_rule.go (4 changed lines)
  9. pkg/services/ngalert/store/testing.go (9 changed lines)

@ -137,17 +137,6 @@ type AlertRule struct {
Labels map[string]string Labels map[string]string
} }
type SchedulableAlertRule struct {
Title string
UID string `xorm:"uid"`
OrgID int64 `xorm:"org_id"`
IntervalSeconds int64
Version int64
NamespaceUID string `xorm:"namespace_uid"`
RuleGroup string
RuleGroupIndex int `xorm:"rule_group_idx"`
}
type LabelOption func(map[string]string) type LabelOption func(map[string]string)
func WithoutInternalLabels() LabelOption { func WithoutInternalLabels() LabelOption {
@ -228,11 +217,6 @@ func (alertRule *AlertRule) GetGroupKey() AlertRuleGroupKey {
return AlertRuleGroupKey{OrgID: alertRule.OrgID, NamespaceUID: alertRule.NamespaceUID, RuleGroup: alertRule.RuleGroup} return AlertRuleGroupKey{OrgID: alertRule.OrgID, NamespaceUID: alertRule.NamespaceUID, RuleGroup: alertRule.RuleGroup}
} }
// GetKey returns the alert definitions identifier
func (alertRule *SchedulableAlertRule) GetKey() AlertRuleKey {
return AlertRuleKey{OrgID: alertRule.OrgID, UID: alertRule.UID}
}
// PreSave sets default values and loads the updated model for each alert query. // PreSave sets default values and loads the updated model for each alert query.
func (alertRule *AlertRule) PreSave(timeNow func() time.Time) error { func (alertRule *AlertRule) PreSave(timeNow func() time.Time) error {
for i, q := range alertRule.Data { for i, q := range alertRule.Data {
@ -316,7 +300,7 @@ type ListAlertRulesQuery struct {
} }
type GetAlertRulesForSchedulingQuery struct { type GetAlertRulesForSchedulingQuery struct {
Result []*SchedulableAlertRule Result []*AlertRule
} }
// ListNamespaceAlertRulesQuery is the query for listing namespace alert rules // ListNamespaceAlertRulesQuery is the query for listing namespace alert rules

@ -13,7 +13,7 @@ import (
// hashUIDs returns a fnv64 hash of the UIDs for all alert rules. // hashUIDs returns a fnv64 hash of the UIDs for all alert rules.
// The order of the alert rules does not matter as hashUIDs sorts // The order of the alert rules does not matter as hashUIDs sorts
// the UIDs in increasing order. // the UIDs in increasing order.
func hashUIDs(alertRules []*models.SchedulableAlertRule) uint64 { func hashUIDs(alertRules []*models.AlertRule) uint64 {
h := fnv.New64() h := fnv.New64()
for _, uid := range sortedUIDs(alertRules) { for _, uid := range sortedUIDs(alertRules) {
// We can ignore err as fnv64 does not return an error // We can ignore err as fnv64 does not return an error
@ -24,7 +24,7 @@ func hashUIDs(alertRules []*models.SchedulableAlertRule) uint64 {
} }
// sortedUIDs returns a slice of sorted UIDs. // sortedUIDs returns a slice of sorted UIDs.
func sortedUIDs(alertRules []*models.SchedulableAlertRule) []string { func sortedUIDs(alertRules []*models.AlertRule) []string {
uids := make([]string, 0, len(alertRules)) uids := make([]string, 0, len(alertRules))
for _, alertRule := range alertRules { for _, alertRule := range alertRules {
uids = append(uids, alertRule.UID) uids = append(uids, alertRule.UID)

@ -9,18 +9,18 @@ import (
) )
func TestHashUIDs(t *testing.T) { func TestHashUIDs(t *testing.T) {
r := []*models.SchedulableAlertRule{{UID: "foo"}, {UID: "bar"}} r := []*models.AlertRule{{UID: "foo"}, {UID: "bar"}}
assert.Equal(t, uint64(0xade76f55c76a1c48), hashUIDs(r)) assert.Equal(t, uint64(0xade76f55c76a1c48), hashUIDs(r))
// expect the same hash irrespective of order // expect the same hash irrespective of order
r = []*models.SchedulableAlertRule{{UID: "bar"}, {UID: "foo"}} r = []*models.AlertRule{{UID: "bar"}, {UID: "foo"}}
assert.Equal(t, uint64(0xade76f55c76a1c48), hashUIDs(r)) assert.Equal(t, uint64(0xade76f55c76a1c48), hashUIDs(r))
// expect a different hash // expect a different hash
r = []*models.SchedulableAlertRule{{UID: "bar"}} r = []*models.AlertRule{{UID: "bar"}}
assert.Equal(t, uint64(0xd8d9a5186bad3880), hashUIDs(r)) assert.Equal(t, uint64(0xd8d9a5186bad3880), hashUIDs(r))
// slice with no items // slice with no items
r = []*models.SchedulableAlertRule{} r = []*models.AlertRule{}
assert.Equal(t, uint64(0xcbf29ce484222325), hashUIDs(r)) assert.Equal(t, uint64(0xcbf29ce484222325), hashUIDs(r))
// a different slice with no items should have the same hash // a different slice with no items should have the same hash
r = []*models.SchedulableAlertRule{} r = []*models.AlertRule{}
assert.Equal(t, uint64(0xcbf29ce484222325), hashUIDs(r)) assert.Equal(t, uint64(0xcbf29ce484222325), hashUIDs(r))
} }

@ -92,7 +92,7 @@ func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
// - true when message was sent // - true when message was sent
// - false when the send operation is stopped // - false when the send operation is stopped
// the second element contains a dropped message that was sent by a concurrent sender. // the second element contains a dropped message that was sent by a concurrent sender.
func (a *alertRuleInfo) eval(t time.Time, version int64) (bool, *evaluation) { func (a *alertRuleInfo) eval(t time.Time, rule *models.AlertRule) (bool, *evaluation) {
// read the channel in unblocking manner to make sure that there is no concurrent send operation. // read the channel in unblocking manner to make sure that there is no concurrent send operation.
var droppedMsg *evaluation var droppedMsg *evaluation
select { select {
@ -103,7 +103,7 @@ func (a *alertRuleInfo) eval(t time.Time, version int64) (bool, *evaluation) {
select { select {
case a.evalCh <- &evaluation{ case a.evalCh <- &evaluation{
scheduledAt: t, scheduledAt: t,
version: version, rule: rule,
}: }:
return true, droppedMsg return true, droppedMsg
case <-a.ctx.Done(): case <-a.ctx.Done():
@ -136,52 +136,52 @@ func (a *alertRuleInfo) update(lastVersion ruleVersion) bool {
type evaluation struct { type evaluation struct {
scheduledAt time.Time scheduledAt time.Time
version int64 rule *models.AlertRule
} }
type schedulableAlertRulesRegistry struct { type alertRulesRegistry struct {
rules map[models.AlertRuleKey]*models.SchedulableAlertRule rules map[models.AlertRuleKey]*models.AlertRule
mu sync.Mutex mu sync.Mutex
} }
// all returns all rules in the registry. // all returns all rules in the registry.
func (r *schedulableAlertRulesRegistry) all() []*models.SchedulableAlertRule { func (r *alertRulesRegistry) all() []*models.AlertRule {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
result := make([]*models.SchedulableAlertRule, 0, len(r.rules)) result := make([]*models.AlertRule, 0, len(r.rules))
for _, rule := range r.rules { for _, rule := range r.rules {
result = append(result, rule) result = append(result, rule)
} }
return result return result
} }
func (r *schedulableAlertRulesRegistry) get(k models.AlertRuleKey) *models.SchedulableAlertRule { func (r *alertRulesRegistry) get(k models.AlertRuleKey) *models.AlertRule {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
return r.rules[k] return r.rules[k]
} }
// set replaces all rules in the registry. // set replaces all rules in the registry.
func (r *schedulableAlertRulesRegistry) set(rules []*models.SchedulableAlertRule) { func (r *alertRulesRegistry) set(rules []*models.AlertRule) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.rules = make(map[models.AlertRuleKey]*models.SchedulableAlertRule) r.rules = make(map[models.AlertRuleKey]*models.AlertRule)
for _, rule := range rules { for _, rule := range rules {
r.rules[rule.GetKey()] = rule r.rules[rule.GetKey()] = rule
} }
} }
// update inserts or replaces a rule in the registry. // update inserts or replaces a rule in the registry.
func (r *schedulableAlertRulesRegistry) update(rule *models.SchedulableAlertRule) { func (r *alertRulesRegistry) update(rule *models.AlertRule) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.rules[rule.GetKey()] = rule r.rules[rule.GetKey()] = rule
} }
// del removes pair that has specific key from schedulableAlertRulesRegistry. // del removes pair that has specific key from alertRulesRegistry.
// Returns 2-tuple where the first element is value of the removed pair // Returns 2-tuple where the first element is value of the removed pair
// and the second element indicates whether element with the specified key existed. // and the second element indicates whether element with the specified key existed.
func (r *schedulableAlertRulesRegistry) del(k models.AlertRuleKey) (*models.SchedulableAlertRule, bool) { func (r *alertRulesRegistry) del(k models.AlertRuleKey) (*models.AlertRule, bool) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
rule, ok := r.rules[k] rule, ok := r.rules[k]

@ -91,14 +91,14 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
r := newAlertRuleInfo(context.Background()) r := newAlertRuleInfo(context.Background())
expected := time.Now() expected := time.Now()
resultCh := make(chan evalResponse) resultCh := make(chan evalResponse)
version := rand.Int63() rule := models.AlertRuleGen()()
go func() { go func() {
result, dropped := r.eval(expected, version) result, dropped := r.eval(expected, rule)
resultCh <- evalResponse{result, dropped} resultCh <- evalResponse{result, dropped}
}() }()
select { select {
case ctx := <-r.evalCh: case ctx := <-r.evalCh:
require.Equal(t, version, ctx.version) require.Equal(t, rule, ctx.rule)
require.Equal(t, expected, ctx.scheduledAt) require.Equal(t, expected, ctx.scheduledAt)
result := <-resultCh result := <-resultCh
require.True(t, result.success) require.True(t, result.success)
@ -113,12 +113,12 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
time2 := time.UnixMilli(rand.Int63n(math.MaxInt64)) time2 := time.UnixMilli(rand.Int63n(math.MaxInt64))
resultCh1 := make(chan evalResponse) resultCh1 := make(chan evalResponse)
resultCh2 := make(chan evalResponse) resultCh2 := make(chan evalResponse)
version := rand.Int63() rule := models.AlertRuleGen()()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
wg.Done() wg.Done()
result, dropped := r.eval(time1, version) result, dropped := r.eval(time1, rule)
wg.Done() wg.Done()
resultCh1 <- evalResponse{result, dropped} resultCh1 <- evalResponse{result, dropped}
}() }()
@ -126,7 +126,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
go func() { go func() {
wg.Done() wg.Done()
result, dropped := r.eval(time2, version) result, dropped := r.eval(time2, rule)
resultCh2 <- evalResponse{result, dropped} resultCh2 <- evalResponse{result, dropped}
}() }()
wg.Wait() // at this point tick 1 has already been dropped wg.Wait() // at this point tick 1 has already been dropped
@ -147,8 +147,9 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
t.Run("eval should exit when context is cancelled", func(t *testing.T) { t.Run("eval should exit when context is cancelled", func(t *testing.T) {
r := newAlertRuleInfo(context.Background()) r := newAlertRuleInfo(context.Background())
resultCh := make(chan evalResponse) resultCh := make(chan evalResponse)
rule := models.AlertRuleGen()()
go func() { go func() {
result, dropped := r.eval(time.Now(), rand.Int63()) result, dropped := r.eval(time.Now(), rule)
resultCh <- evalResponse{result, dropped} resultCh <- evalResponse{result, dropped}
}() }()
runtime.Gosched() runtime.Gosched()
@ -171,7 +172,8 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
t.Run("eval should do nothing", func(t *testing.T) { t.Run("eval should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background()) r := newAlertRuleInfo(context.Background())
r.stop() r.stop()
success, dropped := r.eval(time.Now(), rand.Int63()) rule := models.AlertRuleGen()()
success, dropped := r.eval(time.Now(), rule)
require.False(t, success) require.False(t, success)
require.Nilf(t, dropped, "expected no dropped evaluations but got one") require.Nilf(t, dropped, "expected no dropped evaluations but got one")
}) })
@ -209,7 +211,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
case 1: case 1:
r.update(ruleVersion(rand.Int63())) r.update(ruleVersion(rand.Int63()))
case 2: case 2:
r.eval(time.Now(), rand.Int63()) r.eval(time.Now(), models.AlertRuleGen()())
case 3: case 3:
r.stop() r.stop()
} }
@ -223,39 +225,39 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
} }
func TestSchedulableAlertRulesRegistry(t *testing.T) { func TestSchedulableAlertRulesRegistry(t *testing.T) {
r := schedulableAlertRulesRegistry{rules: make(map[models.AlertRuleKey]*models.SchedulableAlertRule)} r := alertRulesRegistry{rules: make(map[models.AlertRuleKey]*models.AlertRule)}
assert.Len(t, r.all(), 0) assert.Len(t, r.all(), 0)
// replace all rules in the registry with foo // replace all rules in the registry with foo
r.set([]*models.SchedulableAlertRule{{OrgID: 1, UID: "foo", Version: 1}}) r.set([]*models.AlertRule{{OrgID: 1, UID: "foo", Version: 1}})
assert.Len(t, r.all(), 1) assert.Len(t, r.all(), 1)
foo := r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"}) foo := r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"})
require.NotNil(t, foo) require.NotNil(t, foo)
assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "foo", Version: 1}, *foo) assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 1}, *foo)
// update foo to a newer version // update foo to a newer version
r.update(&models.SchedulableAlertRule{OrgID: 1, UID: "foo", Version: 2}) r.update(&models.AlertRule{OrgID: 1, UID: "foo", Version: 2})
assert.Len(t, r.all(), 1) assert.Len(t, r.all(), 1)
foo = r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"}) foo = r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"})
require.NotNil(t, foo) require.NotNil(t, foo)
assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo) assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo)
// update bar which does not exist in the registry // update bar which does not exist in the registry
r.update(&models.SchedulableAlertRule{OrgID: 1, UID: "bar", Version: 1}) r.update(&models.AlertRule{OrgID: 1, UID: "bar", Version: 1})
assert.Len(t, r.all(), 2) assert.Len(t, r.all(), 2)
foo = r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"}) foo = r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"})
require.NotNil(t, foo) require.NotNil(t, foo)
assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo) assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo)
bar := r.get(models.AlertRuleKey{OrgID: 1, UID: "bar"}) bar := r.get(models.AlertRuleKey{OrgID: 1, UID: "bar"})
require.NotNil(t, foo) require.NotNil(t, foo)
assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "bar", Version: 1}, *bar) assert.Equal(t, models.AlertRule{OrgID: 1, UID: "bar", Version: 1}, *bar)
// replace all rules in the registry with baz // replace all rules in the registry with baz
r.set([]*models.SchedulableAlertRule{{OrgID: 1, UID: "baz", Version: 1}}) r.set([]*models.AlertRule{{OrgID: 1, UID: "baz", Version: 1}})
assert.Len(t, r.all(), 1) assert.Len(t, r.all(), 1)
baz := r.get(models.AlertRuleKey{OrgID: 1, UID: "baz"}) baz := r.get(models.AlertRuleKey{OrgID: 1, UID: "baz"})
require.NotNil(t, baz) require.NotNil(t, baz)
assert.Equal(t, models.SchedulableAlertRule{OrgID: 1, UID: "baz", Version: 1}, *baz) assert.Equal(t, models.AlertRule{OrgID: 1, UID: "baz", Version: 1}, *baz)
assert.Nil(t, r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"})) assert.Nil(t, r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"}))
assert.Nil(t, r.get(models.AlertRuleKey{OrgID: 1, UID: "bar"})) assert.Nil(t, r.get(models.AlertRuleKey{OrgID: 1, UID: "bar"}))

@ -96,7 +96,7 @@ type schedule struct {
// evaluation in the current tick. The evaluation of an alert rule in the // evaluation in the current tick. The evaluation of an alert rule in the
// current tick depends on its evaluation interval and when it was // current tick depends on its evaluation interval and when it was
// last evaluated. // last evaluated.
schedulableAlertRules schedulableAlertRulesRegistry schedulableAlertRules alertRulesRegistry
// bus is used to hook into events that should cause rule updates. // bus is used to hook into events that should cause rule updates.
bus bus.Bus bus bus.Bus
@ -137,7 +137,7 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager
disableGrafanaFolder: cfg.Cfg.ReservedLabels.IsReservedLabelDisabled(ngmodels.FolderTitleLabel), disableGrafanaFolder: cfg.Cfg.ReservedLabels.IsReservedLabelDisabled(ngmodels.FolderTitleLabel),
stateManager: stateManager, stateManager: stateManager,
minRuleInterval: cfg.Cfg.MinInterval, minRuleInterval: cfg.Cfg.MinInterval,
schedulableAlertRules: schedulableAlertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.SchedulableAlertRule)}, schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)},
bus: bus, bus: bus,
alertsSender: cfg.AlertSender, alertsSender: cfg.AlertSender,
} }
@ -240,16 +240,13 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules))) sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules)))
type readyToRunItem struct { type readyToRunItem struct {
key ngmodels.AlertRuleKey
ruleName string
ruleInfo *alertRuleInfo ruleInfo *alertRuleInfo
version int64 rule *ngmodels.AlertRule
} }
readyToRun := make([]readyToRunItem, 0) readyToRun := make([]readyToRunItem, 0)
for _, item := range alertRules { for _, item := range alertRules {
key := item.GetKey() key := item.GetKey()
itemVersion := item.Version
ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key) ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key)
// enforce minimum evaluation interval // enforce minimum evaluation interval
@ -275,7 +272,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds()) itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
readyToRun = append(readyToRun, readyToRunItem{key: key, ruleName: item.Title, ruleInfo: ruleInfo, version: itemVersion}) readyToRun = append(readyToRun, readyToRunItem{ruleInfo: ruleInfo, rule: item})
} }
// remove the alert rule from the registered alert rules // remove the alert rule from the registered alert rules
@ -291,15 +288,16 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
item := readyToRun[i] item := readyToRun[i]
time.AfterFunc(time.Duration(int64(i)*step), func() { time.AfterFunc(time.Duration(int64(i)*step), func() {
success, dropped := item.ruleInfo.eval(tick, item.version) key := item.rule.GetKey()
success, dropped := item.ruleInfo.eval(tick, item.rule)
if !success { if !success {
sch.log.Debug("scheduled evaluation was canceled because evaluation routine was stopped", "uid", item.key.UID, "org", item.key.OrgID, "time", tick) sch.log.Debug("scheduled evaluation was canceled because evaluation routine was stopped", "uid", key.UID, "org", key.OrgID, "time", tick)
return return
} }
if dropped != nil { if dropped != nil {
sch.log.Warn("Alert rule evaluation is too slow - dropped tick", "uid", item.key.UID, "org", item.key.OrgID, "time", tick) sch.log.Warn("Alert rule evaluation is too slow - dropped tick", "uid", key.UID, "org", key.OrgID, "time", tick)
orgID := fmt.Sprint(item.key.OrgID) orgID := fmt.Sprint(key.OrgID)
sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.ruleName).Inc() sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.rule.Title).Inc()
} }
}) })
} }
@ -341,31 +339,16 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
states := sch.stateManager.GetStatesForRuleUID(key.OrgID, key.UID) states := sch.stateManager.GetStatesForRuleUID(key.OrgID, key.UID)
expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock) expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
sch.stateManager.RemoveByRuleUID(key.OrgID, key.UID) sch.stateManager.RemoveByRuleUID(key.OrgID, key.UID)
sch.alertsSender.Send(key, expiredAlerts) if len(expiredAlerts.PostableAlerts) > 0 {
} sch.alertsSender.Send(key, expiredAlerts)
updateRule := func(ctx context.Context, oldRule *ngmodels.AlertRule) (*ngmodels.AlertRule, map[string]string, error) {
q := ngmodels.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID}
err := sch.ruleStore.GetAlertRuleByUID(ctx, &q)
if err != nil {
logger.Error("failed to fetch alert rule", "err", err)
return nil, nil, err
}
if oldRule != nil && oldRule.Version < q.Result.Version {
clearState()
}
newLabels, err := sch.getRuleExtraLabels(ctx, q.Result)
if err != nil {
return nil, nil, err
} }
return q.Result, newLabels, nil
} }
evaluate := func(ctx context.Context, r *ngmodels.AlertRule, extraLabels map[string]string, attempt int64, e *evaluation) { evaluate := func(ctx context.Context, extraLabels map[string]string, attempt int64, e *evaluation) {
logger := logger.New("version", r.Version, "attempt", attempt, "now", e.scheduledAt) logger := logger.New("version", e.rule.Version, "attempt", attempt, "now", e.scheduledAt)
start := sch.clock.Now() start := sch.clock.Now()
results := sch.evaluator.ConditionEval(ctx, r.GetEvalCondition(), e.scheduledAt) results := sch.evaluator.ConditionEval(ctx, e.rule.GetEvalCondition(), e.scheduledAt)
dur := sch.clock.Now().Sub(start) dur := sch.clock.Now().Sub(start)
evalTotal.Inc() evalTotal.Inc()
evalDuration.Observe(dur.Seconds()) evalDuration.Observe(dur.Seconds())
@ -376,10 +359,12 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
logger.Debug("alert rule evaluated", "results", results, "duration", dur) logger.Debug("alert rule evaluated", "results", results, "duration", dur)
} }
processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, r, results, extraLabels) processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, extraLabels)
sch.saveAlertStates(ctx, processedStates) sch.saveAlertStates(ctx, processedStates)
alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL) alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
sch.alertsSender.Send(key, alerts) if len(alerts.PostableAlerts) > 0 {
sch.alertsSender.Send(key, alerts)
}
} }
retryIfError := func(f func(attempt int64) error) error { retryIfError := func(f func(attempt int64) error) error {
@ -395,35 +380,24 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
} }
evalRunning := false evalRunning := false
var currentRule *ngmodels.AlertRule var currentRuleVersion int64 = 0
var extraLabels map[string]string var extraLabels map[string]string
defer sch.stopApplied(key) defer sch.stopApplied(key)
for { for {
select { select {
// used by external services (API) to notify that rule is updated. // used by external services (API) to notify that rule is updated.
case version := <-updateCh: case lastVersion := <-updateCh:
// sometimes it can happen when, for example, the rule evaluation took so long, // sometimes it can happen when, for example, the rule evaluation took so long,
// and there were two concurrent messages in updateCh and evalCh, and the eval's one got processed first. // and there were two concurrent messages in updateCh and evalCh, and the eval's one got processed first.
// therefore, at the time when message from updateCh is processed the current rule will have // therefore, at the time when message from updateCh is processed the current rule will have
// at least the same version (or greater) and the state created for the new version of the rule. // at least the same version (or greater) and the state created for the new version of the rule.
if currentRule != nil && int64(version) <= currentRule.Version { if currentRuleVersion >= int64(lastVersion) {
logger.Info("skip updating rule because its current version is actual", "current_version", currentRule.Version, "new_version", version) logger.Info("skip updating rule because its current version is actual", "version", currentRuleVersion, "new_version", lastVersion)
continue continue
} }
logger.Info("fetching new version of the rule") logger.Info("clearing the state of the rule because version has changed", "version", currentRuleVersion, "new_version", lastVersion)
err := retryIfError(func(attempt int64) error { // clear the state. So the next evaluation will start from the scratch.
newRule, newExtraLabels, err := updateRule(grafanaCtx, currentRule) clearState()
if err != nil {
return err
}
logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
currentRule = newRule
extraLabels = newExtraLabels
return nil
})
if err != nil {
logger.Error("updating rule failed after all retries", "err", err)
}
// evalCh - used by the scheduler to signal that evaluation is needed. // evalCh - used by the scheduler to signal that evaluation is needed.
case ctx, ok := <-evalCh: case ctx, ok := <-evalCh:
if !ok { if !ok {
@ -442,17 +416,21 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
}() }()
err := retryIfError(func(attempt int64) error { err := retryIfError(func(attempt int64) error {
newVersion := ctx.rule.Version
// fetch latest alert rule version // fetch latest alert rule version
if currentRule == nil || currentRule.Version < ctx.version { if currentRuleVersion != newVersion {
newRule, newExtraLabels, err := updateRule(grafanaCtx, currentRule) if currentRuleVersion > 0 { // do not clean up state if the eval loop has just started.
logger.Debug("got a new version of alert rule. Clear up the state and refresh extra labels", "version", currentRuleVersion, "new_version", newVersion)
clearState()
}
newLabels, err := sch.getRuleExtraLabels(grafanaCtx, ctx.rule)
if err != nil { if err != nil {
return err return err
} }
currentRule = newRule currentRuleVersion = newVersion
extraLabels = newExtraLabels extraLabels = newLabels
logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
} }
evaluate(grafanaCtx, currentRule, extraLabels, attempt, ctx) evaluate(grafanaCtx, extraLabels, attempt, ctx)
return nil return nil
}) })
if err != nil { if err != nil {

@ -4,11 +4,9 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net/url" "net/url"
"sync"
"testing" "testing"
"time" "time"
@ -48,7 +46,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
instanceStore := &store.FakeInstanceStore{} instanceStore := &store.FakeInstanceStore{}
registry := prometheus.NewPedanticRegistry() registry := prometheus.NewPedanticRegistry()
sch, _ := setupScheduler(t, ruleStore, instanceStore, registry, senderMock) sch := setupScheduler(t, ruleStore, instanceStore, registry, senderMock, nil)
sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) { sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
evalAppliedChan <- t evalAppliedChan <- t
} }
@ -58,10 +56,6 @@ func TestSchedule_ruleRoutine(t *testing.T) {
// normal states do not include NoData and Error because currently it is not possible to perform any sensible test // normal states do not include NoData and Error because currently it is not possible to perform any sensible test
normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending} normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error} allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error}
randomNormalState := func() eval.State {
// pick only supported cases
return normalStates[rand.Intn(3)]
}
for _, evalState := range normalStates { for _, evalState := range normalStates {
// TODO rewrite when we are able to mock/fake state manager // TODO rewrite when we are able to mock/fake state manager
@ -70,7 +64,8 @@ func TestSchedule_ruleRoutine(t *testing.T) {
evalAppliedChan := make(chan time.Time) evalAppliedChan := make(chan time.Time)
sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil) sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState) rule := models.AlertRuleGen(withQueryForState(t, evalState))()
ruleStore.PutRule(context.Background(), rule)
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -82,38 +77,28 @@ func TestSchedule_ruleRoutine(t *testing.T) {
evalChan <- &evaluation{ evalChan <- &evaluation{
scheduledAt: expectedTime, scheduledAt: expectedTime,
version: rule.Version, rule: rule,
} }
actualTime := waitForTimeChannel(t, evalAppliedChan) actualTime := waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime) require.Equal(t, expectedTime, actualTime)
t.Run("it should get rule from database when run the first time", func(t *testing.T) { t.Run("it should add extra labels", func(t *testing.T) {
queries := make([]models.GetAlertRuleByUIDQuery, 0)
for _, op := range ruleStore.RecordedOps {
switch q := op.(type) {
case models.GetAlertRuleByUIDQuery:
queries = append(queries, q)
}
}
require.NotEmptyf(t, queries, "Expected a %T request to rule store but nothing was recorded", models.GetAlertRuleByUIDQuery{})
require.Len(t, queries, 1, "Expected exactly one request of %T but got %d", models.GetAlertRuleByUIDQuery{}, len(queries))
require.Equal(t, rule.UID, queries[0].UID)
require.Equal(t, rule.OrgID, queries[0].OrgID)
})
t.Run("it should get rule folder title from database and attach as label", func(t *testing.T) {
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
folder, _ := ruleStore.GetNamespaceByUID(context.Background(), rule.NamespaceUID, rule.OrgID, nil)
for _, s := range states { for _, s := range states {
require.NotEmptyf(t, s.Labels[models.FolderTitleLabel], "Expected a non-empty title in label %s", models.FolderTitleLabel) assert.Equal(t, rule.UID, s.Labels[models.RuleUIDLabel])
require.Equal(t, s.Labels[models.FolderTitleLabel], ruleStore.Folders[rule.OrgID][0].Title) assert.Equal(t, rule.NamespaceUID, s.Labels[models.NamespaceUIDLabel])
assert.Equal(t, rule.Title, s.Labels[prometheusModel.AlertNameLabel])
assert.Equal(t, folder.Title, s.Labels[models.FolderTitleLabel])
} }
}) })
t.Run("it should process evaluation results via state manager", func(t *testing.T) { t.Run("it should process evaluation results via state manager", func(t *testing.T) {
// TODO rewrite when we are able to mock/fake state manager // TODO rewrite when we are able to mock/fake state manager
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
require.Len(t, states, 1) require.Len(t, states, 1)
s := states[0] s := states[0]
t.Logf("State: %v", s)
require.Equal(t, rule.UID, s.AlertRuleUID) require.Equal(t, rule.UID, s.AlertRuleUID)
require.Len(t, s.Results, 1) require.Len(t, s.Results, 1)
var expectedStatus = evalState var expectedStatus = evalState
@ -148,6 +133,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
require.Equal(t, evalState.String(), string(cmd.State)) require.Equal(t, evalState.String(), string(cmd.State))
require.Equal(t, s.Labels, data.Labels(cmd.Labels)) require.Equal(t, s.Labels, data.Labels(cmd.Labels))
}) })
t.Run("it reports metrics", func(t *testing.T) { t.Run("it reports metrics", func(t *testing.T) {
// duration metric has 0 values because of mocked clock that do not advance // duration metric has 0 values because of mocked clock that do not advance
expectedMetric := fmt.Sprintf( expectedMetric := fmt.Sprintf(
@ -201,298 +187,168 @@ func TestSchedule_ruleRoutine(t *testing.T) {
}) })
}) })
t.Run("should fetch rule from database only if new version is greater than current", func(t *testing.T) { t.Run("when a message is sent to update channel", func(t *testing.T) {
evalChan := make(chan *evaluation) rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
evalAppliedChan := make(chan time.Time)
ctx := context.Background()
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, nil)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
}()
expectedTime := time.UnixMicro(rand.Int63())
evalChan <- &evaluation{
scheduledAt: expectedTime,
version: rule.Version,
}
actualTime := waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
// Now update the rule
newRule := *rule
newRule.Version++
ruleStore.PutRule(ctx, &newRule)
// and call with new version
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
evalChan <- &evaluation{
scheduledAt: expectedTime,
version: newRule.Version,
}
actualTime = waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
queries := make([]models.GetAlertRuleByUIDQuery, 0)
for _, op := range ruleStore.RecordedOps {
switch q := op.(type) {
case models.GetAlertRuleByUIDQuery:
queries = append(queries, q)
}
}
require.Len(t, queries, 2, "Expected exactly two request of %T", models.GetAlertRuleByUIDQuery{})
require.Equal(t, rule.UID, queries[0].UID)
require.Equal(t, rule.OrgID, queries[0].OrgID)
require.Equal(t, rule.UID, queries[1].UID)
require.Equal(t, rule.OrgID, queries[1].OrgID)
})
t.Run("should not fetch rule if version is equal or less than current", func(t *testing.T) {
evalChan := make(chan *evaluation) evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time) evalAppliedChan := make(chan time.Time)
updateChan := make(chan ruleVersion)
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, nil) sender := AlertsSenderMock{}
sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState()) sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
ruleStore.PutRule(context.Background(), rule)
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel) t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion)) _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan)
}() }()
expectedTime := time.UnixMicro(rand.Int63()) // init evaluation loop so it got the rule version
evalChan <- &evaluation{ evalChan <- &evaluation{
scheduledAt: expectedTime, scheduledAt: sch.clock.Now(),
version: rule.Version, rule: rule,
} }
actualTime := waitForTimeChannel(t, evalAppliedChan) waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
// define some state
// try again with the same version states := make([]*state.State, 0, len(allStates))
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second) for _, s := range allStates {
evalChan <- &evaluation{ for i := 0; i < 2; i++ {
scheduledAt: expectedTime, states = append(states, &state.State{
version: rule.Version, AlertRuleUID: rule.UID,
CacheId: util.GenerateShortUID(),
OrgID: rule.OrgID,
State: s,
StartsAt: sch.clock.Now(),
EndsAt: sch.clock.Now().Add(time.Duration(rand.Intn(25)+5) * time.Second),
Labels: rule.Labels,
})
}
} }
actualTime = waitForTimeChannel(t, evalAppliedChan) sch.stateManager.Put(states)
require.Equal(t, expectedTime, actualTime)
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second) states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
evalChan <- &evaluation{ expectedToBeSent := 0
scheduledAt: expectedTime, for _, s := range states {
version: rule.Version - 1, if s.State == eval.Normal || s.State == eval.Pending {
} continue
actualTime = waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
queries := make([]models.GetAlertRuleByUIDQuery, 0)
for _, op := range ruleStore.RecordedOps {
switch q := op.(type) {
case models.GetAlertRuleByUIDQuery:
queries = append(queries, q)
} }
expectedToBeSent++
} }
require.Len(t, queries, 1, "Expected exactly one request of %T", models.GetAlertRuleByUIDQuery{}) require.Greaterf(t, expectedToBeSent, 0, "State manger was expected to return at least one state that can be expired")
})
t.Run("when update channel is not empty", func(t *testing.T) {
t.Run("should fetch the alert rule from database", func(t *testing.T) {
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
updateChan := make(chan ruleVersion)
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, nil) t.Run("should do nothing if version in channel is the same", func(t *testing.T) {
updateChan <- ruleVersion(rule.Version - 1)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), eval.Alerting) // we want the alert to fire
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan)
}()
updateChan <- ruleVersion(rule.Version) updateChan <- ruleVersion(rule.Version)
updateChan <- ruleVersion(rule.Version) // second time just to make sure that previous messages were handled
// wait for command to be executed actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
var queries []interface{} require.Len(t, actualStates, len(states))
require.Eventuallyf(t, func() bool {
queries = ruleStore.GetRecordedCommands(func(cmd interface{}) (interface{}, bool) {
c, ok := cmd.(models.GetAlertRuleByUIDQuery)
return c, ok
})
return len(queries) == 1
}, 5*time.Second, 100*time.Millisecond, "Expected command a single %T to be recorded. All recordings: %#v", models.GetAlertRuleByUIDQuery{}, ruleStore.RecordedOps)
m := queries[0].(models.GetAlertRuleByUIDQuery)
require.Equal(t, rule.UID, m.UID)
require.Equal(t, rule.OrgID, m.OrgID)
// now call evaluation loop to make sure that the rule was persisted
evalChan <- &evaluation{
scheduledAt: time.UnixMicro(rand.Int63()),
version: rule.Version,
}
waitForTimeChannel(t, evalAppliedChan)
queries = ruleStore.GetRecordedCommands(func(cmd interface{}) (interface{}, bool) { sender.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
c, ok := cmd.(models.GetAlertRuleByUIDQuery)
return c, ok
})
require.Lenf(t, queries, 1, "evaluation loop requested a rule from database but it should not be")
}) })
t.Run("should retry when database fails", func(t *testing.T) { t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
evalAppliedChan := make(chan time.Time) updateChan <- ruleVersion(rule.Version + rand.Int63n(1000) + 1)
updateChan := make(chan ruleVersion)
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, nil)
sch.maxAttempts = rand.Int63n(4) + 1
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
go func() { require.Eventually(t, func() bool {
ctx, cancel := context.WithCancel(context.Background()) return len(sender.Calls) > 0
t.Cleanup(cancel) }, 5*time.Second, 100*time.Millisecond)
_ = sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), updateChan)
}()
ruleStore.Hook = func(cmd interface{}) error {
if _, ok := cmd.(models.GetAlertRuleByUIDQuery); !ok {
return nil
}
return errors.New("TEST")
}
updateChan <- ruleVersion(rule.Version)
var queries []interface{} require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
require.Eventuallyf(t, func() bool { sender.AssertNumberOfCalls(t, "Send", 1)
queries = ruleStore.GetRecordedCommands(func(cmd interface{}) (interface{}, bool) { args, ok := sender.Calls[0].Arguments[1].(definitions.PostableAlerts)
c, ok := cmd.(models.GetAlertRuleByUIDQuery) require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[1]))
return c, ok require.Len(t, args.PostableAlerts, expectedToBeSent)
})
return int64(len(queries)) == sch.maxAttempts
}, 5*time.Second, 100*time.Millisecond, "Expected exactly two request of %T. All recordings: %#v", models.GetAlertRuleByUIDQuery{}, ruleStore.RecordedOps)
}) })
}) })
t.Run("when rule version is updated", func(t *testing.T) { t.Run("when evaluation fails", func(t *testing.T) {
t.Run("should clear the state and expire firing alerts", func(t *testing.T) { rule := models.AlertRuleGen(withQueryForState(t, eval.Error))()
orgID := rand.Int63() rule.ExecErrState = models.ErrorErrState
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
updateChan := make(chan ruleVersion)
sender := AlertsSenderMock{}
ctx := context.Background()
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
var rule = CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting) // we want the alert to fire evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
sender.EXPECT().Send(rule.GetKey(), mock.Anything)
// define some state
states := make([]*state.State, 0, len(allStates))
for _, s := range allStates {
for i := 0; i < 2; i++ {
states = append(states, &state.State{
AlertRuleUID: rule.UID,
CacheId: util.GenerateShortUID(),
OrgID: rule.OrgID,
State: s,
StartsAt: sch.clock.Now(),
EndsAt: sch.clock.Now().Add(time.Duration(rand.Intn(25)+5) * time.Second),
Labels: rule.Labels,
})
}
}
sch.stateManager.Put(states)
states = sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
expectedToBeSent := 0 sender := AlertsSenderMock{}
for _, s := range states { sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
if s.State == eval.Normal || s.State == eval.Pending {
continue
}
expectedToBeSent++
}
require.Greaterf(t, expectedToBeSent, 0, "State manger was expected to return at least one state that can be expired")
go func() { sch, ruleStore, _, reg := createSchedule(evalAppliedChan, &sender)
ctx, cancel := context.WithCancel(context.Background()) ruleStore.PutRule(context.Background(), rule)
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan)
}()
wg := sync.WaitGroup{} go func() {
wg.Add(1) ctx, cancel := context.WithCancel(context.Background())
ruleStore.Hook = func(cmd interface{}) error { t.Cleanup(cancel)
_, ok := cmd.(models.GetAlertRuleByUIDQuery) _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
if ok { }()
wg.Done() // add synchronization.
}
return nil
}
updateChan <- ruleVersion(rule.Version) evalChan <- &evaluation{
scheduledAt: sch.clock.Now(),
rule: rule,
}
wg.Wait() waitForTimeChannel(t, evalAppliedChan)
newRule := models.CopyRule(rule)
newRule.Version++
ruleStore.PutRule(ctx, newRule)
wg.Add(1)
updateChan <- ruleVersion(newRule.Version)
wg.Wait()
require.Eventually(t, func() bool { t.Run("it should increase failure counter", func(t *testing.T) {
return len(sender.Calls) > 0 // duration metric has 0 values because of mocked clock that do not advance
}, 5*time.Second, 100*time.Millisecond) expectedMetric := fmt.Sprintf(
`# HELP grafana_alerting_rule_evaluation_duration_seconds The duration for a rule to execute.
# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.005"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.025"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.05"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.25"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="2.5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="25"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="50"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="100"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 1
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
`, rule.OrgID)
require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)) err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total")
require.NoError(t, err)
})
sender.AssertExpectations(t) t.Run("it should send special alert DatasourceError", func(t *testing.T) {
sender.AssertNumberOfCalls(t, "Send", 1)
args, ok := sender.Calls[0].Arguments[1].(definitions.PostableAlerts) args, ok := sender.Calls[0].Arguments[1].(definitions.PostableAlerts)
require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[1])) require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[1]))
require.Len(t, args.PostableAlerts, expectedToBeSent) assert.Len(t, args.PostableAlerts, 1)
}) assert.Equal(t, ErrorAlertName, args.PostableAlerts[0].Labels[prometheusModel.AlertNameLabel])
})
t.Run("when evaluation fails", func(t *testing.T) {
t.Run("it should increase failure counter", func(t *testing.T) {
t.Skip()
// TODO implement check for counter
})
t.Run("it should retry up to configured times", func(t *testing.T) {
// TODO figure out how to simulate failure
t.Skip()
}) })
}) })
t.Run("when there are alerts that should be firing", func(t *testing.T) { t.Run("when there are alerts that should be firing", func(t *testing.T) {
t.Run("it should call sender", func(t *testing.T) { t.Run("it should call sender", func(t *testing.T) {
orgID := rand.Int63() // eval.Alerting makes state manager to create notifications for alertmanagers
rule := models.AlertRuleGen(withQueryForState(t, eval.Alerting))()
evalChan := make(chan *evaluation) evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time) evalAppliedChan := make(chan time.Time)
sender := AlertsSenderMock{} sender := AlertsSenderMock{}
sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender) sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
ruleStore.PutRule(context.Background(), rule)
// eval.Alerting makes state manager to create notifications for alertmanagers
rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting)
folder, _ := ruleStore.GetNamespaceByUID(context.Background(), rule.NamespaceUID, orgID, nil)
sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -501,39 +357,56 @@ func TestSchedule_ruleRoutine(t *testing.T) {
}() }()
evalChan <- &evaluation{ evalChan <- &evaluation{
scheduledAt: time.Now(), scheduledAt: sch.clock.Now(),
version: rule.Version, rule: rule,
} }
waitForTimeChannel(t, evalAppliedChan) waitForTimeChannel(t, evalAppliedChan)
sender.AssertExpectations(t) sender.AssertNumberOfCalls(t, "Send", 1)
args, ok := sender.Calls[0].Arguments[1].(definitions.PostableAlerts) args, ok := sender.Calls[0].Arguments[1].(definitions.PostableAlerts)
require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[1])) require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[1]))
require.Len(t, args.PostableAlerts, 1) require.Len(t, args.PostableAlerts, 1)
t.Run("should add extra labels", func(t *testing.T) {
alert := args.PostableAlerts[0]
assert.Equal(t, rule.UID, alert.Labels[models.RuleUIDLabel])
assert.Equal(t, rule.NamespaceUID, alert.Labels[models.NamespaceUIDLabel])
assert.Equal(t, rule.Title, alert.Labels[prometheusModel.AlertNameLabel])
assert.Equal(t, folder.Title, alert.Labels[models.FolderTitleLabel])
})
}) })
}) })
t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) { t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) {
// TODO needs some mocking/stubbing for Alertmanager and Sender to make sure it was not called rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
t.Skip()
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
sender := AlertsSenderMock{}
sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
ruleStore.PutRule(context.Background(), rule)
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
}()
evalChan <- &evaluation{
scheduledAt: sch.clock.Now(),
rule: rule,
}
waitForTimeChannel(t, evalAppliedChan)
sender.AssertNotCalled(t, "Send", mock.Anything, mock.Anything)
require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
}) })
} }
func TestSchedule_UpdateAlertRule(t *testing.T) { func TestSchedule_UpdateAlertRule(t *testing.T) {
t.Run("when rule exists", func(t *testing.T) { t.Run("when rule exists", func(t *testing.T) {
t.Run("it should call Update", func(t *testing.T) { t.Run("it should call Update", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t) sch := setupScheduler(t, nil, nil, nil, nil, nil)
key := generateRuleKey() key := models.GenerateRuleKey(rand.Int63())
info, _ := sch.registry.getOrCreateInfo(context.Background(), key) info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
version := rand.Int63() version := rand.Int63()
go func() { go func() {
@ -548,8 +421,8 @@ func TestSchedule_UpdateAlertRule(t *testing.T) {
} }
}) })
t.Run("should exit if it is closed", func(t *testing.T) { t.Run("should exit if it is closed", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t) sch := setupScheduler(t, nil, nil, nil, nil, nil)
key := generateRuleKey() key := models.GenerateRuleKey(rand.Int63())
info, _ := sch.registry.getOrCreateInfo(context.Background(), key) info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
info.stop() info.stop()
sch.UpdateAlertRule(key, rand.Int63()) sch.UpdateAlertRule(key, rand.Int63())
@ -557,8 +430,8 @@ func TestSchedule_UpdateAlertRule(t *testing.T) {
}) })
t.Run("when rule does not exist", func(t *testing.T) { t.Run("when rule does not exist", func(t *testing.T) {
t.Run("should exit", func(t *testing.T) { t.Run("should exit", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t) sch := setupScheduler(t, nil, nil, nil, nil, nil)
key := generateRuleKey() key := models.GenerateRuleKey(rand.Int63())
sch.UpdateAlertRule(key, rand.Int63()) sch.UpdateAlertRule(key, rand.Int63())
}) })
}) })
@ -567,24 +440,26 @@ func TestSchedule_UpdateAlertRule(t *testing.T) {
func TestSchedule_DeleteAlertRule(t *testing.T) { func TestSchedule_DeleteAlertRule(t *testing.T) {
t.Run("when rule exists", func(t *testing.T) { t.Run("when rule exists", func(t *testing.T) {
t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) { t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t) sch := setupScheduler(t, nil, nil, nil, nil, nil)
key := generateRuleKey() rule := models.AlertRuleGen()()
key := rule.GetKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key) info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
sch.DeleteAlertRule(key) sch.DeleteAlertRule(key)
require.False(t, info.update(ruleVersion(rand.Int63()))) require.False(t, info.update(ruleVersion(rand.Int63())))
success, dropped := info.eval(time.Now(), 1) success, dropped := info.eval(time.Now(), rule)
require.False(t, success) require.False(t, success)
require.Nilf(t, dropped, "expected no dropped evaluations but got one") require.Nilf(t, dropped, "expected no dropped evaluations but got one")
require.False(t, sch.registry.exists(key)) require.False(t, sch.registry.exists(key))
}) })
t.Run("should remove controller from registry", func(t *testing.T) { t.Run("should remove controller from registry", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t) sch := setupScheduler(t, nil, nil, nil, nil, nil)
key := generateRuleKey() rule := models.AlertRuleGen()()
key := rule.GetKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key) info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
info.stop() info.stop()
sch.DeleteAlertRule(key) sch.DeleteAlertRule(key)
require.False(t, info.update(ruleVersion(rand.Int63()))) require.False(t, info.update(ruleVersion(rand.Int63())))
success, dropped := info.eval(time.Now(), 1) success, dropped := info.eval(time.Now(), rule)
require.False(t, success) require.False(t, success)
require.Nilf(t, dropped, "expected no dropped evaluations but got one") require.Nilf(t, dropped, "expected no dropped evaluations but got one")
require.False(t, sch.registry.exists(key)) require.False(t, sch.registry.exists(key))
@ -592,40 +467,39 @@ func TestSchedule_DeleteAlertRule(t *testing.T) {
}) })
t.Run("when rule does not exist", func(t *testing.T) { t.Run("when rule does not exist", func(t *testing.T) {
t.Run("should exit", func(t *testing.T) { t.Run("should exit", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t) sch := setupScheduler(t, nil, nil, nil, nil, nil)
key := generateRuleKey() key := models.GenerateRuleKey(rand.Int63())
sch.DeleteAlertRule(key) sch.DeleteAlertRule(key)
}) })
}) })
} }
func generateRuleKey() models.AlertRuleKey { func setupScheduler(t *testing.T, rs *store.FakeRuleStore, is *store.FakeInstanceStore, registry *prometheus.Registry, senderMock *AlertsSenderMock, evalMock *eval.FakeEvaluator) *schedule {
return models.AlertRuleKey{
OrgID: rand.Int63(),
UID: util.GenerateShortUID(),
}
}
func setupSchedulerWithFakeStores(t *testing.T) *schedule {
t.Helper()
ruleStore := store.NewFakeRuleStore(t)
instanceStore := &store.FakeInstanceStore{}
sch, _ := setupScheduler(t, ruleStore, instanceStore, nil, nil)
return sch
}
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, registry *prometheus.Registry, senderMock *AlertsSenderMock) (*schedule, *clock.Mock) {
t.Helper() t.Helper()
fakeAnnoRepo := store.NewFakeAnnotationsRepo() fakeAnnoRepo := store.NewFakeAnnotationsRepo()
annotations.SetRepository(fakeAnnoRepo) annotations.SetRepository(fakeAnnoRepo)
mockedClock := clock.NewMock() mockedClock := clock.NewMock()
logger := log.New("ngalert schedule test") logger := log.New("ngalert schedule test")
if rs == nil {
rs = store.NewFakeRuleStore(t)
}
if is == nil {
is = &store.FakeInstanceStore{}
}
var evaluator eval.Evaluator = evalMock
if evalMock == nil {
secretsService := secretsManager.SetupTestService(t, fakes.NewFakeSecretsStore())
evaluator = eval.NewEvaluator(&setting.Cfg{ExpressionsEnabled: true}, logger, nil, secretsService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil))
}
if registry == nil { if registry == nil {
registry = prometheus.NewPedanticRegistry() registry = prometheus.NewPedanticRegistry()
} }
m := metrics.NewNGAlert(registry) m := metrics.NewNGAlert(registry)
secretsService := secretsManager.SetupTestService(t, fakes.NewFakeSecretsStore())
appUrl := &url.URL{ appUrl := &url.URL{
Scheme: "http", Scheme: "http",
@ -638,41 +512,27 @@ func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, re
} }
cfg := setting.UnifiedAlertingSettings{ cfg := setting.UnifiedAlertingSettings{
BaseInterval: time.Second, BaseInterval: time.Second,
MaxAttempts: 1, MaxAttempts: 1,
AdminConfigPollInterval: 10 * time.Minute, // do not poll in unit tests.
} }
schedCfg := SchedulerCfg{ schedCfg := SchedulerCfg{
Cfg: cfg, Cfg: cfg,
C: mockedClock, C: mockedClock,
Evaluator: eval.NewEvaluator(&setting.Cfg{ExpressionsEnabled: true}, logger, nil, secretsService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil)), Evaluator: evaluator,
RuleStore: rs, RuleStore: rs,
InstanceStore: is, InstanceStore: is,
Logger: logger, Logger: logger,
Metrics: m.GetSchedulerMetrics(), Metrics: m.GetSchedulerMetrics(),
AlertSender: senderMock, AlertSender: senderMock,
} }
st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, clock.NewMock()) st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, mockedClock)
return NewScheduler(schedCfg, appUrl, st, busmock.New()), mockedClock return NewScheduler(schedCfg, appUrl, st, busmock.New())
} }
// createTestAlertRule creates a dummy alert definition to be used by the tests. func withQueryForState(t *testing.T, evalResult eval.State) models.AlertRuleMutator {
func CreateTestAlertRule(t *testing.T, dbstore *store.FakeRuleStore, intervalSeconds int64, orgID int64, evalResult eval.State) *models.AlertRule {
ctx := context.Background()
t.Helper()
records := make([]interface{}, 0, len(dbstore.RecordedOps))
copy(records, dbstore.RecordedOps)
defer func() {
// erase queries that were made by the testing suite
dbstore.RecordedOps = records
}()
d := rand.Intn(1000)
ruleGroup := fmt.Sprintf("ruleGroup-%d", d)
var expression string var expression string
var forDuration time.Duration var forMultimplier int64 = 0
switch evalResult { switch evalResult {
case eval.Normal: case eval.Normal:
expression = `{ expression = `{
@ -687,7 +547,7 @@ func CreateTestAlertRule(t *testing.T, dbstore *store.FakeRuleStore, intervalSec
"expression":"2 + 2 > 1" "expression":"2 + 2 > 1"
}` }`
if evalResult == eval.Pending { if evalResult == eval.Pending {
forDuration = 100 * time.Second forMultimplier = rand.Int63n(9) + 1
} }
case eval.Error: case eval.Error:
expression = `{ expression = `{
@ -695,17 +555,13 @@ func CreateTestAlertRule(t *testing.T, dbstore *store.FakeRuleStore, intervalSec
"type":"math", "type":"math",
"expression":"$A" "expression":"$A"
}` }`
case eval.NoData: default:
// TODO Implement support for NoData require.Fail(t, fmt.Sprintf("Alert rule with desired evaluation result '%s' is not supported yet", evalResult))
require.Fail(t, "Alert rule with desired evaluation result NoData is not supported yet")
} }
rule := &models.AlertRule{ return func(rule *models.AlertRule) {
ID: 1, rule.Condition = "A"
OrgID: orgID, rule.Data = []models.AlertQuery{
Title: fmt.Sprintf("an alert definition %d", d),
Condition: "A",
Data: []models.AlertQuery{
{ {
DatasourceUID: "-100", DatasourceUID: "-100",
Model: json.RawMessage(expression), Model: json.RawMessage(expression),
@ -715,22 +571,7 @@ func CreateTestAlertRule(t *testing.T, dbstore *store.FakeRuleStore, intervalSec
}, },
RefID: "A", RefID: "A",
}, },
}, }
Updated: time.Now(), rule.For = time.Duration(rule.IntervalSeconds*forMultimplier) * time.Second
IntervalSeconds: intervalSeconds,
Version: 1,
UID: util.GenerateShortUID(),
NamespaceUID: "namespace",
RuleGroup: ruleGroup,
NoDataState: models.NoData,
ExecErrState: models.AlertingErrState,
For: forDuration,
Annotations: map[string]string{"testAnnoKey": "testAnnoValue"},
Labels: make(map[string]string),
} }
dbstore.PutRule(ctx, rule)
t.Logf("alert definition: %v with interval: %d created", rule.GetKey(), rule.IntervalSeconds)
return rule
} }

@ -401,8 +401,8 @@ func (st DBstore) GetNamespaceByUID(ctx context.Context, uid string, orgID int64
// GetAlertRulesForScheduling returns a short version of all alert rules except those that belong to an excluded list of organizations // GetAlertRulesForScheduling returns a short version of all alert rules except those that belong to an excluded list of organizations
func (st DBstore) GetAlertRulesForScheduling(ctx context.Context, query *ngmodels.GetAlertRulesForSchedulingQuery) error { func (st DBstore) GetAlertRulesForScheduling(ctx context.Context, query *ngmodels.GetAlertRulesForSchedulingQuery) error {
return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
alerts := make([]*ngmodels.SchedulableAlertRule, 0) alerts := make([]*ngmodels.AlertRule, 0)
q := sess.Table("alert_rule") q := sess.Table(ngmodels.AlertRule{})
if len(st.Cfg.DisabledOrgs) > 0 { if len(st.Cfg.DisabledOrgs) > 0 {
excludeOrgs := make([]interface{}, 0, len(st.Cfg.DisabledOrgs)) excludeOrgs := make([]interface{}, 0, len(st.Cfg.DisabledOrgs))
for orgID := range st.Cfg.DisabledOrgs { for orgID := range st.Cfg.DisabledOrgs {

@ -183,14 +183,7 @@ func (f *FakeRuleStore) GetAlertRulesForScheduling(_ context.Context, q *models.
return err return err
} }
for _, rules := range f.Rules { for _, rules := range f.Rules {
for _, rule := range rules { q.Result = append(q.Result, rules...)
q.Result = append(q.Result, &models.SchedulableAlertRule{
UID: rule.UID,
OrgID: rule.OrgID,
IntervalSeconds: rule.IntervalSeconds,
Version: rule.Version,
})
}
} }
return nil return nil
} }
