Fix prefetch in HeapIterator when Len() or Peek() are called as first methods

pull/848/head
Marco Pracucci 6 years ago committed by Sandeep Sukhani
parent 347a3e18f4
commit a22be17859
  1. 46
      pkg/iter/iterator.go
  2. 34
      pkg/iter/iterator_test.go

@ -113,8 +113,8 @@ type heapIterator struct {
heap.Interface
Peek() EntryIterator
}
is []EntryIterator
prenext bool
is []EntryIterator
prefetched bool
tuples tuples
currEntry logproto.Entry
@ -139,6 +139,30 @@ func NewHeapIterator(is []EntryIterator, direction logproto.Direction) HeapItera
return result
}
// prefetch iterates over all inner iterators to merge together, calls Next() on
// each of them to prefetch the first entry and pushes of them - who are not
// empty - to the heap
func (i *heapIterator) prefetch() {
if i.prefetched {
return
}
i.prefetched = true
for _, it := range i.is {
i.requeue(it, false)
}
// We can now clear the list of input iterators to merge, given they have all
// been processed and the non empty ones have been pushed to the heap
i.is = nil
}
// requeue pushes the input ei EntryIterator to the heap, advancing it via an ei.Next()
// call unless the advanced input parameter is true. In this latter case it expects that
// the iterator has already been advanced before calling requeue().
//
// If the iterator has no more entries or an error occur while advancing it, the iterator
// is not pushed to the heap and any possible error captured, so that can be get via Error().
func (i *heapIterator) requeue(ei EntryIterator, advanced bool) {
if advanced || ei.Next() {
heap.Push(i.heap, ei)
@ -167,14 +191,8 @@ func (t tuples) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t tuples) Less(i, j int) bool { return t[i].Line < t[j].Line }
func (i *heapIterator) Next() bool {
if !i.prenext {
i.prenext = true
// pre-next each iterator, drop empty.
for _, it := range i.is {
i.requeue(it, false)
}
i.is = nil
}
i.prefetch()
if i.heap.Len() == 0 {
return false
}
@ -183,7 +201,6 @@ func (i *heapIterator) Next() bool {
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value
// occurs most often.
for i.heap.Len() > 0 {
next := i.heap.Peek()
entry := next.Entry()
@ -198,7 +215,7 @@ func (i *heapIterator) Next() bool {
})
}
// Find in entry which occurs most often which, due to quorum based
// Find in tuples which entry occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry.
t := mostCommon(i.tuples)
i.currEntry = t.Entry
@ -263,10 +280,15 @@ func (i *heapIterator) Close() error {
}
func (i *heapIterator) Peek() time.Time {
i.prefetch()
return i.heap.Peek().Entry().Timestamp
}
// Len returns the number of inner iterators on the heap, still having entries
func (i *heapIterator) Len() int {
i.prefetch()
return i.heap.Len()
}

@ -152,6 +152,40 @@ func TestIteratorMultipleLabels(t *testing.T) {
}
}
func TestHeapIteratorPrefetch(t *testing.T) {
t.Parallel()
type tester func(t *testing.T, i HeapIterator)
tests := map[string]tester{
"prefetch on Len() when called as first method": func(t *testing.T, i HeapIterator) {
assert.Equal(t, 2, i.Len())
},
"prefetch on Peek() when called as first method": func(t *testing.T, i HeapIterator) {
assert.Equal(t, time.Unix(0, 0), i.Peek())
},
"prefetch on Next() when called as first method": func(t *testing.T, i HeapIterator) {
assert.True(t, i.Next())
assert.Equal(t, logproto.Entry{Timestamp: time.Unix(0, 0), Line: "0"}, i.Entry())
},
}
for testName, testFunc := range tests {
testFunc := testFunc
t.Run(testName, func(t *testing.T) {
t.Parallel()
i := NewHeapIterator([]EntryIterator{
mkStreamIterator(identity, "{foobar: \"baz1\"}"),
mkStreamIterator(identity, "{foobar: \"baz2\"}"),
}, logproto.FORWARD)
testFunc(t, i)
})
}
}
type generator func(i int64) logproto.Entry
func mkStreamIterator(f generator, labels string) EntryIterator {

Loading…
Cancel
Save