feat(thanos): add support for aliyun oss and baidu bos (#14891)

pull/14941/head
Ashwanth 1 year ago committed by GitHub
parent a629212ceb
commit fb6789d9df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 17
      pkg/storage/bucket/bos/bucket_client.go
  2. 26
      pkg/storage/bucket/bos/config.go
  3. 22
      pkg/storage/bucket/client.go
  4. 18
      pkg/storage/bucket/oss/bucket_client.go
  5. 28
      pkg/storage/bucket/oss/config.go
  6. 65
      vendor/github.com/thanos-io/objstore/clientutil/parse.go
  7. 440
      vendor/github.com/thanos-io/objstore/providers/bos/bos.go
  8. 426
      vendor/github.com/thanos-io/objstore/providers/oss/oss.go
  9. 3
      vendor/modules.txt

@ -0,0 +1,17 @@
package bos
import (
"github.com/go-kit/log"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/bos"
)
// NewBucketClient creates a new Baidu Cloud BOS bucket client.
func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
	return bos.NewBucketWithConfig(logger, bos.Config{
		Bucket:    cfg.Bucket,
		Endpoint:  cfg.Endpoint,
		AccessKey: cfg.AccessKey,
		SecretKey: cfg.SecretKey.String(),
	}, name)
}

@ -0,0 +1,26 @@
package bos
import (
"flag"
"github.com/grafana/dskit/flagext"
)
// Config holds the configuration for the Baidu Cloud BOS client.
type Config struct {
	// Bucket is the name of the BOS bucket to operate on.
	Bucket string `yaml:"bucket"`
	// Endpoint is the BOS service endpoint to connect to.
	Endpoint string `yaml:"endpoint"`
	// AccessKey is the Baidu Cloud Engine (BCE) Access Key ID.
	AccessKey string `yaml:"access_key"`
	// SecretKey is the BCE Secret Access Key, wrapped in flagext.Secret so it
	// is masked when the configuration is printed (dskit behavior).
	SecretKey flagext.Secret `yaml:"secret_key"`
}
// RegisterFlags registers the flags for Baidu Cloud BOS storage config with no prefix.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.RegisterFlagsWithPrefix("", f)
}
// RegisterFlagsWithPrefix registers the flags for Baidu Cloud BOS storage config,
// prepending prefix to every flag name.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.StringVar(&cfg.Bucket, prefix+"bos.bucket", "", "Name of BOS bucket.")
	f.StringVar(&cfg.Endpoint, prefix+"bos.endpoint", "", "BOS endpoint to connect to.")
	f.StringVar(&cfg.AccessKey, prefix+"bos.access-key", "", "Baidu Cloud Engine (BCE) Access Key ID.")
	f.Var(&cfg.SecretKey, prefix+"bos.secret-key", "Baidu Cloud Engine (BCE) Secret Access Key.")
}

@ -15,8 +15,10 @@ import (
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"
"github.com/grafana/loki/v3/pkg/storage/bucket/azure"
"github.com/grafana/loki/v3/pkg/storage/bucket/bos"
"github.com/grafana/loki/v3/pkg/storage/bucket/filesystem"
"github.com/grafana/loki/v3/pkg/storage/bucket/gcs"
"github.com/grafana/loki/v3/pkg/storage/bucket/oss"
"github.com/grafana/loki/v3/pkg/storage/bucket/s3"
"github.com/grafana/loki/v3/pkg/storage/bucket/swift"
)
@ -37,12 +39,18 @@ const (
// Filesystem is the value for the filesystem storage backend.
Filesystem = "filesystem"
// Alibaba is the value for the Alibaba Cloud OSS storage backend
Alibaba = "alibabacloud"
// BOS is the value for the Baidu Cloud BOS storage backend
BOS = "bos"
// validPrefixCharactersRegex allows only alphanumeric characters to prevent subtle bugs and simplify validation
validPrefixCharactersRegex = `^[\da-zA-Z]+$`
)
var (
SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem}
SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem, Alibaba, BOS}
ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters")
@ -73,6 +81,8 @@ type StorageBackendConfig struct {
Azure azure.Config `yaml:"azure"`
Swift swift.Config `yaml:"swift"`
Filesystem filesystem.Config `yaml:"filesystem"`
Alibaba oss.Config `yaml:"alibaba"`
BOS bos.Config `yaml:"bos"`
// Used to inject additional backends into the config. Allows for this config to
// be embedded in multiple contexts and support non-object storage based backends.
@ -95,6 +105,8 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(pref
cfg.Azure.RegisterFlagsWithPrefix(prefix, f)
cfg.Swift.RegisterFlagsWithPrefix(prefix, f)
cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f)
cfg.Alibaba.RegisterFlagsWithPrefix(prefix, f)
cfg.BOS.RegisterFlagsWithPrefix(prefix, f)
}
func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@ -154,7 +166,7 @@ func (cfg *Config) disableRetries(backend string) error {
cfg.Azure.MaxRetries = 1
case Swift:
cfg.Swift.MaxRetries = 1
case Filesystem:
case Filesystem, Alibaba, BOS:
// do nothing
default:
return fmt.Errorf("cannot disable retries for backend: %s", backend)
@ -173,7 +185,7 @@ func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) erro
cfg.Azure.Transport = rt
case Swift:
cfg.Swift.Transport = rt
case Filesystem:
case Filesystem, Alibaba, BOS:
// do nothing
default:
return fmt.Errorf("cannot configure transport for backend: %s", backend)
@ -201,6 +213,10 @@ func NewClient(ctx context.Context, backend string, cfg Config, name string, log
client, err = swift.NewBucketClient(cfg.Swift, name, logger, instrumentTransport())
case Filesystem:
client, err = filesystem.NewBucketClient(cfg.Filesystem)
case Alibaba:
client, err = oss.NewBucketClient(cfg.Alibaba, name, logger)
case BOS:
client, err = bos.NewBucketClient(cfg.BOS, name, logger)
default:
return nil, ErrUnsupportedStorageBackend
}

