Plugins: Migrate CloudWatch to backend plugin SDK (#31149)

* first pass

* add instance manager

* fix tests

* remove dead code

* unexport fields

* cleanup

* remove ds instance from executor

* cleanup

* inline im

* remove old func

* get error working

* unexport field

* let fe do its magic

* fix channel name

* revert some tsdb changes

* fix annotations

* cleanup
Will Browne, committed by GitHub
parent e165eda283
commit d7862c50b8
16 changed files (lines changed per file):

   51  pkg/tests/api/metrics/api_metrics_test.go
  101  pkg/tsdb/cloudwatch/annotation_query.go
  327  pkg/tsdb/cloudwatch/cloudwatch.go
   62  pkg/tsdb/cloudwatch/live.go
   92  pkg/tsdb/cloudwatch/log_actions.go
  272  pkg/tsdb/cloudwatch/log_actions_test.go
  216  pkg/tsdb/cloudwatch/metric_find_query.go
  438  pkg/tsdb/cloudwatch/metric_find_query_test.go
   48  pkg/tsdb/cloudwatch/query_transformer.go
    2  pkg/tsdb/cloudwatch/query_transformer_test.go
   16  pkg/tsdb/cloudwatch/request_parser.go
   28  pkg/tsdb/cloudwatch/test_utils.go
  195  pkg/tsdb/cloudwatch/time_series_query.go
   21  pkg/tsdb/cloudwatch/time_series_query_test.go
    1  pkg/tsdb/service.go
   30  public/app/plugins/datasource/cloudwatch/datasource.ts

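The heart of this change is swapping Grafana's legacy plugins.DataPlugin interface for the plugin SDK's backend.QueryDataHandler, backed by a per-datasource instance manager. A minimal sketch of that pattern follows, under simplified assumptions; exampleDSInfo and exampleExecutor are illustrative names, not code from this commit:

package example

import (
	"context"
	"encoding/json"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
	"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
)

// Per-datasource settings, decoded once and cached by the instance manager.
type exampleDSInfo struct {
	region string
}

// The factory runs per configured datasource; the instance manager caches
// the result and rebuilds it when the datasource settings change.
func newInstanceSettings() datasource.InstanceFactoryFunc {
	return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		var jsonData map[string]string
		if err := json.Unmarshal(settings.JSONData, &jsonData); err != nil {
			return nil, err
		}
		return exampleDSInfo{region: jsonData["defaultRegion"]}, nil
	}
}

type exampleExecutor struct {
	im instancemgmt.InstanceManager
}

// QueryData is the single SDK entry point that replaces the legacy
// DataQuery(ctx, dsInfo, queryContext) method.
func (e *exampleExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	resp := backend.NewQueryDataResponse()
	for _, q := range req.Queries {
		respD := resp.Responses[q.RefID]
		// Decode q.JSON and build data.Frames here; per-datasource settings
		// come from e.im.Get(req.PluginContext) rather than *models.DataSource.
		resp.Responses[q.RefID] = respD
	}
	return resp, nil
}

CloudWatchService.Init in the diff wires this pattern into Grafana's plugin host: datasource.NewInstanceManager(NewInstanceSettings()) builds the instance manager, coreplugin.New(backend.ServeOpts{QueryDataHandler: ...}) wraps the executor, and the result is registered with the backend plugin manager under the "cloudwatch" plugin ID.
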
@@ -67,36 +67,36 @@ func TestQueryCloudWatchMetrics(t *testing.T) {
}),
},
}
tr := makeCWRequest(t, req, addr)
result := makeCWRequest(t, req, addr)
dataFrames := plugins.NewDecodedDataFrames(data.Frames{
&data.Frame{
RefID: "A",
Fields: []*data.Field{
data.NewField("text", nil, []string{"Test_MetricName"}),
data.NewField("value", nil, []string{"Test_MetricName"}),
},
Meta: &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": float64(1),
},
},
},
})
// Have to call this so that dataFrames.encoded is non-nil, for the comparison
// In the future we should use gocmp instead and ignore this field
_, err := dataFrames.Encoded()
require.NoError(t, err)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"A": {
RefID: "A",
Meta: simplejson.NewFromAny(map[string]interface{}{
"rowCount": float64(1),
}),
Tables: []plugins.DataTable{
{
Columns: []plugins.DataTableColumn{
{
Text: "text",
},
{
Text: "value",
},
},
Rows: []plugins.DataRowValues{
{
"Test_MetricName",
"Test_MetricName",
},
},
},
},
RefID: "A",
Dataframes: dataFrames,
},
},
}, tr)
}, result)
})
}
@@ -132,7 +132,8 @@ func TestQueryCloudWatchLogs(t *testing.T) {
dataFrames := plugins.NewDecodedDataFrames(data.Frames{
&data.Frame{
Name: "logGroups",
Name: "logGroups",
RefID: "A",
Fields: []*data.Field{
data.NewField("logGroupName", nil, []*string{}),
},

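The integration tests above now assert against a *backend.QueryDataResponse holding SDK data frames, rather than the legacy plugins.DataResponse with its table/dataframe split. A compact sketch of the asserted frame shape (a fragment; imports and values mirror the test above):

expected := &data.Frame{
	RefID: "A",
	Fields: []*data.Field{
		data.NewField("text", nil, []string{"Test_MetricName"}),
		data.NewField("value", nil, []string{"Test_MetricName"}),
	},
	// rowCount travels in frame metadata instead of a simplejson Meta field.
	Meta: &data.FrameMeta{
		Custom: map[string]interface{}{"rowCount": float64(1)},
	},
}

The Encoded() call in the test exists only to populate the unexported encoded field of the decoded-frames wrapper so that a deep-equality assertion passes, as the in-test comment notes.
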
@@ -7,39 +7,34 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/util/errutil"
)
func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryContext plugins.DataQuery) (
plugins.DataResponse, error) {
result := plugins.DataResponse{
Results: make(map[string]plugins.DataQueryResult),
}
firstQuery := queryContext.Queries[0]
queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: firstQuery.RefID}
parameters := firstQuery.Model
usePrefixMatch := parameters.Get("prefixMatching").MustBool(false)
region := parameters.Get("region").MustString("")
namespace := parameters.Get("namespace").MustString("")
metricName := parameters.Get("metricName").MustString("")
dimensions := parameters.Get("dimensions").MustMap()
statistics, err := parseStatistics(parameters)
func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, model *simplejson.Json, query backend.DataQuery, pluginCtx backend.PluginContext) (*backend.QueryDataResponse, error) {
result := backend.NewQueryDataResponse()
usePrefixMatch := model.Get("prefixMatching").MustBool(false)
region := model.Get("region").MustString("")
namespace := model.Get("namespace").MustString("")
metricName := model.Get("metricName").MustString("")
dimensions := model.Get("dimensions").MustMap()
statistics, err := parseStatistics(model)
if err != nil {
return plugins.DataResponse{}, err
return nil, err
}
period := int64(parameters.Get("period").MustInt(0))
period := int64(model.Get("period").MustInt(0))
if period == 0 && !usePrefixMatch {
period = 300
}
actionPrefix := parameters.Get("actionPrefix").MustString("")
alarmNamePrefix := parameters.Get("alarmNamePrefix").MustString("")
actionPrefix := model.Get("actionPrefix").MustString("")
alarmNamePrefix := model.Get("alarmNamePrefix").MustString("")
cli, err := e.getCWClient(region)
cli, err := e.getCWClient(region, pluginCtx)
if err != nil {
return plugins.DataResponse{}, err
return nil, err
}
var alarmNames []*string
@@ -51,7 +46,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
}
resp, err := cli.DescribeAlarms(params)
if err != nil {
return plugins.DataResponse{}, errutil.Wrap("failed to call cloudwatch:DescribeAlarms", err)
return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarms", err)
}
alarmNames = filterAlarms(resp, namespace, metricName, dimensions, statistics, period)
} else {
@@ -82,7 +77,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
}
resp, err := cli.DescribeAlarmsForMetric(params)
if err != nil {
return plugins.DataResponse{}, errutil.Wrap("failed to call cloudwatch:DescribeAlarmsForMetric", err)
return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarmsForMetric", err)
}
for _, alarm := range resp.MetricAlarms {
alarmNames = append(alarmNames, alarm.AlarmName)
@@ -90,26 +85,17 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
}
}
startTime, err := queryContext.TimeRange.ParseFrom()
if err != nil {
return plugins.DataResponse{}, err
}
endTime, err := queryContext.TimeRange.ParseTo()
if err != nil {
return plugins.DataResponse{}, err
}
annotations := make([]map[string]string, 0)
for _, alarmName := range alarmNames {
params := &cloudwatch.DescribeAlarmHistoryInput{
AlarmName: alarmName,
StartDate: aws.Time(startTime),
EndDate: aws.Time(endTime),
StartDate: aws.Time(query.TimeRange.From),
EndDate: aws.Time(query.TimeRange.To),
MaxRecords: aws.Int64(100),
}
resp, err := cli.DescribeAlarmHistory(params)
if err != nil {
return plugins.DataResponse{}, errutil.Wrap("failed to call cloudwatch:DescribeAlarmHistory", err)
return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarmHistory", err)
}
for _, history := range resp.AlarmHistoryItems {
annotation := make(map[string]string)
@@ -121,31 +107,32 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo
}
}
transformAnnotationToTable(annotations, &queryResult)
result.Results[firstQuery.RefID] = queryResult
return result, nil
respD := result.Responses[query.RefID]
respD.Frames = append(respD.Frames, transformAnnotationToTable(annotations, query))
result.Responses[query.RefID] = respD
return result, err
}
func transformAnnotationToTable(data []map[string]string, result *plugins.DataQueryResult) {
table := plugins.DataTable{
Columns: make([]plugins.DataTableColumn, 4),
Rows: make([]plugins.DataRowValues, 0),
func transformAnnotationToTable(annotations []map[string]string, query backend.DataQuery) *data.Frame {
frame := data.NewFrame(query.RefID,
data.NewField("time", nil, []string{}),
data.NewField("title", nil, []string{}),
data.NewField("tags", nil, []string{}),
data.NewField("text", nil, []string{}),
)
for _, a := range annotations {
frame.AppendRow(a["time"], a["title"], a["tags"], a["text"])
}
table.Columns[0].Text = "time"
table.Columns[1].Text = "title"
table.Columns[2].Text = "tags"
table.Columns[3].Text = "text"
for _, r := range data {
values := make([]interface{}, 4)
values[0] = r["time"]
values[1] = r["title"]
values[2] = r["tags"]
values[3] = r["text"]
table.Rows = append(table.Rows, values)
frame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": len(annotations),
},
}
result.Tables = append(result.Tables, table)
result.Meta.Set("rowCount", len(data))
return frame
}
func filterAlarms(alarms *cloudwatch.DescribeAlarmsOutput, namespace string, metricName string,

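transformAnnotationToTable now returns a *data.Frame built column-first and filled row by row with AppendRow, instead of mutating a plugins.DataQueryResult with table columns and rows. A small usage sketch (a fragment; the annotation values are made up):

annotations := []map[string]string{
	{"time": "2021-03-08 10:00:00.000", "title": "alarm update", "tags": "cloudwatch", "text": "ALARM -> OK"},
}
frame := data.NewFrame("A",
	data.NewField("time", nil, []string{}),
	data.NewField("title", nil, []string{}),
	data.NewField("tags", nil, []string{}),
	data.NewField("text", nil, []string{}),
)
for _, a := range annotations {
	// AppendRow panics if a value's type does not match its field, so all
	// four fields here are string-typed.
	frame.AppendRow(a["time"], a["title"], a["tags"], a["text"])
}
frame.Meta = &data.FrameMeta{Custom: map[string]interface{}{"rowCount": len(annotations)}}
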
@@ -2,13 +2,11 @@ package cloudwatch
import (
"context"
"encoding/json"
"fmt"
"regexp"
"time"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
@@ -20,14 +18,34 @@ import (
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/setting"
)
type datasourceInfo struct {
profile string
region string
authType awsds.AuthType
assumeRoleARN string
externalID string
namespace string
endpoint string
accessKey string
secretKey string
datasourceID int64
}
const cloudWatchTSFormat = "2006-01-02 15:04:05.000"
const defaultRegion = "default"
@@ -47,60 +65,133 @@ func init() {
}
type CloudWatchService struct {
LogsService *LogsService `inject:""`
Cfg *setting.Cfg `inject:""`
sessions SessionCache
LogsService *LogsService `inject:""`
BackendPluginManager backendplugin.Manager `inject:""`
Cfg *setting.Cfg `inject:""`
}
func (s *CloudWatchService) Init() error {
s.sessions = awsds.NewSessionCache()
return nil
}
plog.Debug("initing")
im := datasource.NewInstanceManager(NewInstanceSettings())
factory := coreplugin.New(backend.ServeOpts{
QueryDataHandler: newExecutor(s.LogsService, im, s.Cfg, awsds.NewSessionCache()),
})
func (s *CloudWatchService) NewExecutor(*models.DataSource) (plugins.DataPlugin, error) {
return newExecutor(s.LogsService, s.Cfg, s.sessions), nil
if err := s.BackendPluginManager.Register("cloudwatch", factory); err != nil {
plog.Error("Failed to register plugin", "error", err)
}
return nil
}
type SessionCache interface {
GetSession(region string, s awsds.AWSDatasourceSettings) (*session.Session, error)
}
func newExecutor(logsService *LogsService, cfg *setting.Cfg, sessions SessionCache) *cloudWatchExecutor {
func newExecutor(logsService *LogsService, im instancemgmt.InstanceManager, cfg *setting.Cfg, sessions SessionCache) *cloudWatchExecutor {
return &cloudWatchExecutor{
cfg: cfg,
logsService: logsService,
im: im,
cfg: cfg,
sessions: sessions,
}
}
func NewInstanceSettings() datasource.InstanceFactoryFunc {
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
var jsonData map[string]string
err := json.Unmarshal(settings.JSONData, &jsonData)
if err != nil {
return nil, fmt.Errorf("error reading settings: %w", err)
}
model := datasourceInfo{
profile: jsonData["profile"],
region: jsonData["defaultRegion"],
assumeRoleARN: jsonData["assumeRoleArn"],
externalID: jsonData["externalId"],
endpoint: jsonData["endpoint"],
namespace: jsonData["customMetricsNamespaces"],
datasourceID: settings.ID,
}
atStr := jsonData["authType"]
at := awsds.AuthTypeDefault
switch atStr {
case "credentials":
at = awsds.AuthTypeSharedCreds
case "keys":
at = awsds.AuthTypeKeys
case "default":
at = awsds.AuthTypeDefault
case "ec2_iam_role":
at = awsds.AuthTypeEC2IAMRole
case "arn":
at = awsds.AuthTypeDefault
plog.Warn("Authentication type \"arn\" is deprecated, falling back to default")
default:
plog.Warn("Unrecognized AWS authentication type", "type", atStr)
}
model.authType = at
if model.profile == "" {
model.profile = settings.Database // legacy support
}
model.accessKey = settings.DecryptedSecureJSONData["accessKey"]
model.secretKey = settings.DecryptedSecureJSONData["secretKey"]
return model, nil
}
}
// cloudWatchExecutor executes CloudWatch requests.
type cloudWatchExecutor struct {
*models.DataSource
ec2Client ec2iface.EC2API
rgtaClient resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
logsService *LogsService
im instancemgmt.InstanceManager
cfg *setting.Cfg
sessions SessionCache
}
func (e *cloudWatchExecutor) newSession(region string) (*session.Session, error) {
awsDatasourceSettings := e.getAWSDatasourceSettings(region)
func (e *cloudWatchExecutor) newSession(region string, pluginCtx backend.PluginContext) (*session.Session, error) {
dsInfo, err := e.getDSInfo(pluginCtx)
if err != nil {
return nil, err
}
if region == defaultRegion {
region = dsInfo.region
}
return e.sessions.GetSession(region, *awsDatasourceSettings)
return e.sessions.GetSession(region, awsds.AWSDatasourceSettings{
Profile: dsInfo.profile,
Region: region,
AuthType: dsInfo.authType,
AssumeRoleARN: dsInfo.assumeRoleARN,
ExternalID: dsInfo.externalID,
Endpoint: dsInfo.endpoint,
DefaultRegion: dsInfo.region,
AccessKey: dsInfo.accessKey,
SecretKey: dsInfo.secretKey,
})
}
func (e *cloudWatchExecutor) getCWClient(region string) (cloudwatchiface.CloudWatchAPI, error) {
sess, err := e.newSession(region)
func (e *cloudWatchExecutor) getCWClient(region string, pluginCtx backend.PluginContext) (cloudwatchiface.CloudWatchAPI, error) {
sess, err := e.newSession(region, pluginCtx)
if err != nil {
return nil, err
}
return NewCWClient(sess), nil
}
func (e *cloudWatchExecutor) getCWLogsClient(region string) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
sess, err := e.newSession(region)
func (e *cloudWatchExecutor) getCWLogsClient(region string, pluginCtx backend.PluginContext) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
sess, err := e.newSession(region, pluginCtx)
if err != nil {
return nil, err
}
@@ -110,12 +201,12 @@ func (e *cloudWatchExecutor) getCWLogsClient(region string) (cloudwatchlogsiface
return logsClient, nil
}
func (e *cloudWatchExecutor) getEC2Client(region string) (ec2iface.EC2API, error) {
func (e *cloudWatchExecutor) getEC2Client(region string, pluginCtx backend.PluginContext) (ec2iface.EC2API, error) {
if e.ec2Client != nil {
return e.ec2Client, nil
}
sess, err := e.newSession(region)
sess, err := e.newSession(region, pluginCtx)
if err != nil {
return nil, err
}
@@ -124,13 +215,13 @@ func (e *cloudWatchExecutor) getEC2Client(region string) (ec2iface.EC2API, error
return e.ec2Client, nil
}
func (e *cloudWatchExecutor) getRGTAClient(region string) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI,
func (e *cloudWatchExecutor) getRGTAClient(region string, pluginCtx backend.PluginContext) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI,
error) {
if e.rgtaClient != nil {
return e.rgtaClient, nil
}
sess, err := e.newSession(region)
sess, err := e.newSession(region, pluginCtx)
if err != nil {
return nil, err
}
@@ -140,18 +231,17 @@ func (e *cloudWatchExecutor) getRGTAClient(region string) (resourcegroupstagging
}
func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
queryContext plugins.DataQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
queryContext backend.DataQuery, model *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) {
const maxAttempts = 8
const pollPeriod = 1000 * time.Millisecond
queryParams := queryContext.Queries[0].Model
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, queryParams, *queryContext.TimeRange)
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, queryContext.TimeRange)
if err != nil {
return nil, err
}
requestParams := simplejson.NewFromAny(map[string]interface{}{
"region": queryParams.Get("region").MustString(""),
"region": model.Get("region").MustString(""),
"queryId": *startQueryOutput.QueryId,
})
@@ -177,11 +267,7 @@ func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwat
return nil, nil
}
// DataQuery executes a CloudWatch query.
func (e *cloudWatchExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource,
queryContext plugins.DataQuery) (plugins.DataResponse, error) {
e.DataSource = dsInfo
func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
/*
Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response
to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
@@ -189,146 +275,111 @@ func (e *cloudWatchExecutor) DataQuery(ctx context.Context, dsInfo *models.DataS
queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
*/
queryParams := queryContext.Queries[0].Model
_, fromAlert := queryContext.Headers["FromAlert"]
isLogAlertQuery := fromAlert && queryParams.Get("queryMode").MustString("") == "Logs"
q := req.Queries[0]
model, err := simplejson.NewJson(q.JSON)
if err != nil {
return nil, err
}
_, fromAlert := req.Headers["FromAlert"]
isLogAlertQuery := fromAlert && model.Get("queryMode").MustString("") == "Logs"
if isLogAlertQuery {
return e.executeLogAlertQuery(ctx, queryContext)
return e.executeLogAlertQuery(ctx, req)
}
queryType := queryParams.Get("type").MustString("")
queryType := model.Get("type").MustString("")
var err error
var result plugins.DataResponse
var result *backend.QueryDataResponse
switch queryType {
case "metricFindQuery":
result, err = e.executeMetricFindQuery(ctx, queryContext)
result, err = e.executeMetricFindQuery(ctx, model, q, req.PluginContext)
case "annotationQuery":
result, err = e.executeAnnotationQuery(ctx, queryContext)
result, err = e.executeAnnotationQuery(ctx, model, q, req.PluginContext)
case "logAction":
result, err = e.executeLogActions(ctx, queryContext)
result, err = e.executeLogActions(ctx, req)
case "liveLogAction":
result, err = e.executeLiveLogQuery(ctx, queryContext)
result, err = e.executeLiveLogQuery(ctx, req)
case "timeSeriesQuery":
fallthrough
default:
result, err = e.executeTimeSeriesQuery(ctx, queryContext)
result, err = e.executeTimeSeriesQuery(ctx, req)
}
return result, err
}
func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryContext plugins.DataQuery) (
plugins.DataResponse, error) {
queryParams := queryContext.Queries[0].Model
queryParams.Set("subtype", "StartQuery")
queryParams.Set("queryString", queryParams.Get("expression").MustString(""))
func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
region := queryParams.Get("region").MustString(defaultRegion)
if region == defaultRegion {
region = e.DataSource.JsonData.Get("defaultRegion").MustString()
queryParams.Set("region", region)
}
for _, q := range req.Queries {
model, err := simplejson.NewJson(q.JSON)
if err != nil {
continue
}
logsClient, err := e.getCWLogsClient(region)
if err != nil {
return plugins.DataResponse{}, err
}
model.Set("subtype", "StartQuery")
model.Set("queryString", model.Get("expression").MustString(""))
result, err := e.executeStartQuery(ctx, logsClient, queryParams, *queryContext.TimeRange)
if err != nil {
return plugins.DataResponse{}, err
}
region := model.Get("region").MustString(defaultRegion)
if region == defaultRegion {
dsInfo, err := e.getDSInfo(req.PluginContext)
if err != nil {
return nil, err
}
model.Set("region", dsInfo.region)
}
queryParams.Set("queryId", *result.QueryId)
logsClient, err := e.getCWLogsClient(region, req.PluginContext)
if err != nil {
return nil, err
}
// Get query results
getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, queryContext)
if err != nil {
return plugins.DataResponse{}, err
}
result, err := e.executeStartQuery(ctx, logsClient, model, q.TimeRange)
if err != nil {
return nil, err
}
dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
if err != nil {
return plugins.DataResponse{}, err
}
model.Set("queryId", *result.QueryId)
statsGroups := queryParams.Get("statsGroups").MustStringArray()
if len(statsGroups) > 0 && len(dataframe.Fields) > 0 {
groupedFrames, err := groupResults(dataframe, statsGroups)
getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, model)
if err != nil {
return plugins.DataResponse{}, err
return nil, err
}
response := plugins.DataResponse{
Results: make(map[string]plugins.DataQueryResult),
dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
if err != nil {
return nil, err
}
response.Results["A"] = plugins.DataQueryResult{
RefID: "A",
Dataframes: plugins.NewDecodedDataFrames(groupedFrames),
var frames []*data.Frame
statsGroups := model.Get("statsGroups").MustStringArray()
if len(statsGroups) > 0 && len(dataframe.Fields) > 0 {
frames, err = groupResults(dataframe, statsGroups)
if err != nil {
return nil, err
}
} else {
frames = data.Frames{dataframe}
}
return response, nil
respD := resp.Responses["A"]
respD.Frames = frames
resp.Responses["A"] = respD
}
response := plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"A": {
RefID: "A",
Dataframes: plugins.NewDecodedDataFrames(data.Frames{dataframe}),
},
},
}
return response, nil
return resp, nil
}
func (e *cloudWatchExecutor) getAWSDatasourceSettings(region string) *awsds.AWSDatasourceSettings {
if region == defaultRegion {
region = e.DataSource.JsonData.Get("defaultRegion").MustString()
}
atStr := e.DataSource.JsonData.Get("authType").MustString()
assumeRoleARN := e.DataSource.JsonData.Get("assumeRoleArn").MustString()
externalID := e.DataSource.JsonData.Get("externalId").MustString()
endpoint := e.DataSource.JsonData.Get("endpoint").MustString()
decrypted := e.DataSource.DecryptedValues()
accessKey := decrypted["accessKey"]
secretKey := decrypted["secretKey"]
at := awsds.AuthTypeDefault
switch atStr {
case "credentials":
at = awsds.AuthTypeSharedCreds
case "keys":
at = awsds.AuthTypeKeys
case "default":
at = awsds.AuthTypeDefault
case "arn":
at = awsds.AuthTypeDefault
plog.Warn("Authentication type \"arn\" is deprecated, falling back to default")
case "ec2_iam_role":
at = awsds.AuthTypeEC2IAMRole
default:
plog.Warn("Unrecognized AWS authentication type", "type", atStr)
func (e *cloudWatchExecutor) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) {
i, err := e.im.Get(pluginCtx)
if err != nil {
return nil, err
}
profile := e.DataSource.JsonData.Get("profile").MustString()
if profile == "" {
profile = e.DataSource.Database // legacy support
}
instance := i.(datasourceInfo)
return &awsds.AWSDatasourceSettings{
Region: region,
Profile: profile,
AuthType: at,
AssumeRoleARN: assumeRoleARN,
ExternalID: externalID,
AccessKey: accessKey,
SecretKey: secretKey,
Endpoint: endpoint,
}
return &instance, nil
}
func isTerminated(queryStatus string) bool {

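The executor no longer embeds *models.DataSource; settings are resolved per request through the instance manager, keyed by the backend.PluginContext. A sketch of that lookup, matching getDSInfo above (the ok-checked type assertion is a defensive addition; the diff uses a bare assertion):

func getDSInfo(im instancemgmt.InstanceManager, pluginCtx backend.PluginContext) (*datasourceInfo, error) {
	// Get returns whatever the InstanceFactoryFunc produced for this
	// datasource, creating or refreshing the instance as needed.
	i, err := im.Get(pluginCtx)
	if err != nil {
		return nil, err
	}
	instance, ok := i.(datasourceInfo)
	if !ok {
		return nil, fmt.Errorf("unexpected instance type %T", i)
	}
	return &instance, nil
}
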
@@ -15,6 +15,7 @@ import (
"github.com/aws/aws-sdk-go/service/servicequotas/servicequotasiface"
"github.com/centrifugal/centrifuge"
"github.com/google/uuid"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
@@ -108,24 +109,24 @@ func (r *logQueryRunner) publishResults(channelName string) error {
// executeLiveLogQuery executes a CloudWatch Logs query with live updates over WebSocket.
// A WebSocket channel is created, which goroutines send responses over.
func (e *cloudWatchExecutor) executeLiveLogQuery(ctx context.Context, queryContext plugins.DataQuery) (
plugins.DataResponse, error) {
func (e *cloudWatchExecutor) executeLiveLogQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
responseChannelName := uuid.New().String()
responseChannel := make(chan plugins.DataResponse)
if err := e.logsService.AddResponseChannel("plugin/cloudwatch/"+responseChannelName, responseChannel); err != nil {
close(responseChannel)
return plugins.DataResponse{}, err
return nil, err
}
go e.sendLiveQueriesToChannel(queryContext, responseChannel)
go e.sendLiveQueriesToChannel(req, responseChannel)
response := plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
response := &backend.QueryDataResponse{
Responses: backend.Responses{
"A": {
RefID: "A",
Meta: simplejson.NewFromAny(map[string]interface{}{
"channelName": responseChannelName,
}),
Frames: data.Frames{data.NewFrame("A").SetMeta(&data.FrameMeta{
Custom: map[string]interface{}{
"channelName": responseChannelName,
},
})},
},
},
}
@@ -133,18 +134,17 @@ func (e *cloudWatchExecutor) executeLiveLogQuery(ctx context.Context, queryConte
return response, nil
}
func (e *cloudWatchExecutor) sendLiveQueriesToChannel(queryContext plugins.DataQuery,
responseChannel chan plugins.DataResponse) {
func (e *cloudWatchExecutor) sendLiveQueriesToChannel(req *backend.QueryDataRequest, responseChannel chan plugins.DataResponse) {
defer close(responseChannel)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
defer cancel()
eg, ectx := errgroup.WithContext(ctx)
for _, query := range queryContext.Queries {
for _, query := range req.Queries {
query := query
eg.Go(func() error {
return e.startLiveQuery(ectx, responseChannel, query, *queryContext.TimeRange)
return e.startLiveQuery(ectx, responseChannel, query, query.TimeRange, req.PluginContext)
})
}
@@ -153,7 +153,7 @@ func (e *cloudWatchExecutor) sendLiveQueriesToChannel(queryContext plugins.DataQ
}
}
func (e *cloudWatchExecutor) getQueue(queueKey string) (chan bool, error) {
func (e *cloudWatchExecutor) getQueue(queueKey string, pluginCtx backend.PluginContext) (chan bool, error) {
e.logsService.queueLock.Lock()
defer e.logsService.queueLock.Unlock()
@@ -161,7 +161,7 @@ func (e *cloudWatchExecutor) getQueue(queueKey string) (chan bool, error) {
return queue, nil
}
concurrentQueriesQuota := e.fetchConcurrentQueriesQuota(queueKey)
concurrentQueriesQuota := e.fetchConcurrentQueriesQuota(queueKey, pluginCtx)
queueChannel := make(chan bool, concurrentQueriesQuota)
e.logsService.queues[queueKey] = queueChannel
@@ -169,8 +169,8 @@ func (e *cloudWatchExecutor) getQueue(queueKey string) (chan bool, error) {
return queueChannel, nil
}
func (e *cloudWatchExecutor) fetchConcurrentQueriesQuota(region string) int {
sess, err := e.newSession(region)
func (e *cloudWatchExecutor) fetchConcurrentQueriesQuota(region string, pluginCtx backend.PluginContext) int {
sess, err := e.newSession(region, pluginCtx)
if err != nil {
plog.Warn("Could not get service quota client")
return defaultConcurrentQueries
@@ -211,17 +211,25 @@ func (e *cloudWatchExecutor) fetchConcurrentQueriesQuota(region string) int {
return defaultConcurrentQueries
}
func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel chan plugins.DataResponse,
query plugins.DataSubQuery, timeRange plugins.DataTimeRange) error {
defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString()
parameters := query.Model
region := parameters.Get("region").MustString(defaultRegion)
logsClient, err := e.getCWLogsClient(region)
func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel chan plugins.DataResponse, query backend.DataQuery, timeRange backend.TimeRange, pluginCtx backend.PluginContext) error {
model, err := simplejson.NewJson(query.JSON)
if err != nil {
return err
}
queue, err := e.getQueue(fmt.Sprintf("%s-%d", region, e.DataSource.Id))
dsInfo, err := e.getDSInfo(pluginCtx)
if err != nil {
return err
}
defaultRegion := dsInfo.region
region := model.Get("region").MustString(defaultRegion)
logsClient, err := e.getCWLogsClient(region, pluginCtx)
if err != nil {
return err
}
queue, err := e.getQueue(fmt.Sprintf("%s-%d", region, dsInfo.datasourceID), pluginCtx)
if err != nil {
return err
}
@@ -230,7 +238,7 @@ func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel
queue <- true
defer func() { <-queue }()
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange)
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, timeRange)
if err != nil {
return err
}
@@ -265,7 +273,7 @@ func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel
// Because of this, if the frontend sees that a "stats ... by ..." query is being made
// the "statsGroups" parameter is sent along with the query to the backend so that we
// can correctly group the CloudWatch logs response.
statsGroups := parameters.Get("statsGroups").MustStringArray()
statsGroups := model.Get("statsGroups").MustStringArray()
if len(statsGroups) > 0 && len(dataFrame.Fields) > 0 {
groupedFrames, err := groupResults(dataFrame, statsGroups)
if err != nil {

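Both the live queries above and the log actions below fan queries out with errgroup: one goroutine per query, all sharing a cancellable context. A minimal sketch of the pattern (runQuery is a hypothetical stand-in for startLiveQuery or executeLogAction):

func runAll(ctx context.Context, queries []backend.DataQuery) error {
	ctx, cancel := context.WithTimeout(ctx, 15*time.Minute)
	defer cancel()

	eg, ectx := errgroup.WithContext(ctx)
	for _, query := range queries {
		query := query // capture the loop variable for the closure (pre-Go 1.22 idiom)
		eg.Go(func() error {
			return runQuery(ectx, query)
		})
	}
	// The first error cancels ectx, stopping the remaining goroutines.
	return eg.Wait()
}

func runQuery(ctx context.Context, q backend.DataQuery) error {
	// placeholder: start the query, poll for results, push frames to a channel
	return nil
}
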
@@ -10,21 +10,27 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/plugins"
"golang.org/x/sync/errgroup"
)
func (e *cloudWatchExecutor) executeLogActions(ctx context.Context,
queryContext plugins.DataQuery) (plugins.DataResponse, error) {
resultChan := make(chan plugins.DataQueryResult, len(queryContext.Queries))
func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
resultChan := make(chan backend.Responses, len(req.Queries))
eg, ectx := errgroup.WithContext(ctx)
for _, query := range queryContext.Queries {
for _, query := range req.Queries {
model, err := simplejson.NewJson(query.JSON)
if err != nil {
return nil, err
}
query := query
eg.Go(func() error {
dataframe, err := e.executeLogAction(ectx, queryContext, query)
dataframe, err := e.executeLogAction(ectx, model, query, req.PluginContext)
if err != nil {
return err
}
@@ -36,16 +42,15 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context,
// Because of this, if the frontend sees that a "stats ... by ..." query is being made
// the "statsGroups" parameter is sent along with the query to the backend so that we
// can correctly group the CloudWatch logs response.
statsGroups := query.Model.Get("statsGroups").MustStringArray()
statsGroups := model.Get("statsGroups").MustStringArray()
if len(statsGroups) > 0 && len(dataframe.Fields) > 0 {
groupedFrames, err := groupResults(dataframe, statsGroups)
if err != nil {
return err
}
resultChan <- plugins.DataQueryResult{
RefID: query.RefID,
Dataframes: plugins.NewDecodedDataFrames(groupedFrames),
resultChan <- backend.Responses{
query.RefID: backend.DataResponse{Frames: groupedFrames},
}
return nil
}
@@ -58,37 +63,41 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context,
}
}
resultChan <- plugins.DataQueryResult{
RefID: query.RefID,
Dataframes: plugins.NewDecodedDataFrames(data.Frames{dataframe}),
resultChan <- backend.Responses{
query.RefID: backend.DataResponse{Frames: data.Frames{dataframe}},
}
return nil
})
}
if err := eg.Wait(); err != nil {
return plugins.DataResponse{}, err
return nil, err
}
close(resultChan)
response := plugins.DataResponse{
Results: make(map[string]plugins.DataQueryResult),
}
for result := range resultChan {
response.Results[result.RefID] = result
for refID, response := range result {
respD := resp.Responses[refID]
respD.Frames = response.Frames
resp.Responses[refID] = respD
}
}
return response, nil
return resp, nil
}
func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, queryContext plugins.DataQuery,
query plugins.DataSubQuery) (*data.Frame, error) {
parameters := query.Model
subType := query.Model.Get("subtype").MustString()
func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, model *simplejson.Json, query backend.DataQuery, pluginCtx backend.PluginContext) (*data.Frame, error) {
subType := model.Get("subtype").MustString()
dsInfo, err := e.getDSInfo(pluginCtx)
if err != nil {
return nil, err
}
defaultRegion := dsInfo.region
defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString()
region := parameters.Get("region").MustString(defaultRegion)
logsClient, err := e.getCWLogsClient(region)
region := model.Get("region").MustString(defaultRegion)
logsClient, err := e.getCWLogsClient(region, pluginCtx)
if err != nil {
return nil, err
}
@@ -97,17 +106,17 @@ func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, queryContext
switch subType {
case "DescribeLogGroups":
data, err = e.handleDescribeLogGroups(ctx, logsClient, parameters)
data, err = e.handleDescribeLogGroups(ctx, logsClient, model)
case "GetLogGroupFields":
data, err = e.handleGetLogGroupFields(ctx, logsClient, parameters, query.RefID)
data, err = e.handleGetLogGroupFields(ctx, logsClient, model, query.RefID)
case "StartQuery":
data, err = e.handleStartQuery(ctx, logsClient, parameters, *queryContext.TimeRange, query.RefID)
data, err = e.handleStartQuery(ctx, logsClient, model, query.TimeRange, query.RefID)
case "StopQuery":
data, err = e.handleStopQuery(ctx, logsClient, parameters)
data, err = e.handleStopQuery(ctx, logsClient, model)
case "GetQueryResults":
data, err = e.handleGetQueryResults(ctx, logsClient, parameters, query.RefID)
data, err = e.handleGetQueryResults(ctx, logsClient, model, query.RefID)
case "GetLogEvents":
data, err = e.handleGetLogEvents(ctx, logsClient, parameters)
data, err = e.handleGetLogEvents(ctx, logsClient, model)
}
if err != nil {
return nil, err
@@ -200,16 +209,9 @@ func (e *cloudWatchExecutor) handleDescribeLogGroups(ctx context.Context,
}
func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json, timeRange plugins.DataTimeRange) (*cloudwatchlogs.StartQueryOutput, error) {
startTime, err := timeRange.ParseFrom()
if err != nil {
return nil, err
}
endTime, err := timeRange.ParseTo()
if err != nil {
return nil, err
}
parameters *simplejson.Json, timeRange backend.TimeRange) (*cloudwatchlogs.StartQueryOutput, error) {
startTime := timeRange.From
endTime := timeRange.To
if !startTime.Before(endTime) {
return nil, fmt.Errorf("invalid time range: start time must be before end time")
@@ -237,8 +239,8 @@ }
}
func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
parameters *simplejson.Json, timeRange plugins.DataTimeRange, refID string) (*data.Frame, error) {
startQueryResponse, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange)
model *simplejson.Json, timeRange backend.TimeRange, refID string) (*data.Frame, error) {
startQueryResponse, err := e.executeStartQuery(ctx, logsClient, model, timeRange)
if err != nil {
return nil, err
}
@@ -246,7 +248,7 @@ func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cl
dataFrame := data.NewFrame(refID, data.NewField("queryId", nil, []string{*startQueryResponse.QueryId}))
dataFrame.RefID = refID
clientRegion := parameters.Get("region").MustString("default")
clientRegion := model.Get("region").MustString("default")
dataFrame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{

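A recurring simplification in this file is the time-range handling: plugins.DataTimeRange stored From/To as millisecond strings that had to round-trip through ParseFrom/ParseTo, while backend.TimeRange carries time.Time values directly. A two-line sketch of the new shape (epoch seconds borrowed from the tests):

tr := backend.TimeRange{
	From: time.Unix(1584700643, 0),
	To:   time.Unix(1584873443, 0),
}
if !tr.From.Before(tr.To) {
	// executeStartQuery now rejects inverted ranges with an explicit error
}
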
@@ -2,6 +2,7 @@ package cloudwatch
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
@@ -10,9 +11,10 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/plugins"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -47,39 +49,45 @@ func TestQuery_DescribeLogGroups(t *testing.T) {
},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "DescribeLogGroups",
"limit": 50,
}),
"limit": 50
}`),
},
},
})
require.NoError(t, err)
require.NotNil(t, resp)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Dataframes: plugins.NewDecodedDataFrames(data.Frames{
&data.Frame{
Name: "logGroups",
Fields: []*data.Field{
data.NewField("logGroupName", nil, []*string{
aws.String("group_a"), aws.String("group_b"), aws.String("group_c"),
}),
},
Meta: &data.FrameMeta{
PreferredVisualization: "logs",
},
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": backend.DataResponse{
Frames: data.Frames{
&data.Frame{
Name: "logGroups",
Fields: []*data.Field{
data.NewField("logGroupName", nil, []*string{
aws.String("group_a"), aws.String("group_b"), aws.String("group_c"),
}),
},
}),
Meta: &data.FrameMeta{
PreferredVisualization: "logs",
},
},
},
},
},
}, resp)
})
@@ -100,39 +108,47 @@ func TestQuery_DescribeLogGroups(t *testing.T) {
},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
"type": "logAction",
"subtype": "DescribeLogGroups",
"logGroupNamePrefix": "g",
}),
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "DescribeLogGroups",
"limit": 50,
"region": "default",
"logGroupNamePrefix": "g"
}`),
},
},
})
require.NoError(t, err)
require.NotNil(t, resp)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Dataframes: plugins.NewDecodedDataFrames(data.Frames{
&data.Frame{
Name: "logGroups",
Fields: []*data.Field{
data.NewField("logGroupName", nil, []*string{
aws.String("group_a"), aws.String("group_b"), aws.String("group_c"),
}),
},
Meta: &data.FrameMeta{
PreferredVisualization: "logs",
},
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": backend.DataResponse{
Frames: data.Frames{
&data.Frame{
Name: "logGroups",
Fields: []*data.Field{
data.NewField("logGroupName", nil, []*string{
aws.String("group_a"), aws.String("group_b"), aws.String("group_c"),
}),
},
Meta: &data.FrameMeta{
PreferredVisualization: "logs",
},
}),
},
},
},
},
}, resp)
})
}
@@ -170,17 +186,24 @@ func TestQuery_GetLogGroupFields(t *testing.T) {
const refID = "A"
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
RefID: refID,
Model: simplejson.NewFromAny(map[string]interface{}{
"type": "logAction",
"subtype": "GetLogGroupFields",
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "GetLogGroupFields",
"logGroupName": "group_a",
"limit": 50,
}),
"limit": 50
}`),
},
},
})
@@ -202,13 +225,11 @@ func TestQuery_GetLogGroupFields(t *testing.T) {
},
}
expFrame.RefID = refID
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
refID: {
Dataframes: plugins.NewDecodedDataFrames(data.Frames{expFrame}),
RefID: refID,
},
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
refID: backend.DataResponse{
Frames: data.Frames{expFrame},
},
},
}, resp)
}
@@ -244,23 +265,30 @@ func TestQuery_StartQuery(t *testing.T) {
},
}
timeRange := plugins.DataTimeRange{
From: "1584873443000",
To: "1584700643000",
timeRange := backend.TimeRange{
From: time.Unix(1584873443, 0),
To: time.Unix(1584700643, 0),
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
_, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
TimeRange: &timeRange,
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
TimeRange: timeRange,
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "StartQuery",
"limit": 50,
"region": "default",
"queryString": "fields @message",
}),
"queryString": "fields @message"
}`),
},
},
})
@@ -290,24 +318,31 @@ func TestQuery_StartQuery(t *testing.T) {
},
}
timeRange := plugins.DataTimeRange{
From: "1584700643000",
To: "1584873443000",
timeRange := backend.TimeRange{
From: time.Unix(1584700643000, 0),
To: time.Unix(1584873443000, 0),
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
TimeRange: &timeRange,
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
RefID: refID,
Model: simplejson.NewFromAny(map[string]interface{}{
RefID: refID,
TimeRange: timeRange,
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "StartQuery",
"limit": 50,
"region": "default",
"queryString": "fields @message",
}),
"queryString": "fields @message"
}`),
},
},
})
@@ -324,13 +359,11 @@ func TestQuery_StartQuery(t *testing.T) {
},
PreferredVisualization: "logs",
}
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
refID: {
Dataframes: plugins.NewDecodedDataFrames(data.Frames{expFrame}),
RefID: refID,
},
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
refID: {
Frames: data.Frames{expFrame},
},
},
}, resp)
})
}
@@ -366,21 +399,28 @@ func TestQuery_StopQuery(t *testing.T) {
},
}
timeRange := plugins.DataTimeRange{
From: "1584873443000",
To: "1584700643000",
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
timeRange := backend.TimeRange{
From: time.Unix(1584873443, 0),
To: time.Unix(1584700643, 0),
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
TimeRange: &timeRange,
Queries: []plugins.DataSubQuery{
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
TimeRange: timeRange,
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "StopQuery",
"queryId": "abcd-efgh-ijkl-mnop",
}),
"queryId": "abcd-efgh-ijkl-mnop"
}`),
},
},
})
@@ -395,12 +435,11 @@ func TestQuery_StopQuery(t *testing.T) {
PreferredVisualization: "logs",
},
}
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Dataframes: plugins.NewDecodedDataFrames(data.Frames{expFrame}),
},
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": {
Frames: data.Frames{expFrame},
},
},
}, resp)
}
@@ -458,16 +497,23 @@ func TestQuery_GetQueryResults(t *testing.T) {
},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
RefID: refID,
Model: simplejson.NewFromAny(map[string]interface{}{
JSON: json.RawMessage(`{
"type": "logAction",
"subtype": "GetQueryResults",
"queryId": "abcd-efgh-ijkl-mnop",
}),
"queryId": "abcd-efgh-ijkl-mnop"
}`),
},
},
})
@@ -507,12 +553,10 @@ func TestQuery_GetQueryResults(t *testing.T) {
PreferredVisualization: "logs",
}
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
refID: {
RefID: refID,
Dataframes: plugins.NewDecodedDataFrames(data.Frames{expFrame}),
},
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
refID: {
Frames: data.Frames{expFrame},
},
},
}, resp)
}

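The test migration above follows one recipe throughout: queries carry raw JSON that the executor decodes itself, and a backend.PluginContext plus a stub instance manager replace the legacy *models.DataSource argument. A condensed sketch of that setup (a fragment; field values mirror the tests):

im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
	return datasourceInfo{}, nil // a fixed, empty instance is enough for these tests
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
	PluginContext: backend.PluginContext{
		DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
	},
	Queries: []backend.DataQuery{{
		RefID: "A",
		JSON:  json.RawMessage(`{"type": "logAction", "subtype": "DescribeLogGroups"}`),
	}},
})
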
@@ -15,9 +15,10 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/metrics"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/util/errutil"
)
@@ -241,68 +242,57 @@ var dimensionsMap = map[string][]string{
var regionCache sync.Map
func (e *cloudWatchExecutor) executeMetricFindQuery(ctx context.Context, queryContext plugins.DataQuery) (
plugins.DataResponse, error) {
firstQuery := queryContext.Queries[0]
func (e *cloudWatchExecutor) executeMetricFindQuery(ctx context.Context, model *simplejson.Json, query backend.DataQuery, pluginCtx backend.PluginContext) (*backend.QueryDataResponse, error) {
subType := model.Get("subtype").MustString()
parameters := firstQuery.Model
subType := firstQuery.Model.Get("subtype").MustString()
var data []suggestData
var err error
switch subType {
case "regions":
data, err = e.handleGetRegions(ctx, parameters, queryContext)
data, err = e.handleGetRegions(ctx, model, pluginCtx)
case "namespaces":
data, err = e.handleGetNamespaces(ctx, parameters, queryContext)
data, err = e.handleGetNamespaces(ctx, model, pluginCtx)
case "metrics":
data, err = e.handleGetMetrics(ctx, parameters, queryContext)
data, err = e.handleGetMetrics(ctx, model, pluginCtx)
case "dimension_keys":
data, err = e.handleGetDimensions(ctx, parameters, queryContext)
data, err = e.handleGetDimensions(ctx, model, pluginCtx)
case "dimension_values":
data, err = e.handleGetDimensionValues(ctx, parameters, queryContext)
data, err = e.handleGetDimensionValues(ctx, model, pluginCtx)
case "ebs_volume_ids":
data, err = e.handleGetEbsVolumeIds(ctx, parameters, queryContext)
data, err = e.handleGetEbsVolumeIds(ctx, model, pluginCtx)
case "ec2_instance_attribute":
data, err = e.handleGetEc2InstanceAttribute(ctx, parameters, queryContext)
data, err = e.handleGetEc2InstanceAttribute(ctx, model, pluginCtx)
case "resource_arns":
data, err = e.handleGetResourceArns(ctx, parameters, queryContext)
data, err = e.handleGetResourceArns(ctx, model, pluginCtx)
}
if err != nil {
return plugins.DataResponse{}, err
return nil, err
}
queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: firstQuery.RefID}
transformToTable(data, &queryResult)
result := plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
firstQuery.RefID: queryResult,
},
}
return result, nil
resp := backend.NewQueryDataResponse()
respD := resp.Responses[query.RefID]
respD.Frames = append(respD.Frames, transformToTable(data))
resp.Responses[query.RefID] = respD
return resp, nil
}
func transformToTable(data []suggestData, result *plugins.DataQueryResult) {
table := plugins.DataTable{
Columns: []plugins.DataTableColumn{
{
Text: "text",
},
{
Text: "value",
},
},
Rows: make([]plugins.DataRowValues, 0),
func transformToTable(d []suggestData) *data.Frame {
frame := data.NewFrame("",
data.NewField("text", nil, []string{}),
data.NewField("value", nil, []string{}))
for _, r := range d {
frame.AppendRow(r.Text, r.Value)
}
for _, r := range data {
values := []interface{}{
r.Text,
r.Value,
}
table.Rows = append(table.Rows, values)
frame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": len(d),
},
}
result.Tables = append(result.Tables, table)
result.Meta.Set("rowCount", len(data))
return frame
}
func parseMultiSelectValue(input string) []string {
@@ -322,16 +312,20 @@ func parseMultiSelectValue(input string) []string {
// Whenever this list is updated, the frontend list should also be updated.
// Please update the region list in public/app/plugins/datasource/cloudwatch/partials/config.html
func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *simplejson.Json,
queryContext plugins.DataQuery) ([]suggestData, error) {
dsInfo := e.getAWSDatasourceSettings(defaultRegion)
profile := dsInfo.Profile
pluginCtx backend.PluginContext) ([]suggestData, error) {
dsInfo, err := e.getDSInfo(pluginCtx)
if err != nil {
return nil, err
}
profile := dsInfo.profile
if cache, ok := regionCache.Load(profile); ok {
if cache2, ok2 := cache.([]suggestData); ok2 {
return cache2, nil
}
}
client, err := e.getEC2Client(defaultRegion)
client, err := e.getEC2Client(defaultRegion, pluginCtx)
if err != nil {
return nil, err
}
@@ -367,12 +361,18 @@ func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *s
return result, nil
}
func (e *cloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters *simplejson.Json, queryContext plugins.DataQuery) ([]suggestData, error) {
keys := []string{}
func (e *cloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters *simplejson.Json, pluginCtx backend.PluginContext) ([]suggestData, error) {
var keys []string
for key := range metricsMap {
keys = append(keys, key)
}
customNamespaces := e.DataSource.JsonData.Get("customMetricsNamespaces").MustString()
dsInfo, err := e.getDSInfo(pluginCtx)
if err != nil {
return nil, err
}
customNamespaces := dsInfo.namespace
if customNamespaces != "" {
keys = append(keys, strings.Split(customNamespaces, ",")...)
}
@@ -386,7 +386,7 @@ func (e *cloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters
return result, nil
}
func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *simplejson.Json, queryContext plugins.DataQuery) ([]suggestData, error) {
func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *simplejson.Json, pluginCtx backend.PluginContext) ([]suggestData, error) {
region := parameters.Get("region").MustString()
namespace := parameters.Get("namespace").MustString()
@@ -398,7 +398,7 @@ func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s
}
} else {
var err error
if namespaceMetrics, err = e.getMetricsForCustomMetrics(region, namespace); err != nil {
if namespaceMetrics, err = e.getMetricsForCustomMetrics(region, namespace, pluginCtx); err != nil {
return nil, errutil.Wrap("unable to call AWS API", err)
}
}
@@ -412,7 +412,7 @@ func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s
return result, nil
}
func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters *simplejson.Json, queryContext plugins.DataQuery) ([]suggestData, error) {
func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters *simplejson.Json, pluginCtx backend.PluginContext) ([]suggestData, error) {
region := parameters.Get("region").MustString()
namespace := parameters.Get("namespace").MustString()
@@ -424,7 +424,7 @@ func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters
}
} else {
var err error
if dimensionValues, err = e.getDimensionsForCustomMetrics(region, namespace); err != nil {
if dimensionValues, err = e.getDimensionsForCustomMetrics(region, namespace, pluginCtx); err != nil {
return nil, errutil.Wrap("unable to call AWS API", err)
}
}
@@ -438,7 +438,7 @@ func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters
return result, nil
}
func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, parameters *simplejson.Json, queryContext plugins.DataQuery) ([]suggestData, error) {
func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, parameters *simplejson.Json, pluginCtx backend.PluginContext) ([]suggestData, error) {
region := parameters.Get("region").MustString()
namespace := parameters.Get("namespace").MustString()
metricName := parameters.Get("metricName").MustString()
@@ -469,7 +469,7 @@ func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param
if metricName != "" {
params.MetricName = aws.String(metricName)
}
metrics, err := e.listMetrics(region, params)
metrics, err := e.listMetrics(region, params, pluginCtx)
if err != nil {
return nil, err
}
@@ -497,12 +497,12 @@ func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param
}
func (e *cloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, parameters *simplejson.Json,
queryContext plugins.DataQuery) ([]suggestData, error) {
pluginCtx backend.PluginContext) ([]suggestData, error) {
region := parameters.Get("region").MustString()
instanceId := parameters.Get("instanceId").MustString()
instanceIds := aws.StringSlice(parseMultiSelectValue(instanceId))
instances, err := e.ec2DescribeInstances(region, nil, instanceIds)
instances, err := e.ec2DescribeInstances(region, nil, instanceIds, pluginCtx)
if err != nil {
return nil, err
}
@@ -520,7 +520,7 @@ }
}
func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, parameters *simplejson.Json,
queryContext plugins.DataQuery) ([]suggestData, error) {
pluginCtx backend.PluginContext) ([]suggestData, error) {
region := parameters.Get("region").MustString()
attributeName := parameters.Get("attributeName").MustString()
filterJson := parameters.Get("filters").MustMap()
@@ -541,7 +541,7 @@ func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context,
}
}
instances, err := e.ec2DescribeInstances(region, filters, nil)
instances, err := e.ec2DescribeInstances(region, filters, nil, pluginCtx)
if err != nil {
return nil, err
}
@@ -600,7 +600,7 @@ func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context,
}
func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, parameters *simplejson.Json,
queryContext plugins.DataQuery) ([]suggestData, error) {
pluginCtx backend.PluginContext) ([]suggestData, error) {
region := parameters.Get("region").MustString()
resourceType := parameters.Get("resourceType").MustString()
filterJson := parameters.Get("tags").MustMap()
@ -624,7 +624,7 @@ func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, paramete
var resourceTypes []*string
resourceTypes = append(resourceTypes, &resourceType)
resources, err := e.resourceGroupsGetResources(region, filters, resourceTypes)
resources, err := e.resourceGroupsGetResources(region, filters, resourceTypes, pluginCtx)
if err != nil {
return nil, err
}
@ -638,14 +638,14 @@ func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, paramete
return result, nil
}
func (e *cloudWatchExecutor) listMetrics(region string, params *cloudwatch.ListMetricsInput) ([]*cloudwatch.Metric, error) {
client, err := e.getCWClient(region)
func (e *cloudWatchExecutor) listMetrics(region string, params *cloudwatch.ListMetricsInput, pluginCtx backend.PluginContext) ([]*cloudwatch.Metric, error) {
client, err := e.getCWClient(region, pluginCtx)
if err != nil {
return nil, err
}
plog.Debug("Listing metrics pages")
cloudWatchMetrics := []*cloudwatch.Metric{}
var cloudWatchMetrics []*cloudwatch.Metric
pageNum := 0
err = client.ListMetricsPages(params, func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool {
@ -663,13 +663,13 @@ func (e *cloudWatchExecutor) listMetrics(region string, params *cloudwatch.ListM
return cloudWatchMetrics, err
}
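
The pagination body is elided by the hunk boundary above. Its behavior, as exercised by TestQuery_ListMetricsPagination further down, is roughly the following sketch: keep requesting pages until AWS reports the last one or the configured AWSListMetricsPageLimit is reached.

```go
// Sketch of the elided callback: stop on the last page or once the
// configured page limit is hit, whichever comes first.
pageNum := 0
err = client.ListMetricsPages(params, func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool {
	pageNum++
	cloudWatchMetrics = append(cloudWatchMetrics, page.Metrics...)
	return !lastPage && pageNum < e.cfg.AWSListMetricsPageLimit
})
```
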
func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.Filter, instanceIds []*string) (*ec2.DescribeInstancesOutput, error) {
func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.Filter, instanceIds []*string, pluginCtx backend.PluginContext) (*ec2.DescribeInstancesOutput, error) {
params := &ec2.DescribeInstancesInput{
Filters: filters,
InstanceIds: instanceIds,
}
client, err := e.getEC2Client(region)
client, err := e.getEC2Client(region, pluginCtx)
if err != nil {
return nil, err
}
@ -686,13 +686,13 @@ func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.
}
func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters []*resourcegroupstaggingapi.TagFilter,
resourceTypes []*string) (*resourcegroupstaggingapi.GetResourcesOutput, error) {
resourceTypes []*string, pluginCtx backend.PluginContext) (*resourcegroupstaggingapi.GetResourcesOutput, error) {
params := &resourcegroupstaggingapi.GetResourcesInput{
ResourceTypeFilters: resourceTypes,
TagFilters: filters,
}
client, err := e.getRGTAClient(region)
client, err := e.getRGTAClient(region, pluginCtx)
if err != nil {
return nil, err
}
@ -711,90 +711,94 @@ func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters [
var metricsCacheLock sync.Mutex
func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region, namespace string) ([]string, error) {
func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region, namespace string, pluginCtx backend.PluginContext) ([]string, error) {
plog.Debug("Getting metrics for custom metrics", "region", region, "namespace", namespace)
metricsCacheLock.Lock()
defer metricsCacheLock.Unlock()
dsInfo := e.getAWSDatasourceSettings(region)
dsInfo, err := e.getDSInfo(pluginCtx)
if err != nil {
return nil, err
}
if _, ok := customMetricsMetricsMap[dsInfo.Profile]; !ok {
customMetricsMetricsMap[dsInfo.Profile] = make(map[string]map[string]*customMetricsCache)
if _, ok := customMetricsMetricsMap[dsInfo.profile]; !ok {
customMetricsMetricsMap[dsInfo.profile] = make(map[string]map[string]*customMetricsCache)
}
if _, ok := customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region]; !ok {
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region] = make(map[string]*customMetricsCache)
if _, ok := customMetricsMetricsMap[dsInfo.profile][dsInfo.region]; !ok {
customMetricsMetricsMap[dsInfo.profile][dsInfo.region] = make(map[string]*customMetricsCache)
}
if _, ok := customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace]; !ok {
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace] = &customMetricsCache{}
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0)
if _, ok := customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace]; !ok {
customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace] = &customMetricsCache{}
customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache = make([]string, 0)
}
if customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire.After(time.Now()) {
return customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil
if customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Expire.After(time.Now()) {
return customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache, nil
}
metrics, err := e.listMetrics(region, &cloudwatch.ListMetricsInput{
Namespace: aws.String(namespace),
})
}, pluginCtx)
if err != nil {
return []string{}, err
}
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0)
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire = time.Now().Add(5 * time.Minute)
customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache = make([]string, 0)
customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Expire = time.Now().Add(5 * time.Minute)
for _, metric := range metrics {
if isDuplicate(customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *metric.MetricName) {
if isDuplicate(customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache, *metric.MetricName) {
continue
}
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = append(
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *metric.MetricName)
customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache = append(
customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache, *metric.MetricName)
}
return customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil
return customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache, nil
}
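
Both custom-metrics helpers guard a process-wide, three-level cache (profile → region → namespace) with a five-minute TTL. The assumed shape of the cache they manipulate:

```go
// Assumed declarations (they live elsewhere in this file): one cache
// entry per namespace, refreshed whenever Expire has passed.
type customMetricsCache struct {
	Expire time.Time
	Cache  []string
}

var customMetricsMetricsMap = make(map[string]map[string]map[string]*customMetricsCache)
var customMetricsDimensionsMap = make(map[string]map[string]map[string]*customMetricsCache)
```
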
var dimensionsCacheLock sync.Mutex
func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region, namespace string) ([]string, error) {
func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region, namespace string, pluginCtx backend.PluginContext) ([]string, error) {
dimensionsCacheLock.Lock()
defer dimensionsCacheLock.Unlock()
dsInfo := e.getAWSDatasourceSettings(region)
dsInfo, err := e.getDSInfo(pluginCtx)
if err != nil {
return nil, err
}
if _, ok := customMetricsDimensionsMap[dsInfo.Profile]; !ok {
customMetricsDimensionsMap[dsInfo.Profile] = make(map[string]map[string]*customMetricsCache)
if _, ok := customMetricsDimensionsMap[dsInfo.profile]; !ok {
customMetricsDimensionsMap[dsInfo.profile] = make(map[string]map[string]*customMetricsCache)
}
if _, ok := customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region]; !ok {
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region] = make(map[string]*customMetricsCache)
if _, ok := customMetricsDimensionsMap[dsInfo.profile][dsInfo.region]; !ok {
customMetricsDimensionsMap[dsInfo.profile][dsInfo.region] = make(map[string]*customMetricsCache)
}
if _, ok := customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace]; !ok {
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace] = &customMetricsCache{}
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0)
if _, ok := customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace]; !ok {
customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace] = &customMetricsCache{}
customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache = make([]string, 0)
}
if customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire.After(time.Now()) {
return customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil
if customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Expire.After(time.Now()) {
return customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache, nil
}
metrics, err := e.listMetrics(region, &cloudwatch.ListMetricsInput{Namespace: aws.String(namespace)})
metrics, err := e.listMetrics(region, &cloudwatch.ListMetricsInput{Namespace: aws.String(namespace)}, pluginCtx)
if err != nil {
return []string{}, err
}
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0)
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire = time.Now().Add(5 * time.Minute)
customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache = make([]string, 0)
customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Expire = time.Now().Add(5 * time.Minute)
for _, metric := range metrics {
for _, dimension := range metric.Dimensions {
if isDuplicate(customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *dimension.Name) {
if isDuplicate(customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache, *dimension.Name) {
continue
}
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = append(
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *dimension.Name)
customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache = append(
customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache, *dimension.Name)
}
}
return customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil
return customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache, nil
}
func isDuplicate(nameList []string, target string) bool {

@ -2,6 +2,7 @@ package cloudwatch
import (
"context"
"encoding/json"
"testing"
"github.com/aws/aws-sdk-go/aws"
@ -13,8 +14,10 @@ import (
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -26,14 +29,14 @@ func TestQuery_Metrics(t *testing.T) {
NewCWClient = origNewCWClient
})
var client FakeCWClient
var cwClient FakeCWClient
NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
return client
return cwClient
}
t.Run("Custom metrics", func(t *testing.T) {
client = FakeCWClient{
cwClient = FakeCWClient{
Metrics: []*cloudwatch.Metric{
{
MetricName: aws.String("Test_MetricName"),
@ -45,52 +48,50 @@ func TestQuery_Metrics(t *testing.T) {
},
},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
JSON: json.RawMessage(`{
"type": "metricFindQuery",
"subtype": "metrics",
"region": "us-east-1",
"namespace": "custom",
}),
"namespace": "custom"
}`),
},
},
})
require.NoError(t, err)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Meta: simplejson.NewFromAny(map[string]interface{}{
"rowCount": 1,
}),
Tables: []plugins.DataTable{
{
Columns: []plugins.DataTableColumn{
{
Text: "text",
},
{
Text: "value",
},
},
Rows: []plugins.DataRowValues{
{
"Test_MetricName",
"Test_MetricName",
},
},
},
},
},
expFrame := data.NewFrame(
"",
data.NewField("text", nil, []string{"Test_MetricName"}),
data.NewField("value", nil, []string{"Test_MetricName"}),
)
expFrame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": 1,
},
}
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": {
Frames: data.Frames{expFrame},
},
},
}, resp)
})
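
The same stub instance manager is re-declared in every test below; a hypothetical helper it could be hoisted into (not part of this PR, just the shared pattern):

```go
// Hypothetical test helper: an instance manager whose factory returns
// zero-value settings, so getDSInfo succeeds without real configuration.
func newTestExecutor(t *testing.T, cfg *setting.Cfg) *cloudWatchExecutor {
	t.Helper()
	im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		return datasourceInfo{}, nil
	})
	return newExecutor(nil, im, cfg, fakeSessionCache{})
}
```
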
t.Run("Dimension keys for custom metrics", func(t *testing.T) {
client = FakeCWClient{
cwClient = FakeCWClient{
Metrics: []*cloudwatch.Metric{
{
MetricName: aws.String("Test_MetricName"),
@ -102,47 +103,44 @@ func TestQuery_Metrics(t *testing.T) {
},
},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
JSON: json.RawMessage(`{
"type": "metricFindQuery",
"subtype": "dimension_keys",
"region": "us-east-1",
"namespace": "custom",
}),
"namespace": "custom"
}`),
},
},
})
require.NoError(t, err)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Meta: simplejson.NewFromAny(map[string]interface{}{
"rowCount": 1,
}),
Tables: []plugins.DataTable{
{
Columns: []plugins.DataTableColumn{
{
Text: "text",
},
{
Text: "value",
},
},
Rows: []plugins.DataRowValues{
{
"Test_DimensionName",
"Test_DimensionName",
},
},
},
},
},
expFrame := data.NewFrame(
"",
data.NewField("text", nil, []string{"Test_DimensionName"}),
data.NewField("value", nil, []string{"Test_DimensionName"}),
)
expFrame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": 1,
},
}
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": {
Frames: data.Frames{expFrame},
},
},
}, resp)
})
}
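
Every metricFindQuery assertion in this file expects the same two-column text/value frame with a rowCount meta entry. The backend-side conversion presumably looks like this sketch; the suggestData field names follow this package, but the helper name is invented:

```go
// Sketch: fold suggestions into the text/value frame the tests assert on.
func suggestionsToFrame(suggestions []suggestData) *data.Frame {
	texts := make([]string, 0, len(suggestions))
	values := make([]string, 0, len(suggestions))
	for _, s := range suggestions {
		texts = append(texts, s.Text)
		values = append(values, s.Value)
	}
	frame := data.NewFrame("",
		data.NewField("text", nil, texts),
		data.NewField("value", nil, values),
	)
	frame.Meta = &data.FrameMeta{Custom: map[string]interface{}{"rowCount": len(suggestions)}}
	return frame
}
```
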
@ -164,53 +162,46 @@ func TestQuery_Regions(t *testing.T) {
cli = fakeEC2Client{
regions: []string{regionName},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
JSON: json.RawMessage(`{
"type": "metricFindQuery",
"subtype": "regions",
"region": "us-east-1",
"namespace": "custom",
}),
"namespace": "custom"
}`),
},
},
})
require.NoError(t, err)
rows := []plugins.DataRowValues{}
for _, region := range knownRegions {
rows = append(rows, []interface{}{
region,
region,
})
expRegions := append(knownRegions, regionName)
expFrame := data.NewFrame(
"",
data.NewField("text", nil, expRegions),
data.NewField("value", nil, expRegions),
)
expFrame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": len(knownRegions) + 1,
},
}
rows = append(rows, []interface{}{
regionName,
regionName,
})
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Meta: simplejson.NewFromAny(map[string]interface{}{
"rowCount": len(knownRegions) + 1,
}),
Tables: []plugins.DataTable{
{
Columns: []plugins.DataTableColumn{
{
Text: "text",
},
{
Text: "value",
},
},
Rows: rows,
},
},
},
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": {
Frames: data.Frames{expFrame},
},
},
}, resp)
})
}
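
A small Go footnote on expRegions above: append(knownRegions, regionName) only stays safe because a slice literal has no spare capacity, so the append must allocate; if knownRegions ever gained headroom, the append would write into the shared backing array across subtests. The defensive form copies first:

```go
// Defensive variant: never alias the package-level slice's backing array.
expRegions := make([]string, 0, len(knownRegions)+1)
expRegions = append(expRegions, knownRegions...)
expRegions = append(expRegions, regionName)
```
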
@ -246,50 +237,48 @@ func TestQuery_InstanceAttributes(t *testing.T) {
},
},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
JSON: json.RawMessage(`{
"type": "metricFindQuery",
"subtype": "ec2_instance_attribute",
"region": "us-east-1",
"attributeName": "InstanceId",
"filters": map[string]interface{}{
"tag:Environment": []string{"production"},
},
}),
"filters": {
"tag:Environment": ["production"]
}
}`),
},
},
})
require.NoError(t, err)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Meta: simplejson.NewFromAny(map[string]interface{}{
"rowCount": 1,
}),
Tables: []plugins.DataTable{
{
Columns: []plugins.DataTableColumn{
{
Text: "text",
},
{
Text: "value",
},
},
Rows: []plugins.DataRowValues{
{
instanceID,
instanceID,
},
},
},
},
},
expFrame := data.NewFrame(
"",
data.NewField("text", nil, []string{instanceID}),
data.NewField("value", nil, []string{instanceID}),
)
expFrame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": 1,
},
}
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": {
Frames: data.Frames{expFrame},
},
},
}, resp)
})
}
@ -307,8 +296,6 @@ func TestQuery_EBSVolumeIDs(t *testing.T) {
}
t.Run("", func(t *testing.T) {
const instanceIDs = "{i-1, i-2, i-3}"
cli = fakeEC2Client{
reservations: []*ec2.Reservation{
{
@ -349,67 +336,46 @@ func TestQuery_EBSVolumeIDs(t *testing.T) {
},
},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
JSON: json.RawMessage(`{
"type": "metricFindQuery",
"subtype": "ebs_volume_ids",
"region": "us-east-1",
"instanceId": instanceIDs,
}),
"instanceId": "{i-1, i-2, i-3}"
}`),
},
},
})
require.NoError(t, err)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Meta: simplejson.NewFromAny(map[string]interface{}{
"rowCount": 6,
}),
Tables: []plugins.DataTable{
{
Columns: []plugins.DataTableColumn{
{
Text: "text",
},
{
Text: "value",
},
},
Rows: []plugins.DataRowValues{
{
"vol-1-1",
"vol-1-1",
},
{
"vol-1-2",
"vol-1-2",
},
{
"vol-2-1",
"vol-2-1",
},
{
"vol-2-2",
"vol-2-2",
},
{
"vol-3-1",
"vol-3-1",
},
{
"vol-3-2",
"vol-3-2",
},
},
},
},
},
expValues := []string{"vol-1-1", "vol-1-2", "vol-2-1", "vol-2-2", "vol-3-1", "vol-3-2"}
expFrame := data.NewFrame(
"",
data.NewField("text", nil, expValues),
data.NewField("value", nil, expValues),
)
expFrame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": 6,
},
}
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": {
Frames: data.Frames{expFrame},
},
},
}, resp)
})
}
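
The "{i-1, i-2, i-3}" literal is Grafana's multi-value template syntax; parseMultiSelectValue, called by the EBS handler earlier in this diff, presumably unwraps it along these lines:

```go
// Assumed behavior, not the verbatim implementation: unwrap "{a, b, c}"
// into its comma-separated members, trimming whitespace.
func parseMultiSelectValue(input string) []string {
	trimmed := strings.TrimSpace(input)
	if strings.HasPrefix(trimmed, "{") && strings.HasSuffix(trimmed, "}") {
		values := strings.Split(strings.TrimPrefix(strings.TrimSuffix(trimmed, "}"), "{"), ",")
		for i, v := range values {
			values[i] = strings.TrimSpace(v)
		}
		return values
	}
	return []string{trimmed}
}
```
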
@ -449,54 +415,52 @@ func TestQuery_ResourceARNs(t *testing.T) {
},
},
}
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{})
resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
Model: simplejson.NewFromAny(map[string]interface{}{
JSON: json.RawMessage(`{
"type": "metricFindQuery",
"subtype": "resource_arns",
"region": "us-east-1",
"resourceType": "ec2:instance",
"tags": map[string]interface{}{
"Environment": []string{"production"},
},
}),
"tags": {
"Environment": ["production"]
}
}`),
},
},
})
require.NoError(t, err)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"": {
Meta: simplejson.NewFromAny(map[string]interface{}{
"rowCount": 2,
}),
Tables: []plugins.DataTable{
{
Columns: []plugins.DataTableColumn{
{
Text: "text",
},
{
Text: "value",
},
},
Rows: []plugins.DataRowValues{
{
"arn:aws:ec2:us-east-1:123456789012:instance/i-12345678901234567",
"arn:aws:ec2:us-east-1:123456789012:instance/i-12345678901234567",
},
{
"arn:aws:ec2:us-east-1:123456789012:instance/i-76543210987654321",
"arn:aws:ec2:us-east-1:123456789012:instance/i-76543210987654321",
},
},
},
},
},
expValues := []string{
"arn:aws:ec2:us-east-1:123456789012:instance/i-12345678901234567",
"arn:aws:ec2:us-east-1:123456789012:instance/i-76543210987654321",
}
expFrame := data.NewFrame(
"",
data.NewField("text", nil, expValues),
data.NewField("value", nil, expValues),
)
expFrame.Meta = &data.FrameMeta{
Custom: map[string]interface{}{
"rowCount": 2,
},
}
assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{
"": {
Frames: data.Frames{expFrame},
},
},
}, resp)
})
}
@ -528,9 +492,13 @@ func TestQuery_ListMetricsPagination(t *testing.T) {
t.Run("List Metrics and page limit is reached", func(t *testing.T) {
client = FakeCWClient{Metrics: metrics, MetricsPerPage: 2}
executor := newExecutor(nil, &setting.Cfg{AWSListMetricsPageLimit: 3, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}, fakeSessionCache{})
executor.DataSource = fakeDataSource()
response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{})
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, &setting.Cfg{AWSListMetricsPageLimit: 3, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}, fakeSessionCache{})
response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{}, backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
})
require.NoError(t, err)
expectedMetrics := client.MetricsPerPage * executor.cfg.AWSListMetricsPageLimit
@ -539,9 +507,13 @@ func TestQuery_ListMetricsPagination(t *testing.T) {
t.Run("List Metrics and page limit is not reached", func(t *testing.T) {
client = FakeCWClient{Metrics: metrics, MetricsPerPage: 2}
executor := newExecutor(nil, &setting.Cfg{AWSListMetricsPageLimit: 1000, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}, fakeSessionCache{})
executor.DataSource = fakeDataSource()
response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{})
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return datasourceInfo{}, nil
})
executor := newExecutor(nil, im, &setting.Cfg{AWSListMetricsPageLimit: 1000, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}, fakeSessionCache{})
response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{}, backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
})
require.NoError(t, err)
assert.Equal(t, len(metrics), len(response))

@ -8,8 +8,8 @@ import (
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/plugins"
)
// returns a map of queries with query id as key. In the case a q request query
@ -55,8 +55,7 @@ func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQ
return cloudwatchQueries, nil
}
func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResponses []*cloudwatchResponse,
requestQueries []*requestQuery, startTime time.Time, endTime time.Time) (map[string]plugins.DataQueryResult, error) {
func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResponses []*cloudwatchResponse, requestQueries []*requestQuery, startTime time.Time, endTime time.Time) (map[string]*backend.DataResponse, error) {
responsesByRefID := make(map[string][]*cloudwatchResponse)
refIDs := sort.StringSlice{}
for _, res := range cloudwatchResponses {
@ -66,23 +65,35 @@ func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResp
// Ensure stable results
refIDs.Sort()
results := make(map[string]plugins.DataQueryResult)
results := make(map[string]*backend.DataResponse)
for _, refID := range refIDs {
responses := responsesByRefID[refID]
queryResult := plugins.DataQueryResult{
RefID: refID,
Series: plugins.DataTimeSeriesSlice{},
}
queryResult := backend.DataResponse{}
frames := make(data.Frames, 0, len(responses))
requestExceededMaxLimit := false
partialData := false
executedQueries := []executedQuery{}
var executedQueries []executedQuery
for _, response := range responses {
frames = append(frames, response.DataFrames...)
requestExceededMaxLimit = requestExceededMaxLimit || response.RequestExceededMaxLimit
partialData = partialData || response.PartialData
if requestExceededMaxLimit {
frames[0].AppendNotices(data.Notice{
Severity: data.NoticeSeverityWarning,
Text: "cloudwatch GetMetricData error: Maximum number of allowed metrics exceeded. Your search may have been limited",
})
}
if partialData {
frames[0].AppendNotices(data.Notice{
Severity: data.NoticeSeverityWarning,
Text: "cloudwatch GetMetricData error: Too many datapoints requested - your search has been limited. Please try to reduce the time range",
})
}
executedQueries = append(executedQueries, executedQuery{
Expression: response.Expression,
ID: response.Id,
@ -94,13 +105,6 @@ func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResp
return frames[i].Name < frames[j].Name
})
if requestExceededMaxLimit {
queryResult.ErrorString = "Cloudwatch GetMetricData error: Maximum number of allowed metrics exceeded. Your search may have been limited."
}
if partialData {
queryResult.ErrorString = "Cloudwatch GetMetricData error: Too many datapoints requested - your search has been limited. Please try to reduce the time range"
}
eq, err := json.Marshal(executedQueries)
if err != nil {
return nil, fmt.Errorf("could not marshal executedString struct: %w", err)
@ -120,8 +124,12 @@ func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResp
}
for _, frame := range frames {
frame.Meta = &data.FrameMeta{
ExecutedQueryString: string(eq),
if frame.Meta != nil {
frame.Meta.ExecutedQueryString = string(eq)
} else {
frame.Meta = &data.FrameMeta{
ExecutedQueryString: string(eq),
}
}
if link == "" || len(frame.Fields) < 2 {
@ -135,8 +143,8 @@ func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResp
frame.Fields[1].Config.Links = createDataLinks(link)
}
queryResult.Dataframes = plugins.NewDecodedDataFrames(frames)
results[refID] = queryResult
queryResult.Frames = frames
results[refID] = &queryResult
}
return results, nil
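
The notices above replace the legacy queryResult.ErrorString: with the SDK, a throttled query can still return its partial frames, carrying the warning as frame metadata instead of failing the whole response. A standalone illustration:

```go
// A warning notice travels with the frame; the panel renders whatever data
// arrived and surfaces the notice in the UI.
frame := data.NewFrame("CPUUtilization")
frame.AppendNotices(data.Notice{
	Severity: data.NoticeSeverityWarning,
	Text:     "cloudwatch GetMetricData error: Maximum number of allowed metrics exceeded. Your search may have been limited",
})
```
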

@ -12,7 +12,7 @@ import (
)
func TestQueryTransformer(t *testing.T) {
executor := newExecutor(nil, &setting.Cfg{}, fakeSessionCache{})
executor := newExecutor(nil, nil, &setting.Cfg{}, fakeSessionCache{})
t.Run("One cloudwatchQuery is generated when its request query has one stat", func(t *testing.T) {
requestQueries := []*requestQuery{
{

@ -10,22 +10,26 @@ import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/plugins"
)
// Parses the json queries and returns a requestQuery. The requestQuery has a 1 to 1 mapping to a query editor row
func (e *cloudWatchExecutor) parseQueries(queryContext plugins.DataQuery, startTime time.Time,
endTime time.Time) (map[string][]*requestQuery, error) {
func (e *cloudWatchExecutor) parseQueries(req *backend.QueryDataRequest, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) {
requestQueries := make(map[string][]*requestQuery)
for i, query := range queryContext.Queries {
queryType := query.Model.Get("type").MustString()
for _, query := range req.Queries {
model, err := simplejson.NewJson(query.JSON)
if err != nil {
return nil, &queryError{err: err, RefID: query.RefID}
}
queryType := model.Get("type").MustString()
if queryType != "timeSeriesQuery" && queryType != "" {
continue
}
refID := query.RefID
query, err := parseRequestQuery(queryContext.Queries[i].Model, refID, startTime, endTime)
query, err := parseRequestQuery(model, refID, startTime, endTime)
if err != nil {
return nil, &queryError{err: err, RefID: refID}
}
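
queryError pairs a parse failure with the query's RefID so the error can be attributed to a single query editor row in the response. Its assumed shape:

```go
// Assumed shape (defined elsewhere in the package).
type queryError struct {
	err   error
	RefID string
}

func (e *queryError) Error() string {
	return fmt.Sprintf("error parsing query %q, %s", e.RefID, e.err)
}
```
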

@ -15,37 +15,9 @@ import (
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana/pkg/components/securejsondata"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/setting"
)
type fakeDataSourceCfg struct {
assumeRoleARN string
externalID string
}
func fakeDataSource(cfgs ...fakeDataSourceCfg) *models.DataSource {
jsonData := simplejson.New()
jsonData.Set("defaultRegion", defaultRegion)
jsonData.Set("authType", "default")
for _, cfg := range cfgs {
if cfg.assumeRoleARN != "" {
jsonData.Set("assumeRoleArn", cfg.assumeRoleARN)
}
if cfg.externalID != "" {
jsonData.Set("externalId", cfg.externalID)
}
}
return &models.DataSource{
Id: 1,
Database: "default",
JsonData: jsonData,
SecureJsonData: securejsondata.SecureJsonData{},
}
}
type FakeCWLogsClient struct {
cloudwatchlogsiface.CloudWatchLogsAPI
logGroups cloudwatchlogs.DescribeLogGroupsOutput

@ -4,127 +4,122 @@ import (
"context"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/util/errutil"
"golang.org/x/sync/errgroup"
)
func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext plugins.DataQuery) (
plugins.DataResponse, error) {
func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
plog.Debug("Executing time series query")
startTime, err := queryContext.TimeRange.ParseFrom()
if err != nil {
return plugins.DataResponse{}, errutil.Wrap("failed to parse start time", err)
}
endTime, err := queryContext.TimeRange.ParseTo()
if err != nil {
return plugins.DataResponse{}, errutil.Wrap("failed to parse end time", err)
}
if !startTime.Before(endTime) {
return plugins.DataResponse{}, fmt.Errorf("invalid time range: start time must be before end time")
}
requestQueriesByRegion, err := e.parseQueries(queryContext, startTime, endTime)
if err != nil {
return plugins.DataResponse{}, err
}
resp := backend.NewQueryDataResponse()
if len(requestQueriesByRegion) == 0 {
return plugins.DataResponse{
Results: make(map[string]plugins.DataQueryResult),
}, nil
}
for _, q := range req.Queries {
startTime := q.TimeRange.From
endTime := q.TimeRange.To
if !startTime.Before(endTime) {
return nil, fmt.Errorf("invalid time range: start time must be before end time")
}
requestQueriesByRegion, err := e.parseQueries(req, startTime, endTime)
if err != nil {
return nil, err
}
if len(requestQueriesByRegion) == 0 {
return backend.NewQueryDataResponse(), nil
}
resultChan := make(chan plugins.DataQueryResult, len(queryContext.Queries))
eg, ectx := errgroup.WithContext(ctx)
for r, q := range requestQueriesByRegion {
requestQueries := q
region := r
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok {
resultChan <- plugins.DataQueryResult{
Error: theErr,
resultChan := make(chan *backend.DataResponse, len(req.Queries))
eg, ectx := errgroup.WithContext(ctx)
for r, q := range requestQueriesByRegion {
requestQueries := q
region := r
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok {
resultChan <- &backend.DataResponse{
Error: theErr,
}
}
}
}()
client, err := e.getCWClient(region, req.PluginContext)
if err != nil {
return err
}
}()
client, err := e.getCWClient(region)
if err != nil {
return err
}
queries, err := e.transformRequestQueriesToCloudWatchQueries(requestQueries)
if err != nil {
for _, query := range requestQueries {
resultChan <- plugins.DataQueryResult{
RefID: query.RefId,
Error: err,
queries, err := e.transformRequestQueriesToCloudWatchQueries(requestQueries)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataResponse{
Frames: data.Frames{data.NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
return nil
}
metricDataInput, err := e.buildMetricDataInput(startTime, endTime, queries)
if err != nil {
return err
}
cloudwatchResponses := make([]*cloudwatchResponse, 0)
mdo, err := e.executeRequest(ectx, client, metricDataInput)
if err != nil {
for _, query := range requestQueries {
resultChan <- plugins.DataQueryResult{
RefID: query.RefId,
Error: err,
metricDataInput, err := e.buildMetricDataInput(startTime, endTime, queries)
if err != nil {
return err
}
cloudwatchResponses := make([]*cloudwatchResponse, 0)
mdo, err := e.executeRequest(ectx, client, metricDataInput)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataResponse{
Frames: data.Frames{data.NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
return nil
}
responses, err := e.parseResponse(mdo, queries)
if err != nil {
for _, query := range requestQueries {
resultChan <- plugins.DataQueryResult{
RefID: query.RefId,
Error: err,
responses, err := e.parseResponse(mdo, queries)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataResponse{
Frames: data.Frames{data.NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
return nil
}
cloudwatchResponses = append(cloudwatchResponses, responses...)
res, err := e.transformQueryResponsesToQueryResult(cloudwatchResponses, requestQueries, startTime, endTime)
if err != nil {
for _, query := range requestQueries {
resultChan <- plugins.DataQueryResult{
RefID: query.RefId,
Error: err,
cloudwatchResponses = append(cloudwatchResponses, responses...)
res, err := e.transformQueryResponsesToQueryResult(cloudwatchResponses, requestQueries, startTime, endTime)
if err != nil {
for _, query := range requestQueries {
resultChan <- &backend.DataResponse{
Frames: data.Frames{data.NewFrame(query.RefId)},
Error: err,
}
}
return nil
}
for _, queryRes := range res {
resultChan <- queryRes
}
return nil
}
})
}
for _, queryRes := range res {
resultChan <- queryRes
}
return nil
})
}
if err := eg.Wait(); err != nil {
return plugins.DataResponse{}, err
}
close(resultChan)
if err := eg.Wait(); err != nil {
return nil, err
}
close(resultChan)
results := plugins.DataResponse{
Results: make(map[string]plugins.DataQueryResult),
}
for result := range resultChan {
results.Results[result.RefID] = result
for result := range resultChan {
resp.Responses[q.RefID] = *result
}
}
return results, nil
return resp, nil
}
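
Stripped of CloudWatch specifics, the concurrency shape above is a plain errgroup fan-out keyed by region with a buffered channel for fan-in. A self-contained sketch, using only the backend and errgroup packages already imported above:

```go
// fanOut runs one goroutine per region and gathers every DataResponse.
// The channel is buffered to len(regions) so producers never block,
// which keeps close(resultChan) after eg.Wait() safe.
func fanOut(ctx context.Context, regions []string,
	run func(ctx context.Context, region string) *backend.DataResponse) ([]*backend.DataResponse, error) {
	resultChan := make(chan *backend.DataResponse, len(regions))
	eg, ectx := errgroup.WithContext(ctx)
	for _, r := range regions {
		region := r // capture a per-iteration copy (pre-Go 1.22 loop semantics)
		eg.Go(func() error {
			resultChan <- run(ectx, region)
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	close(resultChan)
	out := make([]*backend.DataResponse, 0, len(regions))
	for res := range resultChan {
		out = append(out, res)
	}
	return out, nil
}
```
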

@ -3,25 +3,30 @@ package cloudwatch
import (
"context"
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/plugins"
"github.com/stretchr/testify/assert"
)
func TestTimeSeriesQuery(t *testing.T) {
executor := newExecutor(nil, newTestConfig(), fakeSessionCache{})
executor := newExecutor(nil, nil, newTestConfig(), fakeSessionCache{})
now := time.Now()
t.Run("End time before start time should result in error", func(t *testing.T) {
timeRange := plugins.NewDataTimeRange("now-1h", "now-2h")
_, err := executor.executeTimeSeriesQuery(
context.TODO(), plugins.DataQuery{TimeRange: &timeRange})
_, err := executor.executeTimeSeriesQuery(context.TODO(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{
From: now.Add(time.Hour * -1),
To: now.Add(time.Hour * -2),
}}}})
assert.EqualError(t, err, "invalid time range: start time must be before end time")
})
t.Run("End time equals start time should result in error", func(t *testing.T) {
timeRange := plugins.NewDataTimeRange("now-1h", "now-1h")
_, err := executor.executeTimeSeriesQuery(
context.TODO(), plugins.DataQuery{TimeRange: &timeRange})
_, err := executor.executeTimeSeriesQuery(context.TODO(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{
From: now.Add(time.Hour * -1),
To: now.Add(time.Hour * -1),
}}}})
assert.EqualError(t, err, "invalid time range: start time must be before end time")
})
}

@ -60,7 +60,6 @@ func (s *Service) Init() error {
s.registry["postgres"] = s.PostgresService.NewExecutor
s.registry["mysql"] = mysql.NewExecutor
s.registry["elasticsearch"] = elasticsearch.NewExecutor
s.registry["cloudwatch"] = s.CloudWatchService.NewExecutor
s.registry["stackdriver"] = s.CloudMonitoringService.NewExecutor
s.registry["grafana-azure-monitor-datasource"] = s.AzureMonitorService.NewExecutor
s.registry["loki"] = loki.NewExecutor

@ -32,7 +32,9 @@ import {
LogRowModel,
rangeUtil,
ScopedVars,
TableData,
TimeRange,
toLegacyResponseData,
} from '@grafana/data';
import { notifyApp } from 'app/core/actions';
@ -65,7 +67,7 @@ import { AwsUrl, encodeUrl } from './aws_url';
import { increasingInterval } from './utils/rxjs/increasingInterval';
import config from 'app/core/config';
const TSDB_QUERY_ENDPOINT = '/api/tsdb/query';
const DS_QUERY_ENDPOINT = '/api/ds/query';
// Constants also defined in tsdb/cloudwatch/cloudwatch.go
const LOG_IDENTIFIER_INTERNAL = '__log__grafana_internal__';
@ -184,9 +186,10 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
queries: queryParams,
};
return this.awsRequest(TSDB_QUERY_ENDPOINT, requestParams).pipe(
return this.awsRequest(DS_QUERY_ENDPOINT, requestParams).pipe(
mergeMap((response: TSDBResponse) => {
const channelName: string = response.results['A'].meta.channelName;
const dataQueryResponse = toDataQueryResponse({ data: response }, options.targets);
const channelName: string = dataQueryResponse.data[0].meta.custom.channelName;
const channel = getGrafanaLiveSrv().getChannel({
scope: LiveChannelScope.Plugin,
namespace: 'cloudwatch',
@ -543,7 +546,7 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
}
performTimeSeriesQuery(request: MetricRequest, { from, to }: TimeRange): Observable<any> {
return this.awsRequest(TSDB_QUERY_ENDPOINT, request).pipe(
return this.awsRequest(DS_QUERY_ENDPOINT, request).pipe(
map((res) => {
const dataframes: DataFrame[] = toDataQueryResponse({ data: res }).data;
if (!dataframes || dataframes.length <= 0) {
@ -576,8 +579,11 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
);
}
transformSuggestDataFromTable(suggestData: TSDBResponse): Array<{ text: any; label: any; value: any }> {
return suggestData.results['metricFindQuery'].tables[0].rows.map(([text, value]) => ({
transformSuggestDataFromDataframes(suggestData: TSDBResponse): Array<{ text: any; label: any; value: any }> {
const frames = toDataQueryResponse({ data: suggestData }).data as DataFrame[];
const table = toLegacyResponseData(frames[0]) as TableData;
return table.rows.map(([text, value]) => ({
text,
value,
label: value,
@ -586,7 +592,7 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
doMetricQueryRequest(subtype: string, parameters: any): Promise<Array<{ text: any; label: any; value: any }>> {
const range = this.timeSrv.timeRange();
return this.awsRequest(TSDB_QUERY_ENDPOINT, {
return this.awsRequest(DS_QUERY_ENDPOINT, {
from: range.from.valueOf().toString(),
to: range.to.valueOf().toString(),
queries: [
@ -603,7 +609,7 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
})
.pipe(
map((r) => {
return this.transformSuggestDataFromTable(r);
return this.transformSuggestDataFromDataframes(r);
})
)
.toPromise();
@ -650,7 +656,7 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
const resultsToDataFrames = (val: any): DataFrame[] => toDataQueryResponse(val).data || [];
return this.awsRequest(TSDB_QUERY_ENDPOINT, requestParams).pipe(
return this.awsRequest(DS_QUERY_ENDPOINT, requestParams).pipe(
map((response) => resultsToDataFrames({ data: response })),
catchError((err) => {
if (err.data?.error) {
@ -835,7 +841,7 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
alarmNamePrefix: annotation.alarmNamePrefix || '',
};
return this.awsRequest(TSDB_QUERY_ENDPOINT, {
return this.awsRequest(DS_QUERY_ENDPOINT, {
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),
queries: [
@ -849,7 +855,9 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
})
.pipe(
map((r) => {
return r.results['annotationQuery'].tables[0].rows.map((v) => ({
const frames = toDataQueryResponse({ data: r }).data as DataFrame[];
const table = toLegacyResponseData(frames[0]) as TableData;
return table.rows.map((v) => ({
annotation: annotation,
time: Date.parse(v[0]),
title: v[1],
