Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/pattern/pattern_persistence_test.go

220 lines
6.8 KiB

package pattern
import (
"context"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// TestPatternPersistenceConfiguration tests that pattern persistence writers
// are correctly configured based on the limits configuration.
func TestPatternPersistenceConfiguration(t *testing.T) {
tests := []struct {
name string
patternPersistenceEnabled bool
metricAggregationEnabled bool
expectPatternWriter bool
expectMetricWriter bool
}{
{
name: "both disabled",
patternPersistenceEnabled: false,
metricAggregationEnabled: false,
expectPatternWriter: false,
expectMetricWriter: false,
},
{
name: "only pattern persistence enabled",
patternPersistenceEnabled: true,
metricAggregationEnabled: false,
expectPatternWriter: true,
expectMetricWriter: false,
},
{
name: "only metric aggregation enabled",
patternPersistenceEnabled: false,
metricAggregationEnabled: true,
expectPatternWriter: false,
expectMetricWriter: true,
},
{
name: "both enabled",
patternPersistenceEnabled: true,
metricAggregationEnabled: true,
expectPatternWriter: true,
expectMetricWriter: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup mock limits
limits := &configurableLimits{
patternPersistenceEnabled: tt.patternPersistenceEnabled,
metricAggregationEnabled: tt.metricAggregationEnabled,
}
// Setup ring
replicationSet := ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{Id: "localhost", Addr: "ingester0"},
},
}
fakeRing := &fakeRing{}
fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(replicationSet, nil)
ringClient := &fakeRingClient{
ring: fakeRing,
}
// Create ingester with the specified configuration
cfg := testIngesterConfig(t)
ing, err := New(cfg, limits, ringClient, "test", nil, log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
err = services.StartAndAwaitRunning(context.Background(), ing)
require.NoError(t, err)
// Create test instance
_ = user.InjectOrgID(context.Background(), "test-tenant")
instance, err := ing.GetOrCreateInstance("test-tenant")
require.NoError(t, err)
// Verify writer configuration based on test expectations
if tt.expectPatternWriter {
require.NotNil(t, instance.patternWriter, "pattern writer should be configured when pattern persistence is enabled")
} else {
require.Nil(t, instance.patternWriter, "pattern writer should be nil when pattern persistence is disabled")
}
if tt.expectMetricWriter {
require.NotNil(t, instance.metricWriter, "metric writer should be configured when metric aggregation is enabled")
} else {
require.Nil(t, instance.metricWriter, "metric writer should be nil when metric aggregation is disabled")
}
// Verify they are different instances when both are enabled
if tt.expectPatternWriter && tt.expectMetricWriter {
require.NotEqual(t, instance.patternWriter, instance.metricWriter,
"pattern writer and metric writer should be different instances")
}
})
}
}
// TestPatternPersistenceStopWriters verifies that stopping the ingester stops
// both the pattern writer and the metric writer held by an existing tenant
// instance. The instance's writers are swapped for mocks so the Stop calls can
// be observed.
func TestPatternPersistenceStopWriters(t *testing.T) {
	mockPatternWriter := &mockEntryWriter{}
	mockMetricWriter := &mockEntryWriter{}
	mockPatternWriter.On("Stop").Return()
	mockMetricWriter.On("Stop").Return()

	// Ring stub: every Get call answers with one fixed instance.
	replicationSet := ring.ReplicationSet{
		Instances: []ring.InstanceDesc{
			{Id: "localhost", Addr: "ingester0"},
		},
	}
	ringStub := &fakeRing{}
	ringStub.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(replicationSet, nil)
	ringClient := &fakeRingClient{ring: ringStub}

	cfg := testIngesterConfig(t)
	ing, err := New(cfg, &configurableLimits{
		patternPersistenceEnabled: true,
		metricAggregationEnabled:  true,
	}, ringClient, "test", nil, log.NewNopLogger())
	require.NoError(t, err)

	// Safety net: if an assertion aborts the test before the explicit stop
	// below is reached, still shut the ingester down so it does not keep
	// running after this test. Skipped once the explicit stop has been issued.
	stopped := false
	defer func() {
		if !stopped {
			// Best effort only; the test has already failed at this point.
			_ = services.StopAndAwaitTerminated(context.Background(), ing)
		}
	}()

	err = services.StartAndAwaitRunning(context.Background(), ing)
	require.NoError(t, err)

	// Create instance and replace writers with mocks.
	instance, err := ing.GetOrCreateInstance("test-tenant")
	require.NoError(t, err)
	instance.patternWriter = mockPatternWriter
	instance.metricWriter = mockMetricWriter

	// Stop the ingester; shutdown is expected to stop the instance's writers.
	stopped = true
	err = services.StopAndAwaitTerminated(context.Background(), ing)
	require.NoError(t, err)

	// Verify both writers were stopped.
	mockPatternWriter.AssertCalled(t, "Stop")
	mockMetricWriter.AssertCalled(t, "Stop")
}
// configurableLimits is a test implementation of Limits. The two feature
// switches are chosen per test; every other limit returns a fixed value.
type configurableLimits struct {
	// patternPersistenceEnabled is reported by PatternPersistenceEnabled.
	patternPersistenceEnabled bool
	// metricAggregationEnabled is reported by MetricAggregationEnabled.
	metricAggregationEnabled bool
}

