diff --git a/pkg/limits/frontend/frontend.go b/pkg/limits/frontend/frontend.go index 577dd7bf7a..34cf812af7 100644 --- a/pkg/limits/frontend/frontend.go +++ b/pkg/limits/frontend/frontend.go @@ -97,7 +97,7 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRe for _, resp := range resps { results = append(results, resp.Results...) } - return &proto.ExceedsLimitsResponse{results}, nil + return &proto.ExceedsLimitsResponse{Results: results}, nil } func (f *Frontend) CheckReady(ctx context.Context) error { diff --git a/pkg/limits/ingest_limits.go b/pkg/limits/ingest_limits.go index 44c177a570..b993491d2e 100644 --- a/pkg/limits/ingest_limits.go +++ b/pkg/limits/ingest_limits.go @@ -93,7 +93,8 @@ type IngestLimits struct { cfg Config logger log.Logger - client *kgo.Client + reader *kgo.Client + writer *kgo.Client lifecycler *ring.Lifecycler lifecyclerWatcher *services.FailureWatcher @@ -106,6 +107,7 @@ type IngestLimits struct { // Track stream metadata metadata StreamMetadata + wal WAL // Track partition assignments partitionManager *PartitionManager @@ -157,7 +159,7 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus. kCfg.AutoCreateTopicEnabled = true kCfg.AutoCreateTopicDefaultPartitions = cfg.NumPartitions - s.client, err = client.NewReaderClient("ingest-limits", kCfg, logger, reg, + s.reader, err = client.NewReaderClient("ingest-limits-reader", kCfg, logger, reg, kgo.ConsumerGroup(consumerGroup), kgo.ConsumeTopics(kCfg.Topic), kgo.Balancers(kgo.CooperativeStickyBalancer()), @@ -170,6 +172,12 @@ func NewIngestLimits(cfg Config, lims Limits, logger log.Logger, reg prometheus. return nil, fmt.Errorf("failed to create kafka client: %w", err) } + s.writer, err = client.NewWriterClient("ingest-limits-writer", kCfg, 20, logger, reg) + if err != nil { + return nil, fmt.Errorf("failed to create kafka client: %w", err) + } + s.wal = NewKafkaWAL(s.writer, s.cfg.KafkaConfig.Topic, uint64(s.cfg.NumPartitions), logger) + s.Service = services.NewBasicService(s.starting, s.running, s.stopping) return s, nil } @@ -325,9 +333,14 @@ func (s *IngestLimits) updateMetadata(rec *proto.StreamMetadata, tenant string, // It returns nil for expected termination cases (context cancellation or client closure) // and returns the original error for other failure cases. func (s *IngestLimits) stopping(failureCase error) error { - if s.client != nil { - s.client.Close() + if s.reader != nil { + s.reader.Close() + } + + if s.wal != nil { + s.wal.Close() } + if errors.Is(failureCase, context.Canceled) || errors.Is(failureCase, kgo.ErrClientClosed) { return nil } @@ -350,7 +363,7 @@ func (s *IngestLimits) GetAssignedPartitions(_ context.Context, _ *proto.GetAssi // ExceedsLimits implements the proto.IngestLimitsServer interface. // It returns the number of active streams for a tenant and the status of requested streams. -func (s *IngestLimits) ExceedsLimits(_ context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) { +func (s *IngestLimits) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) { var ( lastSeenAt = s.clock.Now() // Use the provided lastSeenAt timestamp as the last seen time @@ -363,11 +376,11 @@ func (s *IngestLimits) ExceedsLimits(_ context.Context, req *proto.ExceedsLimits bucketCutoff = lastSeenAt.Add(-s.cfg.RateWindow).UnixNano() // Calculate the max active streams per tenant per partition maxActiveStreams = uint64(s.limits.MaxGlobalStreamsPerUser(req.Tenant) / s.cfg.NumPartitions) - // Create a map of streams per partition - streams = make(map[int32][]Stream) ) - for _, stream := range req.Streams { + streams := req.Streams + valid := 0 + for _, stream := range streams { partitionID := int32(stream.StreamHash % uint64(s.cfg.NumPartitions)) // TODO(periklis): Do we need to report this as an error to the frontend? @@ -376,29 +389,33 @@ func (s *IngestLimits) ExceedsLimits(_ context.Context, req *proto.ExceedsLimits continue } - streams[partitionID] = append(streams[partitionID], Stream{ - Hash: stream.StreamHash, - LastSeenAt: recordTime, - TotalSize: stream.TotalSize, - }) + streams[valid] = stream + valid++ } + streams = streams[:valid] - storeRes := make(map[Reason][]uint64) - cond := streamLimitExceeded(maxActiveStreams, storeRes) + cond := streamLimitExceeded(maxActiveStreams) + accepted, rejected := s.metadata.StoreCond(req.Tenant, streams, recordTime, cutoff, bucketStart, bucketCutoff, cond) - ingestedBytes := s.metadata.StoreCond(req.Tenant, streams, cutoff, bucketStart, bucketCutoff, cond) + var ingestedBytes uint64 + for _, stream := range accepted { + ingestedBytes += stream.TotalSize - var results []*proto.ExceedsLimitsResult - for reason, streamHashes := range storeRes { - for _, streamHash := range streamHashes { - results = append(results, &proto.ExceedsLimitsResult{ - StreamHash: streamHash, - Reason: uint32(reason), - }) + err := s.wal.Append(context.WithoutCancel(ctx), req.Tenant, stream) + if err != nil { + level.Error(s.logger).Log("msg", "failed to append stream metadata to WAL", "error", err) } } s.metrics.tenantIngestedBytesTotal.WithLabelValues(req.Tenant).Add(float64(ingestedBytes)) + results := make([]*proto.ExceedsLimitsResult, 0, len(rejected)) + for _, stream := range rejected { + results = append(results, &proto.ExceedsLimitsResult{ + StreamHash: stream.StreamHash, + Reason: uint32(ReasonExceedsMaxStreams), + }) + } + return &proto.ExceedsLimitsResponse{Results: results}, nil } diff --git a/pkg/limits/ingest_limits_test.go b/pkg/limits/ingest_limits_test.go index 3d08cb973f..279566f30d 100644 --- a/pkg/limits/ingest_limits_test.go +++ b/pkg/limits/ingest_limits_test.go @@ -41,6 +41,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { // Expectations. expectedIngestedBytes float64 expectedResults []*proto.ExceedsLimitsResult + expectedAppendsTotal int }{ { name: "tenant not found", @@ -48,6 +49,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { assignedPartitionIDs: []int32{0}, numPartitions: 1, metadata: &streamMetadata{ + numPartitions: 1, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { @@ -74,6 +76,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { }, // expect data expectedIngestedBytes: 1010, + expectedAppendsTotal: 1, }, { name: "all existing streams still active", @@ -81,6 +84,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { assignedPartitionIDs: []int32{0}, numPartitions: 1, metadata: &streamMetadata{ + numPartitions: 1, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { @@ -109,6 +113,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { }, // expect data expectedIngestedBytes: 4040, + expectedAppendsTotal: 4, }, { name: "keep existing active streams and drop new streams", @@ -116,6 +121,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { assignedPartitionIDs: []int32{0}, numPartitions: 1, metadata: &streamMetadata{ + numPartitions: 1, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { @@ -152,6 +158,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { assignedPartitionIDs: []int32{0}, numPartitions: 1, metadata: &streamMetadata{ + numPartitions: 1, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { @@ -184,6 +191,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { {StreamHash: 0x2, Reason: uint32(ReasonExceedsMaxStreams)}, {StreamHash: 0x4, Reason: uint32(ReasonExceedsMaxStreams)}, }, + expectedAppendsTotal: 3, }, { name: "update active streams and re-activate expired streams", @@ -191,6 +199,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { assignedPartitionIDs: []int32{0}, numPartitions: 1, metadata: &streamMetadata{ + numPartitions: 1, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { @@ -221,6 +230,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { }, // expect data expectedIngestedBytes: 5050, + expectedAppendsTotal: 5, }, { name: "drop streams per partition limit", @@ -228,7 +238,8 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { assignedPartitionIDs: []int32{0, 1}, numPartitions: 2, metadata: &streamMetadata{ - locks: make([]stripeLock, 2), + numPartitions: 2, + locks: make([]stripeLock, 2), stripes: []map[string]map[int32]map[uint64]Stream{ make(map[string]map[int32]map[uint64]Stream), make(map[string]map[int32]map[uint64]Stream), @@ -252,6 +263,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { {StreamHash: 0x3, Reason: uint32(ReasonExceedsMaxStreams)}, {StreamHash: 0x4, Reason: uint32(ReasonExceedsMaxStreams)}, }, + expectedAppendsTotal: 2, }, { name: "skip streams assigned to partitions not owned by instance but enforce limit", @@ -259,7 +271,8 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { assignedPartitionIDs: []int32{0}, numPartitions: 2, metadata: &streamMetadata{ - locks: make([]stripeLock, 2), + numPartitions: 2, + locks: make([]stripeLock, 2), stripes: []map[string]map[int32]map[uint64]Stream{ make(map[string]map[int32]map[uint64]Stream), make(map[string]map[int32]map[uint64]Stream), @@ -282,6 +295,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { expectedResults: []*proto.ExceedsLimitsResult{ {StreamHash: 0x4, Reason: uint32(ReasonExceedsMaxStreams)}, }, + expectedAppendsTotal: 1, }, } @@ -292,6 +306,8 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { MaxGlobalStreams: tt.maxActiveStreams, } + wal := &mockWAL{t: t, ExpectedAppendsTotal: tt.expectedAppendsTotal} + s := &IngestLimits{ cfg: Config{ NumPartitions: tt.numPartitions, @@ -319,6 +335,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { metadata: tt.metadata, partitionManager: NewPartitionManager(log.NewNopLogger()), clock: clock, + wal: wal, } // Assign the Partition IDs. @@ -347,6 +364,8 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) { break } } + + wal.AssertAppendsTotal() }) } } @@ -359,8 +378,11 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) { MaxGlobalStreams: 5, } + wal := &mockWAL{t: t, ExpectedAppendsTotal: 50} + // Setup test data with a mix of active and expired streams> metadata := &streamMetadata{ + numPartitions: 1, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { @@ -404,6 +426,7 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) { metrics: newMetrics(prometheus.NewRegistry()), limits: limits, clock: clock, + wal: wal, } // Assign the Partition IDs. @@ -426,18 +449,20 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) { resp, err := s.ExceedsLimits(context.Background(), req) require.NoError(t, err) require.NotNil(t, resp) - require.Nil(t, resp.Results) + require.Empty(t, resp.Results) }() } // Wait for all goroutines to complete wg.Wait() + wal.AssertAppendsTotal() } func TestNewIngestLimits(t *testing.T) { cfg := Config{ KafkaConfig: kafka.Config{ - Topic: "test-topic", + Topic: "test-topic", + WriteTimeout: 10 * time.Second, }, WindowSize: time.Hour, LifecyclerConfig: ring.LifecyclerConfig{ @@ -464,7 +489,7 @@ func TestNewIngestLimits(t *testing.T) { s, err := NewIngestLimits(cfg, limits, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) require.NotNil(t, s) - require.NotNil(t, s.client) + require.NotNil(t, s.reader) require.Equal(t, cfg, s.cfg) diff --git a/pkg/limits/mock_test.go b/pkg/limits/mock_test.go new file mode 100644 index 0000000000..a011ed439e --- /dev/null +++ b/pkg/limits/mock_test.go @@ -0,0 +1,33 @@ +package limits + +import ( + "context" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/limits/proto" +) + +type mockWAL struct { + t *testing.T + NumAppendsTotal int + ExpectedAppendsTotal int + mtx sync.Mutex +} + +func (m *mockWAL) Append(_ context.Context, _ string, _ *proto.StreamMetadata) error { + m.mtx.Lock() + defer m.mtx.Unlock() + m.NumAppendsTotal++ + return nil +} + +func (m *mockWAL) Close() error { + return nil +} + +func (m *mockWAL) AssertAppendsTotal() { + require.Equal(m.t, m.ExpectedAppendsTotal, m.NumAppendsTotal) +} diff --git a/pkg/limits/proto/limits.pb.go b/pkg/limits/proto/limits.pb.go index fe1f982382..fb9e7481b4 100644 --- a/pkg/limits/proto/limits.pb.go +++ b/pkg/limits/proto/limits.pb.go @@ -303,6 +303,65 @@ func (m *StreamMetadata) GetTotalSize() uint64 { return 0 } +type StreamMetadataRecord struct { + Zone string `protobuf:"bytes,1,opt,name=zone,proto3" json:"zone,omitempty"` + Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"` + Metadata *StreamMetadata `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (m *StreamMetadataRecord) Reset() { *m = StreamMetadataRecord{} } +func (*StreamMetadataRecord) ProtoMessage() {} +func (*StreamMetadataRecord) Descriptor() ([]byte, []int) { + return fileDescriptor_aaed9e7d5298ac0f, []int{6} +} +func (m *StreamMetadataRecord) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StreamMetadataRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StreamMetadataRecord.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StreamMetadataRecord) XXX_Merge(src proto.Message) { + xxx_messageInfo_StreamMetadataRecord.Merge(m, src) +} +func (m *StreamMetadataRecord) XXX_Size() int { + return m.Size() +} +func (m *StreamMetadataRecord) XXX_DiscardUnknown() { + xxx_messageInfo_StreamMetadataRecord.DiscardUnknown(m) +} + +var xxx_messageInfo_StreamMetadataRecord proto.InternalMessageInfo + +func (m *StreamMetadataRecord) GetZone() string { + if m != nil { + return m.Zone + } + return "" +} + +func (m *StreamMetadataRecord) GetTenant() string { + if m != nil { + return m.Tenant + } + return "" +} + +func (m *StreamMetadataRecord) GetMetadata() *StreamMetadata { + if m != nil { + return m.Metadata + } + return nil +} + func init() { proto.RegisterType((*ExceedsLimitsRequest)(nil), "proto.ExceedsLimitsRequest") proto.RegisterType((*ExceedsLimitsResponse)(nil), "proto.ExceedsLimitsResponse") @@ -311,41 +370,44 @@ func init() { proto.RegisterType((*GetAssignedPartitionsResponse)(nil), "proto.GetAssignedPartitionsResponse") proto.RegisterMapType((map[int32]int64)(nil), "proto.GetAssignedPartitionsResponse.AssignedPartitionsEntry") proto.RegisterType((*StreamMetadata)(nil), "proto.StreamMetadata") + proto.RegisterType((*StreamMetadataRecord)(nil), "proto.StreamMetadataRecord") } func init() { proto.RegisterFile("pkg/limits/proto/limits.proto", fileDescriptor_aaed9e7d5298ac0f) } var fileDescriptor_aaed9e7d5298ac0f = []byte{ - // 449 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x92, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0x77, 0x93, 0xa6, 0x55, 0x07, 0x8a, 0xd0, 0x92, 0x80, 0x15, 0xd2, 0x55, 0xb4, 0x70, - 0xc8, 0x29, 0x91, 0x42, 0x0f, 0x08, 0x71, 0x01, 0x29, 0xfc, 0x91, 0x1a, 0x84, 0xb6, 0x0f, 0x80, - 0x16, 0x3c, 0x0a, 0x56, 0xdd, 0x75, 0xf0, 0x8e, 0x11, 0xe5, 0xc4, 0x23, 0xf0, 0x18, 0x3c, 0x07, - 0x27, 0x8e, 0x39, 0x70, 0xe8, 0x91, 0x38, 0x17, 0x8e, 0x7d, 0x04, 0xd4, 0xb5, 0x23, 0xfa, 0xc7, - 0x29, 0x1c, 0x7a, 0xb2, 0x67, 0xe6, 0xd3, 0x6f, 0xfc, 0xcd, 0x67, 0xd8, 0x9e, 0xee, 0x4f, 0x06, - 0x71, 0x74, 0x10, 0x91, 0x1b, 0x4c, 0xd3, 0x84, 0x92, 0xb2, 0xe8, 0xfb, 0x42, 0x34, 0xfc, 0x43, - 0xbd, 0x81, 0xe6, 0xe8, 0xd3, 0x3b, 0xc4, 0xd0, 0xed, 0xfa, 0xa9, 0xc6, 0x0f, 0x19, 0x3a, 0x12, - 0xb7, 0x61, 0x9d, 0xd0, 0x1a, 0x4b, 0x01, 0xef, 0xf2, 0xde, 0xa6, 0x2e, 0x2b, 0x31, 0x80, 0x0d, - 0x47, 0x29, 0x9a, 0x03, 0x17, 0xd4, 0xba, 0xf5, 0xde, 0xb5, 0x61, 0xab, 0xe0, 0xf5, 0xf7, 0x7c, - 0x77, 0x8c, 0x64, 0x42, 0x43, 0x46, 0x2f, 0x55, 0x6a, 0x0c, 0xad, 0x73, 0x0b, 0xdc, 0x34, 0xb1, - 0x0e, 0xc5, 0x0e, 0x6c, 0xa4, 0xe8, 0xb2, 0x98, 0x5c, 0xc0, 0x3d, 0xa9, 0x5d, 0x92, 0xce, 0xcb, - 0xb3, 0x98, 0xf4, 0x52, 0xaa, 0xc6, 0x70, 0xab, 0x62, 0x2e, 0x24, 0x40, 0xb1, 0xf0, 0x85, 0x71, - 0xef, 0xfd, 0x27, 0xaf, 0xe9, 0x53, 0x9d, 0x13, 0x3b, 0x29, 0x1a, 0x97, 0xd8, 0xa0, 0xd6, 0xe5, - 0xbd, 0x2d, 0x5d, 0x56, 0x4a, 0x42, 0xe7, 0x39, 0xd2, 0x13, 0xe7, 0xa2, 0x89, 0xc5, 0xf0, 0xb5, - 0x49, 0x29, 0xa2, 0x28, 0xb1, 0xcb, 0x33, 0xa8, 0x9f, 0x1c, 0xb6, 0x57, 0x08, 0x4a, 0x1b, 0x31, - 0x08, 0x73, 0x61, 0x5a, 0x3a, 0x7a, 0x5c, 0x3a, 0xba, 0x94, 0xd0, 0xbf, 0x38, 0x1a, 0x59, 0x4a, - 0x0f, 0x75, 0x05, 0xb7, 0x3d, 0x82, 0x3b, 0x2b, 0xe4, 0xe2, 0x26, 0xd4, 0xf7, 0xf1, 0xd0, 0x7b, - 0x6f, 0xe8, 0x93, 0x57, 0xd1, 0x84, 0xc6, 0x47, 0x13, 0x67, 0xe8, 0x3d, 0xd7, 0x75, 0x51, 0x3c, - 0xaa, 0x3d, 0xe4, 0xea, 0x15, 0xdc, 0x38, 0x9b, 0xd7, 0x3f, 0x0f, 0xd8, 0x81, 0x4d, 0x4a, 0xc8, - 0xc4, 0x7b, 0xd1, 0xe7, 0x82, 0xb7, 0xa6, 0xff, 0x36, 0x86, 0x21, 0x34, 0x5f, 0xda, 0x09, 0x3a, - 0x2a, 0x42, 0x79, 0x96, 0x26, 0x96, 0xd0, 0x86, 0x62, 0x17, 0xb6, 0xce, 0xa4, 0x25, 0xee, 0x56, - 0x67, 0xec, 0x8f, 0xdd, 0xee, 0xac, 0xf8, 0x01, 0xfc, 0x99, 0x14, 0x1b, 0x7e, 0xe7, 0x70, 0xfd, - 0xf4, 0x9a, 0xab, 0xc5, 0x8b, 0x10, 0x5a, 0x95, 0x41, 0x89, 0x7b, 0x97, 0xc7, 0x58, 0xd0, 0xef, - 0xff, 0x4f, 0xd6, 0x8a, 0x3d, 0xdd, 0x99, 0xcd, 0x25, 0x3b, 0x9a, 0x4b, 0x76, 0x3c, 0x97, 0xfc, - 0x4b, 0x2e, 0xf9, 0xb7, 0x5c, 0xf2, 0x1f, 0xb9, 0xe4, 0xb3, 0x5c, 0xf2, 0x5f, 0xb9, 0xe4, 0xbf, - 0x73, 0xc9, 0x8e, 0x73, 0xc9, 0xbf, 0x2e, 0x24, 0x9b, 0x2d, 0x24, 0x3b, 0x5a, 0x48, 0xf6, 0x76, - 0xdd, 0xc3, 0x1f, 0xfc, 0x09, 0x00, 0x00, 0xff, 0xff, 0x14, 0x3f, 0x94, 0x86, 0xd5, 0x03, 0x00, - 0x00, + // 487 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x92, 0xbf, 0x6e, 0x13, 0x41, + 0x10, 0xc6, 0x6f, 0xed, 0x38, 0x21, 0x03, 0x41, 0x68, 0xb1, 0xe1, 0x64, 0x9c, 0x95, 0x75, 0x50, + 0xb8, 0xb2, 0x85, 0x49, 0x81, 0x10, 0x0d, 0x48, 0xe6, 0x8f, 0x14, 0x23, 0xb4, 0x79, 0x00, 0xb4, + 0xe4, 0x46, 0xe6, 0x94, 0xf3, 0x9e, 0xb9, 0x9d, 0x43, 0x24, 0x15, 0x8f, 0xc0, 0x63, 0xf0, 0x1c, + 0x54, 0x94, 0x2e, 0x28, 0x52, 0xe2, 0x73, 0x43, 0x99, 0x47, 0x40, 0xd9, 0x5b, 0x83, 0xed, 0xd8, + 0x81, 0x82, 0xca, 0x3b, 0x3b, 0x9f, 0xbf, 0xb9, 0x6f, 0xe7, 0x07, 0xbb, 0xa3, 0xa3, 0x41, 0x27, + 0x8e, 0x86, 0x11, 0x99, 0xce, 0x28, 0x4d, 0x28, 0x71, 0x45, 0xdb, 0x16, 0xbc, 0x62, 0x7f, 0x82, + 0x37, 0x50, 0xed, 0x7d, 0x3c, 0x44, 0x0c, 0xcd, 0xbe, 0xed, 0x4a, 0x7c, 0x9f, 0xa1, 0x21, 0x7e, + 0x0b, 0x36, 0x09, 0xb5, 0xd2, 0xe4, 0xb3, 0x26, 0x6b, 0x6d, 0x4b, 0x57, 0xf1, 0x0e, 0x6c, 0x19, + 0x4a, 0x51, 0x0d, 0x8d, 0x5f, 0x6a, 0x96, 0x5b, 0x57, 0xbb, 0xb5, 0xc2, 0xaf, 0x7d, 0x60, 0x6f, + 0xfb, 0x48, 0x2a, 0x54, 0xa4, 0xe4, 0x4c, 0x15, 0xf4, 0xa1, 0xb6, 0x34, 0xc0, 0x8c, 0x12, 0x6d, + 0x90, 0xef, 0xc1, 0x56, 0x8a, 0x26, 0x8b, 0xc9, 0xf8, 0xcc, 0x3a, 0xd5, 0x9d, 0xd3, 0xb2, 0x3c, + 0x8b, 0x49, 0xce, 0xa4, 0x41, 0x1f, 0x6e, 0xae, 0xe8, 0x73, 0x01, 0x50, 0x0c, 0x7c, 0xa1, 0xcc, + 0x3b, 0xfb, 0xc9, 0x1b, 0x72, 0xee, 0xe6, 0x3c, 0x4e, 0x8a, 0xca, 0x24, 0xda, 0x2f, 0x35, 0x59, + 0x6b, 0x47, 0xba, 0x2a, 0x10, 0xd0, 0x78, 0x8e, 0xf4, 0xc4, 0x98, 0x68, 0xa0, 0x31, 0x7c, 0xad, + 0x52, 0x8a, 0x28, 0x4a, 0xf4, 0xec, 0x19, 0x82, 0xef, 0x0c, 0x76, 0xd7, 0x08, 0x5c, 0x8c, 0x18, + 0xb8, 0xba, 0xd0, 0x75, 0x89, 0x1e, 0xbb, 0x44, 0x97, 0x3a, 0xb4, 0x2f, 0xb6, 0x7a, 0x9a, 0xd2, + 0x63, 0xb9, 0xc2, 0xb7, 0xde, 0x83, 0xdb, 0x6b, 0xe4, 0xfc, 0x06, 0x94, 0x8f, 0xf0, 0xd8, 0x66, + 0xaf, 0xc8, 0xf3, 0x23, 0xaf, 0x42, 0xe5, 0x83, 0x8a, 0x33, 0xb4, 0x99, 0xcb, 0xb2, 0x28, 0x1e, + 0x95, 0x1e, 0xb2, 0xe0, 0x15, 0x5c, 0x5f, 0xdc, 0xd7, 0x5f, 0x1f, 0xb0, 0x01, 0xdb, 0x94, 0x90, + 0x8a, 0x0f, 0xa2, 0x93, 0xc2, 0x6f, 0x43, 0xfe, 0xb9, 0x08, 0x32, 0xa8, 0x2e, 0xed, 0x1f, 0x0f, + 0x93, 0x34, 0xe4, 0x1c, 0x36, 0x4e, 0x12, 0x8d, 0x8e, 0x21, 0x7b, 0x9e, 0x23, 0xab, 0xb4, 0x40, + 0xd6, 0x7d, 0xb8, 0x32, 0x74, 0xff, 0xf6, 0xcb, 0x4d, 0xb6, 0x1e, 0xad, 0xdf, 0xb2, 0x6e, 0x08, + 0xd5, 0x97, 0x7a, 0x80, 0x86, 0x0a, 0x16, 0x9e, 0xa5, 0x89, 0x26, 0xd4, 0x21, 0xdf, 0x87, 0x9d, + 0x05, 0x48, 0xf8, 0x9d, 0xd5, 0x68, 0xd9, 0x1d, 0xd7, 0x1b, 0x6b, 0xb8, 0xb3, 0xdb, 0x09, 0xbc, + 0xee, 0x57, 0x06, 0xd7, 0xe6, 0xc7, 0xfc, 0x5f, 0x7b, 0x1e, 0x42, 0x6d, 0x25, 0x1f, 0xfc, 0xee, + 0xe5, 0xf4, 0x14, 0xee, 0xf7, 0xfe, 0x05, 0xb1, 0xc0, 0x7b, 0xba, 0x37, 0x9e, 0x08, 0xef, 0x74, + 0x22, 0xbc, 0xb3, 0x89, 0x60, 0x9f, 0x72, 0xc1, 0xbe, 0xe4, 0x82, 0x7d, 0xcb, 0x05, 0x1b, 0xe7, + 0x82, 0xfd, 0xc8, 0x05, 0xfb, 0x99, 0x0b, 0xef, 0x2c, 0x17, 0xec, 0xf3, 0x54, 0x78, 0xe3, 0xa9, + 0xf0, 0x4e, 0xa7, 0xc2, 0x7b, 0xbb, 0x69, 0xcd, 0x1f, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x36, + 0x1b, 0xa0, 0x70, 0x4c, 0x04, 0x00, 0x00, } func (this *ExceedsLimitsRequest) Equal(that interface{}) bool { @@ -513,6 +575,36 @@ func (this *StreamMetadata) Equal(that interface{}) bool { } return true } +func (this *StreamMetadataRecord) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*StreamMetadataRecord) + if !ok { + that2, ok := that.(StreamMetadataRecord) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Zone != that1.Zone { + return false + } + if this.Tenant != that1.Tenant { + return false + } + if !this.Metadata.Equal(that1.Metadata) { + return false + } + return true +} func (this *ExceedsLimitsRequest) GoString() string { if this == nil { return "nil" @@ -591,6 +683,20 @@ func (this *StreamMetadata) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *StreamMetadataRecord) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&proto.StreamMetadataRecord{") + s = append(s, "Zone: "+fmt.Sprintf("%#v", this.Zone)+",\n") + s = append(s, "Tenant: "+fmt.Sprintf("%#v", this.Tenant)+",\n") + if this.Metadata != nil { + s = append(s, "Metadata: "+fmt.Sprintf("%#v", this.Metadata)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringLimits(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -996,6 +1102,55 @@ func (m *StreamMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *StreamMetadataRecord) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StreamMetadataRecord) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StreamMetadataRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Metadata != nil { + { + size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintLimits(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if len(m.Tenant) > 0 { + i -= len(m.Tenant) + copy(dAtA[i:], m.Tenant) + i = encodeVarintLimits(dAtA, i, uint64(len(m.Tenant))) + i-- + dAtA[i] = 0x12 + } + if len(m.Zone) > 0 { + i -= len(m.Zone) + copy(dAtA[i:], m.Zone) + i = encodeVarintLimits(dAtA, i, uint64(len(m.Zone))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintLimits(dAtA []byte, offset int, v uint64) int { offset -= sovLimits(v) base := offset @@ -1097,6 +1252,27 @@ func (m *StreamMetadata) Size() (n int) { return n } +func (m *StreamMetadataRecord) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Zone) + if l > 0 { + n += 1 + l + sovLimits(uint64(l)) + } + l = len(m.Tenant) + if l > 0 { + n += 1 + l + sovLimits(uint64(l)) + } + if m.Metadata != nil { + l = m.Metadata.Size() + n += 1 + l + sovLimits(uint64(l)) + } + return n +} + func sovLimits(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -1185,6 +1361,18 @@ func (this *StreamMetadata) String() string { }, "") return s } +func (this *StreamMetadataRecord) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&StreamMetadataRecord{`, + `Zone:` + fmt.Sprintf("%v", this.Zone) + `,`, + `Tenant:` + fmt.Sprintf("%v", this.Tenant) + `,`, + `Metadata:` + strings.Replace(this.Metadata.String(), "StreamMetadata", "StreamMetadata", 1) + `,`, + `}`, + }, "") + return s +} func valueToStringLimits(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -1786,6 +1974,159 @@ func (m *StreamMetadata) Unmarshal(dAtA []byte) error { } return nil } +func (m *StreamMetadataRecord) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLimits + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StreamMetadataRecord: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StreamMetadataRecord: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Zone", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLimits + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLimits + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLimits + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Zone = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Tenant", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLimits + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLimits + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLimits + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Tenant = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLimits + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLimits + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLimits + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Metadata == nil { + m.Metadata = &StreamMetadata{} + } + if err := m.Metadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLimits(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLimits + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLimits + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipLimits(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/limits/proto/limits.proto b/pkg/limits/proto/limits.proto index 963d6ab517..5cd39ac881 100644 --- a/pkg/limits/proto/limits.proto +++ b/pkg/limits/proto/limits.proto @@ -35,3 +35,9 @@ message StreamMetadata { uint64 streamHash = 1; uint64 totalSize = 2; } + +message StreamMetadataRecord { + string zone = 1; + string tenant = 2; + StreamMetadata metadata = 3; +} diff --git a/pkg/limits/stream_metadata.go b/pkg/limits/stream_metadata.go index 91ee64966f..f978e6451f 100644 --- a/pkg/limits/stream_metadata.go +++ b/pkg/limits/stream_metadata.go @@ -3,6 +3,8 @@ package limits import ( "hash/fnv" "sync" + + "github.com/grafana/loki/v3/pkg/limits/proto" ) // AllFunc is a function that is called for each stream in the metadata. @@ -18,7 +20,7 @@ type UsageFunc = func(partitionID int32, stream Stream) // CondFunc is a function that is called for each stream in the metadata. // It is used to check if the stream should be stored. // It returns true if the stream should be stored, false otherwise. -type CondFunc = func(acc float64, stream Stream) bool +type CondFunc = func(acc float64, stream *proto.StreamMetadata) bool // StreamMetadata represents the ingest limits interface for the stream metadata. type StreamMetadata interface { @@ -31,7 +33,7 @@ type StreamMetadata interface { // StoreCond tries to store a list of streams for a specific tenant and partition, // until the partition limit is reached. It returns the total ingested bytes. - StoreCond(tenant string, streams map[int32][]Stream, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) uint64 + StoreCond(tenant string, streams []*proto.StreamMetadata, lastSeenAt, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) // Store updates or creates the stream metadata for a specific tenant and partition. Store(tenant string, partitionID int32, streamHash, recTotalSize uint64, recordTime, bucketStart, bucketCutOff int64) @@ -67,14 +69,16 @@ type stripeLock struct { } type streamMetadata struct { - stripes []map[string]map[int32]map[uint64]Stream // stripe -> tenant -> partitionID -> streamMetadata - locks []stripeLock + numPartitions int + stripes []map[string]map[int32]map[uint64]Stream // stripe -> tenant -> partitionID -> streamMetadata + locks []stripeLock } -func NewStreamMetadata(size int) StreamMetadata { +func NewStreamMetadata(numPartitions int) StreamMetadata { s := &streamMetadata{ - stripes: make([]map[string]map[int32]map[uint64]Stream, size), - locks: make([]stripeLock, size), + numPartitions: numPartitions, + stripes: make([]map[string]map[int32]map[uint64]Stream, numPartitions), + locks: make([]stripeLock, numPartitions), } for i := range s.stripes { s.stripes[i] = make(map[string]map[int32]map[uint64]Stream) @@ -104,56 +108,58 @@ func (s *streamMetadata) Usage(tenant string, fn UsageFunc) { }) } -func (s *streamMetadata) StoreCond(tenant string, streams map[int32][]Stream, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) uint64 { - var ingestedBytes uint64 +func (s *streamMetadata) StoreCond(tenant string, streams []*proto.StreamMetadata, lastSeenAt, cutoff, bucketStart, bucketCutOff int64, cond CondFunc) ([]*proto.StreamMetadata, []*proto.StreamMetadata) { + stored := make([]*proto.StreamMetadata, 0, len(streams)) + rejected := make([]*proto.StreamMetadata, 0, len(streams)) + s.withLock(tenant, func(i int) { if _, ok := s.stripes[i][tenant]; !ok { s.stripes[i][tenant] = make(map[int32]map[uint64]Stream) } - for partitionID, streams := range streams { + activeStreams := make(map[int32]int) + + for _, stream := range streams { + partitionID := int32(stream.StreamHash % uint64(s.numPartitions)) + if _, ok := s.stripes[i][tenant][partitionID]; !ok { s.stripes[i][tenant][partitionID] = make(map[uint64]Stream) } - var ( - activeStreams = 0 - newStreams = 0 - ) - - // Count as active streams all stream that are not expired. - for _, stored := range s.stripes[i][tenant][partitionID] { - if stored.LastSeenAt >= cutoff { - activeStreams++ + // Count as active streams all streams that are not expired. + if _, ok := activeStreams[partitionID]; !ok { + for _, stored := range s.stripes[i][tenant][partitionID] { + if stored.LastSeenAt >= cutoff { + activeStreams[partitionID]++ + } } } - for _, stream := range streams { - stored, found := s.stripes[i][tenant][partitionID][stream.Hash] + recorded, found := s.stripes[i][tenant][partitionID][stream.StreamHash] - // If the stream is new or expired, check if it exceeds the limit. - // If limit is not exceeded and the stream is expired, reset the stream. - if !found || (stored.LastSeenAt < cutoff) { - // Count up the new stream before updating - newStreams++ + // If the stream is new or expired, check if it exceeds the limit. + // If limit is not exceeded and the stream is expired, reset the stream. + if !found || (recorded.LastSeenAt < cutoff) { + activeStreams[partitionID]++ - if !cond(float64(activeStreams+newStreams), stream) { - continue - } + if !cond(float64(activeStreams[partitionID]), stream) { + rejected = append(rejected, stream) + continue + } - // If the stream is stored and expired, reset the stream - if found && stored.LastSeenAt < cutoff { - s.stripes[i][tenant][partitionID][stream.Hash] = Stream{Hash: stream.Hash, LastSeenAt: stream.LastSeenAt} - } + // If the stream is stored and expired, reset the stream + if found && recorded.LastSeenAt < cutoff { + s.stripes[i][tenant][partitionID][stream.StreamHash] = Stream{Hash: stream.StreamHash, LastSeenAt: lastSeenAt} } + } - s.storeStream(i, tenant, partitionID, stream.Hash, stream.TotalSize, stream.LastSeenAt, bucketStart, bucketCutOff) + s.storeStream(i, tenant, partitionID, stream.StreamHash, stream.TotalSize, lastSeenAt, bucketStart, bucketCutOff) - ingestedBytes += stream.TotalSize - } + stored = append(stored, stream) } }) - return ingestedBytes + + return stored, rejected } func (s *streamMetadata) Store(tenant string, partitionID int32, streamHash, recTotalSize uint64, recordTime, bucketStart, bucketCutOff int64) { @@ -300,12 +306,8 @@ func (s *streamMetadata) getStripe(tenant string) int { // streamLimitExceeded returns a CondFunc that checks if the number of active streams // exceeds the given limit. If it does, the stream is added to the results map. -func streamLimitExceeded(limit uint64, results map[Reason][]uint64) CondFunc { - return func(acc float64, stream Stream) bool { - if acc > float64(limit) { - results[ReasonExceedsMaxStreams] = append(results[ReasonExceedsMaxStreams], stream.Hash) - return false - } - return true +func streamLimitExceeded(limit uint64) CondFunc { + return func(acc float64, _ *proto.StreamMetadata) bool { + return acc <= float64(limit) } } diff --git a/pkg/limits/stream_metadata_test.go b/pkg/limits/stream_metadata_test.go index b66cff0252..27a6442683 100644 --- a/pkg/limits/stream_metadata_test.go +++ b/pkg/limits/stream_metadata_test.go @@ -497,13 +497,12 @@ func TestStreamMetadata_StoreCond(t *testing.T) { // setup data metadata *streamMetadata - streams map[int32][]Stream + streams []*proto.StreamMetadata maxActiveStreams uint64 // expectations - expectedStored map[string]map[int32][]Stream - expectedDropped map[Reason][]uint64 - expectedIngestedBytes uint64 + expectedStored []*proto.StreamMetadata + expectedRejected []*proto.StreamMetadata }{ { name: "no streams", @@ -511,118 +510,91 @@ func TestStreamMetadata_StoreCond(t *testing.T) { stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)}, locks: make([]stripeLock, 1), }, - streams: map[int32][]Stream{}, maxActiveStreams: 10, - expectedStored: map[string]map[int32][]Stream{}, }, { name: "all streams within partition limit", metadata: &streamMetadata{ - stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)}, - locks: make([]stripeLock, 1), + numPartitions: 1, + stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)}, + locks: make([]stripeLock, 1), + }, + streams: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, }, - streams: map[int32][]Stream{0: { - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }}, maxActiveStreams: 2, - expectedStored: map[string]map[int32][]Stream{ - "tenant1": { - 0: []Stream{ - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - }, + expectedStored: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, }, - expectedIngestedBytes: 2000, }, { name: "all stream within limit per partition", metadata: &streamMetadata{ - stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)}, - locks: make([]stripeLock, 2), + numPartitions: 1, + stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)}, + locks: make([]stripeLock, 1), }, - streams: map[int32][]Stream{ - 0: { - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - 1: { - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, + streams: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, }, maxActiveStreams: 2, - expectedStored: map[string]map[int32][]Stream{ - "tenant1": { - 0: []Stream{ - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - 1: []Stream{ - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - }, + expectedStored: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, }, - expectedIngestedBytes: 2000, }, { name: "some streams dropped", metadata: &streamMetadata{ - stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)}, - locks: make([]stripeLock, 1), + numPartitions: 1, + stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)}, + locks: make([]stripeLock, 1), }, - streams: map[int32][]Stream{ - 0: { - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, + streams: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, }, maxActiveStreams: 1, - expectedStored: map[string]map[int32][]Stream{ - "tenant1": { - 0: []Stream{ - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - }, + expectedStored: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, }, - expectedIngestedBytes: 1000, - expectedDropped: map[Reason][]uint64{ - ReasonExceedsMaxStreams: {0x1}, + expectedRejected: []*proto.StreamMetadata{ + {StreamHash: 0x1, TotalSize: 1000}, }, }, { name: "some streams dropped per partition", metadata: &streamMetadata{ - stripes: []map[string]map[int32]map[uint64]Stream{make(map[string]map[int32]map[uint64]Stream)}, - locks: make([]stripeLock, 2), - }, - streams: map[int32][]Stream{ - 0: { - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - 1: { - {Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, + numPartitions: 2, + stripes: []map[string]map[int32]map[uint64]Stream{ + make(map[string]map[int32]map[uint64]Stream), + make(map[string]map[int32]map[uint64]Stream), }, + locks: make([]stripeLock, 2), + }, + streams: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, // 0 % 2 = 0 + {StreamHash: 0x1, TotalSize: 1000}, // 1 % 2 = 1 + {StreamHash: 0x2, TotalSize: 1000}, // 2 % 2 = 0 + {StreamHash: 0x3, TotalSize: 1000}, // 3 % 2 = 1 }, maxActiveStreams: 1, - expectedStored: map[string]map[int32][]Stream{ - "tenant1": { - 0: []Stream{ - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - 1: []Stream{ - {Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - }, + expectedStored: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, }, - expectedIngestedBytes: 2000, - expectedDropped: map[Reason][]uint64{ - ReasonExceedsMaxStreams: {0x1, 0x3}, + expectedRejected: []*proto.StreamMetadata{ + {StreamHash: 0x2, TotalSize: 1000}, + {StreamHash: 0x3, TotalSize: 1000}, }, }, { name: "some streams dropped from a single partition", metadata: &streamMetadata{ + numPartitions: 2, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { @@ -634,84 +606,64 @@ func TestStreamMetadata_StoreCond(t *testing.T) { }}, locks: make([]stripeLock, 2), }, - streams: map[int32][]Stream{ - 0: { - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - 1: { - {Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, + streams: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x3, TotalSize: 1000}, + {StreamHash: 0x5, TotalSize: 1000}, }, maxActiveStreams: 2, - expectedStored: map[string]map[int32][]Stream{ - "tenant1": { - 0: []Stream{ - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - 1: []Stream{ - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - }, + expectedStored: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x3, TotalSize: 1000}, }, - expectedIngestedBytes: 2000, - expectedDropped: map[Reason][]uint64{ - ReasonExceedsMaxStreams: {0x3}, + expectedRejected: []*proto.StreamMetadata{ + {StreamHash: 0x5, TotalSize: 1000}, }, }, { name: "drops new streams but updates existing streams", metadata: &streamMetadata{ + numPartitions: 2, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { 0: { 0x0: {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - 0x1: {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, + 0x4: {Hash: 0x4, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, }, 1: { + 0x1: {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, 0x3: {Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - 0x4: {Hash: 0x4, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, }, }, }, }, - locks: make([]stripeLock, 1), + locks: make([]stripeLock, 2), }, - streams: map[int32][]Stream{ - 0: { // Mixed order of new and existing streams - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x2, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, - 1: { // Mixed order of new and existing streams - {Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x5, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x4, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, + streams: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, // 0 % 2 = 0 Existing + {StreamHash: 0x2, TotalSize: 1000}, // 2 % 2 = 0 New + {StreamHash: 0x1, TotalSize: 1000}, // 1 % 2 = 1 Existing + {StreamHash: 0x3, TotalSize: 1000}, // 3 % 2 = 1 Existing + {StreamHash: 0x5, TotalSize: 1000}, // 5 % 2 = 1 New + {StreamHash: 0x4, TotalSize: 1000}, // 4 % 2 = 0 Existing }, maxActiveStreams: 2, - expectedStored: map[string]map[int32][]Stream{ - "tenant1": { - 0: []Stream{ - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}}, - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}}, - }, - 1: []Stream{ - {Hash: 0x3, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}}, - {Hash: 0x4, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}}, - }, - }, + expectedStored: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, + {StreamHash: 0x3, TotalSize: 1000}, + {StreamHash: 0x4, TotalSize: 1000}, }, - expectedIngestedBytes: 4000, - expectedDropped: map[Reason][]uint64{ - ReasonExceedsMaxStreams: {0x2, 0x5}, + expectedRejected: []*proto.StreamMetadata{ + {StreamHash: 0x2, TotalSize: 1000}, + {StreamHash: 0x5, TotalSize: 1000}, }, }, { name: "reset expired but not evicted streams", metadata: &streamMetadata{ + numPartitions: 1, stripes: []map[string]map[int32]map[uint64]Stream{ { "tenant1": { @@ -725,54 +677,25 @@ func TestStreamMetadata_StoreCond(t *testing.T) { locks: make([]stripeLock, 1), }, maxActiveStreams: 2, - streams: map[int32][]Stream{ - 0: { - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - }, + streams: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, }, - expectedIngestedBytes: 2000, - expectedStored: map[string]map[int32][]Stream{ - "tenant1": { - 0: []Stream{ - {Hash: 0x0, LastSeenAt: now.UnixNano(), TotalSize: 1000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 1000}}}, - {Hash: 0x1, LastSeenAt: now.UnixNano(), TotalSize: 2000, RateBuckets: []RateBucket{{Timestamp: bucketStart, Size: 2000}}}, - }, - }, + expectedStored: []*proto.StreamMetadata{ + {StreamHash: 0x0, TotalSize: 1000}, + {StreamHash: 0x1, TotalSize: 1000}, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actualDropped := make(map[Reason][]uint64) - cond := streamLimitExceeded(tt.maxActiveStreams, actualDropped) - - actualIngestedBytes := tt.metadata.StoreCond("tenant1", tt.streams, cutoff, bucketStart, bucketCutOff, cond) + cond := streamLimitExceeded(tt.maxActiveStreams) - actualStored := make(map[string]map[int32][]Stream) - tt.metadata.All(func(tenant string, partitionID int32, stream Stream) { - if _, ok := actualStored[tenant]; !ok { - actualStored[tenant] = make(map[int32][]Stream) - } - actualStored[tenant][partitionID] = append(actualStored[tenant][partitionID], stream) - }) - - for tenant, partitions := range tt.expectedStored { - require.Len(t, actualStored[tenant], len(partitions)) - - for partitionID, streams := range partitions { - require.Len(t, actualStored[tenant][partitionID], len(streams)) - require.ElementsMatch(t, streams, actualStored[tenant][partitionID]) - } - } - - for reason, streamHashes := range tt.expectedDropped { - require.Contains(t, actualDropped, reason) - require.ElementsMatch(t, streamHashes, actualDropped[reason]) - } + stored, rejected := tt.metadata.StoreCond("tenant1", tt.streams, now.UnixNano(), cutoff, bucketStart, bucketCutOff, cond) - require.Equal(t, tt.expectedIngestedBytes, actualIngestedBytes) + require.ElementsMatch(t, tt.expectedStored, stored) + require.ElementsMatch(t, tt.expectedRejected, rejected) }) } } diff --git a/pkg/limits/wal.go b/pkg/limits/wal.go new file mode 100644 index 0000000000..0ca936fd60 --- /dev/null +++ b/pkg/limits/wal.go @@ -0,0 +1,73 @@ +package limits + +import ( + "context" + "fmt" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/twmb/franz-go/pkg/kgo" + + "github.com/grafana/loki/v3/pkg/limits/proto" +) + +type WAL interface { + // Append writes the metadata record to the end of the WAL. + Append(ctx context.Context, tenant string, metadata *proto.StreamMetadata) error + + // Close closes the WAL. + Close() error +} + +// KafkaWAL is a write-ahead log on top of Kafka. +type KafkaWAL struct { + client *kgo.Client + topic string + partitions uint64 + logger log.Logger +} + +func NewKafkaWAL(client *kgo.Client, topic string, partitions uint64, logger log.Logger) *KafkaWAL { + return &KafkaWAL{ + client: client, + topic: topic, + partitions: partitions, + logger: logger, + } +} + +// Append implements the WAL interface. +func (w *KafkaWAL) Append(ctx context.Context, tenant string, metadata *proto.StreamMetadata) error { + v := proto.StreamMetadataRecord{ + Tenant: tenant, + Metadata: metadata, + } + b, err := v.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal proto: %w", err) + } + // The stream metadata topic expects a fixed number of partitions, + // the size of which is determined ahead of time. Streams are + // sharded over partitions using a simple mod. + partition := int32(metadata.StreamHash % w.partitions) + r := kgo.Record{ + Key: []byte(tenant), + Value: b, + Partition: partition, + Topic: w.topic, + } + w.client.Produce(ctx, &r, w.logProduceErr) + return nil +} + +// Close implements the WAL interface. +func (w *KafkaWAL) Close() error { + w.client.Close() + return nil +} + +func (w *KafkaWAL) logProduceErr(_ *kgo.Record, err error) { + if err != nil { + level.Error(w.logger).Log("msg", "failed to produce record", "err", err.Error()) + } +}