The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/expr/transform.go

134 lines
3.2 KiB

package expr
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/services/datasources"
)
// expressionsQuerySummary is the summary metric, labelled by "status", that
// records how long expression queries take. It is created and registered in
// init.
var expressionsQuerySummary *prometheus.SummaryVec
// init builds the expressions query duration summary and registers it with
// the default Prometheus registry.
func init() {
	opts := prometheus.SummaryOpts{
		Name: "expressions_queries_duration_milliseconds",
		Help: "Expressions query summary",
		// NOTE(review): presumably quantile -> allowed error, as described in
		// the Prometheus client documentation — confirm.
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	}
	labelNames := []string{"status"}

	expressionsQuerySummary = prometheus.NewSummaryVec(opts, labelNames)
	prometheus.MustRegister(expressionsQuerySummary)
}
// Request is similar to plugins.DataQuery, but the time range is carried per
// Query rather than once for the whole request.
type Request struct {
// Headers is a string-to-string map; presumably headers to forward with the
// request — TODO confirm against callers.
Headers map[string]string
Debug bool
// OrgId is presumably the ID of the organization the request belongs to —
// TODO confirm.
OrgId int64
// Queries are the individual queries to process; each has its own TimeRange.
Queries []Query
}
// Query is like plugins.DataSubQuery, but with a time range, and with the
// interval expressed as a time.Duration.
// NOTE(review): this comment used to say the query carries only the UID for
// the data source, yet the DataSource field holds a full
// *datasources.DataSource — confirm which is intended.
type Query struct {
// RefID identifies the query; TransformData matches it against the keys of
// the response map when filtering hidden queries.
RefID string
TimeRange TimeRange
DataSource *datasources.DataSource `json:"datasource"`
// JSON is the raw query model; hiddenRefIDs reads its "hide" property.
JSON json.RawMessage
Interval time.Duration
QueryType string
MaxDataPoints int64
}
// TimeRange is a time range whose two bounds, From and To, are time.Time
// values.
type TimeRange struct {
	From, To time.Time
}
// TransformData runs a request whose queries are either expression nodes or
// data source requests and returns the resulting responses.
//
// It returns an error immediately when server side expressions are disabled.
// Otherwise it builds a pipeline from the request, executes it, and drops the
// responses of every query whose JSON has the "hide" property set. The time
// taken is observed on expressionsQuerySummary in milliseconds, labelled
// "success" or "failure" according to the returned error.
func (s *Service) TransformData(ctx context.Context, req *Request) (r *backend.QueryDataResponse, err error) {
	if s.isDisabled() {
		return nil, fmt.Errorf("server side expressions are disabled")
	}

	began := time.Now()
	// err is the named result, so this closure sees whatever error the
	// function ends up returning.
	defer func() {
		status := "failure"
		if err == nil {
			status = "success"
		}
		elapsedMs := float64(time.Since(began).Nanoseconds()) / float64(time.Millisecond)
		expressionsQuerySummary.WithLabelValues(status).Observe(elapsedMs)
	}()

	// Build the pipeline from the request, checking for ordering issues
	// (e.g. loops) and parsing graph nodes from the queries.
	pipeline, err := s.BuildPipeline(req)
	if err != nil {
		return nil, err
	}

	results, err := s.ExecutePipeline(ctx, pipeline)
	if err != nil {
		return nil, err
	}

	// Find the queries marked as hidden so that their results can be left
	// out of the response.
	hidden, err := hiddenRefIDs(req.Queries)
	if err != nil {
		return nil, err
	}
	if len(hidden) == 0 {
		// Nothing to filter: hand back the pipeline output untouched.
		return results, nil
	}

	visible := backend.NewQueryDataResponse()
	for refID, res := range results.Responses {
		if _, isHidden := hidden[refID]; isHidden {
			continue
		}
		visible.Responses[refID] = res
	}
	return visible, nil
}
// hiddenRefIDs returns the set of RefIDs belonging to queries whose JSON has
// "hide" set to true. If the JSON of any query cannot be unmarshalled, the
// scan stops and that error is returned together with a nil set.
func hiddenRefIDs(queries []Query) (map[string]struct{}, error) {
	type hideProbe struct {
		Hide bool `json:"hide"`
	}

	refIDs := map[string]struct{}{}
	for i := range queries {
		// A fresh probe per query, so a missing "hide" key reads as false
		// instead of inheriting the previous query's value.
		var probe hideProbe
		err := json.Unmarshal(queries[i].JSON, &probe)
		if err != nil {
			return nil, err
		}
		if !probe.Hide {
			continue
		}
		refIDs[queries[i].RefID] = struct{}{}
	}
	return refIDs, nil
}
// decryptSecureJsonDataFn returns a function that, given a data source, calls
// DecryptedValues on the service's dataSourceService with the ctx captured
// here and returns its result unchanged.
func (s *Service) decryptSecureJsonDataFn(ctx context.Context) func(ds *datasources.DataSource) (map[string]string, error) {
	decrypt := func(ds *datasources.DataSource) (map[string]string, error) {
		return s.dataSourceService.DecryptedValues(ctx, ds)
	}
	return decrypt
}