From 1302ee48b994cfb2243fbc595459d5ef293e712a Mon Sep 17 00:00:00 2001 From: Andreas Christou Date: Tue, 25 Feb 2025 14:59:58 +0000 Subject: [PATCH] OpenTSDB: Support v2.4 (#100673) * Add version 2.4 to frontend * Update settings and types - Set all properties on backend for consistency * Update query logic to parse new and old format - Minor naming updates - Extract logic for initial frame creation - When parsing old api responses, ensure data is in ascending order - Update tests * Update docs and provisioning file * Fix lint * Update docs/sources/datasources/opentsdb/_index.md Co-authored-by: Larissa Wandzura <126723338+lwandz13@users.noreply.github.com> * Update docs/sources/datasources/opentsdb/_index.md Co-authored-by: Larissa Wandzura <126723338+lwandz13@users.noreply.github.com> * Review nit --------- Co-authored-by: Larissa Wandzura <126723338+lwandz13@users.noreply.github.com> --- devenv/datasources.yaml | 8 + docs/sources/datasources/opentsdb/_index.md | 8 +- pkg/tsdb/opentsdb/opentsdb.go | 148 +++++++++++++----- pkg/tsdb/opentsdb/opentsdb_test.go | 129 ++++++++++++++- pkg/tsdb/opentsdb/types.go | 15 +- .../opentsdb/components/OpenTsdbDetails.tsx | 1 + 6 files changed, 263 insertions(+), 46 deletions(-) diff --git a/devenv/datasources.yaml b/devenv/datasources.yaml index 452d2d3d1bc..00efa243c1c 100644 --- a/devenv/datasources.yaml +++ b/devenv/datasources.yaml @@ -136,6 +136,14 @@ datasources: tsdbResolution: 1 tsdbVersion: 3 + - name: gdev-opentsdb-v2.4 + type: opentsdb + access: proxy + url: http://localhost:4242 + jsonData: + tsdbResolution: 1 + tsdbVersion: 4 + - name: gdev-elasticsearch type: elasticsearch uid: gdev-elasticsearch diff --git a/docs/sources/datasources/opentsdb/_index.md b/docs/sources/datasources/opentsdb/_index.md index 560ba731bb5..07a5a66add0 100644 --- a/docs/sources/datasources/opentsdb/_index.md +++ b/docs/sources/datasources/opentsdb/_index.md @@ -62,7 +62,7 @@ To configure basic settings for the data source, complete the following steps: | **Default** | Default data source that will be be pre-selected for new panels. | | **URL** | The HTTP protocol, IP, and port of your OpenTSDB server (default port is usually 4242). | | **Allowed cookies** | Listing of cookies to forward to the data source. | -| **Version** | The OpenTSDB version. | +| **Version** | The OpenTSDB version (supported versions are: 2.4, 2.3, 2.2 and versions less than 2.1). | | **Resolution** | Metrics from OpenTSDB may have data points with either second or millisecond resolution. | | **Lookup limit** | Default is 1000. | @@ -98,9 +98,13 @@ can be used to query OpenTSDB. Fill Policy is also introduced in OpenTSDB 2.2. While using OpenTSDB 2.2 data source, make sure you use either Filters or Tags as they are mutually exclusive. If used together, might give you weird results. {{% /admonition %}} +{{% admonition type="note" %}} +When using OpenTSDB 2.4 with alerting, queries are executed with the parameter `arrays=true`. This causes OpenTSDB to return data points as an array of arrays instead of a map of key-value pairs. Grafana then converts this data into the appropriate data frame format. +{{% /admonition %}} + ### Auto complete suggestions -As soon as you start typing metric names, tag names and tag values , you should see highlighted auto complete suggestions for them. +As you begin typing metric names, tag names, or tag values, highlighted autocomplete suggestions will appear. The autocomplete only works if the OpenTSDB suggest API is enabled. ## Templating queries diff --git a/pkg/tsdb/opentsdb/opentsdb.go b/pkg/tsdb/opentsdb/opentsdb.go index 15a1fe27046..798b27e57ab 100644 --- a/pkg/tsdb/opentsdb/opentsdb.go +++ b/pkg/tsdb/opentsdb/opentsdb.go @@ -8,6 +8,8 @@ import ( "net/http" "net/url" "path" + "sort" + "strconv" "strings" "time" @@ -35,12 +37,21 @@ func ProvideService(httpClientProvider httpclient.Provider) *Service { } type datasourceInfo struct { - HTTPClient *http.Client - URL string + HTTPClient *http.Client + URL string + TSDBVersion float32 + TSDBResolution int32 + LookupLimit int32 } type DsAccess string +type JSONData struct { + TSDBVersion float32 `json:"tsdbVersion"` + TSDBResolution int32 `json:"tsdbResolution"` + LookupLimit int32 `json:"lookupLimit"` +} + func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc { return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { opts, err := settings.HTTPClientOptions(ctx) @@ -53,9 +64,18 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst return nil, err } + jsonData := JSONData{} + err = json.Unmarshal(settings.JSONData, &jsonData) + if err != nil { + return nil, fmt.Errorf("error reading settings: %w", err) + } + model := &datasourceInfo{ - HTTPClient: client, - URL: settings.URL, + HTTPClient: client, + URL: settings.URL, + TSDBVersion: jsonData.TSDBVersion, + TSDBResolution: jsonData.TSDBResolution, + LookupLimit: jsonData.LookupLimit, } return model, nil @@ -69,7 +89,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) q := req.Queries[0] - myRefID := q.RefID + refID := q.RefID tsdbQuery.Start = q.TimeRange.From.UnixNano() / int64(time.Millisecond) tsdbQuery.End = q.TimeRange.To.UnixNano() / int64(time.Millisecond) @@ -106,7 +126,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) } }() - result, err := s.parseResponse(logger, res, myRefID) + result, err := s.parseResponse(logger, res, refID, dsInfo.TSDBVersion) if err != nil { return &backend.QueryDataResponse{}, err } @@ -120,9 +140,11 @@ func (s *Service) createRequest(ctx context.Context, logger log.Logger, dsInfo * return nil, err } u.Path = path.Join(u.Path, "api/query") - queryParams := u.Query() - queryParams.Set("arrays", "true") - u.RawQuery = queryParams.Encode() + if dsInfo.TSDBVersion == 4 { + queryParams := u.Query() + queryParams.Set("arrays", "true") + u.RawQuery = queryParams.Encode() + } postData, err := json.Marshal(data) if err != nil { @@ -140,7 +162,67 @@ func (s *Service) createRequest(ctx context.Context, logger log.Logger, dsInfo * return req, nil } -func (s *Service) parseResponse(logger log.Logger, res *http.Response, myRefID string) (*backend.QueryDataResponse, error) { +func createInitialFrame(val OpenTsdbCommon, length int, refID string) *data.Frame { + labels := data.Labels{} + for label, value := range val.Tags { + labels[label] = value + } + + frame := data.NewFrameOfFieldTypes(val.Metric, length, data.FieldTypeTime, data.FieldTypeFloat64) + frame.Meta = &data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti, TypeVersion: data.FrameTypeVersion{0, 1}} + frame.RefID = refID + timeField := frame.Fields[0] + timeField.Name = data.TimeSeriesTimeFieldName + dataField := frame.Fields[1] + dataField.Name = val.Metric + dataField.Labels = labels + + return frame +} + +// Parse response function for OpenTSDB version 2.4 +func parseResponse24(responseData []OpenTsdbResponse24, refID string, frames data.Frames) data.Frames { + for _, val := range responseData { + frame := createInitialFrame(val.OpenTsdbCommon, len(val.DataPoints), refID) + + for i, point := range val.DataPoints { + frame.SetRow(i, time.Unix(int64(point[0]), 0).UTC(), point[1]) + } + + frames = append(frames, frame) + } + + return frames +} + +// Parse response function for OpenTSDB versions < 2.4 +func parseResponseLT24(responseData []OpenTsdbResponse, refID string, frames data.Frames) (data.Frames, error) { + for _, val := range responseData { + frame := createInitialFrame(val.OpenTsdbCommon, len(val.DataPoints), refID) + + // Order the timestamps in ascending order to avoid issues like https://github.com/grafana/grafana/issues/38729 + timestamps := make([]string, 0, len(val.DataPoints)) + for timestamp := range val.DataPoints { + timestamps = append(timestamps, timestamp) + } + sort.Strings(timestamps) + + for i, timeString := range timestamps { + timestamp, err := strconv.ParseInt(timeString, 10, 64) + if err != nil { + logger.Info("Failed to unmarshal opentsdb timestamp", "timestamp", timeString) + return frames, err + } + frame.SetRow(i, time.Unix(timestamp, 0).UTC(), val.DataPoints[timeString]) + } + + frames = append(frames, frame) + } + + return frames, nil +} + +func (s *Service) parseResponse(logger log.Logger, res *http.Response, refID string, tsdbVersion float32) (*backend.QueryDataResponse, error) { resp := backend.NewQueryDataResponse() body, err := io.ReadAll(res.Body) @@ -158,38 +240,34 @@ func (s *Service) parseResponse(logger log.Logger, res *http.Response, myRefID s return nil, fmt.Errorf("request failed, status: %s", res.Status) } + frames := data.Frames{} + var responseData []OpenTsdbResponse - err = json.Unmarshal(body, &responseData) - if err != nil { - logger.Info("Failed to unmarshal opentsdb response", "error", err, "status", res.Status, "body", string(body)) - return nil, err - } + var responseData24 []OpenTsdbResponse24 + if tsdbVersion == 4 { + err = json.Unmarshal(body, &responseData24) + if err != nil { + logger.Info("Failed to unmarshal opentsdb response", "error", err, "status", res.Status, "body", string(body)) + return nil, err + } - frames := data.Frames{} - for _, val := range responseData { - labels := data.Labels{} - for label, value := range val.Tags { - labels[label] = value + frames = parseResponse24(responseData24, refID, frames) + } else { + err = json.Unmarshal(body, &responseData) + if err != nil { + logger.Info("Failed to unmarshal opentsdb response", "error", err, "status", res.Status, "body", string(body)) + return nil, err } - frame := data.NewFrameOfFieldTypes(val.Metric, len(val.DataPoints), data.FieldTypeTime, data.FieldTypeFloat64) - frame.Meta = &data.FrameMeta{Type: data.FrameTypeTimeSeriesMulti, TypeVersion: data.FrameTypeVersion{0, 1}} - frame.RefID = myRefID - timeField := frame.Fields[0] - timeField.Name = data.TimeSeriesTimeFieldName - dataField := frame.Fields[1] - dataField.Name = "value" - dataField.Labels = labels - - points := val.DataPoints - for i, point := range points { - frame.SetRow(i, time.Unix(int64(point[0]), 0).UTC(), point[1]) + frames, err = parseResponseLT24(responseData, refID, frames) + if err != nil { + return nil, err } - frames = append(frames, frame) } - result := resp.Responses[myRefID] + + result := resp.Responses[refID] result.Frames = frames - resp.Responses[myRefID] = result + resp.Responses[refID] = result return resp, nil } diff --git a/pkg/tsdb/opentsdb/opentsdb_test.go b/pkg/tsdb/opentsdb/opentsdb_test.go index 6c5e142c250..44c21f6df15 100644 --- a/pkg/tsdb/opentsdb/opentsdb_test.go +++ b/pkg/tsdb/opentsdb/opentsdb_test.go @@ -33,12 +33,13 @@ func TestOpenTsdbExecutor(t *testing.T) { t.Run("Parse response should handle invalid JSON", func(t *testing.T) { response := `{ invalid }` - result, err := service.parseResponse(logger, &http.Response{Body: io.NopCloser(strings.NewReader(response))}, "A") + tsdbVersion := float32(4) + result, err := service.parseResponse(logger, &http.Response{Body: io.NopCloser(strings.NewReader(response))}, "A", tsdbVersion) require.Nil(t, result) require.Error(t, err) }) - t.Run("Parse response should handle JSON", func(t *testing.T) { + t.Run("Parse response should handle JSON (v2.4 and above)", func(t *testing.T) { response := ` [ { @@ -57,7 +58,7 @@ func TestOpenTsdbExecutor(t *testing.T) { data.NewField("Time", nil, []time.Time{ time.Date(2014, 7, 16, 20, 55, 46, 0, time.UTC), }), - data.NewField("value", map[string]string{"env": "prod", "app": "grafana"}, []float64{ + data.NewField("test", map[string]string{"env": "prod", "app": "grafana"}, []float64{ 50}), ) testFrame.Meta = &data.FrameMeta{ @@ -65,10 +66,124 @@ func TestOpenTsdbExecutor(t *testing.T) { TypeVersion: data.FrameTypeVersion{0, 1}, } testFrame.RefID = "A" + tsdbVersion := float32(4) resp := http.Response{Body: io.NopCloser(strings.NewReader(response))} resp.StatusCode = 200 - result, err := service.parseResponse(logger, &resp, "A") + result, err := service.parseResponse(logger, &resp, "A", tsdbVersion) + require.NoError(t, err) + + frame := result.Responses["A"] + + if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" { + t.Errorf("Result mismatch (-want +got):\n%s", diff) + } + }) + + t.Run("Parse response should handle JSON (v2.3 and below)", func(t *testing.T) { + response := ` + [ + { + "metric": "test", + "dps": { + "1405544146": 50.0 + }, + "tags" : { + "env": "prod", + "app": "grafana" + } + } + ]` + + testFrame := data.NewFrame("test", + data.NewField("Time", nil, []time.Time{ + time.Date(2014, 7, 16, 20, 55, 46, 0, time.UTC), + }), + data.NewField("test", map[string]string{"env": "prod", "app": "grafana"}, []float64{ + 50}), + ) + testFrame.Meta = &data.FrameMeta{ + Type: data.FrameTypeTimeSeriesMulti, + TypeVersion: data.FrameTypeVersion{0, 1}, + } + testFrame.RefID = "A" + tsdbVersion := float32(3) + + resp := http.Response{Body: io.NopCloser(strings.NewReader(response))} + resp.StatusCode = 200 + result, err := service.parseResponse(logger, &resp, "A", tsdbVersion) + require.NoError(t, err) + + frame := result.Responses["A"] + + if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" { + t.Errorf("Result mismatch (-want +got):\n%s", diff) + } + }) + + t.Run("Parse response should handle unordered JSON (v2.3 and below)", func(t *testing.T) { + response := ` + [ + { + "metric": "test", + "dps": { + "1405094109": 55.0, + "1405124146": 124.0, + "1405124212": 1284.0, + "1405019246": 50.0, + "1408352146": 812.0, + "1405534153": 153.0, + "1405124397": 9035.0, + "1401234774": 215.0, + "1409712532": 356.0, + "1491523811": 8953.0, + "1405239823": 258.0 + }, + "tags" : { + "env": "prod", + "app": "grafana" + } + } + ]` + + testFrame := data.NewFrame("test", + data.NewField("Time", nil, []time.Time{ + time.Date(2014, 5, 27, 23, 52, 54, 0, time.UTC), + time.Date(2014, 7, 10, 19, 7, 26, 0, time.UTC), + time.Date(2014, 7, 11, 15, 55, 9, 0, time.UTC), + time.Date(2014, 7, 12, 0, 15, 46, 0, time.UTC), + time.Date(2014, 7, 12, 0, 16, 52, 0, time.UTC), + time.Date(2014, 7, 12, 0, 19, 57, 0, time.UTC), + time.Date(2014, 7, 13, 8, 23, 43, 0, time.UTC), + time.Date(2014, 7, 16, 18, 9, 13, 0, time.UTC), + time.Date(2014, 8, 18, 8, 55, 46, 0, time.UTC), + time.Date(2014, 9, 3, 2, 48, 52, 0, time.UTC), + time.Date(2017, 4, 7, 0, 10, 11, 0, time.UTC), + }), + data.NewField("test", map[string]string{"env": "prod", "app": "grafana"}, []float64{ + 215, + 50, + 55, + 124, + 1284, + 9035, + 258, + 153, + 812, + 356, + 8953, + }), + ) + testFrame.Meta = &data.FrameMeta{ + Type: data.FrameTypeTimeSeriesMulti, + TypeVersion: data.FrameTypeVersion{0, 1}, + } + testFrame.RefID = "A" + tsdbVersion := float32(3) + + resp := http.Response{Body: io.NopCloser(strings.NewReader(response))} + resp.StatusCode = 200 + result, err := service.parseResponse(logger, &resp, "A", tsdbVersion) require.NoError(t, err) frame := result.Responses["A"] @@ -99,7 +214,7 @@ func TestOpenTsdbExecutor(t *testing.T) { data.NewField("Time", nil, []time.Time{ time.Date(2014, 7, 16, 20, 55, 46, 0, time.UTC), }), - data.NewField("value", map[string]string{"env": "prod", "app": "grafana"}, []float64{ + data.NewField("test", map[string]string{"env": "prod", "app": "grafana"}, []float64{ 50}), ) testFrame.Meta = &data.FrameMeta{ @@ -108,9 +223,11 @@ func TestOpenTsdbExecutor(t *testing.T) { } testFrame.RefID = myRefid + tsdbVersion := float32(4) + resp := http.Response{Body: io.NopCloser(strings.NewReader(response))} resp.StatusCode = 200 - result, err := service.parseResponse(logger, &resp, myRefid) + result, err := service.parseResponse(logger, &resp, myRefid, tsdbVersion) require.NoError(t, err) if diff := cmp.Diff(testFrame, result.Responses[myRefid].Frames[0], data.FrameTestCompareOptions()...); diff != "" { diff --git a/pkg/tsdb/opentsdb/types.go b/pkg/tsdb/opentsdb/types.go index 171a24e2067..19d2ba75197 100644 --- a/pkg/tsdb/opentsdb/types.go +++ b/pkg/tsdb/opentsdb/types.go @@ -6,8 +6,17 @@ type OpenTsdbQuery struct { Queries []map[string]any `json:"queries"` } +type OpenTsdbCommon struct { + Metric string `json:"metric"` + Tags map[string]string `json:"tags"` +} + type OpenTsdbResponse struct { - Metric string `json:"metric"` - Tags map[string]string `json:"tags"` - DataPoints [][]float64 `json:"dps"` + OpenTsdbCommon + DataPoints map[string]float64 `json:"dps"` +} + +type OpenTsdbResponse24 struct { + OpenTsdbCommon + DataPoints [][]float64 `json:"dps"` } diff --git a/public/app/plugins/datasource/opentsdb/components/OpenTsdbDetails.tsx b/public/app/plugins/datasource/opentsdb/components/OpenTsdbDetails.tsx index ba45bde7b29..2c913d55014 100644 --- a/public/app/plugins/datasource/opentsdb/components/OpenTsdbDetails.tsx +++ b/public/app/plugins/datasource/opentsdb/components/OpenTsdbDetails.tsx @@ -9,6 +9,7 @@ const tsdbVersions = [ { label: '<=2.1', value: 1 }, { label: '==2.2', value: 2 }, { label: '==2.3', value: 3 }, + { label: '==2.4', value: 4 }, ]; const tsdbResolutions = [