From a43a5389760e0d81f0ef1ee8b7f174299e3285b5 Mon Sep 17 00:00:00 2001 From: Michael Mandrus <41969079+mmandrus@users.noreply.github.com> Date: Thu, 18 Jul 2024 11:34:28 -0400 Subject: [PATCH] CloudMigrations: Fix issues discovered during end to end testing (#90562) * improve error handling a retries during async operations * fix use of contexts * updates to how we call the folder api * fix urls for gms * more progress on the folder issue * fix folders * refactor for readability --- .../cloudmigrationimpl/cloudmigration.go | 16 +- .../cloudmigrationimpl/snapshot_mgmt.go | 149 +++++++++--------- .../cloudmigration/gmsclient/gms_client.go | 4 +- 3 files changed, 95 insertions(+), 74 deletions(-) diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 01218dd8d10..a9b046a5b1e 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -501,6 +501,13 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI go func() { if err := s.buildSnapshot(context.Background(), signedInUser, initResp.MaxItemsPerPartition, snapshot); err != nil { s.log.Error("building snapshot", "err", err.Error()) + // Update status to error with retries + if err := s.updateStatusWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{ + UID: snapshot.UID, + Status: cloudmigration.SnapshotStatusError, + }); err != nil { + s.log.Error("critical failure during snapshot creation - please report any error logs") + } } }() @@ -610,7 +617,14 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho // start uploading the snapshot asynchronously while we return a success response to the client go func() { if err := s.uploadSnapshot(context.Background(), session, snapshot, uploadUrl); err != nil { - s.log.Error("uploading snapshot", "err", err) + s.log.Error("uploading snapshot", "err", err.Error()) + // Update status to error with retries + if err := s.updateStatusWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{ + UID: snapshot.UID, + Status: cloudmigration.SnapshotStatusError, + }); err != nil { + s.log.Error("critical failure during snapshot upload - please report any error logs") + } } }() diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go index d922ceb8331..86faee333dc 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go @@ -24,23 +24,16 @@ import ( func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.SignedInUser) (*cloudmigration.MigrateDataRequest, error) { // Data sources - dataSources, err := s.getDataSources(ctx) + dataSources, err := s.getDataSourceCommands(ctx) if err != nil { s.log.Error("Failed to get datasources", "err", err) return nil, err } - // Dashboards - dashboards, err := s.getDashboards(ctx) + // Dashboards and folders are linked via the schema, so we need to get both + dashboards, folders, err := s.getDashboardAndFolderCommands(ctx, signedInUser) if err != nil { - s.log.Error("Failed to get dashboards", "err", err) - return nil, err - } - - // Folders - folders, err := s.getFolders(ctx, signedInUser) - if err != nil { - s.log.Error("Failed to get folders", "err", err) + s.log.Error("Failed to get dashboards and folders", "err", err) return nil, err } @@ -58,13 +51,7 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S }) } - softDeleteEnabled := s.features.IsEnabledGlobally(featuremgmt.FlagDashboardRestore) - for _, dashboard := range dashboards { - if softDeleteEnabled && !dashboard.Deleted.IsZero() { - continue - } - dashboard.Data.Del("id") migrationDataSlice = append(migrationDataSlice, cloudmigration.MigrateDataRequestItem{ Type: cloudmigration.DashboardDataType, @@ -90,7 +77,7 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S return migrationData, nil } -func (s *Service) getDataSources(ctx context.Context) ([]datasources.AddDataSourceCommand, error) { +func (s *Service) getDataSourceCommands(ctx context.Context) ([]datasources.AddDataSourceCommand, error) { dataSources, err := s.dsService.GetAllDataSources(ctx, &datasources.GetAllDataSourcesQuery{}) if err != nil { s.log.Error("Failed to get all datasources", "err", err) @@ -127,34 +114,50 @@ func (s *Service) getDataSources(ctx context.Context) ([]datasources.AddDataSour return result, err } -func (s *Service) getFolders(ctx context.Context, signedInUser *user.SignedInUser) ([]folder.Folder, error) { - folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{ - SignedInUser: signedInUser, - }) +func (s *Service) getDashboardAndFolderCommands(ctx context.Context, signedInUser *user.SignedInUser) ([]dashboards.Dashboard, []folder.CreateFolderCommand, error) { + dashs, err := s.dashboardService.GetAllDashboards(ctx) if err != nil { - return nil, err + return nil, nil, err } - result := make([]folder.Folder, len(folders)) - for i, folder := range folders { - result[i] = *folder - } + dashboardCmds := make([]dashboards.Dashboard, 0) + folderUids := make([]string, 0) + softDeleteEnabled := s.features.IsEnabledGlobally(featuremgmt.FlagDashboardRestore) - return result, nil -} + // Folders need to be fetched by UID in a separate step, separate dashboards from folders + // If any result is in the trash bin, don't migrate it + for _, d := range dashs { + if softDeleteEnabled && !d.Deleted.IsZero() { + continue + } -func (s *Service) getDashboards(ctx context.Context) ([]dashboards.Dashboard, error) { - dashs, err := s.dashboardService.GetAllDashboards(ctx) + if d.IsFolder { + folderUids = append(folderUids, d.UID) + } else { + dashboardCmds = append(dashboardCmds, *d) + } + } + + folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{ + UIDs: folderUids, + SignedInUser: signedInUser, + WithFullpathUIDs: true, + }) if err != nil { - return nil, err + return nil, nil, err } - result := make([]dashboards.Dashboard, len(dashs)) - for i, dashboard := range dashs { - result[i] = *dashboard + folderCmds := make([]folder.CreateFolderCommand, len(folders)) + for i, f := range folders { + folderCmds[i] = folder.CreateFolderCommand{ + UID: f.UID, + Title: f.Title, + Description: f.Description, + ParentUID: f.ParentUID, + } } - return result, nil + return dashboardCmds, folderCmds, nil } // asynchronous process for writing the snapshot to the filesystem and updating the snapshot status @@ -163,16 +166,12 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() - // update snapshot status to creating, add some retries since this is a background task - if err := retryer.Retry(func() (retryer.RetrySignal, error) { - err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotMeta.UID, - Status: cloudmigration.SnapshotStatusCreating, - }) - return retryer.FuncComplete, err - }, 10, time.Millisecond*100, time.Second*10); err != nil { - s.log.Error("failed to set snapshot status to 'creating'", "err", err) - return fmt.Errorf("setting snapshot status to creating: snapshotUID=%s %w", snapshotMeta.UID, err) + // Update status to snapshot creating with retries + if err := s.updateStatusWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ + UID: snapshotMeta.UID, + Status: cloudmigration.SnapshotStatusCreating, + }); err != nil { + return err } publicKey, privateKey, err := box.GenerateKey(cryptoRand.Reader) @@ -199,18 +198,19 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn localSnapshotResource := make([]cloudmigration.CloudMigrationResource, len(migrationData.Items)) resourcesGroupedByType := make(map[cloudmigration.MigrateDataType][]snapshot.MigrateDataRequestItemDTO, 0) - for _, item := range migrationData.Items { + for i, item := range migrationData.Items { resourcesGroupedByType[item.Type] = append(resourcesGroupedByType[item.Type], snapshot.MigrateDataRequestItemDTO{ Type: snapshot.MigrateDataType(item.Type), RefID: item.RefID, Name: item.Name, Data: item.Data, }) - localSnapshotResource = append(localSnapshotResource, cloudmigration.CloudMigrationResource{ + + localSnapshotResource[i] = cloudmigration.CloudMigrationResource{ Type: item.Type, RefID: item.RefID, Status: cloudmigration.ItemStatusPending, - }) + } } for _, resourceType := range []cloudmigration.MigrateDataType{ @@ -232,17 +232,13 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn return fmt.Errorf("finishing writing snapshot files and generating index file: %w", err) } - // update snapshot status to pending upload with retry - if err := retryer.Retry(func() (retryer.RetrySignal, error) { - err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotMeta.UID, - Status: cloudmigration.SnapshotStatusPendingUpload, - Resources: localSnapshotResource, - }) - return retryer.FuncComplete, err - }, 10, time.Millisecond*100, time.Second*10); err != nil { - s.log.Error("failed to set snapshot status to 'pending upload'", "err", err) - return fmt.Errorf("setting snapshot status to pending upload: snapshotID=%s %w", snapshotMeta.UID, err) + // update snapshot status to pending upload with retries + if err := s.updateStatusWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ + UID: snapshotMeta.UID, + Status: cloudmigration.SnapshotStatusPendingUpload, + Resources: localSnapshotResource, + }); err != nil { + return err } return nil @@ -254,15 +250,12 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() - // update snapshot status to uploading, add some retries since this is a background task - if err := retryer.Retry(func() (retryer.RetrySignal, error) { - err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ - UID: snapshotMeta.UID, - Status: cloudmigration.SnapshotStatusUploading, - }) - return retryer.FuncComplete, err - }, 10, time.Millisecond*100, time.Second*10); err != nil { - return fmt.Errorf("failed to set snapshot status to 'creating': %w", err) + // update snapshot status to uploading with retries + if err := s.updateStatusWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ + UID: snapshotMeta.UID, + Status: cloudmigration.SnapshotStatusUploading, + }); err != nil { + return err } indexFilePath := filepath.Join(snapshotMeta.LocalDir, "index.json") @@ -304,11 +297,14 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl return fmt.Errorf("uploading file using presigned url: %w", err) } - if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{ + s.log.Info("successfully uploaded snapshot", "snapshotUid", snapshotMeta.UID, "cloud_snapshotUid", snapshotMeta.GMSSnapshotUID) + + // update snapshot status to processing with retries + if err := s.updateStatusWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{ UID: snapshotMeta.UID, Status: cloudmigration.SnapshotStatusProcessing, }); err != nil { - return fmt.Errorf("updating snapshot: %w", err) + return err } return nil @@ -333,3 +329,14 @@ func (s *Service) uploadUsingPresignedURL(ctx context.Context, uploadURL, key st return nil } + +func (s *Service) updateStatusWithRetries(ctx context.Context, cmd cloudmigration.UpdateSnapshotCmd) (err error) { + if err := retryer.Retry(func() (retryer.RetrySignal, error) { + err := s.store.UpdateSnapshot(ctx, cmd) + return retryer.FuncComplete, err + }, 10, time.Millisecond*100, time.Second*10); err != nil { + s.log.Error("failed to update snapshot status", "snapshotUid", cmd.UID, "status", cmd.Status, "num_resources", len(cmd.Resources), "error", err.Error()) + return fmt.Errorf("failed to update snapshot status: %w", err) + } + return nil +} diff --git a/pkg/services/cloudmigration/gmsclient/gms_client.go b/pkg/services/cloudmigration/gmsclient/gms_client.go index 19f152d9966..d06b788a1f9 100644 --- a/pkg/services/cloudmigration/gmsclient/gms_client.go +++ b/pkg/services/cloudmigration/gmsclient/gms_client.go @@ -167,7 +167,7 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr defer c.getStatusMux.Unlock() logger := c.log.FromContext(ctx) - path := fmt.Sprintf("%s/api/v1/status/%s/status?offset=%d", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID, offset) + path := fmt.Sprintf("%s/api/v1/snapshots/%s/status?offset=%d", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID, offset) // Send the request to gms with the associated auth token req, err := http.NewRequest(http.MethodGet, path, nil) @@ -209,7 +209,7 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) { logger := c.log.FromContext(ctx) - path := fmt.Sprintf("%s/api/v1/status/%s/create-upload-url", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID) + path := fmt.Sprintf("%s/api/v1/snapshots/%s/create-upload-url", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID) // Send the request to gms with the associated auth token req, err := http.NewRequest(http.MethodPost, path, nil)