chore(blooms): Remove bloom gateway ring (#12484)

Remove ring in favour of a client side, DNS discovery of the bloom gateway servers combined with consistent hashing using the jumphash algorithm (#12470).

Instead of configuring the ring, the bloom gateway client needs to be configured with the `-bloom-gateway-client.addresses` flag, which takes a list of comma separated, DNS-discovery-style strings.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/12463/head^2
Christian Haudum 2 years ago committed by GitHub
parent 57ae4abbec
commit ff08d2fcc2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 98
      docs/sources/configure/_index.md
  2. 34
      pkg/bloomgateway/bloomgateway_test.go
  3. 13
      pkg/bloomgateway/config.go
  4. 27
      pkg/loki/config_wrapper.go
  5. 9
      pkg/loki/loki.go
  6. 32
      pkg/loki/modules.go
  7. 1
      pkg/loki/modules_test.go

@ -1986,102 +1986,6 @@ ring:
The `bloom_gateway` block configures the Loki bloom gateway server, responsible for serving queries for filtering chunks based on filter expressions.
```yaml
# Defines the ring to be used by the bloom gateway servers and clients. In case
# this isn't configured, this block supports inheriting configuration from the
# common ring section.
ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -bloom-gateway.ring.store
[store: <string> | default = "consul"]
# The prefix for the keys in the store. Should end with a /.
# CLI flag: -bloom-gateway.ring.prefix
[prefix: <string> | default = "collectors/"]
# Configuration for a Consul client. Only applies if the selected kvstore is
# consul.
# The CLI flags prefix for this block configuration is: bloom-gateway.ring
[consul: <consul>]
# Configuration for an ETCD v3 client. Only applies if the selected kvstore
# is etcd.
# The CLI flags prefix for this block configuration is: bloom-gateway.ring
[etcd: <etcd>]
multi:
# Primary backend storage used by multi-client.
# CLI flag: -bloom-gateway.ring.multi.primary
[primary: <string> | default = ""]
# Secondary backend storage used by multi-client.
# CLI flag: -bloom-gateway.ring.multi.secondary
[secondary: <string> | default = ""]
# Mirror writes to secondary store.
# CLI flag: -bloom-gateway.ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]
# Timeout for storing value to secondary store.
# CLI flag: -bloom-gateway.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]
# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -bloom-gateway.ring.heartbeat-period
[heartbeat_period: <duration> | default = 15s]
# The heartbeat timeout after which compactors are considered unhealthy within
# the ring. 0 = never (timeout disabled).
# CLI flag: -bloom-gateway.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]
# File path where tokens are stored. If empty, tokens are not stored at
# shutdown and restored at startup.
# CLI flag: -bloom-gateway.ring.tokens-file-path
[tokens_file_path: <string> | default = ""]
# True to enable zone-awareness and replicate blocks across different
# availability zones.
# CLI flag: -bloom-gateway.ring.zone-awareness-enabled
[zone_awareness_enabled: <boolean> | default = false]
# Number of tokens to use in the ring. The bigger the number of tokens, the
# more fingerprint ranges the compactor will own, but the smaller these ranges
# will be. Bigger number of tokens means that more but smaller requests will
# be handled by each gateway.
# CLI flag: -bloom-gateway.ring.tokens
[num_tokens: <int> | default = 16]
# Factor for data replication.
# CLI flag: -bloom-gateway.ring.replication-factor
[replication_factor: <int> | default = 3]
# Instance ID to register in the ring.
# CLI flag: -bloom-gateway.ring.instance-id
[instance_id: <string> | default = "<hostname>"]
# Name of network interface to read address from.
# CLI flag: -bloom-gateway.ring.instance-interface-names
[instance_interface_names: <list of strings> | default = [<private network interfaces>]]
# Port to advertise in the ring (defaults to server.grpc-listen-port).
# CLI flag: -bloom-gateway.ring.instance-port
[instance_port: <int> | default = 0]
# IP address to advertise in the ring.
# CLI flag: -bloom-gateway.ring.instance-addr
[instance_addr: <string> | default = ""]
# The availability zone where this instance is running. Required if
# zone-awareness is enabled.
# CLI flag: -bloom-gateway.ring.instance-availability-zone
[instance_availability_zone: <string> | default = ""]
# Enable using a IPv6 instance address.
# CLI flag: -bloom-gateway.ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]
# Flag to enable or disable the bloom gateway component globally.
# CLI flag: -bloom-gateway.enabled
[enabled: <boolean> | default = false]
@ -4149,7 +4053,6 @@ ring:
Configuration for a Consul client. Only applies if the selected kvstore is `consul`. The supported CLI flags `<prefix>` used to reference this configuration block are:
- `bloom-compactor.ring`
- `bloom-gateway.ring`
- `common.storage.ring`
- `compactor.ring`
- `distributor.ring`
@ -4196,7 +4099,6 @@ Configuration for a Consul client. Only applies if the selected kvstore is `cons
Configuration for an ETCD v3 client. Only applies if the selected kvstore is `etcd`. The supported CLI flags `<prefix>` used to reference this configuration block are:
- `bloom-compactor.ring`
- `bloom-gateway.ring`
- `common.storage.ring`
- `compactor.ring`
- `distributor.ring`

@ -10,9 +10,6 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
@ -29,7 +26,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
"github.com/grafana/loki/v3/pkg/validation"
)
@ -99,20 +95,8 @@ func TestBloomGateway_StartStopService(t *testing.T) {
reg := prometheus.NewRegistry()
t.Run("start and stop bloom gateway", func(t *testing.T) {
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, reg)
t.Cleanup(func() {
closer.Close()
})
cfg := Config{
Enabled: true,
Ring: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
ReplicationFactor: 1,
NumTokens: 16,
},
Enabled: true,
WorkerConcurrency: 4,
MaxOutstandingPerTenant: 1024,
}
@ -137,22 +121,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
tenantID := "test"
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, reg)
t.Cleanup(func() {
closer.Close()
})
cfg := Config{
Enabled: true,
Ring: lokiring.RingConfig{
KVStore: kv.Config{
Mock: kvStore,
},
ReplicationFactor: 1,
NumTokens: 16,
},
Enabled: true,
WorkerConcurrency: 2,
BlockQueryConcurrency: 2,
MaxOutstandingPerTenant: 1024,

@ -2,16 +2,10 @@ package bloomgateway
import (
"flag"
"github.com/grafana/loki/v3/pkg/util/ring"
)
// Config configures the Bloom Gateway component.
type Config struct {
// Ring configures the ring store used to save and retrieve the different Bloom Gateway instances.
// In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration
// section and the ingester configuration by default).
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom gateway servers and clients. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled is the global switch to configures whether Bloom Gateways should be used to filter chunks.
Enabled bool `yaml:"enabled"`
// Client configures the Bloom Gateway client
@ -38,13 +32,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)
// Ring
skipFlags := []string{
prefix + "ring.tokens",
}
cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f, skipFlags...)
f.IntVar(&cfg.Ring.NumTokens, prefix+"ring.tokens", 16, "Number of tokens to use in the ring. The bigger the number of tokens, the more fingerprint ranges the compactor will own, but the smaller these ranges will be. Bigger number of tokens means that more but smaller requests will be handled by each gateway.")
}
type Limits interface {

@ -182,7 +182,6 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
func applyCommonReplicationFactor(r, defaults *ConfigWrapper) {
if !reflect.DeepEqual(r.Common.ReplicationFactor, defaults.Common.ReplicationFactor) {
r.IndexGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor
r.BloomGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor
}
}
@ -332,20 +331,6 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, merg
r.BloomCompactor.Ring.KVStore = rc.KVStore
r.BloomCompactor.Ring.NumTokens = rc.NumTokens
}
// BloomGateway
if mergeWithExisting || reflect.DeepEqual(r.BloomGateway.Ring, defaults.BloomGateway.Ring) {
r.BloomGateway.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.BloomGateway.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.BloomGateway.Ring.InstancePort = rc.InstancePort
r.BloomGateway.Ring.InstanceAddr = rc.InstanceAddr
r.BloomGateway.Ring.InstanceID = rc.InstanceID
r.BloomGateway.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.BloomGateway.Ring.InstanceZone = rc.InstanceZone
r.BloomGateway.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.BloomGateway.Ring.KVStore = rc.KVStore
r.BloomGateway.Ring.NumTokens = rc.NumTokens
}
}
func applyTokensFilePath(cfg *ConfigWrapper) error {
@ -384,13 +369,6 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
}
cfg.BloomCompactor.Ring.TokensFilePath = f
// Bloom-Gateway
f, err = tokensFile(cfg, "bloomgateway.tokens")
if err != nil {
return err
}
cfg.BloomGateway.Ring.TokensFilePath = f
// Pattern
f, err = tokensFile(cfg, "pattern.tokens")
if err != nil {
@ -487,10 +465,6 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
if reflect.DeepEqual(cfg.BloomCompactor.Ring.InstanceInterfaceNames, defaults.BloomCompactor.Ring.InstanceInterfaceNames) {
cfg.BloomCompactor.Ring.InstanceInterfaceNames = append(cfg.BloomCompactor.Ring.InstanceInterfaceNames, loopbackIface)
}
if reflect.DeepEqual(cfg.BloomGateway.Ring.InstanceInterfaceNames, defaults.BloomGateway.Ring.InstanceInterfaceNames) {
cfg.BloomGateway.Ring.InstanceInterfaceNames = append(cfg.BloomGateway.Ring.InstanceInterfaceNames, loopbackIface)
}
}
// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist.
@ -506,7 +480,6 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
r.IndexGateway.Ring.KVStore.Store = memberlistStr
r.BloomCompactor.Ring.KVStore.Store = memberlistStr
r.BloomGateway.Ring.KVStore.Store = memberlistStr
}
var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")

