Alerting: Sequential evaluation of rules in group (#98829)

* introduce RulesGroupComparer

* extract runJob method

* implement sequential evaluation

* Make sequence building testable & add comments

* Also run callback in recording rules + add tests

* Improve tests

* Address PR comments
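As a minimal, self-contained sketch of the idea (simplified stand-in types, not the actual scheduler structs): each scheduled evaluation carries an optional afterEval callback, and rules in the same group are chained so that finishing rule N triggers rule N+1. Only the head of the chain is dispatched directly; the diffs below add this hook to alertRule.Run and recordingRule.Run and build the chains in the new sequence.go.

```go
package main

import "fmt"

// evaluation is a simplified stand-in for the scheduler's Evaluation payload;
// afterEval, when non-nil, runs once the rule has been evaluated.
type evaluation struct {
	ruleUID   string
	afterEval func()
}

// evalRule simulates evaluating one rule and then firing its callback.
func evalRule(e *evaluation) {
	fmt.Println("evaluating rule", e.ruleUID)
	if e.afterEval != nil {
		e.afterEval()
	}
}

func main() {
	group := []*evaluation{{ruleUID: "A"}, {ruleUID: "B"}, {ruleUID: "C"}}
	// Wire the chain backwards: A.afterEval -> eval(B), B.afterEval -> eval(C).
	for i := len(group) - 2; i >= 0; i-- {
		next := group[i+1]
		group[i].afterEval = func() { evalRule(next) }
	}
	// Kicking off only the head evaluates the whole group strictly in order.
	evalRule(group[0])
}
```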

---------

Co-authored-by: William Wernert <william.wernert@grafana.com>
pull/103250/head
Yuri Tseretyan authored 3 months ago, committed by GitHub
parent ba5c38b078
commit dc0083d879
  1. packages/grafana-data/src/types/featureToggles.gen.ts (2 changed lines)
  2. pkg/services/featuremgmt/registry.go (2 changed lines)
  3. pkg/services/featuremgmt/toggles_gen.go (2 changed lines)
  4. pkg/services/featuremgmt/toggles_gen.json (24 changed lines)
  5. pkg/services/ngalert/models/alert_rule.go (22 changed lines)
  6. pkg/services/ngalert/schedule/alert_rule.go (5 changed lines)
  7. pkg/services/ngalert/schedule/alert_rule_test.go (217 changed lines)
  8. pkg/services/ngalert/schedule/recording_rule.go (4 changed lines)
  9. pkg/services/ngalert/schedule/recording_rule_test.go (237 changed lines)
  10. pkg/services/ngalert/schedule/registry.go (1 changed line)
  11. pkg/services/ngalert/schedule/schedule.go (54 changed lines)
  12. pkg/services/ngalert/schedule/schedule_unit_test.go (105 changed lines)
  13. pkg/services/ngalert/schedule/sequence.go (127 changed lines)
  14. pkg/services/ngalert/schedule/sequence_test.go (153 changed lines)

@@ -550,7 +550,7 @@ export interface FeatureToggles {
*/
newFolderPicker?: boolean;
/**
* Distributes alert rule evaluations more evenly over time, including spreading out rules within the same group
* Distributes alert rule evaluations more evenly over time, including spreading out rules within the same group. Disables sequential evaluation if enabled.
*/
jitterAlertRulesWithinGroups?: boolean;
/**

@@ -931,7 +931,7 @@ var (
},
{
Name: "jitterAlertRulesWithinGroups",
Description: "Distributes alert rule evaluations more evenly over time, including spreading out rules within the same group",
Description: "Distributes alert rule evaluations more evenly over time, including spreading out rules within the same group. Disables sequential evaluation if enabled.",
FrontendOnly: false,
Stage: FeatureStagePublicPreview,
Owner: grafanaAlertingSquad,

@@ -500,7 +500,7 @@ const (
FlagNewFolderPicker = "newFolderPicker"
// FlagJitterAlertRulesWithinGroups
// Distributes alert rule evaluations more evenly over time, including spreading out rules within the same group
// Distributes alert rule evaluations more evenly over time, including spreading out rules within the same group. Disables sequential evaluation if enabled.
FlagJitterAlertRulesWithinGroups = "jitterAlertRulesWithinGroups"
// FlagOnPremToCloudMigrations

@@ -437,6 +437,21 @@
"expression": "false"
}
},
{
"metadata": {
"name": "alertingRuleSequentialEvaluation",
"resourceVersion": "1742580089675",
"creationTimestamp": "2025-03-21T18:01:29Z",
"deletionTimestamp": "2025-03-31T17:26:04Z"
},
"spec": {
"description": "Enables the alerting rule sequential evaluation feature",
"stage": "preview",
"codeowner": "@grafana/alerting-squad",
"hideFromAdminPage": true,
"hideFromDocs": true
}
},
{
"metadata": {
"name": "alertingRuleRecoverDeleted",
@@ -2324,11 +2339,14 @@
{
"metadata": {
"name": "jitterAlertRulesWithinGroups",
"resourceVersion": "1718727528075",
"creationTimestamp": "2024-01-18T18:48:11Z"
"resourceVersion": "1742581260617",
"creationTimestamp": "2024-01-18T18:48:11Z",
"annotations": {
"grafana.app/updatedTimestamp": "2025-03-21 18:21:00.617577 +0000 UTC"
}
},
"spec": {
"description": "Distributes alert rule evaluations more evenly over time, including spreading out rules within the same group",
"description": "Distributes alert rule evaluations more evenly over time, including spreading out rules within the same group. Disables sequential evaluation if enabled.",
"stage": "preview",
"codeowner": "@grafana/alerting-squad",
"requiresRestart": true,

@@ -7,6 +7,7 @@ import (
"fmt"
"hash/fnv"
"maps"
"slices"
"sort"
"strconv"
"strings"
@@ -993,13 +994,22 @@ func ValidateRuleGroupInterval(intervalSeconds, baseIntervalSeconds int64) error
type RulesGroup []*AlertRule
func RulesGroupComparer(a, b *AlertRule) int {
if a.RuleGroupIndex < b.RuleGroupIndex {
return -1
} else if a.RuleGroupIndex > b.RuleGroupIndex {
return 1
}
if a.ID < b.ID {
return -1
} else if a.ID > b.ID {
return 1
}
return 0
}
func (g RulesGroup) SortByGroupIndex() {
sort.Slice(g, func(i, j int) bool {
if g[i].RuleGroupIndex == g[j].RuleGroupIndex {
return g[i].ID < g[j].ID
}
return g[i].RuleGroupIndex < g[j].RuleGroupIndex
})
slices.SortFunc(g, RulesGroupComparer)
}
func SortAlertRulesByGroupIndex(rules []AlertRule) {
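For reference, a hedged sketch of how a three-way comparer of this shape plugs into slices.SortFunc (a local stand-in struct, not the real AlertRule): cmp.Compare and cmp.Or from the standard library express the same group-index-then-ID ordering that RulesGroupComparer spells out with if/else.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// rule is a stand-in carrying only the two fields the comparer inspects.
type rule struct {
	RuleGroupIndex int
	ID             int64
}

// compareRules orders by group index first, then by ID.
func compareRules(a, b rule) int {
	return cmp.Or(
		cmp.Compare(a.RuleGroupIndex, b.RuleGroupIndex),
		cmp.Compare(a.ID, b.ID),
	)
}

func main() {
	rules := []rule{{2, 10}, {1, 7}, {1, 3}}
	slices.SortFunc(rules, compareRules)
	fmt.Println(rules) // [{1 3} {1 7} {2 10}]
}
```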

@@ -341,7 +341,10 @@ func (a *alertRule) Run() error {
}
}
}()
if ctx.afterEval != nil {
logger.Debug("Calling afterEval")
ctx.afterEval()
}
case <-grafanaCtx.Done():
reason := grafanaCtx.Err()

@@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
alertingModels "github.com/grafana/alerting/models"
@@ -278,6 +279,222 @@ func TestAlertRuleIdentifier(t *testing.T) {
})
}
func TestAlertRuleAfterEval(t *testing.T) {
gen := models.RuleGen.With(models.RuleGen.WithOrgID(123))
type testContext struct {
rule *models.AlertRule
process Rule
evalDoneChan chan time.Time
afterEvalCh chan struct{}
callCount *atomic.Int32
mutex *sync.Mutex
stateManager *state.Manager
sender *SyncAlertsSenderMock
}
// Configuration struct for test setup
type setupConfig struct {
queryState eval.State
isPaused bool
}
// Default configuration
defaultSetupConfig := setupConfig{
queryState: eval.Normal,
isPaused: false,
}
setup := func(t *testing.T, cfg setupConfig) *testContext {
t.Helper()
evalDoneChan := make(chan time.Time, 1) // Buffer to avoid blocking
afterEvalCh := make(chan struct{}, 1) // Buffer to avoid blocking
callCount := atomic.NewInt32(0)
mutex := &sync.Mutex{}
sender := NewSyncAlertsSenderMock()
sender.EXPECT().Send(mock.Anything, mock.Anything, mock.Anything).Return()
ruleStore := newFakeRulesStore()
instanceStore := &state.FakeInstanceStore{}
registry := prometheus.NewPedanticRegistry()
sch := setupScheduler(t, ruleStore, instanceStore, registry, sender, nil, nil)
// Set up the evaluation callback
sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
evalDoneChan <- t
}
rule := gen.With(withQueryForState(t, cfg.queryState)).GenerateRef()
if cfg.isPaused {
rule.IsPaused = true
}
ruleStore.PutRule(context.Background(), rule)
ruleFactory := ruleFactoryFromScheduler(sch)
process := ruleFactory.new(context.Background(), rule)
return &testContext{
rule: rule,
process: process,
evalDoneChan: evalDoneChan,
afterEvalCh: afterEvalCh,
callCount: callCount,
mutex: mutex,
stateManager: sch.stateManager,
sender: sender,
}
}
runTest := func(t *testing.T, ctx *testContext, expectCallbackCalled bool) {
t.Helper()
now := time.Now()
eval := &Evaluation{
scheduledAt: now,
rule: ctx.rule,
folderTitle: "test-folder",
afterEval: func() {
ctx.callCount.Inc()
select {
case ctx.afterEvalCh <- struct{}{}:
default:
// Channel is full, which is fine for tests
}
},
}
// Start the rule processing goroutine
go func() {
_ = ctx.process.Run()
}()
// Send the evaluation
ctx.process.Eval(eval)
// Wait for evaluation to complete
select {
case <-ctx.evalDoneChan:
// Evaluation was completed
case <-time.After(5 * time.Second):
t.Fatal("Evaluation was not completed in time")
}
// Wait for potential afterEval execution
waitDuration := 500 * time.Millisecond
if expectCallbackCalled {
select {
case <-ctx.afterEvalCh:
// Success - afterEval was called
case <-time.After(5 * time.Second):
t.Fatal("afterEval callback was not called")
}
} else {
// Just wait a bit to make sure callback isn't called
time.Sleep(waitDuration)
}
// Verify callback count
count := ctx.callCount.Load()
if expectCallbackCalled {
require.Equal(t, int32(1), count, "afterEval callback should have been called exactly once")
} else {
require.Equal(t, int32(0), count, "afterEval callback should not have been called")
}
}
t.Run("afterEval callback is called after successful evaluation", func(t *testing.T) {
ctx := setup(t, defaultSetupConfig)
runTest(t, ctx, true)
})
t.Run("afterEval callback is called even when evaluation produces errors", func(t *testing.T) {
ctx := setup(t, setupConfig{
queryState: eval.Error,
isPaused: false,
})
// The important part of this test is confirming the callback works
// even with an eval.Error state, which is checked by runTest
runTest(t, ctx, true)
})
t.Run("afterEval callback is called when rule is paused", func(t *testing.T) {
ctx := setup(t, setupConfig{
queryState: eval.Normal,
isPaused: true,
})
runTest(t, ctx, true)
})
t.Run("afterEval callback is called before stopping rule evaluation", func(t *testing.T) {
ctx := setup(t, defaultSetupConfig)
// Start the rule processing goroutine
go func() {
_ = ctx.process.Run()
}()
// Create a channel to signal when Stop is called
stopSignalCh := make(chan struct{})
// Send an evaluation that will be pending when we stop the rule
now := time.Now()
eval := &Evaluation{
scheduledAt: now,
rule: ctx.rule,
folderTitle: "test-folder",
afterEval: func() {
// Wait until we know Stop has been called
select {
case <-stopSignalCh:
// Stop was called before this callback executed
case <-time.After(100 * time.Millisecond):
t.Error("afterEval callback executed but Stop signal wasn't received")
}
ctx.callCount.Inc()
select {
case ctx.afterEvalCh <- struct{}{}:
default:
// Channel is full, which is fine for tests
}
},
}
ctx.process.Eval(eval)
// Create a stopChan to verify rule stopping
stopChan := make(chan struct{})
go func() {
// Signal that we're about to call Stop
close(stopSignalCh)
ctx.process.Stop(nil)
close(stopChan)
}()
// Verify afterEval was called during stopping
select {
case <-ctx.afterEvalCh:
// Success - afterEval was called during stopping
case <-time.After(5 * time.Second):
t.Fatal("afterEval callback was not called during rule stopping")
}
// Verify the rule stopped properly
select {
case <-stopChan:
// Success - the rule stopped
case <-time.After(5 * time.Second):
t.Fatal("Rule did not stop in time")
}
// Verify callback count
count := ctx.callCount.Load()
require.Equal(t, int32(1), count, "afterEval callback should have been called exactly once during stopping")
})
}
func blankRuleForTests(ctx context.Context, key models.AlertRuleKeyWithGroup) *alertRule {
managerCfg := state.ManagerCfg{
Historian: &state.FakeHistorian{},

@@ -144,6 +144,10 @@ func (r *recordingRule) Run() error {
// TODO: Either implement me or remove from alert rules once investigated.
r.doEvaluate(ctx, eval)
// Call afterEval callback if it exists
if eval.afterEval != nil {
eval.afterEval()
}
case <-ctx.Done():
r.logger.Debug("Stopping recording rule routine")
return nil

@@ -15,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
@@ -192,6 +193,242 @@ func TestRecordingRule_Integration(t *testing.T) {
})
}
func TestRecordingRuleAfterEval(t *testing.T) {
gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules(), models.RuleGen.WithOrgID(123))
type testContext struct {
rule *models.AlertRule
process Rule
evalDoneChan chan time.Time
afterEvalCh chan struct{}
callCount *atomic.Int32
mutex *sync.Mutex
scheduler *schedule
ruleStore *fakeRulesStore // Use the concrete type for access to getNamespaceTitle
}
// Configuration struct for test setup
type setupConfig struct {
queryHealth string
enableRecordingRules bool
isPaused bool
// Add any other configurable parameters here
}
// Default configuration
defaultSetupConfig := setupConfig{
queryHealth: "ok",
enableRecordingRules: true,
isPaused: false,
}
setup := func(t *testing.T, cfg setupConfig) *testContext {
t.Helper()
ruleStore := newFakeRulesStore()
reg := prometheus.NewPedanticRegistry()
sch := setupScheduler(t, ruleStore, nil, reg, nil, nil, nil)
sch.recordingWriter = writer.FakeWriter{}
if !cfg.enableRecordingRules {
sch.rrCfg.Enabled = false
}
rule := gen.With(withQueryForHealth(cfg.queryHealth)).GenerateRef()
if cfg.isPaused {
rule.IsPaused = true
}
ruleStore.PutRule(context.Background(), rule)
ruleFactory := ruleFactoryFromScheduler(sch)
process := ruleFactory.new(context.Background(), rule)
evalDoneChan := make(chan time.Time, 1) // Buffer to avoid blocking
afterEvalCh := make(chan struct{}, 1) // Buffer to avoid blocking
callCount := atomic.NewInt32(0)
mutex := &sync.Mutex{}
process.(*recordingRule).evalAppliedHook = func(_ models.AlertRuleKey, t time.Time) {
evalDoneChan <- t
}
// Start the rule processing goroutine
go func() {
_ = process.Run()
}()
return &testContext{
rule: rule,
process: process,
evalDoneChan: evalDoneChan,
afterEvalCh: afterEvalCh,
callCount: callCount,
mutex: mutex,
scheduler: sch,
ruleStore: ruleStore,
}
}
runTest := func(t *testing.T, ctx *testContext, expectCallbackCalled bool) {
t.Helper()
now := time.Now()
folderTitle := ctx.ruleStore.getNamespaceTitle(ctx.rule.NamespaceUID)
eval := &Evaluation{
scheduledAt: now,
rule: ctx.rule,
folderTitle: folderTitle,
afterEval: func() {
ctx.callCount.Inc()
select {
case ctx.afterEvalCh <- struct{}{}:
default:
// Channel is full, which is fine for tests
}
},
}
// Send the evaluation
ctx.process.Eval(eval)
// For enabled rules that are not paused, we should see the evaluation complete
if ctx.scheduler.rrCfg.Enabled && !ctx.rule.IsPaused {
select {
case <-ctx.evalDoneChan:
// Evaluation was completed
case <-time.After(5 * time.Second):
t.Fatal("Evaluation was not completed in time")
}
}
// Wait for potential afterEval execution
waitDuration := 500 * time.Millisecond
if expectCallbackCalled {
select {
case <-ctx.afterEvalCh:
// Success - afterEval was called
case <-time.After(5 * time.Second):
t.Fatal("afterEval callback was not called")
}
} else {
// Just wait a bit to make sure callback isn't called
time.Sleep(waitDuration)
}
// Verify callback count
count := ctx.callCount.Load()
if expectCallbackCalled {
require.Equal(t, int32(1), count, "afterEval callback should have been called exactly once")
} else {
require.Equal(t, int32(0), count, "afterEval callback should not have been called")
}
}
t.Run("afterEval callback is called after successful evaluation", func(t *testing.T) {
ctx := setup(t, defaultSetupConfig)
runTest(t, ctx, true)
})
t.Run("afterEval callback is called even when evaluation fails", func(t *testing.T) {
ctx := setup(t, setupConfig{
queryHealth: "error",
enableRecordingRules: true,
isPaused: false,
})
runTest(t, ctx, true)
// Verify that the rule evaluation did indeed fail
status := ctx.process.(*recordingRule).Status()
require.Equal(t, "error", status.Health)
require.NotNil(t, status.LastError)
})
t.Run("afterEval callback is not called when recording rule feature is disabled", func(t *testing.T) {
ctx := setup(t, setupConfig{
queryHealth: "ok",
enableRecordingRules: false,
isPaused: false,
})
runTest(t, ctx, false)
})
t.Run("afterEval callback is called before stopping rule evaluation", func(t *testing.T) {
ctx := setup(t, defaultSetupConfig)
// Create a channel to signal when Stop is called
stopSignalCh := make(chan struct{})
// Send an evaluation that will be pending when we stop the rule
now := time.Now()
folderTitle := ctx.ruleStore.getNamespaceTitle(ctx.rule.NamespaceUID)
eval := &Evaluation{
scheduledAt: now,
rule: ctx.rule,
folderTitle: folderTitle,
afterEval: func() {
// Wait until we know Stop has been called
select {
case <-stopSignalCh:
// Stop was called before this callback executed
case <-time.After(100 * time.Millisecond):
t.Error("afterEval callback executed but Stop signal wasn't received")
}
ctx.callCount.Inc()
select {
case ctx.afterEvalCh <- struct{}{}:
default:
// Channel is full, which is fine for tests
}
},
}
ctx.process.Eval(eval)
// Create a stopChan to verify rule stopping
stopChan := make(chan struct{})
go func() {
// Signal that we're about to call Stop
close(stopSignalCh)
ctx.process.Stop(nil)
close(stopChan)
}()
// Verify afterEval was called during stopping
select {
case <-ctx.afterEvalCh:
// Success - afterEval was called during stopping
case <-time.After(5 * time.Second):
t.Fatal("afterEval callback was not called during rule stopping")
}
// Verify the rule stopped properly
select {
case <-stopChan:
// Success - the rule stopped
case <-time.After(5 * time.Second):
t.Fatal("Rule did not stop in time")
}
// Verify callback count
count := ctx.callCount.Load()
require.Equal(t, int32(1), count, "afterEval callback should have been called exactly once during stopping")
})
t.Run("afterEval callback is still called when rule is paused", func(t *testing.T) {
ctx := setup(t, setupConfig{
queryHealth: "ok",
enableRecordingRules: true,
isPaused: true,
})
runTest(t, ctx, true)
// Verify the rule status
status := ctx.process.(*recordingRule).Status()
require.Equal(t, "unknown", status.Health, "Paused rule should have 'unknown' health since it's not evaluated")
})
}
func testRecordingRule_Integration(t *testing.T, writeTarget *writer.TestRemoteWriteTarget, writer RecordingWriter, writerReg *prometheus.Registry, dsUID string) {
gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules(), models.RuleGen.WithOrgID(123))
ruleStore := newFakeRulesStore()

@@ -91,6 +91,7 @@ type Evaluation struct {
scheduledAt time.Time
rule *models.AlertRule
folderTitle string
afterEval func()
}
func (e *Evaluation) Fingerprint() fingerprint {

@@ -4,8 +4,6 @@ import (
"context"
"fmt"
"net/url"
"slices"
"strings"
"time"
"github.com/benbjohnson/clock"
@@ -390,31 +388,14 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
sch.log.Warn("Unable to obtain folder titles for some rules", "missingFolderUIDToRuleUID", missingFolder)
}
var step int64 = 0
// jitter the start time based on the base interval and total scheduled items
var step int64
if len(readyToRun) > 0 {
step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun))
}
slices.SortFunc(readyToRun, func(a, b readyToRunItem) int {
return strings.Compare(a.rule.UID, b.rule.UID)
})
for i := range readyToRun {
item := readyToRun[i]
time.AfterFunc(time.Duration(int64(i)*step), func() {
key := item.rule.GetKey()
success, dropped := item.ruleRoutine.Eval(&item.Evaluation)
if !success {
sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", append(key.LogContext(), "time", tick)...)
return
}
if dropped != nil {
sch.log.Warn("Tick dropped because alert rule evaluation is too slow", append(key.LogContext(), "time", tick, "droppedTick", dropped.scheduledAt)...)
orgID := fmt.Sprint(key.OrgID)
sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.rule.Title).Inc()
}
})
}
sequences := sch.buildSequences(readyToRun, sch.runJobFn)
sch.runSequences(sequences, step)
// Stop old routines for rules that got restarted.
for _, oldRoutine := range restartedRules {
@@ -427,5 +408,32 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
toDelete = append(toDelete, key)
}
sch.deleteAlertRule(ctx, toDelete...)
return readyToRun, registeredDefinitions, updatedRules
}
// runJobFn sends the scheduled evaluation to the evaluation routine, optionally with a previous item to log the trigger source.
func (sch *schedule) runJobFn(next readyToRunItem, prev ...readyToRunItem) func() {
return func() {
if len(prev) > 0 {
sch.log.Debug("Rule evaluation triggered by previous rule", append(next.rule.GetKey().LogContext(), "previousRule", prev[0].rule.UID)...)
}
key := next.rule.GetKey()
success, dropped := next.ruleRoutine.Eval(&next.Evaluation)
if !success {
sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", append(key.LogContext(), "time", next.scheduledAt)...)
return
}
if dropped != nil {
sch.log.Warn("Tick dropped because alert rule evaluation is too slow", append(key.LogContext(), "time", next.scheduledAt, "droppedTick", dropped.scheduledAt)...)
orgID := fmt.Sprint(key.OrgID)
sch.metrics.EvaluationMissed.WithLabelValues(orgID, next.rule.Title).Inc()
}
}
}
func (sch *schedule) runSequences(sequences []sequence, step int64) {
for i := range sequences {
time.AfterFunc(time.Duration(int64(i)*step), sch.runJobFn(readyToRunItem(sequences[i])))
}
}
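A self-contained sketch of the dispatch pattern runSequences uses (hypothetical jobs in place of readyToRunItem and the real runJobFn): only the head of each sequence gets its own time.AfterFunc timer, offset by i*step, while the rest of each chain is pulled in by afterEval callbacks rather than separate timers.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Step between sequence starts; in the scheduler this is
	// baseInterval divided by the number of ready items.
	step := 100 * time.Millisecond

	// Hypothetical sequence heads standing in for runJobFn closures.
	jobs := []func(){
		func() { fmt.Println("sequence 0 head evaluated") },
		func() { fmt.Println("sequence 1 head evaluated") },
		func() { fmt.Println("sequence 2 head evaluated") },
	}

	var wg sync.WaitGroup
	wg.Add(len(jobs))
	for i, job := range jobs {
		job := job // capture the loop variable (pre-Go 1.22 semantics)
		time.AfterFunc(time.Duration(i)*step, func() {
			defer wg.Done()
			job()
		})
	}
	wg.Wait()
}
```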

@@ -8,7 +8,8 @@ import (
"fmt"
"math/rand"
"net/url"
"slices"
"sort"
"sync"
"testing"
"time"
@@ -519,7 +520,7 @@ func TestProcessTicks(t *testing.T) {
require.Emptyf(t, updated, "No rules should be updated")
})
t.Run("after 12th tick no status should be available", func(t *testing.T) {
t.Run("after 17th tick no status should be available", func(t *testing.T) {
_, ok := sched.Status(alertRule1.GetKey())
require.False(t, ok, "status for a rule that was deleted should not be available")
_, ok = sched.Status(alertRule2.GetKey())
@@ -533,26 +534,102 @@
ruleStore.rules = map[string]*models.AlertRule{}
ruleStore.PutRule(context.Background(), rules...)
expectedUids := make([]string, 0, len(rules))
for _, rule := range rules {
expectedUids = append(expectedUids, rule.UID)
}
slices.Sort(expectedUids)
tick = tick.Add(cfg.BaseInterval)
scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick)
require.Emptyf(t, stopped, "No rules are expected to be stopped")
require.Emptyf(t, updated, "No rules are expected to be updated")
require.Len(t, scheduled, len(rules), "All rules should be scheduled in this tick")
})
t.Run("sequence should be evaluated in the correct order", func(t *testing.T) {
rules := gen.With(gen.WithOrgID(mainOrgID), gen.WithInterval(cfg.BaseInterval), gen.WithPrometheusOriginalRuleDefinition("def")).GenerateManyRef(10, 20)
ruleStore.rules = map[string]*models.AlertRule{}
ruleStore.PutRule(context.Background(), rules...)
// Create tracking for evaluation order by group
evalOrderByGroup := make(map[string][]string)
mutex := sync.Mutex{}
// Replace evalAppliedFunc to track order by group
origEvalAppliedFunc := sched.evalAppliedFunc
sched.evalAppliedFunc = func(alertDefKey models.AlertRuleKey, now time.Time) {
// Find corresponding rule
var rule *models.AlertRule
for _, r := range rules {
if r.GetKey() == alertDefKey {
rule = r
break
}
}
if rule != nil {
groupKey := fmt.Sprintf("%s;%s", ruleStore.getNamespaceTitle(rule.NamespaceUID), rule.RuleGroup)
mutex.Lock()
evalOrderByGroup[groupKey] = append(evalOrderByGroup[groupKey], rule.UID)
mutex.Unlock()
}
origEvalAppliedFunc(alertDefKey, now)
}
defer func() {
sched.evalAppliedFunc = origEvalAppliedFunc
}()
tick = tick.Add(cfg.BaseInterval)
scheduled, _, _ := sched.processTick(ctx, dispatcherGroup, tick)
require.NotEmpty(t, scheduled)
// Wait for all evaluations to complete
time.Sleep(100 * time.Millisecond)
// Group rules by their group for expected order
expectedOrderByGroup := make(map[string][]string)
for _, rule := range rules {
groupKey := fmt.Sprintf("%s;%s", ruleStore.getNamespaceTitle(rule.NamespaceUID), rule.RuleGroup)
expectedOrderByGroup[groupKey] = append(expectedOrderByGroup[groupKey], rule.UID)
}
// Sort each group's rules by title
for _, ruleUIDs := range expectedOrderByGroup {
rulesByUID := make(map[string]*models.AlertRule)
for _, rule := range rules {
rulesByUID[rule.UID] = rule
}
actualUids := make([]string, 0, len(scheduled))
for _, rule := range scheduled {
actualUids = append(actualUids, rule.rule.UID)
sort.Slice(ruleUIDs, func(i, j int) bool {
return rulesByUID[ruleUIDs[i]].Title < rulesByUID[ruleUIDs[j]].Title
})
}
require.Len(t, scheduled, len(rules))
assert.Truef(t, slices.IsSorted(actualUids), "The scheduler rules should be sorted by UID but they aren't")
require.Equal(t, expectedUids, actualUids)
// Verify that rules within each group were evaluated in correct order
for groupKey, expectedUIDs := range expectedOrderByGroup {
actualUIDs, evaluated := evalOrderByGroup[groupKey]
if !evaluated {
// Some groups might not be evaluated during the test
continue
}
if len(actualUIDs) > 1 { // Only check order for groups with multiple rules
// Convert back to rule titles for clearer error messages
rulesByUID := make(map[string]*models.AlertRule)
for _, rule := range rules {
rulesByUID[rule.UID] = rule
}
expectedTitles := make([]string, 0, len(expectedUIDs))
for _, uid := range expectedUIDs {
expectedTitles = append(expectedTitles, rulesByUID[uid].Title)
}
actualTitles := make([]string, 0, len(actualUIDs))
for _, uid := range actualUIDs {
actualTitles = append(actualTitles, rulesByUID[uid].Title)
}
assert.Equal(t, expectedTitles, actualTitles, "Rules in group %s were not evaluated in expected order", groupKey)
}
}
})
}

@@ -0,0 +1,127 @@
package schedule
import (
"cmp"
"slices"
"strings"
models "github.com/grafana/grafana/pkg/services/ngalert/models"
)
// sequence represents a chain of rules that should be evaluated in order.
// It is a convenience type that wraps readyToRunItem as an indicator of what
// is being represented.
type sequence readyToRunItem
type groupKey struct {
folderTitle string
folderUID string
groupName string
}
// buildSequences organizes rules into evaluation sequences where rules in the same group
// are chained together. The first rule in each group will trigger the evaluation of subsequent
// rules in that group through the afterEval callback.
//
// For example, if we have rules A, B, C in group G1 and rules D, E in group G2:
// - A will have afterEval set to evaluate B
// - B will have afterEval set to evaluate C
// - D will have afterEval set to evaluate E
//
// The function returns a slice of sequences, where each sequence represents a chain of rules
// that should be evaluated in order.
//
// NOTE: This currently only chains rules in imported groups.
func (sch *schedule) buildSequences(items []readyToRunItem, runJobFn func(next readyToRunItem, prev ...readyToRunItem) func()) []sequence {
// Step 1: Group rules by their folder and group name
groups := map[groupKey][]readyToRunItem{}
var keys []groupKey
for _, item := range items {
g := groupKey{
folderTitle: item.folderTitle,
folderUID: item.rule.NamespaceUID,
groupName: item.rule.RuleGroup,
}
i, ok := groups[g]
if !ok {
keys = append(keys, g)
}
groups[g] = append(i, item)
}
// Step 2: Sort group keys to ensure consistent ordering
slices.SortFunc(keys, func(a, b groupKey) int {
return cmp.Or(
cmp.Compare(a.folderTitle, b.folderTitle),
cmp.Compare(a.folderUID, b.folderUID),
cmp.Compare(a.groupName, b.groupName),
)
})
// Step 3: Build evaluation sequences for each group
result := make([]sequence, 0, len(items))
for _, key := range keys {
groupItems := groups[key]
if sch.shouldEvaluateSequentially(groupItems) {
result = append(result, sch.buildSequence(key, groupItems, runJobFn))
continue
}
for _, item := range groupItems {
result = append(result, sequence(item))
}
}
// sort the sequences by UID
slices.SortFunc(result, func(a, b sequence) int {
return strings.Compare(a.rule.UID, b.rule.UID)
})
return result
}
func (sch *schedule) buildSequence(groupKey groupKey, groupItems []readyToRunItem, runJobFn func(next readyToRunItem, prev ...readyToRunItem) func()) sequence {
if len(groupItems) < 2 {
return sequence(groupItems[0])
}
slices.SortFunc(groupItems, func(a, b readyToRunItem) int {
return models.RulesGroupComparer(a.rule, b.rule)
})
// iterate over the group items backwards to set the afterEval callback
for i := len(groupItems) - 2; i >= 0; i-- {
groupItems[i].Evaluation.afterEval = runJobFn(groupItems[i+1], groupItems[i])
}
uids := make([]string, 0, len(groupItems))
for _, item := range groupItems {
uids = append(uids, item.rule.UID)
}
sch.log.Debug("Sequence created", "folder", groupKey.folderTitle, "group", groupKey.groupName, "sequence", strings.Join(uids, "->"))
return sequence(groupItems[0])
}
func (sch *schedule) shouldEvaluateSequentially(groupItems []readyToRunItem) bool {
// if jitter by rule is enabled, we can't evaluate rules sequentially
if sch.jitterEvaluations == JitterByRule {
return false
}
// if there is only one rule, there are no rules to chain
if len(groupItems) == 1 {
return false
}
// only evaluate rules in imported groups sequentially
for _, item := range groupItems {
if item.rule.ImportedFromPrometheus() {
return true
}
}
// default to false
return false
}
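A hedged, standalone sketch of steps 1 and 2 above (a hypothetical item type, not readyToRunItem): bucket items by a composite group key, then sort the keys field by field with cmp.Or so iteration over the groups is deterministic. Step 3, the backward wiring of afterEval, is the same pattern shown in the sketch under the commit message.

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// groupKey mirrors the shape used above: folder title, folder UID, group name.
type groupKey struct {
	folderTitle, folderUID, groupName string
}

// item is a hypothetical stand-in for readyToRunItem.
type item struct {
	key groupKey
	uid string
}

func main() {
	items := []item{
		{groupKey{"folderB", "u2", "rg1"}, "3"},
		{groupKey{"folderA", "u1", "rg2"}, "2"},
		{groupKey{"folderA", "u1", "rg1"}, "1"},
	}

	// Step 1: bucket items by group key, remembering keys in first-seen order.
	groups := map[groupKey][]item{}
	var keys []groupKey
	for _, it := range items {
		if _, ok := groups[it.key]; !ok {
			keys = append(keys, it.key)
		}
		groups[it.key] = append(groups[it.key], it)
	}

	// Step 2: sort keys so the resulting sequence order is deterministic.
	slices.SortFunc(keys, func(a, b groupKey) int {
		return cmp.Or(
			cmp.Compare(a.folderTitle, b.folderTitle),
			cmp.Compare(a.folderUID, b.folderUID),
			cmp.Compare(a.groupName, b.groupName),
		)
	})

	for _, k := range keys {
		fmt.Println(k, "->", len(groups[k]), "item(s)")
	}
}
```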

@@ -0,0 +1,153 @@
package schedule
import (
"testing"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
type fakeSequenceRule struct {
// these fields help with debugging tests
UID string
Group string
}
func (r *fakeSequenceRule) Eval(e *Evaluation) (bool, *Evaluation) {
if e.afterEval != nil {
e.afterEval()
}
return true, nil
}
func (r *fakeSequenceRule) Run() error {
return nil
}
func (r *fakeSequenceRule) Stop(reason error) {
}
func (r *fakeSequenceRule) Update(e *Evaluation) bool {
return true
}
func (r *fakeSequenceRule) Type() models.RuleType {
return models.RuleTypeAlerting
}
func (r *fakeSequenceRule) Identifier() models.AlertRuleKeyWithGroup {
return models.AlertRuleKeyWithGroup{
AlertRuleKey: models.AlertRuleKey{
UID: r.UID,
},
RuleGroup: r.Group,
}
}
func (r *fakeSequenceRule) Status() models.RuleStatus {
return models.RuleStatus{}
}
func TestSequence(t *testing.T) {
ruleStore := newFakeRulesStore()
reg := prometheus.NewPedanticRegistry()
sch := setupScheduler(t, ruleStore, nil, reg, nil, nil, nil)
gen := models.RuleGen.With(models.RuleGen.WithNamespaceUID("ns1"))
t.Run("should set callbacks in correct order", func(t *testing.T) {
nextByGroup := map[string][]string{}
prevByGroup := map[string][]string{}
callback := func(next readyToRunItem, prev ...readyToRunItem) func() {
return func() {
group := next.rule.RuleGroup
nextByGroup[group] = append(nextByGroup[group], next.rule.UID)
if len(prev) > 0 {
prevByGroup[group] = append(prevByGroup[group], prev[0].rule.UID)
}
// Ensure we evaluate the next rule
next.ruleRoutine.Eval(&next.Evaluation)
}
}
// rg1 : 1, 2
// rg2 : 3, 4 (prometheus), 5 (prometheus)
items := []readyToRunItem{
{
ruleRoutine: &fakeSequenceRule{UID: "3", Group: "rg2"},
Evaluation: Evaluation{
rule: gen.With(
models.RuleGen.WithUID("3"),
models.RuleGen.WithGroupIndex(1),
models.RuleGen.WithGroupName("rg2"),
).GenerateRef(),
folderTitle: "folder1",
},
},
{
ruleRoutine: &fakeSequenceRule{UID: "4", Group: "rg2"},
Evaluation: Evaluation{
rule: gen.With(
models.RuleGen.WithUID("4"),
models.RuleGen.WithGroupIndex(2),
models.RuleGen.WithGroupName("rg2"),
models.RuleGen.WithPrometheusOriginalRuleDefinition("test"),
).GenerateRef(),
folderTitle: "folder1",
},
},
{
ruleRoutine: &fakeSequenceRule{UID: "5", Group: "rg2"},
Evaluation: Evaluation{
rule: gen.With(
models.RuleGen.WithUID("5"),
models.RuleGen.WithGroupIndex(3),
models.RuleGen.WithGroupName("rg2"),
models.RuleGen.WithPrometheusOriginalRuleDefinition("test"),
).GenerateRef(),
folderTitle: "folder1",
},
},
{
ruleRoutine: &fakeSequenceRule{UID: "1", Group: "rg1"},
Evaluation: Evaluation{
rule: gen.With(
models.RuleGen.WithUID("1"),
models.RuleGen.WithGroupIndex(1),
models.RuleGen.WithGroupName("rg1"),
).GenerateRef(),
folderTitle: "folder1",
},
},
{
ruleRoutine: &fakeSequenceRule{UID: "2", Group: "rg1"},
Evaluation: Evaluation{
rule: gen.With(
models.RuleGen.WithUID("2"),
models.RuleGen.WithGroupIndex(2),
models.RuleGen.WithGroupName("rg1"),
).GenerateRef(),
folderTitle: "folder1",
},
},
}
sequences := sch.buildSequences(items, callback)
require.Equal(t, 3, len(sequences))
// Ensure sequences are sorted by UID
require.Equal(t, "1", sequences[0].rule.UID)
require.Equal(t, "2", sequences[1].rule.UID)
require.Equal(t, "3", sequences[2].rule.UID)
// Run the sequences
for _, sequence := range sequences {
sequence.ruleRoutine.Eval(&sequence.Evaluation)
}
// Verify the callbacks were called in the correct order. Since we don't sort these slices they should
// be in the same order as the items that were added to the sequences
require.Nil(t, nextByGroup["rg1"])
require.Nil(t, prevByGroup["rg1"])
require.Equal(t, []string{"4", "5"}, nextByGroup["rg2"])
require.Equal(t, []string{"3", "4"}, prevByGroup["rg2"])
})
}