@ -43,6 +43,8 @@ type batchChunkIterator struct {
lastOverlapping [ ] * LazyChunk
iterFactory chunksIteratorFactory
begun bool
ctx context . Context
cancel context . CancelFunc
start , end time . Time
direction logproto . Direction
@ -69,6 +71,7 @@ func newBatchChunkIterator(
start : start ,
end : end ,
direction : direction ,
ctx : ctx ,
cancel : cancel ,
iterFactory : iterFactory ,
chunks : lazyChunks { direction : direction , chunks : chunks } ,
@ -78,10 +81,17 @@ func newBatchChunkIterator(
} ) ,
}
sort . Sort ( res . chunks )
go res . loop ( ctx )
return res
}
// Start is idempotent and will begin the processing thread which seeds the iterator data.
func ( it * batchChunkIterator ) Start ( ) {
if ! it . begun {
it . begun = true
go it . loop ( it . ctx )
}
}
func ( it * batchChunkIterator ) loop ( ctx context . Context ) {
for {
if it . chunks . Len ( ) == 0 {
@ -111,6 +121,8 @@ func (it *batchChunkIterator) loop(ctx context.Context) {
}
func ( it * batchChunkIterator ) Next ( ) bool {
it . Start ( ) // Ensure the iterator has started.
var err error
// for loop to avoid recursion
for {
@ -140,18 +152,22 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) {
batch := make ( [ ] * LazyChunk , 0 , it . batchSize + len ( it . lastOverlapping ) )
var nextChunk * LazyChunk
var includesOverlap bool
for it . chunks . Len ( ) > 0 {
// pop the next batch of chunks and append/prepend previous overlapping chunks
// so we can merge/de-dupe overlapping entries.
if it . direction == logproto . FORWARD {
if ! includesOverlap && it . direction == logproto . FORWARD {
batch = append ( batch , it . lastOverlapping ... )
}
batch = append ( batch , it . chunks . pop ( it . batchSize ) ... )
if it . direction == logproto . BACKWARD {
if ! includesOverlap && it . direction == logproto . BACKWARD {
batch = append ( batch , it . lastOverlapping ... )
}
includesOverlap = true
if it . chunks . Len ( ) > 0 {
nextChunk = it . chunks . Peek ( )
// we max out our iterator boundaries to the next chunks in the queue
@ -294,8 +310,12 @@ func newLogBatchIterator(
filter : filter ,
ctx : ctx ,
}
batch := newBatchChunkIterator ( ctx , chunks , batchSize , direction , start , end , logbatch . newChunksIterator )
// Important: since the batchChunkIterator is bound to the LogBatchIterator,
// ensure embedded fields are present before it's started.
logbatch . batchChunkIterator = batch
batch . Start ( )
return logbatch , nil
}
@ -321,10 +341,12 @@ func (it *logBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through
func ( it * logBatchIterator ) buildIterators ( chks map [ model . Fingerprint ] [ ] [ ] * LazyChunk , from , through time . Time , nextChunk * LazyChunk ) ( [ ] iter . EntryIterator , error ) {
result := make ( [ ] iter . EntryIterator , 0 , len ( chks ) )
for _ , chunks := range chks {
iterator , err := it . buildHeapIterator ( chunks , from , through , nextChunk )
if err != nil {
return nil , err
}
result = append ( result , iterator )
}
@ -391,7 +413,11 @@ func newSampleBatchIterator(
ctx : ctx ,
}
batch := newBatchChunkIterator ( ctx , chunks , batchSize , logproto . FORWARD , start , end , samplebatch . newChunksIterator )
// Important: since the batchChunkIterator is bound to the SampleBatchIterator,
// ensure embedded fields are present before it's started.
samplebatch . batchChunkIterator = batch
batch . Start ( )
return samplebatch , nil
}