diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2f0a651432..0df0a6c344 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2463,34 +2463,6 @@ func TestDistributor_PushIngestLimits(t *testing.T) { }}, }, expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: max streams exceeded", - }, { - name: "rate limit is exceeded", - ingestLimitsEnabled: true, - tenant: "test", - streams: logproto.PushRequest{ - Streams: []logproto.Stream{{ - Labels: "{foo=\"bar\"}", - Entries: []logproto.Entry{{ - Timestamp: time.Now(), - Line: "baz", - }}, - }}, - }, - expectedLimitsCalls: 1, - expectedLimitsRequest: &limitsproto.ExceedsLimitsRequest{ - Tenant: "test", - Streams: []*limitsproto.StreamMetadata{{ - StreamHash: 0x90eb45def17f924, - TotalSize: 0x3, - }}, - }, - limitsResponse: &limitsproto.ExceedsLimitsResponse{ - Results: []*limitsproto.ExceedsLimitsResult{{ - StreamHash: 0x90eb45def17f924, - Reason: uint32(limits.ReasonExceedsRateLimit), - }}, - }, - expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: rate limit exceeded", }, { name: "one of two streams exceed max stream limit, request is accepted", ingestLimitsEnabled: true, diff --git a/pkg/distributor/ingest_limits_test.go b/pkg/distributor/ingest_limits_test.go index aa4f0f8078..6565950bb0 100644 --- a/pkg/distributor/ingest_limits_test.go +++ b/pkg/distributor/ingest_limits_test.go @@ -112,11 +112,11 @@ func TestIngestLimits_EnforceLimits(t *testing.T) { response: &proto.ExceedsLimitsResponse{ Results: []*proto.ExceedsLimitsResult{{ StreamHash: 1, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }, expectedStreams: []KeyedStream{}, - expectedReasons: map[uint64][]string{1: {"rate limit exceeded"}}, + expectedReasons: map[uint64][]string{1: {"max streams exceeded"}}, }, { name: "one of two streams exceeds limits", tenant: "test", @@ -138,14 +138,14 @@ func TestIngestLimits_EnforceLimits(t *testing.T) { response: &proto.ExceedsLimitsResponse{ Results: []*proto.ExceedsLimitsResult{{ StreamHash: 1, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }, expectedStreams: []KeyedStream{{ HashKey: 2000, // Should not be used. HashKeyNoShard: 2, }}, - expectedReasons: map[uint64][]string{1: {"rate limit exceeded"}}, + expectedReasons: map[uint64][]string{1: {"max streams exceeded"}}, }, { name: "does not exceed limits", tenant: "test", @@ -246,11 +246,11 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { response: &proto.ExceedsLimitsResponse{ Results: []*proto.ExceedsLimitsResult{{ StreamHash: 1, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }, expectedExceedsLimits: true, - expectedReasons: map[uint64][]string{1: {"rate limit exceeded"}}, + expectedReasons: map[uint64][]string{1: {"max streams exceeded"}}, }, { name: "does not exceed limits", tenant: "test", diff --git a/pkg/limits/frontend/frontend_test.go b/pkg/limits/frontend/frontend_test.go index abc043c691..f1d1ea8625 100644 --- a/pkg/limits/frontend/frontend_test.go +++ b/pkg/limits/frontend/frontend_test.go @@ -80,7 +80,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) { Reason: uint32(limits.ReasonExceedsMaxStreams), }, { StreamHash: 0x4, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }}, expected: &proto.ExceedsLimitsResponse{ @@ -89,7 +89,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) { Reason: uint32(limits.ReasonExceedsMaxStreams), }, { StreamHash: 0x4, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }, }, { @@ -112,7 +112,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) { }, { Results: []*proto.ExceedsLimitsResult{{ StreamHash: 0x4, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }}, expected: &proto.ExceedsLimitsResponse{ @@ -121,7 +121,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) { Reason: uint32(limits.ReasonExceedsMaxStreams), }, { StreamHash: 0x4, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }, }} diff --git a/pkg/limits/frontend/http_test.go b/pkg/limits/frontend/http_test.go index 6ab43107e0..9d39f85a39 100644 --- a/pkg/limits/frontend/http_test.go +++ b/pkg/limits/frontend/http_test.go @@ -51,7 +51,7 @@ func TestFrontend_ServeHTTP(t *testing.T) { exceedsLimitsResponses: []*proto.ExceedsLimitsResponse{{ Results: []*proto.ExceedsLimitsResult{{ StreamHash: 0x1, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }}, request: httpExceedsLimitsRequest{ @@ -64,7 +64,7 @@ func TestFrontend_ServeHTTP(t *testing.T) { expected: httpExceedsLimitsResponse{ Results: []*proto.ExceedsLimitsResult{{ StreamHash: 0x1, - Reason: uint32(limits.ReasonExceedsRateLimit), + Reason: uint32(limits.ReasonExceedsMaxStreams), }}, }, }} diff --git a/pkg/limits/reason.go b/pkg/limits/reason.go index bf19c4becd..6387a7ca05 100644 --- a/pkg/limits/reason.go +++ b/pkg/limits/reason.go @@ -6,18 +6,12 @@ const ( // ReasonExceedsMaxStreams is returned when a tenant exceeds the maximum // number of active streams as per their per-tenant limit. ReasonExceedsMaxStreams Reason = iota - - // ReasonExceedsRateLimit is returned when a tenant exceeds their maximum - // rate limit as per their per-tenant limit. - ReasonExceedsRateLimit ) func (r Reason) String() string { switch r { case ReasonExceedsMaxStreams: return "max streams exceeded" - case ReasonExceedsRateLimit: - return "rate limit exceeded" default: return "unknown reason" } diff --git a/pkg/limits/store.go b/pkg/limits/store.go index d830053095..bc462c960d 100644 --- a/pkg/limits/store.go +++ b/pkg/limits/store.go @@ -64,8 +64,10 @@ type streamUsage struct { // TODO(grobinson): This is a quick fix to allow us to keep testing // correctness. lastProducedAt int64 - totalSize uint64 - rateBuckets []rateBucket + // TODO(grobinson): Rate buckets are not used as we have decided to defer + // implementing rate limits to a later date in the future. + totalSize uint64 + rateBuckets []rateBucket } // RateBucket represents the bytes received during a specific time interval @@ -336,7 +338,7 @@ func (s *usageStore) get(i int, tenant string, partition int32, streamHash uint6 func (s *usageStore) update(i int, tenant string, partition int32, metadata *proto.StreamMetadata, seenAt time.Time) { s.checkInitMap(i, tenant, partition) - streamHash, totalSize := metadata.StreamHash, metadata.TotalSize + streamHash, _ := metadata.StreamHash, metadata.TotalSize // Get the stats for the stream. stream, ok := s.stripes[i][tenant][partition][streamHash] cutoff := seenAt.Add(-s.activeWindow).UnixNano() @@ -344,28 +346,30 @@ func (s *usageStore) update(i int, tenant string, partition int32, metadata *pro if !ok || stream.lastSeenAt < cutoff { stream.hash = streamHash stream.totalSize = 0 - stream.rateBuckets = make([]rateBucket, s.numBuckets) + // stream.rateBuckets = make([]rateBucket, s.numBuckets) } seenAtUnixNano := seenAt.UnixNano() if stream.lastSeenAt <= seenAtUnixNano { stream.lastSeenAt = seenAtUnixNano } - stream.totalSize += totalSize + // TODO(grobinson): As mentioned above, we will come back and implement + // rate limits at a later date in the future. + // stream.totalSize += totalSize // rate buckets are implemented as a circular list. To update a rate // bucket we must first calculate the bucket index. - bucketNum := seenAtUnixNano / int64(s.bucketSize) - bucketIdx := int(bucketNum % int64(s.numBuckets)) - bucket := stream.rateBuckets[bucketIdx] + // bucketNum := seenAtUnixNano / int64(s.bucketSize) + // bucketIdx := int(bucketNum % int64(s.numBuckets)) + // bucket := stream.rateBuckets[bucketIdx] // Once we have found the bucket, we then need to check if it is an old // bucket outside the rate window. If it is, we must reset it before we // can re-use it. - bucketStart := seenAt.Truncate(s.bucketSize).UnixNano() - if bucket.timestamp < bucketStart { - bucket.timestamp = bucketStart - bucket.size = 0 - } - bucket.size += totalSize - stream.rateBuckets[bucketIdx] = bucket + // bucketStart := seenAt.Truncate(s.bucketSize).UnixNano() + // if bucket.timestamp < bucketStart { + // bucket.timestamp = bucketStart + // bucket.size = 0 + // } + // bucket.size += totalSize + // stream.rateBuckets[bucketIdx] = bucket s.stripes[i][tenant][partition][streamHash] = stream } diff --git a/pkg/limits/store_test.go b/pkg/limits/store_test.go index f149457227..85b7492b34 100644 --- a/pkg/limits/store_test.go +++ b/pkg/limits/store_test.go @@ -136,7 +136,67 @@ func TestUsageStore_Update(t *testing.T) { // This test asserts that we update the correct rate buckets, and as rate // buckets are implemented as a circular list, when we reach the end of // list the next bucket is the start of the list. -func TestUsageStore_UpdateRateBuckets(t *testing.T) { +// func TestUsageStore_UpdateRateBuckets(t *testing.T) { +// s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) +// require.NoError(t, err) +// clock := quartz.NewMock(t) +// s.clock = clock +// metadata := &proto.StreamMetadata{ +// StreamHash: 0x1, +// TotalSize: 100, +// } +// // Metadata at clock.Now() should update the first rate bucket because +// // the mocked clock starts at 2024-01-01T00:00:00Z. +// time1 := clock.Now() +// require.NoError(t, s.Update("tenant", metadata, time1)) +// stream, ok := s.getForTests("tenant", 0x1) +// require.True(t, ok) +// expected := newRateBuckets(5*time.Minute, time.Minute) +// expected[0].timestamp = time1.UnixNano() +// expected[0].size = 100 +// require.Equal(t, expected, stream.rateBuckets) +// // Update the first bucket with the same metadata but 1 second later. +// clock.Advance(time.Second) +// time2 := clock.Now() +// require.NoError(t, s.Update("tenant", metadata, time2)) +// expected[0].size = 200 +// require.Equal(t, expected, stream.rateBuckets) +// // Advance the clock forward to the next bucket. Should update the second +// // bucket and leave the first bucket unmodified. +// clock.Advance(time.Minute) +// time3 := clock.Now() +// require.NoError(t, s.Update("tenant", metadata, time3)) +// stream, ok = s.getForTests("tenant", 0x1) +// require.True(t, ok) +// // As the clock is now 1 second ahead of the bucket start time, we must +// // truncate the expected time to the start of the bucket. +// expected[1].timestamp = time3.Truncate(time.Minute).UnixNano() +// expected[1].size = 100 +// require.Equal(t, expected, stream.rateBuckets) +// // Advance the clock to the last bucket. +// clock.Advance(3 * time.Minute) +// time4 := clock.Now() +// require.NoError(t, s.Update("tenant", metadata, time4)) +// stream, ok = s.getForTests("tenant", 0x1) +// require.True(t, ok) +// expected[4].timestamp = time4.Truncate(time.Minute).UnixNano() +// expected[4].size = 100 +// require.Equal(t, expected, stream.rateBuckets) +// // Advance the clock one last one. It should wrap around to the start of +// // the list and replace the original bucket with time1. +// clock.Advance(time.Minute) +// time5 := clock.Now() +// require.NoError(t, s.Update("tenant", metadata, time5)) +// stream, ok = s.getForTests("tenant", 0x1) +// require.True(t, ok) +// expected[0].timestamp = time5.Truncate(time.Minute).UnixNano() +// expected[0].size = 100 +// require.Equal(t, expected, stream.rateBuckets) +// } + +// This test asserts that rate buckets are not updated while the TODOs are +// in place. +func TestUsageStore_RateBucketsAreNotUsed(t *testing.T) { s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) @@ -145,53 +205,12 @@ func TestUsageStore_UpdateRateBuckets(t *testing.T) { StreamHash: 0x1, TotalSize: 100, } - // Metadata at clock.Now() should update the first rate bucket because - // the mocked clock starts at 2024-01-01T00:00:00Z. - time1 := clock.Now() - require.NoError(t, s.Update("tenant", metadata, time1)) + require.NoError(t, s.Update("tenant", metadata, clock.Now())) stream, ok := s.getForTests("tenant", 0x1) require.True(t, ok) - expected := newRateBuckets(5*time.Minute, time.Minute) - expected[0].timestamp = time1.UnixNano() - expected[0].size = 100 - require.Equal(t, expected, stream.rateBuckets) - // Update the first bucket with the same metadata but 1 second later. - clock.Advance(time.Second) - time2 := clock.Now() - require.NoError(t, s.Update("tenant", metadata, time2)) - expected[0].size = 200 - require.Equal(t, expected, stream.rateBuckets) - // Advance the clock forward to the next bucket. Should update the second - // bucket and leave the first bucket unmodified. - clock.Advance(time.Minute) - time3 := clock.Now() - require.NoError(t, s.Update("tenant", metadata, time3)) - stream, ok = s.getForTests("tenant", 0x1) - require.True(t, ok) - // As the clock is now 1 second ahead of the bucket start time, we must - // truncate the expected time to the start of the bucket. - expected[1].timestamp = time3.Truncate(time.Minute).UnixNano() - expected[1].size = 100 - require.Equal(t, expected, stream.rateBuckets) - // Advance the clock to the last bucket. - clock.Advance(3 * time.Minute) - time4 := clock.Now() - require.NoError(t, s.Update("tenant", metadata, time4)) - stream, ok = s.getForTests("tenant", 0x1) - require.True(t, ok) - expected[4].timestamp = time4.Truncate(time.Minute).UnixNano() - expected[4].size = 100 - require.Equal(t, expected, stream.rateBuckets) - // Advance the clock one last one. It should wrap around to the start of - // the list and replace the original bucket with time1. - clock.Advance(time.Minute) - time5 := clock.Now() - require.NoError(t, s.Update("tenant", metadata, time5)) - stream, ok = s.getForTests("tenant", 0x1) - require.True(t, ok) - expected[0].timestamp = time5.Truncate(time.Minute).UnixNano() - expected[0].size = 100 - require.Equal(t, expected, stream.rateBuckets) + + require.Equal(t, uint64(0), stream.totalSize) + require.Nil(t, stream.rateBuckets) } func TestUsageStore_UpdateCond(t *testing.T) {