Unify ring managers across components (#10931)

**What this PR does / why we need it**:

This PR creates a reusable `RingManager` implementation that is then used by:

* Query Scheduler ring
* Index Gateway ring
* Bloom Gateway ring
* Bloom Compactor ring

**Which issue(s) this PR fixes**:

Every component previously had its own ring manager implementation; these implementations differed only in the log messages containing the component name.
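
All four rings are now wired through a single shared constructor in `pkg/util/ring`. Below is a minimal sketch of the new call shape; the wrapper function is illustrative only, while the real call sites live in `pkg/loki/modules.go` and appear in the diff:

```go
package ringexample

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	lokiring "github.com/grafana/loki/pkg/util/ring"
)

// newIndexGatewayRingManager demonstrates the unified constructor:
// NewRingManager(name, mode, cfg, replicationFactor, tokens, logger, registerer).
func newIndexGatewayRingManager(cfg lokiring.RingConfigWithRF, logger log.Logger, reg prometheus.Registerer) (*lokiring.RingManager, error) {
	return lokiring.NewRingManager(
		"index-gateway",       // ring key under which instances register in the KV store
		lokiring.ServerMode,   // ClientMode for components that only read the ring
		cfg.RingConfig,        // the shared ring configuration
		cfg.ReplicationFactor, // e.g. 1 for the bloom-compactor, 2 for the scheduler
		128,                   // tokens per instance; the scheduler uses 1, for leader election
		logger,
		reg,
	)
}
```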

---------

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
23 changed files (lines changed):

1. docs/sources/configure/_index.md (2)
2. pkg/bloomcompactor/config.go (16)
3. pkg/bloomcompactor/ringmanager.go (208)
4. pkg/bloomgateway/bloomgateway_test.go (10)
5. pkg/bloomgateway/config.go (18)
6. pkg/compactor/compactor.go (42)
7. pkg/distributor/distributor.go (7)
8. pkg/loki/common/common.go (4)
9. pkg/loki/config_wrapper.go (34)
10. pkg/loki/config_wrapper_test.go (6)
11. pkg/loki/loki.go (9)
12. pkg/loki/modules.go (39)
13. pkg/scheduler/lifecycle.go (29)
14. pkg/scheduler/ringmanager.go (252)
15. pkg/scheduler/scheduler.go (26)
16. pkg/storage/stores/shipper/indexshipper/indexgateway/config.go (7)
17. pkg/storage/stores/shipper/indexshipper/indexgateway/lifecycle.go (29)
18. pkg/storage/stores/shipper/indexshipper/indexgateway/ringmanager.go (235)
19. pkg/storage/stores/shipper/indexshipper/indexgateway/shufflesharding.go (6)
20. pkg/util/ring/ring.go (6)
21. pkg/util/ring/ring_config.go (19)
22. pkg/util/ring/ring_test.go (2)
23. pkg/util/ring/ringmanager.go (77)

@@ -1791,7 +1791,7 @@ ring:
# CLI flag: -bloom-gateway.ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]
# Factor for data replication on the bloom gateways.
# Factor for data replication.
# CLI flag: -bloom-gateway.replication-factor
[replication_factor: <int> | default = 3]

@@ -4,7 +4,7 @@ import (
"flag"
"time"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/ring"
)
// Config configures the bloom-compactor component.
@@ -12,7 +12,7 @@ type Config struct {
// Ring configures the ring store used to save and retrieve the different Bloom-Compactor instances.
// In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration
// section and the ingester configuration by default).
RingCfg RingCfg `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters
Enabled bool `yaml:"enabled"`
WorkingDirectory string `yaml:"working_directory"`
@@ -21,16 +21,6 @@ type Config struct {
// RegisterFlags registers flags for the Bloom-Compactor configuration.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RingCfg.RegisterFlags("bloom-compactor.", "collectors/", f)
cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f)
f.BoolVar(&cfg.Enabled, "bloom-compactor.enabled", false, "Flag to enable or disable the usage of the bloom-compactor component.")
}
// RingCfg is a wrapper for our internally used ring configuration plus the replication factor.
type RingCfg struct {
// RingConfig configures the Bloom-Compactor ring.
util.RingConfig `yaml:",inline"`
}
func (cfg *RingCfg) RegisterFlags(prefix, storePrefix string, f *flag.FlagSet) {
cfg.RingConfig.RegisterFlagsWithPrefix(prefix, storePrefix, f)
}

@@ -1,208 +0,0 @@
package bloomcompactor
import (
"context"
"net/http"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
const (
// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
// in the ring will be automatically removed.
ringAutoForgetUnhealthyPeriods = 10
// ringNameForServer is the name of the ring used by the bloom-compactor server.
ringNameForServer = "bloom-compactor"
// start with a single instance
ringNumTokens = 1
ringCheckPeriod = 3 * time.Second
// ringKey is the key under which we register different instances of bloom-compactor in the KVStore.
ringKey = "bloom-compactor"
replicationFactor = 1
)
type RingManager struct {
services.Service
cfg Config
logger log.Logger
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
RingLifecycler *ring.BasicLifecycler
Ring *ring.Ring
}
func NewRingManager(cfg Config, logger log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
rm := &RingManager{
cfg: cfg, logger: logger,
}
// instantiate kv store.
ringStore, err := kv.NewClient(
rm.cfg.RingCfg.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "bloom-compactor-ring-manager"),
rm.logger,
)
if err != nil {
return nil, errors.Wrap(err, "bloom-compactor ring manager failed to create KV store client")
}
lifecyclerCfg, err := rm.cfg.RingCfg.ToLifecyclerConfig(ringNumTokens, rm.logger)
if err != nil {
return nil, errors.Wrap(err, "invalid ring lifecycler config")
}
// Define lifecycler delegates in reverse order (last to be called defined first because they're
// chained via "next delegate").
delegate := ring.BasicLifecyclerDelegate(rm)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, rm.logger)
delegate = ring.NewTokensPersistencyDelegate(rm.cfg.RingCfg.TokensFilePath, ring.JOINING, delegate, rm.logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.RingCfg.HeartbeatTimeout, delegate, rm.logger)
rm.RingLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, rm.logger, registerer)
if err != nil {
return nil, errors.Wrap(err, "failed to create bloom-compactor ring manager lifecycler")
}
// instantiate ring.
ringCfg := rm.cfg.RingCfg.ToRingConfig(replicationFactor)
rm.Ring, err = ring.NewWithStoreClientAndStrategy(
ringCfg,
ringNameForServer,
ringKey,
ringStore,
ring.NewIgnoreUnhealthyInstancesReplicationStrategy(),
prometheus.WrapRegistererWithPrefix("loki_", registerer),
rm.logger,
)
if err != nil {
return nil, errors.Wrap(err, "bloom-compactor ring manager failed to create ring client")
}
svcs := []services.Service{rm.RingLifecycler, rm.Ring}
rm.subservices, err = services.NewManager(svcs...)
if err != nil {
return nil, errors.Wrap(err, "new bloom services manager in server mode")
}
rm.subservicesWatcher = services.NewFailureWatcher()
rm.subservicesWatcher.WatchManager(rm.subservices)
rm.Service = services.NewBasicService(rm.starting, rm.running, rm.stopping)
return rm, nil
}
// starting implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) starting(ctx context.Context) (err error) {
// In case this function will return error we want to unregister the instance
// from the ring. We do it ensuring dependencies are gracefully stopped if they
// were already started.
defer func() {
if err == nil || rm.subservices == nil {
return
}
if stopErr := services.StopManagerAndAwaitStopped(context.Background(), rm.subservices); stopErr != nil {
level.Error(rm.logger).Log("msg", "failed to gracefully stop bloom-compactor ring manager dependencies", "err", stopErr)
}
}()
if err := services.StartManagerAndAwaitHealthy(ctx, rm.subservices); err != nil {
return errors.Wrap(err, "unable to start bloom-compactor ring manager subservices")
}
// The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that
// someone wants to do can be done before becoming ACTIVE. For the bloom-compactor we don't currently
// have any additional work so we can become ACTIVE right away.
// Wait until the ring client detected this instance in the JOINING
// state to make sure that when we'll run the initial sync we already
// know the tokens assigned to this instance.
level.Info(rm.logger).Log("msg", "waiting until bloom-compactor is JOINING in the ring")
if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return err
}
level.Info(rm.logger).Log("msg", "bloom-compactor is JOINING in the ring")
if err = rm.RingLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE)
}
// Wait until the ring client detected this instance in the ACTIVE state to
// make sure that when we'll run the loop it won't be detected as a ring
// topology change.
level.Info(rm.logger).Log("msg", "waiting until bloom-compactor is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
level.Info(rm.logger).Log("msg", "bloom-compactor is ACTIVE in the ring")
return nil
}
// running implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) running(ctx context.Context) error {
t := time.NewTicker(ringCheckPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
return nil
case err := <-rm.subservicesWatcher.Chan():
return errors.Wrap(err, "running bloom-compactor ring manager subservice failed")
case <-t.C:
continue
}
}
}
// stopping implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) stopping(_ error) error {
level.Debug(rm.logger).Log("msg", "stopping bloom-compactor ring manager")
return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices)
}
func (rm *RingManager) ServeHTTP(w http.ResponseWriter, req *http.Request) {
rm.Ring.ServeHTTP(w, req)
}
func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the bloom-compactor instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
var tokens []uint32
if instanceExists {
tokens = instanceDesc.GetTokens()
}
takenTokens := ringDesc.GetTokens()
gen := ring.NewRandomTokenGenerator()
newTokens := gen.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
return ring.JOINING, tokens
}
func (rm *RingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {
}
func (rm *RingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {
}
func (rm *RingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
}

@@ -19,7 +19,7 @@ import (
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
lokiring "github.com/grafana/loki/pkg/util/ring"
)
func parseDayTime(s string) config.DayTime {
@@ -71,8 +71,8 @@ func TestBloomGateway_StartStopService(t *testing.T) {
cfg := Config{
Enabled: true,
Ring: RingCfg{
RingConfig: util.RingConfig{
Ring: lokiring.RingConfigWithRF{
RingConfig: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
@@ -125,8 +125,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
cfg := Config{
Enabled: true,
Ring: RingCfg{
RingConfig: util.RingConfig{
Ring: lokiring.RingConfigWithRF{
RingConfig: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},

@@ -3,29 +3,15 @@ package bloomgateway
import (
"flag"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/ring"
)
// RingCfg is a wrapper for our internally used ring configuration plus the replication factor.
type RingCfg struct {
// RingConfig configures the Bloom Gateway ring.
util.RingConfig `yaml:",inline"`
// ReplicationFactor defines how many replicas of the Bloom Gateway store a single data shard.
ReplicationFactor int `yaml:"replication_factor"`
}
// RegisterFlagsWithPrefix registers all Bloom Gateway CLI flags.
func (cfg *RingCfg) RegisterFlagsWithPrefix(prefix, storePrefix string, f *flag.FlagSet) {
cfg.RingConfig.RegisterFlagsWithPrefix(prefix, storePrefix, f)
f.IntVar(&cfg.ReplicationFactor, prefix+"replication-factor", 3, "Factor for data replication on the bloom gateways.")
}
// Config configures the Bloom Gateway component.
type Config struct {
// Ring configures the ring store used to save and retrieve the different Bloom Gateway instances.
// In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration
// section and the ingester configuration by default).
Ring RingCfg `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom gateway servers and clients. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
Ring ring.RingConfigWithRF `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom gateway servers and clients. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled is the global switch that configures whether Bloom Gateways should be used to filter chunks.
Enabled bool `yaml:"enabled"`
// Client configures the Bloom Gateway client

@@ -28,9 +28,9 @@ import (
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/filter"
util_log "github.com/grafana/loki/pkg/util/log"
lokiring "github.com/grafana/loki/pkg/util/ring"
"github.com/grafana/loki/pkg/validation"
)
@@ -71,26 +71,26 @@ var (
)
type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
ApplyRetentionInterval time.Duration `yaml:"apply_retention_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"`
DeleteRequestStore string `yaml:"delete_request_store"`
DefaultDeleteRequestStore string `yaml:"-" doc:"hidden"`
DeleteBatchSize int `yaml:"delete_batch_size"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
DeleteMaxInterval time.Duration `yaml:"delete_max_interval"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
UploadParallelism int `yaml:"upload_parallelism"`
CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty" doc:"description=The hash ring configuration used by compactors to elect a single instance for running compactions. The CLI flags prefix for this block config is: compactor.ring"`
RunOnce bool `yaml:"_" doc:"hidden"`
TablesToCompact int `yaml:"tables_to_compact"`
SkipLatestNTables int `yaml:"skip_latest_n_tables"`
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
ApplyRetentionInterval time.Duration `yaml:"apply_retention_interval"`
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"`
DeleteRequestStore string `yaml:"delete_request_store"`
DefaultDeleteRequestStore string `yaml:"-" doc:"hidden"`
DeleteBatchSize int `yaml:"delete_batch_size"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
DeleteMaxInterval time.Duration `yaml:"delete_max_interval"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
UploadParallelism int `yaml:"upload_parallelism"`
CompactorRing lokiring.RingConfig `yaml:"compactor_ring,omitempty" doc:"description=The hash ring configuration used by compactors to elect a single instance for running compactions. The CLI flags prefix for this block config is: compactor.ring"`
RunOnce bool `yaml:"_" doc:"hidden"`
TablesToCompact int `yaml:"tables_to_compact"`
SkipLatestNTables int `yaml:"skip_latest_n_tables"`
// Deprecated
DeletionMode string `yaml:"deletion_mode" doc:"deprecated|description=Use deletion_mode per tenant configuration instead."`

@@ -44,6 +44,7 @@ import (
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
lokiring "github.com/grafana/loki/pkg/util/ring"
"github.com/grafana/loki/pkg/validation"
)
@@ -376,7 +377,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
keys = append(keys, derivedKeys...)
streams = append(streams, derivedStreams...)
} else {
keys = append(keys, util.TokenFor(tenantID, stream.Labels))
keys = append(keys, lokiring.TokenFor(tenantID, stream.Labels))
streams = append(streams, streamTracker{stream: stream})
}
}
@@ -474,7 +475,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID
shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg)
if shardCount <= 1 {
return []uint32{util.TokenFor(tenantID, stream.Labels)}, []streamTracker{{stream: stream}}
return []uint32{lokiring.TokenFor(tenantID, stream.Labels)}, []streamTracker{{stream: stream}}
}
d.streamShardCount.Inc()
@@ -518,7 +519,7 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
shardNum := (startShard + i) % totalShards
shard := d.createShard(streamLabels, streamPattern, shardNum, entriesPerShard)
derivedKeys = append(derivedKeys, util.TokenFor(tenantID, shard.Labels))
derivedKeys = append(derivedKeys, lokiring.TokenFor(tenantID, shard.Labels))
derivedStreams = append(derivedStreams, streamTracker{stream: shard})
if shardStreamsCfg.LoggingEnabled {

@@ -15,8 +15,8 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
"github.com/grafana/loki/pkg/storage/chunk/client/ibmcloud"
"github.com/grafana/loki/pkg/storage/chunk/client/openstack"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/ring"
)
// Config holds common config that can be shared between multiple other config sections.
@@ -27,7 +27,7 @@ type Config struct {
Storage Storage `yaml:"storage"`
PersistTokens bool `yaml:"persist_tokens"`
ReplicationFactor int `yaml:"replication_factor"`
Ring util.RingConfig `yaml:"ring"`
Ring ring.RingConfig `yaml:"ring"`
// InstanceInterfaceNames represents a common list of net interfaces used to look for host addresses.
//

@@ -14,8 +14,8 @@ import (
"github.com/grafana/loki/pkg/loki/common"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/cfg"
lokiring "github.com/grafana/loki/pkg/util/ring"
"github.com/grafana/loki/pkg/ruler/rulestore/local"
loki_net "github.com/grafana/loki/pkg/util/net"
@@ -211,7 +211,7 @@ func applyDynamicRingConfigs(r, defaults *ConfigWrapper) {
// neither common ring nor memberlist set, use ingester ring configuration for all rings
// that have not been configured. Don't merge any ingester ring configurations for rings
// that deviate from the default in any way.
ingesterRingCfg := util.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig)
ingesterRingCfg := lokiring.CortexLifecyclerConfigToRingConfig(r.Ingester.LifecyclerConfig)
applyConfigToRings(r, defaults, ingesterRingCfg, false)
}
}
@@ -223,7 +223,7 @@
// ring defined. When `mergeWithExisting` is false, we will not apply any of the ring config to a ring that has
// any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified
// derivations, with the derivations taking precedence.
func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) {
func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, mergeWithExisting bool) {
// Ingester - mergeWithExisting is false when applying the ingester config, and we only want to
// change ingester ring values when applying the common config, so there's no need for the DeepEqual
// check here.
@@ -304,16 +304,16 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit
}
// BloomCompactor
if mergeWithExisting || reflect.DeepEqual(r.BloomCompactor.RingCfg, defaults.BloomCompactor.RingCfg) {
r.BloomCompactor.RingCfg.HeartbeatTimeout = rc.HeartbeatTimeout
r.BloomCompactor.RingCfg.HeartbeatPeriod = rc.HeartbeatPeriod
r.BloomCompactor.RingCfg.InstancePort = rc.InstancePort
r.BloomCompactor.RingCfg.InstanceAddr = rc.InstanceAddr
r.BloomCompactor.RingCfg.InstanceID = rc.InstanceID
r.BloomCompactor.RingCfg.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.BloomCompactor.RingCfg.InstanceZone = rc.InstanceZone
r.BloomCompactor.RingCfg.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.BloomCompactor.RingCfg.KVStore = rc.KVStore
if mergeWithExisting || reflect.DeepEqual(r.BloomCompactor.Ring, defaults.BloomCompactor.Ring) {
r.BloomCompactor.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.BloomCompactor.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.BloomCompactor.Ring.InstancePort = rc.InstancePort
r.BloomCompactor.Ring.InstanceAddr = rc.InstanceAddr
r.BloomCompactor.Ring.InstanceID = rc.InstanceID
r.BloomCompactor.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.BloomCompactor.Ring.InstanceZone = rc.InstanceZone
r.BloomCompactor.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.BloomCompactor.Ring.KVStore = rc.KVStore
}
// BloomGateway
@@ -364,7 +364,7 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
if err != nil {
return err
}
cfg.BloomCompactor.RingCfg.TokensFilePath = f
cfg.BloomCompactor.Ring.TokensFilePath = f
// Bloom-Gateway
f, err = tokensFile(cfg, "bloomgateway.tokens")
@@ -454,8 +454,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface)
}
if reflect.DeepEqual(cfg.BloomCompactor.RingCfg.InstanceInterfaceNames, defaults.BloomCompactor.RingCfg.InstanceInterfaceNames) {
cfg.BloomCompactor.RingCfg.InstanceInterfaceNames = append(cfg.BloomCompactor.RingCfg.InstanceInterfaceNames, loopbackIface)
if reflect.DeepEqual(cfg.BloomCompactor.Ring.InstanceInterfaceNames, defaults.BloomCompactor.Ring.InstanceInterfaceNames) {
cfg.BloomCompactor.Ring.InstanceInterfaceNames = append(cfg.BloomCompactor.Ring.InstanceInterfaceNames, loopbackIface)
}
if reflect.DeepEqual(cfg.BloomGateway.Ring.InstanceInterfaceNames, defaults.BloomGateway.Ring.InstanceInterfaceNames) {
@@ -474,7 +474,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
r.IndexGateway.Ring.KVStore.Store = memberlistStr
r.BloomCompactor.RingCfg.KVStore.Store = memberlistStr
r.BloomCompactor.Ring.KVStore.Store = memberlistStr
r.BloomGateway.Ring.KVStore.Store = memberlistStr
}

@@ -25,10 +25,10 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/chunk/client/openstack"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/cfg"
util_log "github.com/grafana/loki/pkg/util/log"
loki_net "github.com/grafana/loki/pkg/util/net"
lokiring "github.com/grafana/loki/pkg/util/ring"
)
// Can't use a totally empty yaml file or it causes weird behavior in the unmarshalling.
@@ -1089,8 +1089,8 @@ func Test_applyIngesterRingConfig(t *testing.T) {
reflect.TypeOf(distributor.RingConfig{}).NumField(),
fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
assert.Equal(t, 13,
reflect.TypeOf(util.RingConfig{}).NumField(),
fmt.Sprintf(msgf, reflect.TypeOf(util.RingConfig{}).String()))
reflect.TypeOf(lokiring.RingConfig{}).NumField(),
fmt.Sprintf(msgf, reflect.TypeOf(lokiring.RingConfig{}).String()))
})
t.Run("compactor and scheduler tokens file should not be configured if persist_tokens is false", func(t *testing.T) {

@@ -59,6 +59,7 @@ import (
"github.com/grafana/loki/pkg/util/fakeauth"
"github.com/grafana/loki/pkg/util/limiter"
util_log "github.com/grafana/loki/pkg/util/log"
lokiring "github.com/grafana/loki/pkg/util/ring"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/validation"
)
@@ -313,11 +314,11 @@ type Loki struct {
compactor *compactor.Compactor
QueryFrontEndTripperware queryrangebase.Tripperware
queryScheduler *scheduler.Scheduler
querySchedulerRingManager *scheduler.RingManager
querySchedulerRingManager *lokiring.RingManager
usageReport *analytics.Reporter
indexGatewayRingManager *indexgateway.RingManager
bloomCompactorRingManager *bloomcompactor.RingManager
bloomGatewayRingManager *bloomgateway.RingManager
indexGatewayRingManager *lokiring.RingManager
bloomCompactorRingManager *lokiring.RingManager
bloomGatewayRingManager *lokiring.RingManager
clientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics

@@ -73,6 +73,7 @@ import (
"github.com/grafana/loki/pkg/util/limiter"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/querylimits"
lokiring "github.com/grafana/loki/pkg/util/ring"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/validation"
)
@@ -121,6 +122,13 @@ const (
Analytics string = "analytics"
)
const (
schedulerRingKey = "scheduler"
indexGatewayRingKey = "index-gateway"
bloomGatewayRingKey = "bloom-gateway"
bloomCompactorRingKey = "bloom-compactor"
)
func (t *Loki) initServer() (services.Service, error) {
prometheus.MustRegister(version.NewCollector("loki"))
// unregister default go collector
@@ -255,7 +263,7 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) {
// By doing the initialization here instead of per-module init function, we avoid the problem
// of projects based on Loki forgetting the wiring if they override module's init method (they also don't have access to private symbols).
t.Cfg.CompactorConfig.CompactorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.BloomCompactor.RingCfg.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.BloomCompactor.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.IndexGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.BloomGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
@@ -369,7 +377,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
SchedulerRing: scheduler.SafeReadRing(t.querySchedulerRingManager),
SchedulerRing: scheduler.SafeReadRing(t.Cfg.QueryScheduler, t.querySchedulerRingManager),
}
toMerge := []middleware.Interface{
@@ -854,7 +862,7 @@ }
}
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(
combinedCfg,
scheduler.SafeReadRing(t.querySchedulerRingManager),
scheduler.SafeReadRing(t.Cfg.QueryScheduler, t.querySchedulerRingManager),
disabledShuffleShardingLimits{},
t.Cfg.Server.GRPCListenPort,
util_log.Logger,
@@ -1246,12 +1254,12 @@ func (t *Loki) initBloomGatewayRing() (services.Service, error) {
t.Cfg.BloomGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
// TODO(chaudum): Do we want to integrate the bloom gateway component into the backend target?
mode := bloomgateway.ClientMode
mode := lokiring.ClientMode
legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read)
if t.Cfg.isModuleEnabled(BloomGateway) || t.Cfg.isModuleEnabled(Backend) || legacyReadMode {
mode = bloomgateway.ServerMode
mode = lokiring.ServerMode
}
manager, err := bloomgateway.NewRingManager(mode, t.Cfg.BloomGateway, util_log.Logger, prometheus.DefaultRegisterer)
manager, err := lokiring.NewRingManager(bloomGatewayRingKey, mode, t.Cfg.BloomGateway.Ring.RingConfig, t.Cfg.BloomGateway.Ring.ReplicationFactor, 128, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "error initializing bloom gateway ring manager")
@@ -1336,11 +1344,11 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
managerMode := indexgateway.ClientMode
managerMode := lokiring.ClientMode
if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
managerMode = indexgateway.ServerMode
managerMode = lokiring.ServerMode
}
rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer)
rm, err := lokiring.NewRingManager(indexGatewayRingKey, managerMode, t.Cfg.IndexGateway.Ring.RingConfig, t.Cfg.IndexGateway.Ring.ReplicationFactor, 128, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "new index gateway ring manager")
@@ -1384,12 +1392,13 @@ func (t *Loki) initBloomCompactor() (services.Service, error) {
}
func (t *Loki) initBloomCompactorRing() (services.Service, error) {
t.Cfg.BloomCompactor.RingCfg.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.BloomCompactor.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
// is LegacyMode needed?
//legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read)
rm, err := bloomcompactor.NewRingManager(t.Cfg.BloomCompactor, util_log.Logger, prometheus.DefaultRegisterer)
rm, err := lokiring.NewRingManager(bloomCompactorRingKey, lokiring.ServerMode, t.Cfg.BloomCompactor.Ring, 1, 1, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "error initializing bloom-compactor ring manager")
}
@@ -1426,11 +1435,13 @@ func (t *Loki) initQuerySchedulerRing() (_ services.Service, err error) {
// Set some config sections from other config sections in the config struct
t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort
managerMode := scheduler.RingManagerModeReader
managerMode := lokiring.ClientMode
if t.Cfg.isModuleEnabled(QueryScheduler) || t.Cfg.isModuleEnabled(Backend) || t.Cfg.isModuleEnabled(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)) {
managerMode = scheduler.RingManagerModeMember
managerMode = lokiring.ServerMode
}
rm, err := scheduler.NewRingManager(managerMode, t.Cfg.QueryScheduler, util_log.Logger, prometheus.DefaultRegisterer)
rf := 2 // ringReplicationFactor should be 2 because we want 2 schedulers.
tokens := 1 // we only need to insert 1 token to be used for leader election purposes.
rm, err := lokiring.NewRingManager(schedulerRingKey, managerMode, t.Cfg.QueryScheduler.SchedulerRing, rf, tokens, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "new scheduler ring manager")

@@ -1,29 +0,0 @@
package scheduler
import (
"github.com/grafana/dskit/ring"
)
func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the scheduler instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
var tokens []uint32
if instanceExists {
tokens = instanceDesc.GetTokens()
}
takenTokens := ringDesc.GetTokens()
gen := ring.NewRandomTokenGenerator()
newTokens := gen.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
return ring.JOINING, tokens
}
func (rm *RingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}
func (rm *RingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {}
func (rm *RingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
}

@@ -1,252 +0,0 @@
package scheduler
import (
"context"
"fmt"
"net/http"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
const (
// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
// in the ring will be automatically removed.
ringAutoForgetUnhealthyPeriods = 10
// ringKey is the key under which we store the scheduler ring in the KVStore.
ringKey = "scheduler"
// ringNameForServer is the name of the ring used by the scheduler server.
ringNameForServer = "scheduler"
// ringReplicationFactor should be 2 because we want 2 schedulers.
ringReplicationFactor = 2
// ringNumTokens sets our single token in the ring,
// we only need to insert 1 token to be used for leader election purposes.
ringNumTokens = 1
// ringCheckPeriod is how often we check the ring to see if this instance is still in
// the replicaset of instances to act as schedulers.
ringCheckPeriod = 3 * time.Second
)
// RingManagerMode defines the different modes for the RingManager to execute.
//
// The RingManager and its modes are only relevant if the Scheduler discovery is done using ring.
type RingManagerMode int
const (
// RingManagerModeReader is the RingManager mode executed by Loki components that want to discover Scheduler instances.
// The RingManager in reader mode will have its own ring key-value store client, but it won't try to register itself in the ring.
RingManagerModeReader RingManagerMode = iota
// RingManagerModeMember is the RingManager mode executed by the Schedulers to register themselves in the ring.
RingManagerModeMember
)
// RingManager is a component instantiated before all the others and is responsible for the ring setup.
//
// All Loki components that are involved with the Schedulers (including the Schedulers themselves) will
// require a RingManager. However, the components that are clients of the Schedulers will run it in reader
// mode while the Schedulers themselves will run the manager in member mode.
type RingManager struct {
services.Service
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
RingLifecycler *ring.BasicLifecycler
Ring *ring.Ring
managerMode RingManagerMode
cfg Config
log log.Logger
}
// NewRingManager is the recommended way of instantiating a RingManager.
//
// The other functions will assume the RingManager was instantiated through this function.
func NewRingManager(managerMode RingManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
rm := &RingManager{
cfg: cfg, log: log, managerMode: managerMode,
}
if !cfg.UseSchedulerRing {
return nil, fmt.Errorf("ring manager shouldn't be invoked when ring is not used for discovering schedulers")
}
// instantiate kv store for both modes.
ringStore, err := kv.NewClient(
rm.cfg.SchedulerRing.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "scheduler"),
rm.log,
)
if err != nil {
return nil, errors.Wrap(err, "scheduler ring manager create KV store client")
}
// instantiate ring for both modes.
ringCfg := rm.cfg.SchedulerRing.ToRingConfig(ringReplicationFactor)
rm.Ring, err = ring.NewWithStoreClientAndStrategy(
ringCfg,
ringNameForServer,
ringKey,
ringStore,
ring.NewIgnoreUnhealthyInstancesReplicationStrategy(),
prometheus.WrapRegistererWithPrefix("cortex_", registerer),
rm.log,
)
if err != nil {
return nil, errors.Wrap(err, "failed to create ring client for scheduler ring manager")
}
if managerMode == RingManagerModeMember {
if err := rm.startMemberMode(ringStore, registerer); err != nil {
return nil, err
}
return rm, nil
}
if err := rm.startReaderMode(); err != nil {
return nil, err
}
return rm, nil
}
func (rm *RingManager) startMemberMode(ringStore kv.Client, registerer prometheus.Registerer) error {
lifecyclerCfg, err := rm.cfg.SchedulerRing.ToLifecyclerConfig(ringNumTokens, rm.log)
if err != nil {
return errors.Wrap(err, "invalid ring lifecycler config")
}
delegate := ring.BasicLifecyclerDelegate(rm)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, rm.log)
delegate = ring.NewTokensPersistencyDelegate(rm.cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, rm.log)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.SchedulerRing.HeartbeatTimeout, delegate, rm.log)
rm.RingLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, rm.log, registerer)
if err != nil {
return errors.Wrap(err, "failed to create ring lifecycler for scheduler ring manager")
}
svcs := []services.Service{rm.RingLifecycler, rm.Ring}
rm.subservices, err = services.NewManager(svcs...)
if err != nil {
return errors.Wrap(err, "failed to create services manager for scheduler ring manager in member mode")
}
rm.subservicesWatcher = services.NewFailureWatcher()
rm.subservicesWatcher.WatchManager(rm.subservices)
rm.Service = services.NewBasicService(rm.starting, rm.running, rm.stopping)
return nil
}
func (rm *RingManager) startReaderMode() error {
var err error
svcs := []services.Service{rm.Ring}
rm.subservices, err = services.NewManager(svcs...)
if err != nil {
return errors.Wrap(err, "failed to create services manager for scheduler ring manager in reader mode")
}
rm.subservicesWatcher = services.NewFailureWatcher()
rm.subservicesWatcher.WatchManager(rm.subservices)
rm.Service = services.NewIdleService(func(ctx context.Context) error {
return services.StartManagerAndAwaitHealthy(ctx, rm.subservices)
}, func(failureCase error) error {
return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices)
})
return nil
}
// starting implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) starting(ctx context.Context) (err error) {
// In case this function will return error we want to unregister the instance
// from the ring. We do it ensuring dependencies are gracefully stopped if they
// were already started.
defer func() {
if err == nil || rm.subservices == nil {
return
}
if stopErr := services.StopManagerAndAwaitStopped(context.Background(), rm.subservices); stopErr != nil {
level.Error(rm.log).Log("msg", "failed to gracefully stop scheduler ring manager dependencies", "err", stopErr)
}
}()
if err := services.StartManagerAndAwaitHealthy(ctx, rm.subservices); err != nil {
return errors.Wrap(err, "unable to start scheduler ring manager subservices")
}
// The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that
// someone wants to do can be done before becoming ACTIVE. For the schedulers we don't currently
// have any additional work so we can become ACTIVE right away.
// Wait until the ring client detected this instance in the JOINING
// state to make sure that when we'll run the initial sync we already
// know the tokens assigned to this instance.
level.Info(rm.log).Log("msg", "waiting until scheduler is JOINING in the ring")
if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return err
}
level.Info(rm.log).Log("msg", "scheduler is JOINING in the ring")
if err = rm.RingLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE)
}
// Wait until the ring client detected this instance in the ACTIVE state to
// make sure that when we'll run the loop it won't be detected as a ring
// topology change.
level.Info(rm.log).Log("msg", "waiting until scheduler is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
level.Info(rm.log).Log("msg", "scheduler is ACTIVE in the ring")
return nil
}
// running implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) running(ctx context.Context) error {
t := time.NewTicker(ringCheckPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
return nil
case err := <-rm.subservicesWatcher.Chan():
return errors.Wrap(err, "running scheduler ring manager subservice failed")
case <-t.C:
continue
}
}
}
// stopping implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) stopping(_ error) error {
level.Debug(rm.log).Log("msg", "stopping scheduler ring manager")
return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices)
}
// ServeHTTP serves the HTTP route /scheduler/ring.
func (rm *RingManager) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if rm.cfg.UseSchedulerRing {
rm.Ring.ServeHTTP(w, req)
} else {
_, _ = w.Write([]byte("QueryScheduler running with '-query-scheduler.use-scheduler-ring' set to false."))
}
}

@@ -18,6 +18,7 @@ import (
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
@@ -27,14 +28,13 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc"
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/scheduler/queue"
"github.com/grafana/loki/pkg/scheduler/schedulerpb"
"github.com/grafana/loki/pkg/util"
lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
lokihttpreq "github.com/grafana/loki/pkg/util/httpreq"
lokiring "github.com/grafana/loki/pkg/util/ring"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -73,7 +73,7 @@ type Scheduler struct {
inflightRequests prometheus.Summary
// Ring used for finding schedulers
ringManager *RingManager
ringManager *lokiring.RingManager
// Controls for this being a chosen scheduler
shouldRun atomic.Bool
@@ -100,8 +100,8 @@ type Config struct {
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=This configures the gRPC client used to report errors back to the query-frontend."`
// Schedulers ring
UseSchedulerRing bool `yaml:"use_scheduler_ring"`
SchedulerRing util.RingConfig `yaml:"scheduler_ring,omitempty" doc:"description=The hash ring configuration. This option is required only if use_scheduler_ring is true."`
UseSchedulerRing bool `yaml:"use_scheduler_ring"`
SchedulerRing lokiring.RingConfig `yaml:"scheduler_ring,omitempty" doc:"description=The hash ring configuration. This option is required only if use_scheduler_ring is true."`
}
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -114,12 +114,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}
// NewScheduler creates a new Scheduler.
func NewScheduler(cfg Config, limits Limits, log log.Logger, ringManager *RingManager, registerer prometheus.Registerer) (*Scheduler, error) {
func NewScheduler(cfg Config, limits Limits, log log.Logger, ringManager *lokiring.RingManager, registerer prometheus.Registerer) (*Scheduler, error) {
if cfg.UseSchedulerRing {
if ringManager == nil {
return nil, errors.New("ring manager can't be empty when use_scheduler_ring is true")
} else if ringManager.managerMode != RingManagerModeMember {
return nil, errors.New("ring manager must be initialized in RingManagerModeMember for query schedulers")
} else if ringManager.Mode != lokiring.ServerMode {
return nil, errors.New("ring manager must be initialized in ServerMode for query schedulers")
}
}
@@ -579,7 +579,7 @@ func (s *Scheduler) running(ctx context.Context) error {
inflightRequestsTicker := time.NewTicker(250 * time.Millisecond)
defer inflightRequestsTicker.Stop()
ringCheckTicker := time.NewTicker(ringCheckPeriod)
ringCheckTicker := time.NewTicker(lokiring.RingCheckPeriod)
defer ringCheckTicker.Stop()
for {
@@ -592,7 +592,7 @@
if !s.cfg.UseSchedulerRing {
continue
}
isInSet, err := util.IsInReplicationSet(s.ringManager.Ring, util.RingKeyOfLeader, s.ringManager.RingLifecycler.GetInstanceAddr())
isInSet, err := lokiring.IsInReplicationSet(s.ringManager.Ring, util.RingKeyOfLeader, s.ringManager.RingLifecycler.GetInstanceAddr())
if err != nil {
level.Error(s.log).Log("msg", "failed to query the ring to see if scheduler instance is in ReplicatonSet, will try again", "err", err)
continue
@@ -662,10 +662,10 @@ func (s *Scheduler) getConnectedFrontendClientsMetric() float64 {
// SafeReadRing does a nil check on the Scheduler before attempting to return its ring
// this is necessary as many callers of this function will only have a valid Scheduler
// reference if the QueryScheduler target has been specified, which is not guaranteed
func SafeReadRing(s *RingManager) ring.ReadRing {
if s == nil || s.Ring == nil || !s.cfg.UseSchedulerRing {
func SafeReadRing(cfg Config, rm *lokiring.RingManager) ring.ReadRing {
if rm == nil || rm.Ring == nil || !cfg.UseSchedulerRing {
return nil
}
return s.Ring
return rm.Ring
}

@@ -4,7 +4,7 @@ import (
"flag"
"fmt"
loki_util "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/ring"
)
// Mode represents in which mode an Index Gateway instance is running.
@@ -47,10 +47,11 @@ const (
RingMode Mode = "ring"
)
// RingCfg is a wrapper for our Index Gateway ring configuration plus the replication factor.
// RingCfg is identical to ring.RingConfigWithRF with the difference that the
// ReplicationFactor field is deprecated.
type RingCfg struct {
// InternalRingCfg configures the Index Gateway ring.
loki_util.RingConfig `yaml:",inline"`
ring.RingConfig `yaml:",inline"`
// ReplicationFactor defines how many Index Gateway instances are assigned to each tenant.
//

@@ -1,29 +0,0 @@
package indexgateway
import (
"github.com/grafana/dskit/ring"
)
func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the index gateway instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
var tokens []uint32
if instanceExists {
tokens = instanceDesc.GetTokens()
}
takenTokens := ringDesc.GetTokens()
gen := ring.NewRandomTokenGenerator()
newTokens := gen.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
return ring.JOINING, tokens
}
func (rm *RingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}
func (rm *RingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {}
func (rm *RingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
}

@@ -1,235 +0,0 @@
package indexgateway
import (
"context"
"fmt"
"net/http"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
)
const (
ringAutoForgetUnhealthyPeriods = 10
ringNameForServer = "index-gateway"
ringNumTokens = 128
ringCheckPeriod = 3 * time.Second
// RingIdentifier is used as a unique name to register the Index Gateway ring.
RingIdentifier = "index-gateway"
// RingKey is the name of the key used to register the different Index Gateway instances in the key-value store.
RingKey = "index-gateway"
)
// ManagerMode defines the different modes for the RingManager to execute.
//
// The RingManager and its modes are only relevant if the IndexGateway is running in ring mode.
type ManagerMode int
const (
// ClientMode is the RingManager mode executed by Loki components that are clients of the IndexGateway.
// The RingManager in client mode will have its own ring key-value store but it won't try to register itself in the ring.
ClientMode ManagerMode = iota
// ServerMode is the RingManager mode executed by the IndexGateway.
// The RingManager in server mode will register itself in the ring.
ServerMode
)
// RingManager is a component instantiated before all the others and is responsible for the ring setup.
//
// All Loki components that are involved with the IndexGateway (including the IndexGateway itself) will
// require a RingManager. However, the components that are clients of the IndexGateway will run it in client
// mode while the IndexGateway itself will run the manager in server mode.
type RingManager struct {
services.Service
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
RingLifecycler *ring.BasicLifecycler
Ring *ring.Ring
Mode ManagerMode
cfg Config
log log.Logger
}
// NewRingManager is the recommended way of instantiating a RingManager.
//
// The other functions will assume the RingManager was instantiated through this function.
func NewRingManager(mode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
rm := &RingManager{
cfg: cfg, log: log, Mode: mode,
}
if cfg.Mode != RingMode {
return nil, fmt.Errorf("ring manager shouldn't be invoked when index gateway not in ring mode")
}
// instantiate kv store for both modes.
ringStore, err := kv.NewClient(
rm.cfg.Ring.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "index-gateway-ring-manager"),
rm.log,
)
if err != nil {
return nil, errors.Wrap(err, "index gateway ring manager create KV store client")
}
// instantiate ring for both modes.
ringCfg := rm.cfg.Ring.ToRingConfig(rm.cfg.Ring.ReplicationFactor)
rm.Ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", registerer), rm.log)
if err != nil {
return nil, errors.Wrap(err, "index gateway ring manager create ring client")
}
if mode == ServerMode {
if err := rm.startServerMode(ringStore, registerer); err != nil {
return nil, err
}
return rm, nil
}
if err := rm.startClientMode(); err != nil {
return nil, err
}
return rm, nil
}
func (rm *RingManager) startServerMode(ringStore kv.Client, registerer prometheus.Registerer) error {
lifecyclerCfg, err := rm.cfg.Ring.ToLifecyclerConfig(ringNumTokens, rm.log)
if err != nil {
return errors.Wrap(err, "invalid ring lifecycler config")
}
delegate := ring.BasicLifecyclerDelegate(rm)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, rm.log)
delegate = ring.NewTokensPersistencyDelegate(rm.cfg.Ring.TokensFilePath, ring.JOINING, delegate, rm.log)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.Ring.HeartbeatTimeout, delegate, rm.log)
rm.RingLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, RingKey, ringStore, delegate, rm.log, registerer)
if err != nil {
return errors.Wrap(err, "index gateway ring manager create ring lifecycler")
}
svcs := []services.Service{rm.RingLifecycler, rm.Ring}
rm.subservices, err = services.NewManager(svcs...)
if err != nil {
return errors.Wrap(err, "new index gateway services manager in server mode")
}
rm.subservicesWatcher = services.NewFailureWatcher()
rm.subservicesWatcher.WatchManager(rm.subservices)
rm.Service = services.NewBasicService(rm.starting, rm.running, rm.stopping)
return nil
}
func (rm *RingManager) startClientMode() error {
var err error
svcs := []services.Service{rm.Ring}
rm.subservices, err = services.NewManager(svcs...)
if err != nil {
return errors.Wrap(err, "new index gateway services manager in client mode")
}
rm.subservicesWatcher = services.NewFailureWatcher()
rm.subservicesWatcher.WatchManager(rm.subservices)
rm.Service = services.NewIdleService(func(ctx context.Context) error {
return services.StartManagerAndAwaitHealthy(ctx, rm.subservices)
}, func(failureCase error) error {
return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices)
})
return nil
}
// starting implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) starting(ctx context.Context) (err error) {
// In case this function will return error we want to unregister the instance
// from the ring. We do it ensuring dependencies are gracefully stopped if they
// were already started.
defer func() {
if err == nil || rm.subservices == nil {
return
}
if stopErr := services.StopManagerAndAwaitStopped(context.Background(), rm.subservices); stopErr != nil {
level.Error(rm.log).Log("msg", "failed to gracefully stop index gateway ring manager dependencies", "err", stopErr)
}
}()
if err := services.StartManagerAndAwaitHealthy(ctx, rm.subservices); err != nil {
return errors.Wrap(err, "unable to start index gateway ring manager subservices")
}
// The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that
// someone wants to do can be done before becoming ACTIVE. For the index gateway we don't currently
// have any additional work so we can become ACTIVE right away.
// Wait until the ring client detected this instance in the JOINING
// state to make sure that when we'll run the initial sync we already
// know the tokens assigned to this instance.
level.Info(rm.log).Log("msg", "waiting until index gateway is JOINING in the ring")
if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return err
}
level.Info(rm.log).Log("msg", "index gateway is JOINING in the ring")
if err = rm.RingLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE)
}
// Wait until the ring client detected this instance in the ACTIVE state to
// make sure that when we'll run the loop it won't be detected as a ring
// topology change.
level.Info(rm.log).Log("msg", "waiting until index gateway is ACTIVE in the ring")
if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
level.Info(rm.log).Log("msg", "index gateway is ACTIVE in the ring")
return nil
}
// running implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) running(ctx context.Context) error {
t := time.NewTicker(ringCheckPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
return nil
case err := <-rm.subservicesWatcher.Chan():
return errors.Wrap(err, "running index gateway ring manager subservice failed")
case <-t.C:
continue
}
}
}
// stopping implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) stopping(_ error) error {
level.Debug(rm.log).Log("msg", "stopping index gateway ring manager")
return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices)
}
// ServeHTTP serves the HTTP route /indexgateway/ring.
func (rm *RingManager) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if rm.cfg.Mode == RingMode {
rm.Ring.ServeHTTP(w, req)
} else {
_, _ = w.Write([]byte("IndexGateway running with 'useIndexGatewayRing' disabled."))
}
}

@@ -3,6 +3,8 @@ package indexgateway
import (
"github.com/grafana/dskit/ring"
"github.com/pkg/errors"
lokiring "github.com/grafana/loki/pkg/util/ring"
)
var (
@@ -103,8 +105,8 @@ func (s *NoopStrategy) FilterTenants(tenantIDs []string) ([]string, error) {
// GetShardingStrategy returns the correct ShardingStrategy implementation based
// on provided configuration.
func GetShardingStrategy(cfg Config, indexGatewayRingManager *RingManager, o Limits) ShardingStrategy {
if cfg.Mode != RingMode || indexGatewayRingManager.Mode == ClientMode {
func GetShardingStrategy(cfg Config, indexGatewayRingManager *lokiring.RingManager, o Limits) ShardingStrategy {
if cfg.Mode != RingMode || indexGatewayRingManager.Mode == lokiring.ClientMode {
return NewNoopStrategy()
}
instanceAddr := indexGatewayRingManager.RingLifecycler.GetInstanceAddr()

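For illustration, a hedged sketch of the new call shape: callers now pass the shared `*lokiring.RingManager` instead of the package-local type. The helper and variable names are assumptions; the signatures match the diff above:

```go
import (
	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/indexgateway"
	lokiring "github.com/grafana/loki/pkg/util/ring"
)

// selectStrategy is a hypothetical call site: with the shared ring manager,
// GetShardingStrategy still falls back to the no-op strategy when the
// gateway is not in ring mode or the manager runs in client mode.
func selectStrategy(cfg indexgateway.Config, rm *lokiring.RingManager, limits indexgateway.Limits) indexgateway.ShardingStrategy {
	return indexgateway.GetShardingStrategy(cfg, rm, limits)
}
```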
@ -1,9 +1,11 @@
package util
package ring
import (
"hash/fnv"
"github.com/grafana/dskit/ring"
"github.com/grafana/loki/pkg/util"
)
// TokenFor generates a token used for finding ingesters in the ring
@ -22,5 +24,5 @@ func IsInReplicationSet(r ring.ReadRing, ringKey uint32, address string) (bool,
if err != nil {
return false, err
}
return StringsContain(rs.GetAddresses(), address), nil
return util.StringsContain(rs.GetAddresses(), address), nil
}

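A usage sketch for the relocated helpers. `TokenFor`'s parameter list is not shown in this hunk, so its `(userID, labels string) uint32` shape is an assumption here, as are the function and variable names:

```go
import (
	"github.com/grafana/dskit/ring"

	lokiring "github.com/grafana/loki/pkg/util/ring"
)

// ownsKey reports whether the instance at addr is one of the replicas
// responsible for the given tenant/labels key on the ring.
func ownsKey(r ring.ReadRing, addr, tenant, labels string) (bool, error) {
	key := lokiring.TokenFor(tenant, labels) // hash the key onto the ring
	return lokiring.IsInReplicationSet(r, key, addr)
}
```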
@ -1,4 +1,4 @@
package util
package ring
import (
"flag"
@ -21,7 +21,7 @@ import (
// many options not really required by the distributors ring. This config
// is used to strip down the config to the minimum, and avoid confusion
// to the user.
type RingConfig struct {
type RingConfig struct { // nolint:revive
KVStore kv.Config `yaml:"kvstore"`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
@ -122,3 +122,18 @@ func (cfg *RingConfig) ToRingConfig(replicationFactor int) ring.Config {
return rc
}
// RingConfigWithRF is a wrapper for our internally used ring configuration plus the replication factor.
type RingConfigWithRF struct { // nolint:revive
// RingConfig configures the ring.
RingConfig `yaml:",inline"`
// ReplicationFactor defines how many replicas store a single data shard.
ReplicationFactor int `yaml:"replication_factor"`
}
// RegisterFlagsWithPrefix registers all CLI flags of the ring configuration plus the replication factor flag, using the given prefix.
func (cfg *RingConfigWithRF) RegisterFlagsWithPrefix(prefix, storePrefix string, f *flag.FlagSet) {
cfg.RingConfig.RegisterFlagsWithPrefix(prefix, storePrefix, f)
f.IntVar(&cfg.ReplicationFactor, prefix+"replication-factor", 3, "Factor for data replication.")
}

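A component opts into the replication-factor flag simply by embedding the wrapper. A minimal sketch, modeled on how the gateways register their flags; the surrounding Config type is illustrative:

```go
import (
	"flag"

	lokiring "github.com/grafana/loki/pkg/util/ring"
)

// Config is a hypothetical component configuration embedding the wrapper.
type Config struct {
	Ring lokiring.RingConfigWithRF `yaml:"ring"`
}

// RegisterFlags yields -bloom-gateway.ring.* flags plus
// -bloom-gateway.replication-factor (default 3).
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.Ring.RegisterFlagsWithPrefix("bloom-gateway.", "collectors/", f)
}
```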
@ -1,4 +1,4 @@
package util
package ring
import (
"testing"

@ -1,4 +1,4 @@
package bloomgateway
package ring
import (
"context"
@ -17,15 +17,7 @@ import (
const (
ringAutoForgetUnhealthyPeriods = 10
ringNameForServer = "bloom-gateway"
ringNumTokens = 128
ringCheckPeriod = 3 * time.Second
// RingIdentifier is used as a unique name to register the Bloom Gateway ring.
RingIdentifier = "bloom-gateway"
// RingKey is the name of the key used to register the different Bloom Gateway instances in the key-value store.
RingKey = "bloom-gateway"
RingCheckPeriod = 3 * time.Second
)
// ManagerMode defines the different modes for the RingManager to execute.
@ -48,10 +40,16 @@ const (
// All Loki components that are involved with the Bloom Gateway (including the Bloom Gateway itself)
// require a RingManager. However, the components that are clients of the Bloom Gateway run it in client
// mode, while the Bloom Gateway itself runs the manager in server mode.
type RingManager struct {
type RingManager struct { // nolint:revive
services.Service
cfg Config
Mode ManagerMode
name string
rf int
tokens int
cfg RingConfig
logger log.Logger
subservices *services.Manager
@ -59,40 +57,39 @@ type RingManager struct {
RingLifecycler *ring.BasicLifecycler
Ring *ring.Ring
Mode ManagerMode
}
// NewRingManager instantiates a new RingManager.
// All other functions assume the RingManager was created through this function.
func NewRingManager(mode ManagerMode, cfg Config, logger log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
func NewRingManager(name string, mode ManagerMode, cfg RingConfig, rf int, tokens int, logger log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
rm := &RingManager{
cfg: cfg, logger: logger, Mode: mode,
cfg: cfg, logger: logger, Mode: mode, name: name, tokens: tokens, rf: rf,
}
// instantiate kv store for both modes.
ringStore, err := kv.NewClient(
rm.cfg.Ring.KVStore,
rm.cfg.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "bloom-gateway-ring-manager"),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), fmt.Sprintf("%s-ring-manager", name)),
rm.logger,
)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway ring manager create KV store client")
return nil, errors.Wrapf(err, "%s ring manager create KV store client", name)
}
// instantiate ring for both modes.
ringCfg := rm.cfg.Ring.ToRingConfig(rm.cfg.Ring.ReplicationFactor)
ringCfg := rm.cfg.ToRingConfig(rf)
rm.Ring, err = ring.NewWithStoreClientAndStrategy(
ringCfg,
ringNameForServer,
RingKey,
name,
name,
ringStore,
ring.NewIgnoreUnhealthyInstancesReplicationStrategy(),
prometheus.WrapRegistererWithPrefix("loki_", registerer),
rm.logger,
)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway ring manager create ring client")
return nil, errors.Wrapf(err, "%s ring manager create ring client", name)
}
switch mode {
@ -105,32 +102,32 @@ func NewRingManager(mode ManagerMode, cfg Config, logger log.Logger, registerer
return nil, err
}
default:
return nil, fmt.Errorf("starting bloom gateway in unsupported mode %v", mode)
return nil, fmt.Errorf("starting %s ring in unsupported mode %v", name, mode)
}
return rm, nil
}
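With the constructor parameterized by name, replication factor, and token count, a server-mode call site might look as follows. This is a sketch under assumptions: a `ServerMode` constant mirroring the `ClientMode` seen earlier, and 128 tokens per the removed `ringNumTokens` constant:

```go
import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	lokiring "github.com/grafana/loki/pkg/util/ring"
)

// newGatewayRingManager is a hypothetical call site for a component that
// serves the ring (server mode) with 128 tokens per instance.
func newGatewayRingManager(cfg lokiring.RingConfigWithRF, logger log.Logger) (*lokiring.RingManager, error) {
	return lokiring.NewRingManager("bloom-gateway", lokiring.ServerMode,
		cfg.RingConfig, cfg.ReplicationFactor, 128,
		logger, prometheus.DefaultRegisterer)
}
```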
func (rm *RingManager) startServerMode(ringStore kv.Client, registerer prometheus.Registerer) error {
lifecyclerCfg, err := rm.cfg.Ring.ToLifecyclerConfig(ringNumTokens, rm.logger)
lifecyclerCfg, err := rm.cfg.ToLifecyclerConfig(rm.tokens, rm.logger)
if err != nil {
return errors.Wrap(err, "invalid ring lifecycler config")
}
delegate := ring.BasicLifecyclerDelegate(rm)
delegate = ring.NewLeaveOnStoppingDelegate(delegate, rm.logger)
delegate = ring.NewTokensPersistencyDelegate(rm.cfg.Ring.TokensFilePath, ring.JOINING, delegate, rm.logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.Ring.HeartbeatTimeout, delegate, rm.logger)
delegate = ring.NewTokensPersistencyDelegate(rm.cfg.TokensFilePath, ring.JOINING, delegate, rm.logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.HeartbeatTimeout, delegate, rm.logger)
rm.RingLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, RingKey, ringStore, delegate, rm.logger, registerer)
rm.RingLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, rm.name, rm.name, ringStore, delegate, rm.logger, registerer)
if err != nil {
return errors.Wrap(err, "bloom gateway ring manager create ring lifecycler")
return errors.Wrapf(err, "%s ring manager create ring lifecycler", rm.name)
}
svcs := []services.Service{rm.RingLifecycler, rm.Ring}
rm.subservices, err = services.NewManager(svcs...)
if err != nil {
return errors.Wrap(err, "new bloom gateway services manager in server mode")
return errors.Wrapf(err, "new %s services manager in server mode", rm.name)
}
rm.subservicesWatcher = services.NewFailureWatcher()
@ -146,7 +143,7 @@ func (rm *RingManager) startClientMode() error {
svcs := []services.Service{rm.Ring}
rm.subservices, err = services.NewManager(svcs...)
if err != nil {
return errors.Wrap(err, "new bloom gateway services manager in client mode")
return errors.Wrapf(err, "new %s services manager in client mode", rm.name)
}
rm.subservicesWatcher = services.NewFailureWatcher()
@ -172,12 +169,12 @@ func (rm *RingManager) starting(ctx context.Context) (err error) {
}
if stopErr := services.StopManagerAndAwaitStopped(context.Background(), rm.subservices); stopErr != nil {
level.Error(rm.logger).Log("msg", "failed to gracefully stop bloom gateway ring manager dependencies", "err", stopErr)
level.Error(rm.logger).Log("msg", "failed to gracefully stop ring manager dependencies", "name", rm.name, "err", stopErr)
}
}()
if err := services.StartManagerAndAwaitHealthy(ctx, rm.subservices); err != nil {
return errors.Wrap(err, "unable to start bloom gateway ring manager subservices")
return errors.Wrapf(err, "unable to start %s ring manager subservices", rm.name)
}
// The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that
@ -186,11 +183,11 @@ func (rm *RingManager) starting(ctx context.Context) (err error) {
// Wait until the ring client has detected this instance in the JOINING
// state, to make sure that we already know the tokens assigned to this
// instance when we run the initial sync.
level.Info(rm.logger).Log("msg", "waiting until bloom gateway is JOINING in the ring")
level.Info(rm.logger).Log("msg", fmt.Sprintf("waiting until %s is JOINING in the ring", rm.name))
if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.JOINING); err != nil {
return err
}
level.Info(rm.logger).Log("msg", "bloom gateway is JOINING in the ring")
level.Info(rm.logger).Log("msg", fmt.Sprintf("%s is JOINING in the ring", rm.name))
if err = rm.RingLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil {
return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE)
@ -199,25 +196,25 @@ func (rm *RingManager) starting(ctx context.Context) (err error) {
// Wait until the ring client has detected this instance in the ACTIVE state,
// to make sure that when we run the loop it won't be detected as a ring
// topology change.
level.Info(rm.logger).Log("msg", "waiting until bloom gateway is ACTIVE in the ring")
level.Info(rm.logger).Log("msg", fmt.Sprintf("waiting until %s is ACTIVE in the ring", rm.name))
if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.ACTIVE); err != nil {
return err
}
level.Info(rm.logger).Log("msg", "bloom gateway is ACTIVE in the ring")
level.Info(rm.logger).Log("msg", fmt.Sprintf("%s is ACTIVE in the ring", rm.name))
return nil
}
// running implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) running(ctx context.Context) error {
t := time.NewTicker(ringCheckPeriod)
t := time.NewTicker(RingCheckPeriod)
defer t.Stop()
for {
select {
case <-ctx.Done():
return nil
case err := <-rm.subservicesWatcher.Chan():
return errors.Wrap(err, "running bloom gateway ring manager subservice failed")
return errors.Wrapf(err, "running %s ring manager subservice failed", rm.name)
case <-t.C:
continue
}
@ -226,7 +223,7 @@ func (rm *RingManager) running(ctx context.Context) error {
// stopping implements the Lifecycler interface and is one of the lifecycle hooks.
func (rm *RingManager) stopping(_ error) error {
level.Debug(rm.logger).Log("msg", "stopping bloom gateway ring manager")
level.Debug(rm.logger).Log("msg", fmt.Sprintf("stopping %s ring manager", rm.name))
return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices)
}
@ -246,7 +243,7 @@ func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc
takenTokens := ringDesc.GetTokens()
gen := ring.NewRandomTokenGenerator()
newTokens := gen.GenerateTokens(ringNumTokens-len(tokens), takenTokens)
newTokens := gen.GenerateTokens(rm.tokens-len(tokens), takenTokens)
// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)
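The delegate hook above tops up any tokens restored from disk so the instance always owns exactly `rm.tokens` after registering. A toy illustration of that top-up with dskit's generator; the counts are illustrative:

```go
package main

import (
	"fmt"

	"github.com/grafana/dskit/ring"
)

func main() {
	const want = 128                   // rm.tokens in the code above
	restored := make(ring.Tokens, 100) // pretend 100 tokens came from the tokens file
	gen := ring.NewRandomTokenGenerator()
	// Generate only the missing tokens; the second argument lists tokens
	// already taken in the ring, which the generator must avoid.
	newTokens := gen.GenerateTokens(want-len(restored), nil)
	fmt.Println(len(restored) + len(newTokens)) // 128
}
```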