loki/pkg/pattern/stream_test.go

package pattern

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/pattern/aggregation"
	"github.com/grafana/loki/v3/pkg/pattern/drain"
	"github.com/grafana/loki/v3/pkg/pattern/iter"
	"github.com/grafana/loki/v3/pkg/util/constants"

	"github.com/grafana/loki/pkg/push"
)
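
// TestFilterClustersByVolume exercises the volume-based cluster filter.
// The behaviour asserted below amounts to, roughly (a sketch inferred from
// the assertions, not the implementation itself):
//
//	sort clusters by Volume, descending
//	keep clusters until the kept volume reaches threshold * total volume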
func TestFilterClustersByVolume(t *testing.T) {
	// Create test clusters with different volumes
	cluster1 := &drain.LogCluster{
		Tokens:      []string{"high", "volume", "pattern"},
		Volume:      500,
		SampleCount: 50,
	}
	cluster2 := &drain.LogCluster{
		Tokens:      []string{"medium", "volume", "pattern"},
		Volume:      300,
		SampleCount: 30,
	}
	cluster3 := &drain.LogCluster{
		Tokens:      []string{"low", "volume", "pattern", "one"},
		Volume:      150,
		SampleCount: 15,
	}
	cluster4 := &drain.LogCluster{
		Tokens:      []string{"low", "volume", "pattern", "two"},
		Volume:      50,
		SampleCount: 5,
	}
	clusters := []clusterWithMeta{
		{cluster: cluster1},
		{cluster: cluster2},
		{cluster: cluster3},
		{cluster: cluster4},
	}

	// Test 90% threshold - should keep 500+300+150=950 out of 1000 total
	result := filterClustersByVolume(clusters, 0.9)
	require.Equal(t, 3, len(result), "Should keep top 3 clusters for 90% volume threshold")

	// Verify sorting worked correctly
	require.Equal(t, cluster1, result[0].cluster, "Highest volume cluster should be first")
	require.Equal(t, cluster2, result[1].cluster, "Second highest volume cluster should be second")
	require.Equal(t, cluster3, result[2].cluster, "Third highest volume cluster should be third")

	// Reset clusters order for next test
	clusters = []clusterWithMeta{
		{cluster: cluster1},
		{cluster: cluster2},
		{cluster: cluster3},
		{cluster: cluster4},
	}

	// Test 50% threshold - should keep only 500 out of 1000 total
	result = filterClustersByVolume(clusters, 0.5)
	require.Equal(t, 1, len(result), "Should keep only top cluster for 50% volume threshold")
	require.Equal(t, cluster1, result[0].cluster, "Highest volume cluster should be kept")

	// Reset clusters order for next test
	clusters = []clusterWithMeta{
		{cluster: cluster1},
		{cluster: cluster2},
		{cluster: cluster3},
		{cluster: cluster4},
	}

	// Test 100% threshold - should keep all clusters
	result = filterClustersByVolume(clusters, 1.0)
	require.Equal(t, 4, len(result), "Should keep all clusters for 100% volume threshold")
}
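
// TestAddStream pushes three entries into a fresh stream and verifies that
// the out-of-order entry (older than the last accepted timestamp) is dropped,
// leaving a single pattern sample with a count of 2.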
func TestAddStream(t *testing.T) {
	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
	mockWriter := &mockEntryWriter{}

	stream, err := newStream(
		model.Fingerprint(labels.StableHash(lbs)),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{
			patternRateThreshold:   1.0,
			persistenceGranularity: time.Hour,
		},
		mockWriter,
		aggregation.NewMetrics(nil),
		0.99,
	)
	require.NoError(t, err)

	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: time.Unix(20, 0),
			Line:      "ts=1 msg=hello",
		},
		{
			Timestamp: time.Unix(20, 0),
			Line:      "ts=2 msg=hello",
		},
		{
			Timestamp: time.Unix(10, 0),
			Line:      "ts=3 msg=hello", // this should be ignored because it's older than the last entry
		},
	})
	require.NoError(t, err)

	it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second))
	require.NoError(t, err)
	res, err := iter.ReadAll(it)
	require.NoError(t, err)
	require.Equal(t, 1, len(res.Series))
	require.Equal(t, int64(2), res.Series[0].Samples[0].Value)
}
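
// TestPruneStream verifies that prune reports true once all data has aged out
// of the stream, and false again after a fresh entry is pushed, with only the
// new sample surviving in the iterator output.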
func TestPruneStream(t *testing.T) {
	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
	mockWriter := &mockEntryWriter{}
	mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

	stream, err := newStream(
		model.Fingerprint(labels.StableHash(lbs)),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{patternRateThreshold: 1.0, persistenceGranularity: time.Hour},
		mockWriter,
		aggregation.NewMetrics(nil),
		0.99,
	)
	require.NoError(t, err)

	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: time.Unix(20, 0),
			Line:      "ts=1 msg=hello",
		},
		{
			Timestamp: time.Unix(20, 0),
			Line:      "ts=2 msg=hello",
		},
	})
	require.NoError(t, err)
	require.Equal(t, true, stream.prune(time.Hour))

	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: time.Now(),
			Line:      "ts=1 msg=hello",
		},
	})
	require.NoError(t, err)
	require.Equal(t, false, stream.prune(time.Hour))

	it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second))
	require.NoError(t, err)
	res, err := iter.ReadAll(it)
	require.NoError(t, err)
	require.Equal(t, 1, len(res.Series))
	require.Equal(t, int64(1), res.Series[0].Samples[0].Value)
}
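
// TestStreamPruneFiltersLowVolumePatterns feeds three pattern families with
// skewed volumes (roughly 60%, 30%, and 10% of lines) into a stream configured
// with an 80% volume threshold, then prunes and checks that only the two
// high-volume patterns are persisted.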
func TestStreamPruneFiltersLowVolumePatterns(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)

	// Track what patterns are written
	writtenPatterns := make(map[string]int)
	mockWriter := &mockEntryWriter{}
	mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Run(func(args mock.Arguments) {
			entry := args.Get(1).(string)
			// Extract pattern from the entry (it's URL-encoded in the format)
			if strings.Contains(entry, "detected_pattern=") {
				parts := strings.Split(entry, "detected_pattern=")
				if len(parts) > 1 {
					patternPart := strings.Split(parts[1], " ")[0]
					writtenPatterns[patternPart]++
				}
			}
		})

	drainCfg := drain.DefaultConfig()
	drainCfg.SimTh = 0.3 // Lower similarity threshold for testing

	stream, err := newStream(
		model.Fingerprint(labels.StableHash(lbs)),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drainCfg,
		&fakeLimits{
			patternRateThreshold:   0, // Disable rate threshold
			persistenceGranularity: time.Hour,
		},
		mockWriter,
		aggregation.NewMetrics(nil),
		0.8, // Test with 80% volume threshold
	)
	require.NoError(t, err)

	// Push many log lines to create patterns with different volumes
	// High volume pattern (60% of total volume)
	for i := range 30 {
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: time.Unix(int64(i), 0),
				Line:      fmt.Sprintf("high volume pattern iteration %d", i),
			},
		})
		require.NoError(t, err)
	}

	// Medium volume pattern (30% of total volume)
	for i := range 15 {
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: time.Unix(int64(30+i), 0),
				Line:      fmt.Sprintf("medium volume pattern number %d", i),
			},
		})
		require.NoError(t, err)
	}

	// Low volume patterns (10% of total volume combined)
	for i := range 5 {
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: time.Unix(int64(45+i), 0),
				Line:      fmt.Sprintf("low volume pattern instance %d unique", i),
			},
		})
		require.NoError(t, err)
	}

	// Prune to trigger pattern persistence
	stream.prune(0)

	// With the 80% threshold, only the high (60%) and medium (30%) volume
	// patterns should be written; the low volume patterns (10%) should be
	// filtered out.
	require.Equal(t, 2, len(writtenPatterns), "Should have written exactly the two high-volume patterns")

	// The exact pattern strings depend on drain's clustering, but we should see
	// fewer patterns written than total clusters due to filtering: we pushed 50
	// log lines creating at least 3 distinct patterns, and with the 80%
	// threshold we expect the low volume ones to be dropped.

	// Verify the mock was called (patterns were written)
	mockWriter.AssertCalled(t, "WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
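
// TestStreamPatternPersistenceOnPrune verifies that pruning old data emits an
// aggregated pattern entry for the pruned samples, bucketed to the hour and
// labelled with the service name and the detected log level.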
func TestStreamPatternPersistenceOnPrune(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)
	mockWriter := &mockEntryWriter{}

	stream, err := newStream(
		model.Fingerprint(lbs.Hash()),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{
			patternRateThreshold:   0.000001, // set a very low threshold to ensure samples are written despite low-frequency test data
			persistenceGranularity: time.Hour,
		},
		mockWriter,
		aggregation.NewMetrics(nil),
		0.99,
	)
	require.NoError(t, err)

	// Push entries with old timestamps that will be pruned
	start := time.Date(2020, time.January, 10, 0, 0, 0, 0, time.UTC)
	now := drain.TruncateTimestamp(model.TimeFromUnixNano(start.UnixNano()), drain.TimeResolution).Time()
	oldTime := now.Add(-2 * time.Hour)
	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: oldTime,
			Line:      "ts=1 msg=hello",
		},
		{
			Timestamp: oldTime.Add(time.Minute),
			Line:      "ts=2 msg=hello",
		},
		{
			Timestamp: oldTime.Add(5 * time.Minute),
			Line:      "ts=3 msg=hello",
		},
	})
	require.NoError(t, err)

	// Push a newer entry to ensure the stream isn't completely pruned
	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: now,
			Line:      "ts=4 msg=hello",
		},
	})
	require.NoError(t, err)

	// With bucketed aggregation using the chunk duration (1 hour), all entries
	// fall into one bucket, and the bucket timestamp is aligned to the hour boundary.
	bucketTime := time.Date(oldTime.Year(), oldTime.Month(), oldTime.Day(), oldTime.Hour(), 0, 0, 0, oldTime.Location())
	mockWriter.On("WriteEntry",
		bucketTime,
		aggregation.PatternEntry(bucketTime, 3, "ts=<_> msg=hello", lbs),
		labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
		[]logproto.LabelAdapter{
			{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
		},
	)

	// Prune old data - this should trigger pattern writing
	_ = stream.prune(time.Hour)

	// Verify the pattern was written
	mockWriter.AssertExpectations(t)
}
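
// TestStreamPersistenceGranularityMultipleEntries pushes an hour of entries
// through a stream with 15-minute persistence granularity and checks that
// pruning emits bucketed pattern entries rather than one aggregate for the
// whole chunk.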
func TestStreamPersistenceGranularityMultipleEntries(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)
	mockWriter := &mockEntryWriter{}

	stream, err := newStream(
		model.Fingerprint(lbs.Hash()),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{patternRateThreshold: 1.0, persistenceGranularity: 15 * time.Minute},
		mockWriter,
		aggregation.NewMetrics(nil),
		0.99,
	)
	require.NoError(t, err)

	// Push entries across a 1-hour span that will be pruned
	start := time.Date(2020, time.January, 10, 0, 0, 0, 0, time.UTC)
	now := drain.TruncateTimestamp(model.TimeFromUnixNano(start.UnixNano()), drain.TimeResolution).Time()
	baseTime := now.Add(-2 * time.Hour)

	// Push 12 entries across 60 minutes (5 minutes apart)
	entries := []push.Entry{}
	for i := range 12 {
		entries = append(entries, push.Entry{
			Timestamp: baseTime.Add(time.Duration(i*5) * time.Minute),
			Line:      "ts=1 msg=hello",
		})
	}
	err = stream.Push(context.Background(), entries)
	require.NoError(t, err)

	// Push a newer entry to ensure the stream isn't completely pruned
	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: now,
			Line:      "ts=2 msg=hello",
		},
	})
	require.NoError(t, err)

	// With 15-minute persistence granularity and a 1-hour chunk, we expect 4 pattern entries:
	// Bucket 1: baseTime + 0-15min (3 entries)
	// Bucket 2: baseTime + 15-30min (3 entries)
	// Bucket 3: baseTime + 30-45min (3 entries)
	// Bucket 4: baseTime + 45-60min (3 entries)
	// With 15-minute buckets, expect multiple calls (exact count depends on bucketing logic)
	mockWriter.On("WriteEntry",
		mock.MatchedBy(func(_ time.Time) bool { return true }), // Accept any timestamp
		mock.MatchedBy(func(_ string) bool { return true }),    // PatternEntry with some count
		labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
		[]logproto.LabelAdapter{
			{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
		},
	).Maybe() // Allow multiple calls as bucketing may vary

	// Prune old data - this should trigger pattern writing with multiple entries
	_ = stream.prune(time.Hour)

	// Verify the patterns were written
	mockWriter.AssertExpectations(t)
}
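
// TestStreamPersistenceGranularityEdgeCases covers boundary behaviour of
// bucketed persistence: no writes when there is no old data to prune, exactly
// one write for a pair of samples above the rate threshold, and a tolerant
// expectation when the granularity exceeds the max chunk age.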
func TestStreamPersistenceGranularityEdgeCases(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)

	t.Run("empty buckets should not write patterns", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}
		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{patternRateThreshold: 1.0, persistenceGranularity: 30 * time.Minute},
			mockWriter,
			aggregation.NewMetrics(nil),
			0.99,
		)
		require.NoError(t, err)

		// Push a newer entry to ensure the stream isn't completely pruned
		start := time.Date(2020, time.January, 10, 0, 0, 0, 0, time.UTC)
		now := drain.TruncateTimestamp(model.TimeFromUnixNano(start.UnixNano()), drain.TimeResolution).Time()
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: now,
				Line:      "ts=1 msg=hello",
			},
		})
		require.NoError(t, err)

		// Prune - should not write any patterns
		_ = stream.prune(time.Hour)

		// No old entries were pushed, so no patterns should have been written
		mockWriter.AssertNotCalled(t, "WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
		mockWriter.AssertExpectations(t)
	})

	t.Run("two samples should write a pattern if above threshold", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}
		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   0.0001,
				persistenceGranularity: 15 * time.Minute,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
			0.99,
		)
		require.NoError(t, err)

		start := time.Date(2020, time.January, 10, 0, 0, 0, 0, time.UTC)
		now := drain.TruncateTimestamp(model.TimeFromUnixNano(start.UnixNano()), drain.TimeResolution).Time()
		baseTime := now.Add(-2 * time.Hour)

		// Push two old entries
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: baseTime,
				Line:      "ts=1 msg=hello",
			},
		})
		require.NoError(t, err)
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: baseTime.Add(time.Minute),
				Line:      "ts=1 msg=hello",
			},
		})
		require.NoError(t, err)

		// Push a newer entry to keep the stream alive
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: now,
				Line:      "ts=2 msg=hello",
			},
		})
		require.NoError(t, err)

		// Expect exactly one pattern entry (the bucketed timestamp may differ from baseTime)
		mockWriter.On("WriteEntry",
			mock.MatchedBy(func(_ time.Time) bool { return true }), // Accept any timestamp
			mock.MatchedBy(func(_ string) bool { return true }),
			labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
			},
		).Once()

		_ = stream.prune(time.Hour)
		mockWriter.AssertExpectations(t)
	})

	t.Run("granularity larger than max chunk age should write one pattern", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}
		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   1.0,
				persistenceGranularity: 2 * time.Hour,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
			0.99,
		)
		require.NoError(t, err)

		start := time.Date(2020, time.January, 10, 0, 0, 0, 0, time.UTC)
		now := drain.TruncateTimestamp(model.TimeFromUnixNano(start.UnixNano()), drain.TimeResolution).Time()
		baseTime := now.Add(-2 * time.Hour)

		// Push multiple old entries across the hour
		entries := []push.Entry{}
		for i := range 6 {
			entries = append(entries, push.Entry{
				Timestamp: baseTime.Add(time.Duration(i*10) * time.Minute),
				Line:      "ts=1 msg=hello",
			})
		}
		err = stream.Push(context.Background(), entries)
		require.NoError(t, err)

		// Push a newer entry to keep the stream alive (same pattern)
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: now,
				Line:      "ts=1 msg=hello",
			},
		})
		require.NoError(t, err)

		// With 2-hour granularity, patterns may be written for the pruned data (flexible expectation)
		mockWriter.On("WriteEntry",
			mock.MatchedBy(func(_ time.Time) bool { return true }),
			mock.MatchedBy(func(_ string) bool { return true }),
			labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
			},
		).Maybe() // Allow 0 or more calls since behavior depends on bucket alignment

		_ = stream.prune(time.Hour)
		mockWriter.AssertExpectations(t)
	})
}
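
// newInstanceWithLimits is a thin test helper that forwards to newInstance,
// keeping the long parameter list in one place for the tests below.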
func newInstanceWithLimits(
	instanceID string,
	logger log.Logger,
	metrics *ingesterMetrics,
	drainCfg *drain.Config,
	drainLimits Limits,
	ringClient RingClient,
	ingesterID string,
	metricWriter aggregation.EntryWriter,
	patternWriter aggregation.EntryWriter,
	aggregationMetrics *aggregation.Metrics,
	volumeThreshold float64,
) (*instance, error) {
	return newInstance(instanceID, logger, metrics, drainCfg, drainLimits, ringClient, ingesterID, metricWriter, patternWriter, aggregationMetrics, volumeThreshold)
}
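
// TestStreamPerTenantConfigurationThreading checks that a stream created
// through an instance picks up the per-tenant persistence granularity from
// limits (10 minutes here) rather than the global drain configuration.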
func TestStreamPerTenantConfigurationThreading(t *testing.T) {
	t.Run("should use per-tenant persistence granularity when creating streams", func(t *testing.T) {
		// This test verifies that when a stream is created through the instance,
		// it uses the per-tenant persistence granularity from limits instead of the global config.
		lbs := labels.New(
			labels.Label{Name: "test", Value: "test"},
			labels.Label{Name: "service_name", Value: "test_service"},
		)
		mockWriter := &mockEntryWriter{}
		mockWriter.On("GetMetrics").Return(nil)

		// Create limits with a per-tenant override of 10 minutes
		limits := &fakeLimits{
			patternRateThreshold:   1.0,
			persistenceGranularity: 10 * time.Minute,
		}

		// Create an instance with a global default of 1 hour in drainCfg
		drainCfg := drain.DefaultConfig()
		drainCfg.MaxChunkAge = 1 * time.Hour

		inst, err := newInstanceWithLimits(
			"test",
			log.NewNopLogger(),
			newIngesterMetrics(nil, "test"),
			drainCfg,
			limits,
			&fakeRingClient{},
			"test-ingester",
			mockWriter,
			mockWriter,
			aggregation.NewMetrics(nil),
			0.99,
		)
		require.NoError(t, err)

		// Create a stream through the instance (simulating the real flow)
		ctx := context.Background()
		stream, err := inst.createStream(ctx, logproto.Stream{
			Labels: lbs.String(),
			Entries: []logproto.Entry{
				{
					Timestamp: time.Now(),
					Line:      "test log line",
				},
			},
		})
		require.NoError(t, err)

		// Verify the stream was created with the per-tenant persistence granularity
		require.Equal(t, 10*time.Minute, stream.persistenceGranularity)
	})
}
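
// TestStreamCalculatePatternRate checks the rate computation over pattern
// samples. The formula implied by the assertions below is:
//
//	rate = sum(sample values) / (last timestamp - first timestamp, in seconds)
//
// with 0 returned for empty or single-sample input.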
func TestStreamCalculatePatternRate(t *testing.T) {
	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
	mockWriter := &mockEntryWriter{}

	stream, err := newStream(
		model.Fingerprint(lbs.Hash()),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{
			patternRateThreshold:   0.0,
			persistenceGranularity: time.Hour,
		},
		mockWriter,
		aggregation.NewMetrics(nil),
		0.99,
	)
	require.NoError(t, err)

	t.Run("should calculate correct rate for multiple samples", func(t *testing.T) {
		// Create samples spanning 10 seconds with a total count of 30
		baseTime := model.TimeFromUnixNano(time.Now().Add(-time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime.Add(0 * time.Second), Value: 10},
			{Timestamp: baseTime.Add(5 * time.Second), Value: 10},
			{Timestamp: baseTime.Add(10 * time.Second), Value: 10},
		}

		// Expected rate: 30 samples / 10 seconds = 3.0 samples/second
		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 3.0, rate)
	})

	t.Run("should calculate correct rate for multiple samples over an hour", func(t *testing.T) {
		// Create samples spanning one hour with a total count of 72
		baseTime := model.TimeFromUnixNano(time.Now().Add(-time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime.Add(0 * time.Second), Value: 10},
			{Timestamp: baseTime.Add(5 * time.Minute), Value: 20},
			{Timestamp: baseTime.Add(30 * time.Minute), Value: 30},
			{Timestamp: baseTime.Add(time.Hour), Value: 12},
		}

		// Expected rate: 72 samples / 3600 seconds = 0.02 samples/second
		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 0.02, rate)
	})

	t.Run("should handle single sample", func(t *testing.T) {
		// A single sample should return a 0 rate (no time span)
		baseTime := model.TimeFromUnixNano(time.Now().Add(-time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime, Value: 10},
		}

		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 0.0, rate)
	})

	t.Run("should handle empty samples", func(t *testing.T) {
		// Empty samples should return a 0 rate
		samples := []*logproto.PatternSample{}
		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 0.0, rate)
	})

	t.Run("should handle fractional rates", func(t *testing.T) {
		// Create samples spanning 1 minute with a total count of 6
		baseTime := model.TimeFromUnixNano(time.Now().Add(-time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime.Add(0 * time.Second), Value: 2},
			{Timestamp: baseTime.Add(60 * time.Second), Value: 4},
		}

		// Expected rate: 6 samples / 60 seconds = 0.1 samples/second
		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 0.1, rate)
	})
}
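
// TestStreamPatternRateThresholdGating verifies that writePatternsBucketed
// persists a pattern only when its sample rate meets or exceeds the
// per-tenant pattern rate threshold, treating the threshold as inclusive.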
func TestStreamPatternRateThresholdGating(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)

	t.Run("should persist patterns above threshold", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}
		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   1.0,
				persistenceGranularity: time.Hour,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
			0.99,
		)
		require.NoError(t, err)

		// Create samples that exceed the threshold (6 samples per second)
		baseTime := model.TimeFromUnixNano(time.Now().Add(-2 * time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime, Value: 15},
			{Timestamp: baseTime.Add(5 * time.Second), Value: 15},
		}

		// Rate: 30 samples / 5 seconds = 6.0 samples/second (above the 1.0 threshold)
		// Should persist the pattern
		mockWriter.On("WriteEntry",
			mock.MatchedBy(func(_ time.Time) bool { return true }),
			mock.MatchedBy(func(_ string) bool { return true }),
			labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
			},
		).Once()

		stream.writePatternsBucketed(samples, lbs, "test pattern", constants.LogLevelUnknown)
		mockWriter.AssertExpectations(t)
	})

	t.Run("should not persist patterns below threshold", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}
		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   2.0,
				persistenceGranularity: time.Hour,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
			0.99,
		)
		require.NoError(t, err)

		// Create samples that are below the threshold (1 sample per second)
		baseTime := model.TimeFromUnixNano(time.Now().Add(-2 * time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime, Value: 5},
			{Timestamp: baseTime.Add(10 * time.Second), Value: 5},
		}

		// Rate: 10 samples / 10 seconds = 1.0 samples/second (below the 2.0 threshold)
		// Should NOT persist the pattern
		stream.writePatternsBucketed(samples, lbs, "test pattern", constants.LogLevelUnknown)
		mockWriter.AssertNotCalled(t, "WriteEntry")
		mockWriter.AssertExpectations(t)
	})

	t.Run("should persist patterns exactly at threshold", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}
		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   1.0,
				persistenceGranularity: time.Hour,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
			0.99,
		)
		require.NoError(t, err)

		// Create samples that exactly meet the threshold (1 sample per second)
		baseTime := model.TimeFromUnixNano(time.Now().Add(-2 * time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime, Value: 5},
			{Timestamp: baseTime.Add(10 * time.Second), Value: 5},
		}

		// Rate: 10 samples / 10 seconds = 1.0 samples/second (exactly at the 1.0 threshold)
		// Should persist the pattern
		mockWriter.On("WriteEntry",
			mock.MatchedBy(func(_ time.Time) bool { return true }),
			mock.MatchedBy(func(_ string) bool { return true }),
			labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
			},
		).Once()

		stream.writePatternsBucketed(samples, lbs, "test pattern", constants.LogLevelUnknown)
		mockWriter.AssertExpectations(t)
	})
}
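
// TestFilterClustersByVolumeEdgeCases probes filterClustersByVolume with empty
// input, a single cluster, equal and zero volumes, and extreme threshold
// values at both ends of the range.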
func TestFilterClustersByVolumeEdgeCases(t *testing.T) {
	t.Run("should handle empty cluster list", func(t *testing.T) {
		clusters := []clusterWithMeta{}
		result := filterClustersByVolume(clusters, 0.9)
		require.Equal(t, 0, len(result), "Empty list should return 0")
	})

	t.Run("should keep single cluster regardless of threshold", func(t *testing.T) {
		cluster := &drain.LogCluster{
			Tokens:      []string{"test", "pattern"},
			Volume:      100,
			SampleCount: 10,
		}

		// Test with various thresholds
		clusters := []clusterWithMeta{{cluster: cluster}}
		result := filterClustersByVolume(clusters, 0.1)
		require.Equal(t, 1, len(result), "Single cluster should always be kept")
		require.Equal(t, cluster, result[0].cluster)

		clusters = []clusterWithMeta{{cluster: cluster}}
		result = filterClustersByVolume(clusters, 0.5)
		require.Equal(t, 1, len(result), "Single cluster should always be kept")

		clusters = []clusterWithMeta{{cluster: cluster}}
		result = filterClustersByVolume(clusters, 0.99)
		require.Equal(t, 1, len(result), "Single cluster should always be kept")
	})

	t.Run("should handle all clusters with equal volume", func(t *testing.T) {
		clusters := []clusterWithMeta{
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "1"}, Volume: 100, SampleCount: 10}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "2"}, Volume: 100, SampleCount: 10}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "3"}, Volume: 100, SampleCount: 10}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "4"}, Volume: 100, SampleCount: 10}},
		}

		// With 50% threshold and equal volumes, should keep 2 clusters
		result := filterClustersByVolume(clusters, 0.5)
		require.Equal(t, 2, len(result), "Should keep exactly 2 clusters for 50% threshold")

		// Reset for next test
		clusters = []clusterWithMeta{
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "1"}, Volume: 100, SampleCount: 10}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "2"}, Volume: 100, SampleCount: 10}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "3"}, Volume: 100, SampleCount: 10}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "4"}, Volume: 100, SampleCount: 10}},
		}

		// With 75% threshold and equal volumes, should keep 3 clusters
		result = filterClustersByVolume(clusters, 0.75)
		require.Equal(t, 3, len(result), "Should keep exactly 3 clusters for 75% threshold")
	})

	t.Run("should handle zero volume clusters", func(t *testing.T) {
		clusters := []clusterWithMeta{
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "1"}, Volume: 0, SampleCount: 0}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "2"}, Volume: 0, SampleCount: 0}},
		}
		result := filterClustersByVolume(clusters, 0.9)
		require.Equal(t, 0, len(result), "Zero-volume clusters should return empty when total volume is 0")
	})

	t.Run("should handle threshold of 0", func(t *testing.T) {
		clusters := []clusterWithMeta{
			{cluster: &drain.LogCluster{Tokens: []string{"high", "volume"}, Volume: 500, SampleCount: 50}},
			{cluster: &drain.LogCluster{Tokens: []string{"low", "volume"}, Volume: 100, SampleCount: 10}},
		}

		// Threshold of 0 means keep nothing (0% of volume)
		result := filterClustersByVolume(clusters, 0)
		require.Equal(t, 0, len(result), "Threshold of 0 should keep no clusters")
	})

	t.Run("should handle threshold of 1", func(t *testing.T) {
		clusters := []clusterWithMeta{
			{cluster: &drain.LogCluster{Tokens: []string{"high", "volume"}, Volume: 500, SampleCount: 50}},
			{cluster: &drain.LogCluster{Tokens: []string{"medium", "volume"}, Volume: 300, SampleCount: 30}},
			{cluster: &drain.LogCluster{Tokens: []string{"low", "volume"}, Volume: 100, SampleCount: 10}},
		}

		// Threshold of 1 means keep everything (100% of volume)
		result := filterClustersByVolume(clusters, 1.0)
		require.Equal(t, 3, len(result), "Threshold of 1 should keep all clusters")
	})

	t.Run("should handle very small threshold values", func(t *testing.T) {
		clusters := []clusterWithMeta{
			{cluster: &drain.LogCluster{Tokens: []string{"huge", "volume"}, Volume: 10000, SampleCount: 1000}},
			{cluster: &drain.LogCluster{Tokens: []string{"tiny", "volume"}, Volume: 1, SampleCount: 1}},
		}

		// Even a very small threshold should include the huge volume cluster
		result := filterClustersByVolume(clusters, 0.0001)
		require.Equal(t, 1, len(result), "Very small threshold should still include highest volume cluster")
		require.Equal(t, int64(10000), result[0].cluster.Volume)
	})

	t.Run("should maintain stability when clusters have same volume", func(t *testing.T) {
		// When clusters have the same volume, the sort is stable and
		// filterClustersByVolume should include all clusters until the threshold is met
		clusters := []clusterWithMeta{
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "A"}, Volume: 200, SampleCount: 20}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "B"}, Volume: 200, SampleCount: 20}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "C"}, Volume: 100, SampleCount: 10}},
		}

		// 40% threshold: should include first cluster (200/500 = 40%)
		result := filterClustersByVolume(clusters, 0.4)
		require.Equal(t, 1, len(result))
		require.Equal(t, int64(200), result[0].cluster.Volume)

		// Reset for next test
		clusters = []clusterWithMeta{
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "A"}, Volume: 200, SampleCount: 20}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "B"}, Volume: 200, SampleCount: 20}},
			{cluster: &drain.LogCluster{Tokens: []string{"pattern", "C"}, Volume: 100, SampleCount: 10}},
		}

		// 80% threshold: should include both 200-volume clusters (400/500 = 80%)
		result = filterClustersByVolume(clusters, 0.8)
		require.Equal(t, 2, len(result))
		for _, r := range result {
			require.Equal(t, int64(200), r.cluster.Volume)
		}
	})
}