elastic: response parser: simplify code (#59782)

elastic: simplify code
pull/60169/head
Gábor Farkas 2 years ago committed by GitHub
parent 0ab5ea0843
commit 3172d8dd9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 58
      pkg/tsdb/elasticsearch/response_parser.go
  2. 72
      pkg/tsdb/elasticsearch/response_parser_test.go
  3. 3
      pkg/tsdb/elasticsearch/time_series_query.go

@ -28,28 +28,16 @@ const (
geohashGridType = "geohash_grid"
)
type responseParser struct {
Responses []*es.SearchResponse
Targets []*Query
}
var newResponseParser = func(responses []*es.SearchResponse, targets []*Query) *responseParser {
return &responseParser{
Responses: responses,
Targets: targets,
}
}
func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
func parseResponse(responses []*es.SearchResponse, targets []*Query) (*backend.QueryDataResponse, error) {
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
if rp.Responses == nil {
if responses == nil {
return &result, nil
}
for i, res := range rp.Responses {
target := rp.Targets[i]
for i, res := range responses {
target := targets[i]
if res.Error != nil {
errResult := getErrorFromElasticResponse(res)
@ -62,19 +50,19 @@ func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
queryRes := backend.DataResponse{}
props := make(map[string]string)
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
err := processBuckets(res.Aggregations, target, &queryRes, props, 0)
if err != nil {
return &backend.QueryDataResponse{}, err
}
rp.nameFields(queryRes, target)
rp.trimDatapoints(queryRes, target)
nameFields(queryRes, target)
trimDatapoints(queryRes, target)
result.Responses[target.RefID] = queryRes
}
return &result, nil
}
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query,
func processBuckets(aggs map[string]interface{}, target *Query,
queryResult *backend.DataResponse, props map[string]string, depth int) error {
var err error
maxDepth := len(target.BucketAggs) - 1
@ -94,9 +82,9 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu
if depth == maxDepth {
if aggDef.Type == dateHistType {
err = rp.processMetrics(esAgg, target, queryResult, props)
err = processMetrics(esAgg, target, queryResult, props)
} else {
err = rp.processAggregationDocs(esAgg, aggDef, target, queryResult, props)
err = processAggregationDocs(esAgg, aggDef, target, queryResult, props)
}
if err != nil {
return err
@ -119,7 +107,7 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu
if key, err := bucket.Get("key_as_string").String(); err == nil {
newProps[aggDef.Field] = key
}
err = rp.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
err = processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
if err != nil {
return err
}
@ -142,7 +130,7 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu
newProps["filter"] = bucketKey
err = rp.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
err = processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
if err != nil {
return err
}
@ -153,7 +141,7 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu
}
// nolint:gocyclo
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse,
func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse,
props map[string]string) error {
frames := data.Frames{}
esAggBuckets := esAgg.Get("buckets").MustArray()
@ -345,7 +333,7 @@ func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query,
return nil
}
func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query,
func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query,
queryResult *backend.DataResponse, props map[string]string) error {
propKeys := make([]string, 0)
for k := range props {
@ -424,7 +412,7 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef
for _, metric := range target.Metrics {
switch metric.Type {
case countType:
addMetricValue(values, rp.getMetricName(metric.Type), castToFloat(bucket.Get("doc_count")))
addMetricValue(values, getMetricName(metric.Type), castToFloat(bucket.Get("doc_count")))
case extendedStatsType:
metaKeys := make([]string, 0)
meta := metric.Meta.MustMap()
@ -448,11 +436,11 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef
value = castToFloat(bucket.GetPath(metric.ID, statName))
}
addMetricValue(values, rp.getMetricName(metric.Type), value)
addMetricValue(values, getMetricName(metric.Type), value)
break
}
default:
metricName := rp.getMetricName(metric.Type)
metricName := getMetricName(metric.Type)
otherMetrics := make([]*MetricAgg, 0)
for _, m := range target.Metrics {
@ -496,7 +484,7 @@ func extractDataField(name string, v interface{}) *data.Field {
}
}
func (rp *responseParser) trimDatapoints(queryResult backend.DataResponse, target *Query) {
func trimDatapoints(queryResult backend.DataResponse, target *Query) {
var histogram *BucketAgg
for _, bucketAgg := range target.BucketAggs {
if bucketAgg.Type == dateHistType {
@ -533,7 +521,7 @@ func (rp *responseParser) trimDatapoints(queryResult backend.DataResponse, targe
}
}
func (rp *responseParser) nameFields(queryResult backend.DataResponse, target *Query) {
func nameFields(queryResult backend.DataResponse, target *Query) {
set := make(map[string]struct{})
frames := queryResult.Frames
for _, v := range frames {
@ -547,7 +535,7 @@ func (rp *responseParser) nameFields(queryResult backend.DataResponse, target *Q
}
metricTypeCount := len(set)
for i := range frames {
fieldName := rp.getFieldName(*frames[i].Fields[1], target, metricTypeCount)
fieldName := getFieldName(*frames[i].Fields[1], target, metricTypeCount)
for _, field := range frames[i].Fields {
field.SetConfig(&data.FieldConfig{DisplayNameFromDS: fieldName})
}
@ -556,9 +544,9 @@ func (rp *responseParser) nameFields(queryResult backend.DataResponse, target *Q
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
func (rp *responseParser) getFieldName(dataField data.Field, target *Query, metricTypeCount int) string {
func getFieldName(dataField data.Field, target *Query, metricTypeCount int) string {
metricType := dataField.Labels["metric"]
metricName := rp.getMetricName(metricType)
metricName := getMetricName(metricType)
delete(dataField.Labels, "metric")
field := ""
@ -648,7 +636,7 @@ func (rp *responseParser) getFieldName(dataField data.Field, target *Query, metr
return strings.TrimSpace(name) + " " + metricName
}
func (rp *responseParser) getMetricName(metric string) string {
func getMetricName(metric string) string {
if text, ok := metricAggType[metric]; ok {
return text
}

@ -46,9 +46,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -97,9 +95,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -165,9 +161,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
queryRes := result.Responses["A"]
@ -236,9 +230,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -311,9 +303,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -395,9 +385,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -501,9 +489,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -557,9 +543,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -609,9 +593,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -680,9 +662,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -751,9 +731,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -816,10 +794,8 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -859,9 +835,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -908,9 +882,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -975,9 +947,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -1065,9 +1035,7 @@ func TestResponseParser(t *testing.T) {
}
]
}`
rp, err := newResponseParserForTest(targets, response)
require.NoError(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
require.NoError(t, err)
require.Len(t, result.Responses, 1)
@ -1139,9 +1107,7 @@ func TestResponseParser(t *testing.T) {
}
}]
}`
rp, err := newResponseParserForTest(targets, response)
assert.Nil(t, err)
result, err := rp.getTimeSeries()
result, err := parseTestResponse(targets, response)
assert.Nil(t, err)
assert.Len(t, result.Responses, 1)
@ -1185,7 +1151,7 @@ func TestResponseParser(t *testing.T) {
})
}
func newResponseParserForTest(tsdbQueries map[string]string, responseBody string) (*responseParser, error) {
func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*backend.QueryDataResponse, error) {
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
timeRange := backend.TimeRange{
@ -1216,5 +1182,5 @@ func newResponseParserForTest(tsdbQueries map[string]string, responseBody string
return nil, err
}
return newResponseParser(response.Responses, queries), nil
return parseResponse(response.Responses, queries)
}

@ -57,8 +57,7 @@ func (e *timeSeriesQuery) execute() (*backend.QueryDataResponse, error) {
return &backend.QueryDataResponse{}, err
}
rp := newResponseParser(res.Responses, queries)
return rp.getTimeSeries()
return parseResponse(res.Responses, queries)
}
func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64,

Loading…
Cancel
Save