mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
320 lines
8.3 KiB
320 lines
8.3 KiB
package jobqueue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/atomic"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/test/bufconn"
|
|
|
|
compactor_grpc "github.com/grafana/loki/v3/pkg/compactor/client/grpc"
|
|
)
|
|
|
|
// Default retry budget and per-job timeout handed to RegisterBuilder by the
// tests in this file that do not need special values.
const (
	jobRetries = 3
	jobTimeout = 5 * time.Minute
)
|
|
|
|
// mockBuilder implements the Builder interface for testing
|
|
type mockBuilder struct {
|
|
jobsToBuild []*compactor_grpc.Job
|
|
jobsSentCount atomic.Int32
|
|
jobsSucceeded atomic.Int32
|
|
jobsFailed atomic.Int32
|
|
}
|
|
|
|
func (m *mockBuilder) OnJobResponse(res *compactor_grpc.JobResult) error {
|
|
if res.Error != "" {
|
|
m.jobsFailed.Inc()
|
|
} else {
|
|
m.jobsSucceeded.Inc()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *mockBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *compactor_grpc.Job) {
|
|
for _, job := range m.jobsToBuild {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case jobsChan <- job:
|
|
m.jobsSentCount.Inc()
|
|
}
|
|
}
|
|
|
|
// Keep running until context is cancelled
|
|
<-ctx.Done()
|
|
}
|
|
|
|
func (m *mockBuilder) JobsLeft() int {
|
|
return len(m.jobsToBuild) - int(m.jobsSucceeded.Load())
|
|
}
|
|
|
|
func setupGRPC(t *testing.T, q *Queue) (*grpc.ClientConn, func()) {
|
|
buffer := 101024 * 1024
|
|
lis := bufconn.Listen(buffer)
|
|
|
|
baseServer := grpc.NewServer()
|
|
|
|
compactor_grpc.RegisterJobQueueServer(baseServer, q)
|
|
go func() {
|
|
if err := baseServer.Serve(lis); err != nil {
|
|
t.Logf("Failed to serve: %v", err)
|
|
}
|
|
}()
|
|
|
|
// nolint:staticcheck // compactor_grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
|
|
conn, err := grpc.DialContext(context.Background(), "",
|
|
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
|
return lis.Dial()
|
|
}), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
require.NoError(t, err)
|
|
|
|
closer := func() {
|
|
require.NoError(t, lis.Close())
|
|
baseServer.GracefulStop()
|
|
}
|
|
|
|
return conn, closer
|
|
}
|
|
|
|
func TestQueue_RegisterBuilder(t *testing.T) {
|
|
q := NewQueue(nil)
|
|
builder := &mockBuilder{}
|
|
|
|
// Register builder successfully
|
|
err := q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, builder, jobTimeout, jobRetries, nil)
|
|
require.NoError(t, err)
|
|
|
|
// Try to register same builder type again
|
|
err = q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, builder, jobTimeout, jobRetries, nil)
|
|
require.ErrorIs(t, err, ErrJobTypeAlreadyRegistered)
|
|
}
|
|
|
|
func TestQueue_Loop(t *testing.T) {
|
|
q := NewQueue(nil)
|
|
|
|
conn, closer := setupGRPC(t, q)
|
|
defer closer()
|
|
|
|
// Create a couple of test jobs
|
|
var jobs []*compactor_grpc.Job
|
|
for i := 0; i < 3; i++ {
|
|
jobs = append(jobs, &compactor_grpc.Job{
|
|
Id: fmt.Sprintf("test-job-%d", i),
|
|
Type: compactor_grpc.JOB_TYPE_DELETION,
|
|
})
|
|
}
|
|
|
|
builder := &mockBuilder{
|
|
jobsToBuild: jobs,
|
|
}
|
|
|
|
require.NoError(t, q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, builder, jobTimeout, jobRetries, nil))
|
|
go q.Start(context.Background())
|
|
|
|
// Dequeue the job
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
|
defer cancel()
|
|
|
|
client1 := compactor_grpc.NewJobQueueClient(conn)
|
|
client1Stream, err := client1.Loop(ctx)
|
|
require.NoError(t, err)
|
|
|
|
resp, err := client1Stream.Recv()
|
|
require.NoError(t, err)
|
|
require.Equal(t, jobs[0], resp)
|
|
|
|
// Verify job is tracked as being processed
|
|
q.processingJobsMtx.RLock()
|
|
require.Equal(t, 1, len(q.processingJobs))
|
|
require.Equal(t, jobs[0], q.processingJobs[jobs[0].Id].job)
|
|
require.Equal(t, 3, q.processingJobs[jobs[0].Id].attemptsLeft)
|
|
q.processingJobsMtx.RUnlock()
|
|
|
|
// another Recv call on client1Stream without calling the Send call should get blocked
|
|
client1ReceivedNextJob := make(chan struct{})
|
|
go func() {
|
|
resp, err := client1Stream.Recv()
|
|
require.NoError(t, err)
|
|
require.Equal(t, jobs[2], resp)
|
|
client1ReceivedNextJob <- struct{}{}
|
|
}()
|
|
|
|
// make a new client and try getting a job
|
|
client2 := compactor_grpc.NewJobQueueClient(conn)
|
|
client2Stream, err := client2.Loop(ctx)
|
|
require.NoError(t, err)
|
|
|
|
resp, err = client2Stream.Recv()
|
|
require.NoError(t, err)
|
|
require.Equal(t, jobs[1], resp)
|
|
|
|
// Verify both the jobs are tracked as being processed
|
|
q.processingJobsMtx.RLock()
|
|
require.Equal(t, 2, len(q.processingJobs))
|
|
require.Equal(t, jobs[0], q.processingJobs[jobs[0].Id].job)
|
|
require.Equal(t, 3, q.processingJobs[jobs[0].Id].attemptsLeft)
|
|
require.Equal(t, jobs[1], q.processingJobs[jobs[1].Id].job)
|
|
require.Equal(t, 3, q.processingJobs[jobs[1].Id].attemptsLeft)
|
|
q.processingJobsMtx.RUnlock()
|
|
|
|
// sending a response on client1Stream should get it unblocked to Recv the next job
|
|
err = client1Stream.Send(&compactor_grpc.JobResult{
|
|
JobId: jobs[0].Id,
|
|
JobType: jobs[0].Type,
|
|
Result: []byte("test-result"),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
<-client1ReceivedNextJob
|
|
|
|
// Verify that now job1 & job2 are tracked as being processed
|
|
q.processingJobsMtx.RLock()
|
|
require.Equal(t, 2, len(q.processingJobs))
|
|
require.Equal(t, jobs[1], q.processingJobs[jobs[1].Id].job)
|
|
require.Equal(t, 3, q.processingJobs[jobs[1].Id].attemptsLeft)
|
|
require.Equal(t, jobs[2], q.processingJobs[jobs[2].Id].job)
|
|
require.Equal(t, 3, q.processingJobs[jobs[2].Id].attemptsLeft)
|
|
q.processingJobsMtx.RUnlock()
|
|
}
|
|
|
|
func TestQueue_ReportJobResult(t *testing.T) {
|
|
q := newQueue(time.Second, nil)
|
|
require.NoError(t, q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, &mockBuilder{}, jobTimeout, jobRetries, nil))
|
|
|
|
// Create a test job
|
|
job := &compactor_grpc.Job{
|
|
Id: "test-job",
|
|
Type: compactor_grpc.JOB_TYPE_DELETION,
|
|
}
|
|
|
|
// Add job to processing jobs
|
|
q.processingJobsMtx.Lock()
|
|
q.processingJobs[job.Id] = &processingJob{
|
|
job: job,
|
|
dequeued: time.Now(),
|
|
attemptsLeft: 2,
|
|
}
|
|
q.processingJobsMtx.Unlock()
|
|
|
|
// Test successful response
|
|
err := q.reportJobResult(&compactor_grpc.JobResult{
|
|
JobId: job.Id,
|
|
JobType: job.Type,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Verify job is removed from processing jobs
|
|
q.processingJobsMtx.RLock()
|
|
_, exists := q.processingJobs[job.Id]
|
|
q.processingJobsMtx.RUnlock()
|
|
require.False(t, exists)
|
|
|
|
// Test error response with retry
|
|
job.Id = "retry-job"
|
|
q.processingJobsMtx.Lock()
|
|
q.processingJobs[job.Id] = &processingJob{
|
|
job: job,
|
|
dequeued: time.Now(),
|
|
attemptsLeft: 2,
|
|
}
|
|
q.processingJobsMtx.Unlock()
|
|
|
|
err = q.reportJobResult(&compactor_grpc.JobResult{
|
|
JobId: job.Id,
|
|
JobType: job.Type,
|
|
Error: "test error",
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Verify job is requeued with timeout
|
|
select {
|
|
case requeuedJob := <-q.queue:
|
|
require.Equal(t, job, requeuedJob)
|
|
case <-time.After(time.Minute):
|
|
t.Fatal("job was not requeued")
|
|
}
|
|
|
|
// Verify retry count is incremented
|
|
q.processingJobsMtx.RLock()
|
|
pj, exists := q.processingJobs[job.Id]
|
|
q.processingJobsMtx.RUnlock()
|
|
require.True(t, exists)
|
|
require.Equal(t, 1, pj.attemptsLeft)
|
|
}
|
|
|
|
func TestQueue_JobTimeout(t *testing.T) {
|
|
q := newQueue(50*time.Millisecond, nil)
|
|
// Short job timeout for testing
|
|
require.NoError(t, q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, &mockBuilder{}, 100*time.Millisecond, jobRetries, nil))
|
|
|
|
// Create a test job
|
|
job := &compactor_grpc.Job{
|
|
Id: "test-job",
|
|
Type: compactor_grpc.JOB_TYPE_DELETION,
|
|
}
|
|
|
|
// Add job to processing jobs with old dequeued time
|
|
q.processingJobsMtx.Lock()
|
|
q.processingJobs[job.Id] = &processingJob{
|
|
job: job,
|
|
dequeued: time.Now().Add(-200 * time.Millisecond),
|
|
attemptsLeft: 2,
|
|
}
|
|
q.processingJobsMtx.Unlock()
|
|
|
|
// Wait for timeout checker to run
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// Verify job is requeued
|
|
select {
|
|
case requeuedJob := <-q.queue:
|
|
require.Equal(t, job, requeuedJob)
|
|
case <-time.After(time.Second):
|
|
t.Fatal("job was not requeued after timeout")
|
|
}
|
|
|
|
// Verify job is removed from processing jobs
|
|
q.processingJobsMtx.RLock()
|
|
pj, exists := q.processingJobs[job.Id]
|
|
require.True(t, exists)
|
|
require.Equal(t, 1, pj.attemptsLeft)
|
|
q.processingJobsMtx.RUnlock()
|
|
}
|
|
|
|
func TestQueue_Close(t *testing.T) {
|
|
q := NewQueue(nil)
|
|
|
|
require.NoError(t, q.RegisterBuilder(compactor_grpc.JOB_TYPE_DELETION, &mockBuilder{}, jobTimeout, jobRetries, nil))
|
|
|
|
wg := sync.WaitGroup{}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
q.Start(ctx)
|
|
}()
|
|
|
|
// cancel the context to stop all the builders and close the queue
|
|
cancel()
|
|
|
|
// wait for everything to stop before we check for closed channel
|
|
wg.Wait()
|
|
|
|
// Verify queue channel is closed
|
|
select {
|
|
case _, ok := <-q.queue:
|
|
require.False(t, ok)
|
|
default:
|
|
t.Fatal("queue channel should be closed")
|
|
}
|
|
}
|
|
|