Iterators: re-implement entrySortIterator using loserTree for performance (#8351)

**What this PR does / why we need it**:

[Draft PR for comment at this time]

This implementation uses a data structure called Loser Tree, also known
as Tournament Tree, based on Knuth, "Sorting and Searching" section
5.4.1.

I couldn't find an existing Loser Tree in Go, so I wrote my own; the
file is 132 lines long. It uses generics in the hope it can also be
applied elsewhere.

One benchmark result: 
```
name                 old time/op    new time/op    delta
SortIterator/sort-4    3.78ms ± 4%    2.66ms ± 3%  -29.54%  (p=0.008 n=5+5)

name                 old alloc/op   new alloc/op   delta
SortIterator/sort-4     319kB ± 0%      14kB ± 0%  -95.52%  (p=0.008 n=5+5)

name                 old allocs/op  new allocs/op  delta
SortIterator/sort-4       104 ± 0%         5 ± 0%  -95.19%  (p=0.008 n=5+5)
```

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- NA Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- NA Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
pull/8593/head
Bryan Boreham 3 years ago committed by GitHub
parent eeca4589fb
commit 019ac9975c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 150
      pkg/iter/entry_iterator.go
  2. 128
      pkg/util/loser/tree.go
  3. 120
      pkg/util/loser/tree_test.go

@ -4,13 +4,14 @@ import (
"container/heap"
"context"
"io"
"sort"
"math"
"sync"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/loser"
)
// EntryIterator iterates over entries in time-order.
@ -327,11 +328,9 @@ func (i *mergeEntryIterator) Len() int {
}
type entrySortIterator struct {
is []EntryIterator
prefetched bool
byAscendingTime bool
currEntry entryWithLabels
errs []error
tree *loser.Tree[sortFields, EntryIterator]
currEntry entryWithLabels
errs []error
}
// NewSortEntryIterator returns a new EntryIterator that sorts the entries of the input iterators by timestamp (ascending or descending, depending on the direction).
@ -345,120 +344,79 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) Entr
if len(is) == 1 {
return is[0]
}
result := &entrySortIterator{is: is}
maxVal, less := treeLess(direction)
result := &entrySortIterator{}
result.tree = loser.New(is, maxVal, sortFieldsAt, less, result.closeEntry)
return result
}
func treeLess(direction logproto.Direction) (maxVal sortFields, less func(a, b sortFields) bool) {
switch direction {
case logproto.BACKWARD:
result.byAscendingTime = false
maxVal = sortFields{timeNanos: math.MinInt64}
less = lessDescending
case logproto.FORWARD:
result.byAscendingTime = true
maxVal = sortFields{timeNanos: math.MaxInt64}
less = lessAscending
default:
panic("bad direction")
}
return result
return
}
func (i *entrySortIterator) lessByIndex(k, j int) bool {
t1, t2 := i.is[k].Entry().Timestamp.UnixNano(), i.is[j].Entry().Timestamp.UnixNano()
if t1 == t2 {
// The underlying stream hash may not be available, such as when merging LokiResponses in the
// frontend which were sharded. Prefer to use the underlying stream hash when available,
// which is needed in deduping code, but defer to label sorting when it's not present.
if i.is[k].StreamHash() == 0 {
return i.is[k].Labels() < i.is[j].Labels()
}
return i.is[k].StreamHash() < i.is[j].StreamHash()
}
if i.byAscendingTime {
return t1 < t2
}
return t1 > t2
type sortFields struct {
labels string
timeNanos int64
streamHash uint64
}
func (i *entrySortIterator) lessByValue(t1 int64, l1 uint64, lb string, index int) bool {
t2 := i.is[index].Entry().Timestamp.UnixNano()
if t1 == t2 {
if l1 == 0 {
return lb < i.is[index].Labels()
}
return l1 < i.is[index].StreamHash()
func sortFieldsAt(i EntryIterator) sortFields {
return sortFields{
timeNanos: i.Entry().Timestamp.UnixNano(),
labels: i.Labels(),
streamHash: i.StreamHash(),
}
if i.byAscendingTime {
return t1 < t2
}
return t1 > t2
}
// init throws out empty iterators and sorts them.
func (i *entrySortIterator) init() {
if i.prefetched {
return
}
i.prefetched = true
tmp := make([]EntryIterator, 0, len(i.is))
for _, it := range i.is {
if it.Next() {
tmp = append(tmp, it)
continue
// lessAscending reports whether e1 sorts before e2 when iterating forward in time.
// Entries are ordered by timestamp first; ties are broken by stream hash, or by
// labels when e1 carries no stream hash.
func lessAscending(e1, e2 sortFields) bool {
	switch {
	case e1.timeNanos != e2.timeNanos:
		return e1.timeNanos < e2.timeNanos
	case e1.streamHash == 0:
		// The stream hash may be missing, e.g. when the frontend merges sharded
		// LokiResponses. The hash is preferred when present because the deduping
		// code needs it; otherwise fall back to comparing labels.
		return e1.labels < e2.labels
	default:
		return e1.streamHash < e2.streamHash
	}
}
if err := it.Error(); err != nil {
i.errs = append(i.errs, err)
func lessDescending(e1, e2 sortFields) bool {
if e1.timeNanos == e2.timeNanos {
if e1.streamHash == 0 {
return e1.labels < e2.labels
}
util.LogError("closing iterator", it.Close)
return e1.streamHash < e2.streamHash
}
i.is = tmp
sort.Slice(i.is, i.lessByIndex)
return e1.timeNanos > e2.timeNanos
}
func (i *entrySortIterator) fix() {
head := i.is[0]
t1 := head.Entry().Timestamp.UnixNano()
l1 := head.StreamHash()
lb := head.Labels()
// shortcut
if len(i.is) <= 1 || i.lessByValue(t1, l1, lb, 1) {
return
func (i *entrySortIterator) closeEntry(e EntryIterator) {
if err := e.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", e.Close)
// First element is out of place. So we reposition it.
i.is = i.is[1:] // drop head
index := sort.Search(len(i.is), func(in int) bool { return i.lessByValue(t1, l1, lb, in) })
if index == len(i.is) {
i.is = append(i.is, head)
} else {
i.is = append(i.is[:index+1], i.is[index:]...)
i.is[index] = head
}
}
func (i *entrySortIterator) Next() bool {
i.init()
if len(i.is) == 0 {
ret := i.tree.Next()
if !ret {
return false
}
next := i.is[0]
next := i.tree.Winner()
i.currEntry.Entry = next.Entry()
i.currEntry.labels = next.Labels()
i.currEntry.streamHash = next.StreamHash()
// if the top iterator is empty, we remove it.
if !next.Next() {
i.is = i.is[1:]
if err := next.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", next.Close)
return true
}
if len(i.is) > 1 {
i.fix()
}
return true
}
@ -486,12 +444,8 @@ func (i *entrySortIterator) Error() error {
}
func (i *entrySortIterator) Close() error {
for _, entryIterator := range i.is {
if err := entryIterator.Close(); err != nil {
return err
}
}
return nil
i.tree.Close()
return i.Error()
}
// NewStreamsIterator returns an iterator over logproto.Stream

@ -0,0 +1,128 @@
// Loser tree, from https://en.wikipedia.org/wiki/K-way_merge_algorithm#Tournament_Tree
package loser
// Sequence is the constraint for the inputs of a Tree: anything that can be
// stepped forward one position at a time.
type Sequence interface {
Next() bool // Advances to the next position; returns true if there is a value at that new position.
}
// New builds a loser tree over the given sequences.
//
//   - maxVal is stored as the value of a leaf once its sequence is exhausted.
//   - at extracts the current value of a sequence.
//   - less orders two values.
//   - close is invoked on a sequence when its Next() returns false, and by
//     Tree.Close for sequences that are still open.
//
// No sequence is advanced here; the tree is set up lazily on the first call to
// Tree.Next.
func New[E any, S Sequence](sequences []S, maxVal E, at func(S) E, less func(E, E) bool, close func(S)) *Tree[E, S] {
	count := len(sequences)
	tree := &Tree[E, S]{
		maxVal: maxVal,
		at:     at,
		less:   less,
		close:  close,
		nodes:  make([]node[E, S], 2*count),
	}
	// The second half of the node slice holds the leaves, one per sequence.
	leaves := tree.nodes[count:]
	for i := range sequences {
		leaves[i].items = sequences[i]
	}
	if count != 0 {
		// Sentinel: tells Next() that the tournament has not been played yet.
		tree.nodes[0].index = -1
	}
	return tree
}
// Close invokes the close function on every sequence that has not already been
// marked as finished (leaf index -1).
func (t *Tree[E, S]) Close() {
	total := len(t.nodes)
	// Leaves occupy the second half of the node slice.
	for i := total / 2; i < total; i++ {
		if leaf := &t.nodes[i]; leaf.index != -1 {
			t.close(leaf.items)
		}
	}
}
// Tree is a loser tree (tournament tree) over a set of sequences.
// It is a binary tree laid out in the nodes slice such that nodes 2k and 2k+1 share parent k.
// For M sequences, the M leaf nodes are stored in positions M..2M-1 and the M-1 internal
// nodes in positions 1..M-1.
// Node 0 is a special node, containing the winner of the contest.
type Tree[E any, S Sequence] struct {
maxVal E // Stored as a leaf's value once its sequence is exhausted; presumably must not sort before any real value (confirm with callers).
at func(S) E // Extracts the current value of a sequence.
less func(E, E) bool // Ordering used to decide each game.
close func(S) // Called when Next() returns false.
nodes []node[E, S] // Node 0 (winner), internal nodes, then leaves; length is twice the number of sequences.
}
// node is one slot of a Tree's backing slice: node 0, an internal node, or a leaf.
type node[E any, S Sequence] struct {
index int // This is the loser for all nodes except the 0th, where it is the winner. -1 on node 0 means "not initialized yet"; -1 on a leaf means its sequence is finished.
value E // Value copied from the loser node, or winner for node 0. For a leaf, the sequence's current value.
items S // Only populated for leaf nodes.
}
// moveNext advances the sequence held by the leaf at index and refreshes the
// leaf's cached value. It returns false when the sequence has no more values;
// in that case the sequence is closed, the leaf's value becomes maxVal and its
// index is set to -1 to mark it as finished.
func (t *Tree[E, S]) moveNext(index int) bool {
	leaf := &t.nodes[index]
	if !leaf.items.Next() {
		t.close(leaf.items)
		leaf.value, leaf.index = t.maxVal, -1
		return false
	}
	leaf.value = t.at(leaf.items)
	return true
}
// Winner returns the sequence recorded in node 0 as the current winner.
// It indexes the node slice with node 0's index, so it must only be called
// once Next has positioned the tree (the index is -1 before that).
func (t *Tree[E, S]) Winner() S {
	champion := t.nodes[0].index
	return t.nodes[champion].items
}
// Next advances the tree to the next value in order and reports whether there
// is one. After it returns true, Winner() yields the sequence holding that value.
//
// The first call plays the initial tournament (advancing every sequence once).
// Later calls advance only the current winner and replay the games on its path
// to the root.
//
// Once Next has returned false it keeps returning false without touching the
// sequences again, so an exhausted sequence is neither advanced nor closed a
// second time.
func (t *Tree[E, S]) Next() bool {
	if len(t.nodes) == 0 {
		return false
	}
	if t.nodes[0].index == -1 { // If tree has not been initialized yet, do that.
		t.initialize()
		return t.nodes[t.nodes[0].index].index != -1
	}
	if t.nodes[t.nodes[0].index].index == -1 {
		// The winner is a finished leaf, i.e. the tree is already exhausted.
		// Bail out here: moveNext would call Next() and close() again on a
		// sequence that has already been closed.
		return false
	}
	t.moveNext(t.nodes[0].index)
	t.replayGames(t.nodes[0].index)
	return t.nodes[t.nodes[0].index].index != -1
}
// initialize plays the first full tournament. Every leaf is advanced once so
// that it has a current value, then sibling pairs are played bottom-up: the
// loser of each game is recorded in the parent node and the winner moves on.
// The overall winner ends up in node 0.
func (t *Tree[E, S]) initialize() {
	total := len(t.nodes)
	winners := make([]int, total)

	// Each leaf starts out as the winner of its own position.
	for leaf := total / 2; leaf < total; leaf++ {
		winners[leaf] = leaf
		t.moveNext(leaf) // Must advance each sequence so that it has a value to compare.
	}

	// Walk sibling pairs (left, left+1) from the last pair down to nodes 2 and 3.
	for left := total - 2; left > 0; left -= 2 {
		lost, won := t.playGame(winners[left], winners[left+1])
		up := parent(left)
		t.nodes[up].index = lost
		t.nodes[up].value = t.nodes[lost].value
		winners[up] = won
	}

	champion := winners[1]
	t.nodes[0].index = champion
	t.nodes[0].value = t.nodes[champion].value
}
// replayGames re-plays every game on the path from the leaf at pos up to the
// root, after that leaf's value has changed, and stores the resulting winner
// in node 0.
func (t *Tree[E, S]) replayGames(pos int) {
	// The changed leaf enters as the provisional winner at its own level.
	winner := pos
	for n := parent(pos); n != 0; n = parent(n) {
		game := &t.nodes[n]
		if !t.less(game.value, t.nodes[winner].value) {
			continue // The provisional winner still beats the loser stored here.
		}
		// The stored loser wins this game: it carries on upwards, and the
		// previous winner is recorded here as the new loser.
		winner, game.index = game.index, winner
		game.value = t.nodes[game.index].value
	}
	// Whoever survived every game is the overall winner.
	t.nodes[0].index = winner
	t.nodes[0].value = t.nodes[winner].value
}
// playGame compares the values of nodes a and b and returns their indexes as
// (loser, winner). a wins only if its value is strictly less than b's; on a
// tie b is the winner.
func (t *Tree[E, S]) playGame(a, b int) (loser, winner int) {
	if !t.less(t.nodes[a].value, t.nodes[b].value) {
		return a, b
	}
	return b, a
}
// parent returns the position of the parent of node i in the tree's node
// slice; the children of node 1 (positions 2 and 3) map to 1, and node 1 maps to 0.
func parent(i int) int {
	return i / 2
}

@ -0,0 +1,120 @@
package loser_test
import (
"math"
"testing"
"github.com/grafana/loki/pkg/util/loser"
)
// List is a sequence over a fixed slice of uint64 values, used to drive the tree in tests.
type List struct {
list []uint64 // Values that have not been consumed yet.
cur uint64 // Value at the current position; 0 before the first Next and after Next has returned false.
}
// NewList returns a List that will yield the given values in order.
func NewList(list ...uint64) *List {
	it := new(List)
	it.list = list
	return it
}
// At returns the value at the current position of the list.
func (it *List) At() uint64 {
	return it.cur
}
// Next moves to the next value of the list and reports whether there was one.
// When the list is exhausted the current value is reset to 0.
func (it *List) Next() bool {
	if len(it.list) == 0 {
		it.cur = 0
		return false
	}
	it.cur, it.list = it.list[0], it.list[1:]
	return true
}
// Seek consumes values until the current value is at least val or the list
// runs out. The result reports whether unconsumed values remain after the
// current position.
// NOTE(review): the result is false when the match is the last element, and
// can be true without advancing when val is 0 - confirm this is intended.
func (it *List) Seek(val uint64) bool {
	for len(it.list) > 0 && it.cur < val {
		it.cur, it.list = it.list[0], it.list[1:]
	}
	return len(it.list) != 0
}
// checkIterablesEqual steps a and b in lockstep and fails the test if they
// yield a different number of elements, or if at any position the two current
// values (read through at1 and at2) are ordered differently by less, i.e. one
// is less than the other.
func checkIterablesEqual[E any, S1 loser.Sequence, S2 loser.Sequence](t *testing.T, a S1, b S2, at1 func(S1) E, at2 func(S2) E, less func(E, E) bool) {
	t.Helper()
	seen := 0
	for a.Next() {
		seen++
		if !b.Next() {
			t.Fatalf("b ended before a after %d elements", seen)
		}
		// Two values are considered equal when neither is less than the other.
		differ := less(at1(a), at2(b)) || less(at2(b), at1(a))
		if differ {
			t.Fatalf("position %d: %v != %v", seen, at1(a), at2(b))
		}
	}
	if b.Next() {
		t.Fatalf("a ended before b after %d elements", seen)
	}
}
// TestMerge feeds several sorted lists through a loser tree and checks that
// the tree yields the expected merged order, and that the close callback has
// run once per input list by the time the merge is drained.
func TestMerge(t *testing.T) {
	type testCase struct {
		name string
		args []*List
		want *List
	}
	cases := []testCase{
		{name: "empty input", want: NewList()},
		{name: "one list", args: []*List{NewList(1, 2, 3, 4)}, want: NewList(1, 2, 3, 4)},
		{name: "two lists", args: []*List{NewList(3, 4, 5), NewList(1, 2)}, want: NewList(1, 2, 3, 4, 5)},
		{name: "two lists, first empty", args: []*List{NewList(), NewList(1, 2)}, want: NewList(1, 2)},
		{name: "two lists, second empty", args: []*List{NewList(1, 2), NewList()}, want: NewList(1, 2)},
		{name: "two lists b", args: []*List{NewList(1, 2), NewList(3, 4, 5)}, want: NewList(1, 2, 3, 4, 5)},
		{name: "two lists c", args: []*List{NewList(1, 3), NewList(2, 4, 5)}, want: NewList(1, 2, 3, 4, 5)},
		{name: "three lists", args: []*List{NewList(1, 3), NewList(2, 4), NewList(5)}, want: NewList(1, 2, 3, 4, 5)},
	}

	// Accessors and ordering shared by every case.
	valueOf := func(s *List) uint64 { return s.At() }
	ascending := func(a, b uint64) bool { return a < b }
	winnerValue := func(tr *loser.Tree[uint64, *List]) uint64 { return tr.Winner().At() }

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			closed := 0
			onClose := func(*List) { closed++ }
			tree := loser.New(tc.args, math.MaxUint64, valueOf, ascending, onClose)
			checkIterablesEqual(t, tc.want, tree, valueOf, winnerValue, ascending)
			if expected := len(tc.args); closed != expected {
				t.Errorf("Expected %d closes, got %d", expected, closed)
			}
		})
	}
}
Loading…
Cancel
Save