loki/pkg/engine/internal/executor/aggregator.go

package executor

import (
	"maps"
	"slices"
	"strings"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/cespare/xxhash/v2"

	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
)

type groupState struct {
	value       float64       // aggregated value
	count       int64         // values counter
	labels      []arrow.Field // grouping labels
	labelValues []string      // grouping label values
}

type aggregationOperation int

const (
	aggregationOperationSum aggregationOperation = iota
	aggregationOperationMax
	aggregationOperationMin
	aggregationOperationCount
	aggregationOperationAvg
	aggregationOperationStddev
	aggregationOperationStdvar
	aggregationOperationBytes
)

// aggregator is used to aggregate sample values by a set of grouping keys for each point in time.
type aggregator struct {
	points    map[time.Time]map[uint64]*groupState // holds the groupState for each group at each point in time
	digest    *xxhash.Digest                       // used to compute the key for each group
	operation aggregationOperation                 // aggregation type
	labels    []arrow.Field                        // combined list of all label fields across all sample values
}

// newAggregator creates a new aggregator for the given aggregation operation,
// pre-sizing the points map when a size hint is provided.
func newAggregator(pointsSizeHint int, operation aggregationOperation) *aggregator {
	a := aggregator{
		digest:    xxhash.New(),
		operation: operation,
	}
	if pointsSizeHint > 0 {
		a.points = make(map[time.Time]map[uint64]*groupState, pointsSizeHint)
	} else {
		a.points = make(map[time.Time]map[uint64]*groupState)
	}
	return &a
}

// AddLabels merges a list of label fields into the combined set of labels across all sample values.
// This can be done several times over the lifetime of an aggregator to accommodate processing of
// multiple records with different schemas.
func (a *aggregator) AddLabels(labels []arrow.Field) {
	for _, label := range labels {
		if !slices.ContainsFunc(a.labels, func(l arrow.Field) bool {
			return label.Equal(l)
		}) {
			a.labels = append(a.labels, label)
		}
	}
}
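
// Illustrative sketch (not part of the original file): AddLabels de-duplicates
// fields, so feeding it the label schema of each successive record accumulates
// the union of all label fields seen so far. The field names below are made up.
//
//	appField := arrow.Field{Name: "app", Type: arrow.BinaryTypes.String}
//	envField := arrow.Field{Name: "env", Type: arrow.BinaryTypes.String}
//	agg.AddLabels([]arrow.Field{appField, envField}) // first record's labels
//	agg.AddLabels([]arrow.Field{appField})           // no-op, appField is already tracked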

// Add adds a new sample value to the aggregation for the given timestamp and grouping label values.
// It expects labelValues to be in the same order as the groupBy columns.
func (a *aggregator) Add(ts time.Time, value float64, labels []arrow.Field, labelValues []string) {
	if len(labels) != len(labelValues) {
		panic("len(labels) != len(labelValues)")
	}

	point, ok := a.points[ts]
	if !ok {
		point = make(map[uint64]*groupState)
		a.points[ts] = point
	}

	var key uint64
	if len(labelValues) != 0 {
		a.digest.Reset()
		for i, val := range labelValues {
			if i > 0 {
				_, _ = a.digest.Write([]byte{0}) // separator
			}
			_, _ = a.digest.WriteString(labels[i].Name)
			_, _ = a.digest.Write([]byte("="))
			_, _ = a.digest.WriteString(val)
		}
		key = a.digest.Sum64()
	}

	if state, ok := point[key]; ok {
		// TODO: handle hash collisions
		// accumulate value based on aggregation type
		switch a.operation {
		case aggregationOperationSum:
			state.value += value
		case aggregationOperationMax:
			if value > state.value {
				state.value = value
			}
		case aggregationOperationMin:
			if value < state.value {
				state.value = value
			}
		case aggregationOperationAvg:
			state.value += value
		}
		state.count++
	} else {
		count := int64(1)
		if len(labels) == 0 {
			// special case: All values aggregated into a single group.
			point[key] = &groupState{
				value: value,
				count: count,
			}
			return
		}

		labelValuesCopy := make([]string, len(labelValues))
		for i, v := range labelValues {
			// copy the value as this is backed by the arrow array data buffer.
			// We could retain the record to avoid this copy, but that would hold
			// all other columns in memory for as long as the query is evaluated.
			labelValuesCopy[i] = strings.Clone(v)
		}

		// TODO: add limits on number of groups
		point[key] = &groupState{
			labels:      labels,
			labelValues: labelValuesCopy,
			value:       value,
			count:       count,
		}
	}
}
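
// Illustrative sketch (not part of the original file): two samples with the same
// timestamp and the same grouping label values hash to the same key and are folded
// into one group. Assuming a sum aggregation and a hypothetical "app" label:
//
//	agg := newAggregator(0, aggregationOperationSum)
//	appField := arrow.Field{Name: "app", Type: arrow.BinaryTypes.String}
//	agg.AddLabels([]arrow.Field{appField})
//	ts := time.Unix(10, 0)
//	agg.Add(ts, 1, []arrow.Field{appField}, []string{"loki"})
//	agg.Add(ts, 2, []arrow.Field{appField}, []string{"loki"})
//	// the group keyed by app="loki" at ts now holds value=3 and count=2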

// BuildRecord builds an Arrow record from the aggregated values, emitting one row
// per timestamp and group with the timestamp, the aggregated value, and one
// nullable column per grouping label.
func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) {
	fields := make([]arrow.Field, 0, len(a.labels)+2)
	fields = append(fields,
		semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
		semconv.FieldFromIdent(semconv.ColumnIdentValue, false),
	)
	for _, label := range a.labels {
		fields = append(fields, arrow.Field{
			Name:     label.Name,
			Type:     label.Type,
			Nullable: true,
			Metadata: label.Metadata,
		})
	}

	schema := arrow.NewSchema(fields, nil)
	rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)

	// emit aggregated results in sorted order of timestamp
	for _, ts := range a.getSortedTimestamps() {
		tsValue, _ := arrow.TimestampFromTime(ts, arrow.Nanosecond)
		for _, entry := range a.points[ts] {
			var value float64
			switch a.operation {
			case aggregationOperationAvg:
				value = entry.value / float64(entry.count)
			case aggregationOperationCount:
				value = float64(entry.count)
			default:
				value = entry.value
			}

			rb.Field(0).(*array.TimestampBuilder).Append(tsValue)
			rb.Field(1).(*array.Float64Builder).Append(value)

			for i, label := range a.labels {
				builder := rb.Field(2 + i) // offset by 2 as the first 2 fields are timestamp and value
				j := slices.IndexFunc(entry.labels, func(l arrow.Field) bool {
					return l.Name == label.Name
				})
				if j == -1 {
					builder.(*array.StringBuilder).AppendNull()
				} else {
					// TODO: differentiate between null and actual empty string
					if entry.labelValues[j] == "" {
						builder.(*array.StringBuilder).AppendNull()
					} else {
						builder.(*array.StringBuilder).Append(entry.labelValues[j])
					}
				}
			}
		}
	}

	return rb.NewRecordBatch(), nil
}
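
// Illustrative sketch (not part of the original file): continuing the example above,
// BuildRecord would emit a record whose first two columns hold the timestamp and the
// aggregated value, followed by one column per label (here just "app"), with a single
// row for the group app="loki" at ts carrying the summed value 3. Rows are appended in
// ascending timestamp order, and groups missing a label (or holding an empty string)
// get a null in that label's column.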

// Reset clears the aggregated values so the aggregator can be reused, while keeping
// the previously seen timestamp keys and label fields.
func (a *aggregator) Reset() {
	a.digest.Reset()
	// keep the timestamps but clear the aggregated values
	for _, point := range a.points {
		clear(point)
	}
}

// getSortedTimestamps returns all timestamps in sorted order.
func (a *aggregator) getSortedTimestamps() []time.Time {
	return slices.SortedFunc(maps.Keys(a.points), func(a, b time.Time) int {
		return a.Compare(b)
	})
}