refactor: Add RF and Tokens to Loki ring Cfg and allow overwriting docs. (#12142)

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
pull/12222/head
Salva Corts 2 years ago committed by GitHub
parent 862d0fb2cf
commit 63c88489d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 54
      docs/sources/configure/_index.md
  2. 16
      pkg/bloomcompactor/bloomcompactor_test.go
  3. 33
      pkg/bloomcompactor/config.go
  4. 28
      pkg/bloomgateway/bloomgateway_test.go
  5. 19
      pkg/bloomgateway/config.go
  6. 17
      pkg/compactor/compactor.go
  7. 5
      pkg/loki/config_wrapper.go
  8. 2
      pkg/loki/config_wrapper_test.go
  9. 6
      pkg/loki/loki.go
  10. 10
      pkg/loki/modules.go
  11. 27
      pkg/scheduler/scheduler.go
  12. 50
      pkg/storage/stores/shipper/indexshipper/indexgateway/config.go
  13. 55
      pkg/util/flagext/flagsetskip.go
  14. 26
      pkg/util/ring/ring_config.go
  15. 15
      tools/doc-generator/writer.go

@ -1763,6 +1763,12 @@ ring:
# CLI flag: -index-gateway.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Deprecated: How many index gateway instances are assigned to each tenant.
# Use -index-gateway.shard-size instead. The shard size is also a per-tenant
# setting.
# CLI flag: -replication-factor
[replication_factor: <int> | default = 3]
# Instance ID to register in the ring.
# CLI flag: -index-gateway.ring.instance-id
[instance_id: <string> | default = "<hostname>"]
@ -1787,12 +1793,6 @@ ring:
# Enable using a IPv6 instance address.
# CLI flag: -index-gateway.ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]
# Deprecated: How many index gateway instances are assigned to each tenant.
# Use -index-gateway.shard-size instead. The shard size is also a per-tenant
# setting.
# CLI flag: -replication-factor
[replication_factor: <int> | default = 3]
```
### bloom_gateway
@ -1860,6 +1860,17 @@ ring:
# CLI flag: -bloom-gateway.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Number of tokens to use in the ring. The bigger the number of tokens, the
# more fingerprint ranges the compactor will own, but the smaller these ranges
# will be. Bigger number of tokens means that more but smaller requests will
# be handled by each gateway.
# CLI flag: -bloom-gateway.ring.tokens
[num_tokens: <int> | default = 16]
# Factor for data replication.
# CLI flag: -bloom-gateway.ring.replication-factor
[replication_factor: <int> | default = 3]
# Instance ID to register in the ring.
# CLI flag: -bloom-gateway.ring.instance-id
[instance_id: <string> | default = "<hostname>"]
@ -1885,17 +1896,6 @@ ring:
# CLI flag: -bloom-gateway.ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]
# Factor for data replication.
# CLI flag: -bloom-gateway.replication-factor
[replication_factor: <int> | default = 3]
# Number of tokens to use in the ring. The bigger the number of tokens, the
# more fingerprint ranges the compactor will own, but the smaller these ranges
# will be. Bigger number of tokens means that more but smaller requests will
# be handled by each gateway.
# CLI flag: -bloom-gateway.ring.tokens
[tokens: <int> | default = 16]
# Flag to enable or disable the bloom gateway component globally.
# CLI flag: -bloom-gateway.enabled
[enabled: <boolean> | default = false]
@ -2654,6 +2654,11 @@ ring:
# CLI flag: -bloom-compactor.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Number of tokens to use in the ring per compactor. Higher number of tokens
# will result in more and smaller files (metas and blocks.)
# CLI flag: -bloom-compactor.ring.num-tokens
[num_tokens: <int> | default = 10]
# Instance ID to register in the ring.
# CLI flag: -bloom-compactor.ring.instance-id
[instance_id: <string> | default = "<hostname>"]
@ -2679,13 +2684,6 @@ ring:
# CLI flag: -bloom-compactor.ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]
# Number of tokens to use in the ring. The bigger the number of tokens, the
# more fingerprint ranges the compactor will own, but the smaller these ranges
# will be. Bigger number of tokens will result in more and smaller metas and
# blocks.
# CLI flag: -bloom-compactor.ring.tokens
[tokens: <int> | default = 10]
# Flag to enable or disable the usage of the bloom-compactor component.
# CLI flag: -bloom-compactor.enabled
[enabled: <boolean> | default = false]
@ -3836,6 +3834,14 @@ ring:
# CLI flag: -common.storage.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Number of tokens to own in the ring.
# CLI flag: -common.storage.ring.num-tokens
[num_tokens: <int> | default = 128]
# Factor for data replication.
# CLI flag: -common.storage.ring.replication-factor
[replication_factor: <int> | default = 3]
# Instance ID to register in the ring.
# CLI flag: -common.storage.ring.instance-id
[instance_id: <string> | default = "<hostname>"]

@ -68,22 +68,20 @@ func TestCompactor_ownsTenant(t *testing.T) {
var ringManagers []*lokiring.RingManager
var compactors []*Compactor
for i := 0; i < tc.compactors; i++ {
var ringCfg RingConfig
ringCfg.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("ring", flag.PanicOnError))
ringCfg.KVStore.Store = "inmemory"
ringCfg.InstanceID = fmt.Sprintf("bloom-compactor-%d", i)
ringCfg.InstanceAddr = fmt.Sprintf("localhost-%d", i)
var cfg Config
cfg.RegisterFlags(flag.NewFlagSet("ring", flag.PanicOnError))
cfg.Ring.KVStore.Store = "inmemory"
cfg.Ring.InstanceID = fmt.Sprintf("bloom-compactor-%d", i)
cfg.Ring.InstanceAddr = fmt.Sprintf("localhost-%d", i)
ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, ringCfg.RingConfig, 1, ringCfg.Tokens, util_log.Logger, prometheus.NewRegistry())
ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, cfg.Ring, 1, cfg.Ring.NumTokens, util_log.Logger, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, ringManager.StartAsync(context.Background()))
shuffleSharding := util_ring.NewTenantShuffleSharding(ringManager.Ring, ringManager.RingLifecycler, tc.limits.BloomCompactorShardSize)
compactor := &Compactor{
cfg: Config{
Ring: ringCfg,
},
cfg: cfg,
sharding: shuffleSharding,
limits: tc.limits,
}

@ -3,18 +3,23 @@ package bloomcompactor
import (
"flag"
"fmt"
"github.com/pkg/errors"
"time"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/util/ring"
)
const (
ringReplicationFactor = 1
)
// Config configures the bloom-compactor component.
type Config struct {
// Ring configures the ring store used to save and retrieve the different Bloom-Compactor instances.
// In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration
// section and the ingester configuration by default).
Ring RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters
Enabled bool `yaml:"enabled"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
@ -30,7 +35,6 @@ type Config struct {
// RegisterFlags registers flags for the Bloom-Compactor configuration.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f)
f.BoolVar(&cfg.Enabled, "bloom-compactor.enabled", false, "Flag to enable or disable the usage of the bloom-compactor component.")
f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
f.IntVar(&cfg.WorkerParallelism, "bloom-compactor.worker-parallelism", 1, "Number of workers to run in parallel for compaction.")
@ -48,26 +52,29 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
// Ring
skipFlags := []string{
"bloom-compactor.ring.num-tokens",
"bloom-compactor.ring.replication-factor",
}
cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f, skipFlags...)
// Overrides
f.IntVar(&cfg.Ring.NumTokens, "bloom-compactor.ring.num-tokens", 10, "Number of tokens to use in the ring per compactor. Higher number of tokens will result in more and smaller files (metas and blocks.)")
// Ignored
f.IntVar(&cfg.Ring.ReplicationFactor, "bloom-compactor.ring.replication-factor", ringReplicationFactor, fmt.Sprintf("IGNORED: Replication factor is fixed to %d", ringReplicationFactor))
}
// Validate checks that the configured compaction window is well-ordered and
// that the ring replication factor was not overridden from its fixed value.
func (cfg *Config) Validate() error {
	if cfg.MinTableCompactionPeriod > cfg.MaxTableCompactionPeriod {
		// errors.New instead of fmt.Errorf: the message contains no format verbs (staticcheck S1039).
		return errors.New("min_compaction_age must be less than or equal to max_compaction_age")
	}
	// The bloom-compactor ring RF is fixed (see ringReplicationFactor); the CLI
	// flag is registered only so that a user-supplied value can be rejected here.
	if cfg.Ring.ReplicationFactor != ringReplicationFactor {
		return errors.New("Replication factor must not be changed as it will not take effect")
	}
	return nil
}
type RingConfig struct {
ring.RingConfig `yaml:",inline"`
Tokens int `yaml:"tokens"`
}
func (cfg *RingConfig) RegisterFlagsWithPrefix(flagsPrefix, storePrefix string, f *flag.FlagSet) {
cfg.RingConfig.RegisterFlagsWithPrefix(flagsPrefix, storePrefix, f)
f.IntVar(&cfg.Tokens, flagsPrefix+"ring.tokens", 10, "Number of tokens to use in the ring. The bigger the number of tokens, the more fingerprint ranges the compactor will own, but the smaller these ranges will be. Bigger number of tokens will result in more and smaller metas and blocks.")
}
type Limits interface {
downloads.Limits
BloomCompactorShardSize(tenantID string) int

@ -102,16 +102,12 @@ func TestBloomGateway_StartStopService(t *testing.T) {
cfg := Config{
Enabled: true,
Ring: RingConfig{
RingConfigWithRF: lokiring.RingConfigWithRF{
RingConfig: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
},
ReplicationFactor: 1,
Ring: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
Tokens: 16,
ReplicationFactor: 1,
NumTokens: 16,
},
WorkerConcurrency: 4,
MaxOutstandingPerTenant: 1024,
@ -147,16 +143,12 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
cfg := Config{
Enabled: true,
Ring: RingConfig{
RingConfigWithRF: lokiring.RingConfigWithRF{
RingConfig: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
},
ReplicationFactor: 1,
Ring: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
Tokens: 16,
ReplicationFactor: 1,
NumTokens: 16,
},
WorkerConcurrency: 4,
MaxOutstandingPerTenant: 1024,

@ -11,7 +11,7 @@ type Config struct {
// Ring configures the ring store used to save and retrieve the different Bloom Gateway instances.
// In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration
// section and the ingester configuration by default).
Ring RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom gateway servers and clients. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom gateway servers and clients. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled is the global switch to configures whether Bloom Gateways should be used to filter chunks.
Enabled bool `yaml:"enabled"`
// Client configures the Bloom Gateway client
@ -29,7 +29,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f)
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
f.IntVar(&cfg.WorkerConcurrency, prefix+"worker-concurrency", 4, "Number of workers to use for filtering chunks concurrently.")
f.IntVar(&cfg.MaxOutstandingPerTenant, prefix+"max-outstanding-per-tenant", 1024, "Maximum number of outstanding tasks per tenant.")
@ -37,17 +36,13 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)
}
type RingConfig struct {
ring.RingConfigWithRF `yaml:",inline"`
Tokens int `yaml:"tokens"`
}
func (cfg *RingConfig) RegisterFlagsWithPrefix(flagsPrefix, storePrefix string, f *flag.FlagSet) {
cfg.RingConfigWithRF.RegisterFlagsWithPrefix(flagsPrefix, storePrefix, f)
f.IntVar(&cfg.Tokens, flagsPrefix+"ring.tokens", 16, "Number of tokens to use in the ring. The bigger the number of tokens, the more fingerprint ranges the compactor will own, but the smaller these ranges will be. Bigger number of tokens means that more but smaller requests will be handled by each gateway.")
// Ring
skipFlags := []string{
prefix + "ring.tokens",
}
cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f, skipFlags...)
f.IntVar(&cfg.Ring.NumTokens, prefix+"ring.tokens", 16, "Number of tokens to use in the ring. The bigger the number of tokens, the more fingerprint ranges the compactor will own, but the smaller these ranges will be. Bigger number of tokens means that more but smaller requests will be handled by each gateway.")
}
type Limits interface {

@ -110,7 +110,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.TablesToCompact, "compactor.tables-to-compact", 0, "Number of tables that compactor will try to compact. Newer tables are chosen when this is less than the number of tables available.")
f.IntVar(&cfg.SkipLatestNTables, "compactor.skip-latest-n-tables", 0, "Do not compact N latest tables. Together with -compactor.run-once and -compactor.tables-to-compact, this is useful when clearing compactor backlogs.")
cfg.CompactorRing.RegisterFlagsWithPrefix("compactor.", "collectors/", f)
// Ring
skipFlags := []string{
"compactor.ring.num-tokens",
"compactor.ring.replication-factor",
}
cfg.CompactorRing.RegisterFlagsWithPrefix("compactor.", "collectors/", f, skipFlags...)
f.IntVar(&cfg.CompactorRing.NumTokens, "compactor.ring.num-tokens", ringNumTokens, fmt.Sprintf("IGNORED: Num tokens is fixed to %d", ringNumTokens))
f.IntVar(&cfg.CompactorRing.ReplicationFactor, "compactor.ring.replication-factor", ringReplicationFactor, fmt.Sprintf("IGNORED: Replication factor is fixed to %d", ringReplicationFactor))
}
// Validate verifies the config does not contain inappropriate values
@ -119,6 +126,14 @@ func (cfg *Config) Validate() error {
return errors.New("max compaction parallelism must be >= 1")
}
if cfg.CompactorRing.NumTokens != ringNumTokens {
return errors.New("Num tokens must not be changed as it will not take effect")
}
if cfg.CompactorRing.ReplicationFactor != ringReplicationFactor {
return errors.New("Replication factor must not be changed as it will not take effect")
}
if cfg.RetentionEnabled {
if cfg.DeleteRequestStore == "" {
return fmt.Errorf("compactor.delete-request-store should be configured when retention is enabled")

@ -178,10 +178,11 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
}
}
// applyCommonReplicationFactor apply the common replication factor to the Index Gateway ring.
// applyCommonReplicationFactor apply the common replication factor to the Index Gateway and Bloom Gateway rings.
func applyCommonReplicationFactor(r, defaults *ConfigWrapper) {
if !reflect.DeepEqual(r.Common.ReplicationFactor, defaults.Common.ReplicationFactor) {
r.IndexGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor
r.BloomGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor
}
}
@ -314,6 +315,7 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, merg
r.BloomCompactor.Ring.InstanceZone = rc.InstanceZone
r.BloomCompactor.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.BloomCompactor.Ring.KVStore = rc.KVStore
r.BloomCompactor.Ring.NumTokens = rc.NumTokens
}
// BloomGateway
@ -327,6 +329,7 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, merg
r.BloomGateway.Ring.InstanceZone = rc.InstanceZone
r.BloomGateway.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.BloomGateway.Ring.KVStore = rc.KVStore
r.BloomGateway.Ring.NumTokens = rc.NumTokens
}
}

