@@ -8,13 +8,16 @@ import (
 	"sync"
 	"time"
 
+	"github.com/grafana/grafana/pkg/bus"
+	"github.com/grafana/grafana/pkg/events"
 	"github.com/grafana/grafana/pkg/expr"
 	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/models"
 	"github.com/grafana/grafana/pkg/services/alerting"
 	"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
-	"github.com/grafana/grafana/pkg/services/ngalert/models"
+	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
 	"github.com/grafana/grafana/pkg/services/ngalert/notifier"
 	"github.com/grafana/grafana/pkg/services/ngalert/sender"
 	"github.com/grafana/grafana/pkg/services/ngalert/state"
@@ -40,13 +43,17 @@ type ScheduleService interface {
 	// organization.
 	DroppedAlertmanagersFor(orgID int64) []*url.URL
 	// UpdateAlertRule notifies scheduler that a rule has been changed
-	UpdateAlertRule(key models.AlertRuleKey)
+	UpdateAlertRule(key ngmodels.AlertRuleKey)
+	// UpdateAlertRulesByNamespaceUID notifies scheduler that all rules in a namespace should be updated.
+	UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID int64, uid string) error
 	// DeleteAlertRule notifies scheduler that a rule has been changed
-	DeleteAlertRule(key models.AlertRuleKey)
+	DeleteAlertRule(key ngmodels.AlertRuleKey)
 	// the following are used by tests only used for tests
-	evalApplied(models.AlertRuleKey, time.Time)
-	stopApplied(models.AlertRuleKey)
+	evalApplied(ngmodels.AlertRuleKey, time.Time)
+	stopApplied(ngmodels.AlertRuleKey)
 	overrideCfg(cfg SchedulerCfg)
+	folderUpdateHandler(ctx context.Context, evt *events.FolderUpdated) error
 }
 
 type schedule struct {
@@ -65,12 +72,12 @@ type schedule struct {
 	// evalApplied is only used for tests: test code can set it to non-nil
 	// function, and then it'll be called from the event loop whenever the
 	// message from evalApplied is handled.
-	evalAppliedFunc func(models.AlertRuleKey, time.Time)
+	evalAppliedFunc func(ngmodels.AlertRuleKey, time.Time)
 
 	// stopApplied is only used for tests: test code can set it to non-nil
 	// function, and then it'll be called from the event loop whenever the
 	// message from stopApplied is handled.
-	stopAppliedFunc func(models.AlertRuleKey)
+	stopAppliedFunc func(ngmodels.AlertRuleKey)
 
 	log log.Logger
@@ -91,7 +98,7 @@ type schedule struct {
 	// Senders help us send alerts to external Alertmanagers.
 	adminConfigMtx sync.RWMutex
-	sendAlertsTo map[int64]models.AlertmanagersChoice
+	sendAlertsTo map[int64]ngmodels.AlertmanagersChoice
 	sendersCfgHash map[int64]string
 	senders map[int64]*sender.Sender
 	adminConfigPollInterval time.Duration
@@ -103,6 +110,9 @@ type schedule struct {
 	// current tick depends on its evaluation interval and when it was
 	// last evaluated.
 	schedulableAlertRules schedulableAlertRulesRegistry
+
+	// bus is used to hook into events that should cause rule updates.
+	bus bus.Bus
 }
 
 // SchedulerCfg is the scheduler configuration.
@@ -110,9 +120,9 @@ type SchedulerCfg struct {
 	C clock.Clock
 	BaseInterval time.Duration
 	Logger log.Logger
-	EvalAppliedFunc func(models.AlertRuleKey, time.Time)
+	EvalAppliedFunc func(ngmodels.AlertRuleKey, time.Time)
 	MaxAttempts int64
-	StopAppliedFunc func(models.AlertRuleKey)
+	StopAppliedFunc func(ngmodels.AlertRuleKey)
 	Evaluator eval.Evaluator
 	RuleStore store.RuleStore
 	OrgStore store.OrgStore
@@ -126,11 +136,11 @@ type SchedulerCfg struct {
 }
 
 // NewScheduler returns a new schedule.
-func NewScheduler(cfg SchedulerCfg, expressionService *expr.Service, appURL *url.URL, stateManager *state.Manager) *schedule {
+func NewScheduler(cfg SchedulerCfg, expressionService *expr.Service, appURL *url.URL, stateManager *state.Manager, bus bus.Bus) *schedule {
 	ticker := alerting.NewTicker(cfg.C, cfg.BaseInterval, cfg.Metrics.Ticker)
 
 	sch := schedule{
-		registry: alertRuleInfoRegistry{alertRuleInfo: make(map[models.AlertRuleKey]*alertRuleInfo)},
+		registry: alertRuleInfoRegistry{alertRuleInfo: make(map[ngmodels.AlertRuleKey]*alertRuleInfo)},
 		maxAttempts: cfg.MaxAttempts,
 		clock: cfg.C,
 		baseInterval: cfg.BaseInterval,
@@ -148,14 +158,18 @@ func NewScheduler(cfg SchedulerCfg, expressionService *expr.Service, appURL *url
 		metrics: cfg.Metrics,
 		appURL: appURL,
 		stateManager: stateManager,
-		sendAlertsTo: map[int64]models.AlertmanagersChoice{},
+		sendAlertsTo: map[int64]ngmodels.AlertmanagersChoice{},
 		senders: map[int64]*sender.Sender{},
 		sendersCfgHash: map[int64]string{},
 		adminConfigPollInterval: cfg.AdminConfigPollInterval,
 		disabledOrgs: cfg.DisabledOrgs,
 		minRuleInterval: cfg.MinRuleInterval,
-		schedulableAlertRules: schedulableAlertRulesRegistry{rules: make(map[models.AlertRuleKey]*models.SchedulableAlertRule)},
+		schedulableAlertRules: schedulableAlertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.SchedulableAlertRule)},
+		bus: bus,
 	}
 
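+	// Register the handler for folder update events so the rules in an updated folder are refreshed (see folderUpdateHandler below).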
+	bus.AddEventListener(sch.folderUpdateHandler)
+
 	return &sch
 }
@@ -216,7 +230,7 @@ func (sch *schedule) SyncAndApplyConfigFromDatabase() error {
 			continue
 		}
 		// We have no running sender and alerts are handled internally, no-op.
-		if !ok && cfg.SendAlertsTo == models.InternalAlertmanager {
+		if !ok && cfg.SendAlertsTo == ngmodels.InternalAlertmanager {
 			sch.log.Debug("alerts are handled internally", "org", cfg.OrgID)
 			continue
 		}
@@ -313,7 +327,7 @@ func (sch *schedule) DroppedAlertmanagersFor(orgID int64) []*url.URL {
 }
 
 // UpdateAlertRule looks for the active rule evaluation and commands it to update the rule
-func (sch *schedule) UpdateAlertRule(key models.AlertRuleKey) {
+func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey) {
 	ruleInfo, err := sch.registry.get(key)
 	if err != nil {
 		return
@@ -321,8 +335,28 @@ func (sch *schedule) UpdateAlertRule(key models.AlertRuleKey) {
 	ruleInfo.update()
 }
 
+// UpdateAlertRulesByNamespaceUID looks for the active rule evaluation for every rule in the given namespace and commands it to update the rule.
+func (sch *schedule) UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID int64, uid string) error {
+	q := ngmodels.ListAlertRulesQuery{
+		OrgID:         orgID,
+		NamespaceUIDs: []string{uid},
+	}
+	if err := sch.ruleStore.ListAlertRules(ctx, &q); err != nil {
+		return err
+	}
+
+	for _, r := range q.Result {
+		sch.UpdateAlertRule(ngmodels.AlertRuleKey{
+			OrgID: orgID,
+			UID:   r.UID,
+		})
+	}
+
+	return nil
+}
+
 // DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
-func (sch *schedule) DeleteAlertRule(key models.AlertRuleKey) {
+func (sch *schedule) DeleteAlertRule(key ngmodels.AlertRuleKey) {
 	// It can happen that the scheduler has deleted the alert rule before the
 	// Ruler API has called DeleteAlertRule. This can happen as requests to
 	// the Ruler API do not hold an exclusive lock over all scheduler operations.
@@ -403,7 +437,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
 			sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules)))
 
 			type readyToRunItem struct {
-				key models.AlertRuleKey
+				key ngmodels.AlertRuleKey
 				ruleName string
 				ruleInfo *alertRuleInfo
 				version int64
@@ -491,7 +525,8 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
 	}
 }
 
-func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan struct{}) error {
+//nolint: gocyclo
+func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan struct{}) error {
 	logger := sch.log.New("uid", key.UID, "org", key.OrgID)
 	logger.Debug("alert rule routine started")
@@ -509,7 +544,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
 		// Send alerts to local notifier if they need to be handled internally
 		// or if no external AMs have been discovered yet.
 		var localNotifierExist, externalNotifierExist bool
-		if sch.sendAlertsTo[key.OrgID] == models.ExternalAlertmanagers && len(sch.AlertmanagersFor(key.OrgID)) > 0 {
+		if sch.sendAlertsTo[key.OrgID] == ngmodels.ExternalAlertmanagers && len(sch.AlertmanagersFor(key.OrgID)) > 0 {
 			logger.Debug("no alerts to put in the notifier")
 		} else {
 			logger.Debug("sending alerts to local notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
@@ -533,7 +568,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
 		sch.adminConfigMtx.RLock()
 		defer sch.adminConfigMtx.RUnlock()
 		s, ok := sch.senders[key.OrgID]
-		if ok && sch.sendAlertsTo[key.OrgID] != models.InternalAlertmanager {
+		if ok && sch.sendAlertsTo[key.OrgID] != ngmodels.InternalAlertmanager {
 			logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
 			s.SendAlerts(alerts)
 			externalNotifierExist = true
@@ -551,8 +586,8 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
 		notify(expiredAlerts, logger)
 	}
 
-	updateRule := func(ctx context.Context, oldRule *models.AlertRule) (*models.AlertRule, error) {
-		q := models.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID}
+	updateRule := func(ctx context.Context, oldRule *ngmodels.AlertRule) (*ngmodels.AlertRule, error) {
+		q := ngmodels.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID}
 		err := sch.ruleStore.GetAlertRuleByUID(ctx, &q)
 		if err != nil {
 			logger.Error("failed to fetch alert rule", "err", err)
@@ -561,14 +596,34 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
 		if oldRule != nil && oldRule.Version < q.Result.Version {
 			clearState()
 		}
+
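+		// Resolve the rule's namespace (folder) so its title can be attached as a label; use an internal admin identity because this runs outside of a user request.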
+		user := &models.SignedInUser{
+			UserId:  0,
+			OrgRole: models.ROLE_ADMIN,
+			OrgId:   key.OrgID,
+		}
+
+		folder, err := sch.ruleStore.GetNamespaceByUID(ctx, q.Result.NamespaceUID, q.Result.OrgID, user)
+		if err != nil {
+			logger.Error("failed to fetch alert rule namespace", "err", err)
+			return nil, err
+		}
+
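+		// Ensure the label map exists and override any user-defined value of the reserved folder title label with the folder's actual title.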
+		if q.Result.Labels == nil {
+			q.Result.Labels = make(map[string]string)
+		} else if val, ok := q.Result.Labels[ngmodels.FolderTitleLabel]; ok {
+			logger.Warn("alert rule contains protected label, value will be overwritten", "label", ngmodels.FolderTitleLabel, "value", val)
+		}
+		q.Result.Labels[ngmodels.FolderTitleLabel] = folder.Title
+
 		return q.Result, nil
 	}
 
-	evaluate := func(ctx context.Context, r *models.AlertRule, attempt int64, e *evaluation) error {
+	evaluate := func(ctx context.Context, r *ngmodels.AlertRule, attempt int64, e *evaluation) error {
 		logger := logger.New("version", r.Version, "attempt", attempt, "now", e.scheduledAt)
 		start := sch.clock.Now()
 
-		condition := models.Condition{
+		condition := ngmodels.Condition{
 			Condition: r.Condition,
 			OrgID: r.OrgID,
 			Data: r.Data,
@@ -606,7 +661,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
 	}
 
 	evalRunning := false
-	var currentRule *models.AlertRule
+	var currentRule *ngmodels.AlertRule
 	defer sch.stopApplied(key)
 	for {
 		select {
@@ -669,11 +724,11 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
 func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State) {
 	sch.log.Debug("saving alert states", "count", len(states))
 	for _, s := range states {
-		cmd := models.SaveAlertInstanceCommand{
+		cmd := ngmodels.SaveAlertInstanceCommand{
 			RuleOrgID: s.OrgID,
 			RuleUID: s.AlertRuleUID,
-			Labels: models.InstanceLabels(s.Labels),
-			State: models.InstanceStateType(s.State.String()),
+			Labels: ngmodels.InstanceLabels(s.Labels),
+			State: ngmodels.InstanceStateType(s.State.String()),
 			StateReason: s.StateReason,
 			LastEvalTime: s.LastEvaluationTime,
 			CurrentStateSince: s.StartsAt,
@@ -686,6 +741,11 @@ func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State)
 	}
 }
 
+// folderUpdateHandler listens for folder update events and updates all rules in the given folder.
+func (sch *schedule) folderUpdateHandler(ctx context.Context, evt *events.FolderUpdated) error {
+	return sch.UpdateAlertRulesByNamespaceUID(ctx, evt.OrgID, evt.UID)
+}
+
 // overrideCfg is only used on tests.
 func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
 	sch.clock = cfg.C
@@ -697,7 +757,7 @@ func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
 }
 
 // evalApplied is only used on tests.
-func (sch *schedule) evalApplied(alertDefKey models.AlertRuleKey, now time.Time) {
+func (sch *schedule) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
 	if sch.evalAppliedFunc == nil {
 		return
 	}
@@ -706,7 +766,7 @@ func (sch *schedule) evalApplied(alertDefKey models.AlertRuleKey, now time.Time)
 }
 
 // stopApplied is only used on tests.
-func (sch *schedule) stopApplied(alertDefKey models.AlertRuleKey) {
+func (sch *schedule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
 	if sch.stopAppliedFunc == nil {
 		return
 	}