Loki: Update `grafana/dskit` and `weaveworks/common` (#6643)

* Upgrade dskit and weaveworks/common

Signed-off-by: Marco Pracucci <marco@pracucci.com>

* 'go mod vendor' again

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pull/6647/head
Marco Pracucci 4 years ago committed by GitHub
parent 094f8f3b20
commit e7fb10f456
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      go.mod
  2. 10
      go.sum
  3. 5
      pkg/ingester/client/client.go
  4. 3
      pkg/querier/worker/scheduler_processor.go
  5. 6
      pkg/storage/chunk/client/gcp/instrumentation.go
  6. 6
      vendor/github.com/grafana/dskit/grpcclient/instrumentation.go
  7. 11
      vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go
  8. 101
      vendor/github.com/grafana/dskit/middleware/grpc.go
  9. 21
      vendor/github.com/grafana/dskit/ring/model.go
  10. 30
      vendor/github.com/grafana/dskit/ring/ring.go
  11. 0
      vendor/github.com/prometheus/exporter-toolkit/LICENSE
  12. 10
      vendor/github.com/prometheus/exporter-toolkit/web/README.md
  13. 91
      vendor/github.com/prometheus/exporter-toolkit/web/cache.go
  14. 137
      vendor/github.com/prometheus/exporter-toolkit/web/handler.go
  15. 361
      vendor/github.com/prometheus/exporter-toolkit/web/tls_config.go
  16. 6
      vendor/github.com/prometheus/exporter-toolkit/web/web-config.yml
  17. 17
      vendor/github.com/prometheus/node_exporter/NOTICE
  18. 28
      vendor/github.com/prometheus/node_exporter/https/README.md
  19. 126
      vendor/github.com/prometheus/node_exporter/https/tls_config.go
  20. 11
      vendor/github.com/prometheus/node_exporter/https/web-config.yml
  21. 9
      vendor/github.com/weaveworks/common/instrument/instrument.go
  22. 137
      vendor/github.com/weaveworks/common/logging/dedupe.go
  23. 43
      vendor/github.com/weaveworks/common/logging/gokit.go
  24. 91
      vendor/github.com/weaveworks/common/middleware/grpc_instrumentation.go
  25. 11
      vendor/github.com/weaveworks/common/middleware/grpc_logging.go
  26. 18
      vendor/github.com/weaveworks/common/middleware/http_tracing.go
  27. 32
      vendor/github.com/weaveworks/common/middleware/logging.go
  28. 105
      vendor/github.com/weaveworks/common/server/server.go
  29. 13
      vendor/modules.txt

@ -49,7 +49,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/grafana/dskit v0.0.0-20220624123803-3624a963826e
github.com/grafana/dskit v0.0.0-20220708141012-99f3d0043c23
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
@ -92,7 +92,7 @@ require (
github.com/thanos-io/thanos v0.22.0
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20211015155308-ebe5bdc2c89e
github.com/weaveworks/common v0.0.0-20220706100410-67d27ed40fae
github.com/xdg-go/scram v1.0.2
go.etcd.io/bbolt v1.3.6
go.uber.org/atomic v1.9.0
@ -230,7 +230,7 @@ require (
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 // indirect
github.com/prometheus/exporter-toolkit v0.7.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rs/xid v1.2.1 // indirect

@ -839,8 +839,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM=
github.com/grafana/dskit v0.0.0-20220624123803-3624a963826e h1:8aN+oHEUvvZUUnH17M08lIWWzRRNHi2wnQmNStl7xa0=
github.com/grafana/dskit v0.0.0-20220624123803-3624a963826e/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0=
github.com/grafana/dskit v0.0.0-20220708141012-99f3d0043c23 h1:VF+BC/NBcxMP33P04x9j+4cSa1aBKafoS5RSVYtT4ic=
github.com/grafana/dskit v0.0.0-20220708141012-99f3d0043c23/go.mod h1:D5GdDQDsPN12+eGhq+lSCY4o/glBYO6NC8CRkzb23gs=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
@ -1376,7 +1376,6 @@ github.com/prometheus/exporter-toolkit v0.6.1/go.mod h1:ZUBIj498ePooX9t/2xtDjeQY
github.com/prometheus/exporter-toolkit v0.7.0/go.mod h1:ZUBIj498ePooX9t/2xtDjeQYwvRpiPP2lh5u4iblj2g=
github.com/prometheus/exporter-toolkit v0.7.1 h1:c6RXaK8xBVercEeUQ4tRNL8UGWzDHfvj9dseo1FcK1Y=
github.com/prometheus/exporter-toolkit v0.7.1/go.mod h1:ZUBIj498ePooX9t/2xtDjeQYwvRpiPP2lh5u4iblj2g=
github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 h1:dTUS1vaLWq+Y6XKOTnrFpoVsQKLCbCp1OLj24TDi7oM=
github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289/go.mod h1:FGbBv5OPKjch+jNUJmEQpMZytIdyW0NdBtWFcfSKusc=
github.com/prometheus/procfs v0.0.0-20180408092902-8b1c2da0d56d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
@ -1544,8 +1543,9 @@ github.com/vultr/govultr/v2 v2.17.1 h1:UBmotwA0mkGtyJMakUF9jhLH/W3mN5wfGRn543i/B
github.com/vultr/govultr/v2 v2.17.1/go.mod h1:ZFOKGWmgjytfyjeyAdhQlSWwTjh2ig+X49cAp50dzXI=
github.com/wavefronthq/wavefront-sdk-go v0.9.2/go.mod h1:hQI6y8M9OtTCtc0xdwh+dCER4osxXdEAeCpacjpDZEU=
github.com/weaveworks/common v0.0.0-20210913144402-035033b78a78/go.mod h1:YU9FvnS7kUnRt6HY10G+2qHkwzP3n3Vb1XsXDsJTSp8=
github.com/weaveworks/common v0.0.0-20211015155308-ebe5bdc2c89e h1:B0gVGyVpjfWJWSRe027EkhmEype0a0Dt2uHVxcPrhfs=
github.com/weaveworks/common v0.0.0-20211015155308-ebe5bdc2c89e/go.mod h1:GWX2dQ7yjrgvqH0+d3kCJC5bsY8oOFwqjxFMHaRK4/k=
github.com/weaveworks/common v0.0.0-20220629114710-e3b70df0f08b/go.mod h1:YfOOLoW1Q/jIIu0WLeSwgStmrKjuJEZSKTAUc+0KFvE=
github.com/weaveworks/common v0.0.0-20220706100410-67d27ed40fae h1:Z8YibUpdBEdCq8nwrYXJQ8vYooevbmEBIdFpseXK3/8=
github.com/weaveworks/common v0.0.0-20220706100410-67d27ed40fae/go.mod h1:YfOOLoW1Q/jIIu0WLeSwgStmrKjuJEZSKTAUc+0KFvE=
github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M=
github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA=
github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE=

@ -6,7 +6,6 @@ import (
"time"
"github.com/grafana/dskit/grpcclient"
dsmiddleware "github.com/grafana/dskit/middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
@ -88,14 +87,14 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC
unaryInterceptors = append(unaryInterceptors,
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
dsmiddleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration),
middleware.UnaryClientInstrumentInterceptor(ingesterClientRequestDuration),
)
var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
streamInterceptors = append(streamInterceptors,
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
dsmiddleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
middleware.StreamClientInstrumentInterceptor(ingesterClientRequestDuration),
)
return unaryInterceptors, streamInterceptors

@ -11,7 +11,6 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcclient"
dskit_middleware "github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
otgrpc "github.com/opentracing-contrib/go-grpc"
@ -188,7 +187,7 @@ func (sp *schedulerProcessor) createFrontendClient(addr string) (client.PoolClie
opts, err := sp.grpcConfig.DialOption([]grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
dskit_middleware.PrometheusGRPCUnaryInstrumentation(sp.metrics.frontendClientRequestDuration),
middleware.UnaryClientInstrumentInterceptor(sp.metrics.frontendClientRequestDuration),
}, nil)
if err != nil {
return nil, err

@ -5,11 +5,11 @@ import (
"strconv"
"time"
"github.com/grafana/dskit/middleware"
otgrpc "github.com/opentracing-contrib/go-grpc"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/middleware"
"google.golang.org/api/option"
"google.golang.org/grpc"
)
@ -39,11 +39,11 @@ var (
func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
return []grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration),
middleware.UnaryClientInstrumentInterceptor(bigtableRequestDuration),
},
[]grpc.StreamClientInterceptor{
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.PrometheusGRPCStreamInstrumentation(bigtableRequestDuration),
middleware.StreamClientInstrumentInterceptor(bigtableRequestDuration),
}
}

@ -6,18 +6,16 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/middleware"
"google.golang.org/grpc"
dsmiddleware "github.com/grafana/dskit/middleware"
)
func Instrument(requestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
return []grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
dsmiddleware.PrometheusGRPCUnaryInstrumentation(requestDuration),
middleware.UnaryClientInstrumentInterceptor(requestDuration),
}, []grpc.StreamClientInterceptor{
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
dsmiddleware.PrometheusGRPCStreamInstrumentation(requestDuration),
middleware.StreamClientInstrumentInterceptor(requestDuration),
}
}

