From c9d22f06c38b8bb2dd4629db96d1760e0b49d8ff Mon Sep 17 00:00:00 2001 From: Michael Mandrus <41969079+mmandrus@users.noreply.github.com> Date: Fri, 10 Jan 2025 14:42:18 -0500 Subject: [PATCH] CloudMigrations: Bulk update local resources (#96002) * wip * make tests pass * get all tests passing * fixes * some small cleanup * fix test * convert delimiter keys to struct keys * dont execute empty sql statement * remove printlns * fix unit test * a bit more cleanup * whoops --- .../cloudmigrationimpl/cloudmigration.go | 8 +- .../cloudmigrationimpl/cloudmigration_test.go | 10 +- .../cloudmigrationimpl/snapshot_mgmt.go | 10 +- .../cloudmigrationimpl/store.go | 9 -- .../cloudmigrationimpl/xorm_store.go | 119 ++++++++++++++---- .../cloudmigrationimpl/xorm_store_test.go | 23 ++-- .../gmsclient/inmemory_client.go | 6 - pkg/services/cloudmigration/model.go | 15 ++- .../dashboards/dashboard_service_mock.go | 2 +- 9 files changed, 139 insertions(+), 63 deletions(-) diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index fe74d5c88bb..3a8806e26af 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -612,10 +612,10 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps // We need to update the snapshot in our db before reporting anything if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshot.UID, - SessionID: sessionUid, - Status: localStatus, - Resources: resources, + UID: snapshot.UID, + SessionID: sessionUid, + Status: localStatus, + CloudResourcesToUpdate: resources, }); err != nil { return nil, fmt.Errorf("error updating snapshot status: %w", err) } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go index 
378d2546832..9bde512b504 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go @@ -114,11 +114,19 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) { assert.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status) assert.Never(t, func() bool { return gmsClientFake.GetSnapshotStatusCallCount() > 0 }, time.Second, 10*time.Millisecond) - // Make the status pending processing and ensure GMS gets called + // Make the status pending processing to ensure GMS gets called and initialize a resource err = s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ UID: uid, SessionID: sess.UID, Status: cloudmigration.SnapshotStatusPendingProcessing, + LocalResourcesToCreate: []cloudmigration.CloudMigrationResource{ + { + Name: "A name", + Type: cloudmigration.DatasourceDataType, + RefID: "A", + Status: cloudmigration.ItemStatusPending, + }, + }, }) assert.NoError(t, err) diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go index 1b5cb94a971..870c4949533 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go @@ -569,10 +569,10 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn // update snapshot status to pending upload with retries if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotMeta.UID, - SessionID: snapshotMeta.SessionUID, - Status: cloudmigration.SnapshotStatusPendingUpload, - Resources: localSnapshotResource, + UID: snapshotMeta.UID, + SessionID: snapshotMeta.SessionUID, + Status: cloudmigration.SnapshotStatusPendingUpload, + LocalResourcesToCreate: localSnapshotResource, }); err != nil { return err } @@ -714,7 +714,7 @@ func (s *Service) updateSnapshotWithRetries(ctx context.Context, cmd cloudmigrat } 
return retryer.FuncComplete, nil }, maxRetries, time.Millisecond*10, time.Second*5); err != nil { - s.log.Error("failed to update snapshot status", "snapshotUid", cmd.UID, "status", cmd.Status, "num_resources", len(cmd.Resources), "error", err.Error()) + s.log.Error("failed to update snapshot status", "snapshotUid", cmd.UID, "status", cmd.Status, "num_local_resources", len(cmd.LocalResourcesToCreate), "num_cloud_resources", len(cmd.CloudResourcesToUpdate), "error", err.Error()) return fmt.Errorf("failed to update snapshot status: %w", err) } return nil diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/store.go b/pkg/services/cloudmigration/cloudmigrationimpl/store.go index f8f7dd76263..86f130a1f27 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/store.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/store.go @@ -10,19 +10,10 @@ type store interface { CreateMigrationSession(ctx context.Context, session cloudmigration.CloudMigrationSession) (*cloudmigration.CloudMigrationSession, error) GetMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, error) GetCloudMigrationSessionList(ctx context.Context, orgID int64) ([]*cloudmigration.CloudMigrationSession, error) - // DeleteMigrationSessionByUID deletes the migration session, and all the related snapshot and resources. - // the work is done in a transaction. 
DeleteMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, []cloudmigration.CloudMigrationSnapshot, error) CreateSnapshot(ctx context.Context, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) UpdateSnapshot(ctx context.Context, snapshot cloudmigration.UpdateSnapshotCmd) error GetSnapshotByUID(ctx context.Context, orgID int64, sessUid, id string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error) GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error) - - // Deleted because were not used externally - // - DeleteSnapshot(ctx context.Context, snapshotUid string) error - // - CreateUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error - // - GetSnapshotResources(ctx context.Context, snapshotUid string, page int, limit int) ([]cloudmigration.CloudMigrationResource, error) - // - GetSnapshotResourceStats(ctx context.Context, snapshotUid string) (*cloudmigration.SnapshotResourceStats, error) - // - DeleteSnapshotResources(ctx context.Context, snapshotUid string) error } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go index a3b088b09a5..efbe701e51c 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go @@ -4,6 +4,7 @@ import ( "context" "encoding/base64" "fmt" + "strings" "time" "github.com/grafana/grafana/pkg/infra/db" @@ -94,6 +95,7 @@ func (ss *sqlStore) GetCloudMigrationSessionList(ctx context.Context, orgID int6 return migrations, nil } +// DeleteMigrationSessionByUID deletes the migration session and all of its related snapshots and resources. The work is done in a transaction. 
func (ss *sqlStore) DeleteMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, []cloudmigration.CloudMigrationSnapshot, error) { var c cloudmigration.CloudMigrationSession err := ss.db.WithDbSession(ctx, func(sess *db.Session) error { @@ -127,7 +129,7 @@ func (ss *sqlStore) DeleteMigrationSessionByUID(ctx context.Context, orgID int64 if err != nil { return fmt.Errorf("deleting snapshot resource from db: %w", err) } - err = ss.deleteSnapshot(ctx, orgID, snapshot.UID) + err = ss.deleteSnapshot(ctx, snapshot.UID) if err != nil { return fmt.Errorf("deleting snapshot from db: %w", err) } @@ -211,19 +213,26 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up } } - // Update resources if set - if len(update.Resources) > 0 { - if err := ss.createUpdateSnapshotResources(ctx, update.UID, update.Resources); err != nil { + // If local resources are set, it means we have to create them for the first time + if len(update.LocalResourcesToCreate) > 0 { + if err := ss.CreateSnapshotResources(ctx, update.UID, update.LocalResourcesToCreate); err != nil { return err } } + // If cloud resources are set, it means we have to update our resource local state + if len(update.CloudResourcesToUpdate) > 0 { + if err := ss.UpdateSnapshotResources(ctx, update.UID, update.CloudResourcesToUpdate); err != nil { + return err + } + } + return nil }) return err } -func (ss *sqlStore) deleteSnapshot(ctx context.Context, orgID int64, snapshotUid string) error { +func (ss *sqlStore) deleteSnapshot(ctx context.Context, snapshotUid string) error { return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { _, err := sess.Delete(cloudmigration.CloudMigrationSnapshot{ UID: snapshotUid, @@ -316,30 +325,90 @@ func (ss *sqlStore) GetSnapshotList(ctx context.Context, query cloudmigration.Li return snapshots, nil } -// CreateUpdateSnapshotResources either updates a migration resource for a snapshot, or creates it if 
it does not exist -// If the uid is not known, it uses snapshot_uid + resource_uid as a lookup -func (ss *sqlStore) createUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error { +// CreateSnapshotResources initializes the local state of the resources belonging to a snapshot +func (ss *sqlStore) CreateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error { + for i := 0; i < len(resources); i++ { + resources[i].UID = util.GenerateShortUID() + // ensure snapshot_uids are consistent so that we can use them in conjunction with refID for lookup later + resources[i].SnapshotUID = snapshotUid + } + + err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { + _, err := sess.Insert(resources) + if err != nil { + return err + } + return nil + }) + if err != nil { + return fmt.Errorf("creating resources: %w", err) + } + + return nil +} + +// UpdateSnapshotResources updates a migration resource for a snapshot, using snapshot_uid + resource_uid as a lookup +// It does preprocessing on the results in order to minimize the sql queries executed. 
+func (ss *sqlStore) UpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error { + // refIds of resources that migrated successfully in order to update in bulk + okIds := make([]any, 0, len(resources)) + + // group any failed resources by errCode and errStr + type errId struct { + errCode cloudmigration.ResourceErrorCode + errStr string + } + errorIds := make(map[errId][]any) + + for _, r := range resources { + if r.Status == cloudmigration.ItemStatusOK { + okIds = append(okIds, r.RefID) + } else if r.Status == cloudmigration.ItemStatusError { + key := errId{errCode: r.ErrorCode, errStr: r.Error} + if ids, ok := errorIds[key]; ok { + errorIds[key] = append(ids, r.RefID) + } else { + errorIds[key] = []any{r.RefID} + } + } + } + + type statement struct { + sql string + args []any + } + + // Prepare a sql statement for all of the OK statuses + var okUpdateStatement *statement + if len(okIds) > 0 { + okUpdateStatement = &statement{ + sql: fmt.Sprintf("UPDATE cloud_migration_resource SET status=? WHERE snapshot_uid=? AND resource_uid IN (?%s)", strings.Repeat(", ?", len(okIds)-1)), + args: append([]any{cloudmigration.ItemStatusOK, snapshotUid}, okIds...), + } + } + + // Prepare however many sql statements are necessary for the error statuses + errorStatements := make([]statement, 0, len(errorIds)) + for k, ids := range errorIds { + errorStatements = append(errorStatements, statement{ + sql: fmt.Sprintf("UPDATE cloud_migration_resource SET status=?, error_code=?, error_string=? WHERE snapshot_uid=? AND resource_uid IN (?%s)", strings.Repeat(", ?", len(ids)-1)), + args: append([]any{cloudmigration.ItemStatusError, k.errCode, k.errStr, snapshotUid}, ids...), + }) + } + + // Execute the minimum number of required statements! + return ss.db.InTransaction(ctx, func(ctx context.Context) error { - sql := "UPDATE cloud_migration_resource SET status=?, error_string=?, error_code=? WHERE uid=? OR (snapshot_uid=? 
AND resource_uid=?)" err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { - for _, r := range resources { - // try an update first - result, err := sess.Exec(sql, r.Status, r.Error, r.ErrorCode, r.UID, snapshotUid, r.RefID) - if err != nil { + if okUpdateStatement != nil { + if _, err := sess.Exec(append([]any{okUpdateStatement.sql}, okUpdateStatement.args...)...); err != nil { return err } - // if this had no effect, assign a uid and insert instead - n, err := result.RowsAffected() - if err != nil { + } + + for _, q := range errorStatements { + if _, err := sess.Exec(append([]any{q.sql}, q.args...)...); err != nil { return err - } else if n == 0 { - r.UID = util.GenerateShortUID() - // ensure snapshot_uids are consistent so that we can use them to query when uid isn't known - r.SnapshotUID = snapshotUid - _, err := sess.Insert(r) - if err != nil { - return err - } } } return nil @@ -364,7 +433,7 @@ func (ss *sqlStore) getSnapshotResources(ctx context.Context, snapshotUid string err := ss.db.WithDbSession(ctx, func(sess *db.Session) error { offset := (page - 1) * limit sess.Limit(limit, offset) - return sess.Find(&resources, &cloudmigration.CloudMigrationResource{ + return sess.OrderBy("id ASC").Find(&resources, &cloudmigration.CloudMigrationResource{ SnapshotUID: snapshotUid, }) }) diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go index c9ae1163eba..03ab9065c57 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go @@ -155,7 +155,7 @@ func Test_SnapshotManagement(t *testing.T) { require.Equal(t, *snapshot, snapshots[0]) // delete snapshot - err = s.deleteSnapshot(ctx, 1, snapshotUid) + err = s.deleteSnapshot(ctx, snapshotUid) require.NoError(t, err) // now we expect not to find the snapshot @@ -174,16 +174,25 @@ func Test_SnapshotResources(t *testing.T) { 
resources, err := s.getSnapshotResources(ctx, "poiuy", 0, 100) assert.NoError(t, err) assert.Len(t, resources, 3) + for _, r := range resources { + if r.RefID == "ejcx4d" { + assert.Equal(t, cloudmigration.ItemStatusError, r.Status) + break + } + } - // create a new resource and update an existing resource - err = s.createUpdateSnapshotResources(ctx, "poiuy", []cloudmigration.CloudMigrationResource{ + // create a new resource + err = s.CreateSnapshotResources(ctx, "poiuy", []cloudmigration.CloudMigrationResource{ { Type: cloudmigration.DatasourceDataType, RefID: "mi39fj", Status: cloudmigration.ItemStatusOK, }, + }) + assert.NoError(t, err) + err = s.UpdateSnapshotResources(ctx, "poiuy", []cloudmigration.CloudMigrationResource{ { - UID: "qwerty", + RefID: "ejcx4d", Status: cloudmigration.ItemStatusOK, }, }) @@ -193,16 +202,16 @@ func Test_SnapshotResources(t *testing.T) { resources, err = s.getSnapshotResources(ctx, "poiuy", 0, 100) assert.NoError(t, err) assert.Len(t, resources, 4) - // ensure existing resource was updated + // ensure existing resource was updated from ERROR for _, r := range resources { - if r.UID == "querty" { + if r.RefID == "ejcx4d" { assert.Equal(t, cloudmigration.ItemStatusOK, r.Status) break } } // ensure a new one was made for _, r := range resources { - if r.UID == "mi39fj" { + if r.RefID == "mi39fj" { assert.Equal(t, cloudmigration.ItemStatusOK, r.Status) break } diff --git a/pkg/services/cloudmigration/gmsclient/inmemory_client.go b/pkg/services/cloudmigration/gmsclient/inmemory_client.go index 74cb326382d..2a9d73e0b05 100644 --- a/pkg/services/cloudmigration/gmsclient/inmemory_client.go +++ b/pkg/services/cloudmigration/gmsclient/inmemory_client.go @@ -105,12 +105,6 @@ func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudm RefID: "folder1", Status: cloudmigration.ItemStatusOK, }, - { - Type: cloudmigration.DatasourceDataType, - RefID: "ds2", - Status: cloudmigration.ItemStatusWarning, - Error: "Only core data 
sources are supported. Please ensure the plugin is installed on the cloud stack.", - }, }, } diff --git a/pkg/services/cloudmigration/model.go b/pkg/services/cloudmigration/model.go index 1f6c2a54769..7d0b4b58f49 100644 --- a/pkg/services/cloudmigration/model.go +++ b/pkg/services/cloudmigration/model.go @@ -67,7 +67,7 @@ const ( type CloudMigrationResource struct { ID int64 `xorm:"pk autoincr 'id'"` - UID string `xorm:"uid"` + UID string `xorm:"uid" json:"uid"` Name string `xorm:"name" json:"name"` Type MigrateDataType `xorm:"resource_type" json:"type"` @@ -98,9 +98,10 @@ const ( type ItemStatus string const ( - ItemStatusOK ItemStatus = "OK" - ItemStatusWarning ItemStatus = "WARNING" - ItemStatusError ItemStatus = "ERROR" + // Returned by GMS + ItemStatusOK ItemStatus = "OK" + ItemStatusError ItemStatus = "ERROR" + // Used by default while awaiting GMS results ItemStatusPending ItemStatus = "PENDING" ) @@ -180,7 +181,11 @@ type UpdateSnapshotCmd struct { UID string SessionID string Status SnapshotStatus - Resources []CloudMigrationResource + + // LocalResourcesToCreate represents the local state of a resource before it has been uploaded to GMS + LocalResourcesToCreate []CloudMigrationResource + // CloudResourcesToUpdate represents resource state from GMS, to be merged with the local state + CloudResourcesToUpdate []CloudMigrationResource } // access token diff --git a/pkg/services/dashboards/dashboard_service_mock.go b/pkg/services/dashboards/dashboard_service_mock.go index 5e49ea6ee3e..8847aafde85 100644 --- a/pkg/services/dashboards/dashboard_service_mock.go +++ b/pkg/services/dashboards/dashboard_service_mock.go @@ -121,7 +121,7 @@ func (_m *FakeDashboardService) DeleteDashboard(ctx context.Context, dashboardId } // DeleteAllDashboards provides a mock function with given fields: ctx, orgID -func (_m *FakeDashboardService) DeleteAllDashboards(ctx context.Context, orgID int64) error { +func (_m *FakeDashboardService) DeleteAllDashboards(ctx context.Context, 
orgID int64) error { ret := _m.Called(ctx, orgID) if len(ret) == 0 {