diff --git a/pkg/services/ngalert/schedule/alert_rule.go b/pkg/services/ngalert/schedule/alert_rule.go index 85c254b0a59..771d32a2573 100644 --- a/pkg/services/ngalert/schedule/alert_rule.go +++ b/pkg/services/ngalert/schedule/alert_rule.go @@ -37,7 +37,7 @@ type Rule interface { // It has no effect if the rule has not yet been Run, or if the rule is Stopped. Eval(eval *Evaluation) (bool, *Evaluation) // Update sends a singal to change the definition of the rule. - Update(lastVersion RuleVersionAndPauseStatus) bool + Update(eval *Evaluation) bool // Type gives the type of the rule. Type() ngmodels.RuleType // Status indicates the status of the evaluating rule. @@ -59,7 +59,6 @@ func newRuleFactory( sender AlertsSender, stateManager *state.Manager, evalFactory eval.EvaluatorFactory, - ruleProvider ruleProvider, clock clock.Clock, rrCfg setting.RecordingRuleSettings, met *metrics.Scheduler, @@ -95,7 +94,6 @@ func newRuleFactory( sender, stateManager, evalFactory, - ruleProvider, clock, met, logger, @@ -109,15 +107,11 @@ func newRuleFactory( type evalAppliedFunc = func(ngmodels.AlertRuleKey, time.Time) type stopAppliedFunc = func(ngmodels.AlertRuleKey) -type ruleProvider interface { - get(ngmodels.AlertRuleKey) *ngmodels.AlertRule -} - type alertRule struct { key ngmodels.AlertRuleKeyWithGroup evalCh chan *Evaluation - updateCh chan RuleVersionAndPauseStatus + updateCh chan *Evaluation ctx context.Context stopFn util.CancelCauseFunc @@ -129,7 +123,6 @@ type alertRule struct { sender AlertsSender stateManager *state.Manager evalFactory eval.EvaluatorFactory - ruleProvider ruleProvider // Event hooks that are only used in tests. evalAppliedHook evalAppliedFunc @@ -149,7 +142,6 @@ func newAlertRule( sender AlertsSender, stateManager *state.Manager, evalFactory eval.EvaluatorFactory, - ruleProvider ruleProvider, clock clock.Clock, met *metrics.Scheduler, logger log.Logger, @@ -161,7 +153,7 @@ func newAlertRule( return &alertRule{ key: key, evalCh: make(chan *Evaluation), - updateCh: make(chan RuleVersionAndPauseStatus), + updateCh: make(chan *Evaluation), ctx: ctx, stopFn: stop, appURL: appURL, @@ -171,7 +163,6 @@ func newAlertRule( sender: sender, stateManager: stateManager, evalFactory: evalFactory, - ruleProvider: ruleProvider, evalAppliedHook: evalAppliedHook, stopAppliedHook: stopAppliedHook, metrics: met, @@ -220,7 +211,7 @@ func (a *alertRule) Eval(eval *Evaluation) (bool, *Evaluation) { } // update sends an instruction to the rule evaluation routine to update the scheduled rule to the specified version. The specified version must be later than the current version, otherwise no update will happen. -func (a *alertRule) Update(lastVersion RuleVersionAndPauseStatus) bool { +func (a *alertRule) Update(eval *Evaluation) bool { // check if the channel is not empty. select { case <-a.updateCh: @@ -230,7 +221,7 @@ func (a *alertRule) Update(lastVersion RuleVersionAndPauseStatus) bool { } select { - case a.updateCh <- lastVersion: + case a.updateCh <- eval: return true case <-a.ctx.Done(): return false @@ -254,15 +245,16 @@ func (a *alertRule) Run() error { select { // used by external services (API) to notify that rule is updated. case ctx := <-a.updateCh: - if currentFingerprint == ctx.Fingerprint { + fp := ctx.Fingerprint() + if currentFingerprint == fp { a.logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint) continue } - a.logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint) + a.logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.rule.IsPaused, "fingerprint", fp) // clear the state. So the next evaluation will start from the scratch. - a.resetState(grafanaCtx, ctx.IsPaused) - currentFingerprint = ctx.Fingerprint + a.resetState(grafanaCtx, ctx.rule, ctx.rule.IsPaused) + currentFingerprint = fp // evalCh - used by the scheduler to signal that evaluation is needed. case ctx, ok := <-a.evalCh: if !ok { @@ -298,7 +290,7 @@ func (a *alertRule) Run() error { // lingers in DB and won't be cleaned up until next alert rule update. needReset = needReset || (currentFingerprint == 0 && isPaused) if needReset { - a.resetState(grafanaCtx, isPaused) + a.resetState(grafanaCtx, ctx.rule, isPaused) } currentFingerprint = f if isPaused { @@ -494,8 +486,7 @@ func (a *alertRule) expireAndSend(ctx context.Context, states []state.StateTrans } } -func (a *alertRule) resetState(ctx context.Context, isPaused bool) { - rule := a.ruleProvider.get(a.key.AlertRuleKey) +func (a *alertRule) resetState(ctx context.Context, rule *ngmodels.AlertRule, isPaused bool) { reason := ngmodels.StateReasonUpdated if isPaused { reason = ngmodels.StateReasonPaused diff --git a/pkg/services/ngalert/schedule/alert_rule_test.go b/pkg/services/ngalert/schedule/alert_rule_test.go index 02ea0dd5735..b9f66875c6b 100644 --- a/pkg/services/ngalert/schedule/alert_rule_test.go +++ b/pkg/services/ngalert/schedule/alert_rule_test.go @@ -43,7 +43,7 @@ func TestAlertRule(t *testing.T) { r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1)) resultCh := make(chan bool) go func() { - resultCh <- r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}) + resultCh <- r.Update(&Evaluation{rule: gen.With(gen.WithIsPaused(false)).GenerateRef()}) }() select { case <-r.updateCh: @@ -54,8 +54,8 @@ func TestAlertRule(t *testing.T) { }) t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) { r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1)) - version1 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false} - version2 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false} + version1 := &Evaluation{rule: gen.With(gen.WithIsPaused(false)).GenerateRef()} + version2 := &Evaluation{rule: gen.With(gen.WithIsPaused(false)).GenerateRef()} wg := sync.WaitGroup{} wg.Add(1) @@ -178,7 +178,7 @@ func TestAlertRule(t *testing.T) { r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1)) r.Stop(errRuleDeleted) require.ErrorIs(t, r.ctx.Err(), errRuleDeleted) - require.False(t, r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})) + require.False(t, r.Update(&Evaluation{rule: gen.GenerateRef()})) }) t.Run("eval should do nothing", func(t *testing.T) { ruleSpec := gen.GenerateRef() @@ -234,7 +234,7 @@ func TestAlertRule(t *testing.T) { } switch rand.Intn(max) + 1 { case 1: - r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}) + r.Update(&Evaluation{rule: gen.GenerateRef()}) case 2: r.Eval(&Evaluation{ scheduledAt: time.Now(), @@ -284,7 +284,7 @@ func blankRuleForTests(ctx context.Context, key models.AlertRuleKeyWithGroup) *a Log: log.NewNopLogger(), } st := state.NewManager(managerCfg, state.NewNoopPersister()) - return newAlertRule(ctx, key, nil, false, 0, nil, st, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil) + return newAlertRule(ctx, key, nil, false, 0, nil, st, nil, nil, nil, log.NewNopLogger(), nil, nil, nil) } func TestRuleRoutine(t *testing.T) { @@ -572,7 +572,6 @@ func TestRuleRoutine(t *testing.T) { t.Run("when a message is sent to update channel", func(t *testing.T) { rule := gen.With(withQueryForState(t, eval.Normal)).GenerateRef() folderTitle := "folderName" - ruleFp := ruleWithFolder{rule, folderTitle}.Fingerprint() evalAppliedChan := make(chan time.Time) @@ -628,8 +627,8 @@ func TestRuleRoutine(t *testing.T) { require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired") t.Run("should do nothing if version in channel is the same", func(t *testing.T) { - ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp, false}) - ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp, false}) // second time just to make sure that previous messages were handled + ruleInfo.Update(&Evaluation{rule: rule, folderTitle: folderTitle}) + ruleInfo.Update(&Evaluation{rule: rule, folderTitle: folderTitle}) // second time just to make sure that previous messages were handled actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) require.Len(t, actualStates, len(states)) @@ -638,7 +637,7 @@ func TestRuleRoutine(t *testing.T) { }) t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) { - ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp + 1, false}) + ruleInfo.Update(&Evaluation{rule: models.CopyRule(rule, gen.WithTitle(util.GenerateShortUID())), folderTitle: folderTitle}) require.Eventually(t, func() bool { return len(sender.Calls()) > 0 @@ -905,7 +904,7 @@ func TestRuleRoutine(t *testing.T) { } func ruleFactoryFromScheduler(sch *schedule) ruleFactory { - return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.rrCfg, sch.metrics, sch.log, sch.tracer, sch.recordingWriter, sch.evalAppliedFunc, sch.stopAppliedFunc) + return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, sch.clock, sch.rrCfg, sch.metrics, sch.log, sch.tracer, sch.recordingWriter, sch.evalAppliedFunc, sch.stopAppliedFunc) } func stateForRule(rule *models.AlertRule, ts time.Time, evalState eval.State) *state.State { diff --git a/pkg/services/ngalert/schedule/recording_rule.go b/pkg/services/ngalert/schedule/recording_rule.go index 954b0869a4a..6c3f733a5ca 100644 --- a/pkg/services/ngalert/schedule/recording_rule.go +++ b/pkg/services/ngalert/schedule/recording_rule.go @@ -113,7 +113,7 @@ func (r *recordingRule) Eval(eval *Evaluation) (bool, *Evaluation) { } } -func (r *recordingRule) Update(lastVersion RuleVersionAndPauseStatus) bool { +func (r *recordingRule) Update(_ *Evaluation) bool { return true } diff --git a/pkg/services/ngalert/schedule/recording_rule_test.go b/pkg/services/ngalert/schedule/recording_rule_test.go index 7e72453bb9f..c880b1d417f 100644 --- a/pkg/services/ngalert/schedule/recording_rule_test.go +++ b/pkg/services/ngalert/schedule/recording_rule_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" + "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/ngalert/metrics" @@ -116,7 +117,10 @@ func TestRecordingRule(t *testing.T) { } switch rand.Intn(max) + 1 { case 1: - r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}) + r.Update(&Evaluation{ + rule: gen.GenerateRef(), + folderTitle: util.GenerateShortUID(), + }) case 2: r.Eval(&Evaluation{ scheduledAt: time.Now(), @@ -492,7 +496,7 @@ func TestRecordingRule_Integration(t *testing.T) { t.Run("status shows evaluation", func(t *testing.T) { status := process.(*recordingRule).Status() - //TODO: assert "error" to fix test, update to "nodata" in the future + // TODO: assert "error" to fix test, update to "nodata" in the future require.Equal(t, "error", status.Health) }) }) diff --git a/pkg/services/ngalert/schedule/registry.go b/pkg/services/ngalert/schedule/registry.go index 11f72229f51..8ef0c87ddde 100644 --- a/pkg/services/ngalert/schedule/registry.go +++ b/pkg/services/ngalert/schedule/registry.go @@ -87,11 +87,6 @@ func (r *ruleRegistry) keyMap() map[models.AlertRuleKey]struct{} { return definitionsIDs } -type RuleVersionAndPauseStatus struct { - Fingerprint fingerprint - IsPaused bool -} - type Evaluation struct { scheduledAt time.Time rule *models.AlertRule diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index 3bc21115ca0..0416fa2fb7e 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -302,7 +302,6 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. sch.alertsSender, sch.stateManager, sch.evaluatorFactory, - &sch.schedulableAlertRules, sch.clock, sch.rrCfg, sch.metrics, @@ -372,9 +371,9 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. // if we do not need to eval the rule, check the whether rule was just updated and if it was, notify evaluation routine about that logger.Debug("Rule has been updated. Notifying evaluation routine") go func(routine Rule, rule *ngmodels.AlertRule) { - routine.Update(RuleVersionAndPauseStatus{ - Fingerprint: ruleWithFolder{rule: rule, folderTitle: folderTitle}.Fingerprint(), - IsPaused: rule.IsPaused, + routine.Update(&Evaluation{ + rule: rule, + folderTitle: folderTitle, }) }(ruleRoutine, item) updatedRules = append(updatedRules, ngmodels.AlertRuleKeyWithVersion{