@ -140,6 +140,9 @@ type KVConfig struct {
AdvertiseAddr string `yaml:"advertise_addr"`
AdvertisePort int `yaml:"advertise_port"`
ClusterLabel string `yaml:"cluster_label" category:"experimental"`
ClusterLabelVerificationDisabled bool `yaml:"cluster_label_verification_disabled" category:"experimental"`
// List of members to join
JoinMembers flagext.StringSlice `yaml:"join_members"`
MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"`
@ -193,6 +196,8 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.")
f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.")
f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.")
cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix)
}
@ -399,12 +404,14 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
mlCfg.AdvertiseAddr = m.cfg.AdvertiseAddr
mlCfg.AdvertisePort = m.cfg.AdvertisePort
mlCfg.Label = m.cfg.ClusterLabel
mlCfg.SkipInboundLabelCheck = m.cfg.ClusterLabelVerificationDisabled
if m.cfg.NodeName != "" {
mlCfg.Name = m.cfg.NodeName
}
if m.cfg.RandomizeNodeName {
mlCfg.Name = mlCfg.Name + "-" + generateRandomSuffix(m.logger)
level.Info(m.logger).Log("msg", "Using memberlist cluster node name", "name", mlCfg.Name)
}
mlCfg.LogOutput = newMemberlistLoggerAdapter(m.logger, false)
@ -420,6 +427,8 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
mlCfg.ProbeInterval = 5 * time.Second // Probe a random node every this interval. This setting is also the total timeout for the direct + indirect probes.
mlCfg.ProbeTimeout = 2 * time.Second // Timeout for the direct probe.
level.Info(m.logger).Log("msg", "Using memberlist cluster label %q and node name %q", mlCfg.Label, mlCfg.Name)
return mlCfg, nil
}

@ -1,101 +0,0 @@
package middleware
import (
"context"
"io"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
grpcUtils "github.com/weaveworks/common/grpc"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// PrometheusGRPCUnaryInstrumentation records duration of gRPC requests client side.
func PrometheusGRPCUnaryInstrumentation(metric *prometheus.HistogramVec) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, resp, cc, opts...)
metric.WithLabelValues(method, errorCode(err)).Observe(time.Since(start).Seconds())
return err
}
}
// PrometheusGRPCStreamInstrumentation records duration of streaming gRPC requests client side.
func PrometheusGRPCStreamInstrumentation(metric *prometheus.HistogramVec) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
start := time.Now()
stream, err := streamer(ctx, desc, cc, method, opts...)
return &instrumentedClientStream{
metric: metric,
start: start,
method: method,
ClientStream: stream,
}, err
}
}
type instrumentedClientStream struct {
metric *prometheus.HistogramVec
start time.Time
method string
grpc.ClientStream
}
func (s *instrumentedClientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
if err == nil {
return err
}
if err == io.EOF {
s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds())
} else {
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
}
return err
}
func (s *instrumentedClientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
if err == nil {
return err
}
if err == io.EOF {
s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds())
} else {
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
}
return err
}
func (s *instrumentedClientStream) Header() (metadata.MD, error) {
md, err := s.ClientStream.Header()
if err != nil {
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
}
return md, err
}
func errorCode(err error) string {
respStatus := "2xx"
if err != nil {
if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
statusFamily := int(errResp.Code / 100)
respStatus = strconv.Itoa(statusFamily) + "xx"
} else if grpcUtils.IsCanceled(err) {
respStatus = "cancel"
} else {
respStatus = "error"
}
}
return respStatus
}

@ -489,6 +489,27 @@ func (d *Desc) getTokensByZone() map[string][]uint32 {
return MergeTokensByZone(zones)
}
// getOldestRegisteredTimestamp returns unix timestamp of oldest "RegisteredTimestamp" value from all instances.
// If any instance has 0 value of RegisteredTimestamp, this function returns 0.
func (d *Desc) getOldestRegisteredTimestamp() int64 {
var result int64
for _, instance := range d.Ingesters {
switch {
case instance.RegisteredTimestamp == 0:
return 0
case result == 0:
result = instance.RegisteredTimestamp
case instance.RegisteredTimestamp < result:
result = instance.RegisteredTimestamp
}
}
return result
}
type CompareResult int
// CompareResult responses

@ -167,6 +167,10 @@ type Ring struct {
ringTokens []uint32
ringTokensByZone map[string][]uint32
// Oldest value of RegisteredTimestamp from all instances. If any instance had RegisteredTimestamp == 0,
// then this value will be 0.
oldestRegisteredTimestamp int64
// Maps a token with the information of the instance holding it. This map is immutable and
// cannot be chanced in place because it's shared "as is" between subrings (the only way to
// change it is to create a new one and replace it).
@ -310,6 +314,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
ringTokensByZone := ringDesc.getTokensByZone()
ringInstanceByToken := ringDesc.getTokensInfo()
ringZones := getZones(ringTokensByZone)
oldestRegisteredTimestamp := ringDesc.getOldestRegisteredTimestamp()
r.mtx.Lock()
defer r.mtx.Unlock()
@ -318,6 +323,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.ringTokensByZone = ringTokensByZone
r.ringInstanceByToken = ringInstanceByToken
r.ringZones = ringZones
r.oldestRegisteredTimestamp = oldestRegisteredTimestamp
r.lastTopologyChange = now
if r.shuffledSubringCache != nil {
// Invalidate all cached subrings.
@ -603,8 +609,11 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
}
result := r.shuffleShard(identifier, size, 0, time.Now())
r.setCachedShuffledSubring(identifier, size, result)
// Only cache subring if it is different from this ring, to avoid deadlocks in getCachedShuffledSubring,
// when we update the cached ring.
if result != r {
r.setCachedShuffledSubring(identifier, size, result)
}
return result
}
@ -630,6 +639,16 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
r.mtx.RLock()
defer r.mtx.RUnlock()
// If all instances have RegisteredTimestamp within the lookback period,
// then all instances would be included in the resulting ring, so we can
// simply return this ring.
//
// If any instance had RegisteredTimestamp equal to 0 (it would not cause additional lookup of next instance),
// then r.oldestRegisteredTimestamp is zero too, and we skip this optimization.
if lookbackPeriod > 0 && r.oldestRegisteredTimestamp > 0 && r.oldestRegisteredTimestamp >= lookbackUntil {
return r
}
var numInstancesPerZone int
var actualZones []string
@ -721,6 +740,8 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
ringTokensByZone: shardTokensByZone,
ringZones: getZones(shardTokensByZone),
oldestRegisteredTimestamp: shardDesc.getOldestRegisteredTimestamp(),
// We reference the original map as is in order to avoid copying. It's safe to do
// because this map is immutable by design and it's a superset of the actual instances
// with the subring.
@ -770,6 +791,11 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring {
return nil
}
// No need to update cached subring, if it is the original ring itself.
if r == cached {
return cached
}
cached.mtx.Lock()
defer cached.mtx.Unlock()

