feat: Add objstore support for Swift using thanos.io/objstore (#11672)

Co-authored-by: Joao Marcal <jmarcal@redhat.com>
Co-authored-by: Ashwanth Goli <iamashwanth@gmail.com>
pull/15098/head
Bayan Taani 6 months ago committed by GitHub
parent 5d5affae3b
commit 44523e085d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      docs/sources/shared/configuration.md
  2. 2
      pkg/logqlmodel/stats/context.go
  3. 8
      pkg/loki/config_wrapper_test.go
  4. 2
      pkg/storage/bucket/client.go
  5. 31
      pkg/storage/bucket/http/config.go
  6. 53
      pkg/storage/bucket/s3/config.go
  7. 58
      pkg/storage/bucket/swift/bucket_client.go
  8. 60
      pkg/storage/bucket/swift/config.go
  9. 158
      pkg/storage/chunk/client/openstack/swift_object_client.go
  10. 20
      pkg/storage/chunk/client/openstack/swift_object_client_test.go

@ -5976,6 +5976,12 @@ The `swift_storage_config` block configures the connection to OpenStack Object S
# is received on a request.
# CLI flag: -<prefix>.swift.request-timeout
[request_timeout: <duration> | default = 5s]
http:
# Path to the CA certificates to validate server certificate against. If not
# set, the host's root CA certificates are used.
# CLI flag: -<prefix>.swift.http.tls-ca-path
[tls_ca_path: <string> | default = ""]
```
### table_manager

@ -108,7 +108,7 @@ func (c *Context) Index() Index {
return c.index
}
// Merge index stats from multiple respones in a concurrency-safe manner
// Merge index stats from multiple response in a concurrency-safe manner
func (c *Context) MergeIndex(i Index) {
c.mtx.Lock()
defer c.mtx.Unlock()

@ -547,9 +547,9 @@ memberlist:
assert.Equal(t, "swift", config.Ruler.StoreConfig.Type)
for _, actual := range []swift.Config{
config.Ruler.StoreConfig.Swift.Config,
config.StorageConfig.Swift.Config,
for _, actual := range []openstack.SwiftConfig{
config.Ruler.StoreConfig.Swift,
config.StorageConfig.Swift,
} {
assert.Equal(t, 3, actual.AuthVersion)
assert.Equal(t, "http://example.com", actual.AuthURL)
@ -557,7 +557,7 @@ memberlist:
assert.Equal(t, "example.com", actual.UserDomainName)
assert.Equal(t, "1", actual.UserDomainID)
assert.Equal(t, "27", actual.UserID)
assert.Equal(t, "supersecret", actual.Password)
assert.Equal(t, flagext.SecretWithValue("supersecret"), actual.Password)
assert.Equal(t, "2", actual.DomainID)
assert.Equal(t, "test.com", actual.DomainName)
assert.Equal(t, "13", actual.ProjectID)

@ -176,7 +176,7 @@ func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) erro
case Azure:
cfg.Azure.Transport = rt
case Swift:
cfg.Swift.Transport = rt
cfg.Swift.HTTP.Transport = rt
case Filesystem, Alibaba, BOS:
// do nothing
default:

@ -2,6 +2,7 @@ package http
import (
"flag"
"net/http"
"time"
)
@ -15,7 +16,19 @@ type Config struct {
MaxIdleConns int `yaml:"max_idle_connections"`
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host"`
MaxConnsPerHost int `yaml:"max_connections_per_host"`
CAFile string `yaml:"ca_file"`
// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
TLSConfig TLSConfig `yaml:",inline"`
}
// TLSConfig configures the options for TLS connections.
type TLSConfig struct {
CAPath string `yaml:"tls_ca_path" category:"advanced"`
CertPath string `yaml:"tls_cert_path" category:"advanced"`
KeyPath string `yaml:"tls_key_path" category:"advanced"`
ServerName string `yaml:"tls_server_name" category:"advanced"`
}
// RegisterFlags registers the flags for the storage HTTP client.
@ -25,13 +38,21 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers the flags for the storage HTTP client with the provided prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.IdleConnTimeout, prefix+"idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"insecure-skip-verify", false, "If the client connects via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
f.DurationVar(&cfg.IdleConnTimeout, prefix+"http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"http.insecure-skip-verify", false, "If the client connects via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.")
f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.")
f.IntVar(&cfg.MaxIdleConns, prefix+"max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
f.IntVar(&cfg.MaxConnsPerHost, prefix+"max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
f.StringVar(&cfg.CAFile, prefix+"ca-file", "", "Path to the trusted CA file that signed the SSL certificate of the object storage endpoint.")
cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f)
}
// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix.
func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.CAPath, prefix+"http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.")
f.StringVar(&cfg.CertPath, prefix+"http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.")
f.StringVar(&cfg.KeyPath, prefix+"http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.")
f.StringVar(&cfg.ServerName, prefix+"http.tls-server-name", "", "Override the expected name on the server certificate.")
}

