mirror of https://github.com/grafana/grafana
NGAlert: Add Kafka notification channel (#34156)
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>pull/33439/head
parent
ad1d0ae0bf
commit
b2e84277a3
@ -0,0 +1,124 @@ |
||||
package channels |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"path" |
||||
|
||||
gokit_log "github.com/go-kit/kit/log" |
||||
"github.com/prometheus/alertmanager/notify" |
||||
"github.com/prometheus/alertmanager/template" |
||||
"github.com/prometheus/alertmanager/types" |
||||
"github.com/prometheus/common/model" |
||||
|
||||
"github.com/grafana/grafana/pkg/bus" |
||||
"github.com/grafana/grafana/pkg/components/simplejson" |
||||
"github.com/grafana/grafana/pkg/infra/log" |
||||
"github.com/grafana/grafana/pkg/models" |
||||
"github.com/grafana/grafana/pkg/services/alerting" |
||||
old_notifiers "github.com/grafana/grafana/pkg/services/alerting/notifiers" |
||||
) |
||||
|
||||
// KafkaNotifier is responsible for sending
|
||||
// alert notifications to Kafka.
|
||||
type KafkaNotifier struct { |
||||
old_notifiers.NotifierBase |
||||
Endpoint string |
||||
Topic string |
||||
log log.Logger |
||||
tmpl *template.Template |
||||
} |
||||
|
||||
// NewKafkaNotifier is the constructor function for the Kafka notifier.
|
||||
func NewKafkaNotifier(model *NotificationChannelConfig, t *template.Template) (*KafkaNotifier, error) { |
||||
endpoint := model.Settings.Get("kafkaRestProxy").MustString() |
||||
if endpoint == "" { |
||||
return nil, alerting.ValidationError{Reason: "Could not find kafka rest proxy endpoint property in settings"} |
||||
} |
||||
topic := model.Settings.Get("kafkaTopic").MustString() |
||||
if topic == "" { |
||||
return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"} |
||||
} |
||||
|
||||
return &KafkaNotifier{ |
||||
NotifierBase: old_notifiers.NewNotifierBase(&models.AlertNotification{ |
||||
Uid: model.UID, |
||||
Name: model.Name, |
||||
Type: model.Type, |
||||
DisableResolveMessage: model.DisableResolveMessage, |
||||
Settings: model.Settings, |
||||
}), |
||||
Endpoint: endpoint, |
||||
Topic: topic, |
||||
log: log.New("alerting.notifier.kafka"), |
||||
tmpl: t, |
||||
}, nil |
||||
} |
||||
|
||||
// Notify sends the alert notification.
|
||||
func (kn *KafkaNotifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) { |
||||
// We are using the state from 7.x to not break kafka.
|
||||
// TODO: should we switch to the new ones?
|
||||
alerts := types.Alerts(as...) |
||||
state := models.AlertStateAlerting |
||||
if alerts.Status() == model.AlertResolved { |
||||
state = models.AlertStateOK |
||||
} |
||||
|
||||
kn.log.Debug("Notifying Kafka", "alert_state", state) |
||||
|
||||
data := notify.GetTemplateData(ctx, kn.tmpl, as, gokit_log.NewNopLogger()) |
||||
var tmplErr error |
||||
tmpl := notify.TmplText(kn.tmpl, data, &tmplErr) |
||||
|
||||
bodyJSON := simplejson.New() |
||||
bodyJSON.Set("alert_state", state) |
||||
bodyJSON.Set("description", tmpl(`{{ template "default.title" . }}`)) |
||||
bodyJSON.Set("client", "Grafana") |
||||
bodyJSON.Set("details", tmpl(`{{ template "default.message" . }}`)) |
||||
bodyJSON.Set("client_url", path.Join(kn.tmpl.ExternalURL.String(), "/alerting/list")) |
||||
|
||||
groupKey, err := notify.ExtractGroupKey(ctx) |
||||
if err != nil { |
||||
return false, err |
||||
} |
||||
bodyJSON.Set("incident_key", groupKey.Hash()) |
||||
|
||||
valueJSON := simplejson.New() |
||||
valueJSON.Set("value", bodyJSON) |
||||
|
||||
recordJSON := simplejson.New() |
||||
recordJSON.Set("records", []interface{}{valueJSON}) |
||||
|
||||
if tmplErr != nil { |
||||
return false, fmt.Errorf("failed to template Kafka message: %w", tmplErr) |
||||
} |
||||
|
||||
body, err := recordJSON.MarshalJSON() |
||||
if err != nil { |
||||
return false, err |
||||
} |
||||
|
||||
topicURL := kn.Endpoint + "/topics/" + kn.Topic |
||||
|
||||
cmd := &models.SendWebhookSync{ |
||||
Url: topicURL, |
||||
Body: string(body), |
||||
HttpMethod: "POST", |
||||
HttpHeader: map[string]string{ |
||||
"Content-Type": "application/vnd.kafka.json.v2+json", |
||||
"Accept": "application/vnd.kafka.v2+json", |
||||
}, |
||||
} |
||||
|
||||
if err := bus.DispatchCtx(ctx, cmd); err != nil { |
||||
kn.log.Error("Failed to send notification to Kafka", "error", err, "body", string(body)) |
||||
return false, err |
||||
} |
||||
|
||||
return true, nil |
||||
} |
||||
|
||||
func (kn *KafkaNotifier) SendResolved() bool { |
||||
return !kn.GetDisableResolveMessage() |
||||
} |
||||
@ -0,0 +1,155 @@ |
||||
package channels |
||||
|
||||
import ( |
||||
"context" |
||||
"net/url" |
||||
"testing" |
||||
|
||||
"github.com/prometheus/alertmanager/notify" |
||||
"github.com/prometheus/alertmanager/types" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/grafana/pkg/bus" |
||||
"github.com/grafana/grafana/pkg/components/simplejson" |
||||
"github.com/grafana/grafana/pkg/models" |
||||
"github.com/grafana/grafana/pkg/services/alerting" |
||||
) |
||||
|
||||
func TestKafkaNotifier(t *testing.T) { |
||||
tmpl := templateForTests(t) |
||||
|
||||
externalURL, err := url.Parse("http://localhost") |
||||
require.NoError(t, err) |
||||
tmpl.ExternalURL = externalURL |
||||
|
||||
cases := []struct { |
||||
name string |
||||
settings string |
||||
alerts []*types.Alert |
||||
expUrl, expMsg string |
||||
expInitError error |
||||
expMsgError error |
||||
}{ |
||||
{ |
||||
name: "One alert", |
||||
settings: `{ |
||||
"kafkaRestProxy": "http://localhost", |
||||
"kafkaTopic": "sometopic" |
||||
}`, |
||||
alerts: []*types.Alert{ |
||||
{ |
||||
Alert: model.Alert{ |
||||
Labels: model.LabelSet{"alertname": "alert1", "lbl1": "val1"}, |
||||
Annotations: model.LabelSet{"ann1": "annv1"}, |
||||
}, |
||||
}, |
||||
}, |
||||
expUrl: "http://localhost/topics/sometopic", |
||||
expMsg: `{ |
||||
"records": [ |
||||
{ |
||||
"value": { |
||||
"alert_state": "alerting", |
||||
"client": "Grafana", |
||||
"client_url": "http:/localhost/alerting/list", |
||||
"description": "[FIRING:1] (val1)", |
||||
"details": "\n**Firing**\nLabels:\n - alertname = alert1\n - lbl1 = val1\nAnnotations:\n - ann1 = annv1\nSource: \n\n\n\n\n", |
||||
"incident_key": "6e3538104c14b583da237e9693b76debbc17f0f8058ef20492e5853096cf8733" |
||||
} |
||||
} |
||||
] |
||||
}`, |
||||
expInitError: nil, |
||||
expMsgError: nil, |
||||
}, { |
||||
name: "Multiple alerts", |
||||
settings: `{ |
||||
"kafkaRestProxy": "http://localhost", |
||||
"kafkaTopic": "sometopic" |
||||
}`, |
||||
alerts: []*types.Alert{ |
||||
{ |
||||
Alert: model.Alert{ |
||||
Labels: model.LabelSet{"alertname": "alert1", "lbl1": "val1"}, |
||||
Annotations: model.LabelSet{"ann1": "annv1"}, |
||||
}, |
||||
}, { |
||||
Alert: model.Alert{ |
||||
Labels: model.LabelSet{"alertname": "alert1", "lbl1": "val2"}, |
||||
Annotations: model.LabelSet{"ann1": "annv2"}, |
||||
}, |
||||
}, |
||||
}, |
||||
expUrl: "http://localhost/topics/sometopic", |
||||
expMsg: `{ |
||||
"records": [ |
||||
{ |
||||
"value": { |
||||
"alert_state": "alerting", |
||||
"client": "Grafana", |
||||
"client_url": "http:/localhost/alerting/list", |
||||
"description": "[FIRING:2] ", |
||||
"details": "\n**Firing**\nLabels:\n - alertname = alert1\n - lbl1 = val1\nAnnotations:\n - ann1 = annv1\nSource: \nLabels:\n - alertname = alert1\n - lbl1 = val2\nAnnotations:\n - ann1 = annv2\nSource: \n\n\n\n\n", |
||||
"incident_key": "6e3538104c14b583da237e9693b76debbc17f0f8058ef20492e5853096cf8733" |
||||
} |
||||
} |
||||
] |
||||
}`, |
||||
expInitError: nil, |
||||
expMsgError: nil, |
||||
}, { |
||||
name: "Endpoint missing", |
||||
settings: `{"kafkaTopic": "sometopic"}`, |
||||
expInitError: alerting.ValidationError{Reason: "Could not find kafka rest proxy endpoint property in settings"}, |
||||
}, { |
||||
name: "Topic missing", |
||||
settings: `{"kafkaRestProxy": "http://localhost"}`, |
||||
expInitError: alerting.ValidationError{Reason: "Could not find kafka topic property in settings"}, |
||||
}, |
||||
} |
||||
|
||||
for _, c := range cases { |
||||
t.Run(c.name, func(t *testing.T) { |
||||
settingsJSON, err := simplejson.NewJson([]byte(c.settings)) |
||||
require.NoError(t, err) |
||||
|
||||
m := &NotificationChannelConfig{ |
||||
Name: "kafka_testing", |
||||
Type: "kafka", |
||||
Settings: settingsJSON, |
||||
} |
||||
|
||||
pn, err := NewKafkaNotifier(m, tmpl) |
||||
if c.expInitError != nil { |
||||
require.Error(t, err) |
||||
require.Equal(t, c.expInitError.Error(), err.Error()) |
||||
return |
||||
} |
||||
require.NoError(t, err) |
||||
|
||||
body := "" |
||||
actUrl := "" |
||||
bus.AddHandlerCtx("test", func(ctx context.Context, webhook *models.SendWebhookSync) error { |
||||
body = webhook.Body |
||||
actUrl = webhook.Url |
||||
return nil |
||||
}) |
||||
|
||||
ctx := notify.WithGroupKey(context.Background(), "alertname") |
||||
ctx = notify.WithGroupLabels(ctx, model.LabelSet{"alertname": ""}) |
||||
ok, err := pn.Notify(ctx, c.alerts...) |
||||
if c.expMsgError != nil { |
||||
require.False(t, ok) |
||||
require.Error(t, err) |
||||
require.Equal(t, c.expMsgError.Error(), err.Error()) |
||||
return |
||||
} |
||||
require.NoError(t, err) |
||||
require.True(t, ok) |
||||
|
||||
require.Equal(t, c.expUrl, actUrl) |
||||
require.JSONEq(t, c.expMsg, body) |
||||
}) |
||||
} |
||||
} |
||||
Loading…
Reference in new issue