The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/ngalert/state/persister_sync.go

110 lines
3.6 KiB

package state
import (
"context"
"time"
"github.com/grafana/dskit/concurrency"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
)
// SyncStatePersister is the StatePersister implementation that writes alert
// state to the instance store from inside Sync; its Async method does no
// persistence work.
type SyncStatePersister struct {
	log   log.Logger
	store InstanceStore

	// doNotSaveNormalState, when true, makes saveAlertStates skip states that
	// are Normal with no reason and that did not change.
	doNotSaveNormalState bool

	// maxStateSaveConcurrency is handed to concurrency.ForEachJob as the
	// concurrency limit used while saving the states of one Sync call.
	maxStateSaveConcurrency int
}
// NewSyncStatePersisiter returns a SyncStatePersister that logs through log
// and takes its instance store, normal-state policy and save concurrency
// from cfg.
func NewSyncStatePersisiter(log log.Logger, cfg ManagerCfg) StatePersister {
	persister := new(SyncStatePersister)
	persister.log = log
	persister.store = cfg.InstanceStore
	persister.doNotSaveNormalState = cfg.DoNotSaveNormalState
	persister.maxStateSaveConcurrency = cfg.MaxStateSaveConcurrency
	return persister
}
// Async performs no persistence for the synchronous persister: both arguments
// are ignored and the call is only recorded at debug level.
func (a *SyncStatePersister) Async(_ context.Context, _ *cache) {
	const noopMessage = "Async: No-Op"
	a.log.Debug(noopMessage)
}
// Sync deletes the instances for staleStates, then saves states, recording
// both steps as events on span.
//
// The "deleted stale states" event is added only when staleStates is
// non-empty; the "updated database" event is always added.
func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) {
	a.deleteAlertStates(ctx, staleStates)

	if staleCount := len(staleStates); staleCount != 0 {
		countAttr := attribute.Int64("state_transitions", int64(staleCount))
		span.AddEvent("deleted stale states", trace.WithAttributes(countAttr))
	}

	a.saveAlertStates(ctx, states...)
	span.AddEvent("updated database")
}
// deleteAlertStates removes the alert instances that correspond to the given
// transitions from the store.
//
// It does nothing when no store is configured or states is empty. A
// transition whose instance key cannot be built is logged and left out of the
// delete request. A failure of the delete call itself is logged and not
// returned.
func (a *SyncStatePersister) deleteAlertStates(ctx context.Context, states []StateTransition) {
	if a.store == nil {
		return
	}
	total := len(states)
	if total == 0 {
		return
	}

	a.log.Debug("Deleting alert states", "count", total)

	// Gather only the keys that could be built; the rest are reported and dropped.
	keys := make([]ngModels.AlertInstanceKey, 0, total)
	for i := range states {
		transition := states[i]
		instanceKey, keyErr := transition.GetAlertInstanceKey()
		if keyErr != nil {
			a.log.Error("Failed to delete alert instance with invalid labels", "cacheID", transition.CacheID, "error", keyErr)
			continue
		}
		keys = append(keys, instanceKey)
	}

	if err := a.store.DeleteAlertInstances(ctx, keys...); err != nil {
		a.log.Error("Failed to delete stale states", "error", err)
	}
}
// saveAlertStates writes the given state transitions to the store, running
// the saves through concurrency.ForEachJob with a.maxStateSaveConcurrency as
// the concurrency limit.
//
// It does nothing when no store is configured or states is empty. Saving is
// best effort: a transition that cannot be keyed or saved is logged and the
// remaining transitions are still attempted. An error reported by the job
// runner itself is logged instead of being silently discarded. Nothing is
// returned to the caller.
func (a *SyncStatePersister) saveAlertStates(ctx context.Context, states ...StateTransition) {
	if a.store == nil || len(states) == 0 {
		return
	}

	// saveState persists states[idx]. It always returns nil, so a failure on
	// one state is only logged and never reported to ForEachJob as a job error.
	saveState := func(ctx context.Context, idx int) error {
		s := states[idx]

		// With doNotSaveNormalState set, an unchanged Normal state that carries
		// no reason is not written to the database.
		if a.doNotSaveNormalState && IsNormalStateWithNoReason(s.State) && !s.Changed() {
			return nil
		}

		key, err := s.GetAlertInstanceKey()
		if err != nil {
			a.log.Error("Failed to create a key for alert state to save it to database. The state will be ignored ", "cacheID", s.CacheID, "error", err, "labels", s.Labels.String())
			return nil
		}

		instance := ngModels.AlertInstance{
			AlertInstanceKey:  key,
			Labels:            ngModels.InstanceLabels(s.Labels),
			CurrentState:      ngModels.InstanceStateType(s.State.State.String()),
			CurrentReason:     s.StateReason,
			LastEvalTime:      s.LastEvaluationTime,
			CurrentStateSince: s.StartsAt,
			CurrentStateEnd:   s.EndsAt,
		}

		if err := a.store.SaveAlertInstance(ctx, instance); err != nil {
			a.log.Error("Failed to save alert state", "labels", s.Labels.String(), "state", s.State, "error", err)
		}
		return nil
	}

	start := time.Now()
	a.log.Debug("Saving alert states", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency)

	// saveState never returns an error, so a non-nil result here comes from the
	// runner itself. NOTE(review): presumably this is ctx being cancelled before
	// every state was processed - confirm against the dskit version in use.
	if err := concurrency.ForEachJob(ctx, len(states), a.maxStateSaveConcurrency, saveState); err != nil {
		a.log.Error("Failed to save all alert states", "count", len(states), "error", err)
	}

	a.log.Debug("Saving alert states done", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency, "duration", time.Since(start))
}