Update hedgedhttp to v0.7.0 (#4892)

* Update hedgedhttp to v0.7.0

* Update CHANGELOG

* Fix returned results
pull/4894/head
Oleg Kovalov 4 years ago committed by GitHub
parent 8ff27e243b
commit e893e2696c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 2
      go.mod
  3. 4
      go.sum
  4. 5
      pkg/storage/chunk/aws/s3_storage_client.go
  5. 5
      pkg/storage/chunk/azure/blob_storage_client.go
  6. 5
      pkg/storage/chunk/gcp/gcs_object_client.go
  7. 20
      pkg/storage/chunk/hedging/hedging.go
  8. 10
      pkg/storage/chunk/hedging/hedging_test.go
  9. 6
      pkg/storage/chunk/openstack/swift_object_client.go
  10. 29
      vendor/github.com/cristalhq/hedgedhttp/README.md
  11. 115
      vendor/github.com/cristalhq/hedgedhttp/hedged.go
  12. 73
      vendor/github.com/cristalhq/hedgedhttp/stats.go
  13. 2
      vendor/modules.txt

@ -15,6 +15,7 @@
* [4813](https://github.com/grafana/loki/pull/4813) **cyriltovena**: Promtail: Adds the ability to pull logs from Cloudflare.
* [4853](https://github.com/grafana/loki/pull/4853) **sandeepsukhani**: recreate compacted boltdb files from compactor to reduce storage space usage
* [4875](https://github.com/grafana/loki/pull/4875) **trevorwhitney**: Loki: fix bug where common replication factor wasn't always getting applied
* [4892](https://github.com/grafana/loki/pull/4892) **cristaloleg**: Loki: upgrade cristalhq/hedgedhttp from v0.6.0 to v0.7.0
# 2.4.1 (2021/11/07)

@ -22,7 +22,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab
github.com/cristalhq/hedgedhttp v0.6.2
github.com/cristalhq/hedgedhttp v0.7.0
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v20.10.11+incompatible
github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8

@ -482,8 +482,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.6.2 h1:aWnUOzqPaM8/dgmPLR7wl0AoFOPYnqgdhTkcWgWUgpA=
github.com/cristalhq/hedgedhttp v0.6.2/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cristalhq/hedgedhttp v0.7.0 h1:C2XPDC+AQH4QJt6vZI4jB5WNyF86QbSJD4C4fW3H3ro=
github.com/cristalhq/hedgedhttp v0.7.0/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=

@ -316,7 +316,10 @@ func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S
}
if hedging {
httpClient = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
httpClient, err = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}
}
s3Config = s3Config.WithHTTPClient(httpClient)

@ -265,8 +265,11 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
})
if hedging {
client, err := hedgingCfg.ClientWithRegisterer(client, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
client := hedgingCfg.ClientWithRegisterer(client, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err

@ -88,7 +88,10 @@ func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Conf
}
if hedging {
httpClient = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
httpClient, err = hedgingCfg.ClientWithRegisterer(httpClient, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}
}
opts = append(opts, option.WithHTTPClient(httpClient))

@ -60,13 +60,13 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// Client returns a hedged http client.
// The client transport will be mutated to use the hedged roundtripper.
func (cfg *Config) Client(client *http.Client) *http.Client {
func (cfg *Config) Client(client *http.Client) (*http.Client, error) {
return cfg.ClientWithRegisterer(client, prometheus.DefaultRegisterer)
}
// ClientWithRegisterer returns a hedged http client with instrumentation registered to the provided registerer.
// The client transport will be mutated to use the hedged roundtripper.
func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) *http.Client {
func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) (*http.Client, error) {
if reg == nil {
reg = prometheus.DefaultRegisterer
}
@ -74,14 +74,18 @@ func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Regi
client = http.DefaultClient
}
if cfg.At == 0 {
return client
return client, nil
}
client.Transport = cfg.RoundTripperWithRegisterer(client.Transport, reg)
return client
var err error
client.Transport, err = cfg.RoundTripperWithRegisterer(client.Transport, reg)
if err != nil {
return nil, err
}
return client, nil
}
// RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer.
func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) http.RoundTripper {
func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) (http.RoundTripper, error) {
if reg == nil {
reg = prometheus.DefaultRegisterer
}
@ -89,7 +93,7 @@ func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg promet
next = http.DefaultTransport
}
if cfg.At == 0 {
return next
return next, nil
}
// register metrics
once.Do(func() {
@ -104,7 +108,7 @@ func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg promet
}
// RoundTripper returns a hedged roundtripper.
func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper {
func (cfg *Config) RoundTripper(next http.RoundTripper) (http.RoundTripper, error) {
return cfg.RoundTripperWithRegisterer(next, prometheus.DefaultRegisterer)
}

@ -33,7 +33,7 @@ func TestHedging(t *testing.T) {
MaxPerSecond: 1000,
}
count := atomic.NewInt32(0)
client := cfg.Client(&http.Client{
client, err := cfg.Client(&http.Client{
Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
@ -42,6 +42,9 @@ func TestHedging(t *testing.T) {
}, nil
}),
})
if err != nil {
t.Fatal(err)
}
_, _ = client.Get("http://example.com")
require.Equal(t, int32(3), count.Load())
@ -65,7 +68,7 @@ func TestHedgingRateLimit(t *testing.T) {
MaxPerSecond: 1,
}
count := atomic.NewInt32(0)
client := cfg.Client(&http.Client{
client, err := cfg.Client(&http.Client{
Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
@ -74,6 +77,9 @@ func TestHedgingRateLimit(t *testing.T) {
}, nil
}),
})
if err != nil {
t.Fatal(err)
}
_, _ = client.Get("http://example.com")
require.Equal(t, int32(2), count.Load())

@ -111,7 +111,11 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool)
c.DomainId = cfg.UserDomainID
}
if hedging {
c.Transport = hedgingCfg.RoundTripperWithRegisterer(c.Transport, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
var err error
c.Transport, err = hedgingCfg.RoundTripperWithRegisterer(c.Transport, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}
}
err := c.Authenticate()

@ -4,6 +4,7 @@
[![pkg-img]][pkg-url]
[![reportcard-img]][reportcard-url]
[![coverage-img]][coverage-url]
[![version-img]][version-url]
Hedged HTTP client which helps to reduce tail latency at scale.
@ -21,6 +22,7 @@ Thanks to [Bohdan Storozhuk](https://github.com/storozhukbm) for the review and
* Easy to integrate.
* Optimized for speed.
* Clean and tested code.
* Supports `http.Client` and `http.RoundTripper`.
* Dependency-free.
## Install
@ -33,7 +35,30 @@ go get github.com/cristalhq/hedgedhttp
## Example
TODO
```go
ctx := context.Background()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://google.com", http.NoBody)
if err != nil {
panic(err)
}
timeout := 10 * time.Millisecond
upto := 7
client := &http.Client{Timeout: time.Second}
hedged, err := hedgedhttp.NewClient(timeout, upto, client)
if err != nil {
panic(err)
}
// will take `upto` requests, with a `timeout` delay between them
resp, err := hedged.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
```
Also see examples: [examples_test.go](https://github.com/cristalhq/hedgedhttp/blob/main/examples_test.go).
## Documentation
@ -51,3 +76,5 @@ See [these docs][pkg-url].
[reportcard-url]: https://goreportcard.com/report/cristalhq/hedgedhttp
[coverage-img]: https://codecov.io/gh/cristalhq/hedgedhttp/branch/main/graph/badge.svg
[coverage-url]: https://codecov.io/gh/cristalhq/hedgedhttp
[version-img]: https://img.shields.io/github/v/release/cristalhq/hedgedhttp
[version-url]: https://github.com/cristalhq/hedgedhttp/releases

@ -2,11 +2,11 @@ package hedgedhttp
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
)
@ -15,47 +15,56 @@ const infiniteTimeout = 30 * 24 * time.Hour // domain specific infinite
// NewClient returns a new http.Client which implements hedged requests pattern.
// Given Client starts a new request after a timeout from previous request.
// Starts no more than upto requests.
func NewClient(timeout time.Duration, upto int, client *http.Client) *http.Client {
newClient, _ := NewClientAndStats(timeout, upto, client)
return newClient
func NewClient(timeout time.Duration, upto int, client *http.Client) (*http.Client, error) {
newClient, _, err := NewClientAndStats(timeout, upto, client)
if err != nil {
return nil, err
}
return newClient, nil
}
// NewClientAndStats returns a new http.Client which implements hedged requests pattern
// And Stats object that can be queried to obtain client's metrics.
// Given Client starts a new request after a timeout from previous request.
// Starts no more than upto requests.
func NewClientAndStats(timeout time.Duration, upto int, client *http.Client) (*http.Client, *Stats) {
func NewClientAndStats(timeout time.Duration, upto int, client *http.Client) (*http.Client, *Stats, error) {
if client == nil {
client = &http.Client{
Timeout: 5 * time.Second,
}
}
newTransport, metrics := NewRoundTripperAndStats(timeout, upto, client.Transport)
newTransport, metrics, err := NewRoundTripperAndStats(timeout, upto, client.Transport)
if err != nil {
return nil, nil, err
}
client.Transport = newTransport
return client, metrics
return client, metrics, nil
}
// NewRoundTripper returns a new http.RoundTripper which implements hedged requests pattern.
// Given RoundTripper starts a new request after a timeout from previous request.
// Starts no more than upto requests.
func NewRoundTripper(timeout time.Duration, upto int, rt http.RoundTripper) http.RoundTripper {
newRT, _ := NewRoundTripperAndStats(timeout, upto, rt)
return newRT
func NewRoundTripper(timeout time.Duration, upto int, rt http.RoundTripper) (http.RoundTripper, error) {
newRT, _, err := NewRoundTripperAndStats(timeout, upto, rt)
if err != nil {
return nil, err
}
return newRT, nil
}
// NewRoundTripperAndStats returns a new http.RoundTripper which implements hedged requests pattern
// And Stats object that can be queried to obtain client's metrics.
// Given RoundTripper starts a new request after a timeout from previous request.
// Starts no more than upto requests.
func NewRoundTripperAndStats(timeout time.Duration, upto int, rt http.RoundTripper) (http.RoundTripper, *Stats) {
func NewRoundTripperAndStats(timeout time.Duration, upto int, rt http.RoundTripper) (http.RoundTripper, *Stats, error) {
switch {
case timeout < 0:
panic("hedgedhttp: timeout cannot be negative")
return nil, nil, errors.New("hedgedhttp: timeout cannot be negative")
case upto < 1:
panic("hedgedhttp: upto must be greater than 0")
return nil, nil, errors.New("hedgedhttp: upto must be greater than 0")
}
if rt == nil {
@ -72,7 +81,7 @@ func NewRoundTripperAndStats(timeout time.Duration, upto int, rt http.RoundTripp
upto: upto,
metrics: &Stats{},
}
return hedged, hedged.metrics
return hedged, hedged.metrics, nil
}
type hedgedTransport struct {
@ -174,7 +183,7 @@ type indexedResp struct {
Resp *http.Response
}
func reqWithCtx(r *http.Request, ctx context.Context, isHedged bool) (*http.Request, func()) {
func reqWithCtx(r *http.Request, ctx context.Context, isHedged bool) (*http.Request, context.CancelFunc) {
ctx, cancel := context.WithCancel(ctx)
if isHedged {
ctx = context.WithValue(ctx, hedgedRequest{}, struct{}{})
@ -191,82 +200,12 @@ func IsHedgedRequest(r *http.Request) bool {
return val != nil
}
// atomicCounter is a false sharing safe counter.
type atomicCounter struct {
count uint64
_ [7]uint64
}
type cacheLine [64]byte
// Stats object that can be queried to obtain certain metrics and get better observability.
type Stats struct {
_ cacheLine
requestedRoundTrips atomicCounter
actualRoundTrips atomicCounter
failedRoundTrips atomicCounter
canceledByUserRoundTrips atomicCounter
canceledSubRequests atomicCounter
_ cacheLine
}
func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) }
func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) }
func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) }
func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) }
func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) }
// RequestedRoundTrips returns count of requests that were requested by client.
func (s *Stats) RequestedRoundTrips() uint64 {
return atomic.LoadUint64(&s.requestedRoundTrips.count)
}
// ActualRoundTrips returns count of requests that were actually sent.
func (s *Stats) ActualRoundTrips() uint64 {
return atomic.LoadUint64(&s.actualRoundTrips.count)
}
// FailedRoundTrips returns count of requests that failed.
func (s *Stats) FailedRoundTrips() uint64 {
return atomic.LoadUint64(&s.failedRoundTrips.count)
}
// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context.
func (s *Stats) CanceledByUserRoundTrips() uint64 {
return atomic.LoadUint64(&s.canceledByUserRoundTrips.count)
}
// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport.
func (s *Stats) CanceledSubRequests() uint64 {
return atomic.LoadUint64(&s.canceledSubRequests.count)
}
// StatsSnapshot is a snapshot of Stats.
type StatsSnapshot struct {
RequestedRoundTrips uint64 // count of requests that were requested by client
ActualRoundTrips uint64 // count of requests that were actually sent
FailedRoundTrips uint64 // count of requests that failed
CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context
CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport
}
// Snapshot of the stats.
func (s *Stats) Snapshot() StatsSnapshot {
return StatsSnapshot{
RequestedRoundTrips: s.RequestedRoundTrips(),
ActualRoundTrips: s.ActualRoundTrips(),
FailedRoundTrips: s.FailedRoundTrips(),
CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(),
CanceledSubRequests: s.CanceledSubRequests(),
}
}
var taskQueue = make(chan func())
func runInPool(task func()) {
select {
case taskQueue <- task:
// submited, everything is ok
// submitted, everything is ok
default:
go func() {
@ -292,7 +231,7 @@ func runInPool(task func()) {
// MultiError is an error type to track multiple errors. This is used to
// accumulate errors in cases and return them as a single "error".
// Insiper by https://github.com/hashicorp/go-multierror
// Inspired by https://github.com/hashicorp/go-multierror
type MultiError struct {
Errors []error
ErrorFormatFn ErrorFormatFunc
@ -349,7 +288,7 @@ func getTimer(duration time.Duration) *time.Timer {
func returnTimer(timer *time.Timer) {
timer.Stop()
select {
case _ = <-timer.C:
case <-timer.C:
default:
}
timerPool.Put(timer)

@ -0,0 +1,73 @@
package hedgedhttp
import "sync/atomic"
// atomicCounter is a false sharing safe counter.
type atomicCounter struct {
count uint64
_ [7]uint64
}
type cacheLine [64]byte
// Stats object that can be queried to obtain certain metrics and get better observability.
type Stats struct {
_ cacheLine
requestedRoundTrips atomicCounter
actualRoundTrips atomicCounter
failedRoundTrips atomicCounter
canceledByUserRoundTrips atomicCounter
canceledSubRequests atomicCounter
_ cacheLine
}
func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) }
func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) }
func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) }
func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) }
func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) }
// RequestedRoundTrips returns count of requests that were requested by client.
func (s *Stats) RequestedRoundTrips() uint64 {
return atomic.LoadUint64(&s.requestedRoundTrips.count)
}
// ActualRoundTrips returns count of requests that were actually sent.
func (s *Stats) ActualRoundTrips() uint64 {
return atomic.LoadUint64(&s.actualRoundTrips.count)
}
// FailedRoundTrips returns count of requests that failed.
func (s *Stats) FailedRoundTrips() uint64 {
return atomic.LoadUint64(&s.failedRoundTrips.count)
}
// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context.
func (s *Stats) CanceledByUserRoundTrips() uint64 {
return atomic.LoadUint64(&s.canceledByUserRoundTrips.count)
}
// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport.
func (s *Stats) CanceledSubRequests() uint64 {
return atomic.LoadUint64(&s.canceledSubRequests.count)
}
// StatsSnapshot is a snapshot of Stats.
type StatsSnapshot struct {
RequestedRoundTrips uint64 // count of requests that were requested by client
ActualRoundTrips uint64 // count of requests that were actually sent
FailedRoundTrips uint64 // count of requests that failed
CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context
CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport
}
// Snapshot of the stats.
func (s *Stats) Snapshot() StatsSnapshot {
return StatsSnapshot{
RequestedRoundTrips: s.RequestedRoundTrips(),
ActualRoundTrips: s.ActualRoundTrips(),
FailedRoundTrips: s.FailedRoundTrips(),
CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(),
CanceledSubRequests: s.CanceledSubRequests(),
}
}

@ -311,7 +311,7 @@ github.com/cortexproject/cortex/pkg/util/spanlogger
github.com/cortexproject/cortex/pkg/util/test
github.com/cortexproject/cortex/pkg/util/validation
github.com/cortexproject/cortex/tools/querytee
# github.com/cristalhq/hedgedhttp v0.6.2
# github.com/cristalhq/hedgedhttp v0.7.0
## explicit; go 1.16
github.com/cristalhq/hedgedhttp
# github.com/davecgh/go-spew v1.1.1

Loading…
Cancel
Save