diff --git a/go.mod b/go.mod index b1b3d4d6fb6..490c7e7212c 100644 --- a/go.mod +++ b/go.mod @@ -197,6 +197,7 @@ require ( google.golang.org/protobuf v1.36.6 // @grafana/plugins-platform-backend gopkg.in/ini.v1 v1.67.0 // @grafana/alerting-backend gopkg.in/mail.v2 v2.3.1 // @grafana/grafana-backend-group + gopkg.in/yaml.v2 v2.4.0 // @grafana/alerting-backend gopkg.in/yaml.v3 v3.0.1 // @grafana/alerting-backend k8s.io/api v0.32.3 // @grafana/grafana-app-platform-squad k8s.io/apimachinery v0.32.3 // @grafana/grafana-app-platform-squad @@ -493,7 +494,7 @@ require ( github.com/prometheus/common/sigv4 v0.1.0 // indirect github.com/prometheus/exporter-toolkit v0.13.2 // indirect github.com/prometheus/procfs v0.15.1 // indirect - github.com/prometheus/sigv4 v0.1.0 // indirect + github.com/prometheus/sigv4 v0.1.0 // @grafana/alerting-backend github.com/protocolbuffers/txtpbfmt v0.0.0-20241112170944-20d2c9ebc01d // indirect github.com/puzpuzpuz/xsync/v2 v2.5.1 // indirect github.com/redis/rueidis v1.0.53 // indirect @@ -569,7 +570,6 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/src-d/go-errors.v1 v1.0.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/apiextensions-apiserver v0.32.3 // indirect k8s.io/kms v0.32.3 // indirect modernc.org/libc v1.61.13 // indirect diff --git a/pkg/services/ngalert/sender/notifier.go b/pkg/services/ngalert/sender/notifier.go index 0e901a8608d..dbf2149f4e7 100644 --- a/pkg/services/ngalert/sender/notifier.go +++ b/pkg/services/ngalert/sender/notifier.go @@ -1,6 +1,6 @@ // THIS FILE IS COPIED FROM UPSTREAM // -// https://github.com/prometheus/prometheus/blob/edfc3bcd025dd6fe296c167a14a216cab1e552ee/notifier/notifier.go +// https://github.com/prometheus/prometheus/blob/293f0c9185260165fd7dabbf8a9e8758b32abeae/notifier/notifier.go // // Copyright 2013 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,26 +15,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -// nolint +//nolint:all package sender import ( "context" + "crypto/md5" + "encoding/hex" "fmt" + "log/slog" "net/http" "net/url" "path" "sync" "time" - "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/go-openapi/strfmt" "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" + "github.com/prometheus/common/promslog" "github.com/prometheus/common/version" + "github.com/prometheus/sigv4" + "gopkg.in/yaml.v2" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -53,7 +57,7 @@ const ( alertmanagerLabel = "alertmanager" ) -var userAgent = fmt.Sprintf("Prometheus/%s", version.Version) +var userAgent = version.PrometheusUserAgent() // Alert is a generic representation of an alert in the Prometheus eco-system. type Alert struct { @@ -110,20 +114,22 @@ type Manager struct { metrics *alertMetrics - more chan struct{} - mtx sync.RWMutex - ctx context.Context - cancel func() + more chan struct{} + mtx sync.RWMutex + + stopOnce *sync.Once + stopRequested chan struct{} alertmanagers map[string]*alertmanagerSet - logger log.Logger + logger *slog.Logger } // Options are the configurable parameters of a Handler. 
type Options struct { - QueueCapacity int - ExternalLabels labels.Labels - RelabelConfigs []*relabel.Config + QueueCapacity int + DrainOnShutdown bool + ExternalLabels labels.Labels + RelabelConfigs []*relabel.Config // Used for sending HTTP requests to the Alertmanager. Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) @@ -155,7 +161,7 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag Namespace: namespace, Subsystem: subsystem, Name: "errors_total", - Help: "Total number of errors sending alert notifications.", + Help: "Total number of sent alerts affected by errors.", }, []string{alertmanagerLabel}, ), @@ -216,23 +222,21 @@ func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp } // NewManager is the manager constructor. -func NewManager(o *Options, logger log.Logger) *Manager { - ctx, cancel := context.WithCancel(context.Background()) - +func NewManager(o *Options, logger *slog.Logger) *Manager { if o.Do == nil { o.Do = do } if logger == nil { - logger = log.NewNopLogger() + logger = promslog.NewNopLogger() } n := &Manager{ - queue: make([]*Alert, 0, o.QueueCapacity), - ctx: ctx, - cancel: cancel, - more: make(chan struct{}, 1), - opts: o, - logger: logger, + queue: make([]*Alert, 0, o.QueueCapacity), + more: make(chan struct{}, 1), + stopRequested: make(chan struct{}), + stopOnce: &sync.Once{}, + opts: o, + logger: logger, } queueLenFunc := func() float64 { return float64(n.queueLen()) } @@ -274,36 +278,98 @@ func (n *Manager) nextBatch() []*Alert { return alerts } -// Run dispatches notifications continuously. +// Run dispatches notifications continuously, returning once Stop has been called and all +// pending notifications have been drained from the queue (if draining is enabled). +// +// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other. +// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details. func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + n.targetUpdateLoop(tsets) + }() + + go func() { + defer wg.Done() + n.sendLoop() + n.drainQueue() + }() + + wg.Wait() + n.logger.Info("Notification manager stopped") +} + +// sendLoop continuously consumes the notifications queue and sends alerts to +// the configured Alertmanagers. +func (n *Manager) sendLoop() { for { - // The select is split in two parts, such as we will first try to read - // new alertmanager targets if they are available, before sending new - // alerts. + // If we've been asked to stop, that takes priority over sending any further notifications. select { - case <-n.ctx.Done(): + case <-n.stopRequested: return - case ts := <-tsets: - n.reload(ts) default: select { - case <-n.ctx.Done(): + case <-n.stopRequested: return - case ts := <-tsets: - n.reload(ts) + case <-n.more: + n.sendOneBatch() + + // If the queue still has items left, kick off the next iteration. + if n.queueLen() > 0 { + n.setMore() + } } } - alerts := n.nextBatch() + } +} - if !n.sendAll(alerts...) { - n.metrics.dropped.Add(float64(len(alerts))) +// targetUpdateLoop receives updates of target groups and triggers a reload. +func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) { + for { + // If we've been asked to stop, that takes priority over processing any further target group updates. 
+ select { + case <-n.stopRequested: + return + default: + select { + case <-n.stopRequested: + return + case ts := <-tsets: + n.reload(ts) + } } - // If the queue still has items left, kick off the next iteration. + } +} + +func (n *Manager) sendOneBatch() { + alerts := n.nextBatch() + + if !n.sendAll(alerts...) { + n.metrics.dropped.Add(float64(len(alerts))) + } +} + +func (n *Manager) drainQueue() { + if !n.opts.DrainOnShutdown { if n.queueLen() > 0 { - n.setMore() + n.logger.Warn("Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen()) + n.metrics.dropped.Add(float64(n.queueLen())) } + + return } + + n.logger.Info("Draining any remaining notifications...") + + for n.queueLen() > 0 { + n.sendOneBatch() + } + + n.logger.Info("Remaining notifications drained") } func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { @@ -313,7 +379,7 @@ func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { for id, tgroup := range tgs { am, ok := n.alertmanagers[id] if !ok { - level.Error(n.logger).Log("msg", "couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id)) + n.logger.Error("couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id)) continue } am.sync(tgroup) @@ -326,20 +392,7 @@ func (n *Manager) Send(alerts ...*Alert) { n.mtx.Lock() defer n.mtx.Unlock() - // Attach external labels before relabelling and sending. - for _, a := range alerts { - lb := labels.NewBuilder(a.Labels) - - n.opts.ExternalLabels.Range(func(l labels.Label) { - if a.Labels.Get(l.Name) == "" { - lb.Set(l.Name, l.Value) - } - }) - - a.Labels = lb.Labels() - } - - alerts = n.relabelAlerts(alerts) + alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts) if len(alerts) == 0 { return } @@ -349,7 +402,7 @@ func (n *Manager) Send(alerts ...*Alert) { if d := len(alerts) - n.opts.QueueCapacity; d > 0 { alerts = alerts[d:] - level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d) + n.logger.Warn("Alert batch larger than queue capacity, dropping alerts", "num_dropped", d) n.metrics.dropped.Add(float64(d)) } @@ -358,7 +411,7 @@ func (n *Manager) Send(alerts ...*Alert) { if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 { n.queue = n.queue[d:] - level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d) + n.logger.Warn("Alert notification queue full, dropping alerts", "num_dropped", d) n.metrics.dropped.Add(float64(d)) } n.queue = append(n.queue, alerts...) @@ -367,15 +420,24 @@ func (n *Manager) Send(alerts ...*Alert) { n.setMore() } -func (n *Manager) relabelAlerts(alerts []*Alert) []*Alert { +func relabelAlerts(relabelConfigs []*relabel.Config, externalLabels labels.Labels, alerts []*Alert) []*Alert { + lb := labels.NewBuilder(labels.EmptyLabels()) var relabeledAlerts []*Alert - for _, alert := range alerts { - labels, keep := relabel.Process(alert.Labels, n.opts.RelabelConfigs...) - if keep { - alert.Labels = labels - relabeledAlerts = append(relabeledAlerts, alert) + for _, a := range alerts { + lb.Reset(a.Labels) + externalLabels.Range(func(l labels.Label) { + if a.Labels.Get(l.Name) == "" { + lb.Set(l.Name, l.Value) + } + }) + + keep := relabel.ProcessBuilder(lb, relabelConfigs...) 
+ if !keep { + continue } + a.Labels = lb.Labels() + relabeledAlerts = append(relabeledAlerts, a) } return relabeledAlerts } @@ -456,10 +518,19 @@ func labelsToOpenAPILabelSet(modelLabelSet labels.Labels) models.LabelSet { return apiLabelSet } -// Stop shuts down the notification handler. +// Stop signals the notification manager to shut down and immediately returns. +// +// Run will return once the notification manager has successfully shut down. +// +// The manager will optionally drain any queued notifications before shutting down. +// +// Stop is safe to call multiple times. func (n *Manager) Stop() { - level.Info(n.logger).Log("msg", "Stopping notification manager...") - n.cancel() + n.logger.Info("Stopping notification manager...") + + n.stopOnce.Do(func() { + close(n.stopRequested) + }) } // Alertmanager holds Alertmanager endpoint information. @@ -479,11 +550,22 @@ func (a alertmanagerLabels) url() *url.URL { } } -func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger, metrics *alertMetrics) (*alertmanagerSet, error) { +func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger *slog.Logger, metrics *alertMetrics) (*alertmanagerSet, error) { client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, "alertmanager") if err != nil { return nil, err } + t := client.Transport + + if cfg.SigV4Config != nil { + t, err = sigv4.NewSigV4RoundTripper(cfg.SigV4Config, client.Transport) + if err != nil { + return nil, err + } + } + + client.Transport = t + s := &alertmanagerSet{ client: client, cfg: cfg, @@ -502,7 +584,7 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) { for _, tg := range tgs { ams, droppedAms, err := AlertmanagerFromGroup(tg, s.cfg) if err != nil { - level.Error(s.logger).Log("msg", "Creating discovered Alertmanagers failed", "err", err) + s.logger.Error("Creating discovered Alertmanagers failed", "err", err) continue } allAms = append(allAms, ams...) @@ -511,6 +593,7 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) { s.mtx.Lock() defer s.mtx.Unlock() + previousAms := s.ams // Set new Alertmanagers and deduplicate them along their unique URL. s.ams = []alertmanager{} s.droppedAms = []alertmanager{} @@ -530,6 +613,26 @@ func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) { seen[us] = struct{}{} s.ams = append(s.ams, am) } + // Now remove counters for any removed Alertmanagers. + for _, am := range previousAms { + us := am.url().String() + if _, ok := seen[us]; ok { + continue + } + s.metrics.latency.DeleteLabelValues(us) + s.metrics.sent.DeleteLabelValues(us) + s.metrics.errors.DeleteLabelValues(us) + seen[us] = struct{}{} + } +} + +func (s *alertmanagerSet) configHash() (string, error) { + b, err := yaml.Marshal(s.cfg) + if err != nil { + return "", err + } + hash := md5.Sum(b) + return hex.EncodeToString(hash[:]), nil } func postPath(pre string, v config.AlertmanagerAPIVersion) string { @@ -540,38 +643,40 @@ func postPath(pre string, v config.AlertmanagerAPIVersion) string { // AlertmanagerFromGroup extracts a list of alertmanagers from a target group // and an associated AlertmanagerConfig. 
func AlertmanagerFromGroup(tg *targetgroup.Group, cfg *config.AlertmanagerConfig) ([]alertmanager, []alertmanager, error) { - res := make([]alertmanager, 0, len(tg.Targets)) + var res []alertmanager var droppedAlertManagers []alertmanager + lb := labels.NewBuilder(labels.EmptyLabels()) for _, tlset := range tg.Targets { - lbls := make([]labels.Label, 0, len(tlset)+2+len(tg.Labels)) + lb.Reset(labels.EmptyLabels()) for ln, lv := range tlset { - lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) + lb.Set(string(ln), string(lv)) } // Set configured scheme as the initial scheme label for overwrite. - lbls = append(lbls, labels.Label{Name: model.SchemeLabel, Value: cfg.Scheme}) - lbls = append(lbls, labels.Label{Name: pathLabel, Value: postPath(cfg.PathPrefix, cfg.APIVersion)}) + lb.Set(model.SchemeLabel, cfg.Scheme) + lb.Set(pathLabel, postPath(cfg.PathPrefix, cfg.APIVersion)) // Combine target labels with target group labels. for ln, lv := range tg.Labels { if _, ok := tlset[ln]; !ok { - lbls = append(lbls, labels.Label{Name: string(ln), Value: string(lv)}) + lb.Set(string(ln), string(lv)) } } - lset, keep := relabel.Process(labels.New(lbls...), cfg.RelabelConfigs...) + preRelabel := lb.Labels() + keep := relabel.ProcessBuilder(lb, cfg.RelabelConfigs...) if !keep { - droppedAlertManagers = append(droppedAlertManagers, alertmanagerLabels{labels.New(lbls...)}) + droppedAlertManagers = append(droppedAlertManagers, alertmanagerLabels{preRelabel}) continue } - addr := lset.Get(model.AddressLabel) + addr := lb.Get(model.AddressLabel) if err := config.CheckTargetAddress(model.LabelValue(addr)); err != nil { return nil, nil, err } - res = append(res, alertmanagerLabels{lset}) + res = append(res, alertmanagerLabels{lb.Labels()}) } return res, droppedAlertManagers, nil } diff --git a/pkg/services/ngalert/sender/notifier_ext.go b/pkg/services/ngalert/sender/notifier_ext.go index c1ef0f721fc..348189eaf34 100644 --- a/pkg/services/ngalert/sender/notifier_ext.go +++ b/pkg/services/ngalert/sender/notifier_ext.go @@ -10,17 +10,15 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" + "log/slog" "net/http" "sync" "time" - "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/prometheus/prometheus/config" - "go.uber.org/atomic" + "github.com/prometheus/prometheus/model/labels" ) // ApplyConfig updates the status state as the new config requires. @@ -33,12 +31,33 @@ func (n *Manager) ApplyConfig(conf *config.Config, headers map[string]http.Heade n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs amSets := make(map[string]*alertmanagerSet) + // configToAlertmanagers maps alertmanager sets for each unique AlertmanagerConfig, + // helping to avoid dropping known alertmanagers and re-use them without waiting for SD updates when applying the config. + configToAlertmanagers := make(map[string]*alertmanagerSet, len(n.alertmanagers)) + for _, oldAmSet := range n.alertmanagers { + hash, err := oldAmSet.configHash() + if err != nil { + return err + } + configToAlertmanagers[hash] = oldAmSet + } for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() { ams, err := newAlertmanagerSet(cfg, n.logger, n.metrics) if err != nil { return err } + + hash, err := ams.configHash() + if err != nil { + return err + } + + if oldAmSet, ok := configToAlertmanagers[hash]; ok { + ams.ams = oldAmSet.ams + ams.droppedAms = oldAmSet.droppedAms + } + // Extension: set the headers to the alertmanager set. 
if headers, ok := headers[k]; ok { ams.headers = headers @@ -65,11 +84,12 @@ type alertmanagerSet struct { mtx sync.RWMutex ams []alertmanager droppedAms []alertmanager - logger log.Logger + logger *slog.Logger } // sendAll sends the alerts to all configured Alertmanagers concurrently. // It returns true if the alerts could be sent successfully to at least one Alertmanager. +// Extension: passing headers from each ams to sendOne func (n *Manager) sendAll(alerts ...*Alert) bool { if len(alerts) == 0 { return true @@ -77,60 +97,63 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { begin := time.Now() - // v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API - // v1 or v2. Marshaling happens below. Reference here is for caching between + // cachedPayload represent 'alerts' marshaled for Alertmanager API v2. + // Marshaling happens below. Reference here is for caching between // for loop iterations. - var v1Payload, v2Payload []byte + var cachedPayload []byte n.mtx.RLock() amSets := n.alertmanagers n.mtx.RUnlock() var ( - wg sync.WaitGroup - numSuccess atomic.Uint64 + wg sync.WaitGroup + amSetCovered sync.Map ) - for _, ams := range amSets { + for k, ams := range amSets { var ( - payload []byte - err error + payload []byte + err error + amAlerts = alerts ) ams.mtx.RLock() - switch ams.cfg.APIVersion { - case config.AlertmanagerAPIVersionV1: - { - if v1Payload == nil { - v1Payload, err = json.Marshal(alerts) - if err != nil { - level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v1 failed", "err", err) - ams.mtx.RUnlock() - return false - } - } + if len(ams.ams) == 0 { + ams.mtx.RUnlock() + continue + } - payload = v1Payload + if len(ams.cfg.AlertRelabelConfigs) > 0 { + amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts) + if len(amAlerts) == 0 { + ams.mtx.RUnlock() + continue } + // We can't use the cached values from previous iteration. + cachedPayload = nil + } + + switch ams.cfg.APIVersion { case config.AlertmanagerAPIVersionV2: { - if v2Payload == nil { - openAPIAlerts := alertsToOpenAPIAlerts(alerts) + if cachedPayload == nil { + openAPIAlerts := alertsToOpenAPIAlerts(amAlerts) - v2Payload, err = json.Marshal(openAPIAlerts) + cachedPayload, err = json.Marshal(openAPIAlerts) if err != nil { - level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v2 failed", "err", err) + n.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err) ams.mtx.RUnlock() return false } } - payload = v2Payload + payload = cachedPayload } default: { - level.Error(n.logger).Log( - "msg", fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", ams.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions), + n.logger.Error( + fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", ams.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions), "err", err, ) ams.mtx.RUnlock() @@ -138,26 +161,34 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { } } + if len(ams.cfg.AlertRelabelConfigs) > 0 { + // We can't use the cached values on the next iteration. + cachedPayload = nil + } + + // Being here means len(ams.ams) > 0 + amSetCovered.Store(k, false) for _, am := range ams.ams { wg.Add(1) - ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ams.cfg.Timeout)) defer cancel() // Extension: added headers parameter. 
- go func(client *http.Client, url string, headers http.Header) { - // Treat cancellations as a success, so that we don't increment error or dropped counters. - if err := n.sendOne(ctx, client, url, payload, headers); err != nil && !errors.Is(err, context.Canceled) { - level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err) - n.metrics.errors.WithLabelValues(url).Inc() + go func(ctx context.Context, k string, client *http.Client, url string, payload []byte, count int, headers http.Header) { + err := n.sendOne(ctx, client, url, payload, headers) + if err != nil { + n.logger.Error("Error sending alerts", "alertmanager", url, "count", count, "err", err) + n.metrics.errors.WithLabelValues(url).Add(float64(count)) } else { - numSuccess.Inc() + amSetCovered.CompareAndSwap(k, false, true) } + n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds()) - n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts))) + n.metrics.sent.WithLabelValues(url).Add(float64(count)) wg.Done() - }(ams.client, am.url().String(), ams.headers) + }(ctx, k, ams.client, am.url().String(), payload, len(amAlerts), ams.headers) } ams.mtx.RUnlock() @@ -165,12 +196,23 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { wg.Wait() - return numSuccess.Load() > 0 + // Return false if there are any sets which were attempted (e.g. not filtered + // out) but have no successes. + allAmSetsCovered := true + amSetCovered.Range(func(_, value any) bool { + if !value.(bool) { + allAmSetsCovered = false + return false + } + return true + }) + + return allAmSetsCovered } // Extension: added headers parameter. func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte, headers http.Header) error { - req, err := http.NewRequest("POST", url, bytes.NewReader(b)) + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b)) if err != nil { return err } diff --git a/pkg/services/ngalert/sender/notifier_test.go b/pkg/services/ngalert/sender/notifier_test.go new file mode 100644 index 00000000000..c36ec85e482 --- /dev/null +++ b/pkg/services/ngalert/sender/notifier_test.go @@ -0,0 +1,1171 @@ +// THIS FILE IS COPIED FROM UPSTREAM +// +// https://github.com/prometheus/prometheus/blob/293f0c9185260165fd7dabbf8a9e8758b32abeae/notifier/notifier_test.go +// +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//nolint:all +package sender + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + "time" + + "github.com/prometheus/alertmanager/api/v2/models" + "github.com/prometheus/client_golang/prometheus" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/common/promslog" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "gopkg.in/yaml.v2" + + "github.com/prometheus/prometheus/discovery" + + "github.com/prometheus/prometheus/config" + _ "github.com/prometheus/prometheus/discovery/file" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" +) + +func TestPostPath(t *testing.T) { + cases := []struct { + in, out string + }{ + { + in: "", + out: "/api/v2/alerts", + }, + { + in: "/", + out: "/api/v2/alerts", + }, + { + in: "/prefix", + out: "/prefix/api/v2/alerts", + }, + { + in: "/prefix//", + out: "/prefix/api/v2/alerts", + }, + { + in: "prefix//", + out: "/prefix/api/v2/alerts", + }, + } + for _, c := range cases { + require.Equal(t, c.out, postPath(c.in, config.AlertmanagerAPIVersionV2)) + } +} + +func TestHandlerNextBatch(t *testing.T) { + h := NewManager(&Options{}, nil) + + for i := range make([]struct{}, 2*maxBatchSize+1) { + h.queue = append(h.queue, &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }) + } + + expected := append([]*Alert{}, h.queue...) + + require.NoError(t, alertsEqual(expected[0:maxBatchSize], h.nextBatch())) + require.NoError(t, alertsEqual(expected[maxBatchSize:2*maxBatchSize], h.nextBatch())) + require.NoError(t, alertsEqual(expected[2*maxBatchSize:], h.nextBatch())) + require.Empty(t, h.queue, "Expected queue to be empty but got %d alerts", len(h.queue)) +} + +func alertsEqual(a, b []*Alert) error { + if len(a) != len(b) { + return fmt.Errorf("length mismatch: %v != %v", a, b) + } + for i, alert := range a { + if !labels.Equal(alert.Labels, b[i].Labels) { + return fmt.Errorf("label mismatch at index %d: %s != %s", i, alert.Labels, b[i].Labels) + } + } + return nil +} + +func newTestHTTPServerBuilder(expected *[]*Alert, errc chan<- error, u, p string, status *atomic.Int32) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + defer func() { + if err == nil { + return + } + select { + case errc <- err: + default: + } + }() + user, pass, _ := r.BasicAuth() + if user != u || pass != p { + err = fmt.Errorf("unexpected user/password: %s/%s != %s/%s", user, pass, u, p) + w.WriteHeader(http.StatusInternalServerError) + return + } + + b, err := io.ReadAll(r.Body) + if err != nil { + err = fmt.Errorf("error reading body: %w", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + var alerts []*Alert + err = json.Unmarshal(b, &alerts) + if err == nil { + err = alertsEqual(*expected, alerts) + } + w.WriteHeader(int(status.Load())) + })) +} + +func TestHandlerSendAll(t *testing.T) { + var ( + errc = make(chan error, 1) + expected = make([]*Alert, 0, maxBatchSize) + status1, status2, status3 atomic.Int32 + ) + status1.Store(int32(http.StatusOK)) + status2.Store(int32(http.StatusOK)) + status3.Store(int32(http.StatusOK)) + + server1 := newTestHTTPServerBuilder(&expected, errc, "prometheus", "testing_password", &status1) + server2 := newTestHTTPServerBuilder(&expected, errc, "", "", &status2) + server3 := 
newTestHTTPServerBuilder(&expected, errc, "", "", &status3) + defer server1.Close() + defer server2.Close() + defer server3.Close() + + h := NewManager(&Options{}, nil) + + authClient, _ := config_util.NewClientFromConfig( + config_util.HTTPClientConfig{ + BasicAuth: &config_util.BasicAuth{ + Username: "prometheus", + Password: "testing_password", + }, + }, "auth_alertmanager") + + h.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + am2Cfg := config.DefaultAlertmanagerConfig + am2Cfg.Timeout = model.Duration(time.Second) + + am3Cfg := config.DefaultAlertmanagerConfig + am3Cfg.Timeout = model.Duration(time.Second) + + h.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server1.URL }, + }, + }, + cfg: &am1Cfg, + client: authClient, + } + + h.alertmanagers["2"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server2.URL }, + }, + alertmanagerMock{ + urlf: func() string { return server3.URL }, + }, + }, + cfg: &am2Cfg, + } + + h.alertmanagers["3"] = &alertmanagerSet{ + ams: []alertmanager{}, // empty set + cfg: &am3Cfg, + } + + for i := range make([]struct{}, maxBatchSize) { + h.queue = append(h.queue, &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }) + expected = append(expected, &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }) + } + + checkNoErr := func() { + t.Helper() + select { + case err := <-errc: + require.NoError(t, err) + default: + } + } + + // all ams in all sets are up + require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr() + + // the only am in set 1 is down + status1.Store(int32(http.StatusNotFound)) + require.False(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr() + + // reset it + status1.Store(int32(http.StatusOK)) + + // only one of the ams in set 2 is down + status2.Store(int32(http.StatusInternalServerError)) + require.True(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") + checkNoErr() + + // both ams in set 2 are down + status3.Store(int32(http.StatusInternalServerError)) + require.False(t, h.sendAll(h.queue...), "all sends succeeded unexpectedly") + checkNoErr() +} + +func TestHandlerSendAllRemapPerAm(t *testing.T) { + var ( + errc = make(chan error, 1) + expected1 = make([]*Alert, 0, maxBatchSize) + expected2 = make([]*Alert, 0, maxBatchSize) + expected3 = make([]*Alert, 0) + + status1, status2, status3 atomic.Int32 + ) + status1.Store(int32(http.StatusOK)) + status2.Store(int32(http.StatusOK)) + status3.Store(int32(http.StatusOK)) + + server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &status1) + server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &status2) + server3 := newTestHTTPServerBuilder(&expected3, errc, "", "", &status3) + + defer server1.Close() + defer server2.Close() + defer server3.Close() + + h := NewManager(&Options{}, nil) + h.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + am2Cfg := config.DefaultAlertmanagerConfig + am2Cfg.Timeout = model.Duration(time.Second) + am2Cfg.AlertRelabelConfigs = []*relabel.Config{ + { + SourceLabels: model.LabelNames{"alertnamedrop"}, + Action: "drop", + Regex: relabel.MustNewRegexp(".+"), + }, + } + + am3Cfg := config.DefaultAlertmanagerConfig + am3Cfg.Timeout = 
model.Duration(time.Second) + am3Cfg.AlertRelabelConfigs = []*relabel.Config{ + { + SourceLabels: model.LabelNames{"alertname"}, + Action: "drop", + Regex: relabel.MustNewRegexp(".+"), + }, + } + + h.alertmanagers = map[string]*alertmanagerSet{ + // Drop no alerts. + "1": { + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server1.URL }, + }, + }, + cfg: &am1Cfg, + }, + // Drop only alerts with the "alertnamedrop" label. + "2": { + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server2.URL }, + }, + }, + cfg: &am2Cfg, + }, + // Drop all alerts. + "3": { + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server3.URL }, + }, + }, + cfg: &am3Cfg, + }, + // Empty list of Alertmanager endpoints. + "4": { + ams: []alertmanager{}, + cfg: &config.DefaultAlertmanagerConfig, + }, + } + + for i := range make([]struct{}, maxBatchSize/2) { + h.queue = append(h.queue, + &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }, + &Alert{ + Labels: labels.FromStrings("alertname", "test", "alertnamedrop", strconv.Itoa(i)), + }, + ) + + expected1 = append(expected1, + &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }, &Alert{ + Labels: labels.FromStrings("alertname", "test", "alertnamedrop", strconv.Itoa(i)), + }, + ) + + expected2 = append(expected2, &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }) + } + + checkNoErr := func() { + t.Helper() + select { + case err := <-errc: + require.NoError(t, err) + default: + } + } + + // all ams are up + require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr() + + // the only am in set 1 goes down + status1.Store(int32(http.StatusInternalServerError)) + require.False(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr() + + // reset set 1 + status1.Store(int32(http.StatusOK)) + + // set 3 loses its only am, but all alerts were dropped + // so there was nothing to send, keeping sendAll true + status3.Store(int32(http.StatusInternalServerError)) + require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly") + checkNoErr() + + // Verify that individual locks are released. + for k := range h.alertmanagers { + h.alertmanagers[k].mtx.Lock() + h.alertmanagers[k].ams = nil + h.alertmanagers[k].mtx.Unlock() + } +} + +func TestCustomDo(t *testing.T) { + const testURL = "http://testurl.com/" + const testBody = "testbody" + + var received bool + h := NewManager(&Options{ + Do: func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) { + received = true + body, err := io.ReadAll(req.Body) + + require.NoError(t, err) + + require.Equal(t, testBody, string(body)) + + require.Equal(t, testURL, req.URL.String()) + + return &http.Response{ + Body: io.NopCloser(bytes.NewBuffer(nil)), + }, nil + }, + }, nil) + + h.sendOne(context.Background(), nil, testURL, []byte(testBody), http.Header{}) + + require.True(t, received, "Expected to receive an alert, but didn't") +} + +func TestExternalLabels(t *testing.T) { + h := NewManager(&Options{ + QueueCapacity: 3 * maxBatchSize, + ExternalLabels: labels.FromStrings("a", "b"), + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"alertname"}, + TargetLabel: "a", + Action: "replace", + Regex: relabel.MustNewRegexp("externalrelabelthis"), + Replacement: "c", + }, + }, + }, nil) + + // This alert should get the external label attached. 
+ h.Send(&Alert{ + Labels: labels.FromStrings("alertname", "test"), + }) + + // This alert should get the external label attached, but then set to "c" + // through relabelling. + h.Send(&Alert{ + Labels: labels.FromStrings("alertname", "externalrelabelthis"), + }) + + expected := []*Alert{ + {Labels: labels.FromStrings("alertname", "test", "a", "b")}, + {Labels: labels.FromStrings("alertname", "externalrelabelthis", "a", "c")}, + } + + require.NoError(t, alertsEqual(expected, h.queue)) +} + +func TestHandlerRelabel(t *testing.T) { + h := NewManager(&Options{ + QueueCapacity: 3 * maxBatchSize, + RelabelConfigs: []*relabel.Config{ + { + SourceLabels: model.LabelNames{"alertname"}, + Action: "drop", + Regex: relabel.MustNewRegexp("drop"), + }, + { + SourceLabels: model.LabelNames{"alertname"}, + TargetLabel: "alertname", + Action: "replace", + Regex: relabel.MustNewRegexp("rename"), + Replacement: "renamed", + }, + }, + }, nil) + + // This alert should be dropped due to the configuration + h.Send(&Alert{ + Labels: labels.FromStrings("alertname", "drop"), + }) + + // This alert should be replaced due to the configuration + h.Send(&Alert{ + Labels: labels.FromStrings("alertname", "rename"), + }) + + expected := []*Alert{ + {Labels: labels.FromStrings("alertname", "renamed")}, + } + + require.NoError(t, alertsEqual(expected, h.queue)) +} + +func TestHandlerQueuing(t *testing.T) { + var ( + expectedc = make(chan []*Alert) + called = make(chan struct{}) + done = make(chan struct{}) + errc = make(chan error, 1) + ) + + server := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { + // Notify the test function that we have received something. + select { + case called <- struct{}{}: + case <-done: + return + } + + // Wait for the test function to unblock us. + select { + case expected := <-expectedc: + var alerts []*Alert + + b, err := io.ReadAll(r.Body) + if err != nil { + panic(err) + } + + err = json.Unmarshal(b, &alerts) + if err == nil { + err = alertsEqual(expected, alerts) + } + select { + case errc <- err: + default: + } + case <-done: + } + })) + defer func() { + close(done) + server.Close() + }() + + h := NewManager( + &Options{ + QueueCapacity: 3 * maxBatchSize, + }, + nil, + ) + + h.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + h.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server.URL }, + }, + }, + cfg: &am1Cfg, + } + go h.Run(nil) + defer h.Stop() + + var alerts []*Alert + for i := range make([]struct{}, 20*maxBatchSize) { + alerts = append(alerts, &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }) + } + + assertAlerts := func(expected []*Alert) { + t.Helper() + for { + select { + case <-called: + expectedc <- expected + case err := <-errc: + require.NoError(t, err) + return + case <-time.After(5 * time.Second): + require.FailNow(t, "Alerts were not pushed.") + } + } + } + + // If the batch is larger than the queue capacity, it should be truncated + // from the front. + h.Send(alerts[:4*maxBatchSize]...) + for i := 1; i < 4; i++ { + assertAlerts(alerts[i*maxBatchSize : (i+1)*maxBatchSize]) + } + + // Send one batch, wait for it to arrive and block the server so the queue fills up. + h.Send(alerts[:maxBatchSize]...) + <-called + + // Send several batches while the server is still blocked so the queue + // fills up to its maximum capacity (3*maxBatchSize). 
Then check that the + // queue is truncated in the front. + h.Send(alerts[1*maxBatchSize : 2*maxBatchSize]...) // this batch should be dropped. + h.Send(alerts[2*maxBatchSize : 3*maxBatchSize]...) + h.Send(alerts[3*maxBatchSize : 4*maxBatchSize]...) + + // Send the batch that drops the first one. + h.Send(alerts[4*maxBatchSize : 5*maxBatchSize]...) + + // Unblock the server. + expectedc <- alerts[:maxBatchSize] + select { + case err := <-errc: + require.NoError(t, err) + case <-time.After(5 * time.Second): + require.FailNow(t, "Alerts were not pushed.") + } + + // Verify that we receive the last 3 batches. + for i := 2; i < 5; i++ { + assertAlerts(alerts[i*maxBatchSize : (i+1)*maxBatchSize]) + } +} + +type alertmanagerMock struct { + urlf func() string +} + +func (a alertmanagerMock) url() *url.URL { + u, err := url.Parse(a.urlf()) + if err != nil { + panic(err) + } + return u +} + +func TestLabelSetNotReused(t *testing.T) { + tg := makeInputTargetGroup() + _, _, err := AlertmanagerFromGroup(tg, &config.AlertmanagerConfig{}) + + require.NoError(t, err) + + // Target modified during alertmanager extraction + require.Equal(t, tg, makeInputTargetGroup()) +} + +func TestReload(t *testing.T) { + tests := []struct { + in *targetgroup.Group + out string + }{ + { + in: &targetgroup.Group{ + Targets: []model.LabelSet{ + { + "__address__": "alertmanager:9093", + }, + }, + }, + out: "http://alertmanager:9093/api/v2/alerts", + }, + } + + n := NewManager(&Options{}, nil) + + cfg := &config.Config{} + s := ` +alerting: + alertmanagers: + - static_configs: +` + err := yaml.UnmarshalStrict([]byte(s), cfg) + require.NoError(t, err, "Unable to load YAML config.") + require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1) + + err = n.ApplyConfig(cfg, map[string]http.Header{}) + require.NoError(t, err, "Error applying the config.") + + tgs := make(map[string][]*targetgroup.Group) + for _, tt := range tests { + for k := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() { + tgs[k] = []*targetgroup.Group{ + tt.in, + } + break + } + n.reload(tgs) + res := n.Alertmanagers()[0].String() + + require.Equal(t, tt.out, res) + } +} + +func TestDroppedAlertmanagers(t *testing.T) { + tests := []struct { + in *targetgroup.Group + out string + }{ + { + in: &targetgroup.Group{ + Targets: []model.LabelSet{ + { + "__address__": "alertmanager:9093", + }, + }, + }, + out: "http://alertmanager:9093/api/v2/alerts", + }, + } + + n := NewManager(&Options{}, nil) + + cfg := &config.Config{} + s := ` +alerting: + alertmanagers: + - static_configs: + relabel_configs: + - source_labels: ['__address__'] + regex: 'alertmanager:9093' + action: drop +` + err := yaml.UnmarshalStrict([]byte(s), cfg) + require.NoError(t, err, "Unable to load YAML config.") + require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1) + + err = n.ApplyConfig(cfg, map[string]http.Header{}) + require.NoError(t, err, "Error applying the config.") + + tgs := make(map[string][]*targetgroup.Group) + for _, tt := range tests { + for k := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() { + tgs[k] = []*targetgroup.Group{ + tt.in, + } + break + } + + n.reload(tgs) + res := n.DroppedAlertmanagers()[0].String() + + require.Equal(t, res, tt.out) + } +} + +func makeInputTargetGroup() *targetgroup.Group { + return &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("1.1.1.1:9090"), + model.LabelName("notcommon1"): model.LabelValue("label"), + }, + }, + Labels: model.LabelSet{ + model.LabelName("common"): 
model.LabelValue("label"), + }, + Source: "testsource", + } +} + +func TestLabelsToOpenAPILabelSet(t *testing.T) { + require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.FromStrings("aaa", "111", "bbb", "222"))) +} + +// TestHangingNotifier ensures that the notifier takes into account SD changes even when there are +// queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676. +// and https://github.com/prometheus/prometheus/issues/8768. +func TestHangingNotifier(t *testing.T) { + const ( + batches = 100 + alertsCount = maxBatchSize * batches + ) + + var ( + sendTimeout = 100 * time.Millisecond + sdUpdatert = sendTimeout / 2 + + done = make(chan struct{}) + ) + + defer func() { + close(done) + }() + + // Set up a faulty Alertmanager. + var faultyCalled atomic.Bool + faultyServer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { + faultyCalled.Store(true) + select { + case <-done: + case <-time.After(time.Hour): + } + })) + faultyURL, err := url.Parse(faultyServer.URL) + require.NoError(t, err) + + // Set up a functional Alertmanager. + var functionalCalled atomic.Bool + functionalServer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { + functionalCalled.Store(true) + })) + functionalURL, err := url.Parse(functionalServer.URL) + require.NoError(t, err) + + // Initialize the discovery manager + // This is relevant as the updates aren't sent continually in real life, but only each updatert. + // The old implementation of TestHangingNotifier didn't take that into account. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + reg := prometheus.NewRegistry() + sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg)) + require.NoError(t, err) + sdManager := discovery.NewManager( + ctx, + promslog.NewNopLogger(), + reg, + sdMetrics, + discovery.Name("sd-manager"), + discovery.Updatert(sdUpdatert), + ) + go sdManager.Run() + + // Set up the notifier with both faulty and functional Alertmanagers. + notifier := NewManager( + &Options{ + QueueCapacity: alertsCount, + }, + nil, + ) + notifier.alertmanagers = make(map[string]*alertmanagerSet) + amCfg := config.DefaultAlertmanagerConfig + amCfg.Timeout = model.Duration(sendTimeout) + notifier.alertmanagers["config-0"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return faultyURL.String() }, + }, + alertmanagerMock{ + urlf: func() string { return functionalURL.String() }, + }, + }, + cfg: &amCfg, + metrics: notifier.metrics, + } + go notifier.Run(sdManager.SyncCh()) + defer notifier.Stop() + + require.Len(t, notifier.Alertmanagers(), 2) + + // Enqueue the alerts. + var alerts []*Alert + for i := range make([]struct{}, alertsCount) { + alerts = append(alerts, &Alert{ + Labels: labels.FromStrings("alertname", strconv.Itoa(i)), + }) + } + notifier.Send(alerts...) + + // Wait for the Alertmanagers to start receiving alerts. + // 10*sdUpdatert is used as an arbitrary timeout here. + timeout := time.After(10 * sdUpdatert) +loop1: + for { + select { + case <-timeout: + t.Fatalf("Timeout waiting for the alertmanagers to be reached for the first time.") + default: + if faultyCalled.Load() && functionalCalled.Load() { + break loop1 + } + } + } + + // Request to remove the faulty Alertmanager. 
+ c := map[string]discovery.Configs{ + "config-0": { + discovery.StaticConfig{ + &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue(functionalURL.Host), + }, + }, + }, + }, + }, + } + require.NoError(t, sdManager.ApplyConfig(c)) + + // The notifier should not wait until the alerts queue is empty to apply the discovery changes + // A faulty Alertmanager could cause each alert sending cycle to take up to AlertmanagerConfig.Timeout + // The queue may never be emptied, as the arrival rate could be larger than the departure rate + // It could even overflow and alerts could be dropped. + timeout = time.After(batches * sendTimeout) +loop2: + for { + select { + case <-timeout: + t.Fatalf("Timeout, the faulty alertmanager not removed on time.") + default: + // The faulty alertmanager was dropped. + if len(notifier.Alertmanagers()) == 1 { + // Prevent from TOCTOU. + require.Positive(t, notifier.queueLen()) + break loop2 + } + require.Positive(t, notifier.queueLen(), "The faulty alertmanager wasn't dropped before the alerts queue was emptied.") + } + } +} + +func TestStop_DrainingDisabled(t *testing.T) { + releaseReceiver := make(chan struct{}) + receiverReceivedRequest := make(chan struct{}, 2) + alertsReceived := atomic.NewInt64(0) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + + var alerts []*Alert + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + + err = json.Unmarshal(b, &alerts) + require.NoError(t, err) + + alertsReceived.Add(int64(len(alerts))) + + // Wait for the test to release us. + <-releaseReceiver + + w.WriteHeader(http.StatusOK) + })) + defer func() { + server.Close() + }() + + m := NewManager( + &Options{ + QueueCapacity: 10, + DrainOnShutdown: false, + }, + nil, + ) + + m.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + m.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server.URL }, + }, + }, + cfg: &am1Cfg, + } + + notificationManagerStopped := make(chan struct{}) + + go func() { + defer close(notificationManagerStopped) + m.Run(nil) + }() + + // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + + select { + case <-receiverReceivedRequest: + // Nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for receiver to receive notification of first alert") + } + + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) + + // Stop the notification manager, pause to allow the shutdown to be observed, and then allow the receiver to proceed. + m.Stop() + time.Sleep(time.Second) + close(releaseReceiver) + + // Wait for the notification manager to stop and confirm only the first notification was sent. + // The second notification should be dropped. + select { + case <-notificationManagerStopped: + // Nothing more to do. 
+ case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for notification manager to stop") + } + + require.Equal(t, int64(1), alertsReceived.Load()) +} + +func TestStop_DrainingEnabled(t *testing.T) { + releaseReceiver := make(chan struct{}) + receiverReceivedRequest := make(chan struct{}, 2) + alertsReceived := atomic.NewInt64(0) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + + var alerts []*Alert + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + + err = json.Unmarshal(b, &alerts) + require.NoError(t, err) + + alertsReceived.Add(int64(len(alerts))) + + // Wait for the test to release us. + <-releaseReceiver + + w.WriteHeader(http.StatusOK) + })) + defer func() { + server.Close() + }() + + m := NewManager( + &Options{ + QueueCapacity: 10, + DrainOnShutdown: true, + }, + nil, + ) + + m.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + m.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server.URL }, + }, + }, + cfg: &am1Cfg, + } + + notificationManagerStopped := make(chan struct{}) + + go func() { + defer close(notificationManagerStopped) + m.Run(nil) + }() + + // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + + select { + case <-receiverReceivedRequest: + // Nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for receiver to receive notification of first alert") + } + + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) + + // Stop the notification manager and allow the receiver to proceed. + m.Stop() + close(releaseReceiver) + + // Wait for the notification manager to stop and confirm both notifications were sent. + select { + case <-notificationManagerStopped: + // Nothing more to do. + case <-time.After(200 * time.Millisecond): + require.FailNow(t, "gave up waiting for notification manager to stop") + } + + require.Equal(t, int64(2), alertsReceived.Load()) +} + +func TestApplyConfig(t *testing.T) { + targetURL := "alertmanager:9093" + targetGroup := &targetgroup.Group{ + Targets: []model.LabelSet{ + { + "__address__": model.LabelValue(targetURL), + }, + }, + } + alertmanagerURL := fmt.Sprintf("http://%s/api/v2/alerts", targetURL) + + n := NewManager(&Options{}, nil) + cfg := &config.Config{} + s := ` +alerting: + alertmanagers: + - file_sd_configs: + - files: + - foo.json +` + // 1. Ensure known alertmanagers are not dropped during ApplyConfig. + require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg)) + require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1) + + // First, apply the config and reload. + require.NoError(t, n.ApplyConfig(cfg, map[string]http.Header{})) + tgs := map[string][]*targetgroup.Group{"config-0": {targetGroup}} + n.reload(tgs) + require.Len(t, n.Alertmanagers(), 1) + require.Equal(t, alertmanagerURL, n.Alertmanagers()[0].String()) + + // Reapply the config. + require.NoError(t, n.ApplyConfig(cfg, map[string]http.Header{})) + // Ensure the known alertmanagers are not dropped. + require.Len(t, n.Alertmanagers(), 1) + require.Equal(t, alertmanagerURL, n.Alertmanagers()[0].String()) + + // 2. 
Ensure known alertmanagers are not dropped during ApplyConfig even when + // the config order changes. + s = ` +alerting: + alertmanagers: + - static_configs: + - file_sd_configs: + - files: + - foo.json +` + require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg)) + require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2) + + require.NoError(t, n.ApplyConfig(cfg, map[string]http.Header{})) + require.Len(t, n.Alertmanagers(), 1) + // Ensure no unnecessary alertmanagers are injected. + require.Empty(t, n.alertmanagers["config-0"].ams) + // Ensure the config order is taken into account. + ams := n.alertmanagers["config-1"].ams + require.Len(t, ams, 1) + require.Equal(t, alertmanagerURL, ams[0].url().String()) + + // 3. Ensure known alertmanagers are reused for new config with identical AlertmanagerConfig. + s = ` +alerting: + alertmanagers: + - file_sd_configs: + - files: + - foo.json + - file_sd_configs: + - files: + - foo.json +` + require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg)) + require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2) + + require.NoError(t, n.ApplyConfig(cfg, map[string]http.Header{})) + require.Len(t, n.Alertmanagers(), 2) + for cfgIdx := range 2 { + ams := n.alertmanagers[fmt.Sprintf("config-%d", cfgIdx)].ams + require.Len(t, ams, 1) + require.Equal(t, alertmanagerURL, ams[0].url().String()) + } + + // 4. Ensure known alertmanagers are reused only for identical AlertmanagerConfig. + s = ` +alerting: + alertmanagers: + - file_sd_configs: + - files: + - foo.json + path_prefix: /bar + - file_sd_configs: + - files: + - foo.json + relabel_configs: + - source_labels: ['__address__'] + regex: 'doesntmatter:1234' + action: drop +` + require.NoError(t, yaml.UnmarshalStrict([]byte(s), cfg)) + require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 2) + + require.NoError(t, n.ApplyConfig(cfg, map[string]http.Header{})) + require.Empty(t, n.Alertmanagers()) +} diff --git a/pkg/services/ngalert/sender/sender.go b/pkg/services/ngalert/sender/sender.go index 41f8cd2a2a7..84f29dacbcc 100644 --- a/pkg/services/ngalert/sender/sender.go +++ b/pkg/services/ngalert/sender/sender.go @@ -28,6 +28,7 @@ import ( const ( defaultMaxQueueCapacity = 10000 defaultTimeout = 10 * time.Second + defaultDrainOnShutdown = true ) // ExternalAlertmanager is responsible for dispatching alert notifications to an external Alertmanager service. @@ -106,8 +107,8 @@ func NewExternalAlertmanagerSender(l log.Logger, reg prometheus.Registerer, opts s.manager = NewManager( // Injecting a new registry here means these metrics are not exported. // Once we fix the individual Alertmanager metrics we should fix this scenario too. - &Options{QueueCapacity: defaultMaxQueueCapacity, Registerer: reg}, - s.logger, + &Options{QueueCapacity: defaultMaxQueueCapacity, Registerer: reg, DrainOnShutdown: defaultDrainOnShutdown}, + toSlogLogger(s.logger), ) sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.NewRegistry()) if err != nil {
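
Below is a minimal usage sketch of the notifier API surface as it looks after this change: NewManager now takes a *slog.Logger instead of go-kit's log.Logger, Options gains DrainOnShutdown (Grafana wires it to defaultDrainOnShutdown = true in sender.go above), Run blocks until Stop is called and the queue is drained, and Stop is idempotent. The import path, logger setup, and queue capacity here are illustrative assumptions, not the actual Grafana call sites.

package main

import (
	"log/slog"
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/grafana/pkg/services/ngalert/sender" // assumed import path for the package patched in this diff
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Construct the manager with the new *slog.Logger signature.
	mgr := sender.NewManager(&sender.Options{
		QueueCapacity:   10000,
		DrainOnShutdown: true, // drain any queued notifications before Run returns
		Registerer:      prometheus.NewRegistry(),
	}, logger)

	// Run consumes target-group updates and dispatches alerts until Stop is called.
	tsets := make(chan map[string][]*targetgroup.Group)
	go mgr.Run(tsets)

	// Queue an alert; external labels and relabel rules from Options are applied on Send.
	mgr.Send(&sender.Alert{Labels: labels.FromStrings("alertname", "example")})

	// Stop is safe to call multiple times; Run returns once the queue has been drained.
	mgr.Stop()
}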