Annotations: Split cleanup into separate queries and deletes to avoid deadlocks on MySQL (#80329)

* Split subquery when cleaning annotations

* update comment

* Raise batch size, now that we pay attention to it

* Iterate in batches

* Separate cancellable batch implementation to allow for multi-statement callbacks, add overload for single-statement use

* Use split-out utility in outer batching loop so it respects context cancellation

* guard against empty queries

* Use SQL parameters

* Use same approach for tags

* drop unused function

* Work around parameter limit on sqlite for large batches

* Bulk insert test data in DB

* Refactor test to customise test data creation

* Add test for catching SQLITE_MAX_VARIABLE_NUMBER limit

* Turn annotation cleanup test to integration tests

* lint

---------

Co-authored-by: Sofia Papagiannaki <1632407+papagian@users.noreply.github.com>
pull/80487/head
Alexander Weaver authored 1 year ago, committed by GitHub
parent f84c8f6853
commit 81c45bfe44
  1. pkg/services/annotations/annotationsimpl/cleanup_test.go (129 changed lines)
  2. pkg/services/annotations/annotationsimpl/xorm_store.go (130 changed lines)
  3. pkg/services/cleanup/cleanup.go (2 changed lines)
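For orientation before reading the diff: the change replaces single-statement cleanup queries of the form `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id ... LIMIT n) a)` with a two-step batch that first selects a bounded set of IDs into memory and then deletes exactly those rows. Below is a minimal, self-contained sketch of that pattern using plain `database/sql`; `cleanupBatch` is an illustrative name, and the `?` placeholders and `LIMIT ?` clause stand in for the dialect-specific SQL the real code builds via `r.db.GetDialect()` — this is not the PR's actual code.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// cleanupBatch removes at most batchSize annotations older than cutoffMillis.
// Step 1 loads candidate IDs into memory; step 2 deletes exactly those IDs.
// The previous approach did both in one DELETE ... WHERE id IN (SELECT ...),
// which could deadlock against concurrent inserts on MySQL.
func cleanupBatch(ctx context.Context, db *sql.DB, cutoffMillis int64, batchSize int) (int64, error) {
	// Placeholder and LIMIT syntax are driver-specific; the real code asks
	// the xorm dialect for its limit clause instead of hard-coding it.
	rows, err := db.QueryContext(ctx,
		"SELECT id FROM annotation WHERE created < ? ORDER BY id DESC LIMIT ?",
		cutoffMillis, batchSize)
	if err != nil {
		return 0, err
	}
	defer func() { _ = rows.Close() }()

	var ids []any
	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return 0, err
		}
		ids = append(ids, id)
	}
	if err := rows.Err(); err != nil {
		return 0, err
	}
	if len(ids) == 0 {
		return 0, nil // nothing left to clean in this batch
	}

	// Parameterised IN clause: one placeholder per ID.
	placeholders := "?" + strings.Repeat(",?", len(ids)-1)
	res, err := db.ExecContext(ctx,
		fmt.Sprintf("DELETE FROM annotation WHERE id IN (%s)", placeholders), ids...)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}

func main() {
	fmt.Println("see cleanupBatch above; wiring up a real *sql.DB is omitted")
}
```

Calling such a function in a loop until it affects zero rows, or the context is cancelled, gives the behaviour of `untilDoneOrCancelled` further down in the diff.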

@@ -2,6 +2,7 @@ package annotationsimpl
import (
"context"
"errors"
"testing"
"time"
@@ -14,31 +15,30 @@ import (
"github.com/grafana/grafana/pkg/setting"
)
func TestAnnotationCleanUp(t *testing.T) {
fakeSQL := db.InitTestDB(t)
t.Cleanup(func() {
err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error {
_, err := session.Exec("DELETE FROM annotation")
return err
})
assert.NoError(t, err)
})
func TestIntegrationAnnotationCleanUp(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test")
}
createTestAnnotations(t, fakeSQL, 21, 6)
assertAnnotationCount(t, fakeSQL, "", 21)
assertAnnotationTagCount(t, fakeSQL, 42)
fakeSQL := db.InitTestDB(t)
tests := []struct {
name string
cfg *setting.Cfg
alertAnnotationCount int64
dashboardAnnotationCount int64
APIAnnotationCount int64
affectedAnnotations int64
name string
createAnnotationsNum int
createOldAnnotationsNum int
cfg *setting.Cfg
alertAnnotationCount int64
annotationCleanupJobBatchSize int
dashboardAnnotationCount int64
APIAnnotationCount int64
affectedAnnotations int64
}{
{
name: "default settings should not delete any annotations",
name: "default settings should not delete any annotations",
createAnnotationsNum: 21,
createOldAnnotationsNum: 6,
annotationCleanupJobBatchSize: 1,
cfg: &setting.Cfg{
AlertingAnnotationCleanupSetting: settingsFn(0, 0),
DashboardAnnotationCleanupSettings: settingsFn(0, 0),
@@ -50,7 +50,10 @@ func TestAnnotationCleanUp(t *testing.T) {
affectedAnnotations: 0,
},
{
name: "should remove annotations created before cut off point",
name: "should remove annotations created before cut off point",
createAnnotationsNum: 21,
createOldAnnotationsNum: 6,
annotationCleanupJobBatchSize: 1,
cfg: &setting.Cfg{
AlertingAnnotationCleanupSetting: settingsFn(time.Hour*48, 0),
DashboardAnnotationCleanupSettings: settingsFn(time.Hour*48, 0),
@@ -62,7 +65,10 @@ func TestAnnotationCleanUp(t *testing.T) {
affectedAnnotations: 6,
},
{
name: "should only keep three annotations",
name: "should only keep three annotations",
createAnnotationsNum: 15,
createOldAnnotationsNum: 6,
annotationCleanupJobBatchSize: 1,
cfg: &setting.Cfg{
AlertingAnnotationCleanupSetting: settingsFn(0, 3),
DashboardAnnotationCleanupSettings: settingsFn(0, 3),
@@ -74,7 +80,10 @@ func TestAnnotationCleanUp(t *testing.T) {
affectedAnnotations: 6,
},
{
name: "running the max count delete again should not remove any annotations",
name: "running the max count delete again should not remove any annotations",
createAnnotationsNum: 9,
createOldAnnotationsNum: 6,
annotationCleanupJobBatchSize: 1,
cfg: &setting.Cfg{
AlertingAnnotationCleanupSetting: settingsFn(0, 3),
DashboardAnnotationCleanupSettings: settingsFn(0, 3),
@@ -85,12 +94,40 @@ func TestAnnotationCleanUp(t *testing.T) {
APIAnnotationCount: 3,
affectedAnnotations: 0,
},
{
name: "should not fail if batch size is larger than SQLITE_MAX_VARIABLE_NUMBER for SQLite >= 3.32.0",
createAnnotationsNum: 40003,
createOldAnnotationsNum: 0,
annotationCleanupJobBatchSize: 32767,
cfg: &setting.Cfg{
AlertingAnnotationCleanupSetting: settingsFn(0, 1),
DashboardAnnotationCleanupSettings: settingsFn(0, 1),
APIAnnotationCleanupSettings: settingsFn(0, 1),
},
alertAnnotationCount: 1,
dashboardAnnotationCount: 1,
APIAnnotationCount: 1,
affectedAnnotations: 40000,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
createTestAnnotations(t, fakeSQL, test.createAnnotationsNum, test.createOldAnnotationsNum)
assertAnnotationCount(t, fakeSQL, "", int64(test.createAnnotationsNum))
assertAnnotationTagCount(t, fakeSQL, 2*int64(test.createAnnotationsNum))
t.Cleanup(func() {
err := fakeSQL.WithDbSession(context.Background(), func(session *db.Session) error {
_, deleteAnnotationErr := session.Exec("DELETE FROM annotation")
_, deleteAnnotationTagErr := session.Exec("DELETE FROM annotation_tag")
return errors.Join(deleteAnnotationErr, deleteAnnotationTagErr)
})
assert.NoError(t, err)
})
cfg := setting.NewCfg()
cfg.AnnotationCleanupJobBatchSize = 1
cfg.AnnotationCleanupJobBatchSize = int64(test.annotationCleanupJobBatchSize)
cleaner := ProvideCleanupService(fakeSQL, cfg)
affectedAnnotations, affectedAnnotationTags, err := cleaner.Run(context.Background(), test.cfg)
require.NoError(t, err)
@@ -111,7 +148,11 @@ func TestAnnotationCleanUp(t *testing.T) {
}
}
func TestOldAnnotationsAreDeletedFirst(t *testing.T) {
func TestIntegrationOldAnnotationsAreDeletedFirst(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test")
}
fakeSQL := db.InitTestDB(t)
t.Cleanup(func() {
@@ -193,8 +234,11 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno
cutoffDate := time.Now()
newAnnotations := make([]*annotations.Item, 0, expectedCount)
newAnnotationTags := make([]*annotationTag, 0, 2*expectedCount)
for i := 0; i < expectedCount; i++ {
a := &annotations.Item{
ID: int64(i + 1),
DashboardID: 1,
OrgID: 1,
UserID: 1,
@@ -222,20 +266,29 @@ func createTestAnnotations(t *testing.T, store db.DB, expectedCount int, oldAnno
a.Created = cutoffDate.AddDate(-10, 0, -10).UnixNano() / int64(time.Millisecond)
}
err := store.WithDbSession(context.Background(), func(sess *db.Session) error {
_, err := sess.Insert(a)
require.NoError(t, err, "should be able to save annotation", err)
// mimick the SQL annotation Save logic by writing records to the annotation_tag table
// we need to ensure they get deleted when we clean up annotations
for tagID := range []int{1, 2} {
_, err = sess.Exec("INSERT INTO annotation_tag (annotation_id, tag_id) VALUES(?,?)", a.ID, tagID)
require.NoError(t, err, "should be able to save annotation tag ID", err)
}
return err
})
require.NoError(t, err)
newAnnotations = append(newAnnotations, a)
newAnnotationTags = append(newAnnotationTags, &annotationTag{AnnotationID: a.ID, TagID: 1}, &annotationTag{AnnotationID: a.ID, TagID: 2})
}
err := store.WithDbSession(context.Background(), func(sess *db.Session) error {
batchsize := 500
for i := 0; i < len(newAnnotations); i += batchsize {
_, err := sess.InsertMulti(newAnnotations[i:min(i+batchsize, len(newAnnotations))])
require.NoError(t, err)
}
return nil
})
require.NoError(t, err)
err = store.WithDbSession(context.Background(), func(sess *db.Session) error {
batchsize := 500
for i := 0; i < len(newAnnotationTags); i += batchsize {
_, err := sess.InsertMulti(newAnnotationTags[i:min(i+batchsize, len(newAnnotationTags))])
require.NoError(t, err)
}
return nil
})
require.NoError(t, err)
}
func settingsFn(maxAge time.Duration, maxCount int64) setting.AnnotationCleanupSettings {
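The hunk above bulk-inserts test fixtures with `sess.InsertMulti` in chunks of 500 rows instead of issuing one INSERT per annotation, which keeps each statement's size (and bind-parameter count) bounded while still being fast enough for the 40k-row SQLite test case. Here is a minimal, self-contained sketch of the same chunking idea; `insertChunked` and `bulkInserter` are illustrative names, not the PR's code.

```go
package main

import "fmt"

// bulkInserter stands in for a call like xorm's session.InsertMulti.
type bulkInserter func(rows []any) error

// insertChunked feeds rows to insert() in fixed-size batches so that no
// single multi-row INSERT grows unboundedly.
func insertChunked(rows []any, batchSize int, insert bulkInserter) error {
	for i := 0; i < len(rows); i += batchSize {
		end := i + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		if err := insert(rows[i:end]); err != nil {
			return fmt.Errorf("inserting rows %d..%d: %w", i, end-1, err)
		}
	}
	return nil
}

func main() {
	rows := make([]any, 1203)
	var batches int
	_ = insertChunked(rows, 500, func(chunk []any) error {
		batches++
		return nil
	})
	fmt.Println(batches) // 3 batches: 500 + 500 + 203 rows
}
```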

@@ -10,6 +10,7 @@ import (
"time"
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
@@ -519,10 +520,23 @@ func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.A
var totalAffected int64
if cfg.MaxAge > 0 {
cutoffDate := timeNow().Add(-cfg.MaxAge).UnixNano() / int64(time.Millisecond)
deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s AND created < %v ORDER BY id DESC %s) a)`
sql := fmt.Sprintf(deleteQuery, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
// Single-statement approaches, specifically ones using batched sub-queries, seem to deadlock with concurrent inserts on MySQL.
// We have a bounded batch size, so work around this by first loading the IDs into memory and allowing any locks to flush inside each batch.
// This may under-delete when concurrent inserts happen, but any such annotations will simply be cleaned on the next cycle.
//
// We execute the following batched operation repeatedly until either we run out of objects, the context is cancelled, or there is an error.
affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
cond := fmt.Sprintf(`%s AND created < %v ORDER BY id DESC %s`, annotationType, cutoffDate, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
ids, err := r.fetchIDs(ctx, "annotation", cond)
if err != nil {
return 0, err
}
r.log.Error("Annotations to clean by time", "count", len(ids), "ids", ids, "cond", cond, "err", err)
affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
x, y := r.deleteByIDs(ctx, "annotation", ids)
r.log.Error("cleaned annotations by time", "count", len(ids), "affected", x, "err", y)
return x, y
})
totalAffected += affected
if err != nil {
return totalAffected, err
@@ -530,41 +544,111 @@ func (r *xormRepositoryImpl) CleanAnnotations(ctx context.Context, cfg setting.A
}
if cfg.MaxCount > 0 {
deleteQuery := `DELETE FROM annotation WHERE id IN (SELECT id FROM (SELECT id FROM annotation WHERE %s ORDER BY id DESC %s) a)`
sql := fmt.Sprintf(deleteQuery, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
affected, err := r.executeUntilDoneOrCancelled(ctx, sql)
// Similar strategy as the above cleanup process, to avoid deadlocks.
affected, err := untilDoneOrCancelled(ctx, func() (int64, error) {
cond := fmt.Sprintf(`%s ORDER BY id DESC %s`, annotationType, r.db.GetDialect().LimitOffset(r.cfg.AnnotationCleanupJobBatchSize, cfg.MaxCount))
ids, err := r.fetchIDs(ctx, "annotation", cond)
if err != nil {
return 0, err
}
r.log.Error("Annotations to clean by count", "count", len(ids), "ids", ids, "cond", cond, "err", err)
x, y := r.deleteByIDs(ctx, "annotation", ids)
r.log.Error("cleaned annotations by count", "count", len(ids), "affected", x, "err", y)
return x, y
})
totalAffected += affected
return totalAffected, err
if err != nil {
return totalAffected, err
}
}
return totalAffected, nil
}
func (r *xormRepositoryImpl) CleanOrphanedAnnotationTags(ctx context.Context) (int64, error) {
deleteQuery := `DELETE FROM annotation_tag WHERE id IN ( SELECT id FROM (SELECT id FROM annotation_tag WHERE NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s) a)`
sql := fmt.Sprintf(deleteQuery, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
return r.executeUntilDoneOrCancelled(ctx, sql)
return untilDoneOrCancelled(ctx, func() (int64, error) {
cond := fmt.Sprintf(`NOT EXISTS (SELECT 1 FROM annotation a WHERE annotation_id = a.id) %s`, r.db.GetDialect().Limit(r.cfg.AnnotationCleanupJobBatchSize))
ids, err := r.fetchIDs(ctx, "annotation_tag", cond)
if err != nil {
return 0, err
}
r.log.Error("Tags to clean", "count", len(ids), "ids", ids, "cond", cond, "err", err)
x, y := r.deleteByIDs(ctx, "annotation_tag", ids)
r.log.Error("cleaned tags", "count", len(ids), "affected", x, "err", y)
return x, y
})
}
func (r *xormRepositoryImpl) fetchIDs(ctx context.Context, table, condition string) ([]int64, error) {
sql := fmt.Sprintf(`SELECT id FROM %s`, table)
if condition == "" {
return nil, fmt.Errorf("condition must be supplied; cannot fetch IDs from entire table")
}
sql += fmt.Sprintf(` WHERE %s`, condition)
ids := make([]int64, 0)
err := r.db.WithDbSession(ctx, func(session *db.Session) error {
return session.SQL(sql).Find(&ids)
})
return ids, err
}
func (r *xormRepositoryImpl) executeUntilDoneOrCancelled(ctx context.Context, sql string) (int64, error) {
func (r *xormRepositoryImpl) deleteByIDs(ctx context.Context, table string, ids []int64) (int64, error) {
if len(ids) == 0 {
return 0, nil
}
sql := ""
args := make([]any, 0)
// SQLite has a parameter limit of 999.
// If the batch size is bigger than that, and we're on SQLite, we have to put the IDs directly into the statement.
const sqliteParameterLimit = 999
if r.db.GetDBType() == migrator.SQLite && r.cfg.AnnotationCleanupJobBatchSize > sqliteParameterLimit {
values := fmt.Sprint(ids[0])
for _, v := range ids[1:] {
values = fmt.Sprintf("%s, %d", values, v)
}
sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, values)
} else {
placeholders := "?" + strings.Repeat(",?", len(ids)-1)
sql = fmt.Sprintf(`DELETE FROM %s WHERE id IN (%s)`, table, placeholders)
args = asAny(ids)
}
var affected int64
err := r.db.WithDbSession(ctx, func(session *db.Session) error {
res, err := session.Exec(append([]any{sql}, args...)...)
if err != nil {
return err
}
affected, err = res.RowsAffected()
return err
})
return affected, err
}
func asAny(vs []int64) []any {
r := make([]any, len(vs))
for i, v := range vs {
r[i] = v
}
return r
}
// untilDoneOrCancelled repeatedly executes batched work until that work is either done (i.e., returns zero affected objects),
// a batch produces an error, or the provided context is cancelled.
// The work to be done is given as a callback that returns the number of affected objects for each batch, plus that batch's errors.
func untilDoneOrCancelled(ctx context.Context, batchWork func() (int64, error)) (int64, error) {
var totalAffected int64
for {
select {
case <-ctx.Done():
return totalAffected, ctx.Err()
default:
var affected int64
err := r.db.WithDbSession(ctx, func(session *db.Session) error {
res, err := session.Exec(sql)
if err != nil {
return err
}
affected, err = res.RowsAffected()
totalAffected += affected
return err
})
affected, err := batchWork()
totalAffected += affected
if err != nil {
return totalAffected, err
}
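The body of `untilDoneOrCancelled` is cut off by the hunk boundary above; based on the comment in `CleanAnnotations` ("until either we run out of objects, the context is cancelled, or there is an error"), it presumably also returns once a batch affects zero rows. The following usage sketch is self-contained and includes that zero-affected termination as an inference — it is not verbatim from the PR.

```go
package main

import (
	"context"
	"fmt"
)

// untilDoneOrCancelled mirrors the utility shown in the diff above; the
// "stop when a batch affects zero rows" branch is inferred from the
// surrounding comments, since the hunk is truncated.
func untilDoneOrCancelled(ctx context.Context, batchWork func() (int64, error)) (int64, error) {
	var totalAffected int64
	for {
		select {
		case <-ctx.Done():
			return totalAffected, ctx.Err()
		default:
			affected, err := batchWork()
			totalAffected += affected
			if err != nil {
				return totalAffected, err
			}
			if affected == 0 {
				return totalAffected, nil // ran out of objects to clean
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	batches := 0
	total, err := untilDoneOrCancelled(ctx, func() (int64, error) {
		batches++
		if batches == 3 {
			cancel() // simulate shutdown mid-cleanup
		}
		return 10, nil
	})
	fmt.Println(total, err) // 30 context canceled — no fourth batch runs
}
```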

@@ -74,7 +74,7 @@ func (j cleanUpJob) String() string {
func (srv *CleanUpService) Run(ctx context.Context) error {
srv.cleanUpTmpFiles(ctx)
ticker := time.NewTicker(time.Minute * 10)
ticker := time.NewTicker(time.Minute * 1)
for {
select {
case <-ticker.C:
