mirror of https://github.com/grafana/loki
Ruler: rule evaluation jitter (#8848)
**What this PR does / why we need it**: This PR introduces a configurable random sleep before each rule evaluation to prevent contention on the query engine should multiple runs execute concurrently. It's quite likely that rules will be evaluated ~concurrently if they have the same interval.pull/8853/head
parent
f5f1753851
commit
5c3d204ebf
@ -0,0 +1,37 @@ |
||||
package ruler |
||||
|
||||
import ( |
||||
"context" |
||||
"math/rand" |
||||
"time" |
||||
|
||||
"github.com/grafana/loki/pkg/logqlmodel" |
||||
) |
||||
|
||||
// EvaluatorWithJitter wraps a given Evaluator. It applies a randomly-generated jitter (sleep) before each evaluation to
|
||||
// protect against thundering-herd scenarios where multiple rules are evaluated at the same time.
|
||||
type EvaluatorWithJitter struct { |
||||
inner Evaluator |
||||
maxJitter time.Duration |
||||
rng *rand.Rand |
||||
} |
||||
|
||||
func NewEvaluatorWithJitter(inner Evaluator, maxJitter time.Duration, rngSource rand.Source) Evaluator { |
||||
if maxJitter <= 0 { |
||||
// jitter is disabled or invalid
|
||||
return inner |
||||
} |
||||
|
||||
return &EvaluatorWithJitter{ |
||||
inner: inner, |
||||
maxJitter: maxJitter, |
||||
rng: rand.New(rngSource), |
||||
} |
||||
} |
||||
|
||||
func (e *EvaluatorWithJitter) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) { |
||||
jitter := time.Duration(e.rng.Int63n(e.maxJitter.Nanoseconds())) |
||||
time.Sleep(jitter) |
||||
|
||||
return e.inner.Eval(ctx, qs, now) |
||||
} |
@ -0,0 +1,46 @@ |
||||
package ruler |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/pkg/logqlmodel" |
||||
) |
||||
|
||||
const fixedRandNum int64 = 987654321 |
||||
|
||||
type mockEval struct{} |
||||
|
||||
func (m mockEval) Eval(context.Context, string, time.Time) (*logqlmodel.Result, error) { |
||||
return nil, nil |
||||
} |
||||
|
||||
type fakeSource struct{} |
||||
|
||||
func (f fakeSource) Int63() int64 { return fixedRandNum } |
||||
func (f fakeSource) Seed(int64) {} |
||||
|
||||
func TestEvaluationWithJitter(t *testing.T) { |
||||
const jitter = 2 * time.Second |
||||
|
||||
eval := NewEvaluatorWithJitter(mockEval{}, jitter, fakeSource{}) |
||||
|
||||
then := time.Now() |
||||
_, _ = eval.Eval(context.Background(), "some logql query...", time.Now()) |
||||
since := time.Since(then) |
||||
|
||||
require.GreaterOrEqual(t, since.Nanoseconds(), fixedRandNum) |
||||
} |
||||
|
||||
func TestEvaluationWithNoJitter(t *testing.T) { |
||||
const jitter = 0 |
||||
|
||||
inner := mockEval{} |
||||
eval := NewEvaluatorWithJitter(inner, jitter, fakeSource{}) |
||||
|
||||
// return the inner evaluator if jitter is disabled
|
||||
require.Exactly(t, inner, eval) |
||||
} |
Loading…
Reference in new issue