mirror of https://github.com/grafana/grafana
Alerting: create wrapper for Alertmanager to enable org level isolation (#37320)
Introduces org-level isolation for the Alertmanager and its components. Silences, Alerts and Contact points are now separated by org and are not shared between them. Co-authored with @davidmparrott and @papagian
pull/38485/merge
parent
7ebf4027a7
commit
7fbeefc090
@ -0,0 +1,147 @@ |
||||
package notifier |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log" |
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics" |
||||
"github.com/grafana/grafana/pkg/services/ngalert/store" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
) |
||||
|
||||
// SyncOrgsPollInterval is how long Run waits between two consecutive
// synchronizations of the per-org Alertmanagers. It is a variable (not a
// constant) so that tests can override it.
var SyncOrgsPollInterval = 1 * time.Minute
||||
|
||||
// Sentinel errors returned by MultiOrgAlertmanager.AlertmanagerFor.
var (
	// ErrNoAlertmanagerForOrg is returned when no Alertmanager is tracked
	// for the requested organization.
	ErrNoAlertmanagerForOrg = fmt.Errorf("Alertmanager does not exist for this organization")

	// ErrAlertmanagerNotReady is returned when the organization has an
	// Alertmanager but it does not report itself as ready.
	ErrAlertmanagerNotReady = fmt.Errorf("Alertmanager is not ready yet")
)
||||
|
||||
type MultiOrgAlertmanager struct { |
||||
alertmanagersMtx sync.RWMutex |
||||
alertmanagers map[int64]*Alertmanager |
||||
|
||||
settings *setting.Cfg |
||||
logger log.Logger |
||||
|
||||
configStore store.AlertingStore |
||||
orgStore store.OrgStore |
||||
|
||||
orgRegistry *metrics.OrgRegistries |
||||
} |
||||
|
||||
func NewMultiOrgAlertmanager(cfg *setting.Cfg, configStore store.AlertingStore, orgStore store.OrgStore) *MultiOrgAlertmanager { |
||||
return &MultiOrgAlertmanager{ |
||||
settings: cfg, |
||||
logger: log.New("multiorg.alertmanager"), |
||||
alertmanagers: map[int64]*Alertmanager{}, |
||||
configStore: configStore, |
||||
orgStore: orgStore, |
||||
orgRegistry: metrics.NewOrgRegistries(), |
||||
} |
||||
} |
||||
|
||||
func (moa *MultiOrgAlertmanager) Run(ctx context.Context) error { |
||||
moa.logger.Info("starting MultiOrg Alertmanager") |
||||
|
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
moa.StopAndWait() |
||||
return nil |
||||
case <-time.After(SyncOrgsPollInterval): |
||||
if err := moa.LoadAndSyncAlertmanagersForOrgs(ctx); err != nil { |
||||
moa.logger.Error("error while synchronizing Alertmanager orgs", "err", err) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (moa *MultiOrgAlertmanager) LoadAndSyncAlertmanagersForOrgs(ctx context.Context) error { |
||||
moa.logger.Debug("synchronizing Alertmanagers for orgs") |
||||
// First, load all the organizations from the database.
|
||||
orgIDs, err := moa.orgStore.GetOrgs(ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Then, sync them by creating or deleting Alertmanagers as necessary.
|
||||
moa.SyncAlertmanagersForOrgs(orgIDs) |
||||
|
||||
moa.logger.Debug("done synchronizing Alertmanagers for orgs") |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (moa *MultiOrgAlertmanager) SyncAlertmanagersForOrgs(orgIDs []int64) { |
||||
orgsFound := make(map[int64]struct{}, len(orgIDs)) |
||||
moa.alertmanagersMtx.Lock() |
||||
for _, orgID := range orgIDs { |
||||
orgsFound[orgID] = struct{}{} |
||||
|
||||
existing, found := moa.alertmanagers[orgID] |
||||
if !found { |
||||
reg := moa.orgRegistry.GetOrCreateOrgRegistry(orgID) |
||||
am, err := newAlertmanager(orgID, moa.settings, moa.configStore, metrics.NewMetrics(reg)) |
||||
if err != nil { |
||||
moa.logger.Error("unable to create Alertmanager for org", "org", orgID, "err", err) |
||||
} |
||||
moa.alertmanagers[orgID] = am |
||||
existing = am |
||||
} |
||||
|
||||
//TODO: This will create an N+1 query
|
||||
if err := existing.SyncAndApplyConfigFromDatabase(); err != nil { |
||||
moa.logger.Error("failed to apply Alertmanager config for org", "org", orgID, "err", err) |
||||
} |
||||
} |
||||
|
||||
amsToStop := map[int64]*Alertmanager{} |
||||
for orgId, am := range moa.alertmanagers { |
||||
if _, exists := orgsFound[orgId]; !exists { |
||||
amsToStop[orgId] = am |
||||
delete(moa.alertmanagers, orgId) |
||||
moa.orgRegistry.RemoveOrgRegistry(orgId) |
||||
} |
||||
} |
||||
moa.alertmanagersMtx.Unlock() |
||||
|
||||
// Now, we can stop the Alertmanagers without having to hold a lock.
|
||||
for orgID, am := range amsToStop { |
||||
moa.logger.Info("stopping Alertmanager", "org", orgID) |
||||
am.StopAndWait() |
||||
moa.logger.Info("stopped Alertmanager", "org", orgID) |
||||
} |
||||
} |
||||
|
||||
func (moa *MultiOrgAlertmanager) StopAndWait() { |
||||
moa.alertmanagersMtx.Lock() |
||||
defer moa.alertmanagersMtx.Unlock() |
||||
|
||||
for _, am := range moa.alertmanagers { |
||||
am.StopAndWait() |
||||
} |
||||
} |
||||
|
||||
// AlertmanagerFor returns the Alertmanager instance for the organization provided.
|
||||
// When the organization does not have an active Alertmanager, it returns a ErrNoAlertmanagerForOrg.
|
||||
// When the Alertmanager of the organization is not ready, it returns a ErrAlertmanagerNotReady.
|
||||
func (moa *MultiOrgAlertmanager) AlertmanagerFor(orgID int64) (*Alertmanager, error) { |
||||
moa.alertmanagersMtx.RLock() |
||||
defer moa.alertmanagersMtx.RUnlock() |
||||
|
||||
orgAM, existing := moa.alertmanagers[orgID] |
||||
if !existing { |
||||
return nil, ErrNoAlertmanagerForOrg |
||||
} |
||||
|
||||
if !orgAM.Ready() { |
||||
return nil, ErrAlertmanagerNotReady |
||||
} |
||||
|
||||
return orgAM, nil |
||||
} |
||||
@ -0,0 +1,92 @@ |
||||
package notifier |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestMultiOrgAlertmanager_SyncAlertmanagersForOrgs(t *testing.T) { |
||||
configStore := &FakeConfigStore{ |
||||
configs: map[int64]*models.AlertConfiguration{}, |
||||
} |
||||
orgStore := &FakeOrgStore{ |
||||
orgs: []int64{1, 2, 3}, |
||||
} |
||||
SyncOrgsPollInterval = 10 * time.Minute // Don't poll in unit tests.
|
||||
mam := NewMultiOrgAlertmanager(&setting.Cfg{}, configStore, orgStore) |
||||
ctx := context.Background() |
||||
|
||||
// Ensure that one Alertmanager is created per org.
|
||||
{ |
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx)) |
||||
require.Len(t, mam.alertmanagers, 3) |
||||
} |
||||
// When an org is removed, it should detect it.
|
||||
{ |
||||
orgStore.orgs = []int64{1, 3} |
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx)) |
||||
require.Len(t, mam.alertmanagers, 2) |
||||
} |
||||
// if the org comes back, it should detect it.
|
||||
{ |
||||
orgStore.orgs = []int64{1, 2, 3, 4} |
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx)) |
||||
require.Len(t, mam.alertmanagers, 4) |
||||
} |
||||
} |
||||
|
||||
func TestMultiOrgAlertmanager_AlertmanagerFor(t *testing.T) { |
||||
configStore := &FakeConfigStore{ |
||||
configs: map[int64]*models.AlertConfiguration{}, |
||||
} |
||||
orgStore := &FakeOrgStore{ |
||||
orgs: []int64{1, 2, 3}, |
||||
} |
||||
|
||||
SyncOrgsPollInterval = 10 * time.Minute // Don't poll in unit tests.
|
||||
mam := NewMultiOrgAlertmanager(&setting.Cfg{}, configStore, orgStore) |
||||
ctx := context.Background() |
||||
|
||||
// Ensure that one Alertmanagers is created per org.
|
||||
{ |
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx)) |
||||
require.Len(t, mam.alertmanagers, 3) |
||||
} |
||||
|
||||
// First, let's try to request an Alertmanager from an org that doesn't exist.
|
||||
{ |
||||
_, err := mam.AlertmanagerFor(5) |
||||
require.EqualError(t, err, ErrNoAlertmanagerForOrg.Error()) |
||||
} |
||||
|
||||
// Now, let's try to request an Alertmanager that is not ready.
|
||||
{ |
||||
// let's delete its "running config" to make it non-ready
|
||||
mam.alertmanagers[1].config = nil |
||||
_, err := mam.AlertmanagerFor(1) |
||||
require.EqualError(t, err, ErrAlertmanagerNotReady.Error()) |
||||
} |
||||
|
||||
// With an Alertmanager that exists, it responds correctly.
|
||||
{ |
||||
am, err := mam.AlertmanagerFor(2) |
||||
require.NoError(t, err) |
||||
require.Equal(t, *am.GetStatus().VersionInfo.Version, "N/A") |
||||
require.Equal(t, am.orgID, int64(2)) |
||||
require.NotNil(t, am.config) |
||||
} |
||||
|
||||
// Let's now remove the previous queried organization.
|
||||
orgStore.orgs = []int64{1, 3} |
||||
require.NoError(t, mam.LoadAndSyncAlertmanagersForOrgs(ctx)) |
||||
{ |
||||
_, err := mam.AlertmanagerFor(2) |
||||
require.EqualError(t, err, ErrNoAlertmanagerForOrg.Error()) |
||||
} |
||||
} |
||||
@ -1 +0,0 @@ |
||||
package notifier |
||||
@ -0,0 +1,56 @@ |
||||
package notifier |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models" |
||||
"github.com/grafana/grafana/pkg/services/ngalert/store" |
||||
) |
||||
|
||||
type FakeConfigStore struct { |
||||
configs map[int64]*models.AlertConfiguration |
||||
} |
||||
|
||||
func (f *FakeConfigStore) GetLatestAlertmanagerConfiguration(query *models.GetLatestAlertmanagerConfigurationQuery) error { |
||||
var ok bool |
||||
query.Result, ok = f.configs[query.OrgID] |
||||
if !ok { |
||||
return store.ErrNoAlertmanagerConfiguration |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (f *FakeConfigStore) SaveAlertmanagerConfiguration(cmd *models.SaveAlertmanagerConfigurationCmd) error { |
||||
f.configs[cmd.OrgID] = &models.AlertConfiguration{ |
||||
AlertmanagerConfiguration: cmd.AlertmanagerConfiguration, |
||||
OrgID: cmd.OrgID, |
||||
ConfigurationVersion: "v1", |
||||
Default: cmd.Default, |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (f *FakeConfigStore) SaveAlertmanagerConfigurationWithCallback(cmd *models.SaveAlertmanagerConfigurationCmd, callback store.SaveCallback) error { |
||||
f.configs[cmd.OrgID] = &models.AlertConfiguration{ |
||||
AlertmanagerConfiguration: cmd.AlertmanagerConfiguration, |
||||
OrgID: cmd.OrgID, |
||||
ConfigurationVersion: "v1", |
||||
Default: cmd.Default, |
||||
} |
||||
|
||||
if err := callback(); err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// FakeOrgStore is an in-memory stand-in for the organization store, for use
// in tests.
type FakeOrgStore struct {
	// orgs is the list of organization IDs that GetOrgs reports.
	orgs []int64
}
||||
|
||||
func (f *FakeOrgStore) GetOrgs(_ context.Context) ([]int64, error) { |
||||
return f.orgs, nil |
||||
} |
||||
@ -0,0 +1,26 @@ |
||||
package store |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore" |
||||
) |
||||
|
||||
// OrgStore is the read-only view of organizations needed by alerting.
type OrgStore interface {
	// GetOrgs returns the IDs of the organizations known to the store.
	GetOrgs(ctx context.Context) ([]int64, error)
}
||||
|
||||
func (st DBstore) GetOrgs(ctx context.Context) ([]int64, error) { |
||||
orgs := make([]int64, 0) |
||||
err := st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { |
||||
q := "SELECT id FROM org" |
||||
if err := sess.SQL(q).Find(&orgs); err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return orgs, nil |
||||
} |
||||
Loading…
Reference in new issue