Ruler: enable Loki to configure Alertmanagers per tenant (#7359)

Mohamed-Amine Bouqsimi authored 3 years ago (committed by GitHub)
parent 0f5dd2bceb
commit 7debe7c235
  1. docs/sources/configuration/_index.md (53 lines changed)
  2. pkg/ruler/base/compat.go (2 lines changed)
  3. pkg/ruler/base/manager.go (40 lines changed)
  4. pkg/ruler/base/manager_test.go (2 lines changed)
  5. pkg/ruler/base/notifier.go (106 lines changed)
  6. pkg/ruler/base/notifier_test.go (97 lines changed)
  7. pkg/ruler/base/ruler.go (20 lines changed)
  8. pkg/ruler/base/ruler_test.go (96 lines changed)
  9. pkg/ruler/config/alertmanager.go (42 lines changed)
  10. pkg/ruler/ruler.go (1 line changed)
  11. pkg/util/validation/limits.go (16 lines changed)
  12. pkg/validation/limits.go (13 lines changed)
  13. production/docker/config/alertmanager.yml (8 lines changed)
  14. production/docker/config/loki.yaml (2 lines changed)
  15. production/docker/docker-compose.yaml (59 lines changed)

@ -2308,6 +2308,9 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -ruler.max-rule-groups-per-tenant
[ruler_max_rule_groups_per_tenant: <int> | default = 0]
# Ruler alertmanager configuration per tenant.
[ruler_alertmanager_config: <alertmanager_config>]
# Retention to apply for the store, if retention is enabled on the compactor side.
# CLI flag: -store.retention
[retention_period: <duration> | default = 744h]
@ -2505,6 +2508,56 @@ sign every remote write request.
[role_arn: <string>]
```
## alertmanager_config
The `alertmanager_config` block configures how the ruler connects to the Alertmanager(s) that receive its alerts.
```yaml
# Comma-separated list of Alertmanager URLs to send notifications to.
# Each Alertmanager URL is treated as a separate group in the configuration.
# Multiple Alertmanagers in HA per group can be supported by using DNS
# resolution via -ruler.alertmanager-discovery.
[alertmanager_url: <string> | default = ""]

alertmanager_client:
  # Sets the `Authorization` header on every request sent to the Alertmanager
  # with the configured username and password.
  # password and password_file are mutually exclusive.
  [basic_auth_username: <string>]
  [basic_auth_password: <secret>]

  # Optional `Authorization` header configuration.
  authorization:
    # Sets the authentication type.
    [type: <string> | default = Bearer]

    # Sets the credentials. It is mutually exclusive with
    # `credentials_file`.
    [credentials: <secret>]

    # Sets the credentials to the contents of the configured file.
    # It is mutually exclusive with `credentials`.
    [credentials_file: <filename>]

# Use DNS SRV records to discover Alertmanager hosts.
[enable_alertmanager_discovery: <boolean> | default = false]

# How long to wait between refreshing DNS resolutions of Alertmanager hosts.
[alertmanager_refresh_interval: <duration> | default = 1m]

# If enabled, requests to the Alertmanager use the v2 API.
[enable_alertmanager_v2: <boolean> | default = false]

# List of alert relabel configs.
alert_relabel_configs:
  [- <relabel_config> ...]

# Capacity of the queue for notifications to be sent to the Alertmanager.
[notification_queue_capacity: <int> | default = 10000]

# HTTP timeout duration when sending notifications to the Alertmanager.
[notification_timeout: <duration> | default = 10s]
```
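For illustration, a tenant-specific override for this block might look like the sketch below. It assumes the limit is applied through Loki's per-tenant runtime overrides file; tenant names, URLs, and credentials are placeholders, and the field names follow the `alertmanager_config` block above:
```yaml
# Hypothetical runtime overrides file, e.g. loaded via -runtime-config.file.
overrides:
  tenant1:
    ruler_alertmanager_config:
      alertmanager_url: http://alertmanager-tenant1.example.svc:9093
      enable_alertmanager_v2: true
      alertmanager_client:
        basic_auth_username: tenant1
        basic_auth_password: supersecret
  tenant2:
    ruler_alertmanager_config:
      alertmanager_url: http://alertmanager-tenant2.example.svc:9093
      notification_timeout: 30s
```
Tenants without a `ruler_alertmanager_config` override keep using the ruler's global Alertmanager configuration.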
## remote_write_client_config
The `remote_write_client_config` block configures the client the ruler uses for its remote-write feature.

@ -21,6 +21,7 @@ import (
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/ruler/config"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -134,6 +135,7 @@ type RulesLimits interface {
RulerTenantShardSize(userID string) int
RulerMaxRuleGroupsPerTenant(userID string) int
RulerMaxRulesPerRuleGroup(userID string) int
RulerAlertManagerConfig(userID string) *config.AlertManagerConfig
}
// EngineQueryFunc returns a new query function using the rules.EngineQueryFunc function

@ -24,8 +24,9 @@ import (
type DefaultMultiTenantManager struct {
cfg Config
notifierCfg *config.Config
notifiersCfg map[string]*config.Config
managerFactory ManagerFactory
limits RulesLimits
mapper *mapper
@ -47,12 +48,7 @@ type DefaultMultiTenantManager struct {
logger log.Logger
}
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
ncfg, err := buildNotifierConfig(&cfg)
if err != nil {
return nil, err
}
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger, limits RulesLimits) (*DefaultMultiTenantManager, error) {
userManagerMetrics := NewManagerMetrics(cfg.DisableRuleGroupLabel)
if reg != nil {
reg.MustRegister(userManagerMetrics)
@ -60,8 +56,9 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg
return &DefaultMultiTenantManager{
cfg: cfg,
notifierCfg: ncfg,
notifiersCfg: map[string]*config.Config{},
managerFactory: managerFactory,
limits: limits,
notifiers: map[string]*rulerNotifier{},
mapper: newMapper(cfg.RulePath, logger),
userManagers: map[string]RulesManager{},
@ -185,6 +182,31 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string) (*notifie
return n.notifier, nil
}
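// Resolve this tenant's notifier config lazily: start from the global Alertmanager
// config, apply any per-tenant overrides from the limits, and cache the result.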
nCfg, ok := r.notifiersCfg[userID]
if !ok {
amCfg := r.cfg.AlertManagerConfig
amOverrides := r.limits.RulerAlertManagerConfig(userID)
var err error
if amOverrides != nil {
tenantAmCfg, err := getAlertmanagerTenantConfig(r.cfg.AlertManagerConfig, *amOverrides)
if err != nil {
return nil, fmt.Errorf("failed to get alertmaanger config for tenant %s: %w", userID, err)
}
amCfg = tenantAmCfg
}
nCfg, err = buildNotifierConfig(&amCfg, r.cfg.ExternalLabels)
if err != nil {
return nil, fmt.Errorf("failed to build notifier config for tenant %s: %w", userID, err)
}
if nCfg != nil {
r.notifiersCfg[userID] = nCfg
}
}
reg := prometheus.WrapRegistererWith(prometheus.Labels{"user": userID}, r.registry)
reg = prometheus.WrapRegistererWithPrefix("cortex_", reg)
n = newRulerNotifier(&notifier.Options{
@ -210,7 +232,7 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string) (*notifie
n.run()
// This should never fail, unless there's a programming mistake.
if err := n.applyConfig(r.notifierCfg); err != nil {
if err := n.applyConfig(nCfg); err != nil {
return nil, err
}

@ -20,7 +20,7 @@ import (
func TestSyncRuleGroups(t *testing.T) {
dir := t.TempDir()
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, log.NewNopLogger())
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, log.NewNopLogger(), ruleLimits{})
require.NoError(t, err)
const user = "testUser"

@ -2,7 +2,6 @@ package base
import (
"context"
"flag"
"fmt"
"net/url"
"regexp"
@ -11,29 +10,18 @@ import (
gklog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/crypto/tls"
"github.com/imdario/mergo"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/dns"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/notifier"
"github.com/grafana/loki/pkg/util"
ruler_config "github.com/grafana/loki/pkg/ruler/config"
)
type NotifierConfig struct {
TLS tls.ClientConfig `yaml:",inline"`
BasicAuth util.BasicAuth `yaml:",inline"`
HeaderAuth util.HeaderAuth `yaml:",inline"`
}
func (cfg *NotifierConfig) RegisterFlags(f *flag.FlagSet) {
cfg.TLS.RegisterFlagsWithPrefix("ruler.alertmanager-client", f)
cfg.BasicAuth.RegisterFlagsWithPrefix("ruler.alertmanager-client.", f)
cfg.HeaderAuth.RegisterFlagsWithPrefix("ruler.alertmanager-client.", f)
}
// rulerNotifier bundles a notifier.Manager together with an associated
// Alertmanager service discovery manager and handles the lifecycle
// of both actors.
@ -88,10 +76,46 @@ func (rn *rulerNotifier) stop() {
rn.wg.Wait()
}
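// getAlertmanagerTenantConfig overlays per-tenant Alertmanager overrides on top of the
// global config: scalar fields are replaced only when the override is set to a non-zero
// value, and the alertmanager_client settings are merged field by field (overrides win).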
func getAlertmanagerTenantConfig(amConfig ruler_config.AlertManagerConfig, amOverrides ruler_config.AlertManagerConfig) (ruler_config.AlertManagerConfig, error) {
if amOverrides.AlertmanagerURL != "" {
amConfig.AlertmanagerURL = amOverrides.AlertmanagerURL
}
if len(amOverrides.AlertRelabelConfigs) > 0 {
amConfig.AlertRelabelConfigs = amOverrides.AlertRelabelConfigs
}
if amOverrides.AlertmanagerDiscovery {
amConfig.AlertmanagerDiscovery = amOverrides.AlertmanagerDiscovery
}
if amOverrides.AlertmanangerEnableV2API {
amConfig.AlertmanangerEnableV2API = amOverrides.AlertmanangerEnableV2API
}
if amOverrides.AlertmanagerRefreshInterval > 0 {
amConfig.AlertmanagerRefreshInterval = amOverrides.AlertmanagerRefreshInterval
}
if amOverrides.NotificationQueueCapacity > 0 {
amConfig.NotificationQueueCapacity = amOverrides.NotificationQueueCapacity
}
if amOverrides.NotificationTimeout > 0 {
amConfig.NotificationTimeout = amOverrides.NotificationTimeout
}
if err := mergo.Merge(&amConfig.Notifier, amOverrides.Notifier, mergo.WithOverride); err != nil {
return amConfig, fmt.Errorf("failed to apply alertmanager notifier limits config: %w", err)
}
return amConfig, nil
}
// Builds a Prometheus config.Config from the ruler's Alertmanager configuration with just
// the required options to configure notifications to Alertmanager.
func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
amURLs := strings.Split(rulerConfig.AlertmanagerURL, ",")
func buildNotifierConfig(amConfig *ruler_config.AlertManagerConfig, externalLabels labels.Labels) (*config.Config, error) {
amURLs := strings.Split(amConfig.AlertmanagerURL, ",")
validURLs := make([]*url.URL, 0, len(amURLs))
srvDNSregexp := regexp.MustCompile(`^_.+._.+`)
@ -108,7 +132,7 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
// Given we only support SRV lookups as part of service discovery, we need to ensure
// hosts provided follow this specification: _service._proto.name
// e.g. _http._tcp.alertmanager.com
if rulerConfig.AlertmanagerDiscovery && !srvDNSregexp.MatchString(url.Host) {
if amConfig.AlertmanagerDiscovery && !srvDNSregexp.MatchString(url.Host) {
return nil, fmt.Errorf("when alertmanager-discovery is on, host name must be of the form _portname._tcp.service.fqdn (is %q)", url.Host)
}
@ -120,21 +144,21 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
}
apiVersion := config.AlertmanagerAPIVersionV1
if rulerConfig.AlertmanangerEnableV2API {
if amConfig.AlertmanangerEnableV2API {
apiVersion = config.AlertmanagerAPIVersionV2
}
amConfigs := make([]*config.AlertmanagerConfig, 0, len(validURLs))
for _, url := range validURLs {
amConfigs = append(amConfigs, amConfigFromURL(rulerConfig, url, apiVersion))
amConfigs = append(amConfigs, amConfigFromURL(amConfig, url, apiVersion))
}
promConfig := &config.Config{
GlobalConfig: config.GlobalConfig{
ExternalLabels: rulerConfig.ExternalLabels,
ExternalLabels: externalLabels,
},
AlertingConfig: config.AlertingConfig{
AlertRelabelConfigs: rulerConfig.AlertRelabelConfigs,
AlertRelabelConfigs: amConfig.AlertRelabelConfigs,
AlertmanagerConfigs: amConfigs,
},
}
@ -142,13 +166,13 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
return promConfig, nil
}
func amConfigFromURL(rulerConfig *Config, url *url.URL, apiVersion config.AlertmanagerAPIVersion) *config.AlertmanagerConfig {
func amConfigFromURL(cfg *ruler_config.AlertManagerConfig, url *url.URL, apiVersion config.AlertmanagerAPIVersion) *config.AlertmanagerConfig {
var sdConfig discovery.Configs
if rulerConfig.AlertmanagerDiscovery {
if cfg.AlertmanagerDiscovery {
sdConfig = discovery.Configs{
&dns.SDConfig{
Names: []string{url.Host},
RefreshInterval: model.Duration(rulerConfig.AlertmanagerRefreshInterval),
RefreshInterval: model.Duration(cfg.AlertmanagerRefreshInterval),
Type: "SRV",
Port: 0, // Ignored, because of SRV.
},
@ -168,15 +192,15 @@ func amConfigFromURL(rulerConfig *Config, url *url.URL, apiVersion config.Alertm
APIVersion: apiVersion,
Scheme: url.Scheme,
PathPrefix: url.Path,
Timeout: model.Duration(rulerConfig.NotificationTimeout),
Timeout: model.Duration(cfg.NotificationTimeout),
ServiceDiscoveryConfigs: sdConfig,
HTTPClientConfig: config_util.HTTPClientConfig{
TLSConfig: config_util.TLSConfig{
CAFile: rulerConfig.Notifier.TLS.CAPath,
CertFile: rulerConfig.Notifier.TLS.CertPath,
KeyFile: rulerConfig.Notifier.TLS.KeyPath,
InsecureSkipVerify: rulerConfig.Notifier.TLS.InsecureSkipVerify,
ServerName: rulerConfig.Notifier.TLS.ServerName,
CAFile: cfg.Notifier.TLS.CAPath,
CertFile: cfg.Notifier.TLS.CertPath,
KeyFile: cfg.Notifier.TLS.KeyPath,
InsecureSkipVerify: cfg.Notifier.TLS.InsecureSkipVerify,
ServerName: cfg.Notifier.TLS.ServerName,
},
},
}
@ -193,23 +217,23 @@ func amConfigFromURL(rulerConfig *Config, url *url.URL, apiVersion config.Alertm
}
// Override URL basic authentication configs with hard coded config values if present
if rulerConfig.Notifier.BasicAuth.IsEnabled() {
if cfg.Notifier.BasicAuth.IsEnabled() {
amConfig.HTTPClientConfig.BasicAuth = &config_util.BasicAuth{
Username: rulerConfig.Notifier.BasicAuth.Username,
Password: config_util.Secret(rulerConfig.Notifier.BasicAuth.Password),
Username: cfg.Notifier.BasicAuth.Username,
Password: config_util.Secret(cfg.Notifier.BasicAuth.Password),
}
}
if rulerConfig.Notifier.HeaderAuth.IsEnabled() {
if rulerConfig.Notifier.HeaderAuth.Credentials != "" {
if cfg.Notifier.HeaderAuth.IsEnabled() {
if cfg.Notifier.HeaderAuth.Credentials != "" {
amConfig.HTTPClientConfig.Authorization = &config_util.Authorization{
Type: rulerConfig.Notifier.HeaderAuth.Type,
Credentials: config_util.Secret(rulerConfig.Notifier.HeaderAuth.Credentials),
Type: cfg.Notifier.HeaderAuth.Type,
Credentials: config_util.Secret(cfg.Notifier.HeaderAuth.Credentials),
}
} else if rulerConfig.Notifier.HeaderAuth.CredentialsFile != "" {
} else if cfg.Notifier.HeaderAuth.CredentialsFile != "" {
amConfig.HTTPClientConfig.Authorization = &config_util.Authorization{
Type: rulerConfig.Notifier.HeaderAuth.Type,
CredentialsFile: rulerConfig.Notifier.HeaderAuth.CredentialsFile,
Type: cfg.Notifier.HeaderAuth.Type,
CredentialsFile: cfg.Notifier.HeaderAuth.CredentialsFile,
}
}

@ -14,6 +14,7 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
ruler_config "github.com/grafana/loki/pkg/ruler/config"
"github.com/grafana/loki/pkg/util"
)
@ -32,7 +33,9 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with a single URL and no service discovery",
cfg: &Config{
AlertmanagerURL: "http://alertmanager.default.svc.cluster.local/alertmanager",
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://alertmanager.default.svc.cluster.local/alertmanager",
},
},
ncfg: &config.Config{
AlertingConfig: config.AlertingConfig{
@ -56,9 +59,11 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with a single URL and service discovery",
cfg: &Config{
AlertmanagerURL: "http://_http._tcp.alertmanager.default.svc.cluster.local/alertmanager",
AlertmanagerDiscovery: true,
AlertmanagerRefreshInterval: time.Duration(60),
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://_http._tcp.alertmanager.default.svc.cluster.local/alertmanager",
AlertmanagerDiscovery: true,
AlertmanagerRefreshInterval: time.Duration(60),
},
},
ncfg: &config.Config{
AlertingConfig: config.AlertingConfig{
@ -83,15 +88,19 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with service discovery and an invalid URL",
cfg: &Config{
AlertmanagerURL: "http://_http.default.svc.cluster.local/alertmanager",
AlertmanagerDiscovery: true,
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://_http.default.svc.cluster.local/alertmanager",
AlertmanagerDiscovery: true,
},
},
err: fmt.Errorf("when alertmanager-discovery is on, host name must be of the form _portname._tcp.service.fqdn (is \"alertmanager.default.svc.cluster.local\")"),
},
{
name: "with multiple URLs and no service discovery",
cfg: &Config{
AlertmanagerURL: "http://alertmanager-0.default.svc.cluster.local/alertmanager,http://alertmanager-1.default.svc.cluster.local/alertmanager",
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://alertmanager-0.default.svc.cluster.local/alertmanager,http://alertmanager-1.default.svc.cluster.local/alertmanager",
},
},
ncfg: &config.Config{
AlertingConfig: config.AlertingConfig{
@ -125,9 +134,11 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with multiple URLs and service discovery",
cfg: &Config{
AlertmanagerURL: "http://_http._tcp.alertmanager-0.default.svc.cluster.local/alertmanager,http://_http._tcp.alertmanager-1.default.svc.cluster.local/alertmanager",
AlertmanagerDiscovery: true,
AlertmanagerRefreshInterval: time.Duration(60),
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://_http._tcp.alertmanager-0.default.svc.cluster.local/alertmanager,http://_http._tcp.alertmanager-1.default.svc.cluster.local/alertmanager",
AlertmanagerDiscovery: true,
AlertmanagerRefreshInterval: time.Duration(60),
},
},
ncfg: &config.Config{
AlertingConfig: config.AlertingConfig{
@ -165,7 +176,9 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with Basic Authentication URL",
cfg: &Config{
AlertmanagerURL: "http://marco:hunter2@alertmanager-0.default.svc.cluster.local/alertmanager",
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://marco:hunter2@alertmanager-0.default.svc.cluster.local/alertmanager",
},
},
ncfg: &config.Config{
AlertingConfig: config.AlertingConfig{
@ -192,11 +205,13 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with Basic Authentication URL and Explicit",
cfg: &Config{
AlertmanagerURL: "http://marco:hunter2@alertmanager-0.default.svc.cluster.local/alertmanager",
Notifier: NotifierConfig{
BasicAuth: util.BasicAuth{
Username: "jacob",
Password: "test",
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://marco:hunter2@alertmanager-0.default.svc.cluster.local/alertmanager",
Notifier: ruler_config.NotifierConfig{
BasicAuth: util.BasicAuth{
Username: "jacob",
Password: "test",
},
},
},
},
@ -225,11 +240,13 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with Header Authorization",
cfg: &Config{
AlertmanagerURL: "http://alertmanager-0.default.svc.cluster.local/alertmanager",
Notifier: NotifierConfig{
HeaderAuth: util.HeaderAuth{
Type: "Bearer",
Credentials: "jacob",
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://alertmanager-0.default.svc.cluster.local/alertmanager",
Notifier: ruler_config.NotifierConfig{
HeaderAuth: util.HeaderAuth{
Type: "Bearer",
Credentials: "jacob",
},
},
},
},
@ -261,11 +278,13 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with Header Authorization and credentials file",
cfg: &Config{
AlertmanagerURL: "http://alertmanager-0.default.svc.cluster.local/alertmanager",
Notifier: NotifierConfig{
HeaderAuth: util.HeaderAuth{
Type: "Bearer",
CredentialsFile: "/path/to/secret/file",
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://alertmanager-0.default.svc.cluster.local/alertmanager",
Notifier: ruler_config.NotifierConfig{
HeaderAuth: util.HeaderAuth{
Type: "Bearer",
CredentialsFile: "/path/to/secret/file",
},
},
},
},
@ -297,7 +316,9 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with external labels",
cfg: &Config{
AlertmanagerURL: "http://alertmanager.default.svc.cluster.local/alertmanager",
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://alertmanager.default.svc.cluster.local/alertmanager",
},
ExternalLabels: []labels.Label{
{Name: "region", Value: "us-east-1"},
},
@ -329,18 +350,20 @@ func TestBuildNotifierConfig(t *testing.T) {
{
name: "with alert relabel config",
cfg: &Config{
AlertmanagerURL: "http://alertmanager.default.svc.cluster.local/alertmanager",
AlertManagerConfig: ruler_config.AlertManagerConfig{
AlertmanagerURL: "http://alertmanager.default.svc.cluster.local/alertmanager",
AlertRelabelConfigs: []*relabel.Config{
{
SourceLabels: model.LabelNames{"severity"},
Regex: relabel.MustNewRegexp("high"),
TargetLabel: "priority",
Replacement: "p1",
},
},
},
ExternalLabels: []labels.Label{
{Name: "region", Value: "us-east-1"},
},
AlertRelabelConfigs: []*relabel.Config{
{
SourceLabels: model.LabelNames{"severity"},
Regex: relabel.MustNewRegexp("high"),
TargetLabel: "priority",
Replacement: "p1",
},
},
},
ncfg: &config.Config{
AlertingConfig: config.AlertingConfig{
@ -378,7 +401,7 @@ func TestBuildNotifierConfig(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ncfg, err := buildNotifierConfig(tt.cfg)
ncfg, err := buildNotifierConfig(&tt.cfg.AlertManagerConfig, tt.cfg.ExternalLabels)
if tt.err == nil {
require.NoError(t, err)
require.Equal(t, tt.ncfg, ncfg)

@ -24,7 +24,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/notifier"
promRules "github.com/prometheus/prometheus/rules"
@ -35,6 +34,7 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/ruler/config"
"github.com/grafana/loki/pkg/ruler/rulespb"
"github.com/grafana/loki/pkg/ruler/rulestore"
"github.com/grafana/loki/pkg/util"
@ -87,22 +87,8 @@ type Config struct {
// Path to store rule files for prom manager.
RulePath string `yaml:"rule_path"`
// URL of the Alertmanager to send notifications to.
AlertmanagerURL string `yaml:"alertmanager_url"`
// Whether to use DNS SRV records to discover Alertmanager.
AlertmanagerDiscovery bool `yaml:"enable_alertmanager_discovery"`
// How long to wait between refreshing the list of Alertmanager based on DNS service discovery.
AlertmanagerRefreshInterval time.Duration `yaml:"alertmanager_refresh_interval"`
// Enables the ruler notifier to use the Alertmananger V2 API.
AlertmanangerEnableV2API bool `yaml:"enable_alertmanager_v2"`
// Configuration for alert relabeling.
AlertRelabelConfigs []*relabel.Config `yaml:"alert_relabel_configs,omitempty"`
// Capacity of the queue for notifications to be sent to the Alertmanager.
NotificationQueueCapacity int `yaml:"notification_queue_capacity"`
// HTTP timeout duration when sending notifications to the Alertmanager.
NotificationTimeout time.Duration `yaml:"notification_timeout"`
// Client configs for interacting with the Alertmanager
Notifier NotifierConfig `yaml:"alertmanager_client"`
// Global alertmanager config.
config.AlertManagerConfig `yaml:",inline"`
// Max time to tolerate outage for restoring "for" state of alert.
OutageTolerance time.Duration `yaml:"for_outage_tolerance"`

@ -44,6 +44,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/series"
"github.com/grafana/loki/pkg/ruler/config"
"github.com/grafana/loki/pkg/ruler/rulespb"
"github.com/grafana/loki/pkg/ruler/rulestore"
"github.com/grafana/loki/pkg/ruler/rulestore/objectclient"
@ -83,6 +84,7 @@ type ruleLimits struct {
tenantShard int
maxRulesPerRuleGroup int
maxRuleGroups int
alertManagerConfig map[string]*config.AlertManagerConfig
}
func (r ruleLimits) EvaluationDelay(_ string) time.Duration {
@ -101,6 +103,10 @@ func (r ruleLimits) RulerMaxRulesPerRuleGroup(_ string) int {
return r.maxRulesPerRuleGroup
}
func (r ruleLimits) RulerAlertManagerConfig(tenantID string) *config.AlertManagerConfig {
return r.alertManagerConfig[tenantID]
}
func testQueryableFunc(q storage.Querier) storage.QueryableFunc {
if q != nil {
return func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
@ -139,7 +145,19 @@ func testSetup(t *testing.T, q storage.Querier) (*promql.Engine, storage.Queryab
func newManager(t *testing.T, cfg Config, q storage.Querier) *DefaultMultiTenantManager {
engine, queryable, pusher, logger, overrides, reg := testSetup(t, q)
manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, nil), reg, logger)
manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, nil), reg, logger, overrides)
require.NoError(t, err)
return manager
}
func newMultiTenantManager(t *testing.T, cfg Config, q storage.Querier, amConf map[string]*config.AlertManagerConfig) *DefaultMultiTenantManager {
engine, queryable, pusher, logger, _, reg := testSetup(t, q)
overrides := ruleLimits{evalDelay: 0, maxRuleGroups: 20, maxRulesPerRuleGroup: 15}
overrides.alertManagerConfig = amConf
manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, nil), reg, logger, overrides)
require.NoError(t, err)
return manager
@ -189,7 +207,7 @@ func buildRuler(t *testing.T, rulerConfig Config, q storage.Querier, clientMetri
require.NoError(t, err)
managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg)
manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, reg, log.NewNopLogger())
manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, reg, log.NewNopLogger(), overrides)
require.NoError(t, err)
ruler, err := newRuler(
@ -234,7 +252,6 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
// We create an empty rule store so that the ruler will not load any rule from it.
cfg := defaultRulerConfig(t, newMockRuleStore(nil))
cfg.AlertmanagerURL = ts.URL
cfg.AlertmanagerDiscovery = false
@ -262,6 +279,79 @@ func TestNotifierSendsUserIDHeader(t *testing.T) {
`), "cortex_prometheus_notifications_dropped_total"))
}
func TestMultiTenantsNotifierSendsUserIDHeader(t *testing.T) {
var wg sync.WaitGroup
const tenant1 = "tenant1"
const tenant2 = "tenant2"
// We expect 2 API calls, one for each tenant notifier created via getOrCreateNotifier()
wg.Add(2)
ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r)
assert.NoError(t, err)
assert.Equal(t, userID, tenant1)
wg.Done()
}))
defer ts1.Close()
ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userID, _, err := tenant.ExtractTenantIDFromHTTPRequest(r)
assert.NoError(t, err)
assert.Equal(t, userID, tenant2)
wg.Done()
}))
defer ts2.Close()
// We create an empty rule store so that the ruler will not load any rule from it.
cfg := defaultRulerConfig(t, newMockRuleStore(nil))
amCfg := map[string]*config.AlertManagerConfig{
tenant1: {
AlertmanagerURL: ts1.URL,
AlertmanagerDiscovery: false,
},
tenant2: {
AlertmanagerURL: ts2.URL,
AlertmanagerDiscovery: false,
},
}
manager := newMultiTenantManager(t, cfg, nil, amCfg)
defer manager.Stop()
n1, err := manager.getOrCreateNotifier(tenant1)
require.NoError(t, err)
n2, err := manager.getOrCreateNotifier(tenant2)
require.NoError(t, err)
// Loop until notifier discovery syncs up
for len(n1.Alertmanagers()) == 0 {
time.Sleep(10 * time.Millisecond)
}
n1.Send(&notifier.Alert{
Labels: labels.Labels{labels.Label{Name: "alertname1", Value: "testalert1"}},
})
for len(n2.Alertmanagers()) == 0 {
time.Sleep(10 * time.Millisecond)
}
n2.Send(&notifier.Alert{
Labels: labels.Labels{labels.Label{Name: "alertname2", Value: "testalert2"}},
})
wg.Wait()
// Ensure we have metrics in the notifier.
assert.NoError(t, prom_testutil.GatherAndCompare(manager.registry.(*prometheus.Registry), strings.NewReader(`
# HELP cortex_prometheus_notifications_dropped_total Total number of alerts dropped due to errors when sending to Alertmanager.
# TYPE cortex_prometheus_notifications_dropped_total counter
cortex_prometheus_notifications_dropped_total{user="tenant1"} 0
cortex_prometheus_notifications_dropped_total{user="tenant2"} 0
`), "cortex_prometheus_notifications_dropped_total"))
}
func TestRuler_Rules(t *testing.T) {
cfg := defaultRulerConfig(t, newMockRuleStore(mockRules))

@ -0,0 +1,42 @@
package config
import (
"flag"
"time"
"github.com/grafana/dskit/crypto/tls"
"github.com/prometheus/prometheus/model/relabel"
"github.com/grafana/loki/pkg/util"
)
type AlertManagerConfig struct {
// URL of the Alertmanager to send notifications to.
AlertmanagerURL string `yaml:"alertmanager_url"`
// Whether to use DNS SRV records to discover Alertmanager.
AlertmanagerDiscovery bool `yaml:"enable_alertmanager_discovery"`
// How long to wait between refreshing the list of Alertmanagers based on DNS service discovery.
AlertmanagerRefreshInterval time.Duration `yaml:"alertmanager_refresh_interval"`
// Enables the ruler notifier to use the Alertmanager V2 API.
AlertmanangerEnableV2API bool `yaml:"enable_alertmanager_v2"`
// Configuration for alert relabeling.
AlertRelabelConfigs []*relabel.Config `yaml:"alert_relabel_configs,omitempty"`
// Capacity of the queue for notifications to be sent to the Alertmanager.
NotificationQueueCapacity int `yaml:"notification_queue_capacity"`
// HTTP timeout duration when sending notifications to the Alertmanager.
NotificationTimeout time.Duration `yaml:"notification_timeout"`
// Client configs for interacting with the Alertmanager
Notifier NotifierConfig `yaml:"alertmanager_client,omitempty"`
}
type NotifierConfig struct {
TLS tls.ClientConfig `yaml:",inline"`
BasicAuth util.BasicAuth `yaml:",inline"`
HeaderAuth util.HeaderAuth `yaml:",inline"`
}
func (cfg *NotifierConfig) RegisterFlags(f *flag.FlagSet) {
cfg.TLS.RegisterFlagsWithPrefix("ruler.alertmanager-client", f)
cfg.BasicAuth.RegisterFlagsWithPrefix("ruler.alertmanager-client.", f)
cfg.HeaderAuth.RegisterFlagsWithPrefix("ruler.alertmanager-client.", f)
}

@ -31,6 +31,7 @@ func NewRuler(cfg Config, engine *logql.Engine, reg prometheus.Registerer, logge
MultiTenantRuleManager(cfg, engine, limits, logger, reg),
reg,
logger,
limits,
)
if err != nil {
return nil, err

@ -13,6 +13,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"golang.org/x/time/rate"
"github.com/grafana/loki/pkg/ruler/config"
)
var errMaxGlobalSeriesPerUserValidation = errors.New("The ingester.max-global-series-per-user limit is unsupported if distributor.shard-by-all-labels is disabled")
@ -82,10 +84,11 @@ type Limits struct {
MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
// Ruler defaults and limits.
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
RulerTenantShardSize int `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"`
RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"`
RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
RulerTenantShardSize int `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"`
RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"`
RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`
RulerAlertManagerConfig *config.AlertManagerConfig `yaml:"ruler_alertmanager_config" json:"ruler_alertmanager_config"`
// Store-gateway.
StoreGatewayTenantShardSize int `yaml:"store_gateway_tenant_shard_size" json:"store_gateway_tenant_shard_size"`
@ -515,6 +518,11 @@ func (o *Overrides) RulerMaxRuleGroupsPerTenant(userID string) int {
return o.getOverridesForUser(userID).RulerMaxRuleGroupsPerTenant
}
// RulerAlertManagerConfig returns the alertmanager configurations to use for a given user.
func (o *Overrides) RulerAlertManagerConfig(userID string) *config.AlertManagerConfig {
return o.getOverridesForUser(userID).RulerAlertManagerConfig
}
// StoreGatewayTenantShardSize returns the store-gateway shard size for a given user.
func (o *Overrides) StoreGatewayTenantShardSize(userID string) int {
return o.getOverridesForUser(userID).StoreGatewayTenantShardSize

@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/logql/syntax"
ruler_config "github.com/grafana/loki/pkg/ruler/config"
"github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode"
"github.com/grafana/loki/pkg/util/flagext"
@ -95,9 +96,10 @@ type Limits struct {
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
// Ruler defaults and limits.
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"`
RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"`
RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`
RulerAlertManagerConfig *ruler_config.AlertManagerConfig `yaml:"ruler_alertmanager_config" json:"ruler_alertmanager_config"`
// TODO(dannyk): add HTTP client overrides (basic auth / tls config, etc)
// Ruler remote-write limits.
@ -504,6 +506,11 @@ func (o *Overrides) RulerMaxRuleGroupsPerTenant(userID string) int {
return o.getOverridesForUser(userID).RulerMaxRuleGroupsPerTenant
}
// RulerAlertManagerConfig returns the alertmanager configurations to use for a given user.
func (o *Overrides) RulerAlertManagerConfig(userID string) *ruler_config.AlertManagerConfig {
return o.getOverridesForUser(userID).RulerAlertManagerConfig
}
// RulerRemoteWriteDisabled returns whether remote-write is disabled for a given user or not.
func (o *Overrides) RulerRemoteWriteDisabled(userID string) bool {
return o.getOverridesForUser(userID).RulerRemoteWriteDisabled

@ -0,0 +1,8 @@
route:
receiver: 'default-receiver'
group_wait: 30s
group_interval: 30m
group_by: [ alertname ]
receivers:
- name: 'default-receiver'
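# Note: the default receiver has no notification integrations configured, so alerts
# received from the ruler are visible in the Alertmanager UI but not forwarded anywhere.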

@ -103,4 +103,4 @@ querier:
compactor:
working_directory: /tmp/compactor
shared_store: s3
shared_store: s3

@ -3,6 +3,13 @@ version: "3.8"
networks:
loki:
volumes:
prometheus:
grafana:
alertmanager-data:
services:
# Since the Loki containers are running as user 10001 and the mounted data volume is owned by root,
@ -41,7 +48,12 @@ services:
volumes:
- ./config/prometheus.yaml:/etc/prometheus/prometheus.yml
- prometheus:/prometheus
command: [ '--log.level=debug', '--config.file=/etc/prometheus/prometheus.yml', '--query.lookback-delta=30s' ]
command:
[
'--log.level=debug',
'--config.file=/etc/prometheus/prometheus.yml',
'--query.lookback-delta=30s'
]
networks:
- loki
@ -51,8 +63,8 @@ services:
command:
- --loop
- --format=json
- --number=10 # number of log lines to generate per second
- --delay=100ms # delay between log lines
- --number=10 # number of log lines to generate per second
- --delay=100ms # delay between log lines
- --output=/var/log/generated-logs.txt
- --overwrite
- --type=log
@ -104,9 +116,9 @@ services:
loki-frontend:
image: grafana/loki:2.6.1
volumes:
- ./config:/etc/loki/
- ./config:/etc/loki/
ports:
- "3100"
- "3100"
command: "-config.file=/etc/loki/loki.yaml -target=query-frontend -frontend.downstream-url=http://loki-read:3100"
networks:
- loki
@ -121,13 +133,12 @@ services:
ports:
- "3100"
- "7946"
# uncomment to use interactive debugging
# uncomment to use interactive debugging
# - "40000-40002:40000" # # makes the replicas available on ports 40000, 40001, 40002
#cap_add:
# - SYS_PTRACE
#security_opt:
# - apparmor=unconfined
#cap_add:
# - SYS_PTRACE
#security_opt:
# - apparmor=unconfined
command: "-config.file=/etc/loki/loki.yaml -target=read"
networks:
- loki
@ -144,13 +155,12 @@ services:
ports:
- "3100"
- "7946"
# uncomment to use interactive debugging
# uncomment to use interactive debugging
# - "50000-50002:40000" # makes the replicas available on ports 50000, 50001, 50002
# cap_add:
# - SYS_PTRACE
# security_opt:
# - apparmor=unconfined
# cap_add:
# - SYS_PTRACE
# security_opt:
# - apparmor=unconfined
command: "-config.file=/etc/loki/loki.yaml -target=write"
networks:
- loki
@ -159,6 +169,15 @@ services:
mode: replicated
replicas: 3
volumes:
prometheus:
grafana:
# Alertmanager instance for receiving alerts from the ruler
alertmanager:
image: prom/alertmanager:v0.23.0
restart: unless-stopped
ports:
- "9093:9093"
volumes:
- "./config:/config"
- alertmanager-data:/data
command: --config.file=/config/alertmanager.yml --log.level=debug
networks:
- loki
