Improve heap iterators. (#4731)

Mostly improves allocations, but I'm planning to do more using these benchmarks.

```
name                   old time/op    new time/op    delta
HeapIterator-16          3.87ms ± 6%    3.41ms ± 1%  -11.92%  (p=0.008 n=5+5)
HeapSampleIterator-16    2.09ms ± 1%    2.09ms ± 1%     ~     (p=0.421 n=5+5)

name                   old alloc/op   new alloc/op   delta
HeapIterator-16          10.5kB ± 0%     8.2kB ± 0%  -21.88%  (p=0.008 n=5+5)
HeapSampleIterator-16    8.40kB ± 0%    6.10kB ± 0%  -27.34%  (p=0.000 n=5+4)

name                   old allocs/op  new allocs/op  delta
HeapIterator-16            12.0 ± 0%       5.0 ± 0%  -58.33%  (p=0.008 n=5+5)
HeapSampleIterator-16      12.0 ± 0%       5.0 ± 0%  -58.33%  (p=0.008 n=5+5)
```

I want to introduce a distinction between the need to dedupe data and the need to order it.

The latter doesn't require popping from the heap; we could actually use sort.Slice.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/4772/head^2
Cyril Tovena 4 years ago committed by GitHub
parent d61dd1872a
commit 88feda41a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      pkg/iter/entry_iterator.go
  2. 39
      pkg/iter/entry_iterator_test.go
  3. 13
      pkg/iter/sample_iterator.go
  4. 42
      pkg/iter/sample_iterator_test.go

@ -157,9 +157,9 @@ func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto
result := &heapIterator{is: is, stats: stats.FromContext(ctx)}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{}
result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
case logproto.FORWARD:
result.heap = &iteratorMinHeap{}
result.heap = &iteratorMinHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
default:
panic("bad direction")
}
@ -220,6 +220,16 @@ func (i *heapIterator) Next() bool {
return false
}
// shortcut for the last iterator.
if i.heap.Len() == 1 {
i.currEntry = i.heap.Peek().Entry()
i.currLabels = i.heap.Peek().Labels()
if !i.heap.Peek().Next() {
i.heap.Pop()
}
return true
}
// We support multiple entries with the same timestamp, and we want to
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value

@ -3,6 +3,7 @@ package iter
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
@ -634,3 +635,41 @@ func TestNonOverlappingClose(t *testing.T) {
require.Equal(t, true, a.closed.Load())
require.Equal(t, true, b.closed.Load())
}
// BenchmarkHeapIterator measures fully draining a BACKWARD heap iterator that
// merges 100 stream iterators holding 10000 entries in total.
func BenchmarkHeapIterator(b *testing.B) {
	const (
		totalEntries = 10000
		totalStreams = 100
	)
	ctx := context.Background()

	// One stream per distinct label set.
	streams := make([]logproto.Stream, totalStreams)
	for idx := range streams {
		streams[idx].Labels = fmt.Sprintf(`{i="%d"}`, idx)
	}

	// Deal the entries round-robin across the streams; the timestamp
	// decreases as the entry index grows.
	for n := 0; n < totalEntries; n++ {
		target := &streams[n%totalStreams]
		target.Entries = append(target.Entries, logproto.Entry{
			Timestamp: time.Unix(0, int64(totalStreams-n)),
			Line:      fmt.Sprintf("%d", n),
		})
	}

	// Randomize the order in which the streams are handed to the iterator.
	rand.Shuffle(len(streams), func(x, y int) {
		streams[y], streams[x] = streams[x], streams[y]
	})

	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		// Building the per-stream iterators is setup, so keep it off the clock.
		b.StopTimer()
		var sources []EntryIterator
		for idx := range streams {
			sources = append(sources, NewStreamIterator(streams[idx]))
		}
		b.StartTimer()

		merged := NewHeapIterator(ctx, sources, logproto.BACKWARD)
		for merged.Next() {
			merged.Entry()
		}
		merged.Close()
	}
}

