InfluxDB: Fix backend mode table result with aliases (#69943)

Co-authored-by: ludovio <ludovic.viaud@gmail.com>
ismail simsek 2 years ago committed by GitHub
parent 4217c8057b
commit 80c432e524
  1. pkg/tsdb/influxdb/response_parser.go (195)
  2. public/app/plugins/datasource/influxdb/response_parser.test.ts (562)
  3. public/app/plugins/datasource/influxdb/response_parser.ts (7)

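In backend mode, table-formatted results ignored SELECT aliases: getSelectedParams pushed the aggregation type ('mean') for every select clause, so aliased columns were mislabeled. This change makes the frontend parser prefer the alias param when one is present (a sketch of the rule follows the final hunk below) and splits the Go transformRows body into newFrameWithTimeField and newFrameWithoutTimeField helpers.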
@@ -70,7 +70,7 @@ func transformRows(rows []Row, query Query) data.Frames {
}
frames := make([]*data.Frame, 0, len(rows)+cols)
// frameName is pre-allocated so we can reuse it, saving memory.
// frameName is pre-allocated. So we can reuse it, saving memory.
// It's sized for a reasonably-large name, but will grow if needed.
frameName := make([]byte, 0, 128)
@@ -87,111 +87,118 @@ func transformRows(rows []Row, query Query) data.Frames {
}
if !hasTimeCol {
var values []string
if retentionPolicyQuery {
values = make([]string, 1, len(row.Values))
} else {
values = make([]string, 0, len(row.Values))
}
for _, valuePair := range row.Values {
if tagValuesQuery {
if len(valuePair) >= 2 {
values = append(values, valuePair[1].(string))
}
} else if retentionPolicyQuery {
// We want to know whether the given retention policy is the default one.
// If it is the default policy, we add it to the beginning.
// Index 4 tells us whether a policy is the default:
// https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-retention-policies
// The only difference is v0.9: in that version we don't receive the shardGroupDuration value.
// https://archive.docs.influxdata.com/influxdb/v0.9/query_language/schema_exploration/#show-retention-policies
// Since the default flag is always the last value, we always check the last value.
if len(valuePair) >= 1 {
if valuePair[len(row.Columns)-1].(bool) {
values[0] = valuePair[0].(string)
} else {
values = append(values, valuePair[0].(string))
}
}
} else {
if len(valuePair) >= 1 {
values = append(values, valuePair[0].(string))
}
}
}
field := data.NewField("Value", nil, values)
frames = append(frames, data.NewFrame(row.Name, field))
newFrame := newFrameWithoutTimeField(row, retentionPolicyQuery, tagValuesQuery)
frames = append(frames, newFrame)
} else {
for colIndex, column := range row.Columns {
if column == "time" {
continue
}
newFrame := newFrameWithTimeField(row, column, colIndex, query, frameName)
frames = append(frames, newFrame)
}
}
}
var timeArray []time.Time
var floatArray []*float64
var stringArray []*string
var boolArray []*bool
valType := typeof(row.Values, colIndex)
for _, valuePair := range row.Values {
timestamp, timestampErr := parseTimestamp(valuePair[0])
// we only add this row if the timestamp is valid
if timestampErr == nil {
timeArray = append(timeArray, timestamp)
switch valType {
case "string":
{
value, chk := valuePair[colIndex].(string)
if chk {
stringArray = append(stringArray, &value)
} else {
stringArray = append(stringArray, nil)
}
}
case "json.Number":
value := parseNumber(valuePair[colIndex])
floatArray = append(floatArray, value)
case "bool":
value, chk := valuePair[colIndex].(bool)
if chk {
boolArray = append(boolArray, &value)
} else {
boolArray = append(boolArray, nil)
}
case "null":
floatArray = append(floatArray, nil)
}
}
}
return frames
}
name := string(formatFrameName(row, column, query, frameName[:]))
timeField := data.NewField("Time", nil, timeArray)
if valType == "string" {
valueField := data.NewField("Value", row.Tags, stringArray)
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField))
} else if valType == "json.Number" {
valueField := data.NewField("Value", row.Tags, floatArray)
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField))
} else if valType == "bool" {
valueField := data.NewField("Value", row.Tags, boolArray)
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField))
} else if valType == "null" {
valueField := data.NewField("Value", row.Tags, floatArray)
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField))
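// newFrameWithTimeField builds a single frame for one non-time column,
// picking the field type (string, float, bool) from the column's values.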
func newFrameWithTimeField(row Row, column string, colIndex int, query Query, frameName []byte) *data.Frame {
var timeArray []time.Time
var floatArray []*float64
var stringArray []*string
var boolArray []*bool
valType := typeof(row.Values, colIndex)
for _, valuePair := range row.Values {
timestamp, timestampErr := parseTimestamp(valuePair[0])
// we only add this row if the timestamp is valid
if timestampErr != nil {
continue
}
timeArray = append(timeArray, timestamp)
switch valType {
case "string":
value, ok := valuePair[colIndex].(string)
if ok {
stringArray = append(stringArray, &value)
} else {
stringArray = append(stringArray, nil)
}
case "json.Number":
value := parseNumber(valuePair[colIndex])
floatArray = append(floatArray, value)
case "bool":
value, ok := valuePair[colIndex].(bool)
if ok {
boolArray = append(boolArray, &value)
} else {
boolArray = append(boolArray, nil)
}
case "null":
floatArray = append(floatArray, nil)
}
}
timeField := data.NewField("Time", nil, timeArray)
var valueField *data.Field
switch valType {
case "string":
valueField = data.NewField("Value", row.Tags, stringArray)
case "json.Number":
valueField = data.NewField("Value", row.Tags, floatArray)
case "bool":
valueField = data.NewField("Value", row.Tags, boolArray)
case "null":
valueField = data.NewField("Value", row.Tags, floatArray)
}
name := string(formatFrameName(row, column, query, frameName[:]))
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
return newDataFrame(name, query.RawQuery, timeField, valueField)
}
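// newFrameWithoutTimeField handles results that carry no time column,
// i.e. SHOW TAG VALUES and SHOW RETENTION POLICIES responses.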
func newFrameWithoutTimeField(row Row, retentionPolicyQuery bool, tagValuesQuery bool) *data.Frame {
var values []string
if retentionPolicyQuery {
values = make([]string, 1, len(row.Values))
} else {
values = make([]string, 0, len(row.Values))
}
for _, valuePair := range row.Values {
if tagValuesQuery {
if len(valuePair) >= 2 {
values = append(values, valuePair[1].(string))
}
} else if retentionPolicyQuery {
// We want to know whether the given retention policy is the default one.
// If it is the default policy, we add it to the beginning.
// Index 4 tells us whether a policy is the default:
// https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-retention-policies
// The only difference is v0.9: in that version we don't receive the shardGroupDuration value.
// https://archive.docs.influxdata.com/influxdb/v0.9/query_language/schema_exploration/#show-retention-policies
// Since the default flag is always the last value, we always check the last value.
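// For example, a v1.8 row like ["autogen", "0s", "168h0m0s", 1, true]
// has true as its last value, so "autogen" is the default policy.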
if len(valuePair) >= 1 {
if valuePair[len(row.Columns)-1].(bool) {
values[0] = valuePair[0].(string)
} else {
values = append(values, valuePair[0].(string))
}
}
} else {
if len(valuePair) >= 1 {
values = append(values, valuePair[0].(string))
}
}
}
return frames
field := data.NewField("Value", nil, values)
return data.NewFrame(row.Name, field)
}
func newDataFrame(name string, queryString string, timeField *data.Field, valueField *data.Field) *data.Frame {

@@ -1,7 +1,7 @@
import { size } from 'lodash';
import { of } from 'rxjs';
import { AnnotationEvent, DataQueryRequest, dateTime, FieldType, MutableDataFrame } from '@grafana/data';
import { AnnotationEvent, DataFrame, DataQueryRequest, dateTime, FieldType, MutableDataFrame } from '@grafana/data';
import { FetchResponse } from '@grafana/runtime';
import config from 'app/core/config';
import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__
@@ -296,6 +296,27 @@ describe('influxdb response parser', () => {
});
});
describe('table with aliases', () => {
it('should parse the table with alias', () => {
const table = parser.getTable(mockDataFramesWithAlias, mockQuery, { preferredVisualisationType: 'table' });
expect(table.columns.length).toBe(4);
expect(table.columns[0].text).toBe('Time');
expect(table.columns[1].text).toBe('geohash');
expect(table.columns[2].text).toBe('ALIAS1');
expect(table.columns[3].text).toBe('ALIAS2');
});
it('should parse the table when there is no alias and two field selects', () => {
const table = parser.getTable(mockDataframesWithTwoFieldSelect, mockQueryWithTwoFieldSelect, {
preferredVisualisationType: 'table',
});
expect(table.columns.length).toBe(3);
expect(table.columns[0].text).toBe('Time');
expect(table.columns[1].text).toBe('mean');
expect(table.columns[2].text).toBe('mean_1');
});
});
describe('When issuing annotationQuery', () => {
const ctx = {
ds: getMockDS(getMockDSInstanceSettings()),
@@ -330,114 +351,8 @@ describe('influxdb response parser', () => {
let response: AnnotationEvent[];
beforeEach(async () => {
const mockResponse: FetchResponse = {
config: { url: '' },
headers: new Headers(),
ok: false,
redirected: false,
status: 0,
statusText: '',
type: 'basic',
url: '',
data: {
results: {
metricFindQuery: {
frames: [
{
schema: {
name: 'logs.host',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['cbfa07e0e3bb 1', 'cbfa07e0e3bb 2'],
],
},
},
{
schema: {
name: 'logs.message',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
[
'Station softwareupdated[447]: Adding client 1',
'Station softwareupdated[447]: Adding client 2',
],
],
},
},
{
schema: {
name: 'logs.path',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['/var/log/host/install.log 1', '/var/log/host/install.log 2'],
],
},
},
{
schema: {
name: 'textColumn',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['text 1', 'text 2'],
],
},
},
],
},
},
},
};
fetchMock.mockImplementation(() => {
return of(mockResponse);
return of(annotationMockResponse);
});
config.featureToggles.influxdbBackendMigration = true;
@@ -459,3 +374,434 @@ describe('influxdb response parser', () => {
});
});
});
const mockQuery: InfluxQuery = {
datasource: {
type: 'influxdb',
uid: '12345',
},
groupBy: [
{
params: ['$__interval'],
type: 'time',
},
{
type: 'tag',
params: ['geohash::tag'],
},
{
params: ['null'],
type: 'fill',
},
],
measurement: 'cpu',
orderByTime: 'ASC',
policy: 'bar',
refId: 'A',
resultFormat: 'table',
select: [
[
{
type: 'field',
params: ['value'],
},
{
type: 'mean',
params: [],
},
{
type: 'alias',
params: ['ALIAS1'],
},
],
[
{
type: 'field',
params: ['value'],
},
{
type: 'mean',
params: [],
},
{
type: 'alias',
params: ['ALIAS2'],
},
],
],
tags: [],
};
const mockDataFramesWithAlias: DataFrame[] = [
{
name: 'cpu.ALIAS1 { geohash: tz6h548nc111 }',
refId: 'A',
meta: {
executedQueryString:
'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC',
},
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1686582333000, 1686582333500, 1686582334000],
},
{
name: 'Value',
type: FieldType.number,
labels: {
geohash: 'tz6h548nc111',
},
config: {
displayNameFromDS: 'cpu.ALIAS1 { geohash: tz6h548nc111 }',
},
values: [null, 111.98024577663908, null],
},
],
length: 1801,
},
{
name: 'cpu.ALIAS2 { geohash: tz6h548nc111 }',
refId: 'A',
meta: {
executedQueryString:
'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC',
},
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1686582333000, 1686582333500, 1686582334000],
},
{
name: 'Value',
type: FieldType.number,
labels: {
geohash: 'tz6h548nc111',
},
config: {
displayNameFromDS: 'cpu.ALIAS2 { geohash: tz6h548nc111 }',
},
values: [null, 111.98024577663908, null],
},
],
length: 1801,
},
{
name: 'cpu.ALIAS1 { geohash: wj7c61wnv111 }',
refId: 'A',
meta: {
executedQueryString:
'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC',
},
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1686582333000, 1686582333500, 1686582334000],
},
{
name: 'Value',
type: FieldType.number,
labels: {
geohash: 'wj7c61wnv111',
},
config: {
displayNameFromDS: 'cpu.ALIAS1 { geohash: wj7c61wnv111 }',
},
values: [null, 112.97136059147347, null],
},
],
length: 1801,
},
{
name: 'cpu.ALIAS2 { geohash: wj7c61wnv111 }',
refId: 'A',
meta: {
executedQueryString:
'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC',
},
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1686582333000, 1686582333500, 1686582334000],
},
{
name: 'Value',
type: FieldType.number,
labels: {
geohash: 'wj7c61wnv111',
},
config: {
displayNameFromDS: 'cpu.ALIAS2 { geohash: wj7c61wnv111 }',
},
values: [null, 112.97136059147347, null],
},
],
length: 1801,
},
{
name: 'cpu.ALIAS1 { geohash: wr50zpuhj111 }',
refId: 'A',
meta: {
executedQueryString:
'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC',
},
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1686582333000, 1686582333500, 1686582334000],
},
{
name: 'Value',
type: FieldType.number,
labels: {
geohash: 'wr50zpuhj111',
},
config: {
displayNameFromDS: 'cpu.ALIAS1 { geohash: wr50zpuhj111 }',
},
values: [null, 112.27638560052755, null],
},
],
length: 1801,
},
{
name: 'cpu.ALIAS2 { geohash: wr50zpuhj111 }',
refId: 'A',
meta: {
executedQueryString:
'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC',
},
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1686582333000, 1686582333500, 1686582334000],
},
{
name: 'Value',
type: FieldType.number,
labels: {
geohash: 'wr50zpuhj111',
},
config: {
displayNameFromDS: 'cpu.ALIAS2 { geohash: wr50zpuhj111 }',
},
values: [null, 112.27638560052755, null],
},
],
length: 1801,
},
];
const mockDataframesWithTwoFieldSelect: DataFrame[] = [
{
name: 'cpu.mean',
refId: 'A',
meta: {
typeVersion: [0, 0],
executedQueryString:
'SELECT mean("value"), mean("value") FROM "bar"."cpu" WHERE time >= 1686585763070ms and time <= 1686585793070ms GROUP BY time(10ms) fill(null) ORDER BY time ASC',
},
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1686585763070, 1686585763080, 1686585763090],
},
{
name: 'Value',
type: FieldType.number,
config: {
displayNameFromDS: 'cpu.mean',
},
values: [null, 87.42703187930438, null],
},
],
length: 3,
},
{
name: 'cpu.mean_1',
refId: 'A',
meta: {
typeVersion: [0, 0],
executedQueryString:
'SELECT mean("value"), mean("value") FROM "bar"."cpu" WHERE time >= 1686585763070ms and time <= 1686585793070ms GROUP BY time(10ms) fill(null) ORDER BY time ASC',
},
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1686585763070, 1686585763080, 1686585763090],
},
{
name: 'Value',
type: FieldType.number,
config: {
displayNameFromDS: 'cpu.mean_1',
},
values: [87.3, 87.4, 87.5],
},
],
length: 3,
},
];
const mockQueryWithTwoFieldSelect: InfluxQuery = {
datasource: {
type: 'influxdb',
uid: '1234',
},
groupBy: [
{
params: ['$__interval'],
type: 'time',
},
{
params: ['null'],
type: 'fill',
},
],
measurement: 'cpu',
orderByTime: 'ASC',
policy: 'bar',
refId: 'A',
resultFormat: 'table',
select: [
[
{
type: 'field',
params: ['value'],
},
{
type: 'mean',
params: [],
},
],
[
{
type: 'field',
params: ['value'],
},
{
type: 'mean',
params: [],
},
],
],
tags: [],
};
const annotationMockResponse: FetchResponse = {
config: { url: '' },
headers: new Headers(),
ok: false,
redirected: false,
status: 0,
statusText: '',
type: 'basic',
url: '',
data: {
results: {
metricFindQuery: {
frames: [
{
schema: {
name: 'logs.host',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['cbfa07e0e3bb 1', 'cbfa07e0e3bb 2'],
],
},
},
{
schema: {
name: 'logs.message',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['Station softwareupdated[447]: Adding client 1', 'Station softwareupdated[447]: Adding client 2'],
],
},
},
{
schema: {
name: 'logs.path',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['/var/log/host/install.log 1', '/var/log/host/install.log 2'],
],
},
},
{
schema: {
name: 'textColumn',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['text 1', 'text 2'],
],
},
},
],
},
},
},
};

@@ -258,7 +258,12 @@ export function getSelectedParams(target: InfluxQuery): string[] {
target.select?.forEach((select) => {
const selector = select.filter((x) => x.type !== 'field');
if (selector.length > 0) {
allParams.push(selector[0].type);
const aliasIfExist = selector.find((s) => s.type === 'alias');
if (aliasIfExist) {
allParams.push(aliasIfExist.params?.[0].toString() ?? '');
} else {
allParams.push(selector[0].type);
}
} else {
if (select[0] && select[0].params && select[0].params[0]) {
allParams.push(select[0].params[0].toString());
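
With this change, each select clause resolves to its alias when one is present. Below is a minimal standalone sketch of the naming rule, assuming the select shape used by the mock queries above; selectedParamNames and SelectPart are illustrative names, not the module's actual exports:

type SelectPart = { type: string; params?: Array<string | number> };

function selectedParamNames(select: SelectPart[][]): string[] {
  return select.map((parts) => {
    // Everything except the raw field reference (mean, alias, ...).
    const selector = parts.filter((p) => p.type !== 'field');
    if (selector.length > 0) {
      // An explicit alias wins over the aggregation type.
      const alias = selector.find((p) => p.type === 'alias');
      return alias ? alias.params?.[0]?.toString() ?? '' : selector[0].type;
    }
    // Bare field select: fall back to the field name itself.
    return parts[0]?.params?.[0]?.toString() ?? '';
  });
}

For mockQuery this yields ['ALIAS1', 'ALIAS2']; for mockQueryWithTwoFieldSelect it yields ['mean', 'mean'], which getTable then de-duplicates into the 'mean' and 'mean_1' columns the second test expects.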
