Convert SeriesVolume response to prometheus-style (#9703)

Changes the response type of the label volume stats endpoint to
return volumes as prometheus-style timeseries metrics. It currently only
supports instant queries, but is a necessary step to eventually
supporting range queries.
pull/9761/head
Trevor Whitney 3 years ago committed by GitHub
parent d74c67a9d0
commit dbc3040739
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/ingester/ingester.go
  2. 2
      pkg/ingester/instance.go
  3. 6
      pkg/ingester/instance_test.go
  4. 351
      pkg/logproto/logproto.pb.go
  5. 8
      pkg/logproto/logproto.proto
  6. 25
      pkg/querier/http_test.go
  7. 72
      pkg/querier/queryrange/codec.go
  8. 206
      pkg/querier/queryrange/codec_test.go
  9. 39
      pkg/querier/queryrange/roundtrip_test.go
  10. 5
      pkg/querier/queryrange/split_by_interval.go
  11. 126
      pkg/querier/queryrange/split_by_interval_test.go
  12. 25
      pkg/storage/stores/index/seriesvolume/volume.go
  13. 361
      pkg/storage/stores/index/seriesvolume/volume_test.go
  14. 4
      pkg/storage/stores/tsdb/index_client.go
  15. 16
      pkg/storage/stores/tsdb/index_client_test.go
  16. 1
      pkg/storage/stores/tsdb/single_file_index.go
  17. 88
      pkg/storage/stores/tsdb/single_file_index_test.go

@ -1173,7 +1173,7 @@ func (i *Ingester) GetSeriesVolume(ctx context.Context, req *logproto.VolumeRequ
ctx,
len(jobs),
2,
func(ctx context.Context, idx int) error {
func(_ context.Context, idx int) error {
res, err := jobs[idx]()
resps[idx] = res
return err

@ -686,7 +686,7 @@ func (i *instance) GetSeriesVolume(ctx context.Context, req *logproto.VolumeRequ
return nil, err
}
res := seriesvolume.MapToSeriesVolumeResponse(volumes, int(req.Limit))
res := seriesvolume.MapToSeriesVolumeResponse(volumes, int(req.Limit), from, through)
return res, nil
}

@ -843,7 +843,7 @@ func TestInstance_SeriesVolume(t *testing.T) {
instance := defaultInstance(t)
volumes, err := instance.GetSeriesVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 11000,
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 2,
})
@ -859,7 +859,7 @@ func TestInstance_SeriesVolume(t *testing.T) {
instance := defaultInstance(t)
volumes, err := instance.GetSeriesVolume(context.Background(), &logproto.VolumeRequest{
From: 0,
Through: 11000,
Through: 1.1 * 1e3, //milliseconds
Matchers: `{log_stream="dispatcher"}`,
Limit: 2,
})
@ -874,7 +874,7 @@ func TestInstance_SeriesVolume(t *testing.T) {
instance := defaultInstance(t)
volumes, err := instance.GetSeriesVolume(context.Background(), &logproto.VolumeRequest{
From: 5,
Through: 11,
Through: 1.1 * 1e3, //milliseconds
Matchers: "{}",
Limit: 3,
})

@ -2339,8 +2339,10 @@ func (m *VolumeRequest) GetLimit() int32 {
}
type VolumeResponse struct {
Volumes []Volume `protobuf:"bytes,1,rep,name=volumes,proto3" json:"volumes"`
Limit int32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"`
Volumes []Volume `protobuf:"bytes,1,rep,name=volumes,proto3" json:"volumes"`
Limit int32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"`
From github_com_prometheus_common_model.Time `protobuf:"varint,3,opt,name=from,proto3,customtype=github.com/prometheus/common/model.Time" json:"from"`
Through github_com_prometheus_common_model.Time `protobuf:"varint,4,opt,name=through,proto3,customtype=github.com/prometheus/common/model.Time" json:"through"`
}
func (m *VolumeResponse) Reset() { *m = VolumeResponse{} }
@ -2501,145 +2503,146 @@ func init() {
func init() { proto.RegisterFile("pkg/logproto/logproto.proto", fileDescriptor_c28a5f14f1f4c79a) }
var fileDescriptor_c28a5f14f1f4c79a = []byte{
// 2205 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x19, 0x4d, 0x8f, 0x1b, 0x49,
0xd5, 0x65, 0xb7, 0xbf, 0x9e, 0xed, 0x19, 0xa7, 0xc6, 0x9b, 0x18, 0x27, 0xb1, 0x27, 0xad, 0x25,
0x19, 0x25, 0x59, 0x7b, 0x33, 0x0b, 0x4b, 0x36, 0x61, 0x41, 0xf1, 0xcc, 0x26, 0x3b, 0xf9, 0xde,
0x9a, 0x10, 0xd0, 0x8a, 0x55, 0xd4, 0x63, 0x97, 0x3f, 0x34, 0x6e, 0xb7, 0xd3, 0xdd, 0xde, 0xec,
0x48, 0x1c, 0xf8, 0x03, 0x2b, 0xed, 0x0d, 0x71, 0x41, 0x1c, 0x90, 0x40, 0x48, 0x5c, 0xf8, 0x01,
0xc0, 0x01, 0x89, 0x70, 0x0b, 0xb7, 0x15, 0x07, 0x43, 0x26, 0x17, 0x34, 0xa7, 0xfd, 0x03, 0x20,
0x54, 0x5f, 0xdd, 0xd5, 0x3d, 0xce, 0xee, 0x3a, 0x44, 0x42, 0xb9, 0x8c, 0xfb, 0xbd, 0x7a, 0xf5,
0xea, 0x7d, 0xbf, 0x7a, 0x35, 0x70, 0x7c, 0xb2, 0xdb, 0x6f, 0x8d, 0x9c, 0xfe, 0xc4, 0x75, 0x7c,
0x27, 0xf8, 0x68, 0xf2, 0xbf, 0x38, 0xa7, 0xe0, 0x5a, 0xa5, 0xef, 0xf4, 0x1d, 0x41, 0xc3, 0xbe,
0xc4, 0x7a, 0xad, 0xd1, 0x77, 0x9c, 0xfe, 0x88, 0xb6, 0x38, 0xb4, 0x33, 0xed, 0xb5, 0xfc, 0xa1,
0x4d, 0x3d, 0xdf, 0xb2, 0x27, 0x92, 0x60, 0x55, 0x72, 0x7f, 0x38, 0xb2, 0x9d, 0x2e, 0x1d, 0xb5,
0x3c, 0xdf, 0xf2, 0x3d, 0xf1, 0x57, 0x52, 0xac, 0x30, 0x8a, 0xc9, 0xd4, 0x1b, 0xf0, 0x3f, 0x02,
0x69, 0x56, 0x00, 0x6f, 0xfb, 0x2e, 0xb5, 0x6c, 0x62, 0xf9, 0xd4, 0x23, 0xf4, 0xe1, 0x94, 0x7a,
0xbe, 0x79, 0x0b, 0x56, 0x22, 0x58, 0x6f, 0xe2, 0x8c, 0x3d, 0x8a, 0xdf, 0x86, 0x82, 0x17, 0xa2,
0xab, 0x68, 0x35, 0xb5, 0x56, 0x58, 0xaf, 0x34, 0x03, 0x55, 0xc2, 0x3d, 0x44, 0x27, 0x34, 0x7f,
0x81, 0x00, 0xc2, 0x35, 0x5c, 0x07, 0x10, 0xab, 0xef, 0x5b, 0xde, 0xa0, 0x8a, 0x56, 0xd1, 0x9a,
0x41, 0x34, 0x0c, 0x3e, 0x0f, 0x47, 0x42, 0xe8, 0xb6, 0xb3, 0x3d, 0xb0, 0xdc, 0x6e, 0x35, 0xc9,
0xc9, 0x0e, 0x2f, 0x60, 0x0c, 0x86, 0x6b, 0xf9, 0xb4, 0x9a, 0x5a, 0x45, 0x6b, 0x29, 0xc2, 0xbf,
0xf1, 0x51, 0xc8, 0xf8, 0x74, 0x6c, 0x8d, 0xfd, 0xaa, 0xb1, 0x8a, 0xd6, 0xf2, 0x44, 0x42, 0x0c,
0xcf, 0x74, 0xa7, 0x5e, 0x35, 0xbd, 0x8a, 0xd6, 0x4a, 0x44, 0x42, 0xe6, 0x5f, 0x92, 0x50, 0xfc,
0x60, 0x4a, 0xdd, 0x3d, 0x69, 0x00, 0x5c, 0x83, 0x9c, 0x47, 0x47, 0xb4, 0xe3, 0x3b, 0x2e, 0x17,
0x30, 0x4f, 0x02, 0x18, 0x57, 0x20, 0x3d, 0x1a, 0xda, 0x43, 0x9f, 0x8b, 0x54, 0x22, 0x02, 0xc0,
0x97, 0x20, 0xed, 0xf9, 0x96, 0xeb, 0x73, 0x39, 0x0a, 0xeb, 0xb5, 0xa6, 0x70, 0x58, 0x53, 0x39,
0xac, 0x79, 0x4f, 0x39, 0xac, 0x9d, 0x7b, 0x3c, 0x6b, 0x24, 0x3e, 0xfb, 0x47, 0x03, 0x11, 0xb1,
0x05, 0xbf, 0x0d, 0x29, 0x3a, 0xee, 0x72, 0x59, 0xbf, 0xee, 0x4e, 0xb6, 0x01, 0x5f, 0x80, 0x7c,
0x77, 0xe8, 0xd2, 0x8e, 0x3f, 0x74, 0xc6, 0x5c, 0xa3, 0xa5, 0xf5, 0x95, 0xd0, 0x1b, 0x9b, 0x6a,
0x89, 0x84, 0x54, 0xf8, 0x3c, 0x64, 0x3c, 0x66, 0x36, 0xaf, 0x9a, 0x5d, 0x4d, 0xad, 0xe5, 0xdb,
0x95, 0x83, 0x59, 0xa3, 0x2c, 0x30, 0xe7, 0x1d, 0x7b, 0xe8, 0x53, 0x7b, 0xe2, 0xef, 0x11, 0x49,
0x83, 0xcf, 0x42, 0xb6, 0x4b, 0x47, 0x94, 0x39, 0x3b, 0xc7, 0x9d, 0x5d, 0xd6, 0xd8, 0xf3, 0x05,
0xa2, 0x08, 0xae, 0x1b, 0xb9, 0x4c, 0x39, 0x6b, 0xfe, 0x07, 0x01, 0xde, 0xb6, 0xec, 0xc9, 0x88,
0x7e, 0x6d, 0x7b, 0x06, 0x96, 0x4b, 0xbe, 0xb0, 0xe5, 0x52, 0x8b, 0x5a, 0x2e, 0x34, 0x83, 0xb1,
0x98, 0x19, 0xd2, 0x5f, 0x61, 0x06, 0xf3, 0x26, 0x64, 0x04, 0xea, 0xab, 0x62, 0x28, 0xd4, 0x39,
0xa5, 0xb4, 0x29, 0x87, 0xda, 0xa4, 0xb8, 0x9c, 0xe6, 0x2f, 0x11, 0x94, 0xa4, 0x21, 0x65, 0x0e,
0xee, 0x40, 0x56, 0xe4, 0x80, 0xca, 0xbf, 0x63, 0xf1, 0xfc, 0xbb, 0xd2, 0xb5, 0x26, 0x3e, 0x75,
0xdb, 0xad, 0xc7, 0xb3, 0x06, 0xfa, 0xfb, 0xac, 0x71, 0xa6, 0x3f, 0xf4, 0x07, 0xd3, 0x9d, 0x66,
0xc7, 0xb1, 0x5b, 0x7d, 0xd7, 0xea, 0x59, 0x63, 0xab, 0x35, 0x72, 0x76, 0x87, 0x2d, 0x55, 0x0f,
0x54, 0xde, 0x2a, 0xc6, 0xf8, 0x1c, 0x97, 0xce, 0xf7, 0xa4, 0x47, 0x96, 0x9b, 0xa2, 0x8c, 0x6c,
0x8d, 0xfb, 0xd4, 0x63, 0x9c, 0x0d, 0x66, 0x4c, 0x22, 0x68, 0xcc, 0x9f, 0xc0, 0x4a, 0xc4, 0xe1,
0x52, 0xce, 0x8b, 0x90, 0xf1, 0xa8, 0x3b, 0x0c, 0xca, 0x84, 0x66, 0xb2, 0x6d, 0x8e, 0x6f, 0x2f,
0x49, 0xf9, 0x32, 0x02, 0x26, 0x92, 0x7e, 0xb1, 0xd3, 0xff, 0x8c, 0xa0, 0x78, 0xd3, 0xda, 0xa1,
0x23, 0x15, 0x69, 0x18, 0x8c, 0xb1, 0x65, 0x53, 0x69, 0x71, 0xfe, 0xcd, 0xd2, 0xfe, 0x63, 0x6b,
0x34, 0xa5, 0x82, 0x65, 0x8e, 0x48, 0x68, 0xd1, 0x9c, 0x45, 0x2f, 0x9c, 0xb3, 0x28, 0x8c, 0xbc,
0x0a, 0xa4, 0x1f, 0x32, 0x43, 0xf1, 0x7c, 0xcd, 0x13, 0x01, 0x98, 0x67, 0xa0, 0x24, 0xb5, 0x90,
0xe6, 0x0b, 0x45, 0x66, 0xe6, 0xcb, 0x2b, 0x91, 0x4d, 0x1b, 0x32, 0xc2, 0xda, 0xf8, 0x75, 0xc8,
0x07, 0x3d, 0x80, 0x6b, 0x9b, 0x6a, 0x67, 0x0e, 0x66, 0x8d, 0xa4, 0xef, 0x91, 0x70, 0x01, 0x37,
0x20, 0xcd, 0x77, 0x72, 0xcd, 0x51, 0x3b, 0x7f, 0x30, 0x6b, 0x08, 0x04, 0x11, 0x3f, 0xf8, 0x04,
0x18, 0x03, 0x56, 0x86, 0x99, 0x09, 0x8c, 0x76, 0xee, 0x60, 0xd6, 0xe0, 0x30, 0xe1, 0x7f, 0xcd,
0x6b, 0x50, 0xbc, 0x49, 0xfb, 0x56, 0x67, 0x4f, 0x1e, 0x5a, 0x51, 0xec, 0xd8, 0x81, 0x48, 0xf1,
0x38, 0x05, 0xc5, 0xe0, 0xc4, 0x07, 0xb6, 0x27, 0x83, 0xba, 0x10, 0xe0, 0x6e, 0x79, 0xe6, 0xcf,
0x11, 0x48, 0x3f, 0x63, 0x13, 0x32, 0x23, 0xa6, 0xab, 0x27, 0x7c, 0xd4, 0x86, 0x83, 0x59, 0x43,
0x62, 0x88, 0xfc, 0xc5, 0x97, 0x21, 0xeb, 0xf1, 0x13, 0x19, 0xb3, 0x78, 0xf8, 0xf0, 0x85, 0xf6,
0x32, 0x0b, 0x83, 0x83, 0x59, 0x43, 0x11, 0x12, 0xf5, 0x81, 0x9b, 0x91, 0xfe, 0x22, 0x14, 0x5b,
0x3a, 0x98, 0x35, 0x34, 0xac, 0xde, 0x6f, 0xcc, 0x9f, 0x21, 0x28, 0xdc, 0xb3, 0x86, 0x41, 0x08,
0x05, 0x2e, 0x42, 0x9a, 0x8b, 0x58, 0x3a, 0x77, 0xe9, 0xc8, 0xda, 0xbb, 0xea, 0xb8, 0x9c, 0x67,
0x89, 0x04, 0x70, 0xd8, 0x12, 0x8c, 0xb9, 0x2d, 0x21, 0xbd, 0x70, 0x61, 0xbb, 0x6e, 0xe4, 0x92,
0xe5, 0x94, 0xf9, 0x3b, 0x04, 0x45, 0x21, 0x99, 0x0c, 0x8b, 0x1f, 0x43, 0x46, 0x08, 0xce, 0x65,
0xfb, 0x92, 0xe4, 0x3f, 0xb7, 0x48, 0xe2, 0x4b, 0x9e, 0xf8, 0xfb, 0xb0, 0xd4, 0x75, 0x9d, 0xc9,
0x84, 0x76, 0xb7, 0x65, 0x89, 0x49, 0xc6, 0x4b, 0xcc, 0xa6, 0xbe, 0x4e, 0x62, 0xe4, 0xe6, 0x5f,
0x11, 0x94, 0x64, 0x36, 0x4b, 0x5b, 0x06, 0x36, 0x40, 0x2f, 0x5c, 0xdc, 0x93, 0x8b, 0x16, 0xf7,
0xa3, 0x90, 0xe9, 0xbb, 0xce, 0x74, 0xe2, 0x55, 0x53, 0x22, 0x77, 0x04, 0xb4, 0x58, 0xd1, 0x37,
0xaf, 0xc3, 0x92, 0x52, 0xe5, 0x39, 0x25, 0xad, 0x16, 0x2f, 0x69, 0x5b, 0x5d, 0x3a, 0xf6, 0x87,
0xbd, 0x61, 0x50, 0xa4, 0x24, 0xbd, 0xf9, 0x29, 0x82, 0x72, 0x9c, 0x04, 0x7f, 0x4f, 0xcb, 0x03,
0xc6, 0xee, 0xf4, 0xf3, 0xd9, 0x35, 0x79, 0x71, 0xf0, 0xde, 0x1b, 0xfb, 0xee, 0x9e, 0xca, 0x91,
0xda, 0x3b, 0x50, 0xd0, 0xd0, 0xac, 0x79, 0xec, 0x52, 0x15, 0xb3, 0xec, 0x33, 0x4c, 0xd6, 0xa4,
0x88, 0x63, 0x0e, 0x5c, 0x4a, 0x5e, 0x44, 0x2c, 0xe2, 0x4b, 0x11, 0x4f, 0xe2, 0x8b, 0x60, 0xf4,
0x5c, 0xc7, 0x5e, 0xc8, 0x4d, 0x7c, 0x07, 0xfe, 0x16, 0x24, 0x7d, 0x67, 0x21, 0x27, 0x25, 0x7d,
0x87, 0xf9, 0x48, 0x2a, 0x9f, 0x12, 0x37, 0x34, 0x01, 0x99, 0xbf, 0x45, 0xb0, 0xcc, 0xf6, 0x08,
0x0b, 0x6c, 0x0c, 0xa6, 0xe3, 0x5d, 0xbc, 0x06, 0x65, 0x76, 0xd2, 0x83, 0xa1, 0xec, 0x00, 0x0f,
0x86, 0x5d, 0xa9, 0xe6, 0x12, 0xc3, 0xab, 0xc6, 0xb0, 0xd5, 0xc5, 0xc7, 0x20, 0x3b, 0xf5, 0x04,
0x81, 0xd0, 0x39, 0xc3, 0xc0, 0xad, 0x2e, 0x3e, 0xa7, 0x1d, 0xc7, 0x6c, 0xad, 0x5d, 0x93, 0xb8,
0x0d, 0xef, 0x5a, 0x43, 0x37, 0x28, 0x3e, 0x67, 0x20, 0xd3, 0x61, 0x07, 0x8b, 0x38, 0x61, 0x1d,
0x28, 0x20, 0xe6, 0x02, 0x11, 0xb9, 0x6c, 0x7e, 0x1b, 0xf2, 0xc1, 0xee, 0xb9, 0x8d, 0x67, 0xae,
0x07, 0xcc, 0xcb, 0xb0, 0x2c, 0x8a, 0xea, 0xfc, 0xcd, 0xc5, 0x79, 0x9b, 0x8b, 0x6a, 0xf3, 0x71,
0x48, 0x0b, 0xab, 0x60, 0x30, 0xba, 0x96, 0x6f, 0xa9, 0x2d, 0xec, 0xdb, 0xac, 0xc2, 0xd1, 0x7b,
0xae, 0x35, 0xf6, 0x7a, 0xd4, 0xe5, 0x44, 0x41, 0xec, 0x9a, 0xaf, 0xc1, 0x0a, 0x2b, 0x24, 0xd4,
0xf5, 0x36, 0x9c, 0xe9, 0xd8, 0x57, 0x17, 0xfd, 0xf3, 0x50, 0x89, 0xa2, 0x65, 0xa8, 0x57, 0x20,
0xdd, 0x61, 0x08, 0xce, 0xbd, 0x44, 0x04, 0x60, 0xfe, 0x0a, 0x01, 0xbe, 0x46, 0x7d, 0xce, 0x7a,
0x6b, 0xd3, 0xd3, 0x2e, 0x77, 0xb6, 0xe5, 0x77, 0x06, 0xd4, 0xf5, 0xd4, 0x45, 0x47, 0xc1, 0xff,
0x8f, 0xcb, 0x9d, 0x79, 0x01, 0x56, 0x22, 0x52, 0x4a, 0x9d, 0x6a, 0x90, 0xeb, 0x48, 0x9c, 0x6c,
0xaa, 0x01, 0x6c, 0xfe, 0x3e, 0x09, 0x39, 0xe1, 0x5b, 0xda, 0xc3, 0x17, 0xa0, 0xd0, 0x63, 0xb1,
0xe6, 0x4e, 0xdc, 0xa1, 0x34, 0x81, 0xd1, 0x5e, 0x3e, 0x98, 0x35, 0x74, 0x34, 0xd1, 0x01, 0xfc,
0x46, 0x2c, 0xf0, 0xda, 0x95, 0xfd, 0x59, 0x23, 0xf3, 0x03, 0x16, 0x7c, 0x9b, 0xac, 0xbd, 0xf1,
0x30, 0xdc, 0x0c, 0xc2, 0xf1, 0x86, 0xcc, 0x36, 0x7e, 0xd3, 0x6b, 0x7f, 0x87, 0x89, 0x1f, 0xab,
0xd7, 0x13, 0xd7, 0xb1, 0xa9, 0x3f, 0xa0, 0x53, 0xaf, 0xd5, 0x71, 0x6c, 0xdb, 0x19, 0xb7, 0xf8,
0x58, 0xc7, 0x95, 0x66, 0x3d, 0x9a, 0x6d, 0x97, 0x09, 0x78, 0x0f, 0xb2, 0xfe, 0xc0, 0x75, 0xa6,
0xfd, 0x01, 0x6f, 0x3f, 0xa9, 0xf6, 0xa5, 0xc5, 0xf9, 0x29, 0x0e, 0x44, 0x7d, 0xe0, 0x53, 0xcc,
0x5a, 0xb4, 0xb3, 0xeb, 0x4d, 0x6d, 0x31, 0x2c, 0xb5, 0xd3, 0x07, 0xb3, 0x06, 0x7a, 0x83, 0x04,
0x68, 0xf3, 0xd3, 0x24, 0x34, 0x78, 0x08, 0xdf, 0xe7, 0x77, 0x93, 0xab, 0x8e, 0x7b, 0x8b, 0xfa,
0xee, 0xb0, 0x73, 0xdb, 0xb2, 0xa9, 0x8a, 0x8d, 0x06, 0x14, 0x6c, 0x8e, 0x7c, 0xa0, 0x25, 0x07,
0xd8, 0x01, 0x1d, 0x3e, 0x09, 0xc0, 0xd3, 0x4e, 0xac, 0x8b, 0x3c, 0xc9, 0x73, 0x0c, 0x5f, 0xde,
0x88, 0x58, 0xaa, 0xb5, 0xa0, 0x66, 0xd2, 0x42, 0x5b, 0x71, 0x0b, 0x2d, 0xcc, 0x27, 0x30, 0x8b,
0x1e, 0xeb, 0xe9, 0x68, 0xac, 0x9b, 0x7f, 0x43, 0x50, 0xbf, 0xa9, 0x24, 0x7f, 0x41, 0x73, 0x28,
0x7d, 0x93, 0x2f, 0x49, 0xdf, 0xd4, 0xff, 0xa6, 0xaf, 0xf9, 0x27, 0x2d, 0xe5, 0x09, 0xed, 0x29,
0x3d, 0x36, 0xb4, 0x76, 0xf1, 0x32, 0xc4, 0x4c, 0xbe, 0x44, 0xb7, 0xa4, 0x62, 0x6e, 0x79, 0x37,
0x2c, 0x07, 0x5c, 0x03, 0x59, 0x0e, 0x4e, 0x83, 0xe1, 0xd2, 0x9e, 0x6a, 0xbe, 0x38, 0x5e, 0xe3,
0x69, 0x8f, 0xf0, 0x75, 0xf3, 0x0f, 0x08, 0xca, 0xd7, 0xa8, 0x1f, 0xbd, 0xd6, 0xbc, 0x4a, 0xfa,
0xbf, 0x0f, 0x47, 0x34, 0xf9, 0xa5, 0xf6, 0x6f, 0xc5, 0xee, 0x32, 0xaf, 0x85, 0xfa, 0x6f, 0x8d,
0xbb, 0xf4, 0x13, 0x39, 0xa3, 0x45, 0xaf, 0x31, 0x77, 0xa1, 0xa0, 0x2d, 0xe2, 0x2b, 0xb1, 0x0b,
0xcc, 0xbc, 0xa6, 0xda, 0xae, 0x48, 0x9d, 0xc4, 0x94, 0x26, 0xaf, 0xa7, 0x41, 0xbb, 0xdf, 0x06,
0xcc, 0xc7, 0x46, 0xce, 0x56, 0xaf, 0xd4, 0x1c, 0x7b, 0x23, 0xb8, 0xcf, 0x04, 0x30, 0x3e, 0x05,
0x86, 0xeb, 0x3c, 0x52, 0x37, 0xd3, 0x52, 0x78, 0x24, 0x71, 0x1e, 0x11, 0xbe, 0x64, 0x5e, 0x86,
0x14, 0x71, 0x1e, 0xe1, 0x3a, 0x80, 0x6b, 0x8d, 0xfb, 0xf4, 0x7e, 0x30, 0xb0, 0x14, 0x89, 0x86,
0x79, 0x4e, 0x7f, 0xdd, 0x80, 0x23, 0xba, 0x44, 0xc2, 0xdd, 0x4d, 0xc8, 0x32, 0xe4, 0x70, 0xde,
0xa3, 0x17, 0x27, 0x14, 0xb3, 0xaf, 0x22, 0x62, 0x31, 0x03, 0x21, 0x1e, 0x9f, 0x80, 0xbc, 0x6f,
0xed, 0x8c, 0xe8, 0xed, 0x30, 0xe7, 0x43, 0x04, 0x5b, 0x65, 0xb3, 0xd6, 0x7d, 0xed, 0xa2, 0x10,
0x22, 0xf0, 0x59, 0x28, 0x87, 0x32, 0xdf, 0x75, 0x69, 0x6f, 0xf8, 0x09, 0xf7, 0x70, 0x91, 0x1c,
0xc2, 0xe3, 0x35, 0x58, 0x0e, 0x71, 0xdb, 0xbc, 0xed, 0x1a, 0x9c, 0x34, 0x8e, 0x66, 0xb6, 0xe1,
0xea, 0xbe, 0xf7, 0x70, 0x6a, 0x8d, 0x78, 0x21, 0x2b, 0x12, 0x0d, 0x63, 0xfe, 0x11, 0xc1, 0x11,
0xe1, 0x6a, 0x36, 0x65, 0xbf, 0x8a, 0x51, 0xff, 0x6b, 0x04, 0x58, 0xd7, 0x40, 0x86, 0xd6, 0x37,
0xf5, 0xe7, 0x13, 0xd6, 0xd7, 0x0b, 0x7c, 0x84, 0x14, 0xa8, 0xf0, 0x05, 0xc4, 0x0c, 0xae, 0x80,
0xfc, 0xdd, 0x51, 0xcc, 0xa8, 0x02, 0xa3, 0x6e, 0x7f, 0x6c, 0xb4, 0xde, 0xd9, 0xf3, 0xa9, 0x27,
0x27, 0x4c, 0x3e, 0x5a, 0x73, 0x04, 0x11, 0x3f, 0xec, 0x2c, 0x3a, 0xf6, 0x79, 0xd4, 0x18, 0xe1,
0x59, 0x12, 0x45, 0xd4, 0x07, 0x6b, 0x1b, 0xa5, 0xfb, 0xce, 0x68, 0x1a, 0x76, 0x89, 0x57, 0xc8,
0xce, 0xd1, 0xd1, 0x37, 0x2d, 0x47, 0x5f, 0xf3, 0x47, 0xb0, 0xa4, 0x54, 0x92, 0x86, 0x7f, 0x13,
0xb2, 0x1f, 0x73, 0xcc, 0x9c, 0x07, 0x21, 0x41, 0x2a, 0x8b, 0x8d, 0x22, 0x8b, 0xbe, 0xb3, 0x06,
0x9c, 0x77, 0x21, 0x23, 0xc8, 0xf1, 0x09, 0xfd, 0xc2, 0x2d, 0x5e, 0x2e, 0x18, 0x2c, 0x6f, 0xcf,
0x91, 0x87, 0x8f, 0xfc, 0x9c, 0x87, 0x0f, 0x13, 0x32, 0xe2, 0x24, 0xe9, 0x3f, 0xee, 0x62, 0x81,
0x21, 0xf2, 0xf7, 0xec, 0x69, 0xc8, 0x07, 0xaf, 0xa8, 0xb8, 0x00, 0xd9, 0xab, 0x77, 0xc8, 0x0f,
0xaf, 0x90, 0xcd, 0x72, 0x02, 0x17, 0x21, 0xd7, 0xbe, 0xb2, 0x71, 0x83, 0x43, 0x68, 0xfd, 0xdf,
0x86, 0x2a, 0x10, 0x2e, 0xfe, 0x2e, 0xa4, 0x45, 0xd6, 0x1f, 0x0d, 0x15, 0xd4, 0xdf, 0x42, 0x6b,
0xc7, 0x0e, 0xe1, 0xe5, 0x1d, 0x3d, 0xf1, 0x26, 0xc2, 0xb7, 0xa1, 0xc0, 0x91, 0xf2, 0xbd, 0xe5,
0x44, 0xfc, 0xd9, 0x23, 0xc2, 0xe9, 0xe4, 0x73, 0x56, 0x35, 0x7e, 0x97, 0x20, 0xcd, 0x0b, 0xaf,
0x2e, 0x8d, 0xfe, 0x5e, 0xa6, 0x4b, 0x13, 0x79, 0x81, 0x32, 0x13, 0xf8, 0x1d, 0x30, 0xd8, 0x70,
0x80, 0xb5, 0xde, 0xa0, 0x3d, 0x93, 0xd4, 0x8e, 0xc6, 0xd1, 0xda, 0xb1, 0xef, 0x06, 0xaf, 0x3d,
0xc7, 0xe2, 0x53, 0xad, 0xda, 0x5e, 0x3d, 0xbc, 0x10, 0x9c, 0x7c, 0x47, 0x3c, 0x7b, 0xa8, 0xb1,
0x04, 0x9f, 0x8c, 0x1e, 0x15, 0x9b, 0x62, 0x6a, 0xf5, 0xe7, 0x2d, 0x07, 0x0c, 0x6f, 0x42, 0x41,
0x1b, 0x09, 0x74, 0xb3, 0x1e, 0x9e, 0x67, 0x74, 0xb3, 0xce, 0x99, 0x23, 0xcc, 0x04, 0xbe, 0x06,
0x39, 0xd6, 0x51, 0x59, 0x61, 0xc1, 0xc7, 0xe3, 0x8d, 0x53, 0x2b, 0x98, 0xb5, 0x13, 0xf3, 0x17,
0x03, 0x46, 0x57, 0x61, 0x39, 0x68, 0xcd, 0x32, 0xaa, 0x8f, 0xc5, 0xd3, 0x62, 0x8e, 0xbd, 0xa2,
0xa9, 0x65, 0x26, 0xd6, 0x3f, 0x82, 0x9c, 0x9a, 0x82, 0xf1, 0x07, 0xb0, 0x14, 0x9d, 0x01, 0xf1,
0x37, 0x34, 0xf3, 0x44, 0x47, 0xeb, 0xda, 0xaa, 0xb6, 0x34, 0x7f, 0x70, 0x4c, 0xac, 0xa1, 0xf5,
0x8f, 0xd4, 0xbf, 0x6f, 0x36, 0x2d, 0xdf, 0xc2, 0x77, 0x60, 0x89, 0x6b, 0x1f, 0xfc, 0x7f, 0x27,
0x12, 0xa5, 0x87, 0xfe, 0x99, 0x14, 0x89, 0xd2, 0xc3, 0xff, 0x54, 0x32, 0x13, 0xed, 0x0f, 0x9f,
0x3c, 0xad, 0x27, 0x3e, 0x7f, 0x5a, 0x4f, 0x7c, 0xf1, 0xb4, 0x8e, 0x7e, 0xba, 0x5f, 0x47, 0xbf,
0xd9, 0xaf, 0xa3, 0xc7, 0xfb, 0x75, 0xf4, 0x64, 0xbf, 0x8e, 0xfe, 0xb9, 0x5f, 0x47, 0xff, 0xda,
0xaf, 0x27, 0xbe, 0xd8, 0xaf, 0xa3, 0xcf, 0x9e, 0xd5, 0x13, 0x4f, 0x9e, 0xd5, 0x13, 0x9f, 0x3f,
0xab, 0x27, 0x3e, 0x7c, 0xfd, 0xcb, 0x5e, 0xb7, 0xd4, 0x89, 0x3b, 0x19, 0xfe, 0xf3, 0xd6, 0x7f,
0x03, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x2d, 0xf5, 0x83, 0x7d, 0x1b, 0x00, 0x00,
// 2213 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x39, 0xcd, 0x6f, 0x1b, 0xc7,
0xf5, 0x1c, 0x72, 0x49, 0x91, 0x8f, 0xd4, 0x87, 0x47, 0x8c, 0xad, 0x1f, 0x6d, 0x93, 0xf2, 0x20,
0x3f, 0x5b, 0xb0, 0x1d, 0x32, 0x56, 0xda, 0xd4, 0xb1, 0x9b, 0x16, 0xa6, 0x14, 0x3b, 0xf2, 0x77,
0x46, 0xae, 0x0b, 0x04, 0x0d, 0x8c, 0x15, 0x39, 0xfc, 0x80, 0xb8, 0x5c, 0x7a, 0x77, 0x19, 0x47,
0x40, 0x0f, 0xfd, 0x07, 0x02, 0xe4, 0x56, 0xf4, 0x52, 0xf4, 0x50, 0xa0, 0x45, 0x81, 0x5e, 0xfa,
0x07, 0xb4, 0x3d, 0x14, 0xa8, 0x7b, 0x73, 0x6f, 0x41, 0x0f, 0x6c, 0x2d, 0x5f, 0x02, 0x9d, 0xf2,
0x0f, 0xb4, 0x28, 0xe6, 0x6b, 0x77, 0x76, 0x45, 0x27, 0xa1, 0x6b, 0xa0, 0xf5, 0x45, 0xdc, 0xf7,
0xe6, 0xcd, 0x9b, 0xf7, 0xfd, 0xe6, 0x8d, 0xe0, 0xf8, 0x68, 0xb7, 0xdb, 0x18, 0xb8, 0xdd, 0x91,
0xe7, 0x06, 0x6e, 0xf8, 0x51, 0x17, 0x7f, 0x71, 0x5e, 0xc3, 0x95, 0x72, 0xd7, 0xed, 0xba, 0x92,
0x86, 0x7f, 0xc9, 0xf5, 0x4a, 0xad, 0xeb, 0xba, 0xdd, 0x01, 0x6b, 0x08, 0x68, 0x67, 0xdc, 0x69,
0x04, 0x7d, 0x87, 0xf9, 0x81, 0xed, 0x8c, 0x14, 0xc1, 0xaa, 0xe2, 0xfe, 0x70, 0xe0, 0xb8, 0x6d,
0x36, 0x68, 0xf8, 0x81, 0x1d, 0xf8, 0xf2, 0xaf, 0xa2, 0x58, 0xe6, 0x14, 0xa3, 0xb1, 0xdf, 0x13,
0x7f, 0x24, 0x92, 0x94, 0x01, 0x6f, 0x07, 0x1e, 0xb3, 0x1d, 0x6a, 0x07, 0xcc, 0xa7, 0xec, 0xe1,
0x98, 0xf9, 0x01, 0xb9, 0x05, 0xcb, 0x31, 0xac, 0x3f, 0x72, 0x87, 0x3e, 0xc3, 0x6f, 0x43, 0xd1,
0x8f, 0xd0, 0x2b, 0x68, 0x35, 0xb3, 0x56, 0x5c, 0x2f, 0xd7, 0x43, 0x55, 0xa2, 0x3d, 0xd4, 0x24,
0x24, 0x3f, 0x47, 0x00, 0xd1, 0x1a, 0xae, 0x02, 0xc8, 0xd5, 0xf7, 0x6d, 0xbf, 0xb7, 0x82, 0x56,
0xd1, 0x9a, 0x45, 0x0d, 0x0c, 0x3e, 0x0f, 0x47, 0x22, 0xe8, 0xb6, 0xbb, 0xdd, 0xb3, 0xbd, 0xf6,
0x4a, 0x5a, 0x90, 0x1d, 0x5e, 0xc0, 0x18, 0x2c, 0xcf, 0x0e, 0xd8, 0x4a, 0x66, 0x15, 0xad, 0x65,
0xa8, 0xf8, 0xc6, 0x47, 0x21, 0x17, 0xb0, 0xa1, 0x3d, 0x0c, 0x56, 0xac, 0x55, 0xb4, 0x56, 0xa0,
0x0a, 0xe2, 0x78, 0xae, 0x3b, 0xf3, 0x57, 0xb2, 0xab, 0x68, 0x6d, 0x9e, 0x2a, 0x88, 0xfc, 0x39,
0x0d, 0xa5, 0x0f, 0xc6, 0xcc, 0xdb, 0x53, 0x06, 0xc0, 0x15, 0xc8, 0xfb, 0x6c, 0xc0, 0x5a, 0x81,
0xeb, 0x09, 0x01, 0x0b, 0x34, 0x84, 0x71, 0x19, 0xb2, 0x83, 0xbe, 0xd3, 0x0f, 0x84, 0x48, 0xf3,
0x54, 0x02, 0xf8, 0x12, 0x64, 0xfd, 0xc0, 0xf6, 0x02, 0x21, 0x47, 0x71, 0xbd, 0x52, 0x97, 0x0e,
0xab, 0x6b, 0x87, 0xd5, 0xef, 0x69, 0x87, 0x35, 0xf3, 0x8f, 0x27, 0xb5, 0xd4, 0x67, 0x7f, 0xaf,
0x21, 0x2a, 0xb7, 0xe0, 0xb7, 0x21, 0xc3, 0x86, 0x6d, 0x21, 0xeb, 0x37, 0xdd, 0xc9, 0x37, 0xe0,
0x0b, 0x50, 0x68, 0xf7, 0x3d, 0xd6, 0x0a, 0xfa, 0xee, 0x50, 0x68, 0xb4, 0xb0, 0xbe, 0x1c, 0x79,
0x63, 0x53, 0x2f, 0xd1, 0x88, 0x0a, 0x9f, 0x87, 0x9c, 0xcf, 0xcd, 0xe6, 0xaf, 0xcc, 0xad, 0x66,
0xd6, 0x0a, 0xcd, 0xf2, 0xc1, 0xa4, 0xb6, 0x24, 0x31, 0xe7, 0x5d, 0xa7, 0x1f, 0x30, 0x67, 0x14,
0xec, 0x51, 0x45, 0x83, 0xcf, 0xc2, 0x5c, 0x9b, 0x0d, 0x18, 0x77, 0x76, 0x5e, 0x38, 0x7b, 0xc9,
0x60, 0x2f, 0x16, 0xa8, 0x26, 0xb8, 0x6e, 0xe5, 0x73, 0x4b, 0x73, 0xe4, 0x5f, 0x08, 0xf0, 0xb6,
0xed, 0x8c, 0x06, 0xec, 0x1b, 0xdb, 0x33, 0xb4, 0x5c, 0xfa, 0x85, 0x2d, 0x97, 0x99, 0xd5, 0x72,
0x91, 0x19, 0xac, 0xd9, 0xcc, 0x90, 0xfd, 0x1a, 0x33, 0x90, 0x9b, 0x90, 0x93, 0xa8, 0xaf, 0x8b,
0xa1, 0x48, 0xe7, 0x8c, 0xd6, 0x66, 0x29, 0xd2, 0x26, 0x23, 0xe4, 0x24, 0xbf, 0x40, 0x30, 0xaf,
0x0c, 0xa9, 0x72, 0x70, 0x07, 0xe6, 0x64, 0x0e, 0xe8, 0xfc, 0x3b, 0x96, 0xcc, 0xbf, 0x2b, 0x6d,
0x7b, 0x14, 0x30, 0xaf, 0xd9, 0x78, 0x3c, 0xa9, 0xa1, 0xbf, 0x4d, 0x6a, 0x67, 0xba, 0xfd, 0xa0,
0x37, 0xde, 0xa9, 0xb7, 0x5c, 0xa7, 0xd1, 0xf5, 0xec, 0x8e, 0x3d, 0xb4, 0x1b, 0x03, 0x77, 0xb7,
0xdf, 0xd0, 0xf5, 0x40, 0xe7, 0xad, 0x66, 0x8c, 0xcf, 0x09, 0xe9, 0x02, 0x5f, 0x79, 0x64, 0xb1,
0x2e, 0xcb, 0xc8, 0xd6, 0xb0, 0xcb, 0x7c, 0xce, 0xd9, 0xe2, 0xc6, 0xa4, 0x92, 0x86, 0xfc, 0x18,
0x96, 0x63, 0x0e, 0x57, 0x72, 0x5e, 0x84, 0x9c, 0xcf, 0xbc, 0x7e, 0x58, 0x26, 0x0c, 0x93, 0x6d,
0x0b, 0x7c, 0x73, 0x41, 0xc9, 0x97, 0x93, 0x30, 0x55, 0xf4, 0xb3, 0x9d, 0xfe, 0x27, 0x04, 0xa5,
0x9b, 0xf6, 0x0e, 0x1b, 0xe8, 0x48, 0xc3, 0x60, 0x0d, 0x6d, 0x87, 0x29, 0x8b, 0x8b, 0x6f, 0x9e,
0xf6, 0x1f, 0xdb, 0x83, 0x31, 0x93, 0x2c, 0xf3, 0x54, 0x41, 0xb3, 0xe6, 0x2c, 0x7a, 0xe1, 0x9c,
0x45, 0x51, 0xe4, 0x95, 0x21, 0xfb, 0x90, 0x1b, 0x4a, 0xe4, 0x6b, 0x81, 0x4a, 0x80, 0x9c, 0x81,
0x79, 0xa5, 0x85, 0x32, 0x5f, 0x24, 0x32, 0x37, 0x5f, 0x41, 0x8b, 0x4c, 0x1c, 0xc8, 0x49, 0x6b,
0xe3, 0xd7, 0xa1, 0x10, 0xf6, 0x00, 0xa1, 0x6d, 0xa6, 0x99, 0x3b, 0x98, 0xd4, 0xd2, 0x81, 0x4f,
0xa3, 0x05, 0x5c, 0x83, 0xac, 0xd8, 0x29, 0x34, 0x47, 0xcd, 0xc2, 0xc1, 0xa4, 0x26, 0x11, 0x54,
0xfe, 0xe0, 0x13, 0x60, 0xf5, 0x78, 0x19, 0xe6, 0x26, 0xb0, 0x9a, 0xf9, 0x83, 0x49, 0x4d, 0xc0,
0x54, 0xfc, 0x25, 0xd7, 0xa0, 0x74, 0x93, 0x75, 0xed, 0xd6, 0x9e, 0x3a, 0xb4, 0xac, 0xd9, 0xf1,
0x03, 0x91, 0xe6, 0x71, 0x0a, 0x4a, 0xe1, 0x89, 0x0f, 0x1c, 0x5f, 0x05, 0x75, 0x31, 0xc4, 0xdd,
0xf2, 0xc9, 0xcf, 0x10, 0x28, 0x3f, 0x63, 0x02, 0xb9, 0x01, 0xd7, 0xd5, 0x97, 0x3e, 0x6a, 0xc2,
0xc1, 0xa4, 0xa6, 0x30, 0x54, 0xfd, 0xe2, 0xcb, 0x30, 0xe7, 0x8b, 0x13, 0x39, 0xb3, 0x64, 0xf8,
0x88, 0x85, 0xe6, 0x22, 0x0f, 0x83, 0x83, 0x49, 0x4d, 0x13, 0x52, 0xfd, 0x81, 0xeb, 0xb1, 0xfe,
0x22, 0x15, 0x5b, 0x38, 0x98, 0xd4, 0x0c, 0xac, 0xd9, 0x6f, 0xc8, 0x4f, 0x11, 0x14, 0xef, 0xd9,
0xfd, 0x30, 0x84, 0x42, 0x17, 0x21, 0xc3, 0x45, 0x3c, 0x9d, 0xdb, 0x6c, 0x60, 0xef, 0x5d, 0x75,
0x3d, 0xc1, 0x73, 0x9e, 0x86, 0x70, 0xd4, 0x12, 0xac, 0xa9, 0x2d, 0x21, 0x3b, 0x73, 0x61, 0xbb,
0x6e, 0xe5, 0xd3, 0x4b, 0x19, 0xf2, 0x5b, 0x04, 0x25, 0x29, 0x99, 0x0a, 0x8b, 0x1f, 0x41, 0x4e,
0x0a, 0x2e, 0x64, 0xfb, 0x8a, 0xe4, 0x3f, 0x37, 0x4b, 0xe2, 0x2b, 0x9e, 0xf8, 0xfb, 0xb0, 0xd0,
0xf6, 0xdc, 0xd1, 0x88, 0xb5, 0xb7, 0x55, 0x89, 0x49, 0x27, 0x4b, 0xcc, 0xa6, 0xb9, 0x4e, 0x13,
0xe4, 0xe4, 0x2f, 0x08, 0xe6, 0x55, 0x36, 0x2b, 0x5b, 0x86, 0x36, 0x40, 0x2f, 0x5c, 0xdc, 0xd3,
0xb3, 0x16, 0xf7, 0xa3, 0x90, 0xeb, 0x7a, 0xee, 0x78, 0xe4, 0xaf, 0x64, 0x64, 0xee, 0x48, 0x68,
0xb6, 0xa2, 0x4f, 0xae, 0xc3, 0x82, 0x56, 0xe5, 0x39, 0x25, 0xad, 0x92, 0x2c, 0x69, 0x5b, 0x6d,
0x36, 0x0c, 0xfa, 0x9d, 0x7e, 0x58, 0xa4, 0x14, 0x3d, 0xf9, 0x14, 0xc1, 0x52, 0x92, 0x04, 0x7f,
0xcf, 0xc8, 0x03, 0xce, 0xee, 0xf4, 0xf3, 0xd9, 0xd5, 0x45, 0x71, 0xf0, 0xdf, 0x1b, 0x06, 0xde,
0x9e, 0xce, 0x91, 0xca, 0x3b, 0x50, 0x34, 0xd0, 0xbc, 0x79, 0xec, 0x32, 0x1d, 0xb3, 0xfc, 0x33,
0x4a, 0xd6, 0xb4, 0x8c, 0x63, 0x01, 0x5c, 0x4a, 0x5f, 0x44, 0x3c, 0xe2, 0xe7, 0x63, 0x9e, 0xc4,
0x17, 0xc1, 0xea, 0x78, 0xae, 0x33, 0x93, 0x9b, 0xc4, 0x0e, 0xfc, 0x2d, 0x48, 0x07, 0xee, 0x4c,
0x4e, 0x4a, 0x07, 0x2e, 0xf7, 0x91, 0x52, 0x3e, 0x23, 0x6f, 0x68, 0x12, 0x22, 0xbf, 0x41, 0xb0,
0xc8, 0xf7, 0x48, 0x0b, 0x6c, 0xf4, 0xc6, 0xc3, 0x5d, 0xbc, 0x06, 0x4b, 0xfc, 0xa4, 0x07, 0x7d,
0xd5, 0x01, 0x1e, 0xf4, 0xdb, 0x4a, 0xcd, 0x05, 0x8e, 0xd7, 0x8d, 0x61, 0xab, 0x8d, 0x8f, 0xc1,
0xdc, 0xd8, 0x97, 0x04, 0x52, 0xe7, 0x1c, 0x07, 0xb7, 0xda, 0xf8, 0x9c, 0x71, 0x1c, 0xb7, 0xb5,
0x71, 0x4d, 0x12, 0x36, 0xbc, 0x6b, 0xf7, 0xbd, 0xb0, 0xf8, 0x9c, 0x81, 0x5c, 0x8b, 0x1f, 0x2c,
0xe3, 0x84, 0x77, 0xa0, 0x90, 0x58, 0x08, 0x44, 0xd5, 0x32, 0xf9, 0x36, 0x14, 0xc2, 0xdd, 0x53,
0x1b, 0xcf, 0x54, 0x0f, 0x90, 0xcb, 0xb0, 0x28, 0x8b, 0xea, 0xf4, 0xcd, 0xa5, 0x69, 0x9b, 0x4b,
0x7a, 0xf3, 0x71, 0xc8, 0x4a, 0xab, 0x60, 0xb0, 0xda, 0x76, 0x60, 0xeb, 0x2d, 0xfc, 0x9b, 0xac,
0xc0, 0xd1, 0x7b, 0x9e, 0x3d, 0xf4, 0x3b, 0xcc, 0x13, 0x44, 0x61, 0xec, 0x92, 0xd7, 0x60, 0x99,
0x17, 0x12, 0xe6, 0xf9, 0x1b, 0xee, 0x78, 0x18, 0xe8, 0x8b, 0xfe, 0x79, 0x28, 0xc7, 0xd1, 0x2a,
0xd4, 0xcb, 0x90, 0x6d, 0x71, 0x84, 0xe0, 0x3e, 0x4f, 0x25, 0x40, 0x7e, 0x89, 0x00, 0x5f, 0x63,
0x81, 0x60, 0xbd, 0xb5, 0xe9, 0x1b, 0x97, 0x3b, 0xc7, 0x0e, 0x5a, 0x3d, 0xe6, 0xf9, 0xfa, 0xa2,
0xa3, 0xe1, 0xff, 0xc6, 0xe5, 0x8e, 0x5c, 0x80, 0xe5, 0x98, 0x94, 0x4a, 0xa7, 0x0a, 0xe4, 0x5b,
0x0a, 0xa7, 0x9a, 0x6a, 0x08, 0x93, 0xdf, 0xa5, 0x21, 0x2f, 0x7d, 0xcb, 0x3a, 0xf8, 0x02, 0x14,
0x3b, 0x3c, 0xd6, 0xbc, 0x91, 0xd7, 0x57, 0x26, 0xb0, 0x9a, 0x8b, 0x07, 0x93, 0x9a, 0x89, 0xa6,
0x26, 0x80, 0xdf, 0x48, 0x04, 0x5e, 0xb3, 0xbc, 0x3f, 0xa9, 0xe5, 0x7e, 0xc0, 0x83, 0x6f, 0x93,
0xb7, 0x37, 0x11, 0x86, 0x9b, 0x61, 0x38, 0xde, 0x50, 0xd9, 0x26, 0x6e, 0x7a, 0xcd, 0xef, 0x70,
0xf1, 0x13, 0xf5, 0x7a, 0xe4, 0xb9, 0x0e, 0x0b, 0x7a, 0x6c, 0xec, 0x37, 0x5a, 0xae, 0xe3, 0xb8,
0xc3, 0x86, 0x18, 0xeb, 0x84, 0xd2, 0xbc, 0x47, 0xf3, 0xed, 0x2a, 0x01, 0xef, 0xc1, 0x5c, 0xd0,
0xf3, 0xdc, 0x71, 0xb7, 0x27, 0xda, 0x4f, 0xa6, 0x79, 0x69, 0x76, 0x7e, 0x9a, 0x03, 0xd5, 0x1f,
0xf8, 0x14, 0xb7, 0x16, 0x6b, 0xed, 0xfa, 0x63, 0x47, 0x0e, 0x4b, 0xcd, 0xec, 0xc1, 0xa4, 0x86,
0xde, 0xa0, 0x21, 0x9a, 0x7c, 0x9a, 0x86, 0x9a, 0x08, 0xe1, 0xfb, 0xe2, 0x6e, 0x72, 0xd5, 0xf5,
0x6e, 0xb1, 0xc0, 0xeb, 0xb7, 0x6e, 0xdb, 0x0e, 0xd3, 0xb1, 0x51, 0x83, 0xa2, 0x23, 0x90, 0x0f,
0x8c, 0xe4, 0x00, 0x27, 0xa4, 0xc3, 0x27, 0x01, 0x44, 0xda, 0xc9, 0x75, 0x99, 0x27, 0x05, 0x81,
0x11, 0xcb, 0x1b, 0x31, 0x4b, 0x35, 0x66, 0xd4, 0x4c, 0x59, 0x68, 0x2b, 0x69, 0xa1, 0x99, 0xf9,
0x84, 0x66, 0x31, 0x63, 0x3d, 0x1b, 0x8f, 0x75, 0xf2, 0x57, 0x04, 0xd5, 0x9b, 0x5a, 0xf2, 0x17,
0x34, 0x87, 0xd6, 0x37, 0xfd, 0x92, 0xf4, 0xcd, 0xfc, 0x67, 0xfa, 0x92, 0x3f, 0x1a, 0x29, 0x4f,
0x59, 0x47, 0xeb, 0xb1, 0x61, 0xb4, 0x8b, 0x97, 0x21, 0x66, 0xfa, 0x25, 0xba, 0x25, 0x93, 0x70,
0xcb, 0xbb, 0x51, 0x39, 0x10, 0x1a, 0xa8, 0x72, 0x70, 0x1a, 0x2c, 0x8f, 0x75, 0x74, 0xf3, 0xc5,
0xc9, 0x1a, 0xcf, 0x3a, 0x54, 0xac, 0x93, 0xdf, 0x23, 0x58, 0xba, 0xc6, 0x82, 0xf8, 0xb5, 0xe6,
0x55, 0xd2, 0xff, 0x7d, 0x38, 0x62, 0xc8, 0xaf, 0xb4, 0x7f, 0x2b, 0x71, 0x97, 0x79, 0x2d, 0xd2,
0x7f, 0x6b, 0xd8, 0x66, 0x9f, 0xa8, 0x19, 0x2d, 0x7e, 0x8d, 0xb9, 0x0b, 0x45, 0x63, 0x11, 0x5f,
0x49, 0x5c, 0x60, 0xa6, 0x35, 0xd5, 0x66, 0x59, 0xe9, 0x24, 0xa7, 0x34, 0x75, 0x3d, 0x0d, 0xdb,
0xfd, 0x36, 0x60, 0x31, 0x36, 0x0a, 0xb6, 0x66, 0xa5, 0x16, 0xd8, 0x1b, 0xe1, 0x7d, 0x26, 0x84,
0xf1, 0x29, 0xb0, 0x3c, 0xf7, 0x91, 0xbe, 0x99, 0xce, 0x47, 0x47, 0x52, 0xf7, 0x11, 0x15, 0x4b,
0xe4, 0x32, 0x64, 0xa8, 0xfb, 0x08, 0x57, 0x01, 0x3c, 0x7b, 0xd8, 0x65, 0xf7, 0xc3, 0x81, 0xa5,
0x44, 0x0d, 0xcc, 0x73, 0xfa, 0xeb, 0x06, 0x1c, 0x31, 0x25, 0x92, 0xee, 0xae, 0xc3, 0x1c, 0x47,
0xf6, 0xa7, 0x3d, 0x7a, 0x09, 0x42, 0x39, 0xfb, 0x6a, 0x22, 0x1e, 0x33, 0x10, 0xe1, 0xf1, 0x09,
0x28, 0x04, 0xf6, 0xce, 0x80, 0xdd, 0x8e, 0x72, 0x3e, 0x42, 0xf0, 0x55, 0x3e, 0x6b, 0xdd, 0x37,
0x2e, 0x0a, 0x11, 0x02, 0x9f, 0x85, 0xa5, 0x48, 0xe6, 0xbb, 0x1e, 0xeb, 0xf4, 0x3f, 0x11, 0x1e,
0x2e, 0xd1, 0x43, 0x78, 0xbc, 0x06, 0x8b, 0x11, 0x6e, 0x5b, 0xb4, 0x5d, 0x4b, 0x90, 0x26, 0xd1,
0xdc, 0x36, 0x42, 0xdd, 0xf7, 0x1e, 0x8e, 0xed, 0x81, 0x28, 0x64, 0x25, 0x6a, 0x60, 0xc8, 0x1f,
0x10, 0x1c, 0x91, 0xae, 0xe6, 0x53, 0xf6, 0xab, 0x18, 0xf5, 0xbf, 0x42, 0x80, 0x4d, 0x0d, 0x54,
0x68, 0xfd, 0xbf, 0xf9, 0x7c, 0xc2, 0xfb, 0x7a, 0x51, 0x8c, 0x90, 0x12, 0x15, 0xbd, 0x80, 0x90,
0xf0, 0x0a, 0x28, 0xde, 0x1d, 0xe5, 0x8c, 0x2a, 0x31, 0xfa, 0xf6, 0xc7, 0x47, 0xeb, 0x9d, 0xbd,
0x80, 0xf9, 0x6a, 0xc2, 0x14, 0xa3, 0xb5, 0x40, 0x50, 0xf9, 0xc3, 0xcf, 0x62, 0xc3, 0x40, 0x44,
0x8d, 0x15, 0x9d, 0xa5, 0x50, 0x54, 0x7f, 0xf0, 0xb6, 0x31, 0x7f, 0xdf, 0x1d, 0x8c, 0xa3, 0x2e,
0xf1, 0x0a, 0xd9, 0x39, 0x3e, 0xfa, 0x66, 0xd5, 0xe8, 0x4b, 0xbe, 0x40, 0xb0, 0xa0, 0x75, 0x52,
0x96, 0x7f, 0x13, 0xe6, 0x3e, 0x16, 0x98, 0x29, 0x2f, 0x42, 0x92, 0x54, 0x55, 0x1b, 0x4d, 0x16,
0x7f, 0x68, 0xd5, 0xac, 0xff, 0xd7, 0x6e, 0x04, 0x64, 0x17, 0x72, 0x52, 0x7c, 0x7c, 0xc2, 0x9c,
0x00, 0xe4, 0x53, 0x0a, 0x87, 0xd5, 0x75, 0x3e, 0xf6, 0x12, 0x53, 0x98, 0xf2, 0x12, 0x43, 0x20,
0x27, 0x35, 0x57, 0x01, 0x25, 0x62, 0x4e, 0x62, 0xa8, 0xfa, 0x3d, 0x7b, 0x1a, 0x0a, 0xe1, 0xb3,
0x2e, 0x2e, 0xc2, 0xdc, 0xd5, 0x3b, 0xf4, 0x87, 0x57, 0xe8, 0xe6, 0x52, 0x0a, 0x97, 0x20, 0xdf,
0xbc, 0xb2, 0x71, 0x43, 0x40, 0x68, 0xfd, 0x9f, 0x96, 0xae, 0x58, 0x1e, 0xfe, 0x2e, 0x64, 0x65,
0x19, 0x3a, 0x1a, 0x19, 0xdc, 0x7c, 0x9c, 0xad, 0x1c, 0x3b, 0x84, 0x57, 0x43, 0x43, 0xea, 0x4d,
0x84, 0x6f, 0x43, 0x51, 0x20, 0xd5, 0x03, 0xd0, 0x89, 0xe4, 0x3b, 0x4c, 0x8c, 0xd3, 0xc9, 0xe7,
0xac, 0x1a, 0xfc, 0x2e, 0x41, 0x56, 0x74, 0x02, 0x53, 0x1a, 0xf3, 0x01, 0xcf, 0x94, 0x26, 0xf6,
0x24, 0x46, 0x52, 0xf8, 0x1d, 0xb0, 0xf8, 0xb4, 0x82, 0x8d, 0x66, 0x65, 0xbc, 0xdb, 0x54, 0x8e,
0x26, 0xd1, 0xc6, 0xb1, 0xef, 0x86, 0xcf, 0x4f, 0xc7, 0x92, 0x63, 0xb6, 0xde, 0xbe, 0x72, 0x78,
0x21, 0x3c, 0xf9, 0x8e, 0x7c, 0x87, 0xd1, 0x73, 0x12, 0x3e, 0x19, 0x3f, 0x2a, 0x31, 0x56, 0x55,
0xaa, 0xcf, 0x5b, 0x0e, 0x19, 0xde, 0x84, 0xa2, 0x31, 0xa3, 0x98, 0x66, 0x3d, 0x3c, 0x60, 0x99,
0x66, 0x9d, 0x32, 0xd8, 0x90, 0x14, 0xbe, 0x06, 0x79, 0xde, 0xe2, 0x79, 0xa5, 0xc3, 0xc7, 0x93,
0x9d, 0xdc, 0xa8, 0xe0, 0x95, 0x13, 0xd3, 0x17, 0x43, 0x46, 0x57, 0x61, 0x31, 0xbc, 0x2b, 0xa8,
0xa8, 0x3e, 0x96, 0x4c, 0xd3, 0x29, 0xf6, 0x8a, 0xa7, 0x3a, 0x49, 0xad, 0x7f, 0x04, 0x79, 0x3d,
0x96, 0xe3, 0x0f, 0x60, 0x21, 0x3e, 0x94, 0xe2, 0xff, 0x33, 0xcc, 0x13, 0x9f, 0xf5, 0x2b, 0xab,
0xc6, 0xd2, 0xf4, 0x49, 0x36, 0xb5, 0x86, 0xd6, 0x3f, 0xd2, 0xff, 0x4f, 0xda, 0xb4, 0x03, 0x1b,
0xdf, 0x81, 0x05, 0xa1, 0x7d, 0xf8, 0x0f, 0xa7, 0x58, 0x94, 0x1e, 0xfa, 0xef, 0x56, 0x2c, 0x4a,
0x0f, 0xff, 0x97, 0x8b, 0xa4, 0x9a, 0x1f, 0x3e, 0x79, 0x5a, 0x4d, 0x7d, 0xfe, 0xb4, 0x9a, 0xfa,
0xf2, 0x69, 0x15, 0xfd, 0x64, 0xbf, 0x8a, 0x7e, 0xbd, 0x5f, 0x45, 0x8f, 0xf7, 0xab, 0xe8, 0xc9,
0x7e, 0x15, 0xfd, 0x63, 0xbf, 0x8a, 0xbe, 0xd8, 0xaf, 0xa6, 0xbe, 0xdc, 0xaf, 0xa2, 0xcf, 0x9e,
0x55, 0x53, 0x4f, 0x9e, 0x55, 0x53, 0x9f, 0x3f, 0xab, 0xa6, 0x3e, 0x7c, 0xfd, 0xab, 0x9e, 0xdb,
0xf4, 0x89, 0x3b, 0x39, 0xf1, 0xf3, 0xd6, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x3e, 0x47, 0x25,
0xcf, 0x0e, 0x1c, 0x00, 0x00,
}
func (x Direction) String() string {
@ -4037,6 +4040,12 @@ func (this *VolumeResponse) Equal(that interface{}) bool {
if this.Limit != that1.Limit {
return false
}
if !this.From.Equal(that1.From) {
return false
}
if !this.Through.Equal(that1.Through) {
return false
}
return true
}
func (this *Volume) Equal(that interface{}) bool {
@ -4619,7 +4628,7 @@ func (this *VolumeResponse) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 6)
s := make([]string, 0, 8)
s = append(s, "&logproto.VolumeResponse{")
if this.Volumes != nil {
vs := make([]*Volume, len(this.Volumes))
@ -4629,6 +4638,8 @@ func (this *VolumeResponse) GoString() string {
s = append(s, "Volumes: "+fmt.Sprintf("%#v", vs)+",\n")
}
s = append(s, "Limit: "+fmt.Sprintf("%#v", this.Limit)+",\n")
s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n")
s = append(s, "Through: "+fmt.Sprintf("%#v", this.Through)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -7161,6 +7172,16 @@ func (m *VolumeResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.Through != 0 {
i = encodeVarintLogproto(dAtA, i, uint64(m.Through))
i--
dAtA[i] = 0x20
}
if m.From != 0 {
i = encodeVarintLogproto(dAtA, i, uint64(m.From))
i--
dAtA[i] = 0x18
}
if m.Limit != 0 {
i = encodeVarintLogproto(dAtA, i, uint64(m.Limit))
i--
@ -8063,6 +8084,12 @@ func (m *VolumeResponse) Size() (n int) {
if m.Limit != 0 {
n += 1 + sovLogproto(uint64(m.Limit))
}
if m.From != 0 {
n += 1 + sovLogproto(uint64(m.From))
}
if m.Through != 0 {
n += 1 + sovLogproto(uint64(m.Through))
}
return n
}
@ -8672,6 +8699,8 @@ func (this *VolumeResponse) String() string {
s := strings.Join([]string{`&VolumeResponse{`,
`Volumes:` + repeatedStringForVolumes + `,`,
`Limit:` + fmt.Sprintf("%v", this.Limit) + `,`,
`From:` + fmt.Sprintf("%v", this.From) + `,`,
`Through:` + fmt.Sprintf("%v", this.Through) + `,`,
`}`,
}, "")
return s
@ -14253,6 +14282,44 @@ func (m *VolumeResponse) Unmarshal(dAtA []byte) error {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field From", wireType)
}
m.From = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.From |= github_com_prometheus_common_model.Time(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Through", wireType)
}
m.Through = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowLogproto
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Through |= github_com_prometheus_common_model.Time(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipLogproto(dAtA[iNdEx:])

@ -397,6 +397,14 @@ message VolumeRequest {
message VolumeResponse {
repeated Volume volumes = 1 [(gogoproto.nullable) = false];
int32 limit = 2;
int64 from = 3 [
(gogoproto.customtype) = "github.com/prometheus/common/model.Time",
(gogoproto.nullable) = false
];
int64 through = 4 [
(gogoproto.customtype) = "github.com/prometheus/common/model.Time",
(gogoproto.nullable) = false
];
}
message Volume {

@ -199,10 +199,18 @@ func TestQueryWrapperMiddleware(t *testing.T) {
}
func TestSeriesVolumeHandler(t *testing.T) {
now := time.Now()
t1 := model.TimeFromUnix(now.Add(-time.Hour).Unix())
t2 := model.TimeFromUnix(now.Add(-time.Minute).Unix())
t.Run("it returns label volumes from the querier", func(t *testing.T) {
ret := &logproto.VolumeResponse{Volumes: []logproto.Volume{
{Name: "foo", Value: "bar", Volume: 38},
}}
ret := &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{foo="bar"}`, Value: "", Volume: 38},
},
From: t1,
Through: t2,
}
querier := newQuerierMock()
querier.On("SeriesVolume", mock.Anything, mock.Anything).Return(ret, nil)
@ -225,7 +233,14 @@ func TestSeriesVolumeHandler(t *testing.T) {
Limit: 100,
}, calls[0].Arguments[1])
require.Equal(t, strings.TrimSpace(w.Body.String()), `{"volumes":[{"name":"foo","value":"bar","volume":38}]}`)
require.Equal(
t,
fmt.Sprintf(
`{"volumes":[{"name":"{foo=\"bar\"}","value":"","volume":38}],"from":%s,"through":%s}`,
t1, t2,
),
strings.TrimSpace(w.Body.String()),
)
require.Equal(t, http.StatusOK, w.Result().StatusCode)
})
@ -245,7 +260,7 @@ func TestSeriesVolumeHandler(t *testing.T) {
calls := querier.GetMockedCallsByMethod("SeriesVolume")
require.Len(t, calls, 1)
require.Equal(t, strings.TrimSpace(w.Body.String()), `{"volumes":[]}`)
require.Equal(t, strings.TrimSpace(w.Body.String()), `{"volumes":[],"from":0,"through":0}`)
require.Equal(t, http.StatusOK, w.Result().StatusCode)
})

@ -28,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
indexStats "github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/httpreq"
@ -744,23 +745,84 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
}, nil
case *VolumeResponse:
resp0 := responses[0].(*VolumeResponse)
headers := resp0.Headers
headers := make([]*definitions.PrometheusResponseHeader, len(resp0.Headers))
for i, header := range resp0.Headers {
h := header
headers[i] = &h
}
resps := make([]*logproto.VolumeResponse, 0, len(responses))
for _, r := range responses {
resps = append(resps, r.(*VolumeResponse).Response)
}
return &VolumeResponse{
Response: seriesvolume.Merge(resps, resp0.Response.Limit),
Headers: headers,
}, nil
promResponse := queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: MergeToPrometheusResponse(resps, resp0.Response.Limit),
Headers: headers,
}
return &LokiPromResponse{
Response: &promResponse,
Statistics: stats.Result{},
}, nil
default:
return nil, errors.New("unknown response in merging responses")
}
}
func MergeToPrometheusResponse(responses []*logproto.VolumeResponse, limit int32) queryrangebase.PrometheusData {
mergedVolumes := seriesvolume.Merge(responses, limit)
return mapToPrometheusResponse(mergedVolumes)
}
func mapToPrometheusResponse(mergedResponse *logproto.VolumeResponse) queryrangebase.PrometheusData {
samplesByStream := map[string]*logproto.LegacySample{}
// since this is an instant response, we're only interested in a single "bucket", which is
// bounded by the latest timestamp we've seen (which should correspond to the end of the query range)
tsMs := mergedResponse.Through.UnixNano() / 1e6 //convert ns to ms
// Aggregate samples into single sample with latest timestamp
for _, volume := range mergedResponse.Volumes {
if _, ok := samplesByStream[volume.Name]; !ok {
samplesByStream[volume.Name] = &logproto.LegacySample{
TimestampMs: tsMs,
}
}
samplesByStream[volume.Name].Value += float64(volume.Volume)
}
result := make([]queryrangebase.SampleStream, 0, len(samplesByStream))
for stream, sample := range samplesByStream {
lbls, err := syntax.ParseLabels(stream)
if err != nil {
continue
}
result = append(result, queryrangebase.SampleStream{
Labels: logproto.FromLabelsToLabelAdapters(lbls),
Samples: []logproto.LegacySample{*sample},
})
}
// sort to enusre consistent ordering in results
// this only works because all samples in this result set have the same timestamp
// and each sample stream only has a single sample.
sort.Slice(result, func(i, j int) bool {
if result[i].Samples[0].Value == result[j].Samples[0].Value {
return result[i].Labels[0].Name < result[j].Labels[0].Name
}
return result[i].Samples[0].Value > result[j].Samples[0].Value
})
return queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: result,
}
}
// mergeOrderedNonOverlappingStreams merges a set of ordered, nonoverlapping responses by concatenating matching streams then running them through a heap to pull out limit values
func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, direction logproto.Direction) []logproto.Stream {
var total int

@ -265,7 +265,7 @@ func Test_codec_DecodeResponse(t *testing.T) {
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: "foo", Value: "bar", Volume: 38},
{Name: `{foo="bar"}`, Value: "", Volume: 38},
},
Limit: 100,
},
@ -522,9 +522,11 @@ func Test_codec_EncodeResponse(t *testing.T) {
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: "foo", Value: "bar", Volume: 38},
{Name: `{foo="bar"}`, Value: "", Volume: 38},
},
Limit: 100,
Limit: 100,
From: 0,
Through: 0,
},
}, labelVolumeString, false,
},
@ -1012,6 +1014,184 @@ func Test_codec_MergeResponse(t *testing.T) {
}
}
func Test_codec_MergeResponse_Volume(t *testing.T) {
now := time.Now()
from := model.TimeFromUnix(now.Add(-time.Hour).Unix())
through := model.TimeFromUnix(now.Add(-time.Minute).Unix())
t.Run("converts to prometheus response, merging results for the same timestamp", func(t *testing.T) {
responses := []queryrangebase.Response{
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: `{job="prometheus"}`,
Value: "",
Volume: 150,
},
{
Name: `{job="loki"}`,
Value: "",
Volume: 300,
},
},
From: from,
Through: through,
Limit: 5,
},
},
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: `{job="prometheus"}`,
Value: "",
Volume: 100,
},
{
Name: `{job="loki"}`,
Value: "",
Volume: 200,
},
},
From: from,
Through: through,
Limit: 5,
},
},
}
got, err := LokiCodec.MergeResponse(responses...)
require.NoError(t, err)
expected := got.(*LokiPromResponse).Response
require.Equal(t, expected.Status, "success")
require.Equal(t, expected.Data.ResultType, "vector")
require.Len(t, expected.Data.Result, 2)
require.Equal(t, expected.Data.Result, []queryrangebase.SampleStream{{
Labels: []logproto.LabelAdapter{{Name: "job", Value: "loki"}},
Samples: []logproto.LegacySample{{Value: 500, TimestampMs: through.Unix() * 1e3}},
}, {
Labels: []logproto.LabelAdapter{{Name: "job", Value: "prometheus"}},
Samples: []logproto.LegacySample{{Value: 250, TimestampMs: through.Unix() * 1e3}},
}})
})
t.Run("converts to prometheus response, aggregating all samples per series into the latest timestamp", func(t *testing.T) {
responses := []queryrangebase.Response{
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: `{job="prometheus"}`,
Value: "",
Volume: 150,
},
{
Name: `{job="loki"}`,
Value: "",
Volume: 300,
},
},
From: from,
Through: through,
Limit: 5,
},
},
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: `{job="prometheus"}`,
Value: "",
Volume: 100,
},
{
Name: `{job="loki"}`,
Value: "",
Volume: 200,
},
},
From: from,
Through: through,
Limit: 5,
},
},
}
got, err := LokiCodec.MergeResponse(responses...)
require.NoError(t, err)
expected := got.(*LokiPromResponse).Response
require.Len(t, expected.Data.Result, 2)
require.Contains(t, expected.Data.Result, queryrangebase.SampleStream{
Labels: []logproto.LabelAdapter{{Name: "job", Value: "prometheus"}},
Samples: []logproto.LegacySample{{Value: 250, TimestampMs: through.Unix() * 1e3}},
})
require.Contains(t, expected.Data.Result, queryrangebase.SampleStream{
Labels: []logproto.LabelAdapter{{Name: "job", Value: "loki"}},
Samples: []logproto.LegacySample{{Value: 500, TimestampMs: through.Unix() * 1e3}},
})
})
t.Run("limits number of series return to limit parameter", func(t *testing.T) {
responses := []queryrangebase.Response{
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: `{job="prometheus"}`,
Value: "",
Volume: 150,
},
{
Name: `{cluster="dev"}`,
Value: "",
Volume: 300,
},
},
From: from,
Through: through,
Limit: 1,
},
},
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: `{job="prometheus"}`,
Value: "",
Volume: 100,
},
{
Name: `{cluster="dev"}`,
Value: "",
Volume: 200,
},
},
From: from,
Through: through,
Limit: 1,
},
},
}
got, err := LokiCodec.MergeResponse(responses...)
require.NoError(t, err)
castedGot := got.(*LokiPromResponse).Response
require.Len(t, castedGot.Data.Result, 1)
require.Equal(t, queryrangebase.SampleStream{
Labels: []logproto.LabelAdapter{{Name: "cluster", Value: "dev"}},
Samples: []logproto.LegacySample{{Value: 500, TimestampMs: through.Unix() * 1e3}},
}, castedGot.Data.Result[0])
})
}
type badResponse struct{}
func (badResponse) Reset() {}
@ -1249,15 +1429,17 @@ var (
"entries": 4
}`
labelVolumeString = `{
"volumes": [
{
"name": "foo",
"value": "bar",
"volume": 38
}
],
"limit": 100
}`
"from": 0,
"limit": 100,
"through": 0,
"volumes": [
{
"name": "{foo=\"bar\"}",
"value": "",
"volume": 38
}
]
}`
labelsData = []string{"foo", "bar"}
statsResult = stats.Result{
Summary: stats.Summary{

@ -24,6 +24,7 @@ import (
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
@ -134,7 +135,9 @@ var (
{Name: `{foo="bar"}`, Value: "", Volume: 1024},
{Name: `{bar="baz"}`, Value: "", Volume: 3350},
},
Limit: 10,
From: model.TimeFromUnix(testTime.Add(-4 * time.Hour).Unix()),
Through: model.TimeFromUnix(testTime.Add(-1 * time.Hour).Unix()),
Limit: 5,
}
)
@ -580,19 +583,39 @@ func TestSeriesVolumeTripperware(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, *count) // 2 queries from splitting
volumeResp, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
volumeResp, err := LokiCodec.DecodeResponse(ctx, resp, nil)
require.NoError(t, err)
expected := logproto.VolumeResponse{
Volumes: []logproto.Volume{ // add volumes from across shards
{Name: `{bar="baz"}`, Value: "", Volume: 6700},
{Name: `{foo="bar"}`, Value: "", Volume: 2048},
expected := queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrangebase.SampleStream{
{
Labels: []logproto.LabelAdapter{{
Name: "bar",
Value: "baz",
}},
Samples: []logproto.LegacySample{{
Value: 6700,
TimestampMs: testTime.Add(-1*time.Hour).Unix() * 1e3,
}},
},
{
Labels: []logproto.LabelAdapter{{
Name: "foo",
Value: "bar",
}},
Samples: []logproto.LegacySample{{
Value: 2048,
TimestampMs: testTime.Add(-1*time.Hour).Unix() * 1e3,
}},
},
},
}
res, ok := volumeResp.(*VolumeResponse)
res, ok := volumeResp.(*LokiPromResponse)
require.Equal(t, true, ok)
require.Equal(t, expected.Volumes, res.Response.Volumes)
require.Equal(t, "success", res.Response.Status)
require.Equal(t, expected, res.Response.Data)
}
func TestNewTripperware_Caches(t *testing.T) {

@ -200,7 +200,10 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
sp.LogFields(otlog.Int("n_intervals", len(intervals)))
}
if len(intervals) == 1 {
// We always need to merge VolumeRequests as that's how they get turned into
// Prometheus style metric responses. For all other types we can skip merging
// if there is only one interval.
if _, ok := r.(*logproto.VolumeRequest); len(intervals) == 1 && !ok {
return h.next.Do(ctx, intervals[0])
}

@ -853,59 +853,95 @@ func Test_series_splitByInterval_Do(t *testing.T) {
func Test_seriesvolume_splitByInterval_Do(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
next := queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
return &VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{foo="bar"}`, Value: "", Volume: 38},
{Name: `{bar="baz"}`, Value: "", Volume: 28},
},
Limit: 1},
Headers: nil,
}, nil
})
setup := func(next queryrangebase.Handler) queryrangebase.Handler {
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
return SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
splitByTime,
nilMetrics,
).Wrap(next)
}
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
splitByTime,
nilMetrics,
).Wrap(next)
t.Run("label volumes", func(t *testing.T) {
from := model.TimeFromUnixNano(start.UnixNano())
through := model.TimeFromUnixNano(end.UnixNano())
tests := []struct {
name string
req *logproto.VolumeRequest
want *VolumeResponse
}{
{
"label volumes",
&logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: "{}",
Limit: 1,
},
&VolumeResponse{
next := queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
return &VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{foo="bar"}`, Value: "", Volume: 76},
{Name: `{foo="bar"}`, Value: "", Volume: 38},
{Name: `{bar="baz"}`, Value: "", Volume: 28},
},
Limit: 1,
},
From: from,
Through: through,
Limit: 2},
Headers: nil,
},
},
}
}, nil
})
split := setup(next)
req := &logproto.VolumeRequest{
From: from,
Through: through,
Matchers: "{}",
Limit: 2,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := split.Do(ctx, tt.req)
require.NoError(t, err)
require.Equal(t, tt.want, res)
res, err := split.Do(ctx, req)
require.NoError(t, err)
response := res.(*LokiPromResponse)
require.Len(t, response.Response.Data.Result, 2)
require.Contains(t, response.Response.Data.Result, queryrangebase.SampleStream{
Labels: []logproto.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []logproto.LegacySample{{TimestampMs: end.Unix() * 1e3, Value: 76}},
})
}
require.Contains(t, response.Response.Data.Result, queryrangebase.SampleStream{
Labels: []logproto.LabelAdapter{{Name: "bar", Value: "baz"}},
Samples: []logproto.LegacySample{{TimestampMs: end.Unix() * 1e3, Value: 56}},
})
})
t.Run("label volumes with limits", func(t *testing.T) {
from := model.TimeFromUnixNano(start.UnixNano())
through := model.TimeFromUnixNano(end.UnixNano())
next := queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
return &VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{foo="bar"}`, Value: "", Volume: 38},
{Name: `{bar="baz"}`, Value: "", Volume: 28},
{Name: `{foo="bar"}`, Value: "", Volume: 38},
{Name: `{fizz="buzz"}`, Value: "", Volume: 28},
},
From: from,
Through: through,
Limit: 1},
Headers: nil,
}, nil
})
split := setup(next)
req := &logproto.VolumeRequest{
From: from,
Through: through,
Matchers: "{}",
Limit: 1,
}
res, err := split.Do(ctx, req)
require.NoError(t, err)
response := res.(*LokiPromResponse)
require.Len(t, response.Response.Data.Result, 1)
require.Contains(t, response.Response.Data.Result, queryrangebase.SampleStream{
Labels: []logproto.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []logproto.LegacySample{{TimestampMs: end.Unix() * 1e3, Value: 152}},
})
})
}
func Test_ExitEarly(t *testing.T) {

@ -3,6 +3,9 @@ package seriesvolume
import (
"sort"
"sync"
"time"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logproto"
)
@ -35,15 +38,16 @@ func (acc *Accumulator) AddVolumes(volumes map[string]uint64) {
}
}
func (acc *Accumulator) Volumes() *logproto.VolumeResponse {
func (acc *Accumulator) Volumes(from, through time.Time) *logproto.VolumeResponse {
acc.lock.RLock()
defer acc.lock.RUnlock()
return MapToSeriesVolumeResponse(acc.volumes, int(acc.limit))
return MapToSeriesVolumeResponse(acc.volumes, int(acc.limit), from, through)
}
func Merge(responses []*logproto.VolumeResponse, limit int32) *logproto.VolumeResponse {
mergedVolumes := make(map[string]uint64)
var from, through time.Time
for _, res := range responses {
if res == nil {
@ -51,15 +55,26 @@ func Merge(responses []*logproto.VolumeResponse, limit int32) *logproto.VolumeRe
continue
}
resFrom, resThrough := res.From.Time(), res.Through.Time()
if resFrom.Before(from) || from.IsZero() {
from = resFrom
}
if resThrough.After(through) || through.IsZero() {
through = resThrough
}
for _, v := range res.Volumes {
mergedVolumes[v.Name] += v.GetVolume()
}
}
return MapToSeriesVolumeResponse(mergedVolumes, int(limit))
return MapToSeriesVolumeResponse(mergedVolumes, int(limit), from, through)
}
func MapToSeriesVolumeResponse(mergedVolumes map[string]uint64, limit int) *logproto.VolumeResponse {
func MapToSeriesVolumeResponse(mergedVolumes map[string]uint64, limit int, from, through time.Time) *logproto.VolumeResponse {
volumes := make([]logproto.Volume, 0, len(mergedVolumes))
for name, size := range mergedVolumes {
volumes = append(volumes, logproto.Volume{
@ -88,5 +103,7 @@ func MapToSeriesVolumeResponse(mergedVolumes map[string]uint64, limit int) *logp
return &logproto.VolumeResponse{
Volumes: volumes,
Limit: int32(limit),
From: model.TimeFromUnixNano(from.UnixNano()),
Through: model.TimeFromUnixNano(through.UnixNano()),
}
}

@ -0,0 +1,361 @@
package seriesvolume
import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
)
func Test_AddVolumes(t *testing.T) {
now := time.Now()
t1 := now.Add(-time.Hour)
t2 := now.Add(-time.Minute)
volumes := map[string]uint64{
`{job: "loki"}`: 5,
`{job: "prometheus"}`: 10,
`{cluster: "dev"}`: 25,
`{cluster: "prod"}`: 50,
}
t.Run("accumulates values for the same series", func(t *testing.T) {
acc := NewAccumulator(4)
acc.AddVolumes(volumes)
resp := acc.Volumes(t1, t2)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 50,
},
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 25,
},
{
Name: "{job: \"prometheus\"}",
Value: "",
Volume: 10,
},
{
Name: "{job: \"loki\"}",
Value: "",
Volume: 5,
},
},
Limit: 4,
From: model.TimeFromUnixNano(t1.UnixNano()),
Through: model.TimeFromUnixNano(t2.UnixNano()),
}, resp)
acc.AddVolumes(volumes)
resp = acc.Volumes(t1, t2)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 100,
},
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 50,
},
{
Name: "{job: \"prometheus\"}",
Value: "",
Volume: 20,
},
{
Name: "{job: \"loki\"}",
Value: "",
Volume: 10,
},
},
Limit: 4,
From: model.TimeFromUnixNano(t1.UnixNano()),
Through: model.TimeFromUnixNano(t2.UnixNano()),
}, resp)
})
t.Run("sorts label value pairs by volume", func(t *testing.T) {
acc := NewAccumulator(5)
acc.AddVolumes(volumes)
resp := acc.Volumes(t1, t2)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 50,
},
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 25,
},
{
Name: "{job: \"prometheus\"}",
Value: "",
Volume: 10,
},
{
Name: "{job: \"loki\"}",
Value: "",
Volume: 5,
},
},
Limit: 5,
From: model.TimeFromUnixNano(t1.UnixNano()),
Through: model.TimeFromUnixNano(t2.UnixNano()),
}, resp)
})
t.Run("applies limit", func(t *testing.T) {
acc := NewAccumulator(2)
volumes := map[string]uint64{
`{job: "loki"}`: 5,
`{job: "prometheus"}`: 10,
`{job: "mimir"}`: 1,
}
acc.AddVolumes(volumes)
volumes = map[string]uint64{
`{job: "loki"}`: 20,
`{job: "prometheus"}`: 30,
`{job: "mimir"}`: 1,
}
acc.AddVolumes(volumes)
resp := acc.Volumes(t1, t2)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: "{job: \"prometheus\"}",
Value: "",
Volume: 40,
},
{
Name: "{job: \"loki\"}",
Value: "",
Volume: 25,
},
},
Limit: 2,
From: model.TimeFromUnixNano(t1.UnixNano()),
Through: model.TimeFromUnixNano(t2.UnixNano()),
}, resp)
})
}
func Test_Merge(t *testing.T) {
t.Run("merges and sorts multiple volume responses into a single response with values aggregated", func(t *testing.T) {
limit := int32(5)
responses := []*logproto.VolumeResponse{
{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 25,
},
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 50,
},
},
Limit: limit,
},
{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 25,
},
{
Name: "{job: \"foo\"}",
Value: "",
Volume: 15,
},
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 50,
},
},
Limit: limit,
},
}
mergedResponse := Merge(responses, limit)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 100,
},
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 50,
},
{
Name: "{job: \"foo\"}",
Value: "",
Volume: 15,
},
},
Limit: limit,
From: 0,
Through: 0,
}, mergedResponse)
})
t.Run("applies limit to return N biggest series", func(t *testing.T) {
limit := int32(2)
responses := []*logproto.VolumeResponse{
{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 25,
},
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 50,
},
},
Limit: limit,
},
{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 25,
},
{
Name: "{job: \"foo\"}",
Value: "",
Volume: 15,
},
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 50,
},
},
Limit: limit,
},
}
mergedResponse := Merge(responses, limit)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 100,
},
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 50,
},
},
Limit: limit,
From: 0,
Through: 0,
}, mergedResponse)
})
t.Run("aggregates responses into earliest from and latest through timestamp of input", func(t *testing.T) {
limit := int32(5)
now := time.Now()
oneHourFromNow := now.Add(time.Hour)
oneHourAgo := now.Add(-time.Hour)
responses := []*logproto.VolumeResponse{
{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 25,
},
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 50,
},
},
Limit: limit,
From: 5,
Through: model.TimeFromUnixNano(oneHourAgo.UnixNano()),
},
{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 25,
},
{
Name: "{job: \"foo\"}",
Value: "",
Volume: 15,
},
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 50,
},
},
Limit: limit,
From: 10,
Through: model.TimeFromUnixNano(oneHourFromNow.UnixNano()),
},
}
mergedResponse := Merge(responses, limit)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{
Name: "{cluster: \"prod\"}",
Value: "",
Volume: 100,
},
{
Name: "{cluster: \"dev\"}",
Value: "",
Volume: 50,
},
{
Name: "{job: \"foo\"}",
Value: "",
Volume: 15,
},
},
Limit: limit,
From: 5,
Through: model.TimeFromUnixNano(oneHourFromNow.UnixNano()),
}, mergedResponse)
})
}

