mirror of https://github.com/grafana/grafana
Pyroscope: Annotation support for series queries (#104130)
* Pyroscope: Add annotations frame to series response * Adapt to API change, add tests * Run make lint-go * Fix conflicts after rebase * Add annotation via a separate data frame * Process annotations fully at the datasource * Add mod owner for go-humanize * Pyroscope: Annotations in Query Response can be optional --------- Co-authored-by: Piotr Jamróz <pm.jamroz@gmail.com>
pull/106134/head
parent
ea0e49a6e6
commit
0b8252fd7c
@ -0,0 +1,133 @@ |
||||
package pyroscope |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/dustin/go-humanize" |
||||
"github.com/grafana/grafana-plugin-sdk-go/data" |
||||
) |
||||
|
||||
// profileAnnotationKey represents the key for different types of annotations
// attached to Pyroscope profiles.
type profileAnnotationKey string

const (
	// profileAnnotationKeyThrottled is the key for throttling annotations;
	// it is the only annotation key currently handled by convertAnnotation.
	profileAnnotationKeyThrottled profileAnnotationKey = "pyroscope.ingest.throttled"
)
||||
|
||||
// ProfileAnnotation represents the parsed annotation data. It is the JSON
// envelope stored in a TimedAnnotation's value; only the throttling body is
// currently modeled.
type ProfileAnnotation struct {
	Body ProfileThrottledAnnotation `json:"body"`
}
||||
|
||||
// ProfileThrottledAnnotation contains throttling information reported by the
// Pyroscope ingest path.
type ProfileThrottledAnnotation struct {
	// PeriodType names the limit window (e.g. "day") and is interpolated
	// into the annotation text.
	PeriodType string `json:"periodType"`
	// PeriodLimitMb is the ingestion limit in mebibytes; it is multiplied by
	// 1024*1024 before being humanized into a byte string.
	PeriodLimitMb float64 `json:"periodLimitMb"`
	// LimitResetTime is a Unix timestamp in seconds (converted to
	// milliseconds for the annotation's end time). It also serves as the
	// de-duplication key across consecutive annotations.
	LimitResetTime int64 `json:"limitResetTime"`
	// SamplingPeriodSec and SamplingRequests are not used by the visible
	// conversion code — presumably sampling metadata; confirm with the
	// Pyroscope ingest API before relying on them.
	SamplingPeriodSec float64 `json:"samplingPeriodSec"`
	SamplingRequests  int64   `json:"samplingRequests"`
	// UsageGroup is not used by the visible conversion code.
	UsageGroup string `json:"usageGroup"`
}
||||
|
||||
// processedProfileAnnotation represents a processed annotation ready for display.
type processedProfileAnnotation struct {
	// text is the human-readable annotation message.
	text string
	// time is the annotation start in Unix milliseconds.
	time int64
	// timeEnd is the annotation end in Unix milliseconds.
	timeEnd int64
	// isRegion is true when the annotation should render as a time region
	// (the limit reset time already lies in the past).
	isRegion bool
	// duplicateTracker carries the LimitResetTime (seconds) used to skip
	// repeated annotations for the same throttling window.
	duplicateTracker int64
}
||||
|
||||
// grafanaAnnotationData holds slices of processed annotation data, one entry
// per emitted annotation, in column-oriented form matching the data-frame
// fields built by createAnnotationFrame.
type grafanaAnnotationData struct {
	times     []time.Time
	timeEnds  []time.Time
	texts     []string
	isRegions []bool
}
||||
|
||||
// convertAnnotation converts a Pyroscope profile annotation into a Grafana annotation
|
||||
func convertAnnotation(timedAnnotation *TimedAnnotation, duplicateTracker int64) (*processedProfileAnnotation, error) { |
||||
if timedAnnotation.getKey() != string(profileAnnotationKeyThrottled) { |
||||
// Currently we only support throttling annotations
|
||||
return nil, nil |
||||
} |
||||
|
||||
var profileAnnotation ProfileAnnotation |
||||
err := json.Unmarshal([]byte(timedAnnotation.getValue()), &profileAnnotation) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("error parsing annotation data: %w", err) |
||||
} |
||||
|
||||
throttlingInfo := profileAnnotation.Body |
||||
|
||||
if duplicateTracker == throttlingInfo.LimitResetTime { |
||||
return nil, nil |
||||
} |
||||
|
||||
limit := humanize.IBytes(uint64(throttlingInfo.PeriodLimitMb * 1024 * 1024)) |
||||
return &processedProfileAnnotation{ |
||||
text: fmt.Sprintf("Ingestion limit (%s/%s) reached", limit, throttlingInfo.PeriodType), |
||||
time: timedAnnotation.Timestamp, |
||||
timeEnd: throttlingInfo.LimitResetTime * 1000, |
||||
isRegion: throttlingInfo.LimitResetTime < time.Now().Unix(), |
||||
duplicateTracker: throttlingInfo.LimitResetTime, |
||||
}, nil |
||||
} |
||||
|
||||
// processAnnotations processes a slice of TimedAnnotation and returns grafanaAnnotationData
|
||||
func processAnnotations(timedAnnotations []*TimedAnnotation) (*grafanaAnnotationData, error) { |
||||
result := &grafanaAnnotationData{ |
||||
times: []time.Time{}, |
||||
timeEnds: []time.Time{}, |
||||
texts: []string{}, |
||||
isRegions: []bool{}, |
||||
} |
||||
|
||||
var duplicateTracker int64 |
||||
|
||||
for _, timedAnnotation := range timedAnnotations { |
||||
if timedAnnotation == nil || timedAnnotation.Annotation == nil { |
||||
continue |
||||
} |
||||
processed, err := convertAnnotation(timedAnnotation, duplicateTracker) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if processed != nil { |
||||
result.times = append(result.times, time.UnixMilli(processed.time)) |
||||
result.timeEnds = append(result.timeEnds, time.UnixMilli(processed.timeEnd)) |
||||
result.isRegions = append(result.isRegions, processed.isRegion) |
||||
result.texts = append(result.texts, processed.text) |
||||
duplicateTracker = processed.duplicateTracker |
||||
} |
||||
} |
||||
|
||||
return result, nil |
||||
} |
||||
|
||||
// createAnnotationFrame creates a data frame for annotations
|
||||
func createAnnotationFrame(annotations []*TimedAnnotation) (*data.Frame, error) { |
||||
annotationData, err := processAnnotations(annotations) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
timeField := data.NewField("time", nil, annotationData.times) |
||||
timeEndField := data.NewField("timeEnd", nil, annotationData.timeEnds) |
||||
textField := data.NewField("text", nil, annotationData.texts) |
||||
isRegionField := data.NewField("isRegion", nil, annotationData.isRegions) |
||||
colorField := data.NewField("color", nil, make([]string, len(annotationData.times))) |
||||
|
||||
frame := data.NewFrame("annotations") |
||||
frame.Fields = data.Fields{timeField, timeEndField, textField, isRegionField, colorField} |
||||
frame.SetMeta(&data.FrameMeta{ |
||||
DataTopic: data.DataTopicAnnotations, |
||||
}) |
||||
|
||||
return frame, nil |
||||
} |
@ -0,0 +1,188 @@ |
||||
package pyroscope |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data" |
||||
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestConvertAnnotation(t *testing.T) { |
||||
rawAnnotation := `{"body":{"periodType":"day","periodLimitMb":1024,"limitResetTime":1609459200}}` |
||||
|
||||
t.Run("processes valid annotation", func(t *testing.T) { |
||||
timedAnnotation := &TimedAnnotation{ |
||||
Timestamp: 1609455600000, |
||||
Annotation: &typesv1.ProfileAnnotation{ |
||||
Key: string(profileAnnotationKeyThrottled), |
||||
Value: rawAnnotation, |
||||
}, |
||||
} |
||||
|
||||
processed, err := convertAnnotation(timedAnnotation, 0) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, processed) |
||||
require.Contains(t, processed.text, "Ingestion limit (1.0 GiB/day) reached") |
||||
require.Contains(t, processed.text, "day") |
||||
require.Equal(t, int64(1609455600000), processed.time) |
||||
require.Equal(t, int64(1609459200000), processed.timeEnd) // LimitResetTime * 1000
|
||||
require.Equal(t, int64(1609459200), processed.duplicateTracker) |
||||
}) |
||||
|
||||
t.Run("ignores non-throttling annotations", func(t *testing.T) { |
||||
timedAnnotation := &TimedAnnotation{ |
||||
Timestamp: 1000, |
||||
Annotation: &typesv1.ProfileAnnotation{ |
||||
Key: "some.other.key", |
||||
Value: `{"test":"value"}`, |
||||
}, |
||||
} |
||||
|
||||
processed, err := convertAnnotation(timedAnnotation, 0) |
||||
require.NoError(t, err) |
||||
require.Nil(t, processed) |
||||
}) |
||||
|
||||
t.Run("handles invalid annotation data", func(t *testing.T) { |
||||
timedAnnotation := &TimedAnnotation{ |
||||
Timestamp: 1000, |
||||
Annotation: &typesv1.ProfileAnnotation{ |
||||
Key: string(profileAnnotationKeyThrottled), |
||||
Value: `invalid json`, |
||||
}, |
||||
} |
||||
|
||||
processed, err := convertAnnotation(timedAnnotation, 0) |
||||
require.Error(t, err) |
||||
require.Nil(t, processed) |
||||
require.Contains(t, err.Error(), "error parsing annotation data") |
||||
}) |
||||
|
||||
t.Run("skips duplicate annotations", func(t *testing.T) { |
||||
timedAnnotation := &TimedAnnotation{ |
||||
Timestamp: 1000, |
||||
Annotation: &typesv1.ProfileAnnotation{ |
||||
Key: string(profileAnnotationKeyThrottled), |
||||
Value: rawAnnotation, |
||||
}, |
||||
} |
||||
|
||||
// First call should process the annotation
|
||||
processed1, err := convertAnnotation(timedAnnotation, 0) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, processed1) |
||||
|
||||
// Second call with the same duplicateTracker should skip
|
||||
processed2, err := convertAnnotation(timedAnnotation, processed1.duplicateTracker) |
||||
require.NoError(t, err) |
||||
require.Nil(t, processed2) |
||||
}) |
||||
} |
||||
|
||||
func TestProcessAnnotations(t *testing.T) { |
||||
rawAnnotation := `{"body":{"periodType":"day","periodLimitMb":1024,"limitResetTime":1609459200}}` |
||||
|
||||
t.Run("processes multiple annotations", func(t *testing.T) { |
||||
annotations := []*TimedAnnotation{ |
||||
{ |
||||
Timestamp: 1609455600000, |
||||
Annotation: &typesv1.ProfileAnnotation{ |
||||
Key: string(profileAnnotationKeyThrottled), |
||||
Value: rawAnnotation, |
||||
}, |
||||
}, |
||||
{ |
||||
Timestamp: 1609459200000, |
||||
Annotation: &typesv1.ProfileAnnotation{ |
||||
Key: string(profileAnnotationKeyThrottled), |
||||
Value: rawAnnotation, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
result, err := processAnnotations(annotations) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, len(result.times)) |
||||
require.Equal(t, 1, len(result.timeEnds)) |
||||
require.Equal(t, 1, len(result.texts)) |
||||
require.Equal(t, 1, len(result.isRegions)) |
||||
}) |
||||
|
||||
t.Run("handles empty annotations list", func(t *testing.T) { |
||||
result, err := processAnnotations([]*TimedAnnotation{}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 0, len(result.times)) |
||||
require.Equal(t, 0, len(result.timeEnds)) |
||||
require.Equal(t, 0, len(result.texts)) |
||||
require.Equal(t, 0, len(result.isRegions)) |
||||
}) |
||||
|
||||
t.Run("handles nil annotations", func(t *testing.T) { |
||||
annotations := []*TimedAnnotation{nil} |
||||
result, err := processAnnotations(annotations) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 0, len(result.times)) |
||||
}) |
||||
|
||||
t.Run("handles invalid annotation data", func(t *testing.T) { |
||||
annotations := []*TimedAnnotation{ |
||||
{ |
||||
Timestamp: 1000, |
||||
Annotation: &typesv1.ProfileAnnotation{ |
||||
Key: string(profileAnnotationKeyThrottled), |
||||
Value: `invalid json`, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
result, err := processAnnotations(annotations) |
||||
require.Error(t, err) |
||||
require.Nil(t, result) |
||||
require.Contains(t, err.Error(), "error parsing annotation data") |
||||
}) |
||||
} |
||||
|
||||
func TestCreateAnnotationFrame(t *testing.T) { |
||||
rawAnnotation := `{"body":{"periodType":"day","periodLimitMb":1024,"limitResetTime":1609459200}}` |
||||
|
||||
t.Run("creates frame with correct fields", func(t *testing.T) { |
||||
annotations := []*TimedAnnotation{ |
||||
{ |
||||
Timestamp: 1609455600000, |
||||
Annotation: &typesv1.ProfileAnnotation{ |
||||
Key: string(profileAnnotationKeyThrottled), |
||||
Value: rawAnnotation, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
frame, err := createAnnotationFrame(annotations) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, frame) |
||||
|
||||
require.Equal(t, "annotations", frame.Name) |
||||
require.Equal(t, data.DataTopicAnnotations, frame.Meta.DataTopic) |
||||
|
||||
require.Equal(t, 5, len(frame.Fields)) |
||||
require.Equal(t, "time", frame.Fields[0].Name) |
||||
require.Equal(t, "timeEnd", frame.Fields[1].Name) |
||||
require.Equal(t, "text", frame.Fields[2].Name) |
||||
require.Equal(t, "isRegion", frame.Fields[3].Name) |
||||
require.Equal(t, "color", frame.Fields[4].Name) |
||||
|
||||
require.Equal(t, 1, frame.Fields[0].Len()) |
||||
require.Equal(t, time.UnixMilli(1609455600000), frame.Fields[0].At(0)) |
||||
require.Equal(t, time.UnixMilli(1609459200000), frame.Fields[1].At(0)) |
||||
require.Contains(t, frame.Fields[2].At(0).(string), "Ingestion limit") |
||||
}) |
||||
|
||||
t.Run("handles empty annotations list", func(t *testing.T) { |
||||
frame, err := createAnnotationFrame([]*TimedAnnotation{}) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, frame) |
||||
require.Equal(t, 5, len(frame.Fields)) |
||||
require.Equal(t, 0, frame.Fields[0].Len()) |
||||
}) |
||||
} |
Loading…
Reference in new issue