mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
373 lines
10 KiB
373 lines
10 KiB
package queryrange
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/coder/quartz"
|
|
"github.com/go-kit/log"
|
|
"github.com/go-kit/log/level"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/grafana/dskit/httpgrpc"
|
|
|
|
"github.com/grafana/loki/v3/pkg/logproto"
|
|
"github.com/grafana/loki/v3/pkg/logql"
|
|
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
|
|
)
|
|
|
|
// RouterConfig configures sending queries to a separate engine.
|
|
type RouterConfig struct {
|
|
Enabled bool
|
|
|
|
Start time.Time // Start time of the v2 engine
|
|
Lag time.Duration // Lag after which v2 engine has data
|
|
|
|
// Validate function to check if the query is supported by the engine.
|
|
Validate func(params logql.Params) bool
|
|
|
|
// Handler to execute queries against the engine.
|
|
Handler queryrangebase.Handler
|
|
}
|
|
|
|
// engineReqResp represents a request with its result channel
|
|
type engineReqResp struct {
|
|
lokiResult
|
|
isV2Engine bool
|
|
}
|
|
|
|
// engineRouter handles splitting queries between V1 and V2 engines
|
|
type engineRouter struct {
|
|
v2Start time.Time
|
|
v2Lag time.Duration
|
|
|
|
forMetricQuery bool
|
|
|
|
v1Next queryrangebase.Handler
|
|
v2Next queryrangebase.Handler
|
|
|
|
checkV2 func(params logql.Params) bool
|
|
|
|
merger queryrangebase.Merger
|
|
|
|
logger log.Logger
|
|
|
|
// Used for tests.
|
|
clock quartz.Clock
|
|
}
|
|
|
|
// NewEngineRouterMiddleware creates a middleware that splits and routes part of the query
|
|
// to v2 engine if the query is supported by it.
|
|
func NewEngineRouterMiddleware(
|
|
v2RouterConfig RouterConfig,
|
|
v1Chain []queryrangebase.Middleware,
|
|
merger queryrangebase.Merger,
|
|
metricQuery bool,
|
|
logger log.Logger,
|
|
) queryrangebase.Middleware {
|
|
if v2RouterConfig.Handler == nil {
|
|
panic("v2 engine handler cannot be nil")
|
|
}
|
|
|
|
return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
|
|
return &engineRouter{
|
|
v2Start: v2RouterConfig.Start,
|
|
v2Lag: v2RouterConfig.Lag,
|
|
v1Next: queryrangebase.MergeMiddlewares(v1Chain...).Wrap(next),
|
|
v2Next: v2RouterConfig.Handler,
|
|
checkV2: v2RouterConfig.Validate,
|
|
merger: merger,
|
|
logger: logger,
|
|
forMetricQuery: metricQuery,
|
|
clock: quartz.NewReal(),
|
|
}
|
|
})
|
|
}
|
|
|
|
// Do routes r to the v1 engine, the v2 engine, or both, depending on how its
// time range overlaps the v2 engine's range and on whether the v2 engine
// supports the query. When r is split, the parts run concurrently (see
// process) and their responses are combined with the configured merger.
func (e *engineRouter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
	start, end := e.v2Start, e.getEnd()

	// If the query is entirely before or after the v2 engine's range, process
	// it with the v1 chain. Boundary-only overlap is ignored: splitting
	// requests that merely touch the boundary would produce tiny requests.
	if !e.isOverlappingV2range(r, start, end) {
		return e.v1Next.Do(ctx, r)
	}

	params, err := ParamsFromRequest(r)
	if err != nil {
		return nil, err
	}

	// Unsupported queries are executed entirely by the v1 engine.
	if !e.checkV2(params) {
		return e.v1Next.Do(ctx, r)
	}

	inputs := e.splitOverlapping(r, start, end)

	// For log queries, order the splits in the query direction so that
	// process can return early once the limit has been reached.
	var limit uint32
	if !e.forMetricQuery && len(inputs) > 1 {
		// Use a distinct name instead of shadowing r: with `r, ok := r.(...)`
		// the log line below would format the nil asserted *LokiRequest and
		// never report the actual request type.
		lokiReq, ok := r.(*LokiRequest)
		if !ok {
			level.Error(e.logger).Log("msg", "engine router received unexpected request type", "type", fmt.Sprintf("%T", r))
			return nil, errors.New("engine router: unexpected request type")
		}

		limit = lokiReq.Limit

		backward := lokiReq.Direction == logproto.BACKWARD
		slices.SortFunc(inputs, func(a, b *engineReqResp) int {
			if backward {
				// Latest split first.
				return b.req.GetStart().Compare(a.req.GetStart())
			}
			// Earliest split first.
			return a.req.GetStart().Compare(b.req.GetStart())
		})
	}

	responses, err := e.process(ctx, inputs, limit)
	if err != nil {
		return nil, err
	}

	return e.merger.MergeResponse(responses...)
}
|
|
|
|
// isOverlappingV2range reports whether the time range of r overlaps the v2
// engine's range (start to end) by more than a shared boundary instant.
//
// An empty or inverted v2 range (start not before end) never overlaps. This
// occurs when the configured v2 start time is later than the current time
// minus the configured lag. Such a range contains no timestamp the v2 engine
// can serve. Reporting it as overlapping would lead to an inverted v2 split and
// v1 splits that overlap each other.
func (e engineRouter) isOverlappingV2range(r queryrangebase.Request, start, end time.Time) bool {
	if !start.Before(end) {
		return false
	}
	return r.GetEnd().After(start) && r.GetStart().Before(end)
}
|
|
|
|
// getEnd returns where the v2 engine's range ends: the current time (in UTC,
// taken from e.clock) shifted back by the configured lag.
func (e engineRouter) getEnd() time.Time {
	now := e.clock.Now().UTC()
	return now.Add(-e.v2Lag)
}
|
|
|
|
// splitOverlapping creates a set of requests to be made. Returned requests are
// split up to the boundary of the v2 engine's time range (v2Start, v2End).
//
// splitOverlapping can create 1 to 3 splits depending on the overlap between
// the requested time range and the v2 engine's time range:
//
//   - A split for v2 is created for timestamps that overlap with v2Start and
//     v2End.
//   - If necessary, a pre-v2 split is created for timestamps before v2Start.
//   - If necessary, a post-v2 split is created for timestamps after v2End.
//
// If engineRouter is used for metric queries, the split for v2 will be aligned
// to the requested step: the start time will be shifted up to the next step
// boundary, and the end time will be rounded down to the previous step
// boundary.
//
// If no timestamp of the request can be served by v2 (no overlap, or the
// overlap vanishes after step alignment), a single v1 request covering the
// whole of r is returned.
func (e *engineRouter) splitOverlapping(r queryrangebase.Request, v2Start, v2End time.Time) []*engineReqResp {
	// v1Only sends the whole, unsplit request to the v1 engine.
	v1Only := func() []*engineReqResp {
		return []*engineReqResp{{
			lokiResult: lokiResult{
				req: r,
				ch:  make(chan *packedResp),
			},
			isV2Engine: false,
		}}
	}

	if !e.isOverlappingV2range(r, v2Start, v2End) {
		// This line should be unreachable, since [engineRouter.Do] already
		// makes sure there is at least some overlap. However, we keep this line
		// for safety so that splitOverlapping doesn't produce incorrect splits.
		return v1Only()
	}

	// step determines alignment for splits.
	//
	// step is only used for metrics queries, which produce a set of samples
	// from r.GetStart() to r.GetEnd() incremented by step. For logs queries,
	// step is left as the zero value.
	var step time.Duration
	if e.forMetricQuery {
		// Get the step (in milliseconds) from the request.
		step = time.Duration(r.GetStep() * int64(time.Millisecond))
	}

	var (
		// Align v2Start and v2End to the requested step. This is a no-op for logs
		// queries, as the step is 0.
		alignedV2Start, alignedV2End = alignV2Range(step, v2Start, v2End)

		// v2SplitStart is clamped to the earliest timestamp which is still
		// serviceable by v2.
		v2SplitStart = maxTime(alignedV2Start, r.GetStart())

		// v2SplitEnd is clamped to the latest timestamp which is still
		// serviceable by v2.
		v2SplitEnd = minTime(alignedV2End, r.GetEnd())
	)

	// Alignment can leave no step instant inside the overlap (e.g. the overlap
	// is narrower than one step), and an inverted v2 range has no overlap at
	// all. Sending an end-before-start request to v2 would be invalid, so let
	// v1 serve the whole request instead. start == end is kept: for metric
	// queries it is a single valid instant (e.g. an instant query).
	if v2SplitStart.After(v2SplitEnd) {
		return v1Only()
	}

	var splits []*engineReqResp

	// We need a pre-v2 split if the request starts before v2.
	if r.GetStart().Before(v2SplitStart) {
		// For metric queries, two splits must not share an instant (determined
		// by step). To avoid this, we shift back the end by one step.
		//
		// To avoid this resulting in end < start, we clamp it to the starting
		// time.
		preV2SplitEnd := maxTime(r.GetStart(), v2SplitStart.Add(-step))

		splits = append(splits, &engineReqResp{
			lokiResult: lokiResult{
				req: r.WithStartEnd(r.GetStart(), preV2SplitEnd),
				ch:  make(chan *packedResp),
			},
			isV2Engine: false,
		})
	}

	// Create the v2 split.
	splits = append(splits, &engineReqResp{
		lokiResult: lokiResult{
			req: r.WithStartEnd(v2SplitStart, v2SplitEnd),
			ch:  make(chan *packedResp),
		},
		isV2Engine: true,
	})

	// We need a post-v2 split if v2 ends before the request ends.
	if v2SplitEnd.Before(r.GetEnd()) {
		// For metric queries, two splits must not share an instant (determined
		// by step). To avoid this, the start must be one step *after*
		// v2SplitEnd.
		//
		// To avoid this resulting in end < start, we clamp it to the ending
		// time.
		postV2SplitStart := minTime(v2SplitEnd.Add(step), r.GetEnd())

		splits = append(splits, &engineReqResp{
			lokiResult: lokiResult{
				req: r.WithStartEnd(postV2SplitStart, r.GetEnd()),
				ch:  make(chan *packedResp),
			},
			isV2Engine: false,
		})
	}

	return splits
}
|
|
|
|
// minTime returns the earlier of a and b. When both denote the same instant,
// b is returned.
func minTime(a, b time.Time) time.Time {
	if !a.Before(b) {
		return b
	}
	return a
}
|
|
|
|
// maxTime returns the later of a and b. When both denote the same instant,
// b is returned.
func maxTime(a, b time.Time) time.Time {
	if !a.After(b) {
		return b
	}
	return a
}
|
|
|
|
// handleReq executes a single split on the engine it was assigned to and
// delivers the outcome on r.ch. A v2 split that fails with a "not implemented"
// error is retried on the v1 engine. If ctx is done before the result can be
// delivered, the result is dropped so this goroutine does not block forever.
func (e *engineRouter) handleReq(ctx context.Context, r *engineReqResp) {
	var out packedResp

	switch {
	case !r.isV2Engine:
		out.resp, out.err = e.v1Next.Do(ctx, r.req)
	default:
		out.resp, out.err = e.v2Next.Do(ctx, r.req)
		if isUnsupportedError(out.err) {
			// Queries are validated before routing; this is a safety net.
			level.Warn(e.logger).Log("msg", "falling back to v1 engine", "err", out.err)
			out.resp, out.err = e.v1Next.Do(ctx, r.req)
		}
	}

	select {
	case r.ch <- &out:
	case <-ctx.Done():
	}
}
|
|
|
|
// isUnsupportedError reports whether err carries an httpgrpc response with
// status [http.StatusNotImplemented]. A nil error, or one without an
// httpgrpc response, is not considered unsupported.
func isUnsupportedError(err error) bool {
	if err != nil {
		if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
			return resp.Code == http.StatusNotImplemented
		}
	}
	return false
}
|
|
|
|
// process runs every input concurrently and gathers the responses in the
// order of inputs. The first error received aborts collection. When limit is
// non-zero, collection stops as soon as the LokiResponse entries gathered so
// far reach the limit. Returning cancels the derived context, which lets any
// still-running handleReq goroutines exit.
func (e *engineRouter) process(ctx context.Context, inputs []*engineReqResp, limit uint32) ([]queryrangebase.Response, error) {
	ctx, cancel := context.WithCancelCause(ctx)
	defer cancel(errors.New("engine router process cancelled"))

	// There are at most three splits, so fan them all out at once.
	for i := range inputs {
		go e.handleReq(ctx, inputs[i])
	}

	var (
		responses []queryrangebase.Response
		seen      int64
	)
	for _, in := range inputs {
		var data *packedResp
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case data = <-in.ch:
		}

		if data.err != nil {
			return nil, data.err
		}
		responses = append(responses, data.resp)

		if limit == 0 {
			continue
		}
		lokiResp, ok := data.resp.(*LokiResponse)
		if !ok {
			continue
		}
		// Stop early once enough entries have been collected.
		seen += lokiResp.Count()
		if seen >= int64(limit) {
			return responses, nil
		}
	}

	return responses, nil
}
|
|
|
|
// alignV2Range aligns v2Start and v2End to multiples of step counted from the
// Unix epoch.
//
//   - If v2Start is not aligned to step, it is rounded up to the next step boundary.
//   - If v2End is not aligned to step, it is rounded down to the previous step boundary.
//
// If step is 0 (or negative, which is not a meaningful step), no alignment is
// done and the inputs are returned unchanged.
func alignV2Range(step time.Duration, v2Start, v2End time.Time) (v2AlignedStart, v2AlignedEnd time.Time) {
	if step <= 0 {
		// No alignment to perform.
		return v2Start, v2End
	}

	// offset returns how far t lies past the previous step boundary, always in
	// [0, step). Go's % takes the sign of the dividend, so pre-epoch times
	// would otherwise yield a negative remainder and round the wrong way.
	offset := func(t time.Time) time.Duration {
		off := time.Duration(t.UnixNano() % int64(step))
		if off < 0 {
			off += step
		}
		return off
	}

	if off := offset(v2Start); off != 0 {
		// Round the start time up to the next step boundary.
		v2AlignedStart = v2Start.Add(step - off)
	} else {
		// Start time is already aligned.
		v2AlignedStart = v2Start
	}

	if off := offset(v2End); off != 0 {
		// Round the end time down to the previous step boundary.
		v2AlignedEnd = v2End.Add(-off)
	} else {
		// End time is already aligned.
		v2AlignedEnd = v2End
	}

	return v2AlignedStart, v2AlignedEnd
}
|
|
|