package executor import ( "context" "errors" "fmt" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" "github.com/grafana/loki/v3/pkg/dataobj/sections/streams" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/physical" ) type Config struct { BatchSize int64 Bucket objstore.Bucket DataobjScanPageCacheSize int64 } func Run(ctx context.Context, cfg Config, plan *physical.Plan, logger log.Logger) Pipeline { c := &Context{ plan: plan, batchSize: cfg.BatchSize, bucket: cfg.Bucket, logger: logger, dataobjScanPageCacheSize: cfg.DataobjScanPageCacheSize, } if plan == nil { return errorPipeline(errors.New("plan is nil")) } node, err := plan.Root() if err != nil { return errorPipeline(err) } return c.execute(ctx, node) } // Context is the execution context type Context struct { batchSize int64 logger log.Logger plan *physical.Plan evaluator expressionEvaluator bucket objstore.Bucket dataobjScanPageCacheSize int64 } func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline { children := c.plan.Children(node) inputs := make([]Pipeline, 0, len(children)) for _, child := range children { inputs = append(inputs, c.execute(ctx, child)) } switch n := node.(type) { case *physical.DataObjScan: return c.executeDataObjScan(ctx, n) case *physical.SortMerge: return c.executeSortMerge(ctx, n, inputs) case *physical.Limit: return c.executeLimit(ctx, n, inputs) case *physical.Filter: return c.executeFilter(ctx, n, inputs) case *physical.Projection: return c.executeProjection(ctx, n, inputs) case *physical.RangeAggregation: return c.executeRangeAggregation(ctx, n, inputs) case *physical.VectorAggregation: return c.executeVectorAggregation(ctx, n, inputs) default: return errorPipeline(fmt.Errorf("invalid node type: %T", node)) } } func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObjScan) Pipeline { if c.bucket == nil { return errorPipeline(errors.New("no object store bucket configured")) } obj, err := dataobj.FromBucket(ctx, c.bucket, string(node.Location)) if err != nil { return errorPipeline(fmt.Errorf("creating data object: %w", err)) } var ( streamsSection *streams.Section logsSection *logs.Section ) for _, sec := range obj.Sections().Filter(streams.CheckSection) { if streamsSection != nil { return errorPipeline(fmt.Errorf("multiple streams sections found in data object %q", node.Location)) } var err error streamsSection, err = streams.Open(ctx, sec) if err != nil { return errorPipeline(fmt.Errorf("opening streams section %q: %w", sec.Type, err)) } } if streamsSection == nil { return errorPipeline(fmt.Errorf("streams section not found in data object %q", node.Location)) } for i, sec := range obj.Sections().Filter(logs.CheckSection) { if i != node.Section { continue } var err error logsSection, err = logs.Open(ctx, sec) if err != nil { return errorPipeline(fmt.Errorf("opening logs section %q: %w", sec.Type, err)) } } if logsSection == nil { return errorPipeline(fmt.Errorf("logs section %d not found in data object %q", node.Section, node.Location)) } predicates := make([]logs.Predicate, 0, len(node.Predicates)) for _, p := range node.Predicates { conv, err := buildLogsPredicate(p, logsSection.Columns()) if err != nil { return errorPipeline(err) } predicates = append(predicates, conv) } var pipeline Pipeline = newDataobjScanPipeline(dataobjScanOptions{ // TODO(rfratto): passing the streams section means that each DataObjScan // will read the entire streams section (for IDs being loaded), which is // going to be quite a bit of wasted effort. // // Longer term, there should be a dedicated plan node which handles joining // streams and log records based on StreamID, which is shared between all // sections in the same object. StreamsSection: streamsSection, LogsSection: logsSection, StreamIDs: node.StreamIDs, Predicates: predicates, Projections: node.Projections, // TODO(rfratto): pass custom allocator Allocator: memory.DefaultAllocator, BatchSize: c.batchSize, CacheSize: int(c.dataobjScanPageCacheSize), }) sortType, sortDirection, err := logsSection.PrimarySortOrder() if err != nil { level.Warn(c.logger).Log("msg", "could not determine primary sort order for logs section, forcing topk sort", "err", err) sortType = logs.ColumnTypeInvalid sortDirection = logs.SortDirectionUnspecified } // Wrap our pipeline to enforce expected sorting. We always emit logs in // timestamp-sorted order, so we need to sort if either the section doesn't // match the expected sort order or the requested sort type is not timestamp. // // If it's already sorted, we wrap by LimitPipeline to enforce the limit // given to the node (if defined). if node.Direction != physical.UNSORTED && (node.Direction != logsSortOrder(sortDirection) || sortType != logs.ColumnTypeTimestamp) { level.Debug(c.logger).Log("msg", "sorting logs section", "source_sort", sortType, "source_direction", sortDirection, "requested_sort", logs.ColumnTypeTimestamp, "requested_dir", node.Direction) pipeline, err = newTopkPipeline(topkOptions{ Inputs: []Pipeline{pipeline}, SortBy: []physical.ColumnExpression{ &physical.ColumnExpr{ Ref: types.ColumnRef{ Column: types.ColumnNameBuiltinTimestamp, Type: types.ColumnTypeBuiltin, }, }, }, Ascending: node.Direction == physical.ASC, K: int(node.Limit), MaxUnused: int(c.batchSize) * 2, }) if err != nil { return errorPipeline(err) } } else if node.Limit > 0 { pipeline = NewLimitPipeline(pipeline, 0, node.Limit) } return pipeline } func logsSortOrder(dir logs.SortDirection) physical.SortOrder { switch dir { case logs.SortDirectionAscending: return physical.ASC case logs.SortDirectionDescending: return physical.DESC } return physical.UNSORTED } func (c *Context) executeSortMerge(_ context.Context, sortmerge *physical.SortMerge, inputs []Pipeline) Pipeline { if len(inputs) == 0 { return emptyPipeline() } pipeline, err := NewSortMergePipeline(inputs, sortmerge.Order, sortmerge.Column, c.evaluator) if err != nil { return errorPipeline(err) } return pipeline } func (c *Context) executeLimit(_ context.Context, limit *physical.Limit, inputs []Pipeline) Pipeline { if len(inputs) == 0 { return emptyPipeline() } if len(inputs) > 1 { return errorPipeline(fmt.Errorf("limit expects exactly one input, got %d", len(inputs))) } return NewLimitPipeline(inputs[0], limit.Skip, limit.Fetch) } func (c *Context) executeFilter(_ context.Context, filter *physical.Filter, inputs []Pipeline) Pipeline { if len(inputs) == 0 { return emptyPipeline() } // TODO: support multiple inputs if len(inputs) > 1 { return errorPipeline(fmt.Errorf("filter expects exactly one input, got %d", len(inputs))) } return NewFilterPipeline(filter, inputs[0], c.evaluator) } func (c *Context) executeProjection(_ context.Context, proj *physical.Projection, inputs []Pipeline) Pipeline { if len(inputs) == 0 { return emptyPipeline() } if len(inputs) > 1 { // unsupported for now return errorPipeline(fmt.Errorf("projection expects exactly one input, got %d", len(inputs))) } if len(proj.Columns) == 0 { return errorPipeline(fmt.Errorf("projection expects at least one column, got 0")) } p, err := NewProjectPipeline(inputs[0], proj.Columns, &c.evaluator) if err != nil { return errorPipeline(err) } return p } func (c *Context) executeRangeAggregation(_ context.Context, plan *physical.RangeAggregation, inputs []Pipeline) Pipeline { if len(inputs) == 0 { return emptyPipeline() } pipeline, err := NewRangeAggregationPipeline(inputs, c.evaluator, rangeAggregationOptions{ partitionBy: plan.PartitionBy, startTs: plan.Start, endTs: plan.End, rangeInterval: plan.Range, step: plan.Step, }) if err != nil { return errorPipeline(err) } return pipeline } func (c *Context) executeVectorAggregation(_ context.Context, plan *physical.VectorAggregation, inputs []Pipeline) Pipeline { if len(inputs) == 0 { return emptyPipeline() } pipeline, err := NewVectorAggregationPipeline(inputs, plan.GroupBy, c.evaluator) if err != nil { return errorPipeline(err) } return pipeline }