perf: Prealloc slice in index builder when downloading objects (#20647)

Signed-off-by: Joe Elliott <number101010@gmail.com>
pull/17964/head
Joe Elliott 4 months ago committed by GitHub
parent 734e23715d
commit 08e5c9d77c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 42
      pkg/dataobj/index/indexer.go
  2. 30
      pkg/dataobj/index/indexer_test.go

@ -354,6 +354,34 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
return indexPath, processed, nil
}
// downloadObject downloads an object from the bucket, attempting to pre-allocate
// the buffer based on object size from Attributes. Falls back to io.ReadAll if Attributes fails.
func downloadObject(ctx context.Context, bucket objstore.Bucket, path string) ([]byte, error) {
reader, err := bucket.Get(ctx, path)
if err != nil {
return nil, fmt.Errorf("failed to fetch object from storage: %w", err)
}
defer reader.Close()
// if possible, pre-allocate the buffer based on object size from Attributes
attrs, err := bucket.Attributes(ctx, path)
if err == nil && attrs.Size > 0 {
buf := make([]byte, attrs.Size)
_, err = io.ReadFull(reader, buf)
if err != nil {
return nil, fmt.Errorf("failed to read object: %w", err)
}
return buf, nil
}
// fallback to io.ReadAll if Attributes fails
object, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to read object: %w", err)
}
return object, nil
}
// downloadWorker processes downloads from the input downloadQueue and writes the resulting buffer to the downloadedObjects output channel.
// It exits when downloadQueue is closed.
func downloadWorker(ctx context.Context, logger log.Logger, downloadQueue chan metastore.ObjectWrittenEvent, objectBucket objstore.Bucket, downloadedObjects chan downloadedObject) {
@ -364,21 +392,11 @@ func downloadWorker(ctx context.Context, logger log.Logger, downloadQueue chan m
objLogger := log.With(logger, "object_path", event.ObjectPath)
downloadStart := time.Now()
objectReader, err := objectBucket.Get(ctx, event.ObjectPath)
if err != nil {
downloadedObjects <- downloadedObject{
event: event,
err: fmt.Errorf("failed to fetch object from storage: %w", err),
}
continue
}
object, err := io.ReadAll(objectReader)
_ = objectReader.Close()
object, err := downloadObject(ctx, objectBucket, event.ObjectPath)
if err != nil {
downloadedObjects <- downloadedObject{
event: event,
err: fmt.Errorf("failed to read object: %w", err),
err: err,
}
continue
}

@ -427,3 +427,33 @@ func TestSerialIndexer_FlushOnBuilderFull(t *testing.T) {
require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalRequests))
require.Equal(t, float64(1), testutil.ToFloat64(indexerMetrics.totalBuilds))
}
func TestDownloadObject_Success(t *testing.T) {
t.Parallel()
ctx := context.Background()
bucket := objstore.NewInMemBucket()
testData := []byte("test data content for download")
objectPath := "test-object"
// Upload test object
require.NoError(t, bucket.Upload(ctx, objectPath, bytes.NewReader(testData)))
// Download with pre-allocation
result, err := downloadObject(ctx, bucket, objectPath)
require.NoError(t, err)
require.Equal(t, testData, result)
}
func TestDownloadObject_ObjectNotFound(t *testing.T) {
t.Parallel()
ctx := context.Background()
bucket := objstore.NewInMemBucket()
objectPath := "non-existent-object"
// Try to download non-existent object
_, err := downloadObject(ctx, bucket, objectPath)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to fetch object from storage")
}

Loading…
Cancel
Save