|
|
|
@ -17,10 +17,10 @@ import ( |
|
|
|
|
"golang.org/x/sync/errgroup" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// AlertingService is the background process that
|
|
|
|
|
// AlertEngine is the background process that
|
|
|
|
|
// schedules alert evaluations and makes sure notifications
|
|
|
|
|
// are sent.
|
|
|
|
|
type AlertingService struct { |
|
|
|
|
type AlertEngine struct { |
|
|
|
|
RenderService rendering.Service `inject:""` |
|
|
|
|
|
|
|
|
|
execQueue chan *Job |
|
|
|
@ -33,16 +33,16 @@ type AlertingService struct { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func init() { |
|
|
|
|
registry.RegisterService(&AlertingService{}) |
|
|
|
|
registry.RegisterService(&AlertEngine{}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// IsDisabled returns true if the alerting service is disabled for this instance.
|
|
|
|
|
func (e *AlertingService) IsDisabled() bool { |
|
|
|
|
func (e *AlertEngine) IsDisabled() bool { |
|
|
|
|
return !setting.AlertingEnabled || !setting.ExecuteAlerts |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Init initializes the AlertingService.
|
|
|
|
|
func (e *AlertingService) Init() error { |
|
|
|
|
func (e *AlertEngine) Init() error { |
|
|
|
|
e.ticker = NewTicker(time.Now(), time.Second*0, clock.New()) |
|
|
|
|
e.execQueue = make(chan *Job, 1000) |
|
|
|
|
e.scheduler = newScheduler() |
|
|
|
@ -54,7 +54,7 @@ func (e *AlertingService) Init() error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Run starts the alerting service background process.
|
|
|
|
|
func (e *AlertingService) Run(ctx context.Context) error { |
|
|
|
|
func (e *AlertEngine) Run(ctx context.Context) error { |
|
|
|
|
alertGroup, ctx := errgroup.WithContext(ctx) |
|
|
|
|
alertGroup.Go(func() error { return e.alertingTicker(ctx) }) |
|
|
|
|
alertGroup.Go(func() error { return e.runJobDispatcher(ctx) }) |
|
|
|
@ -63,7 +63,7 @@ func (e *AlertingService) Run(ctx context.Context) error { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *AlertingService) alertingTicker(grafanaCtx context.Context) error { |
|
|
|
|
func (e *AlertEngine) alertingTicker(grafanaCtx context.Context) error { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
|
e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1)) |
|
|
|
@ -88,7 +88,7 @@ func (e *AlertingService) alertingTicker(grafanaCtx context.Context) error { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *AlertingService) runJobDispatcher(grafanaCtx context.Context) error { |
|
|
|
|
func (e *AlertEngine) runJobDispatcher(grafanaCtx context.Context) error { |
|
|
|
|
dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
@ -105,7 +105,7 @@ var ( |
|
|
|
|
unfinishedWorkTimeout = time.Second * 5 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func (e *AlertingService) processJobWithRetry(grafanaCtx context.Context, job *Job) error { |
|
|
|
|
func (e *AlertEngine) processJobWithRetry(grafanaCtx context.Context, job *Job) error { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
|
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1)) |
|
|
|
@ -140,7 +140,7 @@ func (e *AlertingService) processJobWithRetry(grafanaCtx context.Context, job *J |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *AlertingService) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error { |
|
|
|
|
func (e *AlertEngine) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error { |
|
|
|
|
job.Running = false |
|
|
|
|
close(cancelChan) |
|
|
|
|
for cancelFn := range cancelChan { |
|
|
|
@ -149,7 +149,7 @@ func (e *AlertingService) endJob(err error, cancelChan chan context.CancelFunc, |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *AlertingService) processJob(attemptID int, attemptChan chan int, cancelChan chan context.CancelFunc, job *Job) { |
|
|
|
|
func (e *AlertEngine) processJob(attemptID int, attemptChan chan int, cancelChan chan context.CancelFunc, job *Job) { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
|
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1)) |
|
|
|
@ -180,8 +180,8 @@ func (e *AlertingService) processJob(attemptID int, attemptChan chan int, cancel |
|
|
|
|
|
|
|
|
|
e.evalHandler.Eval(evalContext) |
|
|
|
|
|
|
|
|
|
span.SetTag("alertId", evalContext.Rule.Id) |
|
|
|
|
span.SetTag("dashboardId", evalContext.Rule.DashboardId) |
|
|
|
|
span.SetTag("alertId", evalContext.Rule.ID) |
|
|
|
|
span.SetTag("dashboardId", evalContext.Rule.DashboardID) |
|
|
|
|
span.SetTag("firing", evalContext.Firing) |
|
|
|
|
span.SetTag("nodatapoints", evalContext.NoDataFound) |
|
|
|
|
span.SetTag("attemptID", attemptID) |
|
|
|
@ -194,7 +194,7 @@ func (e *AlertingService) processJob(attemptID int, attemptChan chan int, cancel |
|
|
|
|
) |
|
|
|
|
if attemptID < setting.AlertingMaxAttempts { |
|
|
|
|
span.Finish() |
|
|
|
|
e.log.Debug("Job Execution attempt triggered retry", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID) |
|
|
|
|
e.log.Debug("Job Execution attempt triggered retry", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.ID, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID) |
|
|
|
|
attemptChan <- (attemptID + 1) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
@ -212,7 +212,7 @@ func (e *AlertingService) processJob(attemptID int, attemptChan chan int, cancel |
|
|
|
|
evalContext.Rule.State = evalContext.GetNewState() |
|
|
|
|
e.resultHandler.handle(evalContext) |
|
|
|
|
span.Finish() |
|
|
|
|
e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID) |
|
|
|
|
e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.ID, "name", evalContext.Rule.Name, "firing", evalContext.Firing, "attemptID", attemptID) |
|
|
|
|
close(attemptChan) |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|