Add rate limiting and metrics to hedging (#4860)

* Add rate limiting and metrics to hedging

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add changelog entry

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Allow to pass a custom registerer

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/4872/head
Cyril Tovena 4 years ago committed by GitHub
parent 77dcb16b4e
commit c33a651645
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 6
      docs/sources/configuration/_index.md
  3. 2
      go.mod
  4. 4
      go.sum
  5. 2
      pkg/storage/chunk/aws/s3_storage_client.go
  6. 5
      pkg/storage/chunk/aws/s3_storage_client_test.go
  7. 82
      pkg/storage/chunk/azure/blob_storage_client.go
  8. 19
      pkg/storage/chunk/azure/blob_storage_client_test.go
  9. 3
      pkg/storage/chunk/gcp/gcs_object_client.go
  10. 5
      pkg/storage/chunk/gcp/gcs_object_client_test.go
  11. 96
      pkg/storage/chunk/hedging/hedging.go
  12. 90
      pkg/storage/chunk/hedging/hedging_test.go
  13. 3
      pkg/storage/chunk/openstack/swift_object_client.go
  14. 5
      pkg/storage/chunk/openstack/swift_object_client_test.go
  15. 15
      vendor/github.com/cristalhq/hedgedhttp/hedged.go
  16. 2
      vendor/modules.txt

@ -1,5 +1,6 @@
## Main
* [4860](https://github.com/grafana/loki/pull/4860) **cyriltovena**: Add rate limiting and metrics to hedging
* [4865](https://github.com/grafana/loki/pull/4865) **taisho6339**: Fix duplicate registry.MustRegister call in Promtail Kafka
* [4845](https://github.com/grafana/loki/pull/4845) **chaudum** Return error responses consistently as JSON
* [4826](https://github.com/grafana/loki/pull/4826) **cyriltovena**: Adds the ability to hedge storage requests.

@ -872,8 +872,12 @@ You should configure the latency based on your p99 of object store requests.
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum amount of requests to be issued.
# The maximum amount of hedge requests to be issued for a given request.
[up_to: <int> | default = 2]
# Optional. Default is 5
# The maximum amount of hedged requests to be issued per seconds.
[max_per_second: <int> | default = 5]
```
## local_storage_config

@ -22,7 +22,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab
github.com/cristalhq/hedgedhttp v0.6.1
github.com/cristalhq/hedgedhttp v0.6.2
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v20.10.11+incompatible
github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8

@ -482,8 +482,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.6.1 h1:o3tcl+HwEFrGfNkZbgbQW4N7UNmorKvqhUFLN1rrkdA=
github.com/cristalhq/hedgedhttp v0.6.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cristalhq/hedgedhttp v0.6.2 h1:aWnUOzqPaM8/dgmPLR7wl0AoFOPYnqgdhTkcWgWUgpA=
github.com/cristalhq/hedgedhttp v0.6.2/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=

@ -316,7 +316,7 @@ func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S
}
if hedging {
httpClient = hedgingCfg.Client(httpClient)
httpClient = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
}
s3Config = s3Config.WithHTTPClient(httpClient)

@ -138,8 +138,9 @@ func Test_Hedging(t *testing.T) {
})
},
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)

@ -15,6 +15,7 @@ import (
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/mattn/go-ieproxy"
"github.com/prometheus/client_golang/prometheus"
cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure"
"github.com/cortexproject/cortex/pkg/util"
@ -57,23 +58,25 @@ var (
}
// default Azure http client.
defaultClient = &http.Client{
Transport: &http.Transport{
Proxy: ieproxy.GetProxyFunc(),
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
MaxResponseHeaderBytes: 0,
},
defaultClientFactory = func() *http.Client {
return &http.Client{
Transport: &http.Transport{
Proxy: ieproxy.GetProxyFunc(),
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
MaxResponseHeaderBytes: 0,
},
}
}
)
@ -132,20 +135,30 @@ func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig
// Implements ObjectStorage
type BlobStorage struct {
// blobService storage.Serv
cfg *BlobStorageConfig
hedgingCfg hedging.Config
cfg *BlobStorageConfig
containerURL azblob.ContainerURL
pipeline pipeline.Pipeline
hedgingPipeline pipeline.Pipeline
}
// NewBlobStorage creates a new instance of the BlobStorage struct.
func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) {
log.WarnExperimentalUse("Azure Blob Storage")
blobStorage := &BlobStorage{
cfg: cfg,
hedgingCfg: hedgingCfg,
cfg: cfg,
}
var err error
pipeline, err := blobStorage.newPipeline(hedgingCfg, false)
if err != nil {
return nil, err
}
blobStorage.pipeline = pipeline
hedgingPipeline, err := blobStorage.newPipeline(hedgingCfg, true)
if err != nil {
return nil, err
}
blobStorage.hedgingPipeline = hedgingPipeline
blobStorage.containerURL, err = blobStorage.buildContainerURL()
if err != nil {
return nil, err
@ -210,13 +223,12 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU
if err != nil {
return azblob.BlockBlobURL{}, err
}
azPipeline, err := b.newPipeline(hedging)
if err != nil {
return azblob.BlockBlobURL{}, err
pipeline := b.pipeline
if hedging {
pipeline = b.hedgingPipeline
}
return azblob.NewBlockBlobURL(*u, azPipeline), nil
return azblob.NewBlockBlobURL(*u, pipeline), nil
}
func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
@ -225,15 +237,10 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
return azblob.ContainerURL{}, err
}
azPipeline, err := b.newPipeline(false)
if err != nil {
return azblob.ContainerURL{}, err
}
return azblob.NewContainerURL(*u, azPipeline), nil
return azblob.NewContainerURL(*u, b.pipeline), nil
}
func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) {
func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipeline.Pipeline, error) {
credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
if err != nil {
return nil, err
@ -248,17 +255,18 @@ func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) {
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
}
client := defaultClientFactory()
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := defaultClient.Do(request.WithContext(ctx))
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
}
})
if hedging {
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
client := b.hedgingCfg.Client(defaultClient)
client := hedgingCfg.ClientWithRegisterer(client, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err

@ -62,20 +62,23 @@ func Test_Hedging(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
// hijack the client to count the number of calls
defaultClient = &http.Client{
Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return nil, errors.New("fo")
}),
defaultClientFactory = func() *http.Client {
return &http.Client{
Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return nil, errors.New("fo")
}),
}
}
c, err := NewBlobStorage(&BlobStorageConfig{
ContainerName: "foo",
Environment: azureGlobal,
MaxRetries: 1,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)

@ -9,6 +9,7 @@ import (
"cloud.google.com/go/storage"
cortex_gcp "github.com/cortexproject/cortex/pkg/chunk/gcp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
@ -87,7 +88,7 @@ func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Conf
}
if hedging {
httpClient = hedgingCfg.Client(httpClient)
httpClient = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
}
opts = append(opts, option.WithHTTPClient(httpClient))

@ -63,8 +63,9 @@ func Test_Hedging(t *testing.T) {
BucketName: "test-bucket",
Insecure: true,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
}, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) {
opts = append(opts, option.WithEndpoint(server.URL))
opts = append(opts, option.WithoutAuthentication())

@ -1,19 +1,49 @@
package hedging
import (
"errors"
"flag"
"net/http"
"sync"
"time"
"github.com/cristalhq/hedgedhttp"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)
var (
ErrTooManyHedgeRequests = errors.New("too many hedge requests")
totalHedgeRequests prometheus.Counter
totalRateLimitedHedgeRequests prometheus.Counter
once sync.Once
)
func init() {
initMetrics()
}
func initMetrics() {
once = sync.Once{}
totalHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{
Name: "hedged_requests_total",
Help: "The total number of hedged requests.",
})
totalRateLimitedHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{
Name: "hedged_requests_rate_limited_total",
Help: "The total number of hedged requests rejected via rate limiting.",
})
}
// Config is the configuration for hedging requests.
type Config struct {
// At is the duration after which a second request will be issued.
At time.Duration `yaml:"at"`
// UpTo is the maximum number of requests that will be issued.
UpTo int `yaml:"up_to"`
// The maximun of hedge requests allowed per second.
MaxPerSecond int `yaml:"max_per_second"`
}
// RegisterFlags registers flags.
@ -25,18 +55,78 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximun of hedge requests allowed.")
f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)")
f.IntVar(&cfg.MaxPerSecond, prefix+"hedge-max-per-second", 5, "The maximun of hedge requests allowed per seconds.")
}
// Client returns a hedged http client.
// The client transport will be mutated to use the hedged roundtripper.
func (cfg *Config) Client(client *http.Client) *http.Client {
return cfg.ClientWithRegisterer(client, prometheus.DefaultRegisterer)
}
// ClientWithRegisterer returns a hedged http client with instrumentation registered to the provided registerer.
// The client transport will be mutated to use the hedged roundtripper.
func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) *http.Client {
if reg == nil {
reg = prometheus.DefaultRegisterer
}
if client == nil {
client = http.DefaultClient
}
if cfg.At == 0 {
return client
}
return hedgedhttp.NewClient(cfg.At, cfg.UpTo, client)
client.Transport = cfg.RoundTripperWithRegisterer(client.Transport, reg)
return client
}
func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper {
// RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer.
func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) http.RoundTripper {
if reg == nil {
reg = prometheus.DefaultRegisterer
}
if next == nil {
next = http.DefaultTransport
}
if cfg.At == 0 {
return next
}
return hedgedhttp.NewRoundTripper(cfg.At, cfg.UpTo, next)
// register metrics
once.Do(func() {
reg.MustRegister(totalHedgeRequests)
reg.MustRegister(totalRateLimitedHedgeRequests)
})
return hedgedhttp.NewRoundTripper(
cfg.At,
cfg.UpTo,
newLimitedHedgingRoundTripper(cfg.MaxPerSecond, next),
)
}
// RoundTripper returns a hedged roundtripper.
func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper {
return cfg.RoundTripperWithRegisterer(next, prometheus.DefaultRegisterer)
}
type limitedHedgingRoundTripper struct {
next http.RoundTripper
limiter *rate.Limiter
}
func newLimitedHedgingRoundTripper(max int, next http.RoundTripper) *limitedHedgingRoundTripper {
return &limitedHedgingRoundTripper{
next: next,
limiter: rate.NewLimiter(rate.Limit(max), max),
}
}
func (rt *limitedHedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if hedgedhttp.IsHedgedRequest(req) {
if !rt.limiter.Allow() {
totalRateLimitedHedgeRequests.Inc()
return nil, ErrTooManyHedgeRequests
}
totalHedgeRequests.Inc()
}
return rt.next.RoundTrip(req)
}

@ -0,0 +1,90 @@
package hedging
import (
"net/http"
"strings"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)
type RoundTripperFunc func(*http.Request) (*http.Response, error)
func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return fn(req)
}
func resetMetrics() {
reg := prometheus.NewRegistry()
prometheus.DefaultRegisterer = reg
prometheus.DefaultGatherer = reg
initMetrics()
}
func TestHedging(t *testing.T) {
resetMetrics()
cfg := &Config{
At: time.Duration(1),
UpTo: 3,
MaxPerSecond: 1000,
}
count := atomic.NewInt32(0)
client := cfg.Client(&http.Client{
Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return &http.Response{
StatusCode: http.StatusOK,
}, nil
}),
})
_, _ = client.Get("http://example.com")
require.Equal(t, int32(3), count.Load())
require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer,
strings.NewReader(`
# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting.
# TYPE hedged_requests_rate_limited_total counter
hedged_requests_rate_limited_total 0
# HELP hedged_requests_total The total number of hedged requests.
# TYPE hedged_requests_total counter
hedged_requests_total 2
`,
), "hedged_requests_total", "hedged_requests_rate_limited_total"))
}
func TestHedgingRateLimit(t *testing.T) {
resetMetrics()
cfg := &Config{
At: time.Duration(1),
UpTo: 20,
MaxPerSecond: 1,
}
count := atomic.NewInt32(0)
client := cfg.Client(&http.Client{
Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return &http.Response{
StatusCode: http.StatusOK,
}, nil
}),
})
_, _ = client.Get("http://example.com")
require.Equal(t, int32(2), count.Load())
require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer,
strings.NewReader(`
# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting.
# TYPE hedged_requests_rate_limited_total counter
hedged_requests_rate_limited_total 18
# HELP hedged_requests_total The total number of hedged requests.
# TYPE hedged_requests_total counter
hedged_requests_total 1
`,
), "hedged_requests_total", "hedged_requests_rate_limited_total"))
}

@ -12,6 +12,7 @@ import (
"github.com/ncw/swift"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
cortex_openstack "github.com/cortexproject/cortex/pkg/chunk/openstack"
cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift"
@ -110,7 +111,7 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool)
c.DomainId = cfg.UserDomainID
}
if hedging {
c.Transport = hedgingCfg.RoundTripper(c.Transport)
c.Transport = hedgingCfg.RoundTripperWithRegisterer(c.Transport, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
}
err := c.Authenticate()

@ -99,8 +99,9 @@ func Test_Hedging(t *testing.T) {
RequestTimeout: 10 * time.Second,
},
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
At: tc.hedgeAt,
UpTo: tc.upTo,
MaxPerSecond: 1000,
})
require.NoError(t, err)
tc.do(c)

@ -107,7 +107,7 @@ func (ht *hedgedTransport) RoundTrip(req *http.Request) (*http.Response, error)
for sent := 0; len(errOverall.Errors) < ht.upto; sent++ {
if sent < ht.upto {
idx := sent
subReq, cancel := reqWithCtx(req, mainCtx)
subReq, cancel := reqWithCtx(req, mainCtx, idx != 0)
cancels[idx] = cancel
runInPool(func() {
@ -174,12 +174,23 @@ type indexedResp struct {
Resp *http.Response
}
func reqWithCtx(r *http.Request, ctx context.Context) (*http.Request, func()) {
func reqWithCtx(r *http.Request, ctx context.Context, isHedged bool) (*http.Request, func()) {
ctx, cancel := context.WithCancel(ctx)
if isHedged {
ctx = context.WithValue(ctx, hedgedRequest{}, struct{}{})
}
req := r.WithContext(ctx)
return req, cancel
}
type hedgedRequest struct{}
// IsHedgedRequest reports when a request is hedged.
func IsHedgedRequest(r *http.Request) bool {
val := r.Context().Value(hedgedRequest{})
return val != nil
}
// atomicCounter is a false sharing safe counter.
type atomicCounter struct {
count uint64

@ -308,7 +308,7 @@ github.com/cortexproject/cortex/pkg/util/spanlogger
github.com/cortexproject/cortex/pkg/util/test
github.com/cortexproject/cortex/pkg/util/validation
github.com/cortexproject/cortex/tools/querytee
# github.com/cristalhq/hedgedhttp v0.6.1
# github.com/cristalhq/hedgedhttp v0.6.2
## explicit; go 1.16
github.com/cristalhq/hedgedhttp
# github.com/davecgh/go-spew v1.1.1

Loading…
Cancel
Save