@ -0,0 +1,10 @@
# web package
This package can be used by Prometheus exporters to enable TLS and
authentication.
We actively encourage the community to use this repository, to provide a
consistent experience across the ecosystem.
Developers documentation can be found on
[pkg.go.dev](https://pkg.go.dev/github.com/prometheus/exporter-toolkit/).

@ -0,0 +1,91 @@
// Copyright 2021 The Prometheus Authors
// This code is partly borrowed from Caddy:
// Copyright 2015 Matthew Holt and The Caddy Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package web
import (
weakrand "math/rand"
"sync"
"time"
)
var cacheSize = 100
func init() {
weakrand.Seed(time.Now().UnixNano())
}
type cache struct {
cache map[string]bool
mtx sync.Mutex
}
// newCache returns a cache that contains a mapping of plaintext passwords
// to their hashes (with random eviction). This can greatly improve the
// performance of traffic-heavy servers that use secure password hashing
// algorithms, with the downside that plaintext passwords will be stored in
// memory for a longer time (this should not be a problem as long as your
// machine is not compromised, at which point all bets are off, since basicauth
// necessitates plaintext passwords being received over the wire anyway).
func newCache() *cache {
return &cache{
cache: make(map[string]bool),
}
}
func (c *cache) get(key string) (bool, bool) {
c.mtx.Lock()
defer c.mtx.Unlock()
v, ok := c.cache[key]
return v, ok
}
func (c *cache) set(key string, value bool) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.makeRoom()
c.cache[key] = value
}
func (c *cache) makeRoom() {
if len(c.cache) < cacheSize {
return
}
// We delete more than just 1 entry so that we don't have
// to do this on every request; assuming the capacity of
// the cache is on a long tail, we can save a lot of CPU
// time by doing a whole bunch of deletions now and then
// we won't have to do them again for a while.
numToDelete := len(c.cache) / 10
if numToDelete < 1 {
numToDelete = 1
}
for deleted := 0; deleted <= numToDelete; deleted++ {
// Go maps are "nondeterministic" not actually random,
// so although we could just chop off the "front" of the
// map with less code, this is a heavily skewed eviction
// strategy; generating random numbers is cheap and
// ensures a much better distribution.
rnd := weakrand.Intn(len(c.cache))
i := 0
for key := range c.cache {
if i == rnd {
delete(c.cache, key)
break
}
i++
}
}
}

