Moves request parsing into the loghttp package (#1171)

* refactor query requests parsing

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* add tests and documentation

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/1120/head^2
Cyril Tovena 6 years ago committed by GitHub
parent de040ca040
commit 1abe8841ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      pkg/loghttp/labels.go
  2. 65
      pkg/loghttp/labels_test.go
  3. 123
      pkg/loghttp/params.go
  4. 123
      pkg/loghttp/params_test.go
  5. 94
      pkg/loghttp/query.go
  6. 112
      pkg/loghttp/query_test.go
  7. 33
      pkg/loghttp/tail.go
  8. 53
      pkg/loghttp/tail_test.go
  9. 273
      pkg/querier/http.go
  10. 90
      pkg/querier/http_test.go

@ -2,8 +2,12 @@ package loghttp
import (
"bytes"
"net/http"
"sort"
"strconv"
"github.com/gorilla/mux"
"github.com/grafana/loki/pkg/logproto"
)
// LabelResponse represents the http json response to a label query
@ -37,3 +41,20 @@ func (l LabelSet) String() string {
b.WriteByte('}')
return b.String()
}
// ParseLabelQuery parses a LabelRequest request from an http request.
func ParseLabelQuery(r *http.Request) (*logproto.LabelRequest, error) {
name, ok := mux.Vars(r)["name"]
req := &logproto.LabelRequest{
Values: ok,
Name: name,
}
start, end, err := bounds(r)
if err != nil {
return nil, err
}
req.Start = &start
req.End = &end
return req, nil
}

@ -0,0 +1,65 @@
package loghttp
import (
"net/http"
"reflect"
"testing"
"time"
"github.com/gorilla/mux"
"github.com/grafana/loki/pkg/logproto"
)
func TestParseLabelQuery(t *testing.T) {
t.Parallel()
tests := []struct {
name string
r *http.Request
want *logproto.LabelRequest
wantErr bool
}{
{"bad start", &http.Request{URL: mustParseURL(`?&start=t`)}, nil, true},
{"bad end", &http.Request{URL: mustParseURL(`?&start=2016-06-10T21:42:24.760738998Z&end=h`)}, nil, true},
{"good no name in the pah",
requestWithVar(&http.Request{
URL: mustParseURL(`?start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z`),
}, "name", "test"), &logproto.LabelRequest{
Name: "test",
Values: true,
Start: timePtr(time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC)),
End: timePtr(time.Date(2017, 07, 10, 21, 42, 24, 760738998, time.UTC)),
}, false},
{"good with name",
&http.Request{
URL: mustParseURL(`?start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z`),
}, &logproto.LabelRequest{
Name: "",
Values: false,
Start: timePtr(time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC)),
End: timePtr(time.Date(2017, 07, 10, 21, 42, 24, 760738998, time.UTC)),
}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseLabelQuery(tt.r)
if (err != nil) != tt.wantErr {
t.Errorf("ParseLabelQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseLabelQuery() = %v, want %v", got, tt.want)
}
})
}
}
func timePtr(t time.Time) *time.Time {
return &t
}
func requestWithVar(req *http.Request, name, value string) *http.Request {
return mux.SetURLVars(req, map[string]string{
name: value,
})
}

