The Prometheus monitoring system and time series database.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
prometheus/notifier/manager.go

326 lines
8.6 KiB

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
"context"
"fmt"
"log/slog"
"net/http"
"net/url"
"sync"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
)
const (
// DefaultMaxBatchSize is the default maximum number of alerts to send in a single request to the alertmanager.
DefaultMaxBatchSize = 256
contentTypeJSON = "application/json"
)
// String constants for instrumentation.
const (
namespace = "prometheus"
subsystem = "notifications"
alertmanagerLabel = "alertmanager"
)
var userAgent = version.PrometheusUserAgent()
// Manager is responsible for dispatching alert notifications to an
// alert manager service.
type Manager struct {
opts *Options
metrics *alertMetrics
mtx sync.RWMutex
stopOnce *sync.Once
stopRequested chan struct{}
alertmanagers map[string]*alertmanagerSet
logger *slog.Logger
}
// Options are the configurable parameters of a Handler.
type Options struct {
QueueCapacity int
DrainOnShutdown bool
ExternalLabels labels.Labels
RelabelConfigs []*relabel.Config
// Used for sending HTTP requests to the Alertmanager.
Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
Registerer prometheus.Registerer
// MaxBatchSize determines the maximum number of alerts to send in a single request to the alertmanager.
MaxBatchSize int
}
func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
if client == nil {
client = http.DefaultClient
}
return client.Do(req.WithContext(ctx))
}
// NewManager is the manager constructor.
func NewManager(o *Options, nameValidationScheme model.ValidationScheme, logger *slog.Logger) *Manager {
if o.Do == nil {
o.Do = do
}
// Set default MaxBatchSize if not provided.
if o.MaxBatchSize <= 0 {
o.MaxBatchSize = DefaultMaxBatchSize
}
if logger == nil {
logger = promslog.NewNopLogger()
}
for _, rc := range o.RelabelConfigs {
switch rc.NameValidationScheme {
case model.LegacyValidation, model.UTF8Validation:
default:
rc.NameValidationScheme = nameValidationScheme
}
}
n := &Manager{
stopRequested: make(chan struct{}),
stopOnce: &sync.Once{},
opts: o,
logger: logger,
}
alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) }
n.metrics = newAlertMetrics(o.Registerer, alertmanagersDiscoveredFunc)
n.metrics.queueCapacity.Set(float64(o.QueueCapacity))
return n
}
// ApplyConfig updates the status state as the new config requires.
func (n *Manager) ApplyConfig(conf *config.Config) error {
n.mtx.Lock()
defer n.mtx.Unlock()
n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
for i, rc := range n.opts.RelabelConfigs {
switch rc.NameValidationScheme {
case model.LegacyValidation, model.UTF8Validation:
default:
n.opts.RelabelConfigs[i].NameValidationScheme = conf.GlobalConfig.MetricNameValidationScheme
}
}
amSets := make(map[string]*alertmanagerSet)
// configToAlertmanagers maps alertmanager sets for each unique AlertmanagerConfig,
// helping to avoid dropping known alertmanagers and re-use them without waiting for SD updates when applying the config.
configToAlertmanagers := make(map[string]*alertmanagerSet, len(n.alertmanagers))
for _, oldAmSet := range n.alertmanagers {
hash, err := oldAmSet.configHash()
if err != nil {
return err
}
configToAlertmanagers[hash] = oldAmSet
}
for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() {
ams, err := newAlertmanagerSet(cfg, n.opts, n.logger, n.metrics)
if err != nil {
return err
}
hash, err := ams.configHash()
if err != nil {
return err
}
if oldAmSet, ok := configToAlertmanagers[hash]; ok {
ams.ams = oldAmSet.ams
ams.droppedAms = oldAmSet.droppedAms
// Only transfer sendLoops to the first new config with this hash.
// Subsequent configs with the same hash should not share the sendLoops
// map reference, as that would cause shared mutable state between
// alertmanagerSets (cleanup in one would affect the other).
oldAmSet.mtx.Lock()
if oldAmSet.sendLoops != nil {
ams.mtx.Lock()
ams.sendLoops = oldAmSet.sendLoops
oldAmSet.sendLoops = nil
ams.mtx.Unlock()
}
oldAmSet.mtx.Unlock()
}
amSets[k] = ams
}
// Clean up sendLoops that weren't transferred to new config.
// This happens when: (1) key was removed, or (2) key exists but hash changed.
// After the transfer loop above, any oldAmSet with non-nil sendLoops
// had its sendLoops NOT transferred (since we set it to nil on transfer).
for _, oldAmSet := range n.alertmanagers {
oldAmSet.mtx.Lock()
if oldAmSet.sendLoops != nil {
oldAmSet.cleanSendLoops(oldAmSet.ams...)
}
oldAmSet.mtx.Unlock()
}
n.alertmanagers = amSets
return nil
}
// Run dispatches notifications continuously, returning once Stop has been called and all
// pending notifications have been drained from the queue (if draining is enabled).
//
// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
n.targetUpdateLoop(tsets)
n.mtx.Lock()
defer n.mtx.Unlock()
for _, ams := range n.alertmanagers {
ams.mtx.Lock()
ams.cleanSendLoops(ams.ams...)
ams.mtx.Unlock()
}
}
// targetUpdateLoop receives updates of target groups and triggers a reload.
func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) {
for {
// If we've been asked to stop, that takes priority over processing any further target group updates.
select {
case <-n.stopRequested:
return
default:
select {
case <-n.stopRequested:
return
case ts, ok := <-tsets:
if !ok {
break
}
n.reload(ts)
}
}
}
}
func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
n.mtx.Lock()
defer n.mtx.Unlock()
for id, tgroup := range tgs {
am, ok := n.alertmanagers[id]
if !ok {
n.logger.Error("couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id))
continue
}
am.sync(tgroup)
}
}
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Manager) Send(alerts ...*Alert) {
// If we've been asked to stop, that takes priority over accepting new alerts.
select {
case <-n.stopRequested:
return
default:
}
n.mtx.RLock()
defer n.mtx.RUnlock()
alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts)
if len(alerts) == 0 {
return
}
for _, ams := range n.alertmanagers {
ams.send(alerts...)
}
}
// Alertmanagers returns a slice of Alertmanager URLs.
func (n *Manager) Alertmanagers() []*url.URL {
n.mtx.RLock()
amSets := n.alertmanagers
n.mtx.RUnlock()
var res []*url.URL
for _, ams := range amSets {
ams.mtx.RLock()
for _, am := range ams.ams {
res = append(res, am.url())
}
ams.mtx.RUnlock()
}
return res
}
// DroppedAlertmanagers returns a slice of Alertmanager URLs.
func (n *Manager) DroppedAlertmanagers() []*url.URL {
n.mtx.RLock()
amSets := n.alertmanagers
n.mtx.RUnlock()
var res []*url.URL
for _, ams := range amSets {
ams.mtx.RLock()
for _, dam := range ams.droppedAms {
res = append(res, dam.url())
}
ams.mtx.RUnlock()
}
return res
}
// Stop signals the notification manager to shut down and immediately returns.
//
// Run will return once the notification manager has successfully shut down.
//
// The manager will optionally drain send loops before shutting down.
//
// Stop is safe to call multiple times.
func (n *Manager) Stop() {
n.logger.Info("Stopping notification manager...")
n.stopOnce.Do(func() {
close(n.stopRequested)
})
}