Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/querier/queryrange/marshal.go

204 lines
5.4 KiB

// Package contains methods to marshal logqmodel types to queryrange Protobuf types.
// Its cousing is util/marshal which converts them to JSON.
package queryrange
import (
"fmt"
"io"
"github.com/prometheus/prometheus/promql"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/sketch"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)
const (
JSONType = `application/json; charset=utf-8`
ProtobufType = `application/vnd.google.protobuf`
)
// WriteQueryResponseProtobuf marshals the promql.Value to queryrange QueryResonse and then
// writes it to the provided io.Writer.
func WriteQueryResponseProtobuf(params logql.Params, v logqlmodel.Result, w io.Writer) error {
r, err := ResultToResponse(v, params)
if err != nil {
return err
}
p, err := QueryResponseWrap(r)
if err != nil {
return err
}
buf, err := p.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}
// ResultToResponse is the reverse of ResponseToResult below.
func ResultToResponse(result logqlmodel.Result, params logql.Params) (queryrangebase.Response, error) {
switch data := result.Data.(type) {
case promql.Vector:
sampleStream, err := queryrangebase.FromValue(data)
if err != nil {
return nil, err
}
return &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: "success",
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: sampleStream,
},
},
Statistics: result.Statistics,
}, nil
case promql.Matrix:
sampleStream, err := queryrangebase.FromValue(data)
if err != nil {
return nil, err
}
return &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: "success",
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: sampleStream,
},
},
Statistics: result.Statistics,
}, nil
case promql.Scalar:
sampleStream, err := queryrangebase.FromValue(data)
if err != nil {
return nil, err
}
return &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: "success",
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeScalar,
Result: sampleStream,
},
},
Statistics: result.Statistics,
}, nil
case logqlmodel.Streams:
return &LokiResponse{
Direction: params.Direction(),
Limit: params.Limit(),
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: data,
},
Status: "success",
Statistics: result.Statistics,
}, nil
case sketch.TopKMatrix:
sk, err := data.ToProto()
return &TopKSketchesResponse{Response: sk}, err
case sketch.QuantileSketchMatrix:
return &QuantileSketchResponse{Response: data.ToProto()}, nil
}
return nil, fmt.Errorf("unsupported data type: %t", result.Data)
}
Turn frontend Tripperware into a Middleware. (#10688) **What this PR does / why we need it**: Currently, a request to Loki's frontend API goes through these conversions: ``` http.Request ↓ limitedRoundTripper queryrangebase.Request ↓ queryrangebase.Middlware … ↓ queryrangebase.Request ↓ limitedRoundTripper http.Request ↓ grpcRoundTripperAdapter httpgrpc ↓ grpcRoundTripperAdapter http.Response ↓ limitedRoundTripper queryrangebase.Response ↓ limitedRoundtripper http.Response ``` Since `httgrpc` and `queryrangebase.Request` are Protobufs there's no good reason to encode and decode them to HTTP responses/requests. Furthermore, the encoding to HTTP makes it harder for us to encode query plans. Thus the conversions are changed to the following: ``` http.Request ↓ queryrangebase.Request ↓ queryrangebase.Middlware … ↓ queryrangebase.Request ↓ httpgrpc ↓ queryrangebase.Response ↓ http.Response ``` In order to achieve this the `http.RoundTripper` is pushed to the outside. Only the serialization layer from `http.Request` to `queryrangebase.Request` and `http.Response` to `queryrangebase.Response` will be an `http.RoundTripper`. Everything else is either a `queryrangebase.Handler` or `queryrangebase.Middleware`. **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213)
2 years ago
func ResponseToResult(resp queryrangebase.Response) (logqlmodel.Result, error) {
switch r := resp.(type) {
case *LokiResponse:
if r.Error != "" {
return logqlmodel.Result{}, fmt.Errorf("%s: %s", r.ErrorType, r.Error)
}
streams := make(logqlmodel.Streams, 0, len(r.Data.Result))
for _, stream := range r.Data.Result {
streams = append(streams, stream)
}
return logqlmodel.Result{
Statistics: r.Statistics,
Data: streams,
Headers: resp.GetHeaders(),
}, nil
case *LokiPromResponse:
if r.Response.Error != "" {
return logqlmodel.Result{}, fmt.Errorf("%s: %s", r.Response.ErrorType, r.Response.Error)
}
if r.Response.Data.ResultType == loghttp.ResultTypeVector {
return logqlmodel.Result{
Statistics: r.Statistics,
Data: sampleStreamToVector(r.Response.Data.Result),
Headers: resp.GetHeaders(),
}, nil
}
return logqlmodel.Result{
Statistics: r.Statistics,
Data: sampleStreamToMatrix(r.Response.Data.Result),
Headers: resp.GetHeaders(),
}, nil
case *TopKSketchesResponse:
matrix, err := sketch.TopKMatrixFromProto(r.Response)
if err != nil {
return logqlmodel.Result{}, fmt.Errorf("cannot decode topk sketch: %w", err)
}
return logqlmodel.Result{
Data: matrix,
Headers: resp.GetHeaders(),
}, nil
case *QuantileSketchResponse:
matrix, err := sketch.QuantileSketchMatrixFromProto(r.Response)
if err != nil {
return logqlmodel.Result{}, fmt.Errorf("cannot decode quantile sketch: %w", err)
}
return logqlmodel.Result{
Data: matrix,
Headers: resp.GetHeaders(),
}, nil
default:
return logqlmodel.Result{}, fmt.Errorf("cannot decode (%T)", resp)
}
}
func QueryResponseWrap(res queryrangebase.Response) (*QueryResponse, error) {
p := &QueryResponse{}
switch response := res.(type) {
case *LokiPromResponse:
p.Response = &QueryResponse_Prom{response}
case *LokiResponse:
p.Response = &QueryResponse_Streams{response}
case *LokiSeriesResponse:
p.Response = &QueryResponse_Series{response}
case *MergedSeriesResponseView:
mat, err := response.Materialize()
if err != nil {
return p, err
}
p.Response = &QueryResponse_Series{mat}
case *LokiLabelNamesResponse:
p.Response = &QueryResponse_Labels{response}
case *IndexStatsResponse:
p.Response = &QueryResponse_Stats{response}
case *TopKSketchesResponse:
p.Response = &QueryResponse_TopkSketches{response}
case *QuantileSketchResponse:
p.Response = &QueryResponse_QuantileSketches{response}
default:
return nil, fmt.Errorf("invalid response format, got (%T)", res)
}
return p, nil
}