Tracing: Migrate to OpenTelemetry library (#9724)
Signed-off-by: Matej Gera <matejgera@gmail.com>pull/10209/head
parent
71a0f42331
commit
2c61d29b2a
@ -0,0 +1,2 @@ |
||||
tracing: |
||||
sampling_fraction: 1 |
||||
@ -0,0 +1,205 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tracing |
||||
|
||||
import ( |
||||
"context" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/pkg/errors" |
||||
config_util "github.com/prometheus/common/config" |
||||
"github.com/prometheus/common/version" |
||||
"go.opentelemetry.io/otel" |
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace" |
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" |
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" |
||||
"go.opentelemetry.io/otel/propagation" |
||||
"go.opentelemetry.io/otel/sdk/resource" |
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace" |
||||
semconv "go.opentelemetry.io/otel/semconv/v1.7.0" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/grpc/credentials" |
||||
|
||||
"github.com/prometheus/prometheus/config" |
||||
) |
||||
|
||||
const serviceName = "prometheus" |
||||
|
||||
// Manager is capable of building, (re)installing and shutting down
|
||||
// the tracer provider.
|
||||
type Manager struct { |
||||
logger log.Logger |
||||
done chan struct{} |
||||
config config.TracingConfig |
||||
shutdownFunc func() error |
||||
} |
||||
|
||||
// NewManager creates a new tracing manager.
|
||||
func NewManager(logger log.Logger) *Manager { |
||||
return &Manager{ |
||||
logger: logger, |
||||
done: make(chan struct{}), |
||||
} |
||||
} |
||||
|
||||
// Run starts the tracing manager. It registers the global text map propagator and error handler.
|
||||
// It is blocking.
|
||||
func (m *Manager) Run() { |
||||
otel.SetTextMapPropagator(propagation.TraceContext{}) |
||||
otel.SetErrorHandler(otelErrHandler(func(err error) { |
||||
level.Error(m.logger).Log("msg", "OpenTelemetry handler returned an error", "err", err) |
||||
})) |
||||
<-m.done |
||||
} |
||||
|
||||
// ApplyConfig takes care of refreshing the tracing configuration by shutting down
|
||||
// the current tracer provider (if any is registered) and installing a new one.
|
||||
func (m *Manager) ApplyConfig(cfg *config.Config) error { |
||||
// Update only if a config change is detected.
|
||||
if m.config == cfg.TracingConfig { |
||||
return nil |
||||
} |
||||
|
||||
if m.shutdownFunc != nil { |
||||
if err := m.shutdownFunc(); err != nil { |
||||
return errors.Wrap(err, "failed to shut down the tracer provider") |
||||
} |
||||
} |
||||
|
||||
// If no endpoint is set, assume tracing should be disabled.
|
||||
if cfg.TracingConfig.Endpoint == "" { |
||||
m.config = cfg.TracingConfig |
||||
m.shutdownFunc = nil |
||||
otel.SetTracerProvider(trace.NewNoopTracerProvider()) |
||||
level.Info(m.logger).Log("msg", "Tracing provider uninstalled.") |
||||
return nil |
||||
} |
||||
|
||||
tp, shutdownFunc, err := buildTracerProvider(context.Background(), cfg.TracingConfig) |
||||
if err != nil { |
||||
return errors.Wrap(err, "failed to install a new tracer provider") |
||||
} |
||||
|
||||
m.shutdownFunc = shutdownFunc |
||||
m.config = cfg.TracingConfig |
||||
otel.SetTracerProvider(tp) |
||||
|
||||
level.Info(m.logger).Log("msg", "Successfully installed a new tracer provider.") |
||||
return nil |
||||
} |
||||
|
||||
// Stop gracefully shuts down the tracer provider and stops the tracing manager.
|
||||
func (m *Manager) Stop() { |
||||
defer close(m.done) |
||||
|
||||
if m.shutdownFunc == nil { |
||||
return |
||||
} |
||||
|
||||
if err := m.shutdownFunc(); err != nil { |
||||
level.Error(m.logger).Log("msg", "failed to shut down the tracer provider", "err", err) |
||||
} |
||||
|
||||
level.Info(m.logger).Log("msg", "Tracing manager stopped") |
||||
} |
||||
|
||||
type otelErrHandler func(err error) |
||||
|
||||
func (o otelErrHandler) Handle(err error) { |
||||
o(err) |
||||
} |
||||
|
||||
// buildTracerProvider return a new tracer provider ready for installation, together
|
||||
// with a shutdown function.
|
||||
func buildTracerProvider(ctx context.Context, tracingCfg config.TracingConfig) (trace.TracerProvider, func() error, error) { |
||||
client, err := getClient(tracingCfg) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
|
||||
exp, err := otlptrace.New(ctx, client) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
|
||||
// Create a resource describing the service and the runtime.
|
||||
res, err := resource.New( |
||||
ctx, |
||||
resource.WithSchemaURL(semconv.SchemaURL), |
||||
resource.WithAttributes( |
||||
semconv.ServiceNameKey.String(serviceName), |
||||
semconv.ServiceVersionKey.String(version.Version), |
||||
), |
||||
resource.WithProcessRuntimeDescription(), |
||||
resource.WithTelemetrySDK(), |
||||
) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
|
||||
tp := tracesdk.NewTracerProvider( |
||||
tracesdk.WithBatcher(exp), |
||||
tracesdk.WithSampler(tracesdk.ParentBased( |
||||
tracesdk.TraceIDRatioBased(tracingCfg.SamplingFraction), |
||||
)), |
||||
tracesdk.WithResource(res), |
||||
) |
||||
|
||||
return tp, func() error { |
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
||||
defer cancel() |
||||
err := tp.Shutdown(ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
}, nil |
||||
} |
||||
|
||||
// getClient returns an appropriate OTLP client (either gRPC or HTTP), based
|
||||
// on the provided tracing configuration.
|
||||
func getClient(tracingCfg config.TracingConfig) (otlptrace.Client, error) { |
||||
var client otlptrace.Client |
||||
switch tracingCfg.ClientType { |
||||
case config.TracingClientGRPC: |
||||
opts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(tracingCfg.Endpoint)} |
||||
if !tracingCfg.WithSecure { |
||||
opts = append(opts, otlptracegrpc.WithInsecure()) |
||||
} else { |
||||
tlsConf, err := config_util.NewTLSConfig(&tracingCfg.TLSConfig) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConf))) |
||||
} |
||||
client = otlptracegrpc.NewClient(opts...) |
||||
case config.TracingClientHTTP: |
||||
opts := []otlptracehttp.Option{otlptracehttp.WithEndpoint(tracingCfg.Endpoint)} |
||||
if !tracingCfg.WithSecure { |
||||
opts = append(opts, otlptracehttp.WithInsecure()) |
||||
} else { |
||||
tlsConf, err := config_util.NewTLSConfig(&tracingCfg.TLSConfig) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConf)) |
||||
} |
||||
client = otlptracehttp.NewClient(opts...) |
||||
} |
||||
|
||||
return client, nil |
||||
} |
||||
@ -0,0 +1,117 @@ |
||||
// Copyright 2021 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tracing |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/go-kit/log" |
||||
config_util "github.com/prometheus/common/config" |
||||
"github.com/stretchr/testify/require" |
||||
"go.opentelemetry.io/otel" |
||||
"go.opentelemetry.io/otel/trace" |
||||
|
||||
"github.com/prometheus/prometheus/config" |
||||
) |
||||
|
||||
func TestInstallingNewTracerProvider(t *testing.T) { |
||||
tpBefore := otel.GetTracerProvider() |
||||
|
||||
m := NewManager(log.NewNopLogger()) |
||||
cfg := config.Config{ |
||||
TracingConfig: config.TracingConfig{ |
||||
Endpoint: "localhost:1234", |
||||
ClientType: config.TracingClientGRPC, |
||||
}, |
||||
} |
||||
|
||||
require.NoError(t, m.ApplyConfig(&cfg)) |
||||
require.NotEqual(t, tpBefore, otel.GetTracerProvider()) |
||||
} |
||||
|
||||
func TestReinstallingTracerProvider(t *testing.T) { |
||||
m := NewManager(log.NewNopLogger()) |
||||
cfg := config.Config{ |
||||
TracingConfig: config.TracingConfig{ |
||||
Endpoint: "localhost:1234", |
||||
ClientType: config.TracingClientGRPC, |
||||
TLSConfig: config_util.TLSConfig{ |
||||
CAFile: "ca-file.pem", |
||||
CertFile: "cert.pem", |
||||
ServerName: "test-server", |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
require.NoError(t, m.ApplyConfig(&cfg)) |
||||
tpFirstConfig := otel.GetTracerProvider() |
||||
|
||||
// Trying to apply the same config should not reinstall provider.
|
||||
require.NoError(t, m.ApplyConfig(&cfg)) |
||||
require.Equal(t, tpFirstConfig, otel.GetTracerProvider()) |
||||
|
||||
cfg2 := config.Config{ |
||||
TracingConfig: config.TracingConfig{ |
||||
Endpoint: "localhost:1234", |
||||
ClientType: config.TracingClientHTTP, |
||||
TLSConfig: config_util.TLSConfig{ |
||||
CAFile: "ca-file.pem", |
||||
CertFile: "cert.pem", |
||||
ServerName: "test-server", |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
require.NoError(t, m.ApplyConfig(&cfg2)) |
||||
require.NotEqual(t, tpFirstConfig, otel.GetTracerProvider()) |
||||
} |
||||
|
||||
func TestUninstallingTracerProvider(t *testing.T) { |
||||
m := NewManager(log.NewNopLogger()) |
||||
cfg := config.Config{ |
||||
TracingConfig: config.TracingConfig{ |
||||
Endpoint: "localhost:1234", |
||||
ClientType: config.TracingClientGRPC, |
||||
}, |
||||
} |
||||
|
||||
require.NoError(t, m.ApplyConfig(&cfg)) |
||||
require.NotEqual(t, trace.NewNoopTracerProvider(), otel.GetTracerProvider()) |
||||
|
||||
// Uninstall by passing empty config.
|
||||
cfg2 := config.Config{ |
||||
TracingConfig: config.TracingConfig{}, |
||||
} |
||||
|
||||
require.NoError(t, m.ApplyConfig(&cfg2)) |
||||
// Make sure we get a no-op tracer provider after uninstallation.
|
||||
require.Equal(t, trace.NewNoopTracerProvider(), otel.GetTracerProvider()) |
||||
} |
||||
|
||||
func TestTracerProviderShutdown(t *testing.T) { |
||||
m := NewManager(log.NewNopLogger()) |
||||
cfg := config.Config{ |
||||
TracingConfig: config.TracingConfig{ |
||||
Endpoint: "localhost:1234", |
||||
ClientType: config.TracingClientGRPC, |
||||
}, |
||||
} |
||||
|
||||
require.NoError(t, m.ApplyConfig(&cfg)) |
||||
m.Stop() |
||||
|
||||
// Check if we closed the done channel.
|
||||
_, ok := <-m.done |
||||
require.Equal(t, ok, false) |
||||
} |
||||
Loading…
Reference in new issue