|
|
|
|
@ -19,7 +19,6 @@ import ( |
|
|
|
|
"fmt" |
|
|
|
|
"hash/fnv" |
|
|
|
|
"math" |
|
|
|
|
"sort" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
clientmodel "github.com/prometheus/client_golang/model" |
|
|
|
|
@ -382,27 +381,6 @@ func (node *VectorAggregation) labelsToGroupingKey(labels clientmodel.Metric) ui |
|
|
|
|
return summer.Sum64() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func labelsToKey(labels clientmodel.Metric) uint64 { |
|
|
|
|
pairs := metric.LabelPairs{} |
|
|
|
|
|
|
|
|
|
for label, value := range labels { |
|
|
|
|
pairs = append(pairs, &metric.LabelPair{ |
|
|
|
|
Name: label, |
|
|
|
|
Value: value, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sort.Sort(pairs) |
|
|
|
|
|
|
|
|
|
summer := fnv.New64a() |
|
|
|
|
|
|
|
|
|
for _, pair := range pairs { |
|
|
|
|
fmt.Fprint(summer, pair.Name, pair.Value) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return summer.Sum64() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// EvalVectorInstant evaluates a VectorNode with an instant query.
|
|
|
|
|
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) { |
|
|
|
|
totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start() |
|
|
|
|
@ -436,7 +414,7 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod |
|
|
|
|
defer closer.Close() |
|
|
|
|
|
|
|
|
|
evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start() |
|
|
|
|
sampleStreams := map[uint64]*SampleStream{} |
|
|
|
|
sampleStreams := map[clientmodel.Fingerprint]*SampleStream{} |
|
|
|
|
for t := start; !t.After(end); t = t.Add(interval) { |
|
|
|
|
if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout { |
|
|
|
|
evalTimer.Stop() |
|
|
|
|
@ -448,14 +426,14 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod |
|
|
|
|
Value: sample.Value, |
|
|
|
|
Timestamp: sample.Timestamp, |
|
|
|
|
} |
|
|
|
|
groupingKey := labelsToKey(sample.Metric.Metric) |
|
|
|
|
if sampleStreams[groupingKey] == nil { |
|
|
|
|
sampleStreams[groupingKey] = &SampleStream{ |
|
|
|
|
fp := sample.Metric.Metric.Fingerprint() |
|
|
|
|
if sampleStreams[fp] == nil { |
|
|
|
|
sampleStreams[fp] = &SampleStream{ |
|
|
|
|
Metric: sample.Metric, |
|
|
|
|
Values: metric.Values{samplePair}, |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
sampleStreams[groupingKey].Values = append(sampleStreams[groupingKey].Values, samplePair) |
|
|
|
|
sampleStreams[fp].Values = append(sampleStreams[fp].Values, samplePair) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|