The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/tsdb/loki/frame.go

381 lines
12 KiB

package loki
import (
"encoding/json"
"fmt"
"hash/fnv"
"sort"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// we adjust the dataframes to be the way frontend & alerting
// wants them.
func adjustFrame(frame *data.Frame, query *lokiQuery, setMetricFrameName bool, logsDataplane bool) error {
fields := frame.Fields
if len(fields) < 2 {
return fmt.Errorf("missing fields in frame")
}
// metric-fields have "timefield, valuefield"
// logs-fields have "labelsfield, timefield, ..."
secondField := fields[1]
if secondField.Type() == data.FieldTypeFloat64 {
return adjustMetricFrame(frame, query, setMetricFrameName)
} else {
return adjustLogsFrame(frame, query, logsDataplane)
}
}
func adjustMetricFrame(frame *data.Frame, query *lokiQuery, setFrameName bool) error {
fields := frame.Fields
// we check if the fields are of correct type
if len(fields) != 2 {
return fmt.Errorf("invalid field length in metric frame. expected 2, got %d", len(fields))
}
timeField := fields[0]
valueField := fields[1]
if (timeField.Type() != data.FieldTypeTime) || (valueField.Type() != data.FieldTypeFloat64) {
return fmt.Errorf("invalid field types in metric frame. expected time and float64, got %s and %s", timeField.Type(), valueField.Type())
}
labels := getFrameLabels(frame)
isMetricRange := query.QueryType == QueryTypeRange
name := formatName(labels, query)
if setFrameName {
frame.Name = name
}
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
}
frame.Meta.Stats = parseStats(frame.Meta.Custom)
frame.Meta.Custom = nil
if isMetricRange {
frame.Meta.ExecutedQueryString = "Expr: " + query.Expr + "\n" + "Step: " + query.Step.String()
} else {
frame.Meta.ExecutedQueryString = "Expr: " + query.Expr
}
if isMetricRange {
if timeField.Config == nil {
timeField.Config = &data.FieldConfig{}
}
timeField.Config.Interval = float64(query.Step.Milliseconds())
}
if valueField.Config == nil {
valueField.Config = &data.FieldConfig{}
}
valueField.Config.DisplayNameFromDS = name
return nil
}
func adjustLogsFrame(frame *data.Frame, query *lokiQuery, dataplane bool) error {
if dataplane {
return adjustDataplaneLogsFrame(frame, query)
} else {
return adjustLegacyLogsFrame(frame, query)
}
}
func adjustLegacyLogsFrame(frame *data.Frame, query *lokiQuery) error {
// we check if the fields are of correct type and length
fields := frame.Fields
if len(fields) != 4 && len(fields) != 5 {
return fmt.Errorf("invalid field length in logs frame. expected 4 or 5, got %d", len(fields))
}
labelsField := fields[0]
timeField := fields[1]
lineField := fields[2]
stringTimeField := fields[3]
if len(fields) == 5 {
labelTypesField := fields[4]
if labelTypesField.Type() != data.FieldTypeJSON {
return fmt.Errorf("invalid field types in logs frame. expected json, got %s", labelTypesField.Type())
}
labelTypesField.Name = "labelTypes"
labelTypesField.Config = &data.FieldConfig{
Custom: map[string]interface{}{
"hidden": true,
},
}
}
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) {
return fmt.Errorf("invalid field types in logs frame. expected time, string, json and string, got %s, %s, %s and %s", timeField.Type(), lineField.Type(), labelsField.Type(), stringTimeField.Type())
}
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) || (timeField.Len() != stringTimeField.Len()) {
return fmt.Errorf("indifferent field lengths in logs frame. expected all to be equal, got %d, %d, %d and %d", timeField.Len(), lineField.Len(), labelsField.Len(), stringTimeField.Len())
}
// this returns an error when the length of fields do not match
_, err := frame.RowLen()
if err != nil {
return err
}
labelsField.Name = "labels"
stringTimeField.Name = "tsNs"
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
}
frame.Meta.Stats = parseStats(frame.Meta.Custom)
// TODO: when we get a real frame-type in grafana-plugin-sdk-go,
// move this to frame.Meta.FrameType
frame.Meta.Custom = map[string]string{
"frameType": "LabeledTimeValues",
}
frame.Meta.ExecutedQueryString = "Expr: " + query.Expr
// we need to send to the browser the nanosecond-precision timestamp too.
// usually timestamps become javascript-date-objects in the browser automatically, which only
// have millisecond-precision.
// so we send a separate timestamp-as-string field too. it is provided by the
// loki-json-parser-code
idField, err := makeIdField(stringTimeField, lineField, labelsField, query.RefID)
if err != nil {
return err
}
frame.Fields = append(frame.Fields, idField)
return nil
}
func adjustDataplaneLogsFrame(frame *data.Frame, query *lokiQuery) error {
// we check if the fields are of correct type and length
fields := frame.Fields
if len(fields) != 4 && len(fields) != 5 {
return fmt.Errorf("invalid field length in logs frame. expected 4 or 5, got %d", len(fields))
}
labelsField := fields[0]
timeField := fields[1]
lineField := fields[2]
stringTimeField := fields[3]
var labelTypesField *data.Field
if len(fields) == 5 {
labelTypesField = fields[4]
if labelTypesField.Type() != data.FieldTypeJSON {
return fmt.Errorf("invalid field types in logs frame. expected json, got %s", labelTypesField.Type())
}
labelTypesField.Name = "labelTypes"
labelTypesField.Config = &data.FieldConfig{
Custom: map[string]interface{}{
"hidden": true,
},
}
}
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) {
return fmt.Errorf("invalid field types in logs frame. expected time, string, json and string, got %s, %s, %s and %s", timeField.Type(), lineField.Type(), labelsField.Type(), stringTimeField.Type())
}
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) || (timeField.Len() != stringTimeField.Len()) {
return fmt.Errorf("indifferent field lengths in logs frame. expected all to be equal, got %d, %d, %d and %d", timeField.Len(), lineField.Len(), labelsField.Len(), stringTimeField.Len())
}
// this returns an error when the length of fields do not match
_, err := frame.RowLen()
if err != nil {
return err
}
timeField.Name = "timestamp"
labelsField.Name = "labels"
lineField.Name = "body"
if frame.Meta == nil {
frame.Meta = &data.FrameMeta{}
}
frame.Meta.Stats = parseStats(frame.Meta.Custom)
frame.Meta.Custom = nil
frame.Meta.Type = data.FrameTypeLogLines
frame.Meta.ExecutedQueryString = "Expr: " + query.Expr
idField, err := makeIdField(stringTimeField, lineField, labelsField, query.RefID)
if err != nil {
return err
}
if labelTypesField != nil {
frame.Fields = data.Fields{labelsField, timeField, lineField, idField, labelTypesField}
} else {
frame.Fields = data.Fields{labelsField, timeField, lineField, idField}
}
return nil
}
func calculateCheckSum(time string, line string, labels []byte) (string, error) {
input := []byte(line + "_")
input = append(input, labels...)
hash := fnv.New32()
_, err := hash.Write(input)
if err != nil {
return "", err
}
return fmt.Sprintf("%s_%x", time, hash.Sum32()), nil
}
func makeIdField(stringTimeField *data.Field, lineField *data.Field, labelsField *data.Field, refId string) (*data.Field, error) {
length := stringTimeField.Len()
ids := make([]string, length)
checksums := make(map[string]int)
for i := 0; i < length; i++ {
time := stringTimeField.At(i).(string)
line := lineField.At(i).(string)
labels := labelsField.At(i).(json.RawMessage)
sum, err := calculateCheckSum(time, line, labels)
if err != nil {
return nil, err
}
sumCount := checksums[sum]
idSuffix := ""
if sumCount > 0 {
// we had this checksum already, we need to do something to make it unique
idSuffix = fmt.Sprintf("_%d", sumCount)
}
checksums[sum] = sumCount + 1
ids[i] = sum + idSuffix
}
return data.NewField("id", nil, ids), nil
}
func formatNamePrometheusStyle(labels map[string]string) string {
parts := make([]string, 0, len(labels))
for k, v := range labels {
parts = append(parts, fmt.Sprintf("%s=%q", k, v))
}
sort.Strings(parts)
return fmt.Sprintf("{%s}", strings.Join(parts, ", "))
}
// If legend (using of name or pattern instead of time series name) is used, use that name/pattern for formatting
func formatName(labels map[string]string, query *lokiQuery) string {
if query.LegendFormat == "" {
return formatNamePrometheusStyle(labels)
}
result := legendFormat.ReplaceAllFunc([]byte(query.LegendFormat), func(in []byte) []byte {
labelName := strings.Replace(string(in), "{{", "", 1)
labelName = strings.Replace(labelName, "}}", "", 1)
labelName = strings.TrimSpace(labelName)
if val, exists := labels[labelName]; exists {
return []byte(val)
}
return []byte{}
})
return string(result)
}
func getFrameLabels(frame *data.Frame) map[string]string {
labels := make(map[string]string)
for _, field := range frame.Fields {
for k, v := range field.Labels {
labels[k] = v
}
}
return labels
}
func parseStats(frameMetaCustom interface{}) []data.QueryStat {
customMap, ok := frameMetaCustom.(map[string]interface{})
if !ok {
return nil
}
rawStats, ok := customMap["stats"].(map[string]interface{})
if !ok {
return nil
}
var stats []data.QueryStat
summary, ok := rawStats["summary"].(map[string]interface{})
if ok {
stats = append(stats,
makeStat("Summary: bytes processed per second", summary["bytesProcessedPerSecond"], "Bps"),
makeStat("Summary: lines processed per second", summary["linesProcessedPerSecond"], ""),
makeStat("Summary: total bytes processed", summary["totalBytesProcessed"], "decbytes"),
makeStat("Summary: total lines processed", summary["totalLinesProcessed"], ""),
makeStat("Summary: exec time", summary["execTime"], "s"))
}
store, ok := rawStats["store"].(map[string]interface{})
if ok {
stats = append(stats,
makeStat("Store: total chunks ref", store["totalChunksRef"], ""),
makeStat("Store: total chunks downloaded", store["totalChunksDownloaded"], ""),
makeStat("Store: chunks download time", store["chunksDownloadTime"], "s"),
makeStat("Store: head chunk bytes", store["headChunkBytes"], "decbytes"),
makeStat("Store: head chunk lines", store["headChunkLines"], ""),
makeStat("Store: decompressed bytes", store["decompressedBytes"], "decbytes"),
makeStat("Store: decompressed lines", store["decompressedLines"], ""),
makeStat("Store: compressed bytes", store["compressedBytes"], "decbytes"),
makeStat("Store: total duplicates", store["totalDuplicates"], ""))
}
ingester, ok := rawStats["ingester"].(map[string]interface{})
if ok {
stats = append(stats,
makeStat("Ingester: total reached", ingester["totalReached"], ""),
makeStat("Ingester: total chunks matched", ingester["totalChunksMatched"], ""),
makeStat("Ingester: total batches", ingester["totalBatches"], ""),
makeStat("Ingester: total lines sent", ingester["totalLinesSent"], ""),
makeStat("Ingester: head chunk bytes", ingester["headChunkBytes"], "decbytes"),
makeStat("Ingester: head chunk lines", ingester["headChunkLines"], ""),
makeStat("Ingester: decompressed bytes", ingester["decompressedBytes"], "decbytes"),
makeStat("Ingester: decompressed lines", ingester["decompressedLines"], ""),
makeStat("Ingester: compressed bytes", ingester["compressedBytes"], "decbytes"),
makeStat("Ingester: total duplicates", ingester["totalDuplicates"], ""))
}
return stats
}
func makeStat(name string, interfaceValue interface{}, unit string) data.QueryStat {
var value float64
switch v := interfaceValue.(type) {
case float64:
value = v
case int:
value = float64(v)
}
return data.QueryStat{
FieldConfig: data.FieldConfig{
DisplayName: name,
Unit: unit,
},
Value: value,
}
}