Tokenizer tests and TokenizeLine updates (#11133)

**What this PR does / why we need it**:
This PR adds tests for each major function of the Bloom Tokenizer. It also includes some cleanup: constants now set common parameters such as the cache size, n-gram length, and n-gram skip.

Lastly, the TokenizeLine() call was updated to correctly tokenize a line when a
"skip tokenizer" is used: the new SearchesForTokenizerAndLine() helper returns a
token set for each starting offset into the line that the skip makes relevant.

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)
  1. pkg/storage/bloom/v1/bloom_tokenizer.go (37 lines changed)
  2. pkg/storage/bloom/v1/bloom_tokenizer_test.go (148 lines changed)
  3. pkg/storage/bloom/v1/tokenizer.go (8 lines changed)
  4. tools/tsdb/bloom-tester/readlib.go (35 lines changed)
  5. tools/tsdb/bloom-tester/readlib_test.go (17 lines changed)

@@ -33,6 +33,8 @@ type BloomTokenizer struct {
}
const CacheSize = 150000
const DefaultNGramLength = 4
const DefaultNGramSkip = 0
// NewBloomTokenizer returns a new instance of the Bloom Tokenizer.
// Warning: the tokens returned use the same byte slice to reduce allocations. This has two consequences:
@@ -44,7 +46,7 @@ func NewBloomTokenizer(reg prometheus.Registerer) (*BloomTokenizer, error) {
metrics: newMetrics(reg),
}
t.cache = make(map[string]interface{}, CacheSize)
t.lineTokenizer = NewNGramTokenizer(4, 5, 0) // default to 4-grams, no skip
t.lineTokenizer = NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, DefaultNGramSkip) // default to 4-grams, no skip
t.chunkIDTokenizer = ChunkIDTokenizer(t.lineTokenizer)
level.Info(util_log.Logger).Log("bloom tokenizer created")
@@ -68,6 +70,7 @@ func clearCache(cache map[string]interface{}) {
}
}
// PopulateSeriesWithBloom is intended to be called on the write path, and is used to populate the bloom filter for a given series.
func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) {
clearCache(bt.cache)
for idx := range chunks {
@@ -101,7 +104,7 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo
seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok.Key)
if len(bt.cache) > 150000 { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
if len(bt.cache) >= CacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}
@@ -116,6 +119,32 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo
} // for each chunk
}
func (bt *BloomTokenizer) TokenizeLine(line string) []Token {
return bt.lineTokenizer.Tokens(line)
// SearchesForTokenizerAndLine takes a search string (e.g. on the read/query path) and a tokenizer, and returns
// all the possible tokens for that line.
// The result is a two-dimensional slice: the first dimension is the starting offset into the line, and the
// second is the tokens for that offset. If an offset yields no tokens, the first dimension
// will have fewer than 1 + skip entries.
// The offsets only matter when the tokenizer has a non-zero skip value.
func SearchesForTokenizerAndLine(t Tokenizer, line string) (res [][]Token) {
res = make([][]Token, 0, 10)
for i := range line { // iterate by runes
if i >= t.GetSkip()+1 {
break
}
tmpTokens := make([]Token, 0, 100)
tokens := t.Tokens(line[i:])
// The tokenizer reuses its internal buffers, so we need to copy the token
// data in order to keep it.
for _, token := range tokens {
tmpToken := Token{}
tmpToken.Key = make([]byte, len(token.Key))
copy(tmpToken.Key, token.Key)
tmpTokens = append(tmpTokens, tmpToken)
}
if len(tokens) > 0 {
res = append(res, tmpTokens)
}
}
return res
}
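
For illustration only, not part of the diff: with the default tokenizer (4-grams, skip 0) every token comes from offset 0, so the result contains a single inner slice. This mirrors the "four chars" case in the new tests below:

```go
// Illustrative sketch, assuming the default 4-gram, skip-0 tokenizer.
tokenSets := SearchesForTokenizerAndLine(NewNGramTokenizer(4, 5, 0), "abcd")
// tokenSets == [][]Token{{{Key: []byte("abcd")}}} : one offset, one 4-gram token.
```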

@@ -2,11 +2,159 @@ package v1
import (
"fmt"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/pkg/storage/chunk"
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
"github.com/prometheus/client_golang/prometheus"
)
func TestSetLineTokenizer(t *testing.T) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
// Validate defaults
require.Equal(t, bt.lineTokenizer.GetMin(), DefaultNGramLength)
require.Equal(t, bt.lineTokenizer.GetMax(), DefaultNGramLength+1)
require.Equal(t, bt.lineTokenizer.GetSkip(), DefaultNGramSkip)
require.Equal(t, bt.chunkIDTokenizer.GetMin(), DefaultNGramLength)
require.Equal(t, bt.chunkIDTokenizer.GetMax(), DefaultNGramLength+1)
require.Equal(t, bt.chunkIDTokenizer.GetSkip(), DefaultNGramSkip)
// Set new tokenizer, and validate against that
bt.SetLineTokenizer(NewNGramTokenizer(6, 7, 2))
require.Equal(t, bt.lineTokenizer.GetMin(), 6)
require.Equal(t, bt.lineTokenizer.GetMax(), 7)
require.Equal(t, bt.lineTokenizer.GetSkip(), 2)
require.Equal(t, bt.chunkIDTokenizer.GetMin(), 6)
require.Equal(t, bt.chunkIDTokenizer.GetMax(), 7)
require.Equal(t, bt.chunkIDTokenizer.GetSkip(), 2)
}
func TestSearchesForTokenizerAndLine(t *testing.T) {
for _, tc := range []struct {
desc string
input string
t Tokenizer
exp [][]Token
}{
{
desc: "empty",
input: "",
t: four,
exp: [][]Token{},
},
{
desc: "single char",
input: "a",
t: four,
exp: [][]Token{},
},
{
desc: "four chars",
input: "abcd",
t: four,
exp: [][]Token{
{{Key: []byte("abcd")}}},
},
{
desc: "uuid partial",
input: "2b1a5e46-36a2-4",
t: four,
exp: [][]Token{{
{Key: []byte("2b1a")},
{Key: []byte("b1a5")},
{Key: []byte("1a5e")},
{Key: []byte("a5e4")},
{Key: []byte("5e46")},
{Key: []byte("e46-")},
{Key: []byte("46-3")},
{Key: []byte("6-36")},
{Key: []byte("-36a")},
{Key: []byte("36a2")},
{Key: []byte("6a2-")},
{Key: []byte("a2-4")}},
},
},
{
desc: "short special chars",
t: four,
input: "日本語",
exp: [][]Token{},
},
{
desc: "longer special chars",
t: four,
input: "日本語日本語",
exp: [][]Token{{
{Key: []byte("日本語日")},
{Key: []byte("本語日本")},
{Key: []byte("語日本語")}}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, SearchesForTokenizerAndLine(tc.t, tc.input))
})
}
}
func TestPopulateSeriesWithBloom(t *testing.T) {
var testLine = "this is a log line"
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
var lbsList []labels.Labels
lbsList = append(lbsList, labels.FromStrings("foo", "bar"))
var fpList []model.Fingerprint
for i := range lbsList {
fpList = append(fpList, model.Fingerprint(lbsList[i].Hash()))
}
var memChunks = make([]*chunkenc.MemChunk, 0)
memChunk0 := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_ = memChunk0.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: testLine,
})
memChunks = append(memChunks, memChunk0)
var chunks = make([]chunk.Chunk, 0)
for i := range memChunks {
chunks = append(chunks, chunk.NewChunk("user", fpList[i], lbsList[i], chunkenc.NewFacade(memChunks[i], 256000, 1500000), model.TimeFromUnixNano(0), model.TimeFromUnixNano(1)))
}
bloom := Bloom{
ScalableBloomFilter: *sbf,
}
series := Series{
Fingerprint: model.Fingerprint(lbsList[0].Hash()),
}
swb := SeriesWithBloom{
Bloom: &bloom,
Series: &series,
}
bt.PopulateSeriesWithBloom(&swb, chunks)
tokens := SearchesForTokenizerAndLine(four, testLine)
for _, token := range tokens[0] {
require.True(t, swb.Bloom.Test(token.Key))
}
}
func BenchmarkMapClear(b *testing.B) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
for i := 0; i < b.N; i++ {

@@ -150,8 +150,16 @@ func ChunkIDTokenizer(t Tokenizer) *WrappedTokenizer {
}
}
func zeroBuffer(buf []byte) {
for i := range buf {
buf[i] = 0
}
}
func (w *WrappedTokenizer) Reinit(chk logproto.ChunkRef) {
w.prefix = w.prefix[:0]
zeroBuffer(w.i64buf)
zeroBuffer(w.i32buf)
binary.PutVarint(w.i64buf, int64(chk.From))
w.prefix = append(w.prefix, w.i64buf...)

@@ -126,6 +126,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
//searchString := os.Getenv("SEARCH_STRING")
//147854,148226,145541,145603,147159,147836,145551,145599,147393,147841,145265,145620,146181,147225,147167,146131,146189,146739,147510,145572,146710,148031,29,146205,147175,146984,147345
//mytenants := []string{"29"}
bloomTokenizer, _ := bt.NewBloomTokenizer(prometheus.DefaultRegisterer)
for _, tenant := range tenants {
level.Info(util_log.Logger).Log("Analyzing tenant", tenant, "table", tableName)
err := shipper.ForEach(
@@ -199,6 +200,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
tenant,
ls.String(),
objectClient)
bloomTokenizer.SetLineTokenizer(experiment.tokenizer)
for gotIdx := range got { // for every chunk
for _, queryExperiment := range queryExperiments { // for each search string
if len(queryExperiment.searchString) >= experiment.tokenizer.GetMin()+experiment.tokenizer.GetSkip() {
@@ -206,15 +208,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
foundInChunk := false
foundInSbf := false
chunkTokenizer := bt.ChunkIDTokenizer(experiment.tokenizer)
chunkTokenizer.Reinit(got[gotIdx].ChunkRef)
var tokenizer bt.Tokenizer = chunkTokenizer
if !experiment.encodeChunkID {
tokenizer = experiment.tokenizer
}
foundInSbf = searchSbf(sbf, tokenizer, queryExperiment.searchString)
foundInSbf = searchSbf(sbf, experiment.tokenizer, queryExperiment.searchString)
lc := got[gotIdx].Data.(*chunkenc.Facade).LokiChunk()
@@ -313,22 +307,21 @@ func readSBFFromObjectStorage(location, prefix, period, tenant, series string, o
}
func searchSbf(sbf *filter.ScalableBloomFilter, tokenizer bt.Tokenizer, searchString string) bool {
for i := 0; i <= tokenizer.GetSkip(); i++ {
tokens := bt.SearchesForTokenizerAndLine(tokenizer, searchString)
for _, tokenSet := range tokens {
numMatches := 0
if (len(searchString) - i) >= tokenizer.GetMin() {
tokens := tokenizer.Tokens(searchString[i:])
for _, token := range tokens {
if sbf.Test(token.Key) {
numMatches++
}
for _, token := range tokenSet {
if sbf.Test(token.Key) {
numMatches++
}
if numMatches > 0 {
if numMatches == len(tokens) {
return true
}
}
if numMatches > 0 {
if numMatches == len(tokenSet) {
return true
}
}
}
return false
}

@@ -1,19 +1,16 @@
package main
import (
bt "github.com/grafana/loki/pkg/storage/bloom/v1"
"testing"
"github.com/stretchr/testify/require"
)
func TestSearchSbf(t *testing.T) {
tokenizer := four
searchString := "trace"
experiment := NewExperiment(
"token=4skip0_error=1%_indexchunks=true",
tokenizer,
four,
true,
onePctError,
)
@@ -69,11 +66,13 @@ func TestSearchSbf(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
sbf := experiment.bloom()
tokens := tokenizer.Tokens(tc.inputLine)
for _, token := range tokens {
sbf.Add(token.Key)
tokens := bt.SearchesForTokenizerAndLine(four, tc.inputLine)
for _, tokenSet := range tokens {
for _, token := range tokenSet {
sbf.Add(token.Key)
}
}
require.Equal(t, tc.exp, searchSbf(sbf, tokenizer, searchString))
require.Equal(t, tc.exp, searchSbf(sbf, four, tc.inputSearch))
})
}
}
