Fix heap iterator & make tests a lot better. (#23)

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/25/head
Tom Wilkie 7 years ago committed by GitHub
parent 5233e78048
commit 9b1dc8062e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      pkg/iter/iterator.go
  2. 113
      pkg/iter/iterator_test.go

@ -99,43 +99,38 @@ type heapIterator struct {
// NewHeapIterator builds an EntryIterator that merges the iterators in is
// through a heap selected by direction: logproto.BACKWARD uses
// iteratorMaxHeap and logproto.FORWARD uses iteratorMinHeap. Any other
// direction panics with "bad direction".
//
// NOTE(review): this span looks like it was rendered from a diff, so lines
// from both the pre-change and post-change versions appear together (each
// case assigns result.heap twice, and the inputs are walked by two separate
// loops). Confirm which lines are live before relying on the comments below.
func NewHeapIterator(is []EntryIterator, direction logproto.Direction) EntryIterator {
result := &heapIterator{}
// Sized for the worst case in which every input yields an entry.
iterators := make(iteratorHeap, 0, len(is))
// pre-next each iterator, drop empty.
for _, i := range is {
if i.Next() {
iterators = append(iterators, i)
} else {
// Nothing to read: keep the iterator's error, if any, then close it.
result.recordError(i)
i.Close()
}
}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{iterators}
// Replaces the assignment on the previous line with an empty heap.
result.heap = &iteratorMaxHeap{}
case logproto.FORWARD:
result.heap = &iteratorMinHeap{iterators}
// Replaces the assignment on the previous line with an empty heap.
result.heap = &iteratorMinHeap{}
default:
panic("bad direction")
}
// pre-next each iterator, drop empty.
// requeue calls Next on the iterator and pushes it onto the heap only when
// that call reports an entry.
for _, i := range is {
result.requeue(i)
}
return result
}
func (i *heapIterator) recordError(ei EntryIterator) {
err := ei.Error()
if err != nil {
// requeue advances ei by one entry. When ei has nothing more to offer, any
// error it reports is appended to i.errs and ei is closed (the value returned
// by Close is discarded); otherwise ei is pushed back onto the heap.
func (i *heapIterator) requeue(ei EntryIterator) {
	if !ei.Next() {
		// Exhausted or failed: remember why, then release the iterator.
		if err := ei.Error(); err != nil {
			i.errs = append(i.errs, err)
		}
		ei.Close()
		return
	}
	heap.Push(i.heap, ei)
}
func (i *heapIterator) Next() bool {
if i.curr != nil {
if i.curr.Next() {
heap.Push(i.heap, i.curr)
} else {
i.recordError(i.curr)
i.curr.Close()
}
i.requeue(i.curr)
}
if i.heap.Len() == 0 {
@ -143,16 +138,18 @@ func (i *heapIterator) Next() bool {
}
i.curr = heap.Pop(i.heap).(EntryIterator)
currEntry := i.curr.Entry()
// keep popping entries off if they match, to dedupe
curr := i.curr.Entry()
for i.heap.Len() > 0 {
next := i.heap.Peek().Entry()
if curr.Equal(next) {
i.heap.Pop()
} else {
next := i.heap.Peek()
nextEntry := next.Entry()
if !currEntry.Equal(nextEntry) {
break
}
next = heap.Pop(i.heap).(EntryIterator)
i.requeue(next)
}
return true

@ -1,6 +1,7 @@
package iter
import (
"fmt"
"testing"
"time"
@ -10,40 +11,64 @@ import (
// testSize is the entry count handed to mkStreamIterator by the tests in this file.
const testSize = 100
func TestStreamIterator(t *testing.T) {
iterator := mkStreamIterator(testSize, func(i int64) logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(-i, 0),
}
})
testIterator(t, iterator, testSize, logproto.BACKWARD)
}
func TestIterator(t *testing.T) {
for i, tc := range []struct {
iterator EntryIterator
generator generator
length int64
}{
// Test basic identity.
{
iterator: mkStreamIterator(testSize, identity),
generator: identity,
length: testSize,
},
func TestHeapIteratorBackward(t *testing.T) {
iterators := []EntryIterator{}
for i := int64(0); i < 4; i++ {
iterators = append(iterators, mkStreamIterator(testSize/4, func(j int64) logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(-j*4-i, 0),
}
}))
}
testIterator(t, NewHeapIterator(iterators, logproto.BACKWARD), testSize, logproto.BACKWARD)
}
// Test basic identity (backwards).
{
iterator: mkStreamIterator(testSize, inverse(identity)),
generator: inverse(identity),
length: testSize,
},
func TestHeapIteratorForward(t *testing.T) {
iterators := []EntryIterator{}
for i := int64(0); i < 4; i++ {
iterators = append(iterators, mkStreamIterator(testSize/4, func(j int64) logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(j*4+i, 0),
// Test dedupe of overlapping iterators with the heap iterator.
{
iterator: NewHeapIterator([]EntryIterator{
mkStreamIterator(testSize, offset(0)),
mkStreamIterator(testSize, offset(testSize/2)),
mkStreamIterator(testSize, offset(testSize)),
}, logproto.FORWARD),
generator: identity,
length: 2 * testSize,
},
// Test dedupe of overlapping iterators with the heap iterator (backward).
{
iterator: NewHeapIterator([]EntryIterator{
mkStreamIterator(testSize, inverse(offset(0))),
mkStreamIterator(testSize, inverse(offset(-testSize/2))),
mkStreamIterator(testSize, inverse(offset(-testSize))),
}, logproto.BACKWARD),
generator: inverse(identity),
length: 2 * testSize,
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
for i := int64(0); i < tc.length; i++ {
assert.Equal(t, true, tc.iterator.Next())
assert.Equal(t, tc.generator(i), tc.iterator.Entry(), fmt.Sprintln("iteration", i))
}
}))
assert.Equal(t, false, tc.iterator.Next())
assert.Equal(t, nil, tc.iterator.Error())
assert.NoError(t, tc.iterator.Close())
})
}
testIterator(t, NewHeapIterator(iterators, logproto.FORWARD), testSize, logproto.FORWARD)
}
func mkStreamIterator(numEntries int64, f func(i int64) logproto.Entry) EntryIterator {
type generator func(i int64) logproto.Entry
func mkStreamIterator(numEntries int64, f generator) EntryIterator {
entries := []logproto.Entry{}
for i := int64(0); i < numEntries; i++ {
entries = append(entries, f(i))
@ -53,20 +78,24 @@ func mkStreamIterator(numEntries int64, f func(i int64) logproto.Entry) EntryIte
})
}
func testIterator(t *testing.T, iterator EntryIterator,
testSize int64, direction logproto.Direction) {
i := int64(0)
for ; i < testSize && iterator.Next(); i++ {
switch direction {
case logproto.BACKWARD:
assert.Equal(t, -i, iterator.Entry().Timestamp.Unix())
case logproto.FORWARD:
assert.Equal(t, i, iterator.Entry().Timestamp.Unix())
default:
panic(direction)
// identity is a generator that maps index i to an entry stamped i seconds
// from the Unix epoch, with i in decimal as its line.
func identity(i int64) logproto.Entry {
	var entry logproto.Entry
	entry.Timestamp = time.Unix(i, 0)
	entry.Line = fmt.Sprintf("%d", i)
	return entry
}
func offset(j int64) generator {
return func(i int64) logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(i+j, 0),
Line: fmt.Sprintf("%d", i+j),
}
}
assert.Equal(t, i, int64(testSize))
assert.NoError(t, iterator.Error())
assert.NoError(t, iterator.Close())
}
// inverse wraps g so that it is evaluated at the negated index: the returned
// generator maps i to g(-i).
func inverse(g generator) generator {
	negated := func(n int64) logproto.Entry {
		return g(-n)
	}
	return negated
}

Loading…
Cancel
Save