|
|
|
@ -13,10 +13,13 @@ import ( |
|
|
|
|
"github.com/grafana/dskit/concurrency" |
|
|
|
|
"github.com/grafana/dskit/grpcclient" |
|
|
|
|
"github.com/grafana/dskit/instrument" |
|
|
|
|
"github.com/grafana/dskit/middleware" |
|
|
|
|
"github.com/grafana/dskit/ring" |
|
|
|
|
"github.com/grafana/dskit/ring/client" |
|
|
|
|
"github.com/grafana/dskit/services" |
|
|
|
|
"github.com/grafana/dskit/tenant" |
|
|
|
|
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" |
|
|
|
|
"github.com/opentracing/opentracing-go" |
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
"google.golang.org/grpc" |
|
|
|
@ -72,6 +75,9 @@ type IndexGatewayClientConfig struct { |
|
|
|
|
// LogGatewayRequests configures if requests sent to the gateway should be logged or not.
|
|
|
|
|
// The log messages are of type debug and contain the address of the gateway and the relevant tenant.
|
|
|
|
|
LogGatewayRequests bool `yaml:"log_gateway_requests"` |
|
|
|
|
|
|
|
|
|
GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"` |
|
|
|
|
GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"` |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RegisterFlagsWithPrefix register client-specific flags with the given prefix.
|
|
|
|
@ -136,7 +142,7 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, lim |
|
|
|
|
done: make(chan struct{}), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration)) |
|
|
|
|
dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation(cfg, sgClient.storeGatewayClientRequestDuration)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "index gateway grpc dial option") |
|
|
|
|
} |
|
|
|
@ -458,3 +464,19 @@ func (b *grpcIter) RangeValue() []byte { |
|
|
|
|
func (b *grpcIter) Value() []byte { |
|
|
|
|
return b.Rows[b.i].Value |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func instrumentation(cfg IndexGatewayClientConfig, clientRequestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { |
|
|
|
|
var unaryInterceptors []grpc.UnaryClientInterceptor |
|
|
|
|
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...) |
|
|
|
|
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())) |
|
|
|
|
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor) |
|
|
|
|
unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(clientRequestDuration)) |
|
|
|
|
|
|
|
|
|
var streamInterceptors []grpc.StreamClientInterceptor |
|
|
|
|
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...) |
|
|
|
|
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())) |
|
|
|
|
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor) |
|
|
|
|
streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(clientRequestDuration)) |
|
|
|
|
|
|
|
|
|
return unaryInterceptors, streamInterceptors |
|
|
|
|
} |
|
|
|
|