|
|
|
@ -16,7 +16,7 @@ import ( |
|
|
|
|
"golang.org/x/sync/errgroup" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type Engine struct { |
|
|
|
|
type AlertingEngine struct { |
|
|
|
|
execQueue chan *Job |
|
|
|
|
//clock clock.Clock
|
|
|
|
|
ticker *Ticker |
|
|
|
@ -28,20 +28,20 @@ type Engine struct { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func init() { |
|
|
|
|
registry.RegisterService(&Engine{}) |
|
|
|
|
registry.RegisterService(&AlertingEngine{}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewEngine() *Engine { |
|
|
|
|
e := &Engine{} |
|
|
|
|
func NewEngine() *AlertingEngine { |
|
|
|
|
e := &AlertingEngine{} |
|
|
|
|
e.Init() |
|
|
|
|
return e |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) IsDisabled() bool { |
|
|
|
|
func (e *AlertingEngine) IsDisabled() bool { |
|
|
|
|
return !setting.AlertingEnabled || !setting.ExecuteAlerts |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) Init() error { |
|
|
|
|
func (e *AlertingEngine) Init() error { |
|
|
|
|
e.ticker = NewTicker(time.Now(), time.Second*0, clock.New()) |
|
|
|
|
e.execQueue = make(chan *Job, 1000) |
|
|
|
|
e.scheduler = NewScheduler() |
|
|
|
@ -52,7 +52,7 @@ func (e *Engine) Init() error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) Run(ctx context.Context) error { |
|
|
|
|
func (e *AlertingEngine) Run(ctx context.Context) error { |
|
|
|
|
alertGroup, ctx := errgroup.WithContext(ctx) |
|
|
|
|
alertGroup.Go(func() error { return e.alertingTicker(ctx) }) |
|
|
|
|
alertGroup.Go(func() error { return e.runJobDispatcher(ctx) }) |
|
|
|
@ -61,7 +61,7 @@ func (e *Engine) Run(ctx context.Context) error { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) alertingTicker(grafanaCtx context.Context) error { |
|
|
|
|
func (e *AlertingEngine) alertingTicker(grafanaCtx context.Context) error { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
|
e.log.Error("Scheduler Panic: stopping alertingTicker", "error", err, "stack", log.Stack(1)) |
|
|
|
@ -86,7 +86,7 @@ func (e *Engine) alertingTicker(grafanaCtx context.Context) error { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error { |
|
|
|
|
func (e *AlertingEngine) runJobDispatcher(grafanaCtx context.Context) error { |
|
|
|
|
dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
@ -106,7 +106,7 @@ var ( |
|
|
|
|
alertMaxAttempts = 3 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func (e *Engine) processJobWithRetry(grafanaCtx context.Context, job *Job) error { |
|
|
|
|
func (e *AlertingEngine) processJobWithRetry(grafanaCtx context.Context, job *Job) error { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
|
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1)) |
|
|
|
@ -141,7 +141,7 @@ func (e *Engine) processJobWithRetry(grafanaCtx context.Context, job *Job) error |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error { |
|
|
|
|
func (e *AlertingEngine) endJob(err error, cancelChan chan context.CancelFunc, job *Job) error { |
|
|
|
|
job.Running = false |
|
|
|
|
close(cancelChan) |
|
|
|
|
for cancelFn := range cancelChan { |
|
|
|
@ -150,7 +150,7 @@ func (e *Engine) endJob(err error, cancelChan chan context.CancelFunc, job *Job) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) processJob(attemptID int, attemptChan chan int, cancelChan chan context.CancelFunc, job *Job) { |
|
|
|
|
func (e *AlertingEngine) processJob(attemptID int, attemptChan chan int, cancelChan chan context.CancelFunc, job *Job) { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
|
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1)) |
|
|
|
|