The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/public/app/plugins/datasource/influxdb/datasource.ts

803 lines
24 KiB

import { cloneDeep, extend, has, isString, map as _map, omit, pick, reduce } from 'lodash';
import { lastValueFrom, merge, Observable, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import {
AnnotationEvent,
DataFrame,
DataQueryError,
DataQueryRequest,
DataQueryResponse,
DataSourceGetTagKeysOptions,
DataSourceGetTagValuesOptions,
DataSourceInstanceSettings,
dateMath,
DateTime,
escapeRegex,
FieldType,
MetricFindValue,
QueryResultMeta,
RawTimeRange,
ScopedVars,
TIME_SERIES_TIME_FIELD_NAME,
TIME_SERIES_VALUE_FIELD_NAME,
TimeSeries,
toDataFrame,
} from '@grafana/data';
import {
BackendDataSourceResponse,
DataSourceWithBackend,
FetchResponse,
frameToMetricFindValue,
getBackendSrv,
} from '@grafana/runtime';
import { CustomFormatterVariable } from '@grafana/scenes';
import config from 'app/core/config';
import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
import { QueryFormat, SQLQuery } from '../../../features/plugins/sql';
import { AnnotationEditor } from './components/editor/annotation/AnnotationEditor';
import { FluxQueryEditor } from './components/editor/query/flux/FluxQueryEditor';
import { BROWSER_MODE_DISABLED_MESSAGE } from './constants';
import { toRawSql } from './fsql/sqlUtil';
import InfluxQueryModel from './influx_query_model';
import InfluxSeries from './influx_series';
import { buildMetadataQuery } from './influxql_query_builder';
import { prepareAnnotation } from './migrations';
import { buildRawQuery } from './queryUtils';
import ResponseParser from './response_parser';
import { DEFAULT_POLICY, InfluxOptions, InfluxQuery, InfluxQueryTag, InfluxVersion } from './types';
/**
 * Grafana datasource for InfluxDB, built on `DataSourceWithBackend`.
 *
 * The query language in use (`version`) is read from the datasource settings by the constructor and
 * decides how queries, variables and annotations are handled by the methods of this class.
 */
export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery, InfluxOptions> {
// Always set to 'influxdb' by the constructor.
type: string;
// Configured URL split on ',' and trimmed; `_influxRequest` rotates through the entries.
urls: string[];
// Credentials from the instance settings; empty string when not configured.
username: string;
password: string;
// Datasource instance name; also used to look up ad-hoc filters in the template service.
name: string;
// Database name: `jsonData.dbName` when set, otherwise the instance-level `database`.
database?: string;
// Sent as the `Authorization` header by `_influxRequest` when present.
basicAuth?: string;
withCredentials?: boolean;
// Access mode copied from the instance settings.
access: 'direct' | 'proxy';
// Parser used for legacy (frontend) query responses and annotation responses.
responseParser: ResponseParser;
// HTTP verb for legacy `/query` requests; defaults to 'GET'.
httpMode: string;
// Query language; defaults to InfluxQL when the settings do not specify one.
version?: InfluxVersion;
// True when the access mode is 'proxy'; `query` rejects requests otherwise.
isProxyAccess: boolean;
/**
 * Initialises the datasource from its saved instance settings.
 *
 * @param instanceSettings Saved settings of this datasource instance (URL, credentials, jsonData).
 * @param templateSrv Template service used for variable interpolation; defaults to the global one.
 */
constructor(
  instanceSettings: DataSourceInstanceSettings<InfluxOptions>,
  readonly templateSrv: TemplateSrv = getTemplateSrv()
) {
  super(instanceSettings);

  const jsonData: InfluxOptions = instanceSettings.jsonData ?? {};
  const rawUrls = instanceSettings.url ?? '';

  // Connection details.
  this.type = 'influxdb';
  this.urls = rawUrls.split(',').map((candidate) => candidate.trim());
  this.username = instanceSettings.username ?? '';
  this.password = instanceSettings.password ?? '';
  this.name = instanceSettings.name;
  this.basicAuth = instanceSettings.basicAuth;
  this.withCredentials = instanceSettings.withCredentials;
  this.access = instanceSettings.access;

  // Options stored in jsonData, with their fallbacks.
  this.database = jsonData.dbName ?? instanceSettings.database;
  this.interval = jsonData.timeInterval;
  this.httpMode = jsonData.httpMode || 'GET';
  this.responseParser = new ResponseParser();
  this.version = jsonData.version ?? InfluxVersion.InfluxQL;
  this.isProxyAccess = instanceSettings.access === 'proxy';

  // Flux relies on an annotation processor instead of the `annotationQuery` lifecycle,
  // so it only registers an editor; the other languages also register the migration hook.
  this.annotations =
    this.version === InfluxVersion.Flux
      ? { QueryEditor: FluxQueryEditor }
      : { QueryEditor: AnnotationEditor, prepareAnnotation };
}
/**
 * Entry point for data queries. Only proxy access is supported: for any other access mode
 * the returned observable errors with `BROWSER_MODE_DISABLED_MESSAGE`.
 */
query(request: DataQueryRequest<InfluxQuery>): Observable<DataQueryResponse> {
  if (this.isProxyAccess) {
    return this._query(request);
  }

  const browserModeError = new Error(BROWSER_MODE_DISABLED_MESSAGE);
  return throwError(() => browserModeError);
}
/**
 * Dispatches a request to the right execution path:
 *  - targets created from annotations are resolved through `annotationEvents`,
 *  - InfluxQL without the backend-migration toggle uses the legacy `classicQuery`,
 *  - everything else is sent to the backend via `super.query`.
 *
 * @param request The incoming data query request.
 * @returns An observable of query responses.
 */
_query(request: DataQueryRequest<InfluxQuery>): Observable<DataQueryResponse> {
  // for not-flux queries we call `this.classicQuery`, and that
  // handles the is-hidden situation.
  // for the flux-case, we do the filtering here
  const filteredRequest = {
    ...request,
    targets: request.targets.filter((t) => t.hide !== true),
  };

  // migrate annotations
  if (filteredRequest.targets.some((target: InfluxQuery) => target.fromAnnotations)) {
    const streams: Array<Observable<DataQueryResponse>> = [];

    for (const target of filteredRequest.targets) {
      if (target.query) {
        streams.push(
          new Observable((subscriber) => {
            this.annotationEvents(filteredRequest, target)
              .then((events) => subscriber.next({ data: [toDataFrame(events)] }))
              .catch((ex) => {
                // `annotationEvents` may reject with a plain `{ message }` object; passing that
                // straight to `new Error(...)` would produce the useless text "[object Object]".
                const failure = ex instanceof Error ? ex : new Error(ex?.message ?? String(ex));
                subscriber.error(failure);
              })
              .finally(() => subscriber.complete());
          })
        );
      }
    }

    return merge(...streams);
  }

  if (this.version === InfluxVersion.InfluxQL && !this.isMigrationToggleOnAndIsAccessProxy()) {
    // Fallback to classic query support (it filters hidden targets itself,
    // hence the unfiltered request).
    return this.classicQuery(request);
  }

  return super.query(filteredRequest);
}
/**
 * Produces the human-readable text of a query for the configured query language.
 * Returns an empty string when the language is not one of Flux, SQL or InfluxQL.
 */
getQueryDisplayText(query: InfluxQuery) {
  if (this.version === InfluxVersion.Flux) {
    // Flux queries are already plain text.
    return query.query;
  }
  if (this.version === InfluxVersion.SQL) {
    return toRawSql(query);
  }
  if (this.version === InfluxVersion.InfluxQL) {
    // Render the builder model without interpolating variables.
    return new InfluxQueryModel(query).render(false);
  }
  return '';
}
/**
 * Decides whether a query should be executed.
 * In Flux mode a query without text is skipped; in every other mode all queries run.
 *
 * @returns false if the query should be skipped
 */
filterQuery(query: InfluxQuery): boolean {
  const isFlux = this.version === InfluxVersion.Flux;
  return isFlux ? !!query.query : true;
}
/**
 * Interpolates template variables into a query before it is sent to the backend.
 *
 * `__interval` and `__interval_ms` are deliberately removed from the scoped variables so that
 * they are interpolated on the backend instead.
 */
applyTemplateVariables(query: InfluxQuery, scopedVars: ScopedVars): InfluxQuery & SQLQuery {
  // We want to interpolate these variables on backend
  const { __interval, __interval_ms, ...frontendVars } = scopedVars || {};

  if (this.version === InfluxVersion.Flux) {
    const fluxText = this.templateSrv.replace(query.query ?? '', frontendVars); // The raw query text
    return { ...query, query: fluxText };
  }

  const interpolateHere = this.version === InfluxVersion.SQL || this.isMigrationToggleOnAndIsAccessProxy();
  if (!interpolateHere) {
    return query;
  }

  query = this.applyVariables(query, frontendVars);

  // Ad-hoc filters are appended to the query's tags, minus their `condition` property.
  if (query.adhocFilters?.length) {
    const tagsFromAdhoc: InfluxQueryTag[] = (query.adhocFilters ?? []).map((filter) => {
      const { condition, ...tagFields } = filter;
      return tagFields;
    });
    query.tags = [...(query.tags ?? []), ...tagsFromAdhoc];
  }

  return query;
}
/**
 * Reports whether the textual form of a target references any template variable.
 */
targetContainsTemplate(target: InfluxQuery) {
  let textToInspect;
  if (this.version === InfluxVersion.Flux) {
    // flux-mode: the query text is stored on the target as-is
    textToInspect = target.query;
  } else {
    // other modes: build the text representation from the query model
    textToInspect = buildRawQuery(target);
  }
  return this.templateSrv.containsTemplate(textToInspect);
}
/**
 * Returns copies of the given queries with template variables expanded and the datasource
 * reference set to this datasource. An empty or missing list yields an empty array.
 */
interpolateVariablesInQueries(queries: InfluxQuery[], scopedVars: ScopedVars): InfluxQuery[] {
  if (!queries || queries.length === 0) {
    return [];
  }

  const isFlux = this.version === InfluxVersion.Flux;

  return queries.map((original) => {
    if (!isFlux) {
      // Builder-based queries: expand every variable-bearing field.
      return {
        ...original,
        datasource: this.getRef(),
        ...this.applyVariables(original, scopedVars),
      };
    }

    // Flux: only the raw query text carries variables.
    return {
      ...original,
      datasource: this.getRef(),
      query: this.templateSrv.replace(original.query ?? '', scopedVars, this.interpolateQueryExpr), // The raw query text
    };
  });
}
/**
 * Expands template variables in every field of a query that can contain them and attaches
 * the ad-hoc filters registered for this datasource.
 *
 * @param query The query to expand; it is not modified.
 * @param scopedVars Variables scoped to the current panel/row.
 * @returns A new query object with interpolated fields.
 */
applyVariables(query: InfluxQuery & SQLQuery, scopedVars: ScopedVars) {
  // Interpolation with scoped variables and the Influx-specific value formatter.
  const expand = (text?: string) => this.templateSrv.replace(text, scopedVars, this.interpolateQueryExpr);
  // NOTE(review): group-by and select params are interpolated WITHOUT scoped variables.
  // This mirrors the existing behaviour; confirm whether it is intentional.
  const expandParam = (param: { toString(): string }) =>
    this.templateSrv.replace(param.toString(), undefined, this.interpolateQueryExpr);

  const expandedQuery = { ...query };

  if (query.groupBy) {
    expandedQuery.groupBy = query.groupBy.map((part) => ({
      ...part,
      params: part.params?.map((param) => expandParam(param)),
    }));
  }

  if (query.select) {
    expandedQuery.select = query.select.map((parts) =>
      parts.map((part) => ({
        ...part,
        params: part.params?.map((param) => expandParam(param)),
      }))
    );
  }

  if (query.tags) {
    expandedQuery.tags = query.tags.map((tag) => ({
      ...tag,
      key: expand(tag.key),
      value: expand(tag.value),
    }));
  }

  return {
    ...expandedQuery,
    adhocFilters: this.templateSrv.getAdhocFilters(this.name) ?? [],
    query: expand(query.query ?? ''), // The raw query text
    rawSql: expand(query.rawSql ?? ''), // The raw query text
    alias: this.templateSrv.replace(query.alias ?? '', scopedVars),
    limit: expand(query.limit?.toString() ?? ''),
    measurement: expand(query.measurement ?? ''),
    policy: expand(query.policy ?? ''),
    slimit: expand(query.slimit?.toString() ?? ''),
    tz: this.templateSrv.replace(query.tz ?? '', scopedVars),
  };
}
/**
 * Formatter handed to the template service for variable values.
 *
 * Single-value variables get the regular escape; multi-value / include-all variables get the
 * special regex escape and, when several values are selected, are joined with '|'.
 */
interpolateQueryExpr(value: string | string[] = [], variable: Partial<CustomFormatterVariable>) {
  const canHoldSeveralValues = variable.multi || variable.includeAll;

  // if no multi or include all do not regexEscape
  if (!canHoldSeveralValues) {
    return influxRegularEscape(value);
  }

  if (typeof value === 'string') {
    return influxSpecialRegexEscape(value);
  }

  const escapedValues = value.map((entry) => influxSpecialRegexEscape(entry));
  return escapedValues.length === 1 ? escapedValues[0] : escapedValues.join('|');
}
/**
 * Runs a single metadata query through the backend and converts the response frames
 * into de-duplicated metric-find values.
 */
async runMetadataQuery(target: InfluxQuery): Promise<MetricFindValue[]> {
  const request = {
    targets: [target],
  } as DataQueryRequest;

  const response = await lastValueFrom(super.query(request));
  return this.toMetricFindValue(response);
}
/**
 * Executes a variable (metric-find) query.
 *
 * Flux, SQL and backend-migrated InfluxQL go through the backend; otherwise the query is
 * interpolated here and sent through the legacy series-query path.
 *
 * @param query The query text.
 * @param options Optional request options (e.g. `range`, `scopedVars`).
 */
async metricFindQuery(query: string, options?: any): Promise<MetricFindValue[]> {
  const runsOnBackend =
    this.version === InfluxVersion.Flux ||
    this.version === InfluxVersion.SQL ||
    this.isMigrationToggleOnAndIsAccessProxy();

  if (!runsOnBackend) {
    const interpolated = this.templateSrv.replace(query, options?.scopedVars, this.interpolateQueryExpr);
    const legacyResponse = await lastValueFrom(this._seriesQuery(interpolated, options));
    // The parser receives the original (non-interpolated) query text.
    return this.responseParser.parse(query, legacyResponse);
  }

  const target: InfluxQuery & SQLQuery = {
    refId: 'metricFindQuery',
    query,
    rawQuery: true,
    ...(this.version === InfluxVersion.SQL ? { rawSql: query, format: QueryFormat.Table } : {}),
  };

  const backendResponse = await lastValueFrom(
    super.query({
      ...(options ?? {}), // includes 'range'
      targets: [target],
    })
  );
  return this.toMetricFindValue(backendResponse);
}
/**
 * Flattens all frames of a response into metric-find values and removes entries whose
 * `text` has already been seen, keeping the first occurrence in its original position.
 *
 * De-duplication uses a Set so it stays linear in the number of values, which matters for
 * tag-value lookups that can return thousands of entries.
 *
 * @param rsp The query response; a missing `data` array is treated as empty.
 * @returns The unique metric-find values, in first-seen order.
 */
toMetricFindValue(rsp: DataQueryResponse): MetricFindValue[] {
  const frames = rsp.data ?? [];
  const seenTexts = new Set<MetricFindValue['text']>();
  const uniqueValues: MetricFindValue[] = [];

  for (const frame of frames) {
    // Create MetricFindValue objects for every frame
    for (const candidate of frameToMetricFindValue(frame)) {
      // Filter out duplicate elements
      if (seenTexts.has(candidate.text)) {
        continue;
      }
      seenTexts.add(candidate.text);
      uniqueValues.push(candidate);
    }
  }

  return uniqueValues;
}
// Implementing getTagKeys and getTagValues is what enables ad-hoc filters for this datasource.
// Used in public/app/features/variables/adhoc/picker/AdHocFilterKey.tsx::fetchFilterKeys
/**
 * Looks up the tag keys of the configured database.
 * The `options` argument is currently not used.
 */
getTagKeys(options?: DataSourceGetTagKeysOptions) {
  const tagKeysQuery = buildMetadataQuery({
    type: 'TAG_KEYS',
    templateService: this.templateSrv,
    database: this.database,
  });

  return this.metricFindQuery(tagKeysQuery);
}
/**
 * Looks up the values of the tag named by `options.key` in the configured database.
 */
getTagValues(options: DataSourceGetTagValuesOptions) {
  const { key } = options;
  const tagValuesQuery = buildMetadataQuery({
    type: 'TAG_VALUES',
    templateService: this.templateSrv,
    database: this.database,
    withKey: key,
  });

  return this.metricFindQuery(tagValuesQuery);
}
/**
 * Sends an InfluxQL query to the `/query` endpoint (legacy frontend path).
 *
 * When `options.range` is present, every `$timeFilter` placeholder in the query is replaced
 * with the corresponding time condition. An empty query short-circuits to an empty result.
 *
 * @param query The InfluxQL text; may contain several ';'-separated statements.
 * @param options Optional request options (`range`, `timezone`, `database`, `policy`).
 * @deprecated
 */
_seriesQuery(query: string, options?: any) {
  if (!query) {
    return of({ results: [] });
  }

  if (options && options.range) {
    const timeFilter = this.getTimeFilter({ rangeRaw: options.range, timezone: options.timezone });
    // split/join replaces ALL occurrences (String.replace with a string pattern only replaces
    // the first one) and treats the replacement literally.
    query = query.split('$timeFilter').join(timeFilter);
  }

  return this._influxRequest(this.httpMode, '/query', { q: query, epoch: 'ms' }, options);
}
/**
 * Serialises a parameter object into a URL-encoded `key=value&key=value` string.
 * Entries whose value is `null` or `undefined` are left out; a falsy `params` yields ''.
 *
 * @deprecated
 */
serializeParams(params: any) {
  if (!params) {
    return '';
  }

  const encodedPairs = reduce(
    params,
    (pairs: string[], paramValue, paramName) => {
      const isMissing = paramValue === null || paramValue === undefined;
      if (!isMissing) {
        pairs.push(encodeURIComponent(paramName) + '=' + encodeURIComponent(paramValue));
      }
      return pairs;
    },
    []
  );

  return encodedPairs.join('&');
}
/**
 * Sends a request to InfluxDB through the backend service (legacy frontend path).
 *
 * The configured URLs are used round-robin. Credentials, database and retention policy are
 * sent as URL parameters. For POST requests that carry a `q` field, the query goes into the
 * form-encoded body and the remaining fields into the URL.
 *
 * @param method HTTP verb ('GET' or 'POST').
 * @param url Path appended to the selected base URL.
 * @param data Request payload; its `q` field (if any) is recorded as the executed query.
 * @param options Optional `database` / `policy` overrides; also receives nothing else here.
 * @returns An observable of the response body. It errors with the result of `handleErrors`,
 *          except for cancelled requests, whose error object is emitted as a value.
 * @deprecated
 */
_influxRequest(method: string, url: string, data: any, options?: any) {
  // Round-robin: take the first URL and move it to the end of the list.
  const currentUrl = this.urls.shift()!;
  this.urls.push(currentUrl);

  const params: any = {};

  if (this.username) {
    params.u = this.username;
    params.p = this.password;
  }

  if (options && options.database) {
    params.db = options.database;
  } else if (this.database) {
    params.db = this.database;
  }

  if (options?.policy && options.policy !== DEFAULT_POLICY) {
    params.rp = options.policy;
  }

  // Remember the query text before `data` is rewritten below.
  const { q } = data;

  if (method === 'POST' && has(data, 'q')) {
    // verb is POST and 'q' param is defined
    extend(params, omit(data, ['q']));
    data = this.serializeParams(pick(data, ['q']));
  } else if (method === 'GET' || method === 'POST') {
    // verb is GET, or POST without 'q' param
    extend(params, data);
    data = null;
  }

  const req: any = {
    method: method,
    url: currentUrl + url,
    params: params,
    data: data,
    precision: 'ms',
    inspect: { type: 'influxdb' },
    paramSerializer: this.serializeParams,
    headers: {},
  };

  if (this.basicAuth || this.withCredentials) {
    req.withCredentials = true;
  }
  if (this.basicAuth) {
    req.headers.Authorization = this.basicAuth;
  }

  if (method === 'POST') {
    req.headers['Content-type'] = 'application/x-www-form-urlencoded';
  }

  return getBackendSrv()
    .fetch(req)
    .pipe(
      map((result: any) => {
        const { data } = result;
        if (data) {
          data.executedQueryString = q;
          if (data.results) {
            // InfluxDB reports statement-level failures inside a successful HTTP response.
            const errors = result.data.results.filter((elem: any) => elem.error);

            if (errors.length > 0) {
              throw {
                message: 'InfluxDB Error: ' + errors[0].error,
                data,
              };
            }
          }
        }
        return data;
      }),
      catchError((err) => {
        if (err.cancelled) {
          return of(err);
        }

        // Factory form of throwError: the value form is deprecated in RxJS, and this matches
        // how `query()` raises its error.
        return throwError(() => this.handleErrors(err));
      })
    );
}
/**
 * Converts a failed request into a `DataQueryError`.
 *
 * The message defaults to the error's status, then its message, then a generic text. When the
 * error carries a non-zero integer HTTP status (or a status >= 300), the message is replaced
 * with the InfluxDB error from the response body if there is one, otherwise with a
 * "Network Error" text, and the response data/config are attached.
 *
 * A `null`/`undefined` error is tolerated and yields the generic message.
 *
 * @param err The error raised by the request pipeline.
 * @deprecated
 */
handleErrors(err: any) {
  // Read once, null-safely: the status check below must not throw when `err` is missing.
  const status = err?.status;

  const error: DataQueryError = {
    message: status || err?.message || 'Unknown error during query transaction. Please check JS console logs.',
  };

  const hasHttpStatus = (Number.isInteger(status) && status !== 0) || status >= 300;
  if (hasHttpStatus) {
    if (err.data && err.data.error) {
      error.message = 'InfluxDB Error: ' + err.data.error;
    } else {
      error.message = 'Network Error: ' + err.statusText + '(' + err.status + ')';
    }
    error.data = err.data;
    // @ts-ignore
    error.config = err.config;
  }

  return error;
}
/**
 * Builds the InfluxQL time condition (`time >= … and time <= …`) for a raw time range.
 * The upper bound is rounded up, the lower bound is not.
 */
getTimeFilter(options: { rangeRaw: RawTimeRange; timezone: string }) {
  const { rangeRaw, timezone } = options;
  const lowerBound = this.getInfluxTime(rangeRaw.from, false, timezone);
  const upperBound = this.getInfluxTime(rangeRaw.to, true, timezone);
  return `time >= ${lowerBound} and time <= ${upperBound}`;
}
/**
 * Converts one end of a time range into an InfluxQL time expression.
 *
 *  - 'now'                      → `now()`
 *  - 'now-<n><d|h|m|s>'         → `now() - <n><unit>`
 *  - any other string           → parsed with `dateMath.parse`, rendered as epoch milliseconds
 *  - a DateTime                 → rendered as epoch milliseconds
 *
 * @throws Error when a string cannot be parsed as a date.
 */
getInfluxTime(date: DateTime | string, roundUp: any, timezone: any) {
  if (!isString(date)) {
    return date.valueOf() + 'ms';
  }

  if (date === 'now') {
    return 'now()';
  }

  // Simple relative expressions map directly onto InfluxQL duration arithmetic.
  const relativeMatch = /^now-(\d+)([dhms])$/.exec(date);
  if (relativeMatch) {
    const [, amountText, unit] = relativeMatch;
    return 'now() - ' + parseInt(amountText, 10) + unit;
  }

  const parsedDate = dateMath.parse(date, roundUp, timezone);
  if (!parsedDate) {
    throw new Error('unable to parse date');
  }
  return parsedDate.valueOf() + 'ms';
}
// ------------------------ Legacy Code - Before Backend Migration ---------------
/**
 * Truthy when the `influxdbBackendMigration` feature toggle is on and the datasource uses
 * proxy access, i.e. when InfluxQL queries should run on the backend.
 */
isMigrationToggleOnAndIsAccessProxy() {
  const migrationToggle = config.featureToggles.influxdbBackendMigration;
  return migrationToggle && this.access === 'proxy';
}
/**
 * The pre 7.1 query implementation: renders all visible targets to InfluxQL in the browser,
 * sends them as one ';'-separated request and converts each result into data frames.
 *
 * Results are matched to targets by position, so only targets that actually contribute a
 * statement to the request are tracked. Hidden targets and targets rendering to an empty
 * string are skipped. An empty or missing target list yields an empty response.
 *
 * Note: `options.scopedVars` is modified (`interval` and `timeFilter` are set on it).
 *
 * @param options The query request (targets, scopedVars, rangeRaw, timezone, ...).
 * @returns An observable emitting the converted frames/tables.
 * @deprecated
 */
classicQuery(options: any): Observable<DataQueryResponse> {
  let timeFilter = this.getTimeFilter(options);
  const scopedVars = options.scopedVars;
  const targets: any[] = cloneDeep(options.targets) ?? [];

  // Targets that produced a statement, in statement order (index i <-> results[i]).
  const queryTargets: any[] = [];
  const renderedQueries: string[] = [];

  for (const target of targets) {
    if (target.hide) {
      continue;
    }

    // backward compatibility
    scopedVars.interval = scopedVars.__interval;

    const rendered = new InfluxQueryModel(target, this.templateSrv, scopedVars).render(true);
    if (!rendered) {
      // Nothing is sent for this target, so it must not occupy a result slot either.
      continue;
    }

    queryTargets.push(target);
    renderedQueries.push(rendered);
  }

  if (renderedQueries.length === 0) {
    return of({ data: [] });
  }

  let allQueries = renderedQueries.join(';');

  // add global adhoc filters to timeFilter
  const adhocFilters = this.templateSrv.getAdhocFilters(this.name);
  const adhocFiltersFromDashboard = options.targets.flatMap((target: InfluxQuery) => target.adhocFilters ?? []);
  if (adhocFilters?.length || adhocFiltersFromDashboard?.length) {
    // Filters from the template service take precedence over those stored on the targets.
    const ahFilters = adhocFilters?.length ? adhocFilters : adhocFiltersFromDashboard;
    const tmpQuery = new InfluxQueryModel({ refId: 'A' }, this.templateSrv, scopedVars);
    timeFilter += ' AND ' + tmpQuery.renderAdhocFilters(ahFilters);
  }

  // replace grafana variables
  scopedVars.timeFilter = { value: timeFilter };

  // replace templated variables
  allQueries = this.templateSrv.replace(allQueries, scopedVars);

  return this._seriesQuery(allQueries, options).pipe(
    map((data) => {
      if (!data || !data.results) {
        return { data: [] };
      }

      const seriesList = [];
      for (let i = 0; i < data.results.length; i++) {
        const result = data.results[i];
        if (!result || !result.series) {
          continue;
        }

        const target = queryTargets[i];
        let alias = target.alias;
        if (alias) {
          alias = this.templateSrv.replace(target.alias, options.scopedVars);
        }

        const meta: QueryResultMeta = {
          executedQueryString: data.executedQueryString,
        };

        const influxSeries = new InfluxSeries({
          refId: target.refId,
          series: data.results[i].series,
          alias: alias,
          meta,
        });

        const format = target.resultFormat;
        if (format === 'logs' || format === 'table') {
          // Logs are rendered from the table form, with a hint for the visualisation.
          if (format === 'logs') {
            meta.preferredVisualisationType = 'logs';
          }
          seriesList.push(influxSeries.getTable());
        } else {
          for (const series of influxSeries.getTimeSeries()) {
            seriesList.push(timeSeriesToDataFrame(series));
          }
        }
      }

      return { data: seriesList };
    })
  );
}
/**
 * Resolves the events of an InfluxQL annotation query.
 *
 * With the backend-migration toggle on (and proxy access) the query is posted to
 * `/api/ds/query` as a raw query; otherwise it goes through the legacy series-query path.
 * In the legacy path every `$timeFilter` placeholder is replaced with the time condition of
 * the request range.
 *
 * @param options The data query request providing range and timezone.
 * @param annotation The annotation definition; its `query` holds the InfluxQL text.
 * @returns The annotation events.
 * @throws Error in Flux mode, when the annotation has no query, or when the legacy response
 *         contains no results. (Thrown inside an async function, i.e. delivered as rejections.)
 */
async annotationEvents(options: DataQueryRequest, annotation: InfluxQuery): Promise<AnnotationEvent[]> {
  if (this.version === InfluxVersion.Flux) {
    throw new Error('Flux requires the standard annotation query');
  }

  // InfluxQL puts a query string on the annotation
  if (!annotation.query) {
    throw new Error('Query missing in annotation definition');
  }

  if (this.isMigrationToggleOnAndIsAccessProxy()) {
    // We want to send our query to the backend as a raw query
    const target: InfluxQuery = {
      refId: 'metricFindQuery',
      datasource: this.getRef(),
      query: this.templateSrv.replace(annotation.query, undefined, this.interpolateQueryExpr),
      rawQuery: true,
    };

    const res: FetchResponse<BackendDataSourceResponse> = await lastValueFrom(
      getBackendSrv().fetch<BackendDataSourceResponse>({
        url: '/api/ds/query',
        method: 'POST',
        headers: this.getRequestHeaders(),
        data: {
          from: options.range.from.valueOf().toString(),
          to: options.range.to.valueOf().toString(),
          queries: [target],
        },
        requestId: annotation.name,
      })
    );
    return this.responseParser.transformAnnotationResponse(annotation, res, target);
  }

  const timeFilter = this.getTimeFilter({ rangeRaw: options.range.raw, timezone: options.timezone });
  // split/join replaces all occurrences, not just the first, and inserts the text literally.
  const withTimeFilter = annotation.query.split('$timeFilter').join(timeFilter);
  const query = this.templateSrv.replace(withTimeFilter, undefined, this.interpolateQueryExpr);

  const data = await lastValueFrom(this._seriesQuery(query, options));
  if (!data || !data.results || !data.results[0]) {
    throw new Error('No results in response from InfluxDB');
  }

  return new InfluxSeries({
    series: data.results[0].series,
    annotation: annotation,
  }).getAnnotations();
}
}
/**
 * Detects the field type of a value column from its first usable entry.
 *
 * The column may contain many gaps, so `null` and `undefined` entries are skipped. A column
 * without any usable entry is reported as a number field.
 *
 * @param values The raw values of the column.
 * @returns The field type matching the first non-null, non-undefined value.
 * @throws Error when that value is not a string, boolean or number.
 */
function getFieldType(values: unknown[]): FieldType {
  // `!= null` skips both null and undefined; a leading undefined must not end the search,
  // otherwise it would be mistaken for "nothing found" below.
  const firstPresent = values.find((v) => v != null);

  if (firstPresent === undefined) {
    // we could not find any usable values
    return FieldType.number;
  }

  const valueType = typeof firstPresent;

  switch (valueType) {
    case 'string':
      return FieldType.string;
    case 'boolean':
      return FieldType.boolean;
    case 'number':
      return FieldType.number;
    default:
      // this should never happen, influxql values
      // can only be numbers, strings and booleans.
      throw new Error(`InfluxQL: invalid value type ${valueType}`);
  }
}
/**
 * Converts one legacy time series into a two-field data frame (time + value).
 *
 * Tailored to the time-series shape that the classic query path passes in: each datapoint is
 * a `[value, time]` pair.
 */
function timeSeriesToDataFrame(timeSeries: TimeSeries): DataFrame {
  // The declared datapoint type is number|null, but in practice the value may also be a
  // string or a boolean, hence `unknown` for the value column.
  const datapoints = timeSeries.datapoints;
  const values: unknown[] = datapoints.map((datapoint) => datapoint[0]);
  const times: number[] = datapoints.map((datapoint) => datapoint[1] as number);

  const fields = [
    {
      name: TIME_SERIES_TIME_FIELD_NAME,
      type: FieldType.time,
      config: {},
      values: times,
    },
    {
      name: TIME_SERIES_VALUE_FIELD_NAME,
      type: getFieldType(values),
      config: {
        displayNameFromDS: timeSeries.title,
      },
      values: values,
      labels: timeSeries.tags,
    },
  ];

  return {
    name: timeSeries.target,
    refId: timeSeries.refId,
    meta: timeSeries.meta,
    fields,
    length: values.length,
  };
}
/**
 * Escape used for single-value variables.
 *
 * Strings that `parseFloat` can read a number from are returned untouched; all other strings
 * are passed through `escapeRegex`. Arrays are returned as they are.
 */
export function influxRegularEscape(value: string | string[]) {
  if (typeof value !== 'string') {
    return value;
  }

  // Only escape special characters when the value does not start with a number.
  const startsWithNumber = !isNaN(parseFloat(value));
  return startsWithNumber ? value : escapeRegex(value);
}
/**
 * Escape used for multi-value / include-all variables, whose values end up inside a regex.
 *
 * Every backslash becomes four backslashes, and every regex metacharacter
 * (`$ ^ * { } [ ] ' + ? . ( ) |`) is prefixed with two backslashes. Non-string input is
 * returned unchanged.
 *
 * @param value The variable value.
 * @returns The escaped string, or the input itself when it is not a string.
 */
export function influxSpecialRegexEscape(value: string | string[]) {
  if (typeof value !== 'string') {
    return value;
  }

  // Backslashes first, so the backslashes added for metacharacters are not escaped again.
  const backslashesEscaped = value.replace(/\\/g, '\\\\\\\\');
  // `$&` alone would re-insert the match unchanged; the escape prefix must precede it.
  return backslashesEscaped.replace(/[$^*{}\[\]\'+?.()|]/g, '\\\\$&');
}