feat(ingest-limits): Append stream metadata records to kafka-based WAL (#17602)

Co-authored-by: George Robinson <george.robinson@grafana.com>
pull/17064/head
Periklis Tsirakidis 8 months ago committed by GitHub
parent 1629181f9c
commit 932f39bc0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      pkg/limits/frontend/frontend.go
  2. 63
      pkg/limits/ingest_limits.go
  3. 35
      pkg/limits/ingest_limits_test.go
  4. 33
      pkg/limits/mock_test.go
  5. 401
      pkg/limits/proto/limits.pb.go
  6. 6
      pkg/limits/proto/limits.proto
  7. 90
      pkg/limits/stream_metadata.go
  8. 255
      pkg/limits/stream_metadata_test.go
  9. 73
      pkg/limits/wal.go

@ -97,7 +97,7 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRe
for _, resp := range resps {
results = append(results, resp.Results...)
}
return &proto.ExceedsLimitsResponse{results}, nil
return &proto.ExceedsLimitsResponse{Results: results}, nil
}
func (f *Frontend) CheckReady(ctx context.Context) error {

@ -93,7 +93,8 @@ type IngestLimits struct {
cfg Config
logger log.Logger
client *kgo.Client
reader *kgo.Client
writer *kgo.Client
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher
@ -106,6 +107,7 @@ type IngestLimits struct {
// Track stream metadata
metadata StreamMetadata
wal WAL
// Track partition assignments
partitionManager *PartitionManager
@ -157,7 +159,7 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus.
kCfg.AutoCreateTopicEnabled = true
kCfg.AutoCreateTopicDefaultPartitions = cfg.NumPartitions
s.client, err = client.NewReaderClient("ingest-limits", kCfg, logger, reg,
s.reader, err = client.NewReaderClient("ingest-limits-reader", kCfg, logger, reg,
kgo.ConsumerGroup(consumerGroup),
kgo.ConsumeTopics(kCfg.Topic),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
@ -170,6 +172,12 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus.
return nil, fmt.Errorf("failed to create kafka client: %w", err)
}
s.writer, err = client.NewWriterClient("ingest-limits-writer", kCfg, 20, logger, reg)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client: %w", err)
}
s.wal = NewKafkaWAL(s.writer, s.cfg.KafkaConfig.Topic, uint64(s.cfg.NumPartitions), logger)
s.Service = services.NewBasicService(s.starting, s.running, s.stopping)
return s, nil
}
@ -325,9 +333,14 @@ func (s *IngestLimits) updateMetadata(rec *proto.StreamMetadata, tenant string,
// It returns nil for expected termination cases (context cancellation or client closure)
// and returns the original error for other failure cases.
func (s *IngestLimits) stopping(failureCase error) error {
if s.client != nil {
s.client.Close()
if s.reader != nil {
s.reader.Close()
}
if s.wal != nil {
s.wal.Close()
}
if errors.Is(failureCase, context.Canceled) || errors.Is(failureCase, kgo.ErrClientClosed) {
return nil
}
@ -350,7 +363,7 @@ func (s *IngestLimits) GetAssignedPartitions(_ context.Context, _ *proto.GetAssi
// ExceedsLimits implements the proto.IngestLimitsServer interface.
// It returns the number of active streams for a tenant and the status of requested streams.
func (s *IngestLimits) ExceedsLimits(_ context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
func (s *IngestLimits) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
var (
lastSeenAt = s.clock.Now()
// Use the provided lastSeenAt timestamp as the last seen time
@ -363,11 +376,11 @@ func (s *IngestLimits) ExceedsLimits(_ context.Context, req *proto.ExceedsLimits
bucketCutoff = lastSeenAt.Add(-s.cfg.RateWindow).UnixNano()
// Calculate the max active streams per tenant per partition
maxActiveStreams = uint64(s.limits.MaxGlobalStreamsPerUser(req.Tenant) / s.cfg.NumPartitions)
// Create a map of streams per partition
streams = make(map[int32][]Stream)
)
for _, stream := range req.Streams {
streams := req.Streams
valid := 0
for _, stream := range streams {
partitionID := int32(stream.StreamHash % uint64(s.cfg.NumPartitions))
// TODO(periklis): Do we need to report this as an error to the frontend?
@ -376,29 +389,33 @@ func (s *IngestLimits) ExceedsLimits(_ context.Context, req *proto.ExceedsLimits
continue
}
streams[partitionID] = append(streams[partitionID], Stream{
Hash: stream.StreamHash,
LastSeenAt: recordTime,
TotalSize: stream.TotalSize,
})
streams[valid] = stream
valid++
}
streams = streams[:valid]
storeRes := make(map[Reason][]uint64)
cond := streamLimitExceeded(maxActiveStreams, storeRes)
cond := streamLimitExceeded(maxActiveStreams)
accepted, rejected := s.metadata.StoreCond(req.Tenant, streams, recordTime, cutoff, bucketStart, bucketCutoff, cond)
ingestedBytes := s.metadata.StoreCond(req.Tenant, streams, cutoff, bucketStart, bucketCutoff, cond)
var ingestedBytes uint64
for _, stream := range accepted {
ingestedBytes += stream.TotalSize
var results []*proto.ExceedsLimitsResult
for reason, streamHashes := range storeRes {
for _, streamHash := range streamHashes {
results = append(results, &proto.ExceedsLimitsResult{
StreamHash: streamHash,
Reason: uint32(reason),
})
err := s.wal.Append(context.WithoutCancel(ctx), req.Tenant, stream)
if err != nil {
level.Error(s.logger).Log("msg", "failed to append stream metadata to WAL", "error", err)
}
}
s.metrics.tenantIngestedBytesTotal.WithLabelValues(req.Tenant).Add(float64(ingestedBytes))
results := make([]*proto.ExceedsLimitsResult, 0, len(rejected))
for _, stream := range rejected {
results = append(results, &proto.ExceedsLimitsResult{
StreamHash: stream.StreamHash,
Reason: uint32(ReasonExceedsMaxStreams),
})
}
return &proto.ExceedsLimitsResponse{Results: results}, nil
}

@ -41,6 +41,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
// Expectations.
expectedIngestedBytes float64
expectedResults []*proto.ExceedsLimitsResult
expectedAppendsTotal int
}{
{
name: "tenant not found",
@ -48,6 +49,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
assignedPartitionIDs: []int32{0},
numPartitions: 1,
metadata: &streamMetadata{
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
@ -74,6 +76,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
},
// expect data
expectedIngestedBytes: 1010,
expectedAppendsTotal: 1,
},
{
name: "all existing streams still active",
@ -81,6 +84,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
assignedPartitionIDs: []int32{0},
numPartitions: 1,
metadata: &streamMetadata{
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
@ -109,6 +113,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
},
// expect data
expectedIngestedBytes: 4040,
expectedAppendsTotal: 4,
},
{
name: "keep existing active streams and drop new streams",
@ -116,6 +121,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
assignedPartitionIDs: []int32{0},
numPartitions: 1,
metadata: &streamMetadata{
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
@ -152,6 +158,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
assignedPartitionIDs: []int32{0},
numPartitions: 1,
metadata: &streamMetadata{
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
@ -184,6 +191,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
{StreamHash: 0x2, Reason: uint32(ReasonExceedsMaxStreams)},
{StreamHash: 0x4, Reason: uint32(ReasonExceedsMaxStreams)},
},
expectedAppendsTotal: 3,
},
{
name: "update active streams and re-activate expired streams",
@ -191,6 +199,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
assignedPartitionIDs: []int32{0},
numPartitions: 1,
metadata: &streamMetadata{
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
@ -221,6 +230,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
},
// expect data
expectedIngestedBytes: 5050,
expectedAppendsTotal: 5,
},
{
name: "drop streams per partition limit",
@ -228,7 +238,8 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
assignedPartitionIDs: []int32{0, 1},
numPartitions: 2,
metadata: &streamMetadata{
locks: make([]stripeLock, 2),
numPartitions: 2,
locks: make([]stripeLock, 2),
stripes: []map[string]map[int32]map[uint64]Stream{
make(map[string]map[int32]map[uint64]Stream),
make(map[string]map[int32]map[uint64]Stream),
@ -252,6 +263,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
{StreamHash: 0x3, Reason: uint32(ReasonExceedsMaxStreams)},
{StreamHash: 0x4, Reason: uint32(ReasonExceedsMaxStreams)},
},
expectedAppendsTotal: 2,
},
{
name: "skip streams assigned to partitions not owned by instance but enforce limit",
@ -259,7 +271,8 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
assignedPartitionIDs: []int32{0},
numPartitions: 2,
metadata: &streamMetadata{
locks: make([]stripeLock, 2),
numPartitions: 2,
locks: make([]stripeLock, 2),
stripes: []map[string]map[int32]map[uint64]Stream{
make(map[string]map[int32]map[uint64]Stream),
make(map[string]map[int32]map[uint64]Stream),
@ -282,6 +295,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
expectedResults: []*proto.ExceedsLimitsResult{
{StreamHash: 0x4, Reason: uint32(ReasonExceedsMaxStreams)},
},
expectedAppendsTotal: 1,
},
}
@ -292,6 +306,8 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
MaxGlobalStreams: tt.maxActiveStreams,
}
wal := &mockWAL{t: t, ExpectedAppendsTotal: tt.expectedAppendsTotal}
s := &IngestLimits{
cfg: Config{
NumPartitions: tt.numPartitions,
@ -319,6 +335,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
metadata: tt.metadata,
partitionManager: NewPartitionManager(log.NewNopLogger()),
clock: clock,
wal: wal,
}
// Assign the Partition IDs.
@ -347,6 +364,8 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
break
}
}
wal.AssertAppendsTotal()
})
}
}
@ -359,8 +378,11 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) {
MaxGlobalStreams: 5,
}
wal := &mockWAL{t: t, ExpectedAppendsTotal: 50}
// Setup test data with a mix of active and expired streams>
metadata := &streamMetadata{
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
@ -404,6 +426,7 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) {
metrics: newMetrics(prometheus.NewRegistry()),
limits: limits,
clock: clock,
wal: wal,
}
// Assign the Partition IDs.
@ -426,18 +449,20 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) {
resp, err := s.ExceedsLimits(context.Background(), req)
require.NoError(t, err)
require.NotNil(t, resp)
require.Nil(t, resp.Results)
require.Empty(t, resp.Results)
}()
}
// Wait for all goroutines to complete
wg.Wait()
wal.AssertAppendsTotal()
}
func TestNewIngestLimits(t *testing.T) {
cfg := Config{
KafkaConfig: kafka.Config{
Topic: "test-topic",
Topic: "test-topic",
WriteTimeout: 10 * time.Second,
},
WindowSize: time.Hour,
LifecyclerConfig: ring.LifecyclerConfig{
@ -464,7 +489,7 @@ func TestNewIngestLimits(t *testing.T) {
s, err := NewIngestLimits(cfg, limits, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
require.NotNil(t, s)
require.NotNil(t, s.client)
require.NotNil(t, s.reader)
require.Equal(t, cfg, s.cfg)

@ -0,0 +1,33 @@
package limits
import (
"context"
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/limits/proto"
)
type mockWAL struct {
t *testing.T
NumAppendsTotal int
ExpectedAppendsTotal int
mtx sync.Mutex
}
func (m *mockWAL) Append(_ context.Context, _ string, _ *proto.StreamMetadata) error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.NumAppendsTotal++
return nil
}
func (m *mockWAL) Close() error {
return nil
}
func (m *mockWAL) AssertAppendsTotal() {
require.Equal(m.t, m.ExpectedAppendsTotal, m.NumAppendsTotal)
}

@ -303,6 +303,65 @@ func (m *StreamMetadata) GetTotalSize() uint64 {
return 0
}
type StreamMetadataRecord struct {
Zone string `protobuf:"bytes,1,opt,name=zone,proto3" json:"zone,omitempty"`
Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"`
Metadata *StreamMetadata `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"`
}
func (m *StreamMetadataRecord) Reset() { *m = StreamMetadataRecord{} }
func (*StreamMetadataRecord) ProtoMessage() {}
func (*StreamMetadataRecord) Descriptor() ([]byte, []int) {
return fileDescriptor_aaed9e7d5298ac0f, []int{6}
}
func (m *StreamMetadataRecord) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *StreamMetadataRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_StreamMetadataRecord.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *StreamMetadataRecord) XXX_Merge(src proto.Message) {
xxx_messageInfo_StreamMetadataRecord.Merge(m, src)
}
func (m *StreamMetadataRecord) XXX_Size() int {
return m.Size()
}
func (m *StreamMetadataRecord) XXX_DiscardUnknown() {
xxx_messageInfo_StreamMetadataRecord.DiscardUnknown(m)
}
var xxx_messageInfo_StreamMetadataRecord proto.InternalMessageInfo
func (m *StreamMetadataRecord) GetZone() string {
if m != nil {
return m.Zone
}
return ""
}
func (m *StreamMetadataRecord) GetTenant() string {
if m != nil {
return m.Tenant
}
return ""
}
func (m *StreamMetadataRecord) GetMetadata() *StreamMetadata {
if m != nil {
return m.Metadata
}
return nil
}
func init() {
proto.RegisterType((*ExceedsLimitsRequest)(nil), "proto.ExceedsLimitsRequest")
proto.RegisterType((*ExceedsLimitsResponse)(nil), "proto.ExceedsLimitsResponse")
@ -311,41 +370,44 @@ func init() {
proto.RegisterType((*GetAssignedPartitionsResponse)(nil), "proto.GetAssignedPartitionsResponse")
proto.RegisterMapType((map[int32]int64)(nil), "proto.GetAssignedPartitionsResponse.AssignedPartitionsEntry")
proto.RegisterType((*StreamMetadata)(nil), "proto.StreamMetadata")
proto.RegisterType((*StreamMetadataRecord)(nil), "proto.StreamMetadataRecord")
}
func init() { proto.RegisterFile("pkg/limits/proto/limits.proto", fileDescriptor_aaed9e7d5298ac0f) }
var fileDescriptor_aaed9e7d5298ac0f = []byte{
// 449 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x92, 0xcf, 0x6e, 0xd3, 0x40,
0x10, 0xc6, 0x77, 0x93, 0xa6, 0x55, 0x07, 0x8a, 0xd0, 0x92, 0x80, 0x15, 0xd2, 0x55, 0xb4, 0x70,
0xc8, 0x29, 0x91, 0x42, 0x0f, 0x08, 0x71, 0x01, 0x29, 0xfc, 0x91, 0x1a, 0x84, 0xb6, 0x0f, 0x80,
0x16, 0x3c, 0x0a, 0x56, 0xdd, 0x75, 0xf0, 0x8e, 0x11, 0xe5, 0xc4, 0x23, 0xf0, 0x18, 0x3c, 0x07,
0x27, 0x8e, 0x39, 0x70, 0xe8, 0x91, 0x38, 0x17, 0x8e, 0x7d, 0x04, 0xd4, 0xb5, 0x23, 0xfa, 0xc7,
0x29, 0x1c, 0x7a, 0xb2, 0x67, 0xe6, 0xd3, 0x6f, 0xfc, 0xcd, 0x67, 0xd8, 0x9e, 0xee, 0x4f, 0x06,
0x71, 0x74, 0x10, 0x91, 0x1b, 0x4c, 0xd3, 0x84, 0x92, 0xb2, 0xe8, 0xfb, 0x42, 0x34, 0xfc, 0x43,
0xbd, 0x81, 0xe6, 0xe8, 0xd3, 0x3b, 0xc4, 0xd0, 0xed, 0xfa, 0xa9, 0xc6, 0x0f, 0x19, 0x3a, 0x12,
0xb7, 0x61, 0x9d, 0xd0, 0x1a, 0x4b, 0x01, 0xef, 0xf2, 0xde, 0xa6, 0x2e, 0x2b, 0x31, 0x80, 0x0d,
0x47, 0x29, 0x9a, 0x03, 0x17, 0xd4, 0xba, 0xf5, 0xde, 0xb5, 0x61, 0xab, 0xe0, 0xf5, 0xf7, 0x7c,
0x77, 0x8c, 0x64, 0x42, 0x43, 0x46, 0x2f, 0x55, 0x6a, 0x0c, 0xad, 0x73, 0x0b, 0xdc, 0x34, 0xb1,
0x0e, 0xc5, 0x0e, 0x6c, 0xa4, 0xe8, 0xb2, 0x98, 0x5c, 0xc0, 0x3d, 0xa9, 0x5d, 0x92, 0xce, 0xcb,
0xb3, 0x98, 0xf4, 0x52, 0xaa, 0xc6, 0x70, 0xab, 0x62, 0x2e, 0x24, 0x40, 0xb1, 0xf0, 0x85, 0x71,
0xef, 0xfd, 0x27, 0xaf, 0xe9, 0x53, 0x9d, 0x13, 0x3b, 0x29, 0x1a, 0x97, 0xd8, 0xa0, 0xd6, 0xe5,
0xbd, 0x2d, 0x5d, 0x56, 0x4a, 0x42, 0xe7, 0x39, 0xd2, 0x13, 0xe7, 0xa2, 0x89, 0xc5, 0xf0, 0xb5,
0x49, 0x29, 0xa2, 0x28, 0xb1, 0xcb, 0x33, 0xa8, 0x9f, 0x1c, 0xb6, 0x57, 0x08, 0x4a, 0x1b, 0x31,
0x08, 0x73, 0x61, 0x5a, 0x3a, 0x7a, 0x5c, 0x3a, 0xba, 0x94, 0xd0, 0xbf, 0x38, 0x1a, 0x59, 0x4a,
0x0f, 0x75, 0x05, 0xb7, 0x3d, 0x82, 0x3b, 0x2b, 0xe4, 0xe2, 0x26, 0xd4, 0xf7, 0xf1, 0xd0, 0x7b,
0x6f, 0xe8, 0x93, 0x57, 0xd1, 0x84, 0xc6, 0x47, 0x13, 0x67, 0xe8, 0x3d, 0xd7, 0x75, 0x51, 0x3c,
0xaa, 0x3d, 0xe4, 0xea, 0x15, 0xdc, 0x38, 0x9b, 0xd7, 0x3f, 0x0f, 0xd8, 0x81, 0x4d, 0x4a, 0xc8,
0xc4, 0x7b, 0xd1, 0xe7, 0x82, 0xb7, 0xa6, 0xff, 0x36, 0x86, 0x21, 0x34, 0x5f, 0xda, 0x09, 0x3a,
0x2a, 0x42, 0x79, 0x96, 0x26, 0x96, 0xd0, 0x86, 0x62, 0x17, 0xb6, 0xce, 0xa4, 0x25, 0xee, 0x56,
0x67, 0xec, 0x8f, 0xdd, 0xee, 0xac, 0xf8, 0x01, 0xfc, 0x99, 0x14, 0x1b, 0x7e, 0xe7, 0x70, 0xfd,
0xf4, 0x9a, 0xab, 0xc5, 0x8b, 0x10, 0x5a, 0x95, 0x41, 0x89, 0x7b, 0x97, 0xc7, 0x58, 0xd0, 0xef,
0xff, 0x4f, 0xd6, 0x8a, 0x3d, 0xdd, 0x99, 0xcd, 0x25, 0x3b, 0x9a, 0x4b, 0x76, 0x3c, 0x97, 0xfc,
0x4b, 0x2e, 0xf9, 0xb7, 0x5c, 0xf2, 0x1f, 0xb9, 0xe4, 0xb3, 0x5c, 0xf2, 0x5f, 0xb9, 0xe4, 0xbf,
0x73, 0xc9, 0x8e, 0x73, 0xc9, 0xbf, 0x2e, 0x24, 0x9b, 0x2d, 0x24, 0x3b, 0x5a, 0x48, 0xf6, 0x76,
0xdd, 0xc3, 0x1f, 0xfc, 0x09, 0x00, 0x00, 0xff, 0xff, 0x14, 0x3f, 0x94, 0x86, 0xd5, 0x03, 0x00,
0x00,
// 487 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x92, 0xbf, 0x6e, 0x13, 0x41,
0x10, 0xc6, 0x6f, 0xed, 0x38, 0x21, 0x03, 0x41, 0x68, 0xb1, 0xe1, 0x64, 0x9c, 0x95, 0x75, 0x50,
0xb8, 0xb2, 0x85, 0x49, 0x81, 0x10, 0x0d, 0x48, 0xe6, 0x8f, 0x14, 0x23, 0xb4, 0x79, 0x00, 0xb4,
0xe4, 0x46, 0xe6, 0x94, 0xf3, 0x9e, 0xb9, 0x9d, 0x43, 0x24, 0x15, 0x8f, 0xc0, 0x63, 0xf0, 0x1c,
0x54, 0x94, 0x2e, 0x28, 0x52, 0xe2, 0x73, 0x43, 0x99, 0x47, 0x40, 0xd9, 0x5b, 0x83, 0xed, 0xd8,
0x81, 0x82, 0xca, 0x3b, 0x3b, 0x9f, 0xbf, 0xb9, 0x6f, 0xe7, 0x07, 0xbb, 0xa3, 0xa3, 0x41, 0x27,
0x8e, 0x86, 0x11, 0x99, 0xce, 0x28, 0x4d, 0x28, 0x71, 0x45, 0xdb, 0x16, 0xbc, 0x62, 0x7f, 0x82,
0x37, 0x50, 0xed, 0x7d, 0x3c, 0x44, 0x0c, 0xcd, 0xbe, 0xed, 0x4a, 0x7c, 0x9f, 0xa1, 0x21, 0x7e,
0x0b, 0x36, 0x09, 0xb5, 0xd2, 0xe4, 0xb3, 0x26, 0x6b, 0x6d, 0x4b, 0x57, 0xf1, 0x0e, 0x6c, 0x19,
0x4a, 0x51, 0x0d, 0x8d, 0x5f, 0x6a, 0x96, 0x5b, 0x57, 0xbb, 0xb5, 0xc2, 0xaf, 0x7d, 0x60, 0x6f,
0xfb, 0x48, 0x2a, 0x54, 0xa4, 0xe4, 0x4c, 0x15, 0xf4, 0xa1, 0xb6, 0x34, 0xc0, 0x8c, 0x12, 0x6d,
0x90, 0xef, 0xc1, 0x56, 0x8a, 0x26, 0x8b, 0xc9, 0xf8, 0xcc, 0x3a, 0xd5, 0x9d, 0xd3, 0xb2, 0x3c,
0x8b, 0x49, 0xce, 0xa4, 0x41, 0x1f, 0x6e, 0xae, 0xe8, 0x73, 0x01, 0x50, 0x0c, 0x7c, 0xa1, 0xcc,
0x3b, 0xfb, 0xc9, 0x1b, 0x72, 0xee, 0xe6, 0x3c, 0x4e, 0x8a, 0xca, 0x24, 0xda, 0x2f, 0x35, 0x59,
0x6b, 0x47, 0xba, 0x2a, 0x10, 0xd0, 0x78, 0x8e, 0xf4, 0xc4, 0x98, 0x68, 0xa0, 0x31, 0x7c, 0xad,
0x52, 0x8a, 0x28, 0x4a, 0xf4, 0xec, 0x19, 0x82, 0xef, 0x0c, 0x76, 0xd7, 0x08, 0x5c, 0x8c, 0x18,
0xb8, 0xba, 0xd0, 0x75, 0x89, 0x1e, 0xbb, 0x44, 0x97, 0x3a, 0xb4, 0x2f, 0xb6, 0x7a, 0x9a, 0xd2,
0x63, 0xb9, 0xc2, 0xb7, 0xde, 0x83, 0xdb, 0x6b, 0xe4, 0xfc, 0x06, 0x94, 0x8f, 0xf0, 0xd8, 0x66,
0xaf, 0xc8, 0xf3, 0x23, 0xaf, 0x42, 0xe5, 0x83, 0x8a, 0x33, 0xb4, 0x99, 0xcb, 0xb2, 0x28, 0x1e,
0x95, 0x1e, 0xb2, 0xe0, 0x15, 0x5c, 0x5f, 0xdc, 0xd7, 0x5f, 0x1f, 0xb0, 0x01, 0xdb, 0x94, 0x90,
0x8a, 0x0f, 0xa2, 0x93, 0xc2, 0x6f, 0x43, 0xfe, 0xb9, 0x08, 0x32, 0xa8, 0x2e, 0xed, 0x1f, 0x0f,
0x93, 0x34, 0xe4, 0x1c, 0x36, 0x4e, 0x12, 0x8d, 0x8e, 0x21, 0x7b, 0x9e, 0x23, 0xab, 0xb4, 0x40,
0xd6, 0x7d, 0xb8, 0x32, 0x74, 0xff, 0xf6, 0xcb, 0x4d, 0xb6, 0x1e, 0xad, 0xdf, 0xb2, 0x6e, 0x08,
0xd5, 0x97, 0x7a, 0x80, 0x86, 0x0a, 0x16, 0x9e, 0xa5, 0x89, 0x26, 0xd4, 0x21, 0xdf, 0x87, 0x9d,
0x05, 0x48, 0xf8, 0x9d, 0xd5, 0x68, 0xd9, 0x1d, 0xd7, 0x1b, 0x6b, 0xb8, 0xb3, 0xdb, 0x09, 0xbc,
0xee, 0x57, 0x06, 0xd7, 0xe6, 0xc7, 0xfc, 0x5f, 0x7b, 0x1e, 0x42, 0x6d, 0x25, 0x1f, 0xfc, 0xee,
0xe5, 0xf4, 0x14, 0xee, 0xf7, 0xfe, 0x05, 0xb1, 0xc0, 0x7b, 0xba, 0x37, 0x9e, 0x08, 0xef, 0x74,
0x22, 0xbc, 0xb3, 0x89, 0x60, 0x9f, 0x72, 0xc1, 0xbe, 0xe4, 0x82, 0x7d, 0xcb, 0x05, 0x1b, 0xe7,
0x82, 0xfd, 0xc8, 0x05, 0xfb, 0x99, 0x0b, 0xef, 0x2c, 0x17, 0xec, 0xf3, 0x54, 0x78, 0xe3, 0xa9,
0xf0, 0x4e, 0xa7, 0xc2, 0x7b, 0xbb, 0x69, 0xcd, 0x1f, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x36,
0x1b, 0xa0, 0x70, 0x4c, 0x04, 0x00, 0x00,
}
func (this *ExceedsLimitsRequest) Equal(that interface{}) bool {
@ -513,6 +575,36 @@ func (this *StreamMetadata) Equal(that interface{}) bool {
}
return true
}
func (this *StreamMetadataRecord) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*StreamMetadataRecord)
if !ok {
that2, ok := that.(StreamMetadataRecord)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
if this.Zone != that1.Zone {
return false
}
if this.Tenant != that1.Tenant {
return false
}
if !this.Metadata.Equal(that1.Metadata) {
return false
}
return true
}
func (this *ExceedsLimitsRequest) GoString() string {
if this == nil {
return "nil"
@ -591,6 +683,20 @@ func (this *StreamMetadata) GoString() string {
s = append(s, "}")
return strings.Join(s, "")
}
func (this *StreamMetadataRecord) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s = append(s, "&proto.StreamMetadataRecord{")
s = append(s, "Zone: "+fmt.Sprintf("%#v", this.Zone)+",\n")
s = append(s, "Tenant: "+fmt.Sprintf("%#v", this.Tenant)+",\n")
if this.Metadata != nil {
s = append(s, "Metadata: "+fmt.Sprintf("%#v", this.Metadata)+",\n")
}
s = append(s, "}")
return strings.Join(s, "")
}
func valueToGoStringLimits(v interface{}, typ string) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
@ -996,6 +1102,55 @@ func (m *StreamMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *StreamMetadataRecord) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *StreamMetadataRecord) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *StreamMetadataRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Metadata != nil {
{
size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintLimits(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
if len(m.Tenant) > 0 {
i -= len(m.Tenant)
copy(dAtA[i:], m.Tenant)
i = encodeVarintLimits(dAtA, i, uint64(len(m.Tenant)))
i--
dAtA[i] = 0x12
}
if len(m.Zone) > 0 {
i -= len(m.Zone)
copy(dAtA[i:], m.Zone)
i = encodeVarintLimits(dAtA, i, uint64(len(m.Zone)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintLimits(dAtA []byte, offset int, v uint64) int {
offset -= sovLimits(v)
base := offset
@ -1097,6 +1252,27 @@ func (m *StreamMetadata) Size() (n int) {
return n
}
func (m *StreamMetadataRecord) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Zone)
if l > 0 {
n += 1 + l + sovLimits(uint64(l))
}
l = len(m.Tenant)
if l > 0 {
n += 1 + l + sovLimits(uint64(l))
}
if m.Metadata != nil {
l = m.Metadata.Size()
n += 1 + l + sovLimits(uint64(l))
}
return n
}
func sovLimits(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
@ -1185,6 +1361,18 @@ func (this *StreamMetadata) String() string {
}, "")
return s
}
func (this *StreamMetadataRecord) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&StreamMetadataRecord{`,
`Zone:` + fmt.Sprintf("%v", this.Zone) + `,`,
`Tenant:` + fmt.Sprintf("%v", this.Tenant) + `,`,
`Metadata:` + strings.Replace(this.Metadata.String(), "StreamMetadata", "StreamMetadata", 1) + `,`,
`}`,
}, "")
return s
}
func valueToStringLimits(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
@ -1786,6 +1974,159 @@ func (m *StreamMetadata) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *StreamMetadataRecord) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLimits
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: StreamMetadataRecord: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: StreamMetadataRecord: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Zone", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLimits
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthLimits
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthLimits
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Zone = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Tenant", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLimits
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthLimits
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthLimits
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Tenant = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLimits
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthLimits
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthLimits
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Metadata == nil {
m.Metadata = &StreamMetadata{}
}
if err := m.Metadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipLimits(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLimits
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthLimits
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipLimits(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0

@ -35,3 +35,9 @@ message StreamMetadata {
uint64 streamHash = 1;
uint64 totalSize = 2;
}
message StreamMetadataRecord {
string zone = 1;
string tenant = 2;
StreamMetadata metadata = 3;
}

@ -3,6 +3,8 @@ package limits
import (
"hash/fnv"
"sync"
"github.com/grafana/loki/v3/pkg/limits/proto"
)
// AllFunc is a function that is called for each stream in the metadata.
@ -18,7 +20,7 @@ type UsageFunc = func(partitionID int32, stream Stream)
// CondFunc is a function that is called for each stream in the metadata.
// It is used to check if the stream should be stored.
// It returns true if the stream should be stored, false otherwise.
type CondFunc = func(acc float64, stream Stream) bool
type CondFunc = func(acc float64, stream *proto.StreamMetadata) bool
// StreamMetadata represents the ingest limits interface for the stream metadata.
type StreamMetadata interface {
@ -31,7 +33,7 @@ type StreamMetadata interface {
// StoreCond tries to store a list of streams for a specific tenant and partition,
// until the partition limit is reached. It returns the total ingested bytes.
StoreCond(tenant string, streams map[int32][]Stream, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) uint64
StoreCond(tenant string, streams []*proto.StreamMetadata, lastSeenAt, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata)
// Store updates or creates the stream metadata for a specific tenant and partition.
Store(tenant string, partitionID int32, streamHash, recTotalSize uint64, recordTime, bucketStart, bucketCutOff int64)
@ -67,14 +69,16 @@ type stripeLock struct {
}
type streamMetadata struct {
stripes []map[string]map[int32]map[uint64]Stream // stripe -> tenant -> partitionID -> streamMetadata
locks []stripeLock
numPartitions int
stripes []map[string]map[int32]map[uint64]Stream // stripe -> tenant -> partitionID -> streamMetadata
locks []stripeLock
}
func NewStreamMetadata(size int) StreamMetadata {
func NewStreamMetadata(numPartitions int) StreamMetadata {
s := &streamMetadata{
stripes: make([]map[string]map[int32]map[uint64]Stream, size),
locks: make([]stripeLock, size),
numPartitions: numPartitions,
stripes: make([]map[string]map[int32]map[uint64]Stream, numPartitions),
locks: make([]stripeLock, numPartitions),
}
for i := range s.stripes {
s.stripes[i] = make(map[string]map[int32]map[uint64]Stream)
@ -104,56 +108,58 @@ func (s *streamMetadata) Usage(tenant string, fn UsageFunc) {
})
}
func (s *streamMetadata) StoreCond(tenant string, streams map[int32][]Stream, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) uint64 {
var ingestedBytes uint64
func (s *streamMetadata) StoreCond(tenant string, streams []*proto.StreamMetadata, lastSeenAt, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) {
stored := make([]*proto.StreamMetadata, 0, len(streams))
rejected := make([]*proto.StreamMetadata, 0, len(streams))
s.withLock(tenant, func(i int) {
if _, ok := s.stripes[i][tenant]; !ok {
s.stripes[i][tenant] = make(map[int32]map[uint64]Stream)
}
for partitionID, streams := range streams {
activeStreams := make(map[int32]int)
for _, stream := range streams {
partitionID := int32(stream.StreamHash % uint64(s.numPartitions))
if _, ok := s.stripes[i][tenant][partitionID]; !ok {
s.stripes[i][tenant][partitionID] = make(map[uint64]Stream)
}
var (
activeStreams = 0
newStreams = 0
)
// Count as active streams all stream that are not expired.
for _, stored := range s.stripes[i][tenant][partitionID] {
if stored.LastSeenAt >= cutoff {
activeStreams++
// Count as active streams all streams that are not expired.
if _, ok := activeStreams[partitionID]; !ok {
for _, stored := range s.stripes[i][tenant][partitionID] {
if stored.LastSeenAt >= cutoff {
activeStreams[partitionID]++
}
}
}
for _, stream := range streams {
stored, found := s.stripes[i][tenant][partitionID][stream.Hash]
recorded, found := s.stripes[i][tenant][partitionID][stream.StreamHash]
// If the stream is new or expired, check if it exceeds the limit.
// If limit is not exceeded and the stream is expired, reset the stream.
if !found || (stored.LastSeenAt < cutoff) {
// Count up the new stream before updating
newStreams++
// If the stream is new or expired, check if it exceeds the limit.
// If limit is not exceeded and the stream is expired, reset the stream.
if !found || (recorded.LastSeenAt < cutoff) {
activeStreams[partitionID]++
if !cond(float64(activeStreams+newStreams), stream) {
continue
}
if !cond(float64(activeStreams[partitionID]), stream) {
rejected = append(rejected, stream)
continue
}
// If the stream is stored and expired, reset the stream
if found && stored.LastSeenAt < cutoff {
s.stripes[i][tenant][partitionID][stream.Hash] = Stream{Hash: stream.Hash, LastSeenAt: stream.LastSeenAt}
}
// If the stream is stored and expired, reset the stream
if found && recorded.LastSeenAt < cutoff {
s.stripes[i][tenant][partitionID][stream.StreamHash] = Stream{Hash: stream.StreamHash, LastSeenAt: lastSeenAt}
}
}
s.storeStream(i, tenant, partitionID, stream.Hash, stream.TotalSize, stream.LastSeenAt, bucketStart, bucketCutOff)
s.storeStream(i, tenant, partitionID, stream.StreamHash, stream.TotalSize, lastSeenAt, bucketStart, bucketCutOff)
ingestedBytes += stream.TotalSize
}
stored = append(stored, stream)
}
})
return ingestedBytes
return stored, rejected
}
func (s *streamMetadata) Store(tenant string, partitionID int32, streamHash, recTotalSize uint64, recordTime, bucketStart, bucketCutOff int64) {
@ -300,12 +306,8 @@ func (s *streamMetadata) getStripe(tenant string) int {
// streamLimitExceeded returns a CondFunc that checks if the number of active streams
// exceeds the given limit. If it does, the stream is added to the results map.
func streamLimitExceeded(limit uint64, results map[Reason][]uint64) CondFunc {
return func(acc float64, stream Stream) bool {
if acc > float64(limit) {
results[ReasonExceedsMaxStreams] = append(results[ReasonExceedsMaxStreams], stream.Hash)
return false
}
return true
func streamLimitExceeded(limit uint64) CondFunc {
return func(acc float64, _ *proto.StreamMetadata) bool {
return acc <= float64(limit)
}
}

@ -497,13 +497,12 @@ func TestStreamMetadata_StoreCond(t *testing.T) {
// setup data
metadata *streamMetadata
streams map[int32][]Stream
streams []*proto.StreamMetadata
maxActiveStreams uint64
// expectations
expectedStored map[string]map[int32][]Stream
expectedDropped map[Reason][]uint64
expectedIngestedBytes uint64
expectedStored []*proto.StreamMetadata
expectedRejected []*proto.StreamMetadata
}{
{
name: "no streams",
@ -511,118 +510,91 @@ func TestStreamMetadata_StoreCond(t *testing.T) {
stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)},
locks: make([]stripeLock, 1),
},
streams: map[int32][]Stream{},
maxActiveStreams: 10,
expectedStored: map[string]map[int32][]Stream{},
},
{
name: "all streams within partition limit",
metadata: &streamMetadata{
stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)},
locks: make([]stripeLock, 1),
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)},
locks: make([]stripeLock, 1),
},
streams: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
},
streams: map[int32][]Stream{0: {
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
}},
maxActiveStreams: 2,
expectedStored: map[string]map[int32][]Stream{
"tenant1": {
0: []Stream{
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
},
expectedStored: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
},
expectedIngestedBytes: 2000,
},
{
name: "all stream within limit per partition",
metadata: &streamMetadata{
stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)},
locks: make([]stripeLock, 2),
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)},
locks: make([]stripeLock, 1),
},
streams: map[int32][]Stream{
0: {
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
1: {
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
streams: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
},
maxActiveStreams: 2,
expectedStored: map[string]map[int32][]Stream{
"tenant1": {
0: []Stream{
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
1: []Stream{
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
},
expectedStored: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
},
expectedIngestedBytes: 2000,
},
{
name: "some streams dropped",
metadata: &streamMetadata{
stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)},
locks: make([]stripeLock, 1),
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)},
locks: make([]stripeLock, 1),
},
streams: map[int32][]Stream{
0: {
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
streams: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
},
maxActiveStreams: 1,
expectedStored: map[string]map[int32][]Stream{
"tenant1": {
0: []Stream{
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
},
expectedStored: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
},
expectedIngestedBytes: 1000,
expectedDropped: map[Reason][]uint64{
ReasonExceedsMaxStreams: {0x1},
expectedRejected: []*proto.StreamMetadata{
{StreamHash: 0x1, TotalSize: 1000},
},
},
{
name: "some streams dropped per partition",
metadata: &streamMetadata{
stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)},
locks: make([]stripeLock, 2),
},
streams: map[int32][]Stream{
0: {
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
1: {
{Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
numPartitions: 2,
stripes: []map[string]map[int32]map[uint64]Stream{
make(map[string]map[int32]map[uint64]Stream),
make(map[string]map[int32]map[uint64]Stream),
},
locks: make([]stripeLock, 2),
},
streams: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000}, // 0 % 2 = 0
{StreamHash: 0x1, TotalSize: 1000}, // 1 % 2 = 1
{StreamHash: 0x2, TotalSize: 1000}, // 2 % 2 = 0
{StreamHash: 0x3, TotalSize: 1000}, // 3 % 2 = 1
},
maxActiveStreams: 1,
expectedStored: map[string]map[int32][]Stream{
"tenant1": {
0: []Stream{
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
1: []Stream{
{Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
},
expectedStored: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
},
expectedIngestedBytes: 2000,
expectedDropped: map[Reason][]uint64{
ReasonExceedsMaxStreams: {0x1, 0x3},
expectedRejected: []*proto.StreamMetadata{
{StreamHash: 0x2, TotalSize: 1000},
{StreamHash: 0x3, TotalSize: 1000},
},
},
{
name: "some streams dropped from a single partition",
metadata: &streamMetadata{
numPartitions: 2,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
@ -634,84 +606,64 @@ func TestStreamMetadata_StoreCond(t *testing.T) {
}},
locks: make([]stripeLock, 2),
},
streams: map[int32][]Stream{
0: {
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
1: {
{Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
streams: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x3, TotalSize: 1000},
{StreamHash: 0x5, TotalSize: 1000},
},
maxActiveStreams: 2,
expectedStored: map[string]map[int32][]Stream{
"tenant1": {
0: []Stream{
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
1: []Stream{
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
},
expectedStored: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x3, TotalSize: 1000},
},
expectedIngestedBytes: 2000,
expectedDropped: map[Reason][]uint64{
ReasonExceedsMaxStreams: {0x3},
expectedRejected: []*proto.StreamMetadata{
{StreamHash: 0x5, TotalSize: 1000},
},
},
{
name: "drops new streams but updates existing streams",
metadata: &streamMetadata{
numPartitions: 2,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
0: {
0x0: {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
0x1: {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
0x4: {Hash: 0x4, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
1: {
0x1: {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
0x3: {Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
0x4: {Hash: 0x4, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
},
},
},
locks: make([]stripeLock, 1),
locks: make([]stripeLock, 2),
},
streams: map[int32][]Stream{
0: { // Mixed order of new and existing streams
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
1: { // Mixed order of new and existing streams
{Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x5, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x4, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
streams: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000}, // 0 % 2 = 0 Existing
{StreamHash: 0x2, TotalSize: 1000}, // 2 % 2 = 0 New
{StreamHash: 0x1, TotalSize: 1000}, // 1 % 2 = 1 Existing
{StreamHash: 0x3, TotalSize: 1000}, // 3 % 2 = 1 Existing
{StreamHash: 0x5, TotalSize: 1000}, // 5 % 2 = 1 New
{StreamHash: 0x4, TotalSize: 1000}, // 4 % 2 = 0 Existing
},
maxActiveStreams: 2,
expectedStored: map[string]map[int32][]Stream{
"tenant1": {
0: []Stream{
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}},
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}},
},
1: []Stream{
{Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}},
{Hash: 0x4, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}},
},
},
expectedStored: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
{StreamHash: 0x3, TotalSize: 1000},
{StreamHash: 0x4, TotalSize: 1000},
},
expectedIngestedBytes: 4000,
expectedDropped: map[Reason][]uint64{
ReasonExceedsMaxStreams: {0x2, 0x5},
expectedRejected: []*proto.StreamMetadata{
{StreamHash: 0x2, TotalSize: 1000},
{StreamHash: 0x5, TotalSize: 1000},
},
},
{
name: "reset expired but not evicted streams",
metadata: &streamMetadata{
numPartitions: 1,
stripes: []map[string]map[int32]map[uint64]Stream{
{
"tenant1": {
@ -725,54 +677,25 @@ func TestStreamMetadata_StoreCond(t *testing.T) {
locks: make([]stripeLock, 1),
},
maxActiveStreams: 2,
streams: map[int32][]Stream{
0: {
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
},
streams: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
},
expectedIngestedBytes: 2000,
expectedStored: map[string]map[int32][]Stream{
"tenant1": {
0: []Stream{
{Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}},
{Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}},
},
},
expectedStored: []*proto.StreamMetadata{
{StreamHash: 0x0, TotalSize: 1000},
{StreamHash: 0x1, TotalSize: 1000},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualDropped := make(map[Reason][]uint64)
cond := streamLimitExceeded(tt.maxActiveStreams, actualDropped)
actualIngestedBytes := tt.metadata.StoreCond("tenant1", tt.streams, cutoff, bucketStart, bucketCutOff, cond)
cond := streamLimitExceeded(tt.maxActiveStreams)
actualStored := make(map[string]map[int32][]Stream)
tt.metadata.All(func(tenant string, partitionID int32, stream Stream) {
if _, ok := actualStored[tenant]; !ok {
actualStored[tenant] = make(map[int32][]Stream)
}
actualStored[tenant][partitionID] = append(actualStored[tenant][partitionID], stream)
})
for tenant, partitions := range tt.expectedStored {
require.Len(t, actualStored[tenant], len(partitions))
for partitionID, streams := range partitions {
require.Len(t, actualStored[tenant][partitionID], len(streams))
require.ElementsMatch(t, streams, actualStored[tenant][partitionID])
}
}
for reason, streamHashes := range tt.expectedDropped {
require.Contains(t, actualDropped, reason)
require.ElementsMatch(t, streamHashes, actualDropped[reason])
}
stored, rejected := tt.metadata.StoreCond("tenant1", tt.streams, now.UnixNano(), cutoff, bucketStart, bucketCutOff, cond)
require.Equal(t, tt.expectedIngestedBytes, actualIngestedBytes)
require.ElementsMatch(t, tt.expectedStored, stored)
require.ElementsMatch(t, tt.expectedRejected, rejected)
})
}
}

@ -0,0 +1,73 @@
package limits
import (
"context"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/limits/proto"
)
type WAL interface {
// Append writes the metadata record to the end of the WAL.
Append(ctx context.Context, tenant string, metadata *proto.StreamMetadata) error
// Close closes the WAL.
Close() error
}
// KafkaWAL is a write-ahead log on top of Kafka.
type KafkaWAL struct {
client *kgo.Client
topic string
partitions uint64
logger log.Logger
}
func NewKafkaWAL(client *kgo.Client, topic string, partitions uint64, logger log.Logger) *KafkaWAL {
return &KafkaWAL{
client: client,
topic: topic,
partitions: partitions,
logger: logger,
}
}
// Append implements the WAL interface.
func (w *KafkaWAL) Append(ctx context.Context, tenant string, metadata *proto.StreamMetadata) error {
v := proto.StreamMetadataRecord{
Tenant: tenant,
Metadata: metadata,
}
b, err := v.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal proto: %w", err)
}
// The stream metadata topic expects a fixed number of partitions,
// the size of which is determined ahead of time. Streams are
// sharded over partitions using a simple mod.
partition := int32(metadata.StreamHash % w.partitions)
r := kgo.Record{
Key: []byte(tenant),
Value: b,
Partition: partition,
Topic: w.topic,
}
w.client.Produce(ctx, &r, w.logProduceErr)
return nil
}
// Close implements the WAL interface.
func (w *KafkaWAL) Close() error {
w.client.Close()
return nil
}
func (w *KafkaWAL) logProduceErr(_ *kgo.Record, err error) {
if err != nil {
level.Error(w.logger).Log("msg", "failed to produce record", "err", err.Error())
}
}
Loading…
Cancel
Save