@ -1174,7 +1174,7 @@ func Test_applyIngesterRingConfig(t *testing.T) {
assert.Equal(t, 9,
reflect.TypeOf(distributor.RingConfig{}).NumField(),
fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
assert.Equal(t, 13,
assert.Equal(t, 15,
reflect.TypeOf(lokiring.RingConfig{}).NumField(),
fmt.Sprintf(msgf, reflect.TypeOf(lokiring.RingConfig{}).String()))
})

@ -223,6 +223,9 @@ func (c *Config) Validate() error {
if err := c.Querier.Validate(); err != nil {
return errors.Wrap(err, "invalid querier config")
}
if err := c.QueryScheduler.Validate(); err != nil {
return errors.Wrap(err, "invalid query_scheduler config")
}
if err := c.TableManager.Validate(); err != nil {
return errors.Wrap(err, "invalid tablemanager config")
}
@ -241,6 +244,9 @@ func (c *Config) Validate() error {
if err := c.StorageConfig.BoltDBShipperConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid boltdb-shipper config")
}
if err := c.IndexGateway.Validate(); err != nil {
return errors.Wrap(err, "invalid index_gateway config")
}
if err := c.CompactorConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid compactor config")
}

@ -1345,7 +1345,7 @@ func (t *Loki) initBloomGatewayRing() (services.Service, error) {
if t.Cfg.isModuleEnabled(BloomGateway) || t.Cfg.isModuleEnabled(Backend) || legacyReadMode {
mode = lokiring.ServerMode
}
manager, err := lokiring.NewRingManager(bloomGatewayRingKey, mode, t.Cfg.BloomGateway.Ring.RingConfig, t.Cfg.BloomGateway.Ring.ReplicationFactor, t.Cfg.BloomGateway.Ring.Tokens, util_log.Logger, prometheus.DefaultRegisterer)
manager, err := lokiring.NewRingManager(bloomGatewayRingKey, mode, t.Cfg.BloomGateway.Ring, t.Cfg.BloomGateway.Ring.ReplicationFactor, t.Cfg.BloomGateway.Ring.NumTokens, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "error initializing bloom gateway ring manager")
}
@ -1442,7 +1442,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
managerMode = lokiring.ServerMode
}
rm, err := lokiring.NewRingManager(indexGatewayRingKey, managerMode, t.Cfg.IndexGateway.Ring.RingConfig, t.Cfg.IndexGateway.Ring.ReplicationFactor, 128, util_log.Logger, prometheus.DefaultRegisterer)
rm, err := lokiring.NewRingManager(indexGatewayRingKey, managerMode, t.Cfg.IndexGateway.Ring, t.Cfg.IndexGateway.Ring.ReplicationFactor, indexgateway.NumTokens, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "new index gateway ring manager")
@ -1493,7 +1493,7 @@ func (t *Loki) initBloomCompactorRing() (services.Service, error) {
// is LegacyMode needed?
// legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read)
rm, err := lokiring.NewRingManager(bloomCompactorRingKey, lokiring.ServerMode, t.Cfg.BloomCompactor.Ring.RingConfig, 1, t.Cfg.BloomCompactor.Ring.Tokens, util_log.Logger, prometheus.DefaultRegisterer)
rm, err := lokiring.NewRingManager(bloomCompactorRingKey, lokiring.ServerMode, t.Cfg.BloomCompactor.Ring, 1, t.Cfg.BloomCompactor.Ring.NumTokens, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "error initializing bloom-compactor ring manager")
}
@ -1534,9 +1534,7 @@ func (t *Loki) initQuerySchedulerRing() (_ services.Service, err error) {
if t.Cfg.isModuleEnabled(QueryScheduler) || t.Cfg.isModuleEnabled(Backend) || t.Cfg.isModuleEnabled(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)) {
managerMode = lokiring.ServerMode
}
rf := 2 // ringReplicationFactor should be 2 because we want 2 schedulers.
tokens := 1 // we only need to insert 1 token to be used for leader election purposes.
rm, err := lokiring.NewRingManager(schedulerRingKey, managerMode, t.Cfg.QueryScheduler.SchedulerRing, rf, tokens, util_log.Logger, prometheus.DefaultRegisterer)
rm, err := lokiring.NewRingManager(schedulerRingKey, managerMode, t.Cfg.QueryScheduler.SchedulerRing, scheduler.ReplicationFactor, scheduler.NumTokens, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "new scheduler ring manager")

