diff --git a/pkg/services/annotations/annotationsimpl/annotations.go b/pkg/services/annotations/annotationsimpl/annotations.go index db1c745be66..42ff9d20cf8 100644 --- a/pkg/services/annotations/annotationsimpl/annotations.go +++ b/pkg/services/annotations/annotationsimpl/annotations.go @@ -4,6 +4,7 @@ import ( "context" "github.com/grafana/grafana/pkg/services/annotations/accesscontrol" + "github.com/grafana/grafana/pkg/services/annotations/annotationsimpl/loki" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/log" @@ -17,7 +18,8 @@ type RepositoryImpl struct { db db.DB authZ *accesscontrol.AuthService features featuremgmt.FeatureToggles - store store + reader readStore + writer writeStore } func ProvideService( @@ -27,27 +29,42 @@ func ProvideService( tagService tag.Service, ) *RepositoryImpl { l := log.New("annotations") + l.Debug("Initializing annotations service") + + xormStore := NewXormStore(cfg, log.New("annotations.sql"), db, tagService) + write := xormStore + + var read readStore + historianStore := loki.NewLokiHistorianStore(cfg.UnifiedAlerting.StateHistory, features, db, log.New("annotations.loki")) + if historianStore != nil { + l.Debug("Using composite read store") + read = NewCompositeStore(xormStore, historianStore) + } else { + l.Debug("Using xorm read store") + read = write + } return &RepositoryImpl{ db: db, features: features, authZ: accesscontrol.NewAuthService(db, features), - store: NewXormStore(cfg, l, db, tagService), + reader: read, + writer: write, } } func (r *RepositoryImpl) Save(ctx context.Context, item *annotations.Item) error { - return r.store.Add(ctx, item) + return r.writer.Add(ctx, item) } // SaveMany inserts multiple annotations at once. // It does not return IDs associated with created annotations. If you need this functionality, use the single-item Save instead. func (r *RepositoryImpl) SaveMany(ctx context.Context, items []annotations.Item) error { - return r.store.AddMany(ctx, items) + return r.writer.AddMany(ctx, items) } func (r *RepositoryImpl) Update(ctx context.Context, item *annotations.Item) error { - return r.store.Update(ctx, item) + return r.writer.Update(ctx, item) } func (r *RepositoryImpl) Find(ctx context.Context, query *annotations.ItemQuery) ([]*annotations.ItemDTO, error) { @@ -56,13 +73,13 @@ func (r *RepositoryImpl) Find(ctx context.Context, query *annotations.ItemQuery) return make([]*annotations.ItemDTO, 0), err } - return r.store.Get(ctx, query, resources) + return r.reader.Get(ctx, query, resources) } func (r *RepositoryImpl) Delete(ctx context.Context, params *annotations.DeleteParams) error { - return r.store.Delete(ctx, params) + return r.writer.Delete(ctx, params) } func (r *RepositoryImpl) FindTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { - return r.store.GetTags(ctx, query) + return r.reader.GetTags(ctx, query) } diff --git a/pkg/services/annotations/annotationsimpl/composite_store.go b/pkg/services/annotations/annotationsimpl/composite_store.go new file mode 100644 index 00000000000..4db6e1c9ece --- /dev/null +++ b/pkg/services/annotations/annotationsimpl/composite_store.go @@ -0,0 +1,67 @@ +package annotationsimpl + +import ( + "context" + "sort" + + "github.com/grafana/dskit/concurrency" + "github.com/grafana/grafana/pkg/services/annotations" + "github.com/grafana/grafana/pkg/services/annotations/accesscontrol" +) + +// CompositeStore is a read store that combines two or more read stores, and queries all stores in parallel. +type CompositeStore struct { + readers []readStore +} + +func NewCompositeStore(readers ...readStore) *CompositeStore { + return &CompositeStore{ + readers: readers, + } +} + +// Get returns annotations from all stores, and combines the results. +func (c *CompositeStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { + itemCh := make(chan []*annotations.ItemDTO, len(c.readers)) + + err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) error { + items, err := c.readers[i].Get(ctx, query, accessResources) + itemCh <- items + return err + }) + if err != nil { + return make([]*annotations.ItemDTO, 0), err + } + + close(itemCh) + res := make([]*annotations.ItemDTO, 0) + for items := range itemCh { + res = append(res, items...) + } + sort.Sort(annotations.SortedItems(res)) + + return res, nil +} + +// GetTags returns tags from all stores, and combines the results. +func (c *CompositeStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { + resCh := make(chan annotations.FindTagsResult, len(c.readers)) + + err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) error { + res, err := c.readers[i].GetTags(ctx, query) + resCh <- res + return err + }) + if err != nil { + return annotations.FindTagsResult{}, err + } + + close(resCh) + res := make([]*annotations.TagsDTO, 0) + for r := range resCh { + res = append(res, r.Tags...) + } + sort.Sort(annotations.SortedTags(res)) + + return annotations.FindTagsResult{Tags: res}, nil +} diff --git a/pkg/services/annotations/annotationsimpl/composite_store_test.go b/pkg/services/annotations/annotationsimpl/composite_store_test.go new file mode 100644 index 00000000000..e7800d5dbdc --- /dev/null +++ b/pkg/services/annotations/annotationsimpl/composite_store_test.go @@ -0,0 +1,173 @@ +package annotationsimpl + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/grafana/grafana/pkg/services/annotations" + "github.com/grafana/grafana/pkg/services/annotations/accesscontrol" + "github.com/stretchr/testify/require" +) + +var ( + errGet = errors.New("get error") + errGetTags = errors.New("get tags error") +) + +func TestCompositeStore(t *testing.T) { + t.Run("should return first error", func(t *testing.T) { + err1 := errors.New("error 1") + r1 := newFakeReader(withError(err1)) + err2 := errors.New("error 2") + r2 := newFakeReader(withError(err2), withWait(10*time.Millisecond)) + + store := &CompositeStore{ + []readStore{r1, r2}, + } + + tc := []struct { + f func() (any, error) + err error + }{ + { + f: func() (any, error) { return store.Get(context.Background(), nil, nil) }, + err: errGet, + }, + { + f: func() (any, error) { return store.GetTags(context.Background(), nil) }, + err: errGetTags, + }, + } + + for _, tt := range tc { + _, err := tt.f() + require.Error(t, err) + require.ErrorIs(t, err, err1) + require.NotErrorIs(t, err, err2) + } + }) + + t.Run("should combine and sort results from Get", func(t *testing.T) { + items1 := []*annotations.ItemDTO{ + {TimeEnd: 1, Time: 2}, + {TimeEnd: 2, Time: 1}, + } + r1 := newFakeReader(withItems(items1)) + + items2 := []*annotations.ItemDTO{ + {TimeEnd: 1, Time: 1}, + {TimeEnd: 1, Time: 3}, + } + r2 := newFakeReader(withItems(items2)) + + store := &CompositeStore{ + []readStore{r1, r2}, + } + + expected := []*annotations.ItemDTO{ + {TimeEnd: 2, Time: 1}, + {TimeEnd: 1, Time: 3}, + {TimeEnd: 1, Time: 2}, + {TimeEnd: 1, Time: 1}, + } + + items, _ := store.Get(context.Background(), nil, nil) + require.Equal(t, expected, items) + }) + + t.Run("should combine and sort results from GetTags", func(t *testing.T) { + tags1 := []*annotations.TagsDTO{ + {Tag: "key1:val1"}, + {Tag: "key2:val1"}, + } + r1 := newFakeReader(withTags(tags1)) + + tags2 := []*annotations.TagsDTO{ + {Tag: "key1:val2"}, + {Tag: "key2:val2"}, + } + r2 := newFakeReader(withTags(tags2)) + + store := &CompositeStore{ + []readStore{r1, r2}, + } + + expected := []*annotations.TagsDTO{ + {Tag: "key1:val1"}, + {Tag: "key1:val2"}, + {Tag: "key2:val1"}, + {Tag: "key2:val2"}, + } + + res, _ := store.GetTags(context.Background(), nil) + require.Equal(t, expected, res.Tags) + }) +} + +type fakeReader struct { + items []*annotations.ItemDTO + tagRes annotations.FindTagsResult + wait time.Duration + err error +} + +func (f *fakeReader) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { + if f.wait > 0 { + time.Sleep(f.wait) + } + + if f.err != nil { + err := fmt.Errorf("%w: %w", errGet, f.err) + return nil, err + } + + return f.items, nil +} + +func (f *fakeReader) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { + if f.wait > 0 { + time.Sleep(f.wait) + } + + if f.err != nil { + err := fmt.Errorf("%w: %w", errGetTags, f.err) + return annotations.FindTagsResult{}, err + } + + return f.tagRes, nil +} + +func withWait(wait time.Duration) func(*fakeReader) { + return func(f *fakeReader) { + f.wait = wait + } +} + +func withError(err error) func(*fakeReader) { + return func(f *fakeReader) { + f.err = err + } +} + +func withItems(items []*annotations.ItemDTO) func(*fakeReader) { + return func(f *fakeReader) { + f.items = items + } +} + +func withTags(tags []*annotations.TagsDTO) func(*fakeReader) { + return func(f *fakeReader) { + f.tagRes = annotations.FindTagsResult{Tags: tags} + } +} + +func newFakeReader(opts ...func(*fakeReader)) *fakeReader { + f := &fakeReader{} + for _, opt := range opts { + opt(f) + } + return f +} diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store.go b/pkg/services/annotations/annotationsimpl/loki/historian_store.go new file mode 100644 index 00000000000..773a3f53a72 --- /dev/null +++ b/pkg/services/annotations/annotationsimpl/loki/historian_store.go @@ -0,0 +1,75 @@ +package loki + +import ( + "context" + + "github.com/grafana/grafana/pkg/infra/db" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/annotations" + "github.com/grafana/grafana/pkg/services/annotations/accesscontrol" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/ngalert" + ngmetrics "github.com/grafana/grafana/pkg/services/ngalert/metrics" + "github.com/grafana/grafana/pkg/services/ngalert/state/historian" + "github.com/grafana/grafana/pkg/setting" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + subsystem = "annotations" +) + +type lokiQueryClient interface { + RangeQuery(ctx context.Context, query string, start, end, limit int64) (historian.QueryRes, error) +} + +// LokiHistorianStore is a read store that queries Loki for alert state history. +type LokiHistorianStore struct { + client lokiQueryClient + db db.DB + log log.Logger +} + +func NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.FeatureToggles, db db.DB, log log.Logger) *LokiHistorianStore { + if !useStore(cfg, ft) { + return nil + } + lokiCfg, err := historian.NewLokiConfig(cfg) + if err != nil { + // this config error is already handled elsewhere + return nil + } + + return &LokiHistorianStore{ + client: historian.NewLokiClient(lokiCfg, historian.NewRequester(), ngmetrics.NewHistorianMetrics(prometheus.DefaultRegisterer, subsystem), log), + db: db, + log: log, + } +} + +func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { + return []*annotations.ItemDTO{}, nil +} + +func (r *LokiHistorianStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { + return annotations.FindTagsResult{}, nil +} + +func useStore(cfg setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.FeatureToggles) bool { + if !cfg.Enabled { + return false + } + + // Override config based on feature toggles. + // We pass in a no-op logger here since this function is also called during ngalert init, + // and we don't want to log the same problem twice. + ngalert.ApplyStateHistoryFeatureToggles(&cfg, ft, log.NewNopLogger()) + + backend, err := historian.ParseBackendType(cfg.Backend) + if err != nil { + return false + } + + // We should only query Loki if annotations do no exist in the database. + return backend == historian.BackendTypeLoki +} diff --git a/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go new file mode 100644 index 00000000000..8587f5f32d2 --- /dev/null +++ b/pkg/services/annotations/annotationsimpl/loki/historian_store_test.go @@ -0,0 +1,99 @@ +package loki + +import ( + "testing" + + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/setting" + "github.com/stretchr/testify/require" +) + +func TestUseStore(t *testing.T) { + t.Run("false if state history disabled", func(t *testing.T) { + cfg := setting.UnifiedAlertingStateHistorySettings{ + Enabled: false, + } + use := useStore(cfg, featuremgmt.WithFeatures()) + require.False(t, use) + }) + + t.Run("false if any invalid backend", func(t *testing.T) { + t.Run("single", func(t *testing.T) { + cfg := setting.UnifiedAlertingStateHistorySettings{ + Enabled: true, + Backend: "invalid-backend", + } + use := useStore(cfg, featuremgmt.WithFeatures()) + require.False(t, use) + }) + + t.Run("primary", func(t *testing.T) { + cfg := setting.UnifiedAlertingStateHistorySettings{ + Enabled: true, + Backend: "multiple", + MultiPrimary: "invalid-backend", + } + use := useStore(cfg, featuremgmt.WithFeatures()) + require.False(t, use) + }) + + t.Run("secondary", func(t *testing.T) { + cfg := setting.UnifiedAlertingStateHistorySettings{ + Enabled: true, + Backend: "multiple", + MultiPrimary: "annotations", + MultiSecondaries: []string{"annotations", "invalid-backend"}, + } + use := useStore(cfg, featuremgmt.WithFeatures()) + require.False(t, use) + }) + }) + + t.Run("false if no backend is Loki", func(t *testing.T) { + cfg := setting.UnifiedAlertingStateHistorySettings{ + Enabled: true, + Backend: "annotations", + } + use := useStore(cfg, featuremgmt.WithFeatures()) + require.False(t, use) + }) + + t.Run("false if Loki is part of multi backend", func(t *testing.T) { + t.Run("primary", func(t *testing.T) { + cfg := setting.UnifiedAlertingStateHistorySettings{ + Enabled: true, + Backend: "multiple", + MultiPrimary: "loki", + } + use := useStore(cfg, featuremgmt.WithFeatures()) + require.False(t, use) + }) + + t.Run("secondary", func(t *testing.T) { + cfg := setting.UnifiedAlertingStateHistorySettings{ + Enabled: true, + Backend: "multiple", + MultiPrimary: "annotations", + MultiSecondaries: []string{"loki"}, + } + use := useStore(cfg, featuremgmt.WithFeatures()) + require.False(t, use) + }) + }) + + t.Run("true if only backend is Loki", func(t *testing.T) { + t.Run("only", func(t *testing.T) { + cfg := setting.UnifiedAlertingStateHistorySettings{ + Enabled: true, + Backend: "loki", + } + features := featuremgmt.WithFeatures( + featuremgmt.FlagAlertStateHistoryLokiOnly, + featuremgmt.FlagAlertStateHistoryLokiPrimary, + featuremgmt.FlagAlertStateHistoryLokiSecondary, + ) + use := useStore(cfg, features) + require.True(t, use) + }) + }) +} diff --git a/pkg/services/annotations/annotationsimpl/store.go b/pkg/services/annotations/annotationsimpl/store.go index d367bf1e64e..1a3d3991416 100644 --- a/pkg/services/annotations/annotationsimpl/store.go +++ b/pkg/services/annotations/annotationsimpl/store.go @@ -10,12 +10,20 @@ import ( ) type store interface { + readStore + writeStore +} + +type readStore interface { + Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) + GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) +} + +type writeStore interface { Add(ctx context.Context, items *annotations.Item) error AddMany(ctx context.Context, items []annotations.Item) error Update(ctx context.Context, item *annotations.Item) error - Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) Delete(ctx context.Context, params *annotations.DeleteParams) error - GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) CleanAnnotations(ctx context.Context, cfg setting.AnnotationCleanupSettings, annotationType string) (int64, error) CleanOrphanedAnnotationTags(ctx context.Context) (int64, error) } diff --git a/pkg/services/annotations/annotationsimpl/xorm_store.go b/pkg/services/annotations/annotationsimpl/xorm_store.go index 19a7a846d82..e7072f2f052 100644 --- a/pkg/services/annotations/annotationsimpl/xorm_store.go +++ b/pkg/services/annotations/annotationsimpl/xorm_store.go @@ -48,7 +48,7 @@ func NewXormStore(cfg *setting.Cfg, l log.Logger, db db.DB, tagService tag.Servi return &xormRepositoryImpl{ cfg: cfg, db: db, - log: l.New("store", "xorm"), + log: l, tagService: tagService, } } diff --git a/pkg/services/annotations/models.go b/pkg/services/annotations/models.go index 1e24a75f40e..2b1325b4f3c 100644 --- a/pkg/services/annotations/models.go +++ b/pkg/services/annotations/models.go @@ -44,6 +44,21 @@ type TagsDTO struct { Count int64 `json:"count"` } +// sort tags in ascending order by tag string +type SortedTags []*TagsDTO + +func (s SortedTags) Len() int { + return len(s) +} + +func (s SortedTags) Less(i, j int) bool { + return s[i].Tag < s[j].Tag +} + +func (s SortedTags) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + // FindTagsResult is the result of a tags search. type FindTagsResult struct { Tags []*TagsDTO `json:"tags"` @@ -110,6 +125,24 @@ type ItemDTO struct { Data *simplejson.Json `json:"data"` } +type SortedItems []*ItemDTO + +// sort annotations in descending order by end time, then by start time +func (s SortedItems) Len() int { + return len(s) +} + +func (s SortedItems) Less(i, j int) bool { + if s[i].TimeEnd != s[j].TimeEnd { + return s[i].TimeEnd > s[j].TimeEnd + } + return s[i].Time > s[j].Time +} + +func (s SortedItems) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + type annotationType int const ( diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 48d6e5d4249..c6d2eef7120 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -236,7 +236,7 @@ func (ng *AlertNG) init() error { // There are a set of feature toggles available that act as short-circuits for common configurations. // If any are set, override the config accordingly. - applyStateHistoryFeatureToggles(&ng.Cfg.UnifiedAlerting.StateHistory, ng.FeatureToggles, ng.Log) + ApplyStateHistoryFeatureToggles(&ng.Cfg.UnifiedAlerting.StateHistory, ng.FeatureToggles, ng.Log) history, err := configureHistorianBackend(initCtx, ng.Cfg.UnifiedAlerting.StateHistory, ng.annotationsRepo, ng.dashboardService, ng.store, ng.Metrics.GetHistorianMetrics(), ng.Log) if err != nil { return err @@ -446,8 +446,8 @@ func configureHistorianBackend(ctx context.Context, cfg setting.UnifiedAlertingS return nil, fmt.Errorf("unrecognized state history backend: %s", backend) } -// applyStateHistoryFeatureToggles edits state history configuration to comply with currently active feature toggles. -func applyStateHistoryFeatureToggles(cfg *setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.FeatureToggles, logger log.Logger) { +// ApplyStateHistoryFeatureToggles edits state history configuration to comply with currently active feature toggles. +func ApplyStateHistoryFeatureToggles(cfg *setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.FeatureToggles, logger log.Logger) { backend, _ := historian.ParseBackendType(cfg.Backend) // These feature toggles represent specific, common backend configurations. // If all toggles are enabled, we listen to the state history config as written.