diff --git a/go.mod b/go.mod index fc541b20c2a..2468e344503 100644 --- a/go.mod +++ b/go.mod @@ -111,7 +111,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 // @grafana/alerting-backend github.com/hashicorp/hcl/v2 v2.17.0 // @grafana/alerting-backend github.com/huandu/xstrings v1.5.0 // @grafana/partner-datasources - github.com/influxdata/influxdb-client-go/v2 v2.13.0 // @grafana/observability-metrics + github.com/influxdata/influxdb-client-go/v2 v2.13.0 // @grafana/partner-datasources github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // @grafana/grafana-app-platform-squad github.com/jmespath/go-jmespath v0.4.0 // indirect; @grafana/grafana-backend-group github.com/jmoiron/sqlx v1.3.5 // @grafana/grafana-backend-group @@ -470,6 +470,7 @@ require ( require ( github.com/getkin/kin-openapi v0.128.0 // @grafana/grafana-app-platform-squad github.com/grafana/grafana/apps/playlist v0.0.0-20241105090059-facca37f4d1f // @grafana/grafana-app-platform-squad + github.com/influxdata/influxql v1.4.0 // @grafana/partner-datasources ) require github.com/jmespath-community/go-jmespath v1.1.1 // @grafana/identity-access-team diff --git a/go.sum b/go.sum index 1f27f212e76..c9b6e9ac46b 100644 --- a/go.sum +++ b/go.sum @@ -2528,6 +2528,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.13.0 h1:ioBbLmR5NMbAjP4UVA5r9b5xG github.com/influxdata/influxdb-client-go/v2 v2.13.0/go.mod h1:k+spCbt9hcvqvUiz0sr5D8LolXHqAAOfPw9v/RIRHl4= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/influxdata/influxql v1.4.0 h1:Lf62rbAF8KWQf+4Djqf4hVXgmQuGozUoSD6kNWjye44= +github.com/influxdata/influxql v1.4.0/go.mod h1:VqxAKyQz5p8GzgGsxWalCWYGxEqw6kvJo2IickMQiQk= github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf h1:7JTmneyiNEwVBOHSjoMxiWAqB992atOeepeFYegn5RU= github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI= diff --git a/pkg/tsdb/influxdb/influxql/buffered/response_parser.go b/pkg/tsdb/influxdb/influxql/buffered/response_parser.go index 6d208d324ad..3d63de7cc61 100644 --- a/pkg/tsdb/influxdb/influxql/buffered/response_parser.go +++ b/pkg/tsdb/influxdb/influxql/buffered/response_parser.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/influxdata/influxql" jsoniter "github.com/json-iterator/go" "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util" @@ -346,23 +347,38 @@ func newFrameWithTimeField(row models.Row, column string, colIndex int, query mo func newFrameWithoutTimeField(row models.Row, query models.Query) *data.Frame { var values []*string - for _, valuePair := range row.Values { - if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("CARDINALITY")) { - values = append(values, util.ParseString(valuePair[0])) - } else { - if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) { - if len(valuePair) >= 2 { - values = append(values, util.ParseString(valuePair[1])) - } - } else if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW DIAGNOSTICS")) { - // https://docs.influxdata.com/platform/monitoring/influxdata-platform/tools/show-diagnostics/ - for _, vp := range valuePair { - values = append(values, util.ParseString(vp)) - } - } else { - if len(valuePair) >= 1 { - values = append(values, util.ParseString(valuePair[0])) - } + switch query.Statement.(type) { + case *influxql.ShowMeasurementCardinalityStatement, + *influxql.ShowSeriesCardinalityStatement, + *influxql.ShowFieldKeyCardinalityStatement, + *influxql.ShowTagValuesCardinalityStatement, + *influxql.ShowTagKeyCardinalityStatement: + // Handle all CARDINALITY queries + for _, valuePair := range row.Values { + if len(valuePair) >= 1 { + values = append(values, util.ParseString(valuePair[0])) + } + } + case *influxql.ShowDiagnosticsStatement: + // Handle SHOW DIAGNOSTICS + // https://docs.influxdata.com/platform/monitoring/influxdata-platform/tools/show-diagnostics/ + for _, valuePair := range row.Values { + for _, vp := range valuePair { + values = append(values, util.ParseString(vp)) + } + } + case *influxql.ShowTagValuesStatement: + // Handle SHOW TAG VALUES (non-CARDINALITY) + for _, valuePair := range row.Values { + if len(valuePair) >= 2 { + values = append(values, util.ParseString(valuePair[1])) + } + } + default: + // Handle other queries + for _, valuePair := range row.Values { + if len(valuePair) >= 1 { + values = append(values, util.ParseString(valuePair[0])) } } } diff --git a/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go b/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go index b1ec590c2d5..e2039c390b0 100644 --- a/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go +++ b/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/experimental" + "github.com/influxdata/influxql" "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util" @@ -34,11 +35,13 @@ func readJsonFile(filePath string) io.ReadCloser { } func generateQuery(query, resFormat, alias string) *models.Query { + statement, _ := influxql.ParseStatement(query) return &models.Query{ RawQuery: query, UseRawQuery: true, Alias: alias, ResultFormat: resFormat, + Statement: statement, } } @@ -331,6 +334,13 @@ func TestInfluxdbResponseParser(t *testing.T) { }) }) + t.Run("create frames for tag values and without time column even the query string has cardinality as string", func(t *testing.T) { + res := ResponseParse(readJsonFile("show_tag_values_response"), 200, generateQuery("SHOW TAG VALUES FROM custom_influxdb_cardinality WITH KEY = \"database\"", "time_series", "")) + require.NoError(t, res.Error) + require.Equal(t, "Value", res.Frames[0].Fields[0].Name) + require.Equal(t, "cpu-total", *res.Frames[0].Fields[0].At(0).(*string)) + }) + t.Run("Influxdb response parser with errors", func(t *testing.T) { result := ResponseParse(readJsonFile("error_response"), 200, generateQuery("Test raw query", "time_series", "")) diff --git a/pkg/tsdb/influxdb/influxql/converter/converter.go b/pkg/tsdb/influxdb/influxql/converter/converter.go index fdb49c30902..b0fe001c284 100644 --- a/pkg/tsdb/influxdb/influxql/converter/converter.go +++ b/pkg/tsdb/influxdb/influxql/converter/converter.go @@ -4,12 +4,12 @@ import ( "errors" "fmt" "strconv" - "strings" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" sdkjsoniter "github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter" + "github.com/influxdata/influxql" jsoniter "github.com/json-iterator/go" "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util" @@ -382,8 +382,13 @@ func handleTimeSeriesFormatWithTimeColumn(valueFields data.Fields, tags map[stri } func handleTimeSeriesFormatWithoutTimeColumn(valueFields data.Fields, columns []string, measurement string, query *models.Query) *data.Frame { - // Frame without time column - if strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("CARDINALITY")) { + switch query.Statement.(type) { + case *influxql.ShowMeasurementCardinalityStatement, + *influxql.ShowSeriesCardinalityStatement, + *influxql.ShowFieldKeyCardinalityStatement, + *influxql.ShowTagValuesCardinalityStatement, + *influxql.ShowTagKeyCardinalityStatement: + // Handle all CARDINALITY queries var stringArray []*string for _, v := range valueFields { if f, ok := v.At(0).(*float64); ok { @@ -394,14 +399,15 @@ func handleTimeSeriesFormatWithoutTimeColumn(valueFields data.Fields, columns [] } } return data.NewFrame(measurement, data.NewField("Value", nil, stringArray)) - } - if len(columns) >= 2 && strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) { + + case *influxql.ShowTagValuesStatement: + // Handle SHOW TAG VALUES (non-CARDINALITY) return data.NewFrame(measurement, valueFields[1]) - } - if len(columns) >= 1 { + + default: + // Handle generic queries with at least one column return data.NewFrame(measurement, valueFields[0]) } - return nil } func handleTableFormatFirstFrame(rsp *backend.DataResponse, measurement string, query *models.Query) { diff --git a/pkg/tsdb/influxdb/influxql/influxql.go b/pkg/tsdb/influxdb/influxql/influxql.go index 72053844cbb..b4d03ce6064 100644 --- a/pkg/tsdb/influxdb/influxql/influxql.go +++ b/pkg/tsdb/influxdb/influxql/influxql.go @@ -49,7 +49,7 @@ func Query(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceIn responseLock := sync.Mutex{} err = concurrency.ForEachJob(ctx, len(req.Queries), concurrentQueryCount, func(ctx context.Context, idx int) error { reqQuery := req.Queries[idx] - query, err := models.QueryParse(reqQuery) + query, err := models.QueryParse(reqQuery, logger) if err != nil { return err } @@ -88,7 +88,7 @@ func Query(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceIn } } else { for _, reqQuery := range req.Queries { - query, err := models.QueryParse(reqQuery) + query, err := models.QueryParse(reqQuery, logger) if err != nil { return &backend.QueryDataResponse{}, err } diff --git a/pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go b/pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go index 3c6222ea824..4a56eeaf5ba 100644 --- a/pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go +++ b/pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/grafana/grafana-plugin-sdk-go/experimental" + "github.com/influxdata/influxql" "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/tsdb/influxdb/models" @@ -30,11 +31,13 @@ func readJsonFile(filePath string) io.ReadCloser { } func generateQuery(query, resFormat, alias string) *models.Query { + statement, _ := influxql.ParseStatement(query) return &models.Query{ RawQuery: query, UseRawQuery: true, Alias: alias, ResultFormat: resFormat, + Statement: statement, } } @@ -104,6 +107,13 @@ func TestParsingAsTimeSeriesWithoutTimeColumn(t *testing.T) { runQuery(t, f, "cardinality", "time_series", query) }) + + t.Run("create frames for tag values and without time column even the query string has cardinality as string", func(t *testing.T) { + res := ResponseParse(readJsonFile("show_tag_values_response"), 200, generateQuery("SHOW TAG VALUES FROM custom_influxdb_cardinality WITH KEY = \"database\"", "time_series", "")) + require.NoError(t, res.Error) + require.Equal(t, "Value", res.Frames[0].Fields[0].Name) + require.Equal(t, "cpu-total", *res.Frames[0].Fields[0].At(0).(*string)) + }) } func TestInfluxDBStreamingParser(t *testing.T) { diff --git a/pkg/tsdb/influxdb/models/model_parser.go b/pkg/tsdb/influxdb/models/model_parser.go index 06178f9f85b..011066b4ef9 100644 --- a/pkg/tsdb/influxdb/models/model_parser.go +++ b/pkg/tsdb/influxdb/models/model_parser.go @@ -6,13 +6,15 @@ import ( "time" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/influxdata/influxql" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/infra/log" ) type InfluxdbQueryParser struct{} -func QueryParse(query backend.DataQuery) (*Query, error) { +func QueryParse(query backend.DataQuery, logger log.Logger) (*Query, error) { model, err := simplejson.NewJson(query.JSON) if err != nil { return nil, fmt.Errorf("couldn't unmarshal query") @@ -53,6 +55,14 @@ func QueryParse(query backend.DataQuery) (*Query, error) { interval = minInterval } + var statement influxql.Statement + if useRawQuery { + statement, err = influxql.ParseStatement(rawQuery) + if err != nil { + logger.Debug(fmt.Sprintf("Couldn't parse raw query: %v", err), "rawQuery", rawQuery) + } + } + return &Query{ Measurement: measurement, Policy: policy, @@ -68,6 +78,7 @@ func QueryParse(query backend.DataQuery) (*Query, error) { Slimit: slimit, OrderByTime: orderByTime, ResultFormat: resultFormat, + Statement: statement, }, nil } diff --git a/pkg/tsdb/influxdb/models/model_parser_test.go b/pkg/tsdb/influxdb/models/model_parser_test.go index 2396b110015..1d274a1c3d5 100644 --- a/pkg/tsdb/influxdb/models/model_parser_test.go +++ b/pkg/tsdb/influxdb/models/model_parser_test.go @@ -108,7 +108,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) { Interval: time.Second * 20, } - res, err := QueryParse(query) + res, err := QueryParse(query, nil) require.NoError(t, err) require.Len(t, res.GroupBy, 3) require.Len(t, res.Selects, 3) @@ -140,7 +140,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) { ], "interval": ">10s", "policy": "default", - "query": "RawDummyQuery", + "query": "SELECT \"value\" FROM \"measurement\"", "rawQuery": true, "refId": "A", "resultFormat": "time_series", @@ -171,9 +171,9 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) { Interval: time.Second * 10, } - res, err := QueryParse(query) + res, err := QueryParse(query, nil) require.NoError(t, err) - require.Equal(t, "RawDummyQuery", res.RawQuery) + require.Equal(t, `SELECT "value" FROM "measurement"`, res.RawQuery) require.Len(t, res.GroupBy, 2) require.Len(t, res.Selects, 1) require.Empty(t, res.Tags) @@ -183,7 +183,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) { t.Run("will enforce a minInterval of 1 millisecond", func(t *testing.T) { json := ` { - "query": "RawDummyQuery", + "query": "SELECT \"value\" FROM \"measurement\"", "rawQuery": true, "resultFormat": "time_series" } @@ -194,7 +194,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) { Interval: time.Millisecond * 0, } - res, err := QueryParse(query) + res, err := QueryParse(query, nil) require.NoError(t, err) require.Equal(t, time.Millisecond*1, res.Interval) }) diff --git a/pkg/tsdb/influxdb/models/models.go b/pkg/tsdb/influxdb/models/models.go index 2afab2f848b..2216006cfe4 100644 --- a/pkg/tsdb/influxdb/models/models.go +++ b/pkg/tsdb/influxdb/models/models.go @@ -1,6 +1,10 @@ package models -import "time" +import ( + "time" + + "github.com/influxdata/influxql" +) type Query struct { Measurement string @@ -18,6 +22,7 @@ type Query struct { OrderByTime string RefID string ResultFormat string + Statement influxql.Statement } type Tag struct {