From ae9837b793e97f683d1517d270b719c99656e556 Mon Sep 17 00:00:00 2001 From: Tito Lins Date: Thu, 13 Feb 2025 11:36:45 +0100 Subject: [PATCH] Alerting: Add alertmanager integration tests (#100106) --- .github/CODEOWNERS | 1 + Makefile | 8 + .../docker/blocks/stateful_webhook/Dockerfile | 12 + .../stateful_webhook/docker-compose.yaml | 5 + devenv/docker/blocks/stateful_webhook/main.go | 149 +++++++ go.mod | 2 + go.sum | 4 + go.work.sum | 1 + .../alertmanager/alertmanager_scenario.go | 386 ++++++++++++++++++ pkg/tests/alertmanager/alertmanager_test.go | 97 +++++ pkg/tests/alertmanager/grafana.go | 75 ++++ pkg/tests/alertmanager/loki.go | 152 +++++++ pkg/tests/alertmanager/postgres.go | 41 ++ pkg/tests/alertmanager/webhook.go | 85 ++++ ...na_alertmanager_integration_test_images.go | 41 ++ 15 files changed, 1059 insertions(+) create mode 100644 devenv/docker/blocks/stateful_webhook/Dockerfile create mode 100644 devenv/docker/blocks/stateful_webhook/docker-compose.yaml create mode 100644 devenv/docker/blocks/stateful_webhook/main.go create mode 100644 pkg/tests/alertmanager/alertmanager_scenario.go create mode 100644 pkg/tests/alertmanager/alertmanager_test.go create mode 100644 pkg/tests/alertmanager/grafana.go create mode 100644 pkg/tests/alertmanager/loki.go create mode 100644 pkg/tests/alertmanager/postgres.go create mode 100644 pkg/tests/alertmanager/webhook.go create mode 100644 tools/setup_grafana_alertmanager_integration_test_images.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index cbb2fa1a54f..a349655800b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -241,6 +241,7 @@ /devenv/dev-dashboards/extensions/ @grafana/plugins-platform-frontend /devenv/docker/blocks/alert_webhook_listener/ @grafana/alerting-backend +/devenv/docker/blocks/stateful_webhook/ @grafana/alerting-backend /devenv/docker/blocks/caddy_tls/ @grafana/alerting-backend /devenv/docker/blocks/clickhouse/ @grafana/partner-datasources /devenv/docker/blocks/collectd/ @grafana/observability-metrics diff --git a/Makefile b/Makefile index 2721c2830b8..11cee0a4255 100644 --- a/Makefile +++ b/Makefile @@ -271,6 +271,14 @@ test-go-integration-alertmanager: ## Run integration tests for the remote alertm AM_URL=http://localhost:8080 AM_TENANT_ID=test \ $(GO) test $(GO_RACE_FLAG) -count=1 -run "^TestIntegrationRemoteAlertmanager" -covermode=atomic -timeout=5m ./pkg/services/ngalert/... +.PHONY: test-go-integration-grafana-alertmanager +test-go-integration-grafana-alertmanager: ## Run integration tests for the grafana alertmanager + @echo "test grafana alertmanager integration tests" + @export GRAFANA_VERSION=11.5.0-81938; \ + $(GO) run tools/setup_grafana_alertmanager_integration_test_images.go; \ + $(GO) clean -testcache; \ + $(GO) test $(GO_RACE_FLAG) -count=1 -run "^TestAlertmanagerIntegration" -covermode=atomic -timeout=10m ./pkg/tests/alertmanager/... + .PHONY: test-go-integration-postgres test-go-integration-postgres: devenv-postgres ## Run integration tests for postgres backend with flags. @echo "test backend integration postgres tests" diff --git a/devenv/docker/blocks/stateful_webhook/Dockerfile b/devenv/docker/blocks/stateful_webhook/Dockerfile new file mode 100644 index 00000000000..03b50db2135 --- /dev/null +++ b/devenv/docker/blocks/stateful_webhook/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.23.5 + +ADD main.go /go/src/webhook/main.go + +WORKDIR /go/src/webhook + +RUN mkdir /tmp/logs +RUN go build -o /bin main.go + +ENV PORT=8080 + +ENTRYPOINT [ "/bin/main" ] diff --git a/devenv/docker/blocks/stateful_webhook/docker-compose.yaml b/devenv/docker/blocks/stateful_webhook/docker-compose.yaml new file mode 100644 index 00000000000..7217516c4e9 --- /dev/null +++ b/devenv/docker/blocks/stateful_webhook/docker-compose.yaml @@ -0,0 +1,5 @@ + stateful_webhook: + build: + context: docker/blocks/stateful_webhook + ports: + - "8080:8080" diff --git a/devenv/docker/blocks/stateful_webhook/main.go b/devenv/docker/blocks/stateful_webhook/main.go new file mode 100644 index 00000000000..926cf4e593e --- /dev/null +++ b/devenv/docker/blocks/stateful_webhook/main.go @@ -0,0 +1,149 @@ +package main + +import ( + "encoding/json" + "io" + "log" + "net/http" + "strings" + "sync" + "time" +) + +type Event struct { + Status string `json:"status"` + TimeNow time.Time `json:"timeNow"` + StartsAt time.Time `json:"startsAt"` + Node string `json:"node"` + DeltaLastSeconds float64 `json:"deltaLastSeconds"` + DeltaStartSeconds float64 `json:"deltaStartSeconds"` +} + +type Notification struct { + Alerts []Alert `json:"alerts"` + CommonAnnotations map[string]string `json:"commonAnnotations"` + CommonLabels map[string]string `json:"commonLabels"` + ExternalURL string `json:"externalURL"` + GroupKey string `json:"groupKey"` + GroupLabels map[string]string `json:"groupLabels"` + Message string `json:"message"` + OrgID int `json:"orgId"` + Receiver string `json:"receiver"` + State string `json:"state"` + Status string `json:"status"` + Title string `json:"title"` + TruncatedAlerts int `json:"truncatedAlerts"` + Version string `json:"version"` +} + +type Alert struct { + Annotations map[string]string `json:"annotations"` + DashboardURL string `json:"dashboardURL"` + StartsAt time.Time `json:"startsAt"` + EndsAt time.Time `json:"endsAt"` + Fingerprint string `json:"fingerprint"` + GeneratorURL string `json:"generatorURL"` + Labels map[string]string `json:"labels"` + PanelURL string `json:"panelURL"` + SilenceURL string `json:"silenceURL"` + Status string `json:"status"` + ValueString string `json:"valueString"` + Values map[string]any `json:"values"` +} + +type NotificationHandler struct { + startedAt time.Time + stats map[string]int + hist []Event + m sync.Mutex +} + +func NewNotificationHandler() *NotificationHandler { + return &NotificationHandler{ + startedAt: time.Now(), + stats: make(map[string]int), + hist: make([]Event, 0), + } +} + +func (ah *NotificationHandler) Notify(w http.ResponseWriter, r *http.Request) { + b, err := io.ReadAll(r.Body) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + n := Notification{} + if err := json.Unmarshal(b, &n); err != nil { + log.Println(err) + w.WriteHeader(http.StatusBadRequest) + return + } + log.Printf("got notification from: %s. a: %v", r.RemoteAddr, n) + + ah.m.Lock() + defer ah.m.Unlock() + + addr := r.RemoteAddr + if split := strings.Split(r.RemoteAddr, ":"); len(split) > 0 { + addr = split[0] + } + + a := n.Alerts[0] + + timeNow := time.Now() + + ah.stats[n.Status]++ + + var d time.Duration + if len(ah.hist) > 0 { + last := ah.hist[len(ah.hist)-1] + d = timeNow.Sub(last.TimeNow) + } + + ah.hist = append(ah.hist, Event{ + Status: n.Status, + StartsAt: a.StartsAt, + TimeNow: timeNow, + Node: addr, + DeltaLastSeconds: d.Seconds(), + DeltaStartSeconds: timeNow.Sub(ah.startedAt).Seconds(), + }) +} + +func (ah *NotificationHandler) GetNotifications(w http.ResponseWriter, _ *http.Request) { + ah.m.Lock() + defer ah.m.Unlock() + w.Header().Set("Content-Type", "application/json") + + res, err := json.MarshalIndent(map[string]any{"stats": ah.stats, "history": ah.hist}, "", "\t") + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + //nolint:errcheck + w.Write([]byte(`{"error":"failed to marshal alerts"}`)) + log.Printf("failed to marshal alerts: %v\n", err) + return + } + + log.Printf("requested current state\n%v\n", string(res)) + + _, err = w.Write(res) + if err != nil { + log.Printf("failed to write response: %v\n", err) + } +} + +func main() { + ah := NewNotificationHandler() + + http.HandleFunc("/ready", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + http.HandleFunc("/notify", ah.Notify) + http.HandleFunc("/notifications", ah.GetNotifications) + + log.Println("Listening") + //nolint:errcheck + http.ListenAndServe("0.0.0.0:8080", nil) +} diff --git a/go.mod b/go.mod index 3ded590b054..9b05a5c2234 100644 --- a/go.mod +++ b/go.mod @@ -217,6 +217,8 @@ require ( github.com/grafana/grafana/pkg/storage/unified/resource v0.0.0-20250121113133-e747350fee2d // @grafana/grafana-search-and-storage ) +require github.com/grafana/grafana-api-golang-client v0.27.0 // @grafana/alerting-backend + require ( cel.dev/expr v0.19.0 // indirect cloud.google.com/go v0.116.0 // indirect diff --git a/go.sum b/go.sum index f73dcbe3a18..8f98d32599f 100644 --- a/go.sum +++ b/go.sum @@ -1297,6 +1297,8 @@ github.com/go-xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a h1:9wScpmSP5A3Bk github.com/go-xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:56xuuqnHyryaerycW3BfssRdxQstACi0Epw/yC5E2xM= github.com/go-zookeeper/zk v1.0.4 h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I= github.com/go-zookeeper/zk v1.0.4/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= +github.com/gobs/pretty v0.0.0-20180724170744-09732c25a95b h1:/vQ+oYKu+JoyaMPDsv5FzwuL2wwWBgBbtj/YLCi4LuA= +github.com/gobs/pretty v0.0.0-20180724170744-09732c25a95b/go.mod h1:Xo4aNUOrJnVruqWQJBtW6+bTBDTniY8yZum5rF3b5jw= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= @@ -1527,6 +1529,8 @@ github.com/grafana/gofpdf v0.0.0-20231002120153-857cc45be447 h1:jxJJ5z0GxqhWFbQU github.com/grafana/gofpdf v0.0.0-20231002120153-857cc45be447/go.mod h1:IxsY6mns6Q5sAnWcrptrgUrSglTZJXH/kXr9nbpb/9I= github.com/grafana/gomemcache v0.0.0-20240805133030-fdaf6a95408e h1:UlEET0InuoFautfaFp8lDrNF7rPHYXuBMrzwWx9XqFY= github.com/grafana/gomemcache v0.0.0-20240805133030-fdaf6a95408e/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU= +github.com/grafana/grafana-api-golang-client v0.27.0 h1:zIwMXcbCB4n588i3O2N6HfNcQogCNTd/vPkEXTr7zX8= +github.com/grafana/grafana-api-golang-client v0.27.0/go.mod h1:uNLZEmgKtTjHBtCQMwNn3qsx2mpMb8zU+7T4Xv3NR9Y= github.com/grafana/grafana-app-sdk v0.31.0 h1:/mFCcx+YqG8cWAi9hePDJQxIdtXDClDIDRgZwHkksFk= github.com/grafana/grafana-app-sdk v0.31.0/go.mod h1:Xw00NL7qpRLo5r3Gn48Bl1Xn2n4eUDI5pYf/wMufKWs= github.com/grafana/grafana-app-sdk/logging v0.30.0 h1:K/P/bm7Cp7Di4tqIJ3EQz2+842JozQGRaz62r95ApME= diff --git a/go.work.sum b/go.work.sum index 1be6908c9d8..832ae031f57 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1455,6 +1455,7 @@ github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDs github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg= github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= +github.com/gobs/pretty v0.0.0-20180724170744-09732c25a95b/go.mod h1:Xo4aNUOrJnVruqWQJBtW6+bTBDTniY8yZum5rF3b5jw= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= diff --git a/pkg/tests/alertmanager/alertmanager_scenario.go b/pkg/tests/alertmanager/alertmanager_scenario.go new file mode 100644 index 00000000000..719cccef4ab --- /dev/null +++ b/pkg/tests/alertmanager/alertmanager_scenario.go @@ -0,0 +1,386 @@ +package alertmanager + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/grafana/e2e" + gapi "github.com/grafana/grafana-api-golang-client" + "github.com/stretchr/testify/require" +) + +const ( + defaultNetworkName = "e2e-grafana-am" +) + +type AlertRuleConfig struct { + PendingPeriod string + GroupEvaluationIntervalSeconds int64 +} + +type NotificationPolicyCfg struct { + GroupWait string + GroupInterval string + RepeatInterval string +} + +type ProvisionCfg struct { + AlertRuleConfig + NotificationPolicyCfg +} + +// AlertmanagerScenario is a helper for writing tests which require some number of AM +// configured to communicate with some number of Grafana instances. +type AlertmanagerScenario struct { + *e2e.Scenario + + Grafanas map[string]*GrafanaService + Webhook *WebhookService + Postgres *PostgresService + Loki *LokiService +} + +func NewAlertmanagerScenario() (*AlertmanagerScenario, error) { + s, err := e2e.NewScenario(getNetworkName()) + if err != nil { + return nil, err + } + + return &AlertmanagerScenario{ + Scenario: s, + Grafanas: make(map[string]*GrafanaService), + }, nil +} + +// Setup starts a Grafana AM cluster of size n and all required dependencies +func (s *AlertmanagerScenario) Start(t *testing.T, n int, peerTimeout string, stopOnExtraDedup bool) { + is := getInstances(n) + ips := mapInstancePeers(is) + + // start dependencies in one go + require.NoError( + t, + s.StartAndWaitReady([]e2e.Service{ + s.NewWebhookService("webhook"), + s.NewLokiService("loki"), + s.NewPostgresService("postgres"), + }...), + ) + + for i, ps := range ips { + require.NoError(t, s.StartAndWaitReady(s.NewGrafanaService(i, ps, peerTimeout, stopOnExtraDedup))) + } + + // wait for instances to come online and cluster to be properly configured + time.Sleep(30 * time.Second) +} + +// Provision provisions all required resources for the test +func (s *AlertmanagerScenario) Provision(t *testing.T, cfg ProvisionCfg) { //}*GrafanaClient { + c, err := s.NewGrafanaClient("grafana-1", 1) + require.NoError(t, err) + + dsUID := "integration-testdata" + + // setup resources + _, err = c.NewDataSource(&gapi.DataSource{ + Name: "grafana-testdata-datasource", + Type: "grafana-testdata-datasource", + Access: "proxy", + UID: dsUID, + }) + require.NoError(t, err) + + // setup loki for state history + _, err = c.NewDataSource(&gapi.DataSource{ + Name: "loki", + Type: "loki", + URL: "http://loki:3100", + Access: "proxy", + }) + require.NoError(t, err) + + _, err = c.NewContactPoint(&gapi.ContactPoint{ + Name: "webhook", + Type: "webhook", + Settings: map[string]any{ + "url": "http://webhook:8080/notify", + }, + }) + require.NoError(t, err) + + require.NoError(t, c.SetNotificationPolicyTree(&gapi.NotificationPolicyTree{ + Receiver: "webhook", + GroupWait: cfg.GroupWait, + GroupInterval: cfg.GroupInterval, + RepeatInterval: cfg.RepeatInterval, + })) + + f, err := c.NewFolder("integration_test") + require.NoError(t, err) + + r := &gapi.AlertRule{ + Title: "integration rule", + Condition: "C", + FolderUID: f.UID, + ExecErrState: gapi.ErrError, + NoDataState: gapi.NoData, + For: cfg.PendingPeriod, + RuleGroup: "test", + Data: []*gapi.AlertQuery{ + { + RefID: "A", + RelativeTimeRange: gapi.RelativeTimeRange{ + From: 600, + To: 0, + }, + DatasourceUID: dsUID, + Model: json.RawMessage(fmt.Sprintf(`{ + "refId":"A", + "datasource": { + "type": "grafana-testdata-datasource", + "uid": "%s" + }, + "hide":false, + "range":false, + "instant":true, + "intervalMs":1000, + "maxDataPoints":43200, + "pulseWave": { + "offCount": 6, + "offValue": 0, + "onCount": 10, + "onValue": 10, + "timeStep": 10 + }, + "refId": "A", + "scenarioId": "predictable_pulse", + "seriesCount": 1 + }`, dsUID)), + }, + { + RefID: "B", + RelativeTimeRange: gapi.RelativeTimeRange{ + From: 0, + To: 0, + }, + DatasourceUID: "__expr__", + Model: json.RawMessage(`{ + "conditions": [ + { + "evaluator": { + "params": [ + 0, + 0 + ], + "type": "gt" + }, + "operator": { + "type": "and" + }, + "query": { + "params": [] + }, + "reducer": { + "params": [], + "type": "avg" + }, + "type": "query" + } + ], + "datasource": { + "name": "Expression", + "type": "__expr__", + "uid": "__expr__" + }, + "expression": "A", + "intervalMs": 1000, + "maxDataPoints": 43200, + "reducer": "last", + "refId": "B", + "type": "reduce" + }`), + }, + { + RefID: "C", + RelativeTimeRange: gapi.RelativeTimeRange{ + From: 0, + To: 0, + }, + DatasourceUID: "__expr__", + Model: json.RawMessage(`{ + "conditions": [ + { + "evaluator": { + "params": [ + 0, + 0 + ], + "type": "gt" + }, + "operator": { + "type": "and" + }, + "query": { + "params": [ + "B" + ] + }, + "reducer": { + "params": [], + "type": "last" + }, + "type": "query" + } + ], + "datasource": { + "type": "__expr__", + "uid": "__expr__" + }, + "hide": false, + "isPaused": false, + "intervalMs": 1000, + "maxDataPoints": 43200, + "refId": "C", + "expression": "B", + "type": "threshold" + }`), + }, + }, + } + _, err = c.NewAlertRule(r) + require.NoError(t, err) + + require.NoError(t, c.SetAlertRuleGroup(gapi.RuleGroup{ + Title: "test", + FolderUID: f.UID, + Interval: cfg.GroupEvaluationIntervalSeconds, + Rules: []gapi.AlertRule{*r}, + })) +} + +// NewGrafanaService creates a new Grafana instance. +func (s *AlertmanagerScenario) NewGrafanaService(name string, peers []string, peerTimeout string, stopOnExtraDedup bool) *GrafanaService { + flags := map[string]string{} + + ft := []string{ + "alertStateHistoryLokiSecondary", + "alertStateHistoryLokiPrimary", + "alertStateHistoryLokiOnly", + "alertingAlertmanagerExtraDedupStage", + } + if stopOnExtraDedup { + ft = append(ft, "alertingAlertmanagerExtraDedupStageStopPipeline") + } + envVars := map[string]string{ + //"GF_LOG_MODE": "file", // disable console logging + "GF_LOG_LEVEL": "warn", + "GF_FEATURE_TOGGLES_ENABLE": strings.Join(ft, ","), + "GF_UNIFIED_ALERTING_ENABLED": "true", + "GF_UNIFIED_ALERTING_EXECUTE_ALERTS": "true", + "GF_UNIFIED_ALERTING_HA_PEER_TIMEOUT": peerTimeout, + "GF_UNIFIED_ALERTING_HA_RECONNECT_TIMEOUT": "2m", + "GF_UNIFIED_ALERTING_HA_LISTEN_ADDRESS": ":9094", + "GF_UNIFIED_ALERTING_HA_PEERS": strings.Join(peers, ","), + "GF_UNIFIED_ALERTING_STATE_HISTORY_ENABLED": "true", + "GF_UNIFIED_ALERTING_STATE_HISTORY_BACKEND": "loki", + "GF_UNIFIED_ALERTING_STATE_HISTORY_LOKI_REMOTE_URL": "http://loki:3100", + "GF_DATABASE_TYPE": "postgres", + "GF_DATABASE_HOST": "postgres:5432", + "GF_DATABASE_NAME": "grafana", + "GF_DATABASE_USER": "postgres", + "GF_DATABASE_PASSWORD": "password", + "GF_DATABASE_SSL_MODE": "disable", + } + + g := NewGrafanaService(name, flags, envVars) + + s.Grafanas[name] = g + return g +} + +// NewGrafanaService creates a new Grafana API client for the requested instance. +func (s *AlertmanagerScenario) NewGrafanaClient(grafanaName string, orgID int64) (*GrafanaClient, error) { + g, ok := s.Grafanas[grafanaName] + if !ok { + return nil, fmt.Errorf("unknown grafana instance: %s", grafanaName) + } + + return NewGrafanaClient(g.HTTPEndpoint(), orgID) +} + +func (s *AlertmanagerScenario) NewWebhookClient() (*WebhookClient, error) { + return NewWebhookClient("http://" + s.Webhook.HTTPEndpoint()) +} + +func (s *AlertmanagerScenario) NewWebhookService(name string) *WebhookService { + ws := NewWebhookService(name, nil, nil) + s.Webhook = ws + + return ws +} + +func (s *AlertmanagerScenario) NewLokiService(name string) *LokiService { + ls := NewLokiService(name, map[string]string{"--config.file": "/etc/loki/local-config.yaml"}, nil) + s.Loki = ls + + return ls +} + +func (s *AlertmanagerScenario) NewPostgresService(name string) *PostgresService { + ps := NewPostgresService(name, map[string]string{"POSTGRES_PASSWORD": "password", "POSTGRES_DB": "grafana"}) + s.Postgres = ps + + return ps +} + +func (s *AlertmanagerScenario) NewLokiClient() (*LokiClient, error) { + return NewLokiClient("http://" + s.Loki.HTTPEndpoint()) +} + +func getNetworkName() string { + // If the E2E_NETWORK_NAME is set, use that for the network name. + // Otherwise, return the default network name. + if os.Getenv("E2E_NETWORK_NAME") != "" { + return os.Getenv("E2E_NETWORK_NAME") + } + + return defaultNetworkName +} + +func getInstances(n int) []string { + is := make([]string, n) + + for i := 0; i < n; i++ { + is[i] = "grafana-" + strconv.Itoa(i+1) + } + + return is +} + +func getPeers(i string, is []string) []string { + peers := make([]string, 0, len(is)-1) + + for _, p := range is { + if p != i { + peers = append(peers, p+":9094") + } + } + + return peers +} + +func mapInstancePeers(is []string) map[string][]string { + mIs := make(map[string][]string, len(is)) + + for _, i := range is { + mIs[i] = getPeers(i, is) + } + + return mIs +} diff --git a/pkg/tests/alertmanager/alertmanager_test.go b/pkg/tests/alertmanager/alertmanager_test.go new file mode 100644 index 00000000000..0f127ea2ab6 --- /dev/null +++ b/pkg/tests/alertmanager/alertmanager_test.go @@ -0,0 +1,97 @@ +package alertmanager + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestAlertmanagerIntegration_ExtraDedupStage(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + t.Run("assert no flapping alerts when stopOnExtraDedup is enabled", func(t *testing.T) { + s, err := NewAlertmanagerScenario() + require.NoError(t, err) + defer s.Close() + + s.Start(t, 20, "15s", true) + s.Provision(t, ProvisionCfg{ + AlertRuleConfig: AlertRuleConfig{ + PendingPeriod: "30s", + GroupEvaluationIntervalSeconds: 10, + }, + NotificationPolicyCfg: NotificationPolicyCfg{ + GroupWait: "30s", + GroupInterval: "1m", + RepeatInterval: "30m", + }, + }) + + wc, err := s.NewWebhookClient() + require.NoError(t, err) + + lc, err := s.NewLokiClient() + require.NoError(t, err) + + // notifications only start arriving after 2 to 3 minutes so we wait for that + time.Sleep(time.Minute * 2) + + timeout := time.After(5 * time.Minute) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + nr, err := wc.GetNotifications() + if err != nil { + t.Logf("failed to get alert notifications: %v\n", err) + continue + } + + // get the latest state for the alert from loki + st, err := lc.GetCurrentAlertState() + if err != nil { + t.Logf("failed to get alert state: %v\n", err) + continue + } + + // if the last state is not normal, ignore + // we might be missing other cases of flapping notifications but for now we are only interested in this one + // (alerting notification when state is already normal) + if st.State != AlertStateNormal { + continue + } + + // history is ordered - fetch the first notification that is after the last state change + var i int + for i = range nr.History { + if nr.History[i].TimeNow.After(st.Timestamp) { + break + } + } + + // if all notifications are from before the last state change, we can wait a bit more + if nr.History[i].TimeNow.Before(st.Timestamp) { + continue + } + + // for all notifications after the last state change, check if there is a firing one + for ; i < len(nr.History); i++ { + notification := nr.History[i] + if notification.Status == "firing" { + t.Errorf("flapping notifications - got firing notification when alert was resolved, state = %#v, notification = %#v", st, notification) + t.FailNow() + } + } + + case <-timeout: + // if after the timeout there are no such cases, we assume there are no flapping notifications + return + } + } + }) +} diff --git a/pkg/tests/alertmanager/grafana.go b/pkg/tests/alertmanager/grafana.go new file mode 100644 index 00000000000..7d67ffdf53c --- /dev/null +++ b/pkg/tests/alertmanager/grafana.go @@ -0,0 +1,75 @@ +package alertmanager + +import ( + _ "embed" + "fmt" + "net/url" + "os" + + "github.com/grafana/e2e" + gapi "github.com/grafana/grafana-api-golang-client" +) + +const ( + grafanaBinary = "/run.sh" + grafanaHTTPPort = 3000 +) + +// GetDefaultImage returns the Docker image to use to run the Grafana.. +func GetGrafanaImage() string { + if img := os.Getenv("GRAFANA_IMAGE"); img != "" { + return img + } + + if version := os.Getenv("GRAFANA_VERSION"); version != "" { + return "grafana/grafana-enterprise-dev:" + version + } + + panic("Provide GRAFANA_VERSION or GRAFANA_IMAGE") +} + +type GrafanaService struct { + *e2e.HTTPService +} + +func NewGrafanaService(name string, flags, envVars map[string]string) *GrafanaService { + svc := &GrafanaService{ + HTTPService: e2e.NewHTTPService( + name, + GetGrafanaImage(), + e2e.NewCommandWithoutEntrypoint(grafanaBinary, e2e.BuildArgs(flags)...), + e2e.NewHTTPReadinessProbe(grafanaHTTPPort, "/ready", 200, 299), + grafanaHTTPPort, + 9094, + ), + } + + svc.SetEnvVars(envVars) + + return svc +} + +type GrafanaClient struct { + *gapi.Client +} + +// NewGrafanaClient creates a client for using the Grafana API. Note we don't bother +// wrapping the client library, and just use it as-is, until we find a reason not to. +func NewGrafanaClient(host string, orgID int64) (*GrafanaClient, error) { + cfg := gapi.Config{ + BasicAuth: url.UserPassword("admin", "admin"), + OrgID: orgID, + HTTPHeaders: map[string]string{ + "X-Disable-Provenance": "true", + }, + } + + client, err := gapi.New(fmt.Sprintf("http://%s/", host), cfg) + if err != nil { + return nil, err + } + + return &GrafanaClient{ + Client: client, + }, nil +} diff --git a/pkg/tests/alertmanager/loki.go b/pkg/tests/alertmanager/loki.go new file mode 100644 index 00000000000..16c6fda0b90 --- /dev/null +++ b/pkg/tests/alertmanager/loki.go @@ -0,0 +1,152 @@ +package alertmanager + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "strconv" + "time" + + "github.com/grafana/e2e" +) + +const ( + defaultLokiImage = "grafana/loki:latest" + lokiBinary = "/usr/bin/loki" + lokiHTTPPort = 3100 +) + +// GetDefaultImage returns the Docker image to use to run the Loki.. +func GetLokiImage() string { + if img := os.Getenv("LOKI_IMAGE"); img != "" { + return img + } + + return defaultLokiImage +} + +type LokiService struct { + *e2e.HTTPService +} + +func NewLokiService(name string, flags, envVars map[string]string) *LokiService { + svc := &LokiService{ + HTTPService: e2e.NewHTTPService( + name, + GetLokiImage(), + e2e.NewCommandWithoutEntrypoint(lokiBinary, e2e.BuildArgs(flags)...), + e2e.NewHTTPReadinessProbe(lokiHTTPPort, "/ready", 200, 299), + lokiHTTPPort, + ), + } + + svc.SetEnvVars(envVars) + + return svc +} + +type LokiClient struct { + c http.Client + u *url.URL +} + +func NewLokiClient(u string) (*LokiClient, error) { + pu, err := url.Parse(u) + if err != nil { + return nil, err + } + + return &LokiClient{ + c: http.Client{}, + u: pu, + }, nil +} + +type LokiQueryResponse struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result []struct { + Stream struct { + Condition string `json:"condition"` + Current string `json:"current"` + DashboardUID string `json:"dashboardUID"` + Fingerprint string `json:"fingerprint"` + FolderUID string `json:"folderUID"` + From string `json:"from"` + Group string `json:"group"` + LabelsAlertname string `json:"labels_alertname"` + LabelsGrafanaFolder string `json:"labels_grafana_folder"` + OrgID string `json:"orgID"` + PanelID string `json:"panelID"` + Previous string `json:"previous"` + RuleID string `json:"ruleID"` + RuleTitle string `json:"ruleTitle"` + RuleUID string `json:"ruleUID"` + SchemaVersion string `json:"schemaVersion"` + ServiceName string `json:"service_name"` + ValuesB string `json:"values_B"` + ValuesC string `json:"values_C"` + } `json:"stream"` + Values [][]string `json:"values"` + } `json:"result"` + } +} + +type AlertState string + +const ( + AlertStateNormal AlertState = "Normal" + AlertStatePending AlertState = "Pending" + AlertStateAlerting AlertState = "Alerting" +) + +type AlertStateResponse struct { + State AlertState + Timestamp time.Time +} + +// GetCurrentAlertState fetches the current alert state from loki +func (c *LokiClient) GetCurrentAlertState() (*AlertStateResponse, error) { + u := c.u.ResolveReference(&url.URL{Path: "/loki/api/v1/query_range"}) + + vs := url.Values{} + vs.Add("query", `{from="state-history"} | json`) + vs.Add("since", "60s") + + u.RawQuery = vs.Encode() + + resp, err := c.c.Get(u.String()) + if err != nil { + return nil, err + } + //nolint:errcheck + defer resp.Body.Close() + + res := LokiQueryResponse{} + + if err = json.NewDecoder(resp.Body).Decode(&res); err != nil { + return nil, err + } + + if res.Status != "success" { + return nil, fmt.Errorf("failed to query state from loki") + } + + if len(res.Data.Result) == 0 { + return nil, fmt.Errorf("empty result from loki") + } + + r := res.Data.Result[0] + it, err := strconv.ParseInt(r.Values[0][0], 10, 0) + if err != nil { + return nil, fmt.Errorf("failed to parse timestamp: %v", err) + } + + return &AlertStateResponse{ + State: AlertState(r.Stream.Current), + Timestamp: time.Unix(0, it), + }, nil +} diff --git a/pkg/tests/alertmanager/postgres.go b/pkg/tests/alertmanager/postgres.go new file mode 100644 index 00000000000..3d3fe4577d9 --- /dev/null +++ b/pkg/tests/alertmanager/postgres.go @@ -0,0 +1,41 @@ +package alertmanager + +import ( + "os" + + "github.com/grafana/e2e" +) + +const ( + defaultPostgresImage = "postgres:16.4" + postgresHTTPPort = 5432 +) + +// GetDefaultImage returns the Docker image to use to run the Postgres.. +func GetPostgresImage() string { + if img := os.Getenv("POSTGRES_IMAGE"); img != "" { + return img + } + + return defaultPostgresImage +} + +type PostgresService struct { + *e2e.HTTPService +} + +func NewPostgresService(name string, envVars map[string]string) *PostgresService { + svc := &PostgresService{ + HTTPService: e2e.NewHTTPService( + name, + GetPostgresImage(), + nil, + nil, + postgresHTTPPort, + ), + } + + svc.SetEnvVars(envVars) + + return svc +} diff --git a/pkg/tests/alertmanager/webhook.go b/pkg/tests/alertmanager/webhook.go new file mode 100644 index 00000000000..c90c3f229eb --- /dev/null +++ b/pkg/tests/alertmanager/webhook.go @@ -0,0 +1,85 @@ +package alertmanager + +import ( + "encoding/json" + "net/http" + "net/url" + "time" + + "github.com/grafana/e2e" +) + +const ( + defaultWebhookImage = "webhook-receiver" + webhookBinary = "/bin/main" + webhookHTTPPort = 8080 +) + +type WebhookService struct { + *e2e.HTTPService +} + +func NewWebhookService(name string, flags, envVars map[string]string) *WebhookService { + svc := &WebhookService{ + HTTPService: e2e.NewHTTPService( + name, + "webhook-receiver", + e2e.NewCommandWithoutEntrypoint(webhookBinary, e2e.BuildArgs(flags)...), + e2e.NewHTTPReadinessProbe(webhookHTTPPort, "/ready", 200, 299), + webhookHTTPPort), + } + + svc.SetEnvVars(envVars) + + return svc +} + +type WebhookClient struct { + c http.Client + u *url.URL +} + +func NewWebhookClient(u string) (*WebhookClient, error) { + pu, err := url.Parse(u) + if err != nil { + return nil, err + } + + return &WebhookClient{ + c: http.Client{}, + u: pu, + }, nil +} + +type GetNotificationsResponse struct { + Stats map[string]int `json:"stats"` + History []struct { + Status string `json:"status"` + TimeNow time.Time `json:"timeNow"` + StartsAt time.Time `json:"startsAt"` + Node string `json:"node"` + DeltaLastSeconds float64 `json:"deltaLastSeconds"` + DeltaStartSeconds float64 `json:"deltaStartSeconds"` + } `json:"history"` +} + +// GetNotifications fetches notifications from the webhook server +func (c *WebhookClient) GetNotifications() (*GetNotificationsResponse, error) { + u := c.u.ResolveReference(&url.URL{Path: "/notifications"}) + + resp, err := c.c.Get(u.String()) + if err != nil { + return nil, err + } + //nolint:errcheck + defer resp.Body.Close() + + res := GetNotificationsResponse{} + + err = json.NewDecoder(resp.Body).Decode(&res) + if err != nil { + return nil, err + } + + return &res, nil +} diff --git a/tools/setup_grafana_alertmanager_integration_test_images.go b/tools/setup_grafana_alertmanager_integration_test_images.go new file mode 100644 index 00000000000..3b6e1436446 --- /dev/null +++ b/tools/setup_grafana_alertmanager_integration_test_images.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + "sync" + + amtests "github.com/grafana/grafana/pkg/tests/alertmanager" +) + +func docker(args []string) { + cmd := exec.Command("docker", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + fmt.Printf("docker pull failed: %v\n", err) + os.Exit(1) + } +} + +func main() { + var wg sync.WaitGroup + + for _, cmd := range [][]string{ + {"pull", amtests.GetGrafanaImage()}, + {"pull", amtests.GetLokiImage()}, + {"pull", amtests.GetPostgresImage()}, + {"build", "-t", "webhook-receiver", "devenv/docker/blocks/stateful_webhook"}, + } { + wg.Add(1) + + go func(cmd []string) { + defer wg.Done() + + docker(cmd) + }(cmd) + } + + wg.Wait() +}