package engine import ( "context" "errors" "flag" "fmt" "net/http" gotrace "runtime/trace" "strings" "time" "github.com/alecthomas/units" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/tenant" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "github.com/grafana/loki/v3/pkg/dataobj/metastore" "github.com/grafana/loki/v3/pkg/engine/internal/deletion" "github.com/grafana/loki/v3/pkg/engine/internal/executor" "github.com/grafana/loki/v3/pkg/engine/internal/planner/logical" "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" "github.com/grafana/loki/v3/pkg/engine/internal/workflow" "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" "github.com/grafana/loki/v3/pkg/util/httpreq" util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/rangeio" "github.com/grafana/loki/v3/pkg/xcap" ) var ( // ErrPlanningFailed is returned when query planning fails unexpectedly. // ErrPlanningFailed is not used for unimplemented features, which returns // [ErrNotSupported] instead. ErrPlanningFailed = errors.New("query planning failed unexpectedly") // ErrSchedulingFailed is returned when communication with the scheduler fails. ErrSchedulingFailed = errors.New("failed to schedule query") ) var tracer = otel.Tracer("pkg/engine") // Re-export internal types for external use. type ( // RequestStreamFilterer creates a StreamFilterer for a given request context. RequestStreamFilterer = executor.RequestStreamFilterer // StreamFilterer filters streams based on their labels. 
StreamFilterer = executor.StreamFilterer ) // ExecutorConfig configures engine execution. type ExecutorConfig struct { // Batch size of the v2 execution engine. BatchSize int `yaml:"batch_size" category:"experimental"` // PrefetchBytes controls the number of bytes read ahead from a data object // when opening it. PrefetchBytes flagext.Bytes `yaml:"prefetch_bytes" category:"experimental"` // MergePrefetchCount controls the number of inputs that are prefetched simultaneously by any Merge node. MergePrefetchCount int `yaml:"merge_prefetch_count" category:"experimental"` // RangeConfig determines how to optimize range reads in the V2 engine. RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."` // StreamFilterer is an optional filterer that can filter streams based on their labels. // When set, streams are filtered before scanning. StreamFilterer executor.RequestStreamFilterer `yaml:"-"` // TaskResultsCache configures the backing cache for task results. TaskResultsCache TaskCacheConfig `yaml:"task_results_cache" category:"experimental"` } func (cfg *ExecutorConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.PrefetchBytes = flagext.Bytes(16 * units.KiB) f.IntVar(&cfg.BatchSize, prefix+"batch-size", 100, "Experimental: Batch size of the next generation query engine.") f.Var(&cfg.PrefetchBytes, prefix+"prefetch-bytes", "Experimental: Number of bytes to prefetch when opening a data object for decoding metadata and overlapping section reads. Clamps to at least 16KiB.") f.IntVar(&cfg.MergePrefetchCount, prefix+"merge-prefetch-count", 0, "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. 
A negative value means that all inputs are be prefetched in parallel.") cfg.RangeConfig.RegisterFlags(prefix+"range-reads.", f) cfg.TaskResultsCache.RegisterFlagsWithPrefix(prefix+"task-results-cache.", f) } // TaskCacheConfig extends resultscache.Config with additional task-cache-specific settings. type TaskCacheConfig struct { resultscache.Config `yaml:",inline"` TaskResultMaxCacheableSize flagext.Bytes `yaml:"task_result_max_cacheable_size" category:"experimental"` DataObjScanResultMaxCacheableSize flagext.Bytes `yaml:"dataobjscan_result_max_cacheable_size" category:"experimental"` PruneEmptyCachedTasks bool `yaml:"prune_empty_cached_tasks" category:"experimental"` PruneCachedTasksMaxSize flagext.Bytes `yaml:"prune_cached_tasks_max_size" category:"experimental"` PruneCachedTasksFetchTimeout time.Duration `yaml:"prune_cached_tasks_fetch_timeout" category:"experimental"` } // RegisterFlagsWithPrefix registers flags for TaskCacheConfig with the given prefix. func (cfg *TaskCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.Config.RegisterFlagsWithPrefix(f, prefix) f.Var(&cfg.TaskResultMaxCacheableSize, prefix+"task-result-max-cacheable-size", "Experimental: Maximum size for a task result to be cacheable. 0 means only empty responses are cached.") f.Var(&cfg.DataObjScanResultMaxCacheableSize, prefix+"dataobjscan-result-max-cacheable-size", "Experimental: Maximum size for a DataObjScan result to be cacheable. 0 means only empty responses are cached.") f.BoolVar(&cfg.PruneEmptyCachedTasks, prefix+"prune-empty-cached-tasks", false, "Experimental: When enabled, the scheduler checks cached results at plan time and prunes tasks whose cached result is known to be empty.") f.Var(&cfg.PruneCachedTasksMaxSize, prefix+"prune-cached-tasks-max-size", "Experimental: Maximum total size of non-empty cached task results embedded in task assignments. "+ "Results that would exceed the budget are skipped (smaller results that fit are still included). 
"+ "0 disables non-empty task pruning.") f.DurationVar(&cfg.PruneCachedTasksFetchTimeout, prefix+"prune-cached-tasks-fetch-timeout", time.Second, "Experimental: Timeout for cache fetch operations during cached-task pruning at plan time. 0 disables the timeout.") } // Params holds parameters for constructing a new [Engine]. type Params struct { Logger log.Logger // Logger for optional log messages. Registerer prometheus.Registerer // Registerer for optional metrics. Config Config // Config for the Engine. Scheduler *Scheduler // Scheduler to manage the execution of tasks. Metastore metastore.Metastore // Metastore to access the indexes Limits logql.Limits // Limits to apply to engine queries. DeleteGetter deletion.Getter // DeleteGetter to fetch delete requests for query-time filtering. } // validate validates p and applies defaults. func (p *Params) validate() error { if p.Logger == nil { p.Logger = log.NewNopLogger() } if p.Registerer == nil { p.Registerer = prometheus.NewRegistry() } if p.Scheduler == nil { return errors.New("scheduler is required") } if p.Metastore == nil { return errors.New("metastore is required") } if p.Config.Executor.BatchSize <= 0 { return fmt.Errorf("invalid batch size for query engine. must be greater than 0, got %d", p.Config.Executor.BatchSize) } return nil } // Engine defines parameters for executing queries. type Engine struct { logger log.Logger metrics *metrics cfg Config scheduler *Scheduler // Scheduler to manage the execution of tasks. limits logql.Limits // Limits to apply to engine queries. deleteGetter deletion.Getter // DeleteGetter to fetch delete requests for query-time filtering. metastore metastore.Metastore taskCaches executor.TaskCacheRegistry } // New creates a new Engine. 
func New(params Params) (*Engine, error) {
	if err := params.validate(); err != nil {
		return nil, err
	}

	// The task cache registry is only needed when empty-task pruning is
	// enabled; otherwise it stays nil (the zero value of the interface).
	var taskCaches executor.TaskCacheRegistry
	if params.Config.Executor.TaskResultsCache.PruneEmptyCachedTasks {
		var err error
		taskCaches, err = executor.NewTaskCacheRegistry(params.Config.Executor.TaskResultsCache.Config, params.Registerer, params.Logger)
		if err != nil {
			return nil, fmt.Errorf("creating task cache registry: %w", err)
		}
	}

	e := &Engine{
		logger:       params.Logger,
		metrics:      newMetrics(params.Registerer),
		cfg:          params.Config,
		scheduler:    params.Scheduler,
		limits:       params.Limits,
		deleteGetter: params.DeleteGetter,
		metastore:    params.Metastore,
		taskCaches:   taskCaches,
	}
	return e, nil
}

