@@ -1,6 +1,6 @@
 // THIS FILE IS COPIED FROM UPSTREAM
 //
-// https://github.com/prometheus/prometheus/blob/edfc3bcd025dd6fe296c167a14a216cab1e552ee/notifier/notifier.go
+// https://github.com/prometheus/prometheus/blob/293f0c9185260165fd7dabbf8a9e8758b32abeae/notifier/notifier.go
 //
 // Copyright 2013 The Prometheus Authors
 // Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,26 +15,30 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// nolint
+//nolint:all
 package sender
 
 import (
     "context"
+    "crypto/md5"
+    "encoding/hex"
     "fmt"
+    "log/slog"
     "net/http"
     "net/url"
     "path"
     "sync"
     "time"
 
-    "github.com/go-kit/log"
-    "github.com/go-kit/log/level"
     "github.com/go-openapi/strfmt"
     "github.com/prometheus/alertmanager/api/v2/models"
     "github.com/prometheus/client_golang/prometheus"
     config_util "github.com/prometheus/common/config"
     "github.com/prometheus/common/model"
+    "github.com/prometheus/common/promslog"
    "github.com/prometheus/common/version"
+    "github.com/prometheus/sigv4"
+    "gopkg.in/yaml.v2"
 
     "github.com/prometheus/prometheus/config"
     "github.com/prometheus/prometheus/discovery/targetgroup"
@@ -53,7 +57,7 @@ const (
     alertmanagerLabel = "alertmanager"
 )
 
-var userAgent = fmt.Sprintf("Prometheus/%s", version.Version)
+var userAgent = version.PrometheusUserAgent()
 
 // Alert is a generic representation of an alert in the Prometheus eco-system.
 type Alert struct {
@@ -112,16 +116,18 @@ type Manager struct {
 
     more   chan struct{}
     mtx    sync.RWMutex
-    ctx    context.Context
-    cancel func()
+
+    stopOnce      *sync.Once
+    stopRequested chan struct{}
 
     alertmanagers map[string]*alertmanagerSet
-    logger        log.Logger
+    logger        *slog.Logger
 }
 
 // Options are the configurable parameters of a Handler.
 type Options struct {
     QueueCapacity int
+    DrainOnShutdown bool
     ExternalLabels labels.Labels
     RelabelConfigs []*relabel.Config
     // Used for sending HTTP requests to the Alertmanager.
@@ -155,7 +161,7 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag
             Namespace: namespace,
             Subsystem: subsystem,
             Name:      "errors_total",
-            Help:      "Total number of errors sending alert notifications.",
+            Help:      "Total number of sent alerts affected by errors.",
         },
         []string{alertmanagerLabel},
     ),
@@ -216,21 +222,19 @@ func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp
 }
 
 // NewManager is the manager constructor.
-func NewManager(o *Options, logger log.Logger) *Manager {
-    ctx, cancel := context.WithCancel(context.Background())
-
+func NewManager(o *Options, logger *slog.Logger) *Manager {
     if o.Do == nil {
         o.Do = do
     }
     if logger == nil {
-        logger = log.NewNopLogger()
+        logger = promslog.NewNopLogger()
     }
 
     n := &Manager{
         queue:  make([]*Alert, 0, o.QueueCapacity),
-        ctx:    ctx,
-        cancel: cancel,
         more:   make(chan struct{}, 1),
+        stopRequested: make(chan struct{}),
+        stopOnce:      &sync.Once{},
         opts:   o,
         logger: logger,
     }
@@ -274,36 +278,98 @@ func (n *Manager) nextBatch() []*Alert {
     return alerts
 }
 
-// Run dispatches notifications continuously.
+// Run dispatches notifications continuously, returning once Stop has been called and all
+// pending notifications have been drained from the queue (if draining is enabled).
+//
+// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other.
+// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
 func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
+    wg := sync.WaitGroup{}
+    wg.Add(2)
+
+    go func() {
+        defer wg.Done()
+        n.targetUpdateLoop(tsets)
+    }()
+
+    go func() {
+        defer wg.Done()
+        n.sendLoop()
+        n.drainQueue()
+    }()
+
+    wg.Wait()
+    n.logger.Info("Notification manager stopped")
+}
+
+// sendLoop continuously consumes the notifications queue and sends alerts to
+// the configured Alertmanagers.
+func (n *Manager) sendLoop() {
     for {
-        // The select is split in two parts, such as we will first try to read
-        // new alertmanager targets if they are available, before sending new
-        // alerts.
+        // If we've been asked to stop, that takes priority over sending any further notifications.
         select {
-        case <-n.ctx.Done():
+        case <-n.stopRequested:
+            return
+        default:
+            select {
+            case <-n.stopRequested:
+                return
+
+            case <-n.more:
+                n.sendOneBatch()
+
+                // If the queue still has items left, kick off the next iteration.
+                if n.queueLen() > 0 {
+                    n.setMore()
+                }
+            }
+        }
+    }
+}
+
+// targetUpdateLoop receives updates of target groups and triggers a reload.
+func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) {
+    for {
+        // If we've been asked to stop, that takes priority over processing any further target group updates.
+        select {
+        case <-n.stopRequested:
             return
-        case ts := <-tsets:
-            n.reload(ts)
         default:
             select {
-            case <-n.ctx.Done():
+            case <-n.stopRequested:
                 return
             case ts := <-tsets:
                 n.reload(ts)
-            case <-n.more:
             }
         }
-        alerts := n.nextBatch()
-
-        if !n.sendAll(alerts...) {
-            n.metrics.dropped.Add(float64(len(alerts)))
-        }
-        // If the queue still has items left, kick off the next iteration.
-        if n.queueLen() > 0 {
-            n.setMore()
-        }
     }
 }
