|
|
|
|
@ -134,6 +134,11 @@ func (i *heapIterator) requeue(ei EntryIterator, advanced bool) { |
|
|
|
|
helpers.LogError("closing iterator", ei.Close) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type tuple struct { |
|
|
|
|
logproto.Entry |
|
|
|
|
EntryIterator |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (i *heapIterator) Next() bool { |
|
|
|
|
if i.heap.Len() == 0 { |
|
|
|
|
return false |
|
|
|
|
@ -143,10 +148,7 @@ func (i *heapIterator) Next() bool { |
|
|
|
|
// preserve their original order. We look at all the top entries in the
|
|
|
|
|
// heap with the same timestamp, and pop the ones whose common value
|
|
|
|
|
// occurs most often.
|
|
|
|
|
type tuple struct { |
|
|
|
|
logproto.Entry |
|
|
|
|
EntryIterator |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
tuples := make([]tuple, 0, i.heap.Len()) |
|
|
|
|
for i.heap.Len() > 0 { |
|
|
|
|
next := i.heap.Peek() |
|
|
|
|
@ -164,30 +166,38 @@ func (i *heapIterator) Next() bool { |
|
|
|
|
|
|
|
|
|
// Find in entry which occurs most often which, due to quorum based
|
|
|
|
|
// replication, is guaranteed to be the correct next entry.
|
|
|
|
|
i.currEntry = mostCommon(tuples).Entry |
|
|
|
|
|
|
|
|
|
// Requeue the iterators, only advancing them if they were not the
|
|
|
|
|
// correct pick.
|
|
|
|
|
for j := range tuples { |
|
|
|
|
i.requeue(tuples[j].EntryIterator, tuples[j].Line != i.currEntry.Line) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func mostCommon(tuples []tuple) tuple { |
|
|
|
|
sort.Slice(tuples, func(i, j int) bool { |
|
|
|
|
return tuples[i].Line < tuples[j].Line |
|
|
|
|
}) |
|
|
|
|
i.currEntry = tuples[0].Entry |
|
|
|
|
count, max := 1, 1 |
|
|
|
|
for j := 1; j < len(tuples); j++ { |
|
|
|
|
if tuples[j].Equal(tuples[j-1]) { |
|
|
|
|
result := tuples[0] |
|
|
|
|
count, max := 0, 0 |
|
|
|
|
for i := 0; i < len(tuples)-1; i++ { |
|
|
|
|
if tuples[i].Equal(tuples[i+1].Entry) { |
|
|
|
|
count++ |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if count > max { |
|
|
|
|
i.currEntry = tuples[j-1].Entry |
|
|
|
|
result = tuples[i] |
|
|
|
|
max = count |
|
|
|
|
} |
|
|
|
|
count++ |
|
|
|
|
count = 0 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Requeue the iterators, only advancing them if they were not the
|
|
|
|
|
// correct pick.
|
|
|
|
|
for j := range tuples { |
|
|
|
|
i.requeue(tuples[j].EntryIterator, tuples[j].Line != i.currEntry.Line) |
|
|
|
|
if count > max { |
|
|
|
|
result = tuples[len(tuples)-1] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return true |
|
|
|
|
return result |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (i *heapIterator) Entry() logproto.Entry { |
|
|
|
|
|