@ -49,7 +49,7 @@ type IndexStatsAccumulator interface {
type SeriesVolumeAccumulator interface {
AddVolumes(map[string]uint64)
Volumes() *logproto.VolumeResponse
Volumes(from, through time.Time) *logproto.VolumeResponse
}
func NewIndexClient(idx Index, opts IndexClientOptions) *IndexClient {
@ -278,7 +278,7 @@ func (c *IndexClient) SeriesVolume(ctx context.Context, userID string, from, thr
return nil, err
}
return acc.Volumes(), nil
return acc.Volumes(from.Time(), through.Time()), nil
}
// SetChunkFilterer sets a chunk filter to be used when retrieving chunks.

@ -271,9 +271,11 @@ func TestIndexClient_SeriesVolume(t *testing.T) {
})
indexClient := NewIndexClient(idx, IndexClientOptions{UseBloomFilters: true})
from := indexStartYesterday
through := indexStartToday + 1000
t.Run("it returns series volumes from the whole index", func(t *testing.T) {
vol, err := indexClient.SeriesVolume(context.Background(), "", indexStartYesterday, indexStartToday+1000, 10, nil...)
vol, err := indexClient.SeriesVolume(context.Background(), "", from, through, 10, nil...)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
@ -283,17 +285,23 @@ func TestIndexClient_SeriesVolume(t *testing.T) {
{Name: `{fizz="buzz"}`, Value: "", Volume: 100 * 1024},
{Name: `{ping="pong"}`, Value: "", Volume: 100 * 1024},
},
Limit: 10}, vol)
Limit: 10,
From: from,
Through: through,
}, vol)
})
t.Run("it returns largest series from the index", func(t *testing.T) {
vol, err := indexClient.SeriesVolume(context.Background(), "", indexStartYesterday, indexStartToday+1000, 1, nil...)
vol, err := indexClient.SeriesVolume(context.Background(), "", from, through, 1, nil...)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{foo="bar"}`, Value: "", Volume: 200 * 1024},
},
Limit: 1}, vol)
Limit: 1,
From: from,
Through: through,
}, vol)
})
}

@ -392,7 +392,6 @@ func (i *TSDBIndex) SeriesVolume(ctx context.Context, _ string, from, through mo
if err != nil {
return err
}
acc.AddVolumes(volumes)
return nil
}

@ -371,21 +371,25 @@ func TestTSDBIndex_Stats(t *testing.T) {
}
func TestTSDBIndex_SeriesVolume(t *testing.T) {
now := time.Now()
t1 := now.Add(-time.Hour)
t2 := now.Add(-time.Minute)
series := []LoadableSeries{
{
Labels: mustParseLabels(`{foo="bar", fizz="buzz", __loki_tenant__="fake"}`),
Chunks: []index.ChunkMeta{
{
MinTime: 0,
MaxTime: 10,
MinTime: t1.UnixMilli(),
MaxTime: t1.Add(30 * time.Minute).UnixMilli(),
Checksum: 1,
Entries: 10,
KB: 10,
},
{
MinTime: 10,
MaxTime: 20,
MinTime: t1.Add(30 * time.Minute).UnixMilli(),
MaxTime: t2.UnixMilli(),
Checksum: 2,
Entries: 20,
KB: 20,
@ -396,15 +400,15 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
Labels: mustParseLabels(`{foo="bar", fizz="fizz", __loki_tenant__="fake"}`),
Chunks: []index.ChunkMeta{
{
MinTime: 0,
MaxTime: 10,
MinTime: t1.UnixMilli(),
MaxTime: t1.Add(30 * time.Minute).UnixMilli(),
Checksum: 3,
Entries: 30,
KB: 30,
},
{
MinTime: 10,
MaxTime: 20,
MinTime: t1.Add(30 * time.Minute).UnixMilli(),
MaxTime: t2.UnixMilli(),
Checksum: 4,
Entries: 40,
KB: 40,
@ -417,17 +421,23 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
tempDir := t.TempDir()
tsdbIndex := BuildIndex(t, tempDir, series)
from := model.TimeFromUnixNano(t1.UnixNano())
through := model.TimeFromUnixNano(t2.UnixNano())
t.Run("it matches all the series when the match all matcher is passed", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
acc := seriesvolume.NewAccumulator(10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", 0, 20, acc, nil, nil, matcher)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matcher)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{fizz="fizz", foo="bar"}`, Value: "", Volume: (30 + 40) * 1024},
{Name: `{fizz="buzz", foo="bar"}`, Volume: (10 + 20) * 1024},
{Name: `{fizz="buzz", foo="bar"}`, Value: "", Volume: (10 + 20) * 1024},
},
Limit: 10}, acc.Volumes())
Limit: 10,
From: from,
Through: through,
}, acc.Volumes(t1, t2))
})
t.Run("it ignores the tenant label matcher", func(t *testing.T) {
@ -436,36 +446,64 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"),
}
acc := seriesvolume.NewAccumulator(10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", 0, 20, acc, nil, nil, withTenantLabelMatcher("fake", matcher)...)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, withTenantLabelMatcher("fake", matcher)...)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{fizz="fizz", foo="bar"}`, Value: "", Volume: (30 + 40) * 1024},
{Name: `{fizz="buzz", foo="bar"}`, Volume: (10 + 20) * 1024},
{Name: `{fizz="buzz", foo="bar"}`, Value: "", Volume: (10 + 20) * 1024},
},
Limit: 10}, acc.Volumes())
Limit: 10,
From: from,
Through: through,
}, acc.Volumes(t1, t2))
})
t.Run("it matches none of the series", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "baz")
acc := seriesvolume.NewAccumulator(10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", 0, 20, acc, nil, nil, matcher)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matcher)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{},
Limit: 10}, acc.Volumes())
Limit: 10,
From: from,
Through: through,
}, acc.Volumes(t1, t2))
})
t.Run("it only returns results for the labels in the matcher", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
acc := seriesvolume.NewAccumulator(10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", 0, 20, acc, nil, nil, matcher)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matcher)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{foo="bar"}`, Value: "", Volume: (10 + 20 + 30 + 40) * 1024},
},
Limit: 10}, acc.Volumes())
Limit: 10,
From: from,
Through: through,
}, acc.Volumes(t1, t2))
})
t.Run("it returns results for label names in matchers", func(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
labels.MustNewMatcher(labels.MatchRegexp, "fizz", ".+"),
}
acc := seriesvolume.NewAccumulator(10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matchers...)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{fizz="fizz", foo="bar"}`, Value: "", Volume: (30 + 40) * 1024},
{Name: `{fizz="buzz", foo="bar"}`, Value: "", Volume: (10 + 20) * 1024},
},
Limit: 10,
From: from,
Through: through,
}, acc.Volumes(t1, t2))
})
t.Run("it returns results for label names in matchers", func(t *testing.T) {
@ -474,14 +512,17 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
labels.MustNewMatcher(labels.MatchRegexp, "fizz", ".+"),
}
acc := seriesvolume.NewAccumulator(10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", 0, 20, acc, nil, nil, matchers...)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matchers...)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{
{Name: `{fizz="fizz", foo="bar"}`, Value: "", Volume: (30 + 40) * 1024},
{Name: `{fizz="buzz", foo="bar"}`, Volume: (10 + 20) * 1024},
},
Limit: 10}, acc.Volumes())
Limit: 10,
From: from,
Through: through,
}, acc.Volumes(t1, t2))
})
t.Run("it can filter chunks", func(t *testing.T) {
@ -490,12 +531,15 @@ func TestTSDBIndex_SeriesVolume(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "", "")
acc := seriesvolume.NewAccumulator(10)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", 0, 20, acc, nil, nil, matcher)
err := tsdbIndex.SeriesVolume(context.Background(), "fake", from, through, acc, nil, nil, matcher)
require.NoError(t, err)
require.Equal(t, &logproto.VolumeResponse{
Volumes: []logproto.Volume{},
Limit: 10}, acc.Volumes())
Limit: 10,
From: from,
Through: through,
}, acc.Volumes(t1, t2))
})
}

Loading…
Cancel
Save