refactor cloudwatch to support new tsdb interface

pull/8050/head
bergquist 8 years ago committed by Mitsuhiro Tanda
parent 4b34ff5b83
commit fe1d395d79
  1. 51
      pkg/tsdb/cloudwatch/cloudwatch.go
  2. 35
      pkg/tsdb/cloudwatch/metric_find_query.go

@ -3,6 +3,7 @@ package cloudwatch
import (
"context"
"errors"
"fmt"
"regexp"
"sort"
"strconv"
@ -27,10 +28,8 @@ type CloudWatchExecutor struct {
*models.DataSource
}
func NewCloudWatchExecutor(dsInfo *models.DataSource) (tsdb.Executor, error) {
return &CloudWatchExecutor{
DataSource: dsInfo,
}, nil
func NewCloudWatchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return &CloudWatchExecutor{}, nil
}
var (
@ -41,7 +40,7 @@ var (
func init() {
plog = log.New("tsdb.cloudwatch")
tsdb.RegisterExecutor("cloudwatch", NewCloudWatchExecutor)
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor)
standardStatistics = map[string]bool{
"Average": true,
"Maximum": true,
@ -52,37 +51,43 @@ func init() {
aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
}
func (e *CloudWatchExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
var result *tsdb.BatchResult
queryType := queries[0].Model.Get("type").MustString()
func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
var result *tsdb.Response
e.DataSource = dsInfo
queryType := queryContext.Queries[0].Model.Get("type").MustString("")
var err error
switch queryType {
case "timeSeriesQuery":
result = e.executeTimeSeriesQuery(ctx, queries, queryContext)
result, err = e.executeTimeSeriesQuery(ctx, queryContext)
break
case "metricFindQuery":
result = e.executeMetricFindQuery(ctx, queries, queryContext)
result, err = e.executeMetricFindQuery(ctx, queryContext)
break
default:
err = fmt.Errorf("missing querytype")
}
return result
return result, err
}
func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
result := &tsdb.BatchResult{
QueryResults: make(map[string]*tsdb.QueryResult),
func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
result := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
errCh := make(chan error, 1)
resCh := make(chan *tsdb.QueryResult, 1)
currentlyExecuting := 0
for _, model := range queries {
for i, model := range queryContext.Queries {
queryType := model.Model.Get("type").MustString()
if queryType != "timeSeriesQuery" {
continue
}
currentlyExecuting++
go func(refId string) {
queryRes, err := e.executeQuery(ctx, model.Model.Get("parameters"), queryContext)
go func(refId string, index int) {
queryRes, err := e.executeQuery(ctx, queryContext.Queries[index].Model.Get("parameters"), queryContext)
currentlyExecuting--
if err != nil {
errCh <- err
@ -90,21 +95,21 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queries
queryRes.RefId = refId
resCh <- queryRes
}
}(model.RefId)
}(model.RefId, i)
}
for currentlyExecuting != 0 {
select {
case res := <-resCh:
result.QueryResults[res.RefId] = res
result.Results[res.RefId] = res
case err := <-errCh:
return result.WithError(err)
return result, err
case <-ctx.Done():
return result.WithError(ctx.Err())
return result, ctx.Err()
}
}
return result
return result, nil
}
func (e *CloudWatchExecutor) getClient(region string) (*cloudwatch.CloudWatch, error) {
@ -148,7 +153,7 @@ func (e *CloudWatchExecutor) getClient(region string) (*cloudwatch.CloudWatch, e
return client, nil
}
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.QueryContext) (*tsdb.QueryResult, error) {
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
query, err := parseQuery(parameters)
if err != nil {
return nil, err

@ -153,14 +153,15 @@ func init() {
customMetricsDimensionsMap = make(map[string]map[string]map[string]*CustomMetricsCache)
}
func (e *CloudWatchExecutor) executeMetricFindQuery(ctx context.Context, queries tsdb.QuerySlice, queryContext *tsdb.QueryContext) *tsdb.BatchResult {
result := &tsdb.BatchResult{
QueryResults: make(map[string]*tsdb.QueryResult),
func (e *CloudWatchExecutor) executeMetricFindQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
result := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
}
queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: queries[0].RefId}
firstQuery := queryContext.Queries[0]
queryResult := &tsdb.QueryResult{Meta: simplejson.New(), RefId: firstQuery.RefId}
parameters := queries[0].Model.Get("parameters")
subType := queries[0].Model.Get("subtype").MustString()
parameters := firstQuery.Model.Get("parameters")
subType := firstQuery.Model.Get("subtype").MustString()
var data []suggestData
var err error
switch subType {
@ -186,12 +187,10 @@ func (e *CloudWatchExecutor) executeMetricFindQuery(ctx context.Context, queries
data, err = e.handleGetEc2InstanceAttribute(ctx, parameters, queryContext)
break
}
if err != nil {
queryResult.Error = err
}
transformToTable(data, queryResult)
result.QueryResults[queries[0].RefId] = queryResult
return result
result.Results[firstQuery.RefId] = queryResult
return result, err
}
func transformToTable(data []suggestData, result *tsdb.QueryResult) {
@ -238,7 +237,7 @@ func (e *CloudWatchExecutor) getDsInfo(region string) *cwapi.DatasourceInfo {
// Whenever this list is updated, frontend list should also be updated.
// Please update the region list in public/app/plugins/datasource/cloudwatch/partials/config.html
func (e *CloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.QueryContext) ([]suggestData, error) {
func (e *CloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
regions := []string{
"ap-northeast-1", "ap-northeast-2", "ap-southeast-1", "ap-southeast-2", "ap-south-1", "ca-central-1", "cn-north-1",
"eu-central-1", "eu-west-1", "eu-west-2", "sa-east-1", "us-east-1", "us-east-2", "us-gov-west-1", "us-west-1", "us-west-2",
@ -252,7 +251,7 @@ func (e *CloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *s
return result, nil
}
func (e *CloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.QueryContext) ([]suggestData, error) {
func (e *CloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
keys := []string{}
for key := range metricsMap {
keys = append(keys, key)
@ -273,7 +272,7 @@ func (e *CloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters
return result, nil
}
func (e *CloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.QueryContext) ([]suggestData, error) {
func (e *CloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
region := parameters.Get("region").MustString()
namespace := parameters.Get("namespace").MustString()
@ -302,7 +301,7 @@ func (e *CloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s
return result, nil
}
func (e *CloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.QueryContext) ([]suggestData, error) {
func (e *CloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
region := parameters.Get("region").MustString()
namespace := parameters.Get("namespace").MustString()
@ -331,7 +330,7 @@ func (e *CloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters
return result, nil
}
func (e *CloudWatchExecutor) handleGetDimensionValues(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.QueryContext) ([]suggestData, error) {
func (e *CloudWatchExecutor) handleGetDimensionValues(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
region := parameters.Get("region").MustString()
namespace := parameters.Get("namespace").MustString()
metricName := parameters.Get("metricName").MustString()
@ -374,7 +373,7 @@ func (e *CloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param
return result, nil
}
func (e *CloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.QueryContext) ([]suggestData, error) {
func (e *CloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
region := parameters.Get("region").MustString()
instanceId := parameters.Get("instanceId").MustString()
@ -392,7 +391,7 @@ func (e *CloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, paramete
return result, nil
}
func (e *CloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.QueryContext) ([]suggestData, error) {
func (e *CloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) {
region := parameters.Get("region").MustString()
attributeName := parameters.Get("attributeName").MustString()
filterJson := parameters.Get("filters").MustMap()

Loading…
Cancel
Save