From c38f6ff1827d47d8d087ac0a3c177900d38f58d0 Mon Sep 17 00:00:00 2001 From: Carl Bergquist Date: Mon, 3 Oct 2016 09:38:03 +0200 Subject: [PATCH] Make alerting notifcations sync (#6158) * tech(routines): move the async logic from notification to alerting notifier * tech(notification): reduce code dupe * fix(notification): dont touch the response unless its an error * feat(alerting): make alerting exeuction async but flow sync * tech(alerting): remove commented code * tech(alerting): remove unused code * tech(alerting): fix typo * tech(alerting): implement Context on EvalContext * tech(alerting): wait for all alerts to return * feat(alerting): dont allow alert responses to cancel * Revert "feat(alerting): dont allow alert responses to cancel" This reverts commit 324b006c96687da18a542942f39c10c99119430c. * feat(alerting): give alerts some time to finish before closing down --- pkg/api/metrics.go | 3 +- pkg/bus/bus.go | 40 +++++++++ pkg/models/notifications.go | 11 +++ pkg/services/alerting/conditions/query.go | 2 +- .../alerting/conditions/query_test.go | 3 +- pkg/services/alerting/engine.go | 89 ++++++++----------- pkg/services/alerting/eval_context.go | 26 ++++-- pkg/services/alerting/eval_handler.go | 46 +--------- pkg/services/alerting/eval_handler_test.go | 9 +- pkg/services/alerting/interfaces.go | 8 +- pkg/services/alerting/notifier.go | 20 +++-- pkg/services/alerting/notifier_test.go | 2 +- pkg/services/alerting/notifiers/email.go | 40 +++++---- pkg/services/alerting/notifiers/slack.go | 35 ++++---- pkg/services/alerting/notifiers/webhook.go | 24 ++--- pkg/services/alerting/result_handler.go | 56 ++++++------ pkg/services/alerting/test_notification.go | 7 +- pkg/services/alerting/test_rule.go | 3 +- pkg/services/notifications/mailer.go | 50 +++++++++++ pkg/services/notifications/notifications.go | 60 ++++++------- .../notifications/notifications_test.go | 3 +- pkg/services/notifications/webhook.go | 28 +++--- pkg/tsdb/batch.go | 13 +-- pkg/tsdb/executor.go | 4 +- pkg/tsdb/fake_test.go | 4 +- pkg/tsdb/graphite/graphite.go | 12 ++- pkg/tsdb/prometheus/prometheus.go | 6 +- pkg/tsdb/request.go | 10 ++- pkg/tsdb/testdata/testdata.go | 4 +- pkg/tsdb/tsdb_test.go | 11 +-- 30 files changed, 356 insertions(+), 273 deletions(-) diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index 0fa6003d67a..1655d14e014 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -1,6 +1,7 @@ package api import ( + "context" "encoding/json" "net/http" @@ -31,7 +32,7 @@ func QueryMetrics(c *middleware.Context, reqDto dtos.MetricRequest) Response { }) } - resp, err := tsdb.HandleRequest(request) + resp, err := tsdb.HandleRequest(context.TODO(), request) if err != nil { return ApiError(500, "Metric request error", err) } diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 6eb4b741a27..55188b2bc73 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -1,18 +1,22 @@ package bus import ( + "context" "fmt" "reflect" ) type HandlerFunc interface{} +type CtxHandlerFunc func() type Msg interface{} type Bus interface { Dispatch(msg Msg) error + DispatchCtx(ctx context.Context, msg Msg) error Publish(msg Msg) error AddHandler(handler HandlerFunc) + AddCtxHandler(handler HandlerFunc) AddEventListener(handler HandlerFunc) AddWildcardListener(handler HandlerFunc) } @@ -34,6 +38,27 @@ func New() Bus { return bus } +func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { + var msgName = reflect.TypeOf(msg).Elem().Name() + + var handler = b.handlers[msgName] + if handler == nil { + return fmt.Errorf("handler not found for %s", msgName) + } + + var params = make([]reflect.Value, 2) + params[0] = reflect.ValueOf(ctx) + params[1] = reflect.ValueOf(msg) + + ret := reflect.ValueOf(handler).Call(params) + err := ret[0].Interface() + if err == nil { + return nil + } else { + return err.(error) + } +} + func (b *InProcBus) Dispatch(msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() @@ -90,6 +115,12 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) { b.handlers[queryTypeName] = handler } +func (b *InProcBus) AddCtxHandler(handler HandlerFunc) { + handlerType := reflect.TypeOf(handler) + queryTypeName := handlerType.In(1).Elem().Name() + b.handlers[queryTypeName] = handler +} + func (b *InProcBus) AddEventListener(handler HandlerFunc) { handlerType := reflect.TypeOf(handler) eventName := handlerType.In(0).Elem().Name() @@ -105,6 +136,11 @@ func AddHandler(implName string, handler HandlerFunc) { globalBus.AddHandler(handler) } +// Package level functions +func AddCtxHandler(implName string, handler HandlerFunc) { + globalBus.AddCtxHandler(handler) +} + // Package level functions func AddEventListener(handler HandlerFunc) { globalBus.AddEventListener(handler) @@ -118,6 +154,10 @@ func Dispatch(msg Msg) error { return globalBus.Dispatch(msg) } +func DispatchCtx(ctx context.Context, msg Msg) error { + return globalBus.DispatchCtx(ctx, msg) +} + func Publish(msg Msg) error { return globalBus.Publish(msg) } diff --git a/pkg/models/notifications.go b/pkg/models/notifications.go index d357b9cf562..759efe41a02 100644 --- a/pkg/models/notifications.go +++ b/pkg/models/notifications.go @@ -12,6 +12,10 @@ type SendEmailCommand struct { Info string } +type SendEmailCommandSync struct { + SendEmailCommand +} + type SendWebhook struct { Url string User string @@ -19,6 +23,13 @@ type SendWebhook struct { Body string } +type SendWebhookSync struct { + Url string + User string + Password string + Body string +} + type SendResetPasswordEmailCommand struct { User *User } diff --git a/pkg/services/alerting/conditions/query.go b/pkg/services/alerting/conditions/query.go index b5300a261a3..b37123df5c3 100644 --- a/pkg/services/alerting/conditions/query.go +++ b/pkg/services/alerting/conditions/query.go @@ -82,7 +82,7 @@ func (c *QueryCondition) executeQuery(context *alerting.EvalContext, timeRange * req := c.getRequestForAlertRule(getDsInfo.Result, timeRange) result := make(tsdb.TimeSeriesSlice, 0) - resp, err := c.HandleRequest(req) + resp, err := c.HandleRequest(context.Context, req) if err != nil { return nil, fmt.Errorf("tsdb.HandleRequest() error %v", err) } diff --git a/pkg/services/alerting/conditions/query_test.go b/pkg/services/alerting/conditions/query_test.go index 51c4226f81c..43e0381a80c 100644 --- a/pkg/services/alerting/conditions/query_test.go +++ b/pkg/services/alerting/conditions/query_test.go @@ -1,6 +1,7 @@ package conditions import ( + "context" "testing" null "gopkg.in/guregu/null.v3" @@ -137,7 +138,7 @@ func (ctx *queryConditionTestContext) exec() { ctx.condition = condition - condition.HandleRequest = func(req *tsdb.Request) (*tsdb.Response, error) { + condition.HandleRequest = func(context context.Context, req *tsdb.Request) (*tsdb.Response, error) { return &tsdb.Response{ Results: map[string]*tsdb.QueryResult{ "A": {Series: ctx.series}, diff --git a/pkg/services/alerting/engine.go b/pkg/services/alerting/engine.go index 10b8af64119..dd6e88294a8 100644 --- a/pkg/services/alerting/engine.go +++ b/pkg/services/alerting/engine.go @@ -11,7 +11,6 @@ import ( type Engine struct { execQueue chan *Job - resultQueue chan *EvalContext clock clock.Clock ticker *Ticker scheduler Scheduler @@ -25,7 +24,6 @@ func NewEngine() *Engine { e := &Engine{ ticker: NewTicker(time.Now(), time.Second*0, clock.New()), execQueue: make(chan *Job, 1000), - resultQueue: make(chan *EvalContext, 1000), scheduler: NewScheduler(), evalHandler: NewEvalHandler(), ruleReader: NewRuleReader(), @@ -39,23 +37,17 @@ func NewEngine() *Engine { func (e *Engine) Run(ctx context.Context) error { e.log.Info("Initializing Alerting") - g, ctx := errgroup.WithContext(ctx) + alertGroup, ctx := errgroup.WithContext(ctx) - g.Go(func() error { return e.alertingTicker(ctx) }) - g.Go(func() error { return e.execDispatcher(ctx) }) - g.Go(func() error { return e.resultDispatcher(ctx) }) + alertGroup.Go(func() error { return e.alertingTicker(ctx) }) + alertGroup.Go(func() error { return e.runJobDispatcher(ctx) }) - err := g.Wait() + err := alertGroup.Wait() e.log.Info("Stopped Alerting", "reason", err) return err } -func (e *Engine) Stop() { - close(e.execQueue) - close(e.resultQueue) -} - func (e *Engine) alertingTicker(grafanaCtx context.Context) error { defer func() { if err := recover(); err != nil { @@ -81,69 +73,58 @@ func (e *Engine) alertingTicker(grafanaCtx context.Context) error { } } -func (e *Engine) execDispatcher(grafanaCtx context.Context) error { +func (e *Engine) runJobDispatcher(grafanaCtx context.Context) error { + dispatcherGroup, alertCtx := errgroup.WithContext(grafanaCtx) + for { select { case <-grafanaCtx.Done(): - close(e.resultQueue) - return grafanaCtx.Err() + return dispatcherGroup.Wait() case job := <-e.execQueue: - go e.executeJob(grafanaCtx, job) + dispatcherGroup.Go(func() error { return e.processJob(alertCtx, job) }) } } } -func (e *Engine) executeJob(grafanaCtx context.Context, job *Job) error { +var ( + unfinishedWorkTimeout time.Duration = time.Second * 5 + alertTimeout time.Duration = time.Second * 30 +) + +func (e *Engine) processJob(grafanaCtx context.Context, job *Job) error { defer func() { if err := recover(); err != nil { - e.log.Error("Execute Alert Panic", "error", err, "stack", log.Stack(1)) + e.log.Error("Alert Panic", "error", err, "stack", log.Stack(1)) } }() - done := make(chan *EvalContext, 1) + alertCtx, cancelFn := context.WithTimeout(context.TODO(), alertTimeout) + + job.Running = true + evalContext := NewEvalContext(alertCtx, job.Rule) + + done := make(chan struct{}) + go func() { - job.Running = true - context := NewEvalContext(job.Rule) - e.evalHandler.Eval(context) - job.Running = false - done <- context + e.evalHandler.Eval(evalContext) + e.resultHandler.Handle(evalContext) close(done) }() + var err error = nil select { - case <-grafanaCtx.Done(): - return grafanaCtx.Err() - case evalContext := <-done: - e.resultQueue <- evalContext - } - - return nil -} - -func (e *Engine) resultDispatcher(grafanaCtx context.Context) error { - for { select { - case <-grafanaCtx.Done(): - //handle all responses before shutting down. - for result := range e.resultQueue { - e.handleResponse(result) - } - - return grafanaCtx.Err() - case result := <-e.resultQueue: - e.handleResponse(result) + case <-time.After(unfinishedWorkTimeout): + cancelFn() + err = grafanaCtx.Err() + case <-done: } + case <-done: } -} -func (e *Engine) handleResponse(result *EvalContext) { - defer func() { - if err := recover(); err != nil { - e.log.Error("Panic in resultDispatcher", "error", err, "stack", log.Stack(1)) - } - }() - - e.log.Debug("Alert Rule Result", "ruleId", result.Rule.Id, "firing", result.Firing) - e.resultHandler.Handle(result) + e.log.Debug("Job Execution completed", "timeMs", evalContext.GetDurationMs(), "alertId", evalContext.Rule.Id, "name", evalContext.Rule.Name, "firing", evalContext.Firing) + job.Running = false + cancelFn() + return err } diff --git a/pkg/services/alerting/eval_context.go b/pkg/services/alerting/eval_context.go index a76ed8d519f..4903069b887 100644 --- a/pkg/services/alerting/eval_context.go +++ b/pkg/services/alerting/eval_context.go @@ -1,6 +1,7 @@ package alerting import ( + "context" "fmt" "time" @@ -20,14 +21,30 @@ type EvalContext struct { StartTime time.Time EndTime time.Time Rule *Rule - DoneChan chan bool - CancelChan chan bool log log.Logger dashboardSlug string ImagePublicUrl string ImageOnDiskPath string NoDataFound bool RetryCount int + + Context context.Context +} + +func (evalContext *EvalContext) Deadline() (deadline time.Time, ok bool) { + return evalContext.Deadline() +} + +func (evalContext *EvalContext) Done() <-chan struct{} { + return evalContext.Context.Done() +} + +func (evalContext *EvalContext) Err() error { + return evalContext.Context.Err() +} + +func (evalContext *EvalContext) Value(key interface{}) interface{} { + return evalContext.Context.Value(key) } type StateDescription struct { @@ -94,14 +111,13 @@ func (c *EvalContext) GetRuleUrl() (string, error) { } } -func NewEvalContext(rule *Rule) *EvalContext { +func NewEvalContext(alertCtx context.Context, rule *Rule) *EvalContext { return &EvalContext{ + Context: alertCtx, StartTime: time.Now(), Rule: rule, Logs: make([]*ResultLogEntry, 0), EvalMatches: make([]*EvalMatch, 0), - DoneChan: make(chan bool, 1), - CancelChan: make(chan bool, 1), log: log.New("alerting.evalContext"), RetryCount: 0, } diff --git a/pkg/services/alerting/eval_handler.go b/pkg/services/alerting/eval_handler.go index a5599b96d2c..74054ba8191 100644 --- a/pkg/services/alerting/eval_handler.go +++ b/pkg/services/alerting/eval_handler.go @@ -1,17 +1,12 @@ package alerting import ( - "fmt" "time" "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/metrics" ) -var ( - MaxRetries int = 1 -) - type DefaultEvalHandler struct { log log.Logger alertJobTimeout time.Duration @@ -20,49 +15,11 @@ type DefaultEvalHandler struct { func NewEvalHandler() *DefaultEvalHandler { return &DefaultEvalHandler{ log: log.New("alerting.evalHandler"), - alertJobTimeout: time.Second * 15, + alertJobTimeout: time.Second * 5, } } func (e *DefaultEvalHandler) Eval(context *EvalContext) { - go e.eval(context) - - select { - case <-time.After(e.alertJobTimeout): - context.Error = fmt.Errorf("Execution timed out after %v", e.alertJobTimeout) - context.EndTime = time.Now() - e.log.Debug("Job Execution timeout", "alertId", context.Rule.Id, "timeout setting", e.alertJobTimeout) - e.retry(context) - case <-context.DoneChan: - e.log.Debug("Job Execution done", "timeMs", context.GetDurationMs(), "alertId", context.Rule.Id, "firing", context.Firing) - - if context.Error != nil { - e.retry(context) - } - } -} - -func (e *DefaultEvalHandler) retry(context *EvalContext) { - e.log.Debug("Retrying eval exeuction", "alertId", context.Rule.Id) - - if context.RetryCount < MaxRetries { - context.DoneChan = make(chan bool, 1) - context.CancelChan = make(chan bool, 1) - context.RetryCount++ - e.Eval(context) - } -} - -func (e *DefaultEvalHandler) eval(context *EvalContext) { - defer func() { - if err := recover(); err != nil { - e.log.Error("Alerting rule eval panic", "error", err, "stack", log.Stack(1)) - if panicErr, ok := err.(error); ok { - context.Error = panicErr - } - } - }() - for _, condition := range context.Rule.Conditions { condition.Eval(context) @@ -80,5 +37,4 @@ func (e *DefaultEvalHandler) eval(context *EvalContext) { context.EndTime = time.Now() elapsedTime := context.EndTime.Sub(context.StartTime) / time.Millisecond metrics.M_Alerting_Exeuction_Time.Update(elapsedTime) - context.DoneChan <- true } diff --git a/pkg/services/alerting/eval_handler_test.go b/pkg/services/alerting/eval_handler_test.go index ae5b4e4501d..b69e62f9622 100644 --- a/pkg/services/alerting/eval_handler_test.go +++ b/pkg/services/alerting/eval_handler_test.go @@ -1,6 +1,7 @@ package alerting import ( + "context" "testing" . "github.com/smartystreets/goconvey/convey" @@ -19,25 +20,25 @@ func TestAlertingExecutor(t *testing.T) { handler := NewEvalHandler() Convey("Show return triggered with single passing condition", func() { - context := NewEvalContext(&Rule{ + context := NewEvalContext(context.TODO(), &Rule{ Conditions: []Condition{&conditionStub{ firing: true, }}, }) - handler.eval(context) + handler.Eval(context) So(context.Firing, ShouldEqual, true) }) Convey("Show return false with not passing condition", func() { - context := NewEvalContext(&Rule{ + context := NewEvalContext(context.TODO(), &Rule{ Conditions: []Condition{ &conditionStub{firing: true}, &conditionStub{firing: false}, }, }) - handler.eval(context) + handler.Eval(context) So(context.Firing, ShouldEqual, false) }) }) diff --git a/pkg/services/alerting/interfaces.go b/pkg/services/alerting/interfaces.go index 78ffc280375..88c006f9e47 100644 --- a/pkg/services/alerting/interfaces.go +++ b/pkg/services/alerting/interfaces.go @@ -1,11 +1,9 @@ package alerting -import ( - "time" -) +import "time" type EvalHandler interface { - Eval(context *EvalContext) + Eval(evalContext *EvalContext) } type Scheduler interface { @@ -14,7 +12,7 @@ type Scheduler interface { } type Notifier interface { - Notify(alertResult *EvalContext) + Notify(evalContext *EvalContext) error GetType() string NeedsImage() bool PassesFilter(rule *Rule) bool diff --git a/pkg/services/alerting/notifier.go b/pkg/services/alerting/notifier.go index 39569a94bf1..c44ea777595 100644 --- a/pkg/services/alerting/notifier.go +++ b/pkg/services/alerting/notifier.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" + "golang.org/x/sync/errgroup" + "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/imguploader" "github.com/grafana/grafana/pkg/components/renderer" @@ -33,32 +35,36 @@ func (n *RootNotifier) PassesFilter(rule *Rule) bool { return false } -func (n *RootNotifier) Notify(context *EvalContext) { +func (n *RootNotifier) Notify(context *EvalContext) error { n.log.Info("Sending notifications for", "ruleId", context.Rule.Id) notifiers, err := n.getNotifiers(context.Rule.OrgId, context.Rule.Notifications, context) if err != nil { - n.log.Error("Failed to read notifications", "error", err) - return + return err } if len(notifiers) == 0 { - return + return nil } err = n.uploadImage(context) if err != nil { n.log.Error("Failed to upload alert panel image", "error", err) + return err } - n.sendNotifications(notifiers, context) + return n.sendNotifications(context, notifiers) } -func (n *RootNotifier) sendNotifications(notifiers []Notifier, context *EvalContext) { +func (n *RootNotifier) sendNotifications(context *EvalContext, notifiers []Notifier) error { + g, _ := errgroup.WithContext(context.Context) + for _, notifier := range notifiers { n.log.Info("Sending notification", "firing", context.Firing, "type", notifier.GetType()) - go notifier.Notify(context) + g.Go(func() error { return notifier.Notify(context) }) } + + return g.Wait() } func (n *RootNotifier) uploadImage(context *EvalContext) (err error) { diff --git a/pkg/services/alerting/notifier_test.go b/pkg/services/alerting/notifier_test.go index c854d8475b5..5c79187078f 100644 --- a/pkg/services/alerting/notifier_test.go +++ b/pkg/services/alerting/notifier_test.go @@ -22,7 +22,7 @@ func (fn *FakeNotifier) NeedsImage() bool { return true } -func (fn *FakeNotifier) Notify(alertResult *EvalContext) {} +func (fn *FakeNotifier) Notify(alertResult *EvalContext) error { return nil } func (fn *FakeNotifier) PassesFilter(rule *Rule) bool { return fn.FakeMatchResult diff --git a/pkg/services/alerting/notifiers/email.go b/pkg/services/alerting/notifiers/email.go index eccd3ce9dfb..dfb12e9cb7b 100644 --- a/pkg/services/alerting/notifiers/email.go +++ b/pkg/services/alerting/notifiers/email.go @@ -35,33 +35,39 @@ func NewEmailNotifier(model *m.AlertNotification) (alerting.Notifier, error) { }, nil } -func (this *EmailNotifier) Notify(context *alerting.EvalContext) { +func (this *EmailNotifier) Notify(evalContext *alerting.EvalContext) error { this.log.Info("Sending alert notification to", "addresses", this.Addresses) metrics.M_Alerting_Notification_Sent_Email.Inc(1) - ruleUrl, err := context.GetRuleUrl() + ruleUrl, err := evalContext.GetRuleUrl() if err != nil { this.log.Error("Failed get rule link", "error", err) - return + return err } - cmd := &m.SendEmailCommand{ - Data: map[string]interface{}{ - "Title": context.GetNotificationTitle(), - "State": context.Rule.State, - "Name": context.Rule.Name, - "StateModel": context.GetStateModel(), - "Message": context.Rule.Message, - "RuleUrl": ruleUrl, - "ImageLink": context.ImagePublicUrl, - "AlertPageUrl": setting.AppUrl + "alerting", - "EvalMatches": context.EvalMatches, + cmd := &m.SendEmailCommandSync{ + SendEmailCommand: m.SendEmailCommand{ + Data: map[string]interface{}{ + "Title": evalContext.GetNotificationTitle(), + "State": evalContext.Rule.State, + "Name": evalContext.Rule.Name, + "StateModel": evalContext.GetStateModel(), + "Message": evalContext.Rule.Message, + "RuleUrl": ruleUrl, + "ImageLink": evalContext.ImagePublicUrl, + "AlertPageUrl": setting.AppUrl + "alerting", + "EvalMatches": evalContext.EvalMatches, + }, + To: this.Addresses, + Template: "alert_notification.html", }, - To: this.Addresses, - Template: "alert_notification.html", } - if err := bus.Dispatch(cmd); err != nil { + err = bus.DispatchCtx(evalContext, cmd) + + if err != nil { this.log.Error("Failed to send alert notification email", "error", err) } + return nil + } diff --git a/pkg/services/alerting/notifiers/slack.go b/pkg/services/alerting/notifiers/slack.go index d0d67ca5a88..ff71ae5d8c8 100644 --- a/pkg/services/alerting/notifiers/slack.go +++ b/pkg/services/alerting/notifiers/slack.go @@ -35,19 +35,19 @@ type SlackNotifier struct { log log.Logger } -func (this *SlackNotifier) Notify(context *alerting.EvalContext) { - this.log.Info("Executing slack notification", "ruleId", context.Rule.Id, "notification", this.Name) +func (this *SlackNotifier) Notify(evalContext *alerting.EvalContext) error { + this.log.Info("Executing slack notification", "ruleId", evalContext.Rule.Id, "notification", this.Name) metrics.M_Alerting_Notification_Sent_Slack.Inc(1) - ruleUrl, err := context.GetRuleUrl() + ruleUrl, err := evalContext.GetRuleUrl() if err != nil { this.log.Error("Failed get rule link", "error", err) - return + return err } fields := make([]map[string]interface{}, 0) fieldLimitCount := 4 - for index, evt := range context.EvalMatches { + for index, evt := range evalContext.EvalMatches { fields = append(fields, map[string]interface{}{ "title": evt.Metric, "value": evt.Value, @@ -58,44 +58,41 @@ func (this *SlackNotifier) Notify(context *alerting.EvalContext) { } } - if context.Error != nil { + if evalContext.Error != nil { fields = append(fields, map[string]interface{}{ "title": "Error message", - "value": context.Error.Error(), + "value": evalContext.Error.Error(), "short": false, }) } message := "" - if context.Rule.State != m.AlertStateOK { //dont add message when going back to alert state ok. - message = context.Rule.Message + if evalContext.Rule.State != m.AlertStateOK { //dont add message when going back to alert state ok. + message = evalContext.Rule.Message } body := map[string]interface{}{ "attachments": []map[string]interface{}{ { - "color": context.GetStateModel().Color, - "title": context.GetNotificationTitle(), + "color": evalContext.GetStateModel().Color, + "title": evalContext.GetNotificationTitle(), "title_link": ruleUrl, "text": message, "fields": fields, - "image_url": context.ImagePublicUrl, + "image_url": evalContext.ImagePublicUrl, "footer": "Grafana v" + setting.BuildVersion, "footer_icon": "http://grafana.org/assets/img/fav32.png", "ts": time.Now().Unix(), - //"pretext": "Optional text that appears above the attachment block", - // "author_name": "Bobby Tables", - // "author_link": "http://flickr.com/bobby/", - // "author_icon": "http://flickr.com/icons/bobby.jpg", - // "thumb_url": "http://example.com/path/to/thumb.png", }, }, } data, _ := json.Marshal(&body) - cmd := &m.SendWebhook{Url: this.Url, Body: string(data)} + cmd := &m.SendWebhookSync{Url: this.Url, Body: string(data)} - if err := bus.Dispatch(cmd); err != nil { + if err := bus.DispatchCtx(evalContext, cmd); err != nil { this.log.Error("Failed to send slack notification", "error", err, "webhook", this.Name) } + + return nil } diff --git a/pkg/services/alerting/notifiers/webhook.go b/pkg/services/alerting/notifiers/webhook.go index 320f273eddc..223ea2a5d54 100644 --- a/pkg/services/alerting/notifiers/webhook.go +++ b/pkg/services/alerting/notifiers/webhook.go @@ -36,36 +36,38 @@ type WebhookNotifier struct { log log.Logger } -func (this *WebhookNotifier) Notify(context *alerting.EvalContext) { +func (this *WebhookNotifier) Notify(evalContext *alerting.EvalContext) error { this.log.Info("Sending webhook") metrics.M_Alerting_Notification_Sent_Webhook.Inc(1) bodyJSON := simplejson.New() - bodyJSON.Set("title", context.GetNotificationTitle()) - bodyJSON.Set("ruleId", context.Rule.Id) - bodyJSON.Set("ruleName", context.Rule.Name) - bodyJSON.Set("state", context.Rule.State) - bodyJSON.Set("evalMatches", context.EvalMatches) + bodyJSON.Set("title", evalContext.GetNotificationTitle()) + bodyJSON.Set("ruleId", evalContext.Rule.Id) + bodyJSON.Set("ruleName", evalContext.Rule.Name) + bodyJSON.Set("state", evalContext.Rule.State) + bodyJSON.Set("evalMatches", evalContext.EvalMatches) - ruleUrl, err := context.GetRuleUrl() + ruleUrl, err := evalContext.GetRuleUrl() if err == nil { bodyJSON.Set("rule_url", ruleUrl) } - if context.ImagePublicUrl != "" { - bodyJSON.Set("image_url", context.ImagePublicUrl) + if evalContext.ImagePublicUrl != "" { + bodyJSON.Set("image_url", evalContext.ImagePublicUrl) } body, _ := bodyJSON.MarshalJSON() - cmd := &m.SendWebhook{ + cmd := &m.SendWebhookSync{ Url: this.Url, User: this.User, Password: this.Password, Body: string(body), } - if err := bus.Dispatch(cmd); err != nil { + if err := bus.DispatchCtx(evalContext, cmd); err != nil { this.log.Error("Failed to send webhook", "error", err, "webhook", this.Name) } + + return nil } diff --git a/pkg/services/alerting/result_handler.go b/pkg/services/alerting/result_handler.go index bb9f46d6084..4372955803a 100644 --- a/pkg/services/alerting/result_handler.go +++ b/pkg/services/alerting/result_handler.go @@ -12,7 +12,7 @@ import ( ) type ResultHandler interface { - Handle(ctx *EvalContext) + Handle(evalContext *EvalContext) error } type DefaultResultHandler struct { @@ -27,36 +27,36 @@ func NewResultHandler() *DefaultResultHandler { } } -func (handler *DefaultResultHandler) Handle(ctx *EvalContext) { - oldState := ctx.Rule.State +func (handler *DefaultResultHandler) Handle(evalContext *EvalContext) error { + oldState := evalContext.Rule.State exeuctionError := "" annotationData := simplejson.New() - if ctx.Error != nil { - handler.log.Error("Alert Rule Result Error", "ruleId", ctx.Rule.Id, "error", ctx.Error) - ctx.Rule.State = m.AlertStateExecError - exeuctionError = ctx.Error.Error() + if evalContext.Error != nil { + handler.log.Error("Alert Rule Result Error", "ruleId", evalContext.Rule.Id, "error", evalContext.Error) + evalContext.Rule.State = m.AlertStateExecError + exeuctionError = evalContext.Error.Error() annotationData.Set("errorMessage", exeuctionError) - } else if ctx.Firing { - ctx.Rule.State = m.AlertStateAlerting - annotationData = simplejson.NewFromAny(ctx.EvalMatches) + } else if evalContext.Firing { + evalContext.Rule.State = m.AlertStateAlerting + annotationData = simplejson.NewFromAny(evalContext.EvalMatches) } else { // handle no data case - if ctx.NoDataFound { - ctx.Rule.State = ctx.Rule.NoDataState + if evalContext.NoDataFound { + evalContext.Rule.State = evalContext.Rule.NoDataState } else { - ctx.Rule.State = m.AlertStateOK + evalContext.Rule.State = m.AlertStateOK } } - countStateResult(ctx.Rule.State) - if ctx.Rule.State != oldState { - handler.log.Info("New state change", "alertId", ctx.Rule.Id, "newState", ctx.Rule.State, "oldState", oldState) + countStateResult(evalContext.Rule.State) + if evalContext.Rule.State != oldState { + handler.log.Info("New state change", "alertId", evalContext.Rule.Id, "newState", evalContext.Rule.State, "oldState", oldState) cmd := &m.SetAlertStateCommand{ - AlertId: ctx.Rule.Id, - OrgId: ctx.Rule.OrgId, - State: ctx.Rule.State, + AlertId: evalContext.Rule.Id, + OrgId: evalContext.Rule.OrgId, + State: evalContext.Rule.State, Error: exeuctionError, EvalData: annotationData, } @@ -67,14 +67,14 @@ func (handler *DefaultResultHandler) Handle(ctx *EvalContext) { // save annotation item := annotations.Item{ - OrgId: ctx.Rule.OrgId, - DashboardId: ctx.Rule.DashboardId, - PanelId: ctx.Rule.PanelId, + OrgId: evalContext.Rule.OrgId, + DashboardId: evalContext.Rule.DashboardId, + PanelId: evalContext.Rule.PanelId, Type: annotations.AlertType, - AlertId: ctx.Rule.Id, - Title: ctx.Rule.Name, - Text: ctx.GetStateModel().Text, - NewState: string(ctx.Rule.State), + AlertId: evalContext.Rule.Id, + Title: evalContext.Rule.Name, + Text: evalContext.GetStateModel().Text, + NewState: string(evalContext.Rule.State), PrevState: string(oldState), Epoch: time.Now().Unix(), Data: annotationData, @@ -85,8 +85,10 @@ func (handler *DefaultResultHandler) Handle(ctx *EvalContext) { handler.log.Error("Failed to save annotation for new alert state", "error", err) } - handler.notifier.Notify(ctx) + handler.notifier.Notify(evalContext) } + + return nil } func countStateResult(state m.AlertStateType) { diff --git a/pkg/services/alerting/test_notification.go b/pkg/services/alerting/test_notification.go index de2cb981aaa..b21ee7b24d9 100644 --- a/pkg/services/alerting/test_notification.go +++ b/pkg/services/alerting/test_notification.go @@ -1,6 +1,8 @@ package alerting import ( + "context" + "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/log" @@ -35,13 +37,12 @@ func handleNotificationTestCommand(cmd *NotificationTestCommand) error { return err } - notifier.sendNotifications([]Notifier{notifiers}, createTestEvalContext()) + notifier.sendNotifications(createTestEvalContext(), []Notifier{notifiers}) return nil } func createTestEvalContext() *EvalContext { - testRule := &Rule{ DashboardId: 1, PanelId: 1, @@ -50,7 +51,7 @@ func createTestEvalContext() *EvalContext { State: m.AlertStateAlerting, } - ctx := NewEvalContext(testRule) + ctx := NewEvalContext(context.TODO(), testRule) ctx.ImagePublicUrl = "http://grafana.org/assets/img/blog/mixed_styles.png" ctx.IsTestRun = true diff --git a/pkg/services/alerting/test_rule.go b/pkg/services/alerting/test_rule.go index 25a08a3b3bf..82b1a6276a1 100644 --- a/pkg/services/alerting/test_rule.go +++ b/pkg/services/alerting/test_rule.go @@ -1,6 +1,7 @@ package alerting import ( + "context" "fmt" "github.com/grafana/grafana/pkg/bus" @@ -48,7 +49,7 @@ func handleAlertTestCommand(cmd *AlertTestCommand) error { func testAlertRule(rule *Rule) *EvalContext { handler := NewEvalHandler() - context := NewEvalContext(rule) + context := NewEvalContext(context.TODO(), rule) context.IsTestRun = true handler.Eval(context) diff --git a/pkg/services/notifications/mailer.go b/pkg/services/notifications/mailer.go index 91c75a1889e..3047065dd0d 100644 --- a/pkg/services/notifications/mailer.go +++ b/pkg/services/notifications/mailer.go @@ -5,8 +5,11 @@ package notifications import ( + "bytes" "crypto/tls" + "errors" "fmt" + "html/template" "net" "net/mail" "net/smtp" @@ -15,6 +18,7 @@ import ( "time" "github.com/grafana/grafana/pkg/log" + m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" ) @@ -185,3 +189,49 @@ func buildAndSend(msg *Message) (int, error) { } } } + +func buildEmailMessage(cmd *m.SendEmailCommand) (*Message, error) { + if !setting.Smtp.Enabled { + return nil, errors.New("Grafana mailing/smtp options not configured, contact your Grafana admin") + } + + var buffer bytes.Buffer + var err error + var subjectText interface{} + + data := cmd.Data + if data == nil { + data = make(map[string]interface{}, 10) + } + + setDefaultTemplateData(data, nil) + err = mailTemplates.ExecuteTemplate(&buffer, cmd.Template, data) + if err != nil { + return nil, err + } + + subjectData := data["Subject"].(map[string]interface{}) + subjectText, hasSubject := subjectData["value"] + + if !hasSubject { + return nil, errors.New(fmt.Sprintf("Missing subject in Template %s", cmd.Template)) + } + + subjectTmpl, err := template.New("subject").Parse(subjectText.(string)) + if err != nil { + return nil, err + } + + var subjectBuffer bytes.Buffer + err = subjectTmpl.ExecuteTemplate(&subjectBuffer, "subject", data) + if err != nil { + return nil, err + } + + return &Message{ + To: cmd.To, + From: setting.Smtp.FromAddress, + Subject: subjectBuffer.String(), + Body: buffer.String(), + }, nil +} diff --git a/pkg/services/notifications/notifications.go b/pkg/services/notifications/notifications.go index 04b11f73b84..f7762e2d3d0 100644 --- a/pkg/services/notifications/notifications.go +++ b/pkg/services/notifications/notifications.go @@ -1,7 +1,7 @@ package notifications import ( - "bytes" + "context" "errors" "fmt" "html/template" @@ -29,7 +29,10 @@ func Init() error { bus.AddHandler("email", validateResetPasswordCode) bus.AddHandler("email", sendEmailCommandHandler) + bus.AddCtxHandler("email", sendEmailCommandHandlerSync) + bus.AddHandler("webhook", sendWebhook) + bus.AddCtxHandler("webhook", SendWebhookSync) bus.AddEventListener(signUpStartedHandler) bus.AddEventListener(signUpCompletedHandler) @@ -56,6 +59,15 @@ func Init() error { return nil } +func SendWebhookSync(ctx context.Context, cmd *m.SendWebhookSync) error { + return sendWebRequestSync(ctx, &Webhook{ + Url: cmd.Url, + User: cmd.User, + Password: cmd.Password, + Body: cmd.Body, + }) +} + func sendWebhook(cmd *m.SendWebhook) error { addToWebhookQueue(&Webhook{ Url: cmd.Url, @@ -72,50 +84,32 @@ func subjectTemplateFunc(obj map[string]interface{}, value string) string { return "" } -func sendEmailCommandHandler(cmd *m.SendEmailCommand) error { - if !setting.Smtp.Enabled { - return errors.New("Grafana mailing/smtp options not configured, contact your Grafana admin") - } - - var buffer bytes.Buffer - var err error - var subjectText interface{} - - data := cmd.Data - if data == nil { - data = make(map[string]interface{}, 10) - } +func sendEmailCommandHandlerSync(ctx context.Context, cmd *m.SendEmailCommandSync) error { + message, err := buildEmailMessage(&m.SendEmailCommand{ + Data: cmd.Data, + Info: cmd.Info, + Massive: cmd.Massive, + Template: cmd.Template, + To: cmd.To, + }) - setDefaultTemplateData(data, nil) - err = mailTemplates.ExecuteTemplate(&buffer, cmd.Template, data) if err != nil { return err } - subjectData := data["Subject"].(map[string]interface{}) - subjectText, hasSubject := subjectData["value"] + _, err = buildAndSend(message) - if !hasSubject { - return errors.New(fmt.Sprintf("Missing subject in Template %s", cmd.Template)) - } + return err +} - subjectTmpl, err := template.New("subject").Parse(subjectText.(string)) - if err != nil { - return err - } +func sendEmailCommandHandler(cmd *m.SendEmailCommand) error { + message, err := buildEmailMessage(cmd) - var subjectBuffer bytes.Buffer - err = subjectTmpl.ExecuteTemplate(&subjectBuffer, "subject", data) if err != nil { return err } - addToMailQueue(&Message{ - To: cmd.To, - From: setting.Smtp.FromAddress, - Subject: subjectBuffer.String(), - Body: buffer.String(), - }) + addToMailQueue(message) return nil } diff --git a/pkg/services/notifications/notifications_test.go b/pkg/services/notifications/notifications_test.go index d4f4ee0a5fb..79db664e893 100644 --- a/pkg/services/notifications/notifications_test.go +++ b/pkg/services/notifications/notifications_test.go @@ -3,7 +3,6 @@ package notifications import ( "testing" - "github.com/grafana/grafana/pkg/bus" m "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" . "github.com/smartystreets/goconvey/convey" @@ -18,7 +17,7 @@ type testTriggeredAlert struct { func TestNotifications(t *testing.T) { Convey("Given the notifications service", t, func() { - bus.ClearBusHandlers() + //bus.ClearBusHandlers() setting.StaticRootPath = "../../../public/" setting.Smtp.Enabled = true diff --git a/pkg/services/notifications/webhook.go b/pkg/services/notifications/webhook.go index 67ffa43900a..d5b8a718d52 100644 --- a/pkg/services/notifications/webhook.go +++ b/pkg/services/notifications/webhook.go @@ -2,11 +2,14 @@ package notifications import ( "bytes" + "context" "fmt" "io/ioutil" "net/http" "time" + "golang.org/x/net/context/ctxhttp" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/util" ) @@ -31,7 +34,7 @@ func processWebhookQueue() { for { select { case webhook := <-webhookQueue: - err := sendWebRequest(webhook) + err := sendWebRequestSync(context.TODO(), webhook) if err != nil { webhookLog.Error("Failed to send webrequest ", "error", err) @@ -40,14 +43,14 @@ func processWebhookQueue() { } } -func sendWebRequest(webhook *Webhook) error { +func sendWebRequestSync(ctx context.Context, webhook *Webhook) error { webhookLog.Debug("Sending webhook", "url", webhook.Url) - client := http.Client{ + client := &http.Client{ Timeout: time.Duration(10 * time.Second), } - request, err := http.NewRequest("POST", webhook.Url, bytes.NewReader([]byte(webhook.Body))) + request, err := http.NewRequest(http.MethodPost, webhook.Url, bytes.NewReader([]byte(webhook.Body))) if webhook.User != "" && webhook.Password != "" { request.Header.Add("Authorization", util.GetBasicAuthHeader(webhook.User, webhook.Password)) } @@ -56,22 +59,23 @@ func sendWebRequest(webhook *Webhook) error { return err } - resp, err := client.Do(request) + resp, err := ctxhttp.Do(ctx, client, request) if err != nil { return err } - _, err = ioutil.ReadAll(resp.Body) - if err != nil { - return err + if resp.StatusCode/100 == 2 { + return nil } - if resp.StatusCode != 200 { - return fmt.Errorf("Webhook response code %v", resp.StatusCode) + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err } - defer resp.Body.Close() - return nil + + webhookLog.Debug("Webhook failed", "statuscode", resp.Status, "body", string(body)) + return fmt.Errorf("Webhook response status %v", resp.Status) } var addToWebhookQueue = func(msg *Webhook) { diff --git a/pkg/tsdb/batch.go b/pkg/tsdb/batch.go index 4dee7b31c86..284a158bf5f 100644 --- a/pkg/tsdb/batch.go +++ b/pkg/tsdb/batch.go @@ -1,6 +1,9 @@ package tsdb -import "errors" +import ( + "context" + "errors" +) type Batch struct { DataSourceId int64 @@ -20,7 +23,7 @@ func newBatch(dsId int64, queries QuerySlice) *Batch { } } -func (bg *Batch) process(context *QueryContext) { +func (bg *Batch) process(ctx context.Context, queryContext *QueryContext) { executor := getExecutorFor(bg.Queries[0].DataSource) if executor == nil { @@ -32,13 +35,13 @@ func (bg *Batch) process(context *QueryContext) { for _, query := range bg.Queries { result.QueryResults[query.RefId] = &QueryResult{Error: result.Error} } - context.ResultsChan <- result + queryContext.ResultsChan <- result return } - res := executor.Execute(bg.Queries, context) + res := executor.Execute(ctx, bg.Queries, queryContext) bg.Done = true - context.ResultsChan <- res + queryContext.ResultsChan <- res } func (bg *Batch) addQuery(query *Query) { diff --git a/pkg/tsdb/executor.go b/pkg/tsdb/executor.go index b39c2cdaa97..97736ef4678 100644 --- a/pkg/tsdb/executor.go +++ b/pkg/tsdb/executor.go @@ -1,7 +1,9 @@ package tsdb +import "context" + type Executor interface { - Execute(queries QuerySlice, context *QueryContext) *BatchResult + Execute(ctx context.Context, queries QuerySlice, context *QueryContext) *BatchResult } var registry map[string]GetExecutorFn diff --git a/pkg/tsdb/fake_test.go b/pkg/tsdb/fake_test.go index 2ba02792d6d..c403fdba4fb 100644 --- a/pkg/tsdb/fake_test.go +++ b/pkg/tsdb/fake_test.go @@ -1,5 +1,7 @@ package tsdb +import "context" + type FakeExecutor struct { results map[string]*QueryResult resultsFn map[string]ResultsFn @@ -14,7 +16,7 @@ func NewFakeExecutor(dsInfo *DataSourceInfo) *FakeExecutor { } } -func (e *FakeExecutor) Execute(queries QuerySlice, context *QueryContext) *BatchResult { +func (e *FakeExecutor) Execute(ctx context.Context, queries QuerySlice, context *QueryContext) *BatchResult { result := &BatchResult{QueryResults: make(map[string]*QueryResult)} for _, query := range queries { if results, has := e.results[query.RefId]; has { diff --git a/pkg/tsdb/graphite/graphite.go b/pkg/tsdb/graphite/graphite.go index 78685d52371..774bca4bca2 100644 --- a/pkg/tsdb/graphite/graphite.go +++ b/pkg/tsdb/graphite/graphite.go @@ -1,6 +1,7 @@ package graphite import ( + "context" "crypto/tls" "encoding/json" "fmt" @@ -11,6 +12,8 @@ import ( "strings" "time" + "golang.org/x/net/context/ctxhttp" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb" @@ -26,7 +29,7 @@ func NewGraphiteExecutor(dsInfo *tsdb.DataSourceInfo) tsdb.Executor { var ( glog log.Logger - HttpClient http.Client + HttpClient *http.Client ) func init() { @@ -37,13 +40,13 @@ func init() { TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } - HttpClient = http.Client{ + HttpClient = &http.Client{ Timeout: time.Duration(15 * time.Second), Transport: tr, } } -func (e *GraphiteExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { +func (e *GraphiteExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { result := &tsdb.BatchResult{} formData := url.Values{ @@ -66,7 +69,8 @@ func (e *GraphiteExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryC result.Error = err return result } - res, err := HttpClient.Do(req) + + res, err := ctxhttp.Do(ctx, HttpClient, req) if err != nil { result.Error = err return result diff --git a/pkg/tsdb/prometheus/prometheus.go b/pkg/tsdb/prometheus/prometheus.go index f7e68662efa..98d4c72fd03 100644 --- a/pkg/tsdb/prometheus/prometheus.go +++ b/pkg/tsdb/prometheus/prometheus.go @@ -1,6 +1,7 @@ package prometheus import ( + "context" "fmt" "net/http" "regexp" @@ -11,7 +12,6 @@ import ( "github.com/grafana/grafana/pkg/tsdb" "github.com/prometheus/client_golang/api/prometheus" pmodel "github.com/prometheus/common/model" - "golang.org/x/net/context" ) type PrometheusExecutor struct { @@ -45,7 +45,7 @@ func (e *PrometheusExecutor) getClient() (prometheus.QueryAPI, error) { return prometheus.NewQueryAPI(client), nil } -func (e *PrometheusExecutor) Execute(queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult { +func (e *PrometheusExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult { result := &tsdb.BatchResult{} client, err := e.getClient() @@ -64,7 +64,7 @@ func (e *PrometheusExecutor) Execute(queries tsdb.QuerySlice, queryContext *tsdb Step: query.Step, } - value, err := client.QueryRange(context.Background(), query.Expr, timeRange) + value, err := client.QueryRange(ctx, query.Expr, timeRange) if err != nil { return resultWithError(result, err) diff --git a/pkg/tsdb/request.go b/pkg/tsdb/request.go index 2e5e5eec25a..88c6eb81eac 100644 --- a/pkg/tsdb/request.go +++ b/pkg/tsdb/request.go @@ -1,8 +1,10 @@ package tsdb -type HandleRequestFunc func(req *Request) (*Response, error) +import "context" -func HandleRequest(req *Request) (*Response, error) { +type HandleRequestFunc func(ctx context.Context, req *Request) (*Response, error) + +func HandleRequest(ctx context.Context, req *Request) (*Response, error) { context := NewQueryContext(req.Queries, req.TimeRange) batches, err := getBatches(req) @@ -16,7 +18,7 @@ func HandleRequest(req *Request) (*Response, error) { if len(batch.Depends) == 0 { currentlyExecuting += 1 batch.Started = true - go batch.process(context) + go batch.process(ctx, context) } } @@ -46,7 +48,7 @@ func HandleRequest(req *Request) (*Response, error) { if batch.allDependenciesAreIn(context) { currentlyExecuting += 1 batch.Started = true - go batch.process(context) + go batch.process(ctx, context) } } } diff --git a/pkg/tsdb/testdata/testdata.go b/pkg/tsdb/testdata/testdata.go index 5b40bb6de5a..cf2dcc0f898 100644 --- a/pkg/tsdb/testdata/testdata.go +++ b/pkg/tsdb/testdata/testdata.go @@ -1,6 +1,8 @@ package testdata import ( + "context" + "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/tsdb" ) @@ -21,7 +23,7 @@ func init() { tsdb.RegisterExecutor("grafana-testdata-datasource", NewTestDataExecutor) } -func (e *TestDataExecutor) Execute(queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { +func (e *TestDataExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { result := &tsdb.BatchResult{} result.QueryResults = make(map[string]*tsdb.QueryResult) diff --git a/pkg/tsdb/tsdb_test.go b/pkg/tsdb/tsdb_test.go index 429dd01d6ba..998f59a6b9d 100644 --- a/pkg/tsdb/tsdb_test.go +++ b/pkg/tsdb/tsdb_test.go @@ -1,6 +1,7 @@ package tsdb import ( + "context" "testing" "time" @@ -62,7 +63,7 @@ func TestMetricQuery(t *testing.T) { fakeExecutor := registerFakeExecutor() fakeExecutor.Return("A", TimeSeriesSlice{&TimeSeries{Name: "argh"}}) - res, err := HandleRequest(req) + res, err := HandleRequest(context.TODO(), req) So(err, ShouldBeNil) Convey("Should return query results", func() { @@ -83,7 +84,7 @@ func TestMetricQuery(t *testing.T) { fakeExecutor.Return("A", TimeSeriesSlice{&TimeSeries{Name: "argh"}}) fakeExecutor.Return("B", TimeSeriesSlice{&TimeSeries{Name: "barg"}}) - res, err := HandleRequest(req) + res, err := HandleRequest(context.TODO(), req) So(err, ShouldBeNil) Convey("Should return query results", func() { @@ -106,7 +107,7 @@ func TestMetricQuery(t *testing.T) { }, } - res, err := HandleRequest(req) + res, err := HandleRequest(context.TODO(), req) So(err, ShouldBeNil) Convey("Should have been batched in two requests", func() { @@ -121,7 +122,7 @@ func TestMetricQuery(t *testing.T) { }, } - _, err := HandleRequest(req) + _, err := HandleRequest(context.TODO(), req) So(err, ShouldNotBeNil) }) @@ -152,7 +153,7 @@ func TestMetricQuery(t *testing.T) { }} }) - res, err := HandleRequest(req) + res, err := HandleRequest(context.TODO(), req) So(err, ShouldBeNil) Convey("Should have been batched in two requests", func() {