From 92fca94bb58c73ebacecfe228a0cd789e485c2ec Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Thu, 22 Dec 2022 08:21:18 -0700 Subject: [PATCH] Revert "Update Ingester Rate Calculations (#7652)" (#7991) The reverted change (#7652) was supposed to smooth out the per-second rate of streams so distributors could react more uniformly to streams with variable rates. In practice, it resulted in drastically more resource usage on distributors because, with that change, `GetStreamRates` responds with all streams on an ingester rather than only the streams seen in the last second. The reverted change also didn't seem to affect sharding precision one way or another. If we'd still like to implement exponential smoothing for stream rates, it should be implemented in distributors to keep communication overhead to a minimum. --- pkg/ingester/instance.go | 1 - pkg/ingester/instance_test.go | 4 +- pkg/ingester/stream_rate_calculator.go | 117 ++------------------ pkg/ingester/stream_rate_calculator_test.go | 98 ++++------------ 4 files changed, 33 insertions(+), 187 deletions(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 6ca3804255..6134353d2a 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -319,7 +319,6 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALR // removeStream removes a stream from the instance. 
func (i *instance) removeStream(s *stream) { if i.streams.Delete(s) { - i.streamRateCalculator.Remove(i.instanceID, s.labelHash) i.index.Delete(s.labels, s.fp) i.streamsRemovedTotal.Inc() memoryStreams.WithLabelValues(i.instanceID).Dec() diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index e14e2ffffa..67e92dc6c9 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -205,11 +205,11 @@ func TestGetStreamRates(t *testing.T) { return valid }, 3*time.Second, 100*time.Millisecond) - // Decay + // Decay back to 0 require.Eventually(t, func() bool { rates = inst.streamRateCalculator.Rates() for _, r := range rates { - if r.Rate > 65000 { + if r.Rate != 0 { return false } } diff --git a/pkg/ingester/stream_rate_calculator.go b/pkg/ingester/stream_rate_calculator.go index ae292e18ae..6312cbe1ae 100644 --- a/pkg/ingester/stream_rate_calculator.go +++ b/pkg/ingester/stream_rate_calculator.go @@ -14,17 +14,6 @@ const ( // The intent is for a per-second rate so this is hard coded updateInterval = time.Second - - // The factor used to weight the moving average. Must be in the range [0, 1.0]. - // A larger factor weights recent samples more heavily while a smaller - // factor weights historic samples more heavily. - smoothingFactor = .2 - - // A threshold for when to reset the average without smoothing. Calculated - // by currentValue >= (lastValue * burstThreshold). 
This allows us to set - // the smoothing factor fairly small to preserve historic samples but still - // be able to react to sudden bursts in load - burstThreshold = 1.5 ) // stripeLock is taken from ruler/storage/wal/series.go @@ -46,7 +35,7 @@ type StreamRateCalculator struct { func NewStreamRateCalculator() *StreamRateCalculator { calc := &StreamRateCalculator{ - size: defaultStripeSize, + size: defaultStripeSize, // Lookup pattern: tenant -> fingerprint -> rate samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize), locks: make([]stripeLock, defaultStripeSize), @@ -77,86 +66,31 @@ func (c *StreamRateCalculator) updateLoop() { } func (c *StreamRateCalculator) updateRates() { - rates := c.rates() - samples := c.currentSamplesPerTenant() - samples = updateSamples(rates, samples) - - updatedRates := make([]logproto.StreamRate, 0, c.size) - for _, tenantRates := range samples { - for _, streamRates := range tenantRates { - updatedRates = append(updatedRates, streamRates) - } - } - - c.rateLock.Lock() - defer c.rateLock.Unlock() - - c.allRates = updatedRates -} - -func (c *StreamRateCalculator) rates() []logproto.StreamRate { - c.rateLock.RLock() - defer c.rateLock.RUnlock() - - return append([]logproto.StreamRate(nil), c.allRates...) 
-} - -func (c *StreamRateCalculator) currentSamplesPerTenant() map[string]map[uint64]logproto.StreamRate { - rates := make(map[string]map[uint64]logproto.StreamRate, c.size) + rates := make([]logproto.StreamRate, 0, c.size) for i := 0; i < c.size; i++ { c.locks[i].Lock() - for tenantID, tenant := range c.samples[i] { - existingRates := ratesForTenant(rates, tenantID) - - for streamHash, streamRate := range tenant { - existingRates[streamHash] = logproto.StreamRate{ + tenantRates := c.samples[i] + for _, tenant := range tenantRates { + for _, streamRate := range tenant { + rates = append(rates, logproto.StreamRate{ Tenant: streamRate.Tenant, StreamHash: streamRate.StreamHash, StreamHashNoShard: streamRate.StreamHashNoShard, Rate: streamRate.Rate, - } + }) } - - rates[tenantID] = existingRates } c.samples[i] = make(map[string]map[uint64]logproto.StreamRate) c.locks[i].Unlock() } - return rates -} - -func updateSamples(rates []logproto.StreamRate, samples map[string]map[uint64]logproto.StreamRate) map[string]map[uint64]logproto.StreamRate { - for _, streamRate := range rates { - tenantRates := ratesForTenant(samples, streamRate.Tenant) - - if rate, ok := tenantRates[streamRate.StreamHash]; ok { - rate.Rate = weightedMovingAverage(rate.Rate, streamRate.Rate) - tenantRates[streamRate.StreamHash] = rate - } else { - streamRate.Rate = weightedMovingAverage(0, streamRate.Rate) - tenantRates[streamRate.StreamHash] = streamRate - } - - samples[streamRate.Tenant] = tenantRates - } - - return samples -} - -func weightedMovingAverage(n, l int64) int64 { - next, last := float64(n), float64(l) - - // If we see a sudden spike use the new value without smoothing - if next >= (last * burstThreshold) { - return n - } + c.rateLock.Lock() + defer c.rateLock.Unlock() - // https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average - return int64((smoothingFactor * next) + ((1 - smoothingFactor) * last)) + c.allRates = rates } func (c *StreamRateCalculator) Rates() 
[]logproto.StreamRate { @@ -172,7 +106,7 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha c.locks[i].Lock() defer c.locks[i].Unlock() - tenantMap := c.tenantMapFromSamples(i, tenant) + tenantMap := c.getTenant(i, tenant) streamRate := tenantMap[streamHash] streamRate.StreamHash = streamHash streamRate.StreamHashNoShard = streamHashNoShard @@ -183,40 +117,13 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha c.samples[i][tenant] = tenantMap } -func (c *StreamRateCalculator) Remove(tenant string, streamHash uint64) { - i := streamHash & uint64(c.size-1) - - c.locks[i].Lock() - tenantMap := c.tenantMapFromSamples(i, tenant) - delete(tenantMap, streamHash) - c.samples[i][tenant] = tenantMap - c.locks[i].Unlock() - - c.rateLock.Lock() - defer c.rateLock.Unlock() - - for i, rate := range c.allRates { - if rate.Tenant == tenant && rate.StreamHash == streamHash { - c.allRates = append(c.allRates[:i], c.allRates[i+1:]...) - break - } - } -} - -func (c *StreamRateCalculator) tenantMapFromSamples(idx uint64, tenant string) map[uint64]logproto.StreamRate { +func (c *StreamRateCalculator) getTenant(idx uint64, tenant string) map[uint64]logproto.StreamRate { if t, ok := c.samples[idx][tenant]; ok { return t } return make(map[uint64]logproto.StreamRate) } -func ratesForTenant(rates map[string]map[uint64]logproto.StreamRate, tenant string) map[uint64]logproto.StreamRate { - if t, ok := rates[tenant]; ok { - return t - } - return make(map[uint64]logproto.StreamRate) -} - func (c *StreamRateCalculator) Stop() { close(c.stopchan) } diff --git a/pkg/ingester/stream_rate_calculator_test.go b/pkg/ingester/stream_rate_calculator_test.go index 54bada6d94..fa70ff051e 100644 --- a/pkg/ingester/stream_rate_calculator_test.go +++ b/pkg/ingester/stream_rate_calculator_test.go @@ -3,99 +3,39 @@ package ingester import ( "sort" "testing" - - "github.com/grafana/loki/pkg/logproto" + "time" "github.com/stretchr/testify/require" ) 
func TestStreamRateCalculator(t *testing.T) { - t.Run("it records rates for tenants and streams", func(t *testing.T) { - calc := setupCalculator() + calc := NewStreamRateCalculator() + defer calc.Stop() - for i := 0; i < 100; i++ { - calc.Record("tenant 1", 1, 1, 100) - } + for i := 0; i < 100; i++ { + calc.Record("tenant 1", 1, 1, 100) + } - for i := 0; i < 100; i++ { - calc.Record("tenant 2", 1, 1, 100) - } + for i := 0; i < 100; i++ { + calc.Record("tenant 2", 1, 1, 100) + } - calc.updateRates() + require.Eventually(t, func() bool { rates := calc.Rates() - require.Len(t, rates, 2) sort.Slice(rates, func(i, j int) bool { return rates[i].Tenant < rates[j].Tenant }) - require.Equal(t, []logproto.StreamRate{ - {StreamHash: 1, StreamHashNoShard: 1, Rate: 10000, Tenant: "tenant 1"}, - {StreamHash: 1, StreamHashNoShard: 1, Rate: 10000, Tenant: "tenant 2"}, - }, rates) - - calc.Remove("tenant 1", 1) - calc.Remove("tenant 2", 1) - calc.updateRates() - - require.Len(t, calc.Rates(), 0) - }) - - t.Run("it records rates using an exponential weighted average", func(t *testing.T) { - calc := setupCalculator() - - calc.Record("tenant 1", 1, 1, 10000) - calc.updateRates() - rates := calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(10000), rates[0].Rate) - - calc.updateRates() - rates = calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(8000), rates[0].Rate) - - calc.Record("tenant 1", 1, 1, 10000) - calc.updateRates() - rates = calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(8400), rates[0].Rate) - }) + if len(rates) > 1 { + return rates[0].Tenant == "tenant 1" && rates[0].Rate == 10000 && + rates[1].Tenant == "tenant 2" && rates[1].Rate == 10000 + } - t.Run("it uses the larger sample without taking the average when there's a spike in load", func(t *testing.T) { - calc := setupCalculator() + return false + }, 2*time.Second, 250*time.Millisecond) - calc.Record("tenant 1", 1, 1, 10000) - calc.updateRates() + require.Eventually(t, 
func() bool { rates := calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(10000), rates[0].Rate) - - calc.updateRates() - rates = calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(8000), rates[0].Rate) - - calc.Record("tenant 1", 1, 1, 13000) - calc.updateRates() - rates = calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(13000), rates[0].Rate) - }) -} - -// This is just the constructor without the async start so we can control it for tests -func setupCalculator() *StreamRateCalculator { - calc := &StreamRateCalculator{ - size: defaultStripeSize, - // Lookup pattern: tenant -> fingerprint -> rate - samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize), - locks: make([]stripeLock, defaultStripeSize), - stopchan: make(chan struct{}), - } - - for i := 0; i < defaultStripeSize; i++ { - calc.samples[i] = make(map[string]map[uint64]logproto.StreamRate) - } - - return calc + return len(rates) == 0 + }, 2*time.Second, 250*time.Millisecond) }