Alerting: Update scheduler to provide full specification to rule update channel (#101375)

Update the scheduler's alert rule routine to accept a regular Evaluation on its update channel.

This makes the update channel carry the full rule definition, which is required when resetting the rule's state.
pull/101380/head
Yuri Tseretyan 3 months ago committed by GitHub
parent b5faf5d9a1
commit 32fde6dba4
  1. pkg/services/ngalert/schedule/alert_rule.go (33 lines changed)
  2. pkg/services/ngalert/schedule/alert_rule_test.go (21 lines changed)
  3. pkg/services/ngalert/schedule/recording_rule.go (2 lines changed)
  4. pkg/services/ngalert/schedule/recording_rule_test.go (8 lines changed)
  5. pkg/services/ngalert/schedule/registry.go (5 lines changed)
  6. pkg/services/ngalert/schedule/schedule.go (7 lines changed)
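
In short: the rule routine's update channel now carries the full Evaluation (the rule plus its folder title) instead of only a fingerprint and pause flag, so the routine no longer needs a ruleProvider lookup when it resets state. A minimal Go sketch of the shape change, with placeholder types standing in for the real ngmodels ones (illustrative only, not the actual Grafana code):

package sketch

import "time"

// Placeholder for ngmodels.AlertRule; only the field referenced in this PR is shown.
type AlertRule struct {
    IsPaused bool
}

type fingerprint uint64

// Old payload: only a fingerprint and the pause flag travelled over updateCh,
// so resetState had to fetch the rule definition through ruleProvider.
type RuleVersionAndPauseStatus struct {
    Fingerprint fingerprint
    IsPaused    bool
}

// New payload (see the registry.go hunk below): the full rule definition
// travels with the update signal, and resetState receives it directly.
type Evaluation struct {
    scheduledAt time.Time
    rule        *AlertRule
    folderTitle string
}

// The Rule interface changes accordingly, from
//     Update(lastVersion RuleVersionAndPauseStatus) bool
// to
//     Update(eval *Evaluation) bool
// recordingRule keeps a no-op implementation of Update.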

@@ -37,7 +37,7 @@ type Rule interface {
     // It has no effect if the rule has not yet been Run, or if the rule is Stopped.
     Eval(eval *Evaluation) (bool, *Evaluation)
     // Update sends a singal to change the definition of the rule.
-    Update(lastVersion RuleVersionAndPauseStatus) bool
+    Update(eval *Evaluation) bool
     // Type gives the type of the rule.
     Type() ngmodels.RuleType
     // Status indicates the status of the evaluating rule.
@@ -59,7 +59,6 @@ func newRuleFactory(
     sender AlertsSender,
     stateManager *state.Manager,
     evalFactory eval.EvaluatorFactory,
-    ruleProvider ruleProvider,
     clock clock.Clock,
     rrCfg setting.RecordingRuleSettings,
     met *metrics.Scheduler,
@@ -95,7 +94,6 @@ func newRuleFactory(
         sender,
         stateManager,
         evalFactory,
-        ruleProvider,
         clock,
         met,
         logger,
@@ -109,15 +107,11 @@ func newRuleFactory(
 type evalAppliedFunc = func(ngmodels.AlertRuleKey, time.Time)
 type stopAppliedFunc = func(ngmodels.AlertRuleKey)
-type ruleProvider interface {
-    get(ngmodels.AlertRuleKey) *ngmodels.AlertRule
-}
 type alertRule struct {
     key      ngmodels.AlertRuleKeyWithGroup
     evalCh   chan *Evaluation
-    updateCh chan RuleVersionAndPauseStatus
+    updateCh chan *Evaluation
     ctx    context.Context
     stopFn util.CancelCauseFunc
@@ -129,7 +123,6 @@ type alertRule struct {
     sender       AlertsSender
     stateManager *state.Manager
     evalFactory  eval.EvaluatorFactory
-    ruleProvider ruleProvider
     // Event hooks that are only used in tests.
     evalAppliedHook evalAppliedFunc
@@ -149,7 +142,6 @@ func newAlertRule(
     sender AlertsSender,
     stateManager *state.Manager,
     evalFactory eval.EvaluatorFactory,
-    ruleProvider ruleProvider,
     clock clock.Clock,
     met *metrics.Scheduler,
     logger log.Logger,
@@ -161,7 +153,7 @@ func newAlertRule(
     return &alertRule{
         key:      key,
         evalCh:   make(chan *Evaluation),
-        updateCh: make(chan RuleVersionAndPauseStatus),
+        updateCh: make(chan *Evaluation),
         ctx:      ctx,
         stopFn:   stop,
         appURL:   appURL,
@@ -171,7 +163,6 @@ func newAlertRule(
         sender:          sender,
         stateManager:    stateManager,
         evalFactory:     evalFactory,
-        ruleProvider:    ruleProvider,
         evalAppliedHook: evalAppliedHook,
         stopAppliedHook: stopAppliedHook,
         metrics:         met,
@@ -220,7 +211,7 @@ func (a *alertRule) Eval(eval *Evaluation) (bool, *Evaluation) {
 }
 // update sends an instruction to the rule evaluation routine to update the scheduled rule to the specified version. The specified version must be later than the current version, otherwise no update will happen.
-func (a *alertRule) Update(lastVersion RuleVersionAndPauseStatus) bool {
+func (a *alertRule) Update(eval *Evaluation) bool {
     // check if the channel is not empty.
     select {
     case <-a.updateCh:
@@ -230,7 +221,7 @@ func (a *alertRule) Update(lastVersion RuleVersionAndPauseStatus) bool {
     }
     select {
-    case a.updateCh <- lastVersion:
+    case a.updateCh <- eval:
         return true
     case <-a.ctx.Done():
         return false
@@ -254,15 +245,16 @@ func (a *alertRule) Run() error {
         select {
         // used by external services (API) to notify that rule is updated.
         case ctx := <-a.updateCh:
-            if currentFingerprint == ctx.Fingerprint {
+            fp := ctx.Fingerprint()
+            if currentFingerprint == fp {
                 a.logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint)
                 continue
             }
-            a.logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
+            a.logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.rule.IsPaused, "fingerprint", fp)
             // clear the state. So the next evaluation will start from the scratch.
-            a.resetState(grafanaCtx, ctx.IsPaused)
-            currentFingerprint = ctx.Fingerprint
+            a.resetState(grafanaCtx, ctx.rule, ctx.rule.IsPaused)
+            currentFingerprint = fp
         // evalCh - used by the scheduler to signal that evaluation is needed.
         case ctx, ok := <-a.evalCh:
             if !ok {
@@ -298,7 +290,7 @@ func (a *alertRule) Run() error {
             // lingers in DB and won't be cleaned up until next alert rule update.
             needReset = needReset || (currentFingerprint == 0 && isPaused)
             if needReset {
-                a.resetState(grafanaCtx, isPaused)
+                a.resetState(grafanaCtx, ctx.rule, isPaused)
             }
             currentFingerprint = f
             if isPaused {
@@ -494,8 +486,7 @@ func (a *alertRule) expireAndSend(ctx context.Context, states []state.StateTrans
     }
 }
-func (a *alertRule) resetState(ctx context.Context, isPaused bool) {
-    rule := a.ruleProvider.get(a.key.AlertRuleKey)
+func (a *alertRule) resetState(ctx context.Context, rule *ngmodels.AlertRule, isPaused bool) {
     reason := ngmodels.StateReasonUpdated
     if isPaused {
         reason = ngmodels.StateReasonPaused
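
The alertRule.Update method keeps its existing non-blocking behaviour: it first drains any update still sitting in the channel, then offers the new Evaluation, and gives up if the rule's context has been cancelled. A self-contained sketch of that drain-then-send pattern (simplified names, not the actual Grafana code):

package sketch

import "context"

// Evaluation stands in for the scheduler's update payload; the real struct
// carries the full rule definition and folder title.
type Evaluation struct {
    ruleUID string
}

type alertRule struct {
    updateCh chan *Evaluation
    ctx      context.Context
}

// Update drains a pending update so only the latest one is delivered, then
// sends the new payload. It returns false if the rule routine has already
// been stopped (its context is done).
func (a *alertRule) Update(eval *Evaluation) bool {
    // Release a concurrent Update call that is still blocked trying to send.
    select {
    case <-a.updateCh:
    default:
    }
    // Offer the new update, unless the routine is shutting down.
    select {
    case a.updateCh <- eval:
        return true
    case <-a.ctx.Done():
        return false
    }
}

Because updateCh is unbuffered in the real code, the second select blocks until the Run loop picks the update up or the routine stops; the drain step is what lets a newer update supersede one that an earlier caller is still waiting to deliver.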

@@ -43,7 +43,7 @@ func TestAlertRule(t *testing.T) {
     r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
     resultCh := make(chan bool)
     go func() {
-        resultCh <- r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
+        resultCh <- r.Update(&Evaluation{rule: gen.With(gen.WithIsPaused(false)).GenerateRef()})
     }()
     select {
     case <-r.updateCh:
@@ -54,8 +54,8 @@
 })
 t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) {
     r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
-    version1 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
-    version2 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
+    version1 := &Evaluation{rule: gen.With(gen.WithIsPaused(false)).GenerateRef()}
+    version2 := &Evaluation{rule: gen.With(gen.WithIsPaused(false)).GenerateRef()}
     wg := sync.WaitGroup{}
     wg.Add(1)
@@ -178,7 +178,7 @@
     r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
     r.Stop(errRuleDeleted)
     require.ErrorIs(t, r.ctx.Err(), errRuleDeleted)
-    require.False(t, r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}))
+    require.False(t, r.Update(&Evaluation{rule: gen.GenerateRef()}))
 })
 t.Run("eval should do nothing", func(t *testing.T) {
     ruleSpec := gen.GenerateRef()
@@ -234,7 +234,7 @@
     }
     switch rand.Intn(max) + 1 {
     case 1:
-        r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
+        r.Update(&Evaluation{rule: gen.GenerateRef()})
     case 2:
         r.Eval(&Evaluation{
             scheduledAt: time.Now(),
@@ -284,7 +284,7 @@ func blankRuleForTests(ctx context.Context, key models.AlertRuleKeyWithGroup) *a
         Log: log.NewNopLogger(),
     }
     st := state.NewManager(managerCfg, state.NewNoopPersister())
-    return newAlertRule(ctx, key, nil, false, 0, nil, st, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
+    return newAlertRule(ctx, key, nil, false, 0, nil, st, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
 }
 func TestRuleRoutine(t *testing.T) {
@@ -572,7 +572,6 @@ func TestRuleRoutine(t *testing.T) {
 t.Run("when a message is sent to update channel", func(t *testing.T) {
     rule := gen.With(withQueryForState(t, eval.Normal)).GenerateRef()
     folderTitle := "folderName"
-    ruleFp := ruleWithFolder{rule, folderTitle}.Fingerprint()
     evalAppliedChan := make(chan time.Time)
@@ -628,8 +627,8 @@
     require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired")
     t.Run("should do nothing if version in channel is the same", func(t *testing.T) {
-        ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp, false})
-        ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp, false}) // second time just to make sure that previous messages were handled
+        ruleInfo.Update(&Evaluation{rule: rule, folderTitle: folderTitle})
+        ruleInfo.Update(&Evaluation{rule: rule, folderTitle: folderTitle}) // second time just to make sure that previous messages were handled
         actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
         require.Len(t, actualStates, len(states))
@@ -638,7 +637,7 @@
     })
     t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
-        ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp + 1, false})
+        ruleInfo.Update(&Evaluation{rule: models.CopyRule(rule, gen.WithTitle(util.GenerateShortUID())), folderTitle: folderTitle})
         require.Eventually(t, func() bool {
             return len(sender.Calls()) > 0
@@ -905,7 +904,7 @@
 }
 func ruleFactoryFromScheduler(sch *schedule) ruleFactory {
-    return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.rrCfg, sch.metrics, sch.log, sch.tracer, sch.recordingWriter, sch.evalAppliedFunc, sch.stopAppliedFunc)
+    return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, sch.clock, sch.rrCfg, sch.metrics, sch.log, sch.tracer, sch.recordingWriter, sch.evalAppliedFunc, sch.stopAppliedFunc)
 }
 func stateForRule(rule *models.AlertRule, ts time.Time, evalState eval.State) *state.State {

@@ -113,7 +113,7 @@ func (r *recordingRule) Eval(eval *Evaluation) (bool, *Evaluation) {
     }
 }
-func (r *recordingRule) Update(lastVersion RuleVersionAndPauseStatus) bool {
+func (r *recordingRule) Update(_ *Evaluation) bool {
     return true
 }

@@ -17,6 +17,7 @@ import (
     "github.com/stretchr/testify/require"
     "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
+    "github.com/grafana/grafana/pkg/expr"
     "github.com/grafana/grafana/pkg/infra/log"
     "github.com/grafana/grafana/pkg/services/ngalert/metrics"
@@ -116,7 +117,10 @@ func TestRecordingRule(t *testing.T) {
     }
     switch rand.Intn(max) + 1 {
     case 1:
-        r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
+        r.Update(&Evaluation{
+            rule:        gen.GenerateRef(),
+            folderTitle: util.GenerateShortUID(),
+        })
     case 2:
         r.Eval(&Evaluation{
             scheduledAt: time.Now(),
@@ -492,7 +496,7 @@ func TestRecordingRule_Integration(t *testing.T) {
 t.Run("status shows evaluation", func(t *testing.T) {
     status := process.(*recordingRule).Status()
-    //TODO: assert "error" to fix test, update to "nodata" in the future
+    // TODO: assert "error" to fix test, update to "nodata" in the future
     require.Equal(t, "error", status.Health)
 })
 })

@@ -87,11 +87,6 @@ func (r *ruleRegistry) keyMap() map[models.AlertRuleKey]struct{} {
     return definitionsIDs
 }
-type RuleVersionAndPauseStatus struct {
-    Fingerprint fingerprint
-    IsPaused    bool
-}
 type Evaluation struct {
     scheduledAt time.Time
     rule        *models.AlertRule
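
The Run loop above calls ctx.Fingerprint() on the received Evaluation to decide whether state must be reset. That method is not part of this diff; given the ruleWithFolder helper used by the old test code, it presumably delegates to the same hash. The sketch below is an assumption with placeholder types, not code from this commit:

package sketch

// Placeholders for the scheduler package's own types.
type fingerprint uint64

type AlertRule struct{ UID string }

type ruleWithFolder struct {
    rule        *AlertRule
    folderTitle string
}

// Placeholder: the real method hashes the rule definition together with the folder title.
func (r ruleWithFolder) Fingerprint() fingerprint { return 0 }

type Evaluation struct {
    rule        *AlertRule
    folderTitle string
}

// Assumed delegation (not shown in this commit): Evaluation reuses the
// ruleWithFolder hash, so an unchanged rule keeps the same fingerprint and
// the Run loop can skip resetting state.
func (e *Evaluation) Fingerprint() fingerprint {
    return ruleWithFolder{rule: e.rule, folderTitle: e.folderTitle}.Fingerprint()
}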

@@ -302,7 +302,6 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
         sch.alertsSender,
         sch.stateManager,
         sch.evaluatorFactory,
-        &sch.schedulableAlertRules,
         sch.clock,
         sch.rrCfg,
         sch.metrics,
@@ -372,9 +371,9 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
     // if we do not need to eval the rule, check the whether rule was just updated and if it was, notify evaluation routine about that
     logger.Debug("Rule has been updated. Notifying evaluation routine")
     go func(routine Rule, rule *ngmodels.AlertRule) {
-        routine.Update(RuleVersionAndPauseStatus{
-            Fingerprint: ruleWithFolder{rule: rule, folderTitle: folderTitle}.Fingerprint(),
-            IsPaused:    rule.IsPaused,
+        routine.Update(&Evaluation{
+            rule:        rule,
+            folderTitle: folderTitle,
         })
     }(ruleRoutine, item)
     updatedRules = append(updatedRules, ngmodels.AlertRuleKeyWithVersion{
