The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
grafana/pkg/tsdb/cloudwatch/cloudwatch.go

361 lines
12 KiB

package cloudwatch
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/proxy"
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/clients"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
"github.com/patrickmn/go-cache"
)
// Cache tuning and request-header names shared by the CloudWatch datasource.
const (
	// tagValueCacheExpiration is the default expiration passed to cache.New
	// when each datasource instance's tag-value cache is created; the cache's
	// cleanup interval is five times this value.
	tagValueCacheExpiration = 24 * time.Hour

	// headerFromExpression is the request header datasources use to
	// identify queries issued by server-side expressions.
	headerFromExpression = "X-Grafana-From-Expr"

	// headerFromAlert is the request header datasources use to identify
	// queries issued by alerting.
	headerFromAlert = "FromAlert"
)
// DataQueryJson is the shape QueryData decodes an incoming query's JSON into:
// the CloudWatch annotation-query fields plus the query type.
type DataQueryJson struct {
dataquery.CloudWatchAnnotationQuery
// Type selects the handler in QueryData (annotationQuery, logAction or
// timeSeriesQuery). It is set by the frontend query runner and is not saved
// with the query, so it may be empty.
Type string `json:"type,omitempty"`
}
// DataSource is the per-datasource-instance state built by the factory
// returned from NewInstanceSettings.
type DataSource struct {
// Settings are the parsed CloudWatch datasource settings.
Settings models.CloudWatchSettings
// HTTPClient is created from the datasource's HTTP client options.
// NOTE(review): newSession does not currently pass it to the AWS session
// (see grafana/grafana#46365).
HTTPClient *http.Client
// tagValueCache is created with tagValueCacheExpiration as its default
// expiration. It is shared by copies of this struct because it is a pointer.
tagValueCache *cache.Cache
// ProxyOpts are used by newSession to configure the secure socks proxy on
// the AWS session's transport.
ProxyOpts *proxy.Options
}
// Region and query-mode identifiers.
const (
	// defaultRegion is a sentinel region meaning "use the region configured
	// on the datasource".
	defaultRegion = "default"
	// logsQueryMode is the QueryMode value of CloudWatch Logs queries.
	logsQueryMode = "Logs"
)

