diff --git a/pkg/services/ngalert/api.go b/pkg/services/ngalert/api.go index 615ae09f574..69397381105 100644 --- a/pkg/services/ngalert/api.go +++ b/pkg/services/ngalert/api.go @@ -9,42 +9,52 @@ import ( "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/middleware" "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/ngalert/eval" + "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/util" ) -func (ng *AlertNG) registerAPIEndpoints() { - ng.RouteRegister.Group("/api/alert-definitions", func(alertDefinitions routing.RouteRegister) { - alertDefinitions.Get("", middleware.ReqSignedIn, routing.Wrap(ng.listAlertDefinitions)) - alertDefinitions.Get("/eval/:alertDefinitionUID", middleware.ReqSignedIn, ng.validateOrgAlertDefinition, routing.Wrap(ng.alertDefinitionEvalEndpoint)) - alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), routing.Wrap(ng.conditionEvalEndpoint)) - alertDefinitions.Get("/:alertDefinitionUID", middleware.ReqSignedIn, ng.validateOrgAlertDefinition, routing.Wrap(ng.getAlertDefinitionEndpoint)) - alertDefinitions.Delete("/:alertDefinitionUID", middleware.ReqEditorRole, ng.validateOrgAlertDefinition, routing.Wrap(ng.deleteAlertDefinitionEndpoint)) - alertDefinitions.Post("/", middleware.ReqEditorRole, binding.Bind(saveAlertDefinitionCommand{}), routing.Wrap(ng.createAlertDefinitionEndpoint)) - alertDefinitions.Put("/:alertDefinitionUID", middleware.ReqEditorRole, ng.validateOrgAlertDefinition, binding.Bind(updateAlertDefinitionCommand{}), routing.Wrap(ng.updateAlertDefinitionEndpoint)) - alertDefinitions.Post("/pause", middleware.ReqEditorRole, binding.Bind(updateAlertDefinitionPausedCommand{}), routing.Wrap(ng.alertDefinitionPauseEndpoint)) - alertDefinitions.Post("/unpause", middleware.ReqEditorRole, 
binding.Bind(updateAlertDefinitionPausedCommand{}), routing.Wrap(ng.alertDefinitionUnpauseEndpoint)) +type apiImpl struct { + Cfg *setting.Cfg `inject:""` + DatasourceCache datasources.CacheService `inject:""` + RouteRegister routing.RouteRegister `inject:""` + schedule scheduleService + store store +} + +func (api *apiImpl) registerAPIEndpoints() { + api.RouteRegister.Group("/api/alert-definitions", func(alertDefinitions routing.RouteRegister) { + alertDefinitions.Get("", middleware.ReqSignedIn, routing.Wrap(api.listAlertDefinitions)) + alertDefinitions.Get("/eval/:alertDefinitionUID", middleware.ReqSignedIn, api.validateOrgAlertDefinition, routing.Wrap(api.alertDefinitionEvalEndpoint)) + alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), routing.Wrap(api.conditionEvalEndpoint)) + alertDefinitions.Get("/:alertDefinitionUID", middleware.ReqSignedIn, api.validateOrgAlertDefinition, routing.Wrap(api.getAlertDefinitionEndpoint)) + alertDefinitions.Delete("/:alertDefinitionUID", middleware.ReqEditorRole, api.validateOrgAlertDefinition, routing.Wrap(api.deleteAlertDefinitionEndpoint)) + alertDefinitions.Post("/", middleware.ReqEditorRole, binding.Bind(saveAlertDefinitionCommand{}), routing.Wrap(api.createAlertDefinitionEndpoint)) + alertDefinitions.Put("/:alertDefinitionUID", middleware.ReqEditorRole, api.validateOrgAlertDefinition, binding.Bind(updateAlertDefinitionCommand{}), routing.Wrap(api.updateAlertDefinitionEndpoint)) + alertDefinitions.Post("/pause", middleware.ReqEditorRole, binding.Bind(updateAlertDefinitionPausedCommand{}), routing.Wrap(api.alertDefinitionPauseEndpoint)) + alertDefinitions.Post("/unpause", middleware.ReqEditorRole, binding.Bind(updateAlertDefinitionPausedCommand{}), routing.Wrap(api.alertDefinitionUnpauseEndpoint)) }) - ng.RouteRegister.Group("/api/ngalert/", func(schedulerRouter routing.RouteRegister) { - schedulerRouter.Post("/pause", routing.Wrap(ng.pauseScheduler)) - 
schedulerRouter.Post("/unpause", routing.Wrap(ng.unpauseScheduler)) + api.RouteRegister.Group("/api/ngalert/", func(schedulerRouter routing.RouteRegister) { + schedulerRouter.Post("/pause", routing.Wrap(api.pauseScheduler)) + schedulerRouter.Post("/unpause", routing.Wrap(api.unpauseScheduler)) }, middleware.ReqOrgAdmin) - ng.RouteRegister.Group("/api/alert-instances", func(alertInstances routing.RouteRegister) { - alertInstances.Get("", middleware.ReqSignedIn, routing.Wrap(ng.listAlertInstancesEndpoint)) + api.RouteRegister.Group("/api/alert-instances", func(alertInstances routing.RouteRegister) { + alertInstances.Get("", middleware.ReqSignedIn, routing.Wrap(api.listAlertInstancesEndpoint)) }) } // conditionEvalEndpoint handles POST /api/alert-definitions/eval. -func (ng *AlertNG) conditionEvalEndpoint(c *models.ReqContext, cmd evalAlertConditionCommand) response.Response { +func (api *apiImpl) conditionEvalEndpoint(c *models.ReqContext, cmd evalAlertConditionCommand) response.Response { evalCond := eval.Condition{ RefID: cmd.Condition, OrgID: c.SignedInUser.OrgId, QueriesAndExpressions: cmd.Data, } - if err := ng.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil { + if err := api.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil { return response.Error(400, "invalid condition", err) } @@ -53,9 +63,8 @@ func (ng *AlertNG) conditionEvalEndpoint(c *models.ReqContext, cmd evalAlertCond now = timeNow() } - evaluator := eval.Evaluator{Cfg: ng.Cfg} - - evalResults, err := evaluator.ConditionEval(&evalCond, now) + evaluator := eval.Evaluator{Cfg: api.Cfg} + evalResults, err := evaluator.ConditionEval(&evalCond, now) if err != nil { return response.Error(400, "Failed to evaluate conditions", err) } @@ -72,20 +81,20 @@ func (ng *AlertNG) conditionEvalEndpoint(c *models.ReqContext, cmd evalAlertCond }) } -// alertDefinitionEvalEndpoint handles POST /api/alert-definitions/eval/:alertDefinitionUID. 
-func (ng *AlertNG) alertDefinitionEvalEndpoint(c *models.ReqContext) response.Response { +// alertDefinitionEvalEndpoint handles GET /api/alert-definitions/eval/:alertDefinitionUID. +func (api *apiImpl) alertDefinitionEvalEndpoint(c *models.ReqContext) response.Response { alertDefinitionUID := c.Params(":alertDefinitionUID") - condition, err := ng.LoadAlertCondition(alertDefinitionUID, c.SignedInUser.OrgId) + condition, err := api.LoadAlertCondition(alertDefinitionUID, c.SignedInUser.OrgId) if err != nil { return response.Error(400, "Failed to load alert definition conditions", err) } - if err := ng.validateCondition(*condition, c.SignedInUser, c.SkipCache); err != nil { + if err := api.validateCondition(*condition, c.SignedInUser, c.SkipCache); err != nil { return response.Error(400, "invalid condition", err) } - evaluator := eval.Evaluator{Cfg: ng.Cfg} + evaluator := eval.Evaluator{Cfg: api.Cfg} evalResults, err := evaluator.ConditionEval(condition, timeNow()) if err != nil { return response.Error(400, "Failed to evaluate alert", err) @@ -107,7 +116,7 @@ func (ng *AlertNG) alertDefinitionEvalEndpoint(c *models.ReqContext) response.Re } // getAlertDefinitionEndpoint handles GET /api/alert-definitions/:alertDefinitionUID. 
-func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) response.Response { +func (api *apiImpl) getAlertDefinitionEndpoint(c *models.ReqContext) response.Response { alertDefinitionUID := c.Params(":alertDefinitionUID") query := getAlertDefinitionByUIDQuery{ @@ -115,7 +124,7 @@ func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) response.Res OrgID: c.SignedInUser.OrgId, } - if err := ng.getAlertDefinitionByUID(&query); err != nil { + if err := api.store.getAlertDefinitionByUID(&query); err != nil { return response.Error(500, "Failed to get alert definition", err) } @@ -123,7 +132,7 @@ func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) response.Res } // deleteAlertDefinitionEndpoint handles DELETE /api/alert-definitions/:alertDefinitionUID. -func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) response.Response { +func (api *apiImpl) deleteAlertDefinitionEndpoint(c *models.ReqContext) response.Response { alertDefinitionUID := c.Params(":alertDefinitionUID") cmd := deleteAlertDefinitionByUIDCommand{ @@ -131,7 +140,7 @@ func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) response. OrgID: c.SignedInUser.OrgId, } - if err := ng.deleteAlertDefinitionByUID(&cmd); err != nil { + if err := api.store.deleteAlertDefinitionByUID(&cmd); err != nil { return response.Error(500, "Failed to delete alert definition", err) } @@ -139,7 +148,7 @@ func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) response. } // updateAlertDefinitionEndpoint handles PUT /api/alert-definitions/:alertDefinitionUID. 
-func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updateAlertDefinitionCommand) response.Response { +func (api *apiImpl) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updateAlertDefinitionCommand) response.Response { cmd.UID = c.Params(":alertDefinitionUID") cmd.OrgID = c.SignedInUser.OrgId @@ -148,11 +157,11 @@ func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updat OrgID: c.SignedInUser.OrgId, QueriesAndExpressions: cmd.Data, } - if err := ng.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil { + if err := api.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil { return response.Error(400, "invalid condition", err) } - if err := ng.updateAlertDefinition(&cmd); err != nil { + if err := api.store.updateAlertDefinition(&cmd); err != nil { return response.Error(500, "Failed to update alert definition", err) } @@ -160,7 +169,7 @@ func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updat } // createAlertDefinitionEndpoint handles POST /api/alert-definitions. 
-func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveAlertDefinitionCommand) response.Response { +func (api *apiImpl) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveAlertDefinitionCommand) response.Response { cmd.OrgID = c.SignedInUser.OrgId evalCond := eval.Condition{ @@ -168,11 +177,11 @@ func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveA OrgID: c.SignedInUser.OrgId, QueriesAndExpressions: cmd.Data, } - if err := ng.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil { + if err := api.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil { return response.Error(400, "invalid condition", err) } - if err := ng.saveAlertDefinition(&cmd); err != nil { + if err := api.store.saveAlertDefinition(&cmd); err != nil { return response.Error(500, "Failed to create alert definition", err) } @@ -180,26 +189,26 @@ func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveA } // listAlertDefinitions handles GET /api/alert-definitions. 
-func (ng *AlertNG) listAlertDefinitions(c *models.ReqContext) response.Response { +func (api *apiImpl) listAlertDefinitions(c *models.ReqContext) response.Response { query := listAlertDefinitionsQuery{OrgID: c.SignedInUser.OrgId} - if err := ng.getOrgAlertDefinitions(&query); err != nil { + if err := api.store.getOrgAlertDefinitions(&query); err != nil { return response.Error(500, "Failed to list alert definitions", err) } return response.JSON(200, util.DynMap{"results": query.Result}) } -func (ng *AlertNG) pauseScheduler() response.Response { - err := ng.schedule.pause() +func (api *apiImpl) pauseScheduler() response.Response { + err := api.schedule.Pause() if err != nil { return response.Error(500, "Failed to pause scheduler", err) } return response.JSON(200, util.DynMap{"message": "alert definition scheduler paused"}) } -func (ng *AlertNG) unpauseScheduler() response.Response { - err := ng.schedule.unpause() +func (api *apiImpl) unpauseScheduler() response.Response { + err := api.schedule.Unpause() if err != nil { return response.Error(500, "Failed to unpause scheduler", err) } @@ -207,11 +216,11 @@ func (ng *AlertNG) unpauseScheduler() response.Response { } // alertDefinitionPauseEndpoint handles POST /api/alert-definitions/pause. -func (ng *AlertNG) alertDefinitionPauseEndpoint(c *models.ReqContext, cmd updateAlertDefinitionPausedCommand) response.Response { +func (api *apiImpl) alertDefinitionPauseEndpoint(c *models.ReqContext, cmd updateAlertDefinitionPausedCommand) response.Response { cmd.OrgID = c.SignedInUser.OrgId cmd.Paused = true - err := ng.updateAlertDefinitionPaused(&cmd) + err := api.store.updateAlertDefinitionPaused(&cmd) if err != nil { return response.Error(500, "Failed to pause alert definition", err) } @@ -219,11 +228,11 @@ func (ng *AlertNG) alertDefinitionPauseEndpoint(c *models.ReqContext, cmd update } // alertDefinitionUnpauseEndpoint handles POST /api/alert-definitions/unpause. 
-func (ng *AlertNG) alertDefinitionUnpauseEndpoint(c *models.ReqContext, cmd updateAlertDefinitionPausedCommand) response.Response { +func (api *apiImpl) alertDefinitionUnpauseEndpoint(c *models.ReqContext, cmd updateAlertDefinitionPausedCommand) response.Response { cmd.OrgID = c.SignedInUser.OrgId cmd.Paused = false - err := ng.updateAlertDefinitionPaused(&cmd) + err := api.store.updateAlertDefinitionPaused(&cmd) if err != nil { return response.Error(500, "Failed to unpause alert definition", err) } diff --git a/pkg/services/ngalert/common_test.go b/pkg/services/ngalert/common_test.go index 5d437f64ad5..d9e0e4bbbf5 100644 --- a/pkg/services/ngalert/common_test.go +++ b/pkg/services/ngalert/common_test.go @@ -17,23 +17,20 @@ import ( "github.com/stretchr/testify/require" ) -func setupTestEnv(t *testing.T) *AlertNG { +func setupTestEnv(t *testing.T, baseIntervalSeconds int64) (AlertNG, *storeImpl) { cfg := setting.NewCfg() cfg.FeatureToggles = map[string]bool{"ngalert": true} - ng := overrideAlertNGInRegistry(cfg) - - sqlStore := sqlstore.InitTestDB(t) - ng.SQLStore = sqlStore + ng := overrideAlertNGInRegistry(t, cfg) + ng.SQLStore = sqlstore.InitTestDB(t) err := ng.Init() require.NoError(t, err) - return &ng + return ng, &storeImpl{SQLStore: ng.SQLStore, baseInterval: time.Duration(baseIntervalSeconds) * time.Second} } -func overrideAlertNGInRegistry(cfg *setting.Cfg) AlertNG { +func overrideAlertNGInRegistry(t *testing.T, cfg *setting.Cfg) AlertNG { ng := AlertNG{ - SQLStore: nil, Cfg: cfg, RouteRegister: routing.NewRouteRegister(), log: log.New("ngalert-test"), @@ -42,7 +39,7 @@ func overrideAlertNGInRegistry(cfg *setting.Cfg) AlertNG { overrideServiceFunc := func(descriptor registry.Descriptor) (*registry.Descriptor, bool) { if _, ok := descriptor.Instance.(*AlertNG); ok { return &registry.Descriptor{ - Name: "AlertNG", + Name: descriptor.Name, Instance: &ng, InitPriority: descriptor.InitPriority, }, true @@ -55,7 +52,7 @@ func overrideAlertNGInRegistry(cfg 
*setting.Cfg) AlertNG { return ng } -func createTestAlertDefinition(t *testing.T, ng *AlertNG, intervalSeconds int64) *AlertDefinition { +func createTestAlertDefinition(t *testing.T, store *storeImpl, intervalSeconds int64) *AlertDefinition { cmd := saveAlertDefinitionCommand{ OrgID: 1, Title: fmt.Sprintf("an alert definition %d", rand.Intn(1000)), @@ -76,7 +73,7 @@ func createTestAlertDefinition(t *testing.T, ng *AlertNG, intervalSeconds int64) }, IntervalSeconds: &intervalSeconds, } - err := ng.saveAlertDefinition(&cmd) + err := store.saveAlertDefinition(&cmd) require.NoError(t, err) t.Logf("alert definition: %v with interval: %d created", cmd.Result.getKey(), intervalSeconds) return cmd.Result diff --git a/pkg/services/ngalert/database.go b/pkg/services/ngalert/database.go index e02a2fd7754..4956fe23a4f 100644 --- a/pkg/services/ngalert/database.go +++ b/pkg/services/ngalert/database.go @@ -5,11 +5,32 @@ import ( "errors" "fmt" "strings" + "time" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/util" ) +type store interface { + deleteAlertDefinitionByUID(*deleteAlertDefinitionByUIDCommand) error + getAlertDefinitionByUID(*getAlertDefinitionByUIDQuery) error + getAlertDefinitions(query *listAlertDefinitionsQuery) error + getOrgAlertDefinitions(query *listAlertDefinitionsQuery) error + saveAlertDefinition(*saveAlertDefinitionCommand) error + updateAlertDefinition(*updateAlertDefinitionCommand) error + getAlertInstance(*getAlertInstanceQuery) error + listAlertInstances(cmd *listAlertInstancesQuery) error + saveAlertInstance(cmd *saveAlertInstanceCommand) error + validateAlertDefinition(*AlertDefinition, bool) error + updateAlertDefinitionPaused(*updateAlertDefinitionPausedCommand) error +} + +type storeImpl struct { + // the base scheduler tick rate; it's used for validating definition interval + baseInterval time.Duration + SQLStore *sqlstore.SQLStore `inject:""` +} + func getAlertDefinitionByUID(sess *sqlstore.DBSession, 
alertDefinitionUID string, orgID int64) (*AlertDefinition, error) { // we consider optionally enabling some caching alertDefinition := AlertDefinition{OrgID: orgID, UID: alertDefinitionUID} @@ -25,8 +46,8 @@ func getAlertDefinitionByUID(sess *sqlstore.DBSession, alertDefinitionUID string // deleteAlertDefinitionByID is a handler for deleting an alert definition. // It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID. -func (ng *AlertNG) deleteAlertDefinitionByUID(cmd *deleteAlertDefinitionByUIDCommand) error { - return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) deleteAlertDefinitionByUID(cmd *deleteAlertDefinitionByUIDCommand) error { + return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { _, err := sess.Exec("DELETE FROM alert_definition WHERE uid = ? AND org_id = ?", cmd.UID, cmd.OrgID) if err != nil { return err @@ -47,8 +68,8 @@ func (ng *AlertNG) deleteAlertDefinitionByUID(cmd *deleteAlertDefinitionByUIDCom // getAlertDefinitionByUID is a handler for retrieving an alert definition from that database by its UID and organisation ID. // It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID. -func (ng *AlertNG) getAlertDefinitionByUID(query *getAlertDefinitionByUIDQuery) error { - return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) getAlertDefinitionByUID(query *getAlertDefinitionByUIDQuery) error { + return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { alertDefinition, err := getAlertDefinitionByUID(sess, query.UID, query.OrgID) if err != nil { return err @@ -59,8 +80,8 @@ func (ng *AlertNG) getAlertDefinitionByUID(query *getAlertDefinitionByUIDQuery) } // saveAlertDefinition is a handler for saving a new alert definition. 
-func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error { - return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error { + return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { intervalSeconds := defaultIntervalSeconds if cmd.IntervalSeconds != nil { intervalSeconds = *cmd.IntervalSeconds @@ -83,7 +104,7 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error { UID: uid, } - if err := ng.validateAlertDefinition(alertDefinition, false); err != nil { + if err := st.validateAlertDefinition(alertDefinition, false); err != nil { return err } @@ -92,7 +113,7 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error { } if _, err := sess.Insert(alertDefinition); err != nil { - if ng.SQLStore.Dialect.IsUniqueConstraintViolation(err) && strings.Contains(err.Error(), "title") { + if st.SQLStore.Dialect.IsUniqueConstraintViolation(err) && strings.Contains(err.Error(), "title") { return fmt.Errorf("an alert definition with the title '%s' already exists: %w", cmd.Title, err) } return err @@ -119,8 +140,8 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error { // updateAlertDefinition is a handler for updating an existing alert definition. // It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID. 
-func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) error { - return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) updateAlertDefinition(cmd *updateAlertDefinitionCommand) error { + return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error { existingAlertDefinition, err := getAlertDefinitionByUID(sess, cmd.UID, cmd.OrgID) if err != nil { if errors.Is(err, errAlertDefinitionNotFound) { @@ -157,7 +178,7 @@ func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) erro UID: existingAlertDefinition.UID, } - if err := ng.validateAlertDefinition(alertDefinition, true); err != nil { + if err := st.validateAlertDefinition(alertDefinition, true); err != nil { return err } @@ -169,7 +190,7 @@ func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) erro _, err = sess.ID(existingAlertDefinition.ID).Update(alertDefinition) if err != nil { - if ng.SQLStore.Dialect.IsUniqueConstraintViolation(err) && strings.Contains(err.Error(), "title") { + if st.SQLStore.Dialect.IsUniqueConstraintViolation(err) && strings.Contains(err.Error(), "title") { return fmt.Errorf("an alert definition with the title '%s' already exists: %w", cmd.Title, err) } return err @@ -196,8 +217,8 @@ func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) erro } // getOrgAlertDefinitions is a handler for retrieving alert definitions of specific organisation. 
-func (ng *AlertNG) getOrgAlertDefinitions(query *listAlertDefinitionsQuery) error { - return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) getOrgAlertDefinitions(query *listAlertDefinitionsQuery) error { + return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { alertDefinitions := make([]*AlertDefinition, 0) q := "SELECT * FROM alert_definition WHERE org_id = ?" if err := sess.SQL(q, query.OrgID).Find(&alertDefinitions); err != nil { @@ -209,8 +230,8 @@ func (ng *AlertNG) getOrgAlertDefinitions(query *listAlertDefinitionsQuery) erro }) } -func (ng *AlertNG) getAlertDefinitions(query *listAlertDefinitionsQuery) error { - return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) getAlertDefinitions(query *listAlertDefinitionsQuery) error { + return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { alerts := make([]*AlertDefinition, 0) q := "SELECT uid, org_id, interval_seconds, version, paused FROM alert_definition" if err := sess.SQL(q).Find(&alerts); err != nil { @@ -222,8 +243,8 @@ func (ng *AlertNG) getAlertDefinitions(query *listAlertDefinitionsQuery) error { }) } -func (ng *AlertNG) updateAlertDefinitionPaused(cmd *updateAlertDefinitionPausedCommand) error { - return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) updateAlertDefinitionPaused(cmd *updateAlertDefinitionPausedCommand) error { + return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { if len(cmd.UIDs) == 0 { return nil } @@ -250,8 +271,8 @@ func (ng *AlertNG) updateAlertDefinitionPaused(cmd *updateAlertDefinitionPausedC if err != nil { return err } - if cmd.ResultCount, err = res.RowsAffected(); err != nil { - ng.log.Debug("failed to get rows affected: %w", err) + if resultCount, err := res.RowsAffected(); err 
== nil { + cmd.ResultCount = resultCount } return nil }) diff --git a/pkg/services/ngalert/database_test.go b/pkg/services/ngalert/database_test.go index 827a4468407..e6a382c4bab 100644 --- a/pkg/services/ngalert/database_test.go +++ b/pkg/services/ngalert/database_test.go @@ -68,10 +68,12 @@ func TestCreatingAlertDefinition(t *testing.T) { expectedError: errEmptyTitleError, }, } + + _, store := setupTestEnv(t, baseIntervalSeconds) + t.Cleanup(registry.ClearOverrides) + for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - ng := setupTestEnv(t) - t.Cleanup(registry.ClearOverrides) q := saveAlertDefinitionCommand{ OrgID: 1, @@ -95,7 +97,7 @@ func TestCreatingAlertDefinition(t *testing.T) { if tc.inputIntervalSeconds != nil { q.IntervalSeconds = tc.inputIntervalSeconds } - err := ng.saveAlertDefinition(&q) + err := store.saveAlertDefinition(&q) switch { case tc.expectedError != nil: require.Error(t, err) @@ -111,7 +113,7 @@ func TestCreatingAlertDefinition(t *testing.T) { } func TestCreatingConflictionAlertDefinition(t *testing.T) { t.Run("Should fail to create alert definition with conflicting org_id, title", func(t *testing.T) { - ng := setupTestEnv(t) + _, store := setupTestEnv(t, baseIntervalSeconds) t.Cleanup(registry.ClearOverrides) q := saveAlertDefinitionCommand{ @@ -134,12 +136,12 @@ func TestCreatingConflictionAlertDefinition(t *testing.T) { }, } - err := ng.saveAlertDefinition(&q) + err := store.saveAlertDefinition(&q) require.NoError(t, err) - err = ng.saveAlertDefinition(&q) + err = store.saveAlertDefinition(&q) require.Error(t, err) - assert.True(t, ng.SQLStore.Dialect.IsUniqueConstraintViolation(err)) + assert.True(t, store.SQLStore.Dialect.IsUniqueConstraintViolation(err)) }) } @@ -148,7 +150,7 @@ func TestUpdatingAlertDefinition(t *testing.T) { mockTimeNow() defer resetTimeNow() - ng := setupTestEnv(t) + _, store := setupTestEnv(t, baseIntervalSeconds) t.Cleanup(registry.ClearOverrides) q := updateAlertDefinitionCommand{ @@ -172,7 
+174,7 @@ func TestUpdatingAlertDefinition(t *testing.T) { }, } - err := ng.updateAlertDefinition(&q) + err := store.updateAlertDefinition(&q) require.NoError(t, err) }) @@ -180,11 +182,11 @@ func TestUpdatingAlertDefinition(t *testing.T) { mockTimeNow() defer resetTimeNow() - ng := setupTestEnv(t) + _, store := setupTestEnv(t, baseIntervalSeconds) t.Cleanup(registry.ClearOverrides) var initialInterval int64 = 120 - alertDefinition := createTestAlertDefinition(t, ng, initialInterval) + alertDefinition := createTestAlertDefinition(t, store, initialInterval) created := alertDefinition.Updated var customInterval int64 = 30 @@ -273,7 +275,7 @@ func TestUpdatingAlertDefinition(t *testing.T) { q.OrgID = tc.inputOrgID } q.Title = tc.inputTitle - err := ng.updateAlertDefinition(&q) + err := store.updateAlertDefinition(&q) switch { case tc.expectedError != nil: require.Error(t, err) @@ -319,12 +321,12 @@ func TestUpdatingConflictingAlertDefinition(t *testing.T) { mockTimeNow() defer resetTimeNow() - ng := setupTestEnv(t) + _, store := setupTestEnv(t, baseIntervalSeconds) t.Cleanup(registry.ClearOverrides) var initialInterval int64 = 120 - alertDef1 := createTestAlertDefinition(t, ng, initialInterval) - alertDef2 := createTestAlertDefinition(t, ng, initialInterval) + alertDef1 := createTestAlertDefinition(t, store, initialInterval) + alertDef2 := createTestAlertDefinition(t, store, initialInterval) q := updateAlertDefinitionCommand{ UID: (*alertDef2).UID, @@ -346,15 +348,15 @@ func TestUpdatingConflictingAlertDefinition(t *testing.T) { }, } - err := ng.updateAlertDefinition(&q) + err := store.updateAlertDefinition(&q) require.Error(t, err) - assert.True(t, ng.SQLStore.Dialect.IsUniqueConstraintViolation(err)) + assert.True(t, store.SQLStore.Dialect.IsUniqueConstraintViolation(err)) }) } func TestDeletingAlertDefinition(t *testing.T) { t.Run("zero rows affected when deleting unknown alert", func(t *testing.T) { - ng := setupTestEnv(t) + _, store := setupTestEnv(t, 
baseIntervalSeconds) t.Cleanup(registry.ClearOverrides) q := deleteAlertDefinitionByUIDCommand{ @@ -362,15 +364,15 @@ func TestDeletingAlertDefinition(t *testing.T) { OrgID: 1, } - err := ng.deleteAlertDefinitionByUID(&q) + err := store.deleteAlertDefinitionByUID(&q) require.NoError(t, err) }) t.Run("deleting successfully existing alert", func(t *testing.T) { - ng := setupTestEnv(t) + _, store := setupTestEnv(t, baseIntervalSeconds) t.Cleanup(registry.ClearOverrides) - alertDefinition := createTestAlertDefinition(t, ng, 60) + alertDefinition := createTestAlertDefinition(t, store, 60) q := deleteAlertDefinitionByUIDCommand{ UID: (*alertDefinition).UID, @@ -384,21 +386,21 @@ func TestDeletingAlertDefinition(t *testing.T) { State: InstanceStateFiring, Labels: InstanceLabels{"test": "testValue"}, } - err := ng.saveAlertInstance(saveCmd) + err := store.saveAlertInstance(saveCmd) require.NoError(t, err) listCommand := &listAlertInstancesQuery{ DefinitionOrgID: alertDefinition.OrgID, DefinitionUID: alertDefinition.UID, } - err = ng.listAlertInstances(listCommand) + err = store.listAlertInstances(listCommand) require.NoError(t, err) require.Len(t, listCommand.Result, 1) - err = ng.deleteAlertDefinitionByUID(&q) + err = store.deleteAlertDefinitionByUID(&q) require.NoError(t, err) // assert that alert instance is deleted - err = ng.listAlertInstances(listCommand) + err = store.listAlertInstances(listCommand) require.NoError(t, err) require.Len(t, listCommand.Result, 0) diff --git a/pkg/services/ngalert/fetcher.go b/pkg/services/ngalert/fetcher.go index f040163f3b2..64f0222ce44 100644 --- a/pkg/services/ngalert/fetcher.go +++ b/pkg/services/ngalert/fetcher.go @@ -4,11 +4,11 @@ import ( "time" ) -func (ng *AlertNG) fetchAllDetails(now time.Time) []*AlertDefinition { +func (sch *schedule) fetchAllDetails(now time.Time) []*AlertDefinition { q := listAlertDefinitionsQuery{} - err := ng.getAlertDefinitions(&q) + err := sch.store.getAlertDefinitions(&q) if err != nil { - 
ng.schedule.log.Error("failed to fetch alert definitions", "now", now, "err", err) + sch.log.Error("failed to fetch alert definitions", "now", now, "err", err) return nil } return q.Result diff --git a/pkg/services/ngalert/instance_api.go b/pkg/services/ngalert/instance_api.go index 26788458fb7..6551852d8b0 100644 --- a/pkg/services/ngalert/instance_api.go +++ b/pkg/services/ngalert/instance_api.go @@ -6,10 +6,10 @@ import ( ) // listAlertInstancesEndpoint handles GET /api/alert-instances. -func (ng *AlertNG) listAlertInstancesEndpoint(c *models.ReqContext) response.Response { +func (api *apiImpl) listAlertInstancesEndpoint(c *models.ReqContext) response.Response { cmd := listAlertInstancesQuery{DefinitionOrgID: c.SignedInUser.OrgId} - if err := ng.listAlertInstances(&cmd); err != nil { + if err := api.store.listAlertInstances(&cmd); err != nil { return response.Error(500, "Failed to list alert instances", err) } diff --git a/pkg/services/ngalert/instance_database.go b/pkg/services/ngalert/instance_database.go index 32d87979c0f..9adea88665d 100644 --- a/pkg/services/ngalert/instance_database.go +++ b/pkg/services/ngalert/instance_database.go @@ -12,8 +12,8 @@ import ( // getAlertInstance is a handler for retrieving an alert instance based on OrgId, AlertDefintionID, and // the hash of the labels. // nolint:unused -func (ng *AlertNG) getAlertInstance(cmd *getAlertInstanceQuery) error { - return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) getAlertInstance(cmd *getAlertInstanceQuery) error { + return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { instance := AlertInstance{} s := strings.Builder{} s.WriteString(`SELECT * FROM alert_instance @@ -45,8 +45,8 @@ func (ng *AlertNG) getAlertInstance(cmd *getAlertInstanceQuery) error { // listAlertInstances is a handler for retrieving alert instances within specific organisation // based on various filters. 
-func (ng *AlertNG) listAlertInstances(cmd *listAlertInstancesQuery) error { - return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) listAlertInstances(cmd *listAlertInstancesQuery) error { + return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { alertInstances := make([]*listAlertInstancesQueryResult, 0) s := strings.Builder{} @@ -78,8 +78,8 @@ func (ng *AlertNG) listAlertInstances(cmd *listAlertInstancesQuery) error { // saveAlertDefinition is a handler for saving a new alert definition. // nolint:unused -func (ng *AlertNG) saveAlertInstance(cmd *saveAlertInstanceCommand) error { - return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { +func (st storeImpl) saveAlertInstance(cmd *saveAlertInstanceCommand) error { + return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error { labelTupleJSON, labelsHash, err := cmd.Labels.StringAndHash() if err != nil { return err @@ -101,7 +101,7 @@ func (ng *AlertNG) saveAlertInstance(cmd *saveAlertInstanceCommand) error { params := append(make([]interface{}, 0), alertInstance.DefinitionOrgID, alertInstance.DefinitionUID, labelTupleJSON, alertInstance.LabelsHash, alertInstance.CurrentState, alertInstance.CurrentStateSince.Unix(), alertInstance.LastEvalTime.Unix()) - upsertSQL := ng.SQLStore.Dialect.UpsertSQL( + upsertSQL := st.SQLStore.Dialect.UpsertSQL( "alert_instance", []string{"def_org_id", "def_uid", "labels_hash"}, []string{"def_org_id", "def_uid", "labels", "labels_hash", "current_state", "current_state_since", "last_eval_time"}) diff --git a/pkg/services/ngalert/instance_database_test.go b/pkg/services/ngalert/instance_database_test.go index 8c522dd1ce9..24ac2c39f7f 100644 --- a/pkg/services/ngalert/instance_database_test.go +++ b/pkg/services/ngalert/instance_database_test.go @@ -9,18 +9,18 @@ import ( ) func TestAlertInstanceOperations(t *testing.T) { 
- ng := setupTestEnv(t) + _, store := setupTestEnv(t, baseIntervalSeconds) - alertDefinition1 := createTestAlertDefinition(t, ng, 60) + alertDefinition1 := createTestAlertDefinition(t, store, 60) orgID := alertDefinition1.OrgID - alertDefinition2 := createTestAlertDefinition(t, ng, 60) + alertDefinition2 := createTestAlertDefinition(t, store, 60) require.Equal(t, orgID, alertDefinition2.OrgID) - alertDefinition3 := createTestAlertDefinition(t, ng, 60) + alertDefinition3 := createTestAlertDefinition(t, store, 60) require.Equal(t, orgID, alertDefinition3.OrgID) - alertDefinition4 := createTestAlertDefinition(t, ng, 60) + alertDefinition4 := createTestAlertDefinition(t, store, 60) require.Equal(t, orgID, alertDefinition4.OrgID) t.Run("can save and read new alert instance", func(t *testing.T) { @@ -30,7 +30,7 @@ func TestAlertInstanceOperations(t *testing.T) { State: InstanceStateFiring, Labels: InstanceLabels{"test": "testValue"}, } - err := ng.saveAlertInstance(saveCmd) + err := store.saveAlertInstance(saveCmd) require.NoError(t, err) getCmd := &getAlertInstanceQuery{ @@ -39,7 +39,7 @@ func TestAlertInstanceOperations(t *testing.T) { Labels: InstanceLabels{"test": "testValue"}, } - err = ng.getAlertInstance(getCmd) + err = store.getAlertInstance(getCmd) require.NoError(t, err) require.Equal(t, saveCmd.Labels, getCmd.Result.Labels) @@ -53,7 +53,7 @@ func TestAlertInstanceOperations(t *testing.T) { DefinitionUID: alertDefinition2.UID, State: InstanceStateNormal, } - err := ng.saveAlertInstance(saveCmd) + err := store.saveAlertInstance(saveCmd) require.NoError(t, err) getCmd := &getAlertInstanceQuery{ @@ -61,7 +61,7 @@ func TestAlertInstanceOperations(t *testing.T) { DefinitionUID: saveCmd.DefinitionUID, } - err = ng.getAlertInstance(getCmd) + err = store.getAlertInstance(getCmd) require.NoError(t, err) require.Equal(t, alertDefinition2.OrgID, getCmd.Result.DefinitionOrgID) @@ -77,7 +77,7 @@ func TestAlertInstanceOperations(t *testing.T) { Labels: InstanceLabels{"test": 
"testValue"}, } - err := ng.saveAlertInstance(saveCmdOne) + err := store.saveAlertInstance(saveCmdOne) require.NoError(t, err) saveCmdTwo := &saveAlertInstanceCommand{ @@ -86,7 +86,7 @@ func TestAlertInstanceOperations(t *testing.T) { State: InstanceStateFiring, Labels: InstanceLabels{"test": "meow"}, } - err = ng.saveAlertInstance(saveCmdTwo) + err = store.saveAlertInstance(saveCmdTwo) require.NoError(t, err) listCommand := &listAlertInstancesQuery{ @@ -94,7 +94,7 @@ func TestAlertInstanceOperations(t *testing.T) { DefinitionUID: saveCmdOne.DefinitionUID, } - err = ng.listAlertInstances(listCommand) + err = store.listAlertInstances(listCommand) require.NoError(t, err) require.Len(t, listCommand.Result, 2) @@ -105,7 +105,7 @@ func TestAlertInstanceOperations(t *testing.T) { DefinitionOrgID: orgID, } - err := ng.listAlertInstances(listCommand) + err := store.listAlertInstances(listCommand) require.NoError(t, err) require.Len(t, listCommand.Result, 4) @@ -117,7 +117,7 @@ func TestAlertInstanceOperations(t *testing.T) { State: InstanceStateNormal, } - err := ng.listAlertInstances(listCommand) + err := store.listAlertInstances(listCommand) require.NoError(t, err) require.Len(t, listCommand.Result, 1) @@ -131,7 +131,7 @@ func TestAlertInstanceOperations(t *testing.T) { Labels: InstanceLabels{"test": "testValue"}, } - err := ng.saveAlertInstance(saveCmdOne) + err := store.saveAlertInstance(saveCmdOne) require.NoError(t, err) saveCmdTwo := &saveAlertInstanceCommand{ @@ -140,7 +140,7 @@ func TestAlertInstanceOperations(t *testing.T) { State: InstanceStateNormal, Labels: InstanceLabels{"test": "testValue"}, } - err = ng.saveAlertInstance(saveCmdTwo) + err = store.saveAlertInstance(saveCmdTwo) require.NoError(t, err) listCommand := &listAlertInstancesQuery{ @@ -148,7 +148,7 @@ func TestAlertInstanceOperations(t *testing.T) { DefinitionUID: alertDefinition4.UID, } - err = ng.listAlertInstances(listCommand) + err = store.listAlertInstances(listCommand) require.NoError(t, err) 
require.Len(t, listCommand.Result, 1) diff --git a/pkg/services/ngalert/middleware.go b/pkg/services/ngalert/middleware.go index 066a18da013..c28b5c8739e 100644 --- a/pkg/services/ngalert/middleware.go +++ b/pkg/services/ngalert/middleware.go @@ -4,12 +4,17 @@ import ( "github.com/grafana/grafana/pkg/models" ) -func (ng *AlertNG) validateOrgAlertDefinition(c *models.ReqContext) { +func (api *apiImpl) validateOrgAlertDefinition(c *models.ReqContext) { uid := c.ParamsEscape(":alertDefinitionUID") + if uid == "" { + c.JsonApiErr(403, "Permission denied", nil) + return + } + query := getAlertDefinitionByUIDQuery{UID: uid, OrgID: c.SignedInUser.OrgId} - if err := ng.getAlertDefinitionByUID(&query); err != nil { + if err := api.store.getAlertDefinitionByUID(&query); err != nil { c.JsonApiErr(404, "Alert definition not found", nil) return } diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 837a0e267d5..76dcbf28ce0 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -6,12 +6,12 @@ import ( "github.com/benbjohnson/clock" "github.com/grafana/grafana/pkg/services/ngalert/eval" + "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/datasources" - "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/setting" ) @@ -35,7 +35,7 @@ type AlertNG struct { RouteRegister routing.RouteRegister `inject:""` SQLStore *sqlstore.SQLStore `inject:""` log log.Logger - schedule *schedule + schedule scheduleService } func init() { @@ -46,21 +46,34 @@ func init() { func (ng *AlertNG) Init() error { ng.log = log.New("ngalert") - ng.registerAPIEndpoints() + baseInterval := baseIntervalSeconds * time.Second + + store := storeImpl{baseInterval: baseInterval, SQLStore: 
ng.SQLStore} + schedCfg := schedulerCfg{ c: clock.New(), - baseInterval: baseIntervalSeconds * time.Second, + baseInterval: baseInterval, logger: ng.log, evaluator: eval.Evaluator{Cfg: ng.Cfg}, + store: store, } ng.schedule = newScheduler(schedCfg) + + api := apiImpl{ + Cfg: ng.Cfg, + DatasourceCache: ng.DatasourceCache, + RouteRegister: ng.RouteRegister, + schedule: ng.schedule, + store: store} + api.registerAPIEndpoints() + return nil } // Run starts the scheduler func (ng *AlertNG) Run(ctx context.Context) error { ng.log.Debug("ngalert starting") - return ng.alertingTicker(ctx) + return ng.schedule.Ticker(ctx) } // IsDisabled returns true if the alerting service is disable for this instance. @@ -85,14 +98,14 @@ func (ng *AlertNG) AddMigration(mg *migrator.Migrator) { } // LoadAlertCondition returns a Condition object for the given alertDefinitionID. -func (ng *AlertNG) LoadAlertCondition(alertDefinitionUID string, orgID int64) (*eval.Condition, error) { +func (api *apiImpl) LoadAlertCondition(alertDefinitionUID string, orgID int64) (*eval.Condition, error) { q := getAlertDefinitionByUIDQuery{UID: alertDefinitionUID, OrgID: orgID} - if err := ng.getAlertDefinitionByUID(&q); err != nil { + if err := api.store.getAlertDefinitionByUID(&q); err != nil { return nil, err } alertDefinition := q.Result - err := ng.validateAlertDefinition(alertDefinition, true) + err := api.store.validateAlertDefinition(alertDefinition, true) if err != nil { return nil, err } diff --git a/pkg/services/ngalert/schedule.go b/pkg/services/ngalert/schedule.go index 3a74d40e2a6..0bfe5e4790e 100644 --- a/pkg/services/ngalert/schedule.go +++ b/pkg/services/ngalert/schedule.go @@ -13,8 +13,19 @@ import ( "golang.org/x/sync/errgroup" ) -func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefinitionKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error { - ng.log.Debug("alert definition routine started", "key", key) +type scheduleService interface { + 
Ticker(context.Context) error + Pause() error + Unpause() error + + // the following are used by tests only used for tests + evalApplied(alertDefinitionKey, time.Time) + stopApplied(alertDefinitionKey) + overrideCfg(cfg schedulerCfg) +} + +func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key alertDefinitionKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error { + sch.log.Debug("alert definition routine started", "key", key) evalRunning := false var start, end time.Time @@ -33,13 +44,13 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini // fetch latest alert definition version if alertDefinition == nil || alertDefinition.Version < ctx.version { q := getAlertDefinitionByUIDQuery{OrgID: key.orgID, UID: key.definitionUID} - err := ng.getAlertDefinitionByUID(&q) + err := sch.store.getAlertDefinitionByUID(&q) if err != nil { - ng.schedule.log.Error("failed to fetch alert definition", "key", key) + sch.log.Error("failed to fetch alert definition", "key", key) return err } alertDefinition = q.Result - ng.schedule.log.Debug("new alert definition version fetched", "title", alertDefinition.Title, "key", key, "version", alertDefinition.Version) + sch.log.Debug("new alert definition version fetched", "title", alertDefinition.Title, "key", key, "version", alertDefinition.Version) } condition := eval.Condition{ @@ -47,19 +58,19 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini OrgID: alertDefinition.OrgID, QueriesAndExpressions: alertDefinition.Data, } - results, err := ng.schedule.evaluator.ConditionEval(&condition, ctx.now) + results, err := sch.evaluator.ConditionEval(&condition, ctx.now) end = timeNow() if err != nil { // consider saving alert instance on error - ng.schedule.log.Error("failed to evaluate alert definition", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err) + sch.log.Error("failed to evaluate alert 
definition", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err) return err } for _, r := range results { - ng.schedule.log.Debug("alert definition result", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String()) + sch.log.Debug("alert definition result", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String()) cmd := saveAlertInstanceCommand{DefinitionOrgID: key.orgID, DefinitionUID: key.definitionUID, State: InstanceStateType(r.State.String()), Labels: InstanceLabels(r.Instance), LastEvalTime: ctx.now} - err := ng.saveAlertInstance(&cmd) + err := sch.store.saveAlertInstance(&cmd) if err != nil { - ng.schedule.log.Error("failed saving alert instance", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "instance", r.Instance, "state", r.State.String(), "error", err) + sch.log.Error("failed saving alert instance", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "instance", r.Instance, "state", r.State.String(), "error", err) } } return nil @@ -69,12 +80,10 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini evalRunning = true defer func() { evalRunning = false - if ng.schedule.evalApplied != nil { - ng.schedule.evalApplied(key, ctx.now) - } + sch.evalApplied(key, ctx.now) }() - for attempt = 0; attempt < ng.schedule.maxAttempts; attempt++ { + for attempt = 0; attempt < sch.maxAttempts; attempt++ { err := evaluate(attempt) if err == nil { break @@ -82,10 +91,8 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini } }() case <-stopCh: - if ng.schedule.stopApplied != nil { - ng.schedule.stopApplied(key) - } - ng.schedule.log.Debug("stopping alert definition routine", "key", key) 
+ sch.stopApplied(key) + sch.log.Debug("stopping alert definition routine", "key", key) // interrupt evaluation if it's running return nil case <-grafanaCtx.Done(): @@ -110,43 +117,73 @@ type schedule struct { // evalApplied is only used for tests: test code can set it to non-nil // function, and then it'll be called from the event loop whenever the // message from evalApplied is handled. - evalApplied func(alertDefinitionKey, time.Time) + evalAppliedFunc func(alertDefinitionKey, time.Time) // stopApplied is only used for tests: test code can set it to non-nil // function, and then it'll be called from the event loop whenever the // message from stopApplied is handled. - stopApplied func(alertDefinitionKey) + stopAppliedFunc func(alertDefinitionKey) log log.Logger evaluator eval.Evaluator + + store store } type schedulerCfg struct { - c clock.Clock - baseInterval time.Duration - logger log.Logger - evalApplied func(alertDefinitionKey, time.Time) - evaluator eval.Evaluator + c clock.Clock + baseInterval time.Duration + logger log.Logger + evalAppliedFunc func(alertDefinitionKey, time.Time) + stopAppliedFunc func(alertDefinitionKey) + evaluator eval.Evaluator + store store } // newScheduler returns a new schedule. 
func newScheduler(cfg schedulerCfg) *schedule { ticker := alerting.NewTicker(cfg.c.Now(), time.Second*0, cfg.c, int64(cfg.baseInterval.Seconds())) sch := schedule{ - registry: alertDefinitionRegistry{alertDefinitionInfo: make(map[alertDefinitionKey]alertDefinitionInfo)}, - maxAttempts: maxAttempts, - clock: cfg.c, - baseInterval: cfg.baseInterval, - log: cfg.logger, - heartbeat: ticker, - evalApplied: cfg.evalApplied, - evaluator: cfg.evaluator, + registry: alertDefinitionRegistry{alertDefinitionInfo: make(map[alertDefinitionKey]alertDefinitionInfo)}, + maxAttempts: maxAttempts, + clock: cfg.c, + baseInterval: cfg.baseInterval, + log: cfg.logger, + heartbeat: ticker, + evalAppliedFunc: cfg.evalAppliedFunc, + stopAppliedFunc: cfg.stopAppliedFunc, + evaluator: cfg.evaluator, + store: cfg.store, } return &sch } -func (sch *schedule) pause() error { +func (sch *schedule) overrideCfg(cfg schedulerCfg) { + sch.clock = cfg.c + sch.baseInterval = cfg.baseInterval + sch.heartbeat = alerting.NewTicker(cfg.c.Now(), time.Second*0, cfg.c, int64(cfg.baseInterval.Seconds())) + sch.evalAppliedFunc = cfg.evalAppliedFunc + sch.stopAppliedFunc = cfg.stopAppliedFunc +} + +func (sch *schedule) evalApplied(alertDefKey alertDefinitionKey, now time.Time) { + if sch.evalAppliedFunc == nil { + return + } + + sch.evalAppliedFunc(alertDefKey, now) +} + +func (sch *schedule) stopApplied(alertDefKey alertDefinitionKey) { + if sch.stopAppliedFunc == nil { + return + } + + sch.stopAppliedFunc(alertDefKey) +} + +func (sch *schedule) Pause() error { if sch == nil { return fmt.Errorf("scheduler is not initialised") } @@ -155,7 +192,7 @@ func (sch *schedule) pause() error { return nil } -func (sch *schedule) unpause() error { +func (sch *schedule) Unpause() error { if sch == nil { return fmt.Errorf("scheduler is not initialised") } @@ -164,20 +201,20 @@ func (sch *schedule) unpause() error { return nil } -func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error { +func (sch *schedule) 
Ticker(grafanaCtx context.Context) error { dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx) for { select { - case tick := <-ng.schedule.heartbeat.C: - tickNum := tick.Unix() / int64(ng.schedule.baseInterval.Seconds()) - alertDefinitions := ng.fetchAllDetails(tick) - ng.schedule.log.Debug("alert definitions fetched", "count", len(alertDefinitions)) + case tick := <-sch.heartbeat.C: + tickNum := tick.Unix() / int64(sch.baseInterval.Seconds()) + alertDefinitions := sch.fetchAllDetails(tick) + sch.log.Debug("alert definitions fetched", "count", len(alertDefinitions)) // registeredDefinitions is a map used for finding deleted alert definitions // initially it is assigned to all known alert definitions from the previous cycle // each alert definition found also in this cycle is removed // so, at the end, the remaining registered alert definitions are the deleted ones - registeredDefinitions := ng.schedule.registry.keyMap() + registeredDefinitions := sch.registry.keyMap() type readyToRunItem struct { key alertDefinitionKey @@ -191,24 +228,24 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error { key := item.getKey() itemVersion := item.Version - newRoutine := !ng.schedule.registry.exists(key) - definitionInfo := ng.schedule.registry.getOrCreateInfo(key, itemVersion) - invalidInterval := item.IntervalSeconds%int64(ng.schedule.baseInterval.Seconds()) != 0 + newRoutine := !sch.registry.exists(key) + definitionInfo := sch.registry.getOrCreateInfo(key, itemVersion) + invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0 if newRoutine && !invalidInterval { dispatcherGroup.Go(func() error { - return ng.definitionRoutine(ctx, key, definitionInfo.evalCh, definitionInfo.stopCh) + return sch.definitionRoutine(ctx, key, definitionInfo.evalCh, definitionInfo.stopCh) }) } if invalidInterval { // this is expected to be always false // give that we validate interval during alert definition updates - ng.schedule.log.Debug("alert 
definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", ng.schedule.baseInterval) + sch.log.Debug("alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", sch.baseInterval) continue } - itemFrequency := item.IntervalSeconds / int64(ng.schedule.baseInterval.Seconds()) + itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds()) if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { readyToRun = append(readyToRun, readyToRunItem{key: key, definitionInfo: definitionInfo}) } @@ -219,7 +256,7 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error { var step int64 = 0 if len(readyToRun) > 0 { - step = ng.schedule.baseInterval.Nanoseconds() / int64(len(readyToRun)) + step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun)) } for i := range readyToRun { @@ -232,13 +269,13 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error { // unregister and stop routines of the deleted alert definitions for key := range registeredDefinitions { - definitionInfo, err := ng.schedule.registry.get(key) + definitionInfo, err := sch.registry.get(key) if err != nil { - ng.schedule.log.Error("failed to get alert definition routine information", "err", err) + sch.log.Error("failed to get alert definition routine information", "err", err) continue } definitionInfo.stopCh <- struct{}{} - ng.schedule.registry.del(key) + sch.registry.del(key) } case <-grafanaCtx.Done(): err := dispatcherGroup.Wait() diff --git a/pkg/services/ngalert/schedule_test.go b/pkg/services/ngalert/schedule_test.go index ca44cbe1ef0..3850b74dddc 100644 --- a/pkg/services/ngalert/schedule_test.go +++ b/pkg/services/ngalert/schedule_test.go @@ -8,9 +8,7 @@ import ( 
"testing" "time" - "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/registry" - "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,40 +21,37 @@ type evalAppliedInfo struct { } func TestAlertingTicker(t *testing.T) { - ng := setupTestEnv(t) + ng, store := setupTestEnv(t, 1) t.Cleanup(registry.ClearOverrides) - mockedClock := clock.NewMock() - schefCfg := schedulerCfg{ - c: mockedClock, - baseInterval: time.Second, - logger: log.New("ngalert.schedule.test"), - evaluator: eval.Evaluator{Cfg: ng.Cfg}, - } - ng.schedule = newScheduler(schefCfg) - alerts := make([]*AlertDefinition, 0) - // create alert definition with zero interval (should never run) - alerts = append(alerts, createTestAlertDefinition(t, ng, 0)) + alerts = append(alerts, createTestAlertDefinition(t, store, 0)) // create alert definition with one second interval - alerts = append(alerts, createTestAlertDefinition(t, ng, 1)) + alerts = append(alerts, createTestAlertDefinition(t, store, 1)) evalAppliedCh := make(chan evalAppliedInfo, len(alerts)) stopAppliedCh := make(chan alertDefinitionKey, len(alerts)) - ng.schedule.evalApplied = func(alertDefKey alertDefinitionKey, now time.Time) { - evalAppliedCh <- evalAppliedInfo{alertDefKey: alertDefKey, now: now} - } + mockedClock := clock.NewMock() + baseInterval := time.Second - ng.schedule.stopApplied = func(alertDefKey alertDefinitionKey) { - stopAppliedCh <- alertDefKey + schefCfg := schedulerCfg{ + c: mockedClock, + baseInterval: baseInterval, + evalAppliedFunc: func(alertDefKey alertDefinitionKey, now time.Time) { + evalAppliedCh <- evalAppliedInfo{alertDefKey: alertDefKey, now: now} + }, + stopAppliedFunc: func(alertDefKey alertDefinitionKey) { + stopAppliedCh <- alertDefKey + }, } + ng.schedule.overrideCfg(schefCfg) ctx := context.Background() go func() { - err := ng.alertingTicker(ctx) + err := ng.schedule.Ticker(ctx) require.NoError(t, err) }() 
runtime.Gosched() @@ -69,7 +64,7 @@ func TestAlertingTicker(t *testing.T) { // change alert definition interval to three seconds var threeSecInterval int64 = 3 - err := ng.updateAlertDefinition(&updateAlertDefinitionCommand{ + err := store.updateAlertDefinition(&updateAlertDefinitionCommand{ UID: alerts[0].UID, IntervalSeconds: &threeSecInterval, OrgID: alerts[0].OrgID, @@ -95,7 +90,7 @@ func TestAlertingTicker(t *testing.T) { assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...) }) - err = ng.deleteAlertDefinitionByUID(&deleteAlertDefinitionByUIDCommand{UID: alerts[1].UID, OrgID: alerts[1].OrgID}) + err = store.deleteAlertDefinitionByUID(&deleteAlertDefinitionByUIDCommand{UID: alerts[1].UID, OrgID: alerts[1].OrgID}) require.NoError(t, err) t.Logf("alert definition: %v deleted", alerts[1].getKey()) @@ -116,7 +111,7 @@ func TestAlertingTicker(t *testing.T) { }) // create alert definition with one second interval - alerts = append(alerts, createTestAlertDefinition(t, ng, 1)) + alerts = append(alerts, createTestAlertDefinition(t, store, 1)) expectedAlertDefinitionsEvaluated = []alertDefinitionKey{alerts[2].getKey()} t.Run(fmt.Sprintf("on 7th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) { @@ -125,7 +120,7 @@ func TestAlertingTicker(t *testing.T) { }) // pause alert definition - err = ng.updateAlertDefinitionPaused(&updateAlertDefinitionPausedCommand{UIDs: []string{alerts[2].UID}, OrgID: alerts[2].OrgID, Paused: true}) + err = store.updateAlertDefinitionPaused(&updateAlertDefinitionPausedCommand{UIDs: []string{alerts[2].UID}, OrgID: alerts[2].OrgID, Paused: true}) require.NoError(t, err) t.Logf("alert definition: %v paused", alerts[2].getKey()) @@ -141,7 +136,7 @@ func TestAlertingTicker(t *testing.T) { }) // unpause alert definition - err = ng.updateAlertDefinitionPaused(&updateAlertDefinitionPausedCommand{UIDs: []string{alerts[2].UID}, OrgID: alerts[2].OrgID, Paused: false}) 
+ err = store.updateAlertDefinitionPaused(&updateAlertDefinitionPausedCommand{UIDs: []string{alerts[2].UID}, OrgID: alerts[2].OrgID, Paused: false}) require.NoError(t, err) t.Logf("alert definition: %v unpaused", alerts[2].getKey()) diff --git a/pkg/services/ngalert/validator.go b/pkg/services/ngalert/validator.go index 745e0816d90..a1b1133284d 100644 --- a/pkg/services/ngalert/validator.go +++ b/pkg/services/ngalert/validator.go @@ -15,7 +15,7 @@ var errEmptyTitleError = errors.New("title is empty") // validateAlertDefinition validates the alert definition interval and organisation. // If requireData is true checks that it contains at least one alert query -func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, requireData bool) error { +func (st storeImpl) validateAlertDefinition(alertDefinition *AlertDefinition, requireData bool) error { if !requireData && len(alertDefinition.Data) == 0 { return fmt.Errorf("no queries or expressions are found") } @@ -24,8 +24,8 @@ func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, req return errEmptyTitleError } - if alertDefinition.IntervalSeconds%int64(ng.schedule.baseInterval.Seconds()) != 0 { - return fmt.Errorf("invalid interval: %v: interval should be divided exactly by scheduler interval: %v", time.Duration(alertDefinition.IntervalSeconds)*time.Second, ng.schedule.baseInterval) + if alertDefinition.IntervalSeconds%int64(st.baseInterval.Seconds()) != 0 { + return fmt.Errorf("invalid interval: %v: interval should be divided exactly by scheduler interval: %v", time.Duration(alertDefinition.IntervalSeconds)*time.Second, st.baseInterval) } // enfore max name length in SQLite @@ -41,7 +41,7 @@ func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, req } // validateCondition validates that condition queries refer to existing datasources -func (ng *AlertNG) validateCondition(c eval.Condition, user *models.SignedInUser, skipCache bool) error { +func (api 
*apiImpl) validateCondition(c eval.Condition, user *models.SignedInUser, skipCache bool) error { var refID string if len(c.QueriesAndExpressions) == 0 { @@ -66,7 +66,7 @@ func (ng *AlertNG) validateCondition(c eval.Condition, user *models.SignedInUser continue } - _, err = ng.DatasourceCache.GetDatasourceByUID(datasourceUID, user, skipCache) + _, err = api.DatasourceCache.GetDatasourceByUID(datasourceUID, user, skipCache) if err != nil { return fmt.Errorf("failed to get datasource: %s: %w", datasourceUID, err) }