Alerting: rule evaluation loop's update channel to provide version (#52170)

* The handler for the update message in the rule evaluation routine now ignores the message if the rule's current version is greater than or equal to the version carried by the message.
* update replaces a pending message in the update channel if the channel is not empty, keeping the greater of the two versions.
Yuriy Tseretyan 3 years ago committed by GitHub
parent 17ec9cac83
commit a7509ba18b
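Below is a minimal, self-contained sketch of the sending side of this change. It is simplified from alertRuleInfo.update in the diff that follows: ruleInfo is a pared-down stand-in for alertRuleInfo, and the channel is buffered with capacity 1 only so the demo runs in a single goroutine (the real channel is unbuffered, and the drain picks up a concurrently blocked sender).

package main

import (
	"context"
	"fmt"
)

type ruleVersion int64

type ruleInfo struct {
	updateCh chan ruleVersion
	ctx      context.Context
}

// update mirrors the pattern of alertRuleInfo.update: before sending, drain any
// pending message and keep the greater version, so a slow consumer only ever
// sees the newest requested version.
func (r *ruleInfo) update(version ruleVersion) bool {
	msg := version
	// pick up a pending message, if any, and keep the greater version
	select {
	case v := <-r.updateCh:
		if v > msg {
			msg = v
		}
	case <-r.ctx.Done():
		return false
	default:
	}
	select {
	case r.updateCh <- msg:
		return true
	case <-r.ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// note: buffered only for this single-goroutine demo; the real channel is unbuffered
	r := &ruleInfo{updateCh: make(chan ruleVersion, 1), ctx: ctx}

	r.update(4)
	r.update(5) // replaces the pending 4

	fmt.Println("consumer receives version", <-r.updateCh) // prints 5
}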
Changed files:
  1. pkg/services/ngalert/api/api_ruler.go (2 lines changed)
  2. pkg/services/ngalert/schedule/registry.go (25 lines changed)
  3. pkg/services/ngalert/schedule/registry_test.go (60 lines changed)
  4. pkg/services/ngalert/schedule/schedule.go (20 lines changed)
  5. pkg/services/ngalert/schedule/schedule_mock.go (6 lines changed)
  6. pkg/services/ngalert/schedule/schedule_unit_test.go (38 lines changed)

@ -425,7 +425,7 @@ func (srv RulerSrv) updateAlertRulesInGroup(c *models.ReqContext, groupKey ngmod
srv.scheduleService.UpdateAlertRule(ngmodels.AlertRuleKey{
OrgID: c.SignedInUser.OrgId,
UID: rule.Existing.UID,
})
}, rule.Existing.Version+1)
}
for _, rule := range finalChanges.Delete {

@ -72,16 +72,18 @@ func (r *alertRuleInfoRegistry) keyMap() map[models.AlertRuleKey]struct{} {
return definitionsIDs
}
type ruleVersion int64
type alertRuleInfo struct {
evalCh chan *evaluation
updateCh chan struct{}
updateCh chan ruleVersion
ctx context.Context
stop context.CancelFunc
}
func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
ctx, cancel := context.WithCancel(parent)
return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan struct{}), ctx: ctx, stop: cancel}
return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan ruleVersion), ctx: ctx, stop: cancel}
}
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped.
@ -109,10 +111,23 @@ func (a *alertRuleInfo) eval(t time.Time, version int64) (bool, *evaluation) {
}
}
// update signals the rule evaluation routine to update the internal state. Does nothing if the loop is stopped
func (a *alertRuleInfo) update() bool {
// update sends an instruction to the rule evaluation routine to update the scheduled rule to the specified version. The specified version must be later than the current version, otherwise no update will happen.
func (a *alertRuleInfo) update(lastVersion ruleVersion) bool {
// check if the channel is not empty.
msg := lastVersion
select {
case v := <-a.updateCh:
// if it has a version pick the greatest one.
if v > msg {
msg = v
}
case <-a.ctx.Done():
return false
default:
}
select {
case a.updateCh <- struct{}{}:
case a.updateCh <- msg:
return true
case <-a.ctx.Done():
return false

@ -22,11 +22,11 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
}
t.Run("when rule evaluation is not stopped", func(t *testing.T) {
t.Run("Update should send to updateCh", func(t *testing.T) {
t.Run("update should send to updateCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
resultCh := make(chan bool)
go func() {
resultCh <- r.update()
resultCh <- r.update(ruleVersion(rand.Int63()))
}()
select {
case <-r.updateCh:
@ -35,6 +35,58 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
t.Fatal("No message was received on update channel")
}
})
t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
version1 := ruleVersion(rand.Int31())
version2 := version1 + 1
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
r.update(version1)
wg.Done()
}()
wg.Wait()
wg.Add(2) // one when the first update is finally delivered, another when the goroutine for the second update has started
go func() {
wg.Done()
r.update(version2)
}()
wg.Wait() // at this point the first update's message has already been picked up and folded into the second update
select {
case version := <-r.updateCh:
require.Equal(t, version2, version)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on eval channel")
}
})
t.Run("update should drop any concurrent sending to updateCh and use greater version", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
version1 := ruleVersion(rand.Int31())
version2 := version1 + 1
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
wg.Done()
r.update(version2)
wg.Done()
}()
wg.Wait()
wg.Add(2) // one when the first update is finally delivered, another when the goroutine for the second update has started
go func() {
wg.Done()
r.update(version1)
}()
wg.Wait() // at this point the first update's message has already been picked up by the second update call
select {
case version := <-r.updateCh:
require.Equal(t, version2, version)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on eval channel")
}
})
t.Run("eval should send to evalCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
expected := time.Now()
@ -114,7 +166,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
t.Run("Update should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
require.False(t, r.update())
require.False(t, r.update(ruleVersion(rand.Int63())))
})
t.Run("eval should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
@ -155,7 +207,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
}
switch rand.Intn(max) + 1 {
case 1:
r.update()
r.update(ruleVersion(rand.Int63()))
case 2:
r.eval(time.Now(), rand.Int63())
case 3:

@ -33,7 +33,7 @@ type ScheduleService interface {
// an error. The scheduler is terminated when this function returns.
Run(context.Context) error
// UpdateAlertRule notifies scheduler that a rule has been changed
UpdateAlertRule(key ngmodels.AlertRuleKey)
UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64)
// UpdateAlertRulesByNamespaceUID notifies scheduler that all rules in a namespace should be updated.
UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID int64, uid string) error
// DeleteAlertRule notifies scheduler that a rule has been changed
@ -159,12 +159,12 @@ func (sch *schedule) Run(ctx context.Context) error {
}
// UpdateAlertRule looks for the active rule evaluation and commands it to update the rule
func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey) {
func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64) {
ruleInfo, err := sch.registry.get(key)
if err != nil {
return
}
ruleInfo.update()
ruleInfo.update(ruleVersion(lastVersion))
}
// UpdateAlertRulesByNamespaceUID looks for the active rule evaluation for every rule in the given namespace and commands it to update the rule.
@ -181,7 +181,7 @@ func (sch *schedule) UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID i
sch.UpdateAlertRule(ngmodels.AlertRuleKey{
OrgID: orgID,
UID: r.UID,
})
}, r.Version)
}
return nil
@ -336,7 +336,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
}
}
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan struct{}) error {
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersion) error {
logger := sch.log.New("uid", key.UID, "org", key.OrgID)
logger.Debug("alert rule routine started")
@ -409,7 +409,15 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
for {
select {
// used by external services (API) to notify that rule is updated.
case <-updateCh:
case version := <-updateCh:
// this can happen when, for example, a rule evaluation takes so long that there are
// concurrent messages waiting in both updateCh and evalCh, and the eval message is processed first.
// in that case, by the time the message from updateCh is processed, the current rule already has
// at least the same version (or greater) and the state for the new version of the rule has been created.
if currentRule != nil && int64(version) <= currentRule.Version {
logger.Info("skip updating rule because its current version is actual", "current_version", currentRule.Version, "new_version", version)
continue
}
logger.Info("fetching new version of the rule")
err := retryIfError(func(attempt int64) error {
newRule, newExtraLabels, err := updateRule(grafanaCtx, currentRule)
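
The receiving side of the same mechanism, i.e. the version check added to ruleRoutine above, can be exercised in isolation. The following is a minimal sketch (demoRoutine is an illustrative stand-in for ruleRoutine; evaluation, retries, and state handling are omitted):

package main

import (
	"context"
	"fmt"
	"time"
)

type ruleVersion int64

// demoRoutine reacts to update messages only when they carry a version newer
// than the one it already holds, mimicking the skip logic in ruleRoutine.
func demoRoutine(ctx context.Context, updateCh <-chan ruleVersion) {
	currentVersion := int64(1)
	for {
		select {
		case version := <-updateCh:
			if int64(version) <= currentVersion {
				fmt.Println("skip update: already at version", currentVersion)
				continue
			}
			fmt.Println("refetching rule at version", version)
			currentVersion = int64(version)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	updateCh := make(chan ruleVersion)
	go demoRoutine(ctx, updateCh)

	updateCh <- 1 // same version: skipped
	updateCh <- 2 // newer version: triggers a refetch
	time.Sleep(100 * time.Millisecond)
}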

@ -37,9 +37,9 @@ func (_m *FakeScheduleService) Run(_a0 context.Context) error {
return r0
}
// UpdateAlertRule provides a mock function with given fields: key
func (_m *FakeScheduleService) UpdateAlertRule(key models.AlertRuleKey) {
_m.Called(key)
// UpdateAlertRule provides a mock function with given fields: key, lastVersion
func (_m *FakeScheduleService) UpdateAlertRule(key models.AlertRuleKey, lastVersion int64) {
_m.Called(key, lastVersion)
}
// UpdateAlertRulesByNamespaceUID provides a mock function with given fields: ctx, orgID, uid

@ -371,7 +371,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
}()
expectedTime := time.UnixMicro(rand.Int63())
@ -487,7 +487,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan struct{}))
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan ruleVersion))
stoppedChan <- err
}()
@ -509,7 +509,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
}()
expectedTime := time.UnixMicro(rand.Int63())
@ -561,7 +561,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
}()
expectedTime := time.UnixMicro(rand.Int63())
@ -604,7 +604,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
t.Run("should fetch the alert rule from database", func(t *testing.T) {
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
updateChan := make(chan struct{})
updateChan := make(chan ruleVersion)
sch, ruleStore, _, _, _, _ := createSchedule(evalAppliedChan)
@ -615,7 +615,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan)
}()
updateChan <- struct{}{}
updateChan <- ruleVersion(rule.Version)
// wait for command to be executed
var queries []interface{}
@ -647,7 +647,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
t.Run("should retry when database fails", func(t *testing.T) {
evalAppliedChan := make(chan time.Time)
updateChan := make(chan struct{})
updateChan := make(chan ruleVersion)
sch, ruleStore, _, _, _, _ := createSchedule(evalAppliedChan)
sch.maxAttempts = rand.Int63n(4) + 1
@ -666,7 +666,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
}
return errors.New("TEST")
}
updateChan <- struct{}{}
updateChan <- ruleVersion(rule.Version)
var queries []interface{}
require.Eventuallyf(t, func() bool {
@ -699,7 +699,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time)
updateChan := make(chan struct{})
updateChan := make(chan ruleVersion)
ctx := context.Background()
sch, ruleStore, _, _, _, alertsRouter := createSchedule(evalAppliedChan)
@ -751,14 +751,14 @@ func TestSchedule_ruleRoutine(t *testing.T) {
return nil
}
updateChan <- struct{}{}
updateChan <- ruleVersion(rule.Version)
wg.Wait()
newRule := rule
newRule.Version++
ruleStore.PutRule(ctx, &newRule)
wg.Add(1)
updateChan <- struct{}{}
updateChan <- ruleVersion(newRule.Version)
wg.Wait()
require.Eventually(t, func() bool {
@ -821,7 +821,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
go func() {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
}()
evalChan <- &evaluation{
@ -850,12 +850,14 @@ func TestSchedule_UpdateAlertRule(t *testing.T) {
sch := setupSchedulerWithFakeStores(t)
key := generateRuleKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
version := rand.Int63()
go func() {
sch.UpdateAlertRule(key)
sch.UpdateAlertRule(key, version)
}()
select {
case <-info.updateCh:
case v := <-info.updateCh:
require.Equal(t, ruleVersion(version), v)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on update channel")
}
@ -865,14 +867,14 @@ func TestSchedule_UpdateAlertRule(t *testing.T) {
key := generateRuleKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
info.stop()
sch.UpdateAlertRule(key)
sch.UpdateAlertRule(key, rand.Int63())
})
})
t.Run("when rule does not exist", func(t *testing.T) {
t.Run("should exit", func(t *testing.T) {
sch := setupSchedulerWithFakeStores(t)
key := generateRuleKey()
sch.UpdateAlertRule(key)
sch.UpdateAlertRule(key, rand.Int63())
})
})
}
@ -884,7 +886,7 @@ func TestSchedule_DeleteAlertRule(t *testing.T) {
key := generateRuleKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
sch.DeleteAlertRule(key)
require.False(t, info.update())
require.False(t, info.update(ruleVersion(rand.Int63())))
success, dropped := info.eval(time.Now(), 1)
require.False(t, success)
require.Nilf(t, dropped, "expected no dropped evaluations but got one")
@ -896,7 +898,7 @@ func TestSchedule_DeleteAlertRule(t *testing.T) {
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
info.stop()
sch.DeleteAlertRule(key)
require.False(t, info.update())
require.False(t, info.update(ruleVersion(rand.Int63())))
success, dropped := info.eval(time.Now(), 1)
require.False(t, success)
require.Nilf(t, dropped, "expected no dropped evaluations but got one")
