RangeQuery benchmark optimizations (#1413)

* Added some duplicate streams.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Instead of calling sort.Sort(), we keep tuples sorted as they are inserted.

Also, return the first element directly if there is only one tuple.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Don't do time comparison twice, compare nanoseconds instead.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
pull/1415/head
Peter Štibraný 6 years ago committed by Cyril Tovena
parent f0f6f24926
commit f6be636e13
  1. 66
      pkg/iter/iterator.go
  2. 26
      pkg/iter/iterator_test.go
  3. 5
      pkg/logql/engine_test.go

@ -4,7 +4,6 @@ import (
"container/heap"
"fmt"
"io"
"sort"
"time"
"github.com/grafana/loki/pkg/helpers"
@ -80,10 +79,18 @@ type iteratorMinHeap struct {
func (h iteratorMinHeap) Less(i, j int) bool {
t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp
if !t1.Equal(t2) {
return t1.Before(t2)
un1 := t1.UnixNano()
un2 := t2.UnixNano()
switch {
case un1 < un2:
return true
case un1 > un2:
return false
default: // un1 == un2:
return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels()
}
return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels()
}
type iteratorMaxHeap struct {
@ -92,10 +99,18 @@ type iteratorMaxHeap struct {
func (h iteratorMaxHeap) Less(i, j int) bool {
t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp
if !t1.Equal(t2) {
return t1.After(t2)
un1 := t1.UnixNano()
un2 := t2.UnixNano()
switch {
case un1 < un2:
return false
case un1 > un2:
return true
default: // un1 == un2
return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels()
}
return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels()
}
// HeapIterator iterates over a heap of iterators, with the ability to push new iterators and to query properties such as the time of the entry at peek and len
@ -116,7 +131,7 @@ type heapIterator struct {
is []EntryIterator
prefetched bool
tuples tuples
tuples []tuple
currEntry logproto.Entry
currLabels string
errs []error
@ -184,12 +199,6 @@ type tuple struct {
EntryIterator
}
type tuples []tuple
func (t tuples) Len() int { return len(t) }
func (t tuples) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t tuples) Less(i, j int) bool { return t[i].Line < t[j].Line }
func (i *heapIterator) Next() bool {
i.prefetch()
@ -209,7 +218,8 @@ func (i *heapIterator) Next() bool {
}
heap.Pop(i.heap)
i.tuples = append(i.tuples, tuple{
// insert keeps i.tuples sorted
i.tuples = insert(i.tuples, tuple{
Entry: entry,
EntryIterator: next,
})
@ -229,8 +239,30 @@ func (i *heapIterator) Next() bool {
return true
}
func mostCommon(tuples tuples) tuple {
sort.Sort(tuples)
// insert places n into ts, which must already be ordered by Line, and returns
// the resulting slice, still ordered. n lands after any existing tuples whose
// Line equals its own. A plain linear scan is used because ts is only expected
// to hold at most [number of replicas] elements.
func insert(ts []tuple, n tuple) []tuple {
	// Find the first element that sorts strictly after n; default to the end.
	pos := len(ts)
	for k := range ts {
		if ts[k].Line > n.Line {
			pos = k
			break
		}
	}

	// Grow by one slot, shift the tail right, and drop n into the gap.
	// When pos is the old length the copy moves nothing, so this also covers
	// the plain append case.
	ts = append(ts, tuple{})
	copy(ts[pos+1:], ts[pos:])
	ts[pos] = n
	return ts
}
// Expects that tuples are sorted already. We achieve that by using insert.
func mostCommon(tuples []tuple) tuple {
// trivial case, no need to do extra work.
if len(tuples) == 1 {
return tuples[0]
}
result := tuples[0]
count, max := 0, 0
for i := 0; i < len(tuples)-1; i++ {

@ -2,6 +2,7 @@ package iter
import (
"fmt"
"sort"
"testing"
"time"
@ -241,18 +242,39 @@ func TestMostCommon(t *testing.T) {
}
require.Equal(t, "a", mostCommon(tuples).Entry.Line)
// Last is most common
tuples = []tuple{
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "d"}},
}
require.Equal(t, "c", mostCommon(tuples).Entry.Line)
}
// TestInsert feeds insert an unordered sequence of lines, duplicates included,
// and verifies that the accumulated slice ends up ordered by Line.
func TestInsert(t *testing.T) {
	lines := []string{"a", "e", "c", "b", "d", "a", "c"}

	var ts []tuple
	for _, line := range lines {
		ts = insert(ts, tuple{Entry: logproto.Entry{Line: line}})
	}

	byLine := func(i, j int) bool { return ts[i].Line < ts[j].Line }
	require.True(t, sort.SliceIsSorted(ts, byLine))
}
func TestEntryIteratorForward(t *testing.T) {
itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels)
itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}")

@ -764,6 +764,11 @@ func getLocalQuerier(size int64) Querier {
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="foo"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="fuzz"}`)),
// some duplicates
iter.NewStreamIterator(newStream(size, identity, `{app="foo"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)),
}
return QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) {
return iter.NewHeapIterator(iters, p.Direction), nil

Loading…
Cancel
Save