CloudMigrations: Fix traceability & HTTP Client initialisation (#94141)

* Add traceability to Migration Assistant feature

* Fix some compilation errors

* Fix lint issues

* Use async context

* Add trace for LibraryElements
pull/94308/head
Roberto Jiménez Sánchez 9 months ago committed by GitHub
parent 9680722b78
commit 19c77eaae1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 74
      pkg/services/cloudmigration/api/api.go
  2. 93
      pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration.go
  3. 2
      pkg/services/cloudmigration/cloudmigrationimpl/cloudmigration_test.go
  4. 51
      pkg/services/cloudmigration/cloudmigrationimpl/snapshot_mgmt.go
  5. 62
      pkg/services/cloudmigration/gmsclient/gms_client.go
  6. 9
      pkg/services/cloudmigration/gmsclient/gms_client_test.go
  7. 23
      pkg/services/cloudmigration/objectstorage/s3.go
  8. 5
      pkg/services/gcom/gcom.go

@ -13,6 +13,8 @@ import (
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/util"
"github.com/grafana/grafana/pkg/web"
"go.opentelemetry.io/otel/codes"
)
type CloudMigrationAPI struct {
@ -78,6 +80,9 @@ func (cma *CloudMigrationAPI) GetToken(c *contextmodel.ReqContext) response.Resp
token, err := cma.cloudMigrationService.GetToken(ctx)
if err != nil {
span.SetStatus(codes.Error, "fetching cloud migration access token")
span.RecordError(err)
if !errors.Is(err, cloudmigration.ErrTokenNotFound) {
logger.Error("fetching cloud migration access token", "err", err.Error())
}
@ -112,7 +117,10 @@ func (cma *CloudMigrationAPI) CreateToken(c *contextmodel.ReqContext) response.R
resp, err := cma.cloudMigrationService.CreateToken(ctx)
if err != nil {
span.SetStatus(codes.Error, "creating gcom access token")
span.RecordError(err)
logger.Error("creating gcom access token", "err", err.Error())
return response.ErrOrFallback(http.StatusInternalServerError, "creating gcom access token", err)
}
@ -137,11 +145,17 @@ func (cma *CloudMigrationAPI) DeleteToken(c *contextmodel.ReqContext) response.R
uid := web.Params(c.Req)[":uid"]
if err := util.ValidateUID(uid); err != nil {
span.SetStatus(codes.Error, "invalid migration uid")
span.RecordError(err)
return response.Error(http.StatusBadRequest, "invalid migration uid", err)
}
if err := cma.cloudMigrationService.DeleteToken(ctx, uid); err != nil {
span.SetStatus(codes.Error, "deleting cloud migration token")
span.RecordError(err)
logger.Error("deleting cloud migration token", "err", err.Error())
return response.ErrOrFallback(http.StatusInternalServerError, "deleting cloud migration token", err)
}
@ -163,6 +177,9 @@ func (cma *CloudMigrationAPI) GetSessionList(c *contextmodel.ReqContext) respons
sl, err := cma.cloudMigrationService.GetSessionList(ctx)
if err != nil {
span.SetStatus(codes.Error, "session list error")
span.RecordError(err)
return response.ErrOrFallback(http.StatusInternalServerError, "session list error", err)
}
@ -185,11 +202,17 @@ func (cma *CloudMigrationAPI) GetSession(c *contextmodel.ReqContext) response.Re
uid := web.Params(c.Req)[":uid"]
if err := util.ValidateUID(uid); err != nil {
span.SetStatus(codes.Error, "invalid session uid")
span.RecordError(err)
return response.Error(http.StatusBadRequest, "invalid session uid", err)
}
s, err := cma.cloudMigrationService.GetSession(ctx, uid)
if err != nil {
span.SetStatus(codes.Error, "session not found")
span.RecordError(err)
return response.ErrOrFallback(http.StatusNotFound, "session not found", err)
}
@ -217,12 +240,18 @@ func (cma *CloudMigrationAPI) CreateSession(c *contextmodel.ReqContext) response
cmd := CloudMigrationSessionRequestDTO{}
if err := web.Bind(c.Req, &cmd); err != nil {
span.SetStatus(codes.Error, "bad request data")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "bad request data", err)
}
s, err := cma.cloudMigrationService.CreateSession(ctx, cloudmigration.CloudMigrationSessionRequest{
AuthToken: cmd.AuthToken,
})
if err != nil {
span.SetStatus(codes.Error, "session creation error")
span.RecordError(err)
return response.ErrOrFallback(http.StatusInternalServerError, "session creation error", err)
}
@ -250,11 +279,17 @@ func (cma *CloudMigrationAPI) DeleteSession(c *contextmodel.ReqContext) response
uid := web.Params(c.Req)[":uid"]
if err := util.ValidateUID(uid); err != nil {
span.SetStatus(codes.Error, "invalid session uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err)
}
_, err := cma.cloudMigrationService.DeleteSession(ctx, uid)
if err != nil {
span.SetStatus(codes.Error, "session delete error")
span.RecordError(err)
return response.ErrOrFallback(http.StatusInternalServerError, "session delete error", err)
}
return response.Empty(http.StatusOK)
@ -278,11 +313,17 @@ func (cma *CloudMigrationAPI) CreateSnapshot(c *contextmodel.ReqContext) respons
uid := web.Params(c.Req)[":uid"]
if err := util.ValidateUID(uid); err != nil {
span.SetStatus(codes.Error, "invalid session uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err)
}
ss, err := cma.cloudMigrationService.CreateSnapshot(ctx, c.SignedInUser, uid)
if err != nil {
span.SetStatus(codes.Error, "error creating snapshot")
span.RecordError(err)
return response.ErrOrFallback(http.StatusInternalServerError, "error creating snapshot", err)
}
@ -307,9 +348,15 @@ func (cma *CloudMigrationAPI) GetSnapshot(c *contextmodel.ReqContext) response.R
sessUid, snapshotUid := web.Params(c.Req)[":uid"], web.Params(c.Req)[":snapshotUid"]
if err := util.ValidateUID(sessUid); err != nil {
span.SetStatus(codes.Error, "invalid session uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err)
}
if err := util.ValidateUID(snapshotUid); err != nil {
span.SetStatus(codes.Error, "invalid snapshot uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid snapshot uid", err)
}
@ -327,6 +374,9 @@ func (cma *CloudMigrationAPI) GetSnapshot(c *contextmodel.ReqContext) response.R
}
snapshot, err := cma.cloudMigrationService.GetSnapshot(ctx, q)
if err != nil {
span.SetStatus(codes.Error, "error retrieving snapshot")
span.RecordError(err)
return response.ErrOrFallback(http.StatusInternalServerError, "error retrieving snapshot", err)
}
@ -386,6 +436,9 @@ func (cma *CloudMigrationAPI) GetSnapshotList(c *contextmodel.ReqContext) respon
uid := web.Params(c.Req)[":uid"]
if err := util.ValidateUID(uid); err != nil {
span.SetStatus(codes.Error, "invalid session uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err)
}
q := cloudmigration.ListSnapshotsQuery{
@ -403,6 +456,9 @@ func (cma *CloudMigrationAPI) GetSnapshotList(c *contextmodel.ReqContext) respon
snapshotList, err := cma.cloudMigrationService.GetSnapshotList(ctx, q)
if err != nil {
span.SetStatus(codes.Error, "error retrieving snapshot list")
span.RecordError(err)
return response.ErrOrFallback(http.StatusInternalServerError, "error retrieving snapshot list", err)
}
@ -438,13 +494,22 @@ func (cma *CloudMigrationAPI) UploadSnapshot(c *contextmodel.ReqContext) respons
sessUid, snapshotUid := web.Params(c.Req)[":uid"], web.Params(c.Req)[":snapshotUid"]
if err := util.ValidateUID(sessUid); err != nil {
span.SetStatus(codes.Error, "invalid session uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err)
}
if err := util.ValidateUID(snapshotUid); err != nil {
span.SetStatus(codes.Error, "invalid snapshot uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid snapshot uid", err)
}
if err := cma.cloudMigrationService.UploadSnapshot(ctx, sessUid, snapshotUid); err != nil {
span.SetStatus(codes.Error, "error uploading snapshot")
span.RecordError(err)
return response.ErrOrFallback(http.StatusInternalServerError, "error uploading snapshot", err)
}
@ -468,13 +533,22 @@ func (cma *CloudMigrationAPI) CancelSnapshot(c *contextmodel.ReqContext) respons
sessUid, snapshotUid := web.Params(c.Req)[":uid"], web.Params(c.Req)[":snapshotUid"]
if err := util.ValidateUID(sessUid); err != nil {
span.SetStatus(codes.Error, "invalid session uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid session uid", err)
}
if err := util.ValidateUID(snapshotUid); err != nil {
span.SetStatus(codes.Error, "invalid snapshot uid")
span.RecordError(err)
return response.ErrOrFallback(http.StatusBadRequest, "invalid snapshot uid", err)
}
if err := cma.cloudMigrationService.CancelSnapshot(ctx, sessUid, snapshotUid); err != nil {
span.SetStatus(codes.Error, "error canceling snapshot")
span.RecordError(err)
return response.ErrOrFallback(http.StatusInternalServerError, "error canceling snapshot", err)
}

@ -12,6 +12,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
@ -35,6 +36,7 @@ import (
"github.com/grafana/grafana/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
@ -83,6 +85,7 @@ var _ cloudmigration.Service = (*Service)(nil)
// builds the service, and api, and configures routes
func ProvideService(
cfg *setting.Cfg,
httpClientProvider *httpclient.Provider,
features featuremgmt.FeatureToggles,
db db.DB,
dsService datasources.DataSourceService,
@ -118,15 +121,29 @@ func ProvideService(
}
s.api = api.RegisterApi(routeRegister, s, tracer)
s.objectStorage = objectstorage.NewS3()
httpClientS3, err := httpClientProvider.New()
if err != nil {
return nil, fmt.Errorf("creating http client for S3: %w", err)
}
s.objectStorage = objectstorage.NewS3(httpClientS3, tracer)
if !cfg.CloudMigration.IsDeveloperMode {
c, err := gmsclient.NewGMSClient(cfg)
httpClientGMS, err := httpClientProvider.New()
if err != nil {
return nil, fmt.Errorf("creating http client for GMS: %w", err)
}
c, err := gmsclient.NewGMSClient(cfg, httpClientGMS)
if err != nil {
return nil, fmt.Errorf("initializing GMS client: %w", err)
}
s.gmsClient = c
s.gcomService = gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken})
httpClientGcom, err := httpClientProvider.New()
if err != nil {
return nil, fmt.Errorf("creating http client for GCOM: %w", err)
}
s.gcomService = gcom.New(gcom.Config{ApiURL: cfg.GrafanaComAPIURL, Token: cfg.CloudMigration.GcomAPIToken}, httpClientGcom)
} else {
s.gmsClient = gmsclient.NewInMemoryClient()
s.gcomService = &gcomStub{policies: map[string]gcom.AccessPolicy{}, token: nil}
@ -169,7 +186,8 @@ func (s *Service) GetToken(ctx context.Context) (gcom.TokenView, error) {
RequestID: requestID,
Region: instance.RegionSlug,
AccessPolicyName: accessPolicyName,
TokenName: accessTokenName})
TokenName: accessTokenName,
})
if err != nil {
return gcom.TokenView{}, fmt.Errorf("listing tokens: %w", err)
}
@ -279,9 +297,6 @@ func (s *Service) CreateToken(ctx context.Context) (cloudmigration.CreateAccessT
}
func (s *Service) findAccessPolicyByName(ctx context.Context, regionSlug, accessPolicyName string) (*gcom.AccessPolicy, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.findAccessPolicyByName")
defer span.End()
accessPolicies, err := s.gcomService.ListAccessPolicies(ctx, gcom.ListAccessPoliciesParams{
RequestID: tracing.TraceIDFromContext(ctx, false),
Region: regionSlug,
@ -341,7 +356,7 @@ func (s *Service) DeleteToken(ctx context.Context, tokenID string) error {
}
func (s *Service) GetSession(ctx context.Context, uid string) (*cloudmigration.CloudMigrationSession, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetMigration")
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSession")
defer span.End()
migration, err := s.store.GetMigrationSessionByUID(ctx, uid)
if err != nil {
@ -352,6 +367,9 @@ func (s *Service) GetSession(ctx context.Context, uid string) (*cloudmigration.C
}
func (s *Service) GetSessionList(ctx context.Context) (*cloudmigration.CloudMigrationSessionListResponse, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetSessionList")
defer span.End()
values, err := s.store.GetCloudMigrationSessionList(ctx)
if err != nil {
return nil, fmt.Errorf("retrieving session list from store: %w", err)
@ -370,7 +388,7 @@ func (s *Service) GetSessionList(ctx context.Context) (*cloudmigration.CloudMigr
}
func (s *Service) CreateSession(ctx context.Context, cmd cloudmigration.CloudMigrationSessionRequest) (*cloudmigration.CloudMigrationSessionResponse, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.createMigration")
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CreateSession")
defer span.End()
base64Token := cmd.AuthToken
@ -405,6 +423,9 @@ func (s *Service) CreateSession(ctx context.Context, cmd cloudmigration.CloudMig
}
func (s *Service) DeleteSession(ctx context.Context, sessionUID string) (*cloudmigration.CloudMigrationSession, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.DeleteSession")
defer span.End()
session, snapshots, err := s.store.DeleteMigrationSessionByUID(ctx, sessionUID)
if err != nil {
s.report(ctx, session, gmsclient.EventDisconnect, 0, err)
@ -470,26 +491,36 @@ func (s *Service) CreateSnapshot(ctx context.Context, signedInUser *user.SignedI
s.cancelMutex.Unlock()
}()
ctx, cancelFunc := context.WithCancel(context.Background())
// Create a new context from the span context to ensure the trace is propagated
asyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext())
asyncCtx, asyncSpan := s.tracer.Start(asyncCtx, "CloudMigrationService.CreateSnapshotAsync")
defer asyncSpan.End()
asyncCtx, cancelFunc := context.WithCancel(asyncCtx)
s.cancelFunc = cancelFunc
s.report(ctx, session, gmsclient.EventStartBuildingSnapshot, 0, nil)
s.report(asyncCtx, session, gmsclient.EventStartBuildingSnapshot, 0, nil)
start := time.Now()
err := s.buildSnapshot(ctx, signedInUser, initResp.MaxItemsPerPartition, initResp.Metadata, snapshot)
err := s.buildSnapshot(asyncCtx, signedInUser, initResp.MaxItemsPerPartition, initResp.Metadata, snapshot)
if err != nil {
asyncSpan.SetStatus(codes.Error, "error building snapshot")
asyncSpan.RecordError(err)
s.log.Error("building snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
if err := s.updateSnapshotWithRetries(asyncCtx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusError,
}); err != nil {
s.log.Error("critical failure during snapshot creation - please report any error logs")
asyncSpan.RecordError(err)
}
}
s.report(ctx, session, gmsclient.EventDoneBuildingSnapshot, time.Since(start), err)
span.SetStatus(codes.Ok, "snapshot built")
s.report(asyncCtx, session, gmsclient.EventDoneBuildingSnapshot, time.Since(start), err)
}()
return &snapshot, nil
@ -624,32 +655,48 @@ func (s *Service) UploadSnapshot(ctx context.Context, sessionUid string, snapsho
s.cancelMutex.Unlock()
}()
ctx, cancelFunc := context.WithCancel(context.Background())
s.cancelFunc = cancelFunc
// Create a new context from the span context to ensure the trace is propagated
asyncCtx := trace.ContextWithSpanContext(context.Background(), span.SpanContext())
asyncCtx, asyncSpan := s.tracer.Start(asyncCtx, "CloudMigrationService.UploadSnapshot")
defer asyncSpan.End()
s.report(ctx, session, gmsclient.EventStartUploadingSnapshot, 0, nil)
asyncCtx, s.cancelFunc = context.WithCancel(asyncCtx)
s.report(asyncCtx, session, gmsclient.EventStartUploadingSnapshot, 0, nil)
start := time.Now()
err := s.uploadSnapshot(ctx, session, snapshot, uploadUrl)
err := s.uploadSnapshot(asyncCtx, session, snapshot, uploadUrl)
if err != nil {
asyncSpan.SetStatus(codes.Error, "error uploading snapshot")
asyncSpan.RecordError(err)
s.log.Error("uploading snapshot", "err", err.Error())
// Update status to error with retries
if err := s.updateSnapshotWithRetries(context.Background(), cloudmigration.UpdateSnapshotCmd{
if err := s.updateSnapshotWithRetries(asyncCtx, cloudmigration.UpdateSnapshotCmd{
UID: snapshot.UID,
SessionID: sessionUid,
Status: cloudmigration.SnapshotStatusError,
}); err != nil {
asyncSpan.RecordError(err)
s.log.Error("critical failure during snapshot upload - please report any error logs")
}
}
s.report(ctx, session, gmsclient.EventDoneUploadingSnapshot, time.Since(start), err)
s.report(asyncCtx, session, gmsclient.EventDoneUploadingSnapshot, time.Since(start), err)
}()
return nil
}
func (s *Service) CancelSnapshot(ctx context.Context, sessionUid string, snapshotUid string) (err error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.CancelSnapshot",
trace.WithAttributes(
attribute.String("sessionUid", sessionUid),
attribute.String("snapshotUid", snapshotUid),
),
)
defer span.End()
// The cancel func itself is protected by a mutex in the async threads, so it may or may not be set by the time CancelSnapshot is called
// Attempt to cancel and recover from the panic if the cancel function is nil
defer func() {
@ -684,6 +731,9 @@ func (s *Service) report(
d time.Duration,
evtErr error,
) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.report")
defer span.End()
id, err := s.getLocalEventId(ctx)
if err != nil {
s.log.Error("failed to report event", "type", t, "error", err.Error())
@ -738,6 +788,9 @@ func (s *Service) getLocalEventId(ctx context.Context) (string, error) {
}
func (s *Service) deleteLocalFiles(snapshots []cloudmigration.CloudMigrationSnapshot) error {
_, span := s.tracer.Start(context.Background(), "CloudMigrationService.deleteLocalFiles")
defer span.End()
var err error
for _, snapshot := range snapshots {
err = os.RemoveAll(snapshot.LocalDir)

@ -8,6 +8,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/db"
@ -644,6 +645,7 @@ func setUpServiceTest(t *testing.T, withDashboardMock bool) cloudmigration.Servi
s, err := ProvideService(
cfg,
httpclient.NewProvider(),
featuremgmt.WithFeatures(
featuremgmt.FlagOnPremToCloudMigrations,
featuremgmt.FlagDashboardRestore),

@ -23,9 +23,14 @@ import (
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/util/retryer"
"golang.org/x/crypto/nacl/box"
"go.opentelemetry.io/otel/codes"
)
func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.SignedInUser) (*cloudmigration.MigrateDataRequest, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getMigrationDataJSON")
defer span.End()
// Data sources
dataSources, err := s.getDataSourceCommands(ctx)
if err != nil {
@ -103,6 +108,9 @@ func (s *Service) getMigrationDataJSON(ctx context.Context, signedInUser *user.S
}
func (s *Service) getDataSourceCommands(ctx context.Context) ([]datasources.AddDataSourceCommand, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getDataSourceCommands")
defer span.End()
dataSources, err := s.dsService.GetAllDataSources(ctx, &datasources.GetAllDataSourcesQuery{})
if err != nil {
s.log.Error("Failed to get all datasources", "err", err)
@ -141,6 +149,9 @@ func (s *Service) getDataSourceCommands(ctx context.Context) ([]datasources.AddD
// getDashboardAndFolderCommands returns the json payloads required by the dashboard and folder creation APIs
func (s *Service) getDashboardAndFolderCommands(ctx context.Context, signedInUser *user.SignedInUser) ([]dashboards.Dashboard, []folder.CreateFolderCommand, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getDashboardAndFolderCommands")
defer span.End()
dashs, err := s.dashboardService.GetAllDashboards(ctx)
if err != nil {
return nil, nil, err
@ -196,6 +207,9 @@ type libraryElement struct {
// getLibraryElementsCommands returns the json payloads required by the library elements creation API
func (s *Service) getLibraryElementsCommands(ctx context.Context, signedInUser *user.SignedInUser) ([]libraryElement, error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.getLibraryElementsCommands")
defer span.End()
const perPage = 100
cmds := make([]libraryElement, 0)
@ -242,6 +256,9 @@ func (s *Service) getLibraryElementsCommands(ctx context.Context, signedInUser *
// asynchronous process for writing the snapshot to the filesystem and updating the snapshot status
func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedInUser, maxItemsPerPartition uint32, metadata []byte, snapshotMeta cloudmigration.CloudMigrationSnapshot) error {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.buildSnapshot")
defer span.End()
// TODO -- make sure we can only build one snapshot at a time
s.buildSnapshotMutex.Lock()
defer s.buildSnapshotMutex.Unlock()
@ -339,6 +356,9 @@ func (s *Service) buildSnapshot(ctx context.Context, signedInUser *user.SignedIn
// asynchronous process for uploading the snapshot and updating the snapshot status
func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.CloudMigrationSession, snapshotMeta *cloudmigration.CloudMigrationSnapshot, uploadUrl string) (err error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot")
defer span.End()
// TODO -- make sure we can only upload one snapshot at a time
s.buildSnapshotMutex.Lock()
defer s.buildSnapshotMutex.Unlock()
@ -361,37 +381,61 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
}
}()
_, readIndexSpan := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.readIndex")
index, err := snapshot.ReadIndex(indexFile)
if err != nil {
readIndexSpan.SetStatus(codes.Error, "reading index from file")
readIndexSpan.RecordError(err)
readIndexSpan.End()
return fmt.Errorf("reading index from file: %w", err)
}
readIndexSpan.End()
s.log.Debug(fmt.Sprintf("uploadSnapshot: read index file in %d ms", time.Since(start).Milliseconds()))
uploadCtx, uploadSpan := s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.uploadDataFiles")
// Upload the data files.
for _, fileNames := range index.Items {
for _, fileName := range fileNames {
filePath := filepath.Join(snapshotMeta.LocalDir, fileName)
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, fileName)
if err := s.uploadUsingPresignedURL(ctx, uploadUrl, key, filePath); err != nil {
if err := s.uploadUsingPresignedURL(uploadCtx, uploadUrl, key, filePath); err != nil {
uploadSpan.SetStatus(codes.Error, "uploading snapshot data file using presigned url")
uploadSpan.RecordError(err)
uploadSpan.End()
return fmt.Errorf("uploading snapshot file using presigned url: %w", err)
}
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded %s in %d ms", fileName, time.Since(start).Milliseconds()))
}
}
uploadSpan.End()
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded all data files in %d ms", time.Since(start).Milliseconds()))
uploadCtx, uploadSpan = s.tracer.Start(ctx, "CloudMigrationService.uploadSnapshot.uploadIndex")
// Upload the index file. Must be done after uploading the data files.
key := fmt.Sprintf("%d/snapshots/%s/%s", session.StackID, snapshotMeta.GMSSnapshotUID, "index.json")
if _, err := indexFile.Seek(0, 0); err != nil {
uploadSpan.SetStatus(codes.Error, "seeking to beginning of index file")
uploadSpan.RecordError(err)
uploadSpan.End()
return fmt.Errorf("seeking to beginning of index file: %w", err)
}
if err := s.objectStorage.PresignedURLUpload(ctx, uploadUrl, key, indexFile); err != nil {
if err := s.objectStorage.PresignedURLUpload(uploadCtx, uploadUrl, key, indexFile); err != nil {
uploadSpan.SetStatus(codes.Error, "uploading index file using presigned url")
uploadSpan.RecordError(err)
uploadSpan.End()
return fmt.Errorf("uploading file using presigned url: %w", err)
}
uploadSpan.End()
s.log.Debug(fmt.Sprintf("uploadSnapshot: uploaded index file in %d ms", time.Since(start).Milliseconds()))
s.log.Info("successfully uploaded snapshot", "snapshotUid", snapshotMeta.UID, "cloud_snapshotUid", snapshotMeta.GMSSnapshotUID)
@ -408,6 +452,9 @@ func (s *Service) uploadSnapshot(ctx context.Context, session *cloudmigration.Cl
}
func (s *Service) uploadUsingPresignedURL(ctx context.Context, uploadURL, key string, filePath string) (err error) {
ctx, span := s.tracer.Start(ctx, "CloudMigrationService.uploadUsingPresignedURL")
defer span.End()
// The directory that contains the file can be set in the configuration, therefore the directory can be any directory.
// nolint:gosec
file, err := os.Open(filePath)

@ -18,19 +18,21 @@ import (
)
// NewGMSClient returns an implementation of Client that queries GrafanaMigrationService
func NewGMSClient(cfg *setting.Cfg) (Client, error) {
func NewGMSClient(cfg *setting.Cfg, httpClient *http.Client) (Client, error) {
if cfg.CloudMigration.GMSDomain == "" {
return nil, fmt.Errorf("missing GMS domain")
}
return &gmsClientImpl{
cfg: cfg,
log: log.New(logPrefix),
cfg: cfg,
log: log.New(logPrefix),
httpClient: httpClient,
}, nil
}
type gmsClientImpl struct {
cfg *setting.Cfg
log *log.ConcreteLogger
cfg *setting.Cfg
log *log.ConcreteLogger
httpClient *http.Client
getStatusMux sync.Mutex
getStatusLastQueried time.Time
@ -40,8 +42,11 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud
// TODO: there is a lot of boilerplate code in these methods, we should consolidate them when we have a gardening period
path := fmt.Sprintf("%s/api/v1/validate-key", c.buildBasePath(cm.ClusterSlug))
ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSValidateKeyTimeout)
defer cancel()
// validation is an empty POST to GMS with the authorization header included
req, err := http.NewRequest("POST", path, bytes.NewReader(nil))
req, err := http.NewRequestWithContext(ctx, "POST", path, bytes.NewReader(nil))
if err != nil {
c.log.Error("error creating http request for token validation", "err", err.Error())
return fmt.Errorf("http request error: %w", err)
@ -49,10 +54,7 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", cm.StackID, cm.AuthToken))
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSValidateKeyTimeout,
}
resp, err := client.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
c.log.Error("error sending http request for token validation", "err", err.Error())
return fmt.Errorf("http request error: %w", err)
@ -74,8 +76,11 @@ func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.Cloud
func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigration.CloudMigrationSession) (out *cloudmigration.StartSnapshotResponse, err error) {
path := fmt.Sprintf("%s/api/v1/start-snapshot", c.buildBasePath(session.ClusterSlug))
ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSStartSnapshotTimeout)
defer cancel()
// Send the request to gms with the associated auth token
req, err := http.NewRequest(http.MethodPost, path, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, nil)
if err != nil {
c.log.Error("error creating http request to start snapshot", "err", err.Error())
return nil, fmt.Errorf("http request error: %w", err)
@ -83,10 +88,7 @@ func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigratio
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSStartSnapshotTimeout,
}
resp, err := client.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
c.log.Error("error sending http request to start snapshot", "err", err.Error())
return nil, fmt.Errorf("http request error: %w", err)
@ -119,8 +121,11 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr
path := fmt.Sprintf("%s/api/v1/snapshots/%s/status?offset=%d", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID, offset)
ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSGetSnapshotStatusTimeout)
defer cancel()
// Send the request to gms with the associated auth token
req, err := http.NewRequest(http.MethodGet, path, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil)
if err != nil {
c.log.Error("error creating http request to get snapshot status", "err", err.Error())
return nil, fmt.Errorf("http request error: %w", err)
@ -128,11 +133,8 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSGetSnapshotStatusTimeout,
}
c.getStatusLastQueried = time.Now()
resp, err := client.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
c.log.Error("error sending http request to get snapshot status", "err", err.Error())
return nil, fmt.Errorf("http request error: %w", err)
@ -163,8 +165,11 @@ func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigr
func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) {
path := fmt.Sprintf("%s/api/v1/snapshots/%s/create-upload-url", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID)
ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSCreateUploadUrlTimeout)
defer cancel()
// Send the request to gms with the associated auth token
req, err := http.NewRequest(http.MethodPost, path, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, nil)
if err != nil {
c.log.Error("error creating http request to create upload url", "err", err.Error())
return "", fmt.Errorf("http request error: %w", err)
@ -172,10 +177,7 @@ func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cl
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSCreateUploadUrlTimeout,
}
resp, err := client.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
c.log.Error("error sending http request to create an upload url", "err", err.Error())
return "", fmt.Errorf("http request error: %w", err)
@ -208,6 +210,9 @@ func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration.
return
}
ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSReportEventTimeout)
defer cancel()
path := fmt.Sprintf("%s/api/v1/events", c.buildBasePath(session.ClusterSlug))
var buf bytes.Buffer
@ -216,7 +221,7 @@ func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration.
return
}
// Send the request to gms with the associated auth token
req, err := http.NewRequest(http.MethodPost, path, &buf)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, &buf)
if err != nil {
c.log.Error("error creating http request to report event", "err", err.Error())
return
@ -224,10 +229,7 @@ func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration.
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))
client := &http.Client{
Timeout: c.cfg.CloudMigration.GMSReportEventTimeout,
}
resp, err := client.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
c.log.Error("error sending http request for report event", "err", err.Error())
return

@ -1,6 +1,7 @@
package gmsclient
import (
"net/http"
"testing"
"github.com/grafana/grafana/pkg/setting"
@ -16,7 +17,9 @@ func Test_buildBasePath(t *testing.T) {
CloudMigration: setting.CloudMigrationSettings{
GMSDomain: "",
},
})
},
http.DefaultClient,
)
require.Error(t, err)
// Domain is required
@ -24,7 +27,9 @@ func Test_buildBasePath(t *testing.T) {
CloudMigration: setting.CloudMigrationSettings{
GMSDomain: "non-empty",
},
})
},
http.DefaultClient,
)
require.NoError(t, err)
client := c.(*gmsClientImpl)

@ -9,15 +9,26 @@ import (
"mime/multipart"
"net/http"
"net/url"
"github.com/grafana/grafana/pkg/infra/tracing"
"go.opentelemetry.io/otel/attribute"
)
type S3 struct{}
// S3 uploads objects through presigned URLs. Its dependencies are injected
// via NewS3 instead of being created per request.
type S3 struct {
// httpClient sends the upload request built in PresignedURLUpload.
httpClient *http.Client
// tracer starts the span that wraps each PresignedURLUpload call.
tracer tracing.Tracer
}
func NewS3() *S3 {
return &S3{}
// NewS3 returns an S3 that sends its requests with httpClient and records
// its spans with tracer. Both values are stored as given; no validation is
// performed here.
func NewS3(httpClient *http.Client, tracer tracing.Tracer) *S3 {
	storage := new(S3)
	storage.httpClient = httpClient
	storage.tracer = tracer
	return storage
}
func (s3 *S3) PresignedURLUpload(ctx context.Context, presignedURL, key string, reader io.Reader) (err error) {
ctx, span := s3.tracer.Start(ctx, "objectstorage.S3.PresignedURLUpload")
span.SetAttributes(attribute.String("key", key))
defer span.End()
url, err := url.Parse(presignedURL)
if err != nil {
return fmt.Errorf("parsing presigned url")
@ -68,13 +79,13 @@ func (s3 *S3) PresignedURLUpload(ctx context.Context, presignedURL, key string,
endpoint := fmt.Sprintf("%s://%s%s", url.Scheme, url.Host, url.Path)
request, err := http.NewRequest(http.MethodPost, endpoint, buffer)
request, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, buffer)
if err != nil {
return fmt.Errorf("creating http request: %w", err)
}
request.Header.Set("Content-Type", writer.FormDataContentType())
httpClient := http.Client{}
response, err := httpClient.Do(request)
response, err := s3.httpClient.Do(request)
if err != nil {
return fmt.Errorf("sending http request: %w", err)
}

@ -138,11 +138,11 @@ type Config struct {
Token string
}
func New(cfg Config) Service {
func New(cfg Config, httpClient *http.Client) Service {
return &GcomClient{
log: log.New(LogPrefix),
cfg: cfg,
httpClient: &http.Client{},
httpClient: httpClient,
}
}
@ -360,6 +360,7 @@ func (client *GcomClient) ListTokens(ctx context.Context, params ListTokenParams
return body.Items, nil
}
func (client *GcomClient) CreateToken(ctx context.Context, params CreateTokenParams, payload CreateTokenPayload) (Token, error) {
endpoint, err := url.JoinPath(client.cfg.ApiURL, "/v1/tokens")
if err != nil {

Loading…
Cancel
Save