diff --git a/pkg/tsdb/elasticsearch/response_parser.go b/pkg/tsdb/elasticsearch/response_parser.go index b4553604f03..7de6a0f026e 100644 --- a/pkg/tsdb/elasticsearch/response_parser.go +++ b/pkg/tsdb/elasticsearch/response_parser.go @@ -593,33 +593,10 @@ func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataRe func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, queryResult *backend.DataResponse, props map[string]string) error { - propKeys := make([]string, 0) - for k := range props { - propKeys = append(propKeys, k) - } - sort.Strings(propKeys) + propKeys := createPropKeys(props) frames := data.Frames{} fields := createFieldsFromPropKeys(queryResult.Frames, propKeys) - addMetricValue := func(values []interface{}, metricName string, value *float64) { - index := -1 - for i, f := range fields { - if f.Name == metricName { - index = i - break - } - } - - var field data.Field - if index == -1 { - field = *data.NewField(metricName, nil, []*float64{}) - fields = append(fields, &field) - } else { - field = *fields[index] - } - field.Append(value) - } - for _, v := range esAgg.Get("buckets").MustArray() { bucket := simplejson.NewFromAny(v) var values []interface{} @@ -664,58 +641,15 @@ func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Q for _, metric := range target.Metrics { switch metric.Type { case countType: - addMetricValue(values, getMetricName(metric.Type), castToFloat(bucket.Get("doc_count"))) + addMetricValueToFields(&fields, values, getMetricName(metric.Type), castToFloat(bucket.Get("doc_count"))) case extendedStatsType: - metaKeys := make([]string, 0) - meta := metric.Meta.MustMap() - for k := range meta { - metaKeys = append(metaKeys, k) - } - sort.Strings(metaKeys) - for _, statName := range metaKeys { - v := meta[statName] - if enabled, ok := v.(bool); !ok || !enabled { - continue - } - - var value *float64 - switch statName { - case "std_deviation_bounds_upper": - value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) - case "std_deviation_bounds_lower": - value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) - default: - value = castToFloat(bucket.GetPath(metric.ID, statName)) - } - - addMetricValue(values, getMetricName(metric.Type), value) - break - } + addExtendedStatsToFields(&fields, bucket, metric, values) case percentilesType: - percentiles := bucket.GetPath(metric.ID, "values") - for _, percentileName := range getSortedKeys(percentiles.MustMap()) { - percentileValue := percentiles.Get(percentileName).MustFloat64() - addMetricValue(values, fmt.Sprintf("p%v %v", percentileName, metric.Field), &percentileValue) - } + addPercentilesToFields(&fields, bucket, metric, values) + case topMetricsType: + addTopMetricsToFields(&fields, bucket, metric, values) default: - metricName := getMetricName(metric.Type) - otherMetrics := make([]*MetricAgg, 0) - - for _, m := range target.Metrics { - if m.Type == metric.Type { - otherMetrics = append(otherMetrics, m) - } - } - - if len(otherMetrics) > 1 { - metricName += " " + metric.Field - if metric.Type == "bucket_script" { - // Use the formula in the column name - metricName = metric.Settings.Get("script").MustString("") - } - } - - addMetricValue(values, metricName, castToFloat(bucket.GetPath(metric.ID, "value"))) + addOtherMetricsToFields(&fields, bucket, metric, values, target) } } @@ -1140,3 +1074,108 @@ func getSortedKeys(data map[string]interface{}) []string { sort.Strings(keys) return keys } + +func createPropKeys(props map[string]string) []string { + propKeys := make([]string, 0) + for k := range props { + propKeys = append(propKeys, k) + } + sort.Strings(propKeys) + return propKeys +} + +func addMetricValueToFields(fields *[]*data.Field, values []interface{}, metricName string, value *float64) { + index := -1 + for i, f := range *fields { + if f.Name == metricName { + index = i + break + } + } + + var field data.Field + if index == -1 { + field = *data.NewField(metricName, nil, []*float64{}) + *fields = append(*fields, &field) + } else { + field = *(*fields)[index] + } + field.Append(value) +} + +func addPercentilesToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) { + percentiles := bucket.GetPath(metric.ID, "values") + for _, percentileName := range getSortedKeys(percentiles.MustMap()) { + percentileValue := percentiles.Get(percentileName).MustFloat64() + addMetricValueToFields(fields, values, fmt.Sprintf("p%v %v", percentileName, metric.Field), &percentileValue) + } +} + +func addExtendedStatsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) { + metaKeys := make([]string, 0) + meta := metric.Meta.MustMap() + for k := range meta { + metaKeys = append(metaKeys, k) + } + sort.Strings(metaKeys) + for _, statName := range metaKeys { + v := meta[statName] + if enabled, ok := v.(bool); !ok || !enabled { + continue + } + var value *float64 + switch statName { + case "std_deviation_bounds_upper": + value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) + case "std_deviation_bounds_lower": + value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) + default: + value = castToFloat(bucket.GetPath(metric.ID, statName)) + } + + addMetricValueToFields(fields, values, getMetricName(metric.Type), value) + break + } +} + +func addTopMetricsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) { + baseName := getMetricName(metric.Type) + metrics := metric.Settings.Get("metrics").MustStringArray() + for _, metricField := range metrics { + // If we selected more than one metric we also add each metric name + metricName := baseName + if len(metrics) > 1 { + metricName += " " + metricField + } + top := bucket.GetPath(metric.ID, "top").MustArray() + metrics, hasMetrics := top[0].(map[string]interface{})["metrics"] + if hasMetrics { + metrics := metrics.(map[string]interface{}) + metricValue, hasMetricValue := metrics[metricField] + if hasMetricValue && metricValue != nil { + v := metricValue.(float64) + addMetricValueToFields(fields, values, metricName, &v) + } + } + } +} + +func addOtherMetricsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}, target *Query) { + metricName := getMetricName(metric.Type) + otherMetrics := make([]*MetricAgg, 0) + + for _, m := range target.Metrics { + if m.Type == metric.Type { + otherMetrics = append(otherMetrics, m) + } + } + + if len(otherMetrics) > 1 { + metricName += " " + metric.Field + if metric.Type == "bucket_script" { + // Use the formula in the column name + metricName = metric.Settings.Get("script").MustString("") + } + } + addMetricValueToFields(fields, values, metricName, castToFloat(bucket.GetPath(metric.ID, "value"))) +} diff --git a/pkg/tsdb/elasticsearch/response_parser_test.go b/pkg/tsdb/elasticsearch/response_parser_test.go index 15fc393d3b2..56414a61b2f 100644 --- a/pkg/tsdb/elasticsearch/response_parser_test.go +++ b/pkg/tsdb/elasticsearch/response_parser_test.go @@ -1484,7 +1484,7 @@ func TestResponseParser(t *testing.T) { }) }) - t.Run("With top_metrics", func(t *testing.T) { + t.Run("With top_metrics and date_histogram agg", func(t *testing.T) { targets := map[string]string{ "A": `{ "metrics": [ @@ -1571,6 +1571,76 @@ func TestResponseParser(t *testing.T) { v, _ = frame.FloatAt(1, 1) assert.Equal(t, 2., v) }) + + t.Run("With top_metrics and terms agg", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "metrics": [ + { + "type": "top_metrics", + "settings": { + "order": "desc", + "orderBy": "@timestamp", + "metrics": ["@value", "@anotherValue"] + }, + "id": "1" + } + ], + "bucketAggs": [{ "type": "terms", "field": "id", "id": "3" }] + }`, + } + response := `{ + "responses": [{ + "aggregations": { + "3": { + "buckets": [ + { + "key": "id1", + "1": { + "top": [ + { "sort": [10], "metrics": { "@value": 10, "@anotherValue": 2 } } + ] + } + }, + { + "key": "id2", + "1": { + "top": [ + { "sort": [5], "metrics": { "@value": 5, "@anotherValue": 2 } } + ] + } + } + ] + } + } + }] + }` + + result, err := parseTestResponse(targets, response) + assert.Nil(t, err) + assert.Len(t, result.Responses, 1) + frames := result.Responses["A"].Frames + require.Len(t, frames, 1) + requireFrameLength(t, frames[0], 2) + require.Len(t, frames[0].Fields, 3) + + f1 := frames[0].Fields[0] + f2 := frames[0].Fields[1] + f3 := frames[0].Fields[2] + + require.Equal(t, "id", f1.Name) + require.Equal(t, "Top Metrics @value", f2.Name) + require.Equal(t, "Top Metrics @anotherValue", f3.Name) + + requireStringAt(t, "id1", f1, 0) + requireStringAt(t, "id2", f1, 1) + + requireFloatAt(t, 10, f2, 0) + requireFloatAt(t, 5, f2, 1) + + requireFloatAt(t, 2, f3, 0) + requireFloatAt(t, 2, f3, 1) + }) } func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*backend.QueryDataResponse, error) {