feat(storage): AWS backend using thanos.io/objstore (#11221)

Co-authored-by: Ashwanth Goli <iamashwanth@gmail.com>
pull/14630/head
Joao Marcal 1 year ago committed by GitHub
parent 51c42e8645
commit b87224647d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 7
      pkg/storage/bucket/client.go
  2. 42
      pkg/storage/bucket/s3/bucket_client.go
  3. 164
      pkg/storage/bucket/s3/config.go
  4. 250
      pkg/storage/bucket/s3/config_test.go
  5. 11
      pkg/storage/chunk/client/aws/s3_storage_client.go
  6. 44
      pkg/storage/chunk/client/aws/s3_thanos_object_client.go
  7. 4
      pkg/storage/factory.go
  8. 9
      pkg/util/http.go

@ -84,10 +84,9 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.
}
func (cfg *StorageBackendConfig) Validate() error {
// TODO: enable validation when s3 flags are registered
// if err := cfg.S3.Validate(); err != nil {
// return err
//}
if err := cfg.S3.Validate(); err != nil {
return err
}
return nil
}

@ -4,6 +4,7 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/exthttp"
"github.com/thanos-io/objstore/providers/s3"
)
@ -38,17 +39,28 @@ func newS3Config(cfg Config) (s3.Config, error) {
return s3.Config{}, err
}
putUserMetadata := map[string]string{}
if cfg.StorageClass != "" {
putUserMetadata[awsStorageClassHeader] = cfg.StorageClass
}
return s3.Config{
Bucket: cfg.BucketName,
Endpoint: cfg.Endpoint,
Region: cfg.Region,
AccessKey: cfg.AccessKeyID,
SecretKey: cfg.SecretAccessKey.String(),
SessionToken: cfg.SessionToken.String(),
Insecure: cfg.Insecure,
DisableDualstack: cfg.DisableDualstack,
SSEConfig: sseCfg,
PutUserMetadata: map[string]string{awsStorageClassHeader: cfg.StorageClass},
Bucket: cfg.BucketName,
Endpoint: cfg.Endpoint,
Region: cfg.Region,
AccessKey: cfg.AccessKeyID,
SecretKey: cfg.SecretAccessKey.String(),
SessionToken: cfg.SessionToken.String(),
Insecure: cfg.Insecure,
PutUserMetadata: putUserMetadata,
SendContentMd5: cfg.SendContentMd5,
SSEConfig: sseCfg,
DisableDualstack: !cfg.DualstackEnabled,
ListObjectsVersion: cfg.ListObjectsVersion,
BucketLookupType: cfg.BucketLookupType,
AWSSDKAuth: cfg.NativeAWSAuthEnabled,
PartSize: cfg.PartSize,
HTTPConfig: s3.HTTPConfig{
IdleConnTimeout: model.Duration(cfg.HTTP.IdleConnTimeout),
ResponseHeaderTimeout: model.Duration(cfg.HTTP.ResponseHeaderTimeout),
@ -59,6 +71,16 @@ func newS3Config(cfg Config) (s3.Config, error) {
MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost,
MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost,
Transport: cfg.HTTP.Transport,
TLSConfig: exthttp.TLSConfig{
CAFile: cfg.HTTP.TLSConfig.CAPath,
CertFile: cfg.HTTP.TLSConfig.CertPath,
KeyFile: cfg.HTTP.TLSConfig.KeyPath,
ServerName: cfg.HTTP.TLSConfig.ServerName,
},
},
TraceConfig: s3.TraceConfig{
Enable: cfg.TraceConfig.Enabled,
},
STSEndpoint: cfg.STSEndpoint,
}, nil
}

