diff --git a/pkg/services/ngalert/eval/eval.go b/pkg/services/ngalert/eval/eval.go
index d012839dd09..4b2d2bccbdd 100644
--- a/pkg/services/ngalert/eval/eval.go
+++ b/pkg/services/ngalert/eval/eval.go
@@ -59,12 +59,9 @@ type Results []Result
 // Result contains the evaluated State of an alert instance
 // identified by its labels.
 type Result struct {
-	Instance data.Labels
-	State    State // Enum
-	// StartAt is the time at which we first saw this state
-	StartAt time.Time
-	// FiredAt is the time at which we first transitioned to a firing state
-	FiredAt time.Time
+	Instance    data.Labels
+	State       State // Enum
+	EvaluatedAt time.Time
 }
 
 // State is an enum of the evaluation State for an alert instance.
@@ -211,8 +208,8 @@ func evaluateExecutionResult(results *ExecutionResults, ts time.Time) (Results,
 		}
 
 		r := Result{
-			Instance: f.Fields[0].Labels,
-			StartAt:  ts,
+			Instance:    f.Fields[0].Labels,
+			EvaluatedAt: ts,
 		}
 
 		switch {
@@ -223,7 +220,6 @@ func evaluateExecutionResult(results *ExecutionResults, ts time.Time) (Results,
 		case *val == 0:
 			r.State = Normal
 		default:
-			r.FiredAt = ts
 			r.State = Alerting
 		}
diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go
index 44b9bf32206..7b4c8e95d13 100644
--- a/pkg/services/ngalert/ngalert.go
+++ b/pkg/services/ngalert/ngalert.go
@@ -57,7 +57,7 @@ func init() {
 // Init initializes the AlertingService.
 func (ng *AlertNG) Init() error {
 	ng.Log = log.New("ngalert")
-	ng.stateTracker = state.NewStateTracker()
+	ng.stateTracker = state.NewStateTracker(ng.Log)
 	baseInterval := baseIntervalSeconds * time.Second
 
 	store := store.DBstore{BaseInterval: baseInterval, DefaultIntervalSeconds: defaultIntervalSeconds, SQLStore: ng.SQLStore}
@@ -69,6 +69,7 @@ func (ng *AlertNG) Init() error {
 		MaxAttempts:     maxAttempts,
 		Evaluator:       eval.Evaluator{Cfg: ng.Cfg},
 		Store:           store,
+		Notifier:        ng.Alertmanager,
 	}
 	ng.schedule = schedule.NewScheduler(schedCfg, ng.DataService)
diff --git a/pkg/services/ngalert/notifier/alertmanager.go b/pkg/services/ngalert/notifier/alertmanager.go
index 099af3d9911..a9ab2cf4fd6 100644
--- a/pkg/services/ngalert/notifier/alertmanager.go
+++ b/pkg/services/ngalert/notifier/alertmanager.go
@@ -246,8 +246,8 @@ func (am *Alertmanager) buildReceiverIntegrations(receiver *api.PostableApiRecei
 	return integrations, nil
 }
 
-// CreateAlerts receives the alerts and then sends them through the corresponding route based on whenever the alert has a receiver embedded or not
-func (am *Alertmanager) CreateAlerts(alerts ...*PostableAlert) error {
+// PutAlerts receives the alerts and then sends them through the corresponding route based on whether the alert has a receiver embedded or not
+func (am *Alertmanager) PutAlerts(alerts ...*PostableAlert) error {
 	return am.alerts.PutPostableAlert(alerts...)
 }
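PutAlerts is the single entry point the scheduler uses to hand alerts to the embedded Alertmanager. A minimal sketch of a caller, assuming only the types visible in this patch (notifier.PostableAlert wrapping the Alertmanager API models); the helper names newPostableAlert and sendOne are hypothetical and not part of the change:

```go
package example

import (
	"github.com/go-openapi/strfmt"
	"github.com/prometheus/alertmanager/api/v2/models"

	"github.com/grafana/grafana/pkg/services/ngalert/notifier"
)

// newPostableAlert builds the minimal payload PutAlerts expects: a label set
// plus the StartsAt/EndsAt window that the state tracker maintains.
func newPostableAlert(labels map[string]string, startsAt, endsAt strfmt.DateTime) *notifier.PostableAlert {
	return &notifier.PostableAlert{
		PostableAlert: models.PostableAlert{
			StartsAt: startsAt,
			EndsAt:   endsAt,
			Alert: models.Alert{
				Labels: models.LabelSet(labels),
			},
		},
	}
}

// sendOne forwards a single alert; PutAlerts is variadic, so batches work the same way.
func sendOne(am *notifier.Alertmanager, alert *notifier.PostableAlert) error {
	return am.PutAlerts(alert)
}
```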
diff --git a/pkg/services/ngalert/schedule/compat.go b/pkg/services/ngalert/schedule/compat.go
new file mode 100644
index 00000000000..f90cab27457
--- /dev/null
+++ b/pkg/services/ngalert/schedule/compat.go
@@ -0,0 +1,24 @@
+package schedule
+
+import (
+	"github.com/grafana/grafana/pkg/services/ngalert/notifier"
+	"github.com/grafana/grafana/pkg/services/ngalert/state"
+	"github.com/prometheus/alertmanager/api/v2/models"
+)
+
+func FromAlertStateToPostableAlerts(firingStates []state.AlertState) []*notifier.PostableAlert {
+	alerts := make([]*notifier.PostableAlert, 0, len(firingStates))
+	for _, state := range firingStates {
+		alerts = append(alerts, &notifier.PostableAlert{
+			PostableAlert: models.PostableAlert{
+				Annotations: models.LabelSet{}, // TODO: add annotations to evaluation results, add them to the state struct, and then set them before sending to the notifier
+				StartsAt:    state.StartsAt,
+				EndsAt:      state.EndsAt,
+				Alert: models.Alert{
+					Labels: models.LabelSet(state.Labels),
+				},
+			},
+		})
+	}
+	return alerts
+}
diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go
index 980398f1b1a..b4d9a7c8150 100644
--- a/pkg/services/ngalert/schedule/schedule.go
+++ b/pkg/services/ngalert/schedule/schedule.go
@@ -6,18 +6,19 @@ import (
 	"sync"
 	"time"
 
-	"github.com/grafana/grafana/pkg/services/ngalert/state"
+	"golang.org/x/sync/errgroup"
 
-	"github.com/grafana/grafana/pkg/services/ngalert/store"
+	"github.com/benbjohnson/clock"
 
 	"github.com/grafana/grafana/pkg/services/ngalert/models"
+	"github.com/grafana/grafana/pkg/services/ngalert/notifier"
+	"github.com/grafana/grafana/pkg/services/ngalert/state"
+	"github.com/grafana/grafana/pkg/services/ngalert/store"
 
-	"github.com/benbjohnson/clock"
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/services/alerting"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/tsdb"
-	"golang.org/x/sync/errgroup"
 )
 
 // timeNow makes it possible to test usage of time
@@ -85,7 +86,12 @@ func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key models.Al
 					sch.log.Error("failed saving alert instance", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "instance", r.Instance, "state", r.State.String(), "error", err)
 				}
 			}
-			_ = stateTracker.ProcessEvalResults(key.DefinitionUID, results, condition)
+			transitionedStates := stateTracker.ProcessEvalResults(key.DefinitionUID, results, condition)
+			alerts := FromAlertStateToPostableAlerts(transitionedStates)
+			err = sch.SendAlerts(alerts)
+			if err != nil {
+				sch.log.Error("failed to put alerts in the notifier", "count", len(alerts), "err", err)
+			}
 			return nil
 		}
@@ -114,6 +120,11 @@ func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key models.Al
 	}
 }
 
+// Notifier handles the delivery of alert notifications to the end user
+type Notifier interface {
+	PutAlerts(alerts ...*notifier.PostableAlert) error
+}
+
 type schedule struct {
 	// base tick rate (fastest possible configured check)
 	baseInterval time.Duration
@@ -144,6 +155,8 @@ type schedule struct {
 	store store.Store
 
 	dataService *tsdb.Service
+
+	notifier Notifier
 }
 
 // SchedulerCfg is the scheduler configuration.
@@ -156,6 +169,7 @@ type SchedulerCfg struct {
 	StopAppliedFunc func(models.AlertDefinitionKey)
 	Evaluator       eval.Evaluator
 	Store           store.Store
+	Notifier        Notifier
 }
 
 // NewScheduler returns a new schedule.
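The Notifier interface above is the only seam between the scheduler and the Alertmanager, which makes the new alert-delivery path easy to exercise without a running Alertmanager. A minimal test double, assuming only the PutAlerts signature defined in this patch; FakeNotifier is illustrative and not part of the change:

```go
package schedule

import (
	"sync"

	"github.com/grafana/grafana/pkg/services/ngalert/notifier"
)

// FakeNotifier records every alert handed to it so a test can assert on what
// the scheduler would have sent after a state transition.
type FakeNotifier struct {
	mu     sync.Mutex
	Alerts []*notifier.PostableAlert
}

func (f *FakeNotifier) PutAlerts(alerts ...*notifier.PostableAlert) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.Alerts = append(f.Alerts, alerts...)
	return nil
}

// In a test it would be wired up the same way ngalert.go wires the real Alertmanager:
//   schedCfg := SchedulerCfg{ /* ... */ Notifier: &FakeNotifier{} }
```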
@@ -173,6 +187,7 @@ func NewScheduler(cfg SchedulerCfg, dataService *tsdb.Service) *schedule {
 		evaluator:       cfg.Evaluator,
 		store:           cfg.Store,
 		dataService:     dataService,
+		notifier:        cfg.Notifier,
 	}
 	return &sch
 }
@@ -302,6 +317,10 @@ func (sch *schedule) Ticker(grafanaCtx context.Context, stateTracker *state.Stat
 	}
 }
 
+func (sch *schedule) SendAlerts(alerts []*notifier.PostableAlert) error {
+	return sch.notifier.PutAlerts(alerts...)
+}
+
 type alertDefinitionRegistry struct {
 	mu                  sync.Mutex
 	alertDefinitionInfo map[models.AlertDefinitionKey]alertDefinitionInfo
diff --git a/pkg/services/ngalert/state/state_tracker.go b/pkg/services/ngalert/state/state_tracker.go
index 4751420858a..9bd4f14ada8 100644
--- a/pkg/services/ngalert/state/state_tracker.go
+++ b/pkg/services/ngalert/state/state_tracker.go
@@ -3,18 +3,25 @@ package state
 import (
 	"fmt"
 	"sync"
+	"time"
 
+	"github.com/grafana/grafana/pkg/infra/log"
+
+	"github.com/go-openapi/strfmt"
 	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
-	"github.com/grafana/grafana/pkg/services/ngalert/models"
+	ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
 )
 
 type AlertState struct {
-	UID     string
-	CacheId string
-	Labels  data.Labels
-	State   eval.State
-	Results []eval.State
+	UID         string
+	CacheId     string
+	Labels      data.Labels
+	State       eval.State
+	Results     []eval.State
+	StartsAt    strfmt.DateTime
+	EndsAt      strfmt.DateTime
+	EvaluatedAt strfmt.DateTime
 }
 
 type cache struct {
@@ -24,77 +31,135 @@ type cache struct {
 
 type StateTracker struct {
 	stateCache cache
+	quit       chan struct{}
+	Log        log.Logger
 }
 
-func NewStateTracker() *StateTracker {
-	return &StateTracker{
+func NewStateTracker(logger log.Logger) *StateTracker {
+	tracker := &StateTracker{
 		stateCache: cache{
 			cacheMap: make(map[string]AlertState),
 			mu:       sync.Mutex{},
 		},
+		quit: make(chan struct{}),
+		Log:  logger,
 	}
+	go tracker.cleanUp()
+	return tracker
 }
 
-func (c *cache) getOrCreate(uid string, result eval.Result) AlertState {
-	c.mu.Lock()
-	defer c.mu.Unlock()
+func (st *StateTracker) getOrCreate(uid string, result eval.Result) AlertState {
+	st.stateCache.mu.Lock()
+	defer st.stateCache.mu.Unlock()
 	idString := fmt.Sprintf("%s %s", uid, result.Instance.String())
-	if state, ok := c.cacheMap[idString]; ok {
+	if state, ok := st.stateCache.cacheMap[idString]; ok {
 		return state
 	}
+	st.Log.Debug("adding new alert state cache entry", "cacheId", idString, "state", result.State.String(), "evaluatedAt", result.EvaluatedAt.String())
 	newState := AlertState{
-		UID:     uid,
-		CacheId: idString,
-		Labels:  result.Instance,
-		State:   result.State,
-		Results: []eval.State{result.State},
+		UID:         uid,
+		CacheId:     idString,
+		Labels:      result.Instance,
+		State:       result.State,
+		Results:     []eval.State{},
+		EvaluatedAt: strfmt.DateTime(result.EvaluatedAt),
 	}
-	c.cacheMap[idString] = newState
+	st.stateCache.cacheMap[idString] = newState
 	return newState
 }
 
-func (c *cache) update(stateEntry AlertState) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	c.cacheMap[stateEntry.CacheId] = stateEntry
+func (st *StateTracker) set(stateEntry AlertState) {
+	st.stateCache.mu.Lock()
+	defer st.stateCache.mu.Unlock()
+	st.stateCache.cacheMap[stateEntry.CacheId] = stateEntry
 }
 
-func (c *cache) getStateForEntry(stateId string) eval.State {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	return c.cacheMap[stateId].State
+func (st *StateTracker) get(stateId string) AlertState {
+	st.stateCache.mu.Lock()
+	defer st.stateCache.mu.Unlock()
+	return st.stateCache.cacheMap[stateId]
 }
 
-func (st *StateTracker) ProcessEvalResults(uid string, results eval.Results, condition models.Condition) []AlertState {
+func (st *StateTracker) ProcessEvalResults(uid string, results eval.Results, condition ngModels.Condition) []AlertState {
+	st.Log.Info("state tracker processing evaluation results", "uid", uid, "resultCount", len(results))
 	var changedStates []AlertState
 	for _, result := range results {
-		currentState := st.stateCache.getOrCreate(uid, result)
-		currentState.Results = append(currentState.Results, result.State)
-		newState := st.getNextState(uid, result)
-		if newState != currentState.State {
-			currentState.State = newState
-			changedStates = append(changedStates, currentState)
+		if s, ok := st.setNextState(uid, result); ok {
+			changedStates = append(changedStates, s)
 		}
-		st.stateCache.update(currentState)
 	}
+	st.Log.Debug("returning changed states to scheduler", "count", len(changedStates))
 	return changedStates
 }
 
-func (st *StateTracker) getNextState(uid string, result eval.Result) eval.State {
-	currentState := st.stateCache.getOrCreate(uid, result)
-	if currentState.State == result.State {
-		return currentState.State
-	}
-
+// TODO: When calculating whether an alert should no longer be firing, we should take three things into account:
+// 1. The re-send delay, if any; we don't want to send every firing alert on every evaluation, and a fixed delay across all alerts avoids saturating the notification system.
+// 2. The evaluation interval defined for this particular alert - we don't support that yet, but we will eventually allow you to define how often you want an alert to be evaluated.
+// 3. The base interval defined by the scheduler - while #2 is not yet an option, we can use the base interval at which every alert runs.
+// setNextState sets the current state based on the evaluation result and returns
+// the state and a bool indicating whether a state transition occurred.
+func (st *StateTracker) setNextState(uid string, result eval.Result) (AlertState, bool) {
+	currentState := st.getOrCreate(uid, result)
+	st.Log.Debug("setting alert state", "uid", uid)
 	switch {
 	case currentState.State == result.State:
-		return currentState.State
+		st.Log.Debug("no state transition", "cacheId", currentState.CacheId, "state", currentState.State.String())
+		currentState.EvaluatedAt = strfmt.DateTime(result.EvaluatedAt)
+		currentState.Results = append(currentState.Results, result.State)
+		if currentState.State == eval.Alerting {
+			currentState.EndsAt = strfmt.DateTime(result.EvaluatedAt.Add(40 * time.Second))
+		}
+		st.set(currentState)
+		return currentState, false
 	case currentState.State == eval.Normal && result.State == eval.Alerting:
-		return eval.Alerting
+		st.Log.Debug("state transition from normal to alerting", "cacheId", currentState.CacheId)
+		currentState.State = eval.Alerting
+		currentState.EvaluatedAt = strfmt.DateTime(result.EvaluatedAt)
+		currentState.StartsAt = strfmt.DateTime(result.EvaluatedAt)
+		currentState.EndsAt = strfmt.DateTime(result.EvaluatedAt.Add(40 * time.Second))
+		currentState.Results = append(currentState.Results, result.State)
+		st.set(currentState)
+		return currentState, true
 	case currentState.State == eval.Alerting && result.State == eval.Normal:
-		return eval.Normal
+		st.Log.Debug("state transition from alerting to normal", "cacheId", currentState.CacheId)
+		currentState.State = eval.Normal
+		currentState.EvaluatedAt = strfmt.DateTime(result.EvaluatedAt)
+		currentState.EndsAt = strfmt.DateTime(result.EvaluatedAt)
+		currentState.Results = append(currentState.Results, result.State)
+		st.set(currentState)
+		return currentState, true
 	default:
-		return eval.Alerting
+		return currentState, false
+	}
+}
+
+func (st *StateTracker) cleanUp() {
+	ticker := time.NewTicker(time.Duration(60) * time.Minute)
+	st.Log.Debug("starting cleanup process", "intervalMinutes", 60)
+	for {
+		select {
+		case <-ticker.C:
+			st.trim()
+		case <-st.quit:
+			st.Log.Debug("stopping cleanup process", "now", time.Now())
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func (st *StateTracker) trim() {
+	st.Log.Info("trimming alert state cache")
+	st.stateCache.mu.Lock()
+	defer st.stateCache.mu.Unlock()
+	for _, v := range st.stateCache.cacheMap {
+		if len(v.Results) > 100 {
+			st.Log.Debug("trimming result set", "cacheId", v.CacheId, "count", len(v.Results)-100)
+			newResults := make([]eval.State, 100)
+			copy(newResults, v.Results[len(v.Results)-100:])
+			v.Results = newResults
+			// write back directly: the mutex is already held here, so calling set() would deadlock
+			st.stateCache.cacheMap[v.CacheId] = v
+		}
 	}
 }
diff --git a/pkg/services/ngalert/state/state_tracker_test.go b/pkg/services/ngalert/state/state_tracker_test.go
index 7cd03db1d91..14bd749d3ae 100644
--- a/pkg/services/ngalert/state/state_tracker_test.go
+++ b/pkg/services/ngalert/state/state_tracker_test.go
@@ -2,6 +2,11 @@ package state
 
 import (
 	"testing"
+	"time"
+
+	"github.com/grafana/grafana/pkg/infra/log"
+
+	"github.com/go-openapi/strfmt"
 
 	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
@@ -10,14 +15,19 @@ import (
 )
 
 func TestProcessEvalResults(t *testing.T) {
+	evaluationTime, err := time.Parse("2006-01-02", "2021-03-25")
+	if err != nil {
+		t.Fatalf("error parsing date format: %s", err.Error())
+	}
 	testCases := []struct {
-		desc                 string
-		uid                  string
-		evalResults          eval.Results
-		condition            models.Condition
-		expectedCacheEntries int
-		expectedState        eval.State
-		expectedResultCount  int
+		desc                       string
+		uid                        string
+		evalResults                eval.Results
+		condition                  models.Condition
+		expectedState              eval.State
+		expectedReturnedStateCount int
+		expectedResultCount        int
+		expectedCacheEntries       []AlertState
 	}{
 		{
 			desc: "given a single evaluation result",
 			uid:  "test_uid",
 			evalResults: eval.Results{
 				eval.Result{
 					Instance: data.Labels{"label1": "value1", "label2": "value2"},
 				},
 			},
-			expectedCacheEntries: 1,
-			expectedState:        eval.Normal,
-			expectedResultCount:  0,
+			expectedState:              eval.Normal,
+			expectedReturnedStateCount: 0,
+			expectedResultCount:        1,
+			expectedCacheEntries: []AlertState{
+				{
+					UID:         "test_uid",
+					CacheId:     "test_uid label1=value1, label2=value2",
+					Labels:      data.Labels{"label1": "value1", "label2": "value2"},
+					State:       eval.Normal,
+					Results:     []eval.State{eval.Normal},
+					StartsAt:    strfmt.DateTime{},
+					EndsAt:      strfmt.DateTime{},
+					EvaluatedAt: strfmt.DateTime(evaluationTime),
+				},
+			},
 		},
 		{
-			desc: "given a state change from normal to alerting",
+			desc: "given a state change from normal to alerting for a single entity",
 			uid:  "test_uid",
 			evalResults: eval.Results{
 				eval.Result{
@@ -44,12 +66,24 @@ func TestProcessEvalResults(t *testing.T) {
 					State: eval.Alerting,
 				},
 			},
-			expectedCacheEntries: 1,
-			expectedState:        eval.Alerting,
-			expectedResultCount:  1,
+			expectedState:              eval.Alerting,
+			expectedReturnedStateCount: 1,
+			expectedResultCount:        2,
+			expectedCacheEntries: []AlertState{
+				{
+					UID:         "test_uid",
+					CacheId:     "test_uid label1=value1, label2=value2",
+					Labels:      data.Labels{"label1": "value1", "label2": "value2"},
+					State:       eval.Alerting,
+					Results:     []eval.State{eval.Normal, eval.Alerting},
+					StartsAt:    strfmt.DateTime{},
+					EndsAt:      strfmt.DateTime{},
+					EvaluatedAt: strfmt.DateTime(evaluationTime),
+				},
+			},
 		},
 		{
-			desc: "given a state change from alerting to normal",
+			desc: "given a state change from alerting to normal for a single entity",
 			uid:  "test_uid",
 			evalResults: eval.Results{
 				eval.Result{
@@ -61,12 +95,24 @@ func TestProcessEvalResults(t *testing.T) {
 					State: eval.Normal,
 				},
 			},
-			expectedCacheEntries: 1,
-			expectedState:        eval.Normal,
-			expectedResultCount:  1,
+			expectedState:              eval.Normal,
+			expectedReturnedStateCount: 1,
+			expectedResultCount:        2,
+			expectedCacheEntries: []AlertState{
+				{
+					UID:         "test_uid",
+					CacheId:     "test_uid label1=value1, label2=value2",
+					Labels:      data.Labels{"label1": "value1", "label2": "value2"},
+					State:       eval.Normal,
+					Results:     []eval.State{eval.Alerting, eval.Normal},
+					StartsAt:    strfmt.DateTime{},
+					EndsAt:      strfmt.DateTime{},
+					EvaluatedAt: strfmt.DateTime(evaluationTime),
+				},
+			},
 		},
 		{
-			desc: "given a constant alerting state",
+			desc: "given a constant alerting state for a single entity",
 			uid:  "test_uid",
 			evalResults: eval.Results{
 				eval.Result{
@@ -78,12 +124,24 @@ func TestProcessEvalResults(t *testing.T) {
 					State: eval.Alerting,
 				},
 			},
-			expectedCacheEntries: 1,
-			expectedState:        eval.Alerting,
-			expectedResultCount:  0,
+			expectedState:              eval.Alerting,
+			expectedReturnedStateCount: 0,
+			expectedResultCount:        2,
+			expectedCacheEntries: []AlertState{
+				{
+					UID:         "test_uid",
+					CacheId:     "test_uid label1=value1, label2=value2",
+					Labels:      data.Labels{"label1": "value1", "label2": "value2"},
+					State:       eval.Alerting,
+					Results:     []eval.State{eval.Alerting, eval.Alerting},
+					StartsAt:    strfmt.DateTime{},
+					EndsAt:      strfmt.DateTime{},
+					EvaluatedAt: strfmt.DateTime(evaluationTime),
+				},
+			},
 		},
 		{
-			desc: "given a constant normal state",
+			desc: "given a constant normal state for a single entity",
 			uid:  "test_uid",
 			evalResults: eval.Results{
 				eval.Result{
@@ -95,29 +153,56 @@ func TestProcessEvalResults(t *testing.T) {
 					State: eval.Normal,
 				},
 			},
-			expectedCacheEntries: 1,
-			expectedState:        eval.Normal,
-			expectedResultCount:  0,
+			expectedState:              eval.Normal,
+			expectedReturnedStateCount: 0,
+			expectedResultCount:        2,
+			expectedCacheEntries: []AlertState{
+				{
+					UID:         "test_uid",
+					CacheId:     "test_uid label1=value1, label2=value2",
+					Labels:      data.Labels{"label1": "value1", "label2": "value2"},
+					State:       eval.Normal,
+					Results:     []eval.State{eval.Normal, eval.Normal},
+					StartsAt:    strfmt.DateTime{},
+					EndsAt:      strfmt.DateTime{},
+					EvaluatedAt: strfmt.DateTime(evaluationTime),
+				},
+			},
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run("the correct number of entries are added to the cache", func(t *testing.T) {
-			st := NewStateTracker()
+			st := NewStateTracker(log.New("test_state_tracker"))
 			st.ProcessEvalResults(tc.uid, tc.evalResults, tc.condition)
-			assert.Equal(t, len(st.stateCache.cacheMap), tc.expectedCacheEntries)
+			assert.Equal(t, len(tc.expectedCacheEntries), len(st.stateCache.cacheMap))
 		})
 
-		t.Run("the correct state is set", func(t *testing.T) {
-			st := NewStateTracker()
+		t.Run("the correct state is set for each evaluation result", func(t *testing.T) {
+			st := NewStateTracker(log.New("test_state_tracker"))
 			st.ProcessEvalResults(tc.uid, tc.evalResults, tc.condition)
-			assert.Equal(t, st.stateCache.getStateForEntry("test_uid label1=value1, label2=value2"), tc.expectedState)
+			for _, entry := range tc.expectedCacheEntries {
+				testState := st.get(entry.CacheId)
+				assert.Equal(t, tc.expectedState, testState.State)
+			}
 		})
 
-		t.Run("the correct number of results are returned", func(t *testing.T) {
-			st := NewStateTracker()
+		t.Run("the correct number of states are returned to the caller", func(t *testing.T) {
+			st := NewStateTracker(log.New("test_state_tracker"))
 			results := st.ProcessEvalResults(tc.uid, tc.evalResults, tc.condition)
-			assert.Equal(t, len(results), tc.expectedResultCount)
+			assert.Equal(t, tc.expectedReturnedStateCount, len(results))
+		})
+
+		t.Run("the correct results are set for each cache entry", func(t *testing.T) {
+			st := NewStateTracker(log.New("test_state_tracker"))
+			_ = st.ProcessEvalResults(tc.uid, tc.evalResults, tc.condition)
+			for _, entry := range tc.expectedCacheEntries {
+				testState := st.get(entry.CacheId)
+				assert.Equal(t, len(entry.Results), len(testState.Results))
+				for i, res := range entry.Results {
+					assert.Equal(t, res, testState.Results[i])
+				}
+			}
 		})
 	}
 }
diff --git a/pkg/services/ngalert/tests/schedule_test.go b/pkg/services/ngalert/tests/schedule_test.go
index 2f0da1610d3..4e70c9b3bcd 100644
--- a/pkg/services/ngalert/tests/schedule_test.go
+++ b/pkg/services/ngalert/tests/schedule_test.go
@@ -60,7 +60,7 @@ func TestAlertingTicker(t *testing.T) {
 	ctx := context.Background()
 
-	st := state.NewStateTracker()
+	st := state.NewStateTracker(schefCfg.Logger)
 	go func() {
 		err := sched.Ticker(ctx, st)
 		require.NoError(t, err)
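End to end, the patch makes one evaluation cycle flow as: ProcessEvalResults records transitions and stamps StartsAt/EndsAt, FromAlertStateToPostableAlerts converts the transitioned states, and the scheduler's SendAlerts pushes them through PutAlerts. A rough sketch of that flow, mirroring the normal-to-alerting test case above; exampleFlow is illustrative and not part of the change:

```go
package example

import (
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/data"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/schedule"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
)

func exampleFlow(n schedule.Notifier) error {
	st := state.NewStateTracker(log.New("example"))
	now := time.Now()

	// Two results for the same instance: the first is Normal (the zero value),
	// the second fires, so exactly one transitioned state is returned.
	results := eval.Results{
		{Instance: data.Labels{"label1": "value1"}, EvaluatedAt: now},
		{Instance: data.Labels{"label1": "value1"}, State: eval.Alerting, EvaluatedAt: now},
	}

	transitioned := st.ProcessEvalResults("test_uid", results, ngModels.Condition{})

	// The transitioned state carries StartsAt = EvaluatedAt and EndsAt = EvaluatedAt + 40s,
	// which FromAlertStateToPostableAlerts copies onto the outgoing alert.
	alerts := schedule.FromAlertStateToPostableAlerts(transitioned)
	return n.PutAlerts(alerts...)
}
```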