From 81c45bfe449584dc7ff50e4fabd5d0c6568999b5 Mon Sep 17 00:00:00 2001 From: Alexander Weaver Date: Fri, 12 Jan 2024 14:05:04 -0600 Subject: [PATCH] Annotations: Split cleanup into separate queries and deletes to avoid deadlocks on MySQL (#80329) * Split subquery when cleaning annotations * update comment * Raise batch size, now that we pay attention to it * Iterate in batches * Separate cancellable batch implementation to allow for multi-statement callbacks, add overload for single-statement use * Use split-out utility in outer batching loop so it respects context cancellation * guard against empty queries * Use SQL parameters * Use same approach for tags * drop unused function * Work around parameter limit on sqlite for large batches * Bulk insert test data in DB * Refactor test to customise test data creation * Add test for catching SQLITE_MAX_VARIABLE_NUMBER limit * Turn annotation cleanup test to integration tests * lint --------- Co-authored-by: Sofia Papagiannaki <1632407+papagian@users.noreply.github.com> --- .../annotationsimpl/cleanup_test.go | 129 ++++++++++++----- .../annotations/annotationsimpl/xorm_store.go | 130 ++++++++++++++---- pkg/services/cleanup/cleanup.go | 2 +- 3 files changed, 199 insertions(+), 62 deletions(-) diff --git a/pkg/services/annotations/annotationsimpl/cleanup_test.go b/pkg/services/annotations/annotationsimpl/cleanup_test.go index 3b94ef8fdd5..278bb48e365 100644 --- a/pkg/services/annotations/annotationsimpl/cleanup_test.go +++ b/pkg/services/annotations/annotationsimpl/cleanup_test.go @@ -2,6 +2,7 @@ package annotationsimpl import ( "context" + "errors" "testing" "time" @@ -14,31 +15,30 @@ import ( "github.com/grafana/grafana/pkg/setting" ) -func TestAnnotationCleanUp(t *testing.T) { - fakeSQL := db.InitTestDB(t) - - t.Cleanup(func() { - err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error { - _, err := session.Exec("DELETE FROM annotation") - return err - }) - assert.NoError(t, err) - }) +func TestIntegrationAnnotationCleanUp(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test") + } - createTestAnnotations(t, fakeSQL, 21, 6) - assertAnnotationCount(t, fakeSQL, "", 21) - assertAnnotationTagCount(t, fakeSQL, 42) + fakeSQL := db.InitTestDB(t) tests := []struct { - name string - cfg *setting.Cfg - alertAnnotationCount int64 - dashboardAnnotationCount int64 - APIAnnotationCount int64 - affectedAnnotations int64 + name string + createAnnotationsNum int + createOldAnnotationsNum int + + cfg *setting.Cfg + alertAnnotationCount int64 + annotationCleanupJobBatchSize int + dashboardAnnotationCount int64 + APIAnnotationCount int64 + affectedAnnotations int64 }{ { - name: "default settings should not delete any annotations", + name: "default settings should not delete any annotations", + createAnnotationsNum: 21, + createOldAnnotationsNum: 6, + annotationCleanupJobBatchSize: 1, cfg: &setting.Cfg{ AlertingAnnotationCleanupSetting: settingsFn(0, 0), DashboardAnnotationCleanupSettings: settingsFn(0, 0), @@ -50,7 +50,10 @@ func TestAnnotationCleanUp(t *testing.T) { affectedAnnotations: 0, }, { - name: "should remove annotations created before cut off point", + name: "should remove annotations created before cut off point", + createAnnotationsNum: 21, + createOldAnnotationsNum: 6, + annotationCleanupJobBatchSize: 1, cfg: &setting.Cfg{ AlertingAnnotationCleanupSetting: settingsFn(time.Hour*48, 0), DashboardAnnotationCleanupSettings: settingsFn(time.Hour*48, 0), @@ -62,7 +65,10 @@ func TestAnnotationCleanUp(t *testing.T) { affectedAnnotations: 6, }, { - name: "should only keep three annotations", + name: "should only keep three annotations", + createAnnotationsNum: 15, + createOldAnnotationsNum: 6, + annotationCleanupJobBatchSize: 1, cfg: &setting.Cfg{ AlertingAnnotationCleanupSetting: settingsFn(0, 3), DashboardAnnotationCleanupSettings: settingsFn(0, 3), @@ -74,7 +80,10 @@ func TestAnnotationCleanUp(t *testing.T) { affectedAnnotations: 6, }, { - name: "running the max count delete again should not remove any annotations", + name: "running the max count delete again should not remove any annotations", + createAnnotationsNum: 9, + createOldAnnotationsNum: 6, + annotationCleanupJobBatchSize: 1, cfg: &setting.Cfg{ AlertingAnnotationCleanupSetting: settingsFn(0, 3), DashboardAnnotationCleanupSettings: settingsFn(0, 3), @@ -85,12 +94,40 @@ func TestAnnotationCleanUp(t *testing.T) { APIAnnotationCount: 3, affectedAnnotations: 0, }, + { + name: "should not fail if batch size is larger than SQLITE_MAX_VARIABLE_NUMBER for SQLite >= 3.32.0", + createAnnotationsNum: 40003, + createOldAnnotationsNum: 0, + annotationCleanupJobBatchSize: 32767, + cfg: &setting.Cfg{ + AlertingAnnotationCleanupSetting: settingsFn(0, 1), + DashboardAnnotationCleanupSettings: settingsFn(0, 1), + APIAnnotationCleanupSettings: settingsFn(0, 1), + }, + alertAnnotationCount: 1, + dashboardAnnotationCount: 1, + APIAnnotationCount: 1, + affectedAnnotations: 40000, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + createTestAnnotations(t, fakeSQL, test.createAnnotationsNum, test.createOldAnnotationsNum) + assertAnnotationCount(t, fakeSQL, "", int64(test.createAnnotationsNum)) + assertAnnotationTagCount(t, fakeSQL, 2*int64(test.createAnnotationsNum)) + + t.Cleanup(func() { + err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error { + _, deleteAnnotationErr := session.Exec("DELETE FROM annotation") + _, deleteAnnotationTagErr := session.Exec("DELETE FROM annotation_tag") + return errors.Join(deleteAnnotationErr, deleteAnnotationTagErr) + }) + assert.NoError(t, err) + }) + cfg := setting.NewCfg() - cfg.AnnotationCleanupJobBatchSize = 1 + cfg.AnnotationCleanupJobBatchSize = int64(test.annotationCleanupJobBatchSize) cleaner := ProvideCleanupService(fakeSQL, cfg) affectedAnnotations, affectedAnnotationTags, err := cleaner.Run(context.Background(), test.cfg) require.NoError(t, err) @@ -111,7 +148,11 @@ func TestAnnotationCleanUp(t *testing.T) { } } -func TestOldAnnotationsAreDeletedFirst(t *testing.T) { +func TestIntegrationOldAnnotationsAreDeletedFirst(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test") + } + fakeSQL := db.InitTestDB(t) t.Cleanup(func() { @@ -193,8 +234,11 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno cutoffDate := time.Now() + newAnnotations := make([]*annotations.Item, 0, expectedCount) + newAnnotationTags := make([]*annotationTag, 0, 2*expectedCount) for i := 0; i < expectedCount; i++ { a := &annotations.Item{ + ID: int64(i + 1), DashboardID: 1, OrgID: 1, UserID: 1, @@ -222,20 +266,29 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno a.Created = cutoffDate.AddDate(-10, 0, -10).UnixNano() / int64(time.Millisecond) } - err := store.WithDbSession(context.Background(), func(sess *db.Session) error { - _, err := sess.Insert(a) - require.NoError(t, err, "should be able to save annotation", err) - - // mimick the SQL annotation Save logic by writing records to the annotation_tag table - // we need to ensure they get deleted when we clean up annotations - for tagID := range []int{1, 2} { - _, err = sess.Exec("INSERT INTO annotation_tag (annotation_id, tag_id) VALUES(?,?)", a.ID, tagID) - require.NoError(t, err, "should be able to save annotation tag ID", err) - } - return err - }) - require.NoError(t, err) + newAnnotations = append(newAnnotations, a) + newAnnotationTags = append(newAnnotationTags, &annotationTag{AnnotationID: a.ID, TagID: 1}, &annotationTag{AnnotationID: a.ID, TagID: 2}) } + + err := store.WithDbSession(context.Background(), func(sess *db.Session) error { + batchsize := 500 + for i := 0; i < len(newAnnotations); i += batchsize { + _, err := sess.InsertMulti(newAnnotations[i:min(i+batchsize, len(newAnnotations))]) + require.NoError(t, err) + } + return nil + }) + require.NoError(t, err) + + err = store.WithDbSession(context.Background(), func(sess *db.Session) error { + batchsize := 500 + for i := 0; i < len(newAnnotationTags); i += batchsize { + _, err := sess.InsertMulti(newAnnotationTags[i:min(i+batchsize, len(newAnnotationTags))]) + require.NoError(t, err) + } + return nil + }) + require.NoError(t, err) } func settingsFn(maxAge time.Duration, maxCount int64) setting.AnnotationCleanupSettings { diff --git a/pkg/services/annotations/annotationsimpl/xorm_store.go b/pkg/services/annotations/annotationsimpl/xorm_store.go index e7072f2f052..2525601fed0 100644 --- a/pkg/services/annotations/annotationsimpl/xorm_store.go +++ b/pkg/services/annotations/annotationsimpl/xorm_store.go @@ -10,6 +10,7 @@ import ( "time" "github.com/grafana/grafana/pkg/services/annotations/accesscontrol" + "github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/log" @@ -519,10 +520,23 @@ func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.A var totalAffected int64 if cfg.MaxAge > 0 { cutoffDate := timeNow().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond) - deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s AND created < %v ORDER BY id DESC %s) a)` - sql := fmt.Sprintf(deleteQuery, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize)) + // Single-statement approaches, specifically ones using batched sub-queries, seem to deadlock with concurrent inserts on MySQL. + // We have a bounded batch size, so work around this by first loading the IDs into memory and allowing any locks to flush inside each batch. + // This may under-delete when concurrent inserts happen, but any such annotations will simply be cleaned on the next cycle. + // + // We execute the following batched operation repeatedly until either we run out of objects, the context is cancelled, or there is an error. + affected, err := untilDoneOrCancelled(ctx, func() (int64, error) { + cond := fmt.Sprintf(`%s AND created < %v ORDER BY id DESC %s`, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize)) + ids, err := r.fetchIDs(ctx, "annotation", cond) + if err != nil { + return 0, err + } + r.log.Error("Annotations to clean by time", "count", len(ids), "ids", ids, "cond", cond, "err", err) - affected, err := r.executeUntilDoneOrCancelled(ctx, sql) + x, y := r.deleteByIDs(ctx, "annotation", ids) + r.log.Error("cleaned annotations by time", "count", len(ids), "affected", x, "err", y) + return x, y + }) totalAffected += affected if err != nil { return totalAffected, err @@ -530,41 +544,111 @@ func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.A } if cfg.MaxCount > 0 { - deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s ORDER BY id DESC %s) a)` - sql := fmt.Sprintf(deleteQuery, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount)) - affected, err := r.executeUntilDoneOrCancelled(ctx, sql) + // Similar strategy as the above cleanup process, to avoid deadlocks. + affected, err := untilDoneOrCancelled(ctx, func() (int64, error) { + cond := fmt.Sprintf(`%s ORDER BY id DESC %s`, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount)) + ids, err := r.fetchIDs(ctx, "annotation", cond) + if err != nil { + return 0, err + } + r.log.Error("Annotations to clean by count", "count", len(ids), "ids", ids, "cond", cond, "err", err) + + x, y := r.deleteByIDs(ctx, "annotation", ids) + r.log.Error("cleaned annotations by count", "count", len(ids), "affected", x, "err", y) + return x, y + }) totalAffected += affected - return totalAffected, err + if err != nil { + return totalAffected, err + } } return totalAffected, nil } func (r *xormRepositoryImpl) CleanOrphanedAnnotationTags(ctx context.Context) (int64, error) { - deleteQuery := `DELETE FROM annotation_tag WHERE id IN ( SELECT id FROM (SELECT id FROM annotation_tag WHERE NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s) a)` - sql := fmt.Sprintf(deleteQuery, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize)) - return r.executeUntilDoneOrCancelled(ctx, sql) + return untilDoneOrCancelled(ctx, func() (int64, error) { + cond := fmt.Sprintf(`NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s`, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize)) + ids, err := r.fetchIDs(ctx, "annotation_tag", cond) + if err != nil { + return 0, err + } + r.log.Error("Tags to clean", "count", len(ids), "ids", ids, "cond", cond, "err", err) + + x, y := r.deleteByIDs(ctx, "annotation_tag", ids) + r.log.Error("cleaned tags", "count", len(ids), "affected", x, "err", y) + return x, y + }) +} + +func (r *xormRepositoryImpl) fetchIDs(ctx context.Context, table, condition string) ([]int64, error) { + sql := fmt.Sprintf(`SELECT id FROM %s`, table) + if condition == "" { + return nil, fmt.Errorf("condition must be supplied; cannot fetch IDs from entire table") + } + sql += fmt.Sprintf(` WHERE %s`, condition) + ids := make([]int64, 0) + err := r.db.WithDbSession(ctx, func(session *db.Session) error { + return session.SQL(sql).Find(&ids) + }) + return ids, err } -func (r *xormRepositoryImpl) executeUntilDoneOrCancelled(ctx context.Context, sql string) (int64, error) { +func (r *xormRepositoryImpl) deleteByIDs(ctx context.Context, table string, ids []int64) (int64, error) { + if len(ids) == 0 { + return 0, nil + } + + sql := "" + args := make([]any, 0) + + // SQLite has a parameter limit of 999. + // If the batch size is bigger than that, and we're on SQLite, we have to put the IDs directly into the statement. + const sqliteParameterLimit = 999 + if r.db.GetDBType() == migrator.SQLite && r.cfg.AnnotationCleanupJobBatchSize > sqliteParameterLimit { + values := fmt.Sprint(ids[0]) + for _, v := range ids[1:] { + values = fmt.Sprintf("%s, %d", values, v) + } + sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, values) + } else { + placeholders := "?" + strings.Repeat(",?", len(ids)-1) + sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, placeholders) + args = asAny(ids) + } + + var affected int64 + err := r.db.WithDbSession(ctx, func(session *db.Session) error { + res, err := session.Exec(append([]any{sql}, args...)...) + if err != nil { + return err + } + affected, err = res.RowsAffected() + return err + }) + return affected, err +} + +func asAny(vs []int64) []any { + r := make([]any, len(vs)) + for i, v := range vs { + r[i] = v + } + return r +} + +// untilDoneOrCancelled repeatedly executes batched work until that work is either done (i.e., returns zero affected objects), +// a batch produces an error, or the provided context is cancelled. +// The work to be done is given as a callback that returns the number of affected objects for each batch, plus that batch's errors. +func untilDoneOrCancelled(ctx context.Context, batchWork func() (int64, error)) (int64, error) { var totalAffected int64 for { select { case <-ctx.Done(): return totalAffected, ctx.Err() default: - var affected int64 - err := r.db.WithDbSession(ctx, func(session *db.Session) error { - res, err := session.Exec(sql) - if err != nil { - return err - } - - affected, err = res.RowsAffected() - totalAffected += affected - - return err - }) + affected, err := batchWork() + totalAffected += affected if err != nil { return totalAffected, err } diff --git a/pkg/services/cleanup/cleanup.go b/pkg/services/cleanup/cleanup.go index a96d5f5a702..d3e36eda27c 100644 --- a/pkg/services/cleanup/cleanup.go +++ b/pkg/services/cleanup/cleanup.go @@ -74,7 +74,7 @@ func (j cleanUpJob) String() string { func (srv *CleanUpService) Run(ctx context.Context) error { srv.cleanUpTmpFiles(ctx) - ticker := time.NewTicker(time.Minute * 10) + ticker := time.NewTicker(time.Minute * 1) for { select { case <-ticker.C: