diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 6ca3804255..6134353d2a 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -319,7 +319,6 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, record *WALR // removeStream removes a stream from the instance. func (i *instance) removeStream(s *stream) { if i.streams.Delete(s) { - i.streamRateCalculator.Remove(i.instanceID, s.labelHash) i.index.Delete(s.labels, s.fp) i.streamsRemovedTotal.Inc() memoryStreams.WithLabelValues(i.instanceID).Dec() diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index e14e2ffffa..67e92dc6c9 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -205,11 +205,11 @@ func TestGetStreamRates(t *testing.T) { return valid }, 3*time.Second, 100*time.Millisecond) - // Decay + // Decay back to 0 require.Eventually(t, func() bool { rates = inst.streamRateCalculator.Rates() for _, r := range rates { - if r.Rate > 65000 { + if r.Rate != 0 { return false } } diff --git a/pkg/ingester/stream_rate_calculator.go b/pkg/ingester/stream_rate_calculator.go index ae292e18ae..6312cbe1ae 100644 --- a/pkg/ingester/stream_rate_calculator.go +++ b/pkg/ingester/stream_rate_calculator.go @@ -14,17 +14,6 @@ const ( // The intent is for a per-second rate so this is hard coded updateInterval = time.Second - - // The factor used to weight the moving average. Must be in the range [0, 1.0]. - // A larger factor weights recent samples more heavily while a smaller - // factor weights historic samples more heavily. - smoothingFactor = .2 - - // A threshold for when to reset the average without smoothing. Calculated - // by currentValue >= (lastValue * burstThreshold). This allows us to set - // the smoothing factor fairly small to preserve historic samples but still - // be able to react to sudden bursts in load - burstThreshold = 1.5 ) // stripeLock is taken from ruler/storage/wal/series.go @@ -46,7 +35,7 @@ type StreamRateCalculator struct { func NewStreamRateCalculator() *StreamRateCalculator { calc := &StreamRateCalculator{ - size: defaultStripeSize, + size: defaultStripeSize, // Lookup pattern: tenant -> fingerprint -> rate samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize), locks: make([]stripeLock, defaultStripeSize), @@ -77,86 +66,31 @@ func (c *StreamRateCalculator) updateLoop() { } func (c *StreamRateCalculator) updateRates() { - rates := c.rates() - samples := c.currentSamplesPerTenant() - samples = updateSamples(rates, samples) - - updatedRates := make([]logproto.StreamRate, 0, c.size) - for _, tenantRates := range samples { - for _, streamRates := range tenantRates { - updatedRates = append(updatedRates, streamRates) - } - } - - c.rateLock.Lock() - defer c.rateLock.Unlock() - - c.allRates = updatedRates -} - -func (c *StreamRateCalculator) rates() []logproto.StreamRate { - c.rateLock.RLock() - defer c.rateLock.RUnlock() - - return append([]logproto.StreamRate(nil), c.allRates...) -} - -func (c *StreamRateCalculator) currentSamplesPerTenant() map[string]map[uint64]logproto.StreamRate { - rates := make(map[string]map[uint64]logproto.StreamRate, c.size) + rates := make([]logproto.StreamRate, 0, c.size) for i := 0; i < c.size; i++ { c.locks[i].Lock() - for tenantID, tenant := range c.samples[i] { - existingRates := ratesForTenant(rates, tenantID) - - for streamHash, streamRate := range tenant { - existingRates[streamHash] = logproto.StreamRate{ + tenantRates := c.samples[i] + for _, tenant := range tenantRates { + for _, streamRate := range tenant { + rates = append(rates, logproto.StreamRate{ Tenant: streamRate.Tenant, StreamHash: streamRate.StreamHash, StreamHashNoShard: streamRate.StreamHashNoShard, Rate: streamRate.Rate, - } + }) } - - rates[tenantID] = existingRates } c.samples[i] = make(map[string]map[uint64]logproto.StreamRate) c.locks[i].Unlock() } - return rates -} - -func updateSamples(rates []logproto.StreamRate, samples map[string]map[uint64]logproto.StreamRate) map[string]map[uint64]logproto.StreamRate { - for _, streamRate := range rates { - tenantRates := ratesForTenant(samples, streamRate.Tenant) - - if rate, ok := tenantRates[streamRate.StreamHash]; ok { - rate.Rate = weightedMovingAverage(rate.Rate, streamRate.Rate) - tenantRates[streamRate.StreamHash] = rate - } else { - streamRate.Rate = weightedMovingAverage(0, streamRate.Rate) - tenantRates[streamRate.StreamHash] = streamRate - } - - samples[streamRate.Tenant] = tenantRates - } - - return samples -} - -func weightedMovingAverage(n, l int64) int64 { - next, last := float64(n), float64(l) - - // If we see a sudden spike use the new value without smoothing - if next >= (last * burstThreshold) { - return n - } + c.rateLock.Lock() + defer c.rateLock.Unlock() - // https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average - return int64((smoothingFactor * next) + ((1 - smoothingFactor) * last)) + c.allRates = rates } func (c *StreamRateCalculator) Rates() []logproto.StreamRate { @@ -172,7 +106,7 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha c.locks[i].Lock() defer c.locks[i].Unlock() - tenantMap := c.tenantMapFromSamples(i, tenant) + tenantMap := c.getTenant(i, tenant) streamRate := tenantMap[streamHash] streamRate.StreamHash = streamHash streamRate.StreamHashNoShard = streamHashNoShard @@ -183,40 +117,13 @@ func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoSha c.samples[i][tenant] = tenantMap } -func (c *StreamRateCalculator) Remove(tenant string, streamHash uint64) { - i := streamHash & uint64(c.size-1) - - c.locks[i].Lock() - tenantMap := c.tenantMapFromSamples(i, tenant) - delete(tenantMap, streamHash) - c.samples[i][tenant] = tenantMap - c.locks[i].Unlock() - - c.rateLock.Lock() - defer c.rateLock.Unlock() - - for i, rate := range c.allRates { - if rate.Tenant == tenant && rate.StreamHash == streamHash { - c.allRates = append(c.allRates[:i], c.allRates[i+1:]...) - break - } - } -} - -func (c *StreamRateCalculator) tenantMapFromSamples(idx uint64, tenant string) map[uint64]logproto.StreamRate { +func (c *StreamRateCalculator) getTenant(idx uint64, tenant string) map[uint64]logproto.StreamRate { if t, ok := c.samples[idx][tenant]; ok { return t } return make(map[uint64]logproto.StreamRate) } -func ratesForTenant(rates map[string]map[uint64]logproto.StreamRate, tenant string) map[uint64]logproto.StreamRate { - if t, ok := rates[tenant]; ok { - return t - } - return make(map[uint64]logproto.StreamRate) -} - func (c *StreamRateCalculator) Stop() { close(c.stopchan) } diff --git a/pkg/ingester/stream_rate_calculator_test.go b/pkg/ingester/stream_rate_calculator_test.go index 54bada6d94..fa70ff051e 100644 --- a/pkg/ingester/stream_rate_calculator_test.go +++ b/pkg/ingester/stream_rate_calculator_test.go @@ -3,99 +3,39 @@ package ingester import ( "sort" "testing" - - "github.com/grafana/loki/pkg/logproto" + "time" "github.com/stretchr/testify/require" ) func TestStreamRateCalculator(t *testing.T) { - t.Run("it records rates for tenants and streams", func(t *testing.T) { - calc := setupCalculator() + calc := NewStreamRateCalculator() + defer calc.Stop() - for i := 0; i < 100; i++ { - calc.Record("tenant 1", 1, 1, 100) - } + for i := 0; i < 100; i++ { + calc.Record("tenant 1", 1, 1, 100) + } - for i := 0; i < 100; i++ { - calc.Record("tenant 2", 1, 1, 100) - } + for i := 0; i < 100; i++ { + calc.Record("tenant 2", 1, 1, 100) + } - calc.updateRates() + require.Eventually(t, func() bool { rates := calc.Rates() - require.Len(t, rates, 2) sort.Slice(rates, func(i, j int) bool { return rates[i].Tenant < rates[j].Tenant }) - require.Equal(t, []logproto.StreamRate{ - {StreamHash: 1, StreamHashNoShard: 1, Rate: 10000, Tenant: "tenant 1"}, - {StreamHash: 1, StreamHashNoShard: 1, Rate: 10000, Tenant: "tenant 2"}, - }, rates) - - calc.Remove("tenant 1", 1) - calc.Remove("tenant 2", 1) - calc.updateRates() - - require.Len(t, calc.Rates(), 0) - }) - - t.Run("it records rates using an exponential weighted average", func(t *testing.T) { - calc := setupCalculator() - - calc.Record("tenant 1", 1, 1, 10000) - calc.updateRates() - rates := calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(10000), rates[0].Rate) - - calc.updateRates() - rates = calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(8000), rates[0].Rate) - - calc.Record("tenant 1", 1, 1, 10000) - calc.updateRates() - rates = calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(8400), rates[0].Rate) - }) + if len(rates) > 1 { + return rates[0].Tenant == "tenant 1" && rates[0].Rate == 10000 && + rates[1].Tenant == "tenant 2" && rates[1].Rate == 10000 + } - t.Run("it uses the larger sample without taking the average when there's a spike in load", func(t *testing.T) { - calc := setupCalculator() + return false + }, 2*time.Second, 250*time.Millisecond) - calc.Record("tenant 1", 1, 1, 10000) - calc.updateRates() + require.Eventually(t, func() bool { rates := calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(10000), rates[0].Rate) - - calc.updateRates() - rates = calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(8000), rates[0].Rate) - - calc.Record("tenant 1", 1, 1, 13000) - calc.updateRates() - rates = calc.Rates() - require.Len(t, rates, 1) - require.Equal(t, int64(13000), rates[0].Rate) - }) -} - -// This is just the constructor without the async start so we can control it for tests -func setupCalculator() *StreamRateCalculator { - calc := &StreamRateCalculator{ - size: defaultStripeSize, - // Lookup pattern: tenant -> fingerprint -> rate - samples: make([]map[string]map[uint64]logproto.StreamRate, defaultStripeSize), - locks: make([]stripeLock, defaultStripeSize), - stopchan: make(chan struct{}), - } - - for i := 0; i < defaultStripeSize; i++ { - calc.samples[i] = make(map[string]map[uint64]logproto.StreamRate) - } - - return calc + return len(rates) == 0 + }, 2*time.Second, 250*time.Millisecond) }