loki/pkg/engine/internal/executor/vector_aggregate.go

package executor

import (
	"context"
	"errors"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"

	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/xcap"
)
// vectorAggregationPipeline is a pipeline that performs vector aggregations.
//
// It reads from the input pipeline, groups the data by specified columns,
// and applies the aggregation function on each group.
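//
// A minimal usage sketch (hypothetical caller; error handling elided):
//
//	p, err := newVectorAggregationPipeline(inputs, grouping, evaluator,
//		types.VectorAggregationTypeSum, region)
//	defer p.Close()
//	rec, err := p.Read(ctx) // one aggregated record; subsequent reads return EOF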
type vectorAggregationPipeline struct {
	inputs          []Pipeline
	inputsExhausted bool // indicates if all inputs are exhausted
	aggregator      *aggregator
	evaluator       expressionEvaluator
	grouping        physical.Grouping
	region          *xcap.Region

	tsEval    evalFunc // used to evaluate the timestamp column
	valueEval evalFunc // used to evaluate the value column
}
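
// vectorAggregationOperations maps each supported vector aggregation type to
// its aggregation operation; sum and count are the only ones wired up so far.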
var (
	vectorAggregationOperations = map[types.VectorAggregationType]aggregationOperation{
		types.VectorAggregationTypeSum:   aggregationOperationSum,
		types.VectorAggregationTypeCount: aggregationOperationCount,
	}
)
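
// newVectorAggregationPipeline creates a pipeline that groups input rows by
// timestamp and the grouping columns and folds each group with the given
// aggregation operation.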
func newVectorAggregationPipeline(inputs []Pipeline, grouping physical.Grouping, evaluator expressionEvaluator, operation types.VectorAggregationType, region *xcap.Region) (*vectorAggregationPipeline, error) {
	if len(inputs) == 0 {
		return nil, fmt.Errorf("vector aggregation expects at least one input")
	}

	op, ok := vectorAggregationOperations[operation]
	if !ok {
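		// An unknown operation indicates a bug in the planner rather than a
		// runtime condition, hence a panic instead of a returned error.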
		panic(fmt.Sprintf("unknown vector aggregation operation: %v", operation))
	}

	return &vectorAggregationPipeline{
		inputs:     inputs,
		evaluator:  evaluator,
		grouping:   grouping,
		aggregator: newAggregator(0, op),
		region:     region,
		tsEval: evaluator.newFunc(&physical.ColumnExpr{
			Ref: types.ColumnRef{
				Column: types.ColumnNameBuiltinTimestamp,
				Type:   types.ColumnTypeBuiltin,
			},
		}),
		valueEval: evaluator.newFunc(&physical.ColumnExpr{
			Ref: types.ColumnRef{
				Column: types.ColumnNameGeneratedValue,
				Type:   types.ColumnTypeGenerated,
			},
		}),
	}, nil
}
// Read reads from all inputs until they are exhausted and returns the
// aggregated result as a single record. Any subsequent call returns EOF.
func (v *vectorAggregationPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
	if v.inputsExhausted {
		return nil, EOF
	}
	return v.read(ctx)
}
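
// read drains all inputs, feeding every row into the aggregator, and builds
// a single output record once every input is exhausted.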
func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.RecordBatch, error) {
	v.aggregator.Reset() // reset before reading new inputs

	inputsExhausted := false
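	// Keep cycling over the inputs until a full pass yields nothing but EOF,
	// so inputs that produce records at different rates are fully drained.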
	for !inputsExhausted {
		inputsExhausted = true

		for _, input := range v.inputs {
			record, err := input.Read(ctx)
			if err != nil {
				if errors.Is(err, EOF) {
					continue
				}
				return nil, err
			}
			inputsExhausted = false

			// extract timestamp column
			tsVec, err := v.tsEval(record)
			if err != nil {
				return nil, err
			}
			tsCol := tsVec.(*array.Timestamp)

			// extract value column
			valueVec, err := v.valueEval(record)
			if err != nil {
				return nil, err
			}
			valueArr := valueVec.(*array.Float64)

			// extract all the columns that are used for grouping
			var arrays []*array.String
			var fields []arrow.Field
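			// The grouping columns become the label set of each output series:
			// "without" keeps every label except the listed ones, while "by"
			// keeps only the listed ones.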
			if v.grouping.Without {
				// Grouping without a label set. Exclude labels from that set.
				schema := record.Schema()
				for i, field := range schema.Fields() {
					ident, err := semconv.ParseFQN(field.Name)
					if err != nil {
						return nil, err
					}

					if ident.ColumnType() == types.ColumnTypeLabel ||
						ident.ColumnType() == types.ColumnTypeMetadata ||
						ident.ColumnType() == types.ColumnTypeParsed {

						found := false
						for _, g := range v.grouping.Columns {
							colExpr, ok := g.(*physical.ColumnExpr)
							if !ok {
								return nil, fmt.Errorf("unknown column expression %v", g)
							}
							// Match ambiguous columns only by name
							if colExpr.Ref.Type == types.ColumnTypeAmbiguous && colExpr.Ref.Column == ident.ShortName() {
								found = true
								break
							}
							// Match all other columns by name and type
							if colExpr.Ref.Column == ident.ShortName() && colExpr.Ref.Type == ident.ColumnType() {
								found = true
								break
							}
						}
						if !found {
							arrays = append(arrays, record.Column(i).(*array.String))
							fields = append(fields, field)
						}
					}
				}
			} else {
				// Grouping by a label set. Take only labels from that set.
				for _, columnExpr := range v.grouping.Columns {
					vec, err := v.evaluator.eval(columnExpr, record)
					if err != nil {
						return nil, err
					}
					if vec.DataType().ID() != types.Arrow.String.ID() {
						return nil, fmt.Errorf("unsupported datatype for grouping %s", vec.DataType())
					}
					arr := vec.(*array.String)
					arrays = append(arrays, arr)

					colExpr, ok := columnExpr.(*physical.ColumnExpr)
					if !ok {
						return nil, fmt.Errorf("invalid column expression type %T", columnExpr)
					}
					ident := semconv.NewIdentifier(colExpr.Ref.Column, colExpr.Ref.Type, types.Loki.String)
					fields = append(fields, semconv.FieldFromIdent(ident, true))
				}
			}
			v.aggregator.AddLabels(fields)
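			// Feed each row into the aggregator. Rows with a null value are
			// skipped, and labels with empty values are dropped from the
			// group key.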
			for row := range int(record.NumRows()) {
				if valueArr.IsNull(row) {
					continue
				}

				labelValues := make([]string, 0, len(arrays))
				labels := make([]arrow.Field, 0, len(arrays))
				for i, arr := range arrays {
					val := arr.Value(row)
					if val != "" {
						labelValues = append(labelValues, val)
						labels = append(labels, fields[i])
					}
				}
				v.aggregator.Add(tsCol.Value(row).ToTime(arrow.Nanosecond), valueArr.Value(row), labels, labelValues)
			}
		}
	}

	v.inputsExhausted = true
	return v.aggregator.BuildRecord()
}
// Close closes the resources of the pipeline.
func (v *vectorAggregationPipeline) Close() {
	if v.region != nil {
		v.region.End()
	}
	for _, input := range v.inputs {
		input.Close()
	}
}
// Region implements RegionProvider.
func (v *vectorAggregationPipeline) Region() *xcap.Region {
	return v.region
}