Improve head chunk allocations when reading samples. (#3968)

* Improve head chunk allocations when reading samples.

Reading samples is the worst case because the query is not limited and must return all of them.
Re-using memory helps the ingesters' read path.

```
❯ benchcmp  before.txt after.txt
benchmark                                          old ns/op     new ns/op     delta
BenchmarkHeadBlockSampleIterator/Size_20000-16     2935102       1568140       -46.57%
BenchmarkHeadBlockSampleIterator/Size_10000-16     1441980       750856        -47.93%
BenchmarkHeadBlockSampleIterator/Size_8000-16      1146807       602796        -47.44%
BenchmarkHeadBlockSampleIterator/Size_5000-16      698638        370655        -46.95%

benchmark                                          old allocs     new allocs     delta
BenchmarkHeadBlockSampleIterator/Size_20000-16     32             16             -50.00%
BenchmarkHeadBlockSampleIterator/Size_10000-16     29             15             -48.28%
BenchmarkHeadBlockSampleIterator/Size_8000-16      28             15             -46.43%
BenchmarkHeadBlockSampleIterator/Size_5000-16      26             15             -42.31%

benchmark                                          old bytes     new bytes     delta
BenchmarkHeadBlockSampleIterator/Size_20000-16     2269664       484068        -78.67%
BenchmarkHeadBlockSampleIterator/Size_10000-16     1073582       984           -99.91%
BenchmarkHeadBlockSampleIterator/Size_8000-16      827825        819           -99.90%
BenchmarkHeadBlockSampleIterator/Size_5000-16      475557        786           -99.83%
```
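
The win comes from two coordinated changes, shown in the diffs below: each series borrows its pre-sized `Samples` slice from a new bucketed `SamplesPool` instead of growing a fresh one per query, and the returned iterator hands the slices back when it is closed. A minimal sketch of that borrow/return lifecycle, using the same prometheus pool package the PR relies on (the `collect` helper and `[]int64` element type are illustrative, not the real Loki code):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/pool"
)

// A pool shaped like the SamplesPool added in pkg/chunkenc/pool.go:
// buckets of 512, 1024, 2048, ..., 16384 entries.
var samplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} {
	return make([]int64, 0, size)
})

// collect borrows a slice sized for n entries; the caller must hand it
// back with samplesPool.Put once it is done iterating.
func collect(n int) []int64 {
	s := samplesPool.Get(n).([]int64)[:0] // reuse capacity, reset length
	for i := 0; i < n; i++ {
		s = append(s, int64(i))
	}
	return s
}

func main() {
	s := collect(700)
	fmt.Println(len(s), cap(s) >= 700) // 700 true

	// Analogous to the Close hook the PR attaches to its iterators:
	// return the memory so the next read can reuse it.
	samplesPool.Put(s)
}
```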

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve unorderedHeadBlock too.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Changed files:

1. pkg/chunkenc/memchunk.go (28)
2. pkg/chunkenc/memchunk_test.go (25)
3. pkg/chunkenc/pool.go (5)
4. pkg/chunkenc/unordered.go (23)
5. pkg/iter/sample_iterator.go (26)

```diff
--- a/pkg/chunkenc/memchunk.go
+++ b/pkg/chunkenc/memchunk.go
@@ -9,8 +9,9 @@ import (
 	"hash"
 	"hash/crc32"
 	"io"
-	"sort"
+	"reflect"
 	"time"
+	"unsafe"
 
 	"github.com/cespare/xxhash/v2"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -912,14 +913,12 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra
 			lhash := parsedLabels.Hash()
 			if s, found = series[lhash]; !found {
 				s = &logproto.Series{
 					Labels: parsedLabels.String(),
+					Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
 				}
 				series[lhash] = s
 			}
-			// []byte here doesn't create allocation because Sum64 has go:noescape directive
-			// It specifies that the function does not allow any of the pointers passed as arguments to escape into the heap or into the values returned from the function.
-			h := xxhash.Sum64([]byte(e.s))
+			h := xxhash.Sum64(unsafeGetBytes(e.s))
 			s.Samples = append(s.Samples, logproto.Sample{
 				Timestamp: e.t,
 				Value:     value,
@@ -932,11 +931,22 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra
 	}
 	seriesRes := make([]logproto.Series, 0, len(series))
 	for _, s := range series {
-		// todo(ctovena) not sure we need this sort.
-		sort.Sort(s)
 		seriesRes = append(seriesRes, *s)
 	}
-	return iter.NewMultiSeriesIterator(ctx, seriesRes)
+	return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error {
+		for _, s := range series {
+			SamplesPool.Put(s.Samples)
+		}
+		return nil
+	})
+}
+
+func unsafeGetBytes(s string) []byte {
+	var buf []byte
+	p := unsafe.Pointer(&buf)
+	*(*string)(p) = s
+	(*reflect.SliceHeader)(p).Cap = len(s)
+	return buf
 }
 
 type bufferedIterator struct {
```
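
The `unsafeGetBytes` helper replaces the `[]byte(e.s)` conversion (and the old `go:noescape` comment) with a conversion that never copies: it aliases the string's backing array by writing the string header into a slice header and fixing up the capacity. The result must be treated as read-only and must not outlive the call it is passed to; `xxhash.Sum64` only reads the bytes, so that holds here. A standalone illustration of the same trick, with commentary added but behaviour unchanged:

```go
package main

import (
	"fmt"
	"reflect"
	"unsafe"
)

// unsafeGetBytes mirrors the helper added above: it returns a []byte that
// aliases s's backing array without copying. The caller must never mutate
// the result or keep it alive longer than s.
func unsafeGetBytes(s string) []byte {
	var buf []byte
	p := unsafe.Pointer(&buf)
	// A string header is {data, len}; a slice header is {data, len, cap}.
	// Writing the string header through p fills the slice's data and len...
	*(*string)(p) = s
	// ...and this fixes up the cap field, which strings do not carry.
	(*reflect.SliceHeader)(p).Cap = len(s)
	return buf
}

func main() {
	b := unsafeGetBytes("hello")
	fmt.Println(len(b), cap(b), string(b)) // 5 5 hello
}
```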

```diff
--- a/pkg/chunkenc/memchunk_test.go
+++ b/pkg/chunkenc/memchunk_test.go
@@ -660,6 +660,27 @@ func BenchmarkRead(b *testing.B) {
 			b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds())))
 			b.Log("n=", b.N)
 		})
+		b.Run(enc.String()+"_sample", func(b *testing.B) {
+			chunks, size := generateData(enc, 5)
+			b.ResetTimer()
+			bytesRead := uint64(0)
+			now := time.Now()
+			for n := 0; n < b.N; n++ {
+				for _, c := range chunks {
+					iterator := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), countExtractor)
+					for iterator.Next() {
+						_ = iterator.Sample()
+					}
+					if err := iterator.Close(); err != nil {
+						b.Fatal(err)
+					}
+				}
+				bytesRead += size
+			}
+			b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds())))
+			b.Log("n=", b.N)
+		})
 	}
 }
@@ -733,7 +754,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
 }
 
 func BenchmarkHeadBlockSampleIterator(b *testing.B) {
-	for _, j := range []int{100000, 50000, 15000, 10000} {
+	for _, j := range []int{20000, 10000, 8000, 5000} {
 		b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) {
 			h := headBlock{}
@@ -751,6 +772,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
 				for iter.Next() {
 					_ = iter.Sample()
 				}
+				iter.Close()
 			}
 		})
 	}
@@ -1132,7 +1154,6 @@ func TestMemChunk_Rebound(t *testing.T) {
 				require.Equal(t, originalChunkItr.Entry(), newChunkItr.Entry())
 			}
 		})
-
 	}
 }
```
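
For the record, comparisons like the one at the top of this message come from running the benchmark before and after the change and diffing the two outputs, roughly as follows (paths and flags are illustrative):

```
go test ./pkg/chunkenc -run=^$ -bench=BenchmarkHeadBlockSampleIterator -benchmem > before.txt
# apply the change, then:
go test ./pkg/chunkenc -run=^$ -bench=BenchmarkHeadBlockSampleIterator -benchmem > after.txt
benchcmp before.txt after.txt
```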

```diff
--- a/pkg/chunkenc/pool.go
+++ b/pkg/chunkenc/pool.go
@@ -13,6 +13,8 @@ import (
 	"github.com/klauspost/compress/zstd"
 	"github.com/pierrec/lz4/v4"
 	"github.com/prometheus/prometheus/pkg/pool"
+
+	"github.com/grafana/loki/pkg/logproto"
 )
 
 // WriterPool is a pool of io.Writer
@@ -53,6 +55,9 @@ var (
 	// Buckets [0.5KB,1KB,2KB,4KB,8KB]
 	BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
 
+	// SamplesPool pooling array of samples [512,1024,...,16k]
+	SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })
+
 	// Pool of crc32 hash
 	crc32HashPool = sync.Pool{
 		New: func() interface{} {
```
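
A note on the bucket bounds: with this prometheus pool, `Get(n)` serves from the smallest bucket of at least `n` entries, and a request above the 16384 cap presumably falls back to a plain `make` that is never pooled on `Put`. That would explain why the `Size_20000` benchmark keeps ~0.5MB/op while the smaller sizes drop below 1KB. A hypothetical probe of the new pool (the expected capacities are assumptions based on that behaviour):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	// Served from the smallest bucket that fits: the 1024-entry one.
	small := chunkenc.SamplesPool.Get(700).([]logproto.Sample)

	// Above the largest (16384) bucket: falls back to a plain make.
	big := chunkenc.SamplesPool.Get(20000).([]logproto.Sample)

	fmt.Println(cap(small), cap(big)) // expected: 1024 20000

	chunkenc.SamplesPool.Put(small) // goes back into the 1024 bucket
	chunkenc.SamplesPool.Put(big)   // larger than every bucket: presumably dropped
}
```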

```diff
--- a/pkg/chunkenc/unordered.go
+++ b/pkg/chunkenc/unordered.go
@@ -6,7 +6,6 @@ import (
 	"encoding/binary"
 	"io"
 	"math"
-	"sort"
 	"time"
 
 	"github.com/Workiva/go-datastructures/rangetree"
@@ -20,9 +19,7 @@ import (
 	"github.com/grafana/loki/pkg/logqlmodel/stats"
 )
 
-var (
-	noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})
-)
+var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})
 
 type unorderedHeadBlock struct {
 	// Opted for range tree over skiplist for space reduction.
@@ -86,7 +83,6 @@ func (hb *unorderedHeadBlock) append(ts int64, line string) {
 	hb.size += len(line)
 	hb.lines++
 }
-
 
 // Implements rangetree.Interval
@@ -238,15 +234,13 @@ func (hb *unorderedHeadBlock) sampleIterator(
 			lhash := parsedLabels.Hash()
 			if s, found = series[lhash]; !found {
 				s = &logproto.Series{
 					Labels: parsedLabels.String(),
+					Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
 				}
 				series[lhash] = s
 			}
-			// []byte here doesn't create allocation because Sum64 has go:noescape directive
-			// It specifies that the function does not allow any of the pointers passed as arguments
-			// to escape into the heap or into the values returned from the function.
-			h := xxhash.Sum64([]byte(line))
+			h := xxhash.Sum64(unsafeGetBytes(line))
 			s.Samples = append(s.Samples, logproto.Sample{
 				Timestamp: ts,
 				Value:     value,
@@ -261,11 +255,14 @@ func (hb *unorderedHeadBlock) sampleIterator(
 	}
 	seriesRes := make([]logproto.Series, 0, len(series))
 	for _, s := range series {
-		// todo(ctovena) not sure we need this sort.
-		sort.Sort(s)
 		seriesRes = append(seriesRes, *s)
 	}
-	return iter.NewMultiSeriesIterator(ctx, seriesRes)
+	return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error {
+		for _, s := range series {
+			SamplesPool.Put(s.Samples)
+		}
+		return nil
+	})
 }
 
 // nolint:unused
```

```diff
--- a/pkg/iter/sample_iterator.go
+++ b/pkg/iter/sample_iterator.go
@@ -343,6 +343,32 @@ type seriesIterator struct {
 	labels string
 }
 
+type withCloseSampleIterator struct {
+	closeFn func() error
+	SampleIterator
+}
+
+func (w *withCloseSampleIterator) Close() error {
+	var errs []error
+	if err := w.SampleIterator.Close(); err != nil {
+		errs = append(errs, err)
+	}
+	if err := w.closeFn(); err != nil {
+		errs = append(errs, err)
+	}
+	if len(errs) == 0 {
+		return nil
+	}
+	return util.MultiError(errs)
+}
+
+func SampleIteratorWithClose(it SampleIterator, closeFn func() error) SampleIterator {
+	return &withCloseSampleIterator{
+		closeFn:        closeFn,
+		SampleIterator: it,
+	}
+}
+
 // NewMultiSeriesIterator returns an iterator over multiple logproto.Series
 func NewMultiSeriesIterator(ctx context.Context, series []logproto.Series) SampleIterator {
 	is := make([]SampleIterator, 0, len(series))
```
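
`SampleIteratorWithClose` is the piece that ties the pooled memory back to the iterator's lifetime: the head blocks wrap their multi-series iterator so the slices are returned exactly when the caller closes it. A minimal usage sketch (the label string and sample values are made up):

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	series := []logproto.Series{{
		Labels:  `{app="foo"}`,
		Samples: []logproto.Sample{{Timestamp: 1, Value: 1}},
	}}

	// The hook runs after the inner iterator's Close; this is where the
	// head blocks put their pooled Samples slices back.
	it := iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(context.Background(), series), func() error {
		fmt.Println("returning pooled slices")
		return nil
	})

	for it.Next() {
		_ = it.Sample()
	}
	if err := it.Close(); err != nil {
		fmt.Println(err)
	}
}
```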