@ -0,0 +1,137 @@
// Copyright 2020 The Prometheus Authors
// This code is partly borrowed from Caddy:
// Copyright 2015 Matthew Holt and The Caddy Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package web
import (
"encoding/hex"
"fmt"
"net/http"
"sync"
"github.com/go-kit/log"
"golang.org/x/crypto/bcrypt"
)
// extraHTTPHeaders is a map of HTTP headers that can be added to HTTP
// responses.
// This is private on purpose to ensure consistency in the Prometheus ecosystem.
var extraHTTPHeaders = map[string][]string{
"Strict-Transport-Security": nil,
"X-Content-Type-Options": {"nosniff"},
"X-Frame-Options": {"deny", "sameorigin"},
"X-XSS-Protection": nil,
"Content-Security-Policy": nil,
}
func validateUsers(configPath string) error {
c, err := getConfig(configPath)
if err != nil {
return err
}
for _, p := range c.Users {
_, err = bcrypt.Cost([]byte(p))
if err != nil {
return err
}
}
return nil
}
// validateHeaderConfig checks that the provided header configuration is correct.
// It does not check the validity of all the values, only the ones which are
// well-defined enumerations.
func validateHeaderConfig(headers map[string]string) error {
HeadersLoop:
for k, v := range headers {
values, ok := extraHTTPHeaders[k]
if !ok {
return fmt.Errorf("HTTP header %q can not be configured", k)
}
for _, allowedValue := range values {
if v == allowedValue {
continue HeadersLoop
}
}
if len(values) > 0 {
return fmt.Errorf("invalid value for %s. Expected one of: %q, but got: %q", k, values, v)
}
}
return nil
}
type webHandler struct {
tlsConfigPath string
handler http.Handler
logger log.Logger
cache *cache
// bcryptMtx is there to ensure that bcrypt.CompareHashAndPassword is run
// only once in parallel as this is CPU intensive.
bcryptMtx sync.Mutex
}
func (u *webHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c, err := getConfig(u.tlsConfigPath)
if err != nil {
u.logger.Log("msg", "Unable to parse configuration", "err", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
// Configure http headers.
for k, v := range c.HTTPConfig.Header {
w.Header().Set(k, v)
}
if len(c.Users) == 0 {
u.handler.ServeHTTP(w, r)
return
}
user, pass, auth := r.BasicAuth()
if auth {
hashedPassword, validUser := c.Users[user]
if !validUser {
// The user is not found. Use a fixed password hash to
// prevent user enumeration by timing requests.
// This is a bcrypt-hashed version of "fakepassword".
hashedPassword = "$2y$10$QOauhQNbBCuQDKes6eFzPeMqBSjb7Mr5DUmpZ/VcEd00UAV/LDeSi"
}
cacheKey := hex.EncodeToString(append(append([]byte(user), []byte(hashedPassword)...), []byte(pass)...))
authOk, ok := u.cache.get(cacheKey)
if !ok {
// This user, hashedPassword, password is not cached.
u.bcryptMtx.Lock()
err := bcrypt.CompareHashAndPassword([]byte(hashedPassword), []byte(pass))
u.bcryptMtx.Unlock()
authOk = err == nil
u.cache.set(cacheKey, authOk)
}
if authOk && validUser {
u.handler.ServeHTTP(w, r)
return
}
}
w.Header().Set("WWW-Authenticate", "Basic")
http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
}

@ -0,0 +1,361 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package web
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
"path/filepath"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
config_util "github.com/prometheus/common/config"
"gopkg.in/yaml.v2"
)
var (
errNoTLSConfig = errors.New("TLS config is not present")
)
type Config struct {
TLSConfig TLSStruct `yaml:"tls_server_config"`
HTTPConfig HTTPStruct `yaml:"http_server_config"`
Users map[string]config_util.Secret `yaml:"basic_auth_users"`
}
type TLSStruct struct {
TLSCertPath string `yaml:"cert_file"`
TLSKeyPath string `yaml:"key_file"`
ClientAuth string `yaml:"client_auth_type"`
ClientCAs string `yaml:"client_ca_file"`
CipherSuites []cipher `yaml:"cipher_suites"`
CurvePreferences []curve `yaml:"curve_preferences"`
MinVersion tlsVersion `yaml:"min_version"`
MaxVersion tlsVersion `yaml:"max_version"`
PreferServerCipherSuites bool `yaml:"prefer_server_cipher_suites"`
}
// SetDirectory joins any relative file paths with dir.
func (t *TLSStruct) SetDirectory(dir string) {
t.TLSCertPath = config_util.JoinDir(dir, t.TLSCertPath)
t.TLSKeyPath = config_util.JoinDir(dir, t.TLSKeyPath)
t.ClientCAs = config_util.JoinDir(dir, t.ClientCAs)
}
type HTTPStruct struct {
HTTP2 bool `yaml:"http2"`
Header map[string]string `yaml:"headers,omitempty"`
}
func getConfig(configPath string) (*Config, error) {
content, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, err
}
c := &Config{
TLSConfig: TLSStruct{
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS13,
PreferServerCipherSuites: true,
},
HTTPConfig: HTTPStruct{HTTP2: true},
}
err = yaml.UnmarshalStrict(content, c)
if err == nil {
err = validateHeaderConfig(c.HTTPConfig.Header)
}
c.TLSConfig.SetDirectory(filepath.Dir(configPath))
return c, err
}
func getTLSConfig(configPath string) (*tls.Config, error) {
c, err := getConfig(configPath)
if err != nil {
return nil, err
}
return ConfigToTLSConfig(&c.TLSConfig)
}
// ConfigToTLSConfig generates the golang tls.Config from the TLSStruct config.
func ConfigToTLSConfig(c *TLSStruct) (*tls.Config, error) {
if c.TLSCertPath == "" && c.TLSKeyPath == "" && c.ClientAuth == "" && c.ClientCAs == "" {
return nil, errNoTLSConfig
}
if c.TLSCertPath == "" {
return nil, errors.New("missing cert_file")
}
if c.TLSKeyPath == "" {
return nil, errors.New("missing key_file")
}
loadCert := func() (*tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(c.TLSCertPath, c.TLSKeyPath)
if err != nil {
return nil, errors.Wrap(err, "failed to load X509KeyPair")
}
return &cert, nil
}
// Confirm that certificate and key paths are valid.
if _, err := loadCert(); err != nil {
return nil, err
}
cfg := &tls.Config{
MinVersion: (uint16)(c.MinVersion),
MaxVersion: (uint16)(c.MaxVersion),
PreferServerCipherSuites: c.PreferServerCipherSuites,
}
cfg.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
return loadCert()
}
var cf []uint16
for _, c := range c.CipherSuites {
cf = append(cf, (uint16)(c))
}
if len(cf) > 0 {
cfg.CipherSuites = cf
}
var cp []tls.CurveID
for _, c := range c.CurvePreferences {
cp = append(cp, (tls.CurveID)(c))
}
if len(cp) > 0 {
cfg.CurvePreferences = cp
}
if c.ClientCAs != "" {
clientCAPool := x509.NewCertPool()
clientCAFile, err := ioutil.ReadFile(c.ClientCAs)
if err != nil {
return nil, err
}
clientCAPool.AppendCertsFromPEM(clientCAFile)
cfg.ClientCAs = clientCAPool
}
switch c.ClientAuth {
case "RequestClientCert":
cfg.ClientAuth = tls.RequestClientCert
case "RequireAnyClientCert", "RequireClientCert": // Preserved for backwards compatibility.
cfg.ClientAuth = tls.RequireAnyClientCert
case "VerifyClientCertIfGiven":
cfg.ClientAuth = tls.VerifyClientCertIfGiven
case "RequireAndVerifyClientCert":
cfg.ClientAuth = tls.RequireAndVerifyClientCert
case "", "NoClientCert":
cfg.ClientAuth = tls.NoClientCert
default:
return nil, errors.New("Invalid ClientAuth: " + c.ClientAuth)
}
if c.ClientCAs != "" && cfg.ClientAuth == tls.NoClientCert {
return nil, errors.New("Client CA's have been configured without a Client Auth Policy")
}
return cfg, nil
}
// ListenAndServe starts the server on the given address. Based on the file
// tlsConfigPath, TLS or basic auth could be enabled.
func ListenAndServe(server *http.Server, tlsConfigPath string, logger log.Logger) error {
listener, err := net.Listen("tcp", server.Addr)
if err != nil {
return err
}
defer listener.Close()
return Serve(listener, server, tlsConfigPath, logger)
}
// Server starts the server on the given listener. Based on the file
// tlsConfigPath, TLS or basic auth could be enabled.
func Serve(l net.Listener, server *http.Server, tlsConfigPath string, logger log.Logger) error {
if tlsConfigPath == "" {
level.Info(logger).Log("msg", "TLS is disabled.", "http2", false)
return server.Serve(l)
}
if err := validateUsers(tlsConfigPath); err != nil {
return err
}
// Setup basic authentication.
var handler http.Handler = http.DefaultServeMux
if server.Handler != nil {
handler = server.Handler
}
c, err := getConfig(tlsConfigPath)
if err != nil {
return err
}
server.Handler = &webHandler{
tlsConfigPath: tlsConfigPath,
logger: logger,
handler: handler,
cache: newCache(),
}
config, err := ConfigToTLSConfig(&c.TLSConfig)
switch err {
case nil:
if !c.HTTPConfig.HTTP2 {
server.TLSNextProto = make(map[string]func(*http.Server, *tls.Conn, http.Handler))
}
// Valid TLS config.
level.Info(logger).Log("msg", "TLS is enabled.", "http2", c.HTTPConfig.HTTP2)
case errNoTLSConfig:
// No TLS config, back to plain HTTP.
level.Info(logger).Log("msg", "TLS is disabled.", "http2", false)
return server.Serve(l)
default:
// Invalid TLS config.
return err
}
server.TLSConfig = config
// Set the GetConfigForClient method of the HTTPS server so that the config
// and certs are reloaded on new connections.
server.TLSConfig.GetConfigForClient = func(*tls.ClientHelloInfo) (*tls.Config, error) {
config, err := getTLSConfig(tlsConfigPath)
if err != nil {
return nil, err
}
config.NextProtos = server.TLSConfig.NextProtos
return config, nil
}
return server.ServeTLS(l, "", "")
}
// Validate configuration file by reading the configuration and the certificates.
func Validate(tlsConfigPath string) error {
if tlsConfigPath == "" {
return nil
}
if err := validateUsers(tlsConfigPath); err != nil {
return err
}
c, err := getConfig(tlsConfigPath)
if err != nil {
return err
}
_, err = ConfigToTLSConfig(&c.TLSConfig)
if err == errNoTLSConfig {
return nil
}
return err
}
type cipher uint16
func (c *cipher) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
err := unmarshal((*string)(&s))
if err != nil {
return err
}
for _, cs := range tls.CipherSuites() {
if cs.Name == s {
*c = (cipher)(cs.ID)
return nil
}
}
return errors.New("unknown cipher: " + s)
}
func (c cipher) MarshalYAML() (interface{}, error) {
return tls.CipherSuiteName((uint16)(c)), nil
}
type curve tls.CurveID
var curves = map[string]curve{
"CurveP256": (curve)(tls.CurveP256),
"CurveP384": (curve)(tls.CurveP384),
"CurveP521": (curve)(tls.CurveP521),
"X25519": (curve)(tls.X25519),
}
func (c *curve) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
err := unmarshal((*string)(&s))
if err != nil {
return err
}
if curveid, ok := curves[s]; ok {
*c = curveid
return nil
}
return errors.New("unknown curve: " + s)
}
func (c *curve) MarshalYAML() (interface{}, error) {
for s, curveid := range curves {
if *c == curveid {
return s, nil
}
}
return fmt.Sprintf("%v", c), nil
}
type tlsVersion uint16
var tlsVersions = map[string]tlsVersion{
"TLS13": (tlsVersion)(tls.VersionTLS13),
"TLS12": (tlsVersion)(tls.VersionTLS12),
"TLS11": (tlsVersion)(tls.VersionTLS11),
"TLS10": (tlsVersion)(tls.VersionTLS10),
}
func (tv *tlsVersion) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
err := unmarshal((*string)(&s))
if err != nil {
return err
}
if v, ok := tlsVersions[s]; ok {
*tv = v
return nil
}
return errors.New("unknown TLS version: " + s)
}
func (tv *tlsVersion) MarshalYAML() (interface{}, error) {
for s, v := range tlsVersions {
if *tv == v {
return s, nil
}
}
return fmt.Sprintf("%v", tv), nil
}
// Listen starts the server on the given address. Based on the file
// tlsConfigPath, TLS or basic auth could be enabled.
//
// Deprecated: Use ListenAndServe instead.
func Listen(server *http.Server, tlsConfigPath string, logger log.Logger) error {
return ListenAndServe(server, tlsConfigPath, logger)
}

@ -0,0 +1,6 @@
# Minimal TLS configuration example. Additionally, a certificate and a key file
# are needed.
tls_server_config:
cert_file: server.crt
key_file: server.key

