mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
243 lines
6.7 KiB
243 lines
6.7 KiB
package executor
|
|
|
|
import (
|
|
"maps"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/array"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/cespare/xxhash/v2"
|
|
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
|
|
)
|
|
|
|
type groupState struct {
|
|
value float64 // aggregated value
|
|
count int64 // values counter
|
|
labels []arrow.Field // grouping labels
|
|
labelValues []string // grouping label values
|
|
}
|
|
|
|
type aggregationOperation int
|
|
|
|
const (
|
|
aggregationOperationSum aggregationOperation = iota
|
|
aggregationOperationMax
|
|
aggregationOperationMin
|
|
aggregationOperationCount
|
|
aggregationOperationAvg
|
|
aggregationOperationStddev
|
|
aggregationOperationStdvar
|
|
aggregationOperationBytes
|
|
)
|
|
|
|
// aggregator is used to aggregate sample values by a set of grouping keys for each point in time.
|
|
type aggregator struct {
|
|
points map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series
|
|
digest *xxhash.Digest // used to compute key for each group
|
|
operation aggregationOperation // aggregation type
|
|
labels []arrow.Field // combined list of all label fields for all sample values
|
|
clonedLabelValues map[string]string // cache of cloned strings to reduce allocations for repeated values
|
|
}
|
|
|
|
// newAggregator creates a new aggregator with the specified grouping.
|
|
func newAggregator(pointsSizeHint int, operation aggregationOperation) *aggregator {
|
|
a := aggregator{
|
|
digest: xxhash.New(),
|
|
operation: operation,
|
|
clonedLabelValues: make(map[string]string),
|
|
}
|
|
|
|
if pointsSizeHint > 0 {
|
|
a.points = make(map[time.Time]map[uint64]*groupState, pointsSizeHint)
|
|
} else {
|
|
a.points = make(map[time.Time]map[uint64]*groupState)
|
|
}
|
|
|
|
return &a
|
|
}
|
|
|
|
// AddLabels merges a list of labels that all sample values will have combined. This can be done several times
|
|
// over the lifetime of an aggregator to accommodate processing of multiple records with different schemas.
|
|
func (a *aggregator) AddLabels(labels []arrow.Field) {
|
|
for _, label := range labels {
|
|
if !slices.ContainsFunc(a.labels, func(l arrow.Field) bool {
|
|
return label.Equal(l)
|
|
}) {
|
|
a.labels = append(a.labels, label)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add adds a new sample value to the aggregation for the given timestamp and grouping label values.
|
|
// It expects labelValues to be in the same order as the groupBy columns.
|
|
func (a *aggregator) Add(ts time.Time, value float64, labels []arrow.Field, labelValues []string) {
|
|
if len(labels) != len(labelValues) {
|
|
panic("len(labels) != len(labelValues)")
|
|
}
|
|
|
|
point, ok := a.points[ts]
|
|
if !ok {
|
|
point = make(map[uint64]*groupState)
|
|
a.points[ts] = point
|
|
}
|
|
|
|
var key uint64
|
|
if len(labelValues) != 0 {
|
|
a.digest.Reset()
|
|
for i, val := range labelValues {
|
|
if i > 0 {
|
|
_, _ = a.digest.Write([]byte{0}) // separator
|
|
}
|
|
|
|
_, _ = a.digest.WriteString(labels[i].Name)
|
|
_, _ = a.digest.Write([]byte("="))
|
|
_, _ = a.digest.WriteString(val)
|
|
}
|
|
key = a.digest.Sum64()
|
|
}
|
|
|
|
if state, ok := point[key]; ok {
|
|
// TODO: handle hash collisions
|
|
|
|
// accumulate value based on aggregation type
|
|
switch a.operation {
|
|
case aggregationOperationSum:
|
|
state.value += value
|
|
case aggregationOperationMax:
|
|
if value > state.value {
|
|
state.value = value
|
|
}
|
|
case aggregationOperationMin:
|
|
if value < state.value {
|
|
state.value = value
|
|
}
|
|
case aggregationOperationAvg:
|
|
state.value += value
|
|
}
|
|
|
|
state.count++
|
|
} else {
|
|
count := int64(1)
|
|
|
|
if len(labels) == 0 {
|
|
// special case: All values aggregated into a single group.
|
|
point[key] = &groupState{
|
|
value: value,
|
|
count: count,
|
|
}
|
|
return
|
|
}
|
|
|
|
labelValuesCopy := make([]string, len(labelValues))
|
|
for i, v := range labelValues {
|
|
// copy the value as this is backed by the arrow array data buffer.
|
|
// We could retain the record to avoid this copy, but that would hold
|
|
// all other columns in memory for as long as the query is evaluated.
|
|
cloned, ok := a.clonedLabelValues[v]
|
|
if !ok {
|
|
cloned = strings.Clone(v)
|
|
a.clonedLabelValues[v] = cloned
|
|
}
|
|
labelValuesCopy[i] = cloned
|
|
}
|
|
|
|
// TODO: add limits on number of groups
|
|
point[key] = &groupState{
|
|
labels: labels,
|
|
labelValues: labelValuesCopy,
|
|
value: value,
|
|
count: count,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) {
|
|
fields := make([]arrow.Field, 0, len(a.labels)+2)
|
|
fields = append(fields,
|
|
semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, false),
|
|
semconv.FieldFromIdent(semconv.ColumnIdentValue, false),
|
|
)
|
|
for _, label := range a.labels {
|
|
fields = append(fields, arrow.Field{
|
|
Name: label.Name,
|
|
Type: label.Type,
|
|
Nullable: true,
|
|
Metadata: label.Metadata,
|
|
})
|
|
}
|
|
|
|
schema := arrow.NewSchema(fields, nil)
|
|
rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
|
|
|
|
// emit aggregated results in sorted order of timestamp
|
|
sortedTimestamps := a.getSortedTimestamps()
|
|
|
|
// preallocate all builders to the total amount of rows
|
|
total := 0
|
|
for _, ts := range sortedTimestamps {
|
|
total += len(a.points[ts])
|
|
}
|
|
rb.Field(0).Reserve(total)
|
|
rb.Field(1).Reserve(total)
|
|
for i := range a.labels {
|
|
rb.Field(2 + i).Reserve(total)
|
|
}
|
|
|
|
for _, ts := range sortedTimestamps {
|
|
tsValue, _ := arrow.TimestampFromTime(ts, arrow.Nanosecond)
|
|
|
|
for _, entry := range a.points[ts] {
|
|
var value float64
|
|
switch a.operation {
|
|
case aggregationOperationAvg:
|
|
value = entry.value / float64(entry.count)
|
|
case aggregationOperationCount:
|
|
value = float64(entry.count)
|
|
default:
|
|
value = entry.value
|
|
}
|
|
|
|
rb.Field(0).(*array.TimestampBuilder).Append(tsValue)
|
|
rb.Field(1).(*array.Float64Builder).Append(value)
|
|
|
|
for i, label := range a.labels {
|
|
builder := rb.Field(2 + i) // offset by 2 as the first 2 fields are timestamp and value
|
|
|
|
j := slices.IndexFunc(entry.labels, func(l arrow.Field) bool {
|
|
return l.Name == label.Name
|
|
})
|
|
if j == -1 {
|
|
builder.(*array.StringBuilder).AppendNull()
|
|
} else {
|
|
// TODO: differentiate between null and actual empty string
|
|
if entry.labelValues[j] == "" {
|
|
builder.(*array.StringBuilder).AppendNull()
|
|
} else {
|
|
builder.(*array.StringBuilder).Append(entry.labelValues[j])
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return rb.NewRecordBatch(), nil
|
|
}
|
|
|
|
func (a *aggregator) Reset() {
|
|
a.digest.Reset()
|
|
// keep the timestamps but clear the aggregated values
|
|
for _, point := range a.points {
|
|
clear(point)
|
|
}
|
|
}
|
|
|
|
// getSortedTimestamps returns all timestamps in sorted order
|
|
func (a *aggregator) getSortedTimestamps() []time.Time {
|
|
return slices.SortedFunc(maps.Keys(a.points), func(a, b time.Time) int {
|
|
return a.Compare(b)
|
|
})
|
|
}
|
|
|