+
+func (n *Manager) sendOneBatch() {
+    alerts := n.nextBatch()
+
+    if !n.sendAll(alerts...) {
+        n.metrics.dropped.Add(float64(len(alerts)))
+    }
+}
+
+func (n *Manager) drainQueue() {
+    if !n.opts.DrainOnShutdown {
+        if n.queueLen() > 0 {
+            n.logger.Warn("Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen())
+            n.metrics.dropped.Add(float64(n.queueLen()))
+        }
+
+        return
+    }
+
+    n.logger.Info("Draining any remaining notifications...")
+
+    for n.queueLen() > 0 {
+        n.sendOneBatch()
+    }
+
+    n.logger.Info("Remaining notifications drained")
+}
 
 func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
@@ -313,7 +379,7 @@ func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
     for id, tgroup := range tgs {
         am, ok := n.alertmanagers[id]
         if !ok {
-            level.Error(n.logger).Log("msg", "couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id))
+            n.logger.Error("couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id))
             continue
         }
         am.sync(tgroup)
@@ -326,20 +392,7 @@ func (n *Manager) Send(alerts ...*Alert) {
     n.mtx.Lock()
     defer n.mtx.Unlock()
 
-    // Attach external labels before relabelling and sending.
-    for _, a := range alerts {
-        lb := labels.NewBuilder(a.Labels)
-
-        n.opts.ExternalLabels.Range(func(l labels.Label) {
-            if a.Labels.Get(l.Name) == "" {
-                lb.Set(l.Name, l.Value)
-            }
-        })
-
-        a.Labels = lb.Labels()
-    }
-
-    alerts = n.relabelAlerts(alerts)
+    alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts)
     if len(alerts) == 0 {
         return
     }
@@ -349,7 +402,7 @@ func (n *Manager) Send(alerts ...*Alert) {
     if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
         alerts = alerts[d:]
 
-        level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
+        n.logger.Warn("Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
         n.metrics.dropped.Add(float64(d))
     }
 
@@ -358,7 +411,7 @@ func (n *Manager) Send(alerts ...*Alert) {
     if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
         n.queue = n.queue[d:]
 
-        level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
+        n.logger.Warn("Alert notification queue full, dropping alerts", "num_dropped", d)
         n.metrics.dropped.Add(float64(d))
     }
     n.queue = append(n.queue, alerts...)
@@ -367,15 +420,24 @@ func (n *Manager) Send(alerts ...*Alert) {
     n.setMore()
 }
 
-func (n *Manager) relabelAlerts(alerts []*Alert) []*Alert {
+func relabelAlerts(relabelConfigs []*relabel.Config, externalLabels labels.Labels, alerts []*Alert) []*Alert {
+    lb := labels.NewBuilder(labels.EmptyLabels())
     var relabeledAlerts []*Alert
-    for _, alert := range alerts {
-        labels, keep := relabel.Process(alert.Labels, n.opts.RelabelConfigs...)
-        if keep {
-            alert.Labels = labels
-            relabeledAlerts = append(relabeledAlerts, alert)
+    for _, a := range alerts {
+        lb.Reset(a.Labels)
+        externalLabels.Range(func(l labels.Label) {
+            if a.Labels.Get(l.Name) == "" {
+                lb.Set(l.Name, l.Value)
+            }
+        })
+
+        keep := relabel.ProcessBuilder(lb, relabelConfigs...)
+        if !keep {
+            continue
         }
+        a.Labels = lb.Labels()
+        relabeledAlerts = append(relabeledAlerts, a)
     }
     return relabeledAlerts
 }
 
@@ -456,10 +518,19 @@ func labelsToOpenAPILabelSet(modelLabelSet labels.Labels) models.LabelSet {
     return apiLabelSet
 }
 
-// Stop shuts down the notification handler.
+// Stop signals the notification manager to shut down and immediately returns.
+//
+// Run will return once the notification manager has successfully shut down.
+//
+// The manager will optionally drain any queued notifications before shutting down.
+//
+// Stop is safe to call multiple times.
 func (n *Manager) Stop() {
-    level.Info(n.logger).Log("msg", "Stopping notification manager...")
-    n.cancel()
+    n.logger.Info("Stopping notification manager...")
+
+    n.stopOnce.Do(func() {
+        close(n.stopRequested)
+    })
 }
 
 // Alertmanager holds Alertmanager endpoint information.
@@ -479,11 +550,22 @@ func (a alertmanagerLabels) url() *url.URL {
     }
 }
 
-func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger, metrics *alertMetrics) (*alertmanagerSet, error) {
+func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) {
     client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "alertmanager")
     if err != nil {
         return nil, err
     }
+    t := client.Transport
+
+    if cfg.SigV4Config != nil {
+        t, err = sigv4.NewSigV4RoundTripper(cfg.SigV4Config, client.Transport)
+        if err != nil {
+            return nil, err
+        }
+    }
+
+    client.Transport = t
+
     s := &alertmanagerSet{
         client: client,
         cfg:    cfg,
@@ -502,7 +584,7 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
     for _, tg := range tgs {
         ams, droppedAms, err := AlertmanagerFromGroup(tg, s.cfg)
         if err != nil {
-            level.Error(s.logger).Log("msg", "Creating discovered Alertmanagers failed", "err", err)
+            s.logger.Error("Creating discovered Alertmanagers failed", "err", err)
             continue
         }
         allAms = append(allAms, ams...)
@@ -511,6 +593,7 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
 
     s.mtx.Lock()
     defer s.mtx.Unlock()
+    previousAms := s.ams
     // Set new Alertmanagers and deduplicate them along their unique URL.
     s.ams = []alertmanager{}
     s.droppedAms = []alertmanager{}
@@ -530,6 +613,26 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
         seen[us] = struct{}{}
         s.ams = append(s.ams, am)
     }
+    // Now remove counters for any removed Alertmanagers.
+    for _, am := range previousAms {
+        us := am.url().String()
+        if _, ok := seen[us]; ok {
+            continue
+        }
+        s.metrics.latency.DeleteLabelValues(us)
+        s.metrics.sent.DeleteLabelValues(us)
+        s.metrics.errors.DeleteLabelValues(us)
+        seen[us] = struct{}{}
+    }
 }
+
+func (s *alertmanagerSet) configHash() (string, error) {
+    b, err := yaml.Marshal(s.cfg)
+    if err != nil {
+        return "", err
+    }
+    hash := md5.Sum(b)
+    return hex.EncodeToString(hash[:]), nil
+}
 
 func postPath(pre string, v config.AlertmanagerAPIVersion) string {
@@ -540,38 +643,40 @@ func postPath(pre string, v config.AlertmanagerAPIVersion) string {
 // AlertmanagerFromGroup extracts a list of alertmanagers from a target group
 // and an associated AlertmanagerConfig.
 func AlertmanagerFromGroup(tg *targetgroup.Group, cfg *config.AlertmanagerConfig) ([]alertmanager, []alertmanager, error) {
-    res := make([]alertmanager, 0, len(tg.Targets))
+    var res []alertmanager
     var droppedAlertManagers []alertmanager
+    lb := labels.NewBuilder(labels.EmptyLabels())
 
     for _, tlset := range tg.Targets {
-        lbls := make([]labels.Label, 0, len(tlset)+2+len(tg.Labels))
+        lb.Reset(labels.EmptyLabels())
 
         for ln, lv := range tlset {
-            lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
+            lb.Set(string(ln), string(lv))
         }
         // Set configured scheme as the initial scheme label for overwrite.
-        lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: cfg.Scheme})
-        lbls = append(lbls, labels.Label{Name: pathLabel, Value: postPath(cfg.PathPrefix, cfg.APIVersion)})
+        lb.Set(model.SchemeLabel, cfg.Scheme)
+        lb.Set(pathLabel, postPath(cfg.PathPrefix, cfg.APIVersion))
 
         // Combine target labels with target group labels.
         for ln, lv := range tg.Labels {
             if _, ok := tlset[ln]; !ok {
-                lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)})
+                lb.Set(string(ln), string(lv))
             }
         }
 
-        lset, keep := relabel.Process(labels.New(lbls...), cfg.RelabelConfigs...)
+        preRelabel := lb.Labels()
+        keep := relabel.ProcessBuilder(lb, cfg.RelabelConfigs...)
         if !keep {
-            droppedAlertManagers = append(droppedAlertManagers, alertmanagerLabels{labels.New(lbls...)})
+            droppedAlertManagers = append(droppedAlertManagers, alertmanagerLabels{preRelabel})
             continue
         }
 
-        addr := lset.Get(model.AddressLabel)
+        addr := lb.Get(model.AddressLabel)
         if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil {
             return nil, nil, err
         }
 
-        res = append(res, alertmanagerLabels{lset})
+        res = append(res, alertmanagerLabels{lb.Labels()})
     }
     return res, droppedAlertManagers, nil
 }