grafana/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go

package cloudmigrationimpl
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/authapi"
"github.com/grafana/grafana/pkg/services/authapi/fake"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/cloudmigration/api"
"github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient"
"github.com/grafana/grafana/pkg/services/cloudmigration/objectstorage"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/services/gcom"
"github.com/grafana/grafana/pkg/services/libraryelements"
"github.com/grafana/grafana/pkg/services/ngalert"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
"github.com/grafana/grafana/pkg/services/secrets"
secretskv "github.com/grafana/grafana/pkg/services/secrets/kvstore"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
// Service defines the cloudmigration.Service implementation.
type Service struct {
store store
log *log.ConcreteLogger
cfg *setting.Cfg
buildSnapshotMutex sync.Mutex
cancelMutex sync.Mutex
cancelFunc context.CancelFunc
isSyncSnapshotStatusFromGMSRunning int32
features featuremgmt.FeatureToggles
gmsClient gmsclient.Client
objectStorage objectstorage.ObjectStorage
dsService datasources.DataSourceService
gcomService gcom.Service
authApiService authapi.Service
dashboardService dashboards.DashboardService
folderService folder.Service
pluginStore pluginstore.Store
secretsService secrets.Service
kvStore *kvstore.NamespacedKVStore
libraryElementsService libraryelements.Service
ngAlert *ngalert.AlertNG
api *api.CloudMigrationAPI
tracer tracing.Tracer
metrics *Metrics
}
var LogPrefix = "cloudmigration.service"
const (
// nolint:gosec
cloudMigrationAccessPolicyNamePrefix = "grafana-cloud-migrations"
//nolint:gosec
cloudMigrationTokenNamePrefix = "grafana-cloud-migrations"
)
var _ cloudmigration.Service = (*Service)(nil)
// ProvideService is a factory method used by Wire to inject dependencies.
// It builds the service and API, and configures routes.
func ProvideService(
cfg *setting.Cfg,
httpClientProvider *httpclient.Provider,
features featuremgmt.FeatureToggles,
db db.DB,
dsService datasources.DataSourceService,
secretsStore secretskv.SecretsKVStore,
secretsService secrets.Service,
routeRegister routing.RouteRegister,
prom prometheus.Registerer,
tracer tracing.Tracer,
dashboardService dashboards.DashboardService,
folderService folder.Service,
pluginStore pluginstore.Store,
kvStore kvstore.KVStore,
libraryElementsService libraryelements.Service,
ngAlert *ngalert.AlertNG,
) (cloudmigration.Service, error) {
if !features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrations) {
return &NoopServiceImpl{}, nil
}
s := &Service{
store: &sqlStore{db: db, secretsStore: secretsStore, secretsService: secretsService},
log: log.New(LogPrefix),
cfg: cfg,
features: features,
dsService: dsService,
tracer: tracer,
metrics: newMetrics(),
secretsService: secretsService,
dashboardService: dashboardService,
folderService: folderService,
pluginStore: pluginStore,
kvStore: kvstore.WithNamespace(kvStore, 0, "cloudmigration"),
libraryElementsService: libraryElementsService,
ngAlert: ngAlert,
}
s.api = api.RegisterApi(routeRegister, s, tracer)
httpClientS3, err := httpClientProvider.New()
if err != nil {
return nil, fmt.Errorf("creating http client for S3: %w", err)
}
s.objectStorage = objectstorage.NewS3(httpClientS3, tracer)
if !cfg.CloudMigration.IsDeveloperMode {
httpClientGMS, err := httpClientProvider.New()
if err != nil {
return nil, fmt.Errorf("creating http client for GMS: %w", err)
}
c, err := gmsclient.NewGMSClient(cfg, httpClientGMS)
if err != nil {
return nil, fmt.Errorf("initializing GMS client: %w", err)
}
s.gmsClient = c
httpClientGcom, err := httpClientProvider.New()
if err != nil {
return nil, fmt.Errorf("creating http client for GCOM: %w", err)
}
s.gcomService = gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken}, httpClientGcom)
if features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrationsAuthApiMig) {
s.log.Info("using authapi client because feature flag is enabled")
httpClientAuthApi, err := httpClientProvider.New()
if err != nil {
return nil, fmt.Errorf("creating http client for AuthApi: %w", err)
}
// the api token is the same as for gcom
s.authApiService = authapi.New(authapi.Config{ApiURL: cfg.CloudMigration.AuthAPIUrl, Token: cfg.CloudMigration.GcomAPIToken}, httpClientAuthApi)
} else {
s.log.Info("using gcom client for auth")
s.authApiService = gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken}, httpClientGcom).(*gcom.GcomClient)
}
} else {
s.gmsClient = gmsclient.NewInMemoryClient()
s.gcomService = &gcomStub{}
s.authApiService = &fake.AuthapiStub{Policies: map[string]authapi.AccessPolicy{}, Token: nil}
s.cfg.StackID = "12345"
}
if err := prom.Register(s.metrics); err != nil {
var alreadyRegisterErr prometheus.AlreadyRegisteredError
if errors.As(err, &alreadyRegisterErr) {
s.log.Warn("cloud migration metrics already registered")
} else {
return s, fmt.Errorf("registering cloud migration metrics: %w", err)
}
}
return s, nil
}
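// GetToken returns the cloud migration token for the configured stack, if one exists.
// It resolves the instance via GCOM, lists tokens in the instance's region, and matches on the expected token name.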
func (s *Service) GetToken(ctx context.Context) (authapi.TokenView, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetToken")
defer span.End()
logger := s.log.FromContext(ctx)
requestID := tracing.TraceIDFromContext(ctx, false)
timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.FetchInstanceTimeout)
defer cancel()
instance, err := s.gcomService.GetInstanceByID(timeoutCtx, requestID, s.cfg.StackID)
if err != nil {
return authapi.TokenView{}, fmt.Errorf("fetching instance by id: id=%s %w", s.cfg.StackID, err)
}
logger.Info("instance found", "slug", instance.Slug)
accessPolicyName := fmt.Sprintf("%s-%s", cloudMigrationAccessPolicyNamePrefix, s.cfg.StackID)
accessTokenName := fmt.Sprintf("%s-%s", cloudMigrationTokenNamePrefix, s.cfg.StackID)
timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.ListTokensTimeout)
defer cancel()
tokens, err := s.authApiService.ListTokens(timeoutCtx, authapi.ListTokenParams{
RequestID: requestID,
AccessPolicyName: accessPolicyName,
TokenName: accessTokenName,
Region: instance.RegionSlug,
})
if err != nil {
return authapi.TokenView{}, fmt.Errorf("listing tokens: %w", err)
}
logger.Info("found access tokens", "num_tokens", len(tokens))
for _, token := range tokens {
if token.Name == accessTokenName {
logger.Info("found existing cloud migration token", "tokenID", token.ID, "accessPolicyID", token.AccessPolicyID)
return token, nil
}
}
logger.Info("cloud migration token not found")
return authapi.TokenView{}, fmt.Errorf("fetching cloud migration token: instance=%+v accessPolicyName=%s accessTokenName=%s %w",
instance, accessPolicyName, accessTokenName, cloudmigration.ErrTokenNotFound)
}
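// CreateToken provisions a fresh cloud migration access policy and token for the stack.
// Any existing access policy with the same name is deleted first. The resulting token is
// returned together with instance routing details as a base64-encoded JSON payload.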
func (s *Service) CreateToken(ctx context.Context) (cloudmigration.CreateAccessTokenResponse, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateToken")
defer span.End()
logger := s.log.FromContext(ctx)
requestID := tracing.TraceIDFromContext(ctx, false)
timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.FetchInstanceTimeout)
defer cancel()
instance, err := s.gcomService.GetInstanceByID(timeoutCtx, requestID, s.cfg.StackID)
if err != nil {
return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("fetching instance by id: id=%s %w", s.cfg.StackID, err)
}
// Add the stack id to the access policy name to ensure access policies in an org have unique names.
accessPolicyName := fmt.Sprintf("%s-%s", cloudMigrationAccessPolicyNamePrefix, s.cfg.StackID)
accessPolicyDisplayName := fmt.Sprintf("%s-%s", s.cfg.Slug, cloudMigrationAccessPolicyNamePrefix)
timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.FetchAccessPolicyTimeout)
defer cancel()
existingAccessPolicy, err := s.findAccessPolicyByName(timeoutCtx, accessPolicyName, instance.RegionSlug)
if err != nil {
return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("fetching access policy by name: name=%s %w", accessPolicyName, err)
}
if existingAccessPolicy != nil {
timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.DeleteAccessPolicyTimeout)
defer cancel()
if _, err := s.authApiService.DeleteAccessPolicy(timeoutCtx, authapi.DeleteAccessPolicyParams{
RequestID: requestID,
AccessPolicyID: existingAccessPolicy.ID,
Region: instance.RegionSlug,
}); err != nil {
return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("deleting access policy: id=%s region=%s %w", existingAccessPolicy.ID, instance.RegionSlug, err)
}
logger.Info("deleted access policy", existingAccessPolicy.ID, "name", existingAccessPolicy.Name)
}
timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.CreateAccessPolicyTimeout)
defer cancel()
accessPolicy, err := s.authApiService.CreateAccessPolicy(timeoutCtx,
authapi.CreateAccessPolicyParams{
RequestID: requestID,
Region: instance.RegionSlug,
},
authapi.CreateAccessPolicyPayload{
Name: accessPolicyName,
DisplayName: accessPolicyDisplayName,
Realms: []authapi.Realm{{Type: "stack", Identifier: s.cfg.StackID, LabelPolicies: []authapi.LabelPolicy{}}},
Scopes: []string{"cloud-migrations:read", "cloud-migrations:write"},
})
if err != nil {
return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("creating access policy: %w", err)
}
logger.Info("created access policy", "id", accessPolicy.ID, "name", accessPolicy.Name)
// Add the stack id to the token name to ensure tokens in an org have unique names.
accessTokenName := fmt.Sprintf("%s-%s", cloudMigrationTokenNamePrefix, s.cfg.StackID)
accessTokenDisplayName := fmt.Sprintf("%s-%s", s.cfg.Slug, cloudMigrationTokenNamePrefix)
timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.CreateTokenTimeout)
defer cancel()
token, err := s.authApiService.CreateToken(timeoutCtx,
authapi.CreateTokenParams{
RequestID: requestID,
Region: instance.RegionSlug,
},
authapi.CreateTokenPayload{
AccessPolicyID: accessPolicy.ID,
Name: accessTokenName,
DisplayName: accessTokenDisplayName,
ExpiresAt: time.Now().Add(s.cfg.CloudMigration.TokenExpiresAfter),
})
if err != nil {
return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("creating access token: %w", err)
}
logger.Info("created access token", "id", token.ID, "name", token.Name)
s.metrics.accessTokenCreated.With(prometheus.Labels{"slug": s.cfg.Slug}).Inc()
bytes, err := json.Marshal(cloudmigration.Base64EncodedTokenPayload{
Token: token.Token,
Instance: cloudmigration.Base64HGInstance{
StackID: instance.ID,
RegionSlug: instance.RegionSlug,
ClusterSlug: instance.ClusterSlug, // This should be used for routing to GMS
Slug: instance.Slug,
},
})
if err != nil {
return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("encoding token: %w", err)
}
return cloudmigration.CreateAccessTokenResponse{Token: base64.StdEncoding.EncodeToString(bytes)}, nil
}
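// findAccessPolicyByName returns the access policy matching the given name in the region, or nil if none is found.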
func (s *Service) findAccessPolicyByName(ctx context.Context, accessPolicyName string, region string) (*authapi.AccessPolicy, error) {
accessPolicies, err := s.authApiService.ListAccessPolicies(ctx, authapi.ListAccessPoliciesParams{
RequestID: tracing.TraceIDFromContext(ctx, false),
Name: accessPolicyName,
Region: region,
})
if err != nil {
return nil, fmt.Errorf("listing access policies: name=%s :%w", accessPolicyName, err)
}
for _, accessPolicy := range accessPolicies {
if accessPolicy.Name == accessPolicyName {
return &accessPolicy, nil
}
}
return nil, nil
}
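// ValidateToken verifies the session's token against GMS.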
func (s *Service) ValidateToken(ctx context.Context, cm cloudmigration.CloudMigrationSession) error {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.ValidateToken")
defer span.End()
if err := s.gmsClient.ValidateKey(ctx, cm); err != nil {
return err
}
return nil
}
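// DeleteToken deletes the cloud migration token with the given ID from the stack's region.
// A token that is already gone is not treated as an error.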
func (s *Service) DeleteToken(ctx context.Context, tokenID string) error {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.DeleteToken", trace.WithAttributes(attribute.String("tokenID", tokenID)))
defer span.End()
logger := s.log.FromContext(ctx)
requestID := tracing.TraceIDFromContext(ctx, false)
timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.FetchInstanceTimeout)
defer cancel()
instance, err := s.gcomService.GetInstanceByID(timeoutCtx, requestID, s.cfg.StackID)
if err != nil {
return fmt.Errorf("fetching instance by id: id=%s %w", s.cfg.StackID, err)
}
logger.Info("found instance", "instanceID", instance.ID)
timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.DeleteTokenTimeout)
defer cancel()
if err := s.authApiService.DeleteToken(timeoutCtx, authapi.DeleteTokenParams{
RequestID: tracing.TraceIDFromContext(ctx, false),
TokenID: tokenID,
Region: instance.RegionSlug,
}); err != nil && !errors.Is(err, gcom.ErrTokenNotFound) {
return fmt.Errorf("deleting cloud migration token: tokenID=%s %w", tokenID, err)
}
logger.Info("deleted cloud migration token", "tokenID", tokenID)
s.metrics.accessTokenDeleted.With(prometheus.Labels{"slug": s.cfg.Slug}).Inc()
return nil
}
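// GetSession returns the migration session with the given UID for the org.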
func (s *Service) GetSession(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSession")
defer span.End()
migration, err := s.store.GetMigrationSessionByUID(ctx, orgID, uid)
if err != nil {
return nil, err
}
return migration, nil
}
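// GetSessionList returns all migration sessions for the org.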
func (s *Service) GetSessionList(ctx context.Context, orgID int64) (*cloudmigration.CloudMigrationSessionListResponse, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSessionList")
defer span.End()
values, err := s.store.GetCloudMigrationSessionList(ctx, orgID)
if err != nil {
return nil, fmt.Errorf("retrieving session list from store: %w", err)
}
migrations := make([]cloudmigration.CloudMigrationSessionResponse, 0, len(values))
for _, v := range values {
migrations = append(migrations, cloudmigration.CloudMigrationSessionResponse{
UID: v.UID,
Slug: v.Slug,
Created: v.Created,
Updated: v.Updated,
})
}
return &cloudmigration.CloudMigrationSessionListResponse{Sessions: migrations}, nil
}
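// CreateSession decodes and validates the supplied base64 token, persists a new migration session,
// and reports a connect event to GMS.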
func (s *Service) CreateSession(ctx context.Context, signedInUser *user.SignedInUser, cmd cloudmigration.CloudMigrationSessionRequest) (*cloudmigration.CloudMigrationSessionResponse, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSession")
defer span.End()
base64Token := cmd.AuthToken
b, err := base64.StdEncoding.DecodeString(base64Token)
if err != nil {
return nil, cloudmigration.ErrTokenInvalid.Errorf("token could not be decoded")
}
var token cloudmigration.Base64EncodedTokenPayload
if err := json.Unmarshal(b, &token); err != nil {
return nil, cloudmigration.ErrTokenInvalid.Errorf("token could not be decoded") // don't want to leak info here
}
migration := token.ToMigration(cmd.OrgID)
// validate token against GMS before saving
if err := s.ValidateToken(ctx, migration); err != nil {
return nil, err
}
cm, err := s.store.CreateMigrationSession(ctx, migration)
if err != nil {
return nil, cloudmigration.ErrSessionCreationFailure.Errorf("error creating migration")
}
s.report(ctx, &migration, gmsclient.EventConnect, 0, nil, signedInUser.UserUID)
return &cloudmigration.CloudMigrationSessionResponse{
UID: cm.UID,
Slug: token.Instance.Slug,
Created: cm.Created,
Updated: cm.Updated,
}, nil
}
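// DeleteSession deletes the migration session and its snapshots from the store, removes any local
// snapshot files, and reports a disconnect event to GMS.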
func (s *Service) DeleteSession(ctx context.Context, orgID int64, signedInUser *user.SignedInUser, sessionUID string) (*cloudmigration.CloudMigrationSession, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.DeleteSession")
defer span.End()
session, snapshots, err := s.store.DeleteMigrationSessionByUID(ctx, orgID, sessionUID)
if err != nil {
s.report(ctx, session, gmsclient.EventDisconnect, 0, err, signedInUser.UserUID)
return nil, fmt.Errorf("deleting migration from db for session %v: %w", sessionUID, err)
}
err = s.deleteLocalFiles(snapshots)
s.report(ctx, session, gmsclient.EventDisconnect, 0, err, signedInUser.UserUID)
return session, nil
}
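// CreateSnapshot registers a new snapshot with GMS, persists it locally with status "creating",
// and builds the snapshot contents asynchronously in a goroutine.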
func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedInUser, sessionUid string) (*cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSnapshot", trace.WithAttributes(
attribute.String("sessionUid", sessionUid),
))
defer span.End()
// fetch session for the gms auth token
session, err := s.store.GetMigrationSessionByUID(ctx, signedInUser.GetOrgID(), sessionUid)
if err != nil {
return nil, fmt.Errorf("fetching migration session for uid %s: %w", sessionUid, err)
}
// query GMS to establish a new snapshot (s.cfg.CloudMigration.StartSnapshotTimeout)
initResp, err := s.gmsClient.StartSnapshot(ctx, *session)
if err != nil {
return nil, fmt.Errorf("initializing snapshot with GMS for session %s: %w", sessionUid, err)
}
if s.cfg.CloudMigration.SnapshotFolder == "" {
return nil, fmt.Errorf("snapshot folder is not set")
}
// save snapshot to the db
snapshot := cloudmigration.CloudMigrationSnapshot{
UID: util.GenerateShortUID(),
SessionUID: sessionUid,
Status: cloudmigration.SnapshotStatusCreating,
EncryptionKey: initResp.EncryptionKey,
GMSSnapshotUID: initResp.SnapshotID,
LocalDir: filepath.Join(s.cfg.CloudMigration.SnapshotFolder, "grafana", "snapshots", initResp.SnapshotID),
}
uid, err := s.store.CreateSnapshot(ctx, snapshot)
if err != nil {
return nil, fmt.Errorf("saving snapshot: %w", err)
}
snapshot.UID = uid
// Update status to "creating" to ensure the frontend polls from now on
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: uid,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusCreating,
}); err != nil {
return nil, err
}
// start building the snapshot asynchronously while we return a success response to the client
go func() {
s.cancelMutex.Lock()
defer func() {
s.cancelFunc = nil
s.cancelMutex.Unlock()
}()
// Create a context out of the span context to ensure the trace is propagated
asyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext())
asyncCtx, asyncSpan := s.tracer.Start(asyncCtx, "CloudMigrationService.CreateSnapshotAsync")
defer asyncSpan.End()
asyncCtx, cancelFunc := context.WithCancel(asyncCtx)
s.cancelFunc = cancelFunc
s.report(asyncCtx, session, gmsclient.EventStartBuildingSnapshot, 0, nil, signedInUser.UserUID)
start := time.Now()
err := s.buildSnapshot(asyncCtx, signedInUser, initResp.MaxItemsPerPartition, initResp.Metadata, snapshot)
if err != nil {
asyncSpan.SetStatus(codes.Error, "error building snapshot")
asyncSpan.RecordError(err)
s.log.Error("building snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateSnapshotWithRetries(asyncCtx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusError,
}); err != nil {
s.log.Error("critical failure during snapshot creation - please report any error logs")
asyncSpan.RecordError(err)
}
}
span.SetStatus(codes.Ok, "snapshot built")
s.report(asyncCtx, session, gmsclient.EventDoneBuildingSnapshot, time.Since(start), err, signedInUser.UserUID)
}()
return &snapshot, nil
}
// GetSnapshot returns the on-prem version of a snapshot, supplemented with processing status from GMS
func (s *Service) GetSnapshot(ctx context.Context, query cloudmigration.GetSnapshotsQuery) (*cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSnapshot")
defer span.End()
orgID, sessionUid, snapshotUid := query.OrgID, query.SessionUID, query.SnapshotUID
snapshot, err := s.store.GetSnapshotByUID(ctx, orgID, sessionUid, snapshotUid, query.ResultPage, query.ResultLimit)
if err != nil {
return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err)
}
session, err := s.store.GetMigrationSessionByUID(ctx, orgID, sessionUid)
if err != nil {
return nil, fmt.Errorf("fetching session for uid %s: %w", sessionUid, err)
}
// FIXME: this function should be a method
syncStatus := func(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshot *cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error) {
// Calculate offset based on how many results we currently have responses for
pending := snapshot.StatsRollup.CountsByStatus[cloudmigration.ItemStatusPending]
snapshotMeta, err := s.gmsClient.GetSnapshotStatus(ctx, *session, *snapshot, snapshot.StatsRollup.Total-pending)
if err != nil {
return snapshot, fmt.Errorf("error fetching snapshot status from GMS: sessionUid: %s, snapshotUid: %s", sessionUid, snapshotUid)
}
if snapshotMeta.State == cloudmigration.SnapshotStateUnknown {
// If a status from Grafana Migration Service is unavailable, return the snapshot as-is
return snapshot, nil
}
localStatus, ok := gmsStateToLocalStatus[snapshotMeta.State]
if !ok {
s.log.Error("unexpected GMS snapshot state: %s", snapshotMeta.State)
return snapshot, nil
}
// For 11.2 we only support core data sources. Apply a warning for any non-core ones before storing.
resources, err := s.getResourcesWithPluginWarnings(ctx, snapshotMeta.Results)
if err != nil {
// treat this as non-fatal since the migration still succeeded
s.log.Error("error applying plugin warnings, please open a bug report: %w", err)
}
// Log the errors for resources with errors at migration
for _, resource := range resources {
if resource.Status == cloudmigration.ItemStatusError && resource.Error != "" {
s.log.Error("Could not migrate resource", "resourceID", resource.RefID, "error", resource.Error)
}
}
// We need to update the snapshot in our db before reporting anything
if err := s.store.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
SessionID: sessionUid,
Status: localStatus,
Resources: resources,
}); err != nil {
return nil, fmt.Errorf("error updating snapshot status: %w", err)
}
// Refresh the snapshot after the update
snapshot, err = s.store.GetSnapshotByUID(ctx, orgID, sessionUid, snapshotUid, query.ResultPage, query.ResultLimit)
if err != nil {
return nil, fmt.Errorf("fetching snapshot for uid %s: %w", snapshotUid, err)
}
return snapshot, nil
}
// Create a context out of the span context to ensure the trace is propagated
asyncSyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext())
// Sync snapshot results from GMS in case the sync started after upload is no longer running (e.g. due to a restart)
// and somebody is interested in the status.
go s.syncSnapshotStatusFromGMSUntilDone(asyncSyncCtx, session, snapshot, syncStatus)
return snapshot, nil
}
// FIXME: this definition should not exist once the syncStatus function in GetSnapshot is converted to a method
type syncStatusFunc func(context.Context, *cloudmigration.CloudMigrationSession, *cloudmigration.CloudMigrationSnapshot) (*cloudmigration.CloudMigrationSnapshot, error)
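// syncSnapshotStatusFromGMSUntilDone polls GMS for snapshot status every 10 seconds until the snapshot
// no longer needs to be queried or the context is canceled. Only one sync runs at a time.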
func (s *Service) syncSnapshotStatusFromGMSUntilDone(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshot *cloudmigration.CloudMigrationSnapshot, syncStatus syncStatusFunc) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.syncSnapshotStatusFromGMSUntilDone")
span.SetAttributes(
attribute.String("sessionUID", session.UID),
attribute.String("snapshotUID", snapshot.UID),
)
defer span.End()
// Ensure only one in-flight sync is running
if !atomic.CompareAndSwapInt32(&s.isSyncSnapshotStatusFromGMSRunning, 0, 1) {
s.log.Info("synchronize snapshot status already running", "sessionUID", session.UID, "snapshotUID", snapshot.UID)
return
}
defer atomic.StoreInt32(&s.isSyncSnapshotStatusFromGMSRunning, 0)
if !snapshot.ShouldQueryGMS() {
return
}
s.cancelMutex.Lock()
defer func() {
s.cancelFunc = nil
s.cancelMutex.Unlock()
}()
ctx, s.cancelFunc = context.WithCancel(ctx)
updatedSnapshot, err := syncStatus(ctx, session, snapshot)
if err != nil {
s.log.Error("error fetching snapshot status from GMS", "error", err, "sessionUID", session.UID, "snapshotUID", snapshot.UID)
} else {
snapshot = updatedSnapshot
}
tick := time.NewTicker(10 * time.Second)
defer tick.Stop()
for snapshot.ShouldQueryGMS() {
select {
case <-ctx.Done():
s.log.Info("cancelling snapshot status polling", "sessionUID", session.UID, "snapshotUID", snapshot.UID)
return
case <-tick.C:
updatedSnapshot, err := syncStatus(ctx, session, snapshot)
if err != nil {
s.log.Error("error fetching snapshot status from GMS", "error", err, "sessionUID", session.UID, "snapshotUID", snapshot.UID)
continue
}
snapshot = updatedSnapshot
}
}
}
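// gmsStateToLocalStatus maps snapshot states reported by GMS to the snapshot statuses stored locally.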
var gmsStateToLocalStatus map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus = map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus{
cloudmigration.SnapshotStateInitialized: cloudmigration.SnapshotStatusPendingProcessing, // GMS has not yet received a notification for the data
cloudmigration.SnapshotStateProcessing: cloudmigration.SnapshotStatusProcessing, // GMS has received a notification and is migrating the data
cloudmigration.SnapshotStateFinished: cloudmigration.SnapshotStatusFinished, // GMS has completed the migration - all resources were attempted to be migrated
cloudmigration.SnapshotStateCanceled: cloudmigration.SnapshotStatusCanceled, // GMS has processed a cancelation request. Snapshot cancelation is not supported yet.
cloudmigration.SnapshotStateError: cloudmigration.SnapshotStatusError, // Something unrecoverable has occurred in the migration process.
}
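// GetSnapshotList returns the snapshots matching the query.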
func (s *Service) GetSnapshotList(ctx context.Context, query cloudmigration.ListSnapshotsQuery) ([]cloudmigration.CloudMigrationSnapshot, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSnapshotList")
defer span.End()
snapshotList, err := s.store.GetSnapshotList(ctx, query)
if err != nil {
return nil, fmt.Errorf("fetching snapshots for session uid %s: %w", query.SessionUID, err)
}
return snapshotList, nil
}
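// UploadSnapshot requests a presigned upload URL from GMS, marks the snapshot as uploading,
// and uploads its contents asynchronously in a goroutine.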
func (s *Service) UploadSnapshot(ctx context.Context, orgID int64, signedInUser *user.SignedInUser, sessionUid string, snapshotUid string) error {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.UploadSnapshot",
trace.WithAttributes(
attribute.String("sessionUid", sessionUid),
attribute.String("snapshotUid", snapshotUid),
),
)
defer span.End()
// fetch session for the gms auth token
session, err := s.store.GetMigrationSessionByUID(ctx, orgID, sessionUid)
if err != nil {
return fmt.Errorf("fetching migration session for uid %s: %w", sessionUid, err)
}
snapshot, err := s.GetSnapshot(ctx, cloudmigration.GetSnapshotsQuery{
SnapshotUID: snapshotUid,
SessionUID: sessionUid,
OrgID: orgID,
})
if err != nil {
return fmt.Errorf("fetching snapshot with uid %s: %w", snapshotUid, err)
}
uploadUrl, err := s.gmsClient.CreatePresignedUploadUrl(ctx, *session, *snapshot)
if err != nil {
return fmt.Errorf("creating presigned upload url for snapshot %s: %w", snapshotUid, err)
}
s.log.Info("Uploading snapshot in local directory", "gmsSnapshotUID", snapshot.GMSSnapshotUID, "localDir", snapshot.LocalDir, "uploadURL", uploadUrl)
// Update status to "uploading" to ensure the frontend polls from now on
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotUid,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusUploading,
}); err != nil {
return err
}
// start uploading the snapshot asynchronously while we return a success response to the client
go func() {
s.cancelMutex.Lock()
defer func() {
s.cancelFunc = nil
s.cancelMutex.Unlock()
}()
// Create a context out of the span context to ensure the trace is propagated
asyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext())
asyncCtx, asyncSpan := s.tracer.Start(asyncCtx, "CloudMigrationService.UploadSnapshot")
defer asyncSpan.End()
asyncCtx, s.cancelFunc = context.WithCancel(asyncCtx)
s.report(asyncCtx, session, gmsclient.EventStartUploadingSnapshot, 0, nil, signedInUser.UserUID)
start := time.Now()
err := s.uploadSnapshot(asyncCtx, session, snapshot, uploadUrl)
if err != nil {
asyncSpan.SetStatus(codes.Error, "error uploading snapshot")
asyncSpan.RecordError(err)
s.log.Error("uploading snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateSnapshotWithRetries(asyncCtx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusError,
}); err != nil {
asyncSpan.RecordError(err)
s.log.Error("critical failure during snapshot upload - please report any error logs")
}
}
s.report(asyncCtx, session, gmsclient.EventDoneUploadingSnapshot, time.Since(start), err, signedInUser.UserUID)
}()
return nil
}
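// CancelSnapshot cancels any in-flight snapshot build or upload and marks the snapshot as canceled.
// If nothing is running, it returns an error.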
func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (err error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CancelSnapshot",
trace.WithAttributes(
attribute.String("sessionUid", sessionUid),
attribute.String("snapshotUid", snapshotUid),
),
)
defer span.End()
// The cancel func itself is protected by a mutex in the async goroutines, so it may or may not be set by the time CancelSnapshot is called
// Attempt to cancel and recover from the panic if the cancel function is nil
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("nothing to cancel")
}
}()
s.cancelFunc()
// Canceling will ensure that any goroutines holding the lock finish and release the lock
s.cancelMutex.Lock()
defer s.cancelMutex.Unlock()
s.cancelFunc = nil
if err := s.updateSnapshotWithRetries(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotUid,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusCanceled,
}); err != nil {
s.log.Error("critical failure during snapshot cancelation - please report any error logs")
}
s.log.Info("canceled snapshot", "sessionUid", sessionUid, "snapshotUid", snapshotUid)
return nil
}
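// report sends a local migration event to GMS, including duration and error details when present.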
func (s *Service) report(
ctx context.Context,
sess *cloudmigration.CloudMigrationSession,
t gmsclient.LocalEventType,
d time.Duration,
evtErr error,
userUID string,
) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.report")
defer span.End()
id, err := s.getLocalEventId(ctx)
if err != nil {
s.log.Error("failed to report event", "type", t, "error", err.Error())
return
}
if sess == nil {
errMessage := "session not found"
if evtErr != nil {
errMessage = evtErr.Error()
}
s.log.Error("failed to report event", "type", t, "error", errMessage)
return
}
e := gmsclient.EventRequestDTO{
Event: t,
LocalID: id,
UserUID: userUID,
}
if d != 0 {
e.DurationIfFinished = d
}
if evtErr != nil {
e.Error = evtErr.Error()
}
s.gmsClient.ReportEvent(ctx, *sess, e)
}
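// getLocalEventId returns the anonymous ID used to identify this instance in reported events,
// generating and persisting one if it does not already exist.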
func (s *Service) getLocalEventId(ctx context.Context) (string, error) {
anonId, ok, err := s.kvStore.Get(ctx, "anonymous_id")
if err != nil {
return "", fmt.Errorf("failed to get usage stats id: %w", err)
}
if ok {
return anonId, nil
}
anonId = uuid.NewString()
err = s.kvStore.Set(ctx, "anonymous_id", anonId)
if err != nil {
s.log.Error("Failed to store usage stats id", "error", err)
return "", fmt.Errorf("failed to store usage stats id: %w", err)
}
return anonId, nil
}
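// deleteLocalFiles removes the local directories of the given snapshots, logging failures
// and returning the last error encountered.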
func (s *Service) deleteLocalFiles(snapshots []cloudmigration.CloudMigrationSnapshot) error {
_, span := s.tracer.Start(context.Background(), "CloudMigrationService.deleteLocalFiles")
defer span.End()
var err error
for _, snapshot := range snapshots {
err = os.RemoveAll(snapshot.LocalDir)
if err != nil {
// in this case we only log the error, don't return it to continue with the process
s.log.Error("deleting migration snapshot files", "err", err)
}
}
return err
}
// getResourcesWithPluginWarnings iterates through each resource and, for any non-core data source, applies a warning that only core data sources are supported
func (s *Service) getResourcesWithPluginWarnings(ctx context.Context, results []cloudmigration.CloudMigrationResource) ([]cloudmigration.CloudMigrationResource, error) {
dsList, err := s.dsService.GetAllDataSources(ctx, &datasources.GetAllDataSourcesQuery{})
if err != nil {
return nil, fmt.Errorf("getting all data sources: %w", err)
}
dsMap := make(map[string]*datasources.DataSource, len(dsList))
for i := 0; i < len(dsList); i++ {
dsMap[dsList[i].UID] = dsList[i]
}
for i := 0; i < len(results); i++ {
r := results[i]
if r.Type == cloudmigration.DatasourceDataType &&
r.Error == "" { // any error returned by GMS takes priority
ds, ok := dsMap[r.RefID]
if !ok {
s.log.Error("data source with id %s was not found in data sources list", r.RefID)
continue
}
p, found := s.pluginStore.Plugin(ctx, ds.Type)
// if the plugin is not found, it means it was uninstalled, meaning it wasn't core
if !p.IsCorePlugin() || !found {
r.Status = cloudmigration.ItemStatusWarning
r.ErrorCode = cloudmigration.ErrOnlyCoreDataSources
r.Error = "Only core data sources are supported. Please ensure the plugin is installed on the cloud stack."
}
results[i] = r
}
}
return results, nil
}