Initial loki bloom creation library (#10957)

**What this PR does / why we need it**:

- Optimizes and benchmarks the various LRU caches that sit in front of the bloom filters.
- Adds an initial library (`pkg/storage/bloom/v1`) that can be used to create blooms from chunks.
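
A minimal sketch of how the new library is meant to be wired up (the `buildBloom` helper, package name, and error handling below are illustrative only, not code added by this PR):

```go
package example

import (
	"github.com/prometheus/client_golang/prometheus"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
	"github.com/grafana/loki/pkg/storage/chunk"
)

// buildBloom fills a scalable bloom filter with n-gram tokens taken from every
// line of the supplied chunks.
func buildBloom(chunks []chunk.Chunk) *filter.ScalableBloomFilter {
	bt, _ := v1.NewBloomTokenizer(prometheus.DefaultRegisterer) // error handling omitted for brevity
	sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)       // ~1% target false-positive rate
	bt.PopulateSBF(sbf, chunks)
	return sbf
}
```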

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)

---------

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>

Changed files:
  1. pkg/storage/bloom/v1/bloom_tokenizer.go (111 lines changed)
  2. pkg/storage/bloom/v1/bloom_tokenizer_test.go (30 lines changed)
  3. pkg/storage/bloom/v1/tokenizer.go (162 lines changed)
  4. pkg/storage/bloom/v1/tokenizer_test.go (244 lines changed)
  5. tools/tsdb/bloom-tester/lib.go (121 lines changed)
  6. tools/tsdb/bloom-tester/lib_test.go (486 lines changed)
  7. tools/tsdb/bloom-tester/lrucache.go (339 lines changed)
  8. tools/tsdb/bloom-tester/lrucache_test.go (206 lines changed)
  9. tools/tsdb/bloom-tester/metrics.go (5 lines changed)
  10. tools/tsdb/bloom-tester/readlib.go (15 lines changed)
  11. tools/tsdb/bloom-tester/tokenizer.go (255 lines changed)

pkg/storage/bloom/v1/bloom_tokenizer.go
@ -0,0 +1,111 @@
package v1
import (
"context"
"math"
"time"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/pkg/storage/chunk"
util_log "github.com/grafana/loki/pkg/util/log"
//"github.com/grafana/loki/tools/tsdb/helpers"
)
type metrics struct{}
/*
BloomTokenizer is a utility that converts either Loki chunks or individual lines into tokens.
These tokens are n-grams, built from adjacent characters, which are used to populate a bloom filter.
https://en.wikipedia.org/wiki/Bloom_filter
Bloom filters are utilized for faster lookups of log lines.
*/
type BloomTokenizer struct {
metrics *metrics
lineTokenizer Tokenizer
chunkIDTokenizer *WrappedTokenizer
cache map[string]interface{}
}
const CacheSize = 150000
// NewBloomTokenizer returns a new instance of the Bloom Tokenizer.
// Warning: the tokens returned use the same byte slice to reduce allocations. This has three consequences:
// 1) The token slices generated must not be mutated externally.
// 2) The token slices must not be used after the next call to `Tokens()`, as that call repopulates them.
// 3) This is not thread safe.
func NewBloomTokenizer(reg prometheus.Registerer) (*BloomTokenizer, error) {
t := &BloomTokenizer{
metrics: newMetrics(reg),
}
t.cache = make(map[string]interface{}, CacheSize)
t.lineTokenizer = NewNGramTokenizer(4, 5, 0) // default to 4-grams, no skip
t.chunkIDTokenizer = ChunkIDTokenizer(t.lineTokenizer)
level.Info(util_log.Logger).Log("msg", "bloom tokenizer created")
return t, nil
}
func (bt *BloomTokenizer) SetLineTokenizer(t Tokenizer) {
bt.lineTokenizer = t
bt.chunkIDTokenizer = ChunkIDTokenizer(bt.lineTokenizer)
}
// TODO: Something real here with metrics
func newMetrics(r prometheus.Registerer) *metrics {
return &metrics{}
}
func clearCache(cache map[string]interface{}) {
for k := range cache {
delete(cache, k)
}
}
func (bt *BloomTokenizer) PopulateSBF(sbf *filter.ScalableBloomFilter, chunks []chunk.Chunk) {
clearCache(bt.cache)
for idx := range chunks {
lc := chunks[idx].Data.(*chunkenc.Facade).LokiChunk()
bt.chunkIDTokenizer.Reinit(chunks[idx].ChunkRef)
// TODO: error handling
itr, _ := lc.Iterator(
context.Background(),
time.Unix(0, 0), // TODO: Parameterize/better handle the timestamps?
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
log.NewNoopPipeline().ForStream(chunks[idx].Metric),
)
for itr.Next() && itr.Error() == nil {
toks := bt.chunkIDTokenizer.Tokens(itr.Entry().Line)
for _, tok := range toks {
if tok.Key != nil {
str := string(tok.Key)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if !found {
bt.cache[str] = nil
sbf.TestAndAdd(tok.Key)
if len(bt.cache) > CacheSize { // While crude, this has proven efficient in performance testing; log lines that sit near each other tend to be similar, so a bounded cache still deduplicates most tokens
clearCache(bt.cache)
}
}
}
}
}
} // for each chunk
}
func (bt *BloomTokenizer) TokenizeLine(line string) []Token {
return bt.lineTokenizer.Tokens(line)
}

pkg/storage/bloom/v1/bloom_tokenizer_test.go
@ -0,0 +1,30 @@
package v1
import (
"fmt"
"testing"
"github.com/prometheus/client_golang/prometheus"
)
func BenchmarkMapClear(b *testing.B) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
for i := 0; i < b.N; i++ {
for k := 0; k < CacheSize; k++ {
bt.cache[fmt.Sprint(k)] = k
}
clearCache(bt.cache)
}
}
func BenchmarkNewMap(b *testing.B) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
for i := 0; i < b.N; i++ {
for k := 0; k < CacheSize; k++ {
bt.cache[fmt.Sprint(k)] = k
}
bt.cache = make(map[string]interface{}, CacheSize)
}
}

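For context on the two benchmarks above: the `for k := range m { delete(m, k) }` loop used by `clearCache` is recognized by the Go compiler and lowered to a single runtime map clear, so the map keeps its already-allocated buckets, while `make(map[string]interface{}, CacheSize)` allocates a fresh table and leaves the old one to the garbage collector. A standalone illustration of the two reset strategies being compared (names are illustrative):

```go
package example

// resetInPlace empties the map but keeps its allocated buckets; the compiler
// turns this exact loop shape into a single runtime map-clear call.
func resetInPlace(m map[string]interface{}) {
	for k := range m {
		delete(m, k)
	}
}

// resetByRealloc abandons the old map to the GC and allocates a new one.
func resetByRealloc(capacity int) map[string]interface{} {
	return make(map[string]interface{}, capacity)
}
```
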
pkg/storage/bloom/v1/tokenizer.go
@ -0,0 +1,162 @@
package v1
import (
"encoding/binary"
"unicode/utf8"
"github.com/grafana/loki/pkg/logproto"
)
type Token struct {
Key []byte
}
type Tokenizer interface {
Tokens(line string) []Token
GetSkip() int
GetMin() int
GetMax() int
}
const TokenBufferSize = 4096
const TokenKeySize = 132
type NgramTokenizer struct {
// [min,max) exclusivity
min, max, skip int
buffers [][]rune // circular buffers used for ngram generation
runeBuffer []byte // buffer used for token generation
internalTokenBuffer []Token // circular buffer for tokens
}
/*
N-Grams (https://en.wikipedia.org/wiki/N-gram) are a series of 'n' adjacent characters in a string.
They are used to populate the bloom filters, enabling fuzzy (substring) searching.
*/
func NewNGramTokenizer(min, max, skip int) *NgramTokenizer {
capacity := max - min
t := &NgramTokenizer{
min: min,
max: max,
skip: skip,
buffers: make([][]rune, capacity),
runeBuffer: make([]byte, 0, max*4),
internalTokenBuffer: make([]Token, 0, TokenBufferSize),
}
for i := range t.buffers {
t.buffers[i] = make([]rune, t.min+i)
}
for i := 0; i < cap(t.internalTokenBuffer); i++ {
t.internalTokenBuffer = append(t.internalTokenBuffer, Token{Key: make([]byte, 0, TokenKeySize)})
}
return t
}
func (t *NgramTokenizer) GetSkip() int {
return t.skip
}
func (t *NgramTokenizer) GetMin() int {
return t.min
}
func (t *NgramTokenizer) GetMax() int {
return t.max
}
func (t *NgramTokenizer) Tokens(line string) []Token {
var i int // rune index (not the byte offset yielded by the range loop)
numToks := 0
for _, r := range line {
// j is the index of the buffer to use
for j := 0; j < (t.max - t.min); j++ {
// n is the length of the ngram
n := j + t.min
// pos is the position in the buffer to overwrite
pos := i % n
t.buffers[j][pos] = r
if i >= n-1 && (i+1-n)%(t.skip+1) == 0 {
t.runeBuffer = reassemble(t.buffers[j], (i+1)%n, t.runeBuffer)
if numToks >= cap(t.internalTokenBuffer) || numToks == len(t.internalTokenBuffer) {
t.internalTokenBuffer = append(t.internalTokenBuffer, Token{Key: make([]byte, 0, TokenKeySize)})
}
t.internalTokenBuffer[numToks].Key = t.internalTokenBuffer[numToks].Key[:0]
t.internalTokenBuffer[numToks].Key = append(t.internalTokenBuffer[numToks].Key, t.runeBuffer...)
numToks++
}
}
i++
}
return t.internalTokenBuffer[0:numToks]
}
func reassemble(buf []rune, pos int, result []byte) []byte {
result = result[:0] // Reset the result slice
for i := 0; i < len(buf); i++ {
cur := (pos + i) % len(buf)
result = utf8.AppendRune(result, buf[cur])
}
return result
}
func chunkIDTransformer(tok Token, prefix []byte) Token {
tok.Key = append(append(tok.Key, prefix...), tok.Key...)[len(tok.Key):]
return tok
}
type WrappedTokenizer struct {
t Tokenizer
tokenBuffer []Token
prefix []byte
i64buf []byte
i32buf []byte
}
func (w *WrappedTokenizer) Tokens(line string) []Token {
w.tokenBuffer = w.tokenBuffer[:0] // Reset the result slice
toks := w.t.Tokens(line)
for _, tok := range toks {
w.tokenBuffer = append(w.tokenBuffer, chunkIDTransformer(tok, w.prefix), tok)
}
return w.tokenBuffer
}
func (w *WrappedTokenizer) GetSkip() int {
return w.t.GetSkip()
}
func (w *WrappedTokenizer) GetMin() int {
return w.t.GetMin()
}
func (w *WrappedTokenizer) GetMax() int {
return w.t.GetMax()
}
func ChunkIDTokenizer(t Tokenizer) *WrappedTokenizer {
p := make([]byte, 0, 256)
return &WrappedTokenizer{
t: t,
tokenBuffer: make([]Token, 0, TokenBufferSize),
prefix: p,
i64buf: make([]byte, binary.MaxVarintLen64),
i32buf: make([]byte, 4),
}
}
func (w *WrappedTokenizer) Reinit(chk logproto.ChunkRef) {
w.prefix = w.prefix[:0]
binary.PutVarint(w.i64buf, int64(chk.From))
w.prefix = append(w.prefix, w.i64buf...)
binary.PutVarint(w.i64buf, int64(chk.Through))
w.prefix = append(w.prefix, w.i64buf...)
binary.LittleEndian.PutUint32(w.i32buf, chk.Checksum)
w.prefix = append(w.prefix, w.i32buf...)
}

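To make the output shape concrete, here is a small runnable sketch of what the tokenizers above emit; it mirrors the test expectations that follow and adds nothing beyond them:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logproto"
	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func main() {
	t := v1.NewNGramTokenizer(2, 4, 0) // 2-grams and 3-grams, no skip
	for _, tok := range t.Tokens("abcd") {
		fmt.Printf("%s ", tok.Key) // ab bc abc cd bcd
	}
	fmt.Println()

	// Wrapping the tokenizer makes it emit every n-gram twice: once prefixed
	// with the varint-encoded chunk From/Through plus the checksum, and once bare.
	w := v1.ChunkIDTokenizer(t)
	w.Reinit(logproto.ChunkRef{From: 0, Through: 999999, Checksum: 1})
	fmt.Println(len(w.Tokens("abcd"))) // 10 tokens: 5 n-grams x 2
}
```
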
pkg/storage/bloom/v1/tokenizer_test.go
@ -1,17 +1,32 @@
package main
package v1
import (
"bufio"
"encoding/binary"
"github.com/grafana/loki/pkg/logproto"
"os"
"testing"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)
func TestNGramTokenizer(t *testing.T) {
tokenizer := threeSkip2
const BigFile = "../../../logql/sketch/testdata/war_peace.txt"
var (
twoSkipOne = NewNGramTokenizer(2, 3, 1)
three = NewNGramTokenizer(3, 4, 0)
threeSkip1 = NewNGramTokenizer(3, 4, 1)
threeSkip2 = NewNGramTokenizer(3, 4, 2)
four = NewNGramTokenizer(4, 5, 0)
fourSkip1 = NewNGramTokenizer(4, 5, 1)
fourSkip2 = NewNGramTokenizer(4, 5, 2)
five = NewNGramTokenizer(5, 6, 0)
six = NewNGramTokenizer(6, 7, 0)
)
func TestNGrams(t *testing.T) {
tokenizer := NewNGramTokenizer(2, 4, 0)
for _, tc := range []struct {
desc string
input string
@ -27,10 +42,25 @@ func TestNGramTokenizer(t *testing.T) {
input: "a",
exp: []Token{},
},
{
desc: "two chars",
input: "ab",
exp: []Token{{Key: []byte("ab")}},
},
{
desc: "three chars",
input: "abc",
exp: []Token{{Key: []byte("ab")}, {Key: []byte("bc")}, {Key: []byte("abc")}},
},
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("abc"), Value: "abc"}},
exp: []Token{{Key: []byte("ab")}, {Key: []byte("bc")}, {Key: []byte("abc")}, {Key: []byte("cd")}, {Key: []byte("bcd")}},
},
{
desc: "foo",
input: "日本語",
exp: []Token{{Key: []byte("日本")}, {Key: []byte("本語")}, {Key: []byte("日本語")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -39,7 +69,50 @@ func TestNGramTokenizer(t *testing.T) {
}
}
func Test3Gram0SkipTokenizer(t *testing.T) {
func TestNGramsSkip(t *testing.T) {
for _, tc := range []struct {
desc string
tokenizer *NgramTokenizer
input string
exp []Token
}{
{
desc: "four chars",
tokenizer: twoSkipOne,
input: "abcd",
exp: []Token{{Key: []byte("ab")}, {Key: []byte("cd")}},
},
{
desc: "special chars",
tokenizer: twoSkipOne,
input: "日本語",
exp: []Token{{Key: []byte("日本")}},
},
{
desc: "multi",
tokenizer: NewNGramTokenizer(2, 4, 1),
input: "abcdefghij",
exp: []Token{
{Key: []byte("ab")},
{Key: []byte("abc")},
{Key: []byte("cd")},
{Key: []byte("cde")},
{Key: []byte("ef")},
{Key: []byte("efg")},
{Key: []byte("gh")},
{Key: []byte("ghi")},
{Key: []byte("ij")},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, tc.tokenizer.Tokens(tc.input))
})
}
}
func Test3GramSkip0Tokenizer(t *testing.T) {
tokenizer := three
for _, tc := range []struct {
desc string
@ -59,12 +132,12 @@ func Test3Gram0SkipTokenizer(t *testing.T) {
{
desc: "three char",
input: "abc",
exp: []Token{{Key: []byte("abc"), Value: "abc"}},
exp: []Token{{Key: []byte("abc")}},
},
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("abc"), Value: "abc"}, {Key: []byte("bcd"), Value: "bcd"}},
exp: []Token{{Key: []byte("abc")}, {Key: []byte("bcd")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -73,7 +146,7 @@ func Test3Gram0SkipTokenizer(t *testing.T) {
}
}
func Test3Gram1SkipTokenizer(t *testing.T) {
func Test3GramSkip1Tokenizer(t *testing.T) {
tokenizer := threeSkip1
for _, tc := range []struct {
desc string
@ -93,17 +166,46 @@ func Test3Gram1SkipTokenizer(t *testing.T) {
{
desc: "three char",
input: "abc",
exp: []Token{{Key: []byte("abc"), Value: "abc"}},
exp: []Token{{Key: []byte("abc")}},
},
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("abc"), Value: "abc"}},
exp: []Token{{Key: []byte("abc")}},
},
{
desc: "five chars",
input: "abcde",
exp: []Token{{Key: []byte("abc"), Value: "abc"}, {Key: []byte("cde"), Value: "cde"}},
exp: []Token{{Key: []byte("abc")}, {Key: []byte("cde")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, tokenizer.Tokens(tc.input))
})
}
}
func Test3GramSkip2Tokenizer(t *testing.T) {
tokenizer := threeSkip2
for _, tc := range []struct {
desc string
input string
exp []Token
}{
{
desc: "empty",
input: "",
exp: []Token{},
},
{
desc: "single char",
input: "a",
exp: []Token{},
},
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("abc")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -112,7 +214,7 @@ func Test3Gram1SkipTokenizer(t *testing.T) {
}
}
func Test4Gram0SkipTokenizer(t *testing.T) {
func Test4GramSkip0Tokenizer(t *testing.T) {
tokenizer := four
for _, tc := range []struct {
desc string
@ -137,12 +239,12 @@ func Test4Gram0SkipTokenizer(t *testing.T) {
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}},
exp: []Token{{Key: []byte("abcd")}},
},
{
desc: "five chars",
input: "abcde",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("bcde"), Value: "bcde"}},
exp: []Token{{Key: []byte("abcd")}, {Key: []byte("bcde")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -151,7 +253,7 @@ func Test4Gram0SkipTokenizer(t *testing.T) {
}
}
func Test4Gram1SkipTokenizer(t *testing.T) {
func Test4GramSkip1Tokenizer(t *testing.T) {
tokenizer := fourSkip1
for _, tc := range []struct {
desc string
@ -176,27 +278,27 @@ func Test4Gram1SkipTokenizer(t *testing.T) {
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}},
exp: []Token{{Key: []byte("abcd")}},
},
{
desc: "five chars",
input: "abcde",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}},
exp: []Token{{Key: []byte("abcd")}},
},
{
desc: "six chars",
input: "abcdef",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("cdef"), Value: "cdef"}},
exp: []Token{{Key: []byte("abcd")}, {Key: []byte("cdef")}},
},
{
desc: "seven chars",
input: "abcdefg",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("cdef"), Value: "cdef"}},
exp: []Token{{Key: []byte("abcd")}, {Key: []byte("cdef")}},
},
{
desc: "eight chars",
input: "abcdefgh",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("cdef"), Value: "cdef"}, {Key: []byte("efgh"), Value: "efgh"}},
exp: []Token{{Key: []byte("abcd")}, {Key: []byte("cdef")}, {Key: []byte("efgh")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -205,7 +307,7 @@ func Test4Gram1SkipTokenizer(t *testing.T) {
}
}
func Test4Gram2SkipTokenizer(t *testing.T) {
func Test4GramSkip2Tokenizer(t *testing.T) {
tokenizer := fourSkip2
for _, tc := range []struct {
desc string
@ -230,37 +332,37 @@ func Test4Gram2SkipTokenizer(t *testing.T) {
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}},
exp: []Token{{Key: []byte("abcd")}},
},
{
desc: "five chars",
input: "abcde",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}},
exp: []Token{{Key: []byte("abcd")}},
},
{
desc: "six chars",
input: "abcdef",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}},
exp: []Token{{Key: []byte("abcd")}},
},
{
desc: "seven chars",
input: "abcdefg",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("defg"), Value: "defg"}},
exp: []Token{{Key: []byte("abcd")}, {Key: []byte("defg")}},
},
{
desc: "eight chars",
input: "abcdefgh",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("defg"), Value: "defg"}},
exp: []Token{{Key: []byte("abcd")}, {Key: []byte("defg")}},
},
{
desc: "nine chars",
input: "abcdefghi",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("defg"), Value: "defg"}},
exp: []Token{{Key: []byte("abcd")}, {Key: []byte("defg")}},
},
{
desc: "ten chars",
input: "abcdefghij",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("defg"), Value: "defg"}, {Key: []byte("ghij"), Value: "ghij"}},
exp: []Token{{Key: []byte("abcd")}, {Key: []byte("defg")}, {Key: []byte("ghij")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -269,7 +371,7 @@ func Test4Gram2SkipTokenizer(t *testing.T) {
}
}
func Test5Gram0SkipTokenizer(t *testing.T) {
func Test5GramSkip0Tokenizer(t *testing.T) {
tokenizer := five
for _, tc := range []struct {
desc string
@ -299,12 +401,12 @@ func Test5Gram0SkipTokenizer(t *testing.T) {
{
desc: "five chars",
input: "abcde",
exp: []Token{{Key: []byte("abcde"), Value: "abcde"}},
exp: []Token{{Key: []byte("abcde")}},
},
{
desc: "six chars",
input: "abcdef",
exp: []Token{{Key: []byte("abcde"), Value: "abcde"}, {Key: []byte("bcdef"), Value: "bcdef"}},
exp: []Token{{Key: []byte("abcde")}, {Key: []byte("bcdef")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -313,7 +415,7 @@ func Test5Gram0SkipTokenizer(t *testing.T) {
}
}
func Test6Gram0SkipTokenizer(t *testing.T) {
func Test6GramSkip0Tokenizer(t *testing.T) {
tokenizer := six
for _, tc := range []struct {
desc string
@ -348,12 +450,12 @@ func Test6Gram0SkipTokenizer(t *testing.T) {
{
desc: "six chars",
input: "abcdef",
exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}},
exp: []Token{{Key: []byte("abcdef")}},
},
{
desc: "seven chars",
input: "abcdefg",
exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}, {Key: []byte("bcdefg"), Value: "bcdefg"}},
exp: []Token{{Key: []byte("abcdef")}, {Key: []byte("bcdefg")}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
@ -369,13 +471,10 @@ func makeBuf(from, through, checksum int) []byte {
binary.PutVarint(i64buf, int64(from))
p = append(p, i64buf...)
p = append(p, 58)
binary.PutVarint(i64buf, int64(through))
p = append(p, i64buf...)
p = append(p, 58)
binary.LittleEndian.PutUint32(i32buf, uint32(checksum))
p = append(p, i32buf...)
p = append(p, 58)
return p
}
@ -400,46 +499,43 @@ func TestWrappedTokenizer(t *testing.T) {
desc: "four chars",
input: "abcd",
exp: []Token{
{Key: append(makeBuf(0, 999999, 1), []byte("abc")...), Value: string(makeBuf(0, 999999, 1)) + "abc"},
{Key: []byte("abc"), Value: "abc"}},
{Key: append(makeBuf(0, 999999, 1), []byte("abc")...)},
{Key: []byte("abc")}},
},
{
desc: "uuid",
input: "2b1a5e46-36a2-4694-a4b1-f34cc7bdfc45",
exp: []Token{
{Key: append(makeBuf(0, 999999, 1), []byte("2b1")...), Value: string(makeBuf(0, 999999, 1)) + "2b1"},
{Key: append(makeBuf(0, 999999, 1), []byte("a5e")...), Value: string(makeBuf(0, 999999, 1)) + "a5e"},
{Key: append(makeBuf(0, 999999, 1), []byte("46-")...), Value: string(makeBuf(0, 999999, 1)) + "46-"},
{Key: append(makeBuf(0, 999999, 1), []byte("36a")...), Value: string(makeBuf(0, 999999, 1)) + "36a"},
{Key: append(makeBuf(0, 999999, 1), []byte("2-4")...), Value: string(makeBuf(0, 999999, 1)) + "2-4"},
{Key: append(makeBuf(0, 999999, 1), []byte("694")...), Value: string(makeBuf(0, 999999, 1)) + "694"},
{Key: append(makeBuf(0, 999999, 1), []byte("-a4")...), Value: string(makeBuf(0, 999999, 1)) + "-a4"},
{Key: append(makeBuf(0, 999999, 1), []byte("b1-")...), Value: string(makeBuf(0, 999999, 1)) + "b1-"},
{Key: append(makeBuf(0, 999999, 1), []byte("f34")...), Value: string(makeBuf(0, 999999, 1)) + "f34"},
{Key: append(makeBuf(0, 999999, 1), []byte("cc7")...), Value: string(makeBuf(0, 999999, 1)) + "cc7"},
{Key: append(makeBuf(0, 999999, 1), []byte("bdf")...), Value: string(makeBuf(0, 999999, 1)) + "bdf"},
{Key: append(makeBuf(0, 999999, 1), []byte("c45")...), Value: string(makeBuf(0, 999999, 1)) + "c45"},
{Key: []byte("2b1"), Value: "2b1"},
{Key: []byte("a5e"), Value: "a5e"},
{Key: []byte("46-"), Value: "46-"},
{Key: []byte("36a"), Value: "36a"},
{Key: []byte("2-4"), Value: "2-4"},
{Key: []byte("694"), Value: "694"},
{Key: []byte("-a4"), Value: "-a4"},
{Key: []byte("b1-"), Value: "b1-"},
{Key: []byte("f34"), Value: "f34"},
{Key: []byte("cc7"), Value: "cc7"},
{Key: []byte("bdf"), Value: "bdf"},
{Key: []byte("c45"), Value: "c45"},
{Key: append(makeBuf(0, 999999, 1), []byte("2b1")...)},
{Key: []byte("2b1")},
{Key: append(makeBuf(0, 999999, 1), []byte("a5e")...)},
{Key: []byte("a5e")},
{Key: append(makeBuf(0, 999999, 1), []byte("46-")...)},
{Key: []byte("46-")},
{Key: append(makeBuf(0, 999999, 1), []byte("36a")...)},
{Key: []byte("36a")},
{Key: append(makeBuf(0, 999999, 1), []byte("2-4")...)},
{Key: []byte("2-4")},
{Key: append(makeBuf(0, 999999, 1), []byte("694")...)},
{Key: []byte("694")},
{Key: append(makeBuf(0, 999999, 1), []byte("-a4")...)},
{Key: []byte("-a4")},
{Key: append(makeBuf(0, 999999, 1), []byte("b1-")...)},
{Key: []byte("b1-")},
{Key: append(makeBuf(0, 999999, 1), []byte("f34")...)},
{Key: []byte("f34")},
{Key: append(makeBuf(0, 999999, 1), []byte("cc7")...)},
{Key: []byte("cc7")},
{Key: append(makeBuf(0, 999999, 1), []byte("bdf")...)},
{Key: []byte("bdf")},
{Key: append(makeBuf(0, 999999, 1), []byte("c45")...)},
{Key: []byte("c45")},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
chunkTokenizer := ChunkIDTokenizer(logproto.ChunkRef{Fingerprint: 1,
From: 0,
Through: 999999,
Checksum: 1,
}, tokenizer)
chunkTokenizer := ChunkIDTokenizer(tokenizer)
chunkTokenizer.Reinit(logproto.ChunkRef{From: 0, Through: 999999, Checksum: 1})
require.Equal(t, tc.exp, chunkTokenizer.Tokens(tc.input))
})
}
@ -448,7 +544,7 @@ func TestWrappedTokenizer(t *testing.T) {
func BenchmarkTokens(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open("big.txt")
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
@ -460,17 +556,19 @@ func BenchmarkTokens(b *testing.B) {
}
}
func BenchmarkOldTokens(b *testing.B) {
func BenchmarkWrappedTokens(b *testing.B) {
chunkTokenizer := ChunkIDTokenizer(three)
chunkTokenizer.Reinit(logproto.ChunkRef{From: 0, Through: 999999, Checksum: 1})
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open("big.txt")
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
_ = three.OldTokens(line)
_ = chunkTokenizer.Tokens(line)
}
}
}

tools/tsdb/bloom-tester/lib.go
@ -10,7 +10,6 @@ import (
"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
//"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
"hash/fnv"
"math"
"os"
@ -26,8 +25,8 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/storage"
bt "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
@ -89,15 +88,15 @@ func execute() {
}
var (
three = newNGramTokenizer(3, 4, 0)
threeSkip1 = newNGramTokenizer(3, 4, 1)
threeSkip2 = newNGramTokenizer(3, 4, 2)
threeSkip3 = newNGramTokenizer(3, 4, 3)
four = newNGramTokenizer(4, 5, 0)
fourSkip1 = newNGramTokenizer(4, 5, 1)
fourSkip2 = newNGramTokenizer(4, 5, 2)
five = newNGramTokenizer(5, 6, 0)
six = newNGramTokenizer(6, 7, 0)
three = bt.NewNGramTokenizer(3, 4, 0)
threeSkip1 = bt.NewNGramTokenizer(3, 4, 1)
threeSkip2 = bt.NewNGramTokenizer(3, 4, 2)
threeSkip3 = bt.NewNGramTokenizer(3, 4, 3)
four = bt.NewNGramTokenizer(4, 5, 0)
fourSkip1 = bt.NewNGramTokenizer(4, 5, 1)
fourSkip2 = bt.NewNGramTokenizer(4, 5, 2)
five = bt.NewNGramTokenizer(5, 6, 0)
six = bt.NewNGramTokenizer(6, 7, 0)
onePctError = func() *filter.ScalableBloomFilter { return filter.NewScalableBloomFilter(1024, 0.01, 0.8) }
fivePctError = func() *filter.ScalableBloomFilter { return filter.NewScalableBloomFilter(1024, 0.05, 0.8) }
@ -120,26 +119,26 @@ var experiments = []Experiment{
true,
onePctError,
),
NewExperiment(
"token=4skip1_error=1%_indexchunks=true",
fourSkip1,
true,
onePctError,
),
/*
NewExperiment(
"token=4skip1_error=1%_indexchunks=true",
fourSkip1,
true,
onePctError,
),
NewExperiment(
"token=4skip2_error=1%_indexchunks=true",
fourSkip2,
true,
onePctError,
),
NewExperiment(
"token=4skip2_error=1%_indexchunks=true",
fourSkip2,
"token=4skip0_error=5%_indexchunks=true",
four,
true,
onePctError,
fivePctError,
),*/
NewExperiment(
"token=4skip0_error=5%_indexchunks=true",
four,
true,
fivePctError,
),
/*
NewExperiment(
"token=4skip1_error=5%_indexchunks=true",
@ -266,11 +265,10 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS
}
level.Info(util_log.Logger).Log("msg", "starting analyze()", "tester", testerNumber, "total", numTesters)
var n int // count iterated series
reportEvery := 10 // report every n chunks
var n int // count iterated series
//pool := newPool(runtime.NumCPU())
//pool := newPool(1)
bloomTokenizer, _ := bt.NewBloomTokenizer(prometheus.DefaultRegisterer)
for _, tenant := range tenants {
level.Info(util_log.Logger).Log("Analyzing tenant", tenant, "table", tableName)
err := indexShipper.ForEach(
@ -308,8 +306,6 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS
return
}
cache := NewLRUCache4(150000)
transformed := make([]chunk.Chunk, 0, len(chks))
for _, chk := range chks {
transformed = append(transformed, chunk.Chunk{
@ -333,11 +329,10 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS
for _, c := range got {
chunkTotalUncompressedSize += c.Data.(*chunkenc.Facade).LokiChunk().UncompressedSize()
}
metrics.chunkSize.Observe(float64(chunkTotalUncompressedSize))
n += len(got)
// iterate experiments
for experimentIdx, experiment := range experiments {
for _, experiment := range experiments {
bucketPrefix := os.Getenv("BUCKET_PREFIX")
if strings.EqualFold(bucketPrefix, "") {
bucketPrefix = "named-experiments-"
@ -348,63 +343,13 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS
tenant,
ls.String(),
objectClient) {
bloomTokenizer.SetLineTokenizer(experiment.tokenizer)
level.Info(util_log.Logger).Log("Starting work on: ", ls.String(), "'", FNV32a(ls.String()), "'", experiment.name, tenant)
startTime := time.Now().UnixMilli()
sbf := experiment.bloom()
cache.Clear()
// Iterate chunks
var (
lines, inserts, collisions float64
)
for cidx := range got {
chunkTokenizer := ChunkIDTokenizer(got[cidx].ChunkRef, experiment.tokenizer)
var tokenizer Tokenizer = chunkTokenizer
if !experiment.encodeChunkID {
tokenizer = experiment.tokenizer // so I don't have to change the lines of code below
}
lc := got[cidx].Data.(*chunkenc.Facade).LokiChunk()
// Only report on the last experiment since they run serially
if experimentIdx == len(experiments)-1 && (n+cidx+1)%reportEvery == 0 {
estimatedProgress := float64(fp) / float64(model.Fingerprint(math.MaxUint64)) * 100.
level.Info(util_log.Logger).Log(
"msg", "iterated",
"progress", fmt.Sprintf("%.2f%%", estimatedProgress),
"chunks", len(chks),
"series", ls.String(),
)
}
itr, err := lc.Iterator(
context.Background(),
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
log.NewNoopPipeline().ForStream(ls),
)
helpers.ExitErr("getting iterator", err)
for itr.Next() && itr.Error() == nil {
toks := tokenizer.Tokens(itr.Entry().Line)
lines++
for _, tok := range toks {
if tok.Key != nil {
if !cache.GetString(tok.Value) {
cache.PutStringByte(tok.Value, tok.Key)
if dup := sbf.TestAndAdd(tok.Key); dup {
collisions++
}
inserts++
}
}
}
}
helpers.ExitErr("iterating chunks", itr.Error())
} // for each chunk
bloomTokenizer.PopulateSBF(sbf, got)
endTime := time.Now().UnixMilli()
if len(got) > 0 {
@ -414,9 +359,6 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS
metrics.estimatedCount.WithLabelValues(experiment.name).Observe(
float64(estimatedCount(sbf.Capacity(), sbf.FillRatio())),
)
metrics.lines.WithLabelValues(experiment.name).Add(lines)
metrics.inserts.WithLabelValues(experiment.name).Add(inserts)
metrics.collisions.WithLabelValues(experiment.name).Add(collisions)
writeSBF(sbf,
os.Getenv("DIR"),
@ -428,6 +370,7 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS
metrics.sbfCreationTime.WithLabelValues(experiment.name).Add(float64(endTime - startTime))
metrics.sbfsCreated.WithLabelValues(experiment.name).Inc()
metrics.chunkSize.Observe(float64(chunkTotalUncompressedSize))
if err != nil {
helpers.ExitErr("writing sbf to file", err)

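With the tokenizing moved into the library, the inner experiment loop in `analyze()` collapses to roughly the following shape (a simplified sketch; the object-storage existence check, metrics, and error handling shown in the diff are elided):

```go
// Simplified sketch of the refactored loop body in analyze().
bloomTokenizer, _ := bt.NewBloomTokenizer(prometheus.DefaultRegisterer)
for _, experiment := range experiments {
	bloomTokenizer.SetLineTokenizer(experiment.tokenizer)
	sbf := experiment.bloom()
	bloomTokenizer.PopulateSBF(sbf, got) // got is the []chunk.Chunk fetched for this series
	// ...observe sbfCreationTime / estimatedCount and write the SBF out, as above
}
```
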
tools/tsdb/bloom-tester/lib_test.go
@ -3,262 +3,151 @@ package main
import (
"bufio"
"os"
"strconv"
"testing"
"github.com/stretchr/testify/require"
)
func TestNGrams(t *testing.T) {
tokenizer := newNGramTokenizer(2, 4, 0)
for _, tc := range []struct {
desc string
input string
exp []Token
}{
{
desc: "empty",
input: "",
exp: []Token{},
},
{
desc: "single char",
input: "a",
exp: []Token{},
},
{
desc: "two chars",
input: "ab",
exp: []Token{{Key: []byte("ab"), Value: "ab"}},
},
{
desc: "three chars",
input: "abc",
exp: []Token{{Key: []byte("ab"), Value: "ab"}, {Key: []byte("bc"), Value: "bc"}, {Key: []byte("abc"), Value: "abc"}},
},
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("ab"), Value: "ab"}, {Key: []byte("bc"), Value: "bc"}, {Key: []byte("abc"), Value: "abc"}, {Key: []byte("cd"), Value: "cd"}, {Key: []byte("bcd"), Value: "bcd"}},
},
{
desc: "foo",
input: "日本語",
exp: []Token{{Key: []byte("日本"), Value: "日本"}, {Key: []byte("本語"), Value: "本語"}, {Key: []byte("日本語"), Value: "日本語"}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, tokenizer.Tokens(tc.input))
})
}
}
func Test4NGrams(t *testing.T) {
tokenizer := four
for _, tc := range []struct {
desc string
input string
exp []Token
}{
{
desc: "empty",
input: "",
exp: []Token{},
},
{
desc: "single char",
input: "a",
exp: []Token{},
},
{
desc: "two chars",
input: "ab",
exp: []Token{},
},
{
desc: "three chars",
input: "abc",
exp: []Token{},
},
{
desc: "four chars",
input: "abcd",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}},
},
{
desc: "five chars",
input: "abcde",
exp: []Token{{Key: []byte("abcd"), Value: "abcd"}, {Key: []byte("bcde"), Value: "bcde"}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, tokenizer.Tokens(tc.input))
})
}
}
func Test6NGrams(t *testing.T) {
tokenizer := six
for _, tc := range []struct {
desc string
input string
exp []Token
}{
{
desc: "empty",
input: "",
exp: []Token{},
},
{
desc: "single char",
input: "a",
exp: []Token{},
},
{
desc: "two chars",
input: "ab",
exp: []Token{},
},
{
desc: "three chars",
input: "abc",
exp: []Token{},
},
{
desc: "four chars",
input: "abcd",
exp: []Token{},
},
{
desc: "five chars",
input: "abcde",
exp: []Token{},
},
{
desc: "six chars",
input: "abcdef",
exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}},
},
{
desc: "seven chars",
input: "abcdefg",
exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}, {Key: []byte("bcdefg"), Value: "bcdefg"}},
},
{
desc: "eight chars",
input: "abcdefgh",
exp: []Token{{Key: []byte("abcdef"), Value: "abcdef"}, {Key: []byte("bcdefg"), Value: "bcdefg"}, {Key: []byte("cdefgh"), Value: "cdefgh"}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, tokenizer.Tokens(tc.input))
})
}
}
func TestNGramsSkip(t *testing.T) {
twoSkipOne := newNGramTokenizer(2, 3, 1)
for _, tc := range []struct {
desc string
tokenizer *ngramTokenizer
input string
exp []Token
}{
{
desc: "four chars",
tokenizer: twoSkipOne,
input: "abcd",
exp: []Token{{Key: []byte("ab"), Value: "ab"}, {Key: []byte("cd"), Value: "cd"}},
},
{
desc: "special chars",
tokenizer: twoSkipOne,
input: "日本語",
exp: []Token{{Key: []byte("日本"), Value: "日本"}},
},
{
desc: "multi",
tokenizer: newNGramTokenizer(2, 4, 1),
input: "abcdefghij",
exp: []Token{
{Key: []byte("ab"), Value: "ab"},
{Key: []byte("abc"), Value: "abc"},
{Key: []byte("cd"), Value: "cd"},
{Key: []byte("cde"), Value: "cde"},
{Key: []byte("ef"), Value: "ef"},
{Key: []byte("efg"), Value: "efg"},
{Key: []byte("gh"), Value: "gh"},
{Key: []byte("ghi"), Value: "ghi"},
{Key: []byte("ij"), Value: "ij"},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, tc.tokenizer.Tokens(tc.input))
})
}
}
var num = 1000000
const BigFile = "../../../pkg/logql/sketch/testdata/war_peace.txt"
func BenchmarkLRU1Put(b *testing.B) {
cache := NewLRUCache(num)
func BenchmarkSBFTestAndAdd(b *testing.B) {
for i := 0; i < b.N; i++ {
cache.Put(strconv.Itoa(i))
b.StopTimer()
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
"token=3skip0_error=1%_indexchunks=true",
three,
true,
onePctError,
)
sbf := experiment.bloom()
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
sbf.TestAndAdd(token.Key)
}
}
}
}
func BenchmarkLRU1Get(b *testing.B) {
cache := NewLRUCache(num)
for i := 0; i < num; i++ {
cache.Put(strconv.Itoa(i))
}
b.ResetTimer()
func BenchmarkSBFAdd(b *testing.B) {
for i := 0; i < b.N; i++ {
cache.Get(strconv.Itoa(i))
b.StopTimer()
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
"token=3skip0_error=1%_indexchunks=true",
three,
true,
onePctError,
)
sbf := experiment.bloom()
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
sbf.Add(token.Key)
}
}
}
}
func BenchmarkLRU2Put(b *testing.B) {
cache := NewLRUCache2(num)
func BenchmarkSBFSeparateTestAndAdd(b *testing.B) {
for i := 0; i < b.N; i++ {
cache.Put(strconv.Itoa(i))
b.StopTimer()
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
"token=3skip0_error=1%_indexchunks=true",
three,
true,
onePctError,
)
sbf := experiment.bloom()
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
found := sbf.Test(token.Key)
if !found {
sbf.Add(token.Key)
}
}
}
}
}
func BenchmarkLRU2Get(b *testing.B) {
cache := NewLRUCache2(num)
for i := 0; i < num; i++ {
cache.Put(strconv.Itoa(i))
}
b.ResetTimer()
func BenchmarkSBFTestAndAddWithLRU(b *testing.B) {
for i := 0; i < b.N; i++ {
cache.Get(strconv.Itoa(i))
b.StopTimer()
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
"token=3skip0_error=1%_indexchunks=true",
three,
true,
onePctError,
)
sbf := experiment.bloom()
cache := NewLRUCache4(150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
if !cache.Get(token.Key) {
cache.Put(token.Key)
sbf.TestAndAdd(token.Key)
}
}
}
}
}
func BenchmarkLRU4Put(b *testing.B) {
cache := NewLRUCache4(num)
func BenchmarkSBFSeparateTestAndAddWithLRU(b *testing.B) {
for i := 0; i < b.N; i++ {
cache.Put([]byte(strconv.Itoa(i)))
}
}
b.StopTimer()
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
"token=3skip0_error=1%_indexchunks=true",
three,
true,
onePctError,
)
sbf := experiment.bloom()
cache := NewLRUCache4(150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
if !cache.Get(token.Key) {
cache.Put(token.Key)
func BenchmarkLRU4Get(b *testing.B) {
cache := NewLRUCache4(num)
for i := 0; i < num; i++ {
cache.Put([]byte(strconv.Itoa(i)))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Get([]byte(strconv.Itoa(i)))
found := sbf.Test(token.Key)
if !found {
sbf.Add(token.Key)
}
//sbf.TestAndAdd(token.Key)
}
}
}
}
}
func BenchmarkSBFTestAndAdd(b *testing.B) {
func BenchmarkSBFSeparateTestAndAddWithLRU5(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open("big.txt")
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
@ -268,21 +157,31 @@ func BenchmarkSBFTestAndAdd(b *testing.B) {
onePctError,
)
sbf := experiment.bloom()
cache := NewLRUCache5(150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
sbf.TestAndAdd(token.Key)
str := string(token.Key)
if !cache.Get(str) {
cache.Put(str)
found := sbf.Test(token.Key)
if !found {
sbf.Add(token.Key)
}
}
}
}
}
}
func BenchmarkSBFAdd(b *testing.B) {
func BenchmarkSBFTestAndAddWithLRU5(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open("big.txt")
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
@ -292,21 +191,92 @@ func BenchmarkSBFAdd(b *testing.B) {
onePctError,
)
sbf := experiment.bloom()
cache := NewLRUCache5(150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
sbf.Add(token.Key)
str := string(token.Key)
if !cache.Get(str) {
cache.Put(str)
sbf.TestAndAdd(token.Key)
}
}
}
}
}
func BenchmarkSBFSeparateTestAndAdd(b *testing.B) {
func BenchmarkSBFTestAndAddWithByteKeyLRU(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
"token=4skip0_error=1%_indexchunks=false",
four,
false,
onePctError,
)
sbf := experiment.bloom()
cache := NewByteKeyLRUCache(150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
array := NewFourByteKeyFromSlice(token.Key)
if !cache.Get(array) {
cache.Put(array)
sbf.TestAndAdd(token.Key)
}
}
}
}
}
func BenchmarkSBFTestAndAddWithFourByteKeyLRU(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open("big.txt")
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
"token=4skip0_error=1%_indexchunks=false",
four,
false,
onePctError,
)
sbf := experiment.bloom()
cache := NewFourByteKeyLRUCache(150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
if !cache.Get([4]byte(token.Key)) {
cache.Put([4]byte(token.Key))
found := sbf.Test(token.Key)
if !found {
sbf.Add(token.Key)
}
//sbf.TestAndAdd(token.Key)
}
}
}
}
}
func BenchmarkSBFAddWithLRU(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
@ -316,13 +286,14 @@ func BenchmarkSBFSeparateTestAndAdd(b *testing.B) {
onePctError,
)
sbf := experiment.bloom()
cache := NewLRUCache4(150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
found := sbf.Test(token.Key)
if !found {
if !cache.Get(token.Key) {
cache.Put(token.Key)
sbf.Add(token.Key)
}
}
@ -330,10 +301,10 @@ func BenchmarkSBFSeparateTestAndAdd(b *testing.B) {
}
}
func BenchmarkSBFTestAndAddWithLRU(b *testing.B) {
func BenchmarkSBFSeparateTestAndAddWithLRU1(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open("big.txt")
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
@ -343,25 +314,30 @@ func BenchmarkSBFTestAndAddWithLRU(b *testing.B) {
onePctError,
)
sbf := experiment.bloom()
cache := NewLRUCache4(150000)
cache := NewLRUCache(150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
if !cache.Get(token.Key) {
cache.Put(token.Key)
sbf.TestAndAdd(token.Key)
str := string(token.Key)
if !cache.Get(str) {
cache.Put(str)
found := sbf.Test(token.Key)
if !found {
sbf.Add(token.Key)
}
//sbf.Add(token.Key)
}
}
}
}
}
func BenchmarkSBFAddWithLRU(b *testing.B) {
func BenchmarkSBFSeparateTestAndAddWithMap(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
file, _ := os.Open("big.txt")
file, _ := os.Open(BigFile)
defer file.Close()
scanner := bufio.NewScanner(file)
experiment := NewExperiment(
@ -371,15 +347,27 @@ func BenchmarkSBFAddWithLRU(b *testing.B) {
onePctError,
)
sbf := experiment.bloom()
cache := NewLRUCache4(150000)
cache := make(map[string]interface{}, 150000)
b.StartTimer()
for scanner.Scan() {
line := scanner.Text()
tokens := experiment.tokenizer.Tokens(line)
for _, token := range tokens {
if !cache.Get(token.Key) {
cache.Put(token.Key)
sbf.Add(token.Key)
str := string(token.Key)
_, found := cache[str]
if !found {
cache[str] = ""
f := sbf.Test(token.Key)
if !f {
sbf.Add(token.Key)
}
if len(cache) > 150000 {
for elem := range cache {
delete(cache, elem)
}
}
}
}
}

tools/tsdb/bloom-tester/lrucache.go
@ -1,6 +1,9 @@
package main
import "container/list"
import (
"container/list"
"fmt"
)
type LRUCache struct {
capacity int
@ -267,3 +270,337 @@ func (c *HashSet) Clear() {
delete(c.cache, k)
}
}
// ByteKey is an interface for types that represent keys of a certain size.
type ByteKey interface {
Size() int
Equal(other ByteKey) bool
}
// FourByteKey represents a key of 4 bytes.
type FourByteKey [4]byte
// Size returns the size of the FourByteKey.
func (k FourByteKey) Size() int {
return 4
}
// Equal checks if two FourByteKeys are equal.
func (k FourByteKey) Equal(other ByteKey) bool {
if otherFourByteKey, ok := other.(FourByteKey); ok {
return k == otherFourByteKey
}
return false
}
// ThirtyOneByteKey represents a key of 31 bytes.
type ThirtyOneByteKey [31]byte
// Size returns the size of the ThirtyOneByteKey.
func (k ThirtyOneByteKey) Size() int {
return 31
}
// Equal checks if two ThirtyOneByteKeys are equal.
func (k ThirtyOneByteKey) Equal(other ByteKey) bool {
if otherThirtyOneByteKey, ok := other.(ThirtyOneByteKey); ok {
return k == otherThirtyOneByteKey
}
return false
}
type ByteKeyLRUCache struct {
capacity int
//m map[ByteKey]struct{}
m map[ByteKey]*list.Element
list *list.List
}
func NewByteKeyLRUCache(capacity int) *ByteKeyLRUCache {
return &ByteKeyLRUCache{
capacity: capacity,
m: make(map[ByteKey]*list.Element, capacity),
list: list.New(),
}
}
func (c *ByteKeyLRUCache) Get(key ByteKey) bool {
if value, ok := c.m[key]; ok {
// Move the accessed element to the front of the list
c.list.MoveToFront(value)
return true
}
return false
}
func (c *ByteKeyLRUCache) Put(key ByteKey) {
if value, ok := c.m[key]; ok {
// If the key already exists, move it to the front
c.list.MoveToFront(value)
} else {
// If the cache is full, remove the least recently used element
if len(c.m) >= c.capacity {
// Get the least recently used element from the back of the list
tailElem := c.list.Back()
if tailElem != nil {
deletedEntry := c.list.Remove(tailElem).(ByteKey)
delete(c.m, deletedEntry)
}
}
// Add the new key to the cache and the front of the list
elem := c.list.PushFront(key)
c.m[key] = elem
}
}
func (c *ByteKeyLRUCache) Clear() {
// Iterate through the list and remove all elements
for elem := c.list.Front(); elem != nil; elem = elem.Next() {
delete(c.m, elem.Value.(ByteKey))
}
// Clear the list
c.list.Init()
}
// ByteKeyMap is a map that uses ByteKey as a key.
type ByteKeyMap struct {
capacity int
m map[ByteKey]struct{}
}
// NewByteKeyMap creates a new ByteKeyMap.
func NewByteKeyMap(capacity int) ByteKeyMap {
return ByteKeyMap{
capacity: capacity,
m: make(map[ByteKey]struct{}, capacity),
}
}
// Put adds an entry to the map.
func (bm *ByteKeyMap) Put(key ByteKey) {
bm.m[key] = struct{}{}
}
// Get retrieves a value from the map based on the key.
func (bm *ByteKeyMap) Get(key ByteKey) bool {
_, exists := bm.m[key]
return exists
}
type ByteSet struct {
capacity int
cache map[[4]byte]struct{}
}
func NewByteSet(capacity int) *ByteSet {
return &ByteSet{
capacity: capacity,
cache: make(map[[4]byte]struct{}),
}
}
func sliceToByteArray(slice []byte) [4]byte {
// Define the desired size of the byte array
// If you want to make it dynamically sized, use len(slice)
var array [4]byte
// Copy elements from the slice to the array
copy(array[:], slice)
return array
}
// NewFourByteKeyFromSlice converts a byte slice to a FourByteKey.
func NewFourByteKeyFromSlice(slice []byte) FourByteKey {
var key FourByteKey
copy(key[:], slice)
return key
}
// NewThirtyOneByteKeyFromSlice converts a byte slice to a ThirtyOneByteKey.
func NewThirtyOneByteKeyFromSlice(slice []byte) ThirtyOneByteKey {
var key ThirtyOneByteKey
copy(key[:], slice)
return key
}
func (c ByteSet) Get(key string) bool {
if _, ok := c.cache[sliceToByteArray([]byte(key))]; ok {
return true
}
return false
}
func (c *ByteSet) Put(key string) {
c.cache[sliceToByteArray([]byte(key))] = struct{}{}
}
func (c *ByteSet) PutBytes(value []byte) {
c.cache[sliceToByteArray(value)] = struct{}{}
}
func (c *ByteSet) Clear() {
for k := range c.cache {
delete(c.cache, k)
}
}
type FourByteKeyLRUCache struct {
capacity int
m map[[4]byte]*list.Element
list *list.List
}
func NewFourByteKeyLRUCache(capacity int) *FourByteKeyLRUCache {
return &FourByteKeyLRUCache{
capacity: capacity,
m: make(map[[4]byte]*list.Element, capacity),
list: list.New(),
}
}
func (c *FourByteKeyLRUCache) Get(key [4]byte) bool {
if value, ok := c.m[key]; ok {
// Move the accessed element to the front of the list
c.list.MoveToFront(value)
return true
}
return false
}
func (c *FourByteKeyLRUCache) Put(key [4]byte) {
if value, ok := c.m[key]; ok {
// If the key already exists, move it to the front
c.list.MoveToFront(value)
} else {
// If the cache is full, remove the least recently used element
if len(c.m) >= c.capacity {
// Get the least recently used element from the back of the list
tailElem := c.list.Back()
if tailElem != nil {
deletedEntry := c.list.Remove(tailElem).([4]byte)
delete(c.m, deletedEntry)
}
}
// Add the new key to the cache and the front of the list
elem := c.list.PushFront(key)
c.m[key] = elem
}
}
func (c *FourByteKeyLRUCache) Clear() {
// Iterate through the list and remove all elements
for elem := c.list.Front(); elem != nil; elem = elem.Next() {
delete(c.m, elem.Value.([4]byte))
}
// Clear the list
c.list.Init()
}
type LRUCache5 struct {
capacity int
cache map[string]*LRUNode5
head *LRUNode5
tail *LRUNode5
}
type LRUNode5 struct {
key string
prev *LRUNode5
next *LRUNode5
}
func NewLRUCache5(capacity int) *LRUCache5 {
return &LRUCache5{
capacity: capacity,
}
}
func (c *LRUCache5) init() {
c.cache = make(map[string]*LRUNode5, c.capacity)
c.head = new(LRUNode5)
c.tail = new(LRUNode5)
c.head.next = c.tail
c.tail.prev = c.head
}
func (c *LRUCache5) pop(item *LRUNode5) {
item.prev.next = item.next
item.next.prev = item.prev
}
func (c *LRUCache5) push(item *LRUNode5) {
c.head.next.prev = item
item.next = c.head.next
item.prev = c.head
c.head.next = item
}
func (c *LRUCache5) evict() *LRUNode5 {
item := c.tail.prev
c.pop(item)
delete(c.cache, item.key)
return item
}
func (c *LRUCache5) Get(key string) bool {
if c.cache == nil {
c.init()
}
item := c.cache[key]
if item == nil {
return false
}
if c.head.next != item {
c.pop(item)
c.push(item)
}
return true
}
func (c *LRUCache5) Put(key string) {
if c.cache == nil {
c.init()
}
item := c.cache[key]
if item == nil {
if len(c.cache) == c.capacity {
item = c.evict()
} else {
item = new(LRUNode5)
}
item.key = key
c.push(item)
c.cache[key] = item
} else {
if c.head.next != item {
c.pop(item)
c.push(item)
}
}
}
func (c *LRUCache5) Clear() {
if c.cache != nil {
for elem := range c.cache {
delete(c.cache, elem)
}
// Re-link the sentinel nodes instead of nil-ing them out: init() only runs
// again when the map itself is nil, so a nil head/tail here would panic on
// the next Put.
c.head.next = c.tail
c.tail.prev = c.head
}
}
func (c *LRUCache5) Dump() {
if c.cache != nil {
for elem := range c.cache {
fmt.Println(elem)
}
}
}

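All of the cache variants added above expose the same membership-style Put/Get interface (keyed by string, []byte, or a fixed-size byte array), so they can be swapped interchangeably in front of the SBF. A short sketch of the intended use, matching the pattern exercised in the benchmarks (`tokens` and `sbf` are assumed to already exist in scope):

```go
// Illustrative: LRUCache5 as a dedup set in front of a scalable bloom filter.
cache := NewLRUCache5(150000)
for _, token := range tokens {
	str := string(token.Key)
	if !cache.Get(str) { // Get also bumps an existing entry to the front
		cache.Put(str)
		if !sbf.Test(token.Key) {
			sbf.Add(token.Key)
		}
	}
}
```
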
tools/tsdb/bloom-tester/lrucache_test.go
@ -0,0 +1,206 @@
package main
import (
"encoding/binary"
"github.com/stretchr/testify/require"
"strconv"
"testing"
)
var num = 1000000
func BenchmarkLRU1Put(b *testing.B) {
cache := NewLRUCache(num)
for i := 0; i < b.N; i++ {
cache.Put(strconv.Itoa(i))
}
}
func BenchmarkLRU1Get(b *testing.B) {
cache := NewLRUCache(num)
for i := 0; i < num; i++ {
cache.Put(strconv.Itoa(i))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Get(strconv.Itoa(i))
}
}
func BenchmarkLRU2Put(b *testing.B) {
cache := NewLRUCache2(num)
for i := 0; i < b.N; i++ {
cache.Put(strconv.Itoa(i))
}
}
func BenchmarkLRU2Get(b *testing.B) {
cache := NewLRUCache2(num)
for i := 0; i < num; i++ {
cache.Put(strconv.Itoa(i))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Get(strconv.Itoa(i))
}
}
func BenchmarkLRU4Put(b *testing.B) {
cache := NewLRUCache4(num)
for i := 0; i < b.N; i++ {
cache.Put([]byte(strconv.Itoa(i)))
}
}
func BenchmarkLRU4Get(b *testing.B) {
cache := NewLRUCache4(num)
for i := 0; i < num; i++ {
cache.Put([]byte(strconv.Itoa(i)))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Get([]byte(strconv.Itoa(i)))
}
}
func TestByteSet(t *testing.T) {
set := NewByteSet(30)
set.Put("fooa")
set.PutBytes([]byte("foob"))
for _, tc := range []struct {
desc string
input string
exp bool
}{
{
desc: "test string put",
input: "fooa",
exp: true,
},
{
desc: "test byte put",
input: "foob",
exp: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, set.Get(tc.input))
})
}
}
func TestByteKeyLRUCache(t *testing.T) {
set := NewByteKeyLRUCache(30)
set.Put(NewFourByteKeyFromSlice([]byte("fooa")))
//set.PutBytes([]byte("foob"))
for _, tc := range []struct {
desc string
input string
exp bool
}{
{
desc: "test valid",
input: "fooa",
exp: true,
},
{
desc: "test not valid",
input: "foob",
exp: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, set.Get(NewFourByteKeyFromSlice([]byte(tc.input))))
})
}
}
func TestLRUCache5(t *testing.T) {
set := NewLRUCache5(30)
set.Put("fooa")
for _, tc := range []struct {
desc string
input string
exp bool
}{
{
desc: "test valid",
input: "fooa",
exp: true,
},
{
desc: "test not valid",
input: "foob",
exp: false,
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, set.Get(tc.input))
})
}
}
func BenchmarkLRU5Put(b *testing.B) {
cache := NewLRUCache5(num)
for i := 0; i < b.N; i++ {
cache.Put(strconv.Itoa(i))
}
}
func BenchmarkLRU5Get(b *testing.B) {
cache := NewLRUCache5(num)
for i := 0; i < num; i++ {
cache.Put(strconv.Itoa(i))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Get(strconv.Itoa(i))
}
}
func BenchmarkByteKeyLRUCacheSet(b *testing.B) {
buf := make([]byte, 26)
cache := NewByteKeyLRUCache(num)
for i := 0; i < b.N; i++ {
binary.LittleEndian.PutUint64(buf, uint64(i))
cache.Put(NewThirtyOneByteKeyFromSlice(buf))
}
}
func BenchmarkByteKeyLRUCacheGet(b *testing.B) {
buf := make([]byte, 26)
cache := NewByteKeyLRUCache(num)
for i := 0; i < b.N; i++ {
binary.LittleEndian.PutUint64(buf, uint64(i))
cache.Put(NewThirtyOneByteKeyFromSlice(buf))
//cache.Put(NewTwentySixByteKeyFromSlice([]byte(strconv.Itoa(i))))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
binary.LittleEndian.PutUint64(buf, uint64(i))
cache.Get(NewThirtyOneByteKeyFromSlice(buf))
//cache.Get(NewTwentySixByteKeyFromSlice([]byte(strconv.Itoa(i))))
}
}
func BenchmarkByteSetPut(b *testing.B) {
cache := NewByteSet(num)
for i := 0; i < b.N; i++ {
cache.Put(strconv.Itoa(i))
}
}
func BenchmarkByteSetGet(b *testing.B) {
cache := NewByteSet(num)
for i := 0; i < b.N; i++ {
cache.Put(strconv.Itoa(i))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
cache.Get(strconv.Itoa(i))
}
}

tools/tsdb/bloom-tester/metrics.go
@ -1,6 +1,7 @@
package main
import (
bt "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -8,12 +9,12 @@ import (
type Experiment struct {
name string
tokenizer Tokenizer
tokenizer bt.Tokenizer
bloom func() *filter.ScalableBloomFilter
encodeChunkID bool
}
func NewExperiment(name string, tokenizer Tokenizer, encodeChunkID bool, bloom func() *filter.ScalableBloomFilter) Experiment {
func NewExperiment(name string, tokenizer bt.Tokenizer, encodeChunkID bool, bloom func() *filter.ScalableBloomFilter) Experiment {
return Experiment{
name: name,
tokenizer: tokenizer,

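Since `Experiment` now takes the shared `bt.Tokenizer` interface, experiments are declared exactly as before but with the library's tokenizers; a representative construction, mirroring the list in lib.go:

```go
// Illustrative: declaring an experiment against the shared tokenizer interface.
exp := NewExperiment(
	"token=3skip0_error=1%_indexchunks=true",
	bt.NewNGramTokenizer(3, 4, 0), // 3-grams, no skip (the package-level `three`)
	true,                          // prefix every token with the chunk ID
	onePctError,                   // SBF with a 1% target false-positive rate
)
_ = exp
```
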
tools/tsdb/bloom-tester/readlib.go
@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
bt "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
@ -198,15 +199,15 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
objectClient)
for gotIdx := range got { // for every chunk
for _, queryExperiment := range queryExperiments { // for each search string
if len(queryExperiment.searchString) >= experiment.tokenizer.getMin()+experiment.tokenizer.getSkip() {
if len(queryExperiment.searchString) >= experiment.tokenizer.GetMin()+experiment.tokenizer.GetSkip() {
foundInChunk := false
foundInSbf := false
chunkTokenizer := ChunkIDTokenizerHalfInit(experiment.tokenizer)
chunkTokenizer := bt.ChunkIDTokenizer(experiment.tokenizer)
chunkTokenizer.reinit(got[gotIdx].ChunkRef)
var tokenizer Tokenizer = chunkTokenizer
chunkTokenizer.Reinit(got[gotIdx].ChunkRef)
var tokenizer bt.Tokenizer = chunkTokenizer
if !experiment.encodeChunkID {
tokenizer = experiment.tokenizer
}
@ -309,10 +310,10 @@ func readSBFFromObjectStorage(location, prefix, period, tenant, series string, o
return sbf
}
func searchSbf(sbf *filter.ScalableBloomFilter, tokenizer Tokenizer, searchString string) bool {
for i := 0; i <= tokenizer.getSkip(); i++ {
func searchSbf(sbf *filter.ScalableBloomFilter, tokenizer bt.Tokenizer, searchString string) bool {
for i := 0; i <= tokenizer.GetSkip(); i++ {
numMatches := 0
if (len(searchString) - i) >= tokenizer.getMin() {
if (len(searchString) - i) >= tokenizer.GetMin() {
tokens := tokenizer.Tokens(searchString[i:])
for _, token := range tokens {

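On the read path the same tokenizer is applied to the search string, and each resulting token is tested against the SBF via `searchSbf`; a simplified sketch of how it is driven by the code above:

```go
// Simplified sketch of the per-chunk, per-query check in analyzeRead().
chunkTokenizer := bt.ChunkIDTokenizer(experiment.tokenizer)
chunkTokenizer.Reinit(got[gotIdx].ChunkRef)
var tokenizer bt.Tokenizer = chunkTokenizer
if !experiment.encodeChunkID {
	tokenizer = experiment.tokenizer
}
if searchSbf(sbf, tokenizer, queryExperiment.searchString) {
	// the bloom reports a possible match; fall through to scanning the chunk itself
}
```
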
tools/tsdb/bloom-tester/tokenizer.go (deleted)
@ -1,255 +0,0 @@
package main
import (
"encoding/binary"
"unicode/utf8"
"github.com/grafana/loki/pkg/logproto"
)
type Token struct {
Key []byte
Value string
}
type Tokenizer interface {
Tokens(line string) []Token
getSkip() int
getMin() int
getMax() int
}
/*
type logfmtTokenizer struct {
parser *log.LogfmtParser
lbls *log.LabelsBuilder
}
func (t *logfmtTokenizer) Tokens(line string) []Token {
t.lbls.Reset()
t.parser.Process(0, []byte(line), t.lbls)
ls := t.lbls.LabelsResult().Labels()
res := make([]Token, 0, len(ls))
for _, l := range ls {
res = append(res, Token{Key: l.Name, Value: l.Value})
}
return res
}
func newLogfmtTokenizer() *logfmtTokenizer {
return &logfmtTokenizer{
// non strict, allow empty values
parser: log.NewLogfmtParser(false, true),
lbls: log.NewBaseLabelsBuilder().ForLabels(nil, 0),
}
}
*/
type ngramTokenizer struct {
// [min,max) exclusivity
min, max, skip int
buffers [][]rune // circular buffers used for ngram generation
runeBuffer []byte // buffer used for token generation
tokenBuffer []Token // buffer used for holding tokens that is returned
internalTokenBuffer []Token // circular buffer for tokens
}
func newNGramTokenizer(min, max, skip int) *ngramTokenizer {
capacity := max - min
t := &ngramTokenizer{
min: min,
max: max,
skip: skip,
buffers: make([][]rune, capacity),
}
for i := t.min; i < t.max; i++ {
t.buffers[i-t.min] = make([]rune, i)
}
t.runeBuffer = make([]byte, 0, max*4)
t.tokenBuffer = make([]Token, 0, 1024)
t.internalTokenBuffer = make([]Token, 0, 1024)
for i := 0; i < cap(t.internalTokenBuffer); i++ {
tok := Token{}
tok.Key = make([]byte, 0, 132)
t.internalTokenBuffer = append(t.internalTokenBuffer, tok)
}
return t
}
func (t *ngramTokenizer) getSkip() int {
return t.skip
}
func (t *ngramTokenizer) getMin() int {
return t.min
}
func (t *ngramTokenizer) getMax() int {
return t.max
}
func (t *ngramTokenizer) Tokens(line string) []Token {
t.tokenBuffer = t.tokenBuffer[:0] // Reset the result slice
var i int // rune index (not position that is measured in the range loop)
numToks := 0
for _, r := range line {
// j is the index of the buffer to use
for j := 0; j < (t.max - t.min); j++ {
// n is the length of the ngram
n := j + t.min
// pos is the position in the buffer to overwrite
pos := i % n
t.buffers[j][pos] = r
if i >= n-1 && (i+1-n)%(t.skip+1) == 0 {
t.runeBuffer = reassemble(t.buffers[j], (i+1)%n, t.runeBuffer)
//fmt.Println(numToks, cap(t.internalTokenBuffer), len(t.internalTokenBuffer))
if numToks >= cap(t.internalTokenBuffer) || numToks == len(t.internalTokenBuffer) {
tok := Token{}
tok.Key = make([]byte, 0, 132)
t.internalTokenBuffer = append(t.internalTokenBuffer, tok)
}
//fmt.Println(numToks, cap(t.internalTokenBuffer), len(t.internalTokenBuffer))
t.internalTokenBuffer[numToks].Key = t.internalTokenBuffer[numToks].Key[:0]
t.internalTokenBuffer[numToks].Key = append(t.internalTokenBuffer[numToks].Key, t.runeBuffer...)
t.internalTokenBuffer[numToks].Value = string(t.internalTokenBuffer[numToks].Key)
numToks++
}
}
i++
}
t.tokenBuffer = append(t.tokenBuffer, t.internalTokenBuffer[:numToks]...)
return t.tokenBuffer
}
func (t *ngramTokenizer) OldTokens(line string) []Token {
t.tokenBuffer = t.tokenBuffer[:0] // Reset the result slice
var i int // rune index (not position that is measured in the range loop)
for _, r := range line {
// j is the index of the buffer to use
for j := 0; j < (t.max - t.min); j++ {
// n is the length of the ngram
n := j + t.min
// pos is the position in the buffer to overwrite
pos := i % n
t.buffers[j][pos] = r
if i >= n-1 && (i+1-n)%(t.skip+1) == 0 {
t.runeBuffer = reassemble(t.buffers[j], (i+1)%n, t.runeBuffer)
b := Token{}
b.Key = make([]byte, 0, 132) // TODO: Yeah, that's too big but I didn't fee like doing the math at the end of the day
b.Key = append(b.Key, t.runeBuffer...)
b.Value = string(b.Key)
t.tokenBuffer = append(t.tokenBuffer, b)
}
}
i++
}
return t.tokenBuffer
}
func reassemble(buf []rune, pos int, result []byte) []byte {
result = result[:0] // Reset the result slice
for i := 0; i < len(buf); i++ {
cur := (pos + i) % len(buf)
result = utf8.AppendRune(result, buf[cur])
}
return result
}
type WrappedTokenizer struct {
t Tokenizer
f func(Token) Token
tokenBuffer []Token
prefix []byte
i64buf []byte
i32buf []byte
}
func (w *WrappedTokenizer) Tokens(line string) []Token {
w.tokenBuffer = w.tokenBuffer[:0] // Reset the result slice
toks := w.t.Tokens(line)
for _, tok := range toks {
w.tokenBuffer = append(w.tokenBuffer, w.f(tok))
}
return append(w.tokenBuffer, toks...)
}
func (w *WrappedTokenizer) getSkip() int {
return w.t.getSkip()
}
func (w *WrappedTokenizer) getMin() int {
return w.t.getMin()
}
func (w *WrappedTokenizer) getMax() int {
return w.t.getMax()
}
func ChunkIDTokenizer(chk logproto.ChunkRef, t Tokenizer) *WrappedTokenizer {
//prefix := fmt.Sprintf("%d:%d:%d:", chk.From, chk.Through, chk.Checksum)
p := make([]byte, 0, 256)
i64buf := make([]byte, binary.MaxVarintLen64)
i32buf := make([]byte, 4)
binary.PutVarint(i64buf, int64(chk.From))
p = append(p, i64buf...)
p = append(p, 58)
binary.PutVarint(i64buf, int64(chk.Through))
p = append(p, i64buf...)
p = append(p, 58)
binary.LittleEndian.PutUint32(i32buf, chk.Checksum)
p = append(p, i32buf...)
p = append(p, 58)
return &WrappedTokenizer{
t: t,
f: func(tok Token) Token {
tok.Key = append(append(tok.Key, p...), tok.Key...)[len(tok.Key):]
tok.Value = string(tok.Key)
return tok
},
tokenBuffer: make([]Token, 0, 1024),
prefix: p,
i64buf: i64buf,
i32buf: i32buf,
}
}
func ChunkIDTokenizerHalfInit(t Tokenizer) *WrappedTokenizer {
p := make([]byte, 0, 256)
return &WrappedTokenizer{
t: t,
tokenBuffer: make([]Token, 0, 1024),
prefix: p,
i64buf: make([]byte, binary.MaxVarintLen64),
i32buf: make([]byte, 4),
}
}
func (w *WrappedTokenizer) reinit(chk logproto.ChunkRef) {
//prefix := fmt.Sprintf("%d:%d:%d:", chk.From, chk.Through, chk.Checksum)
w.prefix = w.prefix[:0]
//w.prefix = fmt.Appendf(w.prefix, "%d:%d:%d:", chk.From, chk.Through, chk.Checksum)
binary.PutVarint(w.i64buf, int64(chk.From))
w.prefix = append(w.prefix, w.i64buf...)
w.prefix = append(w.prefix, 58)
binary.PutVarint(w.i64buf, int64(chk.Through))
w.prefix = append(w.prefix, w.i64buf...)
w.prefix = append(w.prefix, 58)
binary.LittleEndian.PutUint32(w.i32buf, chk.Checksum)
w.prefix = append(w.prefix, w.i32buf...)
w.prefix = append(w.prefix, 58)
w.f = func(tok Token) Token {
tok.Key = append(append(tok.Key, w.prefix...), tok.Key...)[len(tok.Key):]
tok.Value = string(tok.Key)
return tok
}
}