Revert "Update Ingester Rate Calculations (#7652)" (#7991)

This change was supposed to smooth out the per-second rate of streams so
distributors could react more uniformly to streams with variable rates.
In practice, it resulted in drastically more resource usage on
distributors because `GetStreamRates` now responds with all streams on
an ingester rather than only the streams seen in the last second. This
change also didn't seem to affect sharding precision one way or another.

If we'd still like to add exponential smoothing for stream rates, it
should be implemented in the distributors to keep communication overhead
to a minimum.
pull/7990/head
Travis Patterson 3 years ago committed by GitHub
parent d086c237df
commit 92fca94bb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      pkg/ingester/instance.go
  2. 4
      pkg/ingester/instance_test.go
  3. 117
      pkg/ingester/stream_rate_calculator.go
  4. 98
      pkg/ingester/stream_rate_calculator_test.go

@ -319,7 +319,6 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALR
// removeStream removes a stream from the instance.
func (i *instance) removeStream(s *stream) {
if i.streams.Delete(s) {
i.streamRateCalculator.Remove(i.instanceID, s.labelHash)
i.index.Delete(s.labels, s.fp)
i.streamsRemovedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Dec()

@ -205,11 +205,11 @@ func TestGetStreamRates(t *testing.T) {
return valid
}, 3*time.Second, 100*time.Millisecond)
// Decay
// Decay back to 0
require.Eventually(t, func() bool {
rates = inst.streamRateCalculator.Rates()
for _, r := range rates {
if r.Rate > 65000 {
if r.Rate != 0 {
return false
}
}

@ -14,17 +14,6 @@ const (
// The intent is for a per-second rate so this is hard coded
updateInterval = time.Second
// The factor used to weight the moving average. Must be in the range [0, 1.0].
// A larger factor weights recent samples more heavily while a smaller
// factor weights historic samples more heavily.
smoothingFactor = .2
// A threshold for when to reset the average without smoothing. Calculated
// by currentValue >= (lastValue * burstThreshold). This allows us to set
// the smoothing factor fairly small to preserve historic samples but still
// be able to react to sudden bursts in load
burstThreshold = 1.5
)
// stripeLock is taken from ruler/storage/wal/series.go
@ -46,7 +35,7 @@ type StreamRateCalculator struct {
func NewStreamRateCalculator() *StreamRateCalculator {
calc := &StreamRateCalculator{
size: defaultStripeSize,
size: defaultStripeSize,
// Lookup pattern: tenant -> fingerprint -> rate
samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize),
locks: make([]stripeLock, defaultStripeSize),
@ -77,86 +66,31 @@ func (c *StreamRateCalculator) updateLoop() {
}
func (c *StreamRateCalculator) updateRates() {
rates := c.rates()
samples := c.currentSamplesPerTenant()
samples = updateSamples(rates, samples)
updatedRates := make([]logproto.StreamRate, 0, c.size)
for _, tenantRates := range samples {
for _, streamRates := range tenantRates {
updatedRates = append(updatedRates, streamRates)
}
}
c.rateLock.Lock()
defer c.rateLock.Unlock()
c.allRates = updatedRates
}
func (c *StreamRateCalculator) rates() []logproto.StreamRate {
c.rateLock.RLock()
defer c.rateLock.RUnlock()
return append([]logproto.StreamRate(nil), c.allRates...)
}
func (c *StreamRateCalculator) currentSamplesPerTenant() map[string]map[uint64]logproto.StreamRate {
rates := make(map[string]map[uint64]logproto.StreamRate, c.size)
rates := make([]logproto.StreamRate, 0, c.size)
for i := 0; i < c.size; i++ {
c.locks[i].Lock()
for tenantID, tenant := range c.samples[i] {
existingRates := ratesForTenant(rates, tenantID)
for streamHash, streamRate := range tenant {
existingRates[streamHash] = logproto.StreamRate{
tenantRates := c.samples[i]
for _, tenant := range tenantRates {
for _, streamRate := range tenant {
rates = append(rates, logproto.StreamRate{
Tenant: streamRate.Tenant,
StreamHash: streamRate.StreamHash,
StreamHashNoShard: streamRate.StreamHashNoShard,
Rate: streamRate.Rate,
}
})
}
rates[tenantID] = existingRates
}
c.samples[i] = make(map[string]map[uint64]logproto.StreamRate)
c.locks[i].Unlock()
}
return rates
}
func updateSamples(rates []logproto.StreamRate, samples map[string]map[uint64]logproto.StreamRate) map[string]map[uint64]logproto.StreamRate {
for _, streamRate := range rates {
tenantRates := ratesForTenant(samples, streamRate.Tenant)
if rate, ok := tenantRates[streamRate.StreamHash]; ok {
rate.Rate = weightedMovingAverage(rate.Rate, streamRate.Rate)
tenantRates[streamRate.StreamHash] = rate
} else {
streamRate.Rate = weightedMovingAverage(0, streamRate.Rate)
tenantRates[streamRate.StreamHash] = streamRate
}
samples[streamRate.Tenant] = tenantRates
}
return samples
}
func weightedMovingAverage(n, l int64) int64 {
next, last := float64(n), float64(l)
// If we see a sudden spike use the new value without smoothing
if next >= (last * burstThreshold) {
return n
}
c.rateLock.Lock()
defer c.rateLock.Unlock()
// https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
return int64((smoothingFactor * next) + ((1 - smoothingFactor) * last))
c.allRates = rates
}
func (c *StreamRateCalculator) Rates() []logproto.StreamRate {
@ -172,7 +106,7 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha
c.locks[i].Lock()
defer c.locks[i].Unlock()
tenantMap := c.tenantMapFromSamples(i, tenant)
tenantMap := c.getTenant(i, tenant)
streamRate := tenantMap[streamHash]
streamRate.StreamHash = streamHash
streamRate.StreamHashNoShard = streamHashNoShard
@ -183,40 +117,13 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha
c.samples[i][tenant] = tenantMap
}
func (c *StreamRateCalculator) Remove(tenant string, streamHash uint64) {
i := streamHash & uint64(c.size-1)
c.locks[i].Lock()
tenantMap := c.tenantMapFromSamples(i, tenant)
delete(tenantMap, streamHash)
c.samples[i][tenant] = tenantMap
c.locks[i].Unlock()
c.rateLock.Lock()
defer c.rateLock.Unlock()
for i, rate := range c.allRates {
if rate.Tenant == tenant && rate.StreamHash == streamHash {
c.allRates = append(c.allRates[:i], c.allRates[i+1:]...)
break
}
}
}
func (c *StreamRateCalculator) tenantMapFromSamples(idx uint64, tenant string) map[uint64]logproto.StreamRate {
func (c *StreamRateCalculator) getTenant(idx uint64, tenant string) map[uint64]logproto.StreamRate {
if t, ok := c.samples[idx][tenant]; ok {
return t
}
return make(map[uint64]logproto.StreamRate)
}
func ratesForTenant(rates map[string]map[uint64]logproto.StreamRate, tenant string) map[uint64]logproto.StreamRate {
if t, ok := rates[tenant]; ok {
return t
}
return make(map[uint64]logproto.StreamRate)
}
// Stop signals shutdown by closing the calculator's stop channel.
// NOTE(review): the update goroutine presumably selects on stopchan and
// exits when it is closed — confirm in updateLoop. Calling Stop more than
// once would panic (double close of a channel).
func (c *StreamRateCalculator) Stop() {
close(c.stopchan)
}

@ -3,99 +3,39 @@ package ingester
import (
"sort"
"testing"
"github.com/grafana/loki/pkg/logproto"
"time"
"github.com/stretchr/testify/require"
)
func TestStreamRateCalculator(t *testing.T) {
t.Run("it records rates for tenants and streams", func(t *testing.T) {
calc := setupCalculator()
calc := NewStreamRateCalculator()
defer calc.Stop()
for i := 0; i < 100; i++ {
calc.Record("tenant 1", 1, 1, 100)
}
for i := 0; i < 100; i++ {
calc.Record("tenant 1", 1, 1, 100)
}
for i := 0; i < 100; i++ {
calc.Record("tenant 2", 1, 1, 100)
}
for i := 0; i < 100; i++ {
calc.Record("tenant 2", 1, 1, 100)
}
calc.updateRates()
require.Eventually(t, func() bool {
rates := calc.Rates()
require.Len(t, rates, 2)
sort.Slice(rates, func(i, j int) bool {
return rates[i].Tenant < rates[j].Tenant
})
require.Equal(t, []logproto.StreamRate{
{StreamHash: 1, StreamHashNoShard: 1, Rate: 10000, Tenant: "tenant 1"},
{StreamHash: 1, StreamHashNoShard: 1, Rate: 10000, Tenant: "tenant 2"},
}, rates)
calc.Remove("tenant 1", 1)
calc.Remove("tenant 2", 1)
calc.updateRates()
require.Len(t, calc.Rates(), 0)
})
t.Run("it records rates using an exponential weighted average", func(t *testing.T) {
calc := setupCalculator()
calc.Record("tenant 1", 1, 1, 10000)
calc.updateRates()
rates := calc.Rates()
require.Len(t, rates, 1)
require.Equal(t, int64(10000), rates[0].Rate)
calc.updateRates()
rates = calc.Rates()
require.Len(t, rates, 1)
require.Equal(t, int64(8000), rates[0].Rate)
calc.Record("tenant 1", 1, 1, 10000)
calc.updateRates()
rates = calc.Rates()
require.Len(t, rates, 1)
require.Equal(t, int64(8400), rates[0].Rate)
})
if len(rates) > 1 {
return rates[0].Tenant == "tenant 1" && rates[0].Rate == 10000 &&
rates[1].Tenant == "tenant 2" && rates[1].Rate == 10000
}
t.Run("it uses the larger sample without taking the average when there's a spike in load", func(t *testing.T) {
calc := setupCalculator()
return false
}, 2*time.Second, 250*time.Millisecond)
calc.Record("tenant 1", 1, 1, 10000)
calc.updateRates()
require.Eventually(t, func() bool {
rates := calc.Rates()
require.Len(t, rates, 1)
require.Equal(t, int64(10000), rates[0].Rate)
calc.updateRates()
rates = calc.Rates()
require.Len(t, rates, 1)
require.Equal(t, int64(8000), rates[0].Rate)
calc.Record("tenant 1", 1, 1, 13000)
calc.updateRates()
rates = calc.Rates()
require.Len(t, rates, 1)
require.Equal(t, int64(13000), rates[0].Rate)
})
}
// This is just the constructor without the async start so we can control it for tests
func setupCalculator() *StreamRateCalculator {
calc := &StreamRateCalculator{
size: defaultStripeSize,
// Lookup pattern: tenant -> fingerprint -> rate
samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize),
locks: make([]stripeLock, defaultStripeSize),
stopchan: make(chan struct{}),
}
for i := 0; i < defaultStripeSize; i++ {
calc.samples[i] = make(map[string]map[uint64]logproto.StreamRate)
}
return calc
return len(rates) == 0
}, 2*time.Second, 250*time.Millisecond)
}

Loading…
Cancel
Save