diff --git a/pkg/tests/api/metrics/api_metrics_test.go b/pkg/tests/api/metrics/api_metrics_test.go index 7586ce2b0b7..20a25e7808d 100644 --- a/pkg/tests/api/metrics/api_metrics_test.go +++ b/pkg/tests/api/metrics/api_metrics_test.go @@ -67,36 +67,36 @@ func TestQueryCloudWatchMetrics(t *testing.T) { }), }, } - tr := makeCWRequest(t, req, addr) + result := makeCWRequest(t, req, addr) + + dataFrames := plugins.NewDecodedDataFrames(data.Frames{ + &data.Frame{ + RefID: "A", + Fields: []*data.Field{ + data.NewField("text", nil, []string{"Test_MetricName"}), + data.NewField("value", nil, []string{"Test_MetricName"}), + }, + Meta: &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": float64(1), + }, + }, + }, + }) + + // Have to call this so that dataFrames.encoded is non-nil, for the comparison + // In the future we should use gocmp instead and ignore this field + _, err := dataFrames.Encoded() + require.NoError(t, err) assert.Equal(t, plugins.DataResponse{ Results: map[string]plugins.DataQueryResult{ "A": { - RefID: "A", - Meta: simplejson.NewFromAny(map[string]interface{}{ - "rowCount": float64(1), - }), - Tables: []plugins.DataTable{ - { - Columns: []plugins.DataTableColumn{ - { - Text: "text", - }, - { - Text: "value", - }, - }, - Rows: []plugins.DataRowValues{ - { - "Test_MetricName", - "Test_MetricName", - }, - }, - }, - }, + RefID: "A", + Dataframes: dataFrames, }, }, - }, tr) + }, result) }) } @@ -132,7 +132,8 @@ func TestQueryCloudWatchLogs(t *testing.T) { dataFrames := plugins.NewDecodedDataFrames(data.Frames{ &data.Frame{ - Name: "logGroups", + Name: "logGroups", + RefID: "A", Fields: []*data.Field{ data.NewField("logGroupName", nil, []*string{}), }, diff --git a/pkg/tsdb/cloudwatch/annotation_query.go b/pkg/tsdb/cloudwatch/annotation_query.go index 06e9ad58436..1a427afaaeb 100644 --- a/pkg/tsdb/cloudwatch/annotation_query.go +++ b/pkg/tsdb/cloudwatch/annotation_query.go @@ -7,39 +7,34 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/util/errutil" ) -func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryContext plugins.DataQuery) ( - plugins.DataResponse, error) { - result := plugins.DataResponse{ - Results: make(map[string]plugins.DataQueryResult), - } - firstQuery := queryContext.Queries[0] - queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: firstQuery.RefID} - - parameters := firstQuery.Model - usePrefixMatch := parameters.Get("prefixMatching").MustBool(false) - region := parameters.Get("region").MustString("") - namespace := parameters.Get("namespace").MustString("") - metricName := parameters.Get("metricName").MustString("") - dimensions := parameters.Get("dimensions").MustMap() - statistics, err := parseStatistics(parameters) +func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, model *simplejson.Json, query backend.DataQuery, pluginCtx backend.PluginContext) (*backend.QueryDataResponse, error) { + result := backend.NewQueryDataResponse() + + usePrefixMatch := model.Get("prefixMatching").MustBool(false) + region := model.Get("region").MustString("") + namespace := model.Get("namespace").MustString("") + metricName := model.Get("metricName").MustString("") + dimensions := model.Get("dimensions").MustMap() + statistics, err := parseStatistics(model) if err != nil { - return plugins.DataResponse{}, err + return nil, err } - period := int64(parameters.Get("period").MustInt(0)) + period := int64(model.Get("period").MustInt(0)) if period == 0 && !usePrefixMatch { period = 300 } - actionPrefix := parameters.Get("actionPrefix").MustString("") - alarmNamePrefix := parameters.Get("alarmNamePrefix").MustString("") + actionPrefix := model.Get("actionPrefix").MustString("") + alarmNamePrefix := model.Get("alarmNamePrefix").MustString("") - cli, err := e.getCWClient(region) + cli, err := e.getCWClient(region, pluginCtx) if err != nil { - return plugins.DataResponse{}, err + return nil, err } var alarmNames []*string @@ -51,7 +46,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo } resp, err := cli.DescribeAlarms(params) if err != nil { - return plugins.DataResponse{}, errutil.Wrap("failed to call cloudwatch:DescribeAlarms", err) + return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarms", err) } alarmNames = filterAlarms(resp, namespace, metricName, dimensions, statistics, period) } else { @@ -82,7 +77,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo } resp, err := cli.DescribeAlarmsForMetric(params) if err != nil { - return plugins.DataResponse{}, errutil.Wrap("failed to call cloudwatch:DescribeAlarmsForMetric", err) + return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarmsForMetric", err) } for _, alarm := range resp.MetricAlarms { alarmNames = append(alarmNames, alarm.AlarmName) @@ -90,26 +85,17 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo } } - startTime, err := queryContext.TimeRange.ParseFrom() - if err != nil { - return plugins.DataResponse{}, err - } - endTime, err := queryContext.TimeRange.ParseTo() - if err != nil { - return plugins.DataResponse{}, err - } - annotations := make([]map[string]string, 0) for _, alarmName := range alarmNames { params := &cloudwatch.DescribeAlarmHistoryInput{ AlarmName: alarmName, - StartDate: aws.Time(startTime), - EndDate: aws.Time(endTime), + StartDate: aws.Time(query.TimeRange.From), + EndDate: aws.Time(query.TimeRange.To), MaxRecords: aws.Int64(100), } resp, err := cli.DescribeAlarmHistory(params) if err != nil { - return plugins.DataResponse{}, errutil.Wrap("failed to call cloudwatch:DescribeAlarmHistory", err) + return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarmHistory", err) } for _, history := range resp.AlarmHistoryItems { annotation := make(map[string]string) @@ -121,31 +107,32 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo } } - transformAnnotationToTable(annotations, &queryResult) - result.Results[firstQuery.RefID] = queryResult - return result, nil + respD := result.Responses[query.RefID] + respD.Frames = append(respD.Frames, transformAnnotationToTable(annotations, query)) + result.Responses[query.RefID] = respD + + return result, err } -func transformAnnotationToTable(data []map[string]string, result *plugins.DataQueryResult) { - table := plugins.DataTable{ - Columns: make([]plugins.DataTableColumn, 4), - Rows: make([]plugins.DataRowValues, 0), +func transformAnnotationToTable(annotations []map[string]string, query backend.DataQuery) *data.Frame { + frame := data.NewFrame(query.RefID, + data.NewField("time", nil, []string{}), + data.NewField("title", nil, []string{}), + data.NewField("tags", nil, []string{}), + data.NewField("text", nil, []string{}), + ) + + for _, a := range annotations { + frame.AppendRow(a["time"], a["title"], a["tags"], a["text"]) } - table.Columns[0].Text = "time" - table.Columns[1].Text = "title" - table.Columns[2].Text = "tags" - table.Columns[3].Text = "text" - - for _, r := range data { - values := make([]interface{}, 4) - values[0] = r["time"] - values[1] = r["title"] - values[2] = r["tags"] - values[3] = r["text"] - table.Rows = append(table.Rows, values) + + frame.Meta = &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": len(annotations), + }, } - result.Tables = append(result.Tables, table) - result.Meta.Set("rowCount", len(data)) + + return frame } func filterAlarms(alarms *cloudwatch.DescribeAlarmsOutput, namespace string, metricName string, diff --git a/pkg/tsdb/cloudwatch/cloudwatch.go b/pkg/tsdb/cloudwatch/cloudwatch.go index b8b6c6144c2..b4eee605ed6 100644 --- a/pkg/tsdb/cloudwatch/cloudwatch.go +++ b/pkg/tsdb/cloudwatch/cloudwatch.go @@ -2,13 +2,11 @@ package cloudwatch import ( "context" + "encoding/json" "fmt" "regexp" "time" - "github.com/grafana/grafana-aws-sdk/pkg/awsds" - "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/aws/aws-sdk-go/aws/client" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" @@ -20,14 +18,34 @@ import ( "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface" + "github.com/grafana/grafana-aws-sdk/pkg/awsds" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/setting" ) +type datasourceInfo struct { + profile string + region string + authType awsds.AuthType + assumeRoleARN string + externalID string + namespace string + endpoint string + + accessKey string + secretKey string + + datasourceID int64 +} + const cloudWatchTSFormat = "2006-01-02 15:04:05.000" const defaultRegion = "default" @@ -47,60 +65,133 @@ func init() { } type CloudWatchService struct { - LogsService *LogsService `inject:""` - Cfg *setting.Cfg `inject:""` - sessions SessionCache + LogsService *LogsService `inject:""` + BackendPluginManager backendplugin.Manager `inject:""` + Cfg *setting.Cfg `inject:""` } func (s *CloudWatchService) Init() error { - s.sessions = awsds.NewSessionCache() - return nil -} + plog.Debug("initing") + + im := datasource.NewInstanceManager(NewInstanceSettings()) + + factory := coreplugin.New(backend.ServeOpts{ + QueryDataHandler: newExecutor(s.LogsService, im, s.Cfg, awsds.NewSessionCache()), + }) -func (s *CloudWatchService) NewExecutor(*models.DataSource) (plugins.DataPlugin, error) { - return newExecutor(s.LogsService, s.Cfg, s.sessions), nil + if err := s.BackendPluginManager.Register("cloudwatch", factory); err != nil { + plog.Error("Failed to register plugin", "error", err) + } + return nil } type SessionCache interface { GetSession(region string, s awsds.AWSDatasourceSettings) (*session.Session, error) } -func newExecutor(logsService *LogsService, cfg *setting.Cfg, sessions SessionCache) *cloudWatchExecutor { +func newExecutor(logsService *LogsService, im instancemgmt.InstanceManager, cfg *setting.Cfg, sessions SessionCache) *cloudWatchExecutor { return &cloudWatchExecutor{ - cfg: cfg, logsService: logsService, + im: im, + cfg: cfg, sessions: sessions, } } +func NewInstanceSettings() datasource.InstanceFactoryFunc { + return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + var jsonData map[string]string + + err := json.Unmarshal(settings.JSONData, &jsonData) + if err != nil { + return nil, fmt.Errorf("error reading settings: %w", err) + } + + model := datasourceInfo{ + profile: jsonData["profile"], + region: jsonData["defaultRegion"], + assumeRoleARN: jsonData["assumeRoleArn"], + externalID: jsonData["externalId"], + endpoint: jsonData["endpoint"], + namespace: jsonData["customMetricsNamespaces"], + datasourceID: settings.ID, + } + + atStr := jsonData["authType"] + at := awsds.AuthTypeDefault + switch atStr { + case "credentials": + at = awsds.AuthTypeSharedCreds + case "keys": + at = awsds.AuthTypeKeys + case "default": + at = awsds.AuthTypeDefault + case "ec2_iam_role": + at = awsds.AuthTypeEC2IAMRole + case "arn": + at = awsds.AuthTypeDefault + plog.Warn("Authentication type \"arn\" is deprecated, falling back to default") + default: + plog.Warn("Unrecognized AWS authentication type", "type", atStr) + } + + model.authType = at + + if model.profile == "" { + model.profile = settings.Database // legacy support + } + + model.accessKey = settings.DecryptedSecureJSONData["accessKey"] + model.secretKey = settings.DecryptedSecureJSONData["secretKey"] + + return model, nil + } +} + // cloudWatchExecutor executes CloudWatch requests. type cloudWatchExecutor struct { - *models.DataSource - ec2Client ec2iface.EC2API rgtaClient resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI logsService *LogsService + im instancemgmt.InstanceManager cfg *setting.Cfg sessions SessionCache } -func (e *cloudWatchExecutor) newSession(region string) (*session.Session, error) { - awsDatasourceSettings := e.getAWSDatasourceSettings(region) +func (e *cloudWatchExecutor) newSession(region string, pluginCtx backend.PluginContext) (*session.Session, error) { + dsInfo, err := e.getDSInfo(pluginCtx) + if err != nil { + return nil, err + } + + if region == defaultRegion { + region = dsInfo.region + } - return e.sessions.GetSession(region, *awsDatasourceSettings) + return e.sessions.GetSession(region, awsds.AWSDatasourceSettings{ + Profile: dsInfo.profile, + Region: region, + AuthType: dsInfo.authType, + AssumeRoleARN: dsInfo.assumeRoleARN, + ExternalID: dsInfo.externalID, + Endpoint: dsInfo.endpoint, + DefaultRegion: dsInfo.region, + AccessKey: dsInfo.accessKey, + SecretKey: dsInfo.secretKey, + }) } -func (e *cloudWatchExecutor) getCWClient(region string) (cloudwatchiface.CloudWatchAPI, error) { - sess, err := e.newSession(region) +func (e *cloudWatchExecutor) getCWClient(region string, pluginCtx backend.PluginContext) (cloudwatchiface.CloudWatchAPI, error) { + sess, err := e.newSession(region, pluginCtx) if err != nil { return nil, err } return NewCWClient(sess), nil } -func (e *cloudWatchExecutor) getCWLogsClient(region string) (cloudwatchlogsiface.CloudWatchLogsAPI, error) { - sess, err := e.newSession(region) +func (e *cloudWatchExecutor) getCWLogsClient(region string, pluginCtx backend.PluginContext) (cloudwatchlogsiface.CloudWatchLogsAPI, error) { + sess, err := e.newSession(region, pluginCtx) if err != nil { return nil, err } @@ -110,12 +201,12 @@ func (e *cloudWatchExecutor) getCWLogsClient(region string) (cloudwatchlogsiface return logsClient, nil } -func (e *cloudWatchExecutor) getEC2Client(region string) (ec2iface.EC2API, error) { +func (e *cloudWatchExecutor) getEC2Client(region string, pluginCtx backend.PluginContext) (ec2iface.EC2API, error) { if e.ec2Client != nil { return e.ec2Client, nil } - sess, err := e.newSession(region) + sess, err := e.newSession(region, pluginCtx) if err != nil { return nil, err } @@ -124,13 +215,13 @@ func (e *cloudWatchExecutor) getEC2Client(region string) (ec2iface.EC2API, error return e.ec2Client, nil } -func (e *cloudWatchExecutor) getRGTAClient(region string) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI, +func (e *cloudWatchExecutor) getRGTAClient(region string, pluginCtx backend.PluginContext) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI, error) { if e.rgtaClient != nil { return e.rgtaClient, nil } - sess, err := e.newSession(region) + sess, err := e.newSession(region, pluginCtx) if err != nil { return nil, err } @@ -140,18 +231,17 @@ func (e *cloudWatchExecutor) getRGTAClient(region string) (resourcegroupstagging } func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, - queryContext plugins.DataQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) { + queryContext backend.DataQuery, model *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) { const maxAttempts = 8 const pollPeriod = 1000 * time.Millisecond - queryParams := queryContext.Queries[0].Model - startQueryOutput, err := e.executeStartQuery(ctx, logsClient, queryParams, *queryContext.TimeRange) + startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, queryContext.TimeRange) if err != nil { return nil, err } requestParams := simplejson.NewFromAny(map[string]interface{}{ - "region": queryParams.Get("region").MustString(""), + "region": model.Get("region").MustString(""), "queryId": *startQueryOutput.QueryId, }) @@ -177,11 +267,7 @@ func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwat return nil, nil } -// DataQuery executes a CloudWatch query. -func (e *cloudWatchExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource, - queryContext plugins.DataQuery) (plugins.DataResponse, error) { - e.DataSource = dsInfo - +func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { /* Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response to the query, but rather an ID is first returned. Following this, a client is expected to send requests along @@ -189,146 +275,111 @@ func (e *cloudWatchExecutor) DataQuery(ctx context.Context, dsInfo *models.DataS queries made via dashboards and Explore, the logic of making these repeated queries is handled on the frontend, but because alerts are executed on the backend the logic needs to be reimplemented here. */ - queryParams := queryContext.Queries[0].Model - _, fromAlert := queryContext.Headers["FromAlert"] - isLogAlertQuery := fromAlert && queryParams.Get("queryMode").MustString("") == "Logs" + q := req.Queries[0] + model, err := simplejson.NewJson(q.JSON) + if err != nil { + return nil, err + } + _, fromAlert := req.Headers["FromAlert"] + isLogAlertQuery := fromAlert && model.Get("queryMode").MustString("") == "Logs" if isLogAlertQuery { - return e.executeLogAlertQuery(ctx, queryContext) + return e.executeLogAlertQuery(ctx, req) } - queryType := queryParams.Get("type").MustString("") + queryType := model.Get("type").MustString("") - var err error - var result plugins.DataResponse + var result *backend.QueryDataResponse switch queryType { case "metricFindQuery": - result, err = e.executeMetricFindQuery(ctx, queryContext) + result, err = e.executeMetricFindQuery(ctx, model, q, req.PluginContext) case "annotationQuery": - result, err = e.executeAnnotationQuery(ctx, queryContext) + result, err = e.executeAnnotationQuery(ctx, model, q, req.PluginContext) case "logAction": - result, err = e.executeLogActions(ctx, queryContext) + result, err = e.executeLogActions(ctx, req) case "liveLogAction": - result, err = e.executeLiveLogQuery(ctx, queryContext) + result, err = e.executeLiveLogQuery(ctx, req) case "timeSeriesQuery": fallthrough default: - result, err = e.executeTimeSeriesQuery(ctx, queryContext) + result, err = e.executeTimeSeriesQuery(ctx, req) } return result, err } -func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryContext plugins.DataQuery) ( - plugins.DataResponse, error) { - queryParams := queryContext.Queries[0].Model - queryParams.Set("subtype", "StartQuery") - queryParams.Set("queryString", queryParams.Get("expression").MustString("")) +func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + resp := backend.NewQueryDataResponse() - region := queryParams.Get("region").MustString(defaultRegion) - if region == defaultRegion { - region = e.DataSource.JsonData.Get("defaultRegion").MustString() - queryParams.Set("region", region) - } + for _, q := range req.Queries { + model, err := simplejson.NewJson(q.JSON) + if err != nil { + continue + } - logsClient, err := e.getCWLogsClient(region) - if err != nil { - return plugins.DataResponse{}, err - } + model.Set("subtype", "StartQuery") + model.Set("queryString", model.Get("expression").MustString("")) - result, err := e.executeStartQuery(ctx, logsClient, queryParams, *queryContext.TimeRange) - if err != nil { - return plugins.DataResponse{}, err - } + region := model.Get("region").MustString(defaultRegion) + if region == defaultRegion { + dsInfo, err := e.getDSInfo(req.PluginContext) + if err != nil { + return nil, err + } + model.Set("region", dsInfo.region) + } - queryParams.Set("queryId", *result.QueryId) + logsClient, err := e.getCWLogsClient(region, req.PluginContext) + if err != nil { + return nil, err + } - // Get query results - getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, queryContext) - if err != nil { - return plugins.DataResponse{}, err - } + result, err := e.executeStartQuery(ctx, logsClient, model, q.TimeRange) + if err != nil { + return nil, err + } - dataframe, err := logsResultsToDataframes(getQueryResultsOutput) - if err != nil { - return plugins.DataResponse{}, err - } + model.Set("queryId", *result.QueryId) - statsGroups := queryParams.Get("statsGroups").MustStringArray() - if len(statsGroups) > 0 && len(dataframe.Fields) > 0 { - groupedFrames, err := groupResults(dataframe, statsGroups) + getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, model) if err != nil { - return plugins.DataResponse{}, err + return nil, err } - response := plugins.DataResponse{ - Results: make(map[string]plugins.DataQueryResult), + dataframe, err := logsResultsToDataframes(getQueryResultsOutput) + if err != nil { + return nil, err } - response.Results["A"] = plugins.DataQueryResult{ - RefID: "A", - Dataframes: plugins.NewDecodedDataFrames(groupedFrames), + var frames []*data.Frame + + statsGroups := model.Get("statsGroups").MustStringArray() + if len(statsGroups) > 0 && len(dataframe.Fields) > 0 { + frames, err = groupResults(dataframe, statsGroups) + if err != nil { + return nil, err + } + } else { + frames = data.Frames{dataframe} } - return response, nil + respD := resp.Responses["A"] + respD.Frames = frames + resp.Responses["A"] = respD } - response := plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "A": { - RefID: "A", - Dataframes: plugins.NewDecodedDataFrames(data.Frames{dataframe}), - }, - }, - } - return response, nil + return resp, nil } -func (e *cloudWatchExecutor) getAWSDatasourceSettings(region string) *awsds.AWSDatasourceSettings { - if region == defaultRegion { - region = e.DataSource.JsonData.Get("defaultRegion").MustString() - } - - atStr := e.DataSource.JsonData.Get("authType").MustString() - assumeRoleARN := e.DataSource.JsonData.Get("assumeRoleArn").MustString() - externalID := e.DataSource.JsonData.Get("externalId").MustString() - endpoint := e.DataSource.JsonData.Get("endpoint").MustString() - decrypted := e.DataSource.DecryptedValues() - accessKey := decrypted["accessKey"] - secretKey := decrypted["secretKey"] - - at := awsds.AuthTypeDefault - switch atStr { - case "credentials": - at = awsds.AuthTypeSharedCreds - case "keys": - at = awsds.AuthTypeKeys - case "default": - at = awsds.AuthTypeDefault - case "arn": - at = awsds.AuthTypeDefault - plog.Warn("Authentication type \"arn\" is deprecated, falling back to default") - case "ec2_iam_role": - at = awsds.AuthTypeEC2IAMRole - default: - plog.Warn("Unrecognized AWS authentication type", "type", atStr) +func (e *cloudWatchExecutor) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) { + i, err := e.im.Get(pluginCtx) + if err != nil { + return nil, err } - profile := e.DataSource.JsonData.Get("profile").MustString() - if profile == "" { - profile = e.DataSource.Database // legacy support - } + instance := i.(datasourceInfo) - return &awsds.AWSDatasourceSettings{ - Region: region, - Profile: profile, - AuthType: at, - AssumeRoleARN: assumeRoleARN, - ExternalID: externalID, - AccessKey: accessKey, - SecretKey: secretKey, - Endpoint: endpoint, - } + return &instance, nil } func isTerminated(queryStatus string) bool { diff --git a/pkg/tsdb/cloudwatch/live.go b/pkg/tsdb/cloudwatch/live.go index 6397bcb3ae0..b2659de0b67 100644 --- a/pkg/tsdb/cloudwatch/live.go +++ b/pkg/tsdb/cloudwatch/live.go @@ -15,6 +15,7 @@ import ( "github.com/aws/aws-sdk-go/service/servicequotas/servicequotasiface" "github.com/centrifugal/centrifuge" "github.com/google/uuid" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/models" @@ -108,24 +109,24 @@ func (r *logQueryRunner) publishResults(channelName string) error { // executeLiveLogQuery executes a CloudWatch Logs query with live updates over WebSocket. // A WebSocket channel is created, which goroutines send responses over. -func (e *cloudWatchExecutor) executeLiveLogQuery(ctx context.Context, queryContext plugins.DataQuery) ( - plugins.DataResponse, error) { +func (e *cloudWatchExecutor) executeLiveLogQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { responseChannelName := uuid.New().String() responseChannel := make(chan plugins.DataResponse) if err := e.logsService.AddResponseChannel("plugin/cloudwatch/"+responseChannelName, responseChannel); err != nil { close(responseChannel) - return plugins.DataResponse{}, err + return nil, err } - go e.sendLiveQueriesToChannel(queryContext, responseChannel) + go e.sendLiveQueriesToChannel(req, responseChannel) - response := plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ + response := &backend.QueryDataResponse{ + Responses: backend.Responses{ "A": { - RefID: "A", - Meta: simplejson.NewFromAny(map[string]interface{}{ - "channelName": responseChannelName, - }), + Frames: data.Frames{data.NewFrame("A").SetMeta(&data.FrameMeta{ + Custom: map[string]interface{}{ + "channelName": responseChannelName, + }, + })}, }, }, } @@ -133,18 +134,17 @@ func (e *cloudWatchExecutor) executeLiveLogQuery(ctx context.Context, queryConte return response, nil } -func (e *cloudWatchExecutor) sendLiveQueriesToChannel(queryContext plugins.DataQuery, - responseChannel chan plugins.DataResponse) { +func (e *cloudWatchExecutor) sendLiveQueriesToChannel(req *backend.QueryDataRequest, responseChannel chan plugins.DataResponse) { defer close(responseChannel) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) defer cancel() eg, ectx := errgroup.WithContext(ctx) - for _, query := range queryContext.Queries { + for _, query := range req.Queries { query := query eg.Go(func() error { - return e.startLiveQuery(ectx, responseChannel, query, *queryContext.TimeRange) + return e.startLiveQuery(ectx, responseChannel, query, query.TimeRange, req.PluginContext) }) } @@ -153,7 +153,7 @@ func (e *cloudWatchExecutor) sendLiveQueriesToChannel(queryContext plugins.DataQ } } -func (e *cloudWatchExecutor) getQueue(queueKey string) (chan bool, error) { +func (e *cloudWatchExecutor) getQueue(queueKey string, pluginCtx backend.PluginContext) (chan bool, error) { e.logsService.queueLock.Lock() defer e.logsService.queueLock.Unlock() @@ -161,7 +161,7 @@ func (e *cloudWatchExecutor) getQueue(queueKey string) (chan bool, error) { return queue, nil } - concurrentQueriesQuota := e.fetchConcurrentQueriesQuota(queueKey) + concurrentQueriesQuota := e.fetchConcurrentQueriesQuota(queueKey, pluginCtx) queueChannel := make(chan bool, concurrentQueriesQuota) e.logsService.queues[queueKey] = queueChannel @@ -169,8 +169,8 @@ func (e *cloudWatchExecutor) getQueue(queueKey string) (chan bool, error) { return queueChannel, nil } -func (e *cloudWatchExecutor) fetchConcurrentQueriesQuota(region string) int { - sess, err := e.newSession(region) +func (e *cloudWatchExecutor) fetchConcurrentQueriesQuota(region string, pluginCtx backend.PluginContext) int { + sess, err := e.newSession(region, pluginCtx) if err != nil { plog.Warn("Could not get service quota client") return defaultConcurrentQueries @@ -211,17 +211,25 @@ func (e *cloudWatchExecutor) fetchConcurrentQueriesQuota(region string) int { return defaultConcurrentQueries } -func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel chan plugins.DataResponse, - query plugins.DataSubQuery, timeRange plugins.DataTimeRange) error { - defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString() - parameters := query.Model - region := parameters.Get("region").MustString(defaultRegion) - logsClient, err := e.getCWLogsClient(region) +func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel chan plugins.DataResponse, query backend.DataQuery, timeRange backend.TimeRange, pluginCtx backend.PluginContext) error { + model, err := simplejson.NewJson(query.JSON) if err != nil { return err } - queue, err := e.getQueue(fmt.Sprintf("%s-%d", region, e.DataSource.Id)) + dsInfo, err := e.getDSInfo(pluginCtx) + if err != nil { + return err + } + + defaultRegion := dsInfo.region + region := model.Get("region").MustString(defaultRegion) + logsClient, err := e.getCWLogsClient(region, pluginCtx) + if err != nil { + return err + } + + queue, err := e.getQueue(fmt.Sprintf("%s-%d", region, dsInfo.datasourceID), pluginCtx) if err != nil { return err } @@ -230,7 +238,7 @@ func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel queue <- true defer func() { <-queue }() - startQueryOutput, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange) + startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, timeRange) if err != nil { return err } @@ -265,7 +273,7 @@ func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel // Because of this, if the frontend sees that a "stats ... by ..." query is being made // the "statsGroups" parameter is sent along with the query to the backend so that we // can correctly group the CloudWatch logs response. - statsGroups := parameters.Get("statsGroups").MustStringArray() + statsGroups := model.Get("statsGroups").MustStringArray() if len(statsGroups) > 0 && len(dataFrame.Fields) > 0 { groupedFrames, err := groupResults(dataFrame, statsGroups) if err != nil { diff --git a/pkg/tsdb/cloudwatch/log_actions.go b/pkg/tsdb/cloudwatch/log_actions.go index bb3635950ba..99bb4fd44be 100644 --- a/pkg/tsdb/cloudwatch/log_actions.go +++ b/pkg/tsdb/cloudwatch/log_actions.go @@ -10,21 +10,27 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" "golang.org/x/sync/errgroup" ) -func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, - queryContext plugins.DataQuery) (plugins.DataResponse, error) { - resultChan := make(chan plugins.DataQueryResult, len(queryContext.Queries)) +func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + resp := backend.NewQueryDataResponse() + + resultChan := make(chan backend.Responses, len(req.Queries)) eg, ectx := errgroup.WithContext(ctx) - for _, query := range queryContext.Queries { + for _, query := range req.Queries { + model, err := simplejson.NewJson(query.JSON) + if err != nil { + return nil, err + } + query := query eg.Go(func() error { - dataframe, err := e.executeLogAction(ectx, queryContext, query) + dataframe, err := e.executeLogAction(ectx, model, query, req.PluginContext) if err != nil { return err } @@ -36,16 +42,15 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, // Because of this, if the frontend sees that a "stats ... by ..." query is being made // the "statsGroups" parameter is sent along with the query to the backend so that we // can correctly group the CloudWatch logs response. - statsGroups := query.Model.Get("statsGroups").MustStringArray() + statsGroups := model.Get("statsGroups").MustStringArray() if len(statsGroups) > 0 && len(dataframe.Fields) > 0 { groupedFrames, err := groupResults(dataframe, statsGroups) if err != nil { return err } - resultChan <- plugins.DataQueryResult{ - RefID: query.RefID, - Dataframes: plugins.NewDecodedDataFrames(groupedFrames), + resultChan <- backend.Responses{ + query.RefID: backend.DataResponse{Frames: groupedFrames}, } return nil } @@ -58,37 +63,41 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, } } - resultChan <- plugins.DataQueryResult{ - RefID: query.RefID, - Dataframes: plugins.NewDecodedDataFrames(data.Frames{dataframe}), + resultChan <- backend.Responses{ + query.RefID: backend.DataResponse{Frames: data.Frames{dataframe}}, } return nil }) } if err := eg.Wait(); err != nil { - return plugins.DataResponse{}, err + return nil, err } close(resultChan) - response := plugins.DataResponse{ - Results: make(map[string]plugins.DataQueryResult), - } for result := range resultChan { - response.Results[result.RefID] = result + for refID, response := range result { + respD := resp.Responses[refID] + respD.Frames = response.Frames + resp.Responses[refID] = respD + } } - return response, nil + return resp, nil } -func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, queryContext plugins.DataQuery, - query plugins.DataSubQuery) (*data.Frame, error) { - parameters := query.Model - subType := query.Model.Get("subtype").MustString() +func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, model *simplejson.Json, query backend.DataQuery, pluginCtx backend.PluginContext) (*data.Frame, error) { + subType := model.Get("subtype").MustString() + + dsInfo, err := e.getDSInfo(pluginCtx) + if err != nil { + return nil, err + } + + defaultRegion := dsInfo.region - defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString() - region := parameters.Get("region").MustString(defaultRegion) - logsClient, err := e.getCWLogsClient(region) + region := model.Get("region").MustString(defaultRegion) + logsClient, err := e.getCWLogsClient(region, pluginCtx) if err != nil { return nil, err } @@ -97,17 +106,17 @@ func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, queryContext switch subType { case "DescribeLogGroups": - data, err = e.handleDescribeLogGroups(ctx, logsClient, parameters) + data, err = e.handleDescribeLogGroups(ctx, logsClient, model) case "GetLogGroupFields": - data, err = e.handleGetLogGroupFields(ctx, logsClient, parameters, query.RefID) + data, err = e.handleGetLogGroupFields(ctx, logsClient, model, query.RefID) case "StartQuery": - data, err = e.handleStartQuery(ctx, logsClient, parameters, *queryContext.TimeRange, query.RefID) + data, err = e.handleStartQuery(ctx, logsClient, model, query.TimeRange, query.RefID) case "StopQuery": - data, err = e.handleStopQuery(ctx, logsClient, parameters) + data, err = e.handleStopQuery(ctx, logsClient, model) case "GetQueryResults": - data, err = e.handleGetQueryResults(ctx, logsClient, parameters, query.RefID) + data, err = e.handleGetQueryResults(ctx, logsClient, model, query.RefID) case "GetLogEvents": - data, err = e.handleGetLogEvents(ctx, logsClient, parameters) + data, err = e.handleGetLogEvents(ctx, logsClient, model) } if err != nil { return nil, err @@ -200,16 +209,9 @@ func (e *cloudWatchExecutor) handleDescribeLogGroups(ctx context.Context, } func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, - parameters *simplejson.Json, timeRange plugins.DataTimeRange) (*cloudwatchlogs.StartQueryOutput, error) { - startTime, err := timeRange.ParseFrom() - if err != nil { - return nil, err - } - - endTime, err := timeRange.ParseTo() - if err != nil { - return nil, err - } + parameters *simplejson.Json, timeRange backend.TimeRange) (*cloudwatchlogs.StartQueryOutput, error) { + startTime := timeRange.From + endTime := timeRange.To if !startTime.Before(endTime) { return nil, fmt.Errorf("invalid time range: start time must be before end time") @@ -237,8 +239,8 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c } func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, - parameters *simplejson.Json, timeRange plugins.DataTimeRange, refID string) (*data.Frame, error) { - startQueryResponse, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange) + model *simplejson.Json, timeRange backend.TimeRange, refID string) (*data.Frame, error) { + startQueryResponse, err := e.executeStartQuery(ctx, logsClient, model, timeRange) if err != nil { return nil, err } @@ -246,7 +248,7 @@ func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cl dataFrame := data.NewFrame(refID, data.NewField("queryId", nil, []string{*startQueryResponse.QueryId})) dataFrame.RefID = refID - clientRegion := parameters.Get("region").MustString("default") + clientRegion := model.Get("region").MustString("default") dataFrame.Meta = &data.FrameMeta{ Custom: map[string]interface{}{ diff --git a/pkg/tsdb/cloudwatch/log_actions_test.go b/pkg/tsdb/cloudwatch/log_actions_test.go index 66a4987814a..afce302d9d1 100644 --- a/pkg/tsdb/cloudwatch/log_actions_test.go +++ b/pkg/tsdb/cloudwatch/log_actions_test.go @@ -2,6 +2,7 @@ package cloudwatch import ( "context" + "encoding/json" "fmt" "testing" "time" @@ -10,9 +11,10 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -47,39 +49,45 @@ func TestQuery_DescribeLogGroups(t *testing.T) { }, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + JSON: json.RawMessage(`{ "type": "logAction", "subtype": "DescribeLogGroups", - "limit": 50, - }), + "limit": 50 + }`), }, }, }) require.NoError(t, err) require.NotNil(t, resp) - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Dataframes: plugins.NewDecodedDataFrames(data.Frames{ - &data.Frame{ - Name: "logGroups", - Fields: []*data.Field{ - data.NewField("logGroupName", nil, []*string{ - aws.String("group_a"), aws.String("group_b"), aws.String("group_c"), - }), - }, - Meta: &data.FrameMeta{ - PreferredVisualization: "logs", - }, + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": backend.DataResponse{ + Frames: data.Frames{ + &data.Frame{ + Name: "logGroups", + Fields: []*data.Field{ + data.NewField("logGroupName", nil, []*string{ + aws.String("group_a"), aws.String("group_b"), aws.String("group_c"), + }), }, - }), + Meta: &data.FrameMeta{ + PreferredVisualization: "logs", + }, + }, }, }, + }, }, resp) }) @@ -100,39 +108,47 @@ func TestQuery_DescribeLogGroups(t *testing.T) { }, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ - "type": "logAction", - "subtype": "DescribeLogGroups", - "logGroupNamePrefix": "g", - }), + JSON: json.RawMessage(`{ + "type": "logAction", + "subtype": "DescribeLogGroups", + "limit": 50, + "region": "default", + "logGroupNamePrefix": "g" + }`), }, }, }) require.NoError(t, err) require.NotNil(t, resp) - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Dataframes: plugins.NewDecodedDataFrames(data.Frames{ - &data.Frame{ - Name: "logGroups", - Fields: []*data.Field{ - data.NewField("logGroupName", nil, []*string{ - aws.String("group_a"), aws.String("group_b"), aws.String("group_c"), - }), - }, - Meta: &data.FrameMeta{ - PreferredVisualization: "logs", - }, + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": backend.DataResponse{ + Frames: data.Frames{ + &data.Frame{ + Name: "logGroups", + Fields: []*data.Field{ + data.NewField("logGroupName", nil, []*string{ + aws.String("group_a"), aws.String("group_b"), aws.String("group_c"), + }), + }, + Meta: &data.FrameMeta{ + PreferredVisualization: "logs", }, - }), + }, }, }, + }, }, resp) }) } @@ -170,17 +186,24 @@ func TestQuery_GetLogGroupFields(t *testing.T) { const refID = "A" - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { RefID: refID, - Model: simplejson.NewFromAny(map[string]interface{}{ - "type": "logAction", - "subtype": "GetLogGroupFields", + JSON: json.RawMessage(`{ + "type": "logAction", + "subtype": "GetLogGroupFields", "logGroupName": "group_a", - "limit": 50, - }), + "limit": 50 + }`), }, }, }) @@ -202,13 +225,11 @@ func TestQuery_GetLogGroupFields(t *testing.T) { }, } expFrame.RefID = refID - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - refID: { - Dataframes: plugins.NewDecodedDataFrames(data.Frames{expFrame}), - RefID: refID, - }, + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + refID: backend.DataResponse{ + Frames: data.Frames{expFrame}, }, + }, }, resp) } @@ -244,23 +265,30 @@ func TestQuery_StartQuery(t *testing.T) { }, } - timeRange := plugins.DataTimeRange{ - From: "1584873443000", - To: "1584700643000", + timeRange := backend.TimeRange{ + From: time.Unix(1584873443, 0), + To: time.Unix(1584700643, 0), } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - _, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - TimeRange: &timeRange, - Queries: []plugins.DataSubQuery{ + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + _, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + TimeRange: timeRange, + JSON: json.RawMessage(`{ "type": "logAction", "subtype": "StartQuery", "limit": 50, "region": "default", - "queryString": "fields @message", - }), + "queryString": "fields @message" + }`), }, }, }) @@ -290,24 +318,31 @@ func TestQuery_StartQuery(t *testing.T) { }, } - timeRange := plugins.DataTimeRange{ - From: "1584700643000", - To: "1584873443000", + timeRange := backend.TimeRange{ + From: time.Unix(1584700643000, 0), + To: time.Unix(1584873443000, 0), } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - TimeRange: &timeRange, - Queries: []plugins.DataSubQuery{ + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - RefID: refID, - Model: simplejson.NewFromAny(map[string]interface{}{ + RefID: refID, + TimeRange: timeRange, + JSON: json.RawMessage(`{ "type": "logAction", "subtype": "StartQuery", "limit": 50, "region": "default", - "queryString": "fields @message", - }), + "queryString": "fields @message" + }`), }, }, }) @@ -324,13 +359,11 @@ func TestQuery_StartQuery(t *testing.T) { }, PreferredVisualization: "logs", } - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - refID: { - Dataframes: plugins.NewDecodedDataFrames(data.Frames{expFrame}), - RefID: refID, - }, + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + refID: { + Frames: data.Frames{expFrame}, }, + }, }, resp) }) } @@ -366,21 +399,28 @@ func TestQuery_StopQuery(t *testing.T) { }, } - timeRange := plugins.DataTimeRange{ - From: "1584873443000", - To: "1584700643000", + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + timeRange := backend.TimeRange{ + From: time.Unix(1584873443, 0), + To: time.Unix(1584700643, 0), } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - TimeRange: &timeRange, - Queries: []plugins.DataSubQuery{ + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + TimeRange: timeRange, + JSON: json.RawMessage(`{ "type": "logAction", "subtype": "StopQuery", - "queryId": "abcd-efgh-ijkl-mnop", - }), + "queryId": "abcd-efgh-ijkl-mnop" + }`), }, }, }) @@ -395,12 +435,11 @@ func TestQuery_StopQuery(t *testing.T) { PreferredVisualization: "logs", }, } - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Dataframes: plugins.NewDecodedDataFrames(data.Frames{expFrame}), - }, + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": { + Frames: data.Frames{expFrame}, }, + }, }, resp) } @@ -458,16 +497,23 @@ func TestQuery_GetQueryResults(t *testing.T) { }, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { RefID: refID, - Model: simplejson.NewFromAny(map[string]interface{}{ + JSON: json.RawMessage(`{ "type": "logAction", "subtype": "GetQueryResults", - "queryId": "abcd-efgh-ijkl-mnop", - }), + "queryId": "abcd-efgh-ijkl-mnop" + }`), }, }, }) @@ -507,12 +553,10 @@ func TestQuery_GetQueryResults(t *testing.T) { PreferredVisualization: "logs", } - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - refID: { - RefID: refID, - Dataframes: plugins.NewDecodedDataFrames(data.Frames{expFrame}), - }, + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + refID: { + Frames: data.Frames{expFrame}, }, + }, }, resp) } diff --git a/pkg/tsdb/cloudwatch/metric_find_query.go b/pkg/tsdb/cloudwatch/metric_find_query.go index d66f026bdfd..24f651c6de0 100644 --- a/pkg/tsdb/cloudwatch/metric_find_query.go +++ b/pkg/tsdb/cloudwatch/metric_find_query.go @@ -15,9 +15,10 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/metrics" - "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/util/errutil" ) @@ -241,68 +242,57 @@ var dimensionsMap = map[string][]string{ var regionCache sync.Map -func (e *cloudWatchExecutor) executeMetricFindQuery(ctx context.Context, queryContext plugins.DataQuery) ( - plugins.DataResponse, error) { - firstQuery := queryContext.Queries[0] +func (e *cloudWatchExecutor) executeMetricFindQuery(ctx context.Context, model *simplejson.Json, query backend.DataQuery, pluginCtx backend.PluginContext) (*backend.QueryDataResponse, error) { + subType := model.Get("subtype").MustString() - parameters := firstQuery.Model - subType := firstQuery.Model.Get("subtype").MustString() var data []suggestData var err error switch subType { case "regions": - data, err = e.handleGetRegions(ctx, parameters, queryContext) + data, err = e.handleGetRegions(ctx, model, pluginCtx) case "namespaces": - data, err = e.handleGetNamespaces(ctx, parameters, queryContext) + data, err = e.handleGetNamespaces(ctx, model, pluginCtx) case "metrics": - data, err = e.handleGetMetrics(ctx, parameters, queryContext) + data, err = e.handleGetMetrics(ctx, model, pluginCtx) case "dimension_keys": - data, err = e.handleGetDimensions(ctx, parameters, queryContext) + data, err = e.handleGetDimensions(ctx, model, pluginCtx) case "dimension_values": - data, err = e.handleGetDimensionValues(ctx, parameters, queryContext) + data, err = e.handleGetDimensionValues(ctx, model, pluginCtx) case "ebs_volume_ids": - data, err = e.handleGetEbsVolumeIds(ctx, parameters, queryContext) + data, err = e.handleGetEbsVolumeIds(ctx, model, pluginCtx) case "ec2_instance_attribute": - data, err = e.handleGetEc2InstanceAttribute(ctx, parameters, queryContext) + data, err = e.handleGetEc2InstanceAttribute(ctx, model, pluginCtx) case "resource_arns": - data, err = e.handleGetResourceArns(ctx, parameters, queryContext) + data, err = e.handleGetResourceArns(ctx, model, pluginCtx) } if err != nil { - return plugins.DataResponse{}, err + return nil, err } - queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: firstQuery.RefID} - transformToTable(data, &queryResult) - result := plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - firstQuery.RefID: queryResult, - }, - } - return result, nil + resp := backend.NewQueryDataResponse() + respD := resp.Responses[query.RefID] + respD.Frames = append(respD.Frames, transformToTable(data)) + resp.Responses[query.RefID] = respD + + return resp, nil } -func transformToTable(data []suggestData, result *plugins.DataQueryResult) { - table := plugins.DataTable{ - Columns: []plugins.DataTableColumn{ - { - Text: "text", - }, - { - Text: "value", - }, - }, - Rows: make([]plugins.DataRowValues, 0), +func transformToTable(d []suggestData) *data.Frame { + frame := data.NewFrame("", + data.NewField("text", nil, []string{}), + data.NewField("value", nil, []string{})) + + for _, r := range d { + frame.AppendRow(r.Text, r.Value) } - for _, r := range data { - values := []interface{}{ - r.Text, - r.Value, - } - table.Rows = append(table.Rows, values) + frame.Meta = &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": len(d), + }, } - result.Tables = append(result.Tables, table) - result.Meta.Set("rowCount", len(data)) + + return frame } func parseMultiSelectValue(input string) []string { @@ -322,16 +312,20 @@ func parseMultiSelectValue(input string) []string { // Whenever this list is updated, the frontend list should also be updated. // Please update the region list in public/app/plugins/datasource/cloudwatch/partials/config.html func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *simplejson.Json, - queryContext plugins.DataQuery) ([]suggestData, error) { - dsInfo := e.getAWSDatasourceSettings(defaultRegion) - profile := dsInfo.Profile + pluginCtx backend.PluginContext) ([]suggestData, error) { + dsInfo, err := e.getDSInfo(pluginCtx) + if err != nil { + return nil, err + } + + profile := dsInfo.profile if cache, ok := regionCache.Load(profile); ok { if cache2, ok2 := cache.([]suggestData); ok2 { return cache2, nil } } - client, err := e.getEC2Client(defaultRegion) + client, err := e.getEC2Client(defaultRegion, pluginCtx) if err != nil { return nil, err } @@ -367,12 +361,18 @@ func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *s return result, nil } -func (e *cloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters *simplejson.Json, queryContext plugins.DataQuery) ([]suggestData, error) { - keys := []string{} +func (e *cloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters *simplejson.Json, pluginCtx backend.PluginContext) ([]suggestData, error) { + var keys []string for key := range metricsMap { keys = append(keys, key) } - customNamespaces := e.DataSource.JsonData.Get("customMetricsNamespaces").MustString() + + dsInfo, err := e.getDSInfo(pluginCtx) + if err != nil { + return nil, err + } + + customNamespaces := dsInfo.namespace if customNamespaces != "" { keys = append(keys, strings.Split(customNamespaces, ",")...) } @@ -386,7 +386,7 @@ func (e *cloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters return result, nil } -func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *simplejson.Json, queryContext plugins.DataQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *simplejson.Json, pluginCtx backend.PluginContext) ([]suggestData, error) { region := parameters.Get("region").MustString() namespace := parameters.Get("namespace").MustString() @@ -398,7 +398,7 @@ func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s } } else { var err error - if namespaceMetrics, err = e.getMetricsForCustomMetrics(region, namespace); err != nil { + if namespaceMetrics, err = e.getMetricsForCustomMetrics(region, namespace, pluginCtx); err != nil { return nil, errutil.Wrap("unable to call AWS API", err) } } @@ -412,7 +412,7 @@ func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s return result, nil } -func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters *simplejson.Json, queryContext plugins.DataQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters *simplejson.Json, pluginCtx backend.PluginContext) ([]suggestData, error) { region := parameters.Get("region").MustString() namespace := parameters.Get("namespace").MustString() @@ -424,7 +424,7 @@ func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters } } else { var err error - if dimensionValues, err = e.getDimensionsForCustomMetrics(region, namespace); err != nil { + if dimensionValues, err = e.getDimensionsForCustomMetrics(region, namespace, pluginCtx); err != nil { return nil, errutil.Wrap("unable to call AWS API", err) } } @@ -438,7 +438,7 @@ func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters return result, nil } -func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, parameters *simplejson.Json, queryContext plugins.DataQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, parameters *simplejson.Json, pluginCtx backend.PluginContext) ([]suggestData, error) { region := parameters.Get("region").MustString() namespace := parameters.Get("namespace").MustString() metricName := parameters.Get("metricName").MustString() @@ -469,7 +469,7 @@ func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param if metricName != "" { params.MetricName = aws.String(metricName) } - metrics, err := e.listMetrics(region, params) + metrics, err := e.listMetrics(region, params, pluginCtx) if err != nil { return nil, err } @@ -497,12 +497,12 @@ func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param } func (e *cloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, parameters *simplejson.Json, - queryContext plugins.DataQuery) ([]suggestData, error) { + pluginCtx backend.PluginContext) ([]suggestData, error) { region := parameters.Get("region").MustString() instanceId := parameters.Get("instanceId").MustString() instanceIds := aws.StringSlice(parseMultiSelectValue(instanceId)) - instances, err := e.ec2DescribeInstances(region, nil, instanceIds) + instances, err := e.ec2DescribeInstances(region, nil, instanceIds, pluginCtx) if err != nil { return nil, err } @@ -520,7 +520,7 @@ func (e *cloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, paramete } func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, parameters *simplejson.Json, - queryContext plugins.DataQuery) ([]suggestData, error) { + pluginCtx backend.PluginContext) ([]suggestData, error) { region := parameters.Get("region").MustString() attributeName := parameters.Get("attributeName").MustString() filterJson := parameters.Get("filters").MustMap() @@ -541,7 +541,7 @@ func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, } } - instances, err := e.ec2DescribeInstances(region, filters, nil) + instances, err := e.ec2DescribeInstances(region, filters, nil, pluginCtx) if err != nil { return nil, err } @@ -600,7 +600,7 @@ func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, } func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, parameters *simplejson.Json, - queryContext plugins.DataQuery) ([]suggestData, error) { + pluginCtx backend.PluginContext) ([]suggestData, error) { region := parameters.Get("region").MustString() resourceType := parameters.Get("resourceType").MustString() filterJson := parameters.Get("tags").MustMap() @@ -624,7 +624,7 @@ func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, paramete var resourceTypes []*string resourceTypes = append(resourceTypes, &resourceType) - resources, err := e.resourceGroupsGetResources(region, filters, resourceTypes) + resources, err := e.resourceGroupsGetResources(region, filters, resourceTypes, pluginCtx) if err != nil { return nil, err } @@ -638,14 +638,14 @@ func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, paramete return result, nil } -func (e *cloudWatchExecutor) listMetrics(region string, params *cloudwatch.ListMetricsInput) ([]*cloudwatch.Metric, error) { - client, err := e.getCWClient(region) +func (e *cloudWatchExecutor) listMetrics(region string, params *cloudwatch.ListMetricsInput, pluginCtx backend.PluginContext) ([]*cloudwatch.Metric, error) { + client, err := e.getCWClient(region, pluginCtx) if err != nil { return nil, err } plog.Debug("Listing metrics pages") - cloudWatchMetrics := []*cloudwatch.Metric{} + var cloudWatchMetrics []*cloudwatch.Metric pageNum := 0 err = client.ListMetricsPages(params, func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool { @@ -663,13 +663,13 @@ func (e *cloudWatchExecutor) listMetrics(region string, params *cloudwatch.ListM return cloudWatchMetrics, err } -func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.Filter, instanceIds []*string) (*ec2.DescribeInstancesOutput, error) { +func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.Filter, instanceIds []*string, pluginCtx backend.PluginContext) (*ec2.DescribeInstancesOutput, error) { params := &ec2.DescribeInstancesInput{ Filters: filters, InstanceIds: instanceIds, } - client, err := e.getEC2Client(region) + client, err := e.getEC2Client(region, pluginCtx) if err != nil { return nil, err } @@ -686,13 +686,13 @@ func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2. } func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters []*resourcegroupstaggingapi.TagFilter, - resourceTypes []*string) (*resourcegroupstaggingapi.GetResourcesOutput, error) { + resourceTypes []*string, pluginCtx backend.PluginContext) (*resourcegroupstaggingapi.GetResourcesOutput, error) { params := &resourcegroupstaggingapi.GetResourcesInput{ ResourceTypeFilters: resourceTypes, TagFilters: filters, } - client, err := e.getRGTAClient(region) + client, err := e.getRGTAClient(region, pluginCtx) if err != nil { return nil, err } @@ -711,90 +711,94 @@ func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters [ var metricsCacheLock sync.Mutex -func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region, namespace string) ([]string, error) { +func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region, namespace string, pluginCtx backend.PluginContext) ([]string, error) { plog.Debug("Getting metrics for custom metrics", "region", region, "namespace", namespace) metricsCacheLock.Lock() defer metricsCacheLock.Unlock() - dsInfo := e.getAWSDatasourceSettings(region) + dsInfo, err := e.getDSInfo(pluginCtx) + if err != nil { + return nil, err + } - if _, ok := customMetricsMetricsMap[dsInfo.Profile]; !ok { - customMetricsMetricsMap[dsInfo.Profile] = make(map[string]map[string]*customMetricsCache) + if _, ok := customMetricsMetricsMap[dsInfo.profile]; !ok { + customMetricsMetricsMap[dsInfo.profile] = make(map[string]map[string]*customMetricsCache) } - if _, ok := customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region]; !ok { - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region] = make(map[string]*customMetricsCache) + if _, ok := customMetricsMetricsMap[dsInfo.profile][dsInfo.region]; !ok { + customMetricsMetricsMap[dsInfo.profile][dsInfo.region] = make(map[string]*customMetricsCache) } - if _, ok := customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace]; !ok { - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace] = &customMetricsCache{} - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0) + if _, ok := customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace]; !ok { + customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace] = &customMetricsCache{} + customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache = make([]string, 0) } - if customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire.After(time.Now()) { - return customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil + if customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Expire.After(time.Now()) { + return customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache, nil } metrics, err := e.listMetrics(region, &cloudwatch.ListMetricsInput{ Namespace: aws.String(namespace), - }) - + }, pluginCtx) if err != nil { return []string{}, err } - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0) - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire = time.Now().Add(5 * time.Minute) + customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache = make([]string, 0) + customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Expire = time.Now().Add(5 * time.Minute) for _, metric := range metrics { - if isDuplicate(customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *metric.MetricName) { + if isDuplicate(customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache, *metric.MetricName) { continue } - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = append( - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *metric.MetricName) + customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache = append( + customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache, *metric.MetricName) } - return customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil + return customMetricsMetricsMap[dsInfo.profile][dsInfo.region][namespace].Cache, nil } var dimensionsCacheLock sync.Mutex -func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region, namespace string) ([]string, error) { +func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region, namespace string, pluginCtx backend.PluginContext) ([]string, error) { dimensionsCacheLock.Lock() defer dimensionsCacheLock.Unlock() - dsInfo := e.getAWSDatasourceSettings(region) + dsInfo, err := e.getDSInfo(pluginCtx) + if err != nil { + return nil, err + } - if _, ok := customMetricsDimensionsMap[dsInfo.Profile]; !ok { - customMetricsDimensionsMap[dsInfo.Profile] = make(map[string]map[string]*customMetricsCache) + if _, ok := customMetricsDimensionsMap[dsInfo.profile]; !ok { + customMetricsDimensionsMap[dsInfo.profile] = make(map[string]map[string]*customMetricsCache) } - if _, ok := customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region]; !ok { - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region] = make(map[string]*customMetricsCache) + if _, ok := customMetricsDimensionsMap[dsInfo.profile][dsInfo.region]; !ok { + customMetricsDimensionsMap[dsInfo.profile][dsInfo.region] = make(map[string]*customMetricsCache) } - if _, ok := customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace]; !ok { - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace] = &customMetricsCache{} - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0) + if _, ok := customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace]; !ok { + customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace] = &customMetricsCache{} + customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache = make([]string, 0) } - if customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire.After(time.Now()) { - return customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil + if customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Expire.After(time.Now()) { + return customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache, nil } - - metrics, err := e.listMetrics(region, &cloudwatch.ListMetricsInput{Namespace: aws.String(namespace)}) + metrics, err := e.listMetrics(region, &cloudwatch.ListMetricsInput{Namespace: aws.String(namespace)}, pluginCtx) if err != nil { return []string{}, err } - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0) - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire = time.Now().Add(5 * time.Minute) + customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache = make([]string, 0) + customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Expire = time.Now().Add(5 * time.Minute) for _, metric := range metrics { for _, dimension := range metric.Dimensions { - if isDuplicate(customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *dimension.Name) { + if isDuplicate(customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache, *dimension.Name) { continue } - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = append( - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *dimension.Name) + customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache = append( + customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache, *dimension.Name) } } - return customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil + return customMetricsDimensionsMap[dsInfo.profile][dsInfo.region][namespace].Cache, nil } func isDuplicate(nameList []string, target string) bool { diff --git a/pkg/tsdb/cloudwatch/metric_find_query_test.go b/pkg/tsdb/cloudwatch/metric_find_query_test.go index 1688daf7e7e..01f4bd40484 100644 --- a/pkg/tsdb/cloudwatch/metric_find_query_test.go +++ b/pkg/tsdb/cloudwatch/metric_find_query_test.go @@ -2,6 +2,7 @@ package cloudwatch import ( "context" + "encoding/json" "testing" "github.com/aws/aws-sdk-go/aws" @@ -13,8 +14,10 @@ import ( "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/setting" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,14 +29,14 @@ func TestQuery_Metrics(t *testing.T) { NewCWClient = origNewCWClient }) - var client FakeCWClient + var cwClient FakeCWClient NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI { - return client + return cwClient } t.Run("Custom metrics", func(t *testing.T) { - client = FakeCWClient{ + cwClient = FakeCWClient{ Metrics: []*cloudwatch.Metric{ { MetricName: aws.String("Test_MetricName"), @@ -45,52 +48,50 @@ func TestQuery_Metrics(t *testing.T) { }, }, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + JSON: json.RawMessage(`{ "type": "metricFindQuery", "subtype": "metrics", "region": "us-east-1", - "namespace": "custom", - }), + "namespace": "custom" + }`), }, }, }) require.NoError(t, err) - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Meta: simplejson.NewFromAny(map[string]interface{}{ - "rowCount": 1, - }), - Tables: []plugins.DataTable{ - { - Columns: []plugins.DataTableColumn{ - { - Text: "text", - }, - { - Text: "value", - }, - }, - Rows: []plugins.DataRowValues{ - { - "Test_MetricName", - "Test_MetricName", - }, - }, - }, - }, - }, + expFrame := data.NewFrame( + "", + data.NewField("text", nil, []string{"Test_MetricName"}), + data.NewField("value", nil, []string{"Test_MetricName"}), + ) + expFrame.Meta = &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": 1, + }, + } + + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": { + Frames: data.Frames{expFrame}, }, + }, }, resp) }) t.Run("Dimension keys for custom metrics", func(t *testing.T) { - client = FakeCWClient{ + cwClient = FakeCWClient{ Metrics: []*cloudwatch.Metric{ { MetricName: aws.String("Test_MetricName"), @@ -102,47 +103,44 @@ func TestQuery_Metrics(t *testing.T) { }, }, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + JSON: json.RawMessage(`{ "type": "metricFindQuery", "subtype": "dimension_keys", "region": "us-east-1", - "namespace": "custom", - }), + "namespace": "custom" + }`), }, }, }) require.NoError(t, err) - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Meta: simplejson.NewFromAny(map[string]interface{}{ - "rowCount": 1, - }), - Tables: []plugins.DataTable{ - { - Columns: []plugins.DataTableColumn{ - { - Text: "text", - }, - { - Text: "value", - }, - }, - Rows: []plugins.DataRowValues{ - { - "Test_DimensionName", - "Test_DimensionName", - }, - }, - }, - }, - }, + expFrame := data.NewFrame( + "", + data.NewField("text", nil, []string{"Test_DimensionName"}), + data.NewField("value", nil, []string{"Test_DimensionName"}), + ) + expFrame.Meta = &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": 1, }, + } + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": { + Frames: data.Frames{expFrame}, + }, + }, }, resp) }) } @@ -164,53 +162,46 @@ func TestQuery_Regions(t *testing.T) { cli = fakeEC2Client{ regions: []string{regionName}, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + JSON: json.RawMessage(`{ "type": "metricFindQuery", "subtype": "regions", "region": "us-east-1", - "namespace": "custom", - }), + "namespace": "custom" + }`), }, }, }) require.NoError(t, err) - rows := []plugins.DataRowValues{} - for _, region := range knownRegions { - rows = append(rows, []interface{}{ - region, - region, - }) + expRegions := append(knownRegions, regionName) + expFrame := data.NewFrame( + "", + data.NewField("text", nil, expRegions), + data.NewField("value", nil, expRegions), + ) + expFrame.Meta = &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": len(knownRegions) + 1, + }, } - rows = append(rows, []interface{}{ - regionName, - regionName, - }) - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Meta: simplejson.NewFromAny(map[string]interface{}{ - "rowCount": len(knownRegions) + 1, - }), - Tables: []plugins.DataTable{ - { - Columns: []plugins.DataTableColumn{ - { - Text: "text", - }, - { - Text: "value", - }, - }, - Rows: rows, - }, - }, - }, + + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": { + Frames: data.Frames{expFrame}, }, + }, }, resp) }) } @@ -246,50 +237,48 @@ func TestQuery_InstanceAttributes(t *testing.T) { }, }, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + JSON: json.RawMessage(`{ "type": "metricFindQuery", "subtype": "ec2_instance_attribute", "region": "us-east-1", "attributeName": "InstanceId", - "filters": map[string]interface{}{ - "tag:Environment": []string{"production"}, - }, - }), + "filters": { + "tag:Environment": ["production"] + } + }`), }, }, }) require.NoError(t, err) - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Meta: simplejson.NewFromAny(map[string]interface{}{ - "rowCount": 1, - }), - Tables: []plugins.DataTable{ - { - Columns: []plugins.DataTableColumn{ - { - Text: "text", - }, - { - Text: "value", - }, - }, - Rows: []plugins.DataRowValues{ - { - instanceID, - instanceID, - }, - }, - }, - }, - }, + expFrame := data.NewFrame( + "", + data.NewField("text", nil, []string{instanceID}), + data.NewField("value", nil, []string{instanceID}), + ) + expFrame.Meta = &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": 1, + }, + } + + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": { + Frames: data.Frames{expFrame}, }, + }, }, resp) }) } @@ -307,8 +296,6 @@ func TestQuery_EBSVolumeIDs(t *testing.T) { } t.Run("", func(t *testing.T) { - const instanceIDs = "{i-1, i-2, i-3}" - cli = fakeEC2Client{ reservations: []*ec2.Reservation{ { @@ -349,67 +336,46 @@ func TestQuery_EBSVolumeIDs(t *testing.T) { }, }, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + JSON: json.RawMessage(`{ "type": "metricFindQuery", "subtype": "ebs_volume_ids", "region": "us-east-1", - "instanceId": instanceIDs, - }), + "instanceId": "{i-1, i-2, i-3}" + }`), }, }, }) require.NoError(t, err) - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Meta: simplejson.NewFromAny(map[string]interface{}{ - "rowCount": 6, - }), - Tables: []plugins.DataTable{ - { - Columns: []plugins.DataTableColumn{ - { - Text: "text", - }, - { - Text: "value", - }, - }, - Rows: []plugins.DataRowValues{ - { - "vol-1-1", - "vol-1-1", - }, - { - "vol-1-2", - "vol-1-2", - }, - { - "vol-2-1", - "vol-2-1", - }, - { - "vol-2-2", - "vol-2-2", - }, - { - "vol-3-1", - "vol-3-1", - }, - { - "vol-3-2", - "vol-3-2", - }, - }, - }, - }, - }, + expValues := []string{"vol-1-1", "vol-1-2", "vol-2-1", "vol-2-2", "vol-3-1", "vol-3-2"} + expFrame := data.NewFrame( + "", + data.NewField("text", nil, expValues), + data.NewField("value", nil, expValues), + ) + expFrame.Meta = &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": 6, }, + } + + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": { + Frames: data.Frames{expFrame}, + }, + }, }, resp) }) } @@ -449,54 +415,52 @@ func TestQuery_ResourceARNs(t *testing.T) { }, }, } - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) - resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ + + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + JSON: json.RawMessage(`{ "type": "metricFindQuery", "subtype": "resource_arns", "region": "us-east-1", "resourceType": "ec2:instance", - "tags": map[string]interface{}{ - "Environment": []string{"production"}, - }, - }), + "tags": { + "Environment": ["production"] + } + }`), }, }, }) require.NoError(t, err) - assert.Equal(t, plugins.DataResponse{ - Results: map[string]plugins.DataQueryResult{ - "": { - Meta: simplejson.NewFromAny(map[string]interface{}{ - "rowCount": 2, - }), - Tables: []plugins.DataTable{ - { - Columns: []plugins.DataTableColumn{ - { - Text: "text", - }, - { - Text: "value", - }, - }, - Rows: []plugins.DataRowValues{ - { - "arn:aws:ec2:us-east-1:123456789012:instance/i-12345678901234567", - "arn:aws:ec2:us-east-1:123456789012:instance/i-12345678901234567", - }, - { - "arn:aws:ec2:us-east-1:123456789012:instance/i-76543210987654321", - "arn:aws:ec2:us-east-1:123456789012:instance/i-76543210987654321", - }, - }, - }, - }, - }, + expValues := []string{ + "arn:aws:ec2:us-east-1:123456789012:instance/i-12345678901234567", + "arn:aws:ec2:us-east-1:123456789012:instance/i-76543210987654321", + } + expFrame := data.NewFrame( + "", + data.NewField("text", nil, expValues), + data.NewField("value", nil, expValues), + ) + expFrame.Meta = &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": 2, + }, + } + + assert.Equal(t, &backend.QueryDataResponse{Responses: backend.Responses{ + "": { + Frames: data.Frames{expFrame}, }, + }, }, resp) }) } @@ -528,9 +492,13 @@ func TestQuery_ListMetricsPagination(t *testing.T) { t.Run("List Metrics and page limit is reached", func(t *testing.T) { client = FakeCWClient{Metrics: metrics, MetricsPerPage: 2} - executor := newExecutor(nil, &setting.Cfg{AWSListMetricsPageLimit: 3, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}, fakeSessionCache{}) - executor.DataSource = fakeDataSource() - response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{}) + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + executor := newExecutor(nil, im, &setting.Cfg{AWSListMetricsPageLimit: 3, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}, fakeSessionCache{}) + response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{}, backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }) require.NoError(t, err) expectedMetrics := client.MetricsPerPage * executor.cfg.AWSListMetricsPageLimit @@ -539,9 +507,13 @@ func TestQuery_ListMetricsPagination(t *testing.T) { t.Run("List Metrics and page limit is not reached", func(t *testing.T) { client = FakeCWClient{Metrics: metrics, MetricsPerPage: 2} - executor := newExecutor(nil, &setting.Cfg{AWSListMetricsPageLimit: 1000, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}, fakeSessionCache{}) - executor.DataSource = fakeDataSource() - response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{}) + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + executor := newExecutor(nil, im, &setting.Cfg{AWSListMetricsPageLimit: 1000, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}, fakeSessionCache{}) + response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{}, backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }) require.NoError(t, err) assert.Equal(t, len(metrics), len(response)) diff --git a/pkg/tsdb/cloudwatch/query_transformer.go b/pkg/tsdb/cloudwatch/query_transformer.go index 493a6568f17..aefa806ba4b 100644 --- a/pkg/tsdb/cloudwatch/query_transformer.go +++ b/pkg/tsdb/cloudwatch/query_transformer.go @@ -8,8 +8,8 @@ import ( "strings" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/plugins" ) // returns a map of queries with query id as key. In the case a q request query @@ -55,8 +55,7 @@ func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQ return cloudwatchQueries, nil } -func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResponses []*cloudwatchResponse, - requestQueries []*requestQuery, startTime time.Time, endTime time.Time) (map[string]plugins.DataQueryResult, error) { +func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResponses []*cloudwatchResponse, requestQueries []*requestQuery, startTime time.Time, endTime time.Time) (map[string]*backend.DataResponse, error) { responsesByRefID := make(map[string][]*cloudwatchResponse) refIDs := sort.StringSlice{} for _, res := range cloudwatchResponses { @@ -66,23 +65,35 @@ func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResp // Ensure stable results refIDs.Sort() - results := make(map[string]plugins.DataQueryResult) + results := make(map[string]*backend.DataResponse) for _, refID := range refIDs { responses := responsesByRefID[refID] - queryResult := plugins.DataQueryResult{ - RefID: refID, - Series: plugins.DataTimeSeriesSlice{}, - } + queryResult := backend.DataResponse{} frames := make(data.Frames, 0, len(responses)) requestExceededMaxLimit := false partialData := false - executedQueries := []executedQuery{} + var executedQueries []executedQuery for _, response := range responses { frames = append(frames, response.DataFrames...) requestExceededMaxLimit = requestExceededMaxLimit || response.RequestExceededMaxLimit partialData = partialData || response.PartialData + + if requestExceededMaxLimit { + frames[0].AppendNotices(data.Notice{ + Severity: data.NoticeSeverityWarning, + Text: "cloudwatch GetMetricData error: Maximum number of allowed metrics exceeded. Your search may have been limited", + }) + } + + if partialData { + frames[0].AppendNotices(data.Notice{ + Severity: data.NoticeSeverityWarning, + Text: "cloudwatch GetMetricData error: Too many datapoints requested - your search has been limited. Please try to reduce the time range", + }) + } + executedQueries = append(executedQueries, executedQuery{ Expression: response.Expression, ID: response.Id, @@ -94,13 +105,6 @@ func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResp return frames[i].Name < frames[j].Name }) - if requestExceededMaxLimit { - queryResult.ErrorString = "Cloudwatch GetMetricData error: Maximum number of allowed metrics exceeded. Your search may have been limited." - } - if partialData { - queryResult.ErrorString = "Cloudwatch GetMetricData error: Too many datapoints requested - your search has been limited. Please try to reduce the time range" - } - eq, err := json.Marshal(executedQueries) if err != nil { return nil, fmt.Errorf("could not marshal executedString struct: %w", err) @@ -120,8 +124,12 @@ func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResp } for _, frame := range frames { - frame.Meta = &data.FrameMeta{ - ExecutedQueryString: string(eq), + if frame.Meta != nil { + frame.Meta.ExecutedQueryString = string(eq) + } else { + frame.Meta = &data.FrameMeta{ + ExecutedQueryString: string(eq), + } } if link == "" || len(frame.Fields) < 2 { @@ -135,8 +143,8 @@ func (e *cloudWatchExecutor) transformQueryResponsesToQueryResult(cloudwatchResp frame.Fields[1].Config.Links = createDataLinks(link) } - queryResult.Dataframes = plugins.NewDecodedDataFrames(frames) - results[refID] = queryResult + queryResult.Frames = frames + results[refID] = &queryResult } return results, nil diff --git a/pkg/tsdb/cloudwatch/query_transformer_test.go b/pkg/tsdb/cloudwatch/query_transformer_test.go index 3b96c11bd9d..daf0cc674b3 100644 --- a/pkg/tsdb/cloudwatch/query_transformer_test.go +++ b/pkg/tsdb/cloudwatch/query_transformer_test.go @@ -12,7 +12,7 @@ import ( ) func TestQueryTransformer(t *testing.T) { - executor := newExecutor(nil, &setting.Cfg{}, fakeSessionCache{}) + executor := newExecutor(nil, nil, &setting.Cfg{}, fakeSessionCache{}) t.Run("One cloudwatchQuery is generated when its request query has one stat", func(t *testing.T) { requestQueries := []*requestQuery{ { diff --git a/pkg/tsdb/cloudwatch/request_parser.go b/pkg/tsdb/cloudwatch/request_parser.go index fc66b3ca3b1..29a78522a29 100644 --- a/pkg/tsdb/cloudwatch/request_parser.go +++ b/pkg/tsdb/cloudwatch/request_parser.go @@ -10,22 +10,26 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" ) // Parses the json queries and returns a requestQuery. The requestQuery has a 1 to 1 mapping to a query editor row -func (e *cloudWatchExecutor) parseQueries(queryContext plugins.DataQuery, startTime time.Time, - endTime time.Time) (map[string][]*requestQuery, error) { +func (e *cloudWatchExecutor) parseQueries(req *backend.QueryDataRequest, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) { requestQueries := make(map[string][]*requestQuery) - for i, query := range queryContext.Queries { - queryType := query.Model.Get("type").MustString() + for _, query := range req.Queries { + model, err := simplejson.NewJson(query.JSON) + if err != nil { + return nil, &queryError{err: err, RefID: query.RefID} + } + + queryType := model.Get("type").MustString() if queryType != "timeSeriesQuery" && queryType != "" { continue } refID := query.RefID - query, err := parseRequestQuery(queryContext.Queries[i].Model, refID, startTime, endTime) + query, err := parseRequestQuery(model, refID, startTime, endTime) if err != nil { return nil, &queryError{err: err, RefID: refID} } diff --git a/pkg/tsdb/cloudwatch/test_utils.go b/pkg/tsdb/cloudwatch/test_utils.go index 9d28a99f70c..72206fdf5f5 100644 --- a/pkg/tsdb/cloudwatch/test_utils.go +++ b/pkg/tsdb/cloudwatch/test_utils.go @@ -15,37 +15,9 @@ import ( "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface" "github.com/grafana/grafana-aws-sdk/pkg/awsds" - "github.com/grafana/grafana/pkg/components/securejsondata" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/setting" ) -type fakeDataSourceCfg struct { - assumeRoleARN string - externalID string -} - -func fakeDataSource(cfgs ...fakeDataSourceCfg) *models.DataSource { - jsonData := simplejson.New() - jsonData.Set("defaultRegion", defaultRegion) - jsonData.Set("authType", "default") - for _, cfg := range cfgs { - if cfg.assumeRoleARN != "" { - jsonData.Set("assumeRoleArn", cfg.assumeRoleARN) - } - if cfg.externalID != "" { - jsonData.Set("externalId", cfg.externalID) - } - } - return &models.DataSource{ - Id: 1, - Database: "default", - JsonData: jsonData, - SecureJsonData: securejsondata.SecureJsonData{}, - } -} - type FakeCWLogsClient struct { cloudwatchlogsiface.CloudWatchLogsAPI logGroups cloudwatchlogs.DescribeLogGroupsOutput diff --git a/pkg/tsdb/cloudwatch/time_series_query.go b/pkg/tsdb/cloudwatch/time_series_query.go index 80e93b7230f..e2eed22706c 100644 --- a/pkg/tsdb/cloudwatch/time_series_query.go +++ b/pkg/tsdb/cloudwatch/time_series_query.go @@ -4,127 +4,122 @@ import ( "context" "fmt" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/util/errutil" "golang.org/x/sync/errgroup" ) -func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext plugins.DataQuery) ( - plugins.DataResponse, error) { +func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { plog.Debug("Executing time series query") - startTime, err := queryContext.TimeRange.ParseFrom() - if err != nil { - return plugins.DataResponse{}, errutil.Wrap("failed to parse start time", err) - } - endTime, err := queryContext.TimeRange.ParseTo() - if err != nil { - return plugins.DataResponse{}, errutil.Wrap("failed to parse end time", err) - } - if !startTime.Before(endTime) { - return plugins.DataResponse{}, fmt.Errorf("invalid time range: start time must be before end time") - } - requestQueriesByRegion, err := e.parseQueries(queryContext, startTime, endTime) - if err != nil { - return plugins.DataResponse{}, err - } + resp := backend.NewQueryDataResponse() - if len(requestQueriesByRegion) == 0 { - return plugins.DataResponse{ - Results: make(map[string]plugins.DataQueryResult), - }, nil - } + for _, q := range req.Queries { + startTime := q.TimeRange.From + endTime := q.TimeRange.To + if !startTime.Before(endTime) { + return nil, fmt.Errorf("invalid time range: start time must be before end time") + } + + requestQueriesByRegion, err := e.parseQueries(req, startTime, endTime) + if err != nil { + return nil, err + } + + if len(requestQueriesByRegion) == 0 { + return backend.NewQueryDataResponse(), nil + } - resultChan := make(chan plugins.DataQueryResult, len(queryContext.Queries)) - eg, ectx := errgroup.WithContext(ctx) - for r, q := range requestQueriesByRegion { - requestQueries := q - region := r - eg.Go(func() error { - defer func() { - if err := recover(); err != nil { - plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1)) - if theErr, ok := err.(error); ok { - resultChan <- plugins.DataQueryResult{ - Error: theErr, + resultChan := make(chan *backend.DataResponse, len(req.Queries)) + eg, ectx := errgroup.WithContext(ctx) + for r, q := range requestQueriesByRegion { + requestQueries := q + region := r + eg.Go(func() error { + defer func() { + if err := recover(); err != nil { + plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1)) + if theErr, ok := err.(error); ok { + resultChan <- &backend.DataResponse{ + Error: theErr, + } } } + }() + + client, err := e.getCWClient(region, req.PluginContext) + if err != nil { + return err } - }() - - client, err := e.getCWClient(region) - if err != nil { - return err - } - - queries, err := e.transformRequestQueriesToCloudWatchQueries(requestQueries) - if err != nil { - for _, query := range requestQueries { - resultChan <- plugins.DataQueryResult{ - RefID: query.RefId, - Error: err, + + queries, err := e.transformRequestQueriesToCloudWatchQueries(requestQueries) + if err != nil { + for _, query := range requestQueries { + resultChan <- &backend.DataResponse{ + Frames: data.Frames{data.NewFrame(query.RefId)}, + Error: err, + } } + return nil } - return nil - } - - metricDataInput, err := e.buildMetricDataInput(startTime, endTime, queries) - if err != nil { - return err - } - - cloudwatchResponses := make([]*cloudwatchResponse, 0) - mdo, err := e.executeRequest(ectx, client, metricDataInput) - if err != nil { - for _, query := range requestQueries { - resultChan <- plugins.DataQueryResult{ - RefID: query.RefId, - Error: err, + + metricDataInput, err := e.buildMetricDataInput(startTime, endTime, queries) + if err != nil { + return err + } + + cloudwatchResponses := make([]*cloudwatchResponse, 0) + mdo, err := e.executeRequest(ectx, client, metricDataInput) + if err != nil { + for _, query := range requestQueries { + resultChan <- &backend.DataResponse{ + Frames: data.Frames{data.NewFrame(query.RefId)}, + Error: err, + } } + return nil } - return nil - } - - responses, err := e.parseResponse(mdo, queries) - if err != nil { - for _, query := range requestQueries { - resultChan <- plugins.DataQueryResult{ - RefID: query.RefId, - Error: err, + + responses, err := e.parseResponse(mdo, queries) + if err != nil { + for _, query := range requestQueries { + resultChan <- &backend.DataResponse{ + Frames: data.Frames{data.NewFrame(query.RefId)}, + Error: err, + } } + return nil } - return nil - } - - cloudwatchResponses = append(cloudwatchResponses, responses...) - res, err := e.transformQueryResponsesToQueryResult(cloudwatchResponses, requestQueries, startTime, endTime) - if err != nil { - for _, query := range requestQueries { - resultChan <- plugins.DataQueryResult{ - RefID: query.RefId, - Error: err, + + cloudwatchResponses = append(cloudwatchResponses, responses...) + res, err := e.transformQueryResponsesToQueryResult(cloudwatchResponses, requestQueries, startTime, endTime) + if err != nil { + for _, query := range requestQueries { + resultChan <- &backend.DataResponse{ + Frames: data.Frames{data.NewFrame(query.RefId)}, + Error: err, + } } + return nil + } + + for _, queryRes := range res { + resultChan <- queryRes } return nil - } + }) + } - for _, queryRes := range res { - resultChan <- queryRes - } - return nil - }) - } - if err := eg.Wait(); err != nil { - return plugins.DataResponse{}, err - } - close(resultChan) + if err := eg.Wait(); err != nil { + return nil, err + } + close(resultChan) - results := plugins.DataResponse{ - Results: make(map[string]plugins.DataQueryResult), - } - for result := range resultChan { - results.Results[result.RefID] = result + for result := range resultChan { + resp.Responses[q.RefID] = *result + } } - return results, nil + + return resp, nil } diff --git a/pkg/tsdb/cloudwatch/time_series_query_test.go b/pkg/tsdb/cloudwatch/time_series_query_test.go index d44b9bfe24a..a4d7349ed9e 100644 --- a/pkg/tsdb/cloudwatch/time_series_query_test.go +++ b/pkg/tsdb/cloudwatch/time_series_query_test.go @@ -3,25 +3,30 @@ package cloudwatch import ( "context" "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana/pkg/plugins" "github.com/stretchr/testify/assert" ) func TestTimeSeriesQuery(t *testing.T) { - executor := newExecutor(nil, newTestConfig(), fakeSessionCache{}) + executor := newExecutor(nil, nil, newTestConfig(), fakeSessionCache{}) + now := time.Now() t.Run("End time before start time should result in error", func(t *testing.T) { - timeRange := plugins.NewDataTimeRange("now-1h", "now-2h") - _, err := executor.executeTimeSeriesQuery( - context.TODO(), plugins.DataQuery{TimeRange: &timeRange}) + _, err := executor.executeTimeSeriesQuery(context.TODO(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{ + From: now.Add(time.Hour * -1), + To: now.Add(time.Hour * -2), + }}}}) assert.EqualError(t, err, "invalid time range: start time must be before end time") }) t.Run("End time equals start time should result in error", func(t *testing.T) { - timeRange := plugins.NewDataTimeRange("now-1h", "now-1h") - _, err := executor.executeTimeSeriesQuery( - context.TODO(), plugins.DataQuery{TimeRange: &timeRange}) + _, err := executor.executeTimeSeriesQuery(context.TODO(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{ + From: now.Add(time.Hour * -1), + To: now.Add(time.Hour * -1), + }}}}) assert.EqualError(t, err, "invalid time range: start time must be before end time") }) } diff --git a/pkg/tsdb/service.go b/pkg/tsdb/service.go index d89e8ca459b..ffad1306024 100644 --- a/pkg/tsdb/service.go +++ b/pkg/tsdb/service.go @@ -60,7 +60,6 @@ func (s *Service) Init() error { s.registry["postgres"] = s.PostgresService.NewExecutor s.registry["mysql"] = mysql.NewExecutor s.registry["elasticsearch"] = elasticsearch.NewExecutor - s.registry["cloudwatch"] = s.CloudWatchService.NewExecutor s.registry["stackdriver"] = s.CloudMonitoringService.NewExecutor s.registry["grafana-azure-monitor-datasource"] = s.AzureMonitorService.NewExecutor s.registry["loki"] = loki.NewExecutor diff --git a/public/app/plugins/datasource/cloudwatch/datasource.ts b/public/app/plugins/datasource/cloudwatch/datasource.ts index 35124ab30aa..c56bbf782b1 100644 --- a/public/app/plugins/datasource/cloudwatch/datasource.ts +++ b/public/app/plugins/datasource/cloudwatch/datasource.ts @@ -32,7 +32,9 @@ import { LogRowModel, rangeUtil, ScopedVars, + TableData, TimeRange, + toLegacyResponseData, } from '@grafana/data'; import { notifyApp } from 'app/core/actions'; @@ -65,7 +67,7 @@ import { AwsUrl, encodeUrl } from './aws_url'; import { increasingInterval } from './utils/rxjs/increasingInterval'; import config from 'app/core/config'; -const TSDB_QUERY_ENDPOINT = '/api/tsdb/query'; +const DS_QUERY_ENDPOINT = '/api/ds/query'; // Constants also defined in tsdb/cloudwatch/cloudwatch.go const LOG_IDENTIFIER_INTERNAL = '__log__grafana_internal__'; @@ -184,9 +186,10 @@ export class CloudWatchDatasource extends DataSourceApi { - const channelName: string = response.results['A'].meta.channelName; + const dataQueryResponse = toDataQueryResponse({ data: response }, options.targets); + const channelName: string = dataQueryResponse.data[0].meta.custom.channelName; const channel = getGrafanaLiveSrv().getChannel({ scope: LiveChannelScope.Plugin, namespace: 'cloudwatch', @@ -543,7 +546,7 @@ export class CloudWatchDatasource extends DataSourceApi { - return this.awsRequest(TSDB_QUERY_ENDPOINT, request).pipe( + return this.awsRequest(DS_QUERY_ENDPOINT, request).pipe( map((res) => { const dataframes: DataFrame[] = toDataQueryResponse({ data: res }).data; if (!dataframes || dataframes.length <= 0) { @@ -576,8 +579,11 @@ export class CloudWatchDatasource extends DataSourceApi { - return suggestData.results['metricFindQuery'].tables[0].rows.map(([text, value]) => ({ + transformSuggestDataFromDataframes(suggestData: TSDBResponse): Array<{ text: any; label: any; value: any }> { + const frames = toDataQueryResponse({ data: suggestData }).data as DataFrame[]; + const table = toLegacyResponseData(frames[0]) as TableData; + + return table.rows.map(([text, value]) => ({ text, value, label: value, @@ -586,7 +592,7 @@ export class CloudWatchDatasource extends DataSourceApi> { const range = this.timeSrv.timeRange(); - return this.awsRequest(TSDB_QUERY_ENDPOINT, { + return this.awsRequest(DS_QUERY_ENDPOINT, { from: range.from.valueOf().toString(), to: range.to.valueOf().toString(), queries: [ @@ -603,7 +609,7 @@ export class CloudWatchDatasource extends DataSourceApi { - return this.transformSuggestDataFromTable(r); + return this.transformSuggestDataFromDataframes(r); }) ) .toPromise(); @@ -650,7 +656,7 @@ export class CloudWatchDatasource extends DataSourceApi toDataQueryResponse(val).data || []; - return this.awsRequest(TSDB_QUERY_ENDPOINT, requestParams).pipe( + return this.awsRequest(DS_QUERY_ENDPOINT, requestParams).pipe( map((response) => resultsToDataFrames({ data: response })), catchError((err) => { if (err.data?.error) { @@ -835,7 +841,7 @@ export class CloudWatchDatasource extends DataSourceApi { - return r.results['annotationQuery'].tables[0].rows.map((v) => ({ + const frames = toDataQueryResponse({ data: r }).data as DataFrame[]; + const table = toLegacyResponseData(frames[0]) as TableData; + return table.rows.map((v) => ({ annotation: annotation, time: Date.parse(v[0]), title: v[1],