Alerting: Support context.Context in Loki interface (#61979)

This commit adds support for canceleable contexts in the Loki
interface.
pull/62174/head
George Robinson 2 years ago committed by GitHub
parent e6e560e3ed
commit a7eab8e46e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      pkg/services/ngalert/ngalert.go
  2. 2
      pkg/services/ngalert/ngalert_test.go
  3. 10
      pkg/services/ngalert/state/historian/loki.go
  4. 7
      pkg/services/ngalert/state/historian/loki_http.go
  5. 5
      pkg/services/ngalert/state/historian/loki_http_test.go

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/url"
"time"
"github.com/benbjohnson/clock"
"golang.org/x/sync/errgroup"
@ -143,6 +144,10 @@ type AlertNG struct {
func (ng *AlertNG) init() error {
var err error
// AlertNG should be initialized before the cancellation deadline of initCtx
initCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFunc()
store := &store.DBstore{
Cfg: ng.Cfg.UnifiedAlerting,
FeatureToggles: ng.FeatureToggles,
@ -168,7 +173,7 @@ func (ng *AlertNG) init() error {
ng.imageService = imageService
// Let's make sure we're able to complete an initial sync of Alertmanagers before we start the alerting components.
if err := ng.MultiOrgAlertmanager.LoadAndSyncAlertmanagersForOrgs(context.Background()); err != nil {
if err := ng.MultiOrgAlertmanager.LoadAndSyncAlertmanagersForOrgs(initCtx); err != nil {
return fmt.Errorf("failed to initialize alerting because multiorg alertmanager manager failed to warm up: %w", err)
}
@ -205,7 +210,7 @@ func (ng *AlertNG) init() error {
Tracer: ng.tracer,
}
history, err := configureHistorianBackend(ng.Cfg.UnifiedAlerting.StateHistory, ng.annotationsRepo, ng.dashboardService, ng.store)
history, err := configureHistorianBackend(initCtx, ng.Cfg.UnifiedAlerting.StateHistory, ng.annotationsRepo, ng.dashboardService, ng.store)
if err != nil {
return err
}
@ -223,7 +228,7 @@ func (ng *AlertNG) init() error {
// if it is required to include folder title to the alerts, we need to subscribe to changes of alert title
if !ng.Cfg.UnifiedAlerting.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel) {
subscribeToFolderChanges(ng.Log, ng.bus, store, scheduler)
subscribeToFolderChanges(context.Background(), ng.Log, ng.bus, store, scheduler)
}
ng.stateManager = stateManager
@ -290,14 +295,14 @@ func (ng *AlertNG) init() error {
return DeclareFixedRoles(ng.accesscontrolService)
}
func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore api.RuleStore, scheduler schedule.ScheduleService) {
func subscribeToFolderChanges(ctx context.Context, logger log.Logger, bus bus.Bus, dbStore api.RuleStore, scheduler schedule.ScheduleService) {
// if folder title is changed, we update all alert rules in that folder to make sure that all peers (in HA mode) will update folder title and
// clean up the current state
bus.AddEventListener(func(ctx context.Context, e *events.FolderTitleUpdated) error {
// do not block the upstream execution
go func(evt *events.FolderTitleUpdated) {
logger.Info("Got folder title updated event. updating rules in the folder", "folderUID", evt.UID)
updated, err := dbStore.IncreaseVersionForAllRulesInNamespace(context.Background(), evt.OrgID, evt.UID)
updated, err := dbStore.IncreaseVersionForAllRulesInNamespace(ctx, evt.OrgID, evt.UID)
if err != nil {
logger.Error("Failed to update alert rules in the folder after its title was changed", "error", err, "folderUID", evt.UID, "folder", evt.Title)
return
@ -378,7 +383,7 @@ func readQuotaConfig(cfg *setting.Cfg) (*quota.Map, error) {
return limits, nil
}
func configureHistorianBackend(cfg setting.UnifiedAlertingStateHistorySettings, ar annotations.Repository, ds dashboards.DashboardService, rs historian.RuleStore) (state.Historian, error) {
func configureHistorianBackend(ctx context.Context, cfg setting.UnifiedAlertingStateHistorySettings, ar annotations.Repository, ds dashboards.DashboardService, rs historian.RuleStore) (state.Historian, error) {
if !cfg.Enabled {
return historian.NewNopHistorian(), nil
}
@ -397,7 +402,9 @@ func configureHistorianBackend(cfg setting.UnifiedAlertingStateHistorySettings,
BasicAuthPassword: cfg.LokiBasicAuthPassword,
TenantID: cfg.LokiTenantID,
})
if err := backend.TestConnection(); err != nil {
testConnCtx, cancelFunc := context.WithTimeout(ctx, 10*time.Second)
defer cancelFunc()
if err := backend.TestConnection(testConnCtx); err != nil {
return nil, fmt.Errorf("failed to ping the remote loki historian: %w", err)
}
return backend, nil

@ -37,7 +37,7 @@ func Test_subscribeToFolderChanges(t *testing.T) {
scheduler := &schedule.FakeScheduleService{}
scheduler.On("UpdateAlertRule", mock.Anything, mock.Anything).Return()
subscribeToFolderChanges(log.New("test"), bus, db, scheduler)
subscribeToFolderChanges(context.Background(), log.New("test"), bus, db, scheduler)
err := bus.Publish(context.Background(), &events.FolderTitleUpdated{
Timestamp: time.Now(),

@ -23,8 +23,8 @@ const (
)
type remoteLokiClient interface {
ping() error
push([]stream) error
ping(context.Context) error
push(context.Context, []stream) error
}
type RemoteLokiBackend struct {
@ -40,8 +40,8 @@ func NewRemoteLokiBackend(cfg LokiConfig) *RemoteLokiBackend {
}
}
func (h *RemoteLokiBackend) TestConnection() error {
return h.client.ping()
func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
return h.client.ping(ctx)
}
func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
@ -116,7 +116,7 @@ func (h *RemoteLokiBackend) recordStreamsAsync(ctx context.Context, streams []st
}
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error {
if err := h.client.push(streams); err != nil {
if err := h.client.push(ctx, streams); err != nil {
return err
}
logger.Debug("Done saving alert state history batch")

@ -2,6 +2,7 @@ package historian
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
@ -37,7 +38,7 @@ func newLokiClient(cfg LokiConfig, logger log.Logger) *httpLokiClient {
}
}
func (c *httpLokiClient) ping() error {
func (c *httpLokiClient) ping(ctx context.Context) error {
uri := c.cfg.Url.JoinPath("/loki/api/v1/labels")
req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
if err != nil {
@ -45,6 +46,7 @@ func (c *httpLokiClient) ping() error {
}
c.setAuthAndTenantHeaders(req)
req = req.WithContext(ctx)
res, err := c.client.Do(req)
if res != nil {
defer func() {
@ -80,7 +82,7 @@ func (r *row) MarshalJSON() ([]byte, error) {
})
}
func (c *httpLokiClient) push(s []stream) error {
func (c *httpLokiClient) push(ctx context.Context, s []stream) error {
body := struct {
Streams []stream `json:"streams"`
}{Streams: s}
@ -98,6 +100,7 @@ func (c *httpLokiClient) push(s []stream) error {
c.setAuthAndTenantHeaders(req)
req.Header.Add("content-type", "application/json")
req = req.WithContext(ctx)
resp, err := c.client.Do(req)
if resp != nil {
defer func() {

@ -1,6 +1,7 @@
package historian
import (
"context"
"net/url"
"testing"
@ -21,7 +22,7 @@ func TestLokiHTTPClient(t *testing.T) {
}, log.NewNopLogger())
// Unauthorized request should fail against Grafana Cloud.
err = client.ping()
err = client.ping(context.Background())
require.Error(t, err)
client.cfg.BasicAuthUser = "<your_username>"
@ -32,7 +33,7 @@ func TestLokiHTTPClient(t *testing.T) {
// client.cfg.TenantID = "<your_tenant_id>"
// Authorized request should fail against Grafana Cloud.
err = client.ping()
err = client.ping(context.Background())
require.NoError(t, err)
})
}

Loading…
Cancel
Save