diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 40ee8f681f..f8a1b518fa 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -5976,6 +5976,12 @@ The `swift_storage_config` block configures the connection to OpenStack Object S # is received on a request. # CLI flag: -.swift.request-timeout [request_timeout: | default = 5s] + +http: + # Path to the CA certificates to validate server certificate against. If not + # set, the host's root CA certificates are used. + # CLI flag: -.swift.http.tls-ca-path + [tls_ca_path: | default = ""] ``` ### table_manager diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 36235d8d9f..cacd509a11 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -108,7 +108,7 @@ func (c *Context) Index() Index { return c.index } -// Merge index stats from multiple respones in a concurrency-safe manner +// Merge index stats from multiple response in a concurrency-safe manner func (c *Context) MergeIndex(i Index) { c.mtx.Lock() defer c.mtx.Unlock() diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 203baecf4a..952804d607 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -547,9 +547,9 @@ memberlist: assert.Equal(t, "swift", config.Ruler.StoreConfig.Type) - for _, actual := range []swift.Config{ - config.Ruler.StoreConfig.Swift.Config, - config.StorageConfig.Swift.Config, + for _, actual := range []openstack.SwiftConfig{ + config.Ruler.StoreConfig.Swift, + config.StorageConfig.Swift, } { assert.Equal(t, 3, actual.AuthVersion) assert.Equal(t, "http://example.com", actual.AuthURL) @@ -557,7 +557,7 @@ memberlist: assert.Equal(t, "example.com", actual.UserDomainName) assert.Equal(t, "1", actual.UserDomainID) assert.Equal(t, "27", actual.UserID) - assert.Equal(t, "supersecret", actual.Password) + assert.Equal(t, flagext.SecretWithValue("supersecret"), actual.Password) assert.Equal(t, "2", actual.DomainID) assert.Equal(t, "test.com", actual.DomainName) assert.Equal(t, "13", actual.ProjectID) diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 327f6f3f9c..6372aea2a9 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -176,7 +176,7 @@ func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) erro case Azure: cfg.Azure.Transport = rt case Swift: - cfg.Swift.Transport = rt + cfg.Swift.HTTP.Transport = rt case Filesystem, Alibaba, BOS: // do nothing default: diff --git a/pkg/storage/bucket/http/config.go b/pkg/storage/bucket/http/config.go index 509de0bf30..0527051baf 100644 --- a/pkg/storage/bucket/http/config.go +++ b/pkg/storage/bucket/http/config.go @@ -2,6 +2,7 @@ package http import ( "flag" + "net/http" "time" ) @@ -15,7 +16,19 @@ type Config struct { MaxIdleConns int `yaml:"max_idle_connections"` MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host"` MaxConnsPerHost int `yaml:"max_connections_per_host"` - CAFile string `yaml:"ca_file"` + + // Allow upstream callers to inject a round tripper + Transport http.RoundTripper `yaml:"-"` + + TLSConfig TLSConfig `yaml:",inline"` +} + +// TLSConfig configures the options for TLS connections. +type TLSConfig struct { + CAPath string `yaml:"tls_ca_path" category:"advanced"` + CertPath string `yaml:"tls_cert_path" category:"advanced"` + KeyPath string `yaml:"tls_key_path" category:"advanced"` + ServerName string `yaml:"tls_server_name" category:"advanced"` } // RegisterFlags registers the flags for the storage HTTP client. @@ -25,13 +38,21 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // RegisterFlagsWithPrefix registers the flags for the storage HTTP client with the provided prefix. func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.DurationVar(&cfg.IdleConnTimeout, prefix+"idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.") - f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.") - f.BoolVar(&cfg.InsecureSkipVerify, prefix+"insecure-skip-verify", false, "If the client connects via HTTPS and this option is enabled, the client will accept any certificate and hostname.") + f.DurationVar(&cfg.IdleConnTimeout, prefix+"http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.") + f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.") + f.BoolVar(&cfg.InsecureSkipVerify, prefix+"http.insecure-skip-verify", false, "If the client connects via HTTPS and this option is enabled, the client will accept any certificate and hostname.") f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.") f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.") f.IntVar(&cfg.MaxIdleConns, prefix+"max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.") f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.") f.IntVar(&cfg.MaxConnsPerHost, prefix+"max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.") - f.StringVar(&cfg.CAFile, prefix+"ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the object storage endpoint.") + cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f) +} + +// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix. +func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.CAPath, prefix+"http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.") + f.StringVar(&cfg.CertPath, prefix+"http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.") + f.StringVar(&cfg.KeyPath, prefix+"http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.") + f.StringVar(&cfg.ServerName, prefix+"http.tls-server-name", "", "Override the expected name on the server certificate.") } diff --git a/pkg/storage/bucket/s3/config.go b/pkg/storage/bucket/s3/config.go index 67c412de6d..053e35209d 100644 --- a/pkg/storage/bucket/s3/config.go +++ b/pkg/storage/bucket/s3/config.go @@ -4,10 +4,8 @@ import ( "encoding/json" "flag" "fmt" - "net/http" "slices" "strings" - "time" s3_service "github.com/aws/aws-sdk-go/service/s3" "github.com/grafana/dskit/flagext" @@ -15,6 +13,7 @@ import ( "github.com/pkg/errors" "github.com/thanos-io/objstore/providers/s3" + "github.com/grafana/loki/v3/pkg/storage/bucket/http" "github.com/grafana/loki/v3/pkg/util" ) @@ -55,52 +54,6 @@ func thanosS3BucketLookupTypesValues() (list []string) { return list } -// HTTPConfig stores the http.Transport configuration for the s3 minio client. -type HTTPConfig struct { - IdleConnTimeout time.Duration `yaml:"idle_conn_timeout" category:"advanced"` - ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout" category:"advanced"` - InsecureSkipVerify bool `yaml:"insecure_skip_verify" category:"advanced"` - TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout" category:"advanced"` - ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout" category:"advanced"` - MaxIdleConns int `yaml:"max_idle_connections" category:"advanced"` - MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"` - MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"` - - // Allow upstream callers to inject a round tripper - Transport http.RoundTripper `yaml:"-"` - - TLSConfig TLSConfig `yaml:",inline"` -} - -// TLSConfig configures the options for TLS connections. -type TLSConfig struct { - CAPath string `yaml:"tls_ca_path" category:"advanced"` - CertPath string `yaml:"tls_cert_path" category:"advanced"` - KeyPath string `yaml:"tls_key_path" category:"advanced"` - ServerName string `yaml:"tls_server_name" category:"advanced"` -} - -// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix -func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.DurationVar(&cfg.IdleConnTimeout, prefix+"s3.http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.") - f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"s3.http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.") - f.BoolVar(&cfg.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "If the client connects to S3 via HTTPS and this option is enabled, the client will accept any certificate and hostname.") - f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"s3.tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.") - f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"s3.expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.") - f.IntVar(&cfg.MaxIdleConns, prefix+"s3.max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.") - f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"s3.max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.") - f.IntVar(&cfg.MaxConnsPerHost, prefix+"s3.max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.") - cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f) -} - -// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix. -func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&cfg.CAPath, prefix+"s3.http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.") - f.StringVar(&cfg.CertPath, prefix+"s3.http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.") - f.StringVar(&cfg.KeyPath, prefix+"s3.http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.") - f.StringVar(&cfg.ServerName, prefix+"s3.http.tls-server-name", "", "Override the expected name on the server certificate.") -} - // Config holds the config options for an S3 backend type Config struct { Endpoint string `yaml:"endpoint"` @@ -121,7 +74,7 @@ type Config struct { MaxRetries int `yaml:"max_retries"` SSE SSEConfig `yaml:"sse"` - HTTP HTTPConfig `yaml:"http"` + HTTP http.Config `yaml:"http"` TraceConfig TraceConfig `yaml:"trace"` } @@ -149,7 +102,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.") f.IntVar(&cfg.MaxRetries, prefix+"s3.max-retries", 10, "The maximum number of retries for S3 requests that are retryable. Default is 10, set this to 1 to disable retries.") cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f) - cfg.HTTP.RegisterFlagsWithPrefix(prefix, f) + cfg.HTTP.RegisterFlagsWithPrefix(prefix+"s3.", f) cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f) } diff --git a/pkg/storage/bucket/swift/bucket_client.go b/pkg/storage/bucket/swift/bucket_client.go index 93ecdee9da..5f502f2fd1 100644 --- a/pkg/storage/bucket/swift/bucket_client.go +++ b/pkg/storage/bucket/swift/bucket_client.go @@ -13,31 +13,49 @@ import ( // NewBucketClient creates a new Swift bucket client func NewBucketClient(cfg Config, _ string, logger log.Logger, wrapper func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) { bucketConfig := swift.Config{ - AuthVersion: cfg.AuthVersion, - AuthUrl: cfg.AuthURL, - Username: cfg.Username, - UserDomainName: cfg.UserDomainName, - UserDomainID: cfg.UserDomainID, - UserId: cfg.UserID, - Password: cfg.Password, - DomainId: cfg.DomainID, - DomainName: cfg.DomainName, - ProjectID: cfg.ProjectID, - ProjectName: cfg.ProjectName, - ProjectDomainID: cfg.ProjectDomainID, - ProjectDomainName: cfg.ProjectDomainName, - RegionName: cfg.RegionName, - ContainerName: cfg.ContainerName, - Retries: cfg.MaxRetries, - ConnectTimeout: model.Duration(cfg.ConnectTimeout), - Timeout: model.Duration(cfg.RequestTimeout), + ApplicationCredentialID: cfg.ApplicationCredentialID, + ApplicationCredentialName: cfg.ApplicationCredentialName, + ApplicationCredentialSecret: cfg.ApplicationCredentialSecret.String(), + AuthVersion: cfg.AuthVersion, + AuthUrl: cfg.AuthURL, + Username: cfg.Username, + UserDomainName: cfg.UserDomainName, + UserDomainID: cfg.UserDomainID, + UserId: cfg.UserID, + Password: cfg.Password.String(), + DomainId: cfg.DomainID, + DomainName: cfg.DomainName, + ProjectID: cfg.ProjectID, + ProjectName: cfg.ProjectName, + ProjectDomainID: cfg.ProjectDomainID, + ProjectDomainName: cfg.ProjectDomainName, + RegionName: cfg.RegionName, + ContainerName: cfg.ContainerName, + Retries: cfg.MaxRetries, + ConnectTimeout: model.Duration(cfg.ConnectTimeout), + Timeout: model.Duration(cfg.RequestTimeout), + HTTPConfig: exthttp.HTTPConfig{ + IdleConnTimeout: model.Duration(cfg.HTTP.IdleConnTimeout), + ResponseHeaderTimeout: model.Duration(cfg.HTTP.ResponseHeaderTimeout), + InsecureSkipVerify: cfg.HTTP.InsecureSkipVerify, + TLSHandshakeTimeout: model.Duration(cfg.HTTP.TLSHandshakeTimeout), + ExpectContinueTimeout: model.Duration(cfg.HTTP.ExpectContinueTimeout), + MaxIdleConns: cfg.HTTP.MaxIdleConns, + MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost, + MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost, + Transport: cfg.HTTP.Transport, + TLSConfig: exthttp.TLSConfig{ + CAFile: cfg.HTTP.TLSConfig.CAPath, + CertFile: cfg.HTTP.TLSConfig.CertPath, + KeyFile: cfg.HTTP.TLSConfig.KeyPath, + ServerName: cfg.HTTP.TLSConfig.ServerName, + }, + }, // Hard-coded defaults. ChunkSize: swift.DefaultConfig.ChunkSize, UseDynamicLargeObjects: false, - HTTPConfig: exthttp.DefaultHTTPConfig, } - bucketConfig.HTTPConfig.Transport = cfg.Transport return swift.NewContainerFromConfig(logger, &bucketConfig, false, wrapper) } diff --git a/pkg/storage/bucket/swift/config.go b/pkg/storage/bucket/swift/config.go index 22717efcc8..5219387390 100644 --- a/pkg/storage/bucket/swift/config.go +++ b/pkg/storage/bucket/swift/config.go @@ -2,34 +2,37 @@ package swift import ( "flag" - "net/http" "time" + + "github.com/grafana/dskit/flagext" + + "github.com/grafana/loki/v3/pkg/storage/bucket/http" ) // Config holds the config options for Swift backend type Config struct { - AuthVersion int `yaml:"auth_version"` - AuthURL string `yaml:"auth_url"` - Internal bool `yaml:"internal"` - Username string `yaml:"username"` - UserDomainName string `yaml:"user_domain_name"` - UserDomainID string `yaml:"user_domain_id"` - UserID string `yaml:"user_id"` - Password string `yaml:"password"` - DomainID string `yaml:"domain_id"` - DomainName string `yaml:"domain_name"` - ProjectID string `yaml:"project_id"` - ProjectName string `yaml:"project_name"` - ProjectDomainID string `yaml:"project_domain_id"` - ProjectDomainName string `yaml:"project_domain_name"` - RegionName string `yaml:"region_name"` - ContainerName string `yaml:"container_name"` - MaxRetries int `yaml:"max_retries"` - ConnectTimeout time.Duration `yaml:"connect_timeout"` - RequestTimeout time.Duration `yaml:"request_timeout"` - - // Allow upstream callers to inject a round tripper - Transport http.RoundTripper `yaml:"-"` + ApplicationCredentialID string `yaml:"application_credential_id"` + ApplicationCredentialName string `yaml:"application_credential_name"` + ApplicationCredentialSecret flagext.Secret `yaml:"application_credential_secret"` + AuthVersion int `yaml:"auth_version"` + AuthURL string `yaml:"auth_url"` + Username string `yaml:"username"` + UserDomainName string `yaml:"user_domain_name"` + UserDomainID string `yaml:"user_domain_id"` + UserID string `yaml:"user_id"` + Password flagext.Secret `yaml:"password"` + DomainID string `yaml:"domain_id"` + DomainName string `yaml:"domain_name"` + ProjectID string `yaml:"project_id"` + ProjectName string `yaml:"project_name"` + ProjectDomainID string `yaml:"project_domain_id"` + ProjectDomainName string `yaml:"project_domain_name"` + RegionName string `yaml:"region_name"` + ContainerName string `yaml:"container_name"` + MaxRetries int `yaml:"max_retries" category:"advanced"` + ConnectTimeout time.Duration `yaml:"connect_timeout" category:"advanced"` + RequestTimeout time.Duration `yaml:"request_timeout" category:"advanced"` + HTTP http.Config `yaml:"http"` } // RegisterFlags registers the flags for Swift storage @@ -39,14 +42,16 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // RegisterFlagsWithPrefix registers the flags for Swift storage with the provided prefix func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.ApplicationCredentialID, prefix+"swift.application-credential-id", "", "OpenStack Swift application credential id") + f.StringVar(&cfg.ApplicationCredentialName, prefix+"swift.application-credential-name", "", "OpenStack Swift application credential name") + f.Var(&cfg.ApplicationCredentialSecret, prefix+"swift.application-credential-secret", "OpenStack Swift application credential secret") f.IntVar(&cfg.AuthVersion, prefix+"swift.auth-version", 0, "OpenStack Swift authentication API version. 0 to autodetect.") f.StringVar(&cfg.AuthURL, prefix+"swift.auth-url", "", "OpenStack Swift authentication URL") - f.BoolVar(&cfg.Internal, prefix+"swift.internal", false, "Set this to true to use the internal OpenStack Swift endpoint URL") f.StringVar(&cfg.Username, prefix+"swift.username", "", "OpenStack Swift username.") f.StringVar(&cfg.UserDomainName, prefix+"swift.user-domain-name", "", "OpenStack Swift user's domain name.") f.StringVar(&cfg.UserDomainID, prefix+"swift.user-domain-id", "", "OpenStack Swift user's domain ID.") f.StringVar(&cfg.UserID, prefix+"swift.user-id", "", "OpenStack Swift user ID.") - f.StringVar(&cfg.Password, prefix+"swift.password", "", "OpenStack Swift API key.") + f.Var(&cfg.Password, prefix+"swift.password", "OpenStack Swift API key.") f.StringVar(&cfg.DomainID, prefix+"swift.domain-id", "", "OpenStack Swift user's domain ID.") f.StringVar(&cfg.DomainName, prefix+"swift.domain-name", "", "OpenStack Swift user's domain name.") f.StringVar(&cfg.ProjectID, prefix+"swift.project-id", "", "OpenStack Swift project ID (v2,v3 auth only).") @@ -58,8 +63,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.MaxRetries, prefix+"swift.max-retries", 3, "Max retries on requests error.") f.DurationVar(&cfg.ConnectTimeout, prefix+"swift.connect-timeout", 10*time.Second, "Time after which a connection attempt is aborted.") f.DurationVar(&cfg.RequestTimeout, prefix+"swift.request-timeout", 5*time.Second, "Time after which an idle request is aborted. The timeout watchdog is reset each time some data is received, so the timeout triggers after X time no data is received on a request.") -} - -func (cfg *Config) Validate() error { - return nil + cfg.HTTP.RegisterFlagsWithPrefix(prefix+"swift.", f) } diff --git a/pkg/storage/chunk/client/openstack/swift_object_client.go b/pkg/storage/chunk/client/openstack/swift_object_client.go index 03721d3e16..63de9458e7 100644 --- a/pkg/storage/chunk/client/openstack/swift_object_client.go +++ b/pkg/storage/chunk/client/openstack/swift_object_client.go @@ -3,27 +3,62 @@ package openstack import ( "bytes" "context" + "crypto/tls" + "crypto/x509" "flag" "fmt" "io" "net/http" + "os" "time" + "github.com/grafana/dskit/flagext" swift "github.com/ncw/swift/v2" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - bucket_swift "github.com/grafana/loki/v3/pkg/storage/bucket/swift" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" "github.com/grafana/loki/v3/pkg/util/log" ) -var defaultTransport http.RoundTripper = &http.Transport{ - Proxy: http.ProxyFromEnvironment, - MaxIdleConnsPerHost: 200, - MaxIdleConns: 200, - ExpectContinueTimeout: 5 * time.Second, +// Config stores the http.Client configuration for the storage clients. +type HTTPConfig struct { + Transport http.RoundTripper `yaml:"-"` + TLSConfig TLSConfig `yaml:",inline"` +} + +// TLSConfig configures the options for TLS connections. +type TLSConfig struct { + CAPath string `yaml:"tls_ca_path" category:"advanced"` +} + +func defaultTransport(config HTTPConfig) (http.RoundTripper, error) { + if config.Transport != nil { + return config.Transport, nil + } + + tlsConfig := &tls.Config{} + if len(config.TLSConfig.CAPath) > 0 { + caPath := config.TLSConfig.CAPath + data, err := os.ReadFile(caPath) + if err != nil { + return nil, fmt.Errorf("unable to load specified CA cert %s: %s", caPath, err) + } + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(data) { + return nil, fmt.Errorf("unable to use specified CA cert %s", caPath) + } + tlsConfig.RootCAs = caCertPool + } + + return &http.Transport{ + Proxy: http.ProxyFromEnvironment, + MaxIdleConns: 200, + MaxIdleConnsPerHost: 200, + ExpectContinueTimeout: 5 * time.Second, + TLSClientConfig: tlsConfig, + }, nil } type SwiftObjectClient struct { @@ -34,7 +69,26 @@ type SwiftObjectClient struct { // SwiftConfig is config for the Swift Chunk Client. type SwiftConfig struct { - bucket_swift.Config `yaml:",inline"` + AuthVersion int `yaml:"auth_version"` + AuthURL string `yaml:"auth_url"` + Internal bool `yaml:"internal"` + Username string `yaml:"username"` + UserDomainName string `yaml:"user_domain_name"` + UserDomainID string `yaml:"user_domain_id"` + UserID string `yaml:"user_id"` + Password flagext.Secret `yaml:"password"` + DomainID string `yaml:"domain_id"` + DomainName string `yaml:"domain_name"` + ProjectID string `yaml:"project_id"` + ProjectName string `yaml:"project_name"` + ProjectDomainID string `yaml:"project_domain_id"` + ProjectDomainName string `yaml:"project_domain_name"` + RegionName string `yaml:"region_name"` + ContainerName string `yaml:"container_name"` + MaxRetries int `yaml:"max_retries" category:"advanced"` + ConnectTimeout time.Duration `yaml:"connect_timeout" category:"advanced"` + RequestTimeout time.Duration `yaml:"request_timeout" category:"advanced"` + HTTP HTTPConfig `yaml:"http"` } // RegisterFlags registers flags. @@ -49,7 +103,26 @@ func (cfg *SwiftConfig) Validate() error { // RegisterFlagsWithPrefix registers flags with prefix. func (cfg *SwiftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.Config.RegisterFlagsWithPrefix(prefix, f) + f.IntVar(&cfg.AuthVersion, prefix+"swift.auth-version", 0, "OpenStack Swift authentication API version. 0 to autodetect.") + f.StringVar(&cfg.AuthURL, prefix+"swift.auth-url", "", "OpenStack Swift authentication URL") + f.BoolVar(&cfg.Internal, prefix+"swift.internal", false, "Set this to true to use the internal OpenStack Swift endpoint URL") + f.StringVar(&cfg.Username, prefix+"swift.username", "", "OpenStack Swift username.") + f.StringVar(&cfg.UserDomainName, prefix+"swift.user-domain-name", "", "OpenStack Swift user's domain name.") + f.StringVar(&cfg.UserDomainID, prefix+"swift.user-domain-id", "", "OpenStack Swift user's domain ID.") + f.StringVar(&cfg.UserID, prefix+"swift.user-id", "", "OpenStack Swift user ID.") + f.Var(&cfg.Password, prefix+"swift.password", "OpenStack Swift API key.") + f.StringVar(&cfg.DomainID, prefix+"swift.domain-id", "", "OpenStack Swift user's domain ID.") + f.StringVar(&cfg.DomainName, prefix+"swift.domain-name", "", "OpenStack Swift user's domain name.") + f.StringVar(&cfg.ProjectID, prefix+"swift.project-id", "", "OpenStack Swift project ID (v2,v3 auth only).") + f.StringVar(&cfg.ProjectName, prefix+"swift.project-name", "", "OpenStack Swift project name (v2,v3 auth only).") + f.StringVar(&cfg.ProjectDomainID, prefix+"swift.project-domain-id", "", "ID of the OpenStack Swift project's domain (v3 auth only), only needed if it differs the from user domain.") + f.StringVar(&cfg.ProjectDomainName, prefix+"swift.project-domain-name", "", "Name of the OpenStack Swift project's domain (v3 auth only), only needed if it differs from the user domain.") + f.StringVar(&cfg.RegionName, prefix+"swift.region-name", "", "OpenStack Swift Region to use (v2,v3 auth only).") + f.StringVar(&cfg.ContainerName, prefix+"swift.container-name", "", "Name of the OpenStack Swift container to put chunks in.") + f.IntVar(&cfg.MaxRetries, prefix+"swift.max-retries", 3, "Max retries on requests error.") + f.DurationVar(&cfg.ConnectTimeout, prefix+"swift.connect-timeout", 10*time.Second, "Time after which a connection attempt is aborted.") + f.DurationVar(&cfg.RequestTimeout, prefix+"swift.request-timeout", 5*time.Second, "Time after which an idle request is aborted. The timeout watchdog is reset each time some data is received, so the timeout triggers after X time no data is received on a request.") + f.StringVar(&cfg.HTTP.TLSConfig.CAPath, prefix+"swift.http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.") } // NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift. @@ -61,7 +134,7 @@ func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObj return nil, err } // Ensure the container is created, no error is returned if it already exists. - if err := c.ContainerCreate(context.Background(), cfg.Config.ContainerName, nil); err != nil { + if err := c.ContainerCreate(context.Background(), cfg.ContainerName, nil); err != nil { return nil, err } hedging, err := createConnection(cfg, hedgingCfg, true) @@ -76,32 +149,36 @@ func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObj } func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) (*swift.Connection, error) { - // Create a connection + defaultTransport, err := defaultTransport(cfg.HTTP) + if err != nil { + return nil, err + } + c := &swift.Connection{ - AuthVersion: cfg.Config.AuthVersion, - AuthUrl: cfg.Config.AuthURL, - Internal: cfg.Config.Internal, - ApiKey: cfg.Config.Password, - UserName: cfg.Config.Username, - UserId: cfg.Config.UserID, - Retries: cfg.Config.MaxRetries, - ConnectTimeout: cfg.Config.ConnectTimeout, - Timeout: cfg.Config.RequestTimeout, - TenantId: cfg.Config.ProjectID, - Tenant: cfg.Config.ProjectName, - TenantDomain: cfg.Config.ProjectDomainName, - TenantDomainId: cfg.Config.ProjectDomainID, - Domain: cfg.Config.DomainName, - DomainId: cfg.Config.DomainID, - Region: cfg.Config.RegionName, + AuthVersion: cfg.AuthVersion, + AuthUrl: cfg.AuthURL, + Internal: cfg.Internal, + ApiKey: cfg.Password.String(), + UserName: cfg.Username, + UserId: cfg.UserID, + Retries: cfg.MaxRetries, + ConnectTimeout: cfg.ConnectTimeout, + Timeout: cfg.RequestTimeout, + TenantId: cfg.ProjectID, + Tenant: cfg.ProjectName, + TenantDomain: cfg.ProjectDomainName, + TenantDomainId: cfg.ProjectDomainID, + Domain: cfg.DomainName, + DomainId: cfg.DomainID, + Region: cfg.RegionName, Transport: defaultTransport, } switch { - case cfg.Config.UserDomainName != "": - c.Domain = cfg.Config.UserDomainName - case cfg.Config.UserDomainID != "": - c.DomainId = cfg.Config.UserDomainID + case cfg.UserDomainName != "": + c.Domain = cfg.UserDomainName + case cfg.UserDomainID != "": + c.DomainId = cfg.UserDomainID } if hedging { var err error @@ -111,7 +188,8 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) } } - err := c.Authenticate(context.TODO()) + // Create a connection + err = c.Authenticate(context.TODO()) if err != nil { return nil, err } @@ -135,7 +213,7 @@ func (s *SwiftObjectClient) ObjectExists(ctx context.Context, objectKey string) } func (s *SwiftObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) { - info, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey) + info, _, err := s.hedgingConn.Object(ctx, s.cfg.ContainerName, objectKey) if err != nil { return client.ObjectAttributes{}, nil } @@ -146,7 +224,7 @@ func (s *SwiftObjectClient) GetAttributes(ctx context.Context, objectKey string) // GetObject returns a reader and the size for the specified object key from the configured swift container. func (s *SwiftObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { var buf bytes.Buffer - _, err := s.hedgingConn.ObjectGet(ctx, s.cfg.Config.ContainerName, objectKey, &buf, false, nil) + _, err := s.hedgingConn.ObjectGet(ctx, s.cfg.ContainerName, objectKey, &buf, false, nil) if err != nil { return nil, 0, err } @@ -160,7 +238,7 @@ func (s *SwiftObjectClient) GetObjectRange(ctx context.Context, objectKey string h := swift.Headers{ "Range": fmt.Sprintf("bytes=%d-%d", offset, offset+length-1), } - _, err := s.hedgingConn.ObjectGet(ctx, s.cfg.Config.ContainerName, objectKey, &buf, false, h) + _, err := s.hedgingConn.ObjectGet(ctx, s.cfg.ContainerName, objectKey, &buf, false, h) if err != nil { return nil, err } @@ -170,7 +248,7 @@ func (s *SwiftObjectClient) GetObjectRange(ctx context.Context, objectKey string // PutObject puts the specified bytes into the configured Swift container at the provided key func (s *SwiftObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { - _, err := s.conn.ObjectPut(ctx, s.cfg.Config.ContainerName, objectKey, object, false, "", "", nil) + _, err := s.conn.ObjectPut(ctx, s.cfg.ContainerName, objectKey, object, false, "", "", nil) return err } @@ -187,7 +265,7 @@ func (s *SwiftObjectClient) List(ctx context.Context, prefix, delimiter string) opts.Delimiter = []rune(delimiter)[0] } - objs, err := s.conn.ObjectsAll(ctx, s.cfg.Config.ContainerName, opts) + objs, err := s.conn.ObjectsAll(ctx, s.cfg.ContainerName, opts) if err != nil { return nil, nil, err } @@ -214,7 +292,7 @@ func (s *SwiftObjectClient) List(ctx context.Context, prefix, delimiter string) // DeleteObject deletes the specified object key from the configured Swift container. func (s *SwiftObjectClient) DeleteObject(ctx context.Context, objectKey string) error { - return s.conn.ObjectDelete(ctx, s.cfg.Config.ContainerName, objectKey) + return s.conn.ObjectDelete(ctx, s.cfg.ContainerName, objectKey) } // IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. @@ -223,4 +301,8 @@ func (s *SwiftObjectClient) IsObjectNotFoundErr(err error) bool { } // TODO(dannyk): implement for client -func (s *SwiftObjectClient) IsRetryableErr(error) bool { return false } +func IsRetryableErr(error) bool { return false } + +func (s *SwiftObjectClient) IsRetryableErr(err error) bool { + return IsRetryableErr(err) +} diff --git a/pkg/storage/chunk/client/openstack/swift_object_client_test.go b/pkg/storage/chunk/client/openstack/swift_object_client_test.go index efcd2807fd..6f970102c1 100644 --- a/pkg/storage/chunk/client/openstack/swift_object_client_test.go +++ b/pkg/storage/chunk/client/openstack/swift_object_client_test.go @@ -10,7 +10,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" - "github.com/grafana/loki/v3/pkg/storage/bucket/swift" + "github.com/grafana/dskit/flagext" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging" ) @@ -61,7 +62,7 @@ func Test_Hedging(t *testing.T) { t.Run(tc.name, func(t *testing.T) { count := atomic.NewInt32(0) // hijack the transport to count the number of calls - defaultTransport = RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + transportCounter := RoundTripperFunc(func(req *http.Request) (*http.Response, error) { // fake auth if req.Header.Get("X-Auth-Key") == "passwd" { return &http.Response{ @@ -89,13 +90,14 @@ func Test_Hedging(t *testing.T) { }) c, err := NewSwiftObjectClient(SwiftConfig{ - Config: swift.Config{ - MaxRetries: 1, - ContainerName: "foo", - AuthVersion: 1, - Password: "passwd", - ConnectTimeout: 10 * time.Second, - RequestTimeout: 10 * time.Second, + MaxRetries: 1, + ContainerName: "foo", + AuthVersion: 1, + Password: flagext.SecretWithValue("passwd"), + ConnectTimeout: 10 * time.Second, + RequestTimeout: 10 * time.Second, + HTTP: HTTPConfig{ + Transport: transportCounter, }, }, hedging.Config{ At: tc.hedgeAt,