// Execute executes the given query. Execute returns [ErrNotSupported] if params
// denotes a query that is not yet implemented in the new engine.
//
// The query flows through four stages: logical planning, physical planning,
// workflow (execution) planning, and result collection; failures in each
// stage are counted in e.metrics.subqueries and reported on the span.
func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error) {
	// NOTE(rfratto): To simplify the API, Engine does not directly implement
	// [logql.Engine], whose interface definition is not useful to the V2
	// engine. As such, callers must define adapters to use Engine as a
	// [logql.Engine].
	//
	// This pain point will eventually go away as remaining usages of
	// [logql.Engine] disappear.

	// Starts the execution capture for the query.
	// All recorded observations will be captured to it.
	ctx, capture := xcap.NewCapture(ctx, nil)
	defer capture.End()

	startTime := time.Now()

	// runtime/trace task for Go execution tracing, separate from the OTel span.
	ctx, task := gotrace.NewTask(ctx, "Engine.Execute")
	defer task.End()

	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return logqlmodel.Result{}, httpgrpc.Error(http.StatusBadRequest, err.Error())
	}

	// Caching requires both a configured cache backend and the request not
	// opting out of caching.
	cacheEnabled := cache.IsCacheConfigured(e.cfg.Executor.TaskResultsCache.CacheConfig) && !params.CachingOptions().Disabled

	ctx, span := xcap.StartSpan(ctx, tracer, "Engine.Execute",
		trace.WithAttributes(
			attribute.String("type", string(logql.GetRangeType(params))),
			attribute.String("query", params.QueryString()),
			attribute.Stringer("start", params.Start()),
			attribute.Stringer("end", params.End()),
			attribute.Stringer("step", params.Step()),
			attribute.Stringer("length", params.End().Sub(params.Start())),
			attribute.StringSlice("shards", params.Shards()),
			attribute.Bool("cache_enabled", cacheEnabled),
		),
	)
	defer span.End()

	ctx = e.buildContext(ctx)

	logger := util_log.WithContext(ctx, e.logger)
	logger = log.With(logger, "engine", "v2")
	logger = injectQueryTags(ctx, logger)

	level.Info(logger).Log("msg", "starting query", "query", params.QueryString(), "shard", strings.Join(params.Shards(), ","))

	logicalPlan, durLogicalPlanning, err := e.buildLogicalPlan(ctx, logger, params)
	if err != nil {
		// Logical planning failures are treated as unsupported queries so the
		// caller can fall back to the old engine.
		e.metrics.subqueries.WithLabelValues(statusNotImplemented).Inc()
		span.SetStatus(codes.Error, "failed to create logical plan")
		return logqlmodel.Result{}, ErrNotSupported
	}
	gotrace.Log(ctx, "logical_planning", "done")

	physicalPlan, durPhysicalPlanning, err := e.buildPhysicalPlan(ctx, tenantID, logger, params, logicalPlan, cacheEnabled)
	if err != nil {
		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
		span.SetStatus(codes.Error, "failed to create physical plan")
		return logqlmodel.Result{}, ErrPlanningFailed
	}
	gotrace.Log(ctx, "physical_planning", "done")

	// Enable admission lanes only for log queries
	useAdmissionLanes := !isMetricQuery(params.GetExpression())
	wf, durWorkflowPlanning, err := e.buildWorkflow(ctx, tenantID, logger, physicalPlan, useAdmissionLanes, cacheEnabled)
	if err != nil {
		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
		span.SetStatus(codes.Error, "failed to create execution plan")
		return logqlmodel.Result{}, ErrPlanningFailed
	}
	defer wf.Close()
	gotrace.Log(ctx, "workflow_planning", "done")

	pipeline, err := wf.Run(ctx)
	if err != nil {
		level.Error(logger).Log("msg", "failed to execute query", "err", err)
		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
		span.SetStatus(codes.Error, "failed to execute query")
		return logqlmodel.Result{}, ErrSchedulingFailed
	}
	defer pipeline.Close()

	gotrace.Log(ctx, "collect_result", "start")
	builder, durExecution, err := e.collectResult(ctx, logger, params, pipeline)
	if err != nil {
		e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
		span.SetStatus(codes.Error, "error during query execution")
		return logqlmodel.Result{}, err
	}

	durFull := time.Since(startTime)

	logValues := []any{
		"msg", "finished executing",
		"user_agent", httpreq.ExtractHeader(ctx, "User-Agent"),
		"query", params.QueryString(),
		"length", params.End().Sub(params.Start()).String(),
		"step", params.Step().String(),
		"duration_logical_planning", durLogicalPlanning,
		"duration_physical_planning", durPhysicalPlanning,
		"duration_workflow_planning", durWorkflowPlanning,
		"duration_execution", durExecution,
		"duration_full", durFull,
	}
	gotrace.Log(ctx, "collect_result", "done")

	// Close the pipeline to calculate the stats.
	// NOTE(review): relies on Close being safe to call again from the defer
	// above — presumably idempotent; confirm against the Pipeline contract.
	pipeline.Close()

	span.SetStatus(codes.Ok, "")
	// explicitly call End() before exporting even though we have a defer above.
	// It is safe to call End() multiple times.
	span.End()
	capture.End()

	// Summary values must be collected after capture.End() so the capture is
	// finalized before being summarized.
	logValues = append(logValues, xcap.SummaryLogValues(capture)...)
	level.Info(logger).Log(
		logValues...,
	)

	// TODO: capture and report queue time
	md := metadata.FromContext(ctx)
	stats := capture.ToStatsSummary(durFull, 0, builder.Len())
	result := builder.Build(stats, md)
	return result, nil
}

