From 5e4fd94413387e3f9692acc49aa5de9442bb2951 Mon Sep 17 00:00:00 2001 From: Joe Blubaugh Date: Fri, 2 Sep 2022 11:17:20 +0800 Subject: [PATCH] Alerting: Write and Delete multiple alert instances. (#54072) Prior to this change, all alert instance writes and deletes happened individually, in their own database transaction. This change batches up writes or deletes for a given rule's evaluation loop into a single transaction before applying it. Before: ``` goos: darwin goarch: arm64 pkg: github.com/grafana/grafana/pkg/services/ngalert/store BenchmarkAlertInstanceOperations-8 398 2991381 ns/op 1133537 B/op 27703 allocs/op --- BENCH: BenchmarkAlertInstanceOperations-8 util.go:127: alert definition: {orgID: 1, UID: FovKXiRVzm} with title: "an alert definition FTvFXmRVkz" interval: 60 created util.go:127: alert definition: {orgID: 1, UID: foDFXmRVkm} with title: "an alert definition fovFXmRVkz" interval: 60 created util.go:127: alert definition: {orgID: 1, UID: VQvFuigVkm} with title: "an alert definition VwDKXmR4kz" interval: 60 created PASS ok github.com/grafana/grafana/pkg/services/ngalert/store 1.619s ``` After: ``` goos: darwin goarch: arm64 pkg: github.com/grafana/grafana/pkg/services/ngalert/store BenchmarkAlertInstanceOperations-8 1440 816484 ns/op 352297 B/op 6529 allocs/op --- BENCH: BenchmarkAlertInstanceOperations-8 util.go:127: alert definition: {orgID: 1, UID: 302r_igVzm} with title: "an alert definition q0h9lmR4zz" interval: 60 created util.go:127: alert definition: {orgID: 1, UID: 71hrlmR4km} with title: "an alert definition nJ29_mR4zz" interval: 60 created util.go:127: alert definition: {orgID: 1, UID: Cahr_mR4zm} with title: "an alert definition ja2rlmg4zz" interval: 60 created PASS ok github.com/grafana/grafana/pkg/services/ngalert/store 1.383s ``` So we cut time by about 75% and memory allocations by about 60% when storing and deleting 100 instances. This change also updates some of our tests so that they run successfully against postgreSQL - we were using random Int64s, but postgres integers, which our tables use, max out at 2^31-1 --- pkg/services/ngalert/models/instance.go | 28 +-- .../ngalert/models/instance_labels.go | 8 +- pkg/services/ngalert/models/testing.go | 12 +- .../ngalert/schedule/schedule_test.go | 36 ++- .../ngalert/schedule/schedule_unit_test.go | 10 +- pkg/services/ngalert/state/manager.go | 120 ++++++--- .../ngalert/state/manager_private_test.go | 8 +- pkg/services/ngalert/state/manager_test.go | 55 +++-- pkg/services/ngalert/store/deltas_test.go | 6 +- .../ngalert/store/instance_database.go | 196 ++++++++++++--- .../ngalert/store/instance_database_test.go | 233 ++++++++++++++---- pkg/services/ngalert/store/testing.go | 10 +- pkg/services/ngalert/tests/util.go | 16 +- .../sqlstore/migrator/postgres_dialect.go | 23 +- pkg/services/sqlstore/migrator/upsert_test.go | 4 +- 15 files changed, 557 insertions(+), 208 deletions(-) diff --git a/pkg/services/ngalert/models/instance.go b/pkg/services/ngalert/models/instance.go index 9451889eb90..1bd5bf02c5d 100644 --- a/pkg/services/ngalert/models/instance.go +++ b/pkg/services/ngalert/models/instance.go @@ -7,10 +7,8 @@ import ( // AlertInstance represents a single alert instance. type AlertInstance struct { - RuleOrgID int64 `xorm:"rule_org_id"` - RuleUID string `xorm:"rule_uid"` + AlertInstanceKey `xorm:"extends"` Labels InstanceLabels - LabelsHash string CurrentState InstanceStateType CurrentReason string CurrentStateSince time.Time @@ -18,6 +16,12 @@ type AlertInstance struct { LastEvalTime time.Time } +type AlertInstanceKey struct { + RuleOrgID int64 `xorm:"rule_org_id"` + RuleUID string `xorm:"rule_uid"` + LabelsHash string +} + // InstanceStateType is an enum for instance states. type InstanceStateType string @@ -44,18 +48,6 @@ func (i InstanceStateType) IsValid() bool { i == InstanceStateError } -// SaveAlertInstanceCommand is the query for saving a new alert instance. -type SaveAlertInstanceCommand struct { - RuleOrgID int64 - RuleUID string - Labels InstanceLabels - State InstanceStateType - StateReason string - LastEvalTime time.Time - CurrentStateSince time.Time - CurrentStateEnd time.Time -} - // GetAlertInstanceQuery is the query for retrieving/deleting an alert definition by ID. // nolint:unused type GetAlertInstanceQuery struct { @@ -78,11 +70,7 @@ type ListAlertInstancesQuery struct { // ValidateAlertInstance validates that the alert instance contains an alert rule id, // and state. -func ValidateAlertInstance(alertInstance *AlertInstance) error { - if alertInstance == nil { - return fmt.Errorf("alert instance is invalid because it is nil") - } - +func ValidateAlertInstance(alertInstance AlertInstance) error { if alertInstance.RuleOrgID == 0 { return fmt.Errorf("alert instance is invalid due to missing alert rule organisation") } diff --git a/pkg/services/ngalert/models/instance_labels.go b/pkg/services/ngalert/models/instance_labels.go index d8bde220fd6..a19d8f736f9 100644 --- a/pkg/services/ngalert/models/instance_labels.go +++ b/pkg/services/ngalert/models/instance_labels.go @@ -42,7 +42,7 @@ func (il *InstanceLabels) StringKey() (string, error) { tl := labelsToTupleLabels(*il) b, err := json.Marshal(tl) if err != nil { - return "", fmt.Errorf("can not gereate key due to failure to encode labels: %w", err) + return "", fmt.Errorf("could not generate key due to failure to encode labels: %w", err) } return string(b), nil } @@ -54,7 +54,7 @@ func (il *InstanceLabels) StringAndHash() (string, string, error) { b, err := json.Marshal(tl) if err != nil { - return "", "", fmt.Errorf("can not gereate key for alert instance due to failure to encode labels: %w", err) + return "", "", fmt.Errorf("could not generate key for alert instance due to failure to encode labels: %w", err) } h := sha1.New() @@ -76,7 +76,7 @@ type tupleLabels []tupleLabel type tupleLabel [2]string // Sort tupleLabels by each elements first property (key). -func (t *tupleLabels) sortBtKey() { +func (t *tupleLabels) sortByKey() { if t == nil { return } @@ -91,7 +91,7 @@ func labelsToTupleLabels(l InstanceLabels) tupleLabels { for k, v := range l { t = append(t, tupleLabel{k, v}) } - t.sortBtKey() + t.sortByKey() return t } diff --git a/pkg/services/ngalert/models/testing.go b/pkg/services/ngalert/models/testing.go index 33988cd9a76..da55f8d5a57 100644 --- a/pkg/services/ngalert/models/testing.go +++ b/pkg/services/ngalert/models/testing.go @@ -53,25 +53,25 @@ func AlertRuleGen(mutators ...AlertRuleMutator) func() *AlertRule { if rand.Int63()%2 == 0 { d := util.GenerateShortUID() dashUID = &d - p := rand.Int63() + p := rand.Int63n(1500) panelID = &p } rule := &AlertRule{ - ID: rand.Int63(), - OrgID: rand.Int63(), + ID: rand.Int63n(1500), + OrgID: rand.Int63n(1500), Title: "TEST-ALERT-" + util.GenerateShortUID(), Condition: "A", Data: []AlertQuery{GenerateAlertQuery()}, Updated: time.Now().Add(-time.Duration(rand.Intn(100) + 1)), IntervalSeconds: rand.Int63n(60) + 1, - Version: rand.Int63(), + Version: rand.Int63n(1500), // Don't generate a rule ID too big for postgres UID: util.GenerateShortUID(), NamespaceUID: util.GenerateShortUID(), DashboardUID: dashUID, PanelID: panelID, RuleGroup: "TEST-GROUP-" + util.GenerateShortUID(), - RuleGroupIndex: rand.Int(), + RuleGroupIndex: rand.Intn(1500), NoDataState: randNoDataState(), ExecErrState: randErrState(), For: forInterval, @@ -95,7 +95,7 @@ func WithUniqueID() AlertRuleMutator { usedID := make(map[int64]struct{}) return func(rule *AlertRule) { for { - id := rand.Int63() + id := rand.Int63n(1500) if _, ok := usedID[id]; !ok { usedID[id] = struct{}{} rule.ID = id diff --git a/pkg/services/ngalert/schedule/schedule_test.go b/pkg/services/ngalert/schedule/schedule_test.go index 34c22e7f5ab..8502561ecdd 100644 --- a/pkg/services/ngalert/schedule/schedule_test.go +++ b/pkg/services/ngalert/schedule/schedule_test.go @@ -76,28 +76,38 @@ func TestWarmStateCache(t *testing.T) { }, } - saveCmd1 := &models.SaveAlertInstanceCommand{ - RuleOrgID: rule.OrgID, - RuleUID: rule.UID, - Labels: models.InstanceLabels{"test1": "testValue1"}, - State: models.InstanceStateNormal, + labels := models.InstanceLabels{"test1": "testValue1"} + _, hash, _ := labels.StringAndHash() + instance1 := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateNormal, LastEvalTime: evaluationTime, CurrentStateSince: evaluationTime.Add(-1 * time.Minute), CurrentStateEnd: evaluationTime.Add(1 * time.Minute), + Labels: labels, } - _ = dbstore.SaveAlertInstance(ctx, saveCmd1) + _ = dbstore.SaveAlertInstances(ctx, instance1) - saveCmd2 := &models.SaveAlertInstanceCommand{ - RuleOrgID: rule.OrgID, - RuleUID: rule.UID, - Labels: models.InstanceLabels{"test2": "testValue2"}, - State: models.InstanceStateFiring, + labels = models.InstanceLabels{"test2": "testValue2"} + _, hash, _ = labels.StringAndHash() + instance2 := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateFiring, LastEvalTime: evaluationTime, CurrentStateSince: evaluationTime.Add(-1 * time.Minute), CurrentStateEnd: evaluationTime.Add(1 * time.Minute), + Labels: labels, } - _ = dbstore.SaveAlertInstance(ctx, saveCmd2) + _ = dbstore.SaveAlertInstances(ctx, instance2) cfg := setting.UnifiedAlertingSettings{ BaseInterval: time.Second, @@ -252,7 +262,7 @@ func assertEvalRun(t *testing.T, ch <-chan evalAppliedInfo, tick time.Time, keys case info := <-ch: _, ok := expected[info.alertDefKey] if !ok { - t.Fatal(fmt.Sprintf("alert rule: %v should not have been evaluated at: %v", info.alertDefKey, info.now)) + t.Fatalf(fmt.Sprintf("alert rule: %v should not have been evaluated at: %v", info.alertDefKey, info.now)) } t.Logf("alert rule: %v evaluated at: %v", info.alertDefKey, info.now) assert.Equal(t, tick, info.now) diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index dc730ac2ad1..29a34928fe1 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -111,10 +111,10 @@ func TestSchedule_ruleRoutine(t *testing.T) { require.Len(t, states, 1) s := states[0] - var cmd *models.SaveAlertInstanceCommand + var cmd *models.AlertInstance for _, op := range instanceStore.RecordedOps { switch q := op.(type) { - case models.SaveAlertInstanceCommand: + case models.AlertInstance: cmd = &q } if cmd != nil { @@ -123,11 +123,11 @@ func TestSchedule_ruleRoutine(t *testing.T) { } require.NotNil(t, cmd) - t.Logf("Saved alert instance: %v", cmd) + t.Logf("Saved alert instances: %v", cmd) require.Equal(t, rule.OrgID, cmd.RuleOrgID) require.Equal(t, expectedTime, cmd.LastEvalTime) - require.Equal(t, cmd.RuleUID, cmd.RuleUID) - require.Equal(t, evalState.String(), string(cmd.State)) + require.Equal(t, rule.UID, cmd.RuleUID) + require.Equal(t, evalState.String(), string(cmd.CurrentState)) require.Equal(t, s.Labels, data.Labels(cmd.Labels)) }) diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index d839a56b7ff..374bc6b820e 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -181,15 +181,13 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time states = append(states, s) processedResults[s.CacheId] = s } - st.staleResultsHandler(ctx, evaluatedAt, alertRule, processedResults) + + st.cleanupStaleResults(ctx, evaluatedAt, alertRule, processedResults) if len(states) > 0 { logger.Debug("saving new states to the database", "count", len(states)) - for _, state := range states { - if err := st.saveState(ctx, state); err != nil { - logger.Error("failed to save alert state", "labels", state.Labels.String(), "state", state.State.String(), "err", err.Error()) - } - } + _, _ = st.saveAlertStates(ctx, states...) } + return states } @@ -323,34 +321,86 @@ func (st *Manager) flushState(ctx context.Context) { st.log.Info("flushing the state") st.cache.mtxStates.Lock() defer st.cache.mtxStates.Unlock() + totalStates, errorsCnt := 0, 0 + stateBatchSize := 512 // 4KiB, 8 bytes per pointer + var stateSlice []*State = make([]*State, 0, stateBatchSize) + + writeBatch := func() { + if len(stateSlice) > 0 { + savedCount, failedCount := st.saveAlertStates(ctx, stateSlice...) + errorsCnt += failedCount + totalStates += savedCount + stateSlice = stateSlice[:0] + } + } + for _, orgStates := range st.cache.states { for _, ruleStates := range orgStates { + // Flush the state batch if we would go over the batch size when adding the next loop. + if len(ruleStates)+len(stateSlice) > stateBatchSize { + writeBatch() + } + for _, state := range ruleStates { - err := st.saveState(ctx, state) - totalStates++ - if err != nil { - st.log.Error("failed to save alert state", append(state.GetRuleKey().LogContext(), "labels", state.Labels.String(), "state", state.State.String(), "err", err.Error())) - errorsCnt++ - } + // Take everything for a single rule, even if it exceeds the batch + // size. We'd like to flush everything for one rule as a unit. + stateSlice = append(stateSlice, state) } } } + + writeBatch() + st.log.Info("the state has been flushed", "total_instances", totalStates, "errors", errorsCnt, "took", st.clock.Since(t)) } -func (st *Manager) saveState(ctx context.Context, s *State) error { - cmd := ngModels.SaveAlertInstanceCommand{ - RuleOrgID: s.OrgID, - RuleUID: s.AlertRuleUID, - Labels: ngModels.InstanceLabels(s.Labels), - State: ngModels.InstanceStateType(s.State.String()), - StateReason: s.StateReason, - LastEvalTime: s.LastEvaluationTime, - CurrentStateSince: s.StartsAt, - CurrentStateEnd: s.EndsAt, +// TODO: Is the `State` type necessary? Should it embed the instance? +func (st *Manager) saveAlertStates(ctx context.Context, states ...*State) (saved, failed int) { + st.log.Debug("saving alert states", "count", len(states)) + instances := make([]ngModels.AlertInstance, 0, len(states)) + + type debugInfo struct { + OrgID int64 + Uid string + State string + Labels string } - return st.instanceStore.SaveAlertInstance(ctx, &cmd) + debug := make([]debugInfo, 0) + + for _, s := range states { + labels := ngModels.InstanceLabels(s.Labels) + _, hash, err := labels.StringAndHash() + if err != nil { + debug = append(debug, debugInfo{s.OrgID, s.AlertRuleUID, s.State.String(), s.Labels.String()}) + st.log.Error("failed to save alert instance with invalid labels", "orgID", s.OrgID, "ruleUID", s.AlertRuleUID, "err", err) + continue + } + fields := ngModels.AlertInstance{ + AlertInstanceKey: ngModels.AlertInstanceKey{ + RuleOrgID: s.OrgID, + RuleUID: s.AlertRuleUID, + LabelsHash: hash, + }, + Labels: ngModels.InstanceLabels(s.Labels), + CurrentState: ngModels.InstanceStateType(s.State.String()), + CurrentReason: s.StateReason, + LastEvalTime: s.LastEvaluationTime, + CurrentStateSince: s.StartsAt, + CurrentStateEnd: s.EndsAt, + } + instances = append(instances, fields) + } + + if err := st.instanceStore.SaveAlertInstances(ctx, instances...); err != nil { + for _, inst := range instances { + debug = append(debug, debugInfo{inst.RuleOrgID, inst.RuleUID, string(inst.CurrentState), data.Labels(inst.Labels).String()}) + } + st.log.Error("failed to save alert states", "states", debug, "err", err) + return 0, len(debug) + } + + return len(instances), len(debug) } // TODO: why wouldn't you allow other types like NoData or Error? @@ -426,11 +476,18 @@ func (st *Manager) annotateState(ctx context.Context, alertRule *ngModels.AlertR } } -func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, states map[string]*State) { +func (st *Manager) cleanupStaleResults( + ctx context.Context, + evaluatedAt time.Time, + alertRule *ngModels.AlertRule, + newStates map[string]*State, +) { allStates := st.GetStatesForRuleUID(alertRule.OrgID, alertRule.UID) + toDelete := make([]ngModels.AlertInstanceKey, 0) + for _, s := range allStates { - _, ok := states[s.CacheId] - if !ok && isItStale(evaluatedAt, s.LastEvaluationTime, alertRule.IntervalSeconds) { + // Is the cached state in our recently processed results? If not, is it stale? + if _, ok := newStates[s.CacheId]; !ok && stateIsStale(evaluatedAt, s.LastEvaluationTime, alertRule.IntervalSeconds) { st.log.Debug("removing stale state entry", "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId) st.cache.deleteEntry(s.OrgID, s.AlertRuleUID, s.CacheId) ilbs := ngModels.InstanceLabels(s.Labels) @@ -439,9 +496,7 @@ func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Tim st.log.Error("unable to get labelsHash", "err", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID) } - if err = st.instanceStore.DeleteAlertInstance(ctx, s.OrgID, s.AlertRuleUID, labelsHash); err != nil { - st.log.Error("unable to delete stale instance from database", "err", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId) - } + toDelete = append(toDelete, ngModels.AlertInstanceKey{RuleOrgID: s.OrgID, RuleUID: s.AlertRuleUID, LabelsHash: labelsHash}) if s.State == eval.Alerting { st.annotateState(ctx, alertRule, s.Labels, evaluatedAt, @@ -450,9 +505,14 @@ func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Tim } } } + + if err := st.instanceStore.DeleteAlertInstances(ctx, toDelete...); err != nil { + st.log.Error("unable to delete stale instances from database", "err", err.Error(), + "orgID", alertRule.OrgID, "alertRuleUID", alertRule.UID, "count", len(toDelete)) + } } -func isItStale(evaluatedAt time.Time, lastEval time.Time, intervalSeconds int64) bool { +func stateIsStale(evaluatedAt time.Time, lastEval time.Time, intervalSeconds int64) bool { return !lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).After(evaluatedAt) } diff --git a/pkg/services/ngalert/state/manager_private_test.go b/pkg/services/ngalert/state/manager_private_test.go index edee75ed404..b0a7eb9f697 100644 --- a/pkg/services/ngalert/state/manager_private_test.go +++ b/pkg/services/ngalert/state/manager_private_test.go @@ -111,7 +111,7 @@ func Test_maybeNewImage(t *testing.T) { } } -func TestIsItStale(t *testing.T) { +func TestStateIsStale(t *testing.T) { now := time.Now() intervalSeconds := rand.Int63n(10) + 5 @@ -148,7 +148,7 @@ func TestIsItStale(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expectedResult, isItStale(now, tc.lastEvaluation, intervalSeconds)) + require.Equal(t, tc.expectedResult, stateIsStale(now, tc.lastEvaluation, intervalSeconds)) }) } } @@ -178,10 +178,10 @@ func TestClose(t *testing.T) { st.Close(context.Background()) t.Run("should flush the state to store", func(t *testing.T) { - savedStates := make(map[string]ngmodels.SaveAlertInstanceCommand) + savedStates := make(map[string]ngmodels.AlertInstance) for _, op := range instanceStore.RecordedOps { switch q := op.(type) { - case ngmodels.SaveAlertInstanceCommand: + case ngmodels.AlertInstance: cacheId, err := q.Labels.StringKey() require.NoError(t, err) savedStates[cacheId] = q diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index 688947e6b94..18ca81f385f 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -2022,10 +2022,10 @@ func TestProcessEvalResults(t *testing.T) { require.NotEmpty(t, states) - savedStates := make(map[string]models.SaveAlertInstanceCommand) + savedStates := make(map[string]models.AlertInstance) for _, op := range instanceStore.RecordedOps { switch q := op.(type) { - case models.SaveAlertInstanceCommand: + case models.AlertInstance: cacheId, err := q.Labels.StringKey() require.NoError(t, err) savedStates[cacheId] = q @@ -2058,28 +2058,39 @@ func TestStaleResultsHandler(t *testing.T) { const mainOrgID int64 = 1 rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID) lastEval := evaluationTime.Add(-2 * interval) - saveCmd1 := &models.SaveAlertInstanceCommand{ - RuleOrgID: rule.OrgID, - RuleUID: rule.UID, - Labels: models.InstanceLabels{"test1": "testValue1"}, - State: models.InstanceStateNormal, - LastEvalTime: lastEval, - CurrentStateSince: lastEval, - CurrentStateEnd: lastEval.Add(3 * interval), - } - - _ = dbstore.SaveAlertInstance(ctx, saveCmd1) - saveCmd2 := &models.SaveAlertInstanceCommand{ - RuleOrgID: rule.OrgID, - RuleUID: rule.UID, - Labels: models.InstanceLabels{"test2": "testValue2"}, - State: models.InstanceStateFiring, - LastEvalTime: lastEval, - CurrentStateSince: lastEval, - CurrentStateEnd: lastEval.Add(3 * interval), + labels1 := models.InstanceLabels{"test1": "testValue1"} + _, hash1, _ := labels1.StringAndHash() + labels2 := models.InstanceLabels{"test2": "testValue2"} + _, hash2, _ := labels2.StringAndHash() + instances := []models.AlertInstance{ + { + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash1, + }, + CurrentState: models.InstanceStateNormal, + Labels: labels1, + LastEvalTime: lastEval, + CurrentStateSince: lastEval, + CurrentStateEnd: lastEval.Add(3 * interval), + }, + { + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash2, + }, + CurrentState: models.InstanceStateFiring, + Labels: labels2, + LastEvalTime: lastEval, + CurrentStateSince: lastEval, + CurrentStateEnd: lastEval.Add(3 * interval), + }, } - _ = dbstore.SaveAlertInstance(ctx, saveCmd2) + + _ = dbstore.SaveAlertInstances(ctx, instances...) testCases := []struct { desc string diff --git a/pkg/services/ngalert/store/deltas_test.go b/pkg/services/ngalert/store/deltas_test.go index 0cd63526482..127818fea3c 100644 --- a/pkg/services/ngalert/store/deltas_test.go +++ b/pkg/services/ngalert/store/deltas_test.go @@ -16,7 +16,7 @@ import ( ) func TestCalculateChanges(t *testing.T) { - orgId := rand.Int63() + orgId := int64(rand.Int31()) t.Run("detects alerts that need to be added", func(t *testing.T) { fakeStore := NewFakeRuleStore(t) @@ -101,8 +101,8 @@ func TestCalculateChanges(t *testing.T) { r := models.CopyRule(rule) // Ignore difference in the following fields as submitted models do not have them set - r.ID = rand.Int63() - r.Version = rand.Int63() + r.ID = int64(rand.Int31()) + r.Version = int64(rand.Int31()) r.Updated = r.Updated.Add(1 * time.Minute) submitted = append(submitted, r) diff --git a/pkg/services/ngalert/store/instance_database.go b/pkg/services/ngalert/store/instance_database.go index abe8fbe3d0d..ce8e096f348 100644 --- a/pkg/services/ngalert/store/instance_database.go +++ b/pkg/services/ngalert/store/instance_database.go @@ -3,6 +3,7 @@ package store import ( "context" "fmt" + "sort" "strings" "github.com/grafana/grafana/pkg/services/ngalert/models" @@ -12,9 +13,9 @@ import ( type InstanceStore interface { GetAlertInstance(ctx context.Context, cmd *models.GetAlertInstanceQuery) error ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) error - SaveAlertInstance(ctx context.Context, cmd *models.SaveAlertInstanceCommand) error + SaveAlertInstances(ctx context.Context, cmd ...models.AlertInstance) error FetchOrgIds(ctx context.Context) ([]int64, error) - DeleteAlertInstance(ctx context.Context, orgID int64, ruleUID, labelsHash string) error + DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error } @@ -65,7 +66,7 @@ func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertI params = append(params, p...) } - addToQuery("SELECT alert_instance.*, alert_rule.title AS rule_title FROM alert_instance LEFT JOIN alert_rule ON alert_instance.rule_org_id = alert_rule.org_id AND alert_instance.rule_uid = alert_rule.uid WHERE rule_org_id = ?", cmd.RuleOrgID) + addToQuery("SELECT * FROM alert_instance WHERE rule_org_id = ?", cmd.RuleOrgID) if cmd.RuleUID != "" { addToQuery(` AND rule_uid = ?`, cmd.RuleUID) @@ -88,43 +89,89 @@ func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertI }) } -// SaveAlertInstance is a handler for saving a new alert instance. -func (st DBstore) SaveAlertInstance(ctx context.Context, cmd *models.SaveAlertInstanceCommand) error { - return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { - labelTupleJSON, labelsHash, err := cmd.Labels.StringAndHash() +// SaveAlertInstances saves all the provided alert instances to the store in a single transaction. +func (st DBstore) SaveAlertInstances(ctx context.Context, cmd ...models.AlertInstance) error { + // The function starts a single transaction and batches writes into + // statements with `maxRows` instances per statements. This makes for a + // fairly efficient transcation without creating statements that are too long + // for some databases to process. For example, SQLite has a limit of 999 + // variables per write. + + err := st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error { + keyNames := []string{"rule_org_id", "rule_uid", "labels_hash"} + fieldNames := []string{ + "rule_org_id", "rule_uid", "labels", "labels_hash", "current_state", + "current_reason", "current_state_since", "current_state_end", "last_eval_time", + } + fieldsPerRow := len(fieldNames) + maxRows := 20 + maxArgs := maxRows * fieldsPerRow + + // Prepare a statement for the maximum batch size. + bigUpsertSQL, err := st.SQLStore.Dialect.UpsertMultipleSQL( + "alert_instance", keyNames, fieldNames, maxRows) if err != nil { return err } - alertInstance := &models.AlertInstance{ - RuleOrgID: cmd.RuleOrgID, - RuleUID: cmd.RuleUID, - Labels: cmd.Labels, - LabelsHash: labelsHash, - CurrentState: cmd.State, - CurrentReason: cmd.StateReason, - CurrentStateSince: cmd.CurrentStateSince, - CurrentStateEnd: cmd.CurrentStateEnd, - LastEvalTime: cmd.LastEvalTime, + bigStmt, err := sess.DB().Prepare(bigUpsertSQL) + if err != nil { + return err } - if err := models.ValidateAlertInstance(alertInstance); err != nil { - return err + // Generate batches of `maxRows` and write the statements when full. + args := make([]interface{}, 0, maxArgs) + for _, alertInstance := range cmd { + if len(args) >= maxArgs { + if _, err = bigStmt.ExecContext(ctx, args...); err != nil { + return err + } + args = args[:0] + } + + labelTupleJSON, err := alertInstance.Labels.StringKey() + if err != nil { + return err + } + + if err := models.ValidateAlertInstance(alertInstance); err != nil { + return err + } + + args = append(args, + alertInstance.RuleOrgID, alertInstance.RuleUID, labelTupleJSON, alertInstance.LabelsHash, + alertInstance.CurrentState, alertInstance.CurrentReason, alertInstance.CurrentStateSince.Unix(), + alertInstance.CurrentStateEnd.Unix(), alertInstance.LastEvalTime.Unix()) } - params := append(make([]interface{}, 0), alertInstance.RuleOrgID, alertInstance.RuleUID, labelTupleJSON, alertInstance.LabelsHash, alertInstance.CurrentState, alertInstance.CurrentReason, alertInstance.CurrentStateSince.Unix(), alertInstance.CurrentStateEnd.Unix(), alertInstance.LastEvalTime.Unix()) + // Write the final batch of up to maxRows in size. + if len(args) > 0 && len(args)%fieldsPerRow == 0 { + upsertSQL, err := st.SQLStore.Dialect.UpsertMultipleSQL( + "alert_instance", keyNames, fieldNames, len(args)/fieldsPerRow) + if err != nil { + return err + } - upsertSQL := st.SQLStore.Dialect.UpsertSQL( - "alert_instance", - []string{"rule_org_id", "rule_uid", "labels_hash"}, - []string{"rule_org_id", "rule_uid", "labels", "labels_hash", "current_state", "current_reason", "current_state_since", "current_state_end", "last_eval_time"}) - _, err = sess.SQL(upsertSQL, params...).Query() - if err != nil { - return err + stmt, err := sess.DB().Prepare(upsertSQL) + if err != nil { + return err + } + + _, err = stmt.ExecContext(ctx, args...) + if err != nil { + return err + } + } else { + return fmt.Errorf("failed to upsert alert instances. Last statements had %v fields, which is not a multiple of the number of fields, %v", len(args), fieldsPerRow) } return nil }) + if err != nil { + return err + } + + return nil } func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) { @@ -150,14 +197,103 @@ func (st DBstore) FetchOrgIds(ctx context.Context) ([]int64, error) { return orgIds, err } -func (st DBstore) DeleteAlertInstance(ctx context.Context, orgID int64, ruleUID, labelsHash string) error { - return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error { - _, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash = ?", orgID, ruleUID, labelsHash) +// DeleteAlertInstances deletes instances with the provided keys in a single transaction. +func (st DBstore) DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error { + if len(keys) == 0 { + return nil + } + + type data struct { + ruleOrgID int64 + ruleUID string + labelHashes []interface{} + } + + // Sort by org and rule UID. Most callers will have grouped already, but it's + // cheap to verify and leads to more compact transactions. + sort.Slice(keys, func(i, j int) bool { + aye := keys[i] + jay := keys[j] + + if aye.RuleOrgID < jay.RuleOrgID { + return true + } + + if aye.RuleOrgID == jay.RuleOrgID && aye.RuleUID < jay.RuleUID { + return true + } + return false + }) + + maxRows := 200 + rowData := data{ + 0, "", make([]interface{}, 0, maxRows), + } + placeholdersBuilder := strings.Builder{} + placeholdersBuilder.WriteString("(") + + execQuery := func(s *sqlstore.DBSession, rd data, placeholders string) error { + if len(rd.labelHashes) == 0 { + return nil + } + + placeholders = strings.TrimRight(placeholders, ", ") + placeholders = placeholders + ")" + + queryString := fmt.Sprintf( + "DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash IN %s;", + placeholders, + ) + + execArgs := make([]interface{}, 0, 3+len(rd.labelHashes)) + execArgs = append(execArgs, queryString, rd.ruleOrgID, rd.ruleUID) + execArgs = append(execArgs, rd.labelHashes...) + _, err := s.Exec(execArgs...) if err != nil { return err } + + return nil + } + + err := st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error { + counter := 0 + + // Create batches of up to 200 items and execute a new delete statement for each batch. + for _, k := range keys { + counter++ + // When a rule ID changes or we hit 200 hashes, issue a statement. + if rowData.ruleOrgID != k.RuleOrgID || rowData.ruleUID != k.RuleUID || len(rowData.labelHashes) >= 200 { + err := execQuery(sess, rowData, placeholdersBuilder.String()) + if err != nil { + return err + } + + // reset our reused data. + rowData.ruleOrgID = k.RuleOrgID + rowData.ruleUID = k.RuleUID + rowData.labelHashes = rowData.labelHashes[:0] + placeholdersBuilder.Reset() + placeholdersBuilder.WriteString("(") + } + + // Accumulate new values. + rowData.labelHashes = append(rowData.labelHashes, k.LabelsHash) + placeholdersBuilder.WriteString("?, ") + } + + // Delete any remaining rows. + if len(rowData.labelHashes) != 0 { + err := execQuery(sess, rowData, placeholdersBuilder.String()) + if err != nil { + return err + } + } + return nil }) + + return err } func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error { diff --git a/pkg/services/ngalert/store/instance_database_test.go b/pkg/services/ngalert/store/instance_database_test.go index a0b1a117b72..cb0532dd5d4 100644 --- a/pkg/services/ngalert/store/instance_database_test.go +++ b/pkg/services/ngalert/store/instance_database_test.go @@ -2,6 +2,7 @@ package store_test import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -12,6 +13,106 @@ import ( const baseIntervalSeconds = 10 +func BenchmarkAlertInstanceOperations(b *testing.B) { + b.StopTimer() + ctx := context.Background() + _, dbstore := tests.SetupTestEnv(b, baseIntervalSeconds) + + const mainOrgID int64 = 1 + + alertRule := tests.CreateTestAlertRule(b, ctx, dbstore, 60, mainOrgID) + + // Create some instances to write down and then delete. + count := 10_003 + instances := make([]models.AlertInstance, 0, count) + keys := make([]models.AlertInstanceKey, 0, count) + for i := 0; i < count; i++ { + labels := models.InstanceLabels{"test": fmt.Sprint(i)} + _, labelsHash, _ := labels.StringAndHash() + instance := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: alertRule.OrgID, + RuleUID: alertRule.UID, + LabelsHash: labelsHash, + }, + CurrentState: models.InstanceStateFiring, + CurrentReason: string(models.InstanceStateError), + Labels: labels, + } + instances = append(instances, instance) + keys = append(keys, instance.AlertInstanceKey) + } + + b.StartTimer() + for i := 0; i < b.N; i++ { + _ = dbstore.SaveAlertInstances(ctx, instances...) + _ = dbstore.DeleteAlertInstances(ctx, keys...) + } +} + +func TestIntegrationAlertInstanceBulkWrite(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + ctx := context.Background() + _, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds) + + orgIDs := []int64{1, 2, 3, 4, 5} + counts := []int{20_000, 200, 503, 0, 1257} + instances := []models.AlertInstance{} + keys := []models.AlertInstanceKey{} + + for i, id := range orgIDs { + alertRule := tests.CreateTestAlertRule(t, ctx, dbstore, 60, id) + + // Create some instances to write down and then delete. + for j := 0; j < counts[i]; j++ { + labels := models.InstanceLabels{"test": fmt.Sprint(j)} + _, labelsHash, _ := labels.StringAndHash() + instance := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: alertRule.OrgID, + RuleUID: alertRule.UID, + LabelsHash: labelsHash, + }, + CurrentState: models.InstanceStateFiring, + CurrentReason: string(models.InstanceStateError), + Labels: labels, + } + instances = append(instances, instance) + keys = append(keys, instance.AlertInstanceKey) + } + } + + err := dbstore.SaveAlertInstances(ctx, instances...) + require.NoError(t, err) + t.Log("Finished database write") + + // List our instances. Make sure we have the right count. + for i, id := range orgIDs { + q := &models.ListAlertInstancesQuery{ + RuleOrgID: id, + } + err = dbstore.ListAlertInstances(ctx, q) + require.NoError(t, err) + require.Equal(t, counts[i], len(q.Result), "Org %v: Expected %v instances but got %v", id, counts[i], len(q.Result)) + } + t.Log("Finished database read") + + err = dbstore.DeleteAlertInstances(ctx, keys...) + require.NoError(t, err) + t.Log("Finished database delete") + + for _, id := range orgIDs { + q := &models.ListAlertInstancesQuery{ + RuleOrgID: id, + } + err = dbstore.ListAlertInstances(ctx, q) + require.NoError(t, err) + require.Zero(t, len(q.Result), "Org %v: Deleted instances but still had %v", id, len(q.Result)) + } +} + func TestIntegrationAlertInstanceOperations(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") @@ -34,44 +135,54 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { require.Equal(t, orgID, alertRule4.OrgID) t.Run("can save and read new alert instance", func(t *testing.T) { - saveCmd := &models.SaveAlertInstanceCommand{ - RuleOrgID: alertRule1.OrgID, - RuleUID: alertRule1.UID, - State: models.InstanceStateFiring, - StateReason: string(models.InstanceStateError), - Labels: models.InstanceLabels{"test": "testValue"}, - } - err := dbstore.SaveAlertInstance(ctx, saveCmd) + labels := models.InstanceLabels{"test": "testValue"} + _, hash, _ := labels.StringAndHash() + instance := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: alertRule1.OrgID, + RuleUID: alertRule1.UID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateFiring, + CurrentReason: string(models.InstanceStateError), + Labels: labels, + } + err := dbstore.SaveAlertInstances(ctx, instance) require.NoError(t, err) getCmd := &models.GetAlertInstanceQuery{ - RuleOrgID: saveCmd.RuleOrgID, - RuleUID: saveCmd.RuleUID, + RuleOrgID: instance.RuleOrgID, + RuleUID: instance.RuleUID, Labels: models.InstanceLabels{"test": "testValue"}, } err = dbstore.GetAlertInstance(ctx, getCmd) require.NoError(t, err) - require.Equal(t, saveCmd.Labels, getCmd.Result.Labels) + require.Equal(t, instance.Labels, getCmd.Result.Labels) require.Equal(t, alertRule1.OrgID, getCmd.Result.RuleOrgID) require.Equal(t, alertRule1.UID, getCmd.Result.RuleUID) - require.Equal(t, saveCmd.StateReason, getCmd.Result.CurrentReason) + require.Equal(t, instance.CurrentReason, getCmd.Result.CurrentReason) }) t.Run("can save and read new alert instance with no labels", func(t *testing.T) { - saveCmd := &models.SaveAlertInstanceCommand{ - RuleOrgID: alertRule2.OrgID, - RuleUID: alertRule2.UID, - State: models.InstanceStateNormal, - Labels: models.InstanceLabels{}, + labels := models.InstanceLabels{} + _, hash, _ := labels.StringAndHash() + instance := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: alertRule2.OrgID, + RuleUID: alertRule2.UID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateNormal, + Labels: labels, } - err := dbstore.SaveAlertInstance(ctx, saveCmd) + err := dbstore.SaveAlertInstances(ctx, instance) require.NoError(t, err) getCmd := &models.GetAlertInstanceQuery{ - RuleOrgID: saveCmd.RuleOrgID, - RuleUID: saveCmd.RuleUID, + RuleOrgID: instance.RuleOrgID, + RuleUID: instance.RuleUID, } err = dbstore.GetAlertInstance(ctx, getCmd) @@ -79,32 +190,42 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { require.Equal(t, alertRule2.OrgID, getCmd.Result.RuleOrgID) require.Equal(t, alertRule2.UID, getCmd.Result.RuleUID) - require.Equal(t, saveCmd.Labels, getCmd.Result.Labels) + require.Equal(t, instance.Labels, getCmd.Result.Labels) }) t.Run("can save two instances with same org_id, uid and different labels", func(t *testing.T) { - saveCmdOne := &models.SaveAlertInstanceCommand{ - RuleOrgID: alertRule3.OrgID, - RuleUID: alertRule3.UID, - State: models.InstanceStateFiring, - Labels: models.InstanceLabels{"test": "testValue"}, + labels := models.InstanceLabels{"test": "testValue"} + _, hash, _ := labels.StringAndHash() + instance1 := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: alertRule3.OrgID, + RuleUID: alertRule3.UID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateFiring, + Labels: labels, } - err := dbstore.SaveAlertInstance(ctx, saveCmdOne) + err := dbstore.SaveAlertInstances(ctx, instance1) require.NoError(t, err) - saveCmdTwo := &models.SaveAlertInstanceCommand{ - RuleOrgID: saveCmdOne.RuleOrgID, - RuleUID: saveCmdOne.RuleUID, - State: models.InstanceStateFiring, - Labels: models.InstanceLabels{"test": "meow"}, + labels = models.InstanceLabels{"test": "testValue2"} + _, hash, _ = labels.StringAndHash() + instance2 := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: instance1.RuleOrgID, + RuleUID: instance1.RuleUID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateFiring, + Labels: labels, } - err = dbstore.SaveAlertInstance(ctx, saveCmdTwo) + err = dbstore.SaveAlertInstances(ctx, instance2) require.NoError(t, err) listQuery := &models.ListAlertInstancesQuery{ - RuleOrgID: saveCmdOne.RuleOrgID, - RuleUID: saveCmdOne.RuleUID, + RuleOrgID: instance1.RuleOrgID, + RuleUID: instance1.RuleUID, } err = dbstore.ListAlertInstances(ctx, listQuery) @@ -136,24 +257,32 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { require.Len(t, listQuery.Result, 1) }) - t.Run("update instance with same org_id, uid and different labels", func(t *testing.T) { - saveCmdOne := &models.SaveAlertInstanceCommand{ - RuleOrgID: alertRule4.OrgID, - RuleUID: alertRule4.UID, - State: models.InstanceStateFiring, - Labels: models.InstanceLabels{"test": "testValue"}, + t.Run("update instance with same org_id, uid and different state", func(t *testing.T) { + labels := models.InstanceLabels{"test": "testValue"} + _, hash, _ := labels.StringAndHash() + instance1 := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: alertRule4.OrgID, + RuleUID: alertRule4.UID, + LabelsHash: hash, + }, + CurrentState: models.InstanceStateFiring, + Labels: labels, } - err := dbstore.SaveAlertInstance(ctx, saveCmdOne) + err := dbstore.SaveAlertInstances(ctx, instance1) require.NoError(t, err) - saveCmdTwo := &models.SaveAlertInstanceCommand{ - RuleOrgID: saveCmdOne.RuleOrgID, - RuleUID: saveCmdOne.RuleUID, - State: models.InstanceStateNormal, - Labels: models.InstanceLabels{"test": "testValue"}, + instance2 := models.AlertInstance{ + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: alertRule4.OrgID, + RuleUID: instance1.RuleUID, + LabelsHash: instance1.LabelsHash, + }, + CurrentState: models.InstanceStateNormal, + Labels: instance1.Labels, } - err = dbstore.SaveAlertInstance(ctx, saveCmdTwo) + err = dbstore.SaveAlertInstances(ctx, instance2) require.NoError(t, err) listQuery := &models.ListAlertInstancesQuery{ @@ -166,9 +295,9 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) { require.Len(t, listQuery.Result, 1) - require.Equal(t, saveCmdTwo.RuleOrgID, listQuery.Result[0].RuleOrgID) - require.Equal(t, saveCmdTwo.RuleUID, listQuery.Result[0].RuleUID) - require.Equal(t, saveCmdTwo.Labels, listQuery.Result[0].Labels) - require.Equal(t, saveCmdTwo.State, listQuery.Result[0].CurrentState) + require.Equal(t, instance2.RuleOrgID, listQuery.Result[0].RuleOrgID) + require.Equal(t, instance2.RuleUID, listQuery.Result[0].RuleUID) + require.Equal(t, instance2.Labels, listQuery.Result[0].Labels) + require.Equal(t, instance2.CurrentState, listQuery.Result[0].CurrentState) }) } diff --git a/pkg/services/ngalert/store/testing.go b/pkg/services/ngalert/store/testing.go index 0a1f4ee82e6..656a293e6c7 100644 --- a/pkg/services/ngalert/store/testing.go +++ b/pkg/services/ngalert/store/testing.go @@ -68,7 +68,7 @@ mainloop: } if existing == nil { folders = append(folders, &models2.Folder{ - Id: rand.Int63(), + Id: rand.Int63n(1500), Uid: r.NamespaceUID, Title: "TEST-FOLDER-" + util.GenerateShortUID(), }) @@ -426,15 +426,17 @@ func (f *FakeInstanceStore) ListAlertInstances(_ context.Context, q *models.List f.RecordedOps = append(f.RecordedOps, *q) return nil } -func (f *FakeInstanceStore) SaveAlertInstance(_ context.Context, q *models.SaveAlertInstanceCommand) error { +func (f *FakeInstanceStore) SaveAlertInstances(_ context.Context, q ...models.AlertInstance) error { f.mtx.Lock() defer f.mtx.Unlock() - f.RecordedOps = append(f.RecordedOps, *q) + for _, inst := range q { + f.RecordedOps = append(f.RecordedOps, inst) + } return nil } func (f *FakeInstanceStore) FetchOrgIds(_ context.Context) ([]int64, error) { return []int64{}, nil } -func (f *FakeInstanceStore) DeleteAlertInstance(_ context.Context, _ int64, _, _ string) error { +func (f *FakeInstanceStore) DeleteAlertInstances(_ context.Context, _ ...models.AlertInstanceKey) error { return nil } func (f *FakeInstanceStore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error { diff --git a/pkg/services/ngalert/tests/util.go b/pkg/services/ngalert/tests/util.go index 348db1797e9..b31a9b3d22e 100644 --- a/pkg/services/ngalert/tests/util.go +++ b/pkg/services/ngalert/tests/util.go @@ -37,15 +37,15 @@ import ( ) // SetupTestEnv initializes a store to used by the tests. -func SetupTestEnv(t *testing.T, baseInterval time.Duration) (*ngalert.AlertNG, *store.DBstore) { - t.Helper() +func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG, *store.DBstore) { + tb.Helper() origNewGuardian := guardian.New guardian.MockDashboardGuardian(&guardian.FakeDashboardGuardian{ CanSaveValue: true, CanViewValue: true, CanAdminValue: true, }) - t.Cleanup(func() { + tb.Cleanup(func() { guardian.New = origNewGuardian }) @@ -58,8 +58,8 @@ func SetupTestEnv(t *testing.T, baseInterval time.Duration) (*ngalert.AlertNG, * *cfg.UnifiedAlerting.Enabled = true m := metrics.NewNGAlert(prometheus.NewRegistry()) - sqlStore := sqlstore.InitTestDB(t) - secretsService := secretsManager.SetupTestService(t, database.ProvideSecretsStore(sqlStore)) + sqlStore := sqlstore.InitTestDB(tb) + secretsService := secretsManager.SetupTestService(tb, database.ProvideSecretsStore(sqlStore)) dashboardStore := databasestore.ProvideDashboardStore(sqlStore, featuremgmt.WithFeatures()) ac := acmock.New() @@ -83,7 +83,7 @@ func SetupTestEnv(t *testing.T, baseInterval time.Duration) (*ngalert.AlertNG, * cfg, nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, nil, secretsService, nil, m, folderService, ac, &dashboards.FakeDashboardService{}, nil, bus, ac, ) - require.NoError(t, err) + require.NoError(tb, err) return ng, &store.DBstore{ SQLStore: ng.SQLStore, Cfg: setting.UnifiedAlertingSettings{ @@ -96,11 +96,11 @@ func SetupTestEnv(t *testing.T, baseInterval time.Duration) (*ngalert.AlertNG, * } // CreateTestAlertRule creates a dummy alert definition to be used by the tests. -func CreateTestAlertRule(t *testing.T, ctx context.Context, dbstore *store.DBstore, intervalSeconds int64, orgID int64) *models.AlertRule { +func CreateTestAlertRule(t testing.TB, ctx context.Context, dbstore *store.DBstore, intervalSeconds int64, orgID int64) *models.AlertRule { return CreateTestAlertRuleWithLabels(t, ctx, dbstore, intervalSeconds, orgID, nil) } -func CreateTestAlertRuleWithLabels(t *testing.T, ctx context.Context, dbstore *store.DBstore, intervalSeconds int64, orgID int64, labels map[string]string) *models.AlertRule { +func CreateTestAlertRuleWithLabels(t testing.TB, ctx context.Context, dbstore *store.DBstore, intervalSeconds int64, orgID int64, labels map[string]string) *models.AlertRule { ruleGroup := fmt.Sprintf("ruleGroup-%s", util.GenerateShortUID()) folderUID := "namespace" user := &user.SignedInUser{ diff --git a/pkg/services/sqlstore/migrator/postgres_dialect.go b/pkg/services/sqlstore/migrator/postgres_dialect.go index b2b0e53884c..adc03e3e856 100644 --- a/pkg/services/sqlstore/migrator/postgres_dialect.go +++ b/pkg/services/sqlstore/migrator/postgres_dialect.go @@ -235,7 +235,6 @@ func (db *PostgresDialect) UpsertMultipleSQL(tableName string, keyCols, updateCo } columnsStr := strings.Builder{} onConflictStr := strings.Builder{} - colPlaceHoldersStr := strings.Builder{} setStr := strings.Builder{} const separator = ", " @@ -246,8 +245,7 @@ func (db *PostgresDialect) UpsertMultipleSQL(tableName string, keyCols, updateCo } columnsStr.WriteString(fmt.Sprintf("%s%s", db.Quote(c), separatorVar)) - colPlaceHoldersStr.WriteString(fmt.Sprintf("?%s", separatorVar)) - setStr.WriteString(fmt.Sprintf("%s=excluded.%s%s", db.Quote(c), db.Quote(c), separatorVar)) + setStr.WriteString(fmt.Sprintf("%s=EXCLUDED.%s%s", db.Quote(c), db.Quote(c), separatorVar)) } separatorVar = separator @@ -260,21 +258,36 @@ func (db *PostgresDialect) UpsertMultipleSQL(tableName string, keyCols, updateCo valuesStr := strings.Builder{} separatorVar = separator - colPlaceHolders := colPlaceHoldersStr.String() + nextPlaceHolder := 1 + for i := 0; i < count; i++ { if i == count-1 { separatorVar = "" } + + colPlaceHoldersStr := strings.Builder{} + placeHolderSep := separator + for j := 1; j <= len(updateCols); j++ { + if j == len(updateCols) { + placeHolderSep = "" + } + placeHolder := fmt.Sprintf("$%v%s", nextPlaceHolder, placeHolderSep) + nextPlaceHolder++ + colPlaceHoldersStr.WriteString(placeHolder) + } + colPlaceHolders := colPlaceHoldersStr.String() + valuesStr.WriteString(fmt.Sprintf("(%s)%s", colPlaceHolders, separatorVar)) } - s := fmt.Sprintf(`INSERT INTO %s (%s) VALUES %s ON CONFLICT(%s) DO UPDATE SET %s`, + s := fmt.Sprintf(`INSERT INTO %s (%s) VALUES %s ON CONFLICT (%s) DO UPDATE SET %s;`, tableName, columnsStr.String(), valuesStr.String(), onConflictStr.String(), setStr.String(), ) + return s, nil } diff --git a/pkg/services/sqlstore/migrator/upsert_test.go b/pkg/services/sqlstore/migrator/upsert_test.go index 9c39673d8d6..11312116f7d 100644 --- a/pkg/services/sqlstore/migrator/upsert_test.go +++ b/pkg/services/sqlstore/migrator/upsert_test.go @@ -23,7 +23,7 @@ func TestUpsertMultiple(t *testing.T) { []string{"key1", "key2", "val1", "val2"}, 1, false, - "INSERT INTO test_table (\"key1\", \"key2\", \"val1\", \"val2\") VALUES (?, ?, ?, ?) ON CONFLICT(\"key1\", \"key2\") DO UPDATE SET \"key1\"=excluded.\"key1\", \"key2\"=excluded.\"key2\", \"val1\"=excluded.\"val1\", \"val2\"=excluded.\"val2\"", + "INSERT INTO test_table (\"key1\", \"key2\", \"val1\", \"val2\") VALUES ($1, $2, $3, $4) ON CONFLICT (\"key1\", \"key2\") DO UPDATE SET \"key1\"=EXCLUDED.\"key1\", \"key2\"=EXCLUDED.\"key2\", \"val1\"=EXCLUDED.\"val1\", \"val2\"=EXCLUDED.\"val2\";", "INSERT INTO test_table (`key1`, `key2`, `val1`, `val2`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), `val1`=VALUES(`val1`), `val2`=VALUES(`val2`)", "INSERT INTO test_table (`key1`, `key2`, `val1`, `val2`) VALUES (?, ?, ?, ?) ON CONFLICT(`key1`, `key2`) DO UPDATE SET `key1`=excluded.`key1`, `key2`=excluded.`key2`, `val1`=excluded.`val1`, `val2`=excluded.`val2`", }, @@ -33,7 +33,7 @@ func TestUpsertMultiple(t *testing.T) { []string{"key1", "key2", "val1", "val2"}, 2, false, - "INSERT INTO test_table (\"key1\", \"key2\", \"val1\", \"val2\") VALUES (?, ?, ?, ?), (?, ?, ?, ?) ON CONFLICT(\"key1\", \"key2\") DO UPDATE SET \"key1\"=excluded.\"key1\", \"key2\"=excluded.\"key2\", \"val1\"=excluded.\"val1\", \"val2\"=excluded.\"val2\"", + "INSERT INTO test_table (\"key1\", \"key2\", \"val1\", \"val2\") VALUES ($1, $2, $3, $4), ($5, $6, $7, $8) ON CONFLICT (\"key1\", \"key2\") DO UPDATE SET \"key1\"=EXCLUDED.\"key1\", \"key2\"=EXCLUDED.\"key2\", \"val1\"=EXCLUDED.\"val1\", \"val2\"=EXCLUDED.\"val2\";", "INSERT INTO test_table (`key1`, `key2`, `val1`, `val2`) VALUES (?, ?, ?, ?), (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`), `val1`=VALUES(`val1`), `val2`=VALUES(`val2`)", "INSERT INTO test_table (`key1`, `key2`, `val1`, `val2`) VALUES (?, ?, ?, ?), (?, ?, ?, ?) ON CONFLICT(`key1`, `key2`) DO UPDATE SET `key1`=excluded.`key1`, `key2`=excluded.`key2`, `val1`=excluded.`val1`, `val2`=excluded.`val2`", },