@ -154,10 +154,11 @@ type heapSampleIterator struct {
// NewHeapSampleIterator returns a new SampleIterator which uses a heap to merge
// together the samples of multiple iterators.
//
// The stats value is taken from ctx, and both the heap's backing slice and the
// tuples buffer are created empty with capacity len(is).
//
// NOTE(review): this listing is a diff with its +/- markers stripped, so the
// two `heap:` fields below appear to be the removed line
// (`&sampleIteratorHeap{}`) followed by its replacement (`&h`); presumably only
// `heap: &h` belongs in the resulting file — confirm against the commit.
func NewHeapSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator {
// Pre-size the heap's backing slice to the number of input iterators.
h := sampleIteratorHeap(make([]SampleIterator, 0, len(is)))
return &heapSampleIterator{
stats: stats.FromContext(ctx),
is: is,
heap: &sampleIteratorHeap{},
heap: &h,
tuples: make([]sampletuple, 0, len(is)),
}
}
@ -210,6 +211,16 @@ func (i *heapSampleIterator) Next() bool {
return false
}
// shortcut for the last iterator.
if i.heap.Len() == 1 {
i.curr = i.heap.Peek().Sample()
i.currLabels = i.heap.Peek().Labels()
if !i.heap.Peek().Next() {
i.heap.Pop()
}
return true
}
// We support multiple entries with the same timestamp, and we want to
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value

@ -2,7 +2,9 @@ package iter
import (
"context"
"fmt"
"io"
"math/rand"
"testing"
"time"
@ -94,6 +96,7 @@ var varSeries = logproto.Series{
sample(1), sample(2), sample(3),
},
}
var carSeries = logproto.Series{
Labels: `{foo="car"}`,
Samples: []logproto.Sample{
@ -145,7 +148,6 @@ func (f *fakeSampleClient) Recv() (*logproto.SampleQueryResponse, error) {
func (fakeSampleClient) Context() context.Context { return context.Background() }
func (fakeSampleClient) CloseSend() error { return nil }
func TestNewSampleQueryClientIterator(t *testing.T) {
it := NewSampleQueryClientIterator(&fakeSampleClient{
series: [][]logproto.Series{
{varSeries},
@ -274,3 +276,41 @@ func TestSampleIteratorWithClose_ReturnsError(t *testing.T) {
err2 := it.Close()
assert.Equal(t, err, err2)
}
// BenchmarkHeapSampleIterator measures fully draining a heap sample iterator
// that merges 100 series iterators holding 10000 samples in total.
func BenchmarkHeapSampleIterator(b *testing.B) {
	const (
		totalSamples = 10000
		totalSeries  = 100
	)
	ctx := context.Background()

	// One series per distinct label set.
	series := make([]logproto.Series, totalSeries)
	for idx := range series {
		series[idx].Labels = fmt.Sprintf(`{i="%d"}`, idx)
	}

	// Deal the samples round-robin across the series; the timestamp
	// decreases as the sample index grows.
	for n := 0; n < totalSamples; n++ {
		target := &series[n%totalSeries]
		target.Samples = append(target.Samples, logproto.Sample{
			Timestamp: int64(totalSeries - n),
			Value:     float64(n),
		})
	}

	// Randomize the order in which the series are handed to the iterator.
	rand.Shuffle(len(series), func(x, y int) {
		series[y], series[x] = series[x], series[y]
	})

	b.ResetTimer()
	for run := 0; run < b.N; run++ {
		// Building the per-series iterators is setup, so keep it off the clock.
		b.StopTimer()
		var sources []SampleIterator
		for idx := range series {
			sources = append(sources, NewSeriesIterator(series[idx]))
		}
		b.StartTimer()

		merged := NewHeapSampleIterator(ctx, sources)
		for merged.Next() {
			merged.Sample()
		}
		merged.Close()
	}
}

Loading…
Cancel
Save