perf(engine): Improve regexp expression evaluation (#19644)

This PR changes the evaluation of regex-match binary expressions so that the regexp on the right-hand side of the expression is compiled only once per batch evaluation, rather than once per row.

This saves a ton of allocations and improves general performance of queries with a regexp line or label filter.
pull/19647/head
Christian Haudum 2 months ago committed by GitHub
parent 0291c521d8
commit 73bc30d9a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      pkg/dataobj/sections/logs/table.go
  2. 7
      pkg/engine/internal/executor/expressions.go
  3. 91
      pkg/engine/internal/executor/functions.go
  4. 62
      pkg/engine/internal/executor/functions_test.go
  5. 2
      pkg/limits/store.go
  6. 2
      pkg/logql/log/label_filter.go

@ -340,7 +340,7 @@ func (b *tableBuffer) Flush() (*table, error) {
// Each metadata column may have a different number of rows compared to
// other columns. Backfill them with NULLs to match the max rows in the buffer.
// It is safe to use streamID column row count since it is a requried column in logs section.
// It is safe to use streamID column row count since it is a required column in logs section.
metadataBuilder.Backfill(streamID.Desc.RowsCount)
metadata, _ := metadataBuilder.Flush()

@ -118,7 +118,12 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
if err != nil {
return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): %w", expr.Op, lhs.DataType(), rhs.DataType(), err)
}
return fn.Evaluate(lhs, rhs)
// Check if lhs and rhs are Scalar vectors, because certain function types, such as regexp functions
// can optimize the evaluation per batch.
_, lhsIsScalar := expr.Left.(*physical.LiteralExpr)
_, rhsIsScalar := expr.Right.(*physical.LiteralExpr)
return fn.Evaluate(lhs, rhs, lhsIsScalar, rhsIsScalar)
}
return nil, fmt.Errorf("unknown expression: %v", expr)

