diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go index af8ebbe968..03bb05b9d6 100644 --- a/pkg/loghttp/query.go +++ b/pkg/loghttp/query.go @@ -18,10 +18,11 @@ import ( ) var ( - errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time") - errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") - errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per time series. Try increasing the value of the step parameter") - errNegativeInterval = errors.New("interval must be >= 0") + errEndBeforeStart = errors.New("end timestamp must not be before or equal to start time") + errZeroOrNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") + errNegativeStep = errors.New("negative query resolution step widths are not accepted. Try a positive integer") + errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per time series. Try increasing the value of the step parameter") + errNegativeInterval = errors.New("interval must be >= 0") ) // QueryStatus holds the status of a query @@ -317,7 +318,7 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) { } if result.Step <= 0 { - return nil, errNegativeStep + return nil, errZeroOrNegativeStep } result.Shards = shards(r) @@ -346,7 +347,53 @@ func ParseIndexStatsQuery(r *http.Request) (*RangeQuery, error) { return ParseRangeQuery(r) } -func ParseSeriesVolumeQuery(r *http.Request) (*RangeQuery, error) { +type SeriesVolumeInstantQuery struct { + Start time.Time + End time.Time + Query string + Ts time.Time + Limit uint32 +} + +func ParseSeriesVolumeInstantQuery(r *http.Request) (*SeriesVolumeInstantQuery, error) { + err := labelVolumeLimit(r) + if err != nil { + return nil, err + } + + result, err := ParseInstantQuery(r) + if err != nil { + return nil, err + } + + svInstantQuery := SeriesVolumeInstantQuery{ + Query: result.Query, + Ts: result.Ts, + Limit: result.Limit, + } + + svInstantQuery.Start, svInstantQuery.End, err = bounds(r) + if err != nil { + return nil, err + } + + if svInstantQuery.End.Before(svInstantQuery.Start) { + return nil, errEndBeforeStart + } + + return &svInstantQuery, nil +} + +type SeriesVolumeRangeQuery struct { + Start time.Time + End time.Time + Step time.Duration + Interval time.Duration + Query string + Limit uint32 +} + +func ParseSeriesVolumeRangeQuery(r *http.Request) (*SeriesVolumeRangeQuery, error) { err := labelVolumeLimit(r) if err != nil { return nil, err @@ -357,7 +404,14 @@ func ParseSeriesVolumeQuery(r *http.Request) (*RangeQuery, error) { return nil, err } - return result, nil + return &SeriesVolumeRangeQuery{ + Start: result.Start, + End: result.End, + Step: result.Step, + Interval: result.Interval, + Query: result.Query, + Limit: result.Limit, + }, nil } func labelVolumeLimit(r *http.Request) error { diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index fc5aba479e..f1703e43c0 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -288,9 +288,6 @@ func (m *VolumeRequest) GetEnd() int64 { return int64(m.Through) } -// GetStep returns the step of the request in milliseconds. -func (m *VolumeRequest) GetStep() int64 { return 0 } - // GetQuery returns the query of the request. func (m *VolumeRequest) GetQuery() string { return m.Matchers diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go index 7cc5927a83..4962a04f24 100644 --- a/pkg/logproto/logproto.pb.go +++ b/pkg/logproto/logproto.pb.go @@ -2290,6 +2290,7 @@ type VolumeRequest struct { Through github_com_prometheus_common_model.Time `protobuf:"varint,2,opt,name=through,proto3,customtype=github.com/prometheus/common/model.Time" json:"through"` Matchers string `protobuf:"bytes,3,opt,name=matchers,proto3" json:"matchers,omitempty"` Limit int32 `protobuf:"varint,4,opt,name=limit,proto3" json:"limit,omitempty"` + Step int64 `protobuf:"varint,5,opt,name=step,proto3" json:"step,omitempty"` } func (m *VolumeRequest) Reset() { *m = VolumeRequest{} } @@ -2338,6 +2339,13 @@ func (m *VolumeRequest) GetLimit() int32 { return 0 } +func (m *VolumeRequest) GetStep() int64 { + if m != nil { + return m.Step + } + return 0 +} + type VolumeResponse struct { Volumes []Volume `protobuf:"bytes,1,rep,name=volumes,proto3" json:"volumes"` Limit int32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"` @@ -2493,145 +2501,146 @@ func init() { func init() { proto.RegisterFile("pkg/logproto/logproto.proto", fileDescriptor_c28a5f14f1f4c79a) } var fileDescriptor_c28a5f14f1f4c79a = []byte{ - // 2197 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x19, 0x4b, 0x8f, 0x1b, 0x49, - 0xd9, 0x65, 0xb7, 0x5f, 0x9f, 0xed, 0x19, 0xa7, 0xc6, 0x9b, 0x18, 0x27, 0xb1, 0x27, 0xad, 0x25, - 0x19, 0x25, 0x59, 0x7b, 0x33, 0x0b, 0x4b, 0x36, 0x61, 0x41, 0xf1, 0xcc, 0x26, 0x3b, 0x79, 0x6f, - 0x4d, 0x08, 0x68, 0xc5, 0x2a, 0xea, 0xb1, 0xcb, 0x0f, 0xc5, 0xed, 0x76, 0xba, 0xdb, 0x9b, 0x1d, - 0x89, 0x03, 0x7f, 0x60, 0xa5, 0xbd, 0x21, 0x2e, 0x88, 0x03, 0x12, 0x08, 0x89, 0x0b, 0x3f, 0x00, - 0x38, 0x20, 0x11, 0x6e, 0xe1, 0xb6, 0xe2, 0x60, 0xc8, 0xe4, 0x82, 0xe6, 0xb4, 0x7f, 0x00, 0x84, - 0xea, 0xd5, 0x5d, 0xdd, 0xe3, 0xd9, 0x5d, 0x87, 0x48, 0x28, 0x97, 0x71, 0xd7, 0x57, 0x5f, 0x7d, - 0xf5, 0xbd, 0x1f, 0x35, 0x70, 0x7c, 0xf2, 0xb0, 0xdf, 0x1a, 0x39, 0xfd, 0x89, 0xeb, 0xf8, 0x4e, - 0xf0, 0xd1, 0xe4, 0x7f, 0x71, 0x4e, 0xad, 0x6b, 0x95, 0xbe, 0xd3, 0x77, 0x04, 0x0e, 0xfb, 0x12, - 0xfb, 0xb5, 0x46, 0xdf, 0x71, 0xfa, 0x23, 0xda, 0xe2, 0xab, 0x9d, 0x69, 0xaf, 0xe5, 0x0f, 0x6d, - 0xea, 0xf9, 0x96, 0x3d, 0x91, 0x08, 0xab, 0x92, 0xfa, 0xa3, 0x91, 0xed, 0x74, 0xe9, 0xa8, 0xe5, - 0xf9, 0x96, 0xef, 0x89, 0xbf, 0x12, 0x63, 0x85, 0x61, 0x4c, 0xa6, 0xde, 0x80, 0xff, 0x11, 0x40, - 0xb3, 0x02, 0x78, 0xdb, 0x77, 0xa9, 0x65, 0x13, 0xcb, 0xa7, 0x1e, 0xa1, 0x8f, 0xa6, 0xd4, 0xf3, - 0xcd, 0x5b, 0xb0, 0x12, 0x81, 0x7a, 0x13, 0x67, 0xec, 0x51, 0xfc, 0x36, 0x14, 0xbc, 0x10, 0x5c, - 0x45, 0xab, 0xa9, 0xb5, 0xc2, 0x7a, 0xa5, 0x19, 0x88, 0x12, 0x9e, 0x21, 0x3a, 0xa2, 0xf9, 0x0b, - 0x04, 0x10, 0xee, 0xe1, 0x3a, 0x80, 0xd8, 0x7d, 0xdf, 0xf2, 0x06, 0x55, 0xb4, 0x8a, 0xd6, 0x0c, - 0xa2, 0x41, 0xf0, 0x79, 0x38, 0x12, 0xae, 0x6e, 0x3b, 0xdb, 0x03, 0xcb, 0xed, 0x56, 0x93, 0x1c, - 0xed, 0xe0, 0x06, 0xc6, 0x60, 0xb8, 0x96, 0x4f, 0xab, 0xa9, 0x55, 0xb4, 0x96, 0x22, 0xfc, 0x1b, - 0x1f, 0x85, 0x8c, 0x4f, 0xc7, 0xd6, 0xd8, 0xaf, 0x1a, 0xab, 0x68, 0x2d, 0x4f, 0xe4, 0x8a, 0xc1, - 0x99, 0xec, 0xd4, 0xab, 0xa6, 0x57, 0xd1, 0x5a, 0x89, 0xc8, 0x95, 0xf9, 0x97, 0x24, 0x14, 0x3f, - 0x98, 0x52, 0x77, 0x57, 0x2a, 0x00, 0xd7, 0x20, 0xe7, 0xd1, 0x11, 0xed, 0xf8, 0x8e, 0xcb, 0x19, - 0xcc, 0x93, 0x60, 0x8d, 0x2b, 0x90, 0x1e, 0x0d, 0xed, 0xa1, 0xcf, 0x59, 0x2a, 0x11, 0xb1, 0xc0, - 0x97, 0x20, 0xed, 0xf9, 0x96, 0xeb, 0x73, 0x3e, 0x0a, 0xeb, 0xb5, 0xa6, 0x30, 0x58, 0x53, 0x19, - 0xac, 0x79, 0x4f, 0x19, 0xac, 0x9d, 0x7b, 0x32, 0x6b, 0x24, 0x3e, 0xfb, 0x47, 0x03, 0x11, 0x71, - 0x04, 0xbf, 0x0d, 0x29, 0x3a, 0xee, 0x72, 0x5e, 0xbf, 0xee, 0x49, 0x76, 0x00, 0x5f, 0x80, 0x7c, - 0x77, 0xe8, 0xd2, 0x8e, 0x3f, 0x74, 0xc6, 0x5c, 0xa2, 0xa5, 0xf5, 0x95, 0xd0, 0x1a, 0x9b, 0x6a, - 0x8b, 0x84, 0x58, 0xf8, 0x3c, 0x64, 0x3c, 0xa6, 0x36, 0xaf, 0x9a, 0x5d, 0x4d, 0xad, 0xe5, 0xdb, - 0x95, 0xfd, 0x59, 0xa3, 0x2c, 0x20, 0xe7, 0x1d, 0x7b, 0xe8, 0x53, 0x7b, 0xe2, 0xef, 0x12, 0x89, - 0x83, 0xcf, 0x42, 0xb6, 0x4b, 0x47, 0x94, 0x19, 0x3b, 0xc7, 0x8d, 0x5d, 0xd6, 0xc8, 0xf3, 0x0d, - 0xa2, 0x10, 0xae, 0x1b, 0xb9, 0x4c, 0x39, 0x6b, 0xfe, 0x07, 0x01, 0xde, 0xb6, 0xec, 0xc9, 0x88, - 0x7e, 0x6d, 0x7d, 0x06, 0x9a, 0x4b, 0xbe, 0xb0, 0xe6, 0x52, 0x8b, 0x6a, 0x2e, 0x54, 0x83, 0xb1, - 0x98, 0x1a, 0xd2, 0x5f, 0xa1, 0x06, 0xf3, 0x26, 0x64, 0x04, 0xe8, 0xab, 0x7c, 0x28, 0x94, 0x39, - 0xa5, 0xa4, 0x29, 0x87, 0xd2, 0xa4, 0x38, 0x9f, 0xe6, 0x2f, 0x11, 0x94, 0xa4, 0x22, 0x65, 0x0c, - 0xee, 0x40, 0x56, 0xc4, 0x80, 0x8a, 0xbf, 0x63, 0xf1, 0xf8, 0xbb, 0xd2, 0xb5, 0x26, 0x3e, 0x75, - 0xdb, 0xad, 0x27, 0xb3, 0x06, 0xfa, 0xfb, 0xac, 0x71, 0xa6, 0x3f, 0xf4, 0x07, 0xd3, 0x9d, 0x66, - 0xc7, 0xb1, 0x5b, 0x7d, 0xd7, 0xea, 0x59, 0x63, 0xab, 0x35, 0x72, 0x1e, 0x0e, 0x5b, 0x2a, 0x1f, - 0xa8, 0xb8, 0x55, 0x84, 0xf1, 0x39, 0xce, 0x9d, 0xef, 0x49, 0x8b, 0x2c, 0x37, 0x45, 0x1a, 0xd9, - 0x1a, 0xf7, 0xa9, 0xc7, 0x28, 0x1b, 0x4c, 0x99, 0x44, 0xe0, 0x98, 0x3f, 0x81, 0x95, 0x88, 0xc1, - 0x25, 0x9f, 0x17, 0x21, 0xe3, 0x51, 0x77, 0x18, 0xa4, 0x09, 0x4d, 0x65, 0xdb, 0x1c, 0xde, 0x5e, - 0x92, 0xfc, 0x65, 0xc4, 0x9a, 0x48, 0xfc, 0xc5, 0x6e, 0xff, 0x33, 0x82, 0xe2, 0x4d, 0x6b, 0x87, - 0x8e, 0x94, 0xa7, 0x61, 0x30, 0xc6, 0x96, 0x4d, 0xa5, 0xc6, 0xf9, 0x37, 0x0b, 0xfb, 0x8f, 0xad, - 0xd1, 0x94, 0x0a, 0x92, 0x39, 0x22, 0x57, 0x8b, 0xc6, 0x2c, 0x7a, 0xe1, 0x98, 0x45, 0xa1, 0xe7, - 0x55, 0x20, 0xfd, 0x88, 0x29, 0x8a, 0xc7, 0x6b, 0x9e, 0x88, 0x85, 0x79, 0x06, 0x4a, 0x52, 0x0a, - 0xa9, 0xbe, 0x90, 0x65, 0xa6, 0xbe, 0xbc, 0x62, 0xd9, 0xb4, 0x21, 0x23, 0xb4, 0x8d, 0x5f, 0x87, - 0x7c, 0x50, 0x03, 0xb8, 0xb4, 0xa9, 0x76, 0x66, 0x7f, 0xd6, 0x48, 0xfa, 0x1e, 0x09, 0x37, 0x70, - 0x03, 0xd2, 0xfc, 0x24, 0x97, 0x1c, 0xb5, 0xf3, 0xfb, 0xb3, 0x86, 0x00, 0x10, 0xf1, 0x83, 0x4f, - 0x80, 0x31, 0x60, 0x69, 0x98, 0xa9, 0xc0, 0x68, 0xe7, 0xf6, 0x67, 0x0d, 0xbe, 0x26, 0xfc, 0xaf, - 0x79, 0x0d, 0x8a, 0x37, 0x69, 0xdf, 0xea, 0xec, 0xca, 0x4b, 0x2b, 0x8a, 0x1c, 0xbb, 0x10, 0x29, - 0x1a, 0xa7, 0xa0, 0x18, 0xdc, 0xf8, 0xc0, 0xf6, 0xa4, 0x53, 0x17, 0x02, 0xd8, 0x2d, 0xcf, 0xfc, - 0x39, 0x02, 0x69, 0x67, 0x6c, 0x42, 0x66, 0xc4, 0x64, 0xf5, 0x84, 0x8d, 0xda, 0xb0, 0x3f, 0x6b, - 0x48, 0x08, 0x91, 0xbf, 0xf8, 0x32, 0x64, 0x3d, 0x7e, 0x23, 0x23, 0x16, 0x77, 0x1f, 0xbe, 0xd1, - 0x5e, 0x66, 0x6e, 0xb0, 0x3f, 0x6b, 0x28, 0x44, 0xa2, 0x3e, 0x70, 0x33, 0x52, 0x5f, 0x84, 0x60, - 0x4b, 0xfb, 0xb3, 0x86, 0x06, 0xd5, 0xeb, 0x8d, 0xf9, 0x33, 0x04, 0x85, 0x7b, 0xd6, 0x30, 0x70, - 0xa1, 0xc0, 0x44, 0x48, 0x33, 0x11, 0x0b, 0xe7, 0x2e, 0x1d, 0x59, 0xbb, 0x57, 0x1d, 0x97, 0xd3, - 0x2c, 0x91, 0x60, 0x1d, 0x96, 0x04, 0x63, 0x6e, 0x49, 0x48, 0x2f, 0x9c, 0xd8, 0xae, 0x1b, 0xb9, - 0x64, 0x39, 0x65, 0xfe, 0x0e, 0x41, 0x51, 0x70, 0x26, 0xdd, 0xe2, 0xc7, 0x90, 0x11, 0x8c, 0x73, - 0xde, 0xbe, 0x24, 0xf8, 0xcf, 0x2d, 0x12, 0xf8, 0x92, 0x26, 0xfe, 0x3e, 0x2c, 0x75, 0x5d, 0x67, - 0x32, 0xa1, 0xdd, 0x6d, 0x99, 0x62, 0x92, 0xf1, 0x14, 0xb3, 0xa9, 0xef, 0x93, 0x18, 0xba, 0xf9, - 0x57, 0x04, 0x25, 0x19, 0xcd, 0x52, 0x97, 0x81, 0x0e, 0xd0, 0x0b, 0x27, 0xf7, 0xe4, 0xa2, 0xc9, - 0xfd, 0x28, 0x64, 0xfa, 0xae, 0x33, 0x9d, 0x78, 0xd5, 0x94, 0x88, 0x1d, 0xb1, 0x5a, 0x2c, 0xe9, - 0x9b, 0xd7, 0x61, 0x49, 0x89, 0x72, 0x48, 0x4a, 0xab, 0xc5, 0x53, 0xda, 0x56, 0x97, 0x8e, 0xfd, - 0x61, 0x6f, 0x18, 0x24, 0x29, 0x89, 0x6f, 0x7e, 0x8a, 0xa0, 0x1c, 0x47, 0xc1, 0xdf, 0xd3, 0xe2, - 0x80, 0x91, 0x3b, 0x7d, 0x38, 0xb9, 0x26, 0x4f, 0x0e, 0xde, 0x7b, 0x63, 0xdf, 0xdd, 0x55, 0x31, - 0x52, 0x7b, 0x07, 0x0a, 0x1a, 0x98, 0x15, 0x8f, 0x87, 0x54, 0xf9, 0x2c, 0xfb, 0x0c, 0x83, 0x35, - 0x29, 0xfc, 0x98, 0x2f, 0x2e, 0x25, 0x2f, 0x22, 0xe6, 0xf1, 0xa5, 0x88, 0x25, 0xf1, 0x45, 0x30, - 0x7a, 0xae, 0x63, 0x2f, 0x64, 0x26, 0x7e, 0x02, 0x7f, 0x0b, 0x92, 0xbe, 0xb3, 0x90, 0x91, 0x92, - 0xbe, 0xc3, 0x6c, 0x24, 0x85, 0x4f, 0x89, 0x0e, 0x4d, 0xac, 0xcc, 0xdf, 0x22, 0x58, 0x66, 0x67, - 0x84, 0x06, 0x36, 0x06, 0xd3, 0xf1, 0x43, 0xbc, 0x06, 0x65, 0x76, 0xd3, 0x83, 0xa1, 0xac, 0x00, - 0x0f, 0x86, 0x5d, 0x29, 0xe6, 0x12, 0x83, 0xab, 0xc2, 0xb0, 0xd5, 0xc5, 0xc7, 0x20, 0x3b, 0xf5, - 0x04, 0x82, 0x90, 0x39, 0xc3, 0x96, 0x5b, 0x5d, 0x7c, 0x4e, 0xbb, 0x8e, 0xe9, 0x5a, 0x6b, 0x93, - 0xb8, 0x0e, 0xef, 0x5a, 0x43, 0x37, 0x48, 0x3e, 0x67, 0x20, 0xd3, 0x61, 0x17, 0x0b, 0x3f, 0x61, - 0x15, 0x28, 0x40, 0xe6, 0x0c, 0x11, 0xb9, 0x6d, 0x7e, 0x1b, 0xf2, 0xc1, 0xe9, 0xb9, 0x85, 0x67, - 0xae, 0x05, 0xcc, 0xcb, 0xb0, 0x2c, 0x92, 0xea, 0xfc, 0xc3, 0xc5, 0x79, 0x87, 0x8b, 0xea, 0xf0, - 0x71, 0x48, 0x0b, 0xad, 0x60, 0x30, 0xba, 0x96, 0x6f, 0xa9, 0x23, 0xec, 0xdb, 0xac, 0xc2, 0xd1, - 0x7b, 0xae, 0x35, 0xf6, 0x7a, 0xd4, 0xe5, 0x48, 0x81, 0xef, 0x9a, 0xaf, 0xc1, 0x0a, 0x4b, 0x24, - 0xd4, 0xf5, 0x36, 0x9c, 0xe9, 0xd8, 0x57, 0x8d, 0xfe, 0x79, 0xa8, 0x44, 0xc1, 0xd2, 0xd5, 0x2b, - 0x90, 0xee, 0x30, 0x00, 0xa7, 0x5e, 0x22, 0x62, 0x61, 0xfe, 0x0a, 0x01, 0xbe, 0x46, 0x7d, 0x4e, - 0x7a, 0x6b, 0xd3, 0xd3, 0x9a, 0x3b, 0xdb, 0xf2, 0x3b, 0x03, 0xea, 0x7a, 0xaa, 0xd1, 0x51, 0xeb, - 0xff, 0x47, 0x73, 0x67, 0x5e, 0x80, 0x95, 0x08, 0x97, 0x52, 0xa6, 0x1a, 0xe4, 0x3a, 0x12, 0x26, - 0x8b, 0x6a, 0xb0, 0x36, 0x7f, 0x9f, 0x84, 0x9c, 0xb0, 0x2d, 0xed, 0xe1, 0x0b, 0x50, 0xe8, 0x31, - 0x5f, 0x73, 0x27, 0xee, 0x50, 0xaa, 0xc0, 0x68, 0x2f, 0xef, 0xcf, 0x1a, 0x3a, 0x98, 0xe8, 0x0b, - 0xfc, 0x46, 0xcc, 0xf1, 0xda, 0x95, 0xbd, 0x59, 0x23, 0xf3, 0x03, 0xe6, 0x7c, 0x9b, 0xac, 0xbc, - 0x71, 0x37, 0xdc, 0x0c, 0xdc, 0xf1, 0x86, 0x8c, 0x36, 0xde, 0xe9, 0xb5, 0xbf, 0xc3, 0xd8, 0x8f, - 0xe5, 0xeb, 0x89, 0xeb, 0xd8, 0xd4, 0x1f, 0xd0, 0xa9, 0xd7, 0xea, 0x38, 0xb6, 0xed, 0x8c, 0x5b, - 0x7c, 0xac, 0xe3, 0x42, 0xb3, 0x1a, 0xcd, 0x8e, 0xcb, 0x00, 0xbc, 0x07, 0x59, 0x7f, 0xe0, 0x3a, - 0xd3, 0xfe, 0x80, 0x97, 0x9f, 0x54, 0xfb, 0xd2, 0xe2, 0xf4, 0x14, 0x05, 0xa2, 0x3e, 0xf0, 0x29, - 0xa6, 0x2d, 0xda, 0x79, 0xe8, 0x4d, 0x6d, 0x31, 0x2c, 0xb5, 0xd3, 0xfb, 0xb3, 0x06, 0x7a, 0x83, - 0x04, 0x60, 0xf3, 0xd3, 0x24, 0x34, 0xb8, 0x0b, 0xdf, 0xe7, 0xbd, 0xc9, 0x55, 0xc7, 0xbd, 0x45, - 0x7d, 0x77, 0xd8, 0xb9, 0x6d, 0xd9, 0x54, 0xf9, 0x46, 0x03, 0x0a, 0x36, 0x07, 0x3e, 0xd0, 0x82, - 0x03, 0xec, 0x00, 0x0f, 0x9f, 0x04, 0xe0, 0x61, 0x27, 0xf6, 0x45, 0x9c, 0xe4, 0x39, 0x84, 0x6f, - 0x6f, 0x44, 0x34, 0xd5, 0x5a, 0x50, 0x32, 0xa9, 0xa1, 0xad, 0xb8, 0x86, 0x16, 0xa6, 0x13, 0xa8, - 0x45, 0xf7, 0xf5, 0x74, 0xd4, 0xd7, 0xcd, 0xbf, 0x21, 0xa8, 0xdf, 0x54, 0x9c, 0xbf, 0xa0, 0x3a, - 0x94, 0xbc, 0xc9, 0x97, 0x24, 0x6f, 0xea, 0x7f, 0x93, 0xd7, 0xfc, 0x93, 0x16, 0xf2, 0x84, 0xf6, - 0x94, 0x1c, 0x1b, 0x5a, 0xb9, 0x78, 0x19, 0x6c, 0x26, 0x5f, 0xa2, 0x59, 0x52, 0x31, 0xb3, 0xbc, - 0x1b, 0xa6, 0x03, 0x2e, 0x81, 0x4c, 0x07, 0xa7, 0xc1, 0x70, 0x69, 0x4f, 0x15, 0x5f, 0x1c, 0xcf, - 0xf1, 0xb4, 0x47, 0xf8, 0xbe, 0xf9, 0x07, 0x04, 0xe5, 0x6b, 0xd4, 0x8f, 0xb6, 0x35, 0xaf, 0x92, - 0xfc, 0xef, 0xc3, 0x11, 0x8d, 0x7f, 0x29, 0xfd, 0x5b, 0xb1, 0x5e, 0xe6, 0xb5, 0x50, 0xfe, 0xad, - 0x71, 0x97, 0x7e, 0x22, 0x67, 0xb4, 0x68, 0x1b, 0x73, 0x17, 0x0a, 0xda, 0x26, 0xbe, 0x12, 0x6b, - 0x60, 0xe6, 0x15, 0xd5, 0x76, 0x45, 0xca, 0x24, 0xa6, 0x34, 0xd9, 0x9e, 0x06, 0xe5, 0x7e, 0x1b, - 0x30, 0x1f, 0x1b, 0x39, 0x59, 0x3d, 0x53, 0x73, 0xe8, 0x8d, 0xa0, 0x9f, 0x09, 0xd6, 0xf8, 0x14, - 0x18, 0xae, 0xf3, 0x58, 0x75, 0xa6, 0xa5, 0xf0, 0x4a, 0xe2, 0x3c, 0x26, 0x7c, 0xcb, 0xbc, 0x0c, - 0x29, 0xe2, 0x3c, 0xc6, 0x75, 0x00, 0xd7, 0x1a, 0xf7, 0xe9, 0xfd, 0x60, 0x60, 0x29, 0x12, 0x0d, - 0x72, 0x48, 0x7d, 0xdd, 0x80, 0x23, 0x3a, 0x47, 0xc2, 0xdc, 0x4d, 0xc8, 0x32, 0xe0, 0x70, 0xde, - 0xa3, 0x17, 0x47, 0x14, 0xb3, 0xaf, 0x42, 0x62, 0x3e, 0x03, 0x21, 0x1c, 0x9f, 0x80, 0xbc, 0x6f, - 0xed, 0x8c, 0xe8, 0xed, 0x30, 0xe6, 0x43, 0x00, 0xdb, 0x65, 0xb3, 0xd6, 0x7d, 0xad, 0x51, 0x08, - 0x01, 0xf8, 0x2c, 0x94, 0x43, 0x9e, 0xef, 0xba, 0xb4, 0x37, 0xfc, 0x84, 0x5b, 0xb8, 0x48, 0x0e, - 0xc0, 0xf1, 0x1a, 0x2c, 0x87, 0xb0, 0x6d, 0x5e, 0x76, 0x0d, 0x8e, 0x1a, 0x07, 0x33, 0xdd, 0x70, - 0x71, 0xdf, 0x7b, 0x34, 0xb5, 0x46, 0x3c, 0x91, 0x15, 0x89, 0x06, 0x31, 0xff, 0x88, 0xe0, 0x88, - 0x30, 0x35, 0x9b, 0xb2, 0x5f, 0x45, 0xaf, 0xff, 0x35, 0x02, 0xac, 0x4b, 0x20, 0x5d, 0xeb, 0x9b, - 0xfa, 0xf3, 0x09, 0xab, 0xeb, 0x05, 0x3e, 0x42, 0x0a, 0x50, 0xf8, 0x02, 0x62, 0x06, 0x2d, 0x20, - 0x7f, 0x77, 0x14, 0x33, 0xaa, 0x80, 0xa8, 0xee, 0x8f, 0x8d, 0xd6, 0x3b, 0xbb, 0x3e, 0xf5, 0xe4, - 0x84, 0xc9, 0x47, 0x6b, 0x0e, 0x20, 0xe2, 0x87, 0xdd, 0x45, 0xc7, 0x3e, 0xf7, 0x1a, 0x23, 0xbc, - 0x4b, 0x82, 0x88, 0xfa, 0x60, 0x65, 0xa3, 0x74, 0xdf, 0x19, 0x4d, 0xc3, 0x2a, 0xf1, 0x0a, 0xe9, - 0x39, 0x3a, 0xfa, 0xa6, 0xe5, 0xe8, 0x6b, 0xfe, 0x08, 0x96, 0x94, 0x48, 0x52, 0xf1, 0x6f, 0x42, - 0xf6, 0x63, 0x0e, 0x99, 0xf3, 0x20, 0x24, 0x50, 0x65, 0xb2, 0x51, 0x68, 0xd1, 0x77, 0xd6, 0x80, - 0xf2, 0x75, 0xc8, 0x08, 0x74, 0x7c, 0x42, 0x6f, 0xb8, 0xc5, 0xcb, 0x05, 0x5b, 0xcb, 0xee, 0xd9, - 0x84, 0x8c, 0x20, 0x24, 0xcd, 0xc3, 0x2d, 0x28, 0x20, 0x44, 0xfe, 0x9e, 0x3d, 0x0d, 0xf9, 0xe0, - 0x91, 0x14, 0x17, 0x20, 0x7b, 0xf5, 0x0e, 0xf9, 0xe1, 0x15, 0xb2, 0x59, 0x4e, 0xe0, 0x22, 0xe4, - 0xda, 0x57, 0x36, 0x6e, 0xf0, 0x15, 0x5a, 0xff, 0xb7, 0xa1, 0xe2, 0xdf, 0xc5, 0xdf, 0x85, 0xb4, - 0x08, 0xea, 0xa3, 0x21, 0xff, 0xfa, 0x53, 0x67, 0xed, 0xd8, 0x01, 0xb8, 0x6c, 0xc1, 0x13, 0x6f, - 0x22, 0x7c, 0x1b, 0x0a, 0x1c, 0x28, 0x9f, 0x53, 0x4e, 0xc4, 0x5f, 0x35, 0x22, 0x94, 0x4e, 0x1e, - 0xb2, 0xab, 0xd1, 0xbb, 0x04, 0x69, 0x9e, 0x57, 0x75, 0x6e, 0xf4, 0xe7, 0x30, 0x9d, 0x9b, 0xc8, - 0x03, 0x93, 0x99, 0xc0, 0xef, 0x80, 0xc1, 0x7a, 0x7f, 0xac, 0xa5, 0x7e, 0xed, 0x15, 0xa4, 0x76, - 0x34, 0x0e, 0xd6, 0xae, 0x7d, 0x37, 0x78, 0xcc, 0x39, 0x16, 0x1f, 0x5a, 0xd5, 0xf1, 0xea, 0xc1, - 0x8d, 0xe0, 0xe6, 0x3b, 0xe2, 0x55, 0x43, 0x4d, 0x1d, 0xf8, 0x64, 0xf4, 0xaa, 0xd8, 0x90, 0x52, - 0xab, 0x1f, 0xb6, 0x1d, 0x10, 0xbc, 0x09, 0x05, 0xad, 0xe3, 0xd7, 0xd5, 0x7a, 0x70, 0x5c, 0xd1, - 0xd5, 0x3a, 0x67, 0x4c, 0x30, 0x13, 0xf8, 0x1a, 0xe4, 0x58, 0xc1, 0x64, 0x79, 0x03, 0x1f, 0x8f, - 0xd7, 0x45, 0x2d, 0x1f, 0xd6, 0x4e, 0xcc, 0xdf, 0x0c, 0x08, 0x5d, 0x85, 0xe5, 0xa0, 0xf2, 0x4a, - 0xa7, 0x3d, 0x16, 0xf7, 0xfa, 0x39, 0xfa, 0x8a, 0x46, 0x8e, 0x99, 0x58, 0xff, 0x08, 0x72, 0x6a, - 0xc8, 0xc5, 0x1f, 0xc0, 0x52, 0x74, 0xc4, 0xc3, 0xdf, 0xd0, 0xd4, 0x13, 0x9d, 0x9c, 0x6b, 0xab, - 0xda, 0xd6, 0xfc, 0xb9, 0x30, 0xb1, 0x86, 0xd6, 0x3f, 0x52, 0xff, 0x9d, 0xd9, 0xb4, 0x7c, 0x0b, - 0xdf, 0x81, 0x25, 0x2e, 0x7d, 0xf0, 0xef, 0x9b, 0x88, 0x97, 0x1e, 0xf8, 0x5f, 0x51, 0xc4, 0x4b, - 0x0f, 0xfe, 0xcf, 0xc8, 0x4c, 0xb4, 0x3f, 0x7c, 0xfa, 0xac, 0x9e, 0xf8, 0xfc, 0x59, 0x3d, 0xf1, - 0xc5, 0xb3, 0x3a, 0xfa, 0xe9, 0x5e, 0x1d, 0xfd, 0x66, 0xaf, 0x8e, 0x9e, 0xec, 0xd5, 0xd1, 0xd3, - 0xbd, 0x3a, 0xfa, 0xe7, 0x5e, 0x1d, 0xfd, 0x6b, 0xaf, 0x9e, 0xf8, 0x62, 0xaf, 0x8e, 0x3e, 0x7b, - 0x5e, 0x4f, 0x3c, 0x7d, 0x5e, 0x4f, 0x7c, 0xfe, 0xbc, 0x9e, 0xf8, 0xf0, 0xf5, 0x2f, 0x7b, 0xbc, - 0x52, 0x37, 0xee, 0x64, 0xf8, 0xcf, 0x5b, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x21, 0xf6, 0xc1, - 0x47, 0x5c, 0x1b, 0x00, 0x00, + // 2213 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x39, 0xcd, 0x6f, 0x1b, 0xc7, + 0xf5, 0x1c, 0x72, 0x49, 0x91, 0x8f, 0xd4, 0x87, 0x47, 0x8c, 0xad, 0x1f, 0x6d, 0x93, 0xf2, 0x20, + 0x3f, 0x5b, 0xb0, 0x1d, 0x32, 0x56, 0xda, 0xd4, 0xb1, 0x9b, 0x16, 0xa6, 0x14, 0x3b, 0xf2, 0x77, + 0x46, 0xae, 0x5b, 0x04, 0x0d, 0x8c, 0x15, 0x39, 0xfc, 0x80, 0xb9, 0x5c, 0x7a, 0x77, 0x19, 0x47, + 0x40, 0x0f, 0xfd, 0x07, 0x02, 0xe4, 0x56, 0xf4, 0x52, 0xf4, 0x50, 0xa0, 0x45, 0x81, 0x5e, 0xfa, + 0x07, 0xb4, 0x3d, 0x14, 0xa8, 0x7b, 0x73, 0x6f, 0x41, 0x0f, 0x6c, 0x2d, 0xa3, 0x40, 0xa1, 0x53, + 0xfe, 0x81, 0x16, 0xc5, 0x7c, 0xed, 0xce, 0xae, 0xa8, 0x24, 0x74, 0x0d, 0x14, 0xbe, 0x88, 0xfb, + 0xde, 0xbc, 0x79, 0xf3, 0xbe, 0xdf, 0xbc, 0x11, 0x1c, 0x1f, 0x3d, 0xec, 0x36, 0x06, 0x6e, 0x77, + 0xe4, 0xb9, 0x81, 0x1b, 0x7e, 0xd4, 0xc5, 0x5f, 0x9c, 0xd7, 0x70, 0xa5, 0xdc, 0x75, 0xbb, 0xae, + 0xa4, 0xe1, 0x5f, 0x72, 0xbd, 0x52, 0xeb, 0xba, 0x6e, 0x77, 0xc0, 0x1a, 0x02, 0xda, 0x19, 0x77, + 0x1a, 0x41, 0xdf, 0x61, 0x7e, 0x60, 0x3b, 0x23, 0x45, 0xb0, 0xaa, 0xb8, 0x3f, 0x1a, 0x38, 0x6e, + 0x9b, 0x0d, 0x1a, 0x7e, 0x60, 0x07, 0xbe, 0xfc, 0xab, 0x28, 0x96, 0x39, 0xc5, 0x68, 0xec, 0xf7, + 0xc4, 0x1f, 0x89, 0x24, 0x65, 0xc0, 0xdb, 0x81, 0xc7, 0x6c, 0x87, 0xda, 0x01, 0xf3, 0x29, 0x7b, + 0x34, 0x66, 0x7e, 0x40, 0x6e, 0xc1, 0x72, 0x0c, 0xeb, 0x8f, 0xdc, 0xa1, 0xcf, 0xf0, 0xdb, 0x50, + 0xf4, 0x23, 0xf4, 0x0a, 0x5a, 0xcd, 0xac, 0x15, 0xd7, 0xcb, 0xf5, 0x50, 0x95, 0x68, 0x0f, 0x35, + 0x09, 0xc9, 0xcf, 0x10, 0x40, 0xb4, 0x86, 0xab, 0x00, 0x72, 0xf5, 0x7d, 0xdb, 0xef, 0xad, 0xa0, + 0x55, 0xb4, 0x66, 0x51, 0x03, 0x83, 0xcf, 0xc3, 0x91, 0x08, 0xba, 0xed, 0x6e, 0xf7, 0x6c, 0xaf, + 0xbd, 0x92, 0x16, 0x64, 0x07, 0x17, 0x30, 0x06, 0xcb, 0xb3, 0x03, 0xb6, 0x92, 0x59, 0x45, 0x6b, + 0x19, 0x2a, 0xbe, 0xf1, 0x51, 0xc8, 0x05, 0x6c, 0x68, 0x0f, 0x83, 0x15, 0x6b, 0x15, 0xad, 0x15, + 0xa8, 0x82, 0x38, 0x9e, 0xeb, 0xce, 0xfc, 0x95, 0xec, 0x2a, 0x5a, 0x9b, 0xa7, 0x0a, 0x22, 0x7f, + 0x4a, 0x43, 0xe9, 0x83, 0x31, 0xf3, 0x76, 0x95, 0x01, 0x70, 0x05, 0xf2, 0x3e, 0x1b, 0xb0, 0x56, + 0xe0, 0x7a, 0x42, 0xc0, 0x02, 0x0d, 0x61, 0x5c, 0x86, 0xec, 0xa0, 0xef, 0xf4, 0x03, 0x21, 0xd2, + 0x3c, 0x95, 0x00, 0xbe, 0x04, 0x59, 0x3f, 0xb0, 0xbd, 0x40, 0xc8, 0x51, 0x5c, 0xaf, 0xd4, 0xa5, + 0xc3, 0xea, 0xda, 0x61, 0xf5, 0x7b, 0xda, 0x61, 0xcd, 0xfc, 0x93, 0x49, 0x2d, 0xf5, 0xd9, 0xdf, + 0x6a, 0x88, 0xca, 0x2d, 0xf8, 0x6d, 0xc8, 0xb0, 0x61, 0x5b, 0xc8, 0xfa, 0x75, 0x77, 0xf2, 0x0d, + 0xf8, 0x02, 0x14, 0xda, 0x7d, 0x8f, 0xb5, 0x82, 0xbe, 0x3b, 0x14, 0x1a, 0x2d, 0xac, 0x2f, 0x47, + 0xde, 0xd8, 0xd4, 0x4b, 0x34, 0xa2, 0xc2, 0xe7, 0x21, 0xe7, 0x73, 0xb3, 0xf9, 0x2b, 0x73, 0xab, + 0x99, 0xb5, 0x42, 0xb3, 0xbc, 0x3f, 0xa9, 0x2d, 0x49, 0xcc, 0x79, 0xd7, 0xe9, 0x07, 0xcc, 0x19, + 0x05, 0xbb, 0x54, 0xd1, 0xe0, 0xb3, 0x30, 0xd7, 0x66, 0x03, 0xc6, 0x9d, 0x9d, 0x17, 0xce, 0x5e, + 0x32, 0xd8, 0x8b, 0x05, 0xaa, 0x09, 0xae, 0x5b, 0xf9, 0xdc, 0xd2, 0x1c, 0xf9, 0x37, 0x02, 0xbc, + 0x6d, 0x3b, 0xa3, 0x01, 0xfb, 0xda, 0xf6, 0x0c, 0x2d, 0x97, 0x7e, 0x61, 0xcb, 0x65, 0x66, 0xb5, + 0x5c, 0x64, 0x06, 0x6b, 0x36, 0x33, 0x64, 0xbf, 0xc2, 0x0c, 0xe4, 0x26, 0xe4, 0x24, 0xea, 0xab, + 0x62, 0x28, 0xd2, 0x39, 0xa3, 0xb5, 0x59, 0x8a, 0xb4, 0xc9, 0x08, 0x39, 0xc9, 0xcf, 0x11, 0xcc, + 0x2b, 0x43, 0xaa, 0x1c, 0xdc, 0x81, 0x39, 0x99, 0x03, 0x3a, 0xff, 0x8e, 0x25, 0xf3, 0xef, 0x4a, + 0xdb, 0x1e, 0x05, 0xcc, 0x6b, 0x36, 0x9e, 0x4c, 0x6a, 0xe8, 0xaf, 0x93, 0xda, 0x99, 0x6e, 0x3f, + 0xe8, 0x8d, 0x77, 0xea, 0x2d, 0xd7, 0x69, 0x74, 0x3d, 0xbb, 0x63, 0x0f, 0xed, 0xc6, 0xc0, 0x7d, + 0xd8, 0x6f, 0xe8, 0x7a, 0xa0, 0xf3, 0x56, 0x33, 0xc6, 0xe7, 0x84, 0x74, 0x81, 0xaf, 0x3c, 0xb2, + 0x58, 0x97, 0x65, 0x64, 0x6b, 0xd8, 0x65, 0x3e, 0xe7, 0x6c, 0x71, 0x63, 0x52, 0x49, 0x43, 0x7e, + 0x04, 0xcb, 0x31, 0x87, 0x2b, 0x39, 0x2f, 0x42, 0xce, 0x67, 0x5e, 0x3f, 0x2c, 0x13, 0x86, 0xc9, + 0xb6, 0x05, 0xbe, 0xb9, 0xa0, 0xe4, 0xcb, 0x49, 0x98, 0x2a, 0xfa, 0xd9, 0x4e, 0xff, 0x23, 0x82, + 0xd2, 0x4d, 0x7b, 0x87, 0x0d, 0x74, 0xa4, 0x61, 0xb0, 0x86, 0xb6, 0xc3, 0x94, 0xc5, 0xc5, 0x37, + 0x4f, 0xfb, 0x8f, 0xed, 0xc1, 0x98, 0x49, 0x96, 0x79, 0xaa, 0xa0, 0x59, 0x73, 0x16, 0xbd, 0x70, + 0xce, 0xa2, 0x28, 0xf2, 0xca, 0x90, 0x7d, 0xc4, 0x0d, 0x25, 0xf2, 0xb5, 0x40, 0x25, 0x40, 0xce, + 0xc0, 0xbc, 0xd2, 0x42, 0x99, 0x2f, 0x12, 0x99, 0x9b, 0xaf, 0xa0, 0x45, 0x26, 0x0e, 0xe4, 0xa4, + 0xb5, 0xf1, 0xeb, 0x50, 0x08, 0x7b, 0x80, 0xd0, 0x36, 0xd3, 0xcc, 0xed, 0x4f, 0x6a, 0xe9, 0xc0, + 0xa7, 0xd1, 0x02, 0xae, 0x41, 0x56, 0xec, 0x14, 0x9a, 0xa3, 0x66, 0x61, 0x7f, 0x52, 0x93, 0x08, + 0x2a, 0x7f, 0xf0, 0x09, 0xb0, 0x7a, 0xbc, 0x0c, 0x73, 0x13, 0x58, 0xcd, 0xfc, 0xfe, 0xa4, 0x26, + 0x60, 0x2a, 0xfe, 0x92, 0x6b, 0x50, 0xba, 0xc9, 0xba, 0x76, 0x6b, 0x57, 0x1d, 0x5a, 0xd6, 0xec, + 0xf8, 0x81, 0x48, 0xf3, 0x38, 0x05, 0xa5, 0xf0, 0xc4, 0x07, 0x8e, 0xaf, 0x82, 0xba, 0x18, 0xe2, + 0x6e, 0xf9, 0xe4, 0xa7, 0x08, 0x94, 0x9f, 0x31, 0x81, 0xdc, 0x80, 0xeb, 0xea, 0x4b, 0x1f, 0x35, + 0x61, 0x7f, 0x52, 0x53, 0x18, 0xaa, 0x7e, 0xf1, 0x65, 0x98, 0xf3, 0xc5, 0x89, 0x9c, 0x59, 0x32, + 0x7c, 0xc4, 0x42, 0x73, 0x91, 0x87, 0xc1, 0xfe, 0xa4, 0xa6, 0x09, 0xa9, 0xfe, 0xc0, 0xf5, 0x58, + 0x7f, 0x91, 0x8a, 0x2d, 0xec, 0x4f, 0x6a, 0x06, 0xd6, 0xec, 0x37, 0xe4, 0x27, 0x08, 0x8a, 0xf7, + 0xec, 0x7e, 0x18, 0x42, 0xa1, 0x8b, 0x90, 0xe1, 0x22, 0x9e, 0xce, 0x6d, 0x36, 0xb0, 0x77, 0xaf, + 0xba, 0x9e, 0xe0, 0x39, 0x4f, 0x43, 0x38, 0x6a, 0x09, 0xd6, 0xd4, 0x96, 0x90, 0x9d, 0xb9, 0xb0, + 0x5d, 0xb7, 0xf2, 0xe9, 0xa5, 0x0c, 0xf9, 0x0d, 0x82, 0x92, 0x94, 0x4c, 0x85, 0xc5, 0x0f, 0x21, + 0x27, 0x05, 0x17, 0xb2, 0x7d, 0x49, 0xf2, 0x9f, 0x9b, 0x25, 0xf1, 0x15, 0x4f, 0xfc, 0x5d, 0x58, + 0x68, 0x7b, 0xee, 0x68, 0xc4, 0xda, 0xdb, 0xaa, 0xc4, 0xa4, 0x93, 0x25, 0x66, 0xd3, 0x5c, 0xa7, + 0x09, 0x72, 0xf2, 0x67, 0x04, 0xf3, 0x2a, 0x9b, 0x95, 0x2d, 0x43, 0x1b, 0xa0, 0x17, 0x2e, 0xee, + 0xe9, 0x59, 0x8b, 0xfb, 0x51, 0xc8, 0x75, 0x3d, 0x77, 0x3c, 0xf2, 0x57, 0x32, 0x32, 0x77, 0x24, + 0x34, 0x5b, 0xd1, 0x27, 0xd7, 0x61, 0x41, 0xab, 0x72, 0x48, 0x49, 0xab, 0x24, 0x4b, 0xda, 0x56, + 0x9b, 0x0d, 0x83, 0x7e, 0xa7, 0x1f, 0x16, 0x29, 0x45, 0x4f, 0x3e, 0x45, 0xb0, 0x94, 0x24, 0xc1, + 0xdf, 0x31, 0xf2, 0x80, 0xb3, 0x3b, 0x7d, 0x38, 0xbb, 0xba, 0x28, 0x0e, 0xfe, 0x7b, 0xc3, 0xc0, + 0xdb, 0xd5, 0x39, 0x52, 0x79, 0x07, 0x8a, 0x06, 0x9a, 0x37, 0x8f, 0x87, 0x4c, 0xc7, 0x2c, 0xff, + 0x8c, 0x92, 0x35, 0x2d, 0xe3, 0x58, 0x00, 0x97, 0xd2, 0x17, 0x11, 0x8f, 0xf8, 0xf9, 0x98, 0x27, + 0xf1, 0x45, 0xb0, 0x3a, 0x9e, 0xeb, 0xcc, 0xe4, 0x26, 0xb1, 0x03, 0x7f, 0x03, 0xd2, 0x81, 0x3b, + 0x93, 0x93, 0xd2, 0x81, 0xcb, 0x7d, 0xa4, 0x94, 0xcf, 0xc8, 0x1b, 0x9a, 0x84, 0xc8, 0xaf, 0x11, + 0x2c, 0xf2, 0x3d, 0xd2, 0x02, 0x1b, 0xbd, 0xf1, 0xf0, 0x21, 0x5e, 0x83, 0x25, 0x7e, 0xd2, 0x83, + 0xbe, 0xea, 0x00, 0x0f, 0xfa, 0x6d, 0xa5, 0xe6, 0x02, 0xc7, 0xeb, 0xc6, 0xb0, 0xd5, 0xc6, 0xc7, + 0x60, 0x6e, 0xec, 0x4b, 0x02, 0xa9, 0x73, 0x8e, 0x83, 0x5b, 0x6d, 0x7c, 0xce, 0x38, 0x8e, 0xdb, + 0xda, 0xb8, 0x26, 0x09, 0x1b, 0xde, 0xb5, 0xfb, 0x5e, 0x58, 0x7c, 0xce, 0x40, 0xae, 0xc5, 0x0f, + 0x96, 0x71, 0xc2, 0x3b, 0x50, 0x48, 0x2c, 0x04, 0xa2, 0x6a, 0x99, 0x7c, 0x13, 0x0a, 0xe1, 0xee, + 0xa9, 0x8d, 0x67, 0xaa, 0x07, 0xc8, 0x65, 0x58, 0x94, 0x45, 0x75, 0xfa, 0xe6, 0xd2, 0xb4, 0xcd, + 0x25, 0xbd, 0xf9, 0x38, 0x64, 0xa5, 0x55, 0x30, 0x58, 0x6d, 0x3b, 0xb0, 0xf5, 0x16, 0xfe, 0x4d, + 0x56, 0xe0, 0xe8, 0x3d, 0xcf, 0x1e, 0xfa, 0x1d, 0xe6, 0x09, 0xa2, 0x30, 0x76, 0xc9, 0x6b, 0xb0, + 0xcc, 0x0b, 0x09, 0xf3, 0xfc, 0x0d, 0x77, 0x3c, 0x0c, 0xf4, 0x45, 0xff, 0x3c, 0x94, 0xe3, 0x68, + 0x15, 0xea, 0x65, 0xc8, 0xb6, 0x38, 0x42, 0x70, 0x9f, 0xa7, 0x12, 0x20, 0xbf, 0x40, 0x80, 0xaf, + 0xb1, 0x40, 0xb0, 0xde, 0xda, 0xf4, 0x8d, 0xcb, 0x9d, 0x63, 0x07, 0xad, 0x1e, 0xf3, 0x7c, 0x7d, + 0xd1, 0xd1, 0xf0, 0xff, 0xe2, 0x72, 0x47, 0x2e, 0xc0, 0x72, 0x4c, 0x4a, 0xa5, 0x53, 0x05, 0xf2, + 0x2d, 0x85, 0x53, 0x4d, 0x35, 0x84, 0xc9, 0x6f, 0xd3, 0x90, 0x97, 0xbe, 0x65, 0x1d, 0x7c, 0x01, + 0x8a, 0x1d, 0x1e, 0x6b, 0xde, 0xc8, 0xeb, 0x2b, 0x13, 0x58, 0xcd, 0xc5, 0xfd, 0x49, 0xcd, 0x44, + 0x53, 0x13, 0xc0, 0x6f, 0x24, 0x02, 0xaf, 0x59, 0xde, 0x9b, 0xd4, 0x72, 0xdf, 0xe3, 0xc1, 0xb7, + 0xc9, 0xdb, 0x9b, 0x08, 0xc3, 0xcd, 0x30, 0x1c, 0x6f, 0xa8, 0x6c, 0x13, 0x37, 0xbd, 0xe6, 0xb7, + 0xb8, 0xf8, 0x89, 0x7a, 0x3d, 0xf2, 0x5c, 0x87, 0x05, 0x3d, 0x36, 0xf6, 0x1b, 0x2d, 0xd7, 0x71, + 0xdc, 0x61, 0x43, 0x8c, 0x75, 0x42, 0x69, 0xde, 0xa3, 0xf9, 0x76, 0x95, 0x80, 0xf7, 0x60, 0x2e, + 0xe8, 0x79, 0xee, 0xb8, 0xdb, 0x13, 0xed, 0x27, 0xd3, 0xbc, 0x34, 0x3b, 0x3f, 0xcd, 0x81, 0xea, + 0x0f, 0x7c, 0x8a, 0x5b, 0x8b, 0xb5, 0x1e, 0xfa, 0x63, 0x47, 0x0e, 0x4b, 0xcd, 0xec, 0xfe, 0xa4, + 0x86, 0xde, 0xa0, 0x21, 0x9a, 0x7c, 0x9a, 0x86, 0x9a, 0x08, 0xe1, 0xfb, 0xe2, 0x6e, 0x72, 0xd5, + 0xf5, 0x6e, 0xb1, 0xc0, 0xeb, 0xb7, 0x6e, 0xdb, 0x0e, 0xd3, 0xb1, 0x51, 0x83, 0xa2, 0x23, 0x90, + 0x0f, 0x8c, 0xe4, 0x00, 0x27, 0xa4, 0xc3, 0x27, 0x01, 0x44, 0xda, 0xc9, 0x75, 0x99, 0x27, 0x05, + 0x81, 0x11, 0xcb, 0x1b, 0x31, 0x4b, 0x35, 0x66, 0xd4, 0x4c, 0x59, 0x68, 0x2b, 0x69, 0xa1, 0x99, + 0xf9, 0x84, 0x66, 0x31, 0x63, 0x3d, 0x1b, 0x8f, 0x75, 0xf2, 0x17, 0x04, 0xd5, 0x9b, 0x5a, 0xf2, + 0x17, 0x34, 0x87, 0xd6, 0x37, 0xfd, 0x92, 0xf4, 0xcd, 0xfc, 0x77, 0xfa, 0x92, 0x3f, 0x18, 0x29, + 0x4f, 0x59, 0x47, 0xeb, 0xb1, 0x61, 0xb4, 0x8b, 0x97, 0x21, 0x66, 0xfa, 0x25, 0xba, 0x25, 0x93, + 0x70, 0xcb, 0xbb, 0x51, 0x39, 0x10, 0x1a, 0xa8, 0x72, 0x70, 0x1a, 0x2c, 0x8f, 0x75, 0x74, 0xf3, + 0xc5, 0xc9, 0x1a, 0xcf, 0x3a, 0x54, 0xac, 0x93, 0xdf, 0x21, 0x58, 0xba, 0xc6, 0x82, 0xf8, 0xb5, + 0xe6, 0x55, 0xd2, 0xff, 0x7d, 0x38, 0x62, 0xc8, 0xaf, 0xb4, 0x7f, 0x2b, 0x71, 0x97, 0x79, 0x2d, + 0xd2, 0x7f, 0x6b, 0xd8, 0x66, 0x9f, 0xa8, 0x19, 0x2d, 0x7e, 0x8d, 0xb9, 0x0b, 0x45, 0x63, 0x11, + 0x5f, 0x49, 0x5c, 0x60, 0xa6, 0x35, 0xd5, 0x66, 0x59, 0xe9, 0x24, 0xa7, 0x34, 0x75, 0x3d, 0x0d, + 0xdb, 0xfd, 0x36, 0x60, 0x31, 0x36, 0x0a, 0xb6, 0x66, 0xa5, 0x16, 0xd8, 0x1b, 0xe1, 0x7d, 0x26, + 0x84, 0xf1, 0x29, 0xb0, 0x3c, 0xf7, 0xb1, 0xbe, 0x99, 0xce, 0x47, 0x47, 0x52, 0xf7, 0x31, 0x15, + 0x4b, 0xe4, 0x32, 0x64, 0xa8, 0xfb, 0x18, 0x57, 0x01, 0x3c, 0x7b, 0xd8, 0x65, 0xf7, 0xc3, 0x81, + 0xa5, 0x44, 0x0d, 0xcc, 0x21, 0xfd, 0x75, 0x03, 0x8e, 0x98, 0x12, 0x49, 0x77, 0xd7, 0x61, 0x8e, + 0x23, 0xfb, 0xd3, 0x1e, 0xbd, 0x04, 0xa1, 0x9c, 0x7d, 0x35, 0x11, 0x8f, 0x19, 0x88, 0xf0, 0xf8, + 0x04, 0x14, 0x02, 0x7b, 0x67, 0xc0, 0x6e, 0x47, 0x39, 0x1f, 0x21, 0xf8, 0x2a, 0x9f, 0xb5, 0xee, + 0x1b, 0x17, 0x85, 0x08, 0x81, 0xcf, 0xc2, 0x52, 0x24, 0xf3, 0x5d, 0x8f, 0x75, 0xfa, 0x9f, 0x08, + 0x0f, 0x97, 0xe8, 0x01, 0x3c, 0x5e, 0x83, 0xc5, 0x08, 0xb7, 0x2d, 0xda, 0xae, 0x25, 0x48, 0x93, + 0x68, 0x6e, 0x1b, 0xa1, 0xee, 0x7b, 0x8f, 0xc6, 0xf6, 0x40, 0x14, 0xb2, 0x12, 0x35, 0x30, 0xe4, + 0xf7, 0x08, 0x8e, 0x48, 0x57, 0xf3, 0x29, 0xfb, 0x55, 0x8c, 0xfa, 0x5f, 0x22, 0xc0, 0xa6, 0x06, + 0x2a, 0xb4, 0xfe, 0xdf, 0x7c, 0x3e, 0xe1, 0x7d, 0xbd, 0x28, 0x46, 0x48, 0x89, 0x8a, 0x5e, 0x40, + 0x48, 0x78, 0x05, 0x14, 0xef, 0x8e, 0x72, 0x46, 0x95, 0x18, 0x7d, 0xfb, 0xe3, 0xa3, 0xf5, 0xce, + 0x6e, 0xc0, 0x7c, 0x35, 0x61, 0x8a, 0xd1, 0x5a, 0x20, 0xa8, 0xfc, 0xe1, 0x67, 0xb1, 0x61, 0x20, + 0xa2, 0xc6, 0x8a, 0xce, 0x52, 0x28, 0xaa, 0x3f, 0xc8, 0x3f, 0x10, 0xcc, 0xdf, 0x77, 0x07, 0xe3, + 0xa8, 0x4b, 0xbc, 0x42, 0x76, 0x8e, 0x8f, 0xbe, 0x59, 0x3d, 0xfa, 0x62, 0xb0, 0xfc, 0x80, 0x8d, + 0x44, 0x64, 0x65, 0xa8, 0xf8, 0x26, 0x3f, 0x80, 0x05, 0xad, 0xa6, 0x72, 0xc6, 0x9b, 0x30, 0xf7, + 0xb1, 0xc0, 0x4c, 0x79, 0x24, 0x92, 0xa4, 0xaa, 0x00, 0x69, 0xb2, 0xf8, 0xdb, 0xab, 0x3e, 0x8d, + 0x5c, 0x87, 0x9c, 0x24, 0xc7, 0x27, 0xcc, 0x4b, 0xb8, 0x7c, 0xcd, 0xe0, 0xb0, 0xba, 0x51, 0x13, + 0xc8, 0x49, 0x46, 0xca, 0x65, 0xc2, 0xab, 0x12, 0x43, 0xd5, 0xef, 0xd9, 0xd3, 0x50, 0x08, 0x1f, + 0x4e, 0x71, 0x11, 0xe6, 0xae, 0xde, 0xa1, 0xdf, 0xbf, 0x42, 0x37, 0x97, 0x52, 0xb8, 0x04, 0xf9, + 0xe6, 0x95, 0x8d, 0x1b, 0x02, 0x42, 0xeb, 0xff, 0xb2, 0x74, 0x4d, 0xf0, 0xf0, 0xb7, 0x21, 0x2b, + 0x13, 0xfd, 0x68, 0x24, 0xbf, 0xf9, 0xfc, 0x59, 0x39, 0x76, 0x00, 0xaf, 0xae, 0xe5, 0xa9, 0x37, + 0x11, 0xbe, 0x0d, 0x45, 0x81, 0x54, 0x4f, 0x2c, 0x27, 0x92, 0x2f, 0x1d, 0x31, 0x4e, 0x27, 0x0f, + 0x59, 0x35, 0xf8, 0x5d, 0x82, 0xac, 0xa8, 0xb5, 0xa6, 0x34, 0xe6, 0x13, 0x99, 0x29, 0x4d, 0xec, + 0xd1, 0x89, 0xa4, 0xf0, 0x3b, 0x60, 0xf1, 0x79, 0x00, 0x1b, 0xed, 0xc0, 0x78, 0x19, 0xa9, 0x1c, + 0x4d, 0xa2, 0x8d, 0x63, 0xdf, 0x0d, 0x1f, 0x78, 0x8e, 0x25, 0x07, 0x59, 0xbd, 0x7d, 0xe5, 0xe0, + 0x42, 0x78, 0xf2, 0x1d, 0xf9, 0xd2, 0xa1, 0x27, 0x11, 0x7c, 0x32, 0x7e, 0x54, 0x62, 0x70, 0xa9, + 0x54, 0x0f, 0x5b, 0x0e, 0x19, 0xde, 0x84, 0xa2, 0x31, 0x05, 0x98, 0x66, 0x3d, 0x38, 0xc2, 0x98, + 0x66, 0x9d, 0x32, 0x3a, 0x90, 0x14, 0xbe, 0x06, 0x79, 0xde, 0x44, 0x79, 0x2d, 0xc1, 0xc7, 0x93, + 0xbd, 0xd2, 0xa8, 0x91, 0x95, 0x13, 0xd3, 0x17, 0x43, 0x46, 0x57, 0x61, 0x31, 0xec, 0xc6, 0x2a, + 0x68, 0x8f, 0x25, 0xa3, 0x7e, 0x8a, 0xbd, 0xe2, 0x99, 0x43, 0x52, 0xeb, 0x1f, 0x41, 0x5e, 0x0f, + 0xbe, 0xf8, 0x03, 0x58, 0x88, 0x8f, 0x7d, 0xf8, 0xff, 0x0c, 0xf3, 0xc4, 0xa7, 0xe9, 0xca, 0xaa, + 0xb1, 0x34, 0x7d, 0x56, 0x4c, 0xad, 0xa1, 0xf5, 0x8f, 0xf4, 0x7f, 0x6c, 0x36, 0xed, 0xc0, 0xc6, + 0x77, 0x60, 0x41, 0x68, 0x1f, 0xfe, 0x4b, 0x27, 0x16, 0xa5, 0x07, 0xfe, 0x7f, 0x14, 0x8b, 0xd2, + 0x83, 0xff, 0x47, 0x22, 0xa9, 0xe6, 0x87, 0x4f, 0x9f, 0x55, 0x53, 0x9f, 0x3f, 0xab, 0xa6, 0xbe, + 0x78, 0x56, 0x45, 0x3f, 0xde, 0xab, 0xa2, 0x5f, 0xed, 0x55, 0xd1, 0x93, 0xbd, 0x2a, 0x7a, 0xba, + 0x57, 0x45, 0x7f, 0xdf, 0xab, 0xa2, 0x7f, 0xee, 0x55, 0x53, 0x5f, 0xec, 0x55, 0xd1, 0x67, 0xcf, + 0xab, 0xa9, 0xa7, 0xcf, 0xab, 0xa9, 0xcf, 0x9f, 0x57, 0x53, 0x1f, 0xbe, 0xfe, 0x65, 0x0f, 0x5a, + 0xfa, 0xc4, 0x9d, 0x9c, 0xf8, 0x79, 0xeb, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x4e, 0x23, 0x5c, + 0x1e, 0x70, 0x1b, 0x00, 0x00, } func (x Direction) String() string { @@ -3997,6 +4006,9 @@ func (this *VolumeRequest) Equal(that interface{}) bool { if this.Limit != that1.Limit { return false } + if this.Step != that1.Step { + return false + } return true } func (this *VolumeResponse) Equal(that interface{}) bool { @@ -4595,12 +4607,13 @@ func (this *VolumeRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 8) + s := make([]string, 0, 9) s = append(s, "&logproto.VolumeRequest{") s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n") s = append(s, "Through: "+fmt.Sprintf("%#v", this.Through)+",\n") s = append(s, "Matchers: "+fmt.Sprintf("%#v", this.Matchers)+",\n") s = append(s, "Limit: "+fmt.Sprintf("%#v", this.Limit)+",\n") + s = append(s, "Step: "+fmt.Sprintf("%#v", this.Step)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -7104,6 +7117,11 @@ func (m *VolumeRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Step != 0 { + i = encodeVarintLogproto(dAtA, i, uint64(m.Step)) + i-- + dAtA[i] = 0x28 + } if m.Limit != 0 { i = encodeVarintLogproto(dAtA, i, uint64(m.Limit)) i-- @@ -8026,6 +8044,9 @@ func (m *VolumeRequest) Size() (n int) { if m.Limit != 0 { n += 1 + sovLogproto(uint64(m.Limit)) } + if m.Step != 0 { + n += 1 + sovLogproto(uint64(m.Step)) + } return n } @@ -8633,6 +8654,7 @@ func (this *VolumeRequest) String() string { `Through:` + fmt.Sprintf("%v", this.Through) + `,`, `Matchers:` + fmt.Sprintf("%v", this.Matchers) + `,`, `Limit:` + fmt.Sprintf("%v", this.Limit) + `,`, + `Step:` + fmt.Sprintf("%v", this.Step) + `,`, `}`, }, "") return s @@ -14123,6 +14145,25 @@ func (m *VolumeRequest) Unmarshal(dAtA []byte) error { break } } + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Step", wireType) + } + m.Step = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Step |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipLogproto(dAtA[iNdEx:]) diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index f5ba5ceb01..614444d873 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -392,6 +392,7 @@ message VolumeRequest { ]; string matchers = 3; int32 limit = 4; + int64 step = 5; } message VolumeResponse { diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index ec1d165214..d9586603c5 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -415,9 +415,10 @@ func (t *Loki) initQuerier() (services.Service, error) { "/loki/api/v1/labels": labelsHTTPMiddleware.Wrap(http.HandlerFunc(t.querierAPI.LabelHandler)), "/loki/api/v1/label/{name}/values": labelsHTTPMiddleware.Wrap(http.HandlerFunc(t.querierAPI.LabelHandler)), - "/loki/api/v1/series": querier.WrapQuerySpanAndTimeout("query.Series", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.SeriesHandler)), - "/loki/api/v1/index/stats": indexStatsHTTPMiddleware.Wrap(http.HandlerFunc(t.querierAPI.IndexStatsHandler)), - "/loki/api/v1/index/series_volume": querier.WrapQuerySpanAndTimeout("query.SeriesVolume", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.SeriesVolumeHandler)), + "/loki/api/v1/series": querier.WrapQuerySpanAndTimeout("query.Series", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.SeriesHandler)), + "/loki/api/v1/index/stats": indexStatsHTTPMiddleware.Wrap(http.HandlerFunc(t.querierAPI.IndexStatsHandler)), + "/loki/api/v1/index/series_volume": querier.WrapQuerySpanAndTimeout("query.SeriesVolumeInstant", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.SeriesVolumeInstantHandler)), + "/loki/api/v1/index/series_volume_range": querier.WrapQuerySpanAndTimeout("query.SeriesVolumeRange", t.querierAPI).Wrap(http.HandlerFunc(t.querierAPI.SeriesVolumeRangeHandler)), "/api/prom/query": middleware.Merge( httpMiddleware, @@ -879,6 +880,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { t.Server.HTTP.Path("/loki/api/v1/series").Methods("GET", "POST").Handler(frontendHandler) t.Server.HTTP.Path("/loki/api/v1/index/stats").Methods("GET", "POST").Handler(frontendHandler) t.Server.HTTP.Path("/loki/api/v1/index/series_volume").Methods("GET", "POST").Handler(frontendHandler) + t.Server.HTTP.Path("/loki/api/v1/index/series_volume_range").Methods("GET", "POST").Handler(frontendHandler) t.Server.HTTP.Path("/api/prom/query").Methods("GET", "POST").Handler(frontendHandler) t.Server.HTTP.Path("/api/prom/label").Methods("GET", "POST").Handler(frontendHandler) t.Server.HTTP.Path("/api/prom/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 81f6b26d9c..390d0e07e8 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -34,7 +34,6 @@ import ( util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/marshal" marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy" - "github.com/grafana/loki/pkg/util/server" serverutil "github.com/grafana/loki/pkg/util/server" "github.com/grafana/loki/pkg/util/spanlogger" util_validation "github.com/grafana/loki/pkg/util/validation" @@ -224,7 +223,7 @@ func (q *QuerierAPI) LabelHandler(w http.ResponseWriter, r *http.Request) { status := 200 if err != nil { - status, _ = server.ClientHTTPStatusAndError(err) + status, _ = serverutil.ClientHTTPStatusAndError(err) } logql.RecordLabelQueryMetrics(ctx, log, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult) @@ -399,7 +398,7 @@ func (q *QuerierAPI) SeriesHandler(w http.ResponseWriter, r *http.Request) { status := 200 if err != nil { - status, _ = server.ClientHTTPStatusAndError(err) + status, _ = serverutil.ClientHTTPStatusAndError(err) } logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), statResult) @@ -442,9 +441,13 @@ func (q *QuerierAPI) IndexStatsHandler(w http.ResponseWriter, r *http.Request) { } } -// SeriesVolumeHandler queries the index label volumes related to the passed matchers -func (q *QuerierAPI) SeriesVolumeHandler(w http.ResponseWriter, r *http.Request) { - rawReq, err := loghttp.ParseSeriesVolumeQuery(r) +//TODO(trevorwhitney): add test for the handler split + +// SeriesVolumeRangeHandler queries the index label volumes related to the passed matchers and given time range. +// Returns N values where N is the time range / step. +func (q *QuerierAPI) SeriesVolumeRangeHandler(w http.ResponseWriter, r *http.Request) { + rawReq, err := loghttp.ParseSeriesVolumeRangeQuery(r) + if err != nil { serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w) return @@ -454,10 +457,36 @@ func (q *QuerierAPI) SeriesVolumeHandler(w http.ResponseWriter, r *http.Request) From: model.TimeFromUnixNano(rawReq.Start.UnixNano()), Through: model.TimeFromUnixNano(rawReq.End.UnixNano()), Matchers: rawReq.Query, + Step: rawReq.Step.Milliseconds(), Limit: int32(rawReq.Limit), } - resp, err := q.querier.SeriesVolume(r.Context(), req) + q.seriesVolumeHandler(r.Context(), req, w) +} + +// SeriesVolumeInstantHandler queries the index label volumes related to the passed matchers and given time range. +// Returns a single value for the time range. +func (q *QuerierAPI) SeriesVolumeInstantHandler(w http.ResponseWriter, r *http.Request) { + rawReq, err := loghttp.ParseSeriesVolumeInstantQuery(r) + + if err != nil { + serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w) + return + } + + req := &logproto.VolumeRequest{ + From: model.TimeFromUnixNano(rawReq.Start.UnixNano()), + Through: model.TimeFromUnixNano(rawReq.End.UnixNano()), + Matchers: rawReq.Query, + Step: 0, + Limit: int32(rawReq.Limit), + } + + q.seriesVolumeHandler(r.Context(), req, w) +} + +func (q *QuerierAPI) seriesVolumeHandler(ctx context.Context, req *logproto.VolumeRequest, w http.ResponseWriter) { + resp, err := q.querier.SeriesVolume(ctx, req) if err != nil { serverutil.WriteError(err, w) return diff --git a/pkg/querier/http_test.go b/pkg/querier/http_test.go index d790a1f69f..2dc9288a82 100644 --- a/pkg/querier/http_test.go +++ b/pkg/querier/http_test.go @@ -199,79 +199,146 @@ func TestQueryWrapperMiddleware(t *testing.T) { } func TestSeriesVolumeHandler(t *testing.T) { - t.Run("it returns label volumes from the querier", func(t *testing.T) { - ret := &logproto.VolumeResponse{ - Volumes: []logproto.Volume{ - {Name: `{foo="bar"}`, Volume: 38}, - }, - } - - querier := newQuerierMock() - querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(ret, nil) + ret := &logproto.VolumeResponse{ + Volumes: []logproto.Volume{ + {Name: `{foo="bar"}`, Volume: 38}, + }, + } - api := NewQuerierAPI(Config{}, querier, nil, log.NewNopLogger()) + setupAPI := func(querier *querierMock) *QuerierAPI { + return NewQuerierAPI(Config{}, querier, nil, log.NewNopLogger()) + } - req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil) + makeRequest := func(handler http.HandlerFunc, req *http.Request) *httptest.ResponseRecorder { err := req.ParseForm() require.NoError(t, err) w := httptest.NewRecorder() - api.SeriesVolumeHandler(w, req) + handler(w, req) + return w + } - calls := querier.GetMockedCallsByMethod("SeriesVolume") - require.Len(t, calls, 1) - require.Equal(t, &logproto.VolumeRequest{ - From: 0, - Through: 1000, - Matchers: `{foo="bar"}`, - Limit: 100, - }, calls[0].Arguments[1]) - - require.Equal( - t, - `{"volumes":[{"name":"{foo=\"bar\"}","volume":38}]}`, - strings.TrimSpace(w.Body.String()), - ) - require.Equal(t, http.StatusOK, w.Result().StatusCode) + t.Run("shared beavhior between range and instant queries", func(t *testing.T) { + for _, tc := range []struct { + mode string + handler func(api *QuerierAPI) http.HandlerFunc + }{ + {mode: "instant", handler: func(api *QuerierAPI) http.HandlerFunc { return api.SeriesVolumeInstantHandler }}, + {mode: "range", handler: func(api *QuerierAPI) http.HandlerFunc { return api.SeriesVolumeRangeHandler }}, + } { + t.Run(fmt.Sprintf("%s queries return label volumes from the querier", tc.mode), func(t *testing.T) { + querier := newQuerierMock() + querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(ret, nil) + api := setupAPI(querier) + + req := httptest.NewRequest(http.MethodGet, "/series_volume"+ + "?start=0"+ + "&end=1"+ + "&query=%7Bfoo%3D%22bar%22%7D", nil) + + w := makeRequest(tc.handler(api), req) + + calls := querier.GetMockedCallsByMethod("SeriesVolume") + require.Len(t, calls, 1) + + request := calls[0].Arguments[1].(*logproto.VolumeRequest) + require.Equal(t, `{foo="bar"}`, request.Matchers) + + require.Equal( + t, + `{"volumes":[{"name":"{foo=\"bar\"}","volume":38}]}`, + strings.TrimSpace(w.Body.String()), + ) + require.Equal(t, http.StatusOK, w.Result().StatusCode) + }) + + t.Run(fmt.Sprintf("%s queries return nothing when a store doesn't support label volumes", tc.mode), func(t *testing.T) { + querier := newQuerierMock() + querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(nil, nil) + api := setupAPI(querier) + + req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil) + w := makeRequest(tc.handler(api), req) + + calls := querier.GetMockedCallsByMethod("SeriesVolume") + require.Len(t, calls, 1) + + require.Equal(t, strings.TrimSpace(w.Body.String()), `{"volumes":[]}`) + require.Equal(t, http.StatusOK, w.Result().StatusCode) + }) + + t.Run(fmt.Sprintf("%s queries return error when there's an error in the querier", tc.mode), func(t *testing.T) { + err := errors.New("something bad") + querier := newQuerierMock() + querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(nil, err) + + api := setupAPI(querier) + + req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil) + w := makeRequest(tc.handler(api), req) + + calls := querier.GetMockedCallsByMethod("SeriesVolume") + require.Len(t, calls, 1) + + require.Equal(t, strings.TrimSpace(w.Body.String()), `something bad`) + require.Equal(t, http.StatusInternalServerError, w.Result().StatusCode) + }) + } }) - t.Run("it returns nothing when a store doesn't support label volumes", func(t *testing.T) { + t.Run("instant queries set a step of 0", func(t *testing.T) { querier := newQuerierMock() - querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(nil, nil) - - api := NewQuerierAPI(Config{}, querier, nil, log.NewNopLogger()) - - req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil) - err := req.ParseForm() - require.NoError(t, err) + querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(ret, nil) + api := setupAPI(querier) - w := httptest.NewRecorder() - api.SeriesVolumeHandler(w, req) + req := httptest.NewRequest(http.MethodGet, "/series_volume"+ + "?start=0"+ + "&end=1"+ + "&step=42"+ + "&query=%7Bfoo%3D%22bar%22%7D", nil) + makeRequest(api.SeriesVolumeInstantHandler, req) calls := querier.GetMockedCallsByMethod("SeriesVolume") require.Len(t, calls, 1) - require.Equal(t, strings.TrimSpace(w.Body.String()), `{"volumes":[]}`) - require.Equal(t, http.StatusOK, w.Result().StatusCode) + request := calls[0].Arguments[1].(*logproto.VolumeRequest) + require.Equal(t, int64(0), request.Step) }) - t.Run("it returns error when there's an error in the querier", func(t *testing.T) { + t.Run("range queries parse step from request", func(t *testing.T) { querier := newQuerierMock() - querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(nil, errors.New("something bad")) + querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(ret, nil) + api := setupAPI(querier) - api := NewQuerierAPI(Config{}, querier, nil, log.NewNopLogger()) + req := httptest.NewRequest(http.MethodGet, "/series_volume"+ + "?start=0"+ + "&end=1"+ + "&step=42"+ + "&query=%7Bfoo%3D%22bar%22%7D", nil) + makeRequest(api.SeriesVolumeRangeHandler, req) - req := httptest.NewRequest(http.MethodGet, "/series_volume?start=0&end=1&query=%7Bfoo%3D%22bar%22%7D", nil) - err := req.ParseForm() - require.NoError(t, err) + calls := querier.GetMockedCallsByMethod("SeriesVolume") + require.Len(t, calls, 1) - w := httptest.NewRecorder() - api.SeriesVolumeHandler(w, req) + request := calls[0].Arguments[1].(*logproto.VolumeRequest) + require.Equal(t, (42 * time.Second).Milliseconds(), request.Step) + }) + + t.Run("range queries provide default step when not provided", func(t *testing.T) { + querier := newQuerierMock() + querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(ret, nil) + api := setupAPI(querier) + + req := httptest.NewRequest(http.MethodGet, "/series_volume"+ + "?start=0"+ + "&end=1"+ + "&query=%7Bfoo%3D%22bar%22%7D", nil) + makeRequest(api.SeriesVolumeRangeHandler, req) calls := querier.GetMockedCallsByMethod("SeriesVolume") require.Len(t, calls, 1) - require.Equal(t, strings.TrimSpace(w.Body.String()), `something bad`) - require.Equal(t, http.StatusInternalServerError, w.Result().StatusCode) + request := calls[0].Arguments[1].(*logproto.VolumeRequest) + require.Equal(t, time.Second.Milliseconds(), request.Step) }) } diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index fe430d8b03..bd4a3b850d 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -272,7 +272,7 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer Matchers: req.Query, }, err case SeriesVolumeOp: - req, err := loghttp.ParseSeriesVolumeQuery(r) + req, err := loghttp.ParseSeriesVolumeInstantQuery(r) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } @@ -282,6 +282,20 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer Through: through, Matchers: req.Query, Limit: int32(req.Limit), + Step: 0, + }, err + case SeriesVolumeRangeOp: + req, err := loghttp.ParseSeriesVolumeRangeQuery(r) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + from, through := util.RoundToMilliseconds(req.Start, req.End) + return &logproto.VolumeRequest{ + From: from, + Through: through, + Matchers: req.Query, + Limit: int32(req.Limit), + Step: req.Step.Milliseconds(), }, err default: return nil, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf("unknown request path: %s", r.URL.Path)) @@ -421,9 +435,19 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http "query": []string{request.GetQuery()}, "limit": []string{fmt.Sprintf("%d", request.Limit)}, } - u := &url.URL{ - Path: "/loki/api/v1/index/series_volume", - RawQuery: params.Encode(), + + var u *url.URL + if request.Step != 0 { + params["step"] = []string{fmt.Sprintf("%f", float64(request.Step)/float64(1e3))} + u = &url.URL{ + Path: "/loki/api/v1/index/series_volume_range", + RawQuery: params.Encode(), + } + } else { + u = &url.URL{ + Path: "/loki/api/v1/index/series_volume", + RawQuery: params.Encode(), + } } req := &http.Request{ Method: "GET", diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index a10c9ee9c0..c6fed1d9a0 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -107,12 +107,14 @@ func Test_codec_DecodeRequest(t *testing.T) { Through: model.TimeFromUnixNano(end.UnixNano()), Matchers: `{job="foo"}`, Limit: 3, + Step: 0, }) }, &logproto.VolumeRequest{ From: model.TimeFromUnixNano(start.UnixNano()), Through: model.TimeFromUnixNano(end.UnixNano()), Matchers: `{job="foo"}`, Limit: 3, + Step: 0, }, false}, {"series_volume_default_limit", func() (*http.Request, error) { return LokiCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{ @@ -125,6 +127,36 @@ func Test_codec_DecodeRequest(t *testing.T) { Through: model.TimeFromUnixNano(end.UnixNano()), Matchers: `{job="foo"}`, Limit: 100, + Step: 0, + }, false}, + {"series_volume_range", func() (*http.Request, error) { + return LokiCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{ + From: model.TimeFromUnixNano(start.UnixNano()), + Through: model.TimeFromUnixNano(end.UnixNano()), + Matchers: `{job="foo"}`, + Limit: 3, + Step: 30 * 1e3, + }) + }, &logproto.VolumeRequest{ + From: model.TimeFromUnixNano(start.UnixNano()), + Through: model.TimeFromUnixNano(end.UnixNano()), + Matchers: `{job="foo"}`, + Limit: 3, + Step: 30 * 1e3, // step is expected in ms + }, false}, + {"series_volume_range_default_limit", func() (*http.Request, error) { + return LokiCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{ + From: model.TimeFromUnixNano(start.UnixNano()), + Through: model.TimeFromUnixNano(end.UnixNano()), + Matchers: `{job="foo"}`, + Step: 30 * 1e3, // step is expected in ms + }) + }, &logproto.VolumeRequest{ + From: model.TimeFromUnixNano(start.UnixNano()), + Through: model.TimeFromUnixNano(end.UnixNano()), + Matchers: `{job="foo"}`, + Limit: 100, + Step: 30 * 1e3, // step is expected in ms; default is 0 or no step }, false}, } for _, tt := range tests { @@ -436,6 +468,24 @@ func Test_codec_index_stats_EncodeRequest(t *testing.T) { require.Equal(t, `{job="foo"}`, got.URL.Query().Get("query")) } +func Test_codec_seriesVolume_EncodeRequest(t *testing.T) { + from, through := util.RoundToMilliseconds(start, end) + toEncode := &logproto.VolumeRequest{ + From: from, + Through: through, + Matchers: `{job="foo"}`, + Limit: 20, + Step: 30 * 1e6, + } + got, err := LokiCodec.EncodeRequest(context.Background(), toEncode) + require.Nil(t, err) + require.Equal(t, fmt.Sprintf("%d", from.UnixNano()), got.URL.Query().Get("start")) + require.Equal(t, fmt.Sprintf("%d", through.UnixNano()), got.URL.Query().Get("end")) + require.Equal(t, `{job="foo"}`, got.URL.Query().Get("query")) + require.Equal(t, "20", got.URL.Query().Get("limit")) + require.Equal(t, fmt.Sprintf("%f", float64(toEncode.Step/1e3)), got.URL.Query().Get("step")) +} + func Test_codec_EncodeResponse(t *testing.T) { tests := []struct { name string diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index a68325958d..a91089d7cc 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -7,9 +7,6 @@ import ( "strings" "time" - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" - "github.com/weaveworks/common/user" "github.com/go-kit/log" @@ -171,37 +168,37 @@ func NewTripperware( return nil, nil, err } - labelVolumeTripperware, err := NewSeriesVolumeTripperware(cfg, log, limits, schema, LokiCodec, statsCache, cacheGenNumLoader, retentionEnabled, metrics) + seriesVolumeTripperware, err := NewSeriesVolumeTripperware(cfg, log, limits, schema, LokiCodec, statsCache, cacheGenNumLoader, retentionEnabled, metrics) if err != nil { return nil, nil, err } return func(next http.RoundTripper) http.RoundTripper { var ( - metricRT = metricsTripperware(next) - limitedRT = limitedTripperware(next) - logFilterRT = logFilterTripperware(next) - seriesRT = seriesTripperware(next) - labelsRT = labelsTripperware(next) - instantRT = instantMetricTripperware(next) - statsRT = indexStatsTripperware(next) - labelVolumeRT = labelVolumeTripperware(next) + metricRT = metricsTripperware(next) + limitedRT = limitedTripperware(next) + logFilterRT = logFilterTripperware(next) + seriesRT = seriesTripperware(next) + labelsRT = labelsTripperware(next) + instantRT = instantMetricTripperware(next) + statsRT = indexStatsTripperware(next) + seriesVolumeRT = seriesVolumeTripperware(next) ) - return newRoundTripper(log, next, limitedRT, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, statsRT, labelVolumeRT, limits) + return newRoundTripper(log, next, limitedRT, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, statsRT, seriesVolumeRT, limits) }, StopperWrapper{resultsCache, statsCache}, nil } type roundTripper struct { logger log.Logger - next, limited, log, metric, series, labels, instantMetric, indexStats, labelVolume http.RoundTripper + next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume http.RoundTripper limits Limits } // newRoundTripper creates a new queryrange roundtripper -func newRoundTripper(logger log.Logger, next, limited, log, metric, series, labels, instantMetric, indexStats, labelVolume http.RoundTripper, limits Limits) roundTripper { +func newRoundTripper(logger log.Logger, next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume http.RoundTripper, limits Limits) roundTripper { return roundTripper{ logger: logger, limited: limited, @@ -212,7 +209,7 @@ func newRoundTripper(logger log.Logger, next, limited, log, metric, series, labe labels: labels, instantMetric: instantMetric, indexStats: indexStats, - labelVolume: labelVolume, + seriesVolume: seriesVolume, next: next, } } @@ -321,13 +318,32 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return r.indexStats.RoundTrip(req) case SeriesVolumeOp: - volumeQuery, err := loghttp.ParseSeriesVolumeQuery(req) + volumeQuery, err := loghttp.ParseSeriesVolumeInstantQuery(req) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + level.Info(logger).Log( + "msg", "executing query", + "type", "series_volume", + "query", volumeQuery.Query, + "length", volumeQuery.Start.Sub(volumeQuery.End), + "limit", volumeQuery.Limit) + + return r.seriesVolume.RoundTrip(req) + case SeriesVolumeRangeOp: + volumeQuery, err := loghttp.ParseSeriesVolumeRangeQuery(req) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - level.Info(logger).Log("msg", "executing query", "type", "series_volume", "query", volumeQuery.Query, "length", volumeQuery.End.Sub(volumeQuery.Start), "limit", volumeQuery.Limit) + level.Info(logger).Log( + "msg", "executing query", + "type", "series_volume_range", + "query", volumeQuery.Query, + "length", volumeQuery.End.Sub(volumeQuery.Start), + "step", volumeQuery.Step, + "limit", volumeQuery.Limit) - return r.labelVolume.RoundTrip(req) + return r.seriesVolume.RoundTrip(req) default: return r.next.RoundTrip(req) } @@ -353,12 +369,13 @@ func transformRegexQuery(req *http.Request, expr syntax.LogSelectorExpr) (syntax } const ( - InstantQueryOp = "instant_query" - QueryRangeOp = "query_range" - SeriesOp = "series" - LabelNamesOp = "labels" - IndexStatsOp = "index_stats" - SeriesVolumeOp = "series_volume" + InstantQueryOp = "instant_query" + QueryRangeOp = "query_range" + SeriesOp = "series" + LabelNamesOp = "labels" + IndexStatsOp = "index_stats" + SeriesVolumeOp = "series_volume" + SeriesVolumeRangeOp = "series_volume_range" ) func getOperation(path string) string { @@ -375,6 +392,8 @@ func getOperation(path string) string { return IndexStatsOp case path == "/loki/api/v1/index/series_volume": return SeriesVolumeOp + case path == "/loki/api/v1/index/series_volume_range": + return SeriesVolumeRangeOp default: return "" } @@ -782,63 +801,24 @@ func volumeRangeTripperware(codec queryrangebase.Codec, nextTW queryrangebase.Tr return nil, err } - resp, err := nextRT.RoundTrip(r) - if err != nil { - return nil, err + seriesVolumeMiddlewares := []queryrangebase.Middleware{ + NewSeriesVolumeMiddleware(), } - response, err := codec.DecodeResponse(r.Context(), resp, request) + // wrap nextRT with our new middleware + response, err := queryrangebase.MergeMiddlewares( + seriesVolumeMiddlewares..., + ).Wrap( + SeriesVolumeDownstreamHandler(nextRT, codec), + ).Do(r.Context(), request) + if err != nil { return nil, err } - promResp := toPrometheusResponse(response.(*VolumeResponse), model.Time(request.GetEnd())) - return codec.EncodeResponse(r.Context(), promResp) - }) - } -} - -func toPrometheusResponse(resp *VolumeResponse, ts model.Time) *LokiPromResponse { - headers := make([]*definitions.PrometheusResponseHeader, len(resp.Headers)) - for i, header := range resp.Headers { - h := header - headers[i] = &h - } - - promResponse := queryrangebase.PrometheusResponse{ - Status: loghttp.QueryStatusSuccess, - Data: toPrometheusSamples(resp, ts), - Headers: headers, - } - - return &LokiPromResponse{ - Response: &promResponse, - Statistics: stats.Result{}, - } -} - -func toPrometheusSamples(r *VolumeResponse, ts model.Time) queryrangebase.PrometheusData { - result := make([]queryrangebase.SampleStream, 0, len(r.Response.Volumes)) - for _, volume := range r.Response.Volumes { - lbls, err := syntax.ParseLabels(volume.Name) - if err != nil { - continue - } - - result = append(result, queryrangebase.SampleStream{ - Labels: logproto.FromLabelsToLabelAdapters(lbls), - Samples: []logproto.LegacySample{ - { - Value: float64(volume.Volume), - TimestampMs: ts.UnixNano() / 1e6, - }}, + return codec.EncodeResponse(r.Context(), response) }) } - - return queryrangebase.PrometheusData{ - ResultType: loghttp.ResultTypeVector, - Result: result, - } } func volumeFeatureFlagRoundTripper(nextTW queryrangebase.Tripperware, limits Limits) func(next http.RoundTripper) http.RoundTripper { diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 0d3294aa59..8eb8b61ee1 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -9,6 +9,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "sort" "strconv" "sync" "testing" @@ -32,6 +33,7 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/marshal" "github.com/grafana/loki/pkg/util/validation" @@ -550,71 +552,172 @@ func TestIndexStatsTripperware(t *testing.T) { } func TestSeriesVolumeTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, volumeEnabled: true}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil) - if stopper != nil { - defer stopper.Stop() - } - require.NoError(t, err) + t.Run("instant queries hardcode step to 0 and return a prometheus style vector response", func(t *testing.T) { + tpw, stopper, err := NewTripperware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, volumeEnabled: true}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil) + if stopper != nil { + defer stopper.Stop() + } + require.NoError(t, err) + + rt, err := newfakeRoundTripper() + require.NoError(t, err) + defer rt.Close() + + lreq := &logproto.VolumeRequest{ + Matchers: `{job="varlogs"}`, + From: model.TimeFromUnixNano(testTime.Add(-25 * time.Hour).UnixNano()), // bigger than split by interval limit + Through: model.TimeFromUnixNano(testTime.UnixNano()), + Limit: 10, + Step: 42, // this should be ignored and set to 0 + } - rt, err := newfakeRoundTripper() - require.NoError(t, err) - defer rt.Close() + ctx := user.InjectOrgID(context.Background(), "1") + req, err := LokiCodec.EncodeRequest(ctx, lreq) + require.NoError(t, err) - lreq := &logproto.VolumeRequest{ - Matchers: `{job="varlogs"}`, - From: model.TimeFromUnixNano(testTime.Add(-25 * time.Hour).UnixNano()), // bigger than split by interval limit - Through: model.TimeFromUnixNano(testTime.UnixNano()), - Limit: 10, - } + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) - ctx := user.InjectOrgID(context.Background(), "1") - req, err := LokiCodec.EncodeRequest(ctx, lreq) - require.NoError(t, err) + req.URL.Path = "/loki/api/v1/index/series_volume" - req = req.WithContext(ctx) - err = user.InjectOrgIDIntoHTTPRequest(ctx, req) - require.NoError(t, err) - - count, h := seriesVolumeResult(seriesVolume) - rt.setHandler(h) + count, h := seriesVolumeResult(seriesVolume) + rt.setHandler(h) - resp, err := tpw(rt).RoundTrip(req) - require.NoError(t, err) - require.Equal(t, 2, *count) // 2 queries from splitting + resp, err := tpw(rt).RoundTrip(req) + require.NoError(t, err) + require.Equal(t, 2, *count) // 2 queries from splitting - volumeResp, err := LokiCodec.DecodeResponse(ctx, resp, nil) - require.NoError(t, err) + volumeResp, err := LokiCodec.DecodeResponse(ctx, resp, nil) + require.NoError(t, err) - expected := queryrangebase.PrometheusData{ - ResultType: loghttp.ResultTypeVector, - Result: []queryrangebase.SampleStream{ - { - Labels: []logproto.LabelAdapter{{ - Name: "bar", - Value: "baz", - }}, - Samples: []logproto.LegacySample{{ - Value: 6700, - TimestampMs: testTime.Unix() * 1e3, - }}, + expected := queryrangebase.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: []queryrangebase.SampleStream{ + { + Labels: []logproto.LabelAdapter{{ + Name: "bar", + Value: "baz", + }}, + Samples: []logproto.LegacySample{{ + Value: 6700, + TimestampMs: testTime.Unix() * 1e3, + }}, + }, + { + Labels: []logproto.LabelAdapter{{ + Name: "foo", + Value: "bar", + }}, + Samples: []logproto.LegacySample{{ + Value: 2048, + TimestampMs: testTime.Unix() * 1e3, + }}, + }, }, - { - Labels: []logproto.LabelAdapter{{ - Name: "foo", - Value: "bar", - }}, - Samples: []logproto.LegacySample{{ - Value: 2048, - TimestampMs: testTime.Unix() * 1e3, - }}, + } + + res, ok := volumeResp.(*LokiPromResponse) + require.Equal(t, true, ok) + require.Equal(t, "success", res.Response.Status) + require.Equal(t, expected, res.Response.Data) + }) + + t.Run("range queries return a prometheus style metrics response, putting volumes in buckets based on the step", func(t *testing.T) { + tpw, stopper, err := NewTripperware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, volumeEnabled: true}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil) + if stopper != nil { + defer stopper.Stop() + } + require.NoError(t, err) + + rt, err := newfakeRoundTripper() + require.NoError(t, err) + defer rt.Close() + + start := testTime.Add(-5 * time.Hour) + end := testTime + + lreq := &logproto.VolumeRequest{ + Matchers: `{job="varlogs"}`, + From: model.TimeFromUnixNano(start.UnixNano()), // bigger than split by interval limit + Through: model.TimeFromUnixNano(end.UnixNano()), + Step: time.Hour.Milliseconds(), + Limit: 10, + } + + ctx := user.InjectOrgID(context.Background(), "1") + req, err := LokiCodec.EncodeRequest(ctx, lreq) + require.NoError(t, err) + + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + req.URL.Path = "/loki/api/v1/index/series_volume_range" + + count, h := seriesVolumeResult(seriesVolume) + rt.setHandler(h) + + resp, err := tpw(rt).RoundTrip(req) + require.NoError(t, err) + + /* + testTime is 2019-12-02T6:10:10Z + so with a 1 hour split, we expect 6 queries to be made: + 6:10 -> 7, 7 -> 8, 8 -> 9, 9 -> 10, 10 -> 11, 11 -> 11:10 + */ + require.Equal(t, 6, *count) // 6 queries from splitting into step buckets + + volumeResp, err := LokiCodec.DecodeResponse(ctx, resp, nil) + require.NoError(t, err) + + barBazExpectedSamples := []logproto.LegacySample{} + util.ForInterval(time.Hour, start, end, true, func(s, _ time.Time) { + barBazExpectedSamples = append(barBazExpectedSamples, logproto.LegacySample{ + Value: 3350, + TimestampMs: s.Unix() * 1e3, + }) + }) + sort.Slice(barBazExpectedSamples, func(i, j int) bool { + return barBazExpectedSamples[i].TimestampMs < barBazExpectedSamples[j].TimestampMs + }) + + fooBarExpectedSamples := []logproto.LegacySample{} + util.ForInterval(time.Hour, start, end, true, func(s, _ time.Time) { + fooBarExpectedSamples = append(fooBarExpectedSamples, logproto.LegacySample{ + Value: 1024, + TimestampMs: s.Unix() * 1e3, + }) + }) + sort.Slice(fooBarExpectedSamples, func(i, j int) bool { + return fooBarExpectedSamples[i].TimestampMs < fooBarExpectedSamples[j].TimestampMs + }) + + expected := queryrangebase.PrometheusData{ + ResultType: loghttp.ResultTypeMatrix, + Result: []queryrangebase.SampleStream{ + { + Labels: []logproto.LabelAdapter{{ + Name: "bar", + Value: "baz", + }}, + Samples: barBazExpectedSamples, + }, + { + Labels: []logproto.LabelAdapter{{ + Name: "foo", + Value: "bar", + }}, + Samples: fooBarExpectedSamples, + }, }, - }, - } + } - res, ok := volumeResp.(*LokiPromResponse) - require.Equal(t, true, ok) - require.Equal(t, "success", res.Response.Status) - require.Equal(t, expected, res.Response.Data) + res, ok := volumeResp.(*LokiPromResponse) + require.Equal(t, true, ok) + require.Equal(t, "success", res.Response.Status) + require.Equal(t, expected, res.Response.Data) + }) } func TestNewTripperware_Caches(t *testing.T) { diff --git a/pkg/querier/queryrange/series_volume.go b/pkg/querier/queryrange/series_volume.go new file mode 100644 index 0000000000..f5bd563250 --- /dev/null +++ b/pkg/querier/queryrange/series_volume.go @@ -0,0 +1,219 @@ +package queryrange + +import ( + "context" + "net/http" + "sort" + "time" + + "github.com/grafana/dskit/concurrency" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" + "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" + "github.com/grafana/loki/pkg/util" +) + +func SeriesVolumeDownstreamHandler(nextRT http.RoundTripper, codec queryrangebase.Codec) queryrangebase.Handler { + return queryrangebase.HandlerFunc(func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + request, err := codec.EncodeRequest(ctx, req) + if err != nil { + return nil, err + } + + if err := user.InjectOrgIDIntoHTTPRequest(ctx, request); err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + resp, err := nextRT.RoundTrip(request) + if err != nil { + return nil, err + } + + return codec.DecodeResponse(ctx, resp, req) + }) +} + +func NewSeriesVolumeMiddleware() queryrangebase.Middleware { + return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler { + return queryrangebase.HandlerFunc(func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + volReq, ok := req.(*logproto.VolumeRequest) + + if !ok { + return next.Do(ctx, req) + } + + reqs := map[time.Time]queryrangebase.Request{} + startTS := volReq.From.Time() + endTS := volReq.Through.Time() + interval := time.Duration(volReq.Step * 1e6) + + util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) { + // Range query buckets are aligned to the starting timestamp + // Instant queries are for "this instant", which aligns to the end of the requested range + bucket := start + if interval == 0 { + bucket = end + } + + reqs[bucket] = &logproto.VolumeRequest{ + From: model.TimeFromUnix(start.Unix()), + Through: model.TimeFromUnix(end.Unix()), + Matchers: volReq.Matchers, + Limit: volReq.Limit, + Step: volReq.Step, + } + }) + + type f func(context.Context) (time.Time, definitions.Response, error) + var jobs []f + + for bucket, req := range reqs { + b, r := bucket, req + jobs = append(jobs, f(func(ctx context.Context) (time.Time, definitions.Response, error) { + resp, err := next.Do(ctx, r) + if err != nil { + return b, nil, err + } + + return b, resp, nil + })) + } + + collector := make(chan *bucketedVolumeResponse, len(jobs)) + err := concurrency.ForEachJob( + ctx, + len(jobs), + len(jobs), + func(ctx context.Context, i int) error { + bucket, resp, err := jobs[i](ctx) + if resp == nil { + collector <- nil + return err + } + + collector <- &bucketedVolumeResponse{ + bucket, resp.(*VolumeResponse), + } + return err + }) + close(collector) + + if err != nil { + return nil, err + } + + promResp := toPrometheusResponse(collector) + return promResp, nil + }) + }) +} + +type bucketedVolumeResponse struct { + bucket time.Time + response *VolumeResponse +} + +func toPrometheusResponse(respsCh chan *bucketedVolumeResponse) *LokiPromResponse { + var headers []*definitions.PrometheusResponseHeader + samplesByName := make(map[string][]logproto.LegacySample) + + for bucketedVolumeResponse := range respsCh { + if bucketedVolumeResponse == nil { + continue + } + + bucket, resp := bucketedVolumeResponse.bucket, bucketedVolumeResponse.response + + if headers == nil { + headers := make([]*definitions.PrometheusResponseHeader, len(resp.Headers)) + for i, header := range resp.Headers { + h := header + headers[i] = &h + } + } + + for _, volume := range resp.Response.Volumes { + if _, ok := samplesByName[volume.Name]; !ok { + samplesByName[volume.Name] = make([]logproto.LegacySample, 0, 1) + } + + samplesByName[volume.Name] = append(samplesByName[volume.Name], toPrometheusSample(volume, bucket)) + } + } + + promResponse := queryrangebase.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: toPrometheusData(samplesByName), + Headers: headers, + } + + return &LokiPromResponse{ + Response: &promResponse, + Statistics: stats.Result{}, + } +} + +func toPrometheusSample(volume logproto.Volume, t time.Time) logproto.LegacySample { + ts := model.TimeFromUnix(t.Unix()) + return logproto.LegacySample{ + Value: float64(volume.Volume), + TimestampMs: ts.UnixNano() / 1e6, + } +} + +type sortableSampleStream struct { + name string + labels labels.Labels + samples []logproto.LegacySample +} + +func toPrometheusData(series map[string][]logproto.LegacySample) queryrangebase.PrometheusData { + resultType := loghttp.ResultTypeVector + sortableResult := make([]sortableSampleStream, 0, len(series)) + + for name, samples := range series { + if resultType == loghttp.ResultTypeVector && len(samples) > 1 { + resultType = loghttp.ResultTypeMatrix + } + + lbls, err := syntax.ParseLabels(name) + if err != nil { + continue + } + + sort.Slice(samples, func(i, j int) bool { + return samples[i].TimestampMs < samples[j].TimestampMs + }) + + sortableResult = append(sortableResult, sortableSampleStream{ + name: name, + labels: lbls, + samples: samples, + }) + } + + sort.Slice(sortableResult, func(i, j int) bool { + return sortableResult[i].name < sortableResult[j].name + }) + + result := make([]queryrangebase.SampleStream, 0, len(sortableResult)) + for _, r := range sortableResult { + result = append(result, queryrangebase.SampleStream{ + Labels: logproto.FromLabelsToLabelAdapters(r.labels), + Samples: r.samples, + }) + } + + return queryrangebase.PrometheusData{ + ResultType: resultType, + Result: result, + } +} diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 5de0c10095..2cebd06922 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -853,8 +853,7 @@ func Test_series_splitByInterval_Do(t *testing.T) { func Test_seriesvolume_splitByInterval_Do(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1") - setup := func(next queryrangebase.Handler) queryrangebase.Handler { - l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) + setup := func(next queryrangebase.Handler, l Limits) queryrangebase.Handler { return SplitByIntervalMiddleware( testSchemas, l, @@ -879,7 +878,9 @@ func Test_seriesvolume_splitByInterval_Do(t *testing.T) { Headers: nil, }, nil }) - split := setup(next) + + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) + split := setup(next, l) req := &logproto.VolumeRequest{ From: from, Through: through, @@ -917,7 +918,9 @@ func Test_seriesvolume_splitByInterval_Do(t *testing.T) { Headers: nil, }, nil }) - split := setup(next) + + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) + split := setup(next, l) req := &logproto.VolumeRequest{ From: from, Through: through, @@ -937,6 +940,43 @@ func Test_seriesvolume_splitByInterval_Do(t *testing.T) { Limit: 1, }) }) + + // This will never happen because we hardcode 24h spit by for this code path + // in the middleware. However, that split by is not validated here, so we either + // need to support this case or error. + t.Run("series volumes with a query split by of 0", func(t *testing.T) { + from := model.TimeFromUnixNano(start.UnixNano()) + through := model.TimeFromUnixNano(end.UnixNano()) + next := queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + return &VolumeResponse{ + Response: &logproto.VolumeResponse{ + Volumes: []logproto.Volume{ + {Name: `{foo="bar"}`, Volume: 38}, + {Name: `{bar="baz"}`, Volume: 28}, + }, + Limit: 2}, + Headers: nil, + }, nil + }) + + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, 0) + split := setup(next, l) + req := &logproto.VolumeRequest{ + From: from, + Through: through, + Matchers: "{}", + Limit: 2, + } + + res, err := split.Do(ctx, req) + require.NoError(t, err) + + response := res.(*VolumeResponse) + + require.Len(t, response.Response.Volumes, 2) + require.Contains(t, response.Response.Volumes, logproto.Volume{Name: `{foo="bar"}`, Volume: 38}) + require.Contains(t, response.Response.Volumes, logproto.Volume{Name: `{bar="baz"}`, Volume: 28}) + }) } func Test_ExitEarly(t *testing.T) { diff --git a/pkg/storage/stores/tsdb/single_file_index_test.go b/pkg/storage/stores/tsdb/single_file_index_test.go index 8680dbfe8e..b69e105aed 100644 --- a/pkg/storage/stores/tsdb/single_file_index_test.go +++ b/pkg/storage/stores/tsdb/single_file_index_test.go @@ -527,6 +527,20 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) { Limit: 10, }, acc.Volumes()) }) + + t.Run("only gets factor of stream size within time bounds", func(t *testing.T) { + matcher := labels.MustNewMatcher(labels.MatchEqual, "", "") + acc := seriesvolume.NewAccumulator(10) + err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through.Add(-30*time.Minute), acc, nil, nil, matcher) + require.NoError(t, err) + require.Equal(t, &logproto.VolumeResponse{ + Volumes: []logproto.Volume{ + {Name: `{fizz="fizz", foo="bar"}`, Volume: (29) * 1024}, + {Name: `{fizz="buzz", foo="bar"}`, Volume: (9) * 1024}, + }, + Limit: 10, + }, acc.Volumes()) + }) } type filterAll struct{} diff --git a/pkg/util/time.go b/pkg/util/time.go index 59fcc4d08f..c86d0d7494 100644 --- a/pkg/util/time.go +++ b/pkg/util/time.go @@ -92,6 +92,11 @@ func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) { // except for the start time of first split and end time of last split which would be kept same as original start/end // When endTimeInclusive is true, it would keep a gap of 1ms between the splits. func ForInterval(interval time.Duration, start, end time.Time, endTimeInclusive bool, callback func(start, end time.Time)) { + if interval <= 0 { + callback(start, end) + return + } + ogStart := start startNs := start.UnixNano() start = time.Unix(0, startNs-startNs%interval.Nanoseconds()) diff --git a/pkg/util/time_test.go b/pkg/util/time_test.go index 7dc80d4163..f77530a5da 100644 --- a/pkg/util/time_test.go +++ b/pkg/util/time_test.go @@ -228,6 +228,27 @@ func TestForInterval(t *testing.T) { } } +func TestForInterval_OfZero(t *testing.T) { + var actualIntervals []timeInterval + + from := time.Unix(5, 0) + through := time.Unix(8, 0) + endTimeInclusive := true + ForInterval(0, from, through, endTimeInclusive, func(start, end time.Time) { + actualIntervals = append(actualIntervals, timeInterval{ + from: start, + through: end, + }) + }) + + require.Equal(t, []timeInterval{ + { + from: time.Unix(5, 0), + through: time.Unix(8, 0), + }, + }, actualIntervals) +} + func TestGetFactorOfTime(t *testing.T) { for _, tc := range []struct { desc string