The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
grafana/pkg/expr/transform.go

229 lines
6.1 KiB

package expr
import (
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/adapters"
"github.com/grafana/grafana/pkg/tsdb/legacydata"
"github.com/grafana/grafana/pkg/util/errutil"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
)
var (
expressionsQuerySummary *prometheus.SummaryVec
)
func init() {
expressionsQuerySummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "expressions_queries_duration_milliseconds",
Help: "Expressions query summary",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"status"},
)
prometheus.MustRegister(expressionsQuerySummary)
}
// WrapTransformData creates and executes transform requests
func (s *Service) WrapTransformData(ctx context.Context, query legacydata.DataQuery) (*backend.QueryDataResponse, error) {
req := Request{
OrgId: query.User.OrgId,
Queries: []Query{},
}
for _, q := range query.Queries {
if q.DataSource == nil {
return nil, fmt.Errorf("mising datasource info: " + q.RefID)
}
modelJSON, err := q.Model.MarshalJSON()
if err != nil {
return nil, err
}
req.Queries = append(req.Queries, Query{
JSON: modelJSON,
Interval: time.Duration(q.IntervalMS) * time.Millisecond,
RefID: q.RefID,
MaxDataPoints: q.MaxDataPoints,
QueryType: q.QueryType,
Datasource: DataSourceRef{
Type: q.DataSource.Type,
UID: q.DataSource.Uid,
},
TimeRange: TimeRange{
From: query.TimeRange.GetFromAsTimeUTC(),
To: query.TimeRange.GetToAsTimeUTC(),
},
})
}
return s.TransformData(ctx, &req)
}
// Request is similar to plugins.DataQuery but with the Time Ranges is per Query.
type Request struct {
Headers map[string]string
Debug bool
OrgId int64
Queries []Query
}
// Query is like plugins.DataSubQuery, but with a a time range, and only the UID
// for the data source. Also interval is a time.Duration.
type Query struct {
RefID string
TimeRange TimeRange
DatasourceUID string // deprecated, value -100 when expressions
Datasource DataSourceRef `json:"datasource"`
JSON json.RawMessage
Interval time.Duration
QueryType string
MaxDataPoints int64
}
type DataSourceRef struct {
Type string `json:"type"` // value should be __expr__
UID string `json:"uid"` // value should be __expr__
}
func (q *Query) GetDatasourceUID() string {
if q.DatasourceUID != "" {
return q.DatasourceUID // backwards compatibility gets precedence
}
if q.Datasource.UID != "" {
return q.Datasource.UID
}
return ""
}
// TimeRange is a time.Time based TimeRange.
type TimeRange struct {
From time.Time
To time.Time
}
// TransformData takes Queries which are either expressions nodes
// or are datasource requests.
func (s *Service) TransformData(ctx context.Context, req *Request) (r *backend.QueryDataResponse, err error) {
if s.isDisabled() {
return nil, fmt.Errorf("server side expressions are disabled")
}
start := time.Now()
defer func() {
var respStatus string
switch {
case err == nil:
respStatus = "success"
default:
respStatus = "failure"
}
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
}()
// Build the pipeline from the request, checking for ordering issues (e.g. loops)
// and parsing graph nodes from the queries.
pipeline, err := s.BuildPipeline(req)
if err != nil {
return nil, err
}
// Execute the pipeline
responses, err := s.ExecutePipeline(ctx, pipeline)
if err != nil {
return nil, err
}
// Get which queries have the Hide property so they those queries' results
// can be excluded from the response.
hidden, err := hiddenRefIDs(req.Queries)
if err != nil {
return nil, err
}
if len(hidden) != 0 {
filteredRes := backend.NewQueryDataResponse()
for refID, res := range responses.Responses {
if _, ok := hidden[refID]; !ok {
filteredRes.Responses[refID] = res
}
}
responses = filteredRes
}
return responses, nil
}
func hiddenRefIDs(queries []Query) (map[string]struct{}, error) {
hidden := make(map[string]struct{})
for _, query := range queries {
hide := struct {
Hide bool `json:"hide"`
}{}
if err := json.Unmarshal(query.JSON, &hide); err != nil {
return nil, err
}
if hide.Hide {
hidden[query.RefID] = struct{}{}
}
}
return hidden, nil
}
// queryData is called used to query datasources that are not expression commands, but are used
// alongside expressions and/or are the input of an expression command.
func (s *Service) queryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
if len(req.Queries) == 0 {
return nil, fmt.Errorf("zero queries found in datasource request")
}
datasourceID := int64(0)
var datasourceUID string
if req.PluginContext.DataSourceInstanceSettings != nil {
datasourceID = req.PluginContext.DataSourceInstanceSettings.ID
datasourceUID = req.PluginContext.DataSourceInstanceSettings.UID
}
getDsInfo := &models.GetDataSourceQuery{
OrgId: req.PluginContext.OrgID,
Id: datasourceID,
Uid: datasourceUID,
}
if err := bus.DispatchCtx(ctx, getDsInfo); err != nil {
return nil, fmt.Errorf("could not find datasource: %w", err)
}
dsInstanceSettings, err := adapters.ModelToInstanceSettings(getDsInfo.Result, s.decryptSecureJsonDataFn(ctx))
if err != nil {
return nil, errutil.Wrap("failed to convert datasource instance settings", err)
}
req.PluginContext.DataSourceInstanceSettings = dsInstanceSettings
req.PluginContext.PluginID = getDsInfo.Result.Type
return s.dataService.QueryData(ctx, req)
}
func (s *Service) decryptSecureJsonDataFn(ctx context.Context) func(map[string][]byte) map[string]string {
return func(m map[string][]byte) map[string]string {
decryptedJsonData, err := s.encryptionService.DecryptJsonData(ctx, m, s.cfg.SecretKey)
if err != nil {
logger.Error("Failed to decrypt secure json data", "error", err)
}
return decryptedJsonData
}
}