Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/handler.go

171 lines
5.4 KiB

package engine
import (
"context"
"errors"
"fmt"
"net/http"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
utillog "github.com/grafana/loki/v3/pkg/util/log"
util_validation "github.com/grafana/loki/v3/pkg/util/validation"
)
// Handler builds the [http.Handler] that serves queries using engine. Queries
// that are not supported are answered with an error.
func Handler(cfg Config, logger log.Logger, engine *Engine, limits querier_limits.Limits) http.Handler {
	// The shared constructor works on the queryExecutor interface, so make
	// the conversion from the concrete engine explicit here.
	var exec queryExecutor = engine
	return executorHandler(cfg, logger, exec, limits)
}
// queryExecutor is an interface implemented by [Engine] for mocking in tests.
// The query handler depends on this interface rather than on [Engine]
// directly.
type queryExecutor interface {
// Execute takes the query parameters and returns the query result or an
// error.
Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error)
}
// executorHandler wires exec, together with the configuration, logger and
// limits, into a queryHandler and exposes it as an [http.Handler] through
// queryrange.NewSerializeHTTPHandler using queryrange.DefaultCodec.
func executorHandler(cfg Config, logger log.Logger, exec queryExecutor, limits querier_limits.Limits) http.Handler {
	var handler queryHandler
	handler.cfg = cfg
	handler.logger = logger
	handler.exec = exec
	handler.limits = limits

	return queryrange.NewSerializeHTTPHandler(&handler, queryrange.DefaultCodec)
}
// queryHandler serves query requests: it validates them against the tenant
// entry limits and the configured time range, then hands them to exec.
type queryHandler struct {
// cfg supplies the time range of queries this handler accepts.
cfg Config
// logger is the base logger; request handlers attach the request context
// to it before logging.
logger log.Logger
// exec runs queries that passed validation.
exec queryExecutor
// limits provides the per-tenant maximum entries limit.
limits querier_limits.Limits
}
// Compile-time check that *queryHandler satisfies queryrangebase.Handler.
var _ queryrangebase.Handler = (*queryHandler)(nil)
// Do implements queryrangebase.Handler. It dispatches on the concrete request
// type: *queryrange.LokiRequest and *queryrange.LokiInstantRequest are
// executed and their result converted into a response; any other request type
// is rejected with an HTTP 501 (Not Implemented) error.
func (h *queryHandler) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
	// TODO(rfratto): Can this be removed by making [querier.Handler] and
	// [querier.QuerierAPI] more generic?
	var (
		result logqlmodel.Result
		err    error
	)

	switch typed := req.(type) {
	case *queryrange.LokiRequest:
		result, err = h.doRequest(ctx, typed)
	case *queryrange.LokiInstantRequest:
		result, err = h.doInstantRequest(ctx, typed)
	default:
		return nil, httpgrpc.Errorf(http.StatusNotImplemented, "unsupported query type %T", req)
	}
	if err != nil {
		return nil, err
	}

	// Both supported request types are converted to a response the same way.
	params, err := queryrange.ParamsFromRequest(req)
	if err != nil {
		return nil, err
	}
	return queryrange.ResultToResponse(result, params)
}
// doRequest handles a *queryrange.LokiRequest: it checks the requested entry
// limit against the tenant limits, derives the query parameters from the
// request and executes them. A zero-value result is returned alongside any
// validation or parameter error.
func (h *queryHandler) doRequest(ctx context.Context, req *queryrange.LokiRequest) (logqlmodel.Result, error) {
	reqLogger := utillog.WithContext(ctx, h.logger)

	var empty logqlmodel.Result

	limitErr := h.validateMaxEntriesLimits(ctx, req.Plan.AST, req.Limit)
	if limitErr != nil {
		return empty, limitErr
	}

	params, paramsErr := queryrange.ParamsFromRequest(req)
	if paramsErr != nil {
		return empty, paramsErr
	}

	return h.execute(ctx, reqLogger, params)
}
// validateMaxEntriesLimits checks the requested entry limit against the
// max entries limit configured for the tenants found in ctx.
//
// It returns an HTTP 400 (Bad Request) error when the tenant IDs cannot be
// resolved from ctx, or when limit exceeds the smallest positive limit across
// those tenants. Metric queries (syntax.SampleExpr) are exempt from the entry
// limit, and a resolved limit of 0 disables the check.
func (h *queryHandler) validateMaxEntriesLimits(ctx context.Context, expr syntax.Expr, limit uint32) error {
	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}

	// entry limit does not apply to metric queries.
	if _, ok := expr.(syntax.SampleExpr); ok {
		return nil
	}

	maxEntriesCapture := func(id string) int { return h.limits.MaxEntriesLimitPerQuery(ctx, id) }
	maxEntriesLimit := util_validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, maxEntriesCapture)

	// Compare in 64 bits: converting a uint32 to int wraps to a negative
	// value on platforms where int is 32 bits wide, which would let limits
	// above math.MaxInt32 slip past the check.
	if maxEntriesLimit != 0 && int64(limit) > int64(maxEntriesLimit) {
		return httpgrpc.Errorf(http.StatusBadRequest,
			"max entries limit per query exceeded, limit > max_entries_limit_per_query (%d > %d)", limit, maxEntriesLimit)
	}
	return nil
}
// execute runs params through the executor after checking the requested time
// range.
//
// A request outside the valid time range is rejected with an HTTP 501 (Not
// Implemented) error before the executor is called. An executor error that
// matches ErrNotSupported is logged as a warning and reported as HTTP 501;
// any other executor error is logged and returned unchanged. The result
// produced by the executor is returned in every case where it was called.
func (h *queryHandler) execute(ctx context.Context, logger log.Logger, params logql.Params) (logqlmodel.Result, error) {
	if rangeErr := h.validateTimeRange(params); rangeErr != nil {
		return logqlmodel.Result{}, httpgrpc.Error(http.StatusNotImplemented, rangeErr.Error())
	}

	result, err := h.exec.Execute(ctx, params)
	switch {
	case err == nil:
		return result, nil
	case errors.Is(err, ErrNotSupported):
		level.Warn(logger).Log("msg", "unsupported query", "err", err)
		return result, httpgrpc.Error(http.StatusNotImplemented, "unsupported query")
	default:
		level.Error(logger).Log("msg", "query execution failed with new query engine", "err", err)
		return result, err
	}
}
// validateTimeRange reports whether the time range requested by params lies
// within the range returned by the configuration's ValidQueryRange. It returns
// nil when the request neither starts before the valid start nor ends after
// the valid end, and a descriptive error otherwise.
func (h *queryHandler) validateTimeRange(params logql.Params) error {
	reqStart, reqEnd := params.Start(), params.End()
	validStart, validEnd := h.cfg.ValidQueryRange()

	startsTooEarly := reqStart.Before(validStart)
	endsTooLate := reqEnd.After(validEnd)
	if startsTooEarly || endsTooLate {
		return fmt.Errorf(
			"query outside of acceptable time range: requested {start=%s, end=%s}, valid range {start=%s, end=%s}",
			reqStart, reqEnd,
			validStart, validEnd,
		)
	}
	return nil
}
// doInstantRequest handles a *queryrange.LokiInstantRequest. Only sample
// expressions are accepted; any other expression is rejected with
// logqlmodel.ErrUnsupportedSyntaxForInstantQuery. Accepted requests are then
// validated against the tenant limits, converted to query parameters and
// executed. A zero-value result accompanies every error returned before
// execution.
func (h *queryHandler) doInstantRequest(ctx context.Context, req *queryrange.LokiInstantRequest) (logqlmodel.Result, error) {
	reqLogger := utillog.WithContext(ctx, h.logger)

	var empty logqlmodel.Result

	// Do not allow log selector expression (aka log query) as instant query.
	_, isSample := req.Plan.AST.(syntax.SampleExpr)
	if !isSample {
		return empty, logqlmodel.ErrUnsupportedSyntaxForInstantQuery
	}

	limitErr := h.validateMaxEntriesLimits(ctx, req.Plan.AST, req.Limit)
	if limitErr != nil {
		return empty, limitErr
	}

	params, paramsErr := queryrange.ParamsFromRequest(req)
	if paramsErr != nil {
		return empty, paramsErr
	}

	return h.execute(ctx, reqLogger, params)
}