Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/ruler/base/lifecycle_test.go

124 lines
3.7 KiB

package base
import (
"context"
"sort"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/test"
)
// TestRulerShutdown tests shutting down ruler unregisters correctly
func TestRulerShutdown(t *testing.T) {
ctx := context.Background()
config := defaultRulerConfig(t, newMockRuleStore(mockRules))
m := storage.NewClientMetrics()
defer m.Unregister()
r := buildRuler(t, config, nil, m, nil)
r.cfg.EnableSharding = true
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
err := enableSharding(r, ringStore)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck
// Wait until the tokens are registered in the ring
test.Poll(t, 100*time.Millisecond, config.Ring.NumTokens, func() interface{} {
return numTokens(ringStore, "localhost", ringKey)
})
require.Equal(t, ring.ACTIVE, r.lifecycler.GetState())
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), r))
// Wait until the tokens are unregistered from the ring
test.Poll(t, 100*time.Millisecond, 0, func() interface{} {
return numTokens(ringStore, "localhost", ringKey)
})
}
func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
const unhealthyInstanceID = "unhealthy-id"
const heartbeatTimeout = time.Minute
ctx := context.Background()
config := defaultRulerConfig(t, newMockRuleStore(mockRules))
m := storage.NewClientMetrics()
defer m.Unregister()
r := buildRuler(t, config, nil, m, nil)
r.cfg.EnableSharding = true
r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond
r.cfg.Ring.HeartbeatTimeout = heartbeatTimeout
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
err := enableSharding(r, ringStore)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck
// Add an unhealthy instance to the ring.
require.NoError(t, ringStore.CAS(ctx, ringKey, func(in interface{}) (interface{}, bool, error) {
ringDesc := ring.GetOrCreateRingDesc(in)
instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(config.Ring.NumTokens), ring.ACTIVE, time.Now())
instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix()
ringDesc.Ingesters[unhealthyInstanceID] = instance
return ringDesc, true, nil
}))
// Ensure the unhealthy instance is removed from the ring.
test.Poll(t, time.Second*5, false, func() interface{} {
d, err := ringStore.Get(ctx, ringKey)
if err != nil {
return err
}
_, ok := ring.GetOrCreateRingDesc(d).Ingesters[unhealthyInstanceID]
return ok
})
}
func generateSortedTokens(numTokens int) ring.Tokens {
tokens := ring.GenerateTokens(numTokens, nil)
// Ensure generated tokens are sorted.
sort.Slice(tokens, func(i, j int) bool {
return tokens[i] < tokens[j]
})
return ring.Tokens(tokens)
}
// numTokens determines the number of tokens owned by the specified
// address
func numTokens(c kv.Client, name, ringKey string) int {
ringDesc, err := c.Get(context.Background(), ringKey)
// The ringDesc may be null if the lifecycler hasn't stored the ring
// to the KVStore yet.
if ringDesc == nil || err != nil {
return 0
}
rd := ringDesc.(*ring.Desc)
return len(rd.Ingesters[name].Tokens)
}