diff --git a/conf/defaults.ini b/conf/defaults.ini index 597a74b88de..ccd2ba2d73a 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -1560,6 +1560,30 @@ max_age = # Configures max number of alert annotations that Grafana stores. Default value is 0, which keeps all alert annotations. max_annotations_to_keep = +[unified_alerting.notification_history] +# Enable the notification history functionality in Unified Alerting. +# Alertmanager notification logs will be stored in Loki. +enabled = false + +# URL of the Loki instance. +loki_remote_url = + +# Optional tenant ID to attach to requests sent to Loki. +loki_tenant_id = + +# Optional username for basic authentication on requests sent to Loki. Can be left blank to disable basic auth. +loki_basic_auth_username = + +# Optional password for basic authentication on requests sent to Loki. Can be left blank. +loki_basic_auth_password = + +[unified_alerting.notification_history.external_labels] +# Optional extra labels to attach to outbound notification history records or log streams. +# Any number of label key-value pairs can be provided. +# +# ex. +# mylabelkey = mylabelvalue + [unified_alerting.prometheus_conversion] # Configuration options for converting Prometheus alerting and recording rules to Grafana rules. # These settings affect rules created via the Prometheus conversion API. diff --git a/conf/sample.ini b/conf/sample.ini index 8835829ddb2..238162d2590 100644 --- a/conf/sample.ini +++ b/conf/sample.ini @@ -1535,6 +1535,28 @@ max_age = # Configures max number of alert annotations that Grafana stores. Default value is 0, which keeps all alert annotations. max_annotations_to_keep = +[unified_alerting.notification_history] +# Enable the notification history functionality in Unified Alerting. +# Alertmanager notification logs will be stored in Loki. +; enabled = false + +# URL of the Loki instance. +; loki_remote_url = + +# Optional tenant ID to attach to requests sent to Loki. +; loki_tenant_id = + +# Optional username for basic authentication on requests sent to Loki. Can be left blank to disable basic auth. +; loki_basic_auth_username = + +# Optional password for basic authentication on requests sent to Loki. Can be left blank. +; loki_basic_auth_password = + +[unified_alerting.notification_history.external_labels] +# Optional extra labels to attach to outbound notification history records or log streams. +# Any number of label key-value pairs can be provided. +; mylabelkey = mylabelvalue + [unified_alerting.prometheus_conversion] # Configuration options for converting Prometheus alerting and recording rules to Grafana rules. # These settings affect rules created via the Prometheus conversion API.
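Taken together, a filled-in configuration enabling the feature might look like the following sketch. The URL, tenant, and label values are illustrative, not shipped defaults, and the alertingNotificationHistory feature toggle must be on as well:

[feature_toggles]
alertingNotificationHistory = true

[unified_alerting.notification_history]
enabled = true
loki_remote_url = http://localhost:3100
loki_tenant_id = tenant1

[unified_alerting.notification_history.external_labels]
cluster = eu-west-prod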
diff --git a/go.mod b/go.mod index 2e3ce1071f5..b922613a0a8 100644 --- a/go.mod +++ b/go.mod @@ -85,7 +85,7 @@ require ( github.com/googleapis/gax-go/v2 v2.14.2 // @grafana/grafana-backend-group github.com/gorilla/mux v1.8.1 // @grafana/grafana-backend-group github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // @grafana/grafana-app-platform-squad - github.com/grafana/alerting v0.0.0-20250711181610-8eef376f49f8 // @grafana/alerting-backend + github.com/grafana/alerting v0.0.0-20250716142237-8308539caa27 // @grafana/alerting-backend github.com/grafana/authlib v0.0.0-20250710201142-9542f2f28d43 // @grafana/identity-access-team github.com/grafana/authlib/types v0.0.0-20250710201142-9542f2f28d43 // @grafana/identity-access-team github.com/grafana/dataplane/examples v0.0.1 // @grafana/observability-metrics diff --git a/go.sum b/go.sum index 2630af0674d..f13cfc2dc32 100644 --- a/go.sum +++ b/go.sum @@ -1573,8 +1573,8 @@ github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7Fsg github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= -github.com/grafana/alerting v0.0.0-20250711181610-8eef376f49f8 h1:XUfln7L8Lz1E+gWU3Zz9+H+qIIqsBEls57vZmokoQog= -github.com/grafana/alerting v0.0.0-20250711181610-8eef376f49f8/go.mod h1:gtR7agmxVfJOmNKV/n2ZULgOYTYNL+PDKYB5N48tQ7Q= +github.com/grafana/alerting v0.0.0-20250716142237-8308539caa27 h1:buoRKSKUO2QGJ16j2mY8T8S0KTjwB9uLtBrNvbYjkmM= +github.com/grafana/alerting v0.0.0-20250716142237-8308539caa27/go.mod h1:gtR7agmxVfJOmNKV/n2ZULgOYTYNL+PDKYB5N48tQ7Q= github.com/grafana/authlib v0.0.0-20250710201142-9542f2f28d43 h1:vVPT0i5Y1vI6qzecYStV2yk7cHKrC3Pc7AgvwT5KydQ= github.com/grafana/authlib v0.0.0-20250710201142-9542f2f28d43/go.mod h1:1fWkOiL+m32NBgRHZtlZGz2ji868tPZACYbqP3nBRJI= github.com/grafana/authlib/types v0.0.0-20250710201142-9542f2f28d43 h1:NlkGMnVi/oUn6Cr90QbJYpQJ4FnjyAIG9Ex5GtTZIzw= diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index da95cb93db2..e99d737d882 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -1045,4 +1045,9 @@ export interface FeatureToggles { * Applies OTel formatting templates to displayed logs */ otelLogsFormatting?: boolean; + /** + * Enables the notification history feature + * @default false + */ + alertingNotificationHistory?: boolean; } diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store.go b/pkg/services/annotations/annotationsimpl/loki/historian_store.go index 8ab1095cdec..59ed1494c41 100644 --- a/pkg/services/annotations/annotationsimpl/loki/historian_store.go +++ b/pkg/services/annotations/annotationsimpl/loki/historian_store.go @@ -8,6 +8,7 @@ import ( "sort" "time" + "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" "golang.org/x/exp/constraints" "github.com/grafana/grafana/pkg/components/simplejson" @@ -44,7 +45,7 @@ type RuleStore interface { } type lokiQueryClient interface { - RangeQuery(ctx context.Context, query string, start, end, limit int64) (historian.QueryRes, error) + RangeQuery(ctx context.Context, query string, start, end, limit int64) (lokiclient.QueryRes, error) MaxQuerySize() int } @@ -60,14 +61,15 @@ func 
NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, db d if !useStore(cfg) { return nil } - lokiCfg, err := historian.NewLokiConfig(cfg) + lokiCfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings) if err != nil { // this config error is already handled elsewhere return nil } + metrics := ngmetrics.NewHistorianMetrics(reg, subsystem) return &LokiHistorianStore{ - client: historian.NewLokiClient(lokiCfg, historian.NewRequester(), ngmetrics.NewHistorianMetrics(reg, subsystem), log, tracer), + client: lokiclient.NewLokiClient(lokiCfg, lokiclient.NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log, tracer, historian.LokiClientSpanName), db: db, log: log, ruleStore: ruleStore, @@ -142,7 +144,7 @@ func (r *LokiHistorianStore) Get(ctx context.Context, query annotations.ItemQuer return items, err } -func (r *LokiHistorianStore) annotationsFromStream(stream historian.Stream, ac accesscontrol.AccessResources) []*annotations.ItemDTO { +func (r *LokiHistorianStore) annotationsFromStream(stream lokiclient.Stream, ac accesscontrol.AccessResources) []*annotations.ItemDTO { items := make([]*annotations.ItemDTO, 0, len(stream.Values)) for _, sample := range stream.Values { entry := historian.LokiEntry{} diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go index cf6c7af51e3..12ce9254cf1 100644 --- a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go +++ b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "golang.org/x/exp/maps" @@ -84,7 +85,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { t.Run("can query history by alert id", func(t *testing.T) { rule := dashboardRules[dashboard1.UID][0] - fakeLokiClient.rangeQueryRes = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []lokiclient.Stream{ historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()), } @@ -111,7 +112,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { t.Run("can query history by alert uid", func(t *testing.T) { rule := dashboardRules[dashboard1.UID][0] - fakeLokiClient.rangeQueryRes = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []lokiclient.Stream{ historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()), } @@ -186,7 +187,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("can query history by dashboard id", func(t *testing.T) { - fakeLokiClient.rangeQueryRes = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []lokiclient.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -212,7 +213,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("should return empty results when type is annotation", func(t *testing.T) { - fakeLokiClient.rangeQueryRes = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []lokiclient.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), 
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -236,7 +237,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("should return empty results when history is outside time range", func(t *testing.T) { - fakeLokiClient.rangeQueryRes = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []lokiclient.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -262,7 +263,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("should return partial results when history is partly outside clamped time range", func(t *testing.T) { - fakeLokiClient.rangeQueryRes = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []lokiclient.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -295,7 +296,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("should sort history by time and be able to query by dashboard uid", func(t *testing.T) { - fakeLokiClient.rangeQueryRes = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []lokiclient.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -329,7 +330,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("should return nothing if query is for tags only", func(t *testing.T) { - fakeLokiClient.rangeQueryRes = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []lokiclient.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -360,13 +361,13 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { store := createTestLokiStore(t, sql, fakeLokiClient) t.Run("should return empty list when no streams", func(t *testing.T) { - items := store.annotationsFromStream(historian.Stream{}, annotation_ac.AccessResources{}) + items := store.annotationsFromStream(lokiclient.Stream{}, annotation_ac.AccessResources{}) require.Empty(t, items) }) t.Run("should return empty list when no entries", func(t *testing.T) { - items := store.annotationsFromStream(historian.Stream{ - Values: []historian.Sample{}, + items := store.annotationsFromStream(lokiclient.Stream{ + Values: []lokiclient.Sample{}, }, annotation_ac.AccessResources{}) require.Empty(t, items) }) @@ -419,7 +420,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { rule = createAlertRule(t, sql, "Test rule", gen) stream2 := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()) - stream := historian.Stream{ + stream := lokiclient.Stream{ Values: append(stream1.Values, stream2.Values...), Stream: stream1.Stream, } @@ -450,7 +451,7 @@ func TestIntegrationAlertStateHistoryStore(t 
*testing.T) { rule.DashboardUID = nil stream2 := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()) - stream := historian.Stream{ + stream := lokiclient.Stream{ Values: append(stream1.Values, stream2.Values...), Stream: stream1.Stream, } @@ -811,23 +812,23 @@ func compareAnnotationItem(t *testing.T, expected, actual *annotations.ItemDTO) type FakeLokiClient struct { client client.Requester - cfg historian.LokiConfig + cfg lokiclient.LokiConfig metrics *metrics.Historian log log.Logger - rangeQueryRes []historian.Stream + rangeQueryRes []lokiclient.Stream } func NewFakeLokiClient() *FakeLokiClient { url, _ := url.Parse("http://some.url") - req := historian.NewFakeRequester() + req := lokiclient.NewFakeRequester() metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), "annotations_test") return &FakeLokiClient{ client: client.NewTimedClient(req, metrics.WriteDuration), - cfg: historian.LokiConfig{ + cfg: lokiclient.LokiConfig{ WritePathURL: url, ReadPathURL: url, - Encoder: historian.JsonEncoder{}, + Encoder: lokiclient.JsonEncoder{}, MaxQueryLength: 721 * time.Hour, MaxQuerySize: 65536, }, @@ -836,15 +837,15 @@ func NewFakeLokiClient() *FakeLokiClient { } } -func (c *FakeLokiClient) RangeQuery(ctx context.Context, query string, from, to, limit int64) (historian.QueryRes, error) { - streams := make([]historian.Stream, len(c.rangeQueryRes)) +func (c *FakeLokiClient) RangeQuery(ctx context.Context, query string, from, to, limit int64) (lokiclient.QueryRes, error) { + streams := make([]lokiclient.Stream, len(c.rangeQueryRes)) // clamp time range using logic from historian - from, to = historian.ClampRange(from, to, c.cfg.MaxQueryLength.Nanoseconds()) + from, to = lokiclient.ClampRange(from, to, c.cfg.MaxQueryLength.Nanoseconds()) for n, stream := range c.rangeQueryRes { streams[n].Stream = stream.Stream - streams[n].Values = []historian.Sample{} + streams[n].Values = []lokiclient.Sample{} for _, sample := range stream.Values { if sample.T.UnixNano() < from || sample.T.UnixNano() >= to { // matches Loki behavior continue @@ -853,14 +854,14 @@ func (c *FakeLokiClient) RangeQuery(ctx context.Context, query string, from, to, } } - res := historian.QueryRes{ - Data: historian.QueryData{ + res := lokiclient.QueryRes{ + Data: lokiclient.QueryData{ Result: streams, }, } // reset expected streams on read - c.rangeQueryRes = []historian.Stream{} + c.rangeQueryRes = []lokiclient.Stream{} return res, nil } diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index ed2d1ad9c8b..8dcf7371de1 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -1800,6 +1800,15 @@ var ( FrontendOnly: true, Owner: grafanaObservabilityLogsSquad, }, + { + Name: "alertingNotificationHistory", + Description: "Enables the notification history feature", + Stage: FeatureStageExperimental, + Owner: grafanaAlertingSquad, + HideFromAdminPage: true, + HideFromDocs: true, + Expression: "false", + }, } ) diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index df7f2b9dd58..571010580d7 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -233,3 +233,4 @@ enableAppChromeExtensions,experimental,@grafana/plugins-platform-backend,false,f foldersAppPlatformAPI,experimental,@grafana/grafana-search-navigate-organise,false,false,true 
enablePluginImporter,experimental,@grafana/plugins-platform-backend,false,false,true otelLogsFormatting,experimental,@grafana/observability-logs,false,false,true +alertingNotificationHistory,experimental,@grafana/alerting-squad,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 67f5c3e0669..0ce86c85bdb 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -942,4 +942,8 @@ const ( // FlagOtelLogsFormatting // Applies OTel formatting templates to displayed logs FlagOtelLogsFormatting = "otelLogsFormatting" + + // FlagAlertingNotificationHistory + // Enables the notification history feature + FlagAlertingNotificationHistory = "alertingNotificationHistory" ) diff --git a/pkg/services/featuremgmt/toggles_gen.json b/pkg/services/featuremgmt/toggles_gen.json index c545d85e6ee..076d491c7b1 100644 --- a/pkg/services/featuremgmt/toggles_gen.json +++ b/pkg/services/featuremgmt/toggles_gen.json @@ -294,6 +294,21 @@ "expression": "true" } }, + { + "metadata": { + "name": "alertingNotificationHistory", + "resourceVersion": "1752682072771", + "creationTimestamp": "2025-07-16T16:07:52Z" + }, + "spec": { + "description": "Enables the notification history feature", + "stage": "experimental", + "codeowner": "@grafana/alerting-squad", + "hideFromAdminPage": true, + "hideFromDocs": true, + "expression": "false" + } + }, { "metadata": { "name": "alertingNotificationsStepMode", diff --git a/pkg/services/ngalert/api/api_alertmanager_test.go b/pkg/services/ngalert/api/api_alertmanager_test.go index ab202caa75d..bd253f5a685 100644 --- a/pkg/services/ngalert/api/api_alertmanager_test.go +++ b/pkg/services/ngalert/api/api_alertmanager_test.go @@ -611,6 +611,7 @@ func createMultiOrgAlertmanager(t *testing.T, configs map[int64]*ngmodels.AlertC log.New("testlogger"), secretsService, featuremgmt.WithManager(), + nil, ) require.NoError(t, err) err = mam.LoadAndSyncAlertmanagersForOrgs(context.Background()) diff --git a/pkg/services/ngalert/state/historian/loki_http.go b/pkg/services/ngalert/lokiclient/client.go similarity index 90% rename from pkg/services/ngalert/state/historian/loki_http.go rename to pkg/services/ngalert/lokiclient/client.go index c925dc361c5..0b1fca30daf 100644 --- a/pkg/services/ngalert/state/historian/loki_http.go +++ b/pkg/services/ngalert/lokiclient/client.go @@ -1,4 +1,4 @@ -package historian +package lokiclient import ( "bytes" @@ -11,11 +11,12 @@ import ( "strconv" "time" + "github.com/grafana/dskit/instrument" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/ngalert/client" - "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/setting" + "github.com/prometheus/client_golang/prometheus" ) const defaultPageSize = 1000 @@ -45,7 +46,7 @@ type LokiConfig struct { MaxQuerySize int } -func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, error) { +func NewLokiConfig(cfg setting.UnifiedAlertingLokiSettings) (LokiConfig, error) { read, write := cfg.LokiReadURL, cfg.LokiWriteURL if read == "" { read = cfg.LokiRemoteURL @@ -85,11 +86,11 @@ func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, } type HttpLokiClient struct { - client client.Requester - encoder encoder - cfg LokiConfig - metrics *metrics.Historian - log log.Logger + client client.Requester + encoder encoder + cfg LokiConfig + bytesWritten prometheus.Counter 
+ log log.Logger } // Kind of Operation (=, !=, =~, !~) @@ -106,15 +107,15 @@ const ( NeqRegEx Operator = "!~" ) -func NewLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Historian, logger log.Logger, tracer tracing.Tracer) *HttpLokiClient { - tc := client.NewTimedClient(req, metrics.WriteDuration) - trc := client.NewTracedClient(tc, tracer, "ngalert.historian.client") +func NewLokiClient(cfg LokiConfig, req client.Requester, bytesWritten prometheus.Counter, writeDuration *instrument.HistogramCollector, logger log.Logger, tracer tracing.Tracer, spanName string) *HttpLokiClient { + tc := client.NewTimedClient(req, writeDuration) + trc := client.NewTracedClient(tc, tracer, spanName) return &HttpLokiClient{ - client: trc, - encoder: cfg.Encoder, - cfg: cfg, - metrics: metrics, - log: logger.New("protocol", "http"), + client: trc, + encoder: cfg.Encoder, + cfg: cfg, + bytesWritten: bytesWritten, + log: logger.New("protocol", "http"), } } @@ -198,7 +199,7 @@ func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error { req.Header.Add(k, v) } - c.metrics.BytesWritten.Add(float64(len(enc))) + c.bytesWritten.Add(float64(len(enc))) req = req.WithContext(ctx) resp, err := c.client.Do(req) if err != nil { diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/lokiclient/client_test.go similarity index 88% rename from pkg/services/ngalert/state/historian/loki_http_test.go rename to pkg/services/ngalert/lokiclient/client_test.go index 0c81eca1ee4..5568ec1601c 100644 --- a/pkg/services/ngalert/state/historian/loki_http_test.go +++ b/pkg/services/ngalert/lokiclient/client_test.go @@ -1,4 +1,4 @@ -package historian +package lokiclient import ( "bytes" @@ -21,11 +21,13 @@ import ( "github.com/grafana/grafana/pkg/infra/tracing" ) +const lokiClientSpanName = "testLokiClientSpanName" + func TestLokiConfig(t *testing.T) { t.Run("test URL options", func(t *testing.T) { type testCase struct { name string - in setting.UnifiedAlertingStateHistorySettings + in setting.UnifiedAlertingLokiSettings expRead string expWrite string expErr string @@ -34,7 +36,7 @@ func TestLokiConfig(t *testing.T) { cases := []testCase{ { name: "remote url only", - in: setting.UnifiedAlertingStateHistorySettings{ + in: setting.UnifiedAlertingLokiSettings{ LokiRemoteURL: "http://url.com", }, expRead: "http://url.com", @@ -42,7 +44,7 @@ func TestLokiConfig(t *testing.T) { }, { name: "separate urls", - in: setting.UnifiedAlertingStateHistorySettings{ + in: setting.UnifiedAlertingLokiSettings{ LokiReadURL: "http://read.url.com", LokiWriteURL: "http://write.url.com", }, @@ -51,7 +53,7 @@ func TestLokiConfig(t *testing.T) { }, { name: "single fallback", - in: setting.UnifiedAlertingStateHistorySettings{ + in: setting.UnifiedAlertingLokiSettings{ LokiRemoteURL: "http://url.com", LokiReadURL: "http://read.url.com", }, @@ -60,21 +62,21 @@ func TestLokiConfig(t *testing.T) { }, { name: "missing read", - in: setting.UnifiedAlertingStateHistorySettings{ + in: setting.UnifiedAlertingLokiSettings{ LokiWriteURL: "http://url.com", }, expErr: "either read path URL or remote", }, { name: "missing write", - in: setting.UnifiedAlertingStateHistorySettings{ + in: setting.UnifiedAlertingLokiSettings{ LokiReadURL: "http://url.com", }, expErr: "either write path URL or remote", }, { name: "invalid", - in: setting.UnifiedAlertingStateHistorySettings{ + in: setting.UnifiedAlertingLokiSettings{ LokiRemoteURL: "://://", }, expErr: "failed to parse", @@ -95,7 +97,7 @@ func TestLokiConfig(t *testing.T) { }) 
t.Run("captures external labels", func(t *testing.T) { - set := setting.UnifiedAlertingStateHistorySettings{ + set := setting.UnifiedAlertingLokiSettings{ LokiRemoteURL: "http://url.com", ExternalLabels: map[string]string{"a": "b"}, } @@ -127,8 +129,8 @@ func TestLokiHTTPClient(t *testing.T) { err := client.Push(context.Background(), data) require.NoError(t, err) - require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path) - sent := reqBody(t, req.lastRequest) + require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path) + sent := reqBody(t, req.LastRequest) exp := fmt.Sprintf(`{"streams": [{"stream": {}, "values": [["%d", "some line"]]}]}`, now.UnixNano()) require.JSONEq(t, exp, sent) }) @@ -149,7 +151,7 @@ func TestLokiHTTPClient(t *testing.T) { _, err := client.RangeQuery(context.Background(), q, now-100, now, 1100) require.NoError(t, err) - params := req.lastRequest.URL.Query() + params := req.LastRequest.URL.Query() require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params) require.Equal(t, fmt.Sprint(1100), params.Get("limit")) }) @@ -169,7 +171,7 @@ func TestLokiHTTPClient(t *testing.T) { _, err := client.RangeQuery(context.Background(), q, now-100, now, 0) require.NoError(t, err) - params := req.lastRequest.URL.Query() + params := req.LastRequest.URL.Query() require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params) require.Equal(t, fmt.Sprint(defaultPageSize), params.Get("limit")) }) @@ -189,7 +191,7 @@ func TestLokiHTTPClient(t *testing.T) { _, err := client.RangeQuery(context.Background(), q, now-100, now, -100) require.NoError(t, err) - params := req.lastRequest.URL.Query() + params := req.LastRequest.URL.Query() require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params) require.Equal(t, fmt.Sprint(defaultPageSize), params.Get("limit")) }) @@ -209,7 +211,7 @@ func TestLokiHTTPClient(t *testing.T) { _, err := client.RangeQuery(context.Background(), q, now-100, now, maximumPageSize+1000) require.NoError(t, err) - params := req.lastRequest.URL.Query() + params := req.LastRequest.URL.Query() require.True(t, params.Has("limit"), "query params did not contain 'limit': %#v", params) require.Equal(t, fmt.Sprint(maximumPageSize), params.Get("limit")) }) @@ -224,11 +226,12 @@ func TestLokiHTTPClient_Manual(t *testing.T) { url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") require.NoError(t, err) + metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) client := NewLokiClient(LokiConfig{ ReadPathURL: url, WritePathURL: url, Encoder: JsonEncoder{}, - }, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem), log.NewNopLogger(), tracing.InitializeTracerForTest()) + }, NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName) // Unauthorized request should fail against Grafana Cloud. 
err = client.Ping(context.Background()) @@ -250,13 +253,14 @@ func TestLokiHTTPClient_Manual(t *testing.T) { url, err := url.Parse("https://logs-prod-eu-west-0.grafana.net") require.NoError(t, err) + metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) client := NewLokiClient(LokiConfig{ ReadPathURL: url, WritePathURL: url, BasicAuthUser: "", BasicAuthPassword: "", Encoder: JsonEncoder{}, - }, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem), log.NewNopLogger(), tracing.InitializeTracerForTest()) + }, NewRequester(), metrics.BytesWritten, metrics.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName) // When running on prem, you might need to set the tenant id, // so the x-scope-orgid header is set. @@ -390,7 +394,7 @@ func createTestLokiClient(req client.Requester) *HttpLokiClient { Encoder: JsonEncoder{}, } met := metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem) - return NewLokiClient(cfg, req, met, log.NewNopLogger(), tracing.InitializeTracerForTest()) + return NewLokiClient(cfg, req, met.BytesWritten, met.WriteDuration, log.NewNopLogger(), tracing.InitializeTracerForTest(), lokiClientSpanName) } func reqBody(t *testing.T, req *http.Request) string { diff --git a/pkg/services/ngalert/state/historian/encode.go b/pkg/services/ngalert/lokiclient/encode.go similarity index 99% rename from pkg/services/ngalert/state/historian/encode.go rename to pkg/services/ngalert/lokiclient/encode.go index 18a551c31b8..16dd827f6c5 100644 --- a/pkg/services/ngalert/state/historian/encode.go +++ b/pkg/services/ngalert/lokiclient/encode.go @@ -1,4 +1,4 @@ -package historian +package lokiclient import ( "encoding/json" diff --git a/pkg/services/ngalert/lokiclient/testing.go b/pkg/services/ngalert/lokiclient/testing.go new file mode 100644 index 00000000000..8d3cd097f0c --- /dev/null +++ b/pkg/services/ngalert/lokiclient/testing.go @@ -0,0 +1,45 @@ +package lokiclient + +import ( + "bytes" + "io" + "net/http" +) + +type FakeRequester struct { + LastRequest *http.Request + Resp *http.Response +} + +func NewFakeRequester() *FakeRequester { + return &FakeRequester{ + Resp: &http.Response{ + Status: "200 OK", + StatusCode: 200, + Body: io.NopCloser(bytes.NewBufferString("")), + ContentLength: int64(0), + Header: make(http.Header, 0), + }, + } +} + +func (f *FakeRequester) WithResponse(resp *http.Response) *FakeRequester { + f.Resp = resp + return f +} + +func (f *FakeRequester) Do(req *http.Request) (*http.Response, error) { + f.LastRequest = req + f.Resp.Request = req // Not concurrency-safe! + return f.Resp, nil +} + +func BadResponse() *http.Response { + return &http.Response{ + Status: "400 Bad Request", + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewBufferString("")), + ContentLength: int64(0), + Header: make(http.Header, 0), + } +} diff --git a/pkg/services/ngalert/metrics/ngalert.go b/pkg/services/ngalert/metrics/ngalert.go index 52ce2d05aa9..0ae8434b9c0 100644 --- a/pkg/services/ngalert/metrics/ngalert.go +++ b/pkg/services/ngalert/metrics/ngalert.go @@ -25,26 +25,28 @@ type NGAlert struct { // Registerer is used by subcomponents which register their own metrics. 
Registerer prometheus.Registerer - schedulerMetrics *Scheduler - stateMetrics *State - multiOrgAlertmanagerMetrics *MultiOrgAlertmanager - apiMetrics *API - historianMetrics *Historian - remoteAlertmanagerMetrics *RemoteAlertmanager - remoteWriterMetrics *RemoteWriter + schedulerMetrics *Scheduler + stateMetrics *State + multiOrgAlertmanagerMetrics *MultiOrgAlertmanager + apiMetrics *API + historianMetrics *Historian + notificationHistorianMetrics *NotificationHistorian + remoteAlertmanagerMetrics *RemoteAlertmanager + remoteWriterMetrics *RemoteWriter } // NewNGAlert manages the metrics of all the alerting components. func NewNGAlert(r prometheus.Registerer) *NGAlert { return &NGAlert{ - Registerer: r, - schedulerMetrics: NewSchedulerMetrics(r), - stateMetrics: NewStateMetrics(r), - multiOrgAlertmanagerMetrics: NewMultiOrgAlertmanagerMetrics(r), - apiMetrics: NewAPIMetrics(r), - historianMetrics: NewHistorianMetrics(r, Subsystem), - remoteAlertmanagerMetrics: NewRemoteAlertmanagerMetrics(r), - remoteWriterMetrics: NewRemoteWriterMetrics(r), + Registerer: r, + schedulerMetrics: NewSchedulerMetrics(r), + stateMetrics: NewStateMetrics(r), + multiOrgAlertmanagerMetrics: NewMultiOrgAlertmanagerMetrics(r), + apiMetrics: NewAPIMetrics(r), + historianMetrics: NewHistorianMetrics(r, Subsystem), + notificationHistorianMetrics: NewNotificationHistorianMetrics(r), + remoteAlertmanagerMetrics: NewRemoteAlertmanagerMetrics(r), + remoteWriterMetrics: NewRemoteWriterMetrics(r), } } @@ -68,6 +70,10 @@ func (ng *NGAlert) GetHistorianMetrics() *Historian { return ng.historianMetrics } +func (ng *NGAlert) GetNotificationHistorianMetrics() *NotificationHistorian { + return ng.notificationHistorianMetrics +} + func (ng *NGAlert) GetRemoteAlertmanagerMetrics() *RemoteAlertmanager { return ng.remoteAlertmanagerMetrics } diff --git a/pkg/services/ngalert/metrics/notification_historian.go b/pkg/services/ngalert/metrics/notification_historian.go new file mode 100644 index 00000000000..880e156a83c --- /dev/null +++ b/pkg/services/ngalert/metrics/notification_historian.go @@ -0,0 +1,51 @@ +package metrics + +import ( + "github.com/grafana/dskit/instrument" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type NotificationHistorian struct { + Info prometheus.Gauge + WritesTotal prometheus.Counter + WritesFailed prometheus.Counter + WriteDuration *instrument.HistogramCollector + BytesWritten prometheus.Counter +} + +func NewNotificationHistorianMetrics(r prometheus.Registerer) *NotificationHistorian { + return &NotificationHistorian{ + Info: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "notification_history_info", + Help: "Information about the notification history store.", + }), + WritesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "notification_history_writes_total", + Help: "The total number of notification history batches that were attempted to be written.", + }), + WritesFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "notification_history_writes_failed_total", + Help: "The total number of failed writes of notification history batches.", + }), + WriteDuration: instrument.NewHistogramCollector(promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "notification_history_request_duration_seconds", + 
Help: "Histogram of request durations to the notification history store.", + Buckets: instrument.DefBuckets, + }, instrument.HistogramCollectorBuckets)), + BytesWritten: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "notification_history_writes_bytes_total", + Help: "The total number of bytes sent within a batch to the notification history store.", + }), + } +} diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index c923f39fb6a..0c2a0274867 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -7,6 +7,8 @@ import ( "time" "github.com/benbjohnson/clock" + "github.com/grafana/alerting/notify/nfstatus" + "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/matchers/compat" "golang.org/x/sync/errgroup" @@ -284,6 +286,18 @@ func (ng *AlertNG) init() error { overrides = append(overrides, override) } + notificationHistorian, err := configureNotificationHistorian( + initCtx, + ng.FeatureToggles, + ng.Cfg.UnifiedAlerting.NotificationHistory, + ng.Metrics.GetNotificationHistorianMetrics(), + ng.Log, + ng.tracer, + ) + if err != nil { + return err + } + decryptFn := ng.SecretsService.GetDecryptedValue multiOrgMetrics := ng.Metrics.GetMultiOrgAlertmanagerMetrics() moa, err := notifier.NewMultiOrgAlertmanager( @@ -299,6 +313,7 @@ func (ng *AlertNG) init() error { moaLogger, ng.SecretsService, ng.FeatureToggles, + notificationHistorian, overrides..., ) if err != nil { @@ -678,11 +693,11 @@ func configureHistorianBackend( return historian.NewAnnotationBackend(annotationBackendLogger, store, rs, met, ac), nil } if backend == historian.BackendTypeLoki { - lcfg, err := historian.NewLokiConfig(cfg) + lcfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings) if err != nil { return nil, fmt.Errorf("invalid remote loki configuration: %w", err) } - req := historian.NewRequester() + req := lokiclient.NewRequester() logCtx := log.WithContextualAttributes(ctx, []any{"backend", "loki"}) lokiBackendLogger := log.New("ngalert.state.historian").FromContext(logCtx) backend := historian.NewRemoteLokiBackend(lokiBackendLogger, lcfg, req, met, tracer, rs, ac) @@ -717,6 +732,36 @@ func configureHistorianBackend( return nil, fmt.Errorf("unrecognized state history backend: %s", backend) } +func configureNotificationHistorian( + ctx context.Context, + featureToggles featuremgmt.FeatureToggles, + cfg setting.UnifiedAlertingNotificationHistorySettings, + met *metrics.NotificationHistorian, + l log.Logger, + tracer tracing.Tracer, +) (nfstatus.NotificationHistorian, error) { + if !featureToggles.IsEnabled(ctx, featuremgmt.FlagAlertingNotificationHistory) || !cfg.Enabled { + met.Info.Set(0) + return nil, nil + } + + met.Info.Set(1) + lcfg, err := lokiclient.NewLokiConfig(cfg.LokiSettings) + if err != nil { + return nil, fmt.Errorf("invalid remote loki configuration: %w", err) + } + req := lokiclient.NewRequester() + logger := log.New("ngalert.notifier.historian").FromContext(ctx) + notificationHistorian := notifier.NewNotificationHistorian(logger, lcfg, req, met, tracer) + + testConnCtx, cancelFunc := context.WithTimeout(ctx, 10*time.Second) + defer cancelFunc() + if err := notificationHistorian.TestConnection(testConnCtx); err != nil { + l.Error("Failed to communicate with configured remote Loki backend, notification history may not be persisted", "error", err) + } + return notificationHistorian, nil +} + func 
createRemoteAlertmanager(ctx context.Context, cfg remote.AlertmanagerConfig, kvstore kvstore.KVStore, crypto remote.Crypto, autogenFn remote.AutogenFn, m *metrics.RemoteAlertmanager, tracer tracing.Tracer) (*remote.Alertmanager, error) { return remote.NewAlertmanager(ctx, cfg, notifier.NewFileStore(cfg.OrgID, kvstore), crypto, autogenFn, m, tracer) } diff --git a/pkg/services/ngalert/ngalert_test.go b/pkg/services/ngalert/ngalert_test.go index a0b42309f47..640a81821c9 100644 --- a/pkg/services/ngalert/ngalert_test.go +++ b/pkg/services/ngalert/ngalert_test.go @@ -137,9 +137,11 @@ func TestConfigureHistorianBackend(t *testing.T) { cfg := setting.UnifiedAlertingStateHistorySettings{ Enabled: true, Backend: "loki", - // Should never resolve at the DNS level: https://www.rfc-editor.org/rfc/rfc6761#section-6.4 - LokiReadURL: "http://gone.invalid", - LokiWriteURL: "http://gone.invalid", + LokiSettings: setting.UnifiedAlertingLokiSettings{ + // Should never resolve at the DNS level: https://www.rfc-editor.org/rfc/rfc6761#section-6.4 + LokiReadURL: "http://gone.invalid", + LokiWriteURL: "http://gone.invalid", + }, } ac := &acfakes.FakeRuleService{} @@ -232,6 +234,75 @@ grafana_alerting_state_history_info{backend="noop"} 0 }) } +func TestConfigureNotificationHistorian(t *testing.T) { + t.Run("do not fail initialization if pinging Loki fails", func(t *testing.T) { + reg := prometheus.NewRegistry() + met := metrics.NewNotificationHistorianMetrics(reg) + logger := log.NewNopLogger() + tracer := tracing.InitializeTracerForTest() + ft := featuremgmt.WithFeatures(featuremgmt.FlagAlertingNotificationHistory) + cfg := setting.UnifiedAlertingNotificationHistorySettings{ + Enabled: true, + LokiSettings: setting.UnifiedAlertingLokiSettings{ + // Should never resolve at the DNS level: https://www.rfc-editor.org/rfc/rfc6761#section-6.4 + LokiRemoteURL: "http://gone.invalid", + }, + } + + h, err := configureNotificationHistorian(context.Background(), ft, cfg, met, logger, tracer) + require.NotNil(t, h) + require.NoError(t, err) + + // Verify that the metric value is set to 1, indicating that notification history is enabled. + exp := bytes.NewBufferString(` +# HELP grafana_alerting_notification_history_info Information about the notification history store. 
+# TYPE grafana_alerting_notification_history_info gauge +grafana_alerting_notification_history_info 1 +`) + err = testutil.GatherAndCompare(reg, exp, "grafana_alerting_notification_history_info") + require.NoError(t, err) + }) + + t.Run("emit special zero metric if notification history disabled", func(t *testing.T) { + testCases := []struct { + name string + ft featuremgmt.FeatureToggles + cfg setting.UnifiedAlertingNotificationHistorySettings + }{ + { + "disabled via config", + featuremgmt.WithFeatures(featuremgmt.FlagAlertingNotificationHistory), + setting.UnifiedAlertingNotificationHistorySettings{Enabled: false}, + }, + { + "disabled via feature toggle", + featuremgmt.WithFeatures(), + setting.UnifiedAlertingNotificationHistorySettings{Enabled: true}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + reg := prometheus.NewRegistry() + met := metrics.NewNotificationHistorianMetrics(reg) + logger := log.NewNopLogger() + tracer := tracing.InitializeTracerForTest() + h, err := configureNotificationHistorian(context.Background(), tc.ft, tc.cfg, met, logger, tracer) + require.Nil(t, h) + require.NoError(t, err) + + exp := bytes.NewBufferString(` +# HELP grafana_alerting_notification_history_info Information about the notification history store. +# TYPE grafana_alerting_notification_history_info gauge +grafana_alerting_notification_history_info 0 +`) + err = testutil.GatherAndCompare(reg, exp, "grafana_alerting_notification_history_info") + require.NoError(t, err) + }) + } + }) +} + type mockDB struct { db.DB } diff --git a/pkg/services/ngalert/notifier/alertmanager.go b/pkg/services/ngalert/notifier/alertmanager.go index db90eaced84..faf85e3f07a 100644 --- a/pkg/services/ngalert/notifier/alertmanager.go +++ b/pkg/services/ngalert/notifier/alertmanager.go @@ -11,6 +11,7 @@ import ( "time" alertingNotify "github.com/grafana/alerting/notify" + "github.com/grafana/alerting/notify/nfstatus" "github.com/prometheus/alertmanager/config" amv2 "github.com/prometheus/alertmanager/api/v2/models" @@ -87,7 +88,7 @@ func (m maintenanceOptions) MaintenanceFunc(state alertingNotify.State) (int64, func NewAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store AlertingStore, stateStore stateStore, peer alertingNotify.ClusterPeer, decryptFn alertingNotify.GetDecryptedValueFn, ns notifications.Service, - m *metrics.Alertmanager, featureToggles featuremgmt.FeatureToggles, crypto Crypto, + m *metrics.Alertmanager, featureToggles featuremgmt.FeatureToggles, crypto Crypto, notificationHistorian nfstatus.NotificationHistorian, ) (*alertmanager, error) { nflog, err := stateStore.GetNotificationLog(ctx) if err != nil { @@ -129,15 +130,16 @@ func NewAlertmanager(ctx context.Context, orgID int64, cfg *setting.Cfg, store A MaxSilences: cfg.UnifiedAlerting.AlertmanagerMaxSilencesCount, MaxSilenceSizeBytes: cfg.UnifiedAlerting.AlertmanagerMaxSilenceSizeBytes, }, - EmailSender: &emailSender{ns}, - ImageProvider: newImageProvider(store, l.New("component", "image-provider")), - Decrypter: decryptFn, - Version: setting.BuildVersion, - TenantKey: "orgID", - TenantID: orgID, - Peer: peer, - Logger: l, - Metrics: alertingNotify.NewGrafanaAlertmanagerMetrics(m.Registerer, l), + EmailSender: &emailSender{ns}, + ImageProvider: newImageProvider(store, l.New("component", "image-provider")), + Decrypter: decryptFn, + Version: setting.BuildVersion, + TenantKey: "orgID", + TenantID: orgID, + Peer: peer, + Logger: l, + Metrics: alertingNotify.NewGrafanaAlertmanagerMetrics(m.Registerer, l), 
+ NotificationHistorian: notificationHistorian, } gam, err := alertingNotify.NewGrafanaAlertmanager(opts) diff --git a/pkg/services/ngalert/notifier/alertmanager_test.go b/pkg/services/ngalert/notifier/alertmanager_test.go index fe00b4f8331..e865fca9178 100644 --- a/pkg/services/ngalert/notifier/alertmanager_test.go +++ b/pkg/services/ngalert/notifier/alertmanager_test.go @@ -58,7 +58,7 @@ func setupAMTest(t *testing.T) *alertmanager { stateStore := NewFileStore(int64(orgID), kvStore) crypto := NewCrypto(secretsService, s, l) - am, err := NewAlertmanager(context.Background(), 1, cfg, s, stateStore, &NilPeer{}, decryptFn, nil, m, featuremgmt.WithFeatures(), crypto) + am, err := NewAlertmanager(context.Background(), 1, cfg, s, stateStore, &NilPeer{}, decryptFn, nil, m, featuremgmt.WithFeatures(), crypto, nil) require.NoError(t, err) return am } diff --git a/pkg/services/ngalert/notifier/historian.go b/pkg/services/ngalert/notifier/historian.go new file mode 100644 index 00000000000..f56175de20c --- /dev/null +++ b/pkg/services/ngalert/notifier/historian.go @@ -0,0 +1,190 @@ +package notifier + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + alertingModels "github.com/grafana/alerting/models" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/ngalert/client" + "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" + "github.com/grafana/grafana/pkg/services/ngalert/metrics" + "github.com/prometheus/alertmanager/notify" + "github.com/prometheus/alertmanager/types" + prometheusModel "github.com/prometheus/common/model" + "go.opentelemetry.io/otel/trace" +) + +const LokiClientSpanName = "ngalert.notification-historian.client" +const NotificationHistoryWriteTimeout = time.Minute +const NotificationHistoryKey = "from" +const NotificationHistoryLabelValue = "notify-history" + +type NotificationHistoryLokiEntry struct { + SchemaVersion int `json:"schemaVersion"` + Receiver string `json:"receiver"` + Status string `json:"status"` + GroupLabels map[string]string `json:"groupLabels"` + Alerts []NotificationHistoryLokiEntryAlert `json:"alerts"` + Retry bool `json:"retry"` + Error string `json:"error,omitempty"` + Duration int64 `json:"duration"` +} + +type NotificationHistoryLokiEntryAlert struct { + Status string `json:"status"` + Labels map[string]string `json:"labels"` + Annotations map[string]string `json:"annotations"` + StartsAt time.Time `json:"startsAt"` + EndsAt time.Time `json:"endsAt"` + RuleUID string `json:"ruleUID"` +} + +type remoteLokiClient interface { + Ping(context.Context) error + Push(context.Context, []lokiclient.Stream) error +} + +type NotificationHistorian struct { + client remoteLokiClient + externalLabels map[string]string + metrics *metrics.NotificationHistorian + log log.Logger +} + +func NewNotificationHistorian(logger log.Logger, cfg lokiclient.LokiConfig, req client.Requester, metrics *metrics.NotificationHistorian, tracer tracing.Tracer) *NotificationHistorian { + return &NotificationHistorian{ + client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName), + externalLabels: cfg.ExternalLabels, + metrics: metrics, + log: logger, + } +} + +func (h *NotificationHistorian) TestConnection(ctx context.Context) error { + return h.client.Ping(ctx) +} + +func (h *NotificationHistorian) Record(ctx context.Context, alerts []*types.Alert, retry bool, notificationErr error, duration time.Duration) <-chan 
error { + stream, err := h.prepareStream(ctx, alerts, retry, notificationErr, duration) + logger := h.log.FromContext(ctx) + errCh := make(chan error, 1) + if err != nil { + logger.Error("Failed to convert notification history to stream", "error", err) + errCh <- fmt.Errorf("failed to convert notification history to stream: %w", err) + close(errCh) + return errCh + } + + // This is a new background job, so let's create a new context for it. + // We want it to be isolated, i.e. we don't want grafana shutdowns to interrupt this work + // immediately but rather try to flush writes. + // This also prevents timeouts or other lingering objects (like transactions) from being + // incorrectly propagated here from other areas. + writeCtx := context.Background() + writeCtx, cancel := context.WithTimeout(writeCtx, NotificationHistoryWriteTimeout) + writeCtx = trace.ContextWithSpan(writeCtx, trace.SpanFromContext(ctx)) + + go func(ctx context.Context) { + defer cancel() + defer close(errCh) + logger := h.log.FromContext(ctx) + logger.Debug("Saving notification history") + h.metrics.WritesTotal.Inc() + + if err := h.recordStream(ctx, stream, logger); err != nil { + logger.Error("Failed to save notification history", "error", err) + h.metrics.WritesFailed.Inc() + errCh <- fmt.Errorf("failed to save notification history: %w", err) + } + }(writeCtx) + return errCh +} + +func (h *NotificationHistorian) prepareStream(ctx context.Context, alerts []*types.Alert, retry bool, notificationErr error, duration time.Duration) (lokiclient.Stream, error) { + receiverName, ok := notify.ReceiverName(ctx) + if !ok { + return lokiclient.Stream{}, fmt.Errorf("receiver name not found in context") + } + groupLabels, ok := notify.GroupLabels(ctx) + if !ok { + return lokiclient.Stream{}, fmt.Errorf("group labels not found in context") + } + now, ok := notify.Now(ctx) + if !ok { + return lokiclient.Stream{}, fmt.Errorf("now not found in context") + } + + entryAlerts := make([]NotificationHistoryLokiEntryAlert, len(alerts)) + for i, alert := range alerts { + labels := prepareLabels(alert.Labels) + annotations := prepareLabels(alert.Annotations) + entryAlerts[i] = NotificationHistoryLokiEntryAlert{ + Labels: labels, + Annotations: annotations, + Status: string(alert.StatusAt(now)), + StartsAt: alert.StartsAt, + EndsAt: alert.EndsAt, + RuleUID: string(alert.Labels[alertingModels.RuleUIDLabel]), + } + } + + notificationErrStr := "" + if notificationErr != nil { + notificationErrStr = notificationErr.Error() + } + + entry := NotificationHistoryLokiEntry{ + SchemaVersion: 1, + Receiver: receiverName, + Status: string(types.Alerts(alerts...).StatusAt(now)), + GroupLabels: prepareLabels(groupLabels), + Alerts: entryAlerts, + Retry: retry, + Error: notificationErrStr, + Duration: duration.Milliseconds(), + } + + entryJSON, err := json.Marshal(entry) + if err != nil { + return lokiclient.Stream{}, err + } + + streamLabels := make(map[string]string) + streamLabels[NotificationHistoryKey] = NotificationHistoryLabelValue + for k, v := range h.externalLabels { + streamLabels[k] = v + } + + return lokiclient.Stream{ + Stream: streamLabels, + Values: []lokiclient.Sample{ + { + T: now, + V: string(entryJSON), + }}, + }, nil +} + +func (h *NotificationHistorian) recordStream(ctx context.Context, stream lokiclient.Stream, logger log.Logger) error { + if err := h.client.Push(ctx, []lokiclient.Stream{stream}); err != nil { + return err + } + logger.Debug("Done saving notification history") + return nil +} + +func prepareLabels(labels 
prometheusModel.LabelSet) map[string]string { + result := make(map[string]string) + for k, v := range labels { + // Remove private labels + if !strings.HasPrefix(string(k), "__") && !strings.HasSuffix(string(k), "__") { + result[string(k)] = string(v) + } + } + return result +} diff --git a/pkg/services/ngalert/notifier/historian_test.go b/pkg/services/ngalert/notifier/historian_test.go new file mode 100644 index 00000000000..92056499dc3 --- /dev/null +++ b/pkg/services/ngalert/notifier/historian_test.go @@ -0,0 +1,126 @@ +package notifier + +import ( + "bytes" + "context" + "errors" + "io" + "net/url" + "testing" + "time" + + alertingModels "github.com/grafana/alerting/models" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/ngalert/client" + "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" + "github.com/grafana/grafana/pkg/services/ngalert/metrics" + "github.com/prometheus/alertmanager/notify" + "github.com/prometheus/alertmanager/types" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +var testNow = time.Date(2025, time.July, 15, 16, 55, 0, 0, time.UTC) +var testAlerts = []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "Alert1", alertingModels.RuleUIDLabel: "testRuleUID"}, + Annotations: model.LabelSet{"foo": "bar", "__private__": "baz"}, + StartsAt: testNow, + EndsAt: testNow, + GeneratorURL: "http://localhost/test", + }, + }, +} + +func TestRecord(t *testing.T) { + t.Run("write notification history to Loki", func(t *testing.T) { + testCases := []struct { + name string + retry bool + notificationErr error + expected string + }{ + { + "successful notification", + false, + nil, + "{\"streams\":[{\"stream\":{\"externalLabelKey\":\"externalLabelValue\",\"from\":\"notify-history\"},\"values\":[[\"1752598500000000000\",\"{\\\"schemaVersion\\\":1,\\\"receiver\\\":\\\"testReceiverName\\\",\\\"status\\\":\\\"resolved\\\",\\\"groupLabels\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"alerts\\\":[{\\\"status\\\":\\\"resolved\\\",\\\"labels\\\":{\\\"alertname\\\":\\\"Alert1\\\"},\\\"annotations\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"startsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"endsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"ruleUID\\\":\\\"testRuleUID\\\"}],\\\"retry\\\":false,\\\"duration\\\":1000}\"]]}]}", + }, + { + "failed notification", + true, + errors.New("test notification error"), + "{\"streams\":[{\"stream\":{\"externalLabelKey\":\"externalLabelValue\",\"from\":\"notify-history\"},\"values\":[[\"1752598500000000000\",\"{\\\"schemaVersion\\\":1,\\\"receiver\\\":\\\"testReceiverName\\\",\\\"status\\\":\\\"resolved\\\",\\\"groupLabels\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"alerts\\\":[{\\\"status\\\":\\\"resolved\\\",\\\"labels\\\":{\\\"alertname\\\":\\\"Alert1\\\"},\\\"annotations\\\":{\\\"foo\\\":\\\"bar\\\"},\\\"startsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"endsAt\\\":\\\"2025-07-15T16:55:00Z\\\",\\\"ruleUID\\\":\\\"testRuleUID\\\"}],\\\"retry\\\":true,\\\"error\\\":\\\"test notification error\\\",\\\"duration\\\":1000}\"]]}]}", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req := lokiclient.NewFakeRequester() + met := metrics.NewNotificationHistorianMetrics(prometheus.NewRegistry()) + h := createTestNotificationHistorian(req, met) + + err := <-h.Record(recordCtx(), testAlerts, tc.retry, 
tc.notificationErr, time.Second) + require.NoError(t, err) + + reqBody, err := io.ReadAll(req.LastRequest.Body) + require.NoError(t, err) + require.Equal(t, tc.expected, string(reqBody)) + }) + } + }) + + t.Run("emits expected write metrics", func(t *testing.T) { + reg := prometheus.NewRegistry() + met := metrics.NewNotificationHistorianMetrics(reg) + goodHistorian := createTestNotificationHistorian(lokiclient.NewFakeRequester(), met) + badHistorian := createTestNotificationHistorian(lokiclient.NewFakeRequester().WithResponse(lokiclient.BadResponse()), met) + + <-goodHistorian.Record(recordCtx(), testAlerts, false, nil, time.Second) + <-badHistorian.Record(recordCtx(), testAlerts, false, nil, time.Second) + + exp := bytes.NewBufferString(` +# HELP grafana_alerting_notification_history_writes_failed_total The total number of failed writes of notification history batches. +# TYPE grafana_alerting_notification_history_writes_failed_total counter +grafana_alerting_notification_history_writes_failed_total 1 +# HELP grafana_alerting_notification_history_writes_total The total number of notification history batches that were attempted to be written. +# TYPE grafana_alerting_notification_history_writes_total counter +grafana_alerting_notification_history_writes_total 2 +`) + err := testutil.GatherAndCompare(reg, exp, + "grafana_alerting_notification_history_writes_total", + "grafana_alerting_notification_history_writes_failed_total", + ) + require.NoError(t, err) + }) + + t.Run("returns error when context is missing required fields", func(t *testing.T) { + req := lokiclient.NewFakeRequester() + met := metrics.NewNotificationHistorianMetrics(prometheus.NewRegistry()) + h := createTestNotificationHistorian(req, met) + + err := <-h.Record(context.Background(), testAlerts, false, nil, time.Second) + require.Error(t, err) + }) +} + +func createTestNotificationHistorian(req client.Requester, met *metrics.NotificationHistorian) *NotificationHistorian { + writePathURL, _ := url.Parse("http://some.url") + cfg := lokiclient.LokiConfig{ + WritePathURL: writePathURL, + ExternalLabels: map[string]string{"externalLabelKey": "externalLabelValue"}, + Encoder: lokiclient.JsonEncoder{}, + } + tracer := tracing.InitializeTracerForTest() + return NewNotificationHistorian(log.NewNopLogger(), cfg, req, met, tracer) +} + +func recordCtx() context.Context { + ctx := notify.WithReceiverName(context.Background(), "testReceiverName") + ctx = notify.WithGroupLabels(ctx, model.LabelSet{"foo": "bar"}) + ctx = notify.WithNow(ctx, testNow) + return ctx +} diff --git a/pkg/services/ngalert/notifier/multiorg_alertmanager.go b/pkg/services/ngalert/notifier/multiorg_alertmanager.go index ddde2ad8e89..7b52cb68c3b 100644 --- a/pkg/services/ngalert/notifier/multiorg_alertmanager.go +++ b/pkg/services/ngalert/notifier/multiorg_alertmanager.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/grafana/alerting/notify/nfstatus" "github.com/prometheus/client_golang/prometheus" alertingCluster "github.com/grafana/alerting/cluster" @@ -134,6 +135,7 @@ func NewMultiOrgAlertmanager( l log.Logger, s secrets.Service, featureManager featuremgmt.FeatureToggles, + notificationHistorian nfstatus.NotificationHistorian, opts ...Option, ) (*MultiOrgAlertmanager, error) { moa := &MultiOrgAlertmanager{ @@ -166,7 +168,7 @@ func NewMultiOrgAlertmanager( moa.factory = func(ctx context.Context, orgID int64) (Alertmanager, error) { m := metrics.NewAlertmanagerMetrics(moa.metrics.GetOrCreateOrgRegistry(orgID), l) stateStore := NewFileStore(orgID, kvStore) - 
return NewAlertmanager(ctx, orgID, moa.settings, moa.configStore, stateStore, moa.peer, moa.decryptFn, moa.ns, m, featureManager, moa.Crypto) + return NewAlertmanager(ctx, orgID, moa.settings, moa.configStore, stateStore, moa.peer, moa.decryptFn, moa.ns, m, featureManager, moa.Crypto, notificationHistorian) } for _, opt := range opts { diff --git a/pkg/services/ngalert/notifier/multiorg_alertmanager_remote_test.go b/pkg/services/ngalert/notifier/multiorg_alertmanager_remote_test.go index 2d60c284535..81b8aebb766 100644 --- a/pkg/services/ngalert/notifier/multiorg_alertmanager_remote_test.go +++ b/pkg/services/ngalert/notifier/multiorg_alertmanager_remote_test.go @@ -102,6 +102,7 @@ func TestMultiorgAlertmanager_RemoteSecondaryMode(t *testing.T) { nopLogger, secretsService, featuremgmt.WithFeatures(), + nil, override, ) require.NoError(t, err) diff --git a/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go b/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go index 92c0209aa37..68a6a7dedea 100644 --- a/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go +++ b/pkg/services/ngalert/notifier/multiorg_alertmanager_test.go @@ -396,6 +396,7 @@ func setupMam(t *testing.T, cfg *setting.Cfg) *MultiOrgAlertmanager { log.New("testlogger"), secretsService, featuremgmt.WithFeatures(), + nil, ) require.NoError(t, err) return mam diff --git a/pkg/services/ngalert/sender/router_test.go b/pkg/services/ngalert/sender/router_test.go index 25f0c7566aa..9aa7d8120ee 100644 --- a/pkg/services/ngalert/sender/router_test.go +++ b/pkg/services/ngalert/sender/router_test.go @@ -504,6 +504,7 @@ func createMultiOrgAlertmanager(t *testing.T, orgs []int64) *notifier.MultiOrgAl log.New("testlogger"), secretsService, featuremgmt.WithFeatures(), + nil, ) require.NoError(t, err) require.NoError(t, moa.LoadAndSyncAlertmanagersForOrgs(context.Background())) diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go index b2db9da08bf..c8e9b356552 100644 --- a/pkg/services/ngalert/state/historian/loki.go +++ b/pkg/services/ngalert/state/historian/loki.go @@ -13,6 +13,7 @@ import ( "github.com/benbjohnson/clock" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" "go.opentelemetry.io/otel/trace" "github.com/grafana/grafana/pkg/apimachinery/errutil" @@ -42,6 +43,7 @@ const ( const ( StateHistoryLabelKey = "from" StateHistoryLabelValue = "state-history" + LokiClientSpanName = "ngalert.historian.client" ) const defaultQueryRange = 6 * time.Hour @@ -67,8 +69,8 @@ func NewErrLokiQueryTooLong(query string, maxLimit int) error { type remoteLokiClient interface { Ping(context.Context) error - Push(context.Context, []Stream) error - RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (QueryRes, error) + Push(context.Context, []lokiclient.Stream) error + RangeQuery(ctx context.Context, logQL string, start, end, limit int64) (lokiclient.QueryRes, error) MaxQuerySize() int } @@ -83,9 +85,9 @@ type RemoteLokiBackend struct { ruleStore RuleStore } -func NewRemoteLokiBackend(logger log.Logger, cfg LokiConfig, req client.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend { +func NewRemoteLokiBackend(logger log.Logger, cfg lokiclient.LokiConfig, req client.Requester, metrics *metrics.Historian, tracer tracing.Tracer, ruleStore RuleStore, ac AccessControl) *RemoteLokiBackend { return &RemoteLokiBackend{ - client: NewLokiClient(cfg, 
req, metrics, logger, tracer), + client: lokiclient.NewLokiClient(cfg, req, metrics.BytesWritten, metrics.WriteDuration, logger, tracer, LokiClientSpanName), externalLabels: cfg.ExternalLabels, clock: clock.New(), metrics: metrics, @@ -161,7 +163,7 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery if query.From.IsZero() { query.From = now.Add(-defaultQueryRange) } - var res []Stream + var res []lokiclient.Stream for _, logQL := range queries { // Timestamps are expected in RFC3339Nano. // Apply user-defined limit to every request. Multiple batches is a very rare case, and therefore we can tolerate getting more data than needed. @@ -176,7 +178,7 @@ func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery } // merge will put all the results in one array sorted by timestamp. -func merge(res []Stream, folderUIDToFilter []string) (*data.Frame, error) { +func merge(res []lokiclient.Stream, folderUIDToFilter []string) (*data.Frame, error) { filterByFolderUIDMap := make(map[string]struct{}, len(folderUIDToFilter)) for _, uid := range folderUIDToFilter { filterByFolderUIDMap[uid] = struct{}{} @@ -207,7 +209,7 @@ func merge(res []Stream, folderUIDToFilter []string) (*data.Frame, error) { pointers := make([]int, len(res)) for { minTime := int64(math.MaxInt64) - minEl := Sample{} + minEl := lokiclient.Sample{} minElStreamIdx := -1 // Find the element with the earliest time among all arrays. for i, stream := range res { @@ -269,7 +271,7 @@ func merge(res []Stream, folderUIDToFilter []string) (*data.Frame, error) { return frame, nil } -func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) Stream { +func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) lokiclient.Stream { labels := mergeLabels(make(map[string]string), externalLabels) // System-defined labels take precedence over user-defined external labels. 
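	// For example, a user-supplied external label keyed "from" is overwritten
	// by the assignments that follow: every stream ships with
	// from="state-history" plus the rule's group and folderUID labels.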
labels[StateHistoryLabelKey] = StateHistoryLabelValue @@ -277,7 +279,7 @@ func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, labels[GroupLabel] = fmt.Sprint(rule.Group) labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID) - samples := make([]Sample, 0, len(states)) + samples := make([]lokiclient.Sample, 0, len(states)) for _, state := range states { if !shouldRecord(state) { continue @@ -309,20 +311,20 @@ func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, } line := string(jsn) - samples = append(samples, Sample{ + samples = append(samples, lokiclient.Sample{ T: state.LastEvaluationTime, V: line, }) } - return Stream{ + return lokiclient.Stream{ Stream: labels, Values: samples, } } -func (h *RemoteLokiBackend) recordStreams(ctx context.Context, stream Stream, logger log.Logger) error { - if err := h.client.Push(ctx, []Stream{stream}); err != nil { +func (h *RemoteLokiBackend) recordStreams(ctx context.Context, stream lokiclient.Stream, logger log.Logger) error { + if err := h.client.Push(ctx, []lokiclient.Stream{stream}); err != nil { return err } diff --git a/pkg/services/ngalert/state/historian/loki_test.go b/pkg/services/ngalert/state/historian/loki_test.go index 1f61948083f..e206fac1df4 100644 --- a/pkg/services/ngalert/state/historian/loki_test.go +++ b/pkg/services/ngalert/state/historian/loki_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/services/ngalert/lokiclient" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" @@ -348,15 +349,15 @@ func TestBuildLogQuery(t *testing.T) { func TestMerge(t *testing.T) { testCases := []struct { name string - res QueryRes + res lokiclient.QueryRes expected *data.Frame folderUIDs []string }{ { name: "Should return values from multiple streams in right order", - res: QueryRes{ - Data: QueryData{ - Result: []Stream{ + res: lokiclient.QueryRes{ + Data: lokiclient.QueryData{ + Result: []lokiclient.Stream{ { Stream: map[string]string{ "from": "state-history", @@ -365,8 +366,8 @@ func TestMerge(t *testing.T) { "folderUID": "test-folder-1", "extra": "label", }, - Values: []Sample{ - {time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, + Values: []lokiclient.Sample{ + {T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, }, }, { @@ -376,8 +377,8 @@ func TestMerge(t *testing.T) { "group": "test-group-2", "folderUID": "test-folder-1", }, - Values: []Sample{ - {time.Unix(2, 0), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-2"}`}, + Values: []lokiclient.Sample{ + {T: time.Unix(2, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-2"}`}, }, }, }, @@ -411,14 +412,14 @@ func TestMerge(t *testing.T) { }, { name: "Should handle empty values", - res: QueryRes{ - Data: QueryData{ - Result: []Stream{ + res: lokiclient.QueryRes{ + Data: lokiclient.QueryData{ + Result: []lokiclient.Stream{ { Stream: map[string]string{ "extra": "labels", }, - Values: []Sample{}, + Values: []lokiclient.Sample{}, }, }, }, @@ -431,9 +432,9 @@ func TestMerge(t *testing.T) { }, { name: "Should handle multiple values in one stream", - res: QueryRes{ - Data: QueryData{ - Result: 
[]Stream{ + res: lokiclient.QueryRes{ + Data: lokiclient.QueryData{ + Result: []lokiclient.Stream{ { Stream: map[string]string{ "from": "state-history", @@ -441,9 +442,9 @@ func TestMerge(t *testing.T) { "group": "test-group-1", "folderUID": "test-folder-1", }, - Values: []Sample{ - {time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, - {time.Unix(5, 0), `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`}, + Values: []lokiclient.Sample{ + {T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, + {T: time.Unix(5, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`}, }, }, { @@ -453,8 +454,8 @@ func TestMerge(t *testing.T) { "group": "test-group-2", "folderUID": "test-folder-1", }, - Values: []Sample{ - {time.Unix(2, 0), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-3"}`}, + Values: []lokiclient.Sample{ + {T: time.Unix(2, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-3"}`}, }, }, }, @@ -496,9 +497,9 @@ func TestMerge(t *testing.T) { { name: "should filter streams by folder UID", folderUIDs: []string{"test-folder-1"}, - res: QueryRes{ - Data: QueryData{ - Result: []Stream{ + res: lokiclient.QueryRes{ + Data: lokiclient.QueryData{ + Result: []lokiclient.Stream{ { Stream: map[string]string{ "from": "state-history", @@ -506,9 +507,9 @@ func TestMerge(t *testing.T) { "group": "test-group-1", "folderUID": "test-folder-1", }, - Values: []Sample{ - {time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, - {time.Unix(5, 0), `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`}, + Values: []lokiclient.Sample{ + {T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, + {T: time.Unix(5, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`}, }, }, { @@ -518,8 +519,8 @@ func TestMerge(t *testing.T) { "group": "test-group-2", "folderUID": "test-folder-2", }, - Values: []Sample{ - {time.Unix(2, 0), `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-3"}`}, + Values: []lokiclient.Sample{ + {T: time.Unix(2, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "firing", "values":{"a": 2.5}, "ruleUID": "test-rule-3"}`}, }, }, }, @@ -553,16 +554,16 @@ func TestMerge(t *testing.T) { { name: "should skip streams without folder UID if filter is specified", folderUIDs: []string{"test-folder-1"}, - res: QueryRes{ - Data: QueryData{ - Result: []Stream{ + res: lokiclient.QueryRes{ + Data: lokiclient.QueryData{ + Result: []lokiclient.Stream{ { Stream: map[string]string{ "group": "test-group-1", }, - Values: []Sample{ - {time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, - {time.Unix(5, 0), `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`}, + Values: []lokiclient.Sample{ + {T: time.Unix(1, 0), V: 
`{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, + {T: time.Unix(5, 0), V: `{"schemaVersion": 1, "previous": "pending", "current": "normal", "values":{"a": 0.5}, "ruleUID": "test-rule-2"}`}, }, }, }, @@ -577,15 +578,15 @@ func TestMerge(t *testing.T) { { name: "should return streams without folder UID if filter is not specified", folderUIDs: []string{}, - res: QueryRes{ - Data: QueryData{ - Result: []Stream{ + res: lokiclient.QueryRes{ + Data: lokiclient.QueryData{ + Result: []lokiclient.Stream{ { Stream: map[string]string{ "group": "test-group-1", }, - Values: []Sample{ - {time.Unix(1, 0), `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, + Values: []lokiclient.Sample{ + {T: time.Unix(1, 0), V: `{"schemaVersion": 1, "previous": "normal", "current": "pending", "values":{"a": 1.5}, "ruleUID": "test-rule-1"}`}, }, }, }, @@ -624,7 +625,7 @@ func TestMerge(t *testing.T) { func TestRecordStates(t *testing.T) { t.Run("writes state transitions to loki", func(t *testing.T) { - req := NewFakeRequester() + req := lokiclient.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ @@ -635,14 +636,14 @@ func TestRecordStates(t *testing.T) { err := <-loki.Record(context.Background(), rule, states) require.NoError(t, err) - require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path) + require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path) }) t.Run("emits expected write metrics", func(t *testing.T) { reg := prometheus.NewRegistry() met := metrics.NewHistorianMetrics(reg, metrics.Subsystem) - loki := createTestLokiBackend(t, NewFakeRequester(), met) - errLoki := createTestLokiBackend(t, NewFakeRequester().WithResponse(badResponse()), met) //nolint:bodyclose + loki := createTestLokiBackend(t, lokiclient.NewFakeRequester(), met) + errLoki := createTestLokiBackend(t, lokiclient.NewFakeRequester().WithResponse(lokiclient.BadResponse()), met) //nolint:bodyclose rule := createTestRule() states := singleFromNormal(&state.State{ State: eval.Alerting, @@ -676,7 +677,7 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 }) t.Run("elides request if nothing to send", func(t *testing.T) { - req := NewFakeRequester() + req := lokiclient.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := []state.StateTransition{} @@ -684,11 +685,11 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 err := <-loki.Record(context.Background(), rule, states) require.NoError(t, err) - require.Nil(t, req.lastRequest) + require.Nil(t, req.LastRequest) }) t.Run("succeeds with special chars in labels", func(t *testing.T) { - req := NewFakeRequester() + req := lokiclient.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ @@ -703,15 +704,15 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 err := <-loki.Record(context.Background(), rule, states) require.NoError(t, err) - require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path) - sent := string(readBody(t, req.lastRequest)) + require.Contains(t, "/loki/api/v1/push", 
req.LastRequest.URL.Path) + sent := string(readBody(t, req.LastRequest)) require.Contains(t, sent, "contains.dot") require.Contains(t, sent, "contains=equals") require.Contains(t, sent, "contains🤔emoji") }) t.Run("adds external labels to log lines", func(t *testing.T) { - req := NewFakeRequester() + req := lokiclient.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rule := createTestRule() states := singleFromNormal(&state.State{ @@ -721,8 +722,8 @@ grafana_alerting_state_history_writes_total{backend="loki",org="1"} 2 err := <-loki.Record(context.Background(), rule, states) require.NoError(t, err) - require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path) - sent := string(readBody(t, req.lastRequest)) + require.Contains(t, "/loki/api/v1/push", req.LastRequest.URL.Path) + sent := string(readBody(t, req.LastRequest)) require.Contains(t, sent, "externalLabelKey") require.Contains(t, sent, "externalLabelValue") }) @@ -739,7 +740,7 @@ func TestGetFolderUIDsForFilter(t *testing.T) { usr := accesscontrol.BackgroundUser("test", 1, org.RoleNone, nil) createLoki := func(ac AccessControl) *RemoteLokiBackend { - req := NewFakeRequester() + req := lokiclient.NewFakeRequester() loki := createTestLokiBackend(t, req, metrics.NewHistorianMetrics(prometheus.NewRegistry(), metrics.Subsystem)) rules := fakes.NewRuleStore(t) f := make([]*folder.Folder, 0, len(folders)) @@ -881,10 +882,10 @@ func TestGetFolderUIDsForFilter(t *testing.T) { func createTestLokiBackend(t *testing.T, req client.Requester, met *metrics.Historian) *RemoteLokiBackend { url, _ := url.Parse("http://some.url") - cfg := LokiConfig{ + cfg := lokiclient.LokiConfig{ WritePathURL: url, ReadPathURL: url, - Encoder: JsonEncoder{}, + Encoder: lokiclient.JsonEncoder{}, ExternalLabels: map[string]string{"externalLabelKey": "externalLabelValue"}, } lokiBackendLogger := log.New("ngalert.state.historian", "backend", "loki") @@ -915,12 +916,12 @@ func createTestRule() history_model.RuleMeta { } } -func requireSingleEntry(t *testing.T, res Stream) LokiEntry { +func requireSingleEntry(t *testing.T, res lokiclient.Stream) LokiEntry { require.Len(t, res.Values, 1) return requireEntry(t, res.Values[0]) } -func requireEntry(t *testing.T, row Sample) LokiEntry { +func requireEntry(t *testing.T, row lokiclient.Sample) LokiEntry { t.Helper() var entry LokiEntry @@ -929,16 +930,6 @@ func requireEntry(t *testing.T, row Sample) LokiEntry { return entry } -func badResponse() *http.Response { - return &http.Response{ - Status: "400 Bad Request", - StatusCode: http.StatusBadRequest, - Body: io.NopCloser(bytes.NewBufferString("")), - ContentLength: int64(0), - Header: make(http.Header, 0), - } -} - func readBody(t *testing.T, req *http.Request) []byte { t.Helper() diff --git a/pkg/services/ngalert/state/historian/testing.go b/pkg/services/ngalert/state/historian/testing.go index 8ed51e881a2..c6cebe83219 100644 --- a/pkg/services/ngalert/state/historian/testing.go +++ b/pkg/services/ngalert/state/historian/testing.go @@ -1,43 +1,12 @@ package historian import ( - "bytes" "context" "fmt" - "io" - "net/http" "github.com/grafana/grafana/pkg/services/annotations" ) -type fakeRequester struct { - lastRequest *http.Request - resp *http.Response -} - -func NewFakeRequester() *fakeRequester { - return &fakeRequester{ - resp: &http.Response{ - Status: "200 OK", - StatusCode: 200, - Body: io.NopCloser(bytes.NewBufferString("")), - ContentLength: int64(0), - Header: make(http.Header, 0), - 
}, - } -} - -func (f *fakeRequester) WithResponse(resp *http.Response) *fakeRequester { - f.resp = resp - return f -} - -func (f *fakeRequester) Do(req *http.Request) (*http.Response, error) { - f.lastRequest = req - f.resp.Request = req // Not concurrency-safe! - return f.resp, nil -} - type failingAnnotationRepo struct{} func (f *failingAnnotationRepo) SaveMany(_ context.Context, _ []annotations.Item) error { diff --git a/pkg/setting/setting_unified_alerting.go b/pkg/setting/setting_unified_alerting.go index f78485d239e..665c751532f 100644 --- a/pkg/setting/setting_unified_alerting.go +++ b/pkg/setting/setting_unified_alerting.go @@ -64,6 +64,7 @@ const ( // DefaultRuleEvaluationInterval indicates a default interval of for how long a rule should be evaluated to change state from Pending to Alerting DefaultRuleEvaluationInterval = SchedulerBaseInterval * 6 // == 60 seconds stateHistoryDefaultEnabled = true + notificationHistoryDefaultEnabled = false lokiDefaultMaxQueryLength = 721 * time.Hour // 30d1h, matches the default value in Loki defaultRecordingRequestTimeout = 10 * time.Second lokiDefaultMaxQuerySize = 65536 // 64kb @@ -122,6 +123,7 @@ type UnifiedAlertingSettings struct { ReservedLabels UnifiedAlertingReservedLabelSettings SkipClustering bool StateHistory UnifiedAlertingStateHistorySettings + NotificationHistory UnifiedAlertingNotificationHistorySettings RemoteAlertmanager RemoteAlertmanagerSettings RecordingRules RecordingRuleSettings PrometheusConversion UnifiedAlertingPrometheusConversionSettings @@ -181,19 +183,24 @@ type UnifiedAlertingPrometheusConversionSettings struct { RuleQueryOffset time.Duration } -type UnifiedAlertingStateHistorySettings struct { - Enabled bool - Backend string +type UnifiedAlertingLokiSettings struct { LokiRemoteURL string LokiReadURL string LokiWriteURL string LokiTenantID string // LokiBasicAuthUsername and LokiBasicAuthPassword are used for basic auth // if one of them is set. - LokiBasicAuthPassword string - LokiBasicAuthUsername string - LokiMaxQueryLength time.Duration - LokiMaxQuerySize int + LokiBasicAuthPassword string + LokiBasicAuthUsername string + LokiMaxQueryLength time.Duration + LokiMaxQuerySize int + ExternalLabels map[string]string +} + +type UnifiedAlertingStateHistorySettings struct { + Enabled bool + Backend string + LokiSettings UnifiedAlertingLokiSettings PrometheusMetricName string PrometheusTargetDatasourceUID string PrometheusWriteTimeout time.Duration @@ -202,6 +209,11 @@ type UnifiedAlertingStateHistorySettings struct { ExternalLabels map[string]string } +type UnifiedAlertingNotificationHistorySettings struct { + Enabled bool + LokiSettings UnifiedAlertingLokiSettings +} + // IsEnabled returns true if UnifiedAlertingSettings.Enabled is either nil or true. // It hides the implementation details of the Enabled and simplifies its usage. 
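// Note that this reports whether Unified Alerting as a whole is enabled;
// the state history and notification history features carry their own
// Enabled flags in their respective settings structs above.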
func (u *UnifiedAlertingSettings) IsEnabled() bool { @@ -453,16 +465,18 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error { stateHistory := iniFile.Section("unified_alerting.state_history") stateHistoryLabels := iniFile.Section("unified_alerting.state_history.external_labels") uaCfgStateHistory := UnifiedAlertingStateHistorySettings{ - Enabled: stateHistory.Key("enabled").MustBool(stateHistoryDefaultEnabled), - Backend: stateHistory.Key("backend").MustString("annotations"), - LokiRemoteURL: stateHistory.Key("loki_remote_url").MustString(""), - LokiReadURL: stateHistory.Key("loki_remote_read_url").MustString(""), - LokiWriteURL: stateHistory.Key("loki_remote_write_url").MustString(""), - LokiTenantID: stateHistory.Key("loki_tenant_id").MustString(""), - LokiBasicAuthUsername: stateHistory.Key("loki_basic_auth_username").MustString(""), - LokiBasicAuthPassword: stateHistory.Key("loki_basic_auth_password").MustString(""), - LokiMaxQueryLength: stateHistory.Key("loki_max_query_length").MustDuration(lokiDefaultMaxQueryLength), - LokiMaxQuerySize: stateHistory.Key("loki_max_query_size").MustInt(lokiDefaultMaxQuerySize), + Enabled: stateHistory.Key("enabled").MustBool(stateHistoryDefaultEnabled), + Backend: stateHistory.Key("backend").MustString("annotations"), + LokiSettings: UnifiedAlertingLokiSettings{ + LokiRemoteURL: stateHistory.Key("loki_remote_url").MustString(""), + LokiReadURL: stateHistory.Key("loki_remote_read_url").MustString(""), + LokiWriteURL: stateHistory.Key("loki_remote_write_url").MustString(""), + LokiTenantID: stateHistory.Key("loki_tenant_id").MustString(""), + LokiBasicAuthUsername: stateHistory.Key("loki_basic_auth_username").MustString(""), + LokiBasicAuthPassword: stateHistory.Key("loki_basic_auth_password").MustString(""), + LokiMaxQueryLength: stateHistory.Key("loki_max_query_length").MustDuration(lokiDefaultMaxQueryLength), + LokiMaxQuerySize: stateHistory.Key("loki_max_query_size").MustInt(lokiDefaultMaxQuerySize), + }, MultiPrimary: stateHistory.Key("primary").MustString(""), MultiSecondaries: splitTrim(stateHistory.Key("secondaries").MustString(""), ","), PrometheusMetricName: stateHistory.Key("prometheus_metric_name").MustString(defaultHistorianPrometheusMetricName), @@ -472,6 +486,24 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error { } uaCfg.StateHistory = uaCfgStateHistory + notificationHistory := iniFile.Section("unified_alerting.notification_history") + notificationHistoryLabels := iniFile.Section("unified_alerting.notification_history.external_labels") + uaCfgNotificationHistory := UnifiedAlertingNotificationHistorySettings{ + Enabled: notificationHistory.Key("enabled").MustBool(notificationHistoryDefaultEnabled), + LokiSettings: UnifiedAlertingLokiSettings{ + LokiRemoteURL: notificationHistory.Key("loki_remote_url").MustString(""), + LokiReadURL: notificationHistory.Key("loki_remote_read_url").MustString(""), + LokiWriteURL: notificationHistory.Key("loki_remote_write_url").MustString(""), + LokiTenantID: notificationHistory.Key("loki_tenant_id").MustString(""), + LokiBasicAuthUsername: notificationHistory.Key("loki_basic_auth_username").MustString(""), + LokiBasicAuthPassword: notificationHistory.Key("loki_basic_auth_password").MustString(""), + LokiMaxQueryLength: notificationHistory.Key("loki_max_query_length").MustDuration(lokiDefaultMaxQueryLength), + LokiMaxQuerySize: notificationHistory.Key("loki_max_query_size").MustInt(lokiDefaultMaxQuerySize), + ExternalLabels: notificationHistoryLabels.KeysHash(), + }, 
+ } + uaCfg.NotificationHistory = uaCfgNotificationHistory + prometheusConversion := iniFile.Section("unified_alerting.prometheus_conversion") uaCfg.PrometheusConversion = UnifiedAlertingPrometheusConversionSettings{ RuleQueryOffset: prometheusConversion.Key("rule_query_offset").MustDuration(time.Minute),
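
As a sketch of how the pieces above could fit together (not part of this diff): the new notification history settings can be turned into a NotificationHistorian and handed to the new notificationHistorian parameter of NewMultiOrgAlertmanager, with nil meaning "disabled" as in the updated tests. The LokiConfig fields, metrics constructor, and NewNotificationHistorian signature mirror the test helper shown earlier; the helper name buildNotificationHistorian, the package clause, and the use of http.DefaultClient as the client.Requester are assumptions for illustration only.

package notifier // assumed location of NewNotificationHistorian

import (
	"net/http"
	"net/url"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/grafana/grafana/pkg/services/ngalert/lokiclient"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/setting"
)

// buildNotificationHistorian is a hypothetical helper, not part of this diff.
// It builds a NotificationHistorian from the [unified_alerting.notification_history]
// settings, or returns nil when the feature is disabled.
func buildNotificationHistorian(
	settings setting.UnifiedAlertingNotificationHistorySettings,
	reg prometheus.Registerer,
	tracer tracing.Tracer,
	logger log.Logger,
) (*NotificationHistorian, error) {
	if !settings.Enabled {
		// Disabled: callers pass nil to NewMultiOrgAlertmanager, as the
		// updated tests in this diff do.
		return nil, nil
	}
	writePathURL, err := url.Parse(settings.LokiSettings.LokiRemoteURL)
	if err != nil {
		return nil, err
	}
	cfg := lokiclient.LokiConfig{
		WritePathURL:   writePathURL,
		ExternalLabels: settings.LokiSettings.ExternalLabels,
		Encoder:        lokiclient.JsonEncoder{},
	}
	met := metrics.NewNotificationHistorianMetrics(reg)
	// *http.Client has Do(*http.Request) (*http.Response, error), so it is
	// assumed here to satisfy client.Requester; production wiring may differ.
	return NewNotificationHistorian(logger, cfg, http.DefaultClient, met, tracer), nil
}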