[new feature] logql: extrapolate unwrapped rate function (#5013)

* [new feature] logql:reimplement metrics rate function

* [new feature] logql:reimplement metrics rate function

* [new feature] logql:reimplement metrics rate function

* [new feature] logql: extrapolate unwrapped rate function #5013
pull/5032/head
李国忠 3 years ago committed by GitHub
parent 03be8dac1e
commit a66e148f83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 75
      pkg/logql/engine_test.go
  2. 82
      pkg/logql/functions.go

@ -30,6 +30,58 @@ var (
ErrMockMultiple = util.MultiError{ErrMock, ErrMock}
)
func TestEngine_LogsRateUnwrap(t *testing.T) {
t.Parallel()
for _, test := range []struct {
qs string
ts time.Time
direction logproto.Direction
limit uint32
// an array of data per params will be returned by the querier.
// This is to cover logql that requires multiple queries.
data interface{}
params interface{}
expected interface{}
}{
{
`rate({app="foo"} | unwrap foo [30s])`, time.Unix(60, 0), logproto.FORWARD, 10,
[][]logproto.Series{
// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
{newSeries(testSize, offset(46, incValue(10)), `{app="foo"}`)},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.46666766666666665}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
} {
test := test
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits)
q := eng.Query(LiteralParams{
qs: test.qs,
start: test.ts,
end: test.ts,
direction: test.direction,
limit: test.limit,
})
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if expectedError, ok := test.expected.(error); ok {
assert.Equal(t, expectedError.Error(), err.Error())
} else {
if err != nil {
t.Fatal(err)
}
assert.Equal(t, test.expected, res.Data)
}
})
}
}
func TestEngine_LogsInstantQuery(t *testing.T) {
t.Parallel()
for _, test := range []struct {
@ -95,7 +147,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 1.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10,
@ -1240,11 +1292,11 @@ func TestEngine_RangeQuery(t *testing.T) {
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "bar"}},
Points: []promql.Point{{T: 60 * 1000, V: 0.4}, {T: 90 * 1000, V: 0.4}, {T: 120 * 1000, V: 0.4}, {T: 150 * 1000, V: 0.4}, {T: 180 * 1000, V: 0.4}},
Points: []promql.Point{{T: 60 * 1000, V: 0.0}, {T: 90 * 1000, V: 0.0}, {T: 120 * 1000, V: 0.0}, {T: 150 * 1000, V: 0.0}, {T: 180 * 1000, V: 0.0}},
},
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 0.2}, {T: 90 * 1000, V: 0.2}, {T: 120 * 1000, V: 0.2}, {T: 150 * 1000, V: 0.2}, {T: 180 * 1000, V: 0.2}},
Points: []promql.Point{{T: 60 * 1000, V: 0.0}, {T: 90 * 1000, V: 0.0}, {T: 120 * 1000, V: 0.0}, {T: 150 * 1000, V: 0.0}, {T: 180 * 1000, V: 0.0}},
},
},
},
@ -2451,6 +2503,23 @@ func constantValue(t int64) generator {
}
}
// nolint
func incValue(val int64) generator {
return func(i int64) logData {
return logData{
Entry: logproto.Entry{
Timestamp: time.Unix(i, 0),
Line: fmt.Sprintf("%d", i),
},
Sample: logproto.Sample{
Timestamp: time.Unix(i, 0).UnixNano(),
Hash: uint64(i),
Value: float64(val + i),
},
}
}
}
// nolint
func inverse(g generator) generator {
return func(i int64) logData {

@ -128,12 +128,86 @@ func rateLogs(selRange time.Duration, computeValues bool) func(samples []promql.
if !computeValues {
return float64(len(samples)) / selRange.Seconds()
}
var total float64
for _, p := range samples {
total += p.V
return extrapolatedRate(samples, selRange, true, true)
}
return total / selRange.Seconds()
}
// extrapolatedRate function is taken from prometheus code promql/functions.go:59
// extrapolatedRate is a utility function for rate/increase/delta.
// It calculates the rate (allowing for counter resets if isCounter is true),
// extrapolates if the first/last sample is close to the boundary, and returns
// the result as either per-second (if isRate is true) or overall.
func extrapolatedRate(samples []promql.Point, selRange time.Duration, isCounter, isRate bool) float64 {
// No sense in trying to compute a rate without at least two points. Drop
// this Vector element.
if len(samples) < 2 {
return 0
}
var (
rangeStart = samples[0].T - durationMilliseconds(selRange)
rangeEnd = samples[len(samples)-1].T
)
resultValue := samples[len(samples)-1].V - samples[0].V
if isCounter {
var lastValue float64
for _, sample := range samples {
if sample.V < lastValue {
resultValue += lastValue
}
lastValue = sample.V
}
}
// Duration between first/last samples and boundary of range.
durationToStart := float64(samples[0].T-rangeStart) / 1000
durationToEnd := float64(rangeEnd-samples[len(samples)-1].T) / 1000
sampledInterval := float64(samples[len(samples)-1].T-samples[0].T) / 1000
averageDurationBetweenSamples := sampledInterval / float64(len(samples)-1)
if isCounter && resultValue > 0 && samples[0].V >= 0 {
// Counters cannot be negative. If we have any slope at
// all (i.e. resultValue went up), we can extrapolate
// the zero point of the counter. If the duration to the
// zero point is shorter than the durationToStart, we
// take the zero point as the start of the series,
// thereby avoiding extrapolation to negative counter
// values.
durationToZero := sampledInterval * (samples[0].V / resultValue)
if durationToZero < durationToStart {
durationToStart = durationToZero
}
}
// If the first/last samples are close to the boundaries of the range,
// extrapolate the result. This is as we expect that another sample
// will exist given the spacing between samples we've seen thus far,
// with an allowance for noise.
extrapolationThreshold := averageDurationBetweenSamples * 1.1
extrapolateToInterval := sampledInterval
if durationToStart < extrapolationThreshold {
extrapolateToInterval += durationToStart
} else {
extrapolateToInterval += averageDurationBetweenSamples / 2
}
if durationToEnd < extrapolationThreshold {
extrapolateToInterval += durationToEnd
} else {
extrapolateToInterval += averageDurationBetweenSamples / 2
}
resultValue = resultValue * (extrapolateToInterval / sampledInterval)
if isRate {
seconds := selRange.Seconds()
resultValue = resultValue / seconds
}
return resultValue
}
func durationMilliseconds(d time.Duration) int64 {
return int64(d / (time.Millisecond / time.Nanosecond))
}
// rateLogBytes calculates the per-second rate of log bytes.

Loading…
Cancel
Save