// Compile-time check that *configurableLimits satisfies Limits.
var _ Limits = (*configurableLimits)(nil)

// PatternIngesterTokenizableJSONFields returns the same fixed list of JSON
// field names regardless of the tenant argument.
func (*configurableLimits) PatternIngesterTokenizableJSONFields(string) []string {
	return []string{"log", "message", "msg", "msg_", "_msg", "content"}
}

// PatternPersistenceEnabled reports the configured pattern-persistence switch;
// the tenant argument is ignored.
func (l *configurableLimits) PatternPersistenceEnabled(string) bool {
	return l.patternPersistenceEnabled
}

// MetricAggregationEnabled reports the configured metric-aggregation switch;
// the tenant argument is ignored.
func (l *configurableLimits) MetricAggregationEnabled(string) bool {
	return l.metricAggregationEnabled
}

// PersistenceGranularity always returns one hour, the value used for tests.
func (*configurableLimits) PersistenceGranularity(string) time.Duration {
	return time.Hour
}

// PatternRateThreshold always returns 1.0, the value used for tests.
func (*configurableLimits) PatternRateThreshold(string) float64 {
	return 1.0
}
// testIngesterConfig builds the ingester Config shared by the tests in this
// file: flag defaults, an in-memory KV store for the lifecycler ring, and
// pattern-persistence and metric-aggregation settings pointing at
// http://localhost:3100. It fails the calling test if the KV client cannot be
// created.
func testIngesterConfig(t testing.TB) Config {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec(), nil, log.NewNopLogger())
	require.NoError(t, err)

	cfg := Config{}
	flagext.DefaultValues(&cfg)

	// A very long flush check period; presumably this keeps periodic flushing
	// from firing during a test run.
	cfg.FlushCheckPeriod = 99999 * time.Hour
	cfg.ConcurrentFlushes = 1

	// Lifecycler: a single local instance backed by the in-memory KV store.
	cfg.LifecyclerConfig.RingConfig.KVStore.Mock = kvClient
	cfg.LifecyclerConfig.NumTokens = 1
	cfg.LifecyclerConfig.ListenPort = 0
	cfg.LifecyclerConfig.Addr = "localhost"
	cfg.LifecyclerConfig.ID = "localhost"
	cfg.LifecyclerConfig.FinalSleep = 0
	cfg.LifecyclerConfig.MinReadyDuration = 0

	// Configure pattern persistence.
	cfg.PatternPersistence.LokiAddr = "http://localhost:3100"
	cfg.PatternPersistence.WriteTimeout = 30 * time.Second
	cfg.PatternPersistence.PushPeriod = 10 * time.Second
	cfg.PatternPersistence.BatchSize = 1000

	// Configure metric aggregation.
	cfg.MetricAggregation.LokiAddr = "http://localhost:3100"
	cfg.MetricAggregation.WriteTimeout = 30 * time.Second
	cfg.MetricAggregation.SamplePeriod = 10 * time.Second

	return cfg
}