|
|
|
@ -3,8 +3,6 @@ package queryrange |
|
|
|
|
import ( |
|
|
|
|
"flag" |
|
|
|
|
"net/http" |
|
|
|
|
"net/url" |
|
|
|
|
"strconv" |
|
|
|
|
"strings" |
|
|
|
|
|
|
|
|
|
"github.com/cortexproject/cortex/pkg/chunk/cache" |
|
|
|
@ -17,6 +15,7 @@ import ( |
|
|
|
|
"github.com/weaveworks/common/httpgrpc" |
|
|
|
|
"github.com/weaveworks/common/user" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/pkg/loghttp" |
|
|
|
|
"github.com/grafana/loki/pkg/logql" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
@ -55,60 +54,87 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet |
|
|
|
|
return func(next http.RoundTripper) http.RoundTripper { |
|
|
|
|
metricRT := metricsTripperware(next) |
|
|
|
|
logFilterRT := logFilterTripperware(next) |
|
|
|
|
return frontend.RoundTripFunc(func(req *http.Request) (*http.Response, error) { |
|
|
|
|
if !strings.HasSuffix(req.URL.Path, "/query_range") && !strings.HasSuffix(req.URL.Path, "/prom/query") { |
|
|
|
|
return next.RoundTrip(req) |
|
|
|
|
} |
|
|
|
|
params := req.URL.Query() |
|
|
|
|
query := params.Get("query") |
|
|
|
|
expr, err := logql.ParseExpr(query) |
|
|
|
|
if err != nil { |
|
|
|
|
// weavework server uses httpgrpc errors for status code.
|
|
|
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if _, ok := expr.(logql.SampleExpr); ok { |
|
|
|
|
return metricRT.RoundTrip(req) |
|
|
|
|
} |
|
|
|
|
if logSelector, ok := expr.(logql.LogSelectorExpr); ok { |
|
|
|
|
if err := validateLimits(req, params, limits); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// backport the old regexp params into the query params
|
|
|
|
|
regexp := params.Get("regexp") |
|
|
|
|
if regexp != "" { |
|
|
|
|
logSelector = logql.NewFilterExpr(logSelector, labels.MatchRegexp, regexp) |
|
|
|
|
params.Set("query", logSelector.String()) |
|
|
|
|
req.URL.RawQuery = params.Encode() |
|
|
|
|
} |
|
|
|
|
filter, err := logSelector.Filter() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) |
|
|
|
|
} |
|
|
|
|
if filter != nil { |
|
|
|
|
return logFilterRT.RoundTrip(req) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return next.RoundTrip(req) |
|
|
|
|
}) |
|
|
|
|
return newRoundTripper(next, logFilterRT, metricRT, limits) |
|
|
|
|
}, cache, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// validates log entries limits
|
|
|
|
|
func validateLimits(req *http.Request, params url.Values, limits Limits) error { |
|
|
|
|
userID, err := user.ExtractOrgID(req.Context()) |
|
|
|
|
type roundTripper struct { |
|
|
|
|
next, log, metric http.RoundTripper |
|
|
|
|
|
|
|
|
|
limits Limits |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// newRoundTripper creates a new queryrange roundtripper
|
|
|
|
|
func newRoundTripper(next, log, metric http.RoundTripper, limits Limits) roundTripper { |
|
|
|
|
return roundTripper{ |
|
|
|
|
log: log, |
|
|
|
|
limits: limits, |
|
|
|
|
metric: metric, |
|
|
|
|
next: next, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { |
|
|
|
|
if !strings.HasSuffix(req.URL.Path, "/query_range") && !strings.HasSuffix(req.URL.Path, "/prom/query") { |
|
|
|
|
return r.next.RoundTrip(req) |
|
|
|
|
} |
|
|
|
|
err := req.ParseForm() |
|
|
|
|
if err != nil { |
|
|
|
|
return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) |
|
|
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) |
|
|
|
|
} |
|
|
|
|
rangeQuery, err := loghttp.ParseRangeQuery(req) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) |
|
|
|
|
} |
|
|
|
|
expr, err := logql.ParseExpr(rangeQuery.Query) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) |
|
|
|
|
} |
|
|
|
|
switch e := expr.(type) { |
|
|
|
|
case logql.SampleExpr: |
|
|
|
|
return r.metric.RoundTrip(req) |
|
|
|
|
case logql.LogSelectorExpr: |
|
|
|
|
filter, err := transformRegexQuery(req, e).Filter() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) |
|
|
|
|
} |
|
|
|
|
if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
if filter == nil { |
|
|
|
|
return r.next.RoundTrip(req) |
|
|
|
|
} |
|
|
|
|
return r.log.RoundTrip(req) |
|
|
|
|
|
|
|
|
|
reqLimit, err := strconv.Atoi(params.Get("limit")) |
|
|
|
|
default: |
|
|
|
|
return r.next.RoundTrip(req) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// transformRegexQuery backport the old regexp params into the v1 query format
|
|
|
|
|
func transformRegexQuery(req *http.Request, expr logql.LogSelectorExpr) logql.LogSelectorExpr { |
|
|
|
|
regexp := req.Form.Get("regexp") |
|
|
|
|
if regexp != "" { |
|
|
|
|
expr = logql.NewFilterExpr(expr, labels.MatchRegexp, regexp) |
|
|
|
|
params := req.URL.Query() |
|
|
|
|
params.Set("query", expr.String()) |
|
|
|
|
req.URL.RawQuery = params.Encode() |
|
|
|
|
// force the form and query to be parsed again.
|
|
|
|
|
req.Form = nil |
|
|
|
|
req.PostForm = nil |
|
|
|
|
} |
|
|
|
|
return expr |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// validates log entries limits
|
|
|
|
|
func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error { |
|
|
|
|
userID, err := user.ExtractOrgID(req.Context()) |
|
|
|
|
if err != nil { |
|
|
|
|
return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
maxEntriesLimit := limits.MaxEntriesLimitPerQuery(userID) |
|
|
|
|
if reqLimit > maxEntriesLimit && maxEntriesLimit != 0 { |
|
|
|
|
if int(reqLimit) > maxEntriesLimit && maxEntriesLimit != 0 { |
|
|
|
|
return httpgrpc.Errorf(http.StatusBadRequest, |
|
|
|
|
"max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", reqLimit, maxEntriesLimit) |
|
|
|
|
} |
|
|
|
|