SSE: Support for ML query node (#69963)

* introduce a new node-type ML and implement a command outlier that uses ML plugin as a source of data.
* add feature flag mlExpressions that guards the feature
pull/71600/head
Yuri Tseretyan 3 years ago committed by GitHub
parent e8b4228f89
commit 541bfe636d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md
  2. 1
      packages/grafana-data/src/types/featureToggles.gen.ts
  3. 17
      pkg/expr/graph.go
  4. 158
      pkg/expr/ml.go
  5. 83
      pkg/expr/ml/model.go
  6. 63
      pkg/expr/ml/node.go
  7. 167
      pkg/expr/ml/node_test.go
  8. 116
      pkg/expr/ml/outlier.go
  9. 194
      pkg/expr/ml/outlier_test.go
  10. 44
      pkg/expr/ml/testing.go
  11. 219
      pkg/expr/ml_test.go
  12. 1
      pkg/expr/nodes.go
  13. 20
      pkg/expr/service.go
  14. 84
      pkg/expr/testing.go
  15. 7
      pkg/services/featuremgmt/registry.go
  16. 1
      pkg/services/featuremgmt/toggles_gen.csv
  17. 4
      pkg/services/featuremgmt/toggles_gen.go
  18. 27
      pkg/services/ngalert/eval/eval.go

@ -126,6 +126,7 @@ Experimental features might be changed or removed without prior notice.
| `logsExploreTableVisualisation` | A table visualisation for logs in Explore |
| `awsDatasourcesTempCredentials` | Support temporary security credentials in AWS plugins for Grafana Cloud customers |
| `transformationsRedesign` | Enables the transformations redesign |
| `mlExpressions` | Enable support for Machine Learning in server-side expressions |
## Development feature toggles

@ -113,4 +113,5 @@ export interface FeatureToggles {
logsExploreTableVisualisation?: boolean;
awsDatasourcesTempCredentials?: boolean;
transformationsRedesign?: boolean;
mlExpressions?: boolean;
}

@ -11,6 +11,7 @@ import (
"gonum.org/v1/gonum/graph/topo"
"github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/grafana/grafana/pkg/services/featuremgmt"
)
// NodeType is the type of a DPNode. Currently either a expression command or datasource query.
@ -21,6 +22,8 @@ const (
TypeCMDNode NodeType = iota
// TypeDatasourceNode is a NodeType for datasource queries.
TypeDatasourceNode
// TypeMLNode is a NodeType for Machine Learning queries.
TypeMLNode
)
func (nt NodeType) String() string {
@ -29,6 +32,8 @@ func (nt NodeType) String() string {
return "Expression"
case TypeDatasourceNode:
return "Datasource"
case TypeMLNode:
return "Machine Learning"
default:
return "Unknown"
}
@ -159,6 +164,7 @@ func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
rn := &rawNode{
Query: rawQueryProp,
QueryRaw: query.JSON,
RefID: query.RefID,
TimeRange: query.TimeRange,
QueryType: query.QueryType,
@ -171,7 +177,16 @@ func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
node, err = s.buildDSNode(dp, rn, req)
case TypeCMDNode:
node, err = buildCMDNode(dp, rn)
default:
case TypeMLNode:
if s.features.IsEnabled(featuremgmt.FlagMlExpressions) {
node, err = s.buildMLNode(dp, rn, req)
if err != nil {
err = fmt.Errorf("fail to parse expression with refID %v: %w", rn.RefID, err)
}
}
}
if node == nil && err == nil {
err = fmt.Errorf("unsupported node type '%s'", NodeTypeFromDatasourceUID(query.DataSource.UID))
}

@ -0,0 +1,158 @@
package expr
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
jsoniter "github.com/json-iterator/go"
"gonum.org/v1/gonum/graph/simple"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/grafana/grafana/pkg/expr/ml"
"github.com/grafana/grafana/pkg/plugins/httpresponsesender"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
)
var (
errMLPluginDoesNotExist = fmt.Errorf("expression type Machine Learning cannot be executed. Plugin '%s' must be installed and initialized", mlPluginID)
)
const (
// mlDatasourceID is similar to a fake ID for CMDNode. There is no specific reason for the selection of this value.
mlDatasourceID = -200
// DatasourceUID is the string constant used as the datasource name in requests
// to identify it as an expression command when use in Datasource.UID.
MLDatasourceUID = "__ml__"
// mlPluginID is a known constant and used in other places of the code
mlPluginID = "grafana-ml-app"
)
// MLNode is a node of expression tree that evaluates the expression by sending the payload to Machine Learning back-end.
// See ml.UnmarshalCommand for supported commands.
type MLNode struct {
baseNode
command ml.Command
TimeRange TimeRange
request *Request
}
// NodeType returns the data pipeline node type.
func (m *MLNode) NodeType() NodeType {
return TypeMLNode
}
// Execute initializes plugin API client, executes a ml.Command and then converts the result of the execution.
// Returns non-empty mathexp.Results if evaluation was successful. Returns QueryError if command execution failed
func (m *MLNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
logger := logger.FromContext(ctx).New("datasourceType", mlPluginID, "queryRefId", m.refID)
var result mathexp.Results
timeRange := m.TimeRange.AbsoluteTime(now)
// get the plugin configuration that will be used by client (auth, host, etc)
pCtx, err := s.pCtxProvider.Get(ctx, mlPluginID, m.request.User, m.request.OrgId)
if err != nil {
if errors.Is(err, plugincontext.ErrPluginNotFound) {
return result, errMLPluginDoesNotExist
}
return result, fmt.Errorf("failed to get plugin settings: %w", err)
}
// Plugin must be initialized by the admin first. That will create service account, and update plugin settings so all requests can use it.
// Fail if it is not initialized.
if pCtx.AppInstanceSettings == nil || !jsoniter.Get(pCtx.AppInstanceSettings.JSONData, "initialized").ToBool() {
return mathexp.Results{}, errMLPluginDoesNotExist
}
// responseType and respStatus will be updated below. Use defer to ensure that debug log message is always emitted
responseType := "unknown"
respStatus := "success"
defer func() {
if e != nil {
responseType = "error"
respStatus = "failure"
}
logger.Debug("Data source queried", "responseType", responseType)
useDataplane := strings.HasPrefix("dataplane-", responseType)
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane)).Inc()
}()
// Execute the command and provide callback function for sending a request via plugin API.
// This lets us make commands abstracted from peculiarities of the transfer protocol.
data, err := m.command.Execute(timeRange.From, timeRange.To, func(method string, path string, payload []byte) (response.Response, error) {
crReq := &backend.CallResourceRequest{
PluginContext: pCtx,
Path: path,
Method: method,
URL: path,
Headers: make(map[string][]string, len(m.request.Headers)),
Body: payload,
}
// copy headers from the request to evaluate the expression pipeline. Usually this contains information from upstream, e.g. FromAlert
for key, val := range m.request.Headers {
crReq.SetHTTPHeader(key, val)
}
resp := response.CreateNormalResponse(make(http.Header), nil, 0)
httpSender := httpresponsesender.New(resp)
err = s.pluginsClient.CallResource(ctx, crReq, httpSender)
if err != nil {
return nil, err
}
return resp, nil
})
if err != nil {
return result, QueryError{
RefID: m.refID,
Err: err,
}
}
// data is not guaranteed to be specified. In this case simulate NoData scenario
if data == nil {
data = &backend.QueryDataResponse{Responses: map[string]backend.DataResponse{}}
}
dataFrames, err := getResponseFrame(data, m.refID)
if err != nil {
return mathexp.Results{}, QueryError{
RefID: m.refID,
DatasourceUID: mlPluginID,
Err: err,
}
}
// process the response the same way DSNode does. Use plugin ID as data source type. Semantically, they are the same.
responseType, result, err = convertDataFramesToResults(ctx, dataFrames, mlPluginID, s, logger)
return result, err
}
func (s *Service) buildMLNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (Node, error) {
if rn.TimeRange == nil {
return nil, errors.New("time range must be specified")
}
cmd, err := ml.UnmarshalCommand(rn.QueryRaw, s.cfg.AppURL)
if err != nil {
return nil, err
}
return &MLNode{
baseNode: baseNode{
id: dp.NewNode().ID(),
refID: rn.RefID,
},
TimeRange: rn.TimeRange,
command: cmd,
request: req,
}, nil
}

