feat: Tune Patterns query drain instance (#13137)

pull/13083/head^2
benclive 12 months ago committed by GitHub
parent 6e119aaef5
commit 30df31e28b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      pkg/pattern/drain/drain.go
  2. 24
      pkg/pattern/ingester_querier.go
  3. 43
      pkg/pattern/ingester_querier_test.go
  4. 22
      pkg/pattern/metrics.go

@ -220,7 +220,7 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens := deduplicatePlaceholders(d.tokenizer.Tokenize(content), d.config.ParamString)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
d.clustersCounter++

@ -25,7 +25,8 @@ type IngesterQuerier struct {
ringClient *RingClient
registerer prometheus.Registerer
registerer prometheus.Registerer
ingesterQuerierMetrics *ingesterQuerierMetrics
}
func NewIngesterQuerier(
@ -36,10 +37,11 @@ func NewIngesterQuerier(
logger log.Logger,
) (*IngesterQuerier, error) {
return &IngesterQuerier{
logger: log.With(logger, "component", "pattern-ingester-querier"),
ringClient: ringClient,
cfg: cfg,
registerer: prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer),
logger: log.With(logger, "component", "pattern-ingester-querier"),
ringClient: ringClient,
cfg: cfg,
registerer: prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer),
ingesterQuerierMetrics: newIngesterQuerierMetrics(registerer, metricsNamespace),
}, nil
}
@ -63,11 +65,15 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
if err != nil {
return nil, err
}
return prunePatterns(resp, minClusterSize), nil
return prunePatterns(resp, minClusterSize, q.ingesterQuerierMetrics), nil
}
func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *logproto.QueryPatternsResponse {
d := drain.New(drain.DefaultConfig(), nil)
func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
pruneConfig := drain.DefaultConfig()
pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them
patternsBefore := len(resp.Series)
d := drain.New(pruneConfig, nil)
for _, p := range resp.Series {
d.TrainPattern(p.Pattern, p.Samples)
}
@ -86,6 +92,8 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *lo
Samples: cluster.Samples(),
})
}
metrics.patternsPrunedTotal.Add(float64(patternsBefore - len(resp.Series)))
metrics.patternsRetainedTotal.Add(float64(len(resp.Series)))
return resp
}

@ -5,6 +5,7 @@ import (
"os"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
@ -12,7 +13,7 @@ import (
)
func Test_prunePatterns(t *testing.T) {
file, err := os.Open(`testdata/patterns.txt`)
file, err := os.Open("testdata/patterns.txt")
require.NoError(t, err)
defer file.Close()
@ -24,15 +25,40 @@ func Test_prunePatterns(t *testing.T) {
})
}
require.NoError(t, scanner.Err())
prunePatterns(resp, 0)
startingPatterns := len(resp.Series)
prunePatterns(resp, 0, newIngesterQuerierMetrics(prometheus.DefaultRegisterer, `test`))
expectedPatterns := []string{
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=<_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=<_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=<_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=<_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=<_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=<_> handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=0, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=7, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=0, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=1, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=2, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=3, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=3, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=4, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=4, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=5, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=5, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=6, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=7, <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_>`,
}
@ -43,4 +69,5 @@ func Test_prunePatterns(t *testing.T) {
slices.Sort(patterns)
require.Equal(t, expectedPatterns, patterns)
require.Less(t, len(patterns), startingPatterns, `prunePatterns should remove duplicates`)
}

@ -33,3 +33,25 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
}),
}
}
type ingesterQuerierMetrics struct {
patternsPrunedTotal prometheus.Counter
patternsRetainedTotal prometheus.Counter
}
func newIngesterQuerierMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterQuerierMetrics {
return &ingesterQuerierMetrics{
patternsPrunedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "query_pruned_total",
Help: "The total number of patterns removed at query time by the pruning Drain instance",
}),
patternsRetainedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "query_retained_total",
Help: "The total number of patterns retained at query time by the pruning Drain instance",
}),
}
}

Loading…
Cancel
Save