@ -1,17 +0,0 @@
Configurable modular Prometheus exporter for various node metrics.
Copyright 2013-2015 The Prometheus Authors
This product includes software developed at
SoundCloud Ltd. (http://soundcloud.com/).
The following components are included in this product:
wifi
https://github.com/mdlayher/wifi
Copyright 2016-2017 Matt Layher
Licensed under the MIT License
netlink
https://github.com/mdlayher/netlink
Copyright 2016-2017 Matt Layher
Licensed under the MIT License

@ -1,28 +0,0 @@
# HTTPS Package for Prometheus
The `https` directory contains a Go package and a sample configuration file for
running `node_exporter` with HTTPS instead of HTTP. We currently support TLS 1.3
and TLS 1.2.
To run a server with TLS, use the flag `--web.config`.
e.g. `./node_exporter --web.config="web-config.yml"`
If the config is kept within the https directory.
The config file should be written in YAML format, and is reloaded on each connection to check for new certificates and/or authentication policy.
## Sample Config
```
tls_config:
# Certificate and key files for server to use to authenticate to client
cert_file: <filename>
key_file: <filename>
# Server policy for client authentication. Maps to ClientAuth Policies
# For more detail on clientAuth options: [ClientAuthType](https://golang.org/pkg/crypto/tls/#ClientAuthType)
[ client_auth_type: <string> | default = "NoClientCert" ]
# CA certificate for client certificate authentication to the server
[ client_ca_file: <filename> ]
```

@ -1,126 +0,0 @@
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package https allows the implementation of TLS.
package https
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/http"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
type Config struct {
TLSConfig TLSStruct `yaml:"tls_config"`
}
type TLSStruct struct {
TLSCertPath string `yaml:"cert_file"`
TLSKeyPath string `yaml:"key_file"`
ClientAuth string `yaml:"client_auth_type"`
ClientCAs string `yaml:"client_ca_file"`
}
func getTLSConfig(configPath string) (*tls.Config, error) {
content, err := ioutil.ReadFile(configPath)
if err != nil {
return nil, err
}
c := &Config{}
err = yaml.Unmarshal(content, c)
if err != nil {
return nil, err
}
return ConfigToTLSConfig(&c.TLSConfig)
}
// ConfigToTLSConfig generates the golang tls.Config from the TLSStruct config.
func ConfigToTLSConfig(c *TLSStruct) (*tls.Config, error) {
cfg := &tls.Config{
MinVersion: tls.VersionTLS12,
}
if len(c.TLSCertPath) == 0 {
return nil, errors.New("missing TLSCertPath")
}
if len(c.TLSKeyPath) == 0 {
return nil, errors.New("missing TLSKeyPath")
}
loadCert := func() (*tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(c.TLSCertPath, c.TLSKeyPath)
if err != nil {
return nil, errors.Wrap(err, "failed to load X509KeyPair")
}
return &cert, nil
}
// Confirm that certificate and key paths are valid.
if _, err := loadCert(); err != nil {
return nil, err
}
cfg.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
return loadCert()
}
if len(c.ClientCAs) > 0 {
clientCAPool := x509.NewCertPool()
clientCAFile, err := ioutil.ReadFile(c.ClientCAs)
if err != nil {
return nil, err
}
clientCAPool.AppendCertsFromPEM(clientCAFile)
cfg.ClientCAs = clientCAPool
}
if len(c.ClientAuth) > 0 {
switch s := (c.ClientAuth); s {
case "NoClientCert":
cfg.ClientAuth = tls.NoClientCert
case "RequestClientCert":
cfg.ClientAuth = tls.RequestClientCert
case "RequireClientCert":
cfg.ClientAuth = tls.RequireAnyClientCert
case "VerifyClientCertIfGiven":
cfg.ClientAuth = tls.VerifyClientCertIfGiven
case "RequireAndVerifyClientCert":
cfg.ClientAuth = tls.RequireAndVerifyClientCert
case "":
cfg.ClientAuth = tls.NoClientCert
default:
return nil, errors.New("Invalid ClientAuth: " + s)
}
}
if len(c.ClientCAs) > 0 && cfg.ClientAuth == tls.NoClientCert {
return nil, errors.New("Client CA's have been configured without a Client Auth Policy")
}
return cfg, nil
}
// Listen starts the server on the given address. If tlsConfigPath isn't empty the server connection will be started using TLS.
func Listen(server *http.Server, tlsConfigPath string) error {
if (tlsConfigPath) == "" {
return server.ListenAndServe()
}
var err error
server.TLSConfig, err = getTLSConfig(tlsConfigPath)
if err != nil {
return err
}
// Set the GetConfigForClient method of the HTTPS server so that the config
// and certs are reloaded on new connections.
server.TLSConfig.GetConfigForClient = func(*tls.ClientHelloInfo) (*tls.Config, error) {
return getTLSConfig(tlsConfigPath)
}
return server.ListenAndServeTLS("", "")
}

@ -1,11 +0,0 @@
tls_config:
# Certificate and key files for server to use to authenticate to client
cert_file: <filename>
key_file: <filename>
# Server policy for client authentication. Maps to ClientAuth Policies
# For more detail on clientAuth options: [ClientAuthType](https://golang.org/pkg/crypto/tls/#ClientAuthType)
[ client_auth_type: <string> | default = "NoClientCert" ]
# CA certificate for client certificate authentication to the server
[ client_ca_file: <filename> ]