@ -0,0 +1,123 @@
package loghttp
import (
"fmt"
"math"
"net/http"
"strconv"
"strings"
"time"
"github.com/grafana/loki/pkg/logproto"
)
const (
defaultQueryLimit = 100
defaultSince = 1 * time.Hour
defaultStep = 1 // 1 seconds
)
func limit(r *http.Request) (uint32, error) {
l, err := parseInt(r.URL.Query().Get("limit"), defaultQueryLimit)
if err != nil {
return 0, err
}
return uint32(l), nil
}
func query(r *http.Request) string {
return r.URL.Query().Get("query")
}
func ts(r *http.Request) (time.Time, error) {
return parseTimestamp(r.URL.Query().Get("time"), time.Now())
}
func direction(r *http.Request) (logproto.Direction, error) {
return parseDirection(r.URL.Query().Get("direction"), logproto.BACKWARD)
}
func bounds(r *http.Request) (time.Time, time.Time, error) {
now := time.Now()
start, err := parseTimestamp(r.URL.Query().Get("start"), now.Add(-defaultSince))
if err != nil {
return time.Time{}, time.Time{}, err
}
end, err := parseTimestamp(r.URL.Query().Get("end"), now)
if err != nil {
return time.Time{}, time.Time{}, err
}
return start, end, nil
}
func step(r *http.Request, start, end time.Time) (time.Duration, error) {
s, err := parseInt(r.URL.Query().Get("step"), defaultQueryRangeStep(start, end))
if err != nil {
return 0, err
}
return time.Duration(s) * time.Second, nil
}
// defaultQueryRangeStep returns the default step used in the query range API,
// which is dinamically calculated based on the time range
func defaultQueryRangeStep(start time.Time, end time.Time) int {
return int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1))
}
func tailDelay(r *http.Request) (uint32, error) {
l, err := parseInt(r.URL.Query().Get("delay_for"), 0)
if err != nil {
return 0, err
}
return uint32(l), nil
}
// parseInt parses an int from a string
// if the value is empty it returns a default value passed as second parameter
func parseInt(value string, def int) (int, error) {
if value == "" {
return def, nil
}
return strconv.Atoi(value)
}
// parseUnixNano parses a ns unix timestamp from a string
// if the value is empty it returns a default value passed as second parameter
func parseTimestamp(value string, def time.Time) (time.Time, error) {
if value == "" {
return def, nil
}
if strings.Contains(value, ".") {
if t, err := strconv.ParseFloat(value, 64); err == nil {
s, ns := math.Modf(t)
ns = math.Round(ns*1000) / 1000
return time.Unix(int64(s), int64(ns*float64(time.Second))), nil
}
}
nanos, err := strconv.ParseInt(value, 10, 64)
if err != nil {
if ts, err := time.Parse(time.RFC3339Nano, value); err == nil {
return ts, nil
}
return time.Time{}, err
}
if len(value) <= 10 {
return time.Unix(nanos, 0), nil
}
return time.Unix(0, nanos), nil
}
// parseDirection parses a logproto.Direction from a string
// if the value is empty it returns a default value passed as second parameter
func parseDirection(value string, def logproto.Direction) (logproto.Direction, error) {
if value == "" {
return def, nil
}
d, ok := logproto.Direction_value[strings.ToUpper(value)]
if !ok {
return logproto.FORWARD, fmt.Errorf("invalid direction '%s'", value)
}
return logproto.Direction(d), nil
}

@ -0,0 +1,123 @@
package loghttp
import (
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestHttp_defaultQueryRangeStep(t *testing.T) {
t.Parallel()
tests := map[string]struct {
start time.Time
end time.Time
expected int
}{
"should not be lower then 1s": {
start: time.Unix(60, 0),
end: time.Unix(60, 0),
expected: 1,
},
"should return 1s if input time range is 5m": {
start: time.Unix(60, 0),
end: time.Unix(360, 0),
expected: 1,
},
"should return 14s if input time range is 1h": {
start: time.Unix(60, 0),
end: time.Unix(3660, 0),
expected: 14,
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
assert.Equal(t, testData.expected, defaultQueryRangeStep(testData.start, testData.end))
})
}
}
func TestHttp_ParseRangeQuery_Step(t *testing.T) {
t.Parallel()
tests := map[string]struct {
reqPath string
expected *RangeQuery
}{
"should set the default step based on the input time range if the step parameter is not provided": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000",
expected: &RangeQuery{
Query: "{}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 14 * time.Second,
Limit: 100,
Direction: logproto.BACKWARD,
},
},
"should use the input step parameter if provided": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5",
expected: &RangeQuery{
Query: "{}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: 5 * time.Second,
Limit: 100,
Direction: logproto.BACKWARD,
},
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
req := httptest.NewRequest("GET", testData.reqPath, nil)
actual, err := ParseRangeQuery(req)
require.NoError(t, err)
assert.Equal(t, testData.expected, actual)
})
}
}
func Test_parseTimestamp(t *testing.T) {
now := time.Now()
tests := []struct {
name string
value string
def time.Time
want time.Time
wantErr bool
}{
{"default", "", now, now, false},
{"unix timestamp", "1571332130", now, time.Unix(1571332130, 0), false},
{"unix nano timestamp", "1571334162051000000", now, time.Unix(0, 1571334162051000000), false},
{"unix timestamp with subseconds", "1571332130.934", now, time.Unix(1571332130, 934*1e6), false},
{"RFC3339 format", "2002-10-02T15:00:00Z", now, time.Date(2002, 10, 02, 15, 0, 0, 0, time.UTC), false},
{"RFC3339nano format", "2009-11-10T23:00:00.000000001Z", now, time.Date(2009, 11, 10, 23, 0, 0, 1, time.UTC), false},
{"invalid", "we", now, time.Time{}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseTimestamp(tt.value, tt.def)
if (err != nil) != tt.wantErr {
t.Errorf("parseTimestamp() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseTimestamp() = %v, want %v", got, tt.want)
}
})
}
}

