CloudMigrations: Bulk update local resources (#96002)

* wip

* make tests pass

* get all tests passing

* fixes

* some small cleanup

* fix test

* convert delimiter keys to struct keys

* dont execute empty sql statement

* remove printlns

* fix unit test

* a bit more cleanup

* whoops
pull/97986/head
Michael Mandrus 5 months ago committed by GitHub
parent 99a0eb825d
commit c9d22f06c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 8
      pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go
  2. 10
      pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go
  3. 10
      pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go
  4. 9
      pkg/services/cloudmigration/cloudmigrationimpl/store.go
  5. 119
      pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go
  6. 23
      pkg/services/cloudmigration/cloudmigrationimpl/xorm_store_test.go
  7. 6
      pkg/services/cloudmigration/gmsclient/inmemory_client.go
  8. 15
      pkg/services/cloudmigration/model.go
  9. 2
      pkg/services/dashboards/dashboard_service_mock.go

@ -612,10 +612,10 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps
// We need to update the snapshot in our db before reporting anything
if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
SessionID: sessionUid,
Status: localStatus,
Resources: resources,
UID: snapshot.UID,
SessionID: sessionUid,
Status: localStatus,
CloudResourcesToUpdate: resources,
}); err != nil {
return nil, fmt.Errorf("error updating snapshot status: %w", err)
}

@ -114,11 +114,19 @@ func Test_GetSnapshotStatusFromGMS(t *testing.T) {
assert.Equal(t, cloudmigration.SnapshotStatusCreating, snapshot.Status)
assert.Never(t, func() bool { return gmsClientFake.GetSnapshotStatusCallCount() > 0 }, time.Second, 10*time.Millisecond)
// Make the status pending processing and ensure GMS gets called
// Make the status pending processing to ensure GMS gets called and initialize a resource
err = s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: uid,
SessionID: sess.UID,
Status: cloudmigration.SnapshotStatusPendingProcessing,
LocalResourcesToCreate: []cloudmigration.CloudMigrationResource{
{
Name: "A name",
Type: cloudmigration.DatasourceDataType,
RefID: "A",
Status: cloudmigration.ItemStatusPending,
},
},
})
assert.NoError(t, err)

@ -569,10 +569,10 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
// update snapshot status to pending upload with retries
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotMeta.UID,
SessionID: snapshotMeta.SessionUID,
Status: cloudmigration.SnapshotStatusPendingUpload,
Resources: localSnapshotResource,
UID: snapshotMeta.UID,
SessionID: snapshotMeta.SessionUID,
Status: cloudmigration.SnapshotStatusPendingUpload,
LocalResourcesToCreate: localSnapshotResource,
}); err != nil {
return err
}
@ -714,7 +714,7 @@ func (s *Service) updateSnapshotWithRetries(ctx context.Context, cmd cloudmigrat
}
return retryer.FuncComplete, nil
}, maxRetries, time.Millisecond*10, time.Second*5); err != nil {
s.log.Error("failed to update snapshot status", "snapshotUid", cmd.UID, "status", cmd.Status, "num_resources", len(cmd.Resources), "error", err.Error())
s.log.Error("failed to update snapshot status", "snapshotUid", cmd.UID, "status", cmd.Status, "num_local_resources", len(cmd.LocalResourcesToCreate), "num_cloud_resources", len(cmd.CloudResourcesToUpdate), "error", err.Error())
return fmt.Errorf("failed to update snapshot status: %w", err)
}
return nil

