|
|
|
@ -11,7 +11,6 @@ import ( |
|
|
|
|
|
|
|
|
|
type Engine struct { |
|
|
|
|
execQueue chan *Job |
|
|
|
|
resultQueue chan *EvalContext |
|
|
|
|
clock clock.Clock |
|
|
|
|
ticker *Ticker |
|
|
|
|
scheduler Scheduler |
|
|
|
@ -25,7 +24,6 @@ func NewEngine() *Engine { |
|
|
|
|
e := &Engine{ |
|
|
|
|
ticker: NewTicker(time.Now(), time.Second*0, clock.New()), |
|
|
|
|
execQueue: make(chan *Job, 1000), |
|
|
|
|
resultQueue: make(chan *EvalContext, 1000), |
|
|
|
|
scheduler: NewScheduler(), |
|
|
|
|
evalHandler: NewEvalHandler(), |
|
|
|
|
ruleReader: NewRuleReader(), |
|
|
|
@ -39,23 +37,17 @@ func NewEngine() *Engine { |
|
|
|
|
func (e *Engine) Run(ctx context.Context) error { |
|
|
|
|
e.log.Info("Initializing Alerting") |
|
|
|
|
|
|
|
|
|
g, ctx := errgroup.WithContext(ctx) |
|
|
|
|
alertGroup, ctx := errgroup.WithContext(ctx) |
|
|
|
|
|
|
|
|
|
g.Go(func() error { return e.alertingTicker(ctx) }) |
|
|
|
|
g.Go(func() error { return e.execDispatcher(ctx) }) |
|
|
|
|
g.Go(func() error { return e.resultDispatcher(ctx) }) |
|
|
|
|
alertGroup.Go(func() error { return e.alertingTicker(ctx) }) |
|
|
|
|
alertGroup.Go(func() error { return e.runJobDispatcher(ctx) }) |
|
|
|
|
|
|
|
|
|
err := g.Wait() |
|
|
|
|
err := alertGroup.Wait() |
|
|
|
|
|
|
|
|
|
e.log.Info("Stopped Alerting", "reason", err) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) Stop() { |
|
|
|
|
close(e.execQueue) |
|
|
|
|
close(e.resultQueue) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) alertingTicker(grafanaCtx context.Context) error { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
@ -81,69 +73,58 @@ func (e *Engine) alertingTicker(grafanaCtx context.Context) error { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) execDispatcher(grafanaCtx context.Context) error { |
|
|
|
|
func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error { |
|
|
|
|
dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-grafanaCtx.Done(): |
|
|
|
|
close(e.resultQueue) |
|
|
|
|
return grafanaCtx.Err() |
|
|
|
|
return dispatcherGroup.Wait() |
|
|
|
|
case job := <-e.execQueue: |
|
|
|
|
go e.executeJob(grafanaCtx, job) |
|
|
|
|
dispatcherGroup.Go(func() error { return e.processJob(alertCtx, job) }) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error { |
|
|
|
|
var ( |
|
|
|
|
unfinishedWorkTimeout time.Duration = time.Second * 5 |
|
|
|
|
alertTimeout time.Duration = time.Second * 30 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
|
e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1)) |
|
|
|
|
e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1)) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
done := make(chan *EvalContext, 1) |
|
|
|
|
alertCtx, cancelFn := context.WithTimeout(context.TODO(), alertTimeout) |
|
|
|
|
|
|
|
|
|
job.Running = true |
|
|
|
|
evalContext := NewEvalContext(alertCtx, job.Rule) |
|
|
|
|
|
|
|
|
|
done := make(chan struct{}) |
|
|
|
|
|
|
|
|
|
go func() { |
|
|
|
|
job.Running = true |
|
|
|
|
context := NewEvalContext(job.Rule) |
|
|
|
|
e.evalHandler.Eval(context) |
|
|
|
|
job.Running = false |
|
|
|
|
done <- context |
|
|
|
|
e.evalHandler.Eval(evalContext) |
|
|
|
|
e.resultHandler.Handle(evalContext) |
|
|
|
|
close(done) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
var err error = nil |
|
|
|
|
select { |
|
|
|
|
|
|
|
|
|
case <-grafanaCtx.Done(): |
|
|
|
|
return grafanaCtx.Err() |
|
|
|
|
case evalContext := <-done: |
|
|
|
|
e.resultQueue <- evalContext |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) resultDispatcher(grafanaCtx context.Context) error { |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-grafanaCtx.Done(): |
|
|
|
|
//handle all responses before shutting down.
|
|
|
|
|
for result := range e.resultQueue { |
|
|
|
|
e.handleResponse(result) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return grafanaCtx.Err() |
|
|
|
|
case result := <-e.resultQueue: |
|
|
|
|
e.handleResponse(result) |
|
|
|
|
case <-time.After(unfinishedWorkTimeout): |
|
|
|
|
cancelFn() |
|
|
|
|
err = grafanaCtx.Err() |
|
|
|
|
case <-done: |
|
|
|
|
} |
|
|
|
|
case <-done: |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *Engine) handleResponse(result *EvalContext) { |
|
|
|
|
defer func() { |
|
|
|
|
if err := recover(); err != nil { |
|
|
|
|
e.log.Error("Panic in resultDispatcher", "error", err, "stack", log.Stack(1)) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "firing", result.Firing) |
|
|
|
|
e.resultHandler.Handle(result) |
|
|
|
|
e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing) |
|
|
|
|
job.Running = false |
|
|
|
|
cancelFn() |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|