// loki/pkg/engine/internal/executor/project.go
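//
// Projection operator of the query engine: builds pipelines that keep (KEEP),
// remove (DROP), or compute and append (EXPAND) columns of the Arrow record
// batches flowing through a query plan.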
package executor

import (
	"context"
	"fmt"
	"slices"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"

	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/xcap"
)
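// NewProjectPipeline returns a Pipeline that applies the projection proj to
// every record batch read from input. Depending on the projection flags it
// keeps only the referenced columns (KEEP), keeps everything except the
// referenced columns (DROP), or keeps all columns and appends the result of
// evaluating a single math expression (EXPAND). A projection that keeps all
// columns and neither drops nor expands returns input unchanged.
//
// As a rough sketch, a KEEP projection of a `level` label column might be
// constructed as below. The field names are taken from this file;
// types.ColumnTypeLabel is assumed to exist alongside types.ColumnTypeAmbiguous
// and is not verified against the planner package:
//
//	proj := &physical.Projection{
//		// All, Drop, and Expand are left false for a KEEP projection.
//		Expressions: []physical.Expression{
//			&physical.ColumnExpr{Ref: types.ColumnRef{
//				Column: "level",
//				Type:   types.ColumnTypeLabel,
//			}},
//		},
//	}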
func NewProjectPipeline(input Pipeline, proj *physical.Projection, evaluator *expressionEvaluator, region *xcap.Region) (Pipeline, error) {
	// Shortcut for ALL=true DROP=false EXPAND=false: the projection is a no-op.
	if proj.All && !proj.Drop && !proj.Expand {
		return input, nil
	}

	// Collect the column references and math expressions from the projection
	// expressions.
	colRefs := make([]types.ColumnRef, 0, len(proj.Expressions))
	expandExprs := make([]physical.Expression, 0, len(proj.Expressions))
	for i, expr := range proj.Expressions {
		switch expr := expr.(type) {
		case *physical.ColumnExpr:
			colRefs = append(colRefs, expr.Ref)
		case *physical.UnaryExpr, *physical.BinaryExpr, *physical.VariadicExpr:
			expandExprs = append(expandExprs, expr)
		default:
			return nil, fmt.Errorf("projection expression %d is unsupported: %T", i, expr)
		}
	}

	if len(expandExprs) > 1 {
		return nil, fmt.Errorf("only one math expression for the `value` column is allowed at a time")
	}
	// Create KEEP projection pipeline:
	// drop all columns except the ones referenced in proj.Expressions.
	if !proj.All && !proj.Drop && !proj.Expand {
		return newKeepPipeline(colRefs, func(refs []types.ColumnRef, ident *semconv.Identifier) bool {
			return slices.ContainsFunc(refs, func(ref types.ColumnRef) bool {
				// An ambiguous reference matches by name alone, regardless of
				// the column type.
				if ref.Type == types.ColumnTypeAmbiguous {
					return ref.Column == ident.ShortName()
				}
				// Otherwise keep the column only if both name and type match.
				return ref.Column == ident.ShortName() && ref.Type == ident.ColumnType()
			})
		}, input, region)
	}
	// Create DROP projection pipeline:
	// keep all columns except the ones referenced in proj.Expressions.
	if proj.All && proj.Drop {
		return newKeepPipeline(colRefs, func(refs []types.ColumnRef, ident *semconv.Identifier) bool {
			return !slices.ContainsFunc(refs, func(ref types.ColumnRef) bool {
				// An ambiguous reference matches by name alone, regardless of
				// the column type.
				if ref.Type == types.ColumnTypeAmbiguous {
					return ref.Column == ident.ShortName()
				}
				// Otherwise drop the column only if both name and type match.
				return ref.Column == ident.ShortName() && ref.Type == ident.ColumnType()
			})
		}, input, region)
	}
	// Create EXPAND projection pipeline:
	// keep all columns and expand the ones referenced in proj.Expressions.
	// TODO: as implemented, expanding and keeping/dropping cannot happen in the same projection. Is this desired?
	if proj.All && proj.Expand && len(expandExprs) > 0 {
		return newExpandPipeline(expandExprs[0], evaluator, input, region)
	}

	return nil, errNotImplemented
}
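// newKeepPipeline returns a pipeline that, for every batch read from input,
// copies over only those columns for which keepFunc reports true given the
// projected column references and the parsed column identifier. Both the KEEP
// and the DROP projection are built on this helper; DROP simply negates the
// match.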
func newKeepPipeline(colRefs []types.ColumnRef, keepFunc func([]types.ColumnRef, *semconv.Identifier) bool, input Pipeline, region *xcap.Region) (*GenericPipeline, error) {
	return newGenericPipelineWithRegion(func(ctx context.Context, inputs []Pipeline) (arrow.RecordBatch, error) {
		if len(inputs) != 1 {
			return nil, fmt.Errorf("expected 1 input, got %d", len(inputs))
		}

		input := inputs[0]
		batch, err := input.Read(ctx)
		if err != nil {
			return nil, err
		}

		// Copy over only the fields (and their backing arrays) that the
		// predicate selects.
		columns := make([]arrow.Array, 0, batch.NumCols())
		fields := make([]arrow.Field, 0, batch.NumCols())
		for i, field := range batch.Schema().Fields() {
			ident, err := semconv.ParseFQN(field.Name)
			if err != nil {
				return nil, err
			}
			if keepFunc(colRefs, ident) {
				columns = append(columns, batch.Column(i))
				fields = append(fields, field)
			}
		}

		metadata := batch.Schema().Metadata()
		schema := arrow.NewSchema(fields, &metadata)
		return array.NewRecordBatch(schema, columns, batch.NumRows()), nil
	}, region, input), nil
}
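// newExpandPipeline returns a pipeline that evaluates a single math expression
// against every batch read from input and appends the result as the `value`
// column (or, for struct results, as one output column per struct field). Any
// pre-existing `value` column is dropped first, so the evaluated expression
// effectively replaces it.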
func newExpandPipeline(expr physical.Expression, evaluator *expressionEvaluator, input Pipeline, region *xcap.Region) (*GenericPipeline, error) {
	return newGenericPipelineWithRegion(func(ctx context.Context, inputs []Pipeline) (arrow.RecordBatch, error) {
		if len(inputs) != 1 {
			return nil, fmt.Errorf("expected 1 input, got %d", len(inputs))
		}

		input := inputs[0]
		batch, err := input.Read(ctx)
		if err != nil {
			return nil, err
		}

		outputFields := make([]arrow.Field, 0)
		outputCols := make([]arrow.Array, 0)
		schema := batch.Schema()

		// Move all columns into the output except `value`, which is replaced
		// by the result of the evaluated expression below.
		for i, field := range schema.Fields() {
			ident, err := semconv.ParseFQN(field.Name)
			if err != nil {
				return nil, err
			}
			if !ident.Equal(semconv.ColumnIdentValue) {
				outputCols = append(outputCols, batch.Column(i))
				outputFields = append(outputFields, field)
			}
		}

		vec, err := evaluator.eval(expr, batch)
		if err != nil {
			return nil, err
		}
		if vec == nil {
			return batch, nil
		}

		switch arrCasted := vec.(type) {
		case *array.Struct:
			structSchema, ok := arrCasted.DataType().(*arrow.StructType)
			if !ok {
				return nil, fmt.Errorf("unexpected type returned from evaluation, expected *arrow.StructType, got %T", arrCasted.DataType())
			}
			// Append each field of the struct as its own output column.
			for i := range arrCasted.NumField() {
				outputCols = append(outputCols, arrCasted.Field(i))
				outputFields = append(outputFields, structSchema.Field(i))
			}
		case *array.Float64:
			outputFields = append(outputFields, semconv.FieldFromIdent(semconv.ColumnIdentValue, false))
			outputCols = append(outputCols, arrCasted)
		default:
			return nil, fmt.Errorf("unexpected type returned from evaluation: %T", arrCasted.DataType())
		}

		metadata := schema.Metadata()
		outputSchema := arrow.NewSchema(outputFields, &metadata)
		return array.NewRecordBatch(outputSchema, outputCols, batch.NumRows()), nil
	}, region, input), nil
}