feat(compactor HS): add a worker for processing jobs from the compactor's job queue (#18165)

pull/18176/head
Sandeep Sukhani, 7 days ago (committed by GitHub)
parent 4d8d05fdd7
commit d05c4bc500
  1. pkg/compactor/jobqueue/queue.go (95 changed lines)
  2. pkg/compactor/jobqueue/queue_test.go (87 changed lines)
  3. pkg/compactor/jobqueue/worker.go (159 changed lines)
  4. pkg/compactor/jobqueue/worker_test.go (180 changed lines)

@@ -43,17 +43,18 @@ type Queue struct {
processingJobs map[string]*processingJob
processingJobsMtx sync.RWMutex
jobTimeout time.Duration
maxRetries int
maxAttempts int
}
type processingJob struct {
job *grpc.Job
dequeued time.Time
retryCount int
job *grpc.Job
dequeued time.Time
attemptsLeft int
lastAttemptFailed bool
}
// New creates a new job queue
func New() *Queue {
// NewQueue creates a new job queue
func NewQueue() *Queue {
return newQueue(time.Minute)
}
@@ -65,14 +66,14 @@ func newQueue(checkTimedOutJobsInterval time.Duration) *Queue {
stop: make(chan struct{}),
checkTimedOutJobsInterval: checkTimedOutJobsInterval,
processingJobs: make(map[string]*processingJob),
// ToDo(Sandeep): make jobTimeout and maxRetries configurable(possibly job specific)
jobTimeout: 15 * time.Minute,
maxRetries: 3,
// ToDo(Sandeep): make jobTimeout and maxAttempts configurable(possibly job specific)
jobTimeout: 15 * time.Minute,
maxAttempts: 3,
}
// Start the job timeout checker
q.wg.Add(1)
go q.checkJobTimeouts()
go q.retryFailedJobs()
return q
}
@@ -96,13 +97,6 @@ func (q *Queue) Start(ctx context.Context) error {
return nil
}
// Stop stops all builders
func (q *Queue) Stop() error {
close(q.stop)
q.wg.Wait()
return nil
}
func (q *Queue) startBuilder(ctx context.Context, builder Builder) {
defer q.wg.Done()
@@ -123,7 +117,8 @@ func (q *Queue) startBuilder(ctx context.Context, builder Builder) {
}
}
func (q *Queue) checkJobTimeouts() {
// retryFailedJobs retries jobs that have failed, including jobs that have hit a timeout.
func (q *Queue) retryFailedJobs() {
defer q.wg.Done()
ticker := time.NewTicker(q.checkTimedOutJobsInterval)
@@ -137,20 +132,32 @@ func (q *Queue) checkJobTimeouts() {
q.processingJobsMtx.Lock()
now := time.Now()
for jobID, pj := range q.processingJobs {
if now.Sub(pj.dequeued) > q.jobTimeout {
if pj.attemptsLeft <= 0 {
delete(q.processingJobs, jobID)
continue
}
if pj.lastAttemptFailed || now.Sub(pj.dequeued) > q.jobTimeout {
// Requeue the job
select {
case <-q.stop:
return
case q.queue <- pj.job:
reason := "timeout"
if pj.lastAttemptFailed {
reason = "failed"
}
level.Warn(util_log.Logger).Log(
"msg", "job timed out, requeuing",
"msg", "requeued job",
"job_id", jobID,
"job_type", pj.job.Type,
"timeout", q.jobTimeout,
"reason", reason,
)
// reset the dequeued time so that the timeout is calculated from the time when the job is sent for processing.
q.processingJobs[jobID].dequeued = time.Now()
q.processingJobs[jobID].lastAttemptFailed = false
q.processingJobs[jobID].attemptsLeft--
}
delete(q.processingJobs, jobID)
}
}
q.processingJobsMtx.Unlock()
@@ -161,6 +168,7 @@ func (q *Queue) checkJobTimeouts() {
func (q *Queue) Loop(s grpc.JobQueue_LoopServer) error {
for {
var job *grpc.Job
var ok bool
ctx := s.Context()
select {
@@ -168,15 +176,20 @@ func (q *Queue) Loop(s grpc.JobQueue_LoopServer) error {
return ctx.Err()
case <-q.stop:
return nil
case job = <-q.queue:
case job, ok = <-q.queue:
if !ok {
return nil
}
}
// Track the job as being processed
q.processingJobsMtx.Lock()
q.processingJobs[job.Id] = &processingJob{
job: job,
dequeued: time.Now(),
retryCount: 0,
if _, ok := q.processingJobs[job.Id]; !ok {
q.processingJobs[job.Id] = &processingJob{
job: job,
dequeued: time.Now(),
attemptsLeft: q.maxAttempts - 1,
}
}
q.processingJobsMtx.Unlock()
@@ -191,17 +204,21 @@ func (q *Queue) Loop(s grpc.JobQueue_LoopServer) error {
return err
}
if err := q.reportJobResult(ctx, resp); err != nil {
if err := q.reportJobResult(resp); err != nil {
return err
}
}
}
func (q *Queue) reportJobResult(ctx context.Context, result *grpc.JobResult) error {
func (q *Queue) reportJobResult(result *grpc.JobResult) error {
if result == nil {
return status.Error(codes.InvalidArgument, "result cannot be nil")
}
if _, ok := q.builders[result.JobType]; !ok {
return status.Error(codes.InvalidArgument, "unknown job type")
}
q.processingJobsMtx.Lock()
defer q.processingJobsMtx.Unlock()
pj, exists := q.processingJobs[result.JobId]
@@ -215,32 +232,24 @@ func (q *Queue) reportJobResult(ctx context.Context, result *grpc.JobResult) err
"job_id", result.JobId,
"job_type", result.JobType,
"error", result.Error,
"retry_count", pj.retryCount,
)
// Check if we should retry the job
if pj.retryCount < q.maxRetries {
pj.retryCount++
if pj.attemptsLeft > 0 {
level.Info(util_log.Logger).Log(
"msg", "retrying failed job",
"job_id", result.JobId,
"job_type", result.JobType,
"retry_count", pj.retryCount,
"max_retries", q.maxRetries,
"attempts_left", pj.attemptsLeft,
)
// Requeue the job
select {
case <-ctx.Done():
case q.queue <- pj.job:
return nil
}
pj.lastAttemptFailed = true
return nil
} else {
level.Error(util_log.Logger).Log(
"msg", "job failed after max retries",
"msg", "job failed after max attempts",
"job_id", result.JobId,
"job_type", result.JobType,
"max_retries", q.maxRetries,
)
}
} else {
@@ -261,7 +270,9 @@ func (q *Queue) reportJobResult(ctx context.Context, result *grpc.JobResult) err
// Close closes the queue and releases all resources
func (q *Queue) Close() {
if !q.closed.Load() {
close(q.queue)
q.closed.Store(true)
close(q.stop)
q.wg.Wait()
close(q.queue)
}
}
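For orientation, here is a minimal, self-contained sketch of the retry bookkeeping the hunks above introduce: a job starts with maxAttempts-1 retries left, is requeued when its last attempt failed or it exceeds the job timeout, and is dropped once it runs out of attempts. This is illustrative only; the field names (attemptsLeft, lastAttemptFailed, dequeued) follow the diff, while the surrounding types are simplified stand-ins rather than the actual Loki code.

package main

import (
	"fmt"
	"time"
)

type job struct{ id string }

// processingJob mirrors the bookkeeping fields from the diff above.
type processingJob struct {
	job               *job
	dequeued          time.Time
	attemptsLeft      int
	lastAttemptFailed bool
}

func main() {
	const maxAttempts = 3
	jobTimeout := 15 * time.Minute
	queue := make(chan *job, 1)

	// A job that was dequeued 20 minutes ago and has not reported a result:
	// it has exceeded jobTimeout, so it gets requeued with one attempt used.
	processing := map[string]*processingJob{
		"job-1": {
			job:          &job{id: "job-1"},
			dequeued:     time.Now().Add(-20 * time.Minute),
			attemptsLeft: maxAttempts - 1,
		},
	}

	now := time.Now()
	for id, pj := range processing {
		if pj.attemptsLeft <= 0 {
			// Out of attempts: stop tracking the job.
			delete(processing, id)
			continue
		}
		if pj.lastAttemptFailed || now.Sub(pj.dequeued) > jobTimeout {
			// Requeue and reset the timeout clock for the next attempt.
			queue <- pj.job
			pj.dequeued = time.Now()
			pj.lastAttemptFailed = false
			pj.attemptsLeft--
		}
	}

	fmt.Println("attempts left for job-1:", processing["job-1"].attemptsLeft) // prints 1
}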

@@ -4,11 +4,11 @@ import (
"context"
"fmt"
"net"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
@@ -18,10 +18,19 @@ import (
// mockBuilder implements the Builder interface for testing
type mockBuilder struct {
jobsToBuild []*compactor_grpc.Job
jobsToBuild []*compactor_grpc.Job
jobsSentCount atomic.Int32
jobsSucceeded atomic.Int32
jobsFailed atomic.Int32
}
func (m *mockBuilder) OnJobResponse(_ *compactor_grpc.JobResult) {}
func (m *mockBuilder) OnJobResponse(res *compactor_grpc.JobResult) {
if res.Error != "" {
m.jobsFailed.Inc()
} else {
m.jobsSucceeded.Inc()
}
}
func (m *mockBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *compactor_grpc.Job) {
for _, job := range m.jobsToBuild {
@@ -29,6 +38,7 @@ func (m *mockBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *compactor_
case <-ctx.Done():
return
case jobsChan <- job:
m.jobsSentCount.Inc()
}
}
@@ -36,7 +46,7 @@ func (m *mockBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *compactor_
<-ctx.Done()
}
func server(t *testing.T, q *Queue) (*grpc.ClientConn, func()) {
func setupGRPC(t *testing.T, q *Queue) (*grpc.ClientConn, func()) {
buffer := 101024 * 1024
lis := bufconn.Listen(buffer)
@@ -65,7 +75,7 @@ func server(t *testing.T, q *Queue) (*grpc.ClientConn, func()) {
}
func TestQueue_RegisterBuilder(t *testing.T) {
q := New()
q := NewQueue()
builder := &mockBuilder{}
// Register builder successfully
@@ -78,9 +88,9 @@
}
func TestQueue_Loop(t *testing.T) {
q := New()
q := NewQueue()
conn, closer := server(t, q)
conn, closer := setupGRPC(t, q)
defer closer()
// Create a couple of test jobs
@@ -115,7 +125,7 @@ func TestQueue_Loop(t *testing.T) {
q.processingJobsMtx.RLock()
require.Equal(t, 1, len(q.processingJobs))
require.Equal(t, jobs[0], q.processingJobs[jobs[0].Id].job)
require.Equal(t, 0, q.processingJobs[jobs[0].Id].retryCount)
require.Equal(t, 2, q.processingJobs[jobs[0].Id].attemptsLeft)
q.processingJobsMtx.RUnlock()
// another Recv call on client1Stream without calling the Send call should get blocked
@@ -140,9 +150,9 @@ func TestQueue_Loop(t *testing.T) {
q.processingJobsMtx.RLock()
require.Equal(t, 2, len(q.processingJobs))
require.Equal(t, jobs[0], q.processingJobs[jobs[0].Id].job)
require.Equal(t, 0, q.processingJobs[jobs[0].Id].retryCount)
require.Equal(t, 2, q.processingJobs[jobs[0].Id].attemptsLeft)
require.Equal(t, jobs[1], q.processingJobs[jobs[1].Id].job)
require.Equal(t, 0, q.processingJobs[jobs[1].Id].retryCount)
require.Equal(t, 2, q.processingJobs[jobs[1].Id].attemptsLeft)
q.processingJobsMtx.RUnlock()
// sending a response on client1Stream should get it unblocked to Recv the next job
@@ -159,15 +169,14 @@ func TestQueue_Loop(t *testing.T) {
q.processingJobsMtx.RLock()
require.Equal(t, 2, len(q.processingJobs))
require.Equal(t, jobs[1], q.processingJobs[jobs[1].Id].job)
require.Equal(t, 0, q.processingJobs[jobs[1].Id].retryCount)
require.Equal(t, 2, q.processingJobs[jobs[1].Id].attemptsLeft)
require.Equal(t, jobs[2], q.processingJobs[jobs[2].Id].job)
require.Equal(t, 0, q.processingJobs[jobs[2].Id].retryCount)
require.Equal(t, 2, q.processingJobs[jobs[2].Id].attemptsLeft)
q.processingJobsMtx.RUnlock()
}
func TestQueue_ReportJobResult(t *testing.T) {
ctx := context.Background()
q := New()
q := newQueue(time.Second)
require.NoError(t, q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, &mockBuilder{}))
// Create a test job
@@ -179,14 +188,14 @@ func TestQueue_ReportJobResult(t *testing.T) {
// Add job to processing jobs
q.processingJobsMtx.Lock()
q.processingJobs[job.Id] = &processingJob{
job: job,
dequeued: time.Now(),
retryCount: 0,
job: job,
dequeued: time.Now(),
attemptsLeft: 2,
}
q.processingJobsMtx.Unlock()
// Test successful response
err := q.reportJobResult(ctx, &compactor_grpc.JobResult{
err := q.reportJobResult(&compactor_grpc.JobResult{
JobId: job.Id,
JobType: job.Type,
})
@@ -202,25 +211,18 @@ func TestQueue_ReportJobResult(t *testing.T) {
job.Id = "retry-job"
q.processingJobsMtx.Lock()
q.processingJobs[job.Id] = &processingJob{
job: job,
dequeued: time.Now(),
retryCount: 0,
job: job,
dequeued: time.Now(),
attemptsLeft: 2,
}
q.processingJobsMtx.Unlock()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := q.reportJobResult(ctx, &compactor_grpc.JobResult{
JobId: job.Id,
JobType: job.Type,
Error: "test error",
})
require.NoError(t, err)
}()
err = q.reportJobResult(&compactor_grpc.JobResult{
JobId: job.Id,
JobType: job.Type,
Error: "test error",
})
require.NoError(t, err)
// Verify job is requeued with timeout
select {
@@ -230,14 +232,12 @@ func TestQueue_ReportJobResult(t *testing.T) {
t.Fatal("job was not requeued")
}
wg.Wait()
// Verify that attemptsLeft has been decremented
q.processingJobsMtx.RLock()
pj, exists := q.processingJobs[job.Id]
q.processingJobsMtx.RUnlock()
require.True(t, exists)
require.Equal(t, 1, pj.retryCount)
require.Equal(t, 1, pj.attemptsLeft)
}
func TestQueue_JobTimeout(t *testing.T) {
@@ -253,9 +253,9 @@ func TestQueue_JobTimeout(t *testing.T) {
// Add job to processing jobs with old dequeued time
q.processingJobsMtx.Lock()
q.processingJobs[job.Id] = &processingJob{
job: job,
dequeued: time.Now().Add(-200 * time.Millisecond),
retryCount: 0,
job: job,
dequeued: time.Now().Add(-200 * time.Millisecond),
attemptsLeft: 2,
}
q.processingJobsMtx.Unlock()
@@ -272,13 +272,14 @@ func TestQueue_JobTimeout(t *testing.T) {
// Verify the job is still tracked, with one attempt consumed
q.processingJobsMtx.RLock()
_, exists := q.processingJobs[job.Id]
pj, exists := q.processingJobs[job.Id]
q.processingJobsMtx.RUnlock()
require.False(t, exists)
require.True(t, exists)
require.Equal(t, 1, pj.attemptsLeft)
}
func TestQueue_Close(t *testing.T) {
q := New()
q := NewQueue()
// Close the queue
q.Close()

@@ -0,0 +1,159 @@
package jobqueue
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"
"github.com/grafana/loki/v3/pkg/compactor/client/grpc"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
var (
connBackoffConfig = backoff.Config{
MinBackoff: 500 * time.Millisecond,
MaxBackoff: 5 * time.Second,
}
)
type CompactorClient interface {
JobQueueClient() grpc.JobQueueClient
}
type JobRunner interface {
Run(ctx context.Context, job *grpc.Job) ([]byte, error)
}
type WorkerManager struct {
grpcClient CompactorClient
jobRunners map[grpc.JobType]JobRunner
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewWorkerManager(grpcClient CompactorClient) *WorkerManager {
return &WorkerManager{
grpcClient: grpcClient,
jobRunners: make(map[grpc.JobType]JobRunner),
}
}
func (w *WorkerManager) RegisterJobRunner(jobType grpc.JobType, jobRunner JobRunner) error {
if _, exists := w.jobRunners[jobType]; exists {
return ErrJobTypeAlreadyRegistered
}
w.jobRunners[jobType] = jobRunner
return nil
}
func (w *WorkerManager) Start(ctx context.Context, numWorkers int) {
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
for i := 0; i < numWorkers; i++ {
w.wg.Add(1)
go func() {
defer w.wg.Done()
newWorker(w.grpcClient, w.jobRunners).start(ctx)
}()
}
}
func (w *WorkerManager) Stop() {
if w.cancel != nil {
w.cancel()
}
w.wg.Wait()
}
type worker struct {
grpcClient CompactorClient
jobRunners map[grpc.JobType]JobRunner
}
func newWorker(grpcClient CompactorClient, jobRunners map[grpc.JobType]JobRunner) *worker {
return &worker{
grpcClient: grpcClient,
jobRunners: jobRunners,
}
}
func (w *worker) start(ctx context.Context) {
client := w.grpcClient.JobQueueClient()
backoff := backoff.New(ctx, connBackoffConfig)
for backoff.Ongoing() {
c, err := client.Loop(ctx)
if err != nil {
level.Warn(util_log.Logger).Log("msg", "error contacting compactor", "err", err)
backoff.Wait()
continue
}
if err := w.process(c); err != nil {
level.Error(util_log.Logger).Log("msg", "error running jobs", "err", err)
backoff.Wait()
continue
}
backoff.Reset()
}
}
// process pulls jobs from the established stream, processes them, and sends the job results back on the stream.
func (w *worker) process(c grpc.JobQueue_LoopClient) error {
// Build a child context so we can cancel the job when the stream is closed.
ctx, cancel := context.WithCancelCause(c.Context())
defer cancel(errors.New("job queue stream closed"))
for {
job, err := c.Recv()
if err != nil {
return err
}
// Execute the job on a "background" goroutine, so we go back to
// blocking on c.Recv(). This allows us to detect the stream closing
// and cancel the job execution. We don't process jobs in parallel
here, as we're running in lock-step with the server - each Recv is
// paired with a Send.
go func() {
jobResult := &grpc.JobResult{
JobId: job.Id,
JobType: job.Type,
}
jobRunner, ok := w.jobRunners[job.Type]
if !ok {
level.Error(util_log.Logger).Log("msg", "job runner for job type not registered", "jobType", job.Type)
jobResult.Error = fmt.Sprintf("unknown job type %s", job.Type)
if err := c.Send(jobResult); err != nil {
level.Error(util_log.Logger).Log("msg", "error sending job result", "err", err)
}
return
}
jobResponse, err := jobRunner.Run(ctx, job)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error running job", "err", err)
jobResult.Error = err.Error()
if err := c.Send(jobResult); err != nil {
level.Error(util_log.Logger).Log("msg", "error sending job result", "err", err)
}
return
}
jobResult.Result = jobResponse
if err := c.Send(jobResult); err != nil {
level.Error(util_log.Logger).Log("msg", "error sending job result", "err", err)
return
}
}()
}
}
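For context on how the worker side added above is meant to be consumed, here is a hedged sketch of wiring a WorkerManager into a component that already holds a gRPC connection to the compactor. WorkerManager, JobRunner, RegisterJobRunner, Start and Stop come from worker.go above; compactorClientAdapter, deletionRunner and startWorkers are hypothetical names invented for the example, and the import paths assume the Loki v3 module layout.

package deletionworker // hypothetical consumer package, not part of the diff

import (
	"context"

	"google.golang.org/grpc"

	compactor_grpc "github.com/grafana/loki/v3/pkg/compactor/client/grpc"
	"github.com/grafana/loki/v3/pkg/compactor/jobqueue"
)

// compactorClientAdapter satisfies jobqueue.CompactorClient using an existing
// gRPC connection to the compactor.
type compactorClientAdapter struct{ conn *grpc.ClientConn }

func (c compactorClientAdapter) JobQueueClient() compactor_grpc.JobQueueClient {
	return compactor_grpc.NewJobQueueClient(c.conn)
}

// deletionRunner is a hypothetical JobRunner; the real deletion logic would go in Run.
type deletionRunner struct{}

func (deletionRunner) Run(_ context.Context, _ *compactor_grpc.Job) ([]byte, error) {
	// Do the actual work for the job here and return any result payload.
	return nil, nil
}

// startWorkers registers the runner and starts a couple of workers that pull
// jobs from the compactor's job queue over the Loop stream.
func startWorkers(ctx context.Context, conn *grpc.ClientConn) (*jobqueue.WorkerManager, error) {
	wm := jobqueue.NewWorkerManager(compactorClientAdapter{conn: conn})
	if err := wm.RegisterJobRunner(compactor_grpc.JOB_TYPE_DELETION, deletionRunner{}); err != nil {
		return nil, err
	}
	wm.Start(ctx, 2)
	return wm, nil // the caller is expected to call wm.Stop() on shutdown
}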

@@ -0,0 +1,180 @@
package jobqueue
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"
compactor_grpc "github.com/grafana/loki/v3/pkg/compactor/client/grpc"
)
type mockCompactorClient struct {
conn *grpc.ClientConn
}
func (m mockCompactorClient) JobQueueClient() compactor_grpc.JobQueueClient {
return compactor_grpc.NewJobQueueClient(m.conn)
}
type mockJobRunner struct {
mock.Mock
}
func (m *mockJobRunner) Run(ctx context.Context, job *compactor_grpc.Job) ([]byte, error) {
args := m.Called(ctx, job)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]byte), args.Error(1)
}
func TestWorkerManager(t *testing.T) {
// create a new job queue
q := NewQueue()
conn, closer := setupGRPC(t, q)
defer closer()
// create a mock job builder which would build only a single job
mockJobBuilder := &mockBuilder{
jobsToBuild: []*compactor_grpc.Job{
{
Id: "1",
Type: compactor_grpc.JOB_TYPE_DELETION,
},
},
}
// register the job builder with the queue and start the queue
require.NoError(t, q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, mockJobBuilder))
require.NoError(t, q.Start(context.Background()))
require.Equal(t, int32(0), mockJobBuilder.jobsSentCount.Load())
jobRunner := &mockJobRunner{}
jobRunner.On("Run", mock.Anything, mock.Anything).Return(nil, nil)
// create a new worker manager and register the mock job runner
wm := NewWorkerManager(mockCompactorClient{conn})
require.NoError(t, wm.RegisterJobRunner(compactor_grpc.JOB_TYPE_DELETION, jobRunner))
// trying to register a job runner for the same job type should return an error
require.Error(t, wm.RegisterJobRunner(compactor_grpc.JOB_TYPE_DELETION, &mockJobRunner{}))
// start two workers; with a single job, only one of them will get it
wm.Start(context.Background(), 2)
// verify that the job builder got to send the job and that it got processed successfully
require.Eventually(t, func() bool {
if mockJobBuilder.jobsSentCount.Load() != 1 {
return false
}
if mockJobBuilder.jobsSucceeded.Load() != 1 {
return false
}
return true
}, time.Second, time.Millisecond*100)
// stop the worker manager
wm.Stop()
}
func TestWorker_ProcessJob(t *testing.T) {
// create a new job queue
q := newQueue(50 * time.Millisecond)
conn, closer := setupGRPC(t, q)
defer closer()
// create a mock job builder which would build a couple of jobs
mockJobBuilder := &mockBuilder{
jobsToBuild: []*compactor_grpc.Job{
{
Id: "1",
Type: compactor_grpc.JOB_TYPE_DELETION,
},
{
Id: "2",
Type: compactor_grpc.JOB_TYPE_DELETION + 1, // an unknown job type should not break processing of further valid jobs
},
{
Id: "3",
Type: compactor_grpc.JOB_TYPE_DELETION,
},
},
}
jobRunner := &mockJobRunner{}
jobRunner.On("Run", mock.Anything, mock.Anything).Return(nil, nil).Once()
jobRunner.On("Run", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("fail")).Times(3)
// register the job builder with the queue and start the queue
require.NoError(t, q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, mockJobBuilder))
require.NoError(t, q.Start(context.Background()))
require.Equal(t, int32(0), mockJobBuilder.jobsSentCount.Load())
// build a worker and start it
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go newWorker(mockCompactorClient{conn: conn}, map[compactor_grpc.JobType]JobRunner{
compactor_grpc.JOB_TYPE_DELETION: jobRunner,
}).start(ctx)
// verify that the job builder got to send all 3 jobs and that both the valid jobs got processed
require.Eventually(t, func() bool {
if mockJobBuilder.jobsSentCount.Load() != 3 {
return false
}
if mockJobBuilder.jobsSucceeded.Load() != 1 {
return false
}
if mockJobBuilder.jobsFailed.Load() != 1 {
return false
}
return true
}, 2*time.Second, time.Millisecond*50)
jobRunner.AssertExpectations(t)
}
func TestWorker_StreamClosure(t *testing.T) {
// build a queue
q := NewQueue()
conn, closer := setupGRPC(t, q)
defer closer()
// register a builder and start the queue
require.NoError(t, q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, &mockBuilder{}))
require.NoError(t, q.Start(context.Background()))
// build a worker
worker := newWorker(mockCompactorClient{conn: conn}, map[compactor_grpc.JobType]JobRunner{
compactor_grpc.JOB_TYPE_DELETION: &mockJobRunner{},
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var running atomic.Bool
// start the worker and ensure that it is running
go func() {
running.Store(true)
defer running.Store(false)
worker.start(ctx)
}()
require.Eventually(t, func() bool {
return running.Load()
}, time.Second, time.Millisecond*100)
// close the queue so that it closes the stream
q.Close()
// sleep for a while and ensure that the worker is still running
time.Sleep(100 * time.Millisecond)
require.True(t, running.Load())
}