// Query types dispatched on by QueryData.
const (
	annotationQuery = "annotationQuery"
	logAction       = "logAction"
	timeSeriesQuery = "timeSeriesQuery"
)
// logger is the package-wide logger for the CloudWatch datasource.
var (
	logger = log.New("tsdb.cloudwatch")
)
// ProvideService builds the CloudWatch service. It wires an instance manager
// backed by NewInstanceSettings and a new awsds session cache into the
// executor.
func ProvideService(httpClientProvider *httpclient.Provider) *CloudWatchService {
	logger.Debug("Initializing")

	instanceManager := datasource.NewInstanceManager(NewInstanceSettings(httpClientProvider))
	sessionCache := awsds.NewSessionCache()

	return &CloudWatchService{Executor: newExecutor(instanceManager, sessionCache)}
}
// CloudWatchService is the CloudWatch datasource service created by
// ProvideService.
type CloudWatchService struct {
// Executor handles queries, resource calls and health checks.
Executor *cloudWatchExecutor
}
// SessionCache supplies AWS sessions for a session configuration.
// ProvideService uses awsds.NewSessionCache(); newExecutor accepts any
// implementation.
type SessionCache interface {
GetSession(c awsds.SessionConfig) (*session.Session, error)
}
// newExecutor creates a cloudWatchExecutor and installs its HTTP resource
// routes:
//   - im resolves datasource instances;
//   - sessions provides AWS sessions.
func newExecutor(im instancemgmt.InstanceManager, sessions SessionCache) *cloudWatchExecutor {
	executor := new(cloudWatchExecutor)
	executor.im = im
	executor.sessions = sessions
	executor.resourceHandler = httpadapter.New(executor.newResourceMux())
	return executor
}
// NewInstanceSettings returns the factory the instance manager uses to build
// a DataSource from raw datasource settings. The factory:
//   - parses the CloudWatch settings;
//   - creates an HTTP client from the datasource's HTTP options;
//   - allocates a fresh tag-value cache.
func NewInstanceSettings(httpClientProvider *httpclient.Provider) datasource.InstanceFactoryFunc {
	return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		cwSettings, err := models.LoadCloudWatchSettings(ctx, settings)
		if err != nil {
			return nil, fmt.Errorf("error reading settings: %w", err)
		}

		clientOpts, err := settings.HTTPClientOptions(ctx)
		if err != nil {
			return nil, err
		}

		client, err := httpClientProvider.New(clientOpts)
		if err != nil {
			return nil, fmt.Errorf("error creating http client: %w", err)
		}

		ds := DataSource{
			Settings:      cwSettings,
			HTTPClient:    client,
			tagValueCache: cache.New(tagValueCacheExpiration, 5*tagValueCacheExpiration),
			// Kept so a custom dialer can be built when the secure socks proxy is enabled.
			ProxyOpts: clientOpts.ProxyOptions,
		}
		return ds, nil
	}
}
// cloudWatchExecutor executes CloudWatch requests.
// It implements QueryData, CallResource and CheckHealth for the datasource.
// It contains a sync.Map, so it must not be copied; it is used via pointer.
type cloudWatchExecutor struct {
// im resolves the DataSource instance for a plugin context.
im instancemgmt.InstanceManager
// sessions supplies (possibly cached) AWS sessions.
sessions SessionCache
// regionCache is not used in this part of the file.
// NOTE(review): presumably caches region lookups elsewhere — confirm.
regionCache sync.Map
// resourceHandler serves CallResource requests using the routes from newResourceMux.
resourceHandler backend.CallResourceHandler
}
// getRequestContext assembles the AWS API providers and settings needed to
// serve a request in the given region. The sentinel defaultRegion is replaced
// with the region configured on the datasource.
//
// The EC2 client is always created for the datasource's configured region,
// regardless of the requested region.
func (e *cloudWatchExecutor) getRequestContext(ctx context.Context, pluginCtx backend.PluginContext, region string) (models.RequestContext, error) {
	instance, err := e.getInstance(ctx, pluginCtx)
	if err != nil {
		// Check unconditionally: instance is dereferenced below for every region,
		// not only for defaultRegion.
		return models.RequestContext{}, err
	}

	r := region
	if region == defaultRegion {
		r = instance.Settings.Region
	}

	// NOTE(review): EC2 deliberately uses defaultRegion here (presumably for
	// region-agnostic calls) — confirm.
	ec2Client, err := e.getEC2Client(ctx, pluginCtx, defaultRegion)
	if err != nil {
		return models.RequestContext{}, err
	}

	sess, err := e.newSession(ctx, pluginCtx, r)
	if err != nil {
		return models.RequestContext{}, err
	}

	return models.RequestContext{
		OAMAPIProvider:        NewOAMAPI(sess),
		MetricsClientProvider: clients.NewMetricsClient(NewMetricsAPI(sess), instance.Settings.GrafanaSettings.ListMetricsPageLimit),
		LogsAPIProvider:       NewLogsAPI(sess),
		EC2APIProvider:        ec2Client,
		Settings:              instance.Settings,
		Logger:                logger,
	}, nil
}
// CallResource handles plugin resource calls by delegating to the HTTP
// resource handler installed by newExecutor.
func (e *cloudWatchExecutor) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	handler := e.resourceHandler
	return handler.CallResource(ctx, req, sender)
}
// QueryData executes a batch of CloudWatch queries.
//
// The first query in the request decides how the whole batch is handled:
//   - Logs queries from alerting, server-side expressions or public dashboards
//     are executed synchronously via executeSyncLogQuery.
//   - Otherwise the query's Type selects annotation, log-action or time-series
//     execution; time series is the default.
//
// A request without queries yields an empty response instead of panicking.
func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	logger := logger.FromContext(ctx)

	// Nothing to execute; avoid indexing into an empty slice.
	if len(req.Queries) == 0 {
		return backend.NewQueryDataResponse(), nil
	}

	q := req.Queries[0]
	var model DataQueryJson
	if err := json.Unmarshal(q.JSON, &model); err != nil {
		return nil, err
	}

	_, fromAlert := req.Headers[headerFromAlert]
	fromExpression := req.GetHTTPHeader(headerFromExpression) != ""
	// Public dashboard queries execute like alert queries, i.e. on the backend,
	// so they must be handled synchronously.
	// `model.Type` is set by the frontend query runner during execution and is
	// not saved with the query. A log query without a `model.Type` is therefore
	// treated as a public dashboard query.
	fromPublicDashboard := model.Type == "" && model.QueryMode == logsQueryMode
	isSyncLogQuery := ((fromAlert || fromExpression) && model.QueryMode == logsQueryMode) || fromPublicDashboard
	if isSyncLogQuery {
		return executeSyncLogQuery(ctx, e, req)
	}

	var (
		result *backend.QueryDataResponse
		err    error
	)
	switch model.Type {
	case annotationQuery:
		result, err = e.executeAnnotationQuery(ctx, req.PluginContext, model, q)
	case logAction:
		result, err = e.executeLogActions(ctx, logger, req)
	case timeSeriesQuery:
		fallthrough
	default:
		result, err = e.executeTimeSeriesQuery(ctx, logger, req)
	}
	return result, err
}
// CheckHealth probes both the CloudWatch metrics API and the CloudWatch Logs
// API. The status is an error if either probe fails. The message always
// reports the outcome of both probes as a two-item numbered list.
func (e *cloudWatchExecutor) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	result := &backend.CheckHealthResult{Status: backend.HealthStatusOk}

	metricsMsg := "Successfully queried the CloudWatch metrics API."
	if err := e.checkHealthMetrics(ctx, req.PluginContext); err != nil {
		result.Status = backend.HealthStatusError
		metricsMsg = fmt.Sprintf("CloudWatch metrics query failed: %s", err.Error())
	}

	logsMsg := "Successfully queried the CloudWatch logs API."
	if err := e.checkHealthLogs(ctx, req.PluginContext); err != nil {
		result.Status = backend.HealthStatusError
		logsMsg = fmt.Sprintf("CloudWatch logs query failed: %s", err.Error())
	}

	result.Message = fmt.Sprintf("1. %s\n2. %s", metricsMsg, logsMsg)
	return result, nil
}
// checkHealthMetrics verifies metrics access by listing the AWS/Billing
// EstimatedCharges metric in the datasource's configured region.
func (e *cloudWatchExecutor) checkHealthMetrics(ctx context.Context, pluginCtx backend.PluginContext) error {
	sess, err := e.newSession(ctx, pluginCtx, defaultRegion)
	if err != nil {
		return err
	}

	instance, err := e.getInstance(ctx, pluginCtx)
	if err != nil {
		return err
	}

	input := &cloudwatch.ListMetricsInput{
		Namespace:  aws.String("AWS/Billing"),
		MetricName: aws.String("EstimatedCharges"),
	}
	client := clients.NewMetricsClient(NewMetricsAPI(sess), instance.Settings.GrafanaSettings.ListMetricsPageLimit)
	_, err = client.ListMetricsWithPageLimit(ctx, input)
	return err
}
// checkHealthLogs verifies CloudWatch Logs access by describing at most one
// log group in the datasource's configured region.
func (e *cloudWatchExecutor) checkHealthLogs(ctx context.Context, pluginCtx backend.PluginContext) error {
	sess, err := e.newSession(ctx, pluginCtx, defaultRegion)
	if err != nil {
		return err
	}
	input := &cloudwatchlogs.DescribeLogGroupsInput{Limit: aws.Int64(1)}
	_, err = NewLogsAPI(sess).DescribeLogGroupsWithContext(ctx, input)
	return err
}
func (e *cloudWatchExecutor) newSession(ctx context.Context, pluginCtx backend.PluginContext, region string) (*session.Session, error) {
instance, err := e.getInstance(ctx, pluginCtx)
if err != nil {
return nil, err
}
if region == defaultRegion {
if len(instance.Settings.Region) == 0 {
return nil, models.ErrMissingRegion
}
region = instance.Settings.Region
}
CloudWatch: Re-implement authentication (#25548) * CloudWatch: Revisit authentication Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Simplify auth code Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Use ARN Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Add Drone configuration Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Remove unused code Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Remove .drone.yml Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Fix external ID usage Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Fix issues after merge Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Remove stale code Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Remove stale code Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Use auth type enum Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Fix test snapshot * Coordinate frontend and backend option names Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Remove old comments Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Fix front-end tests Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Introduce session cache Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Use constants Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Fix field alignment * CloudWatch: Fix log message Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Tidy go.mod Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Handle arn auth type Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Fix role assumption duration Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Fix test Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Inline unnecessary constants Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Use serial comma in UI Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Inline unnecessary constants Signed-off-by: Arve Knudsen 
<arve.knudsen@gmail.com> * CloudWatch: Fail if missing region Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Handle unconfigured region Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Log when using cached session Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Include region in cache key Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Add UI warnings for lecagy support * Do not clear ARN fields whenging change authentication provider * Graph NG: annotations display (#27972) * Annotations support POC * Fix markers memoization * dev dashboard update * Update public/app/plugins/panel/graph3/plugins/AnnotationsPlugin.tsx * CloudWatch: Remove errors.BadRequest Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Undo unintentional change Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Remove log line Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Fix cache key computation Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Add region to cache key Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Improve log messages Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * CloudWatch: Add documentation Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Improve tooltip Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Improve docs Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Improve docs Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Improve docs Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Improve tooltip Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Add role assumption provisioning example Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Add upgrade notes Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Improve docs Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Apply suggestions from code review Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com> * backend: use latest sdk (#28147) fixes #27713 
via https://github.com/grafana/grafana-plugin-sdk-go/pull/227 * Docs: Update Permissions documentation (#28144) * removed overview.md * content updates * Update datasource_permissions.md * update content * content updates * Update organization_roles.md * Update docs/sources/enterprise/saml.md Co-authored-by: Kyle Brandt <kyle@grafana.com> * Update dashboard_folder_permissions.md Co-authored-by: Kyle Brandt <kyle@grafana.com> * area/grafana/toolkit: ci-package needs to use synchronous writes (#28148) * ci needs to use synchronous writes or the file ends up with zero length * <Enterprise Docs> Add instructions to upload license via UI (#28067) * Add UI license upload option, reformat Enterprise license activation section Added the option to upload a license file through the Server Admin UI, and did a little reformatting to make license activation look more like a process. * Headers not bold, hyphens not asterisks * Github: run metrics collector workflow every 10min (#28153) * GithubActions: Updated cron schedule * Updated * Docs: Update explore docs: remove dot at the end of line (#28151) HI - Removed Dot(.) at the end of line to make it consistent with other 2 points. 
Thanks, Ashish * Fix frontend tests Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Fix frontend tests Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> * Docs: Update upgrade notes Co-authored-by: Sofia Papagiannaki <sofia@grafana.com> Co-authored-by: Dominik Prokop <dominik.prokop@grafana.com> Co-authored-by: Marcus Efraimsson <marcus.efraimsson@gmail.com> Co-authored-by: Kyle Brandt <kyle@grafana.com> Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com> Co-authored-by: Brian Gann <briangann@users.noreply.github.com> Co-authored-by: Mitch Seaman <mjseaman@users.noreply.github.com> Co-authored-by: Torkel Ödegaard <torkel@grafana.org> Co-authored-by: Torkel Ödegaard <torkel@grafana.com> Co-authored-by: ashishagarwal06 <34888589+ashishagarwal06@users.noreply.github.com>
5 years ago
sess, err := e.sessions.GetSession(awsds.SessionConfig{
// https://github.com/grafana/grafana/issues/46365
// HTTPClient: instance.HTTPClient,
Settings: awsds.AWSDatasourceSettings{
Profile: instance.Settings.Profile,
Region: region,
AuthType: instance.Settings.AuthType,
AssumeRoleARN: instance.Settings.AssumeRoleARN,
ExternalID: instance.Settings.ExternalID,
Endpoint: instance.Settings.Endpoint,
DefaultRegion: instance.Settings.Region,
AccessKey: instance.Settings.AccessKey,
SecretKey: instance.Settings.SecretKey,
},
UserAgentName: aws.String("Cloudwatch"),
AuthSettings: &instance.Settings.GrafanaSettings,
})
if err != nil {
return nil, err
}
// work around until https://github.com/grafana/grafana/issues/39089 is implemented
if instance.Settings.GrafanaSettings.SecureSocksDSProxyEnabled && instance.Settings.SecureSocksProxyEnabled {
// only update the transport to try to avoid the issue mentioned here https://github.com/grafana/grafana/issues/46365
// also, 'sess' is cached and reused, so the first time it might have the transport not set, the following uses it will
if sess.Config.HTTPClient.Transport == nil {
// following go standard library logic (https://pkg.go.dev/net/http#Client), if no Transport is provided,
// then we use http.DefaultTransport
defTransport, ok := http.DefaultTransport.(*http.Transport)
if !ok {
// this should not happen but validating just in case
return nil, errors.New("default http client transport is not of type http.Transport")
}
sess.Config.HTTPClient.Transport = defTransport.Clone()
}
err = proxy.New(instance.ProxyOpts).ConfigureSecureSocksHTTPProxy(sess.Config.HTTPClient.Transport.(*http.Transport))
if err != nil {
return nil, fmt.Errorf("error configuring Secure Socks proxy for Transport: %w", err)
}
}
return sess, nil
}
// getInstance resolves the DataSource instance for pluginCtx through the
// instance manager. It returns a pointer to a copy of that instance.
//
// If the instance manager yields anything other than a DataSource, an error
// is returned instead of panicking on the type assertion.
func (e *cloudWatchExecutor) getInstance(ctx context.Context, pluginCtx backend.PluginContext) (*DataSource, error) {
	i, err := e.im.Get(ctx, pluginCtx)
	if err != nil {
		return nil, err
	}
	instance, ok := i.(DataSource)
	if !ok {
		return nil, fmt.Errorf("unexpected datasource instance type %T, expected DataSource", i)
	}
	return &instance, nil
}
// getCWClient returns a CloudWatch API client backed by a session for region.
// See newSession for how defaultRegion is resolved.
func (e *cloudWatchExecutor) getCWClient(ctx context.Context, pluginCtx backend.PluginContext, region string) (cloudwatchiface.CloudWatchAPI, error) {
	sess, err := e.newSession(ctx, pluginCtx, region)
	if err == nil {
		return NewCWClient(sess), nil
	}
	return nil, err
}
// getCWLogsClient returns a CloudWatch Logs API client backed by a session
// for region.
func (e *cloudWatchExecutor) getCWLogsClient(ctx context.Context, pluginCtx backend.PluginContext, region string) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
	sess, err := e.newSession(ctx, pluginCtx, region)
	if err != nil {
		return nil, err
	}
	return NewCWLogsClient(sess), nil
}
// getEC2Client returns an EC2 API provider backed by a session for region.
func (e *cloudWatchExecutor) getEC2Client(ctx context.Context, pluginCtx backend.PluginContext, region string) (models.EC2APIProvider, error) {
	sess, sessErr := e.newSession(ctx, pluginCtx, region)
	if sessErr != nil {
		return nil, sessErr
	}
	client := NewEC2Client(sess)
	return client, nil
}
// getRGTAClient returns a Resource Groups Tagging API client backed by a
// session for region.
func (e *cloudWatchExecutor) getRGTAClient(ctx context.Context, pluginCtx backend.PluginContext, region string) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI, error) {
	sess, err := e.newSession(ctx, pluginCtx, region)
	if err == nil {
		return newRGTAClient(sess), nil
	}
	return nil, err
}
// isTerminated reports whether queryStatus is a final status: exactly one of
// "Complete", "Cancelled", "Failed" or "Timeout" (case-sensitive).
func isTerminated(queryStatus string) bool {
	switch queryStatus {
	case "Complete", "Cancelled", "Failed", "Timeout":
		return true
	default:
		return false
	}
}