diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 07aba11efd..a02844a7e6 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -26,6 +26,10 @@ type Expr interface { fmt.Stringer } +func Clone(e Expr) (Expr, error) { + return ParseExpr(e.String()) +} + type QueryParams interface { LogSelector() (LogSelectorExpr, error) GetStart() time.Time diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index e9fbcfc1f2..bf0a5fa8f9 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "golang.org/x/sync/errgroup" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" @@ -547,16 +548,31 @@ func binOpStepEvaluator( ) } - // we have two non literal legs - lse, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q) - if err != nil { - return nil, err - } - rse, err := ev.StepEvaluator(ctx, ev, expr.RHS, q) - if err != nil { + var lse, rse StepEvaluator + g, ctx := errgroup.WithContext(ctx) + + // We have two non literal legs, + // load them in parallel + g.Go(func() error { + var err error + lse, err = ev.StepEvaluator(ctx, ev, expr.SampleExpr, q) + return err + }) + g.Go(func() error { + var err error + rse, err = ev.StepEvaluator(ctx, ev, expr.RHS, q) + return err + }) + + // ensure both sides are loaded before returning the combined evaluator + if err := g.Wait(); err != nil { return nil, err } + // keep a scoped reference to err as it's referenced in the Error() + // implementation of this StepEvaluator + var scopedErr error + return newStepEvaluator(func() (bool, int64, promql.Vector) { var ( ts int64 @@ -593,7 +609,7 @@ func binOpStepEvaluator( case OpTypeUnless: results = vectorUnless(lhs, rhs, lsigs, rsigs) default: - results, err = vectorBinop(expr.Op, expr.Opts, lhs, rhs, lsigs, rsigs) + results, scopedErr = vectorBinop(expr.Op, expr.Opts, lhs, rhs, lsigs, rsigs) } return true, ts, results }, func() (lastError error) { @@ -605,8 +621,8 @@ func binOpStepEvaluator( return lastError }, func() error { var errs []error - if err != nil { - errs = append(errs, err) + if scopedErr != nil { + errs = append(errs, scopedErr) } for _, ev := range []StepEvaluator{lse, rse} { if err := ev.Error(); err != nil { diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index f7fdbfec38..d6a1f93f70 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -111,8 +111,8 @@ func (m ShardMapper) Parse(query string) (noop bool, expr Expr, err error) { return false, nil, err } - mappedStr := mapped.String() originalStr := parsed.String() + mappedStr := mapped.String() noop = originalStr == mappedStr if noop { m.metrics.parsed.WithLabelValues(NoopKey).Inc() @@ -126,6 +126,12 @@ func (m ShardMapper) Parse(query string) (noop bool, expr Expr, err error) { } func (m ShardMapper) Map(expr Expr, r *shardRecorder) (Expr, error) { + // immediately clone the passed expr to avoid mutating the original + expr, err := Clone(expr) + if err != nil { + return nil, err + } + switch e := expr.(type) { case *LiteralExpr: return e, nil diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 976e878f2f..f4ae9e756d 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -18,7 +18,7 @@ import ( ) const ( - DefaultDownstreamConcurrency = 32 + DefaultDownstreamConcurrency = 128 ) type DownstreamHandler struct { @@ -48,6 +48,12 @@ func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebas } } +// Note: After the introduction of the LimitedRoundTripper, +// bounding concurrency in the downstreamer is mostly redundant +// The reason we don't remove it is to prevent malicious queries +// from creating an unreasonably large number of goroutines, such as +// the case of a query like `a / a / a / a / a ..etc`, which could try +// to shard each leg, quickly dispatching an unreasonable number of goroutines. func (h DownstreamHandler) Downstreamer() logql.Downstreamer { p := DefaultDownstreamConcurrency locks := make(chan struct{}, p)