From 01d274852c389f7c33b22254acb5dec35913ff92 Mon Sep 17 00:00:00 2001 From: Santiago Date: Tue, 28 Nov 2023 15:34:45 +0100 Subject: [PATCH] Alerting: Add GetFullState method to FileStore (#78701) * Alerting: Add GetFullState method to FileStore * make tests compile, create stateStore in NewAlertmanager * return errors instead of logging, accept an arbitrary number of strings * make NewAlertmanager() accept a stateStore --- pkg/services/ngalert/ngalert.go | 4 +- pkg/services/ngalert/notifier/file_store.go | 40 ++++++++++++++++ .../ngalert/notifier/file_store_test.go | 48 +++++++++++++++++++ pkg/services/ngalert/remote/alertmanager.go | 8 +++- .../ngalert/remote/alertmanager_test.go | 12 ++--- pkg/services/ngalert/tests/fakes/kvstore.go | 22 ++++++++- 6 files changed, 125 insertions(+), 9 deletions(-) diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 8694893793d..3e7a4dde681 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -177,7 +177,9 @@ func (ng *AlertNG) init() error { if ng.Cfg.UnifiedAlerting.RemoteAlertmanager.Enable { override := notifier.WithAlertmanagerOverride(func(ctx context.Context, orgID int64) (notifier.Alertmanager, error) { externalAMCfg := remote.AlertmanagerConfig{} - return remote.NewAlertmanager(externalAMCfg, orgID) + // We won't be handling files on disk, we can pass an empty string as workingDirPath. 
+			stateStore := notifier.NewFileStore(orgID, ng.KVStore, "")
+			return remote.NewAlertmanager(externalAMCfg, orgID, stateStore)
 		})
 		overrides = append(overrides, override)
 
