diff --git a/pkg/storage/chunk/aws/dynamodb_storage_client.go b/pkg/storage/chunk/aws/dynamodb_storage_client.go index a94673a9aa..ab4687dc82 100644 --- a/pkg/storage/chunk/aws/dynamodb_storage_client.go +++ b/pkg/storage/chunk/aws/dynamodb_storage_client.go @@ -570,6 +570,10 @@ func (a dynamoDBStorageClient) DeleteChunk(ctx context.Context, userID, chunkID return a.BatchWrite(ctx, dynamoDBWrites) } +func (a dynamoDBStorageClient) IsChunkNotFoundErr(_ error) bool { + return false +} + func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWriteBatch, error) { dynamoDBWrites := dynamoDBWriteBatch{} diff --git a/pkg/storage/chunk/aws/s3_storage_client.go b/pkg/storage/chunk/aws/s3_storage_client.go index 5a85712be2..130c1f47a0 100644 --- a/pkg/storage/chunk/aws/s3_storage_client.go +++ b/pkg/storage/chunk/aws/s3_storage_client.go @@ -290,11 +290,6 @@ func (a *S3ObjectClient) DeleteObject(ctx context.Context, objectKey string) err Key: aws.String(objectKey), }) if err != nil { - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == s3.ErrCodeNoSuchKey { - return chunk.ErrStorageObjectNotFound - } - } return err } @@ -314,8 +309,7 @@ func (a *S3ObjectClient) bucketFromKey(key string) string { return a.bucketNames[hash%uint32(len(a.bucketNames))] } -// GetObject returns a reader for the specified object key from the configured S3 bucket. If the -// key does not exist a generic chunk.ErrStorageObjectNotFound error is returned. +// GetObject returns a reader for the specified object key from the configured S3 bucket. func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) { var resp *s3.GetObjectOutput @@ -331,11 +325,6 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re return err }) if err != nil { - if aerr, ok := err.(awserr.Error); ok { - if aerr.Code() == s3.ErrCodeNoSuchKey { - return nil, chunk.ErrStorageObjectNotFound - } - } return nil, err } @@ -412,3 +401,12 @@ func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([] return storageObjects, commonPrefixes, nil } + +// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. +func (a *S3ObjectClient) IsObjectNotFoundErr(err error) bool { + if aerr, ok := errors.Cause(err).(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchKey { + return true + } + + return false +} diff --git a/pkg/storage/chunk/azure/blob_storage_client.go b/pkg/storage/chunk/azure/blob_storage_client.go index 55aefa6cbe..ef18b29c69 100644 --- a/pkg/storage/chunk/azure/blob_storage_client.go +++ b/pkg/storage/chunk/azure/blob_storage_client.go @@ -138,9 +138,6 @@ func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.Re // Request access to the blob downloadResponse, err := blockBlobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false) if err != nil { - if isObjNotFoundErr(err) { - return nil, chunk.ErrStorageObjectNotFound - } return nil, err } @@ -250,9 +247,6 @@ func (b *BlobStorage) DeleteObject(ctx context.Context, blobID string) error { } _, err = blockBlobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}) - if err != nil && isObjNotFoundErr(err) { - return chunk.ErrStorageObjectNotFound - } return err } @@ -272,8 +266,8 @@ func (b *BlobStorage) selectContainerURLFmt() string { return endpoints[b.cfg.Environment].containerURLFmt } -// isObjNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. -func isObjNotFoundErr(err error) bool { +// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. +func (b *BlobStorage) IsObjectNotFoundErr(err error) bool { var e azblob.StorageError if errors.As(err, &e) && e.ServiceCode() == azblob.ServiceCodeBlobNotFound { return true diff --git a/pkg/storage/chunk/cassandra/storage_client.go b/pkg/storage/chunk/cassandra/storage_client.go index 2a644583c6..ca8f1c606c 100644 --- a/pkg/storage/chunk/cassandra/storage_client.go +++ b/pkg/storage/chunk/cassandra/storage_client.go @@ -546,6 +546,10 @@ func (s *ObjectClient) DeleteChunk(ctx context.Context, userID, chunkID string) return nil } +func (s *ObjectClient) IsChunkNotFoundErr(_ error) bool { + return false +} + // Stop implement chunk.ObjectClient. func (s *ObjectClient) Stop() { s.readSession.Close() diff --git a/pkg/storage/chunk/chunk_store.go b/pkg/storage/chunk/chunk_store.go index 6a46b26259..9cd33a468f 100644 --- a/pkg/storage/chunk/chunk_store.go +++ b/pkg/storage/chunk/chunk_store.go @@ -636,7 +636,7 @@ func (c *baseStore) deleteChunk(ctx context.Context, err = c.chunks.DeleteChunk(ctx, userID, chunkID) if err != nil { - if err == ErrStorageObjectNotFound { + if c.chunks.IsChunkNotFoundErr(err) { return nil } return errors.Wrapf(err, "when deleting chunk from storage with chunkID=%s", chunkID) @@ -657,7 +657,7 @@ func (c *baseStore) reboundChunk(ctx context.Context, userID, chunkID string, pa chunks, err := c.fetcher.FetchChunks(ctx, []Chunk{chunk}, []string{chunkID}) if err != nil { - if err == ErrStorageObjectNotFound { + if c.fetcher.IsChunkNotFoundErr(err) { return nil } return errors.Wrap(err, "when fetching chunk from storage for slicing") diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go index 0f68375c27..e2cc1b2e1e 100644 --- a/pkg/storage/chunk/chunk_store_utils.go +++ b/pkg/storage/chunk/chunk_store_utils.go @@ -252,3 +252,7 @@ func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys } return found, missing, err } + +func (c *Fetcher) IsChunkNotFoundErr(err error) bool { + return c.storage.IsChunkNotFoundErr(err) +} diff --git a/pkg/storage/chunk/gcp/bigtable_object_client.go b/pkg/storage/chunk/gcp/bigtable_object_client.go index f3d847c8c7..fa59005929 100644 --- a/pkg/storage/chunk/gcp/bigtable_object_client.go +++ b/pkg/storage/chunk/gcp/bigtable_object_client.go @@ -181,3 +181,7 @@ func (s *bigtableObjectClient) DeleteChunk(ctx context.Context, userID, chunkID return s.client.Open(tableName).Apply(ctx, chunkID, mut) } + +func (s *bigtableObjectClient) IsChunkNotFoundErr(_ error) bool { + return false +} diff --git a/pkg/storage/chunk/gcp/gcs_object_client.go b/pkg/storage/chunk/gcp/gcs_object_client.go index 6fd40e3e3d..537aeecb61 100644 --- a/pkg/storage/chunk/gcp/gcs_object_client.go +++ b/pkg/storage/chunk/gcp/gcs_object_client.go @@ -7,6 +7,7 @@ import ( "time" "cloud.google.com/go/storage" + "github.com/pkg/errors" "google.golang.org/api/iterator" "google.golang.org/api/option" @@ -73,8 +74,7 @@ func (s *GCSObjectClient) Stop() { s.client.Close() } -// GetObject returns a reader for the specified object key from the configured GCS bucket. If the -// key does not exist a generic chunk.ErrStorageObjectNotFound error is returned. +// GetObject returns a reader for the specified object key from the configured GCS bucket. func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) { var cancel context.CancelFunc = func() {} if s.cfg.RequestTimeout > 0 { @@ -94,9 +94,6 @@ func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R func (s *GCSObjectClient) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, err error) { reader, err := s.bucket.Object(objectKey).NewReader(ctx) if err != nil { - if err == storage.ErrObjectNotExist { - return nil, chunk.ErrStorageObjectNotFound - } return nil, err } @@ -168,16 +165,17 @@ func (s *GCSObjectClient) List(ctx context.Context, prefix, delimiter string) ([ return storageObjects, commonPrefixes, nil } -// DeleteObject deletes the specified object key from the configured GCS bucket. If the -// key does not exist a generic chunk.ErrStorageObjectNotFound error is returned. +// DeleteObject deletes the specified object key from the configured GCS bucket. func (s *GCSObjectClient) DeleteObject(ctx context.Context, objectKey string) error { err := s.bucket.Object(objectKey).Delete(ctx) if err != nil { - if err == storage.ErrObjectNotExist { - return chunk.ErrStorageObjectNotFound - } return err } return nil } + +// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. +func (s *GCSObjectClient) IsObjectNotFoundErr(err error) bool { + return errors.Is(err, storage.ErrObjectNotExist) +} diff --git a/pkg/storage/chunk/grpc/storage_client.go b/pkg/storage/chunk/grpc/storage_client.go index 31bc876a04..66a7a5731b 100644 --- a/pkg/storage/chunk/grpc/storage_client.go +++ b/pkg/storage/chunk/grpc/storage_client.go @@ -74,6 +74,10 @@ func (s *StorageClient) DeleteChunk(ctx context.Context, userID, chunkID string) return nil } +func (s *StorageClient) IsChunkNotFoundErr(_ error) bool { + return false +} + func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) { req := &GetChunksRequest{} req.Chunks = []*Chunk{} diff --git a/pkg/storage/chunk/inmemory_storage_client.go b/pkg/storage/chunk/inmemory_storage_client.go index 793bbf3a8f..c310b37a55 100644 --- a/pkg/storage/chunk/inmemory_storage_client.go +++ b/pkg/storage/chunk/inmemory_storage_client.go @@ -18,7 +18,10 @@ import ( type MockStorageMode int -var errPermissionDenied = errors.New("permission denied") +var ( + errPermissionDenied = errors.New("permission denied") + errStorageObjectNotFound = errors.New("object not found in storage") +) const ( MockStorageModeReadWrite = 0 @@ -377,7 +380,7 @@ func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, key := chunk.ExternalKey() buf, ok := m.objects[key] if !ok { - return nil, ErrStorageObjectNotFound + return nil, errStorageObjectNotFound } if err := chunk.Decode(decodeContext, buf); err != nil { return nil, err @@ -406,7 +409,7 @@ func (m *MockStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC buf, ok := m.objects[objectKey] if !ok { - return nil, ErrStorageObjectNotFound + return nil, errStorageObjectNotFound } return ioutil.NopCloser(bytes.NewReader(buf)), nil @@ -429,6 +432,14 @@ func (m *MockStorage) PutObject(ctx context.Context, objectKey string, object io return nil } +func (m *MockStorage) IsObjectNotFoundErr(err error) bool { + return errors.Is(err, errStorageObjectNotFound) +} + +func (m *MockStorage) IsChunkNotFoundErr(err error) bool { + return m.IsObjectNotFoundErr(err) +} + func (m *MockStorage) DeleteObject(ctx context.Context, objectKey string) error { m.mtx.Lock() defer m.mtx.Unlock() @@ -438,7 +449,7 @@ func (m *MockStorage) DeleteObject(ctx context.Context, objectKey string) error } if _, ok := m.objects[objectKey]; !ok { - return ErrStorageObjectNotFound + return errStorageObjectNotFound } delete(m.objects, objectKey) diff --git a/pkg/storage/chunk/local/fs_object_client.go b/pkg/storage/chunk/local/fs_object_client.go index d1c1e5d023..6f733110c7 100644 --- a/pkg/storage/chunk/local/fs_object_client.go +++ b/pkg/storage/chunk/local/fs_object_client.go @@ -10,6 +10,7 @@ import ( "time" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/runutil" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -61,11 +62,11 @@ func (FSObjectClient) Stop() {} // GetObject from the store func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, error) { fl, err := os.Open(filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))) - if err != nil && os.IsNotExist(err) { - return nil, chunk.ErrStorageObjectNotFound + if err != nil { + return nil, err } - return fl, err + return fl, nil } // PutObject into the store @@ -197,6 +198,11 @@ func (f *FSObjectClient) DeleteChunksBefore(ctx context.Context, ts time.Time) e }) } +// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. +func (f *FSObjectClient) IsObjectNotFoundErr(err error) bool { + return os.IsNotExist(errors.Cause(err)) +} + // copied from https://github.com/thanos-io/thanos/blob/55cb8ca38b3539381dc6a781e637df15c694e50a/pkg/objstore/filesystem/filesystem.go#L181 func isDirEmpty(name string) (ok bool, err error) { f, err := os.Open(name) diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go index 23e51a6300..395b5f41cb 100644 --- a/pkg/storage/chunk/objectclient/client.go +++ b/pkg/storage/chunk/objectclient/client.go @@ -117,3 +117,7 @@ func (o *Client) DeleteChunk(ctx context.Context, userID, chunkID string) error } return o.store.DeleteObject(ctx, key) } + +func (o *Client) IsChunkNotFoundErr(err error) bool { + return o.store.IsObjectNotFoundErr(err) +} diff --git a/pkg/storage/chunk/openstack/swift_object_client.go b/pkg/storage/chunk/openstack/swift_object_client.go index 238a9f780e..e45f58a6c7 100644 --- a/pkg/storage/chunk/openstack/swift_object_client.go +++ b/pkg/storage/chunk/openstack/swift_object_client.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "github.com/ncw/swift" + "github.com/pkg/errors" cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift" "github.com/cortexproject/cortex/pkg/util/log" @@ -92,15 +93,11 @@ func (s *SwiftObjectClient) Stop() { s.conn.UnAuthenticate() } -// GetObject returns a reader for the specified object key from the configured swift container. If the -// key does not exist a generic chunk.ErrStorageObjectNotFound error is returned. +// GetObject returns a reader for the specified object key from the configured swift container. func (s *SwiftObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) { var buf bytes.Buffer _, err := s.conn.ObjectGet(s.cfg.ContainerName, objectKey, &buf, false, nil) if err != nil { - if err == swift.ObjectNotFound { - return nil, chunk.ErrStorageObjectNotFound - } return nil, err } @@ -151,15 +148,12 @@ func (s *SwiftObjectClient) List(ctx context.Context, prefix, delimiter string) return storageObjects, storagePrefixes, nil } -// DeleteObject deletes the specified object key from the configured Swift container. If the -// key does not exist a generic chunk.ErrStorageObjectNotFound error is returned. +// DeleteObject deletes the specified object key from the configured Swift container. func (s *SwiftObjectClient) DeleteObject(ctx context.Context, objectKey string) error { - err := s.conn.ObjectDelete(s.cfg.ContainerName, objectKey) - if err == nil { - return nil - } - if err == swift.ObjectNotFound { - return chunk.ErrStorageObjectNotFound - } - return err + return s.conn.ObjectDelete(s.cfg.ContainerName, objectKey) +} + +// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations. +func (s *SwiftObjectClient) IsObjectNotFoundErr(err error) bool { + return errors.Is(err, swift.ObjectNotFound) } diff --git a/pkg/storage/chunk/purger/purger.go b/pkg/storage/chunk/purger/purger.go index 640cb04d18..5b2f478d90 100644 --- a/pkg/storage/chunk/purger/purger.go +++ b/pkg/storage/chunk/purger/purger.go @@ -337,7 +337,7 @@ func (p *Purger) executePlan(userID, requestID string, planNo int, logger log.Lo plan, err := p.getDeletePlan(context.Background(), userID, requestID, planNo) if err != nil { - if err == chunk.ErrStorageObjectNotFound { + if p.objectClient.IsObjectNotFoundErr(err) { level.Info(logger).Log("msg", "plan not found, must have been executed already") // this means plan was already executed and got removed. Do nothing. return nil @@ -369,7 +369,7 @@ func (p *Purger) executePlan(userID, requestID string, planNo int, logger log.Lo err = p.chunkStore.DeleteChunk(ctx, chunkRef.From, chunkRef.Through, chunkRef.UserID, chunkDetails.ID, cortexpb.FromLabelAdaptersToLabels(plan.ChunksGroup[i].Labels), partiallyDeletedInterval) if err != nil { - if isMissingChunkErr(err) { + if p.isMissingChunkErr(err) { level.Error(logger).Log("msg", "chunk not found for deletion. We may have already deleted it", "chunk_id", chunkDetails.ID) continue @@ -727,11 +727,11 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time, inc return chunksGroups, includedChunkIDs } -func isMissingChunkErr(err error) bool { - if err == chunk.ErrStorageObjectNotFound { +func (p *Purger) isMissingChunkErr(err error) bool { + if p.objectClient.IsObjectNotFoundErr(err) { return true } - if promqlStorageErr, ok := err.(promql.ErrStorage); ok && promqlStorageErr.Err == chunk.ErrStorageObjectNotFound { + if promqlStorageErr, ok := err.(promql.ErrStorage); ok && p.objectClient.IsObjectNotFoundErr(promqlStorageErr.Err) { return true } diff --git a/pkg/storage/chunk/storage/metrics.go b/pkg/storage/chunk/storage/metrics.go index 0821134999..8cddf186bd 100644 --- a/pkg/storage/chunk/storage/metrics.go +++ b/pkg/storage/chunk/storage/metrics.go @@ -108,3 +108,7 @@ func (c metricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) func (c metricsChunkClient) DeleteChunk(ctx context.Context, userID, chunkID string) error { return c.client.DeleteChunk(ctx, userID, chunkID) } + +func (c metricsChunkClient) IsChunkNotFoundErr(err error) bool { + return c.client.IsChunkNotFoundErr(err) +} diff --git a/pkg/storage/chunk/storage_client.go b/pkg/storage/chunk/storage_client.go index 62d4eaa49a..7314e5a29a 100644 --- a/pkg/storage/chunk/storage_client.go +++ b/pkg/storage/chunk/storage_client.go @@ -8,8 +8,6 @@ import ( ) var ( - // ErrStorageObjectNotFound when object storage does not have requested object - ErrStorageObjectNotFound = errors.New("object not found in storage") // ErrMethodNotImplemented when any of the storage clients do not implement a method ErrMethodNotImplemented = errors.New("method is not implemented") ) @@ -33,6 +31,7 @@ type Client interface { PutChunks(ctx context.Context, chunks []Chunk) error GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) DeleteChunk(ctx context.Context, userID, chunkID string) error + IsChunkNotFoundErr(err error) bool } // ObjectAndIndexClient allows optimisations where the same client handles both @@ -76,6 +75,7 @@ type ObjectClient interface { // Keys of returned storage objects have given prefix. List(ctx context.Context, prefix string, delimiter string) ([]StorageObject, []StorageCommonPrefix, error) DeleteObject(ctx context.Context, objectKey string) error + IsObjectNotFoundErr(err error) bool Stop() } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go index 64769027a1..61fbe18d22 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go @@ -65,7 +65,7 @@ func (t *deleteRequestsTable) init() error { _, err := os.Stat(t.dbPath) if err != nil { err = shipper_util.GetFileFromStorage(context.Background(), t.objectClient, objectPathInStorage, t.dbPath, true) - if err != nil && !errors.Is(err, chunk.ErrStorageObjectNotFound) { + if err != nil && !t.objectClient.IsObjectNotFoundErr(err) { return err } } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index ba0090af77..347434f773 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -175,6 +175,7 @@ func markforDelete(ctx context.Context, marker MarkerStorageWriter, chunkIt Chun type ChunkClient interface { DeleteChunk(ctx context.Context, userID, chunkID string) error + IsChunkNotFoundErr(err error) bool } type Sweeper struct { @@ -210,7 +211,7 @@ func (s *Sweeper) Start() { } err = s.chunkClient.DeleteChunk(ctx, unsafeGetString(userID), chunkIDString) - if err == chunk.ErrStorageObjectNotFound { + if s.chunkClient.IsChunkNotFoundErr(err) { status = statusNotFound level.Debug(util_log.Logger).Log("msg", "delete on not found chunk", "chunkID", chunkIDString) return nil diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index e3eaaf34a8..e1f7843613 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -41,6 +41,10 @@ func (m *mockChunkClient) DeleteChunk(_ context.Context, _, chunkID string) erro return nil } +func (m *mockChunkClient) IsChunkNotFoundErr(err error) bool { + return false +} + func (m *mockChunkClient) getDeletedChunkIds() []string { m.mtx.Lock() defer m.mtx.Unlock() diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 947bf6addb..756df33d9c 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -2,7 +2,6 @@ package downloads import ( "context" - "errors" "fmt" "io" "io/ioutil" @@ -41,6 +40,7 @@ type BoltDBIndexClient interface { type StorageClient interface { GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) + IsObjectNotFoundErr(err error) bool } // Table is a collection of multiple files created for a same table by various ingesters. @@ -458,7 +458,7 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath, true) if err != nil { - if errors.Is(err, chunk.ErrStorageObjectNotFound) { + if t.storageClient.IsObjectNotFoundErr(err) { level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", storageObject.Key)) return nil } @@ -537,7 +537,7 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO filePath := path.Join(folderPathForTable, dbName) err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath, true) if err != nil { - if errors.Is(err, chunk.ErrStorageObjectNotFound) { + if t.storageClient.IsObjectNotFoundErr(err) { level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", object.Key)) err = nil } else { diff --git a/pkg/storage/stores/util/object_client.go b/pkg/storage/stores/util/object_client.go index 4d571abb49..0dec53266c 100644 --- a/pkg/storage/stores/util/object_client.go +++ b/pkg/storage/stores/util/object_client.go @@ -42,6 +42,10 @@ func (p PrefixedObjectClient) DeleteObject(ctx context.Context, objectKey string return p.downstreamClient.DeleteObject(ctx, p.prefix+objectKey) } +func (p PrefixedObjectClient) IsObjectNotFoundErr(err error) bool { + return p.downstreamClient.IsObjectNotFoundErr(err) +} + func (p PrefixedObjectClient) Stop() { p.downstreamClient.Stop() } diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index c4e558dfbe..8703428faf 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -245,6 +245,10 @@ func (m mockChunkStoreClient) DeleteChunk(ctx context.Context, userID, chunkID s return nil } +func (m mockChunkStoreClient) IsChunkNotFoundErr(_ error) bool { + return false +} + var streamsFixture = []*logproto.Stream{ { Labels: "{foo=\"bar\"}",