utilities for reducing ngram allocations in token iterator construction (#11276)

Previously, the `calculatePrefix` function generated the token buffer, but
that buffer wouldn't necessarily have enough capacity to append the ngram
afterwards without reallocating and copying it. This PR ensures we allocate
enough space for both the prefix and the ngram in the token buffer up front.
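The idea, roughly: allocate the buffer once with capacity for the chunk-ref prefix plus the largest possible ngram, then re-slice to the prefix length and append each token in place. A minimal sketch of that pattern (not the exact code from this commit; `prefixedBuf` and `maxRuneLen` are illustrative names):

```go
package main

import "fmt"

// maxRuneLen mirrors the MaxRuneLen constant added in this PR: a rune encodes
// to at most 4 bytes of UTF-8.
const maxRuneLen = 4

// prefixedBuf builds the prefix once and reserves enough extra capacity for
// any ngram of the given length, returning the buffer and the prefix length.
func prefixedBuf(prefix []byte, ngram int) ([]byte, int) {
	buf := make([]byte, 0, len(prefix)+ngram*maxRuneLen)
	buf = append(buf, prefix...)
	return buf, len(buf)
}

func main() {
	buf, prefixLn := prefixedBuf([]byte("chunk-ref:"), 3)
	for _, tok := range [][]byte{[]byte("abc"), []byte("bcd")} {
		// Re-slice to the prefix and append the token; the capacity reserved
		// above is sufficient, so no reallocation or copy of the prefix occurs.
		out := append(buf[:prefixLn], tok...)
		fmt.Println(string(out))
	}
}
```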
Owen Diehl 2 years ago committed by GitHub
parent 4455cd9d7d
commit 6e93d150d8
  1. pkg/storage/bloom/v1/bloom_tokenizer.go (37)
  2. pkg/storage/bloom/v1/bloom_tokenizer_test.go (44)
  3. pkg/storage/bloom/v1/tokenizer.go (16)
  4. pkg/storage/bloom/v1/tokenizer_test.go (15)

@@ -2,7 +2,6 @@ package v1
import (
"context"
"encoding/binary"
"math"
"time"
@@ -14,6 +13,7 @@ import (
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util/encoding"
util_log "github.com/grafana/loki/pkg/util/log"
)
@@ -68,27 +68,32 @@ func clearCache(cache map[string]interface{}) {
}
}
func calculatePrefix(chk logproto.ChunkRef) []byte {
i64buf := make([]byte, binary.MaxVarintLen64)
i32buf := make([]byte, 4)
prefix := make([]byte, 32)
binary.PutVarint(i64buf, int64(chk.From))
prefix = append(prefix, i64buf...)
binary.PutVarint(i64buf, int64(chk.Through))
prefix = append(prefix, i64buf...)
binary.LittleEndian.PutUint32(i32buf, chk.Checksum)
prefix = append(prefix, i32buf...)
return prefix
// prefixedToken returns a byte slice with sufficient capacity for a chunk-ref prefixed token
// of specific ngram length, along with the length of the prefix.
// It ensures enough capacity for the prefix and the token so additional tokens can be created
// without allocations by appending them to the prefix length
func prefixedToken(ngram int, chk logproto.ChunkRef) ([]byte, int) {
var enc encoding.Encbuf
enc.PutBE64(uint64(chk.From))
enc.PutBE64(uint64(chk.Through))
enc.PutBE32(chk.Checksum)
prefixLn := enc.Len() // record the length of the prefix
enc.PutBytes(make([]byte, ngram*MaxRuneLen)) // ensure enough capacity for the ngram
// return the underlying byte slice and the length of the prefix
return enc.Get(), prefixLn
}
// PopulateSeriesWithBloom is intended to be called on the write path, and is used to populate the bloom filter for a given series.
func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) {
clearCache(bt.cache)
// allocate a reusable key buffer long enough to store both the chunk ref and the ngram
for idx := range chunks {
lc := chunks[idx].Data.(*chunkenc.Facade).LokiChunk()
prefix := calculatePrefix(chunks[idx].ChunkRef)
tokenBuf, prefixLn := prefixedToken(bt.lineTokenizer.N, chunks[idx].ChunkRef)
// TODO: error handling
itr, err := lc.Iterator(
@@ -106,7 +111,7 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo
defer itr.Close()
for itr.Next() && itr.Error() == nil {
chunkTokenizer := NewPrefixedTokenIter(prefix, bt.lineTokenizer.Tokens(itr.Entry().Line))
chunkTokenizer := NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(itr.Entry().Line))
for chunkTokenizer.Next() {
tok := chunkTokenizer.At()
if tok != nil {

@@ -7,6 +7,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/pkg/storage/chunk"
@@ -24,6 +25,49 @@ var (
four = NewNGramTokenizer(4, 0)
)
func TestPrefixedKeyCreation(t *testing.T) {
var ones uint64 = 0xffffffffffffffff
ref := logproto.ChunkRef{
From: 0,
Through: model.Time(int64(ones)),
Checksum: 0xffffffff,
}
for _, tc := range []struct {
desc string
ngram, expLen int
}{
{
desc: "0-gram",
ngram: 0,
expLen: 20,
},
{
desc: "4-gram",
ngram: 4,
expLen: 20 + 4*MaxRuneLen,
},
} {
t.Run(tc.desc, func(t *testing.T) {
token, prefixLn := prefixedToken(tc.ngram, ref)
require.Equal(t, 20, prefixLn)
require.Equal(t, tc.expLen, len(token))
// first 8 bytes should be zeros from `from`
for i := 0; i < 8; i++ {
require.Equal(t, byte(0), token[i])
}
// next 8 bytes should be ones from `through`
for i := 8; i < 16; i++ {
require.Equal(t, byte(255), token[i])
}
// next 4 bytes should be ones from `checksum`
for i := 16; i < 20; i++ {
require.Equal(t, byte(255), token[i])
}
})
}
}
func TestSetLineTokenizer(t *testing.T) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)

@@ -4,6 +4,10 @@ import (
"unicode/utf8"
)
const (
MaxRuneLen = 4
)
func reassemble(buf []rune, ln, pos int, result []byte) []byte {
result = result[:0] // Reset the result slice
for i := 0; i < ln; i++ {
@@ -29,7 +33,7 @@ func NewNGramTokenizer(n, skip int) *NGramTokenizer {
N: n,
Skip: skip,
buffer: make([]rune, n+skip),
res: make([]byte, 0, n*4), // maximum 4 bytes per rune
res: make([]byte, 0, n*MaxRuneLen), // maximum 4 bytes per rune
}
return t
@@ -89,20 +93,20 @@ func (t *NGramTokenIter) Err() error {
}
type PrefixedTokenIter struct {
prefix []byte
buf []byte
prefixLen int
NGramTokenIter
}
func (t *PrefixedTokenIter) At() []byte {
return append(t.prefix[:t.prefixLen], t.NGramTokenIter.At()...)
return append(t.buf[:t.prefixLen], t.NGramTokenIter.At()...)
}
func NewPrefixedTokenIter(prefix []byte, iter NGramTokenIter) *PrefixedTokenIter {
func NewPrefixedTokenIter(buf []byte, prefixLn int, iter NGramTokenIter) *PrefixedTokenIter {
return &PrefixedTokenIter{
prefix: prefix,
prefixLen: len(prefix),
buf: buf,
prefixLen: prefixLn,
NGramTokenIter: iter,
}
}

@@ -4,6 +4,8 @@ import (
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
)
const BigFile = "../../../logql/sketch/testdata/war_peace.txt"
@@ -101,7 +103,7 @@ func TestPrefixedIterator(t *testing.T) {
} {
prefix := []byte("0123")
t.Run(tc.desc, func(t *testing.T) {
itr := NewPrefixedTokenIter(prefix, three.Tokens(tc.input))
itr := NewPrefixedTokenIter(prefix, len(prefix), three.Tokens(tc.input))
for _, exp := range tc.exp {
require.True(t, itr.Next())
require.Equal(t, exp, string(itr.At()))
@@ -126,9 +128,6 @@ func BenchmarkTokens(b *testing.B) {
var (
v2Three = NewNGramTokenizer(3, 0)
v2ThreeSkip1 = NewNGramTokenizer(3, 1)
// fp + from + through + checksum
chunkPrefixLen = 8 + 8 + 8 + 4
)
type impl struct {
@@ -174,9 +173,9 @@ func BenchmarkTokens(b *testing.B) {
{
desc: "v2",
f: func() func() {
prefix := make([]byte, chunkPrefixLen, 512)
buf, prefixLn := prefixedToken(v2Three.N, logproto.ChunkRef{})
return func() {
itr := NewPrefixedTokenIter(prefix, v2Three.Tokens(lorem))
itr := NewPrefixedTokenIter(buf, prefixLn, v2Three.Tokens(lorem))
for itr.Next() {
_ = itr.At()
}
@@ -191,9 +190,9 @@ func BenchmarkTokens(b *testing.B) {
{
desc: "v2",
f: func() func() {
prefix := make([]byte, chunkPrefixLen, 512)
buf, prefixLn := prefixedToken(v2Three.N, logproto.ChunkRef{})
return func() {
itr := NewPrefixedTokenIter(prefix, v2ThreeSkip1.Tokens(lorem))
itr := NewPrefixedTokenIter(buf, prefixLn, v2ThreeSkip1.Tokens(lorem))
for itr.Next() {
_ = itr.At()
}
