diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 51f18bc8be..9f0def497d 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -58,59 +58,125 @@ func (r *LokiRequest) WithShards(shards logql.Shards) *LokiRequest { return &new } +func (r *LokiSeriesRequest) GetEnd() int64 { + return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +} + +func (r *LokiSeriesRequest) GetStart() int64 { + return r.StartTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)) +} + +func (r *LokiSeriesRequest) WithStartEnd(s int64, e int64) queryrange.Request { + new := *r + new.StartTs = time.Unix(0, s*int64(time.Millisecond)) + new.EndTs = time.Unix(0, e*int64(time.Millisecond)) + return &new +} + +func (r *LokiSeriesRequest) WithQuery(query string) queryrange.Request { + new := *r + return &new +} + +func (r *LokiSeriesRequest) GetQuery() string { + return "" +} + +func (r *LokiSeriesRequest) GetStep() int64 { + return 0 +} + func (codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) { if err := r.ParseForm(); err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - req, err := loghttp.ParseRangeQuery(r) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + + switch op := getOperation(r); op { + case "query_range": + req, err := loghttp.ParseRangeQuery(r) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + return &LokiRequest{ + Query: req.Query, + Limit: req.Limit, + Direction: req.Direction, + StartTs: req.Start.UTC(), + EndTs: req.End.UTC(), + // GetStep must return milliseconds + Step: int64(req.Step) / 1e6, + Path: r.URL.Path, + Shards: req.Shards, + }, nil + case "series": + req, err := loghttp.ParseSeriesQuery(r) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + return &LokiSeriesRequest{ + Match: req.Groups, + StartTs: req.Start.UTC(), + EndTs: req.End.UTC(), + Path: r.URL.Path, + }, nil + default: + return nil, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf("unknown request path: %s", r.URL.Path)) } - return &LokiRequest{ - Query: req.Query, - Limit: req.Limit, - Direction: req.Direction, - StartTs: req.Start.UTC(), - EndTs: req.End.UTC(), - // GetStep must return milliseconds - Step: int64(req.Step) / 1e6, - Path: r.URL.Path, - Shards: req.Shards, - }, nil + } func (codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) { - lokiReq, ok := r.(*LokiRequest) - if !ok { + switch request := r.(type) { + case *LokiRequest: + params := url.Values{ + "start": []string{fmt.Sprintf("%d", request.StartTs.UnixNano())}, + "end": []string{fmt.Sprintf("%d", request.EndTs.UnixNano())}, + "query": []string{request.Query}, + "direction": []string{request.Direction.String()}, + "limit": []string{fmt.Sprintf("%d", request.Limit)}, + } + if len(request.Shards) > 0 { + params["shards"] = request.Shards + } + if request.Step != 0 { + params["step"] = []string{fmt.Sprintf("%f", float64(request.Step)/float64(1e3))} + } + u := &url.URL{ + // the request could come /api/prom/query but we want to only use the new api. + Path: "/loki/api/v1/query_range", + RawQuery: params.Encode(), + } + req := &http.Request{ + Method: "GET", + RequestURI: u.String(), // This is what the httpgrpc code looks at. + URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + + return req.WithContext(ctx), nil + case *LokiSeriesRequest: + params := url.Values{ + "start": []string{fmt.Sprintf("%d", request.StartTs.UnixNano())}, + "end": []string{fmt.Sprintf("%d", request.EndTs.UnixNano())}, + "match[]": request.Match, + } + + u := &url.URL{ + Path: "/loki/api/v1/series", + RawQuery: params.Encode(), + } + req := &http.Request{ + Method: "GET", + RequestURI: u.String(), // This is what the httpgrpc code looks at. + URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + return req.WithContext(ctx), nil + default: return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format") } - params := url.Values{ - "start": []string{fmt.Sprintf("%d", lokiReq.StartTs.UnixNano())}, - "end": []string{fmt.Sprintf("%d", lokiReq.EndTs.UnixNano())}, - "query": []string{lokiReq.Query}, - "direction": []string{lokiReq.Direction.String()}, - "limit": []string{fmt.Sprintf("%d", lokiReq.Limit)}, - } - if len(lokiReq.Shards) > 0 { - params["shards"] = lokiReq.Shards - } - if lokiReq.Step != 0 { - params["step"] = []string{fmt.Sprintf("%f", float64(lokiReq.Step)/float64(1e3))} - } - u := &url.URL{ - // the request could come /api/prom/query but we want to only use the new api. - Path: "/loki/api/v1/query_range", - RawQuery: params.Encode(), - } - req := &http.Request{ - Method: "GET", - RequestURI: u.String(), // This is what the httpgrpc code looks at. - URL: u, - Body: http.NoBody, - Header: http.Header{}, - } - - return req.WithContext(ctx), nil } func (codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) { @@ -130,85 +196,129 @@ func (codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang sp.LogFields(otlog.Int("bytes", len(buf))) - var resp loghttp.QueryResponse - if err := json.Unmarshal(buf, &resp); err != nil { - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) - } - switch string(resp.Data.ResultType) { - case loghttp.ResultTypeMatrix: - return &LokiPromResponse{ - Response: &queryrange.PrometheusResponse{ - Status: resp.Status, - Data: queryrange.PrometheusData{ - ResultType: loghttp.ResultTypeMatrix, - Result: toProto(resp.Data.Result.(loghttp.Matrix)), - }, - }, - Statistics: resp.Data.Statistics, - }, nil - case loghttp.ResultTypeStream: - return &LokiResponse{ - Status: resp.Status, - Direction: req.(*LokiRequest).Direction, - Limit: req.(*LokiRequest).Limit, - Version: uint32(loghttp.GetVersion(req.(*LokiRequest).Path)), - Statistics: resp.Data.Statistics, - Data: LokiData{ - ResultType: loghttp.ResultTypeStream, - Result: resp.Data.Result.(loghttp.Streams).ToProto(), - }, + switch req.(type) { + case *LokiSeriesRequest: + var resp loghttp.SeriesResponse + if err := json.Unmarshal(buf, &resp); err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + + data := make([]logproto.SeriesIdentifier, 0, len(resp.Data)) + for _, label := range resp.Data { + d := logproto.SeriesIdentifier{ + Labels: label.Map(), + } + data = append(data, d) + } + + return &LokiSeriesResponse{ + Status: resp.Status, + Version: uint32(loghttp.GetVersion(req.(*LokiSeriesRequest).Path)), + Data: data, }, nil default: - return nil, httpgrpc.Errorf(http.StatusBadRequest, "unsupported response type") + var resp loghttp.QueryResponse + if err := json.Unmarshal(buf, &resp); err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + switch string(resp.Data.ResultType) { + case loghttp.ResultTypeMatrix: + return &LokiPromResponse{ + Response: &queryrange.PrometheusResponse{ + Status: resp.Status, + Data: queryrange.PrometheusData{ + ResultType: loghttp.ResultTypeMatrix, + Result: toProto(resp.Data.Result.(loghttp.Matrix)), + }, + }, + Statistics: resp.Data.Statistics, + }, nil + case loghttp.ResultTypeStream: + return &LokiResponse{ + Status: resp.Status, + Direction: req.(*LokiRequest).Direction, + Limit: req.(*LokiRequest).Limit, + Version: uint32(loghttp.GetVersion(req.(*LokiRequest).Path)), + Statistics: resp.Data.Statistics, + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: resp.Data.Result.(loghttp.Streams).ToProto(), + }, + }, nil + default: + return nil, httpgrpc.Errorf(http.StatusBadRequest, "unsupported response type") + } } + } func (codec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "codec.EncodeResponse") defer sp.Finish() - if promRes, ok := res.(*LokiPromResponse); ok { - return promRes.encode(ctx) - } + switch response := res.(type) { + case *LokiPromResponse: + return response.encode(ctx) + case *LokiResponse: + streams := make([]logproto.Stream, len(response.Data.Result)) - proto, ok := res.(*LokiResponse) - if !ok { - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format") - } + for i, stream := range response.Data.Result { + streams[i] = logproto.Stream{ + Labels: stream.Labels, + Entries: stream.Entries, + } + } + result := logql.Result{ + Data: logql.Streams(streams), + Statistics: response.Statistics, + } + var buf bytes.Buffer + if loghttp.Version(response.Version) == loghttp.VersionLegacy { + if err := marshal_legacy.WriteQueryResponseJSON(result, &buf); err != nil { + return nil, err + } + } else { + if err := marshal.WriteQueryResponseJSON(result, &buf); err != nil { + return nil, err + } + } - streams := make([]logproto.Stream, len(proto.Data.Result)) + sp.LogFields(otlog.Int("bytes", buf.Len())) - for i, stream := range proto.Data.Result { - streams[i] = logproto.Stream{ - Labels: stream.Labels, - Entries: stream.Entries, + resp := http.Response{ + Header: http.Header{ + "Content-Type": []string{"application/json"}, + }, + Body: ioutil.NopCloser(&buf), + StatusCode: http.StatusOK, } - } - result := logql.Result{ - Data: logql.Streams(streams), - Statistics: proto.Statistics, - } - var buf bytes.Buffer - if loghttp.Version(proto.Version) == loghttp.VersionLegacy { - if err := marshal_legacy.WriteQueryResponseJSON(result, &buf); err != nil { - return nil, err + return &resp, nil + case *LokiSeriesResponse: + var data []loghttp.LabelSet + for _, series := range response.Data { + data = append(data, series.GetLabels()) } - } else { - if err := marshal.WriteQueryResponseJSON(result, &buf); err != nil { + result := logproto.SeriesResponse{ + Series: response.Data, + } + var buf bytes.Buffer + if err := marshal.WriteSeriesResponseJSON(result, &buf); err != nil { return nil, err } - } - sp.LogFields(otlog.Int("bytes", buf.Len())) + sp.LogFields(otlog.Int("bytes", buf.Len())) - resp := http.Response{ - Header: http.Header{ - "Content-Type": []string{"application/json"}, - }, - Body: ioutil.NopCloser(&buf), - StatusCode: http.StatusOK, + resp := http.Response{ + Header: http.Header{ + "Content-Type": []string{"application/json"}, + }, + Body: ioutil.NopCloser(&buf), + StatusCode: http.StatusOK, + } + return &resp, nil + default: + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format") } - return &resp, nil } func (codec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) { @@ -216,7 +326,9 @@ func (codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons return nil, errors.New("merging responses requires at least one response") } var mergedStats stats.Result - if _, ok := responses[0].(*LokiPromResponse); ok { + switch responses[0].(type) { + case *LokiPromResponse: + promResponses := make([]queryrange.Response, 0, len(responses)) for _, res := range responses { mergedStats.Merge(res.(*LokiPromResponse).Statistics) @@ -230,32 +342,55 @@ func (codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons Response: promRes.(*queryrange.PrometheusResponse), Statistics: mergedStats, }, nil - } - lokiRes, ok := responses[0].(*LokiResponse) - if !ok { - return nil, errors.New("unexpected response type while merging") - } + case *LokiResponse: + lokiRes := responses[0].(*LokiResponse) - lokiResponses := make([]*LokiResponse, 0, len(responses)) - for _, res := range responses { - lokiResult := res.(*LokiResponse) - mergedStats.Merge(lokiResult.Statistics) - lokiResponses = append(lokiResponses, lokiResult) - } + lokiResponses := make([]*LokiResponse, 0, len(responses)) + for _, res := range responses { + lokiResult := res.(*LokiResponse) + mergedStats.Merge(lokiResult.Statistics) + lokiResponses = append(lokiResponses, lokiResult) + } - return &LokiResponse{ - Status: loghttp.QueryStatusSuccess, - Direction: lokiRes.Direction, - Limit: lokiRes.Limit, - Version: lokiRes.Version, - ErrorType: lokiRes.ErrorType, - Error: lokiRes.Error, - Statistics: mergedStats, - Data: LokiData{ - ResultType: loghttp.ResultTypeStream, - Result: mergeOrderedNonOverlappingStreams(lokiResponses, lokiRes.Limit, lokiRes.Direction), - }, - }, nil + return &LokiResponse{ + Status: loghttp.QueryStatusSuccess, + Direction: lokiRes.Direction, + Limit: lokiRes.Limit, + Version: lokiRes.Version, + ErrorType: lokiRes.ErrorType, + Error: lokiRes.Error, + Statistics: mergedStats, + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: mergeOrderedNonOverlappingStreams(lokiResponses, lokiRes.Limit, lokiRes.Direction), + }, + }, nil + case *LokiSeriesResponse: + lokiSeriesRes := responses[0].(*LokiSeriesResponse) + + var lokiSeriesData []logproto.SeriesIdentifier + uniqueSeries := make(map[string]struct{}) + + // only unique series should be merged + for _, res := range responses { + lokiResult := res.(*LokiSeriesResponse) + for _, series := range lokiResult.Data { + if _, ok := uniqueSeries[series.String()]; !ok { + lokiSeriesData = append(lokiSeriesData, series) + uniqueSeries[series.String()] = struct{}{} + } + + } + } + + return &LokiSeriesResponse{ + Status: lokiSeriesRes.Status, + Version: lokiSeriesRes.Version, + Data: lokiSeriesData, + }, nil + default: + return nil, errors.New("unknown response in merging responses") + } } // mergeOrderedNonOverlappingStreams merges a set of ordered, nonoverlapping responses by concatenating matching streams then running them through a heap to pull out limit values diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index e011d757b8..15ff10030f 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -48,6 +48,15 @@ func Test_codec_DecodeRequest(t *testing.T) { StartTs: start, EndTs: end, }, false}, + {"series", func() (*http.Request, error) { + return http.NewRequest(http.MethodGet, + fmt.Sprintf(`/series?start=%d&end=%d&match={foo="bar"}`, start.UnixNano(), end.UnixNano()), nil) + }, &LokiSeriesRequest{ + Match: []string{`{foo="bar"}`}, + Path: "/series", + StartTs: start, + EndTs: end, + }, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -115,6 +124,13 @@ func Test_codec_DecodeResponse(t *testing.T) { }, Statistics: statsResult, }, false}, + {"series", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(seriesString))}, + &LokiSeriesRequest{Path: "/loki/api/v1/series"}, + &LokiSeriesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: seriesData, + }, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -168,6 +184,35 @@ func Test_codec_EncodeRequest(t *testing.T) { require.Equal(t, "/loki/api/v1/query_range", req.(*LokiRequest).Path) } +func Test_codec_series_EncodeRequest(t *testing.T) { + got, err := lokiCodec.EncodeRequest(context.TODO(), &queryrange.PrometheusRequest{}) + require.Error(t, err) + require.Nil(t, got) + + ctx := context.Background() + toEncode := &LokiSeriesRequest{ + Match: []string{`{foo="bar"}`}, + Path: "/series", + StartTs: start, + EndTs: end, + } + got, err = lokiCodec.EncodeRequest(ctx, toEncode) + require.NoError(t, err) + require.Equal(t, ctx, got.Context()) + require.Equal(t, "/loki/api/v1/series", got.URL.Path) + require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start")) + require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end")) + require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("match[]")) + + // testing a full roundtrip + req, err := lokiCodec.DecodeRequest(context.TODO(), got) + require.NoError(t, err) + require.Equal(t, toEncode.Match, req.(*LokiSeriesRequest).Match) + require.Equal(t, toEncode.StartTs, req.(*LokiSeriesRequest).StartTs) + require.Equal(t, toEncode.EndTs, req.(*LokiSeriesRequest).EndTs) + require.Equal(t, "/loki/api/v1/series", req.(*LokiSeriesRequest).Path) +} + func Test_codec_EncodeResponse(t *testing.T) { tests := []struct { @@ -211,6 +256,12 @@ func Test_codec_EncodeResponse(t *testing.T) { }, Statistics: statsResult, }, streamsStringLegacy, false}, + {"loki series", + &LokiSeriesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: seriesData, + }, seriesString, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -604,6 +655,51 @@ func Test_codec_MergeResponse(t *testing.T) { }, false, }, + { + "loki series", + []queryrange.Response{ + &LokiSeriesResponse{ + Status: "success", + Version: 1, + Data: []logproto.SeriesIdentifier{ + { + Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"}, + }, + }, + }, + &LokiSeriesResponse{ + Status: "success", + Version: 1, + Data: []logproto.SeriesIdentifier{ + { + Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/other.log", "job": "varlogs"}, + }, + }, + }, + }, + &LokiSeriesResponse{ + Status: "success", + Version: 1, + Data: []logproto.SeriesIdentifier{ + { + Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/other.log", "job": "varlogs"}, + }, + }, + }, + false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -756,6 +852,21 @@ var ( }, }, } + seriesString = `{ + "status": "success", + "data": [ + {"filename": "/var/hostlog/apport.log", "job": "varlogs"}, + {"filename": "/var/hostlog/test.log", "job": "varlogs"} + ] + }` + seriesData = []logproto.SeriesIdentifier{ + { + Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"}, + }, + } statsResult = stats.Result{ Summary: stats.Summary{ BytesProcessedPerSecond: 20, diff --git a/pkg/querier/queryrange/queryrange.pb.go b/pkg/querier/queryrange/queryrange.pb.go index 79f36216a6..94a52862eb 100644 --- a/pkg/querier/queryrange/queryrange.pb.go +++ b/pkg/querier/queryrange/queryrange.pb.go @@ -230,6 +230,132 @@ func (m *LokiResponse) GetStatistics() stats.Result { return stats.Result{} } +type LokiSeriesRequest struct { + Match []string `protobuf:"bytes,1,rep,name=match,proto3" json:"match,omitempty"` + StartTs time.Time `protobuf:"bytes,2,opt,name=startTs,proto3,stdtime" json:"startTs"` + EndTs time.Time `protobuf:"bytes,3,opt,name=endTs,proto3,stdtime" json:"endTs"` + Path string `protobuf:"bytes,4,opt,name=path,proto3" json:"path,omitempty"` +} + +func (m *LokiSeriesRequest) Reset() { *m = LokiSeriesRequest{} } +func (*LokiSeriesRequest) ProtoMessage() {} +func (*LokiSeriesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_51b9d53b40d11902, []int{2} +} +func (m *LokiSeriesRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LokiSeriesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LokiSeriesRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LokiSeriesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LokiSeriesRequest.Merge(m, src) +} +func (m *LokiSeriesRequest) XXX_Size() int { + return m.Size() +} +func (m *LokiSeriesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LokiSeriesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LokiSeriesRequest proto.InternalMessageInfo + +func (m *LokiSeriesRequest) GetMatch() []string { + if m != nil { + return m.Match + } + return nil +} + +func (m *LokiSeriesRequest) GetStartTs() time.Time { + if m != nil { + return m.StartTs + } + return time.Time{} +} + +func (m *LokiSeriesRequest) GetEndTs() time.Time { + if m != nil { + return m.EndTs + } + return time.Time{} +} + +func (m *LokiSeriesRequest) GetPath() string { + if m != nil { + return m.Path + } + return "" +} + +type LokiSeriesResponse struct { + Status string `protobuf:"bytes,1,opt,name=Status,json=status,proto3" json:"status"` + Data []logproto.SeriesIdentifier `protobuf:"bytes,2,rep,name=Data,json=data,proto3" json:"data,omitempty"` + Version uint32 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` +} + +func (m *LokiSeriesResponse) Reset() { *m = LokiSeriesResponse{} } +func (*LokiSeriesResponse) ProtoMessage() {} +func (*LokiSeriesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_51b9d53b40d11902, []int{3} +} +func (m *LokiSeriesResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LokiSeriesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LokiSeriesResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LokiSeriesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_LokiSeriesResponse.Merge(m, src) +} +func (m *LokiSeriesResponse) XXX_Size() int { + return m.Size() +} +func (m *LokiSeriesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_LokiSeriesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_LokiSeriesResponse proto.InternalMessageInfo + +func (m *LokiSeriesResponse) GetStatus() string { + if m != nil { + return m.Status + } + return "" +} + +func (m *LokiSeriesResponse) GetData() []logproto.SeriesIdentifier { + if m != nil { + return m.Data + } + return nil +} + +func (m *LokiSeriesResponse) GetVersion() uint32 { + if m != nil { + return m.Version + } + return 0 +} + type LokiData struct { ResultType string `protobuf:"bytes,1,opt,name=ResultType,json=resultType,proto3" json:"resultType"` Result []github_com_grafana_loki_pkg_logproto.Stream `protobuf:"bytes,2,rep,name=Result,json=result,proto3,customtype=github.com/grafana/loki/pkg/logproto.Stream" json:"result"` @@ -238,7 +364,7 @@ type LokiData struct { func (m *LokiData) Reset() { *m = LokiData{} } func (*LokiData) ProtoMessage() {} func (*LokiData) Descriptor() ([]byte, []int) { - return fileDescriptor_51b9d53b40d11902, []int{2} + return fileDescriptor_51b9d53b40d11902, []int{4} } func (m *LokiData) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -283,7 +409,7 @@ type LokiPromResponse struct { func (m *LokiPromResponse) Reset() { *m = LokiPromResponse{} } func (*LokiPromResponse) ProtoMessage() {} func (*LokiPromResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_51b9d53b40d11902, []int{3} + return fileDescriptor_51b9d53b40d11902, []int{5} } func (m *LokiPromResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -329,6 +455,8 @@ func (m *LokiPromResponse) GetStatistics() stats.Result { func init() { proto.RegisterType((*LokiRequest)(nil), "queryrange.LokiRequest") proto.RegisterType((*LokiResponse)(nil), "queryrange.LokiResponse") + proto.RegisterType((*LokiSeriesRequest)(nil), "queryrange.LokiSeriesRequest") + proto.RegisterType((*LokiSeriesResponse)(nil), "queryrange.LokiSeriesResponse") proto.RegisterType((*LokiData)(nil), "queryrange.LokiData") proto.RegisterType((*LokiPromResponse)(nil), "queryrange.LokiPromResponse") } @@ -338,50 +466,55 @@ func init() { } var fileDescriptor_51b9d53b40d11902 = []byte{ - // 687 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcf, 0x6e, 0x13, 0x3f, - 0x10, 0x5e, 0xe7, 0x7f, 0x9c, 0x5f, 0xfb, 0x43, 0x6e, 0x45, 0x57, 0x41, 0xf2, 0x46, 0xb9, 0x10, - 0x04, 0x6c, 0x44, 0x0a, 0x17, 0x24, 0x50, 0xbb, 0x2a, 0xe2, 0xc2, 0x01, 0xb9, 0x79, 0x01, 0x37, - 0x71, 0x37, 0x4b, 0xb3, 0xf1, 0xd6, 0x76, 0x10, 0xbd, 0x71, 0xe5, 0xd6, 0xc7, 0x40, 0x9c, 0x79, - 0x88, 0x1e, 0x7b, 0xac, 0x38, 0x04, 0x9a, 0x5e, 0x50, 0x4e, 0x7d, 0x00, 0x0e, 0xc8, 0xf6, 0x6e, - 0xb2, 0x45, 0x1c, 0xca, 0x65, 0x67, 0xe6, 0x9b, 0xf9, 0xec, 0x99, 0xcf, 0xb3, 0xf0, 0x7e, 0x72, - 0x14, 0x76, 0x8f, 0xa7, 0x4c, 0x44, 0x4c, 0x18, 0x7b, 0x22, 0xe8, 0x24, 0x64, 0x39, 0xd7, 0x4f, - 0x04, 0x57, 0x1c, 0xc1, 0x15, 0xd2, 0x7c, 0x1c, 0x46, 0x6a, 0x34, 0x3d, 0xf0, 0x07, 0x3c, 0xee, - 0x86, 0x3c, 0xe4, 0x5d, 0x53, 0x72, 0x30, 0x3d, 0x34, 0x91, 0x09, 0x8c, 0x67, 0xa9, 0xcd, 0x7b, - 0xfa, 0x8e, 0x31, 0x0f, 0x6d, 0x22, 0x73, 0xfe, 0x48, 0x1e, 0x8f, 0xbb, 0x52, 0x51, 0x25, 0xed, - 0x37, 0x4d, 0xbe, 0xce, 0x5d, 0x34, 0xe0, 0x42, 0xb1, 0x0f, 0x89, 0xe0, 0xef, 0xd8, 0x40, 0xa5, - 0x51, 0xf7, 0x96, 0xdd, 0x37, 0xbd, 0x90, 0xf3, 0x70, 0xcc, 0x56, 0x8d, 0xaa, 0x28, 0x66, 0x52, - 0xd1, 0x38, 0xb1, 0x05, 0xed, 0xaf, 0x05, 0xd8, 0x78, 0xc3, 0x8f, 0x22, 0xc2, 0x8e, 0xa7, 0x4c, - 0x2a, 0xb4, 0x09, 0xcb, 0xe6, 0x10, 0x17, 0xb4, 0x40, 0xa7, 0x4e, 0x6c, 0xa0, 0xd1, 0x71, 0x14, - 0x47, 0xca, 0x2d, 0xb4, 0x40, 0x67, 0x8d, 0xd8, 0x00, 0x21, 0x58, 0x92, 0x8a, 0x25, 0x6e, 0xb1, - 0x05, 0x3a, 0x45, 0x62, 0x7c, 0xf4, 0x12, 0x56, 0xa5, 0xa2, 0x42, 0xf5, 0xa5, 0x5b, 0x6a, 0x81, - 0x4e, 0xa3, 0xd7, 0xf4, 0x6d, 0x0b, 0x7e, 0xd6, 0x82, 0xdf, 0xcf, 0x5a, 0x08, 0x6a, 0x67, 0x33, - 0xcf, 0x39, 0xfd, 0xee, 0x01, 0x92, 0x91, 0xd0, 0x73, 0x58, 0x66, 0x93, 0x61, 0x5f, 0xba, 0xe5, - 0x7f, 0x60, 0x5b, 0x0a, 0x7a, 0x02, 0xeb, 0xc3, 0x48, 0xb0, 0x81, 0x8a, 0xf8, 0xc4, 0xad, 0xb4, - 0x40, 0x67, 0xbd, 0xb7, 0xe1, 0x2f, 0x65, 0xdf, 0xcb, 0x52, 0x64, 0x55, 0xa5, 0x47, 0x48, 0xa8, - 0x1a, 0xb9, 0x55, 0x33, 0xad, 0xf1, 0x51, 0x1b, 0x56, 0xe4, 0x88, 0x8a, 0xa1, 0x74, 0x6b, 0xad, - 0x62, 0xa7, 0x1e, 0xc0, 0xc5, 0xcc, 0x4b, 0x11, 0x92, 0xda, 0xf6, 0xaf, 0x02, 0xfc, 0xcf, 0xca, - 0x26, 0x13, 0x3e, 0x91, 0x4c, 0x93, 0xf6, 0x15, 0x55, 0x53, 0x69, 0x85, 0x4b, 0x49, 0x06, 0x21, - 0xa9, 0x45, 0x3b, 0xb0, 0xb4, 0x47, 0x15, 0x35, 0x22, 0x36, 0x7a, 0x9b, 0x7e, 0xee, 0xb5, 0xf4, - 0x59, 0x3a, 0x17, 0xdc, 0xd5, 0x43, 0x2d, 0x66, 0xde, 0xfa, 0x90, 0x2a, 0xfa, 0x88, 0xc7, 0x91, - 0x62, 0x71, 0xa2, 0x4e, 0x48, 0x49, 0xc7, 0xe8, 0x19, 0xac, 0xbf, 0x12, 0x82, 0x8b, 0xfe, 0x49, - 0xc2, 0x8c, 0xec, 0xf5, 0x60, 0x6b, 0x31, 0xf3, 0x36, 0x58, 0x06, 0xe6, 0x18, 0xf5, 0x25, 0x88, - 0x1e, 0xc0, 0xb2, 0xa1, 0x99, 0x27, 0xa9, 0x07, 0x1b, 0x8b, 0x99, 0xf7, 0xbf, 0xc9, 0xe6, 0xca, - 0xcb, 0x06, 0xb8, 0xa9, 0x61, 0xf9, 0x56, 0x1a, 0x2e, 0x97, 0xa3, 0x92, 0x5f, 0x0e, 0x17, 0x56, - 0xdf, 0x33, 0x21, 0xf5, 0x31, 0x55, 0x83, 0x67, 0x21, 0xda, 0x85, 0x50, 0x0b, 0x12, 0x49, 0x15, - 0x0d, 0xb4, 0xc6, 0x5a, 0x8c, 0x35, 0xdf, 0xae, 0x3f, 0x61, 0x72, 0x3a, 0x56, 0x01, 0x4a, 0x55, - 0xc8, 0x15, 0x92, 0x9c, 0xdf, 0xfe, 0x02, 0x60, 0x2d, 0x93, 0x0c, 0xf9, 0x10, 0x5a, 0x9a, 0x51, - 0xc5, 0xca, 0xbf, 0xae, 0xc9, 0x62, 0x89, 0x92, 0x9c, 0x8f, 0x26, 0xb0, 0x62, 0xeb, 0xdd, 0x42, - 0xab, 0xd8, 0x69, 0xf4, 0xb6, 0x56, 0xf3, 0xed, 0x2b, 0xc1, 0x68, 0xbc, 0x3b, 0xa4, 0x89, 0x62, - 0x22, 0x78, 0xa1, 0xbb, 0xf8, 0x36, 0xf3, 0x1e, 0xe6, 0x7f, 0x7b, 0x41, 0x0f, 0xe9, 0x84, 0x76, - 0xc7, 0xfc, 0x28, 0xea, 0xe6, 0xff, 0xef, 0x94, 0xab, 0x9f, 0xdd, 0xde, 0x45, 0x52, 0xdb, 0xfe, - 0x04, 0xe0, 0x1d, 0xdd, 0xec, 0x5b, 0xc1, 0xe3, 0xe5, 0xbe, 0xec, 0xc0, 0x9a, 0x48, 0x7d, 0xd3, - 0x72, 0xa3, 0x87, 0xf3, 0xfb, 0xa0, 0x6b, 0x99, 0x1a, 0xb1, 0xa9, 0xcc, 0x18, 0x41, 0xe9, 0x6c, - 0xe6, 0x01, 0xb2, 0x64, 0xa1, 0xed, 0x1b, 0x32, 0x16, 0xfe, 0x26, 0xa3, 0xa6, 0x38, 0x79, 0xe1, - 0x82, 0xa7, 0xe7, 0x97, 0xd8, 0xb9, 0xb8, 0xc4, 0xce, 0xf5, 0x25, 0x06, 0x1f, 0xe7, 0x18, 0x7c, - 0x9e, 0x63, 0x70, 0x36, 0xc7, 0xe0, 0x7c, 0x8e, 0xc1, 0x8f, 0x39, 0x06, 0x3f, 0xe7, 0xd8, 0xb9, - 0x9e, 0x63, 0x70, 0x7a, 0x85, 0x9d, 0xf3, 0x2b, 0xec, 0x5c, 0x5c, 0x61, 0xe7, 0xa0, 0x62, 0x26, - 0xdc, 0xfe, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x80, 0x97, 0x84, 0xc9, 0x35, 0x05, 0x00, 0x00, + // 764 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0x4f, 0x6f, 0x13, 0x39, + 0x14, 0x1f, 0xe7, 0x7f, 0x9c, 0x6d, 0x77, 0xd7, 0xad, 0xb6, 0xa3, 0xac, 0x34, 0x13, 0xe5, 0xb2, + 0x59, 0xed, 0x32, 0x11, 0x29, 0x5c, 0x90, 0x40, 0xed, 0xa8, 0x80, 0x90, 0x38, 0xa0, 0x69, 0xbe, + 0x80, 0x9b, 0xb8, 0x93, 0xa1, 0x99, 0x78, 0x6a, 0x3b, 0x88, 0xde, 0xb8, 0x72, 0xeb, 0x99, 0x4f, + 0x80, 0x38, 0x73, 0xe1, 0x1b, 0xf4, 0xd8, 0x63, 0xc5, 0x21, 0xd0, 0xf4, 0x82, 0x72, 0xea, 0x07, + 0xe0, 0x80, 0x6c, 0xcf, 0x4c, 0x5c, 0x84, 0x44, 0x2b, 0x2e, 0xf1, 0x7b, 0xcf, 0xef, 0x67, 0xff, + 0xfc, 0x7b, 0xbf, 0x0c, 0xfc, 0x27, 0x39, 0x08, 0xbb, 0x87, 0x53, 0xc2, 0x22, 0xc2, 0xd4, 0x7a, + 0xc4, 0xf0, 0x24, 0x24, 0x46, 0xe8, 0x25, 0x8c, 0x0a, 0x8a, 0xe0, 0xb2, 0xd2, 0xbc, 0x15, 0x46, + 0x62, 0x34, 0xdd, 0xf3, 0x06, 0x34, 0xee, 0x86, 0x34, 0xa4, 0x5d, 0xd5, 0xb2, 0x37, 0xdd, 0x57, + 0x99, 0x4a, 0x54, 0xa4, 0xa1, 0xcd, 0xbf, 0xe5, 0x1d, 0x63, 0x1a, 0xea, 0x8d, 0x2c, 0xf8, 0x6e, + 0xf3, 0x70, 0xdc, 0xe5, 0x02, 0x0b, 0xae, 0x7f, 0xd3, 0xcd, 0xc7, 0xc6, 0x45, 0x03, 0xca, 0x04, + 0x79, 0x99, 0x30, 0xfa, 0x9c, 0x0c, 0x44, 0x9a, 0x75, 0xaf, 0xc9, 0xbe, 0xe9, 0x86, 0x94, 0x86, + 0x63, 0xb2, 0x24, 0x2a, 0xa2, 0x98, 0x70, 0x81, 0xe3, 0x44, 0x37, 0xb4, 0xdf, 0x17, 0x60, 0xe3, + 0x29, 0x3d, 0x88, 0x02, 0x72, 0x38, 0x25, 0x5c, 0xa0, 0x75, 0x58, 0x56, 0x87, 0xd8, 0xa0, 0x05, + 0x3a, 0xf5, 0x40, 0x27, 0xb2, 0x3a, 0x8e, 0xe2, 0x48, 0xd8, 0x85, 0x16, 0xe8, 0xac, 0x04, 0x3a, + 0x41, 0x08, 0x96, 0xb8, 0x20, 0x89, 0x5d, 0x6c, 0x81, 0x4e, 0x31, 0x50, 0x31, 0x7a, 0x00, 0xab, + 0x5c, 0x60, 0x26, 0xfa, 0xdc, 0x2e, 0xb5, 0x40, 0xa7, 0xd1, 0x6b, 0x7a, 0x9a, 0x82, 0x97, 0x51, + 0xf0, 0xfa, 0x19, 0x05, 0xbf, 0x76, 0x32, 0x73, 0xad, 0xe3, 0x4f, 0x2e, 0x08, 0x32, 0x10, 0xba, + 0x07, 0xcb, 0x64, 0x32, 0xec, 0x73, 0xbb, 0x7c, 0x03, 0xb4, 0x86, 0xa0, 0xdb, 0xb0, 0x3e, 0x8c, + 0x18, 0x19, 0x88, 0x88, 0x4e, 0xec, 0x4a, 0x0b, 0x74, 0x56, 0x7b, 0x6b, 0x5e, 0x2e, 0xfb, 0x4e, + 0xb6, 0x15, 0x2c, 0xbb, 0xe4, 0x13, 0x12, 0x2c, 0x46, 0x76, 0x55, 0xbd, 0x56, 0xc5, 0xa8, 0x0d, + 0x2b, 0x7c, 0x84, 0xd9, 0x90, 0xdb, 0xb5, 0x56, 0xb1, 0x53, 0xf7, 0xe1, 0x62, 0xe6, 0xa6, 0x95, + 0x20, 0x5d, 0xdb, 0x5f, 0x0b, 0xf0, 0x37, 0x2d, 0x1b, 0x4f, 0xe8, 0x84, 0x13, 0x09, 0xda, 0x15, + 0x58, 0x4c, 0xb9, 0x16, 0x2e, 0x05, 0xa9, 0x4a, 0x90, 0xae, 0x68, 0x0b, 0x96, 0x76, 0xb0, 0xc0, + 0x4a, 0xc4, 0x46, 0x6f, 0xdd, 0x33, 0xa6, 0x25, 0xcf, 0x92, 0x7b, 0xfe, 0x5f, 0xf2, 0x51, 0x8b, + 0x99, 0xbb, 0x3a, 0xc4, 0x02, 0xff, 0x4f, 0xe3, 0x48, 0x90, 0x38, 0x11, 0x47, 0x41, 0x49, 0xe6, + 0xe8, 0x2e, 0xac, 0x3f, 0x64, 0x8c, 0xb2, 0xfe, 0x51, 0x42, 0x94, 0xec, 0x75, 0x7f, 0x63, 0x31, + 0x73, 0xd7, 0x48, 0x56, 0x34, 0x10, 0xf5, 0xbc, 0x88, 0xfe, 0x85, 0x65, 0x05, 0x53, 0x23, 0xa9, + 0xfb, 0x6b, 0x8b, 0x99, 0xfb, 0xbb, 0xda, 0x35, 0xda, 0xcb, 0xaa, 0x70, 0x55, 0xc3, 0xf2, 0xb5, + 0x34, 0xcc, 0xcd, 0x51, 0x31, 0xcd, 0x61, 0xc3, 0xea, 0x0b, 0xc2, 0xb8, 0x3c, 0xa6, 0xaa, 0xea, + 0x59, 0x8a, 0xb6, 0x21, 0x94, 0x82, 0x44, 0x5c, 0x44, 0x03, 0xa9, 0xb1, 0x14, 0x63, 0xc5, 0xd3, + 0xf6, 0x0f, 0x08, 0x9f, 0x8e, 0x85, 0x8f, 0x52, 0x15, 0x8c, 0xc6, 0xc0, 0x88, 0xdb, 0x1f, 0x00, + 0xfc, 0x53, 0x4a, 0xb6, 0x2b, 0xff, 0x01, 0xdc, 0xf0, 0x6e, 0x8c, 0xc5, 0x60, 0x64, 0x03, 0x39, + 0xb7, 0x40, 0x27, 0xa6, 0x23, 0x0b, 0xbf, 0xe4, 0xc8, 0xe2, 0xcd, 0x1d, 0x99, 0xd9, 0xab, 0xb4, + 0xb4, 0x57, 0xfb, 0x0d, 0x80, 0xc8, 0xe4, 0x7e, 0x03, 0x03, 0x3d, 0xca, 0x0d, 0x54, 0x54, 0x4c, + 0xf2, 0xb9, 0xe8, 0xb3, 0x9e, 0x0c, 0xc9, 0x44, 0x44, 0xfb, 0x11, 0x61, 0x3f, 0xb1, 0x91, 0x31, + 0x9b, 0xe2, 0x95, 0xd9, 0xb4, 0xdf, 0x01, 0x58, 0xcb, 0xbc, 0x88, 0x3c, 0x08, 0xf5, 0x3c, 0x94, + 0xdd, 0x34, 0xad, 0x55, 0x39, 0x15, 0x96, 0x57, 0x03, 0x23, 0x46, 0x13, 0x58, 0xd1, 0xfd, 0x29, + 0xc1, 0x0d, 0x83, 0xa0, 0x60, 0x04, 0xc7, 0xdb, 0x43, 0x9c, 0x08, 0xc2, 0xfc, 0xfb, 0x92, 0xdd, + 0xc7, 0x99, 0xfb, 0x9f, 0xf9, 0x3d, 0x65, 0x78, 0x1f, 0x4f, 0x70, 0x77, 0x4c, 0x0f, 0xa2, 0xae, + 0xf9, 0xe1, 0x4c, 0xb1, 0x52, 0x0e, 0x7d, 0x57, 0x90, 0xae, 0xed, 0xd7, 0x00, 0xfe, 0x21, 0xc9, + 0x3e, 0x63, 0x34, 0xce, 0x75, 0xdc, 0x82, 0x35, 0x96, 0xc6, 0x8a, 0x72, 0xa3, 0xe7, 0x98, 0x7f, + 0x34, 0xd9, 0x4b, 0xc4, 0x88, 0x4c, 0x73, 0xe5, 0xfd, 0xd2, 0xc9, 0xcc, 0x05, 0x41, 0x8e, 0x42, + 0x9b, 0x57, 0xfc, 0x59, 0xf8, 0x91, 0x3f, 0x25, 0xc4, 0x32, 0x1d, 0xe9, 0xdf, 0x39, 0x3d, 0x77, + 0xac, 0xb3, 0x73, 0xc7, 0xba, 0x3c, 0x77, 0xc0, 0xab, 0xb9, 0x03, 0xde, 0xce, 0x1d, 0x70, 0x32, + 0x77, 0xc0, 0xe9, 0xdc, 0x01, 0x9f, 0xe7, 0x0e, 0xf8, 0x32, 0x77, 0xac, 0xcb, 0xb9, 0x03, 0x8e, + 0x2f, 0x1c, 0xeb, 0xf4, 0xc2, 0xb1, 0xce, 0x2e, 0x1c, 0x6b, 0xaf, 0xa2, 0x5e, 0xb8, 0xf9, 0x2d, + 0x00, 0x00, 0xff, 0xff, 0x93, 0x63, 0xfc, 0xe4, 0x8e, 0x06, 0x00, 0x00, } func (this *LokiRequest) Equal(that interface{}) bool { @@ -479,6 +612,79 @@ func (this *LokiResponse) Equal(that interface{}) bool { } return true } +func (this *LokiSeriesRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*LokiSeriesRequest) + if !ok { + that2, ok := that.(LokiSeriesRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Match) != len(that1.Match) { + return false + } + for i := range this.Match { + if this.Match[i] != that1.Match[i] { + return false + } + } + if !this.StartTs.Equal(that1.StartTs) { + return false + } + if !this.EndTs.Equal(that1.EndTs) { + return false + } + if this.Path != that1.Path { + return false + } + return true +} +func (this *LokiSeriesResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*LokiSeriesResponse) + if !ok { + that2, ok := that.(LokiSeriesResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Status != that1.Status { + return false + } + if len(this.Data) != len(that1.Data) { + return false + } + for i := range this.Data { + if !this.Data[i].Equal(&that1.Data[i]) { + return false + } + } + if this.Version != that1.Version { + return false + } + return true +} func (this *LokiData) Equal(that interface{}) bool { if that == nil { return this == nil @@ -572,6 +778,37 @@ func (this *LokiResponse) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *LokiSeriesRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&queryrange.LokiSeriesRequest{") + s = append(s, "Match: "+fmt.Sprintf("%#v", this.Match)+",\n") + s = append(s, "StartTs: "+fmt.Sprintf("%#v", this.StartTs)+",\n") + s = append(s, "EndTs: "+fmt.Sprintf("%#v", this.EndTs)+",\n") + s = append(s, "Path: "+fmt.Sprintf("%#v", this.Path)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *LokiSeriesResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&queryrange.LokiSeriesResponse{") + s = append(s, "Status: "+fmt.Sprintf("%#v", this.Status)+",\n") + if this.Data != nil { + vs := make([]*logproto.SeriesIdentifier, len(this.Data)) + for i := range vs { + vs[i] = &this.Data[i] + } + s = append(s, "Data: "+fmt.Sprintf("%#v", vs)+",\n") + } + s = append(s, "Version: "+fmt.Sprintf("%#v", this.Version)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *LokiData) GoString() string { if this == nil { return "nil" @@ -747,6 +984,102 @@ func (m *LokiResponse) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *LokiSeriesRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LokiSeriesRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Match) > 0 { + for _, s := range m.Match { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + dAtA[i] = 0x12 + i++ + i = encodeVarintQueryrange(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.StartTs))) + n5, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.StartTs, dAtA[i:]) + if err != nil { + return 0, err + } + i += n5 + dAtA[i] = 0x1a + i++ + i = encodeVarintQueryrange(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.EndTs))) + n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.EndTs, dAtA[i:]) + if err != nil { + return 0, err + } + i += n6 + if len(m.Path) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintQueryrange(dAtA, i, uint64(len(m.Path))) + i += copy(dAtA[i:], m.Path) + } + return i, nil +} + +func (m *LokiSeriesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LokiSeriesResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Status) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintQueryrange(dAtA, i, uint64(len(m.Status))) + i += copy(dAtA[i:], m.Status) + } + if len(m.Data) > 0 { + for _, msg := range m.Data { + dAtA[i] = 0x12 + i++ + i = encodeVarintQueryrange(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.Version != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintQueryrange(dAtA, i, uint64(m.Version)) + } + return i, nil +} + func (m *LokiData) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -802,20 +1135,20 @@ func (m *LokiPromResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintQueryrange(dAtA, i, uint64(m.Response.Size())) - n5, err := m.Response.MarshalTo(dAtA[i:]) + n7, err := m.Response.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n5 + i += n7 } dAtA[i] = 0x12 i++ i = encodeVarintQueryrange(dAtA, i, uint64(m.Statistics.Size())) - n6, err := m.Statistics.MarshalTo(dAtA[i:]) + n8, err := m.Statistics.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n6 + i += n8 return i, nil } @@ -898,16 +1231,61 @@ func (m *LokiResponse) Size() (n int) { return n } -func (m *LokiData) Size() (n int) { +func (m *LokiSeriesRequest) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.ResultType) - if l > 0 { - n += 1 + l + sovQueryrange(uint64(l)) - } + if len(m.Match) > 0 { + for _, s := range m.Match { + l = len(s) + n += 1 + l + sovQueryrange(uint64(l)) + } + } + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.StartTs) + n += 1 + l + sovQueryrange(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.EndTs) + n += 1 + l + sovQueryrange(uint64(l)) + l = len(m.Path) + if l > 0 { + n += 1 + l + sovQueryrange(uint64(l)) + } + return n +} + +func (m *LokiSeriesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Status) + if l > 0 { + n += 1 + l + sovQueryrange(uint64(l)) + } + if len(m.Data) > 0 { + for _, e := range m.Data { + l = e.Size() + n += 1 + l + sovQueryrange(uint64(l)) + } + } + if m.Version != 0 { + n += 1 + sovQueryrange(uint64(m.Version)) + } + return n +} + +func (m *LokiData) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ResultType) + if l > 0 { + n += 1 + l + sovQueryrange(uint64(l)) + } if len(m.Result) > 0 { for _, e := range m.Result { l = e.Size() @@ -979,6 +1357,31 @@ func (this *LokiResponse) String() string { }, "") return s } +func (this *LokiSeriesRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LokiSeriesRequest{`, + `Match:` + fmt.Sprintf("%v", this.Match) + `,`, + `StartTs:` + strings.Replace(strings.Replace(this.StartTs.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `EndTs:` + strings.Replace(strings.Replace(this.EndTs.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Path:` + fmt.Sprintf("%v", this.Path) + `,`, + `}`, + }, "") + return s +} +func (this *LokiSeriesResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LokiSeriesResponse{`, + `Status:` + fmt.Sprintf("%v", this.Status) + `,`, + `Data:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Data), "SeriesIdentifier", "logproto.SeriesIdentifier", 1), `&`, ``, 1) + `,`, + `Version:` + fmt.Sprintf("%v", this.Version) + `,`, + `}`, + }, "") + return s +} func (this *LokiData) String() string { if this == nil { return "nil" @@ -1553,6 +1956,327 @@ func (m *LokiResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *LokiSeriesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LokiSeriesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LokiSeriesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Match", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Match = append(m.Match, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StartTs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.StartTs, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field EndTs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.EndTs, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Path", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Path = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQueryrange(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LokiSeriesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LokiSeriesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LokiSeriesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Status = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQueryrange + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQueryrange + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data, logproto.SeriesIdentifier{}) + if err := m.Data[len(m.Data)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + m.Version = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQueryrange + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Version |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipQueryrange(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthQueryrange + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *LokiData) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/querier/queryrange/queryrange.proto b/pkg/querier/queryrange/queryrange.proto index 8710d7cfc0..8dec36dd85 100644 --- a/pkg/querier/queryrange/queryrange.proto +++ b/pkg/querier/queryrange/queryrange.proto @@ -33,6 +33,19 @@ message LokiResponse { stats.Result statistics = 8 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "statistics"]; } +message LokiSeriesRequest { + repeated string match = 1; + google.protobuf.Timestamp startTs = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp endTs = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + string path = 4; +} + +message LokiSeriesResponse { + string Status = 1 [(gogoproto.jsontag) = "status"]; + repeated logproto.SeriesIdentifier Data = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "data,omitempty"]; + uint32 version = 3; +} + message LokiData { string ResultType = 1 [(gogoproto.jsontag) = "resultType"]; repeated logproto.StreamAdapter Result = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "result", (gogoproto.customtype) = "github.com/grafana/loki/pkg/logproto.Stream"]; diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index f982caa8e8..e9e7e98467 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -64,61 +64,77 @@ func NewTripperware( return nil, nil, err } + seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, lokiCodec, instrumentMetrics, retryMetrics, splitByMetrics) + if err != nil { + return nil, nil, err + } + return func(next http.RoundTripper) http.RoundTripper { metricRT := metricsTripperware(next) logFilterRT := logFilterTripperware(next) - return newRoundTripper(next, logFilterRT, metricRT, limits) + seriesRT := seriesTripperware(next) + return newRoundTripper(next, logFilterRT, metricRT, seriesRT, limits) }, cache, nil } type roundTripper struct { - next, log, metric http.RoundTripper + next, log, metric, series http.RoundTripper limits Limits } // newRoundTripper creates a new queryrange roundtripper -func newRoundTripper(next, log, metric http.RoundTripper, limits Limits) roundTripper { +func newRoundTripper(next, log, metric, series http.RoundTripper, limits Limits) roundTripper { return roundTripper{ log: log, limits: limits, metric: metric, + series: series, next: next, } } func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { - if !strings.HasSuffix(req.URL.Path, "/query_range") && !strings.HasSuffix(req.URL.Path, "/prom/query") { - return r.next.RoundTrip(req) - } err := req.ParseForm() if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - rangeQuery, err := loghttp.ParseRangeQuery(req) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - expr, err := logql.ParseExpr(rangeQuery.Query) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - switch e := expr.(type) { - case logql.SampleExpr: - return r.metric.RoundTrip(req) - case logql.LogSelectorExpr: - filter, err := transformRegexQuery(req, e).Filter() + + switch op := getOperation(req); op { + case "query_range": + rangeQuery, err := loghttp.ParseRangeQuery(req) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil { - return nil, err + expr, err := logql.ParseExpr(rangeQuery.Query) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - if filter == nil { + switch e := expr.(type) { + case logql.SampleExpr: + return r.metric.RoundTrip(req) + case logql.LogSelectorExpr: + filter, err := transformRegexQuery(req, e).Filter() + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil { + return nil, err + } + if filter == nil { + return r.next.RoundTrip(req) + } + return r.log.RoundTrip(req) + + default: return r.next.RoundTrip(req) } - return r.log.RoundTrip(req) - + case "series": + _, err := loghttp.ParseSeriesQuery(req) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + return r.series.RoundTrip(req) default: return r.next.RoundTrip(req) } @@ -154,6 +170,16 @@ func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error { return nil } +func getOperation(req *http.Request) string { + if strings.HasSuffix(req.URL.Path, "/query_range") || strings.HasSuffix(req.URL.Path, "/prom/query") { + return "query_range" + } else if strings.HasSuffix(req.URL.Path, "/series") { + return "series" + } else { + return "" + } +} + // NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests with regex. func NewLogFilterTripperware( cfg Config, @@ -199,6 +225,32 @@ func NewLogFilterTripperware( }, nil } +// NewSeriesripperware creates a new frontend tripperware responsible for handling series requests +func NewSeriesTripperware( + cfg Config, + log log.Logger, + limits Limits, + codec queryrange.Codec, + instrumentMetrics *queryrange.InstrumentMiddlewareMetrics, + retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics, + splitByMetrics *SplitByMetrics, +) (frontend.Tripperware, error) { + queryRangeMiddleware := []queryrange.Middleware{} + if cfg.SplitQueriesByInterval != 0 { + queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("split_by_interval", instrumentMetrics), SplitByIntervalMiddleware(limits, codec, splitByMetrics)) + } + if cfg.MaxRetries > 0 { + queryRangeMiddleware = append(queryRangeMiddleware, queryrange.InstrumentMiddleware("retry", instrumentMetrics), queryrange.NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics)) + } + + return func(next http.RoundTripper) http.RoundTripper { + if len(queryRangeMiddleware) > 0 { + return queryrange.NewRoundTripper(next, codec, queryRangeMiddleware...) + } + return next + }, nil +} + // NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries func NewMetricTripperware( cfg Config, diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 760c1de8d3..19a16eebe1 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -77,6 +77,17 @@ var ( Labels: `{filename="/var/hostlog/apport.log", job="varlogs"}`, }, } + + series = logproto.SeriesResponse{ + Series: []logproto.SeriesIdentifier{ + { + Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"}, + }, + }, + } ) // those tests are mostly for testing the glue between all component and make sure they activate correctly. @@ -192,6 +203,47 @@ func TestLogFilterTripperware(t *testing.T) { require.Error(t, err) } +func TestSeriesTripperware(t *testing.T) { + + tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) + if stopper != nil { + defer stopper.Stop() + } + require.NoError(t, err) + rt, err := newfakeRoundTripper() + require.NoError(t, err) + defer rt.Close() + + lreq := &LokiSeriesRequest{ + Match: []string{`{job="varlogs"}`}, + StartTs: testTime.Add(-6 * time.Hour), // bigger than the limit + EndTs: testTime, + Path: "/loki/api/v1/series", + } + + ctx := user.InjectOrgID(context.Background(), "1") + req, err := lokiCodec.EncodeRequest(ctx, lreq) + require.NoError(t, err) + + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + count, h := seriesResult(series) + rt.setHandler(h) + resp, err := tpw(rt).RoundTrip(req) + // 2 queries + require.Equal(t, 2, *count) + require.NoError(t, err) + lokiSeriesResponse, err := lokiCodec.DecodeResponse(ctx, resp, lreq) + res, ok := lokiSeriesResponse.(*LokiSeriesResponse) + require.Equal(t, true, ok) + + // make sure we return unique series since responses from + // SplitByInterval middleware might have duplicate series + require.Equal(t, series.Series, res.Data) + require.NoError(t, err) +} func TestLogNoRegex(t *testing.T) { tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) if stopper != nil { @@ -316,6 +368,10 @@ func TestPostQueries(t *testing.T) { t.Error("unexpected metric roundtripper called") return nil, nil }), + frontend.RoundTripFunc(func(*http.Request) (*http.Response, error) { + t.Error("unexpected series roundtripper called") + return nil, nil + }), fakeLimits{}, ).RoundTrip(req) require.NoError(t, err) @@ -447,7 +503,19 @@ func promqlResult(v parser.Value) (*int, http.Handler) { } count++ }) +} +func seriesResult(v logproto.SeriesResponse) (*int, http.Handler) { + count := 0 + var lock sync.Mutex + return &count, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + lock.Lock() + defer lock.Unlock() + if err := marshal.WriteSeriesResponseJSON(v, w); err != nil { + panic(err) + } + count++ + }) } type fakeRoundTripper struct { diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index af6c58c957..6b66d9a7a8 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -153,7 +153,7 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult) { } func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { - lokiRequest := r.(*LokiRequest) + //lokiRequest := r.(*LokiRequest) userid, err := user.ExtractOrgID(ctx) if err != nil { @@ -166,7 +166,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra return h.next.Do(ctx, r) } - intervals := splitByTime(lokiRequest, interval) + intervals := splitByTime(r, interval) h.metrics.splits.Observe(float64(len(intervals))) // no interval should not be processed by the frontend. @@ -179,10 +179,20 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra } - if lokiRequest.Direction == logproto.BACKWARD { - for i, j := 0, len(intervals)-1; i < j; i, j = i+1, j-1 { - intervals[i], intervals[j] = intervals[j], intervals[i] + var limit int64 + switch req := r.(type) { + case *LokiRequest: + limit = int64(req.Limit) + if req.Direction == logproto.BACKWARD { + for i, j := 0, len(intervals)-1; i < j; i, j = i+1, j-1 { + intervals[i], intervals[j] = intervals[j], intervals[i] + } } + case *LokiSeriesRequest: + // Set this to 0 since this is not used in Series Request. + limit = 0 + default: + return nil, httpgrpc.Errorf(http.StatusBadRequest, "unknown request type") } input := make([]*lokiResult, 0, len(intervals)) @@ -193,29 +203,49 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra }) } - resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), int64(lokiRequest.Limit), input) + resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input) if err != nil { return nil, err } return h.merger.MergeResponse(resps...) } -func splitByTime(r *LokiRequest, interval time.Duration) []queryrange.Request { +func splitByTime(req queryrange.Request, interval time.Duration) []queryrange.Request { var reqs []queryrange.Request - for start := r.StartTs; start.Before(r.EndTs); start = start.Add(interval) { - end := start.Add(interval) - if end.After(r.EndTs) { - end = r.EndTs + + switch r := req.(type) { + case *LokiRequest: + for start := r.StartTs; start.Before(r.EndTs); start = start.Add(interval) { + end := start.Add(interval) + if end.After(r.EndTs) { + end = r.EndTs + } + reqs = append(reqs, &LokiRequest{ + Query: r.Query, + Limit: r.Limit, + Step: r.Step, + Direction: r.Direction, + Path: r.Path, + StartTs: start, + EndTs: end, + }) } - reqs = append(reqs, &LokiRequest{ - Query: r.Query, - Limit: r.Limit, - Step: r.Step, - Direction: r.Direction, - Path: r.Path, - StartTs: start, - EndTs: end, - }) + return reqs + case *LokiSeriesRequest: + for start := r.StartTs; start.Before(r.EndTs); start = start.Add(interval) { + end := start.Add(interval) + if end.After(r.EndTs) { + end = r.EndTs + } + reqs = append(reqs, &LokiSeriesRequest{ + Match: r.Match, + Path: r.Path, + StartTs: start, + EndTs: end, + }) + } + return reqs + default: + return nil } - return reqs } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 0a469d71d3..3ffbfc888c 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -22,7 +22,7 @@ func Test_splitQuery(t *testing.T) { tests := []struct { name string - req *LokiRequest + req queryrange.Request interval time.Duration want []queryrange.Request }{ @@ -72,6 +72,28 @@ func Test_splitQuery(t *testing.T) { }, }, }, + { + "3 intervals series", + &LokiSeriesRequest{ + StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC), + EndTs: time.Date(2019, 12, 9, 16, 0, 0, 2, time.UTC), + }, + 2 * time.Hour, + []queryrange.Request{ + &LokiSeriesRequest{ + StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC), + EndTs: time.Date(2019, 12, 9, 14, 0, 0, 1, time.UTC), + }, + &LokiSeriesRequest{ + StartTs: time.Date(2019, 12, 9, 14, 0, 0, 1, time.UTC), + EndTs: time.Date(2019, 12, 9, 16, 0, 0, 1, time.UTC), + }, + &LokiSeriesRequest{ + StartTs: time.Date(2019, 12, 9, 16, 0, 0, 1, time.UTC), + EndTs: time.Date(2019, 12, 9, 16, 0, 0, 2, time.UTC), + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -251,6 +273,71 @@ func Test_splitByInterval_Do(t *testing.T) { } +func Test_series_splitByInterval_Do(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "1") + next := queryrange.HandlerFunc(func(_ context.Context, r queryrange.Request) (queryrange.Response, error) { + return &LokiSeriesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: []logproto.SeriesIdentifier{ + { + Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"}, + }, + }, + }, nil + }) + + l := WithDefaultLimits(fakeLimits{}, queryrange.Config{SplitQueriesByInterval: time.Hour}) + split := SplitByIntervalMiddleware( + l, + lokiCodec, + nilMetrics, + ).Wrap(next) + + tests := []struct { + name string + req *LokiSeriesRequest + want *LokiSeriesResponse + }{ + { + "backward", + &LokiSeriesRequest{ + StartTs: time.Unix(0, 0), + EndTs: time.Unix(0, (4 * time.Hour).Nanoseconds()), + Match: []string{`{job="varlogs"}`}, + Path: "/loki/api/v1/series", + }, + &LokiSeriesResponse{ + Status: "success", + Version: 1, + Data: []logproto.SeriesIdentifier{ + { + Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"}, + }, + { + Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"}, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := split.Do(ctx, tt.req) + require.NoError(t, err) + require.Equal(t, tt.want, res) + }) + } + +} + func Test_ExitEarly(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1")