diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 30c7574acd..c1bdd893b1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -322,3 +322,20 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ tailer.loop() return nil } + +// TailersCount returns count of active tail requests from a user +func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) { + instanceID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + resp := logproto.TailersCountResponse{} + + instance, ok := i.getInstanceByID(instanceID) + if ok { + resp.Count = instance.openTailersCount() + } + + return &resp, nil +} diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 477cbf1aa7..ce896ba378 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -318,12 +318,13 @@ func (i *instance) addNewTailer(t *tailer) { } func (i *instance) addTailersToNewStream(stream *stream) { - closedTailers := []uint32{} - i.tailerMtx.RLock() + defer i.tailerMtx.RUnlock() + for _, t := range i.tailers { + // we don't want to watch streams for closed tailers. + // When a new tail request comes in we will clean references to closed tailers if t.isClosed() { - closedTailers = append(closedTailers, t.getID()) continue } @@ -331,6 +332,18 @@ func (i *instance) addTailersToNewStream(stream *stream) { stream.addTailer(t) } } +} + +func (i *instance) checkClosedTailers() { + closedTailers := []uint32{} + + i.tailerMtx.RLock() + for _, t := range i.tailers { + if t.isClosed() { + closedTailers = append(closedTailers, t.getID()) + continue + } + } i.tailerMtx.RUnlock() if len(closedTailers) != 0 { @@ -350,6 +363,15 @@ func (i *instance) closeTailers() { } } +func (i *instance) openTailersCount() uint32 { + i.checkClosedTailers() + + i.tailerMtx.RLock() + defer i.tailerMtx.RUnlock() + + return uint32(len(i.tailers)) +} + func isDone(ctx context.Context) bool { select { case <-ctx.Done(): diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go index af0ab34c3e..08b4ab15d7 100644 --- a/pkg/logproto/logproto.pb.go +++ b/pkg/logproto/logproto.pb.go @@ -983,6 +983,84 @@ func (m *TransferChunksResponse) XXX_DiscardUnknown() { var xxx_messageInfo_TransferChunksResponse proto.InternalMessageInfo +type TailersCountRequest struct { +} + +func (m *TailersCountRequest) Reset() { *m = TailersCountRequest{} } +func (*TailersCountRequest) ProtoMessage() {} +func (*TailersCountRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_c28a5f14f1f4c79a, []int{18} +} +func (m *TailersCountRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TailersCountRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TailersCountRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TailersCountRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TailersCountRequest.Merge(m, src) +} +func (m *TailersCountRequest) XXX_Size() int { + return m.Size() +} +func (m *TailersCountRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TailersCountRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TailersCountRequest proto.InternalMessageInfo + +type TailersCountResponse struct { + Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` +} + +func (m *TailersCountResponse) Reset() { *m = TailersCountResponse{} } +func (*TailersCountResponse) ProtoMessage() {} +func (*TailersCountResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_c28a5f14f1f4c79a, []int{19} +} +func (m *TailersCountResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TailersCountResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TailersCountResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TailersCountResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TailersCountResponse.Merge(m, src) +} +func (m *TailersCountResponse) XXX_Size() int { + return m.Size() +} +func (m *TailersCountResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TailersCountResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TailersCountResponse proto.InternalMessageInfo + +func (m *TailersCountResponse) GetCount() uint32 { + if m != nil { + return m.Count + } + return 0 +} + func init() { proto.RegisterEnum("logproto.Direction", Direction_name, Direction_value) proto.RegisterType((*PushRequest)(nil), "logproto.PushRequest") @@ -1004,79 +1082,84 @@ func init() { proto.RegisterType((*LabelPair)(nil), "logproto.LabelPair") proto.RegisterType((*Chunk)(nil), "logproto.Chunk") proto.RegisterType((*TransferChunksResponse)(nil), "logproto.TransferChunksResponse") + proto.RegisterType((*TailersCountRequest)(nil), "logproto.TailersCountRequest") + proto.RegisterType((*TailersCountResponse)(nil), "logproto.TailersCountResponse") } func init() { proto.RegisterFile("pkg/logproto/logproto.proto", fileDescriptor_c28a5f14f1f4c79a) } var fileDescriptor_c28a5f14f1f4c79a = []byte{ - // 1065 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdd, 0x6e, 0x1b, 0x45, - 0x14, 0xde, 0xb1, 0xd7, 0x6b, 0xfb, 0xf8, 0x27, 0xd6, 0x50, 0x92, 0xc5, 0x45, 0x6b, 0x6b, 0x85, - 0x5a, 0xab, 0x08, 0x1b, 0xcc, 0x4f, 0xd3, 0xf0, 0xa7, 0xb8, 0x25, 0x22, 0x01, 0x89, 0x76, 0x13, - 0x09, 0xa9, 0x12, 0xaa, 0x36, 0xd9, 0xb1, 0xb3, 0xca, 0x7a, 0xd7, 0xdd, 0x1d, 0x23, 0xe5, 0x8e, - 0x17, 0x40, 0xea, 0x1d, 0x17, 0xbc, 0x00, 0xe2, 0x82, 0xe7, 0xe8, 0x65, 0x2e, 0x7b, 0x15, 0x88, - 0x73, 0x83, 0x22, 0x21, 0xf5, 0x05, 0x90, 0xd0, 0xfc, 0xec, 0x7a, 0xe2, 0x04, 0x90, 0x7b, 0x63, - 0xcf, 0x99, 0x39, 0x67, 0xe6, 0x7c, 0xdf, 0xf9, 0xce, 0xcc, 0xc2, 0xcd, 0xc9, 0xd1, 0xa8, 0x17, - 0x44, 0xa3, 0x49, 0x1c, 0xd1, 0x28, 0x1b, 0x74, 0xf9, 0x2f, 0x2e, 0xa5, 0x76, 0xb3, 0x35, 0x8a, - 0xa2, 0x51, 0x40, 0x7a, 0xdc, 0xda, 0x9f, 0x0e, 0x7b, 0xd4, 0x1f, 0x93, 0x84, 0xba, 0xe3, 0x89, - 0x70, 0x6d, 0xbe, 0x33, 0xf2, 0xe9, 0xe1, 0x74, 0xbf, 0x7b, 0x10, 0x8d, 0x7b, 0xa3, 0x68, 0x14, - 0xcd, 0x3d, 0x99, 0x25, 0x76, 0x67, 0x23, 0xe1, 0x6e, 0x6f, 0x41, 0xe5, 0xe1, 0x34, 0x39, 0x74, - 0xc8, 0xd3, 0x29, 0x49, 0x28, 0xbe, 0x0b, 0xc5, 0x84, 0xc6, 0xc4, 0x1d, 0x27, 0x26, 0x6a, 0xe7, - 0x3b, 0x95, 0x7e, 0xa3, 0x9b, 0xa5, 0xb2, 0xcb, 0x17, 0x06, 0x95, 0x8b, 0xd3, 0x56, 0xea, 0xe4, - 0xa4, 0x03, 0xbb, 0x0e, 0x55, 0xb1, 0x4f, 0x32, 0x89, 0xc2, 0x84, 0xd8, 0x7f, 0x21, 0xa8, 0x3e, - 0x9a, 0x92, 0xf8, 0x38, 0xdd, 0xb9, 0x09, 0xa5, 0x84, 0x04, 0xe4, 0x80, 0x46, 0xb1, 0x89, 0xda, - 0xa8, 0x53, 0x76, 0x32, 0x1b, 0xdf, 0x80, 0x42, 0xe0, 0x8f, 0x7d, 0x6a, 0xe6, 0xda, 0xa8, 0x53, - 0x73, 0x84, 0x81, 0x37, 0xa0, 0x90, 0x50, 0x37, 0xa6, 0x66, 0xbe, 0x8d, 0x3a, 0x95, 0x7e, 0xb3, - 0x2b, 0xa0, 0x77, 0x53, 0x40, 0xdd, 0xbd, 0x14, 0xfa, 0xa0, 0xf4, 0xfc, 0xb4, 0xa5, 0x3d, 0xfb, - 0xbd, 0x85, 0x1c, 0x11, 0x82, 0x3f, 0x82, 0x3c, 0x09, 0x3d, 0x53, 0x5f, 0x22, 0x92, 0x05, 0xe0, - 0xf7, 0xa0, 0xec, 0xf9, 0x31, 0x39, 0xa0, 0x7e, 0x14, 0x9a, 0x85, 0x36, 0xea, 0xd4, 0xfb, 0xaf, - 0xcd, 0x19, 0x78, 0x90, 0x2e, 0x39, 0x73, 0xaf, 0x1d, 0xbd, 0x64, 0x34, 0x8a, 0xf6, 0xc7, 0x50, - 0x93, 0x70, 0x05, 0x01, 0xf8, 0xce, 0xff, 0x32, 0x39, 0x27, 0xef, 0x37, 0x04, 0xd5, 0xaf, 0xdd, - 0x7d, 0x12, 0xa4, 0x64, 0x61, 0xd0, 0x43, 0x77, 0x4c, 0x24, 0x51, 0x7c, 0x8c, 0x57, 0xc1, 0xf8, - 0xde, 0x0d, 0xa6, 0x24, 0xe1, 0x2c, 0x95, 0x1c, 0x69, 0x2d, 0x4b, 0x13, 0x7a, 0x65, 0x9a, 0x50, - 0x46, 0x93, 0x7d, 0x1b, 0x6a, 0x32, 0x5f, 0x89, 0x76, 0x9e, 0x1c, 0x03, 0x5b, 0x4e, 0x93, 0xb3, - 0x0f, 0xc1, 0x10, 0x60, 0xb1, 0x0d, 0x46, 0xc0, 0x42, 0x12, 0x01, 0x6a, 0x00, 0x17, 0xa7, 0x2d, - 0x39, 0xe3, 0xc8, 0x7f, 0xbc, 0x01, 0x45, 0x12, 0xd2, 0xd8, 0xe7, 0x18, 0x19, 0x67, 0x2b, 0x73, - 0xce, 0xbe, 0x08, 0x69, 0x7c, 0x3c, 0x58, 0x61, 0xe5, 0x62, 0x02, 0x94, 0x7e, 0x4e, 0x3a, 0xb0, - 0x23, 0x28, 0x70, 0x17, 0xfc, 0x25, 0x94, 0xb3, 0x9e, 0xe0, 0x67, 0xfd, 0x37, 0xb2, 0xba, 0xdc, - 0x31, 0x47, 0x13, 0x8e, 0x6f, 0x1e, 0x8c, 0xdf, 0x04, 0x3d, 0xf0, 0x43, 0xc2, 0xf9, 0x2e, 0x0f, - 0x4a, 0x17, 0xa7, 0x2d, 0x6e, 0x3b, 0xfc, 0xd7, 0xfe, 0x09, 0x41, 0x65, 0xcf, 0xf5, 0xb3, 0x9a, - 0xdd, 0x80, 0xc2, 0x53, 0xa6, 0x00, 0x59, 0x34, 0x61, 0x30, 0xd9, 0x7b, 0x24, 0x70, 0x8f, 0xb7, - 0xa2, 0x98, 0x17, 0xa8, 0xe6, 0x64, 0xf6, 0x5c, 0xf6, 0xfa, 0xb5, 0xb2, 0x2f, 0x2c, 0x2d, 0xfb, - 0x1d, 0xbd, 0x94, 0x6b, 0xe4, 0xed, 0x63, 0xa8, 0x8a, 0xc4, 0x64, 0x71, 0x3a, 0x60, 0x08, 0xa5, - 0x49, 0x3a, 0xae, 0x2a, 0x51, 0xae, 0xe3, 0xcf, 0xa1, 0xee, 0xc5, 0xd1, 0x64, 0x42, 0xbc, 0x5d, - 0xa9, 0x5d, 0x51, 0x87, 0x35, 0xa5, 0x07, 0xd4, 0x75, 0x67, 0xc1, 0xdd, 0xfe, 0x19, 0x41, 0x6d, - 0x97, 0xf0, 0xca, 0x48, 0x5a, 0x32, 0x38, 0xe8, 0x95, 0xbb, 0x38, 0xb7, 0x6c, 0x17, 0xaf, 0x82, - 0x31, 0x8a, 0xa3, 0xe9, 0x24, 0x31, 0xf3, 0x42, 0x8d, 0xc2, 0xb2, 0x77, 0xa0, 0x9e, 0x26, 0x27, - 0xa9, 0x59, 0x07, 0x23, 0xe1, 0x33, 0xb2, 0x49, 0x9b, 0x0a, 0x35, 0x7c, 0x7e, 0xdb, 0x23, 0x21, - 0xf5, 0x87, 0x3e, 0x89, 0x07, 0x3a, 0x3b, 0xc4, 0x91, 0xfe, 0xf6, 0x8f, 0x08, 0x1a, 0x8b, 0x2e, - 0xf8, 0x33, 0x45, 0xe4, 0x6c, 0xbb, 0x5b, 0xff, 0xbe, 0x5d, 0x97, 0x37, 0x50, 0xc2, 0x35, 0x9b, - 0x36, 0x40, 0xf3, 0x1e, 0x54, 0x94, 0x69, 0xdc, 0x80, 0xfc, 0x11, 0x49, 0x05, 0xc5, 0x86, 0x4c, - 0x32, 0xbc, 0xb3, 0x84, 0x26, 0x1d, 0x61, 0x6c, 0xe4, 0xd6, 0x11, 0x93, 0x63, 0xed, 0x52, 0x6d, - 0xf0, 0x3a, 0xe8, 0xc3, 0x38, 0x1a, 0x2f, 0x45, 0x3c, 0x8f, 0xc0, 0x1f, 0x40, 0x8e, 0x46, 0x4b, - 0xd1, 0x9e, 0xa3, 0x11, 0x63, 0x5d, 0x82, 0xcf, 0xf3, 0xe4, 0xa4, 0x65, 0xff, 0x8a, 0x60, 0x85, - 0xc5, 0x08, 0x06, 0xee, 0x1f, 0x4e, 0xc3, 0x23, 0xdc, 0x81, 0x06, 0x3b, 0xe9, 0x89, 0x1f, 0x8e, - 0x48, 0x42, 0x49, 0xfc, 0xc4, 0xf7, 0x24, 0xcc, 0x3a, 0x9b, 0xdf, 0x96, 0xd3, 0xdb, 0x1e, 0x5e, - 0x83, 0xe2, 0x34, 0x11, 0x0e, 0x02, 0xb3, 0xc1, 0xcc, 0x6d, 0x0f, 0xbf, 0xad, 0x1c, 0xc7, 0xb8, - 0x56, 0xee, 0x69, 0xce, 0xe1, 0x43, 0xd7, 0x8f, 0xb3, 0x9b, 0xe5, 0x36, 0x18, 0x07, 0xec, 0xe0, - 0xc4, 0xd4, 0x17, 0x2f, 0x16, 0x9e, 0x90, 0x23, 0x97, 0xed, 0x0f, 0xa1, 0x9c, 0x45, 0x5f, 0x7b, - 0x0d, 0x5f, 0x5b, 0x01, 0xfb, 0x26, 0x14, 0x04, 0x30, 0x0c, 0xba, 0xe7, 0x52, 0x97, 0x87, 0x54, - 0x1d, 0x3e, 0xb6, 0x4d, 0x58, 0xdd, 0x8b, 0xdd, 0x30, 0x19, 0x92, 0x98, 0x3b, 0x65, 0xf2, 0xbb, - 0x73, 0x0b, 0xca, 0xd9, 0x9b, 0x82, 0x2b, 0x50, 0xdc, 0xfa, 0xc6, 0xf9, 0x76, 0xd3, 0x79, 0xd0, - 0xd0, 0x70, 0x15, 0x4a, 0x83, 0xcd, 0xfb, 0x5f, 0x71, 0x0b, 0xf5, 0x37, 0xc1, 0x60, 0xaf, 0x2b, - 0x89, 0xf1, 0x5d, 0xd0, 0xd9, 0x08, 0xbf, 0x3e, 0x07, 0xa0, 0xbc, 0xdf, 0xcd, 0xd5, 0xc5, 0x69, - 0xf9, 0x1c, 0x6b, 0xfd, 0xbf, 0x11, 0x14, 0xd9, 0x0b, 0xc5, 0x64, 0xfa, 0x09, 0x14, 0xf8, 0x63, - 0x85, 0x15, 0x77, 0xf5, 0xb1, 0x6e, 0xae, 0x5d, 0x99, 0x4f, 0xf7, 0x79, 0x17, 0xb1, 0x8e, 0xe6, - 0x14, 0xa9, 0xd1, 0xea, 0xeb, 0xa5, 0x46, 0x5f, 0x7a, 0x25, 0x6c, 0x0d, 0xdf, 0x03, 0x9d, 0x5d, - 0x4d, 0x6a, 0xfa, 0xca, 0x1d, 0xaa, 0xa6, 0xaf, 0xde, 0x60, 0xfc, 0xd8, 0x4f, 0xc1, 0x10, 0x0a, - 0xc2, 0x6b, 0x8b, 0x5d, 0x95, 0x86, 0x9b, 0x57, 0x17, 0x32, 0xfc, 0xdf, 0x41, 0x29, 0x55, 0x15, - 0x7e, 0x04, 0xf5, 0xcb, 0x05, 0xc1, 0x6f, 0x28, 0x07, 0x5f, 0x96, 0x6a, 0xb3, 0xad, 0x2c, 0x5d, - 0x5b, 0x45, 0x5b, 0xeb, 0xa0, 0xc1, 0xe3, 0x93, 0x33, 0x4b, 0x7b, 0x71, 0x66, 0x69, 0x2f, 0xcf, - 0x2c, 0xf4, 0xc3, 0xcc, 0x42, 0xbf, 0xcc, 0x2c, 0xf4, 0x7c, 0x66, 0xa1, 0x93, 0x99, 0x85, 0xfe, - 0x98, 0x59, 0xe8, 0xcf, 0x99, 0xa5, 0xbd, 0x9c, 0x59, 0xe8, 0xd9, 0xb9, 0xa5, 0x9d, 0x9c, 0x5b, - 0xda, 0x8b, 0x73, 0x4b, 0x7b, 0xfc, 0x96, 0xfa, 0xb1, 0x16, 0xbb, 0x43, 0x37, 0x74, 0x7b, 0x41, - 0x74, 0xe4, 0xf7, 0xd4, 0x8f, 0xc1, 0x7d, 0x83, 0xff, 0xbd, 0xff, 0x4f, 0x00, 0x00, 0x00, 0xff, - 0xff, 0xb9, 0x5d, 0x4d, 0x8c, 0x23, 0x0a, 0x00, 0x00, + // 1113 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x5f, 0x6f, 0x1b, 0x45, + 0x10, 0xbf, 0xb5, 0xcf, 0x67, 0x7b, 0xfc, 0xa7, 0xd6, 0x36, 0x4d, 0x8c, 0x0b, 0x67, 0xeb, 0x84, + 0x5a, 0xab, 0x80, 0x0d, 0xe1, 0x4f, 0xd3, 0xf0, 0x4f, 0x71, 0x4a, 0x44, 0x02, 0x52, 0xdb, 0x4b, + 0x24, 0xa4, 0x4a, 0xa8, 0xba, 0xc4, 0x6b, 0xe7, 0x14, 0xfb, 0xce, 0xbd, 0x5d, 0x23, 0xe5, 0x8d, + 0x2f, 0x80, 0xd4, 0x37, 0x1e, 0xf8, 0x02, 0x88, 0x07, 0x3e, 0x47, 0x9f, 0x50, 0x1e, 0xfb, 0x14, + 0x88, 0xf3, 0x82, 0x22, 0x21, 0xf5, 0x23, 0xa0, 0xfd, 0x73, 0xe7, 0xb5, 0x93, 0x82, 0xdc, 0x17, + 0x7b, 0x67, 0x76, 0x66, 0x77, 0xe6, 0x37, 0xbf, 0x99, 0x3d, 0xb8, 0x39, 0x3a, 0xea, 0xb7, 0x07, + 0x61, 0x7f, 0x14, 0x85, 0x2c, 0x4c, 0x16, 0x2d, 0xf1, 0x8b, 0x73, 0xb1, 0x5c, 0xab, 0xf7, 0xc3, + 0xb0, 0x3f, 0x20, 0x6d, 0x21, 0xed, 0x8f, 0x7b, 0x6d, 0xe6, 0x0f, 0x09, 0x65, 0xde, 0x70, 0x24, + 0x4d, 0x6b, 0xef, 0xf5, 0x7d, 0x76, 0x38, 0xde, 0x6f, 0x1d, 0x84, 0xc3, 0x76, 0x3f, 0xec, 0x87, + 0x53, 0x4b, 0x2e, 0xc9, 0xd3, 0xf9, 0x4a, 0x9a, 0x3b, 0x5b, 0x50, 0x78, 0x38, 0xa6, 0x87, 0x2e, + 0x79, 0x3a, 0x26, 0x94, 0xe1, 0xbb, 0x90, 0xa5, 0x2c, 0x22, 0xde, 0x90, 0x56, 0x51, 0x23, 0xdd, + 0x2c, 0xac, 0x56, 0x5a, 0x49, 0x28, 0xbb, 0x62, 0xa3, 0x53, 0xb8, 0x38, 0xad, 0xc7, 0x46, 0x6e, + 0xbc, 0x70, 0xca, 0x50, 0x94, 0xe7, 0xd0, 0x51, 0x18, 0x50, 0xe2, 0xfc, 0x83, 0xa0, 0xf8, 0x68, + 0x4c, 0xa2, 0xe3, 0xf8, 0xe4, 0x1a, 0xe4, 0x28, 0x19, 0x90, 0x03, 0x16, 0x46, 0x55, 0xd4, 0x40, + 0xcd, 0xbc, 0x9b, 0xc8, 0x78, 0x09, 0x32, 0x03, 0x7f, 0xe8, 0xb3, 0x6a, 0xaa, 0x81, 0x9a, 0x25, + 0x57, 0x0a, 0x78, 0x1d, 0x32, 0x94, 0x79, 0x11, 0xab, 0xa6, 0x1b, 0xa8, 0x59, 0x58, 0xad, 0xb5, + 0x64, 0xea, 0xad, 0x38, 0xa1, 0xd6, 0x5e, 0x9c, 0x7a, 0x27, 0xf7, 0xfc, 0xb4, 0x6e, 0x3c, 0xfb, + 0xb3, 0x8e, 0x5c, 0xe9, 0x82, 0x3f, 0x81, 0x34, 0x09, 0xba, 0x55, 0x73, 0x01, 0x4f, 0xee, 0x80, + 0x3f, 0x80, 0x7c, 0xd7, 0x8f, 0xc8, 0x01, 0xf3, 0xc3, 0xa0, 0x9a, 0x69, 0xa0, 0x66, 0x79, 0xf5, + 0xfa, 0x14, 0x81, 0xfb, 0xf1, 0x96, 0x3b, 0xb5, 0xda, 0x31, 0x73, 0x56, 0x25, 0xeb, 0x7c, 0x0a, + 0x25, 0x95, 0xae, 0x04, 0x00, 0xdf, 0xf9, 0x5f, 0x24, 0xa7, 0xe0, 0xfd, 0x8e, 0xa0, 0xf8, 0xad, + 0xb7, 0x4f, 0x06, 0x31, 0x58, 0x18, 0xcc, 0xc0, 0x1b, 0x12, 0x05, 0x94, 0x58, 0xe3, 0x65, 0xb0, + 0x7e, 0xf0, 0x06, 0x63, 0x42, 0x05, 0x4a, 0x39, 0x57, 0x49, 0x8b, 0xc2, 0x84, 0x5e, 0x1b, 0x26, + 0x94, 0xc0, 0xe4, 0xdc, 0x86, 0x92, 0x8a, 0x57, 0x65, 0x3b, 0x0d, 0x8e, 0x27, 0x9b, 0x8f, 0x83, + 0x73, 0x0e, 0xc1, 0x92, 0xc9, 0x62, 0x07, 0xac, 0x01, 0x77, 0xa1, 0x32, 0xa9, 0x0e, 0x5c, 0x9c, + 0xd6, 0x95, 0xc6, 0x55, 0xff, 0x78, 0x1d, 0xb2, 0x24, 0x60, 0x91, 0x2f, 0x72, 0xe4, 0x98, 0x5d, + 0x9b, 0x62, 0xf6, 0x55, 0xc0, 0xa2, 0xe3, 0xce, 0x35, 0x5e, 0x2e, 0x4e, 0x40, 0x65, 0xe7, 0xc6, + 0x0b, 0x27, 0x84, 0x8c, 0x30, 0xc1, 0x5f, 0x43, 0x3e, 0xe9, 0x09, 0x71, 0xd7, 0x7f, 0x67, 0x56, + 0x56, 0x27, 0xa6, 0x18, 0x15, 0xf9, 0x4d, 0x9d, 0xf1, 0x9b, 0x60, 0x0e, 0xfc, 0x80, 0x08, 0xbc, + 0xf3, 0x9d, 0xdc, 0xc5, 0x69, 0x5d, 0xc8, 0xae, 0xf8, 0x75, 0x7e, 0x46, 0x50, 0xd8, 0xf3, 0xfc, + 0xa4, 0x66, 0x4b, 0x90, 0x79, 0xca, 0x19, 0xa0, 0x8a, 0x26, 0x05, 0x4e, 0xfb, 0x2e, 0x19, 0x78, + 0xc7, 0x5b, 0x61, 0x24, 0x0a, 0x54, 0x72, 0x13, 0x79, 0x4a, 0x7b, 0xf3, 0x4a, 0xda, 0x67, 0x16, + 0xa6, 0xfd, 0x8e, 0x99, 0x4b, 0x55, 0xd2, 0xce, 0x31, 0x14, 0x65, 0x60, 0xaa, 0x38, 0x4d, 0xb0, + 0x24, 0xd3, 0x14, 0x1c, 0x97, 0x99, 0xa8, 0xf6, 0xf1, 0x97, 0x50, 0xee, 0x46, 0xe1, 0x68, 0x44, + 0xba, 0xbb, 0x8a, 0xbb, 0xb2, 0x0e, 0x2b, 0x5a, 0x0f, 0xe8, 0xfb, 0xee, 0x9c, 0xb9, 0xf3, 0x0b, + 0x82, 0xd2, 0x2e, 0x11, 0x95, 0x51, 0xb0, 0x24, 0xe9, 0xa0, 0xd7, 0xee, 0xe2, 0xd4, 0xa2, 0x5d, + 0xbc, 0x0c, 0x56, 0x3f, 0x0a, 0xc7, 0x23, 0x5a, 0x4d, 0x4b, 0x36, 0x4a, 0xc9, 0xd9, 0x81, 0x72, + 0x1c, 0x9c, 0x82, 0x66, 0x0d, 0x2c, 0x2a, 0x34, 0xaa, 0x49, 0x6b, 0x1a, 0x34, 0x42, 0xbf, 0xdd, + 0x25, 0x01, 0xf3, 0x7b, 0x3e, 0x89, 0x3a, 0x26, 0xbf, 0xc4, 0x55, 0xf6, 0xce, 0x4f, 0x08, 0x2a, + 0xf3, 0x26, 0xf8, 0x0b, 0x8d, 0xe4, 0xfc, 0xb8, 0x5b, 0xaf, 0x3e, 0xae, 0x25, 0x1a, 0x88, 0x0a, + 0xce, 0xc6, 0x0d, 0x50, 0xbb, 0x07, 0x05, 0x4d, 0x8d, 0x2b, 0x90, 0x3e, 0x22, 0x31, 0xa1, 0xf8, + 0x92, 0x53, 0x46, 0x74, 0x96, 0xe4, 0xa4, 0x2b, 0x85, 0xf5, 0xd4, 0x1a, 0xe2, 0x74, 0x2c, 0xcd, + 0xd4, 0x06, 0xaf, 0x81, 0xd9, 0x8b, 0xc2, 0xe1, 0x42, 0xc0, 0x0b, 0x0f, 0xfc, 0x11, 0xa4, 0x58, + 0xb8, 0x10, 0xec, 0x29, 0x16, 0x72, 0xd4, 0x55, 0xf2, 0x69, 0x11, 0x9c, 0x92, 0x9c, 0xdf, 0x10, + 0x5c, 0xe3, 0x3e, 0x12, 0x81, 0xcd, 0xc3, 0x71, 0x70, 0x84, 0x9b, 0x50, 0xe1, 0x37, 0x3d, 0xf1, + 0x83, 0x3e, 0xa1, 0x8c, 0x44, 0x4f, 0xfc, 0xae, 0x4a, 0xb3, 0xcc, 0xf5, 0xdb, 0x4a, 0xbd, 0xdd, + 0xc5, 0x2b, 0x90, 0x1d, 0x53, 0x69, 0x20, 0x73, 0xb6, 0xb8, 0xb8, 0xdd, 0xc5, 0xef, 0x68, 0xd7, + 0x71, 0xac, 0xb5, 0x39, 0x2d, 0x30, 0x7c, 0xe8, 0xf9, 0x51, 0x32, 0x59, 0x6e, 0x83, 0x75, 0xc0, + 0x2f, 0xa6, 0x55, 0x73, 0x7e, 0xb0, 0x88, 0x80, 0x5c, 0xb5, 0xed, 0x7c, 0x0c, 0xf9, 0xc4, 0xfb, + 0xca, 0x31, 0x7c, 0x65, 0x05, 0x9c, 0x9b, 0x90, 0x91, 0x89, 0x61, 0x30, 0xbb, 0x1e, 0xf3, 0x84, + 0x4b, 0xd1, 0x15, 0x6b, 0xa7, 0x0a, 0xcb, 0x7b, 0x91, 0x17, 0xd0, 0x1e, 0x89, 0x84, 0x51, 0x42, + 0x3f, 0xe7, 0x06, 0x5c, 0xe7, 0x9d, 0x4a, 0x22, 0xba, 0x19, 0x8e, 0x03, 0xa6, 0x7a, 0xc6, 0x79, + 0x17, 0x96, 0x66, 0xd5, 0x8a, 0xad, 0x4b, 0x90, 0x39, 0xe0, 0x0a, 0x71, 0x7a, 0xc9, 0x95, 0xc2, + 0x9d, 0x5b, 0x90, 0x4f, 0x1e, 0x26, 0x5c, 0x80, 0xec, 0xd6, 0x03, 0xf7, 0xbb, 0x0d, 0xf7, 0x7e, + 0xc5, 0xc0, 0x45, 0xc8, 0x75, 0x36, 0x36, 0xbf, 0x11, 0x12, 0x5a, 0xdd, 0x00, 0x8b, 0x3f, 0xd1, + 0x24, 0xc2, 0x77, 0xc1, 0xe4, 0x2b, 0x7c, 0x63, 0x8a, 0x82, 0xf6, 0x11, 0x50, 0x5b, 0x9e, 0x57, + 0xab, 0x68, 0x8d, 0xd5, 0x3f, 0x52, 0x90, 0xe5, 0xcf, 0x1c, 0xe7, 0xfa, 0x67, 0x90, 0x11, 0x2f, + 0x1e, 0xd6, 0xcc, 0xf5, 0x17, 0xbf, 0xb6, 0x72, 0x49, 0x1f, 0x9f, 0xf3, 0x3e, 0xe2, 0x63, 0x41, + 0xe0, 0xac, 0x7b, 0xeb, 0x4f, 0xa0, 0xee, 0x3d, 0xf3, 0xd4, 0x38, 0x06, 0xbe, 0x07, 0x26, 0x87, + 0x47, 0x0f, 0x5f, 0x1b, 0xc4, 0x7a, 0xf8, 0xfa, 0x18, 0x14, 0xd7, 0x7e, 0x0e, 0x96, 0xa4, 0x21, + 0x5e, 0x99, 0x6f, 0xcd, 0xd8, 0xbd, 0x7a, 0x79, 0x23, 0xb9, 0xf9, 0x81, 0x9c, 0xac, 0x71, 0x61, + 0xf0, 0x5b, 0xb3, 0x57, 0xcd, 0xd5, 0xb1, 0x66, 0xbf, 0x6a, 0x3b, 0x01, 0xf4, 0x7b, 0xc8, 0xc5, + 0x5c, 0xc7, 0x8f, 0xa0, 0x3c, 0x4b, 0x13, 0xfc, 0x86, 0xe6, 0x3f, 0xdb, 0x40, 0xb5, 0x86, 0xb6, + 0x75, 0x35, 0xb7, 0x8c, 0x26, 0xea, 0x3c, 0x3e, 0x39, 0xb3, 0x8d, 0x17, 0x67, 0xb6, 0xf1, 0xf2, + 0xcc, 0x46, 0x3f, 0x4e, 0x6c, 0xf4, 0xeb, 0xc4, 0x46, 0xcf, 0x27, 0x36, 0x3a, 0x99, 0xd8, 0xe8, + 0xaf, 0x89, 0x8d, 0xfe, 0x9e, 0xd8, 0xc6, 0xcb, 0x89, 0x8d, 0x9e, 0x9d, 0xdb, 0xc6, 0xc9, 0xb9, + 0x6d, 0xbc, 0x38, 0xb7, 0x8d, 0xc7, 0x6f, 0xeb, 0x9f, 0x90, 0x91, 0xd7, 0xf3, 0x02, 0xaf, 0x3d, + 0x08, 0x8f, 0xfc, 0xb6, 0xfe, 0x89, 0xba, 0x6f, 0x89, 0xbf, 0x0f, 0xff, 0x0d, 0x00, 0x00, 0xff, + 0xff, 0x89, 0x7a, 0x2c, 0xbd, 0xb9, 0x0a, 0x00, 0x00, } func (x Direction) String() string { @@ -1633,6 +1716,51 @@ func (this *TransferChunksResponse) Equal(that interface{}) bool { } return true } +func (this *TailersCountRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TailersCountRequest) + if !ok { + that2, ok := that.(TailersCountRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *TailersCountResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TailersCountResponse) + if !ok { + that2, ok := that.(TailersCountResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Count != that1.Count { + return false + } + return true +} func (this *PushRequest) GoString() string { if this == nil { return "nil" @@ -1868,6 +1996,25 @@ func (this *TransferChunksResponse) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *TailersCountRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&logproto.TailersCountRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *TailersCountResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logproto.TailersCountResponse{") + s = append(s, "Count: "+fmt.Sprintf("%#v", this.Count)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringLogproto(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -1965,6 +2112,7 @@ type QuerierClient interface { Label(ctx context.Context, in *LabelRequest, opts ...grpc.CallOption) (*LabelResponse, error) Tail(ctx context.Context, in *TailRequest, opts ...grpc.CallOption) (Querier_TailClient, error) Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (*SeriesResponse, error) + TailersCount(ctx context.Context, in *TailersCountRequest, opts ...grpc.CallOption) (*TailersCountResponse, error) } type querierClient struct { @@ -2057,12 +2205,22 @@ func (c *querierClient) Series(ctx context.Context, in *SeriesRequest, opts ...g return out, nil } +func (c *querierClient) TailersCount(ctx context.Context, in *TailersCountRequest, opts ...grpc.CallOption) (*TailersCountResponse, error) { + out := new(TailersCountResponse) + err := c.cc.Invoke(ctx, "/logproto.Querier/TailersCount", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // QuerierServer is the server API for Querier service. type QuerierServer interface { Query(*QueryRequest, Querier_QueryServer) error Label(context.Context, *LabelRequest) (*LabelResponse, error) Tail(*TailRequest, Querier_TailServer) error Series(context.Context, *SeriesRequest) (*SeriesResponse, error) + TailersCount(context.Context, *TailersCountRequest) (*TailersCountResponse, error) } // UnimplementedQuerierServer can be embedded to have forward compatible implementations. @@ -2081,6 +2239,9 @@ func (*UnimplementedQuerierServer) Tail(req *TailRequest, srv Querier_TailServer func (*UnimplementedQuerierServer) Series(ctx context.Context, req *SeriesRequest) (*SeriesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Series not implemented") } +func (*UnimplementedQuerierServer) TailersCount(ctx context.Context, req *TailersCountRequest) (*TailersCountResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method TailersCount not implemented") +} func RegisterQuerierServer(s *grpc.Server, srv QuerierServer) { s.RegisterService(&_Querier_serviceDesc, srv) @@ -2164,6 +2325,24 @@ func _Querier_Series_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Querier_TailersCount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TailersCountRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QuerierServer).TailersCount(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/logproto.Querier/TailersCount", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QuerierServer).TailersCount(ctx, req.(*TailersCountRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Querier_serviceDesc = grpc.ServiceDesc{ ServiceName: "logproto.Querier", HandlerType: (*QuerierServer)(nil), @@ -2176,6 +2355,10 @@ var _Querier_serviceDesc = grpc.ServiceDesc{ MethodName: "Series", Handler: _Querier_Series_Handler, }, + { + MethodName: "TailersCount", + Handler: _Querier_TailersCount_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -3050,6 +3233,57 @@ func (m *TransferChunksResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) return len(dAtA) - i, nil } +func (m *TailersCountRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TailersCountRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TailersCountRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *TailersCountResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TailersCountResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TailersCountResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Count != 0 { + i = encodeVarintLogproto(dAtA, i, uint64(m.Count)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarintLogproto(dAtA []byte, offset int, v uint64) int { offset -= sovLogproto(v) base := offset @@ -3372,6 +3606,27 @@ func (m *TransferChunksResponse) Size() (n int) { return n } +func (m *TailersCountRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *TailersCountResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Count != 0 { + n += 1 + sovLogproto(uint64(m.Count)) + } + return n +} + func sovLogproto(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -3622,6 +3877,25 @@ func (this *TransferChunksResponse) String() string { }, "") return s } +func (this *TailersCountRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TailersCountRequest{`, + `}`, + }, "") + return s +} +func (this *TailersCountResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TailersCountResponse{`, + `Count:` + fmt.Sprintf("%v", this.Count) + `,`, + `}`, + }, "") + return s +} func valueToStringLogproto(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -5835,6 +6109,131 @@ func (m *TransferChunksResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *TailersCountRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TailersCountRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TailersCountRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TailersCountResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TailersCountResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TailersCountResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Count", wireType) + } + m.Count = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Count |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipLogproto(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index a8fa23e710..241c819471 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -16,6 +16,7 @@ service Querier { rpc Label(LabelRequest) returns (LabelResponse) {}; rpc Tail(TailRequest) returns (stream TailResponse) {}; rpc Series(SeriesRequest) returns (SeriesResponse) {}; + rpc TailersCount(TailersCountRequest) returns (TailersCountResponse) {}; } service Ingester { @@ -119,4 +120,12 @@ message Chunk { message TransferChunksResponse { -} \ No newline at end of file +} + +message TailersCountRequest { + +} + +message TailersCountResponse { + uint32 count = 1; +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 59539731e6..bc8c610d21 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -274,6 +274,11 @@ func mergePair(s1, s2 []string) []string { // Tail keeps getting matching logs from all ingesters for given query func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) { + err := q.checkTailRequestLimit(ctx) + if err != nil { + return nil, err + } + histReq := logql.SelectParams{ QueryRequest: &logproto.QueryRequest{ Selector: req.Query, @@ -284,7 +289,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, }, } - err := q.validateQueryRequest(ctx, histReq.QueryRequest) + err = q.validateQueryRequest(ctx, histReq.QueryRequest) if err != nil { return nil, err } @@ -536,3 +541,56 @@ func (q *Querier) validateQueryTimeRange(userID string, from *time.Time, through return nil } + +func (q *Querier) checkTailRequestLimit(ctx context.Context) error { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return err + } + + replicationSet, err := q.ring.GetAll() + if err != nil { + return err + } + + // we want to check count of active tailers with only active ingesters + ingesters := make([]ring.IngesterDesc, 0, 1) + for i := range replicationSet.Ingesters { + if replicationSet.Ingesters[i].State == ring.ACTIVE { + ingesters = append(ingesters, replicationSet.Ingesters[i]) + } + } + + if len(ingesters) == 0 { + return httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found") + } + + responses, err := q.forGivenIngesters(ctx, replicationSet, func(querierClient logproto.QuerierClient) (interface{}, error) { + resp, err := querierClient.TailersCount(ctx, nil) + if err != nil { + return nil, err + } + return resp.Count, nil + }) + // We are only checking active ingesters, and any error returned stops checking other ingesters + // so return that error here as well. + if err != nil { + return err + } + + var maxCnt uint32 + maxCnt = 0 + for _, resp := range responses { + r := resp.response.(uint32) + if r > maxCnt { + maxCnt = r + } + } + + if maxCnt >= uint32(q.limits.MaxConcurrentTailRequests(userID)) { + return httpgrpc.Errorf(http.StatusBadRequest, + "max concurrent tail requests limit exceeded, count > limit (%d > %d)", maxCnt+1, 1) + } + + return nil +} diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index a9868d199c..54bc79851c 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -60,6 +60,11 @@ func (c *querierClientMock) Series(ctx context.Context, in *logproto.SeriesReque return res.(*logproto.SeriesResponse), args.Error(1) } +func (c *querierClientMock) TailersCount(ctx context.Context, in *logproto.TailersCountRequest, opts ...grpc.CallOption) (*logproto.TailersCountResponse, error) { + args := c.Called(ctx, in, opts) + return args.Get(0).(*logproto.TailersCountResponse), args.Error(1) +} + func (c *querierClientMock) Context() context.Context { return context.Background() } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index fc67c8d89e..13df8482a8 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -96,6 +96,7 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil) ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil) + ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{}, nil) limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) @@ -447,7 +448,7 @@ func TestQuerier_SeriesAPI(t *testing.T) { func TestQuerier_IngesterMaxQueryLookback(t *testing.T) { - limits, err := validation.NewOverrides(defaultLimitsTestConfig()) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) for _, tc := range []struct { @@ -522,3 +523,86 @@ func TestQuerier_IngesterMaxQueryLookback(t *testing.T) { }) } } + +func TestQuerier_concurrentTailLimits(t *testing.T) { + request := logproto.TailRequest{ + Query: "{type=\"test\"}", + DelayFor: 0, + Limit: 10, + Start: time.Now(), + } + + t.Parallel() + + tests := map[string]struct { + ringIngesters []ring.IngesterDesc + expectedError error + tailersCount uint32 + }{ + "empty ring": { + ringIngesters: []ring.IngesterDesc{}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), + }, + "ring containing one pending ingester": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.PENDING)}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), + }, + "ring containing one active ingester and 0 active tailers": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)}, + }, + "ring containing one active ingester and 1 active tailer": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)}, + tailersCount: 1, + }, + "ring containing one pending and active ingester with 1 active tailer": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.PENDING), mockIngesterDesc("2.2.2.2", ring.ACTIVE)}, + tailersCount: 1, + }, + "ring containing one active ingester and max active tailers": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)}, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, + "max concurrent tail requests limit exceeded, count > limit (%d > %d)", 6, 1), + tailersCount: 5, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + // For this test's purpose, whenever a new ingester client needs to + // be created, the factory will always return the same mock instance + store := newStoreMock() + store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil) + + queryClient := newQueryClientMock() + queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil) + + tailClient := newTailClientMock() + tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil) + ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil) + ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{testData.tailersCount}, nil) + + defaultLimits := defaultLimitsTestConfig() + defaultLimits.MaxConcurrentTailRequests = 5 + + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + q, err := newQuerier( + mockQuerierConfig(), + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + newReadRingMock(testData.ringIngesters), + store, limits) + require.NoError(t, err) + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = q.Tail(ctx, &request) + assert.Equal(t, testData.expectedError, err) + }) + } +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index e306b91166..34810d86ae 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -40,6 +40,7 @@ type Limits struct { MaxQueryParallelism int `yaml:"max_query_parallelism"` CardinalityLimit int `yaml:"cardinality_limit"` MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query"` + MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests"` // Config for overrides, convenient if it goes here. PerTenantOverrideConfig string `yaml:"per_tenant_override_config"` @@ -67,6 +68,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.") f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query") + f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests") f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.") f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with this to reload the overrides.") @@ -212,6 +214,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int { return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery } +// MaxConcurrentTailRequests returns the limit to number of concurrent tail requests. +func (o *Overrides) MaxConcurrentTailRequests(userID string) int { + return o.getOverridesForUser(userID).MaxConcurrentTailRequests +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits(userID)