diff --git a/conf/defaults.ini b/conf/defaults.ini index 3f13b0ec1a3..001cbbce61c 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -1302,6 +1302,10 @@ loki_basic_auth_username = # Optional password for basic authentication on requests sent to Loki. Can be left blank. loki_basic_auth_password = +# For "loki" only. +# Optional max query length for queries sent to Loki. Default is 721h which matches the default Loki value. +loki_max_query_length = 721h + [unified_alerting.state_history.external_labels] # Optional extra labels to attach to outbound state history records or log streams. # Any number of label key-value-pairs can be provided. diff --git a/conf/sample.ini b/conf/sample.ini index f62b1df8c25..1c52da72f8d 100644 --- a/conf/sample.ini +++ b/conf/sample.ini @@ -1195,6 +1195,10 @@ # Optional password for basic authentication on requests sent to Loki. Can be left blank. ; loki_basic_auth_password = "mypass" +# For "loki" only. +# Optional max query length for queries sent to Loki. Default is 721h which matches the default Loki value. +; loki_max_query_length = 360h + [unified_alerting.state_history.external_labels] # Optional extra labels to attach to outbound state history records or log streams. # Any number of label key-value-pairs can be provided. diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go index 6ed3933de88..56b999da54f 100644 --- a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go +++ b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go @@ -88,7 +88,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { t.Run("can query history by alert id", func(t *testing.T) { rule := dashboardRules[dashboard1.UID][0] - fakeLokiClient.Response = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []historian.Stream{ historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()), } @@ -113,7 +113,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("can query history by dashboard id", func(t *testing.T) { - fakeLokiClient.Response = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []historian.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -139,7 +139,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("should return empty results when type is annotation", func(t *testing.T) { - fakeLokiClient.Response = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []historian.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -163,7 +163,7 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { }) t.Run("should return empty results when history is outside time range", func(t *testing.T) { - fakeLokiClient.Response = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []historian.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -188,8 +188,41 @@ func TestIntegrationAlertStateHistoryStore(t *testing.T) { require.Len(t, res, 0) }) + t.Run("should return partial results when history is partly outside clamped time range", func(t *testing.T) { + fakeLokiClient.rangeQueryRes = []historian.Stream{ + historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), + historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), + } + + // clamp time range to 1 second + oldMax := fakeLokiClient.cfg.MaxQueryLength + fakeLokiClient.cfg.MaxQueryLength = 1 * time.Second + + query := annotations.ItemQuery{ + OrgID: 1, + DashboardID: dashboard1.ID, + From: start.Add(-1 * time.Second).UnixMilli(), // should clamp to start + To: start.Add(1 * time.Second).UnixMilli(), + } + res, err := store.Get( + context.Background(), + &query, + &annotation_ac.AccessResources{ + Dashboards: map[string]int64{ + dashboard1.UID: dashboard1.ID, + }, + CanAccessDashAnnotations: true, + }, + ) + require.NoError(t, err) + require.Len(t, res, 2) + + // restore original max query length + fakeLokiClient.cfg.MaxQueryLength = oldMax + }) + t.Run("should sort history by time", func(t *testing.T) { - fakeLokiClient.Response = []historian.Stream{ + fakeLokiClient.rangeQueryRes = []historian.Stream{ historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()), historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()), } @@ -716,11 +749,11 @@ func compareAnnotationItem(t *testing.T, expected, actual *annotations.ItemDTO) } type FakeLokiClient struct { - client client.Requester - cfg historian.LokiConfig - metrics *metrics.Historian - log log.Logger - Response []historian.Stream + client client.Requester + cfg historian.LokiConfig + metrics *metrics.Historian + log log.Logger + rangeQueryRes []historian.Stream } func NewFakeLokiClient() *FakeLokiClient { @@ -731,19 +764,23 @@ func NewFakeLokiClient() *FakeLokiClient { return &FakeLokiClient{ client: client.NewTimedClient(req, metrics.WriteDuration), cfg: historian.LokiConfig{ - WritePathURL: url, - ReadPathURL: url, - Encoder: historian.JsonEncoder{}, + WritePathURL: url, + ReadPathURL: url, + Encoder: historian.JsonEncoder{}, + MaxQueryLength: 721 * time.Hour, }, metrics: metrics, log: log.New("ngalert.state.historian", "backend", "loki"), } } -func (c *FakeLokiClient) RangeQuery(_ context.Context, _ string, from, to, _ int64) (historian.QueryRes, error) { - streams := make([]historian.Stream, len(c.Response)) +func (c *FakeLokiClient) RangeQuery(ctx context.Context, query string, from, to, limit int64) (historian.QueryRes, error) { + streams := make([]historian.Stream, len(c.rangeQueryRes)) + + // clamp time range using logic from historian + from, to = historian.ClampRange(from, to, c.cfg.MaxQueryLength.Nanoseconds()) - for n, stream := range c.Response { + for n, stream := range c.rangeQueryRes { streams[n].Stream = stream.Stream streams[n].Values = []historian.Sample{} for _, sample := range stream.Values { @@ -759,8 +796,9 @@ func (c *FakeLokiClient) RangeQuery(_ context.Context, _ string, from, to, _ int Result: streams, }, } + // reset expected streams on read - c.Response = []historian.Stream{} + c.rangeQueryRes = []historian.Stream{} return res, nil } diff --git a/pkg/services/ngalert/state/historian/loki_http.go b/pkg/services/ngalert/state/historian/loki_http.go index 89dde820476..44a7c0044eb 100644 --- a/pkg/services/ngalert/state/historian/loki_http.go +++ b/pkg/services/ngalert/state/historian/loki_http.go @@ -40,6 +40,7 @@ type LokiConfig struct { TenantID string ExternalLabels map[string]string Encoder encoder + MaxQueryLength time.Duration } func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, error) { @@ -74,6 +75,7 @@ func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, BasicAuthPassword: cfg.LokiBasicAuthPassword, TenantID: cfg.LokiTenantID, ExternalLabels: cfg.ExternalLabels, + MaxQueryLength: cfg.LokiMaxQueryLength, // Snappy-compressed protobuf is the default, same goes for Promtail. Encoder: SnappyProtoEncoder{}, }, nil @@ -193,26 +195,20 @@ func (c *HttpLokiClient) Push(ctx context.Context, s []Stream) error { c.metrics.BytesWritten.Add(float64(len(enc))) req = req.WithContext(ctx) resp, err := c.client.Do(req) - if resp != nil { - defer func() { - if err := resp.Body.Close(); err != nil { - c.log.Warn("Failed to close response body", "err", err) - } - }() - } if err != nil { return fmt.Errorf("failed to send request: %w", err) } - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - byt, _ := io.ReadAll(resp.Body) - if len(byt) > 0 { - c.log.Error("Error response from Loki", "response", string(byt), "status", resp.StatusCode) - } else { - c.log.Error("Error response from Loki with an empty body", "status", resp.StatusCode) + defer func() { + if err := resp.Body.Close(); err != nil { + c.log.Warn("Failed to close response body", "err", err) } - return fmt.Errorf("received a non-200 response from loki, status: %d", resp.StatusCode) + }() + + _, err = c.handleLokiResponse(resp) + if err != nil { + return err } + return nil } @@ -231,6 +227,7 @@ func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, en if start > end { return QueryRes{}, fmt.Errorf("start time cannot be after end time") } + start, end = ClampRange(start, end, c.cfg.MaxQueryLength.Nanoseconds()) if limit < 1 { limit = defaultPageSize } @@ -261,23 +258,15 @@ func (c *HttpLokiClient) RangeQuery(ctx context.Context, logQL string, start, en if err != nil { return QueryRes{}, fmt.Errorf("error executing request: %w", err) } - defer func() { - _ = res.Body.Close() + if err := res.Body.Close(); err != nil { + c.log.Warn("Failed to close response body", "err", err) + } }() - data, err := io.ReadAll(res.Body) + data, err := c.handleLokiResponse(res) if err != nil { - return QueryRes{}, fmt.Errorf("error reading request response: %w", err) - } - - if res.StatusCode < 200 || res.StatusCode >= 300 { - if len(data) > 0 { - c.log.Error("Error response from Loki", "response", string(data), "status", res.StatusCode) - } else { - c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode) - } - return QueryRes{}, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode) + return QueryRes{}, err } result := QueryRes{} @@ -297,3 +286,36 @@ type QueryRes struct { type QueryData struct { Result []Stream `json:"result"` } + +func (c *HttpLokiClient) handleLokiResponse(res *http.Response) ([]byte, error) { + if res == nil { + return nil, fmt.Errorf("response is nil") + } + + data, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("error reading request response: %w", err) + } + + if res.StatusCode < 200 || res.StatusCode >= 300 { + if len(data) > 0 { + c.log.Error("Error response from Loki", "response", string(data), "status", res.StatusCode) + } else { + c.log.Error("Error response from Loki with an empty body", "status", res.StatusCode) + } + return nil, fmt.Errorf("received a non-200 response from loki, status: %d", res.StatusCode) + } + + return data, nil +} + +// ClampRange ensures that the time range is within the configured maximum query length. +func ClampRange(start, end, maxTimeRange int64) (newStart int64, newEnd int64) { + newStart, newEnd = start, end + + if maxTimeRange != 0 && end-start > maxTimeRange { + newStart = end - maxTimeRange + } + + return newStart, newEnd +} diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/state/historian/loki_http_test.go index f180dd5e2f7..961f3238097 100644 --- a/pkg/services/ngalert/state/historian/loki_http_test.go +++ b/pkg/services/ngalert/state/historian/loki_http_test.go @@ -338,6 +338,49 @@ func TestStream(t *testing.T) { }) } +func TestClampRange(t *testing.T) { + tc := []struct { + name string + oldRange []int64 + max int64 + newRange []int64 + }{ + { + name: "clamps start value if max is smaller than range", + oldRange: []int64{5, 10}, + max: 1, + newRange: []int64{9, 10}, + }, + { + name: "returns same values if max is greater than range", + oldRange: []int64{5, 10}, + max: 20, + newRange: []int64{5, 10}, + }, + { + name: "returns same values if max is equal to range", + oldRange: []int64{5, 10}, + max: 5, + newRange: []int64{5, 10}, + }, + { + name: "returns same values if max is zero", + oldRange: []int64{5, 10}, + max: 0, + newRange: []int64{5, 10}, + }, + } + + for _, c := range tc { + t.Run(c.name, func(t *testing.T) { + start, end := ClampRange(c.oldRange[0], c.oldRange[1], c.max) + + require.Equal(t, c.newRange[0], start) + require.Equal(t, c.newRange[1], end) + }) + } +} + func createTestLokiClient(req client.Requester) *HttpLokiClient { url, _ := url.Parse("http://some.url") cfg := LokiConfig{ diff --git a/pkg/setting/setting_unified_alerting.go b/pkg/setting/setting_unified_alerting.go index e2508bf5336..eaca5d11afc 100644 --- a/pkg/setting/setting_unified_alerting.go +++ b/pkg/setting/setting_unified_alerting.go @@ -60,6 +60,7 @@ const ( // DefaultRuleEvaluationInterval indicates a default interval of for how long a rule should be evaluated to change state from Pending to Alerting DefaultRuleEvaluationInterval = SchedulerBaseInterval * 6 // == 60 seconds stateHistoryDefaultEnabled = true + lokiDefaultMaxQueryLength = 721 * time.Hour // 30d1h, matches the default value in Loki ) type UnifiedAlertingSettings struct { @@ -134,6 +135,7 @@ type UnifiedAlertingStateHistorySettings struct { // if one of them is set. LokiBasicAuthPassword string LokiBasicAuthUsername string + LokiMaxQueryLength time.Duration MultiPrimary string MultiSecondaries []string ExternalLabels map[string]string @@ -362,6 +364,7 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error { LokiTenantID: stateHistory.Key("loki_tenant_id").MustString(""), LokiBasicAuthUsername: stateHistory.Key("loki_basic_auth_username").MustString(""), LokiBasicAuthPassword: stateHistory.Key("loki_basic_auth_password").MustString(""), + LokiMaxQueryLength: stateHistory.Key("loki_max_query_length").MustDuration(lokiDefaultMaxQueryLength), MultiPrimary: stateHistory.Key("primary").MustString(""), MultiSecondaries: splitTrim(stateHistory.Key("secondaries").MustString(""), ","), ExternalLabels: stateHistoryLabels.KeysHash(),