Loki: Remove ring client from the distributor (#6622)

Removes the ring client from the distributor.
- It was originally added on PR #4938 because it was the only ring implementation with HTTPHandler support (used for the ring page). Now that DSKit has support for the other ring implementations (including the lifecycler) this isn't necessary anymore.
- It had an IgnoreUnhealthyInstancesReplicationStrategy attached to it, added by mistake
pull/6664/head
Dylan Guedes 4 years ago committed by GitHub
parent ca0fd01f33
commit 2bb2a2c0e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      pkg/distributor/distributor.go
  2. 2
      pkg/distributor/distributor_test.go
  3. 2
      pkg/distributor/http.go

@ -6,7 +6,6 @@ import (
"net/http"
"time"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@ -71,7 +70,6 @@ type Distributor struct {
// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
distributorsRing *ring.Ring
distributorsLifecycler *ring.Lifecycler
rateLimitStrat string
@ -106,17 +104,11 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in
// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsLifecycler *ring.Lifecycler
var distributorsRing *ring.Ring
rateLimitStrat := validation.LocalIngestionRateStrategy
var servs []services.Service
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
rateLimitStrat = validation.GlobalIngestionRateStrategy
ringStore, err := kv.NewClient(
cfg.DistributorRing.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "distributor"),
util_log.Logger)
if err != nil {
return nil, errors.Wrap(err, "create distributor KV store client")
}
@ -126,13 +118,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in
return nil, errors.Wrap(err, "create distributor lifecycler")
}
distributorsRing, err = ring.NewWithStoreClientAndStrategy(cfg.DistributorRing.ToRingConfig(),
"distributor", "distributor", ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", registerer), util_log.Logger)
if err != nil {
return nil, errors.Wrap(err, "create distributor ring client")
}
servs = append(servs, distributorsLifecycler, distributorsRing)
servs = append(servs, distributorsLifecycler)
ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsLifecycler)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
@ -148,7 +134,6 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in
tenantConfigs: configs,
tenantsRetention: retention.NewTenantsRetention(overrides),
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
distributorsLifecycler: distributorsLifecycler,
validator: validator,
pool: clientpool.NewPool(clientCfg.PoolConfig, ingestersRing, factory, util_log.Logger),

@ -493,7 +493,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
// If the distributors ring is setup, wait until the first distributor
// updates to the expected size
if distributors[0].distributorsRing != nil {
if distributors[0].distributorsLifecycler != nil {
test.Poll(t, time.Second, testData.distributors, func() interface{} {
return distributors[0].distributorsLifecycler.HealthyInstancesCount()
})

@ -89,7 +89,7 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
// the distributor and as such, no ring status is returned from this function.
func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if d.rateLimitStrat == validation.GlobalIngestionRateStrategy {
d.distributorsRing.ServeHTTP(w, r)
d.distributorsLifecycler.ServeHTTP(w, r)
return
}

Loading…
Cancel
Save