From c4ed0d0b7fcc0701a0c64dccc3af4fe412eadf4d Mon Sep 17 00:00:00 2001 From: Paul Rogers <129207811+paul1r@users.noreply.github.com> Date: Wed, 8 Nov 2023 17:03:37 -0500 Subject: [PATCH] Tokenizer tests and TokenizeLine updates (#11133) **What this PR does / why we need it**: The thrust of this PR is to ensure we have tests for each major function of the Bloom Tokenizer. In addition, there was some cleanup, in that constants are used to set some common parameters. Lastly, the TokenizeLine() call was updated to correctly tokenize a line when a "skip tokenizer" is utilized. **Which issue(s) this PR fixes**: Fixes # **Special notes for your reviewer**: **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --- pkg/storage/bloom/v1/bloom_tokenizer.go | 37 ++++- pkg/storage/bloom/v1/bloom_tokenizer_test.go | 148 +++++++++++++++++++ pkg/storage/bloom/v1/tokenizer.go | 8 + tools/tsdb/bloom-tester/readlib.go | 35 ++--- tools/tsdb/bloom-tester/readlib_test.go | 17 +-- 5 files changed, 211 insertions(+), 34 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index 7e194405be..26ebd63006 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -33,6 +33,8 @@ type BloomTokenizer struct { } const CacheSize = 150000 +const DefaultNGramLength = 4 +const DefaultNGramSkip = 0 // NewBloomTokenizer returns a new instance of the Bloom Tokenizer. // Warning: the tokens returned use the same byte slice to reduce allocations. This has two consequences: @@ -44,7 +46,7 @@ func NewBloomTokenizer(reg prometheus.Registerer) (*BloomTokenizer, error) { metrics: newMetrics(reg), } t.cache = make(map[string]interface{}, CacheSize) - t.lineTokenizer = NewNGramTokenizer(4, 5, 0) // default to 4-grams, no skip + t.lineTokenizer = NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, DefaultNGramSkip) // default to 4-grams, no skip t.chunkIDTokenizer = ChunkIDTokenizer(t.lineTokenizer) level.Info(util_log.Logger).Log("bloom tokenizer created") @@ -68,6 +70,7 @@ func clearCache(cache map[string]interface{}) { } } +// PopulateSeriesWithBloom is intended to be called on the write path, and is used to populate the bloom filter for a given series. func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) { clearCache(bt.cache) for idx := range chunks { @@ -101,7 +104,7 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok.Key) - if len(bt.cache) > 150000 { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other + if len(bt.cache) >= CacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other clearCache(bt.cache) } } @@ -116,6 +119,32 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo } // for each chunk } -func (bt *BloomTokenizer) TokenizeLine(line string) []Token { - return bt.lineTokenizer.Tokens(line) +// SearchesForTokenizerAndLine is for taking a given search string (ex: on the read/query path) and returning +// all the possible tokens, given a tokenizer. +// This is a multi-dimensional slice where the first slice is the offset into the line, and the +// second slice is the tokens for that offset. If an offset into the line returns no tokens, this first dimension +// will be less than 1 + the number of skips specified in the tokenizer +// The offset is used if the Tokenizer has a skip value being utilized. +func SearchesForTokenizerAndLine(t Tokenizer, line string) (res [][]Token) { + res = make([][]Token, 0, 10) + for i := range line { // iterate by runes + if i >= t.GetSkip()+1 { + break + } + tmpTokens := make([]Token, 0, 100) + tokens := t.Tokens(line[i:]) + // As the way the tokenizer is coded, it will reuse its internal buffers, + // but we need to save the data, hence the need for copying + for _, token := range tokens { + tmpToken := Token{} + tmpToken.Key = make([]byte, len(token.Key)) + copy(tmpToken.Key, token.Key) + tmpTokens = append(tmpTokens, tmpToken) + } + if len(tokens) > 0 { + res = append(res, tmpTokens) + } + } + + return res } diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index eaff6c7837..034301f88c 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -2,11 +2,159 @@ package v1 import ( "fmt" + "time" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/push" + "github.com/grafana/loki/pkg/storage/chunk" + "testing" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/bloom/v1/filter" + "github.com/prometheus/client_golang/prometheus" ) +func TestSetLineTokenizer(t *testing.T) { + bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer) + + // Validate defaults + require.Equal(t, bt.lineTokenizer.GetMin(), DefaultNGramLength) + require.Equal(t, bt.lineTokenizer.GetMax(), DefaultNGramLength+1) + require.Equal(t, bt.lineTokenizer.GetSkip(), DefaultNGramSkip) + + require.Equal(t, bt.chunkIDTokenizer.GetMin(), DefaultNGramLength) + require.Equal(t, bt.chunkIDTokenizer.GetMax(), DefaultNGramLength+1) + require.Equal(t, bt.chunkIDTokenizer.GetSkip(), DefaultNGramSkip) + + // Set new tokenizer, and validate against that + bt.SetLineTokenizer(NewNGramTokenizer(6, 7, 2)) + require.Equal(t, bt.lineTokenizer.GetMin(), 6) + require.Equal(t, bt.lineTokenizer.GetMax(), 7) + require.Equal(t, bt.lineTokenizer.GetSkip(), 2) + + require.Equal(t, bt.chunkIDTokenizer.GetMin(), 6) + require.Equal(t, bt.chunkIDTokenizer.GetMax(), 7) + require.Equal(t, bt.chunkIDTokenizer.GetSkip(), 2) +} + +func TestSearchesForTokenizerAndLine(t *testing.T) { + for _, tc := range []struct { + desc string + input string + t Tokenizer + exp [][]Token + }{ + { + desc: "empty", + input: "", + t: four, + exp: [][]Token{}, + }, + { + desc: "single char", + input: "a", + t: four, + exp: [][]Token{}, + }, + { + desc: "four chars", + input: "abcd", + t: four, + exp: [][]Token{ + {{Key: []byte("abcd")}}}, + }, + { + desc: "uuid partial", + input: "2b1a5e46-36a2-4", + t: four, + exp: [][]Token{{ + {Key: []byte("2b1a")}, + {Key: []byte("b1a5")}, + {Key: []byte("1a5e")}, + {Key: []byte("a5e4")}, + {Key: []byte("5e46")}, + {Key: []byte("e46-")}, + {Key: []byte("46-3")}, + {Key: []byte("6-36")}, + {Key: []byte("-36a")}, + {Key: []byte("36a2")}, + {Key: []byte("6a2-")}, + {Key: []byte("a2-4")}}, + }, + }, + { + desc: "short special chars", + t: four, + input: "日本語", + exp: [][]Token{}, + }, + { + desc: "longer special chars", + t: four, + input: "日本語日本語", + exp: [][]Token{{ + {Key: []byte("日本語日")}, + {Key: []byte("本語日本")}, + {Key: []byte("語日本語")}}}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.exp, SearchesForTokenizerAndLine(tc.t, tc.input)) + }) + } + +} + +func TestPopulateSeriesWithBloom(t *testing.T) { + var testLine = "this is a log line" + bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer) + + sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) + var lbsList []labels.Labels + lbsList = append(lbsList, labels.FromStrings("foo", "bar")) + + var fpList []model.Fingerprint + for i := range lbsList { + fpList = append(fpList, model.Fingerprint(lbsList[i].Hash())) + } + + var memChunks = make([]*chunkenc.MemChunk, 0) + memChunk0 := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + _ = memChunk0.Append(&push.Entry{ + Timestamp: time.Unix(0, 1), + Line: testLine, + }) + memChunks = append(memChunks, memChunk0) + + var chunks = make([]chunk.Chunk, 0) + for i := range memChunks { + chunks = append(chunks, chunk.NewChunk("user", fpList[i], lbsList[i], chunkenc.NewFacade(memChunks[i], 256000, 1500000), model.TimeFromUnixNano(0), model.TimeFromUnixNano(1))) + } + + bloom := Bloom{ + ScalableBloomFilter: *sbf, + } + series := Series{ + Fingerprint: model.Fingerprint(lbsList[0].Hash()), + } + swb := SeriesWithBloom{ + Bloom: &bloom, + Series: &series, + } + + bt.PopulateSeriesWithBloom(&swb, chunks) + tokens := SearchesForTokenizerAndLine(four, testLine) + for _, token := range tokens[0] { + require.True(t, swb.Bloom.Test(token.Key)) + } +} + func BenchmarkMapClear(b *testing.B) { bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer) for i := 0; i < b.N; i++ { diff --git a/pkg/storage/bloom/v1/tokenizer.go b/pkg/storage/bloom/v1/tokenizer.go index 22da439f07..96e51f2cd0 100644 --- a/pkg/storage/bloom/v1/tokenizer.go +++ b/pkg/storage/bloom/v1/tokenizer.go @@ -150,8 +150,16 @@ func ChunkIDTokenizer(t Tokenizer) *WrappedTokenizer { } } +func zeroBuffer(buf []byte) { + for i := range buf { + buf[i] = 0 + } +} + func (w *WrappedTokenizer) Reinit(chk logproto.ChunkRef) { w.prefix = w.prefix[:0] + zeroBuffer(w.i64buf) + zeroBuffer(w.i32buf) binary.PutVarint(w.i64buf, int64(chk.From)) w.prefix = append(w.prefix, w.i64buf...) diff --git a/tools/tsdb/bloom-tester/readlib.go b/tools/tsdb/bloom-tester/readlib.go index ee0456a6e3..eaca7a38c1 100644 --- a/tools/tsdb/bloom-tester/readlib.go +++ b/tools/tsdb/bloom-tester/readlib.go @@ -126,6 +126,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh //searchString := os.Getenv("SEARCH_STRING") //147854,148226,145541,145603,147159,147836,145551,145599,147393,147841,145265,145620,146181,147225,147167,146131,146189,146739,147510,145572,146710,148031,29,146205,147175,146984,147345 //mytenants := []string{"29"} + bloomTokenizer, _ := bt.NewBloomTokenizer(prometheus.DefaultRegisterer) for _, tenant := range tenants { level.Info(util_log.Logger).Log("Analyzing tenant", tenant, "table", tableName) err := shipper.ForEach( @@ -199,6 +200,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh tenant, ls.String(), objectClient) + bloomTokenizer.SetLineTokenizer(experiment.tokenizer) for gotIdx := range got { // for every chunk for _, queryExperiment := range queryExperiments { // for each search string if len(queryExperiment.searchString) >= experiment.tokenizer.GetMin()+experiment.tokenizer.GetSkip() { @@ -206,15 +208,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh foundInChunk := false foundInSbf := false - chunkTokenizer := bt.ChunkIDTokenizer(experiment.tokenizer) - - chunkTokenizer.Reinit(got[gotIdx].ChunkRef) - var tokenizer bt.Tokenizer = chunkTokenizer - if !experiment.encodeChunkID { - tokenizer = experiment.tokenizer - } - - foundInSbf = searchSbf(sbf, tokenizer, queryExperiment.searchString) + foundInSbf = searchSbf(sbf, experiment.tokenizer, queryExperiment.searchString) lc := got[gotIdx].Data.(*chunkenc.Facade).LokiChunk() @@ -313,22 +307,21 @@ func readSBFFromObjectStorage(location, prefix, period, tenant, series string, o } func searchSbf(sbf *filter.ScalableBloomFilter, tokenizer bt.Tokenizer, searchString string) bool { - for i := 0; i <= tokenizer.GetSkip(); i++ { + tokens := bt.SearchesForTokenizerAndLine(tokenizer, searchString) + for _, tokenSet := range tokens { numMatches := 0 - if (len(searchString) - i) >= tokenizer.GetMin() { - tokens := tokenizer.Tokens(searchString[i:]) - - for _, token := range tokens { - if sbf.Test(token.Key) { - numMatches++ - } + for _, token := range tokenSet { + if sbf.Test(token.Key) { + numMatches++ } - if numMatches > 0 { - if numMatches == len(tokens) { - return true - } + } + if numMatches > 0 { + if numMatches == len(tokenSet) { + return true } } + } + return false } diff --git a/tools/tsdb/bloom-tester/readlib_test.go b/tools/tsdb/bloom-tester/readlib_test.go index ad7b7f0b73..5216918010 100644 --- a/tools/tsdb/bloom-tester/readlib_test.go +++ b/tools/tsdb/bloom-tester/readlib_test.go @@ -1,19 +1,16 @@ package main import ( + bt "github.com/grafana/loki/pkg/storage/bloom/v1" "testing" "github.com/stretchr/testify/require" ) func TestSearchSbf(t *testing.T) { - tokenizer := four - - searchString := "trace" - experiment := NewExperiment( "token=4skip0_error=1%_indexchunks=true", - tokenizer, + four, true, onePctError, ) @@ -69,11 +66,13 @@ func TestSearchSbf(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { sbf := experiment.bloom() - tokens := tokenizer.Tokens(tc.inputLine) - for _, token := range tokens { - sbf.Add(token.Key) + tokens := bt.SearchesForTokenizerAndLine(four, tc.inputLine) + for _, tokenSet := range tokens { + for _, token := range tokenSet { + sbf.Add(token.Key) + } } - require.Equal(t, tc.exp, searchSbf(sbf, tokenizer, searchString)) + require.Equal(t, tc.exp, searchSbf(sbf, four, tc.inputSearch)) }) } }