@ -2,13 +2,22 @@ package loghttp
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
)
var (
errEndBeforeStart = errors.New("end timestamp must not be before start time")
errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
)
// QueryStatus holds the status of a query
type QueryStatus string
@ -149,3 +158,88 @@ type Vector []model.Sample
// Matrix is a slice of SampleStreams
type Matrix []model.SampleStream
// InstantQuery defines a log instant query.
type InstantQuery struct {
Query string
Ts time.Time
Limit uint32
Direction logproto.Direction
}
// ParseInstantQuery parses an InstantQuery request from an http request.
func ParseInstantQuery(r *http.Request) (*InstantQuery, error) {
var err error
request := &InstantQuery{
Query: query(r),
}
request.Limit, err = limit(r)
if err != nil {
return nil, err
}
request.Ts, err = ts(r)
if err != nil {
return nil, err
}
request.Direction, err = direction(r)
if err != nil {
return nil, err
}
return request, nil
}
// RangeQuery defines a log range query.
type RangeQuery struct {
Start time.Time
End time.Time
Step time.Duration
Query string
Direction logproto.Direction
Limit uint32
}
// ParseRangeQuery parses a RangeQuery request from an http request.
func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
var result RangeQuery
var err error
result.Query = query(r)
result.Start, result.End, err = bounds(r)
if err != nil {
return nil, err
}
if result.End.Before(result.Start) {
return nil, errEndBeforeStart
}
result.Limit, err = limit(r)
if err != nil {
return nil, err
}
result.Direction, err = direction(r)
if err != nil {
return nil, err
}
result.Step, err = step(r, result.Start, result.End)
if err != nil {
return nil, err
}
if result.Step <= 0 {
return nil, errNegativeStep
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (result.End.Sub(result.Start) / result.Step) > 11000 {
return nil, errStepTooSmall
}
return &result, nil
}

@ -0,0 +1,112 @@
package loghttp
import (
"net/http"
"net/url"
"reflect"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
)
func TestParseRangeQuery(t *testing.T) {
t.Parallel()
tests := []struct {
name string
r *http.Request
want *RangeQuery
wantErr bool
}{
{"bad start", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=t`)}, nil, true},
{"bad end", &http.Request{URL: mustParseURL(`?query={foo="bar"}&end=t`)}, nil, true},
{"end before start", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2015-06-10T21:42:24.760738998Z`)}, nil, true},
{"bad limit", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=h`)}, nil, true},
{"bad direction",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=fw`),
}, nil, true},
{"bad step",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=FORWARD&step=h`),
}, nil, true},
{"negative step",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=-1`),
}, nil, true},
{"too small step",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1`),
}, nil, true},
{"good",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z&limit=1000&direction=BACKWARD&step=3600`),
}, &RangeQuery{
Step: time.Hour,
Query: `{foo="bar"}`,
Direction: logproto.BACKWARD,
Start: time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC),
End: time.Date(2017, 07, 10, 21, 42, 24, 760738998, time.UTC),
Limit: 1000,
}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseRangeQuery(tt.r)
if (err != nil) != tt.wantErr {
t.Errorf("ParseRangeQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseRangeQuery() = %v, want %v", got, tt.want)
}
})
}
}
func TestParseInstantQuery(t *testing.T) {
tests := []struct {
name string
r *http.Request
want *InstantQuery
wantErr bool
}{
{"bad time", &http.Request{URL: mustParseURL(`?query={foo="bar"}&time=t`)}, nil, true},
{"bad limit", &http.Request{URL: mustParseURL(`?query={foo="bar"}&time=2016-06-10T21:42:24.760738998Z&limit=h`)}, nil, true},
{"bad direction",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&time=2016-06-10T21:42:24.760738998Z&limit=100&direction=fw`),
}, nil, true},
{"good",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&time=2017-06-10T21:42:24.760738998Z&limit=1000&direction=BACKWARD`),
}, &InstantQuery{
Query: `{foo="bar"}`,
Direction: logproto.BACKWARD,
Ts: time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC),
Limit: 1000,
}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseInstantQuery(tt.r)
if (err != nil) != tt.wantErr {
t.Errorf("ParseInstantQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseInstantQuery() = %v, want %v", got, tt.want)
}
})
}
}
func mustParseURL(u string) *url.URL {
url, err := url.Parse(u)
if err != nil {
panic(err)
}
return url
}

