diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index fef1e89404..7069af2fb3 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -484,7 +484,7 @@ type downloadResponse[R any] struct { } type downloadQueue[T any, R any] struct { - queue chan downloadRequest[T, R] + queue chan *downloadRequest[T, R] enqueued *swiss.Map[string, struct{}] enqueuedMutex sync.Mutex mu keymutex.KeyMutex @@ -502,7 +502,7 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R] return nil, errors.New("queue requires at least 1 worker") } q := &downloadQueue[T, R]{ - queue: make(chan downloadRequest[T, R], size), + queue: make(chan *downloadRequest[T, R], size), enqueued: swiss.NewMap[string, struct{}](uint32(size)), mu: keymutex.NewHashed(workers), done: make(chan struct{}), @@ -518,7 +518,7 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R] func (q *downloadQueue[T, R]) enqueue(t downloadRequest[T, R]) { if !t.async { - q.queue <- t + q.queue <- &t return } // for async task we attempt to dedupe task already in progress. @@ -528,7 +528,7 @@ func (q *downloadQueue[T, R]) enqueue(t downloadRequest[T, R]) { return } select { - case q.queue <- t: + case q.queue <- &t: q.enqueued.Put(t.key, struct{}{}) default: // todo we probably want a metric on dropped items @@ -544,7 +544,7 @@ func (q *downloadQueue[T, R]) runWorker() { case <-q.done: return case task := <-q.queue: - q.do(task.ctx, task) + q.do(task.ctx, *task) } } }