Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/cast.go

177 lines
4.4 KiB

package executor
import (
"fmt"
"strconv"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/dustin/go-humanize"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
func castFn(operation types.UnaryOp) UnaryFunction {
return UnaryFunc(func(input arrow.Array) (arrow.Array, error) {
sourceCol, ok := input.(*array.String)
if !ok {
return nil, fmt.Errorf("expected column to be of type string, got %T", input)
}
// Get conversion function and process values
conversionFn := getConversionFunction(operation)
castCol, errTracker := castValues(sourceCol, conversionFn)
// Build error columns if needed
errorCol, errorDetailsCol := errTracker.buildArrays()
// Build output schema and record
fields := buildValueAndErrorFields(errTracker.hasErrors)
return buildValueAndErrorStruct(castCol, errorCol, errorDetailsCol, fields)
})
}
type conversionFn func(value string) (float64, error)
func getConversionFunction(operation types.UnaryOp) conversionFn {
switch operation {
case types.UnaryOpCastBytes:
return convertBytes
case types.UnaryOpCastDuration:
return convertDuration
default:
return convertFloat
}
}
func castValues(
sourceCol *array.String,
conversionFn conversionFn,
) (arrow.Array, *errorTracker) {
castBuilder := array.NewFloat64Builder(memory.DefaultAllocator)
tracker := newErrorTracker()
for i := 0; i < sourceCol.Len(); i++ {
if sourceCol.IsNull(i) {
castBuilder.AppendNull()
tracker.recordSuccess()
} else {
valueStr := sourceCol.Value(i)
if val, err := conversionFn(valueStr); err == nil {
castBuilder.Append(val)
tracker.recordSuccess()
} else {
// Use 0.0 as default for errors, for backwards compatibility with old engine
castBuilder.Append(0.0)
tracker.recordError(i, err)
}
}
}
return castBuilder.NewArray(), tracker
}
func buildValueAndErrorFields(
hasErrors bool,
) []arrow.Field {
fields := make([]arrow.Field, 0, 3)
// Add value field. Not nullable in practice since we use 0.0 when conversion fails, but as of
// writing all coumns are marked as nullable, even Timestamp and Message, so staying consistent
fields = append(fields, semconv.FieldFromIdent(semconv.ColumnIdentValue, true))
// Add error fields if needed
if hasErrors {
fields = append(fields,
semconv.FieldFromIdent(semconv.ColumnIdentError, true),
semconv.FieldFromIdent(semconv.ColumnIdentErrorDetails, true),
)
}
return fields
}
func buildValueAndErrorStruct(
valVol, errorCol, errorDetailsCol arrow.Array,
fields []arrow.Field,
) (*array.Struct, error) {
hasErrors := errorCol != nil
totalCols := 1
if hasErrors {
totalCols += 2
}
columns := make([]arrow.Array, totalCols)
// Add new columns - these are newly created so don't need extra retain
columns[0] = valVol
if hasErrors {
columns[1] = errorCol
columns[2] = errorDetailsCol
}
// NewStructArrayWithFields will retain all columns
return array.NewStructArrayWithFields(columns, fields)
}
func convertFloat(v string) (float64, error) {
return strconv.ParseFloat(v, 64)
}
func convertDuration(v string) (float64, error) {
d, err := time.ParseDuration(v)
if err != nil {
return 0, err
}
return d.Seconds(), nil
}
func convertBytes(v string) (float64, error) {
b, err := humanize.ParseBytes(v)
if err != nil {
return 0, err
}
return float64(b), nil
}
type errorTracker struct {
hasErrors bool
errorBuilder *array.StringBuilder
detailsBuilder *array.StringBuilder
}
func newErrorTracker() *errorTracker {
return &errorTracker{}
}
func (et *errorTracker) recordError(rowIndex int, err error) {
if !et.hasErrors {
et.errorBuilder = array.NewStringBuilder(memory.DefaultAllocator)
et.detailsBuilder = array.NewStringBuilder(memory.DefaultAllocator)
// Backfill nulls for previous rows
et.errorBuilder.AppendNulls(rowIndex)
et.detailsBuilder.AppendNulls(rowIndex)
et.hasErrors = true
}
et.errorBuilder.Append(types.SampleExtractionErrorType)
et.detailsBuilder.Append(err.Error())
}
func (et *errorTracker) recordSuccess() {
if et.hasErrors {
et.errorBuilder.AppendNull()
et.detailsBuilder.AppendNull()
}
}
func (et *errorTracker) buildArrays() (arrow.Array, arrow.Array) {
if !et.hasErrors {
return nil, nil
}
return et.errorBuilder.NewArray(), et.detailsBuilder.NewArray()
}