@ -0,0 +1,83 @@
package ml
import (
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
jsoniter "github.com/json-iterator/go"
)
type CommandConfiguration struct {
Type string `json:"type"`
IntervalMs *uint `json:"intervalMs,omitempty"`
Config jsoniter.RawMessage `json:"config"`
}
type OutlierCommandConfiguration struct {
DatasourceType string `json:"datasource_type"`
DatasourceUID string `json:"datasource_uid,omitempty"`
// If Query is empty it should be contained in a datasource specific format
// inside of QueryParms.
Query string `json:"query,omitempty"`
QueryParams map[string]interface{} `json:"query_params,omitempty"`
Algorithm map[string]interface{} `json:"algorithm"`
ResponseType string `json:"response_type"`
}
// outlierAttributes is outlier command configuration that is sent to Machine learning API
type outlierAttributes struct {
OutlierCommandConfiguration
GrafanaURL string `json:"grafana_url"`
StartEndAttributes timeRangeAndInterval `json:"start_end_attributes"`
}
type outlierData struct {
Attributes outlierAttributes `json:"attributes"`
}
// outlierRequestBody describes a request body that is sent to Outlier API
type outlierRequestBody struct {
Data outlierData `json:"data"`
}
type timeRangeAndInterval struct {
Start mlTime `json:"start"`
End mlTime `json:"end"`
Interval int64 `json:"interval"` // Interval is expected to be in milliseconds
}
func newTimeRangeAndInterval(from, to time.Time, interval time.Duration) timeRangeAndInterval {
return timeRangeAndInterval{
Start: mlTime(from),
End: mlTime(to),
Interval: interval.Milliseconds(),
}
}
// mlTime is a time.Time that is marshalled as a string in a format is supported by Machine Learning API
type mlTime time.Time
func (t *mlTime) UnmarshalJSON(b []byte) error {
s := strings.Trim(string(b), "\"")
parsed, err := time.Parse(timeFormat, s)
if err != nil {
return err
}
*t = mlTime(parsed)
return nil
}
// MarshalJSON implements the Marshaler interface.
func (t mlTime) MarshalJSON() ([]byte, error) {
return []byte("\"" + time.Time(t).Format(timeFormat) + "\""), nil
}
// outlierResponse is a model that represents a response of the outlier proxy API.
type outlierResponse struct {
Status string `json:"status"`
Data *backend.QueryDataResponse `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}

@ -0,0 +1,63 @@
package ml
import (
"fmt"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/api/response"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type CommandType string
const (
Outlier CommandType = "outlier"
// format of the time used by outlier API
timeFormat = "2006-01-02T15:04:05.999999999"
defaultInterval = 1000 * time.Millisecond
)
// Command is an interface implemented by all Machine Learning commands that can be executed against ML API.
type Command interface {
// DatasourceUID returns UID of a data source that is used by machine learning as the source of data
DatasourceUID() string
// Execute creates a payload send request to the ML API by calling the function argument sendRequest, and then parses response.
// Function sendRequest is supposed to abstract the client configuration such creating http request, adding authorization parameters, host etc.
Execute(from, to time.Time, sendRequest func(method string, path string, payload []byte) (response.Response, error)) (*backend.QueryDataResponse, error)
}
// UnmarshalCommand parses a config parameters and creates a command. Requires key `type` to be specified.
// Based on the value of `type` field it parses a Command
func UnmarshalCommand(query []byte, appURL string) (Command, error) {
var expr CommandConfiguration
err := json.Unmarshal(query, &expr)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall Machine learning command: %w", err)
}
if len(expr.Type) == 0 {
return nil, fmt.Errorf("required field 'type' is not specified or empty. Should be one of [%s]", Outlier)
}
if len(expr.Config) == 0 {
return nil, fmt.Errorf("required field 'config' is not specified")
}
var cmd Command
switch mlType := strings.ToLower(expr.Type); mlType {
case string(Outlier):
cmd, err = unmarshalOutlierCommand(expr, appURL)
default:
return nil, fmt.Errorf("unsupported command type. Should be one of [%s]", Outlier)
}
if err != nil {
return nil, fmt.Errorf("failed to unmarshal Machine learning %s command: %w", expr.Type, err)
}
return cmd, nil
}

@ -0,0 +1,167 @@
package ml
import (
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
func TestUnmarshalCommand(t *testing.T) {
appURL := "https://grafana.com"
updateJson := func(cmd string, f func(m map[string]interface{})) func(t *testing.T) []byte {
return func(t *testing.T) []byte {
var d map[string]interface{}
require.NoError(t, json.UnmarshalFromString(cmd, &d))
f(d)
data, err := json.Marshal(d)
require.NoError(t, err)
return data
}
}
t.Run("should parse outlier command", func(t *testing.T) {
cmd, err := UnmarshalCommand([]byte(outlierQuery), appURL)
require.NoError(t, err)
require.IsType(t, &OutlierCommand{}, cmd)
outlier := cmd.(*OutlierCommand)
require.Equal(t, 1234*time.Millisecond, outlier.interval)
require.Equal(t, appURL, outlier.appURL)
require.Equal(t, OutlierCommandConfiguration{
DatasourceType: "prometheus",
DatasourceUID: "a4ce599c-4c93-44b9-be5b-76385b8c01be",
QueryParams: map[string]interface{}{
"expr": "go_goroutines{}",
"range": true,
"refId": "A",
},
Algorithm: map[string]interface{}{
"name": "dbscan",
"config": map[string]interface{}{
"epsilon": 7.667,
},
"sensitivity": 0.83,
},
ResponseType: "binary",
}, outlier.config)
})
t.Run("should fallback to default if 'intervalMs' is not specified", func(t *testing.T) {
data := updateJson(outlierQuery, func(m map[string]interface{}) {
delete(m, "intervalMs")
})(t)
cmd, err := UnmarshalCommand(data, appURL)
require.NoError(t, err)
outlier := cmd.(*OutlierCommand)
require.Equal(t, defaultInterval, outlier.interval)
})
t.Run("fails when", func(t *testing.T) {
testCases := []struct {
name string
config func(t *testing.T) []byte
err string
}{
{
name: "field 'type' is missing",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
delete(cmd, "type")
}),
err: "required field 'type' is not specified or empty. Should be one of [outlier]",
},
{
name: "field 'type' is not known",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
cmd["type"] = uuid.NewString()
}),
err: "unsupported command type. Should be one of [outlier]",
},
{
name: "field 'type' is not string",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
cmd["type"] = map[string]interface{}{
"data": 1,
}
}),
err: "failed to unmarshall Machine learning command",
},
{
name: "field 'config' is missing",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
delete(cmd, "config")
}),
err: "required field 'config' is not specified",
},
{
name: "field 'intervalMs' is not number",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
cmd["intervalMs"] = "test"
}),
err: "failed to unmarshall Machine learning command",
},
{
name: "field 'config.datasource_uid' is not specified",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
cfg := cmd["config"].(map[string]interface{})
delete(cfg, "datasource_uid")
}),
err: "required field `config.datasource_uid` is not specified",
},
{
name: "field 'config.algorithm' is not specified",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
cfg := cmd["config"].(map[string]interface{})
delete(cfg, "algorithm")
}),
err: "required field `config.algorithm` is not specified",
},
{
name: "field 'config.response_type' is not specified",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
cfg := cmd["config"].(map[string]interface{})
delete(cfg, "response_type")
}),
err: "required field `config.response_type` is not specified",
},
{
name: "fields 'config.query' and 'config.query_params' are not specified",
config: updateJson(outlierQuery, func(cmd map[string]interface{}) {
cfg := cmd["config"].(map[string]interface{})
delete(cfg, "query")
delete(cfg, "query_params")
}),
err: "neither of required fields `config.query_params` or `config.query` are specified",
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
_, err := UnmarshalCommand(testCase.config(t), appURL)
require.ErrorContains(t, err, testCase.err)
})
}
})
}
const outlierQuery = `
{
"type": "outlier",
"intervalMs": 1234,
"config": {
"datasource_uid": "a4ce599c-4c93-44b9-be5b-76385b8c01be",
"datasource_type": "prometheus",
"query_params": {
"expr": "go_goroutines{}",
"range": true,
"refId": "A"
},
"response_type": "binary",
"algorithm": {
"name": "dbscan",
"config": {
"epsilon": 7.667
},
"sensitivity": 0.83
}
}
}
`

@ -0,0 +1,116 @@
package ml
import (
"fmt"
"net/http"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/api/response"
)
// OutlierCommand implements Command that sends a request to outlier proxy API and converts response to backend.QueryDataResponse
type OutlierCommand struct {
config OutlierCommandConfiguration
appURL string
interval time.Duration
}
var _ Command = OutlierCommand{}
func (c OutlierCommand) DatasourceUID() string {
return c.config.DatasourceUID
}
// Execute copies the original configuration JSON and appends (overwrites) a field "start_end_attributes" and "grafana_url" to the root object.
// The value of "start_end_attributes" is JSON object that configures time range and interval.
// The value of "grafana_url" is app URL that should be used by ML to query the data source.
// After payload is generated it sends it to POST /proxy/api/v1/outlier endpoint and parses the response.
// The proxy API normally responds with a structured data. It recognizes status 200 and 204 as successful result.
// Other statuses are considered unsuccessful and result in error. Tries to extract error from the structured payload.
// Otherwise, mentions the full message in error
func (c OutlierCommand) Execute(from, to time.Time, sendRequest func(method string, path string, payload []byte) (response.Response, error)) (*backend.QueryDataResponse, error) {
payload := outlierRequestBody{
Data: outlierData{
Attributes: outlierAttributes{
OutlierCommandConfiguration: c.config,
GrafanaURL: c.appURL,
StartEndAttributes: newTimeRangeAndInterval(from, to, c.interval),
},
},
}
requestBody, err := json.Marshal(payload)
if err != nil {
return nil, err
}
resp, err := sendRequest(http.MethodPost, "/proxy/api/v1/outlier", requestBody)
if err != nil {
return nil, fmt.Errorf("failed to call ML API: %w", err)
}
if resp == nil {
return nil, fmt.Errorf("response is nil")
}
// Outlier proxy API usually returns all responses with this body.
var respData outlierResponse
respBody := resp.Body()
err = json.Unmarshal(respBody, &respData)
if err != nil {
return nil, fmt.Errorf("unexpected format of the response from ML API, status: %d, response: %s", resp.Status(), respBody)
}
if respData.Status == "error" {
return nil, fmt.Errorf("ML API responded with error: %s", respData.Error)
}
if resp.Status() == http.StatusNoContent {
return nil, nil
}
if resp.Status() == http.StatusOK {
return respData.Data, nil
}
return nil, fmt.Errorf("unexpected status %d returned by ML API, response: %s", resp.Status(), respBody)
}
// unmarshalOutlierCommand parses the CommandConfiguration.Config, validates data and produces OutlierCommand.
func unmarshalOutlierCommand(expr CommandConfiguration, appURL string) (*OutlierCommand, error) {
var cfg OutlierCommandConfiguration
err := json.Unmarshal(expr.Config, &cfg)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal outlier command: %w", err)
}
if len(cfg.DatasourceUID) == 0 {
return nil, fmt.Errorf("required field `config.datasource_uid` is not specified")
}
if len(cfg.Query) == 0 && len(cfg.QueryParams) == 0 {
return nil, fmt.Errorf("neither of required fields `config.query_params` or `config.query` are specified")
}
if len(cfg.ResponseType) == 0 {
return nil, fmt.Errorf("required field `config.response_type` is not specified")
}
if len(cfg.Algorithm) == 0 {
return nil, fmt.Errorf("required field `config.algorithm` is not specified")
}
interval := defaultInterval
if expr.IntervalMs != nil {
i := time.Duration(*expr.IntervalMs) * time.Millisecond
if i > 0 {
interval = i
}
}
return &OutlierCommand{
config: cfg,
interval: interval,
appURL: appURL,
}, nil
}

@ -0,0 +1,194 @@
package ml
import (
"errors"
"fmt"
"net/http"
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"github.com/grafana/grafana/pkg/api/response"
)
func TestOutlierExec(t *testing.T) {
outlier := OutlierCommand{
config: OutlierCommandConfiguration{
DatasourceType: "prometheus",
DatasourceUID: "a4ce599c-4c93-44b9-be5b-76385b8c01be",
QueryParams: map[string]interface{}{
"expr": "go_goroutines{}",
"range": true,
"refId": "A",
},
Algorithm: map[string]interface{}{
"name": "dbscan",
"config": map[string]interface{}{
"epsilon": 7.667,
},
"sensitivity": 0.83,
},
ResponseType: "binary",
},
appURL: "https://grafana.com",
interval: 1000 * time.Second,
}
t.Run("should generate expected parameters for request", func(t *testing.T) {
to := time.Now().UTC()
from := to.Add(-10 * time.Hour)
called := false
_, err := outlier.Execute(from, to, func(method string, path string, payload []byte) (response.Response, error) {
require.Equal(t, "POST", method)
require.Equal(t, "/proxy/api/v1/outlier", path)
assert.JSONEq(t, fmt.Sprintf(`{
"data": {
"attributes": {
"datasource_type": "prometheus",
"datasource_uid": "a4ce599c-4c93-44b9-be5b-76385b8c01be",
"query_params": {
"expr": "go_goroutines{}",
"range": true,
"refId": "A"
},
"algorithm": {
"config": {
"epsilon": 7.667
},
"name": "dbscan",
"sensitivity": 0.83
},
"response_type": "binary",
"grafana_url": "https://grafana.com",
"start_end_attributes": {
"start": "%s",
"end": "%s",
"interval": 1000000
}
}
}
}`, from.Format(timeFormat), to.Format(timeFormat)), string(payload))
called = true
return nil, nil
})
require.Truef(t, called, "request function was not called")
require.ErrorContains(t, err, "response is nil")
})
successResponse := `{"status":"success","data":{"results":{"A":{"status":200,"frames":[{"schema":{"name":"test","fields":[{"name":"Time","type":"time","typeInfo":{"frame":"time.Time"}},{"name":"Value","type":"number","typeInfo":{"frame":"float64"},"labels":{"instance":"test"}}]},"data":{"values":[[1686945300000],[0]]}}]}}}}`
testCases := []struct {
name string
response response.Response
assert func(t *testing.T, r *backend.QueryDataResponse, err error)
}{
{
name: "should return parsed frames when 200",
response: response.CreateNormalResponse(nil, []byte(successResponse), http.StatusOK),
assert: func(t *testing.T, r *backend.QueryDataResponse, err error) {
require.NoError(t, err)
require.NotNil(t, r)
require.Contains(t, r.Responses, "A")
require.NoError(t, r.Responses["A"].Error)
require.Equal(t, backend.StatusOK, r.Responses["A"].Status)
require.Len(t, r.Responses["A"].Frames, 1)
},
},
{
name: "should return nil if 204",
response: response.CreateNormalResponse(nil, []byte(`{"status": "success"}`), http.StatusNoContent),
assert: func(t *testing.T, r *backend.QueryDataResponse, err error) {
require.NoError(t, err)
require.Nil(t, r)
},
},
{
name: "should return error if any status and body has status error",
response: response.CreateNormalResponse(nil, []byte(`{"status": "error"}`), 0),
assert: func(t *testing.T, r *backend.QueryDataResponse, err error) {
require.ErrorContains(t, err, "ML API responded with error")
require.Nil(t, r)
},
},
{
name: "should return error with explanations if any status and body has status error",
response: response.CreateNormalResponse(nil, []byte(`{"status": "error", "error": "test-error"}`), 0),
assert: func(t *testing.T, r *backend.QueryDataResponse, err error) {
require.ErrorContains(t, err, "ML API responded with error")
require.ErrorContains(t, err, "test-error")
require.Nil(t, r)
},
},
{
name: "should return error response is empty",
response: response.CreateNormalResponse(nil, []byte(``), 0),
assert: func(t *testing.T, r *backend.QueryDataResponse, err error) {
require.ErrorContains(t, err, "unexpected format of the response from ML API")
require.Nil(t, r)
},
},
{
name: "should return error response is not a valid JSON",
response: response.CreateNormalResponse(nil, []byte(`{`), 0),
assert: func(t *testing.T, r *backend.QueryDataResponse, err error) {
require.ErrorContains(t, err, "unexpected format of the response from ML API")
require.Nil(t, r)
},
},
{
name: "should error if response is nil and no error",
response: nil,
assert: func(t *testing.T, r *backend.QueryDataResponse, err error) {
require.ErrorContains(t, err, "response is nil")
require.Nil(t, r)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
to := time.Now()
from := to.Add(-10 * time.Hour)
resp, err := outlier.Execute(from, to, func(method string, path string, payload []byte) (response.Response, error) {
return tc.response, nil
})
tc.assert(t, resp, err)
})
}
t.Run("should return error if status is not known", func(t *testing.T) {
knownStatuses := []int{http.StatusOK, http.StatusNoContent}
for status := 100; status < 600; status++ {
if http.StatusText(status) == "" || slices.Contains(knownStatuses, status) {
continue
}
to := time.Now()
from := to.Add(-10 * time.Hour)
resp, err := outlier.Execute(from, to, func(method string, path string, payload []byte) (response.Response, error) {
return response.CreateNormalResponse(nil, []byte(successResponse), status), nil
})
require.ErrorContains(t, err, fmt.Sprintf("unexpected status %d returned by ML API", status))
require.Nil(t, resp)
}
})
t.Run("should propagate error from request function", func(t *testing.T) {
to := time.Now()
from := to.Add(-10 * time.Hour)
resp, err := outlier.Execute(from, to, func(method string, path string, payload []byte) (response.Response, error) {
return nil, errors.New("test-error")
})
require.ErrorContains(t, err, "test-error")
require.Nil(t, resp)
})
}

@ -0,0 +1,44 @@
package ml
import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/api/response"
)
type FakeCommand struct {
Method string
Path string
Payload []byte
Response *backend.QueryDataResponse
Error error
Recordings []struct {
From time.Time
To time.Time
Response response.Response
Error error
}
}
var _ Command = &FakeCommand{}
func (f *FakeCommand) DatasourceUID() string {
return "fake-ml-datasource"
}
func (f *FakeCommand) Execute(from, to time.Time, executor func(method string, path string, payload []byte) (response.Response, error)) (*backend.QueryDataResponse, error) {
r, err := executor(f.Method, f.Path, f.Payload)
f.Recordings = append(f.Recordings, struct {
From time.Time
To time.Time
Response response.Response
Error error
}{From: from, To: to, Response: r, Error: err})
if err != nil {
return nil, err
}
return f.Response, f.Error
}

@ -0,0 +1,219 @@
package expr
import (
"context"
"encoding/json"
"errors"
"net/http"
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/expr/ml"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
"github.com/grafana/grafana/pkg/services/user"
)
func TestMLNodeExecute(t *testing.T) {
timeNow := time.Now()
expectedOrgID := int64(123)
timeRange := RelativeTimeRange{
From: -10 * time.Hour,
To: 0,
}
request := &Request{
Headers: map[string]string{
"test": "test",
},
Debug: false,
OrgId: expectedOrgID,
Queries: nil,
User: &user.SignedInUser{
UserID: 1,
},
}
expectedResponse := &backend.CallResourceResponse{
Status: 200,
Headers: nil,
Body: []byte("test-response"),
}
pluginsClient := &recordingCallResourceHandler{
response: expectedResponse,
}
pluginCtx := &fakePluginContextProvider{
result: map[string]*backend.AppInstanceSettings{
mlPluginID: {
JSONData: json.RawMessage(`{ "initialized": true }`),
},
},
}
s := &Service{
cfg: nil,
dataService: nil,
pCtxProvider: pluginCtx,
features: nil,
pluginsClient: pluginsClient,
tracer: nil,
metrics: newMetrics(nil),
}
cmdResponse := data.NewFrame("test",
data.NewField("Time", nil, []time.Time{time.Unix(1, 0)}),
data.NewField("Value", nil, []*float64{fp(1)}),
)
cmd := &ml.FakeCommand{
Method: http.MethodPost,
Path: "/test/ml",
Payload: []byte(`{}`),
Response: &backend.QueryDataResponse{
Responses: map[string]backend.DataResponse{
"A": {
Frames: data.Frames{
cmdResponse,
},
Status: backend.StatusOK,
},
},
},
Error: nil,
}
node := &MLNode{
baseNode: baseNode{},
command: cmd,
TimeRange: timeRange,
request: request,
}
result, err := node.Execute(context.Background(), timeNow, nil, s)
require.NoError(t, err)
require.NotNil(t, result)
require.NotEmpty(t, result.Values)
t.Run("should get plugin context", func(t *testing.T) {
require.NotEmpty(t, pluginCtx.recordings)
require.Equal(t, "Get", pluginCtx.recordings[0].method)
require.Equal(t, mlPluginID, pluginCtx.recordings[0].params[0])
require.Equal(t, request.User, pluginCtx.recordings[0].params[1])
})
t.Run("should call command execute with correct parameters", func(t *testing.T) {
require.NotEmpty(t, cmd.Recordings)
rec := cmd.Recordings[0]
require.Equal(t, timeRange.AbsoluteTime(timeNow).From, rec.From)
require.Equal(t, timeRange.AbsoluteTime(timeNow).To, rec.To)
require.NotNil(t, rec.Response)
require.Equal(t, expectedResponse.Status, rec.Response.Status())
require.Equal(t, expectedResponse.Body, rec.Response.Body())
})
t.Run("should call plugin API", func(t *testing.T) {
require.NotEmpty(t, pluginsClient.recordings)
req := pluginsClient.recordings[0]
require.Equal(t, cmd.Payload, req.Body)
require.Equal(t, cmd.Path, req.Path)
require.Equal(t, cmd.Method, req.Method)
require.NotNil(t, req.PluginContext)
require.Equal(t, mlPluginID, req.PluginContext.PluginID)
t.Run("should append request headers to API call", func(t *testing.T) {
for key, value := range request.Headers {
require.Contains(t, req.Headers, key)
require.Equal(t, value, req.Headers[key][0])
}
})
})
t.Run("should fail if plugin is not installed", func(t *testing.T) {
s := &Service{
cfg: nil,
dataService: nil,
pCtxProvider: &fakePluginContextProvider{
errorResult: plugincontext.ErrPluginNotFound,
},
features: nil,
pluginsClient: nil,
tracer: nil,
metrics: nil,
}
_, err := node.Execute(context.Background(), timeNow, nil, s)
require.ErrorIs(t, err, errMLPluginDoesNotExist)
})
t.Run("should fail if plugin settings cannot be retrieved", func(t *testing.T) {
expectedErr := errors.New("test-error")
s := &Service{
cfg: nil,
dataService: nil,
pCtxProvider: &fakePluginContextProvider{
errorResult: expectedErr,
},
features: nil,
pluginsClient: nil,
tracer: nil,
metrics: nil,
}
_, err := node.Execute(context.Background(), timeNow, nil, s)
require.ErrorIs(t, err, expectedErr)
})
t.Run("should fail if plugin is not initialized", func(t *testing.T) {
s := &Service{
cfg: nil,
dataService: nil,
pCtxProvider: &fakePluginContextProvider{
result: map[string]*backend.AppInstanceSettings{
mlPluginID: {
JSONData: json.RawMessage(`{}`),
},
},
},
features: nil,
pluginsClient: nil,
tracer: nil,
metrics: nil,
}
_, err := node.Execute(context.Background(), timeNow, nil, s)
require.ErrorIs(t, err, errMLPluginDoesNotExist)
})
t.Run("should return QueryError if command failed", func(t *testing.T) {
s := &Service{
cfg: nil,
dataService: nil,
pCtxProvider: pluginCtx,
features: nil,
pluginsClient: pluginsClient,
tracer: nil,
metrics: newMetrics(nil),
}
cmd := &ml.FakeCommand{
Error: errors.New("failed to execute command"),
}
node := &MLNode{
baseNode: baseNode{},
command: cmd,
TimeRange: timeRange,
request: request,
}
_, err := node.Execute(context.Background(), timeNow, nil, s)
require.IsType(t, err, QueryError{})
require.ErrorIs(t, err, cmd.Error)
})
}

@ -47,6 +47,7 @@ type baseNode struct {
type rawNode struct {
RefID string `json:"refId"`
Query map[string]interface{}
QueryRaw []byte
QueryType string
TimeRange TimeRange
DataSource *datasources.DataSource

@ -15,6 +15,7 @@ import (
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
)
@ -47,6 +48,9 @@ func NodeTypeFromDatasourceUID(uid string) NodeType {
if IsDataSource(uid) {
return TypeCMDNode
}
if uid == MLDatasourceUID {
return TypeMLNode
}
return TypeDatasourceNode
}
@ -54,7 +58,7 @@ func NodeTypeFromDatasourceUID(uid string) NodeType {
type Service struct {
cfg *setting.Cfg
dataService backend.QueryDataHandler
pCtxProvider *plugincontext.Provider
pCtxProvider pluginContextProvider
features featuremgmt.FeatureToggles
pluginsClient backend.CallResourceHandler
@ -63,6 +67,11 @@ type Service struct {
metrics *metrics
}
type pluginContextProvider interface {
Get(ctx context.Context, pluginID string, user *user.SignedInUser, orgID int64) (backend.PluginContext, error)
GetWithDataSource(ctx context.Context, pluginID string, user *user.SignedInUser, ds *datasources.DataSource) (backend.PluginContext, error)
}
func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, pCtxProvider *plugincontext.Provider,
features featuremgmt.FeatureToggles, registerer prometheus.Registerer, tracer tracing.Tracer) *Service {
return &Service{
@ -117,6 +126,15 @@ func DataSourceModelFromNodeType(kind NodeType) (*datasources.DataSource, error)
JsonData: simplejson.New(),
SecureJsonData: make(map[string][]byte),
}, nil
case TypeMLNode:
return &datasources.DataSource{
ID: mlDatasourceID,
UID: MLDatasourceUID,
Name: DatasourceUID,
Type: mlPluginID,
JsonData: simplejson.New(),
SecureJsonData: make(map[string][]byte),
}, nil
case TypeDatasourceNode:
return nil, errors.New("cannot create expression data source for data source kind")
default:

@ -0,0 +1,84 @@
package expr
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/user"
)
type fakePluginContextProvider struct {
recordings []struct {
method string
params []interface{}
}
result map[string]*backend.AppInstanceSettings
errorResult error
}
var _ pluginContextProvider = &fakePluginContextProvider{}
func (f *fakePluginContextProvider) Get(_ context.Context, pluginID string, user *user.SignedInUser, orgID int64) (backend.PluginContext, error) {
f.recordings = append(f.recordings, struct {
method string
params []interface{}
}{method: "Get", params: []interface{}{pluginID, user, orgID}})
if f.errorResult != nil {
return backend.PluginContext{}, f.errorResult
}
var u *backend.User
if user != nil {
u = &backend.User{
Login: user.Login,
Name: user.Name,
Email: user.Email,
}
}
return backend.PluginContext{
OrgID: orgID,
PluginID: pluginID,
User: u,
AppInstanceSettings: f.result[pluginID],
DataSourceInstanceSettings: nil,
}, nil
}
func (f *fakePluginContextProvider) GetWithDataSource(ctx context.Context, pluginID string, user *user.SignedInUser, ds *datasources.DataSource) (backend.PluginContext, error) {
f.recordings = append(f.recordings, struct {
method string
params []interface{}
}{method: "GetWithDataSource", params: []interface{}{pluginID, user, ds}})
if f.errorResult != nil {
return backend.PluginContext{}, f.errorResult
}
orgId := int64(1)
if user != nil {
orgId = user.OrgID
}
r, err := f.Get(ctx, pluginID, user, orgId)
if ds != nil {
r.DataSourceInstanceSettings = &backend.DataSourceInstanceSettings{
ID: ds.ID,
UID: ds.UID,
Type: ds.Type,
Name: ds.Name,
}
}
return r, err
}
type recordingCallResourceHandler struct {
recordings []*backend.CallResourceRequest
response *backend.CallResourceResponse
}
var _ backend.CallResourceHandler = &recordingCallResourceHandler{}
func (f *recordingCallResourceHandler) CallResource(_ context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
f.recordings = append(f.recordings, req)
return sender.Send(f.response)
}

@ -647,5 +647,12 @@ var (
FrontendOnly: true,
Owner: grafanaObservabilityMetricsSquad,
},
{
Name: "mlExpressions",
Description: "Enable support for Machine Learning in server-side expressions",
Stage: FeatureStageExperimental,
FrontendOnly: false,
Owner: grafanaAlertingSquad,
},
}
)

@ -94,3 +94,4 @@ prometheusIncrementalQueryInstrumentation,experimental,@grafana/observability-me
logsExploreTableVisualisation,experimental,@grafana/observability-logs,false,false,false,true
awsDatasourcesTempCredentials,experimental,@grafana/aws-datasources,false,false,false,false
transformationsRedesign,experimental,@grafana/observability-metrics,false,false,false,true
mlExpressions,experimental,@grafana/alerting-squad,false,false,false,false

1 Name Stage Owner requiresDevMode RequiresLicense RequiresRestart FrontendOnly
94 logsExploreTableVisualisation experimental @grafana/observability-logs false false false true
95 awsDatasourcesTempCredentials experimental @grafana/aws-datasources false false false false
96 transformationsRedesign experimental @grafana/observability-metrics false false false true
97 mlExpressions experimental @grafana/alerting-squad false false false false

@ -386,4 +386,8 @@ const (
// FlagTransformationsRedesign
// Enables the transformations redesign
FlagTransformationsRedesign = "transformationsRedesign"
// FlagMlExpressions
// Enable support for Machine Learning in server-side expressions
FlagMlExpressions = "mlExpressions"
)

@ -270,10 +270,10 @@ func getExprRequest(ctx EvaluationContext, data []models.AlertQuery, dsCacheServ
ds, ok := datasources[q.DatasourceUID]
if !ok {
switch nodeType := expr.NodeTypeFromDatasourceUID(q.DatasourceUID); nodeType {
case expr.TypeCMDNode:
ds, err = expr.DataSourceModelFromNodeType(nodeType)
case expr.TypeDatasourceNode:
ds, err = dsCacheService.GetDatasourceByUID(ctx.Ctx, q.DatasourceUID, ctx.User, false /*skipCache*/)
default:
ds, err = expr.DataSourceModelFromNodeType(nodeType)
}
if err != nil {
return nil, fmt.Errorf("failed to build query '%s': %w", q.RefID, err)
@ -622,15 +622,24 @@ func (e *evaluatorImpl) Validate(ctx EvaluationContext, condition models.Conditi
return err
}
for _, query := range req.Queries {
if query.DataSource == nil || expr.NodeTypeFromDatasourceUID(query.DataSource.UID) != expr.TypeDatasourceNode {
if query.DataSource == nil {
continue
}
p, found := e.pluginsStore.Plugin(ctx.Ctx, query.DataSource.Type)
if !found { // technically this should fail earlier during datasource resolution phase.
return fmt.Errorf("datasource refID %s could not be found: %w", query.RefID, plugins.ErrPluginUnavailable)
}
if !p.Backend {
return fmt.Errorf("datasource refID %s is not a backend datasource", query.RefID)
switch expr.NodeTypeFromDatasourceUID(query.DataSource.UID) {
case expr.TypeDatasourceNode:
p, found := e.pluginsStore.Plugin(ctx.Ctx, query.DataSource.Type)
if !found { // technically this should fail earlier during datasource resolution phase.
return fmt.Errorf("datasource refID %s could not be found: %w", query.RefID, plugins.ErrPluginUnavailable)
}
if !p.Backend {
return fmt.Errorf("datasource refID %s is not a backend datasource", query.RefID)
}
case expr.TypeMLNode:
_, found := e.pluginsStore.Plugin(ctx.Ctx, query.DataSource.Type)
if !found {
return fmt.Errorf("datasource refID %s could not be found: %w", query.RefID, plugins.ErrPluginUnavailable)
}
case expr.TypeCMDNode:
}
}
_, err = e.create(condition, req)

Loading…
Cancel
Save