The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/util/converter/prom.go

766 lines
18 KiB

package converter
import (
"encoding/json"
"fmt"
"sort"
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
jsoniter "github.com/json-iterator/go"
)
// helpful while debugging all the options that may appear
func logf(format string, a ...interface{}) {
//fmt.Printf(format, a...)
}
type Options struct {
MatrixWideSeries bool
VectorWideSeries bool
}
// ReadPrometheusStyleResult will read results from a prometheus or loki server and return data frames
func ReadPrometheusStyleResult(iter *jsoniter.Iterator, opt Options) *backend.DataResponse {
var rsp *backend.DataResponse
status := "unknown"
errorType := ""
err := ""
warnings := []data.Notice{}
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
switch l1Field {
case "status":
status = iter.ReadString()
case "data":
rsp = readPrometheusData(iter, opt)
case "error":
err = iter.ReadString()
case "errorType":
errorType = iter.ReadString()
case "warnings":
warnings = readWarnings(iter)
default:
v := iter.Read()
logf("[ROOT] TODO, support key: %s / %v\n", l1Field, v)
}
}
if status == "error" {
return &backend.DataResponse{
Error: fmt.Errorf("%s: %s", errorType, err),
}
}
if len(warnings) > 0 {
for _, frame := range rsp.Frames {
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
}
frame.Meta.Notices = warnings
}
}
return rsp
}
func readWarnings(iter *jsoniter.Iterator) []data.Notice {
warnings := []data.Notice{}
if iter.WhatIsNext() != jsoniter.ArrayValue {
return warnings
}
for iter.ReadArray() {
if iter.WhatIsNext() == jsoniter.StringValue {
notice := data.Notice{
Severity: data.NoticeSeverityWarning,
Text: iter.ReadString(),
}
warnings = append(warnings, notice)
}
}
return warnings
}
func readPrometheusData(iter *jsoniter.Iterator, opt Options) *backend.DataResponse {
t := iter.WhatIsNext()
if t == jsoniter.ArrayValue {
return readArrayData(iter)
}
if t != jsoniter.ObjectValue {
return &backend.DataResponse{
Error: fmt.Errorf("expected object type"),
}
}
resultType := ""
var rsp *backend.DataResponse
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
switch l1Field {
case "resultType":
resultType = iter.ReadString()
case "result":
switch resultType {
case "matrix":
if opt.MatrixWideSeries {
rsp = readMatrixOrVectorWide(iter, resultType)
} else {
rsp = readMatrixOrVectorMulti(iter, resultType)
}
case "vector":
if opt.VectorWideSeries {
rsp = readMatrixOrVectorWide(iter, resultType)
} else {
rsp = readMatrixOrVectorMulti(iter, resultType)
}
case "streams":
rsp = readStream(iter)
case "string":
rsp = readString(iter)
case "scalar":
rsp = readScalar(iter)
default:
iter.Skip()
rsp = &backend.DataResponse{
Error: fmt.Errorf("unknown result type: %s", resultType),
}
}
case "stats":
v := iter.Read()
if len(rsp.Frames) > 0 {
meta := rsp.Frames[0].Meta
if meta == nil {
meta = &data.FrameMeta{}
rsp.Frames[0].Meta = meta
}
meta.Custom = map[string]interface{}{
"stats": v,
}
}
default:
v := iter.Read()
logf("[data] TODO, support key: %s / %v\n", l1Field, v)
}
}
return rsp
}
// will return strings or exemplars
func readArrayData(iter *jsoniter.Iterator) *backend.DataResponse {
lookup := make(map[string]*data.Field)
var labelFrame *data.Frame
rsp := &backend.DataResponse{}
stringField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
stringField.Name = "Value"
for iter.ReadArray() {
switch iter.WhatIsNext() {
case jsoniter.StringValue:
stringField.Append(iter.ReadString())
// Either label or exemplars
case jsoniter.ObjectValue:
exemplar, labelPairs := readLabelsOrExemplars(iter)
if exemplar != nil {
rsp.Frames = append(rsp.Frames, exemplar)
} else if labelPairs != nil {
max := 0
for _, pair := range labelPairs {
k := pair[0]
v := pair[1]
f, ok := lookup[k]
if !ok {
f = data.NewFieldFromFieldType(data.FieldTypeString, 0)
f.Name = k
lookup[k] = f
if labelFrame == nil {
labelFrame = data.NewFrame("")
rsp.Frames = append(rsp.Frames, labelFrame)
}
labelFrame.Fields = append(labelFrame.Fields, f)
}
f.Append(fmt.Sprintf("%v", v))
if f.Len() > max {
max = f.Len()
}
}
// Make sure all fields have equal length
for _, f := range lookup {
diff := max - f.Len()
if diff > 0 {
f.Extend(diff)
}
}
}
default:
{
ext := iter.ReadAny()
v := fmt.Sprintf("%v", ext)
stringField.Append(v)
}
}
}
if stringField.Len() > 0 {
rsp.Frames = append(rsp.Frames, data.NewFrame("", stringField))
}
return rsp
}
// For consistent ordering read values to an array not a map
func readLabelsAsPairs(iter *jsoniter.Iterator) [][2]string {
pairs := make([][2]string, 0, 10)
for k := iter.ReadObject(); k != ""; k = iter.ReadObject() {
pairs = append(pairs, [2]string{k, iter.ReadString()})
}
return pairs
}
func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) {
pairs := make([][2]string, 0, 10)
labels := data.Labels{}
var frame *data.Frame
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
switch l1Field {
case "seriesLabels":
iter.ReadVal(&labels)
case "exemplars":
lookup := make(map[string]*data.Field)
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = data.TimeSeriesTimeFieldName
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0)
valueField.Name = data.TimeSeriesValueFieldName
valueField.Labels = labels
frame = data.NewFrame("", timeField, valueField)
frame.Meta = &data.FrameMeta{
Custom: resultTypeToCustomMeta("exemplar"),
}
for iter.ReadArray() {
for l2Field := iter.ReadObject(); l2Field != ""; l2Field = iter.ReadObject() {
switch l2Field {
// nolint:goconst
case "value":
v, _ := strconv.ParseFloat(iter.ReadString(), 64)
valueField.Append(v)
case "timestamp":
ts := timeFromFloat(iter.ReadFloat64())
timeField.Append(ts)
case "labels":
max := 0
for _, pair := range readLabelsAsPairs(iter) {
k := pair[0]
v := pair[1]
f, ok := lookup[k]
if !ok {
f = data.NewFieldFromFieldType(data.FieldTypeString, 0)
f.Name = k
lookup[k] = f
frame.Fields = append(frame.Fields, f)
}
f.Append(v)
if f.Len() > max {
max = f.Len()
}
}
// Make sure all fields have equal length
for _, f := range lookup {
diff := max - f.Len()
if diff > 0 {
f.Extend(diff)
}
}
default:
iter.Skip()
frame.AppendNotices(data.Notice{
Severity: data.NoticeSeverityError,
Text: fmt.Sprintf("unable to parse key: %s in response body", l2Field),
})
}
}
}
default:
v := fmt.Sprintf("%v", iter.Read())
pairs = append(pairs, [2]string{l1Field, v})
}
}
return frame, pairs
}
func readString(iter *jsoniter.Iterator) *backend.DataResponse {
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = data.TimeSeriesTimeFieldName
valueField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
valueField.Name = data.TimeSeriesValueFieldName
valueField.Labels = data.Labels{}
iter.ReadArray()
t := iter.ReadFloat64()
iter.ReadArray()
v := iter.ReadString()
iter.ReadArray()
tt := timeFromFloat(t)
timeField.Append(tt)
valueField.Append(v)
frame := data.NewFrame("", timeField, valueField)
frame.Meta = &data.FrameMeta{
Type: data.FrameTypeTimeSeriesMany,
Custom: resultTypeToCustomMeta("string"),
}
return &backend.DataResponse{
Frames: []*data.Frame{frame},
}
}
func readScalar(iter *jsoniter.Iterator) *backend.DataResponse {
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = data.TimeSeriesTimeFieldName
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0)
valueField.Name = data.TimeSeriesValueFieldName
valueField.Labels = data.Labels{}
t, v, err := readTimeValuePair(iter)
if err == nil {
timeField.Append(t)
valueField.Append(v)
}
frame := data.NewFrame("", timeField, valueField)
frame.Meta = &data.FrameMeta{
Type: data.FrameTypeTimeSeriesMany,
Custom: resultTypeToCustomMeta("scalar"),
}
return &backend.DataResponse{
Frames: []*data.Frame{frame},
}
}
func readMatrixOrVectorWide(iter *jsoniter.Iterator, resultType string) *backend.DataResponse {
rowIdx := 0
timeMap := map[int64]int{}
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = data.TimeSeriesTimeFieldName
frame := data.NewFrame("", timeField)
frame.Meta = &data.FrameMeta{
Type: data.FrameTypeTimeSeriesWide,
Custom: resultTypeToCustomMeta(resultType),
}
rsp := &backend.DataResponse{
Frames: []*data.Frame{},
}
for iter.ReadArray() {
valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, frame.Rows())
valueField.Name = data.TimeSeriesValueFieldName
valueField.Labels = data.Labels{}
frame.Fields = append(frame.Fields, valueField)
var histogram *histogramInfo
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
switch l1Field {
case "metric":
iter.ReadVal(&valueField.Labels)
case "value":
timeMap, rowIdx = addValuePairToFrame(frame, timeMap, rowIdx, iter)
// nolint:goconst
case "values":
for iter.ReadArray() {
timeMap, rowIdx = addValuePairToFrame(frame, timeMap, rowIdx, iter)
}
case "histogram":
if histogram == nil {
histogram = newHistogramInfo()
}
err := readHistogram(iter, histogram)
if err != nil {
rsp.Error = err
}
case "histograms":
if histogram == nil {
histogram = newHistogramInfo()
}
for iter.ReadArray() {
err := readHistogram(iter, histogram)
if err != nil {
rsp.Error = err
}
}
default:
iter.Skip()
logf("readMatrixOrVector: %s\n", l1Field)
}
}
if histogram != nil {
histogram.yMin.Labels = valueField.Labels
frame := data.NewFrame(valueField.Name, histogram.time, histogram.yMin, histogram.yMax, histogram.count, histogram.yLayout)
frame.Meta = &data.FrameMeta{
Type: "heatmap-cells",
}
if frame.Name == data.TimeSeriesValueFieldName {
frame.Name = "" // only set the name if useful
}
rsp.Frames = append(rsp.Frames, frame)
}
}
if len(rsp.Frames) == 0 {
sorter := experimental.NewFrameSorter(frame, frame.Fields[0])
sort.Sort(sorter)
rsp.Frames = append(rsp.Frames, frame)
}
return rsp
}
func addValuePairToFrame(frame *data.Frame, timeMap map[int64]int, rowIdx int, iter *jsoniter.Iterator) (map[int64]int, int) {
timeField := frame.Fields[0]
valueField := frame.Fields[len(frame.Fields)-1]
t, v, err := readTimeValuePair(iter)
if err != nil {
return timeMap, rowIdx
}
ns := t.UnixNano()
i, ok := timeMap[ns]
if !ok {
timeMap[ns] = rowIdx
i = rowIdx
expandFrame(frame, i)
rowIdx++
}
timeField.Set(i, t)
valueField.Set(i, &v)
return timeMap, rowIdx
}
func readMatrixOrVectorMulti(iter *jsoniter.Iterator, resultType string) *backend.DataResponse {
rsp := &backend.DataResponse{}
for iter.ReadArray() {
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = data.TimeSeriesTimeFieldName
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0)
valueField.Name = data.TimeSeriesValueFieldName
valueField.Labels = data.Labels{}
var histogram *histogramInfo
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
switch l1Field {
case "metric":
iter.ReadVal(&valueField.Labels)
case "value":
t, v, err := readTimeValuePair(iter)
if err == nil {
timeField.Append(t)
valueField.Append(v)
}
// nolint:goconst
case "values":
for iter.ReadArray() {
t, v, err := readTimeValuePair(iter)
if err == nil {
timeField.Append(t)
valueField.Append(v)
}
}
case "histogram":
if histogram == nil {
histogram = newHistogramInfo()
}
err := readHistogram(iter, histogram)
if err != nil {
rsp.Error = err
}
case "histograms":
if histogram == nil {
histogram = newHistogramInfo()
}
for iter.ReadArray() {
err := readHistogram(iter, histogram)
if err != nil {
rsp.Error = err
}
}
default:
iter.Skip()
logf("readMatrixOrVector: %s\n", l1Field)
}
}
if histogram != nil {
histogram.yMin.Labels = valueField.Labels
frame := data.NewFrame(valueField.Name, histogram.time, histogram.yMin, histogram.yMax, histogram.count, histogram.yLayout)
frame.Meta = &data.FrameMeta{
Type: "heatmap-cells",
}
if frame.Name == data.TimeSeriesValueFieldName {
frame.Name = "" // only set the name if useful
}
rsp.Frames = append(rsp.Frames, frame)
} else {
frame := data.NewFrame("", timeField, valueField)
frame.Meta = &data.FrameMeta{
Type: data.FrameTypeTimeSeriesMany,
Custom: resultTypeToCustomMeta(resultType),
}
rsp.Frames = append(rsp.Frames, frame)
}
}
return rsp
}
func readTimeValuePair(iter *jsoniter.Iterator) (time.Time, float64, error) {
iter.ReadArray()
t := iter.ReadFloat64()
iter.ReadArray()
v := iter.ReadString()
iter.ReadArray()
tt := timeFromFloat(t)
fv, err := strconv.ParseFloat(v, 64)
return tt, fv, err
}
func expandFrame(frame *data.Frame, idx int) {
for _, f := range frame.Fields {
if idx+1 > f.Len() {
f.Extend(idx + 1 - f.Len())
}
}
}
type histogramInfo struct {
//XMax (time) YMin Ymax Count YLayout
time *data.Field
yMin *data.Field // will have labels?
yMax *data.Field
count *data.Field
yLayout *data.Field
}
func newHistogramInfo() *histogramInfo {
hist := &histogramInfo{
time: data.NewFieldFromFieldType(data.FieldTypeTime, 0),
yMin: data.NewFieldFromFieldType(data.FieldTypeFloat64, 0),
yMax: data.NewFieldFromFieldType(data.FieldTypeFloat64, 0),
count: data.NewFieldFromFieldType(data.FieldTypeFloat64, 0),
yLayout: data.NewFieldFromFieldType(data.FieldTypeInt8, 0),
}
hist.time.Name = "xMax"
hist.yMin.Name = "yMin"
hist.yMax.Name = "yMax"
hist.count.Name = "count"
hist.yLayout.Name = "yLayout"
return hist
}
// This will read a single sparse histogram
// [ time, { count, sum, buckets: [...] }]
func readHistogram(iter *jsoniter.Iterator, hist *histogramInfo) error {
// first element
iter.ReadArray()
t := timeFromFloat(iter.ReadFloat64())
var err error
// next object element
iter.ReadArray()
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
switch l1Field {
case "count":
iter.Skip()
case "sum":
iter.Skip()
case "buckets":
for iter.ReadArray() {
hist.time.Append(t)
iter.ReadArray()
hist.yLayout.Append(iter.ReadInt8())
iter.ReadArray()
err = appendValueFromString(iter, hist.yMin)
if err != nil {
return err
}
iter.ReadArray()
err = appendValueFromString(iter, hist.yMax)
if err != nil {
return err
}
iter.ReadArray()
err = appendValueFromString(iter, hist.count)
if err != nil {
return err
}
if iter.ReadArray() {
return fmt.Errorf("expected close array")
}
}
default:
iter.Skip()
logf("[SKIP]readHistogram: %s\n", l1Field)
}
}
if iter.ReadArray() {
return fmt.Errorf("expected to be done")
}
return nil
}
func appendValueFromString(iter *jsoniter.Iterator, field *data.Field) error {
v, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return err
}
field.Append(v)
return nil
}
func readStream(iter *jsoniter.Iterator) *backend.DataResponse {
rsp := &backend.DataResponse{}
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
labelsField.Name = "__labels" // avoid automatically spreading this by labels
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = "Time"
lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
lineField.Name = "Line"
// Nanoseconds time field
tsField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
tsField.Name = "TS"
labels := data.Labels{}
labelJson, err := labelsToRawJson(labels)
if err != nil {
return &backend.DataResponse{Error: err}
}
for iter.ReadArray() {
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
switch l1Field {
case "stream":
// we need to clear `labels`, because `iter.ReadVal`
// only appends to it
labels := data.Labels{}
iter.ReadVal(&labels)
labelJson, err = labelsToRawJson(labels)
if err != nil {
return &backend.DataResponse{Error: err}
}
case "values":
for iter.ReadArray() {
iter.ReadArray()
ts := iter.ReadString()
iter.ReadArray()
line := iter.ReadString()
iter.ReadArray()
t := timeFromLokiString(ts)
labelsField.Append(labelJson)
timeField.Append(t)
lineField.Append(line)
tsField.Append(ts)
}
}
}
}
frame := data.NewFrame("", labelsField, timeField, lineField, tsField)
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)
return rsp
}
func resultTypeToCustomMeta(resultType string) map[string]string {
return map[string]string{"resultType": resultType}
}
func timeFromFloat(fv float64) time.Time {
return time.UnixMilli(int64(fv * 1000.0)).UTC()
}
func timeFromLokiString(str string) time.Time {
// normal time values look like: 1645030246277587968
// and are less than: math.MaxInt65=9223372036854775807
// This will do a fast path for any date before 2033
s := len(str)
if s < 19 || (s == 19 && str[0] == '1') {
ns, err := strconv.ParseInt(str, 10, 64)
if err == nil {
return time.Unix(0, ns).UTC()
}
}
ss, _ := strconv.ParseInt(str[0:10], 10, 64)
ns, _ := strconv.ParseInt(str[10:], 10, 64)
return time.Unix(ss, ns).UTC()
}
func labelsToRawJson(labels data.Labels) (json.RawMessage, error) {
// data.Labels when converted to JSON keep the fields sorted
bytes, err := jsoniter.Marshal(labels)
if err != nil {
return nil, err
}
return json.RawMessage(bytes), nil
}