feat(thanos): rewrite azure blob keys using chunk delimiter (#15935)

pull/15952/head
Ashwanth 1 year ago committed by GitHub
parent ed863fe1be
commit f658331a21
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      pkg/storage/bucket/azure/bucket_client.go
  2. 2
      pkg/storage/bucket/azure/config.go
  3. 43
      pkg/storage/bucket/azure/key_rewrite_bucket.go
  4. 161
      pkg/storage/bucket/azure/key_rewrite_bucket_test.go

@ -9,7 +9,14 @@ import (
)
func NewBucketClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper) (objstore.Bucket, error) {
return newBucketClient(cfg, name, logger, wrapRT, azure.NewBucketWithConfig)
bucket, err := newBucketClient(cfg, name, logger, wrapRT, azure.NewBucketWithConfig)
if err != nil {
return nil, err
}
return &keyRewriteBucket{
Bucket: bucket,
delimiter: cfg.ChunkDelimiter,
}, nil
}
func newBucketClient(cfg Config, name string, logger log.Logger, wrapRT func(http.RoundTripper) http.RoundTripper, factory func(log.Logger, azure.Config, string, func(http.RoundTripper) http.RoundTripper) (*azure.Bucket, error)) (objstore.Bucket, error) {

@ -16,6 +16,7 @@ type Config struct {
Endpoint string `yaml:"endpoint_suffix"`
MaxRetries int `yaml:"max_retries"`
UserAssignedID string `yaml:"user_assigned_id"`
ChunkDelimiter string `yaml:"chunk_delimiter"`
// Allow upstream callers to inject a round tripper
Transport http.RoundTripper `yaml:"-"`
@ -35,4 +36,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN. If set to empty string, default endpoint suffix is used.")
f.IntVar(&cfg.MaxRetries, prefix+"azure.max-retries", 20, "Number of retries for recoverable errors")
f.StringVar(&cfg.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned managed identity. If empty, then System assigned identity is used.")
f.StringVar(&cfg.ChunkDelimiter, prefix+"azure.chunk-delimiter", "-", "Delimiter used to replace ':' in chunk IDs when storing chunks")
}

@ -0,0 +1,43 @@
package azure
import (
"context"
"io"
"strings"
"github.com/thanos-io/objstore"
)
// keyRewriteBucket wraps a bucket and replaces ":" with a configured delimiter in all object keys.
type keyRewriteBucket struct {
objstore.Bucket
delimiter string
}
func (b *keyRewriteBucket) rewriteKey(key string) string {
return strings.Replace(key, ":", b.delimiter, -1)
}
func (b *keyRewriteBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.Bucket.Get(ctx, b.rewriteKey(name))
}
func (b *keyRewriteBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.Bucket.GetRange(ctx, b.rewriteKey(name), off, length)
}
func (b *keyRewriteBucket) Exists(ctx context.Context, name string) (bool, error) {
return b.Bucket.Exists(ctx, b.rewriteKey(name))
}
func (b *keyRewriteBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return b.Bucket.Attributes(ctx, b.rewriteKey(name))
}
func (b *keyRewriteBucket) Upload(ctx context.Context, name string, r io.Reader) error {
return b.Bucket.Upload(ctx, b.rewriteKey(name), r)
}
func (b *keyRewriteBucket) Delete(ctx context.Context, name string) error {
return b.Bucket.Delete(ctx, b.rewriteKey(name))
}

@ -0,0 +1,161 @@
package azure
import (
"context"
"io"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
)
type mockBucket struct {
objstore.Bucket
lastKey string
iterKeys []string
}
func (m *mockBucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
m.lastKey = name
return nil, nil
}
func (m *mockBucket) GetRange(_ context.Context, name string, _, _ int64) (io.ReadCloser, error) {
m.lastKey = name
return nil, nil
}
func (m *mockBucket) Exists(_ context.Context, name string) (bool, error) {
m.lastKey = name
return false, nil
}
func (m *mockBucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) {
m.lastKey = name
return objstore.ObjectAttributes{}, nil
}
func (m *mockBucket) Upload(_ context.Context, name string, _ io.Reader) error {
m.lastKey = name
return nil
}
func (m *mockBucket) Delete(_ context.Context, name string) error {
m.lastKey = name
return nil
}
func (m *mockBucket) Iter(_ context.Context, _ string, f func(string) error, _ ...objstore.IterOption) error {
for _, key := range m.iterKeys {
if err := f(key); err != nil {
return err
}
}
return nil
}
func TestKeyRewriteBucket(t *testing.T) {
mock := &mockBucket{}
bucket := &keyRewriteBucket{
Bucket: mock,
delimiter: "-",
}
tests := []struct {
name string
input string
expected string
fn func(string) error
}{
{
name: "Get replaces colons",
input: "foo:bar:baz",
expected: "foo-bar-baz",
fn: func(key string) error {
_, err := bucket.Get(context.Background(), key)
return err
},
},
{
name: "GetRange replaces colons",
input: "foo:bar:baz",
expected: "foo-bar-baz",
fn: func(key string) error {
_, err := bucket.GetRange(context.Background(), key, 0, 10)
return err
},
},
{
name: "Exists replaces colons",
input: "foo:bar:baz",
expected: "foo-bar-baz",
fn: func(key string) error {
_, err := bucket.Exists(context.Background(), key)
return err
},
},
{
name: "Attributes replaces colons",
input: "foo:bar:baz",
expected: "foo-bar-baz",
fn: func(key string) error {
_, err := bucket.Attributes(context.Background(), key)
return err
},
},
{
name: "Upload replaces colons",
input: "foo:bar:baz",
expected: "foo-bar-baz",
fn: func(key string) error {
return bucket.Upload(context.Background(), key, strings.NewReader("test"))
},
},
{
name: "Delete replaces colons",
input: "foo:bar:baz",
expected: "foo-bar-baz",
fn: func(key string) error {
return bucket.Delete(context.Background(), key)
},
},
{
name: "No colons remains unchanged",
input: "foo/bar/baz",
expected: "foo/bar/baz",
fn: func(key string) error {
return bucket.Delete(context.Background(), key)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.fn(tc.input)
require.NoError(t, err)
require.Equal(t, tc.expected, mock.lastKey)
})
}
}
func TestKeyRewriteBucket_Iter(t *testing.T) {
iterKeys := []string{"foo:bar:baz", "foo-bar-qux", "foo-bar-quux"}
mock := &mockBucket{
iterKeys: iterKeys,
}
bucket := &keyRewriteBucket{
Bucket: mock,
delimiter: "-",
}
var gotKeys []string
// keyRewriteBucket transparently returns the keys in the storage
err := bucket.Iter(context.Background(), "", func(name string) error {
gotKeys = append(gotKeys, name)
return nil
})
require.NoError(t, err)
require.EqualValues(t, iterKeys, gotKeys)
}
Loading…
Cancel
Save