@ -0,0 +1,18 @@
package oss
import (
"github.com/go-kit/log"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/oss"
)
// NewBucketClient creates a new Alibaba Cloud OSS bucket client.
func NewBucketClient(cfg Config, component string, logger log.Logger) (objstore.Bucket, error) {
	return oss.NewBucketWithConfig(logger, oss.Config{
		Endpoint:        cfg.Endpoint,
		Bucket:          cfg.Bucket,
		AccessKeyID:     cfg.AccessKeyID,
		AccessKeySecret: cfg.AccessKeySecret.String(),
	}, component, nil)
}

@ -0,0 +1,28 @@
package oss
import (
"flag"
"github.com/grafana/dskit/flagext"
)
// Config holds the configuration for the Alibaba Cloud OSS client.
type Config struct {
	// Endpoint is the OSS service endpoint to connect to.
	Endpoint string `yaml:"endpoint"`
	// Bucket is the name of the OSS bucket to operate on.
	Bucket string `yaml:"bucket"`
	// AccessKeyID is the Alibaba Cloud Access Key ID.
	AccessKeyID string `yaml:"access_key_id"`
	// AccessKeySecret is the Access Key Secret, wrapped in flagext.Secret so it
	// is masked when the configuration is printed (dskit behavior).
	AccessKeySecret flagext.Secret `yaml:"access_key_secret"`
}
// RegisterFlags registers the flags for Alibaba Cloud OSS storage config with no prefix.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.RegisterFlagsWithPrefix("", f)
}
// RegisterFlagsWithPrefix registers the flags for Alibaba Cloud OSS storage config,
// prepending prefix to every flag name.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	// Help strings normalized (capitalization, trailing period) to match the
	// sibling BOS config and the rest of the storage flag descriptions.
	f.StringVar(&cfg.Bucket, prefix+"oss.bucketname", "", "Name of OSS bucket.")
	f.StringVar(&cfg.Endpoint, prefix+"oss.endpoint", "", "Endpoint to connect to.")
	f.StringVar(&cfg.AccessKeyID, prefix+"oss.access-key-id", "", "Alibaba Cloud Access Key ID.")
	f.Var(&cfg.AccessKeySecret, prefix+"oss.access-key-secret", "Alibaba Cloud Secret Access Key.")
}

@ -0,0 +1,65 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package clientutil
import (
"net/http"
"strconv"
"time"
"github.com/pkg/errors"
)
// ParseContentLength returns the content length (in bytes) parsed from the
// Content-Length HTTP header of m.
func ParseContentLength(m http.Header) (int64, error) {
	const name = "Content-Length"
	values, ok := m[name]
	if !ok {
		return 0, errors.Errorf("%s header not found", name)
	}
	if len(values) == 0 {
		return 0, errors.Errorf("%s header has no values", name)
	}
	parsed, err := strconv.ParseInt(values[0], 10, 64)
	if err != nil {
		return 0, errors.Wrapf(err, "convert %s", name)
	}
	return parsed, nil
}
// ParseLastModified returns the timestamp parsed from the Last-Modified
// HTTP header of m. The second parameter f specifies the time layout to use;
// when f is empty, time.RFC3339 is used as the default.
func ParseLastModified(m http.Header, f string) (time.Time, error) {
	const name = "Last-Modified"
	values, ok := m[name]
	if !ok {
		return time.Time{}, errors.Errorf("%s header not found", name)
	}
	if len(values) == 0 {
		return time.Time{}, errors.Errorf("%s header has no values", name)
	}
	format := f
	if format == "" {
		format = time.RFC3339
	}
	parsed, err := time.Parse(format, values[0])
	if err != nil {
		return time.Time{}, errors.Wrapf(err, "parse %s", name)
	}
	return parsed, nil
}

