|
|
|
|
@ -3,6 +3,7 @@ package jobs |
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"fmt" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/grafana-app-sdk/logging" |
|
|
|
|
@ -14,10 +15,17 @@ import ( |
|
|
|
|
// or if the job completed
|
|
|
|
|
func maybeNotifyProgress(threshold time.Duration, fn ProgressFn) ProgressFn { |
|
|
|
|
var last time.Time |
|
|
|
|
var mu sync.Mutex |
|
|
|
|
|
|
|
|
|
return func(ctx context.Context, status provisioning.JobStatus) error { |
|
|
|
|
if status.Finished != 0 || last.IsZero() || time.Since(last) > threshold { |
|
|
|
|
mu.Lock() |
|
|
|
|
shouldNotify := status.Finished != 0 || last.IsZero() || time.Since(last) > threshold |
|
|
|
|
if shouldNotify { |
|
|
|
|
last = time.Now() |
|
|
|
|
} |
|
|
|
|
mu.Unlock() |
|
|
|
|
|
|
|
|
|
if shouldNotify { |
|
|
|
|
return fn(ctx, status) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -36,8 +44,10 @@ type JobResourceResult struct { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type jobProgressRecorder struct { |
|
|
|
|
mu sync.RWMutex |
|
|
|
|
started time.Time |
|
|
|
|
total int |
|
|
|
|
maxErrors int |
|
|
|
|
message string |
|
|
|
|
finalMessage string |
|
|
|
|
resultCount int |
|
|
|
|
@ -50,7 +60,8 @@ type jobProgressRecorder struct { |
|
|
|
|
|
|
|
|
|
func newJobProgressRecorder(ProgressFn ProgressFn) JobProgressRecorder { |
|
|
|
|
return &jobProgressRecorder{ |
|
|
|
|
started: time.Now(), |
|
|
|
|
maxErrors: 20, |
|
|
|
|
started: time.Now(), |
|
|
|
|
// Have a faster notifier for messages and total
|
|
|
|
|
notifyImmediatelyFn: maybeNotifyProgress(500*time.Millisecond, ProgressFn), |
|
|
|
|
maybeNotifyFn: maybeNotifyProgress(5*time.Second, ProgressFn), |
|
|
|
|
@ -59,6 +70,7 @@ func newJobProgressRecorder(ProgressFn ProgressFn) JobProgressRecorder { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) Record(ctx context.Context, result JobResourceResult) { |
|
|
|
|
r.mu.Lock() |
|
|
|
|
r.resultCount++ |
|
|
|
|
|
|
|
|
|
logger := logging.FromContext(ctx).With("path", result.Path, "resource", result.Resource, "group", result.Group, "action", result.Action, "name", result.Name) |
|
|
|
|
@ -73,11 +85,16 @@ func (r *jobProgressRecorder) Record(ctx context.Context, result JobResourceResu |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
r.updateSummary(result) |
|
|
|
|
r.mu.Unlock() |
|
|
|
|
|
|
|
|
|
r.maybeNotify(ctx) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ResetResults will reset the results of the job
|
|
|
|
|
func (r *jobProgressRecorder) ResetResults() { |
|
|
|
|
r.mu.Lock() |
|
|
|
|
defer r.mu.Unlock() |
|
|
|
|
|
|
|
|
|
r.resultCount = 0 |
|
|
|
|
r.errorCount = 0 |
|
|
|
|
r.errors = nil |
|
|
|
|
@ -85,24 +102,41 @@ func (r *jobProgressRecorder) ResetResults() { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) SetMessage(ctx context.Context, msg string) { |
|
|
|
|
r.mu.Lock() |
|
|
|
|
r.message = msg |
|
|
|
|
r.mu.Unlock() |
|
|
|
|
|
|
|
|
|
logging.FromContext(ctx).Info("job progress message", "message", msg) |
|
|
|
|
r.notifyImmediately(ctx) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) SetFinalMessage(ctx context.Context, msg string) { |
|
|
|
|
r.mu.Lock() |
|
|
|
|
r.finalMessage = msg |
|
|
|
|
r.mu.Unlock() |
|
|
|
|
|
|
|
|
|
logging.FromContext(ctx).Info("job final message", "message", msg) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) SetTotal(ctx context.Context, total int) { |
|
|
|
|
r.mu.Lock() |
|
|
|
|
r.total = total |
|
|
|
|
r.mu.Unlock() |
|
|
|
|
|
|
|
|
|
r.notifyImmediately(ctx) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) Strict() { |
|
|
|
|
r.mu.Lock() |
|
|
|
|
r.maxErrors = 1 |
|
|
|
|
r.mu.Unlock() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) TooManyErrors() error { |
|
|
|
|
if r.errorCount > 20 { |
|
|
|
|
r.mu.RLock() |
|
|
|
|
defer r.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
if r.errorCount >= r.maxErrors { |
|
|
|
|
return fmt.Errorf("too many errors: %d", r.errorCount) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -110,6 +144,9 @@ func (r *jobProgressRecorder) TooManyErrors() error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) summary() []*provisioning.JobResourceSummary { |
|
|
|
|
r.mu.RLock() |
|
|
|
|
defer r.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
if len(r.summaries) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
@ -123,6 +160,7 @@ func (r *jobProgressRecorder) summary() []*provisioning.JobResourceSummary { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) updateSummary(result JobResourceResult) { |
|
|
|
|
// Note: This method is called from Record() which already holds the lock
|
|
|
|
|
key := result.Resource + ":" + result.Group |
|
|
|
|
summary, exists := r.summaries[key] |
|
|
|
|
if !exists { |
|
|
|
|
@ -155,6 +193,7 @@ func (r *jobProgressRecorder) updateSummary(result JobResourceResult) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) progress() float64 { |
|
|
|
|
// Note: This method is called from currentStatus() which already holds the lock
|
|
|
|
|
if r.total == 0 { |
|
|
|
|
return 0 |
|
|
|
|
} |
|
|
|
|
@ -162,15 +201,22 @@ func (r *jobProgressRecorder) progress() float64 { |
|
|
|
|
return float64(r.resultCount) / float64(r.total) * 100 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) notifyImmediately(ctx context.Context) { |
|
|
|
|
jobStatus := provisioning.JobStatus{ |
|
|
|
|
func (r *jobProgressRecorder) currentStatus() provisioning.JobStatus { |
|
|
|
|
r.mu.RLock() |
|
|
|
|
defer r.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
return provisioning.JobStatus{ |
|
|
|
|
Started: r.started.UnixMilli(), |
|
|
|
|
State: provisioning.JobStateWorking, |
|
|
|
|
Message: r.message, |
|
|
|
|
Errors: r.errors, |
|
|
|
|
Progress: r.progress(), |
|
|
|
|
Summary: r.summary(), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) notifyImmediately(ctx context.Context) { |
|
|
|
|
jobStatus := r.currentStatus() |
|
|
|
|
logger := logging.FromContext(ctx) |
|
|
|
|
if err := r.notifyImmediatelyFn(ctx, jobStatus); err != nil { |
|
|
|
|
logger.Warn("error notifying immediate progress", "err", err) |
|
|
|
|
@ -178,13 +224,7 @@ func (r *jobProgressRecorder) notifyImmediately(ctx context.Context) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) maybeNotify(ctx context.Context) { |
|
|
|
|
jobStatus := provisioning.JobStatus{ |
|
|
|
|
State: provisioning.JobStateWorking, |
|
|
|
|
Message: r.message, |
|
|
|
|
Errors: r.errors, |
|
|
|
|
Progress: r.progress(), |
|
|
|
|
Summary: r.summary(), |
|
|
|
|
} |
|
|
|
|
jobStatus := r.currentStatus() |
|
|
|
|
|
|
|
|
|
logger := logging.FromContext(ctx) |
|
|
|
|
if err := r.maybeNotifyFn(ctx, jobStatus); err != nil { |
|
|
|
|
@ -193,6 +233,9 @@ func (r *jobProgressRecorder) maybeNotify(ctx context.Context) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *jobProgressRecorder) Complete(ctx context.Context, err error) provisioning.JobStatus { |
|
|
|
|
r.mu.RLock() |
|
|
|
|
defer r.mu.RUnlock() |
|
|
|
|
|
|
|
|
|
// Initialize base job status
|
|
|
|
|
jobStatus := provisioning.JobStatus{ |
|
|
|
|
Started: r.started.UnixMilli(), |
|
|
|
|
|