diff --git a/pkg/services/ngalert/eval/eval.go b/pkg/services/ngalert/eval/eval.go index 66087834eb8..d012839dd09 100644 --- a/pkg/services/ngalert/eval/eval.go +++ b/pkg/services/ngalert/eval/eval.go @@ -54,26 +54,26 @@ type ExecutionResults struct { } // Results is a slice of evaluated alert instances states. -type Results []result +type Results []Result -// result contains the evaluated state of an alert instance +// Result contains the evaluated state of an alert instance // identified by its labels. -type result struct { +type Result struct { Instance data.Labels - State state // Enum + State State // Enum // StartAt is the time at which we first saw this state StartAt time.Time // FiredAt is the time at which we first transitioned to a firing state FiredAt time.Time } -// state is an enum of the evaluation state for an alert instance. -type state int +// State is an enum of the evaluation state for an alert instance. +type State int const ( // Normal is the eval state for an alert instance condition // that evaluated to false. - Normal state = iota + Normal State = iota // Alerting is the eval state for an alert instance condition // that evaluated to true (Alerting). @@ -88,7 +88,7 @@ const ( Error ) -func (s state) String() string { +func (s State) String() string { return [...]string{"Normal", "Alerting", "NoData", "Error"}[s] } @@ -177,9 +177,9 @@ func execute(ctx AlertExecCtx, c *models.Condition, now time.Time, dataService * } // evaluateExecutionResult takes the ExecutionResult, and returns a frame where -// each column is a string type that holds a string representing its state. +// each column is a string type that holds a string representing its state. 
func evaluateExecutionResult(results *ExecutionResults, ts time.Time) (Results, error) { - evalResults := make([]result, 0) + evalResults := make([]Result, 0) labels := make(map[string]bool) for _, f := range results.Results { rowLen, err := f.RowLen() @@ -210,7 +210,7 @@ func evaluateExecutionResult(results *ExecutionResults, ts time.Time) (Results, return nil, &invalidEvalResultFormatError{refID: f.RefID, reason: fmt.Sprintf("expected nullable float64 but got type %T", f.Fields[0].Type())} } - r := result{ + r := Result{ Instance: f.Fields[0].Labels, StartAt: ts, } diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 4665dc63fd9..44b9bf32206 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/grafana/grafana/pkg/services/ngalert/state" + "github.com/benbjohnson/clock" "github.com/grafana/grafana/pkg/api/routing" @@ -45,6 +47,7 @@ type AlertNG struct { DataProxy *datasourceproxy.DatasourceProxyService `inject:""` Log log.Logger schedule schedule.ScheduleService + stateTracker *state.StateTracker } func init() { @@ -54,7 +57,7 @@ func init() { // Init initializes the AlertingService. func (ng *AlertNG) Init() error { ng.Log = log.New("ngalert") - + ng.stateTracker = state.NewStateTracker() baseInterval := baseIntervalSeconds * time.Second store := store.DBstore{BaseInterval: baseInterval, DefaultIntervalSeconds: defaultIntervalSeconds, SQLStore: ng.SQLStore} @@ -87,7 +90,7 @@ func (ng *AlertNG) Init() error { // Run starts the scheduler func (ng *AlertNG) Run(ctx context.Context) error { ng.Log.Debug("ngalert starting") - return ng.schedule.Ticker(ctx) + return ng.schedule.Ticker(ctx, ng.stateTracker) } // IsDisabled returns true if the alerting service is disable for this instance. 
diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index 19718075a6d..980398f1b1a 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/grafana/grafana/pkg/services/ngalert/state" + "github.com/grafana/grafana/pkg/services/ngalert/store" "github.com/grafana/grafana/pkg/services/ngalert/models" @@ -23,7 +25,7 @@ var timeNow = time.Now // ScheduleService handles scheduling type ScheduleService interface { - Ticker(context.Context) error + Ticker(context.Context, *state.StateTracker) error Pause() error Unpause() error @@ -33,8 +35,7 @@ type ScheduleService interface { overrideCfg(cfg SchedulerCfg) } -func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key models.AlertDefinitionKey, - evalCh <-chan *evalContext, stopCh <-chan struct{}) error { +func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key models.AlertDefinitionKey, evalCh <-chan *evalContext, stopCh <-chan struct{}, stateTracker *state.StateTracker) error { sch.log.Debug("alert definition routine started", "key", key) evalRunning := false @@ -77,13 +78,14 @@ func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key models.Al return err } for _, r := range results { - sch.log.Debug("alert definition result", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String()) + sch.log.Info("alert definition result", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String()) cmd := models.SaveAlertInstanceCommand{DefinitionOrgID: key.OrgID, DefinitionUID: key.DefinitionUID, State: models.InstanceStateType(r.State.String()), Labels: models.InstanceLabels(r.Instance), LastEvalTime: ctx.now} err := sch.store.SaveAlertInstance(&cmd) if 
err != nil { sch.log.Error("failed saving alert instance", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "instance", r.Instance, "state", r.State.String(), "error", err) } } + _ = stateTracker.ProcessEvalResults(key.DefinitionUID, results, condition) return nil } @@ -217,7 +219,7 @@ func (sch *schedule) Unpause() error { return nil } -func (sch *schedule) Ticker(grafanaCtx context.Context) error { +func (sch *schedule) Ticker(grafanaCtx context.Context, stateTracker *state.StateTracker) error { dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx) for { select { @@ -250,7 +252,7 @@ func (sch *schedule) Ticker(grafanaCtx context.Context) error { if newRoutine && !invalidInterval { dispatcherGroup.Go(func() error { - return sch.definitionRoutine(ctx, key, definitionInfo.evalCh, definitionInfo.stopCh) + return sch.definitionRoutine(ctx, key, definitionInfo.evalCh, definitionInfo.stopCh, stateTracker) }) } diff --git a/pkg/services/ngalert/state/state_tracker.go b/pkg/services/ngalert/state/state_tracker.go new file mode 100644 index 00000000000..4751420858a --- /dev/null +++ b/pkg/services/ngalert/state/state_tracker.go @@ -0,0 +1,100 @@ +package state + +import ( + "fmt" + "sync" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/services/ngalert/eval" + "github.com/grafana/grafana/pkg/services/ngalert/models" +) + +type AlertState struct { + UID string + CacheId string + Labels data.Labels + State eval.State + Results []eval.State +} + +type cache struct { + cacheMap map[string]AlertState + mu sync.Mutex +} + +type StateTracker struct { + stateCache cache +} + +func NewStateTracker() *StateTracker { + return &StateTracker{ + stateCache: cache{ + cacheMap: make(map[string]AlertState), + mu: sync.Mutex{}, + }, + } +} + +func (c *cache) getOrCreate(uid string, result eval.Result) AlertState { + c.mu.Lock() + defer c.mu.Unlock() + + idString := fmt.Sprintf("%s %s", uid, result.Instance.String()) + 
if state, ok := c.cacheMap[idString]; ok { + return state + } + newState := AlertState{ + UID: uid, + CacheId: idString, + Labels: result.Instance, + State: result.State, + Results: []eval.State{result.State}, + } + c.cacheMap[idString] = newState + return newState +} + +func (c *cache) update(stateEntry AlertState) { + c.mu.Lock() + defer c.mu.Unlock() + c.cacheMap[stateEntry.CacheId] = stateEntry +} + +func (c *cache) getStateForEntry(stateId string) eval.State { + c.mu.Lock() + defer c.mu.Unlock() + return c.cacheMap[stateId].State +} + +func (st *StateTracker) ProcessEvalResults(uid string, results eval.Results, condition models.Condition) []AlertState { + var changedStates []AlertState + for _, result := range results { + currentState := st.stateCache.getOrCreate(uid, result) + currentState.Results = append(currentState.Results, result.State) + newState := st.getNextState(uid, result) + if newState != currentState.State { + currentState.State = newState + changedStates = append(changedStates, currentState) + } + st.stateCache.update(currentState) + } + return changedStates +} + +func (st *StateTracker) getNextState(uid string, result eval.Result) eval.State { + currentState := st.stateCache.getOrCreate(uid, result) + if currentState.State == result.State { + return currentState.State + } + + switch { + case currentState.State == result.State: + return currentState.State + case currentState.State == eval.Normal && result.State == eval.Alerting: + return eval.Alerting + case currentState.State == eval.Alerting && result.State == eval.Normal: + return eval.Normal + default: + return eval.Alerting + } +} diff --git a/pkg/services/ngalert/state/state_tracker_test.go b/pkg/services/ngalert/state/state_tracker_test.go new file mode 100644 index 00000000000..7cd03db1d91 --- /dev/null +++ b/pkg/services/ngalert/state/state_tracker_test.go @@ -0,0 +1,123 @@ +package state + +import ( + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/data" + 
"github.com/grafana/grafana/pkg/services/ngalert/eval" + "github.com/grafana/grafana/pkg/services/ngalert/models" + "github.com/stretchr/testify/assert" +) + +func TestProcessEvalResults(t *testing.T) { + testCases := []struct { + desc string + uid string + evalResults eval.Results + condition models.Condition + expectedCacheEntries int + expectedState eval.State + expectedResultCount int + }{ + { + desc: "given a single evaluation result", + uid: "test_uid", + evalResults: eval.Results{ + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + }, + }, + expectedCacheEntries: 1, + expectedState: eval.Normal, + expectedResultCount: 0, + }, + { + desc: "given a state change from normal to alerting", + uid: "test_uid", + evalResults: eval.Results{ + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + State: eval.Normal, + }, + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + State: eval.Alerting, + }, + }, + expectedCacheEntries: 1, + expectedState: eval.Alerting, + expectedResultCount: 1, + }, + { + desc: "given a state change from alerting to normal", + uid: "test_uid", + evalResults: eval.Results{ + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + State: eval.Alerting, + }, + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + State: eval.Normal, + }, + }, + expectedCacheEntries: 1, + expectedState: eval.Normal, + expectedResultCount: 1, + }, + { + desc: "given a constant alerting state", + uid: "test_uid", + evalResults: eval.Results{ + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + State: eval.Alerting, + }, + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + State: eval.Alerting, + }, + }, + expectedCacheEntries: 1, + expectedState: eval.Alerting, + expectedResultCount: 0, + }, + { + desc: "given a constant normal state", + uid: "test_uid", + evalResults: 
eval.Results{ + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + State: eval.Normal, + }, + eval.Result{ + Instance: data.Labels{"label1": "value1", "label2": "value2"}, + State: eval.Normal, + }, + }, + expectedCacheEntries: 1, + expectedState: eval.Normal, + expectedResultCount: 0, + }, + } + + for _, tc := range testCases { + t.Run("the correct number of entries are added to the cache", func(t *testing.T) { + st := NewStateTracker() + st.ProcessEvalResults(tc.uid, tc.evalResults, tc.condition) + assert.Equal(t, len(st.stateCache.cacheMap), tc.expectedCacheEntries) + }) + + t.Run("the correct state is set", func(t *testing.T) { + st := NewStateTracker() + st.ProcessEvalResults(tc.uid, tc.evalResults, tc.condition) + assert.Equal(t, st.stateCache.getStateForEntry("test_uid label1=value1, label2=value2"), tc.expectedState) + }) + + t.Run("the correct number of results are returned", func(t *testing.T) { + st := NewStateTracker() + results := st.ProcessEvalResults(tc.uid, tc.evalResults, tc.condition) + assert.Equal(t, len(results), tc.expectedResultCount) + }) + } +} diff --git a/pkg/services/ngalert/tests/schedule_test.go b/pkg/services/ngalert/tests/schedule_test.go index 9f165b389ae..2f0da1610d3 100644 --- a/pkg/services/ngalert/tests/schedule_test.go +++ b/pkg/services/ngalert/tests/schedule_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/grafana/grafana/pkg/services/ngalert/state" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/ngalert/schedule" @@ -57,8 +59,10 @@ func TestAlertingTicker(t *testing.T) { sched := schedule.NewScheduler(schefCfg, nil) ctx := context.Background() + + st := state.NewStateTracker() go func() { - err := sched.Ticker(ctx) + err := sched.Ticker(ctx, st) require.NoError(t, err) }() runtime.Gosched()