Loki: Apply the ingester ring config to all other rings (distributor, ruler, query-scheduler) (#4546)

* apply the ingester ring config to all other rings in ApplyDynamicConfig

* update the test, only verify ingester config wasn't manipulated by the step to apply the common memberlist config when join_members is defined.
pull/4563/head
Ed Welch 4 years ago committed by GitHub
parent 2864a4f728
commit 6853c387ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      pkg/loki/config_wrapper.go
  2. 13
      pkg/loki/config_wrapper_test.go

@ -85,6 +85,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
r.QueryScheduler.UseSchedulerRing = true
}
applyIngesterRingConfig(r)
applyMemberlistConfig(r)
if err := applyStorageConfig(r, &defaults); err != nil {
@ -101,6 +102,53 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
}
}
// applyIngesterRingConfig will use whatever config is setup for the ingester ring and use it everywhere else
// we have a ring configured. The reason for centralizing on the ingester ring as this is been set in basically
// all of our provided config files for all of time, usually set to `inmemory` for all the single binary Loki's
// and is the most central ring config for Loki.
func applyIngesterRingConfig(r *ConfigWrapper) {
lc := r.Ingester.LifecyclerConfig
rc := r.Ingester.LifecyclerConfig.RingConfig
s := rc.KVStore.Store
sc := r.Ingester.LifecyclerConfig.RingConfig.KVStore.StoreConfig
// This gets ugly because we use a separate struct for configuring each ring...
// Distributor
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.Distributor.DistributorRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.Distributor.DistributorRing.InstancePort = lc.Port
r.Distributor.DistributorRing.InstanceAddr = lc.Addr
r.Distributor.DistributorRing.InstanceID = lc.ID
r.Distributor.DistributorRing.InstanceInterfaceNames = lc.InfNames
r.Distributor.DistributorRing.KVStore.Store = s
r.Distributor.DistributorRing.KVStore.StoreConfig = sc
// Ruler
r.Ruler.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Ruler.Ring.HeartbeatPeriod = lc.HeartbeatPeriod
r.Ruler.Ring.InstancePort = lc.Port
r.Ruler.Ring.InstanceAddr = lc.Addr
r.Ruler.Ring.InstanceID = lc.ID
r.Ruler.Ring.InstanceInterfaceNames = lc.InfNames
r.Ruler.Ring.NumTokens = lc.NumTokens
r.Ruler.Ring.KVStore.Store = s
r.Ruler.Ring.KVStore.StoreConfig = sc
// Query Scheduler
r.QueryScheduler.SchedulerRing.HeartbeatTimeout = rc.HeartbeatTimeout
r.QueryScheduler.SchedulerRing.HeartbeatPeriod = lc.HeartbeatPeriod
r.QueryScheduler.SchedulerRing.InstancePort = lc.Port
r.QueryScheduler.SchedulerRing.InstanceAddr = lc.Addr
r.QueryScheduler.SchedulerRing.InstanceID = lc.ID
r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = lc.InfNames
r.QueryScheduler.SchedulerRing.InstanceZone = lc.Zone
r.QueryScheduler.SchedulerRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.QueryScheduler.SchedulerRing.TokensFilePath = lc.TokensFilePath
r.QueryScheduler.SchedulerRing.KVStore.Store = s
r.QueryScheduler.SchedulerRing.KVStore.StoreConfig = sc
}
// applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist
// if the -memberlist.join_members config is provided. The idea here is that if a user explicitly configured the
// memberlist configuration section, they probably want to be using memberlist for all their ring configurations.

@ -2,12 +2,15 @@ package loki
import (
"flag"
"fmt"
"io/ioutil"
"net/url"
"os"
"reflect"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -118,8 +121,6 @@ common:
config, defaults := testContext(emptyConfigString, nil)
assert.EqualValues(t, defaults.Ingester.LifecyclerConfig.RingConfig.KVStore.Store, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store)
assert.EqualValues(t, defaults.Distributor.DistributorRing.KVStore.Store, config.Distributor.DistributorRing.KVStore.Store)
assert.EqualValues(t, defaults.Ruler.Ring.KVStore.Store, config.Ruler.Ring.KVStore.Store)
})
t.Run("when top-level memberlist join_members are provided, all applicable rings are defaulted to use memberlist", func(t *testing.T) {
@ -818,3 +819,11 @@ func TestDefaultUnmarshal(t *testing.T) {
assert.Equal(t, 9095, config.Server.GRPCListenPort)
})
}
func Test_applyIngesterRingConfig(t *testing.T) {
msgf := "%s has changed, this is a crude attempt to catch mapping errors missed in config_wrapper.applyIngesterRingConfig when a ring config changes. Please add a new mapping and update the expected value in this test."
assert.Equal(t, 8, reflect.TypeOf(distributor.RingConfig{}).NumField(), fmt.Sprintf(msgf, reflect.TypeOf(distributor.RingConfig{}).String()))
}

Loading…
Cancel
Save