@ -128,25 +128,18 @@ func newBatchChunkIterator(
func ( it * batchChunkIterator ) Start ( ) {
if ! it . begun {
it . begun = true
go it . loop ( it . ctx )
go it . loop ( )
}
}
func ( it * batchChunkIterator ) loop ( ctx context . Context ) {
defer func ( ) {
if p := recover ( ) ; p != nil {
it . next <- & chunkBatch { err : errors . Errorf ( "panic while fecthing chunks %+v" , p ) }
close ( it . next )
}
} ( )
func ( it * batchChunkIterator ) loop ( ) {
for {
if it . chunks . Len ( ) == 0 {
close ( it . next )
return
}
select {
case <- ctx . Done ( ) :
it . next <- & chunkBatch { err : ctx . Err ( ) }
case <- it . ctx . Done ( ) :
close ( it . next )
return
case it . next <- it . nextBatch ( ) :
@ -159,7 +152,15 @@ func (it *batchChunkIterator) Next() *chunkBatch {
return <- it . next
}
func ( it * batchChunkIterator ) nextBatch ( ) * chunkBatch {
func ( it * batchChunkIterator ) nextBatch ( ) ( res * chunkBatch ) {
defer func ( ) {
if p := recover ( ) ; p != nil {
level . Error ( util_log . Logger ) . Log ( "msg" , "panic while fetching chunks" , "panic" , p )
res = & chunkBatch {
err : errors . Errorf ( "panic while fecthing chunks %+v" , p ) ,
}
}
} ( )
// the first chunk of the batch
headChunk := it . chunks . Peek ( )
from , through := it . start , it . end
@ -329,9 +330,12 @@ func (it *logBatchIterator) Error() error {
if it . err != nil {
return it . err
}
if it . curr != nil {
if it . curr != nil && it . curr . Error ( ) != nil {
return it . curr . Error ( )
}
if it . ctx . Err ( ) != nil {
return it . ctx . Err ( )
}
return nil
}
@ -464,9 +468,12 @@ func (it *sampleBatchIterator) Error() error {
if it . err != nil {
return it . err
}
if it . curr != nil {
if it . curr != nil && it . curr . Error ( ) != nil {
return it . curr . Error ( )
}
if it . ctx . Err ( ) != nil {
return it . ctx . Err ( )
}
return nil
}