Fixes instant queries in the frontend. (#4091)

* Fixes instant queries in the frontend.

This is a set of fixes for instant queries:
- correctly propagate statistics.
- correctly convert sampleStream to vector.
- correctly marshal to json.
- correctly encode the time for the request.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/4094/head
Cyril Tovena 4 years ago committed by GitHub
parent c14172dc63
commit 1efeace644
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      pkg/querier/queryrange/codec.go
  2. 27
      pkg/querier/queryrange/downstreamer.go
  3. 90
      pkg/querier/queryrange/prometheus.go
  4. 159
      pkg/querier/queryrange/prometheus_test.go
  5. 1
      pkg/querier/queryrange/querysharding.go
  6. 34
      pkg/querier/queryrange/querysharding_test.go
  7. 7
      pkg/querier/queryrange/stats.go

@ -328,6 +328,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
"query": []string{request.Query},
"direction": []string{request.Direction.String()},
"limit": []string{fmt.Sprintf("%d", request.Limit)},
"time": []string{fmt.Sprintf("%d", request.TimeTs.UnixNano())},
}
if len(request.Shards) > 0 {
params["shards"] = request.Shards

@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
)
@ -162,6 +163,25 @@ func sampleStreamToMatrix(streams []queryrange.SampleStream) parser.Value {
return xs
}
func sampleStreamToVector(streams []queryrange.SampleStream) parser.Value {
xs := make(promql.Vector, 0, len(streams))
for _, stream := range streams {
x := promql.Sample{}
x.Metric = make(labels.Labels, 0, len(stream.Labels))
for _, l := range stream.Labels {
x.Metric = append(x.Metric, labels.Label(l))
}
x.Point = promql.Point{
T: stream.Samples[0].TimestampMs,
V: stream.Samples[0].Value,
}
xs = append(xs, x)
}
return xs
}
func ResponseToResult(resp queryrange.Response) (logqlmodel.Result, error) {
switch r := resp.(type) {
case *LokiResponse:
@ -184,7 +204,12 @@ func ResponseToResult(resp queryrange.Response) (logqlmodel.Result, error) {
if r.Response.Error != "" {
return logqlmodel.Result{}, fmt.Errorf("%s: %s", r.Response.ErrorType, r.Response.Error)
}
if r.Response.Data.ResultType == loghttp.ResultTypeVector {
return logqlmodel.Result{
Statistics: r.Statistics,
Data: sampleStreamToVector(r.Response.Data.Result),
}, nil
}
return logqlmodel.Result{
Statistics: r.Statistics,
Data: sampleStreamToMatrix(r.Response.Data.Result),

@ -10,7 +10,9 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logqlmodel/stats"
)
@ -41,12 +43,78 @@ func (PrometheusExtractor) ResponseWithoutHeaders(resp queryrange.Response) quer
// encode encodes a Prometheus response and injects Loki stats.
func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) {
sp := opentracing.SpanFromContext(ctx)
var (
b []byte
err error
)
if p.Response.Data.ResultType == loghttp.ResultTypeVector {
b, err = p.marshalVector()
} else {
b, err = p.marshalMatrix()
}
if err != nil {
return nil, err
}
if sp != nil {
sp.LogFields(otlog.Int("bytes", len(b)))
}
resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
}
return &resp, nil
}
func (p *LokiPromResponse) marshalVector() ([]byte, error) {
vec := make(loghttp.Vector, len(p.Response.Data.Result))
for i, v := range p.Response.Data.Result {
lbs := make(model.LabelSet, len(v.Labels))
for _, v := range v.Labels {
lbs[model.LabelName(v.Name)] = model.LabelValue(v.Value)
}
vec[i] = model.Sample{
Metric: model.Metric(lbs),
Timestamp: model.Time(v.Samples[0].TimestampMs),
Value: model.SampleValue(v.Samples[0].Value),
}
}
return jsonStd.Marshal(struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result loghttp.Vector `json:"result"`
Statistics stats.Result `json:"stats,omitempty"`
} `json:"data,omitempty"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
}{
Error: p.Response.Error,
Data: struct {
ResultType string `json:"resultType"`
Result loghttp.Vector `json:"result"`
Statistics stats.Result `json:"stats,omitempty"`
}{
ResultType: loghttp.ResultTypeVector,
Result: vec,
Statistics: p.Statistics,
},
ErrorType: p.Response.ErrorType,
Status: p.Response.Status,
})
}
func (p *LokiPromResponse) marshalMatrix() ([]byte, error) {
// embed response and add statistics.
b, err := jsonStd.Marshal(struct {
return jsonStd.Marshal(struct {
Status string `json:"status"`
Data struct {
queryrange.PrometheusData
Statistics stats.Result `json:"stats"`
Statistics stats.Result `json:"stats,omitempty"`
} `json:"data,omitempty"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
@ -54,7 +122,7 @@ func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) {
Error: p.Response.Error,
Data: struct {
queryrange.PrometheusData
Statistics stats.Result `json:"stats"`
Statistics stats.Result `json:"stats,omitempty"`
}{
PrometheusData: p.Response.Data,
Statistics: p.Statistics,
@ -62,20 +130,4 @@ func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) {
ErrorType: p.Response.ErrorType,
Status: p.Response.Status,
})
if err != nil {
return nil, err
}
if sp != nil {
sp.LogFields(otlog.Int("bytes", len(b)))
}
resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
}
return &resp, nil
}

