mirror of https://github.com/grafana/grafana
Annotations/Alerting: Add Loki historian store stub (#78363)
* Add Loki historian store stub * Add composite store * Use composite store if Loki historian enabled * Split store interface into read/write * Make composite + historian stores read only * Use variadic constructor for composite * Modify Loki store enable logic * Use dskit.concurrency.ForEachJob for parallelismpull/79294/head
parent
09cef892a5
commit
62bdbe5b44
@ -0,0 +1,67 @@ |
||||
package annotationsimpl |
||||
|
||||
import ( |
||||
"context" |
||||
"sort" |
||||
|
||||
"github.com/grafana/dskit/concurrency" |
||||
"github.com/grafana/grafana/pkg/services/annotations" |
||||
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol" |
||||
) |
||||
|
||||
// CompositeStore is a read store that combines two or more read stores, and queries all stores in parallel.
|
||||
type CompositeStore struct { |
||||
readers []readStore |
||||
} |
||||
|
||||
func NewCompositeStore(readers ...readStore) *CompositeStore { |
||||
return &CompositeStore{ |
||||
readers: readers, |
||||
} |
||||
} |
||||
|
||||
// Get returns annotations from all stores, and combines the results.
|
||||
func (c *CompositeStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { |
||||
itemCh := make(chan []*annotations.ItemDTO, len(c.readers)) |
||||
|
||||
err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) error { |
||||
items, err := c.readers[i].Get(ctx, query, accessResources) |
||||
itemCh <- items |
||||
return err |
||||
}) |
||||
if err != nil { |
||||
return make([]*annotations.ItemDTO, 0), err |
||||
} |
||||
|
||||
close(itemCh) |
||||
res := make([]*annotations.ItemDTO, 0) |
||||
for items := range itemCh { |
||||
res = append(res, items...) |
||||
} |
||||
sort.Sort(annotations.SortedItems(res)) |
||||
|
||||
return res, nil |
||||
} |
||||
|
||||
// GetTags returns tags from all stores, and combines the results.
|
||||
func (c *CompositeStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { |
||||
resCh := make(chan annotations.FindTagsResult, len(c.readers)) |
||||
|
||||
err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) error { |
||||
res, err := c.readers[i].GetTags(ctx, query) |
||||
resCh <- res |
||||
return err |
||||
}) |
||||
if err != nil { |
||||
return annotations.FindTagsResult{}, err |
||||
} |
||||
|
||||
close(resCh) |
||||
res := make([]*annotations.TagsDTO, 0) |
||||
for r := range resCh { |
||||
res = append(res, r.Tags...) |
||||
} |
||||
sort.Sort(annotations.SortedTags(res)) |
||||
|
||||
return annotations.FindTagsResult{Tags: res}, nil |
||||
} |
||||
@ -0,0 +1,173 @@ |
||||
package annotationsimpl |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/annotations" |
||||
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
var ( |
||||
errGet = errors.New("get error") |
||||
errGetTags = errors.New("get tags error") |
||||
) |
||||
|
||||
func TestCompositeStore(t *testing.T) { |
||||
t.Run("should return first error", func(t *testing.T) { |
||||
err1 := errors.New("error 1") |
||||
r1 := newFakeReader(withError(err1)) |
||||
err2 := errors.New("error 2") |
||||
r2 := newFakeReader(withError(err2), withWait(10*time.Millisecond)) |
||||
|
||||
store := &CompositeStore{ |
||||
[]readStore{r1, r2}, |
||||
} |
||||
|
||||
tc := []struct { |
||||
f func() (any, error) |
||||
err error |
||||
}{ |
||||
{ |
||||
f: func() (any, error) { return store.Get(context.Background(), nil, nil) }, |
||||
err: errGet, |
||||
}, |
||||
{ |
||||
f: func() (any, error) { return store.GetTags(context.Background(), nil) }, |
||||
err: errGetTags, |
||||
}, |
||||
} |
||||
|
||||
for _, tt := range tc { |
||||
_, err := tt.f() |
||||
require.Error(t, err) |
||||
require.ErrorIs(t, err, err1) |
||||
require.NotErrorIs(t, err, err2) |
||||
} |
||||
}) |
||||
|
||||
t.Run("should combine and sort results from Get", func(t *testing.T) { |
||||
items1 := []*annotations.ItemDTO{ |
||||
{TimeEnd: 1, Time: 2}, |
||||
{TimeEnd: 2, Time: 1}, |
||||
} |
||||
r1 := newFakeReader(withItems(items1)) |
||||
|
||||
items2 := []*annotations.ItemDTO{ |
||||
{TimeEnd: 1, Time: 1}, |
||||
{TimeEnd: 1, Time: 3}, |
||||
} |
||||
r2 := newFakeReader(withItems(items2)) |
||||
|
||||
store := &CompositeStore{ |
||||
[]readStore{r1, r2}, |
||||
} |
||||
|
||||
expected := []*annotations.ItemDTO{ |
||||
{TimeEnd: 2, Time: 1}, |
||||
{TimeEnd: 1, Time: 3}, |
||||
{TimeEnd: 1, Time: 2}, |
||||
{TimeEnd: 1, Time: 1}, |
||||
} |
||||
|
||||
items, _ := store.Get(context.Background(), nil, nil) |
||||
require.Equal(t, expected, items) |
||||
}) |
||||
|
||||
t.Run("should combine and sort results from GetTags", func(t *testing.T) { |
||||
tags1 := []*annotations.TagsDTO{ |
||||
{Tag: "key1:val1"}, |
||||
{Tag: "key2:val1"}, |
||||
} |
||||
r1 := newFakeReader(withTags(tags1)) |
||||
|
||||
tags2 := []*annotations.TagsDTO{ |
||||
{Tag: "key1:val2"}, |
||||
{Tag: "key2:val2"}, |
||||
} |
||||
r2 := newFakeReader(withTags(tags2)) |
||||
|
||||
store := &CompositeStore{ |
||||
[]readStore{r1, r2}, |
||||
} |
||||
|
||||
expected := []*annotations.TagsDTO{ |
||||
{Tag: "key1:val1"}, |
||||
{Tag: "key1:val2"}, |
||||
{Tag: "key2:val1"}, |
||||
{Tag: "key2:val2"}, |
||||
} |
||||
|
||||
res, _ := store.GetTags(context.Background(), nil) |
||||
require.Equal(t, expected, res.Tags) |
||||
}) |
||||
} |
||||
|
||||
type fakeReader struct { |
||||
items []*annotations.ItemDTO |
||||
tagRes annotations.FindTagsResult |
||||
wait time.Duration |
||||
err error |
||||
} |
||||
|
||||
func (f *fakeReader) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { |
||||
if f.wait > 0 { |
||||
time.Sleep(f.wait) |
||||
} |
||||
|
||||
if f.err != nil { |
||||
err := fmt.Errorf("%w: %w", errGet, f.err) |
||||
return nil, err |
||||
} |
||||
|
||||
return f.items, nil |
||||
} |
||||
|
||||
func (f *fakeReader) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { |
||||
if f.wait > 0 { |
||||
time.Sleep(f.wait) |
||||
} |
||||
|
||||
if f.err != nil { |
||||
err := fmt.Errorf("%w: %w", errGetTags, f.err) |
||||
return annotations.FindTagsResult{}, err |
||||
} |
||||
|
||||
return f.tagRes, nil |
||||
} |
||||
|
||||
func withWait(wait time.Duration) func(*fakeReader) { |
||||
return func(f *fakeReader) { |
||||
f.wait = wait |
||||
} |
||||
} |
||||
|
||||
func withError(err error) func(*fakeReader) { |
||||
return func(f *fakeReader) { |
||||
f.err = err |
||||
} |
||||
} |
||||
|
||||
func withItems(items []*annotations.ItemDTO) func(*fakeReader) { |
||||
return func(f *fakeReader) { |
||||
f.items = items |
||||
} |
||||
} |
||||
|
||||
func withTags(tags []*annotations.TagsDTO) func(*fakeReader) { |
||||
return func(f *fakeReader) { |
||||
f.tagRes = annotations.FindTagsResult{Tags: tags} |
||||
} |
||||
} |
||||
|
||||
func newFakeReader(opts ...func(*fakeReader)) *fakeReader { |
||||
f := &fakeReader{} |
||||
for _, opt := range opts { |
||||
opt(f) |
||||
} |
||||
return f |
||||
} |
||||
@ -0,0 +1,75 @@ |
||||
package loki |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db" |
||||
"github.com/grafana/grafana/pkg/infra/log" |
||||
"github.com/grafana/grafana/pkg/services/annotations" |
||||
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol" |
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
"github.com/grafana/grafana/pkg/services/ngalert" |
||||
ngmetrics "github.com/grafana/grafana/pkg/services/ngalert/metrics" |
||||
"github.com/grafana/grafana/pkg/services/ngalert/state/historian" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
) |
||||
|
||||
const ( |
||||
subsystem = "annotations" |
||||
) |
||||
|
||||
type lokiQueryClient interface { |
||||
RangeQuery(ctx context.Context, query string, start, end, limit int64) (historian.QueryRes, error) |
||||
} |
||||
|
||||
// LokiHistorianStore is a read store that queries Loki for alert state history.
|
||||
type LokiHistorianStore struct { |
||||
client lokiQueryClient |
||||
db db.DB |
||||
log log.Logger |
||||
} |
||||
|
||||
func NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.FeatureToggles, db db.DB, log log.Logger) *LokiHistorianStore { |
||||
if !useStore(cfg, ft) { |
||||
return nil |
||||
} |
||||
lokiCfg, err := historian.NewLokiConfig(cfg) |
||||
if err != nil { |
||||
// this config error is already handled elsewhere
|
||||
return nil |
||||
} |
||||
|
||||
return &LokiHistorianStore{ |
||||
client: historian.NewLokiClient(lokiCfg, historian.NewRequester(), ngmetrics.NewHistorianMetrics(prometheus.DefaultRegisterer, subsystem), log), |
||||
db: db, |
||||
log: log, |
||||
} |
||||
} |
||||
|
||||
func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) { |
||||
return []*annotations.ItemDTO{}, nil |
||||
} |
||||
|
||||
func (r *LokiHistorianStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) { |
||||
return annotations.FindTagsResult{}, nil |
||||
} |
||||
|
||||
func useStore(cfg setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.FeatureToggles) bool { |
||||
if !cfg.Enabled { |
||||
return false |
||||
} |
||||
|
||||
// Override config based on feature toggles.
|
||||
// We pass in a no-op logger here since this function is also called during ngalert init,
|
||||
// and we don't want to log the same problem twice.
|
||||
ngalert.ApplyStateHistoryFeatureToggles(&cfg, ft, log.NewNopLogger()) |
||||
|
||||
backend, err := historian.ParseBackendType(cfg.Backend) |
||||
if err != nil { |
||||
return false |
||||
} |
||||
|
||||
// We should only query Loki if annotations do no exist in the database.
|
||||
return backend == historian.BackendTypeLoki |
||||
} |
||||
@ -0,0 +1,99 @@ |
||||
package loki |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestUseStore(t *testing.T) { |
||||
t.Run("false if state history disabled", func(t *testing.T) { |
||||
cfg := setting.UnifiedAlertingStateHistorySettings{ |
||||
Enabled: false, |
||||
} |
||||
use := useStore(cfg, featuremgmt.WithFeatures()) |
||||
require.False(t, use) |
||||
}) |
||||
|
||||
t.Run("false if any invalid backend", func(t *testing.T) { |
||||
t.Run("single", func(t *testing.T) { |
||||
cfg := setting.UnifiedAlertingStateHistorySettings{ |
||||
Enabled: true, |
||||
Backend: "invalid-backend", |
||||
} |
||||
use := useStore(cfg, featuremgmt.WithFeatures()) |
||||
require.False(t, use) |
||||
}) |
||||
|
||||
t.Run("primary", func(t *testing.T) { |
||||
cfg := setting.UnifiedAlertingStateHistorySettings{ |
||||
Enabled: true, |
||||
Backend: "multiple", |
||||
MultiPrimary: "invalid-backend", |
||||
} |
||||
use := useStore(cfg, featuremgmt.WithFeatures()) |
||||
require.False(t, use) |
||||
}) |
||||
|
||||
t.Run("secondary", func(t *testing.T) { |
||||
cfg := setting.UnifiedAlertingStateHistorySettings{ |
||||
Enabled: true, |
||||
Backend: "multiple", |
||||
MultiPrimary: "annotations", |
||||
MultiSecondaries: []string{"annotations", "invalid-backend"}, |
||||
} |
||||
use := useStore(cfg, featuremgmt.WithFeatures()) |
||||
require.False(t, use) |
||||
}) |
||||
}) |
||||
|
||||
t.Run("false if no backend is Loki", func(t *testing.T) { |
||||
cfg := setting.UnifiedAlertingStateHistorySettings{ |
||||
Enabled: true, |
||||
Backend: "annotations", |
||||
} |
||||
use := useStore(cfg, featuremgmt.WithFeatures()) |
||||
require.False(t, use) |
||||
}) |
||||
|
||||
t.Run("false if Loki is part of multi backend", func(t *testing.T) { |
||||
t.Run("primary", func(t *testing.T) { |
||||
cfg := setting.UnifiedAlertingStateHistorySettings{ |
||||
Enabled: true, |
||||
Backend: "multiple", |
||||
MultiPrimary: "loki", |
||||
} |
||||
use := useStore(cfg, featuremgmt.WithFeatures()) |
||||
require.False(t, use) |
||||
}) |
||||
|
||||
t.Run("secondary", func(t *testing.T) { |
||||
cfg := setting.UnifiedAlertingStateHistorySettings{ |
||||
Enabled: true, |
||||
Backend: "multiple", |
||||
MultiPrimary: "annotations", |
||||
MultiSecondaries: []string{"loki"}, |
||||
} |
||||
use := useStore(cfg, featuremgmt.WithFeatures()) |
||||
require.False(t, use) |
||||
}) |
||||
}) |
||||
|
||||
t.Run("true if only backend is Loki", func(t *testing.T) { |
||||
t.Run("only", func(t *testing.T) { |
||||
cfg := setting.UnifiedAlertingStateHistorySettings{ |
||||
Enabled: true, |
||||
Backend: "loki", |
||||
} |
||||
features := featuremgmt.WithFeatures( |
||||
featuremgmt.FlagAlertStateHistoryLokiOnly, |
||||
featuremgmt.FlagAlertStateHistoryLokiPrimary, |
||||
featuremgmt.FlagAlertStateHistoryLokiSecondary, |
||||
) |
||||
use := useStore(cfg, features) |
||||
require.True(t, use) |
||||
}) |
||||
}) |
||||
} |
||||
Loading…
Reference in new issue