@ -4,10 +4,8 @@ import (
"encoding/json"
"flag"
"fmt"
"net/http"
"slices"
"strings"
"time"
s3_service "github.com/aws/aws-sdk-go/service/s3"
"github.com/grafana/dskit/flagext"
@ -15,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/thanos-io/objstore/providers/s3"
"github.com/grafana/loki/v3/pkg/storage/bucket/http"
"github.com/grafana/loki/v3/pkg/util"
)
@ -55,52 +54,6 @@ func thanosS3BucketLookupTypesValues() (list []string) {
return list
}
// HTTPConfig stores the http.Transport configuration for the s3 minio client.
type HTTPConfig struct {
IdleConnTimeout time.Duration `yaml:"idle_conn_timeout" category:"advanced"`
ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout" category:"advanced"`
InsecureSkipVerify bool `yaml:"insecure_skip_verify" category:"advanced"`
TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout" category:"advanced"`
ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout" category:"advanced"`
MaxIdleConns int `yaml:"max_idle_connections" category:"advanced"`
MaxIdleConnsPerHost int `yaml:"max_idle_connections_per_host" category:"advanced"`
MaxConnsPerHost int `yaml:"max_connections_per_host" category:"advanced"`
// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
TLSConfig TLSConfig `yaml:",inline"`
}
// TLSConfig configures the options for TLS connections.
type TLSConfig struct {
CAPath string `yaml:"tls_ca_path" category:"advanced"`
CertPath string `yaml:"tls_cert_path" category:"advanced"`
KeyPath string `yaml:"tls_key_path" category:"advanced"`
ServerName string `yaml:"tls_server_name" category:"advanced"`
}
// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix
func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.IdleConnTimeout, prefix+"s3.http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.")
f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"s3.http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.")
f.BoolVar(&cfg.InsecureSkipVerify, prefix+"s3.http.insecure-skip-verify", false, "If the client connects to S3 via HTTPS and this option is enabled, the client will accept any certificate and hostname.")
f.DurationVar(&cfg.TLSHandshakeTimeout, prefix+"s3.tls-handshake-timeout", 10*time.Second, "Maximum time to wait for a TLS handshake. 0 means no limit.")
f.DurationVar(&cfg.ExpectContinueTimeout, prefix+"s3.expect-continue-timeout", 1*time.Second, "The time to wait for a server's first response headers after fully writing the request headers if the request has an Expect header. 0 to send the request body immediately.")
f.IntVar(&cfg.MaxIdleConns, prefix+"s3.max-idle-connections", 100, "Maximum number of idle (keep-alive) connections across all hosts. 0 means no limit.")
f.IntVar(&cfg.MaxIdleConnsPerHost, prefix+"s3.max-idle-connections-per-host", 100, "Maximum number of idle (keep-alive) connections to keep per-host. If 0, a built-in default value is used.")
f.IntVar(&cfg.MaxConnsPerHost, prefix+"s3.max-connections-per-host", 0, "Maximum number of connections per host. 0 means no limit.")
cfg.TLSConfig.RegisterFlagsWithPrefix(prefix, f)
}
// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix.
func (cfg *TLSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.CAPath, prefix+"s3.http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.")
f.StringVar(&cfg.CertPath, prefix+"s3.http.tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.")
f.StringVar(&cfg.KeyPath, prefix+"s3.http.tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.")
f.StringVar(&cfg.ServerName, prefix+"s3.http.tls-server-name", "", "Override the expected name on the server certificate.")
}
// Config holds the config options for an S3 backend
type Config struct {
Endpoint string `yaml:"endpoint"`
@ -121,7 +74,7 @@ type Config struct {
MaxRetries int `yaml:"max_retries"`
SSE SSEConfig `yaml:"sse"`
HTTP HTTPConfig `yaml:"http"`
HTTP http.Config `yaml:"http"`
TraceConfig TraceConfig `yaml:"trace"`
}
@ -149,7 +102,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.STSEndpoint, prefix+"s3.sts-endpoint", "", "Accessing S3 resources using temporary, secure credentials provided by AWS Security Token Service.")
f.IntVar(&cfg.MaxRetries, prefix+"s3.max-retries", 10, "The maximum number of retries for S3 requests that are retryable. Default is 10, set this to 1 to disable retries.")
cfg.SSE.RegisterFlagsWithPrefix(prefix+"s3.sse.", f)
cfg.HTTP.RegisterFlagsWithPrefix(prefix, f)
cfg.HTTP.RegisterFlagsWithPrefix(prefix+"s3.", f)
cfg.TraceConfig.RegisterFlagsWithPrefix(prefix+"s3.trace.", f)
}