@ -0,0 +1,440 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package bos
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"os"
"strings"
"testing"
"time"
"github.com/baidubce/bce-sdk-go/bce"
"github.com/baidubce/bce-sdk-go/services/bos"
"github.com/baidubce/bce-sdk-go/services/bos/api"
"github.com/go-kit/log"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
"github.com/thanos-io/objstore"
)
// partSize is the fixed chunk size used by Upload for multipart uploads (128 MiB).
const partSize = 1024 * 1024 * 128
// Bucket implements the store.Bucket interface against bos-compatible (Baidu
// Object Storage) APIs.
type Bucket struct {
	logger log.Logger  // logger used by this bucket.
	client *bos.Client // underlying Baidu BCE SDK client.
	name   string      // bucket name that all operations are scoped to.
}
// Config encapsulates the necessary config values to instantiate a BOS client.
// All four fields are required; see validate.
type Config struct {
	Bucket    string `yaml:"bucket"`     // bucket name.
	Endpoint  string `yaml:"endpoint"`   // BOS service endpoint.
	AccessKey string `yaml:"access_key"` // BCE Access Key ID.
	SecretKey string `yaml:"secret_key"` // BCE Secret Access Key.
}
// validate reports an error when any required BOS setting is missing.
func (conf *Config) validate() error {
	for _, v := range []string{conf.Bucket, conf.Endpoint, conf.AccessKey, conf.SecretKey} {
		if v == "" {
			return errors.New("insufficient BOS configuration information")
		}
	}
	return nil
}
// parseConfig unmarshals a YAML buffer into a Config.
func parseConfig(conf []byte) (Config, error) {
	var config Config
	if err := yaml.Unmarshal(conf, &config); err != nil {
		return Config{}, err
	}
	return config, nil
}
// NewBucket creates a new BOS bucket from YAML-encoded configuration.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
	// TODO(https://github.com/thanos-io/objstore/pull/150): Add support for roundtripper wrapper.
	if logger == nil {
		logger = log.NewNopLogger()
	}
	cfg, err := parseConfig(conf)
	if err != nil {
		return nil, errors.Wrap(err, "parsing BOS configuration")
	}
	return NewBucketWithConfig(logger, cfg, component)
}
// NewBucketWithConfig returns a new Bucket using the provided BOS config struct.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
	if err := config.validate(); err != nil {
		return nil, errors.Wrap(err, "validating BOS configuration")
	}
	client, err := bos.NewClient(config.AccessKey, config.SecretKey, config.Endpoint)
	if err != nil {
		return nil, errors.Wrap(err, "creating BOS client")
	}
	// Tag requests with the calling component for easier server-side identification.
	client.Config.UserAgent = fmt.Sprintf("thanos-%s", component)
	return &Bucket{
		logger: logger,
		client: client,
		name:   config.Bucket,
	}, nil
}
// Name returns the configured BOS bucket name for this provider.
func (b *Bucket) Name() string {
	return b.name
}
// Delete removes the object with the given name.
// The context is unused because the BCE SDK call is not context-aware.
func (b *Bucket) Delete(_ context.Context, name string) error {
	return b.client.DeleteObject(b.name, name)
}
// Upload the contents of the reader as an object into the bucket.
//
// The reader's total size must be discoverable via objstore.TryToGetSize.
// Objects smaller than partSize are uploaded with a single PutObject call;
// larger objects use the BOS multipart-upload API in partSize chunks.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
	size, err := objstore.TryToGetSize(r)
	if err != nil {
		return errors.Wrapf(err, "getting size of %s", name)
	}
	// partNums is the number of full partSize chunks; lastSlice is the size of
	// the trailing partial chunk (possibly zero).
	partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
	if partNums == 0 {
		// Small object: a single non-multipart upload suffices.
		body, err := bce.NewBodyFromSizedReader(r, lastSlice)
		if err != nil {
			return errors.Wrapf(err, "failed to create SizedReader for %s", name)
		}
		if _, err := b.client.PutObject(b.name, name, body, nil); err != nil {
			return errors.Wrapf(err, "failed to upload %s", name)
		}
		return nil
	}
	result, err := b.client.BasicInitiateMultipartUpload(b.name, name)
	if err != nil {
		return errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name)
	}
	// uploadEveryPart reads the next partSize bytes from r and uploads them as
	// part number `part`. On upload failure the whole multipart upload is aborted.
	uploadEveryPart := func(partSize int64, part int, uploadId string) (string, error) {
		body, err := bce.NewBodyFromSizedReader(r, partSize)
		if err != nil {
			return "", err
		}
		etag, err := b.client.UploadPart(b.name, name, uploadId, part, body, nil)
		if err != nil {
			// Abort first so the server drops partial state; the original upload
			// error (or the abort error) is returned either way.
			if err := b.client.AbortMultipartUpload(b.name, name, uploadId); err != nil {
				return etag, err
			}
			return etag, err
		}
		return etag, nil
	}
	var parts []api.UploadInfoType
	for part := 1; part <= partNums; part++ {
		etag, err := uploadEveryPart(partSize, part, result.UploadId)
		if err != nil {
			return errors.Wrapf(err, "failed to upload part %d for %s", part, name)
		}
		parts = append(parts, api.UploadInfoType{PartNumber: part, ETag: etag})
	}
	if lastSlice != 0 {
		// Upload the trailing partial chunk as the final part.
		etag, err := uploadEveryPart(lastSlice, partNums+1, result.UploadId)
		if err != nil {
			return errors.Wrapf(err, "failed to upload the last part for %s", name)
		}
		parts = append(parts, api.UploadInfoType{PartNumber: partNums + 1, ETag: etag})
	}
	if _, err := b.client.CompleteMultipartUploadFromStruct(b.name, name, result.UploadId, &api.CompleteMultipartUploadArgs{Parts: parts}); err != nil {
		return errors.Wrapf(err, "failed to set %s upload completed", name)
	}
	return nil
}
// SupportedIterOptions returns the iteration options this provider honors:
// recursive listing and last-modified timestamps.
func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
	return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt}
}
// IterWithAttributes calls f with the attributes of each object found under dir,
// paging through the BOS listing API 1000 keys at a time. Without the Recursive
// option, listing stops at the first directory delimiter and sub-directories are
// reported by prefix name only. Supported options: Recursive, UpdatedAt.
func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
	if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
		return err
	}
	if dir != "" {
		// Normalize to exactly one trailing delimiter so the prefix matches directory contents.
		dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim
	}
	delimiter := objstore.DirDelim
	params := objstore.ApplyIterOptions(options...)
	if params.Recursive {
		// An empty delimiter makes BOS return all keys under the prefix, not just one level.
		delimiter = ""
	}
	var marker string
	for {
		// Honor context cancellation between pages.
		if err := ctx.Err(); err != nil {
			return err
		}
		objects, err := b.client.ListObjects(b.name, &api.ListObjectsArgs{
			Delimiter: delimiter,
			Marker:    marker,
			MaxKeys:   1000,
			Prefix:    dir,
		})
		if err != nil {
			return err
		}
		marker = objects.NextMarker
		for _, object := range objects.Contents {
			attrs := objstore.IterObjectAttributes{
				Name: object.Key,
			}
			if params.LastModified && object.LastModified != "" {
				// Listing timestamps are parsed as RFC1123 — assumes BOS returns that
				// layout here; TODO confirm against the BCE SDK documentation.
				lastModified, err := time.Parse(time.RFC1123, object.LastModified)
				if err != nil {
					return fmt.Errorf("iter: get last modified: %w", err)
				}
				attrs.SetLastModified(lastModified)
			}
			if err := f(attrs); err != nil {
				return err
			}
		}
		for _, object := range objects.CommonPrefixes {
			// Sub-directories carry no attributes beyond their prefix name.
			if err := f(objstore.IterObjectAttributes{Name: object.Prefix}); err != nil {
				return err
			}
		}
		if !objects.IsTruncated {
			break
		}
	}
	return nil
}
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error {
	// Forward only the recursive option; attribute-related options are unused here.
	var recursiveOnly []objstore.IterOption
	for _, o := range opts {
		if o.Type == objstore.Recursive {
			recursiveOnly = append(recursiveOnly, o)
			break
		}
	}
	callback := func(attrs objstore.IterObjectAttributes) error {
		return f(attrs.Name)
	}
	return b.IterWithAttributes(ctx, dir, callback, recursiveOnly...)
}
// Get returns a reader for the given object name, streaming the whole object.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	return b.getRange(ctx, b.name, name, 0, -1)
}
// GetRange returns a new range reader for the given object name and range.
// A length of -1 reads until the end of the object.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
	return b.getRange(ctx, b.name, name, off, length)
}
// Exists checks if the given object exists in the bucket.
func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
	if _, err := b.client.GetObjectMeta(b.name, name); err != nil {
		if b.IsObjNotFoundErr(err) {
			return false, nil
		}
		return false, errors.Wrapf(err, "getting object metadata of %s", name)
	}
	return true, nil
}
// Close implements objstore.Bucket; there is nothing to release for this client.
func (b *Bucket) Close() error {
	return nil
}
// ObjectSize returns the size of the specified object in bytes,
// taken from the object's metadata.
func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) {
	objMeta, err := b.client.GetObjectMeta(b.name, name)
	if err != nil {
		return 0, err
	}
	return uint64(objMeta.ContentLength), nil
}
// Attributes returns size and last-modified information about the specified object.
func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) {
	objMeta, err := b.client.GetObjectMeta(b.name, name)
	if err != nil {
		// Fix typo in the original message ("gettting objectmeta") and align
		// wording with Exists.
		return objstore.ObjectAttributes{}, errors.Wrapf(err, "getting object metadata of %s", name)
	}
	// Metadata timestamps are parsed as RFC1123 — assumes BOS returns that
	// layout; TODO confirm against the BCE SDK documentation.
	lastModified, err := time.Parse(time.RFC1123, objMeta.LastModified)
	if err != nil {
		return objstore.ObjectAttributes{}, err
	}
	return objstore.ObjectAttributes{
		Size:         objMeta.ContentLength,
		LastModified: lastModified,
	}, nil
}
// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
	if serviceErr, ok := errors.Cause(err).(*bce.BceServiceError); ok {
		return serviceErr.StatusCode == http.StatusNotFound || serviceErr.Code == "NoSuchKey"
	}
	return false
}
// IsAccessDeniedErr returns true if access to object is denied.
// Always false here: access-denied detection is not implemented for this provider.
func (b *Bucket) IsAccessDeniedErr(_ error) bool {
	return false
}
// getRange fetches bytes [off, off+length) of objectKey from bucketName.
// A length of -1 means "until the end of the object".
func (b *Bucket) getRange(_ context.Context, bucketName, objectKey string, off, length int64) (io.ReadCloser, error) {
	if len(objectKey) == 0 {
		return nil, errors.Errorf("given object name should not be empty")
	}
	// BCE GetObject takes [start] or [start, end] (inclusive) byte offsets.
	ranges := []int64{off}
	if length != -1 {
		ranges = append(ranges, off+length-1)
	}
	obj, err := b.client.GetObject(bucketName, objectKey, map[string]string{}, ranges...)
	if err != nil {
		return nil, err
	}
	// err is known nil here; return an explicit nil instead of the stale variable.
	return objstore.ObjectSizerReadCloser{
		ReadCloser: obj.Body,
		Size: func() (int64, error) {
			return obj.ContentLength, nil
		},
	}, nil
}
// configFromEnv builds a Config from the BOS_* environment variables.
func configFromEnv() Config {
	return Config{
		Bucket:    os.Getenv("BOS_BUCKET"),
		Endpoint:  os.Getenv("BOS_ENDPOINT"),
		AccessKey: os.Getenv("BOS_ACCESS_KEY"),
		SecretKey: os.Getenv("BOS_SECRET_KEY"),
	}
}
// NewTestBucket creates a test bkt client that before returning creates a temporary
// bucket. The returned close function empties and deletes the bucket.
// Configuration comes from the BOS_* environment variables; an existing bucket is
// only reused when THANOS_ALLOW_EXISTING_BUCKET_USE is set.
func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
	c := configFromEnv()
	if err := validateForTest(c); err != nil {
		return nil, nil, err
	}
	if c.Bucket != "" {
		if os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "" {
			return nil, nil, errors.New("BOS_BUCKET is defined. Normally this tests will create temporary bucket " +
				"and delete it after test. Unset BOS_BUCKET env variable to use default logic. If you really want to run " +
				"tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true. WARNING: That bucket " +
				"needs to be manually cleared. This means that it is only useful to run one test in a time. This is due " +
				"to safety (accidentally pointing prod bucket for test) as well as BOS not being fully strong consistent.")
		}
		bc, err := yaml.Marshal(c)
		if err != nil {
			return nil, nil, err
		}
		b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
		if err != nil {
			return nil, nil, err
		}
		// Refuse to reuse a non-empty bucket to avoid clobbering real data.
		if err := b.Iter(context.Background(), "", func(f string) error {
			return errors.Errorf("bucket %s is not empty", c.Bucket)
		}); err != nil {
			return nil, nil, errors.Wrapf(err, "checking bucket %s", c.Bucket)
		}
		t.Log("WARNING. Reusing", c.Bucket, "BOS bucket for BOS tests. Manual cleanup afterwards is required")
		return b, func() {}, nil
	}
	// No bucket given: create a uniquely named temporary one.
	src := rand.NewSource(time.Now().UnixNano())
	tmpBucketName := strings.Replace(fmt.Sprintf("test_%x", src.Int63()), "_", "-", -1)
	// BOS bucket names are length-limited; truncate the random name.
	if len(tmpBucketName) >= 31 {
		tmpBucketName = tmpBucketName[:31]
	}
	c.Bucket = tmpBucketName
	bc, err := yaml.Marshal(c)
	if err != nil {
		return nil, nil, err
	}
	b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
	if err != nil {
		return nil, nil, err
	}
	if _, err := b.client.PutBucket(b.name); err != nil {
		return nil, nil, err
	}
	t.Log("created temporary BOS bucket for BOS tests with name", tmpBucketName)
	// The close function empties and then deletes the temporary bucket.
	return b, func() {
		objstore.EmptyBucket(t, context.Background(), b)
		if err := b.client.DeleteBucket(b.name); err != nil {
			t.Logf("deleting bucket %s failed: %s", tmpBucketName, err)
		}
	}, nil
}
// validateForTest checks the settings required for tests (bucket name may be empty,
// since tests can create a temporary bucket).
func validateForTest(conf Config) error {
	if conf.Endpoint != "" && conf.AccessKey != "" && conf.SecretKey != "" {
		return nil
	}
	return errors.New("insufficient BOS configuration information")
}

