|
|
|
@ -38,9 +38,9 @@ func NewKafkaNotifier(model *m.AlertNotification) (alerting.Notifier, error) { |
|
|
|
|
return nil, alerting.ValidationError{Reason: "Could not find kafka rest proxy endpoint property in settings"} |
|
|
|
|
} |
|
|
|
|
topic := model.Settings.Get("kafkaTopic").MustString() |
|
|
|
|
if topic == "" { |
|
|
|
|
return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"} |
|
|
|
|
} |
|
|
|
|
if topic == "" { |
|
|
|
|
return nil, alerting.ValidationError{Reason: "Could not find kafka topic property in settings"} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return &KafkaNotifier{ |
|
|
|
|
NotifierBase: NewNotifierBase(model.Id, model.IsDefault, model.Name, model.Type, model.Settings), |
|
|
|
@ -52,9 +52,9 @@ func NewKafkaNotifier(model *m.AlertNotification) (alerting.Notifier, error) { |
|
|
|
|
|
|
|
|
|
type KafkaNotifier struct { |
|
|
|
|
NotifierBase |
|
|
|
|
Endpoint string |
|
|
|
|
Topic string |
|
|
|
|
log log.Logger |
|
|
|
|
Endpoint string |
|
|
|
|
Topic string |
|
|
|
|
log log.Logger |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (this *KafkaNotifier) Notify(evalContext *alerting.EvalContext) error { |
|
|
|
@ -99,16 +99,16 @@ func (this *KafkaNotifier) Notify(evalContext *alerting.EvalContext) error { |
|
|
|
|
recordJSON.Set("records", records) |
|
|
|
|
body, _ := recordJSON.MarshalJSON() |
|
|
|
|
|
|
|
|
|
topicUrl := this.Endpoint+"/topics/"+this.Topic |
|
|
|
|
topicUrl := this.Endpoint + "/topics/" + this.Topic |
|
|
|
|
|
|
|
|
|
cmd := &m.SendWebhookSync{ |
|
|
|
|
Url: topicUrl, |
|
|
|
|
Body: string(body), |
|
|
|
|
HttpMethod: "POST", |
|
|
|
|
HttpHeader: map[string]string{ |
|
|
|
|
"Content-Type": "application/vnd.kafka.json.v2+json", |
|
|
|
|
"Accept": "application/vnd.kafka.v2+json", |
|
|
|
|
}, |
|
|
|
|
"Content-Type": "application/vnd.kafka.json.v2+json", |
|
|
|
|
"Accept": "application/vnd.kafka.v2+json", |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := bus.DispatchCtx(evalContext.Ctx, cmd); err != nil { |
|
|
|
|