mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
417 lines
19 KiB
417 lines
19 KiB
package executor
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"regexp"
|
|
"strings"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/array"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/types"
|
|
)
|
|
|
|
var (
|
|
unaryFunctions UnaryFunctionRegistry = &unaryFuncReg{}
|
|
binaryFunctions BinaryFunctionRegistry = &binaryFuncReg{}
|
|
variadicFunctions VariadicFunctionRegistry = &variadicFuncReg{}
|
|
)
|
|
|
|
func init() {
|
|
// Functions for [types.BinaryOpDiv]
|
|
binaryFunctions.register(types.BinaryOpDiv, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return a / b, nil }})
|
|
binaryFunctions.register(types.BinaryOpDiv, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a) / float64(b), nil }})
|
|
// Functions for [types.BinaryOpAdd]
|
|
binaryFunctions.register(types.BinaryOpAdd, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return a + b, nil }})
|
|
binaryFunctions.register(types.BinaryOpAdd, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a) + float64(b), nil }})
|
|
// Functions for [types.BinaryOpSub]
|
|
binaryFunctions.register(types.BinaryOpSub, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return a - b, nil }})
|
|
binaryFunctions.register(types.BinaryOpSub, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a) - float64(b), nil }})
|
|
// Functions for [types.BinaryOpMul]
|
|
binaryFunctions.register(types.BinaryOpMul, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return a * b, nil }})
|
|
binaryFunctions.register(types.BinaryOpMul, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a) * float64(b), nil }})
|
|
// Functions for [types.BinaryOpMod]
|
|
binaryFunctions.register(types.BinaryOpMod, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return math.Mod(a, b), nil }})
|
|
binaryFunctions.register(types.BinaryOpMod, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return float64(a % b), nil }})
|
|
// Functions for [types.BinaryOpPow]
|
|
binaryFunctions.register(types.BinaryOpPow, arrow.PrimitiveTypes.Float64, &genericFloat64Function[*array.Float64, float64]{eval: func(a, b float64) (float64, error) { return math.Pow(a, b), nil }})
|
|
binaryFunctions.register(types.BinaryOpPow, arrow.PrimitiveTypes.Int64, &genericFloat64Function[*array.Int64, int64]{eval: func(a, b int64) (float64, error) { return math.Pow(float64(a), float64(b)), nil }})
|
|
|
|
// Functions for [types.BinaryOpEq]
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a == b, nil }})
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a == b, nil }})
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a == b, nil }})
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a == b, nil }})
|
|
binaryFunctions.register(types.BinaryOpEq, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a == b, nil }})
|
|
// Functions for [types.BinaryOpNeq]
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a != b, nil }})
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a != b, nil }})
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a != b, nil }})
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a != b, nil }})
|
|
binaryFunctions.register(types.BinaryOpNeq, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a != b, nil }})
|
|
// Functions for [types.BinaryOpGt]
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) > boolToInt(b), nil }})
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a > b, nil }})
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a > b, nil }})
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a > b, nil }})
|
|
binaryFunctions.register(types.BinaryOpGt, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a > b, nil }})
|
|
// Functions for [types.BinaryOpGte]
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) >= boolToInt(b), nil }})
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a >= b, nil }})
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a >= b, nil }})
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a >= b, nil }})
|
|
binaryFunctions.register(types.BinaryOpGte, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a >= b, nil }})
|
|
// Functions for [types.BinaryOpLt]
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) < boolToInt(b), nil }})
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a < b, nil }})
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a < b, nil }})
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a < b, nil }})
|
|
binaryFunctions.register(types.BinaryOpLt, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a < b, nil }})
|
|
// Functions for [types.BinaryOpLte]
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return boolToInt(a) <= boolToInt(b), nil }})
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return a <= b, nil }})
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Int64, &genericBoolFunction[*array.Int64, int64]{eval: func(a, b int64) (bool, error) { return a <= b, nil }})
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.FixedWidthTypes.Timestamp_ns, &genericBoolFunction[*array.Timestamp, arrow.Timestamp]{eval: func(a, b arrow.Timestamp) (bool, error) { return a <= b, nil }})
|
|
binaryFunctions.register(types.BinaryOpLte, arrow.PrimitiveTypes.Float64, &genericBoolFunction[*array.Float64, float64]{eval: func(a, b float64) (bool, error) { return a <= b, nil }})
|
|
// Functions for [types.BinaryOpAnd]
|
|
binaryFunctions.register(types.BinaryOpAnd, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a && b, nil }})
|
|
// Functions for [types.BinaryOpOr]
|
|
binaryFunctions.register(types.BinaryOpOr, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a || b, nil }})
|
|
// Functions for [types.BinaryOpXor]
|
|
binaryFunctions.register(types.BinaryOpXor, arrow.FixedWidthTypes.Boolean, &genericBoolFunction[*array.Boolean, bool]{eval: func(a, b bool) (bool, error) { return a != b, nil }})
|
|
// Functions for [types.BinaryOpMatchSubstr]
|
|
binaryFunctions.register(types.BinaryOpMatchSubstr, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return strings.Contains(a, b), nil }})
|
|
// Functions for [types.BinaryOpNotMatchSubstr]
|
|
binaryFunctions.register(types.BinaryOpNotMatchSubstr, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return !strings.Contains(a, b), nil }})
|
|
// Functions for [types.BinaryOpMatchRe]
|
|
binaryFunctions.register(types.BinaryOpMatchRe, arrow.BinaryTypes.String, ®expFunction{eval: func(a, b string, reg *regexp.Regexp) (bool, error) {
|
|
if reg == nil {
|
|
// Fallback to per-row regex-compilation due to non-scalar expression
|
|
var err error
|
|
reg, err = regexp.Compile(b)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return reg.Match([]byte(a)), nil
|
|
}})
|
|
// Functions for [types.BinaryOpNotMatchRe]
|
|
binaryFunctions.register(types.BinaryOpNotMatchRe, arrow.BinaryTypes.String, ®expFunction{eval: func(a, b string, reg *regexp.Regexp) (bool, error) {
|
|
if reg == nil {
|
|
// Fallback to per-row regex-compilation due to non-scalar expression
|
|
var err error
|
|
reg, err = regexp.Compile(b)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return !reg.Match([]byte(a)), nil
|
|
}})
|
|
|
|
// Functions for [types.UnaryOpNot]
|
|
unaryFunctions.register(types.UnaryOpNot, arrow.FixedWidthTypes.Boolean, UnaryFunc(func(input arrow.Array) (arrow.Array, error) {
|
|
arr, ok := input.(*array.Boolean)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid array type: expected *array.Boolean, got %T", input)
|
|
}
|
|
|
|
builder := array.NewBooleanBuilder(memory.DefaultAllocator)
|
|
for i := range arr.Len() {
|
|
if arr.IsNull(i) {
|
|
builder.AppendNull()
|
|
continue
|
|
}
|
|
builder.Append(!arr.Value(i))
|
|
}
|
|
|
|
return builder.NewArray(), nil
|
|
}))
|
|
|
|
// Cast functions
|
|
unaryFunctions.register(types.UnaryOpCastFloat, arrow.BinaryTypes.String, castFn(types.UnaryOpCastFloat))
|
|
unaryFunctions.register(types.UnaryOpCastBytes, arrow.BinaryTypes.String, castFn(types.UnaryOpCastBytes))
|
|
unaryFunctions.register(types.UnaryOpCastDuration, arrow.BinaryTypes.String, castFn(types.UnaryOpCastDuration))
|
|
|
|
// Parse functions
|
|
variadicFunctions.register(types.VariadicOpParseLogfmt, parseFn(types.VariadicOpParseLogfmt))
|
|
variadicFunctions.register(types.VariadicOpParseJSON, parseFn(types.VariadicOpParseJSON))
|
|
}
|
|
|
|
type UnaryFunctionRegistry interface {
|
|
register(types.UnaryOp, arrow.DataType, UnaryFunction)
|
|
GetForSignature(types.UnaryOp, arrow.DataType) (UnaryFunction, error)
|
|
}
|
|
|
|
type UnaryFunction interface {
|
|
Evaluate(lhs arrow.Array) (arrow.Array, error)
|
|
}
|
|
|
|
type UnaryFunc func(arrow.Array) (arrow.Array, error)
|
|
|
|
func (f UnaryFunc) Evaluate(lhs arrow.Array) (arrow.Array, error) {
|
|
return f(lhs)
|
|
}
|
|
|
|
type unaryFuncReg struct {
|
|
reg map[types.UnaryOp]map[arrow.DataType]UnaryFunction
|
|
}
|
|
|
|
// register implements UnaryFunctionRegistry.
|
|
func (u *unaryFuncReg) register(op types.UnaryOp, ltype arrow.DataType, f UnaryFunction) {
|
|
if u.reg == nil {
|
|
u.reg = make(map[types.UnaryOp]map[arrow.DataType]UnaryFunction)
|
|
}
|
|
if _, ok := u.reg[op]; !ok {
|
|
u.reg[op] = make(map[arrow.DataType]UnaryFunction)
|
|
}
|
|
// TODO(chaudum): Should the function panic when duplicate keys are registered?
|
|
u.reg[op][ltype] = f
|
|
}
|
|
|
|
// GetForSignature implements UnaryFunctionRegistry.
|
|
func (u *unaryFuncReg) GetForSignature(op types.UnaryOp, ltype arrow.DataType) (UnaryFunction, error) {
|
|
// Get registered functions for the specific operation
|
|
reg, ok := u.reg[op]
|
|
if !ok {
|
|
return nil, errors.ErrNotImplemented
|
|
}
|
|
// Get registered function for the specific data type
|
|
fn, ok := reg[ltype]
|
|
if !ok {
|
|
return nil, errors.ErrNotImplemented
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
type BinaryFunctionRegistry interface {
|
|
register(types.BinaryOp, arrow.DataType, BinaryFunction)
|
|
GetForSignature(types.BinaryOp, arrow.DataType) (BinaryFunction, error)
|
|
}
|
|
|
|
type BinaryFunction interface {
|
|
Evaluate(lhs, rhs arrow.Array, lhsIsScalar, rhsIsScalar bool) (arrow.Array, error)
|
|
}
|
|
|
|
type binaryFuncReg struct {
|
|
reg map[types.BinaryOp]map[arrow.DataType]BinaryFunction
|
|
}
|
|
|
|
// register implements BinaryFunctionRegistry.
|
|
func (b *binaryFuncReg) register(op types.BinaryOp, ty arrow.DataType, f BinaryFunction) {
|
|
if b.reg == nil {
|
|
b.reg = make(map[types.BinaryOp]map[arrow.DataType]BinaryFunction)
|
|
}
|
|
if _, ok := b.reg[op]; !ok {
|
|
b.reg[op] = make(map[arrow.DataType]BinaryFunction)
|
|
}
|
|
// TODO(chaudum): Should the function panic when duplicate keys are registered?
|
|
b.reg[op][ty] = f
|
|
}
|
|
|
|
// GetForSignature implements BinaryFunctionRegistry.
|
|
func (b *binaryFuncReg) GetForSignature(op types.BinaryOp, ltype arrow.DataType) (BinaryFunction, error) {
|
|
// Get registered functions for the specific operation
|
|
reg, ok := b.reg[op]
|
|
if !ok {
|
|
return nil, errors.ErrNotImplemented
|
|
}
|
|
// Get registered function for the specific data type
|
|
fn, ok := reg[ltype]
|
|
if !ok {
|
|
return nil, errors.ErrNotImplemented
|
|
}
|
|
return fn, nil
|
|
}
|
|
|
|
type regexpFunction struct {
|
|
eval func(a, b string, reg *regexp.Regexp) (bool, error)
|
|
}
|
|
|
|
func (f *regexpFunction) Evaluate(lhs arrow.Array, rhs arrow.Array, _, rhsIsScalar bool) (arrow.Array, error) {
|
|
if lhs.Len() != rhs.Len() {
|
|
return nil, arrow.ErrIndex
|
|
}
|
|
|
|
lhsArr, ok := lhs.(*array.String)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid array type: expected %T, got %T", new(*array.String), lhs)
|
|
}
|
|
|
|
rhsArr, ok := rhs.(*array.String)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid array type: expected %T, got %T", new(*array.String), rhs)
|
|
}
|
|
|
|
builder := array.NewBooleanBuilder(memory.DefaultAllocator)
|
|
if rhs.Len() == 0 {
|
|
return builder.NewArray(), nil
|
|
}
|
|
|
|
var (
|
|
re *regexp.Regexp
|
|
err error
|
|
)
|
|
|
|
if rhsIsScalar {
|
|
if rhsArr.IsNull(0) {
|
|
return nil, fmt.Errorf("invalid NULL value, expected string")
|
|
}
|
|
// TODO(chaudum): Performance of regex evaluation can be improved by simplifying the regex,
|
|
// see pkg/logql/log/filter.go:645
|
|
re, err = regexp.Compile(rhsArr.Value(0))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to compile regular expression for batch: %w", err)
|
|
}
|
|
}
|
|
|
|
for i := range lhsArr.Len() {
|
|
if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
|
|
builder.Append(false)
|
|
continue
|
|
}
|
|
res, err := f.eval(lhsArr.Value(i), rhsArr.Value(i), re)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
builder.Append(res)
|
|
}
|
|
|
|
return builder.NewArray(), nil
|
|
|
|
}
|
|
|
|
// genericBoolFunction is a struct that implements the [BinaryFunction] interface methods
|
|
// and can be used for any array type with comparable elements.
|
|
type genericBoolFunction[E arrow.TypedArray[T], T arrow.ValueType] struct {
|
|
eval func(a, b T) (bool, error)
|
|
}
|
|
|
|
// Evaluate implements BinaryFunction.
|
|
func (f *genericBoolFunction[E, T]) Evaluate(lhs arrow.Array, rhs arrow.Array, _, _ bool) (arrow.Array, error) {
|
|
if lhs.Len() != rhs.Len() {
|
|
return nil, arrow.ErrIndex
|
|
}
|
|
|
|
lhsArr, ok := lhs.(E)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid array type: expected %T, got %T", new(E), lhs)
|
|
}
|
|
|
|
rhsArr, ok := rhs.(E)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid array type: expected %T, got %T", new(E), rhs)
|
|
}
|
|
|
|
builder := array.NewBooleanBuilder(memory.DefaultAllocator)
|
|
|
|
for i := range lhsArr.Len() {
|
|
if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
|
|
builder.Append(false)
|
|
continue
|
|
}
|
|
res, err := f.eval(lhsArr.Value(i), rhsArr.Value(i))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
builder.Append(res)
|
|
}
|
|
|
|
return builder.NewArray(), nil
|
|
}
|
|
|
|
// genericFloat64Function is a struct that implements the [BinaryFunction] interface methods
|
|
// and can be used for any array type with numeric elements.
|
|
type genericFloat64Function[E arrow.TypedArray[T], T arrow.ValueType] struct {
|
|
eval func(a, b T) (float64, error)
|
|
}
|
|
|
|
// Evaluate implements BinaryFunction.
|
|
func (f *genericFloat64Function[E, T]) Evaluate(lhs arrow.Array, rhs arrow.Array, _, _ bool) (arrow.Array, error) {
|
|
if lhs.Len() != rhs.Len() {
|
|
return nil, arrow.ErrIndex
|
|
}
|
|
|
|
lhsArr, ok := lhs.(E)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid array type: expected %T, got %T", new(E), lhs)
|
|
}
|
|
|
|
rhsArr, ok := rhs.(E)
|
|
if !ok {
|
|
return nil, fmt.Errorf("invalid array type: expected %T, got %T", new(E), rhs)
|
|
}
|
|
|
|
builder := array.NewFloat64Builder(memory.DefaultAllocator)
|
|
|
|
for i := range lhsArr.Len() {
|
|
if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
|
|
builder.AppendNull()
|
|
continue
|
|
}
|
|
res, err := f.eval(lhsArr.Value(i), rhsArr.Value(i))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
builder.Append(res)
|
|
}
|
|
|
|
return builder.NewArray(), nil
|
|
}
|
|
|
|
// Compiler optimized version of converting boolean b into an integer of value 0 or 1
|
|
// https://github.com/golang/go/issues/6011#issuecomment-323144578
|
|
func boolToInt(b bool) int {
|
|
var i int
|
|
if b {
|
|
i = 1
|
|
} else {
|
|
i = 0
|
|
}
|
|
return i
|
|
}
|
|
|
|
type VariadicFunctionRegistry interface {
|
|
register(types.VariadicOp, VariadicFunction)
|
|
GetForSignature(types.VariadicOp) (VariadicFunction, error)
|
|
}
|
|
|
|
type VariadicFunction interface {
|
|
Evaluate(args ...arrow.Array) (arrow.Array, error)
|
|
}
|
|
|
|
type VariadicFunctionFunc func(args ...arrow.Array) (arrow.Array, error)
|
|
|
|
func (f VariadicFunctionFunc) Evaluate(args ...arrow.Array) (arrow.Array, error) {
|
|
return f(args...)
|
|
}
|
|
|
|
type variadicFuncReg struct {
|
|
reg map[types.VariadicOp]VariadicFunction
|
|
}
|
|
|
|
// register implements VariadicFunctionRegistry.
|
|
func (u *variadicFuncReg) register(op types.VariadicOp, f VariadicFunction) {
|
|
if u.reg == nil {
|
|
u.reg = make(map[types.VariadicOp]VariadicFunction)
|
|
}
|
|
|
|
_, exists := u.reg[op]
|
|
if exists {
|
|
panic(fmt.Sprintf("duplicate variadic function registration for %s", op))
|
|
}
|
|
|
|
u.reg[op] = f
|
|
}
|
|
|
|
// GetForSignature implements VariadicFunctionRegistry.
|
|
func (u *variadicFuncReg) GetForSignature(op types.VariadicOp) (VariadicFunction, error) {
|
|
// Get registered function for the specific operation
|
|
fn, ok := u.reg[op]
|
|
if !ok {
|
|
return nil, errors.ErrNotImplemented
|
|
}
|
|
return fn, nil
|
|
}
|
|
|