Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/parse.go

241 lines
7.9 KiB

package executor
import (
"fmt"
"sort"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/internal/util"
)
// parseFn returns a VariadicFunction that parses a string column with the
// parser selected by op (logfmt or JSON) and returns the extracted values as a
// struct array with one nullable field per produced column.
//
// The returned function expects exactly the four arguments validated by
// extractParseFnParameters: the source column, the requested keys, the strict
// flag and the keepEmpty flag. Invalid arguments are reported through the
// returned error (like an unsupported parser kind) instead of panicking.
// When the parser produces no columns at all, the function returns (nil, nil).
func parseFn(op types.VariadicOp) VariadicFunction {
	return VariadicFunctionFunc(func(args ...arrow.Array) (arrow.Array, error) {
		sourceCol, requestedKeys, strict, keepEmpty, err := extractParseFnParameters(args)
		if err != nil {
			return nil, err
		}

		var headers []string
		var parsedColumns []arrow.Array
		switch op {
		case types.VariadicOpParseLogfmt:
			headers, parsedColumns = buildLogfmtColumns(sourceCol, requestedKeys, strict, keepEmpty)
		case types.VariadicOpParseJSON:
			// The JSON builder is not given the strict/keepEmpty flags.
			headers, parsedColumns = buildJSONColumns(sourceCol, requestedKeys)
		default:
			return nil, fmt.Errorf("unsupported parser kind: %v", op)
		}

		if len(parsedColumns) == 0 {
			return nil, nil
		}

		// Build one field per parsed column. The error columns are generated
		// metadata; every other column holds a parsed value.
		fields := make([]arrow.Field, 0, len(headers))
		for _, header := range headers {
			ct := types.ColumnTypeParsed
			if header == semconv.ColumnIdentError.ShortName() || header == semconv.ColumnIdentErrorDetails.ShortName() {
				ct = types.ColumnTypeGenerated
			}
			ident := semconv.NewIdentifier(header, ct, types.Loki.String)
			fields = append(fields, semconv.FieldFromIdent(ident, true))
		}
		return array.NewStructArrayWithFields(parsedColumns, fields)
	})
}
// extractParseFnParameters validates the arguments of a parse function call and
// unpacks them.
//
// Valid signature:
//
//	parse(sourceColVec, requestedKeys, strict, keepEmpty)
//
// sourceColVec must be a *array.String. The other three arguments are the same
// for every row, so only their first row is read: requestedKeys as a
// *array.List whose element converts to []string, and strict / keepEmpty as
// *array.Boolean. A nil, empty or null requestedKeys argument yields no keys.
// A nil, empty or null flag argument yields false.
func extractParseFnParameters(args []arrow.Array) (*array.String, []string, bool, bool, error) {
	if len(args) != 4 {
		return nil, nil, false, false, fmt.Errorf("parse function expected 4 arguments, got %d", len(args))
	}
	sourceColArr, requestedKeysArr, strictArr, keepEmptyArr := args[0], args[1], args[2], args[3]

	if sourceColArr == nil {
		return nil, nil, false, false, fmt.Errorf("parse function arguments did not include a source ColumnVector to parse")
	}
	sourceCol, ok := sourceColArr.(*array.String)
	if !ok {
		return nil, nil, false, false, fmt.Errorf("parse can only operate on string column types, got %T", sourceColArr)
	}

	// Requested keys are the same for all rows, so only the first row is read.
	// The Len() guard keeps an empty array from being indexed out of range.
	var requestedKeys []string
	if requestedKeysArr != nil && requestedKeysArr.Len() > 0 && !requestedKeysArr.IsNull(0) {
		reqKeysList, ok := requestedKeysArr.(*array.List)
		if !ok {
			return nil, nil, false, false, fmt.Errorf("requested keys must be a list of string arrays, got %T", requestedKeysArr)
		}
		value := util.ArrayListValue(reqKeysList, 0)
		firstRow, ok := value.([]string)
		if !ok {
			// Report the element's dynamic type. Printing firstRow (the failed
			// assertion's zero value) would always say []string.
			return nil, nil, false, false, fmt.Errorf("requested keys must be a list of string arrays, got a list of %T", value)
		}
		requestedKeys = append(requestedKeys, firstRow...)
	}

	strict, err := extractParseFnBoolFlag(strictArr, "strict")
	if err != nil {
		return nil, nil, false, false, err
	}
	keepEmpty, err := extractParseFnBoolFlag(keepEmptyArr, "keepEmpty")
	if err != nil {
		return nil, nil, false, false, err
	}
	return sourceCol, requestedKeys, strict, keepEmpty, nil
}