@ -3,8 +3,15 @@ package loghttp
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"github.com/grafana/loki/pkg/logproto"
)
const (
maxDelayForInTailing = 5
)
// TailResponse represents the http json response to a tail query
@ -53,3 +60,29 @@ func (s *DroppedStream) UnmarshalJSON(data []byte) error {
return nil
}
// ParseTailQuery parses a TailRequest request from an http request.
func ParseTailQuery(r *http.Request) (*logproto.TailRequest, error) {
var err error
req := logproto.TailRequest{
Query: query(r),
}
req.Limit, err = limit(r)
if err != nil {
return nil, err
}
req.Start, _, err = bounds(r)
if err != nil {
return nil, err
}
req.DelayFor, err = tailDelay(r)
if err != nil {
return nil, err
}
if req.DelayFor > maxDelayForInTailing {
return nil, fmt.Errorf("delay_for can't be greater than %d", maxDelayForInTailing)
}
return &req, nil
}

@ -0,0 +1,53 @@
package loghttp
import (
"net/http"
"reflect"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
)
func TestParseTailQuery(t *testing.T) {
t.Parallel()
tests := []struct {
name string
r *http.Request
want *logproto.TailRequest
wantErr bool
}{
{"bad time", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=t`)}, nil, true},
{"bad limit", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&limit=h`)}, nil, true},
{"bad delay",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&time=2016-06-10T21:42:24.760738998Z&limit=100&delay_for=fw`),
}, nil, true},
{"too much delay",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&time=2016-06-10T21:42:24.760738998Z&limit=100&delay_for=20`),
}, nil, true},
{"good",
&http.Request{
URL: mustParseURL(`?query={foo="bar"}&start=2017-06-10T21:42:24.760738998Z&limit=1000&delay_for=5`),
}, &logproto.TailRequest{
Query: `{foo="bar"}`,
DelayFor: 5,
Start: time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC),
Limit: 1000,
}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseTailQuery(tt.r)
if (err != nil) != tt.wantErr {
t.Errorf("ParseTailQuery() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseTailQuery() = %v, want %v", got, tt.want)
}
})
}
}

