exposes loki codec (#3758)

pull/3762/head
Owen Diehl 5 years ago committed by GitHub
parent 15d417efe1
commit 1f6453fb81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      pkg/querier/queryrange/codec.go
  2. 24
      pkg/querier/queryrange/codec_test.go
  3. 6
      pkg/querier/queryrange/limits_test.go
  4. 2
      pkg/querier/queryrange/querysharding_test.go
  5. 8
      pkg/querier/queryrange/roundtrip.go
  6. 26
      pkg/querier/queryrange/roundtrip_test.go
  7. 8
      pkg/querier/queryrange/split_by_interval_test.go

@ -30,9 +30,9 @@ import (
marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy"
)
var lokiCodec = &codec{}
var LokiCodec = &Codec{}
type codec struct{}
type Codec struct{}
func (r *LokiRequest) GetEnd() int64 {
return r.EndTs.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
@ -150,7 +150,7 @@ func (r *LokiLabelNamesRequest) LogToSpan(sp opentracing.Span) {
func (*LokiLabelNamesRequest) GetCachingOptions() (res queryrange.CachingOptions) { return }
func (codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) {
func (Codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) {
if err := r.ParseForm(); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
@ -199,7 +199,7 @@ func (codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Reque
}
func (codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
switch request := r.(type) {
case *LokiRequest:
params := url.Values{
@ -271,7 +271,7 @@ func (codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
}
}
func (codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) {
func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) {
if r.StatusCode/100 != 2 {
body, _ := ioutil.ReadAll(r.Body)
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
@ -358,7 +358,7 @@ func (codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
}
func (codec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) {
func (Codec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "codec.EncodeResponse")
defer sp.Finish()
var buf bytes.Buffer
@ -424,7 +424,7 @@ func (codec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
func (codec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
return nil, errors.New("merging responses requires at least one response")
}

@ -84,7 +84,7 @@ func Test_codec_DecodeRequest(t *testing.T) {
if err != nil {
t.Fatal(err)
}
got, err := lokiCodec.DecodeRequest(context.TODO(), req)
got, err := LokiCodec.DecodeRequest(context.TODO(), req)
if (err != nil) != tt.wantErr {
t.Errorf("codec.DecodeRequest() error = %v, wantErr %v", err, tt.wantErr)
return
@ -171,7 +171,7 @@ func Test_codec_DecodeResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := lokiCodec.DecodeResponse(context.TODO(), tt.res, tt.req)
got, err := LokiCodec.DecodeResponse(context.TODO(), tt.res, tt.req)
if (err != nil) != tt.wantErr {
t.Errorf("codec.DecodeResponse() error = %v, wantErr %v", err, tt.wantErr)
return
@ -183,7 +183,7 @@ func Test_codec_DecodeResponse(t *testing.T) {
func Test_codec_EncodeRequest(t *testing.T) {
// we only accept LokiRequest.
got, err := lokiCodec.EncodeRequest(context.TODO(), &queryrange.PrometheusRequest{})
got, err := LokiCodec.EncodeRequest(context.TODO(), &queryrange.PrometheusRequest{})
require.Error(t, err)
require.Nil(t, got)
@ -197,7 +197,7 @@ func Test_codec_EncodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}
got, err = lokiCodec.EncodeRequest(ctx, toEncode)
got, err = LokiCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/query_range", got.URL.Path)
@ -209,7 +209,7 @@ func Test_codec_EncodeRequest(t *testing.T) {
require.Equal(t, "86400.000000", got.URL.Query().Get("step"))
// testing a full roundtrip
req, err := lokiCodec.DecodeRequest(context.TODO(), got)
req, err := LokiCodec.DecodeRequest(context.TODO(), got)
require.NoError(t, err)
require.Equal(t, toEncode.Query, req.(*LokiRequest).Query)
require.Equal(t, toEncode.Step, req.(*LokiRequest).Step)
@ -221,7 +221,7 @@ func Test_codec_EncodeRequest(t *testing.T) {
}
func Test_codec_series_EncodeRequest(t *testing.T) {
got, err := lokiCodec.EncodeRequest(context.TODO(), &queryrange.PrometheusRequest{})
got, err := LokiCodec.EncodeRequest(context.TODO(), &queryrange.PrometheusRequest{})
require.Error(t, err)
require.Nil(t, got)
@ -232,7 +232,7 @@ func Test_codec_series_EncodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}
got, err = lokiCodec.EncodeRequest(ctx, toEncode)
got, err = LokiCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/series", got.URL.Path)
@ -241,7 +241,7 @@ func Test_codec_series_EncodeRequest(t *testing.T) {
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("match[]"))
// testing a full roundtrip
req, err := lokiCodec.DecodeRequest(context.TODO(), got)
req, err := LokiCodec.DecodeRequest(context.TODO(), got)
require.NoError(t, err)
require.Equal(t, toEncode.Match, req.(*LokiSeriesRequest).Match)
require.Equal(t, toEncode.StartTs, req.(*LokiSeriesRequest).StartTs)
@ -256,7 +256,7 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}
got, err := lokiCodec.EncodeRequest(ctx, toEncode)
got, err := LokiCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/labels", got.URL.Path)
@ -264,7 +264,7 @@ func Test_codec_labels_EncodeRequest(t *testing.T) {
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
// testing a full roundtrip
req, err := lokiCodec.DecodeRequest(context.TODO(), got)
req, err := LokiCodec.DecodeRequest(context.TODO(), got)
require.NoError(t, err)
require.Equal(t, toEncode.StartTs, req.(*LokiLabelNamesRequest).StartTs)
require.Equal(t, toEncode.EndTs, req.(*LokiLabelNamesRequest).EndTs)
@ -344,7 +344,7 @@ func Test_codec_EncodeResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := lokiCodec.EncodeResponse(context.TODO(), tt.res)
got, err := LokiCodec.EncodeResponse(context.TODO(), tt.res)
if (err != nil) != tt.wantErr {
t.Errorf("codec.EncodeResponse() error = %v, wantErr %v", err, tt.wantErr)
return
@ -810,7 +810,7 @@ func Test_codec_MergeResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := lokiCodec.MergeResponse(tt.responses...)
got, err := LokiCodec.MergeResponse(tt.responses...)
if (err != nil) != tt.wantErr {
t.Errorf("codec.MergeResponse() error = %v, wantErr %v", err, tt.wantErr)
return

@ -72,7 +72,7 @@ func Test_seriesLimiter(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -159,7 +159,7 @@ func Test_MaxQueryParallelism(t *testing.T) {
r, err := http.NewRequestWithContext(ctx, "GET", "/query_range", http.NoBody)
require.Nil(t, err)
_, _ = NewLimitedRoundTripper(f, lokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
_, _ = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return queryrange.HandlerFunc(func(c context.Context, r queryrange.Request) (queryrange.Response, error) {
var wg sync.WaitGroup
@ -193,7 +193,7 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
r, err := http.NewRequestWithContext(ctx, "GET", "/query_range", http.NoBody)
require.Nil(t, err)
_, _ = NewLimitedRoundTripper(f, lokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
_, _ = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return queryrange.HandlerFunc(func(c context.Context, r queryrange.Request) (queryrange.Response, error) {
for i := 0; i < 10; i++ {

@ -153,7 +153,7 @@ func Test_astMapper(t *testing.T) {
resp, err := mware.Do(context.Background(), defaultReq().WithQuery(`{food="bar"}`))
require.Nil(t, err)
expected, err := lokiCodec.MergeResponse(lokiResps...)
expected, err := LokiCodec.MergeResponse(lokiResps...)
sort.Sort(logproto.Streams(expected.(*LokiResponse).Data.Result))
require.Nil(t, err)
require.Equal(t, called, 2)

@ -53,7 +53,7 @@ func NewTripperware(
shardingMetrics := logql.NewShardingMetrics(registerer)
splitByMetrics := NewSplitByMetrics(registerer)
metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, schema, minShardingLookback, lokiCodec,
metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, schema, minShardingLookback, LokiCodec,
PrometheusExtractor{}, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics, registerer)
if err != nil {
return nil, nil, err
@ -61,17 +61,17 @@ func NewTripperware(
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, minShardingLookback, lokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics)
logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, minShardingLookback, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics)
if err != nil {
return nil, nil, err
}
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, lokiCodec, instrumentMetrics, retryMetrics, splitByMetrics)
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, LokiCodec, instrumentMetrics, retryMetrics, splitByMetrics)
if err != nil {
return nil, nil, err
}
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, lokiCodec, instrumentMetrics, retryMetrics, splitByMetrics)
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, instrumentMetrics, retryMetrics, splitByMetrics)
if err != nil {
return nil, nil, err
}

@ -109,7 +109,7 @@ func TestMetricsTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -138,7 +138,7 @@ func TestMetricsTripperware(t *testing.T) {
// 2 queries
require.Equal(t, 2, *count)
require.NoError(t, err)
lokiResponse, err := lokiCodec.DecodeResponse(ctx, resp, lreq)
lokiResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
require.NoError(t, err)
// testing cache
@ -148,7 +148,7 @@ func TestMetricsTripperware(t *testing.T) {
// 0 queries result are cached.
require.Equal(t, 0, *count)
require.NoError(t, err)
lokiCacheResponse, err := lokiCodec.DecodeResponse(ctx, cacheResp, lreq)
lokiCacheResponse, err := LokiCodec.DecodeResponse(ctx, cacheResp, lreq)
require.NoError(t, err)
require.Equal(t, lokiResponse.(*LokiPromResponse).Response, lokiCacheResponse.(*LokiPromResponse).Response)
@ -175,7 +175,7 @@ func TestLogFilterTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -191,7 +191,7 @@ func TestLogFilterTripperware(t *testing.T) {
// set the query length back to normal
lreq.StartTs = testTime.Add(-6 * time.Hour)
req, err = lokiCodec.EncodeRequest(ctx, lreq)
req, err = LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
// testing retry
@ -221,7 +221,7 @@ func TestSeriesTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -234,7 +234,7 @@ func TestSeriesTripperware(t *testing.T) {
// 2 queries
require.Equal(t, 2, *count)
require.NoError(t, err)
lokiSeriesResponse, err := lokiCodec.DecodeResponse(ctx, resp, lreq)
lokiSeriesResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
res, ok := lokiSeriesResponse.(*LokiSeriesResponse)
require.Equal(t, true, ok)
@ -262,7 +262,7 @@ func TestLabelsTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -283,7 +283,7 @@ func TestLabelsTripperware(t *testing.T) {
// verify 2 calls have been made to downstream.
require.Equal(t, 2, handler.count)
require.NoError(t, err)
lokiLabelsResponse, err := lokiCodec.DecodeResponse(ctx, resp, lreq)
lokiLabelsResponse, err := LokiCodec.DecodeResponse(ctx, resp, lreq)
res, ok := lokiLabelsResponse.(*LokiLabelNamesResponse)
require.Equal(t, true, ok)
require.Equal(t, []string{"foo", "bar", "blop", "blip"}, res.Data)
@ -311,7 +311,7 @@ func TestLogNoRegex(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -369,7 +369,7 @@ func TestRegexpParamsSupport(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
// fudge a regexp params
@ -448,7 +448,7 @@ func TestEntriesLimitsTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
@ -479,7 +479,7 @@ func TestEntriesLimitWithZeroTripperware(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
req, err := LokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)

@ -295,7 +295,7 @@ func Test_splitByInterval_Do(t *testing.T) {
l := WithDefaultLimits(fakeLimits{}, queryrange.Config{SplitQueriesByInterval: time.Hour})
split := SplitByIntervalMiddleware(
l,
lokiCodec,
LokiCodec,
splitByTime,
nilMetrics,
).Wrap(next)
@ -463,7 +463,7 @@ func Test_series_splitByInterval_Do(t *testing.T) {
l := WithDefaultLimits(fakeLimits{}, queryrange.Config{SplitQueriesByInterval: time.Hour})
split := SplitByIntervalMiddleware(
l,
lokiCodec,
LokiCodec,
splitByTime,
nilMetrics,
).Wrap(next)
@ -544,7 +544,7 @@ func Test_ExitEarly(t *testing.T) {
l := WithDefaultLimits(fakeLimits{}, queryrange.Config{SplitQueriesByInterval: time.Hour})
split := SplitByIntervalMiddleware(
l,
lokiCodec,
LokiCodec,
splitByTime,
nilMetrics,
).Wrap(next)
@ -623,7 +623,7 @@ func Test_DoesntDeadlock(t *testing.T) {
}, queryrange.Config{SplitQueriesByInterval: time.Hour})
split := SplitByIntervalMiddleware(
l,
lokiCodec,
LokiCodec,
splitByTime,
nilMetrics,
).Wrap(next)

Loading…
Cancel
Save