The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/tsdb/tempo/trace_transform.go

360 lines
9.3 KiB

package tempo
import (
"encoding/json"
"fmt"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/translator/conventions"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)
type KeyValue struct {
Value interface{} `json:"value"`
Key string `json:"key"`
}
type TraceLog struct {
// Millisecond epoch time
Timestamp float64 `json:"timestamp"`
Fields []*KeyValue `json:"fields"`
}
type TraceReference struct {
SpanID string `json:"spanID"`
TraceID string `json:"traceID"`
Tags []*KeyValue `json:"tags"`
}
func TraceToFrame(td pdata.Traces) (*data.Frame, error) {
// In open telemetry format the spans are grouped first by resource/service they originated in and inside that
// resource they are grouped by the instrumentation library which created them.
resourceSpans := td.ResourceSpans()
if resourceSpans.Len() == 0 {
return nil, nil
}
frame := &data.Frame{
Name: "Trace",
Fields: []*data.Field{
data.NewField("traceID", nil, []string{}),
data.NewField("spanID", nil, []string{}),
data.NewField("parentSpanID", nil, []string{}),
data.NewField("operationName", nil, []string{}),
data.NewField("serviceName", nil, []string{}),
data.NewField("serviceTags", nil, []json.RawMessage{}),
data.NewField("startTime", nil, []float64{}),
data.NewField("duration", nil, []float64{}),
data.NewField("logs", nil, []json.RawMessage{}),
data.NewField("references", nil, []json.RawMessage{}),
data.NewField("tags", nil, []json.RawMessage{}),
},
Meta: &data.FrameMeta{
// TODO: use constant once available in the SDK
PreferredVisualization: "trace",
},
}
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
rows, err := resourceSpansToRows(rs)
if err != nil {
return nil, err
}
for _, row := range rows {
frame.AppendRow(row...)
}
}
return frame, nil
}
// resourceSpansToRows processes all the spans for a particular resource/service
func resourceSpansToRows(rs pdata.ResourceSpans) ([][]interface{}, error) {
resource := rs.Resource()
ilss := rs.InstrumentationLibrarySpans()
if resource.Attributes().Len() == 0 || ilss.Len() == 0 {
return [][]interface{}{}, nil
}
// Approximate the number of the spans as the number of the spans in the first
// instrumentation library info.
rows := make([][]interface{}, 0, ilss.At(0).Spans().Len())
for i := 0; i < ilss.Len(); i++ {
ils := ilss.At(i)
// These are finally the actual spans
spans := ils.Spans()
for j := 0; j < spans.Len(); j++ {
span := spans.At(j)
row, err := spanToSpanRow(span, ils.InstrumentationLibrary(), resource)
if err != nil {
return nil, err
}
if row != nil {
rows = append(rows, row)
}
}
}
return rows, nil
}
func spanToSpanRow(span pdata.Span, libraryTags pdata.InstrumentationLibrary, resource pdata.Resource) ([]interface{}, error) {
// If the id representation changed from hexstring to something else we need to change the transformBase64IDToHexString in the frontend code
traceID := span.TraceID().HexString()
traceID = strings.TrimPrefix(traceID, strings.Repeat("0", 16))
spanID := span.SpanID().HexString()
parentSpanID := span.ParentSpanID().HexString()
startTime := float64(span.StartTimestamp()) / 1_000_000
serviceName, serviceTags := resourceToProcess(resource)
serviceTagsJson, err := json.Marshal(serviceTags)
if err != nil {
return nil, fmt.Errorf("failed to marshal service tags: %w", err)
}
spanTags, err := json.Marshal(getSpanTags(span, libraryTags))
if err != nil {
return nil, fmt.Errorf("failed to marshal span tags: %w", err)
}
logs, err := json.Marshal(spanEventsToLogs(span.Events()))
if err != nil {
return nil, fmt.Errorf("failed to marshal span logs: %w", err)
}
references, err := json.Marshal(spanLinksToReferences(span.Links()))
if err != nil {
return nil, fmt.Errorf("failed to marshal span links: %w", err)
}
// Order matters (look at dataframe order)
return []interface{}{
traceID,
spanID,
parentSpanID,
span.Name(),
serviceName,
json.RawMessage(serviceTagsJson),
startTime,
float64(span.EndTimestamp()-span.StartTimestamp()) / 1_000_000,
json.RawMessage(logs),
json.RawMessage(references),
json.RawMessage(spanTags),
}, nil
}
func resourceToProcess(resource pdata.Resource) (string, []*KeyValue) {
attrs := resource.Attributes()
serviceName := tracetranslator.ResourceNoServiceName
if attrs.Len() == 0 {
return serviceName, nil
}
tags := make([]*KeyValue, 0, attrs.Len()-1)
attrs.Range(func(key string, attr pdata.AttributeValue) bool {
if key == conventions.AttributeServiceName {
serviceName = attr.StringVal()
}
tags = append(tags, &KeyValue{Key: key, Value: getAttributeVal(attr)})
return true
})
return serviceName, tags
}
func getAttributeVal(attr pdata.AttributeValue) interface{} {
switch attr.Type() {
case pdata.AttributeValueTypeString:
return attr.StringVal()
case pdata.AttributeValueTypeInt:
return attr.IntVal()
case pdata.AttributeValueTypeBool:
return attr.BoolVal()
case pdata.AttributeValueTypeDouble:
return attr.DoubleVal()
case pdata.AttributeValueTypeMap, pdata.AttributeValueTypeArray:
return tracetranslator.AttributeValueToString(attr)
default:
return nil
}
}
func getSpanTags(span pdata.Span, instrumentationLibrary pdata.InstrumentationLibrary) []*KeyValue {
var tags []*KeyValue
libraryTags := getTagsFromInstrumentationLibrary(instrumentationLibrary)
if libraryTags != nil {
tags = append(tags, libraryTags...)
}
span.Attributes().Range(func(key string, attr pdata.AttributeValue) bool {
tags = append(tags, &KeyValue{Key: key, Value: getAttributeVal(attr)})
return true
})
status := span.Status()
possibleNilTags := []*KeyValue{
getTagFromSpanKind(span.Kind()),
getTagFromStatusCode(status.Code()),
getErrorTagFromStatusCode(status.Code()),
getTagFromStatusMsg(status.Message()),
getTagFromTraceState(span.TraceState()),
}
for _, tag := range possibleNilTags {
if tag != nil {
tags = append(tags, tag)
}
}
return tags
}
func getTagsFromInstrumentationLibrary(il pdata.InstrumentationLibrary) []*KeyValue {
var keyValues []*KeyValue
if ilName := il.Name(); ilName != "" {
kv := &KeyValue{
Key: conventions.InstrumentationLibraryName,
Value: ilName,
}
keyValues = append(keyValues, kv)
}
if ilVersion := il.Version(); ilVersion != "" {
kv := &KeyValue{
Key: conventions.InstrumentationLibraryVersion,
Value: ilVersion,
}
keyValues = append(keyValues, kv)
}
return keyValues
}
func getTagFromSpanKind(spanKind pdata.SpanKind) *KeyValue {
var tagStr string
switch spanKind {
case pdata.SpanKindClient:
tagStr = string(tracetranslator.OpenTracingSpanKindClient)
case pdata.SpanKindServer:
tagStr = string(tracetranslator.OpenTracingSpanKindServer)
case pdata.SpanKindProducer:
tagStr = string(tracetranslator.OpenTracingSpanKindProducer)
case pdata.SpanKindConsumer:
tagStr = string(tracetranslator.OpenTracingSpanKindConsumer)
case pdata.SpanKindInternal:
tagStr = string(tracetranslator.OpenTracingSpanKindInternal)
default:
return nil
}
return &KeyValue{
Key: tracetranslator.TagSpanKind,
Value: tagStr,
}
}
func getTagFromStatusCode(statusCode pdata.StatusCode) *KeyValue {
return &KeyValue{
Key: tracetranslator.TagStatusCode,
Value: int64(statusCode),
}
}
func getErrorTagFromStatusCode(statusCode pdata.StatusCode) *KeyValue {
if statusCode == pdata.StatusCodeError {
return &KeyValue{
Key: tracetranslator.TagError,
Value: true,
}
}
return nil
}
func getTagFromStatusMsg(statusMsg string) *KeyValue {
if statusMsg == "" {
return nil
}
return &KeyValue{
Key: tracetranslator.TagStatusMsg,
Value: statusMsg,
}
}
func getTagFromTraceState(traceState pdata.TraceState) *KeyValue {
if traceState != pdata.TraceStateEmpty {
return &KeyValue{
Key: tracetranslator.TagW3CTraceState,
Value: string(traceState),
}
}
return nil
}
func spanEventsToLogs(events pdata.SpanEventSlice) []*TraceLog {
if events.Len() == 0 {
return nil
}
logs := make([]*TraceLog, 0, events.Len())
for i := 0; i < events.Len(); i++ {
event := events.At(i)
fields := make([]*KeyValue, 0, event.Attributes().Len()+1)
if event.Name() != "" {
fields = append(fields, &KeyValue{
Key: tracetranslator.TagMessage,
Value: event.Name(),
})
}
event.Attributes().Range(func(key string, attr pdata.AttributeValue) bool {
fields = append(fields, &KeyValue{Key: key, Value: getAttributeVal(attr)})
return true
})
logs = append(logs, &TraceLog{
Timestamp: float64(event.Timestamp()) / 1_000_000,
Fields: fields,
})
}
return logs
}
func spanLinksToReferences(links pdata.SpanLinkSlice) []*TraceReference {
if links.Len() == 0 {
return nil
}
references := make([]*TraceReference, 0, links.Len())
for i := 0; i < links.Len(); i++ {
link := links.At(i)
traceId := link.TraceID().HexString()
traceId = strings.TrimLeft(traceId, "0")
spanId := link.SpanID().HexString()
tags := make([]*KeyValue, 0, link.Attributes().Len())
link.Attributes().Range(func(key string, attr pdata.AttributeValue) bool {
tags = append(tags, &KeyValue{Key: key, Value: getAttributeVal(attr)})
return true
})
references = append(references, &TraceReference{
TraceID: traceId,
SpanID: spanId,
Tags: tags,
})
}
return references
}