feat: Add metrics for number of patterns detected & evicted (#12918)

pull/12977/head
benclive 2 years ago committed by GitHub
parent a1b1eeb095
commit bc53b33721
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 17
      pkg/pattern/drain/drain.go
  2. 56
      pkg/pattern/drain/drain_test.go
  3. 8
      pkg/pattern/drain/metrics.go
  4. 2
      pkg/pattern/ingester.go
  5. 2
      pkg/pattern/ingester_querier.go
  6. 2
      pkg/pattern/ingester_test.go
  7. 6
      pkg/pattern/instance.go
  8. 16
      pkg/pattern/metrics.go
  9. 6
      pkg/pattern/stream.go
  10. 4
      pkg/pattern/stream_test.go

@ -44,11 +44,11 @@ type Config struct {
ParamString string
}
func createLogClusterCache(maxSize int) *LogClusterCache {
func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
if maxSize == 0 {
maxSize = math.MaxInt
}
cache, _ := simplelru.NewLRU[int, *LogCluster](maxSize, nil)
cache, _ := simplelru.NewLRU[int, *LogCluster](maxSize, onEvict)
return &LogClusterCache{
cache: cache,
}
@ -146,16 +146,21 @@ func DefaultConfig() *Config {
}
}
func New(config *Config) *Drain {
func New(config *Config, metrics *Metrics) *Drain {
if config.LogClusterDepth < 3 {
panic("depth argument must be at least 3")
}
config.maxNodeDepth = config.LogClusterDepth - 2
var evictFn func(int, *LogCluster)
if metrics != nil {
evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }
}
d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
}
return d
}
@ -165,6 +170,7 @@ type Drain struct {
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
}
func (d *Drain) Clusters() []*LogCluster {
@ -195,6 +201,9 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
matchCluster.append(model.TimeFromUnixNano(ts))
d.idToCluster.Set(clusterID, matchCluster)
d.addSeqToPrefixTree(d.rootNode, matchCluster)
if d.metrics != nil {
d.metrics.PatternsDetectedTotal.Inc()
}
} else {
newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
matchCluster.Tokens = newTemplateTokens

@ -20,9 +20,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
}{
{
// High variation leads to many patterns including some that are too generic (many tokens matched) and some that are too specific (too few matchers)
name: "Generate patterns on high variation logfmt logs",
drain: New(DefaultConfig()),
inputFile: "testdata/agent-logfmt.txt",
name: `Generate patterns on high variation logfmt logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/agent-logfmt.txt`,
patterns: []string{
"ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg=\"Adding target\" key=\"/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\\\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\\\", batch_kubernetes_io_job_name=\\\"testcoordinator-job-2665838\\\", container=\\\"testcoordinator\\\", controller_uid=\\\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\\\", job=\\\"k6-cloud/testcoordinator\\\", job_name=\\\"testcoordinator-job-2665838\\\", name=\\\"testcoordinator\\\", namespace=\\\"k6-cloud\\\", pod=\\\"testcoordinator-job-2665838-9g8ds\\\"}\"",
"<_> <_> level=info component=logs logs_config=default <_> target\" <_> <_> <_> <_> <_> <_>",
@ -42,9 +42,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
// Lower variation leads to fewer patterns including some with limited value (single lines, no matchers)
name: "Generate patterns on low variation logfmt logs",
drain: New(DefaultConfig()),
inputFile: "testdata/ingester-logfmt.txt",
name: `Generate patterns on low variation logfmt logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/ingester-logfmt.txt`,
patterns: []string{
"<_> caller=head.go:216 level=debug tenant=987678 msg=\"profile is empty after delta computation\" metricName=memory",
"ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg=\"GET /debug/pprof/delta_mutex (200) 1.161082ms\"",
@ -53,9 +53,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
// Lower variation logs in json leads to a high number of patterns with very few matchers
name: "Generate patterns on json formatted logs",
drain: New(DefaultConfig()),
inputFile: "testdata/drone-json.txt",
name: `Generate patterns on json formatted logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/drone-json.txt`,
patterns: []string{
"<_> capacity <_>",
"<_> capacity changes <_>",
@ -96,7 +96,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for distributor logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/distributor-logfmt.txt",
patterns: []string{
`<_> caller=http.go:194 level=debug <_> <_> msg="POST <_> <_> <_>`,
@ -104,7 +104,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for journald logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/journald.txt",
patterns: []string{
"2024-05-07T11:59:43.484606Z INFO ExtHandler ExtHandler Downloading agent manifest",
@ -195,7 +195,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for kafka logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kafka.txt",
patterns: []string{
`[2024-05-07 <_> INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-0, dir=/bitnami/kafka/data] Deleting segment files <_> size=948, <_> <_> (kafka.log.LocalLog$)`,
@ -219,7 +219,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for kubernetes logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kubernetes.txt",
patterns: []string{
"I0507 12:04:17.596484 1 highnodeutilization.go:107] \"Criteria for a node below target utilization\" CPU=50 Mem=50 Pods=100",
@ -252,7 +252,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for vault logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/vault.txt",
patterns: []string{
"<_> [INFO] expiration: revoked lease: <_>",
@ -260,7 +260,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for calico logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/calico.txt",
patterns: []string{
`2024-05-08 <_> [DEBUG][216945] felix/table.go 870: Found forward-reference <_> ipVersion=0x4 <_> <_> [0:0]" table="nat"`,
@ -383,8 +383,8 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
inputLines []string
}{
{
name: `should match each line against a pattern`,
drain: New(DefaultConfig()),
name: "should match each line against a pattern",
drain: New(DefaultConfig(), nil),
inputLines: []string{
"test test test",
"test test test",
@ -393,8 +393,8 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
},
},
{
name: `should also match newlines`,
drain: New(DefaultConfig()),
name: "should also match newlines",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test test test
`,
@ -413,7 +413,6 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
for _, line := range tt.inputLines {
tt.drain.Train(line, 0)
}
t.Log(`Learned clusters`, tt.drain.Clusters())
for _, line := range tt.inputLines {
match := tt.drain.Match(line)
@ -432,8 +431,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
inputLines []string
}{
{
name: `should extract patterns that all lines match`,
drain: New(DefaultConfig()),
name: "should extract patterns that all lines match",
drain: New(DefaultConfig(), nil),
inputLines: []string{
"test 1 test",
"test 2 test",
@ -442,8 +441,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line ends with newlines`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line ends with newlines",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test 1 test
`,
@ -456,8 +455,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line ends with empty space`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line ends with empty space",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test 1 test `,
`test 2 test `,
@ -466,8 +465,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line starts with empty space`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line starts with empty space",
drain: New(DefaultConfig(), nil),
inputLines: []string{
` test 1 test`,
` test 2 test`,
@ -484,7 +483,6 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
}
require.Equal(t, 1, len(tt.drain.Clusters()))
cluster := tt.drain.Clusters()[0]
t.Log(`Extracted cluster: `, cluster)
matcher, err := pattern.ParseLineFilter([]byte(cluster.String()))
require.NoError(t, err)

// Package drain implements the Drain log-pattern mining algorithm.
package drain

import "github.com/prometheus/client_golang/prometheus"

// Metrics holds the counters Drain records while mining patterns.
// A nil *Metrics passed to New disables recording entirely.
type Metrics struct {
	// PatternsEvictedTotal counts clusters evicted from the LRU cluster cache.
	PatternsEvictedTotal prometheus.Counter
	// PatternsDetectedTotal counts newly detected patterns during training.
	PatternsDetectedTotal prometheus.Counter
}

@ -273,7 +273,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(instanceID, i.logger)
inst, err = newInstance(instanceID, i.logger, i.metrics)
if err != nil {
return nil, err
}

@ -67,7 +67,7 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
}
func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *logproto.QueryPatternsResponse {
d := drain.New(drain.DefaultConfig())
d := drain.New(drain.DefaultConfig(), nil)
for _, p := range resp.Series {
d.TrainPattern(p.Pattern, p.Samples)
}

@ -18,7 +18,7 @@ import (
func TestInstancePushQuery(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger())
inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"))
require.NoError(t, err)
err = inst.Push(context.Background(), &push.PushRequest{

@ -30,9 +30,10 @@ type instance struct {
streams *streamsMap
index *index.BitPrefixInvertedIndex
logger log.Logger
metrics *ingesterMetrics
}
func newInstance(instanceID string, logger log.Logger) (*instance, error) {
func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics) (*instance, error) {
index, err := index.NewBitPrefixWithShards(indexShards)
if err != nil {
return nil, err
@ -43,6 +44,7 @@ func newInstance(instanceID string, logger log.Logger) (*instance, error) {
instanceID: instanceID,
streams: newStreamsMap(),
index: index,
metrics: metrics,
}
i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint)
return i, nil
@ -138,7 +140,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s, err := newStream(fp, sortedLabels)
s, err := newStream(fp, sortedLabels, i.metrics)
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}

@ -6,7 +6,9 @@ import (
)
// ingesterMetrics bundles the Prometheus instruments exported by the
// pattern ingester. Constructed by newIngesterMetrics.
type ingesterMetrics struct {
	// flushQueueLength tracks series pending in the flush queue.
	flushQueueLength prometheus.Gauge
	// patternsDiscardedTotal counts patterns evicted from the drain LRU cache.
	patternsDiscardedTotal prometheus.Counter
	// patternsDetectedTotal counts patterns detected from incoming log lines.
	patternsDetectedTotal prometheus.Counter
}
func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
@ -17,5 +19,17 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Name: "flush_queue_length",
Help: "The total number of series pending in the flush queue.",
}),
patternsDiscardedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_evicted_total",
Help: "The total number of patterns evicted from the LRU cache.",
}),
patternsDetectedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_detected_total",
Help: "The total number of patterns detected from incoming log lines.",
}),
}
}

@ -27,13 +27,17 @@ type stream struct {
// newStream constructs a stream for the given fingerprint and label set,
// wiring its drain instance to the ingester's pattern counters so that
// evicted and detected patterns are reported per process.
//
// metrics must be non-nil; its counters are shared across all streams.
func newStream(
	fp model.Fingerprint,
	labels labels.Labels,
	metrics *ingesterMetrics,
) (*stream, error) {
	return &stream{
		fp:           fp,
		labels:       labels,
		labelsString: labels.String(),
		labelHash:    labels.Hash(),
		patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{
			PatternsEvictedTotal:  metrics.patternsDiscardedTotal,
			PatternsDetectedTotal: metrics.patternsDetectedTotal,
		}),
	}, nil
}

@ -16,7 +16,7 @@ import (
func TestAddStream(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs)
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"))
require.NoError(t, err)
err = stream.Push(context.Background(), []push.Entry{
@ -44,7 +44,7 @@ func TestAddStream(t *testing.T) {
func TestPruneStream(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs)
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"))
require.NoError(t, err)
err = stream.Push(context.Background(), []push.Entry{

Loading…
Cancel
Save