From d332dab3ece752a17ea27a6b84aa2227fd726df5 Mon Sep 17 00:00:00 2001 From: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> Date: Fri, 16 Dec 2022 16:30:36 +0100 Subject: [PATCH] Elasticsearch: Refactor parse query (#60440) * Refactor parse query to functions * Move parsing to new file * Create empty result variable and use it when returning early * Fix linting * Revert "Create empty result variable and use it when returning early" This reverts commit 36a503f66e52f8213c673972774329a963a78100. --- pkg/tsdb/elasticsearch/parse_query.go | 110 +++++++++++++++++ pkg/tsdb/elasticsearch/parse_query_test.go | 107 +++++++++++++++++ .../elasticsearch/response_parser_test.go | 3 +- pkg/tsdb/elasticsearch/time_series_query.go | 113 +----------------- .../elasticsearch/time_series_query_test.go | 102 ---------------- 5 files changed, 219 insertions(+), 216 deletions(-) create mode 100644 pkg/tsdb/elasticsearch/parse_query.go create mode 100644 pkg/tsdb/elasticsearch/parse_query_test.go diff --git a/pkg/tsdb/elasticsearch/parse_query.go b/pkg/tsdb/elasticsearch/parse_query.go new file mode 100644 index 00000000000..8ffae3a216f --- /dev/null +++ b/pkg/tsdb/elasticsearch/parse_query.go @@ -0,0 +1,110 @@ +package elasticsearch + +import ( + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/components/simplejson" +) + +func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { + queries := make([]*Query, 0) + for _, q := range tsdbQuery { + model, err := simplejson.NewJson(q.JSON) + if err != nil { + return nil, err + } + timeField, err := model.Get("timeField").String() + if err != nil { + return nil, err + } + rawQuery := model.Get("query").MustString() + bucketAggs, err := parseBucketAggs(model) + if err != nil { + return nil, err + } + metrics, err := parseMetrics(model) + if err != nil { + return nil, err + } + alias := model.Get("alias").MustString("") + interval := model.Get("interval").MustString("") + + queries = append(queries, &Query{ + TimeField: timeField, + RawQuery: rawQuery, + BucketAggs: bucketAggs, + Metrics: metrics, + Alias: alias, + Interval: interval, + RefID: q.RefID, + MaxDataPoints: q.MaxDataPoints, + }) + } + + return queries, nil +} + +func parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) { + var err error + var result []*BucketAgg + for _, t := range model.Get("bucketAggs").MustArray() { + aggJSON := simplejson.NewFromAny(t) + agg := &BucketAgg{} + + agg.Type, err = aggJSON.Get("type").String() + if err != nil { + return nil, err + } + + agg.ID, err = aggJSON.Get("id").String() + if err != nil { + return nil, err + } + + agg.Field = aggJSON.Get("field").MustString() + agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap()) + + result = append(result, agg) + } + return result, nil +} + +func parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) { + var err error + var result []*MetricAgg + for _, t := range model.Get("metrics").MustArray() { + metricJSON := simplejson.NewFromAny(t) + metric := &MetricAgg{} + + metric.Field = metricJSON.Get("field").MustString() + metric.Hide = metricJSON.Get("hide").MustBool(false) + metric.ID = metricJSON.Get("id").MustString() + metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString() + // In legacy editors, we were storing empty settings values as "null" + // The new editor doesn't store empty strings at all + // We need to ensures backward compatibility with old queries and remove empty fields + settings := metricJSON.Get("settings").MustMap() + for k, v := range settings { + if v == "null" { + delete(settings, k) + } + } + metric.Settings = simplejson.NewFromAny(settings) + metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap()) + metric.Type, err = metricJSON.Get("type").String() + if err != nil { + return nil, err + } + + if isPipelineAggWithMultipleBucketPaths(metric.Type) { + metric.PipelineVariables = map[string]string{} + pvArr := metricJSON.Get("pipelineVariables").MustArray() + for _, v := range pvArr { + kv := v.(map[string]interface{}) + metric.PipelineVariables[kv["name"].(string)] = kv["pipelineAgg"].(string) + } + } + + result = append(result, metric) + } + return result, nil +} diff --git a/pkg/tsdb/elasticsearch/parse_query_test.go b/pkg/tsdb/elasticsearch/parse_query_test.go new file mode 100644 index 00000000000..fc6cf047270 --- /dev/null +++ b/pkg/tsdb/elasticsearch/parse_query_test.go @@ -0,0 +1,107 @@ +package elasticsearch + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseQuery(t *testing.T) { + t.Run("Test parse query", func(t *testing.T) { + t.Run("Should be able to parse query", func(t *testing.T) { + body := `{ + "timeField": "@timestamp", + "query": "@metric:cpu", + "alias": "{{@hostname}} {{metric}}", + "interval": "10m", + "metrics": [ + { + "field": "@value", + "id": "1", + "meta": {}, + "settings": { + "percents": [ + "90" + ] + }, + "type": "percentiles" + }, + { + "type": "count", + "field": "select field", + "id": "4", + "settings": {}, + "meta": {} + } + ], + "bucketAggs": [ + { + "fake": true, + "field": "@hostname", + "id": "3", + "settings": { + "min_doc_count": 1, + "order": "desc", + "orderBy": "_term", + "size": "10" + }, + "type": "terms" + }, + { + "field": "@timestamp", + "id": "2", + "settings": { + "interval": "5m", + "min_doc_count": 0, + "trimEdges": 0 + }, + "type": "date_histogram" + } + ] + }` + dataQuery, err := newDataQuery(body) + require.NoError(t, err) + queries, err := parseQuery(dataQuery.Queries) + require.NoError(t, err) + require.Len(t, queries, 1) + + q := queries[0] + + require.Equal(t, q.TimeField, "@timestamp") + require.Equal(t, q.RawQuery, "@metric:cpu") + require.Equal(t, q.Alias, "{{@hostname}} {{metric}}") + require.Equal(t, q.Interval, "10m") + + require.Len(t, q.Metrics, 2) + require.Equal(t, q.Metrics[0].Field, "@value") + require.Equal(t, q.Metrics[0].ID, "1") + require.Equal(t, q.Metrics[0].Type, "percentiles") + require.False(t, q.Metrics[0].Hide) + require.Equal(t, q.Metrics[0].PipelineAggregate, "") + require.Equal(t, q.Metrics[0].Settings.Get("percents").MustStringArray()[0], "90") + + require.Equal(t, q.Metrics[1].Field, "select field") + require.Equal(t, q.Metrics[1].ID, "4") + require.Equal(t, q.Metrics[1].Type, "count") + require.False(t, q.Metrics[1].Hide) + require.Equal(t, q.Metrics[1].PipelineAggregate, "") + require.Empty(t, q.Metrics[1].Settings.MustMap()) + + require.Len(t, q.BucketAggs, 2) + require.Equal(t, q.BucketAggs[0].Field, "@hostname") + require.Equal(t, q.BucketAggs[0].ID, "3") + require.Equal(t, q.BucketAggs[0].Type, "terms") + require.Equal(t, q.BucketAggs[0].Settings.Get("min_doc_count").MustInt(), 1) + require.Equal(t, q.BucketAggs[0].Settings.Get("order").MustString(), "desc") + require.Equal(t, q.BucketAggs[0].Settings.Get("orderBy").MustString(), "_term") + require.Equal(t, q.BucketAggs[0].Settings.Get("size").MustString(), "10") + + require.Equal(t, q.BucketAggs[1].Field, "@timestamp") + require.Equal(t, q.BucketAggs[1].ID, "2") + require.Equal(t, q.BucketAggs[1].Type, "date_histogram") + require.Equal(t, q.BucketAggs[1].Settings.Get("interval").MustString(), "5m") + require.Equal(t, q.BucketAggs[1].Settings.Get("min_doc_count").MustInt(), 0) + require.Equal(t, q.BucketAggs[1].Settings.Get("trimEdges").MustInt(), 0) + }) + }) +} diff --git a/pkg/tsdb/elasticsearch/response_parser_test.go b/pkg/tsdb/elasticsearch/response_parser_test.go index bd73c946a7e..e8b47973a57 100644 --- a/pkg/tsdb/elasticsearch/response_parser_test.go +++ b/pkg/tsdb/elasticsearch/response_parser_test.go @@ -1176,8 +1176,7 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac return nil, err } - tsQueryParser := newTimeSeriesQueryParser() - queries, err := tsQueryParser.parse(tsdbQuery.Queries) + queries, err := parseQuery(tsdbQuery.Queries) if err != nil { return nil, err } diff --git a/pkg/tsdb/elasticsearch/time_series_query.go b/pkg/tsdb/elasticsearch/time_series_query.go index 75bdf086689..3a98455fa17 100644 --- a/pkg/tsdb/elasticsearch/time_series_query.go +++ b/pkg/tsdb/elasticsearch/time_series_query.go @@ -28,8 +28,7 @@ var newTimeSeriesQuery = func(client es.Client, dataQuery []backend.DataQuery, } func (e *timeSeriesQuery) execute() (*backend.QueryDataResponse, error) { - tsQueryParser := newTimeSeriesQueryParser() - queries, err := tsQueryParser.parse(e.dataQueries) + queries, err := parseQuery(e.dataQueries) if err != nil { return &backend.QueryDataResponse{}, err } @@ -398,113 +397,3 @@ func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBui return aggBuilder } - -type timeSeriesQueryParser struct{} - -func newTimeSeriesQueryParser() *timeSeriesQueryParser { - return &timeSeriesQueryParser{} -} - -func (p *timeSeriesQueryParser) parse(tsdbQuery []backend.DataQuery) ([]*Query, error) { - queries := make([]*Query, 0) - for _, q := range tsdbQuery { - model, err := simplejson.NewJson(q.JSON) - if err != nil { - return nil, err - } - timeField, err := model.Get("timeField").String() - if err != nil { - return nil, err - } - rawQuery := model.Get("query").MustString() - bucketAggs, err := p.parseBucketAggs(model) - if err != nil { - return nil, err - } - metrics, err := p.parseMetrics(model) - if err != nil { - return nil, err - } - alias := model.Get("alias").MustString("") - interval := model.Get("interval").MustString("") - - queries = append(queries, &Query{ - TimeField: timeField, - RawQuery: rawQuery, - BucketAggs: bucketAggs, - Metrics: metrics, - Alias: alias, - Interval: interval, - RefID: q.RefID, - MaxDataPoints: q.MaxDataPoints, - }) - } - - return queries, nil -} - -func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) { - var err error - var result []*BucketAgg - for _, t := range model.Get("bucketAggs").MustArray() { - aggJSON := simplejson.NewFromAny(t) - agg := &BucketAgg{} - - agg.Type, err = aggJSON.Get("type").String() - if err != nil { - return nil, err - } - - agg.ID, err = aggJSON.Get("id").String() - if err != nil { - return nil, err - } - - agg.Field = aggJSON.Get("field").MustString() - agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap()) - - result = append(result, agg) - } - return result, nil -} - -func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) { - var err error - var result []*MetricAgg - for _, t := range model.Get("metrics").MustArray() { - metricJSON := simplejson.NewFromAny(t) - metric := &MetricAgg{} - - metric.Field = metricJSON.Get("field").MustString() - metric.Hide = metricJSON.Get("hide").MustBool(false) - metric.ID = metricJSON.Get("id").MustString() - metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString() - // In legacy editors, we were storing empty settings values as "null" - // The new editor doesn't store empty strings at all - // We need to ensures backward compatibility with old queries and remove empty fields - settings := metricJSON.Get("settings").MustMap() - for k, v := range settings { - if v == "null" { - delete(settings, k) - } - } - metric.Settings = simplejson.NewFromAny(settings) - metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap()) - metric.Type, err = metricJSON.Get("type").String() - if err != nil { - return nil, err - } - - if isPipelineAggWithMultipleBucketPaths(metric.Type) { - metric.PipelineVariables = map[string]string{} - pvArr := metricJSON.Get("pipelineVariables").MustArray() - for _, v := range pvArr { - kv := v.(map[string]interface{}) - metric.PipelineVariables[kv["name"].(string)] = kv["pipelineAgg"].(string) - } - } - - result = append(result, metric) - } - return result, nil -} diff --git a/pkg/tsdb/elasticsearch/time_series_query_test.go b/pkg/tsdb/elasticsearch/time_series_query_test.go index 9450f8b1d21..9ff8b48f11b 100644 --- a/pkg/tsdb/elasticsearch/time_series_query_test.go +++ b/pkg/tsdb/elasticsearch/time_series_query_test.go @@ -1814,105 +1814,3 @@ func executeTsdbQuery(c es.Client, body string, from, to time.Time, minInterval query := newTimeSeriesQuery(c, dataRequest.Queries, intervalv2.NewCalculator(intervalv2.CalculatorOptions{MinInterval: minInterval})) return query.execute() } - -func TestTimeSeriesQueryParser(t *testing.T) { - t.Run("Test time series query parser", func(t *testing.T) { - p := newTimeSeriesQueryParser() - - t.Run("Should be able to parse query", func(t *testing.T) { - body := `{ - "timeField": "@timestamp", - "query": "@metric:cpu", - "alias": "{{@hostname}} {{metric}}", - "interval": "10m", - "metrics": [ - { - "field": "@value", - "id": "1", - "meta": {}, - "settings": { - "percents": [ - "90" - ] - }, - "type": "percentiles" - }, - { - "type": "count", - "field": "select field", - "id": "4", - "settings": {}, - "meta": {} - } - ], - "bucketAggs": [ - { - "fake": true, - "field": "@hostname", - "id": "3", - "settings": { - "min_doc_count": 1, - "order": "desc", - "orderBy": "_term", - "size": "10" - }, - "type": "terms" - }, - { - "field": "@timestamp", - "id": "2", - "settings": { - "interval": "5m", - "min_doc_count": 0, - "trimEdges": 0 - }, - "type": "date_histogram" - } - ] - }` - dataQuery, err := newDataQuery(body) - require.NoError(t, err) - queries, err := p.parse(dataQuery.Queries) - require.NoError(t, err) - require.Len(t, queries, 1) - - q := queries[0] - - require.Equal(t, q.TimeField, "@timestamp") - require.Equal(t, q.RawQuery, "@metric:cpu") - require.Equal(t, q.Alias, "{{@hostname}} {{metric}}") - require.Equal(t, q.Interval, "10m") - - require.Len(t, q.Metrics, 2) - require.Equal(t, q.Metrics[0].Field, "@value") - require.Equal(t, q.Metrics[0].ID, "1") - require.Equal(t, q.Metrics[0].Type, "percentiles") - require.False(t, q.Metrics[0].Hide) - require.Equal(t, q.Metrics[0].PipelineAggregate, "") - require.Equal(t, q.Metrics[0].Settings.Get("percents").MustStringArray()[0], "90") - - require.Equal(t, q.Metrics[1].Field, "select field") - require.Equal(t, q.Metrics[1].ID, "4") - require.Equal(t, q.Metrics[1].Type, "count") - require.False(t, q.Metrics[1].Hide) - require.Equal(t, q.Metrics[1].PipelineAggregate, "") - require.Empty(t, q.Metrics[1].Settings.MustMap()) - - require.Len(t, q.BucketAggs, 2) - require.Equal(t, q.BucketAggs[0].Field, "@hostname") - require.Equal(t, q.BucketAggs[0].ID, "3") - require.Equal(t, q.BucketAggs[0].Type, "terms") - require.Equal(t, q.BucketAggs[0].Settings.Get("min_doc_count").MustInt(), 1) - require.Equal(t, q.BucketAggs[0].Settings.Get("order").MustString(), "desc") - require.Equal(t, q.BucketAggs[0].Settings.Get("orderBy").MustString(), "_term") - require.Equal(t, q.BucketAggs[0].Settings.Get("size").MustString(), "10") - - require.Equal(t, q.BucketAggs[1].Field, "@timestamp") - require.Equal(t, q.BucketAggs[1].ID, "2") - require.Equal(t, q.BucketAggs[1].Type, "date_histogram") - require.Equal(t, q.BucketAggs[1].Settings.Get("interval").MustString(), "5m") - require.Equal(t, q.BucketAggs[1].Settings.Get("min_doc_count").MustInt(), 0) - require.Equal(t, q.BucketAggs[1].Settings.Get("trimEdges").MustInt(), 0) - }) - }) -}