Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/querier/queryrange/engine_router.go

373 lines
10 KiB

package queryrange
import (
"context"
"fmt"
"net/http"
"slices"
"time"
"github.com/coder/quartz"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
)
// RouterConfig configures sending queries to a separate engine.
type RouterConfig struct {
Enabled bool
Start time.Time // Start time of the v2 engine
Lag time.Duration // Lag after which v2 engine has data
// Validate function to check if the query is supported by the engine.
Validate func(params logql.Params) bool
// Handler to execute queries against the engine.
Handler queryrangebase.Handler
}
// engineReqResp represents a request with its result channel
type engineReqResp struct {
lokiResult
isV2Engine bool
}
// engineRouter handles splitting queries between V1 and V2 engines
type engineRouter struct {
v2Start time.Time
v2Lag time.Duration
forMetricQuery bool
v1Next queryrangebase.Handler
v2Next queryrangebase.Handler
checkV2 func(params logql.Params) bool
merger queryrangebase.Merger
logger log.Logger
// Used for tests.
clock quartz.Clock
}
// NewEngineRouterMiddleware creates a middleware that splits and routes part of the query
// to v2 engine if the query is supported by it.
func NewEngineRouterMiddleware(
v2RouterConfig RouterConfig,
v1Chain []queryrangebase.Middleware,
merger queryrangebase.Merger,
metricQuery bool,
logger log.Logger,
) queryrangebase.Middleware {
if v2RouterConfig.Handler == nil {
panic("v2 engine handler cannot be nil")
}
return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return &engineRouter{
v2Start: v2RouterConfig.Start,
v2Lag: v2RouterConfig.Lag,
v1Next: queryrangebase.MergeMiddlewares(v1Chain...).Wrap(next),
v2Next: v2RouterConfig.Handler,
checkV2: v2RouterConfig.Validate,
merger: merger,
logger: logger,
forMetricQuery: metricQuery,
clock: quartz.NewReal(),
}
})
}
func (e *engineRouter) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
start, end := e.v2Start, e.getEnd()
// if query is entirely before or after v2 engine range, process using next handler.
// ignore any boundary overlap, splitting requests that fall on bounary would result in tiny requests.
if !e.isOverlappingV2range(r, start, end) {
return e.v1Next.Do(ctx, r)
}
params, err := ParamsFromRequest(r)
if err != nil {
return nil, err
}
// Unsupported queries should be entirely executed by chunks.
if !e.checkV2(params) {
return e.v1Next.Do(ctx, r)
}
inputs := e.splitOverlapping(r, start, end)
// for log queries, order the splits to return early on hitting limits.
var limit uint32
if !e.forMetricQuery && len(inputs) > 1 {
r, ok := r.(*LokiRequest)
if !ok {
level.Error(e.logger).Log("msg", "engine router received unexpected request type", "type", fmt.Sprintf("%T", r))
return nil, errors.New("engine router: unexpected request type")
}
limit = r.Limit
if r.Direction == logproto.BACKWARD {
slices.SortFunc(inputs, func(a, b *engineReqResp) int {
return b.req.GetStart().Compare(a.req.GetStart())
})
} else {
slices.SortFunc(inputs, func(a, b *engineReqResp) int {
return a.req.GetStart().Compare(b.req.GetStart())
})
}
}
responses, err := e.process(ctx, inputs, limit)
if err != nil {
return nil, err
}
// Merge responses
return e.merger.MergeResponse(responses...)
}
// whether the time range of the request overlaps with the time range of the v2 engine
func (e engineRouter) isOverlappingV2range(r queryrangebase.Request, start, end time.Time) bool {
if !r.GetEnd().After(start) || !r.GetStart().Before(end) {
return false
}
return true
}
// the end time of the v2 engine based on current timestamp and v2 engine lag
func (e engineRouter) getEnd() time.Time {
return e.clock.Now().UTC().Add(-e.v2Lag)
}
// splitOverlapping creates a set of requests to be made. Returned requests are
// split up to the boundary of the v2 engine's time range (v2Start, v2End).
//
// splitOverlapping can create 1 to 3 splits depending on the overlap between
// the requested time range and the v2 engine's time range:
//
// - A split for v2 is created for timestamps that overlap with v2Start and
// v2End.
// - If necessary, a pre-v2 split is created for timestamps before v2Start.
// - If necessary, a post-v2 split is created for timestamps after v2End.
//
// If engineRouter is used for metric queries, the split for v2 will be aligned
// to the requested step: the start time will be shifted up to the next step
// boundary, and the end time will be rounded down to the previous step
// boundary.
func (e *engineRouter) splitOverlapping(r queryrangebase.Request, v2Start, v2End time.Time) []*engineReqResp {
if !e.isOverlappingV2range(r, v2Start, v2End) {
// This line should be unreachable, since [engineRouter.Do] already
// makes sure there is at least some overlap. However, we keep this line
// for safety so that logsSplit doesn't produce incorrect splits.
return []*engineReqResp{{
lokiResult: lokiResult{
req: r,
ch: make(chan *packedResp),
},
isV2Engine: false,
}}
}
// step determines alignment for splits.
//
// step is only used for metrics queries, which produce a set of samples
// from r.GetStart() to r.GetEnd() incremented by step. For logs queries,
// step is left as the zero value.
var step time.Duration
if e.forMetricQuery {
// Get the step (in milliseconds) from the request.
step = time.Duration(r.GetStep() * int64(time.Millisecond))
}
var (
// Align v2Start and v2End to the requested step. This is a no-op for logs
// queries, as the step is 0.
alignedV2Start, alignedV2End = alignV2Range(step, v2Start, v2End)
// v2SplitStart is clamped to the earliest timestamp which is still
// serviceable by v2.
v2SplitStart = maxTime(alignedV2Start, r.GetStart())
// v2SplitEnd is clamped to the latest timestamp which is still
// serviceable by v2.
v2SplitEnd = minTime(alignedV2End, r.GetEnd())
)
var splits []*engineReqResp
// We need a pre-v2 split if the request starts before v2.
if r.GetStart().Before(v2SplitStart) {
// For metric queries, two splits must not share an instant (determined
// by step). To avoid this, we shift back the end by one step.
//
// To avoid this resulting in end < start, we clamp it to the starting
// time.
preV2SplitEnd := maxTime(r.GetStart(), v2SplitStart.Add(-step))
splits = append(splits, &engineReqResp{
lokiResult: lokiResult{
req: r.WithStartEnd(r.GetStart(), preV2SplitEnd),
ch: make(chan *packedResp),
},
isV2Engine: false,
})
}
// Create the v2 split.
{
splits = append(splits, &engineReqResp{
lokiResult: lokiResult{
req: r.WithStartEnd(v2SplitStart, v2SplitEnd),
ch: make(chan *packedResp),
},
isV2Engine: true,
})
}
// We need a post-v2 split if v2 ends before the request ends.
if v2SplitEnd.Before(r.GetEnd()) {
// For metric queries, two splits must not share an instant (determined
// by step). To avoid this, we shift the start forward by one step.
//
// To avoid this resulting in end < start, we clamp it to the ending
// time.
postV2SplitStart := minTime(v2SplitEnd.Add(step), r.GetEnd())
// The start must be one step *after* v2SplitEnd.
splits = append(splits, &engineReqResp{
lokiResult: lokiResult{
req: r.WithStartEnd(postV2SplitStart, r.GetEnd()),
ch: make(chan *packedResp),
},
isV2Engine: false,
})
}
return splits
}
func minTime(a, b time.Time) time.Time {
if a.Before(b) {
return a
}
return b
}
func maxTime(a, b time.Time) time.Time {
if a.After(b) {
return a
}
return b
}
func (e *engineRouter) handleReq(ctx context.Context, r *engineReqResp) {
var resp packedResp
if r.isV2Engine {
resp.resp, resp.err = e.v2Next.Do(ctx, r.req)
if isUnsupportedError(resp.err) {
// Our router validates queries beforehand, but we fall back here for safety.
level.Warn(e.logger).Log("msg", "falling back to v1 engine", "err", resp.err)
resp.resp, resp.err = e.v1Next.Do(ctx, r.req)
}
} else {
resp.resp, resp.err = e.v1Next.Do(ctx, r.req)
}
select {
case <-ctx.Done():
return
case r.ch <- &resp:
}
}
// isUnsupportedError checks whether the provided error corresponds to a
// [http.StatusNotImplemented] provided via [httpgrpc].
func isUnsupportedError(err error) bool {
if err == nil {
return false
}
resp, ok := httpgrpc.HTTPResponseFromError(err)
return ok && resp.Code == http.StatusNotImplemented
}
// process executes the inputs in parallel and collects the responses.
func (e *engineRouter) process(ctx context.Context, inputs []*engineReqResp, limit uint32) ([]queryrangebase.Response, error) {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("engine router process cancelled"))
// Run all requests in parallel as we only get a max of 3 splits.
for _, r := range inputs {
go e.handleReq(ctx, r)
}
var responses []queryrangebase.Response
var count int64
for _, x := range inputs {
select {
case <-ctx.Done():
return nil, ctx.Err()
case data := <-x.ch:
if data.err != nil {
return nil, data.err
}
responses = append(responses, data.resp)
if limit > 0 {
// exit early if limit has been reached
if r, ok := data.resp.(*LokiResponse); ok {
count += r.Count()
if count >= int64(limit) {
return responses, nil
}
}
}
}
}
return responses, nil
}
// alignV2Range aligns v2Start and v2End to the given step.
//
// - If v2Start is not aligned to step, it is rounded up to the next step boundary.
// - If v2End is not aligned to step, it is rounded down to the previous step boundary.
//
// If step is 0, no alignment is done.
func alignV2Range(step time.Duration, v2Start, v2End time.Time) (v2AlignedStart, v2AlignedEnd time.Time) {
if step == 0 {
// No alignment to perform.
return v2Start, v2End
}
if mod := v2Start.UnixNano() % step.Nanoseconds(); mod != 0 {
// Round the start time up.
v2AlignedStart = v2Start.Add(step - time.Duration(mod))
} else {
// Start time is already aligned.
v2AlignedStart = v2Start
}
if mod := v2End.UnixNano() % step.Nanoseconds(); mod != 0 {
// Round the end time down to the previous step boundary.
v2AlignedEnd = v2End.Add(-time.Duration(mod))
} else {
// End time is already aligned.
v2AlignedEnd = v2End
}
return v2AlignedStart, v2AlignedEnd
}