updated to the latest version of dskit (#9920)

**What this PR does / why we need it**:
Updated Loki with the latest version of dskit

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)

---------

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
pull/9910/head^2
Vladyslav Diachenko 3 years ago committed by GitHub
parent bad691b509
commit 3243676f4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      docs/sources/configure/_index.md
  2. 2
      go.mod
  3. 4
      go.sum
  4. 17
      pkg/distributor/instance_count_test.go
  5. 2
      pkg/loki/loki.go
  6. 1
      pkg/loki/modules.go
  7. 4
      pkg/querier/worker/worker.go
  8. 4
      pkg/ruler/base/lifecycle.go
  9. 4
      pkg/ruler/base/lifecycle_test.go
  10. 4
      pkg/scheduler/lifecycle.go
  11. 5
      pkg/storage/chunk/client/gcp/bigtable_index_client.go
  12. 2
      pkg/storage/factory.go
  13. 4
      pkg/storage/stores/indexshipper/compactor/compactor.go
  14. 4
      pkg/storage/stores/shipper/indexgateway/lifecycle.go
  15. 2
      vendor/github.com/grafana/dskit/dns/miekgdns/resolver.go
  16. 31
      vendor/github.com/grafana/dskit/flagext/bytes.go
  17. 30
      vendor/github.com/grafana/dskit/grpcclient/grpcclient.go
  18. 9
      vendor/github.com/grafana/dskit/kv/client.go
  19. 6
      vendor/github.com/grafana/dskit/kv/consul/client.go
  20. 6
      vendor/github.com/grafana/dskit/kv/consul/mock.go
  21. 19
      vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go
  22. 14
      vendor/github.com/grafana/dskit/kv/memberlist/metrics.go
  23. 7
      vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go
  24. 12
      vendor/github.com/grafana/dskit/kv/mock.go
  25. 3
      vendor/github.com/grafana/dskit/loser/loser.go
  26. 35
      vendor/github.com/grafana/dskit/ring/basic_lifecycler.go
  27. 4
      vendor/github.com/grafana/dskit/ring/basic_lifecycler_delegates.go
  28. 5
      vendor/github.com/grafana/dskit/ring/client/pool.go
  29. 20
      vendor/github.com/grafana/dskit/ring/doc.go
  30. 2
      vendor/github.com/grafana/dskit/ring/flush.go
  31. 73
      vendor/github.com/grafana/dskit/ring/lifecycler.go
  32. 136
      vendor/github.com/grafana/dskit/ring/ownership_priority_queue.go
  33. 126
      vendor/github.com/grafana/dskit/ring/replication_set.go
  34. 272
      vendor/github.com/grafana/dskit/ring/replication_set_tracker.go
  35. 51
      vendor/github.com/grafana/dskit/ring/ring.go
  36. 2
      vendor/github.com/grafana/dskit/ring/ring.pb.go
  37. 2
      vendor/github.com/grafana/dskit/ring/ring.proto
  38. 305
      vendor/github.com/grafana/dskit/ring/spread_minimizing_token_generator.go
  39. 54
      vendor/github.com/grafana/dskit/ring/token_generator.go
  40. 45
      vendor/github.com/grafana/dskit/ring/util.go
  41. 3
      vendor/github.com/grafana/dskit/runutil/runutil.go
  42. 2
      vendor/github.com/grafana/dskit/services/manager.go
  43. 2
      vendor/modules.txt

@ -3607,7 +3607,7 @@ The `grpc_client` block configures the gRPC client used to communicate between t
# CLI flag: -<prefix>.grpc-client-rate-limit-burst
[rate_limit_burst: <int> | default = 0]
# Enable backoff and retry when we hit ratelimits.
# Enable backoff and retry when we hit rate limits.
# CLI flag: -<prefix>.backoff-on-ratelimits
[backoff_on_ratelimits: <boolean> | default = false]
@ -3624,7 +3624,19 @@ backoff_config:
# CLI flag: -<prefix>.backoff-retries
[max_retries: <int> | default = 10]
# Enable TLS in the GRPC client. This flag needs to be enabled when any other
# Initial stream window size. Values less than the default are not supported and
# are ignored. Setting this to a value other than the default disables the BDP
# estimator.
# CLI flag: -<prefix>.initial-stream-window-size
[initial_stream_window_size: <int> | default = 63KiB1023B]
# Initial connection window size. Values less than the default are not supported
# and are ignored. Setting this to a value other than the default disables the
# BDP estimator.
# CLI flag: -<prefix>.initial-connection-window-size
[initial_connection_window_size: <int> | default = 63KiB1023B]
# Enable TLS in the gRPC client. This flag needs to be enabled when any other
# TLS flag is set. If set to false, insecure connection to gRPC server will be
# used.
# CLI flag: -<prefix>.tls-enabled

@ -49,7 +49,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e
github.com/grafana/dskit v0.0.0-20230706162620-5081d8ed53e6
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd

@ -1071,8 +1071,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e h1:ODjv+9dmklDS33O2B4zPgIDKdnji18o9ofD9qWA+mAs=
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e/go.mod h1:M03k2fzuQ2n9TVE1xfVKTESibxsXdw0wYfWT3+9Owp4=
github.com/grafana/dskit v0.0.0-20230706162620-5081d8ed53e6 h1:/19OPOCKP95g9hKLn1mN2dR/qBE4+oEY2F9XZ7G1xJM=
github.com/grafana/dskit v0.0.0-20230706162620-5081d8ed53e6/go.mod h1:M03k2fzuQ2n9TVE1xfVKTESibxsXdw0wYfWT3+9Owp4=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=