@ -10,19 +10,10 @@ type store interface {
CreateMigrationSession(ctx context.Context, session cloudmigration.CloudMigrationSession) (*cloudmigration.CloudMigrationSession, error)
GetMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, error)
GetCloudMigrationSessionList(ctx context.Context, orgID int64) ([]*cloudmigration.CloudMigrationSession, error)
// DeleteMigrationSessionByUID deletes the migration session, and all the related snapshot and resources.
// the work is done in a transaction.
DeleteMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, []cloudmigration.CloudMigrationSnapshot, error)
CreateSnapshot(ctx context.Context, snapshot cloudmigration.CloudMigrationSnapshot) (string, error)
UpdateSnapshot(ctx context.Context, snapshot cloudmigration.UpdateSnapshotCmd) error
GetSnapshotByUID(ctx context.Context, orgID int64, sessUid, id string, resultPage int, resultLimit int) (*cloudmigration.CloudMigrationSnapshot, error)
GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error)
// Deleted because were not used externally
// - DeleteSnapshot(ctx context.Context, snapshotUid string) error
// - CreateUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error
// - GetSnapshotResources(ctx context.Context, snapshotUid string, page int, limit int) ([]cloudmigration.CloudMigrationResource, error)
// - GetSnapshotResourceStats(ctx context.Context, snapshotUid string) (*cloudmigration.SnapshotResourceStats, error)
// - DeleteSnapshotResources(ctx context.Context, snapshotUid string) error
}

