Configuration: add a common config section for object storage (#4473)

* add common config for object storage

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* wip

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* add test to compactor config override

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* update changelog

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>

* changes from PR review

* rename object_store -> storage

* add local/filesytem support to common object storage config
pull/4525/head
Trevor Whitney 4 years ago committed by GitHub
parent d8e6debaa5
commit 2427fab32d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 19
      pkg/loki/common/common.go
  3. 97
      pkg/loki/config_wrapper.go
  4. 439
      pkg/loki/config_wrapper_test.go
  5. 27
      pkg/storage/chunk/aws/s3_storage_client.go
  6. 17
      pkg/storage/chunk/azure/blob_storage_client.go
  7. 10
      pkg/storage/chunk/gcp/gcs_object_client.go
  8. 7
      pkg/storage/chunk/local/fs_object_client.go
  9. 7
      pkg/storage/chunk/openstack/swift_object_client.go

@ -4,6 +4,7 @@
* [4440](https://github.com/grafana/loki/pull/4440) **DylanGuedes**: Config: Override distributor's default ring KV store
* [4443](https://github.com/grafana/loki/pull/4443) **DylanGuedes**: Loki: Change how push API checks for contentType
* [4415](https://github.com/grafana/loki/pull/4415) **DylanGuedes**: Change default limits to common values
* [4473](https://github.com/grafana/loki/pull/4473) **trevorwhitney**: Config: add object storage configuration to common config
# 2.3.0 (2021/08/06)

@ -1,6 +1,23 @@
package common
import (
"github.com/grafana/loki/pkg/storage/chunk/aws"
"github.com/grafana/loki/pkg/storage/chunk/azure"
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
)
// Config holds common config that can be shared between multiple other config sections
type Config struct {
PathPrefix string `yaml:"path_prefix"`
PathPrefix string `yaml:"path_prefix"`
Storage Storage `yaml:"storage"`
}
type Storage struct {
S3 *aws.S3Config `yaml:"s3"`
GCS *gcp.GCSConfig `yaml:"gcs"`
Azure *azure.BlobStorageConfig `yaml:"azure"`
Swift *openstack.SwiftConfig `yaml:"swift"`
FSConfig *local.FSConfig `yaml:"filesystem"`
}

@ -3,10 +3,12 @@ package loki
import (
"flag"
"fmt"
"reflect"
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/util/cfg"
)
@ -72,6 +74,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
applyMemberlistConfig(r)
applyStorageConfig(r, &defaults)
return nil
}
@ -89,3 +92,97 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.Ruler.Ring.KVStore.Store = memberlistStr
}
}
// applyStorageConfig will attempt to apply a common storage config for either
// s3, gcs, azure, or swift to all the places we create an object storage client.
// If any specific configs for an object storage client have been provided elsewhere in the
// configuration file, applyStorageConfig will not override them.
// If multiple storage configurations are provided, applyStorageConfig will apply
// all of them, and will set the value for the Ruler's StoreConfig `type` to the
// last one (alphabetically) that was defined.
func applyStorageConfig(cfg, defaults *ConfigWrapper) {
rulerStoreConfigsToApply := make([]func(*ConfigWrapper), 0, 4)
chunkStorageConfigsToApply := make([]func(*ConfigWrapper), 0, 4)
if cfg.Common.Storage.Azure != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "azure"
r.Ruler.StoreConfig.Azure = r.Common.Storage.Azure.ToCortexAzureConfig()
})
chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
r.StorageConfig.AzureStorageConfig = *r.Common.Storage.Azure
r.CompactorConfig.SharedStoreType = storage.StorageTypeAzure
})
}
if cfg.Common.Storage.GCS != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "gcs"
r.Ruler.StoreConfig.GCS = r.Common.Storage.GCS.ToCortexGCSConfig()
})
chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
r.StorageConfig.GCSConfig = *r.Common.Storage.GCS
r.CompactorConfig.SharedStoreType = storage.StorageTypeGCS
})
}
if cfg.Common.Storage.FSConfig != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "local"
r.Ruler.StoreConfig.Local = r.Common.Storage.FSConfig.ToCortexLocalConfig()
})
chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
r.StorageConfig.FSConfig = *r.Common.Storage.FSConfig
r.CompactorConfig.SharedStoreType = storage.StorageTypeFileSystem
})
}
if cfg.Common.Storage.S3 != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "s3"
r.Ruler.StoreConfig.S3 = r.Common.Storage.S3.ToCortexS3Config()
})
chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
r.StorageConfig.AWSStorageConfig.S3Config = *r.Common.Storage.S3
r.CompactorConfig.SharedStoreType = storage.StorageTypeS3
})
}
if cfg.Common.Storage.Swift != nil {
rulerStoreConfigsToApply = append(rulerStoreConfigsToApply, func(r *ConfigWrapper) {
r.Ruler.StoreConfig.Type = "swift"
r.Ruler.StoreConfig.Swift = r.Common.Storage.Swift.ToCortexSwiftConfig()
})
chunkStorageConfigsToApply = append(chunkStorageConfigsToApply, func(r *ConfigWrapper) {
r.StorageConfig.Swift = *r.Common.Storage.Swift
r.CompactorConfig.SharedStoreType = storage.StorageTypeSwift
})
}
// store change funcs in slices and apply all at once, because once we change the
// config we can no longer compare it to the default, this allows us to only
// do that comparison once
applyRulerStoreConfigs(cfg, defaults, rulerStoreConfigsToApply)
applyChunkStorageConfigs(cfg, defaults, chunkStorageConfigsToApply)
}
func applyRulerStoreConfigs(cfg, defaults *ConfigWrapper, apply []func(*ConfigWrapper)) {
if reflect.DeepEqual(cfg.Ruler.StoreConfig, defaults.Ruler.StoreConfig) {
for _, ap := range apply {
ap(cfg)
}
}
}
func applyChunkStorageConfigs(cfg, defaults *ConfigWrapper, apply []func(*ConfigWrapper)) {
if reflect.DeepEqual(cfg.StorageConfig, defaults.StorageConfig) {
for _, ap := range apply {
ap(cfg)
}
}
}

