Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/pattern/ingester_test.go

925 lines
24 KiB

package pattern
import (
"context"
"flag"
"fmt"
"math"
"math/rand"
"strings"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/ring"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/pattern/aggregation"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/pkg/push"
)
// TestInstancePushQuery pushes three kinds of log lines for one stream into a
// fresh instance and checks that querying that stream yields exactly the three
// expected patterns.
func TestInstancePushQuery(t *testing.T) {
	const ingesterID = "foo"

	streamLabels := labels.FromStrings("test", "test", "service_name", "test_service")
	start := drain.TruncateTimestamp(model.Now(), drain.TimeResolution)

	ringMock := &fakeRing{}
	ringMock.
		On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(ring.ReplicationSet{
			Instances: []ring.InstanceDesc{
				{Id: ingesterID, Addr: "ingester0"},
				{Id: "bar", Addr: "ingester1"},
				{Id: "baz", Addr: "ingester2"},
			},
		}, nil)

	writer := &mockEntryWriter{}
	writer.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

	inst, err := newInstance(
		"foo",
		log.NewNopLogger(),
		newIngesterMetrics(nil, "test"),
		drain.DefaultConfig(),
		&fakeLimits{},
		&fakeRingClient{ring: ringMock},
		ingesterID,
		writer,
		writer,
		aggregation.NewMetrics(nil),
		0.99,
	)
	require.NoError(t, err)

	// pushLine sends one entry on the test stream and fails the test as soon
	// as the instance returns an error.
	pushLine := func(entry push.Entry) {
		t.Helper()
		pushErr := inst.Push(context.Background(), &push.PushRequest{
			Streams: []push.Stream{{
				Labels:  streamLabels.String(),
				Entries: []push.Entry{entry},
			}},
		})
		require.NoError(t, pushErr)
	}

	// infoLevel returns a fresh structured-metadata slice on every call so
	// entries never share a backing array.
	infoLevel := func() push.LabelsAdapter {
		return push.LabelsAdapter{
			push.LabelAdapter{Name: constants.LevelLabel, Value: constants.LogLevelInfo},
		}
	}

	pushLine(push.Entry{
		Timestamp:          start.Time(),
		Line:               "ts=1 msg=hello",
		StructuredMetadata: infoLevel(),
	})

	// 31 lines, one second apart, alternating between foo=bar and foo=baz with
	// a random numeric suffix.
	for n := 0; n <= 30; n++ {
		value := "bar"
		if n%2 == 1 {
			value = "baz"
		}
		pushLine(push.Entry{
			Timestamp: start.Add(time.Duration(n) * time.Second).Time(),
			Line:      fmt.Sprintf("foo=%s num=%d", value, rand.Int()),
		})
	}

	pushLine(push.Entry{
		Timestamp:          start.Add(1 * time.Minute).Time(),
		Line:               "ts=2 msg=hello",
		StructuredMetadata: infoLevel(),
	})

	patternIter, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{
		Query: `{test="test"}`,
		Start: time.Unix(0, 0),
		End:   time.Unix(0, math.MaxInt64),
	})
	require.NoError(t, err)

	result, err := iter.ReadAll(patternIter)
	require.NoError(t, err)
	require.Equal(t, 3, len(result.Series))

	got := make([]string, 0, 3)
	for _, series := range result.Series {
		got = append(got, series.Pattern)
	}
	require.ElementsMatch(t, []string{
		"foo=bar num=<_>",
		"foo=baz num=<_>",
		"ts=<_> msg=hello",
	}, got)
}
// TestInstancePushAggregateMetrics checks the aggregated-metrics bookkeeping of
// an instance: pushes must accumulate bytes and entry counts per stream and
// level, and Downsample must hand one aggregated entry per stream/level pair to
// the writer and leave aggMetricsByStreamAndLevel empty.
func TestInstancePushAggregateMetrics(t *testing.T) {
	lbs := labels.New(
		labels.Label{Name: "test", Value: "test"},
		labels.Label{Name: "service_name", Value: "test_service"},
	)
	lbs2 := labels.New(
		labels.Label{Name: "foo", Value: "bar"},
		labels.Label{Name: "service_name", Value: "foo_service"},
	)
	lbs3 := labels.New(
		labels.Label{Name: "foo", Value: "baz"},
		labels.Label{Name: "service_name", Value: "baz_service"},
	)

	// streamWithEntry builds a stream holding a single entry whose structured
	// metadata carries the given level.
	streamWithEntry := func(lbls labels.Labels, ts time.Time, line, level string) push.Stream {
		return push.Stream{
			Labels: lbls.String(),
			Entries: []push.Entry{
				{
					Timestamp: ts,
					Line:      line,
					StructuredMetadata: push.LabelsAdapter{
						push.LabelAdapter{
							Name:  constants.LevelLabel,
							Value: level,
						},
					},
				},
			},
		}
	}

	// setup creates an instance and pushes the fixture data into it. It takes
	// the *testing.T of the calling (sub)test so that a failed requirement
	// stops that test from its own goroutine rather than the parent's.
	setup := func(t *testing.T, now time.Time) (*instance, *mockEntryWriter) {
		t.Helper()

		ingesterID := "foo"
		replicationSet := ring.ReplicationSet{
			Instances: []ring.InstanceDesc{
				{Id: ingesterID, Addr: "ingester0"},
				{Id: "bar", Addr: "ingester1"},
				{Id: "baz", Addr: "ingester2"},
			},
		}

		fakeRing := &fakeRing{}
		fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
			Return(replicationSet, nil)

		ringClient := &fakeRingClient{
			ring: fakeRing,
		}

		mockWriter := &mockEntryWriter{}
		mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

		inst, err := newInstance(
			"foo",
			log.NewNopLogger(),
			newIngesterMetrics(nil, "test"),
			drain.DefaultConfig(),
			&fakeLimits{},
			ringClient,
			ingesterID,
			mockWriter,
			mockWriter,
			aggregation.NewMetrics(nil),
			0.99,
		)
		require.NoError(t, err)

		// One 14-byte line each for lbs and lbs2, and one 17-byte line for lbs3.
		err = inst.Push(context.Background(), &push.PushRequest{
			Streams: []push.Stream{
				streamWithEntry(lbs, now.Add(-1*time.Minute), "ts=1 msg=hello", "info"),
				streamWithEntry(lbs2, now.Add(-1*time.Minute), fmt.Sprintf("ts=%d msg=hello", rand.Intn(9)), "error"),
				streamWithEntry(lbs3, now.Add(-1*time.Minute), "error error error", "error"),
			},
		})
		// Check this push on its own; the loop below reuses err and would
		// otherwise hide a failure here.
		require.NoError(t, err)

		// Thirty more 15-byte lines for lbs and lbs2, stepping back one second
		// per iteration.
		for i := 0; i < 30; i++ {
			ts := now.Add(-1 * time.Duration(i) * time.Second)
			err = inst.Push(context.Background(), &push.PushRequest{
				Streams: []push.Stream{
					streamWithEntry(lbs, ts, "foo=bar baz=qux", "info"),
					streamWithEntry(lbs2, ts, "foo=bar baz=qux", "error"),
				},
			})
			require.NoError(t, err)
		}

		return inst, mockWriter
	}

	t.Run("accumulates bytes and count for each stream and level on every push", func(t *testing.T) {
		now := time.Now()
		inst, _ := setup(t, now)

		require.Len(t, inst.aggMetricsByStreamAndLevel, 3)

		require.Equal(t, uint64(14+(15*30)), inst.aggMetricsByStreamAndLevel[lbs.String()]["info"].bytes)
		require.Equal(t, uint64(14+(15*30)), inst.aggMetricsByStreamAndLevel[lbs2.String()]["error"].bytes)
		require.Equal(t, uint64(17), inst.aggMetricsByStreamAndLevel[lbs3.String()]["error"].bytes)

		require.Equal(
			t,
			uint64(31),
			inst.aggMetricsByStreamAndLevel[lbs.String()]["info"].count,
		)
		require.Equal(
			t,
			uint64(31),
			inst.aggMetricsByStreamAndLevel[lbs2.String()]["error"].count,
		)
		require.Equal(
			t,
			uint64(1),
			inst.aggMetricsByStreamAndLevel[lbs3.String()]["error"].count,
		)
	})

	t.Run("downsamples aggregated metrics", func(t *testing.T) {
		now := model.Now()
		inst, mockWriter := setup(t, now.Time())
		inst.Downsample(now)

		mockWriter.AssertCalled(
			t,
			"WriteEntry",
			now.Time(),
			aggregation.AggregatedMetricEntry(
				now,
				uint64(14+(15*30)),
				uint64(31),
				lbs,
			),
			labels.New(
				labels.Label{Name: constants.AggregatedMetricLabel, Value: "test_service"},
			),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelInfo},
			},
		)

		mockWriter.AssertCalled(
			t,
			"WriteEntry",
			now.Time(),
			aggregation.AggregatedMetricEntry(
				now,
				uint64(14+(15*30)),
				uint64(31),
				lbs2,
			),
			labels.New(
				labels.Label{Name: constants.AggregatedMetricLabel, Value: "foo_service"},
			),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelError},
			},
		)

		mockWriter.AssertCalled(
			t,
			"WriteEntry",
			now.Time(),
			aggregation.AggregatedMetricEntry(
				now,
				uint64(17),
				uint64(1),
				lbs3,
			),
			labels.New(
				labels.Label{Name: constants.AggregatedMetricLabel, Value: "baz_service"},
			),
			[]logproto.LabelAdapter{
				{Name: constants.LevelLabel, Value: constants.LogLevelError},
			},
		)

		// Downsample is expected to reset the accumulated metrics.
		require.Equal(t, 0, len(inst.aggMetricsByStreamAndLevel))
	})
}
// mockEntryWriter is a testify mock that the tests in this file pass to
// newInstance as its writers; it records every call made to it.
type mockEntryWriter struct {
	mock.Mock
}