// buildContext initializes a request-scoped context prior to execution.
func (e *Engine) buildContext(ctx context.Context) context.Context {
	metadataContext, ctx := metadata.NewContext(ctx)

	// Inject the range config into the context for any calls to
	// [rangeio.ReadRanges] to make use of.
	ctx = rangeio.WithConfig(ctx, &e.cfg.Executor.RangeConfig)

	metadataContext.AddWarning("Query was executed using the new experimental query engine and dataobj storage.")
	return ctx
}

// injectQueryTags adds query tags as key-value pairs from the context into the
// given logger, if they have been defined via [httpreq.InjectQueryTags].
// Otherwise, the original logger is returned unmodified.
func injectQueryTags(ctx context.Context, logger log.Logger) log.Logger {
	tags := httpreq.ExtractQueryTagsFromContext(ctx)
	if len(tags) == 0 {
		return logger
	}
	return log.With(logger, httpreq.TagsToKeyValues(tags)...)
}

// isMetricQuery returns true if the given expression is a metric query,
// false if it is a log query.
func isMetricQuery(expr syntax.Expr) bool {
	_, ok := expr.(syntax.SampleExpr)
	return ok
}

// buildLogicalPlan builds a logical plan from the given params.
// buildLogicalPlan builds and optimizes a logical plan for params, folding in
// any query-time delete requests for the tenant. It returns the plan, the
// planning duration, and an error; planner failures map to [ErrNotSupported],
// while delete-request lookup failures are returned wrapped.
func (e *Engine) buildLogicalPlan(ctx context.Context, logger log.Logger, params logql.Params) (*logical.Plan, time.Duration, error) {
	span := trace.SpanFromContext(ctx)
	timer := prometheus.NewTimer(e.metrics.logicalPlanning)

	// Delete requests are only fetched when a getter is configured; otherwise
	// the plan is built without deletes.
	var deleteReqs []*deletion.Request
	if e.deleteGetter != nil {
		var err error
		deleteReqs, err = deletion.DeletesForUser(ctx, params.Start(), params.End(), e.deleteGetter)
		if err != nil {
			return nil, 0, fmt.Errorf("failed to get delete requests: %w", err)
		}
	}

	logicalPlan, err := logical.BuildPlanWithDeletes(ctx, params, deleteReqs)
	if err != nil {
		level.Warn(logger).Log("msg", "failed to create logical plan", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}
	// Optimize mutates the plan in place.
	if err := logical.Optimize(logicalPlan); err != nil {
		level.Warn(logger).Log("msg", "failed to optimize logical plan", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}

	duration := timer.ObserveDuration()
	level.Info(logger).Log(
		"msg", "finished logical planning",
		"plan", logicalPlan.String(),
		"duration", duration.String(),
	)
	span.AddEvent("finished logical planning",
		trace.WithAttributes(
			attribute.Stringer("plan", logicalPlan),
			attribute.Stringer("duration", duration),
		),
	)
	return logicalPlan, duration, nil
}

// buildPhysicalPlan builds a physical plan from the given logical plan.
// The plan is built, optimized, and then wrapped with batching using the
// configured executor batch size; all planner failures map to
// [ErrNotSupported].
func (e *Engine) buildPhysicalPlan(ctx context.Context, tenantID string, logger log.Logger, params logql.Params, logicalPlan *logical.Plan, taskCacheEnabled bool) (*physical.Plan, time.Duration, error) {
	span := trace.SpanFromContext(ctx)
	timer := prometheus.NewTimer(e.metrics.physicalPlanning)

	catalog := physical.NewMetastoreCatalog(e.metastoreSectionsResolver(ctx, tenantID, logger, taskCacheEnabled))

	// TODO(rfratto): It feels strange that we need to pass the start/end time
	// to the physical planner. Isn't it already represented by the logical
	// plan?
	plannerCtx := physical.NewContext(params.Start(), params.End())

	// Get the tenant's MaxQuerySeries limit and pass it to the planner context if enforcement is enabled
	if e.cfg.EnforceQuerySeriesLimit {
		plannerCtx = plannerCtx.WithMaxQuerySeries(e.limits.MaxQuerySeries(ctx, tenantID))
	}

	planner := physical.NewPlanner(plannerCtx, catalog)
	physicalPlan, err := planner.Build(logicalPlan)
	if err != nil {
		level.Warn(logger).Log("msg", "failed to create physical plan", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}
	physicalPlan, err = planner.Optimize(physicalPlan)
	if err != nil {
		level.Warn(logger).Log("msg", "failed to optimize physical plan", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}
	physicalPlan, err = physical.WrapWithBatching(physicalPlan, e.cfg.Executor.BatchSize)
	if err != nil {
		level.Warn(logger).Log("msg", "failed to wrap physical plan with batching", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}

	duration := timer.ObserveDuration()
	level.Info(logger).Log(
		"msg", "finished physical planning",
		"plan", physical.PrintAsTree(physicalPlan),
		"duration", duration.String(),
	)
	span.AddEvent("finished physical planning", trace.WithAttributes(attribute.Stringer("duration", duration)))
	return physicalPlan, duration, nil
}

// metastoreSectionsResolver returns a resolver that runs a metastore plan as a
// nested workflow and collects the matching data object section descriptors.
// NOTE(review): the returned closure captures ctx from plan time rather than
// taking one per call — presumably intentional since resolution happens during
// planning; confirm if the resolver can outlive the request.
func (e *Engine) metastoreSectionsResolver(ctx context.Context, tenantID string, logger log.Logger, cacheEnabled bool) physical.MetastoreSectionsResolver {
	planner := physical.NewMetastorePlanner(e.metastore, e.cfg.Executor.BatchSize)
	logger = log.With(logger, "subcomponent", "metastore")

	return func(selector physical.Expression, predicates []physical.Expression, start time.Time, end time.Time) ([]*metastore.DataobjSectionDescriptor, error) {
		ctx, span := xcap.StartSpan(ctx, tracer, "engine.metastoreResolver")
		defer span.End()

		plan, err := planner.Plan(ctx, selector, predicates, start, end)
		if err != nil {
			return nil, fmt.Errorf("metastore: build plan: %w", err)
		}

		// Disable admission lanes for metastore queries
		useAdmissionLanes := false
		wf, _, err := e.buildWorkflow(ctx, tenantID, logger, plan, useAdmissionLanes, cacheEnabled)
		if err != nil {
			return nil, fmt.Errorf("metastore: build workflow: %w", err)
		}
		defer wf.Close()

		pipeline, err := wf.Run(ctx)
		if err != nil {
			return nil, fmt.Errorf("metastore: run workflow: %w", err)
		}
		reader := executor.TranslateEOF(pipeline)
		defer reader.Close()

		if err := reader.Open(ctx); err != nil {
			return nil, fmt.Errorf("metastore: open pipeline: %w", err)
		}

		resp, err := e.metastore.CollectSections(ctx, metastore.CollectSectionsRequest{
			// externalize EOFs returned by executor pipelines (executor.EOF -> io.EOF)
			// because metastore is not aware of executor implementation details
			Reader: reader,
		})
		if err != nil {
			return nil, fmt.Errorf("metastore: collect sections: %w", err)
		}

		// close to report the stats
		reader.Close()
		return resp.SectionsResponse.Sections, nil
	}
}

// buildWorkflow builds a workflow from the given physical plan.
func (e *Engine) buildWorkflow(ctx context.Context, tenantID string, logger log.Logger, physicalPlan *physical.Plan, useAdmissionLanes bool, cacheEnabled bool) (*workflow.Workflow, time.Duration, error) {
	span := trace.SpanFromContext(ctx)
	timer := prometheus.NewTimer(e.metrics.workflowPlanning)

	maxRunningScanTasks := 0
	// Set max running tasks limits on when admission lanes are enabled
	if useAdmissionLanes {
		maxRunningScanTasks = e.limits.MaxScanTaskParallelism(tenantID)
	}

	opts := workflow.Options{
		Tenant:                       tenantID,
		Actor:                        httpreq.ExtractActorPath(ctx),
		MaxRunningScanTasks:          maxRunningScanTasks,
		MaxRunningOtherTasks:         0,
		CacheEnabled:                 cacheEnabled,
		MaxTaskCacheSize:             uint64(e.cfg.Executor.TaskResultsCache.TaskResultMaxCacheableSize),
		MaxDataObjScanCacheSize:      uint64(e.cfg.Executor.TaskResultsCache.DataObjScanResultMaxCacheableSize),
		CacheCompression:             e.cfg.Executor.TaskResultsCache.Compression,
		PruneEmptyCachedTasks:        e.cfg.Executor.TaskResultsCache.PruneEmptyCachedTasks,
		NonEmptyCachedTasksMaxSize:   uint64(e.cfg.Executor.TaskResultsCache.PruneCachedTasksMaxSize),
		PruneCachedTasksFetchTimeout: e.cfg.Executor.TaskResultsCache.PruneCachedTasksFetchTimeout,
		TaskCacheRegistry:            e.taskCaches,
		DebugTasks:                   e.limits.DebugEngineTasks(tenantID),
		DebugStreams:                 e.limits.DebugEngineStreams(tenantID),
	}

	wf, err := workflow.New(opts, logger, e.scheduler.inner, physicalPlan)
	if err != nil {
		level.Warn(logger).Log("msg", "failed to create workflow", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}

	duration := timer.ObserveDuration()

	// The execution plan can be way more verbose than the physical plan, so we
	// only log it at debug level.
	level.Debug(logger).Log(
		"msg", "generated execution plan",
		"plan", workflow.Sprint(wf),
	)
	level.Info(logger).Log(
		"msg", "finished execution planning",
		"duration", duration.String(),
		"tasks", wf.Len(),
	)
	span.AddEvent("finished execution planning", trace.WithAttributes(attribute.Stringer("duration", duration)))
	return wf, duration, nil
}

// collectResult processes the results of the execution plan.
// The result builder is chosen from the expression type: streams for log
// selectors, matrix for range metric queries (step > 0), vector otherwise.
// Records are drained from the pipeline until [executor.EOF].
func (e *Engine) collectResult(ctx context.Context, logger log.Logger, params logql.Params, pipeline executor.Pipeline) (ResultBuilder, time.Duration, error) {
	span := trace.SpanFromContext(ctx)
	timer := prometheus.NewTimer(e.metrics.execution)

	encodingFlags := httpreq.ExtractEncodingFlagsFromCtx(ctx)

	var builder ResultBuilder
	switch params.GetExpression().(type) {
	case syntax.LogSelectorExpr:
		builder = newStreamsResultBuilder(params.Direction(), encodingFlags.Has(httpreq.FlagCategorizeLabels))
	case syntax.SampleExpr:
		if params.Step() > 0 {
			builder = newMatrixResultBuilder()
		} else {
			builder = newVectorResultBuilder()
		}
	default:
		// This should never trigger since we already checked the expression
		// type in the logical planner.
		panic(fmt.Sprintf("invalid expression type %T", params.GetExpression()))
	}

	if err := pipeline.Open(ctx); err != nil {
		return nil, time.Duration(0), err
	}

	for {
		rec, err := pipeline.Read(ctx)
		if err != nil {
			// EOF is the normal end-of-stream signal, not a failure.
			if errors.Is(err, executor.EOF) {
				break
			}
			level.Warn(logger).Log(
				"msg", "error during execution",
				"err", err,
			)
			return builder, time.Duration(0), err
		}
		builder.CollectRecord(rec)
	}
	duration := timer.ObserveDuration()

	// We don't log an event here because a final event will be logged by the
	// caller noting that execution completed.
	span.AddEvent("finished execution",
		trace.WithAttributes(attribute.Stringer("duration", duration)),
	)
	return builder, duration, nil
}