From c7a3ec54f19cdce11fa516cc6f709ba3be551706 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 24 Jan 2020 21:32:15 +0530 Subject: [PATCH] limit for concurrent tail requests (#1562) * limit for concurrent tail requests When a new request comes in, querier would ask one of the active ingesters for count of active tail requests for the user If the count exceeds the configured limits for the user then tail request would be rejected right away. * goimports querier_test.go Signed-off-by: Cyril Tovena * query all ingesters for their max tail count Signed-off-by: Edward Welch * changed to check only ACTIVE ingesters and tweaked error handling some Signed-off-by: Edward Welch Co-authored-by: Cyril Tovena Co-authored-by: Ed Welch --- pkg/ingester/ingester.go | 17 + pkg/ingester/instance.go | 28 +- pkg/logproto/logproto.pb.go | 535 +++++++++++++++++++++++++++---- pkg/logproto/logproto.proto | 11 +- pkg/querier/querier.go | 60 +++- pkg/querier/querier_mock_test.go | 5 + pkg/querier/querier_test.go | 86 ++++- pkg/util/validation/limits.go | 7 + 8 files changed, 675 insertions(+), 74 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 30c7574acd..c1bdd893b1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -322,3 +322,20 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ tailer.loop() return nil } + +// TailersCount returns count of active tail requests from a user +func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) { + instanceID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + resp := logproto.TailersCountResponse{} + + instance, ok := i.getInstanceByID(instanceID) + if ok { + resp.Count = instance.openTailersCount() + } + + return &resp, nil +} diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 477cbf1aa7..ce896ba378 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -318,12 +318,13 @@ func (i *instance) addNewTailer(t *tailer) { } func (i *instance) addTailersToNewStream(stream *stream) { - closedTailers := []uint32{} - i.tailerMtx.RLock() + defer i.tailerMtx.RUnlock() + for _, t := range i.tailers { + // we don't want to watch streams for closed tailers. + // When a new tail request comes in we will clean references to closed tailers if t.isClosed() { - closedTailers = append(closedTailers, t.getID()) continue } @@ -331,6 +332,18 @@ func (i *instance) addTailersToNewStream(stream *stream) { stream.addTailer(t) } } +} + +func (i *instance) checkClosedTailers() { + closedTailers := []uint32{} + + i.tailerMtx.RLock() + for _, t := range i.tailers { + if t.isClosed() { + closedTailers = append(closedTailers, t.getID()) + continue + } + } i.tailerMtx.RUnlock() if len(closedTailers) != 0 { @@ -350,6 +363,15 @@ func (i *instance) closeTailers() { } } +func (i *instance) openTailersCount() uint32 { + i.checkClosedTailers() + + i.tailerMtx.RLock() + defer i.tailerMtx.RUnlock() + + return uint32(len(i.tailers)) +} + func isDone(ctx context.Context) bool { select { case <-ctx.Done(): diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go index af0ab34c3e..08b4ab15d7 100644 --- a/pkg/logproto/logproto.pb.go +++ b/pkg/logproto/logproto.pb.go @@ -983,6 +983,84 @@ func (m *TransferChunksResponse) XXX_DiscardUnknown() { var xxx_messageInfo_TransferChunksResponse proto.InternalMessageInfo +type TailersCountRequest struct { +} + +func (m *TailersCountRequest) Reset() { *m = TailersCountRequest{} } +func (*TailersCountRequest) ProtoMessage() {} +func (*TailersCountRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_c28a5f14f1f4c79a, []int{18} +} +func (m *TailersCountRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TailersCountRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TailersCountRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TailersCountRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TailersCountRequest.Merge(m, src) +} +func (m *TailersCountRequest) XXX_Size() int { + return m.Size() +} +func (m *TailersCountRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TailersCountRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TailersCountRequest proto.InternalMessageInfo + +type TailersCountResponse struct { + Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` +} + +func (m *TailersCountResponse) Reset() { *m = TailersCountResponse{} } +func (*TailersCountResponse) ProtoMessage() {} +func (*TailersCountResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_c28a5f14f1f4c79a, []int{19} +} +func (m *TailersCountResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TailersCountResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TailersCountResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TailersCountResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TailersCountResponse.Merge(m, src) +} +func (m *TailersCountResponse) XXX_Size() int { + return m.Size() +} +func (m *TailersCountResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TailersCountResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TailersCountResponse proto.InternalMessageInfo + +func (m *TailersCountResponse) GetCount() uint32 { + if m != nil { + return m.Count + } + return 0 +} + func init() { proto.RegisterEnum("logproto.Direction", Direction_name, Direction_value) proto.RegisterType((*PushRequest)(nil), "logproto.PushRequest") @@ -1004,79 +1082,84 @@ func init() { proto.RegisterType((*LabelPair)(nil), "logproto.LabelPair") proto.RegisterType((*Chunk)(nil), "logproto.Chunk") proto.RegisterType((*TransferChunksResponse)(nil), "logproto.TransferChunksResponse") + proto.RegisterType((*TailersCountRequest)(nil), "logproto.TailersCountRequest") + proto.RegisterType((*TailersCountResponse)(nil), "logproto.TailersCountResponse") } func init() { proto.RegisterFile("pkg/logproto/logproto.proto", fileDescriptor_c28a5f14f1f4c79a) } var fileDescriptor_c28a5f14f1f4c79a = []byte{ - // 1065 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xdd, 0x6e, 0x1b, 0x45, - 0x14, 0xde, 0xb1, 0xd7, 0x6b, 0xfb, 0xf8, 0x27, 0xd6, 0x50, 0x92, 0xc5, 0x45, 0x6b, 0x6b, 0x85, - 0x5a, 0xab, 0x08, 0x1b, 0xcc, 0x4f, 0xd3, 0xf0, 0xa7, 0xb8, 0x25, 0x22, 0x01, 0x89, 0x76, 0x13, - 0x09, 0xa9, 0x12, 0xaa, 0x36, 0xd9, 0xb1, 0xb3, 0xca, 0x7a, 0xd7, 0xdd, 0x1d, 0x23, 0xe5, 0x8e, - 0x17, 0x40, 0xea, 0x1d, 0x17, 0xbc, 0x00, 0xe2, 0x82, 0xe7, 0xe8, 0x65, 0x2e, 0x7b, 0x15, 0x88, - 0x73, 0x83, 0x22, 0x21, 0xf5, 0x05, 0x90, 0xd0, 0xfc, 0xec, 0x7a, 0xe2, 0x04, 0x90, 0x7b, 0x63, - 0xcf, 0x99, 0x39, 0x67, 0xe6, 0x7c, 0xdf, 0xf9, 0xce, 0xcc, 0xc2, 0xcd, 0xc9, 0xd1, 0xa8, 0x17, - 0x44, 0xa3, 0x49, 0x1c, 0xd1, 0x28, 0x1b, 0x74, 0xf9, 0x2f, 0x2e, 0xa5, 0x76, 0xb3, 0x35, 0x8a, - 0xa2, 0x51, 0x40, 0x7a, 0xdc, 0xda, 0x9f, 0x0e, 0x7b, 0xd4, 0x1f, 0x93, 0x84, 0xba, 0xe3, 0x89, - 0x70, 0x6d, 0xbe, 0x33, 0xf2, 0xe9, 0xe1, 0x74, 0xbf, 0x7b, 0x10, 0x8d, 0x7b, 0xa3, 0x68, 0x14, - 0xcd, 0x3d, 0x99, 0x25, 0x76, 0x67, 0x23, 0xe1, 0x6e, 0x6f, 0x41, 0xe5, 0xe1, 0x34, 0x39, 0x74, - 0xc8, 0xd3, 0x29, 0x49, 0x28, 0xbe, 0x0b, 0xc5, 0x84, 0xc6, 0xc4, 0x1d, 0x27, 0x26, 0x6a, 0xe7, - 0x3b, 0x95, 0x7e, 0xa3, 0x9b, 0xa5, 0xb2, 0xcb, 0x17, 0x06, 0x95, 0x8b, 0xd3, 0x56, 0xea, 0xe4, - 0xa4, 0x03, 0xbb, 0x0e, 0x55, 0xb1, 0x4f, 0x32, 0x89, 0xc2, 0x84, 0xd8, 0x7f, 0x21, 0xa8, 0x3e, - 0x9a, 0x92, 0xf8, 0x38, 0xdd, 0xb9, 0x09, 0xa5, 0x84, 0x04, 0xe4, 0x80, 0x46, 0xb1, 0x89, 0xda, - 0xa8, 0x53, 0x76, 0x32, 0x1b, 0xdf, 0x80, 0x42, 0xe0, 0x8f, 0x7d, 0x6a, 0xe6, 0xda, 0xa8, 0x53, - 0x73, 0x84, 0x81, 0x37, 0xa0, 0x90, 0x50, 0x37, 0xa6, 0x66, 0xbe, 0x8d, 0x3a, 0x95, 0x7e, 0xb3, - 0x2b, 0xa0, 0x77, 0x53, 0x40, 0xdd, 0xbd, 0x14, 0xfa, 0xa0, 0xf4, 0xfc, 0xb4, 0xa5, 0x3d, 0xfb, - 0xbd, 0x85, 0x1c, 0x11, 0x82, 0x3f, 0x82, 0x3c, 0x09, 0x3d, 0x53, 0x5f, 0x22, 0x92, 0x05, 0xe0, - 0xf7, 0xa0, 0xec, 0xf9, 0x31, 0x39, 0xa0, 0x7e, 0x14, 0x9a, 0x85, 0x36, 0xea, 0xd4, 0xfb, 0xaf, - 0xcd, 0x19, 0x78, 0x90, 0x2e, 0x39, 0x73, 0xaf, 0x1d, 0xbd, 0x64, 0x34, 0x8a, 0xf6, 0xc7, 0x50, - 0x93, 0x70, 0x05, 0x01, 0xf8, 0xce, 0xff, 0x32, 0x39, 0x27, 0xef, 0x37, 0x04, 0xd5, 0xaf, 0xdd, - 0x7d, 0x12, 0xa4, 0x64, 0x61, 0xd0, 0x43, 0x77, 0x4c, 0x24, 0x51, 0x7c, 0x8c, 0x57, 0xc1, 0xf8, - 0xde, 0x0d, 0xa6, 0x24, 0xe1, 0x2c, 0x95, 0x1c, 0x69, 0x2d, 0x4b, 0x13, 0x7a, 0x65, 0x9a, 0x50, - 0x46, 0x93, 0x7d, 0x1b, 0x6a, 0x32, 0x5f, 0x89, 0x76, 0x9e, 0x1c, 0x03, 0x5b, 0x4e, 0x93, 0xb3, - 0x0f, 0xc1, 0x10, 0x60, 0xb1, 0x0d, 0x46, 0xc0, 0x42, 0x12, 0x01, 0x6a, 0x00, 0x17, 0xa7, 0x2d, - 0x39, 0xe3, 0xc8, 0x7f, 0xbc, 0x01, 0x45, 0x12, 0xd2, 0xd8, 0xe7, 0x18, 0x19, 0x67, 0x2b, 0x73, - 0xce, 0xbe, 0x08, 0x69, 0x7c, 0x3c, 0x58, 0x61, 0xe5, 0x62, 0x02, 0x94, 0x7e, 0x4e, 0x3a, 0xb0, - 0x23, 0x28, 0x70, 0x17, 0xfc, 0x25, 0x94, 0xb3, 0x9e, 0xe0, 0x67, 0xfd, 0x37, 0xb2, 0xba, 0xdc, - 0x31, 0x47, 0x13, 0x8e, 0x6f, 0x1e, 0x8c, 0xdf, 0x04, 0x3d, 0xf0, 0x43, 0xc2, 0xf9, 0x2e, 0x0f, - 0x4a, 0x17, 0xa7, 0x2d, 0x6e, 0x3b, 0xfc, 0xd7, 0xfe, 0x09, 0x41, 0x65, 0xcf, 0xf5, 0xb3, 0x9a, - 0xdd, 0x80, 0xc2, 0x53, 0xa6, 0x00, 0x59, 0x34, 0x61, 0x30, 0xd9, 0x7b, 0x24, 0x70, 0x8f, 0xb7, - 0xa2, 0x98, 0x17, 0xa8, 0xe6, 0x64, 0xf6, 0x5c, 0xf6, 0xfa, 0xb5, 0xb2, 0x2f, 0x2c, 0x2d, 0xfb, - 0x1d, 0xbd, 0x94, 0x6b, 0xe4, 0xed, 0x63, 0xa8, 0x8a, 0xc4, 0x64, 0x71, 0x3a, 0x60, 0x08, 0xa5, - 0x49, 0x3a, 0xae, 0x2a, 0x51, 0xae, 0xe3, 0xcf, 0xa1, 0xee, 0xc5, 0xd1, 0x64, 0x42, 0xbc, 0x5d, - 0xa9, 0x5d, 0x51, 0x87, 0x35, 0xa5, 0x07, 0xd4, 0x75, 0x67, 0xc1, 0xdd, 0xfe, 0x19, 0x41, 0x6d, - 0x97, 0xf0, 0xca, 0x48, 0x5a, 0x32, 0x38, 0xe8, 0x95, 0xbb, 0x38, 0xb7, 0x6c, 0x17, 0xaf, 0x82, - 0x31, 0x8a, 0xa3, 0xe9, 0x24, 0x31, 0xf3, 0x42, 0x8d, 0xc2, 0xb2, 0x77, 0xa0, 0x9e, 0x26, 0x27, - 0xa9, 0x59, 0x07, 0x23, 0xe1, 0x33, 0xb2, 0x49, 0x9b, 0x0a, 0x35, 0x7c, 0x7e, 0xdb, 0x23, 0x21, - 0xf5, 0x87, 0x3e, 0x89, 0x07, 0x3a, 0x3b, 0xc4, 0x91, 0xfe, 0xf6, 0x8f, 0x08, 0x1a, 0x8b, 0x2e, - 0xf8, 0x33, 0x45, 0xe4, 0x6c, 0xbb, 0x5b, 0xff, 0xbe, 0x5d, 0x97, 0x37, 0x50, 0xc2, 0x35, 0x9b, - 0x36, 0x40, 0xf3, 0x1e, 0x54, 0x94, 0x69, 0xdc, 0x80, 0xfc, 0x11, 0x49, 0x05, 0xc5, 0x86, 0x4c, - 0x32, 0xbc, 0xb3, 0x84, 0x26, 0x1d, 0x61, 0x6c, 0xe4, 0xd6, 0x11, 0x93, 0x63, 0xed, 0x52, 0x6d, - 0xf0, 0x3a, 0xe8, 0xc3, 0x38, 0x1a, 0x2f, 0x45, 0x3c, 0x8f, 0xc0, 0x1f, 0x40, 0x8e, 0x46, 0x4b, - 0xd1, 0x9e, 0xa3, 0x11, 0x63, 0x5d, 0x82, 0xcf, 0xf3, 0xe4, 0xa4, 0x65, 0xff, 0x8a, 0x60, 0x85, - 0xc5, 0x08, 0x06, 0xee, 0x1f, 0x4e, 0xc3, 0x23, 0xdc, 0x81, 0x06, 0x3b, 0xe9, 0x89, 0x1f, 0x8e, - 0x48, 0x42, 0x49, 0xfc, 0xc4, 0xf7, 0x24, 0xcc, 0x3a, 0x9b, 0xdf, 0x96, 0xd3, 0xdb, 0x1e, 0x5e, - 0x83, 0xe2, 0x34, 0x11, 0x0e, 0x02, 0xb3, 0xc1, 0xcc, 0x6d, 0x0f, 0xbf, 0xad, 0x1c, 0xc7, 0xb8, - 0x56, 0xee, 0x69, 0xce, 0xe1, 0x43, 0xd7, 0x8f, 0xb3, 0x9b, 0xe5, 0x36, 0x18, 0x07, 0xec, 0xe0, - 0xc4, 0xd4, 0x17, 0x2f, 0x16, 0x9e, 0x90, 0x23, 0x97, 0xed, 0x0f, 0xa1, 0x9c, 0x45, 0x5f, 0x7b, - 0x0d, 0x5f, 0x5b, 0x01, 0xfb, 0x26, 0x14, 0x04, 0x30, 0x0c, 0xba, 0xe7, 0x52, 0x97, 0x87, 0x54, - 0x1d, 0x3e, 0xb6, 0x4d, 0x58, 0xdd, 0x8b, 0xdd, 0x30, 0x19, 0x92, 0x98, 0x3b, 0x65, 0xf2, 0xbb, - 0x73, 0x0b, 0xca, 0xd9, 0x9b, 0x82, 0x2b, 0x50, 0xdc, 0xfa, 0xc6, 0xf9, 0x76, 0xd3, 0x79, 0xd0, - 0xd0, 0x70, 0x15, 0x4a, 0x83, 0xcd, 0xfb, 0x5f, 0x71, 0x0b, 0xf5, 0x37, 0xc1, 0x60, 0xaf, 0x2b, - 0x89, 0xf1, 0x5d, 0xd0, 0xd9, 0x08, 0xbf, 0x3e, 0x07, 0xa0, 0xbc, 0xdf, 0xcd, 0xd5, 0xc5, 0x69, - 0xf9, 0x1c, 0x6b, 0xfd, 0xbf, 0x11, 0x14, 0xd9, 0x0b, 0xc5, 0x64, 0xfa, 0x09, 0x14, 0xf8, 0x63, - 0x85, 0x15, 0x77, 0xf5, 0xb1, 0x6e, 0xae, 0x5d, 0x99, 0x4f, 0xf7, 0x79, 0x17, 0xb1, 0x8e, 0xe6, - 0x14, 0xa9, 0xd1, 0xea, 0xeb, 0xa5, 0x46, 0x5f, 0x7a, 0x25, 0x6c, 0x0d, 0xdf, 0x03, 0x9d, 0x5d, - 0x4d, 0x6a, 0xfa, 0xca, 0x1d, 0xaa, 0xa6, 0xaf, 0xde, 0x60, 0xfc, 0xd8, 0x4f, 0xc1, 0x10, 0x0a, - 0xc2, 0x6b, 0x8b, 0x5d, 0x95, 0x86, 0x9b, 0x57, 0x17, 0x32, 0xfc, 0xdf, 0x41, 0x29, 0x55, 0x15, - 0x7e, 0x04, 0xf5, 0xcb, 0x05, 0xc1, 0x6f, 0x28, 0x07, 0x5f, 0x96, 0x6a, 0xb3, 0xad, 0x2c, 0x5d, - 0x5b, 0x45, 0x5b, 0xeb, 0xa0, 0xc1, 0xe3, 0x93, 0x33, 0x4b, 0x7b, 0x71, 0x66, 0x69, 0x2f, 0xcf, - 0x2c, 0xf4, 0xc3, 0xcc, 0x42, 0xbf, 0xcc, 0x2c, 0xf4, 0x7c, 0x66, 0xa1, 0x93, 0x99, 0x85, 0xfe, - 0x98, 0x59, 0xe8, 0xcf, 0x99, 0xa5, 0xbd, 0x9c, 0x59, 0xe8, 0xd9, 0xb9, 0xa5, 0x9d, 0x9c, 0x5b, - 0xda, 0x8b, 0x73, 0x4b, 0x7b, 0xfc, 0x96, 0xfa, 0xb1, 0x16, 0xbb, 0x43, 0x37, 0x74, 0x7b, 0x41, - 0x74, 0xe4, 0xf7, 0xd4, 0x8f, 0xc1, 0x7d, 0x83, 0xff, 0xbd, 0xff, 0x4f, 0x00, 0x00, 0x00, 0xff, - 0xff, 0xb9, 0x5d, 0x4d, 0x8c, 0x23, 0x0a, 0x00, 0x00, + // 1113 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x5f, 0x6f, 0x1b, 0x45, + 0x10, 0xbf, 0xb5, 0xcf, 0x67, 0x7b, 0xfc, 0xa7, 0xd6, 0x36, 0x4d, 0x8c, 0x0b, 0x67, 0xeb, 0x84, + 0x5a, 0xab, 0x80, 0x0d, 0xe1, 0x4f, 0xd3, 0xf0, 0x4f, 0x71, 0x4a, 0x44, 0x02, 0x52, 0xdb, 0x4b, + 0x24, 0xa4, 0x4a, 0xa8, 0xba, 0xc4, 0x6b, 0xe7, 0x14, 0xfb, 0xce, 0xbd, 0x5d, 0x23, 0xe5, 0x8d, + 0x2f, 0x80, 0xd4, 0x37, 0x1e, 0xf8, 0x02, 0x88, 0x07, 0x3e, 0x47, 0x9f, 0x50, 0x1e, 0xfb, 0x14, + 0x88, 0xf3, 0x82, 0x22, 0x21, 0xf5, 0x23, 0xa0, 0xfd, 0x73, 0xe7, 0xb5, 0x93, 0x82, 0xdc, 0x17, + 0x7b, 0x67, 0x76, 0x66, 0x77, 0xe6, 0x37, 0xbf, 0x99, 0x3d, 0xb8, 0x39, 0x3a, 0xea, 0xb7, 0x07, + 0x61, 0x7f, 0x14, 0x85, 0x2c, 0x4c, 0x16, 0x2d, 0xf1, 0x8b, 0x73, 0xb1, 0x5c, 0xab, 0xf7, 0xc3, + 0xb0, 0x3f, 0x20, 0x6d, 0x21, 0xed, 0x8f, 0x7b, 0x6d, 0xe6, 0x0f, 0x09, 0x65, 0xde, 0x70, 0x24, + 0x4d, 0x6b, 0xef, 0xf5, 0x7d, 0x76, 0x38, 0xde, 0x6f, 0x1d, 0x84, 0xc3, 0x76, 0x3f, 0xec, 0x87, + 0x53, 0x4b, 0x2e, 0xc9, 0xd3, 0xf9, 0x4a, 0x9a, 0x3b, 0x5b, 0x50, 0x78, 0x38, 0xa6, 0x87, 0x2e, + 0x79, 0x3a, 0x26, 0x94, 0xe1, 0xbb, 0x90, 0xa5, 0x2c, 0x22, 0xde, 0x90, 0x56, 0x51, 0x23, 0xdd, + 0x2c, 0xac, 0x56, 0x5a, 0x49, 0x28, 0xbb, 0x62, 0xa3, 0x53, 0xb8, 0x38, 0xad, 0xc7, 0x46, 0x6e, + 0xbc, 0x70, 0xca, 0x50, 0x94, 0xe7, 0xd0, 0x51, 0x18, 0x50, 0xe2, 0xfc, 0x83, 0xa0, 0xf8, 0x68, + 0x4c, 0xa2, 0xe3, 0xf8, 0xe4, 0x1a, 0xe4, 0x28, 0x19, 0x90, 0x03, 0x16, 0x46, 0x55, 0xd4, 0x40, + 0xcd, 0xbc, 0x9b, 0xc8, 0x78, 0x09, 0x32, 0x03, 0x7f, 0xe8, 0xb3, 0x6a, 0xaa, 0x81, 0x9a, 0x25, + 0x57, 0x0a, 0x78, 0x1d, 0x32, 0x94, 0x79, 0x11, 0xab, 0xa6, 0x1b, 0xa8, 0x59, 0x58, 0xad, 0xb5, + 0x64, 0xea, 0xad, 0x38, 0xa1, 0xd6, 0x5e, 0x9c, 0x7a, 0x27, 0xf7, 0xfc, 0xb4, 0x6e, 0x3c, 0xfb, + 0xb3, 0x8e, 0x5c, 0xe9, 0x82, 0x3f, 0x81, 0x34, 0x09, 0xba, 0x55, 0x73, 0x01, 0x4f, 0xee, 0x80, + 0x3f, 0x80, 0x7c, 0xd7, 0x8f, 0xc8, 0x01, 0xf3, 0xc3, 0xa0, 0x9a, 0x69, 0xa0, 0x66, 0x79, 0xf5, + 0xfa, 0x14, 0x81, 0xfb, 0xf1, 0x96, 0x3b, 0xb5, 0xda, 0x31, 0x73, 0x56, 0x25, 0xeb, 0x7c, 0x0a, + 0x25, 0x95, 0xae, 0x04, 0x00, 0xdf, 0xf9, 0x5f, 0x24, 0xa7, 0xe0, 0xfd, 0x8e, 0xa0, 0xf8, 0xad, + 0xb7, 0x4f, 0x06, 0x31, 0x58, 0x18, 0xcc, 0xc0, 0x1b, 0x12, 0x05, 0x94, 0x58, 0xe3, 0x65, 0xb0, + 0x7e, 0xf0, 0x06, 0x63, 0x42, 0x05, 0x4a, 0x39, 0x57, 0x49, 0x8b, 0xc2, 0x84, 0x5e, 0x1b, 0x26, + 0x94, 0xc0, 0xe4, 0xdc, 0x86, 0x92, 0x8a, 0x57, 0x65, 0x3b, 0x0d, 0x8e, 0x27, 0x9b, 0x8f, 0x83, + 0x73, 0x0e, 0xc1, 0x92, 0xc9, 0x62, 0x07, 0xac, 0x01, 0x77, 0xa1, 0x32, 0xa9, 0x0e, 0x5c, 0x9c, + 0xd6, 0x95, 0xc6, 0x55, 0xff, 0x78, 0x1d, 0xb2, 0x24, 0x60, 0x91, 0x2f, 0x72, 0xe4, 0x98, 0x5d, + 0x9b, 0x62, 0xf6, 0x55, 0xc0, 0xa2, 0xe3, 0xce, 0x35, 0x5e, 0x2e, 0x4e, 0x40, 0x65, 0xe7, 0xc6, + 0x0b, 0x27, 0x84, 0x8c, 0x30, 0xc1, 0x5f, 0x43, 0x3e, 0xe9, 0x09, 0x71, 0xd7, 0x7f, 0x67, 0x56, + 0x56, 0x27, 0xa6, 0x18, 0x15, 0xf9, 0x4d, 0x9d, 0xf1, 0x9b, 0x60, 0x0e, 0xfc, 0x80, 0x08, 0xbc, + 0xf3, 0x9d, 0xdc, 0xc5, 0x69, 0x5d, 0xc8, 0xae, 0xf8, 0x75, 0x7e, 0x46, 0x50, 0xd8, 0xf3, 0xfc, + 0xa4, 0x66, 0x4b, 0x90, 0x79, 0xca, 0x19, 0xa0, 0x8a, 0x26, 0x05, 0x4e, 0xfb, 0x2e, 0x19, 0x78, + 0xc7, 0x5b, 0x61, 0x24, 0x0a, 0x54, 0x72, 0x13, 0x79, 0x4a, 0x7b, 0xf3, 0x4a, 0xda, 0x67, 0x16, + 0xa6, 0xfd, 0x8e, 0x99, 0x4b, 0x55, 0xd2, 0xce, 0x31, 0x14, 0x65, 0x60, 0xaa, 0x38, 0x4d, 0xb0, + 0x24, 0xd3, 0x14, 0x1c, 0x97, 0x99, 0xa8, 0xf6, 0xf1, 0x97, 0x50, 0xee, 0x46, 0xe1, 0x68, 0x44, + 0xba, 0xbb, 0x8a, 0xbb, 0xb2, 0x0e, 0x2b, 0x5a, 0x0f, 0xe8, 0xfb, 0xee, 0x9c, 0xb9, 0xf3, 0x0b, + 0x82, 0xd2, 0x2e, 0x11, 0x95, 0x51, 0xb0, 0x24, 0xe9, 0xa0, 0xd7, 0xee, 0xe2, 0xd4, 0xa2, 0x5d, + 0xbc, 0x0c, 0x56, 0x3f, 0x0a, 0xc7, 0x23, 0x5a, 0x4d, 0x4b, 0x36, 0x4a, 0xc9, 0xd9, 0x81, 0x72, + 0x1c, 0x9c, 0x82, 0x66, 0x0d, 0x2c, 0x2a, 0x34, 0xaa, 0x49, 0x6b, 0x1a, 0x34, 0x42, 0xbf, 0xdd, + 0x25, 0x01, 0xf3, 0x7b, 0x3e, 0x89, 0x3a, 0x26, 0xbf, 0xc4, 0x55, 0xf6, 0xce, 0x4f, 0x08, 0x2a, + 0xf3, 0x26, 0xf8, 0x0b, 0x8d, 0xe4, 0xfc, 0xb8, 0x5b, 0xaf, 0x3e, 0xae, 0x25, 0x1a, 0x88, 0x0a, + 0xce, 0xc6, 0x0d, 0x50, 0xbb, 0x07, 0x05, 0x4d, 0x8d, 0x2b, 0x90, 0x3e, 0x22, 0x31, 0xa1, 0xf8, + 0x92, 0x53, 0x46, 0x74, 0x96, 0xe4, 0xa4, 0x2b, 0x85, 0xf5, 0xd4, 0x1a, 0xe2, 0x74, 0x2c, 0xcd, + 0xd4, 0x06, 0xaf, 0x81, 0xd9, 0x8b, 0xc2, 0xe1, 0x42, 0xc0, 0x0b, 0x0f, 0xfc, 0x11, 0xa4, 0x58, + 0xb8, 0x10, 0xec, 0x29, 0x16, 0x72, 0xd4, 0x55, 0xf2, 0x69, 0x11, 0x9c, 0x92, 0x9c, 0xdf, 0x10, + 0x5c, 0xe3, 0x3e, 0x12, 0x81, 0xcd, 0xc3, 0x71, 0x70, 0x84, 0x9b, 0x50, 0xe1, 0x37, 0x3d, 0xf1, + 0x83, 0x3e, 0xa1, 0x8c, 0x44, 0x4f, 0xfc, 0xae, 0x4a, 0xb3, 0xcc, 0xf5, 0xdb, 0x4a, 0xbd, 0xdd, + 0xc5, 0x2b, 0x90, 0x1d, 0x53, 0x69, 0x20, 0x73, 0xb6, 0xb8, 0xb8, 0xdd, 0xc5, 0xef, 0x68, 0xd7, + 0x71, 0xac, 0xb5, 0x39, 0x2d, 0x30, 0x7c, 0xe8, 0xf9, 0x51, 0x32, 0x59, 0x6e, 0x83, 0x75, 0xc0, + 0x2f, 0xa6, 0x55, 0x73, 0x7e, 0xb0, 0x88, 0x80, 0x5c, 0xb5, 0xed, 0x7c, 0x0c, 0xf9, 0xc4, 0xfb, + 0xca, 0x31, 0x7c, 0x65, 0x05, 0x9c, 0x9b, 0x90, 0x91, 0x89, 0x61, 0x30, 0xbb, 0x1e, 0xf3, 0x84, + 0x4b, 0xd1, 0x15, 0x6b, 0xa7, 0x0a, 0xcb, 0x7b, 0x91, 0x17, 0xd0, 0x1e, 0x89, 0x84, 0x51, 0x42, + 0x3f, 0xe7, 0x06, 0x5c, 0xe7, 0x9d, 0x4a, 0x22, 0xba, 0x19, 0x8e, 0x03, 0xa6, 0x7a, 0xc6, 0x79, + 0x17, 0x96, 0x66, 0xd5, 0x8a, 0xad, 0x4b, 0x90, 0x39, 0xe0, 0x0a, 0x71, 0x7a, 0xc9, 0x95, 0xc2, + 0x9d, 0x5b, 0x90, 0x4f, 0x1e, 0x26, 0x5c, 0x80, 0xec, 0xd6, 0x03, 0xf7, 0xbb, 0x0d, 0xf7, 0x7e, + 0xc5, 0xc0, 0x45, 0xc8, 0x75, 0x36, 0x36, 0xbf, 0x11, 0x12, 0x5a, 0xdd, 0x00, 0x8b, 0x3f, 0xd1, + 0x24, 0xc2, 0x77, 0xc1, 0xe4, 0x2b, 0x7c, 0x63, 0x8a, 0x82, 0xf6, 0x11, 0x50, 0x5b, 0x9e, 0x57, + 0xab, 0x68, 0x8d, 0xd5, 0x3f, 0x52, 0x90, 0xe5, 0xcf, 0x1c, 0xe7, 0xfa, 0x67, 0x90, 0x11, 0x2f, + 0x1e, 0xd6, 0xcc, 0xf5, 0x17, 0xbf, 0xb6, 0x72, 0x49, 0x1f, 0x9f, 0xf3, 0x3e, 0xe2, 0x63, 0x41, + 0xe0, 0xac, 0x7b, 0xeb, 0x4f, 0xa0, 0xee, 0x3d, 0xf3, 0xd4, 0x38, 0x06, 0xbe, 0x07, 0x26, 0x87, + 0x47, 0x0f, 0x5f, 0x1b, 0xc4, 0x7a, 0xf8, 0xfa, 0x18, 0x14, 0xd7, 0x7e, 0x0e, 0x96, 0xa4, 0x21, + 0x5e, 0x99, 0x6f, 0xcd, 0xd8, 0xbd, 0x7a, 0x79, 0x23, 0xb9, 0xf9, 0x81, 0x9c, 0xac, 0x71, 0x61, + 0xf0, 0x5b, 0xb3, 0x57, 0xcd, 0xd5, 0xb1, 0x66, 0xbf, 0x6a, 0x3b, 0x01, 0xf4, 0x7b, 0xc8, 0xc5, + 0x5c, 0xc7, 0x8f, 0xa0, 0x3c, 0x4b, 0x13, 0xfc, 0x86, 0xe6, 0x3f, 0xdb, 0x40, 0xb5, 0x86, 0xb6, + 0x75, 0x35, 0xb7, 0x8c, 0x26, 0xea, 0x3c, 0x3e, 0x39, 0xb3, 0x8d, 0x17, 0x67, 0xb6, 0xf1, 0xf2, + 0xcc, 0x46, 0x3f, 0x4e, 0x6c, 0xf4, 0xeb, 0xc4, 0x46, 0xcf, 0x27, 0x36, 0x3a, 0x99, 0xd8, 0xe8, + 0xaf, 0x89, 0x8d, 0xfe, 0x9e, 0xd8, 0xc6, 0xcb, 0x89, 0x8d, 0x9e, 0x9d, 0xdb, 0xc6, 0xc9, 0xb9, + 0x6d, 0xbc, 0x38, 0xb7, 0x8d, 0xc7, 0x6f, 0xeb, 0x9f, 0x90, 0x91, 0xd7, 0xf3, 0x02, 0xaf, 0x3d, + 0x08, 0x8f, 0xfc, 0xb6, 0xfe, 0x89, 0xba, 0x6f, 0x89, 0xbf, 0x0f, 0xff, 0x0d, 0x00, 0x00, 0xff, + 0xff, 0x89, 0x7a, 0x2c, 0xbd, 0xb9, 0x0a, 0x00, 0x00, } func (x Direction) String() string { @@ -1633,6 +1716,51 @@ func (this *TransferChunksResponse) Equal(that interface{}) bool { } return true } +func (this *TailersCountRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TailersCountRequest) + if !ok { + that2, ok := that.(TailersCountRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *TailersCountResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TailersCountResponse) + if !ok { + that2, ok := that.(TailersCountResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Count != that1.Count { + return false + } + return true +} func (this *PushRequest) GoString() string { if this == nil { return "nil" @@ -1868,6 +1996,25 @@ func (this *TransferChunksResponse) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *TailersCountRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&logproto.TailersCountRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *TailersCountResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logproto.TailersCountResponse{") + s = append(s, "Count: "+fmt.Sprintf("%#v", this.Count)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringLogproto(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -1965,6 +2112,7 @@ type QuerierClient interface { Label(ctx context.Context, in *LabelRequest, opts ...grpc.CallOption) (*LabelResponse, error) Tail(ctx context.Context, in *TailRequest, opts ...grpc.CallOption) (Querier_TailClient, error) Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (*SeriesResponse, error) + TailersCount(ctx context.Context, in *TailersCountRequest, opts ...grpc.CallOption) (*TailersCountResponse, error) } type querierClient struct { @@ -2057,12 +2205,22 @@ func (c *querierClient) Series(ctx context.Context, in *SeriesRequest, opts ...g return out, nil } +func (c *querierClient) TailersCount(ctx context.Context, in *TailersCountRequest, opts ...grpc.CallOption) (*TailersCountResponse, error) { + out := new(TailersCountResponse) + err := c.cc.Invoke(ctx, "/logproto.Querier/TailersCount", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // QuerierServer is the server API for Querier service. type QuerierServer interface { Query(*QueryRequest, Querier_QueryServer) error Label(context.Context, *LabelRequest) (*LabelResponse, error) Tail(*TailRequest, Querier_TailServer) error Series(context.Context, *SeriesRequest) (*SeriesResponse, error) + TailersCount(context.Context, *TailersCountRequest) (*TailersCountResponse, error) } // UnimplementedQuerierServer can be embedded to have forward compatible implementations. @@ -2081,6 +2239,9 @@ func (*UnimplementedQuerierServer) Tail(req *TailRequest, srv Querier_TailServer func (*UnimplementedQuerierServer) Series(ctx context.Context, req *SeriesRequest) (*SeriesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Series not implemented") } +func (*UnimplementedQuerierServer) TailersCount(ctx context.Context, req *TailersCountRequest) (*TailersCountResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method TailersCount not implemented") +} func RegisterQuerierServer(s *grpc.Server, srv QuerierServer) { s.RegisterService(&_Querier_serviceDesc, srv) @@ -2164,6 +2325,24 @@ func _Querier_Series_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Querier_TailersCount_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TailersCountRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(QuerierServer).TailersCount(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/logproto.Querier/TailersCount", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(QuerierServer).TailersCount(ctx, req.(*TailersCountRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Querier_serviceDesc = grpc.ServiceDesc{ ServiceName: "logproto.Querier", HandlerType: (*QuerierServer)(nil), @@ -2176,6 +2355,10 @@ var _Querier_serviceDesc = grpc.ServiceDesc{ MethodName: "Series", Handler: _Querier_Series_Handler, }, + { + MethodName: "TailersCount", + Handler: _Querier_TailersCount_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -3050,6 +3233,57 @@ func (m *TransferChunksResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) return len(dAtA) - i, nil } +func (m *TailersCountRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TailersCountRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TailersCountRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *TailersCountResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TailersCountResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *TailersCountResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Count != 0 { + i = encodeVarintLogproto(dAtA, i, uint64(m.Count)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarintLogproto(dAtA []byte, offset int, v uint64) int { offset -= sovLogproto(v) base := offset @@ -3372,6 +3606,27 @@ func (m *TransferChunksResponse) Size() (n int) { return n } +func (m *TailersCountRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *TailersCountResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Count != 0 { + n += 1 + sovLogproto(uint64(m.Count)) + } + return n +} + func sovLogproto(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -3622,6 +3877,25 @@ func (this *TransferChunksResponse) String() string { }, "") return s } +func (this *TailersCountRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TailersCountRequest{`, + `}`, + }, "") + return s +} +func (this *TailersCountResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TailersCountResponse{`, + `Count:` + fmt.Sprintf("%v", this.Count) + `,`, + `}`, + }, "") + return s +} func valueToStringLogproto(v interface{}) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -5835,6 +6109,131 @@ func (m *TransferChunksResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *TailersCountRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TailersCountRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TailersCountRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *TailersCountResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TailersCountResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TailersCountResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Count", wireType) + } + m.Count = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Count |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipLogproto(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index a8fa23e710..241c819471 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -16,6 +16,7 @@ service Querier { rpc Label(LabelRequest) returns (LabelResponse) {}; rpc Tail(TailRequest) returns (stream TailResponse) {}; rpc Series(SeriesRequest) returns (SeriesResponse) {}; + rpc TailersCount(TailersCountRequest) returns (TailersCountResponse) {}; } service Ingester { @@ -119,4 +120,12 @@ message Chunk { message TransferChunksResponse { -} \ No newline at end of file +} + +message TailersCountRequest { + +} + +message TailersCountResponse { + uint32 count = 1; +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 59539731e6..bc8c610d21 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -274,6 +274,11 @@ func mergePair(s1, s2 []string) []string { // Tail keeps getting matching logs from all ingesters for given query func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) { + err := q.checkTailRequestLimit(ctx) + if err != nil { + return nil, err + } + histReq := logql.SelectParams{ QueryRequest: &logproto.QueryRequest{ Selector: req.Query, @@ -284,7 +289,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, }, } - err := q.validateQueryRequest(ctx, histReq.QueryRequest) + err = q.validateQueryRequest(ctx, histReq.QueryRequest) if err != nil { return nil, err } @@ -536,3 +541,56 @@ func (q *Querier) validateQueryTimeRange(userID string, from *time.Time, through return nil } + +func (q *Querier) checkTailRequestLimit(ctx context.Context) error { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return err + } + + replicationSet, err := q.ring.GetAll() + if err != nil { + return err + } + + // we want to check count of active tailers with only active ingesters + ingesters := make([]ring.IngesterDesc, 0, 1) + for i := range replicationSet.Ingesters { + if replicationSet.Ingesters[i].State == ring.ACTIVE { + ingesters = append(ingesters, replicationSet.Ingesters[i]) + } + } + + if len(ingesters) == 0 { + return httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found") + } + + responses, err := q.forGivenIngesters(ctx, replicationSet, func(querierClient logproto.QuerierClient) (interface{}, error) { + resp, err := querierClient.TailersCount(ctx, nil) + if err != nil { + return nil, err + } + return resp.Count, nil + }) + // We are only checking active ingesters, and any error returned stops checking other ingesters + // so return that error here as well. + if err != nil { + return err + } + + var maxCnt uint32 + maxCnt = 0 + for _, resp := range responses { + r := resp.response.(uint32) + if r > maxCnt { + maxCnt = r + } + } + + if maxCnt >= uint32(q.limits.MaxConcurrentTailRequests(userID)) { + return httpgrpc.Errorf(http.StatusBadRequest, + "max concurrent tail requests limit exceeded, count > limit (%d > %d)", maxCnt+1, 1) + } + + return nil +} diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index a9868d199c..54bc79851c 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -60,6 +60,11 @@ func (c *querierClientMock) Series(ctx context.Context, in *logproto.SeriesReque return res.(*logproto.SeriesResponse), args.Error(1) } +func (c *querierClientMock) TailersCount(ctx context.Context, in *logproto.TailersCountRequest, opts ...grpc.CallOption) (*logproto.TailersCountResponse, error) { + args := c.Called(ctx, in, opts) + return args.Get(0).(*logproto.TailersCountResponse), args.Error(1) +} + func (c *querierClientMock) Context() context.Context { return context.Background() } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index fc67c8d89e..13df8482a8 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -96,6 +96,7 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil) ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil) + ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{}, nil) limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) @@ -447,7 +448,7 @@ func TestQuerier_SeriesAPI(t *testing.T) { func TestQuerier_IngesterMaxQueryLookback(t *testing.T) { - limits, err := validation.NewOverrides(defaultLimitsTestConfig()) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) for _, tc := range []struct { @@ -522,3 +523,86 @@ func TestQuerier_IngesterMaxQueryLookback(t *testing.T) { }) } } + +func TestQuerier_concurrentTailLimits(t *testing.T) { + request := logproto.TailRequest{ + Query: "{type=\"test\"}", + DelayFor: 0, + Limit: 10, + Start: time.Now(), + } + + t.Parallel() + + tests := map[string]struct { + ringIngesters []ring.IngesterDesc + expectedError error + tailersCount uint32 + }{ + "empty ring": { + ringIngesters: []ring.IngesterDesc{}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), + }, + "ring containing one pending ingester": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.PENDING)}, + expectedError: httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found"), + }, + "ring containing one active ingester and 0 active tailers": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)}, + }, + "ring containing one active ingester and 1 active tailer": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)}, + tailersCount: 1, + }, + "ring containing one pending and active ingester with 1 active tailer": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.PENDING), mockIngesterDesc("2.2.2.2", ring.ACTIVE)}, + tailersCount: 1, + }, + "ring containing one active ingester and max active tailers": { + ringIngesters: []ring.IngesterDesc{mockIngesterDesc("1.1.1.1", ring.ACTIVE)}, + expectedError: httpgrpc.Errorf(http.StatusBadRequest, + "max concurrent tail requests limit exceeded, count > limit (%d > %d)", 6, 1), + tailersCount: 5, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + // For this test's purpose, whenever a new ingester client needs to + // be created, the factory will always return the same mock instance + store := newStoreMock() + store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil) + + queryClient := newQueryClientMock() + queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil) + + tailClient := newTailClientMock() + tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil) + ingesterClient.On("Tail", mock.Anything, &request, mock.Anything).Return(tailClient, nil) + ingesterClient.On("TailersCount", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.TailersCountResponse{testData.tailersCount}, nil) + + defaultLimits := defaultLimitsTestConfig() + defaultLimits.MaxConcurrentTailRequests = 5 + + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + q, err := newQuerier( + mockQuerierConfig(), + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + newReadRingMock(testData.ringIngesters), + store, limits) + require.NoError(t, err) + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = q.Tail(ctx, &request) + assert.Equal(t, testData.expectedError, err) + }) + } +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index e306b91166..34810d86ae 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -40,6 +40,7 @@ type Limits struct { MaxQueryParallelism int `yaml:"max_query_parallelism"` CardinalityLimit int `yaml:"cardinality_limit"` MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query"` + MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests"` // Config for overrides, convenient if it goes here. PerTenantOverrideConfig string `yaml:"per_tenant_override_config"` @@ -67,6 +68,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.") f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query") + f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests") f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.") f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with this to reload the overrides.") @@ -212,6 +214,11 @@ func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int { return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery } +// MaxConcurrentTailRequests returns the limit to number of concurrent tail requests. +func (o *Overrides) MaxConcurrentTailRequests(userID string) int { + return o.getOverridesForUser(userID).MaxConcurrentTailRequests +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits(userID)