@ -39,6 +39,13 @@ import (
lokiring "github.com/grafana/loki/pkg/util/ring"
)
const (
// NumTokens is 1 since we only need to insert 1 token to be used for leader election purposes.
NumTokens = 1
// ReplicationFactor should be 2 because we want 2 schedulers.
ReplicationFactor = 2
)
var errSchedulerIsNotRunning = errors.New("scheduler is not running")
// Scheduler is responsible for queueing and dispatching queries to Queriers.
@ -111,7 +118,25 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
f.BoolVar(&cfg.UseSchedulerRing, "query-scheduler.use-scheduler-ring", false, "Set to true to have the query schedulers create and place themselves in a ring. If no frontend_address or scheduler_address are present anywhere else in the configuration, Loki will toggle this value to true.")
cfg.SchedulerRing.RegisterFlagsWithPrefix("query-scheduler.", "collectors/", f)
// Ring
skipFlags := []string{
"query-scheduler.ring.num-tokens",
"query-scheduler.ring.replication-factor",
}
cfg.SchedulerRing.RegisterFlagsWithPrefix("query-scheduler.", "collectors/", f, skipFlags...)
f.IntVar(&cfg.SchedulerRing.NumTokens, "query-scheduler.ring.num-tokens", NumTokens, fmt.Sprintf("IGNORED: Num tokens is fixed to %d", NumTokens))
f.IntVar(&cfg.SchedulerRing.ReplicationFactor, "query-scheduler.ring.replication-factor", ReplicationFactor, fmt.Sprintf("IGNORED: Replication factor is fixed to %d", ReplicationFactor))
}
// Validate rejects overrides of the scheduler ring values that are fixed by
// Loki: the ring must own exactly NumTokens tokens and use exactly
// ReplicationFactor replicas. The corresponding CLI flags are registered as
// "IGNORED" defaults, so any differing value can only come from YAML and is
// treated as a configuration error.
func (cfg *Config) Validate() error {
	if cfg.SchedulerRing.NumTokens != NumTokens {
		return errors.New("Num tokens must not be changed as it will not take effect")
	}
	if cfg.SchedulerRing.ReplicationFactor != ReplicationFactor {
		return errors.New("Replication factor must not be changed as it will not take effect")
	}
	return nil
}
// NewScheduler creates a new Scheduler.