@ -4,6 +4,10 @@ import (
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/kv/consul"
"github.com/stretchr/testify/assert"
"github.com/grafana/dskit/ring"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@ -91,6 +95,8 @@ func TestInstanceCountDelegate_CorrectlyInvokesOtherDelegates(t *testing.T) {
sentry1 := map[string]int{}
sentry2 := map[string]int{}
store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
var delegate ring.BasicLifecyclerDelegate
delegate = ring.NewInstanceRegisterDelegate(ring.ACTIVE, 1 /* tokenCount */)
@ -98,6 +104,9 @@ func TestInstanceCountDelegate_CorrectlyInvokesOtherDelegates(t *testing.T) {
delegate = newHealthyInstanceDelegate(counter, time.Second, delegate)
delegate = &sentryDelegate{BasicLifecyclerDelegate: delegate, calls: sentry2} // sentry delegate AFTER newHealthyInstancesDelegate
lifecycler, err := ring.NewBasicLifecycler(ring.BasicLifecyclerConfig{}, "test-ring", "test-ring-key", store, delegate, log.NewNopLogger(), nil)
require.NoError(t, err)
ingesters := ring.NewDesc()
ingesters.AddIngester("ingester-0", "ingester-0:3100", "zone-a", []uint32{1}, ring.ACTIVE, time.Now())
@ -111,19 +120,19 @@ func TestInstanceCountDelegate_CorrectlyInvokesOtherDelegates(t *testing.T) {
require.Equal(t, 0, sentry1["Tokens"])
require.Equal(t, 0, sentry2["Tokens"])
delegate.OnRingInstanceHeartbeat(nil, ingesters, nil)
delegate.OnRingInstanceHeartbeat(lifecycler, ingesters, nil)
require.Equal(t, 1, sentry1["Heartbeat"])
require.Equal(t, 1, sentry2["Heartbeat"])
delegate.OnRingInstanceRegister(nil, *ingesters, true, "ingester-0", ring.InstanceDesc{})
delegate.OnRingInstanceRegister(lifecycler, *ingesters, true, "ingester-0", ring.InstanceDesc{})
require.Equal(t, 1, sentry1["Register"])
require.Equal(t, 1, sentry2["Register"])
delegate.OnRingInstanceStopping(nil)
delegate.OnRingInstanceStopping(lifecycler)
require.Equal(t, 1, sentry1["Stopping"])
require.Equal(t, 1, sentry2["Stopping"])
delegate.OnRingInstanceTokens(nil, ring.Tokens{})
delegate.OnRingInstanceTokens(lifecycler, ring.Tokens{})
require.Equal(t, 1, sentry1["Stopping"])
require.Equal(t, 1, sentry2["Stopping"])
}

@ -230,7 +230,7 @@ func (c *Config) Validate() error {
if err := c.LimitsConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid limits config")
}
if err := c.Worker.Validate(util_log.Logger); err != nil {
if err := c.Worker.Validate(); err != nil {
return errors.Wrap(err, "invalid frontend-worker config")
}
if err := c.StorageConfig.BoltDBShipperConfig.Validate(); err != nil {

@ -1059,7 +1059,6 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
reg := prometheus.DefaultRegisterer
t.Cfg.MemberlistKV.MetricsNamespace = "loki"
t.Cfg.MemberlistKV.MetricsRegisterer = reg
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
analytics.JSONCodec,

@ -48,11 +48,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
}
func (cfg *Config) Validate(log log.Logger) error {
func (cfg *Config) Validate() error {
if cfg.FrontendAddress != "" && cfg.SchedulerAddress != "" {
return errors.New("frontend address and scheduler address are mutually exclusive, please use only one")
}
return cfg.GRPCClientConfig.Validate(log)
return cfg.GRPCClientConfig.Validate()
}
// Handler for HTTP requests wrapped in protobuf messages.

@ -4,7 +4,7 @@ import (
"github.com/grafana/dskit/ring"
)
func (r *Ruler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (r *Ruler) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the ruler instance in the ring we want to start from
// a clean situation, so whatever is the state we set it ACTIVE, while we keep existing
// tokens (if any).
@ -14,7 +14,7 @@ func (r *Ruler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.De
}
takenTokens := ringDesc.GetTokens()
newTokens := ring.GenerateTokens(r.cfg.Ring.NumTokens-len(tokens), takenTokens)
newTokens := l.GetTokenGenerator().GenerateTokens(r.cfg.Ring.NumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)

@ -99,14 +99,14 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
}
func generateSortedTokens(numTokens int) ring.Tokens {
tokens := ring.GenerateTokens(numTokens, nil)
tokens := ring.NewRandomTokenGenerator().GenerateTokens(numTokens, nil)
// Ensure generated tokens are sorted.
sort.Slice(tokens, func(i, j int) bool {
return tokens[i] < tokens[j]
})
return ring.Tokens(tokens)
return tokens
}
// numTokens determines the number of tokens owned by the specified

@ -4,7 +4,7 @@ import (
"github.com/grafana/dskit/ring"
)
func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (rm *RingManager) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the scheduler instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
@ -14,7 +14,7 @@ func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc
}
takenTokens := ringDesc.GetTokens()
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
newTokens := l.GetTokenGenerator().GenerateTokens(ringNumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)

@ -11,7 +11,6 @@ import (
"time"
"cloud.google.com/go/bigtable"
"github.com/go-kit/log"
"github.com/grafana/dskit/grpcclient"
ot "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
@ -57,8 +56,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("bigtable", f)
}
func (cfg *Config) Validate(log log.Logger) error {
return cfg.GRPCClientConfig.Validate(log)
func (cfg *Config) Validate() error {
return cfg.GRPCClientConfig.Validate()
}
// storageClientColumnKey implements chunk.storageClient for GCP.

@ -322,7 +322,7 @@ func (cfg *Config) Validate() error {
if err := cfg.CassandraStorageConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid Cassandra Storage config")
}
if err := cfg.GCPStorageConfig.Validate(util_log.Logger); err != nil {
if err := cfg.GCPStorageConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid GCP Storage Storage config")
}
if err := cfg.Swift.Validate(); err != nil {

@ -757,7 +757,7 @@ func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime
return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now)
}
func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (c *Compactor) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the compactor instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
@ -767,7 +767,7 @@ func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc rin
}
takenTokens := ringDesc.GetTokens()
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
newTokens := l.GetTokenGenerator().GenerateTokens(ringNumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)

@ -4,7 +4,7 @@ import (
"github.com/grafana/dskit/ring"
)
func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
func (rm *RingManager) OnRingInstanceRegister(l *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the index gateway instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
@ -14,7 +14,7 @@ func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc
}
takenTokens := ringDesc.GetTokens()
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
newTokens := l.GetTokenGenerator().GenerateTokens(ringNumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)

@ -20,7 +20,7 @@ type Resolver struct {
ResolvConf string
}
func (r *Resolver) LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error) {
func (r *Resolver) LookupSRV(_ context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error) {
var target string
if service == "" && proto == "" {
target = name

@ -8,18 +8,18 @@ import (
"github.com/alecthomas/units"
)
// Bytes is a data type which supports yaml serialization/deserialization
// with units.
// Bytes is a data type which supports use as a flag and yaml
// serialization/deserialization with units.
type Bytes uint64
func (b *Bytes) UnmarshalYAML(unmarshal func(interface{}) error) error {
var value string
err := unmarshal(&value)
if err != nil {
return err
}
// String implements flag.Value
func (b *Bytes) String() string {
return units.Base2Bytes(*b).String()
}
bytes, err := units.ParseBase2Bytes(value)
// Set implements flag.Value
func (b *Bytes) Set(s string) error {
bytes, err := units.ParseBase2Bytes(s)
if err != nil {
return err
}
@ -28,6 +28,15 @@ func (b *Bytes) UnmarshalYAML(unmarshal func(interface{}) error) error {
return nil
}
func (b *Bytes) MarshalYAML() (interface{}, error) {
return units.Base2Bytes(*b).String(), nil
func (b *Bytes) UnmarshalYAML(unmarshal func(interface{}) error) error {
var value string
if err := unmarshal(&value); err != nil {
return err
}
return b.Set(value)
}
func (b Bytes) MarshalYAML() (interface{}, error) {
return b.String(), nil
}

@ -4,7 +4,6 @@ import (
"flag"
"time"
"github.com/go-kit/log"
"github.com/pkg/errors"
"google.golang.org/grpc"
grpcbackoff "google.golang.org/grpc/backoff"
@ -13,6 +12,7 @@ import (
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/crypto/tls"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcencoding/snappy"
)
@ -27,6 +27,9 @@ type Config struct {
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits" category:"advanced"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
InitialStreamWindowSize flagext.Bytes `yaml:"initial_stream_window_size" category:"experimental"`
InitialConnectionWindowSize flagext.Bytes `yaml:"initial_connection_window_size" category:"experimental"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
@ -41,16 +44,23 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}
const defaultInitialWindowSize = 65535 // From https://github.com/grpc/grpc-go/blob/c9d3ea5673252d212c69f3d3c10ce1d7b287a86b/internal/transport/defaults.go#L28
// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// Set the default values.
cfg.InitialStreamWindowSize = defaultInitialWindowSize
cfg.InitialConnectionWindowSize = defaultInitialWindowSize
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 100<<20, "gRPC client max send message size (bytes).")
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.")
f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS in the GRPC client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to gRPC server will be used.")
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit rate limits.")
f.Var(&cfg.InitialStreamWindowSize, prefix+".initial-stream-window-size", "Initial stream window size. Values less than the default are not supported and are ignored. Setting this to a value other than the default disables the BDP estimator.")
f.Var(&cfg.InitialConnectionWindowSize, prefix+".initial-connection-window-size", "Initial connection window size. Values less than the default are not supported and are ignored. Setting this to a value other than the default disables the BDP estimator.")
f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS in the gRPC client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to gRPC server will be used.")
f.DurationVar(&cfg.ConnectTimeout, prefix+".connect-timeout", 0, "The maximum amount of time to establish a connection. A value of 0 means default gRPC connect timeout and backoff.")
f.DurationVar(&cfg.ConnectBackoffBaseDelay, prefix+".connect-backoff-base-delay", time.Second, "Initial backoff delay after first connection failure. Only relevant if ConnectTimeout > 0.")
f.DurationVar(&cfg.ConnectBackoffMaxDelay, prefix+".connect-backoff-max-delay", 5*time.Second, "Maximum backoff delay when establishing a connection. Only relevant if ConnectTimeout > 0.")
@ -60,7 +70,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}
func (cfg *Config) Validate(log log.Logger) error {
func (cfg *Config) Validate() error {
switch cfg.GRPCCompression {
case gzip.Name, snappy.Name, "":
// valid
@ -118,6 +128,16 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep
)
}
if cfg.InitialStreamWindowSize > defaultInitialWindowSize {
// We only want to explicitly set the window size if it's not the default, as setting the window size (even to the default) always disables the BDP estimator.
opts = append(opts, grpc.WithInitialWindowSize(int32(cfg.InitialStreamWindowSize)))
}
if cfg.InitialConnectionWindowSize > defaultInitialWindowSize {
// We only want to explicitly set the window size if it's not the default, as setting the window size (even to the default) always disables the BDP estimator.
opts = append(opts, grpc.WithInitialConnWindowSize(int32(cfg.InitialConnectionWindowSize)))
}
return append(
opts,
grpc.WithDefaultCallOptions(cfg.CallOptions()...),

@ -105,13 +105,16 @@ type Client interface {
// be returned if the key does not exist.
Delete(ctx context.Context, key string) error
// CAS stands for Compare-And-Swap. Will call provided callback f with the
// CAS stands for Compare-And-Swap. Will call provided callback f with the
// current value of the key and allow callback to return a different value.
// Will then attempt to atomically swap the current value for the new value.
// If that doesn't succeed will try again - callback will be called again
// with new value etc. Guarantees that only a single concurrent CAS
// succeeds. Callback can return nil to indicate it is happy with existing
// with new value etc. Guarantees that only a single concurrent CAS
// succeeds. Callback can return nil to indicate it is happy with existing
// value.
//
// If the callback returns an error and true for retry, and the max number of
// attempts is not exceeded, the operation will be retried.
CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error
// WatchKey calls f whenever the value stored under key changes.

@ -401,7 +401,7 @@ func (c *Client) createRateLimiter() *rate.Limiter {
// WithCodec Clones and changes the codec of the consul client.
func (c *Client) WithCodec(codec codec.Codec) *Client {
new := *c
new.codec = codec
return &new
n := *c
n.codec = codec
return &n
}

@ -95,7 +95,7 @@ func (m *mockKV) loop() {
}
}
func (m *mockKV) Put(p *consul.KVPair, q *consul.WriteOptions) (*consul.WriteMeta, error) {
func (m *mockKV) Put(p *consul.KVPair, _ *consul.WriteOptions) (*consul.WriteMeta, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -119,7 +119,7 @@ func (m *mockKV) Put(p *consul.KVPair, q *consul.WriteOptions) (*consul.WriteMet
return nil, nil
}
func (m *mockKV) CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
func (m *mockKV) CAS(p *consul.KVPair, _ *consul.WriteOptions) (bool, *consul.WriteMeta, error) {
level.Debug(m.logger).Log("msg", "CAS", "key", p.Key, "modify_index", p.ModifyIndex, "value", fmt.Sprintf("%.40q", p.Value))
m.mtx.Lock()
@ -226,7 +226,7 @@ func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *c
return result, &consul.QueryMeta{LastIndex: m.current}, nil
}
func (m *mockKV) Delete(key string, q *consul.WriteOptions) (*consul.WriteMeta, error) {
func (m *mockKV) Delete(key string, _ *consul.WriteOptions) (*consul.WriteMeta, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
delete(m.kvps, key)

@ -72,7 +72,7 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
}
// Delete is part of kv.Client interface.
func (c *Client) Delete(ctx context.Context, key string) error {
func (c *Client) Delete(_ context.Context, _ string) error {
return errors.New("memberlist does not support Delete")
}
@ -164,9 +164,7 @@ type KVConfig struct {
TCPTransport TCPTransportConfig `yaml:",inline"`
// Where to put custom metrics. Metrics are not registered, if this is nil.
MetricsRegisterer prometheus.Registerer `yaml:"-"`
MetricsNamespace string `yaml:"-"`
MetricsNamespace string `yaml:"-"`
// Codecs to register. Codecs need to be registered before joining other members.
Codecs []codec.Codec `yaml:"-"`
@ -354,7 +352,6 @@ var (
// trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned
// and service enters Failed state.
func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer prometheus.Registerer) *KV {
cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace
mlkv := &KV{
@ -386,7 +383,7 @@ func defaultMemberlistConfig() *memberlist.Config {
}
func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger)
tr, err := NewTCPTransport(m.cfg.TCPTransport, m.logger, m.registerer)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %v", err)
}
@ -693,7 +690,7 @@ func (m *KV) Get(key string, codec codec.Codec) (interface{}, error) {
}
// Returns current value with removed tombstones.
func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, err error) {
func (m *KV) get(key string, _ codec.Codec) (out interface{}, version uint, err error) {
m.storeMu.Lock()
v := m.store[key].Clone()
m.storeMu.Unlock()
@ -983,7 +980,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
}
// NodeMeta is method from Memberlist Delegate interface
func (m *KV) NodeMeta(limit int) []byte {
func (m *KV) NodeMeta(_ int) []byte {
// we can send local state from here (512 bytes only)
// if state is updated, we need to tell memberlist to distribute it.
return nil
@ -1117,7 +1114,7 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte {
// This is "pull" part of push/pull sync (either periodic, or when new node joins the cluster).
// Here we dump our entire state -- all keys and their values. There is no limit on message size here,
// as Memberlist uses 'stream' operations for transferring this state.
func (m *KV) LocalState(join bool) []byte {
func (m *KV) LocalState(_ bool) []byte {
if !m.delegateReady.Load() {
return nil
}
@ -1186,12 +1183,12 @@ func (m *KV) LocalState(join bool) []byte {
return buf.Bytes()
}
// MergeRemoteState is method from Memberlist Delegate interface
// MergeRemoteState is a method from the Memberlist Delegate interface.
//
// This is 'push' part of push/pull sync. We merge incoming KV store (all keys and values) with ours.
//
// Data is full state of remote KV store, as generated by LocalState method (run on another node).
func (m *KV) MergeRemoteState(data []byte, join bool) {
func (m *KV) MergeRemoteState(data []byte, _ bool) {
if !m.delegateReady.Load() {
return
}

@ -171,17 +171,25 @@ func (m *KV) createAndRegisterMetrics() {
Help: "Number of dropped notifications in WatchPrefix function",
}, []string{"prefix"})
if m.cfg.MetricsRegisterer == nil {
if m.registerer == nil {
return
}
m.registerer.MustRegister(m)
// memberlist uses armonmetrics package for internal usage
// here we configure armonmetrics to use prometheus
sink, err := armonprometheus.NewPrometheusSink() // there is no option to pass registrerer, this uses default
// here we configure armonmetrics to use prometheus.
opts := armonprometheus.PrometheusOpts{
Expiration: 0, // Don't expire metrics.
Registerer: m.registerer,
}
sink, err := armonprometheus.NewPrometheusSinkFrom(opts)
if err == nil {
cfg := armonmetrics.DefaultConfig("")
cfg.EnableHostname = false // no need to put hostname into metric
cfg.EnableHostnameLabel = false // no need to put hostname into labels
cfg.EnableServiceLabel = false // Don't put service name into a label. (We don't set service name anyway).
cfg.EnableRuntimeMetrics = false // metrics about Go runtime already provided by prometheus
cfg.EnableTypePrefix = true // to make better sense of internal memberlist metrics
cfg.TimerGranularity = time.Second // timers are in seconds in prometheus world

@ -56,8 +56,7 @@ type TCPTransportConfig struct {
TransportDebug bool `yaml:"-" category:"advanced"`
// Where to put custom metrics. nil = don't register.
MetricsRegisterer prometheus.Registerer `yaml:"-"`
MetricsNamespace string `yaml:"-"`
MetricsNamespace string `yaml:"-"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS dstls.ClientConfig `yaml:",inline"`
@ -113,7 +112,7 @@ type TCPTransport struct {
// NewTCPTransport returns a new tcp-based transport with the given configuration. On
// success all the network listeners will be created and listening.
func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTransport, error) {
func NewTCPTransport(config TCPTransportConfig, logger log.Logger, registerer prometheus.Registerer) (*TCPTransport, error) {
if len(config.BindAddrs) == 0 {
config.BindAddrs = []string{zeroZeroZeroZero}
}
@ -135,7 +134,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
}
}
t.registerMetrics(config.MetricsRegisterer)
t.registerMetrics(registerer)
// Clean up listeners if there's an error.
defer func() {

@ -16,24 +16,24 @@ func buildMockClient(logger log.Logger) (Client, error) {
return mockClient{}, nil
}
func (m mockClient) List(ctx context.Context, prefix string) ([]string, error) {
func (m mockClient) List(_ context.Context, _ string) ([]string, error) {
return []string{}, nil
}
func (m mockClient) Get(ctx context.Context, key string) (interface{}, error) {
func (m mockClient) Get(_ context.Context, _ string) (interface{}, error) {
return "", nil
}
func (m mockClient) Delete(ctx context.Context, key string) error {
func (m mockClient) Delete(_ context.Context, _ string) error {
return nil
}
func (m mockClient) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
func (m mockClient) CAS(_ context.Context, _ string, _ func(in interface{}) (out interface{}, retry bool, err error)) error {
return nil
}
func (m mockClient) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
func (m mockClient) WatchKey(_ context.Context, _ string, _ func(interface{}) bool) {
}
func (m mockClient) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
func (m mockClient) WatchPrefix(_ context.Context, _ string, _ func(string, interface{}) bool) {
}

@ -152,7 +152,8 @@ func (t *Tree[E]) Push(list []E) {
}
// We need to expand the tree. Pick the next biggest power of 2 to amortise resizing cost.
size := 1
for ; size <= len(t.nodes)/2; size *= 2 {
for size <= len(t.nodes)/2 {
size *= 2
}
newPos := size + len(t.nodes)/2
newNodes := make([]node[E], size*2)

@ -57,14 +57,23 @@ type BasicLifecyclerConfig struct {
// If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false,
// which means unregistering.
KeepInstanceInTheRingOnShutdown bool
// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
// Default value is nil, which means that RandomTokenGenerator is used.
RingTokenGenerator TokenGenerator
}
// BasicLifecycler is a basic ring lifecycler which allows to hook custom
// logic at different stages of the lifecycle. This lifecycler should be
// used to build higher level lifecyclers.
//
// This lifecycler never change the instance state. It's the delegate
// responsibility to ChangeState().
/*
BasicLifecycler is a Service that is responsible for publishing changes to a ring for a single instance.
It accepts a delegate that can handle lifecycle events, and should be used to build higher level lifecyclers.
Unlike [Lifecycler], BasicLifecycler does not change instance state internally.
Rather, it's the delegate's responsibility to call [BasicLifecycler.ChangeState].
- When a BasicLifecycler first starts, it will call [ring.BasicLifecyclerDelegate.OnRingInstanceRegister] for the delegate, and will add the instance to the ring.
- The lifecycler will then periodically, based on the [ring.BasicLifecyclerConfig.TokensObservePeriod], attempt to verify that its tokens have been added to the ring, after which it will call [ring.BasicLifecyclerDelegate.OnRingInstanceTokens].
- The lifecycler will update they key/value store with heartbeats and state changes based on the [ring.BasicLifecyclerConfig.HeartbeatPeriod], calling [ring.BasicLifecyclerDelegate.OnRingInstanceHeartbeat] each time.
- When the BasicLifecycler is stopped, it will call [ring.BasicLifecyclerDelegate.OnRingInstanceStopping].
*/
type BasicLifecycler struct {
*services.BasicService
@ -87,10 +96,17 @@ type BasicLifecycler struct {
// Whether to keep the instance in the ring or to unregister it on shutdown
keepInstanceInTheRingOnShutdown *atomic.Bool
tokenGenerator TokenGenerator
}
// NewBasicLifecycler makes a new BasicLifecycler.
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error) {
tokenGenerator := cfg.RingTokenGenerator
if tokenGenerator == nil {
tokenGenerator = NewRandomTokenGenerator()
}
l := &BasicLifecycler{
cfg: cfg,
ringName: ringName,
@ -101,6 +117,7 @@ func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, sto
metrics: NewBasicLifecyclerMetrics(ringName, reg),
actorChan: make(chan func()),
keepInstanceInTheRingOnShutdown: atomic.NewBool(cfg.KeepInstanceInTheRingOnShutdown),
tokenGenerator: tokenGenerator,
}
l.metrics.tokensToOwn.Set(float64(cfg.NumTokens))
@ -143,6 +160,10 @@ func (l *BasicLifecycler) GetTokens() Tokens {
return l.currInstanceDesc.GetTokens()
}
// GetTokenGenerator returns the TokenGenerator this lifecycler uses when it
// needs to generate ring tokens: cfg.RingTokenGenerator if it was set, or the
// RandomTokenGenerator chosen as the default at construction time.
func (l *BasicLifecycler) GetTokenGenerator() TokenGenerator {
	return l.tokenGenerator
}
// GetRegisteredAt returns the timestamp when the instance has been registered to the ring
// or a zero value if the lifecycler hasn't been started yet or was already registered and its
// timestamp is unknown.
@ -365,7 +386,7 @@ func (l *BasicLifecycler) verifyTokens(ctx context.Context) bool {
needTokens := l.cfg.NumTokens - len(actualTokens)
level.Info(l.logger).Log("msg", "generating new tokens", "count", needTokens, "ring", l.ringName)
newTokens := GenerateTokens(needTokens, takenTokens)
newTokens := l.tokenGenerator.GenerateTokens(needTokens, takenTokens)
actualTokens = append(actualTokens, newTokens...)
sort.Sort(actualTokens)

@ -164,7 +164,7 @@ func NewInstanceRegisterDelegate(state InstanceState, tokenCount int) InstanceRe
}
}
func (d InstanceRegisterDelegate) OnRingInstanceRegister(_ *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) {
func (d InstanceRegisterDelegate) OnRingInstanceRegister(l *BasicLifecycler, ringDesc Desc, instanceExists bool, _ string, instanceDesc InstanceDesc) (InstanceState, Tokens) {
// Keep the existing tokens if any, otherwise start with a clean situation.
var tokens []uint32
if instanceExists {
@ -172,7 +172,7 @@ func (d InstanceRegisterDelegate) OnRingInstanceRegister(_ *BasicLifecycler, rin
}
takenTokens := ringDesc.GetTokens()
newTokens := GenerateTokens(d.tokenCount-len(tokens), takenTokens)
newTokens := l.GetTokenGenerator().GenerateTokens(d.tokenCount-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)

@ -80,7 +80,7 @@ func NewPool(clientName string, cfg PoolConfig, discovery PoolServiceDiscovery,
return p
}
func (p *Pool) iteration(ctx context.Context) error {
func (p *Pool) iteration(_ context.Context) error {
p.removeStaleClients()
if p.cfg.HealthCheckEnabled {
p.cleanUnhealthy()
@ -103,8 +103,11 @@ func (p *Pool) GetClientFor(addr string) (PoolClient, error) {
return client, nil
}
// No client in cache so create one
p.Lock()
defer p.Unlock()
// Check if a client has been created just after checking the cache and before acquiring the lock.
client, ok = p.clients[addr]
if ok {
return client, nil

@ -0,0 +1,20 @@
/*
Package ring contains types and functions for creating and working with rings.
# Overview
Rings are shared between instances via a key-value store, and are represented by the [Desc] struct, which contains a map of [InstanceDesc]
structs representing individual instances in the ring.
# Creating a Ring
Two types are available for creating and updating rings:
- [Lifecycler] - A Service that writes to a ring on behalf of a single instance. It's responsible for claiming tokens on the ring and updating the key/value store with heartbeats, state changes, and token changes. This type is the original lifecycler implementation, and [BasicLifecycler] should be used instead for new services.
- [BasicLifecycler] - A Service that writes to a ring on behalf of a single instance. It's responsible for claiming tokens on the ring, updating the key/value store with heartbeats and state changes, and uses a delegate with event listeners to help with these. This type is general purpose, is used by numerous services, and is meant for building higher level lifecyclers.
# Observing a Ring
The [Ring] type is a Service that is primarily used for reading and watching for changes to a ring.
*/
package ring

@ -31,6 +31,6 @@ func NewNoopFlushTransferer() *NoopFlushTransferer {
func (t *NoopFlushTransferer) Flush() {}
// TransferOut is a noop
func (t *NoopFlushTransferer) TransferOut(ctx context.Context) error {
// TransferOut is a no-op and always returns nil: NoopFlushTransferer
// intentionally transfers nothing on shutdown.
func (t *NoopFlushTransferer) TransferOut(_ context.Context) error {
	return nil
}

@ -4,11 +4,12 @@ import (
"context"
"flag"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"sort"
strconv "strconv"
"strconv"
"sync"
"time"
@ -53,6 +54,10 @@ type LifecyclerConfig struct {
// Injected internally
ListenPort int `yaml:"-"`
// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
// Default value is nil, which means that RandomTokenGenerator is used.
RingTokenGenerator TokenGenerator `yaml:"-"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
@ -97,7 +102,14 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.BoolVar(&cfg.EnableInet6, prefix+"enable-inet6", false, "Enable IPv6 support. Required to make use of IP addresses from IPv6 interfaces.")
}
// Lifecycler is responsible for managing the lifecycle of entries in the ring.
/*
Lifecycler is a Service that is responsible for publishing changes to a ring for a single instance.
- When a Lifecycler first starts, it will be in a [PENDING] state.
- After the configured [ring.LifecyclerConfig.JoinAfter] period, it selects some random tokens and enters the [JOINING] state, creating or updating the ring as needed.
- The lifecycler will then periodically, based on the [ring.LifecyclerConfig.ObservePeriod], attempt to verify that its tokens have been added to the ring, after which it will transition to the [ACTIVE] state.
- The lifecycler will update the key/value store with heartbeats, state changes, and token changes, based on the [ring.LifecyclerConfig.HeartbeatPeriod].
*/
type Lifecycler struct {
*services.BasicService
@ -138,6 +150,8 @@ type Lifecycler struct {
instancesInZoneCount int
zonesCount int
tokenGenerator TokenGenerator
lifecyclerMetrics *LifecyclerMetrics
logger log.Logger
}
@ -167,6 +181,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
flushTransferer = NewNoopFlushTransferer()
}
tokenGenerator := cfg.RingTokenGenerator
if tokenGenerator == nil {
tokenGenerator = NewRandomTokenGenerator()
}
l := &Lifecycler{
cfg: cfg,
flushTransferer: flushTransferer,
@ -181,6 +200,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
Zone: cfg.Zone,
actorChan: make(chan func()),
state: PENDING,
tokenGenerator: tokenGenerator,
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
logger: logger,
}
@ -552,7 +572,7 @@ heartbeatLoop:
}
// initRing is the first thing we do when we start. It:
// - add an ingester entry to the ring
// - adds an ingester entry to the ring
// - copies out our state and tokens if they exist
func (i *Lifecycler) initRing(ctx context.Context) error {
var (
@ -616,23 +636,42 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
return ringDesc, true, nil
}
// If the ingester failed to clean its ring entry up in can leave its state in LEAVING
// OR unregister_on_shutdown=false
// Move it into ACTIVE to ensure the ingester joins the ring.
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens {
tokens := Tokens(instanceDesc.Tokens)
level.Info(i.logger).Log("msg", "existing instance found in ring", "state", instanceDesc.State, "tokens",
len(tokens), "ring", i.RingName)
// If the ingester fails to clean its ring entry up or unregister_on_shutdown=false, it can leave behind its
// ring state as LEAVING. Make sure to switch to the ACTIVE state.
if instanceDesc.State == LEAVING {
delta := i.cfg.NumTokens - len(tokens)
if delta > 0 {
// We need more tokens
level.Info(i.logger).Log("msg", "existing instance has too few tokens, adding difference",
"current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens)
newTokens := i.tokenGenerator.GenerateTokens(delta, ringDesc.GetTokens())
tokens = append(tokens, newTokens...)
sort.Sort(tokens)
} else if delta < 0 {
// We have too many tokens
level.Info(i.logger).Log("msg", "existing instance has too many tokens, removing difference",
"current_tokens", len(tokens), "desired_tokens", i.cfg.NumTokens)
// Make sure we don't pick the N smallest tokens, since that would increase the chance of the instance receiving only smaller hashes.
rand.Shuffle(len(tokens), tokens.Swap)
tokens = tokens[0:i.cfg.NumTokens]
sort.Sort(tokens)
}
instanceDesc.State = ACTIVE
instanceDesc.Tokens = tokens
}
// We're taking over this entry, update instanceDesc with our values
instanceDesc.Addr = i.Addr
instanceDesc.Zone = i.Zone
// We exist in the ring, so assume the ring is right and copy out tokens & state out of there.
// Set the local state based on the updated instance.
i.setState(instanceDesc.State)
tokens, _ := ringDesc.TokensFor(i.ID)
i.setTokens(tokens)
level.Info(i.logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens), "ring", i.RingName)
// We're taking over this entry, update instanceDesc with our values
instanceDesc.Addr = i.Addr
instanceDesc.Zone = i.Zone
// Update the ring if the instance has been changed. We don't want to rely on heartbeat update, as heartbeat
// can be configured to long time, and until then lifecycler would not report this instance as ready in CheckReady.
@ -673,11 +712,11 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
ringTokens, takenTokens := ringDesc.TokensFor(i.ID)
if !i.compareTokens(ringTokens) {
// uh, oh... our tokens are not our anymore. Let's try new ones.
// uh, oh... our tokens are not ours anymore. Let's try new ones.
needTokens := i.cfg.NumTokens - len(ringTokens)
level.Info(i.logger).Log("msg", "generating new tokens", "count", needTokens, "ring", i.RingName)
newTokens := GenerateTokens(needTokens, takenTokens)
newTokens := i.tokenGenerator.GenerateTokens(needTokens, takenTokens)
ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)
@ -737,7 +776,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
level.Error(i.logger).Log("msg", "tokens already exist for this instance - wasn't expecting any!", "num_tokens", len(myTokens), "ring", i.RingName)
}
newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
newTokens := i.tokenGenerator.GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
i.setState(targetState)
myTokens = append(myTokens, newTokens...)

@ -0,0 +1,136 @@
package ring
import (
"fmt"
"math"
"strings"
)
// ringItem is the common interface of the two entities that can be ranked by
// ownership in an ownershipPriorityQueue: ring instances and ring tokens.
type ringItem interface {
	// key returns a comparable identifier used to break ties between items
	// with equal ownership, so that priority ordering is deterministic.
	key() int
	String() string
}

// ringInstance identifies a single instance in the ring by its numeric ID.
type ringInstance struct {
	instanceID int
}

func (ri ringInstance) key() int {
	return ri.instanceID
}

func (ri ringInstance) String() string {
	return fmt.Sprintf("[instanceID: %d]", ri.instanceID)
}

// ringToken is a ring token together with the token that precedes it on the ring.
type ringToken struct {
	token     uint32
	prevToken uint32
}

func (rt ringToken) key() int {
	return int(rt.token)
}

func (rt ringToken) String() string {
	return fmt.Sprintf("[token: %d, prevToken: %d]", rt.token, rt.prevToken)
}
// ownershipInfo pairs an item with the amount of ring ownership attributed to it.
type ownershipInfo[T ringItem] struct {
	item      T
	ownership float64
}

// newRingTokenOwnershipInfo builds the ownership entry for token: its
// ownership is the distance from prevToken to token, as computed by
// tokenDistance.
func newRingTokenOwnershipInfo(token, prevToken uint32) ownershipInfo[ringToken] {
	ownership := float64(tokenDistance(prevToken, token))
	return ownershipInfo[ringToken]{
		ownership: ownership,
		item: ringToken{
			token:     token,
			prevToken: prevToken,
		},
	}
}

// newRingInstanceOwnershipInfo builds the ownership entry for the instance
// with the given ID and the given, already-computed ownership value.
func newRingInstanceOwnershipInfo(instanceID int, ownership float64) ownershipInfo[ringInstance] {
	return ownershipInfo[ringInstance]{
		ownership: ownership,
		item: ringInstance{
			instanceID: instanceID,
		},
	}
}
// ownershipPriorityQueue is a max-heap, i.e., a priority queue
// where items with a higher priority will be extracted first.
// Namely, items with a higher ownership have a higher priority.
// In order to guarantee that 2 instances of ownershipPriorityQueue
// with the same items always assign equal priorities to equal items,
// in the case of items with equal ownership, we rely on the
// order of item ids.
type ownershipPriorityQueue[T ringItem] struct {
	items []ownershipInfo[T]
}

// newPriorityQueue returns an empty ownershipPriorityQueue with room for
// capacity items pre-allocated, avoiding slice growth while the heap is filled.
func newPriorityQueue[T ringItem](capacity int) ownershipPriorityQueue[T] {
	return ownershipPriorityQueue[T]{
		items: make([]ownershipInfo[T], 0, capacity),
	}
}
// Len reports how many items are currently in the queue.
func (pq *ownershipPriorityQueue[T]) Len() int {
	return len(pq.items)
}

// Swap exchanges the items at positions i and j.
func (pq *ownershipPriorityQueue[T]) Swap(i, j int) {
	pq.items[j], pq.items[i] = pq.items[i], pq.items[j]
}

// Less reports whether the item at position i has a higher priority than the
// item at position j (this is a max-heap, so higher ownership means higher
// priority).
func (pq *ownershipPriorityQueue[T]) Less(i, j int) bool {
	first, second := pq.items[i].ownership, pq.items[j].ownership
	if first == second {
		// Tie-break on item key so that two queues built from the same items
		// always agree on priority, keeping the overall ordering deterministic.
		return pq.items[i].item.key() > pq.items[j].item.key()
	}

	// Max-heap comparison; NaN ownership values must sort after everything else.
	return first > second || (math.IsNaN(second) && !math.IsNaN(first))
}
// Push implements heap.Push(any). It pushes the element item onto ownershipPriorityQueue.
// Per the container/heap protocol it only appends; callers should go through
// heap.Push, which restores the heap invariants afterwards.
func (pq *ownershipPriorityQueue[T]) Push(item any) {
	ownershipInfo := item.(ownershipInfo[T])
	pq.items = append(pq.items, ownershipInfo)
}

// Pop implements heap.Pop(). It removes and returns the element with the highest priority from ownershipPriorityQueue.
// Per the container/heap protocol it removes the last slice element (heap.Pop
// has already swapped the highest-priority element there); callers should go
// through heap.Pop rather than calling this directly.
func (pq *ownershipPriorityQueue[T]) Pop() any {
	n := len(pq.items)
	item := pq.items[n-1]
	pq.items = pq.items[0 : n-1]
	return item
}
// Peek returns the element with the highest priority from ownershipPriorityQueue,
// but it does not remove it from the latter. Time complexity is O(1).
// It returns nil when the queue is empty.
func (pq *ownershipPriorityQueue[T]) Peek() *ownershipInfo[T] {
	if len(pq.items) == 0 {
		return nil
	}
	return &pq.items[0]
}

// String renders the queue's items (in slice order, not priority order)
// together with their ownership values, mainly for debugging output.
func (pq *ownershipPriorityQueue[T]) String() string {
	return fmt.Sprintf("[%s]", strings.Join(mapItems(pq.items, func(item ownershipInfo[T]) string {
		return fmt.Sprintf("%s-ownership: %.3f", item.item, item.ownership)
	}), ","))
}
// mapItems applies mapItem to every element of in and returns the resulting
// slice, preserving order. The input slice is not modified.
func mapItems[T, V any](in []T, mapItem func(T) V) []V {
	out := make([]V, 0, len(in))
	for _, item := range in {
		out = append(out, mapItem(item))
	}
	return out
}

@ -2,6 +2,7 @@ package ring
import (
"context"
"errors"
"sort"
"time"
)
@ -89,31 +90,112 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
return results, nil
}
// DoUntilQuorumConfig controls how DoUntilQuorum issues requests.
type DoUntilQuorumConfig struct {
	// MinimizeRequests, when true, enables request minimization.
	// See docs for DoUntilQuorum for more information.
	MinimizeRequests bool

	// HedgingDelay, when non-zero and MinimizeRequests is true, enables hedging.
	// See docs for DoUntilQuorum for more information.
	HedgingDelay time.Duration
}

// Validate checks that the configuration is well-formed. It returns an error
// when HedgingDelay is negative, and nil otherwise.
func (c DoUntilQuorumConfig) Validate() error {
	if c.HedgingDelay >= 0 {
		return nil
	}

	return errors.New("invalid DoUntilQuorumConfig: HedgingDelay must be non-negative")
}
// DoUntilQuorum runs function f in parallel for all replicas in r.
//
// If r.MaxUnavailableZones is greater than zero:
// # Result selection
//
// If r.MaxUnavailableZones is greater than zero, DoUntilQuorum operates in zone-aware mode:
// - DoUntilQuorum returns an error if calls to f for instances in more than r.MaxUnavailableZones zones return errors
// - Otherwise, DoUntilQuorum returns all results from all replicas in the first zones for which f succeeds
// for every instance in that zone (eg. if there are 3 zones and r.MaxUnavailableZones is 1, DoUntilQuorum will
// return the results from all instances in 2 zones, even if all calls to f succeed).
//
// Otherwise:
// Otherwise, DoUntilQuorum operates in non-zone-aware mode:
// - DoUntilQuorum returns an error if more than r.MaxErrors calls to f return errors
// - Otherwise, DoUntilQuorum returns all results from the first len(r.Instances) - r.MaxErrors instances
// (eg. if there are 6 replicas and r.MaxErrors is 2, DoUntilQuorum will return the results from the first 4
// successful calls to f, even if all 6 calls to f succeed).
//
// # Request minimization
//
// cfg.MinimizeRequests enables or disables request minimization.
//
// Regardless of the value of cfg.MinimizeRequests, if one of the termination conditions above is satisfied or ctx is
// cancelled before f is called for an instance, f may not be called for that instance at all.
//
// ## When disabled
//
// If request minimization is disabled, DoUntilQuorum will call f for each instance in r. The value of cfg.HedgingDelay
// is ignored.
//
// ## When enabled
//
// If request minimization is enabled, DoUntilQuorum will initially call f for the minimum number of instances needed to
// reach the termination conditions above, and later call f for further instances if required. For example, if
// r.MaxUnavailableZones is 1 and there are three zones, DoUntilQuorum will initially only call f for instances in two
// zones, and only call f for instances in the remaining zone if a request in the initial two zones fails.
//
// DoUntilQuorum will randomly select available zones / instances such that calling DoUntilQuorum multiple times with
// the same ReplicationSet should evenly distribute requests across all zones / instances.
//
// If cfg.HedgingDelay is non-zero, DoUntilQuorum will call f for an additional zone's instances (if zone-aware) / an
// additional instance (if not zone-aware) every cfg.HedgingDelay until one of the termination conditions above is
// reached. For example, if r.MaxUnavailableZones is 2, cfg.HedgingDelay is 4 seconds and there are fives zones,
// DoUntilQuorum will initially only call f for instances in three zones, and unless one of the termination conditions
// is reached earlier, will then call f for instances in a fourth zone approximately 4 seconds later, and then call f
// for instances in the final zone approximately 4 seconds after that (ie. roughly 8 seconds since the call to
// DoUntilQuorum began).
//
// # Cleanup
//
// Any results from successful calls to f that are not returned by DoUntilQuorum will be passed to cleanupFunc,
// including when DoUntilQuorum returns an error or only returns a subset of successful results. cleanupFunc may
// be called both before and after DoUntilQuorum returns.
//
// A call to f is considered successful if it returns a nil error.
//
// DoUntilQuorum cancels the context.Context passed to each invocation of f if the result of that invocation of
// f will not be returned. If the result of that invocation of f will be returned, the context.Context passed
// to that invocation of f will not be cancelled by DoUntilQuorum, but the context.Context is a child of ctx
// passed to DoUntilQuorum and so will be cancelled if ctx is cancelled.
func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, f func(context.Context, *InstanceDesc) (T, error), cleanupFunc func(T)) ([]T, error) {
// # Contexts
//
// The context.Context passed to an invocation of f may be cancelled at any time if the result of that invocation of
// f will not be used.
//
// DoUntilQuorum cancels the context.Context passed to each invocation of f before DoUntilQuorum returns.
func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc) (T, error), cleanupFunc func(T)) ([]T, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wrappedF := func(ctx context.Context, desc *InstanceDesc, _ context.CancelFunc) (T, error) {
return f(ctx, desc)
}
return DoUntilQuorumWithoutSuccessfulContextCancellation(ctx, r, cfg, wrappedF, cleanupFunc)
}
// DoUntilQuorumWithoutSuccessfulContextCancellation behaves the same as DoUntilQuorum, except it does not cancel
// the context.Context passed to invocations of f whose results are returned.
//
// For example, this is useful in situations where DoUntilQuorumWithoutSuccessfulContextCancellation is used
// to establish a set of streams that will be used after DoUntilQuorumWithoutSuccessfulContextCancellation returns.
//
// It is the caller's responsibility to ensure that either of the following are eventually true:
// - ctx is cancelled, or
// - the corresponding context.CancelFunc is called for all invocations of f whose results are returned by
// DoUntilQuorumWithoutSuccessfulContextCancellation
//
// Failing to do this may result in a memory leak.
func DoUntilQuorumWithoutSuccessfulContextCancellation[T any](ctx context.Context, r ReplicationSet, cfg DoUntilQuorumConfig, f func(context.Context, *InstanceDesc, context.CancelFunc) (T, error), cleanupFunc func(T)) ([]T, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
resultsChan := make(chan instanceResult[T], len(r.Instances))
resultsRemaining := len(r.Instances)
@ -140,12 +222,28 @@ func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, f func(context.
contextTracker = newDefaultContextTracker(ctx, r.Instances)
}
if cfg.MinimizeRequests {
resultTracker.startMinimumRequests()
} else {
resultTracker.startAllRequests()
}
for i := range r.Instances {
instance := &r.Instances[i]
instanceCtx := contextTracker.contextFor(instance)
ctx, ctxCancel := contextTracker.contextFor(instance)
go func(desc *InstanceDesc) {
result, err := f(instanceCtx, desc)
if err := resultTracker.awaitStart(ctx, desc); err != nil {
// Post to resultsChan so that the deferred cleanup handler above eventually terminates.
resultsChan <- instanceResult[T]{
err: err,
instance: desc,
}
return
}
result, err := f(ctx, desc, ctxCancel)
resultsChan <- instanceResult[T]{
result: result,
err: err,
@ -161,6 +259,14 @@ func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, f func(context.
}
}
var hedgingTrigger <-chan time.Time
if cfg.HedgingDelay > 0 {
ticker := time.NewTicker(cfg.HedgingDelay)
defer ticker.Stop()
hedgingTrigger = ticker.C
}
for !resultTracker.succeeded() {
select {
case <-ctx.Done():
@ -168,6 +274,8 @@ func DoUntilQuorum[T any](ctx context.Context, r ReplicationSet, f func(context.
cleanupResultsAlreadyReceived()
return nil, ctx.Err()
case <-hedgingTrigger:
resultTracker.startAdditionalRequests()
case result := <-resultsChan:
resultsRemaining--
resultTracker.done(result.instance, result.err)

@ -1,6 +1,12 @@
package ring
import "context"
import (
"context"
"errors"
"math/rand"
"go.uber.org/atomic"
)
type replicationSetResultTracker interface {
// Signals an instance has done the execution, either successful (no error)
@ -20,11 +26,42 @@ type replicationSetResultTracker interface {
// This method should only be called after succeeded returns true for the first time and before
// calling done any further times.
shouldIncludeResultFrom(instance *InstanceDesc) bool
// Starts an initial set of requests sufficient to meet the quorum requirements of this tracker.
// Further requests will be started if necessary when done is called with a non-nil error.
// Calling this method multiple times may lead to unpredictable behaviour.
Calling both this method and startAllRequests may lead to unpredictable behaviour.
// This method must only be called before calling done.
startMinimumRequests()
// Starts additional request(s) as defined by the quorum requirements of this tracker.
// For example, a zone-aware tracker would start requests for another zone, whereas a
// non-zone-aware tracker would start a request for another instance.
// This method must only be called after calling startMinimumRequests or startAllRequests.
// If requests for all instances have already been started, this method does nothing.
// This method must only be called before calling done.
startAdditionalRequests()
// Starts requests for all instances.
// Calling this method multiple times may lead to unpredictable behaviour.
Calling both this method and startMinimumRequests may lead to unpredictable behaviour.
// This method must only be called before calling done.
startAllRequests()
// Blocks until the request for this instance should be started.
// Returns nil if the request should be started, or a non-nil error if the request is not required
// or ctx has been cancelled.
Must only be called after startMinimumRequests or startAllRequests returns.
// Calling this method multiple times for the same instance may lead to unpredictable behaviour.
awaitStart(ctx context.Context, instance *InstanceDesc) error
}
type replicationSetContextTracker interface {
// Returns a context.Context for instance.
contextFor(instance *InstanceDesc) context.Context
// Returns a context.Context and context.CancelFunc for instance.
// The context.CancelFunc will only cancel the context for this instance (ie. if this tracker
// is zone-aware, calling the context.CancelFunc should not cancel contexts for other instances
// in the same zone).
contextFor(instance *InstanceDesc) (context.Context, context.CancelFunc)
// Cancels the context for instance previously obtained with contextFor.
// This method may cancel the context for other instances if those other instances are part of
@ -35,11 +72,16 @@ type replicationSetContextTracker interface {
cancelAllContexts()
}
var errResultNotNeeded = errors.New("result from this instance is not needed")
type defaultResultTracker struct {
minSucceeded int
numSucceeded int
numErrors int
maxErrors int
minSucceeded int
numSucceeded int
numErrors int
maxErrors int
instances []InstanceDesc
instanceRelease map[*InstanceDesc]chan struct{}
pendingInstances []*InstanceDesc
}
func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultResultTracker {
@ -48,14 +90,20 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe
numSucceeded: 0,
numErrors: 0,
maxErrors: maxErrors,
instances: instances,
}
}
// done records the outcome of one instance's request. On success, once the
// success quorum is reached, any still-pending requests are aborted via
// onSucceeded. On error, one additional pending request (if any) is released
// to compensate for the failure.
func (t *defaultResultTracker) done(_ *InstanceDesc, err error) {
	if err == nil {
		t.numSucceeded++

		if t.succeeded() {
			t.onSucceeded()
		}
	} else {
		t.numErrors++
		t.startAdditionalRequests()
	}
}
@ -63,6 +111,15 @@ func (t *defaultResultTracker) succeeded() bool {
return t.numSucceeded >= t.minSucceeded
}
// onSucceeded aborts all requests still waiting to be released: quorum has
// been reached, so their results are no longer needed. Closing an instance's
// release channel makes awaitStart return errResultNotNeeded for it.
func (t *defaultResultTracker) onSucceeded() {
	// We don't need any of the requests that are waiting to be released. Signal that they should abort.
	for _, i := range t.pendingInstances {
		close(t.instanceRelease[i])
	}

	t.pendingInstances = nil
}
// failed reports whether the number of errors seen so far has exceeded
// maxErrors, i.e. whether quorum can no longer be reached.
func (t *defaultResultTracker) failed() bool {
	return t.numErrors > t.maxErrors
}
@ -71,6 +128,66 @@ func (t *defaultResultTracker) shouldIncludeResultFrom(_ *InstanceDesc) bool {
return true
}
// startMinimumRequests immediately releases requests for all but maxErrors
// instances, chosen in random order. The remaining instances are kept pending
// and are released one at a time as failures are reported (see done /
// startAdditionalRequests).
func (t *defaultResultTracker) startMinimumRequests() {
	t.instanceRelease = make(map[*InstanceDesc]chan struct{}, len(t.instances))

	for i := range t.instances {
		instance := &t.instances[i]
		t.instanceRelease[instance] = make(chan struct{}, 1)
	}

	// Randomize the release order so repeated calls spread load evenly across instances.
	releaseOrder := rand.Perm(len(t.instances))
	t.pendingInstances = make([]*InstanceDesc, 0, t.maxErrors)

	for _, instanceIdx := range releaseOrder {
		instance := &t.instances[instanceIdx]

		if len(t.pendingInstances) < t.maxErrors {
			// Hold this request back: it's only needed if another request fails.
			t.pendingInstances = append(t.pendingInstances, instance)
		} else {
			// Release immediately; the buffered send cannot block.
			t.instanceRelease[instance] <- struct{}{}
		}
	}

	// If we've already succeeded (which should only happen if the replica set is misconfigured with MaxErrors >= the number of instances),
	// then make sure we don't block requests forever.
	if t.succeeded() {
		t.onSucceeded()
	}
}
// startAdditionalRequests releases the next pending request, if any. It is
// called after a failure so another instance can take over; it is a no-op
// once requests for all instances have been started.
func (t *defaultResultTracker) startAdditionalRequests() {
	if len(t.pendingInstances) > 0 {
		// There are some outstanding requests we could make before we reach maxErrors. Release the next one.
		i := t.pendingInstances[0]
		t.instanceRelease[i] <- struct{}{}
		t.pendingInstances = t.pendingInstances[1:]
	}
}
// startAllRequests releases every instance's request immediately, leaving
// nothing pending. Each release channel is buffered with capacity 1, so the
// send below never blocks and awaitStart returns nil as soon as it receives.
func (t *defaultResultTracker) startAllRequests() {
	t.instanceRelease = make(map[*InstanceDesc]chan struct{}, len(t.instances))

	for idx := range t.instances {
		release := make(chan struct{}, 1)
		release <- struct{}{}
		t.instanceRelease[&t.instances[idx]] = release
	}
}
// awaitStart blocks until the request for instance should be started.
// It returns nil when the request is released, errResultNotNeeded when the
// tracker has decided the result is no longer needed (the release channel was
// closed by onSucceeded), or ctx.Err() if ctx is cancelled first.
func (t *defaultResultTracker) awaitStart(ctx context.Context, instance *InstanceDesc) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case _, ok := <-t.instanceRelease[instance]:
		if ok {
			return nil
		}

		return errResultNotNeeded
	}
}
type defaultContextTracker struct {
ctx context.Context
cancelFuncs map[*InstanceDesc]context.CancelFunc
@ -83,10 +200,10 @@ func newDefaultContextTracker(ctx context.Context, instances []InstanceDesc) *de
}
}
func (t *defaultContextTracker) contextFor(instance *InstanceDesc) context.Context {
// contextFor derives a per-instance child context from the tracker's base
// context and records its cancel function so cancelContextFor and
// cancelAllContexts can cancel it later. Each instance gets its own
// independent context; the returned CancelFunc cancels only this instance's.
func (t *defaultContextTracker) contextFor(instance *InstanceDesc) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(t.ctx)
	t.cancelFuncs[instance] = cancel
	return ctx, cancel
}
func (t *defaultContextTracker) cancelContextFor(instance *InstanceDesc) {
@ -110,6 +227,9 @@ type zoneAwareResultTracker struct {
failuresByZone map[string]int
minSuccessfulZones int
maxUnavailableZones int
zoneRelease map[string]chan struct{}
zoneShouldStart map[string]*atomic.Bool
pendingZones []string
}
func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int) *zoneAwareResultTracker {
@ -125,14 +245,27 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int
t.minSuccessfulZones = len(t.waitingByZone) - maxUnavailableZones
if t.minSuccessfulZones < 0 {
t.minSuccessfulZones = 0
}
return t
}
// done records the outcome of one instance's request. On the first failure in
// a zone, another pending zone's requests are released to compensate; once
// enough zones have succeeded, the remaining pending zones are aborted via
// onSucceeded.
func (t *zoneAwareResultTracker) done(instance *InstanceDesc, err error) {
	t.waitingByZone[instance.Zone]--

	if err == nil {
		if t.succeeded() {
			t.onSucceeded()
		}
	} else {
		t.failuresByZone[instance.Zone]++

		if t.failuresByZone[instance.Zone] == 1 {
			// If this was the first failure for this zone, release another zone's requests and signal they should start.
			t.startAdditionalRequests()
		}
	}
}
@ -150,6 +283,15 @@ func (t *zoneAwareResultTracker) succeeded() bool {
return successfulZones >= t.minSuccessfulZones
}
// onSucceeded is invoked once enough zones have completed successfully.
// Zones still waiting to be released are told to abort, since their results
// can no longer influence the outcome.
func (t *zoneAwareResultTracker) onSucceeded() {
	pending := t.pendingZones
	t.pendingZones = nil

	for _, zone := range pending {
		// shouldStart=false: wake the waiters, but have them abort.
		t.releaseZone(zone, false)
	}
}
func (t *zoneAwareResultTracker) failed() bool {
failedZones := len(t.failuresByZone)
return failedZones > t.maxUnavailableZones
@ -159,42 +301,118 @@ func (t *zoneAwareResultTracker) shouldIncludeResultFrom(instance *InstanceDesc)
return t.failuresByZone[instance.Zone] == 0 && t.waitingByZone[instance.Zone] == 0
}
// startMinimumRequests releases requests for just enough zones to be able to
// succeed (minSuccessfulZones), keeping the remaining zones pending so they
// can be released later if a released zone fails.
func (t *zoneAwareResultTracker) startMinimumRequests() {
	t.createReleaseChannels()

	zones := make([]string, 0, len(t.waitingByZone))
	for z := range t.waitingByZone {
		zones = append(zones, z)
	}

	// Randomise which zones go first, so load is spread across zones over many requests.
	rand.Shuffle(len(zones), func(a, b int) {
		zones[a], zones[b] = zones[b], zones[a]
	})

	released, deferred := zones[:t.minSuccessfulZones], zones[t.minSuccessfulZones:]
	for _, z := range released {
		t.releaseZone(z, true)
	}
	t.pendingZones = deferred

	// If we've already succeeded (which should only happen if the replica set is misconfigured
	// with MaxUnavailableZones >= the number of zones), then make sure we don't block requests forever.
	if t.succeeded() {
		t.onSucceeded()
	}
}
// startAdditionalRequests releases one more pending zone, if any remain.
// It is called after a zone reports its first failure, so that another zone
// can take its place before maxUnavailableZones is reached.
func (t *zoneAwareResultTracker) startAdditionalRequests() {
	if len(t.pendingZones) == 0 {
		return
	}

	next := t.pendingZones[0]
	t.pendingZones = t.pendingZones[1:]
	t.releaseZone(next, true)
}
// startAllRequests releases the requests of every zone at once, signalling
// that all of them should run.
func (t *zoneAwareResultTracker) startAllRequests() {
	t.createReleaseChannels()

	for z := range t.waitingByZone {
		t.releaseZone(z, true)
	}
}
// createReleaseChannels (re)initialises the per-zone release channel and
// should-start flag for every zone known to the tracker. Each channel starts
// unclosed and each flag starts false; releaseZone later sets the flag and
// closes the channel exactly once.
func (t *zoneAwareResultTracker) createReleaseChannels() {
	zoneCount := len(t.waitingByZone)
	t.zoneRelease = make(map[string]chan struct{}, zoneCount)
	t.zoneShouldStart = make(map[string]*atomic.Bool, zoneCount)

	for z := range t.waitingByZone {
		t.zoneRelease[z] = make(chan struct{})
		t.zoneShouldStart[z] = atomic.NewBool(false)
	}
}
// releaseZone wakes every goroutine blocked in awaitStart for the given zone.
// shouldStart tells them whether to proceed with their request (true) or to
// abort with errResultNotNeeded (false). The flag is stored before the channel
// is closed so waiters always observe the intended value. Must be called at
// most once per zone: closing an already-closed channel panics.
func (t *zoneAwareResultTracker) releaseZone(zone string, shouldStart bool) {
	t.zoneShouldStart[zone].Store(shouldStart)
	close(t.zoneRelease[zone])
}
// awaitStart blocks until the instance's zone is released or ctx is cancelled.
// It returns nil when the zone's requests should run, errResultNotNeeded when
// the zone was released only to signal it should abort, and ctx.Err() on
// cancellation.
func (t *zoneAwareResultTracker) awaitStart(ctx context.Context, instance *InstanceDesc) error {
	zone := instance.Zone

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.zoneRelease[zone]:
		if !t.zoneShouldStart[zone].Load() {
			return errResultNotNeeded
		}
		return nil
	}
}
type zoneAwareContextTracker struct {
contexts map[string]context.Context
cancelFuncs map[string]context.CancelFunc
contexts map[*InstanceDesc]context.Context
cancelFuncs map[*InstanceDesc]context.CancelFunc
}
func newZoneAwareContextTracker(ctx context.Context, instances []InstanceDesc) *zoneAwareContextTracker {
t := &zoneAwareContextTracker{
contexts: map[string]context.Context{},
cancelFuncs: map[string]context.CancelFunc{},
contexts: make(map[*InstanceDesc]context.Context, len(instances)),
cancelFuncs: make(map[*InstanceDesc]context.CancelFunc, len(instances)),
}
for _, instance := range instances {
if _, ok := t.contexts[instance.Zone]; !ok {
zoneCtx, cancel := context.WithCancel(ctx)
t.contexts[instance.Zone] = zoneCtx
t.cancelFuncs[instance.Zone] = cancel
}
for i := range instances {
instance := &instances[i]
ctx, cancel := context.WithCancel(ctx)
t.contexts[instance] = ctx
t.cancelFuncs[instance] = cancel
}
return t
}
func (t *zoneAwareContextTracker) contextFor(instance *InstanceDesc) context.Context {
return t.contexts[instance.Zone]
func (t *zoneAwareContextTracker) contextFor(instance *InstanceDesc) (context.Context, context.CancelFunc) {
return t.contexts[instance], t.cancelFuncs[instance]
}
func (t *zoneAwareContextTracker) cancelContextFor(instance *InstanceDesc) {
if cancel, ok := t.cancelFuncs[instance.Zone]; ok {
cancel()
delete(t.cancelFuncs, instance.Zone)
// Why not create a per-zone parent context to make this easier?
// If we create a per-zone parent context, we'd need to have some way to cancel the per-zone context when the last of the individual
// contexts in a zone are cancelled using the context.CancelFunc returned from contextFor.
for i, cancel := range t.cancelFuncs {
if i.Zone == instance.Zone {
cancel()
delete(t.contexts, i)
delete(t.cancelFuncs, i)
}
}
}
func (t *zoneAwareContextTracker) cancelAllContexts() {
for zone, cancel := range t.cancelFuncs {
for instance, cancel := range t.cancelFuncs {
cancel()
delete(t.cancelFuncs, zone)
delete(t.contexts, instance)
delete(t.cancelFuncs, instance)
}
}

@ -37,7 +37,6 @@ const (
// ReadRing represents the read interface to the ring.
type ReadRing interface {
// Get returns n (or more) instances which form the replicas for the given key.
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
@ -152,7 +151,7 @@ type instanceInfo struct {
Zone string
}
// Ring holds the information about the members of the consistent hash ring.
// Ring is a Service that maintains an in-memory copy of a ring and watches for changes.
type Ring struct {
services.Service
@ -171,7 +170,7 @@ type Ring struct {
oldestRegisteredTimestamp int64
// Maps a token with the information of the instance holding it. This map is immutable and
// cannot be chanced in place because it's shared "as is" between subrings (the only way to
// cannot be changed in place because it's shared "as is" between subrings (the only way to
// change it is to create a new one and replace it).
ringInstanceByToken map[uint32]instanceInfo
@ -239,16 +238,19 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client
numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_members",
Help: "Number of members in the ring",
ConstLabels: map[string]string{"name": name}},
ConstLabels: map[string]string{"name": name},
},
[]string{"state"}),
totalTokensGauge: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "ring_tokens_total",
Help: "Number of tokens in the ring",
ConstLabels: map[string]string{"name": name}}),
ConstLabels: map[string]string{"name": name},
}),
oldestTimestampGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_oldest_member_timestamp",
Help: "Timestamp of the oldest member in the ring.",
ConstLabels: map[string]string{"name": name}},
ConstLabels: map[string]string{"name": name},
},
[]string{"state"}),
logger: logger,
}
@ -523,26 +525,31 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
}
// CountTokens returns the number tokens within the range for each instance.
func (r *Desc) CountTokens() map[string]uint32 {
// In case of zone-awareness, this method takes into account only tokens of
// the same zone. More precisely, for each instance only the distance between
// its tokens and tokens of the instances from the same zone will be considered.
func (r *Desc) CountTokens() map[string]int64 {
var (
owned = make(map[string]uint32, len(r.Ingesters))
ringTokens = r.GetTokens()
owned = make(map[string]int64, len(r.Ingesters))
ringTokensByZone = r.getTokensByZone()
ringInstanceByToken = r.getTokensInfo()
)
for i, token := range ringTokens {
var diff uint32
for _, ringTokens := range ringTokensByZone {
for i, token := range ringTokens {
var prevToken uint32
// Compute how many tokens are within the range.
if i == 0 {
lastToken := ringTokens[len(ringTokens)-1]
diff = token + (math.MaxUint32 - lastToken)
} else {
diff = token - ringTokens[i-1]
}
// Compute how many tokens are within the range.
if i == 0 {
prevToken = ringTokens[len(ringTokens)-1]
} else {
prevToken = ringTokens[i-1]
}
info := ringInstanceByToken[token]
owned[info.InstanceID] = owned[info.InstanceID] + diff
diff := tokenDistance(prevToken, token)
info := ringInstanceByToken[token]
owned[info.InstanceID] = owned[info.InstanceID] + diff
}
}
// Set to 0 the number of owned tokens by instances which don't have tokens yet.
@ -727,7 +734,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
panic(ErrInconsistentTokensInfo)
}
// Ensure we select an unique instance.
// Ensure we select a unique instance.
if _, ok := shard[info.InstanceID]; ok {
continue
}
@ -1014,7 +1021,7 @@ func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interfac
return r.KVClient.CAS(ctx, r.key, f)
}
func (r *Ring) getRing(ctx context.Context) (*Desc, error) {
func (r *Ring) getRing(_ context.Context) (*Desc, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

@ -59,6 +59,7 @@ func (InstanceState) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_26381ed67e202a6e, []int{0}
}
// Desc is the top-level type used to model a ring, containing information for individual instances.
type Desc struct {
Ingesters map[string]InstanceDesc `protobuf:"bytes,1,rep,name=ingesters,proto3" json:"ingesters" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
@ -102,6 +103,7 @@ func (m *Desc) GetIngesters() map[string]InstanceDesc {
return nil
}
// InstanceDesc is the top-level type used to model per-instance information in a ring.
type InstanceDesc struct {
Addr string `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
// Unix timestamp (with seconds precision) of the last heartbeat sent

@ -7,11 +7,13 @@ import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
// Desc is the top-level type used to model a ring, containing information for individual instances.
message Desc {
map<string,InstanceDesc> ingesters = 1 [(gogoproto.nullable) = false];
reserved 2;
}
// InstanceDesc is the top-level type used to model per-instance information in a ring.
message InstanceDesc {
reserved 4, 5; // old, deprecated fields

@ -0,0 +1,305 @@
package ring
import (
"container/heap"
"fmt"
"math"
"regexp"
"sort"
"strconv"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"golang.org/x/exp/slices"
)
const (
totalTokensCount = math.MaxUint32 + 1
optimalTokensPerInstance = 512
maxZonesCount = 8
)
var (
	// instanceIDRegex captures the trailing "-<number>" suffix of an instance
	// name; the number is used as the numeric instance id.
	instanceIDRegex = regexp.MustCompile(`^(.*)-(\d+)$`)

	// The helpers below build the errors returned by this file. They are funcs
	// rather than sentinel errors because each message embeds the offending values.
	errorBadInstanceIDFormat = func(instanceID string) error {
		return fmt.Errorf("unable to extract instance id from \"%s\"", instanceID)
	}

	errorZoneCountOutOfBound = func(zonesCount int) error {
		return fmt.Errorf("number of zones %d is not correct: it must be greater than 0 and less or equal than %d", zonesCount, maxZonesCount)
	}

	errorZoneNotValid = func(zone string) error {
		return fmt.Errorf("zone %s is not valid", zone)
	}

	errorMultipleOfZonesCount = func(optimalTokenOwnership uint32, token ringToken) error {
		return fmt.Errorf("calculation of a new token between %d and %d with optimal token ownership %d was impossible: optimal token ownership must be a positive multiple of maximal allowed number of zones %d", token.prevToken, token.token, optimalTokenOwnership, maxZonesCount)
	}

	errorLowerAndUpperBoundModulo = func(optimalTokenOwnership uint32, token ringToken) error {
		return fmt.Errorf("calculation of a new token between %d and %d with optimal token ownership %d was impossible: lower and upper bounds must be congruent modulo maximal allowed number of zones %d", token.prevToken, token.token, optimalTokenOwnership, maxZonesCount)
	}

	errorDistanceBetweenTokensNotBigEnough = func(optimalTokenOwnership int, ownership int64, token ringToken) error {
		return fmt.Errorf("calculation of a new token between %d and %d with optimal token ownership %d was impossible: distance between lower and upper bound %d is not big enough", token.prevToken, token.token, optimalTokenOwnership, ownership)
	}
)
// SpreadMinimizingTokenGenerator deterministically generates tokens for an
// instance so that token ownership is balanced across the instances of a zone
// (see generateTokensByInstanceID for the construction).
type SpreadMinimizingTokenGenerator struct {
	instanceID            int      // numeric suffix parsed from the instance name
	zoneID                int      // index of the instance's zone within spreadMinimizingZones
	spreadMinimizingZones []string // sorted list of all zones participating in the ring
	logger                log.Logger
}
// NewSpreadMinimizingTokenGenerator builds a SpreadMinimizingTokenGenerator
// for the given instance name and zone. spreadMinimizingZones must contain
// between 1 and maxZonesCount zones and must include zone. The slice is
// copied and sorted internally, so the caller's slice is never mutated.
func NewSpreadMinimizingTokenGenerator(instance, zone string, spreadMinimizingZones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
	zonesCount := len(spreadMinimizingZones)
	if zonesCount <= 0 || zonesCount > maxZonesCount {
		return nil, errorZoneCountOutOfBound(zonesCount)
	}

	sortedZones := make([]string, zonesCount)
	copy(sortedZones, spreadMinimizingZones)
	if !slices.IsSorted(sortedZones) {
		sort.Strings(sortedZones)
	}

	instanceID, err := parseInstanceID(instance)
	if err != nil {
		return nil, err
	}

	zoneID, err := findZoneID(zone, sortedZones)
	if err != nil {
		return nil, err
	}

	return &SpreadMinimizingTokenGenerator{
		instanceID:            instanceID,
		zoneID:                zoneID,
		spreadMinimizingZones: sortedZones,
		logger:                logger,
	}, nil
}
// parseInstanceID extracts the numeric instance id from an instance name of
// the form "<prefix>-<number>" (e.g. "ingester-zone-a-5" -> 5).
func parseInstanceID(instanceID string) (int, error) {
	match := instanceIDRegex.FindStringSubmatch(instanceID)
	if len(match) != 3 {
		return 0, errorBadInstanceIDFormat(instanceID)
	}
	return strconv.Atoi(match[2])
}
// findZoneID returns the index of zone within sortedZones, or an error when
// the zone is not present in the slice.
func findZoneID(zone string, sortedZones []string) (int, error) {
	if idx := slices.Index(sortedZones, zone); idx >= 0 {
		return idx, nil
	}
	return 0, errorZoneNotValid(zone)
}
// generateFirstInstanceTokens calculates the set of tokens assigned to the
// first instance (id 0) of this generator's zone.
func (t *SpreadMinimizingTokenGenerator) generateFirstInstanceTokens() Tokens {
	// In this approach all the tokens of a zone are congruent to the zone id
	// modulo maxZonesCount. The spacing between consecutive tokens is therefore
	// rounded to a multiple of maxZonesCount, so that adding the zone id below
	// preserves that congruence for every generated token.
	tokenDistance := (totalTokensCount / optimalTokensPerInstance / maxZonesCount) * maxZonesCount

	tokens := make(Tokens, 0, optimalTokensPerInstance)
	for i := 0; i < optimalTokensPerInstance; i++ {
		tokens = append(tokens, uint32(i*tokenDistance)+uint32(t.zoneID))
	}
	return tokens
}
// calculateNewToken determines where in the range represented by the given ringToken should a new token be placed
// in order to satisfy the constraint represented by the optimalTokenOwnership. This method assumes that:
// - ringToken.token % maxZonesCount == ringToken.prevToken % zonesCount
// - optimalTokenOwnership % maxZonesCount == 0,
// where zonesCount is the number of zones in the ring. The caller of this function must ensure that these assumptions hold.
func (t *SpreadMinimizingTokenGenerator) calculateNewToken(token ringToken, optimalTokenOwnership uint32) (uint32, error) {
	if optimalTokenOwnership < maxZonesCount || optimalTokenOwnership%maxZonesCount != 0 {
		return 0, errorMultipleOfZonesCount(optimalTokenOwnership, token)
	}
	if token.prevToken%maxZonesCount != token.token%maxZonesCount {
		return 0, errorLowerAndUpperBoundModulo(optimalTokenOwnership, token)
	}
	// The new token must fit strictly inside (prevToken, token): the current
	// distance has to exceed the requested ownership.
	ownership := tokenDistance(token.prevToken, token.token)
	if ownership <= int64(optimalTokenOwnership) {
		return 0, errorDistanceBetweenTokensNotBigEnough(int(optimalTokenOwnership), ownership, token)
	}
	// In the present approach tokens of successive zones are immediate successors of the tokens in
	// the previous zone. This means that once a token of the "leading" zone, i.e., the zone with
	// id 0 is determined, we must have enough space to accommodate the corresponding tokens in the
	// remaining (maxZonesCount-1) zones. Hence, the highest token of the leading zone must be a
	// multiple of maxZonesCount, that guarantees that there are remaining (maxZonesCount-1) available
	// tokens in the token space.
	maxTokenValue := uint32(((totalTokensCount / maxZonesCount) - 1) * maxZonesCount)
	offset := maxTokenValue - token.prevToken
	if offset < optimalTokenOwnership {
		// prevToken is so close to the highest allowed token value that adding
		// optimalTokenOwnership would run past it: wrap around the token space.
		newToken := optimalTokenOwnership - offset
		return newToken, nil
	}
	return token.prevToken + optimalTokenOwnership, nil
}
// optimalTokenOwnership returns the ownership each of the remaining
// remainingTokensCount tokens of an instance should ideally receive, given the
// instance's optimal total ownership (optimalInstanceOwnership) and the
// ownership it has accumulated so far (currInstanceOwnership). The result is
// rounded down to a multiple of maxZonesCount, as required by calculateNewToken.
func (t *SpreadMinimizingTokenGenerator) optimalTokenOwnership(optimalInstanceOwnership, currInstanceOwnership float64, remainingTokensCount uint32) uint32 {
	perToken := uint32(optimalInstanceOwnership-currInstanceOwnership) / remainingTokensCount
	return perToken - perToken%maxZonesCount
}
// GenerateTokens returns at most requestedTokensCount unique tokens, none of which clashes with
// the given allTakenTokens, representing the set of all tokens currently present in the ring.
// Returned tokens are sorted.
// The optimal number of tokens (optimalTokenPerInstance), i.e., 512, reserved for the underlying
// instance are generated by generateAllTokens. GenerateTokens selects the first
// requestedTokensCount of those reserved tokens that are not already present in allTakenTokens.
// The number of returned tokens might be lower than requested when requestedTokensCount exceeds
// 512 (optimalTokensPerInstance), or when fewer than requestedTokensCount reserved tokens are
// still free.
func (t *SpreadMinimizingTokenGenerator) GenerateTokens(requestedTokensCount int, allTakenTokens []uint32) Tokens {
	taken := make(map[uint32]bool, len(allTakenTokens))
	for _, tok := range allTakenTokens {
		taken[tok] = true
	}

	// The sorted set of tokens deterministically reserved for this instance in this zone.
	reserved := t.generateAllTokens()

	result := make(Tokens, 0, requestedTokensCount)
	for _, tok := range reserved {
		if len(result) == requestedTokensCount {
			break
		}
		if taken[tok] {
			continue
		}
		result = append(result, tok)
	}
	return result
}
// generateAllTokens returns the optimal number of tokens (optimalTokensPerInstance,
// i.e., 512) reserved for the underlying instance (id t.instanceID), sorted.
// The tokens are distributed so that the instance's registered ownership is
// optimal when it joins a ring already containing all lower-id instances.
// Calls to this method always return the same set of tokens.
func (t *SpreadMinimizingTokenGenerator) generateAllTokens() Tokens {
	byInstance := t.generateTokensByInstanceID()
	own := byInstance[t.instanceID]
	slices.Sort(own)
	return own
}
// generateTokensByInstanceID generates the optimal number of tokens (optimalTokenPerInstance),
// i.e., 512, for all instances whose id is less or equal to the id of the underlying instance
// (with id t.instanceID). Generated tokens are not sorted, but they are distributed in such a
// way that registered ownership of all the instances is optimal.
// Calls to this method will always return the same set of tokens.
//
// Fix vs. the previous revision: when instanceQueue.Peek() returned nil, the warning log line
// dereferenced highestOwnershipInstance.ownership and crashed with a nil-pointer dereference
// before the intended, descriptive panic could fire. The log call no longer dereferences the
// possibly-nil pointer.
func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]Tokens {
	firstInstanceTokens := t.generateFirstInstanceTokens()
	tokensByInstanceID := make(map[int]Tokens, t.instanceID+1)
	tokensByInstanceID[0] = firstInstanceTokens

	if t.instanceID == 0 {
		return tokensByInstanceID
	}

	// tokensQueues is a slice of priority queues. Slice indexes correspond
	// to the ids of instances, while priority queues represent the tokens
	// of the corresponding instance, ordered from highest to lowest ownership.
	tokensQueues := make([]ownershipPriorityQueue[ringToken], t.instanceID)

	// Create and initialize priority queue of tokens for the first instance.
	tokensQueue := newPriorityQueue[ringToken](optimalTokensPerInstance)
	prev := len(firstInstanceTokens) - 1
	firstInstanceOwnership := 0.0
	for tk, token := range firstInstanceTokens {
		tokenOwnership := float64(tokenDistance(firstInstanceTokens[prev], token))
		firstInstanceOwnership += tokenOwnership
		heap.Push(&tokensQueue, newRingTokenOwnershipInfo(token, firstInstanceTokens[prev]))
		prev = tk
	}
	tokensQueues[0] = tokensQueue

	// instanceQueue is a priority queue of instances such that instances with higher ownership have a higher priority.
	instanceQueue := newPriorityQueue[ringInstance](t.instanceID)
	heap.Push(&instanceQueue, newRingInstanceOwnershipInfo(0, firstInstanceOwnership))

	// ignoredInstances is a slice of the current instances whose tokens
	// don't have enough space to accommodate new tokens.
	ignoredInstances := make([]ownershipInfo[ringInstance], 0, t.instanceID)

	for i := 1; i <= t.instanceID; i++ {
		optimalInstanceOwnership := float64(totalTokensCount) / float64(i+1)
		currInstanceOwnership := 0.0
		addedTokens := 0
		ignoredInstances = ignoredInstances[:0]
		tokens := make(Tokens, 0, optimalTokensPerInstance)
		// currInstanceTokenQueue is the priority queue of tokens of the instance being added.
		currInstanceTokenQueue := newPriorityQueue[ringToken](optimalTokensPerInstance)
		for addedTokens < optimalTokensPerInstance {
			optimalTokenOwnership := t.optimalTokenOwnership(optimalInstanceOwnership, currInstanceOwnership, uint32(optimalTokensPerInstance-addedTokens))
			highestOwnershipInstance := instanceQueue.Peek()
			if highestOwnershipInstance == nil || highestOwnershipInstance.ownership <= float64(optimalTokenOwnership) {
				// Don't dereference highestOwnershipInstance here: it may be nil.
				level.Warn(t.logger).Log("msg", "it was impossible to add a token because the instance with the highest ownership cannot satisfy the request", "added tokens", addedTokens+1, "requested ownership", optimalTokenOwnership)
				// if this happens, it means that we cannot accommodate other tokens, so we panic
				err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID], optimalTokenOwnership)
				panic(err)
			}
			tokensQueue := tokensQueues[highestOwnershipInstance.item.instanceID]
			highestOwnershipToken := tokensQueue.Peek()
			if highestOwnershipToken.ownership <= float64(optimalTokenOwnership) {
				// The token with the highest ownership of the instance with the highest ownership could not
				// accommodate a new token, hence we ignore this instance and pass to the next instance.
				ignoredInstances = append(ignoredInstances, heap.Pop(&instanceQueue).(ownershipInfo[ringInstance]))
				continue
			}
			token := highestOwnershipToken.item
			newToken, err := t.calculateNewToken(token, optimalTokenOwnership)
			if err != nil {
				level.Error(t.logger).Log("msg", "it was impossible to calculate a new token because an error occurred", "err", err)
				// if this happens, it means that we cannot accommodate additional tokens, so we panic
				err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID])
				panic(err)
			}
			tokens = append(tokens, newToken)
			// add the new token to currInstanceTokenQueue
			heap.Push(&currInstanceTokenQueue, newRingTokenOwnershipInfo(newToken, token.prevToken))

			oldTokenOwnership := highestOwnershipToken.ownership
			newTokenOwnership := float64(tokenDistance(newToken, token.token))
			currInstanceOwnership += oldTokenOwnership - newTokenOwnership

			// The token with the highest ownership of the instance with the highest ownership has changed,
			// so we propagate these changes in the corresponding tokens queue.
			highestOwnershipToken.item.prevToken = newToken
			highestOwnershipToken.ownership = newTokenOwnership
			heap.Fix(&tokensQueue, 0)

			// The ownership of the instance with the highest ownership has changed,
			// so we propagate these changes in the instances queue.
			highestOwnershipInstance.ownership = highestOwnershipInstance.ownership - oldTokenOwnership + newTokenOwnership
			heap.Fix(&instanceQueue, 0)

			addedTokens++
		}
		tokensByInstanceID[i] = tokens
		// if this is the last iteration we return, so we avoid additional heap.Push calls
		if i == t.instanceID {
			return tokensByInstanceID
		}

		// If there were some ignored instances, we put them back on the queue.
		for _, ignoredInstance := range ignoredInstances {
			heap.Push(&instanceQueue, ignoredInstance)
		}
		tokensQueues[i] = currInstanceTokenQueue

		// add the current instance with the calculated ownership currInstanceOwnership to instanceQueue
		heap.Push(&instanceQueue, newRingInstanceOwnershipInfo(i, currInstanceOwnership))
	}
	return tokensByInstanceID
}

@ -0,0 +1,54 @@
package ring
import (
"math/rand"
"sort"
"time"
)
// TokenGenerator is the strategy used to produce the tokens an instance
// registers in the ring.
type TokenGenerator interface {
	// GenerateTokens generates at most requestedTokensCount unique tokens, none of which clashes with
	// the given allTakenTokens, representing the set of all tokens currently present in the ring.
	// Generated tokens are sorted.
	GenerateTokens(requestedTokensCount int, allTakenTokens []uint32) Tokens
}
// RandomTokenGenerator is a TokenGenerator that produces uniformly random tokens.
type RandomTokenGenerator struct{}

// NewRandomTokenGenerator returns a ready-to-use RandomTokenGenerator.
// The zero value is equally usable: the type is stateless.
func NewRandomTokenGenerator() *RandomTokenGenerator {
	return &RandomTokenGenerator{}
}
// GenerateTokens generates at most requestedTokensCount unique random tokens, none of which clashes with
// the given allTakenTokens, representing the set of all tokens currently present in the ring.
// Generated tokens are sorted.
func (t *RandomTokenGenerator) GenerateTokens(requestedTokensCount int, allTakenTokens []uint32) Tokens {
	if requestedTokensCount <= 0 {
		return []uint32{}
	}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// seen tracks both the pre-existing ring tokens and the tokens we generate,
	// guaranteeing global uniqueness of the result.
	seen := make(map[uint32]bool, len(allTakenTokens))
	for _, taken := range allTakenTokens {
		seen[taken] = true
	}

	tokens := make([]uint32, 0, requestedTokensCount)
	for len(tokens) < requestedTokensCount {
		candidate := rng.Uint32()
		if seen[candidate] {
			continue
		}
		seen[candidate] = true
		tokens = append(tokens, candidate)
	}

	// Ensure returned tokens are sorted.
	sort.Slice(tokens, func(a, b int) bool {
		return tokens[a] < tokens[b]
	})
	return tokens
}

@ -2,7 +2,7 @@ package ring
import (
"context"
"math/rand"
"math"
"sort"
"time"
@ -12,39 +12,6 @@ import (
"github.com/grafana/dskit/netutil"
)
// GenerateTokens make numTokens unique random tokens, none of which clash
// with takenTokens. Generated tokens are sorted.
func GenerateTokens(numTokens int, takenTokens []uint32) []uint32 {
	if numTokens <= 0 {
		return []uint32{}
	}

	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	// seen tracks both the caller-supplied tokens and the ones generated so far,
	// guaranteeing the result is unique and clash-free.
	seen := make(map[uint32]bool, len(takenTokens))
	for _, tok := range takenTokens {
		seen[tok] = true
	}

	out := make([]uint32, 0, numTokens)
	for len(out) < numTokens {
		candidate := rng.Uint32()
		if seen[candidate] {
			continue
		}
		seen[candidate] = true
		out = append(out, candidate)
	}

	// Ensure returned tokens are sorted.
	sort.Slice(out, func(i, j int) bool {
		return out[i] < out[j]
	})
	return out
}
// GetInstanceAddr returns the address to use to register the instance
// in the ring.
func GetInstanceAddr(configAddr string, netInterfaces []string, logger log.Logger, enableInet6 bool) (string, error) {
@ -168,3 +135,13 @@ func searchToken(tokens []uint32, key uint32) int {
}
return i
}
// tokenDistance returns the distance between the given tokens from and to.
// The distance between a token and itself is the whole ring, i.e., math.MaxUint32 + 1.
func tokenDistance(from, to uint32) int64 {
	if to > from {
		return int64(to - from)
	}
	// Wrapped around the ring: tokens from `from` to the end of the token space
	// (the +1 accounts for token 0) plus those from 0 up to `to`.
	return int64(to) + (math.MaxUint32 + 1) - int64(from)
}

@ -7,7 +7,6 @@ package runutil
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
@ -59,7 +58,7 @@ func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ..
// dir except for the ignoreDirs directories.
// NOTE: DeleteAll is not idempotent.
func DeleteAll(dir string, ignoreDirs ...string) error {
entries, err := ioutil.ReadDir(dir)
entries, err := os.ReadDir(dir)
if os.IsNotExist(err) {
return nil
}

@ -284,7 +284,7 @@ func (l managerServiceListener) Terminated(from State) {
l.m.serviceStateChanged(l.s, from, Terminated)
}
func (l managerServiceListener) Failed(from State, failure error) {
func (l managerServiceListener) Failed(from State, _ error) {
l.m.serviceStateChanged(l.s, from, Failed)
}

@ -793,7 +793,7 @@ github.com/gorilla/websocket
# github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
## explicit; go 1.17
github.com/grafana/cloudflare-go
# github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e
# github.com/grafana/dskit v0.0.0-20230706162620-5081d8ed53e6
## explicit; go 1.18
github.com/grafana/dskit/backoff
github.com/grafana/dskit/concurrency

Loading…
Cancel
Save