The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
grafana/public/app/plugins/datasource/tempo/datasource.ts

175 lines
6.4 KiB

import { groupBy } from 'lodash';
import { from, lastValueFrom, merge, Observable, of, throwError } from 'rxjs';
import { map, mergeMap, toArray } from 'rxjs/operators';
import {
DataQuery,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
DataSourceInstanceSettings,
DataSourceJsonData,
LoadingState,
} from '@grafana/data';
import { DataSourceWithBackend } from '@grafana/runtime';
import { TraceToLogsOptions } from 'app/core/components/TraceToLogsSettings';
import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
import { LokiOptions, LokiQuery } from '../loki/types';
import { transformTrace, transformTraceList, transformFromOTLP as transformFromOTEL } from './resultTransformer';
import { PrometheusDatasource } from '../prometheus/datasource';
import { PromQuery } from '../prometheus/types';
import { mapPromMetricsToServiceMap, serviceMapMetrics } from './graphTransform';
export type TempoQueryType = 'search' | 'traceId' | 'serviceMap' | 'upload';
export interface TempoJsonData extends DataSourceJsonData {
tracesToLogs?: TraceToLogsOptions;
serviceMap?: {
datasourceUid?: string;
};
}
export type TempoQuery = {
query: string;
// Query to find list of traces, e.g., via Loki
linkedQuery?: LokiQuery;
queryType: TempoQueryType;
} & DataQuery;
export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJsonData> {
tracesToLogs?: TraceToLogsOptions;
serviceMap?: {
datasourceUid?: string;
};
uploadedJson?: string | ArrayBuffer | null = null;
constructor(instanceSettings: DataSourceInstanceSettings<TempoJsonData>) {
super(instanceSettings);
this.tracesToLogs = instanceSettings.jsonData.tracesToLogs;
this.serviceMap = instanceSettings.jsonData.serviceMap;
}
query(options: DataQueryRequest<TempoQuery>): Observable<DataQueryResponse> {
const subQueries: Array<Observable<DataQueryResponse>> = [];
const filteredTargets = options.targets.filter((target) => !target.hide);
const targets: { [type: string]: TempoQuery[] } = groupBy(filteredTargets, (t) => t.queryType || 'traceId');
// Run search queries on linked datasource
if (this.tracesToLogs?.datasourceUid && targets.search?.length > 0) {
const dsSrv = getDatasourceSrv();
subQueries.push(
from(dsSrv.get(this.tracesToLogs.datasourceUid)).pipe(
mergeMap((linkedDatasource: DataSourceApi) => {
// Wrap linked query into a data request based on original request
const linkedRequest: DataQueryRequest = { ...options, targets: targets.search.map((t) => t.linkedQuery!) };
// Find trace matchers in derived fields of the linked datasource that's identical to this datasource
const settings: DataSourceInstanceSettings<LokiOptions> = (linkedDatasource as any).instanceSettings;
const traceLinkMatcher: string[] =
settings.jsonData.derivedFields
?.filter((field) => field.datasourceUid === this.uid && field.matcherRegex)
.map((field) => field.matcherRegex) || [];
if (!traceLinkMatcher || traceLinkMatcher.length === 0) {
return throwError(
'No Loki datasource configured for search. Set up Derived Fields for traces in a Loki datasource settings and link it to this Tempo datasource.'
);
} else {
return (linkedDatasource.query(linkedRequest) as Observable<DataQueryResponse>).pipe(
map((response) =>
response.error ? response : transformTraceList(response, this.uid, this.name, traceLinkMatcher)
)
);
}
})
)
);
}
if (targets.upload?.length) {
if (this.uploadedJson) {
const otelTraceData = JSON.parse(this.uploadedJson as string);
if (!otelTraceData.batches) {
subQueries.push(of({ error: { message: 'JSON is not valid opentelemetry format' }, data: [] }));
} else {
subQueries.push(of(transformFromOTEL(otelTraceData.batches)));
}
} else {
subQueries.push(of({ data: [], state: LoadingState.Done }));
}
}
if (this.serviceMap?.datasourceUid && targets.serviceMap?.length > 0) {
subQueries.push(serviceMapQuery(options, this.serviceMap.datasourceUid));
}
if (targets.traceId?.length > 0) {
const traceRequest: DataQueryRequest<TempoQuery> = { ...options, targets: targets.traceId };
subQueries.push(
super.query(traceRequest).pipe(
map((response) => {
if (response.error) {
return response;
}
return transformTrace(response);
})
)
);
}
return merge(...subQueries);
}
async testDatasource(): Promise<any> {
// to test Tempo we send a dummy traceID and verify Tempo answers with 'trace not found'
const response = await lastValueFrom(super.query({ targets: [{ query: '0' }] } as any));
const errorMessage = response.error?.message;
if (
errorMessage &&
errorMessage.startsWith('failed to get trace') &&
errorMessage.endsWith('trace not found in Tempo')
) {
return { status: 'success', message: 'Data source is working' };
}
return { status: 'error', message: 'Data source is not working' + (errorMessage ? `: ${errorMessage}` : '') };
}
getQueryDisplayText(query: TempoQuery) {
return query.query;
}
}
function queryServiceMapPrometheus(request: DataQueryRequest<PromQuery>, datasourceUid: string) {
return from(getDatasourceSrv().get(datasourceUid)).pipe(
mergeMap((ds) => {
return (ds as PrometheusDatasource).query(request);
})
);
}
function serviceMapQuery(request: DataQueryRequest<TempoQuery>, datasourceUid: string) {
return queryServiceMapPrometheus(makePromServiceMapRequest(request), datasourceUid).pipe(
// Just collect all the responses first before processing into node graph data
toArray(),
map((responses: DataQueryResponse[]) => {
return {
data: mapPromMetricsToServiceMap(responses, request.range),
state: LoadingState.Done,
};
})
);
}
function makePromServiceMapRequest(options: DataQueryRequest<TempoQuery>): DataQueryRequest<PromQuery> {
return {
...options,
targets: serviceMapMetrics.map((metric) => {
return {
refId: metric,
expr: `delta(${metric}[$__range])`,
instant: true,
};
}),
};
}