From f45265b5f77666d9a493f011ba4db05f9ba69b9a Mon Sep 17 00:00:00 2001 From: Alexander Akhmetov Date: Fri, 31 Jan 2025 23:34:00 +0100 Subject: [PATCH] Alerting: Read from both proto and simple DB instance stores on startup (#99855) --- pkg/services/ngalert/ngalert.go | 41 ++-- pkg/services/ngalert/ngalert_test.go | 5 +- .../ngalert/state/multi_instance_reader.go | 133 +++++++++++ .../state/multi_instance_reader_test.go | 208 ++++++++++++++++++ pkg/services/ngalert/state/persist.go | 16 +- 5 files changed, 381 insertions(+), 22 deletions(-) create mode 100644 pkg/services/ngalert/state/multi_instance_reader.go create mode 100644 pkg/services/ngalert/state/multi_instance_reader_test.go diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 2270620dc76..c7fe68d1d68 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -148,6 +148,8 @@ type AlertNG struct { Api *api.API httpClientProvider httpclient.Provider InstanceStore state.InstanceStore + // StartupInstanceReader is used to fetch the state of alerts on startup. + StartupInstanceReader state.InstanceReader // Alerting notification services MultiOrgAlertmanager *notifier.MultiOrgAlertmanager @@ -404,7 +406,7 @@ func (ng *AlertNG) init() error { return err } - ng.InstanceStore = initInstanceStore(ng.store.SQLStore, ng.Log.New("ngalert.state.instancestore"), ng.FeatureToggles) + ng.InstanceStore, ng.StartupInstanceReader = initInstanceStore(ng.store.SQLStore, ng.Log, ng.FeatureToggles) stateManagerCfg := state.ManagerCfg{ Metrics: ng.Metrics.GetStateMetrics(), @@ -519,17 +521,30 @@ func (ng *AlertNG) init() error { return DeclareFixedRoles(ng.AccesscontrolService, ng.FeatureToggles) } -func initInstanceStore(sqlStore db.DB, logger log.Logger, featureToggles featuremgmt.FeatureToggles) state.InstanceStore { +// initInstanceStore initializes the instance store based on the feature toggles. 
+// It returns two values: the instance store that should be used for writing alert instances, +// and an alert instance reader that can be used to read alert instances on startup. +func initInstanceStore(sqlStore db.DB, logger log.Logger, featureToggles featuremgmt.FeatureToggles) (state.InstanceStore, state.InstanceReader) { var instanceStore state.InstanceStore + // We init both stores here, but only one will be used based on the feature toggles. + // Two stores are needed for the multi-instance reader to work correctly. + // It's used to read the state of alerts on startup, and allows switching the feature + // flags seamlessly without losing the state of alerts. + protoInstanceStore := store.ProtoInstanceDBStore{ + SQLStore: sqlStore, + Logger: logger, + FeatureToggles: featureToggles, + } + simpleInstanceStore := store.InstanceDBStore{ + SQLStore: sqlStore, + Logger: logger, + FeatureToggles: featureToggles, + } + if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStateCompressed) { logger.Info("Using protobuf-based alert instance store") - instanceStore = store.ProtoInstanceDBStore{ - SQLStore: sqlStore, - Logger: logger, - FeatureToggles: featureToggles, - } - + instanceStore = protoInstanceStore // If FlagAlertingSaveStateCompressed is enabled, ProtoInstanceDBStore is used, // which functions differently from InstanceDBStore. FlagAlertingSaveStatePeriodic is // not applicable to ProtoInstanceDBStore, so a warning is logged if it is set. 
@@ -538,14 +553,10 @@ func initInstanceStore(sqlStore db.DB, logger log.Logger, featureToggles feature } } else { logger.Info("Using simple database alert instance store") - instanceStore = store.InstanceDBStore{ - SQLStore: sqlStore, - Logger: logger, - FeatureToggles: featureToggles, - } + instanceStore = simpleInstanceStore } - return instanceStore + return instanceStore, state.NewMultiInstanceReader(logger, protoInstanceStore, simpleInstanceStore) } func initStatePersister(uaCfg setting.UnifiedAlertingSettings, cfg state.ManagerCfg, featureToggles featuremgmt.FeatureToggles) state.StatePersister { @@ -605,7 +616,7 @@ func (ng *AlertNG) Run(ctx context.Context) error { // Also note that this runs synchronously to ensure state is loaded // before rule evaluation begins, hence we use ctx and not subCtx. // - ng.stateManager.Warm(ctx, ng.store, ng.InstanceStore) + ng.stateManager.Warm(ctx, ng.store, ng.StartupInstanceReader) children.Go(func() error { return ng.schedule.Run(subCtx) diff --git a/pkg/services/ngalert/ngalert_test.go b/pkg/services/ngalert/ngalert_test.go index 2012ff930fc..b1930a4c3e6 100644 --- a/pkg/services/ngalert/ngalert_test.go +++ b/pkg/services/ngalert/ngalert_test.go @@ -234,8 +234,11 @@ func TestInitInstanceStore(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - instanceStore := initInstanceStore(sqlStore, logger, tt.ft) + instanceStore, instanceReader := initInstanceStore(sqlStore, logger, tt.ft) assert.IsType(t, tt.expectedInstanceStoreType, instanceStore) + assert.IsType(t, &state.MultiInstanceReader{}, instanceReader) + assert.IsType(t, store.ProtoInstanceDBStore{}, instanceReader.(*state.MultiInstanceReader).ProtoDBReader) + assert.IsType(t, store.InstanceDBStore{}, instanceReader.(*state.MultiInstanceReader).DBReader) }) } } diff --git a/pkg/services/ngalert/state/multi_instance_reader.go b/pkg/services/ngalert/state/multi_instance_reader.go new file mode 100644 index 00000000000..bf71bd85aab --- 
/dev/null +++ b/pkg/services/ngalert/state/multi_instance_reader.go @@ -0,0 +1,133 @@ +package state + +import ( + "context" + "fmt" + "maps" + "slices" + "time" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/ngalert/models" +) + +// MultiInstanceReader merges results from two InstanceReaders. +// +// For each rule, it picks the state that has the newer LastEvalTime. +type MultiInstanceReader struct { + ProtoDBReader InstanceReader + DBReader InstanceReader + logger log.Logger +} + +func NewMultiInstanceReader(logger log.Logger, r1, r2 InstanceReader) *MultiInstanceReader { + return &MultiInstanceReader{ + ProtoDBReader: r1, + DBReader: r2, + logger: logger, + } +} + +// FetchOrgIds merges org IDs from both readers. +func (m *MultiInstanceReader) FetchOrgIds(ctx context.Context) ([]int64, error) { + orgsOne, err := m.ProtoDBReader.FetchOrgIds(ctx) + if err != nil { + return nil, fmt.Errorf("failed to fetch org IDs from ProtoDBReader: %w", err) + } + + orgsTwo, err := m.DBReader.FetchOrgIds(ctx) + if err != nil { + return nil, fmt.Errorf("failed to fetch org IDs from DBReader: %w", err) + } + + orgsSet := make(map[int64]struct{}) + + for _, orgID := range orgsOne { + orgsSet[orgID] = struct{}{} + } + for _, orgID := range orgsTwo { + orgsSet[orgID] = struct{}{} + } + + return slices.Collect(maps.Keys(orgsSet)), nil +} + +// ListAlertInstances fetches alert instances for a query from both readers, +// groups them by rule UID, and returns the newest instances for each rule as a +// single slice. 
+func (m *MultiInstanceReader) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) { + instancesOne, err := m.ProtoDBReader.ListAlertInstances(ctx, cmd) + if err != nil { + return nil, fmt.Errorf("failed to list alert instances from ProtoDBReader: %w", err) + } + + instancesTwo, err := m.DBReader.ListAlertInstances(ctx, cmd) + if err != nil { + return nil, fmt.Errorf("failed to list alert instances from DBReader: %w", err) + } + + byRuleOne := groupByRuleUID(instancesOne) + byRuleTwo := groupByRuleUID(instancesTwo) + + ruleSet := make(map[string]struct{}) + for uid := range byRuleOne { + ruleSet[uid] = struct{}{} + } + for uid := range byRuleTwo { + ruleSet[uid] = struct{}{} + } + + merged := make([]*models.AlertInstance, 0) + for ruleUID := range ruleSet { + sliceA := byRuleOne[ruleUID] + sliceB := byRuleTwo[ruleUID] + newer := getNewestAlertInstances(m.logger, sliceA, sliceB) + merged = append(merged, newer...) + } + + return merged, nil +} + +func groupByRuleUID(instances []*models.AlertInstance) map[string][]*models.AlertInstance { + result := make(map[string][]*models.AlertInstance) + for _, inst := range instances { + if inst == nil { + continue + } + result[inst.RuleUID] = append(result[inst.RuleUID], inst) + } + return result +} + +// getNewestAlertInstances returns the newest alert instances slice +// by comparing the maximum LastEvalTime in each of them. If one is empty, returns the other. 
+func getNewestAlertInstances(logger log.Logger, firstInstances, secondInstances []*models.AlertInstance) []*models.AlertInstance { + if len(firstInstances) == 0 { + return secondInstances + } + if len(secondInstances) == 0 { + return firstInstances + } + + maxFirst := maxLastEvalTime(firstInstances) + maxSecond := maxLastEvalTime(secondInstances) + + if maxSecond.After(maxFirst) { + logger.Debug("Newer alert instances found", "first_last_eval_time", maxFirst, "second_last_eval_time", maxSecond, "rule_uid", firstInstances[0].RuleUID) + return secondInstances + } + return firstInstances +} + +// maxLastEvalTime finds the maximum LastEvalTime among a slice of alert instances. +func maxLastEvalTime(instances []*models.AlertInstance) time.Time { + var max time.Time + + for _, i := range instances { + if i != nil && i.LastEvalTime.After(max) { + max = i.LastEvalTime + } + } + + return max +} diff --git a/pkg/services/ngalert/state/multi_instance_reader_test.go b/pkg/services/ngalert/state/multi_instance_reader_test.go new file mode 100644 index 00000000000..5e20b594162 --- /dev/null +++ b/pkg/services/ngalert/state/multi_instance_reader_test.go @@ -0,0 +1,208 @@ +package state + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/infra/log/logtest" + "github.com/grafana/grafana/pkg/services/ngalert/models" +) + +type mockInstanceReader struct { + mock.Mock +} + +func (m *mockInstanceReader) FetchOrgIds(ctx context.Context) ([]int64, error) { + args := m.Called(ctx) + return args.Get(0).([]int64), args.Error(1) +} + +func (m *mockInstanceReader) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) { + args := m.Called(ctx, cmd) + return args.Get(0).([]*models.AlertInstance), args.Error(1) +} + +func TestMultiInstanceReader_FetchOrgIds(t *testing.T) { + tests := []struct { + name string + mockAOrgIDs 
[]int64 + mockBOrgIDs []int64 + mockAError error + mockBError error + expectedOrgIDs []int64 + expectError bool + }{ + { + name: "both readers empty, no errors", + mockAOrgIDs: []int64{}, + mockBOrgIDs: []int64{}, + expectedOrgIDs: []int64{}, + }, + { + name: "simple union, no errors", + mockAOrgIDs: []int64{1, 2}, + mockBOrgIDs: []int64{2, 3}, + expectedOrgIDs: []int64{1, 2, 3}, + }, + { + name: "error in readerA", + mockAError: errors.New("some error"), + expectError: true, + }, + { + name: "error in readerB", + mockBError: errors.New("another error"), + expectError: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + readerA := &mockInstanceReader{} + if tc.mockAError != nil { + readerA.On("FetchOrgIds", mock.Anything).Return([]int64(nil), tc.mockAError).Once() + } else { + readerA.On("FetchOrgIds", mock.Anything).Return(tc.mockAOrgIDs, nil).Once() + } + + readerB := &mockInstanceReader{} + if tc.mockBError != nil { + readerB.On("FetchOrgIds", mock.Anything).Return([]int64(nil), tc.mockBError).Once() + } else { + readerB.On("FetchOrgIds", mock.Anything).Return(tc.mockBOrgIDs, nil).Once() + } + + multi := NewMultiInstanceReader(&logtest.Fake{}, readerA, readerB) + orgIDs, err := multi.FetchOrgIds(ctx) + + if tc.expectError { + require.Error(t, err) + return + } + require.NoError(t, err) + require.ElementsMatch(t, tc.expectedOrgIDs, orgIDs) + + readerA.AssertExpectations(t) + readerB.AssertExpectations(t) + }) + } +} + +func TestMultiInstanceReader_ListAlertInstances(t *testing.T) { + t1 := time.Unix(100, 0) + t2 := time.Unix(200, 0) + t3 := time.Unix(300, 0) + + ruleA := "rule-1" + ruleB := "rule-2" + + instAOld := &models.AlertInstance{AlertInstanceKey: models.AlertInstanceKey{RuleUID: ruleA}, LastEvalTime: t1} + instANew := &models.AlertInstance{AlertInstanceKey: models.AlertInstanceKey{RuleUID: ruleA}, LastEvalTime: t3} + instBMid := &models.AlertInstance{AlertInstanceKey: 
models.AlertInstanceKey{RuleUID: ruleB}, LastEvalTime: t2} + instASameTime := &models.AlertInstance{AlertInstanceKey: models.AlertInstanceKey{RuleUID: ruleA}, LastEvalTime: t1} + + tests := []struct { + name string + readerAInstances []*models.AlertInstance + readerBInstances []*models.AlertInstance + readerAError error + readerBError error + expectedInstances []*models.AlertInstance + expectError bool + }{ + { + name: "both readers return empty lists", + readerAInstances: []*models.AlertInstance{}, + readerBInstances: []*models.AlertInstance{}, + expectedInstances: []*models.AlertInstance{}, + }, + { + name: "when readerB is empty, use instances from readerA", + readerAInstances: []*models.AlertInstance{instAOld}, + readerBInstances: []*models.AlertInstance{}, + expectedInstances: []*models.AlertInstance{instAOld}, + }, + { + name: "when readerA is empty, use instances from readerB", + readerAInstances: []*models.AlertInstance{}, + readerBInstances: []*models.AlertInstance{instANew}, + expectedInstances: []*models.AlertInstance{instANew}, + }, + { + name: "when same rule exists in both readers, picks instances from reader with newer evaluation time", + readerAInstances: []*models.AlertInstance{instAOld}, + readerBInstances: []*models.AlertInstance{instANew}, + expectedInstances: []*models.AlertInstance{instANew}, + }, + { + name: "combines instances across rules using newest evaluation time per rule", + readerAInstances: []*models.AlertInstance{instAOld, instBMid}, + readerBInstances: []*models.AlertInstance{instANew}, + expectedInstances: []*models.AlertInstance{instANew, instBMid}, + }, + { + name: "error from readerA", + readerAError: errors.New("some error"), + expectError: true, + }, + { + name: "error from readerB", + readerBError: errors.New("another error"), + expectError: true, + }, + { + name: "nil instances are filtered out from results", + readerAInstances: []*models.AlertInstance{nil, instAOld}, + readerBInstances: []*models.AlertInstance{nil}, + 
expectedInstances: []*models.AlertInstance{instAOld}, + }, + { + name: "when instances have equal evaluation times, picks instances from readerA", + readerAInstances: []*models.AlertInstance{instAOld}, + readerBInstances: []*models.AlertInstance{instASameTime}, + expectedInstances: []*models.AlertInstance{instAOld}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + mockA := &mockInstanceReader{} + if tc.readerAError != nil { + mockA.On("ListAlertInstances", mock.Anything, mock.Anything).Return([]*models.AlertInstance(nil), tc.readerAError).Once() + } else { + mockA.On("ListAlertInstances", mock.Anything, mock.Anything).Return(tc.readerAInstances, nil).Once() + } + + mockB := &mockInstanceReader{} + if tc.readerBError != nil { + mockB.On("ListAlertInstances", mock.Anything, mock.Anything).Return([]*models.AlertInstance(nil), tc.readerBError).Once() + } else { + mockB.On("ListAlertInstances", mock.Anything, mock.Anything).Return(tc.readerBInstances, nil).Once() + } + + multi := NewMultiInstanceReader(&logtest.Fake{}, mockA, mockB) + cmd := &models.ListAlertInstancesQuery{RuleOrgID: 1234} + got, err := multi.ListAlertInstances(ctx, cmd) + + if tc.expectError { + require.Error(t, err) + return + } + require.NoError(t, err) + + require.ElementsMatch(t, tc.expectedInstances, got) + + mockA.AssertExpectations(t) + mockB.AssertExpectations(t) + }) + } +} diff --git a/pkg/services/ngalert/state/persist.go b/pkg/services/ngalert/state/persist.go index b4157c8cb69..841a7dacdd9 100644 --- a/pkg/services/ngalert/state/persist.go +++ b/pkg/services/ngalert/state/persist.go @@ -10,7 +10,17 @@ import ( // InstanceStore represents the ability to fetch and write alert instances. type InstanceStore interface { InstanceReader + InstanceWriter +} + +// InstanceReader provides methods to fetch alert instances. 
+type InstanceReader interface { + FetchOrgIds(ctx context.Context) ([]int64, error) + ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) +} +// InstanceWriter provides methods to write alert instances. +type InstanceWriter interface { SaveAlertInstance(ctx context.Context, instance models.AlertInstance) error DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error // SaveAlertInstancesForRule overwrites the state for the given rule. @@ -19,12 +29,6 @@ type InstanceStore interface { FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error } -// InstanceReader provides methods to fetch alert instances. -type InstanceReader interface { - FetchOrgIds(ctx context.Context) ([]int64, error) - ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) -} - // RuleReader represents the ability to fetch alert rules. type RuleReader interface { ListAlertRules(ctx context.Context, query *models.ListAlertRulesQuery) (models.RulesGroup, error)