mirror of https://github.com/grafana/loki
refactor: Use OTel tracing library (#17859)
Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>pull/17943/head
parent
ad0bef31e7
commit
c8a15f451c
@ -0,0 +1,22 @@ |
||||
package tracing |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/dskit/tracing" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
) |
||||
|
||||
func KeyValuesToOTelAttributes(kvps ...any) []attribute.KeyValue { |
||||
attrs := make([]attribute.KeyValue, 0, len(kvps)/2) |
||||
for i := 0; i < len(kvps); i += 2 { |
||||
if i+1 < len(kvps) { |
||||
key, ok := kvps[i].(string) |
||||
if !ok { |
||||
key = fmt.Sprintf("not_string_key:%v", kvps[i]) |
||||
} |
||||
attrs = append(attrs, tracing.KeyValueToOTelAttribute(key, kvps[i+1])) |
||||
} |
||||
} |
||||
return attrs |
||||
} |
||||
@ -1,52 +0,0 @@ |
||||
package spanlogger |
||||
|
||||
import ( |
||||
opentracing "github.com/opentracing/opentracing-go" |
||||
"github.com/opentracing/opentracing-go/log" |
||||
) |
||||
|
||||
type noopTracer struct{} |
||||
|
||||
type noopSpan struct{} |
||||
type noopSpanContext struct{} |
||||
|
||||
var ( |
||||
defaultNoopSpanContext = noopSpanContext{} |
||||
defaultNoopSpan = noopSpan{} |
||||
defaultNoopTracer = noopTracer{} |
||||
) |
||||
|
||||
const ( |
||||
emptyString = "" |
||||
) |
||||
|
||||
func (n noopSpanContext) ForeachBaggageItem(_ func(k, v string) bool) {} |
||||
|
||||
func (n noopSpan) Context() opentracing.SpanContext { return defaultNoopSpanContext } |
||||
func (n noopSpan) SetBaggageItem(_, _ string) opentracing.Span { return defaultNoopSpan } |
||||
func (n noopSpan) BaggageItem(_ string) string { return emptyString } |
||||
func (n noopSpan) SetTag(_ string, _ interface{}) opentracing.Span { return n } |
||||
func (n noopSpan) LogFields(_ ...log.Field) {} |
||||
func (n noopSpan) LogKV(_ ...interface{}) {} |
||||
func (n noopSpan) Finish() {} |
||||
func (n noopSpan) FinishWithOptions(_ opentracing.FinishOptions) {} |
||||
func (n noopSpan) SetOperationName(_ string) opentracing.Span { return n } |
||||
func (n noopSpan) Tracer() opentracing.Tracer { return defaultNoopTracer } |
||||
func (n noopSpan) LogEvent(_ string) {} |
||||
func (n noopSpan) LogEventWithPayload(_ string, _ interface{}) {} |
||||
func (n noopSpan) Log(_ opentracing.LogData) {} |
||||
|
||||
// StartSpan belongs to the Tracer interface.
|
||||
func (n noopTracer) StartSpan(_ string, _ ...opentracing.StartSpanOption) opentracing.Span { |
||||
return defaultNoopSpan |
||||
} |
||||
|
||||
// Inject belongs to the Tracer interface.
|
||||
func (n noopTracer) Inject(_ opentracing.SpanContext, _ interface{}, _ interface{}) error { |
||||
return nil |
||||
} |
||||
|
||||
// Extract belongs to the Tracer interface.
|
||||
func (n noopTracer) Extract(_ interface{}, _ interface{}) (opentracing.SpanContext, error) { |
||||
return nil, opentracing.ErrSpanContextNotFound |
||||
} |
||||
@ -1,104 +0,0 @@ |
||||
# Span Profiler for OpenTracing-Go |
||||
|
||||
## Overview |
||||
|
||||
The Span Profiler for OpenTracing-Go is a package that seamlessly integrates `opentracing-go` instrumentation with |
||||
profiling through the use of pprof labels. |
||||
|
||||
Accessing trace span profiles is made convenient through the Grafana Explore view. You can find a complete example setup |
||||
with Grafana Tempo in the [Pyroscope repository](https://github.com/grafana/pyroscope/tree/main/examples/tracing/golang-push): |
||||
|
||||
 |
||||
|
||||
## Usage |
||||
|
||||
There are two primary ways to use the Span Profiler: |
||||
|
||||
### 1. Wrap the Global Tracer. |
||||
|
||||
You can wrap the global tracer using `spanprofiler.NewTracer`: |
||||
|
||||
```go |
||||
import ( |
||||
"github.com/opentracing/opentracing-go" |
||||
"github.com/grafana/dskit/spanprofiler" |
||||
) |
||||
|
||||
func main() { |
||||
// Initialize your OpenTracing tracer |
||||
tracer := opentracing.GlobalTracer() |
||||
// Wrap it with the tracer-profiler |
||||
wrappedTracer := spanprofiler.NewTracer(tracer) |
||||
// Use the wrapped tracer in your application |
||||
opentracing.SetGlobalTracer(wrappedTracer) |
||||
|
||||
// Or, as an oneliner: |
||||
// opentracing.SetGlobalTracer(spanprofiler.NewTracer(opentracing.GlobalTracer())) |
||||
|
||||
// Your application logic here |
||||
} |
||||
``` |
||||
|
||||
For efficiency, the tracer selectively records profiles for _root_ spans — the initial _local_ span in a process — since |
||||
a trace may encompass thousands of spans. All stack trace samples accumulated during the execution of their child spans |
||||
contribute to the root span's profile. In practical terms, this signifies that, for instance, an HTTP request results |
||||
in a singular profile, irrespective of the numerous spans within the trace. It's important to note that these profiles |
||||
don't extend beyond the boundaries of a single process. |
||||
|
||||
The limitation of this approach is that only spans created within the same goroutine, or its children, as the parent are |
||||
taken into account. Consequently, in scenarios involving asynchronous execution, where the parent span context is passed |
||||
to another goroutine, explicit profiling becomes necessary using `spanprofiler.StartSpanFromContext`. |
||||
|
||||
### 2. Profile individual spans. |
||||
|
||||
The `spanprofiler.StartSpanFromContext` function allows you to granularly control which spans to profile: |
||||
|
||||
```go |
||||
func YourOperationName(ctx context.Background()) { |
||||
// Start a span and enable profiling for it |
||||
span, ctx := spanprofiler.StartSpanFromContext(ctx, "YourOperationName", tracer) |
||||
defer span.Finish() // Finish the span when done |
||||
|
||||
// Use the span in your application logic |
||||
} |
||||
``` |
||||
|
||||
The function guarantees that the span is to be profiled. |
||||
|
||||
Both methods can be employed either in conjunction or independently. Our recommendation is to utilize the tracer for |
||||
seamless integration, reserving explicit span profiling only for cases where spans are spawned in detached goroutines. |
||||
|
||||
## Implementation details |
||||
|
||||
When a new trace span is created, and is eligible for profiling, the tracer sets `span_id` and `span_name` [pprof labels](https://github.com/google/pprof/blob/master/doc/README.md#tag-filtering) |
||||
that point to the respective span. These labels are stored in the goroutine's local storage and inherited by any |
||||
subsequent child goroutines. |
||||
|
||||
`span_name` is available as a regular label and can be used in the query expressions. For example, the following query |
||||
will show you profile for the code that is not covered with traces: |
||||
``` |
||||
{service_name="my-service",span_name=""} |
||||
``` |
||||
|
||||
Additionally, trace spans are identified by the `pyroscope.profile.id` attribute, indicating the associated profile. |
||||
This allows to find such spans in the trace view (in the screenshot) and fetch profiles for specific spans. |
||||
|
||||
It's important to note that the presence of this attribute does not guarantee profile availability; stack trace samples |
||||
might not be collected if the CPU time utilized falls below the sample interval (10ms). |
||||
|
||||
It is crucial to understand that this module doesn't directly control the pprof profiler; its initialization is still |
||||
necessary for profile collection. This initialization can be achieved through the `runtime/pprof` package, or using the |
||||
[Pyroscope client](https://github.com/grafana/pyroscope-go). |
||||
|
||||
Limitations: |
||||
- Only CPU profiling is fully supported at the moment. |
||||
- Only [Jaeger tracer](https://github.com/jaegertracing/jaeger-client-go) implementation is supported. |
||||
|
||||
## Performance implications |
||||
|
||||
The typical performance impact is generally imperceptible and primarily arises from the cost of pprof labeling. However, |
||||
intensive use of pprof labels may have negative impact on the profiled application. |
||||
|
||||
In the case of the tracer provided by this package, the `StartSpan` method wrapper introduces an approximate 20% increase |
||||
in CPU time compared to the original call. In vase majority of cases, the overhead constitutes less than 0.01% of the total |
||||
CPU time and is considered safe for deployment in production systems. |
||||
@ -1,107 +0,0 @@ |
||||
package spanprofiler |
||||
|
||||
import ( |
||||
"context" |
||||
"runtime/pprof" |
||||
|
||||
"github.com/opentracing/opentracing-go" |
||||
"github.com/uber/jaeger-client-go" |
||||
) |
||||
|
||||
// StartSpanFromContext starts and returns a Span with `operationName`, using
|
||||
// any Span found within `ctx` as a ChildOfRef. If no such parent could be
|
||||
// found, StartSpanFromContext creates a root (parentless) Span.
|
||||
//
|
||||
// The call sets `operationName` as `span_name` pprof label, and the new span
|
||||
// identifier as `span_id` pprof label, if the trace is sampled.
|
||||
//
|
||||
// The second return value is a context.Context object built around the
|
||||
// returned Span.
|
||||
//
|
||||
// Example usage:
|
||||
//
|
||||
// SomeFunction(ctx context.Context, ...) {
|
||||
// sp, ctx := opentracing.StartSpanFromContext(ctx, "SomeFunction")
|
||||
// defer sp.Finish()
|
||||
// ...
|
||||
// }
|
||||
func StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { |
||||
return StartSpanFromContextWithTracer(ctx, opentracing.GlobalTracer(), operationName, opts...) |
||||
} |
||||
|
||||
// StartSpanFromContextWithTracer starts and returns a span with `operationName`
|
||||
// using a span found within the context as a ChildOfRef. If that doesn't exist
|
||||
// it creates a root span. It also returns a context.Context object built
|
||||
// around the returned span.
|
||||
//
|
||||
// The call sets `operationName` as `span_name` pprof label, and the new span
|
||||
// identifier as `span_id` pprof label, if the trace is sampled.
|
||||
//
|
||||
// It's behavior is identical to StartSpanFromContext except that it takes an explicit
|
||||
// tracer as opposed to using the global tracer.
|
||||
func StartSpanFromContextWithTracer(ctx context.Context, tracer opentracing.Tracer, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { |
||||
span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, tracer, operationName, opts...) |
||||
spanCtx, ok := span.Context().(jaeger.SpanContext) |
||||
if ok { |
||||
span = wrapJaegerSpanWithGoroutineLabels(ctx, span, operationName, sampledSpanID(spanCtx)) |
||||
} |
||||
return span, ctx |
||||
} |
||||
|
||||
func wrapJaegerSpanWithGoroutineLabels( |
||||
parentCtx context.Context, |
||||
span opentracing.Span, |
||||
operationName string, |
||||
spanID string, |
||||
) *spanWrapper { |
||||
// Note that pprof labels are propagated through the goroutine's local
|
||||
// storage and are always copied to child goroutines. This way, stack
|
||||
// trace samples collected during execution of child spans will be taken
|
||||
// into account at the root.
|
||||
var ctx context.Context |
||||
if spanID != "" { |
||||
ctx = pprof.WithLabels(parentCtx, pprof.Labels( |
||||
spanNameLabelName, operationName, |
||||
spanIDLabelName, spanID)) |
||||
} else { |
||||
// Even if the trace has not been sampled, we still need to keep track
|
||||
// of samples that belong to the span (all spans with the given name).
|
||||
ctx = pprof.WithLabels(parentCtx, pprof.Labels( |
||||
spanNameLabelName, operationName)) |
||||
} |
||||
// Goroutine labels should be set as early as possible,
|
||||
// in order to capture the overhead of the function call.
|
||||
pprof.SetGoroutineLabels(ctx) |
||||
// We create a span wrapper to ensure we remove the newly attached pprof
|
||||
// labels when span finishes. The need of this wrapper is questioned:
|
||||
// as we do not have the original context, we could leave the goroutine
|
||||
// labels – normally, span is finished at the very end of the goroutine's
|
||||
// lifetime, so no significant side effects should take place.
|
||||
w := spanWrapper{ |
||||
parentPprofCtx: parentCtx, |
||||
currentPprofCtx: ctx, |
||||
} |
||||
w.Span = span.SetTag(profileIDTagKey, spanID) |
||||
return &w |
||||
} |
||||
|
||||
type spanWrapper struct { |
||||
parentPprofCtx context.Context |
||||
currentPprofCtx context.Context |
||||
opentracing.Span |
||||
} |
||||
|
||||
func (s *spanWrapper) Finish() { |
||||
s.Span.Finish() |
||||
pprof.SetGoroutineLabels(s.parentPprofCtx) |
||||
s.currentPprofCtx = s.parentPprofCtx |
||||
} |
||||
|
||||
// sampledSpanID returns the span ID, if the span is sampled,
|
||||
// otherwise an empty string is returned.
|
||||
func sampledSpanID(spanCtx jaeger.SpanContext) string { |
||||
if spanCtx.IsSampled() { |
||||
return spanCtx.SpanID().String() |
||||
} |
||||
return "" |
||||
} |
||||
@ -1,112 +0,0 @@ |
||||
package spanprofiler |
||||
|
||||
import ( |
||||
"context" |
||||
"unsafe" |
||||
|
||||
"github.com/opentracing/opentracing-go" |
||||
"github.com/uber/jaeger-client-go" |
||||
) |
||||
|
||||
const ( |
||||
profileIDTagKey = "pyroscope.profile.id" |
||||
|
||||
spanIDLabelName = "span_id" |
||||
spanNameLabelName = "span_name" |
||||
) |
||||
|
||||
type tracer struct{ opentracing.Tracer } |
||||
|
||||
// NewTracer creates a new opentracing.Tracer with the span profiler integrated.
|
||||
//
|
||||
// For efficiency, the tracer selectively records profiles for _root_ spans
|
||||
// — the initial _local_ span in a process — since a trace may encompass
|
||||
// thousands of spans. All stack trace samples accumulated during the execution
|
||||
// of their child spans contribute to the root span's profile. In practical
|
||||
// terms, this signifies that, for instance, an HTTP request results in a
|
||||
// singular profile, irrespective of the numerous spans within the trace. It's
|
||||
// important to note that these profiles don't extend beyond the boundaries of
|
||||
// a single process.
|
||||
//
|
||||
// The limitation of this approach is that only spans created within the same
|
||||
// goroutine, or its children, as the parent are taken into account.
|
||||
// Consequently, in scenarios involving asynchronous execution, where the parent
|
||||
// span context is passed to another goroutine, explicit profiling becomes
|
||||
// necessary using `spanprofiler.StartSpanFromContext`.
|
||||
func NewTracer(tr opentracing.Tracer) opentracing.Tracer { return &tracer{tr} } |
||||
|
||||
func (t *tracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span { |
||||
span := t.Tracer.StartSpan(operationName, opts...) |
||||
spanCtx, ok := span.Context().(jaeger.SpanContext) |
||||
if !ok { |
||||
return span |
||||
} |
||||
if !spanCtx.IsSampled() { |
||||
return span |
||||
} |
||||
// pprof labels are attached only once, at the span root level.
|
||||
if !isRootSpan(opts...) { |
||||
return span |
||||
} |
||||
// The pprof label API assumes that pairs of labels are passed through the
|
||||
// context. Unfortunately, the opentracing Tracer API doesn't match this
|
||||
// concept: this makes it impossible to save an existing pprof context and
|
||||
// all the original pprof labels associated with the goroutine.
|
||||
ctx := context.Background() |
||||
return wrapJaegerSpanWithGoroutineLabels(ctx, span, operationName, sampledSpanID(spanCtx)) |
||||
} |
||||
|
||||
// isRootSpan reports whether the span is a root span.
|
||||
//
|
||||
// There are only two valid cases: if the span is the first span in the trace,
|
||||
// or is the first _local_ span in the trace.
|
||||
//
|
||||
// An exception is made for FollowsFrom reference: spans without an explicit
|
||||
// parent are considered as root ones.
|
||||
func isRootSpan(opts ...opentracing.StartSpanOption) bool { |
||||
parent, ok := parentSpanContextFromRef(opts...) |
||||
return !ok || isRemoteSpan(parent) |
||||
} |
||||
|
||||
// parentSpanContextFromRef returns the first parent reference.
|
||||
func parentSpanContextFromRef(options ...opentracing.StartSpanOption) (sc jaeger.SpanContext, ok bool) { |
||||
var sso opentracing.StartSpanOptions |
||||
for _, option := range options { |
||||
option.Apply(&sso) |
||||
} |
||||
for _, ref := range sso.References { |
||||
if ref.Type == opentracing.ChildOfRef && ref.ReferencedContext != nil { |
||||
sc, ok = ref.ReferencedContext.(jaeger.SpanContext) |
||||
return sc, ok |
||||
} |
||||
} |
||||
return sc, ok |
||||
} |
||||
|
||||
// isRemoteSpan reports whether the span context represents a remote parent.
|
||||
//
|
||||
// NOTE(kolesnikovae): this is ugly, but the only reliable method I found.
|
||||
// The opentracing-go package and Jaeger client are not meant to change as
|
||||
// both are deprecated.
|
||||
func isRemoteSpan(c jaeger.SpanContext) bool { |
||||
jaegerCtx := *(*jaegerSpanCtx)(unsafe.Pointer(&c)) |
||||
return jaegerCtx.remote |
||||
} |
||||
|
||||
// jaegerSpanCtx represents memory layout of the jaeger.SpanContext type.
|
||||
type jaegerSpanCtx struct { |
||||
traceID [16]byte // TraceID
|
||||
spanID [8]byte // SpanID
|
||||
parentID [8]byte // SpanID
|
||||
baggage uintptr // map[string]string
|
||||
debugID [2]uintptr // string
|
||||
|
||||
// samplingState is a pointer to a struct that has "localRootSpan" member,
|
||||
// which we could probably use: that would allow omitting quite expensive
|
||||
// parentSpanContextFromRef call. However, interpreting the pointer and
|
||||
// the complex struct memory layout is more complicated and dangerous.
|
||||
samplingState uintptr |
||||
|
||||
// remote indicates that span context represents a remote parent
|
||||
remote bool |
||||
} |
||||
@ -1,27 +0,0 @@ |
||||
Copyright (c) 2016, gRPC Ecosystem |
||||
All rights reserved. |
||||
|
||||
Redistribution and use in source and binary forms, with or without |
||||
modification, are permitted provided that the following conditions are met: |
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this |
||||
list of conditions and the following disclaimer. |
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice, |
||||
this list of conditions and the following disclaimer in the documentation |
||||
and/or other materials provided with the distribution. |
||||
|
||||
* Neither the name of grpc-opentracing nor the names of its |
||||
contributors may be used to endorse or promote products derived from |
||||
this software without specific prior written permission. |
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE |
||||
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
||||
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR |
||||
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
||||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, |
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
@ -1,23 +0,0 @@ |
||||
Additional IP Rights Grant (Patents) |
||||
|
||||
"This implementation" means the copyrightable works distributed by |
||||
Google as part of the GRPC project. |
||||
|
||||
Google hereby grants to You a perpetual, worldwide, non-exclusive, |
||||
no-charge, royalty-free, irrevocable (except as stated in this section) |
||||
patent license to make, have made, use, offer to sell, sell, import, |
||||
transfer and otherwise run, modify and propagate the contents of this |
||||
implementation of GRPC, where such license applies only to those patent |
||||
claims, both currently owned or controlled by Google and acquired in |
||||
the future, licensable by Google that are necessarily infringed by this |
||||
implementation of GRPC. This grant does not include claims that would be |
||||
infringed only as a consequence of further modification of this |
||||
implementation. If you or your agent or exclusive licensee institute or |
||||
order or agree to the institution of patent litigation against any |
||||
entity (including a cross-claim or counterclaim in a lawsuit) alleging |
||||
that this implementation of GRPC or any code incorporated within this |
||||
implementation of GRPC constitutes direct or contributory patent |
||||
infringement, or inducement of patent infringement, then any patent |
||||
rights granted to you under this License for this implementation of GRPC |
||||
shall terminate as of the date such litigation is filed. |
||||
Status API Training Shop Blog About |
||||
@ -1,57 +0,0 @@ |
||||
# OpenTracing support for gRPC in Go |
||||
|
||||
The `otgrpc` package makes it easy to add OpenTracing support to gRPC-based |
||||
systems in Go. |
||||
|
||||
## Installation |
||||
|
||||
``` |
||||
go get github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc |
||||
``` |
||||
|
||||
## Documentation |
||||
|
||||
See the basic usage examples below and the [package documentation on |
||||
godoc.org](https://godoc.org/github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc). |
||||
|
||||
## Client-side usage example |
||||
|
||||
Wherever you call `grpc.Dial`: |
||||
|
||||
```go |
||||
// You must have some sort of OpenTracing Tracer instance on hand. |
||||
var tracer opentracing.Tracer = ... |
||||
... |
||||
|
||||
// Set up a connection to the server peer. |
||||
conn, err := grpc.Dial( |
||||
address, |
||||
... // other options |
||||
grpc.WithUnaryInterceptor( |
||||
otgrpc.OpenTracingClientInterceptor(tracer)), |
||||
grpc.WithStreamInterceptor( |
||||
otgrpc.OpenTracingStreamClientInterceptor(tracer))) |
||||
|
||||
// All future RPC activity involving `conn` will be automatically traced. |
||||
``` |
||||
|
||||
## Server-side usage example |
||||
|
||||
Wherever you call `grpc.NewServer`: |
||||
|
||||
```go |
||||
// You must have some sort of OpenTracing Tracer instance on hand. |
||||
var tracer opentracing.Tracer = ... |
||||
... |
||||
|
||||
// Initialize the gRPC server. |
||||
s := grpc.NewServer( |
||||
... // other options |
||||
grpc.UnaryInterceptor( |
||||
otgrpc.OpenTracingServerInterceptor(tracer)), |
||||
grpc.StreamInterceptor( |
||||
otgrpc.OpenTracingStreamServerInterceptor(tracer))) |
||||
|
||||
// All future RPC activity involving `s` will be automatically traced. |
||||
``` |
||||
|
||||
@ -1,239 +0,0 @@ |
||||
package otgrpc |
||||
|
||||
import ( |
||||
"github.com/opentracing/opentracing-go" |
||||
"github.com/opentracing/opentracing-go/ext" |
||||
"github.com/opentracing/opentracing-go/log" |
||||
"golang.org/x/net/context" |
||||
"google.golang.org/grpc" |
||||
"google.golang.org/grpc/metadata" |
||||
"io" |
||||
"runtime" |
||||
"sync/atomic" |
||||
) |
||||
|
||||
// OpenTracingClientInterceptor returns a grpc.UnaryClientInterceptor suitable
|
||||
// for use in a grpc.Dial call.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// conn, err := grpc.Dial(
|
||||
// address,
|
||||
// ..., // (existing DialOptions)
|
||||
// grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
|
||||
//
|
||||
// All gRPC client spans will inject the OpenTracing SpanContext into the gRPC
|
||||
// metadata; they will also look in the context.Context for an active
|
||||
// in-process parent Span and establish a ChildOf reference if such a parent
|
||||
// Span could be found.
|
||||
func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor { |
||||
otgrpcOpts := newOptions() |
||||
otgrpcOpts.apply(optFuncs...) |
||||
return func( |
||||
ctx context.Context, |
||||
method string, |
||||
req, resp interface{}, |
||||
cc *grpc.ClientConn, |
||||
invoker grpc.UnaryInvoker, |
||||
opts ...grpc.CallOption, |
||||
) error { |
||||
var err error |
||||
var parentCtx opentracing.SpanContext |
||||
if parent := opentracing.SpanFromContext(ctx); parent != nil { |
||||
parentCtx = parent.Context() |
||||
} |
||||
if otgrpcOpts.inclusionFunc != nil && |
||||
!otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) { |
||||
return invoker(ctx, method, req, resp, cc, opts...) |
||||
} |
||||
clientSpan := tracer.StartSpan( |
||||
method, |
||||
opentracing.ChildOf(parentCtx), |
||||
ext.SpanKindRPCClient, |
||||
gRPCComponentTag, |
||||
) |
||||
defer clientSpan.Finish() |
||||
ctx = injectSpanContext(ctx, tracer, clientSpan) |
||||
if otgrpcOpts.logPayloads { |
||||
clientSpan.LogFields(log.Object("gRPC request", req)) |
||||
} |
||||
err = invoker(ctx, method, req, resp, cc, opts...) |
||||
if err == nil { |
||||
if otgrpcOpts.logPayloads { |
||||
clientSpan.LogFields(log.Object("gRPC response", resp)) |
||||
} |
||||
} else { |
||||
SetSpanTags(clientSpan, err, true) |
||||
clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) |
||||
} |
||||
if otgrpcOpts.decorator != nil { |
||||
otgrpcOpts.decorator(clientSpan, method, req, resp, err) |
||||
} |
||||
return err |
||||
} |
||||
} |
||||
|
||||
// OpenTracingStreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
|
||||
// for use in a grpc.Dial call. The interceptor instruments streaming RPCs by creating
|
||||
// a single span to correspond to the lifetime of the RPC's stream.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// conn, err := grpc.Dial(
|
||||
// address,
|
||||
// ..., // (existing DialOptions)
|
||||
// grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
|
||||
//
|
||||
// All gRPC client spans will inject the OpenTracing SpanContext into the gRPC
|
||||
// metadata; they will also look in the context.Context for an active
|
||||
// in-process parent Span and establish a ChildOf reference if such a parent
|
||||
// Span could be found.
|
||||
func OpenTracingStreamClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.StreamClientInterceptor { |
||||
otgrpcOpts := newOptions() |
||||
otgrpcOpts.apply(optFuncs...) |
||||
return func( |
||||
ctx context.Context, |
||||
desc *grpc.StreamDesc, |
||||
cc *grpc.ClientConn, |
||||
method string, |
||||
streamer grpc.Streamer, |
||||
opts ...grpc.CallOption, |
||||
) (grpc.ClientStream, error) { |
||||
var err error |
||||
var parentCtx opentracing.SpanContext |
||||
if parent := opentracing.SpanFromContext(ctx); parent != nil { |
||||
parentCtx = parent.Context() |
||||
} |
||||
if otgrpcOpts.inclusionFunc != nil && |
||||
!otgrpcOpts.inclusionFunc(parentCtx, method, nil, nil) { |
||||
return streamer(ctx, desc, cc, method, opts...) |
||||
} |
||||
|
||||
clientSpan := tracer.StartSpan( |
||||
method, |
||||
opentracing.ChildOf(parentCtx), |
||||
ext.SpanKindRPCClient, |
||||
gRPCComponentTag, |
||||
) |
||||
ctx = injectSpanContext(ctx, tracer, clientSpan) |
||||
cs, err := streamer(ctx, desc, cc, method, opts...) |
||||
if err != nil { |
||||
clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) |
||||
SetSpanTags(clientSpan, err, true) |
||||
clientSpan.Finish() |
||||
return cs, err |
||||
} |
||||
return newOpenTracingClientStream(cs, method, desc, clientSpan, otgrpcOpts), nil |
||||
} |
||||
} |
||||
|
||||
func newOpenTracingClientStream(cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan opentracing.Span, otgrpcOpts *options) grpc.ClientStream { |
||||
finishChan := make(chan struct{}) |
||||
|
||||
isFinished := new(int32) |
||||
*isFinished = 0 |
||||
finishFunc := func(err error) { |
||||
// The current OpenTracing specification forbids finishing a span more than
|
||||
// once. Since we have multiple code paths that could concurrently call
|
||||
// `finishFunc`, we need to add some sort of synchronization to guard against
|
||||
// multiple finishing.
|
||||
if !atomic.CompareAndSwapInt32(isFinished, 0, 1) { |
||||
return |
||||
} |
||||
close(finishChan) |
||||
defer clientSpan.Finish() |
||||
if err != nil { |
||||
clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) |
||||
SetSpanTags(clientSpan, err, true) |
||||
} |
||||
if otgrpcOpts.decorator != nil { |
||||
otgrpcOpts.decorator(clientSpan, method, nil, nil, err) |
||||
} |
||||
} |
||||
go func() { |
||||
select { |
||||
case <-finishChan: |
||||
// The client span is being finished by another code path; hence, no
|
||||
// action is necessary.
|
||||
case <-cs.Context().Done(): |
||||
finishFunc(cs.Context().Err()) |
||||
} |
||||
}() |
||||
otcs := &openTracingClientStream{ |
||||
ClientStream: cs, |
||||
desc: desc, |
||||
finishFunc: finishFunc, |
||||
} |
||||
|
||||
// The `ClientStream` interface allows one to omit calling `Recv` if it's
|
||||
// known that the result will be `io.EOF`. See
|
||||
// http://stackoverflow.com/q/42915337
|
||||
// In such cases, there's nothing that triggers the span to finish. We,
|
||||
// therefore, set a finalizer so that the span and the context goroutine will
|
||||
// at least be cleaned up when the garbage collector is run.
|
||||
runtime.SetFinalizer(otcs, func(otcs *openTracingClientStream) { |
||||
otcs.finishFunc(nil) |
||||
}) |
||||
return otcs |
||||
} |
||||
|
||||
type openTracingClientStream struct { |
||||
grpc.ClientStream |
||||
desc *grpc.StreamDesc |
||||
finishFunc func(error) |
||||
} |
||||
|
||||
func (cs *openTracingClientStream) Header() (metadata.MD, error) { |
||||
md, err := cs.ClientStream.Header() |
||||
if err != nil { |
||||
cs.finishFunc(err) |
||||
} |
||||
return md, err |
||||
} |
||||
|
||||
func (cs *openTracingClientStream) SendMsg(m interface{}) error { |
||||
err := cs.ClientStream.SendMsg(m) |
||||
if err != nil { |
||||
cs.finishFunc(err) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
func (cs *openTracingClientStream) RecvMsg(m interface{}) error { |
||||
err := cs.ClientStream.RecvMsg(m) |
||||
if err == io.EOF { |
||||
cs.finishFunc(nil) |
||||
return err |
||||
} else if err != nil { |
||||
cs.finishFunc(err) |
||||
return err |
||||
} |
||||
if !cs.desc.ServerStreams { |
||||
cs.finishFunc(nil) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
func (cs *openTracingClientStream) CloseSend() error { |
||||
err := cs.ClientStream.CloseSend() |
||||
if err != nil { |
||||
cs.finishFunc(err) |
||||
} |
||||
return err |
||||
} |
||||
|
||||
func injectSpanContext(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) context.Context { |
||||
md, ok := metadata.FromOutgoingContext(ctx) |
||||
if !ok { |
||||
md = metadata.New(nil) |
||||
} else { |
||||
md = md.Copy() |
||||
} |
||||
mdWriter := metadataReaderWriter{md} |
||||
err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdWriter) |
||||
// We have no better place to record an error than the Span itself :-/
|
||||
if err != nil { |
||||
clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err)) |
||||
} |
||||
return metadata.NewOutgoingContext(ctx, md) |
||||
} |
||||
@ -1,69 +0,0 @@ |
||||
package otgrpc |
||||
|
||||
import ( |
||||
"github.com/opentracing/opentracing-go" |
||||
"github.com/opentracing/opentracing-go/ext" |
||||
"google.golang.org/grpc/codes" |
||||
"google.golang.org/grpc/status" |
||||
) |
||||
|
||||
// A Class is a set of types of outcomes (including errors) that will often
|
||||
// be handled in the same way.
|
||||
type Class string |
||||
|
||||
const ( |
||||
Unknown Class = "0xx" |
||||
// Success represents outcomes that achieved the desired results.
|
||||
Success Class = "2xx" |
||||
// ClientError represents errors that were the client's fault.
|
||||
ClientError Class = "4xx" |
||||
// ServerError represents errors that were the server's fault.
|
||||
ServerError Class = "5xx" |
||||
) |
||||
|
||||
// ErrorClass returns the class of the given error
|
||||
func ErrorClass(err error) Class { |
||||
if s, ok := status.FromError(err); ok { |
||||
switch s.Code() { |
||||
// Success or "success"
|
||||
case codes.OK, codes.Canceled: |
||||
return Success |
||||
|
||||
// Client errors
|
||||
case codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, |
||||
codes.PermissionDenied, codes.Unauthenticated, codes.FailedPrecondition, |
||||
codes.OutOfRange: |
||||
return ClientError |
||||
|
||||
// Server errors
|
||||
case codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted, |
||||
codes.Unimplemented, codes.Internal, codes.Unavailable, codes.DataLoss: |
||||
return ServerError |
||||
|
||||
// Not sure
|
||||
case codes.Unknown: |
||||
fallthrough |
||||
default: |
||||
return Unknown |
||||
} |
||||
} |
||||
return Unknown |
||||
} |
||||
|
||||
// SetSpanTags sets one or more tags on the given span according to the
|
||||
// error.
|
||||
func SetSpanTags(span opentracing.Span, err error, client bool) { |
||||
c := ErrorClass(err) |
||||
code := codes.Unknown |
||||
if s, ok := status.FromError(err); ok { |
||||
code = s.Code() |
||||
} |
||||
span.SetTag("response_code", code) |
||||
span.SetTag("response_class", c) |
||||
if err == nil { |
||||
return |
||||
} |
||||
if client || c == ServerError { |
||||
ext.Error.Set(span, true) |
||||
} |
||||
} |
||||
@ -1,76 +0,0 @@ |
||||
package otgrpc |
||||
|
||||
import "github.com/opentracing/opentracing-go" |
||||
|
||||
// Option instances may be used in OpenTracing(Server|Client)Interceptor
|
||||
// initialization.
|
||||
//
|
||||
// See this post about the "functional options" pattern:
|
||||
// http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
|
||||
type Option func(o *options) |
||||
|
||||
// LogPayloads returns an Option that tells the OpenTracing instrumentation to
|
||||
// try to log application payloads in both directions.
|
||||
func LogPayloads() Option { |
||||
return func(o *options) { |
||||
o.logPayloads = true |
||||
} |
||||
} |
||||
|
||||
// SpanInclusionFunc provides an optional mechanism to decide whether or not
|
||||
// to trace a given gRPC call. Return true to create a Span and initiate
|
||||
// tracing, false to not create a Span and not trace.
|
||||
//
|
||||
// parentSpanCtx may be nil if no parent could be extraction from either the Go
|
||||
// context.Context (on the client) or the RPC (on the server).
|
||||
type SpanInclusionFunc func( |
||||
parentSpanCtx opentracing.SpanContext, |
||||
method string, |
||||
req, resp interface{}) bool |
||||
|
||||
// IncludingSpans binds a IncludeSpanFunc to the options
|
||||
func IncludingSpans(inclusionFunc SpanInclusionFunc) Option { |
||||
return func(o *options) { |
||||
o.inclusionFunc = inclusionFunc |
||||
} |
||||
} |
||||
|
||||
// SpanDecoratorFunc provides an (optional) mechanism for otgrpc users to add
|
||||
// arbitrary tags/logs/etc to the opentracing.Span associated with client
|
||||
// and/or server RPCs.
|
||||
type SpanDecoratorFunc func( |
||||
span opentracing.Span, |
||||
method string, |
||||
req, resp interface{}, |
||||
grpcError error) |
||||
|
||||
// SpanDecorator binds a function that decorates gRPC Spans.
|
||||
func SpanDecorator(decorator SpanDecoratorFunc) Option { |
||||
return func(o *options) { |
||||
o.decorator = decorator |
||||
} |
||||
} |
||||
|
||||
// The internal-only options struct. Obviously overkill at the moment; but will
|
||||
// scale well as production use dictates other configuration and tuning
|
||||
// parameters.
|
||||
type options struct { |
||||
logPayloads bool |
||||
decorator SpanDecoratorFunc |
||||
// May be nil.
|
||||
inclusionFunc SpanInclusionFunc |
||||
} |
||||
|
||||
// newOptions returns the default options.
|
||||
func newOptions() *options { |
||||
return &options{ |
||||
logPayloads: false, |
||||
inclusionFunc: nil, |
||||
} |
||||
} |
||||
|
||||
func (o *options) apply(opts ...Option) { |
||||
for _, opt := range opts { |
||||
opt(o) |
||||
} |
||||
} |
||||
@ -1,5 +0,0 @@ |
||||
// Package otgrpc provides OpenTracing support for any gRPC client or server.
|
||||
//
|
||||
// See the README for simple usage examples:
|
||||
// https://github.com/grpc-ecosystem/grpc-opentracing/blob/master/go/otgrpc/README.md
|
||||
package otgrpc |
||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue