The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
grafana/pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go

281 lines
9.6 KiB

package cloudmigrationimpl
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/cloudmigration/api"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/gcom"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
)
// Service is the implementation of cloudmigration.Service.
type Service struct {
// store persists and loads cloud migrations.
store store
// log is the service logger, created with LogPrefix.
log *log.ConcreteLogger
// cfg is the Grafana configuration; the CloudMigration section supplies timeouts and token settings.
cfg *setting.Cfg
// features holds the feature toggles handed to ProvideService.
features featuremgmt.FeatureToggles
// dsService is the data source service handed to ProvideService.
dsService datasources.DataSourceService
// gcomService is the client used for instance, access policy and token calls.
gcomService gcom.Service
// api is the value returned by api.RegisterApi when the service is constructed.
api *api.CloudMigrationAPI
// tracer starts the spans opened by the service methods.
tracer tracing.Tracer
// metrics holds the service's Prometheus metrics.
metrics *Metrics
}
// LogPrefix is the logger name passed to log.New when the service is constructed.
var LogPrefix = "cloudmigration.service"
const (
// cloudMigrationAccessPolicyName is used as both Name and DisplayName of the
// access policy that CreateToken looks up, deletes and recreates.
//nolint:gosec
cloudMigrationAccessPolicyName = "grafana-cloud-migrations"
// cloudMigrationTokenName is used as both Name and DisplayName of the token
// that CreateToken creates.
//nolint:gosec
cloudMigrationTokenName = "grafana-cloud-migrations"
)
// Compile-time assertion that *Service satisfies cloudmigration.Service.
var _ cloudmigration.Service = (*Service)(nil)
// ProvideService is the factory used by wire to inject dependencies.
//
// When the FlagOnPremToCloudMigrations feature toggle is not enabled globally
// it returns a NoopServiceImpl. Otherwise it builds the Service, registers the
// API routes on routeRegister and registers the service metrics on prom. A
// metrics registration failure is logged as a warning and does not prevent the
// service from being returned.
func ProvideService(
	cfg *setting.Cfg,
	features featuremgmt.FeatureToggles,
	db db.DB,
	dsService datasources.DataSourceService,
	routeRegister routing.RouteRegister,
	prom prometheus.Registerer,
	tracer tracing.Tracer,
) cloudmigration.Service {
	if enabled := features.IsEnabledGlobally(featuremgmt.FlagOnPremToCloudMigrations); !enabled {
		return &NoopServiceImpl{}
	}

	svc := &Service{
		store:     &sqlStore{db: db},
		log:       log.New(LogPrefix),
		cfg:       cfg,
		features:  features,
		dsService: dsService,
		tracer:    tracer,
	}
	svc.gcomService = gcom.New(gcom.Config{
		ApiURL: cfg.GrafanaComAPIURL,
		Token:  cfg.CloudMigration.GcomAPIToken,
	})
	svc.metrics = newMetrics()

	// The API needs the fully populated service, so it is wired last.
	svc.api = api.RegisterApi(routeRegister, svc, tracer)

	err := svc.registerMetrics(prom, svc.metrics)
	if err != nil {
		svc.log.Warn("error registering prom metrics", "error", err.Error())
	}

	return svc
}
// CreateToken creates a new cloud migration access token for the stack
// identified by s.cfg.StackID and returns it base64-encoded.
//
// Steps, each bounded by its own timeout taken from s.cfg.CloudMigration:
//  1. fetch the instance by stack ID to learn its region,
//  2. look up an access policy named cloudMigrationAccessPolicyName and delete
//     it when one exists,
//  3. create a new access policy with a "stack" realm for the stack ID and the
//     cloud-migrations read/write scopes,
//  4. create a token under that policy, expiring after
//     s.cfg.CloudMigration.TokenExpiresAfter,
//  5. JSON-encode the token together with the instance details and
//     base64-encode the result.
//
// If any step fails, an empty response and a wrapped error are returned.
func (s *Service) CreateToken(ctx context.Context) (cloudmigration.CreateAccessTokenResponse, error) {
	ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateToken")
	defer span.End()

	logger := s.log.FromContext(ctx)
	requestID := tracing.TraceIDFromContext(ctx, false)

	// Each defer captures the cancel value current at that point, so reusing
	// the timeoutCtx/cancel variables below still releases every context.
	timeoutCtx, cancel := context.WithTimeout(ctx, s.cfg.CloudMigration.FetchInstanceTimeout)
	defer cancel()
	instance, err := s.gcomService.GetInstanceByID(timeoutCtx, requestID, s.cfg.StackID)
	if err != nil {
		return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("fetching instance by id: id=%s %w", s.cfg.StackID, err)
	}

	timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.FetchAccessPolicyTimeout)
	defer cancel()
	existingAccessPolicy, err := s.findAccessPolicyByName(timeoutCtx, instance.RegionSlug, cloudMigrationAccessPolicyName)
	if err != nil {
		return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("fetching access policy by name: name=%s %w", cloudMigrationAccessPolicyName, err)
	}

	// Remove a policy with the same name before creating the new one.
	if existingAccessPolicy != nil {
		deleteCtx, cancelDelete := context.WithTimeout(ctx, s.cfg.CloudMigration.DeleteAccessPolicyTimeout)
		defer cancelDelete()
		if _, err := s.gcomService.DeleteAccessPolicy(deleteCtx, gcom.DeleteAccessPolicyParams{
			RequestID:      requestID,
			AccessPolicyID: existingAccessPolicy.ID,
			Region:         instance.RegionSlug,
		}); err != nil {
			return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("deleting access policy: id=%s region=%s %w", existingAccessPolicy.ID, instance.RegionSlug, err)
		}
		// The "id" key keeps the key/value pairs aligned, matching the
		// "created access policy" log line below.
		logger.Info("deleted access policy", "id", existingAccessPolicy.ID, "name", existingAccessPolicy.Name)
	}

	timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.CreateAccessPolicyTimeout)
	defer cancel()
	accessPolicy, err := s.gcomService.CreateAccessPolicy(timeoutCtx,
		gcom.CreateAccessPolicyParams{
			RequestID: requestID,
			Region:    instance.RegionSlug,
		},
		gcom.CreateAccessPolicyPayload{
			Name:        cloudMigrationAccessPolicyName,
			DisplayName: cloudMigrationAccessPolicyName,
			Realms:      []gcom.Realm{{Type: "stack", Identifier: s.cfg.StackID, LabelPolicies: []gcom.LabelPolicy{}}},
			Scopes:      []string{"cloud-migrations:read", "cloud-migrations:write"},
		})
	if err != nil {
		return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("creating access policy: %w", err)
	}
	logger.Info("created access policy", "id", accessPolicy.ID, "name", accessPolicy.Name)

	timeoutCtx, cancel = context.WithTimeout(ctx, s.cfg.CloudMigration.CreateTokenTimeout)
	defer cancel()
	token, err := s.gcomService.CreateToken(timeoutCtx,
		gcom.CreateTokenParams{RequestID: requestID, Region: instance.RegionSlug},
		gcom.CreateTokenPayload{
			AccessPolicyID: accessPolicy.ID,
			DisplayName:    cloudMigrationTokenName,
			Name:           cloudMigrationTokenName,
			ExpiresAt:      time.Now().Add(s.cfg.CloudMigration.TokenExpiresAfter),
		})
	if err != nil {
		return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("creating access token: %w", err)
	}
	logger.Info("created access token", "id", token.ID, "name", token.Name)
	s.metrics.accessTokenCreated.With(prometheus.Labels{"slug": s.cfg.Slug}).Inc()

	bytes, err := json.Marshal(cloudmigration.Base64EncodedTokenPayload{
		Token: token.Token,
		Instance: cloudmigration.Base64HGInstance{
			StackID:     instance.ID,
			RegionSlug:  instance.RegionSlug,
			ClusterSlug: instance.ClusterSlug, // This should be used for routing to CMS
			Slug:        instance.Slug,
		},
	})
	if err != nil {
		return cloudmigration.CreateAccessTokenResponse{}, fmt.Errorf("encoding token: %w", err)
	}

	return cloudmigration.CreateAccessTokenResponse{Token: base64.StdEncoding.EncodeToString(bytes)}, nil
}
// findAccessPolicyByName lists access policies for regionSlug, passing
// accessPolicyName as the Name parameter, and returns a pointer to a copy of
// the first listed policy whose Name equals accessPolicyName.
//
// It returns (nil, nil) when no listed policy matches, and a wrapped error
// when the list call fails.
func (s *Service) findAccessPolicyByName(ctx context.Context, regionSlug, accessPolicyName string) (*gcom.AccessPolicy, error) {
	ctx, span := s.tracer.Start(ctx, "CloudMigrationService.findAccessPolicyByName")
	defer span.End()

	params := gcom.ListAccessPoliciesParams{
		RequestID: tracing.TraceIDFromContext(ctx, false),
		Region:    regionSlug,
		Name:      accessPolicyName,
	}
	policies, err := s.gcomService.ListAccessPolicies(ctx, params)
	if err != nil {
		return nil, fmt.Errorf("listing access policies: name=%s region=%s :%w", accessPolicyName, regionSlug, err)
	}

	for i := range policies {
		if policies[i].Name != accessPolicyName {
			continue
		}
		// Hand back a copy so the result does not point into the listed slice.
		match := policies[i]
		return &match, nil
	}

	return nil, nil
}
// ValidateToken is not implemented yet: it ignores ctx and token and always
// returns nil.
func (s *Service) ValidateToken(ctx context.Context, token string) error {
// TODO: Implement method
return nil
}
// SaveEncryptedToken is not implemented yet: it ignores ctx and token and
// always returns nil.
func (s *Service) SaveEncryptedToken(ctx context.Context, token string) error {
// TODO: Implement method
return nil
}
// GetMigration is not implemented yet: it ignores ctx and id and always
// returns (nil, nil), so callers must be prepared for a nil response without
// an error.
func (s *Service) GetMigration(ctx context.Context, id int64) (*cloudmigration.CloudMigrationResponse, error) {
// commenting to fix linter, uncomment when this function is implemented
// ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetMigration")
// defer span.End()
return nil, nil
}
// GetMigrationList loads every cloud migration from the store and converts
// each one into a CloudMigrationResponse carrying its ID, Stack, Created and
// Updated values.
//
// A store error is returned as-is. When the store holds no migrations the
// response contains an empty, non-nil Migrations slice.
func (s *Service) GetMigrationList(ctx context.Context) (*cloudmigration.CloudMigrationListResponse, error) {
	stored, err := s.store.GetAllCloudMigrations(ctx)
	if err != nil {
		return nil, err
	}

	resp := &cloudmigration.CloudMigrationListResponse{
		Migrations: make([]cloudmigration.CloudMigrationResponse, 0),
	}
	for _, m := range stored {
		item := cloudmigration.CloudMigrationResponse{
			ID:      m.ID,
			Stack:   m.Stack,
			Created: m.Created,
			Updated: m.Updated,
		}
		resp.Migrations = append(resp.Migrations, item)
	}

	return resp, nil
}
// CreateMigration decodes the base64 auth token in cmd, converts it into a
// migration and stores it.
//
// cmd.AuthToken must be standard base64 wrapping a JSON-encoded
// Base64EncodedTokenPayload. Decode and unmarshal failures are reported with
// fixed messages that deliberately omit the underlying error; a store failure
// is returned wrapped.
//
// In the returned response, ID is the token's instance StackID and Stack is
// its Slug. Created and Updated are placeholders that share one timestamp
// taken after the migration is stored.
func (s *Service) CreateMigration(ctx context.Context, cmd cloudmigration.CloudMigrationRequest) (*cloudmigration.CloudMigrationResponse, error) {
	ctx, span := s.tracer.Start(ctx, "CloudMigrationService.createMigration")
	defer span.End()

	b, err := base64.StdEncoding.DecodeString(cmd.AuthToken)
	if err != nil {
		return nil, fmt.Errorf("token could not be decoded")
	}

	var token cloudmigration.Base64EncodedTokenPayload
	if err := json.Unmarshal(b, &token); err != nil {
		return nil, fmt.Errorf("invalid token") // don't want to leak info here
	}

	migration := token.ToMigration()
	if err := s.store.CreateMigration(ctx, migration); err != nil {
		return nil, fmt.Errorf("error creating migration: %w", err)
	}

	// Use a single timestamp so a freshly created migration never reports
	// Updated later than Created.
	now := time.Now()

	return &cloudmigration.CloudMigrationResponse{
		ID:    int64(token.Instance.StackID),
		Stack: token.Instance.Slug,
		// TODO replace this with the actual value once the storage piece is implemented
		Created: now,
		Updated: now,
	}, nil
}
// UpdateMigration is not implemented yet: it ignores ctx, id and cm and always
// returns (nil, nil).
func (s *Service) UpdateMigration(ctx context.Context, id int64, cm cloudmigration.CloudMigrationRequest) (*cloudmigration.CloudMigrationResponse, error) {
// TODO: Implement method
return nil, nil
}
// RunMigration is not implemented yet: it ignores ctx and uid and always
// returns (nil, nil).
func (s *Service) RunMigration(ctx context.Context, uid string) (*cloudmigration.CloudMigrationRun, error) {
// TODO: Implement method
return nil, nil
}
// GetMigrationStatus is not implemented yet: it ignores ctx, id and runID and
// always returns (nil, nil).
func (s *Service) GetMigrationStatus(ctx context.Context, id string, runID string) (*cloudmigration.CloudMigrationRun, error) {
// TODO: Implement method
return nil, nil
}
// GetMigrationStatusList is not implemented yet: it ignores ctx and id and
// always returns a nil slice and a nil error.
func (s *Service) GetMigrationStatusList(ctx context.Context, id string) ([]cloudmigration.CloudMigrationRun, error) {
// TODO: Implement method
return nil, nil
}
// DeleteMigration is not implemented yet: it ignores ctx and id and always
// returns nil without deleting anything.
func (s *Service) DeleteMigration(ctx context.Context, id string) error {
// TODO: Implement method
return nil
}
// func (s *Service) MigrateDatasources(ctx context.Context, request *cloudmigration.MigrateDatasourcesRequest) (*cloudmigration.MigrateDatasourcesResponse, error) {
// return s.store.MigrateDatasources(ctx, request)
// }