feat: add a fast path for consuming records from Kafka (#17858)

pull/17850/head^2
George Robinson 9 months ago committed by GitHub
parent e81b6e3dcd
commit efaafd2f6c
  1. pkg/limits/consumer.go (16 changes)
  2. pkg/limits/consumer_test.go (38 changes)
  3. pkg/limits/service.go (2 changes)
  4. pkg/limits/store.go (50 changes)
  5. pkg/limits/store_bench_test.go (8 changes)
  6. pkg/limits/store_test.go (23 changes)
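
Summary of the change: the usage store's bulk Update (a slice of streams plus an optional limit condition) is renamed to UpdateCond, and a new single-record Update is added for the consumer's hot path. The new method returns a sentinel error, errOutsideActiveWindow, for records whose timestamp falls outside the active window; the consumer treats that as a discard (recordsDiscarded) rather than a failure, and the own-zone check moves into a small shouldDiscardRecord helper. The tests stamp the mock Kafka records with a quartz mock clock and point the store's clock at the same mock so record timestamps always land inside the active window.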

@@ -172,15 +172,25 @@ func (c *consumer) processRecord(_ context.Context, state partitionState, r *kgo
c.recordsInvalid.Inc()
return fmt.Errorf("corrupted record: %w", err)
}
if state == partitionReady && c.zone == s.Zone {
// Discard our own records so we don't count the same streams twice.
if c.shouldDiscardRecord(state, &s) {
c.recordsDiscarded.Inc()
return nil
}
c.usage.Update(s.Tenant, []*proto.StreamMetadata{s.Metadata}, r.Timestamp, nil)
if err := c.usage.Update(s.Tenant, s.Metadata, r.Timestamp); err != nil {
if errors.Is(err, errOutsideActiveWindow) {
c.recordsDiscarded.Inc()
} else {
return err
}
}
return nil
}
func (c *consumer) shouldDiscardRecord(state partitionState, s *proto.StreamMetadataRecord) bool {
// Discard our own records so we don't count the same streams twice.
return state == partitionReady && c.zone == s.Zone
}
type partitionReadinessCheck func(partition int32, r *kgo.Record) (bool, error)
// newOffsetReadinessCheck marks a partition as ready if the target offset
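
Below is a minimal, self-contained sketch of the consumer-side pattern in the hunk above, using simplified stand-in types (the real code passes *proto.StreamMetadata and bumps Prometheus counters): Update reports errOutsideActiveWindow as a soft error that the caller records as a discard, while any other error is propagated.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Sentinel error mirroring errOutsideActiveWindow in the diff.
var errOutsideActiveWindow = errors.New("outside active time window")

// usageStore is a stand-in for the real store; only the active-window
// check from the diff is reproduced here.
type usageStore struct {
	activeWindow time.Duration
	now          func() time.Time
}

func (s *usageStore) Update(tenant string, size uint64, seenAt time.Time) error {
	// withinActiveWindow from the diff: now minus activeWindow must be before seenAt.
	if !s.now().Add(-s.activeWindow).Before(seenAt) {
		return errOutsideActiveWindow
	}
	// ... store per-stream usage here ...
	return nil
}

// processRecord mirrors the error handling added to the consumer: discard
// records outside the active window, fail on anything else.
func processRecord(s *usageStore, tenant string, size uint64, ts time.Time) (discarded bool, err error) {
	if err := s.Update(tenant, size, ts); err != nil {
		if errors.Is(err, errOutsideActiveWindow) {
			return true, nil // counted via recordsDiscarded in the real code
		}
		return false, err
	}
	return false, nil
}

func main() {
	s := &usageStore{activeWindow: time.Hour, now: time.Now}
	freshDiscarded, _ := processRecord(s, "tenant", 100, time.Now())
	staleDiscarded, _ := processRecord(s, "tenant", 100, time.Now().Add(-2*time.Hour))
	fmt.Println(freshDiscarded, staleDiscarded) // false true
}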

@@ -4,6 +4,7 @@ import (
"context"
"testing"
"github.com/coder/quartz"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
@@ -26,15 +27,17 @@ func TestConsumer_ProcessRecords(t *testing.T) {
b, err := sameZoneRecord.Marshal()
require.NoError(t, err)
// Set up a mock kafka that will return the record during the first poll.
k := mockKafka{
clock := quartz.NewMock(t)
kafka := mockKafka{
fetches: []kgo.Fetches{{{
Topics: []kgo.FetchTopic{{
Topic: "test",
Partitions: []kgo.FetchPartition{{
Partition: 1,
Records: []*kgo.Record{{
Key: []byte("tenant"),
Value: b,
Key: []byte("tenant"),
Value: b,
Timestamp: clock.Now(),
}},
}},
}},
@@ -51,7 +54,8 @@ func TestConsumer_ProcessRecords(t *testing.T) {
// was stored.
u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
require.NoError(t, err)
c := newConsumer(&k, m, u, newOffsetReadinessCheck(m), "zone1",
u.clock = clock
c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, c.pollFetches(ctx))
// Check that the record was stored.
@@ -72,8 +76,9 @@ func TestConsumer_ProcessRecords(t *testing.T) {
}
b, err := sameZoneRecord.Marshal()
require.NoError(t, err)
clock := quartz.NewMock(t)
// Set up a mock kafka that will return the record during the first poll.
k := mockKafka{
kafka := mockKafka{
fetches: []kgo.Fetches{{{
Topics: []kgo.FetchTopic{{
Topic: "test",
@@ -98,7 +103,8 @@ func TestConsumer_ProcessRecords(t *testing.T) {
// was discarded.
u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
require.NoError(t, err)
c := newConsumer(&k, m, u, newOffsetReadinessCheck(m), "zone1",
u.clock = clock
c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, c.pollFetches(ctx))
// Check that the record was discarded.
@@ -130,9 +136,10 @@ func TestConsumer_ReadinessCheck(t *testing.T) {
}
b2, err := otherZoneRecord.Marshal()
require.NoError(t, err)
clock := quartz.NewMock(t)
// Set up a mock kafka that will return the records over two consecutive
// polls.
k := mockKafka{
kafka := mockKafka{
fetches: []kgo.Fetches{{{
// First poll.
Topics: []kgo.FetchTopic{{
@@ -140,9 +147,10 @@ func TestConsumer_ReadinessCheck(t *testing.T) {
Partitions: []kgo.FetchPartition{{
Partition: 1,
Records: []*kgo.Record{{
Key: []byte("tenant"),
Value: b1,
Offset: 1,
Key: []byte("tenant"),
Value: b1,
Timestamp: clock.Now(),
Offset: 1,
}},
}},
}},
@@ -153,9 +161,10 @@ func TestConsumer_ReadinessCheck(t *testing.T) {
Partitions: []kgo.FetchPartition{{
Partition: 1,
Records: []*kgo.Record{{
Key: []byte("tenant"),
Value: b2,
Offset: 2,
Key: []byte("tenant"),
Value: b2,
Timestamp: clock.Now(),
Offset: 2,
}},
}},
}},
@@ -173,7 +182,8 @@ func TestConsumer_ReadinessCheck(t *testing.T) {
// We don't need the usage store for this test.
u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
require.NoError(t, err)
c := newConsumer(&k, m, u, newOffsetReadinessCheck(m), "zone1",
u.clock = clock
c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
log.NewNopLogger(), prometheus.NewRegistry())
// The first poll should fetch the first record.
require.NoError(t, c.pollFetches(ctx))
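
The test changes above follow a single pattern: create a quartz mock clock, stamp the mock Kafka records with clock.Now(), and set the usage store's clock to the same mock so the records always fall inside the active window. A minimal sketch of that wiring with a hypothetical toyStore (not the Loki usageStore) follows.

package limits_test

import (
	"testing"
	"time"

	"github.com/coder/quartz"
	"github.com/stretchr/testify/require"
)

// toyStore is a hypothetical stand-in for usageStore: it keeps only the
// active-window check so the clock wiring can be shown in isolation.
type toyStore struct {
	activeWindow time.Duration
	clock        quartz.Clock
}

func (s *toyStore) withinActiveWindow(t time.Time) bool {
	return s.clock.Now().Add(-s.activeWindow).Before(t)
}

func TestMockClockPattern(t *testing.T) {
	clock := quartz.NewMock(t)
	s := &toyStore{activeWindow: time.Hour, clock: clock}

	// A record stamped with the same mock clock is always "fresh".
	recordTime := clock.Now()
	require.True(t, s.withinActiveWindow(recordTime))

	// Advance the mock clock beyond the active window and the same
	// timestamp is now rejected, with no real time passing in the test.
	clock.Advance(2 * time.Hour)
	require.False(t, s.withinActiveWindow(recordTime))
}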

@@ -328,7 +328,7 @@ func (s *Service) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsReq
streams = streams[:valid]
cond := streamLimitExceeded(maxActiveStreams)
accepted, rejected := s.usage.Update(req.Tenant, streams, lastSeenAt, cond)
accepted, rejected := s.usage.UpdateCond(req.Tenant, streams, lastSeenAt, cond)
var ingestedBytes uint64
for _, stream := range accepted {

@@ -1,6 +1,7 @@
package limits
import (
"errors"
"fmt"
"hash/fnv"
"sync"
@@ -15,6 +16,10 @@ import (
// The number of stripe locks.
const numStripes = 64
var (
errOutsideActiveWindow = errors.New("outside active time window")
)
var (
tenantStreamsDesc = prometheus.NewDesc(
"loki_ingest_limits_streams",
@@ -124,7 +129,23 @@ func (s *usageStore) IterTenant(tenant string, fn iterateFunc) {
})
}
func (s *usageStore) Update(tenant string, streams []*proto.StreamMetadata, lastSeenAt time.Time, cond condFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) {
func (s *usageStore) Update(tenant string, metadata *proto.StreamMetadata, seenAt time.Time) error {
if !s.withinActiveWindow(seenAt) {
return errOutsideActiveWindow
}
var (
partition = s.getPartitionForHash(metadata.StreamHash)
bucketStart = seenAt.Truncate(s.bucketSize).UnixNano()
bucketCutoff = seenAt.Add(-s.rateWindow).UnixNano()
)
s.withLock(tenant, func(i int) {
s.storeStream(i, tenant, partition, metadata.StreamHash,
metadata.TotalSize, seenAt, bucketStart, bucketCutoff)
})
return nil
}
func (s *usageStore) UpdateCond(tenant string, streams []*proto.StreamMetadata, lastSeenAt time.Time, cond condFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) {
var (
// Calculate the cutoff for the window size
cutoff = lastSeenAt.Add(-s.activeWindow).UnixNano()
@@ -264,6 +285,8 @@ func (s *usageStore) Collect(metrics chan<- prometheus.Metric) {
}
func (s *usageStore) storeStream(i int, tenant string, partition int32, streamHash, recTotalSize uint64, recordTime time.Time, bucketStart, bucketCutOff int64) {
s.checkInitMap(i, tenant, partition)
// Check if the stream already exists in the metadata
recorded, ok := s.stripes[i][tenant][partition][streamHash]
@@ -364,19 +387,30 @@ func (s *usageStore) getPartitionForHash(hash uint64) int32 {
return int32(hash % uint64(s.numPartitions))
}
// withinActiveWindow returns true if t is within the active window.
func (s *usageStore) withinActiveWindow(t time.Time) bool {
return s.clock.Now().Add(-s.activeWindow).Before(t)
}
// checkInitMap checks if the maps for the tenant and partition are
// initialized, and if not, initializes them. It must not be called without
// the stripe lock for i.
func (s *usageStore) checkInitMap(i int, tenant string, partition int32) {
if _, ok := s.stripes[i][tenant]; !ok {
s.stripes[i][tenant] = make(tenantUsage)
}
if _, ok := s.stripes[i][tenant][partition]; !ok {
s.stripes[i][tenant][partition] = make(map[uint64]streamUsage)
}
}
// Used in tests.
func (s *usageStore) set(tenant string, stream streamUsage) {
partition := s.getPartitionForHash(stream.hash)
s.withLock(tenant, func(i int) {
if _, ok := s.stripes[i][tenant]; !ok {
s.stripes[i][tenant] = make(tenantUsage)
}
if _, ok := s.stripes[i][tenant][partition]; !ok {
s.stripes[i][tenant][partition] = make(map[uint64]streamUsage)
}
s.checkInitMap(i, tenant, partition)
s.stripes[i][tenant][partition][stream.hash] = stream
})
}
// streamLimitExceeded returns a condFunc that checks if the number of active
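
The store-side hunk leans on three small pieces: getPartitionForHash maps a stream hash to a partition with a modulo, per-tenant state lives in one of numStripes stripes each guarded by its own lock, and checkInitMap lazily creates the nested tenant, partition, and stream maps under that lock. A condensed, self-contained sketch of that layout follows; the stripe selection by FNV-1a hash of the tenant name is an assumption (the diff imports hash/fnv but only shows withLock being called).

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"time"
)

const numStripes = 64

type streamUsage struct {
	totalSize uint64
	lastSeen  time.Time
}

// tenantUsage maps partition -> stream hash -> usage, as in the diff.
type tenantUsage map[int32]map[uint64]streamUsage

// usageStore here is a sketch, not the real Loki type.
type usageStore struct {
	numPartitions int
	locks         [numStripes]sync.Mutex
	stripes       [numStripes]map[string]tenantUsage
}

func newUsageStore(numPartitions int) *usageStore {
	s := &usageStore{numPartitions: numPartitions}
	for i := range s.stripes {
		s.stripes[i] = make(map[string]tenantUsage)
	}
	return s
}

// withLock picks a stripe for the tenant (assumed: FNV-1a of the tenant
// name modulo numStripes) and runs fn while holding that stripe's lock.
func (s *usageStore) withLock(tenant string, fn func(i int)) {
	h := fnv.New32a()
	h.Write([]byte(tenant))
	i := int(h.Sum32() % numStripes)
	s.locks[i].Lock()
	defer s.locks[i].Unlock()
	fn(i)
}

// getPartitionForHash mirrors the diff: stream hash modulo partition count.
func (s *usageStore) getPartitionForHash(hash uint64) int32 {
	return int32(hash % uint64(s.numPartitions))
}

// checkInitMap lazily initialises the nested maps, as in the diff; it must
// only be called while holding the stripe lock for i.
func (s *usageStore) checkInitMap(i int, tenant string, partition int32) {
	if _, ok := s.stripes[i][tenant]; !ok {
		s.stripes[i][tenant] = make(tenantUsage)
	}
	if _, ok := s.stripes[i][tenant][partition]; !ok {
		s.stripes[i][tenant][partition] = make(map[uint64]streamUsage)
	}
}

func main() {
	s := newUsageStore(4)
	hash := uint64(0x1)
	partition := s.getPartitionForHash(hash)
	s.withLock("tenant1", func(i int) {
		s.checkInitMap(i, "tenant1", partition)
		s.stripes[i]["tenant1"][partition][hash] = streamUsage{totalSize: 100, lastSeen: time.Now()}
	})
	fmt.Println(partition) // 1
}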

@@ -62,7 +62,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
TotalSize: 1500,
}}
s.Update(tenant, metadata, updateTime, nil)
s.UpdateCond(tenant, metadata, updateTime, nil)
}
})
@@ -81,7 +81,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
TotalSize: 1500,
}}
s.Update(tenant, metadata, updateTime, nil)
s.UpdateCond(tenant, metadata, updateTime, nil)
}
})
@@ -104,7 +104,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
TotalSize: 1500,
}}
s.Update(tenant, metadata, updateTime, nil)
s.UpdateCond(tenant, metadata, updateTime, nil)
i++
}
})
@@ -125,7 +125,7 @@ func BenchmarkUsageStore_Store(b *testing.B) {
TotalSize: 1500,
}}
s.Update(tenant, metadata, updateTime, nil)
s.UpdateCond(tenant, metadata, updateTime, nil)
i++
}
})

@@ -61,7 +61,24 @@ func TestUsageStore_ForTenant(t *testing.T) {
require.ElementsMatch(t, expected2, actual2)
}
func TestUsageStore_Store(t *testing.T) {
func TestUsageStore_Update(t *testing.T) {
s, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, prometheus.NewRegistry())
require.NoError(t, err)
clock := quartz.NewMock(t)
s.clock = clock
metadata := &proto.StreamMetadata{
StreamHash: 0x1,
TotalSize: 100,
}
// Metadata outside the active time window returns an error.
time1 := clock.Now().Add(-DefaultActiveWindow)
require.EqualError(t, s.Update("tenant1", metadata, time1), "outside active time window")
// Metadata within the active time window is accepted.
time2 := clock.Now()
require.NoError(t, s.Update("tenant1", metadata, time2))
}
func TestUsageStore_UpdateBulk(t *testing.T) {
tests := []struct {
name string
numPartitions int
@@ -168,9 +185,9 @@ func TestUsageStore_Store(t *testing.T) {
require.NoError(t, err)
clock := quartz.NewMock(t)
s.clock = clock
s.Update("tenant", test.seed, clock.Now(), nil)
s.UpdateCond("tenant", test.seed, clock.Now(), nil)
streamLimitCond := streamLimitExceeded(test.maxGlobalStreams)
accepted, rejected := s.Update("tenant", test.streams, clock.Now(), streamLimitCond)
accepted, rejected := s.UpdateCond("tenant", test.streams, clock.Now(), streamLimitCond)
require.ElementsMatch(t, test.expectedAccepted, accepted)
require.ElementsMatch(t, test.expectedRejected, rejected)
})
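
For context on the bulk path exercised by TestUsageStore_UpdateBulk: UpdateCond evaluates incoming streams against an optional condition (streamLimitExceeded builds one from the per-tenant stream limit) and returns the accepted and rejected streams. The exact condFunc signature is not shown in this diff, so the sketch below uses a hypothetical one, purely to illustrate the accept/reject split.

package main

import "fmt"

type stream struct{ hash uint64 }

// condFunc here is hypothetical: it answers whether a new stream may be
// admitted given how many streams are already active for the tenant.
type condFunc func(activeStreams int) bool

// streamLimitExceeded-style constructor: admit while under the limit.
func underLimit(maxActiveStreams int) condFunc {
	return func(active int) bool { return active < maxActiveStreams }
}

// updateCond sketches the accept/reject split. Treating already-known
// streams as always accepted is an assumption of this sketch, not
// something the diff shows.
func updateCond(known map[uint64]struct{}, incoming []stream, cond condFunc) (accepted, rejected []stream) {
	for _, s := range incoming {
		if _, ok := known[s.hash]; ok {
			accepted = append(accepted, s)
			continue
		}
		if cond == nil || cond(len(known)) {
			known[s.hash] = struct{}{}
			accepted = append(accepted, s)
			continue
		}
		rejected = append(rejected, s)
	}
	return accepted, rejected
}

func main() {
	known := map[uint64]struct{}{0x1: {}}
	accepted, rejected := updateCond(known,
		[]stream{{0x1}, {0x2}, {0x3}}, underLimit(2))
	fmt.Println(len(accepted), len(rejected)) // 2 1
}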
