The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/cloudmigration/gmsclient/gms_client.go

283 lines
10 KiB

package gmsclient
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/setting"
)
// NewGMSClient builds a Client that talks to the Grafana Migration Service (GMS)
// over the supplied HTTP client.
//
// It returns an error when cfg.CloudMigration.GMSDomain is empty, since the
// request URLs built by the client are derived from that domain.
func NewGMSClient(cfg *setting.Cfg, httpClient *http.Client) (Client, error) {
	domain := cfg.CloudMigration.GMSDomain
	if len(domain) == 0 {
		return nil, fmt.Errorf("missing GMS domain")
	}

	client := &gmsClientImpl{
		cfg:        cfg,
		httpClient: httpClient,
		log:        log.New(logPrefix),
	}
	return client, nil
}
// gmsClientImpl is the HTTP-backed Client implementation returned by
// NewGMSClient.
type gmsClientImpl struct {
// cfg supplies the GMS domain and the per-endpoint request timeouts.
cfg *setting.Cfg
// log is the component logger created with logPrefix.
log *log.ConcreteLogger
// httpClient performs every outgoing request to GMS.
httpClient *http.Client
// getStatusMux serializes GetSnapshotStatus calls; it is held for the whole
// request, including the network round trip.
getStatusMux sync.Mutex
// getStatusLastQueried records when GetSnapshotStatus last sent a request.
// It is written under getStatusMux. NOTE(review): its readers are not visible
// in this file; confirm they also hold getStatusMux.
getStatusLastQueried time.Time
}
// ValidateKey checks the session's auth token against GMS by sending an empty
// POST to /api/v1/validate-key with a "Bearer <stackID>:<token>" header.
//
// It returns nil when GMS answers HTTP 200. Any other status is translated
// into a cloudmigration error via handleGMSErrors. Failures to build or send
// the request yield ErrTokenRequestError. A failure to close the response body
// is also reported as ErrTokenRequestError, joined onto whatever error is
// already being returned.
func (c *gmsClientImpl) ValidateKey(ctx context.Context, cm cloudmigration.CloudMigrationSession) (err error) {
	// TODO: there is a lot of boilerplate code in these methods, we should consolidate them when we have a gardening period
	path := fmt.Sprintf("%s/api/v1/validate-key", c.buildBasePath(cm.ClusterSlug))

	ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSValidateKeyTimeout)
	defer cancel()

	// validation is an empty POST to GMS with the authorization header included
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, bytes.NewReader(nil))
	if err != nil {
		c.log.Error("error creating http request for token validation", "err", err.Error())
		return cloudmigration.ErrTokenRequestError.Errorf("create http request error")
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", cm.StackID, cm.AuthToken))

	resp, err := c.httpClient.Do(req)
	if err != nil {
		c.log.Error("error sending http request for token validation", "err", err.Error())
		return cloudmigration.ErrTokenRequestError.Errorf("send http request error")
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			// Log closeErr, not err: err is the named result and is nil on the
			// success path, so err.Error() would panic there.
			c.log.Error("error closing the response body", "err", closeErr.Error())
			err = errors.Join(err, cloudmigration.ErrTokenRequestError.Errorf("closing response body"))
		}
	}()

	if resp.StatusCode != http.StatusOK {
		// The body is only used to pick a more specific error. If reading it
		// fails, the partial body will not decode, and handleGMSErrors falls back
		// to a generic token validation failure.
		body, _ := io.ReadAll(resp.Body)
		if gmsErr := c.handleGMSErrors(body); gmsErr != nil {
			return gmsErr
		}
		return cloudmigration.ErrTokenValidationFailure.Errorf("token validation failure")
	}
	return nil
}
// StartSnapshot asks GMS to start a new snapshot for the session by POSTing
// to /api/v1/start-snapshot, and returns the decoded response.
//
// It returns an error when the request cannot be built or sent, when GMS
// answers with status >= 400 (the response body is included in the error),
// or when the response cannot be decoded. A failure to close the response
// body is joined onto the returned error.
func (c *gmsClientImpl) StartSnapshot(ctx context.Context, session cloudmigration.CloudMigrationSession) (out *cloudmigration.StartSnapshotResponse, err error) {
	path := fmt.Sprintf("%s/api/v1/start-snapshot", c.buildBasePath(session.ClusterSlug))

	ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSStartSnapshotTimeout)
	defer cancel()

	// Send the request to cms with the associated auth token
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, nil)
	if err != nil {
		c.log.Error("error creating http request to start snapshot", "err", err.Error())
		return nil, fmt.Errorf("http request error: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))

	resp, err := c.httpClient.Do(req)
	if err != nil {
		c.log.Error("error sending http request to start snapshot", "err", err.Error())
		return nil, fmt.Errorf("http request error: %w", err)
	}
	// Registered before the status check so the body is also closed (and the
	// connection released) when GMS returns an error status.
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			err = errors.Join(err, fmt.Errorf("closing response body: %w", closeErr))
		}
	}()

	if resp.StatusCode >= 400 {
		c.log.Error("received error response for start snapshot", "statusCode", resp.StatusCode)
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return nil, fmt.Errorf("reading response body: %w", readErr)
		}
		return nil, fmt.Errorf("http request error: body=%s", string(body))
	}

	var result cloudmigration.StartSnapshotResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&result); decodeErr != nil {
		return nil, fmt.Errorf("unmarshalling start snapshot response: %w", decodeErr)
	}
	return &result, nil
}
// GetSnapshotStatus fetches the status of a GMS snapshot with a GET to
// /api/v1/snapshots/<uid>/status, passing offset as a query parameter.
//
// Calls are serialized: getStatusMux stays locked for the whole request.
// getStatusLastQueried is stamped just before the request is sent. It returns
// an error when the request cannot be built or sent, when GMS answers with
// status >= 400 (the response body is included in the error), or when the
// response cannot be decoded.
func (c *gmsClientImpl) GetSnapshotStatus(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot, offset int) (*cloudmigration.GetSnapshotStatusResponse, error) {
	c.getStatusMux.Lock()
	defer c.getStatusMux.Unlock()

	path := fmt.Sprintf("%s/api/v1/snapshots/%s/status?offset=%d", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID, offset)

	ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSGetSnapshotStatusTimeout)
	defer cancel()

	// Send the request to gms with the associated auth token
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil)
	if err != nil {
		c.log.Error("error creating http request to get snapshot status", "err", err.Error())
		return nil, fmt.Errorf("http request error: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))

	c.getStatusLastQueried = time.Now()
	resp, err := c.httpClient.Do(req)
	if err != nil {
		c.log.Error("error sending http request to get snapshot status", "err", err.Error())
		return nil, fmt.Errorf("http request error: %w", err)
	}
	// Registered before the status check so the body is also closed (and the
	// connection released) when GMS returns an error status.
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			c.log.Error("closing response body", "err", closeErr.Error())
		}
	}()

	if resp.StatusCode >= 400 {
		c.log.Error("received error response for get snapshot status", "statusCode", resp.StatusCode)
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return nil, fmt.Errorf("reading response body: %w", readErr)
		}
		return nil, fmt.Errorf("http request error: body=%s", string(body))
	}

	var result cloudmigration.GetSnapshotStatusResponse
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		c.log.Error("unmarshalling response body", "err", err.Error())
		return nil, fmt.Errorf("unmarshalling get snapshot status response: %w", err)
	}
	return &result, nil
}
// CreatePresignedUploadUrl asks GMS for an upload URL for the given snapshot
// by POSTing to /api/v1/snapshots/<uid>/create-upload-url, and returns the
// UploadUrl field of the decoded response.
//
// It returns an error when the request cannot be built or sent, when GMS
// answers with status >= 400 (the response body is included in the error),
// or when the response cannot be decoded.
func (c *gmsClientImpl) CreatePresignedUploadUrl(ctx context.Context, session cloudmigration.CloudMigrationSession, snapshot cloudmigration.CloudMigrationSnapshot) (string, error) {
	path := fmt.Sprintf("%s/api/v1/snapshots/%s/create-upload-url", c.buildBasePath(session.ClusterSlug), snapshot.GMSSnapshotUID)

	ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSCreateUploadUrlTimeout)
	defer cancel()

	// Send the request to gms with the associated auth token
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, nil)
	if err != nil {
		c.log.Error("error creating http request to create upload url", "err", err.Error())
		return "", fmt.Errorf("http request error: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))

	resp, err := c.httpClient.Do(req)
	if err != nil {
		c.log.Error("error sending http request to create an upload url", "err", err.Error())
		return "", fmt.Errorf("http request error: %w", err)
	}
	// Registered before the status check so the body is also closed (and the
	// connection released) when GMS returns an error status.
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			c.log.Error("closing response body", "err", closeErr.Error())
		}
	}()

	if resp.StatusCode >= 400 {
		c.log.Error("received error response to create an upload url", "statusCode", resp.StatusCode)
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return "", fmt.Errorf("reading response body: %w", readErr)
		}
		return "", fmt.Errorf("http request error: body=%s", string(body))
	}

	var result CreateSnapshotUploadUrlResponseDTO
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		c.log.Error("unmarshalling response body", "err", err.Error())
		return "", fmt.Errorf("unmarshalling create upload url response: %w", err)
	}
	return result.UploadUrl, nil
}
// ReportEvent posts event as JSON to GMS at /api/v1/events.
//
// This is best effort: it returns nothing, and every failure is only logged.
// Failures include encoding, building or sending the request, and a GMS status
// >= 400. Events with an empty LocalID or Event are silently skipped.
func (c *gmsClientImpl) ReportEvent(ctx context.Context, session cloudmigration.CloudMigrationSession, event EventRequestDTO) {
	if event.LocalID == "" || event.Event == "" {
		return
	}

	ctx, cancel := context.WithTimeout(ctx, c.cfg.CloudMigration.GMSReportEventTimeout)
	defer cancel()

	path := fmt.Sprintf("%s/api/v1/events", c.buildBasePath(session.ClusterSlug))

	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(event); err != nil {
		c.log.Error("encoding event", "err", err.Error())
		return
	}

	// Send the request to gms with the associated auth token
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, &buf)
	if err != nil {
		c.log.Error("error creating http request to report event", "err", err.Error())
		return
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %d:%s", session.StackID, session.AuthToken))

	resp, err := c.httpClient.Do(req)
	if err != nil {
		c.log.Error("error sending http request for report event", "err", err.Error())
		return
	}
	// Registered before the status check so the body is also closed (and the
	// connection released) when GMS returns an error status.
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			c.log.Error("closing response body", "err", closeErr.Error())
		}
	}()

	if resp.StatusCode >= 400 {
		c.log.Error("received error response for report event", "type", event.Event, "statusCode", resp.StatusCode)
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			c.log.Error("reading response body", "err", readErr.Error())
			return
		}
		c.log.Error("http request error", "body", string(body))
		return
	}

	// The success body is unused. Drain it so the transport can reuse the
	// connection. This is best effort: a drain failure does not affect the
	// reported event.
	_, _ = io.Copy(io.Discard, resp.Body)
}
// buildBasePath returns the URL prefix used for GMS API calls on a cluster.
//
// A GMSDomain that already starts with "http://" or "https://" is treated as
// a complete base URL and returned as-is; clusterSlug is then ignored.
// Otherwise the prefix is "https://cms-<clusterSlug>.<GMSDomain>/cloud-migrations".
func (c *gmsClientImpl) buildBasePath(clusterSlug string) string {
	domain := c.cfg.CloudMigration.GMSDomain

	switch {
	case strings.HasPrefix(domain, "http://"), strings.HasPrefix(domain, "https://"):
		return domain
	default:
		return "https://cms-" + clusterSlug + "." + domain + "/cloud-migrations"
	}
}
// handleGMSErrors maps a GMS error response body onto a cloudmigration error.
//
// The body is decoded as a GMSAPIError, and its Message is searched, in order,
// for the known GMS substrings:
//   - GMSErrorMessageInstanceUnreachable   -> ErrInstanceUnreachable
//   - GMSErrorMessageInstanceCheckingError -> ErrInstanceRequestError
//   - GMSErrorMessageInstanceFetching      -> ErrInstanceRequestError
//
// A body that fails to decode, or a message matching none of these, is
// reported as ErrTokenValidationFailure.
func (c *gmsClientImpl) handleGMSErrors(responseBody []byte) error {
	var apiError GMSAPIError
	if decodeErr := json.Unmarshal(responseBody, &apiError); decodeErr != nil {
		return cloudmigration.ErrTokenValidationFailure.Errorf("token validation failure")
	}

	msg := apiError.Message
	switch {
	case strings.Contains(msg, GMSErrorMessageInstanceUnreachable):
		return cloudmigration.ErrInstanceUnreachable.Errorf("instance unreachable")
	case strings.Contains(msg, GMSErrorMessageInstanceCheckingError):
		return cloudmigration.ErrInstanceRequestError.Errorf("instance checking error")
	case strings.Contains(msg, GMSErrorMessageInstanceFetching):
		return cloudmigration.ErrInstanceRequestError.Errorf("fetching instance")
	default:
		return cloudmigration.ErrTokenValidationFailure.Errorf("token validation failure")
	}
}