From 1d4d4b5f31b7750f3dee6683b04efc87bd180110 Mon Sep 17 00:00:00 2001 From: PoorlyDefinedBehaviour Date: Fri, 10 Jan 2025 11:26:14 -0300 Subject: [PATCH] wip --- pkg/services/authapi/fake/authapistub.go | 11 +- pkg/services/cloudmigration/async/async.go | 33 ++ .../cloudmigrationimpl/cloudmigration.go | 46 +- .../cloudmigrationimpl/cloudmigration_test.go | 511 ++++++++++++++++++ .../cloudmigrationimpl/snapshot_mgmt.go | 1 + .../cloudmigrationimpl/xorm_store.go | 2 + 6 files changed, 579 insertions(+), 25 deletions(-) create mode 100644 pkg/services/cloudmigration/async/async.go diff --git a/pkg/services/authapi/fake/authapistub.go b/pkg/services/authapi/fake/authapistub.go index b87f393246b..3f1208eb167 100644 --- a/pkg/services/authapi/fake/authapistub.go +++ b/pkg/services/authapi/fake/authapistub.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/grafana/grafana/pkg/services/authapi" - "github.com/grafana/grafana/pkg/util" ) var _ authapi.Service = (*AuthapiStub)(nil) @@ -16,8 +15,9 @@ type AuthapiStub struct { Policies map[string]authapi.AccessPolicy } -func (client *AuthapiStub) CreateAccessPolicy(_ context.Context, _ authapi.CreateAccessPolicyParams, _ authapi.CreateAccessPolicyPayload) (authapi.AccessPolicy, error) { - randStr := fmt.Sprintf("random-policy-%s", util.GenerateShortUID()) +func (client *AuthapiStub) CreateAccessPolicy(_ context.Context, _ authapi.CreateAccessPolicyParams, payload authapi.CreateAccessPolicyPayload) (authapi.AccessPolicy, error) { + // randStr := fmt.Sprintf("random-policy-%s", util.GenerateShortUID()) + randStr := fmt.Sprintf("random-policy-%s", payload.Name) policy := authapi.AccessPolicy{ ID: randStr, Name: randStr, @@ -49,10 +49,11 @@ func (client *AuthapiStub) ListTokens(_ context.Context, _ authapi.ListTokenPara func (client *AuthapiStub) CreateToken(_ context.Context, _ authapi.CreateTokenParams, payload authapi.CreateTokenPayload) (authapi.Token, error) { token := authapi.Token{ - ID: fmt.Sprintf("random-token-%s", util.GenerateShortUID()), + // ID: fmt.Sprintf("random-token-%s", util.GenerateShortUID()), + ID: fmt.Sprintf("random-token-%s", payload.Name), Name: payload.Name, AccessPolicyID: payload.AccessPolicyID, - Token: fmt.Sprintf("completely_fake_token_%s", util.GenerateShortUID()), + Token: fmt.Sprintf("completely_fake_token_%s", payload.Name), } client.Token = &authapi.TokenView{ ID: token.ID, diff --git a/pkg/services/cloudmigration/async/async.go b/pkg/services/cloudmigration/async/async.go new file mode 100644 index 00000000000..247994f422a --- /dev/null +++ b/pkg/services/cloudmigration/async/async.go @@ -0,0 +1,33 @@ +package async + +import "time" + +type Async interface { + Go(f func()) + Tick(interval time.Duration, f func() bool) +} + +type AsyncImpl struct{} + +func NewAsync() *AsyncImpl { + return &AsyncImpl{} +} + +func (a *AsyncImpl) Go(f func()) { + go f() +} + +func (a *AsyncImpl) Tick(interval time.Duration, f func() bool) { + go func() { + tick := time.NewTicker(interval) + defer tick.Stop() + + for { + <-tick.C + + if f() { + return + } + } + }() +} diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go index 2320e194e9d..7be2ba4eed7 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/grafana/pkg/services/authapi/fake" "github.com/grafana/grafana/pkg/services/cloudmigration" "github.com/grafana/grafana/pkg/services/cloudmigration/api" + "github.com/grafana/grafana/pkg/services/cloudmigration/async" "github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient" "github.com/grafana/grafana/pkg/services/cloudmigration/objectstorage" "github.com/grafana/grafana/pkg/services/dashboards" @@ -53,6 +54,8 @@ type Service struct { log *log.ConcreteLogger cfg *setting.Cfg + async async.Async + buildSnapshotMutex sync.Mutex cancelMutex sync.Mutex @@ -114,6 +117,7 @@ func ProvideService( kvStore kvstore.KVStore, libraryElementsService libraryelements.Service, ngAlert *ngalert.AlertNG, + async async.Async, ) (cloudmigration.Service, error) { if !features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrations) { return &NoopServiceImpl{}, nil @@ -123,6 +127,7 @@ func ProvideService( store: &sqlStore{db: db, secretsStore: secretsStore, secretsService: secretsService}, log: log.New(LogPrefix), cfg: cfg, + async: async, features: features, dsService: dsService, tracer: tracer, @@ -519,7 +524,8 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI } // start building the snapshot asynchronously while we return a success response to the client - go func() { + // go func() { + s.async.Go(func() { s.cancelMutex.Lock() defer func() { s.cancelFunc = nil @@ -556,7 +562,7 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI span.SetStatus(codes.Ok, "snapshot built") s.report(asyncCtx, session, gmsclient.EventDoneBuildingSnapshot, time.Since(start), err, signedInUser.UserUID) - }() + }) return &snapshot, nil } @@ -627,7 +633,7 @@ func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnaps asyncSyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext()) // Sync snapshot results from GMS if the one created after upload is not running (e.g. due to a restart) // and anybody is interested in the status. - go s.syncSnapshotStatusFromGMSUntilDone(asyncSyncCtx, session, snapshot, syncStatus) + s.async.Go(func() { s.syncSnapshotStatusFromGMSUntilDone(asyncSyncCtx, session, snapshot, syncStatus) }) return snapshot, nil } @@ -669,23 +675,22 @@ func (s *Service) syncSnapshotStatusFromGMSUntilDone(ctx context.Context, sessio snapshot = updatedSnapshot } - tick := time.NewTicker(10 * time.Second) - defer tick.Stop() - - for snapshot.ShouldQueryGMS() { - select { - case <-ctx.Done(): + s.async.Tick(10*time.Second, func() bool { + // If the context has been canceled + if ctx.Err() != nil { s.log.Info("cancelling snapshot status polling", "sessionUID", session.UID, "snapshotUID", snapshot.UID) - return - case <-tick.C: - updatedSnapshot, err := syncStatus(ctx, session, snapshot) - if err != nil { - s.log.Error("error fetching snapshot status from GMS", "error", err, "sessionUID", session.UID, "snapshotUID", snapshot.UID) - continue - } - snapshot = updatedSnapshot + return true } - } + + updatedSnapshot, err := syncStatus(ctx, session, snapshot) + if err != nil { + s.log.Error("error fetching snapshot status from GMS", "error", err, "sessionUID", session.UID, "snapshotUID", snapshot.UID) + return false + } + snapshot = updatedSnapshot + + return false + }) } var gmsStateToLocalStatus map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus = map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus{ @@ -748,7 +753,8 @@ func (s *Service) UploadSnapshot(ctx context.Context, orgID int64, signedInUser } // start uploading the snapshot asynchronously while we return a success response to the client - go func() { + s.async.Go(func() { + fmt.Printf("\n\naaaaaaa running goroutine to upload snapshot\n\n") s.cancelMutex.Lock() defer func() { s.cancelFunc = nil @@ -783,7 +789,7 @@ func (s *Service) UploadSnapshot(ctx context.Context, orgID int64, signedInUser } s.report(asyncCtx, session, gmsclient.EventDoneUploadingSnapshot, time.Since(start), err, signedInUser.UserUID) - }() + }) return nil } diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go index d6576cb35fd..ee8f8d9aeba 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go @@ -1,18 +1,27 @@ package cloudmigrationimpl import ( + "bytes" "context" + "encoding/json" "errors" + "fmt" + "io" "maps" + "net/http" "os" "path/filepath" "slices" + "strings" "sync" "testing" "time" + cryptorand "crypto/rand" + "github.com/google/uuid" "github.com/grafana/grafana/pkg/api/routing" + "github.com/grafana/grafana/pkg/apimachinery/identity" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/db" @@ -22,7 +31,9 @@ import ( "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/services/accesscontrol/actest" "github.com/grafana/grafana/pkg/services/annotations/annotationstest" + "github.com/grafana/grafana/pkg/services/authapi" "github.com/grafana/grafana/pkg/services/cloudmigration" + "github.com/grafana/grafana/pkg/services/cloudmigration/async" "github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/datasources" @@ -40,6 +51,7 @@ import ( "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginsettings" "github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore" "github.com/grafana/grafana/pkg/services/quota/quotatest" + "github.com/grafana/grafana/pkg/services/search/model" secretsfakes "github.com/grafana/grafana/pkg/services/secrets/fakes" secretskv "github.com/grafana/grafana/pkg/services/secrets/kvstore" "github.com/grafana/grafana/pkg/services/user" @@ -48,8 +60,212 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "golang.org/x/crypto/nacl/box" + "golang.org/x/exp/rand" +) + +type Op = string + +const ( + GetToken Op = "GetToken" + CreateToken Op = "CreateToken" + ValidateToken Op = "ValidateToken" + DeleteToken Op = "DeleteToken" + CreateSession Op = "CreateSession" + GetSession Op = "GetSession" + DeleteSession Op = "DeleteSession" + GetSessionList Op = "GetSessionList" + CreateSnapshot Op = "CreateSnapshot" + GetSnapshot Op = "GetSnapshot" + GetSnapshotList Op = "GetSnapshotList" + UploadSnapshot Op = "UploadSnapshot" + CancelSnapshot Op = "CancelSnapshot" + AsyncTick Op = "AsyncTick" ) +type cloudMigrationServiceModel struct { + token string + session *cloudmigration.CloudMigrationSessionResponse + snapshots []*cloudmigration.CloudMigrationSnapshot +} + +func newCloudMigrationServiceModel() *cloudMigrationServiceModel { + return &cloudMigrationServiceModel{} +} + +func (model *cloudMigrationServiceModel) GetToken(ctx context.Context) (authapi.TokenView, error) { + return authapi.TokenView{}, cloudmigration.ErrTokenNotFound +} + +func (model *cloudMigrationServiceModel) TokenCreated(response cloudmigration.CreateAccessTokenResponse) { + model.token = response.Token +} + +func (model *cloudMigrationServiceModel) SessionCreated(response *cloudmigration.CloudMigrationSessionResponse) { + model.session = response +} + +func (model *cloudMigrationServiceModel) SnapshotCreated(snapshot *cloudmigration.CloudMigrationSnapshot) { + model.snapshots = append(model.snapshots, snapshot) +} + +func enabledActions(model *cloudMigrationServiceModel, async *mockAsync) []Op { + enabled := make([]Op, 0) + + for _, op := range []Op{ + // GetToken, + CreateToken, + // ValidateToken, + // DeleteToken, + CreateSession, + GetSession, + // DeleteSession, + // GetSessionList, + CreateSnapshot, + // GetSnapshot, + // GetSnapshotList, + UploadSnapshot, + // CancelSnapshot, + AsyncTick, + } { + switch op { + case GetToken: + case CreateToken: + enabled = append(enabled, CreateToken) + case ValidateToken: + case DeleteToken: + case CreateSession: + // If a token exists + if model.token != "" { + enabled = append(enabled, CreateSession) + } + case GetSession: + case DeleteSession: + case GetSessionList: + case CreateSnapshot: + // If a session exists + if model.session != nil { + enabled = append(enabled, CreateSnapshot) + } + case GetSnapshot: + enabled = append(enabled, GetSnapshot) + case GetSnapshotList: + case UploadSnapshot: + if len(model.snapshots) > 0 { + enabled = append(enabled, UploadSnapshot) + } + case CancelSnapshot: + case AsyncTick: + if async.hasWork() { + enabled = append(enabled, AsyncTick) + } + default: + panic(fmt.Sprintf("unexpected op: %+v", op)) + } + } + + return enabled +} + +func choose(rng *rand.Rand, actions []Op) Op { + i := rng.Intn(len(actions)) + return actions[i] +} + +func Test_Basic(t *testing.T) { + t.Parallel() + + seed := time.Now().UnixMicro() + fmt.Printf("SEED=%d\n", seed) + rng := rand.New(rand.NewSource(uint64(seed))) + + mockDashboardService := newMockDashboardService() + + async := newMockAsync(rng) + + s := setUpServiceTestv2(t, mockDashboardService, async).(*Service) + gmsClient, err := gmsclient.NewGMSClient(&setting.Cfg{ + CloudMigration: setting.CloudMigrationSettings{GMSDomain: "localhost"}, + }, newGmsMockHttpClient(rng)) + if err != nil { + require.NoError(t, err) + } + s.gmsClient = gmsClient + + model := newCloudMigrationServiceModel() + + signedInUser := user.SignedInUser{UserID: 1, OrgID: 1} + + for range 30 { + actions := enabledActions(model, async) + op := choose(rng, actions) + + switch op { + case GetToken: + tokenView, err := s.GetToken(context.Background()) + modelTokenView, modelErr := model.GetToken(context.Background()) + if modelErr != nil { + require.ErrorIs(t, err, modelErr) + } else { + require.Equal(t, modelTokenView, tokenView) + } + + case CreateToken: + response, err := s.CreateToken(context.Background()) + require.NoError(t, err) + + model.TokenCreated(response) + + case ValidateToken: + err := s.ValidateToken(context.Background(), cloudmigration.CloudMigrationSession{}) + fmt.Printf("\n\naaaaaaa ValidateToken err %+v\n\n", err) + + case DeleteToken: + + case CreateSession: + response, err := s.CreateSession(context.Background(), &signedInUser, cloudmigration.CloudMigrationSessionRequest{ + AuthToken: model.token, + OrgID: signedInUser.OrgID, + }) + require.NoError(t, err) + model.SessionCreated(response) + + case GetSession: + case DeleteSession: + case GetSessionList: + case CreateSnapshot: + response, err := s.CreateSnapshot(context.Background(), &signedInUser, model.session.UID) + require.NoError(t, err) + model.SnapshotCreated(response) + + case GetSnapshot: + index := rng.Intn(len(model.snapshots)) + snapshotToGet := model.snapshots[index] + snapshot, err := s.GetSnapshot(context.Background(), cloudmigration.GetSnapshotsQuery{ + OrgID: signedInUser.OrgID, + SessionUID: model.session.UID, + SnapshotUID: snapshotToGet.UID, + }) + require.NoError(t, err) + require.Equal(t, snapshotToGet.UID, snapshot.UID) + + case GetSnapshotList: + case UploadSnapshot: + index := rng.Intn(len(model.snapshots)) + snapshot := model.snapshots[index] + err := s.UploadSnapshot(context.Background(), signedInUser.OrgID, &signedInUser, model.session.UID, snapshot.UID) + fmt.Printf("\n\naaaaaaa UploadSnapshot: err %+v\n\n", err) + case CancelSnapshot: + + case AsyncTick: + async.tick() + + default: + panic(fmt.Sprintf("Unexpected op: '%s'. Did you forget to handle it?", op)) + } + } +} + func Test_NoopServiceDoesNothing(t *testing.T) { t.Parallel() s := &NoopServiceImpl{} @@ -940,6 +1156,127 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool, cfgOverrides ...conf kvstore.ProvideService(sqlStore), &libraryelementsfake.LibraryElementService{}, ng, + async.NewAsync(), + ) + require.NoError(t, err) + + return s +} + +func setUpServiceTestv2(t *testing.T, mockDashboardService *mockDashboardService, async async.Async, cfgOverrides ...configOverrides) cloudmigration.Service { + sqlStore := db.InitTestDB(t) + secretsService := secretsfakes.NewFakeSecretsService() + rr := routing.NewRouteRegister() + tracer := tracing.InitializeTracerForTest() + mockFolder := &foldertest.FakeService{ + ExpectedFolder: &folder.Folder{UID: "folderUID", Title: "Folder"}, + } + + cfg := setting.NewCfg() + section, err := cfg.Raw.NewSection("cloud_migration") + require.NoError(t, err) + _, err = section.NewKey("domain", "localhost:1234") + require.NoError(t, err) + + cfg.CloudMigration.IsDeveloperMode = true // ensure local implementations are used + cfg.CloudMigration.SnapshotFolder = filepath.Join(os.TempDir(), uuid.NewString()) + + dsService := &datafakes.FakeDataSourceService{ + DataSources: []*datasources.DataSource{ + {Name: "mmm", Type: "mysql"}, + {Name: "ZZZ", Type: "infinity"}, + }, + } + + featureToggles := featuremgmt.WithFeatures( + featuremgmt.FlagOnPremToCloudMigrations, + featuremgmt.FlagDashboardRestore, // needed for skipping creating soft-deleted dashboards in the snapshot. + ) + + kvStore := kvstore.ProvideService(sqlStore) + + bus := bus.ProvideBus(tracer) + fakeAccessControl := actest.FakeAccessControl{ExpectedEvaluate: true} + fakeAccessControlService := actest.FakeService{} + alertMetrics := metrics.NewNGAlert(prometheus.NewRegistry()) + + cfg.UnifiedAlerting.DefaultRuleEvaluationInterval = time.Minute + cfg.UnifiedAlerting.BaseInterval = time.Minute + cfg.UnifiedAlerting.InitializationTimeout = 30 * time.Second + ruleStore, err := ngalertstore.ProvideDBStore(cfg, featureToggles, sqlStore, mockFolder, mockDashboardService, fakeAccessControl, bus) + require.NoError(t, err) + + ng, err := ngalert.ProvideService( + cfg, featureToggles, nil, nil, rr, sqlStore, kvStore, nil, nil, quotatest.New(false, nil), + secretsService, nil, alertMetrics, mockFolder, fakeAccessControl, mockDashboardService, nil, bus, fakeAccessControlService, + annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore, + httpclient.NewProvider(), ngalertfakes.NewFakeReceiverPermissionsService(), + ) + require.NoError(t, err) + + validConfig := `{ + "alertmanager_config": { + "route": { + "receiver": "grafana-default-email" + }, + "receivers": [{ + "name": "grafana-default-email", + "grafana_managed_receiver_configs": [{ + "uid": "", + "name": "email receiver", + "type": "email", + "settings": { + "addresses": "" + } + }] + }] + } + }` + require.NoError(t, ng.Api.AlertingStore.SaveAlertmanagerConfiguration(context.Background(), &models.SaveAlertmanagerConfigurationCmd{ + AlertmanagerConfiguration: validConfig, + OrgID: 1, + LastApplied: time.Now().Unix(), + })) + + // Insert test data for dashboard test, should be removed later when we move GetAllDashboardsByOrgId() to the dashboard service + _, err = sqlStore.GetSqlxSession().Exec(context.Background(), ` + INSERT INTO + dashboard (id, org_id, data, deleted, slug, title, created, version, updated ) + VALUES + (1, 1, '{}', null, 'asdf', 'ghjk', '2024-03-27 15:30:43.000' , '1','2024-03-27 15:30:43.000' ), + (2, 1, '{}', '2024-03-27 15:30:43.000','qwert', 'yuio', '2024-03-27 15:30:43.000' , '2','2024-03-27 15:30:43.000'), + (3, 2, '{}', null, 'asdf', 'ghjk', '2024-03-27 15:30:43.000' , '1','2024-03-27 15:30:43.000' ), + (4, 2, '{}', '2024-03-27 15:30:43.000','qwert', 'yuio', '2024-03-27 15:30:43.000' , '2','2024-03-27 15:30:43.000'); + `, + ) + if err != nil { + require.NoError(t, err) + } + + for _, cfgOverride := range cfgOverrides { + cfgOverride(cfg) + } + + s, err := ProvideService( + cfg, + httpclient.NewProvider(), + featureToggles, + sqlStore, + dsService, + secretskv.NewFakeSQLSecretsKVStore(t, sqlStore), + secretsService, + rr, + prometheus.DefaultRegisterer, + tracer, + mockDashboardService, + mockFolder, + &pluginstore.FakePluginStore{}, + &pluginsettings.FakePluginSettings{}, + actest.FakeAccessControl{ExpectedEvaluate: true}, + kvstore.ProvideService(sqlStore), + &libraryelementsfake.LibraryElementService{}, + ng, + async, ) require.NoError(t, err) @@ -998,3 +1335,177 @@ func (m *gmsClientMock) GetSnapshotStatusCallCount() int { return m.getStatusCalled } + +// A mock of the GMS api. Use this mock as the http client. +type gmsMockHttpClient struct { + rng *rand.Rand +} + +func newGmsMockHttpClient(rng *rand.Rand) *http.Client { + return &http.Client{Transport: &gmsMockHttpClient{rng: rng}} +} + +func (t *gmsMockHttpClient) RoundTrip(r *http.Request) (*http.Response, error) { + if strings.HasSuffix(r.URL.Path, "/api/v1/validate-key") { + // if t.rng.Int()%2 == 0 { + // return &http.Response{ + // StatusCode: http.StatusInternalServerError, + // Body: io.NopCloser(strings.NewReader(`{"message": "instance is unreachable, make sure the instance is running"}`)), + // }, nil + // } + return &http.Response{StatusCode: http.StatusOK}, nil + } + + if strings.HasSuffix(r.URL.Path, "/api/v1/start-snapshot") { + publicKey, _, err := box.GenerateKey(cryptorand.Reader) + if err != nil { + return nil, fmt.Errorf("nacl: generating public and private key: %w", err) + } + + buffer, err := json.Marshal(map[string]any{ + "snapshotID": "TODO", + "algo": "TODO", + "encryptionKey": publicKey, + "maxItemsPerPartition": 10, + "metadata": "TODO", + }) + if err != nil { + return &http.Response{StatusCode: http.StatusInternalServerError}, err + } + + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(buffer)), + }, + nil + } + + if strings.HasSuffix(r.URL.Path, "/cloud-migrations/api/v1/events") { + return &http.Response{StatusCode: http.StatusOK}, nil + } + + if strings.HasSuffix(r.URL.Path, "create-upload-url") { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{"uploadUrl": "url"}`)), + }, + nil + } + + panic(fmt.Sprintf("gmsMockHttpClient.RoundTrip: Unexpected http request path: '%+v'. Did you forget to handle it?", r.URL.Path)) +} + +type mockDashboardService struct { + dashboards map[int64][]*dashboards.Dashboard +} + +func newMockDashboardService() *mockDashboardService { + return &mockDashboardService{} +} + +func (mock *mockDashboardService) BuildSaveDashboardCommand(ctx context.Context, dto *dashboards.SaveDashboardDTO, validateProvisionedDashboard bool) (*dashboards.SaveDashboardCommand, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) DeleteDashboard(ctx context.Context, dashboardId int64, dashboardUID string, orgId int64) error { + panic("unimplemented") +} +func (mock *mockDashboardService) DeleteAllDashboards(ctx context.Context, orgID int64) error { + panic("unimplemented") +} +func (mock *mockDashboardService) FindDashboards(ctx context.Context, query *dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) { + panic("unimplemented") +} + +// GetDashboard fetches a dashboard. +// To fetch a dashboard under root by title should set the folder UID to point to an empty string +// eg. util.Pointer("") +func (mock *mockDashboardService) GetDashboard(ctx context.Context, query *dashboards.GetDashboardQuery) (*dashboards.Dashboard, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) GetDashboards(ctx context.Context, query *dashboards.GetDashboardsQuery) ([]*dashboards.Dashboard, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) GetDashboardTags(ctx context.Context, query *dashboards.GetDashboardTagsQuery) ([]*dashboards.DashboardTagCloudItem, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) GetDashboardUIDByID(ctx context.Context, query *dashboards.GetDashboardRefByIDQuery) (*dashboards.DashboardRef, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) ImportDashboard(ctx context.Context, dto *dashboards.SaveDashboardDTO) (*dashboards.Dashboard, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) SaveDashboard(ctx context.Context, dto *dashboards.SaveDashboardDTO, allowUiUpdate bool) (*dashboards.Dashboard, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) SearchDashboards(ctx context.Context, query *dashboards.FindPersistedDashboardsQuery) (model.HitList, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) CountInFolders(ctx context.Context, orgID int64, folderUIDs []string, user identity.Requester) (int64, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) GetDashboardsSharedWithUser(ctx context.Context, user identity.Requester) ([]*dashboards.Dashboard, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) GetAllDashboards(ctx context.Context) ([]*dashboards.Dashboard, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) GetAllDashboardsByOrgId(ctx context.Context, orgID int64) ([]*dashboards.Dashboard, error) { + return mock.dashboards[orgID], nil +} +func (mock *mockDashboardService) SoftDeleteDashboard(ctx context.Context, orgID int64, dashboardUid string) error { + panic("unimplemented") +} +func (mock *mockDashboardService) RestoreDashboard(ctx context.Context, dashboard *dashboards.Dashboard, user identity.Requester, optionalFolderUID string) error { + panic("unimplemented") +} +func (mock *mockDashboardService) CleanUpDeletedDashboards(ctx context.Context) (int64, error) { + panic("unimplemented") +} +func (mock *mockDashboardService) GetSoftDeletedDashboard(ctx context.Context, orgID int64, uid string) (*dashboards.Dashboard, error) { + panic("unimplemented") +} + +type mockAsync struct { + rng *rand.Rand + goFuncs []func() + tickFuncs []func() bool +} + +func newMockAsync(rng *rand.Rand) *mockAsync { + return &mockAsync{rng: rng} +} + +func (a *mockAsync) hasWork() bool { + return len(a.goFuncs) > 0 || len(a.tickFuncs) > 0 +} + +func (a *mockAsync) tick() { + if a.rng.Int()%2 == 0 && len(a.tickFuncs) > 0 { + // Choose a random ticker + i := rand.Intn(len(a.tickFuncs)) + f := a.tickFuncs[i] + + // If done, remove the ticker + if f() { + a.tickFuncs = append(a.tickFuncs[:i], a.tickFuncs[i+1:]...) + } + } else if len(a.goFuncs) > 0 { + // Choose a random goroutine + i := rand.Intn(len(a.goFuncs)) + f := a.goFuncs[i] + + // Remove the goroutine + a.goFuncs = append(a.goFuncs[:i], a.goFuncs[i+1:]...) + + f() + } +} + +func (a *mockAsync) Go(f func()) { + a.goFuncs = append(a.goFuncs, f) +} + +// interval can be ignored, time is not used +func (a *mockAsync) Tick(_interval time.Duration, f func() bool) { + a.tickFuncs = append(a.tickFuncs, f) +} diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go index 870c4949533..9d77038d56a 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go @@ -488,6 +488,7 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn s.buildSnapshotMutex.Lock() defer s.buildSnapshotMutex.Unlock() + // TODO: time should be a dependency start := time.Now() defer func() { s.log.Debug(fmt.Sprintf("buildSnapshot: method completed in %d ms", time.Since(start).Milliseconds())) diff --git a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go index efbe701e51c..97d4a2c64e8 100644 --- a/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go +++ b/pkg/services/cloudmigration/cloudmigrationimpl/xorm_store.go @@ -37,6 +37,7 @@ func (ss *sqlStore) GetMigrationSessionByUID(ctx context.Context, orgID int64, u if err != nil { return err } + if !exist { return cloudmigration.ErrMigrationNotFound } @@ -199,6 +200,7 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up if update.SessionID == "" { return fmt.Errorf("missing session uid") } + err := ss.db.InTransaction(ctx, func(ctx context.Context) error { // Update status if set if update.Status != "" {