mirror of https://github.com/grafana/grafana
Alerting/testing promql extraction (#34665)
* promql compat for marshaling * extracts upstream instant queries into data frame for alerting * eval string paritypull/34680/head
parent
a5082ab112
commit
0e0ed43153
@ -0,0 +1,145 @@ |
||||
package api |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"errors" |
||||
"fmt" |
||||
"strconv" |
||||
"strings" |
||||
|
||||
cortex_util "github.com/cortexproject/cortex/pkg/util" |
||||
"github.com/grafana/grafana-plugin-sdk-go/data" |
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval" |
||||
"github.com/prometheus/prometheus/pkg/labels" |
||||
"github.com/prometheus/prometheus/promql" |
||||
"github.com/prometheus/prometheus/promql/parser" |
||||
|
||||
"github.com/grafana/grafana/pkg/util" |
||||
) |
||||
|
||||
type instantQueryResponse struct { |
||||
Status string `json:"status"` |
||||
Data queryData `json:"data,omitempty"` |
||||
ErrorType string `json:"errorType,omitempty"` |
||||
Error string `json:"error,omitempty"` |
||||
} |
||||
|
||||
type queryData struct { |
||||
ResultType parser.ValueType `json:"resultType"` |
||||
Result json.RawMessage `json:"result"` |
||||
vector vector `json:"-"` |
||||
scalar scalar `json:"-"` |
||||
} |
||||
|
||||
type scalar promql.Scalar |
||||
|
||||
func (s *scalar) UnmarshalJSON(b []byte) error { |
||||
var xs []interface{} |
||||
if err := json.Unmarshal(b, &xs); err != nil { |
||||
return err |
||||
} |
||||
// scalars are encoded like `[ts/1000, "value"]`
|
||||
if len(xs) != 2 { |
||||
return fmt.Errorf("unexpected number of scalar encoded values: %d", len(xs)) |
||||
} |
||||
ts, ok := xs[0].(float64) |
||||
if !ok { |
||||
return fmt.Errorf("first value in scalar uncoercible to timestamp: %v", xs[0]) |
||||
} |
||||
s.T = int64(ts) * 1000 |
||||
v, ok := xs[1].(string) |
||||
if !ok { |
||||
return fmt.Errorf("second value in scalar not string encoded: %v", xs[1]) |
||||
} |
||||
f, err := strconv.ParseFloat(v, 64) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
s.V = f |
||||
return nil |
||||
} |
||||
|
||||
func (d *queryData) UnmarshalJSON(b []byte) error { |
||||
type plain queryData |
||||
if err := json.Unmarshal(b, (*plain)(d)); err != nil { |
||||
return err |
||||
} |
||||
|
||||
switch d.ResultType { |
||||
case parser.ValueTypeScalar: |
||||
return json.Unmarshal(d.Result, &d.scalar) |
||||
case parser.ValueTypeVector: |
||||
return json.Unmarshal(d.Result, &d.vector) |
||||
default: |
||||
return fmt.Errorf("unexpected response type: %s", d.ResultType) |
||||
} |
||||
} |
||||
|
||||
type sample struct { |
||||
Metric labels.Labels `json:"metric"` |
||||
Value scalar `json:"value"` |
||||
} |
||||
type vector []sample |
||||
|
||||
func instantQueryResults(resp instantQueryResponse) (eval.Results, error) { |
||||
if resp.Error != "" || resp.Status != "success" { |
||||
return nil, errors.New(resp.Error) |
||||
} |
||||
|
||||
switch resp.Data.ResultType { |
||||
case parser.ValueTypeScalar: |
||||
return eval.Results{{ |
||||
Instance: map[string]string{}, |
||||
State: eval.Alerting, |
||||
EvaluatedAt: cortex_util.TimeFromMillis(resp.Data.scalar.T), |
||||
EvaluationString: extractEvalStringFromProm(sample{ |
||||
Value: resp.Data.scalar, |
||||
}), |
||||
}}, nil |
||||
case parser.ValueTypeVector: |
||||
results := make(eval.Results, 0, len(resp.Data.vector)) |
||||
for _, s := range resp.Data.vector { |
||||
results = append(results, eval.Result{ |
||||
Instance: s.Metric.Map(), |
||||
State: eval.Alerting, |
||||
EvaluatedAt: cortex_util.TimeFromMillis(s.Value.T), |
||||
EvaluationString: extractEvalStringFromProm(s), |
||||
}) |
||||
} |
||||
return results, nil |
||||
default: |
||||
return nil, fmt.Errorf("unexpected response type: %s", resp.Data.ResultType) |
||||
} |
||||
} |
||||
|
||||
func instantQueryResultsExtractor(b []byte) (interface{}, error) { |
||||
var resp instantQueryResponse |
||||
err := json.Unmarshal(b, &resp) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
res, err := instantQueryResults(resp) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
frame := res.AsDataFrame() |
||||
|
||||
return util.DynMap{ |
||||
"instances": []*data.Frame{&frame}, |
||||
}, nil |
||||
} |
||||
|
||||
// extractEvalStringFromProm is intended to mimic the functionality used in ngalert/eval
|
||||
func extractEvalStringFromProm(s sample) string { |
||||
var sb strings.Builder |
||||
sb.WriteString("[ ") |
||||
var ls string |
||||
if len(s.Metric) > 0 { |
||||
ls = s.Metric.String() |
||||
} |
||||
sb.WriteString(fmt.Sprintf("labels={%s} ", ls)) |
||||
sb.WriteString(fmt.Sprintf("value=%v ", fmt.Sprintf("%v", s.Value.V))) |
||||
sb.WriteString("]") |
||||
return sb.String() |
||||
} |
||||
@ -0,0 +1,107 @@ |
||||
package api |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"testing" |
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels" |
||||
"github.com/prometheus/prometheus/promql/parser" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func Test_instantQueryMarshaling(t *testing.T) { |
||||
for _, tc := range []struct { |
||||
desc string |
||||
in string |
||||
exp parser.ValueType |
||||
expScalar *scalar |
||||
expVector *vector |
||||
isError bool // successfully unpack an upstream error
|
||||
}{ |
||||
{ |
||||
desc: "scalar", |
||||
in: `{ |
||||
"status": "success", |
||||
"data": { |
||||
"resultType": "scalar", |
||||
"result": [ |
||||
12, |
||||
"2" |
||||
] |
||||
} |
||||
}`, |
||||
exp: parser.ValueTypeScalar, |
||||
expScalar: &scalar{ |
||||
T: 12000, |
||||
V: 2, |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "vector", |
||||
in: `{ |
||||
"status": "success", |
||||
"data": { |
||||
"resultType": "vector", |
||||
"result": [ |
||||
{ |
||||
"metric": { |
||||
"__name__": "apiserver_request:burnrate1d" |
||||
}, |
||||
"value": [ |
||||
12.04, |
||||
"10.5" |
||||
] |
||||
} |
||||
] |
||||
} |
||||
}`, |
||||
exp: parser.ValueTypeVector, |
||||
expVector: &vector{ |
||||
sample{ |
||||
Value: scalar{ |
||||
T: 12000, // loses some precision during marshaling
|
||||
V: 10.5, |
||||
}, |
||||
Metric: []labels.Label{{ |
||||
Name: "__name__", |
||||
Value: "apiserver_request:burnrate1d", |
||||
}}, |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
desc: "successfully parse error", |
||||
in: `{ |
||||
"status": "failure", |
||||
"errorType": "someErr", |
||||
"error": "error doing something" |
||||
}`, |
||||
isError: true, |
||||
}, |
||||
} { |
||||
t.Run(tc.desc, func(t *testing.T) { |
||||
var out instantQueryResponse |
||||
err := json.Unmarshal([]byte(tc.in), &out) |
||||
require.NoError(t, err) |
||||
|
||||
if tc.isError { |
||||
require.Equal(t, out.Status, "failure") |
||||
require.Greater(t, len(out.ErrorType), 0) |
||||
require.Greater(t, len(out.Error), 0) |
||||
return |
||||
} |
||||
|
||||
require.Equal(t, tc.exp, out.Data.ResultType) |
||||
b, err := json.MarshalIndent(out, "", " ") |
||||
require.Nil(t, err) |
||||
require.Equal(t, tc.in, string(b)) |
||||
|
||||
if tc.expScalar != nil { |
||||
require.Equal(t, *tc.expScalar, out.Data.scalar) |
||||
} |
||||
if tc.expVector != nil { |
||||
require.Equal(t, *tc.expVector, out.Data.vector) |
||||
} |
||||
}) |
||||
} |
||||
} |
||||
Loading…
Reference in new issue