// WriteEntry records the call and all of its arguments on the embedded mock.
func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels, structuredMetadata []logproto.LabelAdapter) {
	m.Called(ts, entry, lbls, structuredMetadata)
}

// Stop records the call on the embedded mock.
func (m *mockEntryWriter) Stop() {
	m.Called()
}
// fakeLimits is a fixed-value limits implementation for tests. Every method
// ignores the tenant argument and returns the corresponding field, except for
// the tokenizable JSON fields, which are a hard-coded list.
type fakeLimits struct {
	metricAggregationEnabled  bool
	patternPersistenceEnabled bool
	persistenceGranularity    time.Duration
	patternRateThreshold      float64
}

// Compile-time checks that *fakeLimits satisfies both limits interfaces.
var (
	_ drain.Limits = (*fakeLimits)(nil)
	_ Limits       = (*fakeLimits)(nil)
)

// PatternIngesterTokenizableJSONFields returns a fixed list of JSON field names.
func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
	return []string{"log", "message", "msg", "msg_", "_msg", "content"}
}

// MetricAggregationEnabled returns the configured metricAggregationEnabled flag.
func (f *fakeLimits) MetricAggregationEnabled(_ string) bool {
	return f.metricAggregationEnabled
}

// PatternPersistenceEnabled returns the configured patternPersistenceEnabled flag.
func (f *fakeLimits) PatternPersistenceEnabled(_ string) bool {
	return f.patternPersistenceEnabled
}