@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"fmt"
"strings"
"time"
"github.com/grafana/grafana/pkg/infra/db"
@ -94,6 +95,7 @@ func (ss *sqlStore) GetCloudMigrationSessionList(ctx context.Context, orgID int6
return migrations, nil
}
// DeleteMigrationSessionByUID deletes the migration session, and all the related snapshot and resources the work is done in a transaction.
func (ss *sqlStore) DeleteMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, []cloudmigration.CloudMigrationSnapshot, error) {
var c cloudmigration.CloudMigrationSession
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
@ -127,7 +129,7 @@ func (ss *sqlStore) DeleteMigrationSessionByUID(ctx context.Context, orgID int64
if err != nil {
return fmt.Errorf("deleting snapshot resource from db: %w", err)
}
err = ss.deleteSnapshot(ctx, orgID, snapshot.UID)
err = ss.deleteSnapshot(ctx, snapshot.UID)
if err != nil {
return fmt.Errorf("deleting snapshot from db: %w", err)
}
@ -211,19 +213,26 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up
}
}
// Update resources if set
if len(update.Resources) > 0 {
if err := ss.createUpdateSnapshotResources(ctx, update.UID, update.Resources); err != nil {
// If local resources are set, it means we have to create them for the first time
if len(update.LocalResourcesToCreate) > 0 {
if err := ss.CreateSnapshotResources(ctx, update.UID, update.LocalResourcesToCreate); err != nil {
return err
}
}
// If cloud resources are set, it means we have to update our resource local state
if len(update.CloudResourcesToUpdate) > 0 {
if err := ss.UpdateSnapshotResources(ctx, update.UID, update.CloudResourcesToUpdate); err != nil {
return err
}
}
return nil
})
return err
}
func (ss *sqlStore) deleteSnapshot(ctx context.Context, orgID int64, snapshotUid string) error {
func (ss *sqlStore) deleteSnapshot(ctx context.Context, snapshotUid string) error {
return ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
_, err := sess.Delete(cloudmigration.CloudMigrationSnapshot{
UID: snapshotUid,
@ -316,30 +325,90 @@ func (ss *sqlStore) GetSnapshotList(ctx context.Context, query cloudmigration.Li
return snapshots, nil
}
// CreateUpdateSnapshotResources either updates a migration resource for a snapshot, or creates it if it does not exist
// If the uid is not known, it uses snapshot_uid + resource_uid as a lookup
func (ss *sqlStore) createUpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
// CreateSnapshotResources initializes the local state of a resources belonging to a snapshot
func (ss *sqlStore) CreateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
for i := 0; i < len(resources); i++ {
resources[i].UID = util.GenerateShortUID()
// ensure snapshot_uids are consistent so that we can use in conjunction with refID for lookup later
resources[i].SnapshotUID = snapshotUid
}
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
_, err := sess.Insert(resources)
if err != nil {
return err
}
return nil
})
if err != nil {
return fmt.Errorf("creating resources: %w", err)
}
return nil
}
// UpdateSnapshotResources updates a migration resource for a snapshot, using snapshot_uid + resource_uid as a lookup
// It does preprocessing on the results in order to minimize the sql queries executed.
func (ss *sqlStore) UpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
// refIds of resources that migrated successfully in order to update in bulk
okIds := make([]any, 0, len(resources))
// group any failed resources by errCode and errStr
type errId struct {
errCode cloudmigration.ResourceErrorCode
errStr string
}
errorIds := make(map[errId][]any)
for _, r := range resources {
if r.Status == cloudmigration.ItemStatusOK {
okIds = append(okIds, r.RefID)
} else if r.Status == cloudmigration.ItemStatusError {
key := errId{errCode: r.ErrorCode, errStr: r.Error}
if ids, ok := errorIds[key]; ok {
errorIds[key] = append(ids, r.RefID)
} else {
errorIds[key] = []any{r.RefID}
}
}
}
type statement struct {
sql string
args []any
}
// Prepare a sql statement for all of the OK statuses
var okUpdateStatement *statement
if len(okIds) > 0 {
okUpdateStatement = &statement{
sql: fmt.Sprintf("UPDATE cloud_migration_resource SET status=? WHERE snapshot_uid=? AND resource_uid IN (?%s)", strings.Repeat(", ?", len(okIds)-1)),
args: append([]any{cloudmigration.ItemStatusOK, snapshotUid}, okIds...),
}
}
// Prepare however many sql statements are necessary for the error statuses
errorStatements := make([]statement, 0, len(errorIds))
for k, ids := range errorIds {
errorStatements = append(errorStatements, statement{
sql: fmt.Sprintf("UPDATE cloud_migration_resource SET status=?, error_code=?, error_string=? WHERE snapshot_uid=? AND resource_uid IN (?%s)", strings.Repeat(", ?", len(ids)-1)),
args: append([]any{cloudmigration.ItemStatusError, k.errCode, k.errStr, snapshotUid}, ids...),
})
}
// Execute the minimum number of required statements!
return ss.db.InTransaction(ctx, func(ctx context.Context) error {
sql := "UPDATE cloud_migration_resource SET status=?, error_string=?, error_code=? WHERE uid=? OR (snapshot_uid=? AND resource_uid=?)"
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
for _, r := range resources {
// try an update first
result, err := sess.Exec(sql, r.Status, r.Error, r.ErrorCode, r.UID, snapshotUid, r.RefID)
if err != nil {
if okUpdateStatement != nil {
if _, err := sess.Exec(append([]any{okUpdateStatement.sql}, okUpdateStatement.args...)...); err != nil {
return err
}
// if this had no effect, assign a uid and insert instead
n, err := result.RowsAffected()
if err != nil {
}
for _, q := range errorStatements {
if _, err := sess.Exec(append([]any{q.sql}, q.args...)...); err != nil {
return err
} else if n == 0 {
r.UID = util.GenerateShortUID()
// ensure snapshot_uids are consistent so that we can use them to query when uid isn't known
r.SnapshotUID = snapshotUid
_, err := sess.Insert(r)
if err != nil {
return err
}
}
}
return nil
@ -364,7 +433,7 @@ func (ss *sqlStore) getSnapshotResources(ctx context.Context, snapshotUid string
err := ss.db.WithDbSession(ctx, func(sess *db.Session) error {
offset := (page - 1) * limit
sess.Limit(limit, offset)
return sess.Find(&resources, &cloudmigration.CloudMigrationResource{
return sess.OrderBy("id ASC").Find(&resources, &cloudmigration.CloudMigrationResource{
SnapshotUID: snapshotUid,
})
})

@ -155,7 +155,7 @@ func Test_SnapshotManagement(t *testing.T) {
require.Equal(t, *snapshot, snapshots[0])
// delete snapshot
err = s.deleteSnapshot(ctx, 1, snapshotUid)
err = s.deleteSnapshot(ctx, snapshotUid)
require.NoError(t, err)
// now we expect not to find the snapshot
@ -174,16 +174,25 @@ func Test_SnapshotResources(t *testing.T) {
resources, err := s.getSnapshotResources(ctx, "poiuy", 0, 100)
assert.NoError(t, err)
assert.Len(t, resources, 3)
for _, r := range resources {
if r.RefID == "ejcx4d" {
assert.Equal(t, cloudmigration.ItemStatusError, r.Status)
break
}
}
// create a new resource and update an existing resource
err = s.createUpdateSnapshotResources(ctx, "poiuy", []cloudmigration.CloudMigrationResource{
// create a new resource
err = s.CreateSnapshotResources(ctx, "poiuy", []cloudmigration.CloudMigrationResource{
{
Type: cloudmigration.DatasourceDataType,
RefID: "mi39fj",
Status: cloudmigration.ItemStatusOK,
},
})
assert.NoError(t, err)
err = s.UpdateSnapshotResources(ctx, "poiuy", []cloudmigration.CloudMigrationResource{
{
UID: "qwerty",
RefID: "ejcx4d",
Status: cloudmigration.ItemStatusOK,
},
})
@ -193,16 +202,16 @@ func Test_SnapshotResources(t *testing.T) {
resources, err = s.getSnapshotResources(ctx, "poiuy", 0, 100)
assert.NoError(t, err)
assert.Len(t, resources, 4)
// ensure existing resource was updated
// ensure existing resource was updated from ERROR
for _, r := range resources {
if r.UID == "querty" {
if r.RefID == "ejcx4d" {
assert.Equal(t, cloudmigration.ItemStatusOK, r.Status)
break
}
}
// ensure a new one was made
for _, r := range resources {
if r.UID == "mi39fj" {
if r.RefID == "mi39fj" {
assert.Equal(t, cloudmigration.ItemStatusOK, r.Status)
break
}

@ -105,12 +105,6 @@ func (c *memoryClientImpl) GetSnapshotStatus(ctx context.Context, session cloudm
RefID: "folder1",
Status: cloudmigration.ItemStatusOK,
},
{
Type: cloudmigration.DatasourceDataType,
RefID: "ds2",
Status: cloudmigration.ItemStatusWarning,
Error: "Only core data sources are supported. Please ensure the plugin is installed on the cloud stack.",
},
},
}

@ -67,7 +67,7 @@ const (
type CloudMigrationResource struct {
ID int64 `xorm:"pk autoincr 'id'"`
UID string `xorm:"uid"`
UID string `xorm:"uid" json:"uid"`
Name string `xorm:"name" json:"name"`
Type MigrateDataType `xorm:"resource_type" json:"type"`
@ -98,9 +98,10 @@ const (
type ItemStatus string
const (
ItemStatusOK ItemStatus = "OK"
ItemStatusWarning ItemStatus = "WARNING"
ItemStatusError ItemStatus = "ERROR"
// Returned by GMS
ItemStatusOK ItemStatus = "OK"
ItemStatusError ItemStatus = "ERROR"
// Used by default while awaiting GMS results
ItemStatusPending ItemStatus = "PENDING"
)
@ -180,7 +181,11 @@ type UpdateSnapshotCmd struct {
UID string
SessionID string
Status SnapshotStatus
Resources []CloudMigrationResource
// LocalResourcesToCreate represents the local state of a resource before it has been uploaded to GMS
LocalResourcesToCreate []CloudMigrationResource
// CloudResourcesToUpdate represents resource state from GMS, to be merged with the local state
CloudResourcesToUpdate []CloudMigrationResource
}
// access token

@ -121,7 +121,7 @@ func (_m *FakeDashboardService) DeleteDashboard(ctx context.Context, dashboardId
}
// DeleteAllDashboards provides a mock function with given fields: ctx, orgID
func (_m *FakeDashboardService) DeleteAllDashboards(ctx context.Context, orgID int64) error {
func (_m *FakeDashboardService) DeleteAllDashboards(ctx context.Context, orgID int64) error {
ret := _m.Called(ctx, orgID)
if len(ret) == 0 {

Loading…
Cancel
Save