package pattern

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/pattern/aggregation"
	"github.com/grafana/loki/v3/pkg/pattern/drain"
	"github.com/grafana/loki/v3/pkg/pattern/iter"
	"github.com/grafana/loki/v3/pkg/util/constants"

	"github.com/grafana/loki/pkg/push"
)
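// NOTE: the tests below rely on package-level test helpers (fakeLimits,
// mockEntryWriter, fakeRingClient) defined elsewhere in this package. A
// minimal sketch of the shapes these tests assume -- inferred from usage
// here, not the canonical definitions:
//
//	type fakeLimits struct {
//		patternRateThreshold   float64       // min samples/sec before a pattern is persisted
//		persistenceGranularity time.Duration // bucket size used when flushing patterns on prune
//	}
//
//	// mockEntryWriter embeds mock.Mock and satisfies aggregation.EntryWriter.
//	type mockEntryWriter struct{ mock.Mock }
//
//	func (m *mockEntryWriter) WriteEntry(ts time.Time, e string, lbls labels.Labels, md []logproto.LabelAdapter) {
//		m.Called(ts, e, lbls, md)
//	}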
func TestAddStream(t *testing.T) {
	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
	mockWriter := &mockEntryWriter{}

	stream, err := newStream(
		model.Fingerprint(labels.StableHash(lbs)),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{
			patternRateThreshold:   1.0,
			persistenceGranularity: time.Hour,
		},
		mockWriter,
		aggregation.NewMetrics(nil),
	)
	require.NoError(t, err)

	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: time.Unix(20, 0),
			Line:      "ts=1 msg=hello",
		},
		{
			Timestamp: time.Unix(20, 0),
			Line:      "ts=2 msg=hello",
		},
		{
			Timestamp: time.Unix(10, 0),
			Line:      "ts=3 msg=hello", // this should be ignored because it's older than the last entry
		},
	})
	require.NoError(t, err)

	it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second))
	require.NoError(t, err)
	res, err := iter.ReadAll(it)
	require.NoError(t, err)
	require.Equal(t, 1, len(res.Series))
	require.Equal(t, int64(2), res.Series[0].Samples[0].Value)
}

func TestPruneStream(t *testing.T) {
	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
	mockWriter := &mockEntryWriter{}
	mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

	stream, err := newStream(
		model.Fingerprint(labels.StableHash(lbs)),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{patternRateThreshold: 1.0, persistenceGranularity: time.Hour},
		mockWriter,
		aggregation.NewMetrics(nil),
	)
	require.NoError(t, err)

	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: time.Unix(20, 0),
			Line:      "ts=1 msg=hello",
		},
		{
			Timestamp: time.Unix(20, 0),
			Line:      "ts=2 msg=hello",
		},
	})
	require.NoError(t, err)
	require.Equal(t, true, stream.prune(time.Hour))

	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: time.Now(),
			Line:      "ts=1 msg=hello",
		},
	})
	require.NoError(t, err)
	require.Equal(t, false, stream.prune(time.Hour))

	it, err := stream.Iterator(context.Background(), model.Earliest, model.Latest, model.Time(time.Second))
	require.NoError(t, err)
	res, err := iter.ReadAll(it)
	require.NoError(t, err)
	require.Equal(t, 1, len(res.Series))
	require.Equal(t, int64(1), res.Series[0].Samples[0].Value)
}

func TestStreamPatternPersistenceOnPrune(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)
	mockWriter := &mockEntryWriter{}

	stream, err := newStream(
		model.Fingerprint(lbs.Hash()),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{
			patternRateThreshold:   0.000001, // set a very low threshold to ensure samples are written despite low-frequency test data
			persistenceGranularity: time.Hour,
		},
		mockWriter,
		aggregation.NewMetrics(nil),
	)
	require.NoError(t, err)

	// Push entries with old timestamps that will be pruned
	now := drain.TruncateTimestamp(model.TimeFromUnixNano(time.Now().UnixNano()), drain.TimeResolution).Time()
	oldTime := now.Add(-2 * time.Hour)
	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: oldTime,
			Line:      "ts=1 msg=hello",
		},
		{
			Timestamp: oldTime.Add(time.Minute),
			Line:      "ts=2 msg=hello",
		},
		{
			Timestamp: oldTime.Add(5 * time.Minute),
			Line:      "ts=3 msg=hello",
		},
	})
	require.NoError(t, err)

	// Push a newer entry to ensure the stream isn't completely pruned
	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: now,
			Line:      "ts=4 msg=hello",
		},
	})
	require.NoError(t, err)

	// With bucketed aggregation using the chunk duration (1 hour), all old
	// entries fall into one bucket whose timestamp is aligned to the hour boundary.
	bucketTime := time.Date(oldTime.Year(), oldTime.Month(), oldTime.Day(), oldTime.Hour(), 0, 0, 0, oldTime.Location())
	mockWriter.On("WriteEntry",
		bucketTime,
		aggregation.PatternEntry(bucketTime, 3, "ts=<_> msg=hello", lbs),
		labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
		[]logproto.LabelAdapter{
			{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
		},
	)

	// Prune old data - this should trigger pattern writing
	isEmpty := stream.prune(time.Hour)
	require.False(t, isEmpty) // Stream should not be empty due to the newer entry

	// Verify the pattern was written
	mockWriter.AssertExpectations(t)
}
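// NOTE: TestStreamPatternPersistenceOnPrune aligns the expected bucket
// timestamp to a wall-clock hour boundary with time.Date rather than
// oldTime.Truncate(time.Hour). time.Time.Truncate rounds relative to the
// absolute (UTC) epoch, so in locations whose UTC offset is not a whole
// number of hours it can disagree with the local hour boundary -- presumably
// why the explicit construction is used here.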
func TestStreamPersistenceGranularityMultipleEntries(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)
	mockWriter := &mockEntryWriter{}

	stream, err := newStream(
		model.Fingerprint(lbs.Hash()),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{patternRateThreshold: 1.0, persistenceGranularity: 15 * time.Minute},
		mockWriter,
		aggregation.NewMetrics(nil),
	)
	require.NoError(t, err)

	// Push entries across a 1-hour span that will be pruned
	now := drain.TruncateTimestamp(model.TimeFromUnixNano(time.Now().UnixNano()), drain.TimeResolution).Time()
	baseTime := now.Add(-2 * time.Hour)

	// Push 12 entries across 60 minutes (5 minutes apart)
	entries := []push.Entry{}
	for i := 0; i < 12; i++ {
		entries = append(entries, push.Entry{
			Timestamp: baseTime.Add(time.Duration(i*5) * time.Minute),
			Line:      "ts=1 msg=hello",
		})
	}
	err = stream.Push(context.Background(), entries)
	require.NoError(t, err)

	// Push a newer entry to ensure the stream isn't completely pruned
	err = stream.Push(context.Background(), []push.Entry{
		{
			Timestamp: now,
			Line:      "ts=2 msg=hello",
		},
	})
	require.NoError(t, err)

	// With 15-minute persistence granularity and a 1-hour chunk we expect four
	// pattern entries:
	//   Bucket 1: baseTime + 0-15min  (3 entries)
	//   Bucket 2: baseTime + 15-30min (3 entries)
	//   Bucket 3: baseTime + 30-45min (3 entries)
	//   Bucket 4: baseTime + 45-60min (3 entries)
	// The exact call count depends on bucket alignment, so accept any number of calls.
	mockWriter.On("WriteEntry",
		mock.MatchedBy(func(_ time.Time) bool { return true }), // Accept any timestamp
		mock.MatchedBy(func(_ string) bool { return true }),    // PatternEntry with some count
		labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
		[]logproto.LabelAdapter{
			{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
		},
	).Maybe() // Allow multiple calls as bucketing may vary

	// Prune old data - this should trigger pattern writing with multiple entries
	isEmpty := stream.prune(time.Hour)
	require.False(t, isEmpty) // Stream should not be empty due to the newer entry

	// Verify the patterns were written
	mockWriter.AssertExpectations(t)
}

func TestStreamPersistenceGranularityEdgeCases(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)

	t.Run("empty buckets should not write patterns", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}

		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{patternRateThreshold: 1.0, persistenceGranularity: 30 * time.Minute},
			mockWriter,
			aggregation.NewMetrics(nil),
		)
		require.NoError(t, err)

		// Push a newer entry to ensure the stream isn't completely pruned
		now := drain.TruncateTimestamp(model.TimeFromUnixNano(time.Now().UnixNano()), drain.TimeResolution).Time()
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: now,
				Line:      "ts=1 msg=hello",
			},
		})
		require.NoError(t, err)

		// No old entries, so no patterns should be written
		mockWriter.AssertNotCalled(t, "WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

		// Prune - should not write any patterns
		isEmpty := stream.prune(time.Hour)
		require.False(t, isEmpty) // Stream should not be empty

		mockWriter.AssertExpectations(t)
	})

	t.Run("two samples should write a pattern if above threshold", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}

		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   0.0001,
				persistenceGranularity: 15 * time.Minute,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
		)
		require.NoError(t, err)

		now := drain.TruncateTimestamp(model.TimeFromUnixNano(time.Now().UnixNano()), drain.TimeResolution).Time()
		baseTime := now.Add(-2 * time.Hour)

		// Push two old entries
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: baseTime,
				Line:      "ts=1 msg=hello",
			},
		})
		require.NoError(t, err)
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: baseTime.Add(time.Minute),
				Line:      "ts=1 msg=hello",
			},
		})
		require.NoError(t, err)

		// Push newer entry to keep stream alive
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: now,
				Line:      "ts=2 msg=hello",
			},
		})
		require.NoError(t, err)

		// Expect exactly one pattern entry (bucketed timestamp may differ from baseTime)
		mockWriter.On("WriteEntry",
			mock.MatchedBy(func(_ time.Time) bool { return true }), // Accept any timestamp
			mock.MatchedBy(func(_ string) bool { return true }),
			labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
			},
		).Once()

		isEmpty := stream.prune(time.Hour)
		require.False(t, isEmpty)

		mockWriter.AssertExpectations(t)
	})

	t.Run("granularity larger than max chunk age should write one pattern", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}

		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   1.0,
				persistenceGranularity: 2 * time.Hour,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
		)
		require.NoError(t, err)

		now := drain.TruncateTimestamp(model.TimeFromUnixNano(time.Now().UnixNano()), drain.TimeResolution).Time()
		baseTime := now.Add(-2 * time.Hour)

		// Push multiple old entries across the hour
		entries := []push.Entry{}
		for i := range 6 {
			entries = append(entries, push.Entry{
				Timestamp: baseTime.Add(time.Duration(i*10) * time.Minute),
				Line:      "ts=1 msg=hello",
			})
		}
		err = stream.Push(context.Background(), entries)
		require.NoError(t, err)

		// Push newer entry to keep stream alive (same pattern)
		err = stream.Push(context.Background(), []push.Entry{
			{
				Timestamp: now,
				Line:      "ts=1 msg=hello",
			},
		})
		require.NoError(t, err)

		// With 2-hour granularity, may write patterns for pruned data (flexible expectation)
		mockWriter.On("WriteEntry",
			mock.MatchedBy(func(_ time.Time) bool { return true }),
			mock.MatchedBy(func(_ string) bool { return true }),
			labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
			},
		).Maybe() // Allow 0 or more calls since behavior depends on bucket alignment

		isEmpty := stream.prune(time.Hour)
		require.False(t, isEmpty)

		mockWriter.AssertExpectations(t)
	})
}
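// Taken together, the cases above pin down the assumed flush-on-prune
// behavior: pruned samples are grouped into persistenceGranularity-sized
// buckets, one aggregated pattern entry is written per non-empty bucket
// (subject to the rate threshold), and nothing is written when no samples
// were pruned.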
func newInstanceWithLimits(
	instanceID string,
	logger log.Logger,
	metrics *ingesterMetrics,
	drainCfg *drain.Config,
	drainLimits Limits,
	ringClient RingClient,
	ingesterID string,
	metricWriter aggregation.EntryWriter,
	patternWriter aggregation.EntryWriter,
	aggregationMetrics *aggregation.Metrics,
) (*instance, error) {
	return newInstance(instanceID, logger, metrics, drainCfg, drainLimits, ringClient, ingesterID, metricWriter, patternWriter, aggregationMetrics)
}

func TestStreamPerTenantConfigurationThreading(t *testing.T) {
	t.Run("should use per-tenant persistence granularity when creating streams", func(t *testing.T) {
		// This test will verify that when a stream is created through the instance,
		// it uses the per-tenant persistence granularity from limits instead of global config
		lbs := labels.New(
			labels.Label{Name: "test", Value: "test"},
			labels.Label{Name: "service_name", Value: "test_service"},
		)
		mockWriter := &mockEntryWriter{}
		mockWriter.On("GetMetrics").Return(nil)

		// Create limits with per-tenant override of 10 minutes
		limits := &fakeLimits{
			patternRateThreshold:   1.0,
			persistenceGranularity: 10 * time.Minute,
		}

		// Create instance with global default of 1 hour in drainCfg
		drainCfg := drain.DefaultConfig()
		drainCfg.MaxChunkAge = 1 * time.Hour

		inst, err := newInstanceWithLimits(
			"test",
			log.NewNopLogger(),
			newIngesterMetrics(nil, "test"),
			drainCfg,
			limits,
			&fakeRingClient{},
			"test-ingester",
			mockWriter,
			mockWriter,
			aggregation.NewMetrics(nil),
		)
		require.NoError(t, err)

		// Create a stream through the instance (simulating the real flow)
		ctx := context.Background()
		stream, err := inst.createStream(ctx, logproto.Stream{
			Labels: lbs.String(),
			Entries: []logproto.Entry{
				{
					Timestamp: time.Now(),
					Line:      "test log line",
				},
			},
		})
		require.NoError(t, err)

		// Verify the stream was created with the per-tenant persistence granularity
		require.Equal(t, 10*time.Minute, stream.persistenceGranularity)
	})
}
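// The subtests below pin down the rate computation assumed of
// stream.calculatePatternRate -- a sketch inferred from the expectations, not
// the canonical implementation:
//
//	total := sum of sample Values
//	span  := (samples[last].Timestamp - samples[0].Timestamp) in seconds
//	rate  := total / span // 0.0 when there are fewer than two samples
//
// e.g. values 10+10+10 spanning 10s => 30 / 10 = 3.0 samples/second.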
func TestStreamCalculatePatternRate(t *testing.T) {
	lbs := labels.New(labels.Label{Name: "test", Value: "test"})
	mockWriter := &mockEntryWriter{}

	stream, err := newStream(
		model.Fingerprint(lbs.Hash()),
		lbs,
		newIngesterMetrics(nil, "test"),
		log.NewNopLogger(),
		drain.FormatUnknown,
		"123",
		drain.DefaultConfig(),
		&fakeLimits{
			patternRateThreshold:   0.0,
			persistenceGranularity: time.Hour,
		},
		mockWriter,
		aggregation.NewMetrics(nil),
	)
	require.NoError(t, err)

	t.Run("should calculate correct rate for multiple samples", func(t *testing.T) {
		// Create samples spanning 10 seconds with a total count of 30
		baseTime := model.TimeFromUnixNano(time.Now().Add(-time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime.Add(0 * time.Second), Value: 10},
			{Timestamp: baseTime.Add(5 * time.Second), Value: 10},
			{Timestamp: baseTime.Add(10 * time.Second), Value: 10},
		}

		// Expected rate: 30 samples / 10 seconds = 3.0 samples/second
		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 3.0, rate)
	})

	t.Run("should calculate correct rate for multiple samples over an hour", func(t *testing.T) {
		// Create samples spanning 1 hour with a total count of 72
		baseTime := model.TimeFromUnixNano(time.Now().Add(-time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime.Add(0 * time.Second), Value: 10},
			{Timestamp: baseTime.Add(5 * time.Minute), Value: 20},
			{Timestamp: baseTime.Add(30 * time.Minute), Value: 30},
			{Timestamp: baseTime.Add(time.Hour), Value: 12},
		}

		// Expected rate: 72 samples / 3600 seconds = 0.02 samples/second
		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 0.02, rate)
	})

	t.Run("should handle single sample", func(t *testing.T) {
		// A single sample should return a 0 rate (no time span)
		baseTime := model.TimeFromUnixNano(time.Now().Add(-time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime, Value: 10},
		}

		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 0.0, rate)
	})

	t.Run("should handle empty samples", func(t *testing.T) {
		// Empty samples should return a 0 rate
		samples := []*logproto.PatternSample{}

		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 0.0, rate)
	})

	t.Run("should handle fractional rates", func(t *testing.T) {
		// Create samples spanning 1 minute with a total count of 6
		baseTime := model.TimeFromUnixNano(time.Now().Add(-time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime.Add(0 * time.Second), Value: 2},
			{Timestamp: baseTime.Add(60 * time.Second), Value: 4},
		}

		// Expected rate: 6 samples / 60 seconds = 0.1 samples/second
		rate := stream.calculatePatternRate(samples)
		require.Equal(t, 0.1, rate)
	})
}
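// writePatternsBucketed is assumed to gate persistence on this computed rate:
// buckets whose rate is at or above patternRateThreshold are written via
// WriteEntry, slower ones are dropped. The three cases below exercise rates
// above, below, and exactly at the threshold.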
func TestStreamPatternRateThresholdGating(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)

	t.Run("should persist patterns above threshold", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}

		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   1.0,
				persistenceGranularity: time.Hour,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
		)
		require.NoError(t, err)

		// Create samples that exceed the threshold (6 samples per second)
		baseTime := model.TimeFromUnixNano(time.Now().Add(-2 * time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime, Value: 15},
			{Timestamp: baseTime.Add(5 * time.Second), Value: 15},
		}

		// Rate: 30 samples / 5 seconds = 6.0 samples/second (above the 1.0 threshold)
		// Should persist the pattern
		mockWriter.On("WriteEntry",
			mock.MatchedBy(func(_ time.Time) bool { return true }),
			mock.MatchedBy(func(_ string) bool { return true }),
			labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
			},
		).Once()

		stream.writePatternsBucketed(samples, lbs, "test pattern", constants.LogLevelUnknown)

		mockWriter.AssertExpectations(t)
	})

	t.Run("should not persist patterns below threshold", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}

		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   2.0,
				persistenceGranularity: time.Hour,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
		)
		require.NoError(t, err)

		// Create samples that are below the threshold (1 sample per second)
		baseTime := model.TimeFromUnixNano(time.Now().Add(-2 * time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime, Value: 5},
			{Timestamp: baseTime.Add(10 * time.Second), Value: 5},
		}

		// Rate: 10 samples / 10 seconds = 1.0 samples/second (below the 2.0 threshold)
		// Should NOT persist the pattern
		mockWriter.AssertNotCalled(t, "WriteEntry")

		stream.writePatternsBucketed(samples, lbs, "test pattern", constants.LogLevelUnknown)

		mockWriter.AssertExpectations(t)
	})
	t.Run("should persist patterns exactly at threshold", func(t *testing.T) {
		mockWriter := &mockEntryWriter{}

		stream, err := newStream(
			model.Fingerprint(lbs.Hash()),
			lbs,
			newIngesterMetrics(nil, "test"),
			log.NewNopLogger(),
			drain.FormatUnknown,
			"123",
			drain.DefaultConfig(),
			&fakeLimits{
				patternRateThreshold:   1.0,
				persistenceGranularity: time.Hour,
			},
			mockWriter,
			aggregation.NewMetrics(nil),
		)
		require.NoError(t, err)

		// Create samples that exactly meet the threshold (1 sample per second)
		baseTime := model.TimeFromUnixNano(time.Now().Add(-2 * time.Hour).UnixNano())
		samples := []*logproto.PatternSample{
			{Timestamp: baseTime, Value: 5},
			{Timestamp: baseTime.Add(10 * time.Second), Value: 5},
		}

		// Rate: 10 samples / 10 seconds = 1.0 samples/second (exactly at the 1.0 threshold)
		// Should persist the pattern
		mockWriter.On("WriteEntry",
			mock.MatchedBy(func(_ time.Time) bool { return true }),
			mock.MatchedBy(func(_ string) bool { return true }),
			labels.New(labels.Label{Name: constants.PatternLabel, Value: "test_service"}),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelUnknown},
			},
		).Once()

		stream.writePatternsBucketed(samples, lbs, "test pattern", constants.LogLevelUnknown)

		mockWriter.AssertExpectations(t)
	})
}