@ -5,23 +5,20 @@ import (
"flag"
"fmt"
"net/http"
"slices"
"strings"
"time"
s3_service "github.com/aws/aws-sdk-go/service/s3"
"github.com/grafana/dskit/flagext"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/pkg/errors"
"github.com/thanos-io/objstore/providers/s3"
bucket_http "github.com/grafana/loki/v3/pkg/storage/bucket/http"
"github.com/grafana/loki/v3/pkg/storage/common/aws"
"github.com/grafana/loki/v3/pkg/util"
)
const (
// Signature Version 2 is being turned off (deprecated) in Amazon S3. Amazon S3 will then only accept API requests that are signed using Signature Version 4.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingAWSSDK.html#UsingAWSSDK-sig2-deprecation
SignatureVersionV4 = "v4"
// SSEKMS config type constant to configure S3 server side encryption using KMS
// https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
SSEKMS = "SSE-KMS"
@ -32,41 +29,99 @@ const (
)
var (
supportedSignatureVersions = []string{SignatureVersionV4}
supportedSSETypes = []string{SSEKMS, SSES3}
errUnsupportedSignatureVersion = errors.New("unsupported signature version")
errUnsupportedSSEType = errors.New("unsupported S3 SSE type")
errInvalidSSEContext = errors.New("invalid S3 SSE encryption context")
supportedSSETypes = []string{SSEKMS, SSES3}
supportedStorageClasses = s3_service.ObjectStorageClass_Values()
supportedBucketLookupTypes = thanosS3BucketLookupTypesValues()
errUnsupportedSSEType = errors.New("unsupported S3 SSE type")
errUnsupportedStorageClass = fmt.Errorf("unsupported S3 storage class (supported values: %s)", strings.Join(supportedStorageClasses, ", "))
errInvalidSSEContext = errors.New("invalid S3 SSE encryption context")
errInvalidEndpointPrefix = errors.New("the endpoint must not prefixed with the bucket name")
errInvalidSTSEndpoint = errors.New("sts-endpoint must be a valid url")
)
var thanosS3BucketLookupTypes = map[string]s3.BucketLookupType{
s3.AutoLookup.String(): s3.AutoLookup,
s3.VirtualHostLookup.String(): s3.VirtualHostLookup,
s3.PathLookup.String(): s3.PathLookup,
}
func thanosS3BucketLookupTypesValues() (list []string) {
for k := range thanosS3BucketLookupTypes {
list = append(list, k)
}
// sort the list for consistent output in help, where it's used
slices.Sort(list)
return list
}
// HTTPConfig stores the http.Transport configuration for the s3 minio client.
type HTTPConfig struct {
bucket_http.Config `yaml:",inline"`
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout" category:"advanced"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout" category:"advanced"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify" category:"advanced"`
TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout" category:"advanced"`
ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout" category:"advanced"`
MaxIdleConns int `yaml:"max_idle_connections" category:"advanced"`
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"`
MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"`
// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
TLSConfig TLSConfig `yaml:",inline"`
}
// TLSConfig configures the options for TLS connections.
type TLSConfig struct {
CAPath string `yaml:"tls_ca_path" category:"advanced"`
CertPath string `yaml:"tls_cert_path" category:"advanced"`
KeyPath string `yaml:"tls_key_path" category:"advanced"`
ServerName string `yaml:"tls_server_name" category:"advanced"`
}
// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix
func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(prefix+"s3.", f)
f.DurationVar(&cfg.IdleConnTimeout, prefix+"s3.http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"s3.http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "If the client connects to S3 via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"s3.tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.")
f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"s3.expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.")
f.IntVar(&cfg.MaxIdleConns, prefix+"s3.max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"s3.max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
f.IntVar(&cfg.MaxConnsPerHost, prefix+"s3.max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f)
}
// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix.
func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.CAPath, prefix+"s3.http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.")
f.StringVar(&cfg.CertPath, prefix+"s3.http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.")
f.StringVar(&cfg.KeyPath, prefix+"s3.http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.")
f.StringVar(&cfg.ServerName, prefix+"s3.http.tls-server-name", "", "Override the expected name on the server certificate.")
}
// Config holds the config options for an S3 backend
type Config struct {
Endpoint string `yaml:"endpoint"`
Region string `yaml:"region"`
BucketName string `yaml:"bucket_name"`
SecretAccessKey flagext.Secret `yaml:"secret_access_key"`
SessionToken flagext.Secret `yaml:"session_token"`
AccessKeyID string `yaml:"access_key_id"`
Insecure bool `yaml:"insecure"`
DisableDualstack bool `yaml:"disable_dualstack"`
SignatureVersion string `yaml:"signature_version"`
StorageClass string `yaml:"storage_class"`
Endpoint string `yaml:"endpoint"`
Region string `yaml:"region"`
BucketName string `yaml:"bucket_name"`
SecretAccessKey flagext.Secret `yaml:"secret_access_key"`
AccessKeyID string `yaml:"access_key_id"`
SessionToken flagext.Secret `yaml:"session_token"`
Insecure bool `yaml:"insecure" category:"advanced"`
ListObjectsVersion string `yaml:"list_objects_version" category:"advanced"`
BucketLookupType s3.BucketLookupType `yaml:"bucket_lookup_type" category:"advanced"`
DualstackEnabled bool `yaml:"dualstack_enabled" category:"experimental"`
StorageClass string `yaml:"storage_class" category:"experimental"`
NativeAWSAuthEnabled bool `yaml:"native_aws_auth_enabled" category:"experimental"`
PartSize uint64 `yaml:"part_size" category:"experimental"`
SendContentMd5 bool `yaml:"send_content_md5" category:"experimental"`
STSEndpoint string `yaml:"sts_endpoint"`
SSE SSEConfig `yaml:"sse"`
HTTP HTTPConfig `yaml:"http"`
SSE SSEConfig `yaml:"sse"`
HTTP HTTPConfig `yaml:"http"`
TraceConfig TraceConfig `yaml:"trace"`
}
// RegisterFlags registers the flags for s3 storage with the provided prefix
@ -83,21 +138,32 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Region, prefix+"s3.region", "", "S3 region. If unset, the client will issue a S3 GetBucketLocation API call to autodetect it.")
f.StringVar(&cfg.Endpoint, prefix+"s3.endpoint", "", "The S3 bucket endpoint. It could be an AWS S3 endpoint listed at https://docs.aws.amazon.com/general/latest/gr/s3.html or the address of an S3-compatible service in hostname:port format.")
f.BoolVar(&cfg.Insecure, prefix+"s3.insecure", false, "If enabled, use http:// for the S3 endpoint instead of https://. This could be useful in local dev/test environments while using an S3-compatible backend storage, like Minio.")
f.BoolVar(&cfg.DisableDualstack, prefix+"s3.disable-dualstack", false, "Disable forcing S3 dualstack endpoint usage.")
f.StringVar(&cfg.SignatureVersion, prefix+"s3.signature-version", SignatureVersionV4, fmt.Sprintf("The signature version to use for authenticating against S3. Supported values are: %s.", strings.Join(supportedSignatureVersions, ", ")))
f.StringVar(&cfg.StorageClass, prefix+"s3.storage-class", aws.StorageClassStandard, "The S3 storage class to use. Details can be found at https://aws.amazon.com/s3/storage-classes/.")
f.StringVar(&cfg.ListObjectsVersion, prefix+"s3.list-objects-version", "", "Use a specific version of the S3 list object API. Supported values are v1 or v2. Default is unset.")
f.StringVar(&cfg.StorageClass, prefix+"s3.storage-class", "", "The S3 storage class to use, not set by default. Details can be found at https://aws.amazon.com/s3/storage-classes/. Supported values are: "+strings.Join(supportedStorageClasses, ", "))
f.BoolVar(&cfg.NativeAWSAuthEnabled, prefix+"s3.native-aws-auth-enabled", false, "If enabled, it will use the default authentication methods of the AWS SDK for go based on known environment variables and known AWS config files.")
f.Uint64Var(&cfg.PartSize, prefix+"s3.part-size", 0, "The minimum file size in bytes used for multipart uploads. If 0, the value is optimally computed for each object.")
f.BoolVar(&cfg.SendContentMd5, prefix+"s3.send-content-md5", false, "If enabled, a Content-MD5 header is sent with S3 Put Object requests. Consumes more resources to compute the MD5, but may improve compatibility with object storage services that do not support checksums.")
f.Var(newBucketLookupTypeValue(s3.AutoLookup, &cfg.BucketLookupType), prefix+"s3.bucket-lookup-type", fmt.Sprintf("Bucket lookup style type, used to access bucket in S3-compatible service. Default is auto. Supported values are: %s.", strings.Join(supportedBucketLookupTypes, ", ")))
f.BoolVar(&cfg.DualstackEnabled, prefix+"s3.dualstack-enabled", true, "When enabled, direct all AWS S3 requests to the dual-stack IPv4/IPv6 endpoint for the configured region.")
f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.")
cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f)
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f)
}
// Validate config and returns error on failure
func (cfg *Config) Validate() error {
if !util.StringsContain(supportedSignatureVersions, cfg.SignatureVersion) {
return errUnsupportedSignatureVersion
if cfg.Endpoint != "" {
endpoint := strings.Split(cfg.Endpoint, ".")
if cfg.BucketName != "" && endpoint[0] != "" && endpoint[0] == cfg.BucketName {
return errInvalidEndpointPrefix
}
}
if err := aws.ValidateStorageClass(cfg.StorageClass); err != nil {
return err
if cfg.STSEndpoint != "" && !util.IsValidURL(cfg.STSEndpoint) {
return errInvalidSTSEndpoint
}
if !slices.Contains(supportedStorageClasses, cfg.StorageClass) && cfg.StorageClass != "" {
return errUnsupportedStorageClass
}
return cfg.SSE.Validate()
@ -191,3 +257,35 @@ func parseKMSEncryptionContext(data string) (map[string]string, error) {
err := errors.Wrap(json.Unmarshal([]byte(data), &decoded), "unable to parse KMS encryption context")
return decoded, err
}
type TraceConfig struct {
Enabled bool `yaml:"enabled" category:"advanced"`
}
func (cfg *TraceConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "When enabled, low-level S3 HTTP operation information is logged at the debug level.")
}
// bucketLookupTypeValue is an adapter between s3.BucketLookupType and flag.Value.
type bucketLookupTypeValue s3.BucketLookupType
func newBucketLookupTypeValue(value s3.BucketLookupType, p *s3.BucketLookupType) *bucketLookupTypeValue {
*p = value
return (*bucketLookupTypeValue)(p)
}
func (v *bucketLookupTypeValue) String() string {
if v == nil {
return s3.AutoLookup.String()
}
return s3.BucketLookupType(*v).String()
}
func (v *bucketLookupTypeValue) Set(s string) error {
t, ok := thanosS3BucketLookupTypes[s]
if !ok {
return fmt.Errorf("unsupported bucket lookup type: %s", s)
}
*v = bucketLookupTypeValue(t)
return nil
}

@ -1,127 +1,23 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/storage/bucket/s3/config_test.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.
package s3
import (
"bytes"
"encoding/base64"
"fmt"
"net/http"
"strings"
"testing"
"time"
s3_service "github.com/aws/aws-sdk-go/service/s3"
"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
bucket_http "github.com/grafana/loki/v3/pkg/storage/bucket/http"
"github.com/grafana/loki/v3/pkg/storage/common/aws"
"gopkg.in/yaml.v3"
)
// defaultConfig should match the default flag values defined in RegisterFlagsWithPrefix.
var defaultConfig = Config{
SignatureVersion: SignatureVersionV4,
StorageClass: aws.StorageClassStandard,
HTTP: HTTPConfig{
Config: bucket_http.Config{
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 2 * time.Minute,
InsecureSkipVerify: false,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
MaxConnsPerHost: 0,
},
},
}
func TestConfig(t *testing.T) {
t.Parallel()
tests := map[string]struct {
config string
expectedConfig Config
expectedErr error
}{
"default config": {
config: "",
expectedConfig: defaultConfig,
expectedErr: nil,
},
"custom config": {
config: `
endpoint: test-endpoint
region: test-region
bucket_name: test-bucket-name
secret_access_key: test-secret-access-key
access_key_id: test-access-key-id
insecure: true
signature_version: test-signature-version
storage_class: test-storage-class
disable_dualstack: true
sse:
type: test-type
kms_key_id: test-kms-key-id
kms_encryption_context: test-kms-encryption-context
http:
idle_conn_timeout: 2s
response_header_timeout: 3s
insecure_skip_verify: true
tls_handshake_timeout: 4s
expect_continue_timeout: 5s
max_idle_connections: 6
max_idle_connections_per_host: 7
max_connections_per_host: 8
`,
expectedConfig: Config{
Endpoint: "test-endpoint",
Region: "test-region",
BucketName: "test-bucket-name",
SecretAccessKey: flagext.SecretWithValue("test-secret-access-key"),
AccessKeyID: "test-access-key-id",
Insecure: true,
SignatureVersion: "test-signature-version",
StorageClass: "test-storage-class",
DisableDualstack: true,
SSE: SSEConfig{
Type: "test-type",
KMSKeyID: "test-kms-key-id",
KMSEncryptionContext: "test-kms-encryption-context",
},
HTTP: HTTPConfig{
Config: bucket_http.Config{
IdleConnTimeout: 2 * time.Second,
ResponseHeaderTimeout: 3 * time.Second,
InsecureSkipVerify: true,
TLSHandshakeTimeout: 4 * time.Second,
ExpectContinueTimeout: 5 * time.Second,
MaxIdleConns: 6,
MaxIdleConnsPerHost: 7,
MaxConnsPerHost: 8,
},
},
},
expectedErr: nil,
},
"invalid type": {
config: `insecure: foo`,
expectedConfig: defaultConfig,
expectedErr: &yaml.TypeError{Errors: []string{"line 1: cannot unmarshal !!str `foo` into bool"}},
},
}
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
cfg := Config{}
flagext.DefaultValues(&cfg)
err := yaml.Unmarshal([]byte(testData.config), &cfg)
require.Equal(t, testData.expectedErr, err)
require.Equal(t, testData.expectedConfig, cfg)
})
}
}
func TestSSEConfig_Validate(t *testing.T) {
tests := map[string]struct {
setup func() *SSEConfig
@ -169,6 +65,85 @@ func TestSSEConfig_Validate(t *testing.T) {
}
}
func TestConfig_Validate(t *testing.T) {
tests := map[string]struct {
setup func() *Config
expected error
}{
"should pass with default config": {
setup: func() *Config {
sseCfg := &SSEConfig{}
flagext.DefaultValues(sseCfg)
cfg := &Config{
Endpoint: "s3.eu-central-1.amazonaws.com",
BucketName: "mimir-block",
SSE: *sseCfg,
StorageClass: s3_service.StorageClassStandard,
}
return cfg
},
},
"should fail if invalid storage class is set": {
setup: func() *Config {
return &Config{
StorageClass: "foo",
}
},
expected: errUnsupportedStorageClass,
},
"should fail on invalid endpoint prefix": {
setup: func() *Config {
return &Config{
Endpoint: "mimir-blocks.s3.eu-central-1.amazonaws.com",
BucketName: "mimir-blocks",
StorageClass: s3_service.StorageClassStandard,
}
},
expected: errInvalidEndpointPrefix,
},
"should pass if native_aws_auth_enabled is set": {
setup: func() *Config {
return &Config{
NativeAWSAuthEnabled: true,
}
},
},
"should pass with using sts endpoint": {
setup: func() *Config {
sseCfg := &SSEConfig{}
flagext.DefaultValues(sseCfg)
cfg := &Config{
BucketName: "mimir-block",
SSE: *sseCfg,
StorageClass: s3_service.StorageClassStandard,
STSEndpoint: "https://sts.eu-central-1.amazonaws.com",
}
return cfg
},
},
"should not pass with using sts endpoint as its using an invalid url": {
setup: func() *Config {
sseCfg := &SSEConfig{}
flagext.DefaultValues(sseCfg)
cfg := &Config{
BucketName: "mimir-block",
SSE: *sseCfg,
StorageClass: s3_service.StorageClassStandard,
STSEndpoint: "sts.eu-central-1.amazonaws.com",
}
return cfg
},
expected: errInvalidSTSEndpoint,
},
}
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
assert.Equal(t, testData.expected, testData.setup().Validate())
})
}
}
func TestSSEConfig_BuildMinioConfig(t *testing.T) {
tests := map[string]struct {
cfg *SSEConfig
@ -225,31 +200,32 @@ func TestParseKMSEncryptionContext(t *testing.T) {
assert.Equal(t, expected, actual)
}
func TestConfig_Validate(t *testing.T) {
tests := map[string]struct {
cfg Config
expectedErr error
}{
"should fail if invalid signature version is set": {
Config{SignatureVersion: "foo"},
errUnsupportedSignatureVersion,
},
"should pass if valid signature version is set": {
defaultConfig,
nil,
},
"should fail if invalid storage class is set": {
Config{SignatureVersion: SignatureVersionV4, StorageClass: "foo"},
fmt.Errorf("unsupported S3 storage class: foo. Supported values: %s", strings.Join(aws.SupportedStorageClasses, ", ")),
},
"should pass if valid storage signature version is set": {
Config{SignatureVersion: SignatureVersionV4, StorageClass: aws.StorageClassStandardInfrequentAccess},
nil,
},
}
func TestConfigParsesCredentialsInlineWithSessionToken(t *testing.T) {
var cfg = Config{}
yamlCfg := `
access_key_id: access key id
secret_access_key: secret access key
session_token: session token
`
err := yaml.Unmarshal([]byte(yamlCfg), &cfg)
require.NoError(t, err)
require.Equal(t, cfg.AccessKeyID, "access key id")
require.Equal(t, cfg.SecretAccessKey.String(), "secret access key")
require.Equal(t, cfg.SessionToken.String(), "session token")
}
for name, test := range tests {
actual := test.cfg.Validate()
assert.Equal(t, test.expectedErr, actual, name)
func TestConfigRedactsCredentials(t *testing.T) {
cfg := Config{
AccessKeyID: "access key id",
SecretAccessKey: flagext.SecretWithValue("secret access key"),
SessionToken: flagext.SecretWithValue("session token"),
}
output, err := yaml.Marshal(cfg)
require.NoError(t, err)
require.True(t, bytes.Contains(output, []byte("access key id")))
require.False(t, bytes.Contains(output, []byte("secret access id")))
require.False(t, bytes.Contains(output, []byte("session token")))
}

@ -563,7 +563,7 @@ func isContextErr(err error) bool {
}
// IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts.
func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool {
func IsStorageTimeoutErr(err error) bool {
// TODO(dannyk): move these out to be generic
// context errors are all client-side
if isContextErr(err) {
@ -599,7 +599,7 @@ func (a *S3ObjectClient) IsStorageTimeoutErr(err error) bool {
}
// IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling.
func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool {
func IsStorageThrottledErr(err error) bool {
if rerr, ok := err.(awserr.RequestFailure); ok {
// https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
@ -609,6 +609,11 @@ func (a *S3ObjectClient) IsStorageThrottledErr(err error) bool {
return false
}
func IsRetryableErr(err error) bool {
return IsStorageTimeoutErr(err) || IsStorageThrottledErr(err)
}
func (a *S3ObjectClient) IsRetryableErr(err error) bool {
return a.IsStorageTimeoutErr(err) || a.IsStorageThrottledErr(err)
return IsRetryableErr(err)
}

@ -0,0 +1,44 @@
package aws
import (
"context"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)
func NewS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (client.ObjectClient, error) {
b, err := newS3ThanosObjectClient(ctx, cfg, component, logger, false, hedgingCfg)
if err != nil {
return nil, err
}
var hedged objstore.Bucket
if hedgingCfg.At != 0 {
hedged, err = newS3ThanosObjectClient(ctx, cfg, component, logger, true, hedgingCfg)
if err != nil {
return nil, err
}
}
o := bucket.NewObjectClientAdapter(b, hedged, logger, bucket.WithRetryableErrFunc(IsRetryableErr))
return o, nil
}
func newS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) {
if hedging {
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}
cfg.S3.HTTP.Transport = hedgedTrasport
}
return bucket.NewClient(ctx, bucket.S3, cfg, component, logger)
}

@ -654,6 +654,10 @@ func internalNewObjectClient(storeName, component string, cfg Config, clientMetr
if cfg.CongestionControl.Enabled {
s3Cfg.BackoffConfig.MaxRetries = 1
}
if cfg.UseThanosObjstore {
return aws.NewS3ThanosObjectClient(context.Background(), cfg.ObjectStore, component, util_log.Logger, cfg.Hedging)
}
return aws.NewS3ObjectClient(s3Cfg, cfg.Hedging)
case types.StorageTypeAlibabaCloud:

@ -298,3 +298,12 @@ func FlagFromValues(values url.Values, key string, d bool) bool {
return d
}
}
func IsValidURL(endpoint string) bool {
u, err := url.Parse(endpoint)
if err != nil {
return false
}
return u.Scheme != "" && u.Host != ""
}

Loading…
Cancel
Save