diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index aa492d7613..32b35be6e3 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -128,25 +128,18 @@ func newBatchChunkIterator( func (it *batchChunkIterator) Start() { if !it.begun { it.begun = true - go it.loop(it.ctx) + go it.loop() } } -func (it *batchChunkIterator) loop(ctx context.Context) { - defer func() { - if p := recover(); p != nil { - it.next <- &chunkBatch{err: errors.Errorf("panic while fecthing chunks %+v", p)} - close(it.next) - } - }() +func (it *batchChunkIterator) loop() { for { if it.chunks.Len() == 0 { close(it.next) return } select { - case <-ctx.Done(): - it.next <- &chunkBatch{err: ctx.Err()} + case <-it.ctx.Done(): close(it.next) return case it.next <- it.nextBatch(): @@ -159,7 +152,15 @@ func (it *batchChunkIterator) Next() *chunkBatch { return <-it.next } -func (it *batchChunkIterator) nextBatch() *chunkBatch { +func (it *batchChunkIterator) nextBatch() (res *chunkBatch) { + defer func() { + if p := recover(); p != nil { + level.Error(util_log.Logger).Log("msg", "panic while fetching chunks", "panic", p) + res = &chunkBatch{ + err: errors.Errorf("panic while fecthing chunks %+v", p), + } + } + }() // the first chunk of the batch headChunk := it.chunks.Peek() from, through := it.start, it.end @@ -329,9 +330,12 @@ func (it *logBatchIterator) Error() error { if it.err != nil { return it.err } - if it.curr != nil { + if it.curr != nil && it.curr.Error() != nil { return it.curr.Error() } + if it.ctx.Err() != nil { + return it.ctx.Err() + } return nil } @@ -464,9 +468,12 @@ func (it *sampleBatchIterator) Error() error { if it.err != nil { return it.err } - if it.curr != nil { + if it.curr != nil && it.curr.Error() != nil { return it.curr.Error() } + if it.ctx.Err() != nil { + return it.ctx.Err() + } return nil }