diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go new file mode 100644 index 0000000000..7060052438 --- /dev/null +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -0,0 +1,111 @@ +package v1 + +import ( + "context" + "math" + "time" + + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/storage/bloom/v1/filter" + "github.com/grafana/loki/pkg/storage/chunk" + util_log "github.com/grafana/loki/pkg/util/log" + //"github.com/grafana/loki/tools/tsdb/helpers" +) + +type metrics struct{} + +/* +BloomTokenizer is a utility that converts either Loki chunks or individual lines into tokens. +These tokens are n-grams, short sequences of adjacent characters, that are used to populate a bloom filter. +https://en.wikipedia.org/wiki/Bloom_filter +Bloom filters are utilized for faster lookups of log lines. +*/ +type BloomTokenizer struct { + metrics *metrics + + lineTokenizer Tokenizer + chunkIDTokenizer *WrappedTokenizer + cache map[string]interface{} +} + +const CacheSize = 150000 + +// NewBloomTokenizer returns a new instance of the Bloom Tokenizer. +// Warning: the tokens returned use the same byte slice to reduce allocations. This has three consequences: +// 1) The token slices generated must not be mutated externally +// 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice. +// 3) This is not thread safe. +func NewBloomTokenizer(reg prometheus.Registerer) (*BloomTokenizer, error) { + t := &BloomTokenizer{ + metrics: newMetrics(reg), + } + t.cache = make(map[string]interface{}, CacheSize) + t.lineTokenizer = NewNGramTokenizer(4, 5, 0) // default to 4-grams, no skip + t.chunkIDTokenizer = ChunkIDTokenizer(t.lineTokenizer) + + level.Info(util_log.Logger).Log("msg", "bloom tokenizer created") + + return t, nil +} + +func (bt *BloomTokenizer) SetLineTokenizer(t Tokenizer) { + bt.lineTokenizer = t + bt.chunkIDTokenizer = ChunkIDTokenizer(bt.lineTokenizer) +} + +// TODO: add real metrics here +func newMetrics(r prometheus.Registerer) *metrics { + return &metrics{} +} + +func clearCache(cache map[string]interface{}) { + for k := range cache { + delete(cache, k) + } +} + +func (bt *BloomTokenizer) PopulateSBF(sbf *filter.ScalableBloomFilter, chunks []chunk.Chunk) { + clearCache(bt.cache) + for idx := range chunks { + lc := chunks[idx].Data.(*chunkenc.Facade).LokiChunk() + bt.chunkIDTokenizer.Reinit(chunks[idx].ChunkRef) + + // TODO: error handling + itr, _ := lc.Iterator( + context.Background(), + time.Unix(0, 0), // TODO: Parameterize/better handle the timestamps? + time.Unix(0, math.MaxInt64), + logproto.FORWARD, + log.NewNoopPipeline().ForStream(chunks[idx].Metric), + ) + + for itr.Next() && itr.Error() == nil { + toks := bt.chunkIDTokenizer.Tokens(itr.Entry().Line) + + for _, tok := range toks { + if tok.Key != nil { + str := string(tok.Key) + _, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters + if !found { + bt.cache[str] = nil + + sbf.TestAndAdd(tok.Key) + + if len(bt.cache) > CacheSize { // While crude, this has proven efficient in performance testing.
This speaks to the similarity in log lines near each other + clearCache(bt.cache) + } + } + } + } + } + } // for each chunk +} + +func (bt *BloomTokenizer) TokenizeLine(line string) []Token { + return bt.lineTokenizer.Tokens(line) +} diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go new file mode 100644 index 0000000000..eaff6c7837 --- /dev/null +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -0,0 +1,30 @@ +package v1 + +import ( + "fmt" + "testing" + + "github.com/prometheus/client_golang/prometheus" +) + +func BenchmarkMapClear(b *testing.B) { + bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer) + for i := 0; i < b.N; i++ { + for k := 0; k < CacheSize; k++ { + bt.cache[fmt.Sprint(k)] = k + } + + clearCache(bt.cache) + } +} + +func BenchmarkNewMap(b *testing.B) { + bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer) + for i := 0; i < b.N; i++ { + for k := 0; k < CacheSize; k++ { + bt.cache[fmt.Sprint(k)] = k + } + + bt.cache = make(map[string]interface{}, CacheSize) + } +} diff --git a/pkg/storage/bloom/v1/tokenizer.go b/pkg/storage/bloom/v1/tokenizer.go new file mode 100644 index 0000000000..22da439f07 --- /dev/null +++ b/pkg/storage/bloom/v1/tokenizer.go @@ -0,0 +1,162 @@ +package v1 + +import ( + "encoding/binary" + "unicode/utf8" + + "github.com/grafana/loki/pkg/logproto" +) + +type Token struct { + Key []byte +} + +type Tokenizer interface { + Tokens(line string) []Token + GetSkip() int + GetMin() int + GetMax() int +} + +const TokenBufferSize = 4096 +const TokenKeySize = 132 + +type NgramTokenizer struct { + // [min,max) exclusivity + min, max, skip int + buffers [][]rune // circular buffers used for ngram generation + runeBuffer []byte // buffer used for token generation + internalTokenBuffer []Token // circular buffer for tokens +} + +/* +N-Grams (https://en.wikipedia.org/wiki/N-gram) are a series of 'n' adjacent characters in a string. +These will be utilized for the bloom filters to allow for fuzzy searching. 
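+For example, with min=2, max=4, skip=0, Tokens("abcd") yields, in emission order:
+  "ab", "bc", "abc", "cd", "bcd"
+A non-zero skip drops that many n-grams between emitted ones; e.g. min=2, max=3, skip=1 on "abcd" yields only "ab" and "cd".
+(These examples mirror the cases in tokenizer_test.go.)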
+*/ +func NewNGramTokenizer(min, max, skip int) *NgramTokenizer { + capacity := max - min + t := &NgramTokenizer{ + min: min, + max: max, + skip: skip, + buffers: make([][]rune, capacity), + runeBuffer: make([]byte, 0, max*4), + internalTokenBuffer: make([]Token, 0, TokenBufferSize), + } + + for i := range t.buffers { + t.buffers[i] = make([]rune, t.min+i) + } + + for i := 0; i < cap(t.internalTokenBuffer); i++ { + t.internalTokenBuffer = append(t.internalTokenBuffer, Token{Key: make([]byte, 0, TokenKeySize)}) + } + + return t +} + +func (t *NgramTokenizer) GetSkip() int { + return t.skip +} + +func (t *NgramTokenizer) GetMin() int { + return t.min +} + +func (t *NgramTokenizer) GetMax() int { + return t.max +} + +func (t *NgramTokenizer) Tokens(line string) []Token { + var i int // rune index (not position that is measured in the range loop) + numToks := 0 + for _, r := range line { + + // j is the index of the buffer to use + for j := 0; j < (t.max - t.min); j++ { + // n is the length of the ngram + n := j + t.min + // pos is the position in the buffer to overwrite + pos := i % n + t.buffers[j][pos] = r + + if i >= n-1 && (i+1-n)%(t.skip+1) == 0 { + t.runeBuffer = reassemble(t.buffers[j], (i+1)%n, t.runeBuffer) + if numToks >= cap(t.internalTokenBuffer) || numToks == len(t.internalTokenBuffer) { + t.internalTokenBuffer = append(t.internalTokenBuffer, Token{Key: make([]byte, 0, TokenKeySize)}) + } + t.internalTokenBuffer[numToks].Key = t.internalTokenBuffer[numToks].Key[:0] + t.internalTokenBuffer[numToks].Key = append(t.internalTokenBuffer[numToks].Key, t.runeBuffer...) + numToks++ + } + } + i++ + } + return t.internalTokenBuffer[0:numToks] +} + +func reassemble(buf []rune, pos int, result []byte) []byte { + result = result[:0] // Reset the result slice + for i := 0; i < len(buf); i++ { + cur := (pos + i) % len(buf) + result = utf8.AppendRune(result, buf[cur]) + } + return result +} + +func chunkIDTransformer(tok Token, prefix []byte) Token { + tok.Key = append(append(tok.Key, prefix...), tok.Key...)[len(tok.Key):] + return tok +} + +type WrappedTokenizer struct { + t Tokenizer + tokenBuffer []Token + prefix []byte + i64buf []byte + i32buf []byte +} + +func (w *WrappedTokenizer) Tokens(line string) []Token { + w.tokenBuffer = w.tokenBuffer[:0] // Reset the result slice + toks := w.t.Tokens(line) + for _, tok := range toks { + w.tokenBuffer = append(w.tokenBuffer, chunkIDTransformer(tok, w.prefix), tok) + } + + return w.tokenBuffer +} + +func (w *WrappedTokenizer) GetSkip() int { + return w.t.GetSkip() +} + +func (w *WrappedTokenizer) GetMin() int { + return w.t.GetMin() +} + +func (w *WrappedTokenizer) GetMax() int { + return w.t.GetMax() +} + +func ChunkIDTokenizer(t Tokenizer) *WrappedTokenizer { + p := make([]byte, 0, 256) + return &WrappedTokenizer{ + t: t, + tokenBuffer: make([]Token, 0, TokenBufferSize), + prefix: p, + i64buf: make([]byte, binary.MaxVarintLen64), + i32buf: make([]byte, 4), + } +} + +func (w *WrappedTokenizer) Reinit(chk logproto.ChunkRef) { + w.prefix = w.prefix[:0] + + binary.PutVarint(w.i64buf, int64(chk.From)) + w.prefix = append(w.prefix, w.i64buf...) + binary.PutVarint(w.i64buf, int64(chk.Through)) + w.prefix = append(w.prefix, w.i64buf...) + binary.LittleEndian.PutUint32(w.i32buf, chk.Checksum) + w.prefix = append(w.prefix, w.i32buf...) 
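+	// The prefix built above is fixed-width: binary.MaxVarintLen64 bytes for From, another
+	// binary.MaxVarintLen64 bytes for Through, then 4 little-endian bytes for Checksum
+	// (the whole i64buf is appended each time, not just the bytes the varint actually used).
+	//
+	// Rough usage sketch, mirroring how BloomTokenizer.PopulateSBF drives this type
+	// (chunkRef here stands for the chunk's logproto.ChunkRef):
+	//
+	//	wt := ChunkIDTokenizer(NewNGramTokenizer(4, 5, 0))
+	//	wt.Reinit(chunkRef)     // once per chunk
+	//	toks := wt.Tokens(line) // chunk-prefixed and unprefixed tokens, interleaved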
+} diff --git a/tools/tsdb/bloom-tester/tokenizer_test.go b/pkg/storage/bloom/v1/tokenizer_test.go similarity index 52% rename from tools/tsdb/bloom-tester/tokenizer_test.go rename to pkg/storage/bloom/v1/tokenizer_test.go index b0660663e1..8a2c32d793 100644 --- a/tools/tsdb/bloom-tester/tokenizer_test.go +++ b/pkg/storage/bloom/v1/tokenizer_test.go @@ -1,17 +1,32 @@ -package main +package v1 import ( "bufio" "encoding/binary" - "github.com/grafana/loki/pkg/logproto" "os" "testing" + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" ) -func TestNGramTokenizer(t *testing.T) { - tokenizer := threeSkip2 +const BigFile = "../../../logql/sketch/testdata/war_peace.txt" + +var ( + twoSkipOne = NewNGramTokenizer(2, 3, 1) + three = NewNGramTokenizer(3, 4, 0) + threeSkip1 = NewNGramTokenizer(3, 4, 1) + threeSkip2 = NewNGramTokenizer(3, 4, 2) + four = NewNGramTokenizer(4, 5, 0) + fourSkip1 = NewNGramTokenizer(4, 5, 1) + fourSkip2 = NewNGramTokenizer(4, 5, 2) + five = NewNGramTokenizer(5, 6, 0) + six = NewNGramTokenizer(6, 7, 0) +) + +func TestNGrams(t *testing.T) { + tokenizer := NewNGramTokenizer(2, 4, 0) for _, tc := range []struct { desc string input string @@ -27,10 +42,25 @@ func TestNGramTokenizer(t *testing.T) { input: "a", exp: []Token{}, }, + { + desc: "two chars", + input: "ab", + exp: []Token{{Key: []byte("ab")}}, + }, + { + desc: "three chars", + input: "abc", + exp: []Token{{Key: []byte("ab")}, {Key: []byte("bc")}, {Key: []byte("abc")}}, + }, { desc: "four chars", input: "abcd", - exp: []Token{{Key: []byte("abc"), Value: "abc"}}, + exp: []Token{{Key: []byte("ab")}, {Key: []byte("bc")}, {Key: []byte("abc")}, {Key: []byte("cd")}, {Key: []byte("bcd")}}, + }, + { + desc: "foo", + input: "日本語", + exp: []Token{{Key: []byte("日本")}, {Key: []byte("本語")}, {Key: []byte("日本語")}}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -39,7 +69,50 @@ func TestNGramTokenizer(t *testing.T) { } } -func Test3Gram0SkipTokenizer(t *testing.T) { +func TestNGramsSkip(t *testing.T) { + + for _, tc := range []struct { + desc string + tokenizer *NgramTokenizer + input string + exp []Token + }{ + { + desc: "four chars", + tokenizer: twoSkipOne, + input: "abcd", + exp: []Token{{Key: []byte("ab")}, {Key: []byte("cd")}}, + }, + { + desc: "special chars", + tokenizer: twoSkipOne, + input: "日本語", + exp: []Token{{Key: []byte("日本")}}, + }, + { + desc: "multi", + tokenizer: NewNGramTokenizer(2, 4, 1), + input: "abcdefghij", + exp: []Token{ + {Key: []byte("ab")}, + {Key: []byte("abc")}, + {Key: []byte("cd")}, + {Key: []byte("cde")}, + {Key: []byte("ef")}, + {Key: []byte("efg")}, + {Key: []byte("gh")}, + {Key: []byte("ghi")}, + {Key: []byte("ij")}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.exp, tc.tokenizer.Tokens(tc.input)) + }) + } +} + +func Test3GramSkip0Tokenizer(t *testing.T) { tokenizer := three for _, tc := range []struct { desc string @@ -59,12 +132,12 @@ func Test3Gram0SkipTokenizer(t *testing.T) { { desc: "three char", input: "abc", - exp: []Token{{Key: []byte("abc"), Value: "abc"}}, + exp: []Token{{Key: []byte("abc")}}, }, { desc: "four chars", input: "abcd", - exp: []Token{{Key: []byte("abc"), Value: "abc"}, {Key: []byte("bcd"), Value: "bcd"}}, + exp: []Token{{Key: []byte("abc")}, {Key: []byte("bcd")}}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -73,7 +146,7 @@ func Test3Gram0SkipTokenizer(t *testing.T) { } } -func Test3Gram1SkipTokenizer(t *testing.T) { +func Test3GramSkip1Tokenizer(t *testing.T) { tokenizer := threeSkip1 for _, tc := range []struct { 
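+		// Expected tokens are listed in the order Tokens() emits them: for each character
+		// position, every completed n-gram length, shortest first.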
desc string @@ -93,17 +166,46 @@ func Test3Gram1SkipTokenizer(t *testing.T) { { desc: "three char", input: "abc", - exp: []Token{{Key: []byte("abc"), Value: "abc"}}, + exp: []Token{{Key: []byte("abc")}}, }, { desc: "four chars", input: "abcd", - exp: []Token{{Key: []byte("abc"), Value: "abc"}}, + exp: []Token{{Key: []byte("abc")}}, }, { desc: "five chars", input: "abcde", - exp: []Token{{Key: []byte("abc"), Value: "abc"}, {Key: []byte("cde"), Value: "cde"}}, + exp: []Token{{Key: []byte("abc")}, {Key: []byte("cde")}}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.exp, tokenizer.Tokens(tc.input)) + }) + } +} + +func Test3GramSkip2Tokenizer(t *testing.T) { + tokenizer := threeSkip2 + for _, tc := range []struct { + desc string + input string + exp []Token + }{ + { + desc: "empty", + input: "", + exp: []Token{}, + }, + { + desc: "single char", + input: "a", + exp: []Token{}, + }, + { + desc: "four chars", + input: "abcd", + exp: []Token{{Key: []byte("abc")}}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -112,7 +214,7 @@ func Test3Gram1SkipTokenizer(t *testing.T) { } } -func Test4Gram0SkipTokenizer(t *testing.T) { +func Test4GramSkip0Tokenizer(t *testing.T) { tokenizer := four for _, tc := range []struct { desc string @@ -137,12 +239,12 @@ func Test4Gram0SkipTokenizer(t *testing.T) { { desc: "four chars", input: "abcd", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}}, + exp: []Token{{Key: []byte("abcd")}}, }, { desc: "five chars", input: "abcde", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("bcde"), Value: "bcde"}}, + exp: []Token{{Key: []byte("abcd")}, {Key: []byte("bcde")}}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -151,7 +253,7 @@ func Test4Gram0SkipTokenizer(t *testing.T) { } } -func Test4Gram1SkipTokenizer(t *testing.T) { +func Test4GramSkip1Tokenizer(t *testing.T) { tokenizer := fourSkip1 for _, tc := range []struct { desc string @@ -176,27 +278,27 @@ func Test4Gram1SkipTokenizer(t *testing.T) { { desc: "four chars", input: "abcd", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}}, + exp: []Token{{Key: []byte("abcd")}}, }, { desc: "five chars", input: "abcde", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}}, + exp: []Token{{Key: []byte("abcd")}}, }, { desc: "six chars", input: "abcdef", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("cdef"), Value: "cdef"}}, + exp: []Token{{Key: []byte("abcd")}, {Key: []byte("cdef")}}, }, { desc: "seven chars", input: "abcdefg", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("cdef"), Value: "cdef"}}, + exp: []Token{{Key: []byte("abcd")}, {Key: []byte("cdef")}}, }, { desc: "eight chars", input: "abcdefgh", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("cdef"), Value: "cdef"}, {Key: []byte("efgh"), Value: "efgh"}}, + exp: []Token{{Key: []byte("abcd")}, {Key: []byte("cdef")}, {Key: []byte("efgh")}}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -205,7 +307,7 @@ func Test4Gram1SkipTokenizer(t *testing.T) { } } -func Test4Gram2SkipTokenizer(t *testing.T) { +func Test4GramSkip2Tokenizer(t *testing.T) { tokenizer := fourSkip2 for _, tc := range []struct { desc string @@ -230,37 +332,37 @@ func Test4Gram2SkipTokenizer(t *testing.T) { { desc: "four chars", input: "abcd", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}}, + exp: []Token{{Key: []byte("abcd")}}, }, { desc: "five chars", input: "abcde", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}}, + exp: []Token{{Key: []byte("abcd")}}, }, { desc: "six chars", input: "abcdef", - exp: 
[]Token{{Key: []byte("abcd"), Value: "abcd"}}, + exp: []Token{{Key: []byte("abcd")}}, }, { desc: "seven chars", input: "abcdefg", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("defg"), Value: "defg"}}, + exp: []Token{{Key: []byte("abcd")}, {Key: []byte("defg")}}, }, { desc: "eight chars", input: "abcdefgh", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("defg"), Value: "defg"}}, + exp: []Token{{Key: []byte("abcd")}, {Key: []byte("defg")}}, }, { desc: "nine chars", input: "abcdefghi", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("defg"), Value: "defg"}}, + exp: []Token{{Key: []byte("abcd")}, {Key: []byte("defg")}}, }, { desc: "ten chars", input: "abcdefghij", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("defg"), Value: "defg"}, {Key: []byte("ghij"), Value: "ghij"}}, + exp: []Token{{Key: []byte("abcd")}, {Key: []byte("defg")}, {Key: []byte("ghij")}}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -269,7 +371,7 @@ func Test4Gram2SkipTokenizer(t *testing.T) { } } -func Test5Gram0SkipTokenizer(t *testing.T) { +func Test5GramSkip0Tokenizer(t *testing.T) { tokenizer := five for _, tc := range []struct { desc string @@ -299,12 +401,12 @@ func Test5Gram0SkipTokenizer(t *testing.T) { { desc: "five chars", input: "abcde", - exp: []Token{{Key: []byte("abcde"), Value: "abcde"}}, + exp: []Token{{Key: []byte("abcde")}}, }, { desc: "six chars", input: "abcdef", - exp: []Token{{Key: []byte("abcde"), Value: "abcde"}, {Key: []byte("bcdef"), Value: "bcdef"}}, + exp: []Token{{Key: []byte("abcde")}, {Key: []byte("bcdef")}}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -313,7 +415,7 @@ func Test5Gram0SkipTokenizer(t *testing.T) { } } -func Test6Gram0SkipTokenizer(t *testing.T) { +func Test6GramSkip0Tokenizer(t *testing.T) { tokenizer := six for _, tc := range []struct { desc string @@ -348,12 +450,12 @@ func Test6Gram0SkipTokenizer(t *testing.T) { { desc: "six chars", input: "abcdef", - exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}}, + exp: []Token{{Key: []byte("abcdef")}}, }, { desc: "seven chars", input: "abcdefg", - exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}, {Key: []byte("bcdefg"), Value: "bcdefg"}}, + exp: []Token{{Key: []byte("abcdef")}, {Key: []byte("bcdefg")}}, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -369,13 +471,10 @@ func makeBuf(from, through, checksum int) []byte { binary.PutVarint(i64buf, int64(from)) p = append(p, i64buf...) - p = append(p, 58) binary.PutVarint(i64buf, int64(through)) p = append(p, i64buf...) - p = append(p, 58) binary.LittleEndian.PutUint32(i32buf, uint32(checksum)) p = append(p, i32buf...) 
- p = append(p, 58) return p } @@ -400,46 +499,43 @@ func TestWrappedTokenizer(t *testing.T) { desc: "four chars", input: "abcd", exp: []Token{ - {Key: append(makeBuf(0, 999999, 1), []byte("abc")...), Value: string(makeBuf(0, 999999, 1)) + "abc"}, - {Key: []byte("abc"), Value: "abc"}}, + {Key: append(makeBuf(0, 999999, 1), []byte("abc")...)}, + {Key: []byte("abc")}}, }, { desc: "uuid", input: "2b1a5e46-36a2-4694-a4b1-f34cc7bdfc45", exp: []Token{ - {Key: append(makeBuf(0, 999999, 1), []byte("2b1")...), Value: string(makeBuf(0, 999999, 1)) + "2b1"}, - {Key: append(makeBuf(0, 999999, 1), []byte("a5e")...), Value: string(makeBuf(0, 999999, 1)) + "a5e"}, - {Key: append(makeBuf(0, 999999, 1), []byte("46-")...), Value: string(makeBuf(0, 999999, 1)) + "46-"}, - {Key: append(makeBuf(0, 999999, 1), []byte("36a")...), Value: string(makeBuf(0, 999999, 1)) + "36a"}, - {Key: append(makeBuf(0, 999999, 1), []byte("2-4")...), Value: string(makeBuf(0, 999999, 1)) + "2-4"}, - {Key: append(makeBuf(0, 999999, 1), []byte("694")...), Value: string(makeBuf(0, 999999, 1)) + "694"}, - {Key: append(makeBuf(0, 999999, 1), []byte("-a4")...), Value: string(makeBuf(0, 999999, 1)) + "-a4"}, - {Key: append(makeBuf(0, 999999, 1), []byte("b1-")...), Value: string(makeBuf(0, 999999, 1)) + "b1-"}, - {Key: append(makeBuf(0, 999999, 1), []byte("f34")...), Value: string(makeBuf(0, 999999, 1)) + "f34"}, - {Key: append(makeBuf(0, 999999, 1), []byte("cc7")...), Value: string(makeBuf(0, 999999, 1)) + "cc7"}, - {Key: append(makeBuf(0, 999999, 1), []byte("bdf")...), Value: string(makeBuf(0, 999999, 1)) + "bdf"}, - {Key: append(makeBuf(0, 999999, 1), []byte("c45")...), Value: string(makeBuf(0, 999999, 1)) + "c45"}, - {Key: []byte("2b1"), Value: "2b1"}, - {Key: []byte("a5e"), Value: "a5e"}, - {Key: []byte("46-"), Value: "46-"}, - {Key: []byte("36a"), Value: "36a"}, - {Key: []byte("2-4"), Value: "2-4"}, - {Key: []byte("694"), Value: "694"}, - {Key: []byte("-a4"), Value: "-a4"}, - {Key: []byte("b1-"), Value: "b1-"}, - {Key: []byte("f34"), Value: "f34"}, - {Key: []byte("cc7"), Value: "cc7"}, - {Key: []byte("bdf"), Value: "bdf"}, - {Key: []byte("c45"), Value: "c45"}, + {Key: append(makeBuf(0, 999999, 1), []byte("2b1")...)}, + {Key: []byte("2b1")}, + {Key: append(makeBuf(0, 999999, 1), []byte("a5e")...)}, + {Key: []byte("a5e")}, + {Key: append(makeBuf(0, 999999, 1), []byte("46-")...)}, + {Key: []byte("46-")}, + {Key: append(makeBuf(0, 999999, 1), []byte("36a")...)}, + {Key: []byte("36a")}, + {Key: append(makeBuf(0, 999999, 1), []byte("2-4")...)}, + {Key: []byte("2-4")}, + {Key: append(makeBuf(0, 999999, 1), []byte("694")...)}, + {Key: []byte("694")}, + {Key: append(makeBuf(0, 999999, 1), []byte("-a4")...)}, + {Key: []byte("-a4")}, + {Key: append(makeBuf(0, 999999, 1), []byte("b1-")...)}, + {Key: []byte("b1-")}, + {Key: append(makeBuf(0, 999999, 1), []byte("f34")...)}, + {Key: []byte("f34")}, + {Key: append(makeBuf(0, 999999, 1), []byte("cc7")...)}, + {Key: []byte("cc7")}, + {Key: append(makeBuf(0, 999999, 1), []byte("bdf")...)}, + {Key: []byte("bdf")}, + {Key: append(makeBuf(0, 999999, 1), []byte("c45")...)}, + {Key: []byte("c45")}, }, }, } { t.Run(tc.desc, func(t *testing.T) { - chunkTokenizer := ChunkIDTokenizer(logproto.ChunkRef{Fingerprint: 1, - From: 0, - Through: 999999, - Checksum: 1, - }, tokenizer) + chunkTokenizer := ChunkIDTokenizer(tokenizer) + chunkTokenizer.Reinit(logproto.ChunkRef{From: 0, Through: 999999, Checksum: 1}) require.Equal(t, tc.exp, chunkTokenizer.Tokens(tc.input)) }) } @@ -448,7 +544,7 @@ func 
TestWrappedTokenizer(t *testing.T) { func BenchmarkTokens(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - file, _ := os.Open("big.txt") + file, _ := os.Open(BigFile) defer file.Close() scanner := bufio.NewScanner(file) @@ -460,17 +556,19 @@ func BenchmarkTokens(b *testing.B) { } } -func BenchmarkOldTokens(b *testing.B) { +func BenchmarkWrappedTokens(b *testing.B) { + chunkTokenizer := ChunkIDTokenizer(three) + chunkTokenizer.Reinit(logproto.ChunkRef{From: 0, Through: 999999, Checksum: 1}) for i := 0; i < b.N; i++ { b.StopTimer() - file, _ := os.Open("big.txt") + file, _ := os.Open(BigFile) defer file.Close() scanner := bufio.NewScanner(file) b.StartTimer() for scanner.Scan() { line := scanner.Text() - _ = three.OldTokens(line) + _ = chunkTokenizer.Tokens(line) } } } diff --git a/tools/tsdb/bloom-tester/lib.go b/tools/tsdb/bloom-tester/lib.go index 7ea76fa522..edc804cad5 100644 --- a/tools/tsdb/bloom-tester/lib.go +++ b/tools/tsdb/bloom-tester/lib.go @@ -10,7 +10,6 @@ import ( "github.com/grafana/loki/pkg/storage/bloom/v1/filter" tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" - //"github.com/grafana/loki/pkg/storage/stores/tsdb/index" "hash/fnv" "math" "os" @@ -26,8 +25,8 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/storage" + bt "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/config" @@ -89,15 +88,15 @@ func execute() { } var ( - three = newNGramTokenizer(3, 4, 0) - threeSkip1 = newNGramTokenizer(3, 4, 1) - threeSkip2 = newNGramTokenizer(3, 4, 2) - threeSkip3 = newNGramTokenizer(3, 4, 3) - four = newNGramTokenizer(4, 5, 0) - fourSkip1 = newNGramTokenizer(4, 5, 1) - fourSkip2 = newNGramTokenizer(4, 5, 2) - five = newNGramTokenizer(5, 6, 0) - six = newNGramTokenizer(6, 7, 0) + three = bt.NewNGramTokenizer(3, 4, 0) + threeSkip1 = bt.NewNGramTokenizer(3, 4, 1) + threeSkip2 = bt.NewNGramTokenizer(3, 4, 2) + threeSkip3 = bt.NewNGramTokenizer(3, 4, 3) + four = bt.NewNGramTokenizer(4, 5, 0) + fourSkip1 = bt.NewNGramTokenizer(4, 5, 1) + fourSkip2 = bt.NewNGramTokenizer(4, 5, 2) + five = bt.NewNGramTokenizer(5, 6, 0) + six = bt.NewNGramTokenizer(6, 7, 0) onePctError = func() *filter.ScalableBloomFilter { return filter.NewScalableBloomFilter(1024, 0.01, 0.8) } fivePctError = func() *filter.ScalableBloomFilter { return filter.NewScalableBloomFilter(1024, 0.05, 0.8) } @@ -120,26 +119,26 @@ var experiments = []Experiment{ true, onePctError, ), - - NewExperiment( - "token=4skip1_error=1%_indexchunks=true", - fourSkip1, - true, - onePctError, - ), /* + NewExperiment( + "token=4skip1_error=1%_indexchunks=true", + fourSkip1, + true, + onePctError, + ), + + NewExperiment( + "token=4skip2_error=1%_indexchunks=true", + fourSkip2, + true, + onePctError, + ), NewExperiment( - "token=4skip2_error=1%_indexchunks=true", - fourSkip2, + "token=4skip0_error=5%_indexchunks=true", + four, true, - onePctError, + fivePctError, ),*/ - NewExperiment( - "token=4skip0_error=5%_indexchunks=true", - four, - true, - fivePctError, - ), /* NewExperiment( "token=4skip1_error=5%_indexchunks=true", @@ -266,11 +265,10 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS } level.Info(util_log.Logger).Log("msg", "starting analyze()", "tester", testerNumber, "total", numTesters) - var n int // count iterated series 
- reportEvery := 10 // report every n chunks + var n int // count iterated series //pool := newPool(runtime.NumCPU()) //pool := newPool(1) - + bloomTokenizer, _ := bt.NewBloomTokenizer(prometheus.DefaultRegisterer) for _, tenant := range tenants { level.Info(util_log.Logger).Log("Analyzing tenant", tenant, "table", tableName) err := indexShipper.ForEach( @@ -308,8 +306,6 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS return } - cache := NewLRUCache4(150000) - transformed := make([]chunk.Chunk, 0, len(chks)) for _, chk := range chks { transformed = append(transformed, chunk.Chunk{ @@ -333,11 +329,10 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS for _, c := range got { chunkTotalUncompressedSize += c.Data.(*chunkenc.Facade).LokiChunk().UncompressedSize() } - metrics.chunkSize.Observe(float64(chunkTotalUncompressedSize)) n += len(got) // iterate experiments - for experimentIdx, experiment := range experiments { + for _, experiment := range experiments { bucketPrefix := os.Getenv("BUCKET_PREFIX") if strings.EqualFold(bucketPrefix, "") { bucketPrefix = "named-experiments-" @@ -348,63 +343,13 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS tenant, ls.String(), objectClient) { + bloomTokenizer.SetLineTokenizer(experiment.tokenizer) level.Info(util_log.Logger).Log("Starting work on: ", ls.String(), "'", FNV32a(ls.String()), "'", experiment.name, tenant) startTime := time.Now().UnixMilli() sbf := experiment.bloom() - cache.Clear() - - // Iterate chunks - var ( - lines, inserts, collisions float64 - ) - for cidx := range got { - chunkTokenizer := ChunkIDTokenizer(got[cidx].ChunkRef, experiment.tokenizer) - - var tokenizer Tokenizer = chunkTokenizer - if !experiment.encodeChunkID { - tokenizer = experiment.tokenizer // so I don't have to change the lines of code below - } - lc := got[cidx].Data.(*chunkenc.Facade).LokiChunk() - - // Only report on the last experiment since they run serially - if experimentIdx == len(experiments)-1 && (n+cidx+1)%reportEvery == 0 { - estimatedProgress := float64(fp) / float64(model.Fingerprint(math.MaxUint64)) * 100. 
- level.Info(util_log.Logger).Log( - "msg", "iterated", - "progress", fmt.Sprintf("%.2f%%", estimatedProgress), - "chunks", len(chks), - "series", ls.String(), - ) - } - - itr, err := lc.Iterator( - context.Background(), - time.Unix(0, 0), - time.Unix(0, math.MaxInt64), - logproto.FORWARD, - log.NewNoopPipeline().ForStream(ls), - ) - helpers.ExitErr("getting iterator", err) - - for itr.Next() && itr.Error() == nil { - toks := tokenizer.Tokens(itr.Entry().Line) - lines++ - for _, tok := range toks { - if tok.Key != nil { - if !cache.GetString(tok.Value) { - cache.PutStringByte(tok.Value, tok.Key) - if dup := sbf.TestAndAdd(tok.Key); dup { - collisions++ - } - inserts++ - } - } - } - } - helpers.ExitErr("iterating chunks", itr.Error()) - } // for each chunk + bloomTokenizer.PopulateSBF(sbf, got) endTime := time.Now().UnixMilli() if len(got) > 0 { @@ -414,9 +359,6 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS metrics.estimatedCount.WithLabelValues(experiment.name).Observe( float64(estimatedCount(sbf.Capacity(), sbf.FillRatio())), ) - metrics.lines.WithLabelValues(experiment.name).Add(lines) - metrics.inserts.WithLabelValues(experiment.name).Add(inserts) - metrics.collisions.WithLabelValues(experiment.name).Add(collisions) writeSBF(sbf, os.Getenv("DIR"), @@ -428,6 +370,7 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS metrics.sbfCreationTime.WithLabelValues(experiment.name).Add(float64(endTime - startTime)) metrics.sbfsCreated.WithLabelValues(experiment.name).Inc() + metrics.chunkSize.Observe(float64(chunkTotalUncompressedSize)) if err != nil { helpers.ExitErr("writing sbf to file", err) diff --git a/tools/tsdb/bloom-tester/lib_test.go b/tools/tsdb/bloom-tester/lib_test.go index 68460897c5..419ff44f59 100644 --- a/tools/tsdb/bloom-tester/lib_test.go +++ b/tools/tsdb/bloom-tester/lib_test.go @@ -3,262 +3,151 @@ package main import ( "bufio" "os" - "strconv" "testing" - - "github.com/stretchr/testify/require" ) -func TestNGrams(t *testing.T) { - tokenizer := newNGramTokenizer(2, 4, 0) - for _, tc := range []struct { - desc string - input string - exp []Token - }{ - { - desc: "empty", - input: "", - exp: []Token{}, - }, - { - desc: "single char", - input: "a", - exp: []Token{}, - }, - { - desc: "two chars", - input: "ab", - exp: []Token{{Key: []byte("ab"), Value: "ab"}}, - }, - { - desc: "three chars", - input: "abc", - exp: []Token{{Key: []byte("ab"), Value: "ab"}, {Key: []byte("bc"), Value: "bc"}, {Key: []byte("abc"), Value: "abc"}}, - }, - { - desc: "four chars", - input: "abcd", - exp: []Token{{Key: []byte("ab"), Value: "ab"}, {Key: []byte("bc"), Value: "bc"}, {Key: []byte("abc"), Value: "abc"}, {Key: []byte("cd"), Value: "cd"}, {Key: []byte("bcd"), Value: "bcd"}}, - }, - { - desc: "foo", - input: "日本語", - exp: []Token{{Key: []byte("日本"), Value: "日本"}, {Key: []byte("本語"), Value: "本語"}, {Key: []byte("日本語"), Value: "日本語"}}, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - require.Equal(t, tc.exp, tokenizer.Tokens(tc.input)) - }) - } -} - -func Test4NGrams(t *testing.T) { - tokenizer := four - for _, tc := range []struct { - desc string - input string - exp []Token - }{ - { - desc: "empty", - input: "", - exp: []Token{}, - }, - { - desc: "single char", - input: "a", - exp: []Token{}, - }, - { - desc: "two chars", - input: "ab", - exp: []Token{}, - }, - { - desc: "three chars", - input: "abc", - exp: []Token{}, - }, - { - desc: "four chars", - input: "abcd", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}}, - }, - { - 
desc: "five chars", - input: "abcde", - exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("bcde"), Value: "bcde"}}, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - require.Equal(t, tc.exp, tokenizer.Tokens(tc.input)) - }) - } -} - -func Test6NGrams(t *testing.T) { - tokenizer := six - for _, tc := range []struct { - desc string - input string - exp []Token - }{ - { - desc: "empty", - input: "", - exp: []Token{}, - }, - { - desc: "single char", - input: "a", - exp: []Token{}, - }, - { - desc: "two chars", - input: "ab", - exp: []Token{}, - }, - { - desc: "three chars", - input: "abc", - exp: []Token{}, - }, - { - desc: "four chars", - input: "abcd", - exp: []Token{}, - }, - { - desc: "five chars", - input: "abcde", - exp: []Token{}, - }, - { - desc: "six chars", - input: "abcdef", - exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}}, - }, - { - desc: "seven chars", - input: "abcdefg", - exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}, {Key: []byte("bcdefg"), Value: "bcdefg"}}, - }, - { - desc: "eight chars", - input: "abcdefgh", - exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}, {Key: []byte("bcdefg"), Value: "bcdefg"}, {Key: []byte("cdefgh"), Value: "cdefgh"}}, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - require.Equal(t, tc.exp, tokenizer.Tokens(tc.input)) - }) - } -} - -func TestNGramsSkip(t *testing.T) { - twoSkipOne := newNGramTokenizer(2, 3, 1) - for _, tc := range []struct { - desc string - tokenizer *ngramTokenizer - input string - exp []Token - }{ - { - desc: "four chars", - tokenizer: twoSkipOne, - input: "abcd", - exp: []Token{{Key: []byte("ab"), Value: "ab"}, {Key: []byte("cd"), Value: "cd"}}, - }, - { - desc: "special chars", - tokenizer: twoSkipOne, - input: "日本語", - exp: []Token{{Key: []byte("日本"), Value: "日本"}}, - }, - { - desc: "multi", - tokenizer: newNGramTokenizer(2, 4, 1), - input: "abcdefghij", - exp: []Token{ - {Key: []byte("ab"), Value: "ab"}, - {Key: []byte("abc"), Value: "abc"}, - {Key: []byte("cd"), Value: "cd"}, - {Key: []byte("cde"), Value: "cde"}, - {Key: []byte("ef"), Value: "ef"}, - {Key: []byte("efg"), Value: "efg"}, - {Key: []byte("gh"), Value: "gh"}, - {Key: []byte("ghi"), Value: "ghi"}, - {Key: []byte("ij"), Value: "ij"}, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - require.Equal(t, tc.exp, tc.tokenizer.Tokens(tc.input)) - }) - } -} - -var num = 1000000 +const BigFile = "../../../pkg/logql/sketch/testdata/war_peace.txt" -func BenchmarkLRU1Put(b *testing.B) { - cache := NewLRUCache(num) +func BenchmarkSBFTestAndAdd(b *testing.B) { for i := 0; i < b.N; i++ { - cache.Put(strconv.Itoa(i)) + b.StopTimer() + file, _ := os.Open(BigFile) + defer file.Close() + scanner := bufio.NewScanner(file) + experiment := NewExperiment( + "token=3skip0_error=1%_indexchunks=true", + three, + true, + onePctError, + ) + sbf := experiment.bloom() + b.StartTimer() + for scanner.Scan() { + line := scanner.Text() + tokens := experiment.tokenizer.Tokens(line) + for _, token := range tokens { + sbf.TestAndAdd(token.Key) + } + } } } -func BenchmarkLRU1Get(b *testing.B) { - cache := NewLRUCache(num) - for i := 0; i < num; i++ { - cache.Put(strconv.Itoa(i)) - } - b.ResetTimer() +func BenchmarkSBFAdd(b *testing.B) { for i := 0; i < b.N; i++ { - cache.Get(strconv.Itoa(i)) + b.StopTimer() + file, _ := os.Open(BigFile) + defer file.Close() + scanner := bufio.NewScanner(file) + experiment := NewExperiment( + "token=3skip0_error=1%_indexchunks=true", + three, + true, + onePctError, + ) + sbf := experiment.bloom() + b.StartTimer() + for 
scanner.Scan() { + line := scanner.Text() + tokens := experiment.tokenizer.Tokens(line) + for _, token := range tokens { + sbf.Add(token.Key) + } + } } } -func BenchmarkLRU2Put(b *testing.B) { - cache := NewLRUCache2(num) +func BenchmarkSBFSeparateTestAndAdd(b *testing.B) { for i := 0; i < b.N; i++ { - cache.Put(strconv.Itoa(i)) + b.StopTimer() + file, _ := os.Open(BigFile) + defer file.Close() + scanner := bufio.NewScanner(file) + experiment := NewExperiment( + "token=3skip0_error=1%_indexchunks=true", + three, + true, + onePctError, + ) + sbf := experiment.bloom() + b.StartTimer() + for scanner.Scan() { + line := scanner.Text() + tokens := experiment.tokenizer.Tokens(line) + for _, token := range tokens { + found := sbf.Test(token.Key) + if !found { + sbf.Add(token.Key) + } + } + } } } -func BenchmarkLRU2Get(b *testing.B) { - cache := NewLRUCache2(num) - for i := 0; i < num; i++ { - cache.Put(strconv.Itoa(i)) - } - b.ResetTimer() +func BenchmarkSBFTestAndAddWithLRU(b *testing.B) { for i := 0; i < b.N; i++ { - cache.Get(strconv.Itoa(i)) + b.StopTimer() + file, _ := os.Open(BigFile) + defer file.Close() + scanner := bufio.NewScanner(file) + experiment := NewExperiment( + "token=3skip0_error=1%_indexchunks=true", + three, + true, + onePctError, + ) + sbf := experiment.bloom() + cache := NewLRUCache4(150000) + b.StartTimer() + for scanner.Scan() { + line := scanner.Text() + tokens := experiment.tokenizer.Tokens(line) + for _, token := range tokens { + if !cache.Get(token.Key) { + cache.Put(token.Key) + sbf.TestAndAdd(token.Key) + } + } + } } } -func BenchmarkLRU4Put(b *testing.B) { - cache := NewLRUCache4(num) +func BenchmarkSBFSeparateTestAndAddWithLRU(b *testing.B) { for i := 0; i < b.N; i++ { - cache.Put([]byte(strconv.Itoa(i))) - } -} + b.StopTimer() + file, _ := os.Open(BigFile) + defer file.Close() + scanner := bufio.NewScanner(file) + experiment := NewExperiment( + "token=3skip0_error=1%_indexchunks=true", + three, + true, + onePctError, + ) + sbf := experiment.bloom() + cache := NewLRUCache4(150000) + b.StartTimer() + for scanner.Scan() { + line := scanner.Text() + tokens := experiment.tokenizer.Tokens(line) + for _, token := range tokens { + if !cache.Get(token.Key) { + cache.Put(token.Key) -func BenchmarkLRU4Get(b *testing.B) { - cache := NewLRUCache4(num) - for i := 0; i < num; i++ { - cache.Put([]byte(strconv.Itoa(i))) - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - cache.Get([]byte(strconv.Itoa(i))) + found := sbf.Test(token.Key) + if !found { + sbf.Add(token.Key) + } + //sbf.TestAndAdd(token.Key) + } + } + } } } -func BenchmarkSBFTestAndAdd(b *testing.B) { +func BenchmarkSBFSeparateTestAndAddWithLRU5(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - file, _ := os.Open("big.txt") + file, _ := os.Open(BigFile) defer file.Close() scanner := bufio.NewScanner(file) experiment := NewExperiment( @@ -268,21 +157,31 @@ func BenchmarkSBFTestAndAdd(b *testing.B) { onePctError, ) sbf := experiment.bloom() + cache := NewLRUCache5(150000) + b.StartTimer() for scanner.Scan() { line := scanner.Text() tokens := experiment.tokenizer.Tokens(line) for _, token := range tokens { - sbf.TestAndAdd(token.Key) + str := string(token.Key) + if !cache.Get(str) { + cache.Put(str) + + found := sbf.Test(token.Key) + if !found { + sbf.Add(token.Key) + } + } } } } } -func BenchmarkSBFAdd(b *testing.B) { +func BenchmarkSBFTestAndAddWithLRU5(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - file, _ := os.Open("big.txt") + file, _ := os.Open(BigFile) defer file.Close() scanner := 
bufio.NewScanner(file) experiment := NewExperiment( @@ -292,21 +191,92 @@ func BenchmarkSBFAdd(b *testing.B) { onePctError, ) sbf := experiment.bloom() + cache := NewLRUCache5(150000) + b.StartTimer() for scanner.Scan() { line := scanner.Text() tokens := experiment.tokenizer.Tokens(line) for _, token := range tokens { - sbf.Add(token.Key) + str := string(token.Key) + if !cache.Get(str) { + cache.Put(str) + + sbf.TestAndAdd(token.Key) + } } } } } -func BenchmarkSBFSeparateTestAndAdd(b *testing.B) { +func BenchmarkSBFTestAndAddWithByteKeyLRU(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + file, _ := os.Open(BigFile) + defer file.Close() + scanner := bufio.NewScanner(file) + experiment := NewExperiment( + "token=4skip0_error=1%_indexchunks=false", + four, + false, + onePctError, + ) + sbf := experiment.bloom() + cache := NewByteKeyLRUCache(150000) + b.StartTimer() + for scanner.Scan() { + line := scanner.Text() + tokens := experiment.tokenizer.Tokens(line) + for _, token := range tokens { + + array := NewFourByteKeyFromSlice(token.Key) + if !cache.Get(array) { + cache.Put(array) + sbf.TestAndAdd(token.Key) + } + + } + } + } +} + +func BenchmarkSBFTestAndAddWithFourByteKeyLRU(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - file, _ := os.Open("big.txt") + file, _ := os.Open(BigFile) + defer file.Close() + scanner := bufio.NewScanner(file) + experiment := NewExperiment( + "token=4skip0_error=1%_indexchunks=false", + four, + false, + onePctError, + ) + sbf := experiment.bloom() + cache := NewFourByteKeyLRUCache(150000) + b.StartTimer() + for scanner.Scan() { + line := scanner.Text() + tokens := experiment.tokenizer.Tokens(line) + for _, token := range tokens { + if !cache.Get([4]byte(token.Key)) { + cache.Put([4]byte(token.Key)) + found := sbf.Test(token.Key) + if !found { + sbf.Add(token.Key) + } + //sbf.TestAndAdd(token.Key) + } + + } + } + } +} + +func BenchmarkSBFAddWithLRU(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + file, _ := os.Open(BigFile) defer file.Close() scanner := bufio.NewScanner(file) experiment := NewExperiment( @@ -316,13 +286,14 @@ func BenchmarkSBFSeparateTestAndAdd(b *testing.B) { onePctError, ) sbf := experiment.bloom() + cache := NewLRUCache4(150000) b.StartTimer() for scanner.Scan() { line := scanner.Text() tokens := experiment.tokenizer.Tokens(line) for _, token := range tokens { - found := sbf.Test(token.Key) - if !found { + if !cache.Get(token.Key) { + cache.Put(token.Key) sbf.Add(token.Key) } } @@ -330,10 +301,10 @@ func BenchmarkSBFSeparateTestAndAdd(b *testing.B) { } } -func BenchmarkSBFTestAndAddWithLRU(b *testing.B) { +func BenchmarkSBFSeparateTestAndAddWithLRU1(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - file, _ := os.Open("big.txt") + file, _ := os.Open(BigFile) defer file.Close() scanner := bufio.NewScanner(file) experiment := NewExperiment( @@ -343,25 +314,30 @@ func BenchmarkSBFTestAndAddWithLRU(b *testing.B) { onePctError, ) sbf := experiment.bloom() - cache := NewLRUCache4(150000) + cache := NewLRUCache(150000) b.StartTimer() for scanner.Scan() { line := scanner.Text() tokens := experiment.tokenizer.Tokens(line) for _, token := range tokens { - if !cache.Get(token.Key) { - cache.Put(token.Key) - sbf.TestAndAdd(token.Key) + str := string(token.Key) + if !cache.Get(str) { + cache.Put(str) + found := sbf.Test(token.Key) + if !found { + sbf.Add(token.Key) + } + //sbf.Add(token.Key) } } } } } -func BenchmarkSBFAddWithLRU(b *testing.B) { +func BenchmarkSBFSeparateTestAndAddWithMap(b *testing.B) { for i := 
0; i < b.N; i++ { b.StopTimer() - file, _ := os.Open("big.txt") + file, _ := os.Open(BigFile) defer file.Close() scanner := bufio.NewScanner(file) experiment := NewExperiment( @@ -371,15 +347,27 @@ func BenchmarkSBFAddWithLRU(b *testing.B) { onePctError, ) sbf := experiment.bloom() - cache := NewLRUCache4(150000) + cache := make(map[string]interface{}, 150000) b.StartTimer() for scanner.Scan() { line := scanner.Text() tokens := experiment.tokenizer.Tokens(line) for _, token := range tokens { - if !cache.Get(token.Key) { - cache.Put(token.Key) - sbf.Add(token.Key) + str := string(token.Key) + + _, found := cache[str] + if !found { + cache[str] = "" + f := sbf.Test(token.Key) + if !f { + sbf.Add(token.Key) + } + + if len(cache) > 150000 { + for elem := range cache { + delete(cache, elem) + } + } } } } diff --git a/tools/tsdb/bloom-tester/lrucache.go b/tools/tsdb/bloom-tester/lrucache.go index 160163a920..56caba451f 100644 --- a/tools/tsdb/bloom-tester/lrucache.go +++ b/tools/tsdb/bloom-tester/lrucache.go @@ -1,6 +1,9 @@ package main -import "container/list" +import ( + "container/list" + "fmt" +) type LRUCache struct { capacity int @@ -267,3 +270,337 @@ func (c *HashSet) Clear() { delete(c.cache, k) } } + +// ByteKey is an interface for types that represent keys of a certain size. +type ByteKey interface { + Size() int + Equal(other ByteKey) bool +} + +// FourByteKey represents a key of 4 bytes. +type FourByteKey [4]byte + +// Size returns the size of the FourByteKey. +func (k FourByteKey) Size() int { + return 4 +} + +// Equal checks if two FourByteKeys are equal. +func (k FourByteKey) Equal(other ByteKey) bool { + if otherFourByteKey, ok := other.(FourByteKey); ok { + return k == otherFourByteKey + } + return false +} + +// ThirtyOneByteKey represents a key of 31 bytes. +type ThirtyOneByteKey [31]byte + +// Size returns the size of the ThirtyOneByteKey. +func (k ThirtyOneByteKey) Size() int { + return 31 +} + +// Equal checks if two ThirtyOneByteKeys are equal. 
+func (k ThirtyOneByteKey) Equal(other ByteKey) bool { + if otherThirtyOneByteKey, ok := other.(ThirtyOneByteKey); ok { + return k == otherThirtyOneByteKey + } + return false +} + +type ByteKeyLRUCache struct { + capacity int + //m map[ByteKey]struct{} + m map[ByteKey]*list.Element + list *list.List +} + +func NewByteKeyLRUCache(capacity int) *ByteKeyLRUCache { + return &ByteKeyLRUCache{ + capacity: capacity, + m: make(map[ByteKey]*list.Element, capacity), + list: list.New(), + } +} + +func (c *ByteKeyLRUCache) Get(key ByteKey) bool { + if value, ok := c.m[key]; ok { + // Move the accessed element to the front of the list + c.list.MoveToFront(value) + return true + } + return false +} + +func (c *ByteKeyLRUCache) Put(key ByteKey) { + if value, ok := c.m[key]; ok { + // If the key already exists, move it to the front + c.list.MoveToFront(value) + } else { + // If the cache is full, remove the least recently used element + if len(c.m) >= c.capacity { + // Get the least recently used element from the back of the list + tailElem := c.list.Back() + if tailElem != nil { + deletedEntry := c.list.Remove(tailElem).(ByteKey) + delete(c.m, deletedEntry) + } + } + + // Add the new key to the cache and the front of the list + elem := c.list.PushFront(key) + c.m[key] = elem + } +} + +func (c *ByteKeyLRUCache) Clear() { + // Iterate through the list and remove all elements + for elem := c.list.Front(); elem != nil; elem = elem.Next() { + delete(c.m, elem.Value.(ByteKey)) + } + + // Clear the list + c.list.Init() +} + +// ByteKeyMap is a map that uses ByteKey as a key. +type ByteKeyMap struct { + capacity int + m map[ByteKey]struct{} +} + +// NewByteKeyMap creates a new ByteKeyMap. +func NewByteKeyMap(capacity int) ByteKeyMap { + return ByteKeyMap{ + capacity: capacity, + m: make(map[ByteKey]struct{}, capacity), + } +} + +// Put adds an entry to the map. +func (bm *ByteKeyMap) Put(key ByteKey) { + bm.m[key] = struct{}{} +} + +// Get retrieves a value from the map based on the key. +func (bm *ByteKeyMap) Get(key ByteKey) bool { + _, exists := bm.m[key] + return exists +} + +type ByteSet struct { + capacity int + cache map[[4]byte]struct{} +} + +func NewByteSet(capacity int) *ByteSet { + return &ByteSet{ + capacity: capacity, + cache: make(map[[4]byte]struct{}), + } +} + +func sliceToByteArray(slice []byte) [4]byte { + // Define the desired size of the byte array + // If you want to make it dynamically sized, use len(slice) + var array [4]byte + + // Copy elements from the slice to the array + copy(array[:], slice) + + return array +} + +// NewFourByteKeyFromSlice converts a byte slice to a FourByteKey. +func NewFourByteKeyFromSlice(slice []byte) FourByteKey { + var key FourByteKey + copy(key[:], slice) + return key +} + +// NewThirtyOneByteKeyFromSlice converts a byte slice to a ThirtyOneByteKey.
+func NewThirtyOneByteKeyFromSlice(slice []byte) ThirtyOneByteKey { + var key ThirtyOneByteKey + copy(key[:], slice) + return key +} + +func (c ByteSet) Get(key string) bool { + if _, ok := c.cache[sliceToByteArray([]byte(key))]; ok { + return true + } + return false +} + +func (c *ByteSet) Put(key string) { + c.cache[sliceToByteArray([]byte(key))] = struct{}{} +} + +func (c *ByteSet) PutBytes(value []byte) { + c.cache[sliceToByteArray(value)] = struct{}{} +} + +func (c *ByteSet) Clear() { + for k := range c.cache { + delete(c.cache, k) + } +} + +type FourByteKeyLRUCache struct { + capacity int + m map[[4]byte]*list.Element + list *list.List +} + +func NewFourByteKeyLRUCache(capacity int) *FourByteKeyLRUCache { + return &FourByteKeyLRUCache{ + capacity: capacity, + m: make(map[[4]byte]*list.Element, capacity), + list: list.New(), + } +} + +func (c *FourByteKeyLRUCache) Get(key [4]byte) bool { + if value, ok := c.m[key]; ok { + // Move the accessed element to the front of the list + c.list.MoveToFront(value) + return true + } + return false +} + +func (c *FourByteKeyLRUCache) Put(key [4]byte) { + if value, ok := c.m[key]; ok { + // If the key already exists, move it to the front + c.list.MoveToFront(value) + } else { + // If the cache is full, remove the least recently used element + if len(c.m) >= c.capacity { + // Get the least recently used element from the back of the list + tailElem := c.list.Back() + if tailElem != nil { + deletedEntry := c.list.Remove(tailElem).([4]byte) + delete(c.m, deletedEntry) + } + } + + // Add the new key to the cache and the front of the list + elem := c.list.PushFront(key) + c.m[key] = elem + } +} + +func (c *FourByteKeyLRUCache) Clear() { + // Iterate through the list and remove all elements + for elem := c.list.Front(); elem != nil; elem = elem.Next() { + delete(c.m, elem.Value.([4]byte)) + } + + // Clear the list + c.list.Init() +} + +type LRUCache5 struct { + capacity int + cache map[string]*LRUNode5 + head *LRUNode5 + tail *LRUNode5 +} + +type LRUNode5 struct { + key string + prev *LRUNode5 + next *LRUNode5 +} + +func NewLRUCache5(capacity int) *LRUCache5 { + return &LRUCache5{ + capacity: capacity, + } +} +func (c *LRUCache5) init() { + c.cache = make(map[string]*LRUNode5, c.capacity) + c.head = new(LRUNode5) + c.tail = new(LRUNode5) + c.head.next = c.tail + c.tail.prev = c.head +} + +func (c *LRUCache5) pop(item *LRUNode5) { + item.prev.next = item.next + item.next.prev = item.prev +} + +func (c *LRUCache5) push(item *LRUNode5) { + c.head.next.prev = item + item.next = c.head.next + item.prev = c.head + c.head.next = item +} + +func (c *LRUCache5) evict() *LRUNode5 { + item := c.tail.prev + c.pop(item) + delete(c.cache, item.key) + return item +} + +func (c *LRUCache5) Get(key string) bool { + if c.cache == nil { + c.init() + } + item := c.cache[key] + if item == nil { + return false + } + if c.head.next != item { + c.pop(item) + c.push(item) + } + return true +} + +func (c *LRUCache5) Put(key string) { + if c.cache == nil { + c.init() + } + item := c.cache[key] + if item == nil { + if len(c.cache) == c.capacity { + item = c.evict() + } else { + item = new(LRUNode5) + } + item.key = key + c.push(item) + c.cache[key] = item + } else { + if c.head.next != item { + c.pop(item) + c.push(item) + } + } +} + +func (c *LRUCache5) Clear() { + if c.cache != nil { + + for elem := range c.cache { + delete(c.cache, elem) + } + + c.head = nil + c.tail = nil + } +} + +func (c *LRUCache5) Dump() { + if c.cache != nil { + + for elem := range c.cache { + 
fmt.Println(elem) + } + + } +} diff --git a/tools/tsdb/bloom-tester/lrucache_test.go b/tools/tsdb/bloom-tester/lrucache_test.go new file mode 100644 index 0000000000..c1125af011 --- /dev/null +++ b/tools/tsdb/bloom-tester/lrucache_test.go @@ -0,0 +1,206 @@ +package main + +import ( + "encoding/binary" + "github.com/stretchr/testify/require" + "strconv" + "testing" +) + +var num = 1000000 + +func BenchmarkLRU1Put(b *testing.B) { + cache := NewLRUCache(num) + for i := 0; i < b.N; i++ { + cache.Put(strconv.Itoa(i)) + } +} + +func BenchmarkLRU1Get(b *testing.B) { + cache := NewLRUCache(num) + for i := 0; i < num; i++ { + cache.Put(strconv.Itoa(i)) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Get(strconv.Itoa(i)) + } +} + +func BenchmarkLRU2Put(b *testing.B) { + cache := NewLRUCache2(num) + for i := 0; i < b.N; i++ { + cache.Put(strconv.Itoa(i)) + } +} + +func BenchmarkLRU2Get(b *testing.B) { + cache := NewLRUCache2(num) + for i := 0; i < num; i++ { + cache.Put(strconv.Itoa(i)) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Get(strconv.Itoa(i)) + } +} + +func BenchmarkLRU4Put(b *testing.B) { + cache := NewLRUCache4(num) + for i := 0; i < b.N; i++ { + cache.Put([]byte(strconv.Itoa(i))) + } +} + +func BenchmarkLRU4Get(b *testing.B) { + cache := NewLRUCache4(num) + for i := 0; i < num; i++ { + cache.Put([]byte(strconv.Itoa(i))) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Get([]byte(strconv.Itoa(i))) + } +} + +func TestByteSet(t *testing.T) { + set := NewByteSet(30) + set.Put("fooa") + set.PutBytes([]byte("foob")) + for _, tc := range []struct { + desc string + input string + exp bool + }{ + { + desc: "test string put", + input: "fooa", + exp: true, + }, + { + desc: "test byte put", + input: "foob", + exp: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.exp, set.Get(tc.input)) + }) + } +} + +func TestByteKeyLRUCache(t *testing.T) { + set := NewByteKeyLRUCache(30) + set.Put(NewFourByteKeyFromSlice([]byte("fooa"))) + //set.PutBytes([]byte("foob")) + for _, tc := range []struct { + desc string + input string + exp bool + }{ + { + desc: "test valid", + input: "fooa", + exp: true, + }, + { + desc: "test not valid", + input: "foob", + exp: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.exp, set.Get(NewFourByteKeyFromSlice([]byte(tc.input)))) + }) + } +} + +func TestLRUCache5(t *testing.T) { + set := NewLRUCache5(30) + set.Put("fooa") + for _, tc := range []struct { + desc string + input string + exp bool + }{ + { + desc: "test valid", + input: "fooa", + exp: true, + }, + { + desc: "test not valid", + input: "foob", + exp: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.exp, set.Get(tc.input)) + }) + } +} + +func BenchmarkLRU5Put(b *testing.B) { + cache := NewLRUCache5(num) + for i := 0; i < b.N; i++ { + cache.Put(strconv.Itoa(i)) + } +} + +func BenchmarkLRU5Get(b *testing.B) { + cache := NewLRUCache5(num) + for i := 0; i < num; i++ { + cache.Put(strconv.Itoa(i)) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Get(strconv.Itoa(i)) + } +} + +func BenchmarkByteKeyLRUCacheSet(b *testing.B) { + buf := make([]byte, 26) + cache := NewByteKeyLRUCache(num) + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(buf, uint64(i)) + + cache.Put(NewThirtyOneByteKeyFromSlice(buf)) + } +} + +func BenchmarkByteKeyLRUCacheGet(b *testing.B) { + buf := make([]byte, 26) + + cache := NewByteKeyLRUCache(num) + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(buf, uint64(i)) + 
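+		// buf is 26 bytes and only its first 8 bytes change per iteration; the remaining
+		// bytes of each 31-byte key stay zero, so keys differ only by the counter value.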
+		cache.Put(NewThirtyOneByteKeyFromSlice(buf))
+		//cache.Put(NewTwentySixByteKeyFromSlice([]byte(strconv.Itoa(i))))
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		binary.LittleEndian.PutUint64(buf, uint64(i))
+
+		cache.Get(NewThirtyOneByteKeyFromSlice(buf))
+		//cache.Get(NewTwentySixByteKeyFromSlice([]byte(strconv.Itoa(i))))
+	}
+}
+
+func BenchmarkByteSetPut(b *testing.B) {
+	cache := NewByteSet(num)
+	for i := 0; i < b.N; i++ {
+		cache.Put(strconv.Itoa(i))
+	}
+}
+
+func BenchmarkByteSetGet(b *testing.B) {
+	cache := NewByteSet(num)
+	for i := 0; i < b.N; i++ {
+		cache.Put(strconv.Itoa(i))
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		cache.Get(strconv.Itoa(i))
+	}
+}
diff --git a/tools/tsdb/bloom-tester/metrics.go b/tools/tsdb/bloom-tester/metrics.go
index 0b1d5d5499..c330d7edb8 100644
--- a/tools/tsdb/bloom-tester/metrics.go
+++ b/tools/tsdb/bloom-tester/metrics.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	bt "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -8,12 +9,12 @@ import (
 
 type Experiment struct {
 	name          string
-	tokenizer     Tokenizer
+	tokenizer     bt.Tokenizer
 	bloom         func() *filter.ScalableBloomFilter
 	encodeChunkID bool
 }
 
-func NewExperiment(name string, tokenizer Tokenizer, encodeChunkID bool, bloom func() *filter.ScalableBloomFilter) Experiment {
+func NewExperiment(name string, tokenizer bt.Tokenizer, encodeChunkID bool, bloom func() *filter.ScalableBloomFilter) Experiment {
 	return Experiment{
 		name:      name,
 		tokenizer: tokenizer,
diff --git a/tools/tsdb/bloom-tester/readlib.go b/tools/tsdb/bloom-tester/readlib.go
index 4d70b03463..7f1dc51f93 100644
--- a/tools/tsdb/bloom-tester/readlib.go
+++ b/tools/tsdb/bloom-tester/readlib.go
@@ -9,6 +9,7 @@ import (
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql/log"
+	bt "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
 	"github.com/grafana/loki/pkg/storage/chunk"
 	"github.com/grafana/loki/pkg/storage/config"
@@ -198,15 +199,15 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
 		objectClient)
 
 	for gotIdx := range got { // for every chunk
 		for _, queryExperiment := range queryExperiments { // for each search string
-			if len(queryExperiment.searchString) >= experiment.tokenizer.getMin()+experiment.tokenizer.getSkip() {
+			if len(queryExperiment.searchString) >= experiment.tokenizer.GetMin()+experiment.tokenizer.GetSkip() {
 
 				foundInChunk := false
 				foundInSbf := false
-				chunkTokenizer := ChunkIDTokenizerHalfInit(experiment.tokenizer)
+				chunkTokenizer := bt.ChunkIDTokenizer(experiment.tokenizer)
 
-				chunkTokenizer.reinit(got[gotIdx].ChunkRef)
-				var tokenizer Tokenizer = chunkTokenizer
+				chunkTokenizer.Reinit(got[gotIdx].ChunkRef)
+				var tokenizer bt.Tokenizer = chunkTokenizer
 				if !experiment.encodeChunkID {
 					tokenizer = experiment.tokenizer
 				}
@@ -309,10 +310,10 @@ func readSBFFromObjectStorage(location, prefix, period, tenant, series string, o
 	return sbf
 }
 
-func searchSbf(sbf *filter.ScalableBloomFilter, tokenizer Tokenizer, searchString string) bool {
-	for i := 0; i <= tokenizer.getSkip(); i++ {
+func searchSbf(sbf *filter.ScalableBloomFilter, tokenizer bt.Tokenizer, searchString string) bool {
+	for i := 0; i <= tokenizer.GetSkip(); i++ {
 		numMatches := 0
-		if (len(searchString) - i) >= tokenizer.getMin() {
+		if (len(searchString) - i) >= tokenizer.GetMin() {
 			tokens := tokenizer.Tokens(searchString[i:])
 			for _, token := range tokens {
diff --git a/tools/tsdb/bloom-tester/tokenizer.go b/tools/tsdb/bloom-tester/tokenizer.go
deleted file mode 100644
index e0e5d9e5b5..0000000000
--- a/tools/tsdb/bloom-tester/tokenizer.go
+++ /dev/null
@@ -1,255 +0,0 @@
-package main
-
-import (
-	"encoding/binary"
-	"unicode/utf8"
-
-	"github.com/grafana/loki/pkg/logproto"
-)
-
-type Token struct {
-	Key   []byte
-	Value string
-}
-
-type Tokenizer interface {
-	Tokens(line string) []Token
-	getSkip() int
-	getMin() int
-	getMax() int
-}
-
-/*
-type logfmtTokenizer struct {
-	parser *log.LogfmtParser
-	lbls   *log.LabelsBuilder
-}
-
-func (t *logfmtTokenizer) Tokens(line string) []Token {
-	t.lbls.Reset()
-	t.parser.Process(0, []byte(line), t.lbls)
-	ls := t.lbls.LabelsResult().Labels()
-	res := make([]Token, 0, len(ls))
-	for _, l := range ls {
-		res = append(res, Token{Key: l.Name, Value: l.Value})
-	}
-	return res
-}
-
-func newLogfmtTokenizer() *logfmtTokenizer {
-	return &logfmtTokenizer{
-		// non strict, allow empty values
-		parser: log.NewLogfmtParser(false, true),
-		lbls:   log.NewBaseLabelsBuilder().ForLabels(nil, 0),
-	}
-}
-
-*/
-
-type ngramTokenizer struct {
-	// [min,max) exclusivity
-	min, max, skip int
-	buffers        [][]rune // circular buffers used for ngram generation
-	runeBuffer     []byte   // buffer used for token generation
-	tokenBuffer    []Token  // buffer used for holding tokens that is returned
-	internalTokenBuffer []Token // circular buffer for tokens
-}
-
-func newNGramTokenizer(min, max, skip int) *ngramTokenizer {
-	capacity := max - min
-	t := &ngramTokenizer{
-		min:     min,
-		max:     max,
-		skip:    skip,
-		buffers: make([][]rune, capacity),
-	}
-	for i := t.min; i < t.max; i++ {
-		t.buffers[i-t.min] = make([]rune, i)
-	}
-	t.runeBuffer = make([]byte, 0, max*4)
-	t.tokenBuffer = make([]Token, 0, 1024)
-	t.internalTokenBuffer = make([]Token, 0, 1024)
-	for i := 0; i < cap(t.internalTokenBuffer); i++ {
-		tok := Token{}
-		tok.Key = make([]byte, 0, 132)
-		t.internalTokenBuffer = append(t.internalTokenBuffer, tok)
-	}
-
-	return t
-}
-
-func (t *ngramTokenizer) getSkip() int {
-	return t.skip
-}
-
-func (t *ngramTokenizer) getMin() int {
-	return t.min
-}
-
-func (t *ngramTokenizer) getMax() int {
-	return t.max
-}
-
-func (t *ngramTokenizer) Tokens(line string) []Token {
-	t.tokenBuffer = t.tokenBuffer[:0] // Reset the result slice
-	var i int // rune index (not position that is measured in the range loop)
-	numToks := 0
-	for _, r := range line {
-
-		// j is the index of the buffer to use
-		for j := 0; j < (t.max - t.min); j++ {
-			// n is the length of the ngram
-			n := j + t.min
-			// pos is the position in the buffer to overwrite
-			pos := i % n
-			t.buffers[j][pos] = r
-
-			if i >= n-1 && (i+1-n)%(t.skip+1) == 0 {
-				t.runeBuffer = reassemble(t.buffers[j], (i+1)%n, t.runeBuffer)
-				//fmt.Println(numToks, cap(t.internalTokenBuffer), len(t.internalTokenBuffer))
-				if numToks >= cap(t.internalTokenBuffer) || numToks == len(t.internalTokenBuffer) {
-					tok := Token{}
-					tok.Key = make([]byte, 0, 132)
-					t.internalTokenBuffer = append(t.internalTokenBuffer, tok)
-				}
-				//fmt.Println(numToks, cap(t.internalTokenBuffer), len(t.internalTokenBuffer))
-				t.internalTokenBuffer[numToks].Key = t.internalTokenBuffer[numToks].Key[:0]
-				t.internalTokenBuffer[numToks].Key = append(t.internalTokenBuffer[numToks].Key, t.runeBuffer...)
-				t.internalTokenBuffer[numToks].Value = string(t.internalTokenBuffer[numToks].Key)
-				numToks++
-			}
-		}
-		i++
-	}
-	t.tokenBuffer = append(t.tokenBuffer, t.internalTokenBuffer[:numToks]...)
-	return t.tokenBuffer
-}
-
-func (t *ngramTokenizer) OldTokens(line string) []Token {
-	t.tokenBuffer = t.tokenBuffer[:0] // Reset the result slice
-	var i int // rune index (not position that is measured in the range loop)
-	for _, r := range line {
-
-		// j is the index of the buffer to use
-		for j := 0; j < (t.max - t.min); j++ {
-			// n is the length of the ngram
-			n := j + t.min
-			// pos is the position in the buffer to overwrite
-			pos := i % n
-			t.buffers[j][pos] = r
-
-			if i >= n-1 && (i+1-n)%(t.skip+1) == 0 {
-				t.runeBuffer = reassemble(t.buffers[j], (i+1)%n, t.runeBuffer)
-				b := Token{}
-				b.Key = make([]byte, 0, 132) // TODO: Yeah, that's too big but I didn't fee like doing the math at the end of the day
-				b.Key = append(b.Key, t.runeBuffer...)
-				b.Value = string(b.Key)
-				t.tokenBuffer = append(t.tokenBuffer, b)
-			}
-		}
-		i++
-	}
-	return t.tokenBuffer
-}
-
-func reassemble(buf []rune, pos int, result []byte) []byte {
-	result = result[:0] // Reset the result slice
-	for i := 0; i < len(buf); i++ {
-		cur := (pos + i) % len(buf)
-		result = utf8.AppendRune(result, buf[cur])
-	}
-	return result
-}
-
-type WrappedTokenizer struct {
-	t           Tokenizer
-	f           func(Token) Token
-	tokenBuffer []Token
-	prefix      []byte
-	i64buf      []byte
-	i32buf      []byte
-}
-
-func (w *WrappedTokenizer) Tokens(line string) []Token {
-	w.tokenBuffer = w.tokenBuffer[:0] // Reset the result slice
-	toks := w.t.Tokens(line)
-	for _, tok := range toks {
-		w.tokenBuffer = append(w.tokenBuffer, w.f(tok))
-	}
-	return append(w.tokenBuffer, toks...)
-}
-
-func (w *WrappedTokenizer) getSkip() int {
-	return w.t.getSkip()
-}
-
-func (w *WrappedTokenizer) getMin() int {
-	return w.t.getMin()
-}
-
-func (w *WrappedTokenizer) getMax() int {
-	return w.t.getMax()
-}
-
-func ChunkIDTokenizer(chk logproto.ChunkRef, t Tokenizer) *WrappedTokenizer {
-	//prefix := fmt.Sprintf("%d:%d:%d:", chk.From, chk.Through, chk.Checksum)
-	p := make([]byte, 0, 256)
-	i64buf := make([]byte, binary.MaxVarintLen64)
-	i32buf := make([]byte, 4)
-
-	binary.PutVarint(i64buf, int64(chk.From))
-	p = append(p, i64buf...)
-	p = append(p, 58)
-	binary.PutVarint(i64buf, int64(chk.Through))
-	p = append(p, i64buf...)
-	p = append(p, 58)
-	binary.LittleEndian.PutUint32(i32buf, chk.Checksum)
-	p = append(p, i32buf...)
-	p = append(p, 58)
-
-	return &WrappedTokenizer{
-		t: t,
-		f: func(tok Token) Token {
-			tok.Key = append(append(tok.Key, p...), tok.Key...)[len(tok.Key):]
-			tok.Value = string(tok.Key)
-			return tok
-		},
-		tokenBuffer: make([]Token, 0, 1024),
-		prefix:      p,
-		i64buf:      i64buf,
-		i32buf:      i32buf,
-	}
-}
-
-func ChunkIDTokenizerHalfInit(t Tokenizer) *WrappedTokenizer {
-	p := make([]byte, 0, 256)
-	return &WrappedTokenizer{
-		t:           t,
-		tokenBuffer: make([]Token, 0, 1024),
-		prefix:      p,
-		i64buf:      make([]byte, binary.MaxVarintLen64),
-		i32buf:      make([]byte, 4),
-	}
-}
-
-func (w *WrappedTokenizer) reinit(chk logproto.ChunkRef) {
-	//prefix := fmt.Sprintf("%d:%d:%d:", chk.From, chk.Through, chk.Checksum)
-	w.prefix = w.prefix[:0]
-
-	//w.prefix = fmt.Appendf(w.prefix, "%d:%d:%d:", chk.From, chk.Through, chk.Checksum)
-	binary.PutVarint(w.i64buf, int64(chk.From))
-	w.prefix = append(w.prefix, w.i64buf...)
-	w.prefix = append(w.prefix, 58)
-	binary.PutVarint(w.i64buf, int64(chk.Through))
-	w.prefix = append(w.prefix, w.i64buf...)
-	w.prefix = append(w.prefix, 58)
-	binary.LittleEndian.PutUint32(w.i32buf, chk.Checksum)
-	w.prefix = append(w.prefix, w.i32buf...)
-	w.prefix = append(w.prefix, 58)
-
-	w.f = func(tok Token) Token {
-		tok.Key = append(append(tok.Key, w.prefix...), tok.Key...)[len(tok.Key):]
-		tok.Value = string(tok.Key)
-		return tok
-	}
-}
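
For reference, a minimal standalone sketch of the chunk-ID prefix scheme the wrapped tokenizer above applies to each token key: varint(From), a ':' separator (byte 58), varint(Through), ':', the little-endian uint32 checksum, ':'. The buildPrefix helper and the chunk values below are illustrative assumptions, not code from this patch; the real code reads the values from a logproto.ChunkRef. It also shows the append-and-reslice idiom the wrapped tokenizer uses to produce prefix+key.

package main

import (
	"encoding/binary"
	"fmt"
)

// buildPrefix mirrors the prefix layout built by ChunkIDTokenizer/reinit above:
// varint(from) ':' varint(through) ':' uint32-LE(checksum) ':'.
// As in the original code, the full fixed-size varint buffer is appended, not
// just the bytes the varint actually used.
func buildPrefix(from, through int64, checksum uint32) []byte {
	p := make([]byte, 0, 256)
	i64buf := make([]byte, binary.MaxVarintLen64)
	i32buf := make([]byte, 4)

	binary.PutVarint(i64buf, from)
	p = append(p, i64buf...)
	p = append(p, ':') // byte 58, same separator as the original
	binary.PutVarint(i64buf, through)
	p = append(p, i64buf...)
	p = append(p, ':')
	binary.LittleEndian.PutUint32(i32buf, checksum)
	p = append(p, i32buf...)
	p = append(p, ':')
	return p
}

func main() {
	// Illustrative chunk bounds and checksum; a real caller would use ChunkRef fields.
	prefix := buildPrefix(1700000000000, 1700000360000, 0xdeadbeef)

	key := []byte("abcd") // e.g. a 4-gram token key
	// Same idiom as the wrapped tokenizer: build [key|prefix|key] in one append
	// chain, then reslice past the original key bytes so the result is prefix+key.
	prefixed := append(append(key, prefix...), key...)[len(key):]

	fmt.Printf("prefix is %d bytes\n", len(prefix))
	fmt.Printf("prefixed token: %q\n", prefixed)
}

With two 10-byte varint buffers, a 4-byte checksum, and three separators, the prefix comes out to a fixed 27 bytes per token.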