Advisor: Better handling of context cancel (#106470)

pull/106495/head
Andres Martinez Gotor 1 month ago committed by GitHub
parent 1dd3dd24a2
commit 34ef571542
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 58
      apps/advisor/pkg/app/checkscheduler/checkscheduler.go
  2. 152
      apps/advisor/pkg/app/checkscheduler/checkscheduler_test.go
  3. 4
      apps/advisor/pkg/app/checktyperegisterer/checktyperegisterer.go

@ -26,8 +26,8 @@ const defaultMaxHistory = 10
// with the existing checks types. This does not need to be a CRUD resource, but it is
// the only way existing at the moment to expose the check types.
type Runner struct {
checkRegistry checkregistry.CheckService
client resource.Client
typesClient resource.Client
evaluationInterval time.Duration
maxHistory int
namespace string
@ -41,7 +41,6 @@ func New(cfg app.Config, log logging.Logger) (app.Runnable, error) {
if !ok {
return nil, fmt.Errorf("invalid config type")
}
checkRegistry := specificConfig.CheckRegistry
evalInterval, err := getEvaluationInterval(specificConfig.PluginConfig)
if err != nil {
return nil, err
@ -61,10 +60,14 @@ func New(cfg app.Config, log logging.Logger) (app.Runnable, error) {
if err != nil {
return nil, err
}
typesClient, err := clientGenerator.ClientFor(advisorv0alpha1.CheckTypeKind())
if err != nil {
return nil, err
}
return &Runner{
checkRegistry: checkRegistry,
client: client,
typesClient: typesClient,
evaluationInterval: evalInterval,
maxHistory: maxHistory,
namespace: namespace,
@ -73,8 +76,8 @@ func New(cfg app.Config, log logging.Logger) (app.Runnable, error) {
}
func (r *Runner) Run(ctx context.Context) error {
lastCreated, err := r.checkLastCreated(ctx)
logger := r.log.WithContext(ctx)
lastCreated, err := r.checkLastCreated(ctx, logger)
if err != nil {
logger.Error("Error getting last check creation time", "error", err)
// Wait for interval to create the next scheduled check
@ -113,7 +116,6 @@ func (r *Runner) Run(ctx context.Context) error {
}
ticker.Reset(nextSendInterval)
case <-ctx.Done():
r.markUnprocessedChecksAsErrored(ctx, logger)
return ctx.Err()
}
}
@ -122,7 +124,8 @@ func (r *Runner) Run(ctx context.Context) error {
// checkLastCreated returns the creation time of the last check created
// regardless of its ID. This assumes that the checks are created in batches
// so a batch will have a similar creation time.
func (r *Runner) checkLastCreated(ctx context.Context) (time.Time, error) {
// In case it finds an unprocessed check from a previous run, it will set it to error.
func (r *Runner) checkLastCreated(ctx context.Context, log logging.Logger) (time.Time, error) {
list, err := r.client.List(ctx, r.namespace, resource.ListOptions{})
if err != nil {
return time.Time{}, err
@ -133,19 +136,40 @@ func (r *Runner) checkLastCreated(ctx context.Context) (time.Time, error) {
if itemCreated.After(lastCreated) {
lastCreated = itemCreated
}
// If the check is unprocessed, set it to error
if checks.GetStatusAnnotation(item) == "" {
log.Error("Check is unprocessed", "check", item.GetStaticMetadata().Identifier())
err := checks.SetStatusAnnotation(ctx, r.client, item, checks.StatusAnnotationError)
if err != nil {
log.Error("Error setting check status to error", "error", err)
}
}
}
return lastCreated, nil
}
// createChecks creates a new check for each check type in the registry.
func (r *Runner) createChecks(ctx context.Context) error {
for _, check := range r.checkRegistry.Checks() {
// List existing CheckType objects
list, err := r.typesClient.List(ctx, r.namespace, resource.ListOptions{})
if err != nil {
return fmt.Errorf("error listing check types: %w", err)
}
// Create checks for each CheckType
for _, item := range list.GetItems() {
checkType, ok := item.(*advisorv0alpha1.CheckType)
if !ok {
continue
}
obj := &advisorv0alpha1.Check{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "check-",
Namespace: r.namespace,
Labels: map[string]string{
checks.TypeLabel: check.ID(),
checks.TypeLabel: checkType.Spec.Name,
},
},
Spec: advisorv0alpha1.CheckSpec{},
@ -237,21 +261,3 @@ func getMaxHistory(pluginConfig map[string]string) (int, error) {
}
return maxHistory, nil
}
// markUnprocessedChecksAsErrored lists the objects in the runner's namespace
// through r.client and, for every item whose status annotation is empty,
// logs it and sets its status annotation to checks.StatusAnnotationError.
//
// Nothing is returned: a failure to list is logged and ends the call, while a
// failure to annotate a single item is logged and the loop moves on to the
// next item.
func (r *Runner) markUnprocessedChecksAsErrored(ctx context.Context, log logging.Logger) {
	existing, listErr := r.client.List(ctx, r.namespace, resource.ListOptions{})
	if listErr != nil {
		log.Error("Error getting checks", "error", listErr)
		return
	}

	for _, item := range existing.GetItems() {
		// Items that already carry a status annotation are left untouched.
		if checks.GetStatusAnnotation(item) != "" {
			continue
		}

		log.Error("Check is unprocessed", "check", item.GetStaticMetadata().Identifier())
		if setErr := checks.SetStatusAnnotation(ctx, r.client, item, checks.StatusAnnotationError); setErr != nil {
			log.Error("Error setting check status to error", "error", setErr)
		}
	}
}

@ -18,7 +18,6 @@ import (
func TestRunner_Run(t *testing.T) {
t.Run("does not crash when error on list", func(t *testing.T) {
mockCheckService := &MockCheckService{}
mockClient := &MockClient{
listFunc: func(ctx context.Context, namespace string, options resource.ListOptions) (resource.ListObject, error) {
return nil, errors.New("list error")
@ -28,9 +27,15 @@ func TestRunner_Run(t *testing.T) {
},
}
mockTypesClient := &MockClient{
listFunc: func(ctx context.Context, namespace string, options resource.ListOptions) (resource.ListObject, error) {
return &advisorv0alpha1.CheckTypeList{Items: []advisorv0alpha1.CheckType{}}, nil
},
}
runner := &Runner{
checkRegistry: mockCheckService,
client: mockClient,
typesClient: mockTypesClient,
log: logging.DefaultLogger,
evaluationInterval: 1 * time.Hour,
}
@ -54,29 +59,71 @@ func TestRunner_checkLastCreated_ErrorOnList(t *testing.T) {
log: logging.DefaultLogger,
}
lastCreated, err := runner.checkLastCreated(context.Background())
lastCreated, err := runner.checkLastCreated(context.Background(), logging.DefaultLogger)
assert.Error(t, err)
assert.True(t, lastCreated.IsZero())
}
func TestRunner_createChecks_ErrorOnCreate(t *testing.T) {
mockCheckService := &MockCheckService{
checks: []checks.Check{
&mockCheck{
id: "check-1",
},
// TestRunner_checkLastCreated_UnprocessedCheck verifies that checkLastCreated
// issues a patch on "/metadata/annotations" setting the status annotation to
// "error" when the listed check has no annotations, and that it still
// returns without error.
func TestRunner_checkLastCreated_UnprocessedCheck(t *testing.T) {
	// Filled in by the patch stub so the request can be inspected afterwards.
	var (
		gotOp resource.PatchOperation
		gotID resource.Identifier
	)

	listStub := func(ctx context.Context, namespace string, options resource.ListOptions) (resource.ListObject, error) {
		// Only a name is set on the fixture, so no annotations are present.
		unprocessed := advisorv0alpha1.Check{
			ObjectMeta: metav1.ObjectMeta{Name: "check-1"},
		}
		return &advisorv0alpha1.CheckList{Items: []advisorv0alpha1.Check{unprocessed}}, nil
	}
	patchStub := func(ctx context.Context, id resource.Identifier, patch resource.PatchRequest, options resource.PatchOptions, into resource.Object) error {
		gotOp, gotID = patch.Operations[0], id
		return nil
	}

	runner := &Runner{
		client: &MockClient{listFunc: listStub, patchFunc: patchStub},
		log:    logging.DefaultLogger,
	}

	lastCreated, err := runner.checkLastCreated(context.Background(), logging.DefaultLogger)
	assert.NoError(t, err)
	assert.True(t, lastCreated.IsZero())

	// The patch must target the listed check and write the error status.
	assert.Equal(t, "check-1", gotID.Name)
	assert.Equal(t, "/metadata/annotations", gotOp.Path)
	assert.Equal(t, map[string]string{checks.StatusAnnotation: "error"}, gotOp.Value)
}
func TestRunner_createChecks_ErrorOnCreate(t *testing.T) {
mockClient := &MockClient{
createFunc: func(ctx context.Context, id resource.Identifier, obj resource.Object, opts resource.CreateOptions) (resource.Object, error) {
return nil, errors.New("create error")
},
}
mockTypesClient := &MockClient{
listFunc: func(ctx context.Context, namespace string, options resource.ListOptions) (resource.ListObject, error) {
checkType := &advisorv0alpha1.CheckType{}
checkType.Spec.Name = "check-1"
return &advisorv0alpha1.CheckTypeList{
Items: []advisorv0alpha1.CheckType{*checkType},
}, nil
},
}
runner := &Runner{
checkRegistry: mockCheckService,
client: mockClient,
log: logging.DefaultLogger,
client: mockClient,
typesClient: mockTypesClient,
log: logging.DefaultLogger,
}
err := runner.createChecks(context.Background())
@ -84,23 +131,26 @@ func TestRunner_createChecks_ErrorOnCreate(t *testing.T) {
}
func TestRunner_createChecks_Success(t *testing.T) {
mockCheckService := &MockCheckService{
checks: []checks.Check{
&mockCheck{
id: "check-1",
},
},
}
mockClient := &MockClient{
createFunc: func(ctx context.Context, id resource.Identifier, obj resource.Object, opts resource.CreateOptions) (resource.Object, error) {
return &advisorv0alpha1.Check{}, nil
},
}
mockTypesClient := &MockClient{
listFunc: func(ctx context.Context, namespace string, options resource.ListOptions) (resource.ListObject, error) {
checkType := &advisorv0alpha1.CheckType{}
checkType.Spec.Name = "check-1"
return &advisorv0alpha1.CheckTypeList{
Items: []advisorv0alpha1.CheckType{*checkType},
}, nil
},
}
runner := &Runner{
checkRegistry: mockCheckService,
client: mockClient,
log: logging.DefaultLogger,
client: mockClient,
typesClient: mockTypesClient,
log: logging.DefaultLogger,
}
err := runner.createChecks(context.Background())
@ -250,41 +300,6 @@ func Test_getMaxHistory(t *testing.T) {
})
}
// Test_markUnprocessedChecksAsErrored verifies that a listed check without
// annotations is patched on "/metadata/annotations" so that its status
// annotation becomes "error".
func Test_markUnprocessedChecksAsErrored(t *testing.T) {
	// The same list object is handed back on every List call.
	fixture := &advisorv0alpha1.CheckList{
		Items: []advisorv0alpha1.Check{
			{ObjectMeta: metav1.ObjectMeta{Name: "check-1"}},
		},
	}

	// Filled in by the patch stub so the request can be inspected afterwards.
	var (
		gotOp resource.PatchOperation
		gotID resource.Identifier
	)

	stubClient := &MockClient{
		listFunc: func(ctx context.Context, namespace string, options resource.ListOptions) (resource.ListObject, error) {
			return fixture, nil
		},
		patchFunc: func(ctx context.Context, id resource.Identifier, patch resource.PatchRequest, options resource.PatchOptions, into resource.Object) error {
			gotOp, gotID = patch.Operations[0], id
			return nil
		},
	}
	runner := &Runner{
		client: stubClient,
		log:    logging.DefaultLogger,
	}

	runner.markUnprocessedChecksAsErrored(context.Background(), logging.DefaultLogger)

	assert.Equal(t, "check-1", gotID.Name)
	assert.Equal(t, "/metadata/annotations", gotOp.Path)
	assert.Equal(t, map[string]string{checks.StatusAnnotation: "error"}, gotOp.Value)
}
func Test_getNextSendInterval(t *testing.T) {
lastCreated := time.Now().Add(-7 * 24 * time.Hour)
evaluationInterval := 7 * 24 * time.Hour
@ -296,14 +311,6 @@ func Test_getNextSendInterval(t *testing.T) {
assert.NotEqual(t, nextSendInterval, nextSendInterval2)
}
// MockCheckService is a test double that serves a fixed, caller-supplied
// list of checks.
type MockCheckService struct {
// checks is the slice handed back verbatim by Checks.
checks []checks.Check
}
// Checks returns the slice stored in the mock; it is nil when the mock was
// built without one.
func (m *MockCheckService) Checks() []checks.Check {
return m.checks
}
type MockClient struct {
resource.Client
listFunc func(ctx context.Context, namespace string, options resource.ListOptions) (resource.ListObject, error)
@ -327,18 +334,3 @@ func (m *MockClient) Delete(ctx context.Context, identifier resource.Identifier,
// PatchInto forwards every argument unchanged to the test-supplied patchFunc
// and returns its result. patchFunc must be set by the test: calling a nil
// function value panics.
func (m *MockClient) PatchInto(ctx context.Context, identifier resource.Identifier, patch resource.PatchRequest, options resource.PatchOptions, into resource.Object) error {
return m.patchFunc(ctx, identifier, patch, options, into)
}
// mockCheck is a test double that embeds the checks.Check interface and
// overrides ID and Steps with fixed values. Any other method reaches the
// embedded interface value, which these tests leave unset.
type mockCheck struct {
checks.Check
// id is the value returned by ID.
id string
// steps is the value returned by Steps.
steps []checks.Step
}
// ID returns the identifier configured on the mock.
func (m *mockCheck) ID() string {
return m.id
}
// Steps returns the steps configured on the mock; nil when none were set.
func (m *mockCheck) Steps() []checks.Step {
return m.steps
}

@ -79,7 +79,7 @@ func (r *Runner) createOrUpdate(ctx context.Context, log logging.Logger, obj res
maps.Copy(currentAnnotations, annotations)
obj.SetAnnotations(currentAnnotations) // This will update the annotations in the object
_, err = r.client.Update(ctx, id, obj, resource.UpdateOptions{})
if err != nil {
if err != nil && !errors.IsAlreadyExists(err) {
// Ignore the error, it's probably due to a race condition
log.Error("Error updating check type", "error", err)
}
@ -121,7 +121,7 @@ func (r *Runner) Run(ctx context.Context) error {
},
}
for i := 0; i < r.retryAttempts; i++ {
err := r.createOrUpdate(ctx, logger, obj)
err := r.createOrUpdate(context.WithoutCancel(ctx), logger, obj)
if err != nil {
logger.Error("Error creating check type, retrying", "error", err, "attempt", i+1)
if i == r.retryAttempts-1 {

Loading…
Cancel
Save