mirror of https://github.com/grafana/grafana
Tempo: TraceQL query response streaming (#69212)
* Refactor Tempo datasource backend to support multiple queryData types. Added a traceId query type that is set when performing the request but doesn't map to a tab.
* WIP: data is reaching the frontend
* WIP
* Use channels and goroutines
* Some fixes
* Simplify backend code. Return traces, metrics, state, and error in a data frame. Shared state type between FE and BE. Use getStream() instead of getQueryData()
* Handle errors in frontend
* Update Tempo and use same URL for RPC and HTTP
* Clean up backend code
* Merge main
* Create gRPC client only with host and authenticate
* Create gRPC client only with host and authenticate
* Cleanup
* Add streaming to TraceQL Search tab
* Fix merge conflicts
* Added tests for processStream
* make gen-cue
* make gen-cue
* goimports
* lint
* Clean up go.mod
* Comments
* Address PR comments
* Fix streaming for TraceQL search tab
* Added streaming kill switch as the disableTraceQLStreaming feature toggle
* Small comment
* Fix conflicts
* Correctly capture and send all errors as a data frame to the client
* Fix infinite error loop
* Fix merge conflicts
* Fix test
* Update deprecated import
* Fix feature toggles gen
* Fix merge conflicts
parent fb2a57d3a3
commit c1709c9301
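Editor's note: the commit message describes the shared response shape (traces, metrics, state, and error returned in a single data frame consumed by both frontend and backend). That shape is defined by the createResponseDataFrame helper further down in this diff. As a minimal orientation sketch, not part of the change itself, the per-update frame looks like this; field order matters because the frontend reads the values by index:

package main

import (
    "encoding/json"
    "fmt"

    "github.com/grafana/grafana-plugin-sdk-go/data"
)

func main() {
    // Sketch of the streamed frame, mirroring createResponseDataFrame below.
    // The frontend consumes it as [traces, metrics, state, error].
    frame := data.NewFrame("response",
        data.NewField("traces", nil, []json.RawMessage{}),  // search results, JSON-encoded
        data.NewField("metrics", nil, []json.RawMessage{}), // SearchMetrics, JSON-encoded
        data.NewField("state", nil, []string{}),            // SearchStreamingState: streaming, done, error
        data.NewField("error", nil, []string{}),            // error message, empty when none
    )
    fmt.Println(len(frame.Fields)) // 4
}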
@@ -0,0 +1,72 @@
package tempo

import (
    "context"
    "crypto/tls"
    "encoding/base64"
    "fmt"
    "net/url"
    "strings"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
    "github.com/grafana/tempo/pkg/tempopb"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/credentials/insecure"
)

// newGrpcClient creates a new gRPC client to connect to a streaming query service.
// It starts by parsing the URL from the data source settings and extracting the host, since that's what the gRPC connection expects.
// If the URL does not contain a port number, it adds a default port based on the scheme (80 for HTTP and 443 for HTTPS).
// If basic authentication is enabled, it uses TLS transport credentials and sets the basic authentication header for each RPC call.
// Otherwise, it uses insecure credentials.
func newGrpcClient(settings backend.DataSourceInstanceSettings, opts httpclient.Options) (tempopb.StreamingQuerierClient, error) {
    parsedUrl, err := url.Parse(settings.URL)
    if err != nil {
        return nil, err
    }

    onlyHost := parsedUrl.Host
    if !strings.Contains(onlyHost, ":") {
        if parsedUrl.Scheme == "http" {
            onlyHost += ":80"
        } else {
            onlyHost += ":443"
        }
    }

    var dialOps []grpc.DialOption
    if settings.BasicAuthEnabled {
        dialOps = append(dialOps, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
        dialOps = append(dialOps, grpc.WithPerRPCCredentials(&basicAuth{
            Header: basicHeaderForAuth(opts.BasicAuth.User, opts.BasicAuth.Password),
        }))
    } else {
        dialOps = append(dialOps, grpc.WithTransportCredentials(insecure.NewCredentials()))
    }

    clientConn, err := grpc.Dial(onlyHost, dialOps...)
    if err != nil {
        return nil, err
    }
    return tempopb.NewStreamingQuerierClient(clientConn), nil
}

type basicAuth struct {
    Header string
}

func (c *basicAuth) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
    return map[string]string{
        "Authorization": c.Header,
    }, nil
}

func (c *basicAuth) RequireTransportSecurity() bool {
    return true
}

func basicHeaderForAuth(username, password string) string {
    return fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password))))
}
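Editor's note: a quick sanity check of the helper above (an editorial sketch, not part of the diff). basicHeaderForAuth simply base64-encodes "user:password" and prefixes it with "Basic ", which is the value the per-RPC credentials attach as the Authorization header:

package main

import (
    "encoding/base64"
    "fmt"
)

// Same logic as basicHeaderForAuth in the diff above.
func basicHeaderForAuth(username, password string) string {
    return fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password))))
}

func main() {
    fmt.Println(basicHeaderForAuth("user", "pass")) // Basic dXNlcjpwYXNz
}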
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tempo

import (
    "go.opentelemetry.io/collector/model/pdata"
)

// Some of the keys used to represent OTLP constructs as tags or annotations in other formats.
const (
    TagMessage = "message"

    TagSpanKind = "span.kind"

    TagStatusCode    = "status.code"
    TagStatusMsg     = "status.message"
    TagError         = "error"
    TagHTTPStatusMsg = "http.status_message"

    TagW3CTraceState = "w3c.tracestate"
)

// Constants used for signifying batch-level attribute values where not supplied by OTLP data but required
// by other protocols.
const (
    ResourceNoServiceName = "OTLPResourceNoServiceName"
)

// OpenTracingSpanKind are possible values for TagSpanKind and match the OpenTracing
// conventions: https://github.com/opentracing/specification/blob/main/semantic_conventions.md
// These values are used for representing span kinds that have no
// equivalents in OpenCensus format. They are stored as values of TagSpanKind.
type OpenTracingSpanKind string

const (
    OpenTracingSpanKindUnspecified OpenTracingSpanKind = ""
    OpenTracingSpanKindClient      OpenTracingSpanKind = "client"
    OpenTracingSpanKindServer      OpenTracingSpanKind = "server"
    OpenTracingSpanKindConsumer    OpenTracingSpanKind = "consumer"
    OpenTracingSpanKindProducer    OpenTracingSpanKind = "producer"
    OpenTracingSpanKindInternal    OpenTracingSpanKind = "internal"
)

// StatusCodeFromHTTP takes an HTTP status code and returns the appropriate OpenTelemetry status code.
// See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#status
func StatusCodeFromHTTP(httpStatusCode int) pdata.StatusCode {
    if httpStatusCode >= 100 && httpStatusCode < 399 {
        return pdata.StatusCodeUnset
    }
    return pdata.StatusCodeError
}
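Editor's note: for reference (a sketch using the same pdata import the diff already relies on, not part of the change), the mapping above treats status codes 100 through 398 as non-error and everything else, including client and server errors, as an error:

package main

import (
    "fmt"

    "go.opentelemetry.io/collector/model/pdata"
)

// Same mapping as StatusCodeFromHTTP in the diff above.
func statusCodeFromHTTP(code int) pdata.StatusCode {
    if code >= 100 && code < 399 {
        return pdata.StatusCodeUnset
    }
    return pdata.StatusCodeError
}

func main() {
    fmt.Println(statusCodeFromHTTP(200) == pdata.StatusCodeUnset) // true
    fmt.Println(statusCodeFromHTTP(404) == pdata.StatusCodeError) // true
    fmt.Println(statusCodeFromHTTP(500) == pdata.StatusCodeError) // true
}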
@@ -0,0 +1,160 @@
package tempo

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "io"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/data"
    "github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
    "github.com/grafana/tempo/pkg/tempopb"
)

const SearchPathPrefix = "search/"

type ExtendedResponse struct {
    *tempopb.SearchResponse
    State dataquery.SearchStreamingState
}

type StreamSender interface {
    SendFrame(frame *data.Frame, include data.FrameInclude) error
    SendJSON(data []byte) error
    SendBytes(data []byte) error
}

func (s *Service) runSearchStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, datasource *Datasource) error {
    response := &backend.DataResponse{}

    var backendQuery *backend.DataQuery
    err := json.Unmarshal(req.Data, &backendQuery)
    if err != nil {
        response.Error = fmt.Errorf("error unmarshaling backend query model: %v", err)
        return err
    }

    var sr *tempopb.SearchRequest
    err = json.Unmarshal(req.Data, &sr)
    if err != nil {
        response.Error = fmt.Errorf("error unmarshaling Tempo query model: %v", err)
        return err
    }

    if sr.GetQuery() == "" {
        return fmt.Errorf("query is empty")
    }

    sr.Start = uint32(backendQuery.TimeRange.From.Unix())
    sr.End = uint32(backendQuery.TimeRange.To.Unix())

    stream, err := datasource.StreamingClient.Search(ctx, sr)
    if err != nil {
        s.logger.Error("Error Search()", "err", err)
        return err
    }

    return s.processStream(stream, sender)
}

func (s *Service) processStream(stream tempopb.StreamingQuerier_SearchClient, sender StreamSender) error {
    var traceList []*tempopb.TraceSearchMetadata
    var metrics *tempopb.SearchMetrics
    for {
        msg, err := stream.Recv()
        if errors.Is(err, io.EOF) {
            if err := sendResponse(&ExtendedResponse{
                State: dataquery.SearchStreamingStateDone,
                SearchResponse: &tempopb.SearchResponse{
                    Metrics: metrics,
                    Traces:  traceList,
                },
            }, sender); err != nil {
                return err
            }
            break
        }
        if err != nil {
            s.logger.Error("Error receiving message", "err", err)
            return err
        }

        metrics = msg.Metrics
        traceList = append(traceList, msg.Traces...)
        traceList = removeDuplicates(traceList)

        if err := sendResponse(&ExtendedResponse{
            State: dataquery.SearchStreamingStateStreaming,
            SearchResponse: &tempopb.SearchResponse{
                Metrics: metrics,
                Traces:  traceList,
            },
        }, sender); err != nil {
            return err
        }
    }

    return nil
}

func sendResponse(response *ExtendedResponse, sender StreamSender) error {
    frame := createResponseDataFrame()

    if response != nil {
        tracesAsJson, err := json.Marshal(response.Traces)
        if err != nil {
            return err
        }
        tracesRawMessage := json.RawMessage(tracesAsJson)
        frame.Fields[0].Append(tracesRawMessage)

        metricsAsJson, err := json.Marshal(response.Metrics)
        if err != nil {
            return err
        }
        metricsRawMessage := json.RawMessage(metricsAsJson)
        frame.Fields[1].Append(metricsRawMessage)
        frame.Fields[2].Append(string(response.State))
        frame.Fields[3].Append("")
    }

    return sender.SendFrame(frame, data.IncludeAll)
}

func sendError(searchErr error, sender StreamSender) error {
    frame := createResponseDataFrame()

    if searchErr != nil {
        frame.Fields[0].Append(json.RawMessage{})
        frame.Fields[1].Append(json.RawMessage{})
        frame.Fields[2].Append(string(dataquery.SearchStreamingStateError))
        frame.Fields[3].Append(searchErr.Error())
    }

    return sender.SendFrame(frame, data.IncludeAll)
}

func createResponseDataFrame() *data.Frame {
    frame := data.NewFrame("response")
    frame.Fields = append(frame.Fields, data.NewField("traces", nil, []json.RawMessage{}))
    frame.Fields = append(frame.Fields, data.NewField("metrics", nil, []json.RawMessage{}))
    frame.Fields = append(frame.Fields, data.NewField("state", nil, []string{}))
    frame.Fields = append(frame.Fields, data.NewField("error", nil, []string{}))

    return frame
}

func removeDuplicates(traceList []*tempopb.TraceSearchMetadata) []*tempopb.TraceSearchMetadata {
    keys := make(map[string]bool)
    var list []*tempopb.TraceSearchMetadata

    for _, entry := range traceList {
        if _, value := keys[entry.TraceID]; !value {
            keys[entry.TraceID] = true
            list = append(list, entry)
        }
    }
    return list
}
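Editor's note: because processStream appends every batch it receives and then calls removeDuplicates, the accumulated trace list keeps only the first entry seen per TraceID. A small behavioral sketch (editorial, slightly simplified, not part of the diff):

package main

import (
    "fmt"

    "github.com/grafana/tempo/pkg/tempopb"
)

// Simplified version of removeDuplicates from the diff above: keep the first entry per TraceID.
func removeDuplicates(traces []*tempopb.TraceSearchMetadata) []*tempopb.TraceSearchMetadata {
    seen := make(map[string]bool)
    var out []*tempopb.TraceSearchMetadata
    for _, t := range traces {
        if !seen[t.TraceID] {
            seen[t.TraceID] = true
            out = append(out, t)
        }
    }
    return out
}

func main() {
    in := []*tempopb.TraceSearchMetadata{{TraceID: "a"}, {TraceID: "b"}, {TraceID: "a"}}
    fmt.Println(len(removeDuplicates(in))) // 2 (the second "a" is dropped)
}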
@@ -0,0 +1,229 @@
package tempo

import (
    "context"
    "encoding/json"
    "errors"
    "io"
    "reflect"
    "strings"
    "testing"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/data"
    "github.com/grafana/grafana/pkg/infra/log"
    "github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
    "github.com/grafana/tempo/pkg/tempopb"
    "google.golang.org/grpc/metadata"
)

func TestProcessStream_ValidInput_ReturnsNoError(t *testing.T) {
    service := &Service{}
    searchClient := &mockStreamer{}
    streamSender := &mockSender{}
    err := service.processStream(searchClient, streamSender)
    if err != nil {
        t.Errorf("Expected no error, but got %s", err)
    }
}

func TestProcessStream_InvalidInput_ReturnsError(t *testing.T) {
    logger := log.New("tsdb.tempo.test")
    service := &Service{
        logger: logger,
    }
    searchClient := &mockStreamer{err: errors.New("invalid input")}
    streamSender := &mockSender{}
    err := service.processStream(searchClient, streamSender)
    if err != nil {
        if !strings.Contains(err.Error(), "invalid input") {
            t.Errorf("Expected error message to contain 'invalid input', but got %s", err)
        }
    }
}

func TestProcessStream_ValidInput_ReturnsExpectedOutput(t *testing.T) {
    logger := log.New("tsdb.tempo.test")
    service := &Service{
        logger: logger,
    }
    searchClient := &mockStreamer{
        tracingMetadata: []*tempopb.TraceSearchMetadata{
            {TraceID: "abcdefg", StartTimeUnixNano: 1234},
            {TraceID: "hijklmn", StartTimeUnixNano: 5678},
        },
        metrics: &tempopb.SearchMetrics{
            CompletedJobs:   2,
            TotalJobs:       5,
            InspectedBytes:  123456789,
            TotalBlockBytes: 987654321,
            InspectedTraces: 123,
        },
        expectedResponses: []ExtendedResponse{
            {
                SearchResponse: &tempopb.SearchResponse{
                    Traces: []*tempopb.TraceSearchMetadata{
                        {TraceID: "abcdefg", StartTimeUnixNano: 1234},
                    },
                    Metrics: &tempopb.SearchMetrics{
                        CompletedJobs:   2,
                        TotalJobs:       5,
                        InspectedBytes:  123456789,
                        TotalBlockBytes: 987654321,
                        InspectedTraces: 123,
                    },
                },
                State: dataquery.SearchStreamingStateStreaming,
            },
            {
                SearchResponse: &tempopb.SearchResponse{
                    Traces: []*tempopb.TraceSearchMetadata{
                        {TraceID: "abcdefg", StartTimeUnixNano: 1234},
                        {TraceID: "hijklmn", StartTimeUnixNano: 5678},
                    },
                    Metrics: &tempopb.SearchMetrics{
                        CompletedJobs:   2,
                        TotalJobs:       5,
                        InspectedBytes:  123456789,
                        TotalBlockBytes: 987654321,
                        InspectedTraces: 123,
                    },
                },
                State: dataquery.SearchStreamingStateStreaming,
            },

            {
                SearchResponse: &tempopb.SearchResponse{
                    Traces: []*tempopb.TraceSearchMetadata{
                        {TraceID: "abcdefg", StartTimeUnixNano: 1234},
                        {TraceID: "hijklmn", StartTimeUnixNano: 5678},
                    },
                    Metrics: &tempopb.SearchMetrics{
                        CompletedJobs:   2,
                        TotalJobs:       5,
                        InspectedBytes:  123456789,
                        TotalBlockBytes: 987654321,
                        InspectedTraces: 123,
                    },
                },
                State: dataquery.SearchStreamingStateDone,
            },
        },
    }
    streamSender := &mockSender{}
    err := service.processStream(searchClient, streamSender)
    if err != nil {
        t.Errorf("Expected no error, but got %s", err)
        return
    }
    if len(streamSender.responses) != 3 {
        t.Errorf("Expected 3 responses, but got %d", len(streamSender.responses))
        return
    }

    for i, frame := range streamSender.responses {
        expectedMetrics := searchClient.expectedResponses[i].Metrics
        expectedTraces := searchClient.expectedResponses[i].Traces
        expectedState := string(searchClient.expectedResponses[i].State)

        if len(frame.Fields) != 4 {
            t.Errorf("Expected 4 fields in data frame, but was '%d'", len(frame.Fields))
            return
        }
        var traceList []*tempopb.TraceSearchMetadata
        if err := json.Unmarshal(frame.Fields[0].At(0).(json.RawMessage), &traceList); err != nil {
            t.Errorf("Error unmarshaling trace list: %s", err)
        } else {
            if !reflect.DeepEqual(traceList, expectedTraces) {
                t.Errorf("Expected response traces to be '%+v', but was '%+v'",
                    expectedTraces, traceList)
                return
            }
        }

        var metrics *tempopb.SearchMetrics
        if err := json.Unmarshal(frame.Fields[1].At(0).(json.RawMessage), &metrics); err != nil {
            t.Errorf("Error unmarshaling metrics: %s", err)
        } else {
            if !reflect.DeepEqual(metrics, expectedMetrics) {
                t.Errorf("Expected response metrics to be '%+v', but was '%+v'",
                    expectedMetrics, metrics)
                return
            }
        }

        state := frame.Fields[2].At(0).(string)
        if state != expectedState {
            t.Errorf("Expected response state to be '%+v', but was '%+v'", expectedState,
                state)
            return
        }
        frameErr := frame.Fields[3].At(0).(string)
        if frameErr != "" {
            t.Errorf("Didn't expect error but got '%+v'", frameErr)
            return
        }
    }
}

type mockSender struct {
    backend.StreamSender
    responses []*data.Frame
}

func (s *mockSender) SendFrame(frame *data.Frame, include data.FrameInclude) error {
    s.responses = append(s.responses, frame)
    return nil
}

type mockStreamer struct {
    tracingMetadata       []*tempopb.TraceSearchMetadata
    copyOfTracingMetadata []*tempopb.TraceSearchMetadata
    metrics               *tempopb.SearchMetrics
    expectedResponses     []ExtendedResponse
    err                   error
}

func (m *mockStreamer) Recv() (*tempopb.SearchResponse, error) {
    if m.err != nil {
        return nil, m.err
    }
    if m.copyOfTracingMetadata == nil {
        m.copyOfTracingMetadata = make([]*tempopb.TraceSearchMetadata, len(m.tracingMetadata))
        copy(m.copyOfTracingMetadata, m.tracingMetadata)
    }
    if len(m.copyOfTracingMetadata) == 0 {
        return &tempopb.SearchResponse{
            Metrics: m.metrics,
            Traces:  m.tracingMetadata,
        }, io.EOF
    }
    traceMetadata := m.copyOfTracingMetadata[0]
    m.copyOfTracingMetadata = m.copyOfTracingMetadata[1:]
    return &tempopb.SearchResponse{
        Metrics: m.metrics,
        Traces:  []*tempopb.TraceSearchMetadata{traceMetadata},
    }, nil
}

func (m *mockStreamer) Header() (metadata.MD, error) {
    panic("implement me")
}

func (m *mockStreamer) Trailer() metadata.MD {
    panic("implement me")
}

func (m *mockStreamer) CloseSend() error {
    panic("implement me")
}

func (m *mockStreamer) Context() context.Context {
    panic("implement me")
}

func (m *mockStreamer) SendMsg(a interface{}) error {
    panic("implement me")
}

func (m *mockStreamer) RecvMsg(a interface{}) error {
    panic("implement me")
}
@@ -0,0 +1,48 @@
package tempo

import (
    "context"
    "fmt"
    "strings"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
)

func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
    s.logger.Debug("Allowing access to stream", "path", req.Path, "user", req.PluginContext.User)
    status := backend.SubscribeStreamStatusPermissionDenied
    if strings.HasPrefix(req.Path, SearchPathPrefix) {
        status = backend.SubscribeStreamStatusOK
    }

    return &backend.SubscribeStreamResponse{
        Status: status,
    }, nil
}

func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
    s.logger.Debug("PublishStream called")

    // Do not allow publishing at all.
    return &backend.PublishStreamResponse{
        Status: backend.PublishStreamStatusPermissionDenied,
    }, nil
}

func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error {
    s.logger.Debug("New stream call", "path", request.Path)

    if strings.HasPrefix(request.Path, SearchPathPrefix) {
        tempoDatasource, err := s.getDSInfo(ctx, request.PluginContext)
        if err != nil {
            return err
        }
        if err = s.runSearchStream(ctx, request, sender, tempoDatasource); err != nil {
            return sendError(err, sender)
        } else {
            return nil
        }
    }

    return fmt.Errorf("unknown path %s", request.Path)
}
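Editor's note: SubscribeStream above only checks the channel path prefix, and the frontend (in the TypeScript change below) subscribes with a path of the form search/<uuid>. A minimal sketch of that rule; the example paths are made up:

package main

import (
    "fmt"
    "strings"
)

const searchPathPrefix = "search/" // mirrors SearchPathPrefix used above

func allowed(path string) bool {
    return strings.HasPrefix(path, searchPathPrefix)
}

func main() {
    fmt.Println(allowed("search/6f9a2c1e-0000-0000-0000-000000000000")) // true (hypothetical key)
    fmt.Println(allowed("metrics/whatever"))                            // false -> permission denied
}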
@@ -0,0 +1,90 @@
package tempo

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/data"
    "github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
    "go.opentelemetry.io/collector/model/otlp"
)

func (s *Service) getTrace(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) (*backend.DataResponse, error) {
    result := &backend.DataResponse{}
    refID := query.RefID

    model := &dataquery.TempoQuery{}
    err := json.Unmarshal(query.JSON, model)
    if err != nil {
        return result, err
    }

    dsInfo, err := s.getDSInfo(ctx, pCtx)
    if err != nil {
        return nil, err
    }

    request, err := s.createRequest(ctx, dsInfo, model.Query, query.TimeRange.From.Unix(), query.TimeRange.To.Unix())
    if err != nil {
        return result, err
    }

    resp, err := dsInfo.HTTPClient.Do(request)
    if err != nil {
        return result, fmt.Errorf("failed get to tempo: %w", err)
    }

    defer func() {
        if err := resp.Body.Close(); err != nil {
            s.logger.FromContext(ctx).Warn("failed to close response body", "err", err)
        }
    }()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return &backend.DataResponse{}, err
    }

    if resp.StatusCode != http.StatusOK {
        result.Error = fmt.Errorf("failed to get trace with id: %s Status: %s Body: %s", model.Query, resp.Status, string(body))
        return result, nil
    }

    otTrace, err := otlp.NewProtobufTracesUnmarshaler().UnmarshalTraces(body)

    if err != nil {
        return &backend.DataResponse{}, fmt.Errorf("failed to convert tempo response to Otlp: %w", err)
    }

    frame, err := TraceToFrame(otTrace)
    if err != nil {
        return &backend.DataResponse{}, fmt.Errorf("failed to transform trace %v to data frame: %w", model.Query, err)
    }
    frame.RefID = refID
    frames := []*data.Frame{frame}
    result.Frames = frames
    return result, nil
}

func (s *Service) createRequest(ctx context.Context, dsInfo *Datasource, traceID string, start int64, end int64) (*http.Request, error) {
    var tempoQuery string
    if start == 0 || end == 0 {
        tempoQuery = fmt.Sprintf("%s/api/traces/%s", dsInfo.URL, traceID)
    } else {
        tempoQuery = fmt.Sprintf("%s/api/traces/%s?start=%d&end=%d", dsInfo.URL, traceID, start, end)
    }

    req, err := http.NewRequestWithContext(ctx, "GET", tempoQuery, nil)
    if err != nil {
        return nil, err
    }

    req.Header.Set("Accept", "application/protobuf")

    s.logger.FromContext(ctx).Debug("Tempo request", "url", req.URL.String(), "headers", req.Header)
    return req, nil
}
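Editor's note: for orientation (an editorial sketch with made-up values, not part of the diff), createRequest above targets Tempo's trace-by-ID endpoint and only appends the time range when both bounds are non-zero:

package main

import "fmt"

func main() {
    // Mirrors the URL construction in createRequest above; host and trace ID are illustrative.
    dsURL, traceID := "http://tempo:3200", "2f3e0cee77ae5dc9c17ade3689eb2e54"
    var start, end int64 = 1690000000, 1690003600

    withRange := fmt.Sprintf("%s/api/traces/%s?start=%d&end=%d", dsURL, traceID, start, end)
    withoutRange := fmt.Sprintf("%s/api/traces/%s", dsURL, traceID)

    fmt.Println(withRange)    // http://tempo:3200/api/traces/2f3e0cee77ae5dc9c17ade3689eb2e54?start=1690000000&end=1690003600
    fmt.Println(withoutRange) // http://tempo:3200/api/traces/2f3e0cee77ae5dc9c17ade3689eb2e54
}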
@@ -0,0 +1,166 @@
import { capitalize } from 'lodash';
import { map, Observable, defer, mergeMap } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';

import {
  DataFrame,
  DataQueryRequest,
  DataQueryResponse,
  DataSourceInstanceSettings,
  FieldType,
  LiveChannelScope,
  LoadingState,
  MutableDataFrame,
  ThresholdsConfig,
  ThresholdsMode,
} from '@grafana/data';
import { getGrafanaLiveSrv } from '@grafana/runtime';

import { SearchStreamingState } from './dataquery.gen';
import { TempoDatasource } from './datasource';
import { createTableFrameFromTraceQlQuery } from './resultTransformer';
import { SearchMetrics, TempoJsonData, TempoQuery } from './types';

export async function getLiveStreamKey(): Promise<string> {
  return uuidv4();
}

export function doTempoChannelStream(
  query: TempoQuery,
  ds: TempoDatasource,
  options: DataQueryRequest<TempoQuery>,
  instanceSettings: DataSourceInstanceSettings<TempoJsonData>
): Observable<DataQueryResponse> {
  const range = options.range;

  let frames: DataFrame[] | undefined = undefined;
  let state: LoadingState = LoadingState.NotStarted;

  return defer(() => getLiveStreamKey()).pipe(
    mergeMap((key) => {
      return getGrafanaLiveSrv()
        .getStream<MutableDataFrame>({
          scope: LiveChannelScope.DataSource,
          namespace: ds.uid,
          path: `search/${key}`,
          data: {
            ...query,
            timeRange: {
              from: range.from.toISOString(),
              to: range.to.toISOString(),
            },
          },
        })
        .pipe(
          map((evt) => {
            if ('message' in evt && evt?.message) {
              // Schema should be [traces, metrics, state, error]
              const traces = evt.message.data.values[0][0];
              const metrics = evt.message.data.values[1][0];
              const frameState: SearchStreamingState = evt.message.data.values[2][0];
              const error = evt.message.data.values[3][0];

              switch (frameState) {
                case SearchStreamingState.Done:
                  state = LoadingState.Done;
                  break;
                case SearchStreamingState.Streaming:
                  state = LoadingState.Streaming;
                  break;
                case SearchStreamingState.Error:
                  throw new Error(error);
              }

              frames = [
                metricsDataFrame(metrics, frameState),
                ...createTableFrameFromTraceQlQuery(traces, instanceSettings),
              ];
            }
            return {
              data: frames || [],
              state,
            };
          })
        );
    })
  );
}

function metricsDataFrame(metrics: SearchMetrics, state: SearchStreamingState) {
  const progressThresholds: ThresholdsConfig = {
    steps: [
      {
        color: 'blue',
        value: -Infinity,
      },
      {
        color: 'green',
        value: 75,
      },
    ],
    mode: ThresholdsMode.Absolute,
  };

  const frame: DataFrame = {
    refId: 'streaming-progress',
    name: 'Streaming Progress',
    length: 1,
    fields: [
      {
        name: 'state',
        type: FieldType.string,
        values: [capitalize(state.toString())],
        config: {
          displayNameFromDS: 'State',
        },
      },
      {
        name: 'totalBlocks',
        type: FieldType.number,
        values: [metrics.totalBlocks],
        config: {
          displayNameFromDS: 'Total Blocks',
        },
      },
      {
        name: 'completedJobs',
        type: FieldType.number,
        values: [metrics.completedJobs],
        config: {
          displayNameFromDS: 'Completed Jobs',
        },
      },
      {
        name: 'totalJobs',
        type: FieldType.number,
        values: [metrics.totalJobs],
        config: {
          displayNameFromDS: 'Total Jobs',
        },
      },
      {
        name: 'progress',
        type: FieldType.number,
        values: [
          state === SearchStreamingState.Done ? 100 : ((metrics.completedJobs || 0) / (metrics.totalJobs || 1)) * 100,
        ],
        config: {
          displayNameFromDS: 'Progress',
          unit: 'percent',
          min: 0,
          max: 100,
          custom: {
            cellOptions: {
              type: 'gauge',
              mode: 'gradient',
            },
          },
          thresholds: progressThresholds,
        },
      },
    ],
    meta: {
      preferredVisualisationType: 'table',
    },
  };
  return frame;
}