The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/tsdb/elasticsearch/time_series_query.go

476 lines
14 KiB

package elasticsearch
import (
"fmt"
"regexp"
"strconv"
"time"
"github.com/Masterminds/semver"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/components/simplejson"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
)
// timeSeriesQuery holds what is needed to run a batch of data queries against
// Elasticsearch as one multi-search request.
type timeSeriesQuery struct {
// client builds and executes the multi-search request and supplies the
// time field, minimum interval and Elasticsearch version.
client es.Client
// dataQueries are the raw queries to run; the time range of the first one
// is used for the whole batch.
dataQueries []backend.DataQuery
// intervalCalculator computes the interval passed to each search.
intervalCalculator intervalv2.Calculator
}
// newTimeSeriesQuery assembles a timeSeriesQuery from its collaborators.
//
// It is declared as a variable holding a function rather than as a plain
// function (NOTE(review): presumably so it can be replaced in tests — confirm).
var newTimeSeriesQuery = func(client es.Client, dataQuery []backend.DataQuery,
	intervalCalculator intervalv2.Calculator) *timeSeriesQuery {
	query := new(timeSeriesQuery)
	query.client = client
	query.dataQueries = dataQuery
	query.intervalCalculator = intervalCalculator
	return query
}
// execute parses the data queries, adds one search per query to a single
// Elasticsearch multi-search request, runs it and parses the reply into a
// backend.QueryDataResponse.
//
// The time range of the first data query is applied to every search in the
// batch. Queries that processQuery flags as invalid are reported as per-query
// errors (keyed by RefID) in the returned response rather than failing the
// whole batch.
//
// A non-nil error is returned when there are no data queries, when parsing
// fails, or when building, executing or parsing the multi-search fails.
func (e *timeSeriesQuery) execute() (*backend.QueryDataResponse, error) {
	// Guards the e.dataQueries[0] accesses below, which would otherwise panic.
	if len(e.dataQueries) == 0 {
		return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
	}

	tsQueryParser := newTimeSeriesQueryParser()
	queries, err := tsQueryParser.parse(e.dataQueries)
	if err != nil {
		return &backend.QueryDataResponse{}, err
	}

	ms := e.client.MultiSearch()

	// Epoch milliseconds, matching the es.DateFormatEpochMS range filter.
	from := e.dataQueries[0].TimeRange.From.UnixNano() / int64(time.Millisecond)
	to := e.dataQueries[0].TimeRange.To.UnixNano() / int64(time.Millisecond)

	// result is passed by value, but Responses is a map, so per-query errors
	// written by processQuery are visible here.
	result := backend.QueryDataResponse{
		Responses: backend.Responses{},
	}
	for _, q := range queries {
		if err := e.processQuery(q, ms, from, to, result); err != nil {
			return &backend.QueryDataResponse{}, err
		}
	}

	req, err := ms.Build()
	if err != nil {
		return &backend.QueryDataResponse{}, err
	}

	res, err := e.client.ExecuteMultisearch(req)
	if err != nil {
		return &backend.QueryDataResponse{}, err
	}

	rp := newResponseParser(res.Responses, queries, res.DebugInfo)
	parsed, err := rp.getTimeSeries()
	if err != nil || len(result.Responses) == 0 {
		return parsed, err
	}

	// Surface the validation errors collected while building the request;
	// without this they would be silently discarded.
	if parsed == nil {
		parsed = &backend.QueryDataResponse{}
	}
	if parsed.Responses == nil {
		parsed.Responses = backend.Responses{}
	}
	for refID, invalid := range result.Responses {
		parsed.Responses[refID] = invalid
	}
	return parsed, nil
}
// processQuery adds one search for q to the multi-search builder ms.
//
// Parameters:
//   - q: the parsed query to translate.
//   - ms: the multi-search builder that receives the new search.
//   - from, to: the time range bounds in epoch milliseconds.
//   - result: receives a per-query error (keyed by q.RefID) when q has neither
//     bucket aggregations nor a leading "raw_document" metric. It is passed by
//     value, but its Responses map is shared with the caller.
//
// It returns an error only when the minimum interval cannot be resolved; an
// invalid query is recorded in result and nil is returned.
func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64,
	result backend.QueryDataResponse) error {
	minInterval, err := e.client.GetMinInterval(q.Interval)
	if err != nil {
		return err
	}
	interval := e.intervalCalculator.Calculate(e.dataQueries[0].TimeRange, minInterval, q.MaxDataPoints)

	b := ms.Search(interval)
	b.Size(0)
	filters := b.Query().Bool().Filter()
	filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS)
	if q.RawQuery != "" {
		filters.AddQueryStringFilter(q.RawQuery, true)
	}

	if len(q.BucketAggs) == 0 {
		// Without bucket aggregations only a raw document query is valid.
		if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
			result.Responses[q.RefID] = backend.DataResponse{
				Error: fmt.Errorf("invalid query, missing metrics and aggregations"),
			}
			return nil
		}
		metric := q.Metrics[0]
		// Sort and fetch doc values on the configured time field, the same
		// field the range filter above uses, instead of a hard-coded name.
		timeField := e.client.GetTimeField()
		b.Size(metric.Settings.Get("size").MustInt(500))
		b.SortDesc(timeField, "boolean")
		b.AddDocValueField(timeField)
		return nil
	}

	aggBuilder := b.Agg()

	// Bucket aggregations are added in declaration order. Each helper returns
	// the builder handed to its callback (presumably the one scoped to the
	// aggregation just added), so later aggregations attach to that builder.
	for _, bucketAgg := range q.BucketAggs {
		bucketAgg.Settings = simplejson.NewFromAny(
			bucketAgg.generateSettingsForDSL(),
		)
		switch bucketAgg.Type {
		case dateHistType:
			aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to)
		case histogramType:
			aggBuilder = addHistogramAgg(aggBuilder, bucketAgg)
		case filtersType:
			aggBuilder = addFiltersAgg(aggBuilder, bucketAgg)
		case termsType:
			aggBuilder = addTermsAgg(aggBuilder, bucketAgg, q.Metrics)
		case geohashGridType:
			aggBuilder = addGeoHashGridAgg(aggBuilder, bucketAgg)
		}
	}

	// findMetric returns the metric of q with the given ID, or nil.
	findMetric := func(id string) *MetricAgg {
		for _, candidate := range q.Metrics {
			if candidate.ID == id {
				return candidate
			}
		}
		return nil
	}

	for _, m := range q.Metrics {
		m := m // per-iteration copy captured by the closures below

		// Count needs no explicit aggregation; pipelines refer to it as "_count".
		if m.Type == countType {
			continue
		}

		if !isPipelineAgg(m.Type) {
			aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
				a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
			})
			continue
		}

		if isPipelineAggWithMultipleBucketPaths(m.Type) {
			if len(m.PipelineVariables) == 0 {
				continue
			}
			bucketPaths := map[string]interface{}{}
			for name, pipelineAgg := range m.PipelineVariables {
				// Only numeric references to an existing metric are resolved.
				if _, err := strconv.Atoi(pipelineAgg); err != nil {
					continue
				}
				appliedAgg := findMetric(pipelineAgg)
				if appliedAgg == nil {
					continue
				}
				if appliedAgg.Type == countType {
					bucketPaths[name] = "_count"
				} else {
					bucketPaths[name] = pipelineAgg
				}
			}
			aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) {
				a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
			})
			continue
		}

		// Single bucket path: skipped unless it is a numeric reference to an
		// existing metric.
		if _, err := strconv.Atoi(m.PipelineAggregate); err != nil {
			continue
		}
		appliedAgg := findMetric(m.PipelineAggregate)
		if appliedAgg == nil {
			continue
		}
		bucketPath := m.PipelineAggregate
		if appliedAgg.Type == countType {
			bucketPath = "_count"
		}
		aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
			a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
		})
	}

	return nil
}
// setFloatPath replaces the value at path inside settings with a float64 when
// that value is a string that parses as a float. If the value cannot be read
// as a string or does not parse, settings is left untouched.
func setFloatPath(settings *simplejson.Json, path ...string) {
	raw, err := settings.GetPath(path...).String()
	if err != nil {
		return
	}
	parsed, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return
	}
	settings.SetPath(path, parsed)
}
// setIntPath replaces the value at path inside settings with an int64 when
// that value is a string that parses as a base-10 integer. If the value cannot
// be read as a string or does not parse, settings is left untouched.
func setIntPath(settings *simplejson.Json, path ...string) {
	raw, err := settings.GetPath(path...).String()
	if err != nil {
		return
	}
	parsed, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return
	}
	settings.SetPath(path, parsed)
}
// generateSettingsForDSL normalizes the metric settings for Elastic's query
// DSL and returns them as a map.
//
// String-encoded numbers are cast to floats for "moving_avg" and
// "serial_diff". For metric types with inline script support, the script is
// rewritten as a plain string when version satisfies ">=5.6.0" and as
// {"inline": <script>} otherwise.
//
// The receiver is a value, but Settings is a pointer, so the rewrites are
// applied to the shared settings object.
func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Version) map[string]interface{} {
	settings := metricAggregation.Settings

	switch metricAggregation.Type {
	case "moving_avg":
		floatPaths := [][]string{
			{"window"},
			{"predict"},
			{"settings", "alpha"},
			{"settings", "beta"},
			{"settings", "gamma"},
			{"settings", "period"},
		}
		for _, path := range floatPaths {
			setFloatPath(settings, path...)
		}
	case "serial_diff":
		setFloatPath(settings, "lag")
	}

	if !isMetricAggregationWithInlineScriptSupport(metricAggregation.Type) {
		return settings.MustMap()
	}

	script, err := settings.GetPath("script").String()
	if err != nil {
		// the script is stored using the old format : `script:{inline: "value"}` or is not set
		script, err = settings.GetPath("script", "inline").String()
	}
	if err != nil {
		// No script in either format: nothing to rewrite.
		return settings.MustMap()
	}

	// The constraint string is a constant, so the error is ignored.
	constraint, _ := semver.NewConstraint(">=5.6.0")
	if constraint.Check(version) {
		settings.SetPath([]string{"script"}, script)
	} else {
		settings.SetPath([]string{"script"}, map[string]interface{}{"inline": script})
	}
	return settings.MustMap()
}
// generateSettingsForDSL normalizes the bucket aggregation settings and
// returns them as a map. For "date_histogram" a string-encoded
// "min_doc_count" is converted to an integer in the shared settings object.
func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]interface{} {
	// TODO: This might also need to be applied to other bucket aggregations and other fields.
	if bucketAgg.Type == "date_histogram" {
		setIntPath(bucketAgg.Settings, "min_doc_count")
	}

	return bucketAgg.Settings.MustMap()
}
// addDateHistogramAgg adds a date histogram aggregation for bucketAgg to
// aggBuilder, with extended bounds of timeFrom..timeTo, and returns the
// builder passed to the configuration callback. If the callback never runs,
// aggBuilder itself is returned.
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64) es.AggBuilder {
	nested := aggBuilder

	aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
		settings := bucketAgg.Settings

		// "auto" (also the default) is sent as the $__interval placeholder.
		interval := settings.Get("interval").MustString("auto")
		if interval == "auto" {
			interval = "$__interval"
		}
		a.Interval = interval

		a.MinDocCount = settings.Get("min_doc_count").MustInt(0)
		a.ExtendedBounds = &es.ExtendedBounds{Min: timeFrom, Max: timeTo}
		a.Format = settings.Get("format").MustString(es.DateFormatEpochMS)

		// Optional settings are applied only when present as strings.
		if offset, err := settings.Get("offset").String(); err == nil {
			a.Offset = offset
		}
		if missing, err := settings.Get("missing").String(); err == nil {
			a.Missing = &missing
		}
		// A "utc" time zone is not forwarded.
		if timezone, err := settings.Get("timeZone").String(); err == nil && timezone != "utc" {
			a.TimeZone = timezone
		}

		nested = b
	})

	return nested
}
// addHistogramAgg adds a histogram aggregation for bucketAgg to aggBuilder and
// returns the builder passed to the configuration callback. The interval
// defaults to 1000 and min_doc_count to 0; "missing" is set only when present
// as an integer.
func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	nested := aggBuilder

	aggBuilder.Histogram(bucketAgg.ID, bucketAgg.Field, func(a *es.HistogramAgg, b es.AggBuilder) {
		settings := bucketAgg.Settings

		a.Interval = settings.Get("interval").MustInt(1000)
		a.MinDocCount = settings.Get("min_doc_count").MustInt(0)

		if missing, err := settings.Get("missing").Int(); err == nil {
			a.Missing = &missing
		}

		nested = b
	})

	return nested
}
// termsOrderByMetricIDRegex extracts the leading numeric metric ID from a
// terms aggregation "orderBy" setting. It is compiled once at package scope
// instead of on every addTermsAgg call.
var termsOrderByMetricIDRegex = regexp.MustCompile(`^(\d+)`)