// extractParseFnBoolFlag reads the first value of a boolean scalar argument
// named name. A nil, empty or null argument yields false. A non-boolean array
// is an error.
func extractParseFnBoolFlag(arr arrow.Array, name string) (bool, error) {
	if arr == nil || arr.Len() == 0 {
		return false, nil
	}
	boolArr, ok := arr.(*array.Boolean)
	if !ok {
		return false, fmt.Errorf("%s flag must be a boolean, got %T", name, arr)
	}
	if boolArr.IsNull(0) {
		// The value bit of a null slot is unspecified; treat it as unset.
		return false, nil
	}
	return boolArr.Value(0), nil
}
// parseFunc is the signature of a single-line parser: it takes one log line
// and yields the key/value pairs extracted from it. A non-nil error marks the
// line as unparseable; parseLines then ignores the returned map for that line.
type parseFunc func(input string) (map[string]string, error)
// buildColumns parses every row of input with parse and turns the discovered
// keys into Arrow string columns (see parseLines for discovery, NULL padding
// and column ordering). The requested-keys argument is currently ignored.
//
// It returns the column names and the matching arrays, index-aligned. No
// error is returned. Per-line parse failures are recorded in the error
// columns produced by parseLines.
func buildColumns(input *array.String, _ []string, parse parseFunc, errorType string) ([]string, []arrow.Array) {
	columnBuilders := make(map[string]*array.StringBuilder)
	columnOrder := parseLines(input, columnBuilders, parse, errorType)

	columns := make([]arrow.Array, 0, len(columnOrder))
	for _, key := range columnOrder {
		columns = append(columns, columnBuilders[key].NewArray())
	}

	// NewArray transfers the built data to the array and resets the builder,
	// so the builders themselves can now be released.
	for _, builder := range columnBuilders {
		builder.Release()
	}

	// columnOrder is freshly allocated by parseLines and not retained there,
	// so it can be returned directly as the header list.
	return columnOrder, columns
}
// parseLines parses every row of input with parse. It appends the extracted
// values to per-key string builders stored in columnBuilders, creating a
// builder (with memory.DefaultAllocator) the first time a key appears and
// backfilling NULLs for the earlier rows.
//
// After each row, every column has exactly one entry for that row: the parsed
// value, or NULL when the row did not produce the key.
//
// The first time parse returns an error, two error columns (the semconv error
// and error-details identifiers) are registered and backfilled with NULLs.
// A failing row stores errorType and err.Error() in those columns and
// contributes no parsed keys, even if parse also returned a map.
//
// It returns the column names in output order: parsed keys sorted
// lexicographically, followed by the two error columns when any row failed.
func parseLines(input *array.String, columnBuilders map[string]*array.StringBuilder, parse parseFunc, errorType string) []string {
	errorName := semconv.ColumnIdentError.ShortName()
	errorDetailsName := semconv.ColumnIdentErrorDetails.ShortName()

	columnOrder := []string{}
	// Both error builders stay nil until the first failing row.
	var errorBuilder, errorDetailsBuilder *array.StringBuilder

	for i := 0; i < input.Len(); i++ {
		parsed, err := parse(input.Value(i))
		if err != nil {
			if errorBuilder == nil {
				// First failure: register the error columns and backfill NULLs
				// for rows 0..i-1.
				errorBuilder = array.NewStringBuilder(memory.DefaultAllocator)
				errorDetailsBuilder = array.NewStringBuilder(memory.DefaultAllocator)
				errorBuilder.AppendNulls(i)
				errorDetailsBuilder.AppendNulls(i)
				columnBuilders[errorName] = errorBuilder
				columnBuilders[errorDetailsName] = errorDetailsBuilder
				columnOrder = append(columnOrder, errorName, errorDetailsName)
			}
			errorBuilder.Append(errorType)
			errorDetailsBuilder.Append(err.Error())
		} else {
			// NOTE(review): a parsed key equal to an error column name is not
			// renamed here. If the error columns already exist it is written into
			// them; if not, a later failure replaces its builder. Confirm whether
			// the parsers filter such keys upstream.
			for key, value := range parsed {
				builder, exists := columnBuilders[key]
				if !exists {
					// New column discovered: backfill NULLs for earlier rows.
					builder = array.NewStringBuilder(memory.DefaultAllocator)
					builder.AppendNulls(i)
					columnBuilders[key] = builder
					columnOrder = append(columnOrder, key)
				}
				builder.Append(value)
			}
		}

		// Pad every column that received no value for row i. This includes the
		// error columns on successful rows. A column that already holds i+1
		// entries was written this row, so its length replaces a per-row
		// "seen keys" set (and its allocation).
		for _, key := range columnOrder {
			if builder := columnBuilders[key]; builder.Len() <= i {
				builder.AppendNull()
			}
		}
	}

	if errorBuilder == nil {
		sort.Strings(columnOrder)
		return columnOrder
	}

	// Sort the parsed keys for a stable output order and keep the error
	// columns at the end.
	keys := make([]string, 0, len(columnOrder)-2)
	for _, key := range columnOrder {
		if key != errorName && key != errorDetailsName {
			keys = append(keys, key)
		}
	}
	sort.Strings(keys)
	return append(keys, errorName, errorDetailsName)
}
// unsafeBytes exposes the bytes backing s as a []byte without copying or
// allocating.
//
// The returned slice aliases the string's memory. Treat it as read-only:
// Go strings are immutable, and writing through the slice is undefined
// behavior.
func unsafeBytes(s string) []byte {
	data := unsafe.StringData(s)
	return unsafe.Slice(data, len(s))
}
// unsafeString views b as a string without copying or allocating.
//
// The string aliases b's memory. The caller must not modify b while the
// returned string, or anything derived from it, is still in use.
func unsafeString(b []byte) string {
	n := len(b)
	return unsafe.String(unsafe.SliceData(b), n)
}