From c0ff685d3b89145167e7a945390a1f51bfced2f1 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Tue, 14 Dec 2021 09:38:20 -0800 Subject: [PATCH] Live: support query execution with live RPC (#43118) Co-authored-by: Alexander Emelin --- pkg/api/dashboard_test.go | 2 +- pkg/api/http_server.go | 23 +-- pkg/api/metrics.go | 229 +----------------------------- pkg/server/wire.go | 2 + pkg/services/live/live.go | 68 ++++++++- pkg/services/query/errors.go | 17 +++ pkg/services/query/query.go | 265 +++++++++++++++++++++++++++++++++++ 7 files changed, 359 insertions(+), 247 deletions(-) create mode 100644 pkg/services/query/errors.go create mode 100644 pkg/services/query/query.go diff --git a/pkg/api/dashboard_test.go b/pkg/api/dashboard_test.go index adde485a539..d8e03449817 100644 --- a/pkg/api/dashboard_test.go +++ b/pkg/api/dashboard_test.go @@ -89,7 +89,7 @@ type testState struct { func newTestLive(t *testing.T) *live.GrafanaLive { cfg := &setting.Cfg{AppURL: "http://localhost:3000/"} - gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t), nil, &usagestats.UsageStatsMock{T: t}) + gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t), nil, &usagestats.UsageStatsMock{T: t}, nil) require.NoError(t, err) return gLive } diff --git a/pkg/api/http_server.go b/pkg/api/http_server.go index 52ec1ece523..ea0a039316d 100644 --- a/pkg/api/http_server.go +++ b/pkg/api/http_server.go @@ -13,14 +13,14 @@ import ( "strings" "sync" + "github.com/grafana/grafana/pkg/services/query" + "github.com/grafana/grafana/pkg/api/routing" httpstatic "github.com/grafana/grafana/pkg/api/static" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/infra/localcache" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/infra/metrics" "github.com/grafana/grafana/pkg/infra/remotecache" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/login/social" @@ -43,8 +43,6 @@ import ( "github.com/grafana/grafana/pkg/services/live/pushhttp" "github.com/grafana/grafana/pkg/services/login" "github.com/grafana/grafana/pkg/services/ngalert" - "github.com/grafana/grafana/pkg/services/notifications" - "github.com/grafana/grafana/pkg/services/oauthtoken" "github.com/grafana/grafana/pkg/services/provisioning" "github.com/grafana/grafana/pkg/services/quota" "github.com/grafana/grafana/pkg/services/rendering" @@ -103,19 +101,16 @@ type HTTPServer struct { AlertNG *ngalert.AlertNG LibraryPanelService librarypanels.Service LibraryElementService libraryelements.Service - notificationService *notifications.NotificationService SocialService social.Service - OAuthTokenService oauthtoken.OAuthTokenService Listener net.Listener EncryptionService encryption.Internal SecretsService secrets.Service DataSourcesService *datasources.Service cleanUpService *cleanup.CleanUpService tracingService tracing.Tracer - internalMetricsSvc *metrics.InternalMetricsService updateChecker *updatechecker.Service searchUsersService searchusers.Service - expressionService *expr.Service + queryDataService *query.Service } type ServerOptions struct { @@ -137,11 +132,10 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi contextHandler *contexthandler.ContextHandler, schemaService *schemaloader.SchemaLoaderService, alertNG *ngalert.AlertNG, libraryPanelService librarypanels.Service, libraryElementService libraryelements.Service, - notificationService *notifications.NotificationService, tracingService tracing.Tracer, - internalMetricsSvc *metrics.InternalMetricsService, quotaService *quota.QuotaService, - socialService social.Service, oauthTokenService oauthtoken.OAuthTokenService, + quotaService *quota.QuotaService, socialService social.Service, tracingService tracing.Tracer, encryptionService encryption.Internal, updateChecker *updatechecker.Service, searchUsersService searchusers.Service, - dataSourcesService *datasources.Service, secretsService secrets.Service, expressionService *expr.Service) (*HTTPServer, error) { + dataSourcesService *datasources.Service, secretsService secrets.Service, + queryDataService *query.Service) (*HTTPServer, error) { web.Env = cfg.Env m := web.New() @@ -182,19 +176,16 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi LibraryPanelService: libraryPanelService, LibraryElementService: libraryElementService, QuotaService: quotaService, - notificationService: notificationService, tracingService: tracingService, - internalMetricsSvc: internalMetricsSvc, log: log.New("http.server"), web: m, Listener: opts.Listener, SocialService: socialService, - OAuthTokenService: oauthTokenService, EncryptionService: encryptionService, SecretsService: secretsService, DataSourcesService: dataSourcesService, searchUsersService: searchUsersService, - expressionService: expressionService, + queryDataService: queryDataService, } if hs.Listener != nil { hs.log.Debug("Using provided listener") diff --git a/pkg/api/metrics.go b/pkg/api/metrics.go index 97a9274cc64..b250c2a3817 100644 --- a/pkg/api/metrics.go +++ b/pkg/api/metrics.go @@ -1,44 +1,24 @@ package api import ( - "context" "errors" - "fmt" "net/http" - "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/api/response" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/plugins/adapters" - "github.com/grafana/grafana/pkg/tsdb/grafanads" + "github.com/grafana/grafana/pkg/services/query" "github.com/grafana/grafana/pkg/tsdb/legacydata" "github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/web" ) -// ErrBadQuery returned whenever request is malformed and must contain a message -// suitable to return in API response. -type ErrBadQuery struct { - Message string -} - -func NewErrBadQuery(msg string) *ErrBadQuery { - return &ErrBadQuery{Message: msg} -} - -func (e ErrBadQuery) Error() string { - return fmt.Sprintf("bad query: %s", e.Message) -} - func (hs *HTTPServer) handleQueryMetricsError(err error) *response.NormalResponse { if errors.Is(err, models.ErrDataSourceAccessDenied) { return response.Error(http.StatusForbidden, "Access denied to data source", err) } - var badQuery *ErrBadQuery + var badQuery *query.ErrBadQuery if errors.As(err, &badQuery) { return response.Error(http.StatusBadRequest, util.Capitalize(badQuery.Message), err) } @@ -53,7 +33,7 @@ func (hs *HTTPServer) QueryMetricsV2(c *models.ReqContext) response.Response { return response.Error(http.StatusBadRequest, "bad request data", err) } - resp, err := hs.queryMetrics(c.Req.Context(), c.SignedInUser, c.SkipCache, reqDTO, true) + resp, err := hs.queryDataService.QueryData(c.Req.Context(), c.SignedInUser, c.SkipCache, reqDTO, true) if err != nil { return hs.handleQueryMetricsError(err) } @@ -70,7 +50,7 @@ func (hs *HTTPServer) QueryMetrics(c *models.ReqContext) response.Response { return response.Error(http.StatusBadRequest, "bad request data", err) } - sdkResp, err := hs.queryMetrics(c.Req.Context(), c.SignedInUser, c.SkipCache, reqDto, false) + sdkResp, err := hs.queryDataService.QueryData(c.Req.Context(), c.SignedInUser, c.SkipCache, reqDto, false) if err != nil { return hs.handleQueryMetricsError(err) } @@ -107,207 +87,6 @@ func (hs *HTTPServer) QueryMetrics(c *models.ReqContext) response.Response { return response.JSON(statusCode, &legacyResp) } -func (hs *HTTPServer) queryMetrics(ctx context.Context, user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest, handleExpressions bool) (*backend.QueryDataResponse, error) { - parsedReq, err := hs.parseMetricRequest(user, skipCache, reqDTO) - if err != nil { - return nil, err - } - if handleExpressions && parsedReq.hasExpression { - return hs.handleExpressions(ctx, user, parsedReq) - } - return hs.handleQueryData(ctx, user, parsedReq) -} - -// handleExpressions handles POST /api/ds/query when there is an expression. -func (hs *HTTPServer) handleExpressions(ctx context.Context, user *models.SignedInUser, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) { - exprReq := expr.Request{ - OrgId: user.OrgId, - Queries: []expr.Query{}, - } - - for _, pq := range parsedReq.parsedQueries { - if pq.datasource == nil { - return nil, NewErrBadQuery(fmt.Sprintf("query mising datasource info: %s", pq.query.RefID)) - } - - exprReq.Queries = append(exprReq.Queries, expr.Query{ - JSON: pq.query.JSON, - Interval: pq.query.Interval, - RefID: pq.query.RefID, - MaxDataPoints: pq.query.MaxDataPoints, - QueryType: pq.query.QueryType, - Datasource: expr.DataSourceRef{ - Type: pq.datasource.Type, - UID: pq.datasource.Uid, - }, - TimeRange: expr.TimeRange{ - From: pq.query.TimeRange.From, - To: pq.query.TimeRange.To, - }, - }) - } - - qdr, err := hs.expressionService.TransformData(ctx, &exprReq) - if err != nil { - return nil, fmt.Errorf("expression request error: %w", err) - } - return qdr, nil -} - -func (hs *HTTPServer) handleQueryData(ctx context.Context, user *models.SignedInUser, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) { - ds := parsedReq.parsedQueries[0].datasource - if err := hs.PluginRequestValidator.Validate(ds.Url, nil); err != nil { - return nil, models.ErrDataSourceAccessDenied - } - - instanceSettings, err := adapters.ModelToInstanceSettings(ds, hs.decryptSecureJsonDataFn()) - if err != nil { - return nil, fmt.Errorf("failed to convert data source to instance settings") - } - - req := &backend.QueryDataRequest{ - PluginContext: backend.PluginContext{ - OrgID: ds.OrgId, - PluginID: ds.Type, - User: adapters.BackendUserFromSignedInUser(user), - DataSourceInstanceSettings: instanceSettings, - }, - Headers: map[string]string{}, - Queries: []backend.DataQuery{}, - } - - if hs.OAuthTokenService.IsOAuthPassThruEnabled(ds) { - if token := hs.OAuthTokenService.GetCurrentOAuthToken(ctx, user); token != nil { - req.Headers["Authorization"] = fmt.Sprintf("%s %s", token.Type(), token.AccessToken) - - idToken, ok := token.Extra("id_token").(string) - if ok && idToken != "" { - req.Headers["X-ID-Token"] = idToken - } - } - } - - for _, q := range parsedReq.parsedQueries { - req.Queries = append(req.Queries, q.query) - } - - return hs.pluginClient.QueryData(ctx, req) -} - -type parsedQuery struct { - datasource *models.DataSource - query backend.DataQuery -} - -type parsedRequest struct { - hasExpression bool - parsedQueries []parsedQuery -} - -func (hs *HTTPServer) parseMetricRequest(user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) { - if len(reqDTO.Queries) == 0 { - return nil, NewErrBadQuery("no queries found") - } - - timeRange := legacydata.NewDataTimeRange(reqDTO.From, reqDTO.To) - req := &parsedRequest{ - hasExpression: false, - parsedQueries: []parsedQuery{}, - } - - // Parse the queries - datasources := map[string]*models.DataSource{} - for _, query := range reqDTO.Queries { - ds, err := hs.getDataSourceFromQuery(user, skipCache, query, datasources) - if err != nil { - return nil, err - } - if ds == nil { - return nil, NewErrBadQuery("invalid data source ID") - } - - datasources[ds.Uid] = ds - if expr.IsDataSource(ds.Uid) { - req.hasExpression = true - } - - hs.log.Debug("Processing metrics query", "query", query) - - modelJSON, err := query.MarshalJSON() - if err != nil { - return nil, err - } - - req.parsedQueries = append(req.parsedQueries, parsedQuery{ - datasource: ds, - query: backend.DataQuery{ - TimeRange: backend.TimeRange{ - From: timeRange.GetFromAsTimeUTC(), - To: timeRange.GetToAsTimeUTC(), - }, - RefID: query.Get("refId").MustString("A"), - MaxDataPoints: query.Get("maxDataPoints").MustInt64(100), - Interval: time.Duration(query.Get("intervalMs").MustInt64(1000)) * time.Millisecond, - QueryType: query.Get("queryType").MustString(""), - JSON: modelJSON, - }, - }) - } - - if !req.hasExpression { - if len(datasources) > 1 { - // We do not (yet) support mixed query type - return nil, NewErrBadQuery("all queries must use the same datasource") - } - } - - return req, nil -} - -func (hs *HTTPServer) getDataSourceFromQuery(user *models.SignedInUser, skipCache bool, query *simplejson.Json, history map[string]*models.DataSource) (*models.DataSource, error) { - var err error - uid := query.Get("datasource").Get("uid").MustString() - - // before 8.3 special types could be sent as datasource (expr) - if uid == "" { - uid = query.Get("datasource").MustString() - } - - // check cache value - ds, ok := history[uid] - if ok { - return ds, nil - } - - if expr.IsDataSource(uid) { - return expr.DataSourceModel(), nil - } - - if uid == grafanads.DatasourceUID { - return grafanads.DataSourceModel(user.OrgId), nil - } - - // use datasourceId if it exists - id := query.Get("datasourceId").MustInt64(0) - if id > 0 { - ds, err = hs.DataSourceCache.GetDatasource(id, user, skipCache) - if err != nil { - return nil, err - } - return ds, nil - } - - if uid != "" { - ds, err = hs.DataSourceCache.GetDatasourceByUID(uid, user, skipCache) - if err != nil { - return nil, err - } - return ds, nil - } - - return nil, NewErrBadQuery("missing data source ID/UID") -} - func toJsonStreamingResponse(qdr *backend.QueryDataResponse) response.Response { statusCode := http.StatusOK for _, res := range qdr.Responses { diff --git a/pkg/server/wire.go b/pkg/server/wire.go index 3590de89fe5..2768778a877 100644 --- a/pkg/server/wire.go +++ b/pkg/server/wire.go @@ -47,6 +47,7 @@ import ( "github.com/grafana/grafana/pkg/services/notifications" "github.com/grafana/grafana/pkg/services/oauthtoken" "github.com/grafana/grafana/pkg/services/pluginsettings" + "github.com/grafana/grafana/pkg/services/query" "github.com/grafana/grafana/pkg/services/quota" "github.com/grafana/grafana/pkg/services/rendering" "github.com/grafana/grafana/pkg/services/schemaloader" @@ -87,6 +88,7 @@ var wireBasicSet = wire.NewSet( setting.NewCfgFromArgs, New, api.ProvideHTTPServer, + query.ProvideService, bus.ProvideBus, wire.Bind(new(bus.Bus), new(*bus.InProcBus)), rendering.ProvideService, diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index b7b013dfbed..74eaf541c8e 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -13,6 +13,10 @@ import ( "sync" "time" + jsoniter "github.com/json-iterator/go" + + "github.com/grafana/grafana/pkg/services/query" + "github.com/centrifugal/centrifuge" "github.com/go-redis/redis/v8" "github.com/gobwas/glob" @@ -64,7 +68,7 @@ type CoreGrafanaScope struct { func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, routeRegister routing.RouteRegister, logsService *cloudwatch.LogsService, pluginStore plugins.Store, cacheService *localcache.CacheService, dataSourceCache datasources.CacheService, sqlStore *sqlstore.SQLStore, secretsService secrets.Service, - usageStatsService usagestats.Service) (*GrafanaLive, error) { + usageStatsService usagestats.Service, queryDataService *query.Service) (*GrafanaLive, error) { g := &GrafanaLive{ Cfg: cfg, PluginContextProvider: plugCtxProvider, @@ -75,6 +79,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r DataSourceCache: dataSourceCache, SQLStore: sqlStore, SecretsService: secretsService, + queryDataService: queryDataService, channels: make(map[string]models.ChannelHandler), GrafanaScope: CoreGrafanaScope{ Features: make(map[string]models.ChannelHandlerFactory), @@ -200,12 +205,12 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r // Pre-build/validate channel rules for all organizations on start. // This can be unreasonable to have in production scenario with many // organizations. - query := &models.SearchOrgsQuery{} - err := sqlstore.SearchOrgs(context.TODO(), query) + orgQuery := &models.SearchOrgsQuery{} + err := sqlstore.SearchOrgs(context.TODO(), orgQuery) if err != nil { return nil, fmt.Errorf("can't get org list: %w", err) } - for _, org := range query.Result { + for _, org := range orgQuery.Result { _, _, err := channelRuleGetter.Get(org.Id, "") if err != nil { return nil, fmt.Errorf("error building channel rules for org %d: %w", org.Id, err) @@ -260,6 +265,16 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r logger.Debug("Client connected", "user", client.UserID(), "client", client.ID()) connectedAt := time.Now() + // Called when client issues RPC (async request over Live connection). + client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) { + err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() { + cb(g.handleOnRPC(client, e)) + }) + if err != nil { + cb(centrifuge.RPCReply{}, err) + } + }) + // Called when client subscribes to the channel. client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() { @@ -385,6 +400,7 @@ type GrafanaLive struct { SQLStore *sqlstore.SQLStore SecretsService secrets.Service pluginStore plugins.Store + queryDataService *query.Service node *centrifuge.Node surveyCaller *survey.Caller @@ -502,7 +518,7 @@ func checkAllowedOrigin(origin string, originURL *url.URL, appURL *url.URL, orig return false, nil } -var clientConcurrency = 8 +var clientConcurrency = 12 func (g *GrafanaLive) IsHA() bool { return g.Cfg != nil && g.Cfg.LiveHAEngine != "" @@ -546,6 +562,48 @@ func (g *GrafanaLive) HandleDatasourceUpdate(orgID int64, dsUID string) { } } +// Use a configuration that's compatible with the standard library +// to minimize the risk of introducing bugs. This will make sure +// that map keys is ordered. +var jsonStd = jsoniter.ConfigCompatibleWithStandardLibrary + +func (g *GrafanaLive) handleOnRPC(client *centrifuge.Client, e centrifuge.RPCEvent) (centrifuge.RPCReply, error) { + logger.Debug("Client calls RPC", "user", client.UserID(), "client", client.ID(), "method", e.Method) + if e.Method != "grafana.query" { + return centrifuge.RPCReply{}, centrifuge.ErrorMethodNotFound + } + user, ok := livecontext.GetContextSignedUser(client.Context()) + if !ok { + logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "method", e.Method) + return centrifuge.RPCReply{}, centrifuge.ErrorInternal + } + var req dtos.MetricRequest + err := json.Unmarshal(e.Data, &req) + if err != nil { + return centrifuge.RPCReply{}, centrifuge.ErrorBadRequest + } + resp, err := g.queryDataService.QueryData(client.Context(), user, false, req, true) + if err != nil { + logger.Error("Error query data", "user", client.UserID(), "client", client.ID(), "method", e.Method, "error", err) + if errors.Is(err, models.ErrDataSourceAccessDenied) { + return centrifuge.RPCReply{}, ¢rifuge.Error{Code: uint32(http.StatusForbidden), Message: http.StatusText(http.StatusForbidden)} + } + var badQuery *query.ErrBadQuery + if errors.As(err, &badQuery) { + return centrifuge.RPCReply{}, ¢rifuge.Error{Code: uint32(http.StatusBadRequest), Message: http.StatusText(http.StatusBadRequest)} + } + return centrifuge.RPCReply{}, centrifuge.ErrorInternal + } + data, err := jsonStd.Marshal(resp) + if err != nil { + logger.Error("Error marshaling query response", "user", client.UserID(), "client", client.ID(), "method", e.Method, "error", err) + return centrifuge.RPCReply{}, centrifuge.ErrorInternal + } + return centrifuge.RPCReply{ + Data: data, + }, nil +} + func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) diff --git a/pkg/services/query/errors.go b/pkg/services/query/errors.go new file mode 100644 index 00000000000..f94278669ff --- /dev/null +++ b/pkg/services/query/errors.go @@ -0,0 +1,17 @@ +package query + +import "fmt" + +// ErrBadQuery returned whenever request is malformed and must contain a message +// suitable to return in API response. +type ErrBadQuery struct { + Message string +} + +func NewErrBadQuery(msg string) *ErrBadQuery { + return &ErrBadQuery{Message: msg} +} + +func (e ErrBadQuery) Error() string { + return fmt.Sprintf("bad query: %s", e.Message) +} diff --git a/pkg/services/query/query.go b/pkg/services/query/query.go new file mode 100644 index 00000000000..7c48ed01af5 --- /dev/null +++ b/pkg/services/query/query.go @@ -0,0 +1,265 @@ +package query + +import ( + "context" + "fmt" + "time" + + "github.com/grafana/grafana/pkg/api/dtos" + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/expr" + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/adapters" + "github.com/grafana/grafana/pkg/services/datasources" + "github.com/grafana/grafana/pkg/services/oauthtoken" + "github.com/grafana/grafana/pkg/services/secrets" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/tsdb/grafanads" + "github.com/grafana/grafana/pkg/tsdb/legacydata" + + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +func ProvideService(cfg *setting.Cfg, dataSourceCache datasources.CacheService, expressionService *expr.Service, + pluginRequestValidator models.PluginRequestValidator, SecretsService secrets.Service, + pluginClient plugins.Client, OAuthTokenService oauthtoken.OAuthTokenService) *Service { + g := &Service{ + cfg: cfg, + dataSourceCache: dataSourceCache, + expressionService: expressionService, + pluginRequestValidator: pluginRequestValidator, + secretsService: SecretsService, + pluginClient: pluginClient, + oAuthTokenService: OAuthTokenService, + log: log.New("query_data"), + } + g.log.Info("Query Service initialization") + return g +} + +// Gateway receives data and translates it to Grafana Live publications. +type Service struct { + cfg *setting.Cfg + dataSourceCache datasources.CacheService + expressionService *expr.Service + pluginRequestValidator models.PluginRequestValidator + secretsService secrets.Service + pluginClient plugins.Client + oAuthTokenService oauthtoken.OAuthTokenService + log log.Logger +} + +// Run Service. +func (s *Service) Run(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() +} + +// QueryData can process queries and return query responses. +func (s *Service) QueryData(ctx context.Context, user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest, handleExpressions bool) (*backend.QueryDataResponse, error) { + parsedReq, err := s.parseMetricRequest(user, skipCache, reqDTO) + if err != nil { + return nil, err + } + if handleExpressions && parsedReq.hasExpression { + return s.handleExpressions(ctx, user, parsedReq) + } + return s.handleQueryData(ctx, user, parsedReq) +} + +// handleExpressions handles POST /api/ds/query when there is an expression. +func (s *Service) handleExpressions(ctx context.Context, user *models.SignedInUser, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) { + exprReq := expr.Request{ + OrgId: user.OrgId, + Queries: []expr.Query{}, + } + + for _, pq := range parsedReq.parsedQueries { + if pq.datasource == nil { + return nil, NewErrBadQuery(fmt.Sprintf("query mising datasource info: %s", pq.query.RefID)) + } + + exprReq.Queries = append(exprReq.Queries, expr.Query{ + JSON: pq.query.JSON, + Interval: pq.query.Interval, + RefID: pq.query.RefID, + MaxDataPoints: pq.query.MaxDataPoints, + QueryType: pq.query.QueryType, + Datasource: expr.DataSourceRef{ + Type: pq.datasource.Type, + UID: pq.datasource.Uid, + }, + TimeRange: expr.TimeRange{ + From: pq.query.TimeRange.From, + To: pq.query.TimeRange.To, + }, + }) + } + + qdr, err := s.expressionService.TransformData(ctx, &exprReq) + if err != nil { + return nil, fmt.Errorf("expression request error: %w", err) + } + return qdr, nil +} + +func (s *Service) handleQueryData(ctx context.Context, user *models.SignedInUser, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) { + ds := parsedReq.parsedQueries[0].datasource + if err := s.pluginRequestValidator.Validate(ds.Url, nil); err != nil { + return nil, models.ErrDataSourceAccessDenied + } + + instanceSettings, err := adapters.ModelToInstanceSettings(ds, s.decryptSecureJsonDataFn(ctx)) + if err != nil { + return nil, fmt.Errorf("failed to convert data source to instance settings: %w", err) + } + + req := &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + OrgID: ds.OrgId, + PluginID: ds.Type, + User: adapters.BackendUserFromSignedInUser(user), + DataSourceInstanceSettings: instanceSettings, + }, + Headers: map[string]string{}, + Queries: []backend.DataQuery{}, + } + + if s.oAuthTokenService.IsOAuthPassThruEnabled(ds) { + if token := s.oAuthTokenService.GetCurrentOAuthToken(ctx, user); token != nil { + req.Headers["Authorization"] = fmt.Sprintf("%s %s", token.Type(), token.AccessToken) + } + } + + for _, q := range parsedReq.parsedQueries { + req.Queries = append(req.Queries, q.query) + } + + return s.pluginClient.QueryData(ctx, req) +} + +type parsedQuery struct { + datasource *models.DataSource + query backend.DataQuery +} + +type parsedRequest struct { + hasExpression bool + parsedQueries []parsedQuery +} + +func (s *Service) parseMetricRequest(user *models.SignedInUser, skipCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) { + if len(reqDTO.Queries) == 0 { + return nil, NewErrBadQuery("no queries found") + } + + timeRange := legacydata.NewDataTimeRange(reqDTO.From, reqDTO.To) + req := &parsedRequest{ + hasExpression: false, + parsedQueries: []parsedQuery{}, + } + + // Parse the queries + datasourcesByUid := map[string]*models.DataSource{} + for _, query := range reqDTO.Queries { + ds, err := s.getDataSourceFromQuery(user, skipCache, query, datasourcesByUid) + if err != nil { + return nil, err + } + if ds == nil { + return nil, NewErrBadQuery("invalid data source ID") + } + + datasourcesByUid[ds.Uid] = ds + if expr.IsDataSource(ds.Uid) { + req.hasExpression = true + } + + s.log.Debug("Processing metrics query", "query", query) + + modelJSON, err := query.MarshalJSON() + if err != nil { + return nil, err + } + + req.parsedQueries = append(req.parsedQueries, parsedQuery{ + datasource: ds, + query: backend.DataQuery{ + TimeRange: backend.TimeRange{ + From: timeRange.GetFromAsTimeUTC(), + To: timeRange.GetToAsTimeUTC(), + }, + RefID: query.Get("refId").MustString("A"), + MaxDataPoints: query.Get("maxDataPoints").MustInt64(100), + Interval: time.Duration(query.Get("intervalMs").MustInt64(1000)) * time.Millisecond, + QueryType: query.Get("queryType").MustString(""), + JSON: modelJSON, + }, + }) + } + + if !req.hasExpression { + if len(datasourcesByUid) > 1 { + // We do not (yet) support mixed query type + return nil, NewErrBadQuery("all queries must use the same datasource") + } + } + + return req, nil +} + +func (s *Service) getDataSourceFromQuery(user *models.SignedInUser, skipCache bool, query *simplejson.Json, history map[string]*models.DataSource) (*models.DataSource, error) { + var err error + uid := query.Get("datasource").Get("uid").MustString() + + // before 8.3 special types could be sent as datasource (expr) + if uid == "" { + uid = query.Get("datasource").MustString() + } + + // check cache value + ds, ok := history[uid] + if ok { + return ds, nil + } + + if expr.IsDataSource(uid) { + return expr.DataSourceModel(), nil + } + + if uid == grafanads.DatasourceUID { + return grafanads.DataSourceModel(user.OrgId), nil + } + + // use datasourceId if it exists + id := query.Get("datasourceId").MustInt64(0) + if id > 0 { + ds, err = s.dataSourceCache.GetDatasource(id, user, skipCache) + if err != nil { + return nil, err + } + return ds, nil + } + + if uid != "" { + ds, err = s.dataSourceCache.GetDatasourceByUID(uid, user, skipCache) + if err != nil { + return nil, err + } + return ds, nil + } + + return nil, NewErrBadQuery("missing data source ID/UID") +} + +func (s *Service) decryptSecureJsonDataFn(ctx context.Context) func(map[string][]byte) map[string]string { + return func(m map[string][]byte) map[string]string { + decryptedJsonData, err := s.secretsService.DecryptJsonData(ctx, m) + if err != nil { + s.log.Error("Failed to decrypt secure json data", "error", err) + } + return decryptedJsonData + } +}