// addTermsAgg adds a terms aggregation for bucketAgg to aggBuilder and returns
// the builder passed to the configuration callback.
//
// Settings handled:
//   - size: integer or numeric string; missing, unparsable or 0 becomes 500.
//   - min_doc_count: applied when present as an integer.
//   - missing: applied when present as a string.
//   - orderBy / order: order defaults to "desc". When orderBy starts with a
//     metric ID found in metrics, ordering uses "_count" for a count metric,
//     otherwise orderBy itself, and that metric is added to the nested builder.
//     An orderBy with a numeric prefix matching no metric is ignored; any other
//     orderBy is used as the order key directly.
func addTermsAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, metrics []*MetricAgg) es.AggBuilder {
	aggBuilder.Terms(bucketAgg.ID, bucketAgg.Field, func(a *es.TermsAggregation, b es.AggBuilder) {
		const defaultSize = 500

		sizeSetting := bucketAgg.Settings.Get("size")
		size := defaultSize
		if n, err := sizeSetting.Int(); err == nil {
			size = n
		} else if s, err := sizeSetting.String(); err == nil {
			if n, err := strconv.Atoi(s); err == nil {
				size = n
			}
		}
		if size == 0 {
			size = defaultSize
		}
		a.Size = size

		if minDocCount, err := bucketAgg.Settings.Get("min_doc_count").Int(); err == nil {
			a.MinDocCount = &minDocCount
		}
		if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil {
			a.Missing = &missing
		}

		if orderBy, err := bucketAgg.Settings.Get("orderBy").String(); err == nil {
			order := bucketAgg.Settings.Get("order").MustString("desc")

			/*
			   The format for extended stats and percentiles is {metricId}[bucket_path]
			   for everything else it's just {metricId}, _count, _term, or _key
			*/
			metricID := termsOrderByMetricIDRegex.FindString(orderBy)

			if len(metricID) > 0 {
				for _, m := range metrics {
					if m.ID != metricID {
						continue
					}
					if m.Type == "count" {
						a.Order["_count"] = order
					} else {
						a.Order[orderBy] = order
						// The metric must exist in the nested builder for
						// the order key to refer to it.
						b.Metric(m.ID, m.Type, m.Field, nil)
					}
					break
				}
			} else {
				a.Order[orderBy] = order
			}
		}

		aggBuilder = b
	})

	return aggBuilder
}
// addFiltersAgg adds a filters aggregation built from the "filters" setting of
// bucketAgg. Each entry becomes a query string filter keyed by its label, or
// by the query text when the label is empty. With no filters nothing is added
// and aggBuilder is returned unchanged; otherwise the builder passed to the
// configuration callback is returned.
func addFiltersAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	filters := map[string]interface{}{}

	for _, raw := range bucketAgg.Settings.Get("filters").MustArray() {
		entry := simplejson.NewFromAny(raw)
		query := entry.Get("query").MustString()

		key := entry.Get("label").MustString()
		if key == "" {
			key = query
		}

		filters[key] = &es.QueryStringFilter{Query: query, AnalyzeWildcard: true}
	}

	if len(filters) == 0 {
		return aggBuilder
	}

	nested := aggBuilder
	aggBuilder.Filters(bucketAgg.ID, func(a *es.FiltersAggregation, b es.AggBuilder) {
		a.Filters = filters
		nested = b
	})

	return nested
}
// addGeoHashGridAgg adds a geohash grid aggregation for bucketAgg to
// aggBuilder, with the "precision" setting defaulting to 3, and returns the
// builder passed to the configuration callback.
func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder {
	nested := aggBuilder

	aggBuilder.GeoHashGrid(bucketAgg.ID, bucketAgg.Field, func(a *es.GeoHashGridAggregation, b es.AggBuilder) {
		precision := bucketAgg.Settings.Get("precision").MustInt(3)
		a.Precision = precision
		nested = b
	})

	return nested
}
// timeSeriesQueryParser turns raw data queries into parsed queries. It holds
// no state.
type timeSeriesQueryParser struct{}

