Storage: Introduce new "ObjectExists" API (#10360)

**What this PR does / why we need it**:
Introduce to our `Store` interface a new function named `ObjectExists`,
which can be used by external tools to validate if a given file is
present on an object store without downloading the file.

**Which issue(s) this PR fixes**:
N/A
pull/10399/head
Dylan Guedes 2 years ago committed by GitHub
parent 3a4c0e2d47
commit 6ce510dc55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      pkg/storage/chunk/client/alibaba/oss_object_client.go
  2. 17
      pkg/storage/chunk/client/aws/s3_storage_client.go
  3. 18
      pkg/storage/chunk/client/azure/blob_storage_client.go
  4. 13
      pkg/storage/chunk/client/baidubce/bos_storage_client.go
  5. 5
      pkg/storage/chunk/client/congestion/controller.go
  6. 4
      pkg/storage/chunk/client/congestion/controller_test.go
  7. 9
      pkg/storage/chunk/client/gcp/gcs_object_client.go
  8. 18
      pkg/storage/chunk/client/ibmcloud/cos_object_client.go
  9. 9
      pkg/storage/chunk/client/local/fs_object_client.go
  10. 2
      pkg/storage/chunk/client/object_client.go
  11. 9
      pkg/storage/chunk/client/openstack/swift_object_client.go
  12. 16
      pkg/storage/chunk/client/testutils/inmemory_storage_client.go
  13. 4
      pkg/storage/stores/indexshipper/storage/prefixed_object_client.go

@ -71,6 +71,19 @@ func NewOssObjectClient(_ context.Context, cfg OssConfig) (client.ObjectClient,
func (s *OssObjectClient) Stop() {
}
func (s *OssObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
var options []oss.Option
err := instrument.CollectedRequest(ctx, "OSS.ObjectExists", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
_, requestErr := s.defaultBucket.GetObjectMeta(objectKey, options...)
return requestErr
})
if err != nil {
return false, err
}
return true, nil
}
// GetObject returns a reader and the size for the specified object key from the configured OSS bucket.
func (s *OssObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
var resp *oss.GetObjectResult

@ -299,6 +299,23 @@ func buckets(cfg S3Config) ([]string, error) {
// Stop fulfills the chunk.ObjectClient interface
func (a *S3ObjectClient) Stop() {}
func (a *S3ObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
err := instrument.CollectedRequest(ctx, "S3.ObjectExists", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
headObjectInput := &s3.HeadObjectInput{
Bucket: aws.String(a.bucketFromKey(objectKey)),
Key: aws.String(objectKey),
}
_, err := a.S3.HeadObject(headObjectInput)
return err
})
if err != nil {
return false, err
}
return true, nil
}
// DeleteObject deletes the specified objectKey from the appropriate S3 bucket
func (a *S3ObjectClient) DeleteObject(ctx context.Context, objectKey string) error {
return instrument.CollectedRequest(ctx, "S3.DeleteObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {

@ -214,6 +214,24 @@ func NewBlobStorage(cfg *BlobStorageConfig, metrics BlobStorageMetrics, hedgingC
// Stop is a no op, as there are no background workers with this driver currently
func (b *BlobStorage) Stop() {}
func (b *BlobStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
err := loki_instrument.TimeRequest(ctx, "azure.ObjectExists", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
blockBlobURL, err := b.getBlobURL(objectKey, false)
if err != nil {
return err
}
_, err = blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
return err
})
if err != nil {
return false, err
}
return true, nil
}
// GetObject returns a reader and the size for the specified object key.
func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
var cancel context.CancelFunc = func() {}

@ -89,6 +89,19 @@ func (b *BOSObjectStorage) PutObject(ctx context.Context, objectKey string, obje
})
}
func (b *BOSObjectStorage) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
err := instrument.CollectedRequest(ctx, "BOS.ObjectExists", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
_, requestErr = b.client.GetObjectMeta(b.cfg.BucketName, objectKey)
return requestErr
})
if err != nil {
return false, err
}
return true, nil
}
func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
var res *api.GetObjectResult
err := instrument.CollectedRequest(ctx, "BOS.GetObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {

@ -131,6 +131,10 @@ func (a *AIMDController) List(ctx context.Context, prefix string, delimiter stri
return a.inner.List(ctx, prefix, delimiter)
}
func (a *AIMDController) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return a.inner.ObjectExists(ctx, objectKey)
}
func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error {
return a.inner.DeleteObject(ctx, objectKey)
}
@ -198,6 +202,7 @@ func NewNoopController(Config) *NoopController {
return &NoopController{}
}
func (n *NoopController) ObjectExists(context.Context, string) (bool, error) { return true, nil }
func (n *NoopController) PutObject(context.Context, string, io.ReadSeeker) error { return nil }
func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) {
return nil, 0, nil

@ -251,6 +251,10 @@ func (m *mockObjectClient) GetObject(context.Context, string) (io.ReadCloser, in
return io.NopCloser(strings.NewReader("bar")), 3, nil
}
func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) {
panic("not implemented")
}
func (m *mockObjectClient) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
panic("not implemented")
}

@ -125,6 +125,15 @@ func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Conf
func (s *GCSObjectClient) Stop() {
}
func (s *GCSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
_, err := s.getsBuckets.Object(objectKey).Attrs(ctx)
if err != nil {
return false, err
}
return true, nil
}
// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.
func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
var cancel context.CancelFunc = func() {}

@ -316,6 +316,24 @@ func (c *COSObjectClient) DeleteObject(ctx context.Context, objectKey string) er
})
}
func (c *COSObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
bucket := c.bucketFromKey(objectKey)
err := instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
_, requestErr = c.hedgedCOS.HeadObject(&cos.HeadObjectInput{
Bucket: ibm.String(bucket),
Key: ibm.String(objectKey),
})
return requestErr
})
if err != nil {
return false, err
}
return true, nil
}
// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.
func (c *COSObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {

@ -65,6 +65,15 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
// Stop implements ObjectClient
func (FSObjectClient) Stop() {}
func (f *FSObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) {
_, err := os.Lstat(objectKey)
if err != nil {
return false, err
}
return true, nil
}
// GetObject from the store
func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) {
fl, err := os.Open(filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey)))

@ -17,6 +17,8 @@ import (
// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...)
type ObjectClient interface {
ObjectExists(ctx context.Context, objectKey string) (bool, error)
PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
// NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak.
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)

@ -124,6 +124,15 @@ func (s *SwiftObjectClient) Stop() {
s.hedgingConn.UnAuthenticate()
}
func (s *SwiftObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) {
_, _, err := s.hedgingConn.Object(s.cfg.ContainerName, objectKey)
if err != nil {
return false, err
}
return true, nil
}
// GetObject returns a reader and the size for the specified object key from the configured swift container.
func (s *SwiftObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) {
var buf bytes.Buffer

@ -421,6 +421,22 @@ func (m *MockStorage) DeleteChunk(ctx context.Context, _, chunkID string) error
return m.DeleteObject(ctx, chunkID)
}
func (m *MockStorage) ObjectExists(_ context.Context, objectKey string) (bool, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()
if m.mode == MockStorageModeWriteOnly {
return false, errPermissionDenied
}
_, ok := m.objects[objectKey]
if !ok {
return false, nil
}
return true, nil
}
func (m *MockStorage) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()

@ -21,6 +21,10 @@ func (p prefixedObjectClient) PutObject(ctx context.Context, objectKey string, o
return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object)
}
func (p prefixedObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey)
}
func (p prefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
}

Loading…
Cancel
Save