diff --git a/pkg/dataobj/index/indexer.go b/pkg/dataobj/index/indexer.go
index 5d8d16e6fa..4ebed57360 100644
--- a/pkg/dataobj/index/indexer.go
+++ b/pkg/dataobj/index/indexer.go
@@ -354,6 +354,46 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
 	return indexPath, processed, nil
 }
 
+// downloadObject returns the complete contents of the object at path in bucket.
+//
+// The size reported by bucket.Attributes is used only as a capacity hint so the
+// buffer does not have to regrow while reading. The body is always read until
+// io.EOF, so an object that is longer or shorter than reported is still
+// returned whole; if Attributes fails or reports no size, the buffer starts at
+// 512 bytes and grows on demand.
+//
+// It returns a wrapped error if the object cannot be fetched or its body
+// cannot be read.
+func downloadObject(ctx context.Context, bucket objstore.Bucket, path string) ([]byte, error) {
+	reader, err := bucket.Get(ctx, path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to fetch object from storage: %w", err)
+	}
+	defer reader.Close()
+
+	sizeHint := int64(511)
+	if attrs, attrErr := bucket.Attributes(ctx, path); attrErr == nil && attrs.Size > 0 {
+		sizeHint = attrs.Size
+	}
+	// One byte beyond the hint lets the read that observes io.EOF complete
+	// without growing the buffer.
+	buf := make([]byte, 0, sizeHint+1)
+
+	for {
+		n, readErr := reader.Read(buf[len(buf):cap(buf)])
+		buf = buf[:len(buf)+n]
+		switch {
+		case readErr == io.EOF:
+			return buf, nil
+		case readErr != nil:
+			return nil, fmt.Errorf("failed to read object: %w", readErr)
+		case len(buf) == cap(buf):
+			// The hint was too small; let append grow the backing array.
+			buf = append(buf, 0)[:len(buf)]
+		}
+	}
+}
+
 // downloadWorker processes downloads from the input downloadQueue and writes the resulting buffer to the downloadedObjects output channel.
 // It exits when downloadQueue is closed.
func downloadWorker(ctx context.Context, logger log.Logger, downloadQueue chan metastore.ObjectWrittenEvent, objectBucket objstore.Bucket, downloadedObjects chan downloadedObject) { @@ -364,21 +392,11 @@ func downloadWorker(ctx context.Context, logger log.Logger, downloadQueue chan m objLogger := log.With(logger, "object_path", event.ObjectPath) downloadStart := time.Now() - objectReader, err := objectBucket.Get(ctx, event.ObjectPath) - if err != nil { - downloadedObjects <- downloadedObject{ - event: event, - err: fmt.Errorf("failed to fetch object from storage: %w", err), - } - continue - } - - object, err := io.ReadAll(objectReader) - _ = objectReader.Close() + object, err := downloadObject(ctx, objectBucket, event.ObjectPath) if err != nil { downloadedObjects <- downloadedObject{ event: event, - err: fmt.Errorf("failed to read object: %w", err), + err: err, } continue } diff --git a/pkg/dataobj/index/indexer_test.go b/pkg/dataobj/index/indexer_test.go index 43fb618cd3..9f684ce626 100644 --- a/pkg/dataobj/index/indexer_test.go +++ b/pkg/dataobj/index/indexer_test.go @@ -427,3 +427,33 @@ func TestSerialIndexer_FlushOnBuilderFull(t *testing.T) { require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalRequests)) require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalBuilds)) } + +func TestDownloadObject_Success(t *testing.T) { + t.Parallel() + ctx := context.Background() + + bucket := objstore.NewInMemBucket() + testData := []byte("test data content for download") + objectPath := "test-object" + + // Upload test object + require.NoError(t, bucket.Upload(ctx, objectPath, bytes.NewReader(testData))) + + // Download with pre-allocation + result, err := downloadObject(ctx, bucket, objectPath) + require.NoError(t, err) + require.Equal(t, testData, result) +} + +func TestDownloadObject_ObjectNotFound(t *testing.T) { + t.Parallel() + ctx := context.Background() + + bucket := objstore.NewInMemBucket() + objectPath := "non-existent-object" + + // 
Try to download non-existent object + _, err := downloadObject(ctx, bucket, objectPath) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to fetch object from storage") +}