Alerting: Add state_periodic_save_batch_size config option (#98019)

* Alerting: Add state_periodic_save_batch_size config option

---------

Co-authored-by: brendamuir <100768211+brendamuir@users.noreply.github.com>
Alexander Akhmetov (6 months ago), committed by GitHub
parent 22a9770ec5
commit 1f8f9a45d7
  1. conf/defaults.ini (3 lines changed)
  2. conf/sample.ini (3 lines changed)
  3. docs/sources/alerting/set-up/performance-limitations/index.md (4 lines changed)
  4. pkg/services/ngalert/ngalert.go (1 line changed)
  5. pkg/services/ngalert/state/manager.go (3 lines changed)
  6. pkg/services/ngalert/state/persist.go (2 lines changed)
  7. pkg/services/ngalert/state/persister_async.go (6 lines changed)
  8. pkg/services/ngalert/state/testing.go (2 lines changed)
  9. pkg/services/ngalert/store/instance_database.go (103 lines changed)
  10. pkg/services/ngalert/store/instance_database_test.go (138 lines changed)
  11. pkg/setting/setting_unified_alerting.go (3 lines changed)

@ -1354,6 +1354,9 @@ max_state_save_concurrency = 1
# The interval string is a possibly signed sequence of decimal numbers, followed by a unit suffix (ms, s, m, h, d), e.g. 30s or 1m.
state_periodic_save_interval = 5m
# If the feature flag 'alertingSaveStatePeriodic' is enabled, this is the size of the batch that is saved to the database at once.
state_periodic_save_batch_size = 1
# Disables the smoothing of alert evaluations across their evaluation window.
# Rules will evaluate in sync.
disable_jitter = false

@ -1338,6 +1338,9 @@
# The interval string is a possibly signed sequence of decimal numbers, followed by a unit suffix (ms, s, m, h, d), e.g. 30s or 1m.
;state_periodic_save_interval = 5m
# If the feature flag 'alertingSaveStatePeriodic' is enabled, this is the size of the batch that is saved to the database at once.
;state_periodic_save_batch_size = 1
# Disables the smoothing of alert evaluations across their evaluation window.
# Rules will evaluate in sync.
;disable_jitter = false

@ -63,7 +63,9 @@ transition of an alert instance is saved in the database.
This can be prevented by writing to the database periodically. For this the feature flag `alertingSaveStatePeriodic` needs
to be enabled. By default, it saves the states every 5 minutes to the database and on each shutdown. The periodic interval
can also be configured using the `state_periodic_save_interval` configuration flag.
can also be configured using the `state_periodic_save_interval` configuration flag. During this process, Grafana deletes all existing alert instances from the
database and then writes the entire current set of instances back in batches in a single transaction.
Configure the size of each batch using the `state_periodic_save_batch_size` configuration option.
The time it takes to write to the database periodically can be monitored using the `state_full_sync_duration_seconds` metric
that is exposed by Grafana.
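
To make the write pattern described above concrete, here is a small, self-contained sketch (illustrative numbers only, not part of the change) of how many statements one periodic save issues for a given instance count and batch size:

```go
package main

import "fmt"

func main() {
	// Hypothetical figures: 2500 current alert instances, batch size 100.
	instances := 2500
	batchSize := 100 // state_periodic_save_batch_size

	// One DELETE clears the alert_instance table, then the instances are
	// written back in ceil(instances/batchSize) multi-row INSERTs, all
	// inside a single transaction.
	batches := (instances + batchSize - 1) / batchSize
	fmt.Printf("1 DELETE + %d INSERTs per periodic save\n", batches)
}
```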

@ -409,6 +409,7 @@ func (ng *AlertNG) init() error {
DoNotSaveNormalState: ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingNoNormalState),
ApplyNoDataAndErrorToAllStates: ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingNoDataErrorExecution),
MaxStateSaveConcurrency: ng.Cfg.UnifiedAlerting.MaxStateSaveConcurrency,
StatePeriodicSaveBatchSize: ng.Cfg.UnifiedAlerting.StatePeriodicSaveBatchSize,
RulesPerRuleGroupLimit: ng.Cfg.UnifiedAlerting.RulesPerRuleGroupLimit,
Tracer: ng.tracer,
Log: log.New("ngalert.state.manager"),

@ -71,6 +71,9 @@ type ManagerCfg struct {
DoNotSaveNormalState bool
// MaxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel.
MaxStateSaveConcurrency int
// StatePeriodicSaveBatchSize controls the size of the alert instance batch that is saved periodically when the
// alertingSaveStatePeriodic feature flag is enabled.
StatePeriodicSaveBatchSize int
// ApplyNoDataAndErrorToAllStates makes state manager to apply exceptional results (NoData and Error)
// to all states when corresponding execution in the rule definition is set to either `Alerting` or `OK`
ApplyNoDataAndErrorToAllStates bool

@ -16,7 +16,7 @@ type InstanceStore interface {
// SaveAlertInstancesForRule overwrites the state for the given rule.
SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error
DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error
FullSync(ctx context.Context, instances []models.AlertInstance) error
FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error
}
// InstanceReader provides methods to fetch alert instances.

@ -20,6 +20,7 @@ type AsyncStatePersister struct {
log log.Logger
// doNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods.
doNotSaveNormalState bool
batchSize int
store InstanceStore
ticker *clock.Ticker
metrics *metrics.State
@ -31,6 +32,7 @@ func NewAsyncStatePersister(log log.Logger, ticker *clock.Ticker, cfg ManagerCfg
store: cfg.InstanceStore,
ticker: ticker,
doNotSaveNormalState: cfg.DoNotSaveNormalState,
batchSize: cfg.StatePeriodicSaveBatchSize,
metrics: cfg.Metrics,
}
}
@ -58,11 +60,11 @@ func (a *AsyncStatePersister) fullSync(ctx context.Context, instancesProvider Al
startTime := time.Now()
a.log.Debug("Full state sync start")
instances := instancesProvider.GetAlertInstances(a.doNotSaveNormalState)
if err := a.store.FullSync(ctx, instances); err != nil {
if err := a.store.FullSync(ctx, instances, a.batchSize); err != nil {
a.log.Error("Full state sync failed", "duration", time.Since(startTime), "instances", len(instances))
return err
}
a.log.Debug("Full state sync done", "duration", time.Since(startTime), "instances", len(instances))
a.log.Debug("Full state sync done", "duration", time.Since(startTime), "instances", len(instances), "batchSize", a.batchSize)
if a.metrics != nil {
a.metrics.StateFullSyncDuration.Observe(time.Since(startTime).Seconds())
}
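
As a quick illustration of the call above, a self-contained sketch (simplified stand-in types, not Grafana's actual API) of handing the full instance set and the configured batch size to a store and timing the sync:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// instance stands in for models.AlertInstance; store mirrors the updated
// InstanceStore.FullSync signature from persist.go above.
type instance struct{ ruleUID string }

type store struct{}

func (store) FullSync(_ context.Context, in []instance, batchSize int) error {
	fmt.Printf("would write %d instances in batches of %d\n", len(in), batchSize)
	return nil
}

func main() {
	batchSize := 2 // ManagerCfg.StatePeriodicSaveBatchSize in the real code
	instances := []instance{{"rule-a"}, {"rule-b"}, {"rule-c"}}

	start := time.Now()
	if err := (store{}).FullSync(context.Background(), instances, batchSize); err != nil {
		fmt.Println("full state sync failed:", err)
		return
	}
	fmt.Println("full state sync done",
		"duration", time.Since(start),
		"instances", len(instances),
		"batchSize", batchSize)
}
```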

@ -64,7 +64,7 @@ func (f *FakeInstanceStore) DeleteAlertInstancesByRule(ctx context.Context, key
return nil
}
func (f *FakeInstanceStore) FullSync(ctx context.Context, instances []models.AlertInstance) error {
func (f *FakeInstanceStore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = []any{}

@ -224,49 +224,106 @@ func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.Ale
})
}
func (st DBstore) FullSync(ctx context.Context, instances []models.AlertInstance) error {
// FullSync performs a full synchronization of the given alert instances to the database.
//
// This method will delete all existing alert instances and insert the given instances in a single transaction.
//
// The batchSize parameter controls how many instances are inserted per batch. Increasing batchSize can improve
// performance for large datasets, but can also increase load on the database.
func (st DBstore) FullSync(ctx context.Context, instances []models.AlertInstance, batchSize int) error {
if len(instances) == 0 {
return nil
}
if batchSize <= 0 {
batchSize = 1
}
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
// First we delete all records from the table
if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil {
return fmt.Errorf("failed to delete alert_instance table: %w", err)
}
for _, alertInstance := range instances {
if err := models.ValidateAlertInstance(alertInstance); err != nil {
st.Logger.Warn("Failed to validate alert instance, skipping", "err", err, "rule_uid", alertInstance.RuleUID)
total := len(instances)
for start := 0; start < total; start += batchSize {
end := start + batchSize
if end > total {
end = total
}
batch := instances[start:end]
if err := st.insertInstancesBatch(sess, batch); err != nil {
return fmt.Errorf("failed to insert batch [%d:%d]: %w", start, end, err)
}
}
if err := sess.Commit(); err != nil {
return fmt.Errorf("failed to commit alert_instance table: %w", err)
}
return nil
})
}
func (st DBstore) insertInstancesBatch(sess *sqlstore.DBSession, batch []models.AlertInstance) error {
// If the batch is empty, nothing to insert.
if len(batch) == 0 {
return nil
}
query := strings.Builder{}
placeholders := make([]string, 0, len(batch))
args := make([]any, 0, len(batch)*11)
query.WriteString("INSERT INTO alert_instance ")
query.WriteString("(rule_org_id, rule_uid, labels, labels_hash, current_state, current_reason, current_state_since, current_state_end, last_eval_time, resolved_at, last_sent_at) VALUES ")
for _, instance := range batch {
if err := models.ValidateAlertInstance(instance); err != nil {
st.Logger.Warn("Skipping invalid alert instance", "err", err, "rule_uid", instance.RuleUID)
continue
}
labelTupleJSON, err := alertInstance.Labels.StringKey()
labelTupleJSON, err := instance.Labels.StringKey()
if err != nil {
st.Logger.Warn("Failed to generate alert instance labels key, skipping", "err", err, "rule_uid", alertInstance.RuleUID)
st.Logger.Warn("Skipping instance with invalid labels key", "err", err, "rule_uid", instance.RuleUID)
continue
}
_, err = sess.Exec(
"INSERT INTO alert_instance (rule_org_id, rule_uid, labels, labels_hash, current_state, current_reason, current_state_since, current_state_end, last_eval_time, resolved_at, last_sent_at) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
alertInstance.RuleOrgID,
alertInstance.RuleUID,
placeholders = append(placeholders, "(?,?,?,?,?,?,?,?,?,?,?)")
args = append(args,
instance.RuleOrgID,
instance.RuleUID,
labelTupleJSON,
alertInstance.LabelsHash,
alertInstance.CurrentState,
alertInstance.CurrentReason,
alertInstance.CurrentStateSince.Unix(),
alertInstance.CurrentStateEnd.Unix(),
alertInstance.LastEvalTime.Unix(),
nullableTimeToUnix(alertInstance.ResolvedAt),
nullableTimeToUnix(alertInstance.LastSentAt),
instance.LabelsHash,
instance.CurrentState,
instance.CurrentReason,
instance.CurrentStateSince.Unix(),
instance.CurrentStateEnd.Unix(),
instance.LastEvalTime.Unix(),
nullableTimeToUnix(instance.ResolvedAt),
nullableTimeToUnix(instance.LastSentAt),
)
if err != nil {
return fmt.Errorf("failed to insert into alert_instance table: %w", err)
}
// If no valid instances were found in this batch, skip insertion.
if len(placeholders) == 0 {
return nil
}
if err := sess.Commit(); err != nil {
return fmt.Errorf("failed to commit alert_instance table: %w", err)
query.WriteString(strings.Join(placeholders, ","))
execArgs := make([]any, 0, len(args)+1)
execArgs = append(execArgs, query.String())
execArgs = append(execArgs, args...)
if _, err := sess.Exec(execArgs...); err != nil {
return fmt.Errorf("failed to insert instances: %w", err)
}
return nil
})
}
// nullableTimeToUnix converts a nullable time.Time to nil, if it is nil, otherwise it converts the time.Time to a unix timestamp.
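
A rough way to reason about the trade-off mentioned in the comment on `FullSync`: each instance contributes 11 bound parameters (one per column in the INSERT above), so the per-statement parameter count grows linearly with the batch size. The sketch below only does that arithmetic; the practical upper bound depends on the database's limit on bound parameters per statement, which varies by backend and is an assumption to verify for your setup.

```go
package main

import "fmt"

func main() {
	const columnsPerInstance = 11 // matches the alert_instance column list above

	for _, batchSize := range []int{1, 50, 1000} {
		fmt.Printf("batchSize=%4d -> %5d bound parameters per INSERT\n",
			batchSize, batchSize*columnsPerInstance)
	}
}
```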

@ -297,6 +297,8 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) {
}
func TestIntegrationFullSync(t *testing.T) {
batchSize := 1
ctx := context.Background()
_, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds)
@ -310,7 +312,7 @@ func TestIntegrationFullSync(t *testing.T) {
}
t.Run("Should do a proper full sync", func(t *testing.T) {
err := dbstore.FullSync(ctx, instances)
err := dbstore.FullSync(ctx, instances, batchSize)
require.NoError(t, err)
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@ -331,8 +333,9 @@ func TestIntegrationFullSync(t *testing.T) {
}
}
})
t.Run("Should remove non existing entries on sync", func(t *testing.T) {
err := dbstore.FullSync(ctx, instances[1:])
err := dbstore.FullSync(ctx, instances[1:], batchSize)
require.NoError(t, err)
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@ -346,9 +349,35 @@ func TestIntegrationFullSync(t *testing.T) {
}
}
})
t.Run("Should add new entries on sync", func(t *testing.T) {
newRuleUID := "y"
err := dbstore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)))
err := dbstore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize)
require.NoError(t, err)
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
RuleOrgID: orgID,
})
require.NoError(t, err)
require.Len(t, res, len(instances)+1)
for _, ruleUID := range append(ruleUIDs, newRuleUID) {
found := false
for _, instance := range res {
if instance.RuleUID == ruleUID {
found = true
continue
}
}
if !found {
t.Errorf("Instance with RuleUID '%s' not found", ruleUID)
}
}
})
t.Run("Should save all instances when batch size is bigger than 1", func(t *testing.T) {
batchSize = 2
newRuleUID := "y"
err := dbstore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)), batchSize)
require.NoError(t, err)
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
@ -369,6 +398,109 @@ func TestIntegrationFullSync(t *testing.T) {
}
}
})
t.Run("Should not fail when the instances are empty", func(t *testing.T) {
// First, insert some data into the table.
initialInstances := []models.AlertInstance{
generateTestAlertInstance(orgID, "preexisting-1"),
generateTestAlertInstance(orgID, "preexisting-2"),
}
err := dbstore.FullSync(ctx, initialInstances, 5)
require.NoError(t, err)
// Now call FullSync with no instances. According to the code, this should return nil
// and should not delete anything in the table.
err = dbstore.FullSync(ctx, []models.AlertInstance{}, 5)
require.NoError(t, err)
// Check that the previously inserted instances are still present.
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
RuleOrgID: orgID,
})
require.NoError(t, err)
require.Len(t, res, 2, "Expected the preexisting instances to remain since empty sync does nothing")
found1, found2 := false, false
for _, r := range res {
if r.RuleUID == "preexisting-1" {
found1 = true
}
if r.RuleUID == "preexisting-2" {
found2 = true
}
}
require.True(t, found1, "Expected preexisting-1 to remain")
require.True(t, found2, "Expected preexisting-2 to remain")
})
t.Run("Should handle invalid instances by skipping them", func(t *testing.T) {
// Create a batch with one valid and one invalid instance
validInstance := generateTestAlertInstance(orgID, "valid")
invalidInstance := generateTestAlertInstance(orgID, "")
// Make the invalid instance actually invalid
invalidInstance.AlertInstanceKey.RuleUID = ""
err := dbstore.FullSync(ctx, []models.AlertInstance{validInstance, invalidInstance}, 2)
require.NoError(t, err)
// Only the valid instance should be saved.
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
RuleOrgID: orgID,
})
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, "valid", res[0].RuleUID)
})
t.Run("Should handle batchSize larger than the number of instances", func(t *testing.T) {
// Insert a small number of instances but use a large batchSize
smallSet := []models.AlertInstance{
generateTestAlertInstance(orgID, "batch-test1"),
generateTestAlertInstance(orgID, "batch-test2"),
}
err := dbstore.FullSync(ctx, smallSet, 100)
require.NoError(t, err)
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
RuleOrgID: orgID,
})
require.NoError(t, err)
require.Len(t, res, len(smallSet))
found1, found2 := false, false
for _, r := range res {
if r.RuleUID == "batch-test1" {
found1 = true
}
if r.RuleUID == "batch-test2" {
found2 = true
}
}
require.True(t, found1)
require.True(t, found2)
})
t.Run("Should handle a large set of instances with a moderate batchSize", func(t *testing.T) {
// Clear everything first.
err := dbstore.FullSync(ctx, []models.AlertInstance{}, 1)
require.NoError(t, err)
largeCount := 300
largeSet := make([]models.AlertInstance, largeCount)
for i := 0; i < largeCount; i++ {
largeSet[i] = generateTestAlertInstance(orgID, fmt.Sprintf("large-%d", i))
}
err = dbstore.FullSync(ctx, largeSet, 50)
require.NoError(t, err)
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
RuleOrgID: orgID,
})
require.NoError(t, err)
require.Len(t, res, largeCount)
})
}
func generateTestAlertInstance(orgID int64, ruleID string) models.AlertInstance {

@ -116,6 +116,7 @@ type UnifiedAlertingSettings struct {
// MaxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel.
MaxStateSaveConcurrency int
StatePeriodicSaveInterval time.Duration
StatePeriodicSaveBatchSize int
RulesPerRuleGroupLimit int64
// Retention period for Alertmanager notification log entries.
@ -457,6 +458,8 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error {
return err
}
uaCfg.StatePeriodicSaveBatchSize = ua.Key("state_periodic_save_batch_size").MustInt(1)
uaCfg.NotificationLogRetention, err = gtime.ParseDuration(valueAsString(ua, "notification_log_retention", (5 * 24 * time.Hour).String()))
if err != nil {
return err
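
A minimal, self-contained sketch of the default fallback, assuming the `ini` package behind the `*ini.File` in the signature above is `gopkg.in/ini.v1`: when the key is absent, `MustInt(1)` returns the documented default of 1.

```go
package main

import (
	"fmt"

	"gopkg.in/ini.v1"
)

func main() {
	// A config file without the new key.
	cfg, err := ini.Load([]byte("[unified_alerting]\n"))
	if err != nil {
		panic(err)
	}

	ua := cfg.Section("unified_alerting")
	batchSize := ua.Key("state_periodic_save_batch_size").MustInt(1)
	fmt.Println("state_periodic_save_batch_size =", batchSize) // prints 1
}
```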
