chore(deps): update dskit 20240819 (#13924)

Branch: pull/13930/head
Author: Paul Rogers (committed via GitHub, 1 year ago)
Parent: 1ba4bff005
Commit: 8fb7b488bd
  1. docs/sources/shared/configuration.md (8 changes)
  2. go.mod (4 changes)
  3. go.sum (11 changes)
  4. pkg/distributor/instance_count_test.go (2 changes)
  5. pkg/ingester/ingester_test.go (10 changes)
  6. pkg/pattern/flush_test.go (14 changes)
  7. pkg/pattern/ring_client.go (4 changes)
  8. pkg/querier/querier_mock_test.go (10 changes)
  9. pkg/ruler/base/lifecycle_test.go (2 changes)
  10. pkg/ruler/base/ruler_test.go (134 changes)
  11. pkg/util/ring/ring_test.go (10 changes)
  12. vendor/github.com/grafana/dskit/backoff/backoff.go (11 changes)
  13. vendor/github.com/grafana/dskit/concurrency/runner.go (5 changes)
  14. vendor/github.com/grafana/dskit/crypto/tls/tls.go (42 changes)
  15. vendor/github.com/grafana/dskit/internal/math/math.go (9 changes)
  16. vendor/github.com/grafana/dskit/kv/memberlist/broadcast.go (4 changes)
  17. vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go (112 changes)
  18. vendor/github.com/grafana/dskit/kv/memberlist/metrics.go (32 changes)
  19. vendor/github.com/grafana/dskit/ring/basic_lifecycler.go (6 changes)
  20. vendor/github.com/grafana/dskit/ring/lifecycler.go (86 changes)
  21. vendor/github.com/grafana/dskit/ring/model.go (76 changes)
  22. vendor/github.com/grafana/dskit/ring/ring.go (109 changes)
  23. vendor/github.com/grafana/dskit/ring/ring.pb.go (155 changes)
  24. vendor/github.com/grafana/dskit/ring/ring.proto (11 changes)
  25. vendor/github.com/grafana/dskit/ring/ring_http.go (46 changes)
  26. vendor/github.com/grafana/dskit/ring/ring_status.gohtml (6 changes)
  27. vendor/github.com/grafana/dskit/ring/shard/shard.go (5 changes)
  28. vendor/github.com/grafana/dskit/services/basic_service.go (48 changes)
  29. vendor/github.com/grafana/dskit/services/failure_watcher.go (53 changes)
  30. vendor/github.com/grafana/dskit/services/manager.go (56 changes)
  31. vendor/github.com/grafana/dskit/services/service.go (5 changes)
  32. vendor/github.com/grafana/dskit/spanlogger/spanlogger.go (50 changes)
  33. vendor/github.com/grafana/dskit/tenant/resolver.go (17 changes)
  34. vendor/github.com/grafana/dskit/tracing/tracing.go (24 changes)
  35. vendor/github.com/grafana/pyroscope-go/godeltaprof/internal/pprof/stub.go (3 changes)
  36. vendor/modules.txt (7 changes)

@ -4181,6 +4181,14 @@ When a memberlist config with at least 1 join_members is defined, kvstore of type
# CLI flag: -memberlist.leave-timeout
[leave_timeout: <duration> | default = 20s]
# Timeout for broadcasting all remaining locally-generated updates to other
# nodes when shutting down. Only used if there are nodes left in the memberlist
# cluster, and only applies to locally-generated updates, not to broadcast
# messages that are the result of incoming gossip updates. 0 = no timeout, wait
# until all locally-generated updates are sent.
# CLI flag: -memberlist.broadcast-timeout-for-local-updates-on-shutdown
[broadcast_timeout_for_local_updates_on_shutdown: <duration> | default = 10s]
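The new option slots into the existing memberlist block alongside leave_timeout. A minimal sketch of how it might be set (the join_members address is a placeholder, and 30s is an arbitrary override of the 10s default):

    memberlist:
      join_members:
        - loki-memberlist:7946   # placeholder service address
      leave_timeout: 20s
      # 0 disables the timeout and waits until all locally-generated updates are sent
      broadcast_timeout_for_local_updates_on_shutdown: 30s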
# How much space to use for keeping received and sent messages in memory for
# troubleshooting (two buffers). 0 to disable.
# CLI flag: -memberlist.message-history-buffer-bytes

@ -50,7 +50,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.0
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d
github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
@ -271,7 +271,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
github.com/gophercloud/gophercloud v1.13.0 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.6 // indirect
github.com/grafana/pyroscope-go/godeltaprof v0.1.7 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect

@ -274,9 +274,8 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0
github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA=
github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87/go.mod h1:iGLljf5n9GjT6kc0HBvyI1nOKnGQbNB66VzSNbK5iks=
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
@ -1044,8 +1043,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d h1:CD8PWWX+9lYdgeMquSofmLErvCtk7jb+3/W/SH6oo/k=
github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc=
github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0 h1:iMShjkEYATnBMbEa2wV4QiK5PU2trw24FOCON3v7+K4=
github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0/go.mod h1:c4ASJAo1QFmXGydDzNed2o0+Fncx+x4YmQ1r9HfYU3c=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
@ -1056,8 +1055,8 @@ github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32 h1:NznuPwItog+r
github.com/grafana/jsonparser v0.0.0-20240425183733-ea80629e1a32/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/pyroscope-go/godeltaprof v0.1.6 h1:nEdZ8louGAplSvIJi1HVp7kWvFvdiiYg3COLlTwJiFo=
github.com/grafana/pyroscope-go/godeltaprof v0.1.6/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE=
github.com/grafana/pyroscope-go/godeltaprof v0.1.7 h1:C11j63y7gymiW8VugJ9ZW0pWfxTZugdSJyC48olk5KY=
github.com/grafana/pyroscope-go/godeltaprof v0.1.7/go.mod h1:Tk376Nbldo4Cha9RgiU7ik8WKFkNpfds98aUzS8omLE=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU=

@ -111,7 +111,7 @@ func TestInstanceCountDelegate_CorrectlyInvokesOtherDelegates(t *testing.T) {
require.NoError(t, err)
ingesters := ring.NewDesc()
ingesters.AddIngester("ingester-0", "ingester-0:3100", "zone-a", []uint32{1}, ring.ACTIVE, time.Now())
ingesters.AddIngester("ingester-0", "ingester-0:3100", "zone-a", []uint32{1}, ring.ACTIVE, time.Now(), false, time.Now())
// initial state.
require.Equal(t, 0, sentry1["Heartbeat"])
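The mechanical change running through all these test files is that dskit's Desc.AddIngester gained two trailing parameters for the new read-only support. A sketch of the new call shape (parameter names are assumed from context; this diff always passes false plus a timestamp):

    d := ring.NewDesc()
    d.AddIngester(
        "ingester-0", "ingester-0:3100", "zone-a", // id, address, zone
        []uint32{1}, // tokens
        ring.ACTIVE, // state
        time.Now(),  // registeredAt
        false,       // readOnly: the instance still accepts writes
        time.Now(),  // when the read-only flag was last updated
    )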

@ -1609,6 +1609,16 @@ func (r *readRingMock) GetTokenRangesForInstance(instance string) (ring.TokenRan
return tr, nil
}
// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens.
func (r *readRingMock) WritableInstancesWithTokensCount() int {
return len(r.replicationSet.Instances)
}
// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens.
func (r *readRingMock) WritableInstancesWithTokensInZoneCount(_ string) int {
return len(r.replicationSet.Instances)
}
func mockReadRingWithOneActiveIngester() *readRingMock {
return newReadRingMock([]ring.InstanceDesc{
{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},

@ -144,7 +144,7 @@ func (f *fakeRingClient) State() services.State {
panic("not implemented")
}
func (f *fakeRingClient) AddListener(_ services.Listener) {
func (f *fakeRingClient) AddListener(_ services.Listener) func() {
panic("not implemented")
}
@ -184,6 +184,18 @@ func (f *fakeRing) ZonesCount() int {
return args.Int(0)
}
// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens.
func (f *fakeRing) WritableInstancesWithTokensCount() int {
args := f.Called()
return args.Int(0)
}
// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens.
func (f *fakeRing) WritableInstancesWithTokensInZoneCount(zone string) int {
args := f.Called(zone)
return args.Int(0)
}
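Since fakeRing records calls through f.Called(), it presumably embeds testify's mock.Mock, so tests can stub the new methods with ordinary expectations (require import assumed):

    fr := &fakeRing{}
    fr.On("WritableInstancesWithTokensCount").Return(3)
    fr.On("WritableInstancesWithTokensInZoneCount", "zone-a").Return(1)
    require.Equal(t, 3, fr.WritableInstancesWithTokensCount())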
func (f *fakeRing) Get(
key uint32,
op ring.Operation,

@ -110,8 +110,8 @@ func (r *ringClient) State() services.State {
return r.ring.State()
}
func (r *ringClient) AddListener(listener services.Listener) {
r.ring.AddListener(listener)
func (r *ringClient) AddListener(listener services.Listener) func() {
return r.ring.AddListener(listener)
}
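The signature change means listeners are now detachable: AddListener returns a function that, when called, unregisters the listener (semantics assumed from the new return type in the dskit services package). A hypothetical caller:

    unregister := ringClient.AddListener(listener)
    defer unregister() // stop receiving service-state notifications when done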
func (r *ringClient) GetClientFor(addr string) (ring_client.PoolClient, error) {

@ -496,6 +496,16 @@ func (r *readRingMock) GetTokenRangesForInstance(_ string) (ring.TokenRanges, er
return tr, nil
}
// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens.
func (r *readRingMock) WritableInstancesWithTokensCount() int {
return len(r.replicationSet.Instances)
}
// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens.
func (r *readRingMock) WritableInstancesWithTokensInZoneCount(_ string) int {
return len(r.replicationSet.Instances)
}
func mockReadRingWithOneActiveIngester() *readRingMock {
return newReadRingMock([]ring.InstanceDesc{
{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},

@ -79,7 +79,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
require.NoError(t, ringStore.CAS(ctx, ringKey, func(in interface{}) (interface{}, bool, error) {
ringDesc := ring.GetOrCreateRingDesc(in)
instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(config.Ring.NumTokens), ring.ACTIVE, time.Now())
instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(config.Ring.NumTokens), ring.ACTIVE, time.Now(), false, time.Now())
instance.Timestamp = time.Now().Add(-(ringAutoForgetUnhealthyPeriods + 1) * heartbeatTimeout).Unix()
ringDesc.Ingesters[unhealthyInstanceID] = instance

@ -562,7 +562,7 @@ func TestGetRules(t *testing.T) {
d = ring.NewDesc()
}
for rID, tokens := range allTokensByRuler {
d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now())
d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now(), false, time.Now())
}
return d, true, nil
})
@ -744,7 +744,7 @@ func TestSharding(t *testing.T) {
sharding: true,
shardingStrategy: util.ShardingStrategyDefault,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{ruler1: allRules},
},
@ -754,7 +754,7 @@ func TestSharding(t *testing.T) {
shardingStrategy: util.ShardingStrategyDefault,
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
user1: {user1Group1, user1Group2},
@ -766,7 +766,7 @@ func TestSharding(t *testing.T) {
shardingStrategy: util.ShardingStrategyDefault,
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
user2: {user2Group1},
@ -778,8 +778,8 @@ func TestSharding(t *testing.T) {
sharding: true,
shardingStrategy: util.ShardingStrategyDefault,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -800,8 +800,8 @@ func TestSharding(t *testing.T) {
shardingStrategy: util.ShardingStrategyDefault,
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -820,8 +820,8 @@ func TestSharding(t *testing.T) {
shardingStrategy: util.ShardingStrategyDefault,
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -840,7 +840,7 @@ func TestSharding(t *testing.T) {
shardingStrategy: util.ShardingStrategyDefault,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.Ingesters[ruler2] = ring.InstanceDesc{
Addr: ruler2Addr,
Timestamp: time.Now().Add(-time.Hour).Unix(),
@ -864,8 +864,8 @@ func TestSharding(t *testing.T) {
shardingStrategy: util.ShardingStrategyDefault,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.LEAVING, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.LEAVING, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -880,8 +880,8 @@ func TestSharding(t *testing.T) {
shardingStrategy: util.ShardingStrategyDefault,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.JOINING, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Token + 1, user2Group1Token + 1}), ring.JOINING, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -896,7 +896,7 @@ func TestSharding(t *testing.T) {
shardingStrategy: util.ShardingStrategyShuffle,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -910,8 +910,8 @@ func TestSharding(t *testing.T) {
shuffleShardSize: 1,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -928,8 +928,8 @@ func TestSharding(t *testing.T) {
setupRing: func(desc *ring.Desc) {
// Exact same tokens setup as previous test.
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Token + 1, user1Group2Token + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -944,8 +944,8 @@ func TestSharding(t *testing.T) {
shuffleShardSize: 1,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -964,9 +964,9 @@ func TestSharding(t *testing.T) {
shuffleShardSize: 2,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -988,9 +988,9 @@ func TestSharding(t *testing.T) {
shuffleShardSize: 2,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Token + 1, user1Group2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Token + 1, user1Group2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user2Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1012,9 +1012,9 @@ func TestSharding(t *testing.T) {
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1035,9 +1035,9 @@ func TestSharding(t *testing.T) {
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Token + 1, user3Group1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1054,7 +1054,7 @@ func TestSharding(t *testing.T) {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{ruler1: allRulesSharded},
},
@ -1064,7 +1064,7 @@ func TestSharding(t *testing.T) {
shardingAlgo: util.ShardingAlgoByRule,
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
user1: {
@ -1080,7 +1080,7 @@ func TestSharding(t *testing.T) {
shardingAlgo: util.ShardingAlgoByRule,
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", []uint32{0}, ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{ruler1: map[string]rulespb.RuleGroupList{
user2: {
@ -1098,8 +1098,8 @@ func TestSharding(t *testing.T) {
sharding: true,
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1127,8 +1127,8 @@ func TestSharding(t *testing.T) {
shardingAlgo: util.ShardingAlgoByRule,
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1152,8 +1152,8 @@ func TestSharding(t *testing.T) {
shardingAlgo: util.ShardingAlgoByRule,
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1180,7 +1180,7 @@ func TestSharding(t *testing.T) {
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.Ingesters[ruler2] = ring.InstanceDesc{
Addr: ruler2Addr,
Timestamp: time.Now().Add(-time.Hour).Unix(),
@ -1209,8 +1209,8 @@ func TestSharding(t *testing.T) {
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.LEAVING, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.LEAVING, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1225,8 +1225,8 @@ func TestSharding(t *testing.T) {
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.JOINING, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user2Group1Rule2Token + 1, user1Group1Rule2Token + 1}), ring.JOINING, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1242,7 +1242,7 @@ func TestSharding(t *testing.T) {
shardingAlgo: util.ShardingAlgoByRule,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{0}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1257,9 +1257,9 @@ func TestSharding(t *testing.T) {
shuffleShardSize: 1,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
// immaterial what tokens this ruler has, it won't be assigned any rules
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group2Rule1Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1277,9 +1277,9 @@ func TestSharding(t *testing.T) {
setupRing: func(desc *ring.Desc) {
// Exact same tokens setup as previous test.
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
// this ruler has all the rule tokens, so it gets all the rules
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1295,8 +1295,8 @@ func TestSharding(t *testing.T) {
shuffleShardSize: 1,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1327,9 +1327,9 @@ func TestSharding(t *testing.T) {
shuffleShardSize: 2,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Rule1Token + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group1Rule2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, user1Group1Rule1Token + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, user1Group1Rule2Token + 1, userToken(user2, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user2Group1Rule1Token + 1, user2Group1Rule2Token + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1363,9 +1363,9 @@ func TestSharding(t *testing.T) {
shuffleShardSize: 2,
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1400,9 +1400,9 @@ func TestSharding(t *testing.T) {
enabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{
@ -1429,9 +1429,9 @@ func TestSharding(t *testing.T) {
disabledUsers: []string{user1},
setupRing: func(desc *ring.Desc) {
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now())
desc.AddIngester(ruler1, ruler1Addr, "", sortTokens([]uint32{userToken(user1, 0) + 1, userToken(user2, 1) + 1, user1Group1Rule1Token + 1, user1Group1Rule2Token + 1, user2Group1Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler2, ruler2Addr, "", sortTokens([]uint32{userToken(user1, 1) + 1, userToken(user3, 1) + 1, user1Group2Rule1Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
desc.AddIngester(ruler3, ruler3Addr, "", sortTokens([]uint32{userToken(user2, 0) + 1, userToken(user3, 0) + 1, user3Group1Rule1Token + 1, user3Group1Rule2Token + 1, user2Group1Rule2Token + 1}), ring.ACTIVE, time.Now(), false, time.Now())
},
expectedRules: expectedRulesMap{

@ -133,6 +133,16 @@ func (r *readRingMock) ZonesCount() int {
return len(uniqueZone)
}
// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens.
func (r *readRingMock) WritableInstancesWithTokensCount() int {
return len(r.replicationSet.Instances)
}
// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens.
func (r *readRingMock) WritableInstancesWithTokensInZoneCount(_ string) int {
return len(r.replicationSet.Instances)
}
type readLifecyclerMock struct {
mock.Mock
addr string

@ -54,7 +54,7 @@ func (b *Backoff) Ongoing() bool {
return b.ctx.Err() == nil && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries)
}
// Err returns the reason for terminating the backoff, or nil if it didn't terminate
// Err returns the reason for terminating the backoff, or nil if it didn't terminate.
func (b *Backoff) Err() error {
if b.ctx.Err() != nil {
return b.ctx.Err()
@ -65,6 +65,15 @@ func (b *Backoff) Err() error {
return nil
}
// ErrCause is like Err() but returns the context cause if backoff is terminated because the
// context has been canceled.
func (b *Backoff) ErrCause() error {
if b.ctx.Err() != nil {
return context.Cause(b.ctx)
}
return b.Err()
}
// NumRetries returns the number of retries so far
func (b *Backoff) NumRetries() int {
return b.numRetries
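ErrCause pairs naturally with context.WithCancelCause: Err() can only surface context.Canceled, while ErrCause() recovers the recorded reason. A minimal sketch using this vendored backoff package (context, errors, fmt, and time imports assumed):

    ctx, cancel := context.WithCancelCause(context.Background())
    b := backoff.New(ctx, backoff.Config{
        MinBackoff: 100 * time.Millisecond,
        MaxBackoff: time.Second,
        MaxRetries: 5,
    })
    cancel(errors.New("shutting down")) // record a cancellation cause
    fmt.Println(b.Err())                // context.Canceled
    fmt.Println(b.ErrCause())           // shutting down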

@ -7,7 +7,6 @@ import (
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"github.com/grafana/dskit/internal/math"
"github.com/grafana/dskit/multierror"
)
@ -31,7 +30,7 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun
errsMx := sync.Mutex{}
wg := sync.WaitGroup{}
for ix := 0; ix < math.Min(concurrency, len(userIDs)); ix++ {
for ix := 0; ix < min(concurrency, len(userIDs)); ix++ {
wg.Add(1)
go func() {
defer wg.Done()
@ -108,7 +107,7 @@ func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx
// Start workers to process jobs.
g, ctx := errgroup.WithContext(ctx)
for ix := 0; ix < math.Min(concurrency, jobs); ix++ {
for ix := 0; ix < min(concurrency, jobs); ix++ {
g.Go(func() error {
for ctx.Err() == nil {
idx := int(indexes.Inc())
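These call sites work because min and max have been built-in generic functions since Go 1.21, which is what makes the vendored helper (deleted in the next hunk) redundant:

    n := min(concurrency, len(userIDs)) // built-in since Go 1.21; replaces math.Min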

@ -109,6 +109,24 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) {
config.RootCAs = caCertPool
}
loadCert := func() (tls.Certificate, error) {
cert, err := reader.ReadSecret(cfg.CertPath)
if err != nil {
return tls.Certificate{}, errors.Wrapf(err, "error loading client cert: %s", cfg.CertPath)
}
key, err := reader.ReadSecret(cfg.KeyPath)
if err != nil {
return tls.Certificate{}, errors.Wrapf(err, "error loading client key: %s", cfg.KeyPath)
}
clientCert, err := tls.X509KeyPair(cert, key)
if err != nil {
return tls.Certificate{}, errors.Wrapf(err, "failed to load TLS certificate %s,%s", cfg.CertPath, cfg.KeyPath)
}
return clientCert, nil
}
// Read Client Certificate
if cfg.CertPath != "" || cfg.KeyPath != "" {
if cfg.CertPath == "" {
@ -117,21 +135,23 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) {
if cfg.KeyPath == "" {
return nil, errKeyMissing
}
cert, err := reader.ReadSecret(cfg.CertPath)
// Confirm that certificate and key paths are valid.
cert, err := loadCert()
if err != nil {
return nil, errors.Wrapf(err, "error loading client cert: %s", cfg.CertPath)
}
key, err := reader.ReadSecret(cfg.KeyPath)
if err != nil {
return nil, errors.Wrapf(err, "error loading client key: %s", cfg.KeyPath)
return nil, err
}
clientCert, err := tls.X509KeyPair(cert, key)
if err != nil {
return nil, errors.Wrapf(err, "failed to load TLS certificate %s,%s", cfg.CertPath, cfg.KeyPath)
config.GetClientCertificate = func(_ *tls.CertificateRequestInfo) (*tls.Certificate, error) {
c, err := loadCert()
if err != nil {
return nil, err
}
return &c, err
}
config.Certificates = []tls.Certificate{clientCert}
// Allow fallback for callers using this config also for server purposes (i.e., kv/memberlist).
// Clients will prefer GetClientCertificate, but servers can use Certificates.
config.Certificates = []tls.Certificate{cert}
}
if cfg.MinVersion != "" {
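The net effect of this hunk is client-certificate hot-reloading: instead of loading the key pair once when the config is built, the certificate is re-read on every handshake. A sketch of how a caller sees it (hypothetical surrounding function):

    tlsCfg, err := cfg.GetTLSConfig()
    if err != nil {
        return nil, err
    }
    // Every new TLS handshake invokes tlsCfg.GetClientCertificate, which calls
    // loadCert() and re-reads cfg.CertPath and cfg.KeyPath, so rotated client
    // certificates are picked up without a restart. Certificates is still set
    // as a fallback for callers that reuse this config on the server side.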

@ -1,9 +0,0 @@
package math
// Min returns the minimum of two ints.
func Min(a, b int) int {
if a < b {
return a
}
return b
}

@ -1,10 +1,7 @@
package memberlist
import (
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/memberlist"
)
@ -45,7 +42,6 @@ func (r ringBroadcast) Invalidates(old memberlist.Broadcast) bool {
// otherwise, we may be invalidating some older messages, which however covered different
// ingesters
if r.version >= oldb.version {
level.Debug(r.logger).Log("msg", "Invalidating forwarded broadcast", "key", r.key, "version", r.version, "oldVersion", oldb.version, "content", fmt.Sprintf("%v", r.content), "oldContent", fmt.Sprintf("%v", oldb.content))
return true
}
}

@ -157,7 +157,8 @@ type KVConfig struct {
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"`
// Timeout used when leaving the memberlist cluster.
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"`
LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"`
BroadcastTimeoutForLocalUpdatesOnShutdown time.Duration `yaml:"broadcast_timeout_for_local_updates_on_shutdown" category:"advanced"`
// How much space to use to keep received and sent messages in memory (for troubleshooting).
MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"`
@ -198,6 +199,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.")
f.DurationVar(&cfg.BroadcastTimeoutForLocalUpdatesOnShutdown, prefix+"memberlist.broadcast-timeout-for-local-updates-on-shutdown", 10*time.Second, "Timeout for broadcasting all remaining locally-generated updates to other nodes when shutting down. Only used if there are nodes left in the memberlist cluster, and only applies to locally-generated updates, not to broadcast messages that are the result of incoming gossip updates. 0 = no timeout, wait until all locally-generated updates are sent.")
cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix)
}
@ -231,10 +233,11 @@ type KV struct {
// dns discovery provider
provider DNSProvider
// Protects access to memberlist and broadcasts fields.
delegateReady atomic.Bool
memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue
// Protects access to memberlist and broadcast queues.
delegateReady atomic.Bool
memberlist *memberlist.Memberlist
localBroadcasts *memberlist.TransmitLimitedQueue // queue for messages generated locally
gossipBroadcasts *memberlist.TransmitLimitedQueue // queue for messages that we forward from other nodes
// KV Store.
storeMu sync.Mutex
@ -273,7 +276,8 @@ type KV struct {
numberOfPushes prometheus.Counter
totalSizeOfPulls prometheus.Counter
totalSizeOfPushes prometheus.Counter
numberOfBroadcastMessagesInQueue prometheus.GaugeFunc
numberOfGossipMessagesInQueue prometheus.GaugeFunc
numberOfLocalMessagesInQueue prometheus.GaugeFunc
totalSizeOfBroadcastMessagesInQueue prometheus.Gauge
numberOfBroadcastMessagesDropped prometheus.Counter
casAttempts prometheus.Counter
@ -456,7 +460,11 @@ func (m *KV) starting(ctx context.Context) error {
}
// Finish delegate initialization.
m.memberlist = list
m.broadcasts = &memberlist.TransmitLimitedQueue{
m.localBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: list.NumMembers,
RetransmitMult: mlCfg.RetransmitMult,
}
m.gossipBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: list.NumMembers,
RetransmitMult: mlCfg.RetransmitMult,
}
@ -719,20 +727,24 @@ func (m *KV) discoverMembers(ctx context.Context, members []string) []string {
func (m *KV) stopping(_ error) error {
level.Info(m.logger).Log("msg", "leaving memberlist cluster")
// Wait until broadcast queue is empty, but don't wait for too long.
// Wait until queue with locally-generated messages is empty, but don't wait for too long.
// Also don't wait if there is just one node left.
// Problem is that broadcast queue is also filled up by state changes received from other nodes,
// so it may never be empty in a busy cluster. However, we generally only care about messages
// generated on this node via CAS, and those are disabled now (via casBroadcastsEnabled), and should be able
// to get out in this timeout.
// Note: Once we enter Stopping state, we don't queue more locally-generated messages.
waitTimeout := time.Now().Add(10 * time.Second)
for m.broadcasts.NumQueued() > 0 && m.memberlist.NumMembers() > 1 && time.Now().Before(waitTimeout) {
deadline := time.Now().Add(m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown)
msgs := m.localBroadcasts.NumQueued()
nodes := m.memberlist.NumMembers()
for msgs > 0 && nodes > 1 && (m.cfg.BroadcastTimeoutForLocalUpdatesOnShutdown <= 0 || time.Now().Before(deadline)) {
level.Info(m.logger).Log("msg", "waiting for locally-generated broadcast messages to be sent out", "count", msgs, "nodes", nodes)
time.Sleep(250 * time.Millisecond)
msgs = m.localBroadcasts.NumQueued()
nodes = m.memberlist.NumMembers()
}
if cnt := m.broadcasts.NumQueued(); cnt > 0 {
level.Warn(m.logger).Log("msg", "broadcast messages left in queue", "count", cnt, "nodes", m.memberlist.NumMembers())
if msgs > 0 {
level.Warn(m.logger).Log("msg", "locally-generated broadcast messages left in queue", "count", msgs, "nodes", nodes)
}
err := m.memberlist.Leave(m.cfg.LeaveTimeout)
@ -972,11 +984,7 @@ outer:
m.casSuccesses.Inc()
m.notifyWatchers(key)
if m.State() == services.Running {
m.broadcastNewValue(key, change, newver, codec)
} else {
level.Warn(m.logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
}
m.broadcastNewValue(key, change, newver, codec, true)
}
return nil
@ -1034,7 +1042,12 @@ func (m *KV) trySingleCas(key string, codec codec.Codec, f func(in interface{})
return change, newver, retry, nil
}
func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec) {
func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec codec.Codec, locallyGenerated bool) {
if locallyGenerated && m.State() != services.Running {
level.Warn(m.logger).Log("msg", "skipped broadcasting of locally-generated update because memberlist KV is shutting down", "key", key)
return
}
data, err := codec.Encode(change)
if err != nil {
level.Error(m.logger).Log("msg", "failed to encode change", "key", key, "version", version, "err", err)
@ -1058,7 +1071,25 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
Changes: change.MergeContent(),
})
m.queueBroadcast(key, change.MergeContent(), version, pairData)
l := len(pairData)
b := ringBroadcast{
key: key,
content: change.MergeContent(),
version: version,
msg: pairData,
finished: func(ringBroadcast) {
m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l))
},
logger: m.logger,
}
m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l))
if locallyGenerated {
m.localBroadcasts.QueueBroadcast(b)
} else {
m.gossipBroadcasts.QueueBroadcast(b)
}
}
// NodeMeta is method from Memberlist Delegate interface
@ -1153,7 +1184,7 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
m.notifyWatchers(key)
// Don't resend original message, but only changes.
m.broadcastNewValue(key, mod, version, update.codec)
m.broadcastNewValue(key, mod, version, update.codec, false)
}
case <-m.shutdown:
@ -1163,24 +1194,6 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
}
}
func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) {
l := len(message)
b := ringBroadcast{
key: key,
content: content,
version: version,
msg: message,
finished: func(ringBroadcast) {
m.totalSizeOfBroadcastMessagesInQueue.Sub(float64(l))
},
logger: m.logger,
}
m.totalSizeOfBroadcastMessagesInQueue.Add(float64(l))
m.broadcasts.QueueBroadcast(b)
}
// GetBroadcasts is method from Memberlist Delegate interface
// It returns all pending broadcasts (within the size limit)
func (m *KV) GetBroadcasts(overhead, limit int) [][]byte {
@ -1188,7 +1201,18 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte {
return nil
}
return m.broadcasts.GetBroadcasts(overhead, limit)
// Prioritize locally-generated messages
msgs := m.localBroadcasts.GetBroadcasts(overhead, limit)
// Decrease limit for each message we got from locally-generated broadcasts.
for _, m := range msgs {
limit -= overhead + len(m)
}
if limit > 0 {
msgs = append(msgs, m.gossipBroadcasts.GetBroadcasts(overhead, limit)...)
}
return msgs
}
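The limit accounting keeps the combined payload within the gossip packet budget: for example, with limit = 1024 bytes and overhead = 6, two local messages of 300 bytes each consume 2 * (300 + 6) = 612 bytes, leaving 412 bytes of limit for messages from the gossip queue.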
// LocalState is method from Memberlist Delegate interface
@ -1335,7 +1359,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) {
level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err)
} else if newver > 0 {
m.notifyWatchers(kvPair.Key)
m.broadcastNewValue(kvPair.Key, change, newver, codec)
m.broadcastNewValue(kvPair.Key, change, newver, codec, false)
}
}

@ -71,15 +71,33 @@ func (m *KV) createAndRegisterMetrics() {
Help: "Total size of pulled state",
})
m.numberOfBroadcastMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "messages_in_broadcast_queue",
Help: "Number of user messages in the broadcast queue",
const queueMetricName = "messages_in_broadcast_queue"
const queueMetricHelp = "Number of user messages in the broadcast queue"
m.numberOfGossipMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: queueMetricName,
Help: queueMetricHelp,
ConstLabels: map[string]string{"queue": "gossip"},
}, func() float64 {
// Queues are not set before Starting state
if m.State() == services.Running || m.State() == services.Stopping {
return float64(m.gossipBroadcasts.NumQueued())
}
return 0
})
m.numberOfLocalMessagesInQueue = promauto.With(m.registerer).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: m.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: queueMetricName,
Help: queueMetricHelp,
ConstLabels: map[string]string{"queue": "local"},
}, func() float64 {
// m.broadcasts is not set before Starting state
// Queues are not set before Starting state
if m.State() == services.Running || m.State() == services.Stopping {
return float64(m.broadcasts.NumQueued())
return float64(m.localBroadcasts.NumQueued())
}
return 0
})
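Both gauges reuse the single messages_in_broadcast_queue metric name and differ only in the constant queue label ("local" vs "gossip"), so existing dashboards that aggregate over the metric keep working. Assuming the usual memberlist_client subsystem and whatever MetricsNamespace is configured (in Loki this would be something along the lines of loki_memberlist_client_messages_in_broadcast_queue), a query such as sum by (queue) (...) now breaks the queue depth down by message origin.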

@ -73,6 +73,8 @@ Rather, it's the delegate's responsibility to call [BasicLifecycler.ChangeState]
- The lifecycler will then periodically, based on the [ring.BasicLifecyclerConfig.TokensObservePeriod], attempt to verify that its tokens have been added to the ring, after which it will call [ring.BasicLifecyclerDelegate.OnRingInstanceTokens].
- The lifecycler will update the key/value store with heartbeats and state changes based on the [ring.BasicLifecyclerConfig.HeartbeatPeriod], calling [ring.BasicLifecyclerDelegate.OnRingInstanceHeartbeat] each time.
- When the BasicLifecycler is stopped, it will call [ring.BasicLifecyclerDelegate.OnRingInstanceStopping].
BasicLifecycler does not support read-only instances for now.
*/
type BasicLifecycler struct {
*services.BasicService
@ -316,7 +318,7 @@ func (l *BasicLifecycler) registerInstance(ctx context.Context) error {
// Always overwrite the instance in the ring (even if already exists) because some properties
// may have changed (state, tokens, zone, address) and even if they didn't, the heartbeat at
// least did.
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt)
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, tokens, state, registeredAt, false, time.Time{})
return ringDesc, true, nil
})
@ -443,7 +445,7 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc,
// a resharding of tenants among instances: to guarantee query correctness we need to update the
// registration timestamp to current time.
registeredAt := time.Now()
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt)
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt, false, time.Time{})
}
prevTimestamp := instanceDesc.Timestamp

@ -147,10 +147,12 @@ type Lifecycler struct {
// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
// goes away and comes back empty. The state changes during the lifecycle of the instance.
stateMtx sync.RWMutex
state InstanceState
tokens Tokens
registeredAt time.Time
stateMtx sync.RWMutex
state InstanceState
tokens Tokens
registeredAt time.Time
readOnly bool
readOnlyLastUpdated time.Time
// Controls the ready-reporting
readyLock sync.Mutex
@ -161,6 +163,7 @@ type Lifecycler struct {
countersLock sync.RWMutex
healthyInstancesCount int
instancesCount int
readOnlyInstancesCount int
healthyInstancesInZoneCount int
instancesInZoneCount int
zonesCount int
@ -349,6 +352,26 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error
return <-errCh
}
func (i *Lifecycler) ChangeReadOnlyState(ctx context.Context, readOnly bool) error {
errCh := make(chan error)
fn := func() {
prevReadOnly, _ := i.GetReadOnlyState()
if prevReadOnly == readOnly {
errCh <- nil
return
}
level.Info(i.logger).Log("msg", "changing read-only state of instance in the ring", "readOnly", readOnly, "ring", i.RingName)
i.setReadOnlyState(readOnly, time.Now())
errCh <- i.updateConsul(ctx)
}
if err := i.sendToLifecyclerLoop(fn); err != nil {
return err
}
return <-errCh
}
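ChangeReadOnlyState is a no-op when the flag is unchanged; otherwise it records the new state plus its update time locally and persists both to the KV store. A hedged sketch of an operator hook using it, e.g. ahead of a scale-down (markReadOnly is a hypothetical helper; lc is assumed to be an already-running lifecycler):

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/ring"
)

// markReadOnly flips this instance to read-only in the ring so that new
// shuffle shards stop routing writes to it, while lookback-based shards
// keep it visible for reads.
func markReadOnly(ctx context.Context, lc *ring.Lifecycler) error {
	if err := lc.ChangeReadOnlyState(ctx, true); err != nil {
		return fmt.Errorf("setting read-only state: %w", err)
	}
	ro, since := lc.GetReadOnlyState()
	fmt.Printf("read-only=%t, last updated %s\n", ro, since)
	return nil
}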
func (i *Lifecycler) getTokens() Tokens {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
@ -379,6 +402,21 @@ func (i *Lifecycler) setRegisteredAt(registeredAt time.Time) {
i.registeredAt = registeredAt
}
// GetReadOnlyState returns the read-only state of this instance: whether the instance is read-only, and when the
// read-only state was last updated (possibly zero).
func (i *Lifecycler) GetReadOnlyState() (bool, time.Time) {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
return i.readOnly, i.readOnlyLastUpdated
}
func (i *Lifecycler) setReadOnlyState(readOnly bool, readOnlyLastUpdated time.Time) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()
i.readOnly = readOnly
i.readOnlyLastUpdated = readOnlyLastUpdated
}
// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
//
// For this method to work correctly (especially when using gossiping), source ingester (specified by
@ -442,6 +480,14 @@ func (i *Lifecycler) InstancesCount() int {
return i.instancesCount
}
// ReadOnlyInstancesCount returns the total number of read-only instances in the ring, updated during the last heartbeat period.
func (i *Lifecycler) ReadOnlyInstancesCount() int {
i.countersLock.RLock()
defer i.countersLock.RUnlock()
return i.readOnlyInstancesCount
}
// HealthyInstancesInZoneCount returns the number of healthy instances in the ring that are registered in
// this lifecycler's zone, updated during the last heartbeat period.
func (i *Lifecycler) HealthyInstancesInZoneCount() int {
@ -629,10 +675,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
instanceDesc, ok := ringDesc.Ingesters[i.ID]
if !ok {
// The instance doesn't exist in the ring, so it's safe to set the registered timestamp
// as of now.
registeredAt := time.Now()
i.setRegisteredAt(registeredAt)
now := time.Now()
// The instance doesn't exist in the ring, so it's safe to set the registered timestamp as of now.
i.setRegisteredAt(now)
// Clear read-only state, and set last update time to "now".
i.setReadOnlyState(false, now)
// We use the tokens from the file only if it does not exist in the ring yet.
if len(tokensFromFile) > 0 {
@ -640,14 +687,16 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
if len(tokensFromFile) >= i.cfg.NumTokens {
i.setState(ACTIVE)
}
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), i.getRegisteredAt(), ro, rots)
i.setTokens(tokensFromFile)
return ringDesc, true, nil
}
// Either we are a new ingester, or consul must have restarted
level.Info(i.logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), registeredAt)
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), i.getRegisteredAt(), ro, rots)
return ringDesc, true, nil
}
@ -655,6 +704,9 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
// but we need to update the local state accordingly.
i.setRegisteredAt(instanceDesc.GetRegisteredAt())
// Set lifecycler read-only state from ring entry. We will not modify ring entry's read-only state.
i.setReadOnlyState(instanceDesc.GetReadOnlyState())
// If the ingester is in the JOINING state this means it crashed due to
// a failed token transfer or some other reason during startup. We want
// to set it back to PENDING in order to start the lifecycle from the
@ -747,7 +799,8 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt())
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt(), ro, rots)
i.setTokens(ringTokens)
@ -855,7 +908,8 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
sort.Sort(myTokens)
i.setTokens(myTokens)
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt(), ro, rots)
return ringDesc, true, nil
})
@ -889,7 +943,8 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
tokens = instanceDesc.Tokens
}
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt())
ro, rots := i.GetReadOnlyState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokens, i.GetState(), i.getRegisteredAt(), ro, rots)
return ringDesc, true, nil
})
@ -922,6 +977,7 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error
func (i *Lifecycler) updateCounters(ringDesc *Desc) {
healthyInstancesCount := 0
instancesCount := 0
readOnlyInstancesCount := 0
zones := map[string]int{}
healthyInstancesInZone := map[string]int{}
@ -931,6 +987,9 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
for _, ingester := range ringDesc.Ingesters {
zones[ingester.Zone]++
instancesCount++
if ingester.ReadOnly {
readOnlyInstancesCount++
}
// Count the number of healthy instances for Write operation.
if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) {
@ -944,6 +1003,7 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
i.countersLock.Lock()
i.healthyInstancesCount = healthyInstancesCount
i.instancesCount = instancesCount
i.readOnlyInstancesCount = readOnlyInstancesCount
i.healthyInstancesInZoneCount = healthyInstancesInZone[i.cfg.Zone]
i.instancesInZoneCount = zones[i.cfg.Zone]
i.zonesCount = len(zones)

@ -45,26 +45,30 @@ func NewDesc() *Desc {
}
}
func timeToUnixSeconds(t time.Time) int64 {
if t.IsZero() {
return 0
}
return t.Unix()
}
// AddIngester adds the given ingester to the ring. Ingester will only use supplied tokens,
// any other tokens are removed.
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time) InstanceDesc {
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time, readOnly bool, readOnlyUpdated time.Time) InstanceDesc {
if d.Ingesters == nil {
d.Ingesters = map[string]InstanceDesc{}
}
registeredTimestamp := int64(0)
if !registeredAt.IsZero() {
registeredTimestamp = registeredAt.Unix()
}
ingester := InstanceDesc{
Id: id,
Addr: addr,
Timestamp: time.Now().Unix(),
RegisteredTimestamp: registeredTimestamp,
State: state,
Tokens: tokens,
Zone: zone,
Id: id,
Addr: addr,
Timestamp: time.Now().Unix(),
State: state,
Tokens: tokens,
Zone: zone,
RegisteredTimestamp: timeToUnixSeconds(registeredAt),
ReadOnly: readOnly,
ReadOnlyUpdatedTimestamp: timeToUnixSeconds(readOnlyUpdated),
}
d.Ingesters[id] = ingester
@ -142,6 +146,20 @@ func (i *InstanceDesc) GetRegisteredAt() time.Time {
return time.Unix(i.RegisteredTimestamp, 0)
}
// GetReadOnlyState returns the read-only state and timestamp of last read-only state update.
func (i *InstanceDesc) GetReadOnlyState() (bool, time.Time) {
if i == nil {
return false, time.Time{}
}
ts := time.Time{}
if i.ReadOnlyUpdatedTimestamp > 0 {
ts = time.Unix(i.ReadOnlyUpdatedTimestamp, 0)
}
return i.ReadOnly, ts
}
func (i *InstanceDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
healthy := op.IsInstanceInStateHealthy(i.State)
@ -552,6 +570,30 @@ func (d *Desc) instancesWithTokensCountPerZone() map[string]int {
return instancesCountPerZone
}
func (d *Desc) writableInstancesWithTokensCount() int {
writableInstancesWithTokensCount := 0
if d != nil {
for _, ingester := range d.Ingesters {
if len(ingester.Tokens) > 0 && !ingester.ReadOnly {
writableInstancesWithTokensCount++
}
}
}
return writableInstancesWithTokensCount
}
func (d *Desc) writableInstancesWithTokensCountPerZone() map[string]int {
instancesCountPerZone := map[string]int{}
if d != nil {
for _, ingester := range d.Ingesters {
if len(ingester.Tokens) > 0 && !ingester.ReadOnly {
instancesCountPerZone[ingester.Zone]++
}
}
}
return instancesCountPerZone
}
type CompareResult int
// CompareResult responses
@ -600,6 +642,14 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
return Different
}
if ing.ReadOnly != oing.ReadOnly {
return Different
}
if ing.ReadOnlyUpdatedTimestamp != oing.ReadOnlyUpdatedTimestamp {
return Different
}
if len(ing.Tokens) != len(oing.Tokens) {
return Different
}

@ -20,7 +20,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/dskit/flagext"
dsmath "github.com/grafana/dskit/internal/math"
"github.com/grafana/dskit/internal/slices"
"github.com/grafana/dskit/kv"
shardUtil "github.com/grafana/dskit/ring/shard"
@ -36,6 +35,7 @@ const (
)
// ReadRing represents the read interface to the ring.
// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet.
type ReadRing interface {
// Get returns n (or more) instances which form the replicas for the given key.
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
@ -88,6 +88,12 @@ type ReadRing interface {
// InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens.
InstancesWithTokensInZoneCount(zone string) int
// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens.
WritableInstancesWithTokensCount() int
// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens.
WritableInstancesWithTokensInZoneCount(zone string) int
// ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.
ZonesCount() int
}
@ -167,6 +173,7 @@ type instanceInfo struct {
}
// Ring is a Service that maintains an in-memory copy of a ring and watches for changes.
// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet.
type Ring struct {
services.Service
@ -205,6 +212,12 @@ type Ring struct {
// Number of registered instances with tokens per zone.
instancesWithTokensCountPerZone map[string]int
// Number of registered instances that are writable and have tokens.
writableInstancesWithTokensCount int
// Number of registered instances with tokens per zone that are writable.
writableInstancesWithTokensCountPerZone map[string]int
// Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes.
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring
@ -357,6 +370,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
instancesWithTokensCount := ringDesc.instancesWithTokensCount()
instancesCountPerZone := ringDesc.instancesCountPerZone()
instancesWithTokensCountPerZone := ringDesc.instancesWithTokensCountPerZone()
writableInstancesWithTokensCount := ringDesc.writableInstancesWithTokensCount()
writableInstancesWithTokensCountPerZone := ringDesc.writableInstancesWithTokensCountPerZone()
r.mtx.Lock()
defer r.mtx.Unlock()
@ -368,6 +383,8 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.instancesWithTokensCount = instancesWithTokensCount
r.instancesCountPerZone = instancesCountPerZone
r.instancesWithTokensCountPerZone = instancesWithTokensCountPerZone
r.writableInstancesWithTokensCount = writableInstancesWithTokensCount
r.writableInstancesWithTokensCountPerZone = writableInstancesWithTokensCountPerZone
r.oldestRegisteredTimestamp = oldestRegisteredTimestamp
r.lastTopologyChange = now
@ -423,7 +440,7 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
distinctHosts = bufHosts[:0]
distinctZones = bufZones[:0]
)
for i := start; len(distinctHosts) < dsmath.Min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ {
for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ {
iterations++
// Wrap i around in the ring.
i %= len(r.ringTokens)
@ -528,7 +545,7 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
// Given data is replicated to RF different zones, we can tolerate a number of
// RF/2 failing zones. However, we need to protect from the case the ring currently
// contains instances in a number of zones < RF.
numReplicatedZones := dsmath.Min(len(r.ringZones), r.cfg.ReplicationFactor)
numReplicatedZones := min(len(r.ringZones), r.cfg.ReplicationFactor)
minSuccessZones := (numReplicatedZones / 2) + 1
maxUnavailableZones = minSuccessZones - 1
@ -677,10 +694,14 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
//
// - Shuffling: probabilistically, for a large enough cluster each identifier gets a different
// set of instances, with a reduced number of overlapping instances between two identifiers.
//
// The subring returned by this method does not contain instances that have the read-only field set.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
// Nothing to do if the shard size is not smaller then the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
// Use all possible instances if shuffle sharding is disabled. We don't set size to r.InstancesCount(), because
// that could lead to not all instances being returned when ring zones are unbalanced.
// The reason for not returning the entire ring directly is that we need to filter out read-only instances.
if size <= 0 {
size = math.MaxInt
}
if cached := r.getCachedShuffledSubring(identifier, size); cached != nil {
@ -705,7 +726,7 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
// This function supports caching, but the cache will only be effective if successive calls for the
// same identifier are with the same lookbackPeriod and increasing values of now.
func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
// Nothing to do if the shard size is not smaller then the actual ring.
// Nothing to do if the shard size is not smaller than the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
}
@ -750,7 +771,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
actualZones = []string{""}
}
shard := make(map[string]InstanceDesc, size)
shard := make(map[string]InstanceDesc, min(len(r.ringDesc.Ingesters), size))
// We need to iterate zones always in the same order to guarantee stability.
for _, zone := range actualZones {
@ -797,6 +818,13 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
instanceID := info.InstanceID
instance := r.ringDesc.Ingesters[instanceID]
// The lookbackPeriod is 0 when this function is called by ShuffleShard(). In this case, we want read-only instances excluded.
if lookbackPeriod == 0 && instance.ReadOnly {
continue
}
// Include instance in the subring.
shard[instanceID] = instance
// If the lookback is enabled and this instance has been registered within the lookback period
@ -805,6 +833,15 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
continue
}
// If the lookback is enabled, and this instance is read only or has switched its read-only state
// within the lookback period, then we should include it in the subring, but continue selecting more instances.
//
// * If the instance switched to read-only state within the lookback period, then the next instance is currently receiving data that previously belonged to this instance.
// * If the instance switched to read-write state (read-only=false) within the lookback period, then there was another instance that received data that now belongs back to this instance.
if lookbackPeriod > 0 && (instance.ReadOnly || instance.ReadOnlyUpdatedTimestamp >= lookbackUntil) {
continue
}
found = true
break
}
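Read together, the rule is: within a lookback window, any instance whose token ownership may have shifted is included in the shard but never terminates the search. A sketch of that predicate using the same InstanceDesc fields (not the dskit implementation itself; lookbackUntil is the Unix timestamp of now minus the lookback period):

package main

import (
	"fmt"
	"time"

	"github.com/grafana/dskit/ring"
)

// includeAndContinue reports whether the instance is added to the shard while
// selection keeps going to pick one more instance.
func includeAndContinue(inst ring.InstanceDesc, lookbackPeriod time.Duration, lookbackUntil int64) bool {
	if lookbackPeriod <= 0 {
		return false
	}
	// Registered within the window: a previous owner may still hold its data.
	if inst.RegisteredTimestamp >= lookbackUntil {
		return true
	}
	// Read-only, or read-only state flipped within the window: data may have
	// moved to (or back from) the next instance on the ring.
	return inst.ReadOnly || inst.ReadOnlyUpdatedTimestamp >= lookbackUntil
}

func main() {
	inst := ring.InstanceDesc{ReadOnly: true, ReadOnlyUpdatedTimestamp: time.Now().Unix()}
	lookbackUntil := time.Now().Add(-time.Hour).Unix()
	fmt.Println(includeAndContinue(inst, time.Hour, lookbackUntil)) // true
}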
@ -824,15 +861,17 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
shardTokens := mergeTokenGroups(shardTokensByZone)
return &Ring{
cfg: r.cfg,
strategy: r.strategy,
ringDesc: shardDesc,
ringTokens: shardTokens,
ringTokensByZone: shardTokensByZone,
ringZones: getZones(shardTokensByZone),
instancesWithTokensCount: shardDesc.instancesWithTokensCount(),
instancesCountPerZone: shardDesc.instancesCountPerZone(),
instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(),
cfg: r.cfg,
strategy: r.strategy,
ringDesc: shardDesc,
ringTokens: shardTokens,
ringTokensByZone: shardTokensByZone,
ringZones: getZones(shardTokensByZone),
instancesWithTokensCount: shardDesc.instancesWithTokensCount(),
instancesCountPerZone: shardDesc.instancesCountPerZone(),
instancesWithTokensCountPerZone: shardDesc.instancesWithTokensCountPerZone(),
writableInstancesWithTokensCount: shardDesc.writableInstancesWithTokensCount(),
writableInstancesWithTokensCountPerZone: shardDesc.writableInstancesWithTokensCountPerZone(),
oldestRegisteredTimestamp: shardDesc.getOldestRegisteredTimestamp(),
@ -1036,11 +1075,12 @@ func (r *Ring) setCachedShuffledSubringWithLookback(identifier string, size int,
validForLookbackWindowsStartingBefore := int64(math.MaxInt64)
for _, instance := range subring.ringDesc.Ingesters {
registeredDuringLookbackWindow := instance.RegisteredTimestamp >= lookbackWindowStart
if registeredDuringLookbackWindow && instance.RegisteredTimestamp < validForLookbackWindowsStartingBefore {
if instance.RegisteredTimestamp >= lookbackWindowStart && instance.RegisteredTimestamp < validForLookbackWindowsStartingBefore {
validForLookbackWindowsStartingBefore = instance.RegisteredTimestamp
}
if instance.ReadOnlyUpdatedTimestamp >= lookbackWindowStart && instance.ReadOnlyUpdatedTimestamp < validForLookbackWindowsStartingBefore {
validForLookbackWindowsStartingBefore = instance.ReadOnlyUpdatedTimestamp
}
}
r.mtx.Lock()
@ -1141,6 +1181,22 @@ func (r *Ring) InstancesWithTokensInZoneCount(zone string) int {
return r.instancesWithTokensCountPerZone[zone]
}
// WritableInstancesWithTokensCount returns the number of writable instances in the ring that have tokens.
func (r *Ring) WritableInstancesWithTokensCount() int {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.writableInstancesWithTokensCount
}
// WritableInstancesWithTokensInZoneCount returns the number of writable instances in the ring that are registered in given zone and have tokens.
func (r *Ring) WritableInstancesWithTokensInZoneCount(zone string) int {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.writableInstancesWithTokensCountPerZone[zone]
}
func (r *Ring) ZonesCount() int {
r.mtx.RLock()
defer r.mtx.RUnlock()
@ -1148,6 +1204,19 @@ func (r *Ring) ZonesCount() int {
return len(r.ringZones)
}
// readOnlyInstanceCount returns the number of read-only instances in the ring.
func (r *Ring) readOnlyInstanceCount() int {
r.mtx.RLock()
c := 0
for _, i := range r.ringDesc.Ingesters {
if i.ReadOnly {
c++
}
}
r.mtx.RUnlock()
return c
}
// Operation describes which instances can be included in the replica set, based on their state.
//
// Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.

@ -128,6 +128,15 @@ type InstanceDesc struct {
RegisteredTimestamp int64 `protobuf:"varint,8,opt,name=registered_timestamp,json=registeredTimestamp,proto3" json:"registered_timestamp,omitempty"`
// ID of the instance. This value is the same as the key in the ingesters map in Desc.
Id string `protobuf:"bytes,9,opt,name=id,proto3" json:"id,omitempty"`
// Unix timestamp (with seconds precision) of when the read_only flag was updated. This
// is used to find other instances that could have possibly owned a specific token in
// the past on the write path, due to *this* instance being read-only. This value should
// only increase.
ReadOnlyUpdatedTimestamp int64 `protobuf:"varint,10,opt,name=read_only_updated_timestamp,json=readOnlyUpdatedTimestamp,proto3" json:"read_only_updated_timestamp,omitempty"`
// Indicates whether this instance is read-only.
// Read-only instances go through standard state changes, and special handling is applied to them
// during shuffle sharding.
ReadOnly bool `protobuf:"varint,11,opt,name=read_only,json=readOnly,proto3" json:"read_only,omitempty"`
}
func (m *InstanceDesc) Reset() { *m = InstanceDesc{} }
@ -211,6 +220,20 @@ func (m *InstanceDesc) GetId() string {
return ""
}
func (m *InstanceDesc) GetReadOnlyUpdatedTimestamp() int64 {
if m != nil {
return m.ReadOnlyUpdatedTimestamp
}
return 0
}
func (m *InstanceDesc) GetReadOnly() bool {
if m != nil {
return m.ReadOnly
}
return false
}
func init() {
proto.RegisterEnum("ring.InstanceState", InstanceState_name, InstanceState_value)
proto.RegisterType((*Desc)(nil), "ring.Desc")
@ -221,35 +244,37 @@ func init() {
func init() { proto.RegisterFile("ring.proto", fileDescriptor_26381ed67e202a6e) }
var fileDescriptor_26381ed67e202a6e = []byte{
// 435 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x92, 0x41, 0x8b, 0xd3, 0x40,
0x1c, 0xc5, 0xe7, 0x9f, 0x4c, 0xb3, 0xe9, 0xbf, 0x6e, 0x09, 0xb3, 0x22, 0x71, 0x91, 0x31, 0xec,
0x29, 0x0a, 0x76, 0xb1, 0x7a, 0x10, 0xc1, 0xc3, 0xae, 0x1b, 0x25, 0xa5, 0xd4, 0x25, 0x96, 0xbd,
0x4a, 0xda, 0x8c, 0x31, 0xac, 0x4d, 0x96, 0x64, 0x2a, 0xac, 0x27, 0x3f, 0x82, 0x5f, 0xc0, 0xbb,
0x1f, 0x65, 0x8f, 0x3d, 0xee, 0x49, 0x6c, 0x0a, 0xe2, 0x71, 0x3f, 0x82, 0xcc, 0xa4, 0x5a, 0x7b,
0x7b, 0xbf, 0xbc, 0x37, 0xef, 0x4d, 0x60, 0x10, 0xcb, 0x2c, 0x4f, 0x7b, 0x17, 0x65, 0x21, 0x0b,
0x46, 0x95, 0xde, 0x7f, 0x94, 0x66, 0xf2, 0xc3, 0x7c, 0xd2, 0x9b, 0x16, 0xb3, 0xc3, 0xb4, 0x48,
0x8b, 0x43, 0x6d, 0x4e, 0xe6, 0xef, 0x35, 0x69, 0xd0, 0xaa, 0x39, 0x74, 0xf0, 0x0d, 0x90, 0x9e,
0x88, 0x6a, 0xca, 0x5e, 0x60, 0x3b, 0xcb, 0x53, 0x51, 0x49, 0x51, 0x56, 0x2e, 0x78, 0xa6, 0xdf,
0xe9, 0xdf, 0xed, 0xe9, 0x76, 0x65, 0xf7, 0xc2, 0xbf, 0x5e, 0x90, 0xcb, 0xf2, 0xf2, 0x98, 0x5e,
0xfd, 0xb8, 0x4f, 0xa2, 0xcd, 0x89, 0xfd, 0x53, 0xec, 0x6e, 0x47, 0x98, 0x83, 0xe6, 0xb9, 0xb8,
0x74, 0xc1, 0x03, 0xbf, 0x1d, 0x29, 0xc9, 0x7c, 0x6c, 0x7d, 0x8a, 0x3f, 0xce, 0x85, 0x6b, 0x78,
0xe0, 0x77, 0xfa, 0xac, 0xa9, 0x0f, 0xf3, 0x4a, 0xc6, 0xf9, 0x54, 0xa8, 0x99, 0xa8, 0x09, 0x3c,
0x37, 0x9e, 0xc1, 0x80, 0xda, 0x86, 0x63, 0x1e, 0xfc, 0x02, 0xbc, 0xf5, 0x7f, 0x82, 0x31, 0xa4,
0x71, 0x92, 0x94, 0xeb, 0x5e, 0xad, 0xd9, 0x3d, 0x6c, 0xcb, 0x6c, 0x26, 0x2a, 0x19, 0xcf, 0x2e,
0x74, 0xb9, 0x19, 0x6d, 0x3e, 0xb0, 0x07, 0xd8, 0xaa, 0x64, 0x2c, 0x85, 0x6b, 0x7a, 0xe0, 0x77,
0xfb, 0x7b, 0xdb, 0xb3, 0x6f, 0x95, 0x15, 0x35, 0x09, 0x76, 0x07, 0x2d, 0x59, 0x9c, 0x8b, 0xbc,
0x72, 0x2d, 0xcf, 0xf4, 0x77, 0xa3, 0x35, 0xa9, 0xd1, 0xcf, 0x45, 0x2e, 0xdc, 0x9d, 0x66, 0x54,
0x69, 0xf6, 0x18, 0x6f, 0x97, 0x22, 0xcd, 0xd4, 0x1f, 0x8b, 0xe4, 0xdd, 0x66, 0xdf, 0xd6, 0xfb,
0x7b, 0x1b, 0x6f, 0xfc, 0xef, 0x26, 0x5d, 0x34, 0xb2, 0xc4, 0x6d, 0xeb, 0x12, 0x23, 0x4b, 0x06,
0xd4, 0xa6, 0x4e, 0x6b, 0x40, 0xed, 0x96, 0x63, 0x3d, 0x1c, 0xe2, 0xee, 0xd6, 0x95, 0x18, 0xa2,
0x75, 0xf4, 0x72, 0x1c, 0x9e, 0x05, 0x0e, 0x61, 0x1d, 0xdc, 0x19, 0x06, 0x47, 0x67, 0xe1, 0xe8,
0xb5, 0x03, 0x0a, 0x4e, 0x83, 0xd1, 0x89, 0x02, 0x43, 0xc1, 0xe0, 0x4d, 0x38, 0x52, 0x60, 0x32,
0x1b, 0xe9, 0x30, 0x78, 0x35, 0x76, 0xe8, 0xf1, 0xd3, 0xc5, 0x92, 0x93, 0xeb, 0x25, 0x27, 0x37,
0x4b, 0x0e, 0x5f, 0x6a, 0x0e, 0xdf, 0x6b, 0x0e, 0x57, 0x35, 0x87, 0x45, 0xcd, 0xe1, 0x67, 0xcd,
0xe1, 0x77, 0xcd, 0xc9, 0x4d, 0xcd, 0xe1, 0xeb, 0x8a, 0x93, 0xc5, 0x8a, 0x93, 0xeb, 0x15, 0x27,
0x13, 0x4b, 0xbf, 0x89, 0x27, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xad, 0x7a, 0xa4, 0x89, 0x56,
0x02, 0x00, 0x00,
// 478 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x92, 0x31, 0x6f, 0xd3, 0x40,
0x1c, 0xc5, 0x7d, 0xf6, 0xc5, 0xb5, 0xff, 0xa1, 0x91, 0x75, 0x45, 0xc8, 0xb4, 0xe8, 0xb0, 0x3a,
0x19, 0x24, 0x52, 0x11, 0x18, 0x10, 0x52, 0x87, 0x96, 0x1a, 0xe4, 0x28, 0x4a, 0x2b, 0x13, 0xba,
0x46, 0x4e, 0x7c, 0x18, 0xab, 0x89, 0x1d, 0xd9, 0x17, 0xa4, 0x30, 0xf1, 0x11, 0xf8, 0x02, 0xec,
0x7c, 0x0e, 0xa6, 0x8e, 0x19, 0x3b, 0x21, 0xe2, 0x2c, 0x8c, 0xfd, 0x08, 0xe8, 0xce, 0x6d, 0xdd,
0x6c, 0xef, 0xe5, 0xbd, 0xff, 0xef, 0xe5, 0x24, 0x03, 0xe4, 0x49, 0x1a, 0xb7, 0x67, 0x79, 0xc6,
0x33, 0x82, 0x85, 0xde, 0x7d, 0x11, 0x27, 0xfc, 0xcb, 0x7c, 0xd4, 0x1e, 0x67, 0xd3, 0x83, 0x38,
0x8b, 0xb3, 0x03, 0x19, 0x8e, 0xe6, 0x9f, 0xa5, 0x93, 0x46, 0xaa, 0xea, 0x68, 0xff, 0x27, 0x02,
0x7c, 0xc2, 0x8a, 0x31, 0x39, 0x04, 0x33, 0x49, 0x63, 0x56, 0x70, 0x96, 0x17, 0x36, 0x72, 0x34,
0xb7, 0xd9, 0x79, 0xdc, 0x96, 0x74, 0x11, 0xb7, 0xfd, 0xdb, 0xcc, 0x4b, 0x79, 0xbe, 0x38, 0xc6,
0x97, 0x7f, 0x9e, 0x2a, 0x41, 0x7d, 0xb1, 0x7b, 0x06, 0xad, 0xcd, 0x0a, 0xb1, 0x40, 0xbb, 0x60,
0x0b, 0x1b, 0x39, 0xc8, 0x35, 0x03, 0x21, 0x89, 0x0b, 0x8d, 0xaf, 0xe1, 0x64, 0xce, 0x6c, 0xd5,
0x41, 0x6e, 0xb3, 0x43, 0x2a, 0xbc, 0x9f, 0x16, 0x3c, 0x4c, 0xc7, 0x4c, 0xcc, 0x04, 0x55, 0xe1,
0xad, 0xfa, 0x06, 0x75, 0xb1, 0xa1, 0x5a, 0xda, 0xfe, 0x6f, 0x15, 0x1e, 0xdc, 0x6f, 0x10, 0x02,
0x38, 0x8c, 0xa2, 0xfc, 0x86, 0x2b, 0x35, 0x79, 0x02, 0x26, 0x4f, 0xa6, 0xac, 0xe0, 0xe1, 0x74,
0x26, 0xe1, 0x5a, 0x50, 0xff, 0x40, 0x9e, 0x41, 0xa3, 0xe0, 0x21, 0x67, 0xb6, 0xe6, 0x20, 0xb7,
0xd5, 0xd9, 0xd9, 0x9c, 0xfd, 0x28, 0xa2, 0xa0, 0x6a, 0x90, 0x47, 0xa0, 0xf3, 0xec, 0x82, 0xa5,
0x85, 0xad, 0x3b, 0x9a, 0xbb, 0x1d, 0xdc, 0x38, 0x31, 0xfa, 0x2d, 0x4b, 0x99, 0xbd, 0x55, 0x8d,
0x0a, 0x4d, 0x5e, 0xc2, 0xc3, 0x9c, 0xc5, 0x89, 0x78, 0x31, 0x8b, 0x86, 0xf5, 0xbe, 0x21, 0xf7,
0x77, 0xea, 0x6c, 0x70, 0xf7, 0x4f, 0x5a, 0xa0, 0x26, 0x91, 0x6d, 0x4a, 0x88, 0x9a, 0x44, 0xe4,
0x10, 0xf6, 0x72, 0x16, 0x46, 0xc3, 0x2c, 0x9d, 0x2c, 0x86, 0xf3, 0x59, 0x14, 0xf2, 0x0d, 0x12,
0x48, 0x92, 0x2d, 0x2a, 0xa7, 0xe9, 0x64, 0xf1, 0xa9, 0x2a, 0xd4, 0xb8, 0x3d, 0x30, 0xef, 0xce,
0xed, 0xa6, 0x83, 0x5c, 0x23, 0x30, 0x6e, 0xcb, 0x5d, 0x6c, 0x60, 0xab, 0xd1, 0xc5, 0x46, 0xc3,
0xd2, 0x9f, 0xf7, 0x60, 0x7b, 0xe3, 0xb9, 0x04, 0x40, 0x3f, 0x7a, 0x37, 0xf0, 0xcf, 0x3d, 0x4b,
0x21, 0x4d, 0xd8, 0xea, 0x79, 0x47, 0xe7, 0x7e, 0xff, 0x83, 0x85, 0x84, 0x39, 0xf3, 0xfa, 0x27,
0xc2, 0xa8, 0xc2, 0x74, 0x4f, 0xfd, 0xbe, 0x30, 0x1a, 0x31, 0x00, 0xf7, 0xbc, 0xf7, 0x03, 0x0b,
0x1f, 0xbf, 0x5e, 0xae, 0xa8, 0x72, 0xb5, 0xa2, 0xca, 0xf5, 0x8a, 0xa2, 0xef, 0x25, 0x45, 0xbf,
0x4a, 0x8a, 0x2e, 0x4b, 0x8a, 0x96, 0x25, 0x45, 0x7f, 0x4b, 0x8a, 0xfe, 0x95, 0x54, 0xb9, 0x2e,
0x29, 0xfa, 0xb1, 0xa6, 0xca, 0x72, 0x4d, 0x95, 0xab, 0x35, 0x55, 0x46, 0xba, 0xfc, 0xde, 0x5e,
0xfd, 0x0f, 0x00, 0x00, 0xff, 0xff, 0x6a, 0x5b, 0x75, 0x81, 0xb2, 0x02, 0x00, 0x00,
}
func (x InstanceState) String() string {
@ -335,6 +360,12 @@ func (this *InstanceDesc) Equal(that interface{}) bool {
if this.Id != that1.Id {
return false
}
if this.ReadOnlyUpdatedTimestamp != that1.ReadOnlyUpdatedTimestamp {
return false
}
if this.ReadOnly != that1.ReadOnly {
return false
}
return true
}
func (this *Desc) GoString() string {
@ -363,7 +394,7 @@ func (this *InstanceDesc) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 11)
s := make([]string, 0, 13)
s = append(s, "&ring.InstanceDesc{")
s = append(s, "Addr: "+fmt.Sprintf("%#v", this.Addr)+",\n")
s = append(s, "Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n")
@ -372,6 +403,8 @@ func (this *InstanceDesc) GoString() string {
s = append(s, "Zone: "+fmt.Sprintf("%#v", this.Zone)+",\n")
s = append(s, "RegisteredTimestamp: "+fmt.Sprintf("%#v", this.RegisteredTimestamp)+",\n")
s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n")
s = append(s, "ReadOnlyUpdatedTimestamp: "+fmt.Sprintf("%#v", this.ReadOnlyUpdatedTimestamp)+",\n")
s = append(s, "ReadOnly: "+fmt.Sprintf("%#v", this.ReadOnly)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -450,6 +483,21 @@ func (m *InstanceDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.ReadOnly {
i--
if m.ReadOnly {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x58
}
if m.ReadOnlyUpdatedTimestamp != 0 {
i = encodeVarintRing(dAtA, i, uint64(m.ReadOnlyUpdatedTimestamp))
i--
dAtA[i] = 0x50
}
if len(m.Id) > 0 {
i -= len(m.Id)
copy(dAtA[i:], m.Id)
@ -570,6 +618,12 @@ func (m *InstanceDesc) Size() (n int) {
if l > 0 {
n += 1 + l + sovRing(uint64(l))
}
if m.ReadOnlyUpdatedTimestamp != 0 {
n += 1 + sovRing(uint64(m.ReadOnlyUpdatedTimestamp))
}
if m.ReadOnly {
n += 2
}
return n
}
@ -611,6 +665,8 @@ func (this *InstanceDesc) String() string {
`Zone:` + fmt.Sprintf("%v", this.Zone) + `,`,
`RegisteredTimestamp:` + fmt.Sprintf("%v", this.RegisteredTimestamp) + `,`,
`Id:` + fmt.Sprintf("%v", this.Id) + `,`,
`ReadOnlyUpdatedTimestamp:` + fmt.Sprintf("%v", this.ReadOnlyUpdatedTimestamp) + `,`,
`ReadOnly:` + fmt.Sprintf("%v", this.ReadOnly) + `,`,
`}`,
}, "")
return s
@ -1063,6 +1119,45 @@ func (m *InstanceDesc) Unmarshal(dAtA []byte) error {
}
m.Id = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 10:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ReadOnlyUpdatedTimestamp", wireType)
}
m.ReadOnlyUpdatedTimestamp = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRing
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.ReadOnlyUpdatedTimestamp |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 11:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ReadOnly", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRing
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.ReadOnly = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipRing(dAtA[iNdEx:])

@ -44,6 +44,17 @@ message InstanceDesc {
// ID of the instance. This value is the same as the key in the ingesters map in Desc.
string id = 9;
// Unix timestamp (with seconds precision) of when the read_only flag was updated. This
// is used to find other instances that could have possibly owned a specific token in
// the past on the write path, due to *this* instance being read-only. This value should
// only increase.
int64 read_only_updated_timestamp = 10;
// Indicates whether this instance is read-only.
// Read-only instances go through standard state changes, and special handling is applied to them
// during shuffle sharding.
bool read_only = 11;
}
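The 0x50 and 0x58 tag bytes emitted by the generated marshaller earlier in this diff follow directly from protobuf's key encoding: each field key is (field_number << 3) | wire_type, and both new fields are varints (wire type 0). A quick check:

package main

import "fmt"

func main() {
	const wireVarint = 0
	fmt.Printf("%#x\n", 10<<3|wireVarint) // 0x50: read_only_updated_timestamp
	fmt.Printf("%#x\n", 11<<3|wireVarint) // 0x58: read_only
}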
enum InstanceState {

@ -24,9 +24,9 @@ var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.F
if t.IsZero() {
return ""
}
return t.Format(time.RFC3339Nano)
return t.Format(time.RFC3339)
},
"durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Millisecond).String() },
"durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Second).String() },
}).Parse(defaultPageContent))
type httpResponse struct {
@ -36,15 +36,17 @@ type httpResponse struct {
}
type ingesterDesc struct {
ID string `json:"id"`
State string `json:"state"`
Address string `json:"address"`
HeartbeatTimestamp time.Time `json:"timestamp"`
RegisteredTimestamp time.Time `json:"registered_timestamp"`
Zone string `json:"zone"`
Tokens []uint32 `json:"tokens"`
NumTokens int `json:"-"`
Ownership float64 `json:"-"`
ID string `json:"id"`
State string `json:"state"`
Address string `json:"address"`
HeartbeatTimestamp time.Time `json:"timestamp"`
RegisteredTimestamp time.Time `json:"registered_timestamp"`
ReadOnly bool `json:"read_only"`
ReadOnlyUpdatedTimestamp time.Time `json:"read_only_updated_timestamp"`
Zone string `json:"zone"`
Tokens []uint32 `json:"tokens"`
NumTokens int `json:"-"`
Ownership float64 `json:"-"`
}
type ringAccess interface {
@ -110,16 +112,20 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {
state = "UNHEALTHY"
}
ro, rots := ing.GetReadOnlyState()
ingesters = append(ingesters, ingesterDesc{
ID: id,
State: state,
Address: ing.Addr,
HeartbeatTimestamp: time.Unix(ing.Timestamp, 0).UTC(),
RegisteredTimestamp: ing.GetRegisteredAt().UTC(),
Tokens: ing.Tokens,
Zone: ing.Zone,
NumTokens: len(ing.Tokens),
Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100,
ID: id,
State: state,
Address: ing.Addr,
HeartbeatTimestamp: time.Unix(ing.Timestamp, 0).UTC(),
RegisteredTimestamp: ing.GetRegisteredAt().UTC(),
ReadOnly: ro,
ReadOnlyUpdatedTimestamp: rots.UTC(),
Tokens: ing.Tokens,
Zone: ing.Zone,
NumTokens: len(ing.Tokens),
Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100,
})
}

@ -18,6 +18,8 @@
<th>State</th>
<th>Address</th>
<th>Registered At</th>
<th>Read-Only</th>
<th>Read-Only Updated</th>
<th>Last Heartbeat</th>
<th>Tokens</th>
<th>Ownership</th>
@ -36,6 +38,8 @@
<td>{{ .State }}</td>
<td>{{ .Address }}</td>
<td>{{ .RegisteredTimestamp | timeOrEmptyString }}</td>
<td>{{ .ReadOnly }}</td>
<td>{{ .ReadOnlyUpdatedTimestamp | timeOrEmptyString }}</td>
<td>{{ .HeartbeatTimestamp | durationSince }} ago ({{ .HeartbeatTimestamp.Format "15:04:05.999" }})</td>
<td>{{ .NumTokens }}</td>
<td>{{ .Ownership | humanFloat }}%</td>
@ -66,4 +70,4 @@
{{ end }}
</form>
</body>
</html>
</html>

@ -30,6 +30,9 @@ func ShuffleShardSeed(identifier, zone string) int64 {
// zone when zone-aware replication is enabled. The algorithm expects the shard size to be divisible
// by the number of zones, in order to have nodes balanced across zones. If it's not, we round up.
func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int {
if shardSize == math.MaxInt {
return math.MaxInt
}
return int(math.Ceil(float64(shardSize) / float64(numZones)))
}
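ShuffleShard now passes math.MaxInt as a "whole ring" sentinel when sharding is disabled, and converting that to float64 for the Ceil round-trip would produce an implementation-specific result; the early return sidesteps it. A quick check of both paths, mirroring the function above:

package main

import (
	"fmt"
	"math"
)

func expectedInstancesPerZone(shardSize, numZones int) int {
	if shardSize == math.MaxInt {
		return math.MaxInt // sentinel: skip the float64 round-trip entirely
	}
	return int(math.Ceil(float64(shardSize) / float64(numZones)))
}

func main() {
	fmt.Println(expectedInstancesPerZone(4, 3))           // 2: rounds up to keep zones balanced
	fmt.Println(expectedInstancesPerZone(math.MaxInt, 3)) // 9223372036854775807
}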
@ -41,5 +44,5 @@ func ShuffleShardExpectedInstances(shardSize, numZones int) int {
// yoloBuf returns an unsafe []byte view of a string; as the name implies, use at your own risk.
func yoloBuf(s string) []byte {
return *((*[]byte)(unsafe.Pointer(&s)))
return unsafe.Slice(unsafe.StringData(s), len(s))
}
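The replacement is the Go 1.20+ idiom for a zero-copy view of a string's bytes: unsafe.StringData exposes the backing array and unsafe.Slice wraps it, instead of reinterpreting the string header. A minimal demonstration (mutating the result remains undefined behaviour, since strings are immutable):

package main

import (
	"fmt"
	"unsafe"
)

func main() {
	s := "ring-token"
	b := unsafe.Slice(unsafe.StringData(s), len(s)) // no allocation, no copy
	fmt.Println(len(b), string(b))                  // 10 ring-token
}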

@ -3,7 +3,10 @@ package services
import (
"context"
"fmt"
"slices"
"sync"
"go.uber.org/atomic"
)
// StartingFn is called when service enters Starting state. If StartingFn returns
@ -325,26 +328,59 @@ func (b *BasicService) State() State {
}
// AddListener is part of Service interface.
func (b *BasicService) AddListener(listener Listener) {
func (b *BasicService) AddListener(listener Listener) func() {
b.stateMu.Lock()
defer b.stateMu.Unlock()
if b.state == Terminated || b.state == Failed {
// no more state transitions will be done, and channel wouldn't get closed
return
return func() {}
}
// There are max 4 state transitions. We use buffer to avoid blocking the sender,
// which holds service lock.
ch := make(chan func(l Listener), 4)
b.listeners = append(b.listeners, ch)
listenerCh := make(chan func(l Listener), 4)
b.listeners = append(b.listeners, listenerCh)
stop := make(chan struct{})
stopClosed := atomic.NewBool(false)
wg := sync.WaitGroup{}
wg.Add(1)
// each listener has its own goroutine, processing events.
go func() {
for lfn := range ch {
lfn(listener)
defer wg.Done()
for {
select {
// Process events from service.
case lfn, ok := <-listenerCh:
if !ok {
return
}
lfn(listener)
case <-stop:
return
}
}
}()
return func() {
if stopClosed.CompareAndSwap(false, true) {
// Tell listener goroutine to stop.
close(stop)
}
// Remove channel for notifications from service's list of listeners.
b.stateMu.Lock()
b.listeners = slices.DeleteFunc(b.listeners, func(c chan func(l Listener)) bool {
return listenerCh == c
})
b.stateMu.Unlock()
wg.Wait()
}
}
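Callers that previously had no way to release a listener's goroutine can now defer the returned stop function. A hedged usage sketch against the dskit services API:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/services"
)

func main() {
	svc := services.NewIdleService(nil, nil)

	// AddListener now returns a function that unregisters the listener and
	// stops its goroutine; defer it to avoid a leak.
	stop := svc.AddListener(services.NewListener(
		nil, // starting
		func() { fmt.Println("running") },
		nil, nil, nil, // stopping, terminated, failed
	))
	defer stop()

	_ = svc.StartAsync(context.Background())
	_ = svc.AwaitRunning(context.Background())
	svc.StopAsync()
	_ = svc.AwaitTerminated(context.Background())
	time.Sleep(100 * time.Millisecond) // give the listener goroutine time to run
}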
// lock must be held here. Read lock would be good enough, but since

@ -1,16 +1,22 @@
package services
import (
"sync"
"github.com/pkg/errors"
)
var (
errFailureWatcherNotInitialized = errors.New("FailureWatcher has not been initialized")
errFailureWatcherClosed = errors.New("FailureWatcher has been stopped")
)
// FailureWatcher waits for service failures, and passes them to the channel.
type FailureWatcher struct {
ch chan error
mu sync.Mutex
ch chan error
closed bool
unregisterListeners []func()
}
func NewFailureWatcher() *FailureWatcher {
@ -35,9 +41,17 @@ func (w *FailureWatcher) WatchService(service Service) {
panic(errFailureWatcherNotInitialized)
}
service.AddListener(NewListener(nil, nil, nil, nil, func(_ State, failure error) {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
panic(errFailureWatcherClosed)
}
stop := service.AddListener(NewListener(nil, nil, nil, nil, func(_ State, failure error) {
w.ch <- errors.Wrapf(failure, "service %s failed", DescribeService(service))
}))
w.unregisterListeners = append(w.unregisterListeners, stop)
}
func (w *FailureWatcher) WatchManager(manager *Manager) {
@ -47,7 +61,40 @@ func (w *FailureWatcher) WatchManager(manager *Manager) {
panic(errFailureWatcherNotInitialized)
}
manager.AddListener(NewManagerListener(nil, nil, func(service Service) {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
panic(errFailureWatcherClosed)
}
stop := manager.AddListener(NewManagerListener(nil, nil, func(service Service) {
w.ch <- errors.Wrapf(service.FailureCase(), "service %s failed", DescribeService(service))
}))
w.unregisterListeners = append(w.unregisterListeners, stop)
}
// Close stops this failure watcher and closes the channel returned by the Chan() method. After closing the failure watcher,
// it cannot be used to watch additional services or managers.
// Repeated calls to Close() do nothing.
func (w *FailureWatcher) Close() {
// Gracefully handle the case where FailureWatcher has not been initialized,
// to simplify the code in the components using it.
if w == nil {
return
}
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return
}
for _, stop := range w.unregisterListeners {
stop()
}
// All listeners are now stopped, and can't receive more notifications. We can close the channel.
close(w.ch)
w.closed = true
}
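Close unregisters every listener before closing the channel, so no listener can send on a closed channel. A sketch of the intended lifecycle (the idle service here never fails, so the timeout branch is taken):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/services"
)

func main() {
	watcher := services.NewFailureWatcher()
	defer watcher.Close() // stops listeners and closes Chan()

	svc := services.NewIdleService(nil, nil)
	watcher.WatchService(svc)
	_ = svc.StartAsync(context.Background())

	select {
	case err := <-watcher.Chan():
		fmt.Println("service failed:", err)
	case <-time.After(time.Second):
		fmt.Println("no failures observed")
	}
}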

@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"slices"
"sync"
"go.uber.org/atomic"
)
type managerState int
@ -31,6 +34,11 @@ type ManagerListener interface {
// Manager can start them, and observe their state as a group.
// Once all services are running, Manager is said to be Healthy. It is possible for manager to never reach the Healthy state, if some services fail to start.
// When all services are stopped (Terminated or Failed), manager is Stopped.
//
// Note: Manager's state is defined by the state of its services. Services can be started outside of the Manager, and if all become Running, the Manager will be Healthy as well.
//
// Note: Creating a manager immediately installs listeners to all services (to compute manager's state), which may start goroutines.
// To avoid leaking goroutines, make sure to eventually stop all services or the manager (which stops services), even if manager wasn't explicitly started.
type Manager struct {
services []Service
@ -226,25 +234,61 @@ func (m *Manager) serviceStateChanged(s Service, from State, to State) {
// Specifically, a given listener will have its callbacks invoked in the same order as the underlying service enters those states.
// Additionally, at most one of the listener's callbacks will execute at once.
// However, multiple listeners' callbacks may execute concurrently, and listeners may execute in an order different from the one in which they were registered.
func (m *Manager) AddListener(listener ManagerListener) {
//
// The returned function can be used to stop the listener and free the resources used by it (e.g. its goroutine).
func (m *Manager) AddListener(listener ManagerListener) func() {
m.mu.Lock()
defer m.mu.Unlock()
if m.state == stopped {
// no need to register listener, as no more events will be sent
return
return func() {}
}
// max number of events is: failed notification for each service + healthy + stopped.
// we use buffer to avoid blocking the sender, which holds the manager's lock.
ch := make(chan func(l ManagerListener), len(m.services)+2)
m.listeners = append(m.listeners, ch)
listenerCh := make(chan func(l ManagerListener), len(m.services)+2)
m.listeners = append(m.listeners, listenerCh)
stop := make(chan struct{})
stopClosed := atomic.NewBool(false)
wg := sync.WaitGroup{}
wg.Add(1)
// each listener has its own goroutine, processing events.
go func() {
for fn := range ch {
fn(listener)
defer wg.Done()
for {
select {
// Process events from the manager.
case fn, ok := <-listenerCh:
if !ok {
return
}
fn(listener)
case <-stop:
return
}
}
}()
return func() {
if stopClosed.CompareAndSwap(false, true) {
// Tell listener goroutine to stop.
close(stop)
}
// Remove channel for notifications from manager's list of listeners.
m.mu.Lock()
m.listeners = slices.DeleteFunc(m.listeners, func(c chan func(listener ManagerListener)) bool {
return listenerCh == c
})
m.mu.Unlock()
wg.Wait()
}
}
// called with lock

@ -91,7 +91,10 @@ type Service interface {
// as the service enters those states. Additionally, at most one of the listener's callbacks will execute
// at once. However, multiple listeners' callbacks may execute concurrently, and listeners may execute
// in an order different from the one in which they were registered.
AddListener(listener Listener)
//
// The returned function can be used to stop the listener from receiving additional events from the service,
// and to release the resources used by the listener (e.g. the goroutine started when the listener was added).
AddListener(listener Listener) func()
}
// NamedService extends Service with a name.

@ -2,6 +2,8 @@ package spanlogger
import (
"context"
"runtime"
"strings"
"go.uber.org/atomic" // Really just need sync/atomic but there is a lint rule preventing it.
@ -160,6 +162,10 @@ func (s *SpanLogger) getLogger() log.Logger {
if ok {
logger = log.With(logger, "trace_id", traceID)
}
// Replace the default valuer for the 'caller' attribute with one that gets the caller of the methods in this file.
logger = log.With(logger, "caller", spanLoggerAwareCaller())
// If the value has been set by another goroutine, fetch that other value and discard the one we made.
if !s.logger.CompareAndSwap(nil, &logger) {
pLogger := s.logger.Load()
@ -181,3 +187,47 @@ func (s *SpanLogger) SetSpanAndLogTag(key string, value interface{}) {
wrappedLogger := log.With(logger, key, value)
s.logger.Store(&wrappedLogger)
}
// spanLoggerAwareCaller is like log.Caller, but ensures that the caller information is
// that of the caller to SpanLogger, not SpanLogger itself.
func spanLoggerAwareCaller() log.Valuer {
valuer := atomic.NewPointer[log.Valuer](nil)
return func() interface{} {
// If we've already determined the correct stack depth, use it.
existingValuer := valuer.Load()
if existingValuer != nil {
return (*existingValuer)()
}
// We haven't been called before, determine the correct stack depth to
// skip the configured logger's internals and the SpanLogger's internals too.
//
// Note that we can't do this in spanLoggerAwareCaller() directly because we
// need to do this when invoked by the configured logger - otherwise we cannot
// measure the stack depth of the logger's internals.
stackDepth := 3 // log.DefaultCaller uses a stack depth of 3, so start searching for the correct stack depth there.
for {
_, file, _, ok := runtime.Caller(stackDepth)
if !ok {
// We've run out of possible stack frames. Give up.
valuer.Store(&unknownCaller)
return unknownCaller()
}
if strings.HasSuffix(file, "spanlogger/spanlogger.go") {
stackValuer := log.Caller(stackDepth + 2) // Add one to skip the stack frame for the SpanLogger method, and another to skip the stack frame for the valuer which we'll invoke below.
valuer.Store(&stackValuer)
return stackValuer()
}
stackDepth++
}
}
}
var unknownCaller log.Valuer = func() interface{} {
return "<unknown>"
}

@ -16,7 +16,18 @@ import (
//
//nolint:revive
func TenantID(ctx context.Context) (string, error) {
orgIDs, err := TenantIDs(ctx)
//lint:ignore faillint wrapper around upstream method
orgID, err := user.ExtractOrgID(ctx)
if err != nil {
return "", err
}
if !strings.Contains(orgID, tenantIDsSeparator) {
if err := ValidTenantID(orgID); err != nil {
return "", err
}
return orgID, nil
}
orgIDs, err := tenantIDsFromString(orgID)
if err != nil {
return "", err
}
@ -42,6 +53,10 @@ func TenantIDs(ctx context.Context) ([]string, error) {
return nil, err
}
return tenantIDsFromString(orgID)
}
func tenantIDsFromString(orgID string) ([]string, error) {
orgIDs := strings.Split(orgID, tenantIDsSeparator)
for _, id := range orgIDs {
if err := ValidTenantID(id); err != nil {
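The new fast path returns the org ID as-is when it contains no separator, skipping the split-and-validate loop for the common single-tenant case; multi-tenant IDs still go through tenantIDsFromString. A hedged sketch of both paths, assuming dskit's user and tenant packages:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/tenant"
	"github.com/grafana/dskit/user"
)

func main() {
	// Single tenant: TenantID takes the fast path.
	ctx := user.InjectOrgID(context.Background(), "team-a")
	id, err := tenant.TenantID(ctx)
	fmt.Println(id, err) // team-a <nil>

	// Multiple tenants: TenantID returns an error; use TenantIDs instead.
	ctx = user.InjectOrgID(context.Background(), "team-a|team-b")
	if _, err := tenant.TenantID(ctx); err != nil {
		ids, _ := tenant.TenantIDs(ctx)
		fmt.Println(ids) // [team-a team-b]
	}
}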

@ -55,16 +55,30 @@ func NewFromEnv(serviceName string, options ...jaegercfg.Option) (io.Closer, err
// ExtractTraceID extracts the trace id, if any from the context.
func ExtractTraceID(ctx context.Context) (string, bool) {
if tid, _, ok := extractJaegerContext(ctx); ok {
return tid.String(), true
}
return "", false
}
// ExtractTraceSpanID extracts the trace id, span id if any from the context.
func ExtractTraceSpanID(ctx context.Context) (string, string, bool) {
if tid, sid, ok := extractJaegerContext(ctx); ok {
return tid.String(), sid.String(), true
}
return "", "", false
}
func extractJaegerContext(ctx context.Context) (tid jaeger.TraceID, sid jaeger.SpanID, success bool) {
sp := opentracing.SpanFromContext(ctx)
if sp == nil {
return "", false
return
}
sctx, ok := sp.Context().(jaeger.SpanContext)
jsp, ok := sp.Context().(jaeger.SpanContext)
if !ok {
return "", false
return
}
return sctx.TraceID().String(), true
return jsp.TraceID(), jsp.SpanID(), true
}
// ExtractSampledTraceID works like ExtractTraceID but the returned bool is only

@ -1,6 +1,3 @@
//go:build go1.16 && !go1.23
// +build go1.16,!go1.23
package pprof
// unsafe is required for go:linkname

@ -971,8 +971,8 @@ github.com/gorilla/websocket
# github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
## explicit; go 1.17
github.com/grafana/cloudflare-go
# github.com/grafana/dskit v0.0.0-20240626184720-35810fdf1c6d
## explicit; go 1.20
# github.com/grafana/dskit v0.0.0-20240819131358-463219e80ea0
## explicit; go 1.21
github.com/grafana/dskit/aws
github.com/grafana/dskit/backoff
github.com/grafana/dskit/cancellation
@ -988,7 +988,6 @@ github.com/grafana/dskit/grpcutil
github.com/grafana/dskit/httpgrpc
github.com/grafana/dskit/httpgrpc/server
github.com/grafana/dskit/instrument
github.com/grafana/dskit/internal/math
github.com/grafana/dskit/internal/slices
github.com/grafana/dskit/kv
github.com/grafana/dskit/kv/codec
@ -1029,7 +1028,7 @@ github.com/grafana/jsonparser
# github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608 => ./pkg/push
## explicit; go 1.19
github.com/grafana/loki/pkg/push
# github.com/grafana/pyroscope-go/godeltaprof v0.1.6
# github.com/grafana/pyroscope-go/godeltaprof v0.1.7
## explicit; go 1.16
github.com/grafana/pyroscope-go/godeltaprof
github.com/grafana/pyroscope-go/godeltaprof/http/pprof
