Filter instant queries and shard them. (#3984)

* Filter instan queries and shard them.

* [WIP] Test instant query sharding

* Trace casting error.

* WRap LokiInstantRequest for Params interface.

* Convert paras to request.

* Convert vector.

* Use proper query time stamp.

* Assert number of calls.

* Convert vector to multiple samples.

* Format code.

* Use type switch.

* Format code.

* Rename Matrix to proto conversion.

* Remvoe comment on caching.

* Update pkg/querier/queryrange/roundtrip_test.go

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>

* Remove nil middlware.

* Handle LokiInstantRequest correctly.

* Assert reponse type.

* Return LokiPromResponse.

* Verify err in tests.

* Set shards.

* Correct result type.

* Use limits middleware.

* Use shars parameter in querier.

* Return vector.

* Unify params.

* Change direction to FORWARD.

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/4034/head
Karsten Jeschkies 5 years ago committed by GitHub
parent 5970ab9b1e
commit 17f4a73a24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/loghttp/query.go
  2. 2
      pkg/querier/http.go
  3. 209
      pkg/querier/queryrange/codec.go
  4. 17
      pkg/querier/queryrange/downstreamer.go
  5. 2
      pkg/querier/queryrange/downstreamer_test.go
  6. 633
      pkg/querier/queryrange/queryrange.pb.go
  7. 9
      pkg/querier/queryrange/queryrange.proto
  8. 41
      pkg/querier/queryrange/querysharding.go
  9. 65
      pkg/querier/queryrange/querysharding_test.go
  10. 92
      pkg/querier/queryrange/roundtrip.go
  11. 61
      pkg/querier/queryrange/roundtrip_test.go
  12. 5
      pkg/querier/queryrange/stats.go
  13. 6
      pkg/querier/queryrange/stats_test.go
  14. 4
      tools/dev/loki-boltdb-storage-s3/compose-up.sh

@ -244,6 +244,7 @@ type InstantQuery struct {
Ts time.Time
Limit uint32
Direction logproto.Direction
Shards []string
}
// ParseInstantQuery parses an InstantQuery request from an http request.
@ -261,6 +262,7 @@ func ParseInstantQuery(r *http.Request) (*InstantQuery, error) {
if err != nil {
return nil, err
}
request.Shards = shards(r)
request.Direction, err = direction(r)
if err != nil {

@ -96,7 +96,7 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
0,
request.Direction,
request.Limit,
nil,
request.Shards,
)
query := q.engine.Query(params)
result, err := query.Exec(ctx)

@ -75,6 +75,48 @@ func (r *LokiRequest) LogToSpan(sp opentracing.Span) {
func (*LokiRequest) GetCachingOptions() (res queryrange.CachingOptions) { return }
func (r *LokiInstantRequest) GetStep() int64 {
return 0
}
func (r *LokiInstantRequest) GetEnd() int64 {
return r.TimeTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}
func (r *LokiInstantRequest) GetStart() int64 {
return r.TimeTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}
func (r *LokiInstantRequest) WithStartEnd(s int64, e int64) queryrange.Request {
new := *r
new.TimeTs = time.Unix(0, s*int64(time.Millisecond))
return &new
}
func (r *LokiInstantRequest) WithQuery(query string) queryrange.Request {
new := *r
new.Query = query
return &new
}
func (r *LokiInstantRequest) WithShards(shards logql.Shards) *LokiInstantRequest {
new := *r
new.Shards = shards.Encode()
return &new
}
func (r *LokiInstantRequest) LogToSpan(sp opentracing.Span) {
sp.LogFields(
otlog.String("query", r.GetQuery()),
otlog.String("ts", timestamp.Time(r.GetStart()).String()),
otlog.Int64("limit", int64(r.GetLimit())),
otlog.String("direction", r.GetDirection().String()),
otlog.String("shards", strings.Join(r.GetShards(), ",")),
)
}
func (*LokiInstantRequest) GetCachingOptions() (res queryrange.CachingOptions) { return }
func (r *LokiSeriesRequest) GetEnd() int64 {
return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}
@ -173,6 +215,19 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Reque
Path: r.URL.Path,
Shards: req.Shards,
}, nil
case InstantQueryOp:
req, err := loghttp.ParseInstantQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return &LokiInstantRequest{
Query: req.Query,
Limit: req.Limit,
Direction: req.Direction,
TimeTs: req.Ts.UTC(),
Path: r.URL.Path,
Shards: req.Shards,
}, nil
case SeriesOp:
req, err := logql.ParseAndValidateSeriesQuery(r)
if err != nil {
@ -267,6 +322,29 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
Body: http.NoBody,
Header: http.Header{},
}
return req.WithContext(ctx), nil
case *LokiInstantRequest:
params := url.Values{
"query": []string{request.Query},
"direction": []string{request.Direction.String()},
"limit": []string{fmt.Sprintf("%d", request.Limit)},
}
if len(request.Shards) > 0 {
params["shards"] = request.Shards
}
u := &url.URL{
// the request could come /api/prom/query but we want to only use the new api.
Path: "/loki/api/v1/query",
RawQuery: params.Encode(),
}
req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: http.Header{},
}
return req.WithContext(ctx), nil
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
@ -343,18 +421,33 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
Status: resp.Status,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: toProto(resp.Data.Result.(loghttp.Matrix)),
Result: toProtoMatrix(resp.Data.Result.(loghttp.Matrix)),
},
Headers: convertPrometheusResponseHeadersToPointers(httpResponseHeadersToPromResponseHeaders(r.Header)),
},
Statistics: resp.Data.Statistics,
}, nil
case loghttp.ResultTypeStream:
// This is the same as in querysharding.go
params, err := paramsFromRequest(req)
if err != nil {
return nil, err
}
var path string
switch r := req.(type) {
case *LokiRequest:
path = r.GetPath()
case *LokiInstantRequest:
path = r.GetPath()
default:
return nil, fmt.Errorf("expected *LokiRequest or *LokiInstantRequest, got (%T)", r)
}
return &LokiResponse{
Status: resp.Status,
Direction: req.(*LokiRequest).Direction,
Limit: req.(*LokiRequest).Limit,
Version: uint32(loghttp.GetVersion(req.(*LokiRequest).Path)),
Direction: params.Direction(),
Limit: params.Limit(),
Version: uint32(loghttp.GetVersion(path)),
Statistics: resp.Data.Statistics,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
@ -362,8 +455,20 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
},
Headers: httpResponseHeadersToPromResponseHeaders(r.Header),
}, nil
case loghttp.ResultTypeVector:
return &LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: resp.Status,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: toProtoVector(resp.Data.Result.(loghttp.Vector)),
},
Headers: convertPrometheusResponseHeadersToPointers(httpResponseHeadersToPromResponseHeaders(r.Header)),
},
Statistics: resp.Data.Statistics,
}, nil
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "unsupported response type")
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "unsupported response type, got (%s)", string(resp.Data.ResultType))
}
}
}
@ -621,7 +726,7 @@ func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, dire
return results
}
func toProto(m loghttp.Matrix) []queryrange.SampleStream {
func toProtoMatrix(m loghttp.Matrix) []queryrange.SampleStream {
if len(m) == 0 {
return nil
}
@ -642,6 +747,23 @@ func toProto(m loghttp.Matrix) []queryrange.SampleStream {
return res
}
func toProtoVector(v loghttp.Vector) []queryrange.SampleStream {
if len(v) == 0 {
return nil
}
res := make([]queryrange.SampleStream, 0, len(v))
for _, s := range v {
res = append(res, queryrange.SampleStream{
Samples: []cortexpb.Sample{{
Value: float64(s.Value),
TimestampMs: int64(s.Timestamp),
}},
Labels: cortexpb.FromMetricsToLabelAdapters(s.Metric),
})
}
return res
}
func (res LokiResponse) Count() int64 {
var result int64
for _, s := range res.Data.Result {
@ -650,38 +772,75 @@ func (res LokiResponse) Count() int64 {
return result
}
type paramsWrapper struct {
func paramsFromRequest(req queryrange.Request) (logql.Params, error) {
switch r := req.(type) {
case *LokiRequest:
return &paramsRangeWrapper{
LokiRequest: r,
}, nil
case *LokiInstantRequest:
return &paramsInstantWrapper{
LokiInstantRequest: r,
}, nil
default:
return nil, fmt.Errorf("expected *LokiRequest or *LokiInstantRequest, got (%T)", r)
}
}
type paramsRangeWrapper struct {
*LokiRequest
}
func paramsFromRequest(req queryrange.Request) *paramsWrapper {
return &paramsWrapper{
LokiRequest: req.(*LokiRequest),
}
func (p paramsRangeWrapper) Query() string {
return p.GetQuery()
}
func (p paramsRangeWrapper) Start() time.Time {
return p.GetStartTs()
}
func (p paramsRangeWrapper) End() time.Time {
return p.GetEndTs()
}
func (p paramsRangeWrapper) Step() time.Duration {
return time.Duration(p.GetStep() * 1e6)
}
func (p paramsRangeWrapper) Interval() time.Duration { return 0 }
func (p paramsRangeWrapper) Direction() logproto.Direction {
return p.GetDirection()
}
func (p paramsRangeWrapper) Limit() uint32 { return p.LokiRequest.Limit }
func (p paramsRangeWrapper) Shards() []string {
return p.GetShards()
}
type paramsInstantWrapper struct {
*LokiInstantRequest
}
func (p paramsWrapper) Query() string {
return p.LokiRequest.Query
func (p paramsInstantWrapper) Query() string {
return p.GetQuery()
}
func (p paramsWrapper) Start() time.Time {
return p.StartTs
func (p paramsInstantWrapper) Start() time.Time {
return p.LokiInstantRequest.GetTimeTs()
}
func (p paramsWrapper) End() time.Time {
return p.EndTs
func (p paramsInstantWrapper) End() time.Time {
return p.LokiInstantRequest.GetTimeTs()
}
func (p paramsWrapper) Step() time.Duration {
return time.Duration(p.LokiRequest.Step * 1e6)
func (p paramsInstantWrapper) Step() time.Duration {
return time.Duration(p.GetStep() * 1e6)
}
func (p paramsWrapper) Interval() time.Duration { return 0 }
func (p paramsWrapper) Direction() logproto.Direction {
return p.LokiRequest.Direction
func (p paramsInstantWrapper) Interval() time.Duration { return 0 }
func (p paramsInstantWrapper) Direction() logproto.Direction {
return p.GetDirection()
}
func (p paramsWrapper) Limit() uint32 { return p.LokiRequest.Limit }
func (p paramsWrapper) Shards() []string {
return p.LokiRequest.Shards
func (p paramsInstantWrapper) Limit() uint32 { return p.LokiInstantRequest.Limit }
func (p paramsInstantWrapper) Shards() []string {
return p.GetShards()
}
func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryrange.PrometheusResponseHeader {

@ -24,7 +24,17 @@ type DownstreamHandler struct {
next queryrange.Handler
}
func ParamsToLokiRequest(params logql.Params) *LokiRequest {
func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrange.Request {
if params.Start() == params.End() {
return &LokiInstantRequest{
Query: params.Query(),
Limit: params.Limit(),
TimeTs: params.Start(),
Direction: params.Direction(),
Path: "/loki/api/v1/query", // TODO(owen-d): make this derivable
Shards: shards.Encode(),
}
}
return &LokiRequest{
Query: params.Query(),
Limit: params.Limit(),
@ -33,6 +43,7 @@ func ParamsToLokiRequest(params logql.Params) *LokiRequest {
EndTs: params.End(),
Direction: params.Direction(),
Path: "/loki/api/v1/query_range", // TODO(owen-d): make this derivable
Shards: shards.Encode(),
}
}
@ -58,10 +69,10 @@ type instance struct {
func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery) ([]logqlmodel.Result, error) {
return in.For(ctx, queries, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) {
req := ParamsToLokiRequest(qry.Params).WithShards(qry.Shards).WithQuery(qry.Expr.String()).(*LokiRequest)
req := ParamsToLokiRequest(qry.Params, qry.Shards).WithQuery(qry.Expr.String())
logger, ctx := spanlogger.New(ctx, "DownstreamHandler.instance")
defer logger.Finish()
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", req.Shards), "query", req.Query, "step", req.GetStep())
level.Debug(logger).Log("shards", fmt.Sprintf("%+v", qry.Shards), "query", req.GetQuery(), "step", req.GetStep())
res, err := in.handler.Do(ctx, req)
if err != nil {

@ -329,7 +329,7 @@ func TestInstanceDownstream(t *testing.T) {
// for some reason these seemingly can't be checked in their own goroutines,
// so we assign them to scoped variables for later comparison.
got = req
want = ParamsToLokiRequest(params).WithShards(logql.Shards{{Shard: 0, Of: 2}}).WithQuery(expr.String())
want = ParamsToLokiRequest(params, queries[0].Shards).WithQuery(expr.String())
return expectedResp(), nil
},

@ -132,6 +132,89 @@ func (m *LokiRequest) GetShards() []string {
return nil
}
type LokiInstantRequest struct {
Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
Limit uint32 `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"`
TimeTs time.Time `protobuf:"bytes,3,opt,name=timeTs,proto3,stdtime" json:"timeTs"`
Direction logproto.Direction `protobuf:"varint,4,opt,name=direction,proto3,enum=logproto.Direction" json:"direction,omitempty"`
Path string `protobuf:"bytes,5,opt,name=path,proto3" json:"path,omitempty"`
Shards []string `protobuf:"bytes,6,rep,name=shards,proto3" json:"shards"`
}
func (m *LokiInstantRequest) Reset() { *m = LokiInstantRequest{} }
func (*LokiInstantRequest) ProtoMessage() {}
func (*LokiInstantRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_51b9d53b40d11902, []int{1}
}
func (m *LokiInstantRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *LokiInstantRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_LokiInstantRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *LokiInstantRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_LokiInstantRequest.Merge(m, src)
}
func (m *LokiInstantRequest) XXX_Size() int {
return m.Size()
}
func (m *LokiInstantRequest) XXX_DiscardUnknown() {
xxx_messageInfo_LokiInstantRequest.DiscardUnknown(m)
}
var xxx_messageInfo_LokiInstantRequest proto.InternalMessageInfo
func (m *LokiInstantRequest) GetQuery() string {
if m != nil {
return m.Query
}
return ""
}
func (m *LokiInstantRequest) GetLimit() uint32 {
if m != nil {
return m.Limit
}
return 0
}
func (m *LokiInstantRequest) GetTimeTs() time.Time {
if m != nil {
return m.TimeTs
}
return time.Time{}
}
func (m *LokiInstantRequest) GetDirection() logproto.Direction {
if m != nil {
return m.Direction
}
return logproto.FORWARD
}
func (m *LokiInstantRequest) GetPath() string {
if m != nil {
return m.Path
}
return ""
}
func (m *LokiInstantRequest) GetShards() []string {
if m != nil {
return m.Shards
}
return nil
}
type LokiResponse struct {
Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"`
Data LokiData `protobuf:"bytes,2,opt,name=Data,proto3" json:"data,omitempty"`
@ -147,7 +230,7 @@ type LokiResponse struct {
func (m *LokiResponse) Reset() { *m = LokiResponse{} }
func (*LokiResponse) ProtoMessage() {}
func (*LokiResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_51b9d53b40d11902, []int{1}
return fileDescriptor_51b9d53b40d11902, []int{2}
}
func (m *LokiResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -243,7 +326,7 @@ type LokiSeriesRequest struct {
func (m *LokiSeriesRequest) Reset() { *m = LokiSeriesRequest{} }
func (*LokiSeriesRequest) ProtoMessage() {}
func (*LokiSeriesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_51b9d53b40d11902, []int{2}
return fileDescriptor_51b9d53b40d11902, []int{3}
}
func (m *LokiSeriesRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -317,7 +400,7 @@ type LokiSeriesResponse struct {
func (m *LokiSeriesResponse) Reset() { *m = LokiSeriesResponse{} }
func (*LokiSeriesResponse) ProtoMessage() {}
func (*LokiSeriesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_51b9d53b40d11902, []int{3}
return fileDescriptor_51b9d53b40d11902, []int{4}
}
func (m *LokiSeriesResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -376,7 +459,7 @@ type LokiLabelNamesRequest struct {
func (m *LokiLabelNamesRequest) Reset() { *m = LokiLabelNamesRequest{} }
func (*LokiLabelNamesRequest) ProtoMessage() {}
func (*LokiLabelNamesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_51b9d53b40d11902, []int{4}
return fileDescriptor_51b9d53b40d11902, []int{5}
}
func (m *LokiLabelNamesRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -436,7 +519,7 @@ type LokiLabelNamesResponse struct {
func (m *LokiLabelNamesResponse) Reset() { *m = LokiLabelNamesResponse{} }
func (*LokiLabelNamesResponse) ProtoMessage() {}
func (*LokiLabelNamesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_51b9d53b40d11902, []int{5}
return fileDescriptor_51b9d53b40d11902, []int{6}
}
func (m *LokiLabelNamesResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -494,7 +577,7 @@ type LokiData struct {
func (m *LokiData) Reset() { *m = LokiData{} }
func (*LokiData) ProtoMessage() {}
func (*LokiData) Descriptor() ([]byte, []int) {
return fileDescriptor_51b9d53b40d11902, []int{6}
return fileDescriptor_51b9d53b40d11902, []int{7}
}
func (m *LokiData) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -539,7 +622,7 @@ type LokiPromResponse struct {
func (m *LokiPromResponse) Reset() { *m = LokiPromResponse{} }
func (*LokiPromResponse) ProtoMessage() {}
func (*LokiPromResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_51b9d53b40d11902, []int{7}
return fileDescriptor_51b9d53b40d11902, []int{8}
}
func (m *LokiPromResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -584,6 +667,7 @@ func (m *LokiPromResponse) GetStatistics() stats.Result {
func init() {
proto.RegisterType((*LokiRequest)(nil), "queryrange.LokiRequest")
proto.RegisterType((*LokiInstantRequest)(nil), "queryrange.LokiInstantRequest")
proto.RegisterType((*LokiResponse)(nil), "queryrange.LokiResponse")
proto.RegisterType((*LokiSeriesRequest)(nil), "queryrange.LokiSeriesRequest")
proto.RegisterType((*LokiSeriesResponse)(nil), "queryrange.LokiSeriesResponse")
@ -598,62 +682,65 @@ func init() {
}
var fileDescriptor_51b9d53b40d11902 = []byte{
// 874 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x56, 0x4f, 0x6f, 0xdc, 0x44,
0x14, 0xdf, 0x59, 0xef, 0x9f, 0x78, 0x42, 0x03, 0x4c, 0x4a, 0x6b, 0x05, 0xc9, 0xb6, 0x56, 0x08,
0x16, 0x41, 0xbd, 0x22, 0x85, 0x0b, 0x12, 0xa8, 0xb5, 0xca, 0x3f, 0xa9, 0x02, 0x34, 0xcd, 0x81,
0xeb, 0x64, 0x3d, 0xf1, 0x9a, 0xd8, 0x3b, 0xce, 0xcc, 0x2c, 0x22, 0x37, 0xae, 0xdc, 0x7a, 0x03,
0x3e, 0x01, 0x88, 0x33, 0x7c, 0x87, 0x1c, 0x73, 0xac, 0x2a, 0x61, 0xc8, 0xe6, 0x82, 0xf6, 0xd4,
0x8f, 0x80, 0x66, 0xc6, 0xde, 0x9d, 0xa0, 0x16, 0xba, 0xcd, 0x05, 0x71, 0xd9, 0x7d, 0xef, 0xcd,
0x7b, 0x33, 0xef, 0xfd, 0xde, 0xef, 0x3d, 0x19, 0xbe, 0x56, 0x1e, 0xa6, 0xa3, 0xa3, 0x19, 0xe5,
0x19, 0xe5, 0xfa, 0xff, 0x98, 0x93, 0x69, 0x4a, 0x2d, 0x31, 0x2a, 0x39, 0x93, 0x0c, 0xc1, 0x95,
0x65, 0xe7, 0x46, 0x9a, 0xc9, 0xc9, 0x6c, 0x3f, 0x1a, 0xb3, 0x62, 0x94, 0xb2, 0x94, 0x8d, 0xb4,
0xcb, 0xfe, 0xec, 0x40, 0x6b, 0x5a, 0xd1, 0x92, 0x09, 0xdd, 0x79, 0x59, 0xbd, 0x91, 0xb3, 0xd4,
0x1c, 0x34, 0x42, 0x7d, 0x18, 0xd6, 0x87, 0x47, 0x79, 0xc1, 0x12, 0x9a, 0x8f, 0x84, 0x24, 0x52,
0x98, 0xdf, 0xda, 0xe3, 0x23, 0xeb, 0xb5, 0x31, 0xe3, 0x92, 0x7e, 0x5d, 0x72, 0xf6, 0x25, 0x1d,
0xcb, 0x5a, 0x1b, 0x3d, 0x65, 0x09, 0x3b, 0x41, 0xca, 0x58, 0x9a, 0xd3, 0x55, 0xb6, 0x32, 0x2b,
0xa8, 0x90, 0xa4, 0x28, 0x8d, 0xc3, 0xe0, 0x97, 0x36, 0xdc, 0xbc, 0xcb, 0x0e, 0x33, 0x4c, 0x8f,
0x66, 0x54, 0x48, 0x74, 0x15, 0x76, 0xf5, 0x25, 0x1e, 0x08, 0xc1, 0xd0, 0xc5, 0x46, 0x51, 0xd6,
0x3c, 0x2b, 0x32, 0xe9, 0xb5, 0x43, 0x30, 0xbc, 0x82, 0x8d, 0x82, 0x10, 0xec, 0x08, 0x49, 0x4b,
0xcf, 0x09, 0xc1, 0xd0, 0xc1, 0x5a, 0x46, 0xef, 0xc3, 0xbe, 0x90, 0x84, 0xcb, 0x3d, 0xe1, 0x75,
0x42, 0x30, 0xdc, 0xdc, 0xdd, 0x89, 0x4c, 0x0a, 0x51, 0x93, 0x42, 0xb4, 0xd7, 0xa4, 0x10, 0x6f,
0x9c, 0x54, 0x41, 0xeb, 0xfe, 0xef, 0x01, 0xc0, 0x4d, 0x10, 0x7a, 0x17, 0x76, 0xe9, 0x34, 0xd9,
0x13, 0x5e, 0x77, 0x8d, 0x68, 0x13, 0x82, 0xde, 0x82, 0x6e, 0x92, 0x71, 0x3a, 0x96, 0x19, 0x9b,
0x7a, 0xbd, 0x10, 0x0c, 0xb7, 0x76, 0xb7, 0xa3, 0x25, 0xf6, 0x77, 0x9a, 0x23, 0xbc, 0xf2, 0x52,
0x25, 0x94, 0x44, 0x4e, 0xbc, 0xbe, 0xae, 0x56, 0xcb, 0x68, 0x00, 0x7b, 0x62, 0x42, 0x78, 0x22,
0xbc, 0x8d, 0xd0, 0x19, 0xba, 0x31, 0x5c, 0x54, 0x41, 0x6d, 0xc1, 0xf5, 0xff, 0xe0, 0xbb, 0x0e,
0x7c, 0xce, 0xc0, 0x26, 0x4a, 0x36, 0x15, 0x54, 0x05, 0xdd, 0x93, 0x44, 0xce, 0x84, 0x01, 0xae,
0x0e, 0xd2, 0x16, 0x5c, 0x9f, 0xa0, 0x5b, 0xb0, 0x73, 0x87, 0x48, 0xa2, 0x41, 0xdc, 0xdc, 0xbd,
0x1a, 0x59, 0xdd, 0x52, 0x77, 0xa9, 0xb3, 0xf8, 0x9a, 0x2a, 0x6a, 0x51, 0x05, 0x5b, 0x09, 0x91,
0xe4, 0x4d, 0x56, 0x64, 0x92, 0x16, 0xa5, 0x3c, 0xc6, 0x3a, 0x12, 0xbd, 0x03, 0xdd, 0x0f, 0x38,
0x67, 0x7c, 0xef, 0xb8, 0xa4, 0x1a, 0x76, 0x37, 0xbe, 0xbe, 0xa8, 0x82, 0x6d, 0xda, 0x18, 0xad,
0x88, 0x95, 0x27, 0x7a, 0x1d, 0x76, 0xb5, 0xa2, 0x5b, 0xe2, 0xc6, 0xdb, 0x8b, 0x2a, 0x78, 0x5e,
0x87, 0x58, 0xee, 0xc6, 0xe3, 0x22, 0x86, 0xdd, 0xa7, 0xc2, 0x70, 0x49, 0x8e, 0x9e, 0x4d, 0x0e,
0x0f, 0xf6, 0xbf, 0xa2, 0x5c, 0xa8, 0x6b, 0xfa, 0xda, 0xde, 0xa8, 0xe8, 0x36, 0x84, 0x0a, 0x98,
0x4c, 0xc8, 0x6c, 0xac, 0x30, 0x56, 0x60, 0x5c, 0x89, 0x0c, 0xfd, 0x31, 0x15, 0xb3, 0x5c, 0xc6,
0xa8, 0x46, 0xc1, 0x72, 0xc4, 0x96, 0x8c, 0xbe, 0x07, 0xb0, 0xff, 0x31, 0x25, 0x09, 0xe5, 0xc2,
0x73, 0x43, 0x67, 0xb8, 0xb9, 0xfb, 0x8a, 0x8d, 0xe6, 0xe7, 0x9c, 0x15, 0x54, 0x4e, 0xe8, 0x4c,
0x34, 0xfd, 0x31, 0xce, 0xf1, 0x17, 0x0f, 0xab, 0xe0, 0xb3, 0x67, 0x9b, 0xad, 0x27, 0x5e, 0xba,
0xa8, 0x02, 0x70, 0x03, 0x37, 0xe9, 0x0c, 0x7e, 0x03, 0xf0, 0x45, 0xd5, 0xcd, 0x7b, 0xea, 0x02,
0x61, 0x8d, 0x55, 0x41, 0xe4, 0x78, 0xe2, 0x01, 0x45, 0x29, 0x6c, 0x14, 0x7b, 0x58, 0xda, 0x97,
0x1a, 0x16, 0x67, 0xfd, 0x61, 0x69, 0x98, 0xdf, 0x79, 0x2c, 0xf3, 0xbb, 0x4f, 0x64, 0xfe, 0xaf,
0x6d, 0x88, 0xec, 0xfa, 0xd6, 0xe0, 0xff, 0x87, 0x4b, 0xfe, 0x3b, 0x3a, 0xdb, 0x25, 0xad, 0xcc,
0x5d, 0x9f, 0x24, 0x74, 0x2a, 0xb3, 0x83, 0x8c, 0xf2, 0x7f, 0x99, 0x02, 0x8b, 0x5a, 0xce, 0x45,
0x6a, 0xd9, 0xbc, 0xe8, 0xfc, 0xb7, 0x78, 0xf1, 0x23, 0x80, 0x2f, 0x29, 0xdc, 0xee, 0x92, 0x7d,
0x9a, 0x7f, 0x4a, 0x8a, 0x15, 0x37, 0x2c, 0x16, 0x80, 0x4b, 0xb1, 0xa0, 0xfd, 0xec, 0x2c, 0x70,
0x56, 0x2c, 0x18, 0xfc, 0xd0, 0x86, 0xd7, 0xfe, 0x9e, 0xe9, 0x1a, 0x5d, 0x7e, 0xd5, 0xea, 0xb2,
0x1b, 0xa3, 0xff, 0x57, 0x17, 0x7f, 0x06, 0x70, 0xa3, 0xd9, 0xd5, 0x28, 0x82, 0xd0, 0xec, 0x2b,
0xbd, 0x8e, 0x0d, 0x22, 0x5b, 0x6a, 0x6b, 0xf1, 0xa5, 0x15, 0x5b, 0x1e, 0x68, 0x0a, 0x7b, 0x46,
0xab, 0x27, 0xe0, 0xba, 0x35, 0x01, 0x92, 0x53, 0x52, 0xdc, 0x4e, 0x48, 0x29, 0x29, 0x8f, 0xdf,
0x53, 0x6d, 0x7a, 0x58, 0x05, 0x6f, 0xd8, 0x1f, 0x1d, 0x9c, 0x1c, 0x90, 0x29, 0x19, 0xe5, 0xec,
0x30, 0x1b, 0xd9, 0x5f, 0x17, 0x75, 0xac, 0xea, 0x84, 0x79, 0x17, 0xd7, 0xaf, 0x0c, 0xbe, 0x05,
0xf0, 0x05, 0x95, 0xac, 0xaa, 0x6d, 0xd9, 0xc2, 0x5b, 0x70, 0x83, 0xd7, 0x72, 0x4d, 0x37, 0xff,
0x9f, 0xc1, 0x8d, 0x3b, 0x27, 0x55, 0x00, 0xf0, 0x32, 0x0a, 0xdd, 0xbc, 0xb0, 0xbf, 0xdb, 0x8f,
0xdb, 0xdf, 0x2a, 0xa4, 0x65, 0x6f, 0xec, 0xf8, 0xed, 0xd3, 0x33, 0xbf, 0xf5, 0xe0, 0xcc, 0x6f,
0x3d, 0x3a, 0xf3, 0xc1, 0x37, 0x73, 0x1f, 0xfc, 0x34, 0xf7, 0xc1, 0xc9, 0xdc, 0x07, 0xa7, 0x73,
0x1f, 0xfc, 0x31, 0xf7, 0xc1, 0x9f, 0x73, 0xbf, 0xf5, 0x68, 0xee, 0x83, 0xfb, 0xe7, 0x7e, 0xeb,
0xf4, 0xdc, 0x6f, 0x3d, 0x38, 0xf7, 0x5b, 0xfb, 0x3d, 0x5d, 0xe1, 0xcd, 0xbf, 0x02, 0x00, 0x00,
0xff, 0xff, 0xb2, 0xbc, 0xe5, 0x40, 0xb3, 0x09, 0x00, 0x00,
// 913 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x56, 0x4f, 0x6f, 0x23, 0x35,
0x14, 0x8f, 0xf3, 0xb7, 0x71, 0xd9, 0x02, 0xee, 0xb2, 0x3b, 0x2a, 0xd2, 0x4c, 0x14, 0x21, 0x08,
0x82, 0x9d, 0x88, 0x2e, 0x5c, 0x10, 0xa0, 0xdd, 0xd1, 0xf2, 0x67, 0xa5, 0x15, 0x20, 0x6f, 0x0e,
0x5c, 0xdd, 0x8c, 0x3b, 0x19, 0x3a, 0x33, 0x9e, 0xda, 0x0e, 0xa2, 0x37, 0xae, 0xdc, 0xf6, 0x06,
0x7c, 0x02, 0x10, 0x67, 0xf8, 0x0e, 0x3d, 0xf6, 0xb8, 0x5a, 0x89, 0x81, 0xa6, 0x17, 0xc8, 0x69,
0x3f, 0x02, 0xb2, 0x3d, 0x93, 0xb8, 0xa8, 0x85, 0xa6, 0xbd, 0xa0, 0xbd, 0x24, 0x7e, 0xcf, 0xef,
0xd9, 0xef, 0xf7, 0x7b, 0x3f, 0x3f, 0x0d, 0x7c, 0x2d, 0xdf, 0x8b, 0x86, 0xfb, 0x53, 0xca, 0x63,
0xca, 0xf5, 0xff, 0x01, 0x27, 0x59, 0x44, 0xad, 0xa5, 0x9f, 0x73, 0x26, 0x19, 0x82, 0x4b, 0xcf,
0xd6, 0xad, 0x28, 0x96, 0x93, 0xe9, 0x8e, 0x3f, 0x66, 0xe9, 0x30, 0x62, 0x11, 0x1b, 0xea, 0x90,
0x9d, 0xe9, 0xae, 0xb6, 0xb4, 0xa1, 0x57, 0x26, 0x75, 0xeb, 0x65, 0x75, 0x47, 0xc2, 0x22, 0xb3,
0x51, 0x2d, 0xca, 0xcd, 0x5e, 0xb9, 0xb9, 0x9f, 0xa4, 0x2c, 0xa4, 0xc9, 0x50, 0x48, 0x22, 0x85,
0xf9, 0x2d, 0x23, 0x3e, 0xb6, 0x6e, 0x1b, 0x33, 0x2e, 0xe9, 0xd7, 0x39, 0x67, 0x5f, 0xd2, 0xb1,
0x2c, 0xad, 0xe1, 0x05, 0x21, 0x6c, 0x79, 0x11, 0x63, 0x51, 0x42, 0x97, 0xd5, 0xca, 0x38, 0xa5,
0x42, 0x92, 0x34, 0x37, 0x01, 0xfd, 0x5f, 0xea, 0x70, 0xfd, 0x01, 0xdb, 0x8b, 0x31, 0xdd, 0x9f,
0x52, 0x21, 0xd1, 0x75, 0xd8, 0xd2, 0x87, 0x38, 0xa0, 0x07, 0x06, 0x5d, 0x6c, 0x0c, 0xe5, 0x4d,
0xe2, 0x34, 0x96, 0x4e, 0xbd, 0x07, 0x06, 0xd7, 0xb0, 0x31, 0x10, 0x82, 0x4d, 0x21, 0x69, 0xee,
0x34, 0x7a, 0x60, 0xd0, 0xc0, 0x7a, 0x8d, 0x3e, 0x80, 0x1d, 0x21, 0x09, 0x97, 0x23, 0xe1, 0x34,
0x7b, 0x60, 0xb0, 0xbe, 0xbd, 0xe5, 0x9b, 0x12, 0xfc, 0xaa, 0x04, 0x7f, 0x54, 0x95, 0x10, 0xac,
0x1d, 0x16, 0x5e, 0xed, 0xd1, 0xef, 0x1e, 0xc0, 0x55, 0x12, 0x7a, 0x17, 0xb6, 0x68, 0x16, 0x8e,
0x84, 0xd3, 0x5a, 0x21, 0xdb, 0xa4, 0xa0, 0xb7, 0x60, 0x37, 0x8c, 0x39, 0x1d, 0xcb, 0x98, 0x65,
0x4e, 0xbb, 0x07, 0x06, 0x1b, 0xdb, 0x9b, 0xfe, 0x82, 0xfb, 0x7b, 0xd5, 0x16, 0x5e, 0x46, 0x29,
0x08, 0x39, 0x91, 0x13, 0xa7, 0xa3, 0xd1, 0xea, 0x35, 0xea, 0xc3, 0xb6, 0x98, 0x10, 0x1e, 0x0a,
0x67, 0xad, 0xd7, 0x18, 0x74, 0x03, 0x38, 0x2f, 0xbc, 0xd2, 0x83, 0xcb, 0xff, 0xfe, 0x5f, 0x00,
0x22, 0x45, 0xdb, 0xfd, 0x4c, 0x48, 0x92, 0xc9, 0xcb, 0xb0, 0xf7, 0x1e, 0x6c, 0xab, 0x66, 0x8c,
0x84, 0xe6, 0xef, 0xa2, 0x50, 0xcb, 0x9c, 0xd3, 0x58, 0x9b, 0x2b, 0x61, 0x6d, 0x9d, 0x89, 0xb5,
0x7d, 0x2e, 0xd6, 0xef, 0x9a, 0xf0, 0x39, 0x23, 0x11, 0x91, 0xb3, 0x4c, 0x50, 0x95, 0xf4, 0x50,
0x12, 0x39, 0x15, 0x06, 0x66, 0x99, 0xa4, 0x3d, 0xb8, 0xdc, 0x41, 0x77, 0x60, 0xf3, 0x1e, 0x91,
0x44, 0x43, 0x5e, 0xdf, 0xbe, 0xee, 0x5b, 0xca, 0x54, 0x67, 0xa9, 0xbd, 0xe0, 0x86, 0x42, 0x35,
0x2f, 0xbc, 0x8d, 0x90, 0x48, 0xf2, 0x26, 0x4b, 0x63, 0x49, 0xd3, 0x5c, 0x1e, 0x60, 0x9d, 0x89,
0xde, 0x81, 0xdd, 0x0f, 0x39, 0x67, 0x7c, 0x74, 0x90, 0x53, 0x4d, 0x51, 0x37, 0xb8, 0x39, 0x2f,
0xbc, 0x4d, 0x5a, 0x39, 0xad, 0x8c, 0x65, 0x24, 0x7a, 0x1d, 0xb6, 0xb4, 0xa1, 0x49, 0xe9, 0x06,
0x9b, 0xf3, 0xc2, 0x7b, 0x5e, 0xa7, 0x58, 0xe1, 0x26, 0xe2, 0x34, 0x87, 0xad, 0x0b, 0x71, 0xb8,
0x68, 0x65, 0xdb, 0x6e, 0xa5, 0x03, 0x3b, 0x5f, 0x51, 0x2e, 0xd4, 0x31, 0x1d, 0xed, 0xaf, 0x4c,
0x74, 0x17, 0x42, 0x45, 0x4c, 0x2c, 0x64, 0x3c, 0x56, 0x7a, 0x52, 0x64, 0x5c, 0xf3, 0xcd, 0x53,
0xc7, 0x54, 0x4c, 0x13, 0x19, 0xa0, 0x92, 0x05, 0x2b, 0x10, 0x5b, 0x6b, 0xf4, 0x3d, 0x80, 0x9d,
0x4f, 0x28, 0x09, 0x29, 0x17, 0x4e, 0xb7, 0xd7, 0x18, 0xac, 0x6f, 0xbf, 0x62, 0xb3, 0xf9, 0x39,
0x67, 0x29, 0x95, 0x13, 0x3a, 0x15, 0x55, 0x7f, 0x4c, 0x70, 0xf0, 0xc5, 0x93, 0xc2, 0xfb, 0xec,
0x72, 0x73, 0xe4, 0xdc, 0x43, 0xe7, 0x85, 0x07, 0x6e, 0xe1, 0xaa, 0x9c, 0xfe, 0x6f, 0x00, 0xbe,
0xa8, 0xba, 0xf9, 0x50, 0x1d, 0x20, 0xac, 0x47, 0x90, 0x12, 0x39, 0x9e, 0x38, 0x40, 0x49, 0x0a,
0x1b, 0xc3, 0x1e, 0x0c, 0xf5, 0x2b, 0x0d, 0x86, 0xc6, 0xea, 0x83, 0xa1, 0x52, 0x7e, 0xf3, 0x4c,
0xe5, 0xb7, 0xce, 0x55, 0xfe, 0xaf, 0x75, 0xf3, 0xca, 0x2b, 0x7c, 0x2b, 0xe8, 0xff, 0xa3, 0x85,
0xfe, 0x1b, 0xba, 0xda, 0x85, 0xac, 0xcc, 0x59, 0xf7, 0x43, 0x9a, 0xc9, 0x78, 0x37, 0xa6, 0xfc,
0x3f, 0x5e, 0x81, 0x25, 0xad, 0xc6, 0x69, 0x69, 0xd9, 0xba, 0x68, 0xfe, 0xbf, 0x74, 0xf1, 0x23,
0x80, 0x2f, 0x29, 0xde, 0x1e, 0x90, 0x1d, 0x9a, 0x7c, 0x4a, 0xd2, 0xa5, 0x36, 0x2c, 0x15, 0x80,
0x2b, 0xa9, 0xa0, 0x7e, 0x79, 0x15, 0x34, 0x96, 0x2a, 0xe8, 0xff, 0x50, 0x87, 0x37, 0xfe, 0x59,
0xe9, 0x0a, 0x5d, 0x7e, 0xd5, 0xea, 0x72, 0x37, 0x40, 0xcf, 0x56, 0x17, 0x7f, 0x06, 0x70, 0xad,
0x9a, 0xd5, 0xc8, 0x87, 0xd0, 0xcc, 0x2b, 0x3d, 0x8e, 0x0d, 0x23, 0x1b, 0x6a, 0x6a, 0xf1, 0x85,
0x17, 0x5b, 0x11, 0x28, 0x83, 0x6d, 0x63, 0x95, 0x2f, 0xe0, 0xa6, 0xf5, 0x02, 0x24, 0xa7, 0x24,
0xbd, 0x1b, 0x92, 0x5c, 0x52, 0x1e, 0xbc, 0xaf, 0xda, 0xf4, 0xa4, 0xf0, 0xde, 0xb0, 0x3f, 0xb0,
0x38, 0xd9, 0x25, 0x19, 0x19, 0x26, 0x6c, 0x2f, 0x1e, 0xda, 0x5f, 0x52, 0x65, 0xae, 0xea, 0x84,
0xb9, 0x17, 0x97, 0xb7, 0xf4, 0xbf, 0x05, 0xf0, 0x05, 0x55, 0xac, 0xc2, 0xb6, 0x68, 0xe1, 0x1d,
0xb8, 0xc6, 0xcb, 0x75, 0x29, 0x37, 0xf7, 0xdf, 0xc9, 0x0d, 0x9a, 0x87, 0x85, 0x07, 0xf0, 0x22,
0x0b, 0xdd, 0x3e, 0x35, 0xbf, 0xeb, 0x67, 0xcd, 0x6f, 0x95, 0x52, 0xb3, 0x27, 0x76, 0xf0, 0xf6,
0xd1, 0xb1, 0x5b, 0x7b, 0x7c, 0xec, 0xd6, 0x9e, 0x1e, 0xbb, 0xe0, 0x9b, 0x99, 0x0b, 0x7e, 0x9a,
0xb9, 0xe0, 0x70, 0xe6, 0x82, 0xa3, 0x99, 0x0b, 0xfe, 0x98, 0xb9, 0xe0, 0xcf, 0x99, 0x5b, 0x7b,
0x3a, 0x73, 0xc1, 0xa3, 0x13, 0xb7, 0x76, 0x74, 0xe2, 0xd6, 0x1e, 0x9f, 0xb8, 0xb5, 0x9d, 0xb6,
0x46, 0x78, 0xfb, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x52, 0x92, 0xbf, 0x59, 0x9f, 0x0a, 0x00,
0x00,
}
func (this *LokiRequest) Equal(that interface{}) bool {
@ -706,6 +793,50 @@ func (this *LokiRequest) Equal(that interface{}) bool {
}
return true
}
func (this *LokiInstantRequest) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*LokiInstantRequest)
if !ok {
that2, ok := that.(LokiInstantRequest)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
if this.Query != that1.Query {
return false
}
if this.Limit != that1.Limit {
return false
}
if !this.TimeTs.Equal(that1.TimeTs) {
return false
}
if this.Direction != that1.Direction {
return false
}
if this.Path != that1.Path {
return false
}
if len(this.Shards) != len(that1.Shards) {
return false
}
for i := range this.Shards {
if this.Shards[i] != that1.Shards[i] {
return false
}
}
return true
}
func (this *LokiResponse) Equal(that interface{}) bool {
if that == nil {
return this == nil
@ -997,6 +1128,21 @@ func (this *LokiRequest) GoString() string {
s = append(s, "}")
return strings.Join(s, "")
}
func (this *LokiInstantRequest) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 10)
s = append(s, "&queryrange.LokiInstantRequest{")
s = append(s, "Query: "+fmt.Sprintf("%#v", this.Query)+",\n")
s = append(s, "Limit: "+fmt.Sprintf("%#v", this.Limit)+",\n")
s = append(s, "TimeTs: "+fmt.Sprintf("%#v", this.TimeTs)+",\n")
s = append(s, "Direction: "+fmt.Sprintf("%#v", this.Direction)+",\n")
s = append(s, "Path: "+fmt.Sprintf("%#v", this.Path)+",\n")
s = append(s, "Shards: "+fmt.Sprintf("%#v", this.Shards)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func (this *LokiResponse) GoString() string {
if this == nil {
return "nil"
@ -1181,6 +1327,69 @@ func (m *LokiRequest) MarshalTo(dAtA []byte) (int, error) {
return i, nil
}
func (m *LokiInstantRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LokiInstantRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Query) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintQueryrange(dAtA, i, uint64(len(m.Query)))
i += copy(dAtA[i:], m.Query)
}
if m.Limit != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintQueryrange(dAtA, i, uint64(m.Limit))
}
dAtA[i] = 0x1a
i++
i = encodeVarintQueryrange(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.TimeTs)))
n3, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.TimeTs, dAtA[i:])
if err != nil {
return 0, err
}
i += n3
if m.Direction != 0 {
dAtA[i] = 0x20
i++
i = encodeVarintQueryrange(dAtA, i, uint64(m.Direction))
}
if len(m.Path) > 0 {
dAtA[i] = 0x2a
i++
i = encodeVarintQueryrange(dAtA, i, uint64(len(m.Path)))
i += copy(dAtA[i:], m.Path)
}
if len(m.Shards) > 0 {
for _, s := range m.Shards {
dAtA[i] = 0x32
i++
l = len(s)
for l >= 1<<7 {
dAtA[i] = uint8(uint64(l)&0x7f | 0x80)
l >>= 7
i++
}
dAtA[i] = uint8(l)
i++
i += copy(dAtA[i:], s)
}
}
return i, nil
}
func (m *LokiResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -1205,11 +1414,11 @@ func (m *LokiResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintQueryrange(dAtA, i, uint64(m.Data.Size()))
n3, err := m.Data.MarshalTo(dAtA[i:])
n4, err := m.Data.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n3
i += n4
if len(m.ErrorType) > 0 {
dAtA[i] = 0x1a
i++
@ -1240,11 +1449,11 @@ func (m *LokiResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x42
i++
i = encodeVarintQueryrange(dAtA, i, uint64(m.Statistics.Size()))
n4, err := m.Statistics.MarshalTo(dAtA[i:])
n5, err := m.Statistics.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n4
i += n5
if len(m.Headers) > 0 {
for _, msg := range m.Headers {
dAtA[i] = 0x4a
@ -1293,19 +1502,19 @@ func (m *LokiSeriesRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintQueryrange(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.StartTs)))
n5, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartTs, dAtA[i:])
n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartTs, dAtA[i:])
if err != nil {
return 0, err
}
i += n5
i += n6
dAtA[i] = 0x1a
i++
i = encodeVarintQueryrange(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.EndTs)))
n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.EndTs, dAtA[i:])
n7, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.EndTs, dAtA[i:])
if err != nil {
return 0, err
}
i += n6
i += n7
if len(m.Path) > 0 {
dAtA[i] = 0x22
i++
@ -1401,19 +1610,19 @@ func (m *LokiLabelNamesRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0xa
i++
i = encodeVarintQueryrange(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.StartTs)))
n7, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartTs, dAtA[i:])
n8, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartTs, dAtA[i:])
if err != nil {
return 0, err
}
i += n7
i += n8
dAtA[i] = 0x12
i++
i = encodeVarintQueryrange(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.EndTs)))
n8, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.EndTs, dAtA[i:])
n9, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.EndTs, dAtA[i:])
if err != nil {
return 0, err
}
i += n8
i += n9
if len(m.Path) > 0 {
dAtA[i] = 0x1a
i++
@ -1534,20 +1743,20 @@ func (m *LokiPromResponse) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0xa
i++
i = encodeVarintQueryrange(dAtA, i, uint64(m.Response.Size()))
n9, err := m.Response.MarshalTo(dAtA[i:])
n10, err := m.Response.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n9
i += n10
}
dAtA[i] = 0x12
i++
i = encodeVarintQueryrange(dAtA, i, uint64(m.Statistics.Size()))
n10, err := m.Statistics.MarshalTo(dAtA[i:])
n11, err := m.Statistics.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n10
i += n11
return i, nil
}
@ -1596,6 +1805,37 @@ func (m *LokiRequest) Size() (n int) {
return n
}
func (m *LokiInstantRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Query)
if l > 0 {
n += 1 + l + sovQueryrange(uint64(l))
}
if m.Limit != 0 {
n += 1 + sovQueryrange(uint64(m.Limit))
}
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.TimeTs)
n += 1 + l + sovQueryrange(uint64(l))
if m.Direction != 0 {
n += 1 + sovQueryrange(uint64(m.Direction))
}
l = len(m.Path)
if l > 0 {
n += 1 + l + sovQueryrange(uint64(l))
}
if len(m.Shards) > 0 {
for _, s := range m.Shards {
l = len(s)
n += 1 + l + sovQueryrange(uint64(l))
}
}
return n
}
func (m *LokiResponse) Size() (n int) {
if m == nil {
return 0
@ -1802,6 +2042,21 @@ func (this *LokiRequest) String() string {
}, "")
return s
}
func (this *LokiInstantRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&LokiInstantRequest{`,
`Query:` + fmt.Sprintf("%v", this.Query) + `,`,
`Limit:` + fmt.Sprintf("%v", this.Limit) + `,`,
`TimeTs:` + strings.Replace(strings.Replace(this.TimeTs.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Direction:` + fmt.Sprintf("%v", this.Direction) + `,`,
`Path:` + fmt.Sprintf("%v", this.Path) + `,`,
`Shards:` + fmt.Sprintf("%v", this.Shards) + `,`,
`}`,
}, "")
return s
}
func (this *LokiResponse) String() string {
if this == nil {
return "nil"
@ -2174,6 +2429,226 @@ func (m *LokiRequest) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *LokiInstantRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowQueryrange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LokiInstantRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LokiInstantRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowQueryrange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthQueryrange
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthQueryrange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Query = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Limit", wireType)
}
m.Limit = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowQueryrange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Limit |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TimeTs", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowQueryrange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthQueryrange
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthQueryrange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.TimeTs, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Direction", wireType)
}
m.Direction = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowQueryrange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Direction |= logproto.Direction(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Path", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowQueryrange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthQueryrange
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthQueryrange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Path = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Shards", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowQueryrange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthQueryrange
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthQueryrange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Shards = append(m.Shards, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipQueryrange(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthQueryrange
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthQueryrange
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *LokiResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

@ -22,6 +22,15 @@ message LokiRequest {
repeated string shards = 8 [(gogoproto.jsontag) = "shards"];
}
message LokiInstantRequest {
string query = 1;
uint32 limit = 2;
google.protobuf.Timestamp timeTs = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
logproto.Direction direction = 4;
string path = 5;
repeated string shards = 6 [(gogoproto.jsontag) = "shards"];
}
message LokiResponse {
string Status = 1 [(gogoproto.jsontag) = "status"];
LokiData Data = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "data,omitempty"];

@ -23,7 +23,6 @@ import (
func NewQueryShardMiddleware(
logger log.Logger,
confs queryrange.ShardingConfigs,
minShardingLookback time.Duration,
middlewareMetrics *queryrange.InstrumentMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
limits logql.Limits,
@ -88,11 +87,6 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
shardedLog, ctx := spanlogger.New(ctx, "shardedEngine")
defer shardedLog.Finish()
req, ok := r.(*LokiRequest)
if !ok {
return nil, fmt.Errorf("expected *LokiRequest, got (%T)", r)
}
mapper, err := logql.NewShardMapper(int(conf.RowShards), ast.metrics)
if err != nil {
return nil, err
@ -111,7 +105,21 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
return ast.next.Do(ctx, r)
}
query := ast.ng.Query(paramsFromRequest(req), parsed)
params, err := paramsFromRequest(r)
if err != nil {
return nil, err
}
var path string
switch r := r.(type) {
case *LokiRequest:
path = r.GetPath()
case *LokiInstantRequest:
path = r.GetPath()
default:
return nil, fmt.Errorf("expected *LokiRequest or *LokiInstantRequest, got (%T)", r)
}
query := ast.ng.Query(params, parsed)
res, err := query.Exec(ctx)
if err != nil {
@ -130,7 +138,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
Status: loghttp.QueryStatusSuccess,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: toProto(value.(loghttp.Matrix)),
Result: toProtoMatrix(value.(loghttp.Matrix)),
},
},
Statistics: res.Statistics,
@ -138,17 +146,26 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
case logqlmodel.ValueTypeStreams:
return &LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: req.Direction,
Limit: req.Limit,
Version: uint32(loghttp.GetVersion(req.Path)),
Direction: params.Direction(),
Limit: params.Limit(),
Version: uint32(loghttp.GetVersion(path)),
Statistics: res.Statistics,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: value.(loghttp.Streams).ToProto(),
},
}, nil
case parser.ValueTypeVector:
return &LokiPromResponse{Response: &queryrange.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: toProtoVector(value.(loghttp.Vector)),
},
},
}, nil
default:
return nil, fmt.Errorf("unexpected downstream response type (%T)", res.Data)
return nil, fmt.Errorf("unexpected downstream response type (%T)", res.Data.Type())
}
}

@ -11,6 +11,7 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
@ -234,6 +235,70 @@ func mockHandler(resp queryrange.Response, err error) queryrange.Handler {
})
}
func Test_InstantSharding(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")
var lock sync.Mutex
called := 0
shards := []string{}
sharding := NewQueryShardMiddleware(log.NewNopLogger(), queryrange.ShardingConfigs{
chunk.PeriodConfig{
RowShards: 3,
},
}, queryrange.NewInstrumentMiddlewareMetrics(nil),
nilShardingMetrics,
fakeLimits{
maxSeries: math.MaxInt32,
maxQueryParallelism: 10,
})
response, err := sharding.Wrap(queryrange.HandlerFunc(func(c context.Context, r queryrange.Request) (queryrange.Response, error) {
lock.Lock()
defer lock.Unlock()
called++
shards = append(shards, r.(*LokiInstantRequest).Shards...)
return &LokiPromResponse{Response: &queryrange.PrometheusResponse{
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
},
},
}}, nil
})).Do(ctx, &LokiInstantRequest{
Query: `rate({app="foo"}[1m])`,
TimeTs: util.TimeFromMillis(10),
Path: "/v1/query",
})
require.NoError(t, err)
require.Equal(t, 3, called, "expected 3 calls but got {}", called)
require.Len(t, response.(*LokiPromResponse).Response.Data.Result, 3)
require.ElementsMatch(t, []string{"0_of_3", "1_of_3", "2_of_3"}, shards)
require.Equal(t, &LokiPromResponse{Response: &queryrange.PrometheusResponse{
Status: "success",
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
},
},
}}, response)
}
func Test_SeriesShardingHandler(t *testing.T) {
sharding := NewSeriesQueryShardMiddleware(log.NewNopLogger(), queryrange.ShardingConfigs{
chunk.PeriodConfig{

@ -76,30 +76,36 @@ func NewTripperware(
return nil, nil, err
}
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, log, limits, schema, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics)
if err != nil {
return nil, nil, err
}
return func(next http.RoundTripper) http.RoundTripper {
metricRT := metricsTripperware(next)
logFilterRT := logFilterTripperware(next)
seriesRT := seriesTripperware(next)
labelsRT := labelsTripperware(next)
return newRoundTripper(next, logFilterRT, metricRT, seriesRT, labelsRT, limits)
instantRT := instantMetricTripperware(next)
return newRoundTripper(next, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, limits)
}, cache, nil
}
type roundTripper struct {
next, log, metric, series, labels http.RoundTripper
next, log, metric, series, labels, instantMetric http.RoundTripper
limits Limits
}
// newRoundTripper creates a new queryrange roundtripper
func newRoundTripper(next, log, metric, series, labels http.RoundTripper, limits Limits) roundTripper {
func newRoundTripper(next, log, metric, series, labels, instantMetric http.RoundTripper, limits Limits) roundTripper {
return roundTripper{
log: log,
limits: limits,
metric: metric,
series: series,
labels: labels,
next: next,
log: log,
limits: limits,
metric: metric,
series: series,
labels: labels,
instantMetric: instantMetric,
next: next,
}
}
@ -150,6 +156,21 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return r.labels.RoundTrip(req)
case InstantQueryOp:
instantQuery, err := loghttp.ParseInstantQuery(req)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
expr, err := logql.ParseExpr(instantQuery.Query)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
switch expr.(type) {
case logql.SampleExpr:
return r.instantMetric.RoundTrip(req)
default:
return r.next.RoundTrip(req)
}
default:
return r.next.RoundTrip(req)
}
@ -190,9 +211,10 @@ func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error {
}
const (
QueryRangeOp = "query_range"
SeriesOp = "series"
LabelNamesOp = "labels"
InstantQueryOp = "instant_query"
QueryRangeOp = "query_range"
SeriesOp = "series"
LabelNamesOp = "labels"
)
func getOperation(path string) string {
@ -203,6 +225,8 @@ func getOperation(path string) string {
return SeriesOp
case strings.HasSuffix(path, "/labels") || strings.HasSuffix(path, "/label"):
return LabelNamesOp
case strings.HasSuffix(path, "/v1/query"):
return InstantQueryOp
default:
return ""
}
@ -234,7 +258,6 @@ func NewLogFilterTripperware(
NewQueryShardMiddleware(
log,
schema.Configs,
minShardingLookback,
instrumentMetrics, // instrumentation is included in the sharding middleware
shardingMetrics,
limits,
@ -396,7 +419,6 @@ func NewMetricTripperware(
NewQueryShardMiddleware(
log,
schema.Configs,
minShardingLookback,
instrumentMetrics, // instrumentation is included in the sharding middleware
shardingMetrics,
limits,
@ -426,3 +448,45 @@ func NewMetricTripperware(
return next
}, c, nil
}
// NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries
func NewInstantMetricTripperware(
cfg Config,
log log.Logger,
limits Limits,
schema chunk.SchemaConfig,
codec queryrange.Codec,
instrumentMetrics *queryrange.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
splitByMetrics *SplitByMetrics,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{StatsCollectorMiddleware(), queryrange.NewLimitsMiddleware(limits)}
if cfg.ShardedQueries {
queryRangeMiddleware = append(queryRangeMiddleware,
NewQueryShardMiddleware(
log,
schema.Configs,
instrumentMetrics, // instrumentation is included in the sharding middleware
shardingMetrics,
limits,
),
)
}
if cfg.MaxRetries > 0 {
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrange.InstrumentMiddleware("retry", instrumentMetrics),
queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics),
)
}
return func(next http.RoundTripper) http.RoundTripper {
if len(queryRangeMiddleware) > 0 {
return NewLimitedRoundTripper(next, codec, limits, queryRangeMiddleware...)
}
return next
}, nil
}

@ -67,6 +67,24 @@ var (
},
},
}
vector = promql.Vector{
{
Point: promql.Point{
T: toMs(testTime.Add(-4 * time.Hour)),
V: 0.013333333333333334,
},
Metric: []labels.Label{
{
Name: "filename",
Value: `/var/hostlog/apport.log`,
},
{
Name: "job",
Value: "varlogs",
},
},
},
}
streams = logqlmodel.Streams{
{
Entries: []logproto.Entry{
@ -200,6 +218,45 @@ func TestLogFilterTripperware(t *testing.T) {
require.Error(t, err)
}
func TestInstantQueryTripperware(t *testing.T) {
testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 1*time.Second, nil)
if stopper != nil {
defer stopper.Stop()
}
require.NoError(t, err)
rt, err := newfakeRoundTripper()
require.NoError(t, err)
defer rt.Close()
lreq := &LokiInstantRequest{
Query: `sum by (job) (bytes_rate({cluster="dev-us-central-0"}[15m]))`,
Limit: 1000,
Direction: logproto.FORWARD,
Path: "/loki/api/v1/query",
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)
count, h := promqlResult(vector)
rt.setHandler(h)
resp, err := tpw(rt).RoundTrip(req)
require.Equal(t, 1, *count)
require.NoError(t, err)
lokiResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
require.NoError(t, err)
require.IsType(t, &LokiPromResponse{}, lokiResponse)
}
func TestSeriesTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
@ -419,6 +476,10 @@ func TestPostQueries(t *testing.T) {
t.Error("unexpected labels roundtripper called")
return nil, nil
}),
queryrange.RoundTripFunc(func(*http.Request) (*http.Response, error) {
t.Error("unexpected instant roundtripper called")
return nil, nil
}),
fakeLimits{},
).RoundTrip(req)
require.NoError(t, err)

@ -109,8 +109,11 @@ func StatsCollectorMiddleware() queryrange.Middleware {
if data, ok := ctxValue.(*queryData); ok {
data.recorded = true
data.statistics = statistics
data.params = paramsFromRequest(req)
data.result = res
data.params, err = paramsFromRequest(req)
if err != nil {
return nil, err
}
}
return resp, err
})

@ -86,7 +86,7 @@ func Test_StatsHTTP(t *testing.T) {
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.params = paramsFromRequest(&LokiRequest{
data.params, _ = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,
@ -106,7 +106,7 @@ func Test_StatsHTTP(t *testing.T) {
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.params = paramsFromRequest(&LokiRequest{
data.params, _ = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,
@ -127,7 +127,7 @@ func Test_StatsHTTP(t *testing.T) {
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.params = paramsFromRequest(&LokiRequest{
data.params, _ = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,

@ -18,7 +18,7 @@ CGO_ENABLED=0 GOOS=linux go build -mod=vendor -gcflags "all=-N -l" -o "${SCRIPT_
# ## install loki driver to send logs
docker plugin install grafana/loki-docker-driver:latest --alias loki-compose --grant-all-permissions || true
# build the compose image
docker compose -f "${SCRIPT_DIR}"/docker-compose.yml build distributor
docker-compose -f "${SCRIPT_DIR}"/docker-compose.yml build distributor
# cleanup sources
rm -Rf "${SRC_DEST}"
docker compose -f "${SCRIPT_DIR}"/docker-compose.yml up "$@"
docker-compose -f "${SCRIPT_DIR}"/docker-compose.yml up "$@"

Loading…
Cancel
Save