Alerting: Send configuration and state to the remote Alertmanager on shutdown (#78682)

* Alerting: Send configuration and state to the remote Alertmanager on shutdown

* Alerting: Add a sync interval for ApplyConfig in remote secondary mode

* add routine to sync states and configs

* pass a cancellable context to syncRoutine(), remove tests for ApplyConfig, cache last config in memory

* extract logic to update config and state in the remote Alertmanager

* get latest config from the database

* avoid using separate goroutine for updating state and config

* clean up PR

* refactor, comments, tests

* update tests

* remove canceled context from calls to StopAndWait()

* create context with timeout and send config and state to remote Alertmanager

* update tests

* address code review comments
pull/79494/head
Santiago 1 year ago committed by GitHub
parent 44e781a00b
commit 23b4568597
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/services/ngalert/remote/alertmanager.go
  2. 52
      pkg/services/ngalert/remote/forked_alertmanager_test.go
  3. 34
      pkg/services/ngalert/remote/remote_secondary_forked_alertmanager.go

@ -336,8 +336,6 @@ func (am *Alertmanager) TestTemplate(ctx context.Context, c apimodels.TestTempla
// In the context of a "remote Alertmanager" it is a good heuristic that Grafana is about to shut down or that this Alertmanager is no longer needed.
func (am *Alertmanager) StopAndWait() {
// Stop the sender; this is the only shutdown work done here.
am.sender.Stop()
// Configuration and state are not uploaded from this method.
}
func (am *Alertmanager) Ready() bool {

@ -323,11 +323,41 @@ func TestForkedAlertmanager_ModeRemoteSecondary(t *testing.T) {
})
t.Run("StopAndWait", func(tt *testing.T) {
// StopAndWait should be called on both Alertmanagers.
internal, remote, forked := genTestAlertmanagers(tt, modeRemoteSecondary)
internal.EXPECT().StopAndWait().Once()
remote.EXPECT().StopAndWait().Once()
forked.StopAndWait()
{
// StopAndWait should be called in both Alertmanagers.
// Methods to sync the Alertmanagers should be called on the remote Alertmanager.
internal, remote, forked := genTestAlertmanagers(tt, modeRemoteSecondary)
internal.EXPECT().StopAndWait().Once()
remote.EXPECT().StopAndWait().Once()
remote.EXPECT().CompareAndSendConfiguration(mock.Anything, mock.Anything).Return(nil).Once()
remote.EXPECT().CompareAndSendState(mock.Anything).Return(nil).Once()
forked.StopAndWait()
}
{
// An error in the remote Alertmanager shouldn't be a problem.
// These errors are caught and logged.
internal, remote, forked := genTestAlertmanagers(tt, modeRemoteSecondary)
internal.EXPECT().StopAndWait().Once()
remote.EXPECT().StopAndWait().Once()
remote.EXPECT().CompareAndSendConfiguration(mock.Anything, mock.Anything).Return(expErr).Once()
remote.EXPECT().CompareAndSendState(mock.Anything).Return(expErr).Once()
forked.StopAndWait()
}
{
// An error when retrieving the configuration should cause
// CompareAndSendConfiguration not to be called.
internal, remote, forked := genTestAlertmanagers(tt, modeRemoteSecondary)
secondaryForked, ok := forked.(*RemoteSecondaryForkedAlertmanager)
require.True(t, ok)
secondaryForked.store = &errConfigStore{}
internal.EXPECT().StopAndWait().Once()
remote.EXPECT().StopAndWait().Once()
remote.EXPECT().CompareAndSendState(mock.Anything).Return(expErr).Once()
forked.StopAndWait()
}
})
t.Run("Ready", func(tt *testing.T) {
@ -583,9 +613,14 @@ func genTestAlertmanagersWithSyncInterval(t *testing.T, mode int, syncInterval t
remote := remote_alertmanager_mock.NewRemoteAlertmanagerMock(t)
if mode == modeRemoteSecondary {
configs := map[int64]*models.AlertConfiguration{
1: {},
}
cfg := RemoteSecondaryConfig{
Logger: log.NewNopLogger(),
SyncInterval: syncInterval,
OrgID: 1,
Store: notifier.NewFakeConfigStore(t, configs),
}
forked, err := NewRemoteSecondaryForkedAlertmanager(cfg, internal, remote)
require.NoError(t, err)
@ -593,3 +628,10 @@ func genTestAlertmanagersWithSyncInterval(t *testing.T, mode int, syncInterval t
}
return internal, remote, NewRemotePrimaryForkedAlertmanager(internal, remote)
}
// errConfigStore is a test double for the configuration store: every lookup fails.
type errConfigStore struct{}

// GetLatestAlertmanagerConfiguration ignores both arguments and always returns
// a nil configuration together with a non-nil error.
func (*errConfigStore) GetLatestAlertmanagerConfiguration(_ context.Context, _ int64) (*models.AlertConfiguration, error) {
	return nil, errors.New("test error")
}

@ -12,6 +12,10 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
)
// configStore is the storage dependency needed by the remote secondary forked
// Alertmanager: a way to look up the latest Alertmanager configuration of an
// organization.
type configStore interface {
// GetLatestAlertmanagerConfiguration returns the latest stored Alertmanager
// configuration for the organization identified by orgID, or an error.
GetLatestAlertmanagerConfiguration(ctx context.Context, orgID int64) (*models.AlertConfiguration, error)
}
//go:generate mockery --name remoteAlertmanager --structname RemoteAlertmanagerMock --with-expecter --output mock --outpkg alertmanager_mock
type remoteAlertmanager interface {
notifier.Alertmanager
@ -20,7 +24,9 @@ type remoteAlertmanager interface {
}
type RemoteSecondaryForkedAlertmanager struct {
log log.Logger
log log.Logger
orgID int64
store configStore
internal notifier.Alertmanager
remote remoteAlertmanager
@ -30,10 +36,13 @@ type RemoteSecondaryForkedAlertmanager struct {
}
// RemoteSecondaryConfig groups the settings passed to
// NewRemoteSecondaryForkedAlertmanager.
type RemoteSecondaryConfig struct {
// Logger is the logger the forked Alertmanager uses to report errors.
Logger log.Logger
// OrgID is the organization whose latest configuration is read from Store.
OrgID int64
// Store is used to fetch the latest Alertmanager configuration.
Store configStore
// SyncInterval determines how often we should attempt to synchronize
// state and configuration on the external Alertmanager.
SyncInterval time.Duration
Logger log.Logger
}
func (c *RemoteSecondaryConfig) Validate() error {
@ -49,6 +58,8 @@ func NewRemoteSecondaryForkedAlertmanager(cfg RemoteSecondaryConfig, internal no
}
return &RemoteSecondaryForkedAlertmanager{
log: cfg.Logger,
orgID: cfg.OrgID,
store: cfg.Store,
internal: internal,
remote: remote,
syncInterval: cfg.SyncInterval,
@ -160,9 +171,26 @@ func (fam *RemoteSecondaryForkedAlertmanager) CleanUp() {
}
// StopAndWait stops the internal and the remote Alertmanager and then makes a
// best-effort attempt to push the state and the latest stored configuration to
// the remote Alertmanager. Errors during that sync are logged, never returned.
func (fam *RemoteSecondaryForkedAlertmanager) StopAndWait() {
	// Shut down the internal Alertmanager first, then our alert senders.
	fam.internal.StopAndWait()
	fam.remote.StopAndWait()

	// The final sync intentionally runs on a context without deadline or
	// cancellation so that it is allowed to finish regardless of time.
	ctx := context.TODO()

	// State goes first; a failure here must not prevent the configuration sync.
	stateErr := fam.remote.CompareAndSendState(ctx)
	if stateErr != nil {
		fam.log.Error("Error sending state to the remote Alertmanager while stopping", "err", stateErr)
	}

	// Without a configuration from the store there is nothing to send.
	latest, err := fam.store.GetLatestAlertmanagerConfiguration(ctx, fam.orgID)
	if err != nil {
		fam.log.Error("Error getting latest Alertmanager configuration while stopping", "err", err)
	} else if sendErr := fam.remote.CompareAndSendConfiguration(ctx, latest); sendErr != nil {
		fam.log.Error("Error sending configuration to the remote Alertmanager while stopping", "err", sendErr)
	}
}
func (fam *RemoteSecondaryForkedAlertmanager) Ready() bool {

Loading…
Cancel
Save