diff --git a/pkg/api/pluginproxy/ds_auth_provider.go b/pkg/api/pluginproxy/ds_auth_provider.go index 99bb8903ddbb..cd41a815c0ba 100644 --- a/pkg/api/pluginproxy/ds_auth_provider.go +++ b/pkg/api/pluginproxy/ds_auth_provider.go @@ -6,28 +6,28 @@ import ( "net/http" "net/url" "strings" + "time" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/services/encryption" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util" ) +type DSInfo struct { + ID int64 + Updated time.Time + JSONData map[string]interface{} + DecryptedSecureJSONData map[string]string +} + // ApplyRoute should use the plugin route data to set auth headers and custom headers. func ApplyRoute(ctx context.Context, req *http.Request, proxyPath string, route *plugins.AppPluginRoute, - ds *models.DataSource, cfg *setting.Cfg, encryptionService encryption.Service) { + ds DSInfo, cfg *setting.Cfg) { proxyPath = strings.TrimPrefix(proxyPath, route.Path) - secureJsonData, err := encryptionService.DecryptJsonData(ctx, ds.SecureJsonData, setting.SecretKey) - if err != nil { - logger.Error("Error interpolating proxy url", "error", err) - return - } - data := templateData{ - JsonData: ds.JsonData.Interface().(map[string]interface{}), - SecureJsonData: secureJsonData, + JsonData: ds.JSONData, + SecureJsonData: ds.DecryptedSecureJSONData, } if len(route.URL) > 0 { @@ -76,12 +76,12 @@ func ApplyRoute(ctx context.Context, req *http.Request, proxyPath string, route } } -func getTokenProvider(ctx context.Context, cfg *setting.Cfg, ds *models.DataSource, pluginRoute *plugins.AppPluginRoute, +func getTokenProvider(ctx context.Context, cfg *setting.Cfg, ds DSInfo, pluginRoute *plugins.AppPluginRoute, data templateData) (accessTokenProvider, error) { authType := pluginRoute.AuthType // Plugin can override authentication type specified in route configuration - if authTypeOverride := ds.JsonData.Get("authenticationType").MustString(); authTypeOverride != "" { + if authTypeOverride, ok := ds.JSONData["authenticationType"].(string); ok && authTypeOverride != "" { authType = authTypeOverride } diff --git a/pkg/api/pluginproxy/ds_proxy.go b/pkg/api/pluginproxy/ds_proxy.go index 1f7d0e158dbc..185518060815 100644 --- a/pkg/api/pluginproxy/ds_proxy.go +++ b/pkg/api/pluginproxy/ds_proxy.go @@ -238,16 +238,32 @@ func (proxy *DataSourceProxy) director(req *http.Request) { req.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) - // Clear Origin and Referer to avoir CORS issues + // Clear Origin and Referer to avoid CORS issues req.Header.Del("Origin") req.Header.Del("Referer") + jsonData := make(map[string]interface{}) + if proxy.ds.JsonData != nil { + jsonData, err = proxy.ds.JsonData.Map() + if err != nil { + logger.Error("Failed to get json data as map", "jsonData", proxy.ds.JsonData, "error", err) + return + } + } + + secureJsonData, err := proxy.dataSourcesService.EncryptionService.DecryptJsonData(req.Context(), proxy.ds.SecureJsonData, setting.SecretKey) + if err != nil { + logger.Error("Error interpolating proxy url", "error", err) + return + } + if proxy.route != nil { - ApplyRoute( - proxy.ctx.Req.Context(), req, proxy.proxyPath, - proxy.route, proxy.ds, proxy.cfg, - proxy.dataSourcesService.EncryptionService, - ) + ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, DSInfo{ + ID: proxy.ds.Id, + Updated: proxy.ds.Updated, + JSONData: jsonData, + DecryptedSecureJSONData: secureJsonData, + }, proxy.cfg) } if proxy.oAuthTokenService.IsOAuthPassThruEnabled(proxy.ds) { diff --git a/pkg/api/pluginproxy/ds_proxy_test.go b/pkg/api/pluginproxy/ds_proxy_test.go index 978e3f69b164..b824bae81556 100644 --- a/pkg/api/pluginproxy/ds_proxy_test.go +++ b/pkg/api/pluginproxy/ds_proxy_test.go @@ -99,6 +99,17 @@ func TestDataSourceProxy_routeRule(t *testing.T) { }, } + jd, err := ds.JsonData.Map() + require.NoError(t, err) + dsInfo := DSInfo{ + ID: ds.Id, + Updated: ds.Updated, + JSONData: jd, + DecryptedSecureJSONData: map[string]string{ + "key": "123", + }, + } + setUp := func() (*models.ReqContext, *http.Request) { req, err := http.NewRequest("GET", "http://localhost/asd", nil) require.NoError(t, err) @@ -117,7 +128,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) { proxy, err := NewDataSourceProxy(ds, plugin, ctx, "api/v4/some/method", cfg, httpClientProvider, &oauthtoken.Service{}, dsService) require.NoError(t, err) proxy.route = plugin.Routes[0] - ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, proxy.ds, cfg, ossencryption.ProvideService()) + ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, dsInfo, cfg) assert.Equal(t, "https://www.google.com/some/method", req.URL.String()) assert.Equal(t, "my secret 123", req.Header.Get("x-header")) @@ -129,7 +140,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) { proxy, err := NewDataSourceProxy(ds, plugin, ctx, "api/common/some/method", cfg, httpClientProvider, &oauthtoken.Service{}, dsService) require.NoError(t, err) proxy.route = plugin.Routes[3] - ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, proxy.ds, cfg, ossencryption.ProvideService()) + ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, dsInfo, cfg) assert.Equal(t, "https://dynamic.grafana.com/some/method?apiKey=123", req.URL.String()) assert.Equal(t, "my secret 123", req.Header.Get("x-header")) @@ -141,7 +152,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) { proxy, err := NewDataSourceProxy(ds, plugin, ctx, "", cfg, httpClientProvider, &oauthtoken.Service{}, dsService) require.NoError(t, err) proxy.route = plugin.Routes[4] - ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, proxy.ds, cfg, ossencryption.ProvideService()) + ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, dsInfo, cfg) assert.Equal(t, "http://localhost/asd", req.URL.String()) }) @@ -152,7 +163,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) { proxy, err := NewDataSourceProxy(ds, plugin, ctx, "api/body", cfg, httpClientProvider, &oauthtoken.Service{}, dsService) require.NoError(t, err) proxy.route = plugin.Routes[5] - ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, proxy.ds, cfg, ossencryption.ProvideService()) + ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, proxy.route, dsInfo, cfg) content, err := ioutil.ReadAll(req.Body) require.NoError(t, err) @@ -262,10 +273,21 @@ func TestDataSourceProxy_routeRule(t *testing.T) { cfg := &setting.Cfg{} + jd, err := ds.JsonData.Map() + require.NoError(t, err) + dsInfo := DSInfo{ + ID: ds.Id, + Updated: ds.Updated, + JSONData: jd, + DecryptedSecureJSONData: map[string]string{ + "clientSecret": "123", + }, + } + dsService := datasources.ProvideService(bus.New(), nil, ossencryption.ProvideService()) proxy, err := NewDataSourceProxy(ds, plugin, ctx, "pathwithtoken1", cfg, httpClientProvider, &oauthtoken.Service{}, dsService) require.NoError(t, err) - ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[0], proxy.ds, cfg, ossencryption.ProvideService()) + ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[0], dsInfo, cfg) authorizationHeaderCall1 = req.Header.Get("Authorization") assert.Equal(t, "https://api.nr1.io/some/path", req.URL.String()) @@ -281,7 +303,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) { dsService := datasources.ProvideService(bus.New(), nil, ossencryption.ProvideService()) proxy, err := NewDataSourceProxy(ds, plugin, ctx, "pathwithtoken2", cfg, httpClientProvider, &oauthtoken.Service{}, dsService) require.NoError(t, err) - ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[1], proxy.ds, cfg, ossencryption.ProvideService()) + ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[1], dsInfo, cfg) authorizationHeaderCall2 = req.Header.Get("Authorization") @@ -298,7 +320,7 @@ func TestDataSourceProxy_routeRule(t *testing.T) { dsService := datasources.ProvideService(bus.New(), nil, ossencryption.ProvideService()) proxy, err := NewDataSourceProxy(ds, plugin, ctx, "pathwithtoken1", cfg, httpClientProvider, &oauthtoken.Service{}, dsService) require.NoError(t, err) - ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[0], proxy.ds, cfg, ossencryption.ProvideService()) + ApplyRoute(proxy.ctx.Req.Context(), req, proxy.proxyPath, plugin.Routes[0], dsInfo, cfg) authorizationHeaderCall3 := req.Header.Get("Authorization") assert.Equal(t, "https://api.nr1.io/some/path", req.URL.String()) diff --git a/pkg/api/pluginproxy/token_provider_gce.go b/pkg/api/pluginproxy/token_provider_gce.go index 1ba10aeb4685..b1f7909c5dad 100644 --- a/pkg/api/pluginproxy/token_provider_gce.go +++ b/pkg/api/pluginproxy/token_provider_gce.go @@ -2,25 +2,25 @@ package pluginproxy import ( "context" + "time" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" "golang.org/x/oauth2/google" ) type gceAccessTokenProvider struct { datasourceId int64 - datasourceVersion int + datasourceUpdated time.Time ctx context.Context route *plugins.AppPluginRoute authParams *plugins.JwtTokenAuth } -func newGceAccessTokenProvider(ctx context.Context, ds *models.DataSource, pluginRoute *plugins.AppPluginRoute, +func newGceAccessTokenProvider(ctx context.Context, ds DSInfo, pluginRoute *plugins.AppPluginRoute, authParams *plugins.JwtTokenAuth) *gceAccessTokenProvider { return &gceAccessTokenProvider{ - datasourceId: ds.Id, - datasourceVersion: ds.Version, + datasourceId: ds.ID, + datasourceUpdated: ds.Updated, ctx: ctx, route: pluginRoute, authParams: authParams, diff --git a/pkg/api/pluginproxy/token_provider_generic.go b/pkg/api/pluginproxy/token_provider_generic.go index fe3b5428202c..16091bf514a8 100644 --- a/pkg/api/pluginproxy/token_provider_generic.go +++ b/pkg/api/pluginproxy/token_provider_generic.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" ) @@ -27,7 +26,7 @@ type tokenCacheType struct { type genericAccessTokenProvider struct { datasourceId int64 - datasourceVersion int + datasourceUpdated time.Time route *plugins.AppPluginRoute authParams *plugins.JwtTokenAuth } @@ -68,11 +67,11 @@ func (token *jwtToken) UnmarshalJSON(b []byte) error { return nil } -func newGenericAccessTokenProvider(ds *models.DataSource, pluginRoute *plugins.AppPluginRoute, +func newGenericAccessTokenProvider(ds DSInfo, pluginRoute *plugins.AppPluginRoute, authParams *plugins.JwtTokenAuth) *genericAccessTokenProvider { return &genericAccessTokenProvider{ - datasourceId: ds.Id, - datasourceVersion: ds.Version, + datasourceId: ds.ID, + datasourceUpdated: ds.Updated, route: pluginRoute, authParams: authParams, } @@ -124,5 +123,5 @@ func (provider *genericAccessTokenProvider) GetAccessToken() (string, error) { } func (provider *genericAccessTokenProvider) getAccessTokenCacheKey() string { - return fmt.Sprintf("%v_%v_%v_%v", provider.datasourceId, provider.datasourceVersion, provider.route.Path, provider.route.Method) + return fmt.Sprintf("%v_%v_%v_%v", provider.datasourceId, provider.datasourceUpdated.Unix(), provider.route.Path, provider.route.Method) } diff --git a/pkg/api/pluginproxy/token_provider_jwt.go b/pkg/api/pluginproxy/token_provider_jwt.go index 05e9c8ff079f..27ee7e97af0f 100644 --- a/pkg/api/pluginproxy/token_provider_jwt.go +++ b/pkg/api/pluginproxy/token_provider_jwt.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" "golang.org/x/oauth2" "golang.org/x/oauth2/jwt" @@ -25,17 +24,17 @@ type oauthJwtTokenCacheType struct { type jwtAccessTokenProvider struct { datasourceId int64 - datasourceVersion int + datasourceUpdated time.Time ctx context.Context route *plugins.AppPluginRoute authParams *plugins.JwtTokenAuth } -func newJwtAccessTokenProvider(ctx context.Context, ds *models.DataSource, pluginRoute *plugins.AppPluginRoute, +func newJwtAccessTokenProvider(ctx context.Context, ds DSInfo, pluginRoute *plugins.AppPluginRoute, authParams *plugins.JwtTokenAuth) *jwtAccessTokenProvider { return &jwtAccessTokenProvider{ - datasourceId: ds.Id, - datasourceVersion: ds.Version, + datasourceId: ds.ID, + datasourceUpdated: ds.Updated, ctx: ctx, route: pluginRoute, authParams: authParams, @@ -93,5 +92,5 @@ var getTokenSource = func(conf *jwt.Config, ctx context.Context) (*oauth2.Token, } func (provider *jwtAccessTokenProvider) getAccessTokenCacheKey() string { - return fmt.Sprintf("%v_%v_%v_%v", provider.datasourceId, provider.datasourceVersion, provider.route.Path, provider.route.Method) + return fmt.Sprintf("%v_%v_%v_%v", provider.datasourceId, provider.datasourceUpdated.Unix(), provider.route.Path, provider.route.Method) } diff --git a/pkg/api/pluginproxy/token_provider_test.go b/pkg/api/pluginproxy/token_provider_test.go index 394c14534463..d2a89a89ace8 100644 --- a/pkg/api/pluginproxy/token_provider_test.go +++ b/pkg/api/pluginproxy/token_provider_test.go @@ -12,7 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" "golang.org/x/oauth2" "golang.org/x/oauth2/jwt" @@ -63,7 +62,7 @@ func TestAccessToken_pluginWithJWTTokenAuthRoute(t *testing.T) { getTokenSource = fn } - ds := &models.DataSource{Id: 1, Version: 2} + ds := DSInfo{ID: 1, Updated: time.Now()} t.Run("should fetch token using JWT private key", func(t *testing.T) { setUp(t, func(conf *jwt.Config, ctx context.Context) (*oauth2.Token, error) { @@ -170,7 +169,7 @@ func TestAccessToken_pluginWithTokenAuthRoute(t *testing.T) { mockTimeNow(time.Now()) defer resetTimeNow() - provider := newGenericAccessTokenProvider(&models.DataSource{}, pluginRoute, authParams) + provider := newGenericAccessTokenProvider(DSInfo{}, pluginRoute, authParams) testCases := []tokenTestDescription{ { @@ -252,7 +251,7 @@ func TestAccessToken_pluginWithTokenAuthRoute(t *testing.T) { mockTimeNow(time.Now()) defer resetTimeNow() - provider := newGenericAccessTokenProvider(&models.DataSource{}, pluginRoute, authParams) + provider := newGenericAccessTokenProvider(DSInfo{}, pluginRoute, authParams) token = map[string]interface{}{ "access_token": "2YotnFZFEjr1zCsicMWpAA", diff --git a/pkg/server/backgroundsvcs/background_services.go b/pkg/server/backgroundsvcs/background_services.go index 3c185aaf2132..df53bfa55bb4 100644 --- a/pkg/server/backgroundsvcs/background_services.go +++ b/pkg/server/backgroundsvcs/background_services.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/grafana/pkg/services/rendering" "github.com/grafana/grafana/pkg/services/secrets" "github.com/grafana/grafana/pkg/tsdb/azuremonitor" + "github.com/grafana/grafana/pkg/tsdb/cloudmonitoring" "github.com/grafana/grafana/pkg/tsdb/cloudwatch" "github.com/grafana/grafana/pkg/tsdb/elasticsearch" "github.com/grafana/grafana/pkg/tsdb/grafanads" @@ -49,7 +50,7 @@ func ProvideBackgroundServiceRegistry( _ *azuremonitor.Service, _ *cloudwatch.CloudWatchService, _ *elasticsearch.Service, _ *graphite.Service, _ *influxdb.Service, _ *loki.Service, _ *opentsdb.Service, _ *prometheus.Service, _ *tempo.Service, _ *testdatasource.TestDataPlugin, _ *plugindashboards.Service, _ *dashboardsnapshots.Service, _ secrets.SecretsService, - _ *postgres.Service, _ *mysql.Service, _ *mssql.Service, _ *grafanads.Service, + _ *postgres.Service, _ *mysql.Service, _ *mssql.Service, _ *grafanads.Service, _ *cloudmonitoring.Service, _ *pluginsettings.Service, _ *alerting.AlertNotificationService, ) *BackgroundServiceRegistry { return NewBackgroundServiceRegistry( diff --git a/pkg/tsdb/cloudmonitoring/annotation_query.go b/pkg/tsdb/cloudmonitoring/annotation_query.go index 0d2dca0cbbbc..81859abd9d16 100644 --- a/pkg/tsdb/cloudmonitoring/annotation_query.go +++ b/pkg/tsdb/cloudmonitoring/annotation_query.go @@ -2,63 +2,65 @@ package cloudmonitoring import ( "context" + "encoding/json" "strings" - "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" ) -//nolint: staticcheck // plugins.DataPlugin deprecated -func (e *Executor) executeAnnotationQuery(ctx context.Context, tsdbQuery plugins.DataQuery) ( - plugins.DataResponse, error) { - result := plugins.DataResponse{ - Results: make(map[string]plugins.DataQueryResult), - } - - firstQuery := tsdbQuery.Queries[0] +func (s *Service) executeAnnotationQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo) ( + *backend.QueryDataResponse, error) { + resp := backend.NewQueryDataResponse() - queries, err := e.buildQueryExecutors(tsdbQuery) + queries, err := s.buildQueryExecutors(req) if err != nil { - return plugins.DataResponse{}, err + return resp, err } - queryRes, resp, _, err := queries[0].run(ctx, tsdbQuery, e) + queryRes, dr, _, err := queries[0].run(ctx, req, s, dsInfo) if err != nil { - return plugins.DataResponse{}, err + return resp, err } - metricQuery := firstQuery.Model.Get("metricQuery") - title := metricQuery.Get("title").MustString() - text := metricQuery.Get("text").MustString() - tags := metricQuery.Get("tags").MustString() + mq := struct { + Title string `json:"title"` + Text string `json:"text"` + Tags string `json:"tags"` + }{} - err = queries[0].parseToAnnotations(&queryRes, resp, title, text, tags) - result.Results[firstQuery.RefID] = queryRes + firstQuery := req.Queries[0] + err = json.Unmarshal(firstQuery.JSON, &mq) + if err != nil { + return resp, nil + } + err = queries[0].parseToAnnotations(queryRes, dr, mq.Title, mq.Text, mq.Tags) + resp.Responses[firstQuery.RefID] = *queryRes - return result, err + return resp, err } -//nolint: staticcheck // plugins.DataPlugin deprecated -func transformAnnotationToTable(data []map[string]string, result *plugins.DataQueryResult) { - table := plugins.DataTable{ - Columns: make([]plugins.DataTableColumn, 4), - Rows: make([]plugins.DataRowValues, 0), - } - table.Columns[0].Text = "time" - table.Columns[1].Text = "title" - table.Columns[2].Text = "tags" - table.Columns[3].Text = "text" - - for _, r := range data { - values := make([]interface{}, 4) - values[0] = r["time"] - values[1] = r["title"] - values[2] = r["tags"] - values[3] = r["text"] - table.Rows = append(table.Rows, values) +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) transformAnnotationToFrame(annotations []map[string]string, result *backend.DataResponse) { + frames := data.Frames{} + for _, a := range annotations { + frame := &data.Frame{ + RefID: timeSeriesQuery.getRefID(), + Fields: []*data.Field{ + data.NewField("time", nil, a["time"]), + data.NewField("title", nil, a["title"]), + data.NewField("tags", nil, a["tags"]), + data.NewField("text", nil, a["text"]), + }, + Meta: &data.FrameMeta{ + Custom: map[string]interface{}{ + "rowCount": len(a), + }, + }, + } + frames = append(frames, frame) } - result.Tables = append(result.Tables, table) - result.Meta.Set("rowCount", len(data)) - slog.Info("anno", "len", len(data)) + result.Frames = frames + slog.Info("anno", "len", len(annotations)) } func formatAnnotationText(annotationText string, pointValue string, metricType string, metricLabels map[string]string, resourceLabels map[string]string) string { diff --git a/pkg/tsdb/cloudmonitoring/annotation_query_test.go b/pkg/tsdb/cloudmonitoring/annotation_query_test.go index c62f41bae8f0..b02cca21e7f5 100644 --- a/pkg/tsdb/cloudmonitoring/annotation_query_test.go +++ b/pkg/tsdb/cloudmonitoring/annotation_query_test.go @@ -3,8 +3,7 @@ package cloudmonitoring import ( "testing" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -14,25 +13,21 @@ func TestExecutor_parseToAnnotations(t *testing.T) { require.NoError(t, err) require.Len(t, d.TimeSeries, 3) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "annotationQuery"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{} err = query.parseToAnnotations(res, d, "atitle {{metric.label.instance_name}} {{metric.value}}", "atext {{resource.label.zone}}", "atag") require.NoError(t, err) - decoded, err := res.Dataframes.Decoded() - require.NoError(t, err) - require.Len(t, decoded, 3) - assert.Equal(t, "title", decoded[0].Fields[1].Name) - assert.Equal(t, "tags", decoded[0].Fields[2].Name) - assert.Equal(t, "text", decoded[0].Fields[3].Name) + require.Len(t, res.Frames, 3) + assert.Equal(t, "title", res.Frames[0].Fields[1].Name) + assert.Equal(t, "tags", res.Frames[0].Fields[2].Name) + assert.Equal(t, "text", res.Frames[0].Fields[3].Name) } func TestCloudMonitoringExecutor_parseToAnnotations_emptyTimeSeries(t *testing.T) { - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "annotationQuery"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{} response := cloudMonitoringResponse{ @@ -42,14 +37,11 @@ func TestCloudMonitoringExecutor_parseToAnnotations_emptyTimeSeries(t *testing.T err := query.parseToAnnotations(res, response, "atitle", "atext", "atag") require.NoError(t, err) - decoded, err := res.Dataframes.Decoded() - require.NoError(t, err) - require.Len(t, decoded, 0) + require.Len(t, res.Frames, 0) } func TestCloudMonitoringExecutor_parseToAnnotations_noPointsInSeries(t *testing.T) { - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "annotationQuery"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{} response := cloudMonitoringResponse{ @@ -61,6 +53,5 @@ func TestCloudMonitoringExecutor_parseToAnnotations_noPointsInSeries(t *testing. err := query.parseToAnnotations(res, response, "atitle", "atext", "atag") require.NoError(t, err) - decoded, _ := res.Dataframes.Decoded() - require.Len(t, decoded, 0) + require.Len(t, res.Frames, 0) } diff --git a/pkg/tsdb/cloudmonitoring/cloudmonitoring.go b/pkg/tsdb/cloudmonitoring/cloudmonitoring.go index f79b12705485..4ed67d7d1a2f 100644 --- a/pkg/tsdb/cloudmonitoring/cloudmonitoring.go +++ b/pkg/tsdb/cloudmonitoring/cloudmonitoring.go @@ -16,18 +16,22 @@ import ( "strings" "time" - "github.com/grafana/grafana/pkg/services/encryption" + "golang.org/x/oauth2/google" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/api/pluginproxy" - "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/setting" - "golang.org/x/oauth2/google" ) var ( @@ -68,150 +72,213 @@ const ( perSeriesAlignerDefault string = "ALIGN_MEAN" ) -func ProvideService(cfg *setting.Cfg, pluginManager plugins.Manager, httpClientProvider httpclient.Provider, - dsService *datasources.Service) *Service { - return &Service{ - PluginManager: pluginManager, - HTTPClientProvider: httpClientProvider, - Cfg: cfg, - dsService: dsService, +func ProvideService(cfg *setting.Cfg, httpClientProvider httpclient.Provider, pluginManager plugins.Manager, + backendPluginManager backendplugin.Manager, dsService *datasources.Service) *Service { + s := &Service{ + pluginManager: pluginManager, + backendPluginManager: backendPluginManager, + httpClientProvider: httpClientProvider, + cfg: cfg, + im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), + dsService: dsService, } + + factory := coreplugin.New(backend.ServeOpts{ + QueryDataHandler: s, + }) + + if err := s.backendPluginManager.Register("stackdriver", factory); err != nil { + slog.Error("Failed to register plugin", "error", err) + } + return s } type Service struct { - PluginManager plugins.Manager - HTTPClientProvider httpclient.Provider - Cfg *setting.Cfg - dsService *datasources.Service + pluginManager plugins.Manager + backendPluginManager backendplugin.Manager + httpClientProvider httpclient.Provider + cfg *setting.Cfg + im instancemgmt.InstanceManager + dsService *datasources.Service } -// Executor executes queries for the CloudMonitoring datasource. -type Executor struct { - httpClient *http.Client - dsInfo *models.DataSource - pluginManager plugins.Manager - encryptionService encryption.Service - cfg *setting.Cfg +type QueryModel struct { + Type string `json:"type"` } -// NewExecutor returns an Executor. -//nolint: staticcheck // plugins.DataPlugin deprecated -func (s *Service) NewExecutor(dsInfo *models.DataSource) (plugins.DataPlugin, error) { - httpClient, err := s.dsService.GetHTTPClient(dsInfo, s.HTTPClientProvider) - if err != nil { - return nil, err - } +type datasourceInfo struct { + id int64 + updated time.Time + url string + authenticationType string + defaultProject string + client *http.Client - return &Executor{ - httpClient: httpClient, - dsInfo: dsInfo, - pluginManager: s.PluginManager, - encryptionService: s.dsService.EncryptionService, - cfg: s.Cfg, - }, nil + jsonData map[string]interface{} + decryptedSecureJSONData map[string]string +} + +func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc { + return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + var jsonData map[string]interface{} + err := json.Unmarshal(settings.JSONData, &jsonData) + if err != nil { + return nil, fmt.Errorf("error reading settings: %w", err) + } + + opts, err := settings.HTTPClientOptions() + if err != nil { + return nil, err + } + + client, err := httpClientProvider.New(opts) + if err != nil { + return nil, err + } + + authType := jwtAuthentication + if authTypeOverride, ok := jsonData["authenticationType"].(string); ok && authTypeOverride != "" { + authType = authTypeOverride + } + + var defaultProject string + if jsonData["defaultProject"] != nil { + defaultProject = jsonData["defaultProject"].(string) + } + + return &datasourceInfo{ + id: settings.ID, + updated: settings.Updated, + url: settings.URL, + authenticationType: authType, + defaultProject: defaultProject, + client: client, + jsonData: jsonData, + decryptedSecureJSONData: settings.DecryptedSecureJSONData, + }, nil + } } // Query takes in the frontend queries, parses them into the CloudMonitoring query format // executes the queries against the CloudMonitoring API and parses the response into -// the time series or table format -//nolint: staticcheck // plugins.DataPlugin deprecated -func (e *Executor) DataQuery(ctx context.Context, dsInfo *models.DataSource, tsdbQuery plugins.DataQuery) ( - plugins.DataResponse, error) { - var result plugins.DataResponse - var err error - queryType := tsdbQuery.Queries[0].Model.Get("type").MustString("") - - switch queryType { +// the data frames +func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + resp := backend.NewQueryDataResponse() + if len(req.Queries) == 0 { + return resp, fmt.Errorf("query contains no queries") + } + + model := &QueryModel{} + err := json.Unmarshal(req.Queries[0].JSON, model) + if err != nil { + return resp, err + } + + dsInfo, err := s.getDSInfo(req.PluginContext) + if err != nil { + return nil, err + } + + switch model.Type { case "annotationQuery": - result, err = e.executeAnnotationQuery(ctx, tsdbQuery) + resp, err = s.executeAnnotationQuery(ctx, req, *dsInfo) case "getGCEDefaultProject": - result, err = e.getGCEDefaultProject(ctx, tsdbQuery) + resp, err = s.getGCEDefaultProject(ctx, req, *dsInfo) case "timeSeriesQuery": fallthrough default: - result, err = e.executeTimeSeriesQuery(ctx, tsdbQuery) + resp, err = s.executeTimeSeriesQuery(ctx, req, *dsInfo) } - return result, err + return resp, err } -//nolint: staticcheck // plugins.DataPlugin deprecated -func (e *Executor) getGCEDefaultProject(ctx context.Context, tsdbQuery plugins.DataQuery) (plugins.DataResponse, error) { - result := plugins.DataResponse{ - //nolint: staticcheck // plugins.DataPlugin deprecated - Results: make(map[string]plugins.DataQueryResult), - } - refID := tsdbQuery.Queries[0].RefID - //nolint: staticcheck // plugins.DataPlugin deprecated - queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: refID} - - gceDefaultProject, err := e.getDefaultProject(ctx) +func (s *Service) getGCEDefaultProject(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo) (*backend.QueryDataResponse, error) { + gceDefaultProject, err := s.getDefaultProject(ctx, dsInfo) if err != nil { - return plugins.DataResponse{}, fmt.Errorf( + return backend.NewQueryDataResponse(), fmt.Errorf( "failed to retrieve default project from GCE metadata server, error: %w", err) } - queryResult.Meta.Set("defaultProject", gceDefaultProject) - result.Results[refID] = queryResult - - return result, nil + return &backend.QueryDataResponse{ + Responses: backend.Responses{ + req.Queries[0].RefID: { + Frames: data.Frames{data.NewFrame("").SetMeta(&data.FrameMeta{ + Custom: map[string]interface{}{ + "defaultProject": gceDefaultProject, + }, + })}, + }, + }, + }, nil } -//nolint: staticcheck // plugins.DataPlugin deprecated -func (e *Executor) executeTimeSeriesQuery(ctx context.Context, tsdbQuery plugins.DataQuery) ( - plugins.DataResponse, error) { - result := plugins.DataResponse{ - //nolint: staticcheck // plugins.DataPlugin deprecated - Results: make(map[string]plugins.DataQueryResult), - } - - queryExecutors, err := e.buildQueryExecutors(tsdbQuery) +func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo) ( + *backend.QueryDataResponse, error) { + resp := backend.NewQueryDataResponse() + queryExecutors, err := s.buildQueryExecutors(req) if err != nil { - return plugins.DataResponse{}, err + return resp, err } for _, queryExecutor := range queryExecutors { - queryRes, resp, executedQueryString, err := queryExecutor.run(ctx, tsdbQuery, e) + queryRes, dr, executedQueryString, err := queryExecutor.run(ctx, req, s, dsInfo) if err != nil { - return plugins.DataResponse{}, err + return resp, err } - err = queryExecutor.parseResponse(&queryRes, resp, executedQueryString) + err = queryExecutor.parseResponse(queryRes, dr, executedQueryString) if err != nil { queryRes.Error = err } - result.Results[queryExecutor.getRefID()] = queryRes + resp.Responses[queryExecutor.getRefID()] = *queryRes } - return result, nil + return resp, nil } -func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMonitoringQueryExecutor, error) { - cloudMonitoringQueryExecutors := []cloudMonitoringQueryExecutor{} - - startTime, err := tsdbQuery.TimeRange.ParseFrom() +func queryModel(query backend.DataQuery) (grafanaQuery, error) { + var rawQuery map[string]interface{} + err := json.Unmarshal(query.JSON, &rawQuery) if err != nil { - return nil, err + return grafanaQuery{}, err + } + + if rawQuery["metricQuery"] == nil { + // migrate legacy query + var mq metricQuery + err = json.Unmarshal(query.JSON, &mq) + if err != nil { + return grafanaQuery{}, err + } + + return grafanaQuery{ + QueryType: metricQueryType, + MetricQuery: mq, + }, nil } - endTime, err := tsdbQuery.TimeRange.ParseTo() + var q grafanaQuery + err = json.Unmarshal(query.JSON, &q) if err != nil { - return nil, err + return grafanaQuery{}, err } + return q, nil +} + +func (s *Service) buildQueryExecutors(req *backend.QueryDataRequest) ([]cloudMonitoringQueryExecutor, error) { + var cloudMonitoringQueryExecutors []cloudMonitoringQueryExecutor + startTime := req.Queries[0].TimeRange.From + endTime := req.Queries[0].TimeRange.To durationSeconds := int(endTime.Sub(startTime).Seconds()) - for i := range tsdbQuery.Queries { - migrateLegacyQueryModel(&tsdbQuery.Queries[i]) - query := tsdbQuery.Queries[i] - q := grafanaQuery{} - model, err := query.Model.MarshalJSON() + for _, query := range req.Queries { + q, err := queryModel(query) if err != nil { - return nil, err - } - if err := json.Unmarshal(model, &q); err != nil { return nil, fmt.Errorf("could not unmarshal CloudMonitoringQuery json: %w", err) } + q.MetricQuery.PreprocessorType = toPreprocessorType(q.MetricQuery.Preprocessor) var target string params := url.Values{} @@ -230,9 +297,9 @@ func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMoni RefID: query.RefID, ProjectName: q.MetricQuery.ProjectName, Query: q.MetricQuery.Query, - IntervalMS: query.IntervalMS, + IntervalMS: query.Interval.Milliseconds(), AliasBy: q.MetricQuery.AliasBy, - timeRange: *tsdbQuery.TimeRange, + timeRange: req.Queries[0].TimeRange, } } else { cmtsf.AliasBy = q.MetricQuery.AliasBy @@ -243,7 +310,7 @@ func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMoni } params.Add("filter", buildFilterString(q.MetricQuery.MetricType, q.MetricQuery.Filters)) params.Add("view", q.MetricQuery.View) - setMetricAggParams(¶ms, &q.MetricQuery, durationSeconds, query.IntervalMS) + setMetricAggParams(¶ms, &q.MetricQuery, durationSeconds, query.Interval.Milliseconds()) queryInterface = cmtsf } case sloQueryType: @@ -253,7 +320,7 @@ func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMoni cmtsf.Service = q.SloQuery.ServiceId cmtsf.Slo = q.SloQuery.SloId params.Add("filter", buildSLOFilterExpression(q.SloQuery)) - setSloAggParams(¶ms, &q.SloQuery, durationSeconds, query.IntervalMS) + setSloAggParams(¶ms, &q.SloQuery, durationSeconds, query.Interval.Milliseconds()) queryInterface = cmtsf default: panic(fmt.Sprintf("Unrecognized query type %q", q.QueryType)) @@ -273,17 +340,6 @@ func (e *Executor) buildQueryExecutors(tsdbQuery plugins.DataQuery) ([]cloudMoni return cloudMonitoringQueryExecutors, nil } -func migrateLegacyQueryModel(query *plugins.DataSubQuery) { - mq := query.Model.Get("metricQuery").MustMap() - if mq == nil { - migratedModel := simplejson.NewFromAny(map[string]interface{}{ - "queryType": metricQueryType, - "metricQuery": query.Model.MustMap(), - }) - query.Model = migratedModel - } -} - func reverse(s string) string { chars := []rune(s) for i, j := 0, len(chars)-1; i < j; i, j = i+1, j-1 { @@ -522,8 +578,8 @@ func calcBucketBound(bucketOptions cloudMonitoringBucketOptions, n int) string { return bucketBound } -func (e *Executor) createRequest(ctx context.Context, dsInfo *models.DataSource, proxyPass string, body io.Reader) (*http.Request, error) { - u, err := url.Parse(dsInfo.Url) +func (s *Service) createRequest(ctx context.Context, pluginCtx backend.PluginContext, dsInfo *datasourceInfo, proxyPass string, body io.Reader) (*http.Request, error) { + u, err := url.Parse(dsInfo.url) if err != nil { return nil, err } @@ -542,7 +598,7 @@ func (e *Executor) createRequest(ctx context.Context, dsInfo *models.DataSource, req.Header.Set("Content-Type", "application/json") // find plugin - plugin := e.pluginManager.GetDataSource(dsInfo.Type) + plugin := s.pluginManager.GetDataSource(pluginCtx.PluginID) if plugin == nil { return nil, errors.New("unable to find datasource plugin CloudMonitoring") } @@ -555,14 +611,18 @@ func (e *Executor) createRequest(ctx context.Context, dsInfo *models.DataSource, } } - pluginproxy.ApplyRoute(ctx, req, proxyPass, cloudMonitoringRoute, dsInfo, e.cfg, e.encryptionService) + pluginproxy.ApplyRoute(ctx, req, proxyPass, cloudMonitoringRoute, pluginproxy.DSInfo{ + ID: dsInfo.id, + Updated: dsInfo.updated, + JSONData: dsInfo.jsonData, + DecryptedSecureJSONData: dsInfo.decryptedSecureJSONData, + }, s.cfg) return req, nil } -func (e *Executor) getDefaultProject(ctx context.Context) (string, error) { - authenticationType := e.dsInfo.JsonData.Get("authenticationType").MustString(jwtAuthentication) - if authenticationType == gceAuthentication { +func (s *Service) getDefaultProject(ctx context.Context, dsInfo datasourceInfo) (string, error) { + if dsInfo.authenticationType == gceAuthentication { defaultCredentials, err := google.FindDefaultCredentials(ctx, "https://www.googleapis.com/auth/monitoring.read") if err != nil { return "", fmt.Errorf("failed to retrieve default project from GCE metadata server: %w", err) @@ -577,7 +637,7 @@ func (e *Executor) getDefaultProject(ctx context.Context) (string, error) { return defaultCredentials.ProjectID, nil } - return e.dsInfo.JsonData.Get("defaultProject").MustString(), nil + return dsInfo.defaultProject, nil } func unmarshalResponse(res *http.Response) (cloudMonitoringResponse, error) { @@ -626,3 +686,17 @@ func addConfigData(frames data.Frames, dl string, unit string) data.Frames { } return frames } + +func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*datasourceInfo, error) { + i, err := s.im.Get(pluginCtx) + if err != nil { + return nil, err + } + + instance, ok := i.(*datasourceInfo) + if !ok { + return nil, fmt.Errorf("failed to cast datsource info") + } + + return instance, nil +} diff --git a/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go b/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go index 0125970af444..3812a0119700 100644 --- a/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go +++ b/pkg/tsdb/cloudmonitoring/cloudmonitoring_test.go @@ -12,18 +12,18 @@ import ( "testing" "time" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestCloudMonitoring(t *testing.T) { - executor := &Executor{} + service := &Service{} t.Run("Parse migrated queries from frontend and build Google Cloud Monitoring API queries", func(t *testing.T) { t.Run("and query has no aggregation set", func(t *testing.T) { - qes, err := executor.buildQueryExecutors(getBaseQuery()) + qes, err := service.buildQueryExecutors(baseReq()) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -58,13 +58,13 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query has filters", func(t *testing.T) { - query := getBaseQuery() - query.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + query := baseReq() + query.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", - "filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2", "AND", "resource.type", "=", "another/resource/type"}, - }) + "filters": ["key", "=", "value", "AND", "key2", "=", "value2", "AND", "resource.type", "=", "another/resource/type"] + }`) - qes, err := executor.buildQueryExecutors(query) + qes, err := service.buildQueryExecutors(query) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, 1, len(queries)) @@ -89,14 +89,14 @@ func TestCloudMonitoring(t *testing.T) { t.Run("and alignmentPeriod is set to grafana-auto", func(t *testing.T) { t.Run("and IntervalMS is larger than 60000", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].IntervalMS = 1000000 - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].Interval = 1000000 * time.Millisecond + req.Queries[0].JSON = json.RawMessage(`{ "alignmentPeriod": "grafana-auto", - "filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2"}, - }) + "filters": ["key", "=", "value", "AND", "key2", "=", "value2"] + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+1000s`, queries[0].Params["aggregation.alignmentPeriod"][0]) @@ -117,14 +117,14 @@ func TestCloudMonitoring(t *testing.T) { verifyDeepLink(t, dl, expectedTimeSelection, expectedTimeSeriesFilter) }) t.Run("and IntervalMS is less than 60000", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].IntervalMS = 30000 - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].Interval = 30000 * time.Millisecond + req.Queries[0].JSON = json.RawMessage(`{ "alignmentPeriod": "grafana-auto", - "filters": []interface{}{"key", "=", "value", "AND", "key2", "=", "value2"}, - }) + "filters": ["key", "=", "value", "AND", "key2", "=", "value2"] + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0]) @@ -147,61 +147,63 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and alignmentPeriod is set to cloud-monitoring-auto", func(t *testing.T) { // legacy + now := time.Now().UTC() + t.Run("and range is two hours", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.TimeRange.From = "1538033322461" - tsdbQuery.TimeRange.To = "1538040522461" - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "target": "target", - "alignmentPeriod": "cloud-monitoring-auto", - }) - - qes, err := executor.buildQueryExecutors(tsdbQuery) + req := baseReq() + req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 2)) + req.Queries[0].TimeRange.To = now + req.Queries[0].JSON = json.RawMessage(`{ + "target": "target", + "alignmentPeriod": "cloud-monitoring-auto" + }`) + + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0]) }) t.Run("and range is 22 hours", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.TimeRange.From = "1538034524922" - tsdbQuery.TimeRange.To = "1538113724922" - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "target": "target", - "alignmentPeriod": "cloud-monitoring-auto", - }) - - qes, err := executor.buildQueryExecutors(tsdbQuery) + req := baseReq() + req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 22)) + req.Queries[0].TimeRange.To = now + req.Queries[0].JSON = json.RawMessage(`{ + "target": "target", + "alignmentPeriod": "cloud-monitoring-auto" + }`) + + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0]) }) t.Run("and range is 23 hours", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.TimeRange.From = "1538034567985" - tsdbQuery.TimeRange.To = "1538117367985" - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "target": "target", - "alignmentPeriod": "cloud-monitoring-auto", - }) - - qes, err := executor.buildQueryExecutors(tsdbQuery) + req := baseReq() + req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 23)) + req.Queries[0].TimeRange.To = now + req.Queries[0].JSON = json.RawMessage(`{ + "target": "target", + "alignmentPeriod": "cloud-monitoring-auto" + }`) + + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+300s`, queries[0].Params["aggregation.alignmentPeriod"][0]) }) t.Run("and range is 7 days", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.TimeRange.From = "1538036324073" - tsdbQuery.TimeRange.To = "1538641124073" - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "target": "target", - "alignmentPeriod": "cloud-monitoring-auto", - }) - - qes, err := executor.buildQueryExecutors(tsdbQuery) + req := baseReq() + req.Queries[0].TimeRange.From = now + req.Queries[0].TimeRange.To = now.AddDate(0, 0, 7) + req.Queries[0].JSON = json.RawMessage(`{ + "target": "target", + "alignmentPeriod": "cloud-monitoring-auto" + }`) + + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+3600s`, queries[0].Params["aggregation.alignmentPeriod"][0]) @@ -209,16 +211,18 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and alignmentPeriod is set to stackdriver-auto", func(t *testing.T) { // legacy + now := time.Now().UTC() + t.Run("and range is two hours", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.TimeRange.From = "1538033322461" - tsdbQuery.TimeRange.To = "1538040522461" - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "target": "target", - "alignmentPeriod": "stackdriver-auto", - }) - - qes, err := executor.buildQueryExecutors(tsdbQuery) + req := baseReq() + req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 2)) + req.Queries[0].TimeRange.To = now + req.Queries[0].JSON = json.RawMessage(`{ + "target": "target", + "alignmentPeriod": "stackdriver-auto" + }`) + + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0]) @@ -230,8 +234,8 @@ func TestCloudMonitoring(t *testing.T) { expectedTimeSelection := map[string]string{ "timeRange": "custom", - "start": "2018-09-27T07:28:42Z", - "end": "2018-09-27T09:28:42Z", + "start": req.Queries[0].TimeRange.From.Format(time.RFC3339), + "end": req.Queries[0].TimeRange.To.Format(time.RFC3339), } expectedTimeSeriesFilter := map[string]interface{}{ "minAlignmentPeriod": `60s`, @@ -240,15 +244,15 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and range is 22 hours", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.TimeRange.From = "1538034524922" - tsdbQuery.TimeRange.To = "1538113724922" - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "target": "target", - "alignmentPeriod": "stackdriver-auto", - }) - - qes, err := executor.buildQueryExecutors(tsdbQuery) + req := baseReq() + req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 22)) + req.Queries[0].TimeRange.To = now + req.Queries[0].JSON = json.RawMessage(`{ + "target": "target", + "alignmentPeriod": "stackdriver-auto" + }`) + + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+60s`, queries[0].Params["aggregation.alignmentPeriod"][0]) @@ -260,8 +264,8 @@ func TestCloudMonitoring(t *testing.T) { expectedTimeSelection := map[string]string{ "timeRange": "custom", - "start": "2018-09-27T07:48:44Z", - "end": "2018-09-28T05:48:44Z", + "start": req.Queries[0].TimeRange.From.Format(time.RFC3339), + "end": req.Queries[0].TimeRange.To.Format(time.RFC3339), } expectedTimeSeriesFilter := map[string]interface{}{ "minAlignmentPeriod": `60s`, @@ -270,15 +274,15 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and range is 23 hours", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.TimeRange.From = "1538034567985" - tsdbQuery.TimeRange.To = "1538117367985" - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "target": "target", - "alignmentPeriod": "stackdriver-auto", - }) - - qes, err := executor.buildQueryExecutors(tsdbQuery) + req := baseReq() + req.Queries[0].TimeRange.From = now.Add(-(time.Hour * 23)) + req.Queries[0].TimeRange.To = now + req.Queries[0].JSON = json.RawMessage(`{ + "target": "target", + "alignmentPeriod": "stackdriver-auto" + }`) + + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+300s`, queries[0].Params["aggregation.alignmentPeriod"][0]) @@ -290,8 +294,8 @@ func TestCloudMonitoring(t *testing.T) { expectedTimeSelection := map[string]string{ "timeRange": "custom", - "start": "2018-09-27T07:49:27Z", - "end": "2018-09-28T06:49:27Z", + "start": req.Queries[0].TimeRange.From.Format(time.RFC3339), + "end": req.Queries[0].TimeRange.To.Format(time.RFC3339), } expectedTimeSeriesFilter := map[string]interface{}{ "minAlignmentPeriod": `300s`, @@ -300,15 +304,15 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and range is 7 days", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.TimeRange.From = "1538036324073" - tsdbQuery.TimeRange.To = "1538641124073" - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "target": "target", - "alignmentPeriod": "stackdriver-auto", - }) - - qes, err := executor.buildQueryExecutors(tsdbQuery) + req := baseReq() + req.Queries[0].TimeRange.From = now.AddDate(0, 0, -7) + req.Queries[0].TimeRange.To = now + req.Queries[0].JSON = json.RawMessage(`{ + "target": "target", + "alignmentPeriod": "stackdriver-auto" + }`) + + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+3600s`, queries[0].Params["aggregation.alignmentPeriod"][0]) @@ -320,8 +324,8 @@ func TestCloudMonitoring(t *testing.T) { expectedTimeSelection := map[string]string{ "timeRange": "custom", - "start": "2018-09-27T08:18:44Z", - "end": "2018-10-04T08:18:44Z", + "start": req.Queries[0].TimeRange.From.Format(time.RFC3339), + "end": req.Queries[0].TimeRange.To.Format(time.RFC3339), } expectedTimeSeriesFilter := map[string]interface{}{ "minAlignmentPeriod": `3600s`, @@ -332,13 +336,13 @@ func TestCloudMonitoring(t *testing.T) { t.Run("and alignmentPeriod is set in frontend", func(t *testing.T) { t.Run("and alignment period is within accepted range", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].IntervalMS = 1000 - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "alignmentPeriod": "+600s", - }) + req := baseReq() + req.Queries[0].Interval = 1000 + req.Queries[0].JSON = json.RawMessage(`{ + "alignmentPeriod": "+600s" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, `+600s`, queries[0].Params["aggregation.alignmentPeriod"][0]) @@ -361,14 +365,14 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query has aggregation mean set", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", "crossSeriesReducer": "REDUCE_SUM", - "view": "FULL", - }) + "view": "FULL" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -404,15 +408,15 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query has group bys", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", "crossSeriesReducer": "REDUCE_NONE", - "groupBys": []interface{}{"metric.label.group1", "metric.label.group2"}, - "view": "FULL", - }) + "groupBys": ["metric.label.group1", "metric.label.group2"], + "view": "FULL" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -450,30 +454,29 @@ func TestCloudMonitoring(t *testing.T) { t.Run("Parse queries from frontend and build Google Cloud Monitoring API queries", func(t *testing.T) { fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local) - tsdbQuery := plugins.DataQuery{ - TimeRange: &plugins.DataTimeRange{ - From: fmt.Sprintf("%v", fromStart.Unix()*1000), - To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000), - }, - Queries: []plugins.DataSubQuery{ + req := &backend.QueryDataRequest{ + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ - "queryType": metricQueryType, - "metricQuery": map[string]interface{}{ - "metricType": "a/metric/type", - "view": "FULL", - "aliasBy": "testalias", - "type": "timeSeriesQuery", - "groupBys": []interface{}{"metric.label.group1", "metric.label.group2"}, - }, - }), RefID: "A", + TimeRange: backend.TimeRange{ + From: fromStart, + To: fromStart.Add(34 * time.Minute), + }, + JSON: json.RawMessage(`{ + "queryType": "metrics", + "metricQuery": { + "metricType": "a/metric/type", + "view": "FULL", + "aliasBy": "testalias", + "type": "timeSeriesQuery", + "groupBys": ["metric.label.group1", "metric.label.group2"] + } + }`), }, }, } - t.Run("and query type is metrics", func(t *testing.T) { - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -509,18 +512,18 @@ func TestCloudMonitoring(t *testing.T) { } verifyDeepLink(t, dl, expectedTimeSelection, expectedTimeSeriesFilter) - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "queryType": metricQueryType, - "metricQuery": map[string]interface{}{ - "editorMode": mqlEditorMode, + req.Queries[0].JSON = json.RawMessage(`{ + "queryType": "metrics", + "metricQuery": { + "editorMode": "mql", "projectName": "test-proj", "query": "test-query", - "aliasBy": "test-alias", + "aliasBy": "test-alias" }, - "sloQuery": map[string]interface{}{}, - }) + "sloQuery": {} + }`) - qes, err = executor.buildQueryExecutors(tsdbQuery) + qes, err = service.buildQueryExecutors(req) require.NoError(t, err) tqueries := make([]*cloudMonitoringTimeSeriesQuery, 0) for _, qi := range qes { @@ -537,21 +540,21 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query type is SLOs", func(t *testing.T) { - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "queryType": sloQueryType, - "metricQuery": map[string]interface{}{}, - "sloQuery": map[string]interface{}{ + req.Queries[0].JSON = json.RawMessage(`{ + "queryType": "slo", + "sloQuery": { "projectName": "test-proj", "alignmentPeriod": "stackdriver-auto", "perSeriesAligner": "ALIGN_NEXT_OLDER", "aliasBy": "", "selectorName": "select_slo_health", "serviceId": "test-service", - "sloId": "test-slo", + "sloId": "test-slo" }, - }) + "metricQuery": {} + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -565,21 +568,21 @@ func TestCloudMonitoring(t *testing.T) { assert.Equal(t, `aggregation.alignmentPeriod=%2B60s&aggregation.perSeriesAligner=ALIGN_MEAN&filter=select_slo_health%28%22projects%2Ftest-proj%2Fservices%2Ftest-service%2FserviceLevelObjectives%2Ftest-slo%22%29&interval.endTime=2018-03-15T13%3A34%3A00Z&interval.startTime=2018-03-15T13%3A00%3A00Z`, queries[0].Target) assert.Equal(t, 5, len(queries[0].Params)) - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ - "queryType": sloQueryType, - "metricQuery": map[string]interface{}{}, - "sloQuery": map[string]interface{}{ + req.Queries[0].JSON = json.RawMessage(`{ + "queryType": "slo", + "sloQuery": { "projectName": "test-proj", "alignmentPeriod": "stackdriver-auto", "perSeriesAligner": "ALIGN_NEXT_OLDER", "aliasBy": "", "selectorName": "select_slo_compliance", "serviceId": "test-service", - "sloId": "test-slo", + "sloId": "test-slo" }, - }) + "metricQuery": {} + }`) - qes, err = executor.buildQueryExecutors(tsdbQuery) + qes, err = service.buildQueryExecutors(req) require.NoError(t, err) qqueries := getCloudMonitoringQueriesFromInterface(t, qes) assert.Equal(t, "ALIGN_NEXT_OLDER", qqueries[0].Params["aggregation.perSeriesAligner"][0]) @@ -595,13 +598,11 @@ func TestCloudMonitoring(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() - require.NoError(t, err) + frames := res.Frames require.Len(t, frames, 1) assert.Equal(t, "serviceruntime.googleapis.com/api/request_count", frames[0].Fields[1].Name) assert.Equal(t, 3, frames[0].Fields[1].Len()) @@ -620,52 +621,53 @@ func TestCloudMonitoring(t *testing.T) { data, err := loadTestFile("./test-data/2-series-response-no-agg.json") require.NoError(t, err) assert.Equal(t, 3, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() - require.NoError(t, err) - assert.Equal(t, 3, len(frames)) - assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-asia-east-1", frames[0].Fields[1].Name) - assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-europe-west-1", frames[1].Fields[1].Name) - assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-us-east-1", frames[2].Fields[1].Name) - - assert.Equal(t, 3, frames[0].Fields[1].Len()) - assert.Equal(t, 9.8566497180145, frames[0].Fields[1].At(0)) - assert.Equal(t, 9.7323568146676, frames[0].Fields[1].At(1)) - assert.Equal(t, 9.7730520330369, frames[0].Fields[1].At(2)) - - labels := res.Meta.Get("labels").Interface().(map[string][]string) - require.NotNil(t, labels) - assert.Equal(t, 3, len(labels["metric.label.instance_name"])) - assert.Contains(t, labels["metric.label.instance_name"], "collector-asia-east-1") - assert.Contains(t, labels["metric.label.instance_name"], "collector-europe-west-1") - assert.Contains(t, labels["metric.label.instance_name"], "collector-us-east-1") - - assert.Equal(t, 3, len(labels["resource.label.zone"])) - assert.Contains(t, labels["resource.label.zone"], "asia-east1-a") - assert.Contains(t, labels["resource.label.zone"], "europe-west1-b") - assert.Contains(t, labels["resource.label.zone"], "us-east1-b") - - assert.Equal(t, 1, len(labels["resource.label.project_id"])) - assert.Equal(t, "grafana-prod", labels["resource.label.project_id"][0]) + field := res.Frames[0].Fields[1] + assert.Equal(t, 3, field.Len()) + assert.Equal(t, 9.8566497180145, field.At(0)) + assert.Equal(t, 9.7323568146676, field.At(1)) + assert.Equal(t, 9.7730520330369, field.At(2)) + assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-asia-east-1", field.Name) + assert.Equal(t, "collector-asia-east-1", field.Labels["metric.label.instance_name"]) + assert.Equal(t, "asia-east1-a", field.Labels["resource.label.zone"]) + assert.Equal(t, "grafana-prod", field.Labels["resource.label.project_id"]) + + field = res.Frames[1].Fields[1] + assert.Equal(t, 3, field.Len()) + assert.Equal(t, 9.0238475054502, field.At(0)) + assert.Equal(t, 8.9689492364414, field.At(1)) + assert.Equal(t, 8.8210971239023, field.At(2)) + assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-europe-west-1", field.Name) + assert.Equal(t, "collector-europe-west-1", field.Labels["metric.label.instance_name"]) + assert.Equal(t, "europe-west1-b", field.Labels["resource.label.zone"]) + assert.Equal(t, "grafana-prod", field.Labels["resource.label.project_id"]) + + field = res.Frames[2].Fields[1] + assert.Equal(t, 3, field.Len()) + assert.Equal(t, 30.829426143318, field.At(0)) + assert.Equal(t, 30.903974115849, field.At(1)) + assert.Equal(t, 30.807846801355, field.At(2)) + assert.Equal(t, "compute.googleapis.com/instance/cpu/usage_time collector-us-east-1", field.Name) + assert.Equal(t, "collector-us-east-1", field.Labels["metric.label.instance_name"]) + assert.Equal(t, "us-east1-b", field.Labels["resource.label.zone"]) + assert.Equal(t, "grafana-prod", field.Labels["resource.label.project_id"]) }) t.Run("when data from query with no aggregation and group bys", func(t *testing.T) { data, err := loadTestFile("./test-data/2-series-response-no-agg.json") require.NoError(t, err) assert.Equal(t, 3, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, GroupBys: []string{ "metric.label.instance_name", "resource.label.zone", }} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, 3, len(frames)) @@ -678,14 +680,13 @@ func TestCloudMonitoring(t *testing.T) { data, err := loadTestFile("./test-data/2-series-response-no-agg.json") require.NoError(t, err) assert.Equal(t, 3, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} t.Run("and the alias pattern is for metric type, a metric label and a resource label", func(t *testing.T) { query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metric.type}} - {{metric.label.instance_name}} - {{resource.label.zone}}", GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, 3, len(frames)) @@ -698,7 +699,7 @@ func TestCloudMonitoring(t *testing.T) { query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "metric {{metric.name}} service {{metric.service}}", GroupBys: []string{"metric.label.instance_name", "resource.label.zone"}} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, 3, len(frames)) @@ -712,12 +713,11 @@ func TestCloudMonitoring(t *testing.T) { data, err := loadTestFile("./test-data/3-series-response-distribution-exponential.json") require.NoError(t, err) assert.Equal(t, 1, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, 11, len(frames)) for i := 0; i < 11; i++ { @@ -754,12 +754,11 @@ func TestCloudMonitoring(t *testing.T) { data, err := loadTestFile("./test-data/4-series-response-distribution-explicit.json") require.NoError(t, err) assert.Equal(t, 1, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, 33, len(frames)) for i := 0; i < 33; i++ { @@ -789,34 +788,34 @@ func TestCloudMonitoring(t *testing.T) { data, err := loadTestFile("./test-data/5-series-response-meta-data.json") require.NoError(t, err) assert.Equal(t, 3, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{bucket}}"} err = query.parseResponse(res, data, "") require.NoError(t, err) - labels := res.Meta.Get("labels").Interface().(map[string][]string) - frames, err := res.Dataframes.Decoded() require.NoError(t, err) - assert.Equal(t, 3, len(frames)) - - assert.Equal(t, 5, len(labels["metadata.system_labels.test"])) - assert.Contains(t, labels["metadata.system_labels.test"], "value1") - assert.Contains(t, labels["metadata.system_labels.test"], "value2") - assert.Contains(t, labels["metadata.system_labels.test"], "value3") - assert.Contains(t, labels["metadata.system_labels.test"], "value4") - assert.Contains(t, labels["metadata.system_labels.test"], "value5") - - assert.Equal(t, 2, len(labels["metadata.system_labels.region"])) - assert.Contains(t, labels["metadata.system_labels.region"], "us-central1") - assert.Contains(t, labels["metadata.system_labels.region"], "us-west1") - - assert.Equal(t, 2, len(labels["metadata.user_labels.region"])) - assert.Contains(t, labels["metadata.user_labels.region"], "region1") - assert.Contains(t, labels["metadata.user_labels.region"], "region3") - - assert.Equal(t, 2, len(labels["metadata.user_labels.name"])) - assert.Contains(t, labels["metadata.user_labels.name"], "name1") - assert.Contains(t, labels["metadata.user_labels.name"], "name3") + assert.Equal(t, 3, len(res.Frames)) + + field := res.Frames[0].Fields[1] + assert.Equal(t, "diana-debian9", field.Labels["metadata.system_labels.name"]) + assert.Equal(t, "value1, value2", field.Labels["metadata.system_labels.test"]) + assert.Equal(t, "us-west1", field.Labels["metadata.system_labels.region"]) + assert.Equal(t, "false", field.Labels["metadata.system_labels.spot_instance"]) + assert.Equal(t, "name1", field.Labels["metadata.user_labels.name"]) + assert.Equal(t, "region1", field.Labels["metadata.user_labels.region"]) + + field = res.Frames[1].Fields[1] + assert.Equal(t, "diana-ubuntu1910", field.Labels["metadata.system_labels.name"]) + assert.Equal(t, "value1, value2, value3", field.Labels["metadata.system_labels.test"]) + assert.Equal(t, "us-west1", field.Labels["metadata.system_labels.region"]) + assert.Equal(t, "false", field.Labels["metadata.system_labels.spot_instance"]) + + field = res.Frames[2].Fields[1] + assert.Equal(t, "premium-plugin-staging", field.Labels["metadata.system_labels.name"]) + assert.Equal(t, "value1, value2, value4, value5", field.Labels["metadata.system_labels.test"]) + assert.Equal(t, "us-central1", field.Labels["metadata.system_labels.region"]) + assert.Equal(t, "true", field.Labels["metadata.system_labels.spot_instance"]) + assert.Equal(t, "name3", field.Labels["metadata.user_labels.name"]) + assert.Equal(t, "region3", field.Labels["metadata.user_labels.region"]) }) t.Run("when data from query returns metadata system labels and alias by is defined", func(t *testing.T) { @@ -825,12 +824,11 @@ func TestCloudMonitoring(t *testing.T) { assert.Equal(t, 3, len(data.TimeSeries)) t.Run("and systemlabel contains key with array of string", func(t *testing.T) { - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metadata.system_labels.test}}"} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, 3, len(frames)) fmt.Println(frames[0].Fields[1].Name) @@ -840,12 +838,11 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and systemlabel contains key with array of string2", func(t *testing.T) { - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}, AliasBy: "{{metadata.system_labels.test2}}"} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, 3, len(frames)) assert.Equal(t, "testvalue", frames[2].Fields[1].Name) @@ -858,8 +855,7 @@ func TestCloudMonitoring(t *testing.T) { assert.Equal(t, 1, len(data.TimeSeries)) t.Run("and alias by is expanded", func(t *testing.T) { - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{ Params: url.Values{}, ProjectName: "test-proj", @@ -870,7 +866,7 @@ func TestCloudMonitoring(t *testing.T) { } err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, "test-proj - test-service - test-slo - select_slo_compliance", frames[0].Fields[1].Name) }) @@ -882,8 +878,7 @@ func TestCloudMonitoring(t *testing.T) { assert.Equal(t, 1, len(data.TimeSeries)) t.Run("and alias by is expanded", func(t *testing.T) { - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{ Params: url.Values{}, ProjectName: "test-proj", @@ -893,7 +888,7 @@ func TestCloudMonitoring(t *testing.T) { } err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, "select_slo_compliance(\"projects/test-proj/services/test-service/serviceLevelObjectives/test-slo\")", frames[0].Fields[1].Name) }) @@ -904,12 +899,11 @@ func TestCloudMonitoring(t *testing.T) { data, err := loadTestFile("./test-data/1-series-response-agg-one-metric.json") require.NoError(t, err) assert.Equal(t, 1, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, "Bps", frames[0].Fields[1].Config.Unit) }) @@ -918,12 +912,11 @@ func TestCloudMonitoring(t *testing.T) { data, err := loadTestFile("./test-data/2-series-response-no-agg.json") require.NoError(t, err) assert.Equal(t, 3, len(data.TimeSeries)) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesFilter{Params: url.Values{}} err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() + frames := res.Frames require.NoError(t, err) assert.Equal(t, "", frames[0].Fields[1].Config.Unit) }) @@ -937,21 +930,20 @@ func TestCloudMonitoring(t *testing.T) { t.Run("and alias by is expanded", func(t *testing.T) { fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local) - //nolint: staticcheck // plugins.DataPlugin deprecated - res := &plugins.DataQueryResult{Meta: simplejson.New(), RefID: "A"} + + res := &backend.DataResponse{} query := &cloudMonitoringTimeSeriesQuery{ ProjectName: "test-proj", Query: "test-query", AliasBy: "{{project}} - {{resource.label.zone}} - {{resource.label.instance_id}} - {{metric.label.response_code_class}}", - timeRange: plugins.DataTimeRange{ - From: fmt.Sprintf("%v", fromStart.Unix()*1000), - To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000), + timeRange: backend.TimeRange{ + From: fromStart, + To: fromStart.Add(34 * time.Minute), }, } err = query.parseResponse(res, data, "") require.NoError(t, err) - frames, err := res.Dataframes.Decoded() - require.NoError(t, err) + frames := res.Frames assert.Equal(t, "test-proj - asia-northeast1-c - 6724404429462225363 - 200", frames[0].Fields[1].Name) }) }) @@ -1045,17 +1037,17 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query preprocessor is not defined", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", "crossSeriesReducer": "REDUCE_MIN", "perSeriesAligner": "REDUCE_SUM", "alignmentPeriod": "+60s", - "groupBys": []string{"labelname"}, - "view": "FULL", - }) + "groupBys": ["labelname"], + "view": "FULL" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -1072,18 +1064,18 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query preprocessor is set to none", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", "crossSeriesReducer": "REDUCE_MIN", "perSeriesAligner": "REDUCE_SUM", "alignmentPeriod": "+60s", - "groupBys": []string{"labelname"}, + "groupBys": ["labelname"], "view": "FULL", - "preprocessor": "none", - }) + "preprocessor": "none" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -1100,18 +1092,18 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query preprocessor is set to rate and there's no group bys", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", "crossSeriesReducer": "REDUCE_SUM", "perSeriesAligner": "REDUCE_MIN", "alignmentPeriod": "+60s", - "groupBys": []string{}, + "groupBys": [], "view": "FULL", - "preprocessor": "rate", - }) + "preprocessor": "rate" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -1126,18 +1118,18 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query preprocessor is set to rate and group bys exist", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", "crossSeriesReducer": "REDUCE_SUM", "perSeriesAligner": "REDUCE_MIN", "alignmentPeriod": "+60s", - "groupBys": []string{"labelname"}, + "groupBys": ["labelname"], "view": "FULL", - "preprocessor": "rate", - }) + "preprocessor": "rate" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -1154,18 +1146,18 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query preprocessor is set to delta and there's no group bys", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", "crossSeriesReducer": "REDUCE_MIN", "perSeriesAligner": "REDUCE_SUM", "alignmentPeriod": "+60s", - "groupBys": []string{}, + "groupBys": [], "view": "FULL", - "preprocessor": "delta", - }) + "preprocessor": "delta" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -1180,18 +1172,18 @@ func TestCloudMonitoring(t *testing.T) { }) t.Run("and query preprocessor is set to delta and group bys exist", func(t *testing.T) { - tsdbQuery := getBaseQuery() - tsdbQuery.Queries[0].Model = simplejson.NewFromAny(map[string]interface{}{ + req := baseReq() + req.Queries[0].JSON = json.RawMessage(`{ "metricType": "a/metric/type", "crossSeriesReducer": "REDUCE_MIN", "perSeriesAligner": "REDUCE_SUM", "alignmentPeriod": "+60s", - "groupBys": []string{"labelname"}, + "groupBys": ["labelname"], "view": "FULL", - "preprocessor": "delta", - }) + "preprocessor": "delta" + }`) - qes, err := executor.buildQueryExecutors(tsdbQuery) + qes, err := service.buildQueryExecutors(req) require.NoError(t, err) queries := getCloudMonitoringQueriesFromInterface(t, qes) @@ -1294,22 +1286,22 @@ func verifyDeepLink(t *testing.T, dl string, expectedTimeSelection map[string]st } } -func getBaseQuery() plugins.DataQuery { +func baseReq() *backend.QueryDataRequest { fromStart := time.Date(2018, 3, 15, 13, 0, 0, 0, time.UTC).In(time.Local) - query := plugins.DataQuery{ - TimeRange: &plugins.DataTimeRange{ - From: fmt.Sprintf("%v", fromStart.Unix()*1000), - To: fmt.Sprintf("%v", fromStart.Add(34*time.Minute).Unix()*1000), - }, - Queries: []plugins.DataSubQuery{ + query := &backend.QueryDataRequest{ + Queries: []backend.DataQuery{ { - Model: simplejson.NewFromAny(map[string]interface{}{ + RefID: "A", + TimeRange: backend.TimeRange{ + From: fromStart, + To: fromStart.Add(34 * time.Minute), + }, + JSON: json.RawMessage(`{ "metricType": "a/metric/type", "view": "FULL", "aliasBy": "testalias", - "type": "timeSeriesQuery", - }), - RefID: "A", + "type": "timeSeriesQuery" + }`), }, }, } diff --git a/pkg/tsdb/cloudmonitoring/time_series_filter.go b/pkg/tsdb/cloudmonitoring/time_series_filter.go index eeb6ed9769f4..f091b56781e7 100644 --- a/pkg/tsdb/cloudmonitoring/time_series_filter.go +++ b/pkg/tsdb/cloudmonitoring/time_series_filter.go @@ -10,81 +10,93 @@ import ( "strings" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" "github.com/opentracing/opentracing-go" - "golang.org/x/net/context/ctxhttp" ) -//nolint: staticcheck // plugins.DataPlugin deprecated -func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) run(ctx context.Context, tsdbQuery plugins.DataQuery, - e *Executor) (plugins.DataQueryResult, cloudMonitoringResponse, string, error) { - queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: timeSeriesFilter.RefID} +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) run(ctx context.Context, req *backend.QueryDataRequest, + s *Service, dsInfo datasourceInfo) (*backend.DataResponse, cloudMonitoringResponse, string, error) { + dr := &backend.DataResponse{} projectName := timeSeriesFilter.ProjectName if projectName == "" { - defaultProject, err := e.getDefaultProject(ctx) + var err error + projectName, err = s.getDefaultProject(ctx, dsInfo) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - projectName = defaultProject slog.Info("No project name set on query, using project name from datasource", "projectName", projectName) } - req, err := e.createRequest(ctx, e.dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries"), nil) + r, err := s.createRequest(ctx, req.PluginContext, &dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries"), nil) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - req.URL.RawQuery = timeSeriesFilter.Params.Encode() - alignmentPeriod, ok := req.URL.Query()["aggregation.alignmentPeriod"] + r.URL.RawQuery = timeSeriesFilter.Params.Encode() + alignmentPeriod, ok := r.URL.Query()["aggregation.alignmentPeriod"] if ok { seconds, err := strconv.ParseInt(alignmentPeriodRe.FindString(alignmentPeriod[0]), 10, 64) if err == nil { - queryResult.Meta.Set("alignmentPeriod", seconds) + if len(dr.Frames) == 0 { + dr.Frames = append(dr.Frames, data.NewFrame("")) + } + firstFrame := dr.Frames[0] + if firstFrame.Meta == nil { + firstFrame.SetMeta(&data.FrameMeta{ + Custom: map[string]interface{}{ + "alignmentPeriod": seconds, + }, + }) + } } } span, ctx := opentracing.StartSpanFromContext(ctx, "cloudMonitoring query") span.SetTag("target", timeSeriesFilter.Target) - span.SetTag("from", tsdbQuery.TimeRange.From) - span.SetTag("until", tsdbQuery.TimeRange.To) - span.SetTag("datasource_id", e.dsInfo.Id) - span.SetTag("org_id", e.dsInfo.OrgId) + span.SetTag("from", req.Queries[0].TimeRange.From) + span.SetTag("until", req.Queries[0].TimeRange.To) + span.SetTag("datasource_id", dsInfo.id) + span.SetTag("org_id", req.PluginContext.OrgID) defer span.Finish() if err := opentracing.GlobalTracer().Inject( span.Context(), opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)); err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + opentracing.HTTPHeadersCarrier(r.Header)); err != nil { + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - res, err := ctxhttp.Do(ctx, e.httpClient, req) + r = r.WithContext(ctx) + res, err := dsInfo.client.Do(r) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - data, err := unmarshalResponse(res) + d, err := unmarshalResponse(res) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - return queryResult, data, req.URL.RawQuery, nil + return dr, d, r.URL.RawQuery, nil } -//nolint: staticcheck // plugins.DataPlugin deprecated -func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes *plugins.DataQueryResult, +//nolint: gocyclo +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes *backend.DataResponse, response cloudMonitoringResponse, executedQueryString string) error { labels := make(map[string]map[string]bool) frames := data.Frames{} + + customFrameMeta := map[string]interface{}{} + customFrameMeta["alignmentPeriod"] = timeSeriesFilter.Params.Get("aggregation.alignmentPeriod") + customFrameMeta["perSeriesAligner"] = timeSeriesFilter.Params.Get("aggregation.perSeriesAligner") for _, series := range response.TimeSeries { seriesLabels := data.Labels{} defaultMetricName := series.Metric.Type @@ -95,10 +107,6 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes frame.RefID = timeSeriesFilter.RefID frame.Meta = &data.FrameMeta{ ExecutedQueryString: executedQueryString, - Custom: map[string]interface{}{ - "alignmentPeriod": timeSeriesFilter.Params.Get("aggregation.alignmentPeriod"), - "perSeriesAligner": timeSeriesFilter.Params.Get("aggregation.perSeriesAligner"), - }, } for key, value := range series.Metric.Labels { @@ -155,8 +163,7 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes // reverse the order to be ascending if series.ValueType != "DISTRIBUTION" { - timeSeriesFilter.handleNonDistributionSeries( - series, defaultMetricName, seriesLabels, queryRes, frame) + timeSeriesFilter.handleNonDistributionSeries(series, defaultMetricName, seriesLabels, frame) frames = append(frames, frame) continue } @@ -214,12 +221,12 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes setDisplayNameAsFieldName(valueField) buckets[i] = &data.Frame{ - Name: frameName, + Name: frameName, + RefID: timeSeriesFilter.RefID, Fields: []*data.Field{ timeField, valueField, }, - RefID: timeSeriesFilter.RefID, } } } @@ -233,24 +240,30 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseResponse(queryRes frames = addConfigData(frames, dl, response.Unit) } - queryRes.Dataframes = plugins.NewDecodedDataFrames(frames) - labelsByKey := make(map[string][]string) for key, values := range labels { for value := range values { labelsByKey[key] = append(labelsByKey[key], value) } } + customFrameMeta["labels"] = labelsByKey + customFrameMeta["groupBys"] = timeSeriesFilter.GroupBys + + for _, frame := range frames { + if frame.Meta != nil { + frame.Meta.Custom = customFrameMeta + } else { + frame.SetMeta(&data.FrameMeta{Custom: customFrameMeta}) + } + } + + queryRes.Frames = frames - queryRes.Meta.Set("labels", labelsByKey) - queryRes.Meta.Set("groupBys", timeSeriesFilter.GroupBys) return nil } -//nolint: staticcheck // plugins.DataPlugin deprecated func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) handleNonDistributionSeries(series timeSeries, - defaultMetricName string, seriesLabels map[string]string, queryRes *plugins.DataQueryResult, - frame *data.Frame) { + defaultMetricName string, seriesLabels map[string]string, frame *data.Frame) { for i := 0; i < len(series.Points); i++ { point := series.Points[i] value := point.Value.DoubleValue @@ -279,9 +292,8 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) handleNonDistributionSe setDisplayNameAsFieldName(dataField) } -//nolint: staticcheck // plugins.DataPlugin deprecated -func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseToAnnotations(queryRes *plugins.DataQueryResult, - response cloudMonitoringResponse, title string, text string, tags string) error { +func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseToAnnotations(dr *backend.DataResponse, + response cloudMonitoringResponse, title, text, tags string) error { frames := data.Frames{} for _, series := range response.TimeSeries { if len(series.Points) == 0 { @@ -301,14 +313,14 @@ func (timeSeriesFilter *cloudMonitoringTimeSeriesFilter) parseToAnnotations(quer annotation["text"] = append(annotation["text"], formatAnnotationText(text, value, series.Metric.Type, series.Metric.Labels, series.Resource.Labels)) } - frames = append(frames, data.NewFrame(queryRes.RefID, + frames = append(frames, data.NewFrame(timeSeriesFilter.getRefID(), data.NewField("time", nil, annotation["time"]), data.NewField("title", nil, annotation["title"]), data.NewField("tags", nil, annotation["tags"]), data.NewField("text", nil, annotation["text"]), )) } - queryRes.Dataframes = plugins.NewDecodedDataFrames(frames) + dr.Frames = frames return nil } diff --git a/pkg/tsdb/cloudmonitoring/time_series_query.go b/pkg/tsdb/cloudmonitoring/time_series_query.go index 2fb54136ec21..645c413889c4 100644 --- a/pkg/tsdb/cloudmonitoring/time_series_query.go +++ b/pkg/tsdb/cloudmonitoring/time_series_query.go @@ -11,42 +11,34 @@ import ( "strings" "time" - "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/tsdb/interval" "github.com/opentracing/opentracing-go" - "golang.org/x/net/context/ctxhttp" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + + "github.com/grafana/grafana/pkg/tsdb/intervalv2" ) -//nolint: staticcheck // plugins.DataPlugin deprecated -func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, tsdbQuery plugins.DataQuery, - e *Executor) (plugins.DataQueryResult, cloudMonitoringResponse, string, error) { - queryResult := plugins.DataQueryResult{Meta: simplejson.New(), RefID: timeSeriesQuery.RefID} +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, req *backend.QueryDataRequest, + s *Service, dsInfo datasourceInfo) (*backend.DataResponse, cloudMonitoringResponse, string, error) { + dr := &backend.DataResponse{} projectName := timeSeriesQuery.ProjectName + if projectName == "" { - defaultProject, err := e.getDefaultProject(ctx) + var err error + projectName, err = s.getDefaultProject(ctx, dsInfo) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - projectName = defaultProject slog.Info("No project name set on query, using project name from datasource", "projectName", projectName) } - from, err := tsdbQuery.TimeRange.ParseFrom() - if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil - } - to, err := tsdbQuery.TimeRange.ParseTo() - if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil - } - intervalCalculator := interval.NewCalculator(interval.CalculatorOptions{}) - interval := intervalCalculator.Calculate(*tsdbQuery.TimeRange, time.Duration(timeSeriesQuery.IntervalMS/1000)*time.Second) + intervalCalculator := intervalv2.NewCalculator(intervalv2.CalculatorOptions{}) + interval := intervalCalculator.Calculate(req.Queries[0].TimeRange, time.Duration(timeSeriesQuery.IntervalMS/1000)*time.Second, req.Queries[0].MaxDataPoints) + from := req.Queries[0].TimeRange.From + to := req.Queries[0].TimeRange.To timeFormat := "2006/01/02-15:04:05" timeSeriesQuery.Query += fmt.Sprintf(" | graph_period %s | within d'%s', d'%s'", interval.Text, from.UTC().Format(timeFormat), to.UTC().Format(timeFormat)) @@ -54,53 +46,54 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) run(ctx context.Context, t "query": timeSeriesQuery.Query, }) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - req, err := e.createRequest(ctx, e.dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries:query"), bytes.NewBuffer(buf)) + r, err := s.createRequest(ctx, req.PluginContext, &dsInfo, path.Join("cloudmonitoringv3/projects", projectName, "timeSeries:query"), bytes.NewBuffer(buf)) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } span, ctx := opentracing.StartSpanFromContext(ctx, "cloudMonitoring MQL query") span.SetTag("query", timeSeriesQuery.Query) - span.SetTag("from", tsdbQuery.TimeRange.From) - span.SetTag("until", tsdbQuery.TimeRange.To) - span.SetTag("datasource_id", e.dsInfo.Id) - span.SetTag("org_id", e.dsInfo.OrgId) + span.SetTag("from", req.Queries[0].TimeRange.From) + span.SetTag("until", req.Queries[0].TimeRange.To) + span.SetTag("datasource_id", dsInfo.id) + span.SetTag("org_id", req.PluginContext.OrgID) defer span.Finish() if err := opentracing.GlobalTracer().Inject( span.Context(), opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)); err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + opentracing.HTTPHeadersCarrier(r.Header)); err != nil { + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - res, err := ctxhttp.Do(ctx, e.httpClient, req) + r = r.WithContext(ctx) + res, err := dsInfo.client.Do(r) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - data, err := unmarshalResponse(res) - + d, err := unmarshalResponse(res) if err != nil { - queryResult.Error = err - return queryResult, cloudMonitoringResponse{}, "", nil + dr.Error = err + return dr, cloudMonitoringResponse{}, "", nil } - return queryResult, data, timeSeriesQuery.Query, nil + return dr, d, timeSeriesQuery.Query, nil } -//nolint: staticcheck // plugins.DataPlugin deprecated -func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *plugins.DataQueryResult, +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *backend.DataResponse, response cloudMonitoringResponse, executedQueryString string) error { labels := make(map[string]map[string]bool) frames := data.Frames{} + + customFrameMeta := map[string]interface{}{} for _, series := range response.TimeSeriesData { seriesLabels := make(map[string]string) frame := data.NewFrameOfFieldTypes("", len(series.PointData), data.FieldTypeTime, data.FieldTypeFloat64) @@ -252,23 +245,29 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseResponse(queryRes *pl frames = addConfigData(frames, dl, response.Unit) } - queryRes.Dataframes = plugins.NewDecodedDataFrames(frames) - labelsByKey := make(map[string][]string) for key, values := range labels { for value := range values { labelsByKey[key] = append(labelsByKey[key], value) } } + customFrameMeta["labels"] = labelsByKey + + for _, frame := range frames { + if frame.Meta != nil { + frame.Meta.Custom = customFrameMeta + } else { + frame.SetMeta(&data.FrameMeta{Custom: customFrameMeta}) + } + } - queryRes.Meta.Set("labels", labelsByKey) + queryRes.Frames = frames return nil } -//nolint: staticcheck // plugins.DataPlugin deprecated -func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseToAnnotations(queryRes *plugins.DataQueryResult, - data cloudMonitoringResponse, title string, text string, tags string) error { +func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseToAnnotations(queryRes *backend.DataResponse, + data cloudMonitoringResponse, title, text, tags string) error { annotations := make([]map[string]string, 0) for _, series := range data.TimeSeriesData { @@ -315,7 +314,7 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) parseToAnnotations(queryRe } } - transformAnnotationToTable(annotations, queryRes) + timeSeriesQuery.transformAnnotationToFrame(annotations, queryRes) return nil } @@ -348,8 +347,8 @@ func (timeSeriesQuery cloudMonitoringTimeSeriesQuery) buildDeepLink() string { }, "timeSelection": map[string]string{ "timeRange": "custom", - "start": timeSeriesQuery.timeRange.MustGetFrom().Format(time.RFC3339Nano), - "end": timeSeriesQuery.timeRange.MustGetTo().Format(time.RFC3339Nano), + "start": timeSeriesQuery.timeRange.From.Format(time.RFC3339Nano), + "end": timeSeriesQuery.timeRange.To.Format(time.RFC3339Nano), }, } diff --git a/pkg/tsdb/cloudmonitoring/types.go b/pkg/tsdb/cloudmonitoring/types.go index 95c6dc76a513..ec6365eef651 100644 --- a/pkg/tsdb/cloudmonitoring/types.go +++ b/pkg/tsdb/cloudmonitoring/types.go @@ -5,18 +5,15 @@ import ( "net/url" "time" - "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana-plugin-sdk-go/backend" ) type ( cloudMonitoringQueryExecutor interface { - //nolint: staticcheck // plugins.DataPlugin deprecated - run(ctx context.Context, tsdbQuery plugins.DataQuery, e *Executor) ( - plugins.DataQueryResult, cloudMonitoringResponse, string, error) - //nolint: staticcheck // plugins.DataPlugin deprecated - parseResponse(queryRes *plugins.DataQueryResult, data cloudMonitoringResponse, executedQueryString string) error - //nolint: staticcheck // plugins.DataPlugin deprecated - parseToAnnotations(queryRes *plugins.DataQueryResult, data cloudMonitoringResponse, title string, text string, tags string) error + run(ctx context.Context, req *backend.QueryDataRequest, s *Service, dsInfo datasourceInfo) ( + *backend.DataResponse, cloudMonitoringResponse, string, error) + parseResponse(dr *backend.DataResponse, data cloudMonitoringResponse, executedQueryString string) error + parseToAnnotations(dr *backend.DataResponse, data cloudMonitoringResponse, title, text, tags string) error buildDeepLink() string getRefID() string } @@ -41,7 +38,7 @@ type ( Query string IntervalMS int64 AliasBy string - timeRange plugins.DataTimeRange + timeRange backend.TimeRange } metricQuery struct { diff --git a/pkg/tsdb/service.go b/pkg/tsdb/service.go index 64fca330ba56..916a5dfeeb00 100644 --- a/pkg/tsdb/service.go +++ b/pkg/tsdb/service.go @@ -2,7 +2,6 @@ package tsdb import ( "context" - "fmt" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" @@ -10,72 +9,35 @@ import ( "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/oauthtoken" "github.com/grafana/grafana/pkg/setting" - "github.com/grafana/grafana/pkg/tsdb/cloudmonitoring" _ "github.com/grafana/grafana/pkg/tsdb/postgres" ) // NewService returns a new Service. func NewService( - cfg *setting.Cfg, - pluginManager plugins.Manager, - backendPluginManager backendplugin.Manager, - oauthTokenService *oauthtoken.Service, - dataSourcesService *datasources.Service, - cloudMonitoringService *cloudmonitoring.Service, -) *Service { - s := newService(cfg, pluginManager, backendPluginManager, oauthTokenService, dataSourcesService) - - // register backend data sources using legacy plugin - // contracts/non-SDK contracts - s.registry["stackdriver"] = cloudMonitoringService.NewExecutor - - return s + cfg *setting.Cfg, backendPluginManager backendplugin.Manager, + oauthTokenService *oauthtoken.Service, dataSourcesService *datasources.Service) *Service { + return newService(cfg, backendPluginManager, oauthTokenService, dataSourcesService) } -func newService(cfg *setting.Cfg, manager plugins.Manager, backendPluginManager backendplugin.Manager, +func newService(cfg *setting.Cfg, backendPluginManager backendplugin.Manager, oauthTokenService oauthtoken.OAuthTokenService, dataSourcesService *datasources.Service) *Service { return &Service{ Cfg: cfg, - PluginManager: manager, BackendPluginManager: backendPluginManager, - // nolint:staticcheck // plugins.DataPlugin deprecated - registry: map[string]func(*models.DataSource) (plugins.DataPlugin, error){}, - OAuthTokenService: oauthTokenService, - DataSourcesService: dataSourcesService, + OAuthTokenService: oauthTokenService, + DataSourcesService: dataSourcesService, } } // Service handles data requests to data sources. type Service struct { Cfg *setting.Cfg - PluginManager plugins.Manager BackendPluginManager backendplugin.Manager OAuthTokenService oauthtoken.OAuthTokenService DataSourcesService *datasources.Service - //nolint: staticcheck // plugins.DataPlugin deprecated - registry map[string]func(*models.DataSource) (plugins.DataPlugin, error) } //nolint: staticcheck // plugins.DataPlugin deprecated func (s *Service) HandleRequest(ctx context.Context, ds *models.DataSource, query plugins.DataQuery) (plugins.DataResponse, error) { - if factory, exists := s.registry[ds.Type]; exists { - var err error - plugin, err := factory(ds) - if err != nil { - //nolint: staticcheck // plugins.DataPlugin deprecated - return plugins.DataResponse{}, fmt.Errorf("could not instantiate endpoint for data plugin %q: %w", - ds.Type, err) - } - - return plugin.DataQuery(ctx, ds, query) - } - return dataPluginQueryAdapter(ds.Type, s.BackendPluginManager, s.OAuthTokenService, s.DataSourcesService). - DataQuery(ctx, ds, query) -} - -// RegisterQueryHandler registers a query handler factory. -// This is only exposed for tests! -//nolint: staticcheck // plugins.DataPlugin deprecated -func (s *Service) RegisterQueryHandler(name string, factory func(*models.DataSource) (plugins.DataPlugin, error)) { - s.registry[name] = factory + return dataPluginQueryAdapter(ds.Type, s.BackendPluginManager, s.OAuthTokenService, s.DataSourcesService).DataQuery(ctx, ds, query) } diff --git a/pkg/tsdb/service_test.go b/pkg/tsdb/service_test.go index 5c7252578396..f1f321b5e338 100644 --- a/pkg/tsdb/service_test.go +++ b/pkg/tsdb/service_test.go @@ -10,7 +10,6 @@ import ( "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/backendplugin" - "github.com/grafana/grafana/pkg/plugins/manager" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/encryption/ossencryption" "github.com/grafana/grafana/pkg/setting" @@ -19,59 +18,20 @@ import ( ) func TestHandleRequest(t *testing.T) { - t.Run("Should return query result when handling request for query", func(t *testing.T) { - req := plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ - {RefID: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}}, - }, - } - - svc, exe, _ := createService() - exe.Return("A", plugins.DataTimeSeriesSlice{plugins.DataTimeSeries{Name: "argh"}}) - - res, err := svc.HandleRequest(context.TODO(), &models.DataSource{Id: 1, Type: "test"}, req) - require.NoError(t, err) - require.NotEmpty(t, res.Results["A"].Series) - require.Equal(t, "argh", res.Results["A"].Series[0].Name) - }) - - t.Run("Should return query results when handling request for two queries with same data source", func(t *testing.T) { - req := plugins.DataQuery{ - Queries: []plugins.DataSubQuery{ - {RefID: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}}, - {RefID: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}}, - }, - } - - svc, exe, _ := createService() - exe.Return("A", plugins.DataTimeSeriesSlice{plugins.DataTimeSeries{Name: "argh"}}) - exe.Return("B", plugins.DataTimeSeriesSlice{plugins.DataTimeSeries{Name: "barg"}}) - - res, err := svc.HandleRequest(context.TODO(), &models.DataSource{Id: 1, Type: "test"}, req) - require.NoError(t, err) - - require.Len(t, res.Results, 2) - require.Equal(t, "argh", res.Results["A"].Series[0].Name) - require.Equal(t, "barg", res.Results["B"].Series[0].Name) - }) - - t.Run("Should fallback to backend plugin manager when handling request for query with unregistered type", func(t *testing.T) { - svc, _, manager := createService() + t.Run("Should invoke plugin manager QueryData when handling request for query", func(t *testing.T) { + svc, _, pm := createService() backendPluginManagerCalled := false - manager.QueryDataHandlerFunc = backend.QueryDataHandlerFunc(func(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + pm.QueryDataHandlerFunc = func(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { backendPluginManagerCalled = true - return &backend.QueryDataResponse{}, nil - }) + return backend.NewQueryDataResponse(), nil + } ds := &models.DataSource{Id: 12, Type: "unregisteredType", JsonData: simplejson.New()} req := plugins.DataQuery{ TimeRange: &plugins.DataTimeRange{}, Queries: []plugins.DataSubQuery{ - { - RefID: "A", - DataSource: ds, - Model: simplejson.New(), - }, + {RefID: "A", DataSource: &models.DataSource{Id: 1, Type: "test"}, Model: simplejson.New()}, + {RefID: "B", DataSource: &models.DataSource{Id: 1, Type: "test"}, Model: simplejson.New()}, }, } _, err := svc.HandleRequest(context.Background(), ds, req) @@ -142,21 +102,18 @@ func (s *fakeOAuthTokenService) IsOAuthPassThruEnabled(*models.DataSource) bool func createService() (*Service, *fakeExecutor, *fakeBackendPM) { fakeBackendPM := &fakeBackendPM{} - manager := &manager.PluginManager{ - BackendPluginManager: fakeBackendPM, - } dsService := datasources.ProvideService(bus.New(), nil, ossencryption.ProvideService()) - - s := newService(setting.NewCfg(), manager, fakeBackendPM, &fakeOAuthTokenService{}, dsService) + s := newService( + setting.NewCfg(), + fakeBackendPM, + &fakeOAuthTokenService{}, + dsService, + ) e := &fakeExecutor{ //nolint: staticcheck // plugins.DataPlugin deprecated results: make(map[string]plugins.DataQueryResult), resultsFn: make(map[string]resultsFn), } - //nolint: staticcheck // plugins.DataPlugin deprecated - s.registry["test"] = func(*models.DataSource) (plugins.DataPlugin, error) { - return e, nil - } return s, e, fakeBackendPM } diff --git a/public/app/plugins/datasource/cloud-monitoring/api.ts b/public/app/plugins/datasource/cloud-monitoring/api.ts index 7cb1db808895..4bb089808055 100644 --- a/public/app/plugins/datasource/cloud-monitoring/api.ts +++ b/public/app/plugins/datasource/cloud-monitoring/api.ts @@ -70,7 +70,7 @@ export default class Api { post(data: Record): Observable> { return getBackendSrv().fetch({ - url: '/api/tsdb/query', + url: '/api/ds/query', method: 'POST', data, }); diff --git a/public/app/plugins/datasource/cloud-monitoring/datasource.ts b/public/app/plugins/datasource/cloud-monitoring/datasource.ts index 880565f33896..bb2e866eacbc 100644 --- a/public/app/plugins/datasource/cloud-monitoring/datasource.ts +++ b/public/app/plugins/datasource/cloud-monitoring/datasource.ts @@ -163,11 +163,10 @@ export default class CloudMonitoringDatasource extends DataSourceWithBackend< }); }), map(({ data }) => { - return data; - }), - map((response) => { - const result = response.results[refId]; - return result && result.meta ? result.meta.labels : {}; + const dataQueryResponse = toDataQueryResponse({ + data: data, + }); + return dataQueryResponse?.data[0]?.meta?.custom?.labels ?? {}; }) ) ); @@ -219,9 +218,10 @@ export default class CloudMonitoringDatasource extends DataSourceWithBackend< }) .pipe( map(({ data }) => { - return data && data.results && data.results.getGCEDefaultProject && data.results.getGCEDefaultProject.meta - ? data.results.getGCEDefaultProject.meta.defaultProject - : ''; + const dataQueryResponse = toDataQueryResponse({ + data: data, + }); + return dataQueryResponse?.data[0]?.meta?.custom?.defaultProject ?? ''; }), catchError((err) => { return throwError(err.data.error);