InfluxDB: Refactor frame and field creation (#97635)

* refactor frame and field creation

* use influxql package to get the type of the query

* remove unnecessary tests

* add influxql in go.mod

* fix unit test

* update ownership

* update query expression
pull/98322/head
ismail simsek 6 months ago committed by GitHub
parent 293a90f76f
commit 58d17ecad1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      go.mod
  2. 2
      go.sum
  3. 50
      pkg/tsdb/influxdb/influxql/buffered/response_parser.go
  4. 10
      pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go
  5. 22
      pkg/tsdb/influxdb/influxql/converter/converter.go
  6. 4
      pkg/tsdb/influxdb/influxql/influxql.go
  7. 10
      pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go
  8. 13
      pkg/tsdb/influxdb/models/model_parser.go
  9. 12
      pkg/tsdb/influxdb/models/model_parser_test.go
  10. 7
      pkg/tsdb/influxdb/models/models.go

@ -111,7 +111,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7 // @grafana/alerting-backend
github.com/hashicorp/hcl/v2 v2.17.0 // @grafana/alerting-backend
github.com/huandu/xstrings v1.5.0 // @grafana/partner-datasources
github.com/influxdata/influxdb-client-go/v2 v2.13.0 // @grafana/observability-metrics
github.com/influxdata/influxdb-client-go/v2 v2.13.0 // @grafana/partner-datasources
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // @grafana/grafana-app-platform-squad
github.com/jmespath/go-jmespath v0.4.0 // indirect; @grafana/grafana-backend-group
github.com/jmoiron/sqlx v1.3.5 // @grafana/grafana-backend-group
@ -470,6 +470,7 @@ require (
require (
github.com/getkin/kin-openapi v0.128.0 // @grafana/grafana-app-platform-squad
github.com/grafana/grafana/apps/playlist v0.0.0-20241105090059-facca37f4d1f // @grafana/grafana-app-platform-squad
github.com/influxdata/influxql v1.4.0 // @grafana/partner-datasources
)
require github.com/jmespath-community/go-jmespath v1.1.1 // @grafana/identity-access-team

@ -2528,6 +2528,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.13.0 h1:ioBbLmR5NMbAjP4UVA5r9b5xG
github.com/influxdata/influxdb-client-go/v2 v2.13.0/go.mod h1:k+spCbt9hcvqvUiz0sr5D8LolXHqAAOfPw9v/RIRHl4=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/influxql v1.4.0 h1:Lf62rbAF8KWQf+4Djqf4hVXgmQuGozUoSD6kNWjye44=
github.com/influxdata/influxql v1.4.0/go.mod h1:VqxAKyQz5p8GzgGsxWalCWYGxEqw6kvJo2IickMQiQk=
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf h1:7JTmneyiNEwVBOHSjoMxiWAqB992atOeepeFYegn5RU=
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI=

@ -9,6 +9,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/influxdata/influxql"
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util"
@ -346,23 +347,38 @@ func newFrameWithTimeField(row models.Row, column string, colIndex int, query mo
func newFrameWithoutTimeField(row models.Row, query models.Query) *data.Frame {
var values []*string
for _, valuePair := range row.Values {
if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("CARDINALITY")) {
values = append(values, util.ParseString(valuePair[0]))
} else {
if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) {
if len(valuePair) >= 2 {
values = append(values, util.ParseString(valuePair[1]))
}
} else if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW DIAGNOSTICS")) {
// https://docs.influxdata.com/platform/monitoring/influxdata-platform/tools/show-diagnostics/
for _, vp := range valuePair {
values = append(values, util.ParseString(vp))
}
} else {
if len(valuePair) >= 1 {
values = append(values, util.ParseString(valuePair[0]))
}
switch query.Statement.(type) {
case *influxql.ShowMeasurementCardinalityStatement,
*influxql.ShowSeriesCardinalityStatement,
*influxql.ShowFieldKeyCardinalityStatement,
*influxql.ShowTagValuesCardinalityStatement,
*influxql.ShowTagKeyCardinalityStatement:
// Handle all CARDINALITY queries
for _, valuePair := range row.Values {
if len(valuePair) >= 1 {
values = append(values, util.ParseString(valuePair[0]))
}
}
case *influxql.ShowDiagnosticsStatement:
// Handle SHOW DIAGNOSTICS
// https://docs.influxdata.com/platform/monitoring/influxdata-platform/tools/show-diagnostics/
for _, valuePair := range row.Values {
for _, vp := range valuePair {
values = append(values, util.ParseString(vp))
}
}
case *influxql.ShowTagValuesStatement:
// Handle SHOW TAG VALUES (non-CARDINALITY)
for _, valuePair := range row.Values {
if len(valuePair) >= 2 {
values = append(values, util.ParseString(valuePair[1]))
}
}
default:
// Handle other queries
for _, valuePair := range row.Values {
if len(valuePair) >= 1 {
values = append(values, util.ParseString(valuePair[0]))
}
}
}

@ -13,6 +13,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/influxdata/influxql"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util"
@ -34,11 +35,13 @@ func readJsonFile(filePath string) io.ReadCloser {
}
func generateQuery(query, resFormat, alias string) *models.Query {
statement, _ := influxql.ParseStatement(query)
return &models.Query{
RawQuery: query,
UseRawQuery: true,
Alias: alias,
ResultFormat: resFormat,
Statement: statement,
}
}
@ -331,6 +334,13 @@ func TestInfluxdbResponseParser(t *testing.T) {
})
})
t.Run("create frames for tag values and without time column even the query string has cardinality as string", func(t *testing.T) {
res := ResponseParse(readJsonFile("show_tag_values_response"), 200, generateQuery("SHOW TAG VALUES FROM custom_influxdb_cardinality WITH KEY = \"database\"", "time_series", ""))
require.NoError(t, res.Error)
require.Equal(t, "Value", res.Frames[0].Fields[0].Name)
require.Equal(t, "cpu-total", *res.Frames[0].Fields[0].At(0).(*string))
})
t.Run("Influxdb response parser with errors", func(t *testing.T) {
result := ResponseParse(readJsonFile("error_response"), 200, generateQuery("Test raw query", "time_series", ""))

@ -4,12 +4,12 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
sdkjsoniter "github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter"
"github.com/influxdata/influxql"
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util"
@ -382,8 +382,13 @@ func handleTimeSeriesFormatWithTimeColumn(valueFields data.Fields, tags map[stri
}
func handleTimeSeriesFormatWithoutTimeColumn(valueFields data.Fields, columns []string, measurement string, query *models.Query) *data.Frame {
// Frame without time column
if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("CARDINALITY")) {
switch query.Statement.(type) {
case *influxql.ShowMeasurementCardinalityStatement,
*influxql.ShowSeriesCardinalityStatement,
*influxql.ShowFieldKeyCardinalityStatement,
*influxql.ShowTagValuesCardinalityStatement,
*influxql.ShowTagKeyCardinalityStatement:
// Handle all CARDINALITY queries
var stringArray []*string
for _, v := range valueFields {
if f, ok := v.At(0).(*float64); ok {
@ -394,14 +399,15 @@ func handleTimeSeriesFormatWithoutTimeColumn(valueFields data.Fields, columns []
}
}
return data.NewFrame(measurement, data.NewField("Value", nil, stringArray))
}
if len(columns) >= 2 && strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) {
case *influxql.ShowTagValuesStatement:
// Handle SHOW TAG VALUES (non-CARDINALITY)
return data.NewFrame(measurement, valueFields[1])
}
if len(columns) >= 1 {
default:
// Handle generic queries with at least one column
return data.NewFrame(measurement, valueFields[0])
}
return nil
}
func handleTableFormatFirstFrame(rsp *backend.DataResponse, measurement string, query *models.Query) {

@ -49,7 +49,7 @@ func Query(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceIn
responseLock := sync.Mutex{}
err = concurrency.ForEachJob(ctx, len(req.Queries), concurrentQueryCount, func(ctx context.Context, idx int) error {
reqQuery := req.Queries[idx]
query, err := models.QueryParse(reqQuery)
query, err := models.QueryParse(reqQuery, logger)
if err != nil {
return err
}
@ -88,7 +88,7 @@ func Query(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceIn
}
} else {
for _, reqQuery := range req.Queries {
query, err := models.QueryParse(reqQuery)
query, err := models.QueryParse(reqQuery, logger)
if err != nil {
return &backend.QueryDataResponse{}, err
}

@ -10,6 +10,7 @@ import (
"testing"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/influxdata/influxql"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
@ -30,11 +31,13 @@ func readJsonFile(filePath string) io.ReadCloser {
}
func generateQuery(query, resFormat, alias string) *models.Query {
statement, _ := influxql.ParseStatement(query)
return &models.Query{
RawQuery: query,
UseRawQuery: true,
Alias: alias,
ResultFormat: resFormat,
Statement: statement,
}
}
@ -104,6 +107,13 @@ func TestParsingAsTimeSeriesWithoutTimeColumn(t *testing.T) {
runQuery(t, f, "cardinality", "time_series", query)
})
t.Run("create frames for tag values and without time column even the query string has cardinality as string", func(t *testing.T) {
res := ResponseParse(readJsonFile("show_tag_values_response"), 200, generateQuery("SHOW TAG VALUES FROM custom_influxdb_cardinality WITH KEY = \"database\"", "time_series", ""))
require.NoError(t, res.Error)
require.Equal(t, "Value", res.Frames[0].Fields[0].Name)
require.Equal(t, "cpu-total", *res.Frames[0].Fields[0].At(0).(*string))
})
}
func TestInfluxDBStreamingParser(t *testing.T) {

@ -6,13 +6,15 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/influxdata/influxql"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
)
type InfluxdbQueryParser struct{}
func QueryParse(query backend.DataQuery) (*Query, error) {
func QueryParse(query backend.DataQuery, logger log.Logger) (*Query, error) {
model, err := simplejson.NewJson(query.JSON)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal query")
@ -53,6 +55,14 @@ func QueryParse(query backend.DataQuery) (*Query, error) {
interval = minInterval
}
var statement influxql.Statement
if useRawQuery {
statement, err = influxql.ParseStatement(rawQuery)
if err != nil {
logger.Debug(fmt.Sprintf("Couldn't parse raw query: %v", err), "rawQuery", rawQuery)
}
}
return &Query{
Measurement: measurement,
Policy: policy,
@ -68,6 +78,7 @@ func QueryParse(query backend.DataQuery) (*Query, error) {
Slimit: slimit,
OrderByTime: orderByTime,
ResultFormat: resultFormat,
Statement: statement,
}, nil
}

@ -108,7 +108,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) {
Interval: time.Second * 20,
}
res, err := QueryParse(query)
res, err := QueryParse(query, nil)
require.NoError(t, err)
require.Len(t, res.GroupBy, 3)
require.Len(t, res.Selects, 3)
@ -140,7 +140,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) {
],
"interval": ">10s",
"policy": "default",
"query": "RawDummyQuery",
"query": "SELECT \"value\" FROM \"measurement\"",
"rawQuery": true,
"refId": "A",
"resultFormat": "time_series",
@ -171,9 +171,9 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) {
Interval: time.Second * 10,
}
res, err := QueryParse(query)
res, err := QueryParse(query, nil)
require.NoError(t, err)
require.Equal(t, "RawDummyQuery", res.RawQuery)
require.Equal(t, `SELECT "value" FROM "measurement"`, res.RawQuery)
require.Len(t, res.GroupBy, 2)
require.Len(t, res.Selects, 1)
require.Empty(t, res.Tags)
@ -183,7 +183,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) {
t.Run("will enforce a minInterval of 1 millisecond", func(t *testing.T) {
json := `
{
"query": "RawDummyQuery",
"query": "SELECT \"value\" FROM \"measurement\"",
"rawQuery": true,
"resultFormat": "time_series"
}
@ -194,7 +194,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) {
Interval: time.Millisecond * 0,
}
res, err := QueryParse(query)
res, err := QueryParse(query, nil)
require.NoError(t, err)
require.Equal(t, time.Millisecond*1, res.Interval)
})

@ -1,6 +1,10 @@
package models
import "time"
import (
"time"
"github.com/influxdata/influxql"
)
type Query struct {
Measurement string
@ -18,6 +22,7 @@ type Query struct {
OrderByTime string
RefID string
ResultFormat string
Statement influxql.Statement
}
type Tag struct {

Loading…
Cancel
Save