limit for concurrent tail requests (#1562)

* limit for concurrent tail requests

When a new request comes in, querier would ask one of the active ingesters for count of active tail requests for the user
If the count exceeds the configured limits for the user then tail request would be rejected right away.

* goimports querier_test.go

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* query all ingesters for their max tail count

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* changed to check only ACTIVE ingesters and tweaked error handling some

Signed-off-by: Edward Welch <edward.welch@grafana.com>

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
Co-authored-by: Ed Welch <ed@oqqer.com>
pull/1573/head
Sandeep Sukhani 6 years ago committed by Cyril Tovena
parent d4cae667ae
commit c7a3ec54f1
  1. 17
      pkg/ingester/ingester.go
  2. 28
      pkg/ingester/instance.go
  3. 535
      pkg/logproto/logproto.pb.go
  4. 11
      pkg/logproto/logproto.proto
  5. 60
      pkg/querier/querier.go
  6. 5
      pkg/querier/querier_mock_test.go
  7. 86
      pkg/querier/querier_test.go
  8. 7
      pkg/util/validation/limits.go

@ -322,3 +322,20 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
tailer.loop()
return nil
}
// TailersCount returns count of active tail requests from a user
func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
resp := logproto.TailersCountResponse{}
instance, ok := i.getInstanceByID(instanceID)
if ok {
resp.Count = instance.openTailersCount()
}
return &resp, nil
}

