mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
298 lines
8.0 KiB
298 lines
8.0 KiB
package xcap
|
|
|
|
import (
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
|
|
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
|
|
)
|
|
|
|
// observations holds aggregated observations that can be transformed and merged.
|
|
//
|
|
// All transformation methods (filter, prefix, normalizeKeys) return new instances,
|
|
// leaving the original unchanged.
|
|
type observations struct {
|
|
data map[StatisticKey]*AggregatedObservation
|
|
}
|
|
|
|
// newObservations creates an empty observations.
|
|
func newObservations() *observations {
|
|
return &observations{data: make(map[StatisticKey]*AggregatedObservation)}
|
|
}
|
|
|
|
// filter returns a new observations containing only entries with matching stat keys.
|
|
func (o *observations) filter(keys ...StatisticKey) *observations {
|
|
if len(keys) == 0 || o == nil {
|
|
return o
|
|
}
|
|
|
|
keySet := make(map[StatisticKey]struct{}, len(keys))
|
|
for _, k := range keys {
|
|
keySet[k] = struct{}{}
|
|
}
|
|
|
|
result := newObservations()
|
|
for k, obs := range o.data {
|
|
if _, ok := keySet[k]; ok {
|
|
result.data[k] = obs
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// prefix returns a new observations with all stat names prefixed.
|
|
func (o *observations) prefix(p string) *observations {
|
|
if p == "" || o == nil {
|
|
return o
|
|
}
|
|
|
|
result := newObservations()
|
|
for k, obs := range o.data {
|
|
newKey := StatisticKey{
|
|
Name: p + k.Name,
|
|
DataType: k.DataType,
|
|
Aggregation: k.Aggregation,
|
|
}
|
|
result.data[newKey] = obs
|
|
}
|
|
return result
|
|
}
|
|
|
|
// normalizeKeys returns a new observations with dots replaced by underscores in stat names.
|
|
func (o *observations) normalizeKeys() *observations {
|
|
if o == nil {
|
|
return o
|
|
}
|
|
|
|
result := newObservations()
|
|
for k, obs := range o.data {
|
|
newKey := StatisticKey{
|
|
Name: strings.ReplaceAll(k.Name, ".", "_"),
|
|
DataType: k.DataType,
|
|
Aggregation: k.Aggregation,
|
|
}
|
|
result.data[newKey] = obs
|
|
}
|
|
return result
|
|
}
|
|
|
|
// merge merges another observations into this one.
|
|
func (o *observations) merge(other *observations) {
|
|
if other == nil {
|
|
return
|
|
}
|
|
for k, obs := range other.data {
|
|
if existing, ok := o.data[k]; ok {
|
|
existing.Merge(obs)
|
|
} else {
|
|
o.data[k] = &AggregatedObservation{
|
|
Statistic: obs.Statistic,
|
|
Value: obs.Value,
|
|
Count: obs.Count,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ToLogValues converts observations to a slice suitable for go-kit/log.
|
|
// Keys are sorted for deterministic output.
|
|
func (o *observations) toLogValues() []any {
|
|
if o == nil {
|
|
return nil
|
|
}
|
|
|
|
// Collect key-value pairs for sorting by name.
|
|
type kv struct {
|
|
name string
|
|
value any
|
|
}
|
|
pairs := make([]kv, 0, len(o.data))
|
|
for k, obs := range o.data {
|
|
pairs = append(pairs, kv{name: k.Name, value: obs.Value})
|
|
}
|
|
sort.Slice(pairs, func(i, j int) bool {
|
|
return strings.Compare(pairs[i].name, pairs[j].name) < 0
|
|
})
|
|
|
|
result := make([]any, 0, len(pairs)*2)
|
|
for _, p := range pairs {
|
|
value := p.value
|
|
|
|
// Format bytes values (keys ending with "_bytes")
|
|
if strings.HasSuffix(p.name, "_bytes") {
|
|
switch val := value.(type) {
|
|
case uint64:
|
|
value = humanize.Bytes(val)
|
|
case int64:
|
|
value = humanize.Bytes(uint64(val))
|
|
}
|
|
}
|
|
|
|
// Format duration values (keys ending with "duration")
|
|
if strings.HasSuffix(p.name, "duration") {
|
|
switch val := value.(type) {
|
|
case float64:
|
|
value = time.Duration(val * float64(time.Second)).String()
|
|
case int64:
|
|
value = time.Duration(val * int64(time.Second)).String()
|
|
case uint64:
|
|
value = time.Duration(val * uint64(time.Second)).String()
|
|
}
|
|
}
|
|
|
|
result = append(result, p.name, value)
|
|
}
|
|
return result
|
|
}
|
|
|
|
// observationCollector provides methods to collect observations from a Capture.
|
|
type observationCollector struct {
|
|
capture *Capture
|
|
childrenMap map[identifier][]*Region
|
|
nameToRegions map[string][]*Region
|
|
}
|
|
|
|
// newObservationCollector creates a new collector for gathering observations from the given capture.
|
|
func newObservationCollector(capture *Capture) *observationCollector {
|
|
if capture == nil {
|
|
return nil
|
|
}
|
|
|
|
// Build
|
|
// - parent -> children
|
|
// - name -> matching regions
|
|
childrenMap := make(map[identifier][]*Region)
|
|
nameToRegions := make(map[string][]*Region)
|
|
for _, r := range capture.regions {
|
|
childrenMap[r.parentID] = append(childrenMap[r.parentID], r)
|
|
nameToRegions[r.name] = append(nameToRegions[r.name], r)
|
|
}
|
|
|
|
return &observationCollector{
|
|
capture: capture,
|
|
childrenMap: childrenMap,
|
|
nameToRegions: nameToRegions,
|
|
}
|
|
}
|
|
|
|
// fromRegions collects observations from regions with the given name.
|
|
// If rollUp is true, each region's stats include all its descendant stats
|
|
// aggregated according to each stat's aggregation type.
|
|
func (c *observationCollector) fromRegions(name string, rollUp bool, excluded ...string) *observations {
|
|
result := newObservations()
|
|
|
|
if c == nil {
|
|
return result
|
|
}
|
|
|
|
regions := c.nameToRegions[name]
|
|
if len(regions) == 0 {
|
|
return result
|
|
}
|
|
|
|
excludedSet := make(map[string]struct{}, len(excluded))
|
|
for _, name := range excluded {
|
|
excludedSet[name] = struct{}{}
|
|
}
|
|
|
|
for _, region := range regions {
|
|
var obs *observations
|
|
if rollUp {
|
|
obs = c.rollUpObservations(region, excludedSet)
|
|
} else {
|
|
obs = c.getRegionObservations(region)
|
|
}
|
|
|
|
result.merge(obs)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// getRegionObservations returns a copy of a region's observations.
|
|
func (c *observationCollector) getRegionObservations(region *Region) *observations {
|
|
result := newObservations()
|
|
for k, obs := range region.observations {
|
|
result.data[k] = &AggregatedObservation{
|
|
Statistic: obs.Statistic,
|
|
Value: obs.Value,
|
|
Count: obs.Count,
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// rollUpObservations computes observations for a region including all its descendants.
|
|
// Stats are aggregated according to their aggregation type.
|
|
func (c *observationCollector) rollUpObservations(region *Region, excludedSet map[string]struct{}) *observations {
|
|
result := c.getRegionObservations(region)
|
|
|
|
// Recursively aggregate from children.
|
|
for _, child := range c.childrenMap[region.id] {
|
|
// Skip children with excluded names.
|
|
if _, excluded := excludedSet[child.name]; excluded {
|
|
continue
|
|
}
|
|
result.merge(c.rollUpObservations(child, excludedSet))
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// regionNameDataObjScan is the name of the regions that record data object
// scan operations.
const regionNameDataObjScan = "DataObjScan"
|
|
|
|
// ToStatsSummary computes a stats.Result from observations in the capture.
|
|
func (c *Capture) ToStatsSummary(execTime, queueTime time.Duration, totalEntriesReturned int) stats.Result {
|
|
result := stats.Result{
|
|
Querier: stats.Querier{
|
|
Store: stats.Store{
|
|
QueryUsedV2Engine: true,
|
|
},
|
|
},
|
|
}
|
|
|
|
if c == nil {
|
|
result.ComputeSummary(execTime, queueTime, totalEntriesReturned)
|
|
return result
|
|
}
|
|
|
|
// Collect observations from DataObjScan as the summary stats mainly relate to log lines.
|
|
// In practice, new engine would process more bytes while scanning metastore objects and stream sections.
|
|
collector := newObservationCollector(c)
|
|
observations := collector.fromRegions(regionNameDataObjScan, true).filter(
|
|
StatPipelineRowsOut.Key(),
|
|
StatDatasetPrimaryRowsRead.Key(),
|
|
StatDatasetPrimaryRowBytes.Key(),
|
|
StatDatasetSecondaryRowBytes.Key(),
|
|
)
|
|
|
|
// TODO: track and report TotalStructuredMetadataBytesProcessed
|
|
result.Querier.Store.Dataobj.PrePredicateDecompressedBytes = readInt64(observations, StatDatasetPrimaryRowBytes.Key())
|
|
result.Querier.Store.Dataobj.PostPredicateDecompressedBytes = readInt64(observations, StatDatasetSecondaryRowBytes.Key())
|
|
result.Querier.Store.Dataobj.PrePredicateDecompressedRows = readInt64(observations, StatDatasetPrimaryRowsRead.Key())
|
|
// TotalPostFilterLines: rows output after filtering
|
|
// TODO: this will report the wrong value if the plan has a filter stage.
|
|
// pick the min of row_out from filter and scan nodes.
|
|
result.Querier.Store.Dataobj.PostFilterRows = readInt64(observations, StatPipelineRowsOut.Key())
|
|
|
|
result.ComputeSummary(execTime, queueTime, totalEntriesReturned)
|
|
return result
|
|
}
|
|
|
|
// readInt64 reads an int64 observation for the given stat key.
|
|
func readInt64(o *observations, key StatisticKey) int64 {
|
|
if o == nil {
|
|
return 0
|
|
}
|
|
|
|
if agg, ok := o.data[key]; ok {
|
|
if v, ok := agg.Int64(); ok {
|
|
return v
|
|
}
|
|
}
|
|
return 0
|
|
}
|
|
|