@ -3,12 +3,21 @@ package loki
import (
"flag"
"io/ioutil"
"net/url"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cortex_aws "github.com/cortexproject/cortex/pkg/chunk/aws"
cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure"
cortex_gcp "github.com/cortexproject/cortex/pkg/chunk/gcp"
cortex_local "github.com/cortexproject/cortex/pkg/ruler/rulestore/local"
cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift"
"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/util/cfg"
)
@ -43,10 +52,15 @@ func Test_CommonConfig(t *testing.T) {
return config, defaults
}
//the unmarshaller overwrites default values with 0s when a completely empty
//config file is passed, so our "empty" config has some non-relevant config in it
const emptyConfigString = `---
server:
http_listen_port: 80`
t.Run("common path prefix config", func(t *testing.T) {
t.Run("does not override defaults for file paths when not provided", func(t *testing.T) {
configFileString := `---`
config, defaults := testContext(configFileString, nil)
config, defaults := testContext(emptyConfigString, nil)
assert.EqualValues(t, defaults.Ruler.RulePath, config.Ruler.RulePath)
assert.EqualValues(t, defaults.Ingester.WAL.Dir, config.Ingester.WAL.Dir)
@ -92,8 +106,7 @@ common:
// * ruler
t.Run("does not automatically configure memberlist when no top-level memberlist config is provided", func(t *testing.T) {
configFileString := `---`
config, defaults := testContext(configFileString, nil)
config, defaults := testContext(emptyConfigString, nil)
assert.EqualValues(t, defaults.Ingester.LifecyclerConfig.RingConfig.KVStore.Store, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, defaults.Distributor.DistributorRing.KVStore.Store, config.Distributor.DistributorRing.KVStore.Store)
@ -145,6 +158,424 @@ memberlist:
assert.EqualValues(t, memberlistStr, config.Distributor.DistributorRing.KVStore.Store)
})
})
t.Run("common object store config", func(t *testing.T) {
//config file structure
//common:
// storage:
// azure: azure.BlobStorageConfig
// gcs: gcp.GCSConfig
// s3: aws.S3Config
// swift: openstack.SwiftConfig
t.Run("does not automatically configure cloud object storage", func(t *testing.T) {
config, defaults := testContext(emptyConfigString, nil)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Type, config.Ruler.StoreConfig.Type)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure)
assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS)
assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local)
assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig, config.StorageConfig.AWSStorageConfig)
assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig)
assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig)
assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift)
assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig)
})
t.Run("when multiple configs are provided, the last (alphabetically) is used as the ruler store type", func(t *testing.T) {
multipleConfig := `common:
storage:
s3:
s3: s3://foo-bucket/example
endpoint: s3://foo-bucket
region: us-east1
access_key_id: abc123
secret_access_key: def789
gcs:
bucket_name: foobar
chunk_buffer_size: 27
request_timeout: 5m`
config, _ := testContext(multipleConfig, nil)
assert.Equal(t, "s3", config.Ruler.StoreConfig.Type)
assert.Equal(t, "s3://foo-bucket", config.Ruler.StoreConfig.S3.Endpoint)
assert.Equal(t, "foobar", config.Ruler.StoreConfig.GCS.BucketName)
assert.Equal(t, "s3://foo-bucket", config.StorageConfig.AWSStorageConfig.S3Config.Endpoint)
assert.Equal(t, "foobar", config.StorageConfig.GCSConfig.BucketName)
})
t.Run("when common s3 storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) {
s3Config := `common:
storage:
s3:
s3: s3://foo-bucket/example
endpoint: s3://foo-bucket
region: us-east1
access_key_id: abc123
secret_access_key: def789
insecure: true
signature_version: v4
http_config:
idle_conn_timeout: 5m
response_header_timeout: 5m`
config, defaults := testContext(s3Config, nil)
expected, err := url.Parse("s3://foo-bucket/example")
require.NoError(t, err)
assert.Equal(t, "s3", config.Ruler.StoreConfig.Type)
for _, actual := range []cortex_aws.S3Config{
config.Ruler.StoreConfig.S3,
config.StorageConfig.AWSStorageConfig.S3Config.ToCortexS3Config(),
} {
require.NotNil(t, actual.S3.URL)
assert.Equal(t, *expected, *actual.S3.URL)
assert.Equal(t, false, actual.S3ForcePathStyle)
assert.Equal(t, "s3://foo-bucket", actual.Endpoint)
assert.Equal(t, "us-east1", actual.Region)
assert.Equal(t, "abc123", actual.AccessKeyID)
assert.Equal(t, "def789", actual.SecretAccessKey)
assert.Equal(t, true, actual.Insecure)
assert.Equal(t, false, actual.SSEEncryption)
assert.Equal(t, 5*time.Minute, actual.HTTPConfig.IdleConnTimeout)
assert.Equal(t, 5*time.Minute, actual.HTTPConfig.ResponseHeaderTimeout)
assert.Equal(t, false, actual.HTTPConfig.InsecureSkipVerify)
assert.Equal(t, "v4", actual.SignatureVersion)
}
//should remain empty
assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure)
assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local)
//should remain empty
assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig)
assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig)
assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift)
assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig)
})
t.Run("when common gcs storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) {
gcsConfig := `common:
storage:
gcs:
bucket_name: foobar
chunk_buffer_size: 27
request_timeout: 5m
enable_opencensus: true`
config, defaults := testContext(gcsConfig, nil)
assert.Equal(t, "gcs", config.Ruler.StoreConfig.Type)
for _, actual := range []cortex_gcp.GCSConfig{
config.Ruler.StoreConfig.GCS,
config.StorageConfig.GCSConfig.ToCortexGCSConfig(),
} {
assert.Equal(t, "foobar", actual.BucketName)
assert.Equal(t, 27, actual.ChunkBufferSize)
assert.Equal(t, 5*time.Minute, actual.RequestTimeout)
assert.Equal(t, true, actual.EnableOpenCensus)
}
//should remain empty
assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure)
assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local)
//should remain empty
assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig)
assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config)
assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift)
assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig)
})
t.Run("when common azure storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) {
azureConfig := `common:
storage:
azure:
environment: earth
container_name: milkyway
account_name: 3rd_planet
account_key: water
download_buffer_size: 27
upload_buffer_size: 42
upload_buffer_count: 13
request_timeout: 5m
max_retries: 3
min_retry_delay: 10s
max_retry_delay: 10m`
config, defaults := testContext(azureConfig, nil)
assert.Equal(t, "azure", config.Ruler.StoreConfig.Type)
for _, actual := range []cortex_azure.BlobStorageConfig{
config.Ruler.StoreConfig.Azure,
config.StorageConfig.AzureStorageConfig.ToCortexAzureConfig(),
} {
assert.Equal(t, "earth", actual.Environment)
assert.Equal(t, "milkyway", actual.ContainerName)
assert.Equal(t, "3rd_planet", actual.AccountName)
assert.Equal(t, "water", actual.AccountKey.Value)
assert.Equal(t, 27, actual.DownloadBufferSize)
assert.Equal(t, 42, actual.UploadBufferSize)
assert.Equal(t, 13, actual.UploadBufferCount)
assert.Equal(t, 5*time.Minute, actual.RequestTimeout)
assert.Equal(t, 3, actual.MaxRetries)
assert.Equal(t, 10*time.Second, actual.MinRetryDelay)
assert.Equal(t, 10*time.Minute, actual.MaxRetryDelay)
}
//should remain empty
assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS)
assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local)
//should remain empty
assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig)
assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config)
assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift)
assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig)
})
t.Run("when common swift storage config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) {
swiftConfig := `common:
storage:
swift:
auth_version: 3
auth_url: http://example.com
username: steve
user_domain_name: example.com
user_domain_id: 1
user_id: 27
password: supersecret
domain_id: 2
domain_name: test.com
project_id: 13
project_name: tower
project_domain_id: 3
project_domain_name: tower.com
region_name: us-east1
container_name: tupperware
max_retries: 6
connect_timeout: 5m
request_timeout: 5s`
config, defaults := testContext(swiftConfig, nil)
assert.Equal(t, "swift", config.Ruler.StoreConfig.Type)
for _, actual := range []cortex_swift.Config{
config.Ruler.StoreConfig.Swift.Config,
config.StorageConfig.Swift.Config,
} {
assert.Equal(t, 3, actual.AuthVersion)
assert.Equal(t, "http://example.com", actual.AuthURL)
assert.Equal(t, "steve", actual.Username)
assert.Equal(t, "example.com", actual.UserDomainName)
assert.Equal(t, "1", actual.UserDomainID)
assert.Equal(t, "27", actual.UserID)
assert.Equal(t, "supersecret", actual.Password)
assert.Equal(t, "2", actual.DomainID)
assert.Equal(t, "test.com", actual.DomainName)
assert.Equal(t, "13", actual.ProjectID)
assert.Equal(t, "tower", actual.ProjectName)
assert.Equal(t, "3", actual.ProjectDomainID)
assert.Equal(t, "tower.com", actual.ProjectDomainName)
assert.Equal(t, "us-east1", actual.RegionName)
assert.Equal(t, "tupperware", actual.ContainerName)
assert.Equal(t, 6, actual.MaxRetries)
assert.Equal(t, 5*time.Minute, actual.ConnectTimeout)
assert.Equal(t, 5*time.Second, actual.RequestTimeout)
}
//should remain empty
assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS)
assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Local, config.Ruler.StoreConfig.Local)
//should remain empty
assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig)
assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config)
assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig)
assert.EqualValues(t, defaults.StorageConfig.FSConfig, config.StorageConfig.FSConfig)
})
t.Run("when common filesystem/local config is provided, ruler and storage config are defaulted to use it", func(t *testing.T) {
fsConfig := `common:
storage:
filesystem:
directory: /tmp/foo`
config, defaults := testContext(fsConfig, nil)
assert.Equal(t, "local", config.Ruler.StoreConfig.Type)
for _, actual := range []cortex_local.Config{
config.Ruler.StoreConfig.Local,
config.StorageConfig.FSConfig.ToCortexLocalConfig(),
} {
assert.Equal(t, "/tmp/foo", actual.Directory)
}
//should remain empty
assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS)
assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Azure, config.Ruler.StoreConfig.Azure)
assert.EqualValues(t, defaults.Ruler.StoreConfig.Swift, config.Ruler.StoreConfig.Swift)
//should remain empty
assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig)
assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config)
assert.EqualValues(t, defaults.StorageConfig.AzureStorageConfig, config.StorageConfig.AzureStorageConfig)
assert.EqualValues(t, defaults.StorageConfig.Swift, config.StorageConfig.Swift)
})
t.Run("explicit ruler storage object storage configuration provided via config file is preserved", func(t *testing.T) {
specificRulerConfig := `common:
storage:
gcs:
bucket_name: foobar
chunk_buffer_size: 27
request_timeout: 5m
ruler:
storage:
type: s3
s3:
endpoint: s3://foo-bucket
region: us-east1
access_key_id: abc123
secret_access_key: def789`
config, defaults := testContext(specificRulerConfig, nil)
assert.Equal(t, "s3", config.Ruler.StoreConfig.Type)
assert.Equal(t, "s3://foo-bucket", config.Ruler.StoreConfig.S3.Endpoint)
assert.Equal(t, "us-east1", config.Ruler.StoreConfig.S3.Region)
assert.Equal(t, "abc123", config.Ruler.StoreConfig.S3.AccessKeyID)
assert.Equal(t, "def789", config.Ruler.StoreConfig.S3.SecretAccessKey)
//should remain empty
assert.EqualValues(t, defaults.Ruler.StoreConfig.GCS, config.Ruler.StoreConfig.GCS)
//should be set by common config
assert.EqualValues(t, "foobar", config.StorageConfig.GCSConfig.BucketName)
assert.EqualValues(t, 27, config.StorageConfig.GCSConfig.ChunkBufferSize)
assert.EqualValues(t, 5*time.Minute, config.StorageConfig.GCSConfig.RequestTimeout)
//should remain empty
assert.EqualValues(t, defaults.StorageConfig.AWSStorageConfig.S3Config, config.StorageConfig.AWSStorageConfig.S3Config)
})
t.Run("explicit storage config provided via config file is preserved", func(t *testing.T) {
specificRulerConfig := `common:
storage:
gcs:
bucket_name: foobar
chunk_buffer_size: 27
request_timeout: 5m
storage_config:
aws:
endpoint: s3://foo-bucket
region: us-east1
access_key_id: abc123
secret_access_key: def789`
config, defaults := testContext(specificRulerConfig, nil)
assert.Equal(t, "s3://foo-bucket", config.StorageConfig.AWSStorageConfig.S3Config.Endpoint)
assert.Equal(t, "us-east1", config.StorageConfig.AWSStorageConfig.S3Config.Region)
assert.Equal(t, "abc123", config.StorageConfig.AWSStorageConfig.S3Config.AccessKeyID)
assert.Equal(t, "def789", config.StorageConfig.AWSStorageConfig.S3Config.SecretAccessKey)
//should remain empty
assert.EqualValues(t, defaults.StorageConfig.GCSConfig, config.StorageConfig.GCSConfig)
//should be set by common config
assert.EqualValues(t, "foobar", config.Ruler.StoreConfig.GCS.BucketName)
assert.EqualValues(t, 27, config.Ruler.StoreConfig.GCS.ChunkBufferSize)
assert.EqualValues(t, 5*time.Minute, config.Ruler.StoreConfig.GCS.RequestTimeout)
//should remain empty
assert.EqualValues(t, defaults.Ruler.StoreConfig.S3, config.Ruler.StoreConfig.S3)
})
t.Run("when common object store config is provided, compactor shared store is defaulted to use it", func(t *testing.T) {
for _, tt := range []struct {
configString string
expected string
}{
{
configString: `common:
storage:
s3:
s3: s3://foo-bucket/example
access_key_id: abc123
secret_access_key: def789`,
expected: storage.StorageTypeS3,
},
{
configString: `common:
storage:
gcs:
bucket_name: foobar`,
expected: storage.StorageTypeGCS,
},
{
configString: `common:
storage:
azure:
account_name: 3rd_planet
account_key: water`,
expected: storage.StorageTypeAzure,
},
{
configString: `common:
storage:
swift:
username: steve
password: supersecret`,
expected: storage.StorageTypeSwift,
},
{
configString: `common:
storage:
filesystem:
directory: /tmp/foo`,
expected: storage.StorageTypeFileSystem,
},
} {
config, _ := testContext(tt.configString, nil)
assert.Equal(t, tt.expected, config.CompactorConfig.SharedStoreType)
}
})
t.Run("explicit compactor shared_store config is preserved", func(t *testing.T) {
configString := `common:
storage:
s3:
s3: s3://foo-bucket/example
access_key_id: abc123
secret_access_key: def789
compactor:
shared_store: gcs`
config, _ := testContext(configString, nil)
assert.Equal(t, "gcs", config.CompactorConfig.SharedStoreType)
})
})
}
// Can't use a totally empty yaml file or it causes weird behavior in the unmarhsalling

