Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/engine.go

611 lines
23 KiB

package engine
import (
"context"
"errors"
"flag"
"fmt"
"net/http"
gotrace "runtime/trace"
"strings"
"time"
"github.com/alecthomas/units"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/engine/internal/deletion"
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/logical"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util/httpreq"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/rangeio"
"github.com/grafana/loki/v3/pkg/xcap"
)
// Sentinel errors returned by [Engine.Execute].
var (
// ErrPlanningFailed is returned when query planning fails unexpectedly.
// [Engine.Execute] returns it when building the physical plan or the
// workflow fails. It is not used for unimplemented features, which return
// [ErrNotSupported] instead.
ErrPlanningFailed = errors.New("query planning failed unexpectedly")
// ErrSchedulingFailed is returned when communication with the scheduler fails.
// [Engine.Execute] returns it when running the workflow reports an error.
ErrSchedulingFailed = errors.New("failed to schedule query")
)
// tracer is the package-level OpenTelemetry tracer, named "pkg/engine", that
// is passed to xcap.StartSpan whenever this package starts a span.
var tracer = otel.Tracer("pkg/engine")
// Aliases that re-export types from the internal executor package so that
// callers of this package can name them.
type (
// RequestStreamFilterer creates a StreamFilterer for a given request context.
// It is the type of [ExecutorConfig.StreamFilterer].
RequestStreamFilterer = executor.RequestStreamFilterer
// StreamFilterer filters streams based on their labels.
StreamFilterer = executor.StreamFilterer
)
// ExecutorConfig configures engine execution. Defaults for the flag-backed
// fields are applied by RegisterFlagsWithPrefix.
type ExecutorConfig struct {
// BatchSize is the batch size of the v2 execution engine. The flag default
// is 100; constructing an [Engine] fails when the value is not greater
// than 0.
BatchSize int `yaml:"batch_size" category:"experimental"`
// PrefetchBytes controls the number of bytes read ahead from a data object
// when opening it. The flag default is 16KiB.
PrefetchBytes flagext.Bytes `yaml:"prefetch_bytes" category:"experimental"`
// MergePrefetchCount controls the number of inputs that are prefetched
// simultaneously by any Merge node. Per the flag help: 0 prefetches only the
// currently processed input, 1 only the next input, and so on; a negative
// value prefetches all inputs in parallel.
MergePrefetchCount int `yaml:"merge_prefetch_count" category:"experimental"`
// RangeConfig determines how to optimize range reads in the V2 engine. The
// engine injects a pointer to it into each query's context.
RangeConfig rangeio.Config `yaml:"range_reads" category:"experimental" doc:"description=Configures how to read byte ranges from object storage when using the V2 engine."`
// StreamFilterer is an optional filterer that can filter streams based on their labels.
// When set, streams are filtered before scanning. It is not configurable
// through YAML.
StreamFilterer executor.RequestStreamFilterer `yaml:"-"`
// TaskResultsCache configures the backing cache for task results.
TaskResultsCache TaskCacheConfig `yaml:"task_results_cache" category:"experimental"`
}
// RegisterFlagsWithPrefix registers the executor flags on f, prepending prefix
// to every flag name. The nested range-read and task-results-cache
// configurations are registered under the "range-reads." and
// "task-results-cache." sub-prefixes respectively.
func (cfg *ExecutorConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	const (
		defaultBatchSize          = 100
		defaultMergePrefetchCount = 0

		batchSizeHelp          = "Experimental: Batch size of the next generation query engine."
		prefetchBytesHelp      = "Experimental: Number of bytes to prefetch when opening a data object for decoding metadata and overlapping section reads. Clamps to at least 16KiB."
		mergePrefetchCountHelp = "Experimental: The number of inputs that are prefetched simultaneously by any Merge node. A value of 0 means that only the currently processed input is prefetched, 1 means that only the next input is prefetched, and so on. A negative value means that all inputs are be prefetched in parallel."
	)

	// The default has to be in place before f.Var runs: the flag package
	// records the value's current string form as the flag's default at
	// registration time.
	cfg.PrefetchBytes = flagext.Bytes(16 * units.KiB)

	f.IntVar(&cfg.BatchSize, prefix+"batch-size", defaultBatchSize, batchSizeHelp)
	f.Var(&cfg.PrefetchBytes, prefix+"prefetch-bytes", prefetchBytesHelp)
	f.IntVar(&cfg.MergePrefetchCount, prefix+"merge-prefetch-count", defaultMergePrefetchCount, mergePrefetchCountHelp)

	// Nested configurations register their own flags beneath a sub-prefix.
	cfg.RangeConfig.RegisterFlags(prefix+"range-reads.", f)
	cfg.TaskResultsCache.RegisterFlagsWithPrefix(prefix+"task-results-cache.", f)
}
// TaskCacheConfig extends resultscache.Config with additional task-cache-specific settings.
type TaskCacheConfig struct {
// Config is the embedded generic results-cache configuration; its YAML
// fields are inlined into this struct.
resultscache.Config `yaml:",inline"`
// TaskResultMaxCacheableSize is the maximum size for a task result to be
// cacheable. 0 means only empty responses are cached.
TaskResultMaxCacheableSize flagext.Bytes `yaml:"task_result_max_cacheable_size" category:"experimental"`
// DataObjScanResultMaxCacheableSize is the maximum size for a DataObjScan
// result to be cacheable. 0 means only empty responses are cached.
DataObjScanResultMaxCacheableSize flagext.Bytes `yaml:"dataobjscan_result_max_cacheable_size" category:"experimental"`
// PruneEmptyCachedTasks makes the scheduler check cached results at plan
// time and prune tasks whose cached result is known to be empty. The engine
// only creates its task cache registry when this is enabled.
PruneEmptyCachedTasks bool `yaml:"prune_empty_cached_tasks" category:"experimental"`
// PruneCachedTasksMaxSize is the maximum total size of non-empty cached task
// results embedded in task assignments. 0 disables non-empty task pruning.
PruneCachedTasksMaxSize flagext.Bytes `yaml:"prune_cached_tasks_max_size" category:"experimental"`
// PruneCachedTasksFetchTimeout is the timeout for cache fetch operations
// during cached-task pruning at plan time. The flag default is one second;
// 0 disables the timeout.
PruneCachedTasksFetchTimeout time.Duration `yaml:"prune_cached_tasks_fetch_timeout" category:"experimental"`
}
// RegisterFlagsWithPrefix registers the task-cache flags on f, prepending
// prefix to every flag name. The embedded resultscache.Config registers its
// own flags under the same prefix first.
func (cfg *TaskCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	// flagName builds the fully-qualified name of a flag owned by this config.
	flagName := func(suffix string) string { return prefix + suffix }

	cfg.Config.RegisterFlagsWithPrefix(f, prefix)

	// Size thresholds that decide whether a result may be cached at all.
	f.Var(
		&cfg.TaskResultMaxCacheableSize,
		flagName("task-result-max-cacheable-size"),
		"Experimental: Maximum size for a task result to be cacheable. 0 means only empty responses are cached.",
	)
	f.Var(
		&cfg.DataObjScanResultMaxCacheableSize,
		flagName("dataobjscan-result-max-cacheable-size"),
		"Experimental: Maximum size for a DataObjScan result to be cacheable. 0 means only empty responses are cached.",
	)

	// Plan-time pruning of tasks based on cached results.
	f.BoolVar(
		&cfg.PruneEmptyCachedTasks,
		flagName("prune-empty-cached-tasks"),
		false,
		"Experimental: When enabled, the scheduler checks cached results at plan time and prunes tasks whose cached result is known to be empty.",
	)
	f.Var(
		&cfg.PruneCachedTasksMaxSize,
		flagName("prune-cached-tasks-max-size"),
		"Experimental: Maximum total size of non-empty cached task results embedded in task assignments. "+
			"Results that would exceed the budget are skipped (smaller results that fit are still included). "+
			"0 disables non-empty task pruning.",
	)
	f.DurationVar(
		&cfg.PruneCachedTasksFetchTimeout,
		flagName("prune-cached-tasks-fetch-timeout"),
		time.Second,
		"Experimental: Timeout for cache fetch operations during cached-task pruning at plan time. 0 disables the timeout.",
	)
}
// Params holds parameters for constructing a new [Engine]. Validation fills in
// defaults for Logger and Registerer and rejects a missing Scheduler or
// Metastore, as well as a non-positive executor batch size.
type Params struct {
Logger log.Logger // Logger for optional log messages; a no-op logger is used when nil.
Registerer prometheus.Registerer // Registerer for optional metrics; a fresh registry is used when nil.
Config Config // Config for the Engine.
Scheduler *Scheduler // Scheduler to manage the execution of tasks. Required.
Metastore metastore.Metastore // Metastore to access the indexes. Required.
Limits logql.Limits // Limits to apply to engine queries. NOTE(review): not validated, but workflow planning calls methods on it unconditionally.
DeleteGetter deletion.Getter // DeleteGetter to fetch delete requests for query-time filtering. Optional; delete requests are not fetched when nil.
}
// validate checks that p is usable for constructing an [Engine]. Optional
// dependencies that were left nil are replaced with defaults in place; a
// missing required dependency or a non-positive batch size is reported as an
// error.
func (p *Params) validate() error {
	// Optional dependencies fall back to inert defaults.
	if p.Logger == nil {
		p.Logger = log.NewNopLogger()
	}
	if p.Registerer == nil {
		p.Registerer = prometheus.NewRegistry()
	}

	// Required settings; the first violation wins.
	switch {
	case p.Scheduler == nil:
		return errors.New("scheduler is required")
	case p.Metastore == nil:
		return errors.New("metastore is required")
	case p.Config.Executor.BatchSize <= 0:
		return fmt.Errorf("invalid batch size for query engine. must be greater than 0, got %d", p.Config.Executor.BatchSize)
	}
	return nil
}
// Engine plans and executes queries; see [Engine.Execute]. Construct it with
// [New], which copies these fields from [Params].
type Engine struct {
logger log.Logger // Base logger; Execute derives a per-query logger from it.
metrics *metrics // Planning/execution timers and the subqueries counter.
cfg Config // Engine configuration as passed in Params.
scheduler *Scheduler // Scheduler to manage the execution of tasks.
limits logql.Limits // Limits to apply to engine queries.
deleteGetter deletion.Getter // DeleteGetter to fetch delete requests for query-time filtering.
metastore metastore.Metastore // Metastore used to resolve data object sections during physical planning.
taskCaches executor.TaskCacheRegistry // Only created when PruneEmptyCachedTasks is enabled; zero value otherwise.
}
// New validates params (applying defaults for optional fields) and returns an
// Engine built from them. The task cache registry is only instantiated when
// pruning of empty cached tasks is enabled in the configuration.
func New(params Params) (*Engine, error) {
	if err := params.validate(); err != nil {
		return nil, err
	}

	var (
		cacheCfg = &params.Config.Executor.TaskResultsCache
		registry executor.TaskCacheRegistry
	)
	if cacheCfg.PruneEmptyCachedTasks {
		var err error
		if registry, err = executor.NewTaskCacheRegistry(cacheCfg.Config, params.Registerer, params.Logger); err != nil {
			return nil, fmt.Errorf("creating task cache registry: %w", err)
		}
	}

	return &Engine{
		logger:       params.Logger,
		metrics:      newMetrics(params.Registerer),
		cfg:          params.Config,
		scheduler:    params.Scheduler,
		limits:       params.Limits,
		deleteGetter: params.DeleteGetter,
		metastore:    params.Metastore,
		taskCaches:   registry,
	}, nil
}
// Execute executes the given query and returns its result.
//
// The query passes through logical planning, physical planning, workflow
// planning, workflow execution and result collection. Errors are mapped as
// follows:
//
//   - an HTTP 400 httpgrpc error if no tenant ID can be resolved from ctx;
//   - [ErrNotSupported] if logical planning fails, which includes queries that
//     are not yet implemented in the new engine;
//   - [ErrPlanningFailed] if building the physical plan or the workflow fails;
//   - [ErrSchedulingFailed] if running the workflow fails;
//   - the underlying error if collecting results from the pipeline fails.
//
// A zero [logqlmodel.Result] is returned alongside any error.
func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error) {
// NOTE(rfratto): To simplify the API, Engine does not directly implement
// [logql.Engine], whose interface definition is not useful to the V2
// engine. As such, callers must define adapters to use Engine as a
// [logql.Engine].
//
// This pain point will eventually go away as remaining usages of
// [logql.Engine] disappear.
// Starts the execution capture for the query.
// All recorded observations will be captured to it.
ctx, capture := xcap.NewCapture(ctx, nil)
defer capture.End()
// startTime is the reference point for the "duration_full" log value and
// the stats summary.
startTime := time.Now()
ctx, task := gotrace.NewTask(ctx, "Engine.Execute")
defer task.End()
tenantID, err := tenant.TenantID(ctx)
if err != nil {
return logqlmodel.Result{}, httpgrpc.Error(http.StatusBadRequest, err.Error())
}
// Task result caching requires both a configured cache and a request that
// has not disabled caching.
cacheEnabled := cache.IsCacheConfigured(e.cfg.Executor.TaskResultsCache.CacheConfig) && !params.CachingOptions().Disabled
ctx, span := xcap.StartSpan(ctx, tracer, "Engine.Execute",
trace.WithAttributes(
attribute.String("type", string(logql.GetRangeType(params))),
attribute.String("query", params.QueryString()),
attribute.Stringer("start", params.Start()),
attribute.Stringer("end", params.End()),
attribute.Stringer("step", params.Step()),
attribute.Stringer("length", params.End().Sub(params.Start())),
attribute.StringSlice("shards", params.Shards()),
attribute.Bool("cache_enabled", cacheEnabled),
),
)
defer span.End()
ctx = e.buildContext(ctx)
// Per-query logger: context fields, an engine marker, and any query tags.
logger := util_log.WithContext(ctx, e.logger)
logger = log.With(logger, "engine", "v2")
logger = injectQueryTags(ctx, logger)
level.Info(logger).Log("msg", "starting query", "query", params.QueryString(), "shard", strings.Join(params.Shards(), ","))
// Phase 1: logical planning. Any failure here is reported to the caller as
// ErrNotSupported and counted as "not implemented".
logicalPlan, durLogicalPlanning, err := e.buildLogicalPlan(ctx, logger, params)
if err != nil {
e.metrics.subqueries.WithLabelValues(statusNotImplemented).Inc()
span.SetStatus(codes.Error, "failed to create logical plan")
return logqlmodel.Result{}, ErrNotSupported
}
gotrace.Log(ctx, "logical_planning", "done")
// Phase 2: physical planning.
physicalPlan, durPhysicalPlanning, err := e.buildPhysicalPlan(ctx, tenantID, logger, params, logicalPlan, cacheEnabled)
if err != nil {
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
span.SetStatus(codes.Error, "failed to create physical plan")
return logqlmodel.Result{}, ErrPlanningFailed
}
gotrace.Log(ctx, "physical_planning", "done")
// Phase 3: workflow planning.
// Enable admission lanes only for log queries
useAdmissionLanes := !isMetricQuery(params.GetExpression())
wf, durWorkflowPlanning, err := e.buildWorkflow(ctx, tenantID, logger, physicalPlan, useAdmissionLanes, cacheEnabled)
if err != nil {
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
span.SetStatus(codes.Error, "failed to create execution plan")
return logqlmodel.Result{}, ErrPlanningFailed
}
defer wf.Close()
gotrace.Log(ctx, "workflow_planning", "done")
// Phase 4: run the workflow to obtain the pipeline that yields results.
pipeline, err := wf.Run(ctx)
if err != nil {
level.Error(logger).Log("msg", "failed to execute query", "err", err)
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
span.SetStatus(codes.Error, "failed to execute query")
return logqlmodel.Result{}, ErrSchedulingFailed
}
defer pipeline.Close()
// Phase 5: drain the pipeline into a result builder.
gotrace.Log(ctx, "collect_result", "start")
builder, durExecution, err := e.collectResult(ctx, logger, params, pipeline)
if err != nil {
e.metrics.subqueries.WithLabelValues(statusFailure).Inc()
span.SetStatus(codes.Error, "error during query execution")
return logqlmodel.Result{}, err
}
durFull := time.Since(startTime)
logValues := []any{
"msg", "finished executing",
"user_agent", httpreq.ExtractHeader(ctx, "User-Agent"),
"query", params.QueryString(),
"length", params.End().Sub(params.Start()).String(),
"step", params.Step().String(),
"duration_logical_planning", durLogicalPlanning,
"duration_physical_planning", durPhysicalPlanning,
"duration_workflow_planning", durWorkflowPlanning,
"duration_execution", durExecution,
"duration_full", durFull,
}
gotrace.Log(ctx, "collect_result", "done")
// Close the pipeline to calculate the stats. This happens ahead of the
// deferred Close so that the capture summary below includes them.
pipeline.Close()
span.SetStatus(codes.Ok, "")
// explicitly call End() before exporting even though we have a defer above.
// It is safe to call End() multiple times.
span.End()
capture.End()
// Append the capture's summary values to the final log line.
logValues = append(logValues, xcap.SummaryLogValues(capture)...)
level.Info(logger).Log(
logValues...,
)
// TODO: capture and report queue time
md := metadata.FromContext(ctx)
// The second argument is passed as 0; see the queue time TODO above.
stats := capture.ToStatsSummary(durFull, 0, builder.Len())
result := builder.Build(stats, md)
return result, nil
}
// buildContext prepares the request-scoped context used for execution: it
// attaches a fresh metadata context carrying the "experimental engine" warning
// and the engine's range-read configuration.
func (e *Engine) buildContext(ctx context.Context) context.Context {
	md, ctx := metadata.NewContext(ctx)
	md.AddWarning("Query was executed using the new experimental query engine and dataobj storage.")

	// Any call to [rangeio.ReadRanges] made with the returned context picks
	// up the engine's range config.
	return rangeio.WithConfig(ctx, &e.cfg.Executor.RangeConfig)
}
// injectQueryTags returns a logger decorated with the query tags found in ctx
// (as set via [httpreq.InjectQueryTags]), expressed as key-value pairs. When
// ctx carries no tags, logger is handed back untouched.
func injectQueryTags(ctx context.Context, logger log.Logger) log.Logger {
	if tags := httpreq.ExtractQueryTagsFromContext(ctx); len(tags) > 0 {
		return log.With(logger, httpreq.TagsToKeyValues(tags)...)
	}
	return logger
}
// isMetricQuery reports whether expr is a metric query, i.e. whether it
// implements [syntax.SampleExpr]. Every other expression, including a nil one,
// is treated as a log query.
func isMetricQuery(expr syntax.Expr) bool {
	switch expr.(type) {
	case syntax.SampleExpr:
		return true
	default:
		return false
	}
}
// buildLogicalPlan builds and optimizes a logical plan from the given params.
//
// When the engine has a delete getter, delete requests overlapping the query's
// time range are fetched first and handed to the planner.
//
// It returns the optimized plan together with the time spent planning. On
// failure the plan is nil and the duration is zero:
//
//   - fetching delete requests fails: the underlying error, wrapped;
//   - building or optimizing the plan fails: [ErrNotSupported].
//
// Every failure is logged at warn level and recorded on the span found in ctx,
// because the caller may replace the returned error with a sentinel and would
// otherwise lose the root cause.
func (e *Engine) buildLogicalPlan(ctx context.Context, logger log.Logger, params logql.Params) (*logical.Plan, time.Duration, error) {
	span := trace.SpanFromContext(ctx)
	timer := prometheus.NewTimer(e.metrics.logicalPlanning)

	var deleteReqs []*deletion.Request
	if e.deleteGetter != nil {
		var err error
		deleteReqs, err = deletion.DeletesForUser(ctx, params.Start(), params.End(), e.deleteGetter)
		if err != nil {
			// Report the failure here, consistent with the other error paths
			// below, so that it is not silently dropped.
			level.Warn(logger).Log("msg", "failed to get delete requests", "err", err)
			span.RecordError(err)
			return nil, 0, fmt.Errorf("failed to get delete requests: %w", err)
		}
	}

	logicalPlan, err := logical.BuildPlanWithDeletes(ctx, params, deleteReqs)
	if err != nil {
		level.Warn(logger).Log("msg", "failed to create logical plan", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}

	if err := logical.Optimize(logicalPlan); err != nil {
		level.Warn(logger).Log("msg", "failed to optimize logical plan", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}

	// The planning timer is only observed for successfully built plans.
	duration := timer.ObserveDuration()

	level.Info(logger).Log(
		"msg", "finished logical planning",
		"plan", logicalPlan.String(),
		"duration", duration.String(),
	)
	span.AddEvent("finished logical planning",
		trace.WithAttributes(
			attribute.Stringer("plan", logicalPlan),
			attribute.Stringer("duration", duration),
		),
	)

	return logicalPlan, duration, nil
}
// buildPhysicalPlan turns logicalPlan into an optimized physical plan wrapped
// with batching, and reports how long that took. Any failure along the way is
// logged, recorded on the span found in ctx, and surfaced as
// [ErrNotSupported] with a nil plan and zero duration.
func (e *Engine) buildPhysicalPlan(ctx context.Context, tenantID string, logger log.Logger, params logql.Params, logicalPlan *logical.Plan, taskCacheEnabled bool) (*physical.Plan, time.Duration, error) {
	span := trace.SpanFromContext(ctx)
	timer := prometheus.NewTimer(e.metrics.physicalPlanning)

	// fail reports a planning failure and produces this function's error
	// return values.
	fail := func(msg string, cause error) (*physical.Plan, time.Duration, error) {
		level.Warn(logger).Log("msg", msg, "err", cause)
		span.RecordError(cause)
		return nil, 0, ErrNotSupported
	}

	resolver := e.metastoreSectionsResolver(ctx, tenantID, logger, taskCacheEnabled)
	catalog := physical.NewMetastoreCatalog(resolver)

	// TODO(rfratto): It feels strange that we need to pass the start/end time
	// to the physical planner. Isn't it already represented by the logical
	// plan?
	plannerCtx := physical.NewContext(params.Start(), params.End())

	// When enforcement is enabled, hand the tenant's MaxQuerySeries limit to
	// the planner context.
	if e.cfg.EnforceQuerySeriesLimit {
		maxSeries := e.limits.MaxQuerySeries(ctx, tenantID)
		plannerCtx = plannerCtx.WithMaxQuerySeries(maxSeries)
	}

	planner := physical.NewPlanner(plannerCtx, catalog)

	plan, err := planner.Build(logicalPlan)
	if err != nil {
		return fail("failed to create physical plan", err)
	}

	if plan, err = planner.Optimize(plan); err != nil {
		return fail("failed to optimize physical plan", err)
	}

	if plan, err = physical.WrapWithBatching(plan, e.cfg.Executor.BatchSize); err != nil {
		return fail("failed to wrap physical plan with batching", err)
	}

	elapsed := timer.ObserveDuration()

	level.Info(logger).Log(
		"msg", "finished physical planning",
		"plan", physical.PrintAsTree(plan),
		"duration", elapsed.String(),
	)
	span.AddEvent(
		"finished physical planning",
		trace.WithAttributes(attribute.Stringer("duration", elapsed)),
	)

	return plan, elapsed, nil
}
// metastoreSectionsResolver returns a resolver that looks up data object
// sections by planning a metastore query, running it as a workflow through
// the engine, and collecting the sections from the resulting pipeline.
//
// The returned function captures ctx, tenantID, logger and cacheEnabled, so it
// is bound to the request it was created for. Errors are wrapped with a
// "metastore: <stage>" prefix.
func (e *Engine) metastoreSectionsResolver(ctx context.Context, tenantID string, logger log.Logger, cacheEnabled bool) physical.MetastoreSectionsResolver {
	// Disable admission lanes for metastore queries
	const useAdmissionLanes = false

	metaPlanner := physical.NewMetastorePlanner(e.metastore, e.cfg.Executor.BatchSize)
	metaLogger := log.With(logger, "subcomponent", "metastore")

	// wrap annotates cause with the resolver stage that failed.
	wrap := func(stage string, cause error) error {
		return fmt.Errorf("metastore: %s: %w", stage, cause)
	}

	return func(selector physical.Expression, predicates []physical.Expression, start time.Time, end time.Time) ([]*metastore.DataobjSectionDescriptor, error) {
		spanCtx, span := xcap.StartSpan(ctx, tracer, "engine.metastoreResolver")
		defer span.End()

		metaPlan, err := metaPlanner.Plan(spanCtx, selector, predicates, start, end)
		if err != nil {
			return nil, wrap("build plan", err)
		}

		wf, _, err := e.buildWorkflow(spanCtx, tenantID, metaLogger, metaPlan, useAdmissionLanes, cacheEnabled)
		if err != nil {
			return nil, wrap("build workflow", err)
		}
		defer wf.Close()

		pipeline, err := wf.Run(spanCtx)
		if err != nil {
			return nil, wrap("run workflow", err)
		}

		// externalize EOFs returned by executor pipelines (executor.EOF -> io.EOF)
		// because metastore is not aware about executor implementation details
		sectionReader := executor.TranslateEOF(pipeline)
		defer sectionReader.Close()

		if err := sectionReader.Open(spanCtx); err != nil {
			return nil, wrap("open pipeline", err)
		}

		req := metastore.CollectSectionsRequest{Reader: sectionReader}
		resp, err := e.metastore.CollectSections(spanCtx, req)
		if err != nil {
			return nil, wrap("collect sections", err)
		}

		// close to report the stats
		sectionReader.Close()

		return resp.SectionsResponse.Sections, nil
	}
}
// buildWorkflow creates a workflow for physicalPlan on the engine's scheduler
// and reports how long that took. Workflow options are derived from the
// tenant's limits and the task results cache configuration. If the workflow
// cannot be created, the failure is logged, recorded on the span found in
// ctx, and surfaced as [ErrNotSupported].
func (e *Engine) buildWorkflow(ctx context.Context, tenantID string, logger log.Logger, physicalPlan *physical.Plan, useAdmissionLanes bool, cacheEnabled bool) (*workflow.Workflow, time.Duration, error) {
	span := trace.SpanFromContext(ctx)
	timer := prometheus.NewTimer(e.metrics.workflowPlanning)

	// The scan task limit stays at zero unless admission lanes are enabled,
	// in which case the tenant's scan parallelism limit applies.
	var scanTaskLimit int
	if useAdmissionLanes {
		scanTaskLimit = e.limits.MaxScanTaskParallelism(tenantID)
	}

	cacheCfg := &e.cfg.Executor.TaskResultsCache

	opts := workflow.Options{
		Tenant: tenantID,
		Actor:  httpreq.ExtractActorPath(ctx),

		MaxRunningScanTasks:  scanTaskLimit,
		MaxRunningOtherTasks: 0,

		CacheEnabled:                 cacheEnabled,
		MaxTaskCacheSize:             uint64(cacheCfg.TaskResultMaxCacheableSize),
		MaxDataObjScanCacheSize:      uint64(cacheCfg.DataObjScanResultMaxCacheableSize),
		CacheCompression:             cacheCfg.Compression,
		PruneEmptyCachedTasks:        cacheCfg.PruneEmptyCachedTasks,
		NonEmptyCachedTasksMaxSize:   uint64(cacheCfg.PruneCachedTasksMaxSize),
		PruneCachedTasksFetchTimeout: cacheCfg.PruneCachedTasksFetchTimeout,
		TaskCacheRegistry:            e.taskCaches,

		DebugTasks:   e.limits.DebugEngineTasks(tenantID),
		DebugStreams: e.limits.DebugEngineStreams(tenantID),
	}

	wf, err := workflow.New(opts, logger, e.scheduler.inner, physicalPlan)
	if err != nil {
		level.Warn(logger).Log("msg", "failed to create workflow", "err", err)
		span.RecordError(err)
		return nil, 0, ErrNotSupported
	}

	elapsed := timer.ObserveDuration()

	// The execution plan tends to be far more verbose than the physical
	// plan, so it is only emitted at debug level.
	level.Debug(logger).Log("msg", "generated execution plan", "plan", workflow.Sprint(wf))
	level.Info(logger).Log(
		"msg", "finished execution planning",
		"duration", elapsed.String(),
		"tasks", wf.Len(),
	)
	span.AddEvent(
		"finished execution planning",
		trace.WithAttributes(attribute.Stringer("duration", elapsed)),
	)

	return wf, elapsed, nil
}
// collectResult opens pipeline and drains it into a result builder chosen by
// the query's expression type: streams for log selectors, and a matrix or
// vector for sample expressions depending on whether a positive step is set.
//
// It returns the populated builder and the execution duration. If opening the
// pipeline fails, the builder is nil; if reading fails, the partially filled
// builder is returned alongside the error. The duration is zero on any error.
// An expression of any other type causes a panic.
func (e *Engine) collectResult(ctx context.Context, logger log.Logger, params logql.Params, pipeline executor.Pipeline) (ResultBuilder, time.Duration, error) {
	span := trace.SpanFromContext(ctx)
	timer := prometheus.NewTimer(e.metrics.execution)
	flags := httpreq.ExtractEncodingFlagsFromCtx(ctx)

	var builder ResultBuilder
	switch expr := params.GetExpression().(type) {
	case syntax.LogSelectorExpr:
		categorize := flags.Has(httpreq.FlagCategorizeLabels)
		builder = newStreamsResultBuilder(params.Direction(), categorize)
	case syntax.SampleExpr:
		if params.Step() <= 0 {
			builder = newVectorResultBuilder()
		} else {
			builder = newMatrixResultBuilder()
		}
	default:
		// Not expected to happen: the logical planner has already checked
		// the expression type by the time results are collected.
		panic(fmt.Sprintf("invalid expression type %T", expr))
	}

	if err := pipeline.Open(ctx); err != nil {
		return nil, 0, err
	}

	// Feed every record into the builder until the pipeline signals EOF.
	for {
		rec, err := pipeline.Read(ctx)
		if errors.Is(err, executor.EOF) {
			break
		}
		if err != nil {
			level.Warn(logger).Log("msg", "error during execution", "err", err)
			return builder, 0, err
		}
		builder.CollectRecord(rec)
	}

	elapsed := timer.ObserveDuration()

	// No log line is written here; the caller logs a final message once
	// execution has completed.
	span.AddEvent(
		"finished execution",
		trace.WithAttributes(attribute.Stringer("duration", elapsed)),
	)

	return builder, elapsed, nil
}