@ -0,0 +1,159 @@
package queryrange
import (
"context"
"io"
"testing"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/loghttp"
)
var emptyStats = `"stats": {
"summary": {
"bytesProcessedPerSecond": 0,
"linesProcessedPerSecond": 0,
"totalBytesProcessed": 0,
"totalLinesProcessed": 0,
"execTime": 0.0
},
"store": {
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunksDownloadTime": 0,
"headChunkBytes": 0,
"headChunkLines": 0,
"decompressedBytes": 0,
"decompressedLines": 0,
"compressedBytes": 0,
"totalDuplicates": 0
},
"ingester": {
"totalReached": 0,
"totalChunksMatched": 0,
"totalBatches": 0,
"totalLinesSent": 0,
"headChunkBytes": 0,
"headChunkLines": 0,
"decompressedBytes": 0,
"decompressedLines": 0,
"compressedBytes": 0,
"totalDuplicates": 0
}
}`
func Test_encodePromResponse(t *testing.T) {
for _, tt := range []struct {
name string
resp *LokiPromResponse
want string
}{
{
"matrix",
&LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: string(queryrange.StatusSuccess),
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: "bar"},
},
Samples: []cortexpb.Sample{
{Value: 1, TimestampMs: 1000},
{Value: 1, TimestampMs: 2000},
},
},
{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: "buzz"},
},
Samples: []cortexpb.Sample{
{Value: 4, TimestampMs: 1000},
{Value: 5, TimestampMs: 2000},
},
},
},
},
},
},
`{
"status": "success",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {"foo": "bar"},
"values": [[1, "1"],[2, "1"]]
},
{
"metric": {"foo": "buzz"},
"values": [[1, "4"],[2, "5"]]
}
],
` + emptyStats + `
}
}`,
},
{
"vector",
&LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: string(queryrange.StatusSuccess),
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: "bar"},
},
Samples: []cortexpb.Sample{
{Value: 1, TimestampMs: 1000},
},
},
{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: "buzz"},
},
Samples: []cortexpb.Sample{
{Value: 4, TimestampMs: 1000},
},
},
},
},
},
},
`{
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {"foo": "bar"},
"value": [1, "1"]
},
{
"metric": {"foo": "buzz"},
"value": [1, "4"]
}
],
` + emptyStats + `
}
}`,
},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
r, err := tt.resp.encode(context.Background())
require.NoError(t, err)
b, err := io.ReadAll(r.Body)
require.NoError(t, err)
got := string(b)
require.JSONEq(t, tt.want, got)
})
}
}

@ -169,6 +169,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
}, nil
case parser.ValueTypeVector:
return &LokiPromResponse{
Statistics: res.Statistics,
Response: &queryrange.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrange.PrometheusData{

@ -283,26 +283,24 @@ func Test_InstantSharding(t *testing.T) {
require.Equal(t, 3, called, "expected 3 calls but got {}", called)
require.Len(t, response.(*LokiPromResponse).Response.Data.Result, 3)
require.ElementsMatch(t, []string{"0_of_3", "1_of_3", "2_of_3"}, shards)
require.Equal(t, &LokiPromResponse{Response: &queryrange.PrometheusResponse{
Status: "success",
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
require.Equal(t, queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
},
}}, response)
}, response.(*LokiPromResponse).Response.Data)
require.Equal(t, loghttp.QueryStatusSuccess, response.(*LokiPromResponse).Response.Status)
}
func Test_SeriesShardingHandler(t *testing.T) {

@ -110,10 +110,11 @@ func StatsCollectorMiddleware() queryrange.Middleware {
data.recorded = true
data.statistics = statistics
data.result = res
data.params, err = paramsFromRequest(req)
if err != nil {
return nil, err
p, errReq := paramsFromRequest(req)
if errReq != nil {
return nil, errReq
}
data.params = p
}
return resp, err
})

Loading…
Cancel
Save