Support content negotiation between query frontend and querier. (#9813)

**What this PR does / why we need it**:
Currently, the querier sends results to the query frontend in JSON which
is then decoded to Protobuf. It is more efficient to send the results as
Protobuf.

This will also allow to extend the results with custom data structures.

The change is backwards compatible through content negotiation.

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
pull/9838/head
Karsten Jeschkies 3 years ago committed by GitHub
parent 0711580154
commit b35bbd80d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 5
      docs/sources/configuration/_index.md
  3. 1
      integration/loki_micro_services_test.go
  4. 42
      pkg/querier/http.go
  5. 118
      pkg/querier/queryrange/codec.go
  6. 340
      pkg/querier/queryrange/codec_test.go
  7. 32
      pkg/querier/queryrange/extensions.go
  8. 4
      pkg/querier/queryrange/index_stats_cache_test.go
  9. 2
      pkg/querier/queryrange/limits.go
  10. 16
      pkg/querier/queryrange/limits_test.go
  11. 226
      pkg/querier/queryrange/marshal.go
  12. 1123
      pkg/querier/queryrange/queryrange.pb.go
  13. 11
      pkg/querier/queryrange/queryrange.proto
  14. 2
      pkg/querier/queryrange/queryrangebase/definitions/interface.go
  15. 2
      pkg/querier/queryrange/queryrangebase/marshaling_test.go
  16. 3
      pkg/querier/queryrange/queryrangebase/query_range.go
  17. 2
      pkg/querier/queryrange/queryrangebase/query_range_test.go
  18. 11
      pkg/querier/queryrange/queryrangebase/roundtrip.go
  19. 9
      pkg/querier/queryrange/queryrangebase/value.go
  20. 8
      pkg/querier/queryrange/querysharding_test.go
  21. 23
      pkg/querier/queryrange/roundtrip.go
  22. 48
      pkg/querier/queryrange/roundtrip_test.go
  23. 10
      pkg/querier/queryrange/split_by_interval_test.go
  24. 4
      pkg/querier/queryrange/stats.go
  25. 5
      pkg/querier/worker_service_test.go
  26. 30
      pkg/util/marshal/marshal.go
  27. 5
      pkg/util/server/middleware.go
  28. 1
      tools/dev/loki-boltdb-storage-s3/config/loki.yaml
  29. 2
      tools/dev/loki-boltdb-storage-s3/dev.dockerfile

@ -45,6 +45,7 @@
* [7447](https://github.com/grafana/loki/pull/7447) **ashwanthgoli** compactor: multi-store support.
* [7754](https://github.com/grafana/loki/pull/7754) **ashwanthgoli** index-shipper: add support for multiple stores.
* [8662](https://github.com/grafana/loki/pull/8662) **liguozhong**: LogQL: Introduce `distinct`
* [9813](https://github.com/grafana/loki/pull/9813) **jeschkies**: Enable Protobuf encoding via content negotiation between querier and query frontend.
##### Fixes

@ -791,6 +791,11 @@ results_cache:
# CLI flag: -frontend.forward-headers-list
[forward_headers_list: <list of strings> | default = []]
# The downstream querier is required to answer in the accepted format. Can be
# 'json' or 'protobuf'. Note: Both will still be routed over GRPC.
# CLI flag: -frontend.required-query-response-format
[required_query_response_format: <string> | default = "json"]
# Cache index stats query results.
# CLI flag: -querier.cache-index-stats-results
[cache_index_stats_results: <boolean> | default = false]

@ -68,6 +68,7 @@ func TestMicroServicesIngestQuery(t *testing.T) {
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.required-query-response-format=protobuf",
)
_ = clu.AddComponent(
"querier",

@ -7,15 +7,12 @@ import (
"strings"
"time"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logproto"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/websocket"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"
@ -25,10 +22,12 @@ import (
"github.com/grafana/loki/pkg/loghttp"
loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange"
index_stats "github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
@ -97,9 +96,9 @@ func (q *QuerierAPI) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
serverutil.WriteError(err, w)
return
}
if err := marshal.WriteQueryResponseJSON(result, w); err != nil {
if err := queryrange.WriteResponse(r, &params, result, w); err != nil {
serverutil.WriteError(err, w)
return
}
}
@ -134,9 +133,8 @@ func (q *QuerierAPI) InstantQueryHandler(w http.ResponseWriter, r *http.Request)
return
}
if err := marshal.WriteQueryResponseJSON(result, w); err != nil {
if err := queryrange.WriteResponse(r, &params, result, w); err != nil {
serverutil.WriteError(err, w)
return
}
}
@ -189,9 +187,8 @@ func (q *QuerierAPI) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
return
}
if err := marshal_legacy.WriteQueryResponseJSON(result, w); err != nil {
if err := queryrange.WriteResponse(r, &params, result, w); err != nil {
serverutil.WriteError(err, w)
return
}
}
@ -233,14 +230,8 @@ func (q *QuerierAPI) LabelHandler(w http.ResponseWriter, r *http.Request) {
return
}
if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = marshal.WriteLabelResponseJSON(*resp, w)
} else {
err = marshal_legacy.WriteLabelResponseJSON(*resp, w)
}
if err != nil {
if err := queryrange.WriteResponse(r, nil, resp, w); err != nil {
serverutil.WriteError(err, w)
return
}
}
@ -407,10 +398,8 @@ func (q *QuerierAPI) SeriesHandler(w http.ResponseWriter, r *http.Request) {
return
}
err = marshal.WriteSeriesResponseJSON(*resp, w)
if err != nil {
if err := queryrange.WriteResponse(r, nil, resp, w); err != nil {
serverutil.WriteError(err, w)
return
}
}
@ -434,10 +423,8 @@ func (q *QuerierAPI) IndexStatsHandler(w http.ResponseWriter, r *http.Request) {
return
}
err = marshal.WriteIndexStatsResponseJSON(resp, w)
if err != nil {
if err := queryrange.WriteResponse(r, nil, resp, w); err != nil {
serverutil.WriteError(err, w)
return
}
}
@ -461,7 +448,7 @@ func (q *QuerierAPI) SeriesVolumeRangeHandler(w http.ResponseWriter, r *http.Req
Limit: int32(rawReq.Limit),
}
q.seriesVolumeHandler(r.Context(), req, w)
q.seriesVolumeHandler(r.Context(), r, req, w)
}
// SeriesVolumeInstantHandler queries the index label volumes related to the passed matchers and given time range.
@ -482,10 +469,10 @@ func (q *QuerierAPI) SeriesVolumeInstantHandler(w http.ResponseWriter, r *http.R
Limit: int32(rawReq.Limit),
}
q.seriesVolumeHandler(r.Context(), req, w)
q.seriesVolumeHandler(r.Context(), r, req, w)
}
func (q *QuerierAPI) seriesVolumeHandler(ctx context.Context, req *logproto.VolumeRequest, w http.ResponseWriter) {
func (q *QuerierAPI) seriesVolumeHandler(ctx context.Context, r *http.Request, req *logproto.VolumeRequest, w http.ResponseWriter) {
resp, err := q.querier.SeriesVolume(ctx, req)
if err != nil {
serverutil.WriteError(err, w)
@ -496,9 +483,8 @@ func (q *QuerierAPI) seriesVolumeHandler(ctx context.Context, req *logproto.Volu
resp = &logproto.VolumeResponse{Volumes: []logproto.Volume{}}
}
if marshal.WriteSeriesVolumeResponseJSON(resp, w) != nil {
if err := queryrange.WriteResponse(r, nil, resp, w); err != nil {
serverutil.WriteError(err, w)
return
}
}

@ -35,10 +35,14 @@ import (
marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy"
)
var LokiCodec = &Codec{}
var DefaultCodec = &Codec{}
type Codec struct{}
type RequestProtobufCodec struct {
Codec
}
func (r *LokiRequest) GetEnd() int64 {
return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
}
@ -302,7 +306,7 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer
}
}
func (Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) {
func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) {
header := make(http.Header)
queryTags := getQueryTags(ctx)
if queryTags != "" {
@ -462,6 +466,16 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http
}
}
func (p RequestProtobufCodec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) {
req, err := p.Codec.EncodeRequest(ctx, r)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/vnd.google.protobuf")
return req, nil
}
type Buffer interface {
Bytes() []byte
}
@ -472,6 +486,16 @@ func (Codec) DecodeResponse(_ context.Context, r *http.Response, req queryrangeb
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
}
if r.Header.Get("Content-Type") == ProtobufType {
return decodeResponseProtobuf(r, req)
}
// Default to JSON.
return decodeResponseJSON(r, req)
}
func decodeResponseJSON(r *http.Response, req queryrangebase.Request) (queryrangebase.Response, error) {
var buf []byte
var err error
if buffer, ok := r.Body.(Buffer); ok {
@ -609,7 +633,55 @@ func (Codec) DecodeResponse(_ context.Context, r *http.Response, req queryrangeb
}
}
func (Codec) EncodeResponse(ctx context.Context, res queryrangebase.Response) (*http.Response, error) {
func decodeResponseProtobuf(r *http.Response, req queryrangebase.Request) (queryrangebase.Response, error) {
var buf []byte
var err error
if buffer, ok := r.Body.(Buffer); ok {
buf = buffer.Bytes()
} else {
buf, err = io.ReadAll(r.Body)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}
}
resp := &QueryResponse{}
err = resp.Unmarshal(buf)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}
headers := httpResponseHeadersToPromResponseHeaders(r.Header)
switch req.(type) {
case *LokiSeriesRequest:
return resp.GetSeries().WithHeaders(headers), nil
case *LokiLabelNamesRequest:
return resp.GetLabels().WithHeaders(headers), nil
case *logproto.IndexStatsRequest:
return resp.GetStats().WithHeaders(headers), nil
default:
switch concrete := resp.Response.(type) {
case *QueryResponse_Prom:
return concrete.Prom.WithHeaders(headers), nil
case *QueryResponse_Streams:
return concrete.Streams.WithHeaders(headers), nil
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "unsupported response type, got (%t)", resp.Response)
}
}
}
func (Codec) EncodeResponse(ctx context.Context, req *http.Request, res queryrangebase.Response) (*http.Response, error) {
if req.Header.Get("Accept") == ProtobufType {
return encodeResponseProtobuf(ctx, res)
}
// Default to JSON.
version := loghttp.GetVersion(req.RequestURI)
return encodeResponseJSON(ctx, version, res)
}
func encodeResponseJSON(ctx context.Context, version loghttp.Version, res queryrangebase.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "codec.EncodeResponse")
defer sp.Finish()
var buf bytes.Buffer
@ -630,7 +702,7 @@ func (Codec) EncodeResponse(ctx context.Context, res queryrangebase.Response) (*
Data: logqlmodel.Streams(streams),
Statistics: response.Statistics,
}
if loghttp.Version(response.Version) == loghttp.VersionLegacy {
if version == loghttp.VersionLegacy {
if err := marshal_legacy.WriteQueryResponseJSON(result, &buf); err != nil {
return nil, err
}
@ -673,7 +745,7 @@ func (Codec) EncodeResponse(ctx context.Context, res queryrangebase.Response) (*
resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
"Content-Type": []string{"application/json; charset=UTF-8"},
},
Body: io.NopCloser(&buf),
StatusCode: http.StatusOK,
@ -681,6 +753,42 @@ func (Codec) EncodeResponse(ctx context.Context, res queryrangebase.Response) (*
return &resp, nil
}
func encodeResponseProtobuf(ctx context.Context, res queryrangebase.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "codec.EncodeResponse")
defer sp.Finish()
p := QueryResponse{}
switch response := res.(type) {
case *LokiPromResponse:
p.Response = &QueryResponse_Prom{response}
case *LokiResponse:
p.Response = &QueryResponse_Streams{response}
case *LokiSeriesResponse:
p.Response = &QueryResponse_Series{response}
case *LokiLabelNamesResponse:
p.Response = &QueryResponse_Labels{response}
case *IndexStatsResponse:
p.Response = &QueryResponse_Stats{response}
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format")
}
buf, err := p.Marshal()
if err != nil {
return nil, fmt.Errorf("could not marshal protobuf: %w", err)
}
resp := http.Response{
Header: http.Header{
"Content-Type": []string{ProtobufType},
},
Body: io.NopCloser(bytes.NewBuffer(buf)),
StatusCode: http.StatusOK,
}
return &resp, nil
}
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase.Response, error) {

@ -13,10 +13,15 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/util"
@ -91,7 +96,7 @@ func Test_codec_DecodeRequest(t *testing.T) {
Query: `{foo="bar"}`,
}, false},
{"index_stats", func() (*http.Request, error) {
return LokiCodec.EncodeRequest(context.Background(), &logproto.IndexStatsRequest{
return DefaultCodec.EncodeRequest(context.Background(), &logproto.IndexStatsRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
@ -102,7 +107,7 @@ func Test_codec_DecodeRequest(t *testing.T) {
Matchers: `{job="foo"}`,
}, false},
{"series_volume", func() (*http.Request, error) {
return LokiCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
@ -117,7 +122,7 @@ func Test_codec_DecodeRequest(t *testing.T) {
Step: 0,
}, false},
{"series_volume_default_limit", func() (*http.Request, error) {
return LokiCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
@ -130,7 +135,7 @@ func Test_codec_DecodeRequest(t *testing.T) {
Step: 0,
}, false},
{"series_volume_range", func() (*http.Request, error) {
return LokiCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
@ -145,7 +150,7 @@ func Test_codec_DecodeRequest(t *testing.T) {
Step: 30 * 1e3, // step is expected in ms
}, false},
{"series_volume_range_default_limit", func() (*http.Request, error) {
return LokiCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
return DefaultCodec.EncodeRequest(context.Background(), &logproto.VolumeRequest{
From: model.TimeFromUnixNano(start.UnixNano()),
Through: model.TimeFromUnixNano(end.UnixNano()),
Matchers: `{job="foo"}`,
@ -165,7 +170,7 @@ func Test_codec_DecodeRequest(t *testing.T) {
if err != nil {
t.Fatal(err)
}
got, err := LokiCodec.DecodeRequest(context.TODO(), req, nil)
got, err := DefaultCodec.DecodeRequest(context.TODO(), req, nil)
if (err != nil) != tt.wantErr {
t.Errorf("codec.DecodeRequest() error = %v, wantErr %v", err, tt.wantErr)
return
@ -306,7 +311,7 @@ func Test_codec_DecodeResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := LokiCodec.DecodeResponse(context.TODO(), tt.res, tt.req)
got, err := DefaultCodec.DecodeResponse(context.TODO(), tt.res, tt.req)
if (err != nil) != tt.wantErr {
t.Errorf("codec.DecodeResponse() error = %v, wantErr %v", err, tt.wantErr)
return
@ -316,9 +321,231 @@ func Test_codec_DecodeResponse(t *testing.T) {
}
}
func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
// test fixtures from pkg/util/marshal_test
var queryTests = []struct {
actual parser.Value
expected string
}{
{
logqlmodel.Streams{
logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
},
Labels: `{test="test"}`,
},
},
`{
"status": "success",
"data": {
` + statsResultString + `
"resultType": "streams",
"result": [
{
"stream": {
"test": "test"
},
"values":[
[ "123456789012345", "super line" ]
]
}
]
}
}`,
},
// vector test
{
promql.Vector{
{
T: 1568404331324,
F: 0.013333333333333334,
Metric: []labels.Label{
{
Name: "filename",
Value: `/var/hostlog/apport.log`,
},
{
Name: "job",
Value: "varlogs",
},
},
},
{
T: 1568404331324,
F: 3.45,
Metric: []labels.Label{
{
Name: "filename",
Value: `/var/hostlog/syslog`,
},
{
Name: "job",
Value: "varlogs",
},
},
},
},
`{
"data": {
` + statsResultString + `
"resultType": "vector",
"result": [
{
"metric": {
"filename": "\/var\/hostlog\/apport.log",
"job": "varlogs"
},
"value": [
1568404331.324,
"0.013333333333333334"
]
},
{
"metric": {
"filename": "\/var\/hostlog\/syslog",
"job": "varlogs"
},
"value": [
1568404331.324,
"3.45"
]
}
]
},
"status": "success"
}`,
},
// matrix test
{
promql.Matrix{
{
Floats: []promql.FPoint{
{
T: 1568404331324,
F: 0.013333333333333334,
},
},
Metric: []labels.Label{
{
Name: "filename",
Value: `/var/hostlog/apport.log`,
},
{
Name: "job",
Value: "varlogs",
},
},
},
{
Floats: []promql.FPoint{
{
T: 1568404331324,
F: 3.45,
},
{
T: 1568404331339,
F: 4.45,
},
},
Metric: []labels.Label{
{
Name: "filename",
Value: `/var/hostlog/syslog`,
},
{
Name: "job",
Value: "varlogs",
},
},
},
},
`{
"data": {
` + statsResultString + `
"resultType": "matrix",
"result": [
{
"metric": {
"filename": "\/var\/hostlog\/apport.log",
"job": "varlogs"
},
"values": [
[
1568404331.324,
"0.013333333333333334"
]
]
},
{
"metric": {
"filename": "\/var\/hostlog\/syslog",
"job": "varlogs"
},
"values": [
[
1568404331.324,
"3.45"
],
[
1568404331.339,
"4.45"
]
]
}
]
},
"status": "success"
}`,
},
}
codec := RequestProtobufCodec{}
for i, queryTest := range queryTests {
u := &url.URL{Path: "/loki/api/v1/query_range"}
httpReq := &http.Request{
Method: "GET",
RequestURI: u.String(),
URL: u,
}
req, err := codec.DecodeRequest(context.TODO(), httpReq, nil)
require.NoError(t, err)
// parser.Value -> queryrange.QueryResponse
var b bytes.Buffer
result := logqlmodel.Result{
Data: queryTest.actual,
Statistics: statsResult,
}
err = WriteQueryResponseProtobuf(&logql.LiteralParams{}, result, &b)
require.NoError(t, err)
// queryrange.QueryResponse -> queryrangebase.Response
querierResp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(&b),
Header: http.Header{
"Content-Type": []string{ProtobufType},
},
}
resp, err := codec.DecodeResponse(context.TODO(), querierResp, req)
require.NoError(t, err)
// queryrange.Response -> JSON
httpResp, err := codec.EncodeResponse(context.TODO(), httpReq, resp)
require.NoError(t, err)
body, _ := io.ReadAll(httpResp.Body)
require.JSONEqf(t, queryTest.expected, string(body), "Protobuf Decode Query Test %d failed", i)
}
}
func Test_codec_EncodeRequest(t *testing.T) {
// we only accept LokiRequest.
got, err := LokiCodec.EncodeRequest(context.TODO(), &queryrangebase.PrometheusRequest{})
got, err := DefaultCodec.EncodeRequest(context.TODO(), &queryrangebase.PrometheusRequest{})
require.Error(t, err)
require.Nil(t, got)
@ -333,7 +560,7 @@ func Test_codec_EncodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}
got, err = LokiCodec.EncodeRequest(ctx, toEncode)
got, err = DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/query_range", got.URL.Path)
@ -346,7 +573,7 @@ func Test_codec_EncodeRequest(t *testing.T) {
require.Equal(t, "10000.000000", got.URL.Query().Get("interval"))
// testing a full roundtrip
req, err := LokiCodec.DecodeRequest(context.TODO(), got, nil)
req, err := DefaultCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.Query, req.(*LokiRequest).Query)
require.Equal(t, toEncode.Step, req.(*LokiRequest).Step)
@ -359,7 +586,7 @@ func Test_codec_EncodeRequest(t *testing.T) {
}
func Test_codec_series_EncodeRequest(t *testing.T) {
got, err := LokiCodec.EncodeRequest(context.TODO(), &queryrangebase.PrometheusRequest{})
got, err := DefaultCodec.EncodeRequest(context.TODO(), &queryrangebase.PrometheusRequest{})
require.Error(t, err)
require.Nil(t, got)
@ -370,7 +597,7 @@ func Test_codec_series_EncodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}
got, err = LokiCodec.EncodeRequest(ctx, toEncode)
got, err = DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/series", got.URL.Path)
@ -379,7 +606,7 @@ func Test_codec_series_EncodeRequest(t *testing.T) {
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("match[]"))
// testing a full roundtrip
req, err := LokiCodec.DecodeRequest(context.TODO(), got, nil)
req, err := DefaultCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.Match, req.(*LokiSeriesRequest).Match)
require.Equal(t, toEncode.StartTs, req.(*LokiSeriesRequest).StartTs)
@ -394,7 +621,7 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}
got, err := LokiCodec.EncodeRequest(ctx, toEncode)
got, err := DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels", got.URL.Path)
@ -402,7 +629,7 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
// testing a full roundtrip
req, err := LokiCodec.DecodeRequest(context.TODO(), got, nil)
req, err := DefaultCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.StartTs, req.(*LokiLabelNamesRequest).StartTs)
require.Equal(t, toEncode.EndTs, req.(*LokiLabelNamesRequest).EndTs)
@ -415,7 +642,7 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
EndTs: end,
Query: `{foo="bar"}`,
}
got, err = LokiCodec.EncodeRequest(ctx, toEncode)
got, err = DefaultCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels/__name__/values", got.URL.Path)
@ -424,7 +651,7 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("query"))
// testing a full roundtrip
req, err = LokiCodec.DecodeRequest(context.TODO(), got, nil)
req, err = DefaultCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.StartTs, req.(*LokiLabelNamesRequest).StartTs)
require.Equal(t, toEncode.EndTs, req.(*LokiLabelNamesRequest).EndTs)
@ -438,14 +665,14 @@ func Test_codec_labels_DecodeRequest(t *testing.T) {
require.NoError(t, err)
r := &http.Request{URL: u}
req, err := LokiCodec.DecodeRequest(context.TODO(), r, nil)
req, err := DefaultCodec.DecodeRequest(context.TODO(), r, nil)
require.NoError(t, err)
require.Equal(t, start, req.(*LokiLabelNamesRequest).StartTs)
require.Equal(t, end, req.(*LokiLabelNamesRequest).EndTs)
require.Equal(t, `{foo="bar"}`, req.(*LokiLabelNamesRequest).Query)
require.Equal(t, "/loki/api/v1/labels/__name__/values", req.(*LokiLabelNamesRequest).Path)
got, err := LokiCodec.EncodeRequest(ctx, req)
got, err := DefaultCodec.EncodeRequest(ctx, req)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels/__name__/values", got.URL.Path)
@ -461,7 +688,7 @@ func Test_codec_index_stats_EncodeRequest(t *testing.T) {
Through: through,
Matchers: `{job="foo"}`,
}
got, err := LokiCodec.EncodeRequest(context.Background(), toEncode)
got, err := DefaultCodec.EncodeRequest(context.Background(), toEncode)
require.Nil(t, err)
require.Equal(t, fmt.Sprintf("%d", from.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", through.UnixNano()), got.URL.Query().Get("end"))
@ -477,7 +704,7 @@ func Test_codec_seriesVolume_EncodeRequest(t *testing.T) {
Limit: 20,
Step: 30 * 1e6,
}
got, err := LokiCodec.EncodeRequest(context.Background(), toEncode)
got, err := DefaultCodec.EncodeRequest(context.Background(), toEncode)
require.Nil(t, err)
require.Equal(t, fmt.Sprintf("%d", from.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", through.UnixNano()), got.URL.Query().Get("end"))
@ -489,23 +716,26 @@ func Test_codec_seriesVolume_EncodeRequest(t *testing.T) {
func Test_codec_EncodeResponse(t *testing.T) {
tests := []struct {
name string
path string
res queryrangebase.Response
body string
wantErr bool
}{
{"error", &badResponse{}, "", true},
{"prom", &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: sampleStreams,
{"error", "/loki/api/v1/query_range", &badResponse{}, "", true},
{
"prom", "/loki/api/v1/query_range",
&LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: sampleStreams,
},
},
},
Statistics: statsResult,
}, matrixString, false},
Statistics: statsResult,
}, matrixString, false},
{
"loki v1",
"loki v1", "/loki/api/v1/query_range",
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.FORWARD,
@ -519,7 +749,7 @@ func Test_codec_EncodeResponse(t *testing.T) {
}, streamsString, false,
},
{
"loki legacy",
"loki legacy", "/api/promt/query",
&LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.FORWARD,
@ -533,7 +763,7 @@ func Test_codec_EncodeResponse(t *testing.T) {
}, streamsStringLegacy, false,
},
{
"loki series",
"loki series", "/loki/api/v1/series",
&LokiSeriesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
@ -541,7 +771,7 @@ func Test_codec_EncodeResponse(t *testing.T) {
}, seriesString, false,
},
{
"loki labels",
"loki labels", "/loki/api/v1/labels",
&LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
@ -549,7 +779,7 @@ func Test_codec_EncodeResponse(t *testing.T) {
}, labelsString, false,
},
{
"loki labels legacy",
"loki labels legacy", "/api/prom/label",
&LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionLegacy),
@ -557,7 +787,7 @@ func Test_codec_EncodeResponse(t *testing.T) {
}, labelsLegacyString, false,
},
{
"index stats",
"index stats", "/loki/api/v1/index/stats",
&IndexStatsResponse{
Response: &logproto.IndexStatsResponse{
Streams: 1,
@ -568,7 +798,7 @@ func Test_codec_EncodeResponse(t *testing.T) {
}, indexStatsString, false,
},
{
"series volume",
"series volume", "/loki/api/v1/index/series_volume",
&VolumeResponse{
Response: &logproto.VolumeResponse{
Volumes: []logproto.Volume{
@ -581,7 +811,13 @@ func Test_codec_EncodeResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := LokiCodec.EncodeResponse(context.TODO(), tt.res)
u := &url.URL{Path: tt.path}
req := &http.Request{
Method: "GET",
RequestURI: u.String(),
URL: u,
}
got, err := DefaultCodec.EncodeResponse(context.TODO(), req, tt.res)
if (err != nil) != tt.wantErr {
t.Errorf("codec.EncodeResponse() error = %v, wantErr %v", err, tt.wantErr)
return
@ -1052,7 +1288,7 @@ func Test_codec_MergeResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := LokiCodec.MergeResponse(tt.responses...)
got, err := DefaultCodec.MergeResponse(tt.responses...)
if (err != nil) != tt.wantErr {
t.Errorf("codec.MergeResponse() error = %v, wantErr %v", err, tt.wantErr)
return
@ -1448,7 +1684,13 @@ func (b *buffer) Bytes() []byte {
func Benchmark_CodecDecodeLogs(b *testing.B) {
ctx := context.Background()
resp, err := LokiCodec.EncodeResponse(ctx, &LokiResponse{
u := &url.URL{Path: "/loki/api/v1/query_range"}
req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
}
resp, err := DefaultCodec.EncodeResponse(ctx, req, &LokiResponse{
Status: loghttp.QueryStatusSuccess,
Direction: logproto.BACKWARD,
Version: uint32(loghttp.VersionV1),
@ -1472,12 +1714,12 @@ func Benchmark_CodecDecodeLogs(b *testing.B) {
for n := 0; n < b.N; n++ {
_, _ = reader.Seek(0, io.SeekStart)
result, err := LokiCodec.DecodeResponse(ctx, resp, &LokiRequest{
result, err := DefaultCodec.DecodeResponse(ctx, resp, &LokiRequest{
Limit: 100,
StartTs: start,
EndTs: end,
Direction: logproto.BACKWARD,
Path: "/loki/api/v1/query_range",
Path: u.String(),
})
require.Nil(b, err)
require.NotNil(b, result)
@ -1486,7 +1728,13 @@ func Benchmark_CodecDecodeLogs(b *testing.B) {
func Benchmark_CodecDecodeSamples(b *testing.B) {
ctx := context.Background()
resp, err := LokiCodec.EncodeResponse(ctx, &LokiPromResponse{
u := &url.URL{Path: "/loki/api/v1/query_range"}
req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
}
resp, err := DefaultCodec.EncodeResponse(ctx, req, &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
@ -1506,12 +1754,12 @@ func Benchmark_CodecDecodeSamples(b *testing.B) {
for n := 0; n < b.N; n++ {
_, _ = reader.Seek(0, io.SeekStart)
result, err := LokiCodec.DecodeResponse(ctx, resp, &LokiRequest{
result, err := DefaultCodec.DecodeResponse(ctx, resp, &LokiRequest{
Limit: 100,
StartTs: start,
EndTs: end,
Direction: logproto.BACKWARD,
Path: "/loki/api/v1/query_range",
Path: u.String(),
})
require.Nil(b, err)
require.NotNil(b, result)

@ -1,5 +1,3 @@
// This file includes manual changes done on top of autogenerated queryrange.pb.go from .proto file.
package queryrange
import "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
@ -20,6 +18,11 @@ func (m *LokiLabelNamesResponse) GetHeaders() []*queryrangebase.PrometheusRespon
return nil
}
func (m *LokiLabelNamesResponse) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
m.Headers = h
return m
}
func (m *LokiSeriesResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader {
if m != nil {
return convertPrometheusResponseHeadersToPointers(m.Headers)
@ -27,6 +30,11 @@ func (m *LokiSeriesResponse) GetHeaders() []*queryrangebase.PrometheusResponseHe
return nil
}
func (m *LokiSeriesResponse) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
m.Headers = h
return m
}
func (m *LokiPromResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader {
if m != nil {
return m.Response.GetHeaders()
@ -34,6 +42,11 @@ func (m *LokiPromResponse) GetHeaders() []*queryrangebase.PrometheusResponseHead
return nil
}
func (m *LokiPromResponse) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
m.Response.Headers = convertPrometheusResponseHeadersToPointers(h)
return m
}
func (m *LokiResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader {
if m != nil {
return convertPrometheusResponseHeadersToPointers(m.Headers)
@ -41,6 +54,11 @@ func (m *LokiResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader {
return nil
}
func (m *LokiResponse) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
m.Headers = h
return m
}
func convertPrometheusResponseHeadersToPointers(h []queryrangebase.PrometheusResponseHeader) []*queryrangebase.PrometheusResponseHeader {
if h == nil {
return nil
@ -62,6 +80,11 @@ func (m *IndexStatsResponse) GetHeaders() []*queryrangebase.PrometheusResponseHe
return nil
}
func (m *IndexStatsResponse) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
m.Headers = h
return m
}
// GetHeaders returns the HTTP headers in the response.
func (m *VolumeResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader {
if m != nil {
@ -69,3 +92,8 @@ func (m *VolumeResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader
}
return nil
}
func (m *VolumeResponse) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
m.Headers = h
return m
}

@ -29,7 +29,7 @@ func TestIndexStatsCache(t *testing.T) {
cacheMiddleware, err := NewIndexStatsCacheMiddleware(
log.NewNopLogger(),
WithSplitByLimits(fakeLimits{}, 24*time.Hour),
LokiCodec,
DefaultCodec,
c,
nil,
nil,
@ -170,7 +170,7 @@ func TestIndexStatsCache_RecentData(t *testing.T) {
cacheMiddleware, err := NewIndexStatsCacheMiddleware(
log.NewNopLogger(),
WithSplitByLimits(lim, 24*time.Hour),
LokiCodec,
DefaultCodec,
c,
nil,
nil,

@ -562,7 +562,7 @@ func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error)
if err != nil {
return nil, err
}
return rt.codec.EncodeResponse(ctx, response)
return rt.codec.EncodeResponse(ctx, r, response)
}
func (rt limitedRoundTripper) do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {

@ -75,7 +75,7 @@ func Test_seriesLimiter(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -162,7 +162,7 @@ func Test_MaxQueryParallelism(t *testing.T) {
r, err := http.NewRequestWithContext(ctx, "GET", "/query_range", http.NoBody)
require.Nil(t, err)
_, _ = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
_, _ = NewLimitedRoundTripper(f, DefaultCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
testSchemas,
queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
@ -197,7 +197,7 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
r, err := http.NewRequestWithContext(ctx, "GET", "/query_range", http.NoBody)
require.Nil(t, err)
_, _ = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
_, _ = NewLimitedRoundTripper(f, DefaultCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
testSchemas,
queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
@ -226,7 +226,7 @@ func Test_MaxQueryParallelismDisable(t *testing.T) {
r, err := http.NewRequestWithContext(ctx, "GET", "/query_range", http.NoBody)
require.Nil(t, err)
_, err = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
_, err = NewLimitedRoundTripper(f, DefaultCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
testSchemas,
queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
@ -267,7 +267,7 @@ func Test_MaxQueryLookBack(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -445,7 +445,7 @@ func getFakeStatsHandler(retBytes uint64) (queryrangebase.Handler, *int, error)
fakeRT.setHandler(statsHandler)
return queryrangebase.NewRoundTripperHandler(fakeRT, LokiCodec), count, nil
return queryrangebase.NewRoundTripperHandler(fakeRT, DefaultCodec), count, nil
}
func Test_MaxQuerySize(t *testing.T) {
@ -590,7 +590,7 @@ func Test_MaxQuerySize(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "foo")
req, err := LokiCodec.EncodeRequest(ctx, lokiReq)
req, err := DefaultCodec.EncodeRequest(ctx, lokiReq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -602,7 +602,7 @@ func Test_MaxQuerySize(t *testing.T) {
NewQuerierSizeLimiterMiddleware(schemas, testEngineOpts, util_log.Logger, tc.limits, querierStatsHandler),
}
_, err = queryrangebase.NewRoundTripper(fakeRT, LokiCodec, nil, middlewares...).RoundTrip(req)
_, err = queryrangebase.NewRoundTripper(fakeRT, DefaultCodec, nil, middlewares...).RoundTrip(req)
if tc.shouldErr {
require.Error(t, err)

@ -0,0 +1,226 @@
// Package contains methods to marshal logqmodel types to queryrange Protobuf types.
// Its cousing is util/marshal which converts them to JSON.
package queryrange
import (
"fmt"
"io"
"net/http"
"github.com/prometheus/prometheus/promql"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/util/marshal"
)
const (
JSONType = `application/json; charset=utf-8`
ProtobufType = `application/vnd.google.protobuf`
)
func WriteResponse(req *http.Request, params *logql.LiteralParams, v any, w http.ResponseWriter) error {
if req.Header.Get("Accept") == ProtobufType {
w.Header().Add("Content-Type", ProtobufType)
return WriteResponseProtobuf(req, params, v, w)
}
w.Header().Add("Content-Type", JSONType)
return marshal.WriteResponseJSON(req, v, w)
}
func WriteResponseProtobuf(req *http.Request, params *logql.LiteralParams, v any, w http.ResponseWriter) error {
switch result := v.(type) {
case logqlmodel.Result:
return WriteQueryResponseProtobuf(params, result, w)
case *logproto.LabelResponse:
version := loghttp.GetVersion(req.RequestURI)
return WriteLabelResponseProtobuf(version, *result, w)
case logproto.SeriesResponse:
version := loghttp.GetVersion(req.RequestURI)
return WriteSeriesResponseProtobuf(version, result, w)
case *stats.Stats:
return WriteIndexStatsResponseProtobuf(result, w)
case *logproto.VolumeResponse:
return WriteSeriesVolumeResponseProtobuf(result, w)
}
return fmt.Errorf("unknown response type %T", v)
}
// WriteQueryResponseProtobuf marshals the promql.Value to queryrange QueryResonse and then
// writes it to the provided io.Writer.
func WriteQueryResponseProtobuf(params *logql.LiteralParams, v logqlmodel.Result, w io.Writer) error {
p, err := ResultToResponse(v, params)
if err != nil {
return err
}
buf, err := p.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}
// WriteLabelResponseProtobuf marshals a logproto.LabelResponse to queryrange LokiLabelNamesResponse
// and then writes it to the provided io.Writer.
func WriteLabelResponseProtobuf(version loghttp.Version, l logproto.LabelResponse, w io.Writer) error {
p := QueryResponse{
Response: &QueryResponse_Labels{
Labels: &LokiLabelNamesResponse{
Status: "success",
Data: l.Values,
Version: uint32(version),
// Statistics: statResult,
},
},
}
buf, err := p.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}
// WriteSeriesResponseProtobuf marshals a logproto.SeriesResponse to queryrange LokiSeriesResponse
// and then writes it to the provided io.Writer.
func WriteSeriesResponseProtobuf(version loghttp.Version, r logproto.SeriesResponse, w io.Writer) error {
p := QueryResponse{
Response: &QueryResponse_Series{
Series: &LokiSeriesResponse{
Status: "success",
Version: uint32(version),
Data: r.Series,
// Statistics: statResult,
}},
}
buf, err := p.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}
// WriteIndexStatsResponseProtobuf marshals a gatewaypb.Stats to queryrange IndexStatsResponse
// and then writes it to the provided io.Writer.
func WriteIndexStatsResponseProtobuf(r *stats.Stats, w io.Writer) error {
p := QueryResponse{
Response: &QueryResponse_Stats{
Stats: &IndexStatsResponse{
Response: r,
}},
}
buf, err := p.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}
// WriteIndexStatsResponseProtobuf marshals a logproto.VolumeResponse to queryrange.QueryResponse
// and then writes it to the provided io.Writer.
func WriteSeriesVolumeResponseProtobuf(r *logproto.VolumeResponse, w io.Writer) error {
p := QueryResponse{
Response: &QueryResponse_Volume{
Volume: &VolumeResponse{
Response: r,
}},
}
buf, err := p.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}
// ResultToResponse is the reverse of ResponseToResult in downstreamer.
func ResultToResponse(result logqlmodel.Result, params *logql.LiteralParams) (*QueryResponse, error) {
switch data := result.Data.(type) {
case promql.Vector:
sampleStream, err := queryrangebase.FromValue(data)
if err != nil {
return nil, err
}
return &QueryResponse{
Response: &QueryResponse_Prom{
Prom: &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: "success",
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: sampleStream,
},
},
Statistics: result.Statistics,
},
},
}, nil
case promql.Matrix:
sampleStream, err := queryrangebase.FromValue(data)
if err != nil {
return nil, err
}
return &QueryResponse{
Response: &QueryResponse_Prom{
Prom: &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: "success",
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: sampleStream,
},
},
Statistics: result.Statistics,
},
},
}, nil
case promql.Scalar:
sampleStream, err := queryrangebase.FromValue(data)
if err != nil {
return nil, err
}
return &QueryResponse{
Response: &QueryResponse_Prom{
Prom: &LokiPromResponse{
Response: &queryrangebase.PrometheusResponse{
Status: "success",
Data: queryrangebase.PrometheusData{
ResultType: loghttp.ResultTypeScalar,
Result: sampleStream,
},
},
Statistics: result.Statistics,
},
},
}, nil
case logqlmodel.Streams:
return &QueryResponse{
Response: &QueryResponse_Streams{
// Note: we are omitting the Version here because the Protobuf already defines a schema.
Streams: &LokiResponse{
Direction: params.Direction(),
Limit: params.Limit(),
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: data,
},
Status: "success",
Statistics: result.Statistics,
},
},
}, nil
}
return nil, fmt.Errorf("unsupported data type: %t", result.Data)
}

File diff suppressed because it is too large Load Diff

@ -164,3 +164,14 @@ message VolumeResponse {
(gogoproto.customtype) = "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions.PrometheusResponseHeader"
];
}
message QueryResponse {
oneof response {
LokiSeriesResponse series = 1;
LokiLabelNamesResponse labels = 2;
IndexStatsResponse stats = 3;
LokiPromResponse prom = 4;
LokiResponse streams = 5;
VolumeResponse volume = 6;
}
}

@ -20,7 +20,7 @@ type Codec interface {
// EncodeRequest encodes a Request into an http request.
EncodeRequest(context.Context, Request) (*http.Request, error)
// EncodeResponse encodes a Response into an http response.
EncodeResponse(context.Context, Response) (*http.Response, error)
EncodeResponse(context.Context, *http.Request, Response) (*http.Response, error)
}
// Merger is used by middlewares making multiple requests to merge back all responses into a single one.

@ -51,7 +51,7 @@ func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
_, err := PrometheusCodec.EncodeResponse(context.Background(), res)
_, err := PrometheusCodec.EncodeResponse(context.Background(), nil, res)
require.NoError(b, err)
}
}

@ -277,7 +277,8 @@ func bodyBuffer(res *http.Response) ([]byte, error) {
return buf.Bytes(), nil
}
func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) {
// TODO(karsten): remove prometheusCodec from code base since only MergeResponse is used.
func (prometheusCodec) EncodeResponse(ctx context.Context, _ *http.Request, res Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

@ -109,7 +109,7 @@ func TestResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))),
ContentLength: int64(len(tc.body)),
}
resp2, err := PrometheusCodec.EncodeResponse(context.Background(), resp)
resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp)
require.NoError(t, err)
assert.Equal(t, response, resp2)
})

@ -46,8 +46,14 @@ type Config struct {
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
ShardedQueries bool `yaml:"parallelise_shardable_queries"`
// TODO(karsten): remove used option ForwardHeaders with Loki 3.0 since
// it's a breaking change.
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`
// Required format for querier responses
RequiredQueryResponseFormat string `yaml:"required_query_response_format"`
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
@ -57,6 +63,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
f.StringVar(&cfg.RequiredQueryResponseFormat, "frontend.required-query-response-format", "json", "The downstream querier is required to answer in the accepted format. Can be 'json' or 'protobuf'. Note: Both will still be routed over GRPC.")
cfg.ResultsCacheConfig.RegisterFlags(f)
}
@ -157,7 +166,7 @@ func (q roundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, err
}
return q.codec.EncodeResponse(r.Context(), response)
return q.codec.EncodeResponse(r.Context(), r, response)
}
type roundTripperHandler struct {

@ -19,7 +19,12 @@ func FromResult(res *promql.Result) ([]SampleStream, error) {
// correctly parse the error in parent callers (eg. gRPC response status code extraction).
return nil, errors.Cause(res.Err)
}
switch v := res.Value.(type) {
return FromValue((res.Value))
}
func FromValue(value parser.Value) ([]SampleStream, error) {
switch v := value.(type) {
case promql.Scalar:
return []SampleStream{
{
@ -59,7 +64,7 @@ func FromResult(res *promql.Result) ([]SampleStream, error) {
}
return nil, errors.Errorf("Unexpected value type: [%s]", res.Value.Type())
return nil, errors.Errorf("Unexpected value type: [%s]", value.Type())
}
func mapLabels(ls labels.Labels) []logproto.LabelAdapter {

@ -181,7 +181,7 @@ func Test_astMapper(t *testing.T) {
{Name: "Header", Values: []string{"value"}},
}, resp.GetHeaders())
expected, err := LokiCodec.MergeResponse(lokiResps...)
expected, err := DefaultCodec.MergeResponse(lokiResps...)
sort.Sort(logproto.Streams(expected.(*LokiResponse).Data.Result))
require.Nil(t, err)
require.Equal(t, called, 2)
@ -410,7 +410,7 @@ func Test_InstantSharding(t *testing.T) {
cpyPeriodConf.RowShards = 3
sharding := NewQueryShardMiddleware(log.NewNopLogger(), ShardingConfigs{
cpyPeriodConf,
}, testEngineOpts, LokiCodec, queryrangebase.NewInstrumentMiddlewareMetrics(nil),
}, testEngineOpts, DefaultCodec, queryrangebase.NewInstrumentMiddlewareMetrics(nil),
nilShardingMetrics,
fakeLimits{
maxSeries: math.MaxInt32,
@ -475,7 +475,7 @@ func Test_SeriesShardingHandler(t *testing.T) {
fakeLimits{
maxQueryParallelism: 10,
},
LokiCodec,
DefaultCodec,
)
ctx := user.InjectOrgID(context.Background(), "1")
@ -778,7 +778,7 @@ func TestShardingAcrossConfigs_SeriesSharding(t *testing.T) {
fakeLimits{
maxQueryParallelism: 10,
},
LokiCodec,
DefaultCodec,
)
_, err := mware.Wrap(queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {

@ -129,46 +129,51 @@ func NewTripperware(
}
}
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, LokiCodec, statsCache,
var codec queryrangebase.Codec = DefaultCodec
if cfg.RequiredQueryResponseFormat == "protobuf" {
codec = &RequestProtobufCodec{}
}
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, statsCache,
cacheGenNumLoader, retentionEnabled, metrics)
if err != nil {
return nil, nil, err
}
metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, resultsCache,
metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, resultsCache,
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware)
if err != nil {
return nil, nil, err
}
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, metrics, indexStatsTripperware)
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, codec, metrics, indexStatsTripperware)
if err != nil {
return nil, nil, err
}
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, resultsCache, metrics, indexStatsTripperware)
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, resultsCache, metrics, indexStatsTripperware)
if err != nil {
return nil, nil, err
}
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, LokiCodec, metrics, schema)
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, codec, metrics, schema)
if err != nil {
return nil, nil, err
}
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, metrics, schema)
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, metrics, schema)
if err != nil {
return nil, nil, err
}
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, LokiCodec, metrics, indexStatsTripperware)
instantMetricTripperware, err := NewInstantMetricTripperware(cfg, engineOpts, log, limits, schema, codec, metrics, indexStatsTripperware)
if err != nil {
return nil, nil, err
}
seriesVolumeTripperware, err := NewSeriesVolumeTripperware(cfg, log, limits, schema, LokiCodec, statsCache, cacheGenNumLoader, retentionEnabled, metrics)
seriesVolumeTripperware, err := NewSeriesVolumeTripperware(cfg, log, limits, schema, codec, statsCache, cacheGenNumLoader, retentionEnabled, metrics)
if err != nil {
return nil, nil, err
}
@ -816,7 +821,7 @@ func volumeRangeTripperware(codec queryrangebase.Codec, nextTW queryrangebase.Tr
return nil, err
}
return codec.EncodeResponse(r.Context(), response)
return codec.EncodeResponse(r.Context(), r, response)
})
}
}

@ -190,7 +190,7 @@ func TestMetricsTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -248,7 +248,7 @@ func TestMetricsTripperware(t *testing.T) {
// 2 queries
require.Equal(t, 2, *count)
require.NoError(t, err)
lokiResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
lokiResponse, err := DefaultCodec.DecodeResponse(ctx, resp, lreq)
require.NoError(t, err)
// testing cache
@ -258,7 +258,7 @@ func TestMetricsTripperware(t *testing.T) {
// 0 queries result are cached.
require.Equal(t, 0, *count)
require.NoError(t, err)
lokiCacheResponse, err := LokiCodec.DecodeResponse(ctx, cacheResp, lreq)
lokiCacheResponse, err := DefaultCodec.DecodeResponse(ctx, cacheResp, lreq)
require.NoError(t, err)
require.Equal(t, lokiResponse.(*LokiPromResponse).Response, lokiCacheResponse.(*LokiPromResponse).Response)
@ -293,7 +293,7 @@ func TestLogFilterTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -309,7 +309,7 @@ func TestLogFilterTripperware(t *testing.T) {
// set the query length back to normal
lreq.StartTs = testTime.Add(-6 * time.Hour)
req, err = LokiCodec.EncodeRequest(ctx, lreq)
req, err = DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
// testing retry
@ -370,7 +370,7 @@ func TestInstantQueryTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -402,7 +402,7 @@ func TestInstantQueryTripperware(t *testing.T) {
require.Equal(t, 1, *count)
require.NoError(t, err)
lokiResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
lokiResponse, err := DefaultCodec.DecodeResponse(ctx, resp, lreq)
require.NoError(t, err)
require.IsType(t, &LokiPromResponse{}, lokiResponse)
}
@ -425,7 +425,7 @@ func TestSeriesTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -438,7 +438,7 @@ func TestSeriesTripperware(t *testing.T) {
// 2 queries
require.Equal(t, 2, *count)
require.NoError(t, err)
lokiSeriesResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
lokiSeriesResponse, err := DefaultCodec.DecodeResponse(ctx, resp, lreq)
res, ok := lokiSeriesResponse.(*LokiSeriesResponse)
require.Equal(t, true, ok)
@ -465,7 +465,7 @@ func TestLabelsTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -486,7 +486,7 @@ func TestLabelsTripperware(t *testing.T) {
// verify 2 calls have been made to downstream.
require.Equal(t, 2, handler.count)
require.NoError(t, err)
lokiLabelsResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
lokiLabelsResponse, err := DefaultCodec.DecodeResponse(ctx, resp, lreq)
res, ok := lokiLabelsResponse.(*LokiLabelNamesResponse)
require.Equal(t, true, ok)
require.Equal(t, []string{"foo", "bar", "blop", "blip"}, res.Data)
@ -511,7 +511,7 @@ func TestIndexStatsTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -541,7 +541,7 @@ func TestIndexStatsTripperware(t *testing.T) {
require.Equal(t, 0, *count)
// Test the response is the expected
indexStatsResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
indexStatsResponse, err := DefaultCodec.DecodeResponse(ctx, resp, lreq)
require.NoError(t, err)
res, ok := indexStatsResponse.(*IndexStatsResponse)
require.Equal(t, true, ok)
@ -572,7 +572,7 @@ func TestSeriesVolumeTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -588,7 +588,7 @@ func TestSeriesVolumeTripperware(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, *count) // 2 queries from splitting
volumeResp, err := LokiCodec.DecodeResponse(ctx, resp, nil)
volumeResp, err := DefaultCodec.DecodeResponse(ctx, resp, nil)
require.NoError(t, err)
expected := queryrangebase.PrometheusData{
@ -646,7 +646,7 @@ func TestSeriesVolumeTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -668,7 +668,7 @@ func TestSeriesVolumeTripperware(t *testing.T) {
*/
require.Equal(t, 6, *count) // 6 queries from splitting into step buckets
volumeResp, err := LokiCodec.DecodeResponse(ctx, resp, nil)
volumeResp, err := DefaultCodec.DecodeResponse(ctx, resp, nil)
require.NoError(t, err)
barBazExpectedSamples := []logproto.LegacySample{}
@ -883,7 +883,7 @@ func TestLogNoFilter(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -918,7 +918,7 @@ func TestRegexpParamsSupport(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
// fudge a regexp params
@ -1014,7 +1014,7 @@ func TestTripperware_EntriesLimit(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -1064,7 +1064,7 @@ func TestTripperware_RequiredLabels(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -1176,7 +1176,7 @@ func TestTripperware_RequiredNumberLabels(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, lreq)
req, err := DefaultCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -1344,7 +1344,7 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) {
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "1")
req, err := LokiCodec.EncodeRequest(ctx, tc.request)
req, err := DefaultCodec.EncodeRequest(ctx, tc.request)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -1360,7 +1360,7 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) {
resp, err := tpw(rt).RoundTrip(req)
require.NoError(t, err)
lokiResponse, err := LokiCodec.DecodeResponse(ctx, resp, tc.request)
lokiResponse, err := DefaultCodec.DecodeResponse(ctx, resp, tc.request)
require.NoError(t, err)
require.Equal(t, tc.expectedSplitStats, lokiResponse.(*LokiPromResponse).Statistics.Summary.Splits)

@ -635,7 +635,7 @@ func Test_splitByInterval_Do(t *testing.T) {
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
DefaultCodec,
splitByTime,
nilMetrics,
).Wrap(next)
@ -808,7 +808,7 @@ func Test_series_splitByInterval_Do(t *testing.T) {
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
DefaultCodec,
splitByTime,
nilMetrics,
).Wrap(next)
@ -857,7 +857,7 @@ func Test_seriesvolume_splitByInterval_Do(t *testing.T) {
return SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
DefaultCodec,
splitByTime,
nilMetrics,
).Wrap(next)
@ -1018,7 +1018,7 @@ func Test_ExitEarly(t *testing.T) {
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
DefaultCodec,
splitByTime,
nilMetrics,
).Wrap(next)
@ -1100,7 +1100,7 @@ func Test_DoesntDeadlock(t *testing.T) {
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
DefaultCodec,
splitByTime,
nilMetrics,
).Wrap(next)

@ -138,11 +138,11 @@ func StatsCollectorMiddleware() queryrangebase.Middleware {
}
queryType = queryTypeMetric
case *LokiSeriesResponse:
responseStats = &r.Statistics
responseStats = &r.Statistics // TODO: this is always nil. See codec.DecodeResponse
totalEntries = len(r.Data)
queryType = queryTypeSeries
case *LokiLabelNamesResponse:
responseStats = &r.Statistics
responseStats = &r.Statistics // TODO: this is always nil. See codec.DecodeResponse
totalEntries = len(r.Data)
queryType = queryTypeLabel
default:

@ -17,7 +17,8 @@ import (
func Test_InitQuerierService(t *testing.T) {
var mockQueryHandlers = map[string]http.Handler{
"/loki/api/v1/query": http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
_, err := res.Write([]byte("test handler"))
_, err := res.Write([]byte(`{"handler": "test"}`))
res.Header().Del("Content-Type")
require.NoError(t, err)
}),
}
@ -67,7 +68,7 @@ func Test_InitQuerierService(t *testing.T) {
request := httptest.NewRequest("GET", "/loki/api/v1/query", nil)
externalRouter.ServeHTTP(recorder, request)
assert.Equal(t, 200, recorder.Code)
assert.Equal(t, "test handler", recorder.Body.String())
assert.Equal(t, `{"handler": "test"}`, recorder.Body.String())
// Tail endpoints always external
recorder = httptest.NewRecorder()

@ -5,6 +5,7 @@ package marshal
import (
"fmt"
"io"
"net/http"
"github.com/gorilla/websocket"
jsoniter "github.com/json-iterator/go"
@ -14,8 +15,35 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy"
)
func WriteResponseJSON(r *http.Request, v any, w http.ResponseWriter) error {
switch result := v.(type) {
case logqlmodel.Result:
version := loghttp.GetVersion(r.RequestURI)
if version == loghttp.VersionV1 {
return WriteQueryResponseJSON(result, w)
}
return marshal_legacy.WriteQueryResponseJSON(result, w)
case *logproto.LabelResponse:
version := loghttp.GetVersion(r.RequestURI)
if version == loghttp.VersionV1 {
return WriteLabelResponseJSON(*result, w)
}
return marshal_legacy.WriteLabelResponseJSON(*result, w)
case logproto.SeriesResponse:
return WriteSeriesResponseJSON(result, w)
case *stats.Stats:
return WriteIndexStatsResponseJSON(result, w)
case *logproto.VolumeResponse:
return WriteSeriesVolumeResponseJSON(result, w)
}
return fmt.Errorf("unknown response type %T", v)
}
// WriteQueryResponseJSON marshals the promql.Value to v1 loghttp JSON and then
// writes it to the provided io.Writer.
func WriteQueryResponseJSON(v logqlmodel.Result, w io.Writer) error {
@ -99,7 +127,7 @@ func WriteIndexStatsResponseJSON(r *stats.Stats, w io.Writer) error {
return s.Flush()
}
// WriteIndexStatsResponseJSON marshals a gatewaypb.Stats to JSON and then
// WriteSeriesVolumeResponseJSON marshals a logproto.VolumeResponse to JSON and then
// writes it to the provided io.Writer.
func WriteSeriesVolumeResponseJSON(r *logproto.VolumeResponse, w io.Writer) error {
s := jsoniter.ConfigFastest.BorrowStream(w)

@ -23,11 +23,14 @@ func NewPrepopulateMiddleware() middleware.Interface {
})
}
// ResponseJSONMiddleware sets the Content-Type header to JSON if it's not set.
func ResponseJSONMiddleware() middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
next.ServeHTTP(w, req)
if w.Header().Get("Content-Type") == "" {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
}
})
})
}

@ -102,6 +102,7 @@ query_range:
cache:
memcached_client:
addresses: memcached:11211
required_query_response_format: protobuf
schema_config:
configs:
- from: "2020-07-30"

@ -1,6 +1,6 @@
FROM golang:1.20.4
ENV CGO_ENABLED=0
RUN go install github.com/go-delve/delve/cmd/dlv@v1.9.0
RUN go install github.com/go-delve/delve/cmd/dlv@v1.20.2
FROM alpine:3.16.4

Loading…
Cancel
Save