@ -681,7 +681,6 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(IndexGatewayInterceptors, t.initIndexGatewayInterceptors, modules.UserInvisibleModule)
mm.RegisterModule(BloomGateway, t.initBloomGateway)
mm.RegisterModule(BloomGatewayRing, t.initBloomGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(QuerySchedulerRing, t.initQuerySchedulerRing, modules.UserInvisibleModule)
mm.RegisterModule(Analytics, t.initAnalytics)
@ -713,14 +712,13 @@ func (t *Loki) setupModuleManager() error {
TableManager: {Server, Analytics},
Compactor: {Server, Overrides, MemberlistKV, Analytics},
IndexGateway: {Server, Store, IndexGatewayRing, IndexGatewayInterceptors, Analytics},
BloomGateway: {Server, BloomStore, BloomGatewayRing, Analytics},
BloomGateway: {Server, BloomStore, Analytics},
BloomCompactor: {Server, BloomStore, BloomCompactorRing, Analytics, Store},
PatternIngester: {Server, MemberlistKV, Analytics},
PatternRingClient: {Server, MemberlistKV, Analytics},
IngesterQuerier: {Ring},
QuerySchedulerRing: {Overrides, MemberlistKV},
IndexGatewayRing: {Overrides, MemberlistKV},
BloomGatewayRing: {Overrides, MemberlistKV},
BloomCompactorRing: {Overrides, MemberlistKV},
MemberlistKV: {Server},
@ -777,11 +775,6 @@ func (t *Loki) setupModuleManager() error {
deps[Server] = append(deps[Server], IngesterGRPCInterceptors)
}
// Add bloom gateway ring in client mode to IndexGateway service dependencies if bloom filtering is enabled.
if t.Cfg.BloomGateway.Enabled {
deps[IndexGateway] = append(deps[IndexGateway], BloomGatewayRing)
}
if t.Cfg.LegacyReadTarget {
deps[Read] = append(deps[Read], deps[Backend]...)
}

@ -114,7 +114,6 @@ const (
MemberlistKV string = "memberlist-kv"
Compactor string = "compactor"
BloomGateway string = "bloom-gateway"
BloomGatewayRing string = "bloom-gateway-ring"
IndexGateway string = "index-gateway"
IndexGatewayRing string = "index-gateway-ring"
IndexGatewayInterceptors string = "index-gateway-interceptors"
@ -278,7 +277,6 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) {
t.Cfg.BloomCompactor.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.IndexGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.BloomGateway.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.QueryScheduler.SchedulerRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Ruler.Ring.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
@ -1293,7 +1291,6 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.BloomGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.BloomCompactor.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Pattern.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Server.HTTP.Handle("/memberlist", t.MemberlistKV)
@ -1386,35 +1383,6 @@ func (t *Loki) initBloomGateway() (services.Service, error) {
return gateway, nil
}
func (t *Loki) initBloomGatewayRing() (services.Service, error) {
if !t.Cfg.BloomGateway.Enabled {
return nil, nil
}
// Inherit ring listen port from gRPC config
t.Cfg.BloomGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
// TODO(chaudum): Do we want to integration the bloom gateway component into the backend target?
mode := lokiring.ClientMode
legacyReadMode := t.Cfg.LegacyReadTarget && t.isModuleActive(Read)
if t.Cfg.isModuleEnabled(BloomGateway) || t.Cfg.isModuleEnabled(Backend) || legacyReadMode {
mode = lokiring.ServerMode
}
manager, err := lokiring.NewRingManager(bloomGatewayRingKey, mode, t.Cfg.BloomGateway.Ring, t.Cfg.BloomGateway.Ring.ReplicationFactor, t.Cfg.BloomGateway.Ring.NumTokens, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "error initializing bloom gateway ring manager")
}
t.bloomGatewayRingManager = manager
t.Server.HTTP.Path("/bloomgateway/ring").Methods("GET", "POST").Handler(t.bloomGatewayRingManager)
if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/bloomgateway/ring").Methods("GET", "POST").Handler(t.bloomGatewayRingManager)
}
return t.bloomGatewayRingManager, nil
}
func (t *Loki) initIndexGateway() (services.Service, error) {
shardingStrategy := indexgateway.GetShardingStrategy(t.Cfg.IndexGateway, t.indexGatewayRingManager, t.Overrides)

@ -410,7 +410,6 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
cfg.IndexGateway.Mode = indexgateway.SimpleMode
cfg.IndexGateway.Ring.InstanceAddr = localhost
cfg.BloomCompactor.Ring.InstanceAddr = localhost
cfg.BloomGateway.Ring.InstanceAddr = localhost
cfg.CompactorConfig.CompactorRing.InstanceAddr = localhost
cfg.CompactorConfig.WorkingDirectory = filepath.Join(dir, "compactor")

Loading…
Cancel
Save