package elasticsearch import ( "fmt" "regexp" "strconv" "github.com/Masterminds/semver" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/plugins" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" "github.com/grafana/grafana/pkg/tsdb/interval" ) type timeSeriesQuery struct { client es.Client tsdbQuery plugins.DataQuery intervalCalculator interval.Calculator } var newTimeSeriesQuery = func(client es.Client, dataQuery plugins.DataQuery, intervalCalculator interval.Calculator) *timeSeriesQuery { return &timeSeriesQuery{ client: client, tsdbQuery: dataQuery, intervalCalculator: intervalCalculator, } } // nolint:staticcheck // plugins.DataQueryResult deprecated func (e *timeSeriesQuery) execute() (plugins.DataResponse, error) { tsQueryParser := newTimeSeriesQueryParser() queries, err := tsQueryParser.parse(e.tsdbQuery) if err != nil { return plugins.DataResponse{}, err } ms := e.client.MultiSearch() from := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetFromAsMsEpoch()) to := fmt.Sprintf("%d", e.tsdbQuery.TimeRange.GetToAsMsEpoch()) result := plugins.DataResponse{ Results: make(map[string]plugins.DataQueryResult), } for _, q := range queries { if err := e.processQuery(q, ms, from, to, result); err != nil { return plugins.DataResponse{}, err } } req, err := ms.Build() if err != nil { return plugins.DataResponse{}, err } res, err := e.client.ExecuteMultisearch(req) if err != nil { return plugins.DataResponse{}, err } rp := newResponseParser(res.Responses, queries, res.DebugInfo) return rp.getTimeSeries() } // nolint:staticcheck // plugins.DataQueryResult deprecated func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to string, result plugins.DataResponse) error { minInterval, err := e.client.GetMinInterval(q.Interval) if err != nil { return err } interval := e.intervalCalculator.Calculate(*e.tsdbQuery.TimeRange, minInterval) b := ms.Search(interval) b.Size(0) filters := b.Query().Bool().Filter() filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS) if q.RawQuery != "" { filters.AddQueryStringFilter(q.RawQuery, true) } if len(q.BucketAggs) == 0 { if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" { result.Results[q.RefID] = plugins.DataQueryResult{ RefID: q.RefID, Error: fmt.Errorf("invalid query, missing metrics and aggregations"), ErrorString: "invalid query, missing metrics and aggregations", } return nil } metric := q.Metrics[0] b.Size(metric.Settings.Get("size").MustInt(500)) b.SortDesc("@timestamp", "boolean") b.AddDocValueField("@timestamp") return nil } aggBuilder := b.Agg() // iterate backwards to create aggregations bottom-down for _, bucketAgg := range q.BucketAggs { switch bucketAgg.Type { case dateHistType: aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to) case histogramType: aggBuilder = addHistogramAgg(aggBuilder, bucketAgg) case filtersType: aggBuilder = addFiltersAgg(aggBuilder, bucketAgg) case termsType: aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics) case geohashGridType: aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg) } } for _, m := range q.Metrics { m := m if m.Type == countType { continue } if isPipelineAgg(m.Type) { if isPipelineAggWithMultipleBucketPaths(m.Type) { if len(m.PipelineVariables) > 0 { bucketPaths := map[string]interface{}{} for name, pipelineAgg := range m.PipelineVariables { if _, err := strconv.Atoi(pipelineAgg); err == nil { var appliedAgg *MetricAgg for _, pipelineMetric := range q.Metrics { if pipelineMetric.ID == pipelineAgg { appliedAgg = pipelineMetric break } } if appliedAgg != nil { if appliedAgg.Type == countType { bucketPaths[name] = "_count" } else { bucketPaths[name] = pipelineAgg } } } } aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) { a.Settings = m.generateSettingsForDSL(e.client.GetVersion()) }) } else { continue } } else { if _, err := strconv.Atoi(m.PipelineAggregate); err == nil { var appliedAgg *MetricAgg for _, pipelineMetric := range q.Metrics { if pipelineMetric.ID == m.PipelineAggregate { appliedAgg = pipelineMetric break } } if appliedAgg != nil { bucketPath := m.PipelineAggregate if appliedAgg.Type == countType { bucketPath = "_count" } aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) { a.Settings = m.generateSettingsForDSL(e.client.GetVersion()) }) } } else { continue } } } else { aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) { a.Settings = m.generateSettingsForDSL(e.client.GetVersion()) }) } } return nil } // Casts values to int when required by Elastic's query DSL func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Version) map[string]interface{} { setFloatPath := func(path ...string) { if stringValue, err := metricAggregation.Settings.GetPath(path...).String(); err == nil { if value, err := strconv.ParseFloat(stringValue, 64); err == nil { metricAggregation.Settings.SetPath(path, value) } } } switch metricAggregation.Type { case "moving_avg": setFloatPath("window") setFloatPath("predict") setFloatPath("settings", "alpha") setFloatPath("settings", "beta") setFloatPath("settings", "gamma") setFloatPath("settings", "period") case "serial_diff": setFloatPath("lag") } if isMetricAggregationWithInlineScriptSupport(metricAggregation.Type) { scriptValue, err := metricAggregation.Settings.GetPath("script").String() if err != nil { // the script is stored using the old format : `script:{inline: "value"}` or is not set scriptValue, err = metricAggregation.Settings.GetPath("script", "inline").String() } constraint, _ := semver.NewConstraint(">=5.6.0") if err == nil { if constraint.Check(version) { metricAggregation.Settings.SetPath([]string{"script"}, scriptValue) } else { metricAggregation.Settings.SetPath([]string{"script"}, map[string]interface{}{"inline": scriptValue}) } } } return metricAggregation.Settings.MustMap() } func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder { aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) { a.Interval = bucketAgg.Settings.Get("interval").MustString("auto") a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0) a.ExtendedBounds = &es.ExtendedBounds{Min: timeFrom, Max: timeTo} a.Format = bucketAgg.Settings.Get("format").MustString(es.DateFormatEpochMS) if a.Interval == "auto" { a.Interval = "$__interval" } if offset, err := bucketAgg.Settings.Get("offset").String(); err == nil { a.Offset = offset } if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil { a.Missing = &missing } aggBuilder = b }) return aggBuilder } func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder { aggBuilder.Histogram(bucketAgg.ID, bucketAgg.Field, func(a *es.HistogramAgg, b es.AggBuilder) { a.Interval = bucketAgg.Settings.Get("interval").MustInt(1000) a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0) if missing, err := bucketAgg.Settings.Get("missing").Int(); err == nil { a.Missing = &missing } aggBuilder = b }) return aggBuilder } func addTermsAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, metrics []*MetricAgg) es.AggBuilder { aggBuilder.Terms(bucketAgg.ID, bucketAgg.Field, func(a *es.TermsAggregation, b es.AggBuilder) { if size, err := bucketAgg.Settings.Get("size").Int(); err == nil { a.Size = size } else if size, err := bucketAgg.Settings.Get("size").String(); err == nil { a.Size, err = strconv.Atoi(size) if err != nil { a.Size = 500 } } else { a.Size = 500 } if a.Size == 0 { a.Size = 500 } if minDocCount, err := bucketAgg.Settings.Get("min_doc_count").Int(); err == nil { a.MinDocCount = &minDocCount } if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil { a.Missing = &missing } if orderBy, err := bucketAgg.Settings.Get("orderBy").String(); err == nil { /* The format for extended stats and percentiles is {metricId}[bucket_path] for everything else it's just {metricId}, _count, _term, or _key */ metricIdRegex := regexp.MustCompile(`^(\d+)`) metricId := metricIdRegex.FindString(orderBy) if len(metricId) > 0 { for _, m := range metrics { if m.ID == metricId { if m.Type == "count" { a.Order["_count"] = bucketAgg.Settings.Get("order").MustString("desc") } else { a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc") b.Metric(m.ID, m.Type, m.Field, nil) } break } } } else { a.Order[orderBy] = bucketAgg.Settings.Get("order").MustString("desc") } } aggBuilder = b }) return aggBuilder } func addFiltersAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder { filters := make(map[string]interface{}) for _, filter := range bucketAgg.Settings.Get("filters").MustArray() { json := simplejson.NewFromAny(filter) query := json.Get("query").MustString() label := json.Get("label").MustString() if label == "" { label = query } filters[label] = &es.QueryStringFilter{Query: query, AnalyzeWildcard: true} } if len(filters) > 0 { aggBuilder.Filters(bucketAgg.ID, func(a *es.FiltersAggregation, b es.AggBuilder) { a.Filters = filters aggBuilder = b }) } return aggBuilder } func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder { aggBuilder.GeoHashGrid(bucketAgg.ID, bucketAgg.Field, func(a *es.GeoHashGridAggregation, b es.AggBuilder) { a.Precision = bucketAgg.Settings.Get("precision").MustInt(3) aggBuilder = b }) return aggBuilder } type timeSeriesQueryParser struct{} func newTimeSeriesQueryParser() *timeSeriesQueryParser { return &timeSeriesQueryParser{} } func (p *timeSeriesQueryParser) parse(tsdbQuery plugins.DataQuery) ([]*Query, error) { queries := make([]*Query, 0) for _, q := range tsdbQuery.Queries { model := q.Model timeField, err := model.Get("timeField").String() if err != nil { return nil, err } rawQuery := model.Get("query").MustString() bucketAggs, err := p.parseBucketAggs(model) if err != nil { return nil, err } metrics, err := p.parseMetrics(model) if err != nil { return nil, err } alias := model.Get("alias").MustString("") interval := model.Get("interval").MustString("") queries = append(queries, &Query{ TimeField: timeField, RawQuery: rawQuery, BucketAggs: bucketAggs, Metrics: metrics, Alias: alias, Interval: interval, RefID: q.RefID, }) } return queries, nil } func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) { var err error var result []*BucketAgg for _, t := range model.Get("bucketAggs").MustArray() { aggJSON := simplejson.NewFromAny(t) agg := &BucketAgg{} agg.Type, err = aggJSON.Get("type").String() if err != nil { return nil, err } agg.ID, err = aggJSON.Get("id").String() if err != nil { return nil, err } agg.Field = aggJSON.Get("field").MustString() agg.Settings = simplejson.NewFromAny(aggJSON.Get("settings").MustMap()) result = append(result, agg) } return result, nil } func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) { var err error var result []*MetricAgg for _, t := range model.Get("metrics").MustArray() { metricJSON := simplejson.NewFromAny(t) metric := &MetricAgg{} metric.Field = metricJSON.Get("field").MustString() metric.Hide = metricJSON.Get("hide").MustBool(false) metric.ID = metricJSON.Get("id").MustString() metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString() metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap()) metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap()) metric.Type, err = metricJSON.Get("type").String() if err != nil { return nil, err } if isPipelineAggWithMultipleBucketPaths(metric.Type) { metric.PipelineVariables = map[string]string{} pvArr := metricJSON.Get("pipelineVariables").MustArray() for _, v := range pvArr { kv := v.(map[string]interface{}) metric.PipelineVariables[kv["name"].(string)] = kv["pipelineAgg"].(string) } } result = append(result, metric) } return result, nil }