@ -2,239 +2,44 @@ package querier
import (
"context"
"fmt"
"math"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/grafana/loki/pkg/loghttp"
loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
marshal_legacy "github.com/grafana/loki/pkg/logql/marshal/legacy"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
)
const (
defaultQueryLimit = 100
defaultSince = 1 * time.Hour
wsPingPeriod = 1 * time.Second
maxDelayForInTailing = 5
wsPingPeriod = 1 * time.Second
)
// nolint
func intParam(values url.Values, name string, def int) (int, error) {
value := values.Get(name)
if value == "" {
return def, nil
}
return strconv.Atoi(value)
}
func unixNanoTimeParam(values url.Values, name string, def time.Time) (time.Time, error) {
value := values.Get(name)
if value == "" {
return def, nil
}
if strings.Contains(value, ".") {
if t, err := strconv.ParseFloat(value, 64); err == nil {
s, ns := math.Modf(t)
ns = math.Round(ns*1000) / 1000
return time.Unix(int64(s), int64(ns*float64(time.Second))), nil
}
}
nanos, err := strconv.ParseInt(value, 10, 64)
if err != nil {
if ts, err := time.Parse(time.RFC3339Nano, value); err == nil {
return ts, nil
}
return time.Time{}, err
}
if len(value) <= 10 {
return time.Unix(nanos, 0), nil
}
return time.Unix(0, nanos), nil
}
// nolint
func directionParam(values url.Values, name string, def logproto.Direction) (logproto.Direction, error) {
value := values.Get(name)
if value == "" {
return def, nil
}
d, ok := logproto.Direction_value[strings.ToUpper(value)]
if !ok {
return logproto.FORWARD, fmt.Errorf("invalid direction '%s'", value)
}
return logproto.Direction(d), nil
}
// defaultQueryRangeStep returns the default step used in the query range API,
// which is dinamically calculated based on the time range
func defaultQueryRangeStep(start time.Time, end time.Time) int {
return int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1))
}
func httpRequestToInstantQueryRequest(httpRequest *http.Request) (*instantQueryRequest, error) {
params := httpRequest.URL.Query()
queryRequest := instantQueryRequest{
query: params.Get("query"),
}
limit, err := intParam(params, "limit", defaultQueryLimit)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
queryRequest.limit = uint32(limit)
queryRequest.ts, err = unixNanoTimeParam(params, "time", time.Now())
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
queryRequest.direction, err = directionParam(params, "direction", logproto.BACKWARD)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return &queryRequest, nil
}
func httpRequestToRangeQueryRequest(httpRequest *http.Request) (*rangeQueryRequest, error) {
var err error
params := httpRequest.URL.Query()
queryRequest := rangeQueryRequest{
query: params.Get("query"),
}
queryRequest.limit, queryRequest.start, queryRequest.end, err = httpRequestToLookback(httpRequest)
if err != nil {
return nil, err
}
step, err := intParam(params, "step", defaultQueryRangeStep(queryRequest.start, queryRequest.end))
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
queryRequest.step = time.Duration(step) * time.Second
queryRequest.direction, err = directionParam(params, "direction", logproto.BACKWARD)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return &queryRequest, nil
}
func httpRequestToTailRequest(httpRequest *http.Request) (*logproto.TailRequest, error) {
params := httpRequest.URL.Query()
tailRequest := logproto.TailRequest{
Query: params.Get("query"),
}
var err error
tailRequest.Limit, tailRequest.Start, _, err = httpRequestToLookback(httpRequest)
if err != nil {
return nil, err
}
// delay_for is used to allow server to let slow loggers catch up.
// Entries would be accumulated in a heap until they become older than now()-<delay_for>
delayFor, err := intParam(params, "delay_for", 0)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
tailRequest.DelayFor = uint32(delayFor)
return &tailRequest, nil
}
func httpRequestToLookback(httpRequest *http.Request) (limit uint32, start, end time.Time, err error) {
params := httpRequest.URL.Query()
now := time.Now()
lim, err := intParam(params, "limit", defaultQueryLimit)
if err != nil {
return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
limit = uint32(lim)
start, err = unixNanoTimeParam(params, "start", now.Add(-defaultSince))
if err != nil {
return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
end, err = unixNanoTimeParam(params, "end", now)
if err != nil {
return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return
}
// parseRegexQuery parses regex and query querystring from httpRequest and returns the combined LogQL query.
// This is used only to keep regexp query string support until it gets fully deprecated.
func parseRegexQuery(httpRequest *http.Request) (string, error) {
params := httpRequest.URL.Query()
query := params.Get("query")
regexp := params.Get("regexp")
if regexp != "" {
expr, err := logql.ParseLogSelector(query)
if err != nil {
return "", err
}
query = logql.NewFilterExpr(expr, labels.MatchRegexp, regexp).String()
}
return query, nil
}
type QueryResponse struct {
ResultType promql.ValueType `json:"resultType"`
Result promql.Value `json:"result"`
}
type rangeQueryRequest struct {
query string
start, end time.Time
step time.Duration
limit uint32
direction logproto.Direction
}
type instantQueryRequest struct {
query string
ts time.Time
limit uint32
direction logproto.Direction
}
// RangeQueryHandler is a http.HandlerFunc for range queries.
func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()
request, err := httpRequestToRangeQueryRequest(r)
request, err := loghttp.ParseRangeQuery(r)
if err != nil {
server.WriteError(w, err)
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
query := q.engine.NewRangeQuery(q, request.query, request.start, request.end, request.step, request.direction, request.limit)
query := q.engine.NewRangeQuery(q, request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@ -253,12 +58,12 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()
request, err := httpRequestToInstantQueryRequest(r)
request, err := loghttp.ParseInstantQuery(r)
if err != nil {
server.WriteError(w, err)
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
query := q.engine.NewInstantQuery(q, request.query, request.ts, request.direction, request.limit)
query := q.engine.NewInstantQuery(q, request.Query, request.Ts, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@ -277,18 +82,18 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()
request, err := httpRequestToRangeQueryRequest(r)
request, err := loghttp.ParseRangeQuery(r)
if err != nil {
server.WriteError(w, err)
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
request.query, err = parseRegexQuery(r)
request.Query, err = parseRegexQuery(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
query := q.engine.NewRangeQuery(q, request.query, request.start, request.end, request.step, request.direction, request.limit)
query := q.engine.NewRangeQuery(q, request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@ -303,27 +108,11 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
// LabelHandler is a http.HandlerFunc for handling label queries.
func (q *Querier) LabelHandler(w http.ResponseWriter, r *http.Request) {
name, ok := mux.Vars(r)["name"]
params := r.URL.Query()
now := time.Now()
req := &logproto.LabelRequest{
Values: ok,
Name: name,
}
end, err := unixNanoTimeParam(params, "end", now)
if err != nil {
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
req.End = &end
start, err := unixNanoTimeParam(params, "start", end.Add(-6*time.Hour))
req, err := loghttp.ParseLabelQuery(r)
if err != nil {
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
req.Start = &start
resp, err := q.Label(r.Context(), req)
if err != nil {
@ -348,24 +137,18 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
CheckOrigin: func(r *http.Request) bool { return true },
}
tailRequestPtr, err := httpRequestToTailRequest(r)
req, err := loghttp.ParseTailQuery(r)
if err != nil {
server.WriteError(w, err)
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
tailRequestPtr.Query, err = parseRegexQuery(r)
req.Query, err = parseRegexQuery(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if tailRequestPtr.DelayFor > maxDelayForInTailing {
server.WriteError(w, fmt.Errorf("delay_for can't be greater than %d", maxDelayForInTailing))
level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err)
@ -378,11 +161,7 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
}
}()
// response from httpRequestToQueryRequest is a ptr, if we keep passing pointer down the call then it would stay on
// heap until connection to websocket stays open
tailRequest := *tailRequestPtr
tailer, err := q.Tail(r.Context(), &tailRequest)
tailer, err := q.Tail(r.Context(), req)
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
@ -437,3 +216,19 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
}
}
}
// parseRegexQuery parses regex and query querystring from httpRequest and returns the combined LogQL query.
// This is used only to keep regexp query string support until it gets fully deprecated.
func parseRegexQuery(httpRequest *http.Request) (string, error) {
params := httpRequest.URL.Query()
query := params.Get("query")
regexp := params.Get("regexp")
if regexp != "" {
expr, err := logql.ParseLogSelector(query)
if err != nil {
return "", err
}
query = logql.NewFilterExpr(expr, labels.MatchRegexp, regexp).String()
}
return query, nil
}

@ -1,90 +0,0 @@
package querier
import (
"net/http/httptest"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestHttp_defaultQueryRangeStep(t *testing.T) {
t.Parallel()
tests := map[string]struct {
start time.Time
end time.Time
expected int
}{
"should not be lower then 1s": {
start: time.Unix(60, 0),
end: time.Unix(60, 0),
expected: 1,
},
"should return 1s if input time range is 5m": {
start: time.Unix(60, 0),
end: time.Unix(360, 0),
expected: 1,
},
"should return 14s if input time range is 1h": {
start: time.Unix(60, 0),
end: time.Unix(3660, 0),
expected: 14,
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
assert.Equal(t, testData.expected, defaultQueryRangeStep(testData.start, testData.end))
})
}
}
func TestHttp_httpRequestToRangeQueryRequest(t *testing.T) {
t.Parallel()
tests := map[string]struct {
reqPath string
expected *rangeQueryRequest
}{
"should set the default step based on the input time range if the step parameter is not provided": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000",
expected: &rangeQueryRequest{
query: "{}",
start: time.Unix(0, 0),
end: time.Unix(3600, 0),
step: 14 * time.Second,
limit: 100,
direction: logproto.BACKWARD,
},
},
"should use the input step parameter if provided": {
reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5",
expected: &rangeQueryRequest{
query: "{}",
start: time.Unix(0, 0),
end: time.Unix(3600, 0),
step: 5 * time.Second,
limit: 100,
direction: logproto.BACKWARD,
},
},
}
for testName, testData := range tests {
testData := testData
t.Run(testName, func(t *testing.T) {
req := httptest.NewRequest("GET", testData.reqPath, nil)
actual, err := httpRequestToRangeQueryRequest(req)
require.NoError(t, err)
assert.Equal(t, testData.expected, actual)
})
}
}
Loading…
Cancel
Save