@ -28,6 +28,7 @@ import (
awscommon "github.com/weaveworks/common/aws"
"github.com/weaveworks/common/instrument"
cortex_aws "github.com/cortexproject/cortex/pkg/chunk/aws"
cortex_s3 "github.com/cortexproject/cortex/pkg/storage/bucket/s3"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/dskit/flagext"
@ -125,6 +126,32 @@ func (cfg *S3Config) Validate() error {
return nil
}
func (cfg *S3Config) ToCortexS3Config() cortex_aws.S3Config {
return cortex_aws.S3Config{
S3: cfg.S3,
S3ForcePathStyle: cfg.S3ForcePathStyle,
BucketNames: cfg.BucketNames,
Endpoint: cfg.Endpoint,
Region: cfg.Region,
AccessKeyID: cfg.AccessKeyID,
SecretAccessKey: cfg.SecretAccessKey,
Insecure: cfg.Insecure,
SSEEncryption: cfg.SSEEncryption,
HTTPConfig: cfg.HTTPConfig.ToCortexHTTPConfig(),
SignatureVersion: cfg.SignatureVersion,
SSEConfig: cfg.SSEConfig,
Inject: cortex_aws.InjectRequestMiddleware(cfg.Inject),
}
}
func (cfg *HTTPConfig) ToCortexHTTPConfig() cortex_aws.HTTPConfig {
return cortex_aws.HTTPConfig{
IdleConnTimeout: cfg.IdleConnTimeout,
ResponseHeaderTimeout: cfg.ResponseHeaderTimeout,
InsecureSkipVerify: cfg.InsecureSkipVerify,
}
}
type S3ObjectClient struct {
bucketNames []string
S3 s3iface.S3API

@ -13,6 +13,7 @@ import (
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
cortex_azure "github.com/cortexproject/cortex/pkg/chunk/azure"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/grafana/dskit/flagext"
@ -87,6 +88,22 @@ func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS
f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.")
}
func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig {
return cortex_azure.BlobStorageConfig{
Environment: c.Environment,
ContainerName: c.ContainerName,
AccountName: c.AccountName,
AccountKey: c.AccountKey,
DownloadBufferSize: c.DownloadBufferSize,
UploadBufferSize: c.UploadBufferSize,
UploadBufferCount: c.UploadBufferCount,
RequestTimeout: c.RequestTimeout,
MaxRetries: c.MaxRetries,
MinRetryDelay: c.MinRetryDelay,
MaxRetryDelay: c.MaxRetryDelay,
}
}
// BlobStorage is used to interact with azure blob storage for setting or getting time series chunks.
// Implements ObjectStorage
type BlobStorage struct {

@ -7,6 +7,7 @@ import (
"time"
"cloud.google.com/go/storage"
cortex_gcp "github.com/cortexproject/cortex/pkg/chunk/gcp"
"github.com/pkg/errors"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
@ -42,6 +43,15 @@ func (cfg *GCSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.EnableOpenCensus, prefix+"gcs.enable-opencensus", true, "Enabled OpenCensus (OC) instrumentation for all requests.")
}
func (cfg *GCSConfig) ToCortexGCSConfig() cortex_gcp.GCSConfig {
return cortex_gcp.GCSConfig{
BucketName: cfg.BucketName,
ChunkBufferSize: cfg.ChunkBufferSize,
RequestTimeout: cfg.RequestTimeout,
EnableOpenCensus: cfg.EnableOpenCensus,
}
}
// NewGCSObjectClient makes a new chunk.Client that writes chunks to GCS.
func NewGCSObjectClient(ctx context.Context, cfg GCSConfig) (*GCSObjectClient, error) {
var opts []option.ClientOption

@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/runutil"
cortex_local "github.com/cortexproject/cortex/pkg/ruler/rulestore/local"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/grafana/loki/pkg/storage/chunk"
@ -34,6 +35,12 @@ func (cfg *FSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Directory, prefix+"local.chunk-directory", "", "Directory to store chunks in.")
}
func (cfg *FSConfig) ToCortexLocalConfig() cortex_local.Config {
return cortex_local.Config{
Directory: cfg.Directory,
}
}
// FSObjectClient holds config for filesystem as object store
type FSObjectClient struct {
cfg FSConfig

@ -11,6 +11,7 @@ import (
"github.com/ncw/swift"
"github.com/pkg/errors"
cortex_openstack "github.com/cortexproject/cortex/pkg/chunk/openstack"
cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift"
"github.com/cortexproject/cortex/pkg/util/log"
@ -42,6 +43,12 @@ func (cfg *SwiftConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
cfg.Config.RegisterFlagsWithPrefix(prefix, f)
}
func (cfg *SwiftConfig) ToCortexSwiftConfig() cortex_openstack.SwiftConfig {
return cortex_openstack.SwiftConfig{
Config: cfg.Config,
}
}
// NewSwiftObjectClient makes a new chunk.Client that writes chunks to OpenStack Swift.
func NewSwiftObjectClient(cfg SwiftConfig) (*SwiftObjectClient, error) {
log.WarnExperimentalUse("OpenStack Swift Storage")

Loading…
Cancel
Save