From d1952bb68142e4da857bbc82b2d5517a7503b8fe Mon Sep 17 00:00:00 2001 From: Bruno Date: Wed, 3 Jul 2024 10:38:26 -0300 Subject: [PATCH] Cloud migrations: create snapshot files (#89693) * Cloud migrations: create snapshot and store it on disk * fix merge conflicts * implement StartSnapshot for gms client * pass snapshot directory as argument to snapshot builder * ensure snapshot folder is set * make swagger-gen * remove Test_ExecuteAsyncWorkflow * pass signed in user to buildSnapshot method / use github.com/grafana/grafana-cloud-migration-snapshot to create snapshot files * fix FakeServiceImpl.CreateSnapshot * remove new line --- conf/defaults.ini | 4 + go.mod | 7 +- go.sum | 2 + pkg/services/cloudmigration/api/api.go | 3 +- .../cloudmigration/api/curl_commands.txt | 2 +- pkg/services/cloudmigration/cloudmigration.go | 3 +- .../cloudmigrationimpl/cloudmigration.go | 36 ++++---- .../cloudmigrationimpl/cloudmigration_noop.go | 3 +- .../cloudmigrationimpl/cloudmigration_test.go | 71 +-------------- .../fake/cloudmigration_fake.go | 3 +- .../cloudmigrationimpl/snapshot_mgmt.go | 90 ++++++++++++++----- .../cloudmigration/gmsclient/client.go | 2 +- .../cloudmigration/gmsclient/gms_client.go | 39 +++++++- .../gmsclient/inmemory_client.go | 12 +-- pkg/services/cloudmigration/model.go | 12 ++- .../cloudmigration/slicesext/slicesext.go | 33 +++++++ .../slicesext/slicesext_test.go | 80 +++++++++++++++++ pkg/setting/setting_cloud_migration.go | 10 +++ 18 files changed, 285 insertions(+), 127 deletions(-) create mode 100644 pkg/services/cloudmigration/slicesext/slicesext.go create mode 100644 pkg/services/cloudmigration/slicesext/slicesext_test.go diff --git a/conf/defaults.ini b/conf/defaults.ini index 931b9268897..88ed4791098 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -1929,6 +1929,8 @@ enabled = true is_target = false # Token used to send requests to grafana com gcom_api_token = "" +# How long to wait for a request sent to gms to start a snapshot to complete 
+start_snapshot_timeout = 5s # How long to wait for a request to fetch an instance to complete fetch_instance_timeout = 5s # How long to wait for a request to create an access policy to complete @@ -1939,3 +1941,5 @@ fetch_access_policy_timeout = 5s delete_access_policy_timeout = 5s # The domain name used to access cms domain = grafana-dev.net +# Folder used to store snapshot files. Defaults to the home dir +snapshot_folder = "" \ No newline at end of file diff --git a/go.mod b/go.mod index 709f49e3c88..a1a32c1d0ac 100644 --- a/go.mod +++ b/go.mod @@ -89,6 +89,7 @@ require ( github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 // @grafana/grafana-operator-experience-squad github.com/grafana/grafana-aws-sdk v0.28.0 // @grafana/aws-datasources github.com/grafana/grafana-azure-sdk-go/v2 v2.0.4 // @grafana/partner-datasources + github.com/grafana/grafana-cloud-migration-snapshot v1.0.0 // @grafana/grafana-operator-experience-squad github.com/grafana/grafana-google-sdk-go v0.1.0 // @grafana/partner-datasources github.com/grafana/grafana-openapi-client-go v0.0.0-20231213163343-bd475d63fb79 // @grafana/grafana-backend-group github.com/grafana/grafana-plugin-sdk-go v0.235.0 // @grafana/plugins-platform-backend @@ -381,10 +382,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rs/cors v1.10.1 // @grafana/identity-access-team - github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/segmentio/asm v1.2.0 // indirect - github.com/segmentio/encoding v0.3.6 // indirect github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c // indirect @@ -458,8 +456,11 @@ require ( github.com/ncruces/go-strftime v0.1.9 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect 
github.com/pressly/goose/v3 v3.20.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect + github.com/segmentio/encoding v0.3.6 // indirect github.com/sethvargo/go-retry v0.2.4 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect diff --git a/go.sum b/go.sum index 7db2c385dae..74c8f8e568e 100644 --- a/go.sum +++ b/go.sum @@ -2321,6 +2321,8 @@ github.com/grafana/grafana-aws-sdk v0.28.0 h1:ShdA+msLPGJGWWS1SFUYnF+ch1G3gUOlAd github.com/grafana/grafana-aws-sdk v0.28.0/go.mod h1:ZSVPU7IIJSi5lEg+K3Js+EUpZLXxUaBdaQWH+As1ihI= github.com/grafana/grafana-azure-sdk-go/v2 v2.0.4 h1:z6amQ286IJSBctHf6c+ibJq/v0+TvmEjVkrdMNBd4uY= github.com/grafana/grafana-azure-sdk-go/v2 v2.0.4/go.mod h1:aKlFPE36IDa8qccRg3KbgZX3MQ5xymS3RelT4j6kkVU= +github.com/grafana/grafana-cloud-migration-snapshot v1.0.0 h1:vOepRtpYS5ssG/PXLTpc/7OcL4lJiGruiU3Cw0c0DE4= +github.com/grafana/grafana-cloud-migration-snapshot v1.0.0/go.mod h1:rWNhyxYkgiXgV7xZ4yOQzMV08yikO8L8S8M5KNoQNpA= github.com/grafana/grafana-google-sdk-go v0.1.0 h1:LKGY8z2DSxKjYfr2flZsWgTRTZ6HGQbTqewE3JvRaNA= github.com/grafana/grafana-google-sdk-go v0.1.0/go.mod h1:Vo2TKWfDVmNTELBUM+3lkrZvFtBws0qSZdXhQxRdJrE= github.com/grafana/grafana-openapi-client-go v0.0.0-20231213163343-bd475d63fb79 h1:r+mU5bGMzcXCRVAuOrTn54S80qbfVkvTdUJZfSfTNbs= diff --git a/pkg/services/cloudmigration/api/api.go b/pkg/services/cloudmigration/api/api.go index cc221f66b98..e436857ab64 100644 --- a/pkg/services/cloudmigration/api/api.go +++ b/pkg/services/cloudmigration/api/api.go @@ -376,11 +376,12 @@ func (cma *CloudMigrationAPI) CreateSnapshot(c *contextmodel.ReqContext) respons defer span.End() uid := web.Params(c.Req)[":uid"] + if err := util.ValidateUID(uid); err != nil { return response.ErrOrFallback(http.StatusBadRequest, 
"invalid session uid", err) } - ss, err := cma.cloudMigrationService.CreateSnapshot(ctx, uid) + ss, err := cma.cloudMigrationService.CreateSnapshot(ctx, c.SignedInUser, uid) if err != nil { return response.ErrOrFallback(http.StatusInternalServerError, "error creating snapshot", err) } diff --git a/pkg/services/cloudmigration/api/curl_commands.txt b/pkg/services/cloudmigration/api/curl_commands.txt index f4556aae0e1..925df467c31 100644 --- a/pkg/services/cloudmigration/api/curl_commands.txt +++ b/pkg/services/cloudmigration/api/curl_commands.txt @@ -1,7 +1,7 @@ [sample token] // NOT A REAL TOKEN eyJUb2tlbiI6ImNvbXBsZXRlbHlfZmFrZV90b2tlbl9jZG9peTFhYzdwdXlwZCIsIkluc3RhbmNlIjp7IlN0YWNrSUQiOjEyMzQ1LCJTbHVnIjoic3R1Ymluc3RhbmNlIiwiUmVnaW9uU2x1ZyI6ImZha2UtcmVnaW9uIiwiQ2x1c3RlclNsdWciOiJmYWtlLWNsdXNlciJ9fQ== -[create session} +[create session] curl -X POST -H "Content-Type: application/json" \ http://admin:admin@localhost:3000/api/cloudmigration/migration \ -d '{"AuthToken":"eyJUb2tlbiI6ImNvbXBsZXRlbHlfZmFrZV90b2tlbl9jZG9peTFhYzdwdXlwZCIsIkluc3RhbmNlIjp7IlN0YWNrSUQiOjEyMzQ1LCJTbHVnIjoic3R1Ymluc3RhbmNlIiwiUmVnaW9uU2x1ZyI6ImZha2UtcmVnaW9uIiwiQ2x1c3RlclNsdWciOiJmYWtlLWNsdXNlciJ9fQ=="}' diff --git a/pkg/services/cloudmigration/cloudmigration.go b/pkg/services/cloudmigration/cloudmigration.go index a431ba10f67..5c0d13cbe08 100644 --- a/pkg/services/cloudmigration/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigration.go @@ -4,6 +4,7 @@ import ( "context" "github.com/grafana/grafana/pkg/services/gcom" + "github.com/grafana/grafana/pkg/services/user" ) type Service interface { @@ -24,7 +25,7 @@ type Service interface { GetMigrationStatus(ctx context.Context, runUID string) (*CloudMigrationSnapshot, error) GetMigrationRunList(ctx context.Context, migUID string) (*CloudMigrationRunList, error) - CreateSnapshot(ctx context.Context, sessionUid string) (*CloudMigrationSnapshot, error) + CreateSnapshot(ctx context.Context, signedInUser *user.SignedInUser, sessionUid string) 
(*CloudMigrationSnapshot, error) GetSnapshot(ctx context.Context, query GetSnapshotsQuery) (*CloudMigrationSnapshot, error) GetSnapshotList(ctx context.Context, query ListSnapshotsQuery) ([]CloudMigrationSnapshot, error) UploadSnapshot(ctx context.Context, sessionUid string, snapshotUid string) error diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 3b631d84bd9..7b085ea15d8 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "net/http" - "os" "path/filepath" "sync" "time" @@ -26,6 +25,7 @@ import ( "github.com/grafana/grafana/pkg/services/folder" "github.com/grafana/grafana/pkg/services/gcom" "github.com/grafana/grafana/pkg/services/secrets" + "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -41,7 +41,6 @@ type Service struct { cfg *setting.Cfg buildSnapshotMutex sync.Mutex - buildSnapshotError bool features featuremgmt.FeatureToggles gmsClient gmsclient.Client @@ -391,7 +390,7 @@ func (s *Service) RunMigration(ctx context.Context, uid string) (*cloudmigration } // Get migration data JSON - request, err := s.getMigrationDataJSON(ctx) + request, err := s.getMigrationDataJSON(ctx, &user.SignedInUser{}) if err != nil { s.log.Error("error getting the json request body for migration run", "err", err.Error()) return nil, fmt.Errorf("migration data get error: %w", err) @@ -459,8 +458,10 @@ func (s *Service) DeleteSession(ctx context.Context, uid string) (*cloudmigratio return c, nil } -func (s *Service) CreateSnapshot(ctx context.Context, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) { - ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSnapshot") +func (s *Service) 
CreateSnapshot(ctx context.Context, signedInUser *user.SignedInUser, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) { + ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSnapshot", trace.WithAttributes( + attribute.String("sessionUid", sessionUid), + )) defer span.End() // fetch session for the gms auth token @@ -470,28 +471,25 @@ func (s *Service) CreateSnapshot(ctx context.Context, sessionUid string) (*cloud } // query gms to establish new snapshot - initResp, err := s.gmsClient.InitializeSnapshot(ctx, *session) + timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.StartSnapshotTimeout) + defer cancel() + initResp, err := s.gmsClient.StartSnapshot(timeoutCtx, *session) if err != nil { return nil, fmt.Errorf("initializing snapshot with GMS for session %s: %w", sessionUid, err) } - // create new directory for snapshot writing - snapshotUid := util.GenerateShortUID() - dir := filepath.Join("cloudmigration.snapshots", fmt.Sprintf("snapshot-%s-%s", snapshotUid, initResp.GMSSnapshotUID)) - err = os.MkdirAll(dir, 0750) - if err != nil { - return nil, fmt.Errorf("creating snapshot directory: %w", err) + if s.cfg.CloudMigration.SnapshotFolder == "" { + return nil, fmt.Errorf("snapshot folder is not set") } - // save snapshot to the db snapshot := cloudmigration.CloudMigrationSnapshot{ - UID: snapshotUid, + UID: util.GenerateShortUID(), SessionUID: sessionUid, Status: cloudmigration.SnapshotStatusInitializing, EncryptionKey: initResp.EncryptionKey, UploadURL: initResp.UploadURL, - GMSSnapshotUID: initResp.GMSSnapshotUID, - LocalDir: dir, + GMSSnapshotUID: initResp.SnapshotID, + LocalDir: filepath.Join(s.cfg.CloudMigration.SnapshotFolder, "grafana", "snapshots", initResp.SnapshotID), } uid, err := s.store.CreateSnapshot(ctx, snapshot) @@ -501,7 +499,11 @@ func (s *Service) CreateSnapshot(ctx context.Context, sessionUid string) (*cloud snapshot.UID = uid // start building the snapshot asynchronously while we return a success 
response to the client - go s.buildSnapshot(context.Background(), snapshot) + go func() { + if err := s.buildSnapshot(context.Background(), signedInUser, initResp.MaxItemsPerPartition, snapshot); err != nil { + s.log.Error("building snapshot", "err", err.Error()) + } + }() return &snapshot, nil } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_noop.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_noop.go index d435b9f1ba8..1027f1254f4 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_noop.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_noop.go @@ -5,6 +5,7 @@ import ( "github.com/grafana/grafana/pkg/services/cloudmigration" "github.com/grafana/grafana/pkg/services/gcom" + "github.com/grafana/grafana/pkg/services/user" ) // NoopServiceImpl Define the Service Implementation. @@ -60,7 +61,7 @@ func (s *NoopServiceImpl) RunMigration(context.Context, string) (*cloudmigration return nil, cloudmigration.ErrFeatureDisabledError } -func (s *NoopServiceImpl) CreateSnapshot(ctx context.Context, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) { +func (s *NoopServiceImpl) CreateSnapshot(ctx context.Context, user *user.SignedInUser, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) { return nil, cloudmigration.ErrFeatureDisabledError } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go index 04ac1ff3217..eef78c9401f 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go @@ -2,8 +2,11 @@ package cloudmigrationimpl import ( "context" + "os" + "path/filepath" "testing" + "github.com/google/uuid" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" @@ -109,73 
+112,6 @@ func Test_CreateGetRunMigrationsAndRuns(t *testing.T) { require.NotNil(t, createResp.UID, delMigResp.UID) } -func Test_ExecuteAsyncWorkflow(t *testing.T) { - s := setUpServiceTest(t, false) - - createTokenResp, err := s.CreateToken(context.Background()) - assert.NoError(t, err) - assert.NotEmpty(t, createTokenResp.Token) - - cmd := cloudmigration.CloudMigrationSessionRequest{ - AuthToken: createTokenResp.Token, - } - - createResp, err := s.CreateSession(context.Background(), cmd) - require.NoError(t, err) - require.NotEmpty(t, createResp.UID) - require.NotEmpty(t, createResp.Slug) - - getSessionResp, err := s.GetSession(context.Background(), createResp.UID) - require.NoError(t, err) - require.NotNil(t, getSessionResp) - require.Equal(t, createResp.UID, getSessionResp.UID) - require.Equal(t, createResp.Slug, getSessionResp.Slug) - - listResp, err := s.GetSessionList(context.Background()) - require.NoError(t, err) - require.NotNil(t, listResp) - require.Equal(t, 1, len(listResp.Sessions)) - require.Equal(t, createResp.UID, listResp.Sessions[0].UID) - require.Equal(t, createResp.Slug, listResp.Sessions[0].Slug) - - sessionUid := createResp.UID - snapshotResp, err := s.CreateSnapshot(ctxWithSignedInUser(), sessionUid) - require.NoError(t, err) - require.NotEmpty(t, snapshotResp.UID) - require.Equal(t, sessionUid, snapshotResp.SessionUID) - snapshotUid := snapshotResp.UID - - // Service doesn't currently expose updating a snapshot externally, so we will just manually add a resource - err = (s.(*Service)).store.CreateUpdateSnapshotResources(context.Background(), snapshotUid, []cloudmigration.CloudMigrationResource{{Type: cloudmigration.DashboardDataType, RefID: "qwerty", Status: cloudmigration.ItemStatusOK}}) - assert.NoError(t, err) - - snapshot, err := s.GetSnapshot(ctxWithSignedInUser(), cloudmigration.GetSnapshotsQuery{ - SnapshotUID: snapshotUid, - SessionUID: sessionUid, - ResultPage: 1, - ResultLimit: 100, - }) - require.NoError(t, err) - assert.Equal(t, 
snapshotResp.UID, snapshot.UID) - assert.Equal(t, snapshotResp.EncryptionKey, snapshot.EncryptionKey) - assert.Len(t, snapshot.Resources, 1) - assert.Equal(t, "qwerty", snapshot.Resources[0].RefID) - - snapshots, err := s.GetSnapshotList(ctxWithSignedInUser(), cloudmigration.ListSnapshotsQuery{SessionUID: sessionUid, Page: 1, Limit: 100}) - require.NoError(t, err) - assert.Len(t, snapshots, 1) - assert.Equal(t, snapshotResp.UID, snapshots[0].UID) - assert.Equal(t, snapshotResp.EncryptionKey, snapshots[0].EncryptionKey) - assert.Empty(t, snapshots[0].Resources) - - err = s.UploadSnapshot(ctxWithSignedInUser(), sessionUid, snapshotUid) - require.NoError(t, err) - - assert.Panics(t, func() { - err = s.CancelSnapshot(ctxWithSignedInUser(), sessionUid, snapshotUid) - }) -} - func ctxWithSignedInUser() context.Context { c := &contextmodel.ReqContext{ SignedInUser: &user.SignedInUser{OrgID: 1}, @@ -202,6 +138,7 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi require.NoError(t, err) // dont know if this is the best, but dont want to refactor at the moment cfg.CloudMigration.IsDeveloperMode = true + cfg.CloudMigration.SnapshotFolder = filepath.Join(os.TempDir(), uuid.NewString()) dashboardService := dashboards.NewFakeDashboardService(t) if withDashboardMock { diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/fake/cloudmigration_fake.go b/pkg/services/cloudmigration/cloudmigrationimpl/fake/cloudmigration_fake.go index 2de9080c4f0..9192ebd99b5 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/fake/cloudmigration_fake.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/fake/cloudmigration_fake.go @@ -7,6 +7,7 @@ import ( "github.com/grafana/grafana/pkg/services/cloudmigration" "github.com/grafana/grafana/pkg/services/gcom" + "github.com/grafana/grafana/pkg/services/user" ) var fixedDate = time.Date(2024, 6, 5, 17, 30, 40, 0, time.UTC) @@ -129,7 +130,7 @@ func (m FakeServiceImpl) GetMigrationRunList(_ context.Context, 
_ string) (*clou }, nil } -func (m FakeServiceImpl) CreateSnapshot(ctx context.Context, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) { +func (m FakeServiceImpl) CreateSnapshot(ctx context.Context, user *user.SignedInUser, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) { if m.ReturnError { return nil, fmt.Errorf("mock error") } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go index 0d43a3c7127..fad2fba3d2e 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go @@ -2,17 +2,24 @@ package cloudmigrationimpl import ( "context" + cryptoRand "crypto/rand" + "fmt" "time" + snapshot "github.com/grafana/grafana-cloud-migration-snapshot/src" + "github.com/grafana/grafana-cloud-migration-snapshot/src/contracts" + "github.com/grafana/grafana-cloud-migration-snapshot/src/infra/crypto" "github.com/grafana/grafana/pkg/services/cloudmigration" - "github.com/grafana/grafana/pkg/services/contexthandler" + "github.com/grafana/grafana/pkg/services/cloudmigration/slicesext" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/folder" + "github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/util/retryer" + "golang.org/x/crypto/nacl/box" ) -func (s *Service) getMigrationDataJSON(ctx context.Context) (*cloudmigration.MigrateDataRequest, error) { +func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.SignedInUser) (*cloudmigration.MigrateDataRequest, error) { // Data sources dataSources, err := s.getDataSources(ctx) if err != nil { @@ -28,7 +35,7 @@ func (s *Service) getMigrationDataJSON(ctx context.Context) (*cloudmigration.Mig } // Folders - folders, err := s.getFolders(ctx) + folders, err := s.getFolders(ctx, signedInUser) 
if err != nil { s.log.Error("Failed to get folders", "err", err) return nil, err @@ -111,10 +118,9 @@ func (s *Service) getDataSources(ctx context.Context) ([]datasources.AddDataSour return result, err } -func (s *Service) getFolders(ctx context.Context) ([]folder.Folder, error) { - reqCtx := contexthandler.FromContext(ctx) +func (s *Service) getFolders(ctx context.Context, signedInUser *user.SignedInUser) ([]folder.Folder, error) { folders, err := s.folderService.GetFolders(ctx, folder.GetFoldersQuery{ - SignedInUser: reqCtx.SignedInUser, + SignedInUser: signedInUser, }) if err != nil { return nil, err @@ -143,11 +149,10 @@ func (s *Service) getDashboards(ctx context.Context) ([]dashboards.Dashboard, er } // asynchronous process for writing the snapshot to the filesystem and updating the snapshot status -func (s *Service) buildSnapshot(ctx context.Context, snapshotMeta cloudmigration.CloudMigrationSnapshot) { +func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedInUser, maxItemsPerPartition uint32, snapshotMeta cloudmigration.CloudMigrationSnapshot) error { // TODO -- make sure we can only build one snapshot at a time s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() - s.buildSnapshotError = false // update snapshot status to creating, add some retries since this is a background task if err := retryer.Retry(func() (retryer.RetrySignal, error) { @@ -158,18 +163,60 @@ func (s *Service) buildSnapshot(ctx context.Context, snapshotMeta cloudmigration return retryer.FuncComplete, err }, 10, time.Millisecond*100, time.Second*10); err != nil { s.log.Error("failed to set snapshot status to 'creating'", "err", err) - s.buildSnapshotError = true - return + return fmt.Errorf("setting snapshot status to creating: snapshotUID=%s %w", snapshotMeta.UID, err) } - // build snapshot - // just sleep for now to simulate snapshot creation happening - // need to do a couple of fancy things when we implement this: - // - some sort of regular 
check-in so we know we haven't timed out - // - a channel to listen for cancel events - // - retries baked into the snapshot writing process? - s.log.Debug("snapshot meta", "snapshot", snapshotMeta) - time.Sleep(3 * time.Second) + publicKey, privateKey, err := box.GenerateKey(cryptoRand.Reader) + if err != nil { + return fmt.Errorf("nacl: generating public and private key: %w", err) + } + + // Use GMS public key + the grafana generated private key to encrypt snapshot files. + snapshotWriter, err := snapshot.NewSnapshotWriter(contracts.AssymetricKeys{ + Public: []byte(snapshotMeta.EncryptionKey), + Private: privateKey[:], + }, + crypto.NewNacl(), + snapshotMeta.LocalDir, + ) + if err != nil { + return fmt.Errorf("instantiating snapshot writer: %w", err) + } + + migrationData, err := s.getMigrationDataJSON(ctx, signedInUser) + if err != nil { + return fmt.Errorf("fetching migration data: %w", err) + } + + resourcesGroupedByType := make(map[cloudmigration.MigrateDataType][]snapshot.MigrateDataRequestItemDTO, 0) + for _, item := range migrationData.Items { + resourcesGroupedByType[item.Type] = append(resourcesGroupedByType[item.Type], snapshot.MigrateDataRequestItemDTO{ + Type: snapshot.MigrateDataType(item.Type), + RefID: item.RefID, + Name: item.Name, + Data: item.Data, + }) + } + + for _, resourceType := range []cloudmigration.MigrateDataType{ + cloudmigration.DatasourceDataType, + cloudmigration.FolderDataType, + cloudmigration.DashboardDataType, + } { + for _, chunk := range slicesext.Chunks(int(maxItemsPerPartition), resourcesGroupedByType[resourceType]) { + if err := snapshotWriter.Write(string(resourceType), chunk); err != nil { + return fmt.Errorf("writing resources to snapshot writer: resourceType=%s %w", resourceType, err) + } + } + } + + // Add the grafana generated public key to the index file so gms can use it to decrypt the snapshot files later. 
+ // This works because the snapshot files are being encrypted with + // the grafana generated private key + the gms public key. + _, err = snapshotWriter.Finish(publicKey[:]) + if err != nil { + return fmt.Errorf("finishing writing snapshot files and generating index file: %w", err) + } // update snapshot status to pending upload with retry if err := retryer.Retry(func() (retryer.RetrySignal, error) { @@ -180,8 +227,10 @@ func (s *Service) buildSnapshot(ctx context.Context, snapshotMeta cloudmigration return retryer.FuncComplete, err }, 10, time.Millisecond*100, time.Second*10); err != nil { s.log.Error("failed to set snapshot status to 'pending upload'", "err", err) - s.buildSnapshotError = true + return fmt.Errorf("setting snapshot status to pending upload: snapshotID=%s %w", snapshotMeta.UID, err) } + + return nil } // asynchronous process for and updating the snapshot status @@ -189,7 +238,6 @@ func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigratio // TODO -- make sure we can only upload one snapshot at a time s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() - s.buildSnapshotError = false // update snapshot status to uploading, add some retries since this is a background task if err := retryer.Retry(func() (retryer.RetrySignal, error) { @@ -200,7 +248,6 @@ func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigratio return retryer.FuncComplete, err }, 10, time.Millisecond*100, time.Second*10); err != nil { s.log.Error("failed to set snapshot status to 'creating'", "err", err) - s.buildSnapshotError = true return } @@ -218,7 +265,6 @@ func (s *Service) uploadSnapshot(ctx context.Context, snapshotMeta cloudmigratio return retryer.FuncComplete, err }, 10, time.Millisecond*100, time.Second*10); err != nil { s.log.Error("failed to set snapshot status to 'pending upload'", "err", err) - s.buildSnapshotError = true } // simulate the rest diff --git a/pkg/services/cloudmigration/gmsclient/client.go 
b/pkg/services/cloudmigration/gmsclient/client.go index 6f13a60acf3..6e0fb157ccc 100644 --- a/pkg/services/cloudmigration/gmsclient/client.go +++ b/pkg/services/cloudmigration/gmsclient/client.go @@ -9,7 +9,7 @@ import ( type Client interface { ValidateKey(context.Context, cloudmigration.CloudMigrationSession) error MigrateData(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.MigrateDataRequest) (*cloudmigration.MigrateDataResponse, error) - InitializeSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.InitializeSnapshotResponse, error) + StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error) GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) } diff --git a/pkg/services/cloudmigration/gmsclient/gms_client.go b/pkg/services/cloudmigration/gmsclient/gms_client.go index 383fcabb8e2..4d4e6cc41b2 100644 --- a/pkg/services/cloudmigration/gmsclient/gms_client.go +++ b/pkg/services/cloudmigration/gmsclient/gms_client.go @@ -111,8 +111,43 @@ func (c *gmsClientImpl) MigrateData(ctx context.Context, cm cloudmigration.Cloud return &result, nil } -func (c *gmsClientImpl) InitializeSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.InitializeSnapshotResponse, error) { - panic("not implemented") +func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error) { + logger := c.log.FromContext(ctx) + + path := fmt.Sprintf("https://cms-%s.%s/cloud-migrations/api/v1/start-snapshot", session.ClusterSlug, c.domain) + + // Send the request to cms with the associated auth token + req, err := http.NewRequest(http.MethodPost, path, nil) + if err != nil { + c.log.Error("error creating http request to start snapshot", "err", err.Error()) + return nil, 
fmt.Errorf("http request error: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken)) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + c.log.Error("error sending http request to start snapshot", "err", err.Error()) + return nil, fmt.Errorf("http request error: %w", err) + } else if resp.StatusCode >= 400 { + c.log.Error("received error response to start snapshot", "statusCode", resp.StatusCode) + return nil, fmt.Errorf("http request error: %w", err) + } + + defer func() { + if err := resp.Body.Close(); err != nil { + logger.Error("closing request body: %w", err) + } + }() + + var result cloudmigration.StartSnapshotResponse + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + logger.Error("unmarshalling response body: %w", err) + return nil, fmt.Errorf("unmarshalling start snapshot response: %w", err) + } + + return &result, nil } func (c *gmsClientImpl) GetSnapshotStatus(context.Context, cloudmigration.CloudMigrationSession, cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) { diff --git a/pkg/services/cloudmigration/gmsclient/inmemory_client.go b/pkg/services/cloudmigration/gmsclient/inmemory_client.go index 3148fb109f1..c7b9b9abb87 100644 --- a/pkg/services/cloudmigration/gmsclient/inmemory_client.go +++ b/pkg/services/cloudmigration/gmsclient/inmemory_client.go @@ -15,7 +15,7 @@ func NewInMemoryClient() Client { } type memoryClientImpl struct { - snapshot *cloudmigration.InitializeSnapshotResponse + snapshot *cloudmigration.StartSnapshotResponse } func (c *memoryClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.CloudMigrationSession) error { @@ -48,11 +48,11 @@ func (c *memoryClientImpl) MigrateData( return &result, nil } -func (c *memoryClientImpl) InitializeSnapshot(context.Context, cloudmigration.CloudMigrationSession) 
(*cloudmigration.InitializeSnapshotResponse, error) { - c.snapshot = &cloudmigration.InitializeSnapshotResponse{ - EncryptionKey: util.GenerateShortUID(), - GMSSnapshotUID: util.GenerateShortUID(), - UploadURL: "localhost:3000", +func (c *memoryClientImpl) StartSnapshot(context.Context, cloudmigration.CloudMigrationSession) (*cloudmigration.StartSnapshotResponse, error) { + c.snapshot = &cloudmigration.StartSnapshotResponse{ + EncryptionKey: util.GenerateShortUID(), + SnapshotID: util.GenerateShortUID(), + UploadURL: "localhost:3000", } return c.snapshot, nil diff --git a/pkg/services/cloudmigration/model.go b/pkg/services/cloudmigration/model.go index 40904dd4d93..fe257f07c50 100644 --- a/pkg/services/cloudmigration/model.go +++ b/pkg/services/cloudmigration/model.go @@ -195,8 +195,12 @@ type CreateSessionResponse struct { SnapshotUid string } -type InitializeSnapshotResponse struct { - EncryptionKey string - UploadURL string - GMSSnapshotUID string +type StartSnapshotResponse struct { + SnapshotID string `json:"snapshotID"` + MaxItemsPerPartition uint32 `json:"maxItemsPerPartition"` + Algo string `json:"algo"` + UploadURL string `json:"uploadURL"` + PresignedURLFormData map[string]string `json:"presignedURLFormData"` + EncryptionKey string `json:"encryptionKey"` + Nonce string `json:"nonce"` } diff --git a/pkg/services/cloudmigration/slicesext/slicesext.go b/pkg/services/cloudmigration/slicesext/slicesext.go new file mode 100644 index 00000000000..66d9fb6b6cf --- /dev/null +++ b/pkg/services/cloudmigration/slicesext/slicesext.go @@ -0,0 +1,33 @@ +package slicesext + +import "math" + +// Partitions the input into slices where the length is <= chunkSize. 
+// +// Example: +// +// Chunks(2, []int{1, 2, 3, 4}) +// => [][]int{{1, 2}, {3, 4}} +func Chunks[T any](chunkSize int, xs []T) [][]T { + if chunkSize < 0 { + panic("chunk size must be greater than or equal to 0") + } + if chunkSize == 0 { + return [][]T{} + } + + out := make([][]T, 0, int(math.Ceil(float64(len(xs))/float64(chunkSize)))) + + for i := 0; i < len(xs); i += chunkSize { + var chunk []T + if i+chunkSize < len(xs) { + chunk = xs[i : i+chunkSize] + } else { + chunk = xs[i:] + } + + out = append(out, chunk) + } + + return out +} diff --git a/pkg/services/cloudmigration/slicesext/slicesext_test.go b/pkg/services/cloudmigration/slicesext/slicesext_test.go new file mode 100644 index 00000000000..150f92b004e --- /dev/null +++ b/pkg/services/cloudmigration/slicesext/slicesext_test.go @@ -0,0 +1,80 @@ +package slicesext + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChunks(t *testing.T) { + t.Parallel() + + t.Run("chunkSize must be greater than 0", func(t *testing.T) { + t.Parallel() + + assert.PanicsWithValue(t, "chunk size must be greater than or equal to 0", func() { + Chunks(-1, []string{}) + }) + }) + + t.Run("basic", func(t *testing.T) { + t.Parallel() + + cases := []struct { + description string + chunkSize int + input []int + expected [][]int + }{ + { + description: "empty slice", + chunkSize: 2, + input: []int{}, + expected: [][]int{}, + }, + { + description: "nil slice", + chunkSize: 2, + input: nil, + expected: [][]int{}, + }, + { + description: "chunk size is 0", + chunkSize: 0, + input: []int{1, 2, 3}, + expected: [][]int{}, + }, + { + description: "chunk size is greater than slice length", + chunkSize: 3, + input: []int{1}, + expected: [][]int{{1}}, + }, + { + description: "chunk size is 1", + chunkSize: 1, + input: []int{1, 2, 3}, + expected: [][]int{{1}, {2}, {3}}, + }, + { + description: "chunk size is 2 and slice length is 3", + chunkSize: 2, + input: []int{1, 2, 3}, + expected: [][]int{{1, 2}, {3}}, + }, + { + 
description: "chunk size is 2 and slice length is 6", + chunkSize: 2, + input: []int{1, 2, 3, 4, 5, 6}, + expected: [][]int{{1, 2}, {3, 4}, {5, 6}}, + }, + } + + for _, tt := range cases { + t.Run(tt.description, func(t *testing.T) { + result := Chunks(tt.chunkSize, tt.input) + assert.Equal(t, tt.expected, result) + }) + } + }) +} diff --git a/pkg/setting/setting_cloud_migration.go b/pkg/setting/setting_cloud_migration.go index 3c68147a5ec..67d295d34b8 100644 --- a/pkg/setting/setting_cloud_migration.go +++ b/pkg/setting/setting_cloud_migration.go @@ -1,12 +1,15 @@ package setting import ( + "os" "time" ) type CloudMigrationSettings struct { IsTarget bool GcomAPIToken string + SnapshotFolder string + StartSnapshotTimeout time.Duration FetchInstanceTimeout time.Duration CreateAccessPolicyTimeout time.Duration FetchAccessPolicyTimeout time.Duration @@ -23,6 +26,8 @@ func (cfg *Cfg) readCloudMigrationSettings() { cloudMigration := cfg.Raw.Section("cloud_migration") cfg.CloudMigration.IsTarget = cloudMigration.Key("is_target").MustBool(false) cfg.CloudMigration.GcomAPIToken = cloudMigration.Key("gcom_api_token").MustString("") + cfg.CloudMigration.SnapshotFolder = cloudMigration.Key("snapshot_folder").MustString("") + cfg.CloudMigration.StartSnapshotTimeout = cloudMigration.Key("start_snapshot_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.FetchInstanceTimeout = cloudMigration.Key("fetch_instance_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.CreateAccessPolicyTimeout = cloudMigration.Key("create_access_policy_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.FetchAccessPolicyTimeout = cloudMigration.Key("fetch_access_policy_timeout").MustDuration(5 * time.Second) @@ -32,4 +37,9 @@ func (cfg *Cfg) readCloudMigrationSettings() { cfg.CloudMigration.DeleteTokenTimeout = cloudMigration.Key("delete_token_timeout").MustDuration(5 * time.Second) cfg.CloudMigration.TokenExpiresAfter = cloudMigration.Key("token_expires_after").MustDuration(7 * 
24 * time.Hour) cfg.CloudMigration.IsDeveloperMode = cloudMigration.Key("developer_mode").MustBool(false) + + if cfg.CloudMigration.SnapshotFolder == "" { + homeDir, _ := os.UserHomeDir() + cfg.CloudMigration.SnapshotFolder = homeDir + } }