@ -68,9 +68,16 @@ func (c *HistogramCollector) After(ctx context.Context, method, statusCode strin
// (this will always work for a HistogramVec).
func ObserveWithExemplar(ctx context.Context, histogram prometheus.Observer, seconds float64) {
if traceID, ok := tracing.ExtractSampledTraceID(ctx); ok {
lbls := prometheus.Labels{"traceID": traceID}
if userID, err := user.ExtractUserID(ctx); err == nil {
lbls["user"] = userID
}
if orgID, err := user.ExtractOrgID(ctx); err == nil {
lbls["organization"] = orgID
}
histogram.(prometheus.ExemplarObserver).ObserveWithExemplar(
seconds,
prometheus.Labels{"traceID": traceID},
lbls,
)
return
}

@ -1,137 +0,0 @@
package logging
import (
"fmt"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
const (
defaultDedupeInterval = time.Minute
)
// SetupDeduplication should be performed after any other logging setup.
// For all logs less severe or equal to the given log level (but still higher than the logger's configured log level),
// these logs will be 'deduplicated'. What this means is that, excluding certain special fields like time, multiple
// identical log entries will be grouped up and a summary message emitted.
// For example, instead of:
// 00:00:00 INFO User 123 did xyz
// 00:00:10 INFO User 123 did xyz
// 00:00:25 INFO User 123 did xyz
// 00:00:55 INFO User 123 did xyz
// you would get:
// 00:00:00 INFO User 123 did xyz
// 00:01:00 INFO Repeated 3 times: User 123 did xyz
// The interval argument controls how long to wait for additional messages to arrive before reporting.
// Increase it to deduplicate more aggressively, decrease it to lower latency from a log occurring to it appearing.
// Set it to 0 to pick a sensible default value (recommended).
// NOTE: For simplicity and efficiency, fields are considered 'equal' if and only if their string representations (%v) are equal.
func SetupDeduplication(logLevel string, interval time.Duration) error {
dedupeLevel, err := log.ParseLevel(logLevel)
if err != nil {
return fmt.Errorf("Error parsing log level: %v", err)
}
if interval <= 0 {
interval = defaultDedupeInterval
}
// We use a special Formatter to either format the log using the original formatter, or to return ""
// so nothing will be written for that event. The repeated entries are later logged along with a field flag
// that tells the formatter to ignore the message.
stdLogger := log.StandardLogger()
stdLogger.Formatter = newDedupeFormatter(stdLogger.Formatter, dedupeLevel, interval)
return nil
}
type entryCount struct {
entry log.Entry
count int
}
type dedupeFormatter struct {
innerFormatter log.Formatter
level log.Level
interval time.Duration
seen map[string]entryCount
lock sync.Mutex
}
func newDedupeFormatter(innerFormatter log.Formatter, level log.Level, interval time.Duration) *dedupeFormatter {
return &dedupeFormatter{
innerFormatter: innerFormatter,
level: level,
interval: interval,
seen: map[string]entryCount{},
}
}
func (f *dedupeFormatter) Format(entry *log.Entry) ([]byte, error) {
if f.shouldLog(entry) {
b, err := f.innerFormatter.Format(entry)
return b, err
}
return []byte{}, nil
}
func (f *dedupeFormatter) shouldLog(entry *log.Entry) bool {
if _, ok := entry.Data["deduplicated"]; ok {
// ignore our own logs about deduped messages
return true
}
if entry.Level < f.level {
// ignore logs more severe than our level
return true
}
key := fmt.Sprintf("%s %s", entry.Message, fieldsToString(entry.Data))
f.lock.Lock()
defer f.lock.Unlock()
if ec, ok := f.seen[key]; ok {
// already seen, increment count and do not log
ec.count++
f.seen[key] = ec
return false
}
// New message, log it but add it to seen.
// We need to copy because the pointer ceases to be valid after we return from Format
f.seen[key] = entryCount{entry: *entry}
go f.evictEntry(key) // queue to evict later
return true
}
// Wait for interval seconds then evict the entry and send the log
func (f *dedupeFormatter) evictEntry(key string) {
time.Sleep(f.interval)
var ec entryCount
func() {
f.lock.Lock()
defer f.lock.Unlock()
ec = f.seen[key]
delete(f.seen, key)
}()
if ec.count == 0 {
return
}
entry := log.WithFields(ec.entry.Data).WithField("deduplicated", ec.count)
message := fmt.Sprintf("Repeated %d times: %s", ec.count, ec.entry.Message)
// There's no way to choose the log level dynamically, so we have to do this hack
map[log.Level]func(args ...interface{}){
log.PanicLevel: entry.Panic,
log.FatalLevel: entry.Fatal,
log.ErrorLevel: entry.Error,
log.WarnLevel: entry.Warn,
log.InfoLevel: entry.Info,
log.DebugLevel: entry.Debug,
}[ec.entry.Level](message)
}
func fieldsToString(data log.Fields) string {
parts := make([]string, 0, len(data))
// traversal order here is arbitrary but stable, which is fine for our purposes
for k, v := range data {
parts = append(parts, fmt.Sprintf("%s=%v", k, v))
}
return strings.Join(parts, " ")
}

@ -17,8 +17,13 @@ func NewGoKitFormat(l Level, f Format) Interface {
} else {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
}
return addStandardFields(logger, l)
}
// stand-alone for test purposes
func addStandardFields(logger log.Logger, l Level) Interface {
logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.Caller(5))
logger = level.NewFilter(logger, l.Gokit)
logger = log.With(logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
return gokit{logger}
}
@ -36,32 +41,52 @@ type gokit struct {
log.Logger
}
// Helper to defer sprintf until it is needed.
type sprintf struct {
format string
args []interface{}
}
func (s *sprintf) String() string {
return fmt.Sprintf(s.format, s.args...)
}
// Helper to defer sprint until it is needed.
// Note we don't use Sprintln because the output is passed to go-kit as one value among many on a line
type sprint struct {
args []interface{}
}
func (s *sprint) String() string {
return fmt.Sprint(s.args...)
}
func (g gokit) Debugf(format string, args ...interface{}) {
level.Debug(g.Logger).Log("msg", fmt.Sprintf(format, args...))
level.Debug(g.Logger).Log("msg", &sprintf{format: format, args: args})
}
func (g gokit) Debugln(args ...interface{}) {
level.Debug(g.Logger).Log("msg", fmt.Sprintln(args...))
level.Debug(g.Logger).Log("msg", &sprint{args: args})
}
func (g gokit) Infof(format string, args ...interface{}) {
level.Info(g.Logger).Log("msg", fmt.Sprintf(format, args...))
level.Info(g.Logger).Log("msg", &sprintf{format: format, args: args})
}
func (g gokit) Infoln(args ...interface{}) {
level.Info(g.Logger).Log("msg", fmt.Sprintln(args...))
level.Info(g.Logger).Log("msg", &sprint{args: args})
}
func (g gokit) Warnf(format string, args ...interface{}) {
level.Warn(g.Logger).Log("msg", fmt.Sprintf(format, args...))
level.Warn(g.Logger).Log("msg", &sprintf{format: format, args: args})
}
func (g gokit) Warnln(args ...interface{}) {
level.Warn(g.Logger).Log("msg", fmt.Sprintln(args...))
level.Warn(g.Logger).Log("msg", &sprint{args: args})
}
func (g gokit) Errorf(format string, args ...interface{}) {
level.Error(g.Logger).Log("msg", fmt.Sprintf(format, args...))
level.Error(g.Logger).Log("msg", &sprintf{format: format, args: args})
}
func (g gokit) Errorln(args ...interface{}) {
level.Error(g.Logger).Log("msg", fmt.Sprintln(args...))
level.Error(g.Logger).Log("msg", &sprint{args: args})
}
func (g gokit) WithField(key string, value interface{}) Interface {

@ -1,6 +1,8 @@
package middleware
import (
"context"
"io"
"strconv"
"time"
@ -8,8 +10,8 @@ import (
grpcUtils "github.com/weaveworks/common/grpc"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/instrument"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
func observe(ctx context.Context, hist *prometheus.HistogramVec, method string, err error, duration time.Duration) {
@ -45,3 +47,90 @@ func StreamServerInstrumentInterceptor(hist *prometheus.HistogramVec) grpc.Strea
return err
}
}
// UnaryClientInstrumentInterceptor records duration of gRPC requests client side.
func UnaryClientInstrumentInterceptor(metric *prometheus.HistogramVec) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, resp, cc, opts...)
metric.WithLabelValues(method, errorCode(err)).Observe(time.Since(start).Seconds())
return err
}
}
// StreamClientInstrumentInterceptor records duration of streaming gRPC requests client side.
func StreamClientInstrumentInterceptor(metric *prometheus.HistogramVec) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
streamer grpc.Streamer, opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
start := time.Now()
stream, err := streamer(ctx, desc, cc, method, opts...)
return &instrumentedClientStream{
metric: metric,
start: start,
method: method,
ClientStream: stream,
}, err
}
}
type instrumentedClientStream struct {
metric *prometheus.HistogramVec
start time.Time
method string
grpc.ClientStream
}
func (s *instrumentedClientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
if err == nil {
return nil
}
if err == io.EOF {
s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds())
} else {
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
}
return err
}
func (s *instrumentedClientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
if err == nil {
return nil
}
if err == io.EOF {
s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds())
} else {
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
}
return err
}
func (s *instrumentedClientStream) Header() (metadata.MD, error) {
md, err := s.ClientStream.Header()
if err != nil {
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds())
}
return md, err
}
// errorCode converts an error into an error code string.
func errorCode(err error) string {
if err == nil {
return "2xx"
}
if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
statusFamily := int(errResp.Code / 100)
return strconv.Itoa(statusFamily) + "xx"
} else if grpcUtils.IsCanceled(err) {
return "cancel"
} else {
return "error"
}
}

