diff --git a/pkg/tsdb/elasticsearch/time_series_query.go b/pkg/tsdb/elasticsearch/time_series_query.go index 3a98455fa17..fb14dc11419 100644 --- a/pkg/tsdb/elasticsearch/time_series_query.go +++ b/pkg/tsdb/elasticsearch/time_series_query.go @@ -172,16 +172,17 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde continue } } else { - if _, err := strconv.Atoi(m.PipelineAggregate); err == nil { + pipelineAggField := getPipelineAggField(m) + if _, err := strconv.Atoi(pipelineAggField); err == nil { var appliedAgg *MetricAgg for _, pipelineMetric := range q.Metrics { - if pipelineMetric.ID == m.PipelineAggregate { + if pipelineMetric.ID == pipelineAggField { appliedAgg = pipelineMetric break } } if appliedAgg != nil { - bucketPath := m.PipelineAggregate + bucketPath := pipelineAggField if appliedAgg.Type == countType { bucketPath = "_count" } @@ -397,3 +398,16 @@ func addGeoHashGridAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBui return aggBuilder } + +func getPipelineAggField(m *MetricAgg) string { + // In frontend we are using Field as pipelineAggField + // There might be historical reason why in backend we were using PipelineAggregate as pipelineAggField + // So for now let's check Field first and then PipelineAggregate to ensure that we are not breaking anything + // TODO: Investigate, if we can remove check for PipelineAggregate + pipelineAggField := m.Field + + if pipelineAggField == "" { + pipelineAggField = m.PipelineAggregate + } + return pipelineAggField +} diff --git a/pkg/tsdb/elasticsearch/time_series_query_test.go b/pkg/tsdb/elasticsearch/time_series_query_test.go index 9ff8b48f11b..2d859858a1c 100644 --- a/pkg/tsdb/elasticsearch/time_series_query_test.go +++ b/pkg/tsdb/elasticsearch/time_series_query_test.go @@ -674,8 +674,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { sr := c.multisearchRequests[0].Requests[0] firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") - // FIXME: Currently this is 1 as movingAvg is completely missing. We have only sum. - // require.Equal(t, len(firstLevel.Aggregation.Aggs), 2) + require.Equal(t, len(firstLevel.Aggregation.Aggs), 2) sumAgg := firstLevel.Aggregation.Aggs[0] require.Equal(t, sumAgg.Key, "3") @@ -683,17 +682,14 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { mAgg := sumAgg.Aggregation.Aggregation.(*es.MetricAggregation) require.Equal(t, mAgg.Field, "@value") - // FIXME: This is currently fully missing - // in the test bellow with pipelineAgg it is working as expected - // movingAvgAgg := firstLevel.Aggregation.Aggs[1] - // require.Equal(t, movingAvgAgg.Key, "2") - // require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg") - // pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation) - // require.Equal(t, pl.BucketPath, "3") + movingAvgAgg := firstLevel.Aggregation.Aggs[1] + require.Equal(t, movingAvgAgg.Key, "2") + require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg") + pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation) + require.Equal(t, pl.BucketPath, "3") }) t.Run("With moving average", func(t *testing.T) { - // This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing. c := newFakeClient() _, err := executeTsdbQuery(c, `{ "timeField": "@timestamp", @@ -753,19 +749,16 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "4") require.Equal(t, firstLevel.Aggregation.Type, "date_histogram") - // FIXME: Currently, movingAvg is completely missing - // in the test bellow with pipelineAgg it is working as expected - // require.Len(t, firstLevel.Aggregation.Aggs, 1) - - // movingAvgAgg := firstLevel.Aggregation.Aggs[0] - // require.Equal(t, movingAvgAgg.Key, "2") - // require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg") - // pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation) - // require.Equal(t, pl.BucketPath, "_count") + require.Len(t, firstLevel.Aggregation.Aggs, 1) + + movingAvgAgg := firstLevel.Aggregation.Aggs[0] + require.Equal(t, movingAvgAgg.Key, "2") + require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg") + pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation) + require.Equal(t, pl.BucketPath, "_count") }) t.Run("With moving average doc count", func(t *testing.T) { - // This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing. c := newFakeClient() _, err := executeTsdbQuery(c, `{ "timeField": "@timestamp", @@ -822,21 +815,18 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "3") - // FIXME: Currently, movingAvg is completely missing - // in the test bellow with pipelineAgg it is working as expected - // require.Len(t, firstLevel.Aggregation.Aggs, 2) + require.Len(t, firstLevel.Aggregation.Aggs, 2) sumAgg := firstLevel.Aggregation.Aggs[0] require.Equal(t, sumAgg.Key, "3") - // movingAvgAgg := firstLevel.Aggregation.Aggs[1] - // require.Equal(t, movingAvgAgg.Key, "2") - // plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation) - // require.Equal(t, plAgg.BucketPath, "3") + movingAvgAgg := firstLevel.Aggregation.Aggs[1] + require.Equal(t, movingAvgAgg.Key, "2") + plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation) + require.Equal(t, plAgg.BucketPath, "3") }) t.Run("With broken moving average", func(t *testing.T) { - // This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing. c := newFakeClient() _, err := executeTsdbQuery(c, `{ "timeField": "@timestamp", @@ -1084,12 +1074,10 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { require.Equal(t, firstLevel.Key, "4") require.Equal(t, firstLevel.Aggregation.Type, "date_histogram") - // FIXME: This is currently fully missing - // in the test above with pipelineAgg it is working as expected - // derivativeAgg := firstLevel.Aggregation.Aggs[0] - // require.Equal(t, derivativeAgg.Key, "2") - // plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation) - // require.Equal(t, plAgg.BucketPath, "_count") + derivativeAgg := firstLevel.Aggregation.Aggs[0] + require.Equal(t, derivativeAgg.Key, "2") + plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation) + require.Equal(t, plAgg.BucketPath, "_count") }) t.Run("With serial_diff", func(t *testing.T) { @@ -1147,13 +1135,11 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { firstLevel := sr.Aggs[0] require.Equal(t, firstLevel.Key, "3") require.Equal(t, firstLevel.Aggregation.Type, "date_histogram") - // FIXME: This is currently fully missing - // in the test above with pipelineAgg it is working as expected - // serialDiffAgg := firstLevel.Aggregation.Aggs[1] - // require.Equal(t, serialDiffAgg.Key, "2") - // plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation) - // require.Equal(t, plAgg.BucketPath, "3") - // require.Equal(t, plAgg.Settings["lag"], "5") + serialDiffAgg := firstLevel.Aggregation.Aggs[1] + require.Equal(t, serialDiffAgg.Key, "2") + plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation) + require.Equal(t, plAgg.BucketPath, "3") + require.Equal(t, plAgg.Settings["lag"], 5.0) }) t.Run("With serial_diff doc count", func(t *testing.T) { @@ -1447,20 +1433,18 @@ func TestSettingsCasting(t *testing.T) { "bucketAggs": [{"type": "date_histogram", "field": "@timestamp", "id": "1"}] }`, from, to, 15*time.Second) require.NoError(t, err) - // FIXME - // This is working correctly if instead of field we use pipelineAgg - // sr := c.multisearchRequests[0].Requests[0] - // movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings + sr := c.multisearchRequests[0].Requests[0] + movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings - // assert.Equal(t, movingAvgSettings["window"], 5) - // assert.Equal(t, movingAvgSettings["predict"], 10) + assert.Equal(t, movingAvgSettings["window"], 5.0) + assert.Equal(t, movingAvgSettings["predict"], 10.0) - // modelSettings := movingAvgSettings["settings"].(map[string]interface{}) + modelSettings := movingAvgSettings["settings"].(map[string]interface{}) - // assert.Equal(t, modelSettings["alpha"], 1) - // assert.Equal(t, modelSettings["beta"], 2) - // assert.Equal(t, modelSettings["gamma"], 3) - // assert.Equal(t, modelSettings["period"], 4) + assert.Equal(t, modelSettings["alpha"], 1.0) + assert.Equal(t, modelSettings["beta"], 2.0) + assert.Equal(t, modelSettings["gamma"], 3.0) + assert.Equal(t, modelSettings["period"], 4.0) }) t.Run("Correctly transforms moving_average settings", func(t *testing.T) { @@ -1528,10 +1512,9 @@ func TestSettingsCasting(t *testing.T) { ] }`, from, to, 15*time.Second) assert.Nil(t, err) - // FIXME This fails, but if we add pipelineAgg it works - // sr := c.multisearchRequests[0].Requests[0] - // serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings - // assert.Equal(t, serialDiffSettings["lag"], 1.) + sr := c.multisearchRequests[0].Requests[0] + serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings + assert.Equal(t, serialDiffSettings["lag"], 1.) }) t.Run("Correctly transforms serial_diff settings", func(t *testing.T) {