Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/arrowagg/mapper.go

96 lines
2.7 KiB

package arrowagg
import (
"github.com/apache/arrow-go/v18/arrow"
)
// Mapper is a utility for quickly finding the index of common fields in a
// schema.
//
// Mapper caches one mapping per schema (keyed by *arrow.Schema pointer) to
// speed up repeated lookups. A single schema's cache entry is dropped by
// [Mapper.RemoveSchema]; all entries are dropped by [Mapper.Reset].
type Mapper struct {
// target holds the fields to locate, exactly as passed to [NewMapper].
target []arrow.Field
// mappings caches the computed lookup table for each schema seen so far.
mappings map[*arrow.Schema]*mapping
}
// NewMapper returns a Mapper that resolves the given target fields against
// schemas. The target slice is retained as-is (not copied), and the mapper
// starts with an empty per-schema cache.
func NewMapper(target []arrow.Field) *Mapper {
	m := new(Mapper)
	m.target = target
	m.mappings = map[*arrow.Schema]*mapping{}
	return m
}
// FieldIndex reports where the target field at position targetIndex lives in
// schema. targetIndex is an index into the target slice that was passed to
// [NewMapper].
//
// It returns -1 when targetIndex falls outside the target slice, or when the
// schema has no matching field. The per-schema mapping is built (and cached)
// on first use; no mapping is built for an out-of-range targetIndex.
func (m *Mapper) FieldIndex(schema *arrow.Schema, targetIndex int) int {
	// Reject invalid indexes up front so we never build a mapping for them.
	if targetIndex < 0 || len(m.target) <= targetIndex {
		return -1
	}

	lookups := m.getOrMakeMapping(schema).lookups
	if targetIndex < len(lookups) {
		return lookups[targetIndex]
	}
	return -1
}
// getOrMakeMapping returns the cached mapping for schema, computing and
// caching it first if this schema pointer has not been seen before.
func (m *Mapper) getOrMakeMapping(schema *arrow.Schema) *mapping {
	entry, found := m.mappings[schema]
	if !found {
		entry = newMapping(schema, m.target)
		m.mappings[schema] = entry
	}
	return entry
}
// RemoveSchema drops the cached mapping for schema, if one exists. Cache
// entries are keyed by schema pointer, so only that exact *arrow.Schema value
// is affected; a later lookup against it rebuilds the mapping.
func (m *Mapper) RemoveSchema(schema *arrow.Schema) {
delete(m.mappings, schema)
}
// Reset empties the mapper's cache of per-schema mappings. The target fields
// passed to [NewMapper] are left untouched, so the mapper can keep being used
// afterwards.
func (m *Mapper) Reset() {
clear(m.mappings)
}
// mapping is the cached result of resolving a Mapper's target fields against
// one specific schema.
type mapping struct {
// schema is the schema this mapping was computed for.
schema *arrow.Schema
// NOTE(review): checked is initialized in newMapping but is not otherwise
// read or written in the code visible here; confirm whether it is still used.
checked map[*arrow.Schema]struct{} // schemas that have been checked for equality against this mapping.
// lookups[i] is -1 when the schema has no field named like the i'th target
// field, or when the field with that name is not Equal to the target.
lookups []int // lookups[i] -> index of the i'th "to" field in schema.
}
// newMapping resolves every field in to against schema and returns the
// resulting lookup table. Entry i of the table is the schema index of to[i],
// or -1 when the schema has no field with that name or the same-named field
// is not Equal to to[i].
//
// It panics if the schema contains more than one field with a target's name.
// The returned mapping is not cached here; that is left to the caller.
func newMapping(schema *arrow.Schema, to []arrow.Field) *mapping {
	res := &mapping{
		schema:  schema,
		checked: make(map[*arrow.Schema]struct{}),
		lookups: make([]int, len(to)),
	}

	for i := range to {
		want := to[i]

		// Stays -1 unless exactly one equal field is found below.
		idx := -1

		matches := schema.FieldIndices(want.Name)
		switch len(matches) {
		case 0:
			// No field with this name: leave the entry as not found.
		case 1:
			// Names matching is not enough; the whole field must be Equal.
			// (This may be redundant if fully-qualified names are unique.)
			if schema.Field(matches[0]).Equal(want) {
				idx = matches[0]
			}
		default:
			// Not expected: fully-qualified names should make fields unique.
			panic("mapper: multiple fields with the same name in schema")
		}

		res.lookups[i] = idx
	}

	return res
}