feat: wire up ingest-limits service (#16660)

Branch: pull/16629/head
George Robinson committed 3 months ago (via GitHub)
parent 0be4897a41
commit 18d143befe
Files changed:
  1. docs/sources/shared/configuration.md (344)
  2. pkg/limits/frontend/client/client.go (8)
  3. pkg/limits/frontend/frontend.go (14)
  4. pkg/limits/ingest_limits.go (4)
  5. pkg/loki/config_wrapper.go (57)
  6. pkg/loki/config_wrapper_test.go (102)
  7. pkg/loki/loki.go (52)
  8. pkg/loki/modules.go (123)

@@ -874,6 +874,340 @@ dataobj:
# CLI flag: -dataobj-storage-bucket-prefix
[storage_bucket_prefix: <string> | default = "dataobj/"]
ingest_limits:
# Enable the ingest limits service.
# CLI flag: -ingest-limits.enabled
[enabled: <boolean> | default = false]
# The time window for which stream metadata is considered active.
# CLI flag: -ingest-limits.window-size
[window_size: <duration> | default = 1h]
lifecycler:
ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul,
# etcd, inmemory, memberlist, multi.
# CLI flag: -ingest-limits.store
[store: <string> | default = "consul"]
# The prefix for the keys in the store. Should end with a /.
# CLI flag: -ingest-limits.prefix
[prefix: <string> | default = "collectors/"]
# Configuration for a Consul client. Only applies if the selected
# kvstore is consul.
# The CLI flags prefix for this block configuration is: ingest-limits
[consul: <consul>]
# Configuration for an ETCD v3 client. Only applies if the selected
# kvstore is etcd.
# The CLI flags prefix for this block configuration is: ingest-limits
[etcd: <etcd>]
multi:
# Primary backend storage used by multi-client.
# CLI flag: -ingest-limits.multi.primary
[primary: <string> | default = ""]
# Secondary backend storage used by multi-client.
# CLI flag: -ingest-limits.multi.secondary
[secondary: <string> | default = ""]
# Mirror writes to secondary store.
# CLI flag: -ingest-limits.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]
# Timeout for storing value to secondary store.
# CLI flag: -ingest-limits.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# The heartbeat timeout after which ingesters are skipped for
# reads/writes. 0 = never (timeout disabled).
# CLI flag: -ingest-limits.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# The number of ingesters to write to and read from.
# CLI flag: -ingest-limits.distributor.replication-factor
[replication_factor: <int> | default = 3]
# True to enable zone-awareness and replicate ingested samples across
# different availability zones.
# CLI flag: -ingest-limits.distributor.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Comma-separated list of zones to exclude from the ring. Instances in
# excluded zones will be filtered out from the ring.
# CLI flag: -ingest-limits.distributor.excluded-zones
[excluded_zones: <string> | default = ""]
# Number of tokens for each ingester.
# CLI flag: -ingest-limits.num-tokens
[num_tokens: <int> | default = 128]
# Period at which to heartbeat to consul. 0 = disabled.
# CLI flag: -ingest-limits.heartbeat-period
[heartbeat_period: <duration> | default = 5s]
# Heartbeat timeout after which an instance is assumed to be unhealthy. 0 =
# disabled.
# CLI flag: -ingest-limits.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# Observe tokens after generating to resolve collisions. Useful when using a
# gossiping ring.
# CLI flag: -ingest-limits.observe-period
[observe_period: <duration> | default = 0s]
# Period to wait for a claim from another member; will join automatically
# after this.
# CLI flag: -ingest-limits.join-after
[join_after: <duration> | default = 0s]
# Minimum duration to wait after the internal readiness checks have passed
# but before succeeding the readiness endpoint. This is used to slow down
# deployment controllers (e.g. Kubernetes) after an instance is ready and
# before they proceed with a rolling update, to give the rest of the cluster
# instances enough time to receive ring updates.
# CLI flag: -ingest-limits.min-ready-duration
[min_ready_duration: <duration> | default = 15s]
# Name of network interface to read address from.
# CLI flag: -ingest-limits.lifecycler.interface
[interface_names: <list of strings> | default = [<private network interfaces>]]
# Enable IPv6 support. Required to make use of IP addresses from IPv6
# interfaces.
# CLI flag: -ingest-limits.enable-inet6
[enable_inet6: <boolean> | default = false]
# Duration to sleep for before exiting, to ensure metrics are scraped.
# CLI flag: -ingest-limits.final-sleep
[final_sleep: <duration> | default = 0s]
# File path where tokens are stored. If empty, tokens are not stored at
# shutdown or restored at startup.
# CLI flag: -ingest-limits.tokens-file-path
[tokens_file_path: <string> | default = ""]
# The availability zone where this instance is running.
# CLI flag: -ingest-limits.availability-zone
[availability_zone: <string> | default = ""]
# Unregister from the ring upon clean shutdown. It can be useful to disable
# for rolling restarts with consistent naming in conjunction with
# -distributor.extend-writes=false.
# CLI flag: -ingest-limits.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]
# When enabled, the readiness probe succeeds only after all instances are
# ACTIVE and healthy in the ring; otherwise only the instance itself is
# checked. This option should be disabled if multiple instances in your
# cluster can be rolled out simultaneously, otherwise rolling updates may
# be slowed down.
# CLI flag: -ingest-limits.readiness-check-ring-health
[readiness_check_ring_health: <boolean> | default = true]
# IP address to advertise in the ring.
# CLI flag: -ingest-limits.lifecycler.addr
[address: <string> | default = ""]
# Port to advertise in consul (defaults to server.grpc-listen-port).
# CLI flag: -ingest-limits.lifecycler.port
[port: <int> | default = 0]
# ID to register in the ring.
# CLI flag: -ingest-limits.lifecycler.ID
[id: <string> | default = "<hostname>"]
# The number of partitions for the Kafka topic used to read and write stream
# metadata. It is fixed, not a maximum.
# CLI flag: -ingest-limits.num-partitions
[num_partitions: <int> | default = 64]
ingest_limits_frontend:
client_config:
# Configures client gRPC connections to limits service.
# The CLI flags prefix for this block configuration is:
# ingest-limits-frontend.limits-client
[grpc_client_config: <grpc_client>]
# Configures client gRPC connections pool to limits service.
pool_config:
# How frequently to clean up clients for ingest-limits that have gone
# away.
# CLI flag: -ingest-limits-frontend.client-cleanup-period
[client_cleanup_period: <duration> | default = 15s]
# Run a health check on each ingest-limits client during periodic cleanup.
# CLI flag: -ingest-limits-frontend.health-check-ingest-limits
[health_check_ingest_limits: <boolean> | default = true]
# Timeout for the health check.
# CLI flag: -ingest-limits-frontend.remote-timeout
[remote_timeout: <duration> | default = 1s]
lifecycler:
ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul,
# etcd, inmemory, memberlist, multi.
# CLI flag: -ingest-limits-frontend.store
[store: <string> | default = "consul"]
# The prefix for the keys in the store. Should end with a /.
# CLI flag: -ingest-limits-frontend.prefix
[prefix: <string> | default = "collectors/"]
# Configuration for a Consul client. Only applies if the selected
# kvstore is consul.
# The CLI flags prefix for this block configuration is:
# ingest-limits-frontend
[consul: <consul>]
# Configuration for an ETCD v3 client. Only applies if the selected
# kvstore is etcd.
# The CLI flags prefix for this block configuration is:
# ingest-limits-frontend
[etcd: <etcd>]
multi:
# Primary backend storage used by multi-client.
# CLI flag: -ingest-limits-frontend.multi.primary
[primary: <string> | default = ""]
# Secondary backend storage used by multi-client.
# CLI flag: -ingest-limits-frontend.multi.secondary
[secondary: <string> | default = ""]
# Mirror writes to secondary store.
# CLI flag: -ingest-limits-frontend.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]
# Timeout for storing value to secondary store.
# CLI flag: -ingest-limits-frontend.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# The heartbeat timeout after which ingesters are skipped for
# reads/writes. 0 = never (timeout disabled).
# CLI flag: -ingest-limits-frontend.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# The number of ingesters to write to and read from.
# CLI flag: -ingest-limits-frontend.distributor.replication-factor
[replication_factor: <int> | default = 3]
# True to enable zone-awareness and replicate ingested samples across
# different availability zones.
# CLI flag: -ingest-limits-frontend.distributor.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Comma-separated list of zones to exclude from the ring. Instances in
# excluded zones will be filtered out from the ring.
# CLI flag: -ingest-limits-frontend.distributor.excluded-zones
[excluded_zones: <string> | default = ""]
# Number of tokens for each ingester.
# CLI flag: -ingest-limits-frontend.num-tokens
[num_tokens: <int> | default = 128]
# Period at which to heartbeat to consul. 0 = disabled.
# CLI flag: -ingest-limits-frontend.heartbeat-period
[heartbeat_period: <duration> | default = 5s]
# Heartbeat timeout after which an instance is assumed to be unhealthy. 0 =
# disabled.
# CLI flag: -ingest-limits-frontend.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# Observe tokens after generating to resolve collisions. Useful when using a
# gossiping ring.
# CLI flag: -ingest-limits-frontend.observe-period
[observe_period: <duration> | default = 0s]
# Period to wait for a claim from another member; will join automatically
# after this.
# CLI flag: -ingest-limits-frontend.join-after
[join_after: <duration> | default = 0s]
# Minimum duration to wait after the internal readiness checks have passed
# but before succeeding the readiness endpoint. This is used to slow down
# deployment controllers (e.g. Kubernetes) after an instance is ready and
# before they proceed with a rolling update, to give the rest of the cluster
# instances enough time to receive ring updates.
# CLI flag: -ingest-limits-frontend.min-ready-duration
[min_ready_duration: <duration> | default = 15s]
# Name of network interface to read address from.
# CLI flag: -ingest-limits-frontend.lifecycler.interface
[interface_names: <list of strings> | default = [<private network interfaces>]]
# Enable IPv6 support. Required to make use of IP addresses from IPv6
# interfaces.
# CLI flag: -ingest-limits-frontend.enable-inet6
[enable_inet6: <boolean> | default = false]
# Duration to sleep for before exiting, to ensure metrics are scraped.
# CLI flag: -ingest-limits-frontend.final-sleep
[final_sleep: <duration> | default = 0s]
# File path where tokens are stored. If empty, tokens are not stored at
# shutdown or restored at startup.
# CLI flag: -ingest-limits-frontend.tokens-file-path
[tokens_file_path: <string> | default = ""]
# The availability zone where this instance is running.
# CLI flag: -ingest-limits-frontend.availability-zone
[availability_zone: <string> | default = ""]
# Unregister from the ring upon clean shutdown. It can be useful to disable
# for rolling restarts with consistent naming in conjunction with
# -distributor.extend-writes=false.
# CLI flag: -ingest-limits-frontend.unregister-on-shutdown
[unregister_on_shutdown: <boolean> | default = true]
# When enabled, the readiness probe succeeds only after all instances are
# ACTIVE and healthy in the ring; otherwise only the instance itself is
# checked. This option should be disabled if multiple instances in your
# cluster can be rolled out simultaneously, otherwise rolling updates may
# be slowed down.
# CLI flag: -ingest-limits-frontend.readiness-check-ring-health
[readiness_check_ring_health: <boolean> | default = true]
# IP address to advertise in the ring.
# CLI flag: -ingest-limits-frontend.lifecycler.addr
[address: <string> | default = ""]
# Port to advertise in consul (defaults to server.grpc-listen-port).
# CLI flag: -ingest-limits-frontend.lifecycler.port
[port: <int> | default = 0]
# ID to register in the ring.
# CLI flag: -ingest-limits-frontend.lifecycler.ID
[id: <string> | default = "<hostname>"]
ingest_limits_frontend_client:
# Configures client gRPC connections to limits service.
# The CLI flags prefix for this block configuration is:
# ingest-limits-frontend-client
[grpc_client_config: <grpc_client>]
# Configures client gRPC connections pool to limits service.
pool_config:
# How frequently to clean up clients for ingest-limits-frontend that have
# gone away.
# CLI flag: -ingest-limits-frontend-client.client-cleanup-period
[client_cleanup_period: <duration> | default = 15s]
# Run a health check on each ingest-limits-frontend client during periodic
# cleanup.
# CLI flag: -ingest-limits-frontend-client.health-check-ingest-limits
[health_check_ingest_limits: <boolean> | default = true]
# Timeout for the health check.
# CLI flag: -ingest-limits-frontend-client.remote-timeout
[remote_timeout: <duration> | default = 1s]
# Configuration for 'runtime config' module, responsible for reloading runtime
# configuration file.
[runtime_config: <runtime_config>]
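For orientation, here is a minimal sketch of what enabling the new blocks above might look like in a Loki config file. It uses only keys documented in this diff; the memberlist choice and all values are illustrative assumptions, not recommendations from this change:

```yaml
# Minimal sketch (assumed values): enable the experimental ingest-limits
# service and point both new rings at memberlist instead of the consul default.
ingest_limits:
  enabled: true          # default false
  window_size: 1h        # default 1h
  num_partitions: 64     # default 64; fixed, not a maximum
  lifecycler:
    ring:
      kvstore:
        store: memberlist
ingest_limits_frontend:
  lifecycler:
    ring:
      kvstore:
        store: memberlist
```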
@@ -1980,6 +2314,8 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons
- `compactor.ring`
- `distributor.ring`
- `index-gateway.ring`
- `ingest-limits`
- `ingest-limits-frontend`
- `ingester.partition-ring`
- `pattern-ingester`
- `query-scheduler.ring`
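With `ingest-limits` and `ingest-limits-frontend` added to this prefix list, the shared consul block can now be configured per ring. A hedged sketch, assuming the standard `<consul>` block's `host` key (the address is illustrative):

```yaml
ingest_limits:
  lifecycler:
    ring:
      kvstore:
        store: consul
        consul:
          host: localhost:8500  # key assumed from the shared <consul> block
```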
@@ -2234,6 +2570,8 @@ Configuration for an ETCD v3 client. Only applies if the selected kvstore is `et
- `compactor.ring`
- `distributor.ring`
- `index-gateway.ring`
- `ingest-limits`
- `ingest-limits-frontend`
- `ingester.partition-ring`
- `pattern-ingester`
- `query-scheduler.ring`
@@ -2476,6 +2814,8 @@ The `grpc_client` block configures the gRPC client used to communicate between a
- `boltdb.shipper.index-gateway-client.grpc`
- `compactor.grpc-client`
- `frontend.grpc-client-config`
- `ingest-limits-frontend-client`
- `ingest-limits-frontend.limits-client`
- `ingester.client`
- `pattern-ingester.client`
- `querier.frontend-client`
@@ -6478,6 +6818,10 @@ The TLS configuration. The supported CLI flags `<prefix>` used to reference this
- `frontend.tail-tls-config`
- `frontend.volume-results-cache.memcached`
- `index-gateway.ring.etcd`
- `ingest-limits-frontend-client`
- `ingest-limits-frontend.etcd`
- `ingest-limits-frontend.limits-client`
- `ingest-limits.etcd`
- `ingester.client`
- `ingester.partition-ring.etcd`
- `memberlist`

@@ -5,6 +5,7 @@ package client
import (
"flag"
"fmt"
"io"
"time"
@@ -62,6 +63,13 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.PoolConfig.RegisterFlagsWithPrefix(prefix, f)
}
func (cfg *Config) Validate() error {
if err := cfg.GRPCClientConfig.Validate(); err != nil {
return fmt.Errorf("invalid gRPC client config: %w", err)
}
return nil
}
// PoolConfig contains the config for a pool of ingest-limits-frontend clients.
type PoolConfig struct {
ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period"`

@@ -151,6 +151,20 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, r *logproto.ExceedsLimitsR
return f.limits.ExceedsLimits(ctx, r)
}
func (f *Frontend) CheckReady(ctx context.Context) error {
if f.State() != services.Running && f.State() != services.Stopping {
return fmt.Errorf("ingest limits frontend not ready: %v", f.State())
}
err := f.lifecycler.CheckReady(ctx)
if err != nil {
level.Error(f.logger).Log("msg", "ingest limits frontend not ready", "err", err)
return err
}
return nil
}
type exceedsLimitsRequest struct {
TenantID string `json:"tenantID"`
StreamHashes []uint64 `json:"streamHashes"`

@@ -85,6 +85,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.NumPartitions, "ingest-limits.num-partitions", 64, "The number of partitions for the Kafka topic used to read and write stream metadata. It is fixed, not a maximum.")
}
func (cfg *Config) Validate() error {
return nil
}
type metrics struct {
tenantStreamEvictionsTotal *prometheus.CounterVec

@@ -262,6 +262,38 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, merg
r.Pattern.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
}
// IngestLimits
if mergeWithExisting || reflect.DeepEqual(r.IngestLimits.LifecyclerConfig.RingConfig, defaults.IngestLimits.LifecyclerConfig.RingConfig) {
r.IngestLimits.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
r.IngestLimits.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
r.IngestLimits.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout
r.IngestLimits.LifecyclerConfig.TokensFilePath = rc.TokensFilePath
r.IngestLimits.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.IngestLimits.LifecyclerConfig.ID = rc.InstanceID
r.IngestLimits.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames
r.IngestLimits.LifecyclerConfig.Port = rc.InstancePort
r.IngestLimits.LifecyclerConfig.Addr = rc.InstanceAddr
r.IngestLimits.LifecyclerConfig.Zone = rc.InstanceZone
r.IngestLimits.LifecyclerConfig.ListenPort = rc.ListenPort
r.IngestLimits.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
}
// IngestLimitsFrontend
if mergeWithExisting || reflect.DeepEqual(r.IngestLimitsFrontend.LifecyclerConfig.RingConfig, defaults.IngestLimitsFrontend.LifecyclerConfig.RingConfig) {
r.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
r.IngestLimitsFrontend.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
r.IngestLimitsFrontend.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout
r.IngestLimitsFrontend.LifecyclerConfig.TokensFilePath = rc.TokensFilePath
r.IngestLimitsFrontend.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.IngestLimitsFrontend.LifecyclerConfig.ID = rc.InstanceID
r.IngestLimitsFrontend.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames
r.IngestLimitsFrontend.LifecyclerConfig.Port = rc.InstancePort
r.IngestLimitsFrontend.LifecyclerConfig.Addr = rc.InstanceAddr
r.IngestLimitsFrontend.LifecyclerConfig.Zone = rc.InstanceZone
r.IngestLimitsFrontend.LifecyclerConfig.ListenPort = rc.ListenPort
r.IngestLimitsFrontend.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
}
// Distributor
if mergeWithExisting || reflect.DeepEqual(r.Distributor.DistributorRing, defaults.Distributor.DistributorRing) {
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
@@ -332,6 +364,20 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
}
cfg.Ingester.LifecyclerConfig.TokensFilePath = f
// IngestLimits
f, err = tokensFile(cfg, "ingestlimits.tokens")
if err != nil {
return err
}
cfg.IngestLimits.LifecyclerConfig.TokensFilePath = f
// IngestLimitsFrontend
f, err = tokensFile(cfg, "ingestlimitsfrontend.tokens")
if err != nil {
return err
}
cfg.IngestLimitsFrontend.LifecyclerConfig.TokensFilePath = f
// Compactor
f, err = tokensFile(cfg, "compactor.tokens")
if err != nil {
@@ -414,6 +460,15 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
if reflect.DeepEqual(cfg.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
cfg.Ingester.LifecyclerConfig.InfNames = append(cfg.Ingester.LifecyclerConfig.InfNames, loopbackIface)
}
if reflect.DeepEqual(cfg.IngestLimits.LifecyclerConfig.InfNames, defaults.IngestLimits.LifecyclerConfig.InfNames) {
cfg.IngestLimits.LifecyclerConfig.InfNames = append(cfg.IngestLimits.LifecyclerConfig.InfNames, loopbackIface)
}
if reflect.DeepEqual(cfg.IngestLimitsFrontend.LifecyclerConfig.InfNames, defaults.IngestLimitsFrontend.LifecyclerConfig.InfNames) {
cfg.IngestLimitsFrontend.LifecyclerConfig.InfNames = append(cfg.IngestLimitsFrontend.LifecyclerConfig.InfNames, loopbackIface)
}
if reflect.DeepEqual(cfg.Pattern.LifecyclerConfig.InfNames, defaults.Pattern.LifecyclerConfig.InfNames) {
cfg.Pattern.LifecyclerConfig.InfNames = append(cfg.Pattern.LifecyclerConfig.InfNames, loopbackIface)
}
@@ -453,6 +508,8 @@
// (for example, use consul for the distributor), it seems harmless to take a guess at better defaults here.
func applyMemberlistConfig(r *ConfigWrapper) {
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Pattern.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
r.Ruler.Ring.KVStore.Store = memberlistStr
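As the tests below confirm, the practical effect of this wiring is that a single common ring definition now also reaches both new rings. A minimal sketch of a config relying on that inheritance (values illustrative):

```yaml
# The ingest-limits and ingest-limits-frontend rings inherit this block,
# so neither needs its own lifecycler/ring section.
common:
  ring:
    kvstore:
      store: memberlist
    heartbeat_period: 5s
```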

@@ -148,11 +148,15 @@ common:
// * ingester
// * distributor
// * ruler
// * ingest limits
// * ingest limits frontend
t.Run("does not automatically configure memberlist when no top-level memberlist config is provided", func(t *testing.T) {
config, defaults := testContext(emptyConfigString, nil)
assert.EqualValues(t, defaults.Ingester.LifecyclerConfig.RingConfig.KVStore.Store, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, defaults.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store, config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, defaults.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store, config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
})
t.Run("when top-level memberlist join_members are provided, all applicable rings are defaulted to use memberlist", func(t *testing.T) {
@@ -166,6 +170,8 @@ memberlist:
assert.EqualValues(t, memberlistStr, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.Distributor.DistributorRing.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.Ruler.Ring.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
})
t.Run("explicit ring configs provided via config file are preserved", func(t *testing.T) {
@@ -184,6 +190,8 @@ distributor:
assert.EqualValues(t, memberlistStr, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.Ruler.Ring.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
})
t.Run("explicit ring configs provided via command line are preserved", func(t *testing.T) {
@@ -199,6 +207,8 @@ memberlist:
assert.EqualValues(t, memberlistStr, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.Distributor.DistributorRing.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, memberlistStr, config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
})
})
@@ -1515,6 +1525,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, "", config.Ingester.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "", config.IngestLimits.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "", config.IngestLimitsFrontend.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "", config.CompactorConfig.CompactorRing.TokensFilePath)
assert.Equal(t, "", config.QueryScheduler.SchedulerRing.TokensFilePath)
assert.Equal(t, "", config.IndexGateway.Ring.TokensFilePath)
@@ -1530,6 +1542,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, "/loki/ingester.tokens", config.Ingester.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "/loki/ingestlimits.tokens", config.IngestLimits.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "/loki/ingestlimitsfrontend.tokens", config.IngestLimitsFrontend.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "/loki/compactor.tokens", config.CompactorConfig.CompactorRing.TokensFilePath)
assert.Equal(t, "/loki/scheduler.tokens", config.QueryScheduler.SchedulerRing.TokensFilePath)
assert.Equal(t, "/loki/indexgateway.tokens", config.IndexGateway.Ring.TokensFilePath)
@@ -1540,6 +1554,12 @@ common:
ingester:
lifecycler:
tokens_file_path: /loki/toookens
ingest_limits:
lifecycler:
tokens_file_path: /limits/toookens
ingest_limits_frontend:
lifecycler:
tokens_file_path: /limitsfrontend/toookens
compactor:
compactor_ring:
tokens_file_path: /foo/tokens
@@ -1557,6 +1577,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, "/loki/toookens", config.Ingester.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "/limits/toookens", config.IngestLimits.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "/limitsfrontend/toookens", config.IngestLimitsFrontend.LifecyclerConfig.TokensFilePath)
assert.Equal(t, "/foo/tokens", config.CompactorConfig.CompactorRing.TokensFilePath)
assert.Equal(t, "/sched/tokes", config.QueryScheduler.SchedulerRing.TokensFilePath)
assert.Equal(t, "/looki/tookens", config.IndexGateway.Ring.TokensFilePath)
@@ -1574,6 +1596,8 @@ ingester:
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store)
@@ -1597,6 +1621,8 @@ ingester:
assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "memberlist", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "memberlist", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "memberlist", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "memberlist", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "memberlist", config.QueryScheduler.SchedulerRing.KVStore.Store)
@@ -1616,6 +1642,8 @@ func TestRingInterfaceNames(t *testing.T) {
assert.Contains(t, config.Common.Ring.InstanceInterfaceNames, defaultIface)
assert.Contains(t, config.Ingester.LifecyclerConfig.InfNames, defaultIface)
assert.Contains(t, config.IngestLimits.LifecyclerConfig.InfNames, defaultIface)
assert.Contains(t, config.IngestLimitsFrontend.LifecyclerConfig.InfNames, defaultIface)
assert.Contains(t, config.Distributor.DistributorRing.InstanceInterfaceNames, defaultIface)
assert.Contains(t, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames, defaultIface)
assert.Contains(t, config.Ruler.Ring.InstanceInterfaceNames, defaultIface)
@@ -1630,6 +1658,8 @@ func TestRingInterfaceNames(t *testing.T) {
config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)
assert.Equal(t, config.Distributor.DistributorRing.InstanceInterfaceNames, []string{"ingesteriface"})
assert.Equal(t, config.IngestLimits.LifecyclerConfig.InfNames, []string{"ingesteriface"})
assert.Equal(t, config.IngestLimitsFrontend.LifecyclerConfig.InfNames, []string{"ingesteriface"})
assert.Equal(t, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames, []string{"ingesteriface"})
assert.Equal(t, config.Ruler.Ring.InstanceInterfaceNames, []string{"ingesteriface"})
assert.Equal(t, config.Ingester.LifecyclerConfig.InfNames, []string{"ingesteriface"})
@@ -1640,6 +1670,14 @@ func TestRingInterfaceNames(t *testing.T) {
ring:
instance_interface_names:
- distributoriface
ingest_limits:
lifecycler:
interface_names:
- ingestlimitsiface
ingest_limits_frontend:
lifecycler:
interface_names:
- ingestlimitsfrontendiface
ruler:
ring:
instance_interface_names:
@@ -1656,6 +1694,8 @@ ingester:
config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)
assert.Equal(t, config.Ingester.LifecyclerConfig.InfNames, []string{"ingesteriface"})
assert.Equal(t, config.IngestLimits.LifecyclerConfig.InfNames, []string{"ingestlimitsiface"})
assert.Equal(t, config.IngestLimitsFrontend.LifecyclerConfig.InfNames, []string{"ingestlimitsfrontendiface"})
assert.Equal(t, config.Distributor.DistributorRing.InstanceInterfaceNames, []string{"distributoriface"})
assert.Equal(t, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames, []string{"scheduleriface"})
assert.Equal(t, config.Ruler.Ring.InstanceInterfaceNames, []string{"ruleriface"})
@@ -1666,6 +1706,14 @@ ingester:
ring:
instance_interface_names:
- distributoriface
ingest_limits:
lifecycler:
interface_names:
- ingestlimitsiface
ingest_limits_frontend:
lifecycler:
interface_names:
- ingestlimitsfrontendiface
ruler:
ring:
instance_interface_names:
@@ -1677,6 +1725,8 @@ query_scheduler:
config, _, err := configWrapperFromYAML(t, yamlContent, []string{})
assert.NoError(t, err)
assert.Equal(t, config.IngestLimits.LifecyclerConfig.InfNames, []string{"ingestlimitsiface"})
assert.Equal(t, config.IngestLimitsFrontend.LifecyclerConfig.InfNames, []string{"ingestlimitsfrontendiface"})
assert.Equal(t, config.Distributor.DistributorRing.InstanceInterfaceNames, []string{"distributoriface"})
assert.Equal(t, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames, []string{"scheduleriface"})
assert.Equal(t, config.Ruler.Ring.InstanceInterfaceNames, []string{"ruleriface"})
@@ -1749,6 +1799,8 @@ func TestCommonRingConfigSection(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
@@ -1775,6 +1827,8 @@ ingester:
assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
assert.Equal(t, "etcd", config.IndexGateway.Ring.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
})
t.Run("if only ingester ring is provided, reuse it for all rings", func(t *testing.T) {
@@ -1787,6 +1841,8 @@ ingester:
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
@@ -1814,6 +1870,12 @@ ingester:
assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, 5*time.Minute, config.Ingester.LifecyclerConfig.HeartbeatPeriod)
assert.Equal(t, "inmemory", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, 5*time.Minute, config.IngestLimits.LifecyclerConfig.HeartbeatPeriod)
assert.Equal(t, "inmemory", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, 5*time.Minute, config.IngestLimitsFrontend.LifecyclerConfig.HeartbeatPeriod)
assert.Equal(t, "inmemory", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, 5*time.Minute, config.Ruler.Ring.HeartbeatPeriod)
@@ -1848,6 +1910,12 @@ distributor:
assert.Equal(t, "inmemory", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, 5*time.Minute, config.Ingester.LifecyclerConfig.HeartbeatPeriod)
assert.Equal(t, "inmemory", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, 5*time.Minute, config.IngestLimits.LifecyclerConfig.HeartbeatPeriod)
assert.Equal(t, "inmemory", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, 5*time.Minute, config.IngestLimitsFrontend.LifecyclerConfig.HeartbeatPeriod)
assert.Equal(t, "inmemory", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, 5*time.Minute, config.Ruler.Ring.HeartbeatPeriod)
@@ -1872,6 +1940,8 @@ distributor:
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "consul", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "consul", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "consul", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store)
@@ -1890,6 +1960,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, "etcd", config.Distributor.DistributorRing.KVStore.Store)
assert.Equal(t, "etcd", config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store)
assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store)
assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store)
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
@@ -2009,6 +2081,12 @@ func Test_instanceAddr(t *testing.T) {
ingester:
lifecycler:
address: myingester
ingest_limits:
lifecycler:
address: myingestlimits
ingest_limits_frontend:
lifecycler:
address: myingestlimitsfrontend
memberlist:
advertise_addr: mymemberlist
ruler:
@@ -2033,6 +2111,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, "mydistributor", config.Distributor.DistributorRing.InstanceAddr)
assert.Equal(t, "myingester", config.Ingester.LifecyclerConfig.Addr)
assert.Equal(t, "myingestlimits", config.IngestLimits.LifecyclerConfig.Addr)
assert.Equal(t, "myingestlimitsfrontend", config.IngestLimitsFrontend.LifecyclerConfig.Addr)
assert.Equal(t, "mymemberlist", config.MemberlistKV.AdvertiseAddr)
assert.Equal(t, "myruler", config.Ruler.Ring.InstanceAddr)
assert.Equal(t, "myscheduler", config.QueryScheduler.SchedulerRing.InstanceAddr)
@@ -2048,6 +2128,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, "99.99.99.99", config.Distributor.DistributorRing.InstanceAddr)
assert.Equal(t, "99.99.99.99", config.Ingester.LifecyclerConfig.Addr)
assert.Equal(t, "99.99.99.99", config.IngestLimits.LifecyclerConfig.Addr)
assert.Equal(t, "99.99.99.99", config.IngestLimitsFrontend.LifecyclerConfig.Addr)
assert.Equal(t, "99.99.99.99", config.MemberlistKV.AdvertiseAddr)
assert.Equal(t, "99.99.99.99", config.Ruler.Ring.InstanceAddr)
assert.Equal(t, "99.99.99.99", config.QueryScheduler.SchedulerRing.InstanceAddr)
@@ -2066,6 +2148,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, "22.22.22.22", config.Distributor.DistributorRing.InstanceAddr)
assert.Equal(t, "22.22.22.22", config.Ingester.LifecyclerConfig.Addr)
assert.Equal(t, "22.22.22.22", config.IngestLimits.LifecyclerConfig.Addr)
assert.Equal(t, "22.22.22.22", config.IngestLimitsFrontend.LifecyclerConfig.Addr)
assert.Equal(t, "99.99.99.99", config.MemberlistKV.AdvertiseAddr) /// not a ring.
assert.Equal(t, "22.22.22.22", config.Ruler.Ring.InstanceAddr)
assert.Equal(t, "22.22.22.22", config.QueryScheduler.SchedulerRing.InstanceAddr)
@@ -2085,6 +2169,14 @@ ingester:
lifecycler:
interface_names:
- myingester
ingest_limits:
lifecycler:
interface_names:
- myingestlimits
ingest_limits_frontend:
lifecycler:
interface_names:
- myingestlimitsfrontend
ruler:
ring:
instance_interface_names:
@@ -2114,6 +2206,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, []string{"mydistributor"}, config.Distributor.DistributorRing.InstanceInterfaceNames)
assert.Equal(t, []string{"myingester"}, config.Ingester.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"myingestlimits"}, config.IngestLimits.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"myingestlimitsfrontend"}, config.IngestLimitsFrontend.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"myruler"}, config.Ruler.Ring.InstanceInterfaceNames)
assert.Equal(t, []string{"myscheduler"}, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames)
assert.Equal(t, []string{"myfrontend"}, config.Frontend.FrontendV2.InfNames)
@@ -2129,6 +2223,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, []string{"commoninterface"}, config.Distributor.DistributorRing.InstanceInterfaceNames)
assert.Equal(t, []string{"commoninterface"}, config.Ingester.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"commoninterface"}, config.IngestLimits.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"commoninterface"}, config.IngestLimitsFrontend.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"commoninterface"}, config.Ruler.Ring.InstanceInterfaceNames)
assert.Equal(t, []string{"commoninterface"}, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames)
assert.Equal(t, []string{"commoninterface"}, config.Frontend.FrontendV2.InfNames)
@@ -2148,6 +2244,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, []string{"ringsshouldusethis"}, config.Distributor.DistributorRing.InstanceInterfaceNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.Ingester.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.IngestLimits.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.IngestLimitsFrontend.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.Ruler.Ring.InstanceInterfaceNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames)
assert.Equal(t, []string{"ringsshouldntusethis"}, config.Frontend.FrontendV2.InfNames) // not a ring.
@@ -2167,6 +2265,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, []string{"interface"}, config.Distributor.DistributorRing.InstanceInterfaceNames)
assert.Equal(t, []string{"interface"}, config.Ingester.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"interface"}, config.IngestLimits.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"interface"}, config.IngestLimitsFrontend.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"interface"}, config.Ruler.Ring.InstanceInterfaceNames)
assert.Equal(t, []string{"interface"}, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames)
assert.Equal(t, []string{"interface"}, config.Frontend.FrontendV2.InfNames)
@@ -2187,6 +2287,8 @@ common:
assert.NoError(t, err)
assert.Equal(t, []string{"ringsshouldusethis"}, config.Distributor.DistributorRing.InstanceInterfaceNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.Ingester.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.IngestLimits.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.IngestLimitsFrontend.LifecyclerConfig.InfNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.Ruler.Ring.InstanceInterfaceNames)
assert.Equal(t, []string{"ringsshouldusethis"}, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames)
assert.Equal(t, []string{"ringsshouldntusethis"}, config.Frontend.FrontendV2.InfNames) // not a ring.

@@ -45,6 +45,9 @@ import (
"github.com/grafana/loki/v3/pkg/ingester"
ingester_client "github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/limits"
limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend"
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/loki/common"
"github.com/grafana/loki/v3/pkg/lokifrontend"
@@ -114,6 +117,10 @@ type Config struct {
KafkaConfig kafka.Config `yaml:"kafka_config,omitempty" category:"experimental"`
DataObj dataobjconfig.Config `yaml:"dataobj,omitempty" category:"experimental"`
IngestLimits limits.Config `yaml:"ingest_limits,omitempty" category:"experimental"`
IngestLimitsFrontend limits_frontend.Config `yaml:"ingest_limits_frontend,omitempty" category:"experimental"`
IngestLimitsFrontendClient limits_frontend_client.Config `yaml:"ingest_limits_frontend_client,omitempty" category:"experimental"`
RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"`
OperationalConfig runtime.Config `yaml:"operational_config,omitempty"`
Tracing tracing.Config `yaml:"tracing"`
@@ -194,6 +201,9 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.KafkaConfig.RegisterFlags(f)
c.BlockBuilder.RegisterFlags(f)
c.BlockScheduler.RegisterFlags(f)
c.IngestLimits.RegisterFlags(f)
c.IngestLimitsFrontend.RegisterFlags(f)
c.IngestLimitsFrontendClient.RegisterFlags(f)
c.UI.RegisterFlags(f)
c.DataObj.RegisterFlags(f)
}
@@ -280,6 +290,15 @@ func (c *Config) Validate() error {
if err := c.LimitsConfig.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid limits_config config"))
}
if err := c.IngestLimits.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits config"))
}
if err := c.IngestLimitsFrontend.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend config"))
}
if err := c.IngestLimitsFrontendClient.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend_client config"))
}
if err := c.Worker.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid frontend_worker config"))
}
@@ -373,6 +392,10 @@ type Loki struct {
tenantConfigs *runtime.TenantConfigs
TenantLimits validation.TenantLimits
distributor *distributor.Distributor
ingestLimits *limits.IngestLimits
ingestLimitsRing *ring.Ring
ingestLimitsFrontend *limits_frontend.Frontend
ingestLimitsFrontendRing *ring.Ring
Ingester ingester.Interface
PatternIngester *pattern.Ingester
PatternRingClient pattern.RingClient
@@ -669,6 +692,23 @@ func (t *Loki) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool
}
}
// Ingest Limits has a special check that makes sure that it was able to register into the ring
if t.ingestLimits != nil {
if err := t.ingestLimits.CheckReady(r.Context()); err != nil {
http.Error(w, "Ingest Limits not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}
}
// Ingest Limits Frontend has a special check that makes sure that it was able to register into
// the ring
if t.ingestLimitsFrontend != nil {
if err := t.ingestLimitsFrontend.CheckReady(r.Context()); err != nil {
http.Error(w, "Ingest Limits Frontend not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}
}
http.Error(w, "ready", http.StatusOK)
}
}
@@ -685,10 +725,14 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)
mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule)
mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule)
mm.RegisterModule(IngestLimitsRing, t.initIngestLimitsRing, modules.UserInvisibleModule)
mm.RegisterModule(IngestLimitsFrontendRing, t.initIngestLimitsFrontendRing, modules.UserInvisibleModule)
mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule)
mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(IngestLimits, t.initIngestLimits)
mm.RegisterModule(IngestLimitsFrontend, t.initIngestLimitsFrontend)
mm.RegisterModule(Store, t.initStore, modules.UserInvisibleModule)
mm.RegisterModule(Querier, t.initQuerier)
mm.RegisterModule(Ingester, t.initIngester)
@@ -737,6 +781,10 @@ func (t *Loki) setupModuleManager() error {
TenantConfigs: {RuntimeConfig},
UI: {Server},
Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, Analytics, PartitionRing, UI},
IngestLimitsRing: {RuntimeConfig, Server, MemberlistKV},
IngestLimits: {MemberlistKV, Server},
IngestLimitsFrontend: {IngestLimitsRing, Overrides, Server, MemberlistKV},
IngestLimitsFrontendRing: {RuntimeConfig, Server, MemberlistKV},
Store: {Overrides, IndexGatewayRing},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics, PartitionRing, UI},
Querier: {Store, Ring, Server, IngesterQuerier, PatternRingClient, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing, UI},
@@ -772,6 +820,10 @@ func (t *Loki) setupModuleManager() error {
All: {QueryScheduler, QueryFrontend, Querier, Ingester, PatternIngester, Distributor, Ruler, Compactor, UI},
}
if t.Cfg.IngestLimits.Enabled {
deps[All] = append(deps[All], IngestLimits, IngestLimitsFrontend)
}
if t.Cfg.Querier.PerRequestLimitsEnabled {
level.Debug(util_log.Logger).Log("msg", "per-query request limits support enabled")
mm.RegisterModule(QueryLimiter, t.initQueryLimiter, modules.UserInvisibleModule)

@@ -62,6 +62,8 @@ import (
"github.com/grafana/loki/v3/pkg/indexgateway"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/limits"
limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
@@ -115,6 +117,10 @@ const (
Server = "server"
InternalServer = "internal-server"
Distributor = "distributor"
IngestLimits = "ingest-limits"
IngestLimitsRing = "ingest-limits-ring"
IngestLimitsFrontend = "ingest-limits-frontend"
IngestLimitsFrontendRing = "ingest-limits-frontend-ring"
Ingester = "ingester"
PatternIngester = "pattern-ingester"
PatternRingClient = "pattern-ring-client"
@@ -308,6 +314,8 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.QueryScheduler.SchedulerRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Ruler.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
return t.runtimeConfig, err
}
@@ -398,6 +406,118 @@ func (t *Loki) initDistributor() (services.Service, error) {
return t.distributor, nil
}
func (t *Loki) initIngestLimitsRing() (_ services.Service, err error) {
if !t.Cfg.IngestLimits.Enabled {
return nil, nil
}
reg := prometheus.WrapRegistererWithPrefix(t.Cfg.MetricsNamespace+"_", prometheus.DefaultRegisterer)
t.ingestLimitsRing, err = ring.New(
t.Cfg.IngestLimits.LifecyclerConfig.RingConfig,
limits.RingName,
limits.RingKey,
util_log.Logger,
reg,
)
if err != nil {
return nil, fmt.Errorf("failed to create %s ring: %w", limits.RingName, err)
}
t.Server.HTTP.Path("/ingest-limits/ring").Methods("GET", "POST").Handler(t.ingestLimitsRing)
if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/ingest-limits/ring").Methods("GET", "POST").Handler(t.ingestLimitsRing)
}
return t.ingestLimitsRing, nil
}
func (t *Loki) initIngestLimits() (services.Service, error) {
if !t.Cfg.IngestLimits.Enabled {
return nil, nil
}
t.Cfg.IngestLimits.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.IngestLimits.KafkaConfig = t.Cfg.KafkaConfig
ingestLimits, err := limits.NewIngestLimits(
t.Cfg.IngestLimits,
util_log.Logger,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}
t.ingestLimits = ingestLimits
logproto.RegisterIngestLimitsServer(t.Server.GRPC, ingestLimits)
// Register HTTP handler for metadata
t.Server.HTTP.Path("/ingest/limits").Methods("GET").Handler(ingestLimits)
return ingestLimits, nil
}
func (t *Loki) initIngestLimitsFrontendRing() (_ services.Service, err error) {
if !t.Cfg.IngestLimits.Enabled {
return nil, nil
}
reg := prometheus.WrapRegistererWithPrefix(t.Cfg.MetricsNamespace+"_", prometheus.DefaultRegisterer)
if t.ingestLimitsFrontendRing, err = ring.New(
t.Cfg.IngestLimitsFrontend.LifecyclerConfig.RingConfig,
limits_frontend.RingName,
limits_frontend.RingKey,
util_log.Logger,
reg,
); err != nil {
return nil, fmt.Errorf("failed to create %s ring: %w", limits_frontend.RingName, err)
}
t.Server.HTTP.Path("/ingest-limits-frontend/ring").
Methods("GET", "POST").
Handler(t.ingestLimitsFrontendRing)
if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/ingest-limits-frontend/ring").
Methods("GET", "POST").
Handler(t.ingestLimitsFrontendRing)
}
return t.ingestLimitsFrontendRing, nil
}
func (t *Loki) initIngestLimitsFrontend() (services.Service, error) {
if !t.Cfg.IngestLimits.Enabled {
return nil, nil
}
// Members of the ring are expected to listen on their gRPC server port.
t.Cfg.IngestLimitsFrontend.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
logger := log.With(util_log.Logger, "component", "ingest-limits-frontend")
ingestLimitsFrontend, err := limits_frontend.New(
t.Cfg.IngestLimitsFrontend,
limits.RingName,
t.ingestLimitsRing,
t.Overrides,
logger,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}
t.ingestLimitsFrontend = ingestLimitsFrontend
logproto.RegisterIngestLimitsFrontendServer(t.Server.GRPC, ingestLimitsFrontend)
// Register HTTP handler to check if a tenant exceeds limits
// Returns a JSON response for the frontend to display which
// streams are rejected.
t.Server.HTTP.Path("/ingest/exceeds-limits").Methods("POST").Handler(ingestLimitsFrontend)
return ingestLimitsFrontend, nil
}
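Based on the `exceedsLimitsRequest` struct shown in the frontend.go hunk above, the body posted to this endpoint is presumably JSON of the following shape (a sketch; the tenant and hashes are made-up values):

```json
{
  "tenantID": "tenant-a",
  "streamHashes": [12345, 67890]
}
```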
// initCodec sets the codec used to encode and decode requests.
func (t *Loki) initCodec() (services.Service, error) {
t.Codec = queryrange.DefaultCodec
@@ -1477,6 +1597,9 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Pattern.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.KafkaIngestion.PartitionRingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IngestLimits.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Server.HTTP.Handle("/memberlist", t.MemberlistKV)
if t.Cfg.InternalServer.Enable {
