[querier] s3: add getObject retry (#4453)

* [querier] s3: add getObject retry

* [querier] s3: add getObject retry

* [querier] s3: add getObject retry #4452

* [querier] s3: add getObject retry

* [querier] s3: add getObject retry #4453

* [querier] s3: add getObject retry

* [querier] s3: add getObject retry #4453

* [querier] s3: add getObject retry #4453

* [querier] s3: add getObject retry #4453

* [querier] s3: add getObject retry

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
pull/4542/head^2
李国忠 4 years ago committed by GitHub
parent e0b4e25962
commit 6fcd02bad4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 14
      docs/sources/configuration/_index.md
  3. 36
      pkg/storage/chunk/aws/s3_storage_client.go

@ -8,6 +8,7 @@
* [4425](https://github.com/grafana/loki/pull/4425) **trevorwhitney** and **slim-bean**: Add a ring for the query scheduler
* [4519](https://github.com/grafana/loki/pull/4519) **DylanGuedes** and **replay**: Loki: Enable FIFO cache by default
* [4520](https://github.com/grafana/loki/pull/4520) **jordanrushing** and **owen-d**: Introduce overrides-exporter module for tenant limits
* [4453](https://github.com/grafana/loki/pull/4453) **liguozhong**: Loki: Implement retry to s3 chunk storage
# 2.3.0 (2021/08/06)

@ -1366,6 +1366,20 @@ aws:
# CLI flag: -s3.http.ca-file
[ca_file: <string> | default = ""]
# Configures back off when s3 get Object.
backoff_config:
# Minimum duration to back off.
# CLI flag: -s3.backoff-min-period
[min_period: <duration> | default = 100ms]
# The duration to back off.
# CLI flag: -s3.backoff-max-period
[max_period: <duration> | default = 3s]
# Number of times to back off and retry before failing.
# CLI flag: -s3.backoff-retries
[max_retries: <int> | default = 5]
# Configure the DynamoDB connection
dynamodb:
# URL for DynamoDB with escaped Key and Secret encoded. If only region is specified as a

@ -31,6 +31,7 @@ import (
cortex_aws "github.com/cortexproject/cortex/pkg/chunk/aws"
cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/pkg/storage/chunk"
@ -76,6 +77,7 @@ type S3Config struct {
HTTPConfig HTTPConfig `yaml:"http_config"`
SignatureVersion string `yaml:"signature_version"`
SSEConfig cortex_s3.SSEConfig `yaml:"sse"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
Inject InjectRequestMiddleware `yaml:"-"`
}
@ -116,6 +118,9 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.HTTPConfig.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "Set to true to skip verifying the certificate chain and hostname.")
f.StringVar(&cfg.HTTPConfig.CAFile, prefix+"s3.http.ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the S3 endpoint.")
f.StringVar(&cfg.SignatureVersion, prefix+"s3.signature-version", SignatureVersionV4, fmt.Sprintf("The signature version to use for authenticating against S3. Supported values are: %s.", strings.Join(supportedSignatureVersions, ", ")))
f.DurationVar(&cfg.BackoffConfig.MinBackoff, "s3.min-backoff", 100*time.Millisecond, "Minimum backoff time when s3 get Object")
f.DurationVar(&cfg.BackoffConfig.MaxBackoff, "s3.max-backoff", 3*time.Second, "Maximum backoff time when s3 get Object")
f.IntVar(&cfg.BackoffConfig.MaxRetries, "s3.max-retries", 5, "Maximum number of times to retry when s3 get Object")
}
// Validate config and returns error on failure
@ -153,6 +158,7 @@ func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig {
}
type S3ObjectClient struct {
cfg S3Config
bucketNames []string
S3 s3iface.S3API
sseConfig *SSEParsedConfig
@ -182,6 +188,7 @@ func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
}
client := S3ObjectClient{
cfg: cfg,
S3: s3Client,
bucketNames: bucketNames,
sseConfig: sseCfg,
@ -360,19 +367,26 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
// Map the key into a bucket
bucket := a.bucketFromKey(objectKey)
err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var err error
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
retries := backoff.New(ctx, a.cfg.BackoffConfig)
err := ctx.Err()
for retries.Ongoing() {
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject")
}
err = instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
})
return requestErr
})
return err
})
if err != nil {
return nil, err
if err == nil {
return resp.Body, nil
}
retries.Wait()
}
return resp.Body, nil
return nil, errors.Wrap(err, "failed to get s3 object")
}
// PutObject into the store

Loading…
Cancel
Save