From 4a8f62ba00bcf6d338833eb506f91789c9ed584e Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 13 Jul 2021 12:50:06 +0200 Subject: [PATCH] Improve head chunk allocations when reading samples. (#3968) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Improve head chunk allocations when reading samples. Reading samples is the worst case because we are not limited and must return all of them. Re-using memory will help the ingesters' read path. ``` ❯ benchcmp before.txt after.txt benchmark old ns/op new ns/op delta BenchmarkHeadBlockSampleIterator/Size_20000-16 2935102 1568140 -46.57% BenchmarkHeadBlockSampleIterator/Size_10000-16 1441980 750856 -47.93% BenchmarkHeadBlockSampleIterator/Size_8000-16 1146807 602796 -47.44% BenchmarkHeadBlockSampleIterator/Size_5000-16 698638 370655 -46.95% benchmark old allocs new allocs delta BenchmarkHeadBlockSampleIterator/Size_20000-16 32 16 -50.00% BenchmarkHeadBlockSampleIterator/Size_10000-16 29 15 -48.28% BenchmarkHeadBlockSampleIterator/Size_8000-16 28 15 -46.43% BenchmarkHeadBlockSampleIterator/Size_5000-16 26 15 -42.31% benchmark old bytes new bytes delta BenchmarkHeadBlockSampleIterator/Size_20000-16 2269664 484068 -78.67% BenchmarkHeadBlockSampleIterator/Size_10000-16 1073582 984 -99.91% BenchmarkHeadBlockSampleIterator/Size_8000-16 827825 819 -99.90% BenchmarkHeadBlockSampleIterator/Size_5000-16 475557 786 -99.83% ``` Signed-off-by: Cyril Tovena * Improve unorderedHeadBlock too. 
Signed-off-by: Cyril Tovena --- pkg/chunkenc/memchunk.go | 28 +++++++++++++++++++--------- pkg/chunkenc/memchunk_test.go | 25 +++++++++++++++++++++++-- pkg/chunkenc/pool.go | 5 +++++ pkg/chunkenc/unordered.go | 23 ++++++++++------------- pkg/iter/sample_iterator.go | 26 ++++++++++++++++++++++++++ 5 files changed, 83 insertions(+), 24 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 7eaf01d565..e297a482da 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -9,8 +9,9 @@ import ( "hash" "hash/crc32" "io" - "sort" + "reflect" "time" + "unsafe" "github.com/cespare/xxhash/v2" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -912,14 +913,12 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra lhash := parsedLabels.Hash() if s, found = series[lhash]; !found { s = &logproto.Series{ - Labels: parsedLabels.String(), + Labels: parsedLabels.String(), + Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0], } series[lhash] = s } - - // []byte here doesn't create allocation because Sum64 has go:noescape directive - // It specifies that the function does not allow any of the pointers passed as arguments to escape into the heap or into the values returned from the function. - h := xxhash.Sum64([]byte(e.s)) + h := xxhash.Sum64(unsafeGetBytes(e.s)) s.Samples = append(s.Samples, logproto.Sample{ Timestamp: e.t, Value: value, @@ -932,11 +931,22 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extra } seriesRes := make([]logproto.Series, 0, len(series)) for _, s := range series { - // todo(ctovena) not sure we need this sort. 
- sort.Sort(s) seriesRes = append(seriesRes, *s) } - return iter.NewMultiSeriesIterator(ctx, seriesRes) + return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error { + for _, s := range series { + SamplesPool.Put(s.Samples) + } + return nil + }) +} + +func unsafeGetBytes(s string) []byte { + var buf []byte + p := unsafe.Pointer(&buf) + *(*string)(p) = s + (*reflect.SliceHeader)(p).Cap = len(s) + return buf } type bufferedIterator struct { diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 63edb14c6f..ed457315c4 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -660,6 +660,27 @@ func BenchmarkRead(b *testing.B) { b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds()))) b.Log("n=", b.N) }) + + b.Run(enc.String()+"_sample", func(b *testing.B) { + chunks, size := generateData(enc, 5) + b.ResetTimer() + bytesRead := uint64(0) + now := time.Now() + for n := 0; n < b.N; n++ { + for _, c := range chunks { + iterator := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), countExtractor) + for iterator.Next() { + _ = iterator.Sample() + } + if err := iterator.Close(); err != nil { + b.Fatal(err) + } + } + bytesRead += size + } + b.Log("bytes per second ", humanize.Bytes(uint64(float64(bytesRead)/time.Since(now).Seconds()))) + b.Log("n=", b.N) + }) } } @@ -733,7 +754,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { } func BenchmarkHeadBlockSampleIterator(b *testing.B) { - for _, j := range []int{100000, 50000, 15000, 10000} { + for _, j := range []int{20000, 10000, 8000, 5000} { b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) { h := headBlock{} @@ -751,6 +772,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { for iter.Next() { _ = iter.Sample() } + iter.Close() } }) } @@ -1132,7 +1154,6 @@ func TestMemChunk_Rebound(t *testing.T) { require.Equal(t, originalChunkItr.Entry(), newChunkItr.Entry()) } - }) } } diff 
--git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index b485aeedd5..ef28f0bdb3 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -13,6 +13,8 @@ import ( "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4/v4" "github.com/prometheus/prometheus/pkg/pool" + + "github.com/grafana/loki/pkg/logproto" ) // WriterPool is a pool of io.Writer @@ -53,6 +55,9 @@ var ( // Buckets [0.5KB,1KB,2KB,4KB,8KB] BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) + // SamplesPool pooling array of samples [512,1024,...,16k] + SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) }) + // Pool of crc32 hash crc32HashPool = sync.Pool{ New: func() interface{} { diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index 0732a07273..eed6376351 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "io" "math" - "sort" "time" "github.com/Workiva/go-datastructures/rangetree" @@ -20,9 +19,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel/stats" ) -var ( - noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) -) +var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) type unorderedHeadBlock struct { // Opted for range tree over skiplist for space reduction. 
@@ -86,7 +83,6 @@ func (hb *unorderedHeadBlock) append(ts int64, line string) { hb.size += len(line) hb.lines++ - } // Implements rangetree.Interval @@ -238,15 +234,13 @@ func (hb *unorderedHeadBlock) sampleIterator( lhash := parsedLabels.Hash() if s, found = series[lhash]; !found { s = &logproto.Series{ - Labels: parsedLabels.String(), + Labels: parsedLabels.String(), + Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0], } series[lhash] = s } - // []byte here doesn't create allocation because Sum64 has go:noescape directive - // It specifies that the function does not allow any of the pointers passed as arguments - // to escape into the heap or into the values returned from the function. - h := xxhash.Sum64([]byte(line)) + h := xxhash.Sum64(unsafeGetBytes(line)) s.Samples = append(s.Samples, logproto.Sample{ Timestamp: ts, Value: value, @@ -261,11 +255,14 @@ func (hb *unorderedHeadBlock) sampleIterator( } seriesRes := make([]logproto.Series, 0, len(series)) for _, s := range series { - // todo(ctovena) not sure we need this sort. 
- sort.Sort(s) seriesRes = append(seriesRes, *s) } - return iter.NewMultiSeriesIterator(ctx, seriesRes) + return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(ctx, seriesRes), func() error { + for _, s := range series { + SamplesPool.Put(s.Samples) + } + return nil + }) } // nolint:unused diff --git a/pkg/iter/sample_iterator.go b/pkg/iter/sample_iterator.go index 9dc97c053f..6040f7b9be 100644 --- a/pkg/iter/sample_iterator.go +++ b/pkg/iter/sample_iterator.go @@ -343,6 +343,32 @@ type seriesIterator struct { labels string } +type withCloseSampleIterator struct { + closeFn func() error + SampleIterator +} + +func (w *withCloseSampleIterator) Close() error { + var errs []error + if err := w.SampleIterator.Close(); err != nil { + errs = append(errs, err) + } + if err := w.closeFn(); err != nil { + errs = append(errs, err) + } + if len(errs) == 0 { + return nil + } + return util.MultiError(errs) +} + +func SampleIteratorWithClose(it SampleIterator, closeFn func() error) SampleIterator { + return &withCloseSampleIterator{ + closeFn: closeFn, + SampleIterator: it, + } +} + // NewMultiSeriesIterator returns an iterator over multiple logproto.Series func NewMultiSeriesIterator(ctx context.Context, series []logproto.Series) SampleIterator { is := make([]SampleIterator, 0, len(series))