diff --git a/pkg/services/ngalert/notifier/file_store.go b/pkg/services/ngalert/notifier/file_store.go
index cba28381452..5fc99cac8d9 100644
--- a/pkg/services/ngalert/notifier/file_store.go
+++ b/pkg/services/ngalert/notifier/file_store.go
@@ -8,6 +8,7 @@ import (
 	"path/filepath"
 
 	alertingNotify "github.com/grafana/alerting/notify"
+	"github.com/prometheus/alertmanager/cluster/clusterpb"
 
 	"github.com/grafana/grafana/pkg/infra/kvstore"
 	"github.com/grafana/grafana/pkg/infra/log"
@@ -62,6 +63,45 @@ func (fileStore *FileStore) FilepathFor(ctx context.Context, filename string) (s
 	return fileStore.pathFor(filename), err
 }
 
+// GetFullState looks up each of the given keys among the kvstore values of the store's org and
+// returns the Alertmanager's internal state: a base64-encoded protobuf FullState, one part per key.
+// It returns an error if the org has no values, or if any key is missing or cannot be decoded.
+func (fileStore *FileStore) GetFullState(ctx context.Context, filenames ...string) (string, error) {
+	all, err := fileStore.kv.GetAll(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	keys, ok := all[fileStore.orgID]
+	if !ok {
+		return "", fmt.Errorf("no values for org %d", fileStore.orgID)
+	}
+
+	var parts []clusterpb.Part
+	for _, f := range filenames {
+		v, ok := keys[f]
+		if !ok {
+			return "", fmt.Errorf("no value found for key %q", f)
+		}
+		// Values are stored encoded; each part carries the decoded bytes.
+		b, err := decode(v)
+		if err != nil {
+			return "", fmt.Errorf("error decoding value for key %q: %w", f, err)
+		}
+		parts = append(parts, clusterpb.Part{Key: f, Data: b})
+	}
+
+	fs := clusterpb.FullState{
+		Parts: parts,
+	}
+	b, err := fs.Marshal()
+	if err != nil {
+		return "", fmt.Errorf("error marshaling full state: %w", err)
+	}
+
+	return encode(b), nil
+}
+
 // Persist takes care of persisting the binary representation of internal state to the database as a base64 encoded string.
func (fileStore *FileStore) Persist(ctx context.Context, filename string, st alertingNotify.State) (int64, error) {
 	var size int64
diff --git a/pkg/services/ngalert/notifier/file_store_test.go b/pkg/services/ngalert/notifier/file_store_test.go
index 92182396fa0..4f61a18d03e 100644
--- a/pkg/services/ngalert/notifier/file_store_test.go
+++ b/pkg/services/ngalert/notifier/file_store_test.go
@@ -2,10 +2,12 @@ package notifier
 
 import (
 	"context"
+	"encoding/base64"
 	"os"
 	"path/filepath"
 	"testing"
 
+	"github.com/prometheus/alertmanager/cluster/clusterpb"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
@@ -74,6 +76,52 @@ func TestFileStore_FilepathFor(t *testing.T) {
 	}
 }
 
+// TestFileStore_GetFullState covers the error paths and the encoded output of FileStore.GetFullState.
+func TestFileStore_GetFullState(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("empty store", func(tt *testing.T) {
+		store := fakes.NewFakeKVStore(tt)
+		fs := NewFileStore(1, store, workingDir)
+		_, err := fs.GetFullState(ctx, "silences", "notifications")
+		require.Error(tt, err)
+		require.EqualError(tt, err, "no values for org 1")
+	})
+
+	t.Run("no values for key", func(tt *testing.T) {
+		store := fakes.NewFakeKVStore(tt)
+		require.NoError(tt, store.Set(ctx, 1, "alertmanager", "test-key", "test-value"))
+		fs := NewFileStore(1, store, workingDir)
+		_, err := fs.GetFullState(ctx, "silences")
+		require.Error(tt, err)
+		require.EqualError(tt, err, "no value found for key \"silences\"")
+	})
+
+	t.Run("non-empty values", func(tt *testing.T) {
+		store := fakes.NewFakeKVStore(tt)
+		silences := []byte("test-silences")
+		nflog := []byte("test-notifications")
+		require.NoError(tt, store.Set(ctx, 1, "alertmanager", "silences", base64.StdEncoding.EncodeToString(silences)))
+		require.NoError(tt, store.Set(ctx, 1, "alertmanager", "notifications", base64.StdEncoding.EncodeToString(nflog)))
+
+		// Build the expected payload by hand: one part per key, in the order requested below.
+		state := clusterpb.FullState{
+			Parts: []clusterpb.Part{
+				{Key: "silences", Data: silences},
+				{Key: "notifications", Data: nflog},
+			},
+		}
+		b, err := state.Marshal()
+		require.NoError(tt, err)
+		encodedFullState := base64.StdEncoding.EncodeToString(b)
+
+		fs := NewFileStore(1, store, workingDir)
+		got, err := fs.GetFullState(ctx, "silences", "notifications")
+		require.NoError(tt, err)
+		require.Equal(tt, encodedFullState, got)
+	})
+}
+
 func TestFileStore_Persist(t *testing.T) {
 	store := fakes.NewFakeKVStore(t)
 	state := &fakeState{data: "something to marshal"}
diff --git a/pkg/services/ngalert/remote/alertmanager.go b/pkg/services/ngalert/remote/alertmanager.go
index 349c8f6bcf0..e7f4b70559f 100644
--- a/pkg/services/ngalert/remote/alertmanager.go
+++ b/pkg/services/ngalert/remote/alertmanager.go
@@ -26,6 +26,10 @@ import (
 
 const readyPath = "/-/ready"
 
+type stateStore interface {
+	GetFullState(ctx context.Context, keys ...string) (string, error)
+}
+
 type Alertmanager struct {
 	log log.Logger
 	orgID int64
@@ -37,6 +41,7 @@ type Alertmanager struct {
 	httpClient *http.Client
 	ready bool
 	sender *sender.ExternalAlertmanager
+	stateStore stateStore
 }
 
 type AlertmanagerConfig struct {
@@ -45,7 +50,7 @@ type AlertmanagerConfig struct {
 	BasicAuthPassword string
 }
 
-func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error) {
+func NewAlertmanager(cfg AlertmanagerConfig, orgID int64, store stateStore) (*Alertmanager, error) {
 	client := http.Client{
 		Transport: &mimirClient.MimirAuthRoundTripper{
 			TenantID: cfg.TenantID,
@@ -100,6 +105,7 @@ func NewAlertmanager(cfg AlertmanagerConfig, orgID int64) (*Alertmanager, error)
 		amClient: amclient.New(transport, nil),
 		httpClient: &client,
 		sender: s,
+		stateStore: store,
 		orgID: orgID,
 		tenantID: cfg.TenantID,
 		url: cfg.URL,
diff --git a/pkg/services/ngalert/remote/alertmanager_test.go b/pkg/services/ngalert/remote/alertmanager_test.go
index b33974ed361..f80346583a7 100644
--- a/pkg/services/ngalert/remote/alertmanager_test.go
+++ b/pkg/services/ngalert/remote/alertmanager_test.go
@@ -63,7 +63,7 @@ func TestNewAlertmanager(t *testing.T) {
 				TenantID: test.tenantID,
BasicAuthPassword: test.password, } - am, err := NewAlertmanager(cfg, test.orgID) + am, err := NewAlertmanager(cfg, test.orgID, nil) if test.expErr != "" { require.EqualError(tt, err, test.expErr) return @@ -93,7 +93,7 @@ func TestApplyConfig(t *testing.T) { cfg := AlertmanagerConfig{ URL: server.URL, } - am, err := NewAlertmanager(cfg, 1) + am, err := NewAlertmanager(cfg, 1, nil) require.NoError(t, err) config := &ngmodels.AlertConfiguration{} @@ -144,7 +144,7 @@ func TestIntegrationRemoteAlertmanagerApplyConfigOnlyUploadsOnce(t *testing.T) { } ctx := context.Background() - am, err := NewAlertmanager(cfg, 1) + am, err := NewAlertmanager(cfg, 1, nil) require.NoError(t, err) // We should have no configuration at first. @@ -214,7 +214,7 @@ func TestIntegrationRemoteAlertmanagerSilences(t *testing.T) { TenantID: tenantID, BasicAuthPassword: password, } - am, err := NewAlertmanager(cfg, 1) + am, err := NewAlertmanager(cfg, 1, nil) require.NoError(t, err) // We should have no silences at first. @@ -293,7 +293,7 @@ func TestIntegrationRemoteAlertmanagerAlerts(t *testing.T) { TenantID: tenantID, BasicAuthPassword: password, } - am, err := NewAlertmanager(cfg, 1) + am, err := NewAlertmanager(cfg, 1, nil) require.NoError(t, err) // Wait until the Alertmanager is ready to send alerts. @@ -355,7 +355,7 @@ func TestIntegrationRemoteAlertmanagerReceivers(t *testing.T) { BasicAuthPassword: password, } - am, err := NewAlertmanager(cfg, 1) + am, err := NewAlertmanager(cfg, 1, nil) require.NoError(t, err) // We should start with the default config. 
diff --git a/pkg/services/ngalert/tests/fakes/kvstore.go b/pkg/services/ngalert/tests/fakes/kvstore.go
index 7ef60184b83..ecf569554d2 100644
--- a/pkg/services/ngalert/tests/fakes/kvstore.go
+++ b/pkg/services/ngalert/tests/fakes/kvstore.go
@@ -98,5 +98,25 @@ func (fkv *FakeKVStore) Keys(ctx context.Context, orgID int64, namespace string,
 }
 
 func (fkv *FakeKVStore) GetAll(ctx context.Context, orgId int64, namespace string) (map[int64]map[string]string, error) {
-	return nil, nil
+	fkv.Mtx.Lock()
+	defer fkv.Mtx.Unlock()
+
+	namespaces, found := fkv.Store[orgId]
+	if !found {
+		// Nothing is stored for this org: report no result at all.
+		return nil, nil
+	}
+
+	// Copy the namespace's entries rather than handing out the fake's own map.
+	// Indexing a missing namespace yields a nil map, so the org still gets an
+	// empty (non-nil) entry in that case.
+	entries := make(map[string]string)
+	for key, value := range namespaces[namespace] {
+		entries[key] = value
+	}
+
+	result := map[int64]map[string]string{
+		orgId: entries,
+	}
+	return result, nil
 }