// PersistenceGranularity returns the configured persistenceGranularity.
func (f *fakeLimits) PersistenceGranularity(_ string) time.Duration {
	return f.persistenceGranularity
}

// PatternRateThreshold returns the configured patternRateThreshold.
func (f *fakeLimits) PatternRateThreshold(_ string) float64 {
	return f.patternRateThreshold
}
// TestIngesterShutdownFlush checks that, with pattern persistence enabled,
// pushing entries alone does not call WriteEntry on the writer, while
// flushPatterns does.
func TestIngesterShutdownFlush(t *testing.T) {
	const ingesterID = "foo"

	streamLabels := labels.FromStrings("test", "test", "service_name", "test_service")
	current := model.Now()

	ringMock := &fakeRing{}
	ringMock.
		On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(ring.ReplicationSet{
			Instances: []ring.InstanceDesc{
				{Id: ingesterID, Addr: "ingester0"},
				{Id: "bar", Addr: "ingester1"},
				{Id: "baz", Addr: "ingester2"},
			},
		}, nil)

	writer := &mockEntryWriter{}
	writer.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
	writer.On("Stop")

	inst, err := newInstance(
		"foo",
		log.NewNopLogger(),
		newIngesterMetrics(nil, "test"),
		drain.DefaultConfig(),
		&fakeLimits{patternPersistenceEnabled: true},
		&fakeRingClient{ring: ringMock},
		ingesterID,
		writer,
		writer,
		aggregation.NewMetrics(nil),
		0.99,
	)
	require.NoError(t, err)

	// Seed the instance with two entries timestamped two hours in the past.
	twoHoursAgo := current.Add(-2 * time.Hour)
	entries := []push.Entry{
		{Timestamp: twoHoursAgo.Time(), Line: "info: user logged in"},
		{Timestamp: twoHoursAgo.Add(time.Second).Time(), Line: "info: user logged out"},
	}
	err = inst.Push(context.Background(), &push.PushRequest{
		Streams: []push.Stream{
			{Labels: streamLabels.String(), Entries: entries},
		},
	})
	require.NoError(t, err)

	// The pushed lines must be queryable as patterns before anything is flushed.
	patternIter, err := inst.Iterator(context.Background(), &logproto.QueryPatternsRequest{
		Query: `{test="test"}`,
		Start: time.Unix(0, 0),
		End:   time.Unix(0, math.MaxInt64),
	})
	require.NoError(t, err)

	result, err := iter.ReadAll(patternIter)
	require.NoError(t, err)
	require.Greater(t, len(result.Series), 0, "should have patterns before shutdown")

	// No write may have happened yet, so any write seen afterwards is
	// attributable to the flush.
	writer.AssertNotCalled(t, "WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

	// Simulate shutdown by flushing, then expect at least one write.
	inst.flushPatterns()
	writer.AssertCalled(t, "WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
// TestIngesterShutdownFlushMaintainsChunkBoundaries pushes the same line twice,
// two hours apart, flushes the instance, and inspects the entries handed to the
// writer. It asserts that at least one entry was written and that every written
// entry contains "detected_pattern=" and "info"; it does not assert an exact
// number of writes.
func TestIngesterShutdownFlushMaintainsChunkBoundaries(t *testing.T) {
	const ingesterID = "foo"

	streamLabels := labels.FromStrings("test", "test", "service_name", "test_service")

	ringMock := &fakeRing{}
	ringMock.
		On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(ring.ReplicationSet{
			Instances: []ring.InstanceDesc{
				{Id: ingesterID, Addr: "ingester0"},
			},
		}, nil)

	// Record the arguments of every WriteEntry call for inspection after the flush.
	var recorded []mock.Arguments
	writer := &mockEntryWriter{}
	writer.
		On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Run(func(args mock.Arguments) {
			recorded = append(recorded, args)
		})
	writer.On("Stop")

	inst, err := newInstance(
		"foo",
		log.NewNopLogger(),
		newIngesterMetrics(nil, "test"),
		drain.DefaultConfig(),
		&fakeLimits{patternPersistenceEnabled: true},
		&fakeRingClient{ring: ringMock},
		ingesterID,
		writer,
		writer,
		aggregation.NewMetrics(nil),
		0.99,
	)
	require.NoError(t, err)

	// Two identical lines spaced two hours apart, intended to land in
	// different chunks.
	first := time.Now()
	second := first.Add(2 * time.Hour)
	err = inst.Push(context.Background(), &push.PushRequest{
		Streams: []push.Stream{
			{
				Labels: streamLabels.String(),
				Entries: []push.Entry{
					{Timestamp: first, Line: "info: user action completed"},
					{Timestamp: second, Line: "info: user action completed"},
				},
			},
		},
	})
	require.NoError(t, err)

	// Nothing may have been written before the flush.
	writer.AssertNotCalled(t, "WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)

	// Simulate shutdown.
	inst.flushPatterns()

	require.Greater(t, len(recorded), 0, "should have written patterns during shutdown")

	// The second WriteEntry argument is the entry text.
	for _, args := range recorded {
		written := args.Get(1).(string)
		require.Contains(t, written, "detected_pattern=", "should contain detected_pattern field")
		require.Contains(t, written, "info", "should contain pattern info")
	}
}
// TestConfigChunkDuration checks the default of Config.MaxChunkAge and that the
// matching command-line flag is registered with the same default.
func TestConfigChunkDuration(t *testing.T) {
	t.Run("should have default value of 1 hour", func(t *testing.T) {
		var cfg Config
		flagext.DefaultValues(&cfg)

		require.Equal(t, time.Hour, cfg.MaxChunkAge, "ChunkDuration should default to 1 hour")
	})

	t.Run("should register max-chunk-age flag", func(t *testing.T) {
		var cfg Config
		flags := flag.NewFlagSet("test", flag.ContinueOnError)
		cfg.RegisterFlags(flags)

		// The flag must exist and advertise the one-hour default.
		registered := flags.Lookup("pattern-ingester.max-chunk-age")
		require.NotNil(t, registered, "pattern-ingester.max-chunk-age flag should be registered")
		require.Equal(t, "1h0m0s", registered.DefValue, "flag should have default value of 1h0m0s")
	})
}
// TestConfigPatternSampleInterval checks the default of
// Config.PatternSampleInterval and that the matching command-line flag is
// registered with the same default.
func TestConfigPatternSampleInterval(t *testing.T) {
	t.Run("should have default value of 10 seconds", func(t *testing.T) {
		var cfg Config
		flagext.DefaultValues(&cfg)

		require.Equal(t, 10*time.Second, cfg.PatternSampleInterval, "PatternSampleInterval should default to 10 seconds")
	})

	t.Run("should register sample-interval flag", func(t *testing.T) {
		var cfg Config
		flags := flag.NewFlagSet("test", flag.ContinueOnError)
		cfg.RegisterFlags(flags)

		// The flag must exist and advertise the ten-second default.
		registered := flags.Lookup("pattern-ingester.sample-interval")
		require.NotNil(t, registered, "pattern-ingester.sample-interval flag should be registered")
		require.Equal(t, "10s", registered.DefValue, "flag should have default value of 10s")
	})
}
// TestConfigurationPropagation checks that custom MaxChunkAge and
// PatternSampleInterval values set on Config show up on the drain config of an
// ingester built from it.
func TestConfigurationPropagation(t *testing.T) {
	t.Run("should propagate ChunkDuration and PatternSampleInterval to drain config", func(t *testing.T) {
		const (
			chunkAge       = 30 * time.Minute
			sampleInterval = 5 * time.Second
		)

		var cfg Config
		flagext.DefaultValues(&cfg)
		cfg.MaxChunkAge = chunkAge
		cfg.PatternSampleInterval = sampleInterval

		ing, err := New(
			cfg,
			&fakeLimits{},
			&fakeRingClient{},
			"test",
			prometheus.NewRegistry(),
			log.NewNopLogger(),
		)
		require.NoError(t, err)

		// Both overrides must be visible on the ingester's drain configuration.
		require.Equal(t, chunkAge, ing.drainCfg.MaxChunkAge, "ChunkDuration should be propagated to drain config")
		require.Equal(t, sampleInterval, ing.drainCfg.SampleInterval, "PatternSampleInterval should be propagated to drain config")
	})
}
// TestPerTenantPersistenceGranularity checks which granularity
// getEffectivePersistenceGranularity reports for a tenant, given the configured
// MaxChunkAge and the per-tenant persistence granularity returned by the limits.
func TestPerTenantPersistenceGranularity(t *testing.T) {
	cases := []struct {
		name string
		// maxChunkAge overrides Config.MaxChunkAge when non-zero; zero keeps
		// the value set by flagext.DefaultValues.
		maxChunkAge time.Duration
		// override is the per-tenant granularity returned by the limits; zero
		// means the tenant has no override.
		override time.Duration
		want     time.Duration
		failMsg  string
	}{
		{
			name:     "should use per-tenant persistence granularity override",
			override: 15 * time.Minute,
			want:     15 * time.Minute,
			failMsg:  "should use per-tenant override",
		},
		{
			name:        "should fall back to chunk duration when no per-tenant override",
			maxChunkAge: 45 * time.Minute,
			override:    0,
			want:        45 * time.Minute,
			failMsg:     "should use chunk duration when no override",
		},
		{
			name:        "should fall back to chunk duration if per-tenant override is greater than chunk duration",
			maxChunkAge: 45 * time.Minute,
			override:    time.Hour,
			want:        45 * time.Minute,
			failMsg:     "should use chunk duration when override is greater than chunk duration",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var cfg Config
			flagext.DefaultValues(&cfg)
			if tc.maxChunkAge != 0 {
				cfg.MaxChunkAge = tc.maxChunkAge
			}

			ing, err := New(
				cfg,
				&fakeLimits{persistenceGranularity: tc.override},
				&fakeRingClient{},
				"test",
				prometheus.NewRegistry(),
				log.NewNopLogger(),
			)
			require.NoError(t, err)

			got := ing.getEffectivePersistenceGranularity("test-tenant")
			require.Equal(t, tc.want, got, tc.failMsg)
		})
	}
}
// TestVolumeFilteringEndToEnd drives the volume threshold through a real
// ingester and instance: it pushes 60, 30 and 10 lines of three distinct shapes,
// flushes, and expects exactly two distinct patterns to reach the pattern
// writer.
func TestVolumeFilteringEndToEnd(t *testing.T) {
	t.Run("should filter patterns by volume threshold through full pipeline", func(t *testing.T) {
		cfg := Config{}
		flagext.DefaultValues(&cfg)
		cfg.VolumeThreshold = 0.7 // Only keep top 70% of patterns by volume

		limits := &fakeLimits{
			patternPersistenceEnabled: true,
			patternRateThreshold:      0, // Disable rate threshold for this test
		}

		ingesterID := "test-ingester"
		replicationSet := ring.ReplicationSet{
			Instances: []ring.InstanceDesc{
				{Id: ingesterID, Addr: "ingester0"},
			},
		}

		fakeRing := &fakeRing{}
		fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
			Return(replicationSet, nil)

		ringClient := &fakeRingClient{ring: fakeRing}

		// writtenPatterns counts WriteEntry calls per distinct pattern, keyed by
		// the text that follows "detected_pattern=" up to the next space.
		writtenPatterns := make(map[string]int)
		mockWriter := &mockEntryWriter{}
		mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
			Run(func(args mock.Arguments) {
				entry := args.Get(1).(string)
				if strings.Contains(entry, "detected_pattern=") {
					parts := strings.Split(entry, "detected_pattern=")
					if len(parts) > 1 {
						patternPart := strings.Split(parts[1], " ")[0]
						writtenPatterns[patternPart]++
					}
				}
			})
		mockWriter.On("Stop")

		ingester, err := New(
			cfg,
			limits,
			ringClient,
			"test",
			prometheus.NewRegistry(),
			log.NewNopLogger(),
		)
		require.NoError(t, err)

		// Override ingester ID for testing
		ingester.lifecycler.ID = ingesterID

		inst, err := ingester.GetOrCreateInstance("test-tenant")
		require.NoError(t, err)

		// Route pattern writes to the mock so they can be counted.
		inst.patternWriter = mockWriter

		lbs := labels.FromStrings("test", "test", "service_name", "test_service")
		base := time.Now().Add(-2 * time.Hour)

		// pushLines pushes count single-entry requests of the form
		// "<prefix> <i>", one second apart, starting offsetSeconds after base.
		pushLines := func(count, offsetSeconds int, prefix string) {
			t.Helper()
			for i := range count {
				pushErr := inst.Push(context.Background(), &push.PushRequest{
					Streams: []push.Stream{
						{
							Labels: lbs.String(),
							Entries: []push.Entry{
								{
									Timestamp: base.Add(time.Duration(offsetSeconds+i) * time.Second),
									Line:      fmt.Sprintf("%s %d", prefix, i),
								},
							},
						},
					},
				})
				require.NoError(t, pushErr)
			}
		}

		pushLines(60, 0, "high volume pattern log")      // Pattern 1: 60% of total
		pushLines(30, 60, "medium volume pattern entry") // Pattern 2: 30% of total
		pushLines(10, 90, "low volume pattern item")     // Pattern 3: 10% of total

		inst.flushPatterns()

		// With the 70% threshold, the high (60%) and medium (30%) patterns are
		// expected to be written and the low-volume (10%) one filtered out.
		require.Len(t, writtenPatterns, 2, "2 patterns, high and medium volume, should have been written")

		mockWriter.AssertCalled(t, "WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
	})
}
// TestConfigurationValidation checks Config.Validate against two invalid
// combinations of durations and one valid combination.
func TestConfigurationValidation(t *testing.T) {
	// defaultConfig returns a Config populated with flag defaults and a
	// replication factor of 1, so that basic validation passes and only the
	// durations under test can cause a failure.
	defaultConfig := func() Config {
		var cfg Config
		flagext.DefaultValues(&cfg)
		cfg.LifecyclerConfig.RingConfig.ReplicationFactor = 1
		return cfg
	}

	t.Run("should validate retain-for >= chunk-duration", func(t *testing.T) {
		cfg := defaultConfig()
		// Invalid: retain-for is shorter than chunk-duration.
		cfg.RetainFor = 1 * time.Hour
		cfg.MaxChunkAge = 2 * time.Hour

		err := cfg.Validate()
		require.Error(t, err, "should fail validation when retain-for < chunk-duration")

		msg := err.Error()
		require.Contains(t, msg, "retain-for", "error message should mention retain-for")
		require.Contains(t, msg, "chunk-duration", "error message should mention chunk-duration")
	})

	t.Run("should validate chunk-duration >= sample-interval", func(t *testing.T) {
		cfg := defaultConfig()
		// Invalid: chunk-duration is shorter than sample-interval.
		cfg.MaxChunkAge = 5 * time.Second
		cfg.PatternSampleInterval = 10 * time.Second

		err := cfg.Validate()
		require.Error(t, err, "should fail validation when chunk-duration < sample-interval")

		msg := err.Error()
		require.Contains(t, msg, "chunk-duration", "error message should mention chunk-duration")
		require.Contains(t, msg, "sample-interval", "error message should mention sample-interval")
	})

	t.Run("should pass validation with valid configuration", func(t *testing.T) {
		cfg := defaultConfig()
		cfg.RetainFor = 3 * time.Hour
		cfg.MaxChunkAge = 1 * time.Hour
		cfg.PatternSampleInterval = 10 * time.Second

		require.NoError(t, cfg.Validate(), "should pass validation with valid configuration")
	})
}