@@ -30,7 +30,7 @@ import (
 // Rule represents a single piece of work that is executed periodically by the ruler.
 type Rule interface {
 	// Run creates the resources that will perform the rule's work, and starts it. It blocks indefinitely, until Stop is called or another signal is sent.
-	Run(key ngmodels.AlertRuleKey) error
+	Run() error
 	// Stop shuts down the rule's execution with an optional reason. It has no effect if the rule has not yet been Run.
 	Stop(reason error)
 	// Eval sends a signal to execute the work represented by the rule, exactly one time.
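With this change the rule key is bound when the rule value is constructed, so callers drive the whole lifecycle through the `Rule` value alone. A minimal usage sketch, assuming it sits in the same package as `Rule` and `Evaluation`; the `startAndStop` helper is illustrative and not part of this diff:

```go
// Illustrative only: drives the reworked lifecycle. Run no longer takes a key,
// Eval asks for exactly one evaluation, Stop records an optional reason.
func startAndStop(rule Rule, eval *Evaluation, reason error) {
	go func() {
		// Blocks until Stop is called or the rule's context is cancelled.
		_ = rule.Run()
	}()
	queued, dropped := rule.Eval(eval) // queued is false if the routine is shutting down
	fmt.Println("queued:", queued, "replaced a pending evaluation:", dropped != nil)
	rule.Stop(reason) // e.g. when the rule is deleted or updated
}
```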
@@ -67,6 +67,7 @@ func newRuleFactory(
 		if rule.Type() == ngmodels.RuleTypeRecording {
 			return newRecordingRule(
 				ctx,
+				rule.GetKey(),
 				maxAttempts,
 				clock,
 				evalFactory,
@@ -79,6 +80,7 @@ func newRuleFactory(
 		}
 		return newAlertRule(
 			ctx,
+			rule.GetKey(),
 			appURL,
 			disableGrafanaFolder,
 			maxAttempts,
@@ -104,6 +106,8 @@ type ruleProvider interface {
 }
 
 type alertRule struct {
+	key ngmodels.AlertRuleKey
+
 	evalCh   chan *Evaluation
 	updateCh chan RuleVersionAndPauseStatus
 	ctx      context.Context
@@ -130,6 +134,7 @@ type alertRule struct {
 
 func newAlertRule(
 	parent context.Context,
+	key ngmodels.AlertRuleKey,
 	appURL *url.URL,
 	disableGrafanaFolder bool,
 	maxAttempts int64,
@@ -144,8 +149,9 @@ func newAlertRule(
 	evalAppliedHook func(ngmodels.AlertRuleKey, time.Time),
 	stopAppliedHook func(ngmodels.AlertRuleKey),
 ) *alertRule {
-	ctx, stop := util.WithCancelCause(parent)
+	ctx, stop := util.WithCancelCause(ngmodels.WithRuleKey(parent, key))
 	return &alertRule{
+		key:      key,
 		evalCh:   make(chan *Evaluation),
 		updateCh: make(chan RuleVersionAndPauseStatus),
 		ctx:      ctx,
@@ -161,7 +167,7 @@ func newAlertRule(
 		evalAppliedHook: evalAppliedHook,
 		stopAppliedHook: stopAppliedHook,
 		metrics:         met,
-		logger:          logger,
+		logger:          logger.FromContext(ctx),
 		tracer:          tracer,
 	}
 }
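These two hunks carry the core of the refactor: the rule key is stamped onto the routine's context once, at construction, and the logger is derived from that context, so the key no longer has to be threaded through every method. A self-contained sketch of the same wiring using only the standard library; `ruleKey`, `ruleKeyCtxKey`, and `newRuleContext` are hypothetical names standing in for `ngmodels.AlertRuleKey`, `ngmodels.WithRuleKey`, and `util.WithCancelCause`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ruleKey stands in for ngmodels.AlertRuleKey.
type ruleKey struct {
	OrgID int64
	UID   string
}

type ruleKeyCtxKey struct{}

// newRuleContext mirrors the pattern above: attach the key to the context once,
// then wrap it so the eventual cancellation can carry a reason (Stop(reason)).
func newRuleContext(parent context.Context, key ruleKey) (context.Context, context.CancelCauseFunc) {
	ctx := context.WithValue(parent, ruleKeyCtxKey{}, key)
	return context.WithCancelCause(ctx)
}

func main() {
	ctx, stop := newRuleContext(context.Background(), ruleKey{OrgID: 1, UID: "abc123"})
	stop(errors.New("rule deleted"))
	// Both the key and the stop reason are now recoverable from the context.
	fmt.Println(ctx.Value(ruleKeyCtxKey{}), context.Cause(ctx))
}
```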
@@ -174,6 +180,11 @@ func newAlertRule(
 //
 // the second element contains a dropped message that was sent by a concurrent sender.
 func (a *alertRule) Eval(eval *Evaluation) (bool, *Evaluation) {
+	if a.key != eval.rule.GetKey() {
+		// Make sure that rule has the same key. This should not happen
+		a.logger.Error("Invalid rule sent for evaluating. Skipping", "ruleKeyToEvaluate", eval.rule.GetKey().String())
+		return false, eval
+	}
 	// read the channel in unblocking manner to make sure that there is no concurrent send operation.
 	var droppedMsg *Evaluation
 	select {
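The new key check sits in front of the send protocol the comments above allude to: Eval first drains any message left on the channel by a concurrent sender with a non-blocking read and reports it as dropped, then sends the fresh evaluation unless the routine is shutting down. A stand-alone sketch of that channel pattern, with illustrative names (`evaluation`, `tryEval`) rather than the Grafana types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// evaluation stands in for the package's *Evaluation message.
type evaluation struct{ scheduledAt time.Time }

// tryEval drains a stale message without blocking, then sends the fresh one
// unless the consuming routine has already stopped.
func tryEval(ctx context.Context, ch chan *evaluation, next *evaluation) (sent bool, dropped *evaluation) {
	select {
	case dropped = <-ch: // claim whatever a concurrent sender left behind
	default:
	}
	select {
	case ch <- next:
		return true, dropped
	case <-ctx.Done():
		return false, dropped
	}
}

func main() {
	ch := make(chan *evaluation, 1) // buffered only to keep the demo self-contained
	sent, dropped := tryEval(context.Background(), ch, &evaluation{scheduledAt: time.Now()})
	fmt.Println(sent, dropped)
}
```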
@@ -214,13 +225,13 @@ func (a *alertRule) Stop(reason error) {
 	}
 }
 
-func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
-	grafanaCtx := ngmodels.WithRuleKey(a.ctx, key)
-	logger := a.logger.FromContext(grafanaCtx)
+func (a *alertRule) Run() error {
+	grafanaCtx := a.ctx
+	logger := a.logger
 	logger.Debug("Alert rule routine started")
 
 	var currentFingerprint fingerprint
-	defer a.stopApplied(key)
+	defer a.stopApplied()
 	for {
 		select {
 		// used by external services (API) to notify that rule is updated.
@@ -232,7 +243,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
 
 			logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
 			// clear the state. So the next evaluation will start from the scratch.
-			a.resetState(grafanaCtx, key, ctx.IsPaused)
+			a.resetState(grafanaCtx, ctx.IsPaused)
 			currentFingerprint = ctx.Fingerprint
 		// evalCh - used by the scheduler to signal that evaluation is needed.
 		case ctx, ok := <-a.evalCh:
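Pieced together, the case comments in these Run hunks describe a routine with three inputs: updateCh (the API changed or paused the rule), evalCh (the scheduler requests an evaluation), and the rule's own context (shutdown via Stop). A stand-alone sketch of that loop shape with stand-in types and the case bodies reduced to comments; only the structure is taken from the diff:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type update struct{ isPaused bool }
type evaluation struct{}

// runLoop mirrors the three-way select that Run is built around.
func runLoop(ctx context.Context, updateCh <-chan update, evalCh <-chan *evaluation) error {
	for {
		select {
		case u := <-updateCh:
			// rule changed or was paused: reset state unless nothing relevant changed
			fmt.Println("update received, paused:", u.isPaused)
		case ev, ok := <-evalCh:
			if !ok {
				return nil // evaluation channel closed; nothing more to schedule
			}
			// evaluate with retries, record metrics, send the resulting alerts
			fmt.Println("evaluation requested", ev)
		case <-ctx.Done():
			// Stop was called: clean up state, then exit
			fmt.Println("stopping:", context.Cause(ctx))
			return nil
		}
	}
}

func main() {
	ctx, stop := context.WithCancelCause(context.Background())
	stop(errors.New("rule deleted"))
	_ = runLoop(ctx, make(chan update), make(chan *evaluation))
}
```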
@@ -242,14 +253,14 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
 			}
 
 			func() {
-				orgID := fmt.Sprint(key.OrgID)
+				orgID := fmt.Sprint(a.key.OrgID)
 				evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID)
 				evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID)
 
 				evalStart := a.clock.Now()
 				defer func() {
 					evalDuration.Observe(a.clock.Now().Sub(evalStart).Seconds())
-					a.evalApplied(key, ctx.scheduledAt)
+					a.evalApplied(ctx.scheduledAt)
 				}()
 
 				for attempt := int64(1); attempt <= a.maxAttempts; attempt++ {
@@ -266,7 +277,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
 					// lingers in DB and won't be cleaned up until next alert rule update.
 					needReset = needReset || (currentFingerprint == 0 && isPaused)
 					if needReset {
-						a.resetState(grafanaCtx, key, isPaused)
+						a.resetState(grafanaCtx, isPaused)
 					}
 					currentFingerprint = f
 					if isPaused {
@@ -298,7 +309,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
 					}
 
 					retry := attempt < a.maxAttempts
-					err := a.evaluate(tracingCtx, key, f, attempt, ctx, span, retry)
+					err := a.evaluate(tracingCtx, f, attempt, ctx, span, retry)
 					// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors
 					// we return nil - so technically, this is meaningless to know whether the evaluation has errors or not.
 					span.End()
@@ -325,8 +336,8 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
 				// cases.
 				ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
 				defer cancelFunc()
-				states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
-				a.expireAndSend(grafanaCtx, key, states)
+				states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key), a.key, ngmodels.StateReasonRuleDeleted)
+				a.expireAndSend(grafanaCtx, states)
 			}
 			logger.Debug("Stopping alert rule routine")
 			return nil
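One detail in this hunk is easy to miss: when this branch runs, grafanaCtx is already cancelled (the lone "// cases." line is the tail of a longer comment about bounding the cleanup), so the state deletion runs under a fresh, time-boxed context rather than the cancelled one. The pattern in isolation, as a hedged sketch assuming the standard context and time packages; cleanupAfterStop and its parameter are illustrative names:

```go
// cleanupAfterStop shows the pattern used above: the rule's own context is
// already cancelled, so cleanup work gets a detached context with a deadline.
func cleanupAfterStop(deleteStates func(context.Context)) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	deleteStates(ctx) // e.g. delete states and expire the corresponding alerts
}
```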
@@ -334,8 +345,8 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
 	}
 }
 
-func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f fingerprint, attempt int64, e *Evaluation, span trace.Span, retry bool) error {
-	orgID := fmt.Sprint(key.OrgID)
+func (a *alertRule) evaluate(ctx context.Context, f fingerprint, attempt int64, e *Evaluation, span trace.Span, retry bool) error {
+	orgID := fmt.Sprint(a.key.OrgID)
 	evalAttemptTotal := a.metrics.EvalAttemptTotal.WithLabelValues(orgID)
 	evalAttemptFailures := a.metrics.EvalAttemptFailures.WithLabelValues(orgID)
 	evalTotalFailures := a.metrics.EvalFailures.WithLabelValues(orgID)
@@ -419,7 +430,7 @@ func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f f
 		state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
 		func(ctx context.Context, statesToSend state.StateTransitions) {
 			start := a.clock.Now()
-			alerts := a.send(ctx, key, statesToSend)
+			alerts := a.send(ctx, statesToSend)
 			span.AddEvent("results sent", trace.WithAttributes(
 				attribute.Int64("alerts_sent", int64(len(alerts.PostableAlerts))),
 			))
@@ -432,52 +443,52 @@ func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f f
 }
 
 // send sends alerts for the given state transitions.
-func (a *alertRule) send(ctx context.Context, key ngmodels.AlertRuleKey, states state.StateTransitions) definitions.PostableAlerts {
+func (a *alertRule) send(ctx context.Context, states state.StateTransitions) definitions.PostableAlerts {
 	alerts := definitions.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(states))}
 	for _, alertState := range states {
 		alerts.PostableAlerts = append(alerts.PostableAlerts, *state.StateToPostableAlert(alertState, a.appURL))
 	}
 
 	if len(alerts.PostableAlerts) > 0 {
-		a.sender.Send(ctx, key, alerts)
+		a.sender.Send(ctx, a.key, alerts)
 	}
 	return alerts
 }
 
 // sendExpire sends alerts to expire all previously firing alerts in the provided state transitions.
-func (a *alertRule) expireAndSend(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) {
+func (a *alertRule) expireAndSend(ctx context.Context, states []state.StateTransition) {
 	expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
 	if len(expiredAlerts.PostableAlerts) > 0 {
-		a.sender.Send(ctx, key, expiredAlerts)
+		a.sender.Send(ctx, a.key, expiredAlerts)
 	}
 }
 
-func (a *alertRule) resetState(ctx context.Context, key ngmodels.AlertRuleKey, isPaused bool) {
-	rule := a.ruleProvider.get(key)
+func (a *alertRule) resetState(ctx context.Context, isPaused bool) {
+	rule := a.ruleProvider.get(a.key)
 	reason := ngmodels.StateReasonUpdated
 	if isPaused {
 		reason = ngmodels.StateReasonPaused
 	}
 	states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason)
-	a.expireAndSend(ctx, key, states)
+	a.expireAndSend(ctx, states)
 }
 
 // evalApplied is only used on tests.
-func (a *alertRule) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
+func (a *alertRule) evalApplied(now time.Time) {
 	if a.evalAppliedHook == nil {
 		return
 	}
 
-	a.evalAppliedHook(alertDefKey, now)
+	a.evalAppliedHook(a.key, now)
}
 
 // stopApplied is only used on tests.
-func (a *alertRule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
+func (a *alertRule) stopApplied() {
 	if a.stopAppliedHook == nil {
 		return
 	}
 
-	a.stopAppliedHook(alertDefKey)
+	a.stopAppliedHook(a.key)
 }
 
 func SchedulerUserFor(orgID int64) *user.SignedInUser {