@ -318,12 +318,13 @@ func (i *instance) addNewTailer(t *tailer) {
}
func (i *instance) addTailersToNewStream(stream *stream) {
closedTailers := []uint32{}
i.tailerMtx.RLock()
defer i.tailerMtx.RUnlock()
for _, t := range i.tailers {
// we don't want to watch streams for closed tailers.
// When a new tail request comes in we will clean references to closed tailers
if t.isClosed() {
closedTailers = append(closedTailers, t.getID())
continue
}
@ -331,6 +332,18 @@ func (i *instance) addTailersToNewStream(stream *stream) {
stream.addTailer(t)
}
}
}
func (i *instance) checkClosedTailers() {
closedTailers := []uint32{}
i.tailerMtx.RLock()
for _, t := range i.tailers {
if t.isClosed() {
closedTailers = append(closedTailers, t.getID())
continue
}
}
i.tailerMtx.RUnlock()
if len(closedTailers) != 0 {
@ -350,6 +363,15 @@ func (i *instance) closeTailers() {
}
}
func (i *instance) openTailersCount() uint32 {
i.checkClosedTailers()
i.tailerMtx.RLock()
defer i.tailerMtx.RUnlock()
return uint32(len(i.tailers))
}
func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():

@ -983,6 +983,84 @@ func (m *TransferChunksResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_TransferChunksResponse proto.InternalMessageInfo
type TailersCountRequest struct {
}
func (m *TailersCountRequest) Reset() { *m = TailersCountRequest{} }
func (*TailersCountRequest) ProtoMessage() {}
func (*TailersCountRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_c28a5f14f1f4c79a, []int{18}
}
func (m *TailersCountRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *TailersCountRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_TailersCountRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *TailersCountRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_TailersCountRequest.Merge(m, src)
}
func (m *TailersCountRequest) XXX_Size() int {
return m.Size()
}
func (m *TailersCountRequest) XXX_DiscardUnknown() {
xxx_messageInfo_TailersCountRequest.DiscardUnknown(m)
}
var xxx_messageInfo_TailersCountRequest proto.InternalMessageInfo
type TailersCountResponse struct {
Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"`
}
func (m *TailersCountResponse) Reset() { *m = TailersCountResponse{} }
func (*TailersCountResponse) ProtoMessage() {}
func (*TailersCountResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_c28a5f14f1f4c79a, []int{19}
}
func (m *TailersCountResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *TailersCountResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_TailersCountResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *TailersCountResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_TailersCountResponse.Merge(m, src)
}
func (m *TailersCountResponse) XXX_Size() int {
return m.Size()
}
func (m *TailersCountResponse) XXX_DiscardUnknown() {
xxx_messageInfo_TailersCountResponse.DiscardUnknown(m)
}
var xxx_messageInfo_TailersCountResponse proto.InternalMessageInfo
func (m *TailersCountResponse) GetCount() uint32 {
if m != nil {
return m.Count
}
return 0
}
func init() {
proto.RegisterEnum("logproto.Direction", Direction_name, Direction_value)
proto.RegisterType((*PushRequest)(nil), "logproto.PushRequest")
@ -1004,79 +1082,84 @@ func init() {
proto.RegisterType((*LabelPair)(nil), "logproto.LabelPair")
proto.RegisterType((*Chunk)(nil), "logproto.Chunk")
proto.RegisterType((*TransferChunksResponse)(nil), "logproto.TransferChunksResponse")
proto.RegisterType((*TailersCountRequest)(nil), "logproto.TailersCountRequest")
proto.RegisterType((*TailersCountResponse)(nil), "logproto.TailersCountResponse")
}
func init() { proto.RegisterFile("pkg/logproto/logproto.proto", fileDescriptor_c28a5f14f1f4c79a) }
var fileDescriptor_c28a5f14f1f4c79a = []byte{
// 1065 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdd, 0x6e, 0x1b, 0x45,
0x14, 0xde, 0xb1, 0xd7, 0x6b, 0xfb, 0xf8, 0x27, 0xd6, 0x50, 0x92, 0xc5, 0x45, 0x6b, 0x6b, 0x85,
0x5a, 0xab, 0x08, 0x1b, 0xcc, 0x4f, 0xd3, 0xf0, 0xa7, 0xb8, 0x25, 0x22, 0x01, 0x89, 0x76, 0x13,
0x09, 0xa9, 0x12, 0xaa, 0x36, 0xd9, 0xb1, 0xb3, 0xca, 0x7a, 0xd7, 0xdd, 0x1d, 0x23, 0xe5, 0x8e,
0x17, 0x40, 0xea, 0x1d, 0x17, 0xbc, 0x00, 0xe2, 0x82, 0xe7, 0xe8, 0x65, 0x2e, 0x7b, 0x15, 0x88,
0x73, 0x83, 0x22, 0x21, 0xf5, 0x05, 0x90, 0xd0, 0xfc, 0xec, 0x7a, 0xe2, 0x04, 0x90, 0x7b, 0x63,
0xcf, 0x99, 0x39, 0x67, 0xe6, 0x7c, 0xdf, 0xf9, 0xce, 0xcc, 0xc2, 0xcd, 0xc9, 0xd1, 0xa8, 0x17,
0x44, 0xa3, 0x49, 0x1c, 0xd1, 0x28, 0x1b, 0x74, 0xf9, 0x2f, 0x2e, 0xa5, 0x76, 0xb3, 0x35, 0x8a,
0xa2, 0x51, 0x40, 0x7a, 0xdc, 0xda, 0x9f, 0x0e, 0x7b, 0xd4, 0x1f, 0x93, 0x84, 0xba, 0xe3, 0x89,
0x70, 0x6d, 0xbe, 0x33, 0xf2, 0xe9, 0xe1, 0x74, 0xbf, 0x7b, 0x10, 0x8d, 0x7b, 0xa3, 0x68, 0x14,
0xcd, 0x3d, 0x99, 0x25, 0x76, 0x67, 0x23, 0xe1, 0x6e, 0x6f, 0x41, 0xe5, 0xe1, 0x34, 0x39, 0x74,
0xc8, 0xd3, 0x29, 0x49, 0x28, 0xbe, 0x0b, 0xc5, 0x84, 0xc6, 0xc4, 0x1d, 0x27, 0x26, 0x6a, 0xe7,
0x3b, 0x95, 0x7e, 0xa3, 0x9b, 0xa5, 0xb2, 0xcb, 0x17, 0x06, 0x95, 0x8b, 0xd3, 0x56, 0xea, 0xe4,
0xa4, 0x03, 0xbb, 0x0e, 0x55, 0xb1, 0x4f, 0x32, 0x89, 0xc2, 0x84, 0xd8, 0x7f, 0x21, 0xa8, 0x3e,
0x9a, 0x92, 0xf8, 0x38, 0xdd, 0xb9, 0x09, 0xa5, 0x84, 0x04, 0xe4, 0x80, 0x46, 0xb1, 0x89, 0xda,
0xa8, 0x53, 0x76, 0x32, 0x1b, 0xdf, 0x80, 0x42, 0xe0, 0x8f, 0x7d, 0x6a, 0xe6, 0xda, 0xa8, 0x53,
0x73, 0x84, 0x81, 0x37, 0xa0, 0x90, 0x50, 0x37, 0xa6, 0x66, 0xbe, 0x8d, 0x3a, 0x95, 0x7e, 0xb3,
0x2b, 0xa0, 0x77, 0x53, 0x40, 0xdd, 0xbd, 0x14, 0xfa, 0xa0, 0xf4, 0xfc, 0xb4, 0xa5, 0x3d, 0xfb,
0xbd, 0x85, 0x1c, 0x11, 0x82, 0x3f, 0x82, 0x3c, 0x09, 0x3d, 0x53, 0x5f, 0x22, 0x92, 0x05, 0xe0,
0xf7, 0xa0, 0xec, 0xf9, 0x31, 0x39, 0xa0, 0x7e, 0x14, 0x9a, 0x85, 0x36, 0xea, 0xd4, 0xfb, 0xaf,
0xcd, 0x19, 0x78, 0x90, 0x2e, 0x39, 0x73, 0xaf, 0x1d, 0xbd, 0x64, 0x34, 0x8a, 0xf6, 0xc7, 0x50,
0x93, 0x70, 0x05, 0x01, 0xf8, 0xce, 0xff, 0x32, 0x39, 0x27, 0xef, 0x37, 0x04, 0xd5, 0xaf, 0xdd,
0x7d, 0x12, 0xa4, 0x64, 0x61, 0xd0, 0x43, 0x77, 0x4c, 0x24, 0x51, 0x7c, 0x8c, 0x57, 0xc1, 0xf8,
0xde, 0x0d, 0xa6, 0x24, 0xe1, 0x2c, 0x95, 0x1c, 0x69, 0x2d, 0x4b, 0x13, 0x7a, 0x65, 0x9a, 0x50,
0x46, 0x93, 0x7d, 0x1b, 0x6a, 0x32, 0x5f, 0x89, 0x76, 0x9e, 0x1c, 0x03, 0x5b, 0x4e, 0x93, 0xb3,
0x0f, 0xc1, 0x10, 0x60, 0xb1, 0x0d, 0x46, 0xc0, 0x42, 0x12, 0x01, 0x6a, 0x00, 0x17, 0xa7, 0x2d,
0x39, 0xe3, 0xc8, 0x7f, 0xbc, 0x01, 0x45, 0x12, 0xd2, 0xd8, 0xe7, 0x18, 0x19, 0x67, 0x2b, 0x73,
0xce, 0xbe, 0x08, 0x69, 0x7c, 0x3c, 0x58, 0x61, 0xe5, 0x62, 0x02, 0x94, 0x7e, 0x4e, 0x3a, 0xb0,
0x23, 0x28, 0x70, 0x17, 0xfc, 0x25, 0x94, 0xb3, 0x9e, 0xe0, 0x67, 0xfd, 0x37, 0xb2, 0xba, 0xdc,
0x31, 0x47, 0x13, 0x8e, 0x6f, 0x1e, 0x8c, 0xdf, 0x04, 0x3d, 0xf0, 0x43, 0xc2, 0xf9, 0x2e, 0x0f,
0x4a, 0x17, 0xa7, 0x2d, 0x6e, 0x3b, 0xfc, 0xd7, 0xfe, 0x09, 0x41, 0x65, 0xcf, 0xf5, 0xb3, 0x9a,
0xdd, 0x80, 0xc2, 0x53, 0xa6, 0x00, 0x59, 0x34, 0x61, 0x30, 0xd9, 0x7b, 0x24, 0x70, 0x8f, 0xb7,
0xa2, 0x98, 0x17, 0xa8, 0xe6, 0x64, 0xf6, 0x5c, 0xf6, 0xfa, 0xb5, 0xb2, 0x2f, 0x2c, 0x2d, 0xfb,
0x1d, 0xbd, 0x94, 0x6b, 0xe4, 0xed, 0x63, 0xa8, 0x8a, 0xc4, 0x64, 0x71, 0x3a, 0x60, 0x08, 0xa5,
0x49, 0x3a, 0xae, 0x2a, 0x51, 0xae, 0xe3, 0xcf, 0xa1, 0xee, 0xc5, 0xd1, 0x64, 0x42, 0xbc, 0x5d,
0xa9, 0x5d, 0x51, 0x87, 0x35, 0xa5, 0x07, 0xd4, 0x75, 0x67, 0xc1, 0xdd, 0xfe, 0x19, 0x41, 0x6d,
0x97, 0xf0, 0xca, 0x48, 0x5a, 0x32, 0x38, 0xe8, 0x95, 0xbb, 0x38, 0xb7, 0x6c, 0x17, 0xaf, 0x82,
0x31, 0x8a, 0xa3, 0xe9, 0x24, 0x31, 0xf3, 0x42, 0x8d, 0xc2, 0xb2, 0x77, 0xa0, 0x9e, 0x26, 0x27,
0xa9, 0x59, 0x07, 0x23, 0xe1, 0x33, 0xb2, 0x49, 0x9b, 0x0a, 0x35, 0x7c, 0x7e, 0xdb, 0x23, 0x21,
0xf5, 0x87, 0x3e, 0x89, 0x07, 0x3a, 0x3b, 0xc4, 0x91, 0xfe, 0xf6, 0x8f, 0x08, 0x1a, 0x8b, 0x2e,
0xf8, 0x33, 0x45, 0xe4, 0x6c, 0xbb, 0x5b, 0xff, 0xbe, 0x5d, 0x97, 0x37, 0x50, 0xc2, 0x35, 0x9b,
0x36, 0x40, 0xf3, 0x1e, 0x54, 0x94, 0x69, 0xdc, 0x80, 0xfc, 0x11, 0x49, 0x05, 0xc5, 0x86, 0x4c,
0x32, 0xbc, 0xb3, 0x84, 0x26, 0x1d, 0x61, 0x6c, 0xe4, 0xd6, 0x11, 0x93, 0x63, 0xed, 0x52, 0x6d,
0xf0, 0x3a, 0xe8, 0xc3, 0x38, 0x1a, 0x2f, 0x45, 0x3c, 0x8f, 0xc0, 0x1f, 0x40, 0x8e, 0x46, 0x4b,
0xd1, 0x9e, 0xa3, 0x11, 0x63, 0x5d, 0x82, 0xcf, 0xf3, 0xe4, 0xa4, 0x65, 0xff, 0x8a, 0x60, 0x85,
0xc5, 0x08, 0x06, 0xee, 0x1f, 0x4e, 0xc3, 0x23, 0xdc, 0x81, 0x06, 0x3b, 0xe9, 0x89, 0x1f, 0x8e,
0x48, 0x42, 0x49, 0xfc, 0xc4, 0xf7, 0x24, 0xcc, 0x3a, 0x9b, 0xdf, 0x96, 0xd3, 0xdb, 0x1e, 0x5e,
0x83, 0xe2, 0x34, 0x11, 0x0e, 0x02, 0xb3, 0xc1, 0xcc, 0x6d, 0x0f, 0xbf, 0xad, 0x1c, 0xc7, 0xb8,
0x56, 0xee, 0x69, 0xce, 0xe1, 0x43, 0xd7, 0x8f, 0xb3, 0x9b, 0xe5, 0x36, 0x18, 0x07, 0xec, 0xe0,
0xc4, 0xd4, 0x17, 0x2f, 0x16, 0x9e, 0x90, 0x23, 0x97, 0xed, 0x0f, 0xa1, 0x9c, 0x45, 0x5f, 0x7b,
0x0d, 0x5f, 0x5b, 0x01, 0xfb, 0x26, 0x14, 0x04, 0x30, 0x0c, 0xba, 0xe7, 0x52, 0x97, 0x87, 0x54,
0x1d, 0x3e, 0xb6, 0x4d, 0x58, 0xdd, 0x8b, 0xdd, 0x30, 0x19, 0x92, 0x98, 0x3b, 0x65, 0xf2, 0xbb,
0x73, 0x0b, 0xca, 0xd9, 0x9b, 0x82, 0x2b, 0x50, 0xdc, 0xfa, 0xc6, 0xf9, 0x76, 0xd3, 0x79, 0xd0,
0xd0, 0x70, 0x15, 0x4a, 0x83, 0xcd, 0xfb, 0x5f, 0x71, 0x0b, 0xf5, 0x37, 0xc1, 0x60, 0xaf, 0x2b,
0x89, 0xf1, 0x5d, 0xd0, 0xd9, 0x08, 0xbf, 0x3e, 0x07, 0xa0, 0xbc, 0xdf, 0xcd, 0xd5, 0xc5, 0x69,
0xf9, 0x1c, 0x6b, 0xfd, 0xbf, 0x11, 0x14, 0xd9, 0x0b, 0xc5, 0x64, 0xfa, 0x09, 0x14, 0xf8, 0x63,
0x85, 0x15, 0x77, 0xf5, 0xb1, 0x6e, 0xae, 0x5d, 0x99, 0x4f, 0xf7, 0x79, 0x17, 0xb1, 0x8e, 0xe6,
0x14, 0xa9, 0xd1, 0xea, 0xeb, 0xa5, 0x46, 0x5f, 0x7a, 0x25, 0x6c, 0x0d, 0xdf, 0x03, 0x9d, 0x5d,
0x4d, 0x6a, 0xfa, 0xca, 0x1d, 0xaa, 0xa6, 0xaf, 0xde, 0x60, 0xfc, 0xd8, 0x4f, 0xc1, 0x10, 0x0a,
0xc2, 0x6b, 0x8b, 0x5d, 0x95, 0x86, 0x9b, 0x57, 0x17, 0x32, 0xfc, 0xdf, 0x41, 0x29, 0x55, 0x15,
0x7e, 0x04, 0xf5, 0xcb, 0x05, 0xc1, 0x6f, 0x28, 0x07, 0x5f, 0x96, 0x6a, 0xb3, 0xad, 0x2c, 0x5d,
0x5b, 0x45, 0x5b, 0xeb, 0xa0, 0xc1, 0xe3, 0x93, 0x33, 0x4b, 0x7b, 0x71, 0x66, 0x69, 0x2f, 0xcf,
0x2c, 0xf4, 0xc3, 0xcc, 0x42, 0xbf, 0xcc, 0x2c, 0xf4, 0x7c, 0x66, 0xa1, 0x93, 0x99, 0x85, 0xfe,
0x98, 0x59, 0xe8, 0xcf, 0x99, 0xa5, 0xbd, 0x9c, 0x59, 0xe8, 0xd9, 0xb9, 0xa5, 0x9d, 0x9c, 0x5b,
0xda, 0x8b, 0x73, 0x4b, 0x7b, 0xfc, 0x96, 0xfa, 0xb1, 0x16, 0xbb, 0x43, 0x37, 0x74, 0x7b, 0x41,
0x74, 0xe4, 0xf7, 0xd4, 0x8f, 0xc1, 0x7d, 0x83, 0xff, 0xbd, 0xff, 0x4f, 0x00, 0x00, 0x00, 0xff,
0xff, 0xb9, 0x5d, 0x4d, 0x8c, 0x23, 0x0a, 0x00, 0x00,
// 1113 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x5f, 0x6f, 0x1b, 0x45,
0x10, 0xbf, 0xb5, 0xcf, 0x67, 0x7b, 0xfc, 0xa7, 0xd6, 0x36, 0x4d, 0x8c, 0x0b, 0x67, 0xeb, 0x84,
0x5a, 0xab, 0x80, 0x0d, 0xe1, 0x4f, 0xd3, 0xf0, 0x4f, 0x71, 0x4a, 0x44, 0x02, 0x52, 0xdb, 0x4b,
0x24, 0xa4, 0x4a, 0xa8, 0xba, 0xc4, 0x6b, 0xe7, 0x14, 0xfb, 0xce, 0xbd, 0x5d, 0x23, 0xe5, 0x8d,
0x2f, 0x80, 0xd4, 0x37, 0x1e, 0xf8, 0x02, 0x88, 0x07, 0x3e, 0x47, 0x9f, 0x50, 0x1e, 0xfb, 0x14,
0x88, 0xf3, 0x82, 0x22, 0x21, 0xf5, 0x23, 0xa0, 0xfd, 0x73, 0xe7, 0xb5, 0x93, 0x82, 0xdc, 0x17,
0x7b, 0x67, 0x76, 0x66, 0x77, 0xe6, 0x37, 0xbf, 0x99, 0x3d, 0xb8, 0x39, 0x3a, 0xea, 0xb7, 0x07,
0x61, 0x7f, 0x14, 0x85, 0x2c, 0x4c, 0x16, 0x2d, 0xf1, 0x8b, 0x73, 0xb1, 0x5c, 0xab, 0xf7, 0xc3,
0xb0, 0x3f, 0x20, 0x6d, 0x21, 0xed, 0x8f, 0x7b, 0x6d, 0xe6, 0x0f, 0x09, 0x65, 0xde, 0x70, 0x24,
0x4d, 0x6b, 0xef, 0xf5, 0x7d, 0x76, 0x38, 0xde, 0x6f, 0x1d, 0x84, 0xc3, 0x76, 0x3f, 0xec, 0x87,
0x53, 0x4b, 0x2e, 0xc9, 0xd3, 0xf9, 0x4a, 0x9a, 0x3b, 0x5b, 0x50, 0x78, 0x38, 0xa6, 0x87, 0x2e,
0x79, 0x3a, 0x26, 0x94, 0xe1, 0xbb, 0x90, 0xa5, 0x2c, 0x22, 0xde, 0x90, 0x56, 0x51, 0x23, 0xdd,
0x2c, 0xac, 0x56, 0x5a, 0x49, 0x28, 0xbb, 0x62, 0xa3, 0x53, 0xb8, 0x38, 0xad, 0xc7, 0x46, 0x6e,
0xbc, 0x70, 0xca, 0x50, 0x94, 0xe7, 0xd0, 0x51, 0x18, 0x50, 0xe2, 0xfc, 0x83, 0xa0, 0xf8, 0x68,
0x4c, 0xa2, 0xe3, 0xf8, 0xe4, 0x1a, 0xe4, 0x28, 0x19, 0x90, 0x03, 0x16, 0x46, 0x55, 0xd4, 0x40,
0xcd, 0xbc, 0x9b, 0xc8, 0x78, 0x09, 0x32, 0x03, 0x7f, 0xe8, 0xb3, 0x6a, 0xaa, 0x81, 0x9a, 0x25,
0x57, 0x0a, 0x78, 0x1d, 0x32, 0x94, 0x79, 0x11, 0xab, 0xa6, 0x1b, 0xa8, 0x59, 0x58, 0xad, 0xb5,
0x64, 0xea, 0xad, 0x38, 0xa1, 0xd6, 0x5e, 0x9c, 0x7a, 0x27, 0xf7, 0xfc, 0xb4, 0x6e, 0x3c, 0xfb,
0xb3, 0x8e, 0x5c, 0xe9, 0x82, 0x3f, 0x81, 0x34, 0x09, 0xba, 0x55, 0x73, 0x01, 0x4f, 0xee, 0x80,
0x3f, 0x80, 0x7c, 0xd7, 0x8f, 0xc8, 0x01, 0xf3, 0xc3, 0xa0, 0x9a, 0x69, 0xa0, 0x66, 0x79, 0xf5,
0xfa, 0x14, 0x81, 0xfb, 0xf1, 0x96, 0x3b, 0xb5, 0xda, 0x31, 0x73, 0x56, 0x25, 0xeb, 0x7c, 0x0a,
0x25, 0x95, 0xae, 0x04, 0x00, 0xdf, 0xf9, 0x5f, 0x24, 0xa7, 0xe0, 0xfd, 0x8e, 0xa0, 0xf8, 0xad,
0xb7, 0x4f, 0x06, 0x31, 0x58, 0x18, 0xcc, 0xc0, 0x1b, 0x12, 0x05, 0x94, 0x58, 0xe3, 0x65, 0xb0,
0x7e, 0xf0, 0x06, 0x63, 0x42, 0x05, 0x4a, 0x39, 0x57, 0x49, 0x8b, 0xc2, 0x84, 0x5e, 0x1b, 0x26,
0x94, 0xc0, 0xe4, 0xdc, 0x86, 0x92, 0x8a, 0x57, 0x65, 0x3b, 0x0d, 0x8e, 0x27, 0x9b, 0x8f, 0x83,
0x73, 0x0e, 0xc1, 0x92, 0xc9, 0x62, 0x07, 0xac, 0x01, 0x77, 0xa1, 0x32, 0xa9, 0x0e, 0x5c, 0x9c,
0xd6, 0x95, 0xc6, 0x55, 0xff, 0x78, 0x1d, 0xb2, 0x24, 0x60, 0x91, 0x2f, 0x72, 0xe4, 0x98, 0x5d,
0x9b, 0x62, 0xf6, 0x55, 0xc0, 0xa2, 0xe3, 0xce, 0x35, 0x5e, 0x2e, 0x4e, 0x40, 0x65, 0xe7, 0xc6,
0x0b, 0x27, 0x84, 0x8c, 0x30, 0xc1, 0x5f, 0x43, 0x3e, 0xe9, 0x09, 0x71, 0xd7, 0x7f, 0x67, 0x56,
0x56, 0x27, 0xa6, 0x18, 0x15, 0xf9, 0x4d, 0x9d, 0xf1, 0x9b, 0x60, 0x0e, 0xfc, 0x80, 0x08, 0xbc,
0xf3, 0x9d, 0xdc, 0xc5, 0x69, 0x5d, 0xc8, 0xae, 0xf8, 0x75, 0x7e, 0x46, 0x50, 0xd8, 0xf3, 0xfc,
0xa4, 0x66, 0x4b, 0x90, 0x79, 0xca, 0x19, 0xa0, 0x8a, 0x26, 0x05, 0x4e, 0xfb, 0x2e, 0x19, 0x78,
0xc7, 0x5b, 0x61, 0x24, 0x0a, 0x54, 0x72, 0x13, 0x79, 0x4a, 0x7b, 0xf3, 0x4a, 0xda, 0x67, 0x16,
0xa6, 0xfd, 0x8e, 0x99, 0x4b, 0x55, 0xd2, 0xce, 0x31, 0x14, 0x65, 0x60, 0xaa, 0x38, 0x4d, 0xb0,
0x24, 0xd3, 0x14, 0x1c, 0x97, 0x99, 0xa8, 0xf6, 0xf1, 0x97, 0x50, 0xee, 0x46, 0xe1, 0x68, 0x44,
0xba, 0xbb, 0x8a, 0xbb, 0xb2, 0x0e, 0x2b, 0x5a, 0x0f, 0xe8, 0xfb, 0xee, 0x9c, 0xb9, 0xf3, 0x0b,
0x82, 0xd2, 0x2e, 0x11, 0x95, 0x51, 0xb0, 0x24, 0xe9, 0xa0, 0xd7, 0xee, 0xe2, 0xd4, 0xa2, 0x5d,
0xbc, 0x0c, 0x56, 0x3f, 0x0a, 0xc7, 0x23, 0x5a, 0x4d, 0x4b, 0x36, 0x4a, 0xc9, 0xd9, 0x81, 0x72,
0x1c, 0x9c, 0x82, 0x66, 0x0d, 0x2c, 0x2a, 0x34, 0xaa, 0x49, 0x6b, 0x1a, 0x34, 0x42, 0xbf, 0xdd,
0x25, 0x01, 0xf3, 0x7b, 0x3e, 0x89, 0x3a, 0x26, 0xbf, 0xc4, 0x55, 0xf6, 0xce, 0x4f, 0x08, 0x2a,
0xf3, 0x26, 0xf8, 0x0b, 0x8d, 0xe4, 0xfc, 0xb8, 0x5b, 0xaf, 0x3e, 0xae, 0x25, 0x1a, 0x88, 0x0a,
0xce, 0xc6, 0x0d, 0x50, 0xbb, 0x07, 0x05, 0x4d, 0x8d, 0x2b, 0x90, 0x3e, 0x22, 0x31, 0xa1, 0xf8,
0x92, 0x53, 0x46, 0x74, 0x96, 0xe4, 0xa4, 0x2b, 0x85, 0xf5, 0xd4, 0x1a, 0xe2, 0x74, 0x2c, 0xcd,
0xd4, 0x06, 0xaf, 0x81, 0xd9, 0x8b, 0xc2, 0xe1, 0x42, 0xc0, 0x0b, 0x0f, 0xfc, 0x11, 0xa4, 0x58,
0xb8, 0x10, 0xec, 0x29, 0x16, 0x72, 0xd4, 0x55, 0xf2, 0x69, 0x11, 0x9c, 0x92, 0x9c, 0xdf, 0x10,
0x5c, 0xe3, 0x3e, 0x12, 0x81, 0xcd, 0xc3, 0x71, 0x70, 0x84, 0x9b, 0x50, 0xe1, 0x37, 0x3d, 0xf1,
0x83, 0x3e, 0xa1, 0x8c, 0x44, 0x4f, 0xfc, 0xae, 0x4a, 0xb3, 0xcc, 0xf5, 0xdb, 0x4a, 0xbd, 0xdd,
0xc5, 0x2b, 0x90, 0x1d, 0x53, 0x69, 0x20, 0x73, 0xb6, 0xb8, 0xb8, 0xdd, 0xc5, 0xef, 0x68, 0xd7,
0x71, 0xac, 0xb5, 0x39, 0x2d, 0x30, 0x7c, 0xe8, 0xf9, 0x51, 0x32, 0x59, 0x6e, 0x83, 0x75, 0xc0,
0x2f, 0xa6, 0x55, 0x73, 0x7e, 0xb0, 0x88, 0x80, 0x5c, 0xb5, 0xed, 0x7c, 0x0c, 0xf9, 0xc4, 0xfb,
0xca, 0x31, 0x7c, 0x65, 0x05, 0x9c, 0x9b, 0x90, 0x91, 0x89, 0x61, 0x30, 0xbb, 0x1e, 0xf3, 0x84,
0x4b, 0xd1, 0x15, 0x6b, 0xa7, 0x0a, 0xcb, 0x7b, 0x91, 0x17, 0xd0, 0x1e, 0x89, 0x84, 0x51, 0x42,
0x3f, 0xe7, 0x06, 0x5c, 0xe7, 0x9d, 0x4a, 0x22, 0xba, 0x19, 0x8e, 0x03, 0xa6, 0x7a, 0xc6, 0x79,
0x17, 0x96, 0x66, 0xd5, 0x8a, 0xad, 0x4b, 0x90, 0x39, 0xe0, 0x0a, 0x71, 0x7a, 0xc9, 0x95, 0xc2,
0x9d, 0x5b, 0x90, 0x4f, 0x1e, 0x26, 0x5c, 0x80, 0xec, 0xd6, 0x03, 0xf7, 0xbb, 0x0d, 0xf7, 0x7e,
0xc5, 0xc0, 0x45, 0xc8, 0x75, 0x36, 0x36, 0xbf, 0x11, 0x12, 0x5a, 0xdd, 0x00, 0x8b, 0x3f, 0xd1,
0x24, 0xc2, 0x77, 0xc1, 0xe4, 0x2b, 0x7c, 0x63, 0x8a, 0x82, 0xf6, 0x11, 0x50, 0x5b, 0x9e, 0x57,
0xab, 0x68, 0x8d, 0xd5, 0x3f, 0x52, 0x90, 0xe5, 0xcf, 0x1c, 0xe7, 0xfa, 0x67, 0x90, 0x11, 0x2f,
0x1e, 0xd6, 0xcc, 0xf5, 0x17, 0xbf, 0xb6, 0x72, 0x49, 0x1f, 0x9f, 0xf3, 0x3e, 0xe2, 0x63, 0x41,
0xe0, 0xac, 0x7b, 0xeb, 0x4f, 0xa0, 0xee, 0x3d, 0xf3, 0xd4, 0x38, 0x06, 0xbe, 0x07, 0x26, 0x87,
0x47, 0x0f, 0x5f, 0x1b, 0xc4, 0x7a, 0xf8, 0xfa, 0x18, 0x14, 0xd7, 0x7e, 0x0e, 0x96, 0xa4, 0x21,
0x5e, 0x99, 0x6f, 0xcd, 0xd8, 0xbd, 0x7a, 0x79, 0x23, 0xb9, 0xf9, 0x81, 0x9c, 0xac, 0x71, 0x61,
0xf0, 0x5b, 0xb3, 0x57, 0xcd, 0xd5, 0xb1, 0x66, 0xbf, 0x6a, 0x3b, 0x01, 0xf4, 0x7b, 0xc8, 0xc5,
0x5c, 0xc7, 0x8f, 0xa0, 0x3c, 0x4b, 0x13, 0xfc, 0x86, 0xe6, 0x3f, 0xdb, 0x40, 0xb5, 0x86, 0xb6,
0x75, 0x35, 0xb7, 0x8c, 0x26, 0xea, 0x3c, 0x3e, 0x39, 0xb3, 0x8d, 0x17, 0x67, 0xb6, 0xf1, 0xf2,
0xcc, 0x46, 0x3f, 0x4e, 0x6c, 0xf4, 0xeb, 0xc4, 0x46, 0xcf, 0x27, 0x36, 0x3a, 0x99, 0xd8, 0xe8,
0xaf, 0x89, 0x8d, 0xfe, 0x9e, 0xd8, 0xc6, 0xcb, 0x89, 0x8d, 0x9e, 0x9d, 0xdb, 0xc6, 0xc9, 0xb9,
0x6d, 0xbc, 0x38, 0xb7, 0x8d, 0xc7, 0x6f, 0xeb, 0x9f, 0x90, 0x91, 0xd7, 0xf3, 0x02, 0xaf, 0x3d,
0x08, 0x8f, 0xfc, 0xb6, 0xfe, 0x89, 0xba, 0x6f, 0x89, 0xbf, 0x0f, 0xff, 0x0d, 0x00, 0x00, 0xff,
0xff, 0x89, 0x7a, 0x2c, 0xbd, 0xb9, 0x0a, 0x00, 0x00,
}
func (x Direction) String() string {
@ -1633,6 +1716,51 @@ func (this *TransferChunksResponse) Equal(that interface{}) bool {
}
return true
}
func (this *TailersCountRequest) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*TailersCountRequest)
if !ok {
that2, ok := that.(TailersCountRequest)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
return true
}
func (this *TailersCountResponse) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*TailersCountResponse)
if !ok {
that2, ok := that.(TailersCountResponse)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
if this.Count != that1.Count {
return false
}
return true
}
func (this *PushRequest) GoString() string {
if this == nil {
return "nil"
@ -1868,6 +1996,25 @@ func (this *TransferChunksResponse) GoString() string {
s = append(s, "}")
return strings.Join(s, "")
}
func (this *TailersCountRequest) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 4)
s = append(s, "&logproto.TailersCountRequest{")
s = append(s, "}")
return strings.Join(s, "")
}
func (this *TailersCountResponse) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 5)
s = append(s, "&logproto.TailersCountResponse{")
s = append(s, "Count: "+fmt.Sprintf("%#v", this.Count)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func valueToGoStringLogproto(v interface{}, typ string) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
@ -1965,6 +2112,7 @@ type QuerierClient interface {
Label(ctx context.Context, in *LabelRequest, opts ...grpc.CallOption) (*LabelResponse, error)
Tail(ctx context.Context, in *TailRequest, opts ...grpc.CallOption) (Querier_TailClient, error)
Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (*SeriesResponse, error)
TailersCount(ctx context.Context, in *TailersCountRequest, opts ...grpc.CallOption) (*TailersCountResponse, error)
}
type querierClient struct {
@ -2057,12 +2205,22 @@ func (c *querierClient) Series(ctx context.Context, in *SeriesRequest, opts ...g
return out, nil
}
func (c *querierClient) TailersCount(ctx context.Context, in *TailersCountRequest, opts ...grpc.CallOption) (*TailersCountResponse, error) {
out := new(TailersCountResponse)
err := c.cc.Invoke(ctx, "/logproto.Querier/TailersCount", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// QuerierServer is the server API for Querier service.
type QuerierServer interface {
Query(*QueryRequest, Querier_QueryServer) error
Label(context.Context, *LabelRequest) (*LabelResponse, error)
Tail(*TailRequest, Querier_TailServer) error
Series(context.Context, *SeriesRequest) (*SeriesResponse, error)
TailersCount(context.Context, *TailersCountRequest) (*TailersCountResponse, error)
}
// UnimplementedQuerierServer can be embedded to have forward compatible implementations.
@ -2081,6 +2239,9 @@ func (*UnimplementedQuerierServer) Tail(req *TailRequest, srv Querier_TailServer
func (*UnimplementedQuerierServer) Series(ctx context.Context, req *SeriesRequest) (*SeriesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Series not implemented")
}
func (*UnimplementedQuerierServer) TailersCount(ctx context.Context, req *TailersCountRequest) (*TailersCountResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method TailersCount not implemented")
}
func RegisterQuerierServer(s *grpc.Server, srv QuerierServer) {
s.RegisterService(&_Querier_serviceDesc, srv)
@ -2164,6 +2325,24 @@ func _Querier_Series_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler)
}
func _Querier_TailersCount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TailersCountRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(QuerierServer).TailersCount(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/logproto.Querier/TailersCount",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(QuerierServer).TailersCount(ctx, req.(*TailersCountRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Querier_serviceDesc = grpc.ServiceDesc{
ServiceName: "logproto.Querier",
HandlerType: (*QuerierServer)(nil),
@ -2176,6 +2355,10 @@ var _Querier_serviceDesc = grpc.ServiceDesc{
MethodName: "Series",
Handler: _Querier_Series_Handler,
},
{
MethodName: "TailersCount",
Handler: _Querier_TailersCount_Handler,
},
},
Streams: []grpc.StreamDesc{
{
@ -3050,6 +3233,57 @@ func (m *TransferChunksResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)
return len(dAtA) - i, nil
}
func (m *TailersCountRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *TailersCountRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *TailersCountRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
return len(dAtA) - i, nil
}
func (m *TailersCountResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *TailersCountResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *TailersCountResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Count != 0 {
i = encodeVarintLogproto(dAtA, i, uint64(m.Count))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func encodeVarintLogproto(dAtA []byte, offset int, v uint64) int {
offset -= sovLogproto(v)
base := offset
@ -3372,6 +3606,27 @@ func (m *TransferChunksResponse) Size() (n int) {
return n
}
func (m *TailersCountRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
return n
}
func (m *TailersCountResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Count != 0 {
n += 1 + sovLogproto(uint64(m.Count))
}
return n
}
func sovLogproto(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
@ -3622,6 +3877,25 @@ func (this *TransferChunksResponse) String() string {
}, "")
return s
}
func (this *TailersCountRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&TailersCountRequest{`,
`}`,
}, "")
return s
}
func (this *TailersCountResponse) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&TailersCountResponse{`,
`Count:` + fmt.Sprintf("%v", this.Count) + `,`,
`}`,
}, "")
return s
}
func valueToStringLogproto(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
@ -5835,6 +6109,131 @@ func (m *TransferChunksResponse) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *TailersCountRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: TailersCountRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: TailersCountRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *TailersCountResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: TailersCountResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: TailersCountResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Count", wireType)
}
m.Count = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Count |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthLogproto
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipLogproto(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0

@ -16,6 +16,7 @@ service Querier {
rpc Label(LabelRequest) returns (LabelResponse) {};
rpc Tail(TailRequest) returns (stream TailResponse) {};
rpc Series(SeriesRequest) returns (SeriesResponse) {};
rpc TailersCount(TailersCountRequest) returns (TailersCountResponse) {};
}
service Ingester {
@ -119,4 +120,12 @@ message Chunk {
message TransferChunksResponse {
}
}
message TailersCountRequest {
}
message TailersCountResponse {
uint32 count = 1;
}

@ -274,6 +274,11 @@ func mergePair(s1, s2 []string) []string {
// Tail keeps getting matching logs from all ingesters for given query
func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {
err := q.checkTailRequestLimit(ctx)
if err != nil {
return nil, err
}
histReq := logql.SelectParams{
QueryRequest: &logproto.QueryRequest{
Selector: req.Query,
@ -284,7 +289,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
},
}
err := q.validateQueryRequest(ctx, histReq.QueryRequest)
err = q.validateQueryRequest(ctx, histReq.QueryRequest)
if err != nil {
return nil, err
}
@ -536,3 +541,56 @@ func (q *Querier) validateQueryTimeRange(userID string, from *time.Time, through
return nil
}
func (q *Querier) checkTailRequestLimit(ctx context.Context) error {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}
replicationSet, err := q.ring.GetAll()
if err != nil {
return err
}
// we want to check count of active tailers with only active ingesters
ingesters := make([]ring.IngesterDesc, 0, 1)
for i := range replicationSet.Ingesters {
if replicationSet.Ingesters[i].State == ring.ACTIVE {
ingesters = append(ingesters, replicationSet.Ingesters[i])
}
}
if len(ingesters) == 0 {
return httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found")
}
responses, err := q.forGivenIngesters(ctx, replicationSet, func(querierClient logproto.QuerierClient) (interface{}, error) {
resp, err := querierClient.TailersCount(ctx, nil)
if err != nil {
return nil, err
}
return resp.Count, nil
})
// We are only checking active ingesters, and any error returned stops checking other ingesters
// so return that error here as well.
if err != nil {
return err
}
var maxCnt uint32
maxCnt = 0
for _, resp := range responses {
r := resp.response.(uint32)
if r > maxCnt {
maxCnt = r
}
}
if maxCnt >= uint32(q.limits.MaxConcurrentTailRequests(userID)) {
return httpgrpc.Errorf(http.StatusBadRequest,
"max concurrent tail requests limit exceeded, count > limit (%d > %d)", maxCnt+1, 1)
}
return nil
}

@ -60,6 +60,11 @@ func (c *querierClientMock) Series(ctx context.Context, in *logproto.SeriesReque
return res.(*logproto.SeriesResponse), args.Error(1)
}
func (c *querierClientMock) TailersCount(ctx context.Context, in *logproto.TailersCountRequest, opts ...grpc.CallOption) (*logproto.TailersCountResponse, error) {
args := c.Called(ctx, in, opts)
return args.Get(0).(*logproto.TailersCountResponse), args.Error(1)
}
func (c *querierClientMock) Context() context.Context {
return context.Background()
}

@ -96,6 +96,7 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil)
ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil)
ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{}, nil)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
@ -447,7 +448,7 @@ func TestQuerier_SeriesAPI(t *testing.T) {
func TestQuerier_IngesterMaxQueryLookback(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
for _, tc := range []struct {
@ -522,3 +523,86 @@ func TestQuerier_IngesterMaxQueryLookback(t *testing.T) {
})
}
}
func TestQuerier_concurrentTailLimits(t *testing.T) {
request := logproto.TailRequest{
Query: "{type=\"test\"}",
DelayFor: 0,
Limit: 10,
Start: time.Now(),
}
t.Parallel()
tests := map[string]struct {
ringIngesters []ring.IngesterDesc
expectedError error
tailersCount uint32
}{
"empty ring": {
ringIngesters: []ring.IngesterDesc{},
expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"),
},
"ring containing one pending ingester": {
ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.PENDING)},
expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"),
},
"ring containing one active ingester and 0 active tailers": {
ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)},
},
"ring containing one active ingester and 1 active tailer": {
ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)},
tailersCount: 1,
},
"ring containing one pending and active ingester with 1 active tailer": {
ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.PENDING), mockIngesterDesc("2.2.2.2", ring.ACTIVE)},
tailersCount: 1,
},
"ring containing one active ingester and max active tailers": {
ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)},
expectedError: httpgrpc.Errorf(http.StatusBadRequest,
"max concurrent tail requests limit exceeded, count > limit (%d > %d)", 6, 1),
tailersCount: 5,
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
// For this test's purpose, whenever a new ingester client needs to
// be created, the factory will always return the same mock instance
store := newStoreMock()
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)
queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)
tailClient := newTailClientMock()
tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil)
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil)
ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil)
ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{testData.tailersCount}, nil)
defaultLimits := defaultLimitsTestConfig()
defaultLimits.MaxConcurrentTailRequests = 5
limits, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
q, err := newQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
newReadRingMock(testData.ringIngesters),
store, limits)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
_, err = q.Tail(ctx, &request)
assert.Equal(t, testData.expectedError, err)
})
}
}

@ -40,6 +40,7 @@ type Limits struct {
MaxQueryParallelism int `yaml:"max_query_parallelism"`
CardinalityLimit int `yaml:"cardinality_limit"`
MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query"`
MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests"`
// Config for overrides, convenient if it goes here.
PerTenantOverrideConfig string `yaml:"per_tenant_override_config"`
@ -67,6 +68,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.")
f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query")
f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests")
f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.")
f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with this to reload the overrides.")
@ -212,6 +214,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int {
return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery
}
// MaxConcurrentTailRequests returns the limit to number of concurrent tail requests.
func (o *Overrides) MaxConcurrentTailRequests(userID string) int {
return o.getOverridesForUser(userID).MaxConcurrentTailRequests
}
func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits(userID)

Loading…
Cancel
Save