Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/stats/builder.go

126 lines
3.7 KiB

package stats
import (
"fmt"
"sort"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)
// Builder accumulates [Stat] rows and encodes them into a section via a
// [dataobj.SectionWriter].
//
// Rows are buffered in memory by [Builder.Append]. They are only sorted and
// encoded when [Builder.Flush] is called, which writes the encoded section to
// the provided [dataobj.SectionWriter].
type Builder struct {
	metrics *Metrics       // optional instrumentation; nil disables encode timing
	tenant  string         // tenant attached to the encoded section during Flush
	encode  SectionEncoder // encodes the sorted rows into a columnar encoder
	rows    []Stat         // rows accumulated since the last successful Flush or Reset
}
// NewBuilder returns an empty Builder whose rows will be encoded with encode
// when [Builder.Flush] is called.
//
// metrics is optional: passing nil skips recording encode timings. The
// returned builder has no tenant set until [Builder.SetTenant] is called.
func NewBuilder(metrics *Metrics, encode SectionEncoder) *Builder {
	b := new(Builder)
	b.metrics = metrics
	b.encode = encode
	return b
}
// SetTenant records the tenant that [Builder.Flush] attaches to the encoded
// section.
func (b *Builder) SetTenant(tenant string) {
	b.tenant = tenant
}

// Tenant reports the tenant most recently passed to [Builder.SetTenant], or
// the empty string if none has been set.
func (b *Builder) Tenant() string {
	return b.tenant
}
// Type reports the [dataobj.SectionType] this builder produces. It returns
// the package's stats section type and reads no builder state.
func (*Builder) Type() dataobj.SectionType {
	return sectionType
}
// Append buffers stat as a new row. Rows are kept in insertion order until
// [Builder.Flush] sorts and encodes them.
func (b *Builder) Append(stat Stat) {
	// No ordering work happens here; Flush establishes the final row order.
	b.rows = append(b.rows, stat)
}
// EstimatedSize returns a heuristic estimate, in bytes, of the encoded size of
// the buffered rows. Each row contributes:
//   - 8 bytes for each of its 5 int64 columns (SectionIndex, MinTimestamp,
//     MaxTimestamp, RowCount, UncompressedSize);
//   - len(ObjectPath) + len(SortSchema) for its fixed string columns;
//   - len(name) + len(value) for every entry in Labels.
//
// An empty builder reports 0. The estimate ignores encoding overhead and
// compression.
func (b *Builder) EstimatedSize() int {
	const (
		int64Columns = 5 // SectionIndex, MinTimestamp, MaxTimestamp, RowCount, UncompressedSize
		int64Width   = 8
	)

	size := 0
	for i := range b.rows {
		row := &b.rows[i]
		size += int64Columns*int64Width + len(row.ObjectPath) + len(row.SortSchema)
		for name, value := range row.Labels {
			size += len(name) + len(value)
		}
	}
	return size
}
// Reset discards all buffered rows so the builder can be reused.
//
// The rows slice keeps its capacity, so refilling it does not reallocate.
// Its elements are zeroed first, so the retained backing array no longer
// references the previous rows' strings and Labels maps, and those can be
// garbage collected. The tenant set via [Builder.SetTenant] is left
// unchanged.
func (b *Builder) Reset() {
	// Without clearing, b.rows[:0] would keep every old Stat (and its Labels
	// map) reachable until the slot is overwritten by a later Append.
	clear(b.rows)
	b.rows = b.rows[:0]
}
// compareStats reports whether a orders before b.
//
// Rows are compared by the label values named in a.SortSchema, which is a
// comma-separated list of label names checked left to right. Missing labels
// compare as the empty string. Ties are broken by MinTimestamp, then by
// MaxTimestamp.
//
// The schema is walked with [strings.Cut] instead of being split into a new
// slice, so a comparison does not allocate. sort.SliceStable calls this
// O(n log n) times per flush.
//
// NOTE(review): only a's SortSchema is consulted. The result is a consistent
// ordering only if every row being sorted shares the same SortSchema; confirm
// this invariant with callers of [Builder.Append].
func compareStats(a, b Stat) bool {
	remaining := a.SortSchema
	for {
		key, rest, more := strings.Cut(remaining, ",")
		if left, right := a.Labels[key], b.Labels[key]; left != right {
			return left < right
		}
		if !more {
			break
		}
		remaining = rest
	}

	if a.MinTimestamp == b.MinTimestamp {
		return a.MaxTimestamp < b.MaxTimestamp
	}
	return a.MinTimestamp < b.MinTimestamp
}
// Flush sorts the buffered rows, encodes them with the builder's
// [SectionEncoder], and writes the resulting section to w. It returns the
// byte count reported by the encoder's Flush.
//
// Rows are ordered by their label values following the sort schema, then by
// MinTimestamp, then by MaxTimestamp. The sort is stable, so rows that compare
// equal keep their relative order. When no rows are buffered, Flush does
// nothing and returns (0, nil).
//
// The builder is reset only when the write succeeds. On any error, the rows
// are retained, now in sorted order.
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
	if len(b.rows) == 0 {
		return 0, nil
	}

	if m := b.metrics; m != nil {
		// The timer starts here and is observed when Flush returns, so it
		// spans sorting, encoding, and writing the section.
		defer prometheus.NewTimer(m.encodeSeconds).ObserveDuration()
	}

	rows := b.rows
	sort.SliceStable(rows, func(i, j int) bool {
		return compareStats(rows[i], rows[j])
	})

	var enc columnar.Encoder
	defer enc.Reset()

	if encodeErr := b.encode(rows, &enc); encodeErr != nil {
		return 0, fmt.Errorf("encoding stats: %w", encodeErr)
	}
	enc.SetTenant(b.tenant)

	if n, err = enc.Flush(w); err != nil {
		return n, err
	}
	b.Reset()
	return n, nil
}