From 9d182986f19c20aa700fc0577d79bb738adb14c2 Mon Sep 17 00:00:00 2001 From: Alexander Akhmetov Date: Fri, 11 Oct 2024 15:25:29 +0200 Subject: [PATCH] Alerting: make StatePersister more configurable to support custom rule-level state persisters (#94590) --- pkg/services/ngalert/state/cache.go | 4 ++-- pkg/services/ngalert/state/manager.go | 6 +++--- pkg/services/ngalert/state/persister_async.go | 17 +++++++++++------ pkg/services/ngalert/state/persister_noop.go | 7 +++++-- pkg/services/ngalert/state/persister_sync.go | 4 ++-- .../ngalert/state/persister_sync_test.go | 7 ++++--- 6 files changed, 27 insertions(+), 18 deletions(-) diff --git a/pkg/services/ngalert/state/cache.go b/pkg/services/ngalert/state/cache.go index 28a5391f55d..0da09784d38 100644 --- a/pkg/services/ngalert/state/cache.go +++ b/pkg/services/ngalert/state/cache.go @@ -346,8 +346,8 @@ func (c *cache) removeByRuleUID(orgID int64, uid string) []*State { return states } -// asInstances returns the whole content of the cache as a slice of AlertInstance. -func (c *cache) asInstances(skipNormalState bool) []ngModels.AlertInstance { +// GetAlertInstances returns the whole content of the cache as a slice of AlertInstance. +func (c *cache) GetAlertInstances(skipNormalState bool) []ngModels.AlertInstance { var states []ngModels.AlertInstance c.mtxStates.RLock() defer c.mtxStates.RUnlock() diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index 94c5371e308..b351f6c274f 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -31,8 +31,8 @@ type AlertInstanceManager interface { } type StatePersister interface { - Async(ctx context.Context, cache *cache) - Sync(ctx context.Context, span trace.Span, states StateTransitions) + Async(ctx context.Context, instancesProvider AlertInstancesProvider) + Sync(ctx context.Context, span trace.Span, ruleKey ngModels.AlertRuleKeyWithGroup, states StateTransitions) } // Sender is an optional callback intended for sending the states to an alertmanager. @@ -347,7 +347,7 @@ func (st *Manager) ProcessEvalResults( statesToSend = st.updateLastSentAt(allChanges, evaluatedAt) } - st.persister.Sync(ctx, span, allChanges) + st.persister.Sync(ctx, span, alertRule.GetKeyWithGroup(), allChanges) if st.historian != nil { st.historian.Record(ctx, history_model.NewRuleMeta(alertRule, logger), allChanges) } diff --git a/pkg/services/ngalert/state/persister_async.go b/pkg/services/ngalert/state/persister_async.go index 91807f26921..cc617866202 100644 --- a/pkg/services/ngalert/state/persister_async.go +++ b/pkg/services/ngalert/state/persister_async.go @@ -9,8 +9,13 @@ import ( "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/ngalert/metrics" + "github.com/grafana/grafana/pkg/services/ngalert/models" ) +type AlertInstancesProvider interface { + GetAlertInstances(skipNormalState bool) []models.AlertInstance +} + type AsyncStatePersister struct { log log.Logger // doNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods. @@ -30,16 +35,16 @@ func NewAsyncStatePersister(log log.Logger, ticker *clock.Ticker, cfg ManagerCfg } } -func (a *AsyncStatePersister) Async(ctx context.Context, cache *cache) { +func (a *AsyncStatePersister) Async(ctx context.Context, instancesProvider AlertInstancesProvider) { for { select { case <-a.ticker.C: - if err := a.fullSync(ctx, cache); err != nil { + if err := a.fullSync(ctx, instancesProvider); err != nil { a.log.Error("Failed to do a full state sync to database", "err", err) } case <-ctx.Done(): a.log.Info("Scheduler is shutting down, doing a final state sync.") - if err := a.fullSync(context.Background(), cache); err != nil { + if err := a.fullSync(context.Background(), instancesProvider); err != nil { a.log.Error("Failed to do a full state sync to database", "err", err) } a.ticker.Stop() @@ -49,10 +54,10 @@ func (a *AsyncStatePersister) Async(ctx context.Context, cache *cache) { } } -func (a *AsyncStatePersister) fullSync(ctx context.Context, cache *cache) error { +func (a *AsyncStatePersister) fullSync(ctx context.Context, instancesProvider AlertInstancesProvider) error { startTime := time.Now() a.log.Debug("Full state sync start") - instances := cache.asInstances(a.doNotSaveNormalState) + instances := instancesProvider.GetAlertInstances(a.doNotSaveNormalState) if err := a.store.FullSync(ctx, instances); err != nil { a.log.Error("Full state sync failed", "duration", time.Since(startTime), "instances", len(instances)) return err @@ -64,6 +69,6 @@ func (a *AsyncStatePersister) fullSync(ctx context.Context, cache *cache) error return nil } -func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _ StateTransitions) { +func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _ models.AlertRuleKeyWithGroup, _ StateTransitions) { a.log.Debug("Sync: No-Op") } diff --git a/pkg/services/ngalert/state/persister_noop.go b/pkg/services/ngalert/state/persister_noop.go index 0275bc5f351..0e770bdc1c4 100644 --- a/pkg/services/ngalert/state/persister_noop.go +++ b/pkg/services/ngalert/state/persister_noop.go @@ -4,12 +4,15 @@ import ( "context" "go.opentelemetry.io/otel/trace" + + "github.com/grafana/grafana/pkg/services/ngalert/models" ) type NoopPersister struct{} -func (n *NoopPersister) Async(_ context.Context, _ *cache) {} -func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _ StateTransitions) {} +func (n *NoopPersister) Async(_ context.Context, _ AlertInstancesProvider) {} +func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _ models.AlertRuleKeyWithGroup, _ StateTransitions) { +} func NewNoopPersister() StatePersister { return &NoopPersister{} diff --git a/pkg/services/ngalert/state/persister_sync.go b/pkg/services/ngalert/state/persister_sync.go index 1aee0bed09e..5e0e653547b 100644 --- a/pkg/services/ngalert/state/persister_sync.go +++ b/pkg/services/ngalert/state/persister_sync.go @@ -30,12 +30,12 @@ func NewSyncStatePersisiter(log log.Logger, cfg ManagerCfg) StatePersister { } } -func (a *SyncStatePersister) Async(_ context.Context, _ *cache) { +func (a *SyncStatePersister) Async(_ context.Context, _ AlertInstancesProvider) { a.log.Debug("Async: No-Op") } // Sync persists the state transitions to the database. It deletes stale states and saves the current states. -func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, allStates StateTransitions) { +func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, _ ngModels.AlertRuleKeyWithGroup, allStates StateTransitions) { staleStates := allStates.StaleStates() if len(staleStates) > 0 { a.deleteAlertStates(ctx, staleStates) diff --git a/pkg/services/ngalert/state/persister_sync_test.go b/pkg/services/ngalert/state/persister_sync_test.go index 390e97a4137..e4e98b91401 100644 --- a/pkg/services/ngalert/state/persister_sync_test.go +++ b/pkg/services/ngalert/state/persister_sync_test.go @@ -40,6 +40,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) { create(eval.NoData, ""), create(eval.Error, ""), } + ruleKey := ngmodels.AlertRuleKeyWithGroup{} transitionToKey := map[ngmodels.AlertInstanceKey]StateTransition{} transitions := make([]StateTransition, 0) @@ -69,7 +70,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) { InstanceStore: st, MaxStateSaveConcurrency: 1, }) - syncStatePersister.Sync(context.Background(), span, transitions) + syncStatePersister.Sync(context.Background(), span, ruleKey, transitions) savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{} for _, op := range st.RecordedOps() { saved := op.(ngmodels.AlertInstance) @@ -90,7 +91,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) { InstanceStore: st, MaxStateSaveConcurrency: 1, }) - syncStatePersister.Sync(context.Background(), span, transitions) + syncStatePersister.Sync(context.Background(), span, ruleKey, transitions) savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{} for _, op := range st.RecordedOps() { @@ -160,7 +161,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) { PreviousStateReason: util.GenerateShortUID(), } - syncStatePersister.Sync(context.Background(), span, []StateTransition{transition}) + syncStatePersister.Sync(context.Background(), span, ruleKey, []StateTransition{transition}) require.Len(t, st.RecordedOps(), 1) saved := st.RecordedOps()[0].(ngmodels.AlertInstance)