From 82638d059fd8de976f19b5e046f680a609ea6918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Philippe=20Qu=C3=A9m=C3=A9ner?= Date: Wed, 17 Jan 2024 13:33:13 +0100 Subject: [PATCH] feat(alerting): add state persister interface (#80384) --- pkg/services/ngalert/api/api_testing.go | 19 ++- pkg/services/ngalert/backtesting/engine.go | 19 ++- pkg/services/ngalert/ngalert.go | 5 +- .../ngalert/schedule/schedule_unit_test.go | 24 +-- pkg/services/ngalert/state/manager.go | 91 ++---------- .../ngalert/state/manager_bench_test.go | 9 +- .../ngalert/state/manager_private_test.go | 100 ++----------- pkg/services/ngalert/state/manager_test.go | 138 +++++++++--------- pkg/services/ngalert/state/persister_noop.go | 17 +++ pkg/services/ngalert/state/persister_sync.go | 111 ++++++++++++++ .../ngalert/state/persister_sync_test.go | 103 +++++++++++++ 11 files changed, 353 insertions(+), 283 deletions(-) create mode 100644 pkg/services/ngalert/state/persister_noop.go create mode 100644 pkg/services/ngalert/state/persister_sync.go create mode 100644 pkg/services/ngalert/state/persister_sync_test.go diff --git a/pkg/services/ngalert/api/api_testing.go b/pkg/services/ngalert/api/api_testing.go index 599ddb4d687..c1c62a6f5e2 100644 --- a/pkg/services/ngalert/api/api_testing.go +++ b/pkg/services/ngalert/api/api_testing.go @@ -87,17 +87,16 @@ func (srv TestingApiSrv) RouteTestGrafanaRuleConfig(c *contextmodel.ReqContext, } cfg := state.ManagerCfg{ - Metrics: nil, - ExternalURL: srv.appUrl, - InstanceStore: nil, - Images: &backtesting.NoopImageService{}, - Clock: clock.New(), - Historian: nil, - MaxStateSaveConcurrency: 1, - Tracer: srv.tracer, - Log: log.New("ngalert.state.manager"), + Metrics: nil, + ExternalURL: srv.appUrl, + InstanceStore: nil, + Images: &backtesting.NoopImageService{}, + Clock: clock.New(), + Historian: nil, + Tracer: srv.tracer, + Log: log.New("ngalert.state.manager"), } - manager := state.NewManager(cfg) + manager := state.NewManager(cfg, state.NewNoopPersister()) 
includeFolder := !srv.cfg.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel) transitions := manager.ProcessEvalResults( c.Req.Context(), diff --git a/pkg/services/ngalert/backtesting/engine.go b/pkg/services/ngalert/backtesting/engine.go index 0d2f481cbf9..30d4c8135b9 100644 --- a/pkg/services/ngalert/backtesting/engine.go +++ b/pkg/services/ngalert/backtesting/engine.go @@ -49,17 +49,16 @@ func NewEngine(appUrl *url.URL, evalFactory eval.EvaluatorFactory, tracer tracin evalFactory: evalFactory, createStateManager: func() stateManager { cfg := state.ManagerCfg{ - Metrics: nil, - ExternalURL: appUrl, - InstanceStore: nil, - Images: &NoopImageService{}, - Clock: clock.New(), - Historian: nil, - MaxStateSaveConcurrency: 1, - Tracer: tracer, - Log: log.New("ngalert.state.manager"), + Metrics: nil, + ExternalURL: appUrl, + InstanceStore: nil, + Images: &NoopImageService{}, + Clock: clock.New(), + Historian: nil, + Tracer: tracer, + Log: log.New("ngalert.state.manager"), } - return state.NewManager(cfg) + return state.NewManager(cfg, state.NewNoopPersister()) }, } } diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index f11d1fade82..a54db4db1d0 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -290,12 +290,13 @@ func (ng *AlertNG) init() error { Clock: clk, Historian: history, DoNotSaveNormalState: ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingNoNormalState), - MaxStateSaveConcurrency: ng.Cfg.UnifiedAlerting.MaxStateSaveConcurrency, ApplyNoDataAndErrorToAllStates: ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingNoDataErrorExecution), + MaxStateSaveConcurrency: ng.Cfg.UnifiedAlerting.MaxStateSaveConcurrency, Tracer: ng.tracer, Log: log.New("ngalert.state.manager"), } - stateManager := state.NewManager(cfg) + statePersister := state.NewSyncStatePersisiter(log.New("ngalert.state.manager.persist"), cfg) + stateManager := state.NewManager(cfg, statePersister) scheduler := 
schedule.NewScheduler(schedCfg, stateManager) // if it is required to include folder title to the alerts, we need to subscribe to changes of alert title diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index 6d1d03f6d02..3efbc28976f 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -82,17 +82,16 @@ func TestProcessTicks(t *testing.T) { Log: log.New("ngalert.scheduler"), } managerCfg := state.ManagerCfg{ - Metrics: testMetrics.GetStateMetrics(), - ExternalURL: nil, - InstanceStore: nil, - Images: &state.NoopImageService{}, - Clock: mockedClock, - Historian: &state.FakeHistorian{}, - MaxStateSaveConcurrency: 1, - Tracer: testTracer, - Log: log.New("ngalert.state.manager"), + Metrics: testMetrics.GetStateMetrics(), + ExternalURL: nil, + InstanceStore: nil, + Images: &state.NoopImageService{}, + Clock: mockedClock, + Historian: &state.FakeHistorian{}, + Tracer: testTracer, + Log: log.New("ngalert.state.manager"), } - st := state.NewManager(managerCfg) + st := state.NewManager(managerCfg, state.NewNoopPersister()) sched := NewScheduler(schedCfg, st) @@ -906,11 +905,12 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor Images: &state.NoopImageService{}, Clock: mockedClock, Historian: &state.FakeHistorian{}, - MaxStateSaveConcurrency: 1, Tracer: testTracer, Log: log.New("ngalert.state.manager"), + MaxStateSaveConcurrency: 1, } - st := state.NewManager(managerCfg) + syncStatePersister := state.NewSyncStatePersisiter(log.New("ngalert.state.manager.persist"), managerCfg) + st := state.NewManager(managerCfg, syncStatePersister) return NewScheduler(schedCfg, st) } diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index 166407e5ac0..bef12d6120e 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -7,7 +7,6 @@ import (
"time" "github.com/benbjohnson/clock" - "github.com/grafana/dskit/concurrency" "github.com/grafana/grafana-plugin-sdk-go/data" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -30,6 +29,11 @@ type AlertInstanceManager interface { GetStatesForRuleUID(orgID int64, alertRuleUID string) []*State } +type StatePersister interface { + Async(ctx context.Context, ticker *clock.Ticker, cache *cache) + Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) +} + type Manager struct { log log.Logger metrics *metrics.State @@ -45,8 +49,9 @@ type Manager struct { externalURL *url.URL doNotSaveNormalState bool - maxStateSaveConcurrency int applyNoDataAndErrorToAllStates bool + + persister StatePersister } type ManagerCfg struct { @@ -60,7 +65,6 @@ type ManagerCfg struct { DoNotSaveNormalState bool // MaxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel. MaxStateSaveConcurrency int - // ApplyNoDataAndErrorToAllStates makes state manager to apply exceptional results (NoData and Error) // to all states when corresponding execution in the rule definition is set to either `Alerting` or `OK` ApplyNoDataAndErrorToAllStates bool @@ -69,7 +73,7 @@ type ManagerCfg struct { Log log.Logger } -func NewManager(cfg ManagerCfg) *Manager { +func NewManager(cfg ManagerCfg, statePersister StatePersister) *Manager { // Metrics for the cache use a collector, so they need access to the register directly. 
c := newCache() if cfg.Metrics != nil { @@ -87,8 +91,8 @@ func NewManager(cfg ManagerCfg) *Manager { clock: cfg.Clock, externalURL: cfg.ExternalURL, doNotSaveNormalState: cfg.DoNotSaveNormalState, - maxStateSaveConcurrency: cfg.MaxStateSaveConcurrency, applyNoDataAndErrorToAllStates: cfg.ApplyNoDataAndErrorToAllStates, + persister: statePersister, tracer: cfg.Tracer, } @@ -279,16 +283,7 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time )) staleStates := st.deleteStaleStatesFromCache(ctx, logger, evaluatedAt, alertRule) - st.deleteAlertStates(tracingCtx, logger, staleStates) - - if len(staleStates) > 0 { - span.AddEvent("deleted stale states", trace.WithAttributes( - attribute.Int64("state_transitions", int64(len(staleStates))), - )) - } - - st.saveAlertStates(tracingCtx, logger, states...) - span.AddEvent("updated database") + st.persister.Sync(tracingCtx, span, states, staleStates) allChanges := append(states, staleStates...) if st.historian != nil { @@ -442,72 +437,6 @@ func (st *Manager) Put(states []*State) { } } -// TODO: Is the `State` type necessary? Should it embed the instance? -func (st *Manager) saveAlertStates(ctx context.Context, logger log.Logger, states ...StateTransition) { - if st.instanceStore == nil || len(states) == 0 { - return - } - - saveState := func(ctx context.Context, idx int) error { - s := states[idx] - // Do not save normal state to database and remove transition to Normal state but keep mapped states - if st.doNotSaveNormalState && IsNormalStateWithNoReason(s.State) && !s.Changed() { - return nil - } - - key, err := s.GetAlertInstanceKey() - if err != nil { - logger.Error("Failed to create a key for alert state to save it to database. 
The state will be ignored ", "cacheID", s.CacheID, "error", err, "labels", s.Labels.String()) - return nil - } - instance := ngModels.AlertInstance{ - AlertInstanceKey: key, - Labels: ngModels.InstanceLabels(s.Labels), - CurrentState: ngModels.InstanceStateType(s.State.State.String()), - CurrentReason: s.StateReason, - LastEvalTime: s.LastEvaluationTime, - CurrentStateSince: s.StartsAt, - CurrentStateEnd: s.EndsAt, - ResultFingerprint: s.ResultFingerprint.String(), - } - - err = st.instanceStore.SaveAlertInstance(ctx, instance) - if err != nil { - logger.Error("Failed to save alert state", "labels", s.Labels.String(), "state", s.State, "error", err) - return nil - } - return nil - } - - start := time.Now() - logger.Debug("Saving alert states", "count", len(states), "max_state_save_concurrency", st.maxStateSaveConcurrency) - _ = concurrency.ForEachJob(ctx, len(states), st.maxStateSaveConcurrency, saveState) - logger.Debug("Saving alert states done", "count", len(states), "max_state_save_concurrency", st.maxStateSaveConcurrency, "duration", time.Since(start)) -} - -func (st *Manager) deleteAlertStates(ctx context.Context, logger log.Logger, states []StateTransition) { - if st.instanceStore == nil || len(states) == 0 { - return - } - - logger.Debug("Deleting alert states", "count", len(states)) - toDelete := make([]ngModels.AlertInstanceKey, 0, len(states)) - - for _, s := range states { - key, err := s.GetAlertInstanceKey() - if err != nil { - logger.Error("Failed to delete alert instance with invalid labels", "cacheID", s.CacheID, "error", err) - continue - } - toDelete = append(toDelete, key) - } - - err := st.instanceStore.DeleteAlertInstances(ctx, toDelete...) 
- if err != nil { - logger.Error("Failed to delete stale states", "error", err) - } -} - func translateInstanceState(state ngModels.InstanceStateType) eval.State { switch state { case ngModels.InstanceStateFiring: diff --git a/pkg/services/ngalert/state/manager_bench_test.go b/pkg/services/ngalert/state/manager_bench_test.go index 3e6ce0e2ec0..24543e296ce 100644 --- a/pkg/services/ngalert/state/manager_bench_test.go +++ b/pkg/services/ngalert/state/manager_bench_test.go @@ -26,12 +26,11 @@ func BenchmarkProcessEvalResults(b *testing.B) { store := historian.NewAnnotationStore(&as, nil, metrics) hist := historian.NewAnnotationBackend(store, nil, metrics) cfg := state.ManagerCfg{ - Historian: hist, - MaxStateSaveConcurrency: 1, - Tracer: tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), + Historian: hist, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), } - sut := state.NewManager(cfg) + sut := state.NewManager(cfg, state.NewNoopPersister()) now := time.Now().UTC() rule := makeBenchRule() results := makeBenchResults(100) diff --git a/pkg/services/ngalert/state/manager_private_test.go b/pkg/services/ngalert/state/manager_private_test.go index 663aac925d4..681e44a38f1 100644 --- a/pkg/services/ngalert/state/manager_private_test.go +++ b/pkg/services/ngalert/state/manager_private_test.go @@ -19,13 +19,10 @@ import ( "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/infra/log/logtest" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" - - "github.com/grafana/grafana/pkg/util" ) // Not for parallel tests. 
@@ -82,84 +79,6 @@ func TestStateIsStale(t *testing.T) { } } -func TestManager_saveAlertStates(t *testing.T) { - type stateWithReason struct { - State eval.State - Reason string - } - create := func(s eval.State, r string) stateWithReason { - return stateWithReason{ - State: s, - Reason: r, - } - } - allStates := [...]stateWithReason{ - create(eval.Normal, ""), - create(eval.Normal, eval.NoData.String()), - create(eval.Normal, eval.Error.String()), - create(eval.Normal, util.GenerateShortUID()), - create(eval.Alerting, ""), - create(eval.Pending, ""), - create(eval.NoData, ""), - create(eval.Error, ""), - } - - transitionToKey := map[ngmodels.AlertInstanceKey]StateTransition{} - transitions := make([]StateTransition, 0) - for _, fromState := range allStates { - for i, toState := range allStates { - tr := StateTransition{ - State: &State{ - State: toState.State, - StateReason: toState.Reason, - Labels: ngmodels.GenerateAlertLabels(5, fmt.Sprintf("%d--", i)), - }, - PreviousState: fromState.State, - PreviousStateReason: fromState.Reason, - } - key, err := tr.GetAlertInstanceKey() - require.NoError(t, err) - transitionToKey[key] = tr - transitions = append(transitions, tr) - } - } - - t.Run("should save all transitions if doNotSaveNormalState is false", func(t *testing.T) { - st := &FakeInstanceStore{} - m := Manager{instanceStore: st, doNotSaveNormalState: false, maxStateSaveConcurrency: 1} - m.saveAlertStates(context.Background(), &logtest.Fake{}, transitions...) 
- - savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{} - for _, op := range st.RecordedOps { - saved := op.(ngmodels.AlertInstance) - savedKeys[saved.AlertInstanceKey] = saved - } - assert.Len(t, transitionToKey, len(savedKeys)) - - for key, tr := range transitionToKey { - assert.Containsf(t, savedKeys, key, "state %s (%s) was not saved but should be", tr.State.State, tr.StateReason) - } - }) - - t.Run("should not save Normal->Normal if doNotSaveNormalState is true", func(t *testing.T) { - st := &FakeInstanceStore{} - m := Manager{instanceStore: st, doNotSaveNormalState: true, maxStateSaveConcurrency: 1} - m.saveAlertStates(context.Background(), &logtest.Fake{}, transitions...) - - savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{} - for _, op := range st.RecordedOps { - saved := op.(ngmodels.AlertInstance) - savedKeys[saved.AlertInstanceKey] = saved - } - for key, tr := range transitionToKey { - if tr.State.State == eval.Normal && tr.StateReason == "" && tr.PreviousState == eval.Normal && tr.PreviousStateReason == "" { - continue - } - assert.Containsf(t, savedKeys, key, "state %s (%s) was not saved but should be", tr.State.State, tr.StateReason) - } - }) -} - // TestProcessEvalResults_StateTransitions tests how state.Manager's ProcessEvalResults processes results and creates or changes states. // In other words, it tests the state transition. 
// @@ -328,19 +247,18 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { testMetrics := metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics() cfg := ManagerCfg{ - Metrics: testMetrics, - Tracer: tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), - ExternalURL: nil, - InstanceStore: &FakeInstanceStore{}, - Images: &NotAvailableImageService{}, - Clock: clk, - Historian: &FakeHistorian{}, - MaxStateSaveConcurrency: 1, + Metrics: testMetrics, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), + ExternalURL: nil, + InstanceStore: &FakeInstanceStore{}, + Images: &NotAvailableImageService{}, + Clock: clk, + Historian: &FakeHistorian{}, ApplyNoDataAndErrorToAllStates: applyNoDataErrorToAllStates, } - st := NewManager(cfg) + st := NewManager(cfg, NewNoopPersister()) tss := make([]time.Time, 0, len(resultsAtTime)) for ts, results := range resultsAtTime { diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index 0be0460777b..ee582b7856e 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -204,17 +204,16 @@ func TestWarmStateCache(t *testing.T) { } cfg := state.ManagerCfg{ - Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), - ExternalURL: nil, - InstanceStore: dbstore, - Images: &state.NoopImageService{}, - Clock: clock.NewMock(), - Historian: &state.FakeHistorian{}, - MaxStateSaveConcurrency: 1, - Tracer: tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), + Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), + ExternalURL: nil, + InstanceStore: dbstore, + Images: &state.NoopImageService{}, + Clock: clock.NewMock(), + Historian: &state.FakeHistorian{}, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), } - st := state.NewManager(cfg) + st := state.NewManager(cfg, 
state.NewNoopPersister()) st.Warm(ctx, dbstore) t.Run("instance cache has expected entries", func(t *testing.T) { @@ -242,17 +241,16 @@ func TestDashboardAnnotations(t *testing.T) { store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, historianMetrics) hist := historian.NewAnnotationBackend(store, nil, historianMetrics) cfg := state.ManagerCfg{ - Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), - ExternalURL: nil, - InstanceStore: dbstore, - Images: &state.NoopImageService{}, - Clock: clock.New(), - Historian: hist, - MaxStateSaveConcurrency: 1, - Tracer: tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), + Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), + ExternalURL: nil, + InstanceStore: dbstore, + Images: &state.NoopImageService{}, + Clock: clock.New(), + Historian: hist, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), } - st := state.NewManager(cfg) + st := state.NewManager(cfg, state.NewNoopPersister()) const mainOrgID int64 = 1 @@ -1257,17 +1255,16 @@ func TestProcessEvalResults(t *testing.T) { hist := historian.NewAnnotationBackend(store, nil, m) clk := clock.NewMock() cfg := state.ManagerCfg{ - Metrics: stateMetrics, - ExternalURL: nil, - InstanceStore: &state.FakeInstanceStore{}, - Images: &state.NotAvailableImageService{}, - Clock: clk, - Historian: hist, - MaxStateSaveConcurrency: 1, - Tracer: tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), + Metrics: stateMetrics, + ExternalURL: nil, + InstanceStore: &state.FakeInstanceStore{}, + Images: &state.NotAvailableImageService{}, + Clock: clk, + Historian: hist, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), } - st := state.NewManager(cfg) + st := state.NewManager(cfg, state.NewNoopPersister()) evals := make([]time.Time, 0, len(tc.evalResults)) for evalTime := range tc.evalResults { 
@@ -1364,11 +1361,12 @@ func TestProcessEvalResults(t *testing.T) { Images: &state.NotAvailableImageService{}, Clock: clk, Historian: &state.FakeHistorian{}, - MaxStateSaveConcurrency: 1, Tracer: tracing.InitializeTracerForTest(), Log: log.New("ngalert.state.manager"), + MaxStateSaveConcurrency: 1, } - st := state.NewManager(cfg) + statePersister := state.NewSyncStatePersisiter(log.New("ngalert.state.manager.persist"), cfg) + st := state.NewManager(cfg, statePersister) rule := models.AlertRuleGen()() var results = eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now()))) @@ -1511,17 +1509,16 @@ func TestStaleResultsHandler(t *testing.T) { for _, tc := range testCases { ctx := context.Background() cfg := state.ManagerCfg{ - Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), - ExternalURL: nil, - InstanceStore: dbstore, - Images: &state.NoopImageService{}, - Clock: clock.New(), - Historian: &state.FakeHistorian{}, - MaxStateSaveConcurrency: 1, - Tracer: tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), + Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), + ExternalURL: nil, + InstanceStore: dbstore, + Images: &state.NoopImageService{}, + Clock: clock.New(), + Historian: &state.FakeHistorian{}, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), } - st := state.NewManager(cfg) + st := state.NewManager(cfg, state.NewNoopPersister()) st.Warm(ctx, dbstore) existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) @@ -1594,17 +1591,16 @@ func TestStaleResults(t *testing.T) { store := &state.FakeInstanceStore{} cfg := state.ManagerCfg{ - Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), - ExternalURL: nil, - InstanceStore: store, - Images: &state.NoopImageService{}, - Clock: clk, - Historian: &state.FakeHistorian{}, - MaxStateSaveConcurrency: 1, - Tracer: 
tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), + Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), + ExternalURL: nil, + InstanceStore: store, + Images: &state.NoopImageService{}, + Clock: clk, + Historian: &state.FakeHistorian{}, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), } - st := state.NewManager(cfg) + st := state.NewManager(cfg, state.NewNoopPersister()) rule := models.AlertRuleGen(models.WithFor(0))() @@ -1768,17 +1764,16 @@ func TestDeleteStateByRuleUID(t *testing.T) { clk := clock.NewMock() clk.Set(time.Now()) cfg := state.ManagerCfg{ - Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), - ExternalURL: nil, - InstanceStore: dbstore, - Images: &state.NoopImageService{}, - Clock: clk, - Historian: &state.FakeHistorian{}, - MaxStateSaveConcurrency: 1, - Tracer: tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), + Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), + ExternalURL: nil, + InstanceStore: dbstore, + Images: &state.NoopImageService{}, + Clock: clk, + Historian: &state.FakeHistorian{}, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), } - st := state.NewManager(cfg) + st := state.NewManager(cfg, state.NewNoopPersister()) st.Warm(ctx, dbstore) q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} alerts, _ := dbstore.ListAlertInstances(ctx, q) @@ -1910,17 +1905,16 @@ func TestResetStateByRuleUID(t *testing.T) { clk := clock.NewMock() clk.Set(time.Now()) cfg := state.ManagerCfg{ - Metrics: metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), - ExternalURL: nil, - InstanceStore: dbstore, - Images: &state.NoopImageService{}, - Clock: clk, - Historian: fakeHistorian, - MaxStateSaveConcurrency: 1, - Tracer: tracing.InitializeTracerForTest(), - Log: log.New("ngalert.state.manager"), + Metrics: 
metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), + ExternalURL: nil, + InstanceStore: dbstore, + Images: &state.NoopImageService{}, + Clock: clk, + Historian: fakeHistorian, + Tracer: tracing.InitializeTracerForTest(), + Log: log.New("ngalert.state.manager"), } - st := state.NewManager(cfg) + st := state.NewManager(cfg, state.NewNoopPersister()) st.Warm(ctx, dbstore) q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} alerts, _ := dbstore.ListAlertInstances(ctx, q) diff --git a/pkg/services/ngalert/state/persister_noop.go b/pkg/services/ngalert/state/persister_noop.go new file mode 100644 index 00000000000..c1abd5dc1d6 --- /dev/null +++ b/pkg/services/ngalert/state/persister_noop.go @@ -0,0 +1,17 @@ +package state + +import ( + "context" + + "github.com/benbjohnson/clock" + "go.opentelemetry.io/otel/trace" +) + +type NoopPersister struct{} + +func (n *NoopPersister) Async(_ context.Context, _ *clock.Ticker, _ *cache) {} +func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _, _ []StateTransition) {} + +func NewNoopPersister() StatePersister { + return &NoopPersister{} +} diff --git a/pkg/services/ngalert/state/persister_sync.go b/pkg/services/ngalert/state/persister_sync.go new file mode 100644 index 00000000000..5d28922b213 --- /dev/null +++ b/pkg/services/ngalert/state/persister_sync.go @@ -0,0 +1,111 @@ +package state + +import ( + "context" + "time" + + "github.com/benbjohnson/clock" + "github.com/grafana/dskit/concurrency" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/grafana/grafana/pkg/infra/log" + ngModels "github.com/grafana/grafana/pkg/services/ngalert/models" +) + +type SyncStatePersister struct { + log log.Logger + store InstanceStore + // doNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods. 
+ doNotSaveNormalState bool + // maxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel. + maxStateSaveConcurrency int +} + +func NewSyncStatePersisiter(log log.Logger, cfg ManagerCfg) StatePersister { + return &SyncStatePersister{ + log: log, + store: cfg.InstanceStore, + doNotSaveNormalState: cfg.DoNotSaveNormalState, + maxStateSaveConcurrency: cfg.MaxStateSaveConcurrency, + } +} + +func (a *SyncStatePersister) Async(_ context.Context, _ *clock.Ticker, _ *cache) { + a.log.Debug("Async: No-Op") +} +func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) { + a.deleteAlertStates(ctx, staleStates) + if len(staleStates) > 0 { + span.AddEvent("deleted stale states", trace.WithAttributes( + attribute.Int64("state_transitions", int64(len(staleStates))), + )) + } + + a.saveAlertStates(ctx, states...) + span.AddEvent("updated database") +} + +func (a *SyncStatePersister) deleteAlertStates(ctx context.Context, states []StateTransition) { + if a.store == nil || len(states) == 0 { + return + } + + a.log.Debug("Deleting alert states", "count", len(states)) + toDelete := make([]ngModels.AlertInstanceKey, 0, len(states)) + + for _, s := range states { + key, err := s.GetAlertInstanceKey() + if err != nil { + a.log.Error("Failed to delete alert instance with invalid labels", "cacheID", s.CacheID, "error", err) + continue + } + toDelete = append(toDelete, key) + } + + err := a.store.DeleteAlertInstances(ctx, toDelete...) 
+ if err != nil { + a.log.Error("Failed to delete stale states", "error", err) + } +} + +func (a *SyncStatePersister) saveAlertStates(ctx context.Context, states ...StateTransition) { + if a.store == nil || len(states) == 0 { + return + } + + saveState := func(ctx context.Context, idx int) error { + s := states[idx] + // Do not save normal state to database and remove transition to Normal state but keep mapped states + if a.doNotSaveNormalState && IsNormalStateWithNoReason(s.State) && !s.Changed() { + return nil + } + + key, err := s.GetAlertInstanceKey() + if err != nil { + a.log.Error("Failed to create a key for alert state to save it to database. The state will be ignored ", "cacheID", s.CacheID, "error", err, "labels", s.Labels.String()) + return nil + } + instance := ngModels.AlertInstance{ + AlertInstanceKey: key, + Labels: ngModels.InstanceLabels(s.Labels), + CurrentState: ngModels.InstanceStateType(s.State.State.String()), + CurrentReason: s.StateReason, + LastEvalTime: s.LastEvaluationTime, + CurrentStateSince: s.StartsAt, + CurrentStateEnd: s.EndsAt, + ResultFingerprint: s.ResultFingerprint.String(), + } + + err = a.store.SaveAlertInstance(ctx, instance) + if err != nil { + a.log.Error("Failed to save alert state", "labels", s.Labels.String(), "state", s.State, "error", err) + return nil + } + return nil + } + + start := time.Now() + a.log.Debug("Saving alert states", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency) + _ = concurrency.ForEachJob(ctx, len(states), a.maxStateSaveConcurrency, saveState) + a.log.Debug("Saving alert states done", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency, "duration", time.Since(start)) +} diff --git a/pkg/services/ngalert/state/persister_sync_test.go b/pkg/services/ngalert/state/persister_sync_test.go new file mode 100644 index 00000000000..7b8c93729b4 --- /dev/null +++ b/pkg/services/ngalert/state/persister_sync_test.go @@ -0,0 +1,103 @@ +package state + +import ( + "context" + "fmt" + "testing" + 
"github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/component-base/tracing" + + "github.com/grafana/grafana/pkg/infra/log/logtest" + "github.com/grafana/grafana/pkg/services/ngalert/eval" + ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" + "github.com/grafana/grafana/pkg/util" +) + +func TestSyncPersister_saveAlertStates(t *testing.T) { + type stateWithReason struct { + State eval.State + Reason string + } + create := func(s eval.State, r string) stateWithReason { + return stateWithReason{ + State: s, + Reason: r, + } + } + allStates := [...]stateWithReason{ + create(eval.Normal, ""), + create(eval.Normal, eval.NoData.String()), + create(eval.Normal, eval.Error.String()), + create(eval.Normal, util.GenerateShortUID()), + create(eval.Alerting, ""), + create(eval.Pending, ""), + create(eval.NoData, ""), + create(eval.Error, ""), + } + + transitionToKey := map[ngmodels.AlertInstanceKey]StateTransition{} + transitions := make([]StateTransition, 0) + for _, fromState := range allStates { + for i, toState := range allStates { + tr := StateTransition{ + State: &State{ + State: toState.State, + StateReason: toState.Reason, + Labels: ngmodels.GenerateAlertLabels(5, fmt.Sprintf("%d--", i)), + }, + PreviousState: fromState.State, + PreviousStateReason: fromState.Reason, + } + key, err := tr.GetAlertInstanceKey() + require.NoError(t, err) + transitionToKey[key] = tr + transitions = append(transitions, tr) + } + } + + t.Run("should save all transitions if doNotSaveNormalState is false", func(t *testing.T) { + trace := tracing.NewNoopTracerProvider().Tracer("test") + _, span := trace.Start(context.Background(), "") + st := &FakeInstanceStore{} + syncStatePersister := NewSyncStatePersisiter(&logtest.Fake{}, ManagerCfg{ + InstanceStore: st, + MaxStateSaveConcurrency: 1, + }) + syncStatePersister.Sync(context.Background(), span, transitions, nil) + savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{} + for _, op := 
range st.RecordedOps { + saved := op.(ngmodels.AlertInstance) + savedKeys[saved.AlertInstanceKey] = saved + } + assert.Len(t, transitionToKey, len(savedKeys)) + + for key, tr := range transitionToKey { + assert.Containsf(t, savedKeys, key, "state %s (%s) was not saved but should be", tr.State.State, tr.StateReason) + } + }) + + t.Run("should not save Normal->Normal if doNotSaveNormalState is true", func(t *testing.T) { + trace := tracing.NewNoopTracerProvider().Tracer("test") + _, span := trace.Start(context.Background(), "") + st := &FakeInstanceStore{} + syncStatePersister := NewSyncStatePersisiter(&logtest.Fake{}, ManagerCfg{ + InstanceStore: st, + MaxStateSaveConcurrency: 1, + }) + syncStatePersister.Sync(context.Background(), span, transitions, nil) + + savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{} + for _, op := range st.RecordedOps { + saved := op.(ngmodels.AlertInstance) + savedKeys[saved.AlertInstanceKey] = saved + } + for key, tr := range transitionToKey { + if tr.State.State == eval.Normal && tr.StateReason == "" && tr.PreviousState == eval.Normal && tr.PreviousStateReason == "" { + continue + } + assert.Containsf(t, savedKeys, key, "state %s (%s) was not saved but should be", tr.State.State, tr.StateReason) + } + }) +}