mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
282 lines
11 KiB
282 lines
11 KiB
package executor
|
|
|
|
import (
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/array"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/types"
|
|
)
|
|
|
|
var (
|
|
unaryFunctions UnaryFunctionRegistry = &unaryFuncReg{}
|
|
binaryFunctions BinaryFunctionRegistry = &binaryFuncReg{}
|
|
)
|
|
|
|
func init() {
|
|
// Functions for [types.BinaryOpEq]
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.FixedWidthTypes.Boolean, &boolCompareFunction{cmp: func(a, b bool) bool { return a == b }})
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.BinaryTypes.String, &strCompareFunction{cmp: func(a, b string) bool { return a == b }})
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Int64, &intCompareFunction{cmp: func(a, b int64) bool { return a == b }})
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Uint64, ×tampCompareFunction{cmp: func(a, b uint64) bool { return a == b }})
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Float64, &floatCompareFunction{cmp: func(a, b float64) bool { return a == b }})
|
|
// Functions for [types.BinaryOpNeq]
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.FixedWidthTypes.Boolean, &boolCompareFunction{cmp: func(a, b bool) bool { return a != b }})
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.BinaryTypes.String, &strCompareFunction{cmp: func(a, b string) bool { return a != b }})
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Int64, &intCompareFunction{cmp: func(a, b int64) bool { return a != b }})
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Uint64, ×tampCompareFunction{cmp: func(a, b uint64) bool { return a != b }})
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Float64, &floatCompareFunction{cmp: func(a, b float64) bool { return a != b }})
|
|
// Functions for [types.BinaryOpGt]
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.FixedWidthTypes.Boolean, &boolCompareFunction{cmp: func(a, b bool) bool { return boolToInt(a) > boolToInt(b) }})
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.BinaryTypes.String, &strCompareFunction{cmp: func(a, b string) bool { return a > b }})
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Int64, &intCompareFunction{cmp: func(a, b int64) bool { return a > b }})
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Uint64, ×tampCompareFunction{cmp: func(a, b uint64) bool { return a > b }})
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Float64, &floatCompareFunction{cmp: func(a, b float64) bool { return a > b }})
|
|
// Functions for [types.BinaryOpGte]
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.FixedWidthTypes.Boolean, &boolCompareFunction{cmp: func(a, b bool) bool { return boolToInt(a) >= boolToInt(b) }})
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.BinaryTypes.String, &strCompareFunction{cmp: func(a, b string) bool { return a >= b }})
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Int64, &intCompareFunction{cmp: func(a, b int64) bool { return a >= b }})
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Uint64, ×tampCompareFunction{cmp: func(a, b uint64) bool { return a >= b }})
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Float64, &floatCompareFunction{cmp: func(a, b float64) bool { return a >= b }})
|
|
// Functions for [types.BinaryOpLt]
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.FixedWidthTypes.Boolean, &boolCompareFunction{cmp: func(a, b bool) bool { return boolToInt(a) < boolToInt(b) }})
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.BinaryTypes.String, &strCompareFunction{cmp: func(a, b string) bool { return a < b }})
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Int64, &intCompareFunction{cmp: func(a, b int64) bool { return a < b }})
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Uint64, ×tampCompareFunction{cmp: func(a, b uint64) bool { return a < b }})
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Float64, &floatCompareFunction{cmp: func(a, b float64) bool { return a < b }})
|
|
// Functions for [types.BinaryOpLte]
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.FixedWidthTypes.Boolean, &boolCompareFunction{cmp: func(a, b bool) bool { return boolToInt(a) <= boolToInt(b) }})
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.BinaryTypes.String, &strCompareFunction{cmp: func(a, b string) bool { return a <= b }})
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Int64, &intCompareFunction{cmp: func(a, b int64) bool { return a <= b }})
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Uint64, ×tampCompareFunction{cmp: func(a, b uint64) bool { return a <= b }})
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Float64, &floatCompareFunction{cmp: func(a, b float64) bool { return a <= b }})
|
|
}
|
|
|
|
type UnaryFunctionRegistry interface {
|
|
register(types.UnaryOp, arrow.DataType, UnaryFunction)
|
|
GetForSignature(types.UnaryOp, arrow.DataType) (UnaryFunction, error)
|
|
}
|
|
|
|
type UnaryFunction interface {
|
|
Evaluate(lhs ColumnVector) (ColumnVector, error)
|
|
}
|
|
|
|
type unaryFuncReg struct {
|
|
reg map[types.UnaryOp]map[arrow.DataType]UnaryFunction
|
|
}
|
|
|
|
// register implements UnaryFunctionRegistry.
|
|
func (u *unaryFuncReg) register(op types.UnaryOp, ltype arrow.DataType, f UnaryFunction) {
|
|
if u.reg == nil {
|
|
u.reg = make(map[types.UnaryOp]map[arrow.DataType]UnaryFunction)
|
|
}
|
|
if _, ok := u.reg[op]; !ok {
|
|
u.reg[op] = make(map[arrow.DataType]UnaryFunction)
|
|
}
|
|
// TODO(chaudum): Should the function panic when duplicate keys are registered?
|
|
u.reg[op][ltype] = f
|
|
}
|
|
|
|
// GetForSignature implements UnaryFunctionRegistry.
|
|
func (u *unaryFuncReg) GetForSignature(types.UnaryOp, arrow.DataType) (UnaryFunction, error) {
|
|
return nil, errors.ErrNotImplemented
|
|
}
|
|
|
|
type BinaryFunctionRegistry interface {
|
|
register(types.BinaryOp, arrow.DataType, BinaryFunction)
|
|
GetForSignature(types.BinaryOp, arrow.DataType) (BinaryFunction, error)
|
|
}
|
|
|
|
// TODO(chaudum): Make BinaryFunction typed:
|
|
//
|
|
// type BinaryFunction[L, R arrow.DataType] interface {
|
|
// Evaluate(lhs ColumnVector[L], rhs ColumnVector[R]) (ColumnVector[arrow.BOOL], error)
|
|
// }
|
|
type BinaryFunction interface {
|
|
Evaluate(lhs, rhs ColumnVector) (ColumnVector, error)
|
|
}
|
|
|
|
type binaryFuncReg struct {
|
|
reg map[types.BinaryOp]map[arrow.DataType]BinaryFunction
|
|
}
|
|
|
|
// register implements BinaryFunctionRegistry.
|
|
func (b *binaryFuncReg) register(op types.BinaryOp, ty arrow.DataType, f BinaryFunction) {
|
|
if b.reg == nil {
|
|
b.reg = make(map[types.BinaryOp]map[arrow.DataType]BinaryFunction)
|
|
}
|
|
if _, ok := b.reg[op]; !ok {
|
|
b.reg[op] = make(map[arrow.DataType]BinaryFunction)
|
|
}
|
|
// TODO(chaudum): Should the function panic when duplicate keys are registered?
|
|
b.reg[op][ty] = f
|
|
}
|
|
|
|
// GetForSignature implements BinaryFunctionRegistry.
|
|
func (b *binaryFuncReg) GetForSignature(op types.BinaryOp, ltype arrow.DataType) (BinaryFunction, error) {
|
|
// Get registered functions for the specific operation
|
|
reg, ok := b.reg[op]
|
|
if !ok {
|
|
return nil, errors.ErrNotImplemented
|
|
}
|
|
// Get registered function for the specific data type
|
|
fn, ok := reg[ltype]
|
|
if !ok {
|
|
return nil, errors.ErrNotImplemented
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
type boolCompareFunction struct {
|
|
cmp func(a, b bool) bool
|
|
}
|
|
|
|
// Evaluate implements BinaryFunction.
|
|
func (f *boolCompareFunction) Evaluate(lhs ColumnVector, rhs ColumnVector) (ColumnVector, error) {
|
|
lhsArr := lhs.ToArray().(*array.Boolean)
|
|
rhsArr := rhs.ToArray().(*array.Boolean)
|
|
|
|
if lhsArr.Len() != rhsArr.Len() {
|
|
return nil, errors.ErrIndex
|
|
}
|
|
|
|
mem := memory.NewGoAllocator()
|
|
builder := array.NewBooleanBuilder(mem)
|
|
defer builder.Release()
|
|
|
|
for i := 0; i < lhs.ToArray().Len(); i++ {
|
|
if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
|
|
builder.Append(false)
|
|
continue
|
|
}
|
|
builder.Append(f.cmp(lhsArr.Value(i), rhsArr.Value(i)))
|
|
}
|
|
|
|
return &Array{array: builder.NewArray()}, nil
|
|
}
|
|
|
|
type strCompareFunction struct {
|
|
cmp func(a, b string) bool
|
|
}
|
|
|
|
// Evaluate implements BinaryFunction.
|
|
func (f *strCompareFunction) Evaluate(lhs ColumnVector, rhs ColumnVector) (ColumnVector, error) {
|
|
if lhs.Len() != rhs.Len() {
|
|
return nil, arrow.ErrIndex
|
|
}
|
|
|
|
lhsArr := lhs.ToArray().(*array.String)
|
|
rhsArr := rhs.ToArray().(*array.String)
|
|
|
|
mem := memory.NewGoAllocator()
|
|
builder := array.NewBooleanBuilder(mem)
|
|
defer builder.Release()
|
|
|
|
for i := 0; i < lhs.ToArray().Len(); i++ {
|
|
if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
|
|
builder.Append(false)
|
|
continue
|
|
}
|
|
builder.Append(f.cmp(lhsArr.Value(i), rhsArr.Value(i)))
|
|
}
|
|
|
|
return &Array{array: builder.NewArray()}, nil
|
|
}
|
|
|
|
type intCompareFunction struct {
|
|
cmp func(a, b int64) bool
|
|
}
|
|
|
|
// Evaluate implements BinaryFunction.
|
|
func (f *intCompareFunction) Evaluate(lhs ColumnVector, rhs ColumnVector) (ColumnVector, error) {
|
|
if lhs.Len() != rhs.Len() {
|
|
return nil, arrow.ErrIndex
|
|
}
|
|
|
|
lhsArr := lhs.ToArray().(*array.Int64)
|
|
rhsArr := rhs.ToArray().(*array.Int64)
|
|
|
|
mem := memory.NewGoAllocator()
|
|
builder := array.NewBooleanBuilder(mem)
|
|
defer builder.Release()
|
|
|
|
for i := 0; i < lhs.ToArray().Len(); i++ {
|
|
if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
|
|
builder.Append(false)
|
|
continue
|
|
}
|
|
builder.Append(f.cmp(lhsArr.Value(i), rhsArr.Value(i)))
|
|
}
|
|
|
|
return &Array{array: builder.NewArray()}, nil
|
|
}
|
|
|
|
type timestampCompareFunction struct {
|
|
cmp func(a, b uint64) bool
|
|
}
|
|
|
|
// Evaluate implements BinaryFunction.
|
|
func (f *timestampCompareFunction) Evaluate(lhs ColumnVector, rhs ColumnVector) (ColumnVector, error) {
|
|
if lhs.Len() != rhs.Len() {
|
|
return nil, arrow.ErrIndex
|
|
}
|
|
|
|
lhsArr := lhs.ToArray().(*array.Uint64)
|
|
rhsArr := rhs.ToArray().(*array.Uint64)
|
|
|
|
mem := memory.NewGoAllocator()
|
|
builder := array.NewBooleanBuilder(mem)
|
|
defer builder.Release()
|
|
|
|
for i := 0; i < lhs.ToArray().Len(); i++ {
|
|
if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
|
|
builder.Append(false)
|
|
continue
|
|
}
|
|
builder.Append(f.cmp(lhsArr.Value(i), rhsArr.Value(i)))
|
|
}
|
|
|
|
return &Array{array: builder.NewArray()}, nil
|
|
}
|
|
|
|
type floatCompareFunction struct {
|
|
cmp func(a, b float64) bool
|
|
}
|
|
|
|
// Evaluate implements BinaryFunction.
|
|
func (f *floatCompareFunction) Evaluate(lhs ColumnVector, rhs ColumnVector) (ColumnVector, error) {
|
|
if lhs.Len() != rhs.Len() {
|
|
return nil, arrow.ErrIndex
|
|
}
|
|
|
|
lhsArr := lhs.ToArray().(*array.Float64)
|
|
rhsArr := rhs.ToArray().(*array.Float64)
|
|
|
|
mem := memory.NewGoAllocator()
|
|
builder := array.NewBooleanBuilder(mem)
|
|
defer builder.Release()
|
|
|
|
for i := 0; i < lhs.ToArray().Len(); i++ {
|
|
if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
|
|
builder.Append(false)
|
|
continue
|
|
}
|
|
builder.Append(f.cmp(lhsArr.Value(i), rhsArr.Value(i)))
|
|
}
|
|
|
|
return &Array{array: builder.NewArray()}, nil
|
|
}
|
|
|
|
// Compiler optimized version of converting boolean b into an integer of value 0 or 1
|
|
// https://github.com/golang/go/issues/6011#issuecomment-323144578
|
|
func boolToInt(b bool) int {
|
|
var i int
|
|
if b {
|
|
i = 1
|
|
} else {
|
|
i = 0
|
|
}
|
|
return i
|
|
|
|
}
|
|
|