@ -3,10 +3,16 @@ package indexgateway
import (
"flag"
"fmt"
"github.com/pkg/errors"
"github.com/grafana/loki/pkg/util/ring"
)
const (
NumTokens = 128
ReplicationFactor = 3
)
// Mode represents in which mode an Index Gateway instance is running.
//
// Right now, two modes are supported: simple mode (default) and ring mode.
@ -47,26 +53,6 @@ const (
RingMode Mode = "ring"
)
// RingCfg is identical to ring.RingConfigWithRF with the difference that the
// ReplicationFactor field is deprecated.
type RingCfg struct {
// InternalRingCfg configures the Index Gateway ring.
ring.RingConfig `yaml:",inline"`
// ReplicationFactor defines how many Index Gateway instances are assigned to each tenant.
//
// Whenever the store queries the ring key-value store for the Index Gateway instance responsible for tenant X,
// multiple Index Gateway instances are expected to be returned as Index Gateway might be busy/locked for specific
// reasons (this is assured by the spikey behavior of Index Gateway latencies).
ReplicationFactor int `yaml:"replication_factor"`
}
// RegisterFlagsWithPrefix register all Index Gateway flags related to its ring but with a proper store prefix to avoid conflicts.
func (cfg *RingCfg) RegisterFlags(prefix, storePrefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix(prefix, storePrefix, f)
f.IntVar(&cfg.ReplicationFactor, "replication-factor", 3, "Deprecated: How many index gateway instances are assigned to each tenant. Use -index-gateway.shard-size instead. The shard size is also a per-tenant setting.")
}
// Config configures an Index Gateway server.
type Config struct {
// Mode configures in which mode the client will be running when querying and communicating with an Index Gateway instance.
@ -76,11 +62,31 @@ type Config struct {
//
// In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration
// section and the ingester configuration by default).
Ring RingCfg `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the index gateway servers and clients in case the servers are configured to run in 'ring' mode. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the index gateway servers and clients in case the servers are configured to run in 'ring' mode. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
}
// RegisterFlags register all IndexGatewayClientConfig flags and all the flags of its subconfigs but with a prefix (ex: shipper).
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Ring.RegisterFlags("index-gateway.", "collectors/", f)
f.StringVar((*string)(&cfg.Mode), "index-gateway.mode", SimpleMode.String(), "Defines in which mode the index gateway server will operate (default to 'simple'). It supports two modes:\n- 'simple': an index gateway server instance is responsible for handling, storing and returning requests for all indices for all tenants.\n- 'ring': an index gateway server instance is responsible for a subset of tenants instead of all tenants.")
// Ring
skipFlags := []string{
"index-gateway.ring.num-tokens",
"index-gateway.ring.replication-factor",
}
cfg.Ring.RegisterFlagsWithPrefix("index-gateway.", "collectors/", f, skipFlags...)
f.IntVar(&cfg.Ring.NumTokens, "index-gateway.ring.num-tokens", NumTokens, fmt.Sprintf("IGNORED: Num tokens is fixed to %d", NumTokens))
// ReplicationFactor defines how many Index Gateway instances are assigned to each tenant.
//
// Whenever the store queries the ring key-value store for the Index Gateway instance responsible for tenant X,
// multiple Index Gateway instances are expected to be returned as Index Gateway might be busy/locked for specific
// reasons (this is assured by the spikey behavior of Index Gateway latencies).
f.IntVar(&cfg.Ring.ReplicationFactor, "replication-factor", ReplicationFactor, "Deprecated: How many index gateway instances are assigned to each tenant. Use -index-gateway.shard-size instead. The shard size is also a per-tenant setting.")
}
// Validate rejects overrides of the index-gateway ring token count, which is
// fixed to NumTokens. The flag is registered with an "IGNORED" description, so
// a differing value can only originate from YAML and is reported as an error.
// Note: ReplicationFactor is deliberately not checked here — it remains a
// (deprecated) user-settable value.
func (cfg *Config) Validate() error {
	if cfg.Ring.NumTokens != NumTokens {
		return errors.New("Num tokens must not be changed as it will not take effect")
	}
	return nil
}

@ -0,0 +1,55 @@
package flagext
import (
"flag"
"time"
)
type FlagSetWithSkip struct {
*flag.FlagSet
skip map[string]struct{}
}
func NewFlagSetWithSkip(f *flag.FlagSet, skip []string) *FlagSetWithSkip {
skipMap := make(map[string]struct{}, len(skip))
for _, s := range skip {
skipMap[s] = struct{}{}
}
return &FlagSetWithSkip{f, skipMap}
}
func (f *FlagSetWithSkip) ToFlagSet() *flag.FlagSet {
return f.FlagSet
}
func (f *FlagSetWithSkip) DurationVar(p *time.Duration, name string, value time.Duration, usage string) {
if _, ok := f.skip[name]; !ok {
f.FlagSet.DurationVar(p, name, value, usage)
}
}
func (f *FlagSetWithSkip) StringVar(p *string, name string, value string, usage string) {
if _, ok := f.skip[name]; !ok {
f.FlagSet.StringVar(p, name, value, usage)
}
}
func (f *FlagSetWithSkip) BoolVar(p *bool, name string, value bool, usage string) {
if _, ok := f.skip[name]; !ok {
f.FlagSet.BoolVar(p, name, value, usage)
}
}
func (f *FlagSetWithSkip) IntVar(p *int, name string, value int, usage string) {
if _, ok := f.skip[name]; !ok {
f.FlagSet.IntVar(p, name, value, usage)
}
}
func (f *FlagSetWithSkip) Var(value flag.Value, name string, usage string) {
if _, ok := f.skip[name]; !ok {
f.FlagSet.Var(value, name, usage)
}
}
// TODO: Add more methods as needed.

@ -15,6 +15,7 @@ import (
"github.com/grafana/dskit/netutil"
"github.com/grafana/dskit/ring"
util_flagext "github.com/grafana/loki/pkg/util/flagext"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -28,6 +29,8 @@ type RingConfig struct { // nolint:revive
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
TokensFilePath string `yaml:"tokens_file_path"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
NumTokens int `yaml:"num_tokens"`
ReplicationFactor int `yaml:"replication_factor"`
// Instance details
InstanceID string `yaml:"instance_id" doc:"default=<hostname>"`
@ -45,7 +48,9 @@ type RingConfig struct { // nolint:revive
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
// storePrefix is used to set the path in the KVStore and should end with a /
func (cfg *RingConfig) RegisterFlagsWithPrefix(flagsPrefix, storePrefix string, f *flag.FlagSet) {
func (cfg *RingConfig) RegisterFlagsWithPrefix(flagsPrefix, storePrefix string, fs *flag.FlagSet, skip ...string) {
f := util_flagext.NewFlagSetWithSkip(fs, skip)
hostname, err := os.Hostname()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to get hostname", "err", err)
@ -53,11 +58,13 @@ func (cfg *RingConfig) RegisterFlagsWithPrefix(flagsPrefix, storePrefix string,
}
// Ring flags
cfg.KVStore.RegisterFlagsWithPrefix(flagsPrefix+"ring.", storePrefix, f)
cfg.KVStore.RegisterFlagsWithPrefix(flagsPrefix+"ring.", storePrefix, f.ToFlagSet())
f.DurationVar(&cfg.HeartbeatPeriod, flagsPrefix+"ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
f.DurationVar(&cfg.HeartbeatTimeout, flagsPrefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which compactors are considered unhealthy within the ring. 0 = never (timeout disabled).")
f.StringVar(&cfg.TokensFilePath, flagsPrefix+"ring.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
f.BoolVar(&cfg.ZoneAwarenessEnabled, flagsPrefix+"ring.zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.")
f.IntVar(&cfg.NumTokens, flagsPrefix+"ring.num-tokens", 128, "Number of tokens to own in the ring.")
f.IntVar(&cfg.ReplicationFactor, flagsPrefix+"ring.replication-factor", 3, "Factor for data replication.")
// Instance flags
cfg.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger)
@ -123,18 +130,3 @@ func (cfg *RingConfig) ToRingConfig(replicationFactor int) ring.Config {
return rc
}
// RingConfigWithRF is a wrapper for our internally used ring configuration plus the replication factor.
type RingConfigWithRF struct { // nolint:revive
// RingConfig configures the ring.
RingConfig `yaml:",inline"`
// ReplicationFactor defines how many replicas store a single data shard.
ReplicationFactor int `yaml:"replication_factor"`
}
// RegisterFlagsWithPrefix registers all Bloom Gateway CLI flags.
func (cfg *RingConfigWithRF) RegisterFlagsWithPrefix(prefix, storePrefix string, f *flag.FlagSet) {
cfg.RingConfig.RegisterFlagsWithPrefix(prefix, storePrefix, f)
f.IntVar(&cfg.ReplicationFactor, prefix+"replication-factor", 3, "Factor for data replication.")
}

@ -27,18 +27,20 @@ func (w *specWriter) writeConfigBlock(b *parse.ConfigBlock, indent int) {
return
}
var written bool
for i, entry := range b.Entries {
// Add a new line to separate from the previous entry
if i > 0 {
if written && i > 0 {
w.out.WriteString("\n")
}
w.writeConfigEntry(entry, indent)
written = w.writeConfigEntry(entry, indent)
}
}
// nolint:goconst
func (w *specWriter) writeConfigEntry(e *parse.ConfigEntry, indent int) {
func (w *specWriter) writeConfigEntry(e *parse.ConfigEntry, indent int) (written bool) {
written = true
if e.Kind == parse.KindBlock {
// If the block is a root block it will have its dedicated section in the doc,
// so here we've just to write down the reference without re-iterating on it.
@ -64,6 +66,11 @@ func (w *specWriter) writeConfigEntry(e *parse.ConfigEntry, indent int) {
}
if e.Kind == parse.KindField || e.Kind == parse.KindSlice || e.Kind == parse.KindMap {
if strings.HasPrefix(e.Description(), "IGNORED:") {
// We skip documenting any field whose description starts with "IGNORED:".
return false
}
// Description
w.writeComment(e.Description(), indent, 0)
w.writeExample(e.FieldExample, indent)
@ -87,6 +94,8 @@ func (w *specWriter) writeConfigEntry(e *parse.ConfigEntry, indent int) {
w.out.WriteString(pad(indent) + "[" + e.Name + ": <" + e.FieldType + ">" + defaultValue + "]\n")
}
}
return written
}
func (w *specWriter) writeFlag(name string, indent int) {

Loading…
Cancel
Save