Adds the ability to hedge storage requests. (#4826)

* Adds the ability to hedge requests for all backends

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Remove race from tests

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Remove the race

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Remove the race

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Testing

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* More testing

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Setup credentials to avoid auth

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* gomod

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* improve tests

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* gomod

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* changelog

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Group the configuration

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/4851/head
Cyril Tovena 4 years ago committed by GitHub
parent c6d3228263
commit b27894fbf4
  1. CHANGELOG.md (3)
  2. docs/sources/configuration/_index.md (44)
  3. go.mod (3)
  4. go.sum (2)
  5. pkg/loki/common/common.go (3)
  6. pkg/loki/config_wrapper.go (12)
  7. pkg/storage/chunk/aws/fixtures.go (3)
  8. pkg/storage/chunk/aws/s3_storage_client.go (66)
  9. pkg/storage/chunk/aws/s3_storage_client_test.go (74)
  10. pkg/storage/chunk/azure/blob_storage_client.go (67)
  11. pkg/storage/chunk/azure/blob_storage_client_test.go (85)
  12. pkg/storage/chunk/gcp/fixtures.go (11)
  13. pkg/storage/chunk/gcp/gcs_object_client.go (58)
  14. pkg/storage/chunk/gcp/gcs_object_client_test.go (90)
  15. pkg/storage/chunk/gcp/instrumentation.go (12)
  16. pkg/storage/chunk/hedging/hedging.go (42)
  17. pkg/storage/chunk/openstack/swift_object_client.go (54)
  18. pkg/storage/chunk/openstack/swift_object_client_test.go (110)
  19. pkg/storage/chunk/storage/factory.go (20)
  20. vendor/github.com/cristalhq/hedgedhttp/LICENSE (21)
  21. vendor/github.com/cristalhq/hedgedhttp/README.md (53)
  22. vendor/github.com/cristalhq/hedgedhttp/hedged.go (345)
  23. vendor/modules.txt (3)

@ -1,7 +1,8 @@
## Main
* [4826](https://github.com/grafana/loki/pull/4826) **cyriltovena**: Adds the ability to hedge storage requests.
* [4828](https://github.com/grafana/loki/pull/4282) **chaudum**: Set correct `Content-Type` header in query response
* [4832](https://github.com/grafana/loki/pull/4832) **taisho6339**: Use http prefix path correctly in Promtail
* [4828](https://github.com/grafana/loki/pull/4828) **chaudum**: Set correct `Content-Type` header in query response
* [4794](https://github.com/grafana/loki/pull/4794) **taisho6339**: Aggregate inotify watcher to file target manager
* [4663](https://github.com/grafana/loki/pull/4663) **taisho6339**: Add SASL&mTLS authentication support for Kafka in Promtail
* [4745](https://github.com/grafana/loki/pull/4745) **taisho6339**: Expose Kafka message key in labels

@ -312,9 +312,9 @@ The `query_scheduler` block configures the Loki query scheduler.
# This configures the gRPC client used to report errors back to the
# query-frontend.
[grpc_client_config: <grpc_client_config>]
# Set to true to have the query schedulers create and place themselves in a ring.
# If no frontend_address or scheduler_address are present
# If no frontend_address or scheduler_address are present
# anywhere else in the configuration, Loki will toggle this value to true.
[use_scheduler_ring: <boolean> | default = false]
@ -453,7 +453,7 @@ storage:
# Method to use for backend rule storage (azure, gcs, s3, swift, local).
# CLI flag: -ruler.storage.type
[type: <string> ]
# Configures backend rule storage for Azure.
[azure: <azure_storage_config>]
@ -469,6 +469,9 @@ storage:
# Configures backend rule storage for a local filesystem directory.
[local: <local_storage_config>]
# The `hedging_config` configures how to hedge requests for the storage.
[hedging: <hedging_config>]
# Remote-write configuration to send rule samples to a Prometheus remote-write endpoint.
remote_write:
# Enable remote-write functionality.
@ -509,8 +512,8 @@ remote_write:
# List of remote write relabel configurations.
write_relabel_configs:
[- <relabel_config> ...]
# Name of the remote write config, which if specified must be unique among remote
# Name of the remote write config, which if specified must be unique among remote
# write configs.
# The name will be used in metrics and logging in place of a generated value
# to help users distinguish between remote write configs.
@ -853,6 +856,26 @@ The `swift_storage_config` configures Swift as a general storage for different d
[container_name: <string> | default = "cortex"]
```
## hedging_config
The `hedging_config` configures how to hedge requests for the storage.
Hedging sends a secondary request once the first request has been outstanding for longer than a configured expected latency
for this class of requests.
You should configure the latency based on the p99 of your object store requests.
```yaml
# Optional. Default is 0 (disabled)
# Example: "at: 500ms"
# If set to a non-zero value another request will be issued at the provided duration. Recommended to
# be set to p99 of object store requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[at: <duration> | default = 0]
# Optional. Default is 2
# The maximum number of requests to be issued.
[up_to: <int> | default = 2]
```
## local_storage_config
The `local_storage_config` configures a (local) filesystem as a general storage for different data generated by Loki.
@ -1323,11 +1346,11 @@ aws:
# Minimum duration to back off.
# CLI flag: -s3.backoff-min-period
[min_period: <duration> | default = 100ms]
# Maximum duration to back off.
# CLI flag: -s3.backoff-max-period
[max_period: <duration> | default = 3s]
# Number of times to back off and retry before failing.
# CLI flag: -s3.backoff-retries
[max_retries: <int> | default = 5]
@ -2081,7 +2104,7 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# up until lookback duration ago.
# This limit is enforced in the query frontend, the querier and the ruler.
# If the requested time range is outside the allowed range, the request will not fail,
# but will be modified to only query data within the allowed time range.
# but will be modified to only query data within the allowed time range.
# The default value of 0 does not set a limit.
# CLI flag: -querier.max-query-lookback
[max_query_lookback: <duration> | default = 0]
@ -2360,6 +2383,9 @@ If any specific configuration for an object storage client have been provided el
# Configures a (local) filesystem as the common storage.
[filesystem: <local_storage_config>]
# The `hedging_config` configures how to hedge requests for the storage.
[hedging: <hedging_config>]
```
### ring_config
@ -2512,7 +2538,7 @@ How far into the past accepted out-of-order log entries may be
is configurable with `max_chunk_age`.
`max_chunk_age` defaults to 1 hour.
Loki calculates the earliest time that out-of-order entries may have
and be accepted with
and be accepted with
```
time_of_most_recent_line - (max_chunk_age/2)

@ -22,6 +22,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.10.1-0.20211124141505-4e9fc3a2b5ab
github.com/cristalhq/hedgedhttp v0.6.1
github.com/davecgh/go-spew v1.1.1
github.com/docker/docker v20.10.11+incompatible
github.com/docker/go-plugins-helpers v0.0.0-20181025120712-1e6269c305b8
@ -103,6 +104,7 @@ require (
)
require (
github.com/mattn/go-ieproxy v0.0.1
github.com/xdg-go/scram v1.0.2
gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0
)
@ -211,7 +213,6 @@ require (
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/miekg/dns v1.1.43 // indirect

@ -482,6 +482,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.6.1 h1:o3tcl+HwEFrGfNkZbgbQW4N7UNmorKvqhUFLN1rrkdA=
github.com/cristalhq/hedgedhttp v0.6.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=

@ -6,6 +6,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/aws"
"github.com/grafana/loki/pkg/storage/chunk/azure"
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/util"
)
@ -32,6 +33,7 @@ type Storage struct {
Azure azure.BlobStorageConfig `yaml:"azure"`
Swift openstack.SwiftConfig `yaml:"swift"`
FSConfig FilesystemConfig `yaml:"filesystem"`
Hedging hedging.Config `yaml:"hedging"`
}
func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@ -40,6 +42,7 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
s.Azure.RegisterFlagsWithPrefix(prefix+".azure", f)
s.Swift.RegisterFlagsWithPrefix(prefix+".swift", f)
s.FSConfig.RegisterFlagsWithPrefix(prefix+".filesystem", f)
s.Hedging.RegisterFlagsWithPrefix(prefix, f)
}
type FilesystemConfig struct {

@ -146,9 +146,9 @@ func applyDynamicRingConfigs(r, defaults *ConfigWrapper) {
// any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified
// deviations, with the deviations taking precedence.
func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) {
//Ingester - mergeWithExisting is false when applying the ingester config, and we only want to
//change ingester ring values when applying the common config, so there's no need for the DeepEqual
//check here.
// Ingester - mergeWithExisting is false when applying the ingester config, and we only want to
// change ingester ring values when applying the common config, so there's no need for the DeepEqual
// check here.
if mergeWithExisting {
r.Ingester.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
r.Ingester.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
@ -331,7 +331,7 @@ var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in
func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
var applyConfig func(*ConfigWrapper)
//only one config is allowed
// only one config is allowed
configsFound := 0
if !reflect.DeepEqual(cfg.Common.Storage.Azure, defaults.StorageConfig.AzureStorageConfig) {
@ -341,6 +341,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Type = "azure"
r.Ruler.StoreConfig.Azure = r.Common.Storage.Azure.ToCortexAzureConfig()
r.StorageConfig.AzureStorageConfig = r.Common.Storage.Azure
r.StorageConfig.Hedging = r.Common.Storage.Hedging
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeAzure
}
}
@ -368,6 +369,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS.ToCortexGCSConfig()
r.StorageConfig.GCSConfig = r.Common.Storage.GCS
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeGCS
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}
@ -379,6 +381,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.S3 = r.Common.Storage.S3.ToCortexS3Config()
r.StorageConfig.AWSStorageConfig.S3Config = r.Common.Storage.S3
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeS3
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}
@ -390,6 +393,7 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
r.Ruler.StoreConfig.Swift = r.Common.Storage.Swift.ToCortexSwiftConfig()
r.StorageConfig.Swift = r.Common.Storage.Swift
r.CompactorConfig.SharedStoreType = chunk_storage.StorageTypeSwift
r.StorageConfig.Hedging = r.Common.Storage.Hedging
}
}

@ -44,7 +44,8 @@ var Fixtures = []testutils.Fixture{
schemaCfg: schemaConfig,
metrics: newMetrics(nil),
}
object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()}, nil)
mock := newMockS3()
object := objectclient.NewClient(&S3ObjectClient{S3: mock, hedgedS3: mock}, nil)
return index, object, table, schemaConfig, testutils.CloserFunc(func() error {
table.Stop()
index.Stop()

@ -35,6 +35,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
)
const (
@ -159,28 +160,27 @@ func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig {
}
type S3ObjectClient struct {
cfg S3Config
cfg S3Config
bucketNames []string
S3 s3iface.S3API
hedgedS3 s3iface.S3API
sseConfig *SSEParsedConfig
}
// NewS3ObjectClient makes a new S3-backed ObjectClient.
func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
s3Config, bucketNames, err := buildS3Config(cfg)
func NewS3ObjectClient(cfg S3Config, hedgingCfg hedging.Config) (*S3ObjectClient, error) {
bucketNames, err := buckets(cfg)
if err != nil {
return nil, errors.Wrap(err, "failed to build s3 config")
return nil, err
}
sess, err := session.NewSession(s3Config)
s3Client, err := buildS3Client(cfg, hedgingCfg, false)
if err != nil {
return nil, errors.Wrap(err, "failed to create new s3 session")
return nil, errors.Wrap(err, "failed to build s3 config")
}
s3Client := s3.New(sess)
if cfg.SignatureVersion == SignatureVersionV2 {
s3Client.Handlers.Sign.Swap(v4.SignRequestHandler.Name, v2SignRequestHandler(cfg))
s3ClientHedging, err := buildS3Client(cfg, hedgingCfg, true)
if err != nil {
return nil, errors.Wrap(err, "failed to build s3 config")
}
sseCfg, err := buildSSEParsedConfig(cfg)
@ -191,6 +191,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
client := S3ObjectClient{
cfg: cfg,
S3: s3Client,
hedgedS3: s3ClientHedging,
bucketNames: bucketNames,
sseConfig: sseCfg,
}
@ -234,7 +235,7 @@ func v2SignRequestHandler(cfg S3Config) request.NamedHandler {
}
}
func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S3, error) {
var s3Config *aws.Config
var err error
@ -242,7 +243,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
if cfg.S3.URL != nil {
s3Config, err = awscommon.ConfigFromURL(cfg.S3.URL)
if err != nil {
return nil, nil, err
return nil, err
}
} else {
s3Config = &aws.Config{}
@ -266,7 +267,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
if cfg.AccessKeyID != "" && cfg.SecretAccessKey == "" ||
cfg.AccessKeyID == "" && cfg.SecretAccessKey != "" {
return nil, nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither")
return nil, errors.New("must supply both an Access Key ID and Secret Access Key or neither")
}
if cfg.AccessKeyID != "" && cfg.SecretAccessKey != "" {
@ -282,7 +283,7 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
tlsConfig.RootCAs = x509.NewCertPool()
data, err := os.ReadFile(cfg.HTTPConfig.CAFile)
if err != nil {
return nil, nil, err
return nil, err
}
tlsConfig.RootCAs.AppendCertsFromPEM(data)
}
@ -310,11 +311,31 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
if cfg.Inject != nil {
transport = cfg.Inject(transport)
}
s3Config = s3Config.WithHTTPClient(&http.Client{
httpClient := &http.Client{
Transport: transport,
})
}
if hedging {
httpClient = hedgingCfg.Client(httpClient)
}
s3Config = s3Config.WithHTTPClient(httpClient)
sess, err := session.NewSession(s3Config)
if err != nil {
return nil, errors.Wrap(err, "failed to create new s3 session")
}
s3Client := s3.New(sess)
if cfg.SignatureVersion == SignatureVersionV2 {
s3Client.Handlers.Sign.Swap(v4.SignRequestHandler.Name, v2SignRequestHandler(cfg))
}
return s3Client, nil
}
func buckets(cfg S3Config) ([]string, error) {
// bucketnames
var bucketNames []string
if cfg.S3.URL != nil {
@ -326,10 +347,9 @@ func buildS3Config(cfg S3Config) (*aws.Config, []string, error) {
}
if len(bucketNames) == 0 {
return nil, nil, errors.New("at least one bucket name must be specified")
return nil, errors.New("at least one bucket name must be specified")
}
return s3Config, bucketNames, nil
return bucketNames, nil
}
// Stop fulfills the chunk.ObjectClient interface
@ -376,7 +396,7 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
}
err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
})
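
The S3 change keeps two clients side by side: a plain one for writes, deletes, and lists, and a hedged one used only by `GetObject`. A rough sketch of that routing with the standard library follows; `objectClient`, `roundTripperFunc`, `demo`, and the `X-Sketch-Hedged` header are hypothetical, and the `mark` wrapper only tags requests instead of actually hedging them.

```go
package main

import (
	"fmt"
	"net/http"
)

// roundTripperFunc adapts a function to http.RoundTripper.
type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// objectClient holds one plain and one wrapped client over the same transport.
type objectClient struct {
	plain  *http.Client
	hedged *http.Client
}

func newObjectClient(base http.RoundTripper, wrap func(http.RoundTripper) http.RoundTripper) *objectClient {
	return &objectClient{
		plain:  &http.Client{Transport: base},
		hedged: &http.Client{Transport: wrap(base)},
	}
}

func (c *objectClient) do(client *http.Client, method, key string) error {
	req, err := http.NewRequest(method, "http://store.invalid/"+key, nil)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}

// Reads are idempotent, so only they go through the wrapped client.
func (c *objectClient) GetObject(key string) error { return c.do(c.hedged, http.MethodGet, key) }

// Mutations stay on the plain client and are never duplicated.
func (c *objectClient) DeleteObject(key string) error { return c.do(c.plain, http.MethodDelete, key) }

// demo records which path each request took through a fake transport.
func demo() []string {
	var log []string
	base := roundTripperFunc(func(r *http.Request) (*http.Response, error) {
		log = append(log, fmt.Sprintf("%s hedged=%t", r.Method, r.Header.Get("X-Sketch-Hedged") != ""))
		return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody, Request: r}, nil
	})
	mark := func(next http.RoundTripper) http.RoundTripper {
		return roundTripperFunc(func(r *http.Request) (*http.Response, error) {
			r2 := r.Clone(r.Context()) // a RoundTripper must not mutate the caller's request
			r2.Header.Set("X-Sketch-Hedged", "1")
			return next.RoundTrip(r2)
		})
	}
	c := newObjectClient(base, mark)
	_ = c.GetObject("foo")
	_ = c.DeleteObject("foo")
	return log
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```

Keeping the two clients separate means a hedging misconfiguration can at worst duplicate reads, never writes or deletes.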

@ -1,16 +1,23 @@
package aws
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/grafana/dskit/backoff"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
)
type RoundTripperFunc func(*http.Request) (*http.Response, error)
@ -59,7 +66,7 @@ func TestRequestMiddleware(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg.Inject = tt.fn
client, err := NewS3ObjectClient(cfg)
client, err := NewS3ObjectClient(cfg, hedging.Config{})
require.NoError(t, err)
readCloser, err := client.GetObject(context.Background(), "key")
@ -75,3 +82,68 @@ func TestRequestMiddleware(t *testing.T) {
})
}
}
func Test_Hedging(t *testing.T) {
for _, tc := range []struct {
name string
expectedCalls int32
hedgeAt time.Duration
upTo int
do func(c *S3ObjectClient)
}{
{
"delete/put/list are not hedged",
3,
20 * time.Nanosecond,
10,
func(c *S3ObjectClient) {
_ = c.DeleteObject(context.Background(), "foo")
_, _, _ = c.List(context.Background(), "foo", "/")
_ = c.PutObject(context.Background(), "foo", bytes.NewReader([]byte("bar")))
},
},
{
"gets are hedged",
3,
20 * time.Nanosecond,
3,
func(c *S3ObjectClient) {
_, _ = c.GetObject(context.Background(), "foo")
},
},
{
"gets are not hedged when not configured",
1,
0,
0,
func(c *S3ObjectClient) {
_, _ = c.GetObject(context.Background(), "foo")
},
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
c, err := NewS3ObjectClient(S3Config{
AccessKeyID: "foo",
SecretAccessKey: "bar",
BackoffConfig: backoff.Config{MaxRetries: 1},
BucketNames: "foo",
Inject: func(next http.RoundTripper) http.RoundTripper {
return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return nil, errors.New("foo")
})
},
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
})
require.NoError(t, err)
tc.do(c)
require.Equal(t, tc.expectedCalls, count.Load())
})
}
}

@ -6,12 +6,15 @@ import (
"flag"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/mattn/go-ieproxy"
cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure"
"github.com/cortexproject/cortex/pkg/util"
@ -19,6 +22,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
)
@ -51,6 +55,26 @@ var (
"https://%s.blob.core.usgovcloudapi.net/%s",
},
}
// default Azure http client.
defaultClient = &http.Client{
Transport: &http.Transport{
Proxy: ieproxy.GetProxyFunc(),
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableKeepAlives: false,
DisableCompression: false,
MaxResponseHeaderBytes: 0,
},
}
)
// BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage.
@ -109,14 +133,16 @@ func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig
type BlobStorage struct {
// blobService storage.Serv
cfg *BlobStorageConfig
hedgingCfg hedging.Config
containerURL azblob.ContainerURL
}
// NewBlobStorage creates a new instance of the BlobStorage struct.
func NewBlobStorage(cfg *BlobStorageConfig) (*BlobStorage, error) {
func NewBlobStorage(cfg *BlobStorageConfig, hedgingCfg hedging.Config) (*BlobStorage, error) {
log.WarnExperimentalUse("Azure Blob Storage")
blobStorage := &BlobStorage{
cfg: cfg,
cfg: cfg,
hedgingCfg: hedgingCfg,
}
var err error
@ -148,7 +174,7 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC
}
func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, err error) {
blockBlobURL, err := b.getBlobURL(objectKey)
blockBlobURL, err := b.getBlobURL(objectKey, true)
if err != nil {
return nil, err
}
@ -163,7 +189,7 @@ func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.Re
}
func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
blockBlobURL, err := b.getBlobURL(objectKey)
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {
return err
}
@ -176,7 +202,7 @@ func (b *BlobStorage) PutObject(ctx context.Context, objectKey string, object io
return err
}
func (b *BlobStorage) getBlobURL(blobID string) (azblob.BlockBlobURL, error) {
func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobURL, error) {
blobID = strings.Replace(blobID, ":", "-", -1)
// generate url for new chunk blob
@ -185,7 +211,7 @@ func (b *BlobStorage) getBlobURL(blobID string) (azblob.BlockBlobURL, error) {
return azblob.BlockBlobURL{}, err
}
azPipeline, err := b.newPipeline()
azPipeline, err := b.newPipeline(hedging)
if err != nil {
return azblob.BlockBlobURL{}, err
}
@ -199,7 +225,7 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
return azblob.ContainerURL{}, err
}
azPipeline, err := b.newPipeline()
azPipeline, err := b.newPipeline(false)
if err != nil {
return azblob.ContainerURL{}, err
}
@ -207,13 +233,13 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
return azblob.NewContainerURL(*u, azPipeline), nil
}
func (b *BlobStorage) newPipeline() (pipeline.Pipeline, error) {
func (b *BlobStorage) newPipeline(hedging bool) (pipeline.Pipeline, error) {
credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
if err != nil {
return nil, err
}
return azblob.NewPipeline(credential, azblob.PipelineOptions{
opts := azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: (int32)(b.cfg.MaxRetries),
@ -221,7 +247,26 @@ func (b *BlobStorage) newPipeline() (pipeline.Pipeline, error) {
RetryDelay: b.cfg.MinRetryDelay,
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
}), nil
}
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := defaultClient.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
}
})
if hedging {
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
client := b.hedgingCfg.Client(defaultClient)
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
}
})
}
return azblob.NewPipeline(credential, opts), nil
}
// List implements chunk.ObjectClient.
@ -259,7 +304,7 @@ func (b *BlobStorage) List(ctx context.Context, prefix, delimiter string) ([]chu
}
func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error {
blockBlobURL, err := b.getBlobURL(blobID)
blockBlobURL, err := b.getBlobURL(blobID, false)
if err != nil {
return err
}

@ -0,0 +1,85 @@
package azure
import (
"bytes"
"context"
"errors"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
)
type RoundTripperFunc func(*http.Request) (*http.Response, error)
func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return fn(req)
}
func Test_Hedging(t *testing.T) {
for _, tc := range []struct {
name string
expectedCalls int32
hedgeAt time.Duration
upTo int
do func(c *BlobStorage)
}{
{
"delete/put/list are not hedged",
3,
20 * time.Nanosecond,
10,
func(c *BlobStorage) {
_ = c.DeleteObject(context.Background(), "foo")
_, _, _ = c.List(context.Background(), "foo", "/")
_ = c.PutObject(context.Background(), "foo", bytes.NewReader([]byte("bar")))
},
},
{
"gets are hedged",
3,
20 * time.Nanosecond,
3,
func(c *BlobStorage) {
_, _ = c.GetObject(context.Background(), "foo")
},
},
{
"gets are not hedged when not configured",
1,
0,
0,
func(c *BlobStorage) {
_, _ = c.GetObject(context.Background(), "foo")
},
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
// hijack the client to count the number of calls
defaultClient = &http.Client{
Transport: RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return nil, errors.New("fo")
}),
}
c, err := NewBlobStorage(&BlobStorageConfig{
ContainerName: "foo",
Environment: azureGlobal,
MaxRetries: 1,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
})
require.NoError(t, err)
tc.do(c)
require.Equal(t, tc.expectedCalls, count.Load())
})
}
}

@ -7,11 +7,13 @@ import (
"cloud.google.com/go/bigtable"
"cloud.google.com/go/bigtable/bttest"
"cloud.google.com/go/storage"
"github.com/fsouza/fake-gcs-server/fakestorage"
"google.golang.org/api/option"
"google.golang.org/grpc"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/testutils"
)
@ -78,7 +80,14 @@ func (f *fixture) Clients() (
}
if f.gcsObjectClient {
cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{BucketName: "chunks"}, f.gcssrv.Client()), nil)
var c *GCSObjectClient
c, err = newGCSObjectClient(ctx, GCSConfig{BucketName: "chunks"}, hedging.Config{}, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) {
return f.gcssrv.Client(), nil
})
if err != nil {
return
}
cClient = objectclient.NewClient(c, nil)
} else {
cClient = newBigtableObjectClient(Config{}, schemaConfig, client)
}

@ -13,13 +13,17 @@ import (
"google.golang.org/api/option"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
"github.com/grafana/loki/pkg/storage/chunk/util"
)
type ClientFactory func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error)
type GCSObjectClient struct {
cfg GCSConfig
client *storage.Client
bucket *storage.BucketHandle
cfg GCSConfig
bucket *storage.BucketHandle
hedgingBucket *storage.BucketHandle
}
// GCSConfig is config for the GCS Chunk Client.
@ -28,6 +32,8 @@ type GCSConfig struct {
ChunkBufferSize int `yaml:"chunk_buffer_size"`
RequestTimeout time.Duration `yaml:"request_timeout"`
EnableOpenCensus bool `yaml:"enable_opencensus"`
Insecure bool `yaml:"-"`
}
// RegisterFlags registers flags.
@ -53,35 +59,51 @@ func (cfg *GCSConfig) ToCortexGCSConfig() cortex_gcp.GCSConfig {
}
// NewGCSObjectClient makes a new chunk.Client that writes chunks to GCS.
func NewGCSObjectClient(ctx context.Context, cfg GCSConfig) (*GCSObjectClient, error) {
func NewGCSObjectClient(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Config) (*GCSObjectClient, error) {
return newGCSObjectClient(ctx, cfg, hedgingCfg, storage.NewClient)
}
func newGCSObjectClient(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Config, clientFactory ClientFactory) (*GCSObjectClient, error) {
bucket, err := newBucketHandle(ctx, cfg, hedgingCfg, false, clientFactory)
if err != nil {
return nil, err
}
hedgingBucket, err := newBucketHandle(ctx, cfg, hedgingCfg, true, clientFactory)
if err != nil {
return nil, err
}
return &GCSObjectClient{
cfg: cfg,
bucket: bucket,
hedgingBucket: hedgingBucket,
}, nil
}
func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Config, hedging bool, clientFactory ClientFactory) (*storage.BucketHandle, error) {
var opts []option.ClientOption
instrumentation, err := gcsInstrumentation(ctx, storage.ScopeReadWrite)
httpClient, err := gcsInstrumentation(ctx, storage.ScopeReadWrite, cfg.Insecure)
if err != nil {
return nil, err
}
opts = append(opts, instrumentation)
if hedging {
httpClient = hedgingCfg.Client(httpClient)
}
opts = append(opts, option.WithHTTPClient(httpClient))
if !cfg.EnableOpenCensus {
opts = append(opts, option.WithTelemetryDisabled())
}
client, err := storage.NewClient(ctx, opts...)
client, err := clientFactory(ctx, opts...)
if err != nil {
return nil, err
}
return newGCSObjectClient(cfg, client), nil
}
func newGCSObjectClient(cfg GCSConfig, client *storage.Client) *GCSObjectClient {
bucket := client.Bucket(cfg.BucketName)
return &GCSObjectClient{
cfg: cfg,
client: client,
bucket: bucket,
}
return client.Bucket(cfg.BucketName), nil
}
func (s *GCSObjectClient) Stop() {
s.client.Close()
}
// GetObject returns a reader for the specified object key from the configured GCS bucket.
@ -102,7 +124,7 @@ func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
}
func (s *GCSObjectClient) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, err error) {
reader, err := s.bucket.Object(objectKey).NewReader(ctx)
reader, err := s.hedgingBucket.Object(objectKey).NewReader(ctx)
if err != nil {
return nil, err
}

@ -0,0 +1,90 @@
package gcp
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"cloud.google.com/go/storage"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/api/option"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
)
func Test_Hedging(t *testing.T) {
for _, tc := range []struct {
name string
expectedCalls int32
hedgeAt time.Duration
upTo int
do func(c *GCSObjectClient)
}{
{
"delete/put/list are not hedged",
3,
20 * time.Nanosecond,
10,
func(c *GCSObjectClient) {
_ = c.DeleteObject(context.Background(), "foo")
_, _, _ = c.List(context.Background(), "foo", "/")
_ = c.PutObject(context.Background(), "foo", bytes.NewReader([]byte("bar")))
},
},
{
"gets are hedged",
3,
20 * time.Nanosecond,
3,
func(c *GCSObjectClient) {
_, _ = c.GetObject(context.Background(), "foo")
},
},
{
"gets are not hedged when not configured",
1,
0,
0,
func(c *GCSObjectClient) {
_, _ = c.GetObject(context.Background(), "foo")
},
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
server := fakeServer(t, 200*time.Millisecond, count)
ctx := context.Background()
c, err := newGCSObjectClient(ctx, GCSConfig{
BucketName: "test-bucket",
Insecure: true,
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
}, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) {
opts = append(opts, option.WithEndpoint(server.URL))
opts = append(opts, option.WithoutAuthentication())
return storage.NewClient(ctx, opts...)
})
require.NoError(t, err)
tc.do(c)
require.Equal(t, tc.expectedCalls, count.Load())
})
}
}
func fakeServer(t *testing.T, returnIn time.Duration, counter *atomic.Int32) *httptest.Server {
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
counter.Inc()
time.Sleep(returnIn)
_, _ = w.Write([]byte(`{}`))
}))
server.StartTLS()
t.Cleanup(server.Close)
return server
}

@ -2,6 +2,7 @@ package gcp
import (
"context"
"crypto/tls"
"net/http"
"strconv"
"time"
@ -49,8 +50,13 @@ func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClie
}
}
func gcsInstrumentation(ctx context.Context, scope string) (option.ClientOption, error) {
transport, err := google_http.NewTransport(ctx, http.DefaultTransport, option.WithScopes(scope))
func gcsInstrumentation(ctx context.Context, scope string, insecure bool) (*http.Client, error) {
// start with default transport
customTransport := http.DefaultTransport.(*http.Transport).Clone()
if insecure {
customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
transport, err := google_http.NewTransport(ctx, customTransport, option.WithScopes(scope))
if err != nil {
return nil, err
}
@ -60,7 +66,7 @@ func gcsInstrumentation(ctx context.Context, scope string) (option.ClientOption,
next: transport,
},
}
return option.WithHTTPClient(client), nil
return client, nil
}
func toOptions(opts []grpc.DialOption) []option.ClientOption {

@ -0,0 +1,42 @@
package hedging
import (
"flag"
"net/http"
"time"
"github.com/cristalhq/hedgedhttp"
)
// Config is the configuration for hedging requests.
type Config struct {
// At is the duration after which a second request will be issued.
At time.Duration `yaml:"at"`
// UpTo is the maximum number of requests that will be issued.
UpTo int `yaml:"up_to"`
}
// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}
// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximum number of hedged requests allowed.")
f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)")
}
// Client returns client wrapped for hedging, or client unchanged when At is 0 (hedging disabled).
func (cfg *Config) Client(client *http.Client) *http.Client {
if cfg.At == 0 {
return client
}
return hedgedhttp.NewClient(cfg.At, cfg.UpTo, client)
}
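// RoundTripper returns next wrapped for hedging, or next unchanged when At is 0 (hedging disabled).
func (cfg *Config) RoundTripper(next http.RoundTripper) http.RoundTripper {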
if cfg.At == 0 {
return next
}
return hedgedhttp.NewRoundTripper(cfg.At, cfg.UpTo, next)
}

@ -7,6 +7,8 @@ import (
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/ncw/swift"
"github.com/pkg/errors"
@ -16,11 +18,19 @@ import (
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
)
var defaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConnsPerHost: 512,
ExpectContinueTimeout: 5 * time.Second,
}
type SwiftObjectClient struct {
conn *swift.Connection
cfg SwiftConfig
conn *swift.Connection
hedgingConn *swift.Connection
cfg SwiftConfig
}
// SwiftConfig is config for the Swift Chunk Client.
@ -50,9 +60,29 @@ func (cfg *SwiftConfig) ToCortexSwiftConfig() cortex_openstack.SwiftConfig {
}
// NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift.
func NewSwiftObjectClient(cfg SwiftConfig) (*SwiftObjectClient, error) {
func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObjectClient, error) {
log.WarnExperimentalUse("OpenStack Swift Storage")
c, err := createConnection(cfg, hedgingCfg, false)
if err != nil {
return nil, err
}
// Ensure the container is created, no error is returned if it already exists.
if err := c.ContainerCreate(cfg.ContainerName, nil); err != nil {
return nil, err
}
hedging, err := createConnection(cfg, hedgingCfg, true)
if err != nil {
return nil, err
}
return &SwiftObjectClient{
conn: c,
hedgingConn: hedging,
cfg: cfg,
}, nil
}
func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) (*swift.Connection, error) {
// Create a connection
c := &swift.Connection{
AuthVersion: cfg.AuthVersion,
@ -70,6 +100,7 @@ func NewSwiftObjectClient(cfg SwiftConfig) (*SwiftObjectClient, error) {
Domain: cfg.DomainName,
DomainId: cfg.DomainID,
Region: cfg.RegionName,
Transport: defaultTransport,
}
switch {
@ -78,32 +109,27 @@ func NewSwiftObjectClient(cfg SwiftConfig) (*SwiftObjectClient, error) {
case cfg.UserDomainID != "":
c.DomainId = cfg.UserDomainID
}
if hedging {
c.Transport = hedgingCfg.RoundTripper(c.Transport)
}
// Authenticate
err := c.Authenticate()
if err != nil {
return nil, err
}
// Ensure the container is created, no error is returned if it already exists.
if err := c.ContainerCreate(cfg.ContainerName, nil); err != nil {
return nil, err
}
return &SwiftObjectClient{
conn: c,
cfg: cfg,
}, nil
return c, nil
}
func (s *SwiftObjectClient) Stop() {
s.conn.UnAuthenticate()
s.hedgingConn.UnAuthenticate()
}
// GetObject returns a reader for the specified object key from the configured swift container.
func (s *SwiftObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
var buf bytes.Buffer
_, err := s.conn.ObjectGet(s.cfg.ContainerName, objectKey, &buf, false, nil)
_, err := s.hedgingConn.ObjectGet(s.cfg.ContainerName, objectKey, &buf, false, nil)
if err != nil {
return nil, err
}

@ -0,0 +1,110 @@
package openstack
import (
"bytes"
"context"
"net/http"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/storage/bucket/swift"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
)
type RoundTripperFunc func(*http.Request) (*http.Response, error)
func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return fn(req)
}
func Test_Hedging(t *testing.T) {
for _, tc := range []struct {
name string
expectedCalls int32
hedgeAt time.Duration
upTo int
do func(c *SwiftObjectClient)
}{
{
"delete/put/list are not hedged",
3,
20 * time.Nanosecond,
10,
func(c *SwiftObjectClient) {
_ = c.DeleteObject(context.Background(), "foo")
_, _, _ = c.List(context.Background(), "foo", "/")
_ = c.PutObject(context.Background(), "foo", bytes.NewReader([]byte("bar")))
},
},
{
"gets are hedged",
3,
20 * time.Nanosecond,
3,
func(c *SwiftObjectClient) {
_, _ = c.GetObject(context.Background(), "foo")
},
},
{
"gets are not hedged when not configured",
1,
0,
0,
func(c *SwiftObjectClient) {
_, _ = c.GetObject(context.Background(), "foo")
},
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
// hijack the transport to count the number of calls
defaultTransport = RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
// fake auth
if req.Header.Get("X-Auth-Key") == "passwd" {
return &http.Response{
StatusCode: http.StatusOK,
Body: http.NoBody,
Header: http.Header{
"X-Storage-Url": []string{"http://swift.example.com/v1/AUTH_test"},
"X-Auth-Token": []string{"token"},
},
}, nil
}
// fake container creation
if req.Method == "PUT" && req.URL.Path == "/v1/AUTH_test/foo" {
return &http.Response{
StatusCode: http.StatusCreated,
Body: http.NoBody,
}, nil
}
count.Inc()
time.Sleep(200 * time.Millisecond)
return &http.Response{
StatusCode: http.StatusOK,
Body: http.NoBody,
}, nil
})
c, err := NewSwiftObjectClient(SwiftConfig{
Config: swift.Config{
MaxRetries: 1,
ContainerName: "foo",
AuthVersion: 1,
Password: "passwd",
ConnectTimeout: 10 * time.Second,
RequestTimeout: 10 * time.Second,
},
}, hedging.Config{
At: tc.hedgeAt,
UpTo: tc.upTo,
})
require.NoError(t, err)
tc.do(c)
require.Equal(t, tc.expectedCalls, count.Load())
})
}
}

@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/cassandra"
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/grpc"
"github.com/grafana/loki/pkg/storage/chunk/hedging"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
@ -95,6 +96,8 @@ type Config struct {
DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"`
GrpcConfig grpc.Config `yaml:"grpc_store"`
Hedging hedging.Config `yaml:"hedging"`
}
// RegisterFlags adds the flags required to configure this flag set.
@ -108,6 +111,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.FSConfig.RegisterFlags(f)
cfg.Swift.RegisterFlags(f)
cfg.GrpcConfig.RegisterFlags(f)
cfg.Hedging.RegisterFlagsWithPrefix("store.", f)
f.StringVar(&cfg.Engine, "store.engine", "chunks", "The storage engine to use: chunks or blocks.")
cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading.", f)
@ -267,7 +271,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
case StorageTypeInMemory:
return chunk.NewMockStorage(), nil
case StorageTypeAWS, StorageTypeS3:
return newChunkClientFromStore(aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config))
return newChunkClientFromStore(aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, cfg.Hedging))
case StorageTypeAWSDynamo:
if cfg.AWSStorageConfig.DynamoDB.URL == nil {
return nil, fmt.Errorf("Must set -dynamodb.url in aws mode")
@ -278,15 +282,15 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
}
return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg, registerer)
case StorageTypeAzure:
return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig))
return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig, cfg.Hedging))
case StorageTypeGCP:
return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
case StorageTypeGCPColumnKey, StorageTypeBigTable, StorageTypeBigTableHashed:
return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
case StorageTypeGCS:
return newChunkClientFromStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig))
return newChunkClientFromStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, cfg.Hedging))
case StorageTypeSwift:
return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift))
return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging))
case StorageTypeCassandra:
return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer)
case StorageTypeFileSystem:
@ -355,13 +359,13 @@ func NewBucketClient(storageConfig Config) (chunk.BucketClient, error) {
func NewObjectClient(name string, cfg Config) (chunk.ObjectClient, error) {
switch name {
case StorageTypeAWS, StorageTypeS3:
return aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config)
return aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, cfg.Hedging)
case StorageTypeGCS:
return gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig)
return gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, cfg.Hedging)
case StorageTypeAzure:
return azure.NewBlobStorage(&cfg.AzureStorageConfig)
return azure.NewBlobStorage(&cfg.AzureStorageConfig, cfg.Hedging)
case StorageTypeSwift:
return openstack.NewSwiftObjectClient(cfg.Swift)
return openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging)
case StorageTypeInMemory:
return chunk.NewMockStorage(), nil
case StorageTypeFileSystem:

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 cristaltech
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,53 @@
# hedgedhttp
[![build-img]][build-url]
[![pkg-img]][pkg-url]
[![reportcard-img]][reportcard-url]
[![coverage-img]][coverage-url]
Hedged HTTP client which helps to reduce tail latency at scale.
## Rationale
See paper [Tail at Scale](https://cacm.acm.org/magazines/2013/2/160173-the-tail-at-scale/fulltext) by Jeffrey Dean, Luiz André Barroso. In short: the client first sends one request, but then sends an additional request after a timeout if the previous hasn't returned an answer in the expected time. The client cancels remaining requests once the first result is received.
## Acknowledge
Thanks to [Bohdan Storozhuk](https://github.com/storozhukbm) for the review and powerful hints.
## Features
* Simple API.
* Easy to integrate.
* Optimized for speed.
* Clean and tested code.
* Dependency-free.
## Install
Go version 1.16+
```
go get github.com/cristalhq/hedgedhttp
```
## Example
TODO
## Documentation
See [these docs][pkg-url].
## License
[MIT License](LICENSE).
[build-img]: https://github.com/cristalhq/hedgedhttp/workflows/build/badge.svg
[build-url]: https://github.com/cristalhq/hedgedhttp/actions
[pkg-img]: https://pkg.go.dev/badge/cristalhq/hedgedhttp
[pkg-url]: https://pkg.go.dev/github.com/cristalhq/hedgedhttp
[reportcard-img]: https://goreportcard.com/badge/cristalhq/hedgedhttp
[reportcard-url]: https://goreportcard.com/report/cristalhq/hedgedhttp
[coverage-img]: https://codecov.io/gh/cristalhq/hedgedhttp/branch/main/graph/badge.svg
[coverage-url]: https://codecov.io/gh/cristalhq/hedgedhttp

@ -0,0 +1,345 @@
package hedgedhttp
import (
"context"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
)
const infiniteTimeout = 30 * 24 * time.Hour // domain specific infinite
// NewClient returns a new http.Client which implements hedged requests pattern.
// Given Client starts a new request after a timeout from previous request.
// Starts no more than upto requests.
func NewClient(timeout time.Duration, upto int, client *http.Client) *http.Client {
newClient, _ := NewClientAndStats(timeout, upto, client)
return newClient
}
// NewClientAndStats returns a new http.Client which implements hedged requests pattern
// And Stats object that can be queried to obtain client's metrics.
// Given Client starts a new request after a timeout from previous request.
// Starts no more than upto requests.
func NewClientAndStats(timeout time.Duration, upto int, client *http.Client) (*http.Client, *Stats) {
if client == nil {
client = &http.Client{
Timeout: 5 * time.Second,
}
}
newTransport, metrics := NewRoundTripperAndStats(timeout, upto, client.Transport)
client.Transport = newTransport
return client, metrics
}
// NewRoundTripper returns a new http.RoundTripper which implements hedged requests pattern.
// Given RoundTripper starts a new request after a timeout from previous request.
// Starts no more than upto requests.
func NewRoundTripper(timeout time.Duration, upto int, rt http.RoundTripper) http.RoundTripper {
newRT, _ := NewRoundTripperAndStats(timeout, upto, rt)
return newRT
}
// NewRoundTripperAndStats returns a new http.RoundTripper which implements hedged requests pattern
// And Stats object that can be queried to obtain client's metrics.
// Given RoundTripper starts a new request after a timeout from previous request.
// Starts no more than upto requests.
func NewRoundTripperAndStats(timeout time.Duration, upto int, rt http.RoundTripper) (http.RoundTripper, *Stats) {
switch {
case timeout < 0:
panic("hedgedhttp: timeout cannot be negative")
case upto < 1:
panic("hedgedhttp: upto must be greater than 0")
}
if rt == nil {
rt = http.DefaultTransport
}
if timeout == 0 {
timeout = time.Nanosecond // smallest possible timeout if not set
}
hedged := &hedgedTransport{
rt: rt,
timeout: timeout,
upto: upto,
metrics: &Stats{},
}
return hedged, hedged.metrics
}
type hedgedTransport struct {
rt http.RoundTripper
timeout time.Duration
upto int
metrics *Stats
}
func (ht *hedgedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
mainCtx := req.Context()
timeout := ht.timeout
errOverall := &MultiError{}
resultCh := make(chan indexedResp, ht.upto)
errorCh := make(chan error, ht.upto)
ht.metrics.requestedRoundTripsInc()
resultIdx := -1
cancels := make([]func(), ht.upto)
defer runInPool(func() {
for i, cancel := range cancels {
if i != resultIdx && cancel != nil {
ht.metrics.canceledSubRequestsInc()
cancel()
}
}
})
for sent := 0; len(errOverall.Errors) < ht.upto; sent++ {
if sent < ht.upto {
idx := sent
subReq, cancel := reqWithCtx(req, mainCtx)
cancels[idx] = cancel
runInPool(func() {
ht.metrics.actualRoundTripsInc()
resp, err := ht.rt.RoundTrip(subReq)
if err != nil {
ht.metrics.failedRoundTripsInc()
errorCh <- err
} else {
resultCh <- indexedResp{idx, resp}
}
})
}
// all requests sent - effectively disabling timeout between requests
if sent == ht.upto {
timeout = infiniteTimeout
}
resp, err := waitResult(mainCtx, resultCh, errorCh, timeout)
switch {
case resp.Resp != nil:
resultIdx = resp.Index
return resp.Resp, nil
case mainCtx.Err() != nil:
ht.metrics.canceledByUserRoundTripsInc()
return nil, mainCtx.Err()
case err != nil:
errOverall.Errors = append(errOverall.Errors, err)
}
}
// all requests have returned errors
return nil, errOverall
}
func waitResult(ctx context.Context, resultCh <-chan indexedResp, errorCh <-chan error, timeout time.Duration) (indexedResp, error) {
// try to read result first before blocking on all other channels
select {
case res := <-resultCh:
return res, nil
default:
timer := getTimer(timeout)
defer returnTimer(timer)
select {
case res := <-resultCh:
return res, nil
case reqErr := <-errorCh:
return indexedResp{}, reqErr
case <-ctx.Done():
return indexedResp{}, ctx.Err()
case <-timer.C:
return indexedResp{}, nil // it's not a request timeout, it's timeout BETWEEN consecutive requests
}
}
}
type indexedResp struct {
Index int
Resp *http.Response
}
func reqWithCtx(r *http.Request, ctx context.Context) (*http.Request, func()) {
ctx, cancel := context.WithCancel(ctx)
req := r.WithContext(ctx)
return req, cancel
}
// atomicCounter is a false sharing safe counter.
type atomicCounter struct {
count uint64
_ [7]uint64
}
type cacheLine [64]byte
// Stats object that can be queried to obtain certain metrics and get better observability.
type Stats struct {
_ cacheLine
requestedRoundTrips atomicCounter
actualRoundTrips atomicCounter
failedRoundTrips atomicCounter
canceledByUserRoundTrips atomicCounter
canceledSubRequests atomicCounter
_ cacheLine
}
func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) }
func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) }
func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) }
func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) }
func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) }
// RequestedRoundTrips returns count of requests that were requested by client.
func (s *Stats) RequestedRoundTrips() uint64 {
return atomic.LoadUint64(&s.requestedRoundTrips.count)
}
// ActualRoundTrips returns count of requests that were actually sent.
func (s *Stats) ActualRoundTrips() uint64 {
return atomic.LoadUint64(&s.actualRoundTrips.count)
}
// FailedRoundTrips returns count of requests that failed.
func (s *Stats) FailedRoundTrips() uint64 {
return atomic.LoadUint64(&s.failedRoundTrips.count)
}
// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context.
func (s *Stats) CanceledByUserRoundTrips() uint64 {
return atomic.LoadUint64(&s.canceledByUserRoundTrips.count)
}
// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport.
func (s *Stats) CanceledSubRequests() uint64 {
return atomic.LoadUint64(&s.canceledSubRequests.count)
}
// StatsSnapshot is a snapshot of Stats.
type StatsSnapshot struct {
RequestedRoundTrips uint64 // count of requests that were requested by client
ActualRoundTrips uint64 // count of requests that were actually sent
FailedRoundTrips uint64 // count of requests that failed
CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context
CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport
}
// Snapshot of the stats.
func (s *Stats) Snapshot() StatsSnapshot {
return StatsSnapshot{
RequestedRoundTrips: s.RequestedRoundTrips(),
ActualRoundTrips: s.ActualRoundTrips(),
FailedRoundTrips: s.FailedRoundTrips(),
CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(),
CanceledSubRequests: s.CanceledSubRequests(),
}
}
var taskQueue = make(chan func())
func runInPool(task func()) {
select {
case taskQueue <- task:
// submitted, everything is ok
default:
go func() {
// do the given task
task()
const cleanupDuration = 10 * time.Second
cleanupTicker := time.NewTicker(cleanupDuration)
defer cleanupTicker.Stop()
for {
select {
case t := <-taskQueue:
t()
cleanupTicker.Reset(cleanupDuration)
case <-cleanupTicker.C:
return
}
}
}()
}
}
// MultiError is an error type to track multiple errors. This is used to
// accumulate errors in cases and return them as a single "error".
// Inspired by https://github.com/hashicorp/go-multierror
type MultiError struct {
Errors []error
ErrorFormatFn ErrorFormatFunc
}
func (e *MultiError) Error() string {
fn := e.ErrorFormatFn
if fn == nil {
fn = listFormatFunc
}
return fn(e.Errors)
}
func (e *MultiError) String() string {
return fmt.Sprintf("*%#v", e.Errors)
}
// ErrorOrNil returns an error if there are some.
func (e *MultiError) ErrorOrNil() error {
switch {
case e == nil || len(e.Errors) == 0:
return nil
default:
return e
}
}
// ErrorFormatFunc is called by MultiError to return the list of errors as a string.
type ErrorFormatFunc func([]error) string
func listFormatFunc(es []error) string {
if len(es) == 1 {
return fmt.Sprintf("1 error occurred:\n\t* %s\n\n", es[0])
}
points := make([]string, len(es))
for i, err := range es {
points[i] = fmt.Sprintf("* %s", err)
}
return fmt.Sprintf("%d errors occurred:\n\t%s\n\n", len(es), strings.Join(points, "\n\t"))
}
var timerPool = sync.Pool{New: func() interface{} {
return time.NewTimer(time.Second)
}}
func getTimer(duration time.Duration) *time.Timer {
timer := timerPool.Get().(*time.Timer)
timer.Reset(duration)
return timer
}
func returnTimer(timer *time.Timer) {
timer.Stop()
select {
case _ = <-timer.C:
default:
}
timerPool.Put(timer)
}

@ -308,6 +308,9 @@ github.com/cortexproject/cortex/pkg/util/spanlogger
github.com/cortexproject/cortex/pkg/util/test
github.com/cortexproject/cortex/pkg/util/validation
github.com/cortexproject/cortex/tools/querytee
# github.com/cristalhq/hedgedhttp v0.6.1
## explicit; go 1.16
github.com/cristalhq/hedgedhttp
# github.com/davecgh/go-spew v1.1.1
## explicit
github.com/davecgh/go-spew/spew
