mirror of https://github.com/grafana/loki
chore(compute): change selection vector behaviour (#21462)
parent
6c438b722a
commit
a523685dbd
@ -0,0 +1,30 @@ |
||||
package columnar |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/memory" |
||||
) |
||||
|
||||
// invalidMaskError is the error produced when a selection mask and the array
// it is meant to cover disagree on length.
type invalidMaskError struct {
	maskLength, inputLength int // lengths of the mask and of the input array
}

// Error implements the error interface by reporting both mismatched lengths.
func (e *invalidMaskError) Error() string {
	const format = "mask length %d does not match input length %d"
	return fmt.Sprintf(format, e.maskLength, e.inputLength)
}
||||
|
||||
// AllSelected returns true if all elements in the array are selected by the
|
||||
// mask, or if the mask is the zero value.
|
||||
//
|
||||
// If the mask is non-zero, it must have the same length as the array.
|
||||
func AllSelected(arr Array, mask memory.Bitmap) (bool, error) { |
||||
if mask.Len() == 0 { |
||||
return true, nil |
||||
} |
||||
if mask.Len() != arr.Len() { |
||||
return false, &invalidMaskError{maskLength: mask.Len(), inputLength: arr.Len()} |
||||
} |
||||
return mask.SetCount() == arr.Len(), nil |
||||
} |
||||
@ -0,0 +1,101 @@ |
||||
package compute |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/columnar" |
||||
"github.com/grafana/loki/v3/pkg/memory" |
||||
) |
||||
|
||||
// Filter selects rows from the input datum where the corresponding bit in mask
|
||||
// is set, returning a new compacted array containing only the selected rows.
|
||||
//
|
||||
// The input must be an [columnar.Array]; Filter returns an error if a
|
||||
// [columnar.Scalar] is provided.
|
||||
//
|
||||
// If mask is empty (Len == 0), all rows are selected and the input is returned
|
||||
// unchanged. Filter returns an error if the mask length does not match the
|
||||
// input array length.
|
||||
func Filter(alloc *memory.Allocator, input columnar.Datum, mask memory.Bitmap) (columnar.Datum, error) { |
||||
arr, ok := input.(columnar.Array) |
||||
if !ok { |
||||
return nil, fmt.Errorf("Filter requires an Array input, got %T", input) |
||||
} |
||||
|
||||
if ok, err := columnar.AllSelected(arr, mask); ok { |
||||
return arr, nil |
||||
} else if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
switch src := arr.(type) { |
||||
case *columnar.Bool: |
||||
return filterBool(alloc, src, mask), nil |
||||
case *columnar.Number[int32]: |
||||
return filterNumber(alloc, src, mask), nil |
||||
case *columnar.Number[int64]: |
||||
return filterNumber(alloc, src, mask), nil |
||||
case *columnar.Number[uint32]: |
||||
return filterNumber(alloc, src, mask), nil |
||||
case *columnar.Number[uint64]: |
||||
return filterNumber(alloc, src, mask), nil |
||||
case *columnar.UTF8: |
||||
return filterUTF8(alloc, src, mask), nil |
||||
case *columnar.Null: |
||||
return filterNull(alloc, mask), nil |
||||
default: |
||||
return nil, fmt.Errorf("Filter: unsupported array type %T", input) |
||||
} |
||||
} |
||||
|
||||
func filterBool(alloc *memory.Allocator, src *columnar.Bool, mask memory.Bitmap) *columnar.Bool { |
||||
builder := columnar.NewBoolBuilder(alloc) |
||||
builder.Grow(mask.SetCount()) |
||||
for i := range mask.IterValues(true) { |
||||
if src.IsNull(i) { |
||||
builder.AppendNull() |
||||
} else { |
||||
builder.AppendValue(src.Get(i)) |
||||
} |
||||
} |
||||
return builder.Build() |
||||
} |
||||
|
||||
func filterNumber[T columnar.Numeric](alloc *memory.Allocator, src *columnar.Number[T], mask memory.Bitmap) *columnar.Number[T] { |
||||
builder := columnar.NewNumberBuilder[T](alloc) |
||||
builder.Grow(mask.SetCount()) |
||||
for i := range mask.IterValues(true) { |
||||
if src.IsNull(i) { |
||||
builder.AppendNull() |
||||
} else { |
||||
builder.AppendValue(src.Get(i)) |
||||
} |
||||
} |
||||
return builder.Build() |
||||
} |
||||
|
||||
func filterUTF8(alloc *memory.Allocator, src *columnar.UTF8, mask memory.Bitmap) *columnar.UTF8 { |
||||
n := mask.SetCount() |
||||
builder := columnar.NewUTF8Builder(alloc) |
||||
builder.Grow(n) |
||||
// Estimate data bytes from the source average. A two-pass approach to
|
||||
// compute the exact size is ~30% slower due to the extra iteration over
|
||||
// variable-length values; the estimate avoids most reallocations without
|
||||
// that cost.
|
||||
builder.GrowData(src.Size() * n / max(src.Len(), 1)) |
||||
for i := range mask.IterValues(true) { |
||||
if src.IsNull(i) { |
||||
builder.AppendNull() |
||||
} else { |
||||
builder.AppendValue(src.Get(i)) |
||||
} |
||||
} |
||||
return builder.Build() |
||||
} |
||||
|
||||
func filterNull(alloc *memory.Allocator, mask memory.Bitmap) *columnar.Null { |
||||
n := mask.SetCount() |
||||
validity := memory.NewBitmap(alloc, n) |
||||
validity.AppendCount(false, n) |
||||
return columnar.NewNull(validity) |
||||
} |
||||
@ -0,0 +1,84 @@ |
||||
package compute_test |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/compute" |
||||
"github.com/grafana/loki/v3/pkg/memory" |
||||
) |
||||
|
||||
var filterSelectivities = map[string]func(*testing.B, *memory.Allocator) memory.Bitmap{ |
||||
"selectivity=100": func(*testing.B, *memory.Allocator) memory.Bitmap { return memory.Bitmap{} }, |
||||
"selectivity=50": func(b *testing.B, alloc *memory.Allocator) memory.Bitmap { |
||||
return makeAlternatingSelection(b, alloc, benchmarkSize) |
||||
}, |
||||
"selectivity=05": func(b *testing.B, alloc *memory.Allocator) memory.Bitmap { |
||||
return makeSparseSelection(b, alloc, benchmarkSize, 0.05) |
||||
}, |
||||
} |
||||
|
||||
func BenchmarkFilter_Bool(b *testing.B) { |
||||
for name, maskFunc := range filterSelectivities { |
||||
b.Run(name, func(b *testing.B) { |
||||
var alloc memory.Allocator |
||||
input := makeBoolArray(b, &alloc, benchmarkSize) |
||||
mask := maskFunc(b, &alloc) |
||||
|
||||
benchAlloc := memory.NewAllocator(nil) |
||||
for b.Loop() { |
||||
benchAlloc.Reclaim() |
||||
result, err := compute.Filter(benchAlloc, input, mask) |
||||
if err != nil { |
||||
b.Fatal(err) |
||||
} |
||||
_ = result |
||||
} |
||||
|
||||
reportArrayBenchMetrics(b, input) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func BenchmarkFilter_Int64(b *testing.B) { |
||||
for name, maskFunc := range filterSelectivities { |
||||
b.Run(name, func(b *testing.B) { |
||||
var alloc memory.Allocator |
||||
input := makeInt64Array(b, &alloc, benchmarkSize) |
||||
mask := maskFunc(b, &alloc) |
||||
|
||||
benchAlloc := memory.NewAllocator(nil) |
||||
for b.Loop() { |
||||
benchAlloc.Reclaim() |
||||
result, err := compute.Filter(benchAlloc, input, mask) |
||||
if err != nil { |
||||
b.Fatal(err) |
||||
} |
||||
_ = result |
||||
} |
||||
|
||||
reportArrayBenchMetrics(b, input) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func BenchmarkFilter_UTF8(b *testing.B) { |
||||
for name, maskFunc := range filterSelectivities { |
||||
b.Run(name, func(b *testing.B) { |
||||
var alloc memory.Allocator |
||||
input := makeUTF8Array(b, &alloc, benchmarkSize) |
||||
mask := maskFunc(b, &alloc) |
||||
|
||||
benchAlloc := memory.NewAllocator(nil) |
||||
for b.Loop() { |
||||
benchAlloc.Reclaim() |
||||
result, err := compute.Filter(benchAlloc, input, mask) |
||||
if err != nil { |
||||
b.Fatal(err) |
||||
} |
||||
_ = result |
||||
} |
||||
|
||||
reportArrayBenchMetrics(b, input) |
||||
}) |
||||
} |
||||
} |
||||
@ -1,19 +0,0 @@ |
||||
package compute |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/columnar" |
||||
"github.com/grafana/loki/v3/pkg/memory" |
||||
) |
||||
|
||||
// applySelectionToBoolArray applies a selection bitmap to a boolean array,
|
||||
// marking unselected rows as null in the result.
|
||||
func applySelectionToBoolArray(alloc *memory.Allocator, arr *columnar.Bool, selection memory.Bitmap) (*columnar.Bool, error) { |
||||
if selection.Len() == 0 { |
||||
return arr, nil |
||||
} |
||||
validity, err := computeValidityAA(alloc, arr.Validity(), selection) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return columnar.NewBool(arr.Values(), validity), nil |
||||
} |
||||
@ -0,0 +1,36 @@ |
||||
# |
||||
# FILTER (compute.Filter) |
||||
# |
||||
# FILTER takes an array and a selection mask, returning only the rows where |
||||
# the mask is true. |
||||
# |
||||
|
||||
# Bool arrays |
||||
FILTER bool:[true false true false] select:[true false true false] -> bool:[true true] |
||||
FILTER bool:[true false true false] select:[true true true true] -> bool:[true false true false] |
||||
FILTER bool:[true false true false] select:[false false false false] -> bool:[] |
||||
FILTER bool:[true false null true] select:[true false true false] -> bool:[true null] |
||||
FILTER bool:[null null null] select:[true true true] -> bool:[null null null] |
||||
|
||||
# Int32 arrays |
||||
FILTER int32:[1 2 3 4 5] select:[true false true false true] -> int32:[1 3 5] |
||||
FILTER int32:[10 20 30] select:[false false false] -> int32:[] |
||||
FILTER int32:[10 null 30] select:[true true false] -> int32:[10 null] |
||||
|
||||
# Int64 arrays |
||||
FILTER int64:[100 200 300] select:[true false true] -> int64:[100 300] |
||||
FILTER int64:[null 200 300] select:[true true false] -> int64:[null 200] |
||||
|
||||
# Uint32 arrays |
||||
FILTER uint32:[1 2 3] select:[false true false] -> uint32:[2] |
||||
|
||||
# Uint64 arrays |
||||
FILTER uint64:[10 20 30] select:[true true false] -> uint64:[10 20] |
||||
|
||||
# UTF8 arrays |
||||
FILTER utf8:["hello" "world" "foo"] select:[true false true] -> utf8:["hello" "foo"] |
||||
FILTER utf8:["a" "b" "c" "d"] select:[false true true false] -> utf8:["b" "c"] |
||||
FILTER utf8:[null "hello" "world"] select:[true true false] -> utf8:[null "hello"] |
||||
|
||||
# Null arrays |
||||
FILTER null:[null null null] select:[true false true] -> null:[null null] |
||||
Loading…
Reference in new issue