Elasticsearch: Use interval provided by data request in backend (#60480)

* Elasticsearch: Remove interval calculation and use interval provided by Grafana

* Remove redundant code

* Adjust snapshot tests

* Update test

* Fix lint
svennergr/test^2
Ivana Huckova 2 years ago committed by GitHub
parent 8b50c60342
commit 772e8cbf60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      pkg/tsdb/elasticsearch/client/client.go
  2. 3
      pkg/tsdb/elasticsearch/client/client_test.go
  3. 5
      pkg/tsdb/elasticsearch/client/models.go
  4. 9
      pkg/tsdb/elasticsearch/client/search_request.go
  5. 9
      pkg/tsdb/elasticsearch/client/search_request_test.go
  6. 10
      pkg/tsdb/elasticsearch/elasticsearch.go
  7. 4
      pkg/tsdb/elasticsearch/models.go
  8. 4
      pkg/tsdb/elasticsearch/parse_query.go
  9. 2
      pkg/tsdb/elasticsearch/parse_query_test.go
  10. 6
      pkg/tsdb/elasticsearch/querydata_test.go
  11. 3
      pkg/tsdb/elasticsearch/testdata_request/logs.queries.json
  12. 6
      pkg/tsdb/elasticsearch/testdata_request/metric_multi.queries.json
  13. 21
      pkg/tsdb/elasticsearch/time_series_query.go
  14. 132
      pkg/tsdb/elasticsearch/time_series_query_test.go

@ -89,7 +89,7 @@ func (c *baseClientImpl) GetMinInterval(queryInterval string) (time.Duration, er
type multiRequest struct {
header map[string]interface{}
body interface{}
interval intervalv2.Interval
interval time.Duration
}
func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*http.Response, error) {
@ -119,7 +119,7 @@ func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte,
body := string(reqBody)
body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10))
body = strings.ReplaceAll(body, "$__interval", r.interval.Text)
body = strings.ReplaceAll(body, "$__interval", r.interval.String())
payload.WriteString(body + "\n")
}

