mirror of https://github.com/grafana/loki
feat(compactor HS): add job queue and deletion job builder (#17843)
parent
44b2c27eaa
commit
267f4dc0ec
@ -0,0 +1,306 @@ |
||||
package deletion |
||||
|
||||
import ( |
||||
"context" |
||||
"encoding/json" |
||||
"fmt" |
||||
"path" |
||||
"strconv" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log/level" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/compactor/jobqueue" |
||||
"github.com/grafana/loki/v3/pkg/storage/chunk/client" |
||||
util_log "github.com/grafana/loki/v3/pkg/util/log" |
||||
) |
||||
|
||||
const maxChunksPerJob = 1000 |
||||
|
||||
type deletionJob struct { |
||||
ChunkIDs []string `json:"chunk_ids"` |
||||
DeletionQueries []string `json:"deletion_queries"` |
||||
} |
||||
|
||||
type manifestJobs struct { |
||||
jobsInProgress map[string]struct{} |
||||
cancel context.CancelFunc |
||||
manifestPath string |
||||
} |
||||
|
||||
type manifestError struct { |
||||
JobID string `json:"job_id"` |
||||
Error string `json:"error"` |
||||
} |
||||
|
||||
type JobBuilder struct { |
||||
deleteStoreClient client.ObjectClient |
||||
|
||||
// Current manifest being processed
|
||||
currentManifest manifestJobs |
||||
currentManifestMtx sync.RWMutex |
||||
} |
||||
|
||||
func NewJobBuilder(deleteStoreClient client.ObjectClient) *JobBuilder { |
||||
return &JobBuilder{ |
||||
deleteStoreClient: deleteStoreClient, |
||||
} |
||||
} |
||||
|
||||
// BuildJobs implements jobqueue.Builder interface
|
||||
func (b *JobBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *jobqueue.Job) error { |
||||
ticker := time.NewTicker(5 * time.Minute) |
||||
defer ticker.Stop() |
||||
|
||||
for { |
||||
if err := b.buildJobs(ctx, jobsChan); err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Wait for next tick or context cancellation
|
||||
select { |
||||
case <-ctx.Done(): |
||||
return ctx.Err() |
||||
case <-ticker.C: |
||||
// Continue to next iteration
|
||||
} |
||||
} |
||||
} |
||||
|
||||
func (b *JobBuilder) buildJobs(ctx context.Context, jobsChan chan<- *jobqueue.Job) error { |
||||
// List all manifest directories
|
||||
manifests, err := b.listManifests(ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Process each manifest
|
||||
for _, manifestPath := range manifests { |
||||
if err := b.processManifest(ctx, manifestPath, jobsChan); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (b *JobBuilder) processManifest(ctx context.Context, manifestPath string, jobsChan chan<- *jobqueue.Job) error { |
||||
level.Info(util_log.Logger).Log("msg", "starting manifest processing", "manifest", manifestPath) |
||||
|
||||
// Read manifest
|
||||
manifest, err := b.readManifest(ctx, manifestPath) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Initialize tracking for this manifest
|
||||
ctx, cancel := context.WithCancel(ctx) |
||||
b.currentManifestMtx.Lock() |
||||
b.currentManifest = manifestJobs{ |
||||
jobsInProgress: make(map[string]struct{}), |
||||
manifestPath: manifestPath, |
||||
cancel: cancel, |
||||
} |
||||
b.currentManifestMtx.Unlock() |
||||
|
||||
// Process segments sequentially
|
||||
for segmentNum := 1; ctx.Err() == nil && segmentNum <= manifest.SegmentsCount; segmentNum++ { |
||||
level.Info(util_log.Logger).Log("msg", "starting segment processing", |
||||
"manifest", manifestPath, |
||||
"segment", segmentNum) |
||||
|
||||
segmentPath := path.Join(manifestPath, fmt.Sprintf("%d.json", segmentNum)) |
||||
|
||||
manifestExists, err := b.deleteStoreClient.ObjectExists(ctx, segmentPath) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if !manifestExists { |
||||
level.Info(util_log.Logger).Log("msg", "manifest does not exist(likely processed already), skipping", "manifest", manifestPath) |
||||
continue |
||||
} |
||||
|
||||
segment, err := b.getSegment(ctx, segmentPath) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Reset job counters for this segment
|
||||
b.currentManifestMtx.Lock() |
||||
b.currentManifest.jobsInProgress = make(map[string]struct{}) |
||||
b.currentManifestMtx.Unlock() |
||||
|
||||
// Process each chunks group (same deletion query)
|
||||
for i, group := range segment.ChunksGroups { |
||||
// Check if we should stop processing this manifest
|
||||
if ctx.Err() != nil { |
||||
return ctx.Err() |
||||
} |
||||
|
||||
if err := b.createJobsForChunksGroup(ctx, fmt.Sprintf("%d", i), group, jobsChan); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
// Wait for all jobs in this segment to complete
|
||||
if err := b.waitForSegmentCompletion(ctx); err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Delete the processed segment
|
||||
if err := b.deleteStoreClient.DeleteObject(ctx, segmentPath); err != nil { |
||||
level.Warn(util_log.Logger).Log("msg", "failed to delete processed segment", |
||||
"segment", segmentPath, |
||||
"error", err) |
||||
} |
||||
|
||||
level.Info(util_log.Logger).Log("msg", "finished segment processing", |
||||
"manifest", manifestPath, |
||||
"segment", segmentNum) |
||||
} |
||||
|
||||
level.Info(util_log.Logger).Log("msg", "finished manifest processing", "manifest", manifestPath) |
||||
return nil |
||||
} |
||||
|
||||
func (b *JobBuilder) waitForSegmentCompletion(ctx context.Context) error { |
||||
for { |
||||
b.currentManifestMtx.RLock() |
||||
if len(b.currentManifest.jobsInProgress) == 0 { |
||||
b.currentManifestMtx.RUnlock() |
||||
return nil |
||||
} |
||||
b.currentManifestMtx.RUnlock() |
||||
|
||||
select { |
||||
case <-ctx.Done(): |
||||
return ctx.Err() |
||||
case <-time.After(time.Second): |
||||
// Check again
|
||||
} |
||||
} |
||||
} |
||||
|
||||
func (b *JobBuilder) listManifests(ctx context.Context) ([]string, error) { |
||||
// List all directories in the deletion store
|
||||
_, commonPrefixes, err := b.deleteStoreClient.List(ctx, "", "/") |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// Filter for manifest directories (they are named with Unix timestamps)
|
||||
var manifests []string |
||||
for _, commonPrefix := range commonPrefixes { |
||||
// Check if directory name is a valid timestamp
|
||||
if _, err := strconv.ParseInt(path.Base(string(commonPrefix)), 10, 64); err != nil { |
||||
continue |
||||
} |
||||
|
||||
// Check if manifest.json exists in this directory
|
||||
manifestPath := path.Join(string(commonPrefix), manifestFileName) |
||||
exists, err := b.deleteStoreClient.ObjectExists(ctx, manifestPath) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if !exists { |
||||
// Skip directories without manifest.json
|
||||
continue |
||||
} |
||||
|
||||
manifests = append(manifests, string(commonPrefix)) |
||||
} |
||||
|
||||
return manifests, nil |
||||
} |
||||
|
||||
func (b *JobBuilder) readManifest(ctx context.Context, manifestPath string) (*manifest, error) { |
||||
// Read manifest file
|
||||
reader, _, err := b.deleteStoreClient.GetObject(ctx, path.Join(manifestPath, manifestFileName)) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer reader.Close() |
||||
|
||||
var m manifest |
||||
if err := json.NewDecoder(reader).Decode(&m); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &m, nil |
||||
} |
||||
|
||||
func (b *JobBuilder) createJobsForChunksGroup(ctx context.Context, groupID string, group ChunksGroup, jobsChan chan<- *jobqueue.Job) error { |
||||
deletionQueries := make([]string, 0, len(group.Requests)) |
||||
for _, req := range group.Requests { |
||||
deletionQueries = append(deletionQueries, req.Query) |
||||
} |
||||
|
||||
// Split chunks into groups of maxChunksPerJob
|
||||
for i := 0; i < len(group.Chunks); i += maxChunksPerJob { |
||||
end := i + maxChunksPerJob |
||||
if end > len(group.Chunks) { |
||||
end = len(group.Chunks) |
||||
} |
||||
|
||||
payload, err := json.Marshal(&deletionJob{ |
||||
ChunkIDs: group.Chunks[i:end], |
||||
DeletionQueries: deletionQueries, |
||||
}) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
job := &jobqueue.Job{ |
||||
Id: fmt.Sprintf("%s_%d", groupID, i/maxChunksPerJob), |
||||
Type: jobqueue.JOB_TYPE_DELETION, |
||||
Payload: payload, |
||||
} |
||||
|
||||
b.currentManifestMtx.Lock() |
||||
b.currentManifest.jobsInProgress[job.Id] = struct{}{} |
||||
b.currentManifestMtx.Unlock() |
||||
|
||||
select { |
||||
case <-ctx.Done(): |
||||
return ctx.Err() |
||||
case jobsChan <- job: |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// OnJobResponse implements jobqueue.Builder interface
|
||||
func (b *JobBuilder) OnJobResponse(response *jobqueue.ReportJobResultRequest) { |
||||
b.currentManifestMtx.Lock() |
||||
defer b.currentManifestMtx.Unlock() |
||||
|
||||
if _, ok := b.currentManifest.jobsInProgress[response.JobId]; !ok { |
||||
return |
||||
} |
||||
|
||||
// Check for job failure
|
||||
if response.Error != "" { |
||||
util_log.Logger.Log("msg", "job failed", "job_id", response.JobId, "error", response.Error) |
||||
b.currentManifest.cancel() |
||||
return |
||||
} |
||||
|
||||
delete(b.currentManifest.jobsInProgress, response.JobId) |
||||
} |
||||
|
||||
func (b *JobBuilder) getSegment(ctx context.Context, segmentPath string) (*segment, error) { |
||||
reader, _, err := b.deleteStoreClient.GetObject(ctx, segmentPath) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer reader.Close() |
||||
|
||||
var segment segment |
||||
if err := json.NewDecoder(reader).Decode(&segment); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &segment, nil |
||||
} |
||||
@ -0,0 +1,253 @@ |
||||
package deletion |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"encoding/json" |
||||
"math" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/compactor/jobqueue" |
||||
"github.com/grafana/loki/v3/pkg/storage/chunk/client" |
||||
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local" |
||||
) |
||||
|
||||
func TestJobBuilder_buildJobs(t *testing.T) { |
||||
for _, tc := range []struct { |
||||
name string |
||||
setupManifest func(client client.ObjectClient) |
||||
expectedJobs []jobqueue.Job |
||||
}{ |
||||
{ |
||||
name: "no manifests in storage", |
||||
setupManifest: func(_ client.ObjectClient) {}, |
||||
}, |
||||
{ |
||||
name: "one manifest in storage with less than maxChunksPerJob", |
||||
setupManifest: func(client client.ObjectClient) { |
||||
deleteRequestBatch := newDeleteRequestBatch(nil) |
||||
deleteRequestBatch.addDeleteRequest(&DeleteRequest{ |
||||
UserID: user1, |
||||
Query: lblFooBar, |
||||
StartTime: 0, |
||||
EndTime: math.MaxInt64, |
||||
}) |
||||
manifestBuilder, err := newDeletionManifestBuilder(client, *deleteRequestBatch) |
||||
require.NoError(t, err) |
||||
|
||||
require.NoError(t, manifestBuilder.AddSeries(context.Background(), table1, &mockSeries{ |
||||
userID: user1, |
||||
labels: mustParseLabel(lblFooBar), |
||||
chunks: buildRetentionChunks(0, maxChunksPerJob-1), |
||||
})) |
||||
|
||||
require.NoError(t, manifestBuilder.Finish(context.Background())) |
||||
}, |
||||
expectedJobs: []jobqueue.Job{ |
||||
{ |
||||
Id: "0_0", |
||||
Type: jobqueue.JOB_TYPE_DELETION, |
||||
Payload: mustMarshalPayload(&deletionJob{ |
||||
ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(0, maxChunksPerJob-1)), |
||||
DeletionQueries: []string{lblFooBar}, |
||||
}), |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "one manifest in storage with more than maxChunksPerJob", |
||||
setupManifest: func(client client.ObjectClient) { |
||||
deleteRequestBatch := newDeleteRequestBatch(nil) |
||||
deleteRequestBatch.addDeleteRequest(&DeleteRequest{ |
||||
UserID: user1, |
||||
Query: lblFooBar, |
||||
StartTime: 0, |
||||
EndTime: math.MaxInt64, |
||||
}) |
||||
manifestBuilder, err := newDeletionManifestBuilder(client, *deleteRequestBatch) |
||||
require.NoError(t, err) |
||||
|
||||
require.NoError(t, manifestBuilder.AddSeries(context.Background(), table1, &mockSeries{ |
||||
userID: user1, |
||||
labels: mustParseLabel(lblFooBar), |
||||
chunks: buildRetentionChunks(0, maxChunksPerJob+1), |
||||
})) |
||||
|
||||
require.NoError(t, manifestBuilder.Finish(context.Background())) |
||||
}, |
||||
expectedJobs: []jobqueue.Job{ |
||||
{ |
||||
Id: "0_0", |
||||
Type: jobqueue.JOB_TYPE_DELETION, |
||||
Payload: mustMarshalPayload(&deletionJob{ |
||||
ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(0, maxChunksPerJob)), |
||||
DeletionQueries: []string{lblFooBar}, |
||||
}), |
||||
}, |
||||
{ |
||||
Id: "0_1", |
||||
Type: jobqueue.JOB_TYPE_DELETION, |
||||
Payload: mustMarshalPayload(&deletionJob{ |
||||
ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(maxChunksPerJob, 1)), |
||||
DeletionQueries: []string{lblFooBar}, |
||||
}), |
||||
}, |
||||
}, |
||||
}, |
||||
{ |
||||
name: "one manifest in storage with multiple groups", |
||||
setupManifest: func(client client.ObjectClient) { |
||||
deleteRequestBatch := newDeleteRequestBatch(nil) |
||||
deleteRequestBatch.addDeleteRequest(&DeleteRequest{ |
||||
UserID: user1, |
||||
RequestID: req1, |
||||
Query: lblFooBar, |
||||
StartTime: 0, |
||||
EndTime: 100, |
||||
}) |
||||
deleteRequestBatch.addDeleteRequest(&DeleteRequest{ |
||||
UserID: user1, |
||||
RequestID: req2, |
||||
Query: lblFizzBuzz, |
||||
StartTime: 51, |
||||
EndTime: 100, |
||||
}) |
||||
manifestBuilder, err := newDeletionManifestBuilder(client, *deleteRequestBatch) |
||||
require.NoError(t, err) |
||||
|
||||
require.NoError(t, manifestBuilder.AddSeries(context.Background(), table1, &mockSeries{ |
||||
userID: user1, |
||||
labels: mustParseLabel(lblFooBarAndFizzBuzz), |
||||
chunks: buildRetentionChunks(25, 50), |
||||
})) |
||||
|
||||
require.NoError(t, manifestBuilder.Finish(context.Background())) |
||||
}, |
||||
expectedJobs: []jobqueue.Job{ |
||||
{ |
||||
Id: "0_0", |
||||
Type: jobqueue.JOB_TYPE_DELETION, |
||||
Payload: mustMarshalPayload(&deletionJob{ |
||||
ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(25, 25)), |
||||
DeletionQueries: []string{lblFooBar}, |
||||
}), |
||||
}, |
||||
{ |
||||
Id: "1_0", |
||||
Type: jobqueue.JOB_TYPE_DELETION, |
||||
Payload: mustMarshalPayload(&deletionJob{ |
||||
ChunkIDs: getChunkIDsFromRetentionChunks(buildRetentionChunks(50, 25)), |
||||
DeletionQueries: []string{lblFooBar, lblFizzBuzz}, |
||||
}), |
||||
}, |
||||
}, |
||||
}, |
||||
} { |
||||
t.Run(tc.name, func(t *testing.T) { |
||||
objectClient, err := local.NewFSObjectClient(local.FSConfig{ |
||||
Directory: t.TempDir(), |
||||
}) |
||||
require.NoError(t, err) |
||||
tc.setupManifest(objectClient) |
||||
|
||||
builder := NewJobBuilder(objectClient) |
||||
jobsChan := make(chan *jobqueue.Job) |
||||
|
||||
var jobsBuilt []jobqueue.Job |
||||
go func() { |
||||
for job := range jobsChan { |
||||
jobsBuilt = append(jobsBuilt, *job) |
||||
builder.OnJobResponse(&jobqueue.ReportJobResultRequest{ |
||||
JobId: job.Id, |
||||
JobType: job.Type, |
||||
}) |
||||
} |
||||
}() |
||||
|
||||
err = builder.buildJobs(context.Background(), jobsChan) |
||||
require.NoError(t, err) |
||||
|
||||
require.Equal(t, len(tc.expectedJobs), len(jobsBuilt)) |
||||
require.Equal(t, tc.expectedJobs, jobsBuilt) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestJobBuilder_ProcessManifest(t *testing.T) { |
||||
for _, tc := range []struct { |
||||
name string |
||||
jobProcessingError string |
||||
}{ |
||||
{ |
||||
name: "all jobs succeeded", |
||||
}, { |
||||
name: "job failure should fail the manifest processing", |
||||
jobProcessingError: "job processing failed", |
||||
}, |
||||
} { |
||||
t.Run(tc.name, func(t *testing.T) { |
||||
objectClient, err := local.NewFSObjectClient(local.FSConfig{ |
||||
Directory: t.TempDir(), |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
builder := NewJobBuilder(objectClient) |
||||
|
||||
// Create a test manifest
|
||||
manifest := &manifest{ |
||||
SegmentsCount: 1, |
||||
} |
||||
manifestData, err := json.Marshal(manifest) |
||||
require.NoError(t, err) |
||||
err = objectClient.PutObject(context.Background(), "test-manifest/manifest.json", bytes.NewReader(manifestData)) |
||||
require.NoError(t, err) |
||||
|
||||
// Create a test segment
|
||||
segment := &segment{ |
||||
UserID: "user1", |
||||
TableName: "table1", |
||||
ChunksGroups: []ChunksGroup{ |
||||
{ |
||||
Chunks: []string{"chunk1", "chunk2"}, |
||||
Requests: []DeleteRequest{ |
||||
{Query: "{job=\"test\"}"}, |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
segmentData, err := json.Marshal(segment) |
||||
require.NoError(t, err) |
||||
err = objectClient.PutObject(context.Background(), "test-manifest/1.json", bytes.NewReader(segmentData)) |
||||
require.NoError(t, err) |
||||
|
||||
jobsChan := make(chan *jobqueue.Job) |
||||
go func() { |
||||
for job := range jobsChan { |
||||
builder.OnJobResponse(&jobqueue.ReportJobResultRequest{ |
||||
JobId: job.Id, |
||||
JobType: job.Type, |
||||
Error: tc.jobProcessingError, |
||||
}) |
||||
} |
||||
}() |
||||
|
||||
err = builder.processManifest(context.Background(), "test-manifest", jobsChan) |
||||
if tc.jobProcessingError != "" { |
||||
require.ErrorIs(t, err, context.Canceled) |
||||
} else { |
||||
require.NoError(t, err) |
||||
} |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func mustMarshalPayload(job *deletionJob) []byte { |
||||
payload, err := json.Marshal(job) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
return payload |
||||
} |
||||
@ -0,0 +1,261 @@ |
||||
package jobqueue |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log/level" |
||||
"go.uber.org/atomic" |
||||
"google.golang.org/grpc/codes" |
||||
"google.golang.org/grpc/status" |
||||
|
||||
util_log "github.com/grafana/loki/v3/pkg/util/log" |
||||
) |
||||
|
||||
var ( |
||||
// ErrBuilderAlreadyRegistered is returned when trying to register a builder for a job type that already has one
|
||||
ErrBuilderAlreadyRegistered = errors.New("builder already registered for this job type") |
||||
) |
||||
|
||||
// Builder defines the interface for building jobs that will be added to the queue
|
||||
type Builder interface { |
||||
// BuildJobs builds new jobs and sends them to the provided channel
|
||||
// It should be a blocking call and returns when ctx is cancelled.
|
||||
BuildJobs(ctx context.Context, jobsChan chan<- *Job) error |
||||
|
||||
// OnJobResponse reports back the response of the job execution.
|
||||
OnJobResponse(response *ReportJobResultRequest) |
||||
} |
||||
|
||||
// Queue implements the job queue service
|
||||
type Queue struct { |
||||
queue chan *Job |
||||
closed atomic.Bool |
||||
builders map[JobType]Builder |
||||
wg sync.WaitGroup |
||||
stop chan struct{} |
||||
checkTimedOutJobsInterval time.Duration |
||||
|
||||
// Track jobs that are being processed
|
||||
processingJobs map[string]*processingJob |
||||
processingJobsMtx sync.RWMutex |
||||
jobTimeout time.Duration |
||||
maxRetries int |
||||
} |
||||
|
||||
type processingJob struct { |
||||
job *Job |
||||
dequeued time.Time |
||||
retryCount int |
||||
} |
||||
|
||||
// New creates a new job queue
|
||||
func New() *Queue { |
||||
return newQueue(time.Minute) |
||||
} |
||||
|
||||
// newQueue creates a new job queue with a configurable timed out jobs check ticker interval (for testing)
|
||||
func newQueue(checkTimedOutJobsInterval time.Duration) *Queue { |
||||
q := &Queue{ |
||||
queue: make(chan *Job), |
||||
builders: make(map[JobType]Builder), |
||||
stop: make(chan struct{}), |
||||
checkTimedOutJobsInterval: checkTimedOutJobsInterval, |
||||
processingJobs: make(map[string]*processingJob), |
||||
// ToDo(Sandeep): make jobTimeout and maxRetries configurable(possibly job specific)
|
||||
jobTimeout: 15 * time.Minute, |
||||
maxRetries: 3, |
||||
} |
||||
|
||||
// Start the job timeout checker
|
||||
q.wg.Add(1) |
||||
go q.checkJobTimeouts() |
||||
|
||||
return q |
||||
} |
||||
|
||||
// RegisterBuilder registers a builder for a specific job type
|
||||
func (q *Queue) RegisterBuilder(jobType JobType, builder Builder) error { |
||||
if _, exists := q.builders[jobType]; exists { |
||||
return ErrBuilderAlreadyRegistered |
||||
} |
||||
|
||||
q.builders[jobType] = builder |
||||
return nil |
||||
} |
||||
|
||||
// Start starts all registered builders
|
||||
func (q *Queue) Start(ctx context.Context) error { |
||||
for jobType, builder := range q.builders { |
||||
q.wg.Add(1) |
||||
go q.startBuilder(ctx, jobType, builder) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Stop stops all builders
|
||||
func (q *Queue) Stop() error { |
||||
close(q.stop) |
||||
q.wg.Wait() |
||||
return nil |
||||
} |
||||
|
||||
func (q *Queue) startBuilder(ctx context.Context, jobType JobType, builder Builder) { |
||||
defer q.wg.Done() |
||||
|
||||
// Start the builder in a separate goroutine
|
||||
builderErrChan := make(chan error, 1) |
||||
go func() { |
||||
builderErrChan <- builder.BuildJobs(ctx, q.queue) |
||||
}() |
||||
|
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case <-q.stop: |
||||
return |
||||
case err := <-builderErrChan: |
||||
if err != nil && !errors.Is(err, context.Canceled) { |
||||
level.Error(util_log.Logger).Log("msg", "builder error", "job_type", jobType, "error", err) |
||||
} |
||||
return |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (q *Queue) checkJobTimeouts() { |
||||
defer q.wg.Done() |
||||
|
||||
ticker := time.NewTicker(q.checkTimedOutJobsInterval) |
||||
defer ticker.Stop() |
||||
|
||||
for { |
||||
select { |
||||
case <-q.stop: |
||||
return |
||||
case <-ticker.C: |
||||
q.processingJobsMtx.Lock() |
||||
now := time.Now() |
||||
for jobID, pj := range q.processingJobs { |
||||
if now.Sub(pj.dequeued) > q.jobTimeout { |
||||
// Requeue the job
|
||||
select { |
||||
case <-q.stop: |
||||
return |
||||
case q.queue <- pj.job: |
||||
level.Warn(util_log.Logger).Log( |
||||
"msg", "job timed out, requeuing", |
||||
"job_id", jobID, |
||||
"job_type", pj.job.Type, |
||||
"timeout", q.jobTimeout, |
||||
) |
||||
} |
||||
delete(q.processingJobs, jobID) |
||||
} |
||||
} |
||||
q.processingJobsMtx.Unlock() |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Dequeue implements the gRPC Dequeue method
|
||||
func (q *Queue) Dequeue(ctx context.Context, _ *DequeueRequest) (*DequeueResponse, error) { |
||||
if q.closed.Load() { |
||||
return &DequeueResponse{}, nil |
||||
} |
||||
|
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, status.Error(codes.Canceled, ctx.Err().Error()) |
||||
case job, ok := <-q.queue: |
||||
if !ok { |
||||
return &DequeueResponse{}, nil |
||||
} |
||||
|
||||
// Track the job as being processed
|
||||
q.processingJobsMtx.Lock() |
||||
defer q.processingJobsMtx.Unlock() |
||||
q.processingJobs[job.Id] = &processingJob{ |
||||
job: job, |
||||
dequeued: time.Now(), |
||||
retryCount: 0, |
||||
} |
||||
|
||||
return &DequeueResponse{ |
||||
Job: job, |
||||
}, nil |
||||
} |
||||
} |
||||
|
||||
// ReportJobResult implements the gRPC ReportJobResult method
|
||||
func (q *Queue) ReportJobResult(ctx context.Context, req *ReportJobResultRequest) (*ReportJobResultResponse, error) { |
||||
if req == nil { |
||||
return nil, status.Error(codes.InvalidArgument, "request cannot be nil") |
||||
} |
||||
|
||||
q.processingJobsMtx.Lock() |
||||
defer q.processingJobsMtx.Unlock() |
||||
pj, exists := q.processingJobs[req.JobId] |
||||
if !exists { |
||||
return nil, status.Error(codes.NotFound, "job not found") |
||||
} |
||||
|
||||
if req.Error != "" { |
||||
level.Error(util_log.Logger).Log( |
||||
"msg", "job execution failed", |
||||
"job_id", req.JobId, |
||||
"job_type", req.JobType, |
||||
"error", req.Error, |
||||
"retry_count", pj.retryCount, |
||||
) |
||||
|
||||
// Check if we should retry the job
|
||||
if pj.retryCount < q.maxRetries { |
||||
pj.retryCount++ |
||||
level.Info(util_log.Logger).Log( |
||||
"msg", "retrying failed job", |
||||
"job_id", req.JobId, |
||||
"job_type", req.JobType, |
||||
"retry_count", pj.retryCount, |
||||
"max_retries", q.maxRetries, |
||||
) |
||||
|
||||
// Requeue the job
|
||||
select { |
||||
case <-ctx.Done(): |
||||
case q.queue <- pj.job: |
||||
return &ReportJobResultResponse{}, nil |
||||
} |
||||
} else { |
||||
level.Error(util_log.Logger).Log( |
||||
"msg", "job failed after max retries", |
||||
"job_id", req.JobId, |
||||
"job_type", req.JobType, |
||||
"max_retries", q.maxRetries, |
||||
) |
||||
} |
||||
} else { |
||||
level.Debug(util_log.Logger).Log( |
||||
"msg", "job execution succeeded", |
||||
"job_id", req.JobId, |
||||
"job_type", req.JobType, |
||||
) |
||||
} |
||||
q.builders[req.JobType].OnJobResponse(req) |
||||
|
||||
// Remove the job from processing jobs
|
||||
delete(q.processingJobs, req.JobId) |
||||
|
||||
return &ReportJobResultResponse{}, nil |
||||
} |
||||
|
||||
// Close closes the queue and releases all resources
|
||||
func (q *Queue) Close() { |
||||
if !q.closed.Load() { |
||||
close(q.queue) |
||||
q.closed.Store(true) |
||||
} |
||||
} |
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,48 @@ |
||||
syntax = "proto3"; |
||||
|
||||
package jobqueue; |
||||
|
||||
import "google/protobuf/any.proto"; |
||||
import "google/protobuf/timestamp.proto"; |
||||
|
||||
option go_package = "github.com/grafana/loki/pkg/compactor/jobqueue"; |
||||
|
||||
// Job represents a single job in the queue |
||||
message Job { |
||||
string id = 1; |
||||
JobType type = 2; |
||||
bytes payload = 3; // encoded job specific payload |
||||
} |
||||
|
||||
// JobType represents the type of job |
||||
enum JobType { |
||||
JOB_TYPE_DELETION = 0; |
||||
// Add more job types as needed |
||||
} |
||||
|
||||
// DequeueRequest is used to request a job from the queue |
||||
message DequeueRequest {} |
||||
|
||||
// DequeueResponse contains the dequeued job |
||||
message DequeueResponse { |
||||
Job job = 1; |
||||
} |
||||
|
||||
// ReportJobResultRequest is used to report the result of executing a job |
||||
message ReportJobResultRequest { |
||||
string job_id = 1; |
||||
JobType job_type = 2; |
||||
string error = 3; // Empty string indicates success |
||||
google.protobuf.Any result = 4; |
||||
} |
||||
|
||||
// ReportJobResultResponse is the response to reporting a job result |
||||
message ReportJobResultResponse {} |
||||
|
||||
// JobQueue provides RPC methods for job queue operations |
||||
service JobQueue { |
||||
// Dequeue retrieves the next job from the queue |
||||
rpc Dequeue(DequeueRequest) returns (DequeueResponse) {} |
||||
// ReportJobResult reports the result of executing a job |
||||
rpc ReportJobResult(ReportJobResultRequest) returns (ReportJobResultResponse) {} |
||||
} |
||||
@ -0,0 +1,238 @@ |
||||
package jobqueue |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"sync" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
// mockBuilder implements the Builder interface for testing
|
||||
type mockBuilder struct { |
||||
jobsToBuild []*Job |
||||
buildErr error |
||||
} |
||||
|
||||
func (m *mockBuilder) OnJobResponse(_ *ReportJobResultRequest) {} |
||||
|
||||
func (m *mockBuilder) BuildJobs(ctx context.Context, jobsChan chan<- *Job) error { |
||||
if m.buildErr != nil { |
||||
return m.buildErr |
||||
} |
||||
|
||||
for _, job := range m.jobsToBuild { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return ctx.Err() |
||||
case jobsChan <- job: |
||||
} |
||||
} |
||||
|
||||
// Keep running until context is cancelled
|
||||
<-ctx.Done() |
||||
return ctx.Err() |
||||
} |
||||
|
||||
func TestQueue_RegisterBuilder(t *testing.T) { |
||||
q := New() |
||||
builder := &mockBuilder{} |
||||
|
||||
// Register builder successfully
|
||||
err := q.RegisterBuilder(JOB_TYPE_DELETION, builder) |
||||
require.NoError(t, err) |
||||
|
||||
// Try to register same builder type again
|
||||
err = q.RegisterBuilder(JOB_TYPE_DELETION, builder) |
||||
require.ErrorIs(t, err, ErrBuilderAlreadyRegistered) |
||||
} |
||||
|
||||
func TestQueue_Dequeue(t *testing.T) { |
||||
q := New() |
||||
|
||||
// Create a test job
|
||||
job := &Job{ |
||||
Id: "test-job", |
||||
Type: JOB_TYPE_DELETION, |
||||
} |
||||
|
||||
go func() { |
||||
// Enqueue the job
|
||||
q.queue <- job |
||||
}() |
||||
|
||||
// Dequeue the job
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) |
||||
defer cancel() |
||||
|
||||
resp, err := q.Dequeue(ctx, &DequeueRequest{}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, job, resp.Job) |
||||
|
||||
// Verify job is tracked as being processed
|
||||
q.processingJobsMtx.RLock() |
||||
pj, exists := q.processingJobs[job.Id] |
||||
q.processingJobsMtx.RUnlock() |
||||
require.True(t, exists) |
||||
require.Equal(t, job, pj.job) |
||||
require.Equal(t, 0, pj.retryCount) |
||||
} |
||||
|
||||
func TestQueue_ReportJobResult(t *testing.T) { |
||||
ctx := context.Background() |
||||
q := New() |
||||
require.NoError(t, q.RegisterBuilder(JOB_TYPE_DELETION, &mockBuilder{})) |
||||
|
||||
// Create a test job
|
||||
job := &Job{ |
||||
Id: "test-job", |
||||
Type: JOB_TYPE_DELETION, |
||||
} |
||||
|
||||
// Add job to processing jobs
|
||||
q.processingJobsMtx.Lock() |
||||
q.processingJobs[job.Id] = &processingJob{ |
||||
job: job, |
||||
dequeued: time.Now(), |
||||
retryCount: 0, |
||||
} |
||||
q.processingJobsMtx.Unlock() |
||||
|
||||
// Test successful response
|
||||
resp, err := q.ReportJobResult(ctx, &ReportJobResultRequest{ |
||||
JobId: job.Id, |
||||
JobType: job.Type, |
||||
}) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, resp) |
||||
|
||||
// Verify job is removed from processing jobs
|
||||
q.processingJobsMtx.RLock() |
||||
_, exists := q.processingJobs[job.Id] |
||||
q.processingJobsMtx.RUnlock() |
||||
require.False(t, exists) |
||||
|
||||
// Test error response with retry
|
||||
job.Id = "retry-job" |
||||
q.processingJobsMtx.Lock() |
||||
q.processingJobs[job.Id] = &processingJob{ |
||||
job: job, |
||||
dequeued: time.Now(), |
||||
retryCount: 0, |
||||
} |
||||
q.processingJobsMtx.Unlock() |
||||
|
||||
var wg sync.WaitGroup |
||||
wg.Add(1) |
||||
|
||||
go func() { |
||||
defer wg.Done() |
||||
|
||||
resp, err = q.ReportJobResult(ctx, &ReportJobResultRequest{ |
||||
JobId: job.Id, |
||||
JobType: job.Type, |
||||
Error: "test error", |
||||
}) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, resp) |
||||
}() |
||||
|
||||
// Verify job is requeued with timeout
|
||||
select { |
||||
case requeuedJob := <-q.queue: |
||||
require.Equal(t, job, requeuedJob) |
||||
case <-time.After(time.Minute): |
||||
t.Fatal("job was not requeued") |
||||
} |
||||
|
||||
wg.Wait() |
||||
|
||||
// Verify retry count is incremented
|
||||
q.processingJobsMtx.RLock() |
||||
pj, exists := q.processingJobs[job.Id] |
||||
q.processingJobsMtx.RUnlock() |
||||
require.True(t, exists) |
||||
require.Equal(t, 1, pj.retryCount) |
||||
} |
||||
|
||||
func TestQueue_JobTimeout(t *testing.T) { |
||||
q := newQueue(50 * time.Millisecond) |
||||
q.jobTimeout = 100 * time.Millisecond // Short timeout for testing
|
||||
|
||||
// Create a test job
|
||||
job := &Job{ |
||||
Id: "test-job", |
||||
Type: JOB_TYPE_DELETION, |
||||
} |
||||
|
||||
// Add job to processing jobs with old dequeued time
|
||||
q.processingJobsMtx.Lock() |
||||
q.processingJobs[job.Id] = &processingJob{ |
||||
job: job, |
||||
dequeued: time.Now().Add(-200 * time.Millisecond), |
||||
retryCount: 0, |
||||
} |
||||
q.processingJobsMtx.Unlock() |
||||
|
||||
// Wait for timeout checker to run
|
||||
time.Sleep(100 * time.Millisecond) |
||||
|
||||
// Verify job is requeued
|
||||
select { |
||||
case requeuedJob := <-q.queue: |
||||
require.Equal(t, job, requeuedJob) |
||||
case <-time.After(time.Second): |
||||
t.Fatal("job was not requeued after timeout") |
||||
} |
||||
|
||||
// Verify job is removed from processing jobs
|
||||
q.processingJobsMtx.RLock() |
||||
_, exists := q.processingJobs[job.Id] |
||||
q.processingJobsMtx.RUnlock() |
||||
require.False(t, exists) |
||||
} |
||||
|
||||
func TestQueue_StartStop(t *testing.T) { |
||||
q := New() |
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) |
||||
defer cancel() |
||||
|
||||
// Create a builder that returns an error
|
||||
builder := &mockBuilder{ |
||||
buildErr: errors.New("test error"), |
||||
} |
||||
|
||||
// Register and start the builder
|
||||
err := q.RegisterBuilder(JOB_TYPE_DELETION, builder) |
||||
require.NoError(t, err) |
||||
|
||||
err = q.Start(ctx) |
||||
require.NoError(t, err) |
||||
|
||||
// Wait for context cancellation
|
||||
<-ctx.Done() |
||||
|
||||
// Stop the queue
|
||||
err = q.Stop() |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
func TestQueue_Close(t *testing.T) { |
||||
q := New() |
||||
|
||||
// Close the queue
|
||||
q.Close() |
||||
|
||||
// Verify queue is closed
|
||||
require.True(t, q.closed.Load()) |
||||
|
||||
// Verify channel is closed
|
||||
select { |
||||
case _, ok := <-q.queue: |
||||
require.False(t, ok) |
||||
default: |
||||
t.Fatal("queue channel should be closed") |
||||
} |
||||
} |
||||
Loading…
Reference in new issue