@ -0,0 +1,426 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package oss
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
alioss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/go-kit/log"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/clientutil"
"github.com/thanos-io/objstore/exthttp"
)
// PartSize is the chunk size used by Upload for multipart uploads (128 MiB).
const PartSize = 1024 * 1024 * 128
// Config stores the configuration for the OSS bucket. All four fields are
// required; see validate.
type Config struct {
	Endpoint        string `yaml:"endpoint"`          // OSS service endpoint.
	Bucket          string `yaml:"bucket"`            // bucket name.
	AccessKeyID     string `yaml:"access_key_id"`     // Alibaba Cloud Access Key ID.
	AccessKeySecret string `yaml:"access_key_secret"` // Alibaba Cloud Access Key Secret.
}
// Bucket implements the store.Bucket interface against Alibaba Cloud OSS.
type Bucket struct {
	name   string         // bucket name that all operations are scoped to.
	logger log.Logger     // logger used by this bucket.
	client *alioss.Client // service-level OSS client (used for bucket management).
	config Config         // configuration this bucket was built from.
	bucket *alioss.Bucket // bucket-level handle used for object operations.
}
// NewTestBucket creates a test bucket client configured from the ALIYUNOSS_*
// environment variables. An existing bucket is reused only when both
// ALIYUNOSS_BUCKET is set and THANOS_ALLOW_EXISTING_BUCKET_USE=true; otherwise a
// temporary bucket is created by NewTestBucketFromConfig.
func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
	c := Config{
		Endpoint:        os.Getenv("ALIYUNOSS_ENDPOINT"),
		Bucket:          os.Getenv("ALIYUNOSS_BUCKET"),
		AccessKeyID:     os.Getenv("ALIYUNOSS_ACCESS_KEY_ID"),
		AccessKeySecret: os.Getenv("ALIYUNOSS_ACCESS_KEY_SECRET"),
	}
	if c.Endpoint == "" || c.AccessKeyID == "" || c.AccessKeySecret == "" {
		return nil, nil, errors.New("aliyun oss endpoint or access_key_id or access_key_secret " +
			"is not present in config file")
	}
	if c.Bucket != "" && os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "true" {
		t.Log("ALIYUNOSS_BUCKET is defined. Normally this tests will create temporary bucket " +
			"and delete it after test. Unset ALIYUNOSS_BUCKET env variable to use default logic. If you really want to run " +
			"tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true.")
		return NewTestBucketFromConfig(t, c, true)
	}
	return NewTestBucketFromConfig(t, c, false)
}
// Upload the contents of the reader as an object into the bucket.
//
// The reader's total size must be discoverable via objstore.TryToGetSize.
// Objects up to PartSize bytes go through a single PutObject; larger ones use
// the OSS multipart-upload API in PartSize chunks.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
	// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
	size, err := objstore.TryToGetSize(r)
	if err != nil {
		return errors.Wrapf(err, "failed to get size apriori to upload %s", name)
	}
	// chunksnum full PartSize chunks plus a trailing slice of lastslice bytes (possibly zero).
	chunksnum, lastslice := int(math.Floor(float64(size)/PartSize)), size%PartSize
	ncloser := io.NopCloser(r)
	switch chunksnum {
	case 0:
		// Small object: single-shot upload.
		if err := b.bucket.PutObject(name, ncloser); err != nil {
			return errors.Wrap(err, "failed to upload oss object")
		}
	default:
		{
			init, err := b.bucket.InitiateMultipartUpload(name)
			if err != nil {
				return errors.Wrap(err, "failed to initiate multi-part upload")
			}
			chunk := 0
			// uploadEveryPart streams the next everypartsize bytes as part number cnk,
			// aborting the whole multipart upload on failure.
			uploadEveryPart := func(everypartsize int64, cnk int) (alioss.UploadPart, error) {
				prt, err := b.bucket.UploadPart(init, ncloser, everypartsize, cnk)
				if err != nil {
					if err := b.bucket.AbortMultipartUpload(init); err != nil {
						return prt, errors.Wrap(err, "failed to abort multi-part upload")
					}
					return prt, errors.Wrap(err, "failed to upload multi-part chunk")
				}
				return prt, nil
			}
			var parts []alioss.UploadPart
			for ; chunk < chunksnum; chunk++ {
				part, err := uploadEveryPart(PartSize, chunk+1)
				if err != nil {
					return errors.Wrap(err, "failed to upload every part")
				}
				parts = append(parts, part)
			}
			if lastslice != 0 {
				// Upload the remaining tail as the final part.
				part, err := uploadEveryPart(lastslice, chunksnum+1)
				if err != nil {
					return errors.Wrap(err, "failed to upload the last chunk")
				}
				parts = append(parts, part)
			}
			if _, err := b.bucket.CompleteMultipartUpload(init, parts); err != nil {
				return errors.Wrap(err, "failed to set multi-part upload completive")
			}
		}
	}
	return nil
}
// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
	// pkg/errors.Wrap returns nil when the wrapped error is nil.
	return errors.Wrap(b.bucket.DeleteObject(name), "delete oss object")
}
// Attributes returns size and last-modified information about the specified
// object, derived from its HTTP metadata headers.
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
	m, err := b.bucket.GetObjectMeta(name)
	if err != nil {
		return objstore.ObjectAttributes{}, err
	}
	size, err := clientutil.ParseContentLength(m)
	if err != nil {
		return objstore.ObjectAttributes{}, err
	}
	// aliyun oss return Last-Modified header in RFC1123 format.
	// see api doc for details: https://www.alibabacloud.com/help/doc-detail/31985.htm
	mod, err := clientutil.ParseLastModified(m, time.RFC1123)
	if err != nil {
		return objstore.ObjectAttributes{}, err
	}
	return objstore.ObjectAttributes{
		Size:         size,
		LastModified: mod,
	}, nil
}
// NewBucket returns a new Bucket built from YAML-encoded oss config values.
func NewBucket(logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
	cfg := Config{}
	if err := yaml.Unmarshal(conf, &cfg); err != nil {
		return nil, errors.Wrap(err, "parse aliyun oss config file failed")
	}
	return NewBucketWithConfig(logger, cfg, component, wrapRoundtripper)
}
// NewBucketWithConfig returns a new Bucket using the provided oss config struct.
// wrapRoundtripper, when non-nil, wraps the HTTP transport used by the OSS
// client (e.g. for instrumentation); component is not used by this constructor.
func NewBucketWithConfig(logger log.Logger, config Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) {
	if err := validate(config); err != nil {
		return nil, err
	}
	var clientOptions []alioss.ClientOption
	if wrapRoundtripper != nil {
		// Start from the default transport and let the caller wrap it.
		rt, err := exthttp.DefaultTransport(exthttp.DefaultHTTPConfig)
		if err != nil {
			return nil, err
		}
		clientOptions = append(clientOptions, func(client *alioss.Client) {
			client.HTTPClient = &http.Client{
				Transport: wrapRoundtripper(rt),
			}
		})
	}
	client, err := alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret, clientOptions...)
	if err != nil {
		return nil, errors.Wrap(err, "create aliyun oss client failed")
	}
	// Bind a bucket-level handle for object operations.
	bk, err := client.Bucket(config.Bucket)
	if err != nil {
		return nil, errors.Wrapf(err, "use aliyun oss bucket %s failed", config.Bucket)
	}
	bkt := &Bucket{
		logger: logger,
		client: client,
		name:   config.Bucket,
		config: config,
		bucket: bk,
	}
	return bkt, nil
}
// validate checks that every required config option is set.
func validate(config Config) error {
	switch {
	case config.Endpoint == "" || config.Bucket == "":
		return errors.New("aliyun oss endpoint or bucket is not present in config file")
	case config.AccessKeyID == "" || config.AccessKeySecret == "":
		return errors.New("aliyun oss access_key_id or access_key_secret is not present in config file")
	}
	return nil
}
// SupportedIterOptions returns the iteration options this provider honors:
// only recursive listing (no last-modified attributes).
func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
	return []objstore.IterOptionType{objstore.Recursive}
}
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory. Listing is paged via
// the OSS marker mechanism; without the Recursive option it stops at the first
// directory delimiter and reports common prefixes (sub-directories) as-is.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
	if dir != "" {
		// Normalize to exactly one trailing delimiter so the prefix matches directory contents.
		dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim
	}
	delimiter := alioss.Delimiter(objstore.DirDelim)
	if objstore.ApplyIterOptions(options...).Recursive {
		// A nil delimiter makes OSS return all keys under the prefix, not just one level.
		delimiter = nil
	}
	marker := alioss.Marker("")
	for {
		// Honor context cancellation between pages.
		if err := ctx.Err(); err != nil {
			return errors.Wrap(err, "context closed while iterating bucket")
		}
		objects, err := b.bucket.ListObjects(alioss.Prefix(dir), delimiter, marker)
		if err != nil {
			return errors.Wrap(err, "listing aliyun oss bucket failed")
		}
		marker = alioss.Marker(objects.NextMarker)
		for _, object := range objects.Objects {
			if err := f(object.Key); err != nil {
				return errors.Wrapf(err, "callback func invoke for object %s failed ", object.Key)
			}
		}
		for _, object := range objects.CommonPrefixes {
			if err := f(object); err != nil {
				return errors.Wrapf(err, "callback func invoke for directory %s failed", object)
			}
		}
		if !objects.IsTruncated {
			break
		}
	}
	return nil
}
// IterWithAttributes calls f with the attributes of each entry under dir.
// Only the options listed by SupportedIterOptions are accepted; attributes
// carry the object name only.
func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
	if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
		return err
	}
	wrap := func(name string) error {
		return f(objstore.IterObjectAttributes{Name: name})
	}
	return b.Iter(ctx, dir, wrap, options...)
}
// Name returns the configured OSS bucket name for this provider.
func (b *Bucket) Name() string {
	return b.name
}
// NewTestBucketFromConfig builds a bucket client for tests. When c.Bucket is
// empty a uniquely named temporary bucket is created; the returned close
// function then empties and deletes it. With reuseBucket set, the existing
// bucket is used only after verifying it is empty, and nothing is cleaned up.
func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool) (objstore.Bucket, func(), error) {
	if c.Bucket == "" {
		src := rand.NewSource(time.Now().UnixNano())
		bktToCreate := strings.ReplaceAll(fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()), "_", "-")
		// OSS bucket names are length-limited; truncate the random name.
		if len(bktToCreate) >= 63 {
			bktToCreate = bktToCreate[:63]
		}
		testclient, err := alioss.New(c.Endpoint, c.AccessKeyID, c.AccessKeySecret)
		if err != nil {
			return nil, nil, errors.Wrap(err, "create aliyun oss client failed")
		}
		if err := testclient.CreateBucket(bktToCreate); err != nil {
			return nil, nil, errors.Wrapf(err, "create aliyun oss bucket %s failed", bktToCreate)
		}
		c.Bucket = bktToCreate
	}
	bc, err := yaml.Marshal(c)
	if err != nil {
		return nil, nil, err
	}
	b, err := NewBucket(log.NewNopLogger(), bc, "thanos-aliyun-oss-test", nil)
	if err != nil {
		return nil, nil, err
	}
	if reuseBucket {
		// Refuse to reuse a non-empty bucket to avoid clobbering real data.
		if err := b.Iter(context.Background(), "", func(_ string) error {
			return errors.Errorf("bucket %s is not empty", c.Bucket)
		}); err != nil {
			return nil, nil, errors.Wrapf(err, "oss check bucket %s", c.Bucket)
		}
		t.Log("WARNING. Reusing", c.Bucket, "Aliyun OSS bucket for OSS tests. Manual cleanup afterwards is required")
		return b, func() {}, nil
	}
	// The close function empties and then deletes the temporary bucket.
	return b, func() {
		objstore.EmptyBucket(t, context.Background(), b)
		if err := b.client.DeleteBucket(c.Bucket); err != nil {
			t.Logf("deleting bucket %s failed: %s", c.Bucket, err)
		}
	}, nil
}
// Close implements objstore.Bucket; there is nothing to release for this client.
func (b *Bucket) Close() error { return nil }
// setRange returns an OSS range option covering bytes [start, end] (inclusive),
// clamping end to the object's size. It requires 0 <= start <= end.
func (b *Bucket) setRange(start, end int64, name string) (alioss.Option, error) {
	if start < 0 || start > end {
		return nil, errors.Errorf("Invalid range specified: start=%d end=%d", start, end)
	}
	header, err := b.bucket.GetObjectMeta(name)
	if err != nil {
		return nil, err
	}
	// Guard against a missing Content-Length header: the original indexed
	// header["Content-Length"][0] unconditionally, which panics when absent.
	lengths := header["Content-Length"]
	if len(lengths) == 0 {
		return nil, errors.Errorf("Content-Length header not found for object %s", name)
	}
	size, err := strconv.ParseInt(lengths[0], 10, 64)
	if err != nil {
		return nil, err
	}
	if end > size {
		end = size - 1
	}
	return alioss.Range(start, end), nil
}
// getRange fetches bytes [off, off+length) of the named object; length == -1
// reads to the end. The returned reader reports the object size when the
// response carries a parseable Content-Length header.
func (b *Bucket) getRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) {
	if name == "" {
		return nil, errors.New("given object name should not be empty")
	}
	var opts []alioss.Option
	if length != -1 {
		opt, err := b.setRange(off, off+length-1, name)
		if err != nil {
			return nil, err
		}
		opts = append(opts, opt)
	}
	// Use the alioss alias consistently with the rest of the file; the
	// unaliased duplicate "oss" import is otherwise redundant.
	resp, err := b.bucket.DoGetObject(&alioss.GetObjectRequest{ObjectKey: name}, opts)
	if err != nil {
		return nil, err
	}
	size, err := clientutil.ParseContentLength(resp.Response.Headers)
	if err == nil {
		return objstore.ObjectSizerReadCloser{
			ReadCloser: resp.Response,
			Size: func() (int64, error) {
				return size, nil
			},
		}, nil
	}
	// Size unknown: fall back to a plain reader.
	return resp.Response, nil
}
// Get returns a reader for the given object name, streaming the whole object.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	return b.getRange(ctx, name, 0, -1)
}
// GetRange returns a new range reader for the given object name and range.
// A length of -1 reads until the end of the object.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
	return b.getRange(ctx, name, off, length)
}
// Exists checks if the given object exists in the bucket.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
	exists, err := b.bucket.IsObjectExist(name)
	if err != nil {
		if b.IsObjNotFoundErr(err) {
			return false, nil
		}
		// Fix typo in the original message ("cloud not check").
		return false, errors.Wrap(err, "could not check if object exists")
	}
	return exists, nil
}
// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
	if serviceErr, ok := errors.Cause(err).(alioss.ServiceError); ok {
		return serviceErr.StatusCode == http.StatusNotFound
	}
	return false
}
// IsAccessDeniedErr returns true if access to object is denied.
func (b *Bucket) IsAccessDeniedErr(err error) bool {
	if serviceErr, ok := errors.Cause(err).(alioss.ServiceError); ok {
		return serviceErr.StatusCode == http.StatusForbidden
	}
	return false
}

@ -1588,10 +1588,13 @@ github.com/stretchr/testify/suite
# github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97
## explicit; go 1.22
github.com/thanos-io/objstore
github.com/thanos-io/objstore/clientutil
github.com/thanos-io/objstore/exthttp
github.com/thanos-io/objstore/providers/azure
github.com/thanos-io/objstore/providers/bos
github.com/thanos-io/objstore/providers/filesystem
github.com/thanos-io/objstore/providers/gcs
github.com/thanos-io/objstore/providers/oss
github.com/thanos-io/objstore/providers/s3
github.com/thanos-io/objstore/providers/swift
github.com/thanos-io/objstore/tracing/opentracing

Loading…
Cancel
Save