diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index 4acad845a0e..1d54285b0ce 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -120,7 +120,7 @@ func NewScheduler(cfg SchedulerCfg, expressionService *expr.Service, appURL *url ticker := alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds())) sch := schedule{ - registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]alertRuleInfo)}, + registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]*alertRuleInfo)}, maxAttempts: cfg.MaxAttempts, clock: cfg.C, baseInterval: cfg.BaseInterval, @@ -349,15 +349,15 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error { type readyToRunItem struct { key models.AlertRuleKey - ruleInfo alertRuleInfo + ruleInfo *alertRuleInfo + version int64 } readyToRun := make([]readyToRunItem, 0) for _, item := range alertRules { key := item.GetKey() itemVersion := item.Version - newRoutine := !sch.registry.exists(key) - ruleInfo := sch.registry.getOrCreateInfo(key, itemVersion) + ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key) // enforce minimum evaluation interval if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) { @@ -369,7 +369,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error { if newRoutine && !invalidInterval { dispatcherGroup.Go(func() error { - return sch.ruleRoutine(ctx, key, ruleInfo.evalCh, ruleInfo.stopCh) + return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh) }) } @@ -382,7 +382,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error { itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds()) if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { - readyToRun = append(readyToRun, readyToRunItem{key: key, ruleInfo: ruleInfo}) + readyToRun = append(readyToRun, readyToRunItem{key: key, ruleInfo: ruleInfo, 
version: itemVersion}) } // remove the alert rule from the registered alert rules @@ -398,19 +398,21 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error { item := readyToRun[i] time.AfterFunc(time.Duration(int64(i)*step), func() { - item.ruleInfo.evalCh <- &evalContext{now: tick, version: item.ruleInfo.version} + success := item.ruleInfo.eval(tick, item.version) + if !success { + sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", "uid", item.key.UID, "org", item.key.OrgID, "time", tick) + } }) } // unregister and stop routines of the deleted alert rules for key := range registeredDefinitions { - ruleInfo, err := sch.registry.get(key) - if err != nil { - sch.log.Error("failed to get alert rule routine information", "err", err) + ruleInfo, ok := sch.registry.del(key) + if !ok { + sch.log.Error("unable to delete alert rule routine information because it did not exist", "uid", key.UID, "org_id", key.OrgID) continue } - ruleInfo.stopCh <- struct{}{} - sch.registry.del(key) + ruleInfo.stop() } case <-ctx.Done(): waitErr := dispatcherGroup.Wait() @@ -430,7 +432,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error { } } -func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error { +func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext) error { logger := sch.log.New("uid", key.UID, "org", key.OrgID) logger.Debug("alert rule routine started") @@ -525,9 +527,14 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul evalRunning := false var currentRule *models.AlertRule + defer sch.stopApplied(key) for { select { - case ctx := <-evalCh: + case ctx, ok := <-evalCh: + if !ok { + logger.Debug("Evaluation channel has been closed. 
Exiting") + return nil + } if evalRunning { continue } @@ -555,13 +562,9 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul logger.Error("evaluation failed after all retries", "err", err) } }() - case <-stopCh: - sch.stopApplied(key) + case <-grafanaCtx.Done(): logger.Debug("stopping alert rule routine") - // interrupt evaluation if it's running return nil - case <-grafanaCtx.Done(): - return grafanaCtx.Err() } } } @@ -587,51 +590,34 @@ func (sch *schedule) saveAlertStates(states []*state.State) { type alertRuleRegistry struct { mu sync.Mutex - alertRuleInfo map[models.AlertRuleKey]alertRuleInfo + alertRuleInfo map[models.AlertRuleKey]*alertRuleInfo } -// getOrCreateInfo returns the channel for the specific alert rule -// if it does not exists creates one and returns it -func (r *alertRuleRegistry) getOrCreateInfo(key models.AlertRuleKey, ruleVersion int64) alertRuleInfo { +// getOrCreateInfo gets rule routine information from registry by the key. If it does not exist, it creates a new one. +// Returns a pointer to the rule routine information and a flag that indicates whether it is a new struct or not. A newly created entry is built from ctx via newAlertRuleInfo. +func (r *alertRuleRegistry) getOrCreateInfo(ctx context.Context, key models.AlertRuleKey) (*alertRuleInfo, bool) { r.mu.Lock() defer r.mu.Unlock() info, ok := r.alertRuleInfo[key] if !ok { - r.alertRuleInfo[key] = alertRuleInfo{evalCh: make(chan *evalContext), stopCh: make(chan struct{}), version: ruleVersion} - return r.alertRuleInfo[key] + info = newAlertRuleInfo(ctx) + r.alertRuleInfo[key] = info } - info.version = ruleVersion - r.alertRuleInfo[key] = info - return info + return info, !ok } -// get returns the channel for the specific alert rule -// if the key does not exist returns an error -func (r *alertRuleRegistry) get(key models.AlertRuleKey) (*alertRuleInfo, error) { +// del removes pair that has specific key from alertRuleInfo. 
+// Returns a 2-tuple where the first element is the value of the removed pair +// and the second element indicates whether an element with the specified key existed. +func (r *alertRuleRegistry) del(key models.AlertRuleKey) (*alertRuleInfo, bool) { r.mu.Lock() defer r.mu.Unlock() - info, ok := r.alertRuleInfo[key] - if !ok { - return nil, fmt.Errorf("%v key not found", key) + if ok { + delete(r.alertRuleInfo, key) } - return &info, nil -} - -func (r *alertRuleRegistry) exists(key models.AlertRuleKey) bool { - r.mu.Lock() - defer r.mu.Unlock() - - _, ok := r.alertRuleInfo[key] - return ok -} - -func (r *alertRuleRegistry) del(key models.AlertRuleKey) { - r.mu.Lock() - defer r.mu.Unlock() - - delete(r.alertRuleInfo, key) + return info, ok } func (r *alertRuleRegistry) iter() <-chan models.AlertRuleKey { @@ -660,9 +646,27 @@ func (r *alertRuleRegistry) keyMap() map[models.AlertRuleKey]struct{} { } type alertRuleInfo struct { - evalCh chan *evalContext - stopCh chan struct{} - version int64 + evalCh chan *evalContext + ctx context.Context + stop context.CancelFunc +} + +func newAlertRuleInfo(parent context.Context) *alertRuleInfo { + ctx, cancel := context.WithCancel(parent) + return &alertRuleInfo{evalCh: make(chan *evalContext), ctx: ctx, stop: cancel} +} + +// eval signals the rule evaluation routine to perform the evaluation of the rule. 
Does nothing if the loop is stopped +func (a *alertRuleInfo) eval(t time.Time, version int64) bool { + select { + case a.evalCh <- &evalContext{ + now: t, + version: version, + }: + return true + case <-a.ctx.Done(): + return false + } } type evalContext struct { diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index ecb45e97804..bbc7973242a 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -7,6 +7,8 @@ import ( "fmt" "math/rand" "net/url" + "runtime" + "sync" "testing" "time" @@ -272,11 +274,9 @@ func TestSchedule_ruleRoutine(t *testing.T) { rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState) go func() { - stop := make(chan struct{}) - t.Cleanup(func() { - close(stop) - }) - _ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan) }() expectedTime := time.UnixMicro(rand.Int63()) @@ -367,35 +367,19 @@ func TestSchedule_ruleRoutine(t *testing.T) { } t.Run("should exit", func(t *testing.T) { - t.Run("when we signal it to stop", func(t *testing.T) { - stopChan := make(chan struct{}) - stoppedChan := make(chan error) - - sch, _, _, _, _ := createSchedule(make(chan time.Time)) - - go func() { - err := sch.ruleRoutine(context.Background(), models.AlertRuleKey{}, make(chan *evalContext), stopChan) - stoppedChan <- err - }() - - stopChan <- struct{}{} - err := waitForErrChannel(t, stoppedChan) - require.NoError(t, err) - }) - t.Run("when context is cancelled", func(t *testing.T) { stoppedChan := make(chan error) sch, _, _, _, _ := createSchedule(make(chan time.Time)) ctx, cancel := context.WithCancel(context.Background()) go func() { - err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext), make(chan struct{})) + err := sch.ruleRoutine(ctx, 
models.AlertRuleKey{}, make(chan *evalContext)) stoppedChan <- err }() cancel() err := waitForErrChannel(t, stoppedChan) - require.ErrorIs(t, err, context.Canceled) + require.NoError(t, err) }) }) @@ -408,11 +392,9 @@ func TestSchedule_ruleRoutine(t *testing.T) { rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState()) go func() { - stop := make(chan struct{}) - t.Cleanup(func() { - close(stop) - }) - _ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan) }() expectedTime := time.UnixMicro(rand.Int63()) @@ -462,11 +444,9 @@ func TestSchedule_ruleRoutine(t *testing.T) { rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState()) go func() { - stop := make(chan struct{}) - t.Cleanup(func() { - close(stop) - }) - _ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan) }() expectedTime := time.UnixMicro(rand.Int63()) @@ -547,11 +527,9 @@ func TestSchedule_ruleRoutine(t *testing.T) { rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting) go func() { - stop := make(chan struct{}) - t.Cleanup(func() { - close(stop) - }) - _ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan) }() evalChan <- &evalContext{ @@ -574,6 +552,90 @@ func TestSchedule_ruleRoutine(t *testing.T) { }) } +func TestSchedule_alertRuleInfo(t *testing.T) { + t.Run("when rule evaluation is not stopped", func(t *testing.T) { + t.Run("eval should send to evalCh", func(t *testing.T) { + r := newAlertRuleInfo(context.Background()) + expected := time.Now() + resultCh := make(chan bool) + version := rand.Int63() + go 
func() { + resultCh <- r.eval(expected, version) + }() + select { + case ctx := <-r.evalCh: + require.Equal(t, version, ctx.version) + require.Equal(t, expected, ctx.now) + require.True(t, <-resultCh) + case <-time.After(5 * time.Second): + t.Fatal("No message was received on eval channel") + } + }) + t.Run("eval should exit when context is cancelled", func(t *testing.T) { + r := newAlertRuleInfo(context.Background()) + resultCh := make(chan bool) + go func() { + resultCh <- r.eval(time.Now(), rand.Int63()) + }() + runtime.Gosched() + r.stop() + select { + case result := <-resultCh: + require.False(t, result) + case <-time.After(5 * time.Second): + t.Fatal("eval did not return after the rule routine was stopped") + } + }) + }) + t.Run("when rule evaluation is stopped", func(t *testing.T) { + t.Run("eval should do nothing", func(t *testing.T) { + r := newAlertRuleInfo(context.Background()) + r.stop() + require.False(t, r.eval(time.Now(), rand.Int63())) + }) + t.Run("stop should do nothing", func(t *testing.T) { + r := newAlertRuleInfo(context.Background()) + r.stop() + r.stop() + }) + }) + t.Run("should be thread-safe", func(t *testing.T) { + r := newAlertRuleInfo(context.Background()) + wg := sync.WaitGroup{} + go func() { + for { + select { + case <-r.evalCh: + time.Sleep(time.Millisecond) + case <-r.ctx.Done(): + return + } + } + }() + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + for i := 0; i < 20; i++ { + max := 2 + if i <= 10 { + max = 1 + } + switch rand.Intn(max) + 1 { + case 1: + r.eval(time.Now(), rand.Int63()) + case 2: + r.stop() + } + } + wg.Done() + }() + } + + wg.Wait() + }) +} + func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock) { t.Helper()