feat: Add new Drain tokenizer that splits on most punctuation (#13143)

pull/13180/head
benclive 2 years ago committed by GitHub
parent 9981e9e40d
commit 6a0fdd0880
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      pkg/logcli/output/loki.go
  2. 89
      pkg/pattern/drain/drain.go
  3. 2
      pkg/pattern/drain/drain_benchmark_test.go
  4. 580
      pkg/pattern/drain/drain_test.go
  5. 120
      pkg/pattern/drain/line_tokenizer.go
  6. 174
      pkg/pattern/drain/line_tokenizer_test.go
  7. 14
      pkg/pattern/drain/log_cluster.go
  8. 50000
      pkg/pattern/drain/testdata/grafana-ruler.txt
  9. 110
      pkg/pattern/ingester_querier_test.go

@ -0,0 +1 @@
package output

@ -25,7 +25,9 @@ package drain
import (
"math"
"strconv"
"strings"
"unicode"
"unsafe"
"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/prometheus/common/model"
@ -139,7 +141,7 @@ func DefaultConfig() *Config {
// MaxClusterDepth and SimTh, the less the chance that there will be
// "similar" clusters, but the greater the footprint.
SimTh: 0.3,
MaxChildren: 100,
MaxChildren: 15,
ParamString: `<_>`,
MaxClusters: 300,
}
@ -156,22 +158,24 @@ func New(config *Config, metrics *Metrics) *Drain {
}
d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: splittingTokenizer{}, // Default to this for now
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: newPunctuationTokenizer(),
maxAllowedLineLength: 3000,
}
return d
}
type Drain struct {
config *Config
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
config *Config
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
maxAllowedLineLength int
}
func (d *Drain) Clusters() []*LogCluster {
@ -183,10 +187,14 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts
}
func (d *Drain) Train(content string, ts int64) *LogCluster {
return d.train(d.tokenizer.Tokenize(content), d.tokenizer.Join, ts)
if len(content) > d.maxAllowedLineLength {
return nil
}
tokens, state := d.tokenizer.Tokenize(content)
return d.train(tokens, state, ts)
}
func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
if len(tokens) < 4 {
return nil
}
@ -196,11 +204,12 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
d.clustersCounter++
clusterID := d.clustersCounter
matchCluster = &LogCluster{
Tokens: tokens,
id: clusterID,
Size: 1,
Stringer: stringer,
Chunks: Chunks{},
Tokens: tokens,
TokenState: state,
id: clusterID,
Size: 1,
Stringer: d.tokenizer.Join,
Chunks: Chunks{},
}
matchCluster.append(model.TimeFromUnixNano(ts))
d.idToCluster.Set(clusterID, matchCluster)
@ -219,15 +228,16 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
}
func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens := deduplicatePlaceholders(d.tokenizer.Tokenize(content), d.config.ParamString)
tokens, state := d.tokenizer.Tokenize(content)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
d.clustersCounter++
clusterID := d.clustersCounter
matchCluster = &LogCluster{
Tokens: tokens,
id: clusterID,
Tokens: tokens,
TokenState: state,
id: clusterID,
}
d.idToCluster.Set(clusterID, matchCluster)
d.addSeqToPrefixTree(d.rootNode, matchCluster)
@ -241,24 +251,33 @@ func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample)
return matchCluster
}
func deduplicatePlaceholders(tokens []string, param string) []string {
if len(tokens) < 2 {
return tokens
func deduplicatePlaceholders(line string, placeholder string) string {
first := strings.Index(line, "<_><_>")
if first == -1 {
return line
}
i := 1
for k := 1; k < len(tokens); k++ {
if tokens[k] != param || tokens[k] != tokens[k-1] {
if i != k {
tokens[i] = tokens[k]
builder := make([]byte, 0, len(line))
low := 0
for i := first; i < len(line)-5; i++ {
if line[i:i+len(placeholder)] == placeholder {
high := i + 3
for ; high < len(line)-2; high += 3 {
if line[high:high+len(placeholder)] != placeholder {
break
}
}
i++
builder = append(builder, line[low:i+len(placeholder)]...)
low = high
i = high
}
}
return tokens[:i]
builder = append(builder, line[low:]...)
return unsafe.String(unsafe.SliceData(builder), len(builder))
}
func (d *Drain) PatternString(c *LogCluster) string {
s := d.tokenizer.Join(deduplicatePlaceholders(c.Tokens, d.config.ParamString))
s := deduplicatePlaceholders(d.tokenizer.Join(c.Tokens, c.TokenState), d.config.ParamString)
if s == d.config.ParamString {
return ""
}
@ -271,7 +290,7 @@ func (d *Drain) Delete(cluster *LogCluster) {
// Match against an already existing cluster. Match shall be perfect (sim_th=1.0). New cluster will not be created as a result of this call, nor any cluster modifications.
func (d *Drain) Match(content string) *LogCluster {
contentTokens := d.tokenizer.Tokenize(content)
contentTokens, _ := d.tokenizer.Tokenize(content)
matchCluster := d.treeSearch(d.rootNode, contentTokens, 1.0, true)
return matchCluster
}
@ -413,6 +432,7 @@ func (d *Drain) addSeqToPrefixTree(rootNode *Node, cluster *LogCluster) {
// if token not matched in this layer of existing tree.
if _, ok = curNode.keyToChildNode[token]; !ok {
if !d.hasNumbers(token) {
// Numbers in token: Prioritize the param string path
if _, ok = curNode.keyToChildNode[d.config.ParamString]; ok {
if len(curNode.keyToChildNode) < d.config.MaxChildren {
newNode := createNode()
@ -435,6 +455,7 @@ func (d *Drain) addSeqToPrefixTree(rootNode *Node, cluster *LogCluster) {
}
}
} else {
// No numbers, use the key as-is to traverse
if _, ok = curNode.keyToChildNode[d.config.ParamString]; !ok {
newNode := createNode()
curNode.keyToChildNode[d.config.ParamString] = newNode

@ -39,8 +39,8 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
drain := New(DefaultConfig(), nil)
for _, line := range lines {
drain := New(DefaultConfig(), nil)
drain.Train(line, 0)
}
}

File diff suppressed because one or more lines are too long

@ -1,25 +1,96 @@
package drain
import "strings"
import (
"strings"
"unicode"
)
type LineTokenizer interface {
Tokenize(line string) []string
Join(tokens []string) string
Tokenize(line string) ([]string, interface{})
Join(tokens []string, state interface{}) string
}
type spacesTokenizer struct{}
func (spacesTokenizer) Tokenize(line string) []string {
return strings.Split(line, " ")
func (spacesTokenizer) Tokenize(line string) ([]string, interface{}) {
return strings.Split(line, " "), nil
}
func (spacesTokenizer) Join(tokens []string) string {
func (spacesTokenizer) Join(tokens []string, _ interface{}) string {
return strings.Join(tokens, " ")
}
type punctuationTokenizer struct {
includeDelimiters [128]rune
excludeDelimiters [128]rune
}
func newPunctuationTokenizer() *punctuationTokenizer {
var included [128]rune
var excluded [128]rune
included['='] = 1
excluded['_'] = 1
excluded['-'] = 1
return &punctuationTokenizer{
includeDelimiters: included,
excludeDelimiters: excluded,
}
}
func (p *punctuationTokenizer) Tokenize(line string) ([]string, interface{}) {
tokens := make([]string, len(line)) // Maximum size is every character is punctuation
spacesAfter := make([]int, strings.Count(line, " ")) // Could be a bitmap, but it's not worth it for a few bytes.
start := 0
nextTokenIdx := 0
nextSpaceIdx := 0
for i, char := range line {
if unicode.IsLetter(char) || unicode.IsNumber(char) || char < 128 && p.excludeDelimiters[char] != 0 {
continue
}
included := char < 128 && p.includeDelimiters[char] != 0
if char == ' ' || included || unicode.IsPunct(char) {
if i > start {
tokens[nextTokenIdx] = line[start:i]
nextTokenIdx++
}
if char == ' ' {
spacesAfter[nextSpaceIdx] = nextTokenIdx - 1
nextSpaceIdx++
} else {
tokens[nextTokenIdx] = line[i : i+1]
nextTokenIdx++
}
start = i + 1
}
}
if start < len(line) {
tokens[nextTokenIdx] = line[start:]
nextTokenIdx++
}
return tokens[:nextTokenIdx], spacesAfter[:nextSpaceIdx]
}
func (p *punctuationTokenizer) Join(tokens []string, state interface{}) string {
spacesAfter := state.([]int)
strBuilder := strings.Builder{}
spacesIdx := 0
for i, token := range tokens {
strBuilder.WriteString(token)
for spacesIdx < len(spacesAfter) && i == spacesAfter[spacesIdx] {
// One entry for each space following the token
strBuilder.WriteRune(' ')
spacesIdx++
}
}
return strBuilder.String()
}
type splittingTokenizer struct{}
func (splittingTokenizer) Tokenize(line string) []string {
func (splittingTokenizer) Tokenize(line string) ([]string, interface{}) {
numEquals := strings.Count(line, "=")
numColons := strings.Count(line, ":")
numSpaces := strings.Count(line, " ")
@ -32,24 +103,31 @@ func (splittingTokenizer) Tokenize(line string) []string {
}
tokens := make([]string, 0, expectedTokens)
spacesAfter := make([]int, 0, strings.Count(line, " "))
for _, token := range strings.SplitAfter(line, keyvalSeparator) {
tokens = append(tokens, strings.Split(token, " ")...)
words := strings.Split(token, " ")
for i, entry := range words {
tokens = append(tokens, entry)
if i == len(words)-1 {
continue
}
spacesAfter = append(spacesAfter, len(tokens)-1)
}
}
return tokens
return tokens, spacesAfter
}
func (splittingTokenizer) Join(tokens []string) string {
var builder strings.Builder
for _, token := range tokens {
if strings.HasSuffix(token, "=") || strings.HasSuffix(token, ":") {
builder.WriteString(token)
} else {
builder.WriteString(token + " ")
func (splittingTokenizer) Join(tokens []string, state interface{}) string {
spacesAfter := state.([]int)
strBuilder := strings.Builder{}
spacesIdx := 0
for i, token := range tokens {
strBuilder.WriteString(token)
for spacesIdx < len(spacesAfter) && i == spacesAfter[spacesIdx] {
// One entry for each space following the token
strBuilder.WriteRune(' ')
spacesIdx++
}
}
output := builder.String()
if output[len(output)-1] == ' ' {
return output[:len(output)-1]
}
return output
return strBuilder.String()
}

@ -1,59 +1,163 @@
package drain
import (
"reflect"
"testing"
"github.com/stretchr/testify/require"
)
func TestSplittingTokenizer_Tokenize(t *testing.T) {
tokenizer := splittingTokenizer{}
type TestCase struct {
name string
line string
want map[string][]string
}
tests := []struct {
name string
line string
want []string
}{
{
name: "Test with equals sign",
line: "key1=value1 key2=value2",
want: []string{"key1=", "value1", "key2=", "value2"},
const typePunctuation = "punctuation"
const typeSplitting = "splitting"
var testCases = []TestCase{
{
name: "Test with equals sign",
line: "key1=value1 key2=value2",
want: map[string][]string{
typePunctuation: {"key1", "=", "value1", "key2", "=", "value2"},
typeSplitting: {"key1=", "value1", "key2=", "value2"},
},
{
name: "Test with colon",
line: "key1:value1 key2:value2",
want: []string{"key1:", "value1", "key2:", "value2"},
},
{
name: "Test with colon",
line: "key1:value1 key2:value2",
want: map[string][]string{
typePunctuation: {"key1", ":", "value1", "key2", ":", "value2"},
typeSplitting: {"key1:", "value1", "key2:", "value2"},
},
{
name: "Test with mixed delimiters, more = than :",
line: "key1=value1 key2:value2 key3=value3",
want: []string{"key1=", "value1", "key2:value2", "key3=", "value3"},
},
{
name: "Test with mixed delimiters, more = than :",
line: "key1=value1 key2:value2 key3=value3",
want: map[string][]string{
typePunctuation: {"key1", "=", "value1", "key2", ":", "value2", "key3", "=", "value3"},
typeSplitting: {"key1=", "value1", "key2:value2", "key3=", "value3"},
},
},
{
name: "Test with mixed delimiters, more : than =",
line: "key1:value1 key2:value2 key3=value3",
want: map[string][]string{
typePunctuation: {"key1", ":", "value1", "key2", ":", "value2", "key3", "=", "value3"},
typeSplitting: {"key1:", "value1", "key2:", "value2", "key3=value3"},
},
},
{
name: "Dense json",
line: `{"key1":"value1","key2":"value2","key3":"value3"}`,
want: map[string][]string{
typePunctuation: {`{`, `"`, `key1`, `"`, `:`, `"`, `value1`, `"`, `,`, `"`, `key2`, `"`, `:`, `"`, `value2`, `"`, `,`, `"`, `key3`, `"`, `:`, `"`, `value3`, `"`, `}`},
typeSplitting: {`{"key1":`, `"value1","key2":`, `"value2","key3":`, `"value3"}`},
},
},
{
name: "json with spaces",
line: `{"key1":"value1", "key2":"value2", "key3":"value3"}`,
want: map[string][]string{
typePunctuation: {`{`, `"`, `key1`, `"`, `:`, `"`, `value1`, `"`, `,`, `"`, `key2`, `"`, `:`, `"`, `value2`, `"`, `,`, `"`, `key3`, `"`, `:`, `"`, `value3`, `"`, `}`},
typeSplitting: {`{"key1":`, `"value1",`, `"key2":`, `"value2",`, `"key3":`, `"value3"}`},
},
},
{
name: "logfmt multiword values",
line: `key1=value1 key2=value2 msg="this is a message"`,
want: map[string][]string{
typePunctuation: {"key1", "=", "value1", "key2", "=", "value2", "msg", "=", `"`, `this`, "is", "a", `message`, `"`},
typeSplitting: {"key1=", "value1", "key2=", "value2", "msg=", `"this`, "is", "a", `message"`},
},
},
{
name: "longer line",
line: "09:17:38.033366 ▶ INFO route ops sending to dest https://graphite-cortex-ops-blocks-us-east4.grafana.net/graphite/metrics: service_is_carbon-relay-ng.instance_is_carbon-relay-ng-c665b7b-j2trk.mtype_is_counter.dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics.unit_is_Metric.action_is_drop.reason_is_queue_full 0 1717060658",
want: map[string][]string{
typePunctuation: {`09`, `:`, `17`, `:`, `38`, `.`, `033366`, ``, `INFO`, `route`, `ops`, `sending`, `to`, `dest`, `https`, `:`, `/`, `/`, `graphite-cortex-ops-blocks-us-east4`, `.`, `grafana`, `.`, `net`, `/`, `graphite`, `/`, `metrics`, `:`, `service_is_carbon-relay-ng`, `.`, `instance_is_carbon-relay-ng-c665b7b-j2trk`, `.`, `mtype_is_counter`, `.`, `dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics`, `.`, `unit_is_Metric`, `.`, `action_is_drop`, `.`, `reason_is_queue_full`, `0`, `1717060658`},
typeSplitting: {`09:`, `17:`, `38.033366`, ``, `INFO`, ``, `route`, `ops`, `sending`, `to`, `dest`, `https:`, `//graphite-cortex-ops-blocks-us-east4.grafana.net/graphite/metrics:`, ``, `service_is_carbon-relay-ng.instance_is_carbon-relay-ng-c665b7b-j2trk.mtype_is_counter.dest_is_https_graphite-cortex-ops-blocks-us-east4_grafana_netgraphitemetrics.unit_is_Metric.action_is_drop.reason_is_queue_full`, `0`, `1717060658`},
},
},
{
name: "Consecutive splits points: equals followed by space",
line: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
want: map[string][]string{
typePunctuation: {`ts`, `=`, `2024-05-30T12`, `:`, `50`, `:`, `36`, `.`, `648377186Z`, `caller`, `=`, `scheduler_processor`, `.`, `go`, `:`, `143`, `level`, `=`, `warn`, `msg`, `=`, `"`, `error`, `contacting`, `scheduler`, `"`, `err`, `=`, `"`, `rpc`, `error`, `:`, `code`, `=`, `Unavailable`, `desc`, `=`, `connection`, `error`, `:`, `desc`, `=`, `\`, `"`, `error`, `reading`, `server`, `preface`, `:`, `EOF`, `\`, `"`, `"`, `addr`, `=`, `10`, `.`, `0`, `.`, `151`, `.`, `101`, `:`, `9095`},
typeSplitting: {"ts=", "2024-05-30T12:50:36.648377186Z", "caller=", "scheduler_processor.go:143", "level=", "warn", "msg=", "\"error", "contacting", "scheduler\"", "err=", "\"rpc", "error:", "code", "=", ``, "Unavailable", "desc", "=", ``, "connection", "error:", "desc", "=", ``, `\"error`, "reading", "server", "preface:", `EOF\""`, "addr=", "10.0.151.101:9095"},
},
},
{
name: "Only punctation",
line: `!@£$%^&*()`,
want: map[string][]string{
typePunctuation: {`!`, `@`, `£$`, `%`, `^`, `&`, `*`, `(`, `)`},
typeSplitting: {`!@£$%^&*()`},
},
},
}
func TestTokenizer_Tokenize(t *testing.T) {
tests := []struct {
name string
tokenizer LineTokenizer
}{
{
name: "Test with mixed delimiters, more : than =",
line: "key1:value1 key2:value2 key3=value3",
want: []string{"key1:", "value1", "key2:", "value2", "key3=value3"},
name: typePunctuation,
tokenizer: newPunctuationTokenizer(),
},
{
name: "Dense json",
line: `{"key1":"value1","key2":"value2","key3":"value3"}`,
want: []string{`{"key1":`, `"value1","key2":`, `"value2","key3":`, `"value3"}`},
name: typeSplitting,
tokenizer: splittingTokenizer{},
},
}
for _, tt := range tests {
for _, tc := range testCases {
t.Run(tt.name+":"+tc.name, func(t *testing.T) {
got, _ := tt.tokenizer.Tokenize(tc.line)
require.Equal(t, tc.want[tt.name], got)
})
}
}
}
func TestTokenizer_TokenizeAndJoin(t *testing.T) {
tests := []struct {
name string
tokenizer LineTokenizer
}{
{
name: "json with spaces",
line: `{"key1":"value1", "key2":"value2", "key3":"value3"}`,
want: []string{`{"key1":`, `"value1",`, `"key2":`, `"value2",`, `"key3":`, `"value3"}`},
name: typePunctuation,
tokenizer: newPunctuationTokenizer(),
},
{
name: "logfmt multiword values",
line: `key1=value1 key2=value2 msg="this is a message"`,
want: []string{"key1=", "value1", "key2=", "value2", "msg=", `"this`, "is", "a", `message"`},
name: typeSplitting,
tokenizer: splittingTokenizer{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tokenizer.Tokenize(tt.line); !reflect.DeepEqual(got, tt.want) {
t.Errorf("splittingTokenizer.Tokenize() = %v, want %v", got, tt.want)
for _, tc := range testCases {
t.Run(tt.name+":"+tc.name, func(t *testing.T) {
got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line))
require.Equal(t, tc.line, got)
})
}
}
}
func BenchmarkSplittingTokenizer(b *testing.B) {
tokenizer := newPunctuationTokenizer()
for _, tt := range testCases {
tc := tt
b.Run(tc.name, func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
tokenizer.Tokenize(tc.line)
}
})
}

@ -11,16 +11,18 @@ import (
)
type LogCluster struct {
id int
Size int
Tokens []string
Stringer func([]string) string
Chunks Chunks
id int
Size int
Tokens []string
TokenState interface{}
Stringer func([]string, interface{}) string
Chunks Chunks
}
func (c *LogCluster) String() string {
if c.Stringer != nil {
return c.Stringer(c.Tokens)
return c.Stringer(c.Tokens, c.TokenState)
}
return strings.Join(c.Tokens, " ")
}

File diff suppressed because one or more lines are too long

@ -13,7 +13,7 @@ import (
)
func Test_prunePatterns(t *testing.T) {
file, err := os.Open("testdata/patterns.txt")
file, err := os.Open(`testdata/patterns.txt`)
require.NoError(t, err)
defer file.Close()
@ -27,39 +27,83 @@ func Test_prunePatterns(t *testing.T) {
require.NoError(t, scanner.Err())
startingPatterns := len(resp.Series)
prunePatterns(resp, 0, newIngesterQuerierMetrics(prometheus.DefaultRegisterer, `test`))
prunePatterns(resp, 0, newIngesterQuerierMetrics(prometheus.DefaultRegisterer, "test"))
expectedPatterns := []string{
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=0, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=7, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=0, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=1, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=2, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=3, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=3, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=4, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=4, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=5, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=5, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=6, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=7, <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_> <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_> <_> <_>`,
}
patterns := make([]string, 0, len(resp.Series))
@ -69,5 +113,5 @@ func Test_prunePatterns(t *testing.T) {
slices.Sort(patterns)
require.Equal(t, expectedPatterns, patterns)
require.Less(t, len(patterns), startingPatterns, `prunePatterns should remove duplicates`)
require.Less(t, len(patterns), startingPatterns, "prunePatterns should remove duplicates")
}

Loading…
Cancel
Save