The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/expr/transform.go

196 lines
5.4 KiB

package expr
import (
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
expressionsQuerySummary *prometheus.SummaryVec
)
func init() {
expressionsQuerySummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "expressions_queries_duration_milliseconds",
Help: "Expressions query summary",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"status"},
)
prometheus.MustRegister(expressionsQuerySummary)
}
// WrapTransformData creates and executes transform requests
func (s *Service) WrapTransformData(ctx context.Context, query plugins.DataQuery) (*backend.QueryDataResponse, error) {
sdkReq := &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
OrgID: query.User.OrgId,
},
Queries: []backend.DataQuery{},
}
for _, q := range query.Queries {
modelJSON, err := q.Model.MarshalJSON()
if err != nil {
return nil, err
}
sdkReq.Queries = append(sdkReq.Queries, backend.DataQuery{
JSON: modelJSON,
Interval: time.Duration(q.IntervalMS) * time.Millisecond,
RefID: q.RefID,
MaxDataPoints: q.MaxDataPoints,
QueryType: q.QueryType,
TimeRange: backend.TimeRange{
From: query.TimeRange.GetFromAsTimeUTC(),
To: query.TimeRange.GetToAsTimeUTC(),
},
})
}
return s.TransformData(ctx, sdkReq)
}
// TransformData takes Queries which are either expressions nodes
// or are datasource requests.
func (s *Service) TransformData(ctx context.Context, req *backend.QueryDataRequest) (r *backend.QueryDataResponse, err error) {
if s.isDisabled() {
return nil, status.Error(codes.PermissionDenied, "Expressions are disabled")
}
start := time.Now()
defer func() {
var respStatus string
switch {
case err == nil:
respStatus = "success"
default:
respStatus = "failure"
}
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
}()
// Build the pipeline from the request, checking for ordering issues (e.g. loops)
// and parsing graph nodes from the queries.
pipeline, err := s.BuildPipeline(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// Execute the pipeline
responses, err := s.ExecutePipeline(ctx, pipeline)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
// Get which queries have the Hide property so they those queries' results
// can be excluded from the response.
hidden, err := hiddenRefIDs(req.Queries)
if err != nil {
return nil, status.Error((codes.Internal), err.Error())
}
if len(hidden) != 0 {
filteredRes := backend.NewQueryDataResponse()
for refID, res := range responses.Responses {
if _, ok := hidden[refID]; !ok {
filteredRes.Responses[refID] = res
}
}
responses = filteredRes
}
return responses, nil
}
func hiddenRefIDs(queries []backend.DataQuery) (map[string]struct{}, error) {
hidden := make(map[string]struct{})
for _, query := range queries {
hide := struct {
Hide bool `json:"hide"`
}{}
if err := json.Unmarshal(query.JSON, &hide); err != nil {
return nil, err
}
if hide.Hide {
hidden[query.RefID] = struct{}{}
}
}
return hidden, nil
}
// queryData is called used to query datasources that are not expression commands, but are used
// alongside expressions and/or are the input of an expression command.
func (s *Service) queryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
if len(req.Queries) == 0 {
return nil, fmt.Errorf("zero queries found in datasource request")
}
datasourceID := int64(0)
var datasourceUID string
if req.PluginContext.DataSourceInstanceSettings != nil {
datasourceID = req.PluginContext.DataSourceInstanceSettings.ID
datasourceUID = req.PluginContext.DataSourceInstanceSettings.UID
}
getDsInfo := &models.GetDataSourceQuery{
OrgId: req.PluginContext.OrgID,
Id: datasourceID,
Uid: datasourceUID,
}
if err := bus.Dispatch(getDsInfo); err != nil {
return nil, fmt.Errorf("could not find datasource: %w", err)
}
// Convert plugin-model (datasource) queries to tsdb queries
queries := make([]plugins.DataSubQuery, len(req.Queries))
for i, query := range req.Queries {
sj, err := simplejson.NewJson(query.JSON)
if err != nil {
return nil, err
}
queries[i] = plugins.DataSubQuery{
RefID: query.RefID,
IntervalMS: query.Interval.Milliseconds(),
MaxDataPoints: query.MaxDataPoints,
QueryType: query.QueryType,
DataSource: getDsInfo.Result,
Model: sj,
}
}
// For now take Time Range from first query.
timeRange := plugins.NewDataTimeRange(strconv.FormatInt(req.Queries[0].TimeRange.From.Unix()*1000, 10),
strconv.FormatInt(req.Queries[0].TimeRange.To.Unix()*1000, 10))
tQ := plugins.DataQuery{
TimeRange: &timeRange,
Queries: queries,
}
// Execute the converted queries
tsdbRes, err := s.DataService.HandleRequest(ctx, getDsInfo.Result, tQ)
if err != nil {
return nil, err
}
return tsdbRes.ToBackendDataResponse()
}