Advisor: Reduce and spread load (#105012)

alerting/support-unknown-rule-state^2
Andres Martinez Gotor 2 months ago committed by GitHub
parent 1254fb9b68
commit 60670003b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 41
      apps/advisor/pkg/app/checks/plugincheck/check.go
  2. 6
      apps/advisor/pkg/app/checks/plugincheck/check_test.go
  3. 20
      apps/advisor/pkg/app/checkscheduler/checkscheduler.go
  4. 13
      apps/advisor/pkg/app/checkscheduler/checkscheduler_test.go

@ -73,6 +73,9 @@ func (c *check) Steps() []checks.Step {
return []checks.Step{
&deprecationStep{
PluginRepo: c.PluginRepo,
PluginPreinstall: c.PluginPreinstall,
ManagedPlugins: c.ManagedPlugins,
ProvisionedPlugins: c.ProvisionedPlugins,
},
&updateStep{
PluginRepo: c.PluginRepo,
@ -85,6 +88,10 @@ func (c *check) Steps() []checks.Step {
type deprecationStep struct {
PluginRepo repo.Service
PluginPreinstall plugininstaller.Preinstall
ManagedPlugins managedplugins.Manager
ProvisionedPlugins provisionedplugins.Manager
provisionedPlugins []string
}
func (s *deprecationStep) Title() string {
@ -112,6 +119,19 @@ func (s *deprecationStep) Run(ctx context.Context, log logging.Logger, _ *adviso
// Skip if it's a core plugin
if p.IsCorePlugin() {
log.Debug("Skipping core plugin", "plugin", p.ID)
return nil, nil
}
// Skip if it's managed or pinned
if s.isManaged(ctx, p.ID) || s.PluginPreinstall.IsPinned(p.ID) {
log.Debug("Skipping managed or pinned plugin", "plugin", p.ID)
return nil, nil
}
// Skip if it's provisioned
if s.isProvisioned(ctx, p.ID) {
log.Debug("Skipping provisioned plugin", "plugin", p.ID)
return nil, nil
}
@ -241,3 +261,24 @@ func (s *updateStep) isProvisioned(ctx context.Context, pluginID string) bool {
}
return slices.Contains(s.provisionedPlugins, pluginID)
}
// Temporary duplicated code until there is a common IsUpdatable function
func (s *deprecationStep) isManaged(ctx context.Context, pluginID string) bool {
for _, managedPlugin := range s.ManagedPlugins.ManagedPlugins(ctx) {
if managedPlugin == pluginID {
return true
}
}
return false
}
func (s *deprecationStep) isProvisioned(ctx context.Context, pluginID string) bool {
if s.provisionedPlugins == nil {
var err error
s.provisionedPlugins, err = s.ProvisionedPlugins.ProvisionedPlugins(ctx)
if err != nil {
return false
}
}
return slices.Contains(s.provisionedPlugins, pluginID)
}

@ -115,7 +115,7 @@ func TestRun(t *testing.T) {
{JSONData: plugins.JSONData{ID: "plugin3", Name: "Plugin 3", Info: plugins.Info{Version: "1.0.0"}}},
},
pluginInfo: map[string]*repo.PluginInfo{
"plugin3": {Status: "active"},
"plugin3": {Status: "deprecated"}, // This should be ignored
},
pluginArchives: map[string]*repo.PluginArchiveInfo{
"plugin3": {Version: "1.1.0"},
@ -129,7 +129,7 @@ func TestRun(t *testing.T) {
{JSONData: plugins.JSONData{ID: "plugin4", Name: "Plugin 4", Info: plugins.Info{Version: "1.0.0"}}},
},
pluginInfo: map[string]*repo.PluginInfo{
"plugin4": {Status: "active"},
"plugin4": {Status: "deprecated"}, // This should be ignored
},
pluginArchives: map[string]*repo.PluginArchiveInfo{
"plugin4": {Version: "1.1.0"},
@ -143,7 +143,7 @@ func TestRun(t *testing.T) {
{JSONData: plugins.JSONData{ID: "plugin5", Name: "Plugin 5", Info: plugins.Info{Version: "1.0.0"}}},
},
pluginInfo: map[string]*repo.PluginInfo{
"plugin5": {Status: "active"},
"plugin5": {Status: "deprecated"}, // This should be ignored
},
pluginArchives: map[string]*repo.PluginArchiveInfo{
"plugin5": {Version: "1.1.0"},

@ -3,6 +3,7 @@ package checkscheduler
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"time"
@ -18,7 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const defaultEvaluationInterval = 24 * time.Hour
const defaultEvaluationInterval = 7 * 24 * time.Hour // 7 days
const defaultMaxHistory = 10
// Runner is a "runnable" app used to be able to expose and API endpoint
@ -90,11 +91,7 @@ func (r *Runner) Run(ctx context.Context) error {
}
}
nextSendInterval := time.Until(lastCreated.Add(r.evaluationInterval))
if nextSendInterval < time.Minute {
nextSendInterval = 1 * time.Minute
}
nextSendInterval := getNextSendInterval(lastCreated, r.evaluationInterval)
ticker := time.NewTicker(nextSendInterval)
defer ticker.Stop()
@ -217,6 +214,17 @@ func getEvaluationInterval(pluginConfig map[string]string) (time.Duration, error
return evaluationInterval, nil
}
func getNextSendInterval(lastCreated time.Time, evaluationInterval time.Duration) time.Duration {
nextSendInterval := time.Until(lastCreated.Add(evaluationInterval))
// Add random variation of one hour
randomVariation := time.Duration(rand.Int63n(time.Hour.Nanoseconds()))
nextSendInterval += randomVariation
if nextSendInterval < time.Minute {
nextSendInterval = 1 * time.Minute
}
return nextSendInterval
}
func getMaxHistory(pluginConfig map[string]string) (int, error) {
maxHistory := defaultMaxHistory
configMaxHistory, ok := pluginConfig["max_history"]

@ -214,7 +214,7 @@ func Test_getEvaluationInterval(t *testing.T) {
t.Run("default", func(t *testing.T) {
interval, err := getEvaluationInterval(map[string]string{})
assert.NoError(t, err)
assert.Equal(t, 24*time.Hour, interval)
assert.Equal(t, 7*24*time.Hour, interval)
})
t.Run("invalid", func(t *testing.T) {
@ -285,6 +285,17 @@ func Test_markUnprocessedChecksAsErrored(t *testing.T) {
assert.Equal(t, expectedAnnotations, patchOperation.Value)
}
func Test_getNextSendInterval(t *testing.T) {
lastCreated := time.Now().Add(-7 * 24 * time.Hour)
evaluationInterval := 7 * 24 * time.Hour
nextSendInterval := getNextSendInterval(lastCreated, evaluationInterval)
// The next send interval should be in < 1 hour
assert.True(t, nextSendInterval < time.Hour)
// Calculate the next send interval again and it should be different
nextSendInterval2 := getNextSendInterval(lastCreated, evaluationInterval)
assert.NotEqual(t, nextSendInterval, nextSendInterval2)
}
type MockCheckService struct {
checks []checks.Check
}

Loading…
Cancel
Save