chore: stop updating rate limits and remove reason as implementation is deferred (#18052)

pull/18053/head
Authored by George Robinson 7 months ago; committed by GitHub.
parent 89c972286c
commit c69721600c
Changed files (7):

  pkg/distributor/distributor_test.go     28
  pkg/distributor/ingest_limits_test.go   12
  pkg/limits/frontend/frontend_test.go     8
  pkg/limits/frontend/http_test.go         4
  pkg/limits/reason.go                     6
  pkg/limits/store.go                     34
  pkg/limits/store_test.go               111

@@ -2463,34 +2463,6 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
             }},
         },
         expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: max streams exceeded",
-    }, {
-        name:                "rate limit is exceeded",
-        ingestLimitsEnabled: true,
-        tenant:              "test",
-        streams: logproto.PushRequest{
-            Streams: []logproto.Stream{{
-                Labels: "{foo=\"bar\"}",
-                Entries: []logproto.Entry{{
-                    Timestamp: time.Now(),
-                    Line:      "baz",
-                }},
-            }},
-        },
-        expectedLimitsCalls: 1,
-        expectedLimitsRequest: &limitsproto.ExceedsLimitsRequest{
-            Tenant: "test",
-            Streams: []*limitsproto.StreamMetadata{{
-                StreamHash: 0x90eb45def17f924,
-                TotalSize:  0x3,
-            }},
-        },
-        limitsResponse: &limitsproto.ExceedsLimitsResponse{
-            Results: []*limitsproto.ExceedsLimitsResult{{
-                StreamHash: 0x90eb45def17f924,
-                Reason:     uint32(limits.ReasonExceedsRateLimit),
-            }},
-        },
-        expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: rate limit exceeded",
     }, {
         name:                "one of two streams exceed max stream limit, request is accepted",
         ingestLimitsEnabled: true,
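Aside: the expectedErr strings in these cases come from the distributor converting each stream's Reason into a gRPC error carrying the HTTP-style code 429. A minimal, hypothetical sketch of that conversion (buildLimitsError is an illustrative name, not the distributor's actual function, which this diff does not show; only the output format is taken from the tests above):

package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// buildLimitsError shows how an error like the expectedErr above can be
// produced: 429 is not a registered gRPC code, so it prints as Code(429).
// This is an illustrative sketch, not code from this diff.
func buildLimitsError(reasons []string) error {
	return status.Error(codes.Code(429), "request exceeded limits: "+strings.Join(reasons, ", "))
}

func main() {
	err := buildLimitsError([]string{"max streams exceeded"})
	// Prints: rpc error: code = Code(429) desc = request exceeded limits: max streams exceeded
	fmt.Println(err)
}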

@@ -112,11 +112,11 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
     response: &proto.ExceedsLimitsResponse{
         Results: []*proto.ExceedsLimitsResult{{
             StreamHash: 1,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     },
     expectedStreams: []KeyedStream{},
-    expectedReasons: map[uint64][]string{1: {"rate limit exceeded"}},
+    expectedReasons: map[uint64][]string{1: {"max streams exceeded"}},
 }, {
     name:   "one of two streams exceeds limits",
     tenant: "test",
@@ -138,14 +138,14 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
     response: &proto.ExceedsLimitsResponse{
         Results: []*proto.ExceedsLimitsResult{{
             StreamHash: 1,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     },
     expectedStreams: []KeyedStream{{
         HashKey:        2000, // Should not be used.
         HashKeyNoShard: 2,
     }},
-    expectedReasons: map[uint64][]string{1: {"rate limit exceeded"}},
+    expectedReasons: map[uint64][]string{1: {"max streams exceeded"}},
 }, {
     name:   "does not exceed limits",
     tenant: "test",
@@ -246,11 +246,11 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
     response: &proto.ExceedsLimitsResponse{
         Results: []*proto.ExceedsLimitsResult{{
             StreamHash: 1,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     },
     expectedExceedsLimits: true,
-    expectedReasons:       map[uint64][]string{1: {"rate limit exceeded"}},
+    expectedReasons:       map[uint64][]string{1: {"max streams exceeded"}},
 }, {
     name:   "does not exceed limits",
     tenant: "test",

@@ -80,7 +80,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
             Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }, {
             StreamHash: 0x4,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     }},
     expected: &proto.ExceedsLimitsResponse{
@@ -89,7 +89,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
             Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }, {
             StreamHash: 0x4,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     },
 }, {
@@ -112,7 +112,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
     }, {
         Results: []*proto.ExceedsLimitsResult{{
             StreamHash: 0x4,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     }},
     expected: &proto.ExceedsLimitsResponse{
@@ -121,7 +121,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
             Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }, {
             StreamHash: 0x4,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     },
 }}

@@ -51,7 +51,7 @@ func TestFrontend_ServeHTTP(t *testing.T) {
     exceedsLimitsResponses: []*proto.ExceedsLimitsResponse{{
         Results: []*proto.ExceedsLimitsResult{{
             StreamHash: 0x1,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     }},
     request: httpExceedsLimitsRequest{
@@ -64,7 +64,7 @@ func TestFrontend_ServeHTTP(t *testing.T) {
     expected: httpExceedsLimitsResponse{
         Results: []*proto.ExceedsLimitsResult{{
             StreamHash: 0x1,
-            Reason:     uint32(limits.ReasonExceedsRateLimit),
+            Reason:     uint32(limits.ReasonExceedsMaxStreams),
         }},
     },
 }}

@@ -6,18 +6,12 @@ const (
     // ReasonExceedsMaxStreams is returned when a tenant exceeds the maximum
     // number of active streams as per their per-tenant limit.
     ReasonExceedsMaxStreams Reason = iota
-    // ReasonExceedsRateLimit is returned when a tenant exceeds their maximum
-    // rate limit as per their per-tenant limit.
-    ReasonExceedsRateLimit
 )

 func (r Reason) String() string {
     switch r {
     case ReasonExceedsMaxStreams:
         return "max streams exceeded"
-    case ReasonExceedsRateLimit:
-        return "rate limit exceeded"
     default:
         return "unknown reason"
     }
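After this change, Reason has a single value. A sketch of the resulting pkg/limits/reason.go, assuming the rest of the file is unchanged (the Reason type declaration itself is not shown in this hunk; the uint32 underlying type is inferred from the uint32(...) conversions in the tests above):

package limits

// Reason gives why a stream exceeded the tenant's limits. The underlying
// type is an assumption here; its declaration is outside this hunk.
type Reason uint32

const (
	// ReasonExceedsMaxStreams is returned when a tenant exceeds the maximum
	// number of active streams as per their per-tenant limit.
	ReasonExceedsMaxStreams Reason = iota
)

func (r Reason) String() string {
	switch r {
	case ReasonExceedsMaxStreams:
		return "max streams exceeded"
	default:
		return "unknown reason"
	}
}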

@@ -64,8 +64,10 @@ type streamUsage struct {
     // TODO(grobinson): This is a quick fix to allow us to keep testing
     // correctness.
     lastProducedAt int64
-    totalSize      uint64
-    rateBuckets    []rateBucket
+    // TODO(grobinson): Rate buckets are not used, as we have decided to
+    // defer implementing rate limits to a later date.
+    totalSize   uint64
+    rateBuckets []rateBucket
 }
// RateBucket represents the bytes received during a specific time interval
@@ -336,7 +338,7 @@ func (s *usageStore) get(i int, tenant string, partition int32, streamHash uint6
 func (s *usageStore) update(i int, tenant string, partition int32, metadata *proto.StreamMetadata, seenAt time.Time) {
     s.checkInitMap(i, tenant, partition)
-    streamHash, totalSize := metadata.StreamHash, metadata.TotalSize
+    streamHash, _ := metadata.StreamHash, metadata.TotalSize
     // Get the stats for the stream.
     stream, ok := s.stripes[i][tenant][partition][streamHash]
     cutoff := seenAt.Add(-s.activeWindow).UnixNano()
@@ -344,28 +346,30 @@ func (s *usageStore) update(i int, tenant string, partition int32, metadata *pro
     if !ok || stream.lastSeenAt < cutoff {
         stream.hash = streamHash
         stream.totalSize = 0
-        stream.rateBuckets = make([]rateBucket, s.numBuckets)
+        // stream.rateBuckets = make([]rateBucket, s.numBuckets)
     }
     seenAtUnixNano := seenAt.UnixNano()
     if stream.lastSeenAt <= seenAtUnixNano {
         stream.lastSeenAt = seenAtUnixNano
     }
-    stream.totalSize += totalSize
+    // TODO(grobinson): As mentioned above, we will come back and implement
+    // rate limits at a later date.
+    // stream.totalSize += totalSize
     // rate buckets are implemented as a circular list. To update a rate
     // bucket we must first calculate the bucket index.
-    bucketNum := seenAtUnixNano / int64(s.bucketSize)
-    bucketIdx := int(bucketNum % int64(s.numBuckets))
-    bucket := stream.rateBuckets[bucketIdx]
+    // bucketNum := seenAtUnixNano / int64(s.bucketSize)
+    // bucketIdx := int(bucketNum % int64(s.numBuckets))
+    // bucket := stream.rateBuckets[bucketIdx]
     // Once we have found the bucket, we then need to check if it is an old
     // bucket outside the rate window. If it is, we must reset it before we
     // can re-use it.
-    bucketStart := seenAt.Truncate(s.bucketSize).UnixNano()
-    if bucket.timestamp < bucketStart {
-        bucket.timestamp = bucketStart
-        bucket.size = 0
-    }
-    bucket.size += totalSize
-    stream.rateBuckets[bucketIdx] = bucket
+    // bucketStart := seenAt.Truncate(s.bucketSize).UnixNano()
+    // if bucket.timestamp < bucketStart {
+    //     bucket.timestamp = bucketStart
+    //     bucket.size = 0
+    // }
+    // bucket.size += totalSize
+    // stream.rateBuckets[bucketIdx] = bucket
     s.stripes[i][tenant][partition][streamHash] = stream
 }
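The comments retained above describe the deferred mechanism: rate buckets form a circular list indexed by wall-clock time. A standalone sketch of that arithmetic, reconstructed from the commented-out code (addToBucket and its signature are illustrative names, not the store's API):

package main

import (
	"fmt"
	"time"
)

// rateBucket mirrors the store's bucket shape: the bytes received during a
// specific time interval.
type rateBucket struct {
	timestamp int64 // start of the interval, in Unix nanoseconds
	size      uint64
}

// addToBucket reconstructs the commented-out update: compute the bucket index
// from the timestamp, reset the bucket if it belongs to an older window, then
// accumulate the new bytes.
func addToBucket(buckets []rateBucket, bucketSize time.Duration, seenAt time.Time, size uint64) {
	bucketNum := seenAt.UnixNano() / int64(bucketSize)
	bucketIdx := int(bucketNum % int64(len(buckets)))
	bucketStart := seenAt.Truncate(bucketSize).UnixNano()
	if buckets[bucketIdx].timestamp < bucketStart {
		buckets[bucketIdx].timestamp = bucketStart
		buckets[bucketIdx].size = 0
	}
	buckets[bucketIdx].size += size
}

func main() {
	// Five one-minute buckets cover a five-minute rate window.
	buckets := make([]rateBucket, 5)
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	addToBucket(buckets, time.Minute, t0, 100)                    // lands in bucket 0
	addToBucket(buckets, time.Minute, t0.Add(time.Second), 100)   // same minute, same bucket
	addToBucket(buckets, time.Minute, t0.Add(5*time.Minute), 100) // wraps around, resets bucket 0
	fmt.Println(buckets[0].size) // 100: the wrap replaced the stale bucket
}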

@@ -136,7 +136,67 @@ func TestUsageStore_Update(t *testing.T) {
 // This test asserts that we update the correct rate buckets, and as rate
 // buckets are implemented as a circular list, when we reach the end of the
 // list the next bucket is the start of the list.
-func TestUsageStore_UpdateRateBuckets(t *testing.T) {
+// func TestUsageStore_UpdateRateBuckets(t *testing.T) {
+//     s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry())
+//     require.NoError(t, err)
+//     clock := quartz.NewMock(t)
+//     s.clock = clock
+//     metadata := &proto.StreamMetadata{
+//         StreamHash: 0x1,
+//         TotalSize:  100,
+//     }
+//     // Metadata at clock.Now() should update the first rate bucket because
+//     // the mocked clock starts at 2024-01-01T00:00:00Z.
+//     time1 := clock.Now()
+//     require.NoError(t, s.Update("tenant", metadata, time1))
+//     stream, ok := s.getForTests("tenant", 0x1)
+//     require.True(t, ok)
+//     expected := newRateBuckets(5*time.Minute, time.Minute)
+//     expected[0].timestamp = time1.UnixNano()
+//     expected[0].size = 100
+//     require.Equal(t, expected, stream.rateBuckets)
+//     // Update the first bucket with the same metadata but 1 second later.
+//     clock.Advance(time.Second)
+//     time2 := clock.Now()
+//     require.NoError(t, s.Update("tenant", metadata, time2))
+//     expected[0].size = 200
+//     require.Equal(t, expected, stream.rateBuckets)
+//     // Advance the clock forward to the next bucket. Should update the second
+//     // bucket and leave the first bucket unmodified.
+//     clock.Advance(time.Minute)
+//     time3 := clock.Now()
+//     require.NoError(t, s.Update("tenant", metadata, time3))
+//     stream, ok = s.getForTests("tenant", 0x1)
+//     require.True(t, ok)
+//     // As the clock is now 1 second ahead of the bucket start time, we must
+//     // truncate the expected time to the start of the bucket.
+//     expected[1].timestamp = time3.Truncate(time.Minute).UnixNano()
+//     expected[1].size = 100
+//     require.Equal(t, expected, stream.rateBuckets)
+//     // Advance the clock to the last bucket.
+//     clock.Advance(3 * time.Minute)
+//     time4 := clock.Now()
+//     require.NoError(t, s.Update("tenant", metadata, time4))
+//     stream, ok = s.getForTests("tenant", 0x1)
+//     require.True(t, ok)
+//     expected[4].timestamp = time4.Truncate(time.Minute).UnixNano()
+//     expected[4].size = 100
+//     require.Equal(t, expected, stream.rateBuckets)
+//     // Advance the clock one last time. It should wrap around to the start
+//     // of the list and replace the original bucket with time1.
+//     clock.Advance(time.Minute)
+//     time5 := clock.Now()
+//     require.NoError(t, s.Update("tenant", metadata, time5))
+//     stream, ok = s.getForTests("tenant", 0x1)
+//     require.True(t, ok)
+//     expected[0].timestamp = time5.Truncate(time.Minute).UnixNano()
+//     expected[0].size = 100
+//     require.Equal(t, expected, stream.rateBuckets)
+// }
+
+// This test asserts that rate buckets are not updated while the TODOs are
+// in place.
+func TestUsageStore_RateBucketsAreNotUsed(t *testing.T) {
     s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry())
     require.NoError(t, err)
     clock := quartz.NewMock(t)
@@ -145,53 +205,12 @@ func TestUsageStore_UpdateRateBuckets(t *testing.T) {
         StreamHash: 0x1,
         TotalSize:  100,
     }
-    // Metadata at clock.Now() should update the first rate bucket because
-    // the mocked clock starts at 2024-01-01T00:00:00Z.
-    time1 := clock.Now()
-    require.NoError(t, s.Update("tenant", metadata, time1))
+    require.NoError(t, s.Update("tenant", metadata, clock.Now()))
     stream, ok := s.getForTests("tenant", 0x1)
     require.True(t, ok)
-    expected := newRateBuckets(5*time.Minute, time.Minute)
-    expected[0].timestamp = time1.UnixNano()
-    expected[0].size = 100
-    require.Equal(t, expected, stream.rateBuckets)
-    // Update the first bucket with the same metadata but 1 second later.
-    clock.Advance(time.Second)
-    time2 := clock.Now()
-    require.NoError(t, s.Update("tenant", metadata, time2))
-    expected[0].size = 200
-    require.Equal(t, expected, stream.rateBuckets)
-    // Advance the clock forward to the next bucket. Should update the second
-    // bucket and leave the first bucket unmodified.
-    clock.Advance(time.Minute)
-    time3 := clock.Now()
-    require.NoError(t, s.Update("tenant", metadata, time3))
-    stream, ok = s.getForTests("tenant", 0x1)
-    require.True(t, ok)
-    // As the clock is now 1 second ahead of the bucket start time, we must
-    // truncate the expected time to the start of the bucket.
-    expected[1].timestamp = time3.Truncate(time.Minute).UnixNano()
-    expected[1].size = 100
-    require.Equal(t, expected, stream.rateBuckets)
-    // Advance the clock to the last bucket.
-    clock.Advance(3 * time.Minute)
-    time4 := clock.Now()
-    require.NoError(t, s.Update("tenant", metadata, time4))
-    stream, ok = s.getForTests("tenant", 0x1)
-    require.True(t, ok)
-    expected[4].timestamp = time4.Truncate(time.Minute).UnixNano()
-    expected[4].size = 100
-    require.Equal(t, expected, stream.rateBuckets)
-    // Advance the clock one last time. It should wrap around to the start
-    // of the list and replace the original bucket with time1.
-    clock.Advance(time.Minute)
-    time5 := clock.Now()
-    require.NoError(t, s.Update("tenant", metadata, time5))
-    stream, ok = s.getForTests("tenant", 0x1)
-    require.True(t, ok)
-    expected[0].timestamp = time5.Truncate(time.Minute).UnixNano()
-    expected[0].size = 100
-    require.Equal(t, expected, stream.rateBuckets)
+    require.Equal(t, uint64(0), stream.totalSize)
+    require.Nil(t, stream.rateBuckets)
 }
func TestUsageStore_UpdateCond(t *testing.T) {
