mirror of https://github.com/grafana/grafana
Alerting: Add alertmanager integration tests (#100106)
parent
6db155649c
commit
ae9837b793
@ -0,0 +1,12 @@ |
||||
FROM golang:1.23.5 |
||||
|
||||
ADD main.go /go/src/webhook/main.go |
||||
|
||||
WORKDIR /go/src/webhook |
||||
|
||||
RUN mkdir /tmp/logs |
||||
RUN go build -o /bin main.go |
||||
|
||||
ENV PORT=8080 |
||||
|
||||
ENTRYPOINT [ "/bin/main" ] |
@ -0,0 +1,5 @@ |
||||
stateful_webhook: |
||||
build: |
||||
context: docker/blocks/stateful_webhook |
||||
ports: |
||||
- "8080:8080" |
@ -0,0 +1,149 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"io" |
||||
"log" |
||||
"net/http" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
) |
||||
|
||||
type Event struct { |
||||
Status string `json:"status"` |
||||
TimeNow time.Time `json:"timeNow"` |
||||
StartsAt time.Time `json:"startsAt"` |
||||
Node string `json:"node"` |
||||
DeltaLastSeconds float64 `json:"deltaLastSeconds"` |
||||
DeltaStartSeconds float64 `json:"deltaStartSeconds"` |
||||
} |
||||
|
||||
type Notification struct { |
||||
Alerts []Alert `json:"alerts"` |
||||
CommonAnnotations map[string]string `json:"commonAnnotations"` |
||||
CommonLabels map[string]string `json:"commonLabels"` |
||||
ExternalURL string `json:"externalURL"` |
||||
GroupKey string `json:"groupKey"` |
||||
GroupLabels map[string]string `json:"groupLabels"` |
||||
Message string `json:"message"` |
||||
OrgID int `json:"orgId"` |
||||
Receiver string `json:"receiver"` |
||||
State string `json:"state"` |
||||
Status string `json:"status"` |
||||
Title string `json:"title"` |
||||
TruncatedAlerts int `json:"truncatedAlerts"` |
||||
Version string `json:"version"` |
||||
} |
||||
|
||||
type Alert struct { |
||||
Annotations map[string]string `json:"annotations"` |
||||
DashboardURL string `json:"dashboardURL"` |
||||
StartsAt time.Time `json:"startsAt"` |
||||
EndsAt time.Time `json:"endsAt"` |
||||
Fingerprint string `json:"fingerprint"` |
||||
GeneratorURL string `json:"generatorURL"` |
||||
Labels map[string]string `json:"labels"` |
||||
PanelURL string `json:"panelURL"` |
||||
SilenceURL string `json:"silenceURL"` |
||||
Status string `json:"status"` |
||||
ValueString string `json:"valueString"` |
||||
Values map[string]any `json:"values"` |
||||
} |
||||
|
||||
type NotificationHandler struct { |
||||
startedAt time.Time |
||||
stats map[string]int |
||||
hist []Event |
||||
m sync.Mutex |
||||
} |
||||
|
||||
func NewNotificationHandler() *NotificationHandler { |
||||
return &NotificationHandler{ |
||||
startedAt: time.Now(), |
||||
stats: make(map[string]int), |
||||
hist: make([]Event, 0), |
||||
} |
||||
} |
||||
|
||||
func (ah *NotificationHandler) Notify(w http.ResponseWriter, r *http.Request) { |
||||
b, err := io.ReadAll(r.Body) |
||||
if err != nil { |
||||
log.Println(err) |
||||
w.WriteHeader(http.StatusBadRequest) |
||||
return |
||||
} |
||||
n := Notification{} |
||||
if err := json.Unmarshal(b, &n); err != nil { |
||||
log.Println(err) |
||||
w.WriteHeader(http.StatusBadRequest) |
||||
return |
||||
} |
||||
log.Printf("got notification from: %s. a: %v", r.RemoteAddr, n) |
||||
|
||||
ah.m.Lock() |
||||
defer ah.m.Unlock() |
||||
|
||||
addr := r.RemoteAddr |
||||
if split := strings.Split(r.RemoteAddr, ":"); len(split) > 0 { |
||||
addr = split[0] |
||||
} |
||||
|
||||
a := n.Alerts[0] |
||||
|
||||
timeNow := time.Now() |
||||
|
||||
ah.stats[n.Status]++ |
||||
|
||||
var d time.Duration |
||||
if len(ah.hist) > 0 { |
||||
last := ah.hist[len(ah.hist)-1] |
||||
d = timeNow.Sub(last.TimeNow) |
||||
} |
||||
|
||||
ah.hist = append(ah.hist, Event{ |
||||
Status: n.Status, |
||||
StartsAt: a.StartsAt, |
||||
TimeNow: timeNow, |
||||
Node: addr, |
||||
DeltaLastSeconds: d.Seconds(), |
||||
DeltaStartSeconds: timeNow.Sub(ah.startedAt).Seconds(), |
||||
}) |
||||
} |
||||
|
||||
func (ah *NotificationHandler) GetNotifications(w http.ResponseWriter, _ *http.Request) { |
||||
ah.m.Lock() |
||||
defer ah.m.Unlock() |
||||
w.Header().Set("Content-Type", "application/json") |
||||
|
||||
res, err := json.MarshalIndent(map[string]any{"stats": ah.stats, "history": ah.hist}, "", "\t") |
||||
if err != nil { |
||||
w.WriteHeader(http.StatusInternalServerError) |
||||
//nolint:errcheck
|
||||
w.Write([]byte(`{"error":"failed to marshal alerts"}`)) |
||||
log.Printf("failed to marshal alerts: %v\n", err) |
||||
return |
||||
} |
||||
|
||||
log.Printf("requested current state\n%v\n", string(res)) |
||||
|
||||
_, err = w.Write(res) |
||||
if err != nil { |
||||
log.Printf("failed to write response: %v\n", err) |
||||
} |
||||
} |
||||
|
||||
func main() { |
||||
ah := NewNotificationHandler() |
||||
|
||||
http.HandleFunc("/ready", func(w http.ResponseWriter, _ *http.Request) { |
||||
w.WriteHeader(http.StatusOK) |
||||
}) |
||||
|
||||
http.HandleFunc("/notify", ah.Notify) |
||||
http.HandleFunc("/notifications", ah.GetNotifications) |
||||
|
||||
log.Println("Listening") |
||||
//nolint:errcheck
|
||||
http.ListenAndServe("0.0.0.0:8080", nil) |
||||
} |
@ -0,0 +1,386 @@ |
||||
package alertmanager |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
"os" |
||||
"strconv" |
||||
"strings" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/e2e" |
||||
gapi "github.com/grafana/grafana-api-golang-client" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
const ( |
||||
defaultNetworkName = "e2e-grafana-am" |
||||
) |
||||
|
||||
type AlertRuleConfig struct { |
||||
PendingPeriod string |
||||
GroupEvaluationIntervalSeconds int64 |
||||
} |
||||
|
||||
type NotificationPolicyCfg struct { |
||||
GroupWait string |
||||
GroupInterval string |
||||
RepeatInterval string |
||||
} |
||||
|
||||
type ProvisionCfg struct { |
||||
AlertRuleConfig |
||||
NotificationPolicyCfg |
||||
} |
||||
|
||||
// AlertmanagerScenario is a helper for writing tests which require some number of AM
|
||||
// configured to communicate with some number of Grafana instances.
|
||||
type AlertmanagerScenario struct { |
||||
*e2e.Scenario |
||||
|
||||
Grafanas map[string]*GrafanaService |
||||
Webhook *WebhookService |
||||
Postgres *PostgresService |
||||
Loki *LokiService |
||||
} |
||||
|
||||
func NewAlertmanagerScenario() (*AlertmanagerScenario, error) { |
||||
s, err := e2e.NewScenario(getNetworkName()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &AlertmanagerScenario{ |
||||
Scenario: s, |
||||
Grafanas: make(map[string]*GrafanaService), |
||||
}, nil |
||||
} |
||||
|
||||
// Setup starts a Grafana AM cluster of size n and all required dependencies
|
||||
func (s *AlertmanagerScenario) Start(t *testing.T, n int, peerTimeout string, stopOnExtraDedup bool) { |
||||
is := getInstances(n) |
||||
ips := mapInstancePeers(is) |
||||
|
||||
// start dependencies in one go
|
||||
require.NoError( |
||||
t, |
||||
s.StartAndWaitReady([]e2e.Service{ |
||||
s.NewWebhookService("webhook"), |
||||
s.NewLokiService("loki"), |
||||
s.NewPostgresService("postgres"), |
||||
}...), |
||||
) |
||||
|
||||
for i, ps := range ips { |
||||
require.NoError(t, s.StartAndWaitReady(s.NewGrafanaService(i, ps, peerTimeout, stopOnExtraDedup))) |
||||
} |
||||
|
||||
// wait for instances to come online and cluster to be properly configured
|
||||
time.Sleep(30 * time.Second) |
||||
} |
||||
|
||||
// Provision provisions all required resources for the test
|
||||
func (s *AlertmanagerScenario) Provision(t *testing.T, cfg ProvisionCfg) { //}*GrafanaClient {
|
||||
c, err := s.NewGrafanaClient("grafana-1", 1) |
||||
require.NoError(t, err) |
||||
|
||||
dsUID := "integration-testdata" |
||||
|
||||
// setup resources
|
||||
_, err = c.NewDataSource(&gapi.DataSource{ |
||||
Name: "grafana-testdata-datasource", |
||||
Type: "grafana-testdata-datasource", |
||||
Access: "proxy", |
||||
UID: dsUID, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
// setup loki for state history
|
||||
_, err = c.NewDataSource(&gapi.DataSource{ |
||||
Name: "loki", |
||||
Type: "loki", |
||||
URL: "http://loki:3100", |
||||
Access: "proxy", |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
_, err = c.NewContactPoint(&gapi.ContactPoint{ |
||||
Name: "webhook", |
||||
Type: "webhook", |
||||
Settings: map[string]any{ |
||||
"url": "http://webhook:8080/notify", |
||||
}, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
require.NoError(t, c.SetNotificationPolicyTree(&gapi.NotificationPolicyTree{ |
||||
Receiver: "webhook", |
||||
GroupWait: cfg.GroupWait, |
||||
GroupInterval: cfg.GroupInterval, |
||||
RepeatInterval: cfg.RepeatInterval, |
||||
})) |
||||
|
||||
f, err := c.NewFolder("integration_test") |
||||
require.NoError(t, err) |
||||
|
||||
r := &gapi.AlertRule{ |
||||
Title: "integration rule", |
||||
Condition: "C", |
||||
FolderUID: f.UID, |
||||
ExecErrState: gapi.ErrError, |
||||
NoDataState: gapi.NoData, |
||||
For: cfg.PendingPeriod, |
||||
RuleGroup: "test", |
||||
Data: []*gapi.AlertQuery{ |
||||
{ |
||||
RefID: "A", |
||||
RelativeTimeRange: gapi.RelativeTimeRange{ |
||||
From: 600, |
||||
To: 0, |
||||
}, |
||||
DatasourceUID: dsUID, |
||||
Model: json.RawMessage(fmt.Sprintf(`{ |
||||
"refId":"A", |
||||
"datasource": { |
||||
"type": "grafana-testdata-datasource", |
||||
"uid": "%s" |
||||
}, |
||||
"hide":false, |
||||
"range":false, |
||||
"instant":true, |
||||
"intervalMs":1000, |
||||
"maxDataPoints":43200, |
||||
"pulseWave": { |
||||
"offCount": 6, |
||||
"offValue": 0, |
||||
"onCount": 10, |
||||
"onValue": 10, |
||||
"timeStep": 10 |
||||
}, |
||||
"refId": "A", |
||||
"scenarioId": "predictable_pulse", |
||||
"seriesCount": 1 |
||||
}`, dsUID)), |
||||
}, |
||||
{ |
||||
RefID: "B", |
||||
RelativeTimeRange: gapi.RelativeTimeRange{ |
||||
From: 0, |
||||
To: 0, |
||||
}, |
||||
DatasourceUID: "__expr__", |
||||
Model: json.RawMessage(`{ |
||||
"conditions": [ |
||||
{ |
||||
"evaluator": { |
||||
"params": [ |
||||
0, |
||||
0 |
||||
], |
||||
"type": "gt" |
||||
}, |
||||
"operator": { |
||||
"type": "and" |
||||
}, |
||||
"query": { |
||||
"params": [] |
||||
}, |
||||
"reducer": { |
||||
"params": [], |
||||
"type": "avg" |
||||
}, |
||||
"type": "query" |
||||
} |
||||
], |
||||
"datasource": { |
||||
"name": "Expression", |
||||
"type": "__expr__", |
||||
"uid": "__expr__" |
||||
}, |
||||
"expression": "A", |
||||
"intervalMs": 1000, |
||||
"maxDataPoints": 43200, |
||||
"reducer": "last", |
||||
"refId": "B", |
||||
"type": "reduce" |
||||
}`), |
||||
}, |
||||
{ |
||||
RefID: "C", |
||||
RelativeTimeRange: gapi.RelativeTimeRange{ |
||||
From: 0, |
||||
To: 0, |
||||
}, |
||||
DatasourceUID: "__expr__", |
||||
Model: json.RawMessage(`{ |
||||
"conditions": [ |
||||
{ |
||||
"evaluator": { |
||||
"params": [ |
||||
0, |
||||
0 |
||||
], |
||||
"type": "gt" |
||||
}, |
||||
"operator": { |
||||
"type": "and" |
||||
}, |
||||
"query": { |
||||
"params": [ |
||||
"B" |
||||
] |
||||
}, |
||||
"reducer": { |
||||
"params": [], |
||||
"type": "last" |
||||
}, |
||||
"type": "query" |
||||
} |
||||
], |
||||
"datasource": { |
||||
"type": "__expr__", |
||||
"uid": "__expr__" |
||||
}, |
||||
"hide": false, |
||||
"isPaused": false, |
||||
"intervalMs": 1000, |
||||
"maxDataPoints": 43200, |
||||
"refId": "C", |
||||
"expression": "B", |
||||
"type": "threshold" |
||||
}`), |
||||
}, |
||||
}, |
||||
} |
||||
_, err = c.NewAlertRule(r) |
||||
require.NoError(t, err) |
||||
|
||||
require.NoError(t, c.SetAlertRuleGroup(gapi.RuleGroup{ |
||||
Title: "test", |
||||
FolderUID: f.UID, |
||||
Interval: cfg.GroupEvaluationIntervalSeconds, |
||||
Rules: []gapi.AlertRule{*r}, |
||||
})) |
||||
} |
||||
|
||||
// NewGrafanaService creates a new Grafana instance.
|
||||
func (s *AlertmanagerScenario) NewGrafanaService(name string, peers []string, peerTimeout string, stopOnExtraDedup bool) *GrafanaService { |
||||
flags := map[string]string{} |
||||
|
||||
ft := []string{ |
||||
"alertStateHistoryLokiSecondary", |
||||
"alertStateHistoryLokiPrimary", |
||||
"alertStateHistoryLokiOnly", |
||||
"alertingAlertmanagerExtraDedupStage", |
||||
} |
||||
if stopOnExtraDedup { |
||||
ft = append(ft, "alertingAlertmanagerExtraDedupStageStopPipeline") |
||||
} |
||||
envVars := map[string]string{ |
||||
//"GF_LOG_MODE": "file", // disable console logging
|
||||
"GF_LOG_LEVEL": "warn", |
||||
"GF_FEATURE_TOGGLES_ENABLE": strings.Join(ft, ","), |
||||
"GF_UNIFIED_ALERTING_ENABLED": "true", |
||||
"GF_UNIFIED_ALERTING_EXECUTE_ALERTS": "true", |
||||
"GF_UNIFIED_ALERTING_HA_PEER_TIMEOUT": peerTimeout, |
||||
"GF_UNIFIED_ALERTING_HA_RECONNECT_TIMEOUT": "2m", |
||||
"GF_UNIFIED_ALERTING_HA_LISTEN_ADDRESS": ":9094", |
||||
"GF_UNIFIED_ALERTING_HA_PEERS": strings.Join(peers, ","), |
||||
"GF_UNIFIED_ALERTING_STATE_HISTORY_ENABLED": "true", |
||||
"GF_UNIFIED_ALERTING_STATE_HISTORY_BACKEND": "loki", |
||||
"GF_UNIFIED_ALERTING_STATE_HISTORY_LOKI_REMOTE_URL": "http://loki:3100", |
||||
"GF_DATABASE_TYPE": "postgres", |
||||
"GF_DATABASE_HOST": "postgres:5432", |
||||
"GF_DATABASE_NAME": "grafana", |
||||
"GF_DATABASE_USER": "postgres", |
||||
"GF_DATABASE_PASSWORD": "password", |
||||
"GF_DATABASE_SSL_MODE": "disable", |
||||
} |
||||
|
||||
g := NewGrafanaService(name, flags, envVars) |
||||
|
||||
s.Grafanas[name] = g |
||||
return g |
||||
} |
||||
|
||||
// NewGrafanaService creates a new Grafana API client for the requested instance.
|
||||
func (s *AlertmanagerScenario) NewGrafanaClient(grafanaName string, orgID int64) (*GrafanaClient, error) { |
||||
g, ok := s.Grafanas[grafanaName] |
||||
if !ok { |
||||
return nil, fmt.Errorf("unknown grafana instance: %s", grafanaName) |
||||
} |
||||
|
||||
return NewGrafanaClient(g.HTTPEndpoint(), orgID) |
||||
} |
||||
|
||||
func (s *AlertmanagerScenario) NewWebhookClient() (*WebhookClient, error) { |
||||
return NewWebhookClient("http://" + s.Webhook.HTTPEndpoint()) |
||||
} |
||||
|
||||
func (s *AlertmanagerScenario) NewWebhookService(name string) *WebhookService { |
||||
ws := NewWebhookService(name, nil, nil) |
||||
s.Webhook = ws |
||||
|
||||
return ws |
||||
} |
||||
|
||||
func (s *AlertmanagerScenario) NewLokiService(name string) *LokiService { |
||||
ls := NewLokiService(name, map[string]string{"--config.file": "/etc/loki/local-config.yaml"}, nil) |
||||
s.Loki = ls |
||||
|
||||
return ls |
||||
} |
||||
|
||||
func (s *AlertmanagerScenario) NewPostgresService(name string) *PostgresService { |
||||
ps := NewPostgresService(name, map[string]string{"POSTGRES_PASSWORD": "password", "POSTGRES_DB": "grafana"}) |
||||
s.Postgres = ps |
||||
|
||||
return ps |
||||
} |
||||
|
||||
func (s *AlertmanagerScenario) NewLokiClient() (*LokiClient, error) { |
||||
return NewLokiClient("http://" + s.Loki.HTTPEndpoint()) |
||||
} |
||||
|
||||
func getNetworkName() string { |
||||
// If the E2E_NETWORK_NAME is set, use that for the network name.
|
||||
// Otherwise, return the default network name.
|
||||
if os.Getenv("E2E_NETWORK_NAME") != "" { |
||||
return os.Getenv("E2E_NETWORK_NAME") |
||||
} |
||||
|
||||
return defaultNetworkName |
||||
} |
||||
|
||||
func getInstances(n int) []string { |
||||
is := make([]string, n) |
||||
|
||||
for i := 0; i < n; i++ { |
||||
is[i] = "grafana-" + strconv.Itoa(i+1) |
||||
} |
||||
|
||||
return is |
||||
} |
||||
|
||||
func getPeers(i string, is []string) []string { |
||||
peers := make([]string, 0, len(is)-1) |
||||
|
||||
for _, p := range is { |
||||
if p != i { |
||||
peers = append(peers, p+":9094") |
||||
} |
||||
} |
||||
|
||||
return peers |
||||
} |
||||
|
||||
func mapInstancePeers(is []string) map[string][]string { |
||||
mIs := make(map[string][]string, len(is)) |
||||
|
||||
for _, i := range is { |
||||
mIs[i] = getPeers(i, is) |
||||
} |
||||
|
||||
return mIs |
||||
} |
@ -0,0 +1,97 @@ |
||||
package alertmanager |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestAlertmanagerIntegration_ExtraDedupStage(t *testing.T) { |
||||
if testing.Short() { |
||||
t.Skip("skipping integration test") |
||||
} |
||||
|
||||
t.Run("assert no flapping alerts when stopOnExtraDedup is enabled", func(t *testing.T) { |
||||
s, err := NewAlertmanagerScenario() |
||||
require.NoError(t, err) |
||||
defer s.Close() |
||||
|
||||
s.Start(t, 20, "15s", true) |
||||
s.Provision(t, ProvisionCfg{ |
||||
AlertRuleConfig: AlertRuleConfig{ |
||||
PendingPeriod: "30s", |
||||
GroupEvaluationIntervalSeconds: 10, |
||||
}, |
||||
NotificationPolicyCfg: NotificationPolicyCfg{ |
||||
GroupWait: "30s", |
||||
GroupInterval: "1m", |
||||
RepeatInterval: "30m", |
||||
}, |
||||
}) |
||||
|
||||
wc, err := s.NewWebhookClient() |
||||
require.NoError(t, err) |
||||
|
||||
lc, err := s.NewLokiClient() |
||||
require.NoError(t, err) |
||||
|
||||
// notifications only start arriving after 2 to 3 minutes so we wait for that
|
||||
time.Sleep(time.Minute * 2) |
||||
|
||||
timeout := time.After(5 * time.Minute) |
||||
ticker := time.NewTicker(5 * time.Second) |
||||
defer ticker.Stop() |
||||
|
||||
for { |
||||
select { |
||||
case <-ticker.C: |
||||
nr, err := wc.GetNotifications() |
||||
if err != nil { |
||||
t.Logf("failed to get alert notifications: %v\n", err) |
||||
continue |
||||
} |
||||
|
||||
// get the latest state for the alert from loki
|
||||
st, err := lc.GetCurrentAlertState() |
||||
if err != nil { |
||||
t.Logf("failed to get alert state: %v\n", err) |
||||
continue |
||||
} |
||||
|
||||
// if the last state is not normal, ignore
|
||||
// we might be missing other cases of flapping notifications but for now we are only interested in this one
|
||||
// (alerting notification when state is already normal)
|
||||
if st.State != AlertStateNormal { |
||||
continue |
||||
} |
||||
|
||||
// history is ordered - fetch the first notification that is after the last state change
|
||||
var i int |
||||
for i = range nr.History { |
||||
if nr.History[i].TimeNow.After(st.Timestamp) { |
||||
break |
||||
} |
||||
} |
||||
|
||||
// if all notifications are from before the last state change, we can wait a bit more
|
||||
if nr.History[i].TimeNow.Before(st.Timestamp) { |
||||
continue |
||||
} |
||||
|
||||
// for all notifications after the last state change, check if there is a firing one
|
||||
for ; i < len(nr.History); i++ { |
||||
notification := nr.History[i] |
||||
if notification.Status == "firing" { |
||||
t.Errorf("flapping notifications - got firing notification when alert was resolved, state = %#v, notification = %#v", st, notification) |
||||
t.FailNow() |
||||
} |
||||
} |
||||
|
||||
case <-timeout: |
||||
// if after the timeout there are no such cases, we assume there are no flapping notifications
|
||||
return |
||||
} |
||||
} |
||||
}) |
||||
} |
@ -0,0 +1,75 @@ |
||||
package alertmanager |
||||
|
||||
import ( |
||||
_ "embed" |
||||
"fmt" |
||||
"net/url" |
||||
"os" |
||||
|
||||
"github.com/grafana/e2e" |
||||
gapi "github.com/grafana/grafana-api-golang-client" |
||||
) |
||||
|
||||
const ( |
||||
grafanaBinary = "/run.sh" |
||||
grafanaHTTPPort = 3000 |
||||
) |
||||
|
||||
// GetDefaultImage returns the Docker image to use to run the Grafana..
|
||||
func GetGrafanaImage() string { |
||||
if img := os.Getenv("GRAFANA_IMAGE"); img != "" { |
||||
return img |
||||
} |
||||
|
||||
if version := os.Getenv("GRAFANA_VERSION"); version != "" { |
||||
return "grafana/grafana-enterprise-dev:" + version |
||||
} |
||||
|
||||
panic("Provide GRAFANA_VERSION or GRAFANA_IMAGE") |
||||
} |
||||
|
||||
type GrafanaService struct { |
||||
*e2e.HTTPService |
||||
} |
||||
|
||||
func NewGrafanaService(name string, flags, envVars map[string]string) *GrafanaService { |
||||
svc := &GrafanaService{ |
||||
HTTPService: e2e.NewHTTPService( |
||||
name, |
||||
GetGrafanaImage(), |
||||
e2e.NewCommandWithoutEntrypoint(grafanaBinary, e2e.BuildArgs(flags)...), |
||||
e2e.NewHTTPReadinessProbe(grafanaHTTPPort, "/ready", 200, 299), |
||||
grafanaHTTPPort, |
||||
9094, |
||||
), |
||||
} |
||||
|
||||
svc.SetEnvVars(envVars) |
||||
|
||||
return svc |
||||
} |
||||
|
||||
type GrafanaClient struct { |
||||
*gapi.Client |
||||
} |
||||
|
||||
// NewGrafanaClient creates a client for using the Grafana API. Note we don't bother
|
||||
// wrapping the client library, and just use it as-is, until we find a reason not to.
|
||||
func NewGrafanaClient(host string, orgID int64) (*GrafanaClient, error) { |
||||
cfg := gapi.Config{ |
||||
BasicAuth: url.UserPassword("admin", "admin"), |
||||
OrgID: orgID, |
||||
HTTPHeaders: map[string]string{ |
||||
"X-Disable-Provenance": "true", |
||||
}, |
||||
} |
||||
|
||||
client, err := gapi.New(fmt.Sprintf("http://%s/", host), cfg) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &GrafanaClient{ |
||||
Client: client, |
||||
}, nil |
||||
} |
@ -0,0 +1,152 @@ |
||||
package alertmanager |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
"net/http" |
||||
"net/url" |
||||
"os" |
||||
"strconv" |
||||
"time" |
||||
|
||||
"github.com/grafana/e2e" |
||||
) |
||||
|
||||
const ( |
||||
defaultLokiImage = "grafana/loki:latest" |
||||
lokiBinary = "/usr/bin/loki" |
||||
lokiHTTPPort = 3100 |
||||
) |
||||
|
||||
// GetDefaultImage returns the Docker image to use to run the Loki..
|
||||
func GetLokiImage() string { |
||||
if img := os.Getenv("LOKI_IMAGE"); img != "" { |
||||
return img |
||||
} |
||||
|
||||
return defaultLokiImage |
||||
} |
||||
|
||||
type LokiService struct { |
||||
*e2e.HTTPService |
||||
} |
||||
|
||||
func NewLokiService(name string, flags, envVars map[string]string) *LokiService { |
||||
svc := &LokiService{ |
||||
HTTPService: e2e.NewHTTPService( |
||||
name, |
||||
GetLokiImage(), |
||||
e2e.NewCommandWithoutEntrypoint(lokiBinary, e2e.BuildArgs(flags)...), |
||||
e2e.NewHTTPReadinessProbe(lokiHTTPPort, "/ready", 200, 299), |
||||
lokiHTTPPort, |
||||
), |
||||
} |
||||
|
||||
svc.SetEnvVars(envVars) |
||||
|
||||
return svc |
||||
} |
||||
|
||||
type LokiClient struct { |
||||
c http.Client |
||||
u *url.URL |
||||
} |
||||
|
||||
func NewLokiClient(u string) (*LokiClient, error) { |
||||
pu, err := url.Parse(u) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &LokiClient{ |
||||
c: http.Client{}, |
||||
u: pu, |
||||
}, nil |
||||
} |
||||
|
||||
type LokiQueryResponse struct { |
||||
Status string `json:"status"` |
||||
Data struct { |
||||
ResultType string `json:"resultType"` |
||||
Result []struct { |
||||
Stream struct { |
||||
Condition string `json:"condition"` |
||||
Current string `json:"current"` |
||||
DashboardUID string `json:"dashboardUID"` |
||||
Fingerprint string `json:"fingerprint"` |
||||
FolderUID string `json:"folderUID"` |
||||
From string `json:"from"` |
||||
Group string `json:"group"` |
||||
LabelsAlertname string `json:"labels_alertname"` |
||||
LabelsGrafanaFolder string `json:"labels_grafana_folder"` |
||||
OrgID string `json:"orgID"` |
||||
PanelID string `json:"panelID"` |
||||
Previous string `json:"previous"` |
||||
RuleID string `json:"ruleID"` |
||||
RuleTitle string `json:"ruleTitle"` |
||||
RuleUID string `json:"ruleUID"` |
||||
SchemaVersion string `json:"schemaVersion"` |
||||
ServiceName string `json:"service_name"` |
||||
ValuesB string `json:"values_B"` |
||||
ValuesC string `json:"values_C"` |
||||
} `json:"stream"` |
||||
Values [][]string `json:"values"` |
||||
} `json:"result"` |
||||
} |
||||
} |
||||
|
||||
type AlertState string |
||||
|
||||
const ( |
||||
AlertStateNormal AlertState = "Normal" |
||||
AlertStatePending AlertState = "Pending" |
||||
AlertStateAlerting AlertState = "Alerting" |
||||
) |
||||
|
||||
type AlertStateResponse struct { |
||||
State AlertState |
||||
Timestamp time.Time |
||||
} |
||||
|
||||
// GetCurrentAlertState fetches the current alert state from loki
|
||||
func (c *LokiClient) GetCurrentAlertState() (*AlertStateResponse, error) { |
||||
u := c.u.ResolveReference(&url.URL{Path: "/loki/api/v1/query_range"}) |
||||
|
||||
vs := url.Values{} |
||||
vs.Add("query", `{from="state-history"} | json`) |
||||
vs.Add("since", "60s") |
||||
|
||||
u.RawQuery = vs.Encode() |
||||
|
||||
resp, err := c.c.Get(u.String()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
//nolint:errcheck
|
||||
defer resp.Body.Close() |
||||
|
||||
res := LokiQueryResponse{} |
||||
|
||||
if err = json.NewDecoder(resp.Body).Decode(&res); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if res.Status != "success" { |
||||
return nil, fmt.Errorf("failed to query state from loki") |
||||
} |
||||
|
||||
if len(res.Data.Result) == 0 { |
||||
return nil, fmt.Errorf("empty result from loki") |
||||
} |
||||
|
||||
r := res.Data.Result[0] |
||||
it, err := strconv.ParseInt(r.Values[0][0], 10, 0) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to parse timestamp: %v", err) |
||||
} |
||||
|
||||
return &AlertStateResponse{ |
||||
State: AlertState(r.Stream.Current), |
||||
Timestamp: time.Unix(0, it), |
||||
}, nil |
||||
} |
@ -0,0 +1,41 @@ |
||||
package alertmanager |
||||
|
||||
import ( |
||||
"os" |
||||
|
||||
"github.com/grafana/e2e" |
||||
) |
||||
|
||||
const ( |
||||
defaultPostgresImage = "postgres:16.4" |
||||
postgresHTTPPort = 5432 |
||||
) |
||||
|
||||
// GetDefaultImage returns the Docker image to use to run the Postgres..
|
||||
func GetPostgresImage() string { |
||||
if img := os.Getenv("POSTGRES_IMAGE"); img != "" { |
||||
return img |
||||
} |
||||
|
||||
return defaultPostgresImage |
||||
} |
||||
|
||||
type PostgresService struct { |
||||
*e2e.HTTPService |
||||
} |
||||
|
||||
func NewPostgresService(name string, envVars map[string]string) *PostgresService { |
||||
svc := &PostgresService{ |
||||
HTTPService: e2e.NewHTTPService( |
||||
name, |
||||
GetPostgresImage(), |
||||
nil, |
||||
nil, |
||||
postgresHTTPPort, |
||||
), |
||||
} |
||||
|
||||
svc.SetEnvVars(envVars) |
||||
|
||||
return svc |
||||
} |
@ -0,0 +1,85 @@ |
||||
package alertmanager |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"net/http" |
||||
"net/url" |
||||
"time" |
||||
|
||||
"github.com/grafana/e2e" |
||||
) |
||||
|
||||
const ( |
||||
defaultWebhookImage = "webhook-receiver" |
||||
webhookBinary = "/bin/main" |
||||
webhookHTTPPort = 8080 |
||||
) |
||||
|
||||
type WebhookService struct { |
||||
*e2e.HTTPService |
||||
} |
||||
|
||||
func NewWebhookService(name string, flags, envVars map[string]string) *WebhookService { |
||||
svc := &WebhookService{ |
||||
HTTPService: e2e.NewHTTPService( |
||||
name, |
||||
"webhook-receiver", |
||||
e2e.NewCommandWithoutEntrypoint(webhookBinary, e2e.BuildArgs(flags)...), |
||||
e2e.NewHTTPReadinessProbe(webhookHTTPPort, "/ready", 200, 299), |
||||
webhookHTTPPort), |
||||
} |
||||
|
||||
svc.SetEnvVars(envVars) |
||||
|
||||
return svc |
||||
} |
||||
|
||||
type WebhookClient struct { |
||||
c http.Client |
||||
u *url.URL |
||||
} |
||||
|
||||
func NewWebhookClient(u string) (*WebhookClient, error) { |
||||
pu, err := url.Parse(u) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &WebhookClient{ |
||||
c: http.Client{}, |
||||
u: pu, |
||||
}, nil |
||||
} |
||||
|
||||
type GetNotificationsResponse struct { |
||||
Stats map[string]int `json:"stats"` |
||||
History []struct { |
||||
Status string `json:"status"` |
||||
TimeNow time.Time `json:"timeNow"` |
||||
StartsAt time.Time `json:"startsAt"` |
||||
Node string `json:"node"` |
||||
DeltaLastSeconds float64 `json:"deltaLastSeconds"` |
||||
DeltaStartSeconds float64 `json:"deltaStartSeconds"` |
||||
} `json:"history"` |
||||
} |
||||
|
||||
// GetNotifications fetches notifications from the webhook server
|
||||
func (c *WebhookClient) GetNotifications() (*GetNotificationsResponse, error) { |
||||
u := c.u.ResolveReference(&url.URL{Path: "/notifications"}) |
||||
|
||||
resp, err := c.c.Get(u.String()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
//nolint:errcheck
|
||||
defer resp.Body.Close() |
||||
|
||||
res := GetNotificationsResponse{} |
||||
|
||||
err = json.NewDecoder(resp.Body).Decode(&res) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &res, nil |
||||
} |
@ -0,0 +1,41 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"fmt" |
||||
"os" |
||||
"os/exec" |
||||
"sync" |
||||
|
||||
amtests "github.com/grafana/grafana/pkg/tests/alertmanager" |
||||
) |
||||
|
||||
func docker(args []string) { |
||||
cmd := exec.Command("docker", args...) |
||||
cmd.Stdout = os.Stdout |
||||
cmd.Stderr = os.Stderr |
||||
if err := cmd.Run(); err != nil { |
||||
fmt.Printf("docker pull failed: %v\n", err) |
||||
os.Exit(1) |
||||
} |
||||
} |
||||
|
||||
func main() { |
||||
var wg sync.WaitGroup |
||||
|
||||
for _, cmd := range [][]string{ |
||||
{"pull", amtests.GetGrafanaImage()}, |
||||
{"pull", amtests.GetLokiImage()}, |
||||
{"pull", amtests.GetPostgresImage()}, |
||||
{"build", "-t", "webhook-receiver", "devenv/docker/blocks/stateful_webhook"}, |
||||
} { |
||||
wg.Add(1) |
||||
|
||||
go func(cmd []string) { |
||||
defer wg.Done() |
||||
|
||||
docker(cmd) |
||||
}(cmd) |
||||
} |
||||
|
||||
wg.Wait() |
||||
} |
Loading…
Reference in new issue