loki/pkg/compactor/jobqueue/queue.go

package jobqueue

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/grafana/loki/v3/pkg/compactor/client/grpc"
	util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var (
	// ErrJobTypeAlreadyRegistered is returned when trying to register a job type that is already registered
	ErrJobTypeAlreadyRegistered = errors.New("job type already registered")
)

// Builder defines the interface for building jobs that will be added to the queue
type Builder interface {
	// BuildJobs builds new jobs and sends them to the provided channel.
	// It should be a blocking call that returns when ctx is cancelled.
	BuildJobs(ctx context.Context, jobsChan chan<- *grpc.Job)

	// OnJobResponse reports back the response of the job execution.
	OnJobResponse(response *grpc.JobResult) error

	// JobsLeft reports the estimated number of jobs left to process.
	// It should be updated when jobs are successfully processed or new work is picked up for execution.
	// It should include in-flight and upcoming jobs, even if they are created on demand.
	// It may be a rough estimate of the jobs left to conclude the ongoing unit of work.
	// The implementation must be concurrency safe.
	JobsLeft() int
}
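
// The sketch below is illustrative and not part of the original file: a minimal
// Builder implementation that emits no jobs and only tracks an outstanding-jobs
// counter, to show the shape of the contract (blocking BuildJobs, concurrency-safe
// JobsLeft). The name exampleBuilder and its fields are assumptions for illustration.
type exampleBuilder struct {
	mtx      sync.Mutex
	jobsLeft int
}

// BuildJobs blocks until ctx is cancelled; a real builder would construct jobs
// and send them on jobsChan here.
func (b *exampleBuilder) BuildJobs(ctx context.Context, _ chan<- *grpc.Job) {
	<-ctx.Done()
}

// OnJobResponse decrements the outstanding-jobs counter when a job completes.
func (b *exampleBuilder) OnJobResponse(_ *grpc.JobResult) error {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	if b.jobsLeft > 0 {
		b.jobsLeft--
	}
	return nil
}

// JobsLeft reads the current estimate under the mutex, keeping it concurrency safe.
func (b *exampleBuilder) JobsLeft() int {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	return b.jobsLeft
}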
// Queue implements the job queue service
type Queue struct {
	queue                     chan *grpc.Job
	builders                  map[grpc.JobType]builder
	wg                        sync.WaitGroup
	stop                      chan struct{}
	checkTimedOutJobsInterval time.Duration
	metrics                   *queueMetrics

	// Track jobs that are being processed
	processingJobs    map[string]*processingJob
	processingJobsMtx sync.RWMutex
}

type processingJob struct {
	job               *grpc.Job
	dequeued          time.Time
	attemptsLeft      int
	lastAttemptFailed bool
}

type builder struct {
	Builder
	jobTimeout time.Duration
	maxRetries int
}

// NewQueue creates a new job queue
func NewQueue(r prometheus.Registerer) *Queue {
	return newQueue(time.Minute, r)
}

// newQueue creates a new job queue with a configurable interval for the timed-out jobs check (used in tests)
func newQueue(checkTimedOutJobsInterval time.Duration, r prometheus.Registerer) *Queue {
	q := &Queue{
		queue:                     make(chan *grpc.Job),
		builders:                  make(map[grpc.JobType]builder),
		stop:                      make(chan struct{}),
		checkTimedOutJobsInterval: checkTimedOutJobsInterval,
		processingJobs:            make(map[string]*processingJob),
		metrics:                   newQueueMetrics(r),
	}

	// Start the loop which retries failed and timed-out jobs
	q.wg.Add(1)
	go q.retryFailedJobs()

	return q
}

// RegisterBuilder registers a builder for a specific job type
func (q *Queue) RegisterBuilder(jobType grpc.JobType, b Builder, jobTimeout time.Duration, maxRetries int, r prometheus.Registerer) error {
	if _, exists := q.builders[jobType]; exists {
		return ErrJobTypeAlreadyRegistered
	}

	registerJobsLeftTrackerMetric(jobType.Humanize(), func() float64 {
		return float64(b.JobsLeft())
	}, r)

	q.builders[jobType] = builder{
		Builder:    b,
		jobTimeout: jobTimeout,
		maxRetries: maxRetries,
	}
	return nil
}

// Start starts all registered builders
func (q *Queue) Start(ctx context.Context) {
	buildersWg := sync.WaitGroup{}
	for _, builder := range q.builders {
		buildersWg.Add(1)
		go func() {
			defer buildersWg.Done()
			q.startBuilder(ctx, builder)
		}()
	}

	buildersWg.Wait()
	close(q.stop)
	q.wg.Wait()
	close(q.queue)
}
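
// The sketch below is illustrative and not part of the original file: how a caller
// might wire the queue together, given a Builder implementation and a job type value
// of its own. The function name exampleQueueSetup, the 15-minute job timeout and the
// 3 retries are arbitrary values chosen for this sketch, not values used by Loki.
func exampleQueueSetup(ctx context.Context, jobType grpc.JobType, b Builder, r prometheus.Registerer) error {
	q := NewQueue(r)

	// Builders must be registered before Start is called; registering the same
	// job type twice returns ErrJobTypeAlreadyRegistered.
	if err := q.RegisterBuilder(jobType, b, 15*time.Minute, 3, r); err != nil {
		return err
	}

	// Start blocks until all builders return, i.e. until ctx is cancelled,
	// and then shuts the queue down.
	q.Start(ctx)
	return nil
}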
func (q *Queue) startBuilder(ctx context.Context, builder Builder) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	go func() {
		builder.BuildJobs(ctx, q.queue)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-q.stop:
			return
		}
	}
}

// retryFailedJobs requeues jobs which have failed or exceeded their job type's timeout.
func (q *Queue) retryFailedJobs() {
	defer q.wg.Done()

	ticker := time.NewTicker(q.checkTimedOutJobsInterval)
	defer ticker.Stop()

	for {
		select {
		case <-q.stop:
			return
		case <-ticker.C:
			var jobsToRetry []string

			q.processingJobsMtx.Lock()
			now := time.Now()
			for jobID, pj := range q.processingJobs {
				if pj.attemptsLeft <= 0 {
					level.Error(util_log.Logger).Log("msg", "job ran out of attempts, dropping it", "jobID", jobID)
					q.metrics.jobsDropped.Inc()
					delete(q.processingJobs, jobID)
					continue
				}
				timeout := q.builders[pj.job.Type].jobTimeout
				if pj.lastAttemptFailed || now.Sub(pj.dequeued) > timeout {
					jobsToRetry = append(jobsToRetry, jobID)
				}
			}
			q.processingJobsMtx.Unlock()

			for _, jobID := range jobsToRetry {
				reason := "timeout"

				q.processingJobsMtx.Lock()
				pj := q.processingJobs[jobID]
				if pj.lastAttemptFailed {
					reason = "failed"
				}
				// Reset the dequeued time so that the timeout is calculated from the time when the job is sent for processing.
				q.processingJobs[jobID].dequeued = time.Now()
				q.processingJobs[jobID].lastAttemptFailed = false
				q.processingJobs[jobID].attemptsLeft--
				q.processingJobsMtx.Unlock()

				// Requeue the job
				select {
				case <-q.stop:
					return
				case q.queue <- pj.job:
					q.metrics.jobRetries.WithLabelValues(reason).Inc()
					level.Warn(util_log.Logger).Log(
						"msg", "requeued job",
						"job_id", jobID,
						"job_type", pj.job.Type,
						"reason", reason,
					)
				}
			}
		}
	}
}

// Loop implements the job queue gRPC service: it streams jobs to the connected worker
// one at a time and waits for the result of each job before sending the next one.
func (q *Queue) Loop(s grpc.JobQueue_LoopServer) error {
	for {
		var job *grpc.Job
		var ok bool

		ctx := s.Context()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-q.stop:
			return nil
		case job, ok = <-q.queue:
			if !ok {
				return nil
			}
		}

		// Track the job as being processed
		q.processingJobsMtx.Lock()
		if _, ok := q.processingJobs[job.Id]; !ok {
			q.processingJobs[job.Id] = &processingJob{
				job:          job,
				dequeued:     time.Now(),
				attemptsLeft: q.builders[job.Type].maxRetries,
			}
		}
		q.processingJobsMtx.Unlock()

		now := time.Now()
		if err := s.Send(job); err != nil {
			level.Error(util_log.Logger).Log("msg", "failed to send job", "job_id", job.Id, "err", err)
			return err
		}
		q.metrics.jobsSent.Inc()

		// Wait for the worker to finish the current job before we give it the next job.
		// The worker signals completion by sending back the execution result of the job we sent.
		resp, err := s.Recv()
		if err != nil {
			q.metrics.jobsProcessingDuration.Observe(time.Since(now).Seconds())
			level.Error(util_log.Logger).Log("msg", "error receiving job response", "job_id", job.Id, "err", err)
			return err
		}
		q.metrics.jobsProcessingDuration.Observe(time.Since(now).Seconds())

		if err := q.reportJobResult(resp); err != nil {
			return err
		}
	}
}
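
// The sketch below is illustrative and not part of the original file: the worker side
// of the Loop protocol, assuming the generated client stream type grpc.JobQueue_LoopClient
// exists alongside grpc.JobQueue_LoopServer. The executeJob callback is hypothetical and
// stands in for whatever actually runs the job.
func exampleWorkerLoop(stream grpc.JobQueue_LoopClient, executeJob func(*grpc.Job) error) error {
	for {
		// The queue sends at most one outstanding job per worker stream.
		job, err := stream.Recv()
		if err != nil {
			return err
		}

		result := &grpc.JobResult{
			JobId:   job.Id,
			JobType: job.Type,
		}
		if execErr := executeJob(job); execErr != nil {
			// A non-empty Error marks the attempt as failed so the queue can retry it.
			result.Error = execErr.Error()
		}

		// Reporting the result unblocks the queue, which then sends the next job.
		if err := stream.Send(result); err != nil {
			return err
		}
	}
}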
// reportJobResult records the result of a job execution and hands it back to the
// builder which created the job.
func (q *Queue) reportJobResult(result *grpc.JobResult) error {
	if result == nil {
		return status.Error(codes.InvalidArgument, "result cannot be nil")
	}

	if _, ok := q.builders[result.JobType]; !ok {
		return status.Error(codes.InvalidArgument, "unknown job type")
	}

	q.processingJobsMtx.Lock()
	defer q.processingJobsMtx.Unlock()

	pj, exists := q.processingJobs[result.JobId]
	if !exists {
		return status.Error(codes.NotFound, "job not found")
	}

	if result.Error != "" {
		level.Error(util_log.Logger).Log(
			"msg", "job execution failed",
			"job_id", result.JobId,
			"job_type", result.JobType,
			"error", result.Error,
		)

		// Check if we should retry the job
		if pj.attemptsLeft > 0 {
			level.Info(util_log.Logger).Log(
				"msg", "retrying failed job",
				"job_id", result.JobId,
				"job_type", result.JobType,
				"attempts_left", pj.attemptsLeft,
			)
			pj.lastAttemptFailed = true
			return nil
		}

		level.Error(util_log.Logger).Log(
			"msg", "job failed after max attempts",
			"job_id", result.JobId,
			"job_type", result.JobType,
		)
		q.metrics.jobsDropped.Inc()
	} else {
		q.metrics.jobsProcessed.Inc()
		level.Debug(util_log.Logger).Log(
			"msg", "job execution succeeded",
			"job_id", result.JobId,
			"job_type", result.JobType,
		)
	}

	if err := q.builders[result.JobType].OnJobResponse(result); err != nil {
		level.Error(util_log.Logger).Log(
			"msg", "failed to process job response",
			"job_id", result.JobId,
			"job_type", result.JobType,
			"error", err,
		)
		return err
	}

	// Remove the job from processing jobs
	delete(q.processingJobs, result.JobId)
	return nil
}