@ -13,31 +13,49 @@ import (
// NewBucketClient creates a new Swift bucket client
func NewBucketClient(cfg Config, _ string, logger log.Logger, wrapper func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
bucketConfig := swift.Config{
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Username: cfg.Username,
UserDomainName: cfg.UserDomainName,
UserDomainID: cfg.UserDomainID,
UserId: cfg.UserID,
Password: cfg.Password,
DomainId: cfg.DomainID,
DomainName: cfg.DomainName,
ProjectID: cfg.ProjectID,
ProjectName: cfg.ProjectName,
ProjectDomainID: cfg.ProjectDomainID,
ProjectDomainName: cfg.ProjectDomainName,
RegionName: cfg.RegionName,
ContainerName: cfg.ContainerName,
Retries: cfg.MaxRetries,
ConnectTimeout: model.Duration(cfg.ConnectTimeout),
Timeout: model.Duration(cfg.RequestTimeout),
ApplicationCredentialID: cfg.ApplicationCredentialID,
ApplicationCredentialName: cfg.ApplicationCredentialName,
ApplicationCredentialSecret: cfg.ApplicationCredentialSecret.String(),
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Username: cfg.Username,
UserDomainName: cfg.UserDomainName,
UserDomainID: cfg.UserDomainID,
UserId: cfg.UserID,
Password: cfg.Password.String(),
DomainId: cfg.DomainID,
DomainName: cfg.DomainName,
ProjectID: cfg.ProjectID,
ProjectName: cfg.ProjectName,
ProjectDomainID: cfg.ProjectDomainID,
ProjectDomainName: cfg.ProjectDomainName,
RegionName: cfg.RegionName,
ContainerName: cfg.ContainerName,
Retries: cfg.MaxRetries,
ConnectTimeout: model.Duration(cfg.ConnectTimeout),
Timeout: model.Duration(cfg.RequestTimeout),
HTTPConfig: exthttp.HTTPConfig{
IdleConnTimeout: model.Duration(cfg.HTTP.IdleConnTimeout),
ResponseHeaderTimeout: model.Duration(cfg.HTTP.ResponseHeaderTimeout),
InsecureSkipVerify: cfg.HTTP.InsecureSkipVerify,
TLSHandshakeTimeout: model.Duration(cfg.HTTP.TLSHandshakeTimeout),
ExpectContinueTimeout: model.Duration(cfg.HTTP.ExpectContinueTimeout),
MaxIdleConns: cfg.HTTP.MaxIdleConns,
MaxIdleConnsPerHost: cfg.HTTP.MaxIdleConnsPerHost,
MaxConnsPerHost: cfg.HTTP.MaxConnsPerHost,
Transport: cfg.HTTP.Transport,
TLSConfig: exthttp.TLSConfig{
CAFile: cfg.HTTP.TLSConfig.CAPath,
CertFile: cfg.HTTP.TLSConfig.CertPath,
KeyFile: cfg.HTTP.TLSConfig.KeyPath,
ServerName: cfg.HTTP.TLSConfig.ServerName,
},
},
// Hard-coded defaults.
ChunkSize: swift.DefaultConfig.ChunkSize,
UseDynamicLargeObjects: false,
HTTPConfig: exthttp.DefaultHTTPConfig,
}
bucketConfig.HTTPConfig.Transport = cfg.Transport
return swift.NewContainerFromConfig(logger, &bucketConfig, false, wrapper)
}

@ -2,34 +2,37 @@ package swift
import (
"flag"
"net/http"
"time"
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/v3/pkg/storage/bucket/http"
)
// Config holds the config options for Swift backend
type Config struct {
AuthVersion int `yaml:"auth_version"`
AuthURL string `yaml:"auth_url"`
Internal bool `yaml:"internal"`
Username string `yaml:"username"`
UserDomainName string `yaml:"user_domain_name"`
UserDomainID string `yaml:"user_domain_id"`
UserID string `yaml:"user_id"`
Password string `yaml:"password"`
DomainID string `yaml:"domain_id"`
DomainName string `yaml:"domain_name"`
ProjectID string `yaml:"project_id"`
ProjectName string `yaml:"project_name"`
ProjectDomainID string `yaml:"project_domain_id"`
ProjectDomainName string `yaml:"project_domain_name"`
RegionName string `yaml:"region_name"`
ContainerName string `yaml:"container_name"`
MaxRetries int `yaml:"max_retries"`
ConnectTimeout time.Duration `yaml:"connect_timeout"`
RequestTimeout time.Duration `yaml:"request_timeout"`
// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
ApplicationCredentialID string `yaml:"application_credential_id"`
ApplicationCredentialName string `yaml:"application_credential_name"`
ApplicationCredentialSecret flagext.Secret `yaml:"application_credential_secret"`
AuthVersion int `yaml:"auth_version"`
AuthURL string `yaml:"auth_url"`
Username string `yaml:"username"`
UserDomainName string `yaml:"user_domain_name"`
UserDomainID string `yaml:"user_domain_id"`
UserID string `yaml:"user_id"`
Password flagext.Secret `yaml:"password"`
DomainID string `yaml:"domain_id"`
DomainName string `yaml:"domain_name"`
ProjectID string `yaml:"project_id"`
ProjectName string `yaml:"project_name"`
ProjectDomainID string `yaml:"project_domain_id"`
ProjectDomainName string `yaml:"project_domain_name"`
RegionName string `yaml:"region_name"`
ContainerName string `yaml:"container_name"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
ConnectTimeout time.Duration `yaml:"connect_timeout" category:"advanced"`
RequestTimeout time.Duration `yaml:"request_timeout" category:"advanced"`
HTTP http.Config `yaml:"http"`
}
// RegisterFlags registers the flags for Swift storage
@ -39,14 +42,16 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers the flags for Swift storage with the provided prefix
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.ApplicationCredentialID, prefix+"swift.application-credential-id", "", "OpenStack Swift application credential id")
f.StringVar(&cfg.ApplicationCredentialName, prefix+"swift.application-credential-name", "", "OpenStack Swift application credential name")
f.Var(&cfg.ApplicationCredentialSecret, prefix+"swift.application-credential-secret", "OpenStack Swift application credential secret")
f.IntVar(&cfg.AuthVersion, prefix+"swift.auth-version", 0, "OpenStack Swift authentication API version. 0 to autodetect.")
f.StringVar(&cfg.AuthURL, prefix+"swift.auth-url", "", "OpenStack Swift authentication URL")
f.BoolVar(&cfg.Internal, prefix+"swift.internal", false, "Set this to true to use the internal OpenStack Swift endpoint URL")
f.StringVar(&cfg.Username, prefix+"swift.username", "", "OpenStack Swift username.")
f.StringVar(&cfg.UserDomainName, prefix+"swift.user-domain-name", "", "OpenStack Swift user's domain name.")
f.StringVar(&cfg.UserDomainID, prefix+"swift.user-domain-id", "", "OpenStack Swift user's domain ID.")
f.StringVar(&cfg.UserID, prefix+"swift.user-id", "", "OpenStack Swift user ID.")
f.StringVar(&cfg.Password, prefix+"swift.password", "", "OpenStack Swift API key.")
f.Var(&cfg.Password, prefix+"swift.password", "OpenStack Swift API key.")
f.StringVar(&cfg.DomainID, prefix+"swift.domain-id", "", "OpenStack Swift user's domain ID.")
f.StringVar(&cfg.DomainName, prefix+"swift.domain-name", "", "OpenStack Swift user's domain name.")
f.StringVar(&cfg.ProjectID, prefix+"swift.project-id", "", "OpenStack Swift project ID (v2,v3 auth only).")
@ -58,8 +63,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, prefix+"swift.max-retries", 3, "Max retries on requests error.")
f.DurationVar(&cfg.ConnectTimeout, prefix+"swift.connect-timeout", 10*time.Second, "Time after which a connection attempt is aborted.")
f.DurationVar(&cfg.RequestTimeout, prefix+"swift.request-timeout", 5*time.Second, "Time after which an idle request is aborted. The timeout watchdog is reset each time some data is received, so the timeout triggers after X time no data is received on a request.")
}
func (cfg *Config) Validate() error {
return nil
cfg.HTTP.RegisterFlagsWithPrefix(prefix+"swift.", f)
}

@ -3,27 +3,62 @@ package openstack
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io"
"net/http"
"os"
"time"
"github.com/grafana/dskit/flagext"
swift "github.com/ncw/swift/v2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
bucket_swift "github.com/grafana/loki/v3/pkg/storage/bucket/swift"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
"github.com/grafana/loki/v3/pkg/util/log"
)
var defaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConnsPerHost: 200,
MaxIdleConns: 200,
ExpectContinueTimeout: 5 * time.Second,
// Config stores the http.Client configuration for the storage clients.
type HTTPConfig struct {
Transport http.RoundTripper `yaml:"-"`
TLSConfig TLSConfig `yaml:",inline"`
}
// TLSConfig configures the options for TLS connections.
type TLSConfig struct {
CAPath string `yaml:"tls_ca_path" category:"advanced"`
}
func defaultTransport(config HTTPConfig) (http.RoundTripper, error) {
if config.Transport != nil {
return config.Transport, nil
}
tlsConfig := &tls.Config{}
if len(config.TLSConfig.CAPath) > 0 {
caPath := config.TLSConfig.CAPath
data, err := os.ReadFile(caPath)
if err != nil {
return nil, fmt.Errorf("unable to load specified CA cert %s: %s", caPath, err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(data) {
return nil, fmt.Errorf("unable to use specified CA cert %s", caPath)
}
tlsConfig.RootCAs = caCertPool
}
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConns: 200,
MaxIdleConnsPerHost: 200,
ExpectContinueTimeout: 5 * time.Second,
TLSClientConfig: tlsConfig,
}, nil
}
type SwiftObjectClient struct {
@ -34,7 +69,26 @@ type SwiftObjectClient struct {
// SwiftConfig is config for the Swift Chunk Client.
type SwiftConfig struct {
bucket_swift.Config `yaml:",inline"`
AuthVersion int `yaml:"auth_version"`
AuthURL string `yaml:"auth_url"`
Internal bool `yaml:"internal"`
Username string `yaml:"username"`
UserDomainName string `yaml:"user_domain_name"`
UserDomainID string `yaml:"user_domain_id"`
UserID string `yaml:"user_id"`
Password flagext.Secret `yaml:"password"`
DomainID string `yaml:"domain_id"`
DomainName string `yaml:"domain_name"`
ProjectID string `yaml:"project_id"`
ProjectName string `yaml:"project_name"`
ProjectDomainID string `yaml:"project_domain_id"`
ProjectDomainName string `yaml:"project_domain_name"`
RegionName string `yaml:"region_name"`
ContainerName string `yaml:"container_name"`
MaxRetries int `yaml:"max_retries" category:"advanced"`
ConnectTimeout time.Duration `yaml:"connect_timeout" category:"advanced"`
RequestTimeout time.Duration `yaml:"request_timeout" category:"advanced"`
HTTP HTTPConfig `yaml:"http"`
}
// RegisterFlags registers flags.
@ -49,7 +103,26 @@ func (cfg *SwiftConfig) Validate() error {
// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *SwiftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(prefix, f)
f.IntVar(&cfg.AuthVersion, prefix+"swift.auth-version", 0, "OpenStack Swift authentication API version. 0 to autodetect.")
f.StringVar(&cfg.AuthURL, prefix+"swift.auth-url", "", "OpenStack Swift authentication URL")
f.BoolVar(&cfg.Internal, prefix+"swift.internal", false, "Set this to true to use the internal OpenStack Swift endpoint URL")
f.StringVar(&cfg.Username, prefix+"swift.username", "", "OpenStack Swift username.")
f.StringVar(&cfg.UserDomainName, prefix+"swift.user-domain-name", "", "OpenStack Swift user's domain name.")
f.StringVar(&cfg.UserDomainID, prefix+"swift.user-domain-id", "", "OpenStack Swift user's domain ID.")
f.StringVar(&cfg.UserID, prefix+"swift.user-id", "", "OpenStack Swift user ID.")
f.Var(&cfg.Password, prefix+"swift.password", "OpenStack Swift API key.")
f.StringVar(&cfg.DomainID, prefix+"swift.domain-id", "", "OpenStack Swift user's domain ID.")
f.StringVar(&cfg.DomainName, prefix+"swift.domain-name", "", "OpenStack Swift user's domain name.")
f.StringVar(&cfg.ProjectID, prefix+"swift.project-id", "", "OpenStack Swift project ID (v2,v3 auth only).")
f.StringVar(&cfg.ProjectName, prefix+"swift.project-name", "", "OpenStack Swift project name (v2,v3 auth only).")
f.StringVar(&cfg.ProjectDomainID, prefix+"swift.project-domain-id", "", "ID of the OpenStack Swift project's domain (v3 auth only), only needed if it differs the from user domain.")
f.StringVar(&cfg.ProjectDomainName, prefix+"swift.project-domain-name", "", "Name of the OpenStack Swift project's domain (v3 auth only), only needed if it differs from the user domain.")
f.StringVar(&cfg.RegionName, prefix+"swift.region-name", "", "OpenStack Swift Region to use (v2,v3 auth only).")
f.StringVar(&cfg.ContainerName, prefix+"swift.container-name", "", "Name of the OpenStack Swift container to put chunks in.")
f.IntVar(&cfg.MaxRetries, prefix+"swift.max-retries", 3, "Max retries on requests error.")
f.DurationVar(&cfg.ConnectTimeout, prefix+"swift.connect-timeout", 10*time.Second, "Time after which a connection attempt is aborted.")
f.DurationVar(&cfg.RequestTimeout, prefix+"swift.request-timeout", 5*time.Second, "Time after which an idle request is aborted. The timeout watchdog is reset each time some data is received, so the timeout triggers after X time no data is received on a request.")
f.StringVar(&cfg.HTTP.TLSConfig.CAPath, prefix+"swift.http.tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.")
}
// NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift.
@ -61,7 +134,7 @@ func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObj
return nil, err
}
// Ensure the container is created, no error is returned if it already exists.
if err := c.ContainerCreate(context.Background(), cfg.Config.ContainerName, nil); err != nil {
if err := c.ContainerCreate(context.Background(), cfg.ContainerName, nil); err != nil {
return nil, err
}
hedging, err := createConnection(cfg, hedgingCfg, true)
@ -76,32 +149,36 @@ func NewSwiftObjectClient(cfg SwiftConfig, hedgingCfg hedging.Config) (*SwiftObj
}
func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool) (*swift.Connection, error) {
// Create a connection
defaultTransport, err := defaultTransport(cfg.HTTP)
if err != nil {
return nil, err
}
c := &swift.Connection{
AuthVersion: cfg.Config.AuthVersion,
AuthUrl: cfg.Config.AuthURL,
Internal: cfg.Config.Internal,
ApiKey: cfg.Config.Password,
UserName: cfg.Config.Username,
UserId: cfg.Config.UserID,
Retries: cfg.Config.MaxRetries,
ConnectTimeout: cfg.Config.ConnectTimeout,
Timeout: cfg.Config.RequestTimeout,
TenantId: cfg.Config.ProjectID,
Tenant: cfg.Config.ProjectName,
TenantDomain: cfg.Config.ProjectDomainName,
TenantDomainId: cfg.Config.ProjectDomainID,
Domain: cfg.Config.DomainName,
DomainId: cfg.Config.DomainID,
Region: cfg.Config.RegionName,
AuthVersion: cfg.AuthVersion,
AuthUrl: cfg.AuthURL,
Internal: cfg.Internal,
ApiKey: cfg.Password.String(),
UserName: cfg.Username,
UserId: cfg.UserID,
Retries: cfg.MaxRetries,
ConnectTimeout: cfg.ConnectTimeout,
Timeout: cfg.RequestTimeout,
TenantId: cfg.ProjectID,
Tenant: cfg.ProjectName,
TenantDomain: cfg.ProjectDomainName,
TenantDomainId: cfg.ProjectDomainID,
Domain: cfg.DomainName,
DomainId: cfg.DomainID,
Region: cfg.RegionName,
Transport: defaultTransport,
}
switch {
case cfg.Config.UserDomainName != "":
c.Domain = cfg.Config.UserDomainName
case cfg.Config.UserDomainID != "":
c.DomainId = cfg.Config.UserDomainID
case cfg.UserDomainName != "":
c.Domain = cfg.UserDomainName
case cfg.UserDomainID != "":
c.DomainId = cfg.UserDomainID
}
if hedging {
var err error
@ -111,7 +188,8 @@ func createConnection(cfg SwiftConfig, hedgingCfg hedging.Config, hedging bool)
}
}
err := c.Authenticate(context.TODO())
// Create a connection
err = c.Authenticate(context.TODO())
if err != nil {
return nil, err
}
@ -135,7 +213,7 @@ func (s *SwiftObjectClient) ObjectExists(ctx context.Context, objectKey string)
}
func (s *SwiftObjectClient) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
info, _, err := s.hedgingConn.Object(ctx, s.cfg.Config.ContainerName, objectKey)
info, _, err := s.hedgingConn.Object(ctx, s.cfg.ContainerName, objectKey)
if err != nil {
return client.ObjectAttributes{}, nil
}
@ -146,7 +224,7 @@ func (s *SwiftObjectClient) GetAttributes(ctx context.Context, objectKey string)
// GetObject returns a reader and the size for the specified object key from the configured swift container.
func (s *SwiftObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
var buf bytes.Buffer
_, err := s.hedgingConn.ObjectGet(ctx, s.cfg.Config.ContainerName, objectKey, &buf, false, nil)
_, err := s.hedgingConn.ObjectGet(ctx, s.cfg.ContainerName, objectKey, &buf, false, nil)
if err != nil {
return nil, 0, err
}
@ -160,7 +238,7 @@ func (s *SwiftObjectClient) GetObjectRange(ctx context.Context, objectKey string
h := swift.Headers{
"Range": fmt.Sprintf("bytes=%d-%d", offset, offset+length-1),
}
_, err := s.hedgingConn.ObjectGet(ctx, s.cfg.Config.ContainerName, objectKey, &buf, false, h)
_, err := s.hedgingConn.ObjectGet(ctx, s.cfg.ContainerName, objectKey, &buf, false, h)
if err != nil {
return nil, err
}
@ -170,7 +248,7 @@ func (s *SwiftObjectClient) GetObjectRange(ctx context.Context, objectKey string
// PutObject puts the specified bytes into the configured Swift container at the provided key
func (s *SwiftObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
_, err := s.conn.ObjectPut(ctx, s.cfg.Config.ContainerName, objectKey, object, false, "", "", nil)
_, err := s.conn.ObjectPut(ctx, s.cfg.ContainerName, objectKey, object, false, "", "", nil)
return err
}
@ -187,7 +265,7 @@ func (s *SwiftObjectClient) List(ctx context.Context, prefix, delimiter string)
opts.Delimiter = []rune(delimiter)[0]
}
objs, err := s.conn.ObjectsAll(ctx, s.cfg.Config.ContainerName, opts)
objs, err := s.conn.ObjectsAll(ctx, s.cfg.ContainerName, opts)
if err != nil {
return nil, nil, err
}
@ -214,7 +292,7 @@ func (s *SwiftObjectClient) List(ctx context.Context, prefix, delimiter string)
// DeleteObject deletes the specified object key from the configured Swift container.
func (s *SwiftObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
return s.conn.ObjectDelete(ctx, s.cfg.Config.ContainerName, objectKey)
return s.conn.ObjectDelete(ctx, s.cfg.ContainerName, objectKey)
}
// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
@ -223,4 +301,8 @@ func (s *SwiftObjectClient) IsObjectNotFoundErr(err error) bool {
}
// TODO(dannyk): implement for client
func (s *SwiftObjectClient) IsRetryableErr(error) bool { return false }
func IsRetryableErr(error) bool { return false }
func (s *SwiftObjectClient) IsRetryableErr(err error) bool {
return IsRetryableErr(err)
}

@ -10,7 +10,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/storage/bucket/swift"
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/hedging"
)
@ -61,7 +62,7 @@ func Test_Hedging(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
count := atomic.NewInt32(0)
// hijack the transport to count the number of calls
defaultTransport = RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
transportCounter := RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
// fake auth
if req.Header.Get("X-Auth-Key") == "passwd" {
return &http.Response{
@ -89,13 +90,14 @@ func Test_Hedging(t *testing.T) {
})
c, err := NewSwiftObjectClient(SwiftConfig{
Config: swift.Config{
MaxRetries: 1,
ContainerName: "foo",
AuthVersion: 1,
Password: "passwd",
ConnectTimeout: 10 * time.Second,
RequestTimeout: 10 * time.Second,
MaxRetries: 1,
ContainerName: "foo",
AuthVersion: 1,
Password: flagext.SecretWithValue("passwd"),
ConnectTimeout: 10 * time.Second,
RequestTimeout: 10 * time.Second,
HTTP: HTTPConfig{
Transport: transportCounter,
},
}, hedging.Config{
At: tc.hedgeAt,

Loading…
Cancel
Save