fix: Fixes pattern pruning stability (#13429)

pull/11888/head^2
Cyril Tovena 11 months ago committed by GitHub
parent 2affa4862e
commit 7c86e651ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 116
      pkg/pattern/ingester_querier.go
  2. 176
      pkg/pattern/ingester_querier_test.go
  3. 75
      pkg/pattern/testdata/patterns.txt

@ -5,23 +5,27 @@ import (
"errors"
"math"
"net/http"
"sort"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"
loki_iter "github.com/grafana/loki/v3/pkg/iter"
pattern_iter "github.com/grafana/loki/v3/pkg/pattern/iter"
)
// TODO(kolesnikovae): parametrise QueryPatternsRequest
const minClusterSize = 30
const (
minClusterSize = 30
maxPatterns = 300
)
var ErrParseQuery = errors.New("only byte_over_time and count_over_time queries without filters are supported")
@ -132,36 +136,63 @@ func (q *IngesterQuerier) querySample(ctx context.Context, req *logproto.QuerySa
return iterators, nil
}
func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
pruneConfig := drain.DefaultConfig()
pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them
func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int64, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
patternsBefore := len(resp.Series)
d := drain.New(pruneConfig, "", nil)
for _, p := range resp.Series {
d.TrainPattern(p.GetPattern(), p.Samples)
total := make([]int64, len(resp.Series))
for i, p := range resp.Series {
for _, s := range p.Samples {
total[i] += s.Value
}
}
resp.Series = resp.Series[:0]
for _, cluster := range d.Clusters() {
if cluster.Size < minClusterSize {
continue
// Create a slice of structs to keep Series and total together
type SeriesWithTotal struct {
Series *logproto.PatternSeries
Total int64
}
seriesWithTotals := make([]SeriesWithTotal, len(resp.Series))
for i := range resp.Series {
seriesWithTotals[i] = SeriesWithTotal{
Series: resp.Series[i],
Total: total[i],
}
pattern := d.PatternString(cluster)
if pattern == "" {
continue
}
// Sort the slice of structs by the Total field
sort.Slice(seriesWithTotals, func(i, j int) bool {
return seriesWithTotals[i].Total > seriesWithTotals[j].Total
})
// Initialize a variable to keep track of the position for valid series
pos := 0
// Iterate over the seriesWithTotals
for i := range seriesWithTotals {
if seriesWithTotals[i].Total >= minClusterSize {
// Place the valid series at the current position
resp.Series[pos] = seriesWithTotals[i].Series
pos++
}
resp.Series = append(resp.Series,
logproto.NewPatternSeries(pattern, cluster.Samples()))
}
// Slice the resp.Series to include only the valid series
resp.Series = resp.Series[:pos]
if len(resp.Series) > maxPatterns {
resp.Series = resp.Series[:maxPatterns]
}
metrics.patternsPrunedTotal.Add(float64(patternsBefore - len(resp.Series)))
metrics.patternsRetainedTotal.Add(float64(len(resp.Series)))
return resp
}
// ForAllIngesters runs f, in parallel, for all ingesters
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
replicationSet, err := q.ringClient.Ring().GetReplicationSetForOperation(ring.Read)
replicationSet, err := q.ringClient.Ring().GetAllHealthy(ring.Read)
if err != nil {
return nil, err
}
@ -174,32 +205,29 @@ type ResponseFromIngesters struct {
response interface{}
}
// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
cfg := ring.DoUntilQuorumConfig{
// Nothing here
}
results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (ResponseFromIngesters, error) {
client, err := q.ringClient.Pool().GetClientFor(ingester.Addr)
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}
resp, err := f(ctx, client.(logproto.PatternClient))
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}
return ResponseFromIngesters{ingester.Addr, resp}, nil
}, func(ResponseFromIngesters) {
// Nothing to do
})
if err != nil {
g, ctx := errgroup.WithContext(ctx)
responses := make([]ResponseFromIngesters, len(replicationSet.Instances))
for i, ingester := range replicationSet.Instances {
ingester := ingester
i := i
g.Go(func() error {
client, err := q.ringClient.Pool().GetClientFor(ingester.Addr)
if err != nil {
return err
}
resp, err := f(ctx, client.(logproto.PatternClient))
if err != nil {
return err
}
responses[i] = ResponseFromIngesters{addr: ingester.Addr, response: resp}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
responses := make([]ResponseFromIngesters, 0, len(results))
responses = append(responses, results...)
return responses, err
return responses, nil
}

@ -1,16 +1,13 @@
package pattern
import (
"bufio"
"context"
"os"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@ -20,106 +17,83 @@ import (
"github.com/grafana/loki/v3/pkg/pattern/metric"
)
func Test_prunePatterns(t *testing.T) {
file, err := os.Open(`testdata/patterns.txt`)
require.NoError(t, err)
defer file.Close()
resp := new(logproto.QueryPatternsResponse)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
resp.Series = append(resp.Series, logproto.NewPatternSeries(scanner.Text(), []*logproto.PatternSample{}))
}
require.NoError(t, scanner.Err())
startingPatterns := len(resp.Series)
prunePatterns(resp, 0, newIngesterQuerierMetrics(prometheus.DefaultRegisterer, "test"))
expectedPatterns := []string{
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_> <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_> <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_> <_> <_>`,
func TestPrunePatterns(t *testing.T) {
metrics := newIngesterQuerierMetrics(prometheus.NewRegistry(), "test")
testCases := []struct {
name string
inputSeries []*logproto.PatternSeries
minClusterSize int64
expectedSeries []*logproto.PatternSeries
expectedPruned int
expectedRetained int
}{
{
name: "No pruning needed",
inputSeries: []*logproto.PatternSeries{
{Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 40}}},
{Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 35}}},
},
minClusterSize: 20,
expectedSeries: []*logproto.PatternSeries{
{Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 40}}},
{Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 35}}},
},
expectedPruned: 0,
expectedRetained: 2,
},
{
name: "Pruning some patterns",
inputSeries: []*logproto.PatternSeries{
{Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 10}}},
{Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 5}}},
{Pattern: `{app="test3"}`, Samples: []*logproto.PatternSample{{Value: 50}}},
},
minClusterSize: 20,
expectedSeries: []*logproto.PatternSeries{
{Pattern: `{app="test3"}`, Samples: []*logproto.PatternSample{{Value: 50}}},
},
expectedPruned: 2,
expectedRetained: 1,
},
{
name: "Limit patterns to maxPatterns",
inputSeries: func() []*logproto.PatternSeries {
series := make([]*logproto.PatternSeries, maxPatterns+10)
for i := 0; i < maxPatterns+10; i++ {
series[i] = &logproto.PatternSeries{
Pattern: `{app="test"}`,
Samples: []*logproto.PatternSample{{Value: int64(maxPatterns + 10 - i)}},
}
}
return series
}(),
minClusterSize: 0,
expectedSeries: func() []*logproto.PatternSeries {
series := make([]*logproto.PatternSeries, maxPatterns)
for i := 0; i < maxPatterns; i++ {
series[i] = &logproto.PatternSeries{
Pattern: `{app="test"}`,
Samples: []*logproto.PatternSample{{Value: int64(maxPatterns + 10 - i)}},
}
}
return series
}(),
expectedPruned: 10,
expectedRetained: maxPatterns,
},
}
patterns := make([]string, 0, len(resp.Series))
for _, p := range resp.Series {
patterns = append(patterns, p.GetPattern())
}
slices.Sort(patterns)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
resp := &logproto.QueryPatternsResponse{
Series: tc.inputSeries,
}
result := prunePatterns(resp, tc.minClusterSize, metrics)
require.Equal(t, expectedPatterns, patterns)
require.Less(t, len(patterns), startingPatterns, "prunePatterns should remove duplicates")
require.Equal(t, len(tc.expectedSeries), len(result.Series))
require.Equal(t, tc.expectedSeries, result.Series)
})
}
}
func Test_Samples(t *testing.T) {
@ -281,7 +255,7 @@ func (f *fakeRing) Get(
}
func (f *fakeRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) {
panic("not implemented")
return ring.ReplicationSet{}, nil
}
func (f *fakeRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) {

@ -1,75 +0,0 @@
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_> <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_> <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_> <_>
<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_> <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_> <_> <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_> <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_> <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_> <_>
<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_> <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>
<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"
<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>
Loading…
Cancel
Save