// newTimeSeriesQueryParser returns a ready-to-use parser.
func newTimeSeriesQueryParser() *timeSeriesQueryParser {
	return new(timeSeriesQueryParser)
}
// parse converts each data query's JSON model into a Query.
//
// "timeField" is required and must be a string. "query", "alias" and
// "interval" are optional and default to the empty string. The first failure
// (invalid JSON, missing timeField, bad bucket aggregation or metric) aborts
// parsing and is returned with a nil slice. On success the slice is non-nil,
// even when tsdbQuery is empty.
func (p *timeSeriesQueryParser) parse(tsdbQuery []backend.DataQuery) ([]*Query, error) {
	parsed := make([]*Query, 0)

	for _, dataQuery := range tsdbQuery {
		model, err := simplejson.NewJson(dataQuery.JSON)
		if err != nil {
			return nil, err
		}

		timeField, err := model.Get("timeField").String()
		if err != nil {
			return nil, err
		}

		bucketAggs, err := p.parseBucketAggs(model)
		if err != nil {
			return nil, err
		}

		metrics, err := p.parseMetrics(model)
		if err != nil {
			return nil, err
		}

		query := &Query{
			TimeField:     timeField,
			RawQuery:      model.Get("query").MustString(),
			BucketAggs:    bucketAggs,
			Metrics:       metrics,
			Alias:         model.Get("alias").MustString(""),
			Interval:      model.Get("interval").MustString(""),
			RefID:         dataQuery.RefID,
			MaxDataPoints: dataQuery.MaxDataPoints,
		}
		parsed = append(parsed, query)
	}

	return parsed, nil
}
// parseBucketAggs reads the "bucketAggs" array of model into BucketAgg values.
// Every entry must have string "type" and "id" fields, otherwise the error is
// returned with a nil slice. "field" defaults to the empty string. The result
// is nil when there are no bucket aggregations.
func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) {
	var aggs []*BucketAgg

	for _, raw := range model.Get("bucketAggs").MustArray() {
		aggJSON := simplejson.NewFromAny(raw)

		aggType, err := aggJSON.Get("type").String()
		if err != nil {
			return nil, err
		}

		id, err := aggJSON.Get("id").String()
		if err != nil {
			return nil, err
		}

		aggs = append(aggs, &BucketAgg{
			Type:     aggType,
			ID:       id,
			Field:    aggJSON.Get("field").MustString(),
			Settings: simplejson.NewFromAny(aggJSON.Get("settings").MustMap()),
		})
	}

	return aggs, nil
}
// parseMetrics reads the "metrics" array of model into MetricAgg values.
//
// Every entry must have a string "type"; the other fields are optional and
// fall back to their zero values. For pipeline aggregations with multiple
// bucket paths, "pipelineVariables" is read into PipelineVariables (variable
// name -> referenced metric ID). Each variable must be an object with string
// "name" and "pipelineAgg" fields; a malformed variable yields an error
// instead of a panic.
//
// On any error a nil slice is returned. The result is nil when there are no
// metrics.
func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*MetricAgg, error) {
	var err error
	var result []*MetricAgg

	for _, t := range model.Get("metrics").MustArray() {
		metricJSON := simplejson.NewFromAny(t)
		metric := &MetricAgg{}

		metric.Field = metricJSON.Get("field").MustString()
		metric.Hide = metricJSON.Get("hide").MustBool(false)
		metric.ID = metricJSON.Get("id").MustString()
		metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString()
		metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap())
		metric.Meta = simplejson.NewFromAny(metricJSON.Get("meta").MustMap())
		metric.Type, err = metricJSON.Get("type").String()
		if err != nil {
			return nil, err
		}

		if isPipelineAggWithMultipleBucketPaths(metric.Type) {
			metric.PipelineVariables = map[string]string{}
			for _, v := range metricJSON.Get("pipelineVariables").MustArray() {
				// The model comes from the request, so its shape is checked
				// rather than asserted.
				kv, ok := v.(map[string]interface{})
				if !ok {
					return nil, fmt.Errorf("metric %q: pipeline variable must be an object, got %T", metric.ID, v)
				}
				name, ok := kv["name"].(string)
				if !ok {
					return nil, fmt.Errorf("metric %q: pipeline variable \"name\" must be a string", metric.ID)
				}
				pipelineAgg, ok := kv["pipelineAgg"].(string)
				if !ok {
					return nil, fmt.Errorf("metric %q: pipeline variable %q \"pipelineAgg\" must be a string", metric.ID, name)
				}
				metric.PipelineVariables[name] = pipelineAgg
			}
		}

		result = append(result, metric)
	}

	return result, nil
}