Alerting: Read from both proto and simple DB instance stores on startup (#99855)

pull/98752/head^2
Alexander Akhmetov 4 months ago committed by GitHub
parent e03656669a
commit f45265b5f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 41
      pkg/services/ngalert/ngalert.go
  2. 5
      pkg/services/ngalert/ngalert_test.go
  3. 133
      pkg/services/ngalert/state/multi_instance_reader.go
  4. 208
      pkg/services/ngalert/state/multi_instance_reader_test.go
  5. 16
      pkg/services/ngalert/state/persist.go

@ -148,6 +148,8 @@ type AlertNG struct {
Api *api.API
httpClientProvider httpclient.Provider
InstanceStore state.InstanceStore
// StartupInstanceReader is used to fetch the state of alerts on startup.
StartupInstanceReader state.InstanceReader
// Alerting notification services
MultiOrgAlertmanager *notifier.MultiOrgAlertmanager
@ -404,7 +406,7 @@ func (ng *AlertNG) init() error {
return err
}
ng.InstanceStore = initInstanceStore(ng.store.SQLStore, ng.Log.New("ngalert.state.instancestore"), ng.FeatureToggles)
ng.InstanceStore, ng.StartupInstanceReader = initInstanceStore(ng.store.SQLStore, ng.Log, ng.FeatureToggles)
stateManagerCfg := state.ManagerCfg{
Metrics: ng.Metrics.GetStateMetrics(),
@ -519,17 +521,30 @@ func (ng *AlertNG) init() error {
return DeclareFixedRoles(ng.AccesscontrolService, ng.FeatureToggles)
}
func initInstanceStore(sqlStore db.DB, logger log.Logger, featureToggles featuremgmt.FeatureToggles) state.InstanceStore {
// initInstanceStore initializes the instance store based on the feature toggles.
// It returns two values: the instance store that should be used for writing alert instances,
// and an alert instance reader that can be used to read alert instances on startup.
func initInstanceStore(sqlStore db.DB, logger log.Logger, featureToggles featuremgmt.FeatureToggles) (state.InstanceStore, state.InstanceReader) {
var instanceStore state.InstanceStore
// We init both stores here, but only one will be used based on the feature toggles.
// Two stores are needed for the multi-instance reader to work correctly.
// It's used to read the state of alerts on startup, and allows switching the feature
// flags seamlessly without losing the state of alerts.
protoInstanceStore := store.ProtoInstanceDBStore{
SQLStore: sqlStore,
Logger: logger,
FeatureToggles: featureToggles,
}
simpleInstanceStore := store.InstanceDBStore{
SQLStore: sqlStore,
Logger: logger,
FeatureToggles: featureToggles,
}
if featureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingSaveStateCompressed) {
logger.Info("Using protobuf-based alert instance store")
instanceStore = store.ProtoInstanceDBStore{
SQLStore: sqlStore,
Logger: logger,
FeatureToggles: featureToggles,
}
instanceStore = protoInstanceStore
// If FlagAlertingSaveStateCompressed is enabled, ProtoInstanceDBStore is used,
// which functions differently from InstanceDBStore. FlagAlertingSaveStatePeriodic is
// not applicable to ProtoInstanceDBStore, so a warning is logged if it is set.
@ -538,14 +553,10 @@ func initInstanceStore(sqlStore db.DB, logger log.Logger, featureToggles feature
}
} else {
logger.Info("Using simple database alert instance store")
instanceStore = store.InstanceDBStore{
SQLStore: sqlStore,
Logger: logger,
FeatureToggles: featureToggles,
}
instanceStore = simpleInstanceStore
}
return instanceStore
return instanceStore, state.NewMultiInstanceReader(logger, protoInstanceStore, simpleInstanceStore)
}
func initStatePersister(uaCfg setting.UnifiedAlertingSettings, cfg state.ManagerCfg, featureToggles featuremgmt.FeatureToggles) state.StatePersister {
@ -605,7 +616,7 @@ func (ng *AlertNG) Run(ctx context.Context) error {
// Also note that this runs synchronously to ensure state is loaded
// before rule evaluation begins, hence we use ctx and not subCtx.
//
ng.stateManager.Warm(ctx, ng.store, ng.InstanceStore)
ng.stateManager.Warm(ctx, ng.store, ng.StartupInstanceReader)
children.Go(func() error {
return ng.schedule.Run(subCtx)

@ -234,8 +234,11 @@ func TestInitInstanceStore(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
instanceStore := initInstanceStore(sqlStore, logger, tt.ft)
instanceStore, instanceReader := initInstanceStore(sqlStore, logger, tt.ft)
assert.IsType(t, tt.expectedInstanceStoreType, instanceStore)
assert.IsType(t, &state.MultiInstanceReader{}, instanceReader)
assert.IsType(t, store.ProtoInstanceDBStore{}, instanceReader.(*state.MultiInstanceReader).ProtoDBReader)
assert.IsType(t, store.InstanceDBStore{}, instanceReader.(*state.MultiInstanceReader).DBReader)
})
}
}

@ -0,0 +1,133 @@
package state
import (
"context"
"fmt"
"maps"
"slices"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
// MultiInstanceReader merges results from two InstanceReaders.
//
// For each rule, it picks the state that has the newer LastEvalTime. When both
// readers report the same newest LastEvalTime for a rule, the instances from
// ProtoDBReader are kept.
type MultiInstanceReader struct {
	// ProtoDBReader is queried first; its instances win ties.
	ProtoDBReader InstanceReader
	// DBReader is queried second; its instances are used for a rule only when
	// they are strictly newer or when ProtoDBReader has none for that rule.
	DBReader InstanceReader
	// logger receives a debug message when DBReader's instances replace
	// ProtoDBReader's because they are newer.
	logger log.Logger
}
// NewMultiInstanceReader builds a MultiInstanceReader that reads from r1 and r2.
//
// r1 is stored as ProtoDBReader and r2 as DBReader; logger is kept for the
// debug output emitted while merging.
func NewMultiInstanceReader(logger log.Logger, r1, r2 InstanceReader) *MultiInstanceReader {
	reader := new(MultiInstanceReader)
	reader.logger = logger
	reader.ProtoDBReader = r1
	reader.DBReader = r2
	return reader
}
// FetchOrgIds returns the union of the org IDs reported by both readers,
// de-duplicated and sorted in ascending order.
//
// ProtoDBReader is queried first, then DBReader. If either call fails, the
// error is wrapped and returned together with a nil slice; no partial result
// is returned.
func (m *MultiInstanceReader) FetchOrgIds(ctx context.Context) ([]int64, error) {
	orgsOne, err := m.ProtoDBReader.FetchOrgIds(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch org IDs from ProtoDBReader: %w", err)
	}

	orgsTwo, err := m.DBReader.FetchOrgIds(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch org IDs from DBReader: %w", err)
	}

	// The combined length is an upper bound on the number of distinct IDs.
	orgsSet := make(map[int64]struct{}, len(orgsOne)+len(orgsTwo))
	for _, orgID := range orgsOne {
		orgsSet[orgID] = struct{}{}
	}
	for _, orgID := range orgsTwo {
		orgsSet[orgID] = struct{}{}
	}

	// Map iteration order is randomized, so sort the keys to give callers the
	// same order on every run.
	return slices.Sorted(maps.Keys(orgsSet)), nil
}
// ListAlertInstances runs cmd against ProtoDBReader and then DBReader and
// merges the two results rule by rule.
//
// Instances are grouped by rule UID. For a rule present in both results, the
// group chosen by getNewestAlertInstances is kept as a whole; for a rule
// present in only one result, that group is kept. The groups are concatenated
// into a single, non-nil slice whose rule order is unspecified. If either
// reader fails, the wrapped error is returned with a nil slice.
func (m *MultiInstanceReader) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) {
	protoInstances, err := m.ProtoDBReader.ListAlertInstances(ctx, cmd)
	if err != nil {
		return nil, fmt.Errorf("failed to list alert instances from ProtoDBReader: %w", err)
	}

	dbInstances, err := m.DBReader.ListAlertInstances(ctx, cmd)
	if err != nil {
		return nil, fmt.Errorf("failed to list alert instances from DBReader: %w", err)
	}

	protoByRule := groupByRuleUID(protoInstances)
	dbByRule := groupByRuleUID(dbInstances)

	result := []*models.AlertInstance{}

	// Rules ProtoDBReader knows about; dbByRule[uid] is nil when DBReader has
	// nothing for the rule.
	for uid, fromProto := range protoByRule {
		result = append(result, getNewestAlertInstances(m.logger, fromProto, dbByRule[uid])...)
	}

	// Rules that only DBReader knows about.
	for uid, fromDB := range dbByRule {
		if _, seen := protoByRule[uid]; seen {
			continue
		}
		result = append(result, getNewestAlertInstances(m.logger, nil, fromDB)...)
	}

	return result, nil
}
// groupByRuleUID buckets alert instances by their RuleUID, preserving the
// input order within each bucket. Nil entries are dropped. The returned map is
// never nil.
func groupByRuleUID(instances []*models.AlertInstance) map[string][]*models.AlertInstance {
	grouped := map[string][]*models.AlertInstance{}
	for idx := range instances {
		if instance := instances[idx]; instance != nil {
			grouped[instance.RuleUID] = append(grouped[instance.RuleUID], instance)
		}
	}
	return grouped
}
// getNewestAlertInstances chooses between two sets of alert instances and
// returns one of them unchanged.
//
// If one set is empty the other is returned. Otherwise the sets are compared by
// their maximum LastEvalTime: secondInstances is returned (and a debug message
// is logged) only when it is strictly newer; on a tie firstInstances is returned.
func getNewestAlertInstances(logger log.Logger, firstInstances, secondInstances []*models.AlertInstance) []*models.AlertInstance {
	switch {
	case len(firstInstances) == 0:
		return secondInstances
	case len(secondInstances) == 0:
		return firstInstances
	}

	latestFirst, latestSecond := maxLastEvalTime(firstInstances), maxLastEvalTime(secondInstances)

	// Ties go to the first set: the second set must be strictly newer to win.
	if !latestSecond.After(latestFirst) {
		return firstInstances
	}

	logger.Debug("Newer alert instances found", "first_last_eval_time", latestFirst, "second_last_eval_time", latestSecond, "rule_uid", firstInstances[0].RuleUID)
	return secondInstances
}
// maxLastEvalTime returns the latest LastEvalTime found in instances.
// Nil entries are ignored. The zero time.Time is returned when no entry has a
// LastEvalTime after the zero time (for example, when the slice is empty).
func maxLastEvalTime(instances []*models.AlertInstance) time.Time {
	latest := time.Time{}
	for _, inst := range instances {
		if inst == nil {
			continue
		}
		if inst.LastEvalTime.After(latest) {
			latest = inst.LastEvalTime
		}
	}
	return latest
}

@ -0,0 +1,208 @@
package state
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/log/logtest"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
// mockInstanceReader is a testify mock used in these tests as a stand-in for
// an InstanceReader. Return values are configured per test via On(...).Return(...).
type mockInstanceReader struct {
	mock.Mock
}
// FetchOrgIds records the call and replays the ([]int64, error) pair that the
// test configured for it.
func (m *mockInstanceReader) FetchOrgIds(ctx context.Context) ([]int64, error) {
	ret := m.Called(ctx)
	orgIDs := ret.Get(0).([]int64)
	return orgIDs, ret.Error(1)
}
// ListAlertInstances records the call and replays the
// ([]*models.AlertInstance, error) pair that the test configured for it.
func (m *mockInstanceReader) ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) {
	ret := m.Called(ctx, cmd)
	instances := ret.Get(0).([]*models.AlertInstance)
	return instances, ret.Error(1)
}
// TestMultiInstanceReader_FetchOrgIds verifies that FetchOrgIds returns the
// de-duplicated union of both readers' org IDs and that an error from either
// reader is propagated.
func TestMultiInstanceReader_FetchOrgIds(t *testing.T) {
	type testCase struct {
		name           string
		mockAOrgIDs    []int64
		mockBOrgIDs    []int64
		mockAError     error
		mockBError     error
		expectedOrgIDs []int64
		expectError    bool
	}

	// stubReader returns a mock that answers a single FetchOrgIds call with
	// either the given error (and a nil slice) or the given org IDs.
	stubReader := func(orgIDs []int64, err error) *mockInstanceReader {
		reader := &mockInstanceReader{}
		call := reader.On("FetchOrgIds", mock.Anything)
		if err != nil {
			call.Return([]int64(nil), err).Once()
		} else {
			call.Return(orgIDs, nil).Once()
		}
		return reader
	}

	cases := []testCase{
		{
			name:           "both readers empty, no errors",
			mockAOrgIDs:    []int64{},
			mockBOrgIDs:    []int64{},
			expectedOrgIDs: []int64{},
		},
		{
			name:           "simple union, no errors",
			mockAOrgIDs:    []int64{1, 2},
			mockBOrgIDs:    []int64{2, 3},
			expectedOrgIDs: []int64{1, 2, 3},
		},
		{
			name:        "error in readerA",
			mockAError:  errors.New("some error"),
			expectError: true,
		},
		{
			name:        "error in readerB",
			mockBError:  errors.New("another error"),
			expectError: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			readerA := stubReader(tc.mockAOrgIDs, tc.mockAError)
			readerB := stubReader(tc.mockBOrgIDs, tc.mockBError)
			multi := NewMultiInstanceReader(&logtest.Fake{}, readerA, readerB)

			orgIDs, err := multi.FetchOrgIds(context.Background())

			// On error the second reader may never be called, so expectations
			// are only asserted on the success path.
			if tc.expectError {
				require.Error(t, err)
				return
			}

			require.NoError(t, err)
			require.ElementsMatch(t, tc.expectedOrgIDs, orgIDs)
			readerA.AssertExpectations(t)
			readerB.AssertExpectations(t)
		})
	}
}
// TestMultiInstanceReader_ListAlertInstances verifies the per-rule merge done
// by ListAlertInstances: empty inputs, one-sided inputs, newer-wins selection,
// nil filtering, tie-breaking in favour of readerA, and error propagation.
//
// Besides comparing values, each case checks that the returned pointers are
// the very objects listed in expectedInstances. require.ElementsMatch compares
// the pointed-to values, so two distinct instances with identical fields (as
// in the equal-evaluation-time case) would otherwise be indistinguishable and
// the test could not tell which reader's instances were picked.
func TestMultiInstanceReader_ListAlertInstances(t *testing.T) {
	t1 := time.Unix(100, 0)
	t2 := time.Unix(200, 0)
	t3 := time.Unix(300, 0)

	ruleA := "rule-1"
	ruleB := "rule-2"

	instAOld := &models.AlertInstance{AlertInstanceKey: models.AlertInstanceKey{RuleUID: ruleA}, LastEvalTime: t1}
	instANew := &models.AlertInstance{AlertInstanceKey: models.AlertInstanceKey{RuleUID: ruleA}, LastEvalTime: t3}
	instBMid := &models.AlertInstance{AlertInstanceKey: models.AlertInstanceKey{RuleUID: ruleB}, LastEvalTime: t2}
	// Same field values as instAOld but a different object; only pointer
	// identity tells the two apart.
	instASameTime := &models.AlertInstance{AlertInstanceKey: models.AlertInstanceKey{RuleUID: ruleA}, LastEvalTime: t1}

	// containsPtr reports whether want is one of the pointers in list,
	// comparing by identity rather than by value.
	containsPtr := func(list []*models.AlertInstance, want *models.AlertInstance) bool {
		for _, item := range list {
			if item == want {
				return true
			}
		}
		return false
	}

	tests := []struct {
		name              string
		readerAInstances  []*models.AlertInstance
		readerBInstances  []*models.AlertInstance
		readerAError      error
		readerBError      error
		expectedInstances []*models.AlertInstance
		expectError       bool
	}{
		{
			name:              "both readers return empty lists",
			readerAInstances:  []*models.AlertInstance{},
			readerBInstances:  []*models.AlertInstance{},
			expectedInstances: []*models.AlertInstance{},
		},
		{
			name:              "when readerB is empty, use instances from readerA",
			readerAInstances:  []*models.AlertInstance{instAOld},
			readerBInstances:  []*models.AlertInstance{},
			expectedInstances: []*models.AlertInstance{instAOld},
		},
		{
			name:              "when readerA is empty, use instances from readerB",
			readerAInstances:  []*models.AlertInstance{},
			readerBInstances:  []*models.AlertInstance{instANew},
			expectedInstances: []*models.AlertInstance{instANew},
		},
		{
			name:              "when same rule exists in both readers, picks instances from reader with newer evaluation time",
			readerAInstances:  []*models.AlertInstance{instAOld},
			readerBInstances:  []*models.AlertInstance{instANew},
			expectedInstances: []*models.AlertInstance{instANew},
		},
		{
			name:              "combines instances across rules using newest evaluation time per rule",
			readerAInstances:  []*models.AlertInstance{instAOld, instBMid},
			readerBInstances:  []*models.AlertInstance{instANew},
			expectedInstances: []*models.AlertInstance{instANew, instBMid},
		},
		{
			name:         "error from readerA",
			readerAError: errors.New("some error"),
			expectError:  true,
		},
		{
			name:         "error from readerB",
			readerBError: errors.New("another error"),
			expectError:  true,
		},
		{
			name:              "nil instances are filtered out from results",
			readerAInstances:  []*models.AlertInstance{nil, instAOld},
			readerBInstances:  []*models.AlertInstance{nil},
			expectedInstances: []*models.AlertInstance{instAOld},
		},
		{
			name:              "when instances have equal evaluation times, picks instances from readerA",
			readerAInstances:  []*models.AlertInstance{instAOld},
			readerBInstances:  []*models.AlertInstance{instASameTime},
			expectedInstances: []*models.AlertInstance{instAOld},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()

			mockA := &mockInstanceReader{}
			if tc.readerAError != nil {
				mockA.On("ListAlertInstances", mock.Anything, mock.Anything).Return([]*models.AlertInstance(nil), tc.readerAError).Once()
			} else {
				mockA.On("ListAlertInstances", mock.Anything, mock.Anything).Return(tc.readerAInstances, nil).Once()
			}

			mockB := &mockInstanceReader{}
			if tc.readerBError != nil {
				mockB.On("ListAlertInstances", mock.Anything, mock.Anything).Return([]*models.AlertInstance(nil), tc.readerBError).Once()
			} else {
				mockB.On("ListAlertInstances", mock.Anything, mock.Anything).Return(tc.readerBInstances, nil).Once()
			}

			multi := NewMultiInstanceReader(&logtest.Fake{}, mockA, mockB)

			cmd := &models.ListAlertInstancesQuery{RuleOrgID: 1234}
			got, err := multi.ListAlertInstances(ctx, cmd)

			// On error the second reader may never be called, so expectations
			// are only asserted on the success path.
			if tc.expectError {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
			require.ElementsMatch(t, tc.expectedInstances, got)

			// ElementsMatch has already established equal length and equal
			// values; now make sure the exact expected objects were returned.
			for i, inst := range got {
				require.Truef(t, containsPtr(tc.expectedInstances, inst), "returned instance at index %d is not one of the expected objects", i)
			}

			mockA.AssertExpectations(t)
			mockB.AssertExpectations(t)
		})
	}
}

@ -10,7 +10,17 @@ import (
// InstanceStore represents the ability to fetch and write alert instances.
// It is the combination of InstanceReader and InstanceWriter.
type InstanceStore interface {
	InstanceReader
	InstanceWriter
}
// InstanceReader provides methods to fetch alert instances.
type InstanceReader interface {
	// FetchOrgIds returns the org IDs known to the reader.
	// NOTE(review): presumably the orgs that have persisted alert instances — confirm against the implementations.
	FetchOrgIds(ctx context.Context) ([]int64, error)
	// ListAlertInstances returns the alert instances selected by cmd.
	ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error)
}
// InstanceWriter provides methods to write alert instances.
type InstanceWriter interface {
SaveAlertInstance(ctx context.Context, instance models.AlertInstance) error
DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error
// SaveAlertInstancesForRule overwrites the state for the given rule.
@ -19,12 +29,6 @@ type InstanceStore interface {
FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error
}
// InstanceReader provides methods to fetch alert instances.
type InstanceReader interface {
FetchOrgIds(ctx context.Context) ([]int64, error)
ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error)
}
// RuleReader represents the ability to fetch alert rules.
type RuleReader interface {
ListAlertRules(ctx context.Context, query *models.ListAlertRulesQuery) (models.RulesGroup, error)

Loading…
Cancel
Save