diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go
index a5a9fa894db..e113610b66d 100644
--- a/pkg/services/ngalert/ngalert.go
+++ b/pkg/services/ngalert/ngalert.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"net/url"
+	"time"
 
 	"github.com/benbjohnson/clock"
 	"golang.org/x/sync/errgroup"
@@ -143,6 +144,10 @@ type AlertNG struct {
 func (ng *AlertNG) init() error {
 	var err error
 
+	// AlertNG should be initialized before the cancellation deadline of initCtx
+	initCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancelFunc()
+
 	store := &store.DBstore{
 		Cfg:            ng.Cfg.UnifiedAlerting,
 		FeatureToggles: ng.FeatureToggles,
@@ -168,7 +173,7 @@ func (ng *AlertNG) init() error {
 	ng.imageService = imageService
 
 	// Let's make sure we're able to complete an initial sync of Alertmanagers before we start the alerting components.
-	if err := ng.MultiOrgAlertmanager.LoadAndSyncAlertmanagersForOrgs(context.Background()); err != nil {
+	if err := ng.MultiOrgAlertmanager.LoadAndSyncAlertmanagersForOrgs(initCtx); err != nil {
 		return fmt.Errorf("failed to initialize alerting because multiorg alertmanager manager failed to warm up: %w", err)
 	}
 
@@ -205,7 +210,7 @@ func (ng *AlertNG) init() error {
 		Tracer:      ng.tracer,
 	}
 
-	history, err := configureHistorianBackend(ng.Cfg.UnifiedAlerting.StateHistory, ng.annotationsRepo, ng.dashboardService, ng.store)
+	history, err := configureHistorianBackend(initCtx, ng.Cfg.UnifiedAlerting.StateHistory, ng.annotationsRepo, ng.dashboardService, ng.store)
 	if err != nil {
 		return err
 	}
@@ -223,7 +228,7 @@ func (ng *AlertNG) init() error {
 
 	// if it is required to include folder title to the alerts, we need to subscribe to changes of alert title
 	if !ng.Cfg.UnifiedAlerting.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel) {
-		subscribeToFolderChanges(ng.Log, ng.bus, store, scheduler)
+		subscribeToFolderChanges(context.Background(), ng.Log, ng.bus, store, scheduler)
 	}
 
 	ng.stateManager = stateManager
@@ -290,14 +295,14 @@ func (ng *AlertNG) init() error {
 	return DeclareFixedRoles(ng.accesscontrolService)
 }
 
-func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore api.RuleStore, scheduler schedule.ScheduleService) {
+func subscribeToFolderChanges(ctx context.Context, logger log.Logger, bus bus.Bus, dbStore api.RuleStore, scheduler schedule.ScheduleService) {
 	// if folder title is changed, we update all alert rules in that folder to make sure that all peers (in HA mode) will update folder title and
 	// clean up the current state
 	bus.AddEventListener(func(ctx context.Context, e *events.FolderTitleUpdated) error {
 		// do not block the upstream execution
 		go func(evt *events.FolderTitleUpdated) {
 			logger.Info("Got folder title updated event. updating rules in the folder", "folderUID", evt.UID)
-			updated, err := dbStore.IncreaseVersionForAllRulesInNamespace(context.Background(), evt.OrgID, evt.UID)
+			updated, err := dbStore.IncreaseVersionForAllRulesInNamespace(ctx, evt.OrgID, evt.UID)
 			if err != nil {
 				logger.Error("Failed to update alert rules in the folder after its title was changed", "error", err, "folderUID", evt.UID, "folder", evt.Title)
 				return
@@ -378,7 +383,7 @@ func readQuotaConfig(cfg *setting.Cfg) (*quota.Map, error) {
 	return limits, nil
 }
 
-func configureHistorianBackend(cfg setting.UnifiedAlertingStateHistorySettings, ar annotations.Repository, ds dashboards.DashboardService, rs historian.RuleStore) (state.Historian, error) {
+func configureHistorianBackend(ctx context.Context, cfg setting.UnifiedAlertingStateHistorySettings, ar annotations.Repository, ds dashboards.DashboardService, rs historian.RuleStore) (state.Historian, error) {
 	if !cfg.Enabled {
 		return historian.NewNopHistorian(), nil
 	}
@@ -397,7 +402,9 @@ func configureHistorianBackend(cfg setting.UnifiedAlertingStateHistorySettings,
 			BasicAuthPassword: cfg.LokiBasicAuthPassword,
 			TenantID:          cfg.LokiTenantID,
 		})
-		if err := backend.TestConnection(); err != nil {
+		testConnCtx, cancelFunc := context.WithTimeout(ctx, 10*time.Second)
+		defer cancelFunc()
+		if err := backend.TestConnection(testConnCtx); err != nil {
 			return nil, fmt.Errorf("failed to ping the remote loki historian: %w", err)
 		}
 		return backend, nil
diff --git a/pkg/services/ngalert/ngalert_test.go b/pkg/services/ngalert/ngalert_test.go
index 2d70e25c5ce..ceb93ca58ca 100644
--- a/pkg/services/ngalert/ngalert_test.go
+++ b/pkg/services/ngalert/ngalert_test.go
@@ -37,7 +37,7 @@ func Test_subscribeToFolderChanges(t *testing.T) {
 	scheduler := &schedule.FakeScheduleService{}
 	scheduler.On("UpdateAlertRule", mock.Anything, mock.Anything).Return()
 
-	subscribeToFolderChanges(log.New("test"), bus, db, scheduler)
+	subscribeToFolderChanges(context.Background(), log.New("test"), bus, db, scheduler)
 
 	err := bus.Publish(context.Background(), &events.FolderTitleUpdated{
 		Timestamp: time.Now(),
diff --git a/pkg/services/ngalert/state/historian/loki.go b/pkg/services/ngalert/state/historian/loki.go
index a37f6b40bac..466fc6137c4 100644
--- a/pkg/services/ngalert/state/historian/loki.go
+++ b/pkg/services/ngalert/state/historian/loki.go
@@ -23,8 +23,8 @@ const (
 )
 
 type remoteLokiClient interface {
-	ping() error
-	push([]stream) error
+	ping(context.Context) error
+	push(context.Context, []stream) error
 }
 
 type RemoteLokiBackend struct {
@@ -40,8 +40,8 @@ func NewRemoteLokiBackend(cfg LokiConfig) *RemoteLokiBackend {
 	}
 }
 
-func (h *RemoteLokiBackend) TestConnection() error {
-	return h.client.ping()
+func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
+	return h.client.ping(ctx)
 }
 
 func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
@@ -116,7 +116,7 @@ func (h *RemoteLokiBackend) recordStreamsAsync(ctx context.Context, streams []st
 }
 
 func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error {
-	if err := h.client.push(streams); err != nil {
+	if err := h.client.push(ctx, streams); err != nil {
 		return err
 	}
 	logger.Debug("Done saving alert state history batch")
diff --git a/pkg/services/ngalert/state/historian/loki_http.go b/pkg/services/ngalert/state/historian/loki_http.go
index 457516b0b4f..708c48965a9 100644
--- a/pkg/services/ngalert/state/historian/loki_http.go
+++ b/pkg/services/ngalert/state/historian/loki_http.go
@@ -2,6 +2,7 @@ package historian
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -37,7 +38,7 @@ func newLokiClient(cfg LokiConfig, logger log.Logger) *httpLokiClient {
 	}
 }
 
-func (c *httpLokiClient) ping() error {
+func (c *httpLokiClient) ping(ctx context.Context) error {
 	uri := c.cfg.Url.JoinPath("/loki/api/v1/labels")
 	req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
 	if err != nil {
@@ -45,6 +46,7 @@
 	}
 
 	c.setAuthAndTenantHeaders(req)
+	req = req.WithContext(ctx)
 	res, err := c.client.Do(req)
 	if res != nil {
 		defer func() {
@@ -80,7 +82,7 @@ func (r *row) MarshalJSON() ([]byte, error) {
 	})
 }
 
-func (c *httpLokiClient) push(s []stream) error {
+func (c *httpLokiClient) push(ctx context.Context, s []stream) error {
 	body := struct {
 		Streams []stream `json:"streams"`
 	}{Streams: s}
@@ -98,6 +100,7 @@
 	c.setAuthAndTenantHeaders(req)
 	req.Header.Add("content-type", "application/json")
 
+	req = req.WithContext(ctx)
 	resp, err := c.client.Do(req)
 	if resp != nil {
 		defer func() {
diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/state/historian/loki_http_test.go
index 89947ccd8fc..f9ec280e4f1 100644
--- a/pkg/services/ngalert/state/historian/loki_http_test.go
+++ b/pkg/services/ngalert/state/historian/loki_http_test.go
@@ -1,6 +1,7 @@
 package historian
 
 import (
+	"context"
 	"net/url"
 	"testing"
 
@@ -21,7 +22,7 @@ func TestLokiHTTPClient(t *testing.T) {
 		}, log.NewNopLogger())
 
 		// Unauthorized request should fail against Grafana Cloud.
-		err = client.ping()
+		err = client.ping(context.Background())
 		require.Error(t, err)
 
 		client.cfg.BasicAuthUser = ""
@@ -32,7 +33,7 @@
 		// client.cfg.TenantID = ""
 
 		// Authorized request should not fail against Grafana Cloud.
-		err = client.ping()
+		err = client.ping(context.Background())
 		require.NoError(t, err)
 	})
 }
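
Taken together, the hunks above thread one deadline through startup: init() creates a 30-second initCtx, configureHistorianBackend derives a 10-second testConnCtx from it, and httpLokiClient attaches the context to its outgoing requests, so an unreachable Loki endpoint can no longer block Grafana startup indefinitely. Below is a minimal, self-contained sketch of that pattern, illustrative only and not Grafana code: pingLoki and the localhost URL are hypothetical stand-ins, and http.NewRequestWithContext is the one-step equivalent of the http.NewRequest plus req.WithContext(ctx) pair used in the patch.

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// pingLoki is a hypothetical stand-in for httpLokiClient.ping: the request
// carries ctx, so it is abandoned as soon as the deadline expires.
func pingLoki(ctx context.Context, base string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/loki/api/v1/labels", nil)
	if err != nil {
		return fmt.Errorf("error creating request: %w", err)
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		_ = res.Body.Close()
	}()
	if res.StatusCode < 200 || res.StatusCode >= 300 {
		return fmt.Errorf("unexpected status: %s", res.Status)
	}
	return nil
}

func main() {
	// Outer bound: the whole init phase must finish within 30s (like initCtx).
	initCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Inner bound: the connection test alone gets 10s (like testConnCtx).
	testConnCtx, cancelTest := context.WithTimeout(initCtx, 10*time.Second)
	defer cancelTest()

	if err := pingLoki(testConnCtx, "http://localhost:3100"); err != nil {
		fmt.Println("loki ping failed:", err)
		return
	}
	fmt.Println("loki is reachable")
}

Because the inner context is derived from initCtx rather than context.Background(), the connection test is canceled by whichever fires first: its own 10-second timeout or the 30-second initialization deadline. That is the same reasoning behind passing ctx, not context.Background(), into context.WithTimeout in configureHistorianBackend.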