@ -12,7 +12,6 @@ import (
"github.com/Masterminds/semver"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -114,7 +113,7 @@ func createMultisearchForTest(t *testing.T, c Client) (*MultiSearchRequest, erro
t.Helper()
msb := c.MultiSearch()
s := msb.Search(intervalv2.Interval{Value: 15 * time.Second, Text: "15s"})
s := msb.Search(15 * time.Second)
s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) {
a.FixedInterval = "$__interval"

@ -2,14 +2,13 @@ package es
import (
"encoding/json"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"time"
)
// SearchRequest represents a search request
type SearchRequest struct {
Index string
Interval intervalv2.Interval
Interval time.Duration
Size int
Sort map[string]interface{}
Query *Query

@ -2,8 +2,7 @@ package es
import (
"strings"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"time"
)
const (
@ -14,7 +13,7 @@ const (
// SearchRequestBuilder represents a builder which can build a search request
type SearchRequestBuilder struct {
interval intervalv2.Interval
interval time.Duration
index string
size int
// Currently sort is map, but based in examples it should be an array https://www.elastic.co/guide/en/elasticsearch/reference/current/sort-search-results.html
@ -25,7 +24,7 @@ type SearchRequestBuilder struct {
}
// NewSearchRequestBuilder create a new search request builder
func NewSearchRequestBuilder(interval intervalv2.Interval) *SearchRequestBuilder {
func NewSearchRequestBuilder(interval time.Duration) *SearchRequestBuilder {
builder := &SearchRequestBuilder{
interval: interval,
sort: make(map[string]interface{}),
@ -137,7 +136,7 @@ func NewMultiSearchRequestBuilder() *MultiSearchRequestBuilder {
}
// Search initiates and returns a new search request builder
func (m *MultiSearchRequestBuilder) Search(interval intervalv2.Interval) *SearchRequestBuilder {
func (m *MultiSearchRequestBuilder) Search(interval time.Duration) *SearchRequestBuilder {
b := NewSearchRequestBuilder(interval)
m.requestBuilders = append(m.requestBuilders, b)
return b

@ -6,7 +6,6 @@ import (
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/stretchr/testify/require"
)
@ -15,7 +14,7 @@ func TestSearchRequest(t *testing.T) {
timeField := "@timestamp"
setup := func() *SearchRequestBuilder {
return NewSearchRequestBuilder(intervalv2.Interval{Value: 15 * time.Second, Text: "15s"})
return NewSearchRequestBuilder(15 * time.Second)
}
t.Run("When building search request", func(t *testing.T) {
@ -401,7 +400,7 @@ func TestSearchRequest(t *testing.T) {
func TestMultiSearchRequest(t *testing.T) {
t.Run("When adding one search request", func(t *testing.T) {
b := NewMultiSearchRequestBuilder()
b.Search(intervalv2.Interval{Value: 15 * time.Second, Text: "15s"})
b.Search(15 * time.Second)
t.Run("When building search request should contain one search request", func(t *testing.T) {
mr, err := b.Build()
@ -412,8 +411,8 @@ func TestMultiSearchRequest(t *testing.T) {
t.Run("When adding two search requests", func(t *testing.T) {
b := NewMultiSearchRequestBuilder()
b.Search(intervalv2.Interval{Value: 15 * time.Second, Text: "15s"})
b.Search(intervalv2.Interval{Value: 15 * time.Second, Text: "15s"})
b.Search(15 * time.Second)
b.Search(15 * time.Second)
t.Run("When building search request should contain two search requests", func(t *testing.T) {
mr, err := b.Build()

@ -15,14 +15,12 @@ import (
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
)
var eslog = log.New("tsdb.elasticsearch")
type Service struct {
httpClientProvider httpclient.Provider
intervalCalculator intervalv2.Calculator
im instancemgmt.InstanceManager
}
@ -32,7 +30,6 @@ func ProvideService(httpClientProvider httpclient.Provider) *Service {
return &Service{
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
httpClientProvider: httpClientProvider,
intervalCalculator: intervalv2.NewCalculator(),
}
}
@ -42,11 +39,11 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return &backend.QueryDataResponse{}, err
}
return queryData(ctx, req.Queries, dsInfo, s.intervalCalculator)
return queryData(ctx, req.Queries, dsInfo)
}
// separate function to allow testing the whole transformation and query flow
func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo, intervalCalculator intervalv2.Calculator) (*backend.QueryDataResponse, error) {
func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo) (*backend.QueryDataResponse, error) {
// Support for version after their end-of-life (currently <7.10.0) was removed
lastSupportedVersion, _ := semver.NewVersion("7.10.0")
if dsInfo.ESVersion.LessThan(lastSupportedVersion) {
@ -61,8 +58,7 @@ func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.Data
if err != nil {
return &backend.QueryDataResponse{}, err
}
query := newTimeSeriesQuery(client, queries, intervalCalculator)
query := newTimeSeriesQuery(client, queries)
return query.execute()
}

@ -1,6 +1,8 @@
package elasticsearch
import (
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
)
@ -11,7 +13,7 @@ type Query struct {
BucketAggs []*BucketAgg `json:"bucketAggs"`
Metrics []*MetricAgg `json:"metrics"`
Alias string `json:"alias"`
Interval string
Interval time.Duration
IntervalMs int64
RefID string
MaxDataPoints int64

@ -26,7 +26,8 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) {
return nil, err
}
alias := model.Get("alias").MustString("")
interval := model.Get("interval").MustString("")
intervalMs := model.Get("intervalMs").MustInt64(0)
interval := q.Interval
queries = append(queries, &Query{
TimeField: timeField,
@ -35,6 +36,7 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) {
Metrics: metrics,
Alias: alias,
Interval: interval,
IntervalMs: intervalMs,
RefID: q.RefID,
MaxDataPoints: q.MaxDataPoints,
})

@ -70,7 +70,7 @@ func TestParseQuery(t *testing.T) {
require.Equal(t, q.TimeField, "@timestamp")
require.Equal(t, q.RawQuery, "@metric:cpu")
require.Equal(t, q.Alias, "{{@hostname}} {{metric}}")
require.Equal(t, q.Interval, "10m")
require.Equal(t, q.Interval.String(), "10s")
require.Len(t, q.Metrics, 2)
require.Equal(t, q.Metrics[0].Field, "@value")

@ -12,7 +12,6 @@ import (
"github.com/Masterminds/semver"
"github.com/grafana/grafana-plugin-sdk-go/backend"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
)
type queryDataTestRoundTripper struct {
@ -56,6 +55,7 @@ func newFlowTestDsInfo(body []byte, reuestCallback func(req *http.Request) error
type queryDataTestQueryJSON struct {
IntervalMs int64
Interval time.Duration
MaxDataPoints int64
RefID string
}
@ -90,7 +90,7 @@ func newFlowTestQueries(allJsonBytes []byte) ([]backend.DataQuery, error) {
query := backend.DataQuery{
RefID: jsonInfo.RefID,
MaxDataPoints: jsonInfo.MaxDataPoints,
Interval: time.Duration(jsonInfo.IntervalMs) * time.Millisecond,
Interval: jsonInfo.Interval,
TimeRange: timeRange,
JSON: jsonBytes,
}
@ -130,7 +130,7 @@ func queryDataTest(queriesBytes []byte, responseBytes []byte) (queryDataTestResu
return nil
})
result, err := queryData(context.Background(), queries, dsInfo, intervalv2.NewCalculator())
result, err := queryData(context.Background(), queries, dsInfo)
if err != nil {
return queryDataTestResult{}, err
}

@ -17,7 +17,8 @@
"timeField": "testtime",
"key": "Q-ee8fea91-a4c4-4ded-9827-b362476a4083-0",
"datasourceId": 39,
"intervalMs": 2000,
"intervalMs": 1000,
"interval": 1000000000,
"maxDataPoints": 1318
}
]

@ -15,7 +15,8 @@
}
],
"timeField": "testtime",
"intervalMs": 30000,
"intervalMs": 1000,
"interval": 1000000000,
"maxDataPoints": 814
},
{
@ -35,7 +36,8 @@
}
],
"timeField": "testtime",
"intervalMs": 30000,
"intervalMs": 1000,
"interval": 1000000000,
"maxDataPoints": 814
}
]

@ -9,7 +9,6 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/components/simplejson"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
)
const (
@ -17,17 +16,14 @@ const (
)
type timeSeriesQuery struct {
client es.Client
dataQueries []backend.DataQuery
intervalCalculator intervalv2.Calculator
client es.Client
dataQueries []backend.DataQuery
}
var newTimeSeriesQuery = func(client es.Client, dataQuery []backend.DataQuery,
intervalCalculator intervalv2.Calculator) *timeSeriesQuery {
var newTimeSeriesQuery = func(client es.Client, dataQuery []backend.DataQuery) *timeSeriesQuery {
return &timeSeriesQuery{
client: client,
dataQueries: dataQuery,
intervalCalculator: intervalCalculator,
client: client,
dataQueries: dataQuery,
}
}
@ -65,14 +61,9 @@ func (e *timeSeriesQuery) execute() (*backend.QueryDataResponse, error) {
func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64,
result backend.QueryDataResponse) error {
minInterval, err := e.client.GetMinInterval(q.Interval)
if err != nil {
return err
}
interval := e.intervalCalculator.Calculate(e.dataQueries[0].TimeRange, minInterval, q.MaxDataPoints)
defaultTimeField := e.client.GetTimeField()
b := ms.Search(interval)
b := ms.Search(q.Interval)
b.Size(0)
filters := b.Query().Bool().Filter()
filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS)

@ -7,7 +7,6 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -25,7 +24,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"timeField": "@timestamp",
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }],
"metrics": [{"type": "count", "id": "0" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
@ -45,7 +44,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"timeField": "@timestamp",
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "1" }],
"metrics": [{"type": "avg", "id": "0", "settings": {"missing": "null", "script": "1" } }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
@ -63,7 +62,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
@ -84,7 +83,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
],
"metrics": [{"type": "avg", "field": "@value", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
@ -113,7 +112,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
{"type": "count", "id": "1" },
{"type": "avg", "field": "@value", "id": "5" }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
@ -137,7 +136,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
{"type": "count", "id": "1" },
{"type": "avg", "field": "@value", "id": "5" }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -168,7 +167,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"metrics": [
{"type": "count", "id": "1" }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -193,7 +192,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
},
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
@ -219,7 +218,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"metrics": [
{"type": "percentiles", "field": "@value", "id": "1", "settings": { "percents": ["95","99"] } }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -248,7 +247,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"metrics": [
{"type": "extended_stats", "field": "@value", "id": "1", "meta": { "std_deviation": true } }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -279,7 +278,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
{"type": "count", "id": "1" },
{"type": "avg", "field": "@value", "id": "5" }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -305,7 +304,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"metrics": [
{"type": "count", "id": "1" }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
@ -332,7 +331,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
}
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -364,7 +363,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -395,7 +394,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -417,7 +416,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"timeField": "@timestamp",
"bucketAggs": [],
"metrics": [{ "id": "1", "type": "raw_document", "settings": {} }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -430,7 +429,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"timeField": "@timestamp",
"bucketAggs": [],
"metrics": [{ "id": "1", "type": "raw_document", "settings": {} }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -452,7 +451,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"timeField": "@timestamp",
"bucketAggs": [],
"metrics": [{ "id": "1", "type": "raw_data", "settings": {} }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -474,7 +473,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"timeField": "@timestamp",
"bucketAggs": [],
"metrics": [{ "id": "1", "type": "raw_document", "settings": { "size": 1337 } }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -494,7 +493,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -521,7 +520,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -544,7 +543,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -567,7 +566,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -589,7 +588,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -616,7 +615,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -642,7 +641,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
}
],
"metrics": [{"type": "count", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -669,7 +668,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"field": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
@ -705,7 +704,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -742,7 +741,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"field": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -774,7 +773,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -809,7 +808,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"type": "moving_avg"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -846,7 +845,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "Metric to apply moving average"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -872,7 +871,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"metrics": [
{ "id": "2", "type": "top_metrics", "settings": { "order": "desc", "orderBy": "@timestamp", "metrics": ["@value"]} }
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
firstLevel := sr.Aggs[0]
@ -902,7 +901,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -940,7 +939,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -976,7 +975,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "Metric to apply cumulative sum"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1007,7 +1006,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1037,7 +1036,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1066,7 +1065,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"field": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1097,7 +1096,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"settings": { "lag": "5" }
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1128,7 +1127,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"settings": { "lag": "5" }
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1157,7 +1156,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"pipelineAgg": "3"
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1191,7 +1190,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"settings": { "script": "params.var1 * params.var2" }
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1228,7 +1227,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"settings": { "script": "params.var1 * params.var2" }
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1263,7 +1262,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"settings": { "script": "params.var1 * 1000" }
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1297,7 +1296,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
"settings": { "script": "params.var1 * 1000" }
}
]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1318,7 +1317,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"query": "foo"
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
filter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter)
@ -1331,7 +1330,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"query": "foo"
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
filter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter)
@ -1344,7 +1343,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"metrics": [{ "type": "logs", "id": "1"}]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
require.Equal(t, sr.Size, defaultSize)
@ -1377,7 +1376,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"metrics": [{ "type": "logs", "id": "1", "settings": { "limit": 1000 }}]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
require.Equal(t, sr.Size, 1000)
@ -1388,7 +1387,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"metrics": [{ "type": "logs", "id": "1" }]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
require.Equal(t, sr.CustomProps["highlight"], map[string]interface{}{
@ -1431,7 +1430,7 @@ func TestSettingsCasting(t *testing.T) {
}
],
"bucketAggs": [{"type": "date_histogram", "field": "@timestamp", "id": "1"}]
}`, from, to, 15*time.Second)
}`, from, to)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
@ -1475,7 +1474,7 @@ func TestSettingsCasting(t *testing.T) {
}
}
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1510,7 +1509,7 @@ func TestSettingsCasting(t *testing.T) {
}
}
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
@ -1537,7 +1536,7 @@ func TestSettingsCasting(t *testing.T) {
}
}
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1573,7 +1572,7 @@ func TestSettingsCasting(t *testing.T) {
}
}
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1608,7 +1607,7 @@ func TestSettingsCasting(t *testing.T) {
}
}
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1635,7 +1634,7 @@ func TestSettingsCasting(t *testing.T) {
"metrics": [
{ "id": "1", "type": "average", "field": "@value" }
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1672,7 +1671,7 @@ func TestSettingsCasting(t *testing.T) {
}
}
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1694,7 +1693,7 @@ func TestSettingsCasting(t *testing.T) {
"bucketAggs": [
{ "type": "date_histogram", "id": "2", "settings": { "min_doc_count": "1" } }
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1710,7 +1709,7 @@ func TestSettingsCasting(t *testing.T) {
"bucketAggs": [
{ "type": "date_histogram", "id": "2", "field": "@time", "settings": { "min_doc_count": "1" } }
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1726,7 +1725,7 @@ func TestSettingsCasting(t *testing.T) {
"bucketAggs": [
{ "type": "date_histogram", "id": "2", "field": "@time", "settings": { "min_doc_count": "1", "interval": "1d" } }
]
}`, from, to, 15*time.Second)
}`, from, to)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
@ -1774,13 +1773,14 @@ func newDataQuery(body string) (backend.QueryDataRequest, error) {
return backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
JSON: json.RawMessage(body),
JSON: json.RawMessage(body),
Interval: 10 * time.Second,
},
},
}, nil
}
func executeTsdbQuery(c es.Client, body string, from, to time.Time, minInterval time.Duration) (
func executeTsdbQuery(c es.Client, body string, from, to time.Time) (
*backend.QueryDataResponse, error) {
timeRange := backend.TimeRange{
From: from,
@ -1794,6 +1794,6 @@ func executeTsdbQuery(c es.Client, body string, from, to time.Time, minInterval
},
},
}
query := newTimeSeriesQuery(c, dataRequest.Queries, intervalv2.NewCalculator(intervalv2.CalculatorOptions{MinInterval: minInterval}))
query := newTimeSeriesQuery(c, dataRequest.Queries)
return query.execute()
}

Loading…
Cancel
Save