@ -20,13 +20,18 @@ const (
type GRPCServerLog struct {
Log logging.Interface
// WithRequest will log the entire request rather than just the error
WithRequest bool
WithRequest bool
DisableRequestSuccessLog bool
}
// UnaryServerInterceptor returns an interceptor that logs gRPC requests
func (s GRPCServerLog) UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
begin := time.Now()
resp, err := handler(ctx, req)
if err == nil && s.DisableRequestSuccessLog {
return resp, nil
}
entry := user.LogWith(ctx, s.Log).WithFields(logging.Fields{"method": info.FullMethod, "duration": time.Since(begin)})
if err != nil {
if s.WithRequest {
@ -47,6 +52,10 @@ func (s GRPCServerLog) UnaryServerInterceptor(ctx context.Context, req interface
func (s GRPCServerLog) StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
begin := time.Now()
err := handler(srv, ss)
if err == nil && s.DisableRequestSuccessLog {
return nil
}
entry := user.LogWith(ss.Context(), s.Log).WithFields(logging.Fields{"method": info.FullMethod, "duration": time.Since(begin)})
if err != nil {
if grpcUtils.IsCanceled(err) {

@ -29,11 +29,19 @@ func (t Tracer) Wrap(next http.Handler) http.Handler {
return fmt.Sprintf("HTTP %s - %s", r.Method, op)
}),
}
if t.SourceIPs != nil {
options = append(options, nethttp.MWSpanObserver(func(sp opentracing.Span, r *http.Request) {
sp.SetTag("sourceIPs", t.SourceIPs.Get(r))
}))
nethttp.MWSpanObserver(func(sp opentracing.Span, r *http.Request) {
// add a tag with the client's user agent to the span
userAgent := r.Header.Get("User-Agent")
if userAgent != "" {
sp.SetTag("http.user_agent", userAgent)
}
// add a tag with the client's sourceIPs to the span, if a
// SourceIPExtractor is given.
if t.SourceIPs != nil {
sp.SetTag("sourceIPs", t.SourceIPs.Get(r))
}
}),
}
return nethttp.Middleware(opentracing.GlobalTracer(), next, options...)

@ -14,9 +14,10 @@ import (
// Log middleware logs http requests
type Log struct {
Log logging.Interface
LogRequestHeaders bool // LogRequestHeaders true -> dump http headers at debug log level
SourceIPs *SourceIPExtractor
Log logging.Interface
LogRequestHeaders bool // LogRequestHeaders true -> dump http headers at debug log level
LogRequestAtInfoLevel bool // LogRequestAtInfoLevel true -> log requests at info log level
SourceIPs *SourceIPExtractor
}
// logWithRequest information from the request and context as fields.
@ -42,11 +43,12 @@ func (l Log) Wrap(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
begin := time.Now()
uri := r.RequestURI // capture the URI before running next, as it may get rewritten
requestLog := l.logWithRequest(r)
// Log headers before running 'next' in case other interceptors change the data.
headers, err := dumpRequest(r)
if err != nil {
headers = nil
l.logWithRequest(r).Errorf("Could not dump request headers: %v", err)
requestLog.Errorf("Could not dump request headers: %v", err)
}
var buf bytes.Buffer
wrapped := newBadResponseLoggingWriter(w, &buf)
@ -56,20 +58,32 @@ func (l Log) Wrap(next http.Handler) http.Handler {
if writeErr != nil {
if errors.Is(writeErr, context.Canceled) {
l.logWithRequest(r).Debugf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
if l.LogRequestAtInfoLevel {
requestLog.Infof("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
} else {
requestLog.Debugf("%s %s %s, request cancelled: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
}
} else {
l.logWithRequest(r).Warnf("%s %s %s, error: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
requestLog.Warnf("%s %s %s, error: %s ws: %v; %s", r.Method, uri, time.Since(begin), writeErr, IsWSHandshakeRequest(r), headers)
}
return
}
if 100 <= statusCode && statusCode < 500 || statusCode == http.StatusBadGateway || statusCode == http.StatusServiceUnavailable {
l.logWithRequest(r).Debugf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
if l.LogRequestAtInfoLevel {
requestLog.Infof("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
} else {
requestLog.Debugf("%s %s (%d) %s", r.Method, uri, statusCode, time.Since(begin))
}
if l.LogRequestHeaders && headers != nil {
l.logWithRequest(r).Debugf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
if l.LogRequestAtInfoLevel {
requestLog.Infof("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
} else {
requestLog.Debugf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers))
}
}
} else {
l.logWithRequest(r).Warnf("%s %s (%d) %s Response: %q ws: %v; %s",
requestLog.Warnf("%s %s (%d) %s Response: %q ws: %v; %s",
r.Method, uri, statusCode, time.Since(begin), buf.Bytes(), IsWSHandshakeRequest(r), headers)
}
})

@ -16,7 +16,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
node_https "github.com/prometheus/node_exporter/https"
"github.com/prometheus/exporter-toolkit/web"
"golang.org/x/net/context"
"golang.org/x/net/netutil"
"google.golang.org/grpc"
@ -50,6 +50,14 @@ type SignalHandler interface {
Stop()
}
// TLSConfig contains TLS parameters for Config.
type TLSConfig struct {
TLSCertPath string `yaml:"cert_file"`
TLSKeyPath string `yaml:"key_file"`
ClientAuth string `yaml:"client_auth_type"`
ClientCAs string `yaml:"client_ca_file"`
}
// Config for a Server
type Config struct {
MetricsNamespace string `yaml:"-"`
@ -62,11 +70,12 @@ type Config struct {
GRPCListenPort int `yaml:"grpc_listen_port"`
GRPCConnLimit int `yaml:"grpc_listen_conn_limit"`
HTTPTLSConfig node_https.TLSStruct `yaml:"http_tls_config"`
GRPCTLSConfig node_https.TLSStruct `yaml:"grpc_tls_config"`
HTTPTLSConfig TLSConfig `yaml:"http_tls_config"`
GRPCTLSConfig TLSConfig `yaml:"grpc_tls_config"`
RegisterInstrumentation bool `yaml:"register_instrumentation"`
ExcludeRequestInLog bool `yaml:"-"`
RegisterInstrumentation bool `yaml:"register_instrumentation"`
ExcludeRequestInLog bool `yaml:"-"`
DisableRequestSuccessLog bool `yaml:"-"`
ServerGracefulShutdownTimeout time.Duration `yaml:"graceful_shutdown_timeout"`
HTTPServerReadTimeout time.Duration `yaml:"http_server_read_timeout"`
@ -91,16 +100,21 @@ type Config struct {
GRPCServerMinTimeBetweenPings time.Duration `yaml:"grpc_server_min_time_between_pings"`
GRPCServerPingWithoutStreamAllowed bool `yaml:"grpc_server_ping_without_stream_allowed"`
LogFormat logging.Format `yaml:"log_format"`
LogLevel logging.Level `yaml:"log_level"`
Log logging.Interface `yaml:"-"`
LogSourceIPs bool `yaml:"log_source_ips_enabled"`
LogSourceIPsHeader string `yaml:"log_source_ips_header"`
LogSourceIPsRegex string `yaml:"log_source_ips_regex"`
LogFormat logging.Format `yaml:"log_format"`
LogLevel logging.Level `yaml:"log_level"`
Log logging.Interface `yaml:"-"`
LogSourceIPs bool `yaml:"log_source_ips_enabled"`
LogSourceIPsHeader string `yaml:"log_source_ips_header"`
LogSourceIPsRegex string `yaml:"log_source_ips_regex"`
LogRequestAtInfoLevel bool `yaml:"log_request_at_info_level_enabled"`
// If not set, default signal handler is used.
SignalHandler SignalHandler `yaml:"-"`
// If not set, default Prometheus registry is used.
Registerer prometheus.Registerer `yaml:"-"`
Gatherer prometheus.Gatherer `yaml:"-"`
PathPrefix string `yaml:"http_path_prefix"`
}
@ -145,6 +159,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.LogSourceIPs, "server.log-source-ips-enabled", false, "Optionally log the source IPs.")
f.StringVar(&cfg.LogSourceIPsHeader, "server.log-source-ips-header", "", "Header field storing the source IPs. Only used if server.log-source-ips-enabled is true. If not set the default Forwarded, X-Real-IP and X-Forwarded-For headers are used")
f.StringVar(&cfg.LogSourceIPsRegex, "server.log-source-ips-regex", "", "Regex for matching the source IPs. Only used if server.log-source-ips-enabled is true. If not set the default Forwarded, X-Real-IP and X-Forwarded-For headers are used")
f.BoolVar(&cfg.LogRequestAtInfoLevel, "server.log-request-at-info-level-enabled", false, "Optionally log requests at info level instead of debug level.")
}
// Server wraps a HTTP and gRPC server, and some common initialization.
@ -160,6 +175,8 @@ type Server struct {
HTTPServer *http.Server
GRPC *grpc.Server
Log logging.Interface
Registerer prometheus.Registerer
Gatherer prometheus.Gatherer
}
// New makes a new Server
@ -171,6 +188,13 @@ func New(cfg Config) (*Server, error) {
}, []string{"protocol"})
prometheus.MustRegister(tcpConnections)
tcpConnectionsLimit := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: cfg.MetricsNamespace,
Name: "tcp_connections_limit",
Help: "The max number of TCP connections that can be accepted (0 means no limit).",
}, []string{"protocol"})
prometheus.MustRegister(tcpConnectionsLimit)
network := cfg.HTTPListenNetwork
if network == "" {
network = DefaultNetwork
@ -182,6 +206,7 @@ func New(cfg Config) (*Server, error) {
}
httpListener = middleware.CountingListener(httpListener, tcpConnections.WithLabelValues("http"))
tcpConnectionsLimit.WithLabelValues("http").Set(float64(cfg.HTTPConnLimit))
if cfg.HTTPConnLimit > 0 {
httpListener = netutil.LimitListener(httpListener, cfg.HTTPConnLimit)
}
@ -196,6 +221,7 @@ func New(cfg Config) (*Server, error) {
}
grpcListener = middleware.CountingListener(grpcListener, tcpConnections.WithLabelValues("grpc"))
tcpConnectionsLimit.WithLabelValues("grpc").Set(float64(cfg.GRPCConnLimit))
if cfg.GRPCConnLimit > 0 {
grpcListener = netutil.LimitListener(grpcListener, cfg.GRPCConnLimit)
}
@ -207,19 +233,39 @@ func New(cfg Config) (*Server, error) {
log = logging.NewLogrus(cfg.LogLevel)
}
// If user doesn't supply a registerer/gatherer, use Prometheus' by default.
reg := cfg.Registerer
if reg == nil {
reg = prometheus.DefaultRegisterer
}
gatherer := cfg.Gatherer
if gatherer == nil {
gatherer = prometheus.DefaultGatherer
}
// Setup TLS
var httpTLSConfig *tls.Config
if len(cfg.HTTPTLSConfig.TLSCertPath) > 0 && len(cfg.HTTPTLSConfig.TLSKeyPath) > 0 {
// Note: ConfigToTLSConfig from prometheus/node_exporter is awaiting security review.
httpTLSConfig, err = node_https.ConfigToTLSConfig(&cfg.HTTPTLSConfig)
// Note: ConfigToTLSConfig from prometheus/exporter-toolkit is awaiting security review.
httpTLSConfig, err = web.ConfigToTLSConfig(&web.TLSStruct{
TLSCertPath: cfg.HTTPTLSConfig.TLSCertPath,
TLSKeyPath: cfg.HTTPTLSConfig.TLSKeyPath,
ClientAuth: cfg.HTTPTLSConfig.ClientAuth,
ClientCAs: cfg.HTTPTLSConfig.ClientCAs,
})
if err != nil {
return nil, fmt.Errorf("error generating http tls config: %v", err)
}
}
var grpcTLSConfig *tls.Config
if len(cfg.GRPCTLSConfig.TLSCertPath) > 0 && len(cfg.GRPCTLSConfig.TLSKeyPath) > 0 {
// Note: ConfigToTLSConfig from prometheus/node_exporter is awaiting security review.
grpcTLSConfig, err = node_https.ConfigToTLSConfig(&cfg.GRPCTLSConfig)
// Note: ConfigToTLSConfig from prometheus/exporter-toolkit is awaiting security review.
grpcTLSConfig, err = web.ConfigToTLSConfig(&web.TLSStruct{
TLSCertPath: cfg.GRPCTLSConfig.TLSCertPath,
TLSKeyPath: cfg.GRPCTLSConfig.TLSKeyPath,
ClientAuth: cfg.GRPCTLSConfig.ClientAuth,
ClientCAs: cfg.GRPCTLSConfig.ClientCAs,
})
if err != nil {
return nil, fmt.Errorf("error generating grpc tls config: %v", err)
}
@ -232,7 +278,7 @@ func New(cfg Config) (*Server, error) {
Help: "Time (in seconds) spent serving HTTP requests.",
Buckets: instrument.DefBuckets,
}, []string{"method", "route", "status_code", "ws"})
prometheus.MustRegister(requestDuration)
reg.MustRegister(requestDuration)
receivedMessageSize := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: cfg.MetricsNamespace,
@ -240,7 +286,7 @@ func New(cfg Config) (*Server, error) {
Help: "Size (in bytes) of messages received in the request.",
Buckets: middleware.BodySizeBuckets,
}, []string{"method", "route"})
prometheus.MustRegister(receivedMessageSize)
reg.MustRegister(receivedMessageSize)
sentMessageSize := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: cfg.MetricsNamespace,
@ -248,21 +294,22 @@ func New(cfg Config) (*Server, error) {
Help: "Size (in bytes) of messages sent in response.",
Buckets: middleware.BodySizeBuckets,
}, []string{"method", "route"})
prometheus.MustRegister(sentMessageSize)
reg.MustRegister(sentMessageSize)
inflightRequests := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: cfg.MetricsNamespace,
Name: "inflight_requests",
Help: "Current number of inflight requests.",
}, []string{"method", "route"})
prometheus.MustRegister(inflightRequests)
reg.MustRegister(inflightRequests)
log.WithField("http", httpListener.Addr()).WithField("grpc", grpcListener.Addr()).Infof("server listening on addresses")
// Setup gRPC server
serverLog := middleware.GRPCServerLog{
WithRequest: !cfg.ExcludeRequestInLog,
Log: log,
Log: log,
WithRequest: !cfg.ExcludeRequestInLog,
DisableRequestSuccessLog: cfg.DisableRequestSuccessLog,
}
grpcMiddleware := []grpc.UnaryServerInterceptor{
serverLog.UnaryServerInterceptor,
@ -325,7 +372,7 @@ func New(cfg Config) (*Server, error) {
router = router.PathPrefix(cfg.PathPrefix).Subrouter()
}
if cfg.RegisterInstrumentation {
RegisterInstrumentation(router)
RegisterInstrumentationWithGatherer(router, gatherer)
}
var sourceIPs *middleware.SourceIPExtractor
@ -342,8 +389,9 @@ func New(cfg Config) (*Server, error) {
SourceIPs: sourceIPs,
},
middleware.Log{
Log: log,
SourceIPs: sourceIPs,
Log: log,
SourceIPs: sourceIPs,
LogRequestAtInfoLevel: cfg.LogRequestAtInfoLevel,
},
middleware.Instrument{
RouteMatcher: router,
@ -385,12 +433,19 @@ func New(cfg Config) (*Server, error) {
HTTPServer: httpServer,
GRPC: grpcServer,
Log: log,
Registerer: reg,
Gatherer: gatherer,
}, nil
}
// RegisterInstrumentation on the given router.
func RegisterInstrumentation(router *mux.Router) {
router.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{
RegisterInstrumentationWithGatherer(router, prometheus.DefaultGatherer)
}
// RegisterInstrumentationWithGatherer on the given router.
func RegisterInstrumentationWithGatherer(router *mux.Router, gatherer prometheus.Gatherer) {
router.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{
EnableOpenMetrics: true,
}))
router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux)

13
vendor/modules.txt vendored

@ -526,7 +526,7 @@ github.com/gorilla/mux
# github.com/gorilla/websocket v1.4.2
## explicit; go 1.12
github.com/gorilla/websocket
# github.com/grafana/dskit v0.0.0-20220624123803-3624a963826e
# github.com/grafana/dskit v0.0.0-20220708141012-99f3d0043c23
## explicit; go 1.17
github.com/grafana/dskit/backoff
github.com/grafana/dskit/concurrency
@ -542,7 +542,6 @@ github.com/grafana/dskit/kv/consul
github.com/grafana/dskit/kv/etcd
github.com/grafana/dskit/kv/memberlist
github.com/grafana/dskit/limiter
github.com/grafana/dskit/middleware
github.com/grafana/dskit/modules
github.com/grafana/dskit/multierror
github.com/grafana/dskit/netutil
@ -868,9 +867,9 @@ github.com/prometheus/common/version
# github.com/prometheus/common/sigv4 v0.1.0
## explicit; go 1.15
github.com/prometheus/common/sigv4
# github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289
## explicit; go 1.13
github.com/prometheus/node_exporter/https
# github.com/prometheus/exporter-toolkit v0.7.1
## explicit; go 1.14
github.com/prometheus/exporter-toolkit/web
# github.com/prometheus/procfs v0.7.3
## explicit; go 1.13
github.com/prometheus/procfs
@ -1034,8 +1033,8 @@ github.com/uber/jaeger-lib/metrics/prometheus
# github.com/ugorji/go/codec v1.1.7
## explicit
github.com/ugorji/go/codec
# github.com/weaveworks/common v0.0.0-20211015155308-ebe5bdc2c89e
## explicit; go 1.13
# github.com/weaveworks/common v0.0.0-20220706100410-67d27ed40fae
## explicit; go 1.14
github.com/weaveworks/common/aws
github.com/weaveworks/common/errors
github.com/weaveworks/common/grpc

Loading…
Cancel
Save