Upgrade dskit (#4313)

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>

Use middleware package from dskit

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
pull/4324/head
Arve Knudsen 4 years ago committed by GitHub
parent e8ac2214fb
commit f48e867002
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      pkg/ingester/client/client.go
  2. 3
      pkg/storage/chunk/gcp/instrumentation.go
  3. 83
      vendor/github.com/grafana/dskit/middleware/grpc.go
  4. 1
      vendor/modules.txt

@ -7,6 +7,7 @@ import (
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
dsmiddleware "github.com/grafana/dskit/middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@ -15,8 +16,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
cortex_middleware "github.com/cortexproject/cortex/pkg/util/middleware"
"github.com/grafana/loki/pkg/logproto"
)
@ -89,14 +88,14 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC
unaryInterceptors = append(unaryInterceptors,
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration),
dsmiddleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration),
)
var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
streamInterceptors = append(streamInterceptors,
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
dsmiddleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
)
return unaryInterceptors, streamInterceptors

@ -6,6 +6,7 @@ import (
"strconv"
"time"
"github.com/grafana/dskit/middleware"
otgrpc "github.com/opentracing-contrib/go-grpc"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@ -13,8 +14,6 @@ import (
"google.golang.org/api/option"
google_http "google.golang.org/api/transport/http"
"google.golang.org/grpc"
"github.com/cortexproject/cortex/pkg/util/middleware"
)
var (

@ -0,0 +1,83 @@
package middleware
import (
"context"
"io"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// PrometheusGRPCUnaryInstrumentation records duration of gRPC requests client side.
func PrometheusGRPCUnaryInstrumentation(metric *prometheus.HistogramVec) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, resp, cc, opts...)
metric.WithLabelValues(method, instrument.ErrorCode(err)).Observe(time.Since(start).Seconds())
return err
}
}
// PrometheusGRPCStreamInstrumentation records duration of streaming gRPC requests client side.
func PrometheusGRPCStreamInstrumentation(metric *prometheus.HistogramVec) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
start := time.Now()
stream, err := streamer(ctx, desc, cc, method, opts...)
return &instrumentedClientStream{
metric: metric,
start: start,
method: method,
ClientStream: stream,
}, err
}
}
type instrumentedClientStream struct {
metric *prometheus.HistogramVec
start time.Time
method string
grpc.ClientStream
}
func (s *instrumentedClientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
if err == nil {
return err
}
if err == io.EOF {
s.metric.WithLabelValues(s.method, instrument.ErrorCode(nil)).Observe(time.Since(s.start).Seconds())
} else {
s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds())
}
return err
}
func (s *instrumentedClientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
if err == nil {
return err
}
if err == io.EOF {
s.metric.WithLabelValues(s.method, instrument.ErrorCode(nil)).Observe(time.Since(s.start).Seconds())
} else {
s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds())
}
return err
}
func (s *instrumentedClientStream) Header() (metadata.MD, error) {
md, err := s.ClientStream.Header()
if err != nil {
s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds())
}
return md, err
}

@ -571,6 +571,7 @@ github.com/grafana/dskit/kv/consul
github.com/grafana/dskit/kv/etcd
github.com/grafana/dskit/kv/kvtls
github.com/grafana/dskit/kv/memberlist
github.com/grafana/dskit/middleware
github.com/grafana/dskit/modules
github.com/grafana/dskit/runtimeconfig
github.com/grafana/dskit/services

Loading…
Cancel
Save