loki/pkg/engine/internal/executor/aggregator.go

package executor

import (
	"fmt"
	"maps"
	"slices"
	"strings"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/cespare/xxhash/v2"

	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
)

// groupState holds the running aggregation state for a single group.
type groupState struct {
	value       float64  // aggregated value
	labelValues []string // grouping label values
}

// aggregationOperation identifies how values within a group are combined.
type aggregationOperation int

const (
	aggregationOperationSum aggregationOperation = iota
	aggregationOperationMax
	aggregationOperationMin
	aggregationOperationCount
)

// aggregator is used to aggregate sample values by a set of grouping keys for each point in time.
type aggregator struct {
	groupBy   []physical.ColumnExpression          // columns to group by
	points    map[time.Time]map[uint64]*groupState // holds the groupState for each group at each point in time
	digest    *xxhash.Digest                       // used to compute the key for each group
	operation aggregationOperation                 // aggregation type
}

// newAggregator creates a new aggregator with the specified groupBy columns.
// An empty groupBy indicates no grouping: all values are aggregated into a single group.
// TODO: add a without argument to support `without(...)` grouping.
// A special case of `without()` with an empty groupBy is used for no-op grouping, which retains the input labels as-is.
func newAggregator(groupBy []physical.ColumnExpression, pointsSizeHint int, operation aggregationOperation) *aggregator {
	a := aggregator{
		groupBy:   groupBy,
		digest:    xxhash.New(),
		operation: operation,
	}
	if pointsSizeHint > 0 {
		a.points = make(map[time.Time]map[uint64]*groupState, pointsSizeHint)
	} else {
		a.points = make(map[time.Time]map[uint64]*groupState)
	}
	return &a
}

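// Illustrative sketch of a grouped aggregation (hypothetical inputs; the
// column-reference construction below only shows the expected shape of the
// groupBy argument). labelValues passed to Add must follow the groupBy order:
//
//	groupBy := []physical.ColumnExpression{
//		&physical.ColumnExpr{Ref: ...}, // e.g. a reference to the "app" label column
//	}
//	agg := newAggregator(groupBy, 0, aggregationOperationMax)
//	agg.Add(ts, 3, []string{"frontend"}) // first sample for group {app="frontend"}
//	agg.Add(ts, 7, []string{"frontend"}) // same group and timestamp: max becomes 7
//	agg.Add(ts, 5, []string{"backend"})  // a separate group
//	record, err := agg.BuildRecord()     // one row per (timestamp, group)
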
// Add adds a new sample value to the aggregation for the given timestamp and grouping label values.
// It expects labelValues to be in the same order as the groupBy columns.
func (a *aggregator) Add(ts time.Time, value float64, labelValues []string) {
	point, ok := a.points[ts]
	if !ok {
		point = make(map[uint64]*groupState)
		a.points[ts] = point
	}

	// Compute the group key by hashing the label values, separated by a zero byte.
	var key uint64
	if len(a.groupBy) != 0 {
		a.digest.Reset()
		for i, val := range labelValues {
			if i > 0 {
				_, _ = a.digest.Write([]byte{0}) // separator
			}
			_, _ = a.digest.WriteString(val)
		}
		key = a.digest.Sum64()
	}

	if state, ok := point[key]; ok {
		// TODO: handle hash collisions
		// accumulate value based on aggregation type
		switch a.operation {
		case aggregationOperationSum:
			state.value += value
		case aggregationOperationMax:
			if value > state.value {
				state.value = value
			}
		case aggregationOperationMin:
			if value < state.value {
				state.value = value
			}
		case aggregationOperationCount:
			state.value++
		}
	} else {
		v := value
		if a.operation == aggregationOperationCount {
			v = 1
		}

		if len(a.groupBy) == 0 {
			// special case: all values are aggregated into a single group.
			// This applies to queries like `sum(...)`, `sum by () (...)`, `count_over_time by () (...)`.
			point[key] = &groupState{
				value: v,
			}
			return
		}

		// Create a new slice since labelValues is reused by the calling code.
		labelValuesCopy := make([]string, len(labelValues))
		for i, v := range labelValues {
			// Copy the value as it is backed by the arrow array data buffer.
			// We could retain the record to avoid this copy, but that would hold
			// all other columns in memory for as long as the query is evaluated.
			labelValuesCopy[i] = strings.Clone(v)
		}

		// TODO: add limits on number of groups
		point[key] = &groupState{
			labelValues: labelValuesCopy,
			value:       v,
		}
	}
}

// BuildRecord emits the aggregated results as an Arrow record with one row per
// (timestamp, group), containing the timestamp, the aggregated value, and one
// column per groupBy label.
func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) {
	fields := make([]arrow.Field, 0, len(a.groupBy)+2)
	fields = append(fields,
		semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
		semconv.FieldFromIdent(semconv.ColumnIdentValue, false),
	)
	for _, column := range a.groupBy {
		colExpr, ok := column.(*physical.ColumnExpr)
		if !ok {
			panic(fmt.Sprintf("invalid column expression type %T", column))
		}
		ident := semconv.NewIdentifier(colExpr.Ref.Column, colExpr.Ref.Type, types.Loki.String)
		fields = append(fields, semconv.FieldFromIdent(ident, true))
	}

	schema := arrow.NewSchema(fields, nil)
	rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)

	// emit aggregated results in sorted order of timestamp
	for _, ts := range a.getSortedTimestamps() {
		tsValue, _ := arrow.TimestampFromTime(ts, arrow.Nanosecond)
		for _, entry := range a.points[ts] {
			rb.Field(0).(*array.TimestampBuilder).Append(tsValue)
			rb.Field(1).(*array.Float64Builder).Append(entry.value)

			for col, val := range entry.labelValues {
				builder := rb.Field(col + 2) // offset by 2 as the first 2 fields are timestamp and value
				// TODO: differentiate between null and actual empty string
				if val == "" {
					builder.(*array.StringBuilder).AppendNull()
				} else {
					builder.(*array.StringBuilder).Append(val)
				}
			}
		}
	}

	return rb.NewRecordBatch(), nil
}

// Reset clears the aggregated values so the aggregator can be reused.
func (a *aggregator) Reset() {
	a.digest.Reset()
	// keep the timestamps but clear the aggregated values
	for _, point := range a.points {
		clear(point)
	}
}

// getSortedTimestamps returns all timestamps in sorted order.
func (a *aggregator) getSortedTimestamps() []time.Time {
	return slices.SortedFunc(maps.Keys(a.points), func(a, b time.Time) int {
		return a.Compare(b)
	})
}
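
// exampleUngroupedSum is a minimal illustrative sketch of the ungrouped path:
// with no groupBy columns every sample for a timestamp folds into the single
// implicit group, mirroring queries like `sum(...)` or `sum by () (...)`.
// The function name and inputs are hypothetical; only the aggregator API
// defined above is exercised.
func exampleUngroupedSum() (arrow.RecordBatch, error) {
	agg := newAggregator(nil, 0, aggregationOperationSum)

	now := time.Now()
	agg.Add(now, 1, nil)                  // first sample: group value = 1
	agg.Add(now, 2, nil)                  // same timestamp: value accumulates to 3
	agg.Add(now.Add(time.Second), 5, nil) // new timestamp: its own group state

	// Emits rows sorted by timestamp, with only timestamp and value columns.
	return agg.BuildRecord()
}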