mirror of https://github.com/grafana/grafana
Tempo: convert to backend data source (#31618)
* Tempo: Support opentelemetry response * Tempo: convert Tempo to backend data source * Update data source test * Fix lint issues * Apply suggestions from code review Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com> * Return error when trace not found Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>pull/31698/head
parent
57f3de74c6
commit
862f09376f
@ -0,0 +1,127 @@ |
||||
package tempo |
||||
|
||||
import ( |
||||
"context" |
||||
"encoding/json" |
||||
"fmt" |
||||
"io/ioutil" |
||||
"net/http" |
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data" |
||||
"github.com/grafana/grafana/pkg/infra/log" |
||||
"github.com/grafana/grafana/pkg/models" |
||||
"github.com/grafana/grafana/pkg/tsdb" |
||||
|
||||
jaeger "github.com/jaegertracing/jaeger/model" |
||||
jaeger_json "github.com/jaegertracing/jaeger/model/converter/json" |
||||
|
||||
ot_pdata "go.opentelemetry.io/collector/consumer/pdata" |
||||
ot_jaeger "go.opentelemetry.io/collector/translator/trace/jaeger" |
||||
) |
||||
|
||||
type tempoExecutor struct { |
||||
httpClient *http.Client |
||||
} |
||||
|
||||
func newTempoExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { |
||||
httpClient, err := dsInfo.GetHttpClient() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &tempoExecutor{ |
||||
httpClient: httpClient, |
||||
}, nil |
||||
} |
||||
|
||||
var ( |
||||
plog log.Logger |
||||
) |
||||
|
||||
func init() { |
||||
plog = log.New("tsdb.tempo") |
||||
tsdb.RegisterTsdbQueryEndpoint("tempo", newTempoExecutor) |
||||
} |
||||
|
||||
func (e *tempoExecutor) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) { |
||||
result := &tsdb.Response{ |
||||
Results: map[string]*tsdb.QueryResult{}, |
||||
} |
||||
refID := tsdbQuery.Queries[0].RefId |
||||
queryResult := &tsdb.QueryResult{} |
||||
result.Results[refID] = queryResult |
||||
|
||||
traceID := tsdbQuery.Queries[0].Model.Get("query").MustString("") |
||||
|
||||
plog.Debug("Querying tempo with traceID", "traceID", traceID) |
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", dsInfo.Url+"/api/traces/"+traceID, nil) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
req.Header.Set("Accept", "application/protobuf") |
||||
|
||||
resp, err := e.httpClient.Do(req) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed get to tempo: %w", err) |
||||
} |
||||
|
||||
defer func() { |
||||
if err := resp.Body.Close(); err != nil { |
||||
plog.Warn("failed to close response body", "err", err) |
||||
} |
||||
}() |
||||
|
||||
if resp.StatusCode == http.StatusNotFound { |
||||
queryResult.Error = fmt.Errorf("failed to get trace: %s", traceID) |
||||
return result, nil |
||||
} |
||||
|
||||
body, err := ioutil.ReadAll(resp.Body) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
otTrace := ot_pdata.NewTraces() |
||||
err = otTrace.FromOtlpProtoBytes(body) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to convert tempo response to Otlp: %w", err) |
||||
} |
||||
|
||||
jaegerBatches, err := ot_jaeger.InternalTracesToJaegerProto(otTrace) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to translate to jaegerBatches %v: %w", traceID, err) |
||||
} |
||||
|
||||
jaegerTrace := &jaeger.Trace{ |
||||
Spans: []*jaeger.Span{}, |
||||
ProcessMap: []jaeger.Trace_ProcessMapping{}, |
||||
} |
||||
|
||||
// otel proto conversion doesn't set jaeger processes
|
||||
for _, batch := range jaegerBatches { |
||||
for _, s := range batch.Spans { |
||||
s.Process = batch.Process |
||||
} |
||||
|
||||
jaegerTrace.Spans = append(jaegerTrace.Spans, batch.Spans...) |
||||
jaegerTrace.ProcessMap = append(jaegerTrace.ProcessMap, jaeger.Trace_ProcessMapping{ |
||||
Process: *batch.Process, |
||||
ProcessID: batch.Process.ServiceName, |
||||
}) |
||||
} |
||||
jsonTrace := jaeger_json.FromDomain(jaegerTrace) |
||||
|
||||
traceBytes, err := json.Marshal(jsonTrace) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to json.Marshal trace \"%s\" :%w", traceID, err) |
||||
} |
||||
|
||||
frames := []*data.Frame{ |
||||
{Name: "Traces", RefID: refID, Fields: []*data.Field{data.NewField("trace", nil, []string{string(traceBytes)})}}, |
||||
} |
||||
queryResult.Dataframes = tsdb.NewDecodedDataFrames(frames) |
||||
|
||||
return result, nil |
||||
} |
@ -1,121 +1,63 @@ |
||||
import { |
||||
dateMath, |
||||
DateTime, |
||||
MutableDataFrame, |
||||
DataSourceApi, |
||||
DataSourceInstanceSettings, |
||||
DataFrame, |
||||
DataQuery, |
||||
DataQueryRequest, |
||||
DataQueryResponse, |
||||
DataQuery, |
||||
DataSourceInstanceSettings, |
||||
FieldType, |
||||
MutableDataFrame, |
||||
} from '@grafana/data'; |
||||
import { getBackendSrv, BackendSrvRequest } from '@grafana/runtime'; |
||||
import { Observable, from, of } from 'rxjs'; |
||||
import { DataSourceWithBackend } from '@grafana/runtime'; |
||||
import { Observable } from 'rxjs'; |
||||
import { map } from 'rxjs/operators'; |
||||
|
||||
import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv'; |
||||
import { serializeParams } from 'app/core/utils/fetch'; |
||||
|
||||
export type TempoQuery = { |
||||
query: string; |
||||
} & DataQuery; |
||||
|
||||
export class TempoDatasource extends DataSourceApi<TempoQuery> { |
||||
constructor(private instanceSettings: DataSourceInstanceSettings, private readonly timeSrv: TimeSrv = getTimeSrv()) { |
||||
export class TempoDatasource extends DataSourceWithBackend<TempoQuery> { |
||||
constructor(instanceSettings: DataSourceInstanceSettings) { |
||||
super(instanceSettings); |
||||
} |
||||
|
||||
async metadataRequest(url: string, params?: Record<string, any>): Promise<any> { |
||||
const res = await this._request(url, params, { hideFromInspector: true }).toPromise(); |
||||
return res.data.data; |
||||
} |
||||
|
||||
query(options: DataQueryRequest<TempoQuery>): Observable<DataQueryResponse> { |
||||
// At this moment we expect only one target. In case we somehow change the UI to be able to show multiple
|
||||
// traces at one we need to change this.
|
||||
const id = options.targets[0]?.query; |
||||
if (id) { |
||||
return this._request(`/api/traces/${encodeURIComponent(id)}`).pipe( |
||||
map((response) => { |
||||
return { |
||||
data: [ |
||||
new MutableDataFrame({ |
||||
fields: [ |
||||
{ |
||||
name: 'trace', |
||||
type: FieldType.trace, |
||||
values: response?.data?.data || [], |
||||
}, |
||||
], |
||||
meta: { |
||||
preferredVisualisationType: 'trace', |
||||
return super.query(options).pipe( |
||||
map((response) => { |
||||
if (response.error) { |
||||
return response; |
||||
} |
||||
|
||||
return { |
||||
data: [ |
||||
new MutableDataFrame({ |
||||
fields: [ |
||||
{ |
||||
name: 'trace', |
||||
type: FieldType.trace, |
||||
values: [JSON.parse((response.data as DataFrame[])[0].fields[0].values.get(0))], |
||||
}, |
||||
}), |
||||
], |
||||
}; |
||||
}) |
||||
); |
||||
} else { |
||||
return of({ |
||||
data: [ |
||||
new MutableDataFrame({ |
||||
fields: [ |
||||
{ |
||||
name: 'trace', |
||||
type: FieldType.trace, |
||||
values: [], |
||||
], |
||||
meta: { |
||||
preferredVisualisationType: 'trace', |
||||
}, |
||||
], |
||||
meta: { |
||||
preferredVisualisationType: 'trace', |
||||
}, |
||||
}), |
||||
], |
||||
}); |
||||
} |
||||
}), |
||||
], |
||||
}; |
||||
}) |
||||
); |
||||
} |
||||
|
||||
async testDatasource(): Promise<any> { |
||||
try { |
||||
await this._request(`/api/traces/random`).toPromise(); |
||||
} catch (e) { |
||||
// If all went well this request will get back with 400 - Bad request
|
||||
if (e?.status !== 400) { |
||||
throw e; |
||||
} |
||||
const response = await super.query({ targets: [{ query: '', refId: 'A' }] } as any).toPromise(); |
||||
|
||||
if (!response.error?.message?.startsWith('failed to get trace')) { |
||||
return { status: 'error', message: 'Data source is not working' }; |
||||
} |
||||
return { status: 'success', message: 'Data source is working' }; |
||||
} |
||||
|
||||
getTimeRange(): { start: number; end: number } { |
||||
const range = this.timeSrv.timeRange(); |
||||
return { |
||||
start: getTime(range.from, false), |
||||
end: getTime(range.to, true), |
||||
}; |
||||
return { status: 'success', message: 'Data source is working' }; |
||||
} |
||||
|
||||
getQueryDisplayText(query: TempoQuery) { |
||||
return query.query; |
||||
} |
||||
|
||||
private _request(apiUrl: string, data?: any, options?: Partial<BackendSrvRequest>): Observable<Record<string, any>> { |
||||
// Hack for proxying metadata requests
|
||||
const baseUrl = `/api/datasources/proxy/${this.instanceSettings.id}`; |
||||
const params = data ? serializeParams(data) : ''; |
||||
const url = `${baseUrl}${apiUrl}${params.length ? `?${params}` : ''}`; |
||||
const req = { |
||||
...options, |
||||
url, |
||||
}; |
||||
|
||||
return from(getBackendSrv().datasourceRequest(req)); |
||||
} |
||||
} |
||||
|
||||
function getTime(date: string | DateTime, roundUp: boolean) { |
||||
if (typeof date === 'string') { |
||||
date = dateMath.parse(date, roundUp)!; |
||||
} |
||||
return date.valueOf() * 1000; |
||||
} |
||||
|
Loading…
Reference in new issue