|
|
|
@ -409,178 +409,248 @@ func newTimeSeriesFrame(timeData []time.Time, tags map[string]string, values []* |
|
|
|
|
return frame |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// nolint:gocyclo
|
|
|
|
|
func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse, |
|
|
|
|
props map[string]string) error { |
|
|
|
|
func processCountMetric(buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { |
|
|
|
|
tags := make(map[string]string, len(props)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(buckets)) |
|
|
|
|
values := make([]*float64, 0, len(buckets)) |
|
|
|
|
|
|
|
|
|
for _, bucket := range buckets { |
|
|
|
|
value := castToFloat(bucket.Get("doc_count")) |
|
|
|
|
timeValue, err := getAsTime(bucket.Get("key")) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
timeVector = append(timeVector, timeValue) |
|
|
|
|
values = append(values, value) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
tags["metric"] = countType |
|
|
|
|
return data.Frames{newTimeSeriesFrame(timeVector, tags, values)}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func processPercentilesMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { |
|
|
|
|
if len(buckets) == 0 { |
|
|
|
|
return data.Frames{}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
firstBucket := buckets[0] |
|
|
|
|
percentiles := firstBucket.GetPath(metric.ID, "values").MustMap() |
|
|
|
|
|
|
|
|
|
percentileKeys := make([]string, 0) |
|
|
|
|
for k := range percentiles { |
|
|
|
|
percentileKeys = append(percentileKeys, k) |
|
|
|
|
} |
|
|
|
|
sort.Strings(percentileKeys) |
|
|
|
|
|
|
|
|
|
frames := data.Frames{} |
|
|
|
|
esAggBuckets := esAgg.Get("buckets").MustArray() |
|
|
|
|
|
|
|
|
|
for _, metric := range target.Metrics { |
|
|
|
|
if metric.Hide { |
|
|
|
|
continue |
|
|
|
|
for _, percentileName := range percentileKeys { |
|
|
|
|
tags := make(map[string]string, len(props)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(buckets)) |
|
|
|
|
values := make([]*float64, 0, len(buckets)) |
|
|
|
|
|
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
tags["metric"] = "p" + percentileName |
|
|
|
|
tags["field"] = metric.Field |
|
|
|
|
for _, bucket := range buckets { |
|
|
|
|
value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName)) |
|
|
|
|
key := bucket.Get("key") |
|
|
|
|
timeValue, err := getAsTime(key) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
timeVector = append(timeVector, timeValue) |
|
|
|
|
values = append(values, value) |
|
|
|
|
} |
|
|
|
|
frames = append(frames, newTimeSeriesFrame(timeVector, tags, values)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return frames, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func processTopMetricsMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { |
|
|
|
|
metrics := metric.Settings.Get("metrics").MustArray() |
|
|
|
|
|
|
|
|
|
frames := data.Frames{} |
|
|
|
|
|
|
|
|
|
for _, metricField := range metrics { |
|
|
|
|
tags := make(map[string]string, len(props)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(esAggBuckets)) |
|
|
|
|
values := make([]*float64, 0, len(esAggBuckets)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(buckets)) |
|
|
|
|
values := make([]*float64, 0, len(buckets)) |
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
switch metric.Type { |
|
|
|
|
case countType: |
|
|
|
|
for _, v := range esAggBuckets { |
|
|
|
|
bucket := simplejson.NewFromAny(v) |
|
|
|
|
value := castToFloat(bucket.Get("doc_count")) |
|
|
|
|
key := castToFloat(bucket.Get("key")) |
|
|
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) |
|
|
|
|
values = append(values, value) |
|
|
|
|
} |
|
|
|
|
tags["field"] = metricField.(string) |
|
|
|
|
tags["metric"] = "top_metrics" |
|
|
|
|
|
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
tags["metric"] = countType |
|
|
|
|
frames = append(frames, newTimeSeriesFrame(timeVector, tags, values)) |
|
|
|
|
case percentilesType: |
|
|
|
|
buckets := esAggBuckets |
|
|
|
|
if len(buckets) == 0 { |
|
|
|
|
break |
|
|
|
|
for _, bucket := range buckets { |
|
|
|
|
stats := bucket.GetPath(metric.ID, "top") |
|
|
|
|
timeValue, err := getAsTime(bucket.Get("key")) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
timeVector = append(timeVector, timeValue) |
|
|
|
|
|
|
|
|
|
firstBucket := simplejson.NewFromAny(buckets[0]) |
|
|
|
|
percentiles := firstBucket.GetPath(metric.ID, "values").MustMap() |
|
|
|
|
for _, stat := range stats.MustArray() { |
|
|
|
|
stat := stat.(map[string]interface{}) |
|
|
|
|
|
|
|
|
|
percentileKeys := make([]string, 0) |
|
|
|
|
for k := range percentiles { |
|
|
|
|
percentileKeys = append(percentileKeys, k) |
|
|
|
|
} |
|
|
|
|
sort.Strings(percentileKeys) |
|
|
|
|
for _, percentileName := range percentileKeys { |
|
|
|
|
tags := make(map[string]string, len(props)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(esAggBuckets)) |
|
|
|
|
values := make([]*float64, 0, len(esAggBuckets)) |
|
|
|
|
metrics, hasMetrics := stat["metrics"] |
|
|
|
|
if hasMetrics { |
|
|
|
|
metrics := metrics.(map[string]interface{}) |
|
|
|
|
metricValue, hasMetricValue := metrics[metricField.(string)] |
|
|
|
|
|
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
tags["metric"] = "p" + percentileName |
|
|
|
|
tags["field"] = metric.Field |
|
|
|
|
for _, v := range buckets { |
|
|
|
|
bucket := simplejson.NewFromAny(v) |
|
|
|
|
value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName)) |
|
|
|
|
key := castToFloat(bucket.Get("key")) |
|
|
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) |
|
|
|
|
values = append(values, value) |
|
|
|
|
if hasMetricValue && metricValue != nil { |
|
|
|
|
v := metricValue.(float64) |
|
|
|
|
values = append(values, &v) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
frames = append(frames, newTimeSeriesFrame(timeVector, tags, values)) |
|
|
|
|
} |
|
|
|
|
case topMetricsType: |
|
|
|
|
buckets := esAggBuckets |
|
|
|
|
metrics := metric.Settings.Get("metrics").MustArray() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, metricField := range metrics { |
|
|
|
|
tags := make(map[string]string, len(props)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(esAggBuckets)) |
|
|
|
|
values := make([]*float64, 0, len(esAggBuckets)) |
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
frames = append(frames, newTimeSeriesFrame(timeVector, tags, values)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tags["field"] = metricField.(string) |
|
|
|
|
tags["metric"] = "top_metrics" |
|
|
|
|
return frames, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, v := range buckets { |
|
|
|
|
bucket := simplejson.NewFromAny(v) |
|
|
|
|
stats := bucket.GetPath(metric.ID, "top") |
|
|
|
|
key := castToFloat(bucket.Get("key")) |
|
|
|
|
func processExtendedStatsMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { |
|
|
|
|
metaKeys := make([]string, 0) |
|
|
|
|
meta := metric.Meta.MustMap() |
|
|
|
|
for k := range meta { |
|
|
|
|
metaKeys = append(metaKeys, k) |
|
|
|
|
} |
|
|
|
|
sort.Strings(metaKeys) |
|
|
|
|
|
|
|
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) |
|
|
|
|
frames := data.Frames{} |
|
|
|
|
|
|
|
|
|
for _, stat := range stats.MustArray() { |
|
|
|
|
stat := stat.(map[string]interface{}) |
|
|
|
|
for _, statName := range metaKeys { |
|
|
|
|
v := meta[statName] |
|
|
|
|
if enabled, ok := v.(bool); !ok || !enabled { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
metrics, hasMetrics := stat["metrics"] |
|
|
|
|
if hasMetrics { |
|
|
|
|
metrics := metrics.(map[string]interface{}) |
|
|
|
|
metricValue, hasMetricValue := metrics[metricField.(string)] |
|
|
|
|
tags := make(map[string]string, len(props)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(buckets)) |
|
|
|
|
values := make([]*float64, 0, len(buckets)) |
|
|
|
|
|
|
|
|
|
if hasMetricValue && metricValue != nil { |
|
|
|
|
v := metricValue.(float64) |
|
|
|
|
values = append(values, &v) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
tags["metric"] = statName |
|
|
|
|
tags["field"] = metric.Field |
|
|
|
|
|
|
|
|
|
frames = append(frames, newTimeSeriesFrame(timeVector, tags, values)) |
|
|
|
|
for _, bucket := range buckets { |
|
|
|
|
timeValue, err := getAsTime(bucket.Get("key")) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
var value *float64 |
|
|
|
|
switch statName { |
|
|
|
|
case "std_deviation_bounds_upper": |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) |
|
|
|
|
case "std_deviation_bounds_lower": |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) |
|
|
|
|
default: |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, statName)) |
|
|
|
|
} |
|
|
|
|
timeVector = append(timeVector, timeValue) |
|
|
|
|
values = append(values, value) |
|
|
|
|
} |
|
|
|
|
labels := tags |
|
|
|
|
frames = append(frames, newTimeSeriesFrame(timeVector, labels, values)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
case extendedStatsType: |
|
|
|
|
buckets := esAggBuckets |
|
|
|
|
return frames, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
metaKeys := make([]string, 0) |
|
|
|
|
meta := metric.Meta.MustMap() |
|
|
|
|
for k := range meta { |
|
|
|
|
metaKeys = append(metaKeys, k) |
|
|
|
|
} |
|
|
|
|
sort.Strings(metaKeys) |
|
|
|
|
for _, statName := range metaKeys { |
|
|
|
|
v := meta[statName] |
|
|
|
|
if enabled, ok := v.(bool); !ok || !enabled { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
func processDefaultMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) { |
|
|
|
|
tags := make(map[string]string, len(props)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(buckets)) |
|
|
|
|
values := make([]*float64, 0, len(buckets)) |
|
|
|
|
|
|
|
|
|
tags := make(map[string]string, len(props)) |
|
|
|
|
timeVector := make([]time.Time, 0, len(esAggBuckets)) |
|
|
|
|
values := make([]*float64, 0, len(esAggBuckets)) |
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
} |
|
|
|
|
tags["metric"] = statName |
|
|
|
|
tags["field"] = metric.Field |
|
|
|
|
|
|
|
|
|
for _, v := range buckets { |
|
|
|
|
bucket := simplejson.NewFromAny(v) |
|
|
|
|
key := castToFloat(bucket.Get("key")) |
|
|
|
|
var value *float64 |
|
|
|
|
switch statName { |
|
|
|
|
case "std_deviation_bounds_upper": |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) |
|
|
|
|
case "std_deviation_bounds_lower": |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) |
|
|
|
|
default: |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, statName)) |
|
|
|
|
} |
|
|
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) |
|
|
|
|
values = append(values, value) |
|
|
|
|
} |
|
|
|
|
labels := tags |
|
|
|
|
frames = append(frames, newTimeSeriesFrame(timeVector, labels, values)) |
|
|
|
|
tags["metric"] = metric.Type |
|
|
|
|
tags["field"] = metric.Field |
|
|
|
|
tags["metricId"] = metric.ID |
|
|
|
|
for _, bucket := range buckets { |
|
|
|
|
timeValue, err := getAsTime(bucket.Get("key")) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
valueObj, err := bucket.Get(metric.ID).Map() |
|
|
|
|
if err != nil { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
var value *float64 |
|
|
|
|
if _, ok := valueObj["normalized_value"]; ok { |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, "normalized_value")) |
|
|
|
|
} else { |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, "value")) |
|
|
|
|
} |
|
|
|
|
timeVector = append(timeVector, timeValue) |
|
|
|
|
values = append(values, value) |
|
|
|
|
} |
|
|
|
|
return data.Frames{newTimeSeriesFrame(timeVector, tags, values)}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// nolint:gocyclo
|
|
|
|
|
func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse, |
|
|
|
|
props map[string]string) error { |
|
|
|
|
frames := data.Frames{} |
|
|
|
|
esAggBuckets := esAgg.Get("buckets").MustArray() |
|
|
|
|
|
|
|
|
|
jsonBuckets := make([]*simplejson.Json, len(esAggBuckets)) |
|
|
|
|
|
|
|
|
|
for i, v := range esAggBuckets { |
|
|
|
|
jsonBuckets[i] = simplejson.NewFromAny(v) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, metric := range target.Metrics { |
|
|
|
|
if metric.Hide { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
switch metric.Type { |
|
|
|
|
case countType: |
|
|
|
|
countFrames, err := processCountMetric(jsonBuckets, props) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
default: |
|
|
|
|
for k, v := range props { |
|
|
|
|
tags[k] = v |
|
|
|
|
frames = append(frames, countFrames...) |
|
|
|
|
case percentilesType: |
|
|
|
|
percentileFrames, err := processPercentilesMetric(metric, jsonBuckets, props) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
frames = append(frames, percentileFrames...) |
|
|
|
|
case topMetricsType: |
|
|
|
|
topMetricsFrames, err := processTopMetricsMetric(metric, jsonBuckets, props) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
frames = append(frames, topMetricsFrames...) |
|
|
|
|
case extendedStatsType: |
|
|
|
|
extendedStatsFrames, err := processExtendedStatsMetric(metric, jsonBuckets, props) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tags["metric"] = metric.Type |
|
|
|
|
tags["field"] = metric.Field |
|
|
|
|
tags["metricId"] = metric.ID |
|
|
|
|
for _, v := range esAggBuckets { |
|
|
|
|
bucket := simplejson.NewFromAny(v) |
|
|
|
|
key := castToFloat(bucket.Get("key")) |
|
|
|
|
valueObj, err := bucket.Get(metric.ID).Map() |
|
|
|
|
if err != nil { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
var value *float64 |
|
|
|
|
if _, ok := valueObj["normalized_value"]; ok { |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, "normalized_value")) |
|
|
|
|
} else { |
|
|
|
|
value = castToFloat(bucket.GetPath(metric.ID, "value")) |
|
|
|
|
} |
|
|
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) |
|
|
|
|
values = append(values, value) |
|
|
|
|
frames = append(frames, extendedStatsFrames...) |
|
|
|
|
default: |
|
|
|
|
defaultFrames, err := processDefaultMetric(metric, jsonBuckets, props) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
frames = append(frames, newTimeSeriesFrame(timeVector, tags, values)) |
|
|
|
|
frames = append(frames, defaultFrames...) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if query.Frames != nil { |
|
|
|
@ -905,6 +975,16 @@ func castToFloat(j *simplejson.Json) *float64 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func getAsTime(j *simplejson.Json) (time.Time, error) { |
|
|
|
|
// these are stored as numbers
|
|
|
|
|
number, err := j.Float64() |
|
|
|
|
if err != nil { |
|
|
|
|
return time.Time{}, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return time.UnixMilli(int64(number)).UTC(), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func findAgg(target *Query, aggID string) (*BucketAgg, error) { |
|
|
|
|
for _, v := range target.BucketAggs { |
|
|
|
|
if aggID == v.ID { |
|
|
|
|