From efaafd2f6cd300c028acff65bb9cdfb8b367c100 Mon Sep 17 00:00:00 2001
From: George Robinson
Date: Mon, 2 Jun 2025 08:28:22 +0100
Subject: [PATCH] feat: add a fast path for consuming records from Kafka (#17858)
---
 pkg/limits/consumer.go         | 16 +++++++++--
 pkg/limits/consumer_test.go    | 38 ++++++++++++++++----------
 pkg/limits/service.go          |  2 +-
 pkg/limits/store.go            | 50 ++++++++++++++++++++++++++++------
 pkg/limits/store_bench_test.go |  8 +++---
 pkg/limits/store_test.go       | 23 ++++++++++++++--
 6 files changed, 104 insertions(+), 33 deletions(-)

diff --git a/pkg/limits/consumer.go b/pkg/limits/consumer.go
index fdcfee91b7..0481c5d67a 100644
--- a/pkg/limits/consumer.go
+++ b/pkg/limits/consumer.go
@@ -172,15 +172,25 @@ func (c *consumer) processRecord(_ context.Context, state partitionState, r *kgo
 		c.recordsInvalid.Inc()
 		return fmt.Errorf("corrupted record: %w", err)
 	}
-	if state == partitionReady && c.zone == s.Zone {
-		// Discard our own records so we don't count the same streams twice.
+	if c.shouldDiscardRecord(state, &s) {
 		c.recordsDiscarded.Inc()
 		return nil
 	}
-	c.usage.Update(s.Tenant, []*proto.StreamMetadata{s.Metadata}, r.Timestamp, nil)
+	if err := c.usage.Update(s.Tenant, s.Metadata, r.Timestamp); err != nil {
+		if errors.Is(err, errOutsideActiveWindow) {
+			c.recordsDiscarded.Inc()
+		} else {
+			return err
+		}
+	}
 	return nil
 }
 
+func (c *consumer) shouldDiscardRecord(state partitionState, s *proto.StreamMetadataRecord) bool {
+	// Discard our own records so we don't count the same streams twice.
+	return state == partitionReady && c.zone == s.Zone
+}
+
 type partitionReadinessCheck func(partition int32, r *kgo.Record) (bool, error)
 
 // newOffsetReadinessCheck marks a partition as ready if the target offset
diff --git a/pkg/limits/consumer_test.go b/pkg/limits/consumer_test.go
index 1e49e845b7..226a4fc8bd 100644
--- a/pkg/limits/consumer_test.go
+++ b/pkg/limits/consumer_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"testing"
 
+	"github.com/coder/quartz"
 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
@@ -26,15 +27,17 @@ func TestConsumer_ProcessRecords(t *testing.T) {
 	b, err := sameZoneRecord.Marshal()
 	require.NoError(t, err)
 	// Set up a mock kafka that will return the record during the first poll.
-	k := mockKafka{
+	clock := quartz.NewMock(t)
+	kafka := mockKafka{
 		fetches: []kgo.Fetches{{{
 			Topics: []kgo.FetchTopic{{
 				Topic: "test",
 				Partitions: []kgo.FetchPartition{{
 					Partition: 1,
 					Records: []*kgo.Record{{
-						Key:   []byte("tenant"),
-						Value: b,
+						Key:       []byte("tenant"),
+						Value:     b,
+						Timestamp: clock.Now(),
 					}},
 				}},
 			}},
@@ -51,7 +54,8 @@ // Set up the consumer and usage store, and check that the record
 	// was stored.
 	u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
 	require.NoError(t, err)
-	c := newConsumer(&k, m, u, newOffsetReadinessCheck(m), "zone1",
+	u.clock = clock
+	c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
 		log.NewNopLogger(), prometheus.NewRegistry())
 	require.NoError(t, c.pollFetches(ctx))
 	// Check that the record was stored.
@@ -72,8 +76,9 @@
 	}
 	b, err := sameZoneRecord.Marshal()
 	require.NoError(t, err)
+	clock := quartz.NewMock(t)
 	// Set up a mock kafka that will return the record during the first poll.
-	k := mockKafka{
+	kafka := mockKafka{
 		fetches: []kgo.Fetches{{{
 			Topics: []kgo.FetchTopic{{
 				Topic: "test",
@@ -98,7 +103,8 @@ // Set up the consumer and usage store, and check that the record
 	// was discarded.
 	u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
 	require.NoError(t, err)
-	c := newConsumer(&k, m, u, newOffsetReadinessCheck(m), "zone1",
+	u.clock = clock
+	c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
 		log.NewNopLogger(), prometheus.NewRegistry())
 	require.NoError(t, c.pollFetches(ctx))
 	// Check that the record was discarded.
@@ -130,9 +136,10 @@ func TestConsumer_ReadinessCheck(t *testing.T) {
 	}
 	b2, err := otherZoneRecord.Marshal()
 	require.NoError(t, err)
+	clock := quartz.NewMock(t)
 	// Set up a mock kafka that will return the records over two consecutive
 	// polls.
-	k := mockKafka{
+	kafka := mockKafka{
 		fetches: []kgo.Fetches{{{
 			// First poll.
 			Topics: []kgo.FetchTopic{{
@@ -140,9 +147,10 @@
 				Partitions: []kgo.FetchPartition{{
 					Partition: 1,
 					Records: []*kgo.Record{{
-						Key:    []byte("tenant"),
-						Value:  b1,
-						Offset: 1,
+						Key:       []byte("tenant"),
+						Value:     b1,
+						Timestamp: clock.Now(),
+						Offset:    1,
 					}},
 				}},
 			}},
@@ -153,9 +161,10 @@
 				Partitions: []kgo.FetchPartition{{
 					Partition: 1,
 					Records: []*kgo.Record{{
-						Key:    []byte("tenant"),
-						Value:  b2,
-						Offset: 2,
+						Key:       []byte("tenant"),
+						Value:     b2,
+						Timestamp: clock.Now(),
+						Offset:    2,
 					}},
 				}},
 			}},
@@ -173,7 +182,8 @@
 	// We don't need the usage store for this test.
 	u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
 	require.NoError(t, err)
-	c := newConsumer(&k, m, u, newOffsetReadinessCheck(m), "zone1",
+	u.clock = clock
+	c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
 		log.NewNopLogger(), prometheus.NewRegistry())
 	// The first poll should fetch the first record.
 	require.NoError(t, c.pollFetches(ctx))
diff --git a/pkg/limits/service.go b/pkg/limits/service.go
index e6264dc15b..af710acace 100644
--- a/pkg/limits/service.go
+++ b/pkg/limits/service.go
@@ -328,7 +328,7 @@ func (s *Service) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsReq
 	streams = streams[:valid]
 
 	cond := streamLimitExceeded(maxActiveStreams)
-	accepted, rejected := s.usage.Update(req.Tenant, streams, lastSeenAt, cond)
+	accepted, rejected := s.usage.UpdateCond(req.Tenant, streams, lastSeenAt, cond)
 
 	var ingestedBytes uint64
 	for _, stream := range accepted {
diff --git a/pkg/limits/store.go b/pkg/limits/store.go
index 6ce1695ef3..ee15827ba7 100644
--- a/pkg/limits/store.go
+++ b/pkg/limits/store.go
@@ -1,6 +1,7 @@
 package limits
 
 import (
+	"errors"
 	"fmt"
 	"hash/fnv"
 	"sync"
@@ -15,6 +16,10 @@ import (
 // The number of stripe locks.
 const numStripes = 64
 
+var (
+	errOutsideActiveWindow = errors.New("outside active time window")
+)
+
 var (
 	tenantStreamsDesc = prometheus.NewDesc(
 		"loki_ingest_limits_streams",
@@ -124,7 +129,23 @@ func (s *usageStore) IterTenant(tenant string, fn iterateFunc) {
 	})
 }
 
-func (s *usageStore) Update(tenant string, streams []*proto.StreamMetadata, lastSeenAt time.Time, cond condFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) {
+func (s *usageStore) Update(tenant string, metadata *proto.StreamMetadata, seenAt time.Time) error {
+	if !s.withinActiveWindow(seenAt) {
+		return errOutsideActiveWindow
+	}
+	var (
+		partition    = s.getPartitionForHash(metadata.StreamHash)
+		bucketStart  = seenAt.Truncate(s.bucketSize).UnixNano()
+		bucketCutoff = seenAt.Add(-s.rateWindow).UnixNano()
+	)
+	s.withLock(tenant, func(i int) {
+		s.storeStream(i, tenant, partition, metadata.StreamHash,
+			metadata.TotalSize, seenAt, bucketStart, bucketCutoff)
+	})
+	return nil
+}
+
+func (s *usageStore) UpdateCond(tenant string, streams []*proto.StreamMetadata, lastSeenAt time.Time, cond condFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) {
 	var (
 		// Calculate the cutoff for the window size
 		cutoff = lastSeenAt.Add(-s.activeWindow).UnixNano()
@@ -264,6 +285,8 @@ func (s *usageStore) Collect(metrics chan<- prometheus.Metric) {
 }
 
 func (s *usageStore) storeStream(i int, tenant string, partition int32, streamHash, recTotalSize uint64, recordTime time.Time, bucketStart, bucketCutOff int64) {
+	s.checkInitMap(i, tenant, partition)
+
 	// Check if the stream already exists in the metadata
 	recorded, ok := s.stripes[i][tenant][partition][streamHash]
 
@@ -364,19 +387,30 @@ func (s *usageStore) getPartitionForHash(hash uint64) int32 {
 	return int32(hash % uint64(s.numPartitions))
 }
 
+// withinActiveWindow returns true if t is within the active window.
+func (s *usageStore) withinActiveWindow(t time.Time) bool {
+	return s.clock.Now().Add(-s.activeWindow).Before(t)
+}
+
+// checkInitMap checks if the maps for the tenant and partition are
+// initialized, and if not, initializes them. It must not be called without
+// the stripe lock for i.
+func (s *usageStore) checkInitMap(i int, tenant string, partition int32) {
+	if _, ok := s.stripes[i][tenant]; !ok {
+		s.stripes[i][tenant] = make(tenantUsage)
+	}
+	if _, ok := s.stripes[i][tenant][partition]; !ok {
+		s.stripes[i][tenant][partition] = make(map[uint64]streamUsage)
+	}
+}
+
 // Used in tests.
 func (s *usageStore) set(tenant string, stream streamUsage) {
 	partition := s.getPartitionForHash(stream.hash)
 	s.withLock(tenant, func(i int) {
-		if _, ok := s.stripes[i][tenant]; !ok {
-			s.stripes[i][tenant] = make(tenantUsage)
-		}
-		if _, ok := s.stripes[i][tenant][partition]; !ok {
-			s.stripes[i][tenant][partition] = make(map[uint64]streamUsage)
-		}
+		s.checkInitMap(i, tenant, partition)
 		s.stripes[i][tenant][partition][stream.hash] = stream
 	})
-
 }
 
 // streamLimitExceeded returns a condFunc that checks if the number of active
diff --git a/pkg/limits/store_bench_test.go b/pkg/limits/store_bench_test.go
index 59f7e1e32e..f901726656 100644
--- a/pkg/limits/store_bench_test.go
+++ b/pkg/limits/store_bench_test.go
@@ -62,7 +62,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 				TotalSize:  1500,
 			}}
 
-			s.Update(tenant, metadata, updateTime, nil)
+			s.UpdateCond(tenant, metadata, updateTime, nil)
 		}
 	})
 
@@ -81,7 +81,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 				TotalSize:  1500,
 			}}
 
-			s.Update(tenant, metadata, updateTime, nil)
+			s.UpdateCond(tenant, metadata, updateTime, nil)
 		}
 	})
 
@@ -104,7 +104,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 				TotalSize:  1500,
 			}}
 
-			s.Update(tenant, metadata, updateTime, nil)
+			s.UpdateCond(tenant, metadata, updateTime, nil)
 			i++
 		}
 	})
@@ -125,7 +125,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
 				TotalSize:  1500,
 			}}
 
-			s.Update(tenant, metadata, updateTime, nil)
+			s.UpdateCond(tenant, metadata, updateTime, nil)
 			i++
 		}
 	})
diff --git a/pkg/limits/store_test.go b/pkg/limits/store_test.go
index de8f2d59c7..d4bf8e3080 100644
--- a/pkg/limits/store_test.go
+++ b/pkg/limits/store_test.go
@@ -61,7 +61,24 @@ func TestUsageStore_ForTenant(t *testing.T) {
 	require.ElementsMatch(t, expected2, actual2)
 }
 
-func TestUsageStore_Store(t *testing.T) {
+func TestUsageStore_Update(t *testing.T) {
+	s, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, prometheus.NewRegistry())
+	require.NoError(t, err)
+	clock := quartz.NewMock(t)
+	s.clock = clock
+	metadata := &proto.StreamMetadata{
+		StreamHash: 0x1,
+		TotalSize:  100,
+	}
+	// Metadata outside the active time window returns an error.
+	time1 := clock.Now().Add(-DefaultActiveWindow)
+	require.EqualError(t, s.Update("tenant1", metadata, time1), "outside active time window")
+	// Metadata within the active time window is accepted.
+	time2 := clock.Now()
+	require.NoError(t, s.Update("tenant1", metadata, time2))
+}
+
+func TestUsageStore_UpdateBulk(t *testing.T) {
 	tests := []struct {
 		name          string
 		numPartitions int
@@ -168,9 +185,9 @@
 			require.NoError(t, err)
 			clock := quartz.NewMock(t)
 			s.clock = clock
-			s.Update("tenant", test.seed, clock.Now(), nil)
+			s.UpdateCond("tenant", test.seed, clock.Now(), nil)
 			streamLimitCond := streamLimitExceeded(test.maxGlobalStreams)
-			accepted, rejected := s.Update("tenant", test.streams, clock.Now(), streamLimitCond)
+			accepted, rejected := s.UpdateCond("tenant", test.streams, clock.Now(), streamLimitCond)
 			require.ElementsMatch(t, test.expectedAccepted, accepted)
 			require.ElementsMatch(t, test.expectedRejected, rejected)
 		})
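
For readers skimming the patch, a minimal sketch of how the two write paths differ after this change; it is illustrative only and not part of the patch. It assumes code living inside pkg/limits (so it can reach the unexported usageStore, condFunc and errOutsideActiveWindow, plus the "errors", "time" and proto imports), and the helper names applyRecord and applyBatch are hypothetical.

	// applyRecord mirrors consumer.processRecord: the fast path takes a single
	// record with no limit condition, and only the active-window check applies.
	// (Hypothetical helper, for illustration.)
	func applyRecord(u *usageStore, tenant string, md *proto.StreamMetadata, seenAt time.Time) error {
		if err := u.Update(tenant, md, seenAt); err != nil {
			if errors.Is(err, errOutsideActiveWindow) {
				// Too old to count towards usage: discard and carry on.
				return nil
			}
			return err
		}
		return nil
	}

	// applyBatch mirrors Service.ExceedsLimits: many streams are checked against
	// a limit condition in a single UpdateCond call, which reports which streams
	// were accepted and which were rejected. (Hypothetical helper, for illustration.)
	func applyBatch(u *usageStore, tenant string, streams []*proto.StreamMetadata, lastSeenAt time.Time, cond condFunc) (accepted, rejected []*proto.StreamMetadata) {
		return u.UpdateCond(tenant, streams, lastSeenAt, cond)
	}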