diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 33de353846d..ac35540ca45 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -40,6 +40,7 @@ import ( "github.com/grafana/grafana/pkg/services/ngalert/state" "github.com/grafana/grafana/pkg/services/ngalert/state/historian" "github.com/grafana/grafana/pkg/services/ngalert/store" + "github.com/grafana/grafana/pkg/services/ngalert/writer" "github.com/grafana/grafana/pkg/services/notifications" "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore" "github.com/grafana/grafana/pkg/services/quota" @@ -303,6 +304,8 @@ func (ng *AlertNG) init() error { AlertSender: alertsRouter, Tracer: ng.tracer, Log: log.New("ngalert.scheduler"), + //TODO: replace with real writer impl + RecordingWriter: writer.FakeWriter{}, } // There are a set of feature toggles available that act as short-circuits for common configurations. diff --git a/pkg/services/ngalert/schedule/alert_rule.go b/pkg/services/ngalert/schedule/alert_rule.go index 6c9477e58f4..6ffedf58668 100644 --- a/pkg/services/ngalert/schedule/alert_rule.go +++ b/pkg/services/ngalert/schedule/alert_rule.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/grafana/pkg/services/ngalert/metrics" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/state" + "github.com/grafana/grafana/pkg/services/ngalert/writer" "github.com/grafana/grafana/pkg/services/org" "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/util" @@ -56,6 +57,7 @@ func newRuleFactory( met *metrics.Scheduler, logger log.Logger, tracer tracing.Tracer, + recordingWriter writer.Writer, evalAppliedHook evalAppliedFunc, stopAppliedHook stopAppliedFunc, ) ruleFactoryFunc { @@ -70,6 +72,7 @@ func newRuleFactory( logger, met, tracer, + recordingWriter, ) } return newAlertRule( diff --git a/pkg/services/ngalert/schedule/alert_rule_test.go b/pkg/services/ngalert/schedule/alert_rule_test.go index cc5f2466800..49b118e0b75 100644 --- a/pkg/services/ngalert/schedule/alert_rule_test.go +++ b/pkg/services/ngalert/schedule/alert_rule_test.go @@ -765,5 +765,5 @@ func TestRuleRoutine(t *testing.T) { } func ruleFactoryFromScheduler(sch *schedule) ruleFactory { - return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.featureToggles, sch.metrics, sch.log, sch.tracer, sch.evalAppliedFunc, sch.stopAppliedFunc) + return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.featureToggles, sch.metrics, sch.log, sch.tracer, sch.recordingWriter, sch.evalAppliedFunc, sch.stopAppliedFunc) } diff --git a/pkg/services/ngalert/schedule/recording_rule.go b/pkg/services/ngalert/schedule/recording_rule.go index 30c71f2e138..94ff252c6a7 100644 --- a/pkg/services/ngalert/schedule/recording_rule.go +++ b/pkg/services/ngalert/schedule/recording_rule.go @@ -7,12 +7,14 @@ import ( "github.com/benbjohnson/clock" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" + "github.com/grafana/grafana/pkg/services/ngalert/writer" "github.com/grafana/grafana/pkg/util" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -36,9 +38,11 @@ type recordingRule struct { logger log.Logger metrics *metrics.Scheduler tracer tracing.Tracer + + writer writer.Writer } -func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clock, evalFactory eval.EvaluatorFactory, ft featuremgmt.FeatureToggles, logger log.Logger, metrics *metrics.Scheduler, tracer tracing.Tracer) *recordingRule { +func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clock, evalFactory eval.EvaluatorFactory, ft featuremgmt.FeatureToggles, logger log.Logger, metrics *metrics.Scheduler, tracer tracing.Tracer, writer writer.Writer) *recordingRule { ctx, stop := util.WithCancelCause(parent) return &recordingRule{ ctx: ctx, @@ -51,6 +55,7 @@ func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clo logger: logger, metrics: metrics, tracer: tracer, + writer: writer, } } @@ -164,10 +169,10 @@ func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logge evalAttemptFailures := r.metrics.EvalAttemptFailures.WithLabelValues(orgID) evalTotalFailures := r.metrics.EvalFailures.WithLabelValues(orgID) - start := r.clock.Now() + evalStart := r.clock.Now() evalCtx := eval.NewContext(ctx, SchedulerUserFor(ev.rule.OrgID)) result, err := r.buildAndExecutePipeline(ctx, evalCtx, ev, logger) - dur := r.clock.Now().Sub(start) + evalDur := r.clock.Now().Sub(evalStart) evalAttemptTotal.Inc() span := trace.SpanFromContext(ctx) @@ -184,10 +189,33 @@ func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logge return fmt.Errorf("server side expressions pipeline returned an error: %w", err) } - logger.Debug("Alert rule evaluated", "results", result, "duration", dur) + logger.Debug("Alert rule evaluated", "results", result, "duration", evalDur) span.AddEvent("rule evaluated", trace.WithAttributes( attribute.Int64("results", int64(len(result.Responses))), )) + + frames, err := r.frameRef(ev.rule.Record.From, result) + if err != nil { + span.SetStatus(codes.Error, "failed to extract frames from rule evaluation") + span.RecordError(err) + return fmt.Errorf("failed to extract frames from rule evaluation: %w", err) + } + + writeStart := r.clock.Now() + err = r.writer.Write(ctx, ev.rule.Record.Metric, writeStart, frames, ev.rule.Labels) + writeDur := r.clock.Now().Sub(writeStart) + + if err != nil { + span.SetStatus(codes.Error, "failed to write metrics") + span.RecordError(err) + return fmt.Errorf("metric remote write failed: %w", err) + } + + logger.Debug("Metrics written", "duration", writeDur) + span.AddEvent("metrics written", trace.WithAttributes( + attribute.Int64("frames", int64(len(frames))), + )) + return nil } @@ -212,3 +240,17 @@ func (r *recordingRule) evaluationDoneTestHook(ev *Evaluation) { r.evalAppliedHook(ev.rule.GetKey(), ev.scheduledAt) } + +func (r *recordingRule) frameRef(refID string, resp *backend.QueryDataResponse) (data.Frames, error) { + if len(resp.Responses) == 0 { + return nil, fmt.Errorf("no responses returned from rule evaluation") + } + + for ref, resp := range resp.Responses { + if ref == refID { + return resp.Frames, nil + } + } + + return nil, fmt.Errorf("no response with refID %s found in rule evaluation", refID) +} diff --git a/pkg/services/ngalert/schedule/recording_rule_test.go b/pkg/services/ngalert/schedule/recording_rule_test.go index e1c529eadc7..82c9f6bad38 100644 --- a/pkg/services/ngalert/schedule/recording_rule_test.go +++ b/pkg/services/ngalert/schedule/recording_rule_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/featuremgmt" models "github.com/grafana/grafana/pkg/services/ngalert/models" + "github.com/grafana/grafana/pkg/services/ngalert/writer" "github.com/grafana/grafana/pkg/util" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" @@ -146,7 +147,7 @@ func TestRecordingRule(t *testing.T) { func blankRecordingRuleForTests(ctx context.Context) *recordingRule { ft := featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules) - return newRecordingRule(context.Background(), 0, nil, nil, ft, log.NewNopLogger(), nil, nil) + return newRecordingRule(context.Background(), 0, nil, nil, ft, log.NewNopLogger(), nil, nil, writer.FakeWriter{}) } func TestRecordingRule_Integration(t *testing.T) { diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index 16ecc8b5f81..5574e52d715 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/grafana/pkg/services/ngalert/metrics" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/state" + "github.com/grafana/grafana/pkg/services/ngalert/writer" "github.com/grafana/grafana/pkg/util/ticker" ) @@ -92,6 +93,8 @@ type schedule struct { schedulableAlertRules alertRulesRegistry tracer tracing.Tracer + + recordingWriter writer.Writer } // SchedulerCfg is the scheduler configuration. @@ -110,6 +113,7 @@ type SchedulerCfg struct { AlertSender AlertsSender Tracer tracing.Tracer Log log.Logger + RecordingWriter writer.Writer } // NewScheduler returns a new scheduler. @@ -138,6 +142,7 @@ func NewScheduler(cfg SchedulerCfg, stateManager *state.Manager) *schedule { schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)}, alertsSender: cfg.AlertSender, tracer: cfg.Tracer, + recordingWriter: cfg.RecordingWriter, } return &sch @@ -254,6 +259,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. sch.metrics, sch.log, sch.tracer, + sch.recordingWriter, sch.evalAppliedFunc, sch.stopAppliedFunc, ) diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index 8e440c90324..810d8ca389c 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/state" + "github.com/grafana/grafana/pkg/services/ngalert/writer" "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore" "github.com/grafana/grafana/pkg/setting" ) @@ -459,6 +460,8 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor MaxAttempts: 1, } + fakeRecordingWriter := writer.FakeWriter{} + schedCfg := SchedulerCfg{ BaseInterval: cfg.BaseInterval, MaxAttempts: cfg.MaxAttempts, @@ -471,6 +474,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor AlertSender: senderMock, Tracer: testTracer, Log: log.New("ngalert.scheduler"), + RecordingWriter: fakeRecordingWriter, } managerCfg := state.ManagerCfg{ Metrics: m.GetStateMetrics(), diff --git a/pkg/services/ngalert/writer/fake.go b/pkg/services/ngalert/writer/fake.go new file mode 100644 index 00000000000..44791cada02 --- /dev/null +++ b/pkg/services/ngalert/writer/fake.go @@ -0,0 +1,20 @@ +package writer + +import ( + "context" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type FakeWriter struct { + WriteFunc func(ctx context.Context, name string, t time.Time, frames data.Frames, extraLabels map[string]string) error +} + +func (w FakeWriter) Write(ctx context.Context, name string, t time.Time, frames data.Frames, extraLabels map[string]string) error { + if w.WriteFunc == nil { + return nil + } + + return w.WriteFunc(ctx, name, t, frames, extraLabels) +} diff --git a/pkg/services/ngalert/writer/writer.go b/pkg/services/ngalert/writer/writer.go new file mode 100644 index 00000000000..8c52603dd12 --- /dev/null +++ b/pkg/services/ngalert/writer/writer.go @@ -0,0 +1,12 @@ +package writer + +import ( + "context" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" +) + +type Writer interface { + Write(ctx context.Context, name string, t time.Time, frames data.Frames, extraLabels map[string]string) error +}