@ -80,22 +80,26 @@ func init() {
// Functions for [types.BinaryOpNotMatchSubstr]
binaryFunctions.register(types.BinaryOpNotMatchSubstr, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) { return !strings.Contains(a, b), nil }})
// Functions for [types.BinaryOpMatchRe]
// TODO(chaudum): Performance of regex evaluation can be improved if RHS is a Scalar,
// because the regexp would only need to compiled once for the given scalar value.
// TODO(chaudum): Performance of regex evaluation can be improved by simplifying the regex,
// see pkg/logql/log/filter.go:645
binaryFunctions.register(types.BinaryOpMatchRe, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) {
reg, err := regexp.Compile(b)
if err != nil {
return false, err
binaryFunctions.register(types.BinaryOpMatchRe, arrow.BinaryTypes.String, &regexpFunction{eval: func(a, b string, reg *regexp.Regexp) (bool, error) {
if reg == nil {
// Fall back to per-row regex compilation because the expression is not a scalar
var err error
reg, err = regexp.Compile(b)
if err != nil {
return false, err
}
}
return reg.Match([]byte(a)), nil
}})
// Functions for [types.BinaryOpNotMatchRe]
binaryFunctions.register(types.BinaryOpNotMatchRe, arrow.BinaryTypes.String, &genericBoolFunction[*array.String, string]{eval: func(a, b string) (bool, error) {
reg, err := regexp.Compile(b)
if err != nil {
return false, err
binaryFunctions.register(types.BinaryOpNotMatchRe, arrow.BinaryTypes.String, &regexpFunction{eval: func(a, b string, reg *regexp.Regexp) (bool, error) {
if reg == nil {
// Fall back to per-row regex compilation because the expression is not a scalar
var err error
reg, err = regexp.Compile(b)
if err != nil {
return false, err
}
}
return !reg.Match([]byte(a)), nil
}})
@ -158,7 +162,7 @@ type BinaryFunctionRegistry interface {
}
type BinaryFunction interface {
Evaluate(lhs, rhs arrow.Array) (arrow.Array, error)
Evaluate(lhs, rhs arrow.Array, lhsIsScalar, rhsIsScalar bool) (arrow.Array, error)
}
type binaryFuncReg struct {
@ -192,6 +196,63 @@ func (b *binaryFuncReg) GetForSignature(op types.BinaryOp, ltype arrow.DataType)
return fn, nil
}
// regexpFunction is a binary function over string arrays whose per-row
// predicate can be handed an already compiled regular expression.
type regexpFunction struct {
	// eval is invoked once per row with the left-hand value, the right-hand
	// pattern string, and a compiled regexp. The compiled regexp is nil when
	// the caller did not compile the pattern up front.
	eval func(value, pattern string, compiled *regexp.Regexp) (bool, error)
}
// Evaluate implements BinaryFunction for regular expression predicates.
//
// lhs holds the values to test and rhs holds the patterns. Both must be
// *array.String and must have the same length, otherwise an error is returned.
// The lhsIsScalar flag is ignored.
//
// When rhsIsScalar is true, the pattern at index 0 of rhs is compiled once and
// the compiled regexp is passed to f.eval for every row; a NULL or invalid
// pattern at index 0 fails the whole batch. When rhsIsScalar is false, f.eval
// receives a nil regexp together with the per-row pattern string.
//
// Rows where either side is NULL produce false. The result is a boolean array
// with one entry per input row.
func (f *regexpFunction) Evaluate(lhs arrow.Array, rhs arrow.Array, _, rhsIsScalar bool) (arrow.Array, error) {
	if lhs.Len() != rhs.Len() {
		return nil, arrow.ErrIndex
	}

	// Use a typed nil pointer for %T so the message names *array.String;
	// new(*array.String) would be reported as **array.String.
	lhsArr, ok := lhs.(*array.String)
	if !ok {
		return nil, fmt.Errorf("invalid array type: expected %T, got %T", (*array.String)(nil), lhs)
	}

	rhsArr, ok := rhs.(*array.String)
	if !ok {
		return nil, fmt.Errorf("invalid array type: expected %T, got %T", (*array.String)(nil), rhs)
	}

	builder := array.NewBooleanBuilder(memory.DefaultAllocator)
	// Release the builder on every return path, including errors. The array
	// returned by NewArray holds its own reference to the built data.
	defer builder.Release()

	if rhs.Len() == 0 {
		return builder.NewArray(), nil
	}

	// The number of output rows is known up front.
	builder.Reserve(lhsArr.Len())

	// re stays nil unless the pattern is a scalar and can be compiled once
	// for the whole batch.
	var re *regexp.Regexp
	if rhsIsScalar {
		if rhsArr.IsNull(0) {
			return nil, fmt.Errorf("invalid NULL value, expected string")
		}

		// TODO(chaudum): Performance of regex evaluation can be improved by simplifying the regex,
		// see pkg/logql/log/filter.go:645
		compiled, err := regexp.Compile(rhsArr.Value(0))
		if err != nil {
			return nil, fmt.Errorf("failed to compile regular expression for batch: %w", err)
		}
		re = compiled
	}

	for i := range lhsArr.Len() {
		if lhsArr.IsNull(i) || rhsArr.IsNull(i) {
			builder.Append(false)
			continue
		}

		res, err := f.eval(lhsArr.Value(i), rhsArr.Value(i), re)
		if err != nil {
			return nil, err
		}
		builder.Append(res)
	}

	return builder.NewArray(), nil
}
// genericBoolFunction is a struct that implements the [BinaryFunction] interface methods
// and can be used for any array type with comparable elements.
type genericBoolFunction[E arrow.TypedArray[T], T arrow.ValueType] struct {
@ -199,7 +260,7 @@ type genericBoolFunction[E arrow.TypedArray[T], T arrow.ValueType] struct {
}
// Evaluate implements BinaryFunction.
func (f *genericBoolFunction[E, T]) Evaluate(lhs arrow.Array, rhs arrow.Array) (arrow.Array, error) {
func (f *genericBoolFunction[E, T]) Evaluate(lhs arrow.Array, rhs arrow.Array, _, _ bool) (arrow.Array, error) {
if lhs.Len() != rhs.Len() {
return nil, arrow.ErrIndex
}
@ -238,7 +299,7 @@ type genericFloat64Function[E arrow.TypedArray[T], T arrow.ValueType] struct {
}
// Evaluate implements BinaryFunction.
func (f *genericFloat64Function[E, T]) Evaluate(lhs arrow.Array, rhs arrow.Array) (arrow.Array, error) {
func (f *genericFloat64Function[E, T]) Evaluate(lhs arrow.Array, rhs arrow.Array, _, _ bool) (arrow.Array, error) {
if lhs.Len() != rhs.Len() {
return nil, arrow.ErrIndex
}

@ -264,7 +264,7 @@ func TestBooleanComparisonFunctions(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(tt.op, arrow.FixedWidthTypes.Boolean)
require.NoError(t, err)
result, err := fn.Evaluate(lhsArray, rhsArray)
result, err := fn.Evaluate(lhsArray, rhsArray, false, false)
require.NoError(t, err)
actual := extractBoolValues(result)
@ -333,7 +333,7 @@ func TestStringComparisonFunctions(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(tt.op, arrow.BinaryTypes.String)
require.NoError(t, err)
result, err := fn.Evaluate(lhsArray, rhsArray)
result, err := fn.Evaluate(lhsArray, rhsArray, false, false)
require.NoError(t, err)
actual := extractBoolValues(result)
@ -402,7 +402,7 @@ func TestIntegerComparisonFunctions(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(tt.op, arrow.PrimitiveTypes.Int64)
require.NoError(t, err)
result, err := fn.Evaluate(lhsArray, rhsArray)
result, err := fn.Evaluate(lhsArray, rhsArray, false, false)
require.NoError(t, err)
actual := extractBoolValues(result)
@ -471,7 +471,7 @@ func TestTimestampComparisonFunctions(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(tt.op, arrow.FixedWidthTypes.Timestamp_ns)
require.NoError(t, err)
result, err := fn.Evaluate(lhsArray, rhsArray)
result, err := fn.Evaluate(lhsArray, rhsArray, false, false)
require.NoError(t, err)
actual := extractBoolValues(result)
@ -540,7 +540,7 @@ func TestFloat64ComparisonFunctions(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(tt.op, arrow.PrimitiveTypes.Float64)
require.NoError(t, err)
result, err := fn.Evaluate(lhsArray, rhsArray)
result, err := fn.Evaluate(lhsArray, rhsArray, false, false)
require.NoError(t, err)
actual := extractBoolValues(result)
@ -609,7 +609,7 @@ func TestStringMatchingFunctions(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(tt.op, arrow.BinaryTypes.String)
require.NoError(t, err)
result, err := fn.Evaluate(lhsArray, rhsArray)
result, err := fn.Evaluate(lhsArray, rhsArray, false, false)
require.NoError(t, err)
actual := extractBoolValues(result)
@ -618,6 +618,48 @@ func TestStringMatchingFunctions(t *testing.T) {
}
}
// TestCompileRegexMatchFunctions exercises the regex match and not-match
// binary functions with rhsIsScalar set to true, i.e. with the right-hand
// side flagged as a scalar pattern.
func TestCompileRegexMatchFunctions(t *testing.T) {
	type testCase struct {
		name     string
		op       types.BinaryOp
		lhs      []string
		rhs      []string
		expected []bool
	}

	cases := []testCase{
		{
			// |~ "^\w+\d+$"
			name:     "regex match",
			op:       types.BinaryOpMatchRe,
			lhs:      []string{"foo123", "foo", "bar456", "bar"},
			rhs:      []string{"^\\w+\\d+$", "^\\w+\\d+$", "^\\w+\\d+$", "^\\w+\\d+$"},
			expected: []bool{true, false, true, false},
		},
		{
			// !~ "^\w+\d+$"
			name:     "regex not match",
			op:       types.BinaryOpNotMatchRe,
			lhs:      []string{"foo123", "foo", "bar456", "bar"},
			rhs:      []string{"^\\w+\\d+$", "^\\w+\\d+$", "^\\w+\\d+$", "^\\w+\\d+$"},
			expected: []bool{false, true, false, true},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			values := createStringArray(tc.lhs, nil)
			patterns := createStringArray(tc.rhs, nil)

			matcher, err := binaryFunctions.GetForSignature(tc.op, arrow.BinaryTypes.String)
			require.NoError(t, err)

			// lhs is a column, rhs is flagged as a scalar.
			out, err := matcher.Evaluate(values, patterns, false, true)
			require.NoError(t, err)

			assert.Equal(t, tc.expected, extractBoolValues(out))
		})
	}
}
func TestNullValueHandling(t *testing.T) {
tests := []struct {
name string
@ -668,7 +710,7 @@ func TestNullValueHandling(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(tt.op, tt.dataType)
require.NoError(t, err)
result, err := fn.Evaluate(lhs, rhs)
result, err := fn.Evaluate(lhs, rhs, false, false)
require.NoError(t, err)
actual := extractBoolValues(result)
@ -723,7 +765,7 @@ func TestArrayLengthMismatch(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(tt.op, tt.dataType)
require.NoError(t, err)
result, err := fn.Evaluate(lhs, rhs)
result, err := fn.Evaluate(lhs, rhs, false, false)
assert.Error(t, err)
assert.Nil(t, result)
})
@ -738,7 +780,7 @@ func TestRegexCompileError(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(types.BinaryOpMatchRe, arrow.BinaryTypes.String)
require.NoError(t, err)
_, err = fn.Evaluate(lhs, rhs)
_, err = fn.Evaluate(lhs, rhs, false, false)
require.Error(t, err)
}
@ -776,7 +818,7 @@ func TestEmptyArrays(t *testing.T) {
fn, err := binaryFunctions.GetForSignature(types.BinaryOpEq, arrow.BinaryTypes.String)
require.NoError(t, err)
result, err := fn.Evaluate(lhs, rhs)
result, err := fn.Evaluate(lhs, rhs, false, false)
require.NoError(t, err)
assert.Equal(t, int(0), result.Len())

@ -40,7 +40,7 @@ type iterateFunc func(tenant string, partition int32, stream streamUsage)
// getPolicyBucketAndLimit determines which policy bucket to use and the max streams limit
// for a given tenant and policy. Returns the policy bucket name and the max streams limit.
// The policy bucket will be the input policy name only if the max streams limit is overriden for the policy.
// The policy bucket will be the input policy name only if the max streams limit is overridden for the policy.
func (s *usageStore) getPolicyBucketAndStreamsLimit(tenant, policy string) (policyBucket string, maxStreams uint64) {
defaultMaxStreams := uint64(s.limits.MaxGlobalStreamsPerUser(tenant) / s.numPartitions)

@ -398,7 +398,7 @@ func (s *LineFilterLabelFilter) String() string {
} else {
rStr = fmt.Sprintf("`%s`", rStr)
}
str := fmt.Sprintf("%s%s%s", s.Matcher.Name, s.Matcher.Type, rStr)
str := fmt.Sprintf("%s%s%s", s.Name, s.Type, rStr)
return str
}
return s.Matcher.String()

Loading…
Cancel
Save