The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/ngalert/remote/alertmanager.go

696 lines
22 KiB

package remote
import (
"context"
"crypto/md5"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/go-openapi/strfmt"
amalert "github.com/prometheus/alertmanager/api/v2/client/alert"
amalertgroup "github.com/prometheus/alertmanager/api/v2/client/alertgroup"
amgeneral "github.com/prometheus/alertmanager/api/v2/client/general"
amsilence "github.com/prometheus/alertmanager/api/v2/client/silence"
"github.com/prometheus/client_golang/prometheus"
alertingClusterPB "github.com/grafana/alerting/cluster/clusterpb"
alertingModels "github.com/grafana/alerting/models"
alertingNotify "github.com/grafana/alerting/notify"
"gopkg.in/yaml.v3"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
remoteClient "github.com/grafana/grafana/pkg/services/ngalert/remote/client"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
)
type stateStore interface {
GetSilences(ctx context.Context) (string, error)
GetNotificationLog(ctx context.Context) (string, error)
}
// AutogenFn is a function that adds auto-generated routes to a configuration.
type AutogenFn func(ctx context.Context, logger log.Logger, orgId int64, config *apimodels.PostableApiAlertingConfig, skipInvalid bool) error
// NoopAutogenFn is used to skip auto-generating routes.
func NoopAutogenFn(_ context.Context, _ log.Logger, _ int64, _ *apimodels.PostableApiAlertingConfig, _ bool) error {
return nil
}
// DecryptFn is a function that takes in an encrypted value and returns it decrypted.
type DecryptFn func(ctx context.Context, payload []byte) ([]byte, error)
type Alertmanager struct {
autogenFn AutogenFn
decrypt DecryptFn
defaultConfig string
defaultConfigHash string
log log.Logger
metrics *metrics.RemoteAlertmanager
orgID int64
ready bool
sender *sender.ExternalAlertmanager
smtpFrom string
state stateStore
tenantID string
url string
lastConfigSync time.Time
syncInterval time.Duration
amClient *remoteClient.Alertmanager
mimirClient remoteClient.MimirClient
}
type AlertmanagerConfig struct {
OrgID int64
URL string
TenantID string
BasicAuthPassword string
DefaultConfig string
// ExternalURL is used in notifications sent by the remote Alertmanager.
ExternalURL string
// PromoteConfig is a flag that determines whether the configuration should be used in the remote Alertmanager.
// The same flag is used for promoting state.
PromoteConfig bool
// SmtpFrom and StaticHeaders are used in email notifications sent by the remote Alertmanager.
SmtpFrom string
StaticHeaders map[string]string
// SyncInterval determines how often we should attempt to synchronize configuration.
SyncInterval time.Duration
}
func (cfg *AlertmanagerConfig) Validate() error {
if cfg.OrgID == 0 {
return fmt.Errorf("orgID for remote Alertmanager not set")
}
if cfg.TenantID == "" {
return fmt.Errorf("empty remote Alertmanager tenantID")
}
if cfg.URL == "" {
return fmt.Errorf("empty remote Alertmanager URL for tenant '%s'", cfg.TenantID)
}
return nil
}
func NewAlertmanager(ctx context.Context, cfg AlertmanagerConfig, store stateStore, decryptFn DecryptFn, autogenFn AutogenFn, metrics *metrics.RemoteAlertmanager, tracer tracing.Tracer) (*Alertmanager, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
u, err := url.Parse(cfg.URL)
if err != nil {
return nil, fmt.Errorf("unable to parse remote Alertmanager URL: %w", err)
}
logger := log.New("ngalert.remote.alertmanager")
mcCfg := &remoteClient.Config{
Logger: logger,
Password: cfg.BasicAuthPassword,
TenantID: cfg.TenantID,
URL: u,
PromoteConfig: cfg.PromoteConfig,
ExternalURL: cfg.ExternalURL,
SmtpFrom: cfg.SmtpFrom,
StaticHeaders: cfg.StaticHeaders,
}
mc, err := remoteClient.New(mcCfg, metrics, tracer)
if err != nil {
return nil, err
}
amcCfg := &remoteClient.AlertmanagerConfig{
URL: u,
TenantID: cfg.TenantID,
Password: cfg.BasicAuthPassword,
Logger: logger,
}
amc, err := remoteClient.NewAlertmanager(amcCfg, metrics, tracer)
if err != nil {
return nil, err
}
// Configure and start the components that sends alerts.
c := amc.GetAuthedClient()
doFunc := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
return c.Do(req.WithContext(ctx))
}
senderLogger := log.New("ngalert.sender.external-alertmanager")
s, err := sender.NewExternalAlertmanagerSender(
senderLogger,
prometheus.NewRegistry(),
sender.WithDoFunc(doFunc),
sender.WithUTF8Labels(),
)
if err != nil {
return nil, err
}
s.Run()
err = s.ApplyConfig(cfg.OrgID, 0, []sender.ExternalAMcfg{{URL: cfg.URL + "/alertmanager"}})
if err != nil {
return nil, err
}
// Parse the default configuration into a postable config.
pCfg, err := notifier.Load([]byte(cfg.DefaultConfig))
if err != nil {
return nil, err
}
if err := autogenFn(ctx, logger, cfg.OrgID, &pCfg.AlertmanagerConfig, true); err != nil {
return nil, err
}
rawCfg, err := json.Marshal(pCfg)
if err != nil {
return nil, err
}
// Initialize LastReadinessCheck so it's present even if the check fails.
metrics.LastReadinessCheck.Set(0)
return &Alertmanager{
amClient: amc,
autogenFn: autogenFn,
decrypt: decryptFn,
defaultConfig: string(rawCfg),
defaultConfigHash: fmt.Sprintf("%x", md5.Sum(rawCfg)),
log: logger,
metrics: metrics,
mimirClient: mc,
orgID: cfg.OrgID,
smtpFrom: cfg.SmtpFrom,
state: store,
sender: s,
syncInterval: cfg.SyncInterval,
tenantID: cfg.TenantID,
url: cfg.URL,
}, nil
}
// ApplyConfig is called by the multi-org Alertmanager on startup and on every sync loop iteration (1m default).
// We do two things on startup:
// 1. Execute a readiness check to make sure the remote Alertmanager we're about to communicate with is up and ready.
// 2. Upload the configuration and state we currently hold.
// On each subsequent call to ApplyConfig we compare and upload only the configuration.
func (am *Alertmanager) ApplyConfig(ctx context.Context, config *models.AlertConfiguration) error {
if am.ready {
am.log.Debug("Alertmanager previously marked as ready, skipping readiness check and state sync")
} else {
am.log.Debug("Start readiness check for remote Alertmanager", "url", am.url)
if err := am.checkReadiness(ctx); err != nil {
return fmt.Errorf("unable to pass the readiness check: %w", err)
}
am.log.Debug("Completed readiness check for remote Alertmanager, starting state upload", "url", am.url)
if err := am.SendState(ctx); err != nil {
return fmt.Errorf("unable to upload the state to the remote Alertmanager: %w", err)
}
am.log.Debug("Completed state upload to remote Alertmanager", "url", am.url)
}
if time.Since(am.lastConfigSync) < am.syncInterval {
am.log.Debug("Not syncing configuration to remote Alertmanager, last sync was too recent")
return nil
}
am.log.Debug("Start configuration upload to remote Alertmanager", "url", am.url)
if err := am.CompareAndSendConfiguration(ctx, config); err != nil {
return fmt.Errorf("unable to upload the configuration to the remote Alertmanager: %w", err)
}
am.log.Debug("Completed configuration upload to remote Alertmanager", "url", am.url)
return nil
}
func (am *Alertmanager) checkReadiness(ctx context.Context) error {
err := am.amClient.IsReadyWithBackoff(ctx)
if err != nil {
return err
}
am.log.Debug("Alertmanager readiness check successful")
am.metrics.LastReadinessCheck.SetToCurrentTime()
am.ready = true
return nil
}
// CompareAndSendConfiguration checks whether a given configuration is being used by the remote Alertmanager.
// If not, it sends the configuration to the remote Alertmanager.
func (am *Alertmanager) CompareAndSendConfiguration(ctx context.Context, config *models.AlertConfiguration) error {
c, err := notifier.Load([]byte(config.AlertmanagerConfiguration))
if err != nil {
return err
}
// Add auto-generated routes and decrypt before comparing.
if err := am.autogenFn(ctx, am.log, am.orgID, &c.AlertmanagerConfig, true); err != nil {
return err
}
rawDecrypted, configHash, err := am.decryptConfiguration(ctx, c)
if err != nil {
return err
}
// Send the configuration only if we need to.
if !am.shouldSendConfig(ctx, configHash) {
return nil
}
decrypted, err := notifier.Load(rawDecrypted)
if err != nil {
return err
}
return am.sendConfiguration(ctx, decrypted, config.ConfigurationHash, config.CreatedAt, am.isDefaultConfiguration(configHash))
}
func (am *Alertmanager) isDefaultConfiguration(configHash [16]byte) bool {
return fmt.Sprintf("%x", configHash) == am.defaultConfigHash
}
// decryptConfiguration creates a copy of the configuration, decrypts it, and returns the decrypted configuration alongside its hash.
// Should not be used outside of this package and the specific use case of decrypting the configuration before sending
// it to the remote Alertmanager.
func (am *Alertmanager) decryptConfiguration(ctx context.Context, cfg *apimodels.PostableUserConfig) ([]byte, [16]byte, error) {
fn := func(payload []byte) ([]byte, error) {
return am.decrypt(ctx, payload)
}
// Create a copy of the configuration to avoid modifying the original
cfgCopy := &apimodels.PostableUserConfig{}
rawCfg, err := json.Marshal(cfg)
if err != nil {
return nil, [16]byte{}, fmt.Errorf("unable to marshal original configuration: %w", err)
}
if err := json.Unmarshal(rawCfg, cfgCopy); err != nil {
return nil, [16]byte{}, fmt.Errorf("unable to unmarshal original configuration: %w", err)
}
// Iterate through receivers and decrypt secure settings on the copy
for _, rcv := range cfgCopy.AlertmanagerConfig.Receivers {
for _, gmr := range rcv.GrafanaManagedReceivers {
decrypted, err := gmr.DecryptSecureSettings(fn)
if err != nil {
return nil, [16]byte{}, fmt.Errorf("unable to decrypt settings on receiver %q (uid: %q): %w", gmr.Name, gmr.UID, err)
}
gmr.SecureSettings = decrypted
}
}
rawDecrypted, err := json.Marshal(cfgCopy)
if err != nil {
return nil, [16]byte{}, fmt.Errorf("unable to marshal decrypted configuration: %w", err)
}
return rawDecrypted, md5.Sum(rawDecrypted), nil
}
func (am *Alertmanager) sendConfiguration(ctx context.Context, decrypted *apimodels.PostableUserConfig, hash string, createdAt int64, isDefault bool) error {
am.metrics.ConfigSyncsTotal.Inc()
if err := am.mimirClient.CreateGrafanaAlertmanagerConfig(
ctx,
decrypted,
hash,
createdAt,
isDefault,
); err != nil {
am.metrics.ConfigSyncErrorsTotal.Inc()
return err
}
am.metrics.LastConfigSync.SetToCurrentTime()
am.lastConfigSync = time.Now()
return nil
}
// SendState gets the Alertmanager's internal state and sends it to the remote Alertmanager.
func (am *Alertmanager) SendState(ctx context.Context) error {
am.metrics.StateSyncsTotal.Inc()
state, err := am.getFullState(ctx)
if err != nil {
am.metrics.StateSyncErrorsTotal.Inc()
return err
}
if err := am.mimirClient.CreateGrafanaAlertmanagerState(ctx, state); err != nil {
am.metrics.StateSyncErrorsTotal.Inc()
return err
}
am.metrics.LastStateSync.SetToCurrentTime()
return nil
}
// SaveAndApplyConfig decrypts and sends a configuration to the remote Alertmanager.
func (am *Alertmanager) SaveAndApplyConfig(ctx context.Context, cfg *apimodels.PostableUserConfig) error {
// Get the hash for the encrypted configuration.
rawCfg, err := json.Marshal(cfg)
if err != nil {
return err
}
hash := fmt.Sprintf("%x", md5.Sum(rawCfg))
// Add auto-generated routes and decrypt before sending.
if err := am.autogenFn(ctx, am.log, am.orgID, &cfg.AlertmanagerConfig, false); err != nil {
return err
}
rawDecrypted, _, err := am.decryptConfiguration(ctx, cfg)
if err != nil {
return err
}
decrypted, err := notifier.Load(rawDecrypted)
if err != nil {
return err
}
return am.sendConfiguration(ctx, decrypted, hash, time.Now().Unix(), false)
}
// SaveAndApplyDefaultConfig sends the default Grafana Alertmanager configuration to the remote Alertmanager.
func (am *Alertmanager) SaveAndApplyDefaultConfig(ctx context.Context) error {
c, err := notifier.Load([]byte(am.defaultConfig))
if err != nil {
return fmt.Errorf("unable to parse the default configuration: %w", err)
}
// Add auto-generated routes and decrypt before sending.
if err := am.autogenFn(ctx, am.log, am.orgID, &c.AlertmanagerConfig, true); err != nil {
return err
}
rawDecrypted, _, err := am.decryptConfiguration(ctx, c)
if err != nil {
return err
}
decrypted, err := notifier.Load(rawDecrypted)
if err != nil {
return err
}
return am.sendConfiguration(
ctx,
decrypted,
am.defaultConfigHash,
time.Now().Unix(),
true,
)
}
func (am *Alertmanager) CreateSilence(ctx context.Context, silence *apimodels.PostableSilence) (string, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while creating silence", "err", r)
}
}()
params := amsilence.NewPostSilencesParamsWithContext(ctx).WithSilence(silence)
res, err := am.amClient.Silence.PostSilences(params)
if err != nil {
return "", err
}
return res.Payload.SilenceID, nil
}
func (am *Alertmanager) DeleteSilence(ctx context.Context, silenceID string) error {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while deleting silence", "err", r)
}
}()
params := amsilence.NewDeleteSilenceParamsWithContext(ctx).WithSilenceID(strfmt.UUID(silenceID))
_, err := am.amClient.Silence.DeleteSilence(params)
if err != nil {
return err
}
return nil
}
func (am *Alertmanager) GetSilence(ctx context.Context, silenceID string) (apimodels.GettableSilence, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while getting silence", "err", r)
}
}()
params := amsilence.NewGetSilenceParamsWithContext(ctx).WithSilenceID(strfmt.UUID(silenceID))
res, err := am.amClient.Silence.GetSilence(params)
if err != nil {
return apimodels.GettableSilence{}, err
}
return *res.Payload, nil
}
func (am *Alertmanager) ListSilences(ctx context.Context, filter []string) (apimodels.GettableSilences, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while listing silences", "err", r)
}
}()
params := amsilence.NewGetSilencesParamsWithContext(ctx).WithFilter(filter)
res, err := am.amClient.Silence.GetSilences(params)
if err != nil {
return apimodels.GettableSilences{}, err
}
return res.Payload, nil
}
func (am *Alertmanager) GetAlerts(ctx context.Context, active, silenced, inhibited bool, filter []string, receiver string) (apimodels.GettableAlerts, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while getting alerts", "err", r)
}
}()
params := amalert.NewGetAlertsParamsWithContext(ctx).
WithActive(&active).
WithSilenced(&silenced).
WithInhibited(&inhibited).
WithFilter(filter).
WithReceiver(&receiver)
res, err := am.amClient.Alert.GetAlerts(params)
if err != nil {
return apimodels.GettableAlerts{}, err
}
return res.Payload, nil
}
func (am *Alertmanager) GetAlertGroups(ctx context.Context, active, silenced, inhibited bool, filter []string, receiver string) (apimodels.AlertGroups, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while getting alert groups", "err", r)
}
}()
params := amalertgroup.NewGetAlertGroupsParamsWithContext(ctx).
WithActive(&active).
WithSilenced(&silenced).
WithInhibited(&inhibited).
WithFilter(filter).
WithReceiver(&receiver)
res, err := am.amClient.Alertgroup.GetAlertGroups(params)
if err != nil {
return apimodels.AlertGroups{}, err
}
return res.Payload, nil
}
func (am *Alertmanager) PutAlerts(ctx context.Context, alerts apimodels.PostableAlerts) error {
for _, a := range alerts.PostableAlerts {
for k, v := range a.Labels {
// The Grafana Alertmanager skips empty and namespace UID labels.
// To get the same alert fingerprint we need to remove these labels too.
// https://github.com/grafana/alerting/blob/2dda1c67ec02625ac9fc8607157b3d5825d47919/notify/grafana_alertmanager.go#L722-L724
if len(v) == 0 || k == alertingModels.NamespaceUIDLabel {
delete(a.Labels, k)
}
}
}
am.log.Debug("Sending alerts to a remote alertmanager", "url", am.url, "alerts", len(alerts.PostableAlerts))
am.sender.SendAlerts(alerts)
return nil
}
// GetStatus retrieves the remote Alertmanager configuration.
func (am *Alertmanager) GetStatus(ctx context.Context) (apimodels.GettableStatus, error) {
defer func() {
if r := recover(); r != nil {
am.log.Error("Panic while getting status", "err", r)
}
}()
params := amgeneral.NewGetStatusParamsWithContext(ctx)
res, err := am.amClient.General.GetStatus(params)
if err != nil {
return apimodels.GettableStatus{}, err
}
var cfg apimodels.PostableApiAlertingConfig
if err := yaml.Unmarshal([]byte(*res.Payload.Config.Original), &cfg); err != nil {
return apimodels.GettableStatus{}, err
}
return *apimodels.NewGettableStatus(&cfg), nil
}
func (am *Alertmanager) GetReceivers(ctx context.Context) ([]apimodels.Receiver, error) {
return am.mimirClient.GetReceivers(ctx)
}
func (am *Alertmanager) TestReceivers(ctx context.Context, c apimodels.TestReceiversConfigBodyParams) (*alertingNotify.TestReceiversResult, int, error) {
fn := func(payload []byte) ([]byte, error) {
return am.decrypt(ctx, payload)
}
receivers := make([]*alertingNotify.APIReceiver, 0, len(c.Receivers))
for _, r := range c.Receivers {
integrations := make([]*alertingNotify.GrafanaIntegrationConfig, 0, len(r.GrafanaManagedReceivers))
for _, gr := range r.GrafanaManagedReceivers {
decrypted, err := gr.DecryptSecureSettings(fn)
if err != nil {
return nil, 0, err
}
integrations = append(integrations, &alertingNotify.GrafanaIntegrationConfig{
UID: gr.UID,
Name: gr.Name,
Type: gr.Type,
DisableResolveMessage: gr.DisableResolveMessage,
Settings: json.RawMessage(gr.Settings),
SecureSettings: decrypted,
})
}
receivers = append(receivers, &alertingNotify.APIReceiver{
ConfigReceiver: r.Receiver,
GrafanaIntegrations: alertingNotify.GrafanaIntegrations{
Integrations: integrations,
},
})
}
var alert *alertingNotify.TestReceiversConfigAlertParams
if c.Alert != nil {
alert = &alertingNotify.TestReceiversConfigAlertParams{Annotations: c.Alert.Annotations, Labels: c.Alert.Labels}
}
return am.mimirClient.TestReceivers(ctx, alertingNotify.TestReceiversConfigBodyParams{
Alert: alert,
Receivers: receivers,
})
}
func (am *Alertmanager) TestTemplate(ctx context.Context, c apimodels.TestTemplatesConfigBodyParams) (*notifier.TestTemplatesResults, error) {
for _, alert := range c.Alerts {
notifier.AddDefaultLabelsAndAnnotations(alert)
}
return am.mimirClient.TestTemplate(ctx, alertingNotify.TestTemplatesConfigBodyParams{
Alerts: c.Alerts,
Template: c.Template,
Name: c.Name,
})
}
// StopAndWait is called when the grafana server is instructed to shut down or an org is deleted.
// In the context of a "remote Alertmanager" it is a good heuristic for Grafana is about to shut down or we no longer need you.
func (am *Alertmanager) StopAndWait() {
am.sender.Stop()
}
func (am *Alertmanager) Ready() bool {
return am.ready
}
// SilenceState returns the Alertmanager's silence state as a SilenceState. Currently, does not retrieve the state
// remotely and instead uses the value from the state store.
func (am *Alertmanager) SilenceState(ctx context.Context) (alertingNotify.SilenceState, error) {
silences, err := am.state.GetSilences(ctx)
if err != nil {
return nil, fmt.Errorf("error getting silences: %w", err)
}
return alertingNotify.DecodeState(strings.NewReader(silences))
}
// getFullState returns a base64-encoded protobuf message representing the Alertmanager's internal state.
func (am *Alertmanager) getFullState(ctx context.Context) (string, error) {
var parts []alertingClusterPB.Part
state, err := am.SilenceState(ctx)
if err != nil {
return "", fmt.Errorf("error getting silences: %w", err)
}
b, err := state.MarshalBinary()
if err != nil {
return "", fmt.Errorf("error marshalling silences: %w", err)
}
parts = append(parts, alertingClusterPB.Part{Key: notifier.SilencesFilename, Data: b})
notificationLog, err := am.state.GetNotificationLog(ctx)
if err != nil {
return "", fmt.Errorf("error getting notification log: %w", err)
}
parts = append(parts, alertingClusterPB.Part{Key: notifier.NotificationLogFilename, Data: []byte(notificationLog)})
fs := alertingClusterPB.FullState{
Parts: parts,
}
b, err = fs.Marshal()
if err != nil {
return "", fmt.Errorf("error marshaling full state: %w", err)
}
return base64.StdEncoding.EncodeToString(b), nil
}
// shouldSendConfig compares the remote Alertmanager configuration with our local one.
// It returns true if the configurations are different.
func (am *Alertmanager) shouldSendConfig(ctx context.Context, hash [16]byte) bool {
rc, err := am.mimirClient.GetGrafanaAlertmanagerConfig(ctx)
if err != nil {
// Log the error and return true so we try to upload our config anyway.
am.log.Warn("Unable to get the remote Alertmanager configuration for comparison, sending the configuration without comparing", "err", err)
return true
}
if rc.Promoted != am.mimirClient.ShouldPromoteConfig() {
return true
}
if rc.SmtpFrom != am.smtpFrom {
am.log.Debug("SMTP 'from' address is different, sending the configuration to the remote Alertmanager", "remote", rc.SmtpFrom, "local", am.smtpFrom)
return true
}
rawRemote, err := json.Marshal(rc.GrafanaAlertmanagerConfig)
if err != nil {
am.log.Error("Unable to marshal the remote Alertmanager configuration for comparison", "err", err)
return true
}
return md5.Sum(rawRemote) != hash
}