The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/infra/tracing/tracing.go

417 lines
12 KiB

package tracing
import (
"context"
"errors"
"fmt"
"math"
"net"
"net/http"
"strings"
"sync"
"time"
jaegerpropagator "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/contrib/samplers/jaegerremote"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
trace "go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"github.com/go-kit/log/level"
"github.com/grafana/grafana/pkg/apimachinery/errutil"
"github.com/grafana/grafana/pkg/infra/log"
)
// Names of the environment variables that carry the Jaeger agent host and
// port.
// NOTE(review): neither constant is referenced in the visible part of this
// file; presumably they are read by the tracing configuration code — confirm.
const (
envJaegerAgentHost = "JAEGER_AGENT_HOST"
envJaegerAgentPort = "JAEGER_AGENT_PORT"
)
// Identifiers for the supported span exporters and context propagators.
const (
// Exporter kinds. initOpentelemetryTracer switches on jaegerExporter and
// otlpExporter and falls back to a no-op provider for any other value.
jaegerExporter string = "jaeger"
otlpExporter string = "otlp"
noopExporter string = "noop"
// Propagator names accepted in the comma-separated Propagation setting.
jaegerPropagator string = "jaeger"
w3cPropagator string = "w3c"
)
// TracingService holds the tracing configuration, a logger and the tracer
// provider. It embeds the trace.Tracer obtained during initialization, so a
// *TracingService can itself be used as a tracer.
type TracingService struct {
cfg *TracingConfig
log log.Logger
// tracerProvider is the provider that Run shuts down.
tracerProvider tracerProvider
// Tracer is assigned by initOpentelemetryTracer from the global provider.
trace.Tracer
}
// tracerProvider is a trace.TracerProvider that can additionally be shut down.
type tracerProvider interface {
trace.TracerProvider
// Shutdown stops the provider; Run calls it when the service context ends.
Shutdown(ctx context.Context) error
}
// Tracer defines the service used to create new spans and to propagate span
// context to outgoing HTTP requests.
type Tracer interface {
trace.Tracer
// Inject adds identifying information for the span to the
// headers defined in the [http.Header] map (this mutates http.Header).
//
// Implementation quirk: where OpenTelemetry is used, the [Span] is
// picked up from [context.Context]; for OpenTracing the
// information passed as [Span] is preferred.
// Both the context and span must be derived from the same call to
// [Tracer.Start].
//
// The TracingService implementation in this file ignores the span
// argument and injects from the context only.
Inject(context.Context, http.Header, trace.Span)
}
// ProvideService builds a TracingService from tracingCfg.
//
// It registers a contextual log provider that attaches a "traceID" field to
// log lines whenever the context carries a valid trace ID, then initializes
// the OpenTelemetry tracer. It returns an error when tracingCfg is nil or
// when tracer initialization fails.
func ProvideService(tracingCfg *TracingConfig) (*TracingService, error) {
	if tracingCfg == nil {
		return nil, fmt.Errorf("tracingCfg cannot be nil")
	}

	log.RegisterContextualLogProvider(func(ctx context.Context) ([]any, bool) {
		// Sampling is not required: any valid trace ID is logged.
		traceID := TraceIDFromContext(ctx, false)
		if traceID == "" {
			return nil, false
		}
		return []any{"traceID", traceID}, true
	})

	svc := &TracingService{
		cfg: tracingCfg,
		log: log.New("tracing"),
	}
	err := svc.initOpentelemetryTracer()
	if err != nil {
		return nil, err
	}
	return svc, nil
}
// NewNoopTracerService returns a TracingService backed by a no-op tracer
// provider and an empty tracing configuration. As a side effect it installs
// the no-op provider as the global OpenTelemetry tracer provider.
//
// The service is given a logger so that methods which log unconditionally
// (such as Run) do not dereference a nil logger.
func NewNoopTracerService() *TracingService {
	tp := &noopTracerProvider{TracerProvider: noop.NewTracerProvider()}
	otel.SetTracerProvider(tp)

	ots := &TracingService{
		cfg:            NewEmptyTracingConfig(),
		log:            log.New("tracing"),
		tracerProvider: tp,
	}
	// The error is deliberately ignored.
	// NOTE(review): this assumes an empty config selects the no-op provider
	// and the default propagators, so initialization cannot fail — confirm
	// against NewEmptyTracingConfig.
	_ = ots.initOpentelemetryTracer()
	return ots
}
// GetTracerProvider returns the tracer provider held by the service.
func (ots *TracingService) GetTracerProvider() tracerProvider {
	provider := ots.tracerProvider
	return provider
}
// noopTracerProvider wraps a trace.TracerProvider and, together with its
// Shutdown method, satisfies the tracerProvider interface.
type noopTracerProvider struct {
trace.TracerProvider
}
// Shutdown does nothing and always reports success.
func (noopTracerProvider) Shutdown(context.Context) error {
	return nil
}
// initJaegerTracerProvider builds an SDK tracer provider that exports spans
// through the Jaeger exporter.
//
// The configured address is treated as a collector URL when it starts with
// "http://" or "https://", and as an agent "host:port" pair otherwise. An
// address that is neither yields an error, as do failures while creating the
// exporter, the resource or the sampler.
func (ots *TracingService) initJaegerTracerProvider() (*tracesdk.TracerProvider, error) {
	addr := ots.cfg.Address

	var endpoint jaeger.EndpointOption
	switch {
	case strings.HasPrefix(addr, "http://"), strings.HasPrefix(addr, "https://"):
		ots.log.Debug("using jaeger collector", "address", addr)
		endpoint = jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(addr))
	default:
		host, port, splitErr := net.SplitHostPort(addr)
		if splitErr != nil {
			return nil, fmt.Errorf("invalid tracer address: %s", addr)
		}
		ots.log.Debug("using jaeger agent", "host", host, "port", port)
		endpoint = jaeger.WithAgentEndpoint(
			jaeger.WithAgentHost(host),
			jaeger.WithAgentPort(port),
			jaeger.WithMaxPacketSize(64000),
		)
	}

	exporter, err := jaeger.New(endpoint)
	if err != nil {
		return nil, err
	}

	// TODO: why are these attributes different from ones added to the
	// OTLP provider?
	baseAttrs := []attribute.KeyValue{
		semconv.ServiceNameKey.String(ots.cfg.ServiceName),
		attribute.String("environment", "production"),
	}
	res, err := resource.New(
		context.Background(),
		resource.WithAttributes(baseAttrs...),
		resource.WithAttributes(ots.cfg.CustomAttribs...),
	)
	if err != nil {
		return nil, err
	}

	sampler, err := ots.initSampler()
	if err != nil {
		return nil, err
	}

	return tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exporter),
		tracesdk.WithResource(res),
		tracesdk.WithSampler(sampler),
	), nil
}
// initOTLPTracerProvider builds an SDK tracer provider that exports spans
// over OTLP/gRPC to the configured address, using an insecure connection.
// It returns an error if the exporter or the sampler cannot be created.
func (ots *TracingService) initOTLPTracerProvider() (*tracesdk.TracerProvider, error) {
	grpcClient := otlptracegrpc.NewClient(
		otlptracegrpc.WithEndpoint(ots.cfg.Address),
		otlptracegrpc.WithInsecure(),
	)

	exporter, err := otlptrace.New(context.Background(), grpcClient)
	if err != nil {
		return nil, err
	}

	sampler, err := ots.initSampler()
	if err != nil {
		return nil, err
	}

	cfg := ots.cfg
	return initTracerProvider(exporter, cfg.ServiceName, cfg.ServiceVersion, sampler, cfg.CustomAttribs...)
}
// initSampler translates the configured sampler type and parameter into an
// SDK sampler:
//
//   - "const" or "": always sample when the parameter is >= 1, never sample
//     when it is <= 0, and reject anything in between;
//   - "probabilistic": trace-ID ratio sampling with the parameter as ratio;
//   - "rateLimiting": the local rate limiter with the parameter as rate;
//   - "remote": Jaeger remote sampling for service "grafana", starting from
//     a trace-ID ratio sampler until the remote strategy is available.
//
// Any other sampler type results in an error.
func (ots *TracingService) initSampler() (tracesdk.Sampler, error) {
	param := ots.cfg.SamplerParam

	switch ots.cfg.Sampler {
	case "const", "":
		switch {
		case param >= 1:
			return tracesdk.AlwaysSample(), nil
		case param <= 0:
			return tracesdk.NeverSample(), nil
		default:
			return nil, fmt.Errorf("invalid param for const sampler - must be 0 or 1: %f", param)
		}
	case "probabilistic":
		return tracesdk.TraceIDRatioBased(param), nil
	case "rateLimiting":
		return newRateLimiter(param), nil
	case "remote":
		remote := jaegerremote.New("grafana",
			jaegerremote.WithSamplingServerURL(ots.cfg.SamplerRemoteURL),
			jaegerremote.WithInitialSampler(tracesdk.TraceIDRatioBased(param)),
		)
		return remote, nil
	default:
		return nil, fmt.Errorf("invalid sampler type: %s", ots.cfg.Sampler)
	}
}
// initTracerProvider creates an SDK tracer provider that batches spans to
// exp and wraps sampler in a parent-based sampler.
//
// The provider's resource combines the service name and version, the custom
// attributes, attributes read from the environment, the process runtime
// description and the telemetry SDK information. An error is returned if the
// resource cannot be built.
func initTracerProvider(exp tracesdk.SpanExporter, serviceName string, serviceVersion string, sampler tracesdk.Sampler, customAttribs ...attribute.KeyValue) (*tracesdk.TracerProvider, error) {
	serviceAttrs := []attribute.KeyValue{
		semconv.ServiceNameKey.String(serviceName),
		semconv.ServiceVersionKey.String(serviceVersion),
	}

	res, err := resource.New(
		context.Background(),
		resource.WithAttributes(serviceAttrs...),
		resource.WithAttributes(customAttribs...),
		resource.WithFromEnv(),
		resource.WithProcessRuntimeDescription(),
		resource.WithTelemetrySDK(),
	)
	if err != nil {
		return nil, err
	}

	return tracesdk.NewTracerProvider(
		tracesdk.WithBatcher(exp),
		tracesdk.WithSampler(tracesdk.ParentBased(sampler)),
		tracesdk.WithResource(res),
	), nil
}
// initNoopTracerProvider returns a provider that records nothing. The error
// result is always nil; it exists to match the other provider constructors.
func (ots *TracingService) initNoopTracerProvider() (tracerProvider, error) {
	provider := &noopTracerProvider{TracerProvider: noop.NewTracerProvider()}
	return provider, nil
}
// initOpentelemetryTracer creates the tracer provider selected by the
// configuration, installs the global text-map propagator and assigns the
// service's embedded Tracer.
//
// The provider is registered globally only when an exporter is configured.
// The Propagation setting is a comma-separated list of "w3c" and "jaeger";
// empty entries are skipped, any other entry is an error, and an empty list
// falls back to W3C trace context plus baggage.
func (ots *TracingService) initOpentelemetryTracer() error {
	var (
		provider tracerProvider
		err      error
	)
	switch ots.cfg.enabled {
	case jaegerExporter:
		provider, err = ots.initJaegerTracerProvider()
	case otlpExporter:
		provider, err = ots.initOTLPTracerProvider()
	default:
		provider, err = ots.initNoopTracerProvider()
	}
	if err != nil {
		return err
	}

	if ots.cfg.ProfilingIntegration {
		provider = NewProfilingTracerProvider(provider)
	}

	// Register our TracerProvider as the global so any imported
	// instrumentation in the future will default to using it
	// only if tracing is enabled
	if ots.cfg.enabled != "" {
		otel.SetTracerProvider(provider)
	}

	var propagators []propagation.TextMapPropagator
	for _, name := range strings.Split(ots.cfg.Propagation, ",") {
		switch name {
		case "":
			// Nothing configured for this entry.
		case w3cPropagator:
			propagators = append(propagators, propagation.TraceContext{}, propagation.Baggage{})
		case jaegerPropagator:
			propagators = append(propagators, jaegerpropagator.Jaeger{})
		default:
			return fmt.Errorf("unsupported OpenTelemetry propagator: %q", name)
		}
	}

	var propagator propagation.TextMapPropagator
	if len(propagators) == 1 {
		propagator = propagators[0]
	} else {
		if len(propagators) == 0 {
			// Default to W3C trace context and baggage.
			propagators = append(propagators, propagation.TraceContext{}, propagation.Baggage{})
		}
		propagator = propagation.NewCompositeTextMapPropagator(propagators...)
	}
	otel.SetTextMapPropagator(propagator)

	// Keep a provider that was set before initialization (for example by
	// NewNoopTracerService).
	if ots.tracerProvider == nil {
		ots.tracerProvider = provider
	}

	// The tracer comes from the global provider, which is not necessarily
	// the provider created above when no exporter is configured.
	ots.Tracer = otel.GetTracerProvider().Tracer("component-main")
	return nil
}
// Run installs an OpenTelemetry error handler that forwards SDK errors to the
// service logger, blocks until ctx is cancelled, and then shuts down the
// tracer provider, giving it up to five seconds to finish.
//
// It returns the error reported by the provider's Shutdown, or nil when the
// service has no provider or shutdown succeeds.
func (ots *TracingService) Run(ctx context.Context) error {
	otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
		err = level.Error(ots.log).Log("msg", "OpenTelemetry handler returned an error", "err", err)
		if err != nil {
			// Log arguments are key/value pairs, so the error needs a key.
			ots.log.Error("OpenTelemetry log returning error", "err", err)
		}
	}))

	<-ctx.Done()

	ots.log.Info("Closing tracing")
	if ots.tracerProvider == nil {
		return nil
	}

	// ctx is already cancelled here, so a context derived from it would be
	// cancelled from the start and the timeout would never apply. Use a
	// fresh context so Shutdown really gets its grace period.
	ctxShutdown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	return ots.tracerProvider.Shutdown(ctxShutdown)
}
// Inject writes the propagation headers for the span carried by ctx into
// header, using the globally registered text-map propagator. The span
// argument is ignored.
func (ots *TracingService) Inject(ctx context.Context, header http.Header, _ trace.Span) {
	carrier := propagation.HeaderCarrier(header)
	otel.GetTextMapPropagator().Inject(ctx, carrier)
}
// OtelTracer returns the service itself as a trace.Tracer.
func (ots *TracingService) OtelTracer() trace.Tracer {
	var tracer trace.Tracer = ots
	return tracer
}
// rateLimiter is a sampler that admits a span while it has at least one
// credit of balance and refills that balance over time at rps credits per
// second. ShouldSample holds the embedded mutex while it updates balance and
// lastTick.
type rateLimiter struct {
sync.Mutex
description string // value returned by Description
rps float64 // credits added per elapsed second
balance float64 // credits currently available; one is spent per sampled span
maxBalance float64 // upper bound applied to balance on refill
lastTick time.Time // time of the most recent refill
now func() time.Time // clock used by ShouldSample
}
// newRateLimiter returns a rate limiter that refills at rps credits per
// second. Both the starting and the maximum balance are max(rps, 1), and the
// limiter uses time.Now as its clock.
func newRateLimiter(rps float64) *rateLimiter {
	burst := math.Max(rps, 1)

	rl := new(rateLimiter)
	rl.rps = rps
	rl.description = fmt.Sprintf("RateLimitingSampler{%g}", rps)
	rl.balance = burst
	rl.maxBalance = burst
	rl.lastTick = time.Now()
	rl.now = time.Now
	return rl
}
// ShouldSample spends one credit and samples the span when credit is
// available. When the balance is below one credit it is first refilled
// according to the time elapsed since the last refill, capped at maxBalance.
// The span is dropped if there is still less than one credit afterwards.
// The parent's trace state is passed through in every result.
func (rl *rateLimiter) ShouldSample(p tracesdk.SamplingParameters) tracesdk.SamplingResult {
	rl.Lock()
	defer rl.Unlock()

	result := tracesdk.SamplingResult{
		Decision:   tracesdk.Drop,
		Tracestate: trace.SpanContextFromContext(p.ParentContext).TraceState(),
	}

	// Refill lazily, only when the current balance cannot pay for a span.
	if !(rl.balance >= 1) {
		tick := rl.now()
		elapsed := tick.Sub(rl.lastTick).Seconds()
		rl.lastTick = tick
		rl.balance = math.Min(rl.maxBalance, rl.balance+elapsed*rl.rps)
	}

	if rl.balance >= 1 {
		rl.balance -= 1
		result.Decision = tracesdk.RecordAndSample
	}
	return result
}
// Description returns the sampler description built in newRateLimiter.
func (rl *rateLimiter) Description() string {
	return rl.description
}
// TraceIDFromContext returns the string form of the trace ID carried by ctx.
// It returns "" when the span context has no trace ID or is invalid, or when
// requireSampled is true and the span context is not sampled.
func TraceIDFromContext(ctx context.Context, requireSampled bool) string {
	sc := trace.SpanContextFromContext(ctx)
	switch {
	case !sc.HasTraceID(), !sc.IsValid():
		return ""
	case requireSampled && !sc.IsSampled():
		return ""
	}
	return sc.TraceID().String()
}
// ServerTimingForSpan formats the span's identifiers as
// "00-<trace id>-<span id>-01". It returns "" when the span context has no
// trace ID or is invalid.
func ServerTimingForSpan(span trace.Span) string {
	sc := span.SpanContext()
	if sc.HasTraceID() && sc.IsValid() {
		traceID := sc.TraceID().String()
		spanID := sc.SpanID().String()
		return fmt.Sprintf("00-%s-%s-01", traceID, spanID)
	}
	return ""
}
// Error sets the span status to error and records err as an exception on the
// provided span. When err wraps an errutil.Error, its message ID is attached
// to the recorded event as the "message_id" attribute.
//
// err is returned unchanged so the call can be used directly in a return
// statement. A nil err is a no-op and returns nil; without this guard the
// call to err.Error() would panic.
func Error(span trace.Span, err error) error {
	if err == nil {
		return nil
	}

	var attrs []attribute.KeyValue
	grafanaErr := errutil.Error{}
	if errors.As(err, &grafanaErr) {
		attrs = append(attrs, attribute.String("message_id", grafanaErr.MessageID))
	}

	span.SetStatus(codes.Error, err.Error())
	span.RecordError(err, trace.WithAttributes(attrs...))
	return err
}
// Errorf builds an error with fmt.Errorf, then marks the provided span as
// failed and records that error on it via Error. The new error is returned.
func Errorf(span trace.Span, format string, args ...any) error {
	return Error(span, fmt.Errorf(format, args...))
}
// instrumentationScope is the tracer name that Start requests from the
// tracer provider.
var instrumentationScope = "github.com/grafana/grafana/pkg/infra/tracing"
// Start only creates an OpenTelemetry span if the incoming context already
// includes a span: the new span is started through the tracer provider of the
// span found in ctx, under this package's instrumentation scope, with the
// given attributes.
func Start(ctx context.Context, name string, attributes ...attribute.KeyValue) (context.Context, trace.Span) {
	parent := trace.SpanFromContext(ctx)
	tracer := parent.TracerProvider().Tracer(instrumentationScope)
	return tracer.Start(ctx, name, trace.WithAttributes(attributes...))
}