AlertingNG: code refactoring (#30787)

* AlertingNG: refactoring

* Fix tests
pull/31656/head
Sofia Papagiannaki 4 years ago committed by GitHub
parent 08f500ed06
commit bd2390c49f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 101
      pkg/services/ngalert/api.go
  2. 19
      pkg/services/ngalert/common_test.go
  3. 61
      pkg/services/ngalert/database.go
  4. 52
      pkg/services/ngalert/database_test.go
  5. 6
      pkg/services/ngalert/fetcher.go
  6. 4
      pkg/services/ngalert/instance_api.go
  7. 14
      pkg/services/ngalert/instance_database.go
  8. 34
      pkg/services/ngalert/instance_database_test.go
  9. 9
      pkg/services/ngalert/middleware.go
  10. 29
      pkg/services/ngalert/ngalert.go
  11. 139
      pkg/services/ngalert/schedule.go
  12. 47
      pkg/services/ngalert/schedule_test.go
  13. 10
      pkg/services/ngalert/validator.go

@ -9,42 +9,52 @@ import (
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/util"
)
func (ng *AlertNG) registerAPIEndpoints() {
ng.RouteRegister.Group("/api/alert-definitions", func(alertDefinitions routing.RouteRegister) {
alertDefinitions.Get("", middleware.ReqSignedIn, routing.Wrap(ng.listAlertDefinitions))
alertDefinitions.Get("/eval/:alertDefinitionUID", middleware.ReqSignedIn, ng.validateOrgAlertDefinition, routing.Wrap(ng.alertDefinitionEvalEndpoint))
alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), routing.Wrap(ng.conditionEvalEndpoint))
alertDefinitions.Get("/:alertDefinitionUID", middleware.ReqSignedIn, ng.validateOrgAlertDefinition, routing.Wrap(ng.getAlertDefinitionEndpoint))
alertDefinitions.Delete("/:alertDefinitionUID", middleware.ReqEditorRole, ng.validateOrgAlertDefinition, routing.Wrap(ng.deleteAlertDefinitionEndpoint))
alertDefinitions.Post("/", middleware.ReqEditorRole, binding.Bind(saveAlertDefinitionCommand{}), routing.Wrap(ng.createAlertDefinitionEndpoint))
alertDefinitions.Put("/:alertDefinitionUID", middleware.ReqEditorRole, ng.validateOrgAlertDefinition, binding.Bind(updateAlertDefinitionCommand{}), routing.Wrap(ng.updateAlertDefinitionEndpoint))
alertDefinitions.Post("/pause", middleware.ReqEditorRole, binding.Bind(updateAlertDefinitionPausedCommand{}), routing.Wrap(ng.alertDefinitionPauseEndpoint))
alertDefinitions.Post("/unpause", middleware.ReqEditorRole, binding.Bind(updateAlertDefinitionPausedCommand{}), routing.Wrap(ng.alertDefinitionUnpauseEndpoint))
type apiImpl struct {
Cfg *setting.Cfg `inject:""`
DatasourceCache datasources.CacheService `inject:""`
RouteRegister routing.RouteRegister `inject:""`
schedule scheduleService
store store
}
func (api *apiImpl) registerAPIEndpoints() {
api.RouteRegister.Group("/api/alert-definitions", func(alertDefinitions routing.RouteRegister) {
alertDefinitions.Get("", middleware.ReqSignedIn, routing.Wrap(api.listAlertDefinitions))
alertDefinitions.Get("/eval/:alertDefinitionUID", middleware.ReqSignedIn, api.validateOrgAlertDefinition, routing.Wrap(api.alertDefinitionEvalEndpoint))
alertDefinitions.Post("/eval", middleware.ReqSignedIn, binding.Bind(evalAlertConditionCommand{}), routing.Wrap(api.conditionEvalEndpoint))
alertDefinitions.Get("/:alertDefinitionUID", middleware.ReqSignedIn, api.validateOrgAlertDefinition, routing.Wrap(api.getAlertDefinitionEndpoint))
alertDefinitions.Delete("/:alertDefinitionUID", middleware.ReqEditorRole, api.validateOrgAlertDefinition, routing.Wrap(api.deleteAlertDefinitionEndpoint))
alertDefinitions.Post("/", middleware.ReqEditorRole, binding.Bind(saveAlertDefinitionCommand{}), routing.Wrap(api.createAlertDefinitionEndpoint))
alertDefinitions.Put("/:alertDefinitionUID", middleware.ReqEditorRole, api.validateOrgAlertDefinition, binding.Bind(updateAlertDefinitionCommand{}), routing.Wrap(api.updateAlertDefinitionEndpoint))
alertDefinitions.Post("/pause", middleware.ReqEditorRole, binding.Bind(updateAlertDefinitionPausedCommand{}), routing.Wrap(api.alertDefinitionPauseEndpoint))
alertDefinitions.Post("/unpause", middleware.ReqEditorRole, binding.Bind(updateAlertDefinitionPausedCommand{}), routing.Wrap(api.alertDefinitionUnpauseEndpoint))
})
ng.RouteRegister.Group("/api/ngalert/", func(schedulerRouter routing.RouteRegister) {
schedulerRouter.Post("/pause", routing.Wrap(ng.pauseScheduler))
schedulerRouter.Post("/unpause", routing.Wrap(ng.unpauseScheduler))
api.RouteRegister.Group("/api/ngalert/", func(schedulerRouter routing.RouteRegister) {
schedulerRouter.Post("/pause", routing.Wrap(api.pauseScheduler))
schedulerRouter.Post("/unpause", routing.Wrap(api.unpauseScheduler))
}, middleware.ReqOrgAdmin)
ng.RouteRegister.Group("/api/alert-instances", func(alertInstances routing.RouteRegister) {
alertInstances.Get("", middleware.ReqSignedIn, routing.Wrap(ng.listAlertInstancesEndpoint))
api.RouteRegister.Group("/api/alert-instances", func(alertInstances routing.RouteRegister) {
alertInstances.Get("", middleware.ReqSignedIn, routing.Wrap(api.listAlertInstancesEndpoint))
})
}
// conditionEvalEndpoint handles POST /api/alert-definitions/eval.
func (ng *AlertNG) conditionEvalEndpoint(c *models.ReqContext, cmd evalAlertConditionCommand) response.Response {
func (api *apiImpl) conditionEvalEndpoint(c *models.ReqContext, cmd evalAlertConditionCommand) response.Response {
evalCond := eval.Condition{
RefID: cmd.Condition,
OrgID: c.SignedInUser.OrgId,
QueriesAndExpressions: cmd.Data,
}
if err := ng.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil {
if err := api.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil {
return response.Error(400, "invalid condition", err)
}
@ -53,9 +63,8 @@ func (ng *AlertNG) conditionEvalEndpoint(c *models.ReqContext, cmd evalAlertCond
now = timeNow()
}
evaluator := eval.Evaluator{Cfg: ng.Cfg}
evalResults, err := evaluator.ConditionEval(&evalCond, now)
evaluator := eval.Evaluator{Cfg: api.Cfg}
evalResults, err := evaluator.ConditionEval(&evalCond, timeNow())
if err != nil {
return response.Error(400, "Failed to evaluate conditions", err)
}
@ -72,20 +81,20 @@ func (ng *AlertNG) conditionEvalEndpoint(c *models.ReqContext, cmd evalAlertCond
})
}
// alertDefinitionEvalEndpoint handles POST /api/alert-definitions/eval/:alertDefinitionUID.
func (ng *AlertNG) alertDefinitionEvalEndpoint(c *models.ReqContext) response.Response {
// alertDefinitionEvalEndpoint handles GET /api/alert-definitions/eval/:alertDefinitionUID.
func (api *apiImpl) alertDefinitionEvalEndpoint(c *models.ReqContext) response.Response {
alertDefinitionUID := c.Params(":alertDefinitionUID")
condition, err := ng.LoadAlertCondition(alertDefinitionUID, c.SignedInUser.OrgId)
condition, err := api.LoadAlertCondition(alertDefinitionUID, c.SignedInUser.OrgId)
if err != nil {
return response.Error(400, "Failed to load alert definition conditions", err)
}
if err := ng.validateCondition(*condition, c.SignedInUser, c.SkipCache); err != nil {
if err := api.validateCondition(*condition, c.SignedInUser, c.SkipCache); err != nil {
return response.Error(400, "invalid condition", err)
}
evaluator := eval.Evaluator{Cfg: ng.Cfg}
evaluator := eval.Evaluator{Cfg: api.Cfg}
evalResults, err := evaluator.ConditionEval(condition, timeNow())
if err != nil {
return response.Error(400, "Failed to evaluate alert", err)
@ -107,7 +116,7 @@ func (ng *AlertNG) alertDefinitionEvalEndpoint(c *models.ReqContext) response.Re
}
// getAlertDefinitionEndpoint handles GET /api/alert-definitions/:alertDefinitionUID.
func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) response.Response {
func (api *apiImpl) getAlertDefinitionEndpoint(c *models.ReqContext) response.Response {
alertDefinitionUID := c.Params(":alertDefinitionUID")
query := getAlertDefinitionByUIDQuery{
@ -115,7 +124,7 @@ func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) response.Res
OrgID: c.SignedInUser.OrgId,
}
if err := ng.getAlertDefinitionByUID(&query); err != nil {
if err := api.store.getAlertDefinitionByUID(&query); err != nil {
return response.Error(500, "Failed to get alert definition", err)
}
@ -123,7 +132,7 @@ func (ng *AlertNG) getAlertDefinitionEndpoint(c *models.ReqContext) response.Res
}
// deleteAlertDefinitionEndpoint handles DELETE /api/alert-definitions/:alertDefinitionUID.
func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) response.Response {
func (api *apiImpl) deleteAlertDefinitionEndpoint(c *models.ReqContext) response.Response {
alertDefinitionUID := c.Params(":alertDefinitionUID")
cmd := deleteAlertDefinitionByUIDCommand{
@ -131,7 +140,7 @@ func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) response.
OrgID: c.SignedInUser.OrgId,
}
if err := ng.deleteAlertDefinitionByUID(&cmd); err != nil {
if err := api.store.deleteAlertDefinitionByUID(&cmd); err != nil {
return response.Error(500, "Failed to delete alert definition", err)
}
@ -139,7 +148,7 @@ func (ng *AlertNG) deleteAlertDefinitionEndpoint(c *models.ReqContext) response.
}
// updateAlertDefinitionEndpoint handles PUT /api/alert-definitions/:alertDefinitionUID.
func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updateAlertDefinitionCommand) response.Response {
func (api *apiImpl) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updateAlertDefinitionCommand) response.Response {
cmd.UID = c.Params(":alertDefinitionUID")
cmd.OrgID = c.SignedInUser.OrgId
@ -148,11 +157,11 @@ func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updat
OrgID: c.SignedInUser.OrgId,
QueriesAndExpressions: cmd.Data,
}
if err := ng.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil {
if err := api.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil {
return response.Error(400, "invalid condition", err)
}
if err := ng.updateAlertDefinition(&cmd); err != nil {
if err := api.store.updateAlertDefinition(&cmd); err != nil {
return response.Error(500, "Failed to update alert definition", err)
}
@ -160,7 +169,7 @@ func (ng *AlertNG) updateAlertDefinitionEndpoint(c *models.ReqContext, cmd updat
}
// createAlertDefinitionEndpoint handles POST /api/alert-definitions.
func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveAlertDefinitionCommand) response.Response {
func (api *apiImpl) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveAlertDefinitionCommand) response.Response {
cmd.OrgID = c.SignedInUser.OrgId
evalCond := eval.Condition{
@ -168,11 +177,11 @@ func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveA
OrgID: c.SignedInUser.OrgId,
QueriesAndExpressions: cmd.Data,
}
if err := ng.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil {
if err := api.validateCondition(evalCond, c.SignedInUser, c.SkipCache); err != nil {
return response.Error(400, "invalid condition", err)
}
if err := ng.saveAlertDefinition(&cmd); err != nil {
if err := api.store.saveAlertDefinition(&cmd); err != nil {
return response.Error(500, "Failed to create alert definition", err)
}
@ -180,26 +189,26 @@ func (ng *AlertNG) createAlertDefinitionEndpoint(c *models.ReqContext, cmd saveA
}
// listAlertDefinitions handles GET /api/alert-definitions.
func (ng *AlertNG) listAlertDefinitions(c *models.ReqContext) response.Response {
func (api *apiImpl) listAlertDefinitions(c *models.ReqContext) response.Response {
query := listAlertDefinitionsQuery{OrgID: c.SignedInUser.OrgId}
if err := ng.getOrgAlertDefinitions(&query); err != nil {
if err := api.store.getOrgAlertDefinitions(&query); err != nil {
return response.Error(500, "Failed to list alert definitions", err)
}
return response.JSON(200, util.DynMap{"results": query.Result})
}
func (ng *AlertNG) pauseScheduler() response.Response {
err := ng.schedule.pause()
func (api *apiImpl) pauseScheduler() response.Response {
err := api.schedule.Pause()
if err != nil {
return response.Error(500, "Failed to pause scheduler", err)
}
return response.JSON(200, util.DynMap{"message": "alert definition scheduler paused"})
}
func (ng *AlertNG) unpauseScheduler() response.Response {
err := ng.schedule.unpause()
func (api *apiImpl) unpauseScheduler() response.Response {
err := api.schedule.Unpause()
if err != nil {
return response.Error(500, "Failed to unpause scheduler", err)
}
@ -207,11 +216,11 @@ func (ng *AlertNG) unpauseScheduler() response.Response {
}
// alertDefinitionPauseEndpoint handles POST /api/alert-definitions/pause.
func (ng *AlertNG) alertDefinitionPauseEndpoint(c *models.ReqContext, cmd updateAlertDefinitionPausedCommand) response.Response {
func (api *apiImpl) alertDefinitionPauseEndpoint(c *models.ReqContext, cmd updateAlertDefinitionPausedCommand) response.Response {
cmd.OrgID = c.SignedInUser.OrgId
cmd.Paused = true
err := ng.updateAlertDefinitionPaused(&cmd)
err := api.store.updateAlertDefinitionPaused(&cmd)
if err != nil {
return response.Error(500, "Failed to pause alert definition", err)
}
@ -219,11 +228,11 @@ func (ng *AlertNG) alertDefinitionPauseEndpoint(c *models.ReqContext, cmd update
}
// alertDefinitionUnpauseEndpoint handles POST /api/alert-definitions/unpause.
func (ng *AlertNG) alertDefinitionUnpauseEndpoint(c *models.ReqContext, cmd updateAlertDefinitionPausedCommand) response.Response {
func (api *apiImpl) alertDefinitionUnpauseEndpoint(c *models.ReqContext, cmd updateAlertDefinitionPausedCommand) response.Response {
cmd.OrgID = c.SignedInUser.OrgId
cmd.Paused = false
err := ng.updateAlertDefinitionPaused(&cmd)
err := api.store.updateAlertDefinitionPaused(&cmd)
if err != nil {
return response.Error(500, "Failed to unpause alert definition", err)
}

@ -17,23 +17,20 @@ import (
"github.com/stretchr/testify/require"
)
func setupTestEnv(t *testing.T) *AlertNG {
func setupTestEnv(t *testing.T, baseIntervalSeconds int64) (AlertNG, *storeImpl) {
cfg := setting.NewCfg()
cfg.FeatureToggles = map[string]bool{"ngalert": true}
ng := overrideAlertNGInRegistry(cfg)
sqlStore := sqlstore.InitTestDB(t)
ng.SQLStore = sqlStore
ng := overrideAlertNGInRegistry(t, cfg)
ng.SQLStore = sqlstore.InitTestDB(t)
err := ng.Init()
require.NoError(t, err)
return &ng
return ng, &storeImpl{SQLStore: ng.SQLStore, baseInterval: time.Duration(baseIntervalSeconds) * time.Second}
}
func overrideAlertNGInRegistry(cfg *setting.Cfg) AlertNG {
func overrideAlertNGInRegistry(t *testing.T, cfg *setting.Cfg) AlertNG {
ng := AlertNG{
SQLStore: nil,
Cfg: cfg,
RouteRegister: routing.NewRouteRegister(),
log: log.New("ngalert-test"),
@ -42,7 +39,7 @@ func overrideAlertNGInRegistry(cfg *setting.Cfg) AlertNG {
overrideServiceFunc := func(descriptor registry.Descriptor) (*registry.Descriptor, bool) {
if _, ok := descriptor.Instance.(*AlertNG); ok {
return &registry.Descriptor{
Name: "AlertNG",
Name: descriptor.Name,
Instance: &ng,
InitPriority: descriptor.InitPriority,
}, true
@ -55,7 +52,7 @@ func overrideAlertNGInRegistry(cfg *setting.Cfg) AlertNG {
return ng
}
func createTestAlertDefinition(t *testing.T, ng *AlertNG, intervalSeconds int64) *AlertDefinition {
func createTestAlertDefinition(t *testing.T, store *storeImpl, intervalSeconds int64) *AlertDefinition {
cmd := saveAlertDefinitionCommand{
OrgID: 1,
Title: fmt.Sprintf("an alert definition %d", rand.Intn(1000)),
@ -76,7 +73,7 @@ func createTestAlertDefinition(t *testing.T, ng *AlertNG, intervalSeconds int64)
},
IntervalSeconds: &intervalSeconds,
}
err := ng.saveAlertDefinition(&cmd)
err := store.saveAlertDefinition(&cmd)
require.NoError(t, err)
t.Logf("alert definition: %v with interval: %d created", cmd.Result.getKey(), intervalSeconds)
return cmd.Result

@ -5,11 +5,32 @@ import (
"errors"
"fmt"
"strings"
"time"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/util"
)
type store interface {
deleteAlertDefinitionByUID(*deleteAlertDefinitionByUIDCommand) error
getAlertDefinitionByUID(*getAlertDefinitionByUIDQuery) error
getAlertDefinitions(query *listAlertDefinitionsQuery) error
getOrgAlertDefinitions(query *listAlertDefinitionsQuery) error
saveAlertDefinition(*saveAlertDefinitionCommand) error
updateAlertDefinition(*updateAlertDefinitionCommand) error
getAlertInstance(*getAlertInstanceQuery) error
listAlertInstances(cmd *listAlertInstancesQuery) error
saveAlertInstance(cmd *saveAlertInstanceCommand) error
validateAlertDefinition(*AlertDefinition, bool) error
updateAlertDefinitionPaused(*updateAlertDefinitionPausedCommand) error
}
type storeImpl struct {
// the base scheduler tick rate; it's used for validating definition interval
baseInterval time.Duration
SQLStore *sqlstore.SQLStore `inject:""`
}
func getAlertDefinitionByUID(sess *sqlstore.DBSession, alertDefinitionUID string, orgID int64) (*AlertDefinition, error) {
// we consider optionally enabling some caching
alertDefinition := AlertDefinition{OrgID: orgID, UID: alertDefinitionUID}
@ -25,8 +46,8 @@ func getAlertDefinitionByUID(sess *sqlstore.DBSession, alertDefinitionUID string
// deleteAlertDefinitionByID is a handler for deleting an alert definition.
// It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID.
func (ng *AlertNG) deleteAlertDefinitionByUID(cmd *deleteAlertDefinitionByUIDCommand) error {
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) deleteAlertDefinitionByUID(cmd *deleteAlertDefinitionByUIDCommand) error {
return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
_, err := sess.Exec("DELETE FROM alert_definition WHERE uid = ? AND org_id = ?", cmd.UID, cmd.OrgID)
if err != nil {
return err
@ -47,8 +68,8 @@ func (ng *AlertNG) deleteAlertDefinitionByUID(cmd *deleteAlertDefinitionByUIDCom
// getAlertDefinitionByUID is a handler for retrieving an alert definition from that database by its UID and organisation ID.
// It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID.
func (ng *AlertNG) getAlertDefinitionByUID(query *getAlertDefinitionByUIDQuery) error {
return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) getAlertDefinitionByUID(query *getAlertDefinitionByUIDQuery) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
alertDefinition, err := getAlertDefinitionByUID(sess, query.UID, query.OrgID)
if err != nil {
return err
@ -59,8 +80,8 @@ func (ng *AlertNG) getAlertDefinitionByUID(query *getAlertDefinitionByUIDQuery)
}
// saveAlertDefinition is a handler for saving a new alert definition.
func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
intervalSeconds := defaultIntervalSeconds
if cmd.IntervalSeconds != nil {
intervalSeconds = *cmd.IntervalSeconds
@ -83,7 +104,7 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
UID: uid,
}
if err := ng.validateAlertDefinition(alertDefinition, false); err != nil {
if err := st.validateAlertDefinition(alertDefinition, false); err != nil {
return err
}
@ -92,7 +113,7 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
}
if _, err := sess.Insert(alertDefinition); err != nil {
if ng.SQLStore.Dialect.IsUniqueConstraintViolation(err) && strings.Contains(err.Error(), "title") {
if st.SQLStore.Dialect.IsUniqueConstraintViolation(err) && strings.Contains(err.Error(), "title") {
return fmt.Errorf("an alert definition with the title '%s' already exists: %w", cmd.Title, err)
}
return err
@ -119,8 +140,8 @@ func (ng *AlertNG) saveAlertDefinition(cmd *saveAlertDefinitionCommand) error {
// updateAlertDefinition is a handler for updating an existing alert definition.
// It returns models.ErrAlertDefinitionNotFound if no alert definition is found for the provided ID.
func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) error {
return ng.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) updateAlertDefinition(cmd *updateAlertDefinitionCommand) error {
return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
existingAlertDefinition, err := getAlertDefinitionByUID(sess, cmd.UID, cmd.OrgID)
if err != nil {
if errors.Is(err, errAlertDefinitionNotFound) {
@ -157,7 +178,7 @@ func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) erro
UID: existingAlertDefinition.UID,
}
if err := ng.validateAlertDefinition(alertDefinition, true); err != nil {
if err := st.validateAlertDefinition(alertDefinition, true); err != nil {
return err
}
@ -169,7 +190,7 @@ func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) erro
_, err = sess.ID(existingAlertDefinition.ID).Update(alertDefinition)
if err != nil {
if ng.SQLStore.Dialect.IsUniqueConstraintViolation(err) && strings.Contains(err.Error(), "title") {
if st.SQLStore.Dialect.IsUniqueConstraintViolation(err) && strings.Contains(err.Error(), "title") {
return fmt.Errorf("an alert definition with the title '%s' already exists: %w", cmd.Title, err)
}
return err
@ -196,8 +217,8 @@ func (ng *AlertNG) updateAlertDefinition(cmd *updateAlertDefinitionCommand) erro
}
// getOrgAlertDefinitions is a handler for retrieving alert definitions of specific organisation.
func (ng *AlertNG) getOrgAlertDefinitions(query *listAlertDefinitionsQuery) error {
return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) getOrgAlertDefinitions(query *listAlertDefinitionsQuery) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
alertDefinitions := make([]*AlertDefinition, 0)
q := "SELECT * FROM alert_definition WHERE org_id = ?"
if err := sess.SQL(q, query.OrgID).Find(&alertDefinitions); err != nil {
@ -209,8 +230,8 @@ func (ng *AlertNG) getOrgAlertDefinitions(query *listAlertDefinitionsQuery) erro
})
}
func (ng *AlertNG) getAlertDefinitions(query *listAlertDefinitionsQuery) error {
return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) getAlertDefinitions(query *listAlertDefinitionsQuery) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
alerts := make([]*AlertDefinition, 0)
q := "SELECT uid, org_id, interval_seconds, version, paused FROM alert_definition"
if err := sess.SQL(q).Find(&alerts); err != nil {
@ -222,8 +243,8 @@ func (ng *AlertNG) getAlertDefinitions(query *listAlertDefinitionsQuery) error {
})
}
func (ng *AlertNG) updateAlertDefinitionPaused(cmd *updateAlertDefinitionPausedCommand) error {
return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) updateAlertDefinitionPaused(cmd *updateAlertDefinitionPausedCommand) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
if len(cmd.UIDs) == 0 {
return nil
}
@ -250,8 +271,8 @@ func (ng *AlertNG) updateAlertDefinitionPaused(cmd *updateAlertDefinitionPausedC
if err != nil {
return err
}
if cmd.ResultCount, err = res.RowsAffected(); err != nil {
ng.log.Debug("failed to get rows affected: %w", err)
if resultCount, err := res.RowsAffected(); err == nil {
cmd.ResultCount = resultCount
}
return nil
})

@ -68,10 +68,12 @@ func TestCreatingAlertDefinition(t *testing.T) {
expectedError: errEmptyTitleError,
},
}
_, store := setupTestEnv(t, baseIntervalSeconds)
t.Cleanup(registry.ClearOverrides)
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ng := setupTestEnv(t)
t.Cleanup(registry.ClearOverrides)
q := saveAlertDefinitionCommand{
OrgID: 1,
@ -95,7 +97,7 @@ func TestCreatingAlertDefinition(t *testing.T) {
if tc.inputIntervalSeconds != nil {
q.IntervalSeconds = tc.inputIntervalSeconds
}
err := ng.saveAlertDefinition(&q)
err := store.saveAlertDefinition(&q)
switch {
case tc.expectedError != nil:
require.Error(t, err)
@ -111,7 +113,7 @@ func TestCreatingAlertDefinition(t *testing.T) {
}
func TestCreatingConflictionAlertDefinition(t *testing.T) {
t.Run("Should fail to create alert definition with conflicting org_id, title", func(t *testing.T) {
ng := setupTestEnv(t)
_, store := setupTestEnv(t, baseIntervalSeconds)
t.Cleanup(registry.ClearOverrides)
q := saveAlertDefinitionCommand{
@ -134,12 +136,12 @@ func TestCreatingConflictionAlertDefinition(t *testing.T) {
},
}
err := ng.saveAlertDefinition(&q)
err := store.saveAlertDefinition(&q)
require.NoError(t, err)
err = ng.saveAlertDefinition(&q)
err = store.saveAlertDefinition(&q)
require.Error(t, err)
assert.True(t, ng.SQLStore.Dialect.IsUniqueConstraintViolation(err))
assert.True(t, store.SQLStore.Dialect.IsUniqueConstraintViolation(err))
})
}
@ -148,7 +150,7 @@ func TestUpdatingAlertDefinition(t *testing.T) {
mockTimeNow()
defer resetTimeNow()
ng := setupTestEnv(t)
_, store := setupTestEnv(t, baseIntervalSeconds)
t.Cleanup(registry.ClearOverrides)
q := updateAlertDefinitionCommand{
@ -172,7 +174,7 @@ func TestUpdatingAlertDefinition(t *testing.T) {
},
}
err := ng.updateAlertDefinition(&q)
err := store.updateAlertDefinition(&q)
require.NoError(t, err)
})
@ -180,11 +182,11 @@ func TestUpdatingAlertDefinition(t *testing.T) {
mockTimeNow()
defer resetTimeNow()
ng := setupTestEnv(t)
_, store := setupTestEnv(t, baseIntervalSeconds)
t.Cleanup(registry.ClearOverrides)
var initialInterval int64 = 120
alertDefinition := createTestAlertDefinition(t, ng, initialInterval)
alertDefinition := createTestAlertDefinition(t, store, initialInterval)
created := alertDefinition.Updated
var customInterval int64 = 30
@ -273,7 +275,7 @@ func TestUpdatingAlertDefinition(t *testing.T) {
q.OrgID = tc.inputOrgID
}
q.Title = tc.inputTitle
err := ng.updateAlertDefinition(&q)
err := store.updateAlertDefinition(&q)
switch {
case tc.expectedError != nil:
require.Error(t, err)
@ -319,12 +321,12 @@ func TestUpdatingConflictingAlertDefinition(t *testing.T) {
mockTimeNow()
defer resetTimeNow()
ng := setupTestEnv(t)
_, store := setupTestEnv(t, baseIntervalSeconds)
t.Cleanup(registry.ClearOverrides)
var initialInterval int64 = 120
alertDef1 := createTestAlertDefinition(t, ng, initialInterval)
alertDef2 := createTestAlertDefinition(t, ng, initialInterval)
alertDef1 := createTestAlertDefinition(t, store, initialInterval)
alertDef2 := createTestAlertDefinition(t, store, initialInterval)
q := updateAlertDefinitionCommand{
UID: (*alertDef2).UID,
@ -346,15 +348,15 @@ func TestUpdatingConflictingAlertDefinition(t *testing.T) {
},
}
err := ng.updateAlertDefinition(&q)
err := store.updateAlertDefinition(&q)
require.Error(t, err)
assert.True(t, ng.SQLStore.Dialect.IsUniqueConstraintViolation(err))
assert.True(t, store.SQLStore.Dialect.IsUniqueConstraintViolation(err))
})
}
func TestDeletingAlertDefinition(t *testing.T) {
t.Run("zero rows affected when deleting unknown alert", func(t *testing.T) {
ng := setupTestEnv(t)
_, store := setupTestEnv(t, baseIntervalSeconds)
t.Cleanup(registry.ClearOverrides)
q := deleteAlertDefinitionByUIDCommand{
@ -362,15 +364,15 @@ func TestDeletingAlertDefinition(t *testing.T) {
OrgID: 1,
}
err := ng.deleteAlertDefinitionByUID(&q)
err := store.deleteAlertDefinitionByUID(&q)
require.NoError(t, err)
})
t.Run("deleting successfully existing alert", func(t *testing.T) {
ng := setupTestEnv(t)
_, store := setupTestEnv(t, baseIntervalSeconds)
t.Cleanup(registry.ClearOverrides)
alertDefinition := createTestAlertDefinition(t, ng, 60)
alertDefinition := createTestAlertDefinition(t, store, 60)
q := deleteAlertDefinitionByUIDCommand{
UID: (*alertDefinition).UID,
@ -384,21 +386,21 @@ func TestDeletingAlertDefinition(t *testing.T) {
State: InstanceStateFiring,
Labels: InstanceLabels{"test": "testValue"},
}
err := ng.saveAlertInstance(saveCmd)
err := store.saveAlertInstance(saveCmd)
require.NoError(t, err)
listCommand := &listAlertInstancesQuery{
DefinitionOrgID: alertDefinition.OrgID,
DefinitionUID: alertDefinition.UID,
}
err = ng.listAlertInstances(listCommand)
err = store.listAlertInstances(listCommand)
require.NoError(t, err)
require.Len(t, listCommand.Result, 1)
err = ng.deleteAlertDefinitionByUID(&q)
err = store.deleteAlertDefinitionByUID(&q)
require.NoError(t, err)
// assert that alert instance is deleted
err = ng.listAlertInstances(listCommand)
err = store.listAlertInstances(listCommand)
require.NoError(t, err)
require.Len(t, listCommand.Result, 0)

@ -4,11 +4,11 @@ import (
"time"
)
func (ng *AlertNG) fetchAllDetails(now time.Time) []*AlertDefinition {
func (sch *schedule) fetchAllDetails(now time.Time) []*AlertDefinition {
q := listAlertDefinitionsQuery{}
err := ng.getAlertDefinitions(&q)
err := sch.store.getAlertDefinitions(&q)
if err != nil {
ng.schedule.log.Error("failed to fetch alert definitions", "now", now, "err", err)
sch.log.Error("failed to fetch alert definitions", "now", now, "err", err)
return nil
}
return q.Result

@ -6,10 +6,10 @@ import (
)
// listAlertInstancesEndpoint handles GET /api/alert-instances.
func (ng *AlertNG) listAlertInstancesEndpoint(c *models.ReqContext) response.Response {
func (api *apiImpl) listAlertInstancesEndpoint(c *models.ReqContext) response.Response {
cmd := listAlertInstancesQuery{DefinitionOrgID: c.SignedInUser.OrgId}
if err := ng.listAlertInstances(&cmd); err != nil {
if err := api.store.listAlertInstances(&cmd); err != nil {
return response.Error(500, "Failed to list alert instances", err)
}

@ -12,8 +12,8 @@ import (
// getAlertInstance is a handler for retrieving an alert instance based on OrgId, AlertDefintionID, and
// the hash of the labels.
// nolint:unused
func (ng *AlertNG) getAlertInstance(cmd *getAlertInstanceQuery) error {
return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) getAlertInstance(cmd *getAlertInstanceQuery) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
instance := AlertInstance{}
s := strings.Builder{}
s.WriteString(`SELECT * FROM alert_instance
@ -45,8 +45,8 @@ func (ng *AlertNG) getAlertInstance(cmd *getAlertInstanceQuery) error {
// listAlertInstances is a handler for retrieving alert instances within specific organisation
// based on various filters.
func (ng *AlertNG) listAlertInstances(cmd *listAlertInstancesQuery) error {
return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) listAlertInstances(cmd *listAlertInstancesQuery) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
alertInstances := make([]*listAlertInstancesQueryResult, 0)
s := strings.Builder{}
@ -78,8 +78,8 @@ func (ng *AlertNG) listAlertInstances(cmd *listAlertInstancesQuery) error {
// saveAlertDefinition is a handler for saving a new alert definition.
// nolint:unused
func (ng *AlertNG) saveAlertInstance(cmd *saveAlertInstanceCommand) error {
return ng.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
func (st storeImpl) saveAlertInstance(cmd *saveAlertInstanceCommand) error {
return st.SQLStore.WithDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
labelTupleJSON, labelsHash, err := cmd.Labels.StringAndHash()
if err != nil {
return err
@ -101,7 +101,7 @@ func (ng *AlertNG) saveAlertInstance(cmd *saveAlertInstanceCommand) error {
params := append(make([]interface{}, 0), alertInstance.DefinitionOrgID, alertInstance.DefinitionUID, labelTupleJSON, alertInstance.LabelsHash, alertInstance.CurrentState, alertInstance.CurrentStateSince.Unix(), alertInstance.LastEvalTime.Unix())
upsertSQL := ng.SQLStore.Dialect.UpsertSQL(
upsertSQL := st.SQLStore.Dialect.UpsertSQL(
"alert_instance",
[]string{"def_org_id", "def_uid", "labels_hash"},
[]string{"def_org_id", "def_uid", "labels", "labels_hash", "current_state", "current_state_since", "last_eval_time"})

@ -9,18 +9,18 @@ import (
)
func TestAlertInstanceOperations(t *testing.T) {
ng := setupTestEnv(t)
_, store := setupTestEnv(t, baseIntervalSeconds)
alertDefinition1 := createTestAlertDefinition(t, ng, 60)
alertDefinition1 := createTestAlertDefinition(t, store, 60)
orgID := alertDefinition1.OrgID
alertDefinition2 := createTestAlertDefinition(t, ng, 60)
alertDefinition2 := createTestAlertDefinition(t, store, 60)
require.Equal(t, orgID, alertDefinition2.OrgID)
alertDefinition3 := createTestAlertDefinition(t, ng, 60)
alertDefinition3 := createTestAlertDefinition(t, store, 60)
require.Equal(t, orgID, alertDefinition3.OrgID)
alertDefinition4 := createTestAlertDefinition(t, ng, 60)
alertDefinition4 := createTestAlertDefinition(t, store, 60)
require.Equal(t, orgID, alertDefinition4.OrgID)
t.Run("can save and read new alert instance", func(t *testing.T) {
@ -30,7 +30,7 @@ func TestAlertInstanceOperations(t *testing.T) {
State: InstanceStateFiring,
Labels: InstanceLabels{"test": "testValue"},
}
err := ng.saveAlertInstance(saveCmd)
err := store.saveAlertInstance(saveCmd)
require.NoError(t, err)
getCmd := &getAlertInstanceQuery{
@ -39,7 +39,7 @@ func TestAlertInstanceOperations(t *testing.T) {
Labels: InstanceLabels{"test": "testValue"},
}
err = ng.getAlertInstance(getCmd)
err = store.getAlertInstance(getCmd)
require.NoError(t, err)
require.Equal(t, saveCmd.Labels, getCmd.Result.Labels)
@ -53,7 +53,7 @@ func TestAlertInstanceOperations(t *testing.T) {
DefinitionUID: alertDefinition2.UID,
State: InstanceStateNormal,
}
err := ng.saveAlertInstance(saveCmd)
err := store.saveAlertInstance(saveCmd)
require.NoError(t, err)
getCmd := &getAlertInstanceQuery{
@ -61,7 +61,7 @@ func TestAlertInstanceOperations(t *testing.T) {
DefinitionUID: saveCmd.DefinitionUID,
}
err = ng.getAlertInstance(getCmd)
err = store.getAlertInstance(getCmd)
require.NoError(t, err)
require.Equal(t, alertDefinition2.OrgID, getCmd.Result.DefinitionOrgID)
@ -77,7 +77,7 @@ func TestAlertInstanceOperations(t *testing.T) {
Labels: InstanceLabels{"test": "testValue"},
}
err := ng.saveAlertInstance(saveCmdOne)
err := store.saveAlertInstance(saveCmdOne)
require.NoError(t, err)
saveCmdTwo := &saveAlertInstanceCommand{
@ -86,7 +86,7 @@ func TestAlertInstanceOperations(t *testing.T) {
State: InstanceStateFiring,
Labels: InstanceLabels{"test": "meow"},
}
err = ng.saveAlertInstance(saveCmdTwo)
err = store.saveAlertInstance(saveCmdTwo)
require.NoError(t, err)
listCommand := &listAlertInstancesQuery{
@ -94,7 +94,7 @@ func TestAlertInstanceOperations(t *testing.T) {
DefinitionUID: saveCmdOne.DefinitionUID,
}
err = ng.listAlertInstances(listCommand)
err = store.listAlertInstances(listCommand)
require.NoError(t, err)
require.Len(t, listCommand.Result, 2)
@ -105,7 +105,7 @@ func TestAlertInstanceOperations(t *testing.T) {
DefinitionOrgID: orgID,
}
err := ng.listAlertInstances(listCommand)
err := store.listAlertInstances(listCommand)
require.NoError(t, err)
require.Len(t, listCommand.Result, 4)
@ -117,7 +117,7 @@ func TestAlertInstanceOperations(t *testing.T) {
State: InstanceStateNormal,
}
err := ng.listAlertInstances(listCommand)
err := store.listAlertInstances(listCommand)
require.NoError(t, err)
require.Len(t, listCommand.Result, 1)
@ -131,7 +131,7 @@ func TestAlertInstanceOperations(t *testing.T) {
Labels: InstanceLabels{"test": "testValue"},
}
err := ng.saveAlertInstance(saveCmdOne)
err := store.saveAlertInstance(saveCmdOne)
require.NoError(t, err)
saveCmdTwo := &saveAlertInstanceCommand{
@ -140,7 +140,7 @@ func TestAlertInstanceOperations(t *testing.T) {
State: InstanceStateNormal,
Labels: InstanceLabels{"test": "testValue"},
}
err = ng.saveAlertInstance(saveCmdTwo)
err = store.saveAlertInstance(saveCmdTwo)
require.NoError(t, err)
listCommand := &listAlertInstancesQuery{
@ -148,7 +148,7 @@ func TestAlertInstanceOperations(t *testing.T) {
DefinitionUID: alertDefinition4.UID,
}
err = ng.listAlertInstances(listCommand)
err = store.listAlertInstances(listCommand)
require.NoError(t, err)
require.Len(t, listCommand.Result, 1)

@ -4,12 +4,17 @@ import (
"github.com/grafana/grafana/pkg/models"
)
func (ng *AlertNG) validateOrgAlertDefinition(c *models.ReqContext) {
func (api *apiImpl) validateOrgAlertDefinition(c *models.ReqContext) {
uid := c.ParamsEscape(":alertDefinitionUID")
if uid == "" {
c.JsonApiErr(403, "Permission denied", nil)
return
}
query := getAlertDefinitionByUIDQuery{UID: uid, OrgID: c.SignedInUser.OrgId}
if err := ng.getAlertDefinitionByUID(&query); err != nil {
if err := api.store.getAlertDefinitionByUID(&query); err != nil {
c.JsonApiErr(404, "Alert definition not found", nil)
return
}

@ -6,12 +6,12 @@ import (
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
)
@ -35,7 +35,7 @@ type AlertNG struct {
RouteRegister routing.RouteRegister `inject:""`
SQLStore *sqlstore.SQLStore `inject:""`
log log.Logger
schedule *schedule
schedule scheduleService
}
func init() {
@ -46,21 +46,34 @@ func init() {
func (ng *AlertNG) Init() error {
ng.log = log.New("ngalert")
ng.registerAPIEndpoints()
baseInterval := baseIntervalSeconds * time.Second
store := storeImpl{baseInterval: baseInterval, SQLStore: ng.SQLStore}
schedCfg := schedulerCfg{
c: clock.New(),
baseInterval: baseIntervalSeconds * time.Second,
baseInterval: baseInterval,
logger: ng.log,
evaluator: eval.Evaluator{Cfg: ng.Cfg},
store: store,
}
ng.schedule = newScheduler(schedCfg)
api := apiImpl{
Cfg: ng.Cfg,
DatasourceCache: ng.DatasourceCache,
RouteRegister: ng.RouteRegister,
schedule: ng.schedule,
store: store}
api.registerAPIEndpoints()
return nil
}
// Run starts the scheduler
func (ng *AlertNG) Run(ctx context.Context) error {
ng.log.Debug("ngalert starting")
return ng.alertingTicker(ctx)
return ng.schedule.Ticker(ctx)
}
// IsDisabled returns true if the alerting service is disable for this instance.
@ -85,14 +98,14 @@ func (ng *AlertNG) AddMigration(mg *migrator.Migrator) {
}
// LoadAlertCondition returns a Condition object for the given alertDefinitionID.
func (ng *AlertNG) LoadAlertCondition(alertDefinitionUID string, orgID int64) (*eval.Condition, error) {
func (api *apiImpl) LoadAlertCondition(alertDefinitionUID string, orgID int64) (*eval.Condition, error) {
q := getAlertDefinitionByUIDQuery{UID: alertDefinitionUID, OrgID: orgID}
if err := ng.getAlertDefinitionByUID(&q); err != nil {
if err := api.store.getAlertDefinitionByUID(&q); err != nil {
return nil, err
}
alertDefinition := q.Result
err := ng.validateAlertDefinition(alertDefinition, true)
err := api.store.validateAlertDefinition(alertDefinition, true)
if err != nil {
return nil, err
}

@ -13,8 +13,19 @@ import (
"golang.org/x/sync/errgroup"
)
func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefinitionKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error {
ng.log.Debug("alert definition routine started", "key", key)
type scheduleService interface {
Ticker(context.Context) error
Pause() error
Unpause() error
// the following are used by tests only used for tests
evalApplied(alertDefinitionKey, time.Time)
stopApplied(alertDefinitionKey)
overrideCfg(cfg schedulerCfg)
}
func (sch *schedule) definitionRoutine(grafanaCtx context.Context, key alertDefinitionKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error {
sch.log.Debug("alert definition routine started", "key", key)
evalRunning := false
var start, end time.Time
@ -33,13 +44,13 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini
// fetch latest alert definition version
if alertDefinition == nil || alertDefinition.Version < ctx.version {
q := getAlertDefinitionByUIDQuery{OrgID: key.orgID, UID: key.definitionUID}
err := ng.getAlertDefinitionByUID(&q)
err := sch.store.getAlertDefinitionByUID(&q)
if err != nil {
ng.schedule.log.Error("failed to fetch alert definition", "key", key)
sch.log.Error("failed to fetch alert definition", "key", key)
return err
}
alertDefinition = q.Result
ng.schedule.log.Debug("new alert definition version fetched", "title", alertDefinition.Title, "key", key, "version", alertDefinition.Version)
sch.log.Debug("new alert definition version fetched", "title", alertDefinition.Title, "key", key, "version", alertDefinition.Version)
}
condition := eval.Condition{
@ -47,19 +58,19 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini
OrgID: alertDefinition.OrgID,
QueriesAndExpressions: alertDefinition.Data,
}
results, err := ng.schedule.evaluator.ConditionEval(&condition, ctx.now)
results, err := sch.evaluator.ConditionEval(&condition, ctx.now)
end = timeNow()
if err != nil {
// consider saving alert instance on error
ng.schedule.log.Error("failed to evaluate alert definition", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err)
sch.log.Error("failed to evaluate alert definition", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err)
return err
}
for _, r := range results {
ng.schedule.log.Debug("alert definition result", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String())
sch.log.Debug("alert definition result", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "instance", r.Instance, "state", r.State.String())
cmd := saveAlertInstanceCommand{DefinitionOrgID: key.orgID, DefinitionUID: key.definitionUID, State: InstanceStateType(r.State.String()), Labels: InstanceLabels(r.Instance), LastEvalTime: ctx.now}
err := ng.saveAlertInstance(&cmd)
err := sch.store.saveAlertInstance(&cmd)
if err != nil {
ng.schedule.log.Error("failed saving alert instance", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "instance", r.Instance, "state", r.State.String(), "error", err)
sch.log.Error("failed saving alert instance", "title", alertDefinition.Title, "key", key, "attempt", attempt, "now", ctx.now, "instance", r.Instance, "state", r.State.String(), "error", err)
}
}
return nil
@ -69,12 +80,10 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini
evalRunning = true
defer func() {
evalRunning = false
if ng.schedule.evalApplied != nil {
ng.schedule.evalApplied(key, ctx.now)
}
sch.evalApplied(key, ctx.now)
}()
for attempt = 0; attempt < ng.schedule.maxAttempts; attempt++ {
for attempt = 0; attempt < sch.maxAttempts; attempt++ {
err := evaluate(attempt)
if err == nil {
break
@ -82,10 +91,8 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini
}
}()
case <-stopCh:
if ng.schedule.stopApplied != nil {
ng.schedule.stopApplied(key)
}
ng.schedule.log.Debug("stopping alert definition routine", "key", key)
sch.stopApplied(key)
sch.log.Debug("stopping alert definition routine", "key", key)
// interrupt evaluation if it's running
return nil
case <-grafanaCtx.Done():
@ -110,43 +117,73 @@ type schedule struct {
// evalApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from evalApplied is handled.
evalApplied func(alertDefinitionKey, time.Time)
evalAppliedFunc func(alertDefinitionKey, time.Time)
// stopApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from stopApplied is handled.
stopApplied func(alertDefinitionKey)
stopAppliedFunc func(alertDefinitionKey)
log log.Logger
evaluator eval.Evaluator
store store
}
type schedulerCfg struct {
c clock.Clock
baseInterval time.Duration
logger log.Logger
evalApplied func(alertDefinitionKey, time.Time)
evaluator eval.Evaluator
c clock.Clock
baseInterval time.Duration
logger log.Logger
evalAppliedFunc func(alertDefinitionKey, time.Time)
stopAppliedFunc func(alertDefinitionKey)
evaluator eval.Evaluator
store store
}
// newScheduler returns a new schedule.
func newScheduler(cfg schedulerCfg) *schedule {
ticker := alerting.NewTicker(cfg.c.Now(), time.Second*0, cfg.c, int64(cfg.baseInterval.Seconds()))
sch := schedule{
registry: alertDefinitionRegistry{alertDefinitionInfo: make(map[alertDefinitionKey]alertDefinitionInfo)},
maxAttempts: maxAttempts,
clock: cfg.c,
baseInterval: cfg.baseInterval,
log: cfg.logger,
heartbeat: ticker,
evalApplied: cfg.evalApplied,
evaluator: cfg.evaluator,
registry: alertDefinitionRegistry{alertDefinitionInfo: make(map[alertDefinitionKey]alertDefinitionInfo)},
maxAttempts: maxAttempts,
clock: cfg.c,
baseInterval: cfg.baseInterval,
log: cfg.logger,
heartbeat: ticker,
evalAppliedFunc: cfg.evalAppliedFunc,
stopAppliedFunc: cfg.stopAppliedFunc,
evaluator: cfg.evaluator,
store: cfg.store,
}
return &sch
}
func (sch *schedule) pause() error {
func (sch *schedule) overrideCfg(cfg schedulerCfg) {
sch.clock = cfg.c
sch.baseInterval = cfg.baseInterval
sch.heartbeat = alerting.NewTicker(cfg.c.Now(), time.Second*0, cfg.c, int64(cfg.baseInterval.Seconds()))
sch.evalAppliedFunc = cfg.evalAppliedFunc
sch.stopAppliedFunc = cfg.stopAppliedFunc
}
func (sch *schedule) evalApplied(alertDefKey alertDefinitionKey, now time.Time) {
if sch.evalAppliedFunc == nil {
return
}
sch.evalAppliedFunc(alertDefKey, now)
}
func (sch *schedule) stopApplied(alertDefKey alertDefinitionKey) {
if sch.stopAppliedFunc == nil {
return
}
sch.stopAppliedFunc(alertDefKey)
}
func (sch *schedule) Pause() error {
if sch == nil {
return fmt.Errorf("scheduler is not initialised")
}
@ -155,7 +192,7 @@ func (sch *schedule) pause() error {
return nil
}
func (sch *schedule) unpause() error {
func (sch *schedule) Unpause() error {
if sch == nil {
return fmt.Errorf("scheduler is not initialised")
}
@ -164,20 +201,20 @@ func (sch *schedule) unpause() error {
return nil
}
func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error {
func (sch *schedule) Ticker(grafanaCtx context.Context) error {
dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx)
for {
select {
case tick := <-ng.schedule.heartbeat.C:
tickNum := tick.Unix() / int64(ng.schedule.baseInterval.Seconds())
alertDefinitions := ng.fetchAllDetails(tick)
ng.schedule.log.Debug("alert definitions fetched", "count", len(alertDefinitions))
case tick := <-sch.heartbeat.C:
tickNum := tick.Unix() / int64(sch.baseInterval.Seconds())
alertDefinitions := sch.fetchAllDetails(tick)
sch.log.Debug("alert definitions fetched", "count", len(alertDefinitions))
// registeredDefinitions is a map used for finding deleted alert definitions
// initially it is assigned to all known alert definitions from the previous cycle
// each alert definition found also in this cycle is removed
// so, at the end, the remaining registered alert definitions are the deleted ones
registeredDefinitions := ng.schedule.registry.keyMap()
registeredDefinitions := sch.registry.keyMap()
type readyToRunItem struct {
key alertDefinitionKey
@ -191,24 +228,24 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error {
key := item.getKey()
itemVersion := item.Version
newRoutine := !ng.schedule.registry.exists(key)
definitionInfo := ng.schedule.registry.getOrCreateInfo(key, itemVersion)
invalidInterval := item.IntervalSeconds%int64(ng.schedule.baseInterval.Seconds()) != 0
newRoutine := !sch.registry.exists(key)
definitionInfo := sch.registry.getOrCreateInfo(key, itemVersion)
invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0
if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error {
return ng.definitionRoutine(ctx, key, definitionInfo.evalCh, definitionInfo.stopCh)
return sch.definitionRoutine(ctx, key, definitionInfo.evalCh, definitionInfo.stopCh)
})
}
if invalidInterval {
// this is expected to be always false
// give that we validate interval during alert definition updates
ng.schedule.log.Debug("alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", ng.schedule.baseInterval)
sch.log.Debug("alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", sch.baseInterval)
continue
}
itemFrequency := item.IntervalSeconds / int64(ng.schedule.baseInterval.Seconds())
itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
readyToRun = append(readyToRun, readyToRunItem{key: key, definitionInfo: definitionInfo})
}
@ -219,7 +256,7 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error {
var step int64 = 0
if len(readyToRun) > 0 {
step = ng.schedule.baseInterval.Nanoseconds() / int64(len(readyToRun))
step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun))
}
for i := range readyToRun {
@ -232,13 +269,13 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error {
// unregister and stop routines of the deleted alert definitions
for key := range registeredDefinitions {
definitionInfo, err := ng.schedule.registry.get(key)
definitionInfo, err := sch.registry.get(key)
if err != nil {
ng.schedule.log.Error("failed to get alert definition routine information", "err", err)
sch.log.Error("failed to get alert definition routine information", "err", err)
continue
}
definitionInfo.stopCh <- struct{}{}
ng.schedule.registry.del(key)
sch.registry.del(key)
}
case <-grafanaCtx.Done():
err := dispatcherGroup.Wait()

@ -8,9 +8,7 @@ import (
"testing"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -23,40 +21,37 @@ type evalAppliedInfo struct {
}
func TestAlertingTicker(t *testing.T) {
ng := setupTestEnv(t)
ng, store := setupTestEnv(t, 1)
t.Cleanup(registry.ClearOverrides)
mockedClock := clock.NewMock()
schefCfg := schedulerCfg{
c: mockedClock,
baseInterval: time.Second,
logger: log.New("ngalert.schedule.test"),
evaluator: eval.Evaluator{Cfg: ng.Cfg},
}
ng.schedule = newScheduler(schefCfg)
alerts := make([]*AlertDefinition, 0)
// create alert definition with zero interval (should never run)
alerts = append(alerts, createTestAlertDefinition(t, ng, 0))
alerts = append(alerts, createTestAlertDefinition(t, store, 0))
// create alert definition with one second interval
alerts = append(alerts, createTestAlertDefinition(t, ng, 1))
alerts = append(alerts, createTestAlertDefinition(t, store, 1))
evalAppliedCh := make(chan evalAppliedInfo, len(alerts))
stopAppliedCh := make(chan alertDefinitionKey, len(alerts))
ng.schedule.evalApplied = func(alertDefKey alertDefinitionKey, now time.Time) {
evalAppliedCh <- evalAppliedInfo{alertDefKey: alertDefKey, now: now}
}
mockedClock := clock.NewMock()
baseInterval := time.Second
ng.schedule.stopApplied = func(alertDefKey alertDefinitionKey) {
stopAppliedCh <- alertDefKey
schefCfg := schedulerCfg{
c: mockedClock,
baseInterval: baseInterval,
evalAppliedFunc: func(alertDefKey alertDefinitionKey, now time.Time) {
evalAppliedCh <- evalAppliedInfo{alertDefKey: alertDefKey, now: now}
},
stopAppliedFunc: func(alertDefKey alertDefinitionKey) {
stopAppliedCh <- alertDefKey
},
}
ng.schedule.overrideCfg(schefCfg)
ctx := context.Background()
go func() {
err := ng.alertingTicker(ctx)
err := ng.schedule.Ticker(ctx)
require.NoError(t, err)
}()
runtime.Gosched()
@ -69,7 +64,7 @@ func TestAlertingTicker(t *testing.T) {
// change alert definition interval to three seconds
var threeSecInterval int64 = 3
err := ng.updateAlertDefinition(&updateAlertDefinitionCommand{
err := store.updateAlertDefinition(&updateAlertDefinitionCommand{
UID: alerts[0].UID,
IntervalSeconds: &threeSecInterval,
OrgID: alerts[0].OrgID,
@ -95,7 +90,7 @@ func TestAlertingTicker(t *testing.T) {
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
})
err = ng.deleteAlertDefinitionByUID(&deleteAlertDefinitionByUIDCommand{UID: alerts[1].UID, OrgID: alerts[1].OrgID})
err = store.deleteAlertDefinitionByUID(&deleteAlertDefinitionByUIDCommand{UID: alerts[1].UID, OrgID: alerts[1].OrgID})
require.NoError(t, err)
t.Logf("alert definition: %v deleted", alerts[1].getKey())
@ -116,7 +111,7 @@ func TestAlertingTicker(t *testing.T) {
})
// create alert definition with one second interval
alerts = append(alerts, createTestAlertDefinition(t, ng, 1))
alerts = append(alerts, createTestAlertDefinition(t, store, 1))
expectedAlertDefinitionsEvaluated = []alertDefinitionKey{alerts[2].getKey()}
t.Run(fmt.Sprintf("on 7th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
@ -125,7 +120,7 @@ func TestAlertingTicker(t *testing.T) {
})
// pause alert definition
err = ng.updateAlertDefinitionPaused(&updateAlertDefinitionPausedCommand{UIDs: []string{alerts[2].UID}, OrgID: alerts[2].OrgID, Paused: true})
err = store.updateAlertDefinitionPaused(&updateAlertDefinitionPausedCommand{UIDs: []string{alerts[2].UID}, OrgID: alerts[2].OrgID, Paused: true})
require.NoError(t, err)
t.Logf("alert definition: %v paused", alerts[2].getKey())
@ -141,7 +136,7 @@ func TestAlertingTicker(t *testing.T) {
})
// unpause alert definition
err = ng.updateAlertDefinitionPaused(&updateAlertDefinitionPausedCommand{UIDs: []string{alerts[2].UID}, OrgID: alerts[2].OrgID, Paused: false})
err = store.updateAlertDefinitionPaused(&updateAlertDefinitionPausedCommand{UIDs: []string{alerts[2].UID}, OrgID: alerts[2].OrgID, Paused: false})
require.NoError(t, err)
t.Logf("alert definition: %v unpaused", alerts[2].getKey())

@ -15,7 +15,7 @@ var errEmptyTitleError = errors.New("title is empty")
// validateAlertDefinition validates the alert definition interval and organisation.
// If requireData is true checks that it contains at least one alert query
func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, requireData bool) error {
func (st storeImpl) validateAlertDefinition(alertDefinition *AlertDefinition, requireData bool) error {
if !requireData && len(alertDefinition.Data) == 0 {
return fmt.Errorf("no queries or expressions are found")
}
@ -24,8 +24,8 @@ func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, req
return errEmptyTitleError
}
if alertDefinition.IntervalSeconds%int64(ng.schedule.baseInterval.Seconds()) != 0 {
return fmt.Errorf("invalid interval: %v: interval should be divided exactly by scheduler interval: %v", time.Duration(alertDefinition.IntervalSeconds)*time.Second, ng.schedule.baseInterval)
if alertDefinition.IntervalSeconds%int64(st.baseInterval.Seconds()) != 0 {
return fmt.Errorf("invalid interval: %v: interval should be divided exactly by scheduler interval: %v", time.Duration(alertDefinition.IntervalSeconds)*time.Second, st.baseInterval)
}
// enfore max name length in SQLite
@ -41,7 +41,7 @@ func (ng *AlertNG) validateAlertDefinition(alertDefinition *AlertDefinition, req
}
// validateCondition validates that condition queries refer to existing datasources
func (ng *AlertNG) validateCondition(c eval.Condition, user *models.SignedInUser, skipCache bool) error {
func (api *apiImpl) validateCondition(c eval.Condition, user *models.SignedInUser, skipCache bool) error {
var refID string
if len(c.QueriesAndExpressions) == 0 {
@ -66,7 +66,7 @@ func (ng *AlertNG) validateCondition(c eval.Condition, user *models.SignedInUser
continue
}
_, err = ng.DatasourceCache.GetDatasourceByUID(datasourceUID, user, skipCache)
_, err = api.DatasourceCache.GetDatasourceByUID(datasourceUID, user, skipCache)
if err != nil {
return fmt.Errorf("failed to get datasource: %s: %w", datasourceUID, err)
}

Loading…
Cancel
Save